package webtransport
import (
"errors"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/quic-go/quic-go"
)
const sessionCloseErrorCode quic .StreamErrorCode = 0x170d7b68
type SendStream interface {
io .Writer
io .Closer
StreamID () quic .StreamID
CancelWrite (StreamErrorCode )
SetWriteDeadline (time .Time ) error
}
type ReceiveStream interface {
io .Reader
StreamID () quic .StreamID
CancelRead (StreamErrorCode )
SetReadDeadline (time .Time ) error
}
type Stream interface {
SendStream
ReceiveStream
SetDeadline (time .Time ) error
}
type sendStream struct {
str quic .SendStream
streamHdr []byte
onClose func ()
once sync .Once
}
var _ SendStream = &sendStream {}
func newSendStream(str quic .SendStream , hdr []byte , onClose func ()) *sendStream {
return &sendStream {str : str , streamHdr : hdr , onClose : onClose }
}
func (s *sendStream ) maybeSendStreamHeader () (err error ) {
s .once .Do (func () {
if _ , e := s .str .Write (s .streamHdr ); e != nil {
err = e
return
}
s .streamHdr = nil
})
return
}
func (s *sendStream ) Write (b []byte ) (int , error ) {
if err := s .maybeSendStreamHeader (); err != nil {
return 0 , err
}
n , err := s .str .Write (b )
if err != nil && !isTimeoutError (err ) {
s .onClose ()
}
return n , maybeConvertStreamError (err )
}
func (s *sendStream ) CancelWrite (e StreamErrorCode ) {
s .str .CancelWrite (webtransportCodeToHTTPCode (e ))
s .onClose ()
}
func (s *sendStream ) closeWithSession () {
s .str .CancelWrite (sessionCloseErrorCode )
}
func (s *sendStream ) Close () error {
if err := s .maybeSendStreamHeader (); err != nil {
return err
}
s .onClose ()
return maybeConvertStreamError (s .str .Close ())
}
func (s *sendStream ) SetWriteDeadline (t time .Time ) error {
return maybeConvertStreamError (s .str .SetWriteDeadline (t ))
}
func (s *sendStream ) StreamID () quic .StreamID {
return s .str .StreamID ()
}
type receiveStream struct {
str quic .ReceiveStream
onClose func ()
}
var _ ReceiveStream = &receiveStream {}
func newReceiveStream(str quic .ReceiveStream , onClose func ()) *receiveStream {
return &receiveStream {str : str , onClose : onClose }
}
func (s *receiveStream ) Read (b []byte ) (int , error ) {
n , err := s .str .Read (b )
if err != nil && !isTimeoutError (err ) {
s .onClose ()
}
return n , maybeConvertStreamError (err )
}
func (s *receiveStream ) CancelRead (e StreamErrorCode ) {
s .str .CancelRead (webtransportCodeToHTTPCode (e ))
s .onClose ()
}
func (s *receiveStream ) closeWithSession () {
s .str .CancelRead (sessionCloseErrorCode )
}
func (s *receiveStream ) SetReadDeadline (t time .Time ) error {
return maybeConvertStreamError (s .str .SetReadDeadline (t ))
}
func (s *receiveStream ) StreamID () quic .StreamID {
return s .str .StreamID ()
}
type stream struct {
*sendStream
*receiveStream
mx sync .Mutex
sendSideClosed, recvSideClosed bool
onClose func ()
}
var _ Stream = &stream {}
func newStream(str quic .Stream , hdr []byte , onClose func ()) *stream {
s := &stream {onClose : onClose }
s .sendStream = newSendStream (str , hdr , func () { s .registerClose (true ) })
s .receiveStream = newReceiveStream (str , func () { s .registerClose (false ) })
return s
}
func (s *stream ) registerClose (isSendSide bool ) {
s .mx .Lock ()
if isSendSide {
s .sendSideClosed = true
} else {
s .recvSideClosed = true
}
isClosed := s .sendSideClosed && s .recvSideClosed
s .mx .Unlock ()
if isClosed {
s .onClose ()
}
}
func (s *stream ) closeWithSession () {
s .sendStream .closeWithSession ()
s .receiveStream .closeWithSession ()
}
func (s *stream ) SetDeadline (t time .Time ) error {
err1 := s .sendStream .SetWriteDeadline (t )
err2 := s .receiveStream .SetReadDeadline (t )
if err1 != nil {
return err1
}
return err2
}
func (s *stream ) StreamID () quic .StreamID {
return s .receiveStream .StreamID ()
}
func maybeConvertStreamError(err error ) error {
if err == nil {
return nil
}
var streamErr *quic .StreamError
if errors .As (err , &streamErr ) {
errorCode , cerr := httpCodeToWebtransportCode (streamErr .ErrorCode )
if cerr != nil {
return fmt .Errorf ("stream reset, but failed to convert stream error %d: %w" , streamErr .ErrorCode , cerr )
}
return &StreamError {
ErrorCode : errorCode ,
Remote : streamErr .Remote ,
}
}
return err
}
func isTimeoutError(err error ) bool {
nerr , ok := err .(net .Error )
if !ok {
return false
}
return nerr .Timeout ()
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .