package webtransport
import (
"context"
"errors"
"fmt"
"io"
"net"
"os"
"sync"
"time"
"github.com/quic-go/quic-go"
)
type quicSendStream interface {
io .WriteCloser
CancelWrite(quic .StreamErrorCode )
Context() context .Context
SetWriteDeadline(time .Time ) error
SetReliableBoundary()
}
var (
_ quicSendStream = &quic .SendStream {}
_ quicSendStream = &quic .Stream {}
)
type quicReceiveStream interface {
io .Reader
CancelRead(quic .StreamErrorCode )
SetReadDeadline(time .Time ) error
}
var (
_ quicReceiveStream = &quic .ReceiveStream {}
_ quicReceiveStream = &quic .Stream {}
)
type SendStream struct {
str quicSendStream
streamHdr []byte
streamHdrMu sync .Mutex
sendingHdrAsync bool
onClose func ()
closeOnce sync .Once
closed chan struct {}
closeErr error
deadlineMu sync .Mutex
writeDeadline time .Time
deadlineNotifyCh chan struct {}
}
func newSendStream(str quicSendStream , hdr []byte , onClose func ()) *SendStream {
return &SendStream {
str : str ,
closed : make (chan struct {}),
streamHdr : hdr ,
onClose : onClose ,
}
}
func (s *SendStream ) Write (b []byte ) (int , error ) {
n , err := s .write (b )
if err != nil && !isTimeoutError (err ) {
s .onClose ()
}
var strErr *quic .StreamError
if errors .As (err , &strErr ) && strErr .ErrorCode == WTSessionGoneErrorCode {
return n , s .handleSessionGoneError ()
}
return n , maybeConvertStreamError (err )
}
func (s *SendStream ) handleSessionGoneError () error {
s .deadlineMu .Lock ()
if s .deadlineNotifyCh == nil {
s .deadlineNotifyCh = make (chan struct {}, 1 )
}
s .deadlineMu .Unlock ()
for {
s .deadlineMu .Lock ()
deadline := s .writeDeadline
s .deadlineMu .Unlock ()
var timerCh <-chan time .Time
if !deadline .IsZero () {
if d := time .Until (deadline ); d > 0 {
timerCh = time .After (d )
} else {
return os .ErrDeadlineExceeded
}
}
select {
case <- s .closed :
return s .closeErr
case <- timerCh :
return os .ErrDeadlineExceeded
case <- s .deadlineNotifyCh :
}
}
}
func (s *SendStream ) write (b []byte ) (int , error ) {
s .streamHdrMu .Lock ()
err := s .maybeSendStreamHeader ()
s .streamHdrMu .Unlock ()
if err != nil {
return 0 , err
}
return s .str .Write (b )
}
func (s *SendStream ) maybeSendStreamHeader () error {
if len (s .streamHdr ) == 0 {
return nil
}
n , err := s .str .Write (s .streamHdr )
if n > 0 {
s .streamHdr = s .streamHdr [n :]
}
s .str .SetReliableBoundary ()
if err != nil {
return err
}
s .streamHdr = nil
return nil
}
func (s *SendStream ) CancelWrite (e StreamErrorCode ) {
s .streamHdrMu .Lock ()
if s .sendingHdrAsync {
s .streamHdrMu .Unlock ()
return
}
if len (s .streamHdr ) > 0 {
s .sendingHdrAsync = true
streamHdr := s .streamHdr
s .streamHdr = nil
s .streamHdrMu .Unlock ()
go func () {
s .SetWriteDeadline (time .Time {})
_, _ = s .str .Write (streamHdr )
s .str .SetReliableBoundary ()
s .str .CancelWrite (webtransportCodeToHTTPCode (e ))
s .onClose ()
}()
return
}
s .streamHdrMu .Unlock ()
s .str .CancelWrite (webtransportCodeToHTTPCode (e ))
s .onClose ()
}
func (s *SendStream ) closeWithSession (err error ) {
s .closeOnce .Do (func () {
s .closeErr = err
s .str .CancelWrite (WTSessionGoneErrorCode )
close (s .closed )
})
}
func (s *SendStream ) Close () error {
s .streamHdrMu .Lock ()
if s .sendingHdrAsync {
s .streamHdrMu .Unlock ()
return nil
}
if len (s .streamHdr ) > 0 {
s .sendingHdrAsync = true
streamHdr := s .streamHdr
s .streamHdr = nil
s .streamHdrMu .Unlock ()
go func () {
s .SetWriteDeadline (time .Time {})
_, _ = s .str .Write (streamHdr )
s .str .SetReliableBoundary ()
_ = s .str .Close ()
s .onClose ()
}()
return nil
}
s .streamHdrMu .Unlock ()
s .onClose ()
return maybeConvertStreamError (s .str .Close ())
}
func (s *SendStream ) Context () context .Context {
return s .str .Context ()
}
func (s *SendStream ) SetWriteDeadline (t time .Time ) error {
s .deadlineMu .Lock ()
s .writeDeadline = t
if s .deadlineNotifyCh != nil {
select {
case s .deadlineNotifyCh <- struct {}{}:
default :
}
}
s .deadlineMu .Unlock ()
return maybeConvertStreamError (s .str .SetWriteDeadline (t ))
}
type ReceiveStream struct {
str quicReceiveStream
onClose func ()
closeOnce sync .Once
closed chan struct {}
closeErr error
deadlineMu sync .Mutex
readDeadline time .Time
deadlineNotifyCh chan struct {}
}
func newReceiveStream(str quicReceiveStream , onClose func ()) *ReceiveStream {
return &ReceiveStream {
str : str ,
closed : make (chan struct {}),
onClose : onClose ,
}
}
func (s *ReceiveStream ) Read (b []byte ) (int , error ) {
n , err := s .str .Read (b )
if err != nil && !isTimeoutError (err ) {
s .onClose ()
}
var strErr *quic .StreamError
if errors .As (err , &strErr ) && strErr .ErrorCode == WTSessionGoneErrorCode {
return n , s .handleSessionGoneError ()
}
return n , maybeConvertStreamError (err )
}
func (s *ReceiveStream ) handleSessionGoneError () error {
s .deadlineMu .Lock ()
if s .deadlineNotifyCh == nil {
s .deadlineNotifyCh = make (chan struct {}, 1 )
}
s .deadlineMu .Unlock ()
for {
s .deadlineMu .Lock ()
deadline := s .readDeadline
s .deadlineMu .Unlock ()
var timerCh <-chan time .Time
if !deadline .IsZero () {
if d := time .Until (deadline ); d > 0 {
timerCh = time .After (d )
} else {
return os .ErrDeadlineExceeded
}
}
select {
case <- s .closed :
return s .closeErr
case <- timerCh :
return os .ErrDeadlineExceeded
case <- s .deadlineNotifyCh :
}
}
}
func (s *ReceiveStream ) CancelRead (e StreamErrorCode ) {
s .str .CancelRead (webtransportCodeToHTTPCode (e ))
s .onClose ()
}
func (s *ReceiveStream ) closeWithSession (err error ) {
s .closeOnce .Do (func () {
s .closeErr = err
s .str .CancelRead (WTSessionGoneErrorCode )
close (s .closed )
})
}
func (s *ReceiveStream ) SetReadDeadline (t time .Time ) error {
s .deadlineMu .Lock ()
s .readDeadline = t
if s .deadlineNotifyCh != nil {
select {
case s .deadlineNotifyCh <- struct {}{}:
default :
}
}
s .deadlineMu .Unlock ()
return maybeConvertStreamError (s .str .SetReadDeadline (t ))
}
type Stream struct {
sendStr *SendStream
recvStr *ReceiveStream
mx sync .Mutex
sendSideClosed, recvSideClosed bool
onClose func ()
}
func newStream(str *quic .Stream , hdr []byte , onClose func ()) *Stream {
s := &Stream {onClose : onClose }
s .sendStr = newSendStream (str , hdr , func () { s .registerClose (true ) })
s .recvStr = newReceiveStream (str , func () { s .registerClose (false ) })
return s
}
func (s *Stream ) Write (b []byte ) (int , error ) {
return s .sendStr .Write (b )
}
func (s *Stream ) Read (b []byte ) (int , error ) {
return s .recvStr .Read (b )
}
func (s *Stream ) CancelWrite (e StreamErrorCode ) {
s .sendStr .CancelWrite (e )
}
func (s *Stream ) CancelRead (e StreamErrorCode ) {
s .recvStr .CancelRead (e )
}
func (s *Stream ) Close () error {
return s .sendStr .Close ()
}
func (s *Stream ) registerClose (isSendSide bool ) {
s .mx .Lock ()
if isSendSide {
s .sendSideClosed = true
} else {
s .recvSideClosed = true
}
isClosed := s .sendSideClosed && s .recvSideClosed
s .mx .Unlock ()
if isClosed {
s .onClose ()
}
}
func (s *Stream ) closeWithSession (err error ) {
s .sendStr .closeWithSession (err )
s .recvStr .closeWithSession (err )
}
func (s *Stream ) Context () context .Context {
return s .sendStr .Context ()
}
func (s *Stream ) SetWriteDeadline (t time .Time ) error {
return s .sendStr .SetWriteDeadline (t )
}
func (s *Stream ) SetReadDeadline (t time .Time ) error {
return s .recvStr .SetReadDeadline (t )
}
func (s *Stream ) SetDeadline (t time .Time ) error {
err1 := s .SetWriteDeadline (t )
err2 := s .SetReadDeadline (t )
return errors .Join (err1 , err2 )
}
func maybeConvertStreamError(err error ) error {
if err == nil {
return nil
}
var streamErr *quic .StreamError
if errors .As (err , &streamErr ) {
errorCode , cerr := httpCodeToWebtransportCode (streamErr .ErrorCode )
if cerr != nil {
return fmt .Errorf ("stream reset, but failed to convert stream error %d: %w" , streamErr .ErrorCode , cerr )
}
return &StreamError {
ErrorCode : errorCode ,
Remote : streamErr .Remote ,
}
}
return err
}
func isTimeoutError(err error ) bool {
nerr , ok := err .(net .Error )
if !ok {
return false
}
return nerr .Timeout ()
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .