package client
import (
"errors"
"fmt"
"io"
"math"
"net"
"time"
"github.com/pion/stun/v3"
"github.com/pion/turn/v4/internal/proto"
)
// Tunables for the relayed UDP connection.
const (
	// maxReadQueueSize is the capacity of the buffered channel that holds
	// inbound packets between HandleInbound and ReadFrom.
	maxReadQueueSize = 1024

	// permRefreshInterval is the period of the permission refresh timer
	// created in NewUDPConn.
	permRefreshInterval = 2 * time.Minute

	// maxRetryAttempts bounds how many times WriteTo re-runs permission
	// creation while it keeps failing with errTryAgain.
	maxRetryAttempts = 3
)
// Identifiers handed to NewPeriodicTimer for the two refresh timers that
// NewUDPConn creates.
const (
	// timerIDRefreshAlloc identifies the allocation refresh timer.
	timerIDRefreshAlloc int = iota
	// timerIDRefreshPerms identifies the permission refresh timer.
	timerIDRefreshPerms
)
// inboundData is one received packet queued on UDPConn.readCh until
// ReadFrom consumes it.
type inboundData struct {
	// data is the packet payload; HandleInbound stores its own copy here.
	data []byte
	// from is the address passed to HandleInbound; ReadFrom returns it as
	// the packet's source address.
	from net.Addr
}
// UDPConn is a packet connection whose local address is the relayed address
// of an allocation. Outbound data is sent to the server either as Send
// indications or as ChannelData; inbound data is delivered via HandleInbound.
type UDPConn struct {
	// bindingMgr tracks channel bindings by peer address and channel number.
	bindingMgr *bindingManager
	// readCh queues inbound packets for ReadFrom.
	readCh chan *inboundData
	// closeCh is closed by Close; ReadFrom selects on it to stop waiting.
	closeCh chan struct{}
	// allocation holds the client, addresses, credentials, permission map
	// and timers shared by the methods of this connection.
	allocation
}
// NewUDPConn builds a UDPConn from config, then creates and starts the
// allocation refresh timer (period: half the initial lifetime) and the
// permission refresh timer (period: permRefreshInterval).
func NewUDPConn(config *AllocationConfig) *UDPConn {
	// A timer armed with the largest possible duration stands in for
	// "no read deadline" until SetReadDeadline re-arms it.
	idleReadTimer := time.NewTimer(time.Duration(math.MaxInt64))

	c := &UDPConn{
		bindingMgr: newBindingManager(),
		readCh:     make(chan *inboundData, maxReadQueueSize),
		closeCh:    make(chan struct{}),
		allocation: allocation{
			client:      config.Client,
			relayedAddr: config.RelayedAddr,
			serverAddr:  config.ServerAddr,
			readTimer:   idleReadTimer,
			permMap:     newPermissionMap(),
			username:    config.Username,
			realm:       config.Realm,
			integrity:   config.Integrity,
			_nonce:      config.Nonce,
			_lifetime:   config.Lifetime,
			net:         config.Net,
			log:         config.Log,
		},
	}

	initialLifetime := c.lifetime()
	c.log.Debugf("Initial lifetime: %d seconds", int(initialLifetime.Seconds()))

	// Both timers report to the same callback; the timer ID distinguishes them.
	c.refreshAllocTimer = NewPeriodicTimer(timerIDRefreshAlloc, c.onRefreshTimers, initialLifetime/2)
	c.refreshPermsTimer = NewPeriodicTimer(timerIDRefreshPerms, c.onRefreshTimers, permRefreshInterval)

	if started := c.refreshAllocTimer.Start(); started {
		c.log.Debugf("Started refresh allocation timer")
	}
	if started := c.refreshPermsTimer.Start(); started {
		c.log.Debugf("Started refresh permission timer")
	}

	return c
}
// ReadFrom waits for the next inbound packet, copies its payload into p and
// returns the number of bytes copied together with the packet's source
// address.
//
// If p is too small for the whole payload, the packet is consumed and
// io.ErrShortBuffer is returned. If the read timer fires or the connection
// is closed first, a *net.OpError wrapping a timeout error or errClosed is
// returned.
func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
	var cause error

	select {
	case pkt := <-c.readCh:
		copied := copy(p, pkt.data)
		if copied < len(pkt.data) {
			return 0, nil, io.ErrShortBuffer
		}

		return copied, pkt.from, nil
	case <-c.readTimer.C:
		cause = newTimeoutError("i/o timeout")
	case <-c.closeCh:
		cause = errClosed
	}

	// Timeout and close share the same error envelope.
	local := c.LocalAddr()

	return 0, nil, &net.OpError{
		Op:   "read",
		Net:  local.Network(),
		Addr: local,
		Err:  cause,
	}
}
// createPermission issues a CreatePermission request for addr when perm is
// still idle, holding perm's mutex for the whole call so that callers using
// the same permission are serialized. On failure the entry for addr is
// removed from the permission map and the error is returned; on success the
// permission is marked as permitted. A non-idle permission is left untouched.
func (a *allocation) createPermission(perm *permission, addr net.Addr) error {
	perm.mutex.Lock()
	defer perm.mutex.Unlock()

	if perm.state() != permStateIdle {
		return nil
	}

	err := a.CreatePermissions(addr)
	if err != nil {
		a.permMap.delete(addr)

		return err
	}

	perm.setState(permStatePermitted)

	return nil
}
// WriteTo sends payload to the peer at addr through the server.
//
// addr must be a *net.UDPAddr, otherwise errUDPAddrCast is returned. The
// call first makes sure a permission exists for addr (retrying up to
// maxRetryAttempts times while creation reports errTryAgain), then looks up
// or creates a channel binding for addr:
//
//   - While the binding is idle, being requested, or failed, the payload is
//     sent as a Send indication. An idle binding additionally triggers a
//     ChannelBind request in the background.
//   - Otherwise the payload is sent as ChannelData on the bound channel, and
//     a ready binding that was last refreshed more than bindingRefreshAge ago
//     is refreshed in the background.
//
// On success it returns len(payload) on both paths, not the size of the
// encoded message written to the server.
func (c *UDPConn) WriteTo(payload []byte, addr net.Addr) (int, error) {
	// bindingRefreshAge is how old a ready binding may get before WriteTo
	// kicks off a background refresh.
	const bindingRefreshAge = 5 * time.Minute

	var err error
	if _, ok := addr.(*net.UDPAddr); !ok {
		return 0, errUDPAddrCast
	}

	// Make sure there is a permission entry for the destination.
	perm, ok := c.permMap.find(addr)
	if !ok {
		perm = &permission{}
		c.permMap.insert(addr, perm)
	}

	// createPermission serializes callers on perm's mutex; retry only while
	// the attempt asks for it (errTryAgain).
	for i := 0; i < maxRetryAttempts; i++ {
		if err = c.createPermission(perm, addr); !errors.Is(err, errTryAgain) {
			break
		}
	}
	if err != nil {
		return 0, err
	}

	// Look up (or allocate) the channel binding for this peer.
	bound, ok := c.bindingMgr.findByAddr(addr)
	if !ok {
		bound = c.bindingMgr.create(addr)
	}

	bindSt := bound.state()
	if bindSt == bindingStateIdle || bindSt == bindingStateRequest || bindSt == bindingStateFailed {
		func() {
			// Serialize callers that share this binding.
			bound.muBind.Lock()
			defer bound.muBind.Unlock()

			// The state may have changed while waiting for the lock, so
			// check again before starting a bind request.
			if bound.state() == bindingStateIdle {
				bound.setState(bindingStateRequest)
				go func() {
					err2 := c.bind(bound)
					if err2 != nil {
						c.log.Warnf("Failed to bind bind(): %s", err2)
						bound.setState(bindingStateFailed)
					} else {
						bound.setState(bindingStateReady)
					}
				}()
			}
		}()

		// The channel is not usable yet: send the data as a Send indication.
		peerAddr := addr2PeerAddress(addr)
		var msg *stun.Message
		msg, err = stun.Build(
			stun.TransactionID,
			stun.NewType(stun.MethodSend, stun.ClassIndication),
			proto.Data(payload),
			peerAddr,
			stun.Fingerprint,
		)
		if err != nil {
			return 0, err
		}

		// The indication is written without a transaction. Report the
		// payload size to the caller rather than the size of the encoded
		// STUN message, so the result matches the ChannelData path below.
		if _, err = c.client.WriteTo(msg.Raw, c.serverAddr); err != nil {
			return 0, err
		}

		return len(payload), nil
	}

	// The binding is usable; refresh it in the background if it is stale.
	func() {
		bound.muBind.Lock()
		defer bound.muBind.Unlock()

		if bound.state() == bindingStateReady && time.Since(bound.refreshedAt()) > bindingRefreshAge {
			bound.setState(bindingStateRefresh)
			go func() {
				if bindErr := c.bind(bound); bindErr != nil {
					c.log.Warnf("Failed to bind() for refresh: %s", bindErr)
					bound.setState(bindingStateFailed)
				} else {
					bound.setRefreshedAt(time.Now())
					bound.setState(bindingStateReady)
				}
			}()
		}
	}()

	// Send the payload as ChannelData on the bound channel.
	if _, err = c.sendChannelData(payload, bound.number); err != nil {
		return 0, err
	}

	return len(payload), nil
}
// Close stops both refresh timers and closes the connection. The first call
// closes closeCh (which unblocks ReadFrom), notifies the client that the
// relayed address is deallocated, and returns the result of a final
// refreshAllocation(0, true). Later calls return errAlreadyClosed.
func (c *UDPConn) Close() error {
	c.refreshAllocTimer.Stop()
	c.refreshPermsTimer.Stop()

	// A receive on closeCh only succeeds once it has been closed.
	select {
	case <-c.closeCh:
		return errAlreadyClosed
	default:
	}

	close(c.closeCh)
	c.client.OnDeallocated(c.relayedAddr)

	return c.refreshAllocation(0, true)
}
// LocalAddr returns the relayed address of the allocation, which serves as
// the local address of this connection.
func (c *UDPConn) LocalAddr() net.Addr {
	relayed := c.relayedAddr

	return relayed
}
// SetDeadline sets the read and write deadlines to t. Only the read deadline
// has an effect, because SetWriteDeadline on this connection is a no-op.
func (c *UDPConn) SetDeadline(t time.Time) error {
	if err := c.SetReadDeadline(t); err != nil {
		return err
	}

	return c.SetWriteDeadline(t)
}
// SetReadDeadline re-arms the read timer so that a ReadFrom still waiting at
// time t fails with a timeout error. Passing the "no deadline" value
// (noDeadline()) disables the timeout by arming the timer with the maximum
// duration. A deadline in the past makes the timer fire immediately.
// It always returns nil.
func (c *UDPConn) SetReadDeadline(t time.Time) error {
	d := time.Duration(math.MaxInt64)
	// Compare instants with Equal: == on time.Time also compares the
	// location, so an equal instant in another location would be treated as
	// a real deadline.
	if !t.Equal(noDeadline()) {
		d = time.Until(t)
	}

	// Stop the timer and discard a tick that was already delivered but not
	// consumed; otherwise a stale expiry could make the next ReadFrom time
	// out immediately after the deadline was moved. The drain is
	// non-blocking because a concurrent ReadFrom may already have taken it.
	if !c.readTimer.Stop() {
		select {
		case <-c.readTimer.C:
		default:
		}
	}
	c.readTimer.Reset(d)

	return nil
}
// SetWriteDeadline accepts a write deadline but ignores it; it always
// returns nil.
func (c *UDPConn) SetWriteDeadline(_ time.Time) error {
	return nil
}
// addr2PeerAddress converts a *net.UDPAddr or *net.TCPAddr into a
// proto.PeerAddress carrying the same IP and port. Any other address type
// yields the zero PeerAddress.
func addr2PeerAddress(addr net.Addr) proto.PeerAddress {
	if udp, ok := addr.(*net.UDPAddr); ok {
		return proto.PeerAddress{IP: udp.IP, Port: udp.Port}
	}
	if tcp, ok := addr.(*net.TCPAddr); ok {
		return proto.PeerAddress{IP: tcp.IP, Port: tcp.Port}
	}

	return proto.PeerAddress{}
}
// CreatePermissions sends one CreatePermission request that carries a peer
// address attribute for every entry in addrs and waits for the response.
//
// It returns nil unless building the request or performing the transaction
// fails, or the server answers with an error response. A stale-nonce error
// updates the stored nonce from the response and returns errTryAgain so the
// caller can retry; any other error response is reported with its type and,
// when it can be decoded, its error code.
func (a *allocation) CreatePermissions(addrs ...net.Addr) error {
	// Two leading attributes, one per peer, and five trailing attributes.
	setters := make([]stun.Setter, 0, len(addrs)+7)
	setters = append(setters,
		stun.TransactionID,
		stun.NewType(stun.MethodCreatePermission, stun.ClassRequest),
	)
	for i := range addrs {
		setters = append(setters, addr2PeerAddress(addrs[i]))
	}
	setters = append(setters, a.username, a.realm, a.nonce(), a.integrity, stun.Fingerprint)

	req, err := stun.Build(setters...)
	if err != nil {
		return err
	}

	result, err := a.client.PerformTransaction(req, a.serverAddr, false)
	if err != nil {
		return err
	}

	resp := result.Msg
	if resp.Type.Class != stun.ClassErrorResponse {
		return nil
	}

	var code stun.ErrorCodeAttribute
	if getErr := code.GetFrom(resp); getErr != nil {
		// No decodable error code: report the response type only.
		return fmt.Errorf("%s", resp.Type)
	}
	if code.Code == stun.CodeStaleNonce {
		a.setNonceFromMsg(resp)

		return errTryAgain
	}

	return fmt.Errorf("%s (error %s)", resp.Type, code)
}
// HandleInbound queues a copy of data, tagged with its source address from,
// for a later ReadFrom. It never blocks: when the read queue is full the
// packet is dropped and a warning is logged.
func (c *UDPConn) HandleInbound(data []byte, from net.Addr) {
	// Copy the payload so the queued packet does not alias the caller's buffer.
	pkt := &inboundData{
		data: make([]byte, len(data)),
		from: from,
	}
	copy(pkt.data, data)

	select {
	case c.readCh <- pkt:
	default:
		c.log.Warnf("Receive buffer full")
	}
}
// FindAddrByChannelNumber returns the peer address of the binding registered
// under channel number chNum. The boolean result is false when no such
// binding exists.
func (c *UDPConn) FindAddrByChannelNumber(chNum uint16) (net.Addr, bool) {
	if found, ok := c.bindingMgr.findByNumber(chNum); ok {
		return found.addr, true
	}

	return nil, false
}
// bind sends a ChannelBind request that associates bound.number with
// bound.addr and waits for the response.
//
// If the transaction itself fails, the binding is removed from the binding
// manager and the error is returned. A response that is not a ChannelBind
// success response is reported as an error; the binding is kept in that case.
func (c *UDPConn) bind(bound *binding) error {
	req, err := stun.Build(
		stun.TransactionID,
		stun.NewType(stun.MethodChannelBind, stun.ClassRequest),
		addr2PeerAddress(bound.addr),
		proto.ChannelNumber(bound.number),
		c.username,
		c.realm,
		c.nonce(),
		c.integrity,
		stun.Fingerprint,
	)
	if err != nil {
		return err
	}

	result, err := c.client.PerformTransaction(req, c.serverAddr, false)
	if err != nil {
		c.bindingMgr.deleteByAddr(bound.addr)

		return err
	}

	expected := stun.NewType(stun.MethodChannelBind, stun.ClassSuccessResponse)
	if got := result.Msg.Type; got != expected {
		return fmt.Errorf("unexpected response type %s", got)
	}

	c.log.Debugf("Channel binding successful: %s %d", bound.addr, bound.number)

	return nil
}
// sendChannelData encodes data as a ChannelData message for channel chNum
// and writes it to the server. It returns len(data) on success, or 0 and the
// write error on failure.
func (c *UDPConn) sendChannelData(data []byte, chNum uint16) (int, error) {
	frame := proto.ChannelData{
		Number: proto.ChannelNumber(chNum),
		Data:   data,
	}
	frame.Encode()

	if _, err := c.client.WriteTo(frame.Raw, c.serverAddr); err != nil {
		return 0, err
	}

	return len(data), nil
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.