// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package datachannel implements WebRTC Data Channels
package datachannel import ( ) const receiveMTU = 8192 // Reader is an extended io.Reader // that also returns if the message is text. type Reader interface { ReadDataChannel([]byte) (int, bool, error) } // ReadDeadliner extends an io.Reader to expose setting a read deadline. type ReadDeadliner interface { SetReadDeadline(time.Time) error } // Writer is an extended io.Writer // that also allows indicating if a message is text. type Writer interface { WriteDataChannel([]byte, bool) (int, error) } // WriteDeadliner extends an io.Writer to expose setting a write deadline. type WriteDeadliner interface { SetWriteDeadline(time.Time) error } // ReadWriteCloser is an extended io.ReadWriteCloser // that also implements our Reader and Writer. type ReadWriteCloser interface { io.Reader io.Writer Reader Writer io.Closer } // ReadWriteCloserDeadliner is an extended ReadWriteCloser // that also implements r/w deadline. type ReadWriteCloserDeadliner interface { ReadWriteCloser ReadDeadliner WriteDeadliner } // DataChannel represents a data channel type DataChannel struct { Config // stats messagesSent uint32 messagesReceived uint32 bytesSent uint64 bytesReceived uint64 mu sync.Mutex onOpenCompleteHandler func() openCompleteHandlerOnce sync.Once stream *sctp.Stream log logging.LeveledLogger } // Config is used to configure the data channel. type Config struct { ChannelType ChannelType Negotiated bool Priority uint16 ReliabilityParameter uint32 Label string Protocol string LoggerFactory logging.LoggerFactory } func newDataChannel( *sctp.Stream, *Config) *DataChannel { return &DataChannel{ Config: *, stream: , log: .LoggerFactory.NewLogger("datachannel"), } } // Dial opens a data channels over SCTP func ( *sctp.Association, uint16, *Config) (*DataChannel, error) { , := .OpenStream(, sctp.PayloadTypeWebRTCBinary) if != nil { return nil, } , := Client(, ) if != nil { return nil, } return , nil } // Client opens a data channel over an SCTP stream func ( *sctp.Stream, *Config) (*DataChannel, error) { := &channelOpen{ ChannelType: .ChannelType, Priority: .Priority, ReliabilityParameter: .ReliabilityParameter, Label: []byte(.Label), Protocol: []byte(.Protocol), } if !.Negotiated { , := .Marshal() if != nil { return nil, fmt.Errorf("failed to marshal ChannelOpen %w", ) } if _, = .WriteSCTP(, sctp.PayloadTypeWebRTCDCEP); != nil { return nil, fmt.Errorf("failed to send ChannelOpen %w", ) } } return newDataChannel(, ), nil } // Accept is used to accept incoming data channels over SCTP func ( *sctp.Association, *Config, ...*DataChannel) (*DataChannel, error) { , := .AcceptStream() if != nil { return nil, } for , := range { if .StreamIdentifier() == .StreamIdentifier() { .stream.SetDefaultPayloadType(sctp.PayloadTypeWebRTCBinary) return , nil } } .SetDefaultPayloadType(sctp.PayloadTypeWebRTCBinary) , := Server(, ) if != nil { return nil, } return , nil } // Server accepts a data channel over an SCTP stream func ( *sctp.Stream, *Config) (*DataChannel, error) { := make([]byte, receiveMTU) , , := .ReadSCTP() if != nil { return nil, } if != sctp.PayloadTypeWebRTCDCEP { return nil, fmt.Errorf("%w %s", ErrInvalidPayloadProtocolIdentifier, ) } , := parseExpectDataChannelOpen([:]) if != nil { return nil, fmt.Errorf("failed to parse DataChannelOpen packet %w", ) } .ChannelType = .ChannelType .Priority = .Priority .ReliabilityParameter = .ReliabilityParameter .Label = string(.Label) .Protocol = string(.Protocol) := newDataChannel(, ) = .writeDataChannelAck() if != nil { return nil, } = .commitReliabilityParams() if != nil { return nil, } return , nil } // Read reads a packet of len(p) bytes as binary data func ( *DataChannel) ( []byte) (int, error) { , , := .ReadDataChannel() return , } // ReadDataChannel reads a packet of len(p) bytes func ( *DataChannel) ( []byte) (int, bool, error) { for { , , := .stream.ReadSCTP() if errors.Is(, io.EOF) { // When the peer sees that an incoming stream was // reset, it also resets its corresponding outgoing stream. if := .stream.Close(); != nil { return 0, false, } } if != nil { return 0, false, } if == sctp.PayloadTypeWebRTCDCEP { if = .handleDCEP([:]); != nil { .log.Errorf("Failed to handle DCEP: %s", .Error()) } continue } else if == sctp.PayloadTypeWebRTCBinaryEmpty || == sctp.PayloadTypeWebRTCStringEmpty { = 0 } atomic.AddUint32(&.messagesReceived, 1) atomic.AddUint64(&.bytesReceived, uint64()) := == sctp.PayloadTypeWebRTCString || == sctp.PayloadTypeWebRTCStringEmpty return , , } } // SetReadDeadline sets a deadline for reads to return func ( *DataChannel) ( time.Time) error { return .stream.SetReadDeadline() } // SetWriteDeadline sets a deadline for writes to return, // only available if the BlockWrite is enabled for sctp func ( *DataChannel) ( time.Time) error { return .stream.SetWriteDeadline() } // MessagesSent returns the number of messages sent func ( *DataChannel) () uint32 { return atomic.LoadUint32(&.messagesSent) } // MessagesReceived returns the number of messages received func ( *DataChannel) () uint32 { return atomic.LoadUint32(&.messagesReceived) } // OnOpen sets an event handler which is invoked when // a DATA_CHANNEL_ACK message is received. // The handler is called only on thefor the channel opened // https://datatracker.ietf.org/doc/html/draft-ietf-rtcweb-data-protocol-09#section-5.2 func ( *DataChannel) ( func()) { .mu.Lock() .openCompleteHandlerOnce = sync.Once{} .onOpenCompleteHandler = .mu.Unlock() } func ( *DataChannel) () { .mu.Lock() := .onOpenCompleteHandler .mu.Unlock() if != nil { go .openCompleteHandlerOnce.Do(func() { () }) } } // BytesSent returns the number of bytes sent func ( *DataChannel) () uint64 { return atomic.LoadUint64(&.bytesSent) } // BytesReceived returns the number of bytes received func ( *DataChannel) () uint64 { return atomic.LoadUint64(&.bytesReceived) } // StreamIdentifier returns the Stream identifier associated to the stream. func ( *DataChannel) () uint16 { return .stream.StreamIdentifier() } func ( *DataChannel) ( []byte) error { , := parse() if != nil { return fmt.Errorf("failed to parse DataChannel packet %w", ) } switch msg := .(type) { case *channelAck: if := .commitReliabilityParams(); != nil { return } .onOpenComplete() default: return fmt.Errorf("%w, wanted ACK got %v", ErrUnexpectedDataChannelType, ) } return nil } // Write writes len(p) bytes from p as binary data func ( *DataChannel) ( []byte) ( int, error) { return .WriteDataChannel(, false) } // WriteDataChannel writes len(p) bytes from p func ( *DataChannel) ( []byte, bool) ( int, error) { // https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-12#section-6.6 // SCTP does not support the sending of empty user messages. Therefore, // if an empty message has to be sent, the appropriate PPID (WebRTC // String Empty or WebRTC Binary Empty) is used and the SCTP user // message of one zero byte is sent. When receiving an SCTP user // message with one of these PPIDs, the receiver MUST ignore the SCTP // user message and process it as an empty message. var sctp.PayloadProtocolIdentifier switch { case ! && len() > 0: = sctp.PayloadTypeWebRTCBinary case ! && len() == 0: = sctp.PayloadTypeWebRTCBinaryEmpty case && len() > 0: = sctp.PayloadTypeWebRTCString case && len() == 0: = sctp.PayloadTypeWebRTCStringEmpty } atomic.AddUint32(&.messagesSent, 1) atomic.AddUint64(&.bytesSent, uint64(len())) if len() == 0 { , := .stream.WriteSCTP([]byte{0}, ) return 0, } return .stream.WriteSCTP(, ) } func ( *DataChannel) () error { := channelAck{} , := .Marshal() if != nil { return fmt.Errorf("failed to marshal ChannelOpen ACK: %w", ) } if _, = .stream.WriteSCTP(, sctp.PayloadTypeWebRTCDCEP); != nil { return fmt.Errorf("failed to send ChannelOpen ACK: %w", ) } return } // Close closes the DataChannel and the underlying SCTP stream. func ( *DataChannel) () error { // https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.7 // Closing of a data channel MUST be signaled by resetting the // corresponding outgoing streams [RFC6525]. This means that if one // side decides to close the data channel, it resets the corresponding // outgoing stream. When the peer sees that an incoming stream was // reset, it also resets its corresponding outgoing stream. Once this // is completed, the data channel is closed. Resetting a stream sets // the Stream Sequence Numbers (SSNs) of the stream back to 'zero' with // a corresponding notification to the application layer that the reset // has been performed. Streams are available for reuse after a reset // has been performed. return .stream.Close() } // BufferedAmount returns the number of bytes of data currently queued to be // sent over this stream. func ( *DataChannel) () uint64 { return .stream.BufferedAmount() } // BufferedAmountLowThreshold returns the number of bytes of buffered outgoing // data that is considered "low." Defaults to 0. func ( *DataChannel) () uint64 { return .stream.BufferedAmountLowThreshold() } // SetBufferedAmountLowThreshold is used to update the threshold. // See BufferedAmountLowThreshold(). func ( *DataChannel) ( uint64) { .stream.SetBufferedAmountLowThreshold() } // OnBufferedAmountLow sets the callback handler which would be called when the // number of bytes of outgoing data buffered is lower than the threshold. func ( *DataChannel) ( func()) { .stream.OnBufferedAmountLow() } func ( *DataChannel) () error { switch .Config.ChannelType { case ChannelTypeReliable: .stream.SetReliabilityParams(false, sctp.ReliabilityTypeReliable, .Config.ReliabilityParameter) case ChannelTypeReliableUnordered: .stream.SetReliabilityParams(true, sctp.ReliabilityTypeReliable, .Config.ReliabilityParameter) case ChannelTypePartialReliableRexmit: .stream.SetReliabilityParams(false, sctp.ReliabilityTypeRexmit, .Config.ReliabilityParameter) case ChannelTypePartialReliableRexmitUnordered: .stream.SetReliabilityParams(true, sctp.ReliabilityTypeRexmit, .Config.ReliabilityParameter) case ChannelTypePartialReliableTimed: .stream.SetReliabilityParams(false, sctp.ReliabilityTypeTimed, .Config.ReliabilityParameter) case ChannelTypePartialReliableTimedUnordered: .stream.SetReliabilityParams(true, sctp.ReliabilityTypeTimed, .Config.ReliabilityParameter) default: return fmt.Errorf("%w %v", ErrInvalidChannelType, .Config.ChannelType) } return nil }