package yamux

import (
	
	
	
	
	
)

// streamState is the overall lifecycle state of a stream, covering the
// SYN/ACK handshake and final teardown.
type streamState int

const (
	// streamInit: nothing has been sent yet; sendFlags will attach flagSYN
	// to the next outgoing frame and move to streamSYNSent.
	streamInit streamState = iota
	// streamSYNSent: our SYN went out; an incoming ACK moves the stream to
	// streamEstablished.
	streamSYNSent
	// streamSYNReceived: sendFlags will attach flagACK to the next outgoing
	// frame and move to streamEstablished.
	streamSYNReceived
	// streamEstablished: the handshake is complete.
	streamEstablished
	// streamFinished: both directions are closed or the stream was reset.
	streamFinished
)

// halfStreamState is the state of one direction (read or write) of a stream.
type halfStreamState int

const (
	// halfOpen: the direction is still usable.
	halfOpen halfStreamState = iota
	// halfClosed: the direction was closed cleanly. Reads return io.EOF once
	// the receive buffer is empty; writes return ErrStreamClosed.
	halfClosed
	// halfReset: the direction was aborted. Reads and writes return the
	// stored readErr / writeErr.
	halfReset
)

// Stream is used to represent a logical stream
// within a session.
type Stream struct {
	// sendWindow is how many bytes we may still send to the peer. It is read
	// and modified with sync/atomic operations rather than under stateLock.
	sendWindow uint32

	// memorySpan is asked to reserve memory before the receive window is
	// enlarged.
	memorySpan MemoryManager

	id      uint32
	session *Session

	// recvWindow is the size the receive buffer is grown to on window updates.
	recvWindow uint32
	// epochStart is the time of the last window update we sent (initially the
	// time the stream was created).
	epochStart time.Time

	// state is the overall lifecycle state; writeState and readState track
	// each direction separately. writeErr and readErr are the errors returned
	// once the corresponding direction is in halfReset. stateLock is held
	// wherever these fields are accessed.
	state                 streamState
	writeState, readState halfStreamState
	writeErr, readErr     error
	stateLock             sync.Mutex

	// recvBuf holds data received from the peer that has not been read yet.
	recvBuf segmentedBuffer

	// recvNotifyCh wakes a blocked Read; sendNotifyCh wakes a blocked write.
	// Both are signalled through asyncNotify.
	recvNotifyCh chan struct{}
	sendNotifyCh chan struct{}

	// The wait() channel of each deadline aborts a blocked read or write
	// with ErrTimeout.
	readDeadline, writeDeadline pipeDeadline
}

// newStream is used to construct a new stream within a given session for an ID.
// It assumes that a memory allocation has been obtained for the initialWindow.
//
// The send window starts at initialStreamWindow, the receive window target is
// taken from the session's config.InitialStreamWindowSize, and both notify
// channels are created with a buffer of one so a notification can be left
// pending without blocking the notifier.
func newStream( *Session,  uint32,  streamState,  uint32,  MemoryManager) *Stream {
	 := &Stream{
		id:            ,
		session:       ,
		state:         ,
		sendWindow:    initialStreamWindow,
		readDeadline:  makePipeDeadline(),
		writeDeadline: makePipeDeadline(),
		memorySpan:    ,
		// Initialize the recvBuf with initialStreamWindow, not config.InitialStreamWindowSize.
		// The peer isn't allowed to send more data than initialStreamWindow until we've sent
		// the first window update (which will grant it up to config.InitialStreamWindowSize).
		recvBuf:      newSegmentedBuffer(),
		recvWindow:   .config.InitialStreamWindowSize,
		// epochStart begins at creation time; sendWindowUpdate resets it on
		// every update it sends.
		epochStart:   time.Now(),
		recvNotifyCh: make(chan struct{}, 1),
		sendNotifyCh: make(chan struct{}, 1),
	}
	return 
}

// Session returns the session this stream belongs to.
func ( *Stream) () *Session {
	return .session
}

// StreamID returns the ID of this stream.
func ( *Stream) () uint32 {
	return .id
}

// Read is used to read from the stream.
//
// It blocks until buffered data is available, the read side changes state, or
// the read deadline fires. It returns io.EOF when the read side is closed and
// the buffer is empty, the stored readErr when the read side was reset, and
// ErrTimeout when the deadline fires while waiting. After a successful read
// it may send a window update; the error returned alongside the byte count
// is the result of that update.
func ( *Stream) ( []byte) ( int,  error) {
:
	// Snapshot the read-side state under the lock, then act on the copy.
	.stateLock.Lock()
	 := .readState
	 := .readErr
	.stateLock.Unlock()

	switch  {
	case halfOpen:
		// Open -> read
	case halfClosed:
		 := .recvBuf.Len() == 0
		if  {
			return 0, io.EOF
		}
		// Closed, but we have data pending -> read.
	case halfReset:
		return 0, 
	default:
		panic("unknown state")
	}

	// If there is no data available, block
	if .recvBuf.Len() == 0 {
		select {
		case <-.recvNotifyCh:
			// Woken up: re-check state and buffer from the top.
			goto 
		case <-.readDeadline.wait():
			return 0, ErrTimeout
		}
	}

	// Read any bytes. The error from recvBuf.Read is deliberately discarded.
	, _ = .recvBuf.Read()

	// Send a window update potentially
	 = .sendWindowUpdate(.readDeadline.wait())
	return , 
}

// Write is used to write to the stream.
//
// It calls write repeatedly, since a single call may send only part of the
// data, and accumulates the byte counts. On the first error it returns the
// count accumulated so far together with that error.
func ( *Stream) ( []byte) (int, error) {
	var  int
	for  < len() {
		,  := .write([:])
		 += 
		if  != nil {
			return , 
		}
	}
	return , nil
}

// write is used to write to the stream, may return on
// a short write.
//
// It blocks while the send window is zero, until the window is reopened, the
// write side changes state, or the write deadline fires. It returns
// ErrStreamClosed when the write side is closed, the stored writeErr when it
// was reset, and ErrTimeout when the deadline fires while waiting.
func ( *Stream) ( []byte) ( int,  error) {
	var  uint16
	var  uint32
	var  header

:
	// Snapshot the write-side state under the lock, then act on the copy.
	.stateLock.Lock()
	 := .writeState
	 := .writeErr
	.stateLock.Unlock()

	switch  {
	case halfOpen:
		// Open for writing -> write
	case halfClosed:
		return 0, ErrStreamClosed
	case halfReset:
		return 0, 
	default:
		panic("unknown state")
	}

	// If there is no send window available, block
	 := atomic.LoadUint32(&.sendWindow)
	if  == 0 {
		select {
		case <-.sendNotifyCh:
			// Woken up: re-check state and window from the top.
			goto 
		case <-.writeDeadline.wait():
			return 0, ErrTimeout
		}
	}

	// Determine the flags if any
	 = .sendFlags()

	// Send up to min(window, max message payload, remaining data).
	// The payload limit is config.MaxMessageSize minus headerSize.
	 = min(, .session.config.MaxMessageSize-headerSize, uint32(len()))

	// Send the header and the data
	 = encode(typeData, , .id, )
	if  = .session.sendMsg(, [:], .writeDeadline.wait(), true);  != nil {
		return 0, 
	}

	// Reduce our send window. Adding ^uint32(x-1) is the sync/atomic idiom
	// for subtracting x from an unsigned value.
	atomic.AddUint32(&.sendWindow, ^uint32(-1))

	// No lock is held at this point; just report the result.
	return int(), 
}

// sendFlags determines any flags that are appropriate
// based on the current stream state.
//
// It also advances the handshake: streamInit yields flagSYN and becomes
// streamSYNSent, streamSYNReceived yields flagACK and becomes
// streamEstablished. Every other state yields no flags. It acquires
// stateLock itself, so the caller must not already hold it.
func ( *Stream) () uint16 {
	.stateLock.Lock()
	defer .stateLock.Unlock()
	var  uint16
	switch .state {
	case streamInit:
		 |= flagSYN
		.state = streamSYNSent
	case streamSYNReceived:
		 |= flagACK
		.state = streamEstablished
	}
	return 
}

