package sctp
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"math"
"net"
"sync"
"sync/atomic"
"time"
"github.com/pion/logging"
"github.com/pion/randutil"
"github.com/pion/transport/v3/deadline"
)
// defaultSCTPSrcDstPort is the port number that sendInit assigns to both the
// source and the destination port of the association.
const defaultSCTPSrcDstPort = 5000
// globalMathRandomGenerator is the package-wide random source; createAssociation
// draws the initial TSN and the local verification tag from it.
var globalMathRandomGenerator = randutil .NewMathRandomGenerator ()
// Sentinel errors produced by the association. Compare against them with
// errors.Is; several are wrapped with additional context (for example
// ErrShutdownNonEstablished and ErrHandleInitState are wrapped via %w).
var (
ErrChunk = errors .New ("abort chunk, with following errors" )
// ErrShutdownNonEstablished is returned by Shutdown when the association is not established.
ErrShutdownNonEstablished = errors .New ("shutdown called in non-established state" )
// ErrAssociationClosedBeforeConn is returned by Server/Client when the read loop
// terminates before the handshake completes.
ErrAssociationClosedBeforeConn = errors .New ("association closed before connecting" )
ErrAssociationClosed = errors .New ("association closed" )
// ErrSilentlyDiscard is the default value of Association.silentError.
ErrSilentlyDiscard = errors .New ("silently discard" )
// ErrInitNotStoredToSend is returned by sendInit when no INIT chunk is stored.
ErrInitNotStoredToSend = errors .New ("the init not stored to send" )
// ErrCookieEchoNotStoredToSend is returned by sendCookieEcho when no COOKIE ECHO chunk is stored.
ErrCookieEchoNotStoredToSend = errors .New ("cookieEcho not stored to send" )
// The next four errors are returned by checkPacket when validating an inbound packet.
ErrSCTPPacketSourcePortZero = errors .New ("sctp packet must not have a source port of 0" )
ErrSCTPPacketDestinationPortZero = errors .New ("sctp packet must not have a destination port of 0" )
ErrInitChunkBundled = errors .New ("init chunk must not be bundled with any other chunk" )
ErrInitChunkVerifyTagNotZero = errors .New (
"init chunk expects a verification tag of 0 on the packet when out-of-the-blue" ,
)
// ErrHandleInitState is returned (wrapped) by handleInit for states in which INIT is not handled.
ErrHandleInitState = errors .New ("todo: handle Init when in state" )
// ErrInitAckNoCookie is returned by handleInitAck when the INIT ACK carries no state cookie.
ErrInitAckNoCookie = errors .New ("no cookie in InitAck" )
ErrInflightQueueTSNPop = errors .New ("unable to be popped from inflight queue TSN" )
ErrTSNRequestNotExist = errors .New ("requested non-existent TSN" )
ErrResetPacketInStateNotExist = errors .New ("sending reset packet in non-established state" )
// NOTE: the identifier is misspelled ("Paramter"); it is exported, so the name
// is kept as-is for backward compatibility.
ErrParamterType = errors .New ("unexpected parameter type" )
ErrPayloadDataStateNotExist = errors .New ("sending payload data in non-established state" )
ErrChunkTypeUnhandled = errors .New ("unhandled chunk type" )
ErrHandshakeInitAck = errors .New ("handshake failed (INIT ACK)" )
ErrHandshakeCookieEcho = errors .New ("handshake failed (COOKIE ECHO)" )
ErrTooManyReconfigRequests = errors .New ("too many outstanding reconfig requests" )
)
// Size-related defaults (all in bytes).
const (
// receiveMTU is the size of the buffer readLoop reads each inbound packet into.
receiveMTU uint32 = 8192
// initialMTU is the MTU used when Config.MTU is zero.
initialMTU uint32 = 1228
// initialRecvBufSize is the receive buffer size used when Config.MaxReceiveBufferSize is zero.
initialRecvBufSize uint32 = 1024 * 1024
// commonHeaderSize and dataChunkHeaderSize are subtracted from the MTU to
// derive maxPayloadSize, and are used to size fast-retransmit bundles.
commonHeaderSize uint32 = 12
dataChunkHeaderSize uint32 = 16
// defaultMaxMessageSize is the maximum message size used when Config.MaxMessageSize is zero.
defaultMaxMessageSize uint32 = 65536
)
// Association states. The numeric values follow declaration order (iota), so
// the order of these lines must not change; getAssociationStateString maps
// each value to a display name.
const (
closed uint32 = iota
cookieWait
cookieEchoed
established
shutdownAckSent
shutdownPending
shutdownReceived
shutdownSent
)
// Retransmission timer identifiers, passed as the first argument to
// newRTXTimer in createAssociation.
const (
timerT1Init int = iota
timerT1Cookie
timerT2Shutdown
timerT3RTX
timerReconfig
)
// Acknowledgement modes stored in Association.ackMode.
const (
ackModeNormal int = iota
ackModeNoDelay
ackModeAlwaysDelay
)
// Acknowledgement states stored in Association.ackState; a SACK is emitted by
// gatherOutboundSackPackets when the state is ackStateImmediate.
const (
ackStateIdle int = iota
ackStateImmediate
ackStateDelay
)
// Miscellaneous tuning constants.
const (
// acceptChSize is the buffer capacity of Association.acceptCh.
acceptChSize = 16
// avgChunkSize, minTSNOffset and maxTSNOffset parameterize getMaxTSNOffset:
// the offset is (4 * receive buffer size) / avgChunkSize, clamped to
// [minTSNOffset, maxTSNOffset].
avgChunkSize = 500
minTSNOffset = 2000
maxTSNOffset = 40000
// maxReconfigRequests is presumably the cap on outstanding reconfig
// requests (see ErrTooManyReconfigRequests) — its use is not visible here.
maxReconfigRequests = 1000
)
// getAssociationStateString returns the human-readable name of an association
// state value, or "Invalid association state N" for an unknown value.
func getAssociationStateString(assoc uint32) string {
	var label string

	switch assoc {
	case closed:
		label = "Closed"
	case cookieWait:
		label = "CookieWait"
	case cookieEchoed:
		label = "CookieEchoed"
	case established:
		label = "Established"
	case shutdownPending:
		label = "ShutdownPending"
	case shutdownSent:
		label = "ShutdownSent"
	case shutdownReceived:
		label = "ShutdownReceived"
	case shutdownAckSent:
		label = "ShutdownAckSent"
	}

	// Every known state has a non-empty label, so an empty one means "unknown".
	if label == "" {
		return fmt.Sprintf("Invalid association state %d", assoc)
	}

	return label
}
// Association holds the state of one SCTP association running on top of a
// net.Conn. It is created by createAssociation and driven by two goroutines
// (readLoop and writeLoop) started from init.
type Association struct {
// Byte counters; updated with atomic.AddUint64 in readLoop/writeLoop and
// read through BytesReceived/BytesSent.
bytesReceived uint64
bytesSent uint64
// lock guards the association's mutable state (taken e.g. in init,
// gatherOutbound, Shutdown and Abort).
lock sync .RWMutex
// netConn is the underlying transport packets are read from / written to.
netConn net .Conn
// Verification tags: the peer's (placed on outbound packets) and our own
// (advertised as initiateTag in INIT / INIT ACK).
peerVerificationTag uint32
myVerificationTag uint32
// state is one of the association state constants; accessed atomically
// via getState/setState.
state uint32
// TSN bookkeeping; all initialized from the same random value in createAssociation.
initialTSN uint32
myNextTSN uint32
minTSN2MeasureRTT uint32
// Pending-action flags consumed (and cleared) by the gatherOutbound* helpers.
willSendForwardTSN bool
willRetransmitFast bool
willRetransmitReconfig bool
willSendShutdown bool
willSendShutdownAck bool
willSendShutdownComplete bool
willSendAbort bool
// willSendAbortCause is the optional error cause attached to the ABORT chunk.
willSendAbortCause errorCause
// Reconfiguration (stream reset) state: next request sequence number,
// outstanding RECONFIG chunks keyed by request sequence number, and
// reset requests awaiting processing.
myNextRSN uint32
reconfigs map [uint32 ]*chunkReconfig
reconfigRequests map [uint32 ]*paramOutgoingResetRequest
// Ports placed on outbound packets.
sourcePort uint16
destinationPort uint16
// Stream limits; lowered to the peer's advertised values during the handshake.
myMaxNumInboundStreams uint16
myMaxNumOutboundStreams uint16
// myCookie is the state cookie sent in INIT ACK and compared against COOKIE ECHO.
myCookie *paramStateCookie
// Queues for received TSNs, in-flight data, data pending transmission, and control packets.
payloadQueue *receivePayloadQueue
inflightQueue *payloadQueue
pendingQueue *pendingQueue
controlQueue *controlQueue
// mtu is read atomically via MTU(); maxPayloadSize is mtu minus the
// common and DATA chunk header sizes.
mtu uint32
maxPayloadSize uint32
// srtt stores a float64 (see SRTT).
srtt atomic .Value
cumulativeTSNAckPoint uint32
advancedPeerTSNAckPoint uint32
// useForwardTSN is set when the peer lists Forward TSN in its supported extensions.
useForwardTSN bool
// sendZeroChecksum / recvZeroChecksum control whether checksums may be
// omitted on outbound packets and must be verified on inbound packets.
sendZeroChecksum bool
recvZeroChecksum bool
maxReceiveBufferSize uint32
maxMessageSize uint32
// Congestion control state. cwnd and rwnd are accessed atomically via
// CWND/setCWND and RWND/setRWND.
cwnd uint32
rwnd uint32
ssthresh uint32
partialBytesAcked uint32
inFastRecovery bool
fastRecoverExitPoint uint32
// Tunables copied from Config: minCwnd is the floor applied by setCWND,
// fastRtxWnd widens the fast-retransmit window beyond one MTU.
minCwnd uint32
fastRtxWnd uint32
cwndCAStep uint32
// Retransmission timeout manager and timers.
rtoMgr *rtoManager
t1Init *rtxTimer
t1Cookie *rtxTimer
t2Shutdown *rtxTimer
t3RTX *rtxTimer
tReconfig *rtxTimer
ackTimer *ackTimer
// Handshake chunks kept for (re)transmission until acknowledged.
storedInit *chunkInit
storedCookieEcho *chunkCookieEcho
// streams maps stream identifier to Stream.
streams map [uint16 ]*Stream
// acceptCh is closed by readLoop on exit.
acceptCh chan *Stream
// readLoopCloseCh is closed when readLoop has finished tearing down.
readLoopCloseCh chan struct {}
// awakeWriteLoopCh (capacity 1) wakes writeLoop; closeWriteLoopCh stops it
// and is closed at most once through closeWriteLoopOnce.
awakeWriteLoopCh chan struct {}
closeWriteLoopCh chan struct {}
// handshakeCompletedCh delivers the handshake result to Server/Client.
handshakeCompletedCh chan error
closeWriteLoopOnce sync .Once
silentError error
// ackState / ackMode hold the ackState* / ackMode* constants.
ackState int
ackMode int
stats *associationStats
delayedAckTriggered bool
immediateAckTriggered bool
// Blocking-write support (Config.BlockWrite).
blockWrite bool
writePending bool
writeNotify chan struct {}
// name prefixes every log line; defaults to the association's pointer value.
name string
log logging .LeveledLogger
}
// Config collects the arguments to createAssociation construction into
// a single structure.
type Config struct {
// Name prefixes log lines; when empty, the association's pointer value is used.
Name string
// NetConn is the transport the association reads from and writes to.
NetConn net .Conn
// MaxReceiveBufferSize defaults to initialRecvBufSize when zero.
MaxReceiveBufferSize uint32
// MaxMessageSize defaults to defaultMaxMessageSize when zero.
MaxMessageSize uint32
// EnableZeroChecksum makes the association advertise (and accept) zero checksums.
EnableZeroChecksum bool
// LoggerFactory must be non-nil: createAssociation calls NewLogger on it.
LoggerFactory logging .LoggerFactory
// BlockWrite is copied to Association.blockWrite (see isBlockWrite).
BlockWrite bool
// MTU defaults to initialMTU when zero.
MTU uint32
// RTOMax is passed to the RTO manager and to every retransmission timer.
RTOMax float64
// MinCwnd is the lower bound enforced by setCWND.
MinCwnd uint32
// FastRtxWnd, when larger than the MTU, is used as the fast-retransmit window.
FastRtxWnd uint32
// CwndCAStep is copied to Association.cwndCAStep; its use is not visible in this part of the file.
CwndCAStep uint32
}
// Server creates an association in the passive role and blocks until the
// handshake finishes. It returns the handshake error, if any, or
// ErrAssociationClosedBeforeConn when the read loop stops first.
func Server(config Config) (*Association, error) {
	assoc := createAssociation(config)
	assoc.init(false)

	select {
	case <-assoc.readLoopCloseCh:
		return nil, ErrAssociationClosedBeforeConn
	case handshakeErr := <-assoc.handshakeCompletedCh:
		if handshakeErr == nil {
			return assoc, nil
		}

		return nil, handshakeErr
	}
}
// Client creates an association in the active role (it sends INIT) and blocks
// until the handshake finishes. It never times out on its own: the context
// used is context.Background().
func Client(config Config) (*Association, error) {
	ctx := context.Background()

	return createClientWithContext(ctx, config)
}
// createClientWithContext creates a client-side association and waits for one
// of three outcomes:
//   - ctx is done: the association is closed and ctx.Err() is returned;
//   - the handshake completes: the association (or the handshake error) is returned;
//   - the read loop stops first: ErrAssociationClosedBeforeConn is returned.
func createClientWithContext(ctx context.Context, config Config) (*Association, error) {
	assoc := createAssociation(config)
	assoc.init(true)

	select {
	case <-ctx.Done():
		assoc.log.Errorf("[%s] client handshake canceled: state=%s", assoc.name, getAssociationStateString(assoc.getState()))
		// The caller receives ctx.Err(); a failure to close the transport is
		// reported in the log instead of being silently dropped.
		if err := assoc.Close(); err != nil {
			assoc.log.Warnf("[%s] failed to close association: %v", assoc.name, err)
		}

		return nil, ctx.Err()
	case err := <-assoc.handshakeCompletedCh:
		if err != nil {
			return nil, err
		}

		return assoc, nil
	case <-assoc.readLoopCloseCh:
		return nil, ErrAssociationClosedBeforeConn
	}
}
// createAssociation builds an Association from config without starting any
// goroutine. Zero values of MaxReceiveBufferSize, MaxMessageSize and MTU are
// replaced by the package defaults. The initial TSN and the verification tag
// are drawn from globalMathRandomGenerator.
func createAssociation(config Config) *Association {
	recvBufSize := config.MaxReceiveBufferSize
	if recvBufSize == 0 {
		recvBufSize = initialRecvBufSize
	}

	messageSize := config.MaxMessageSize
	if messageSize == 0 {
		messageSize = defaultMaxMessageSize
	}

	linkMTU := config.MTU
	if linkMTU == 0 {
		linkMTU = initialMTU
	}

	// One random value seeds every TSN/RSN counter; the ack points start
	// one below it.
	startTSN := globalMathRandomGenerator.Uint32()
	prevTSN := startTSN - 1

	assoc := &Association{
		netConn: config.NetConn,
		maxReceiveBufferSize: recvBufSize,
		maxMessageSize: messageSize,
		minCwnd: config.MinCwnd,
		fastRtxWnd: config.FastRtxWnd,
		cwndCAStep: config.CwndCAStep,
		myMaxNumOutboundStreams: math.MaxUint16,
		myMaxNumInboundStreams: math.MaxUint16,
		payloadQueue: newReceivePayloadQueue(getMaxTSNOffset(recvBufSize)),
		inflightQueue: newPayloadQueue(),
		pendingQueue: newPendingQueue(),
		controlQueue: newControlQueue(),
		mtu: linkMTU,
		maxPayloadSize: linkMTU - (commonHeaderSize + dataChunkHeaderSize),
		myVerificationTag: globalMathRandomGenerator.Uint32(),
		initialTSN: startTSN,
		myNextTSN: startTSN,
		myNextRSN: startTSN,
		minTSN2MeasureRTT: startTSN,
		state: closed,
		rtoMgr: newRTOManager(config.RTOMax),
		streams: map[uint16]*Stream{},
		reconfigs: map[uint32]*chunkReconfig{},
		reconfigRequests: map[uint32]*paramOutgoingResetRequest{},
		acceptCh: make(chan *Stream, acceptChSize),
		readLoopCloseCh: make(chan struct{}),
		awakeWriteLoopCh: make(chan struct{}, 1),
		closeWriteLoopCh: make(chan struct{}),
		handshakeCompletedCh: make(chan error),
		cumulativeTSNAckPoint: prevTSN,
		advancedPeerTSNAckPoint: prevTSN,
		recvZeroChecksum: config.EnableZeroChecksum,
		silentError: ErrSilentlyDiscard,
		stats: &associationStats{},
		log: config.LoggerFactory.NewLogger("sctp"),
		name: config.Name,
		blockWrite: config.BlockWrite,
		writeNotify: make(chan struct{}, 1),
	}

	// Fall back to the pointer value so log lines remain distinguishable.
	if assoc.name == "" {
		assoc.name = fmt.Sprintf("%p", assoc)
	}

	// Initial congestion window: min(4*MTU, max(2*MTU, 4380)).
	currentMTU := assoc.MTU()
	assoc.setCWND(min32(4*currentMTU, max32(2*currentMTU, 4380)))
	assoc.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)",
		assoc.name, assoc.CWND(), assoc.ssthresh, assoc.inflightQueue.getNumBytes())

	assoc.srtt.Store(float64(0))

	rtoMax := config.RTOMax
	assoc.t1Init = newRTXTimer(timerT1Init, assoc, maxInitRetrans, rtoMax)
	assoc.t1Cookie = newRTXTimer(timerT1Cookie, assoc, maxInitRetrans, rtoMax)
	assoc.t2Shutdown = newRTXTimer(timerT2Shutdown, assoc, noMaxRetrans, rtoMax)
	assoc.t3RTX = newRTXTimer(timerT3RTX, assoc, noMaxRetrans, rtoMax)
	assoc.tReconfig = newRTXTimer(timerReconfig, assoc, noMaxRetrans, rtoMax)
	assoc.ackTimer = newAckTimer(assoc)

	return assoc
}
func (a *Association ) init (isClient bool ) {
a .lock .Lock ()
defer a .lock .Unlock ()
go a .readLoop ()
go a .writeLoop ()
if isClient {
init := &chunkInit {}
init .initialTSN = a .myNextTSN
init .numOutboundStreams = a .myMaxNumOutboundStreams
init .numInboundStreams = a .myMaxNumInboundStreams
init .initiateTag = a .myVerificationTag
init .advertisedReceiverWindowCredit = a .maxReceiveBufferSize
setSupportedExtensions (&init .chunkInitCommon )
if a .recvZeroChecksum {
init .params = append (init .params , ¶mZeroChecksumAcceptable {edmid : dtlsErrorDetectionMethod })
}
a .storedInit = init
err := a .sendInit ()
if err != nil {
a .log .Errorf ("[%s] failed to send init: %s" , a .name , err .Error())
}
a .setState (cookieWait )
a .t1Init .start (a .rtoMgr .getRTO ())
}
}
// sendInit queues the stored INIT chunk on the control queue and wakes the
// write loop. It sets both association ports to defaultSCTPSrcDstPort and
// uses a verification tag of 0. Returns ErrInitNotStoredToSend when no INIT
// chunk is stored.
func (a *Association) sendInit() error {
	a.log.Debugf("[%s] sending INIT", a.name)

	if a.storedInit == nil {
		return ErrInitNotStoredToSend
	}

	a.sourcePort = defaultSCTPSrcDstPort
	a.destinationPort = defaultSCTPSrcDstPort

	initPacket := &packet{
		verificationTag: 0,
		sourcePort:      a.sourcePort,
		destinationPort: a.destinationPort,
		chunks:          []chunk{a.storedInit},
	}

	a.controlQueue.push(initPacket)
	a.awakeWriteLoop()

	return nil
}
// sendCookieEcho queues the stored COOKIE ECHO chunk on the control queue,
// addressed with the peer's verification tag, and wakes the write loop.
// Returns ErrCookieEchoNotStoredToSend when no chunk is stored.
func (a *Association) sendCookieEcho() error {
	if a.storedCookieEcho == nil {
		return ErrCookieEchoNotStoredToSend
	}

	a.log.Debugf("[%s] sending COOKIE-ECHO", a.name)

	echoPacket := &packet{
		verificationTag: a.peerVerificationTag,
		sourcePort:      a.sourcePort,
		destinationPort: a.destinationPort,
		chunks:          []chunk{a.storedCookieEcho},
	}

	a.controlQueue.push(echoPacket)
	a.awakeWriteLoop()

	return nil
}
// Shutdown starts a graceful shutdown and blocks until the write loop has
// been told to stop or ctx is done (in which case ctx.Err() is returned).
// It fails with a wrapped ErrShutdownNonEstablished unless the association
// is currently established.
func (a *Association) Shutdown(ctx context.Context) error {
	a.log.Debugf("[%s] closing association..", a.name)

	if current := a.getState(); current != established {
		return fmt.Errorf("%w: shutdown %s", ErrShutdownNonEstablished, a.name)
	}

	a.setState(shutdownPending)

	a.lock.Lock()
	// With nothing in flight the SHUTDOWN chunk can be sent right away.
	nothingInflight := a.inflightQueue.size() == 0
	if nothingInflight {
		a.willSendShutdown = true
		a.awakeWriteLoop()
		a.setState(shutdownSent)
	}
	a.lock.Unlock()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-a.closeWriteLoopCh:
		return nil
	}
}
// Close tears the association down immediately, waits for the read loop to
// finish, logs the collected statistics and returns the error reported by
// the internal close (the transport's Close error).
func (a *Association) Close() error {
	a.log.Debugf("[%s] closing association..", a.name)

	closeErr := a.close()

	// Block until readLoop has completed its teardown.
	<-a.readLoopCloseCh

	name, stats := a.name, a.stats
	a.log.Debugf("[%s] association closed", name)
	a.log.Debugf("[%s] stats nPackets (in) : %d", name, stats.getNumPacketsReceived())
	a.log.Debugf("[%s] stats nPackets (out) : %d", name, stats.getNumPacketsSent())
	a.log.Debugf("[%s] stats nDATAs (in) : %d", name, stats.getNumDATAs())
	a.log.Debugf("[%s] stats nSACKs (in) : %d", name, stats.getNumSACKsReceived())
	a.log.Debugf("[%s] stats nSACKs (out) : %d", name, stats.getNumSACKsSent())
	a.log.Debugf("[%s] stats nT3Timeouts : %d", name, stats.getNumT3Timeouts())
	a.log.Debugf("[%s] stats nAckTimeouts: %d", name, stats.getNumAckTimeouts())
	a.log.Debugf("[%s] stats nFastRetrans: %d", name, stats.getNumFastRetrans())

	return closeErr
}
// close marks the association closed, closes the transport, stops all timers
// and signals the write loop to stop (at most once). It returns the
// transport's Close error.
func (a *Association) close() error {
	a.log.Debugf("[%s] closing association..", a.name)

	a.setState(closed)

	connErr := a.netConn.Close()

	a.closeAllTimers()

	stopWriteLoop := func() { close(a.closeWriteLoopCh) }
	a.closeWriteLoopOnce.Do(stopWriteLoop)

	return connErr
}
// Abort schedules an ABORT chunk carrying reason as a user-initiated abort
// cause, wakes the write loop and blocks until the read loop has finished.
func (a *Association) Abort(reason string) {
	a.log.Debugf("[%s] aborting association: %s", a.name, reason)

	cause := &errorCauseUserInitiatedAbort{
		upperLayerAbortReason: []byte(reason),
	}

	a.lock.Lock()
	a.willSendAbort = true
	a.willSendAbortCause = cause
	a.lock.Unlock()

	a.awakeWriteLoop()

	// Wait for readLoop to complete its teardown.
	<-a.readLoopCloseCh
}
// closeAllTimers closes the five retransmission timers, in declaration
// order, followed by the ack timer.
func (a *Association) closeAllTimers() {
	rtxTimers := []*rtxTimer{a.t1Init, a.t1Cookie, a.t2Shutdown, a.t3RTX, a.tReconfig}
	for _, timer := range rtxTimers {
		timer.close()
	}

	a.ackTimer.close()
}
// readLoop reads packets from netConn and feeds them to handleInbound until
// either the read or the handling returns an error. On exit the deferred
// teardown stops the write loop, marks the association closed, unregisters
// every stream with the terminating error, and closes acceptCh and
// readLoopCloseCh. It runs in its own goroutine (started by init).
func (a *Association ) readLoop () {
// closeErr is the error that ended the loop; it is handed to every stream.
var closeErr error
defer func () {
// Stop writeLoop as well; the Once guards against a double close of the
// channel (close() performs the same step).
a .closeWriteLoopOnce .Do (func () { close (a .closeWriteLoopCh ) })
a .lock .Lock ()
a .setState (closed )
for _ , s := range a .streams {
a .unregisterStream (s , closeErr )
}
a .lock .Unlock ()
close (a .acceptCh )
// Closing readLoopCloseCh unblocks Close, Abort, Server and Client.
close (a .readLoopCloseCh )
a .log .Debugf ("[%s] association closed" , a .name )
a .log .Debugf ("[%s] stats nDATAs (in) : %d" , a .name , a .stats .getNumDATAs ())
a .log .Debugf ("[%s] stats nSACKs (in) : %d" , a .name , a .stats .getNumSACKsReceived ())
a .log .Debugf ("[%s] stats nT3Timeouts : %d" , a .name , a .stats .getNumT3Timeouts ())
a .log .Debugf ("[%s] stats nAckTimeouts: %d" , a .name , a .stats .getNumAckTimeouts ())
a .log .Debugf ("[%s] stats nFastRetrans: %d" , a .name , a .stats .getNumFastRetrans ())
}()
a .log .Debugf ("[%s] readLoop entered" , a .name )
// One reusable read buffer of receiveMTU bytes.
buffer := make ([]byte , receiveMTU )
for {
n , err := a .netConn .Read (buffer )
if err != nil {
closeErr = err
break
}
// Copy the bytes read into a fresh slice sized to the packet, so the
// data handed to handleInbound is not overwritten by the next Read.
inbound := make ([]byte , n )
copy (inbound , buffer [:n ])
atomic .AddUint64 (&a .bytesReceived , uint64 (n ))
if err = a .handleInbound (inbound ); err != nil {
closeErr = err
break
}
}
a .log .Debugf ("[%s] readLoop exited %s" , a .name , closeErr )
}
// writeLoop repeatedly gathers outbound packets and writes them to netConn,
// then sleeps until it is woken (awakeWriteLoopCh) or told to stop
// (closeWriteLoopCh). It runs in its own goroutine (started by init).
func (a *Association ) writeLoop () {
a .log .Debugf ("[%s] writeLoop entered" , a .name )
defer a .log .Debugf ("[%s] writeLoop exited" , a .name )
loop :
for {
// ok == false means the gathered packets are the last ones to send
// (after an ABORT or SHUTDOWN COMPLETE).
rawPackets , ok := a .gatherOutbound ()
for _ , raw := range rawPackets {
_ , err := a .netConn .Write (raw )
if err != nil {
// io.EOF is not reported as a warning; any write error ends the loop.
if !errors .Is (err , io .EOF ) {
a .log .Warnf ("[%s] failed to write packets on netConn: %v" , a .name , err )
}
a .log .Debugf ("[%s] writeLoop ended" , a .name )
break loop
}
atomic .AddUint64 (&a .bytesSent , uint64 (len (raw )))
a .stats .incPacketsSent ()
}
if !ok {
// close() already sets the state to closed and stops the timers, so
// this path returns without running the statements after the loop.
if err := a .close (); err != nil {
a .log .Warnf ("[%s] failed to close association: %v" , a .name , err )
}
return
}
select {
case <- a .awakeWriteLoopCh :
case <- a .closeWriteLoopCh :
break loop
}
}
// Reached via "break loop": write failure or close signal.
a .setState (closed )
a .closeAllTimers ()
}
// awakeWriteLoop signals the write loop without blocking. If a wake-up is
// already pending in the channel, the new signal is dropped.
func (a *Association) awakeWriteLoop() {
	wake := struct{}{}

	select {
	case a.awakeWriteLoopCh <- wake:
	default:
		// A wake-up is already queued; nothing more to do.
	}
}
// isBlockWrite reports whether blocking writes were enabled via Config.BlockWrite.
func (a *Association ) isBlockWrite () bool {
return a .blockWrite
}
// notifyBlockWritable clears writePending and posts a non-blocking
// notification on writeNotify; the notification is dropped when one is
// already pending.
// NOTE(review): writePending is written without taking a.lock here —
// presumably the caller holds it; confirm.
func (a *Association) notifyBlockWritable() {
	a.writePending = false

	ready := struct{}{}

	select {
	case a.writeNotify <- ready:
	default:
		// A notification is already queued.
	}
}
// unregisterStream removes s from the association's stream table, records err
// as the stream's read error and wakes every reader waiting on the stream.
// The stream's own lock is held for the duration of the call.
func (a *Association) unregisterStream(s *Stream, err error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	streamID := s.streamIdentifier
	delete(a.streams, streamID)

	s.readErr = err
	s.readNotifier.Broadcast()
}
// chunkMandatoryChecksum reports whether cc contains an INIT or COOKIE ECHO
// chunk; marshalPacket always computes a checksum for such packets.
func chunkMandatoryChecksum(cc []chunk) bool {
	for _, candidate := range cc {
		if _, isInit := candidate.(*chunkInit); isInit {
			return true
		}

		if _, isCookieEcho := candidate.(*chunkCookieEcho); isCookieEcho {
			return true
		}
	}

	return false
}
// marshalPacket serializes p. The boolean passed to p.marshal is true unless
// zero-checksum sending is enabled and the packet carries no chunk for which
// a checksum is mandatory (INIT / COOKIE ECHO).
func (a *Association) marshalPacket(p *packet) ([]byte, error) {
	withChecksum := !a.sendZeroChecksum || chunkMandatoryChecksum(p.chunks)

	return p.marshal(withChecksum)
}
// unmarshalPacket parses raw into a packet. The boolean passed to
// packet.unmarshal is the negation of recvZeroChecksum. On failure the
// returned packet is nil.
func (a *Association) unmarshalPacket(raw []byte) (*packet, error) {
	parsed := new(packet)
	checksumFlag := !a.recvZeroChecksum

	err := parsed.unmarshal(checksumFlag, raw)
	if err != nil {
		return nil, err
	}

	return parsed, nil
}
// handleInbound parses and validates one raw packet and dispatches each of
// its chunks. Packets that fail to parse or validate are logged and dropped
// (nil is returned); only an error from handleChunk is propagated, in which
// case handleChunksEnd is not called.
func (a *Association) handleInbound(raw []byte) error {
	pkt, parseErr := a.unmarshalPacket(raw)
	if parseErr != nil {
		a.log.Warnf("[%s] unable to parse SCTP packet %s", a.name, parseErr)

		return nil
	}

	if checkErr := checkPacket(pkt); checkErr != nil {
		a.log.Warnf("[%s] failed validating packet %s", a.name, checkErr)

		return nil
	}

	a.handleChunksStart()

	for _, current := range pkt.chunks {
		chunkErr := a.handleChunk(pkt, current)
		if chunkErr != nil {
			return chunkErr
		}
	}

	a.handleChunksEnd()

	return nil
}
// gatherDataPacketsToRetransmit appends the serialized form of every DATA
// packet due for retransmission to rawPackets and returns the extended
// slice. Packets that fail to serialize are logged and skipped.
func (a *Association) gatherDataPacketsToRetransmit(rawPackets [][]byte) [][]byte {
	for _, pkt := range a.getDataPacketsToRetransmit() {
		if raw, err := a.marshalPacket(pkt); err == nil {
			rawPackets = append(rawPackets, raw)
		} else {
			a.log.Warnf("[%s] failed to serialize a DATA packet to be retransmitted", a.name)
		}
	}

	return rawPackets
}
func (a *Association ) gatherOutboundDataAndReconfigPackets (rawPackets [][]byte ) [][]byte {
chunks , sisToReset := a .popPendingDataChunksToSend ()
if len (chunks ) > 0 {
a .log .Tracef ("[%s] T3-rtx timer start (pt1)" , a .name )
a .t3RTX .start (a .rtoMgr .getRTO ())
for _ , p := range a .bundleDataChunksIntoPackets (chunks ) {
raw , err := a .marshalPacket (p )
if err != nil {
a .log .Warnf ("[%s] failed to serialize a DATA packet" , a .name )
continue
}
rawPackets = append (rawPackets , raw )
}
}
if len (sisToReset ) > 0 || a .willRetransmitReconfig {
if a .willRetransmitReconfig {
a .willRetransmitReconfig = false
a .log .Debugf ("[%s] retransmit %d RECONFIG chunk(s)" , a .name , len (a .reconfigs ))
for _ , c := range a .reconfigs {
p := a .createPacket ([]chunk {c })
raw , err := a .marshalPacket (p )
if err != nil {
a .log .Warnf ("[%s] failed to serialize a RECONFIG packet to be retransmitted" , a .name )
} else {
rawPackets = append (rawPackets , raw )
}
}
}
if len (sisToReset ) > 0 {
rsn := a .generateNextRSN ()
tsn := a .myNextTSN - 1
c := &chunkReconfig {
paramA : ¶mOutgoingResetRequest {
reconfigRequestSequenceNumber : rsn ,
senderLastTSN : tsn ,
streamIdentifiers : sisToReset ,
},
}
a .reconfigs [rsn ] = c
a .log .Debugf ("[%s] sending RECONFIG: rsn=%d tsn=%d streams=%v" ,
a .name , rsn , a .myNextTSN -1 , sisToReset )
p := a .createPacket ([]chunk {c })
raw , err := a .marshalPacket (p )
if err != nil {
a .log .Warnf ("[%s] failed to serialize a RECONFIG packet to be transmitted" , a .name )
} else {
rawPackets = append (rawPackets , raw )
}
}
if len (a .reconfigs ) > 0 {
a .tReconfig .start (a .rtoMgr .getRTO ())
}
}
return rawPackets
}
// gatherOutboundFastRetransmissionPackets appends fast-retransmission packets
// to rawPackets when a fast retransmit is pending (the flag is cleared).
// It walks the in-flight queue upward from cumulativeTSNAckPoint+1 and picks
// chunks that were sent once, are neither acked nor abandoned, and have a miss
// indicator of at least 3, until the selection would exceed the window:
// the larger of the MTU and fastRtxWnd.
func (a *Association) gatherOutboundFastRetransmissionPackets(rawPackets [][]byte) [][]byte {
	if !a.willRetransmitFast {
		return rawPackets
	}
	a.willRetransmitFast = false

	window := a.MTU()
	if a.fastRtxWnd > window {
		window = a.fastRtxWnd
	}

	// The common header is counted once against the window.
	usedBytes := commonHeaderSize
	selected := []*chunkPayloadData{}

	for offset := uint32(1); ; offset++ {
		candidate, found := a.inflightQueue.get(a.cumulativeTSNAckPoint + offset)
		if !found {
			break
		}

		// Skip chunks that need no fast retransmission.
		if candidate.acked || candidate.abandoned() || candidate.nSent > 1 || candidate.missIndicator < 3 {
			continue
		}

		chunkBytes := dataChunkHeaderSize + uint32(len(candidate.userData))
		if usedBytes+chunkBytes > window {
			break
		}
		usedBytes += chunkBytes

		a.stats.incFastRetrans()
		candidate.nSent++
		a.checkPartialReliabilityStatus(candidate)
		selected = append(selected, candidate)
		a.log.Tracef("[%s] fast-retransmit: tsn=%d sent=%d htna=%d",
			a.name, candidate.tsn, candidate.nSent, a.fastRecoverExitPoint)
	}

	if len(selected) == 0 {
		return rawPackets
	}

	for _, pkt := range a.bundleDataChunksIntoPackets(selected) {
		raw, err := a.marshalPacket(pkt)
		if err != nil {
			a.log.Warnf("[%s] failed to serialize a DATA packet to be fast-retransmitted", a.name)

			continue
		}

		rawPackets = append(rawPackets, raw)
	}

	return rawPackets
}
// gatherOutboundSackPackets appends a SACK packet to rawPackets when the ack
// state is ackStateImmediate, resetting the state to ackStateIdle. A SACK
// that fails to serialize is logged and dropped.
func (a *Association) gatherOutboundSackPackets(rawPackets [][]byte) [][]byte {
	if a.ackState != ackStateImmediate {
		return rawPackets
	}
	a.ackState = ackStateIdle

	sack := a.createSelectiveAckChunk()
	a.stats.incSACKsSent()
	a.log.Debugf("[%s] sending SACK: %s", a.name, sack)

	raw, err := a.marshalPacket(a.createPacket([]chunk{sack}))
	if err != nil {
		a.log.Warnf("[%s] failed to serialize a SACK packet", a.name)

		return rawPackets
	}

	return append(rawPackets, raw)
}
// gatherOutboundForwardTSNPackets appends a FORWARD TSN packet to rawPackets
// when one is pending (the flag is cleared) and advancedPeerTSNAckPoint is
// ahead of cumulativeTSNAckPoint in serial-number arithmetic.
func (a *Association) gatherOutboundForwardTSNPackets(rawPackets [][]byte) [][]byte {
	if !a.willSendForwardTSN {
		return rawPackets
	}
	a.willSendForwardTSN = false

	if !sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
		return rawPackets
	}

	forwardTSN := a.createForwardTSN()

	raw, err := a.marshalPacket(a.createPacket([]chunk{forwardTSN}))
	if err != nil {
		a.log.Warnf("[%s] failed to serialize a Forward TSN packet", a.name)

		return rawPackets
	}

	return append(rawPackets, raw)
}
// gatherOutboundShutdownPackets appends at most one shutdown-sequence packet
// to rawPackets, checking the pending flags in priority order: SHUTDOWN,
// SHUTDOWN ACK, SHUTDOWN COMPLETE. The chosen flag is cleared. SHUTDOWN and
// SHUTDOWN ACK start the T2-shutdown timer. The boolean result is false only
// after a SHUTDOWN COMPLETE was serialized successfully, which tells the
// write loop to close the association.
func (a *Association) gatherOutboundShutdownPackets(rawPackets [][]byte) ([][]byte, bool) {
	keepRunning := true

	if a.willSendShutdown {
		a.willSendShutdown = false

		shutdownChunk := &chunkShutdown{
			cumulativeTSNAck: a.cumulativeTSNAckPoint,
		}

		if raw, err := a.marshalPacket(a.createPacket([]chunk{shutdownChunk})); err == nil {
			a.t2Shutdown.start(a.rtoMgr.getRTO())
			rawPackets = append(rawPackets, raw)
		} else {
			a.log.Warnf("[%s] failed to serialize a Shutdown packet", a.name)
		}
	} else if a.willSendShutdownAck {
		a.willSendShutdownAck = false

		ackChunk := &chunkShutdownAck{}

		if raw, err := a.marshalPacket(a.createPacket([]chunk{ackChunk})); err == nil {
			a.t2Shutdown.start(a.rtoMgr.getRTO())
			rawPackets = append(rawPackets, raw)
		} else {
			a.log.Warnf("[%s] failed to serialize a ShutdownAck packet", a.name)
		}
	} else if a.willSendShutdownComplete {
		a.willSendShutdownComplete = false

		completeChunk := &chunkShutdownComplete{}

		if raw, err := a.marshalPacket(a.createPacket([]chunk{completeChunk})); err == nil {
			rawPackets = append(rawPackets, raw)
			keepRunning = false
		} else {
			a.log.Warnf("[%s] failed to serialize a ShutdownComplete packet", a.name)
		}
	}

	return rawPackets, keepRunning
}
// gatherAbortPacket consumes the pending abort request (flag and cause are
// cleared) and returns the serialized ABORT packet. The cause, when present,
// is attached as the chunk's only error cause.
func (a *Association) gatherAbortPacket() ([]byte, error) {
	pendingCause := a.willSendAbortCause

	a.willSendAbort = false
	a.willSendAbortCause = nil

	abortChunk := &chunkAbort{}
	if pendingCause != nil {
		abortChunk.errorCauses = []errorCause{pendingCause}
	}

	return a.marshalPacket(a.createPacket([]chunk{abortChunk}))
}
// gatherOutbound collects, under the association lock, every packet that
// should be written now. A pending ABORT takes precedence over everything
// else. Otherwise queued control packets come first, followed by whatever
// the current state allows. The boolean result is false when the write loop
// should close the association after writing the returned packets.
func (a *Association) gatherOutbound() ([][]byte, bool) {
	a.lock.Lock()
	defer a.lock.Unlock()

	if a.willSendAbort {
		abortRaw, err := a.gatherAbortPacket()
		if err != nil {
			a.log.Warnf("[%s] failed to serialize an abort packet", a.name)

			return nil, false
		}

		return [][]byte{abortRaw}, false
	}

	outbound := make([][]byte, 0)

	if a.controlQueue.size() > 0 {
		for _, ctrl := range a.controlQueue.popAll() {
			raw, err := a.marshalPacket(ctrl)
			if err != nil {
				a.log.Warnf("[%s] failed to serialize a control packet", a.name)

				continue
			}

			outbound = append(outbound, raw)
		}
	}

	keepRunning := true

	switch a.getState() {
	case established:
		outbound = a.gatherDataPacketsToRetransmit(outbound)
		outbound = a.gatherOutboundDataAndReconfigPackets(outbound)
		outbound = a.gatherOutboundFastRetransmissionPackets(outbound)
		outbound = a.gatherOutboundSackPackets(outbound)
		outbound = a.gatherOutboundForwardTSNPackets(outbound)
	case shutdownPending, shutdownSent, shutdownReceived:
		// No new DATA is sent while shutting down; only retransmissions.
		outbound = a.gatherDataPacketsToRetransmit(outbound)
		outbound = a.gatherOutboundFastRetransmissionPackets(outbound)
		outbound = a.gatherOutboundSackPackets(outbound)
		outbound, keepRunning = a.gatherOutboundShutdownPackets(outbound)
	case shutdownAckSent:
		outbound, keepRunning = a.gatherOutboundShutdownPackets(outbound)
	}

	return outbound, keepRunning
}
// checkPacket validates an inbound packet: both ports must be non-zero, and
// an INIT chunk must be the only chunk in the packet and travel with a
// verification tag of 0. It returns the matching sentinel error or nil.
func checkPacket(pkt *packet) error {
	switch {
	case pkt.sourcePort == 0:
		return ErrSCTPPacketSourcePortZero
	case pkt.destinationPort == 0:
		return ErrSCTPPacketDestinationPortZero
	}

	for _, current := range pkt.chunks {
		if _, isInit := current.(*chunkInit); !isInit {
			continue
		}

		if len(pkt.chunks) != 1 {
			return ErrInitChunkBundled
		}

		if pkt.verificationTag != 0 {
			return ErrInitChunkVerifyTagNotZero
		}
	}

	return nil
}
// min16 returns the smaller of two uint16 values.
func min16(a, b uint16) uint16 {
	if b <= a {
		return b
	}

	return a
}
// max32 returns the larger of two uint32 values.
func max32(a, b uint32) uint32 {
	if b >= a {
		return b
	}

	return a
}
// min32 returns the smaller of two uint32 values.
func min32(a, b uint32) uint32 {
	if b <= a {
		return b
	}

	return a
}
// peerLastTSN returns the cumulative TSN tracked by the receive payload queue.
func (a *Association ) peerLastTSN () uint32 {
return a .payloadQueue .getcumulativeTSN ()
}
// setState atomically replaces the association state and logs the
// transition when the state actually changed.
func (a *Association) setState(newState uint32) {
	previous := atomic.SwapUint32(&a.state, newState)
	if previous == newState {
		return
	}

	a.log.Debugf("[%s] state change: '%s' => '%s'",
		a.name,
		getAssociationStateString(previous),
		getAssociationStateString(newState))
}
// getState atomically loads the current association state.
func (a *Association ) getState () uint32 {
return atomic .LoadUint32 (&a .state )
}
// BytesSent returns the number of bytes written to the transport so far
// (the counter is incremented by writeLoop after each successful write).
func (a *Association ) BytesSent () uint64 {
return atomic .LoadUint64 (&a .bytesSent )
}
// BytesReceived returns the number of bytes read from the transport so far
// (the counter is incremented by readLoop for every packet read).
func (a *Association ) BytesReceived () uint64 {
return atomic .LoadUint64 (&a .bytesReceived )
}
// MTU atomically loads the association's MTU.
func (a *Association ) MTU () uint32 {
return atomic .LoadUint32 (&a .mtu )
}
// CWND atomically loads the congestion window.
func (a *Association ) CWND () uint32 {
return atomic .LoadUint32 (&a .cwnd )
}
// setCWND atomically stores the congestion window, raising values below
// minCwnd up to minCwnd.
func (a *Association) setCWND(cwnd uint32) {
	effective := cwnd
	if a.minCwnd > effective {
		effective = a.minCwnd
	}

	atomic.StoreUint32(&a.cwnd, effective)
}
// RWND atomically loads the receiver window value stored by setRWND (the
// handshake handlers store the peer's advertised receiver window credit).
func (a *Association ) RWND () uint32 {
return atomic .LoadUint32 (&a .rwnd )
}
// setRWND atomically stores the receiver window value.
func (a *Association ) setRWND (rwnd uint32 ) {
atomic .StoreUint32 (&a .rwnd , rwnd )
}
// SRTT returns the value held in srtt as a float64. The unchecked type
// assertion panics if srtt holds anything else; createAssociation stores
// float64(0) before the association is used.
func (a *Association ) SRTT () float64 {
return a .srtt .Load ().(float64 )
}
// getMaxTSNOffset derives the TSN offset limit passed to the receive payload
// queue from the receive buffer size: (4 * maxReceiveBufferSize) /
// avgChunkSize, clamped to [minTSNOffset, maxTSNOffset].
//
// The product is computed in 64 bits. In uint32 arithmetic it wraps for
// buffers of 1 GiB and above, which made a very large buffer yield the
// minimum offset instead of the maximum.
func getMaxTSNOffset(maxReceiveBufferSize uint32) uint32 {
	offset := uint64(maxReceiveBufferSize) * 4 / avgChunkSize

	if offset < minTSNOffset {
		return minTSNOffset
	}

	if offset > maxTSNOffset {
		return maxTSNOffset
	}

	// offset is within [minTSNOffset, maxTSNOffset], so it fits in uint32.
	return uint32(offset)
}
func setSupportedExtensions(init *chunkInitCommon ) {
init .params = append (init .params , ¶mSupportedExtensions {
ChunkTypes : []chunkType {ctReconfig , ctForwardTSN },
})
}
func (a *Association ) handleInit (pkt *packet , initChunk *chunkInit ) ([]*packet , error ) {
state := a .getState ()
a .log .Debugf ("[%s] chunkInit received in state '%s'" , a .name , getAssociationStateString (state ))
if state != closed && state != cookieWait && state != cookieEchoed {
return nil , fmt .Errorf ("%w: %s" , ErrHandleInitState , getAssociationStateString (state ))
}
a .myMaxNumInboundStreams = min16 (initChunk .numInboundStreams , a .myMaxNumInboundStreams )
a .myMaxNumOutboundStreams = min16 (initChunk .numOutboundStreams , a .myMaxNumOutboundStreams )
a .peerVerificationTag = initChunk .initiateTag
a .sourcePort = pkt .destinationPort
a .destinationPort = pkt .sourcePort
a .payloadQueue .init (initChunk .initialTSN - 1 )
a .setRWND (initChunk .advertisedReceiverWindowCredit )
a .log .Debugf ("[%s] initial rwnd=%d" , a .name , a .RWND ())
for _ , param := range initChunk .params {
switch v := param .(type ) {
case *paramSupportedExtensions :
for _ , t := range v .ChunkTypes {
if t == ctForwardTSN {
a .log .Debugf ("[%s] use ForwardTSN (on init)" , a .name )
a .useForwardTSN = true
}
}
case *paramZeroChecksumAcceptable :
a .sendZeroChecksum = v .edmid == dtlsErrorDetectionMethod
}
}
if !a .useForwardTSN {
a .log .Warnf ("[%s] not using ForwardTSN (on init)" , a .name )
}
outbound := &packet {}
outbound .verificationTag = a .peerVerificationTag
outbound .sourcePort = a .sourcePort
outbound .destinationPort = a .destinationPort
initAck := &chunkInitAck {}
a .log .Debug ("sending INIT ACK" )
initAck .initialTSN = a .myNextTSN
initAck .numOutboundStreams = a .myMaxNumOutboundStreams
initAck .numInboundStreams = a .myMaxNumInboundStreams
initAck .initiateTag = a .myVerificationTag
initAck .advertisedReceiverWindowCredit = a .maxReceiveBufferSize
if a .myCookie == nil {
var err error
if a .myCookie , err = newRandomStateCookie (); err != nil {
return nil , err
}
}
initAck .params = []param {a .myCookie }
if a .recvZeroChecksum {
initAck .params = append (initAck .params , ¶mZeroChecksumAcceptable {edmid : dtlsErrorDetectionMethod })
}
a .log .Debugf ("[%s] sendZeroChecksum=%t (on init)" , a .name , a .sendZeroChecksum )
setSupportedExtensions (&initAck .chunkInitCommon )
outbound .chunks = []chunk {initAck }
return pack (outbound ), nil
}
// handleInitAck processes a received INIT ACK chunk. It is ignored unless the
// association is in cookieWait, and ignored (with a warning) when the
// packet's ports do not mirror the association's. Otherwise the peer's
// parameters are adopted, the T1-init timer is stopped, the state cookie is
// echoed back in a COOKIE ECHO, the T1-cookie timer is started and the state
// becomes cookieEchoed. Returns ErrInitAckNoCookie when the INIT ACK carries
// no state cookie.
func (a *Association) handleInitAck(pkt *packet, initChunkAck *chunkInitAck) error {
	state := a.getState()
	a.log.Debugf("[%s] chunkInitAck received in state '%s'", a.name, getAssociationStateString(state))

	if state != cookieWait {
		return nil
	}

	// Validate the ports before touching any association state, so that a
	// mismatching packet cannot overwrite the peer verification tag, the
	// stream limits or the receive queue.
	if a.sourcePort != pkt.destinationPort ||
		a.destinationPort != pkt.sourcePort {
		a.log.Warnf("[%s] handleInitAck: port mismatch", a.name)

		return nil
	}

	a.myMaxNumInboundStreams = min16(initChunkAck.numInboundStreams, a.myMaxNumInboundStreams)
	a.myMaxNumOutboundStreams = min16(initChunkAck.numOutboundStreams, a.myMaxNumOutboundStreams)
	a.peerVerificationTag = initChunkAck.initiateTag
	a.payloadQueue.init(initChunkAck.initialTSN - 1)

	a.setRWND(initChunkAck.advertisedReceiverWindowCredit)
	a.log.Debugf("[%s] initial rwnd=%d", a.name, a.RWND())

	// The slow-start threshold starts at the peer's receiver window.
	a.ssthresh = a.RWND()
	a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)",
		a.name, a.CWND(), a.ssthresh, a.inflightQueue.getNumBytes())

	a.t1Init.stop()
	a.storedInit = nil

	var cookieParam *paramStateCookie
	for _, param := range initChunkAck.params {
		switch v := param.(type) {
		case *paramStateCookie:
			cookieParam = v
		case *paramSupportedExtensions:
			for _, t := range v.ChunkTypes {
				if t == ctForwardTSN {
					a.log.Debugf("[%s] use ForwardTSN (on initAck)", a.name)
					a.useForwardTSN = true
				}
			}
		case *paramZeroChecksumAcceptable:
			a.sendZeroChecksum = v.edmid == dtlsErrorDetectionMethod
		}
	}

	a.log.Debugf("[%s] sendZeroChecksum=%t (on initAck)", a.name, a.sendZeroChecksum)

	if !a.useForwardTSN {
		a.log.Warnf("[%s] not using ForwardTSN (on initAck)", a.name)
	}

	if cookieParam == nil {
		return ErrInitAckNoCookie
	}

	a.storedCookieEcho = &chunkCookieEcho{}
	a.storedCookieEcho.cookie = cookieParam.cookie

	if err := a.sendCookieEcho(); err != nil {
		a.log.Errorf("[%s] failed to send cookie echo: %s", a.name, err.Error())
	}

	a.t1Cookie.start(a.rtoMgr.getRTO())
	a.setState(cookieEchoed)

	return nil
}
func (a *Association ) handleHeartbeat (c *chunkHeartbeat ) []*packet {
a .log .Tracef ("[%s] chunkHeartbeat" , a .name )
hbi , ok := c .params [0 ].(*paramHeartbeatInfo )
if !ok {
a .log .Warnf ("[%s] failed to handle Heartbeat, no ParamHeartbeatInfo" , a .name )
}
return pack (&packet {
verificationTag : a .peerVerificationTag ,
sourcePort : a .sourcePort ,
destinationPort : a .destinationPort ,
chunks : []chunk {&chunkHeartbeatAck {
params : []param {
¶mHeartbeatInfo {
heartbeatInformation : hbi .heartbeatInformation ,
},
},
}},
})
}
// handleCookieEcho processes a received COOKIE ECHO chunk and returns the
// COOKIE ACK reply, or nil when the chunk is ignored. The chunk is ignored
// when no cookie was issued yet, when the state is not one of closed,
// cookieWait, cookieEchoed or established, or when the echoed cookie differs
// from the issued one. In the three pre-established states the handshake
// timers are stopped, the state becomes established and the handshake is
// completed; no reply is sent if completeHandshake reports false.
func (a *Association) handleCookieEcho(cookieEcho *chunkCookieEcho) []*packet {
	state := a.getState()
	a.log.Debugf("[%s] COOKIE-ECHO received in state '%s'", a.name, getAssociationStateString(state))

	if a.myCookie == nil {
		a.log.Debugf("[%s] COOKIE-ECHO received before initialization", a.name)

		return nil
	}

	handshaking := state == closed || state == cookieWait || state == cookieEchoed
	if !handshaking && state != established {
		return nil
	}

	if !bytes.Equal(a.myCookie.cookie, cookieEcho.cookie) {
		return nil
	}

	if handshaking {
		a.t1Init.stop()
		a.storedInit = nil

		a.t1Cookie.stop()
		a.storedCookieEcho = nil

		a.setState(established)

		if !a.completeHandshake(nil) {
			return nil
		}
	}

	// In the established state the COOKIE ACK is simply sent again.
	cookieAck := &packet{
		verificationTag: a.peerVerificationTag,
		sourcePort:      a.sourcePort,
		destinationPort: a.destinationPort,
		chunks:          []chunk{&chunkCookieAck{}},
	}

	return pack(cookieAck)
}
// handleCookieAck processes a received COOKIE ACK chunk. It has an effect
// only in the cookieEchoed state: the T1-cookie timer is stopped, the stored
// COOKIE ECHO is dropped, the state becomes established and the handshake is
// completed.
func (a *Association) handleCookieAck() {
	state := a.getState()
	a.log.Debugf("[%s] COOKIE-ACK received in state '%s'", a.name, getAssociationStateString(state))

	if state == cookieEchoed {
		a.t1Cookie.stop()
		a.storedCookieEcho = nil

		a.setState(established)
		a.completeHandshake(nil)
	}
}
// handleData processes an incoming DATA chunk.
//
// When the payload queue can take the TSN, the chunk is handed to its stream
// (created on demand) if there is receive window credit left, or if the TSN
// is older than the last TSN received (it fills a gap). Otherwise the chunk is
// dropped. If the stream cannot be obtained the chunk is discarded and nil is
// returned; in every other case the acknowledgement state is updated and any
// resulting reply packets are returned.
func (a *Association) handleData(chunkPayload *chunkPayloadData) []*packet {
	a.log.Tracef("[%s] DATA: tsn=%d immediateSack=%v len=%d",
		a.name, chunkPayload.tsn, chunkPayload.immediateSack, len(chunkPayload.userData))
	a.stats.incDATAs()

	if a.payloadQueue.canPush(chunkPayload.tsn) {
		stream := a.getOrCreateStream(chunkPayload.streamIdentifier, true, PayloadTypeUnknown)
		if stream == nil {
			a.log.Debugf("[%s] discard %d", a.name, chunkPayload.streamSequenceNumber)

			return nil
		}

		accepted := a.getMyReceiverWindowCredit() > 0
		if !accepted {
			// Receive buffer is full: only a chunk that fills a hole below
			// the last received TSN is still let through.
			lastTSN, known := a.payloadQueue.getLastTSNReceived()
			if known && sna32LT(chunkPayload.tsn, lastTSN) {
				a.log.Debugf(
					"[%s] receive buffer full, but accepted as this is a missing chunk with tsn=%d ssn=%d",
					a.name, chunkPayload.tsn, chunkPayload.streamSequenceNumber,
				)
				accepted = true
			} else {
				a.log.Debugf(
					"[%s] receive buffer full. dropping DATA with tsn=%d ssn=%d",
					a.name, chunkPayload.tsn, chunkPayload.streamSequenceNumber,
				)
			}
		}

		if accepted {
			a.payloadQueue.push(chunkPayload.tsn)
			stream.handleData(chunkPayload)
		}
	}

	return a.handlePeerLastTSNAndAcknowledgement(chunkPayload.immediateSack)
}
// handlePeerLastTSNAndAcknowledgement pops entries from the payload queue for
// as long as pop(false) succeeds, re-evaluating outstanding stream reset
// requests after each pop, and then decides whether the next SACK is delayed
// or immediate.
//
// It returns the reconfig response packets produced while popping.
func (a *Association) handlePeerLastTSNAndAcknowledgement(sackImmediately bool) []*packet {
	var responses []*packet

	for a.payloadQueue.pop(false) {
		for _, pendingReset := range a.reconfigRequests {
			if rsp := a.resetStreamsIfAny(pendingReset); rsp != nil {
				a.log.Debugf("[%s] RESET RESPONSE: %+v", a.name, rsp)
				responses = append(responses, rsp)
			}
		}
	}

	// Anything still left in the payload queue is treated as packet loss.
	gapsPresent := a.payloadQueue.size() > 0
	if gapsPresent {
		a.log.Tracef("[%s] packetloss: %s", a.name, a.payloadQueue.getGapAckBlocksString())
	}

	delayAllowed := a.ackMode == ackModeAlwaysDelay ||
		(a.ackMode == ackModeNormal &&
			a.ackState != ackStateImmediate &&
			!sackImmediately &&
			!gapsPresent)

	// A delayed ack is only armed from the idle state; in every other case
	// the ack goes out immediately.
	if delayAllowed && a.ackState == ackStateIdle {
		a.delayedAckTriggered = true
	} else {
		a.immediateAckTriggered = true
	}

	return responses
}
// getMyReceiverWindowCredit returns how many bytes of the receive buffer are
// still free: maxReceiveBufferSize minus the bytes held in all streams'
// reassembly queues, floored at zero.
func (a *Association) getMyReceiverWindowCredit() uint32 {
	var queued uint32
	for _, stream := range a.streams {
		queued += uint32(stream.getNumBytesInReassemblyQueue())
	}

	if queued < a.maxReceiveBufferSize {
		return a.maxReceiveBufferSize - queued
	}

	return 0
}
// OpenStream returns the stream with the given identifier, creating it if it
// does not exist yet, and applies defaultPayloadType to it.
//
// It returns ErrAssociationClosed when the association is closed or in any of
// the shutdown states.
func (a *Association) OpenStream(
	streamIdentifier uint16,
	defaultPayloadType PayloadProtocolIdentifier,
) (*Stream, error) {
	a.lock.Lock()
	defer a.lock.Unlock()

	switch a.getState() {
	case shutdownAckSent, shutdownPending, shutdownReceived, shutdownSent, closed:
		return nil, ErrAssociationClosed
	default:
		return a.getOrCreateStream(streamIdentifier, false, defaultPayloadType), nil
	}
}
// AcceptStream blocks until a stream arrives on the accept channel and returns
// it. It returns io.EOF once the accept channel has been closed.
func (a *Association) AcceptStream() (*Stream, error) {
	if stream, open := <-a.acceptCh; open {
		return stream, nil
	}

	return nil, io.EOF
}
// createStream builds a new Stream for streamIdentifier and registers it in
// a.streams.
//
// When accept is true the stream is first offered to the accept channel
// without blocking; if the channel cannot take it, the stream is dropped, not
// registered, and nil is returned.
func (a *Association) createStream(streamIdentifier uint16, accept bool) *Stream {
	created := &Stream{
		association:      a,
		streamIdentifier: streamIdentifier,
		reassemblyQueue:  newReassemblyQueue(streamIdentifier),
		log:              a.log,
		name:             fmt.Sprintf("%d:%s", streamIdentifier, a.name),
		writeDeadline:    deadline.New(),
	}
	created.readNotifier = sync.NewCond(&created.lock)

	if !accept {
		a.streams[streamIdentifier] = created

		return created
	}

	select {
	case a.acceptCh <- created:
		a.streams[streamIdentifier] = created
		a.log.Debugf("[%s] accepted a new stream (streamIdentifier: %d)",
			a.name, streamIdentifier)

		return created
	default:
		a.log.Debugf("[%s] dropped a new stream (acceptCh size: %d)",
			a.name, len(a.acceptCh))

		return nil
	}
}
// getOrCreateStream looks up the stream for streamIdentifier and creates it
// via createStream when it is missing. The default payload type is applied to
// whichever stream is returned. The result is nil when creation failed.
func (a *Association) getOrCreateStream(
	streamIdentifier uint16,
	accept bool,
	defaultPayloadType PayloadProtocolIdentifier,
) *Stream {
	stream, found := a.streams[streamIdentifier]
	if !found {
		stream = a.createStream(streamIdentifier, accept)
		if stream == nil {
			return nil
		}
	}

	stream.SetDefaultPayloadType(defaultPayloadType)

	return stream
}
// processSelectiveAck applies a received SACK to the inflight queue.
//
// It first pops every TSN from cumulativeTSNAckPoint+1 up to and including the
// SACK's cumulative TSN ack, then marks every TSN named by a gap ack block as
// acked. It returns the number of newly acknowledged bytes keyed by stream
// identifier, and htna: it starts at the SACK's cumulative TSN ack and is
// raised to the highest TSN that was newly acked through a gap ack block.
//
// An error wrapping ErrInflightQueueTSNPop or ErrTSNRequestNotExist is
// returned when the SACK refers to a TSN that is not in the inflight queue.
func (a *Association ) processSelectiveAck (selectiveAckChunk *chunkSelectiveAck ) (map [uint16 ]int , uint32 , error ) {
bytesAckedPerStream := map [uint16 ]int {}
// Phase 1: everything covered by the cumulative TSN ack leaves the inflight queue.
for i := a .cumulativeTSNAckPoint + 1 ; sna32LTE (i , selectiveAckChunk .cumulativeTSNAck ); i ++ {
chunkPayload , ok := a .inflightQueue .pop (i )
if !ok {
return nil , 0 , fmt .Errorf ("%w: %v" , ErrInflightQueueTSNPop , i )
}
// Chunks already acked through an earlier gap ack block are not counted again.
if !chunkPayload .acked {
// The earliest outstanding TSN was acknowledged: stop the T3-rtx timer.
if i == a .cumulativeTSNAckPoint +1 {
a .t3RTX .stop ()
}
nBytesAcked := len (chunkPayload .userData )
if amount , ok := bytesAckedPerStream [chunkPayload .streamIdentifier ]; ok {
bytesAckedPerStream [chunkPayload .streamIdentifier ] = amount + nBytesAcked
} else {
bytesAckedPerStream [chunkPayload .streamIdentifier ] = nBytesAcked
}
// Only chunks that were sent exactly once and are not below
// minTSN2MeasureRTT contribute an RTT sample (in milliseconds).
if chunkPayload .nSent == 1 && sna32GTE (chunkPayload .tsn , a .minTSN2MeasureRTT ) {
a .minTSN2MeasureRTT = a .myNextTSN
rtt := time .Since (chunkPayload .since ).Seconds () * 1000.0
srtt := a .rtoMgr .setNewRTT (rtt )
a .srtt .Store (srtt )
a .log .Tracef ("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f" ,
a .name , rtt , srtt , a .rtoMgr .getRTO ())
}
}
// Fast recovery ends once the recorded exit-point TSN is cumulatively acked.
if a .inFastRecovery && chunkPayload .tsn == a .fastRecoverExitPoint {
a .log .Debugf ("[%s] exit fast-recovery" , a .name )
a .inFastRecovery = false
}
}
htna := selectiveAckChunk .cumulativeTSNAck
// Phase 2: gap ack blocks. Block bounds are offsets from the cumulative TSN ack.
for _ , g := range selectiveAckChunk .gapAckBlocks {
for i := g .start ; i <= g .end ; i ++ {
tsn := selectiveAckChunk .cumulativeTSNAck + uint32 (i )
chunkPayload , ok := a .inflightQueue .get (tsn )
if !ok {
return nil , 0 , fmt .Errorf ("%w: %v" , ErrTSNRequestNotExist , tsn )
}
if !chunkPayload .acked {
nBytesAcked := a .inflightQueue .markAsAcked (tsn )
if amount , ok := bytesAckedPerStream [chunkPayload .streamIdentifier ]; ok {
bytesAckedPerStream [chunkPayload .streamIdentifier ] = amount + nBytesAcked
} else {
bytesAckedPerStream [chunkPayload .streamIdentifier ] = nBytesAcked
}
a .log .Tracef ("[%s] tsn=%d has been sacked" , a .name , chunkPayload .tsn )
// Unlike phase 1, this RTT sample is not gated on minTSN2MeasureRTT.
if chunkPayload .nSent == 1 {
a .minTSN2MeasureRTT = a .myNextTSN
rtt := time .Since (chunkPayload .since ).Seconds () * 1000.0
srtt := a .rtoMgr .setNewRTT (rtt )
a .srtt .Store (srtt )
a .log .Tracef ("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f" ,
a .name , rtt , srtt , a .rtoMgr .getRTO ())
}
if sna32LT (htna , tsn ) {
htna = tsn
}
}
}
}
return bytesAckedPerStream , htna , nil
}
// onCumulativeTSNAckPointAdvanced updates the T3-rtx timer and the congestion
// window after the cumulative TSN ack point moved forward.
//
// The timer is stopped when nothing is in flight and (re)started otherwise.
// While cwnd <= ssthresh the window grows by min(totalBytesAcked, cwnd),
// provided fast recovery is off and data is pending. Above ssthresh the acked
// bytes accumulate in partialBytesAcked, and once they reach cwnd (with data
// pending) the window grows by the larger of the MTU and cwndCAStep.
func (a *Association) onCumulativeTSNAckPointAdvanced(totalBytesAcked int) {
	if a.inflightQueue.size() != 0 {
		a.log.Tracef("[%s] T3-rtx timer start (pt2)", a.name)
		a.t3RTX.start(a.rtoMgr.getRTO())
	} else {
		a.log.Tracef("[%s] SACK: no more packet in-flight (pending=%d)", a.name, a.pendingQueue.size())
		a.t3RTX.stop()
	}

	if a.CWND() <= a.ssthresh {
		// Slow start.
		if a.inFastRecovery || a.pendingQueue.size() <= 0 {
			a.log.Tracef("[%s] cwnd did not grow: cwnd=%d ssthresh=%d acked=%d FR=%v pending=%d",
				a.name, a.CWND(), a.ssthresh, totalBytesAcked, a.inFastRecovery, a.pendingQueue.size())

			return
		}

		current := a.CWND()
		a.setCWND(current + min32(uint32(totalBytesAcked), current))
		a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (SS)",
			a.name, a.CWND(), a.ssthresh, totalBytesAcked)

		return
	}

	// Congestion avoidance.
	a.partialBytesAcked += uint32(totalBytesAcked)
	if a.partialBytesAcked < a.CWND() || a.pendingQueue.size() <= 0 {
		return
	}

	a.partialBytesAcked -= a.CWND()
	increment := a.MTU()
	if a.cwndCAStep > increment {
		increment = a.cwndCAStep
	}
	a.setCWND(a.CWND() + increment)
	a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (CA)",
		a.name, a.CWND(), a.ssthresh, totalBytesAcked)
}
// processFastRetransmission updates the miss indicators of inflight chunks
// after a SACK and enters fast recovery when a chunk reaches its third miss.
//
// Miss indicators are only updated when the association is not in fast
// recovery, or when it is and the cumulative TSN ack point advanced. It
// returns an error wrapping ErrTSNRequestNotExist when a TSN in the examined
// range is missing from the inflight queue.
func (a *Association ) processFastRetransmission (
cumTSNAckPoint uint32 ,
gapAckBlocks []gapAckBlock ,
htna uint32 ,
cumTSNAckPointAdvanced bool ,
) error {
if !a .inFastRecovery ||
(a .inFastRecovery && cumTSNAckPointAdvanced ) {
// Upper bound (exclusive) of the TSNs whose miss indicator is examined:
// htna outside fast recovery, otherwise the end of the last gap ack block.
var maxTSN uint32
if !a .inFastRecovery {
maxTSN = htna
} else {
maxTSN = cumTSNAckPoint
if len (gapAckBlocks ) > 0 {
maxTSN += uint32 (gapAckBlocks [len (gapAckBlocks )-1 ].end )
}
}
for tsn := cumTSNAckPoint + 1 ; sna32LT (tsn , maxTSN ); tsn ++ {
c , ok := a .inflightQueue .get (tsn )
if !ok {
return fmt .Errorf ("%w: %v" , ErrTSNRequestNotExist , tsn )
}
if !c .acked && !c .abandoned () && c .missIndicator < 3 {
c .missIndicator ++
if c .missIndicator == 3 {
// Third miss: enter fast recovery (once), halving cwnd with a
// floor of 4*MTU and recording htna as the exit point.
if !a .inFastRecovery {
a .inFastRecovery = true
a .fastRecoverExitPoint = htna
a .ssthresh = max32 (a .CWND ()/2 , 4 *a .MTU ())
a .setCWND (a .ssthresh )
a .partialBytesAcked = 0
a .willRetransmitFast = true
a .log .Tracef ("[%s] updated cwnd=%d ssthresh=%d inflight=%d (FR)" ,
a .name , a .CWND (), a .ssthresh , a .inflightQueue .getNumBytes ())
}
}
}
}
}
// While in fast recovery, every advance of the ack point requests another
// fast retransmission.
if a .inFastRecovery && cumTSNAckPointAdvanced {
a .willRetransmitFast = true
}
return nil
}
// handleSack processes a received SACK chunk.
//
// SACKs are ignored outside the established, shutdownPending and
// shutdownReceived states, and when their cumulative TSN ack is older than the
// current ack point. Otherwise the inflight queue, ack point, congestion
// state, peer receive window, fast-retransmission state and (when enabled)
// the forward-TSN state are updated in that order. Errors from
// processSelectiveAck and processFastRetransmission are returned unchanged.
func (a *Association ) handleSack (selectiveAckChunk *chunkSelectiveAck ) error {
a .log .Tracef (
"[%s] SACK: cumTSN=%d a_rwnd=%d" ,
a .name , selectiveAckChunk .cumulativeTSNAck , selectiveAckChunk .advertisedReceiverWindowCredit ,
)
state := a .getState ()
if state != established && state != shutdownPending && state != shutdownReceived {
return nil
}
a .stats .incSACKsReceived ()
// Drop SACKs whose cumulative ack lies behind what was already acknowledged.
if sna32GT (a .cumulativeTSNAckPoint , selectiveAckChunk .cumulativeTSNAck ) {
a .log .Debugf ("[%s] SACK Cumulative ACK %v is older than ACK point %v" ,
a .name ,
selectiveAckChunk .cumulativeTSNAck ,
a .cumulativeTSNAckPoint )
return nil
}
bytesAckedPerStream , htna , err := a .processSelectiveAck (selectiveAckChunk )
if err != nil {
return err
}
var totalBytesAcked int
for _ , nBytesAcked := range bytesAckedPerStream {
totalBytesAcked += nBytesAcked
}
cumTSNAckPointAdvanced := false
if sna32LT (a .cumulativeTSNAckPoint , selectiveAckChunk .cumulativeTSNAck ) {
a .log .Tracef ("[%s] SACK: cumTSN advanced: %d -> %d" ,
a .name ,
a .cumulativeTSNAckPoint ,
selectiveAckChunk .cumulativeTSNAck )
a .cumulativeTSNAckPoint = selectiveAckChunk .cumulativeTSNAck
cumTSNAckPointAdvanced = true
a .onCumulativeTSNAckPointAdvanced (totalBytesAcked )
}
// Notify each stream of its released bytes. a.lock is released around the
// stream call and re-acquired afterwards, so this function expects to be
// entered with a.lock held.
for si , nBytesAcked := range bytesAckedPerStream {
if s , ok := a .streams [si ]; ok {
a .lock .Unlock ()
s .onBufferReleased (nBytesAcked )
a .lock .Lock ()
}
}
// Peer receive window = advertised credit minus bytes still in flight, floored at 0.
bytesOutstanding := uint32 (a .inflightQueue .getNumBytes ())
if bytesOutstanding >= selectiveAckChunk .advertisedReceiverWindowCredit {
a .setRWND (0 )
} else {
a .setRWND (selectiveAckChunk .advertisedReceiverWindowCredit - bytesOutstanding )
}
err = a .processFastRetransmission (
selectiveAckChunk .cumulativeTSNAck , selectiveAckChunk .gapAckBlocks , htna , cumTSNAckPointAdvanced ,
)
if err != nil {
return err
}
if a .useForwardTSN {
// Keep the advanced peer ack point at least at the cumulative ack point,
// then move it across the leading run of abandoned chunks.
if sna32LT (a .advancedPeerTSNAckPoint , a .cumulativeTSNAckPoint ) {
a .advancedPeerTSNAckPoint = a .cumulativeTSNAckPoint
}
for i := a .advancedPeerTSNAckPoint + 1 ; ; i ++ {
c , ok := a .inflightQueue .get (i )
if !ok {
break
}
if !c .abandoned () {
break
}
a .advancedPeerTSNAckPoint = i
}
if sna32GT (a .advancedPeerTSNAckPoint , a .cumulativeTSNAckPoint ) {
a .willSendForwardTSN = true
}
a .awakeWriteLoop ()
}
a .postprocessSack (state , cumTSNAckPointAdvanced )
return nil
}
// postprocessSack finishes SACK handling. With data still in flight it
// restarts the T3-rtx timer. With nothing in flight it advances a pending
// shutdown: shutdownPending schedules SHUTDOWN and moves to shutdownSent,
// shutdownReceived schedules SHUTDOWN ACK and moves to shutdownAckSent. The
// write loop is woken when requested by the caller or when a shutdown step
// was scheduled.
func (a *Association) postprocessSack(state uint32, shouldAwakeWriteLoop bool) {
	if a.inflightQueue.size() > 0 {
		a.log.Tracef("[%s] T3-rtx timer start (pt3)", a.name)
		a.t3RTX.start(a.rtoMgr.getRTO())
	} else if state == shutdownPending {
		shouldAwakeWriteLoop = true
		a.willSendShutdown = true
		a.setState(shutdownSent)
	} else if state == shutdownReceived {
		shouldAwakeWriteLoop = true
		a.willSendShutdownAck = true
		a.setState(shutdownAckSent)
	}

	if !shouldAwakeWriteLoop {
		return
	}
	a.awakeWriteLoop()
}
// handleShutdown processes a SHUTDOWN chunk.
//
// In the established state with data still in flight the association moves to
// shutdownReceived. In the established state with nothing in flight, or in the
// shutdownSent state, a SHUTDOWN ACK is scheduled, the state becomes
// shutdownAckSent and the write loop is woken. Other states are ignored.
func (a *Association) handleShutdown(_ *chunkShutdown) {
	state := a.getState()

	switch {
	case state == established && a.inflightQueue.size() > 0:
		a.setState(shutdownReceived)
	case state == established, state == shutdownSent:
		a.willSendShutdownAck = true
		a.setState(shutdownAckSent)
		a.awakeWriteLoop()
	}
}
// handleShutdownAck processes a SHUTDOWN ACK chunk. In the shutdownSent or
// shutdownAckSent state it stops the T2-shutdown timer, schedules a SHUTDOWN
// COMPLETE and wakes the write loop; otherwise it does nothing.
func (a *Association) handleShutdownAck(_ *chunkShutdownAck) {
	current := a.getState()
	if current != shutdownSent && current != shutdownAckSent {
		return
	}

	a.t2Shutdown.stop()
	a.willSendShutdownComplete = true

	a.awakeWriteLoop()
}
// handleShutdownComplete processes a SHUTDOWN COMPLETE chunk. In the
// shutdownAckSent state it stops the T2-shutdown timer and closes the
// association, returning the result of close; in any other state it returns
// nil without side effects.
func (a *Association) handleShutdownComplete(_ *chunkShutdownComplete) error {
	if current := a.getState(); current != shutdownAckSent {
		return nil
	}

	a.t2Shutdown.stop()

	return a.close()
}
// handleAbort processes an ABORT chunk: it closes the association (ignoring
// the close error) and always returns an error wrapping ErrChunk that lists
// the chunk's error causes.
func (a *Association) handleAbort(c *chunkAbort) error {
	var causes bytes.Buffer
	for _, cause := range c.errorCauses {
		fmt.Fprintf(&causes, "(%s)", cause)
	}

	// The abort itself is what gets reported; a failure to close is dropped.
	_ = a.close()

	return fmt.Errorf("[%s] %w: %s", a.name, ErrChunk, causes.String())
}
// createForwardTSN builds a FORWARD TSN chunk whose new cumulative TSN is the
// advanced peer TSN ack point.
//
// For every stream with an inflight chunk between cumulativeTSNAckPoint+1 and
// that point, the chunk lists the largest stream sequence number seen. The
// scan stops at the first TSN missing from the inflight queue.
func (a *Association) createForwardTSN() *chunkForwardTSN {
	latestSSN := map[uint16]uint16{}
	for tsn := a.cumulativeTSNAckPoint + 1; sna32LTE(tsn, a.advancedPeerTSNAckPoint); tsn++ {
		inflight, found := a.inflightQueue.get(tsn)
		if !found {
			break
		}

		if prev, seen := latestSSN[inflight.streamIdentifier]; !seen || sna16LT(prev, inflight.streamSequenceNumber) {
			latestSSN[inflight.streamIdentifier] = inflight.streamSequenceNumber
		}
	}

	fwdtsn := &chunkForwardTSN{
		newCumulativeTSN: a.advancedPeerTSNAckPoint,
		streams:          []chunkForwardTSNStream{},
	}

	var summary string
	for streamID, seq := range latestSSN {
		summary += fmt.Sprintf("(si=%d ssn=%d)", streamID, seq)
		entry := chunkForwardTSNStream{
			identifier: streamID,
			sequence:   seq,
		}
		fwdtsn.streams = append(fwdtsn.streams, entry)
	}
	a.log.Tracef(
		"[%s] building fwdtsn: newCumulativeTSN=%d cumTSN=%d - %s",
		a.name, fwdtsn.newCumulativeTSN, a.cumulativeTSNAckPoint, summary,
	)

	return fwdtsn
}
// createPacket wraps the given chunks in a packet addressed to the peer, using
// the peer's verification tag and the association's port pair.
func (a *Association) createPacket(cs []chunk) *packet {
	outbound := &packet{}
	outbound.verificationTag = a.peerVerificationTag
	outbound.sourcePort = a.sourcePort
	outbound.destinationPort = a.destinationPort
	outbound.chunks = cs

	return outbound
}
// handleReconfig processes a RECONFIG chunk. Parameter A is always handled;
// parameter B only when it is present. It returns the reply packets produced
// (possibly an empty, non-nil slice), or the first error encountered.
func (a *Association) handleReconfig(reconfigChunk *chunkReconfig) ([]*packet, error) {
	a.log.Tracef("[%s] handleReconfig", a.name)

	replies := make([]*packet, 0)
	for idx, prm := range []param{reconfigChunk.paramA, reconfigChunk.paramB} {
		// Only the second parameter is optional.
		if idx == 1 && prm == nil {
			continue
		}

		reply, err := a.handleReconfigParam(prm)
		if err != nil {
			return nil, err
		}
		if reply != nil {
			replies = append(replies, reply)
		}
	}

	return replies, nil
}
// handleForwardTSN processes a FORWARD TSN chunk.
//
// When forward TSN is not enabled, an ERROR chunk with an "unrecognized chunk
// type" cause is returned. When the new cumulative TSN does not lie ahead of
// the peer's last TSN, an immediate ack is requested and nil is returned.
// Otherwise the payload queue is advanced up to the new cumulative TSN, the
// streams are told about the skipped data, and the acknowledgement state is
// updated; any reply packets from that step are returned.
func (a *Association) handleForwardTSN(chunkTSN *chunkForwardTSN) []*packet {
	a.log.Tracef("[%s] FwdTSN: %s", a.name, chunkTSN.String())

	if !a.useForwardTSN {
		// Use the formatting logger and supply the name; the previous call
		// passed the format string to Warn and logged a literal "[%s]".
		a.log.Warnf("[%s] received FwdTSN but not enabled", a.name)
		cerr := &chunkError{
			errorCauses: []errorCause{&errorCauseUnrecognizedChunkType{}},
		}

		outbound := &packet{}
		outbound.verificationTag = a.peerVerificationTag
		outbound.sourcePort = a.sourcePort
		outbound.destinationPort = a.destinationPort
		outbound.chunks = []chunk{cerr}

		return []*packet{outbound}
	}

	a.log.Tracef("[%s] should send ack? newCumTSN=%d peerLastTSN=%d",
		a.name, chunkTSN.newCumulativeTSN, a.peerLastTSN())
	if sna32LTE(chunkTSN.newCumulativeTSN, a.peerLastTSN()) {
		// Nothing to skip: just make sure the peer gets an ack right away.
		a.log.Tracef("[%s] sending ack on Forward TSN", a.name)
		a.ackState = ackStateImmediate
		a.ackTimer.stop()
		a.awakeWriteLoop()

		return nil
	}

	// Force-pop the payload queue until the peer's last TSN reaches the new
	// cumulative TSN.
	for sna32LT(a.peerLastTSN(), chunkTSN.newCumulativeTSN) {
		a.payloadQueue.pop(true)
	}

	// Ordered data: only the streams listed in the chunk are affected.
	for _, forwarded := range chunkTSN.streams {
		if s, ok := a.streams[forwarded.identifier]; ok {
			s.handleForwardTSNForOrdered(forwarded.sequence)
		}
	}

	// Unordered data: every stream is informed of the new cumulative TSN.
	for _, s := range a.streams {
		s.handleForwardTSNForUnordered(chunkTSN.newCumulativeTSN)
	}

	return a.handlePeerLastTSNAndAcknowledgement(false)
}
// sendResetRequest queues a reset request for the given stream by pushing an
// empty, unfragmented DATA chunk onto the pending queue and waking the write
// loop. It returns an error wrapping ErrResetPacketInStateNotExist when the
// association is not established.
func (a *Association) sendResetRequest(streamIdentifier uint16) error {
	a.lock.Lock()
	defer a.lock.Unlock()

	if current := a.getState(); current != established {
		return fmt.Errorf("%w: state=%s", ErrResetPacketInStateNotExist,
			getAssociationStateString(current))
	}

	// A chunk without user data is what the sender side later recognises as a
	// stream reset marker.
	marker := &chunkPayloadData{
		streamIdentifier:  streamIdentifier,
		beginningFragment: true,
		endingFragment:    true,
	}
	a.pendingQueue.push(marker)
	a.awakeWriteLoop()

	return nil
}
// handleReconfigParam processes one parameter of a RECONFIG chunk.
//
// For an outgoing reset request the request is recorded and the streams are
// reset if possible; the response packet is returned. A request that cannot
// be completed yet is rejected with ErrTooManyReconfigRequests once
// maxReconfigRequests requests are already outstanding.
//
// For a reconfig response, an "in progress" result restarts the reconfig
// timer when the sequence number is known; any other result removes the
// matching entry and stops the timer when none remain. No packet is returned.
//
// Any other parameter type yields an error wrapping ErrParamterType.
func (a *Association) handleReconfigParam(raw param) (*packet, error) {
	switch par := raw.(type) {
	case *paramOutgoingResetRequest:
		a.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", a.name)
		// TSNs wrap around, so compare them with serial number arithmetic
		// (as resetStreamsIfAny does) rather than with a plain "<".
		if sna32LT(a.peerLastTSN(), par.senderLastTSN) && len(a.reconfigRequests) >= maxReconfigRequests {
			return nil, fmt.Errorf("%w: %d", ErrTooManyReconfigRequests, len(a.reconfigRequests))
		}
		a.reconfigRequests[par.reconfigRequestSequenceNumber] = par

		return a.resetStreamsIfAny(par), nil

	case *paramReconfigResponse:
		a.log.Tracef("[%s] handleReconfigParam (ReconfigResponse)", a.name)
		if par.result == reconfigResultInProgress {
			// The peer is still working on it: restart the timer rather
			// than treating the request as finished.
			if _, ok := a.reconfigs[par.reconfigResponseSequenceNumber]; ok {
				a.tReconfig.stop()
				a.tReconfig.start(a.rtoMgr.getRTO())
			}

			return nil, nil
		}
		delete(a.reconfigs, par.reconfigResponseSequenceNumber)
		if len(a.reconfigs) == 0 {
			a.tReconfig.stop()
		}

		return nil, nil

	default:
		// %T prints the dynamic type; the previous %t is the boolean verb
		// and produced "%!t(...)" noise.
		return nil, fmt.Errorf("%w: %T", ErrParamterType, par)
	}
}
func (a *Association ) resetStreamsIfAny (resetRequest *paramOutgoingResetRequest ) *packet {
result := reconfigResultSuccessPerformed
if sna32LTE (resetRequest .senderLastTSN , a .peerLastTSN ()) {
a .log .Debugf ("[%s] resetStream(): senderLastTSN=%d <= peerLastTSN=%d" ,
a .name , resetRequest .senderLastTSN , a .peerLastTSN ())
for _ , id := range resetRequest .streamIdentifiers {
s , ok := a .streams [id ]
if !ok {
continue
}
a .lock .Unlock ()
s .onInboundStreamReset ()
a .lock .Lock ()
a .log .Debugf ("[%s] deleting stream %d" , a .name , id )
delete (a .streams , s .streamIdentifier )
}
delete (a .reconfigRequests , resetRequest .reconfigRequestSequenceNumber )
} else {
a .log .Debugf ("[%s] resetStream(): senderLastTSN=%d > peerLastTSN=%d" ,
a .name , resetRequest .senderLastTSN , a .peerLastTSN ())
result = reconfigResultInProgress
}
return a .createPacket ([]chunk {&chunkReconfig {
paramA : ¶mReconfigResponse {
reconfigResponseSequenceNumber : resetRequest .reconfigRequestSequenceNumber ,
result : result ,
},
}})
}
// movePendingDataChunkToInflightQueue removes the chunk from the pending
// queue, assigns it the next TSN, stamps it as sent once at the current time,
// evaluates its partial-reliability status and pushes it onto the inflight
// queue. A failure to pop from the pending queue is logged and otherwise
// ignored.
func (a *Association) movePendingDataChunkToInflightQueue(chunkPayload *chunkPayloadData) {
	popErr := a.pendingQueue.pop(chunkPayload)
	if popErr != nil {
		a.log.Errorf("[%s] failed to pop from pending queue: %s", a.name, popErr.Error())
	}

	// The last fragment marks the whole message as in flight.
	if chunkPayload.endingFragment {
		chunkPayload.setAllInflight()
	}

	chunkPayload.tsn = a.generateNextTSN()
	chunkPayload.since = time.Now()
	chunkPayload.nSent = 1

	a.checkPartialReliabilityStatus(chunkPayload)

	a.log.Tracef(
		"[%s] sending ppi=%d tsn=%d ssn=%d sent=%d len=%d (%v,%v)",
		a.name,
		chunkPayload.payloadType,
		chunkPayload.tsn,
		chunkPayload.streamSequenceNumber,
		chunkPayload.nSent,
		len(chunkPayload.userData),
		chunkPayload.beginningFragment,
		chunkPayload.endingFragment,
	)

	a.inflightQueue.pushNoCheck(chunkPayload)
}
// popPendingDataChunksToSend moves as many pending DATA chunks to the inflight
// queue as the congestion window and the peer's receive window allow.
//
// It returns the chunks to transmit and the identifiers of streams for which
// an empty chunk (a reset marker) was found in the pending queue. When nothing
// could be moved and nothing is in flight, a single pending chunk is moved
// regardless of the windows. If blockWrite is set and the pending queue was
// drained by this call, blocked writers are notified.
func (a *Association) popPendingDataChunksToSend() ([]*chunkPayloadData, []uint16) {
	toSend := []*chunkPayloadData{}
	var sisToReset []uint16

	if a.pendingQueue.size() > 0 {
		for next := a.pendingQueue.peek(); next != nil; next = a.pendingQueue.peek() {
			size := uint32(len(next.userData))
			if size == 0 {
				// Empty chunk: record the stream to reset and discard it.
				sisToReset = append(sisToReset, next.streamIdentifier)
				if popErr := a.pendingQueue.pop(next); popErr != nil {
					a.log.Errorf("failed to pop from pending queue: %s", popErr.Error())
				}

				continue
			}

			// Stop at the first chunk that exceeds either window.
			if uint32(a.inflightQueue.getNumBytes())+size > a.CWND() || size > a.RWND() {
				break
			}

			a.setRWND(a.RWND() - size)
			a.movePendingDataChunkToInflightQueue(next)
			toSend = append(toSend, next)
		}

		// Nothing sent and nothing outstanding: send one chunk anyway.
		if len(toSend) == 0 && a.inflightQueue.size() == 0 {
			if probe := a.pendingQueue.peek(); probe != nil {
				a.movePendingDataChunkToInflightQueue(probe)
				toSend = append(toSend, probe)
			}
		}
	}

	if a.blockWrite && len(toSend) > 0 && a.pendingQueue.size() == 0 {
		a.log.Tracef("[%s] all pending data have been sent, notify writable", a.name)
		a.notifyBlockWritable()
	}

	return toSend, sisToReset
}
// bundleDataChunksIntoPackets groups DATA chunks, in order, into packets. A
// new packet is started whenever adding the next chunk (header, payload and
// padding) would push the current packet past the MTU.
func (a *Association) bundleDataChunksIntoPackets(chunks []*chunkPayloadData) []*packet {
	bundled := []*packet{}
	current := []chunk{}
	used := int(commonHeaderSize)
	limit := int(a.MTU())

	for _, data := range chunks {
		need := int(dataChunkHeaderSize) + len(data.userData)
		need += getPadding(need)

		if used+need > limit {
			// Flush what has been collected so far and start a fresh packet.
			bundled = append(bundled, a.createPacket(current))
			current = []chunk{}
			used = int(commonHeaderSize)
		}

		current = append(current, data)
		used += need
	}

	if len(current) != 0 {
		bundled = append(bundled, a.createPacket(current))
	}

	return bundled
}
// sendPayloadData queues the given DATA chunks on the pending queue and wakes
// the write loop.
//
// It returns an error wrapping ErrPayloadDataStateNotExist when the
// association is not established. When blockWrite is set, it waits until no
// other write is pending, and returns ctx.Err() if ctx is done before that.
func (a *Association ) sendPayloadData (ctx context .Context , chunks []*chunkPayloadData ) error {
a .lock .Lock ()
state := a .getState ()
if state != established {
a .lock .Unlock ()
return fmt .Errorf ("%w: state=%s" , ErrPayloadDataStateNotExist ,
getAssociationStateString (state ))
}
if a .blockWrite {
// Wait for the previous write to drain. The lock is released while
// waiting and only re-acquired on the writeNotify path, so the ctx.Done
// path returns without holding it.
for a .writePending {
a .lock .Unlock ()
select {
case <- ctx .Done ():
return ctx .Err ()
case <- a .writeNotify :
a .lock .Lock ()
}
}
a .writePending = true
}
for _ , c := range chunks {
a .pendingQueue .push (c )
}
a .lock .Unlock ()
a .awakeWriteLoop ()
return nil
}
// checkPartialReliabilityStatus marks the chunk as abandoned when its stream's
// partial-reliability policy has been exhausted: the send count reached the
// limit (ReliabilityTypeRexmit) or the time since chunkPayload.since reached
// the limit in milliseconds (ReliabilityTypeTimed).
//
// Nothing happens when forward TSN is disabled or the chunk carries a WebRTC
// DCEP payload. A missing stream is only logged.
func (a *Association) checkPartialReliabilityStatus(chunkPayload *chunkPayloadData) {
	if !a.useForwardTSN || chunkPayload.payloadType == PayloadTypeWebRTCDCEP {
		return
	}

	stream, found := a.streams[chunkPayload.streamIdentifier]
	if !found {
		a.log.Tracef("[%s] stream %d not found, remote reset", a.name, chunkPayload.streamIdentifier)

		return
	}

	stream.lock.RLock()
	switch stream.reliabilityType {
	case ReliabilityTypeRexmit:
		if chunkPayload.nSent >= stream.reliabilityValue {
			chunkPayload.setAbandoned(true)
			a.log.Tracef(
				"[%s] marked as abandoned: tsn=%d ppi=%d (remix: %d)",
				a.name, chunkPayload.tsn, chunkPayload.payloadType, chunkPayload.nSent,
			)
		}
	case ReliabilityTypeTimed:
		elapsedMs := int64(time.Since(chunkPayload.since).Seconds() * 1000)
		if elapsedMs >= int64(stream.reliabilityValue) {
			chunkPayload.setAbandoned(true)
			a.log.Tracef(
				"[%s] marked as abandoned: tsn=%d ppi=%d (timed: %d)",
				a.name, chunkPayload.tsn, chunkPayload.payloadType, elapsedMs,
			)
		}
	}
	stream.lock.RUnlock()
}
// getDataPacketsToRetransmit collects the inflight chunks flagged for
// retransmission, starting right after the cumulative TSN ack point, and
// returns them bundled into packets.
//
// The scan stops at the first TSN missing from the inflight queue or when the
// next flagged chunk would exceed min(cwnd, rwnd). Each selected chunk has its
// retransmit flag cleared, its send count incremented and its
// partial-reliability status re-evaluated.
func (a *Association ) getDataPacketsToRetransmit () []*packet {
awnd := min32 (a .CWND (), a .RWND ())
chunks := []*chunkPayloadData {}
var bytesToSend int
var done bool
for i := 0 ; !done ; i ++ {
chunkPayload , ok := a .inflightQueue .get (a .cumulativeTSNAckPoint + uint32 (i ) + 1 )
if !ok {
break
}
if !chunkPayload .retransmit {
continue
}
// If the very first inflight chunk does not fit the peer's receive
// window, it is still sent, and the loop ends after this iteration.
if i == 0 && int (a .RWND ()) < len (chunkPayload .userData ) {
done = true
} else if bytesToSend +len (chunkPayload .userData ) > int (awnd ) {
break
}
chunkPayload .retransmit = false
bytesToSend += len (chunkPayload .userData )
chunkPayload .nSent ++
a .checkPartialReliabilityStatus (chunkPayload )
a .log .Tracef (
"[%s] retransmitting tsn=%d ssn=%d sent=%d" ,
a .name , chunkPayload .tsn , chunkPayload .streamSequenceNumber , chunkPayload .nSent ,
)
chunks = append (chunks , chunkPayload )
}
return a .bundleDataChunksIntoPackets (chunks )
}
// generateNextTSN returns the TSN to use for the next outgoing DATA chunk and
// advances the counter by one.
func (a *Association) generateNextTSN() uint32 {
	assigned := a.myNextTSN
	a.myNextTSN = assigned + 1

	return assigned
}
// generateNextRSN returns the current value of myNextRSN and advances the
// counter by one.
func (a *Association) generateNextRSN() uint32 {
	assigned := a.myNextRSN
	a.myNextRSN = assigned + 1

	return assigned
}
// createSelectiveAckChunk builds a SACK chunk from the current receive state:
// the peer's last TSN, the remaining receive window credit, the duplicate TSNs
// recorded so far (which are popped from the payload queue) and the current
// gap ack blocks.
func (a *Association) createSelectiveAckChunk() *chunkSelectiveAck {
	// Fields are listed in the order they must be evaluated.
	return &chunkSelectiveAck{
		cumulativeTSNAck:               a.peerLastTSN(),
		advertisedReceiverWindowCredit: a.getMyReceiverWindowCredit(),
		duplicateTSN:                   a.payloadQueue.popDuplicates(),
		gapAckBlocks:                   a.payloadQueue.getGapAckBlocks(),
	}
}
// pack wraps a single packet in a one-element slice.
func pack(p *packet) []*packet {
	single := make([]*packet, 1)
	single[0] = p

	return single
}
// handleChunksStart is called before the chunks of a received packet are
// processed. Under the association lock it counts the packet and clears both
// ack-trigger flags.
func (a *Association) handleChunksStart() {
	a.lock.Lock()
	defer a.lock.Unlock()

	a.stats.incPacketsReceived()

	// Start each packet with a clean slate for the ack decision.
	a.delayedAckTriggered, a.immediateAckTriggered = false, false
}
// handleChunksEnd is called after all chunks of a received packet were
// processed and applies the ack decision made along the way: an immediate ack
// stops the ack timer and wakes the write loop, a delayed ack starts the ack
// timer. An immediate trigger takes precedence over a delayed one.
func (a *Association) handleChunksEnd() {
	a.lock.Lock()
	defer a.lock.Unlock()

	switch {
	case a.immediateAckTriggered:
		a.ackState = ackStateImmediate
		a.ackTimer.stop()
		a.awakeWriteLoop()
	case a.delayedAckTriggered:
		a.ackState = ackStateDelay
		a.ackTimer.start()
	}
}
// handleChunk validates one received chunk and dispatches it to the handler
// for its type while holding the association lock.
//
// Reply packets produced by the handler are pushed to the control queue and
// the write loop is woken. Only an error produced while handling an ABORT
// chunk is returned; a failed chunk validation and every other handler error
// are logged and nil is returned.
func (a *Association ) handleChunk (receivedPacket *packet , receivedChunk chunk ) error {
a .lock .Lock ()
defer a .lock .Unlock ()
var packets []*packet
var err error
// An invalid chunk is dropped without failing the caller.
if _, err = receivedChunk .check (); err != nil {
a .log .Errorf ("[%s] failed validating chunk: %s " , a .name , err )
return nil
}
isAbort := false
switch receivedChunk := receivedChunk .(type ) {
case *chunkInit :
packets , err = a .handleInit (receivedPacket , receivedChunk )
case *chunkInitAck :
err = a .handleInitAck (receivedPacket , receivedChunk )
case *chunkAbort :
isAbort = true
err = a .handleAbort (receivedChunk )
case *chunkError :
// ERROR chunks are only logged.
var errStr string
for _ , e := range receivedChunk .errorCauses {
errStr += fmt .Sprintf ("(%s)" , e )
}
a .log .Debugf ("[%s] Error chunk, with following errors: %s" , a .name , errStr )
case *chunkHeartbeat :
packets = a .handleHeartbeat (receivedChunk )
case *chunkCookieEcho :
packets = a .handleCookieEcho (receivedChunk )
case *chunkCookieAck :
a .handleCookieAck ()
case *chunkPayloadData :
packets = a .handleData (receivedChunk )
case *chunkSelectiveAck :
err = a .handleSack (receivedChunk )
case *chunkReconfig :
packets , err = a .handleReconfig (receivedChunk )
case *chunkForwardTSN :
packets = a .handleForwardTSN (receivedChunk )
case *chunkShutdown :
a .handleShutdown (receivedChunk )
case *chunkShutdownAck :
a .handleShutdownAck (receivedChunk )
case *chunkShutdownComplete :
err = a .handleShutdownComplete (receivedChunk )
default :
err = ErrChunkTypeUnhandled
}
if err != nil {
// Only an ABORT propagates its error to the caller.
if isAbort {
return err
}
a .log .Errorf ("Failed to handle chunk: %v" , err )
return nil
}
if len (packets ) > 0 {
a .controlQueue .pushAll (packets )
a .awakeWriteLoop ()
}
return nil
}
// onRetransmissionTimeout is the callback invoked when one of the
// association's retransmission timers expires. id identifies the timer and
// nRtos is the number of timeouts reported for it.
//
//   - timerT1Init / timerT1Cookie: resend INIT / COOKIE ECHO.
//   - timerT2Shutdown: schedule SHUTDOWN or SHUTDOWN ACK again, depending on
//     the current state.
//   - timerT3RTX: collapse cwnd to one MTU, update ssthresh, advance the
//     forward-TSN point over abandoned chunks, and mark all inflight chunks
//     for retransmission.
//   - timerReconfig: schedule a RECONFIG retransmission.
func (a *Association) onRetransmissionTimeout(id int, nRtos uint) {
	a.lock.Lock()
	defer a.lock.Unlock()

	// Flip the zero-checksum setting on every third timeout while the ack
	// point still sits just below initialTSN.
	// NOTE(review): presumably this means no DATA has been acked yet — confirm.
	if a.cumulativeTSNAckPoint+1 == a.initialTSN && nRtos%3 == 0 {
		a.sendZeroChecksum = !a.sendZeroChecksum
	}

	if id == timerT1Init {
		err := a.sendInit()
		if err != nil {
			a.log.Debugf("[%s] failed to retransmit init (nRtos=%d): %v", a.name, nRtos, err)
		}

		return
	}

	if id == timerT1Cookie {
		err := a.sendCookieEcho()
		if err != nil {
			a.log.Debugf("[%s] failed to retransmit cookie-echo (nRtos=%d): %v", a.name, nRtos, err)
		}

		return
	}

	if id == timerT2Shutdown {
		// The format string previously ended in ": %v" without a matching
		// argument, which rendered as "%!v(MISSING)" in the log.
		a.log.Debugf("[%s] retransmission of shutdown timeout (nRtos=%d)", a.name, nRtos)
		state := a.getState()

		switch state {
		case shutdownSent:
			a.willSendShutdown = true
			a.awakeWriteLoop()
		case shutdownAckSent:
			a.willSendShutdownAck = true
			a.awakeWriteLoop()
		}
	}

	if id == timerT3RTX {
		a.stats.incT3Timeouts()

		// ssthresh = max(cwnd/2, 4*MTU); cwnd = 1*MTU.
		a.ssthresh = max32(a.CWND()/2, 4*a.MTU())
		a.setCWND(a.MTU())
		a.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (RTO)",
			a.name, a.CWND(), a.ssthresh, a.inflightQueue.getNumBytes())

		if a.useForwardTSN {
			// Move the advanced peer ack point across the leading run of
			// abandoned chunks.
			for i := a.advancedPeerTSNAckPoint + 1; ; i++ {
				c, ok := a.inflightQueue.get(i)
				if !ok {
					break
				}
				if !c.abandoned() {
					break
				}
				a.advancedPeerTSNAckPoint = i
			}

			if sna32GT(a.advancedPeerTSNAckPoint, a.cumulativeTSNAckPoint) {
				a.willSendForwardTSN = true
			}
		}

		a.log.Debugf("[%s] T3-rtx timed out: nRtos=%d cwnd=%d ssthresh=%d", a.name, nRtos, a.CWND(), a.ssthresh)

		a.inflightQueue.markAllToRetrasmit()
		a.awakeWriteLoop()

		return
	}

	if id == timerReconfig {
		a.willRetransmitReconfig = true
		a.awakeWriteLoop()
	}
}
// onRetransmissionFailure is the callback invoked when a retransmission timer
// gives up. Every failure is logged; for T1-init and T1-cookie the handshake
// is additionally completed with the matching handshake error.
func (a *Association) onRetransmissionFailure(id int) {
	a.lock.Lock()
	defer a.lock.Unlock()

	switch id {
	case timerT1Init:
		a.log.Errorf("[%s] retransmission failure: T1-init", a.name)
		a.completeHandshake(ErrHandshakeInitAck)
	case timerT1Cookie:
		a.log.Errorf("[%s] retransmission failure: T1-cookie", a.name)
		a.completeHandshake(ErrHandshakeCookieEcho)
	case timerT2Shutdown:
		a.log.Errorf("[%s] retransmission failure: T2-shutdown", a.name)
	case timerT3RTX:
		a.log.Errorf("[%s] retransmission failure: T3-rtx (DATA)", a.name)
	}
}
// onAckTimeout is the callback invoked when the ack timer expires. It counts
// the timeout, switches the ack state to immediate and wakes the write loop.
func (a *Association) onAckTimeout() {
	a.lock.Lock()
	defer a.lock.Unlock()

	previous := a.ackState
	a.log.Tracef("[%s] ack timed out (ackState: %d)", a.name, previous)
	a.stats.incAckTimeouts()

	a.ackState = ackStateImmediate
	a.awakeWriteLoop()
}
// BufferedAmount returns the total number of bytes held in the pending and
// inflight queues.
func (a *Association) BufferedAmount() int {
	a.lock.RLock()
	defer a.lock.RUnlock()

	pending := a.pendingQueue.getNumBytes()
	inflight := a.inflightQueue.getNumBytes()

	return pending + inflight
}
// MaxMessageSize returns the current maximum message size. The value is read
// atomically.
func (a *Association) MaxMessageSize() uint32 {
	limit := atomic.LoadUint32(&a.maxMessageSize)

	return limit
}
// SetMaxMessageSize atomically replaces the maximum message size with
// maxMsgSize.
func (a *Association) SetMaxMessageSize(maxMsgSize uint32) {
	target := &a.maxMessageSize
	atomic.StoreUint32(target, maxMsgSize)
}
// completeHandshake delivers the handshake result on handshakeCompletedCh.
//
// It blocks until the result is accepted (returning true) or until either the
// write-loop or read-loop close channel becomes readable (returning false).
func (a *Association) completeHandshake(handshakeErr error) bool {
	select {
	case a.handshakeCompletedCh <- handshakeErr:
		return true
	case <-a.closeWriteLoopCh:
		return false
	case <-a.readLoopCloseCh:
		return false
	}
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.