// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package packetio provides packet buffer
package packetio

import (
	"errors"
	"io"
	"sync"
	"time"

	// NOTE(review): the module path/major version of the deadline package is
	// assumed; confirm it against this repository's go.mod.
	"github.com/pion/transport/v3/deadline"
)

// errPacketTooBig is returned by Write when the packet length cannot be
// represented in the two-byte length header.
var errPacketTooBig = errors.New("packet too big")

// BufferPacketType allows the Buffer to know which packet protocol is writing.
type BufferPacketType int

const (
	// RTPBufferPacket indicates the Buffer that is handling RTP packets
	RTPBufferPacket BufferPacketType = 1
	// RTCPBufferPacket indicates the Buffer that is handling RTCP packets
	RTCPBufferPacket BufferPacketType = 2
)

// Buffer allows writing packets to an intermediate buffer, which can then be read from.
// This is very similar to bytes.Buffer but avoids combining multiple writes into a single read.
type Buffer struct {
	mutex sync.Mutex

	// this is a circular buffer. If head <= tail, then the useful
	// data is in the interval [head, tail[. If tail < head, then
	// the useful data is the union of [head, len[ and [0, tail[.
	// In order to avoid ambiguity when head = tail, we always leave
	// an unused byte in the buffer.
	data       []byte
	head, tail int

	// notify wakes up a blocked Read; it is closed by Close.
	notify chan struct{}
	closed bool

	// count is the number of packets currently stored.
	count                 int
	limitCount, limitSize int

	readDeadline *deadline.Deadline
}

const (
	// minSize is the smallest backing array grow will allocate.
	minSize = 2048
	// cutoffSize is the size below which grow doubles the backing array;
	// at or above it the array grows by a quarter instead.
	cutoffSize = 128 * 1024
	// maxSize caps the backing array when no size limit is set
	// (or when sizeHardLimit is enabled).
	maxSize = 4 * 1024 * 1024
)

// NewBuffer creates a new Buffer.
func NewBuffer() *Buffer {
	return &Buffer{
		// Capacity 1 lets Write signal without blocking even when
		// no reader is currently waiting.
		notify:       make(chan struct{}, 1),
		readDeadline: deadline.New(),
	}
}

// available returns true if the buffer is large enough to fit a packet
// of the given size, taking overhead into account.
// The caller must hold b.mutex.
func (b *Buffer) available(size int) bool {
	free := b.head - b.tail
	if free <= 0 {
		free += len(b.data)
	}

	// two bytes of length header, plus one byte that always stays free
	// because head == tail is interpreted as "empty".
	return size+2+1 <= free
}

// grow increases the size of the buffer. If it returns nil, then the
// buffer has been grown. It returns ErrFull if it hits a limit.
// The caller must hold b.mutex.
func (b *Buffer) grow() error {
	var newSize int
	if len(b.data) < cutoffSize {
		newSize = 2 * len(b.data)
	} else {
		newSize = 5 * len(b.data) / 4
	}
	if newSize < minSize {
		newSize = minSize
	}
	if (b.limitSize <= 0 || sizeHardLimit) && newSize > maxSize {
		newSize = maxSize
	}

	// one byte slack
	if b.limitSize > 0 && newSize > b.limitSize+1 {
		newSize = b.limitSize + 1
	}

	if newSize <= len(b.data) {
		return ErrFull
	}

	newData := make([]byte, newSize)

	// Linearize the stored bytes at the start of the new array.
	var n int
	if b.head <= b.tail {
		// data was contiguous
		n = copy(newData, b.data[b.head:b.tail])
	} else {
		// data was discontinuous
		n = copy(newData, b.data[b.head:])
		n += copy(newData[n:], b.data[:b.tail])
	}
	b.head = 0
	b.tail = n
	b.data = newData

	return nil
}

// Write appends a copy of the packet data to the buffer.
// Returns ErrFull if the packet doesn't fit, and io.ErrClosedPipe if the
// buffer has been closed.
//
// Note that the packet size is limited to 65535 bytes since v0.11.0 due to
// the internal data structure (each packet is prefixed by a two-byte length).
func (b *Buffer) Write(packet []byte) (int, error) {
	if len(packet) >= 0x10000 {
		return 0, errPacketTooBig
	}

	b.mutex.Lock()

	if b.closed {
		b.mutex.Unlock()

		return 0, io.ErrClosedPipe
	}

	if (b.limitCount > 0 && b.count >= b.limitCount) ||
		(b.limitSize > 0 && b.size()+2+len(packet) > b.limitSize) {
		b.mutex.Unlock()

		return 0, ErrFull
	}

	// grow the buffer until the packet fits
	for !b.available(len(packet)) {
		if err := b.grow(); err != nil {
			b.mutex.Unlock()

			return 0, err
		}
	}

	// store the length of the packet, high byte first
	b.data[b.tail] = uint8(len(packet) >> 8)
	b.tail++
	if b.tail >= len(b.data) {
		b.tail = 0
	}
	b.data[b.tail] = uint8(len(packet))
	b.tail++
	if b.tail >= len(b.data) {
		b.tail = 0
	}

	// store the packet
	n := copy(b.data[b.tail:], packet)
	b.tail += n
	if b.tail >= len(b.data) {
		// we reached the end, wrap around
		m := copy(b.data, packet[n:])
		b.tail = m
	}
	b.count++

	// Wake a waiting reader; skip the send if a wake-up is already pending.
	select {
	case b.notify <- struct{}{}:
	default:
	}
	b.mutex.Unlock()

	return len(packet), nil
}

// Read populates the given byte slice, returning the number of bytes read.
// Blocks until data is available or the buffer is closed.
// Returns io.ErrShortBuffer if the destination slice is too small to hold the
// whole packet; the remainder of that packet is discarded.
// Returns io.EOF if the buffer is closed and empty.
// Returns a *netError carrying ErrTimeout once the read deadline has passed.
func (b *Buffer) Read(packet []byte) (n int, err error) { //nolint:gocognit
	// Return immediately if the deadline is already exceeded.
	select {
	case <-b.readDeadline.Done():
		return 0, &netError{ErrTimeout, true, true}
	default:
	}

	for {
		b.mutex.Lock()

		if b.head != b.tail {
			// decode the packet size
			hi := b.data[b.head]
			b.head++
			if b.head >= len(b.data) {
				b.head = 0
			}
			lo := b.data[b.head]
			b.head++
			if b.head >= len(b.data) {
				b.head = 0
			}
			size := int((uint16(hi) << 8) | uint16(lo))

			// determine the number of bytes we'll actually copy
			copied := size
			if copied > len(packet) {
				copied = len(packet)
			}

			// copy the data
			if b.head+copied < len(b.data) {
				copy(packet, b.data[b.head:b.head+copied])
			} else {
				// the payload wraps past the end of the array
				first := copy(packet, b.data[b.head:])
				copy(packet[first:], b.data[:copied-first])
			}

			// advance head, discarding any data that wasn't copied
			b.head += size
			if b.head >= len(b.data) {
				b.head -= len(b.data)
			}

			if b.head == b.tail {
				// the buffer is empty, reset to beginning
				// in order to improve cache locality.
				b.head = 0
				b.tail = 0
			}

			b.count--

			b.mutex.Unlock()

			if copied < size {
				return copied, io.ErrShortBuffer
			}

			return copied, nil
		}

		if b.closed {
			b.mutex.Unlock()

			return 0, io.EOF
		}
		b.mutex.Unlock()

		// Wait for a Write (or Close, which closes notify) or the deadline.
		select {
		case <-b.readDeadline.Done():
			return 0, &netError{ErrTimeout, true, true}
		case <-b.notify:
		}
	}
}

// Close the buffer, unblocking any pending reads.
// Data in the buffer can still be read, Read will return io.EOF only when empty.
// Closing an already closed buffer is a no-op. Close always returns nil.
func (b *Buffer) Close() (err error) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if b.closed {
		return nil
	}

	b.closed = true
	close(b.notify)

	return nil
}

// Count returns the number of packets in the buffer.
func (b *Buffer) Count() int {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	return b.count
}

// SetLimitCount controls the maximum number of packets that can be buffered.
// Causes Write to return ErrFull when this limit is reached.
// A zero value will disable this limit.
func (b *Buffer) SetLimitCount(limit int) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	b.limitCount = limit
}

// Size returns the total byte size of packets in the buffer, including
// a small amount of administrative overhead.
func (b *Buffer) Size() int {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	return b.size()
}

// size returns the number of bytes in use between head and tail,
// length headers included. The caller must hold b.mutex.
func (b *Buffer) size() int {
	size := b.tail - b.head
	if size < 0 {
		size += len(b.data)
	}

	return size
}

// SetLimitSize controls the maximum number of bytes that can be buffered.
// Causes Write to return ErrFull when this limit is reached.
// A zero value means 4MB since v0.11.0.
//
// User can set packetioSizeHardLimit build tag to enable 4MB hard limit.
// When packetioSizeHardLimit build tag is set, SetLimitSize exceeding
// the hard limit will be silently discarded.
func (b *Buffer) SetLimitSize(limit int) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	b.limitSize = limit
}

// SetReadDeadline sets the deadline for the Read operation.
// Setting to zero means no deadline.
// It always returns nil.
func (b *Buffer) SetReadDeadline(t time.Time) error {
	b.readDeadline.Set(t)

	return nil
}