// sendWindowUpdate potentially sends a window update enabling
// further writes to take place.
//
// NOTE(review): an earlier version of this comment said it must be invoked
// with the lock held. The visible caller (Read) releases stateLock before
// calling, and sendFlags acquires stateLock, so stateLock must NOT be held
// here.
//
// The channel argument is passed through to session.sendMsg; Read supplies
// the read deadline's wait channel.
func ( *Stream) ( <-chan struct{}) error {
	// Determine the flags if any
	 := .sendFlags()

	// Update the receive window. The second argument to GrowTo is true when
	// there are flags to send; if GrowTo reports false, nothing is sent.
	,  := .recvBuf.GrowTo(.recvWindow,  != 0)
	if ! {
		return nil
	}

	 := time.Now()
	// Window auto-tuning: when no flags are pending and less than four times
	// the session RTT (presumably; confirm against getRTT) has passed since
	// epochStart, try to double recvWindow, capped at
	// config.MaxStreamWindowSize.
	if  := .session.getRTT();  == 0 &&  > 0 && .Sub(.epochStart) < *4 {
		var  uint32
		// Guard against uint32 overflow when doubling.
		if .recvWindow > math.MaxUint32/2 {
			 = min(math.MaxUint32, .session.config.MaxStreamWindowSize)
		} else {
			 = min(.recvWindow*2, .session.config.MaxStreamWindowSize)
		}
		if  > .recvWindow {
			 :=  - .recvWindow
			// Only grow if the memory manager grants the extra memory; on
			// failure the window simply stays at its current size.
			if  := .memorySpan.ReserveMemory(int(), 128);  == nil {
				.recvWindow = 
				_,  = .recvBuf.GrowTo(.recvWindow, true)
			}
		}
	}

	// Start a new measurement epoch and send the update.
	.epochStart = 
	 := encode(typeWindowUpdate, , .id, )
	return .session.sendMsg(, nil, , true)
}

// sendClose is used to send a FIN.
//
// The FIN travels on a typeWindowUpdate frame with length 0, combined with
// any handshake flags from sendFlags. The message is sent with a nil
// deadline channel.
func ( *Stream) () error {
	 := .sendFlags()
	 |= flagFIN
	 := encode(typeWindowUpdate, , .id, 0)
	return .session.sendMsg(, nil, nil, true)
}

// sendReset is used to send a RST.
//
// The RST travels on a typeWindowUpdate frame whose length field is filled
// from the uint32 argument; processFlags reads that field on the receiving
// side as an error code. The message is sent with a nil deadline channel.
func ( *Stream) ( uint32) error {
	 := encode(typeWindowUpdate, flagRST, .id, )
	return .session.sendMsg(, nil, nil, true)
}

// Reset resets the stream (forcibly closes the stream).
// It is equivalent to ResetWithError(0).
func ( *Stream) () error {
	return .ResetWithError(0)
}

// ResetWithError forcibly closes the stream, attaching a uint32 error code.
//
// Any direction still open moves to halfReset with a local StreamError
// (Remote: false), the stream becomes streamFinished, and waiters are woken.
// A RST is sent to the peer unless nothing has been sent yet (streamInit).
// An already finished stream is left untouched. It always returns nil; a
// failure to send the RST is ignored.
func ( *Stream) ( uint32) error {
	 := false
	.stateLock.Lock()
	switch .state {
	case streamFinished:
		.stateLock.Unlock()
		return nil
	case streamInit:
		// we haven't sent anything, so we don't need to send a reset.
	case streamSYNSent, streamSYNReceived, streamEstablished:
		 = true
	default:
		panic("unhandled state")
	}

	// at least one direction is open, we need to reset.

	// If we've already sent/received an EOF, no need to reset that side.
	if .writeState == halfOpen {
		.writeState = halfReset
		.writeErr = &StreamError{Remote: false, ErrorCode: }
	}
	if .readState == halfOpen {
		.readState = halfReset
		.readErr = &StreamError{Remote: false, ErrorCode: }
	}
	.state = streamFinished
	.notifyWaiting()
	.stateLock.Unlock()
	// Send the RST and clean up only after releasing stateLock.
	if  {
		// Best effort: the stream is finished locally whether or not the
		// RST reaches the peer.
		_ = .sendReset()
	}
	.cleanup()
	return nil
}

// CloseWrite is used to close the stream for writing.
//
// If the write side is already closed it returns nil; if it was reset it
// returns the stored writeErr. Otherwise it marks the write side halfClosed,
// wakes waiters and sends a FIN, returning the result of that send. When the
// read side is no longer open either, the stream becomes streamFinished and
// is cleaned up.
func ( *Stream) () error {
	.stateLock.Lock()
	switch .writeState {
	case halfOpen:
		// Open for writing -> close write
	case halfClosed:
		.stateLock.Unlock()
		return nil
	case halfReset:
		.stateLock.Unlock()
		return .writeErr
	default:
		panic("invalid state")
	}
	.writeState = halfClosed
	 := .readState != halfOpen
	if  {
		.state = streamFinished
	}
	.stateLock.Unlock()
	.notifyWaiting()

	// sendClose goes through sendFlags, which takes stateLock itself, so it
	// has to run after the unlock above.
	 := .sendClose()
	if  {
		// we're fully closed, might as well be nice to the user and
		// free everything early.
		.cleanup()
	}
	return 
}

// CloseRead is used to close the stream for reading.
// Note: Remote is not notified.
//
// An open read side moves to halfReset with readErr set to ErrStreamReset,
// so later reads fail with that error. A read side that is already closed
// or reset is left untouched. When the write side is no longer open either,
// the stream becomes streamFinished and is cleaned up. It always returns nil.
func ( *Stream) () error {
	 := false
	.stateLock.Lock()
	switch .readState {
	case halfOpen:
		// Open for reading -> close read
	case halfClosed, halfReset:
		.stateLock.Unlock()
		return nil
	default:
		panic("invalid state")
	}
	.readState = halfReset
	.readErr = ErrStreamReset
	 = .writeState != halfOpen
	if  {
		.state = streamFinished
	}
	.stateLock.Unlock()
	.notifyWaiting()
	if  {
		// we're fully closed, might as well be nice to the user and
		// free everything early.
		.cleanup()
	}
	return nil
}

// Close is used to close the stream.
// It closes the read side first, then the write side, and returns the
// result of CloseWrite.
func ( *Stream) () error {
	_ = .CloseRead() // can't fail.
	return .CloseWrite()
}

// forceClose is used for when the session is exiting.
//
// Any direction still open moves to halfReset with the supplied error, the
// stream becomes streamFinished, and waiters are woken. Nothing is sent to
// the peer and, unlike cleanup, session.closeStream is not called.
func ( *Stream) ( error) {
	.stateLock.Lock()
	if .readState == halfOpen {
		.readState = halfReset
		.readErr = 
	}
	if .writeState == halfOpen {
		.writeState = halfReset
		.writeErr = 
	}
	.state = streamFinished
	.notifyWaiting()
	.stateLock.Unlock()

	// Presumably setting the zero time clears any pending deadline; confirm
	// against pipeDeadline.set.
	.readDeadline.set(time.Time{})
	.writeDeadline.set(time.Time{})
}

// cleanup is called when fully closed to release any system resources.
// It hands the stream ID to session.closeStream and sets both deadlines to
// the zero time (presumably clearing them; confirm against pipeDeadline.set).
func ( *Stream) () {
	.session.closeStream(.id)
	.readDeadline.set(time.Time{})
	.writeDeadline.set(time.Time{})
}

// processFlags is used to update the state of the stream
// based on set flags, if any.
//
// The caller must NOT hold stateLock: this function acquires and releases it
// itself for each flag it handles.
//
//   - flagACK: completes the handshake (streamSYNSent -> streamEstablished)
//     and calls session.establishStream.
//   - flagFIN: moves an open read side to halfClosed; if the write side is
//     not open either, the stream is finished.
//   - flagRST: resets any direction still open and finishes the stream.
func ( *Stream) ( header,  uint16) {
	// Close the stream without holding the state lock
	var  bool
	defer func() {
		if  {
			.cleanup()
		}
	}()

	if &flagACK == flagACK {
		.stateLock.Lock()
		if .state == streamSYNSent {
			.state = streamEstablished
		}
		.stateLock.Unlock()
		.session.establishStream(.id)
	}
	if &flagFIN == flagFIN {
		var  bool
		.stateLock.Lock()
		if .readState == halfOpen {
			.readState = halfClosed
			if .writeState != halfOpen {
				// We're now fully closed.
				 = true
				.state = streamFinished
			}
			 = true
		}
		.stateLock.Unlock()
		// Wake waiters only if the FIN actually changed the read state.
		if  {
			.notifyWaiting()
		}
	}
	if &flagRST == flagRST {
		.stateLock.Lock()
		var  error = ErrStreamReset
		// Length in a window update frame with RST flag encodes an error code.
		if .MsgType() == typeWindowUpdate {
			 = &StreamError{Remote: true, ErrorCode: .Length()}
		}
		// Only directions that are still open are overwritten; a direction
		// already closed or reset keeps its state and error.
		if .readState == halfOpen {
			.readState = halfReset
			.readErr = 
		}
		if .writeState == halfOpen {
			.writeState = halfReset
			.writeErr = 
		}
		.state = streamFinished
		.stateLock.Unlock()
		 = true
		.notifyWaiting()
	}
}

// notifyWaiting signals both the receive and the send notification channel,
// so that a blocked reader and a blocked writer each re-check stream state.
func ( *Stream) () {
	asyncNotify(.recvNotifyCh)
	asyncNotify(.sendNotifyCh)
}

// incrSendWindow updates the size of our send window.
// It first applies any flags carried by the frame, then atomically adds the
// header's Length to sendWindow and wakes a writer blocked on a zero window.
func ( *Stream) ( header,  uint16) {
	.processFlags(, )
	// Increase window, unblock a sender
	atomic.AddUint32(&.sendWindow, .Length())
	asyncNotify(.sendNotifyCh)
}

// readData is used to handle a data frame.
//
// It applies any flags carried by the frame and then, if the header's Length
// is non-zero, appends the payload from the reader to recvBuf and wakes a
// blocked reader. An error from recvBuf.Append is logged and returned.
func ( *Stream) ( header,  uint16,  io.Reader) error {
	.processFlags(, )

	// A frame with no payload (flags only) needs no further handling.
	// NOTE(review): no receive-window check is performed here; whether
	// recvBuf.Append enforces the window is not visible in this file.
	 := .Length()
	if  == 0 {
		return nil
	}

	// Copy into buffer
	if  := .recvBuf.Append(, );  != nil {
		.session.logger.Printf("[ERR] yamux: Failed to read stream data on stream %d: %v", .id, )
		return 
	}
	// Unblock the reader
	asyncNotify(.recvNotifyCh)
	return nil
}

// SetDeadline sets the read and write deadlines.
// The read deadline is set first; if that fails, the write deadline is left
// unchanged and the error is returned.
func ( *Stream) ( time.Time) error {
	if  := .SetReadDeadline();  != nil {
		return 
	}
	if  := .SetWriteDeadline();  != nil {
		return 
	}
	return nil
}

// SetReadDeadline sets the deadline for future Read calls.
// The deadline is applied only while the read side is still open; otherwise
// the call is a no-op. It always returns nil.
func ( *Stream) ( time.Time) error {
	.stateLock.Lock()
	defer .stateLock.Unlock()
	if .readState == halfOpen {
		.readDeadline.set()
	}
	return nil
}

// SetWriteDeadline sets the deadline for future Write calls.
// The deadline is applied only while the write side is still open; otherwise
// the call is a no-op. It always returns nil.
func ( *Stream) ( time.Time) error {
	.stateLock.Lock()
	defer .stateLock.Unlock()
	if .writeState == halfOpen {
		.writeDeadline.set()
	}
	return nil
}