package quic
import (
"context"
"net"
"os"
"sync"
"time"
"github.com/quic-go/quic-go/internal/ackhandler"
"github.com/quic-go/quic-go/internal/flowcontrol"
"github.com/quic-go/quic-go/internal/monotime"
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/wire"
)
// deadlineError is a stateless error value. It reports itself as both a
// temporary error and a timeout, and it unwraps to os.ErrDeadlineExceeded so
// that errors.Is(err, os.ErrDeadlineExceeded) matches it.
type deadlineError struct{}

// Error returns the fixed message for this error.
func (deadlineError) Error() string {
	return "deadline exceeded"
}

// Temporary always reports true.
func (deadlineError) Temporary() bool {
	return true
}

// Timeout always reports true.
func (deadlineError) Timeout() bool {
	return true
}

// Unwrap exposes os.ErrDeadlineExceeded as the wrapped error.
func (deadlineError) Unwrap() error {
	return os.ErrDeadlineExceeded
}

// errDeadline is the shared deadlineError instance, typed as a net.Error.
var errDeadline net.Error = new(deadlineError)
// streamSender is the set of callbacks a stream uses to notify its owner.
// NOTE(review): the exact semantics of each callback are defined by the
// implementations, which are not part of this file.
type streamSender interface {
	// onHasConnectionData takes no arguments; presumably it signals that
	// connection-level data is pending - confirm against the implementation.
	onHasConnectionData()
	// onHasStreamData is invoked with the stream ID and the send stream.
	onHasStreamData(id protocol.StreamID, str *SendStream)
	// onHasStreamControlFrame is invoked with the stream ID and the object
	// from which the control frame can be obtained.
	onHasStreamControlFrame(id protocol.StreamID, getter streamControlFrameGetter)
	// onStreamCompleted is invoked with the ID of a completed stream.
	onStreamCompleted(id protocol.StreamID)
}
// uniStreamSender wraps a streamSender. Stream-data notifications are passed
// through to the embedded sender, while the completion and control-frame
// callbacks are redirected to the functions stored in the struct.
type uniStreamSender struct {
	streamSender
	// onStreamCompletedImpl is run instead of the embedded onStreamCompleted.
	onStreamCompletedImpl func()
	// onHasStreamControlFrameImpl is run instead of the embedded
	// onHasStreamControlFrame.
	onHasStreamControlFrameImpl func(protocol.StreamID, streamControlFrameGetter)
}

// onHasStreamData forwards the notification to the embedded streamSender.
func (s *uniStreamSender) onHasStreamData(streamID protocol.StreamID, stream *SendStream) {
	s.streamSender.onHasStreamData(streamID, stream)
}

// onStreamCompleted ignores the stream ID and runs onStreamCompletedImpl.
func (s *uniStreamSender) onStreamCompleted(_ protocol.StreamID) {
	s.onStreamCompletedImpl()
}

// onHasStreamControlFrame hands both arguments to onHasStreamControlFrameImpl.
func (s *uniStreamSender) onHasStreamControlFrame(streamID protocol.StreamID, getter streamControlFrameGetter) {
	s.onHasStreamControlFrameImpl(streamID, getter)
}

// Compile-time check that *uniStreamSender satisfies streamSender.
var _ streamSender = (*uniStreamSender)(nil)
// Stream combines a receive half and a send half into one bidirectional
// stream and tracks when each half has completed.
type Stream struct {
	// receiveStr is the half that read-side calls are delegated to.
	receiveStr *ReceiveStream
	// sendStr is the half that write-side calls are delegated to.
	sendStr *SendStream

	// completedMutex is held while the completion flags below are updated
	// and checked.
	completedMutex sync.Mutex
	// sender is notified once both halves have completed.
	sender streamSender
	// receiveStreamCompleted is set when the receive half reports completion.
	receiveStreamCompleted bool
	// sendStreamCompleted is set when the send half reports completion.
	sendStreamCompleted bool
}
// Compile-time checks that *Stream satisfies the interfaces it is used as.
var (
	_ outgoingStream            = (*Stream)(nil)
	_ sendStreamFrameHandler    = (*Stream)(nil)
	_ receiveStreamFrameHandler = (*Stream)(nil)
)
// newStream builds a Stream made of a send half and a receive half that share
// the stream ID and the flow controller.
//
// Each half gets its own uniStreamSender wrapping sender, so that:
//   - a completion report from one half only sets that half's flag; sender is
//     told about completion once both flags are set (see checkIfCompleted);
//   - a control-frame notification from either half is reported to sender
//     with the combined Stream as the frame getter.
func newStream(
	ctx context.Context,
	streamID protocol.StreamID,
	sender streamSender,
	flowController flowcontrol.StreamFlowController,
	supportsResetStreamAt bool,
) *Stream {
	s := &Stream{sender: sender}

	// The arguments supplied by the half are ignored on purpose: the
	// notification always carries this stream's ID and the Stream itself.
	notifyControlFrame := func(protocol.StreamID, streamControlFrameGetter) {
		sender.onHasStreamControlFrame(streamID, s)
	}
	// completionCallback returns a callback that sets *done and re-evaluates
	// overall completion, all while holding completedMutex.
	completionCallback := func(done *bool) func() {
		return func() {
			s.completedMutex.Lock()
			defer s.completedMutex.Unlock()
			*done = true
			s.checkIfCompleted()
		}
	}

	sendSide := &uniStreamSender{
		streamSender:                sender,
		onStreamCompletedImpl:       completionCallback(&s.sendStreamCompleted),
		onHasStreamControlFrameImpl: notifyControlFrame,
	}
	s.sendStr = newSendStream(ctx, streamID, sendSide, flowController, supportsResetStreamAt)

	receiveSide := &uniStreamSender{
		streamSender:                sender,
		onStreamCompletedImpl:       completionCallback(&s.receiveStreamCompleted),
		onHasStreamControlFrameImpl: notifyControlFrame,
	}
	s.receiveStr = newReceiveStream(streamID, receiveSide, flowController)

	return s
}
// StreamID returns the stream ID as reported by the send half.
func (s *Stream) StreamID() protocol.StreamID {
	id := s.sendStr.StreamID()
	return id
}
// Read delegates to the receive half of the stream and returns its result
// unchanged.
func (s *Stream) Read(p []byte) (int, error) {
	n, err := s.receiveStr.Read(p)
	return n, err
}
// Peek delegates to the receive half of the stream and returns its result
// unchanged.
func (s *Stream) Peek(b []byte) (int, error) {
	n, err := s.receiveStr.Peek(b)
	return n, err
}
// Write delegates to the send half of the stream and returns its result
// unchanged.
func (s *Stream) Write(p []byte) (int, error) {
	n, err := s.sendStr.Write(p)
	return n, err
}
// SetReliableBoundary forwards the call to the send half of the stream.
func (s *Stream) SetReliableBoundary() {
	s.sendStr.SetReliableBoundary()
}
// CancelWrite forwards errorCode to the send half of the stream.
func (s *Stream) CancelWrite(errorCode StreamErrorCode) {
	s.sendStr.CancelWrite(errorCode)
}
// CancelRead forwards errorCode to the receive half of the stream.
func (s *Stream) CancelRead(errorCode StreamErrorCode) {
	s.receiveStr.CancelRead(errorCode)
}
// Context returns the context of the send half of the stream.
func (s *Stream) Context() context.Context {
	ctx := s.sendStr.Context()
	return ctx
}
// Close closes the send half of the stream and returns its error. The
// receive half is not touched by this call.
func (s *Stream) Close() error {
	err := s.sendStr.Close()
	return err
}
// handleResetStreamFrame passes the frame and its receive time to the
// receive half of the stream and returns the resulting error.
func (s *Stream) handleResetStreamFrame(frame *wire.ResetStreamFrame, rcvTime monotime.Time) error {
	err := s.receiveStr.handleResetStreamFrame(frame, rcvTime)
	return err
}
// handleStreamFrame passes the frame and its receive time to the receive
// half of the stream and returns the resulting error.
func (s *Stream) handleStreamFrame(frame *wire.StreamFrame, rcvTime monotime.Time) error {
	err := s.receiveStr.handleStreamFrame(frame, rcvTime)
	return err
}
// handleStopSendingFrame passes the frame to the send half of the stream.
func (s *Stream) handleStopSendingFrame(frame *wire.StopSendingFrame) {
	s.sendStr.handleStopSendingFrame(frame)
}
// updateSendWindow passes the new limit to the send half of the stream.
func (s *Stream) updateSendWindow(limit protocol.ByteCount) {
	s.sendStr.updateSendWindow(limit)
}
// enableResetStreamAt forwards the call to the send half of the stream.
func (s *Stream) enableResetStreamAt() {
	s.sendStr.enableResetStreamAt()
}
// popStreamFrame asks the send half of the stream for a frame, passing
// maxBytes and v through, and returns all three results unchanged.
func (s *Stream) popStreamFrame(maxBytes protocol.ByteCount, v protocol.Version) (_ ackhandler.StreamFrame, _ *wire.StreamDataBlockedFrame, hasMore bool) {
	frame, blocked, more := s.sendStr.popStreamFrame(maxBytes, v)
	return frame, blocked, more
}
// getControlFrame returns the next control frame of the stream. The send
// half is asked first and the receive half is consulted only when the send
// half has nothing to offer.
//
// When the send half yields a frame, hasMore is always reported as true and
// the send half's own hasMore value is discarded, because the receive half
// has not been queried at that point.
func (s *Stream) getControlFrame(now monotime.Time) (_ ackhandler.Frame, ok, hasMore bool) {
	sendFrame, found, _ := s.sendStr.getControlFrame(now)
	if !found {
		return s.receiveStr.getControlFrame(now)
	}
	return sendFrame, true, true
}
// SetReadDeadline sets the deadline on the receive half of the stream and
// returns its error.
func (s *Stream) SetReadDeadline(t time.Time) error {
	err := s.receiveStr.SetReadDeadline(t)
	return err
}
// SetWriteDeadline sets the deadline on the send half of the stream and
// returns its error.
func (s *Stream) SetWriteDeadline(t time.Time) error {
	err := s.sendStr.SetWriteDeadline(t)
	return err
}
// SetDeadline applies t as both the read and the write deadline. It always
// returns nil.
func (s *Stream) SetDeadline(t time.Time) error {
	// The results of the two setters are discarded on purpose.
	// NOTE(review): presumably neither setter can fail - confirm.
	_ = s.SetReadDeadline(t)
	_ = s.SetWriteDeadline(t)
	return nil
}
// closeForShutdown passes err to both halves of the stream, the send half
// first and then the receive half.
func (s *Stream) closeForShutdown(err error) {
	s.sendStr.closeForShutdown(err)
	s.receiveStr.closeForShutdown(err)
}
// checkIfCompleted notifies the sender that the stream is completed, but only
// once both the send half and the receive half have been marked as completed.
// The completion callbacks installed by newStream call it while holding
// completedMutex.
func (s *Stream) checkIfCompleted() {
	if !s.sendStreamCompleted || !s.receiveStreamCompleted {
		return
	}
	s.sender.onStreamCompleted(s.StreamID())
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.