package report
import (
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// TickerFactory is a function that builds a Ticker for the given duration.
// SenderInterceptor stores one in its newTicker field and calls it with the
// configured interval when its report loop starts.
type TickerFactory func (d time .Duration ) Ticker
// SenderInterceptorFactory creates SenderInterceptor instances via
// NewInterceptor, applying the same set of options to each one.
type SenderInterceptorFactory struct {
// opts are applied, in order, to every interceptor built by NewInterceptor.
opts []SenderOption
}
// NewInterceptor builds a SenderInterceptor populated with defaults (a one
// second interval, time.Now as the clock, a time.NewTicker-backed ticker, a
// default logger scoped to "sender_interceptor" and an open close channel) and
// then runs every factory option against it in order.
//
// The string argument is ignored. If any option returns an error, construction
// stops and that error is returned together with a nil interceptor.
func (s *SenderInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
	defaultTicker := func(d time.Duration) Ticker {
		return &timeTicker{time.NewTicker(d)}
	}
	logger := logging.NewDefaultLoggerFactory().NewLogger("sender_interceptor")

	si := &SenderInterceptor{
		interval:  time.Second,
		now:       time.Now,
		newTicker: defaultTicker,
		log:       logger,
		close:     make(chan struct{}),
	}

	// Options may override any of the defaults set above.
	for i := range s.opts {
		if err := s.opts[i](si); err != nil {
			return nil, err
		}
	}

	return si, nil
}
// NewSenderInterceptor returns a factory that remembers opts and applies them
// to each SenderInterceptor it later creates. The returned error is always nil.
func NewSenderInterceptor(opts ...SenderOption) (*SenderInterceptorFactory, error) {
	factory := &SenderInterceptorFactory{opts: opts}

	return factory, nil
}
// SenderInterceptor tracks outgoing RTP streams and, on every tick of its
// ticker, writes one generated report per tracked stream to the bound RTCP
// writer.
type SenderInterceptor struct {
// NoOp supplies default implementations for the interceptor methods this
// type does not override.
interceptor .NoOp
// interval is the duration handed to newTicker when the report loop starts.
interval time .Duration
// now is the clock used to timestamp processed packets and generated reports.
now func () time .Time
// newTicker creates the ticker that drives the report loop.
newTicker TickerFactory
// streams holds the tracked streams, stored as *senderStream keyed by SSRC.
streams sync .Map
// log receives warnings about failed casts and failed RTCP writes.
log logging .LeveledLogger
// m is held by Close and BindRTCPWriter while they check or change the
// closed state.
m sync .Mutex
// wg counts running report loops; Close waits on it before returning.
wg sync .WaitGroup
// close is closed by Close to tell every report loop to exit.
close chan struct {}
// started, when non-nil, is closed by the report loop right after its ticker
// has been created. It is not set in NewInterceptor's defaults —
// NOTE(review): presumably set via an option for tests; confirm.
started chan struct {}
// useLatestPacket is forwarded to newSenderStream for each bound stream.
useLatestPacket bool
}
// isClosed reports whether the close channel has already been closed. It never
// blocks: the receive is attempted only if it can complete immediately.
func (s *SenderInterceptor) isClosed() bool {
	closed := false

	select {
	case <-s.close:
		closed = true
	default:
		// Channel still open; nothing to receive.
	}

	return closed
}
// Close signals every report loop to stop and waits for them to exit. Calling
// it more than once is safe: the close channel is only closed the first time.
// It always returns nil.
func (s *SenderInterceptor) Close() error {
	// Registered first so it runs last: the mutex is released before waiting
	// for the loops to finish.
	defer s.wg.Wait()

	s.m.Lock()
	defer s.m.Unlock()

	if s.isClosed() {
		return nil
	}
	close(s.close)

	return nil
}
// BindRTCPWriter starts a report loop that writes to writer, unless the
// interceptor has already been closed. The writer is returned unchanged in
// either case.
func (s *SenderInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
	s.m.Lock()
	defer s.m.Unlock()

	if !s.isClosed() {
		// Register the goroutine before launching it so Close can wait for it.
		s.wg.Add(1)
		go s.loop(writer)
	}

	return writer
}
// loop runs until the close channel is closed. On each tick it takes a single
// timestamp and writes one generated report per tracked stream to rtcpWriter.
// Cast and write failures are logged as warnings and do not stop the loop.
func (s *SenderInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
	defer s.wg.Done()

	ticker := s.newTicker(s.interval)
	defer ticker.Stop()

	// Announce that the ticker exists, if anyone asked to be told.
	if s.started != nil {
		close(s.started)
	}

	for {
		select {
		case <-s.close:
			return
		case <-ticker.Ch():
			// Fall through to report generation below.
		}

		// One timestamp is shared by every report produced on this tick.
		reportTime := s.now()
		s.streams.Range(func(_, entry interface{}) bool {
			stream, ok := entry.(*senderStream)
			if !ok {
				s.log.Warnf("failed to cast SenderInterceptor stream")

				return true
			}

			packets := []rtcp.Packet{stream.generateReport(reportTime)}
			if _, err := rtcpWriter.Write(packets, interceptor.Attributes{}); err != nil {
				s.log.Warnf("failed sending: %+v", err)
			}

			// Always continue to the next stream.
			return true
		})
	}
}
// BindLocalStream registers a new sender stream under the stream's SSRC and
// returns an RTP writer that feeds every outgoing packet to that stream,
// stamped with the interceptor's clock, before passing it on to writer.
func (s *SenderInterceptor) BindLocalStream(
	info *interceptor.StreamInfo, writer interceptor.RTPWriter,
) interceptor.RTPWriter {
	ssrc := info.SSRC
	tracked := newSenderStream(ssrc, info.ClockRate, s.useLatestPacket)
	s.streams.Store(ssrc, tracked)

	record := func(header *rtp.Header, payload []byte, attrs interceptor.Attributes) (int, error) {
		tracked.processRTP(s.now(), header, payload)

		return writer.Write(header, payload, attrs)
	}

	return interceptor.RTPWriterFunc(record)
}
// UnbindLocalStream removes the stream stored under info's SSRC, so the report
// loop no longer generates reports for it.
func (s *SenderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
	ssrc := info.SSRC
	s.streams.Delete(ssrc)
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.