package webtransport
import (
"context"
"sync"
"time"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"github.com/quic-go/quic-go/quicvarint"
)
// session is the manager's bookkeeping entry for one WebTransport session ID
// on one QUIC connection. An entry may exist before the session itself has
// been established, so that streams arriving early can wait for it.
type session struct {
	// created is closed once conn has been set (see AddSession).
	created chan struct{}
	// counter is incremented by getOrCreateSession each time it hands out
	// an entry whose conn is still nil, and decremented by the goroutine
	// that was waiting on behalf of a buffered stream once it finishes.
	counter int
	// conn is the established session; nil until AddSession is called.
	conn *Session
}
// sessionManager tracks WebTransport sessions per QUIC connection and
// buffers streams that arrive before their session has been registered.
type sessionManager struct {
	// refCount counts the goroutines started by AddStream / AddUniStream;
	// Close waits on it.
	refCount sync.WaitGroup

	// ctx is cancelled by Close (via ctxCancel) to release waiting goroutines.
	ctx       context.Context
	ctxCancel context.CancelFunc

	// timeout is how long a buffered stream waits for its session.
	timeout time.Duration

	// mx guards conns and the session entries stored in it.
	mx    sync.Mutex
	conns map[quic.ConnectionTracingID]map[sessionID]*session
}
// newSessionManager returns a manager with an empty connection table and a
// fresh cancellable context. timeout is the duration a buffered stream waits
// for its session before being rejected.
func newSessionManager(timeout time.Duration) *sessionManager {
	ctx, cancel := context.WithCancel(context.Background())
	return &sessionManager{
		ctx:       ctx,
		ctxCancel: cancel,
		timeout:   timeout,
		conns:     make(map[quic.ConnectionTracingID]map[sessionID]*session),
	}
}
// AddStream routes an incoming bidirectional stream to the session with the
// given ID on the given connection. If the session is already established,
// the stream is delivered immediately. Otherwise a goroutine is started that
// waits in handleStream and afterwards releases its reference on the
// placeholder entry.
func (m *sessionManager) AddStream(connTracingID quic.ConnectionTracingID, str quic.Stream, id sessionID) {
	entry, established := m.getOrCreateSession(connTracingID, id)
	if established {
		entry.conn.addIncomingStream(str)
		return
	}

	m.refCount.Add(1)
	go func() {
		defer m.refCount.Done()
		m.handleStream(str, entry)

		m.mx.Lock()
		defer m.mx.Unlock()

		entry.counter--
		// Keep the entry while other waiters still reference it or the
		// session has been established in the meantime.
		if entry.counter != 0 || entry.conn != nil {
			return
		}
		m.maybeDelete(connTracingID, id)
	}()
}
// maybeDelete removes the entry for id from the given connection's session
// table, and drops the connection's table as well once it is empty. It does
// not lock m.mx itself; the callers in this file invoke it with the mutex
// already held.
func (m *sessionManager) maybeDelete(connTracingID quic.ConnectionTracingID, id sessionID) {
	if perConn, found := m.conns[connTracingID]; found {
		delete(perConn, id)
		if len(perConn) == 0 {
			delete(m.conns, connTracingID)
		}
	}
}
// AddUniStream routes an incoming unidirectional stream. It first reads the
// session ID as a QUIC varint from the stream. If the session with that ID is
// already established on the given connection, the stream is delivered
// immediately. Otherwise a goroutine is started that waits in handleUniStream
// and afterwards releases its reference on the placeholder entry.
//
// If the session ID cannot be read, the read side of the stream is cancelled
// and the stream is dropped.
func (m *sessionManager) AddUniStream(connTracingID quic.ConnectionTracingID, str quic.ReceiveStream) {
	idv, err := quicvarint.Read(quicvarint.NewReader(str))
	if err != nil {
		str.CancelRead(1337)
		// Without a session ID the stream cannot be routed. Returning here
		// prevents the cancelled stream from being attached to (or buffered
		// for) session 0, which is what the zero value of idv would select.
		return
	}
	id := sessionID(idv)

	sess, isExisting := m.getOrCreateSession(connTracingID, id)
	if isExisting {
		sess.conn.addIncomingUniStream(str)
		return
	}

	m.refCount.Add(1)
	go func() {
		defer m.refCount.Done()
		m.handleUniStream(str, sess)

		m.mx.Lock()
		defer m.mx.Unlock()

		sess.counter--
		// Drop the placeholder only when no other waiter references it and
		// the session was never established.
		if sess.counter == 0 && sess.conn == nil {
			m.maybeDelete(connTracingID, id)
		}
	}()
}
// getOrCreateSession looks up the entry for id on the given connection,
// creating the per-connection table and a placeholder entry as needed.
//
// It returns existed == true only when the entry is present and its conn has
// been set. In every other case the returned entry's counter is incremented
// and existed is false.
func (m *sessionManager) getOrCreateSession(connTracingID quic.ConnectionTracingID, id sessionID) (sess *session, existed bool) {
	m.mx.Lock()
	defer m.mx.Unlock()

	perConn, found := m.conns[connTracingID]
	if !found {
		perConn = make(map[sessionID]*session)
		m.conns[connTracingID] = perConn
	}

	entry, known := perConn[id]
	switch {
	case known && entry.conn != nil:
		return entry, true
	case !known:
		entry = &session{created: make(chan struct{})}
		perConn[id] = entry
	}

	entry.counter++
	return entry, false
}
// handleStream blocks until one of three things happens:
//   - sess.created is closed: the stream is handed to sess.conn;
//   - m.timeout elapses: both directions of the stream are cancelled with
//     WebTransportBufferedStreamRejectedErrorCode;
//   - the manager's context is cancelled: it returns without touching the
//     stream.
func (m *sessionManager) handleStream(str quic.Stream, sess *session) {
	timer := time.NewTimer(m.timeout)
	defer timer.Stop()

	select {
	case <-m.ctx.Done():
	case <-timer.C:
		str.CancelRead(WebTransportBufferedStreamRejectedErrorCode)
		str.CancelWrite(WebTransportBufferedStreamRejectedErrorCode)
	case <-sess.created:
		sess.conn.addIncomingStream(str)
	}
}
// handleUniStream blocks until one of three things happens:
//   - sess.created is closed: the stream is handed to sess.conn;
//   - m.timeout elapses: reading from the stream is cancelled with
//     WebTransportBufferedStreamRejectedErrorCode;
//   - the manager's context is cancelled: it returns without touching the
//     stream.
func (m *sessionManager) handleUniStream(str quic.ReceiveStream, sess *session) {
	timer := time.NewTimer(m.timeout)
	defer timer.Stop()

	select {
	case <-m.ctx.Done():
	case <-timer.C:
		str.CancelRead(WebTransportBufferedStreamRejectedErrorCode)
	case <-sess.created:
		sess.conn.addIncomingUniStream(str)
	}
}
// AddSession creates a Session for id on qconn and registers it with the
// manager. If a placeholder entry already exists for that ID, the new Session
// is attached to it and its created channel is closed. Otherwise a new entry
// is stored whose created channel is already closed.
//
// The connection's tracing ID is read from qconn's context with an unchecked
// type assertion.
func (m *sessionManager) AddSession(qconn http3.Connection, id sessionID, requestStr http3.Stream) *Session {
	conn := newSession(id, qconn, requestStr)
	connTracingID := qconn.Context().Value(quic.ConnectionTracingKey).(quic.ConnectionTracingID)

	m.mx.Lock()
	defer m.mx.Unlock()

	perConn, found := m.conns[connTracingID]
	if !found {
		perConn = make(map[sessionID]*session)
		m.conns[connTracingID] = perConn
	}

	pending, buffered := perConn[id]
	if !buffered {
		// Nothing is waiting on this ID: store an entry that is ready from
		// the start.
		ready := make(chan struct{})
		close(ready)
		perConn[id] = &session{created: ready, conn: conn}
		return conn
	}

	// Set conn before closing the channel so that anything unblocked by the
	// close sees a non-nil conn.
	pending.conn = conn
	close(pending.created)
	return conn
}
// Close cancels the manager's context and then blocks until every goroutine
// counted in refCount has finished.
func (m *sessionManager) Close() {
	// Cancel first so goroutines blocked in handleStream / handleUniStream
	// are released before we wait on them.
	m.ctxCancel()
	m.refCount.Wait()
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.