package twcc
import (
"math"
"github.com/pion/interceptor/internal/sequencenumber"
"github.com/pion/rtcp"
)
const (
packetWindowMicroseconds = 500_000
maxMissingSequenceNumbers = 0x7FFE
)
type Recorder struct {
arrivalTimeMap packetArrivalTimeMap
sequenceUnwrapper sequencenumber .Unwrapper
startSequenceNumber *int64
senderSSRC uint32
mediaSSRC uint32
fbPktCnt uint8
packetsHeld int
}
func NewRecorder (senderSSRC uint32 ) *Recorder {
return &Recorder {
senderSSRC : senderSSRC ,
}
}
func (r *Recorder ) Record (mediaSSRC uint32 , sequenceNumber uint16 , arrivalTime int64 ) {
r .mediaSSRC = mediaSSRC
unwrappedSN := r .sequenceUnwrapper .Unwrap (sequenceNumber )
r .maybeCullOldPackets (unwrappedSN , arrivalTime )
if r .startSequenceNumber == nil || unwrappedSN < *r .startSequenceNumber {
r .startSequenceNumber = &unwrappedSN
}
if r .arrivalTimeMap .HasReceived (unwrappedSN ) {
return
}
r .arrivalTimeMap .AddPacket (unwrappedSN , arrivalTime )
r .packetsHeld ++
if *r .startSequenceNumber < r .arrivalTimeMap .BeginSequenceNumber () {
sn := r .arrivalTimeMap .BeginSequenceNumber ()
r .startSequenceNumber = &sn
}
}
func (r *Recorder ) maybeCullOldPackets (sequenceNumber int64 , arrivalTime int64 ) {
if r .startSequenceNumber != nil && *r .startSequenceNumber >= r .arrivalTimeMap .EndSequenceNumber () &&
arrivalTime >= packetWindowMicroseconds {
r .arrivalTimeMap .RemoveOldPackets (sequenceNumber , arrivalTime -packetWindowMicroseconds )
}
}
func (r *Recorder ) PacketsHeld () int {
return r .packetsHeld
}
func (r *Recorder ) BuildFeedbackPacket () []rtcp .Packet {
if r .startSequenceNumber == nil {
return nil
}
endSN := r .arrivalTimeMap .EndSequenceNumber ()
var feedbacks []rtcp .Packet
for *r .startSequenceNumber < endSN {
feedback := r .maybeBuildFeedbackPacket (*r .startSequenceNumber , endSN )
if feedback == nil {
break
}
feedbacks = append (feedbacks , feedback .getRTCP ())
}
r .packetsHeld = 0
return feedbacks
}
func (r *Recorder ) maybeBuildFeedbackPacket (beginSeqNumInclusive , endSeqNumExclusive int64 ) *feedback {
startSNInclusive , endSNExclusive := r .arrivalTimeMap .Clamp (beginSeqNumInclusive ), r .arrivalTimeMap .Clamp (endSeqNumExclusive )
var fb *feedback
nextSequenceNumber := beginSeqNumInclusive
for seq := startSNInclusive ; seq < endSNExclusive ; seq ++ {
foundSeq , arrivalTime , ok := r .arrivalTimeMap .FindNextAtOrAfter (seq )
seq = foundSeq
if !ok || seq >= endSNExclusive {
break
}
if fb == nil {
fb = newFeedback (r .senderSSRC , r .mediaSSRC , r .fbPktCnt )
r .fbPktCnt ++
baseSequenceNumber := max64 (beginSeqNumInclusive , seq -maxMissingSequenceNumbers )
fb .setBase (uint16 (baseSequenceNumber ), arrivalTime )
if !fb .addReceived (uint16 (seq ), arrivalTime ) {
r .startSequenceNumber = &seq
return nil
}
} else if !fb .addReceived (uint16 (seq ), arrivalTime ) {
break
}
nextSequenceNumber = seq + 1
}
r .startSequenceNumber = &nextSequenceNumber
return fb
}
type feedback struct {
rtcp *rtcp .TransportLayerCC
baseSequenceNumber uint16
refTimestamp64MS int64
lastTimestampUS int64
nextSequenceNumber uint16
sequenceNumberCount uint16
len int
lastChunk chunk
chunks []rtcp .PacketStatusChunk
deltas []*rtcp .RecvDelta
}
func newFeedback(senderSSRC , mediaSSRC uint32 , count uint8 ) *feedback {
return &feedback {
rtcp : &rtcp .TransportLayerCC {
SenderSSRC : senderSSRC ,
MediaSSRC : mediaSSRC ,
FbPktCount : count ,
},
}
}
func (f *feedback ) setBase (sequenceNumber uint16 , timeUS int64 ) {
f .baseSequenceNumber = sequenceNumber
f .nextSequenceNumber = f .baseSequenceNumber
f .refTimestamp64MS = timeUS / 64e3
f .lastTimestampUS = f .refTimestamp64MS * 64e3
}
func (f *feedback ) getRTCP () *rtcp .TransportLayerCC {
f .rtcp .PacketStatusCount = f .sequenceNumberCount
f .rtcp .ReferenceTime = uint32 (f .refTimestamp64MS )
f .rtcp .BaseSequenceNumber = f .baseSequenceNumber
for len (f .lastChunk .deltas ) > 0 {
f .chunks = append (f .chunks , f .lastChunk .encode ())
}
f .rtcp .PacketChunks = append (f .rtcp .PacketChunks , f .chunks ...)
f .rtcp .RecvDeltas = f .deltas
padLen := 20 + len (f .rtcp .PacketChunks )*2 + f .len
padding := padLen %4 != 0
for padLen %4 != 0 {
padLen ++
}
f .rtcp .Header = rtcp .Header {
Count : rtcp .FormatTCC ,
Type : rtcp .TypeTransportSpecificFeedback ,
Padding : padding ,
Length : uint16 ((padLen / 4 ) - 1 ),
}
return f .rtcp
}
func (f *feedback ) addReceived (sequenceNumber uint16 , timestampUS int64 ) bool {
deltaUS := timestampUS - f .lastTimestampUS
var delta250US int64
if deltaUS >= 0 {
delta250US = (deltaUS + rtcp .TypeTCCDeltaScaleFactor /2 ) / rtcp .TypeTCCDeltaScaleFactor
} else {
delta250US = (deltaUS - rtcp .TypeTCCDeltaScaleFactor /2 ) / rtcp .TypeTCCDeltaScaleFactor
}
if delta250US < math .MinInt16 || delta250US > math .MaxInt16 {
return false
}
deltaUSRounded := delta250US * rtcp .TypeTCCDeltaScaleFactor
for ; f .nextSequenceNumber != sequenceNumber ; f .nextSequenceNumber ++ {
if !f .lastChunk .canAdd (rtcp .TypeTCCPacketNotReceived ) {
f .chunks = append (f .chunks , f .lastChunk .encode ())
}
f .lastChunk .add (rtcp .TypeTCCPacketNotReceived )
f .sequenceNumberCount ++
}
var recvDelta uint16
switch {
case delta250US >= 0 && delta250US <= 0xff :
f .len ++
recvDelta = rtcp .TypeTCCPacketReceivedSmallDelta
default :
f .len += 2
recvDelta = rtcp .TypeTCCPacketReceivedLargeDelta
}
if !f .lastChunk .canAdd (recvDelta ) {
f .chunks = append (f .chunks , f .lastChunk .encode ())
}
f .lastChunk .add (recvDelta )
f .deltas = append (f .deltas , &rtcp .RecvDelta {
Type : recvDelta ,
Delta : deltaUSRounded ,
})
f .lastTimestampUS += deltaUSRounded
f .sequenceNumberCount ++
f .nextSequenceNumber ++
return true
}
const (
maxRunLengthCap = 0x1fff
maxOneBitCap = 14
maxTwoBitCap = 7
)
type chunk struct {
hasLargeDelta bool
hasDifferentTypes bool
deltas []uint16
}
func (c *chunk ) canAdd (delta uint16 ) bool {
if len (c .deltas ) < maxTwoBitCap {
return true
}
if len (c .deltas ) < maxOneBitCap && !c .hasLargeDelta && delta != rtcp .TypeTCCPacketReceivedLargeDelta {
return true
}
if len (c .deltas ) < maxRunLengthCap && !c .hasDifferentTypes && delta == c .deltas [0 ] {
return true
}
return false
}
func (c *chunk ) add (delta uint16 ) {
c .deltas = append (c .deltas , delta )
c .hasLargeDelta = c .hasLargeDelta || delta == rtcp .TypeTCCPacketReceivedLargeDelta
c .hasDifferentTypes = c .hasDifferentTypes || delta != c .deltas [0 ]
}
func (c *chunk ) encode () rtcp .PacketStatusChunk {
if !c .hasDifferentTypes {
defer c .reset ()
return &rtcp .RunLengthChunk {
PacketStatusSymbol : c .deltas [0 ],
RunLength : uint16 (len (c .deltas )),
}
}
if len (c .deltas ) == maxOneBitCap {
defer c .reset ()
return &rtcp .StatusVectorChunk {
SymbolSize : rtcp .TypeTCCSymbolSizeOneBit ,
SymbolList : c .deltas ,
}
}
minCap := minInt (maxTwoBitCap , len (c .deltas ))
svc := &rtcp .StatusVectorChunk {
SymbolSize : rtcp .TypeTCCSymbolSizeTwoBit ,
SymbolList : c .deltas [:minCap ],
}
c .deltas = c .deltas [minCap :]
c .hasDifferentTypes = false
c .hasLargeDelta = false
if len (c .deltas ) > 0 {
tmp := c .deltas [0 ]
for _ , d := range c .deltas {
if tmp != d {
c .hasDifferentTypes = true
}
if d == rtcp .TypeTCCPacketReceivedLargeDelta {
c .hasLargeDelta = true
}
}
}
return svc
}
func (c *chunk ) reset () {
c .deltas = []uint16 {}
c .hasLargeDelta = false
c .hasDifferentTypes = false
}
func maxInt(a , b int ) int {
if a > b {
return a
}
return b
}
func minInt(a , b int ) int {
if a < b {
return a
}
return b
}
func max64(a , b int64 ) int64 {
if a > b {
return a
}
return b
}
func min64(a , b int64 ) int64 {
if a < b {
return a
}
return b
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .