package webrtc
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"errors"
"fmt"
"io"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/pion/ice/v4"
"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/sdp/v3"
"github.com/pion/srtp/v3"
"github.com/pion/webrtc/v4/internal/util"
"github.com/pion/webrtc/v4/pkg/rtcerr"
)
type PeerConnection struct {
statsID string
mu sync .RWMutex
sdpOrigin sdp .Origin
ops *operations
configuration Configuration
currentLocalDescription *SessionDescription
pendingLocalDescription *SessionDescription
currentRemoteDescription *SessionDescription
pendingRemoteDescription *SessionDescription
signalingState SignalingState
iceConnectionState atomic .Value
connectionState atomic .Value
idpLoginURL *string
isClosed *atomicBool
isGracefullyClosingOrClosed bool
isCloseDone chan struct {}
isGracefulCloseDone chan struct {}
isNegotiationNeeded *atomicBool
updateNegotiationNeededFlagOnEmptyChain *atomicBool
lastOffer string
lastAnswer string
greaterMid int
rtpTransceivers []*RTPTransceiver
nonMediaBandwidthProbe atomic .Value
onSignalingStateChangeHandler func (SignalingState )
onICEConnectionStateChangeHandler atomic .Value
onConnectionStateChangeHandler atomic .Value
onTrackHandler func (*TrackRemote , *RTPReceiver )
onDataChannelHandler func (*DataChannel )
onNegotiationNeededHandler atomic .Value
iceGatherer *ICEGatherer
iceTransport *ICETransport
dtlsTransport *DTLSTransport
sctpTransport *SCTPTransport
api *API
log logging .LeveledLogger
interceptorRTCPWriter interceptor .RTCPWriter
}
func NewPeerConnection (configuration Configuration ) (*PeerConnection , error ) {
api := NewAPI ()
return api .NewPeerConnection (configuration )
}
func (api *API ) NewPeerConnection (configuration Configuration ) (*PeerConnection , error ) {
pc := &PeerConnection {
statsID : fmt .Sprintf ("PeerConnection-%d" , time .Now ().UnixNano ()),
configuration : Configuration {
ICEServers : []ICEServer {},
ICETransportPolicy : ICETransportPolicyAll ,
BundlePolicy : BundlePolicyBalanced ,
RTCPMuxPolicy : RTCPMuxPolicyRequire ,
Certificates : []Certificate {},
ICECandidatePoolSize : 0 ,
},
isClosed : &atomicBool {},
isCloseDone : make (chan struct {}),
isGracefulCloseDone : make (chan struct {}),
isNegotiationNeeded : &atomicBool {},
updateNegotiationNeededFlagOnEmptyChain : &atomicBool {},
lastOffer : "" ,
lastAnswer : "" ,
greaterMid : -1 ,
signalingState : SignalingStateStable ,
api : api ,
log : api .settingEngine .LoggerFactory .NewLogger ("pc" ),
}
pc .ops = newOperations (pc .updateNegotiationNeededFlagOnEmptyChain , pc .onNegotiationNeeded )
pc .iceConnectionState .Store (ICEConnectionStateNew )
pc .connectionState .Store (PeerConnectionStateNew )
i , err := api .interceptorRegistry .Build ("" )
if err != nil {
return nil , err
}
pc .api = &API {
settingEngine : api .settingEngine ,
interceptor : i ,
}
if api .settingEngine .disableMediaEngineCopy {
pc .api .mediaEngine = api .mediaEngine
} else {
pc .api .mediaEngine = api .mediaEngine .copy ()
pc .api .mediaEngine .setMultiCodecNegotiation (!api .settingEngine .disableMediaEngineMultipleCodecs )
}
if err = pc .initConfiguration (configuration ); err != nil {
return nil , err
}
pc .iceGatherer , err = pc .createICEGatherer ()
if err != nil {
return nil , err
}
iceTransport := pc .createICETransport ()
pc .iceTransport = iceTransport
dtlsTransport , err := pc .api .NewDTLSTransport (pc .iceTransport , pc .configuration .Certificates )
if err != nil {
return nil , err
}
pc .dtlsTransport = dtlsTransport
pc .sctpTransport = pc .api .NewSCTPTransport (pc .dtlsTransport )
pc .sctpTransport .OnDataChannel (func (d *DataChannel ) {
pc .mu .RLock ()
handler := pc .onDataChannelHandler
pc .mu .RUnlock ()
if handler != nil {
handler (d )
}
})
pc .interceptorRTCPWriter = pc .api .interceptor .BindRTCPWriter (interceptor .RTCPWriterFunc (pc .writeRTCP ))
return pc , nil
}
func (pc *PeerConnection ) initConfiguration (configuration Configuration ) error {
if configuration .PeerIdentity != "" {
pc .configuration .PeerIdentity = configuration .PeerIdentity
}
if len (configuration .Certificates ) > 0 {
now := time .Now ()
for _ , x509Cert := range configuration .Certificates {
if !x509Cert .Expires ().IsZero () && now .After (x509Cert .Expires ()) {
return &rtcerr .InvalidAccessError {Err : ErrCertificateExpired }
}
pc .configuration .Certificates = append (pc .configuration .Certificates , x509Cert )
}
} else {
sk , err := ecdsa .GenerateKey (elliptic .P256 (), rand .Reader )
if err != nil {
return &rtcerr .UnknownError {Err : err }
}
certificate , err := GenerateCertificate (sk )
if err != nil {
return err
}
pc .configuration .Certificates = []Certificate {*certificate }
}
if configuration .BundlePolicy != BundlePolicyUnknown {
pc .configuration .BundlePolicy = configuration .BundlePolicy
}
if configuration .RTCPMuxPolicy != RTCPMuxPolicyUnknown {
pc .configuration .RTCPMuxPolicy = configuration .RTCPMuxPolicy
}
if configuration .ICECandidatePoolSize != 0 {
pc .configuration .ICECandidatePoolSize = configuration .ICECandidatePoolSize
}
pc .configuration .ICETransportPolicy = configuration .ICETransportPolicy
pc .configuration .SDPSemantics = configuration .SDPSemantics
sanitizedICEServers := configuration .getICEServers ()
if len (sanitizedICEServers ) > 0 {
for _ , server := range sanitizedICEServers {
if err := server .validate (); err != nil {
return err
}
}
pc .configuration .ICEServers = sanitizedICEServers
}
return nil
}
func (pc *PeerConnection ) OnSignalingStateChange (f func (SignalingState )) {
pc .mu .Lock ()
defer pc .mu .Unlock ()
pc .onSignalingStateChangeHandler = f
}
func (pc *PeerConnection ) onSignalingStateChange (newState SignalingState ) {
pc .mu .RLock ()
handler := pc .onSignalingStateChangeHandler
pc .mu .RUnlock ()
pc .log .Infof ("signaling state changed to %s" , newState )
if handler != nil {
go handler (newState )
}
}
func (pc *PeerConnection ) OnDataChannel (f func (*DataChannel )) {
pc .mu .Lock ()
defer pc .mu .Unlock ()
pc .onDataChannelHandler = f
}
func (pc *PeerConnection ) OnNegotiationNeeded (f func ()) {
pc .onNegotiationNeededHandler .Store (f )
}
func (pc *PeerConnection ) onNegotiationNeeded () {
if !pc .ops .IsEmpty () {
pc .updateNegotiationNeededFlagOnEmptyChain .set (true )
return
}
pc .ops .Enqueue (pc .negotiationNeededOp )
}
func (pc *PeerConnection ) negotiationNeededOp () {
if pc .isClosed .get () {
return
}
if !pc .ops .IsEmpty () {
pc .updateNegotiationNeededFlagOnEmptyChain .set (true )
return
}
if pc .SignalingState () != SignalingStateStable {
return
}
if !pc .checkNegotiationNeeded () {
pc .isNegotiationNeeded .set (false )
return
}
if pc .isNegotiationNeeded .get () {
return
}
pc .isNegotiationNeeded .set (true )
if handler , ok := pc .onNegotiationNeededHandler .Load ().(func ()); ok && handler != nil {
handler ()
}
}
func (pc *PeerConnection ) checkNegotiationNeeded () bool {
pc .mu .Lock ()
defer pc .mu .Unlock ()
localDesc := pc .currentLocalDescription
remoteDesc := pc .currentRemoteDescription
if localDesc == nil {
return true
}
pc .sctpTransport .lock .Lock ()
lenDataChannel := len (pc .sctpTransport .dataChannels )
pc .sctpTransport .lock .Unlock ()
if lenDataChannel != 0 && haveDataChannel (localDesc ) == nil {
return true
}
for _ , transceiver := range pc .rtpTransceivers {
mid := getByMid (transceiver .Mid (), localDesc )
if mid == nil {
return true
}
if transceiver .Direction () == RTPTransceiverDirectionSendrecv ||
transceiver .Direction () == RTPTransceiverDirectionSendonly {
descMsid , okMsid := mid .Attribute (sdp .AttrKeyMsid )
sender := transceiver .Sender ()
if sender == nil {
return true
}
track := sender .Track ()
if track == nil {
continue
}
if !okMsid || descMsid != track .StreamID ()+" " +track .ID () {
return true
}
}
switch localDesc .Type {
case SDPTypeOffer :
rm := getByMid (transceiver .Mid (), remoteDesc )
if rm == nil {
return true
}
if getPeerDirection (mid ) != transceiver .Direction () && getPeerDirection (rm ) != transceiver .Direction ().Revers () {
return true
}
case SDPTypeAnswer :
if _ , ok := mid .Attribute (transceiver .Direction ().String ()); !ok {
return true
}
default :
}
}
return false
}
func (pc *PeerConnection ) OnICECandidate (f func (*ICECandidate )) {
pc .iceGatherer .OnLocalCandidate (f )
}
func (pc *PeerConnection ) OnICEGatheringStateChange (f func (ICEGatheringState )) {
pc .iceGatherer .OnStateChange (
func (gathererState ICEGathererState ) {
switch gathererState {
case ICEGathererStateGathering :
f (ICEGatheringStateGathering )
case ICEGathererStateComplete :
f (ICEGatheringStateComplete )
default :
}
})
}
func (pc *PeerConnection ) OnTrack (f func (*TrackRemote , *RTPReceiver )) {
pc .mu .Lock ()
defer pc .mu .Unlock ()
pc .onTrackHandler = f
}
func (pc *PeerConnection ) onTrack (t *TrackRemote , r *RTPReceiver ) {
pc .mu .RLock ()
handler := pc .onTrackHandler
pc .mu .RUnlock ()
pc .log .Debugf ("got new track: %+v" , t )
if t != nil {
if handler != nil {
go handler (t , r )
} else {
pc .log .Warnf ("OnTrack unset, unable to handle incoming media streams" )
}
}
}
func (pc *PeerConnection ) OnICEConnectionStateChange (f func (ICEConnectionState )) {
pc .onICEConnectionStateChangeHandler .Store (f )
}
func (pc *PeerConnection ) onICEConnectionStateChange (cs ICEConnectionState ) {
pc .iceConnectionState .Store (cs )
pc .log .Infof ("ICE connection state changed: %s" , cs )
if handler , ok := pc .onICEConnectionStateChangeHandler .Load ().(func (ICEConnectionState )); ok && handler != nil {
handler (cs )
}
}
func (pc *PeerConnection ) OnConnectionStateChange (f func (PeerConnectionState )) {
pc .onConnectionStateChangeHandler .Store (f )
}
func (pc *PeerConnection ) onConnectionStateChange (cs PeerConnectionState ) {
pc .connectionState .Store (cs )
pc .log .Infof ("peer connection state changed: %s" , cs )
if handler , ok := pc .onConnectionStateChangeHandler .Load ().(func (PeerConnectionState )); ok && handler != nil {
go handler (cs )
}
}
func (pc *PeerConnection ) SetConfiguration (configuration Configuration ) error {
if pc .isClosed .get () {
return &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
if configuration .PeerIdentity != "" {
if configuration .PeerIdentity != pc .configuration .PeerIdentity {
return &rtcerr .InvalidModificationError {Err : ErrModifyingPeerIdentity }
}
pc .configuration .PeerIdentity = configuration .PeerIdentity
}
if len (configuration .Certificates ) > 0 {
if len (configuration .Certificates ) != len (pc .configuration .Certificates ) {
return &rtcerr .InvalidModificationError {Err : ErrModifyingCertificates }
}
for i , certificate := range configuration .Certificates {
if !pc .configuration .Certificates [i ].Equals (certificate ) {
return &rtcerr .InvalidModificationError {Err : ErrModifyingCertificates }
}
}
pc .configuration .Certificates = configuration .Certificates
}
if configuration .BundlePolicy != BundlePolicyUnknown {
if configuration .BundlePolicy != pc .configuration .BundlePolicy {
return &rtcerr .InvalidModificationError {Err : ErrModifyingBundlePolicy }
}
pc .configuration .BundlePolicy = configuration .BundlePolicy
}
if configuration .RTCPMuxPolicy != RTCPMuxPolicyUnknown {
if configuration .RTCPMuxPolicy != pc .configuration .RTCPMuxPolicy {
return &rtcerr .InvalidModificationError {Err : ErrModifyingRTCPMuxPolicy }
}
pc .configuration .RTCPMuxPolicy = configuration .RTCPMuxPolicy
}
if configuration .ICECandidatePoolSize != 0 {
if pc .configuration .ICECandidatePoolSize != configuration .ICECandidatePoolSize &&
pc .LocalDescription () != nil {
return &rtcerr .InvalidModificationError {Err : ErrModifyingICECandidatePoolSize }
}
pc .configuration .ICECandidatePoolSize = configuration .ICECandidatePoolSize
}
pc .configuration .ICETransportPolicy = configuration .ICETransportPolicy
if len (configuration .ICEServers ) > 0 {
for _ , server := range configuration .ICEServers {
if err := server .validate (); err != nil {
return err
}
}
pc .configuration .ICEServers = configuration .ICEServers
}
return nil
}
func (pc *PeerConnection ) GetConfiguration () Configuration {
return pc .configuration
}
func (pc *PeerConnection ) getStatsID () string {
pc .mu .RLock ()
defer pc .mu .RUnlock ()
return pc .statsID
}
func (pc *PeerConnection ) hasLocalDescriptionChanged (desc *SessionDescription ) bool {
for _ , t := range pc .rtpTransceivers {
m := getByMid (t .Mid (), desc )
if m == nil {
return true
}
if getPeerDirection (m ) != t .Direction () {
return true
}
}
return false
}
func (pc *PeerConnection ) CreateOffer (options *OfferOptions ) (SessionDescription , error ) {
useIdentity := pc .idpLoginURL != nil
switch {
case useIdentity :
return SessionDescription {}, errIdentityProviderNotImplemented
case pc .isClosed .get ():
return SessionDescription {}, &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
if options != nil && options .ICERestart {
if err := pc .iceTransport .restart (); err != nil {
return SessionDescription {}, err
}
}
var (
descr *sdp .SessionDescription
offer SessionDescription
err error
)
count := 0
pc .mu .Lock ()
defer pc .mu .Unlock ()
for {
currentTransceivers := pc .rtpTransceivers
isPlanB := pc .configuration .SDPSemantics == SDPSemanticsPlanB
if pc .currentRemoteDescription != nil && isPlanB {
isPlanB = descriptionPossiblyPlanB (pc .currentRemoteDescription )
}
if !isPlanB {
if pc .currentRemoteDescription != nil {
var numericMid int
for _ , media := range pc .currentRemoteDescription .parsed .MediaDescriptions {
mid := getMidValue (media )
if mid == "" {
continue
}
numericMid , err = strconv .Atoi (mid )
if err != nil {
continue
}
if numericMid > pc .greaterMid {
pc .greaterMid = numericMid
}
}
}
for _ , t := range currentTransceivers {
if mid := t .Mid (); mid != "" {
numericMid , errMid := strconv .Atoi (mid )
if errMid == nil {
if numericMid > pc .greaterMid {
pc .greaterMid = numericMid
}
}
continue
}
pc .greaterMid ++
err = t .SetMid (strconv .Itoa (pc .greaterMid ))
if err != nil {
return SessionDescription {}, err
}
}
}
if pc .currentRemoteDescription == nil {
descr , err = pc .generateUnmatchedSDP (currentTransceivers , useIdentity )
} else {
descr , err = pc .generateMatchedSDP (
currentTransceivers ,
useIdentity ,
true ,
connectionRoleFromDtlsRole (defaultDtlsRoleOffer ),
)
}
if err != nil {
return SessionDescription {}, err
}
updateSDPOrigin (&pc .sdpOrigin , descr )
sdpBytes , err := descr .Marshal ()
if err != nil {
return SessionDescription {}, err
}
offer = SessionDescription {
Type : SDPTypeOffer ,
SDP : string (sdpBytes ),
parsed : descr ,
}
if isPlanB || !pc .hasLocalDescriptionChanged (&offer ) {
break
}
count ++
if count >= 128 {
return SessionDescription {}, errExcessiveRetries
}
}
pc .lastOffer = offer .SDP
return offer , nil
}
func (pc *PeerConnection ) createICEGatherer () (*ICEGatherer , error ) {
g , err := pc .api .NewICEGatherer (ICEGatherOptions {
ICEServers : pc .configuration .getICEServers (),
ICEGatherPolicy : pc .configuration .ICETransportPolicy ,
})
if err != nil {
return nil , err
}
return g , nil
}
func (pc *PeerConnection ) updateConnectionState (
iceConnectionState ICEConnectionState ,
dtlsTransportState DTLSTransportState ,
) {
connectionState := PeerConnectionStateNew
switch {
case pc .isClosed .get ():
connectionState = PeerConnectionStateClosed
case iceConnectionState == ICEConnectionStateFailed || dtlsTransportState == DTLSTransportStateFailed :
connectionState = PeerConnectionStateFailed
case iceConnectionState == ICEConnectionStateDisconnected :
connectionState = PeerConnectionStateDisconnected
case (iceConnectionState == ICEConnectionStateNew || iceConnectionState == ICEConnectionStateClosed ) &&
(dtlsTransportState == DTLSTransportStateNew || dtlsTransportState == DTLSTransportStateClosed ):
connectionState = PeerConnectionStateNew
case (iceConnectionState == ICEConnectionStateNew || iceConnectionState == ICEConnectionStateChecking ) ||
(dtlsTransportState == DTLSTransportStateNew || dtlsTransportState == DTLSTransportStateConnecting ):
connectionState = PeerConnectionStateConnecting
case (iceConnectionState == ICEConnectionStateConnected ||
iceConnectionState == ICEConnectionStateCompleted || iceConnectionState == ICEConnectionStateClosed ) &&
(dtlsTransportState == DTLSTransportStateConnected || dtlsTransportState == DTLSTransportStateClosed ):
connectionState = PeerConnectionStateConnected
}
if pc .connectionState .Load () == connectionState {
return
}
pc .onConnectionStateChange (connectionState )
}
func (pc *PeerConnection ) createICETransport () *ICETransport {
transport := pc .api .NewICETransport (pc .iceGatherer )
transport .internalOnConnectionStateChangeHandler .Store (func (state ICETransportState ) {
var cs ICEConnectionState
switch state {
case ICETransportStateNew :
cs = ICEConnectionStateNew
case ICETransportStateChecking :
cs = ICEConnectionStateChecking
case ICETransportStateConnected :
cs = ICEConnectionStateConnected
case ICETransportStateCompleted :
cs = ICEConnectionStateCompleted
case ICETransportStateFailed :
cs = ICEConnectionStateFailed
case ICETransportStateDisconnected :
cs = ICEConnectionStateDisconnected
case ICETransportStateClosed :
cs = ICEConnectionStateClosed
default :
pc .log .Warnf ("OnConnectionStateChange: unhandled ICE state: %s" , state )
return
}
pc .onICEConnectionStateChange (cs )
pc .updateConnectionState (cs , pc .dtlsTransport .State ())
})
return transport
}
func (pc *PeerConnection ) CreateAnswer (*AnswerOptions ) (SessionDescription , error ) {
useIdentity := pc .idpLoginURL != nil
remoteDesc := pc .RemoteDescription ()
switch {
case remoteDesc == nil :
return SessionDescription {}, &rtcerr .InvalidStateError {Err : ErrNoRemoteDescription }
case useIdentity :
return SessionDescription {}, errIdentityProviderNotImplemented
case pc .isClosed .get ():
return SessionDescription {}, &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
case pc .signalingState .Get () != SignalingStateHaveRemoteOffer &&
pc .signalingState .Get () != SignalingStateHaveLocalPranswer :
return SessionDescription {}, &rtcerr .InvalidStateError {Err : ErrIncorrectSignalingState }
}
connectionRole := connectionRoleFromDtlsRole (pc .api .settingEngine .answeringDTLSRole )
if connectionRole == sdp .ConnectionRole (0 ) {
connectionRole = connectionRoleFromDtlsRole (defaultDtlsRoleAnswer )
if isIceLiteSet (remoteDesc .parsed ) && !pc .api .settingEngine .candidates .ICELite {
connectionRole = connectionRoleFromDtlsRole (DTLSRoleServer )
}
}
pc .mu .Lock ()
defer pc .mu .Unlock ()
descr , err := pc .generateMatchedSDP (pc .rtpTransceivers , useIdentity , false , connectionRole )
if err != nil {
return SessionDescription {}, err
}
updateSDPOrigin (&pc .sdpOrigin , descr )
sdpBytes , err := descr .Marshal ()
if err != nil {
return SessionDescription {}, err
}
desc := SessionDescription {
Type : SDPTypeAnswer ,
SDP : string (sdpBytes ),
parsed : descr ,
}
pc .lastAnswer = desc .SDP
return desc , nil
}
func (pc *PeerConnection ) setDescription (sd *SessionDescription , op stateChangeOp ) error {
switch {
case pc .isClosed .get ():
return &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
case NewSDPType (sd .Type .String ()) == SDPTypeUnknown :
return &rtcerr .TypeError {
Err : fmt .Errorf ("%w: '%d' is not a valid enum value of type SDPType" , errPeerConnSDPTypeInvalidValue , sd .Type ),
}
}
nextState , err := func () (SignalingState , error ) {
pc .mu .Lock ()
defer pc .mu .Unlock ()
cur := pc .SignalingState ()
setLocal := stateChangeOpSetLocal
setRemote := stateChangeOpSetRemote
newSDPDoesNotMatchOffer := &rtcerr .InvalidModificationError {Err : errSDPDoesNotMatchOffer }
newSDPDoesNotMatchAnswer := &rtcerr .InvalidModificationError {Err : errSDPDoesNotMatchAnswer }
var nextState SignalingState
var err error
switch op {
case setLocal :
switch sd .Type {
case SDPTypeOffer :
if sd .SDP != pc .lastOffer {
return nextState , newSDPDoesNotMatchOffer
}
nextState , err = checkNextSignalingState (cur , SignalingStateHaveLocalOffer , setLocal , sd .Type )
if err == nil {
pc .pendingLocalDescription = sd
}
case SDPTypeAnswer :
if sd .SDP != pc .lastAnswer {
return nextState , newSDPDoesNotMatchAnswer
}
nextState , err = checkNextSignalingState (cur , SignalingStateStable , setLocal , sd .Type )
if err == nil {
pc .currentLocalDescription = sd
pc .currentRemoteDescription = pc .pendingRemoteDescription
pc .pendingRemoteDescription = nil
pc .pendingLocalDescription = nil
}
case SDPTypeRollback :
nextState , err = checkNextSignalingState (cur , SignalingStateStable , setLocal , sd .Type )
if err == nil {
pc .pendingLocalDescription = nil
}
case SDPTypePranswer :
if sd .SDP != pc .lastAnswer {
return nextState , newSDPDoesNotMatchAnswer
}
nextState , err = checkNextSignalingState (cur , SignalingStateHaveLocalPranswer , setLocal , sd .Type )
if err == nil {
pc .pendingLocalDescription = sd
}
default :
return nextState , &rtcerr .OperationError {Err : fmt .Errorf ("%w: %s(%s)" , errPeerConnStateChangeInvalid , op , sd .Type )}
}
case setRemote :
switch sd .Type {
case SDPTypeOffer :
nextState , err = checkNextSignalingState (cur , SignalingStateHaveRemoteOffer , setRemote , sd .Type )
if err == nil {
pc .pendingRemoteDescription = sd
}
case SDPTypeAnswer :
nextState , err = checkNextSignalingState (cur , SignalingStateStable , setRemote , sd .Type )
if err == nil {
pc .currentRemoteDescription = sd
pc .currentLocalDescription = pc .pendingLocalDescription
pc .pendingRemoteDescription = nil
pc .pendingLocalDescription = nil
}
case SDPTypeRollback :
nextState , err = checkNextSignalingState (cur , SignalingStateStable , setRemote , sd .Type )
if err == nil {
pc .pendingRemoteDescription = nil
}
case SDPTypePranswer :
nextState , err = checkNextSignalingState (cur , SignalingStateHaveRemotePranswer , setRemote , sd .Type )
if err == nil {
pc .pendingRemoteDescription = sd
}
default :
return nextState , &rtcerr .OperationError {Err : fmt .Errorf ("%w: %s(%s)" , errPeerConnStateChangeInvalid , op , sd .Type )}
}
default :
return nextState , &rtcerr .OperationError {Err : fmt .Errorf ("%w: %q" , errPeerConnStateChangeUnhandled , op )}
}
return nextState , err
}()
if err == nil {
pc .signalingState .Set (nextState )
if pc .signalingState .Get () == SignalingStateStable {
pc .isNegotiationNeeded .set (false )
pc .mu .Lock ()
pc .onNegotiationNeeded ()
pc .mu .Unlock ()
}
pc .onSignalingStateChange (nextState )
}
return err
}
func (pc *PeerConnection ) SetLocalDescription (desc SessionDescription ) error {
if pc .isClosed .get () {
return &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
haveLocalDescription := pc .currentLocalDescription != nil
if desc .SDP == "" {
switch desc .Type {
case SDPTypeAnswer , SDPTypePranswer :
desc .SDP = pc .lastAnswer
case SDPTypeOffer :
desc .SDP = pc .lastOffer
default :
return &rtcerr .InvalidModificationError {
Err : fmt .Errorf ("%w: %s" , errPeerConnSDPTypeInvalidValueSetLocalDescription , desc .Type ),
}
}
}
desc .parsed = &sdp .SessionDescription {}
if err := desc .parsed .UnmarshalString (desc .SDP ); err != nil {
return err
}
if err := pc .setDescription (&desc , stateChangeOpSetLocal ); err != nil {
return err
}
currentTransceivers := append ([]*RTPTransceiver {}, pc .GetTransceivers ()...)
weAnswer := desc .Type == SDPTypeAnswer
remoteDesc := pc .RemoteDescription ()
if weAnswer && remoteDesc != nil {
_ = setRTPTransceiverCurrentDirection (&desc , currentTransceivers , false )
if err := pc .startRTPSenders (currentTransceivers ); err != nil {
return err
}
pc .configureRTPReceivers (haveLocalDescription , remoteDesc , currentTransceivers )
pc .ops .Enqueue (func () {
pc .startRTP (haveLocalDescription , remoteDesc , currentTransceivers )
})
}
mediaSection , ok := selectCandidateMediaSection (desc .parsed )
if ok {
pc .iceGatherer .setMediaStreamIdentification (mediaSection .SDPMid , mediaSection .SDPMLineIndex )
}
if pc .iceGatherer .State () == ICEGathererStateNew {
return pc .iceGatherer .Gather ()
}
return nil
}
func (pc *PeerConnection ) LocalDescription () *SessionDescription {
if pendingLocalDescription := pc .PendingLocalDescription (); pendingLocalDescription != nil {
return pendingLocalDescription
}
return pc .CurrentLocalDescription ()
}
func (pc *PeerConnection ) SetRemoteDescription (desc SessionDescription ) error {
if pc .isClosed .get () {
return &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
isRenegotiation := pc .currentRemoteDescription != nil
if _ , err := desc .Unmarshal (); err != nil {
return err
}
if err := pc .setDescription (&desc , stateChangeOpSetRemote ); err != nil {
return err
}
if err := pc .api .mediaEngine .updateFromRemoteDescription (*desc .parsed ); err != nil {
return err
}
for _ , sender := range pc .GetSenders () {
sender .configureRTXAndFEC ()
}
var transceiver *RTPTransceiver
localTransceivers := append ([]*RTPTransceiver {}, pc .GetTransceivers ()...)
detectedPlanB := descriptionIsPlanB (pc .RemoteDescription (), pc .log )
if pc .configuration .SDPSemantics != SDPSemanticsUnifiedPlan {
detectedPlanB = descriptionPossiblyPlanB (pc .RemoteDescription ())
}
weOffer := desc .Type == SDPTypeAnswer
if !weOffer && !detectedPlanB {
for _ , media := range pc .RemoteDescription ().parsed .MediaDescriptions {
midValue := getMidValue (media )
if midValue == "" {
return errPeerConnRemoteDescriptionWithoutMidValue
}
if media .MediaName .Media == mediaSectionApplication {
continue
}
kind := NewRTPCodecType (media .MediaName .Media )
direction := getPeerDirection (media )
if kind == 0 || direction == RTPTransceiverDirectionUnknown {
continue
}
transceiver , localTransceivers = findByMid (midValue , localTransceivers )
if transceiver == nil {
transceiver , localTransceivers = satisfyTypeAndDirection (kind , direction , localTransceivers )
} else if direction == RTPTransceiverDirectionInactive {
if err := transceiver .Stop (); err != nil {
return err
}
}
switch {
case transceiver == nil :
receiver , err := pc .api .NewRTPReceiver (kind , pc .dtlsTransport )
if err != nil {
return err
}
localDirection := RTPTransceiverDirectionRecvonly
if direction == RTPTransceiverDirectionRecvonly {
localDirection = RTPTransceiverDirectionSendonly
} else if direction == RTPTransceiverDirectionInactive {
localDirection = RTPTransceiverDirectionInactive
}
transceiver = newRTPTransceiver (receiver , nil , localDirection , kind , pc .api )
pc .mu .Lock ()
pc .addRTPTransceiver (transceiver )
pc .mu .Unlock ()
if codecs , err := codecsFromMediaDescription (media ); err == nil {
filteredCodecs := []RTPCodecParameters {}
for _ , codec := range codecs {
if c , matchType := codecParametersFuzzySearch (
codec ,
pc .api .mediaEngine .getCodecsByKind (kind ),
); matchType == codecMatchExact {
codec .PayloadType = c .PayloadType
filteredCodecs = append (filteredCodecs , codec )
}
}
_ = transceiver .SetCodecPreferences (filteredCodecs )
}
case direction == RTPTransceiverDirectionRecvonly :
if transceiver .Direction () == RTPTransceiverDirectionSendrecv {
transceiver .setDirection (RTPTransceiverDirectionSendonly )
} else if transceiver .Direction () == RTPTransceiverDirectionRecvonly {
transceiver .setDirection (RTPTransceiverDirectionInactive )
}
case direction == RTPTransceiverDirectionSendrecv :
if transceiver .Direction () == RTPTransceiverDirectionSendonly {
transceiver .setDirection (RTPTransceiverDirectionSendrecv )
} else if transceiver .Direction () == RTPTransceiverDirectionInactive {
transceiver .setDirection (RTPTransceiverDirectionRecvonly )
}
case direction == RTPTransceiverDirectionSendonly :
if transceiver .Direction () == RTPTransceiverDirectionInactive {
transceiver .setDirection (RTPTransceiverDirectionRecvonly )
}
}
if transceiver .Mid () == "" {
if err := transceiver .SetMid (midValue ); err != nil {
return err
}
}
}
}
iceDetails , err := extractICEDetails (desc .parsed , pc .log )
if err != nil {
return err
}
if isRenegotiation && pc .iceTransport .haveRemoteCredentialsChange (iceDetails .Ufrag , iceDetails .Password ) {
if !weOffer {
if err = pc .iceTransport .restart (); err != nil {
return err
}
}
if err = pc .iceTransport .setRemoteCredentials (iceDetails .Ufrag , iceDetails .Password ); err != nil {
return err
}
}
for i := range iceDetails .Candidates {
if err = pc .iceTransport .AddRemoteCandidate (&iceDetails .Candidates [i ]); err != nil {
return err
}
}
currentTransceivers := append ([]*RTPTransceiver {}, pc .GetTransceivers ()...)
if isRenegotiation {
if weOffer {
_ = setRTPTransceiverCurrentDirection (&desc , currentTransceivers , true )
if err = pc .startRTPSenders (currentTransceivers ); err != nil {
return err
}
pc .configureRTPReceivers (true , &desc , currentTransceivers )
pc .ops .Enqueue (func () {
pc .startRTP (true , &desc , currentTransceivers )
})
}
return nil
}
remoteIsLite := isIceLiteSet (desc .parsed )
fingerprint , fingerprintHash , err := extractFingerprint (desc .parsed )
if err != nil {
return err
}
iceRole := ICERoleControlled
if (weOffer && remoteIsLite == pc .api .settingEngine .candidates .ICELite ) ||
(remoteIsLite && !pc .api .settingEngine .candidates .ICELite ) {
iceRole = ICERoleControlling
}
if weOffer {
_ = setRTPTransceiverCurrentDirection (&desc , currentTransceivers , true )
if err := pc .startRTPSenders (currentTransceivers ); err != nil {
return err
}
pc .configureRTPReceivers (false , &desc , currentTransceivers )
}
pc .ops .Enqueue (func () {
pc .startTransports (
iceRole ,
dtlsRoleFromRemoteSDP (desc .parsed ),
iceDetails .Ufrag ,
iceDetails .Password ,
fingerprint ,
fingerprintHash ,
)
if weOffer {
pc .startRTP (false , &desc , currentTransceivers )
}
})
return nil
}
func (pc *PeerConnection ) configureReceiver (incoming trackDetails , receiver *RTPReceiver ) {
receiver .configureReceive (trackDetailsToRTPReceiveParameters (&incoming ))
for i := range receiver .tracks {
receiver .tracks [i ].track .mu .Lock ()
receiver .tracks [i ].track .id = incoming .id
receiver .tracks [i ].track .streamID = incoming .streamID
receiver .tracks [i ].track .mu .Unlock ()
}
}
func (pc *PeerConnection ) startReceiver (incoming trackDetails , receiver *RTPReceiver ) {
if err := receiver .startReceive (trackDetailsToRTPReceiveParameters (&incoming )); err != nil {
pc .log .Warnf ("RTPReceiver Receive failed %s" , err )
return
}
for _ , track := range receiver .Tracks () {
if track .SSRC () == 0 || track .RID () != "" {
return
}
if pc .api .settingEngine .fireOnTrackBeforeFirstRTP {
pc .onTrack (track , receiver )
return
}
go func (track *TrackRemote ) {
b := make ([]byte , pc .api .settingEngine .getReceiveMTU ())
n , _ , err := track .peek (b )
if err != nil {
pc .log .Warnf ("Could not determine PayloadType for SSRC %d (%s)" , track .SSRC (), err )
return
}
if err = track .checkAndUpdateTrack (b [:n ]); err != nil {
pc .log .Warnf ("Failed to set codec settings for track SSRC %d (%s)" , track .SSRC (), err )
return
}
pc .onTrack (track , receiver )
}(track )
}
}
func setRTPTransceiverCurrentDirection(
answer *SessionDescription ,
currentTransceivers []*RTPTransceiver ,
weOffer bool ,
) error {
currentTransceivers = append ([]*RTPTransceiver {}, currentTransceivers ...)
for _ , media := range answer .parsed .MediaDescriptions {
midValue := getMidValue (media )
if midValue == "" {
return errPeerConnRemoteDescriptionWithoutMidValue
}
if media .MediaName .Media == mediaSectionApplication {
continue
}
var transceiver *RTPTransceiver
transceiver , currentTransceivers = findByMid (midValue , currentTransceivers )
if transceiver == nil {
return fmt .Errorf ("%w: %q" , errPeerConnTranscieverMidNil , midValue )
}
direction := getPeerDirection (media )
if direction == RTPTransceiverDirectionUnknown {
continue
}
if weOffer {
switch direction {
case RTPTransceiverDirectionSendonly :
direction = RTPTransceiverDirectionRecvonly
case RTPTransceiverDirectionRecvonly :
direction = RTPTransceiverDirectionSendonly
default :
}
}
if !weOffer && direction == RTPTransceiverDirectionSendonly && transceiver .Sender () == nil {
direction = RTPTransceiverDirectionInactive
}
transceiver .setCurrentDirection (direction )
}
return nil
}
func runIfNewReceiver(
incomingTrack trackDetails ,
transceivers []*RTPTransceiver ,
callbackFunc func (incomingTrack trackDetails , receiver *RTPReceiver ),
) bool {
for _ , t := range transceivers {
if t .Mid () != incomingTrack .mid {
continue
}
receiver := t .Receiver ()
if (incomingTrack .kind != t .Kind ()) ||
(t .Direction () != RTPTransceiverDirectionRecvonly && t .Direction () != RTPTransceiverDirectionSendrecv ) ||
receiver == nil ||
(receiver .haveReceived ()) {
continue
}
callbackFunc (incomingTrack , receiver )
return true
}
return false
}
func (pc *PeerConnection ) configureRTPReceivers (
isRenegotiation bool ,
remoteDesc *SessionDescription ,
currentTransceivers []*RTPTransceiver ,
) {
incomingTracks := trackDetailsFromSDP (pc .log , remoteDesc .parsed )
if isRenegotiation {
for _ , transceiver := range currentTransceivers {
receiver := transceiver .Receiver ()
if receiver == nil {
continue
}
tracks := transceiver .Receiver ().Tracks ()
if len (tracks ) == 0 {
continue
}
mid := transceiver .Mid ()
receiverNeedsStopped := false
for _ , trackRemote := range tracks {
func (track *TrackRemote ) {
track .mu .Lock ()
defer track .mu .Unlock ()
if track .rid != "" {
if details := trackDetailsForRID (incomingTracks , mid , track .rid ); details != nil {
track .id = details .id
track .streamID = details .streamID
return
}
} else if track .ssrc != 0 {
if details := trackDetailsForSSRC (incomingTracks , track .ssrc ); details != nil {
track .id = details .id
track .streamID = details .streamID
return
}
}
receiverNeedsStopped = true
}(trackRemote )
}
if !receiverNeedsStopped {
continue
}
if err := receiver .Stop (); err != nil {
pc .log .Warnf ("Failed to stop RtpReceiver: %s" , err )
continue
}
receiver , err := pc .api .NewRTPReceiver (receiver .kind , pc .dtlsTransport )
if err != nil {
pc .log .Warnf ("Failed to create new RtpReceiver: %s" , err )
continue
}
transceiver .setReceiver (receiver )
}
}
localTransceivers := append ([]*RTPTransceiver {}, currentTransceivers ...)
filteredTracks := append ([]trackDetails {}, incomingTracks ...)
for _ , incomingTrack := range incomingTracks {
for _ , t := range localTransceivers {
if receiver := t .Receiver (); receiver != nil {
for _ , track := range receiver .Tracks () {
for _ , ssrc := range incomingTrack .ssrcs {
if ssrc == track .SSRC () {
filteredTracks = filterTrackWithSSRC (filteredTracks , track .SSRC ())
}
}
}
}
}
}
for _ , incomingTrack := range filteredTracks {
_ = runIfNewReceiver (incomingTrack , localTransceivers , pc .configureReceiver )
}
}
func (pc *PeerConnection ) startRTPReceivers (remoteDesc *SessionDescription , currentTransceivers []*RTPTransceiver ) {
incomingTracks := trackDetailsFromSDP (pc .log , remoteDesc .parsed )
if len (incomingTracks ) == 0 {
return
}
localTransceivers := append ([]*RTPTransceiver {}, currentTransceivers ...)
unhandledTracks := incomingTracks [:0 ]
for _ , incomingTrack := range incomingTracks {
trackHandled := runIfNewReceiver (incomingTrack , localTransceivers , pc .startReceiver )
if !trackHandled {
unhandledTracks = append (unhandledTracks , incomingTrack )
}
}
remoteIsPlanB := false
switch pc .configuration .SDPSemantics {
case SDPSemanticsPlanB :
remoteIsPlanB = true
case SDPSemanticsUnifiedPlanWithFallback :
remoteIsPlanB = descriptionPossiblyPlanB (pc .RemoteDescription ())
default :
}
if remoteIsPlanB {
for _ , incomingTrack := range unhandledTracks {
t , err := pc .AddTransceiverFromKind (incomingTrack .kind , RTPTransceiverInit {
Direction : RTPTransceiverDirectionSendrecv ,
})
if err != nil {
pc .log .Warnf ("Could not add transceiver for remote SSRC %d: %s" , incomingTrack .ssrcs [0 ], err )
continue
}
pc .configureReceiver (incomingTrack , t .Receiver ())
pc .startReceiver (incomingTrack , t .Receiver ())
}
}
}
func (pc *PeerConnection ) startRTPSenders (currentTransceivers []*RTPTransceiver ) error {
for _ , transceiver := range currentTransceivers {
if sender := transceiver .Sender (); sender != nil && sender .isNegotiated () && !sender .hasSent () {
err := sender .Send (sender .GetParameters ())
if err != nil {
return err
}
}
}
return nil
}
func (pc *PeerConnection ) startSCTP (maxMessageSize uint32 ) {
if err := pc .sctpTransport .Start (SCTPCapabilities {
MaxMessageSize : maxMessageSize ,
}); err != nil {
pc .log .Warnf ("Failed to start SCTP: %s" , err )
if err = pc .sctpTransport .Stop (); err != nil {
pc .log .Warnf ("Failed to stop SCTPTransport: %s" , err )
}
return
}
}
func (pc *PeerConnection ) handleUndeclaredSSRC (
ssrc SSRC ,
mediaSection *sdp .MediaDescription ,
) (handled bool , err error ) {
streamID := ""
id := ""
hasRidAttribute := false
hasSSRCAttribute := false
for _ , a := range mediaSection .Attributes {
switch a .Key {
case sdp .AttrKeyMsid :
if split := strings .Split (a .Value , " " ); len (split ) == 2 {
streamID = split [0 ]
id = split [1 ]
}
case sdp .AttrKeySSRC :
hasSSRCAttribute = true
case sdpAttributeRid :
hasRidAttribute = true
}
}
if hasRidAttribute {
return false , nil
} else if hasSSRCAttribute {
return false , errMediaSectionHasExplictSSRCAttribute
}
incoming := trackDetails {
ssrcs : []SSRC {ssrc },
kind : RTPCodecTypeVideo ,
streamID : streamID ,
id : id ,
}
if mediaSection .MediaName .Media == RTPCodecTypeAudio .String () {
incoming .kind = RTPCodecTypeAudio
}
t , err := pc .AddTransceiverFromKind (incoming .kind , RTPTransceiverInit {
Direction : RTPTransceiverDirectionSendrecv ,
})
if err != nil {
return false , fmt .Errorf ("%w: %d: %s" , errPeerConnRemoteSSRCAddTransceiver , ssrc , err )
}
pc .configureReceiver (incoming , t .Receiver ())
pc .startReceiver (incoming , t .Receiver ())
return true , nil
}
func (pc *PeerConnection ) findMediaSectionByPayloadType (
payloadType PayloadType ,
remoteDescription *SessionDescription ,
) (selectedMediaSection *sdp .MediaDescription , ok bool ) {
for i := range remoteDescription .parsed .MediaDescriptions {
descr := remoteDescription .parsed .MediaDescriptions [i ]
media := descr .MediaName .Media
if !strings .EqualFold (media , "video" ) && !strings .EqualFold (media , "audio" ) {
continue
}
formats := descr .MediaName .Formats
for _ , payloadStr := range formats {
payload , err := strconv .ParseUint (payloadStr , 10 , 8 )
if err != nil {
continue
}
if PayloadType (payload ) == payloadType {
return remoteDescription .parsed .MediaDescriptions [i ], true
}
}
}
return nil , false
}
func (pc *PeerConnection ) handleNonMediaBandwidthProbe () {
nonMediaBandwidthProbe , err := pc .api .NewRTPReceiver (RTPCodecTypeVideo , pc .dtlsTransport )
if err != nil {
pc .log .Errorf ("handleNonMediaBandwidthProbe failed to create RTPReceiver: %v" , err )
return
}
if err = nonMediaBandwidthProbe .Receive (RTPReceiveParameters {
Encodings : []RTPDecodingParameters {{RTPCodingParameters : RTPCodingParameters {}}},
}); err != nil {
pc .log .Errorf ("handleNonMediaBandwidthProbe failed to start RTPReceiver: %v" , err )
return
}
pc .nonMediaBandwidthProbe .Store (nonMediaBandwidthProbe )
b := make ([]byte , pc .api .settingEngine .getReceiveMTU ())
for {
if _, _, err = nonMediaBandwidthProbe .readRTP (b , nonMediaBandwidthProbe .Track ()); err != nil {
pc .log .Tracef ("handleNonMediaBandwidthProbe read exiting: %v" , err )
return
}
}
}
func (pc *PeerConnection ) handleIncomingSSRC (rtpStream io .Reader , ssrc SSRC ) error {
remoteDescription := pc .RemoteDescription ()
if remoteDescription == nil {
return errPeerConnRemoteDescriptionNil
}
for _ , track := range trackDetailsFromSDP (pc .log , remoteDescription .parsed ) {
if track .rtxSsrc != nil && ssrc == *track .rtxSsrc {
return nil
}
if track .fecSsrc != nil && ssrc == *track .fecSsrc {
return nil
}
for _ , trackSsrc := range track .ssrcs {
if ssrc == trackSsrc {
return nil
}
}
}
if len (remoteDescription .parsed .MediaDescriptions ) == 1 {
mediaSection := remoteDescription .parsed .MediaDescriptions [0 ]
if handled , err := pc .handleUndeclaredSSRC (ssrc , mediaSection ); handled || err != nil {
return err
}
}
b := make ([]byte , pc .api .settingEngine .getReceiveMTU ())
i , err := rtpStream .Read (b )
if err != nil {
return err
}
if i < 4 {
return errRTPTooShort
}
payloadType := PayloadType (b [1 ] & 0x7f )
params , err := pc .api .mediaEngine .getRTPParametersByPayloadType (payloadType )
if err != nil {
return err
}
midExtensionID , audioSupported , videoSupported := pc .api .mediaEngine .getHeaderExtensionID (
RTPHeaderExtensionCapability {sdp .SDESMidURI },
)
if !audioSupported && !videoSupported {
mediaSection , ok := pc .findMediaSectionByPayloadType (payloadType , remoteDescription )
if ok {
if ok , err = pc .handleUndeclaredSSRC (ssrc , mediaSection ); ok || err != nil {
return err
}
}
return errPeerConnSimulcastMidRTPExtensionRequired
}
streamIDExtensionID , audioSupported , videoSupported := pc .api .mediaEngine .getHeaderExtensionID (
RTPHeaderExtensionCapability {sdp .SDESRTPStreamIDURI },
)
if !audioSupported && !videoSupported {
return errPeerConnSimulcastStreamIDRTPExtensionRequired
}
repairStreamIDExtensionID , _ , _ := pc .api .mediaEngine .getHeaderExtensionID (
RTPHeaderExtensionCapability {sdp .SDESRepairRTPStreamIDURI },
)
streamInfo := createStreamInfo (
"" ,
ssrc ,
0 , 0 ,
params .Codecs [0 ].PayloadType ,
0 , 0 ,
params .Codecs [0 ].RTPCodecCapability ,
params .HeaderExtensions ,
)
readStream , interceptor , rtcpReadStream , rtcpInterceptor , err := pc .dtlsTransport .streamsForSSRC (ssrc , *streamInfo )
if err != nil {
return err
}
var mid , rid , rsid string
var paddingOnly bool
for readCount := 0 ; readCount <= simulcastProbeCount ; readCount ++ {
if mid == "" || (rid == "" && rsid == "" ) {
if paddingOnly {
readCount --
}
i , _ , err := interceptor .Read (b , nil )
if err != nil {
return err
}
if _, paddingOnly , err = handleUnknownRTPPacket (
b [:i ], uint8 (midExtensionID ),
uint8 (streamIDExtensionID ),
uint8 (repairStreamIDExtensionID ),
&mid ,
&rid ,
&rsid ,
); err != nil {
return err
}
continue
}
for _ , t := range pc .GetTransceivers () {
receiver := t .Receiver ()
if t .Mid () != mid || receiver == nil {
continue
}
if rsid != "" {
receiver .mu .Lock ()
defer receiver .mu .Unlock ()
return receiver .receiveForRtx (SSRC (0 ), rsid , streamInfo , readStream , interceptor , rtcpReadStream , rtcpInterceptor )
}
track , err := receiver .receiveForRid (
rid ,
params ,
streamInfo ,
readStream ,
interceptor ,
rtcpReadStream ,
rtcpInterceptor ,
)
if err != nil {
return err
}
pc .onTrack (track , receiver )
return nil
}
}
pc .api .interceptor .UnbindRemoteStream (streamInfo )
return errPeerConnSimulcastIncomingSSRCFailed
}
func (pc *PeerConnection ) undeclaredMediaProcessor () {
go pc .undeclaredRTPMediaProcessor ()
go pc .undeclaredRTCPMediaProcessor ()
}
func (pc *PeerConnection ) undeclaredRTPMediaProcessor () {
var simulcastRoutineCount uint64
for {
srtpSession , err := pc .dtlsTransport .getSRTPSession ()
if err != nil {
pc .log .Warnf ("undeclaredMediaProcessor failed to open SrtpSession: %v" , err )
return
}
srtcpSession , err := pc .dtlsTransport .getSRTCPSession ()
if err != nil {
pc .log .Warnf ("undeclaredMediaProcessor failed to open SrtcpSession: %v" , err )
return
}
srtpReadStream , ssrc , err := srtpSession .AcceptStream ()
if err != nil {
pc .log .Warnf ("Failed to accept RTP %v" , err )
return
}
srtcpReadStream , err := srtcpSession .OpenReadStream (ssrc )
if err != nil {
pc .log .Warnf ("Failed to open RTCP stream for %d: %v" , ssrc , err )
return
}
if pc .isClosed .get () {
if err = srtpReadStream .Close (); err != nil {
pc .log .Warnf ("Failed to close RTP stream %v" , err )
}
if err = srtcpReadStream .Close (); err != nil {
pc .log .Warnf ("Failed to close RTCP stream %v" , err )
}
continue
}
pc .dtlsTransport .storeSimulcastStream (srtpReadStream , srtcpReadStream )
if ssrc == 0 {
go pc .handleNonMediaBandwidthProbe ()
continue
}
if atomic .AddUint64 (&simulcastRoutineCount , 1 ) >= simulcastMaxProbeRoutines {
atomic .AddUint64 (&simulcastRoutineCount , ^uint64 (0 ))
pc .log .Warn (ErrSimulcastProbeOverflow .Error())
continue
}
go func (rtpStream io .Reader , ssrc SSRC ) {
if err := pc .handleIncomingSSRC (rtpStream , ssrc ); err != nil {
pc .log .Errorf (incomingUnhandledRTPSsrc , ssrc , err )
}
atomic .AddUint64 (&simulcastRoutineCount , ^uint64 (0 ))
}(srtpReadStream , SSRC (ssrc ))
}
}
func (pc *PeerConnection ) undeclaredRTCPMediaProcessor () {
var unhandledStreams []*srtp .ReadStreamSRTCP
defer func () {
for _ , s := range unhandledStreams {
_ = s .Close ()
}
}()
for {
srtcpSession , err := pc .dtlsTransport .getSRTCPSession ()
if err != nil {
pc .log .Warnf ("undeclaredMediaProcessor failed to open SrtcpSession: %v" , err )
return
}
stream , ssrc , err := srtcpSession .AcceptStream ()
if err != nil {
pc .log .Warnf ("Failed to accept RTCP %v" , err )
return
}
pc .log .Warnf ("Incoming unhandled RTCP ssrc(%d), OnTrack will not be fired" , ssrc )
unhandledStreams = append (unhandledStreams , stream )
}
}
func (pc *PeerConnection ) RemoteDescription () *SessionDescription {
pc .mu .RLock ()
defer pc .mu .RUnlock ()
if pc .pendingRemoteDescription != nil {
return pc .pendingRemoteDescription
}
return pc .currentRemoteDescription
}
func (pc *PeerConnection ) AddICECandidate (candidate ICECandidateInit ) error {
remoteDesc := pc .RemoteDescription ()
if remoteDesc == nil {
return &rtcerr .InvalidStateError {Err : ErrNoRemoteDescription }
}
candidateValue := strings .TrimPrefix (candidate .Candidate , "candidate:" )
if candidateValue == "" {
return pc .iceTransport .AddRemoteCandidate (nil )
}
cand , err := ice .UnmarshalCandidate (candidateValue )
if err != nil {
if errors .Is (err , ice .ErrUnknownCandidateTyp ) || errors .Is (err , ice .ErrDetermineNetworkType ) {
pc .log .Warnf ("Discarding remote candidate: %s" , err )
return nil
}
return err
}
if ufrag , ok := cand .GetExtension ("ufrag" ); ok {
if !pc .descriptionContainsUfrag (remoteDesc .parsed , ufrag .Value ) {
pc .log .Errorf ("dropping candidate with ufrag %s because it doesn't match the current ufrags" , ufrag .Value )
return nil
}
}
c , err := newICECandidateFromICE (cand , "" , 0 )
if err != nil {
return err
}
return pc .iceTransport .AddRemoteCandidate (&c )
}
func (pc *PeerConnection ) descriptionContainsUfrag (sdp *sdp .SessionDescription , matchUfrag string ) bool {
ufrag , ok := sdp .Attribute ("ice-ufrag" )
if ok && ufrag == matchUfrag {
return true
}
for _ , media := range sdp .MediaDescriptions {
ufrag , ok := media .Attribute ("ice-ufrag" )
if ok && ufrag == matchUfrag {
return true
}
}
return false
}
func (pc *PeerConnection ) ICEConnectionState () ICEConnectionState {
if state , ok := pc .iceConnectionState .Load ().(ICEConnectionState ); ok {
return state
}
return ICEConnectionState (0 )
}
func (pc *PeerConnection ) GetSenders () (result []*RTPSender ) {
pc .mu .Lock ()
defer pc .mu .Unlock ()
for _ , transceiver := range pc .rtpTransceivers {
if sender := transceiver .Sender (); sender != nil {
result = append (result , sender )
}
}
return result
}
func (pc *PeerConnection ) GetReceivers () (receivers []*RTPReceiver ) {
pc .mu .Lock ()
defer pc .mu .Unlock ()
for _ , transceiver := range pc .rtpTransceivers {
if receiver := transceiver .Receiver (); receiver != nil {
receivers = append (receivers , receiver )
}
}
return
}
func (pc *PeerConnection ) GetTransceivers () []*RTPTransceiver {
pc .mu .Lock ()
defer pc .mu .Unlock ()
return pc .rtpTransceivers
}
func (pc *PeerConnection ) AddTrack (track TrackLocal ) (*RTPSender , error ) {
if pc .isClosed .get () {
return nil , &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
pc .mu .Lock ()
defer pc .mu .Unlock ()
for _ , transceiver := range pc .rtpTransceivers {
currentDirection := transceiver .getCurrentDirection ()
if transceiver .kind == track .Kind () && transceiver .Sender () == nil &&
!(currentDirection == RTPTransceiverDirectionSendrecv || currentDirection == RTPTransceiverDirectionSendonly ) {
sender , err := pc .api .NewRTPSender (track , pc .dtlsTransport )
if err == nil {
err = transceiver .SetSender (sender , track )
if err != nil {
_ = sender .Stop ()
transceiver .setSender (nil )
}
}
if err != nil {
return nil , err
}
pc .onNegotiationNeeded ()
return sender , nil
}
}
transceiver , err := pc .newTransceiverFromTrack (RTPTransceiverDirectionSendrecv , track )
if err != nil {
return nil , err
}
pc .addRTPTransceiver (transceiver )
return transceiver .Sender (), nil
}
func (pc *PeerConnection ) RemoveTrack (sender *RTPSender ) (err error ) {
if pc .isClosed .get () {
return &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
var transceiver *RTPTransceiver
pc .mu .Lock ()
defer pc .mu .Unlock ()
for _ , t := range pc .rtpTransceivers {
if t .Sender () == sender {
transceiver = t
break
}
}
if transceiver == nil {
return &rtcerr .InvalidAccessError {Err : ErrSenderNotCreatedByConnection }
} else if err = sender .Stop (); err == nil {
err = transceiver .setSendingTrack (nil )
if err == nil {
pc .onNegotiationNeeded ()
}
}
return
}
func (pc *PeerConnection ) newTransceiverFromTrack (
direction RTPTransceiverDirection ,
track TrackLocal ,
init ...RTPTransceiverInit ,
) (t *RTPTransceiver , err error ) {
var (
receiver *RTPReceiver
sender *RTPSender
)
switch direction {
case RTPTransceiverDirectionSendrecv :
receiver , err = pc .api .NewRTPReceiver (track .Kind (), pc .dtlsTransport )
if err != nil {
return t , err
}
sender , err = pc .api .NewRTPSender (track , pc .dtlsTransport )
case RTPTransceiverDirectionSendonly :
sender , err = pc .api .NewRTPSender (track , pc .dtlsTransport )
default :
err = errPeerConnAddTransceiverFromTrackSupport
}
if err != nil {
return t , err
}
if sender != nil && len (sender .trackEncodings ) == 1 &&
len (init ) == 1 && len (init [0 ].SendEncodings ) == 1 && init [0 ].SendEncodings [0 ].SSRC != 0 {
sender .trackEncodings [0 ].ssrc = init [0 ].SendEncodings [0 ].SSRC
}
return newRTPTransceiver (receiver , sender , direction , track .Kind (), pc .api ), nil
}
func (pc *PeerConnection ) AddTransceiverFromKind (
kind RTPCodecType ,
init ...RTPTransceiverInit ,
) (t *RTPTransceiver , err error ) {
if pc .isClosed .get () {
return nil , &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
direction := RTPTransceiverDirectionSendrecv
if len (init ) > 1 {
return nil , errPeerConnAddTransceiverFromKindOnlyAcceptsOne
} else if len (init ) == 1 {
direction = init [0 ].Direction
}
switch direction {
case RTPTransceiverDirectionSendonly , RTPTransceiverDirectionSendrecv :
codecs := pc .api .mediaEngine .getCodecsByKind (kind )
if len (codecs ) == 0 {
return nil , ErrNoCodecsAvailable
}
track , err := NewTrackLocalStaticSample (codecs [0 ].RTPCodecCapability , util .MathRandAlpha (16 ), util .MathRandAlpha (16 ))
if err != nil {
return nil , err
}
t , err = pc .newTransceiverFromTrack (direction , track , init ...)
if err != nil {
return nil , err
}
case RTPTransceiverDirectionRecvonly :
receiver , err := pc .api .NewRTPReceiver (kind , pc .dtlsTransport )
if err != nil {
return nil , err
}
t = newRTPTransceiver (receiver , nil , RTPTransceiverDirectionRecvonly , kind , pc .api )
default :
return nil , errPeerConnAddTransceiverFromKindSupport
}
pc .mu .Lock ()
pc .addRTPTransceiver (t )
pc .mu .Unlock ()
return t , nil
}
func (pc *PeerConnection ) AddTransceiverFromTrack (
track TrackLocal ,
init ...RTPTransceiverInit ,
) (t *RTPTransceiver , err error ) {
if pc .isClosed .get () {
return nil , &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
direction := RTPTransceiverDirectionSendrecv
if len (init ) > 1 {
return nil , errPeerConnAddTransceiverFromTrackOnlyAcceptsOne
} else if len (init ) == 1 {
direction = init [0 ].Direction
}
t , err = pc .newTransceiverFromTrack (direction , track , init ...)
if err == nil {
pc .mu .Lock ()
pc .addRTPTransceiver (t )
pc .mu .Unlock ()
}
return
}
func (pc *PeerConnection ) CreateDataChannel (label string , options *DataChannelInit ) (*DataChannel , error ) {
if pc .isClosed .get () {
return nil , &rtcerr .InvalidStateError {Err : ErrConnectionClosed }
}
params := &DataChannelParameters {
Label : label ,
Ordered : true ,
}
if options != nil {
params .ID = options .ID
}
if options != nil {
if options .Ordered != nil {
params .Ordered = *options .Ordered
}
if options .MaxPacketLifeTime != nil {
params .MaxPacketLifeTime = options .MaxPacketLifeTime
}
if options .MaxRetransmits != nil {
params .MaxRetransmits = options .MaxRetransmits
}
if options .Protocol != nil {
params .Protocol = *options .Protocol
}
if len (params .Protocol ) > 65535 {
return nil , &rtcerr .TypeError {Err : ErrProtocolTooLarge }
}
if options .Negotiated != nil {
params .Negotiated = *options .Negotiated
}
}
dataChannel , err := pc .api .newDataChannel (params , nil , pc .log )
if err != nil {
return nil , err
}
if dataChannel .maxPacketLifeTime != nil && dataChannel .maxRetransmits != nil {
return nil , &rtcerr .TypeError {Err : ErrRetransmitsOrPacketLifeTime }
}
pc .sctpTransport .lock .Lock ()
pc .sctpTransport .dataChannels = append (pc .sctpTransport .dataChannels , dataChannel )
if dataChannel .ID () != nil {
pc .sctpTransport .dataChannelIDsUsed [*dataChannel .ID ()] = struct {}{}
}
pc .sctpTransport .dataChannelsRequested ++
pc .sctpTransport .lock .Unlock ()
if pc .sctpTransport .State () == SCTPTransportStateConnected {
if err = dataChannel .open (pc .sctpTransport ); err != nil {
return nil , err
}
}
pc .mu .Lock ()
pc .onNegotiationNeeded ()
pc .mu .Unlock ()
return dataChannel , nil
}
func (pc *PeerConnection ) SetIdentityProvider (string ) error {
return errPeerConnSetIdentityProviderNotImplemented
}
func (pc *PeerConnection ) WriteRTCP (pkts []rtcp .Packet ) error {
_ , err := pc .interceptorRTCPWriter .Write (pkts , make (interceptor .Attributes ))
return err
}
func (pc *PeerConnection ) writeRTCP (pkts []rtcp .Packet , _ interceptor .Attributes ) (int , error ) {
return pc .dtlsTransport .WriteRTCP (pkts )
}
func (pc *PeerConnection ) Close () error {
return pc .close (false )
}
func (pc *PeerConnection ) GracefulClose () error {
return pc .close (true )
}
func (pc *PeerConnection ) close (shouldGracefullyClose bool ) error {
pc .mu .Lock ()
isAlreadyClosingOrClosed := pc .isClosed .swap (true )
isAlreadyGracefullyClosingOrClosed := pc .isGracefullyClosingOrClosed
if shouldGracefullyClose && !isAlreadyGracefullyClosingOrClosed {
pc .isGracefullyClosingOrClosed = true
}
pc .mu .Unlock ()
if isAlreadyClosingOrClosed {
if !shouldGracefullyClose {
return nil
}
if isAlreadyGracefullyClosingOrClosed {
<-pc .isGracefulCloseDone
return nil
}
<-pc .isCloseDone
} else {
defer close (pc .isCloseDone )
}
if shouldGracefullyClose {
defer close (pc .isGracefulCloseDone )
}
closeErrs := make ([]error , 4 )
doGracefulCloseOps := func () []error {
if !shouldGracefullyClose {
return nil
}
var gracefulCloseErrors []error
if pc .iceTransport != nil {
gracefulCloseErrors = append (gracefulCloseErrors , pc .iceTransport .GracefulStop ())
}
pc .ops .GracefulClose ()
pc .sctpTransport .lock .Lock ()
for _ , d := range pc .sctpTransport .dataChannels {
gracefulCloseErrors = append (gracefulCloseErrors , d .GracefulClose ())
}
pc .sctpTransport .lock .Unlock ()
return gracefulCloseErrors
}
if isAlreadyClosingOrClosed {
return util .FlattenErrs (doGracefulCloseOps ())
}
pc .signalingState .Set (SignalingStateClosed )
pc .mu .Lock ()
for _ , t := range pc .rtpTransceivers {
closeErrs = append (closeErrs , t .Stop ())
}
if nonMediaBandwidthProbe , ok := pc .nonMediaBandwidthProbe .Load ().(*RTPReceiver ); ok {
closeErrs = append (closeErrs , nonMediaBandwidthProbe .Stop ())
}
pc .mu .Unlock ()
pc .sctpTransport .lock .Lock ()
for _ , d := range pc .sctpTransport .dataChannels {
d .setReadyState (DataChannelStateClosed )
}
pc .sctpTransport .lock .Unlock ()
if pc .sctpTransport != nil {
closeErrs = append (closeErrs , pc .sctpTransport .Stop ())
}
closeErrs = append (closeErrs , pc .dtlsTransport .Stop ())
if pc .iceTransport != nil && !shouldGracefullyClose {
closeErrs = append (closeErrs , pc .iceTransport .Stop ())
}
pc .updateConnectionState (pc .ICEConnectionState (), pc .dtlsTransport .State ())
closeErrs = append (closeErrs , doGracefulCloseOps ()...)
closeErrs = append (closeErrs , pc .api .interceptor .Close ())
return util .FlattenErrs (closeErrs )
}
func (pc *PeerConnection ) addRTPTransceiver (t *RTPTransceiver ) {
pc .rtpTransceivers = append (pc .rtpTransceivers , t )
pc .onNegotiationNeeded ()
}
func (pc *PeerConnection ) CurrentLocalDescription () *SessionDescription {
pc .mu .Lock ()
defer pc .mu .Unlock ()
localDescription := pc .currentLocalDescription
iceGather := pc .iceGatherer
iceGatheringState := pc .ICEGatheringState ()
return populateLocalCandidates (localDescription , iceGather , iceGatheringState )
}
func (pc *PeerConnection ) PendingLocalDescription () *SessionDescription {
pc .mu .Lock ()
defer pc .mu .Unlock ()
localDescription := pc .pendingLocalDescription
iceGather := pc .iceGatherer
iceGatheringState := pc .ICEGatheringState ()
return populateLocalCandidates (localDescription , iceGather , iceGatheringState )
}
func (pc *PeerConnection ) CurrentRemoteDescription () *SessionDescription {
pc .mu .RLock ()
defer pc .mu .RUnlock ()
return pc .currentRemoteDescription
}
func (pc *PeerConnection ) PendingRemoteDescription () *SessionDescription {
pc .mu .RLock ()
defer pc .mu .RUnlock ()
return pc .pendingRemoteDescription
}
func (pc *PeerConnection ) SignalingState () SignalingState {
return pc .signalingState .Get ()
}
func (pc *PeerConnection ) ICEGatheringState () ICEGatheringState {
if pc .iceGatherer == nil {
return ICEGatheringStateNew
}
switch pc .iceGatherer .State () {
case ICEGathererStateNew :
return ICEGatheringStateNew
case ICEGathererStateGathering :
return ICEGatheringStateGathering
default :
return ICEGatheringStateComplete
}
}
func (pc *PeerConnection ) ConnectionState () PeerConnectionState {
if state , ok := pc .connectionState .Load ().(PeerConnectionState ); ok {
return state
}
return PeerConnectionState (0 )
}
func (pc *PeerConnection ) GetStats () StatsReport {
var (
dataChannelsAccepted uint32
dataChannelsClosed uint32
dataChannelsOpened uint32
dataChannelsRequested uint32
)
statsCollector := newStatsReportCollector ()
statsCollector .Collecting ()
pc .mu .Lock ()
if pc .iceGatherer != nil {
pc .iceGatherer .collectStats (statsCollector )
}
if pc .iceTransport != nil {
pc .iceTransport .collectStats (statsCollector )
}
pc .sctpTransport .lock .Lock ()
dataChannels := append ([]*DataChannel {}, pc .sctpTransport .dataChannels ...)
dataChannelsAccepted = pc .sctpTransport .dataChannelsAccepted
dataChannelsOpened = pc .sctpTransport .dataChannelsOpened
dataChannelsRequested = pc .sctpTransport .dataChannelsRequested
pc .sctpTransport .lock .Unlock ()
for _ , d := range dataChannels {
state := d .ReadyState ()
if state != DataChannelStateConnecting && state != DataChannelStateOpen {
dataChannelsClosed ++
}
d .collectStats (statsCollector )
}
pc .sctpTransport .collectStats (statsCollector )
stats := PeerConnectionStats {
Timestamp : statsTimestampNow (),
Type : StatsTypePeerConnection ,
ID : pc .statsID ,
DataChannelsAccepted : dataChannelsAccepted ,
DataChannelsClosed : dataChannelsClosed ,
DataChannelsOpened : dataChannelsOpened ,
DataChannelsRequested : dataChannelsRequested ,
}
statsCollector .Collect (stats .ID , stats )
certificates := pc .configuration .Certificates
for _ , certificate := range certificates {
if err := certificate .collectStats (statsCollector ); err != nil {
continue
}
}
pc .mu .Unlock ()
pc .api .mediaEngine .collectStats (statsCollector )
return statsCollector .Ready ()
}
func (pc *PeerConnection ) startTransports (
iceRole ICERole ,
dtlsRole DTLSRole ,
remoteUfrag , remotePwd , fingerprint , fingerprintHash string ,
) {
err := pc .iceTransport .Start (
pc .iceGatherer ,
ICEParameters {
UsernameFragment : remoteUfrag ,
Password : remotePwd ,
ICELite : false ,
},
&iceRole ,
)
if err != nil {
pc .log .Warnf ("Failed to start manager: %s" , err )
return
}
pc .dtlsTransport .internalOnCloseHandler = func () {
if pc .isClosed .get () || pc .api .settingEngine .disableCloseByDTLS {
return
}
pc .log .Info ("Closing PeerConnection from DTLS CloseNotify" )
go func () {
if pcClosErr := pc .Close (); pcClosErr != nil {
pc .log .Warnf ("Failed to close PeerConnection from DTLS CloseNotify: %s" , pcClosErr )
}
}()
}
err = pc .dtlsTransport .Start (DTLSParameters {
Role : dtlsRole ,
Fingerprints : []DTLSFingerprint {{Algorithm : fingerprintHash , Value : fingerprint }},
})
pc .updateConnectionState (pc .ICEConnectionState (), pc .dtlsTransport .State ())
if err != nil {
pc .log .Warnf ("Failed to start manager: %s" , err )
return
}
}
func (pc *PeerConnection ) startRTP (
isRenegotiation bool ,
remoteDesc *SessionDescription ,
currentTransceivers []*RTPTransceiver ,
) {
if !isRenegotiation {
pc .undeclaredMediaProcessor ()
}
pc .startRTPReceivers (remoteDesc , currentTransceivers )
if d := haveDataChannel (remoteDesc ); d != nil {
pc .startSCTP (getMaxMessageSize (d ))
}
}
func (pc *PeerConnection ) generateUnmatchedSDP (
transceivers []*RTPTransceiver ,
useIdentity bool ,
) (*sdp .SessionDescription , error ) {
desc , err := sdp .NewJSEPSessionDescription (useIdentity )
if err != nil {
return nil , err
}
desc .Attributes = append (desc .Attributes , sdp .Attribute {Key : sdp .AttrKeyMsidSemantic , Value : "WMS *" })
iceParams , err := pc .iceGatherer .GetLocalParameters ()
if err != nil {
return nil , err
}
candidates , err := pc .iceGatherer .GetLocalCandidates ()
if err != nil {
return nil , err
}
isPlanB := pc .configuration .SDPSemantics == SDPSemanticsPlanB
mediaSections := []mediaSection {}
pc .sctpTransport .lock .Lock ()
defer pc .sctpTransport .lock .Unlock ()
if isPlanB {
video := make ([]*RTPTransceiver , 0 )
audio := make ([]*RTPTransceiver , 0 )
for _ , t := range transceivers {
if t .kind == RTPCodecTypeVideo {
video = append (video , t )
} else if t .kind == RTPCodecTypeAudio {
audio = append (audio , t )
}
if sender := t .Sender (); sender != nil {
sender .setNegotiated ()
}
}
if len (video ) > 0 {
mediaSections = append (mediaSections , mediaSection {id : "video" , transceivers : video })
}
if len (audio ) > 0 {
mediaSections = append (mediaSections , mediaSection {id : "audio" , transceivers : audio })
}
if pc .sctpTransport .dataChannelsRequested != 0 {
mediaSections = append (mediaSections , mediaSection {id : "data" , data : true })
}
} else {
for _ , t := range transceivers {
if sender := t .Sender (); sender != nil {
sender .setNegotiated ()
}
mediaSections = append (mediaSections , mediaSection {id : t .Mid (), transceivers : []*RTPTransceiver {t }})
}
if pc .sctpTransport .dataChannelsRequested != 0 {
mediaSections = append (mediaSections , mediaSection {id : strconv .Itoa (len (mediaSections )), data : true })
}
}
dtlsFingerprints , err := pc .configuration .Certificates [0 ].GetFingerprints ()
if err != nil {
return nil , err
}
return populateSDP (
desc ,
isPlanB ,
dtlsFingerprints ,
pc .api .settingEngine .sdpMediaLevelFingerprints ,
pc .api .settingEngine .candidates .ICELite ,
true ,
pc .api .mediaEngine ,
connectionRoleFromDtlsRole (defaultDtlsRoleOffer ),
candidates ,
iceParams ,
mediaSections ,
pc .ICEGatheringState (),
nil ,
pc .api .settingEngine .getSCTPMaxMessageSize (),
)
}
func (pc *PeerConnection ) generateMatchedSDP (
transceivers []*RTPTransceiver ,
useIdentity , includeUnmatched bool ,
connectionRole sdp .ConnectionRole ,
) (*sdp .SessionDescription , error ) {
desc , err := sdp .NewJSEPSessionDescription (useIdentity )
if err != nil {
return nil , err
}
desc .Attributes = append (desc .Attributes , sdp .Attribute {Key : sdp .AttrKeyMsidSemantic , Value : "WMS *" })
iceParams , err := pc .iceGatherer .GetLocalParameters ()
if err != nil {
return nil , err
}
candidates , err := pc .iceGatherer .GetLocalCandidates ()
if err != nil {
return nil , err
}
var transceiver *RTPTransceiver
remoteDescription := pc .currentRemoteDescription
if pc .pendingRemoteDescription != nil {
remoteDescription = pc .pendingRemoteDescription
}
isExtmapAllowMixed := isExtMapAllowMixedSet (remoteDescription .parsed )
localTransceivers := append ([]*RTPTransceiver {}, transceivers ...)
detectedPlanB := descriptionIsPlanB (remoteDescription , pc .log )
if pc .configuration .SDPSemantics != SDPSemanticsUnifiedPlan {
detectedPlanB = descriptionPossiblyPlanB (remoteDescription )
}
mediaSections := []mediaSection {}
alreadyHaveApplicationMediaSection := false
for _ , media := range remoteDescription .parsed .MediaDescriptions {
midValue := getMidValue (media )
if midValue == "" {
return nil , errPeerConnRemoteDescriptionWithoutMidValue
}
if media .MediaName .Media == mediaSectionApplication {
mediaSections = append (mediaSections , mediaSection {id : midValue , data : true })
alreadyHaveApplicationMediaSection = true
continue
}
kind := NewRTPCodecType (media .MediaName .Media )
direction := getPeerDirection (media )
if kind == 0 || direction == RTPTransceiverDirectionUnknown {
continue
}
sdpSemantics := pc .configuration .SDPSemantics
switch {
case sdpSemantics == SDPSemanticsPlanB || sdpSemantics == SDPSemanticsUnifiedPlanWithFallback && detectedPlanB :
if !detectedPlanB {
return nil , &rtcerr .TypeError {
Err : fmt .Errorf ("%w: Expected PlanB, but RemoteDescription is UnifiedPlan" , ErrIncorrectSDPSemantics ),
}
}
mediaTransceivers := []*RTPTransceiver {}
for {
transceiver , localTransceivers = satisfyTypeAndDirection (kind , direction , localTransceivers )
if transceiver == nil {
if len (mediaTransceivers ) == 0 {
transceiver = &RTPTransceiver {kind : kind , api : pc .api , codecs : pc .api .mediaEngine .getCodecsByKind (kind )}
transceiver .setDirection (RTPTransceiverDirectionInactive )
mediaTransceivers = append (mediaTransceivers , transceiver )
}
break
}
if sender := transceiver .Sender (); sender != nil {
sender .setNegotiated ()
}
mediaTransceivers = append (mediaTransceivers , transceiver )
}
mediaSections = append (mediaSections , mediaSection {id : midValue , transceivers : mediaTransceivers })
case sdpSemantics == SDPSemanticsUnifiedPlan || sdpSemantics == SDPSemanticsUnifiedPlanWithFallback :
if detectedPlanB {
return nil , &rtcerr .TypeError {
Err : fmt .Errorf (
"%w: Expected UnifiedPlan, but RemoteDescription is PlanB" ,
ErrIncorrectSDPSemantics ,
),
}
}
transceiver , localTransceivers = findByMid (midValue , localTransceivers )
if transceiver == nil {
return nil , fmt .Errorf ("%w: %q" , errPeerConnTranscieverMidNil , midValue )
}
if sender := transceiver .Sender (); sender != nil {
sender .setNegotiated ()
}
mediaTransceivers := []*RTPTransceiver {transceiver }
extensions , _ := rtpExtensionsFromMediaDescription (media )
mediaSections = append (
mediaSections ,
mediaSection {id : midValue , transceivers : mediaTransceivers , matchExtensions : extensions , rids : getRids (media )},
)
}
}
pc .sctpTransport .lock .Lock ()
defer pc .sctpTransport .lock .Unlock ()
var bundleGroup *string
if includeUnmatched {
if !detectedPlanB {
for _ , t := range localTransceivers {
if sender := t .Sender (); sender != nil {
sender .setNegotiated ()
}
mediaSections = append (mediaSections , mediaSection {id : t .Mid (), transceivers : []*RTPTransceiver {t }})
}
}
if pc .sctpTransport .dataChannelsRequested != 0 && !alreadyHaveApplicationMediaSection {
if detectedPlanB {
mediaSections = append (mediaSections , mediaSection {id : "data" , data : true })
} else {
mediaSections = append (mediaSections , mediaSection {id : strconv .Itoa (len (mediaSections )), data : true })
}
}
} else if remoteDescription != nil {
groupValue , _ := remoteDescription .parsed .Attribute (sdp .AttrKeyGroup )
groupValue = strings .TrimLeft (groupValue , "BUNDLE" )
bundleGroup = &groupValue
}
if pc .configuration .SDPSemantics == SDPSemanticsUnifiedPlanWithFallback && detectedPlanB {
pc .log .Info ("Plan-B Offer detected; responding with Plan-B Answer" )
}
dtlsFingerprints , err := pc .configuration .Certificates [0 ].GetFingerprints ()
if err != nil {
return nil , err
}
return populateSDP (
desc ,
detectedPlanB ,
dtlsFingerprints ,
pc .api .settingEngine .sdpMediaLevelFingerprints ,
pc .api .settingEngine .candidates .ICELite ,
isExtmapAllowMixed ,
pc .api .mediaEngine ,
connectionRole ,
candidates ,
iceParams ,
mediaSections ,
pc .ICEGatheringState (),
bundleGroup ,
pc .api .settingEngine .getSCTPMaxMessageSize (),
)
}
func (pc *PeerConnection ) setGatherCompleteHandler (handler func ()) {
pc .iceGatherer .onGatheringCompleteHandler .Store (handler )
}
func (pc *PeerConnection ) SCTP () *SCTPTransport {
return pc .sctpTransport
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .