package turn
import (
b64 "encoding/base64"
"fmt"
"math"
"net"
"sync"
"time"
"github.com/pion/logging"
"github.com/pion/stun/v3"
"github.com/pion/transport/v3"
"github.com/pion/transport/v3/stdnet"
"github.com/pion/turn/v4/internal/client"
"github.com/pion/turn/v4/internal/proto"
)
const (
defaultRTO = 200 * time .Millisecond
maxRtxCount = 7
maxDataBufferSize = math .MaxUint16
)
type ClientConfig struct {
STUNServerAddr string
TURNServerAddr string
Username string
Password string
Realm string
Software string
RTO time .Duration
Conn net .PacketConn
Net transport .Net
LoggerFactory logging .LoggerFactory
}
type Client struct {
conn net .PacketConn
net transport .Net
stunServerAddr net .Addr
turnServerAddr net .Addr
username stun .Username
password string
realm stun .Realm
integrity stun .MessageIntegrity
software stun .Software
trMap *client .TransactionMap
rto time .Duration
relayedConn *client .UDPConn
tcpAllocation *client .TCPAllocation
allocTryLock client .TryLock
listenTryLock client .TryLock
mutex sync .RWMutex
mutexTrMap sync .Mutex
log logging .LeveledLogger
}
func NewClient (config *ClientConfig ) (*Client , error ) {
loggerFactory := config .LoggerFactory
if loggerFactory == nil {
loggerFactory = logging .NewDefaultLoggerFactory ()
}
log := loggerFactory .NewLogger ("turnc" )
if config .Conn == nil {
return nil , errNilConn
}
rto := defaultRTO
if config .RTO > 0 {
rto = config .RTO
}
if config .Net == nil {
n , err := stdnet .NewNet ()
if err != nil {
return nil , err
}
config .Net = n
}
var stunServ , turnServ net .Addr
var err error
if len (config .STUNServerAddr ) > 0 {
stunServ , err = config .Net .ResolveUDPAddr ("udp4" , config .STUNServerAddr )
if err != nil {
return nil , err
}
log .Debugf ("Resolved STUN server %s to %s" , config .STUNServerAddr , stunServ )
}
if len (config .TURNServerAddr ) > 0 {
turnServ , err = config .Net .ResolveUDPAddr ("udp4" , config .TURNServerAddr )
if err != nil {
return nil , err
}
log .Debugf ("Resolved TURN server %s to %s" , config .TURNServerAddr , turnServ )
}
client := &Client {
conn : config .Conn ,
stunServerAddr : stunServ ,
turnServerAddr : turnServ ,
username : stun .NewUsername (config .Username ),
password : config .Password ,
realm : stun .NewRealm (config .Realm ),
software : stun .NewSoftware (config .Software ),
trMap : client .NewTransactionMap (),
net : config .Net ,
rto : rto ,
log : log ,
}
return client , nil
}
func (c *Client ) TURNServerAddr () net .Addr {
return c .turnServerAddr
}
func (c *Client ) STUNServerAddr () net .Addr {
return c .stunServerAddr
}
func (c *Client ) Username () stun .Username {
return c .username
}
func (c *Client ) Realm () stun .Realm {
return c .realm
}
func (c *Client ) WriteTo (data []byte , to net .Addr ) (int , error ) {
return c .conn .WriteTo (data , to )
}
func (c *Client ) Listen () error {
if err := c .listenTryLock .Lock (); err != nil {
return fmt .Errorf ("%w: %s" , errAlreadyListening , err .Error())
}
go func () {
buf := make ([]byte , maxDataBufferSize )
for {
n , from , err := c .conn .ReadFrom (buf )
if err != nil {
c .log .Debugf ("Failed to read: %s. Exiting loop" , err )
break
}
_, err = c .HandleInbound (buf [:n ], from )
if err != nil {
c .log .Debugf ("Failed to handle inbound message: %s. Exiting loop" , err )
break
}
}
c .listenTryLock .Unlock ()
}()
return nil
}
func (c *Client ) Close () {
c .mutexTrMap .Lock ()
defer c .mutexTrMap .Unlock ()
c .trMap .CloseAndDeleteAll ()
}
func (c *Client ) SendBindingRequestTo (to net .Addr ) (net .Addr , error ) {
attrs := []stun .Setter {stun .TransactionID , stun .BindingRequest }
if len (c .software ) > 0 {
attrs = append (attrs , c .software )
}
msg , err := stun .Build (attrs ...)
if err != nil {
return nil , err
}
trRes , err := c .PerformTransaction (msg , to , false )
if err != nil {
return nil , err
}
var reflAddr stun .XORMappedAddress
if err := reflAddr .GetFrom (trRes .Msg ); err != nil {
return nil , err
}
return &net .UDPAddr {
IP : reflAddr .IP ,
Port : reflAddr .Port ,
}, nil
}
func (c *Client ) SendBindingRequest () (net .Addr , error ) {
if c .stunServerAddr == nil {
return nil , errSTUNServerAddressNotSet
}
return c .SendBindingRequestTo (c .stunServerAddr )
}
func (c *Client ) sendAllocateRequest (protocol proto .Protocol ) (
proto .RelayedAddress ,
proto .Lifetime ,
stun .Nonce ,
error ,
) {
var relayed proto .RelayedAddress
var lifetime proto .Lifetime
var nonce stun .Nonce
msg , err := stun .Build (
stun .TransactionID ,
stun .NewType (stun .MethodAllocate , stun .ClassRequest ),
proto .RequestedTransport {Protocol : protocol },
stun .Fingerprint ,
)
if err != nil {
return relayed , lifetime , nonce , err
}
trRes , err := c .PerformTransaction (msg , c .turnServerAddr , false )
if err != nil {
return relayed , lifetime , nonce , err
}
res := trRes .Msg
if err = nonce .GetFrom (res ); err != nil {
return relayed , lifetime , nonce , err
}
if err = c .realm .GetFrom (res ); err != nil {
return relayed , lifetime , nonce , err
}
c .realm = append ([]byte (nil ), c .realm ...)
c .integrity = stun .NewLongTermIntegrity (
c .username .String (), c .realm .String (), c .password ,
)
msg , err = stun .Build (
stun .TransactionID ,
stun .NewType (stun .MethodAllocate , stun .ClassRequest ),
proto .RequestedTransport {Protocol : protocol },
&c .username ,
&c .realm ,
&nonce ,
&c .integrity ,
stun .Fingerprint ,
)
if err != nil {
return relayed , lifetime , nonce , err
}
trRes , err = c .PerformTransaction (msg , c .turnServerAddr , false )
if err != nil {
return relayed , lifetime , nonce , err
}
res = trRes .Msg
if res .Type .Class == stun .ClassErrorResponse {
var code stun .ErrorCodeAttribute
if err = code .GetFrom (res ); err == nil {
return relayed , lifetime , nonce , fmt .Errorf ("%s (error %s)" , res .Type , code )
}
return relayed , lifetime , nonce , fmt .Errorf ("%s" , res .Type )
}
if err := relayed .GetFrom (res ); err != nil {
return relayed , lifetime , nonce , err
}
if err := lifetime .GetFrom (res ); err != nil {
return relayed , lifetime , nonce , err
}
return relayed , lifetime , nonce , nil
}
func (c *Client ) Allocate () (net .PacketConn , error ) {
if err := c .allocTryLock .Lock (); err != nil {
return nil , fmt .Errorf ("%w: %s" , errOneAllocateOnly , err .Error())
}
defer c .allocTryLock .Unlock ()
relayedConn := c .relayedUDPConn ()
if relayedConn != nil {
return nil , fmt .Errorf ("%w: %s" , errAlreadyAllocated , relayedConn .LocalAddr ().String ())
}
relayed , lifetime , nonce , err := c .sendAllocateRequest (proto .ProtoUDP )
if err != nil {
return nil , err
}
relayedAddr := &net .UDPAddr {
IP : relayed .IP ,
Port : relayed .Port ,
}
relayedConn = client .NewUDPConn (&client .AllocationConfig {
Client : c ,
RelayedAddr : relayedAddr ,
ServerAddr : c .turnServerAddr ,
Realm : c .realm ,
Username : c .username ,
Integrity : c .integrity ,
Nonce : nonce ,
Lifetime : lifetime .Duration ,
Net : c .net ,
Log : c .log ,
})
c .setRelayedUDPConn (relayedConn )
return relayedConn , nil
}
func (c *Client ) AllocateTCP () (*client .TCPAllocation , error ) {
if err := c .allocTryLock .Lock (); err != nil {
return nil , fmt .Errorf ("%w: %s" , errOneAllocateOnly , err .Error())
}
defer c .allocTryLock .Unlock ()
allocation := c .getTCPAllocation ()
if allocation != nil {
return nil , fmt .Errorf ("%w: %s" , errAlreadyAllocated , allocation .Addr ())
}
relayed , lifetime , nonce , err := c .sendAllocateRequest (proto .ProtoTCP )
if err != nil {
return nil , err
}
relayedAddr := &net .TCPAddr {
IP : relayed .IP ,
Port : relayed .Port ,
}
allocation = client .NewTCPAllocation (&client .AllocationConfig {
Client : c ,
RelayedAddr : relayedAddr ,
ServerAddr : c .turnServerAddr ,
Realm : c .realm ,
Username : c .username ,
Integrity : c .integrity ,
Nonce : nonce ,
Lifetime : lifetime .Duration ,
Net : c .net ,
Log : c .log ,
})
c .setTCPAllocation (allocation )
return allocation , nil
}
func (c *Client ) CreatePermission (addrs ...net .Addr ) error {
if conn := c .relayedUDPConn (); conn != nil {
if err := conn .CreatePermissions (addrs ...); err != nil {
return err
}
}
if allocation := c .getTCPAllocation (); allocation != nil {
if err := allocation .CreatePermissions (addrs ...); err != nil {
return err
}
}
return nil
}
func (c *Client ) PerformTransaction (msg *stun .Message , to net .Addr , ignoreResult bool ) (client .TransactionResult ,
error ,
) {
trKey := b64 .StdEncoding .EncodeToString (msg .TransactionID [:])
raw := make ([]byte , len (msg .Raw ))
copy (raw , msg .Raw )
tr := client .NewTransaction (&client .TransactionConfig {
Key : trKey ,
Raw : raw ,
To : to ,
Interval : c .rto ,
IgnoreResult : ignoreResult ,
})
c .trMap .Insert (trKey , tr )
c .log .Tracef ("Start %s transaction %s to %s" , msg .Type , trKey , tr .To )
_ , err := c .conn .WriteTo (tr .Raw , to )
if err != nil {
return client .TransactionResult {}, err
}
tr .StartRtxTimer (c .onRtxTimeout )
if ignoreResult {
return client .TransactionResult {}, nil
}
res := tr .WaitForResult ()
if res .Err != nil {
return res , res .Err
}
return res , nil
}
func (c *Client ) OnDeallocated (net .Addr ) {
c .setRelayedUDPConn (nil )
c .setTCPAllocation (nil )
}
func (c *Client ) HandleInbound (data []byte , from net .Addr ) (bool , error ) {
switch {
case stun .IsMessage (data ):
return true , c .handleSTUNMessage (data , from )
case proto .IsChannelData (data ):
return true , c .handleChannelData (data )
case c .stunServerAddr != nil && from .String () == c .stunServerAddr .String ():
return true , errNonSTUNMessage
default :
c .log .Tracef ("Ignoring non-STUN/TURN packet" )
}
return false , nil
}
func (c *Client ) handleSTUNMessage (data []byte , from net .Addr ) error {
raw := make ([]byte , len (data ))
copy (raw , data )
msg := &stun .Message {Raw : raw }
if err := msg .Decode (); err != nil {
return fmt .Errorf ("%w: %s" , errFailedToDecodeSTUN , err .Error())
}
if msg .Type .Class == stun .ClassRequest {
return fmt .Errorf ("%w : %s" , errUnexpectedSTUNRequestMessage , msg .String ())
}
if msg .Type .Class == stun .ClassIndication {
switch msg .Type .Method {
case stun .MethodData :
var peerAddr proto .PeerAddress
if err := peerAddr .GetFrom (msg ); err != nil {
return err
}
from = &net .UDPAddr {
IP : peerAddr .IP ,
Port : peerAddr .Port ,
}
var data proto .Data
if err := data .GetFrom (msg ); err != nil {
return err
}
c .log .Tracef ("Data indication received from %s" , from )
relayedConn := c .relayedUDPConn ()
if relayedConn == nil {
c .log .Debug ("No relayed conn allocated" )
return nil
}
relayedConn .HandleInbound (data , from )
case stun .MethodConnectionAttempt :
var peerAddr proto .PeerAddress
if err := peerAddr .GetFrom (msg ); err != nil {
return err
}
addr := &net .TCPAddr {
IP : peerAddr .IP ,
Port : peerAddr .Port ,
}
var cid proto .ConnectionID
if err := cid .GetFrom (msg ); err != nil {
return err
}
c .log .Debugf ("Connection attempt from %s" , addr )
allocation := c .getTCPAllocation ()
if allocation == nil {
c .log .Debug ("No TCP allocation exists" )
return nil
}
allocation .HandleConnectionAttempt (addr , cid )
default :
c .log .Debug ("Received unsupported STUN method" )
}
return nil
}
trKey := b64 .StdEncoding .EncodeToString (msg .TransactionID [:])
c .mutexTrMap .Lock ()
tr , ok := c .trMap .Find (trKey )
if !ok {
c .mutexTrMap .Unlock ()
c .log .Debugf ("No transaction for %s" , msg )
return nil
}
tr .StopRtxTimer ()
c .trMap .Delete (trKey )
c .mutexTrMap .Unlock ()
if !tr .WriteResult (client .TransactionResult {
Msg : msg ,
From : from ,
Retries : tr .Retries (),
}) {
c .log .Debugf ("No listener for %s" , msg )
}
return nil
}
func (c *Client ) handleChannelData (data []byte ) error {
chData := &proto .ChannelData {
Raw : make ([]byte , len (data )),
}
copy (chData .Raw , data )
if err := chData .Decode (); err != nil {
return err
}
relayedConn := c .relayedUDPConn ()
if relayedConn == nil {
c .log .Debug ("No relayed conn allocated" )
return nil
}
addr , ok := relayedConn .FindAddrByChannelNumber (uint16 (chData .Number ))
if !ok {
return fmt .Errorf ("%w: %d" , errChannelBindNotFound , int (chData .Number ))
}
c .log .Tracef ("Channel data received from %s (ch=%d)" , addr .String (), int (chData .Number ))
relayedConn .HandleInbound (chData .Data , addr )
return nil
}
func (c *Client ) onRtxTimeout (trKey string , nRtx int ) {
c .mutexTrMap .Lock ()
defer c .mutexTrMap .Unlock ()
tr , ok := c .trMap .Find (trKey )
if !ok {
return
}
if nRtx == maxRtxCount {
c .trMap .Delete (trKey )
if !tr .WriteResult (client .TransactionResult {
Err : fmt .Errorf ("%w %s" , errAllRetransmissionsFailed , trKey ),
}) {
c .log .Debug ("No listener for transaction" )
}
return
}
c .log .Tracef ("Retransmitting transaction %s to %s (nRtx=%d)" ,
trKey , tr .To , nRtx )
_ , err := c .conn .WriteTo (tr .Raw , tr .To )
if err != nil {
c .trMap .Delete (trKey )
if !tr .WriteResult (client .TransactionResult {
Err : fmt .Errorf ("%w %s" , errFailedToRetransmitTransaction , trKey ),
}) {
c .log .Debug ("No listener for transaction" )
}
return
}
tr .StartRtxTimer (c .onRtxTimeout )
}
func (c *Client ) setRelayedUDPConn (conn *client .UDPConn ) {
c .mutex .Lock ()
defer c .mutex .Unlock ()
c .relayedConn = conn
}
func (c *Client ) relayedUDPConn () *client .UDPConn {
c .mutex .RLock ()
defer c .mutex .RUnlock ()
return c .relayedConn
}
func (c *Client ) setTCPAllocation (alloc *client .TCPAllocation ) {
c .mutex .Lock ()
defer c .mutex .Unlock ()
c .tcpAllocation = alloc
}
func (c *Client ) getTCPAllocation () *client .TCPAllocation {
c .mutex .RLock ()
defer c .mutex .RUnlock ()
return c .tcpAllocation
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .