// The udpmux package contains the logic for multiplexing multiple WebRTC (ICE) // connections over a single UDP socket.
package udpmux import ( logging pool ) var log = logging.Logger("webrtc-udpmux") // ReceiveBufSize is the size of the buffer used to receive packets from the PacketConn. // It is fine for this number to be higher than the actual path MTU as this value is not // used to decide the packet size on the write path. const ReceiveBufSize = 1500 type Candidate struct { Ufrag string Addr *net.UDPAddr } // UDPMux multiplexes multiple ICE connections over a single net.PacketConn, // generally a UDP socket. // // The connections are indexed by (ufrag, IP address family) and by remote // address from which the connection has received valid STUN/RTC packets. // // When a new packet is received on the underlying net.PacketConn, we // first check the address map to see if there is a connection associated with the // remote address: // If found, we pass the packet to that connection. // Otherwise, we check to see if the packet is a STUN packet. // If it is, we read the ufrag from the STUN packet and use it to check if there // is a connection associated with the (ufrag, IP address family) pair. // If found we add the association to the address map. type UDPMux struct { socket net.PacketConn queue chan Candidate mx sync.Mutex // ufragMap allows us to multiplex incoming STUN packets based on ufrag ufragMap map[ufragConnKey]*muxedConnection // addrMap allows us to correctly direct incoming packets after the connection // is established and ufrag isn't available on all packets addrMap map[string]*muxedConnection // ufragAddrMap allows cleaning up all addresses from the addrMap once the connection is closed // During the ICE connectivity checks, the same ufrag might be used on multiple addresses. ufragAddrMap map[ufragConnKey][]net.Addr // the context controls the lifecycle of the mux wg sync.WaitGroup ctx context.Context cancel context.CancelFunc } var _ ice.UDPMux = &UDPMux{} func ( net.PacketConn) *UDPMux { , := context.WithCancel(context.Background()) := &UDPMux{ ctx: , cancel: , socket: , ufragMap: make(map[ufragConnKey]*muxedConnection), addrMap: make(map[string]*muxedConnection), ufragAddrMap: make(map[ufragConnKey][]net.Addr), queue: make(chan Candidate, 32), } return } func ( *UDPMux) () { .wg.Add(1) go func() { defer .wg.Done() .readLoop() }() } // GetListenAddresses implements ice.UDPMux func ( *UDPMux) () []net.Addr { return []net.Addr{.socket.LocalAddr()} } // GetConn implements ice.UDPMux // It creates a net.PacketConn for a given ufrag if an existing one cannot be found. // We differentiate IPv4 and IPv6 addresses, since a remote is can be reachable at multiple different // UDP addresses of the same IP address family (eg. server-reflexive addresses and peer-reflexive addresses). func ( *UDPMux) ( string, net.Addr) (net.PacketConn, error) { , := .(*net.UDPAddr) if ! { return nil, fmt.Errorf("unexpected address type: %T", ) } select { case <-.ctx.Done(): return nil, io.ErrClosedPipe default: := && .IP.To4() == nil , := .getOrCreateConn(, , , ) return , nil } } // Close implements ice.UDPMux func ( *UDPMux) () error { select { case <-.ctx.Done(): return nil default: } .cancel() .socket.Close() .wg.Wait() return nil } // writeTo writes a packet to the underlying net.PacketConn func ( *UDPMux) ( []byte, net.Addr) (int, error) { return .socket.WriteTo(, ) } func ( *UDPMux) () { for { select { case <-.ctx.Done(): return default: } := pool.Get(ReceiveBufSize) , , := .socket.ReadFrom() if != nil { if strings.Contains(.Error(), "use of closed network connection") || errors.Is(, context.Canceled) { log.Debugf("readLoop exiting: socket %s closed", .socket.LocalAddr()) } else { log.Errorf("error reading from socket %s: %v", .socket.LocalAddr(), ) } pool.Put() return } = [:] if := .processPacket(, ); ! { pool.Put() } } } func ( *UDPMux) ( []byte, net.Addr) ( bool) { , := .(*net.UDPAddr) if ! { log.Errorf("received a non-UDP address: %s", ) return false } := .IP.To4() == nil // Connections are indexed by remote address. We first // check if the remote address has a connection associated // with it. If yes, we push the received packet to the connection .mx.Lock() , := .addrMap[.String()] .mx.Unlock() if { if := .Push(, ); != nil { log.Debugf("could not push packet: %v", ) return false } return true } if !stun.IsMessage() { log.Debug("incoming message is not a STUN message") return false } := &stun.Message{Raw: } if := .Decode(); != nil { log.Debugf("failed to decode STUN message: %s", ) return false } if .Type != stun.BindingRequest { log.Debugf("incoming message should be a STUN binding request, got %s", .Type) return false } , := ufragFromSTUNMessage() if != nil { log.Debugf("could not find STUN username: %s", ) return false } , := .getOrCreateConn(, , , ) if { select { case .queue <- Candidate{Addr: , Ufrag: }: default: log.Debugw("queue full, dropping incoming candidate", "ufrag", , "addr", ) .Close() return false } } if := .Push(, ); != nil { log.Debugf("could not push packet: %v", ) return false } return true } func ( *UDPMux) ( context.Context) (Candidate, error) { select { case := <-.queue: return , nil case <-.Done(): return Candidate{}, .Err() case <-.ctx.Done(): return Candidate{}, .ctx.Err() } } type ufragConnKey struct { ufrag string isIPv6 bool } // ufragFromSTUNMessage returns the local or ufrag // from the STUN username attribute. Local ufrag is the ufrag of the // peer which initiated the connectivity check, e.g in a connectivity // check from A to B, the username attribute will be B_ufrag:A_ufrag // with the local ufrag value being A_ufrag. In case of ice-lite, the // localUfrag value will always be the remote peer's ufrag since ICE-lite // implementations do not generate connectivity checks. In our specific // case, since the local and remote ufrag is equal, we can return // either value. func ufragFromSTUNMessage( *stun.Message) (string, error) { , := .Get(stun.AttrUsername) if != nil { return "", } := bytes.Index(, []byte{':'}) if == -1 { return "", fmt.Errorf("invalid STUN username attribute") } return string([+1:]), nil } // RemoveConnByUfrag removes the connection associated with the ufrag and all the // addresses associated with that connection. This method is called by pion when // a peerconnection is closed. func ( *UDPMux) ( string) { if == "" { return } .mx.Lock() defer .mx.Unlock() for , := range [...]bool{true, false} { := ufragConnKey{ufrag: , isIPv6: } if , := .ufragMap[]; { delete(.ufragMap, ) for , := range .ufragAddrMap[] { delete(.addrMap, .String()) } delete(.ufragAddrMap, ) .close() } } } func ( *UDPMux) ( string, bool, *UDPMux, net.Addr) ( bool, *muxedConnection) { := ufragConnKey{ufrag: , isIPv6: } .mx.Lock() defer .mx.Unlock() if , := .ufragMap[]; { .addrMap[.String()] = .ufragAddrMap[] = append(.ufragAddrMap[], ) return false, } := newMuxedConnection(, ) .ufragMap[] = .addrMap[.String()] = .ufragAddrMap[] = append(.ufragAddrMap[], ) return true, }