// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

//go:build !js
// +build !js

package webrtc

import (
	
	
	
	
	
	

	
	
	
)

// errSCTPNotEstablished is returned when a DataChannel is opened over an
// SCTPTransport whose association is still nil.
var errSCTPNotEstablished = errors.New("SCTP not established")

// DataChannel represents a WebRTC DataChannel.
// The DataChannel interface represents a network channel
// which can be used for bidirectional peer-to-peer transfers of arbitrary data.
type DataChannel struct {
	// mu is taken by the methods of this type around reads and writes of the
	// fields below. readyState is the exception: it is an atomic.Value that
	// is loaded and stored without holding mu.
	mu sync.RWMutex

	// statsID is assigned at construction as "DataChannel-<unix nanoseconds>".
	statsID                    string
	label                      string
	ordered                    bool
	maxPacketLifeTime          *uint16
	maxRetransmits             *uint16
	protocol                   string
	negotiated                 bool
	// id stays nil until it is supplied by the parameters or generated
	// while the channel is being opened.
	id                         *uint16
	readyState                 atomic.Value // DataChannelState
	// bufferedAmountLowThreshold caches the threshold so that a value set
	// before the channel is open can be applied once it is.
	bufferedAmountLowThreshold uint64
	// detachCalled records that the underlying channel has been detached.
	detachCalled               bool
	// readLoopActive is created when the read loop is started and closed
	// when that loop returns; a graceful close waits on it.
	readLoopActive             chan struct{}
	// isGracefulClosed is set by every close, graceful or not. Once set,
	// the open, dial, message and error handlers are no longer invoked.
	isGracefulClosed           bool

	// binaryType is not implemented yet; the field below is intentionally
	// commented out. Per the WebRTC specification the attribute MUST, on
	// getting, return the value to which it was last set. On setting, if the
	// new value is either the string "blob" or the string "arraybuffer", then
	// set the IDL attribute to this new value. Otherwise, throw a SyntaxError.
	// When a DataChannel object is created, the binaryType attribute MUST be
	// initialized to the string "blob". This attribute controls how binary
	// data is exposed to scripts.
	// binaryType                 string

	// User-registered callbacks. The two sync.Once values are replaced
	// whenever the matching handler is (re)registered.
	onMessageHandler    func(DataChannelMessage)
	openHandlerOnce     sync.Once
	onOpenHandler       func()
	dialHandlerOnce     sync.Once
	onDialHandler       func()
	onCloseHandler      func()
	onBufferedAmountLow func()
	onErrorHandler      func(error)

	// sctpTransport is the transport the channel is sent over; a non-nil
	// value is treated as "already open" when opening.
	sctpTransport *SCTPTransport
	// dataChannel is the underlying pion/datachannel object; it is nil until
	// the channel has been opened.
	dataChannel   *datachannel.DataChannel

	// A reference to the associated api object used by this datachannel
	api *API
	log logging.LeveledLogger
}

// NewDataChannel creates a new DataChannel.
// This constructor is part of the ORTC API. It is not
// meant to be used together with the basic WebRTC API.
func ( *API) ( *SCTPTransport,  *DataChannelParameters) (*DataChannel, error) {
	,  := .newDataChannel(, nil, .settingEngine.LoggerFactory.NewLogger("ortc"))
	if  != nil {
		return nil, 
	}

	 = .open()
	if  != nil {
		return nil, 
	}

	return , nil
}

// newDataChannel is an internal constructor for the data channel used to
// create the DataChannel object before the networking is set up.
func ( *API) (
	 *DataChannelParameters,
	 *SCTPTransport,
	 logging.LeveledLogger,
) (*DataChannel, error) {
	// https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #5)
	if len(.Label) > 65535 {
		return nil, &rtcerr.TypeError{Err: ErrStringSizeLimit}
	}

	 := &DataChannel{
		sctpTransport:     ,
		statsID:           fmt.Sprintf("DataChannel-%d", time.Now().UnixNano()),
		label:             .Label,
		protocol:          .Protocol,
		negotiated:        .Negotiated,
		id:                .ID,
		ordered:           .Ordered,
		maxPacketLifeTime: .MaxPacketLifeTime,
		maxRetransmits:    .MaxRetransmits,
		api:               ,
		log:               ,
	}

	.setReadyState(DataChannelStateConnecting)

	return , nil
}

// open opens the datachannel over the sctp transport.
func ( *DataChannel) ( *SCTPTransport) error { //nolint:cyclop
	 := .association()
	if  == nil {
		return errSCTPNotEstablished
	}

	.mu.Lock()
	if .sctpTransport != nil { // already open
		.mu.Unlock()

		return nil
	}
	.sctpTransport = 
	var  datachannel.ChannelType
	var  uint32

	switch {
	case .maxPacketLifeTime == nil && .maxRetransmits == nil:
		if .ordered {
			 = datachannel.ChannelTypeReliable
		} else {
			 = datachannel.ChannelTypeReliableUnordered
		}

	case .maxRetransmits != nil:
		 = uint32(*.maxRetransmits)
		if .ordered {
			 = datachannel.ChannelTypePartialReliableRexmit
		} else {
			 = datachannel.ChannelTypePartialReliableRexmitUnordered
		}
	default:
		 = uint32(*.maxPacketLifeTime)
		if .ordered {
			 = datachannel.ChannelTypePartialReliableTimed
		} else {
			 = datachannel.ChannelTypePartialReliableTimedUnordered
		}
	}

	 := &datachannel.Config{
		ChannelType:          ,
		Priority:             datachannel.ChannelPriorityNormal,
		ReliabilityParameter: ,
		Label:                .label,
		Protocol:             .protocol,
		Negotiated:           .negotiated,
		LoggerFactory:        .api.settingEngine.LoggerFactory,
	}

	if .id == nil {
		// avoid holding lock when generating ID, since id generation locks
		.mu.Unlock()
		var  *uint16
		 := .sctpTransport.generateAndSetDataChannelID(.sctpTransport.dtlsTransport.role(), &)
		if  != nil {
			return 
		}
		.mu.Lock()
		.id = 
	}
	,  := datachannel.Dial(, *.id, )
	if  != nil {
		.mu.Unlock()

		return 
	}

	// bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
	.SetBufferedAmountLowThreshold(.bufferedAmountLowThreshold)
	.OnBufferedAmountLow(.onBufferedAmountLow)
	.mu.Unlock()

	.onDial()
	.handleOpen(, false, .negotiated)

	return nil
}

// Transport returns the SCTPTransport instance the DataChannel is sending over.
func ( *DataChannel) () *SCTPTransport {
	.mu.RLock()
	defer .mu.RUnlock()

	return .sctpTransport
}

// After onOpen is complete check that the user called detach
// and provide an error message if the call was missed.
func ( *DataChannel) () {
	.mu.RLock()
	defer .mu.RUnlock()

	if .api.settingEngine.detach.DataChannels && !.detachCalled {
		.log.Warn("webrtc.DetachDataChannels() enabled but didn't Detach, call Detach from OnOpen")
	}
}

// OnOpen sets an event handler which is invoked when
// the underlying data transport has been established (or re-established).
func ( *DataChannel) ( func()) {
	.mu.Lock()
	.openHandlerOnce = sync.Once{}
	.onOpenHandler = 
	.mu.Unlock()

	if .ReadyState() == DataChannelStateOpen {
		// If the data channel is already open, call the handler immediately.
		go .openHandlerOnce.Do(func() {
			()
			.checkDetachAfterOpen()
		})
	}
}

func ( *DataChannel) () {
	.mu.RLock()
	 := .onOpenHandler
	if .isGracefulClosed {
		.mu.RUnlock()

		return
	}
	.mu.RUnlock()

	if  != nil {
		go .openHandlerOnce.Do(func() {
			()
			.checkDetachAfterOpen()
		})
	}
}

// OnDial sets an event handler which is invoked when the
// peer has been dialed, but before said peer has responded.
func ( *DataChannel) ( func()) {
	.mu.Lock()
	.dialHandlerOnce = sync.Once{}
	.onDialHandler = 
	.mu.Unlock()

	if .ReadyState() == DataChannelStateOpen {
		// If the data channel is already open, call the handler immediately.
		go .dialHandlerOnce.Do()
	}
}

func ( *DataChannel) () {
	.mu.RLock()
	 := .onDialHandler
	if .isGracefulClosed {
		.mu.RUnlock()

		return
	}
	.mu.RUnlock()

	if  != nil {
		go .dialHandlerOnce.Do()
	}
}

// OnClose sets an event handler which is invoked when
// the underlying data transport has been closed.
// Note: Due to backwards compatibility, there is a chance that
// OnClose can be called, even if the GracefulClose is used.
// If this is the case for you, you can deregister OnClose
// prior to GracefulClose.
func ( *DataChannel) ( func()) {
	.mu.Lock()
	defer .mu.Unlock()
	.onCloseHandler = 
}

func ( *DataChannel) () {
	.mu.RLock()
	 := .onCloseHandler
	.mu.RUnlock()

	if  != nil {
		go ()
	}
}

// OnMessage sets an event handler which is invoked on a binary
// message arrival over the sctp transport from a remote peer.
// OnMessage can currently receive messages up to 16384 bytes
// in size. Check out the detach API if you want to use larger
// message sizes. Note that browser support for larger messages
// is also limited.
func ( *DataChannel) ( func( DataChannelMessage)) {
	.mu.Lock()
	defer .mu.Unlock()
	.onMessageHandler = 
}

func ( *DataChannel) ( DataChannelMessage) {
	.mu.RLock()
	 := .onMessageHandler
	if .isGracefulClosed {
		.mu.RUnlock()

		return
	}
	.mu.RUnlock()

	if  == nil {
		return
	}
	()
}

func ( *DataChannel) ( *datachannel.DataChannel, ,  bool) {
	.mu.Lock()
	if .isGracefulClosed { // The channel was closed during the connecting state
		.mu.Unlock()
		if  := .Close();  != nil {
			.log.Errorf("Failed to close DataChannel that was closed during connecting state %v", .Error())
		}
		.onClose()

		return
	}
	.dataChannel = 
	 := .bufferedAmountLowThreshold
	 := .onBufferedAmountLow
	.mu.Unlock()
	.setReadyState(DataChannelStateOpen)

	// Fire the OnOpen handler immediately not using pion/datachannel
	// * detached datachannels have no read loop, the user needs to read and query themselves
	// * remote datachannels should fire OnOpened. This isn't spec compliant, but we can't break behavior yet
	// * already negotiated datachannels should fire OnOpened
	if .api.settingEngine.detach.DataChannels ||  ||  {
		// bufferedAmountLowThreshold and onBufferedAmountLow might be set earlier
		.dataChannel.SetBufferedAmountLowThreshold()
		.dataChannel.OnBufferedAmountLow()
		.onOpen()
	} else {
		.OnOpen(func() {
			.onOpen()
		})
	}

	.mu.Lock()
	defer .mu.Unlock()

	if .isGracefulClosed {
		return
	}

	if !.api.settingEngine.detach.DataChannels {
		.readLoopActive = make(chan struct{})
		go .readLoop()
	}
}

// OnError sets an event handler which is invoked when
// the underlying data transport cannot be read.
func ( *DataChannel) ( func( error)) {
	.mu.Lock()
	defer .mu.Unlock()
	.onErrorHandler = 
}

func ( *DataChannel) ( error) {
	.mu.RLock()
	 := .onErrorHandler
	if .isGracefulClosed {
		.mu.RUnlock()

		return
	}
	.mu.RUnlock()

	if  != nil {
		go ()
	}
}

func ( *DataChannel) () {
	defer func() {
		.mu.Lock()
		 := .readLoopActive
		.mu.Unlock()
		defer close()
	}()

	 := make([]byte, sctpMaxMessageSizeUnsetValue)
	for {
		, ,  := .dataChannel.ReadDataChannel()
		if  != nil {
			if errors.Is(, io.ErrShortBuffer) {
				if int64() < int64(.api.settingEngine.getSCTPMaxMessageSize()) {
					 = append(, make([]byte, len())...) // nolint

					continue
				}

				.log.Errorf(
					"Incoming DataChannel message larger then Max Message size %v",
					.api.settingEngine.getSCTPMaxMessageSize(),
				)
			}

			.setReadyState(DataChannelStateClosed)
			if !errors.Is(, io.EOF) {
				.onError()
			}
			.onClose()

			return
		}

		.onMessage(DataChannelMessage{
			Data:     append([]byte{}, [:]...),
			IsString: ,
		})
	}
}

// Send sends the binary message to the DataChannel peer.
func ( *DataChannel) ( []byte) error {
	 := .ensureOpen()
	if  != nil {
		return 
	}

	_,  = .dataChannel.WriteDataChannel(, false)

	return 
}

// SendText sends the text message to the DataChannel peer.
func ( *DataChannel) ( string) error {
	 := .ensureOpen()
	if  != nil {
		return 
	}

	_,  = .dataChannel.WriteDataChannel([]byte(), true)

	return 
}

func ( *DataChannel) () error {
	.mu.RLock()
	defer .mu.RUnlock()
	if .ReadyState() != DataChannelStateOpen {
		return io.ErrClosedPipe
	}

	return nil
}

// Detach allows you to detach the underlying datachannel.
// This provides an idiomatic API to work with
// (`io.ReadWriteCloser` with its `.Read()` and `.Write()` methods,
// as opposed to `.Send()` and `.OnMessage`),
// however it disables the OnMessage callback.
// Before calling Detach you have to enable this behavior by calling
// webrtc.DetachDataChannels(). Combining detached and normal data channels
// is not supported.
// Please refer to the data-channels-detach example and the
// pion/datachannel documentation for the correct way to handle the
// resulting DataChannel object.
func ( *DataChannel) () (datachannel.ReadWriteCloser, error) {
	return .DetachWithDeadline()
}

// DetachWithDeadline allows you to detach the underlying datachannel.
// It is the same as Detach but returns a ReadWriteCloserDeadliner.
func ( *DataChannel) () (datachannel.ReadWriteCloserDeadliner, error) {
	.mu.Lock()

	if !.api.settingEngine.detach.DataChannels {
		.mu.Unlock()

		return nil, errDetachNotEnabled
	}

	if .dataChannel == nil {
		.mu.Unlock()

		return nil, errDetachBeforeOpened
	}

	.detachCalled = true

	 := .dataChannel
	.mu.Unlock()

	// Remove the reference from SCTPTransport so that the datachannel
	// can be garbage collected on close
	.sctpTransport.lock.Lock()
	 := len(.sctpTransport.dataChannels)
	 := 0
	for  := 0;  < ; ++ {
		if  == .sctpTransport.dataChannels[] {
			continue
		}
		.sctpTransport.dataChannels[] = .sctpTransport.dataChannels[]
		++
	}
	for  := ;  < ; ++ {
		.sctpTransport.dataChannels[] = nil
	}
	.sctpTransport.dataChannels = .sctpTransport.dataChannels[:]
	.sctpTransport.lock.Unlock()

	return , nil
}

// Close Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer.
func ( *DataChannel) () error {
	return .close(false)
}

// GracefulClose Closes the DataChannel. It may be called regardless of whether
// the DataChannel object was created by this peer or the remote peer. It also waits
// for any goroutines it started to complete. This is only safe to call outside of
// DataChannel callbacks or if in a callback, in its own goroutine.
func ( *DataChannel) () error {
	return .close(true)
}

// Normally, close only stops writes from happening, so graceful=true
// will wait for reads to be finished based on underlying SCTP association
// closure or a SCTP reset stream from the other side. This is safe to call
// with graceful=true after tearing down a PeerConnection but not
// necessarily before. For example, if you used a vnet and dropped all packets
// right before closing the DataChannel, you'd need never see a reset stream.
func ( *DataChannel) ( bool) error {
	.mu.Lock()
	.isGracefulClosed = true
	 := .readLoopActive
	if  &&  != nil {
		defer func() {
			<-
		}()
	}
	 := .dataChannel != nil
	.mu.Unlock()

	if .ReadyState() == DataChannelStateClosed {
		return nil
	}

	.setReadyState(DataChannelStateClosing)
	if ! {
		return nil
	}

	return .dataChannel.Close()
}

// Label represents a label that can be used to distinguish this
// DataChannel object from other DataChannel objects. Scripts are
// allowed to create multiple DataChannel objects with the same label.
func ( *DataChannel) () string {
	.mu.RLock()
	defer .mu.RUnlock()

	return .label
}

// Ordered returns true if the DataChannel is ordered, and false if
// out-of-order delivery is allowed.
func ( *DataChannel) () bool {
	.mu.RLock()
	defer .mu.RUnlock()

	return .ordered
}

// MaxPacketLifeTime represents the length of the time window (msec) during
// which transmissions and retransmissions may occur in unreliable mode.
func ( *DataChannel) () *uint16 {
	.mu.RLock()
	defer .mu.RUnlock()

	return .maxPacketLifeTime
}

