package webrtc
import (
"encoding/binary"
"fmt"
"io"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/srtp/v3"
"github.com/pion/webrtc/v4/internal/util"
)
type trackStreams struct {
track *TrackRemote
streamInfo, repairStreamInfo *interceptor .StreamInfo
rtpReadStream *srtp .ReadStreamSRTP
rtpInterceptor interceptor .RTPReader
rtcpReadStream *srtp .ReadStreamSRTCP
rtcpInterceptor interceptor .RTCPReader
repairReadStream *srtp .ReadStreamSRTP
repairInterceptor interceptor .RTPReader
repairStreamChannel chan rtxPacketWithAttributes
repairRtcpReadStream *srtp .ReadStreamSRTCP
repairRtcpInterceptor interceptor .RTCPReader
}
type rtxPacketWithAttributes struct {
pkt []byte
attributes interceptor .Attributes
pool *sync .Pool
}
func (p *rtxPacketWithAttributes ) release () {
if p .pkt != nil {
b := p .pkt [:cap (p .pkt )]
p .pool .Put (b )
p .pkt = nil
}
}
type RTPReceiver struct {
kind RTPCodecType
transport *DTLSTransport
tracks []trackStreams
closed, received chan interface {}
mu sync .RWMutex
tr *RTPTransceiver
api *API
rtxPool sync .Pool
}
func (api *API ) NewRTPReceiver (kind RTPCodecType , transport *DTLSTransport ) (*RTPReceiver , error ) {
if transport == nil {
return nil , errRTPReceiverDTLSTransportNil
}
r := &RTPReceiver {
kind : kind ,
transport : transport ,
api : api ,
closed : make (chan interface {}),
received : make (chan interface {}),
tracks : []trackStreams {},
rtxPool : sync .Pool {New : func () interface {} {
return make ([]byte , api .settingEngine .getReceiveMTU ())
}},
}
return r , nil
}
func (r *RTPReceiver ) setRTPTransceiver (tr *RTPTransceiver ) {
r .mu .Lock ()
defer r .mu .Unlock ()
r .tr = tr
}
func (r *RTPReceiver ) Transport () *DTLSTransport {
r .mu .RLock ()
defer r .mu .RUnlock ()
return r .transport
}
func (r *RTPReceiver ) getParameters () RTPParameters {
parameters := r .api .mediaEngine .getRTPParametersByKind (
r .kind ,
[]RTPTransceiverDirection {RTPTransceiverDirectionRecvonly },
)
if r .tr != nil {
parameters .Codecs = r .tr .getCodecs ()
}
return parameters
}
func (r *RTPReceiver ) GetParameters () RTPParameters {
r .mu .RLock ()
defer r .mu .RUnlock ()
return r .getParameters ()
}
func (r *RTPReceiver ) Track () *TrackRemote {
r .mu .RLock ()
defer r .mu .RUnlock ()
if len (r .tracks ) != 1 {
return nil
}
return r .tracks [0 ].track
}
func (r *RTPReceiver ) Tracks () []*TrackRemote {
r .mu .RLock ()
defer r .mu .RUnlock ()
var tracks []*TrackRemote
for i := range r .tracks {
tracks = append (tracks , r .tracks [i ].track )
}
return tracks
}
func (r *RTPReceiver ) RTPTransceiver () *RTPTransceiver {
r .mu .Lock ()
defer r .mu .Unlock ()
return r .tr
}
func (r *RTPReceiver ) configureReceive (parameters RTPReceiveParameters ) {
r .mu .Lock ()
defer r .mu .Unlock ()
for i := range parameters .Encodings {
t := trackStreams {
track : newTrackRemote (
r .kind ,
parameters .Encodings [i ].SSRC ,
parameters .Encodings [i ].RTX .SSRC ,
parameters .Encodings [i ].RID ,
r ,
),
}
r .tracks = append (r .tracks , t )
}
}
func (r *RTPReceiver ) startReceive (parameters RTPReceiveParameters ) error {
r .mu .Lock ()
defer r .mu .Unlock ()
select {
case <- r .received :
return errRTPReceiverReceiveAlreadyCalled
default :
}
globalParams := r .getParameters ()
codec := RTPCodecCapability {}
if len (globalParams .Codecs ) != 0 {
codec = globalParams .Codecs [0 ].RTPCodecCapability
}
for i := range parameters .Encodings {
if parameters .Encodings [i ].RID != "" {
continue
}
var streams *trackStreams
for idx , ts := range r .tracks {
if ts .track != nil && ts .track .SSRC () == parameters .Encodings [i ].SSRC {
streams = &r .tracks [idx ]
break
}
}
if streams == nil {
return fmt .Errorf ("%w: %d" , errRTPReceiverWithSSRCTrackStreamNotFound , parameters .Encodings [i ].SSRC )
}
streams .streamInfo = createStreamInfo (
"" ,
parameters .Encodings [i ].SSRC ,
0 , 0 , 0 , 0 , 0 ,
codec ,
globalParams .HeaderExtensions ,
)
var err error
if streams .rtpReadStream , streams .rtpInterceptor , streams .rtcpReadStream , streams .rtcpInterceptor , err = r .transport .streamsForSSRC (parameters .Encodings [i ].SSRC , *streams .streamInfo ); err != nil {
return err
}
if rtxSsrc := parameters .Encodings [i ].RTX .SSRC ; rtxSsrc != 0 {
streamInfo := createStreamInfo ("" , rtxSsrc , 0 , 0 , 0 , 0 , 0 , codec , globalParams .HeaderExtensions )
rtpReadStream , rtpInterceptor , rtcpReadStream , rtcpInterceptor , err := r .transport .streamsForSSRC (
rtxSsrc ,
*streamInfo ,
)
if err != nil {
return err
}
if err = r .receiveForRtx (
rtxSsrc ,
"" ,
streamInfo ,
rtpReadStream ,
rtpInterceptor ,
rtcpReadStream ,
rtcpInterceptor ,
); err != nil {
return err
}
}
}
close (r .received )
return nil
}
func (r *RTPReceiver ) Receive (parameters RTPReceiveParameters ) error {
r .configureReceive (parameters )
return r .startReceive (parameters )
}
func (r *RTPReceiver ) Read (b []byte ) (n int , a interceptor .Attributes , err error ) {
select {
case <- r .received :
return r .tracks [0 ].rtcpInterceptor .Read (b , a )
case <- r .closed :
return 0 , nil , io .ErrClosedPipe
}
}
func (r *RTPReceiver ) ReadSimulcast (b []byte , rid string ) (n int , a interceptor .Attributes , err error ) {
select {
case <- r .received :
var rtcpInterceptor interceptor .RTCPReader
r .mu .Lock ()
for _ , t := range r .tracks {
if t .track != nil && t .track .rid == rid {
rtcpInterceptor = t .rtcpInterceptor
}
}
r .mu .Unlock ()
if rtcpInterceptor == nil {
return 0 , nil , fmt .Errorf ("%w: %s" , errRTPReceiverForRIDTrackStreamNotFound , rid )
}
return rtcpInterceptor .Read (b , a )
case <- r .closed :
return 0 , nil , io .ErrClosedPipe
}
}
func (r *RTPReceiver ) ReadRTCP () ([]rtcp .Packet , interceptor .Attributes , error ) {
b := make ([]byte , r .api .settingEngine .getReceiveMTU ())
i , attributes , err := r .Read (b )
if err != nil {
return nil , nil , err
}
pkts , err := rtcp .Unmarshal (b [:i ])
if err != nil {
return nil , nil , err
}
return pkts , attributes , nil
}
func (r *RTPReceiver ) ReadSimulcastRTCP (rid string ) ([]rtcp .Packet , interceptor .Attributes , error ) {
b := make ([]byte , r .api .settingEngine .getReceiveMTU ())
i , attributes , err := r .ReadSimulcast (b , rid )
if err != nil {
return nil , nil , err
}
pkts , err := rtcp .Unmarshal (b [:i ])
return pkts , attributes , err
}
func (r *RTPReceiver ) haveReceived () bool {
select {
case <- r .received :
return true
default :
return false
}
}
func (r *RTPReceiver ) Stop () error {
r .mu .Lock ()
defer r .mu .Unlock ()
var err error
select {
case <- r .closed :
return err
default :
}
select {
case <- r .received :
for i := range r .tracks {
errs := []error {}
if r .tracks [i ].rtcpReadStream != nil {
errs = append (errs , r .tracks [i ].rtcpReadStream .Close ())
}
if r .tracks [i ].rtpReadStream != nil {
errs = append (errs , r .tracks [i ].rtpReadStream .Close ())
}
if r .tracks [i ].repairReadStream != nil {
errs = append (errs , r .tracks [i ].repairReadStream .Close ())
}
if r .tracks [i ].repairRtcpReadStream != nil {
errs = append (errs , r .tracks [i ].repairRtcpReadStream .Close ())
}
if r .tracks [i ].streamInfo != nil {
r .api .interceptor .UnbindRemoteStream (r .tracks [i ].streamInfo )
}
if r .tracks [i ].repairStreamInfo != nil {
r .api .interceptor .UnbindRemoteStream (r .tracks [i ].repairStreamInfo )
}
err = util .FlattenErrs (errs )
}
default :
}
close (r .closed )
return err
}
func (r *RTPReceiver ) streamsForTrack (t *TrackRemote ) *trackStreams {
for i := range r .tracks {
if r .tracks [i ].track == t {
return &r .tracks [i ]
}
}
return nil
}
func (r *RTPReceiver ) readRTP (b []byte , reader *TrackRemote ) (n int , a interceptor .Attributes , err error ) {
select {
case <- r .received :
case <- r .closed :
return 0 , nil , io .EOF
}
if t := r .streamsForTrack (reader ); t != nil {
return t .rtpInterceptor .Read (b , a )
}
return 0 , nil , fmt .Errorf ("%w: %d" , errRTPReceiverWithSSRCTrackStreamNotFound , reader .SSRC ())
}
func (r *RTPReceiver ) receiveForRid (
rid string ,
params RTPParameters ,
streamInfo *interceptor .StreamInfo ,
rtpReadStream *srtp .ReadStreamSRTP ,
rtpInterceptor interceptor .RTPReader ,
rtcpReadStream *srtp .ReadStreamSRTCP ,
rtcpInterceptor interceptor .RTCPReader ,
) (*TrackRemote , error ) {
r .mu .Lock ()
defer r .mu .Unlock ()
for i := range r .tracks {
if r .tracks [i ].track .RID () == rid {
r .tracks [i ].track .mu .Lock ()
r .tracks [i ].track .kind = r .kind
r .tracks [i ].track .codec = params .Codecs [0 ]
r .tracks [i ].track .params = params
r .tracks [i ].track .ssrc = SSRC (streamInfo .SSRC )
r .tracks [i ].track .mu .Unlock ()
r .tracks [i ].streamInfo = streamInfo
r .tracks [i ].rtpReadStream = rtpReadStream
r .tracks [i ].rtpInterceptor = rtpInterceptor
r .tracks [i ].rtcpReadStream = rtcpReadStream
r .tracks [i ].rtcpInterceptor = rtcpInterceptor
return r .tracks [i ].track , nil
}
}
return nil , fmt .Errorf ("%w: %s" , errRTPReceiverForRIDTrackStreamNotFound , rid )
}
func (r *RTPReceiver ) receiveForRtx (
ssrc SSRC ,
rsid string ,
streamInfo *interceptor .StreamInfo ,
rtpReadStream *srtp .ReadStreamSRTP ,
rtpInterceptor interceptor .RTPReader ,
rtcpReadStream *srtp .ReadStreamSRTCP ,
rtcpInterceptor interceptor .RTCPReader ,
) error {
var track *trackStreams
if ssrc != 0 && len (r .tracks ) == 1 {
track = &r .tracks [0 ]
} else {
for i := range r .tracks {
if r .tracks [i ].track .RID () == rsid {
track = &r .tracks [i ]
if track .track .RtxSSRC () == 0 {
track .track .setRtxSSRC (SSRC (streamInfo .SSRC ))
}
break
}
}
}
if track == nil {
return fmt .Errorf ("%w: ssrc(%d) rsid(%s)" , errRTPReceiverForRIDTrackStreamNotFound , ssrc , rsid )
}
track .repairStreamInfo = streamInfo
track .repairReadStream = rtpReadStream
track .repairInterceptor = rtpInterceptor
track .repairRtcpReadStream = rtcpReadStream
track .repairRtcpInterceptor = rtcpInterceptor
track .repairStreamChannel = make (chan rtxPacketWithAttributes , 50 )
go func () {
for {
b := r .rtxPool .Get ().([]byte )
i , attributes , err := track .repairInterceptor .Read (b , nil )
if err != nil {
r .rtxPool .Put (b )
return
}
hasExtension := b [0 ]&0b10000 > 0
hasPadding := b [0 ]&0b100000 > 0
csrcCount := b [0 ] & 0b1111
headerLength := uint16 (12 + (4 * csrcCount ))
paddingLength := 0
if hasExtension {
headerLength += 4 * (1 + binary .BigEndian .Uint16 (b [headerLength +2 :headerLength +4 ]))
}
if hasPadding {
paddingLength = int (b [i -1 ])
}
if i -int (headerLength )-paddingLength < 2 {
r .rtxPool .Put (b )
continue
}
if attributes == nil {
attributes = make (interceptor .Attributes )
}
attributes .Set (AttributeRtxPayloadType , b [1 ]&0x7F )
attributes .Set (AttributeRtxSequenceNumber , binary .BigEndian .Uint16 (b [2 :4 ]))
attributes .Set (AttributeRtxSsrc , binary .BigEndian .Uint32 (b [8 :12 ]))
b [1 ] = (b [1 ] & 0x80 ) | uint8 (track .track .PayloadType ())
b [2 ] = b [headerLength ]
b [3 ] = b [headerLength +1 ]
binary .BigEndian .PutUint32 (b [8 :12 ], uint32 (track .track .SSRC ()))
copy (b [headerLength :i -2 ], b [headerLength +2 :i ])
select {
case <- r .closed :
r .rtxPool .Put (b )
return
case track .repairStreamChannel <- rtxPacketWithAttributes {pkt : b [:i -2 ], attributes : attributes , pool : &r .rtxPool }:
default :
}
}
}()
return nil
}
func (r *RTPReceiver ) SetReadDeadline (t time .Time ) error {
r .mu .RLock ()
defer r .mu .RUnlock ()
return r .tracks [0 ].rtcpReadStream .SetReadDeadline (t )
}
func (r *RTPReceiver ) SetReadDeadlineSimulcast (deadline time .Time , rid string ) error {
r .mu .RLock ()
defer r .mu .RUnlock ()
for _ , t := range r .tracks {
if t .track != nil && t .track .rid == rid {
return t .rtcpReadStream .SetReadDeadline (deadline )
}
}
return fmt .Errorf ("%w: %s" , errRTPReceiverForRIDTrackStreamNotFound , rid )
}
func (r *RTPReceiver ) setRTPReadDeadline (deadline time .Time , reader *TrackRemote ) error {
r .mu .RLock ()
defer r .mu .RUnlock ()
if t := r .streamsForTrack (reader ); t != nil {
return t .rtpReadStream .SetReadDeadline (deadline )
}
return fmt .Errorf ("%w: %d" , errRTPReceiverWithSSRCTrackStreamNotFound , reader .SSRC ())
}
func (r *RTPReceiver ) readRTX (reader *TrackRemote ) *rtxPacketWithAttributes {
if !reader .HasRTX () {
return nil
}
select {
case <- r .received :
default :
return nil
}
if t := r .streamsForTrack (reader ); t != nil {
select {
case rtxPacketReceived := <- t .repairStreamChannel :
return &rtxPacketReceived
default :
}
}
return nil
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .