package allocation
import (
"net"
"sync"
"sync/atomic"
"time"
"github.com/pion/logging"
"github.com/pion/stun/v3"
"github.com/pion/turn/v4/internal/ipnet"
"github.com/pion/turn/v4/internal/proto"
)
// allocationResponse pairs a STUN transaction ID with the response attributes
// stored for it. It is the value kept in Allocation.responseCache.
type allocationResponse struct {
// transactionID is the transaction ID the stored attributes belong to.
transactionID [stun .TransactionIDSize ]byte
// responseAttrs are the attributes stored alongside transactionID.
responseAttrs []stun .Setter
}
// Allocation holds the state of a single TURN allocation: the client-facing
// and relay sockets, the permissions and channel bindings installed on it,
// its lifetime timer and a cached response.
type Allocation struct {
// RelayAddr is the address packetHandler reports when it logs a dropped
// datagram. Presumably the relayed transport address; it is not assigned
// in this file section — confirm with the code that creates allocations.
RelayAddr net .Addr
// Protocol is not read by any method visible here.
// NOTE(review): confirm its meaning with callers.
Protocol Protocol
// TurnSocket is the socket packetHandler writes relayed data to, addressed
// to fiveTuple.SrcAddr.
TurnSocket net .PacketConn
// RelaySocket is the socket packetHandler reads peer datagrams from.
// Close closes it.
RelaySocket net .PacketConn
// fiveTuple identifies this allocation (it is what packetHandler passes to
// Manager.DeleteAllocation) and supplies the client address (SrcAddr).
fiveTuple *FiveTuple
// permissionsLock guards permissions.
permissionsLock sync .RWMutex
// permissions maps ipnet.FingerprintAddr(peer address) to the Permission
// installed for that peer.
permissions map [string ]*Permission
// channelBindingsLock guards channelBindings.
channelBindingsLock sync .RWMutex
// channelBindings lists the channel bindings; lookups scan it linearly.
channelBindings []*ChannelBind
// lifetimeTimer is reset by Refresh and stopped by Close. NewAllocation
// does not set it, so it must be assigned before either is called.
lifetimeTimer *time .Timer
// closed is closed by Close; once closed, further Close calls return nil
// without doing anything.
closed chan interface {}
// log receives the debug, info and error messages emitted by this type.
log logging .LeveledLogger
// responseCache stores the *allocationResponse written by SetResponseCache
// and read by GetResponseCache.
responseCache atomic .Value
}
// NewAllocation builds an Allocation for the given client-facing socket and
// five-tuple, using log for diagnostics.
//
// The permission map (pre-sized for 64 entries) and the closed channel are
// initialised here. RelaySocket, RelayAddr, Protocol and the lifetime timer
// are left at their zero values and must be set by the caller.
func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger) *Allocation {
	alloc := &Allocation{
		permissions: make(map[string]*Permission, 64),
		closed:      make(chan interface{}),
	}
	alloc.TurnSocket = turnSocket
	alloc.fiveTuple = fiveTuple
	alloc.log = log

	return alloc
}
// GetPermission returns the Permission stored for addr, or nil when none is
// installed. Permissions are keyed by ipnet.FingerprintAddr(addr).
func (a *Allocation) GetPermission(addr net.Addr) *Permission {
	key := ipnet.FingerprintAddr(addr)

	a.permissionsLock.RLock()
	perm := a.permissions[key]
	a.permissionsLock.RUnlock()

	return perm
}
// AddPermission installs perms on the allocation, or refreshes the permission
// already stored for the same peer address (keyed by
// ipnet.FingerprintAddr(perms.Addr)).
//
// The lookup and the insert happen under a single write lock so that two
// concurrent calls for the same address cannot both take the insert path; if
// they did, the later insert would overwrite the earlier entry while the
// earlier entry's timer kept running. The timer calls (refresh / start) are
// made after the lock is released, exactly as before.
func (a *Allocation) AddPermission(perms *Permission) {
	fingerprint := ipnet.FingerprintAddr(perms.Addr)

	a.permissionsLock.Lock()
	existing, found := a.permissions[fingerprint]
	if !found {
		perms.allocation = a
		a.permissions[fingerprint] = perms
	}
	a.permissionsLock.Unlock()

	if found {
		// A permission for this peer already exists: only extend its lifetime.
		existing.refresh(permissionTimeout)

		return
	}

	perms.start(permissionTimeout)
}
// RemovePermission deletes the permission stored for addr, if any. It only
// removes the map entry; it does not touch the permission's timer.
func (a *Allocation) RemovePermission(addr net.Addr) {
	key := ipnet.FingerprintAddr(addr)

	a.permissionsLock.Lock()
	delete(a.permissions, key)
	a.permissionsLock.Unlock()
}
// AddChannelBind adds chanBind to the allocation with the given lifetime, or
// refreshes the existing binding when the same number/peer pair is already
// bound. In both cases a permission for the peer is added or refreshed.
//
// It returns errSameChannelDifferentPeer when the binding found by channel
// number is not the same binding as the one found by peer address, i.e. the
// number or the peer is already used by a different binding.
//
// NOTE(review): the two lookups and the insert are not done under one lock
// acquisition, so concurrent calls could both pass the check.
func (a *Allocation) AddChannelBind(chanBind *ChannelBind, lifetime time.Duration) error {
	existing := a.GetChannelByNumber(chanBind.Number)
	if a.GetChannelByAddr(chanBind.Peer) != existing {
		return errSameChannelDifferentPeer
	}

	if existing != nil {
		// Already bound to this very peer: extend the binding and its permission.
		existing.refresh(lifetime)
		a.AddPermission(NewPermission(existing.Peer, a.log))

		return nil
	}

	// New binding. The write lock is held until return, including the
	// AddPermission call below.
	a.channelBindingsLock.Lock()
	defer a.channelBindingsLock.Unlock()

	chanBind.allocation = a
	a.channelBindings = append(a.channelBindings, chanBind)
	chanBind.start(lifetime)
	a.AddPermission(NewPermission(chanBind.Peer, a.log))

	return nil
}
// RemoveChannelBind removes the channel binding with the given number and
// reports whether one was found. The list is scanned from the end, so the
// last matching entry is the one removed.
//
// The vacated tail slot is set to nil before the slice is shortened so the
// backing array does not keep the removed *ChannelBind reachable.
func (a *Allocation) RemoveChannelBind(number proto.ChannelNumber) bool {
	a.channelBindingsLock.Lock()
	defer a.channelBindingsLock.Unlock()

	for i := len(a.channelBindings) - 1; i >= 0; i-- {
		if a.channelBindings[i].Number != number {
			continue
		}

		last := len(a.channelBindings) - 1
		copy(a.channelBindings[i:], a.channelBindings[i+1:])
		a.channelBindings[last] = nil
		a.channelBindings = a.channelBindings[:last]

		return true
	}

	return false
}
// GetChannelByNumber returns the first channel binding whose Number equals
// number, or nil when there is none.
func (a *Allocation) GetChannelByNumber(number proto.ChannelNumber) *ChannelBind {
	a.channelBindingsLock.RLock()
	defer a.channelBindingsLock.RUnlock()

	for i := range a.channelBindings {
		if bind := a.channelBindings[i]; bind.Number == number {
			return bind
		}
	}

	return nil
}
// GetChannelByAddr returns the first channel binding whose Peer equals addr
// according to ipnet.AddrEqual, or nil when there is none.
func (a *Allocation) GetChannelByAddr(addr net.Addr) *ChannelBind {
	a.channelBindingsLock.RLock()
	defer a.channelBindingsLock.RUnlock()

	for i := range a.channelBindings {
		if bind := a.channelBindings[i]; ipnet.AddrEqual(bind.Peer, addr) {
			return bind
		}
	}

	return nil
}
// Refresh resets the allocation's lifetime timer to lifetime. When
// Timer.Reset reports false (the timer had already expired or been stopped),
// an error is logged; the timer is still re-armed by Reset.
func (a *Allocation) Refresh(lifetime time.Duration) {
	if a.lifetimeTimer.Reset(lifetime) {
		return
	}

	a.log.Errorf("Failed to reset allocation timer for %v", a.fiveTuple)
}
// SetResponseCache stores transactionID together with attrs as the
// allocation's cached response, replacing any previously stored value.
func (a *Allocation) SetResponseCache(transactionID [stun.TransactionIDSize]byte, attrs []stun.Setter) {
	entry := &allocationResponse{transactionID: transactionID, responseAttrs: attrs}
	a.responseCache.Store(entry)
}
// GetResponseCache returns the transaction ID and attributes last stored with
// SetResponseCache. When nothing has been stored, it returns the zero
// transaction ID and nil attributes.
func (a *Allocation) GetResponseCache() (id [stun.TransactionIDSize]byte, attrs []stun.Setter) {
	cached, ok := a.responseCache.Load().(*allocationResponse)
	if !ok || cached == nil {
		return id, attrs
	}

	return cached.transactionID, cached.responseAttrs
}
// Close shuts the allocation down: it marks it closed, stops the allocation's
// lifetime timer and the timers of all permissions and channel bindings, and
// closes the relay socket, returning that Close's error.
//
// Calling Close on an allocation that is already closed returns nil without
// doing anything.
//
// NOTE(review): the closed check and close(a.closed) are not atomic, so two
// truly concurrent first calls could both reach close — confirm callers
// serialise Close.
func (a *Allocation) Close() error {
	select {
	case <-a.closed:
		// Already closed.
		return nil
	default:
		close(a.closed)
	}

	a.lifetimeTimer.Stop()

	a.permissionsLock.RLock()
	for _, perm := range a.permissions {
		perm.lifetimeTimer.Stop()
	}
	a.permissionsLock.RUnlock()

	a.channelBindingsLock.RLock()
	for _, bind := range a.channelBindings {
		bind.lifetimeTimer.Stop()
	}
	a.channelBindingsLock.RUnlock()

	return a.RelaySocket.Close()
}
// rtpMTU is the size, in bytes, of the read buffer packetHandler allocates for
// datagrams received on the relay socket.
const rtpMTU = 1600
// packetHandler is the relay read loop. It reads datagrams from RelaySocket
// and forwards each one to the client (fiveTuple.SrcAddr) over TurnSocket:
//
//   - as ChannelData when the sender has a channel binding,
//   - as a STUN Data indication when the sender only has a permission,
//   - otherwise the datagram is dropped and an info message is logged.
//
// The loop ends only when reading from RelaySocket fails; in that case the
// allocation is removed from manager. A failure that concerns a single
// datagram (unexpected address type, message build error, write error) is
// logged and the loop moves on to the next datagram, so one bad packet cannot
// silently stop relaying for an allocation that is still registered.
func (a *Allocation) packetHandler(manager *Manager) {
	// One buffer is reused for every read; it is sliced to the received length.
	buffer := make([]byte, rtpMTU)

	for {
		n, srcAddr, err := a.RelaySocket.ReadFrom(buffer)
		if err != nil {
			manager.DeleteAllocation(a.fiveTuple)

			return
		}

		a.log.Debugf("Relay socket %s received %d bytes from %s",
			a.RelaySocket.LocalAddr(),
			n,
			srcAddr)

		// A channel binding takes precedence over a plain permission.
		if channel := a.GetChannelByAddr(srcAddr); channel != nil {
			channelData := &proto.ChannelData{
				Data:   buffer[:n],
				Number: channel.Number,
			}
			channelData.Encode()

			if _, err = a.TurnSocket.WriteTo(channelData.Raw, a.fiveTuple.SrcAddr); err != nil {
				a.log.Errorf("Failed to send ChannelData from allocation %v %v", srcAddr, err)
			}

			continue
		}

		if a.GetPermission(srcAddr) == nil {
			a.log.Infof("No Permission or Channel exists for %v on allocation %v", srcAddr, a.RelayAddr)

			continue
		}

		udpAddr, ok := srcAddr.(*net.UDPAddr)
		if !ok {
			// There is no error value at this point, so report the offending type.
			a.log.Errorf("Failed to send DataIndication from allocation %v: unexpected address type %T", srcAddr, srcAddr)

			continue
		}

		peerAddressAttr := proto.PeerAddress{IP: udpAddr.IP, Port: udpAddr.Port}
		dataAttr := proto.Data(buffer[:n])

		msg, err := stun.Build(
			stun.TransactionID,
			stun.NewType(stun.MethodData, stun.ClassIndication),
			peerAddressAttr,
			dataAttr,
		)
		if err != nil {
			a.log.Errorf("Failed to send DataIndication from allocation %v %v", srcAddr, err)

			continue
		}

		a.log.Debugf("Relaying message from %s to client at %s",
			srcAddr,
			a.fiveTuple.SrcAddr)

		if _, err = a.TurnSocket.WriteTo(msg.Raw, a.fiveTuple.SrcAddr); err != nil {
			a.log.Errorf("Failed to send DataIndication from allocation %v %v", srcAddr, err)
		}
	}
}
// The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.