// MaxRetransmits represents the maximum number of retransmissions that are
// attempted in unreliable mode.
func ( *DataChannel) () *uint16 {
	.mu.RLock()
	defer .mu.RUnlock()

	return .maxRetransmits
}

// Protocol represents the name of the sub-protocol used with this
// DataChannel.
func ( *DataChannel) () string {
	.mu.RLock()
	defer .mu.RUnlock()

	return .protocol
}

// Negotiated represents whether this DataChannel was negotiated by the
// application (true), or not (false).
func ( *DataChannel) () bool {
	.mu.RLock()
	defer .mu.RUnlock()

	return .negotiated
}

// ID represents the ID for this DataChannel. The value is initially
// null, which is what will be returned if the ID was not provided at
// channel creation time, and the DTLS role of the SCTP transport has not
// yet been negotiated. Otherwise, it will return the ID that was either
// selected by the script or generated. After the ID is set to a non-null
// value, it will not change.
func ( *DataChannel) () *uint16 {
	.mu.RLock()
	defer .mu.RUnlock()

	return .id
}

// ReadyState represents the state of the DataChannel object.
func ( *DataChannel) () DataChannelState {
	if ,  := .readyState.Load().(DataChannelState);  {
		return 
	}

	return DataChannelState(0)
}

// BufferedAmount represents the number of bytes of application data
// (UTF-8 text and binary data) that have been queued using send(). Even
// though the data transmission can occur in parallel, the returned value
// MUST NOT be decreased before the current task yielded back to the event
// loop to prevent race conditions. The value does not include framing
// overhead incurred by the protocol, or buffering done by the operating
// system or network hardware. The value of BufferedAmount slot will only
// increase with each call to the send() method as long as the ReadyState is
// open; however, BufferedAmount does not reset to zero once the channel
// closes.
func ( *DataChannel) () uint64 {
	.mu.RLock()
	defer .mu.RUnlock()

	if .dataChannel == nil {
		return 0
	}

	return .dataChannel.BufferedAmount()
}

// BufferedAmountLowThreshold represents the threshold at which the
// bufferedAmount is considered to be low. When the bufferedAmount decreases
// from above this threshold to equal or below it, the bufferedamountlow
// event fires. BufferedAmountLowThreshold is initially zero on each new
// DataChannel, but the application may change its value at any time.
// The threshold is set to 0 by default.
func ( *DataChannel) () uint64 {
	.mu.RLock()
	defer .mu.RUnlock()

	if .dataChannel == nil {
		return .bufferedAmountLowThreshold
	}

	return .dataChannel.BufferedAmountLowThreshold()
}

// SetBufferedAmountLowThreshold is used to update the threshold.
// See BufferedAmountLowThreshold().
func ( *DataChannel) ( uint64) {
	.mu.Lock()
	defer .mu.Unlock()

	.bufferedAmountLowThreshold = 

	if .dataChannel != nil {
		.dataChannel.SetBufferedAmountLowThreshold()
	}
}

// OnBufferedAmountLow sets an event handler which is invoked when
// the number of bytes of outgoing data becomes lower than or equal to the
// BufferedAmountLowThreshold.
func ( *DataChannel) ( func()) {
	.mu.Lock()
	defer .mu.Unlock()

	.onBufferedAmountLow = 
	if .dataChannel != nil {
		.dataChannel.OnBufferedAmountLow()
	}
}

func ( *DataChannel) () string {
	.mu.Lock()
	defer .mu.Unlock()

	return .statsID
}

func ( *DataChannel) ( *statsReportCollector) {
	.Collecting()

	.mu.Lock()
	defer .mu.Unlock()

	 := DataChannelStats{
		Timestamp: statsTimestampNow(),
		Type:      StatsTypeDataChannel,
		ID:        .statsID,
		Label:     .label,
		Protocol:  .protocol,
		// TransportID string `json:"transportId"`
		State: .ReadyState(),
	}

	if .id != nil {
		.DataChannelIdentifier = int32(*.id)
	}

	if .dataChannel != nil {
		.MessagesSent = .dataChannel.MessagesSent()
		.BytesSent = .dataChannel.BytesSent()
		.MessagesReceived = .dataChannel.MessagesReceived()
		.BytesReceived = .dataChannel.BytesReceived()
	}

	.Collect(.ID, )
}

func ( *DataChannel) ( DataChannelState) {
	.readyState.Store()
}