package twcc
import (
"errors"
"math/rand"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtp"
)
type SenderInterceptorFactory struct {
opts []Option
}
var errClosed = errors .New ("interceptor is closed" )
func (s *SenderInterceptorFactory ) NewInterceptor (_ string ) (interceptor .Interceptor , error ) {
senderInterceptor := &SenderInterceptor {
log : logging .NewDefaultLoggerFactory ().NewLogger ("twcc_sender_interceptor" ),
packetChan : make (chan packet ),
close : make (chan struct {}),
interval : 100 * time .Millisecond ,
startTime : time .Now (),
}
for _ , opt := range s .opts {
err := opt (senderInterceptor )
if err != nil {
return nil , err
}
}
return senderInterceptor , nil
}
func NewSenderInterceptor (opts ...Option ) (*SenderInterceptorFactory , error ) {
return &SenderInterceptorFactory {opts : opts }, nil
}
type SenderInterceptor struct {
interceptor .NoOp
log logging .LeveledLogger
m sync .Mutex
wg sync .WaitGroup
close chan struct {}
interval time .Duration
startTime time .Time
recorder *Recorder
packetChan chan packet
}
type Option func (*SenderInterceptor ) error
func SendInterval (interval time .Duration ) Option {
return func (s *SenderInterceptor ) error {
s .interval = interval
return nil
}
}
func (s *SenderInterceptor ) BindRTCPWriter (writer interceptor .RTCPWriter ) interceptor .RTCPWriter {
s .m .Lock ()
defer s .m .Unlock ()
s .recorder = NewRecorder (rand .Uint32 ())
if s .isClosed () {
return writer
}
s .wg .Add (1 )
go s .loop (writer )
return writer
}
type packet struct {
hdr *rtp .Header
sequenceNumber uint16
arrivalTime int64
ssrc uint32
}
func (s *SenderInterceptor ) BindRemoteStream (
info *interceptor .StreamInfo , reader interceptor .RTPReader ,
) interceptor .RTPReader {
var hdrExtID uint8
for _ , e := range info .RTPHeaderExtensions {
if e .URI == transportCCURI {
hdrExtID = uint8 (e .ID )
break
}
}
if hdrExtID == 0 {
return reader
}
return interceptor .RTPReaderFunc (
func (buf []byte , attributes interceptor .Attributes ) (int , interceptor .Attributes , error ) {
i , attr , err := reader .Read (buf , attributes )
if err != nil {
return 0 , nil , err
}
if attr == nil {
attr = make (interceptor .Attributes )
}
header , err := attr .GetRTPHeader (buf [:i ])
if err != nil {
return 0 , nil , err
}
var tccExt rtp .TransportCCExtension
if ext := header .GetExtension (hdrExtID ); ext != nil {
err = tccExt .Unmarshal (ext )
if err != nil {
return 0 , nil , err
}
p := packet {
hdr : header ,
sequenceNumber : tccExt .TransportSequence ,
arrivalTime : time .Since (s .startTime ).Microseconds (),
ssrc : info .SSRC ,
}
select {
case <- s .close :
return 0 , nil , errClosed
case s .packetChan <- p :
}
}
return i , attr , nil
},
)
}
func (s *SenderInterceptor ) Close () error {
defer s .wg .Wait ()
s .m .Lock ()
defer s .m .Unlock ()
if !s .isClosed () {
close (s .close )
}
return nil
}
func (s *SenderInterceptor ) isClosed () bool {
select {
case <- s .close :
return true
default :
return false
}
}
func (s *SenderInterceptor ) loop (writer interceptor .RTCPWriter ) {
defer s .wg .Done ()
select {
case <- s .close :
return
case p := <- s .packetChan :
s .recorder .Record (p .ssrc , p .sequenceNumber , p .arrivalTime )
}
ticker := time .NewTicker (s .interval )
for {
select {
case <- s .close :
ticker .Stop ()
return
case p := <- s .packetChan :
s .recorder .Record (p .ssrc , p .sequenceNumber , p .arrivalTime )
case <- ticker .C :
pkts := s .recorder .BuildFeedbackPacket ()
if len (pkts ) == 0 {
continue
}
if _ , err := writer .Write (pkts , nil ); err != nil {
s .log .Error (err .Error())
}
}
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .