package webrtc
import (
"fmt"
"sync"
"sync/atomic"
"github.com/pion/ice/v4"
"github.com/pion/logging"
"github.com/pion/stun/v3"
)
type ICEGatherer struct {
lock sync .RWMutex
log logging .LeveledLogger
state ICEGathererState
validatedServers []*stun .URI
gatherPolicy ICETransportPolicy
agent *ice .Agent
onLocalCandidateHandler atomic .Value
onStateChangeHandler atomic .Value
onGatheringCompleteHandler atomic .Value
api *API
sdpMid atomic .Value
sdpMLineIndex atomic .Uint32
}
func (api *API ) NewICEGatherer (opts ICEGatherOptions ) (*ICEGatherer , error ) {
var validatedServers []*stun .URI
if len (opts .ICEServers ) > 0 {
for _ , server := range opts .ICEServers {
url , err := server .urls ()
if err != nil {
return nil , err
}
validatedServers = append (validatedServers , url ...)
}
}
return &ICEGatherer {
state : ICEGathererStateNew ,
gatherPolicy : opts .ICEGatherPolicy ,
validatedServers : validatedServers ,
api : api ,
log : api .settingEngine .LoggerFactory .NewLogger ("ice" ),
sdpMid : atomic .Value {},
sdpMLineIndex : atomic .Uint32 {},
}, nil
}
func (g *ICEGatherer ) createAgent () error {
g .lock .Lock ()
defer g .lock .Unlock ()
if g .agent != nil || g .State () != ICEGathererStateNew {
return nil
}
candidateTypes := []ice .CandidateType {}
if g .api .settingEngine .candidates .ICELite {
candidateTypes = append (candidateTypes , ice .CandidateTypeHost )
} else if g .gatherPolicy == ICETransportPolicyRelay {
candidateTypes = append (candidateTypes , ice .CandidateTypeRelay )
}
var nat1To1CandiTyp ice .CandidateType
switch g .api .settingEngine .candidates .NAT1To1IPCandidateType {
case ICECandidateTypeHost :
nat1To1CandiTyp = ice .CandidateTypeHost
case ICECandidateTypeSrflx :
nat1To1CandiTyp = ice .CandidateTypeServerReflexive
default :
nat1To1CandiTyp = ice .CandidateTypeUnspecified
}
mDNSMode := g .api .settingEngine .candidates .MulticastDNSMode
if mDNSMode != ice .MulticastDNSModeDisabled && mDNSMode != ice .MulticastDNSModeQueryAndGather {
mDNSMode = ice .MulticastDNSModeQueryOnly
}
config := &ice .AgentConfig {
Lite : g .api .settingEngine .candidates .ICELite ,
Urls : g .validatedServers ,
PortMin : g .api .settingEngine .ephemeralUDP .PortMin ,
PortMax : g .api .settingEngine .ephemeralUDP .PortMax ,
DisconnectedTimeout : g .api .settingEngine .timeout .ICEDisconnectedTimeout ,
FailedTimeout : g .api .settingEngine .timeout .ICEFailedTimeout ,
KeepaliveInterval : g .api .settingEngine .timeout .ICEKeepaliveInterval ,
LoggerFactory : g .api .settingEngine .LoggerFactory ,
CandidateTypes : candidateTypes ,
HostAcceptanceMinWait : g .api .settingEngine .timeout .ICEHostAcceptanceMinWait ,
SrflxAcceptanceMinWait : g .api .settingEngine .timeout .ICESrflxAcceptanceMinWait ,
PrflxAcceptanceMinWait : g .api .settingEngine .timeout .ICEPrflxAcceptanceMinWait ,
RelayAcceptanceMinWait : g .api .settingEngine .timeout .ICERelayAcceptanceMinWait ,
STUNGatherTimeout : g .api .settingEngine .timeout .ICESTUNGatherTimeout ,
InterfaceFilter : g .api .settingEngine .candidates .InterfaceFilter ,
IPFilter : g .api .settingEngine .candidates .IPFilter ,
NAT1To1IPs : g .api .settingEngine .candidates .NAT1To1IPs ,
NAT1To1IPCandidateType : nat1To1CandiTyp ,
IncludeLoopback : g .api .settingEngine .candidates .IncludeLoopbackCandidate ,
Net : g .api .settingEngine .net ,
MulticastDNSMode : mDNSMode ,
MulticastDNSHostName : g .api .settingEngine .candidates .MulticastDNSHostName ,
LocalUfrag : g .api .settingEngine .candidates .UsernameFragment ,
LocalPwd : g .api .settingEngine .candidates .Password ,
TCPMux : g .api .settingEngine .iceTCPMux ,
UDPMux : g .api .settingEngine .iceUDPMux ,
ProxyDialer : g .api .settingEngine .iceProxyDialer ,
DisableActiveTCP : g .api .settingEngine .iceDisableActiveTCP ,
MaxBindingRequests : g .api .settingEngine .iceMaxBindingRequests ,
BindingRequestHandler : g .api .settingEngine .iceBindingRequestHandler ,
}
requestedNetworkTypes := g .api .settingEngine .candidates .ICENetworkTypes
if len (requestedNetworkTypes ) == 0 {
requestedNetworkTypes = supportedNetworkTypes ()
}
for _ , typ := range requestedNetworkTypes {
config .NetworkTypes = append (config .NetworkTypes , ice .NetworkType (typ ))
}
agent , err := ice .NewAgent (config )
if err != nil {
return err
}
g .agent = agent
return nil
}
func (g *ICEGatherer ) Gather () error {
if err := g .createAgent (); err != nil {
return err
}
agent := g .getAgent ()
if agent == nil {
return fmt .Errorf ("%w: unable to gather" , errICEAgentNotExist )
}
g .setState (ICEGathererStateGathering )
if err := agent .OnCandidate (func (candidate ice .Candidate ) {
onLocalCandidateHandler := func (*ICECandidate ) {}
if handler , ok := g .onLocalCandidateHandler .Load ().(func (candidate *ICECandidate )); ok && handler != nil {
onLocalCandidateHandler = handler
}
onGatheringCompleteHandler := func () {}
if handler , ok := g .onGatheringCompleteHandler .Load ().(func ()); ok && handler != nil {
onGatheringCompleteHandler = handler
}
sdpMid := ""
if mid , ok := g .sdpMid .Load ().(string ); ok {
sdpMid = mid
}
sdpMLineIndex := uint16 (g .sdpMLineIndex .Load ())
if candidate != nil {
c , err := newICECandidateFromICE (candidate , sdpMid , sdpMLineIndex )
if err != nil {
g .log .Warnf ("Failed to convert ice.Candidate: %s" , err )
return
}
onLocalCandidateHandler (&c )
} else {
g .setState (ICEGathererStateComplete )
onGatheringCompleteHandler ()
onLocalCandidateHandler (nil )
}
}); err != nil {
return err
}
return agent .GatherCandidates ()
}
func (g *ICEGatherer ) setMediaStreamIdentification (mid string , mLineIndex uint16 ) {
g .sdpMid .Store (mid )
g .sdpMLineIndex .Store (uint32 (mLineIndex ))
}
func (g *ICEGatherer ) Close () error {
return g .close (false )
}
func (g *ICEGatherer ) GracefulClose () error {
return g .close (true )
}
func (g *ICEGatherer ) close (shouldGracefullyClose bool ) error {
g .lock .Lock ()
defer g .lock .Unlock ()
if g .agent == nil {
return nil
}
if shouldGracefullyClose {
if err := g .agent .GracefulClose (); err != nil {
return err
}
} else {
if err := g .agent .Close (); err != nil {
return err
}
}
g .agent = nil
g .setState (ICEGathererStateClosed )
return nil
}
func (g *ICEGatherer ) GetLocalParameters () (ICEParameters , error ) {
if err := g .createAgent (); err != nil {
return ICEParameters {}, err
}
agent := g .getAgent ()
if agent == nil {
return ICEParameters {}, fmt .Errorf ("%w: unable to get local parameters" , errICEAgentNotExist )
}
frag , pwd , err := agent .GetLocalUserCredentials ()
if err != nil {
return ICEParameters {}, err
}
return ICEParameters {
UsernameFragment : frag ,
Password : pwd ,
ICELite : false ,
}, nil
}
func (g *ICEGatherer ) GetLocalCandidates () ([]ICECandidate , error ) {
if err := g .createAgent (); err != nil {
return nil , err
}
agent := g .getAgent ()
if agent == nil {
return nil , fmt .Errorf ("%w: unable to get local candidates" , errICEAgentNotExist )
}
iceCandidates , err := agent .GetLocalCandidates ()
if err != nil {
return nil , err
}
sdpMid := ""
if mid , ok := g .sdpMid .Load ().(string ); ok {
sdpMid = mid
}
sdpMLineIndex := uint16 (g .sdpMLineIndex .Load ())
return newICECandidatesFromICE (iceCandidates , sdpMid , sdpMLineIndex )
}
func (g *ICEGatherer ) OnLocalCandidate (f func (*ICECandidate )) {
g .onLocalCandidateHandler .Store (f )
}
func (g *ICEGatherer ) OnStateChange (f func (ICEGathererState )) {
g .onStateChangeHandler .Store (f )
}
func (g *ICEGatherer ) State () ICEGathererState {
return atomicLoadICEGathererState (&g .state )
}
func (g *ICEGatherer ) setState (s ICEGathererState ) {
atomicStoreICEGathererState (&g .state , s )
if handler , ok := g .onStateChangeHandler .Load ().(func (state ICEGathererState )); ok && handler != nil {
handler (s )
}
}
func (g *ICEGatherer ) getAgent () *ice .Agent {
g .lock .RLock ()
defer g .lock .RUnlock ()
return g .agent
}
func (g *ICEGatherer ) collectStats (collector *statsReportCollector ) {
agent := g .getAgent ()
if agent == nil {
return
}
collector .Collecting ()
go func (collector *statsReportCollector , agent *ice .Agent ) {
for _ , candidatePairStats := range agent .GetCandidatePairsStats () {
collector .Collecting ()
stats , err := toICECandidatePairStats (candidatePairStats )
if err != nil {
g .log .Error (err .Error())
collector .Done ()
continue
}
collector .Collect (stats .ID , stats )
}
for _ , candidateStats := range agent .GetLocalCandidatesStats () {
collector .Collecting ()
networkType , err := getNetworkType (candidateStats .NetworkType )
if err != nil {
g .log .Error (err .Error())
}
candidateType , err := getCandidateType (candidateStats .CandidateType )
if err != nil {
g .log .Error (err .Error())
}
stats := ICECandidateStats {
Timestamp : statsTimestampFrom (candidateStats .Timestamp ),
ID : candidateStats .ID ,
Type : StatsTypeLocalCandidate ,
IP : candidateStats .IP ,
Port : int32 (candidateStats .Port ),
Protocol : networkType .Protocol (),
CandidateType : candidateType ,
Priority : int32 (candidateStats .Priority ),
URL : candidateStats .URL ,
RelayProtocol : candidateStats .RelayProtocol ,
Deleted : candidateStats .Deleted ,
}
collector .Collect (stats .ID , stats )
}
for _ , candidateStats := range agent .GetRemoteCandidatesStats () {
collector .Collecting ()
networkType , err := getNetworkType (candidateStats .NetworkType )
if err != nil {
g .log .Error (err .Error())
}
candidateType , err := getCandidateType (candidateStats .CandidateType )
if err != nil {
g .log .Error (err .Error())
}
stats := ICECandidateStats {
Timestamp : statsTimestampFrom (candidateStats .Timestamp ),
ID : candidateStats .ID ,
Type : StatsTypeRemoteCandidate ,
IP : candidateStats .IP ,
Port : int32 (candidateStats .Port ),
Protocol : networkType .Protocol (),
CandidateType : candidateType ,
Priority : int32 (candidateStats .Priority ),
URL : candidateStats .URL ,
RelayProtocol : candidateStats .RelayProtocol ,
}
collector .Collect (stats .ID , stats )
}
collector .Done ()
}(collector , agent )
}
func (g *ICEGatherer ) getSelectedCandidatePairStats () (ICECandidatePairStats , bool ) {
agent := g .getAgent ()
if agent == nil {
return ICECandidatePairStats {}, false
}
selectedCandidatePairStats , isAvailable := agent .GetSelectedCandidatePairStats ()
if !isAvailable {
return ICECandidatePairStats {}, false
}
stats , err := toICECandidatePairStats (selectedCandidatePairStats )
if err != nil {
g .log .Error (err .Error())
return ICECandidatePairStats {}, false
}
return stats , true
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .