package nack
import (
"sync"
"github.com/pion/interceptor"
"github.com/pion/interceptor/internal/rtpbuffer"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// ResponderInterceptorFactory builds ResponderInterceptor values. It only
// stores the options; they are applied each time NewInterceptor is called.
type ResponderInterceptorFactory struct {
	// opts are the options run, in order, against every new interceptor.
	opts []ResponderOption
}
// NewInterceptor constructs a ResponderInterceptor with default settings
// (buffer size 1024, the streamSupportNack filter, a "nack_responder" logger)
// and then applies the factory's options in order. The string argument is
// ignored.
//
// It returns the first error reported by an option, or the error reported by
// rtpbuffer.NewRTPBuffer when the configured size is rejected.
func (r *ResponderInterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
	ri := &ResponderInterceptor{
		streamsFilter: streamSupportNack,
		size:          1024,
		log:           logging.NewDefaultLoggerFactory().NewLogger("nack_responder"),
		streams:       make(map[uint32]*localStream),
	}

	for i := range r.opts {
		if err := r.opts[i](ri); err != nil {
			return nil, err
		}
	}

	// Fall back to the copying packet factory when no option supplied one.
	if ri.packetFactory == nil {
		ri.packetFactory = rtpbuffer.NewPacketFactoryCopy()
	}

	// Build a throwaway buffer purely to validate the configured size up
	// front; per-stream buffers are created later in BindLocalStream.
	_, err := rtpbuffer.NewRTPBuffer(ri.size)
	if err != nil {
		return nil, err
	}

	return ri, nil
}
// ResponderInterceptor keeps a buffer of sent RTP packets per local stream
// and re-sends buffered packets when an RTCP TransportLayerNack asks for them.
type ResponderInterceptor struct {
	interceptor.NoOp

	// streamsFilter reports whether a local stream should get NACK handling;
	// streams it rejects are left unwrapped by BindLocalStream.
	streamsFilter func(info *interceptor.StreamInfo) bool
	// size is the value handed to rtpbuffer.NewRTPBuffer for each stream.
	size uint16
	// log receives warnings about failed retransmissions.
	log logging.LeveledLogger
	// packetFactory turns an outgoing header/payload into a buffered packet.
	packetFactory rtpbuffer.PacketFactory

	// streams maps a stream's SSRC to its state; guarded by streamsMu.
	streams   map[uint32]*localStream
	streamsMu sync.Mutex
}
// localStream is the per-SSRC state tracked for a bound local stream.
type localStream struct {
	// rtpBuffer stores packets written on this stream so they can be looked
	// up by sequence number later; guarded by rtpBufferMutex.
	rtpBuffer      *rtpbuffer.RTPBuffer
	rtpBufferMutex sync.RWMutex

	// rtpWriter is the writer the stream was bound with; retransmissions
	// are written to it directly.
	rtpWriter interceptor.RTPWriter
}
// NewResponderInterceptor returns a factory that applies opts to every
// ResponderInterceptor it creates. The returned error is always nil.
func NewResponderInterceptor(opts ...ResponderOption) (*ResponderInterceptorFactory, error) {
	factory := &ResponderInterceptorFactory{opts: opts}

	return factory, nil
}
// BindRTCPReader wraps reader so that every RTCP read is inspected for
// TransportLayerNack packets. Each NACK found is handed to resendPackets on
// its own goroutine; the bytes read and their attributes are passed through
// to the caller.
//
// If the underlying read or the RTCP parsing fails, the wrapper returns
// (0, nil, err).
func (n *ResponderInterceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
	read := func(buf []byte, in interceptor.Attributes) (int, interceptor.Attributes, error) {
		count, out, err := reader.Read(buf, in)
		if err != nil {
			return 0, nil, err
		}

		// GetRTCPPackets is called on the attributes, so make sure there is
		// a non-nil set to call it on.
		if out == nil {
			out = make(interceptor.Attributes)
		}

		packets, err := out.GetRTCPPackets(buf[:count])
		if err != nil {
			return 0, nil, err
		}

		for _, packet := range packets {
			if nack, isNack := packet.(*rtcp.TransportLayerNack); isNack {
				// Retransmit asynchronously so the read path is not blocked.
				go n.resendPackets(nack)
			}
		}

		return count, out, nil
	}

	return interceptor.RTCPReaderFunc(read)
}
// BindLocalStream registers a packet buffer for the stream described by info
// and returns a writer that records every outgoing packet of that stream
// before forwarding it to writer.
//
// The original writer is returned unchanged when the stream is rejected by
// the interceptor's stream filter, or when the packet buffer cannot be
// created (in which case a warning is logged and no NACK responses are
// produced for this stream).
func (n *ResponderInterceptor) BindLocalStream(
	info *interceptor.StreamInfo, writer interceptor.RTPWriter,
) interceptor.RTPWriter {
	if !n.streamsFilter(info) {
		return writer
	}

	// NewInterceptor already validates the size, so this is not expected to
	// fail for interceptors built through the factory. The error is still
	// checked so that a failure never leaves an unusable buffer registered.
	rtpBuffer, err := rtpbuffer.NewRTPBuffer(n.size)
	if err != nil {
		n.log.Warnf("failed creating rtp buffer for ssrc %d, nack responses disabled: %+v", info.SSRC, err)

		return writer
	}

	stream := &localStream{
		rtpBuffer: rtpBuffer,
		rtpWriter: writer,
	}

	n.streamsMu.Lock()
	n.streams[info.SSRC] = stream
	n.streamsMu.Unlock()

	return interceptor.RTPWriterFunc(
		func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
			// Packets carrying a different SSRC are forwarded without
			// being stored in this stream's buffer.
			if header.SSRC != info.SSRC {
				return writer.Write(header, payload, attributes)
			}

			pkt, err := n.packetFactory.NewPacket(header, payload, info.SSRCRetransmission, info.PayloadTypeRetransmission)
			if err != nil {
				return 0, err
			}

			// The lock is held across the downstream write as well, so
			// buffering and sending cannot interleave with resendPackets.
			stream.rtpBufferMutex.Lock()
			defer stream.rtpBufferMutex.Unlock()

			rtpBuffer.Add(pkt)

			return writer.Write(header, payload, attributes)
		},
	)
}
// UnbindLocalStream removes the state registered for info's SSRC. NACKs that
// arrive for that SSRC afterwards find no stream and are ignored.
func (n *ResponderInterceptor) UnbindLocalStream(info *interceptor.StreamInfo) {
	n.streamsMu.Lock()
	defer n.streamsMu.Unlock()

	delete(n.streams, info.SSRC)
}
// resendPackets re-sends every packet requested by nack that is still held in
// the buffer of the stream identified by nack.MediaSSRC. It does nothing when
// no stream is registered for that SSRC. Write failures are logged and do not
// stop the remaining sequence numbers from being processed.
func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
	n.streamsMu.Lock()
	stream, found := n.streams[nack.MediaSSRC]
	n.streamsMu.Unlock()

	if !found {
		return
	}

	// resend handles one requested sequence number. It always returns true;
	// presumably that tells Range to keep iterating (confirm against
	// rtcp.NackPair.Range).
	resend := func(seq uint16) bool {
		stream.rtpBufferMutex.Lock()
		defer stream.rtpBufferMutex.Unlock()

		pkt := stream.rtpBuffer.Get(seq)
		if pkt == nil {
			// Not in the buffer; nothing to send.
			return true
		}

		_, err := stream.rtpWriter.Write(pkt.Header(), pkt.Payload(), interceptor.Attributes{})
		if err != nil {
			n.log.Warnf("failed resending nacked packet: %+v", err)
		}
		pkt.Release()

		return true
	}

	for i := range nack.Nacks {
		nack.Nacks[i].Range(resend)
	}
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.