package webrtc
import (
"fmt"
"io"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/randutil"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4/internal/util"
)
type trackEncoding struct {
track TrackLocal
srtpStream *srtpWriterFuture
rtcpInterceptor interceptor .RTCPReader
streamInfo interceptor .StreamInfo
context *baseTrackLocalContext
ssrc, ssrcRTX, ssrcFEC SSRC
}
type RTPSender struct {
trackEncodings []*trackEncoding
transport *DTLSTransport
payloadType PayloadType
kind RTPCodecType
negotiated bool
api *API
id string
rtpTransceiver *RTPTransceiver
mu sync .RWMutex
sendCalled, stopCalled chan struct {}
}
func (api *API ) NewRTPSender (track TrackLocal , transport *DTLSTransport ) (*RTPSender , error ) {
if track == nil {
return nil , errRTPSenderTrackNil
} else if transport == nil {
return nil , errRTPSenderDTLSTransportNil
}
id , err := randutil .GenerateCryptoRandomString (32 , "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" )
if err != nil {
return nil , err
}
r := &RTPSender {
transport : transport ,
api : api ,
sendCalled : make (chan struct {}),
stopCalled : make (chan struct {}),
id : id ,
kind : track .Kind (),
}
r .addEncoding (track )
return r , nil
}
func (r *RTPSender ) isNegotiated () bool {
r .mu .RLock ()
defer r .mu .RUnlock ()
return r .negotiated
}
func (r *RTPSender ) setNegotiated () {
r .mu .Lock ()
defer r .mu .Unlock ()
r .negotiated = true
}
func (r *RTPSender ) setRTPTransceiver (rtpTransceiver *RTPTransceiver ) {
r .mu .Lock ()
defer r .mu .Unlock ()
r .rtpTransceiver = rtpTransceiver
}
func (r *RTPSender ) Transport () *DTLSTransport {
r .mu .RLock ()
defer r .mu .RUnlock ()
return r .transport
}
func (r *RTPSender ) GetParameters () RTPSendParameters {
r .mu .RLock ()
defer r .mu .RUnlock ()
var encodings []RTPEncodingParameters
for _ , trackEncoding := range r .trackEncodings {
var rid string
if trackEncoding .track != nil {
rid = trackEncoding .track .RID ()
}
encodings = append (encodings , RTPEncodingParameters {
RTPCodingParameters : RTPCodingParameters {
RID : rid ,
SSRC : trackEncoding .ssrc ,
RTX : RTPRtxParameters {SSRC : trackEncoding .ssrcRTX },
FEC : RTPFecParameters {SSRC : trackEncoding .ssrcFEC },
PayloadType : r .payloadType ,
},
})
}
sendParameters := RTPSendParameters {
RTPParameters : r .api .mediaEngine .getRTPParametersByKind (
r .kind ,
[]RTPTransceiverDirection {RTPTransceiverDirectionSendonly },
),
Encodings : encodings ,
}
if r .rtpTransceiver != nil {
sendParameters .Codecs = r .rtpTransceiver .getCodecs ()
} else {
sendParameters .Codecs = r .api .mediaEngine .getCodecsByKind (r .kind )
}
return sendParameters
}
func (r *RTPSender ) AddEncoding (track TrackLocal ) error {
r .mu .Lock ()
defer r .mu .Unlock ()
if track == nil {
return errRTPSenderTrackNil
}
if track .RID () == "" {
return errRTPSenderRidNil
}
if r .hasStopped () {
return errRTPSenderStopped
}
if r .hasSent () {
return errRTPSenderSendAlreadyCalled
}
var refTrack TrackLocal
if len (r .trackEncodings ) != 0 {
refTrack = r .trackEncodings [0 ].track
}
if refTrack == nil || refTrack .RID () == "" {
return errRTPSenderNoBaseEncoding
}
if refTrack .ID () != track .ID () || refTrack .StreamID () != track .StreamID () || refTrack .Kind () != track .Kind () {
return errRTPSenderBaseEncodingMismatch
}
for _ , encoding := range r .trackEncodings {
if encoding .track == nil {
continue
}
if encoding .track .RID () == track .RID () {
return errRTPSenderRIDCollision
}
}
r .addEncoding (track )
return nil
}
func (r *RTPSender ) addEncoding (track TrackLocal ) {
trackEncoding := &trackEncoding {
track : track ,
ssrc : SSRC (util .RandUint32 ()),
}
if r .api .mediaEngine .isRTXEnabled (r .kind , []RTPTransceiverDirection {RTPTransceiverDirectionSendonly }) {
trackEncoding .ssrcRTX = SSRC (util .RandUint32 ())
}
if r .api .mediaEngine .isFECEnabled (r .kind , []RTPTransceiverDirection {RTPTransceiverDirectionSendonly }) {
trackEncoding .ssrcFEC = SSRC (util .RandUint32 ())
}
r .trackEncodings = append (r .trackEncodings , trackEncoding )
}
func (r *RTPSender ) Track () TrackLocal {
r .mu .RLock ()
defer r .mu .RUnlock ()
if len (r .trackEncodings ) == 0 {
return nil
}
return r .trackEncodings [0 ].track
}
func (r *RTPSender ) ReplaceTrack (track TrackLocal ) error {
r .mu .Lock ()
defer r .mu .Unlock ()
if track != nil && r .kind != track .Kind () {
return ErrRTPSenderNewTrackHasIncorrectKind
}
if track != nil && len (r .trackEncodings ) > 1 {
return ErrRTPSenderNewTrackHasIncorrectEnvelope
}
var replacedTrack TrackLocal
var context *baseTrackLocalContext
for _ , e := range r .trackEncodings {
replacedTrack = e .track
context = e .context
if r .hasSent () && replacedTrack != nil {
if err := replacedTrack .Unbind (context ); err != nil {
return err
}
}
if !r .hasSent () || track == nil {
e .track = track
}
}
if !r .hasSent () || track == nil {
return nil
}
params := r .api .mediaEngine .getRTPParametersByKind (
track .Kind (),
[]RTPTransceiverDirection {RTPTransceiverDirectionSendonly },
)
codec , err := track .Bind (&baseTrackLocalContext {
id : context .ID (),
params : params ,
ssrc : context .SSRC (),
ssrcRTX : context .SSRCRetransmission (),
ssrcFEC : context .SSRCForwardErrorCorrection (),
writeStream : context .WriteStream (),
rtcpInterceptor : context .RTCPReader (),
})
if err != nil {
if _ , reBindErr := replacedTrack .Bind (context ); reBindErr != nil {
return reBindErr
}
return err
}
if r .payloadType != codec .PayloadType {
context .params .Codecs = []RTPCodecParameters {codec }
}
r .trackEncodings [0 ].track = track
return nil
}
func (r *RTPSender ) Send (parameters RTPSendParameters ) error {
r .mu .Lock ()
defer r .mu .Unlock ()
switch {
case r .hasSent ():
return errRTPSenderSendAlreadyCalled
case r .trackEncodings [0 ].track == nil :
return errRTPSenderTrackRemoved
}
for idx := range r .trackEncodings {
trackEncoding := r .trackEncodings [idx ]
srtpStream := &srtpWriterFuture {ssrc : parameters .Encodings [idx ].SSRC , rtpSender : r }
writeStream := &interceptorToTrackLocalWriter {}
rtpParameters := r .api .mediaEngine .getRTPParametersByKind (
trackEncoding .track .Kind (),
[]RTPTransceiverDirection {RTPTransceiverDirectionSendonly },
)
trackEncoding .srtpStream = srtpStream
trackEncoding .ssrc = parameters .Encodings [idx ].SSRC
trackEncoding .ssrcRTX = parameters .Encodings [idx ].RTX .SSRC
trackEncoding .ssrcFEC = parameters .Encodings [idx ].FEC .SSRC
trackEncoding .rtcpInterceptor = r .api .interceptor .BindRTCPReader (
interceptor .RTCPReaderFunc (
func (in []byte , a interceptor .Attributes ) (n int , attributes interceptor .Attributes , err error ) {
n , err = trackEncoding .srtpStream .Read (in )
return n , a , err
},
),
)
trackEncoding .context = &baseTrackLocalContext {
id : r .id ,
params : rtpParameters ,
ssrc : parameters .Encodings [idx ].SSRC ,
ssrcFEC : parameters .Encodings [idx ].FEC .SSRC ,
ssrcRTX : parameters .Encodings [idx ].RTX .SSRC ,
writeStream : writeStream ,
rtcpInterceptor : trackEncoding .rtcpInterceptor ,
}
codec , err := trackEncoding .track .Bind (trackEncoding .context )
if err != nil {
return err
}
trackEncoding .context .params .Codecs = []RTPCodecParameters {codec }
trackEncoding .streamInfo = *createStreamInfo (
r .id ,
parameters .Encodings [idx ].SSRC ,
parameters .Encodings [idx ].RTX .SSRC ,
parameters .Encodings [idx ].FEC .SSRC ,
codec .PayloadType ,
findRTXPayloadType (codec .PayloadType , rtpParameters .Codecs ),
findFECPayloadType (rtpParameters .Codecs ),
codec .RTPCodecCapability ,
parameters .HeaderExtensions ,
)
rtpInterceptor := r .api .interceptor .BindLocalStream (
&trackEncoding .streamInfo ,
interceptor .RTPWriterFunc (func (header *rtp .Header , payload []byte , _ interceptor .Attributes ) (int , error ) {
return srtpStream .WriteRTP (header , payload )
}),
)
writeStream .interceptor .Store (rtpInterceptor )
}
close (r .sendCalled )
return nil
}
func (r *RTPSender ) Stop () error {
r .mu .Lock ()
if stopped := r .hasStopped (); stopped {
r .mu .Unlock ()
return nil
}
close (r .stopCalled )
r .mu .Unlock ()
if !r .hasSent () {
return nil
}
if err := r .ReplaceTrack (nil ); err != nil {
return err
}
errs := []error {}
for _ , trackEncoding := range r .trackEncodings {
r .api .interceptor .UnbindLocalStream (&trackEncoding .streamInfo )
if trackEncoding .srtpStream != nil {
errs = append (errs , trackEncoding .srtpStream .Close ())
}
}
return util .FlattenErrs (errs )
}
func (r *RTPSender ) Read (b []byte ) (n int , a interceptor .Attributes , err error ) {
select {
case <- r .sendCalled :
return r .trackEncodings [0 ].rtcpInterceptor .Read (b , a )
case <- r .stopCalled :
return 0 , nil , io .ErrClosedPipe
}
}
func (r *RTPSender ) ReadRTCP () ([]rtcp .Packet , interceptor .Attributes , error ) {
b := make ([]byte , r .api .settingEngine .getReceiveMTU ())
i , attributes , err := r .Read (b )
if err != nil {
return nil , nil , err
}
pkts , err := rtcp .Unmarshal (b [:i ])
if err != nil {
return nil , nil , err
}
return pkts , attributes , nil
}
func (r *RTPSender ) ReadSimulcast (b []byte , rid string ) (n int , a interceptor .Attributes , err error ) {
select {
case <- r .sendCalled :
r .mu .Lock ()
for _ , t := range r .trackEncodings {
if t .track != nil && t .track .RID () == rid {
reader := t .rtcpInterceptor
r .mu .Unlock ()
return reader .Read (b , a )
}
}
r .mu .Unlock ()
return 0 , nil , fmt .Errorf ("%w: %s" , errRTPSenderNoTrackForRID , rid )
case <- r .stopCalled :
return 0 , nil , io .ErrClosedPipe
}
}
func (r *RTPSender ) ReadSimulcastRTCP (rid string ) ([]rtcp .Packet , interceptor .Attributes , error ) {
b := make ([]byte , r .api .settingEngine .getReceiveMTU ())
i , attributes , err := r .ReadSimulcast (b , rid )
if err != nil {
return nil , nil , err
}
pkts , err := rtcp .Unmarshal (b [:i ])
return pkts , attributes , err
}
func (r *RTPSender ) SetReadDeadline (t time .Time ) error {
return r .trackEncodings [0 ].srtpStream .SetReadDeadline (t )
}
func (r *RTPSender ) SetReadDeadlineSimulcast (deadline time .Time , rid string ) error {
r .mu .RLock ()
defer r .mu .RUnlock ()
for _ , t := range r .trackEncodings {
if t .track != nil && t .track .RID () == rid {
return t .srtpStream .SetReadDeadline (deadline )
}
}
return fmt .Errorf ("%w: %s" , errRTPSenderNoTrackForRID , rid )
}
func (r *RTPSender ) hasSent () bool {
select {
case <- r .sendCalled :
return true
default :
return false
}
}
func (r *RTPSender ) hasStopped () bool {
select {
case <- r .stopCalled :
return true
default :
return false
}
}
func (r *RTPSender ) configureRTXAndFEC () {
r .mu .RLock ()
defer r .mu .RUnlock ()
for _ , trackEncoding := range r .trackEncodings {
if !r .api .mediaEngine .isRTXEnabled (r .kind , []RTPTransceiverDirection {RTPTransceiverDirectionSendonly }) {
trackEncoding .ssrcRTX = SSRC (0 )
}
if !r .api .mediaEngine .isFECEnabled (r .kind , []RTPTransceiverDirection {RTPTransceiverDirectionSendonly }) {
trackEncoding .ssrcFEC = SSRC (0 )
}
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .