package quic
import (
"fmt"
"io"
"sync"
"time"
"github.com/quic-go/quic-go/internal/ackhandler"
"github.com/quic-go/quic-go/internal/flowcontrol"
"github.com/quic-go/quic-go/internal/monotime"
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/qerr"
"github.com/quic-go/quic-go/internal/wire"
)
type ReceiveStream struct {
mutex sync .Mutex
streamID protocol .StreamID
sender streamSender
frameQueue *frameSorter
finalOffset protocol .ByteCount
currentFrame []byte
currentFrameDone func ()
readPosInFrame int
currentFrameIsLast bool
queuedStopSending bool
queuedMaxStreamData bool
errorRead bool
completed bool
cancelledRemotely bool
cancelledLocally bool
cancelErr *StreamError
closeForShutdownErr error
readPos protocol .ByteCount
reliableSize protocol .ByteCount
readChan chan struct {}
readOnce chan struct {}
deadline monotime .Time
flowController flowcontrol .StreamFlowController
}
var (
_ streamControlFrameGetter = &ReceiveStream {}
_ receiveStreamFrameHandler = &ReceiveStream {}
)
func newReceiveStream(
streamID protocol .StreamID ,
sender streamSender ,
flowController flowcontrol .StreamFlowController ,
) *ReceiveStream {
return &ReceiveStream {
streamID : streamID ,
sender : sender ,
flowController : flowController ,
frameQueue : newFrameSorter (),
readChan : make (chan struct {}, 1 ),
readOnce : make (chan struct {}, 1 ),
finalOffset : protocol .MaxByteCount ,
}
}
func (s *ReceiveStream ) StreamID () protocol .StreamID {
return s .streamID
}
func (s *ReceiveStream ) Read (p []byte ) (int , error ) {
s .readOnce <- struct {}{}
defer func () { <-s .readOnce }()
s .mutex .Lock ()
queuedStreamWindowUpdate , queuedConnWindowUpdate , n , err := s .readImpl (p )
completed := s .isNewlyCompleted ()
s .mutex .Unlock ()
if completed {
s .sender .onStreamCompleted (s .streamID )
}
if queuedStreamWindowUpdate {
s .sender .onHasStreamControlFrame (s .streamID , s )
}
if queuedConnWindowUpdate {
s .sender .onHasConnectionData ()
}
return n , err
}
func (s *ReceiveStream ) isNewlyCompleted () bool {
if s .completed {
return false
}
if s .finalOffset == protocol .MaxByteCount {
return false
}
if s .cancelledLocally {
s .completed = true
return true
}
if s .errorRead {
s .completed = true
return true
}
return false
}
func (s *ReceiveStream ) readImpl (p []byte ) (hasStreamWindowUpdate bool , hasConnWindowUpdate bool , _ int , _ error ) {
if s .currentFrameIsLast && s .currentFrame == nil {
s .errorRead = true
return false , false , 0 , io .EOF
}
if s .cancelledLocally || s .isRemoteCancellationEffective () {
s .errorRead = true
return false , false , 0 , s .cancelErr
}
if s .closeForShutdownErr != nil {
return false , false , 0 , s .closeForShutdownErr
}
var bytesRead int
var deadlineTimer *time .Timer
for bytesRead < len (p ) {
if s .currentFrame == nil || s .readPosInFrame >= len (s .currentFrame ) {
s .dequeueNextFrame ()
}
if s .currentFrame == nil && bytesRead > 0 {
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , s .closeForShutdownErr
}
for {
if s .closeForShutdownErr != nil {
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , s .closeForShutdownErr
}
if s .cancelledLocally || s .isRemoteCancellationEffective () {
s .errorRead = true
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , s .cancelErr
}
deadline := s .deadline
if !deadline .IsZero () && !monotime .Now ().Before (deadline ) {
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , errDeadline
}
if s .currentFrame != nil || s .currentFrameIsLast {
break
}
s .mutex .Unlock ()
if deadline .IsZero () {
<-s .readChan
} else {
if deadlineTimer == nil {
deadlineTimer = time .NewTimer (monotime .Until (deadline ))
defer deadlineTimer .Stop ()
} else {
deadlineTimer .Reset (monotime .Until (deadline ))
}
select {
case <- s .readChan :
case <- deadlineTimer .C :
}
}
s .mutex .Lock ()
s .dequeueNextFrame ()
}
if bytesRead > len (p ) {
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , fmt .Errorf ("BUG: bytesRead (%d) > len(p) (%d) in stream.Read" , bytesRead , len (p ))
}
if s .readPosInFrame > len (s .currentFrame ) {
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , fmt .Errorf ("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read" , s .readPosInFrame , len (s .currentFrame ))
}
m := copy (p [bytesRead :], s .currentFrame [s .readPosInFrame :])
if !s .isRemoteCancellationEffective () {
hasStream , hasConn := s .flowController .AddBytesRead (protocol .ByteCount (m ))
if hasStream {
s .queuedMaxStreamData = true
hasStreamWindowUpdate = true
}
if hasConn {
hasConnWindowUpdate = true
}
}
s .readPosInFrame += m
s .readPos += protocol .ByteCount (m )
bytesRead += m
if s .isRemoteCancellationEffective () {
s .flowController .Abandon ()
}
if s .readPosInFrame >= len (s .currentFrame ) && s .currentFrameIsLast {
s .currentFrame = nil
if s .currentFrameDone != nil {
s .currentFrameDone ()
}
s .errorRead = true
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , io .EOF
}
}
if s .isRemoteCancellationEffective () {
s .errorRead = true
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , s .cancelErr
}
return hasStreamWindowUpdate , hasConnWindowUpdate , bytesRead , nil
}
func (s *ReceiveStream ) isRemoteCancellationEffective () bool {
return s .cancelledRemotely && s .readPos >= s .reliableSize
}
func (s *ReceiveStream ) Peek (b []byte ) (int , error ) {
if len (b ) == 0 {
return 0 , nil
}
s .readOnce <- struct {}{}
defer func () { <-s .readOnce }()
return s .peekImpl (b )
}
func (s *ReceiveStream ) peekImpl (b []byte ) (int , error ) {
s .mutex .Lock ()
defer s .mutex .Unlock ()
var deadlineTimer *time .Timer
for {
if s .currentFrameIsLast && s .currentFrame == nil {
return 0 , io .EOF
}
if s .cancelledLocally || s .isRemoteCancellationEffective () {
return 0 , s .cancelErr
}
if s .closeForShutdownErr != nil {
return 0 , s .closeForShutdownErr
}
deadline := s .deadline
if !deadline .IsZero () && !monotime .Now ().Before (deadline ) {
return 0 , errDeadline
}
if s .currentFrame == nil || s .readPosInFrame >= len (s .currentFrame ) {
s .dequeueNextFrame ()
}
if s .currentFrame != nil && s .readPosInFrame < len (s .currentFrame ) {
availableInCurrentFrame := len (s .currentFrame ) - s .readPosInFrame
if availableInCurrentFrame >= len (b ) {
copy (b , s .currentFrame [s .readPosInFrame :])
return len (b ), nil
}
offset := s .readPos + protocol .ByteCount (availableInCurrentFrame )
if err := s .frameQueue .Peek (offset , b [availableInCurrentFrame :]); err == nil {
copy (b [:availableInCurrentFrame ], s .currentFrame [s .readPosInFrame :])
return len (b ), nil
}
if s .currentFrameIsLast {
copy (b [:availableInCurrentFrame ], s .currentFrame [s .readPosInFrame :])
return availableInCurrentFrame , io .EOF
}
if s .cancelledRemotely && s .readPos +protocol .ByteCount (len (b )) > s .reliableSize {
total := int (s .reliableSize - s .readPos )
needed := total - availableInCurrentFrame
if needed <= 0 || s .frameQueue .Peek (offset , b [availableInCurrentFrame :total ]) == nil {
copy (b [:availableInCurrentFrame ], s .currentFrame [s .readPosInFrame :])
return total , s .cancelErr
}
}
if s .readPos +protocol .ByteCount (len (b )) > s .finalOffset {
total := int (s .finalOffset - s .readPos )
needed := total - availableInCurrentFrame
if needed <= 0 || s .frameQueue .Peek (offset , b [availableInCurrentFrame :total ]) == nil {
copy (b [:availableInCurrentFrame ], s .currentFrame [s .readPosInFrame :])
return total , io .EOF
}
}
}
if s .currentFrameIsLast || s .readPos >= s .finalOffset {
return 0 , io .EOF
}
s .mutex .Unlock ()
if deadline .IsZero () {
<-s .readChan
} else {
if deadlineTimer == nil {
deadlineTimer = time .NewTimer (monotime .Until (deadline ))
defer deadlineTimer .Stop ()
} else {
deadlineTimer .Reset (monotime .Until (deadline ))
}
select {
case <- s .readChan :
case <- deadlineTimer .C :
}
}
s .mutex .Lock ()
if s .currentFrame == nil || s .readPosInFrame >= len (s .currentFrame ) {
s .dequeueNextFrame ()
}
}
}
func (s *ReceiveStream ) dequeueNextFrame () {
var offset protocol .ByteCount
if s .currentFrameDone != nil {
s .currentFrameDone ()
}
offset , s .currentFrame , s .currentFrameDone = s .frameQueue .Pop ()
s .currentFrameIsLast = offset +protocol .ByteCount (len (s .currentFrame )) >= s .finalOffset && !s .cancelledRemotely
s .readPosInFrame = 0
}
func (s *ReceiveStream ) CancelRead (errorCode StreamErrorCode ) {
s .mutex .Lock ()
queuedNewControlFrame := s .cancelReadImpl (errorCode )
completed := s .isNewlyCompleted ()
s .mutex .Unlock ()
if queuedNewControlFrame {
s .sender .onHasStreamControlFrame (s .streamID , s )
}
if completed {
s .flowController .Abandon ()
s .sender .onStreamCompleted (s .streamID )
}
}
func (s *ReceiveStream ) cancelReadImpl (errorCode qerr .StreamErrorCode ) (queuedNewControlFrame bool ) {
if s .cancelledLocally {
return false
}
if s .closeForShutdownErr != nil {
return false
}
s .cancelledLocally = true
if s .errorRead || s .cancelledRemotely {
return false
}
s .queuedStopSending = true
s .cancelErr = &StreamError {StreamID : s .streamID , ErrorCode : errorCode , Remote : false }
s .signalRead ()
return true
}
func (s *ReceiveStream ) handleStreamFrame (frame *wire .StreamFrame , now monotime .Time ) error {
s .mutex .Lock ()
err := s .handleStreamFrameImpl (frame , now )
completed := s .isNewlyCompleted ()
s .mutex .Unlock ()
if completed {
s .flowController .Abandon ()
s .sender .onStreamCompleted (s .streamID )
}
return err
}
func (s *ReceiveStream ) handleStreamFrameImpl (frame *wire .StreamFrame , now monotime .Time ) error {
maxOffset := frame .Offset + frame .DataLen ()
if err := s .flowController .UpdateHighestReceived (maxOffset , frame .Fin , now ); err != nil {
return err
}
if frame .Fin {
s .finalOffset = maxOffset
}
if s .cancelledLocally {
return nil
}
if err := s .frameQueue .Push (frame .Data , frame .Offset , frame .PutBack ); err != nil {
return err
}
s .signalRead ()
return nil
}
func (s *ReceiveStream ) handleResetStreamFrame (frame *wire .ResetStreamFrame , now monotime .Time ) error {
s .mutex .Lock ()
err := s .handleResetStreamFrameImpl (frame , now )
completed := s .isNewlyCompleted ()
s .mutex .Unlock ()
if completed {
s .sender .onStreamCompleted (s .streamID )
}
return err
}
func (s *ReceiveStream ) handleResetStreamFrameImpl (frame *wire .ResetStreamFrame , now monotime .Time ) error {
if s .closeForShutdownErr != nil {
return nil
}
if err := s .flowController .UpdateHighestReceived (frame .FinalSize , true , now ); err != nil {
return err
}
s .finalOffset = frame .FinalSize
if (!s .cancelledRemotely && s .reliableSize == 0 ) || frame .ReliableSize < s .reliableSize {
s .reliableSize = frame .ReliableSize
}
if s .readPos >= s .reliableSize {
s .flowController .Abandon ()
}
if s .cancelledRemotely {
return nil
}
if s .cancelledLocally {
return nil
}
s .cancelledRemotely = true
s .cancelErr = &StreamError {StreamID : s .streamID , ErrorCode : frame .ErrorCode , Remote : true }
s .signalRead ()
return nil
}
func (s *ReceiveStream ) getControlFrame (now monotime .Time ) (_ ackhandler .Frame , ok , hasMore bool ) {
s .mutex .Lock ()
defer s .mutex .Unlock ()
if !s .queuedStopSending && !s .queuedMaxStreamData {
return ackhandler .Frame {}, false , false
}
if s .queuedStopSending {
s .queuedStopSending = false
return ackhandler .Frame {
Frame : &wire .StopSendingFrame {StreamID : s .streamID , ErrorCode : s .cancelErr .ErrorCode },
}, true , s .queuedMaxStreamData
}
s .queuedMaxStreamData = false
return ackhandler .Frame {
Frame : &wire .MaxStreamDataFrame {
StreamID : s .streamID ,
MaximumStreamData : s .flowController .GetWindowUpdate (now ),
},
}, true , false
}
func (s *ReceiveStream ) SetReadDeadline (t time .Time ) error {
s .mutex .Lock ()
s .deadline = monotime .FromTime (t )
s .mutex .Unlock ()
s .signalRead ()
return nil
}
func (s *ReceiveStream ) closeForShutdown (err error ) {
s .mutex .Lock ()
s .closeForShutdownErr = err
s .mutex .Unlock ()
s .signalRead ()
}
func (s *ReceiveStream ) signalRead () {
select {
case s .readChan <- struct {}{}:
default :
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .