// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
//
// SPDX-License-Identifier: MIT
// Package packetio provides packet buffer
package packetioimport ()var errPacketTooBig = errors.New("packet too big")// BufferPacketType allow the Buffer to know which packet protocol is writing.typeBufferPacketTypeintconst (// RTPBufferPacket indicates the Buffer that is handling RTP packetsRTPBufferPacketBufferPacketType = 1// RTCPBufferPacket indicates the Buffer that is handling RTCP packetsRTCPBufferPacketBufferPacketType = 2)// Buffer allows writing packets to an intermediate buffer, which can then be read form.// This is verify similar to bytes.Buffer but avoids combining multiple writes into a single read.typeBufferstruct { mutex sync.Mutex// this is a circular buffer. If head <= tail, then the useful // data is in the interval [head, tail[. If tail < head, then // the useful data is the union of [head, len[ and [0, tail[. // In order to avoid ambiguity when head = tail, we always leave // an unused byte in the buffer. data []byte head, tail int notify chanstruct{} closed bool count int limitCount, limitSize int readDeadline *deadline.Deadline}const ( minSize = 2048 cutoffSize = 128 * 1024 maxSize = 4 * 1024 * 1024)// NewBuffer creates a new Buffer.func () *Buffer {return &Buffer{notify: make(chanstruct{}, 1),readDeadline: deadline.New(), }}// available returns true if the buffer is large enough to fit a packet// of the given size, taking overhead into account.func ( *Buffer) ( int) bool { := .head - .tailif <= 0 { += len(.data) }// we interpret head=tail as empty, so always keep a byte freeif +2+1 > {returnfalse }returntrue}// grow increases the size of the buffer. If it returns nil, then the// buffer has been grown. 
It returns ErrFull if hits a limit.func ( *Buffer) () error {varintiflen(.data) < cutoffSize { = 2 * len(.data) } else { = 5 * len(.data) / 4 }if < minSize { = minSize }if (.limitSize <= 0 || sizeHardLimit) && > maxSize { = maxSize }// one byte slackif .limitSize > 0 && > .limitSize+1 { = .limitSize + 1 }if <= len(.data) {returnErrFull } := make([]byte, )varintif .head <= .tail {// data was contiguous = copy(, .data[.head:.tail]) } else {// data was discontinuous = copy(, .data[.head:]) += copy([:], .data[:.tail]) } .head = 0 .tail = .data = returnnil}// Write appends a copy of the packet data to the buffer.// Returns ErrFull if the packet doesn't fit.//// Note that the packet size is limited to 65536 bytes since v0.11.0 due to the internal data structure.func ( *Buffer) ( []byte) (int, error) {iflen() >= 0x10000 {return0, errPacketTooBig } .mutex.Lock()if .closed { .mutex.Unlock()return0, io.ErrClosedPipe }if (.limitCount > 0 && .count >= .limitCount) || (.limitSize > 0 && .size()+2+len() > .limitSize) { .mutex.Unlock()return0, ErrFull }// grow the buffer until the packet fitsfor !.available(len()) { := .grow()if != nil { .mutex.Unlock()return0, } }// store the length of the packet .data[.tail] = uint8(len() >> 8) .tail++if .tail >= len(.data) { .tail = 0 } .data[.tail] = uint8(len()) .tail++if .tail >= len(.data) { .tail = 0 }// store the packet := copy(.data[.tail:], ) .tail += if .tail >= len(.data) {// we reached the end, wrap around := copy(.data, [:]) .tail = } .count++select {case .notify<-struct{}{}:default: } .mutex.Unlock()returnlen(), nil}// Read populates the given byte slice, returning the number of bytes read.// Blocks until data is available or the buffer is closed.// Returns io.ErrShortBuffer is the packet is too small to copy the Write.// Returns io.EOF if the buffer is closed.func ( *Buffer) ( []byte) ( int, error) { //nolint:gocognit// Return immediately if the deadline is already exceeded.select {case<-.readDeadline.Done():return0, 
&netError{ErrTimeout, true, true}default: }for { .mutex.Lock()if .head != .tail {// decode the packet size := .data[.head] .head++if .head >= len(.data) { .head = 0 } := .data[.head] .head++if .head >= len(.data) { .head = 0 } := int((uint16() << 8) | uint16())// determine the number of bytes we'll actually copy := if > len() { = len() }// copy the dataif .head+ < len(.data) {copy(, .data[.head:.head+]) } else { := copy(, .data[.head:])copy([:], .data[:-]) }// advance head, discarding any data that wasn't copied .head += if .head >= len(.data) { .head -= len(.data) }if .head == .tail {// the buffer is empty, reset to beginning // in order to improve cache locality. .head = 0 .tail = 0 } .count-- .mutex.Unlock()if < {return , io.ErrShortBuffer }return , nil }if .closed { .mutex.Unlock()return0, io.EOF } .mutex.Unlock()select {case<-.readDeadline.Done():return0, &netError{ErrTimeout, true, true}case<-.notify: } }}// Close the buffer, unblocking any pending reads.// Data in the buffer can still be read, Read will return io.EOF only when empty.func ( *Buffer) () ( error) { .mutex.Lock()if .closed { .mutex.Unlock()returnnil } .closed = trueclose(.notify) .mutex.Unlock()returnnil}// Count returns the number of packets in the buffer.func ( *Buffer) () int { .mutex.Lock()defer .mutex.Unlock()return .count}// SetLimitCount controls the maximum number of packets that can be buffered.// Causes Write to return ErrFull when this limit is reached.// A zero value will disable this limit.func ( *Buffer) ( int) { .mutex.Lock()defer .mutex.Unlock() .limitCount = }// Size returns the total byte size of packets in the buffer, including// a small amount of administrative overhead.func ( *Buffer) () int { .mutex.Lock()defer .mutex.Unlock()return .size()}func ( *Buffer) () int { := .tail - .headif < 0 { += len(.data) }return}// SetLimitSize controls the maximum number of bytes that can be buffered.// Causes Write to return ErrFull when this limit is reached.// A zero value means 4MB 
since v0.11.0.//// User can set packetioSizeHardLimit build tag to enable 4MB hard limit.// When packetioSizeHardLimit build tag is set, SetLimitSize exceeding// the hard limit will be silently discarded.func ( *Buffer) ( int) { .mutex.Lock()defer .mutex.Unlock() .limitSize = }// SetReadDeadline sets the deadline for the Read operation.// Setting to zero means no deadline.func ( *Buffer) ( time.Time) error { .readDeadline.Set()returnnil}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.