// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package sctp

import (
	"errors"
	"fmt"
	"io"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"github.com/pion/logging"
	// NOTE(review): module major version reconstructed — confirm against go.mod.
	"github.com/pion/transport/v3/deadline"
)

// Reliability types accepted by SetReliabilityParams. The numeric values are
// fixed at 0, 1 and 2; do not reorder or insert entries.
const (
	// ReliabilityTypeReliable is used for reliable transmission.
	ReliabilityTypeReliable byte = iota
	// ReliabilityTypeRexmit is used for partial reliability by retransmission count.
	ReliabilityTypeRexmit
	// ReliabilityTypeTimed is used for partial reliability by retransmission duration.
	ReliabilityTypeTimed
)

// StreamState is an enum for the SCTP Stream state field.
// This field identifies the state of the stream.
type StreamState int

// StreamState enums.
const (
	StreamStateOpen    StreamState = iota // Stream object starts with StreamStateOpen
	StreamStateClosing                    // Outgoing stream is being reset
	StreamStateClosed                     // Stream has been closed
)

// String returns the lowercase name of the state ("open", "closing" or
// "closed"). Any value outside the declared enum yields "unknown".
func (ss StreamState) String() string {
	switch ss {
	case StreamStateOpen:
		return "open"
	case StreamStateClosing:
		return "closing"
	case StreamStateClosed:
		return "closed"
	}

	return "unknown"
}

// Sentinel errors reported by Stream operations. Compare with errors.Is.
var (
	// ErrStreamClosed is returned when writing to a stream that is not open.
	ErrStreamClosed = errors.New("stream closed")

	// ErrOutboundPacketTooLarge is returned when a write exceeds the
	// association's maximum message size.
	ErrOutboundPacketTooLarge = errors.New("outbound packet larger than maximum message size")

	// ErrReadDeadlineExceeded is reported by reads once the read deadline has
	// passed. It wraps os.ErrDeadlineExceeded.
	ErrReadDeadlineExceeded = fmt.Errorf("read deadline exceeded: %w", os.ErrDeadlineExceeded)
)

// Stream represents an SCTP stream.
//
// Most fields are read and written with lock held by the methods in this
// file. defaultPayloadType is the exception: it is accessed through
// sync/atomic loads and stores instead.
type Stream struct {
	// association is used to send payload data and reset requests, and to
	// query the maximum message size.
	association         *Association
	lock                sync.RWMutex
	streamIdentifier    uint16
	// defaultPayloadType is the PPID used by Write; accessed atomically.
	defaultPayloadType  PayloadProtocolIdentifier
	// reassemblyQueue holds inbound chunks until they can be read.
	reassemblyQueue     *reassemblyQueue
	// sequenceNumber is the stream sequence number stamped on outgoing
	// chunks; it advances only for ordered messages.
	sequenceNumber      uint16
	// readNotifier wakes readers blocked in ReadSCTP. Readers call Wait while
	// holding lock, so it is presumably built on lock — confirm at the
	// construction site.
	readNotifier        *sync.Cond
	// readErr, once set, is returned by reads when no data is available
	// (io.EOF after an inbound reset, or ErrReadDeadlineExceeded).
	readErr             error
	// readTimeoutCancel is closed to cancel the pending read-deadline timer
	// goroutine; nil when no timer is pending.
	readTimeoutCancel   chan struct{}
	// writeDeadline is passed to the association with every send.
	writeDeadline       *deadline.Deadline
	// writeLock serializes WriteSCTP when the association blocks on write.
	writeLock           sync.Mutex
	// unordered, reliabilityType and reliabilityValue are the reliability
	// parameters set through SetReliabilityParams.
	unordered           bool
	reliabilityType     byte
	reliabilityValue    uint32
	// bufferedAmount counts bytes handed to the association and not yet
	// reported as released; bufferedAmountLow is the threshold at which
	// onBufferedAmountLow is invoked.
	bufferedAmount      uint64
	bufferedAmountLow   uint64
	onBufferedAmountLow func()
	state               StreamState
	log                 logging.LeveledLogger
	// name prefixes every log line emitted for this stream.
	name                string
}

// StreamIdentifier returns the Stream identifier associated to the stream.
func ( *Stream) () uint16 {
	.lock.RLock()
	defer .lock.RUnlock()

	return .streamIdentifier
}

// SetDefaultPayloadType sets the default payload type used by Write.
func ( *Stream) ( PayloadProtocolIdentifier) {
	atomic.StoreUint32((*uint32)(&.defaultPayloadType), uint32())
}

// SetReliabilityParams sets reliability parameters for this stream.
func ( *Stream) ( bool,  byte,  uint32) {
	.lock.Lock()
	defer .lock.Unlock()

	.setReliabilityParams(, , )
}

// setReliabilityParams sets reliability parameters for this stream.
// The caller should hold the lock.
func ( *Stream) ( bool,  byte,  uint32) {
	.log.Debugf("[%s] reliability params: ordered=%v type=%d value=%d",
		.name, !, , )
	.unordered = 
	.reliabilityType = 
	.reliabilityValue = 
}

// Read reads a packet of len(p) bytes, dropping the Payload Protocol Identifier.
// Returns EOF when the stream is reset or an error if the stream is closed
// otherwise.
func ( *Stream) ( []byte) (int, error) {
	, ,  := .ReadSCTP()

	return , 
}

// ReadSCTP reads a packet of len(payload) bytes and returns the associated Payload
// Protocol Identifier.
// Returns EOF when the stream is reset or an error if the stream is closed
// otherwise.
func ( *Stream) ( []byte) (int, PayloadProtocolIdentifier, error) {
	.lock.Lock()
	defer .lock.Unlock()

	defer func() {
		// close readTimeoutCancel if the current read timeout routine is no longer effective
		if .readTimeoutCancel != nil && .readErr != nil {
			close(.readTimeoutCancel)
			.readTimeoutCancel = nil
		}
	}()

	for {
		, ,  := .reassemblyQueue.read()
		if  == nil || errors.Is(, io.ErrShortBuffer) {
			return , , 
		}

		if .readErr != nil {
			return 0, PayloadProtocolIdentifier(0), .readErr
		}

		.readNotifier.Wait()
	}
}

// SetReadDeadline sets the read deadline in an identical way to net.Conn.
func ( *Stream) ( time.Time) error {
	.lock.Lock()
	defer .lock.Unlock()

	if .readTimeoutCancel != nil {
		close(.readTimeoutCancel)
		.readTimeoutCancel = nil
	}

	if .readErr != nil {
		if !errors.Is(.readErr, ErrReadDeadlineExceeded) {
			return nil
		}
		.readErr = nil
	}

	if !.IsZero() {
		.readTimeoutCancel = make(chan struct{})

		go func( chan struct{}) {
			 := time.NewTimer(time.Until())
			select {
			case <-:
				.Stop()

				return
			case <-.C:
				select {
				case <-:
					return
				default:
				}
				.lock.Lock()
				if .readErr == nil {
					.readErr = ErrReadDeadlineExceeded
				}
				.readTimeoutCancel = nil
				.lock.Unlock()

				.readNotifier.Signal()
			}
		}(.readTimeoutCancel)
	}

	return nil
}

func ( *Stream) ( *chunkPayloadData) {
	.lock.Lock()
	defer .lock.Unlock()

	var  bool
	if .reassemblyQueue.push() {
		 = .reassemblyQueue.isReadable()
		.log.Debugf("[%s] reassemblyQueue readable=%v", .name, )
		if  {
			.log.Debugf("[%s] readNotifier.signal()", .name)
			.readNotifier.Signal()
			.log.Debugf("[%s] readNotifier.signal() done", .name)
		}
	}
}

