package ice
import (
"context"
"errors"
"fmt"
"hash/crc32"
"io"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pion/stun/v3"
)
// candidateBase holds the state shared by the candidate methods in this file:
// identity and addressing fields, the packet connection and owning agent that
// start assigns, and the channels used to coordinate shutdown of recvLoop.
type candidateBase struct {
id string
networkType NetworkType
candidateType CandidateType
component uint16
address string
port int
relatedAddress *CandidateRelatedAddress
tcpType TCPType
// resolvedAddr is the value returned by addr().
resolvedAddr net .Addr
// lastSent and lastReceived hold the time.Time values stored by seen().
lastSent atomic .Value
lastReceived atomic .Value
// conn and currAgent are assigned by start(); conn stays nil until then.
conn net .PacketConn
currAgent *Agent
// closeCh is closed by close() to request shutdown; it is the channel returned by Done().
closeCh chan struct {}
// closedCh is closed by recvLoop when it returns; close() waits on it.
closedCh chan struct {}
// foundationOverride, when non-empty, is returned verbatim by Foundation().
foundationOverride string
// priorityOverride, when non-zero, is returned verbatim by Priority().
priorityOverride uint32
// remoteCandidateCaches maps a packet source address to the remote candidate validated for it.
// NOTE(review): the map is never allocated in this file; presumably the constructors make it — confirm.
remoteCandidateCaches map [AddrPort ]Candidate
isLocationTracked bool
// extensions holds extension attributes other than tcptype, which is kept in tcpType.
extensions []CandidateExtension
}
// Done implements context.Context. It returns closeCh, which close() closes;
// the channel is nil until start() has allocated it.
func (c *candidateBase ) Done () <-chan struct {} {
return c .closeCh
}
// Err implements context.Context. It returns ErrRunCanceled once the channel
// exposed by Done (closeCh) has been closed, and nil before that.
//
// The check is made against closeCh rather than closedCh so that Err is never
// nil while Done is already closed, as the context.Context contract requires.
// A candidate that was never started has a nil closeCh; receiving from it
// never succeeds, so Err reports nil.
func (c *candidateBase) Err() error {
	select {
	case <-c.closeCh:
		return ErrRunCanceled
	default:
		return nil
	}
}
// Deadline implements context.Context. A candidate never carries a deadline,
// so it always returns the zero time and false.
func (c *candidateBase ) Deadline () (deadline time .Time , ok bool ) {
return time .Time {}, false
}
// Value implements context.Context. No values are attached to a candidate,
// so it returns nil for every key.
func (c *candidateBase ) Value (interface {}) interface {} {
return nil
}
// ID returns the candidate's identifier.
func (c *candidateBase ) ID () string {
return c .id
}
// Foundation returns the candidate's foundation string. An explicit override
// wins when one is set; otherwise the foundation is the decimal CRC-32 (IEEE)
// checksum of the candidate type, address and network type concatenated.
func (c *candidateBase) Foundation() string {
	if override := c.foundationOverride; override != "" {
		return override
	}

	seed := c.Type().String() + c.address + c.networkType.String()
	sum := crc32.ChecksumIEEE([]byte(seed))

	return strconv.FormatUint(uint64(sum), 10)
}
// Address returns the candidate's address string.
func (c *candidateBase ) Address () string {
return c .address
}
// Port returns the candidate's port.
func (c *candidateBase ) Port () int {
return c .port
}
// Type returns the candidate's type.
func (c *candidateBase ) Type () CandidateType {
return c .candidateType
}
// NetworkType returns the candidate's network type.
func (c *candidateBase ) NetworkType () NetworkType {
return c .networkType
}
// Component returns the candidate's component ID.
func (c *candidateBase ) Component () uint16 {
return c .component
}
// SetComponent replaces the candidate's component ID.
func (c *candidateBase ) SetComponent (component uint16 ) {
c .component = component
}
// LocalPreference returns the local-preference part of the candidate priority.
//
// Non-TCP candidates use defaultLocalPreference. TCP candidates combine a
// direction preference (0-6, chosen from the candidate type and TCP type)
// with a fixed "other" preference of 8191 as (1<<13)*direction + other.
func (c *candidateBase) LocalPreference() uint16 {
	if !c.NetworkType().IsTCP() {
		return defaultLocalPreference
	}

	const otherPref uint16 = 8191

	// directionPref stays 0 for any combination not listed below.
	var directionPref uint16

	switch c.Type() {
	case CandidateTypeHost, CandidateTypeRelay:
		switch c.tcpType {
		case TCPTypeActive:
			directionPref = 6
		case TCPTypePassive:
			directionPref = 4
		case TCPTypeSimultaneousOpen:
			directionPref = 2
		case TCPTypeUnspecified:
			directionPref = 0
		}
	case CandidateTypePeerReflexive, CandidateTypeServerReflexive:
		switch c.tcpType {
		case TCPTypeSimultaneousOpen:
			directionPref = 6
		case TCPTypeActive:
			directionPref = 4
		case TCPTypePassive:
			directionPref = 2
		case TCPTypeUnspecified:
			directionPref = 0
		}
	case CandidateTypeUnspecified:
		directionPref = 0
	}

	return (1<<13)*directionPref + otherPref
}
// RelatedAddress returns the candidate's related address, which may be nil.
func (c *candidateBase ) RelatedAddress () *CandidateRelatedAddress {
return c .relatedAddress
}
// TCPType returns the candidate's TCP type.
func (c *candidateBase ) TCPType () TCPType {
return c .tcpType
}
// start binds the candidate to agent a and connection conn, allocates the
// shutdown channels and launches recvLoop in its own goroutine. recvLoop does
// not read from conn until initializedCh is signalled.
//
// A candidate that already has a connection is left untouched; a warning is
// logged instead.
func (c *candidateBase) start(a *Agent, conn net.PacketConn, initializedCh <-chan struct{}) {
	if c.conn != nil {
		c.agent().log.Warn("Can't start already started candidateBase")

		return
	}

	c.closeCh, c.closedCh = make(chan struct{}), make(chan struct{})
	c.conn, c.currAgent = conn, a

	go c.recvLoop(initializedCh)
}
// bufferPool recycles the receiveMTU-sized byte slices that recvLoop reads
// packets into.
var bufferPool = sync .Pool {
New : func () interface {} {
return make ([]byte , receiveMTU )
},
}
// recvLoop reads packets from the candidate's connection and hands each one
// to handleInboundPacket until a read fails. It closes closedCh on exit so
// close() can wait for it.
//
// Reading does not begin until initializedCh is signalled; if closeCh is
// closed first the loop exits without reading. Read errors other than io.EOF
// and net.ErrClosed are logged as warnings.
func (c *candidateBase) recvLoop(initializedCh <-chan struct{}) {
	a := c.agent()

	defer close(c.closedCh)

	select {
	case <-c.closeCh:
		return
	case <-initializedCh:
	}

	pooled := bufferPool.Get()
	defer bufferPool.Put(pooled)

	packet, isBytes := pooled.([]byte)
	if !isBytes {
		return
	}

	for {
		n, from, readErr := c.conn.ReadFrom(packet)
		if readErr == nil {
			c.handleInboundPacket(packet[:n], from)

			continue
		}

		// EOF and a closed connection are the normal ways for the loop to end.
		expected := errors.Is(readErr, io.EOF) || errors.Is(readErr, net.ErrClosed)
		if !expected {
			a.log.Warnf("Failed to read from candidate %s: %v", c, readErr)
		}

		return
	}
}
// validateSTUNTrafficCache reports whether addr already has a cached remote
// candidate. On a hit the cached candidate is marked as having just received
// traffic.
func (c *candidateBase) validateSTUNTrafficCache(addr net.Addr) bool {
	cached, found := c.remoteCandidateCaches[toAddrPort(addr)]
	if !found {
		return false
	}

	cached.seen(false)

	return true
}
// addRemoteCandidateCache records candidate as the remote candidate for
// srcAddr unless an entry for that address already exists. An existing entry
// is kept and marked as seen by the lookup.
func (c *candidateBase) addRemoteCandidateCache(candidate Candidate, srcAddr net.Addr) {
	if known := c.validateSTUNTrafficCache(srcAddr); !known {
		c.remoteCandidateCaches[toAddrPort(srcAddr)] = candidate
	}
}
// handleInboundPacket dispatches one packet received from srcAddr.
//
// STUN messages are decoded from a private copy of buf and passed to the
// agent's handleInbound on the agent loop. Any other packet is accepted only
// if srcAddr is a cached or newly validated remote candidate, and is then
// written to the agent's buffer. Failures are logged and the packet dropped.
func (c *candidateBase) handleInboundPacket(buf []byte, srcAddr net.Addr) {
	a := c.agent()

	if !stun.IsMessage(buf) {
		if known := c.validateSTUNTrafficCache(srcAddr); !known {
			remote, ok := a.validateNonSTUNTraffic(c, srcAddr)
			if !ok {
				a.log.Warnf("Discarded message from %s, not a valid remote candidate", c.addr())

				return
			}
			c.addRemoteCandidateCache(remote, srcAddr)
		}

		if _, writeErr := a.buf.Write(buf); writeErr != nil {
			a.log.Warnf("Failed to write packet: %s", writeErr)
		}

		return
	}

	// The message keeps its own copy because recvLoop reuses buf for the next read.
	raw := make([]byte, len(buf))
	copy(raw, buf)
	msg := &stun.Message{Raw: raw}

	if decodeErr := msg.Decode(); decodeErr != nil {
		a.log.Warnf("Failed to handle decode ICE from %s to %s: %v", c.addr(), srcAddr, decodeErr)

		return
	}

	runErr := a.loop.Run(c, func(_ context.Context) {
		a.handleInbound(msg, c, srcAddr)
	})
	if runErr != nil {
		a.log.Warnf("Failed to handle message: %v", runErr)
	}
}
// close shuts the candidate down: it closes closeCh, forces the connection's
// deadline to now, closes the connection and then waits for recvLoop to
// finish. It is a no-op for a candidate that was never started or that has
// already been closed.
//
// It returns the first error from SetDeadline or Close. When either fails the
// function returns immediately and does not wait for recvLoop.
func (c *candidateBase ) close () error {
// closeCh is only allocated by start, so a nil Done channel means there is nothing to close.
if c .Done () == nil {
return nil
}
// Already closed: closing closeCh a second time would panic.
select {
case <- c .Done ():
return nil
default :
}
var firstErr error
close (c .closeCh )
// An immediate deadline makes a ReadFrom that is blocked in recvLoop return.
if err := c .conn .SetDeadline (time .Now ()); err != nil {
firstErr = err
}
if err := c .conn .Close (); err != nil && firstErr == nil {
firstErr = err
}
if firstErr != nil {
return firstErr
}
// recvLoop closes closedCh when it returns.
<-c .closedCh
return nil
}
// writeTo sends raw to dst's address over the candidate's connection and
// returns the number of bytes written.
//
// Only io.ErrClosedPipe is returned to the caller. Any other write error is
// logged at info level and swallowed. A successful write marks the candidate
// as having just sent traffic.
func (c *candidateBase) writeTo(raw []byte, dst Candidate) (int, error) {
	n, err := c.conn.WriteTo(raw, dst.addr())

	switch {
	case err == nil:
		c.seen(true)
	case errors.Is(err, io.ErrClosedPipe):
		return n, err
	default:
		c.agent().log.Infof("Failed to send packet: %v", err)
	}

	return n, nil
}
// TypePreference returns the type-preference part of the candidate priority.
//
// It starts from the candidate type's own preference. A preference of 0 is
// returned unchanged. For TCP candidates the agent's tcpPriorityOffset is
// subtracted, or defaultTCPPriorityOffset when no agent is attached.
func (c *candidateBase) TypePreference() uint16 {
	pref := c.Type().Preference()
	if pref == 0 || !c.NetworkType().IsTCP() {
		return pref
	}

	var offset uint16 = defaultTCPPriorityOffset
	if a := c.agent(); a != nil {
		offset = a.tcpPriorityOffset
	}

	return pref - offset
}
// Priority returns the candidate priority. A non-zero override is returned
// as is; otherwise the value is
//
//	2^24*TypePreference + 2^8*LocalPreference + (256 - Component)
func (c *candidateBase) Priority() uint32 {
	if override := c.priorityOverride; override != 0 {
		return override
	}

	typePart := uint32(c.TypePreference())
	localPart := uint32(c.LocalPreference())
	componentPart := uint32(256 - c.Component())

	return typePart<<24 + localPart<<8 + componentPart
}
// Equal reports whether other describes the same candidate: equivalent
// resolved addresses plus matching network type, candidate type, address,
// port, TCP type and related address. Extensions are not compared.
func (c *candidateBase) Equal(other Candidate) bool {
	mine, theirs := c.addr(), other.addr()

	// Identical values (including two nils) need no further address check.
	if mine != theirs {
		if mine == nil || theirs == nil || !addrEqual(mine, theirs) {
			return false
		}
	}

	return c.NetworkType() == other.NetworkType() &&
		c.Type() == other.Type() &&
		c.Address() == other.Address() &&
		c.Port() == other.Port() &&
		c.TCPType() == other.TCPType() &&
		c.RelatedAddress().Equal(other.RelatedAddress())
}
// DeepEqual reports whether other passes Equal and, in addition, its
// extension list passes extensionsEqual.
func (c *candidateBase) DeepEqual(other Candidate) bool {
	if !c.Equal(other) {
		return false
	}

	return c.extensionsEqual(other.Extensions())
}
// String returns a human-readable description of the candidate: network
// type, candidate type, host:port, related address and resolved address.
func (c *candidateBase) String() string {
	hostPort := net.JoinHostPort(c.Address(), strconv.Itoa(c.Port()))

	return fmt.Sprintf(
		"%s %s %s%s (resolved: %v)",
		c.NetworkType(), c.Type(), hostPort, c.relatedAddress, c.resolvedAddr,
	)
}
// LastReceived returns the time recorded by the most recent setLastReceived
// call, or the zero time when nothing has been recorded yet.
func (c *candidateBase) LastReceived() time.Time {
	stored, ok := c.lastReceived.Load().(time.Time)
	if !ok {
		return time.Time{}
	}

	return stored
}
// setLastReceived atomically stores t as the last-received time.
func (c *candidateBase ) setLastReceived (t time .Time ) {
c .lastReceived .Store (t )
}
// LastSent returns the time recorded by the most recent setLastSent call, or
// the zero time when nothing has been recorded yet.
func (c *candidateBase) LastSent() time.Time {
	stored, ok := c.lastSent.Load().(time.Time)
	if !ok {
		return time.Time{}
	}

	return stored
}
// setLastSent atomically stores t as the last-sent time.
func (c *candidateBase ) setLastSent (t time .Time ) {
c .lastSent .Store (t )
}
// seen records the current time as the last-sent time when outbound is true,
// and as the last-received time otherwise.
func (c *candidateBase) seen(outbound bool) {
	now := time.Now()

	if outbound {
		c.setLastSent(now)

		return
	}

	c.setLastReceived(now)
}
// addr returns the candidate's resolved network address, which may be nil.
func (c *candidateBase ) addr () net .Addr {
return c .resolvedAddr
}
// filterForLocationTracking returns the candidate's isLocationTracked flag.
func (c *candidateBase ) filterForLocationTracking () bool {
return c .isLocationTracked
}
// agent returns the agent assigned by start, or nil if start has not run.
func (c *candidateBase ) agent () *Agent {
return c .currAgent
}
// context returns the candidate itself as a context.Context; the candidate
// implements the interface through Done, Err, Deadline and Value.
func (c *candidateBase ) context () context .Context {
return c
}
// copy builds a new candidate by marshaling this one and parsing the result
// with UnmarshalCandidate. Only what Marshal emits is carried over.
func (c *candidateBase ) copy () (Candidate , error ) {
return UnmarshalCandidate (c .Marshal ())
}
// removeZoneIDFromAddress returns addr truncated at its first '%' character,
// dropping a zone ID suffix such as "%eth0". An address without '%' is
// returned unchanged.
func removeZoneIDFromAddress(addr string) string {
	percent := strings.IndexByte(addr, '%')
	if percent < 0 {
		return addr
	}

	return addr[:percent]
}
// Marshal serializes the candidate as
//
//	<foundation> <component> <transport> <priority> <address> <port> typ <type>
//
// followed by " raddr <addr> rport <port>" when a related address with a
// non-empty address and non-zero port is set, and then by any extensions.
// Zone IDs are stripped from the address. A foundation of exactly " " is
// written as the empty string; UnmarshalCandidate produces that placeholder
// for an empty foundation.
func (c *candidateBase) Marshal() string {
	foundation := c.Foundation()
	if foundation == " " {
		foundation = ""
	}

	out := fmt.Sprintf("%s %d %s %d %s %d typ %s",
		foundation,
		c.Component(),
		c.NetworkType().NetworkShort(),
		c.Priority(),
		removeZoneIDFromAddress(c.Address()),
		c.Port(),
		c.Type())

	if rel := c.RelatedAddress(); rel != nil && rel.Address != "" && rel.Port != 0 {
		out += fmt.Sprintf(" raddr %s rport %d", rel.Address, rel.Port)
	}

	if ext := c.marshalExtensions(); ext != "" {
		out += " " + ext
	}

	return out
}
// CandidateExtension is a single key/value extension attribute of a
// candidate, marshaled as "<Key> <Value>".
type CandidateExtension struct {
Key string
Value string
}
// Extensions returns a fresh slice with the candidate's extensions. When the
// TCP type is set, a synthesized "tcptype" entry comes first, followed by a
// copy of the stored extensions in order.
func (c *candidateBase) Extensions() []CandidateExtension {
	tcpType := c.TCPType()
	withTCPType := tcpType != TCPTypeUnspecified

	size := len(c.extensions)
	if withTCPType {
		size++
	}

	all := make([]CandidateExtension, 0, size)
	if withTCPType {
		all = append(all, CandidateExtension{Key: "tcptype", Value: tcpType.String()})
	}

	return append(all, c.extensions...)
}
// GetExtension looks up the extension stored under key. Stored extensions
// are searched first; "tcptype" then falls back to the candidate's TCP type
// when one is set. The returned extension always carries key; its Value is
// empty and the boolean false when nothing matches.
func (c *candidateBase) GetExtension(key string) (CandidateExtension, bool) {
	for _, ext := range c.extensions {
		if ext.Key == key {
			return CandidateExtension{Key: key, Value: ext.Value}, true
		}
	}

	if tcpType := c.TCPType(); key == "tcptype" && tcpType != TCPTypeUnspecified {
		return CandidateExtension{Key: key, Value: tcpType.String()}, true
	}

	return CandidateExtension{Key: key}, false
}
// AddExtension adds ext to the candidate, replacing an existing extension
// with the same key.
//
// A "tcptype" extension is not stored in the list: it sets the candidate's
// TCP type, and is rejected with errParseTCPType when the value is not a
// recognized type. An empty key is rejected with errParseExtension.
func (c *candidateBase) AddExtension(ext CandidateExtension) error {
	switch ext.Key {
	case "tcptype":
		parsed := NewTCPType(ext.Value)
		if parsed == TCPTypeUnspecified {
			return fmt.Errorf("%w: invalid or unsupported TCPtype %s", errParseTCPType, ext.Value)
		}
		c.tcpType = parsed

		return nil
	case "":
		return fmt.Errorf("%w: key is empty", errParseExtension)
	}

	for i := range c.extensions {
		if c.extensions[i].Key != ext.Key {
			continue
		}
		c.extensions[i] = ext

		return nil
	}

	c.extensions = append(c.extensions, ext)

	return nil
}
// RemoveExtension removes the first stored extension whose key matches and
// reports whether anything was removed. For "tcptype" it also resets the
// candidate's TCP type to unspecified and always reports true.
func (c *candidateBase) RemoveExtension(key string) (ok bool) {
	if key == "tcptype" {
		c.tcpType = TCPTypeUnspecified
		ok = true
	}

	for i, ext := range c.extensions {
		if ext.Key != key {
			continue
		}
		c.extensions = append(c.extensions[:i], c.extensions[i+1:]...)

		return true
	}

	return ok
}
// marshalExtensions renders the list returned by Extensions as
// space-separated "<key> <value>" pairs. It returns the empty string when
// there are no extensions.
func (c *candidateBase) marshalExtensions() string {
	var out strings.Builder

	for i, ext := range c.Extensions() {
		if i > 0 {
			out.WriteByte(' ')
		}
		out.WriteString(ext.Key)
		out.WriteByte(' ')
		out.WriteString(ext.Value)
	}

	return out.String()
}
// extensionsEqual reports whether other holds exactly the same extensions as
// this candidate, ignoring order but respecting duplicates.
//
// The comparison uses c.Extensions(), so the synthesized "tcptype" entry is
// included on this side as well. DeepEqual passes other.Extensions(), which
// carries that entry; comparing it against the raw stored list made two
// identical candidates with a TCP type compare as different.
func (c *candidateBase) extensionsEqual(other []CandidateExtension) bool {
	mine := c.Extensions()

	if len(mine) != len(other) {
		return false
	}

	switch len(mine) {
	case 0:
		return true
	case 1:
		return mine[0] == other[0]
	}

	// Count each extension on our side, then consume the counts with the
	// other side. Lengths are equal, so no count may drop below zero.
	remaining := make(map[CandidateExtension]int, len(mine))
	for i := range mine {
		remaining[mine[i]]++
	}

	for i := range other {
		remaining[other[i]]--
		if remaining[other[i]] < 0 {
			return false
		}
	}

	return true
}
// setExtensions replaces the stored extension list with extensions. The
// slice is kept as is, not copied.
func (c *candidateBase ) setExtensions (extensions []CandidateExtension ) {
c .extensions = extensions
}
// UnmarshalCandidate parses an ICE candidate string of the form
//
//	[candidate:]<foundation> <component> <transport> <priority> <address> <port> typ <type>
//	    [raddr <addr> rport <port>] [<ext-key> <ext-value> ...]
//
// and builds the matching host, srflx, prflx or relay candidate.
//
// An optional "candidate:" prefix is removed, and a zone ID is stripped from
// the address. An empty foundation is replaced with a single space, which
// Marshal writes back as the empty string. A "tcptype" extension is parsed
// into the TCP type instead of being stored with the other extensions.
//
// Component and priority values that do not fit in uint16 and uint32 are
// rejected with errParseComponent and errParsePriority, so the conversions
// below cannot wrap. Any other malformed field yields an error wrapping the
// matching errParse*/errAttributeTooShortICECandidate value, and an unknown
// "typ" key or candidate type yields ErrUnknownCandidateTyp.
func UnmarshalCandidate(raw string) (Candidate, error) {
	raw = strings.TrimPrefix(raw, "candidate:")

	pos := 0

	// foundation
	foundation, pos, err := readCandidateCharToken(raw, pos, 32)
	if err != nil {
		return nil, fmt.Errorf("%w: %v in %s", errParseFoundation, err, raw)
	}

	// Placeholder for an empty foundation; Marshal maps " " back to "".
	if foundation == "" {
		foundation = " "
	}

	if pos >= len(raw) {
		return nil, fmt.Errorf("%w: expected component in %s", errAttributeTooShortICECandidate, raw)
	}

	// component: up to five digits, which can exceed the uint16 range.
	component, pos, err := readCandidateDigitToken(raw, pos, 5)
	if err != nil {
		return nil, fmt.Errorf("%w: %v in %s", errParseComponent, err, raw)
	}

	if component > 0xFFFF {
		return nil, fmt.Errorf("%w: component %d out of range in %s", errParseComponent, component, raw)
	}

	if pos >= len(raw) {
		return nil, fmt.Errorf("%w: expected transport in %s", errAttributeTooShortICECandidate, raw)
	}

	// transport
	protocol, pos := readCandidateStringToken(raw, pos)

	if pos >= len(raw) {
		return nil, fmt.Errorf("%w: expected priority in %s", errAttributeTooShortICECandidate, raw)
	}

	// priority: up to ten digits, which can exceed the uint32 range.
	priority, pos, err := readCandidateDigitToken(raw, pos, 10)
	if err != nil {
		return nil, fmt.Errorf("%w: %v in %s", errParsePriority, err, raw)
	}

	// The uint64 conversion also catches a value that wrapped negative where int is 32 bits.
	if uint64(priority) > 0xFFFFFFFF {
		return nil, fmt.Errorf("%w: priority %d out of range in %s", errParsePriority, priority, raw)
	}

	if pos >= len(raw) {
		return nil, fmt.Errorf("%w: expected address in %s", errAttributeTooShortICECandidate, raw)
	}

	// connection address
	address, pos := readCandidateStringToken(raw, pos)
	address = removeZoneIDFromAddress(address)

	if pos >= len(raw) {
		return nil, fmt.Errorf("%w: expected port in %s", errAttributeTooShortICECandidate, raw)
	}

	// port
	port, pos, err := readCandidatePort(raw, pos)
	if err != nil {
		return nil, fmt.Errorf("%w: %v in %s", errParsePort, err, raw)
	}

	// "typ" keyword
	typeKey, pos := readCandidateStringToken(raw, pos)
	if typeKey != "typ" {
		return nil, fmt.Errorf("%w (%s)", ErrUnknownCandidateTyp, typeKey)
	}

	if pos >= len(raw) {
		return nil, fmt.Errorf("%w: expected candidate type in %s", errAttributeTooShortICECandidate, raw)
	}

	// candidate type
	typ, pos := readCandidateStringToken(raw, pos)

	// optional related address
	raddr, rport, pos, err := tryReadRelativeAddrs(raw, pos)
	if err != nil {
		return nil, err
	}

	tcpType := TCPTypeUnspecified
	var extensions []CandidateExtension
	var tcpTypeRaw string

	// optional extensions
	if pos < len(raw) {
		extensions, tcpTypeRaw, err = unmarshalCandidateExtensions(raw[pos:])
		if err != nil {
			return nil, fmt.Errorf("%w: %v", errParseExtension, err)
		}

		if tcpTypeRaw != "" {
			tcpType = NewTCPType(tcpTypeRaw)
			if tcpType == TCPTypeUnspecified {
				return nil, fmt.Errorf("%w: invalid or unsupported TCPtype %s", errParseTCPType, tcpTypeRaw)
			}
		}
	}

	switch typ {
	case "host":
		candidate, err := NewCandidateHost(&CandidateHostConfig{
			"",
			protocol,
			address,
			port,
			uint16(component),
			uint32(priority),
			foundation,
			tcpType,
			false,
		})
		if err != nil {
			return nil, err
		}

		candidate.setExtensions(extensions)

		return candidate, nil
	case "srflx":
		candidate, err := NewCandidateServerReflexive(&CandidateServerReflexiveConfig{
			"",
			protocol,
			address,
			port,
			uint16(component),
			uint32(priority),
			foundation,
			raddr,
			rport,
		})
		if err != nil {
			return nil, err
		}

		candidate.setExtensions(extensions)

		return candidate, nil
	case "prflx":
		candidate, err := NewCandidatePeerReflexive(&CandidatePeerReflexiveConfig{
			"",
			protocol,
			address,
			port,
			uint16(component),
			uint32(priority),
			foundation,
			raddr,
			rport,
		})
		if err != nil {
			return nil, err
		}

		candidate.setExtensions(extensions)

		return candidate, nil
	case "relay":
		candidate, err := NewCandidateRelay(&CandidateRelayConfig{
			"",
			protocol,
			address,
			port,
			uint16(component),
			uint32(priority),
			foundation,
			raddr,
			rport,
			"",
			nil,
		})
		if err != nil {
			return nil, err
		}

		candidate.setExtensions(extensions)

		return candidate, nil
	default:
		return nil, fmt.Errorf("%w (%s)", ErrUnknownCandidateTyp, typ)
	}
}
// readCandidateCharToken reads a space-terminated token made only of the
// characters A-Z, a-z, 0-9, '+' and '/', starting at byte offset start.
//
// It returns the token and the offset just past the terminating space, or
// len(raw) when the token runs to the end of the input. A token longer than
// limit characters, or one containing any other character, is an error.
func readCandidateCharToken(raw string, start int, limit int) (string, int, error) {
	rest := raw[start:]

	for offset, r := range rest {
		switch {
		case r == ' ':
			return rest[:offset], start + offset + 1, nil
		case offset == limit:
			return "", 0, fmt.Errorf("token too long: %s expected 1x%d", rest[:offset], limit)
		case 'A' <= r && r <= 'Z',
			'a' <= r && r <= 'z',
			'0' <= r && r <= '9',
			r == '+', r == '/':
			// Valid character: keep scanning.
		default:
			return "", 0, fmt.Errorf("invalid ice-char token: %c", r)
		}
	}

	return rest, len(raw), nil
}
// readCandidateStringToken reads the token that starts at byte offset start
// and ends at the next space. It returns the token and the offset just past
// that space, or the remainder of raw and len(raw) when no space follows.
func readCandidateStringToken(raw string, start int) (string, int) {
	rest := raw[start:]

	space := strings.IndexByte(rest, ' ')
	if space < 0 {
		return rest, len(raw)
	}

	return rest[:space], start + space + 1
}
// readCandidateDigitToken reads a space-terminated run of decimal digits
// starting at byte offset start and returns its numeric value.
//
// The second result is the offset just past the terminating space, or
// len(raw) when the digits run to the end of the input. More than limit
// digits, or any non-digit character, is an error. An empty token yields 0.
func readCandidateDigitToken(raw string, start, limit int) (int, int, error) {
	rest := raw[start:]
	value := 0

	for offset, r := range rest {
		if r == ' ' {
			return value, start + offset + 1, nil
		}

		if offset == limit {
			return 0, 0, fmt.Errorf("token too long: %s expected 1x%d", rest[:offset], limit)
		}

		if r < '0' || r > '9' {
			return 0, 0, fmt.Errorf("invalid digit token: %c", r)
		}

		value = 10*value + int(r-'0')
	}

	return value, len(raw), nil
}
// readCandidatePort reads a digit token of at most five digits starting at
// byte offset start and returns it as a port together with the offset after
// the token. Values above 65535 are rejected.
func readCandidatePort(raw string, start int) (int, int, error) {
	port, next, err := readCandidateDigitToken(raw, start, 5)

	switch {
	case err != nil:
		return 0, 0, err
	case port > 65535:
		return 0, 0, fmt.Errorf("invalid RFC 4566 port %d", port)
	}

	return port, next, nil
}
// readCandidateByteString reads a space-terminated byte-string token
// starting at byte offset start.
//
// Every byte is allowed except NUL (0x00), LF (0x0A) and CR (0x0D), i.e.
// 1*(%x01-09 / %x0B-0C / %x0E-FF). The input is scanned byte by byte, not
// rune by rune, so multi-byte UTF-8 sequences are accepted: each of their
// bytes is in the allowed range even when the decoded code point is above
// 0xFF.
//
// It returns the token and the offset just past the terminating space, or
// len(raw) when the token runs to the end of the input.
func readCandidateByteString(raw string, start int) (string, int, error) {
	for i := start; i < len(raw); i++ {
		switch b := raw[i]; b {
		case 0x20:
			return raw[start:i], i + 1, nil
		case 0x00, 0x0A, 0x0D:
			return "", 0, fmt.Errorf("invalid byte-string character: %c", rune(b))
		}
	}

	return raw[start:], len(raw), nil
}
// tryReadRelativeAddrs parses an optional "raddr <addr> rport <port>"
// sequence starting at byte offset start.
//
// When the next token is not "raddr" nothing is consumed: it returns empty
// values, pos == start and a nil error. Otherwise it returns the related
// address, the related port and the offset after the port. A truncated or
// malformed sequence yields an error wrapping errParseRelatedAddr.
func tryReadRelativeAddrs(raw string, start int) (raddr string, rport, pos int, err error) {
	token, next := readCandidateStringToken(raw, start)
	if token != "raddr" {
		return "", 0, start, nil
	}

	if next >= len(raw) {
		return "", 0, 0, fmt.Errorf("%w: expected raddr value in %s", errParseRelatedAddr, raw)
	}

	raddr, next = readCandidateStringToken(raw, next)
	if next >= len(raw) {
		return "", 0, 0, fmt.Errorf("%w: expected rport in %s", errParseRelatedAddr, raw)
	}

	token, next = readCandidateStringToken(raw, next)
	if token != "rport" {
		return "", 0, 0, fmt.Errorf("%w: expected rport in %s", errParseRelatedAddr, raw)
	}

	if next >= len(raw) {
		return "", 0, 0, fmt.Errorf("%w: expected rport value in %s", errParseRelatedAddr, raw)
	}

	rport, next, err = readCandidatePort(raw, next)
	if err != nil {
		return "", 0, 0, fmt.Errorf("%w: %v", errParseRelatedAddr, err)
	}

	return raddr, rport, next, nil
}
// unmarshalCandidateExtensions parses raw as a space-separated sequence of
// "<key> <value>" pairs.
//
// The value of a "tcptype" key is returned separately (the last one wins)
// and is not added to the extension list. A trailing key without a value is
// kept with an empty value. The returned slice is never nil. Input that
// begins with a space, or a key or value with an invalid byte-string
// character, yields an error wrapping errParseExtension.
func unmarshalCandidateExtensions(raw string) (extensions []CandidateExtension, rawTCPTypeRaw string, err error) {
	extensions = make([]CandidateExtension, 0)

	switch {
	case raw == "":
		return extensions, "", nil
	case raw[0] == 0x20:
		return extensions, "", fmt.Errorf("%w: unexpected space %s", errParseExtension, raw)
	}

	pos := 0
	for pos < len(raw) {
		key, afterKey, keyErr := readCandidateByteString(raw, pos)
		if keyErr != nil {
			return extensions, "", fmt.Errorf(
				"%w: failed to read key %v", errParseExtension, keyErr,
			)
		}
		pos = afterKey

		value := ""
		if pos < len(raw) {
			var valueErr error

			value, pos, valueErr = readCandidateByteString(raw, pos)
			if valueErr != nil {
				return extensions, "", fmt.Errorf(
					"%w: failed to read value %v", errParseExtension, valueErr,
				)
			}
		}

		if key == "tcptype" {
			rawTCPTypeRaw = value

			continue
		}

		extensions = append(extensions, CandidateExtension{Key: key, Value: value})
	}

	return extensions, rawTCPTypeRaw, nil
}
// The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news about Golds.