package webtransport
import (
"context"
"slices"
"sync"
"time"
"github.com/quic-go/quic-go"
)
type unestablishedSession struct {
Streams []*quic .Stream
UniStreams []*quic .ReceiveStream
Timer *time .Timer
}
type sessionEntry struct {
Unestablished *unestablishedSession
Session *Session
}
const maxRecentlyClosedSessions = 16
type sessionManager struct {
timeout time .Duration
mx sync .Mutex
sessions map [sessionID ]sessionEntry
recentlyClosedSessions []sessionID
}
func newSessionManager(timeout time .Duration ) *sessionManager {
return &sessionManager {
timeout : timeout ,
sessions : make (map [sessionID ]sessionEntry ),
}
}
func (m *sessionManager ) AddStream (str *quic .Stream , id sessionID ) {
m .mx .Lock ()
defer m .mx .Unlock ()
entry , ok := m .sessions [id ]
if !ok {
if slices .Contains (m .recentlyClosedSessions , id ) {
str .CancelRead (WTBufferedStreamRejectedErrorCode )
str .CancelWrite (WTBufferedStreamRejectedErrorCode )
return
}
entry = sessionEntry {Unestablished : &unestablishedSession {}}
m .sessions [id ] = entry
}
if entry .Session != nil {
entry .Session .addIncomingStream (str )
return
}
entry .Unestablished .Streams = append (entry .Unestablished .Streams , str )
m .resetTimer (id )
}
func (m *sessionManager ) AddUniStream (str *quic .ReceiveStream , id sessionID ) {
m .mx .Lock ()
defer m .mx .Unlock ()
entry , ok := m .sessions [id ]
if !ok {
if slices .Contains (m .recentlyClosedSessions , id ) {
str .CancelRead (WTBufferedStreamRejectedErrorCode )
return
}
entry = sessionEntry {Unestablished : &unestablishedSession {}}
m .sessions [id ] = entry
}
if entry .Session != nil {
entry .Session .addIncomingUniStream (str )
return
}
entry .Unestablished .UniStreams = append (entry .Unestablished .UniStreams , str )
m .resetTimer (id )
}
func (m *sessionManager ) resetTimer (id sessionID ) {
entry := m .sessions [id ]
if entry .Unestablished .Timer != nil {
entry .Unestablished .Timer .Reset (m .timeout )
return
}
entry .Unestablished .Timer = time .AfterFunc (m .timeout , func () { m .onTimer (id ) })
}
func (m *sessionManager ) onTimer (id sessionID ) {
m .mx .Lock ()
defer m .mx .Unlock ()
sessionEntry , ok := m .sessions [id ]
if !ok {
return
}
if sessionEntry .Session != nil {
return
}
for _ , str := range sessionEntry .Unestablished .Streams {
str .CancelRead (WTBufferedStreamRejectedErrorCode )
str .CancelWrite (WTBufferedStreamRejectedErrorCode )
}
for _ , uniStr := range sessionEntry .Unestablished .UniStreams {
uniStr .CancelRead (WTBufferedStreamRejectedErrorCode )
}
delete (m .sessions , id )
}
func (m *sessionManager ) AddSession (id sessionID , s *Session ) {
m .mx .Lock ()
defer m .mx .Unlock ()
entry , ok := m .sessions [id ]
if ok && entry .Unestablished != nil {
for _ , str := range entry .Unestablished .Streams {
s .addIncomingStream (str )
}
for _ , uniStr := range entry .Unestablished .UniStreams {
s .addIncomingUniStream (uniStr )
}
if entry .Unestablished .Timer != nil {
entry .Unestablished .Timer .Stop ()
}
entry .Unestablished = nil
}
m .sessions [id ] = sessionEntry {Session : s }
context .AfterFunc (s .Context (), func () {
m .deleteSession (id )
})
}
func (m *sessionManager ) deleteSession (id sessionID ) {
m .mx .Lock ()
defer m .mx .Unlock ()
delete (m .sessions , id )
m .recentlyClosedSessions = append (m .recentlyClosedSessions , id )
if len (m .recentlyClosedSessions ) > maxRecentlyClosedSessions {
m .recentlyClosedSessions = m .recentlyClosedSessions [1 :]
}
}
func (m *sessionManager ) Close () {
m .mx .Lock ()
defer m .mx .Unlock ()
for _ , entry := range m .sessions {
if entry .Unestablished != nil && entry .Unestablished .Timer != nil {
entry .Unestablished .Timer .Stop ()
}
}
clear (m .sessions )
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .