// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package twcc provides interceptors to implement transport wide congestion control.
package twcc import ( ) const ( packetWindowMicroseconds = 500_000 maxMissingSequenceNumbers = 0x7FFE ) // Recorder records incoming RTP packets and their delays and creates // transport wide congestion control feedback reports as specified in // https://datatracker.ietf.org/doc/html/draft-holmer-rmcat-transport-wide-cc-extensions-01 type Recorder struct { arrivalTimeMap packetArrivalTimeMap sequenceUnwrapper sequencenumber.Unwrapper // startSequenceNumber is the first sequence number that will be included in the the // next feedback packet. startSequenceNumber *int64 senderSSRC uint32 mediaSSRC uint32 fbPktCnt uint8 packetsHeld int } // NewRecorder creates a new Recorder which uses the given senderSSRC in the created // feedback packets. func ( uint32) *Recorder { return &Recorder{ senderSSRC: , } } // Record marks a packet with mediaSSRC and a transport wide sequence number sequenceNumber as received at arrivalTime. func ( *Recorder) ( uint32, uint16, int64) { .mediaSSRC = // "Unwrap" the sequence number to get a monotonically increasing sequence number that // won't wrap around after math.MaxUint16. := .sequenceUnwrapper.Unwrap() .maybeCullOldPackets(, ) if .startSequenceNumber == nil || < *.startSequenceNumber { .startSequenceNumber = & } // We are only interested in the first time a packet is received. if .arrivalTimeMap.HasReceived() { return } .arrivalTimeMap.AddPacket(, ) .packetsHeld++ // Limit the range of sequence numbers to send feedback for. if *.startSequenceNumber < .arrivalTimeMap.BeginSequenceNumber() { := .arrivalTimeMap.BeginSequenceNumber() .startSequenceNumber = & } } func ( *Recorder) ( int64, int64) { if .startSequenceNumber != nil && *.startSequenceNumber >= .arrivalTimeMap.EndSequenceNumber() && >= packetWindowMicroseconds { .arrivalTimeMap.RemoveOldPackets(, -packetWindowMicroseconds) } } // PacketsHeld returns the number of received packets currently held by the recorder. func ( *Recorder) () int { return .packetsHeld } // BuildFeedbackPacket creates a new RTCP packet containing a TWCC feedback report. func ( *Recorder) () []rtcp.Packet { if .startSequenceNumber == nil { return nil } := .arrivalTimeMap.EndSequenceNumber() var []rtcp.Packet for *.startSequenceNumber < { := .maybeBuildFeedbackPacket(*.startSequenceNumber, ) if == nil { break } = append(, .getRTCP()) // NOTE: we don't erase packets from the history in case they need to be resent // after a reordering. They will be removed instead in Record when they get too // old. } .packetsHeld = 0 return } // maybeBuildFeedbackPacket builds a feedback packet starting from startSN (inclusive) until // endSN (exclusive). func ( *Recorder) (, int64) *feedback { // NOTE: The logic of this method is inspired by the implementation in Chrome. // See https://source.chromium.org/chromium/chromium/src/+/refs/heads/main:third_party/webrtc/modules/remote_bitrate_estimator/remote_estimator_proxy.cc;l=276;drc=b5cd13bb6d5d157a5fbe3628b2dd1c1e106203c6 //nolint:lll , := .arrivalTimeMap.Clamp(), .arrivalTimeMap.Clamp() // Create feedback on demand, as we don't yet know if there are packets in the range that have been // received. var *feedback := for := ; < ; ++ { , , := .arrivalTimeMap.FindNextAtOrAfter() = if ! || >= { break } if == nil { = newFeedback(.senderSSRC, .mediaSSRC, .fbPktCnt) .fbPktCnt++ // It should be possible to add seq to this new packet. // If the difference between seq and beginSeqNumInclusive is too large, discard // reporting too old missing packets. := max64(, -maxMissingSequenceNumbers) // baseSequenceNumber is the expected first sequence number. This is known, // but we may not have actually received it, so the base time should be the time // of the first received packet in the feedback. .setBase(uint16(), ) //nolint:gosec // G115 if !.addReceived(uint16(), ) { //nolint:gosec // G115 // Could not add a single received packet to the feedback. // This is unexpected to actually occur, but if it does, we'll // try again after skipping any missing packets. // NOTE: It's fine that we already incremented fbPktCnt, as in essence // we did actually "skip" a feedback (and this matches Chrome's behavior). .startSequenceNumber = & return nil } } else if !.addReceived(uint16(), ) { //nolint:gosec // G115 // Could not add timestamp. Packet may be full. Return // and try again with a fresh packet. break } = + 1 } .startSequenceNumber = & return } type feedback struct { rtcp *rtcp.TransportLayerCC baseSequenceNumber uint16 refTimestamp64MS int64 lastTimestampUS int64 nextSequenceNumber uint16 sequenceNumberCount uint16 len int lastChunk chunk chunks []rtcp.PacketStatusChunk deltas []*rtcp.RecvDelta } func newFeedback(, uint32, uint8) *feedback { return &feedback{ rtcp: &rtcp.TransportLayerCC{ SenderSSRC: , MediaSSRC: , FbPktCount: , }, } } func ( *feedback) ( uint16, int64) { .baseSequenceNumber = .nextSequenceNumber = .baseSequenceNumber .refTimestamp64MS = / 64e3 .lastTimestampUS = .refTimestamp64MS * 64e3 } func ( *feedback) () *rtcp.TransportLayerCC { .rtcp.PacketStatusCount = .sequenceNumberCount .rtcp.ReferenceTime = uint32(.refTimestamp64MS) //nolint:gosec // G115 .rtcp.BaseSequenceNumber = .baseSequenceNumber for len(.lastChunk.deltas) > 0 { .chunks = append(.chunks, .lastChunk.encode()) } .rtcp.PacketChunks = append(.rtcp.PacketChunks, .chunks...) .rtcp.RecvDeltas = .deltas // 4 bytes header + 16 bytes twcc header + 2 bytes for each chunk + length of deltas := 20 + len(.rtcp.PacketChunks)*2 + .len := %4 != 0 for %4 != 0 { ++ } .rtcp.Header = rtcp.Header{ Count: rtcp.FormatTCC, Type: rtcp.TypeTransportSpecificFeedback, Padding: , Length: uint16(( / 4) - 1), //nolint:gosec // G115 } return .rtcp } func ( *feedback) ( uint16, int64) bool { := - .lastTimestampUS var int64 if >= 0 { = ( + rtcp.TypeTCCDeltaScaleFactor/2) / rtcp.TypeTCCDeltaScaleFactor } else { = ( - rtcp.TypeTCCDeltaScaleFactor/2) / rtcp.TypeTCCDeltaScaleFactor } // delta doesn't fit into 16 bit, need to create new packet if < math.MinInt16 || > math.MaxInt16 { return false } := * rtcp.TypeTCCDeltaScaleFactor for ; .nextSequenceNumber != ; .nextSequenceNumber++ { if !.lastChunk.canAdd(rtcp.TypeTCCPacketNotReceived) { .chunks = append(.chunks, .lastChunk.encode()) } .lastChunk.add(rtcp.TypeTCCPacketNotReceived) .sequenceNumberCount++ } var uint16 switch { case >= 0 && <= 0xff: .len++ = rtcp.TypeTCCPacketReceivedSmallDelta default: .len += 2 = rtcp.TypeTCCPacketReceivedLargeDelta } if !.lastChunk.canAdd() { .chunks = append(.chunks, .lastChunk.encode()) } .lastChunk.add() .deltas = append(.deltas, &rtcp.RecvDelta{ Type: , Delta: , }) .lastTimestampUS += .sequenceNumberCount++ .nextSequenceNumber++ return true } const ( maxRunLengthCap = 0x1fff // 13 bits maxOneBitCap = 14 // bits maxTwoBitCap = 7 // bits ) type chunk struct { hasLargeDelta bool hasDifferentTypes bool deltas []uint16 } func ( *chunk) ( uint16) bool { if len(.deltas) < maxTwoBitCap { return true } if len(.deltas) < maxOneBitCap && !.hasLargeDelta && != rtcp.TypeTCCPacketReceivedLargeDelta { return true } if len(.deltas) < maxRunLengthCap && !.hasDifferentTypes && == .deltas[0] { return true } return false } func ( *chunk) ( uint16) { .deltas = append(.deltas, ) .hasLargeDelta = .hasLargeDelta || == rtcp.TypeTCCPacketReceivedLargeDelta .hasDifferentTypes = .hasDifferentTypes || != .deltas[0] } func ( *chunk) () rtcp.PacketStatusChunk { if !.hasDifferentTypes { defer .reset() return &rtcp.RunLengthChunk{ PacketStatusSymbol: .deltas[0], RunLength: uint16(len(.deltas)), //nolint:gosec // G115 } } if len(.deltas) == maxOneBitCap { defer .reset() return &rtcp.StatusVectorChunk{ SymbolSize: rtcp.TypeTCCSymbolSizeOneBit, SymbolList: .deltas, } } := minInt(maxTwoBitCap, len(.deltas)) := &rtcp.StatusVectorChunk{ SymbolSize: rtcp.TypeTCCSymbolSizeTwoBit, SymbolList: .deltas[:], } .deltas = .deltas[:] .hasDifferentTypes = false .hasLargeDelta = false if len(.deltas) > 0 { := .deltas[0] for , := range .deltas { if != { .hasDifferentTypes = true } if == rtcp.TypeTCCPacketReceivedLargeDelta { .hasLargeDelta = true } } } return } func ( *chunk) () { .deltas = []uint16{} .hasLargeDelta = false .hasDifferentTypes = false } func maxInt(, int) int { if > { return } return } func minInt(, int) int { if < { return } return } func max64(, int64) int64 { if > { return } return } func min64(, int64) int64 { if < { return } return }