package http3
import (
"context"
"errors"
"fmt"
"io"
"log/slog"
"maps"
"net"
"sync"
"sync/atomic"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3/qlog"
"github.com/quic-go/quic-go/qlogwriter"
"github.com/quic-go/quic-go/quicvarint"
)
const maxQuarterStreamID = 1 <<60 - 1
const invalidStreamID = quic .StreamID (-1 )
type rawConn struct {
conn *quic .Conn
logger *slog .Logger
enableDatagrams bool
streamMx sync .Mutex
streams map [quic .StreamID ]*stateTrackingStream
rcvdControlStr atomic .Bool
rcvdQPACKEncoderStr atomic .Bool
rcvdQPACKDecoderStr atomic .Bool
controlStrHandler func (*quic .ReceiveStream , *frameParser )
onStreamsEmpty func ()
settings *Settings
receivedSettings chan struct {}
qlogger qlogwriter .Recorder
qloggerWG sync .WaitGroup
}
func newRawConn(
quicConn *quic .Conn ,
enableDatagrams bool ,
onStreamsEmpty func (),
controlStrHandler func (*quic .ReceiveStream , *frameParser ),
qlogger qlogwriter .Recorder ,
logger *slog .Logger ,
) *rawConn {
c := &rawConn {
conn : quicConn ,
logger : logger ,
enableDatagrams : enableDatagrams ,
receivedSettings : make (chan struct {}),
streams : make (map [quic .StreamID ]*stateTrackingStream ),
qlogger : qlogger ,
onStreamsEmpty : onStreamsEmpty ,
controlStrHandler : controlStrHandler ,
}
if qlogger != nil {
context .AfterFunc (quicConn .Context (), c .closeQlogger )
}
return c
}
func (c *rawConn ) OpenUniStream () (*quic .SendStream , error ) {
return c .conn .OpenUniStream ()
}
func (c *rawConn ) openControlStream (settings *settingsFrame ) (*quic .SendStream , error ) {
c .qloggerWG .Add (1 )
defer c .qloggerWG .Done ()
str , err := c .conn .OpenUniStream ()
if err != nil {
return nil , err
}
b := make ([]byte , 0 , 64 )
b = quicvarint .Append (b , streamTypeControlStream )
b = settings .Append (b )
if c .qlogger != nil {
sf := qlog .SettingsFrame {
MaxFieldSectionSize : settings .MaxFieldSectionSize ,
Other : maps .Clone (settings .Other ),
}
if settings .Datagram {
sf .Datagram = pointer (true )
}
if settings .ExtendedConnect {
sf .ExtendedConnect = pointer (true )
}
c .qlogger .RecordEvent (qlog .FrameCreated {
StreamID : str .StreamID (),
Raw : qlog .RawInfo {Length : len (b )},
Frame : qlog .Frame {Frame : sf },
})
}
if _ , err := str .Write (b ); err != nil {
return nil , err
}
return str , nil
}
func (c *rawConn ) TrackStream (str *quic .Stream ) *stateTrackingStream {
hstr := newStateTrackingStream (str , c , func (b []byte ) error { return c .sendDatagram (str .StreamID (), b ) })
c .streamMx .Lock ()
c .streams [str .StreamID ()] = hstr
c .qloggerWG .Add (1 )
c .streamMx .Unlock ()
return hstr
}
func (c *rawConn ) RemoteAddr () net .Addr {
return c .conn .RemoteAddr ()
}
func (c *rawConn ) ConnectionState () quic .ConnectionState {
return c .conn .ConnectionState ()
}
func (c *rawConn ) clearStream (id quic .StreamID ) {
c .streamMx .Lock ()
defer c .streamMx .Unlock ()
if _ , ok := c .streams [id ]; ok {
delete (c .streams , id )
c .qloggerWG .Done ()
}
if len (c .streams ) == 0 {
c .onStreamsEmpty ()
}
}
func (c *rawConn ) hasActiveStreams () bool {
c .streamMx .Lock ()
defer c .streamMx .Unlock ()
return len (c .streams ) > 0
}
func (c *rawConn ) CloseWithError (code quic .ApplicationErrorCode , msg string ) error {
return c .conn .CloseWithError (code , msg )
}
func (c *rawConn ) handleUnidirectionalStream (str *quic .ReceiveStream , isServer bool ) {
c .qloggerWG .Add (1 )
defer c .qloggerWG .Done ()
streamType , err := quicvarint .Read (quicvarint .NewReader (str ))
if err != nil {
if c .logger != nil {
c .logger .Debug ("reading stream type on stream failed" , "stream ID" , str .StreamID (), "error" , err )
}
return
}
switch streamType {
case streamTypeControlStream :
case streamTypeQPACKEncoderStream :
if isFirst := c .rcvdQPACKEncoderStr .CompareAndSwap (false , true ); !isFirst {
c .CloseWithError (quic .ApplicationErrorCode (ErrCodeStreamCreationError ), "duplicate QPACK encoder stream" )
}
return
case streamTypeQPACKDecoderStream :
if isFirst := c .rcvdQPACKDecoderStr .CompareAndSwap (false , true ); !isFirst {
c .CloseWithError (quic .ApplicationErrorCode (ErrCodeStreamCreationError ), "duplicate QPACK decoder stream" )
}
return
case streamTypePushStream :
if isServer {
c .CloseWithError (quic .ApplicationErrorCode (ErrCodeStreamCreationError ), "" )
} else {
c .CloseWithError (quic .ApplicationErrorCode (ErrCodeIDError ), "" )
}
return
default :
str .CancelRead (quic .StreamErrorCode (ErrCodeStreamCreationError ))
return
}
if isFirstControlStr := c .rcvdControlStr .CompareAndSwap (false , true ); !isFirstControlStr {
c .conn .CloseWithError (quic .ApplicationErrorCode (ErrCodeStreamCreationError ), "duplicate control stream" )
return
}
c .handleControlStream (str )
}
func (c *rawConn ) handleControlStream (str *quic .ReceiveStream ) {
fp := &frameParser {closeConn : c .conn .CloseWithError , r : str , streamID : str .StreamID ()}
f , err := fp .ParseNext (c .qlogger )
if err != nil {
var serr *quic .StreamError
if err == io .EOF || errors .As (err , &serr ) {
c .conn .CloseWithError (quic .ApplicationErrorCode (ErrCodeClosedCriticalStream ), "" )
return
}
c .conn .CloseWithError (quic .ApplicationErrorCode (ErrCodeFrameError ), "" )
return
}
sf , ok := f .(*settingsFrame )
if !ok {
c .conn .CloseWithError (quic .ApplicationErrorCode (ErrCodeMissingSettings ), "" )
return
}
c .settings = &Settings {
EnableDatagrams : sf .Datagram ,
EnableExtendedConnect : sf .ExtendedConnect ,
Other : sf .Other ,
}
close (c .receivedSettings )
if sf .Datagram {
if c .enableDatagrams && !c .ConnectionState ().SupportsDatagrams .Remote {
c .CloseWithError (quic .ApplicationErrorCode (ErrCodeSettingsError ), "missing QUIC Datagram support" )
return
}
c .qloggerWG .Add (1 )
go func () {
defer c .qloggerWG .Done ()
if err := c .receiveDatagrams (); err != nil {
if c .logger != nil {
c .logger .Debug ("receiving datagrams failed" , "error" , err )
}
}
}()
}
if c .controlStrHandler != nil {
c .controlStrHandler (str , fp )
}
}
func (c *rawConn ) sendDatagram (streamID quic .StreamID , b []byte ) error {
data := make ([]byte , 0 , len (b )+8 )
quarterStreamID := uint64 (streamID / 4 )
data = quicvarint .Append (data , uint64 (streamID /4 ))
data = append (data , b ...)
if c .qlogger != nil {
c .qlogger .RecordEvent (qlog .DatagramCreated {
QuaterStreamID : quarterStreamID ,
Raw : qlog .RawInfo {
Length : len (data ),
PayloadLength : len (b ),
},
})
}
return c .conn .SendDatagram (data )
}
func (c *rawConn ) receiveDatagrams () error {
for {
b , err := c .conn .ReceiveDatagram (context .Background ())
if err != nil {
return err
}
quarterStreamID , n , err := quicvarint .Parse (b )
if err != nil {
c .CloseWithError (quic .ApplicationErrorCode (ErrCodeDatagramError ), "" )
return fmt .Errorf ("could not read quarter stream id: %w" , err )
}
if c .qlogger != nil {
c .qlogger .RecordEvent (qlog .DatagramParsed {
QuaterStreamID : quarterStreamID ,
Raw : qlog .RawInfo {
Length : len (b ),
PayloadLength : len (b ) - n ,
},
})
}
if quarterStreamID > maxQuarterStreamID {
c .CloseWithError (quic .ApplicationErrorCode (ErrCodeDatagramError ), "" )
return fmt .Errorf ("invalid quarter stream id: %w" , err )
}
streamID := quic .StreamID (4 * quarterStreamID )
c .streamMx .Lock ()
dg , ok := c .streams [streamID ]
c .streamMx .Unlock ()
if !ok {
continue
}
dg .enqueueDatagram (b [n :])
}
}
func (c *rawConn ) ReceivedSettings () <-chan struct {} { return c .receivedSettings }
func (c *rawConn ) Settings () *Settings { return c .settings }
func (c *rawConn ) closeQlogger () {
if c .qlogger == nil {
return
}
c .qloggerWG .Wait ()
c .qlogger .Close ()
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .