// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package client implements the API for a TURN client
package client import ( ) const ( maxReadQueueSize = 1024 permRefreshInterval = 120 * time.Second maxRetryAttempts = 3 ) const ( timerIDRefreshAlloc int = iota timerIDRefreshPerms ) type inboundData struct { data []byte from net.Addr } // UDPConn is the implementation of the Conn and PacketConn interfaces for UDP network connections. // compatible with net.PacketConn and net.Conn. type UDPConn struct { bindingMgr *bindingManager // Thread-safe readCh chan *inboundData // Thread-safe closeCh chan struct{} // Thread-safe allocation } // NewUDPConn creates a new instance of UDPConn. func ( *AllocationConfig) *UDPConn { := &UDPConn{ bindingMgr: newBindingManager(), readCh: make(chan *inboundData, maxReadQueueSize), closeCh: make(chan struct{}), allocation: allocation{ client: .Client, relayedAddr: .RelayedAddr, serverAddr: .ServerAddr, readTimer: time.NewTimer(time.Duration(math.MaxInt64)), permMap: newPermissionMap(), username: .Username, realm: .Realm, integrity: .Integrity, _nonce: .Nonce, _lifetime: .Lifetime, net: .Net, log: .Log, }, } .log.Debugf("Initial lifetime: %d seconds", int(.lifetime().Seconds())) .refreshAllocTimer = NewPeriodicTimer( timerIDRefreshAlloc, .onRefreshTimers, .lifetime()/2, ) .refreshPermsTimer = NewPeriodicTimer( timerIDRefreshPerms, .onRefreshTimers, permRefreshInterval, ) if .refreshAllocTimer.Start() { .log.Debugf("Started refresh allocation timer") } if .refreshPermsTimer.Start() { .log.Debugf("Started refresh permission timer") } return } // ReadFrom reads a packet from the connection, // copying the payload into p. It returns the number of // bytes copied into p and the return address that // was on the packet. // It returns the number of bytes read (0 <= n <= len(p)) // and any error encountered. Callers should always process // the n > 0 bytes returned before considering the error err. // ReadFrom can be made to time out and return // an Error with Timeout() == true after a fixed time limit; // see SetDeadline and SetReadDeadline. func ( *UDPConn) ( []byte) ( int, net.Addr, error) { for { select { case := <-.readCh: := copy(, .data) if < len(.data) { return 0, nil, io.ErrShortBuffer } return , .from, nil case <-.readTimer.C: return 0, nil, &net.OpError{ Op: "read", Net: .LocalAddr().Network(), Addr: .LocalAddr(), Err: newTimeoutError("i/o timeout"), } case <-.closeCh: return 0, nil, &net.OpError{ Op: "read", Net: .LocalAddr().Network(), Addr: .LocalAddr(), Err: errClosed, } } } } func ( *allocation) ( *permission, net.Addr) error { .mutex.Lock() defer .mutex.Unlock() if .state() == permStateIdle { // Punch a hole! (this would block a bit..) if := .CreatePermissions(); != nil { .permMap.delete() return } .setState(permStatePermitted) } return nil } // WriteTo writes a packet with payload to addr. // WriteTo can be made to time out and return // an Error with Timeout() == true after a fixed time limit; // see SetDeadline and SetWriteDeadline. // On packet-oriented connections, write timeouts are rare. func ( *UDPConn) ( []byte, net.Addr) (int, error) { //nolint:gocognit,cyclop var error , := .(*net.UDPAddr) if ! { return 0, errUDPAddrCast } // Check if we have a permission for the destination IP addr , := .permMap.find() if ! { = &permission{} .permMap.insert(, ) } for := 0; < maxRetryAttempts; ++ { // c.createPermission() would block, per destination IP (, or perm), // until the perm state becomes "requested". Purpose of this is to // guarantee the order of packets (within the same perm). // Note that CreatePermission transaction may not be complete before // all the data transmission. This is done assuming that the request // will be most likely successful and we can tolerate some loss of // UDP packet (or reorder), inorder to minimize the latency in most cases. if = .createPermission(, ); !errors.Is(, errTryAgain) { break } } if != nil { return 0, } // Bind channel , := .bindingMgr.findByAddr() if ! { = .bindingMgr.create() } := .state() //nolint:nestif if == bindingStateIdle || == bindingStateRequest || == bindingStateFailed { func() { // Block only callers with the same binding until // the binding transaction has been complete .muBind.Lock() defer .muBind.Unlock() // Binding state may have been changed while waiting. check again. if .state() == bindingStateIdle { .setState(bindingStateRequest) go func() { := .bind() if != nil { .log.Warnf("Failed to bind bind(): %s", ) .setState(bindingStateFailed) // Keep going... } else { .setState(bindingStateReady) } }() } }() // Send data using SendIndication := addr2PeerAddress() var *stun.Message , = stun.Build( stun.TransactionID, stun.NewType(stun.MethodSend, stun.ClassIndication), proto.Data(), , stun.Fingerprint, ) if != nil { return 0, } // Indication has no transaction (fire-and-forget) return .client.WriteTo(.Raw, .serverAddr) } // Binding is either ready // Check if the binding needs a refresh func() { .muBind.Lock() defer .muBind.Unlock() if .state() == bindingStateReady && time.Since(.refreshedAt()) > 5*time.Minute { .setState(bindingStateRefresh) go func() { if := .bind(); != nil { .log.Warnf("Failed to bind() for refresh: %s", ) .setState(bindingStateFailed) // Keep going... } else { .setRefreshedAt(time.Now()) .setState(bindingStateReady) } }() } }() // Send via ChannelData _, = .sendChannelData(, .number) if != nil { return 0, } return len(), nil } // Close closes the connection. // Any blocked ReadFrom or WriteTo operations will be unblocked and return errors. func ( *UDPConn) () error { .refreshAllocTimer.Stop() .refreshPermsTimer.Stop() select { case <-.closeCh: return errAlreadyClosed default: close(.closeCh) } .client.OnDeallocated(.relayedAddr) return .refreshAllocation(0, true /* dontWait=true */) } // LocalAddr returns the local network address. func ( *UDPConn) () net.Addr { return .relayedAddr } // SetDeadline sets the read and write deadlines associated // with the connection. It is equivalent to calling both // SetReadDeadline and SetWriteDeadline. // // A deadline is an absolute time after which I/O operations // fail with a timeout (see type Error) instead of // blocking. The deadline applies to all future and pending // I/O, not just the immediately following call to ReadFrom or // WriteTo. After a deadline has been exceeded, the connection // can be refreshed by setting a deadline in the future. // // An idle timeout can be implemented by repeatedly extending // the deadline after successful ReadFrom or WriteTo calls. // // A zero value for t means I/O operations will not time out. func ( *UDPConn) ( time.Time) error { return .SetReadDeadline() } // SetReadDeadline sets the deadline for future ReadFrom calls // and any currently-blocked ReadFrom call. // A zero value for t means ReadFrom will not time out. func ( *UDPConn) ( time.Time) error { var time.Duration if == noDeadline() { = time.Duration(math.MaxInt64) } else { = time.Until() } .readTimer.Reset() return nil } // SetWriteDeadline sets the deadline for future WriteTo calls // and any currently-blocked WriteTo call. // Even if write times out, it may return n > 0, indicating that // some of the data was successfully written. // A zero value for t means WriteTo will not time out. func ( *UDPConn) (time.Time) error { // Write never blocks. return nil } func addr2PeerAddress( net.Addr) proto.PeerAddress { var proto.PeerAddress switch a := .(type) { case *net.UDPAddr: .IP = .IP .Port = .Port case *net.TCPAddr: .IP = .IP .Port = .Port } return } // CreatePermissions Issues a CreatePermission request for the supplied addresses // as described in https://datatracker.ietf.org/doc/html/rfc5766#section-9 func ( *allocation) ( ...net.Addr) error { := []stun.Setter{ stun.TransactionID, stun.NewType(stun.MethodCreatePermission, stun.ClassRequest), } for , := range { = append(, addr2PeerAddress()) } = append(, .username, .realm, .nonce(), .integrity, stun.Fingerprint) , := stun.Build(...) if != nil { return } , := .client.PerformTransaction(, .serverAddr, false) if != nil { return } := .Msg if .Type.Class == stun.ClassErrorResponse { var stun.ErrorCodeAttribute if = .GetFrom(); == nil { if .Code == stun.CodeStaleNonce { .setNonceFromMsg() return errTryAgain } return fmt.Errorf("%s (error %s)", .Type, ) //nolint:goerr113 } return fmt.Errorf("%s", .Type) //nolint:goerr113 } return nil } // HandleInbound passes inbound data in UDPConn. func ( *UDPConn) ( []byte, net.Addr) { // Copy data := make([]byte, len()) copy(, ) select { case .readCh <- &inboundData{data: , from: }: default: .log.Warnf("Receive buffer full") } } // FindAddrByChannelNumber returns a peer address associated with the // channel number on this UDPConn. func ( *UDPConn) ( uint16) (net.Addr, bool) { , := .bindingMgr.findByNumber() if ! { return nil, false } return .addr, true } func ( *UDPConn) ( *binding) error { := []stun.Setter{ stun.TransactionID, stun.NewType(stun.MethodChannelBind, stun.ClassRequest), addr2PeerAddress(.addr), proto.ChannelNumber(.number), .username, .realm, .nonce(), .integrity, stun.Fingerprint, } , := stun.Build(...) if != nil { return } , := .client.PerformTransaction(, .serverAddr, false) if != nil { .bindingMgr.deleteByAddr(.addr) return } := .Msg if .Type != stun.NewType(stun.MethodChannelBind, stun.ClassSuccessResponse) { return fmt.Errorf("unexpected response type %s", .Type) //nolint:goerr113 } .log.Debugf("Channel binding successful: %s %d", .addr, .number) // Success. return nil } func ( *UDPConn) ( []byte, uint16) (int, error) { := &proto.ChannelData{ Data: , Number: proto.ChannelNumber(), } .Encode() , := .client.WriteTo(.Raw, .serverAddr) if != nil { return 0, } return len(), nil }