package quic

import (
	
	
	
	

	
	
	
	
	
	
)

// A ReceiveStream is a unidirectional Receive Stream.
type ReceiveStream struct {
	mutex sync.Mutex

	streamID protocol.StreamID

	sender streamSender

	frameQueue  *frameSorter
	finalOffset protocol.ByteCount

	currentFrame       []byte
	currentFrameDone   func()
	readPosInFrame     int
	currentFrameIsLast bool // is the currentFrame the last frame on this stream

	queuedStopSending   bool
	queuedMaxStreamData bool

	// Set once we read the io.EOF or the cancellation error.
	// Note that for local cancellations, this doesn't necessarily mean that we know the final offset yet.
	errorRead           bool
	completed           bool // set once we've called streamSender.onStreamCompleted
	cancelledRemotely   bool
	cancelledLocally    bool
	cancelErr           *StreamError
	closeForShutdownErr error

	readPos      protocol.ByteCount
	reliableSize protocol.ByteCount

	readChan chan struct{}
	readOnce chan struct{} // cap: 1, to protect against concurrent use of Read
	deadline monotime.Time

	flowController flowcontrol.StreamFlowController
}

var (
	_ streamControlFrameGetter  = &ReceiveStream{}
	_ receiveStreamFrameHandler = &ReceiveStream{}
)

func newReceiveStream(
	 protocol.StreamID,
	 streamSender,
	 flowcontrol.StreamFlowController,
) *ReceiveStream {
	return &ReceiveStream{
		streamID:       ,
		sender:         ,
		flowController: ,
		frameQueue:     newFrameSorter(),
		readChan:       make(chan struct{}, 1),
		readOnce:       make(chan struct{}, 1),
		finalOffset:    protocol.MaxByteCount,
	}
}

// StreamID returns the stream ID.
func ( *ReceiveStream) () protocol.StreamID {
	return .streamID
}

// Read reads data from the stream.
// Read can be made to time out using [ReceiveStream.SetReadDeadline].
// If the stream was canceled, the error is a [StreamError].
func ( *ReceiveStream) ( []byte) (int, error) {
	// Concurrent use of Read is not permitted (and doesn't make any sense),
	// but sometimes people do it anyway.
	// Make sure that we only execute one call at any given time to avoid hard to debug failures.
	.readOnce <- struct{}{}
	defer func() { <-.readOnce }()

	.mutex.Lock()
	, , ,  := .readImpl()
	 := .isNewlyCompleted()
	.mutex.Unlock()

	if  {
		.sender.onStreamCompleted(.streamID)
	}
	if  {
		.sender.onHasStreamControlFrame(.streamID, )
	}
	if  {
		.sender.onHasConnectionData()
	}
	return , 
}

func ( *ReceiveStream) () bool {
	if .completed {
		return false
	}
	// We need to know the final offset (either via FIN or RESET_STREAM) for flow control accounting.
	if .finalOffset == protocol.MaxByteCount {
		return false
	}
	// We're done with the stream if it was cancelled locally...
	if .cancelledLocally {
		.completed = true
		return true
	}
	// ... or if the error (either io.EOF or the reset error) was read
	if .errorRead {
		.completed = true
		return true
	}
	return false
}

func ( *ReceiveStream) ( []byte) ( bool,  bool,  int,  error) {
	if .currentFrameIsLast && .currentFrame == nil {
		.errorRead = true
		return false, false, 0, io.EOF
	}
	if .cancelledLocally || .isRemoteCancellationEffective() {
		.errorRead = true
		return false, false, 0, .cancelErr
	}
	if .closeForShutdownErr != nil {
		return false, false, 0, .closeForShutdownErr
	}

	var  int
	var  *time.Timer
	for  < len() {
		if .currentFrame == nil || .readPosInFrame >= len(.currentFrame) {
			.dequeueNextFrame()
		}
		if .currentFrame == nil &&  > 0 {
			return , , , .closeForShutdownErr
		}

		for {
			// Stop waiting on errors
			if .closeForShutdownErr != nil {
				return , , , .closeForShutdownErr
			}
			if .cancelledLocally || .isRemoteCancellationEffective() {
				.errorRead = true
				return , , , .cancelErr
			}

			 := .deadline
			if !.IsZero() && !monotime.Now().Before() {
				return , , , errDeadline
			}

			if .currentFrame != nil || .currentFrameIsLast {
				break
			}

			.mutex.Unlock()
			if .IsZero() {
				<-.readChan
			} else {
				if  == nil {
					 = time.NewTimer(monotime.Until())
					defer .Stop()
				} else {
					.Reset(monotime.Until())
				}
				select {
				case <-.readChan:
				case <-.C:
				}
			}
			.mutex.Lock()
			.dequeueNextFrame()
		}

		if  > len() {
			return , , , fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", , len())
		}
		if .readPosInFrame > len(.currentFrame) {
			return , , , fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", .readPosInFrame, len(.currentFrame))
		}
		 := copy([:], .currentFrame[.readPosInFrame:])

		// when a RESET_STREAM was received, the flow controller was already
		// informed about the final offset for this stream
		if !.isRemoteCancellationEffective() {
			,  := .flowController.AddBytesRead(protocol.ByteCount())
			if  {
				.queuedMaxStreamData = true
				 = true
			}
			if  {
				 = true
			}
		}

		.readPosInFrame += 
		.readPos += protocol.ByteCount()
		 += 

		if .isRemoteCancellationEffective() {
			.flowController.Abandon()
		}

		if .readPosInFrame >= len(.currentFrame) && .currentFrameIsLast {
			.currentFrame = nil
			if .currentFrameDone != nil {
				.currentFrameDone()
			}
			.errorRead = true
			return , , , io.EOF
		}
	}
	if .isRemoteCancellationEffective() {
		.errorRead = true
		return , , , .cancelErr
	}
	return , , , nil
}

// isRemoteCancellationEffective returns whether the stream was cancelled remotely
// and all reliable data has been read.
func ( *ReceiveStream) () bool {
	return .cancelledRemotely && .readPos >= .reliableSize
}

// Peek fills b with stream data, without consuming the stream data.
// It blocks until len(b) bytes are available, or an error occurs.
// It respects the stream deadline set by SetReadDeadline.
// If the stream ends before len(b) bytes are available,
// it returns the number of bytes peeked along with io.EOF.
func ( *ReceiveStream) ( []byte) (int, error) {
	if len() == 0 {
		return 0, nil
	}

	// prevent concurrent use with Read
	.readOnce <- struct{}{}
	defer func() { <-.readOnce }()

	return .peekImpl()
}

func ( *ReceiveStream) ( []byte) (int, error) {
	.mutex.Lock()
	defer .mutex.Unlock()

	var  *time.Timer

	for {
		if .currentFrameIsLast && .currentFrame == nil {
			return 0, io.EOF
		}
		if .cancelledLocally || .isRemoteCancellationEffective() {
			return 0, .cancelErr
		}
		if .closeForShutdownErr != nil {
			return 0, .closeForShutdownErr
		}

		 := .deadline
		if !.IsZero() && !monotime.Now().Before() {
			return 0, errDeadline
		}

		if .currentFrame == nil || .readPosInFrame >= len(.currentFrame) {
			.dequeueNextFrame()
		}

		if .currentFrame != nil && .readPosInFrame < len(.currentFrame) {
			 := len(.currentFrame) - .readPosInFrame

			if  >= len() {
				copy(, .currentFrame[.readPosInFrame:])
				return len(), nil
			}

			 := .readPos + protocol.ByteCount()
			// First peek, then copy.
			// This avoids copying data if there's not enough data in the queue.
			if  := .frameQueue.Peek(, [:]);  == nil {
				copy([:], .currentFrame[.readPosInFrame:])
				return len(), nil
			}

			if .currentFrameIsLast {
				copy([:], .currentFrame[.readPosInFrame:])
				return , io.EOF
			}

			// If the stream was remotely cancelled and the request extends beyond the reliable size,
			// return the data available with the cancel error (once it's all received).
			if .cancelledRemotely && .readPos+protocol.ByteCount(len()) > .reliableSize {
				 := int(.reliableSize - .readPos)
				 :=  - 
				// only return once all available data is contiguous
				if  <= 0 || .frameQueue.Peek(, [:]) == nil {
					copy([:], .currentFrame[.readPosInFrame:])
					return , .cancelErr
				}
			}

			// If the request extends beyond the stream's final offset,
			// return the data available with EOF (once it's all received).
			if .readPos+protocol.ByteCount(len()) > .finalOffset {
				 := int(.finalOffset - .readPos)
				 :=  - 
				// only return once all available data is contiguous
				if  <= 0 || .frameQueue.Peek(, [:]) == nil {
					copy([:], .currentFrame[.readPosInFrame:])
					return , io.EOF
				}
			}
		}

		if .currentFrameIsLast || .readPos >= .finalOffset {
			return 0, io.EOF
		}

		.mutex.Unlock()
		if .IsZero() {
			<-.readChan
		} else {
			if  == nil {
				 = time.NewTimer(monotime.Until())
				defer .Stop()
			} else {
				.Reset(monotime.Until())
			}
			select {
			case <-.readChan:
			case <-.C:
			}
		}
		.mutex.Lock()
		if .currentFrame == nil || .readPosInFrame >= len(.currentFrame) {
			.dequeueNextFrame()
		}
	}
}

func ( *ReceiveStream) () {
	var  protocol.ByteCount
	// We're done with the last frame. Release the buffer.
	if .currentFrameDone != nil {
		.currentFrameDone()
	}
	, .currentFrame, .currentFrameDone = .frameQueue.Pop()
	.currentFrameIsLast = +protocol.ByteCount(len(.currentFrame)) >= .finalOffset && !.cancelledRemotely
	.readPosInFrame = 0
}

// CancelRead aborts receiving on this stream.
// It instructs the peer to stop transmitting stream data.
// Read will unblock immediately, and future Read calls will fail.
// When called multiple times or after reading the io.EOF it is a no-op.
func ( *ReceiveStream) ( StreamErrorCode) {
	.mutex.Lock()
	 := .cancelReadImpl()
	 := .isNewlyCompleted()
	.mutex.Unlock()

	if  {
		.sender.onHasStreamControlFrame(.streamID, )
	}
	if  {
		.flowController.Abandon()
		.sender.onStreamCompleted(.streamID)
	}
}

func ( *ReceiveStream) ( qerr.StreamErrorCode) ( bool) {
	if .cancelledLocally { // duplicate call to CancelRead
		return false
	}
	if .closeForShutdownErr != nil {
		return false
	}
	.cancelledLocally = true
	if .errorRead || .cancelledRemotely {
		return false
	}
	.queuedStopSending = true
	.cancelErr = &StreamError{StreamID: .streamID, ErrorCode: , Remote: false}
	.signalRead()
	return true
}

func ( *ReceiveStream) ( *wire.StreamFrame,  monotime.Time) error {
	.mutex.Lock()
	 := .handleStreamFrameImpl(, )
	 := .isNewlyCompleted()
	.mutex.Unlock()

	if  {
		.flowController.Abandon()
		.sender.onStreamCompleted(.streamID)
	}
	return 
}

func ( *ReceiveStream) ( *wire.StreamFrame,  monotime.Time) error {
	 := .Offset + .DataLen()
	if  := .flowController.UpdateHighestReceived(, .Fin, );  != nil {
		return 
	}
	if .Fin {
		.finalOffset = 
	}
	if .cancelledLocally {
		return nil
	}
	if  := .frameQueue.Push(.Data, .Offset, .PutBack);  != nil {
		return 
	}
	.signalRead()
	return nil
}

func ( *ReceiveStream) ( *wire.ResetStreamFrame,  monotime.Time) error {
	.mutex.Lock()
	 := .handleResetStreamFrameImpl(, )
	 := .isNewlyCompleted()
	.mutex.Unlock()

	if  {
		.sender.onStreamCompleted(.streamID)
	}
	return 
}

func ( *ReceiveStream) ( *wire.ResetStreamFrame,  monotime.Time) error {
	if .closeForShutdownErr != nil {
		return nil
	}
	if  := .flowController.UpdateHighestReceived(.FinalSize, true, );  != nil {
		return 
	}
	.finalOffset = .FinalSize

	// senders are allowed to reduce the reliable size, but frames might have been reordered
	if (!.cancelledRemotely && .reliableSize == 0) || .ReliableSize < .reliableSize {
		.reliableSize = .ReliableSize
	}
	if .readPos >= .reliableSize {
		// calling Abandon multiple times is a no-op
		.flowController.Abandon()
	}
	// ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
	if .cancelledRemotely {
		return nil
	}

	// don't save the error if the RESET_STREAM frames was received after CancelRead was called
	if .cancelledLocally {
		return nil
	}
	.cancelledRemotely = true
	.cancelErr = &StreamError{StreamID: .streamID, ErrorCode: .ErrorCode, Remote: true}
	.signalRead()
	return nil
}

func ( *ReceiveStream) ( monotime.Time) ( ackhandler.Frame, ,  bool) {
	.mutex.Lock()
	defer .mutex.Unlock()

	if !.queuedStopSending && !.queuedMaxStreamData {
		return ackhandler.Frame{}, false, false
	}
	if .queuedStopSending {
		.queuedStopSending = false
		return ackhandler.Frame{
			Frame: &wire.StopSendingFrame{StreamID: .streamID, ErrorCode: .cancelErr.ErrorCode},
		}, true, .queuedMaxStreamData
	}

	.queuedMaxStreamData = false
	return ackhandler.Frame{
		Frame: &wire.MaxStreamDataFrame{
			StreamID:          .streamID,
			MaximumStreamData: .flowController.GetWindowUpdate(),
		},
	}, true, false
}

// SetReadDeadline sets the deadline for future Read calls and
// any currently-blocked Read call.
// A zero value for t means Read will not time out.
func ( *ReceiveStream) ( time.Time) error {
	.mutex.Lock()
	.deadline = monotime.FromTime()
	.mutex.Unlock()
	.signalRead()
	return nil
}

// CloseForShutdown closes a stream abruptly.
// It makes Read unblock (and return the error) immediately.
// The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
func ( *ReceiveStream) ( error) {
	.mutex.Lock()
	.closeForShutdownErr = 
	.mutex.Unlock()
	.signalRead()
}

// signalRead performs a non-blocking send on the readChan
func ( *ReceiveStream) () {
	select {
	case .readChan <- struct{}{}:
	default:
	}
}