func ( *Stream) ( uint16) {
	var  bool

	func() {
		.lock.Lock()
		defer .lock.Unlock()

		if .unordered {
			return // unordered chunks are handled by handleForwardUnordered method
		}

		// Remove all chunks older than or equal to the new TSN from
		// the reassemblyQueue.
		.reassemblyQueue.forwardTSNForOrdered()
		 = .reassemblyQueue.isReadable()
	}()

	// Notify the reader asynchronously if there's a data chunk to read.
	if  {
		.readNotifier.Signal()
	}
}

func ( *Stream) ( uint32) {
	var  bool

	func() {
		.lock.Lock()
		defer .lock.Unlock()

		if !.unordered {
			return // ordered chunks are handled by handleForwardTSNOrdered method
		}

		// Remove all chunks older than or equal to the new TSN from
		// the reassemblyQueue.
		.reassemblyQueue.forwardTSNForUnordered()
		 = .reassemblyQueue.isReadable()
	}()

	// Notify the reader asynchronously if there's a data chunk to read.
	if  {
		.readNotifier.Signal()
	}
}

// Write writes len(payload) bytes from payload with the default Payload Protocol Identifier.
func ( *Stream) ( []byte) ( int,  error) {
	 := PayloadProtocolIdentifier(atomic.LoadUint32((*uint32)(&.defaultPayloadType)))

	return .WriteSCTP(, )
}

// WriteSCTP writes len(payload) bytes from payload to the DTLS connection.
func ( *Stream) ( []byte,  PayloadProtocolIdentifier) (int, error) {
	 := .association.MaxMessageSize()
	if len() > int() {
		return 0, fmt.Errorf("%w: %v", ErrOutboundPacketTooLarge, )
	}

	if .State() != StreamStateOpen {
		return 0, ErrStreamClosed
	}

	// the send could fail if the association is blocked for writing (timeout), it will left a hole
	// in the stream sequence number space, so we need to lock the write to avoid concurrent send and decrement
	// the sequence number in case of failure
	if .association.isBlockWrite() {
		.writeLock.Lock()
	}
	,  := .packetize(, )
	 := len()
	 := .association.sendPayloadData(.writeDeadline, )
	if  != nil {
		.lock.Lock()
		.bufferedAmount -= uint64()
		if ! {
			.sequenceNumber--
		}
		.lock.Unlock()
		 = 0
	}
	if .association.isBlockWrite() {
		.writeLock.Unlock()
	}

	return , 
}

// SetWriteDeadline sets the write deadline in an identical way to net.Conn,
// it will only work for blocking writes.
func ( *Stream) ( time.Time) error {
	.writeDeadline.Set()

	return nil
}

// SetDeadline sets the read and write deadlines in an identical way to net.Conn.
func ( *Stream) ( time.Time) error {
	if  := .SetReadDeadline();  != nil {
		return 
	}

	return .SetWriteDeadline()
}

func ( *Stream) ( []byte,  PayloadProtocolIdentifier) ([]*chunkPayloadData, bool) {
	.lock.Lock()
	defer .lock.Unlock()

	 := uint32(0)
	 := uint32(len()) //nolint:gosec // G115

	// From draft-ietf-rtcweb-data-protocol-09, section 6:
	//   All Data Channel Establishment Protocol messages MUST be sent using
	//   ordered delivery and reliable transmission.
	 :=  != PayloadTypeWebRTCDCEP && .unordered

	var  []*chunkPayloadData
	var  *chunkPayloadData
	for  != 0 {
		 := min32(.association.maxPayloadSize, )

		// Copy the userdata since we'll have to store it until acked
		// and the caller may re-use the buffer in the mean time
		 := make([]byte, )
		copy(, [:+])

		 := &chunkPayloadData{
			streamIdentifier:     .streamIdentifier,
			userData:             ,
			unordered:            ,
			beginningFragment:     == 0,
			endingFragment:       - == 0,
			immediateSack:        false,
			payloadType:          ,
			streamSequenceNumber: .sequenceNumber,
			head:                 ,
		}

		if  == nil {
			 = 
		}

		 = append(, )

		 -= 
		 += 
	}

	// RFC 4960 Sec 6.6
	// Note: When transmitting ordered and unordered data, an endpoint does
	// not increment its Stream Sequence Number when transmitting a DATA
	// chunk with U flag set to 1.
	if ! {
		.sequenceNumber++
	}

	.bufferedAmount += uint64(len())
	.log.Tracef("[%s] bufferedAmount = %d", .name, .bufferedAmount)

	return , 
}

// Close closes the write-direction of the stream.
// Future calls to Write are not permitted after calling Close.
func ( *Stream) () error {
	if ,  := func() (uint16, bool) {
		.lock.Lock()
		defer .lock.Unlock()

		.log.Debugf("[%s] Close: state=%s", .name, .state.String())

		if .state == StreamStateOpen {
			if .readErr == nil {
				.state = StreamStateClosing
			} else {
				.state = StreamStateClosed
			}
			.log.Debugf("[%s] state change: open => %s", .name, .state.String())

			return .streamIdentifier, true
		}

		return .streamIdentifier, false
	}();  {
		// Reset the outgoing stream
		// https://tools.ietf.org/html/rfc6525
		return .association.sendResetRequest()
	}

	return nil
}

// BufferedAmount returns the number of bytes of data currently queued to be sent over this stream.
func ( *Stream) () uint64 {
	.lock.RLock()
	defer .lock.RUnlock()

	return .bufferedAmount
}

// BufferedAmountLowThreshold returns the number of bytes of buffered outgoing data that is
// considered "low." Defaults to 0.
func ( *Stream) () uint64 {
	.lock.RLock()
	defer .lock.RUnlock()

	return .bufferedAmountLow
}

// SetBufferedAmountLowThreshold is used to update the threshold.
// See BufferedAmountLowThreshold().
func ( *Stream) ( uint64) {
	.lock.Lock()
	defer .lock.Unlock()

	.bufferedAmountLow = 
}

// OnBufferedAmountLow sets the callback handler which would be called when the number of
// bytes of outgoing data buffered is lower than the threshold.
func ( *Stream) ( func()) {
	.lock.Lock()
	defer .lock.Unlock()

	.onBufferedAmountLow = 
}

// This method is called by association's readLoop (go-)routine to notify this stream
// of the specified amount of outgoing data has been delivered to the peer.
func ( *Stream) ( int) {
	if  <= 0 {
		return
	}

	.lock.Lock()

	 := .bufferedAmount

	if .bufferedAmount < uint64() {
		.bufferedAmount = 0
		.log.Errorf("[%s] released buffer size %d should be <= %d",
			.name, , .bufferedAmount)
	} else {
		.bufferedAmount -= uint64()
	}

	.log.Tracef("[%s] bufferedAmount = %d", .name, .bufferedAmount)

	if .onBufferedAmountLow != nil &&  > .bufferedAmountLow && .bufferedAmount <= .bufferedAmountLow {
		 := .onBufferedAmountLow
		.lock.Unlock()
		()

		return
	}

	.lock.Unlock()
}

func ( *Stream) () int {
	// No lock is required as it reads the size with atomic load function.
	return .reassemblyQueue.getNumBytes()
}

func ( *Stream) () {
	.lock.Lock()
	defer .lock.Unlock()

	.log.Debugf("[%s] onInboundStreamReset: state=%s", .name, .state.String())

	// No more inbound data to read. Unblock the read with io.EOF.
	// This should cause DCEP layer (datachannel package) to call Close() which
	// will reset outgoing stream also.

	// See RFC 8831 section 6.7:
	//	if one side decides to close the data channel, it resets the corresponding
	//	outgoing stream.  When the peer sees that an incoming stream was
	//	reset, it also resets its corresponding outgoing stream.  Once this
	//	is completed, the data channel is closed.

	.readErr = io.EOF
	.readNotifier.Broadcast()

	if .state == StreamStateClosing {
		.log.Debugf("[%s] state change: closing => closed", .name)
		.state = StreamStateClosed
	}
}

// State return the stream state.
func ( *Stream) () StreamState {
	.lock.RLock()
	defer .lock.RUnlock()

	return .state
}