// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>// SPDX-License-Identifier: MITpackage stunimport ()// NoopHandler just discards any event.func () Handler {returnfunc(Event) {}}// NewAgent initializes and returns new Agent with provided handler.// If h is nil, the NoopHandler will be used.func ( Handler) *Agent {if == nil { = NoopHandler() } := &Agent{transactions: make(map[transactionID]agentTransaction),handler: , }return}// Agent is low-level abstraction over transaction list that// handles concurrency (all calls are goroutine-safe) and// time outs (via Collect call).typeAgentstruct {// transactions is map of transactions that are currently // in progress. Event handling is done in such way when // transaction is unregistered before agentTransaction access, // minimizing mux lock and protecting agentTransaction from // data races via unexpected concurrent access. transactions map[transactionID]agentTransaction closed bool// all calls are invalid if true mux sync.Mutex// protects transactions and closed handler Handler// handles transactions}// Handler handles state changes of transaction.//// Handler is called on transaction state change.// Usage of e is valid only during call, user must// copy needed fields explicitly.typeHandlerfunc(e Event)// Event is passed to Handler describing the transaction event.// Do not reuse outside Handler.typeEventstruct { TransactionID [TransactionIDSize]byte Message *Message Error error}// agentTransaction represents transaction in progress.// Concurrent access is invalid.type agentTransaction struct { id transactionID deadline time.Time}var (// ErrTransactionStopped indicates that transaction was manually stopped.ErrTransactionStopped = errors.New("transaction is stopped")// ErrTransactionNotExists indicates that agent failed to find transaction.ErrTransactionNotExists = errors.New("transaction not exists")// ErrTransactionExists indicates that transaction with same id is already // registered.ErrTransactionExists = errors.New("transaction exists with same id"))// StopWithError removes transaction from list and calls handler with// provided error. Can return ErrTransactionNotExists and ErrAgentClosed.func ( *Agent) ( [TransactionIDSize]byte, error) error { .mux.Lock()if .closed { .mux.Unlock()returnErrAgentClosed } , := .transactions[]delete(.transactions, ) := .handler .mux.Unlock()if ! {returnErrTransactionNotExists } (Event{TransactionID: .id,Error: , })returnnil}// Stop stops transaction by id with ErrTransactionStopped, blocking// until handler returns.func ( *Agent) ( [TransactionIDSize]byte) error {return .StopWithError(, ErrTransactionStopped)}// ErrAgentClosed indicates that agent is in closed state and is unable// to handle transactions.varErrAgentClosed = errors.New("agent is closed")// Start registers transaction with provided id and deadline.// Could return ErrAgentClosed, ErrTransactionExists.//// Agent handler is guaranteed to be eventually called.func ( *Agent) ( [TransactionIDSize]byte, time.Time) error { .mux.Lock()defer .mux.Unlock()if .closed {returnErrAgentClosed } , := .transactions[]if {returnErrTransactionExists } .transactions[] = agentTransaction{id: ,deadline: , }returnnil}// agentCollectCap is initial capacity for Agent.Collect slices,// sufficient to make function zero-alloc in most cases.const agentCollectCap = 100// ErrTransactionTimeOut indicates that transaction has reached deadline.varErrTransactionTimeOut = errors.New("transaction is timed out")// Collect terminates all transactions that have deadline before provided// time, blocking until all handlers will process ErrTransactionTimeOut.// Will return ErrAgentClosed if agent is already closed.//// It is safe to call Collect concurrently but makes no sense.func ( *Agent) ( time.Time) error { := make([]transactionID, 0, agentCollectCap) .mux.Lock()if .closed {// Doing nothing if agent is closed. // All transactions should be already closed // during Close() call. .mux.Unlock()returnErrAgentClosed }// Adding all transactions with deadline before gcTime // to toCall and toRemove slices. // No allocs if there are less than agentCollectCap // timed out transactions.for , := range .transactions {if .deadline.Before() { = append(, ) } }// Un-registering timed out transactions.for , := range {delete(.transactions, ) }// Calling handler does not require locked mutex, // reducing lock time. := .handler .mux.Unlock()// Sending ErrTransactionTimeOut to handler for all transactions, // blocking until last one. := Event{Error: ErrTransactionTimeOut, }for , := range { .TransactionID = () }returnnil}// Process incoming message, synchronously passing it to handler.func ( *Agent) ( *Message) error { := Event{TransactionID: .TransactionID,Message: , } .mux.Lock()if .closed { .mux.Unlock()returnErrAgentClosed } := .handlerdelete(.transactions, .TransactionID) .mux.Unlock() ()returnnil}// SetHandler sets agent handler to h.func ( *Agent) ( Handler) error { .mux.Lock()if .closed { .mux.Unlock()returnErrAgentClosed } .handler = .mux.Unlock()returnnil}// Close terminates all transactions with ErrAgentClosed and renders Agent to// closed state.func ( *Agent) () error { := Event{Error: ErrAgentClosed, } .mux.Lock()if .closed { .mux.Unlock()returnErrAgentClosed }for , := range .transactions { .TransactionID = .id .handler() } .transactions = nil .closed = true .handler = nil .mux.Unlock()returnnil}type transactionID [TransactionIDSize]byte
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.