package rfc8888
import (
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtcp"
)
// TickerFactory creates a ticker that fires with period d. The interceptor
// stores one in its newTicker field and calls it with the report interval, so
// a different factory can replace the default time.Ticker-backed one.
type TickerFactory func(d time.Duration) ticker
// SenderInterceptorFactory builds SenderInterceptor instances. Every
// interceptor it creates is configured with the same list of options.
type SenderInterceptorFactory struct {
	// opts are applied, in order, to each interceptor created by NewInterceptor.
	opts []Option
}
// NewInterceptor creates a SenderInterceptor populated with the package
// defaults (100ms report interval, maximum report size of 1200, a
// time.Ticker-backed ticker factory and time.Now as clock) and then applies the
// factory's options in order. The string argument is ignored.
//
// If an option returns an error, construction stops and that error is returned
// together with a nil interceptor.
func (s *SenderInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
	// Default ticker factory: wraps a standard library ticker.
	defaultTicker := func(d time.Duration) ticker {
		return &timeTicker{time.NewTicker(d)}
	}

	// lock and wg are left at their zero values, which are ready to use.
	si := &SenderInterceptor{
		NoOp:          interceptor.NoOp{},
		log:           logging.NewDefaultLoggerFactory().NewLogger("rfc8888_interceptor"),
		recorder:      NewRecorder(),
		interval:      100 * time.Millisecond,
		maxReportSize: 1200,
		packetChan:    make(chan packet),
		newTicker:     defaultTicker,
		now:           time.Now,
		close:         make(chan struct{}),
	}

	for i := range s.opts {
		if err := s.opts[i](si); err != nil {
			return nil, err
		}
	}

	return si, nil
}
// NewSenderInterceptor returns a factory that applies opts to every
// SenderInterceptor it creates. The returned error is always nil.
func NewSenderInterceptor(opts ...Option) (*SenderInterceptorFactory, error) {
	factory := &SenderInterceptorFactory{opts: opts}

	return factory, nil
}
// SenderInterceptor records the arrival of incoming RTP packets and
// periodically writes a report built from those records to the bound RTCP
// writer.
type SenderInterceptor struct {
	interceptor.NoOp

	// log receives trace and error output from the interceptor.
	log logging.LeveledLogger
	// lock guards the closed-check and goroutine start in BindRTCPWriter.
	lock sync.Mutex
	// wg counts running loop goroutines; Close waits on it.
	wg sync.WaitGroup
	// recorder stores packet arrivals and builds the reports from them.
	recorder *Recorder
	// interval is the period handed to newTicker for report generation.
	interval time.Duration
	// maxReportSize is passed to the recorder when building a report
	// (presumably a size in bytes; confirm against Recorder.BuildReport).
	maxReportSize int64
	// packetChan carries packet records from the RTP read path to loop.
	packetChan chan packet
	// newTicker creates the ticker that triggers report generation.
	newTicker TickerFactory
	// now supplies the current time for arrival stamps and reports.
	now func() time.Time
	// close is closed by Close to signal shutdown.
	close chan struct{}
}
// packet is the per-packet record handed from the RTP read path to the
// reporting loop.
type packet struct {
	// arrival is the time at which the packet was read.
	arrival time.Time
	// ssrc is the SSRC taken from the RTP header.
	ssrc uint32
	// sequenceNumber is the sequence number taken from the RTP header.
	sequenceNumber uint16
	// ecn is the ECN value recorded for the packet.
	ecn uint8
}
// BindRTCPWriter starts the reporting goroutine that sends its reports through
// writer, unless the interceptor has already been closed. The writer is
// returned unchanged in both cases.
func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
	s.lock.Lock()
	defer s.lock.Unlock()

	if !s.isClosed() {
		// Register the goroutine before starting it so Close can wait for it.
		s.wg.Add(1)
		go s.loop(writer)
	}

	return writer
}
// BindRemoteStream wraps reader so that every RTP packet read through it is
// also recorded for feedback reporting.
//
// The returned reader forwards the read to reader, parses the RTP header from
// the bytes read, and hands a packet record (arrival time, SSRC, sequence
// number) to the reporting loop. Errors from the underlying read or from header
// parsing are returned as (0, nil, err). The StreamInfo argument is ignored.
func (s *SenderInterceptor) BindRemoteStream(
	_ *interceptor.StreamInfo, reader interceptor.RTPReader,
) interceptor.RTPReader {
	return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
		i, attr, err := reader.Read(b, a)
		if err != nil {
			return 0, nil, err
		}

		if attr == nil {
			attr = make(interceptor.Attributes)
		}
		header, err := attr.GetRTPHeader(b[:i])
		if err != nil {
			return 0, nil, err
		}

		p := packet{
			arrival:        s.now(),
			ssrc:           header.SSRC,
			sequenceNumber: header.SequenceNumber,
			ecn:            0, // No ECN value is read from the packet here.
		}

		// packetChan is only drained by the reporting loop, which exits once
		// the interceptor is closed. Also selecting on s.close keeps the read
		// path from blocking forever after Close; the record is dropped then.
		select {
		case s.packetChan <- p:
		case <-s.close:
		}

		return i, attr, nil
	})
}
// Close signals the reporting goroutine to stop and waits until it has exited.
// It is safe to call more than once and from multiple goroutines. The returned
// error is always nil.
func (s *SenderInterceptor) Close() error {
	s.log.Trace("close")
	// Deferred first so it runs last, after the lock has been released.
	defer s.wg.Wait()

	// The check-then-close must be atomic: without the lock two concurrent
	// Close calls could both see the channel open and close it twice (panic).
	// Holding the same lock as BindRTCPWriter also orders the close against
	// its closed-check and wg.Add.
	s.lock.Lock()
	defer s.lock.Unlock()

	if !s.isClosed() {
		close(s.close)
	}

	return nil
}
// isClosed reports whether the close channel has been closed. It never blocks.
func (s *SenderInterceptor) isClosed() bool {
	closed := false

	select {
	case <-s.close:
		closed = true
	default:
		// Channel still open: nothing to receive.
	}

	return closed
}
// loop is the reporting goroutine. It waits for the first packet (or for
// shutdown), then starts a ticker with the configured interval. Until the
// interceptor is closed it records every packet received on packetChan and, on
// each tick, builds a report from the recorder and writes it to writer.
//
// Ticks are skipped when writer is nil or the recorder returns no report. Write
// errors are logged and do not stop the loop.
func (s *SenderInterceptor) loop(writer interceptor.RTCPWriter) {
	defer s.wg.Done()

	// No ticker is created until at least one packet has been seen.
	select {
	case first := <-s.packetChan:
		s.log.Tracef("got first packet: %v", first)
		s.recorder.AddPacket(first.arrival, first.ssrc, first.sequenceNumber, first.ecn)
	case <-s.close:
		return
	}

	s.log.Trace("start loop")
	reportTicker := s.newTicker(s.interval)
	// Runs when the loop returns, before wg.Done.
	defer reportTicker.Stop()

	for {
		select {
		case <-s.close:
			return

		case received := <-s.packetChan:
			s.log.Tracef("got packet: %v", received)
			s.recorder.AddPacket(received.arrival, received.ssrc, received.sequenceNumber, received.ecn)

		case <-reportTicker.Ch():
			at := s.now()
			s.log.Tracef("report triggered at %v", at)
			if writer == nil {
				s.log.Trace("no writer added, continue")

				continue
			}

			report := s.recorder.BuildReport(at, int(s.maxReportSize))
			if report != nil {
				s.log.Tracef("got report: %v", report)
				_, err := writer.Write([]rtcp.Packet{report}, nil)
				if err != nil {
					s.log.Error(err.Error())
				}
			}
		}
	}
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.