// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package rfc8888 provides an interceptor that generates congestion control // feedback reports as defined by RFC 8888.
package rfc8888 import ( ) // TickerFactory is a factory to create new tickers. type TickerFactory func(d time.Duration) ticker // SenderInterceptorFactory is a interceptor.Factory for a SenderInterceptor. type SenderInterceptorFactory struct { opts []Option } // NewInterceptor constructs a new SenderInterceptor. func ( *SenderInterceptorFactory) ( string) (interceptor.Interceptor, error) { := &SenderInterceptor{ NoOp: interceptor.NoOp{}, log: logging.NewDefaultLoggerFactory().NewLogger("rfc8888_interceptor"), lock: sync.Mutex{}, wg: sync.WaitGroup{}, recorder: NewRecorder(), interval: 100 * time.Millisecond, maxReportSize: 1200, packetChan: make(chan packet), newTicker: func( time.Duration) ticker { return &timeTicker{time.NewTicker()} }, now: time.Now, close: make(chan struct{}), } for , := range .opts { := () if != nil { return nil, } } return , nil } // NewSenderInterceptor returns a new SenderInterceptorFactory configured with the given options. func ( ...Option) (*SenderInterceptorFactory, error) { return &SenderInterceptorFactory{opts: }, nil } // SenderInterceptor sends congestion control feedback as specified in RFC 8888. type SenderInterceptor struct { interceptor.NoOp log logging.LeveledLogger lock sync.Mutex wg sync.WaitGroup recorder *Recorder interval time.Duration maxReportSize int64 packetChan chan packet newTicker TickerFactory now func() time.Time close chan struct{} } type packet struct { arrival time.Time ssrc uint32 sequenceNumber uint16 ecn uint8 } // BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method // will be called once per packet batch. func ( *SenderInterceptor) ( interceptor.RTCPWriter) interceptor.RTCPWriter { .lock.Lock() defer .lock.Unlock() if .isClosed() { return } .wg.Add(1) go .loop() return } // BindRemoteStream lets you modify any incoming RTP packets. // It is called once for per RemoteStream. The returned method // will be called once per rtp packet.. func ( *SenderInterceptor) ( *interceptor.StreamInfo, interceptor.RTPReader, ) interceptor.RTPReader { return interceptor.RTPReaderFunc(func( []byte, interceptor.Attributes) (int, interceptor.Attributes, error) { , , := .Read(, ) if != nil { return 0, nil, } if == nil { = make(interceptor.Attributes) } , := .GetRTPHeader([:]) if != nil { return 0, nil, } := packet{ arrival: .now(), ssrc: .SSRC, sequenceNumber: .SequenceNumber, ecn: 0, // ECN is not supported (yet). } .packetChan <- return , , nil }) } // Close closes the interceptor. func ( *SenderInterceptor) () error { .log.Trace("close") defer .wg.Wait() if !.isClosed() { close(.close) } return nil } func ( *SenderInterceptor) () bool { select { case <-.close: return true default: return false } } func ( *SenderInterceptor) ( interceptor.RTCPWriter) { defer .wg.Done() select { case <-.close: return case := <-.packetChan: .log.Tracef("got first packet: %v", ) .recorder.AddPacket(.arrival, .ssrc, .sequenceNumber, .ecn) } .log.Trace("start loop") := .newTicker(.interval) for { select { case <-.close: .Stop() return case := <-.packetChan: .log.Tracef("got packet: %v", ) .recorder.AddPacket(.arrival, .ssrc, .sequenceNumber, .ecn) case <-.Ch(): := .now() .log.Tracef("report triggered at %v", ) if == nil { .log.Trace("no writer added, continue") continue } := .recorder.BuildReport(, int(.maxReportSize)) if == nil { continue } .log.Tracef("got report: %v", ) if , := .Write([]rtcp.Packet{}, nil); != nil { .log.Error(.Error()) } } } }