package libp2pwebrtc

import (
	"errors"
	"os"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/p2p/transport/webrtc/pb"
	"github.com/libp2p/go-msgio/pbio"

	"github.com/pion/datachannel"
	"github.com/pion/webrtc/v4"
)

const (
	// maxSendMessageSize is the maximum size of the Protobuf message we send.
	// NOTE: Change `varintOverhead` if you change this.
	maxSendMessageSize = 16384
	// Proto overhead assumption is 5 bytes
	protoOverhead = 5
	// Varint overhead is assumed to be 2 bytes. This is safe since
	// 1. This is only used when writing messages, and
	// 2. We only send messages in chunks of `maxSendMessageSize - varintOverhead`,
	// which includes the data and the protobuf header. Since `maxSendMessageSize`
	// is less than or equal to 2 ^ 14, the varint will not be more than
	// 2 bytes in length.
	varintOverhead = 2

	// maxTotalControlMessagesSize is the maximum total size of all control messages we will
	// write on this stream.
	// 4 control messages of size 10 bytes + 10 bytes buffer. This number doesn't need to be
	// exact. In the worst case, we enqueue this many more bytes in the webrtc peer connection
	// send queue.
	maxTotalControlMessagesSize = 50

	// maxFINACKWait is the maximum amount of time a stream will wait to read
	// FIN_ACK before closing the data channel
	maxFINACKWait = 10 * time.Second

	// maxReceiveMessageSize is the maximum message size of the Protobuf message we receive.
	maxReceiveMessageSize = 256<<10 + 1<<10 // 1kB buffer
)

type receiveState uint8

const (
	receiveStateReceiving receiveState = iota
	receiveStateDataRead               // received and read the FIN
	receiveStateReset                  // either by calling CloseRead locally, or by receiving a RESET from the remote
)

type sendState uint8

const (
	sendStateSending sendState = iota
	sendStateDataSent
	sendStateDataReceived
	sendStateReset
)

// stream wraps a pion detached data channel into a net.Conn
// and then into a network.MuxedStream.
type stream struct {
	mx sync.Mutex

	// readerMx ensures that only a single goroutine reads from the reader. Read is not
	// thread-safe, but we may need to read from the reader for control messages from a
	// different goroutine.
	readerMx  sync.Mutex
	reader    pbio.Reader
	readError error

	// nextMessage buffers at most a single message. We need it because a reader
	// might read a message only partway, so we need a way to buffer the remaining
	// part until it is (eventually) read.
	nextMessage  *pb.Message
	receiveState receiveState

	writer             pbio.Writer // concurrent writes prevented by mx
	writeStateChanged  chan struct{}
	sendState          sendState
	writeDeadline      time.Time
	writeError         error
	maxSendMessageSize int

	controlMessageReaderOnce sync.Once
	// controlMessageReaderEndTime is the end time for reading FIN_ACK from the control
	// message reader. We cannot rely on SetReadDeadline to do this since that is prone to a
	// race condition where a previous deadline timer fires after the latest call to
	// SetReadDeadline
	// See: https://github.com/pion/sctp/pull/290
	controlMessageReaderEndTime time.Time

	onDoneOnce          sync.Once
	onDone              func()
	id                  uint16 // for logging purposes
	dataChannel         *datachannel.DataChannel
	closeForShutdownErr error
}

var _ network.MuxedStream = &stream{}

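// newStream wraps a detached pion data channel in a stream implementing
// network.MuxedStream. onDone is invoked exactly once when the stream is done
// (see cleanup). A rough usage sketch, assuming data channels were created with
// detaching enabled in the pion SettingEngine (names are illustrative):
//
//	channel.OnOpen(func() {
//		rwc, err := channel.Detach()
//		if err != nil {
//			return
//		}
//		str := newStream(channel, rwc, maxSendMessageSize, func() { /* deregister stream */ })
//		_ = str
//	})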
func newStream(
	channel *webrtc.DataChannel,
	rwc datachannel.ReadWriteCloser,
	maxSendMessageSize int,
	onDone func(),
) *stream {
	s := &stream{
		reader:             pbio.NewDelimitedReader(rwc, maxReceiveMessageSize),
		writer:             pbio.NewDelimitedWriter(rwc),
		writeStateChanged:  make(chan struct{}, 1),
		id:                 *channel.ID(),
		dataChannel:        rwc.(*datachannel.DataChannel),
		onDone:             onDone,
		maxSendMessageSize: maxSendMessageSize,
	}
	s.dataChannel.SetBufferedAmountLowThreshold(uint64(s.sendBufferLowThreshold()))
	s.dataChannel.OnBufferedAmountLow(func() {
		s.notifyWriteStateChanged()
	})
	return s
}

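// Close closes both the read and the write half of the stream. It then allows up to
// maxFINACKWait for the remote's FIN_ACK to arrive before the underlying data channel
// is closed.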
func (s *stream) Close() error {
	s.mx.Lock()
	isClosed := s.closeForShutdownErr != nil
	s.mx.Unlock()
	if isClosed {
		return nil
	}
	defer s.cleanup()
	closeWriteErr := s.CloseWrite()
	closeReadErr := s.CloseRead()
	if closeWriteErr != nil || closeReadErr != nil {
		s.Reset()
		return errors.Join(closeWriteErr, closeReadErr)
	}

	s.mx.Lock()
	if s.controlMessageReaderEndTime.IsZero() {
		s.controlMessageReaderEndTime = time.Now().Add(maxFINACKWait)
		s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour))
	}
	s.mx.Unlock()
	return nil
}

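// Reset aborts both the read and the write half of the stream. It is equivalent to
// ResetWithError(0).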
func (s *stream) Reset() error {
	return s.ResetWithError(0)
}

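// ResetWithError aborts both the read and the write half of the stream with the given
// error code.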
func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
	s.mx.Lock()
	isClosed := s.closeForShutdownErr != nil
	s.mx.Unlock()
	if isClosed {
		return nil
	}

	defer s.cleanup()
	cancelWriteErr := s.cancelWrite(errCode)
	closeReadErr := s.closeRead(errCode, false)
	s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour))
	return errors.Join(cancelWriteErr, closeReadErr)
}

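// closeForShutdown closes the stream as part of shutting down the underlying connection.
// It records closeErr, makes subsequent Close and Reset calls no-ops, and wakes up any
// writer waiting on a write state change.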
func (s *stream) closeForShutdown(closeErr error) {
	defer s.cleanup()

	s.mx.Lock()
	defer s.mx.Unlock()

	s.closeForShutdownErr = closeErr
	s.notifyWriteStateChanged()
}

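// SetDeadline sets both the read and the write deadline of the stream.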
func (s *stream) SetDeadline(t time.Time) error {
	_ = s.SetReadDeadline(t)
	return s.SetWriteDeadline(t)
}

// processIncomingFlag processes the flag (FIN/RESET/etc.) on msg.
// It needs to be called while the mutex is locked.
func (s *stream) processIncomingFlag(msg *pb.Message) {
	if msg.Flag == nil {
		return
	}

	switch msg.GetFlag() {
	case pb.Message_STOP_SENDING:
		// We must process STOP_SENDING even after sending a FIN (sendStateDataSent).
		// The remote peer may not send a FIN_ACK once it has sent a STOP_SENDING.
		if s.sendState == sendStateSending || s.sendState == sendStateDataSent {
			s.sendState = sendStateReset
			s.writeError = &network.StreamError{Remote: true, ErrorCode: network.StreamErrorCode(msg.GetErrorCode())}
		}
		s.notifyWriteStateChanged()
	case pb.Message_FIN_ACK:
		s.sendState = sendStateDataReceived
		s.notifyWriteStateChanged()
	case pb.Message_FIN:
		if s.receiveState == receiveStateReceiving {
			s.receiveState = receiveStateDataRead
		}
		if err := s.writer.WriteMsg(&pb.Message{Flag: pb.Message_FIN_ACK.Enum()}); err != nil {
			log.Debugf("failed to send FIN_ACK: %s", err)
			// The remote has finished writing all its data. It'll stop waiting for the
			// FIN_ACK eventually or will be notified when we close the datachannel.
		}
		s.spawnControlMessageReader()
	case pb.Message_RESET:
		if s.receiveState == receiveStateReceiving {
			s.receiveState = receiveStateReset
			s.readError = &network.StreamError{Remote: true, ErrorCode: network.StreamErrorCode(msg.GetErrorCode())}
		}
		if s.sendState == sendStateSending || s.sendState == sendStateDataSent {
			s.sendState = sendStateReset
			s.writeError = &network.StreamError{Remote: true, ErrorCode: network.StreamErrorCode(msg.GetErrorCode())}
		}
		s.spawnControlMessageReader()
	}
}

// spawnControlMessageReader is used for processing control messages after the reader is closed.
func (s *stream) spawnControlMessageReader() {
	s.controlMessageReaderOnce.Do(func() {
		// Spawn a goroutine to ensure that we're not holding any locks
		go func() {
			// cleanup the sctp deadline timer goroutine
			defer s.setDataChannelReadDeadline(time.Time{})

			defer s.dataChannel.Close()

			// Unblock any Read call waiting on reader.ReadMsg
			s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour))

			s.readerMx.Lock()
			// We have the lock: any readers blocked on reader.ReadMsg have exited.
			s.mx.Lock()
			defer s.mx.Unlock()
			// From this point onwards only this goroutine will do reader.ReadMsg.
			// We just wanted to ensure any existing readers have exited.
			// Read calls from this point onwards will exit immediately on checking
			// s.receiveState
			s.readerMx.Unlock()

			if s.nextMessage != nil {
				s.processIncomingFlag(s.nextMessage)
				s.nextMessage = nil
			}
			var msg pb.Message
			for {
				// Connection closed. No need to cleanup the data channel.
				if s.closeForShutdownErr != nil {
					return
				}
				// Write half of the stream completed.
				if s.sendState == sendStateDataReceived || s.sendState == sendStateReset {
					return
				}
				// FIN_ACK wait deadline exceeded.
				if !s.controlMessageReaderEndTime.IsZero() && time.Now().After(s.controlMessageReaderEndTime) {
					return
				}

				s.setDataChannelReadDeadline(s.controlMessageReaderEndTime)
				s.mx.Unlock()
				err := s.reader.ReadMsg(&msg)
				s.mx.Lock()
				if err != nil {
					// We have to manually manage deadline exceeded errors since pion/sctp can
					// return deadline exceeded error for cancelled deadlines
					// see: https://github.com/pion/sctp/pull/290/files
					if errors.Is(err, os.ErrDeadlineExceeded) {
						continue
					}
					return
				}
				s.processIncomingFlag(&msg)
			}
		}()
	})
}

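// cleanup invokes the onDone callback. It is safe to call multiple times; onDone
// runs at most once.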
func (s *stream) cleanup() {
	s.onDoneOnce.Do(func() {
		if s.onDone != nil {
			s.onDone()
		}
	})
}