package nack
import (
"math/rand"
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtcp"
)
// GeneratorInterceptorFactory creates GeneratorInterceptor instances through
// its NewInterceptor method.
type GeneratorInterceptorFactory struct {
// opts are applied, in slice order, to every interceptor NewInterceptor builds.
opts []GeneratorOption
}
// NewInterceptor builds a GeneratorInterceptor with the package defaults and
// then applies the factory's options to it in order.
//
// Defaults: size 512, interval 100ms, streamsFilter streamSupportNack, and a
// logger named "nack_generator". skipLastN and maxNacksPerPacket are left at
// their zero value.
//
// It returns the first error reported by an option, or the error from
// newReceiveLog when the final size is rejected. The string argument is
// ignored.
func (g *GeneratorInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
	created := &GeneratorInterceptor{
		streamsFilter: streamSupportNack,
		size:          512,
		interval:      100 * time.Millisecond,
		receiveLogs:   map[uint32]*receiveLog{},
		nackCountLogs: map[uint32]map[uint16]uint16{},
		close:         make(chan struct{}),
		log:           logging.NewDefaultLoggerFactory().NewLogger("nack_generator"),
	}

	for i := range g.opts {
		if err := g.opts[i](created); err != nil {
			return nil, err
		}
	}

	// Probe the configured size once here; the log itself is discarded. This
	// is what lets BindRemoteStream ignore the newReceiveLog error later.
	_, err := newReceiveLog(created.size)
	if err != nil {
		return nil, err
	}

	return created, nil
}
// GeneratorInterceptor records the sequence numbers of incoming RTP packets
// per SSRC and periodically writes rtcp.TransportLayerNack packets for the
// ones reported missing.
type GeneratorInterceptor struct {
	interceptor.NoOp

	// Settings populated by NewInterceptor (defaults, then options).
	streamsFilter                      func(info *interceptor.StreamInfo) bool
	size, skipLastN, maxNacksPerPacket uint16
	interval                           time.Duration
	log                                logging.LeveledLogger

	// Lifecycle: m serialises BindRTCPWriter and Close, wg tracks the loop
	// goroutines, and close is closed exactly once to stop them.
	m     sync.Mutex
	wg    sync.WaitGroup
	close chan struct{}

	// Per-SSRC state. receiveLogsMu is held for every access to receiveLogs
	// and while loop reads or updates nackCountLogs.
	receiveLogsMu sync.Mutex
	receiveLogs   map[uint32]*receiveLog
	nackCountLogs map[uint32]map[uint16]uint16
}
// NewGeneratorInterceptor returns a factory that applies opts to every
// interceptor it creates. The returned error is always nil.
func NewGeneratorInterceptor(opts ...GeneratorOption) (*GeneratorInterceptorFactory, error) {
	factory := &GeneratorInterceptorFactory{opts: opts}

	return factory, nil
}
// BindRTCPWriter starts one NACK-generation goroutine that sends through
// writer, unless the interceptor has already been closed. The writer is
// always returned unchanged.
func (n *GeneratorInterceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
	n.m.Lock()
	defer n.m.Unlock()

	// Holding n.m keeps this check-and-start atomic with respect to Close.
	if alreadyClosed := n.isClosed(); !alreadyClosed {
		n.wg.Add(1)
		go n.loop(writer)
	}

	return writer
}
// BindRemoteStream registers a receive log for info.SSRC and returns a reader
// that records the sequence number of every packet read through it. Streams
// rejected by streamsFilter get the original reader back untouched.
func (n *GeneratorInterceptor) BindRemoteStream(
	info *interceptor.StreamInfo, reader interceptor.RTPReader,
) interceptor.RTPReader {
	if supported := n.streamsFilter(info); !supported {
		return reader
	}

	// The error is dropped on purpose: NewInterceptor already refuses any
	// size that newReceiveLog rejects.
	recvLog, _ := newReceiveLog(n.size)

	n.receiveLogsMu.Lock()
	n.receiveLogs[info.SSRC] = recvLog
	n.receiveLogsMu.Unlock()

	trackingRead := func(buf []byte, in interceptor.Attributes) (int, interceptor.Attributes, error) {
		count, out, err := reader.Read(buf, in)
		if err != nil {
			return 0, nil, err
		}

		if out == nil {
			out = make(interceptor.Attributes)
		}

		hdr, err := out.GetRTPHeader(buf[:count])
		if err != nil {
			return 0, nil, err
		}

		recvLog.add(hdr.SequenceNumber)

		return count, out, nil
	}

	return interceptor.RTPReaderFunc(trackingRead)
}
// UnbindRemoteStream forgets all per-SSRC state kept for the stream described
// by info: its receive log and its NACK counters.
//
// The counters are removed as well because loop only ever touches
// nackCountLogs for SSRCs that are still present in receiveLogs; without this
// the entry would stay in the map for the lifetime of the interceptor and a
// later stream reusing the SSRC would inherit stale counts.
func (n *GeneratorInterceptor) UnbindRemoteStream(info *interceptor.StreamInfo) {
	n.receiveLogsMu.Lock()
	defer n.receiveLogsMu.Unlock()

	delete(n.receiveLogs, info.SSRC)
	// loop accesses nackCountLogs under receiveLogsMu, so this is race-free.
	delete(n.nackCountLogs, info.SSRC)
}
// Close signals every loop goroutine to stop and waits for them to finish.
// It may be called more than once and always returns nil.
func (n *GeneratorInterceptor) Close() error {
	// Deferred first so it runs last: n.m is released before blocking on the
	// goroutines.
	defer n.wg.Wait()

	n.m.Lock()
	defer n.m.Unlock()

	select {
	case <-n.close:
		// Already closed; closing the channel again would panic.
	default:
		close(n.close)
	}

	return nil
}
// loop is the NACK-generation goroutine started by BindRTCPWriter. Every
// n.interval it walks all registered receive logs and, for each SSRC with
// missing sequence numbers, writes one rtcp.TransportLayerNack to rtcpWriter.
// It exits when n.close is closed and signals n.wg on the way out.
//
// When maxNacksPerPacket is greater than zero, a sequence number is included
// in at most that many NACKs; the per-sequence counts live in nackCountLogs
// and are dropped once the sequence number is no longer reported missing.
// Write failures are logged and otherwise ignored.
func (n *GeneratorInterceptor) loop(rtcpWriter interceptor.RTCPWriter) {
	defer n.wg.Done()

	senderSSRC := rand.Uint32()

	// Scratch buffers shared by all SSRCs and reused on every tick.
	missingPacketSeqNums := make([]uint16, n.size)
	filteredMissingPacket := make([]uint16, n.size)

	ticker := time.NewTicker(n.interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Closure so the deferred unlock runs at the end of each tick.
			func() {
				n.receiveLogsMu.Lock()
				defer n.receiveLogsMu.Unlock()

				for ssrc, receiveLog := range n.receiveLogs {
					missing := receiveLog.missingSeqNumbers(n.skipLastN, missingPacketSeqNums)

					// Start from a clean counter map when nothing is missing
					// or when this SSRC has no map yet.
					if len(missing) == 0 || n.nackCountLogs[ssrc] == nil {
						n.nackCountLogs[ssrc] = map[uint16]uint16{}
					}
					if len(missing) == 0 {
						continue
					}

					counts := n.nackCountLogs[ssrc]
					toNack := missing

					if n.maxNacksPerPacket > 0 {
						// append onto a zero-length reslice reuses the scratch
						// buffer without assuming len(missing) fits in it.
						filtered := filteredMissingPacket[:0]
						for _, missingSeq := range missing {
							if counts[missingSeq] >= n.maxNacksPerPacket {
								continue
							}
							filtered = append(filtered, missingSeq)
							// Count only while below the cap so the uint16
							// saturates at maxNacksPerPacket instead of
							// wrapping to 0 and re-enabling NACKs.
							counts[missingSeq]++
						}
						if len(filtered) == 0 {
							continue
						}
						toNack = filtered
					}

					nack := &rtcp.TransportLayerNack{
						SenderSSRC: senderSSRC,
						MediaSSRC:  ssrc,
						Nacks:      rtcp.NackPairsFromSequenceNumbers(toNack),
					}

					// Forget counters for sequence numbers that are no longer
					// missing. Deleting during range is allowed for maps.
					for nackSeq := range counts {
						isMissing := false
						for _, missingSeq := range missing {
							if missingSeq == nackSeq {
								isMissing = true

								break
							}
						}
						if !isMissing {
							delete(counts, nackSeq)
						}
					}

					if _, err := rtcpWriter.Write([]rtcp.Packet{nack}, interceptor.Attributes{}); err != nil {
						n.log.Warnf("failed sending nack: %+v", err)
					}
				}
			}()
		case <-n.close:
			return
		}
	}
}
// isClosed reports, without blocking, whether n.close has been closed.
func (n *GeneratorInterceptor) isClosed() bool {
	closed := false

	select {
	case <-n.close:
		closed = true
	default:
		// Channel still open: a receive would block.
	}

	return closed
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.