package sctp
import (
"errors"
"io"
"sort"
"sync/atomic"
)
// sortChunksByTSN sorts the given chunks in place so that, for any two
// neighbours, the earlier one compares "less than" the later one under
// sna32LT applied to their tsn fields.
func sortChunksByTSN(a []*chunkPayloadData) {
	tsnLess := func(x, y int) bool {
		return sna32LT(a[x].tsn, a[y].tsn)
	}
	sort.Slice(a, tsnLess)
}
// sortChunksBySSN sorts the given chunk sets in place, ordering them by
// their ssn field using the sna16LT comparison.
func sortChunksBySSN(a []*chunkSet) {
	ssnLess := func(x, y int) bool {
		return sna16LT(a[x].ssn, a[y].ssn)
	}
	sort.Slice(a, ssnLess)
}
// chunkSet is a group of chunks that reassemblyQueue.read copies out
// together as a single unit.
type chunkSet struct {
// ssn is the key sortChunksBySSN orders sets by. Sets built by
// findCompleteUnorderedChunkSet always carry 0 here.
ssn uint16
// ppi is the payload protocol identifier that read returns for this set.
ppi PayloadProtocolIdentifier
// chunks holds the member chunks; push re-sorts them by TSN after every
// insertion.
chunks []*chunkPayloadData
}
// newChunkSet allocates a chunkSet tagged with the given ssn and ppi.
// The chunks slice starts out empty but non-nil.
func newChunkSet(ssn uint16, ppi PayloadProtocolIdentifier) *chunkSet {
	set := new(chunkSet)
	set.ssn = ssn
	set.ppi = ppi
	set.chunks = make([]*chunkPayloadData, 0)
	return set
}
// push adds chunk to the set and reports whether the set is complete
// afterwards (see isComplete).
//
// A chunk whose TSN is already present is ignored and push returns false,
// regardless of whether the set was already complete. Otherwise the chunk
// is appended and the set is re-sorted by TSN.
func (set *chunkSet) push(chunk *chunkPayloadData) bool {
	for idx := range set.chunks {
		if set.chunks[idx].tsn == chunk.tsn {
			// Duplicate TSN: drop it.
			return false
		}
	}

	set.chunks = append(set.chunks, chunk)
	sortChunksByTSN(set.chunks)

	return set.isComplete()
}
// isComplete reports whether the set holds a full, gap-free run of
// fragments: it is non-empty, its first chunk has beginningFragment set,
// its last chunk has endingFragment set, and every chunk's TSN is exactly
// one more than its predecessor's (uint32 arithmetic, so the sequence may
// wrap around). The check walks chunks in slice order.
func (set *chunkSet) isComplete() bool {
	count := len(set.chunks)
	if count == 0 {
		return false
	}

	head, tail := set.chunks[0], set.chunks[count-1]
	if !head.beginningFragment || !tail.endingFragment {
		return false
	}

	for i := 1; i < count; i++ {
		if set.chunks[i].tsn != set.chunks[i-1].tsn+1 {
			return false
		}
	}
	return true
}
// reassemblyQueue buffers the chunks pushed for one stream until they can
// be handed out by read.
type reassemblyQueue struct {
// si is the stream identifier; push rejects chunks whose
// streamIdentifier differs from it.
si uint16
// nextSSN is the SSN compared against the head of ordered to decide
// whether it may be read. read increments it and forwardTSNForOrdered
// can move it forward.
nextSSN uint16
// ordered holds chunk sets built from chunks without the unordered
// flag; push keeps it sorted by SSN.
ordered []*chunkSet
// unordered holds chunk sets assembled by
// findCompleteUnorderedChunkSet, waiting to be read.
unordered []*chunkSet
// unorderedChunks holds chunks with the unordered flag that have not
// yet been assembled into a set; push keeps it sorted by TSN.
unorderedChunks []*chunkPayloadData
// nBytes is the running total of userData bytes currently accounted to
// this queue. It is accessed through sync/atomic.
nBytes uint64
}
// errTryAgain is returned by reassemblyQueue.read when no chunk set is
// currently ready to be read.
var errTryAgain = errors .New ("try again" )
// newReassemblyQueue creates a reassembly queue for stream identifier si.
// nextSSN starts at 0, and the ordered and unordered set lists start out
// empty but non-nil.
func newReassemblyQueue(si uint16) *reassemblyQueue {
	queue := &reassemblyQueue{si: si} // nextSSN is left at its zero value
	queue.ordered = []*chunkSet{}
	queue.unordered = []*chunkSet{}
	return queue
}
// push stores chunk in the queue and reports whether doing so completed a
// chunk set.
//
// The chunk is rejected (false, nothing stored or counted) when its
// streamIdentifier differs from r.si, or when it is an ordered chunk whose
// streamSequenceNumber is already behind r.nextSSN.
//
// Unordered chunks are appended to r.unorderedChunks (kept sorted by TSN);
// if a complete run can then be extracted it is moved to r.unordered and
// push returns true.
//
// Ordered chunks are placed into the chunk set with the matching SSN, or a
// new one, and push returns whatever chunkSet.push reports. r.nBytes grows
// by len(chunk.userData) only when the chunk was actually stored: a chunk
// that chunkSet.push drops as a duplicate TSN is not counted, because
// nothing would ever subtract its bytes again.
func (r *reassemblyQueue) push(chunk *chunkPayloadData) bool {
	if chunk.streamIdentifier != r.si {
		return false
	}

	if chunk.unordered {
		r.unorderedChunks = append(r.unorderedChunks, chunk)
		atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData)))
		sortChunksByTSN(r.unorderedChunks)

		// Check whether the new chunk completed a run of fragments.
		cset := r.findCompleteUnorderedChunkSet()
		if cset == nil {
			return false
		}
		r.unordered = append(r.unordered, cset)
		return true
	}

	// Ordered chunk: drop it if its SSN is already behind nextSSN.
	if sna16LT(chunk.streamSequenceNumber, r.nextSSN) {
		return false
	}

	// A fragment joins the existing fragmented set with the same SSN, if
	// there is one.
	var cset *chunkSet
	if chunk.isFragmented() {
		for _, set := range r.ordered {
			if set.ssn != chunk.streamSequenceNumber {
				continue
			}
			if len(set.chunks) > 0 && set.chunks[0].isFragmented() {
				cset = set
				break
			}
		}
	}

	if cset == nil {
		cset = newChunkSet(chunk.streamSequenceNumber, chunk.payloadType)
		r.ordered = append(r.ordered, cset)
		sortChunksBySSN(r.ordered)
	}

	// chunkSet.push ignores a chunk whose TSN is already in the set, so
	// account for the bytes only if the set really grew.
	before := len(cset.chunks)
	complete := cset.push(chunk)
	if len(cset.chunks) > before {
		atomic.AddUint64(&r.nBytes, uint64(len(chunk.userData)))
	}
	return complete
}
// findCompleteUnorderedChunkSet scans r.unorderedChunks in slice order for
// the first run that starts with a beginningFragment chunk, continues with
// consecutive TSNs, and ends with an endingFragment chunk.
//
// When such a run exists, its chunks are removed from r.unorderedChunks
// and returned as a new chunkSet (ssn 0, ppi taken from the run's first
// chunk). Otherwise the queue is left untouched and nil is returned.
func (r *reassemblyQueue) findCompleteUnorderedChunkSet() *chunkSet {
	var (
		begin    = -1 // index of the current candidate's first chunk, -1 if none
		count    int  // number of chunks in the current candidate run
		prevTSN  uint32
		complete bool
	)

	for idx, c := range r.unorderedChunks {
		switch {
		case c.beginningFragment:
			// Every beginning fragment starts a fresh candidate run.
			begin, count, prevTSN = idx, 1, c.tsn
		case begin < 0:
			// No candidate in progress; nothing to extend.
			continue
		case c.tsn != prevTSN+1:
			// Gap in the TSN sequence: abandon the candidate.
			begin = -1
			continue
		default:
			prevTSN = c.tsn
			count++
		}

		if c.endingFragment {
			complete = true
			break
		}
	}

	if !complete {
		return nil
	}

	// Copy the run out before closing the hole in the backing array.
	end := begin + count
	run := make([]*chunkPayloadData, count)
	copy(run, r.unorderedChunks[begin:end])
	r.unorderedChunks = append(r.unorderedChunks[:begin], r.unorderedChunks[end:]...)

	result := newChunkSet(0, run[0].payloadType)
	result.chunks = run
	return result
}
// isReadable reports whether a chunk set is available for reading: either
// any unordered set is queued, or the first ordered set is complete and
// its SSN satisfies sna16LTE(ssn, r.nextSSN).
func (r *reassemblyQueue) isReadable() bool {
	if len(r.unordered) > 0 {
		return true
	}
	if len(r.ordered) == 0 {
		return false
	}

	head := r.ordered[0]
	return head.isComplete() && sna16LTE(head.ssn, r.nextSSN)
}
// read copies the next readable chunk set into buf and returns the number
// of bytes it holds together with its payload protocol identifier.
//
// Unordered sets are served before ordered ones. An ordered set is served
// only when it is complete and its SSN is not ahead of r.nextSSN;
// otherwise, or when nothing is queued, read returns errTryAgain.
//
// If buf cannot hold the whole set, read returns the total size of the
// set, a zero PPI and io.ErrShortBuffer, and the set stays queued (buf may
// have been partially written). On success the set is removed, r.nextSSN
// is advanced if the set carried exactly that SSN, and the byte counter is
// reduced by the number of bytes returned.
func (r *reassemblyQueue) read(buf []byte) (int, PayloadProtocolIdentifier, error) {
	fromUnordered := len(r.unordered) > 0

	var cset *chunkSet
	if fromUnordered {
		cset = r.unordered[0]
	} else {
		if len(r.ordered) == 0 {
			return 0, 0, errTryAgain
		}
		cset = r.ordered[0]
		if !cset.isComplete() || sna16GT(cset.ssn, r.nextSSN) {
			return 0, 0, errTryAgain
		}
	}

	// Concatenate the fragments into buf, and keep summing sizes even
	// after running out of room so the caller learns the required size.
	size := 0
	tooSmall := false
	for _, c := range cset.chunks {
		fragLen := len(c.userData)
		if len(buf)-size >= fragLen {
			copy(buf[size:], c.userData)
		} else {
			tooSmall = true
		}
		size += fragLen
	}
	if tooSmall {
		return size, 0, io.ErrShortBuffer
	}

	if fromUnordered {
		r.unordered = r.unordered[1:]
	} else {
		r.ordered = r.ordered[1:]
		if cset.ssn == r.nextSSN {
			r.nextSSN++
		}
	}

	r.subtractNumBytes(size)
	return size, cset.ppi, nil
}
// forwardTSNForOrdered discards every incomplete ordered chunk set whose
// SSN is at or before lastSSN (per sna16LTE), subtracting the bytes of its
// chunks from the byte counter. Complete sets are kept regardless of their
// SSN. Finally, if r.nextSSN is at or before lastSSN it is moved to
// lastSSN+1.
func (r *reassemblyQueue) forwardTSNForOrdered(lastSSN uint16) {
	survivors := []*chunkSet{}
	for _, cs := range r.ordered {
		abandoned := sna16LTE(cs.ssn, lastSSN) && !cs.isComplete()
		if !abandoned {
			survivors = append(survivors, cs)
			continue
		}
		// Release the bytes held by the dropped fragments.
		for _, frag := range cs.chunks {
			r.subtractNumBytes(len(frag.userData))
		}
	}
	r.ordered = survivors

	if sna16LTE(r.nextSSN, lastSSN) {
		r.nextSSN = lastSSN + 1
	}
}
// forwardTSNForUnordered drops the leading chunks of r.unorderedChunks up
// to, but not including, the first chunk whose TSN is greater than
// newCumulativeTSN (per sna32GT). The bytes of every dropped chunk are
// subtracted from the byte counter.
func (r *reassemblyQueue) forwardTSNForUnordered(newCumulativeTSN uint32) {
	// Count the leading chunks that are not ahead of newCumulativeTSN.
	drop := 0
	for drop < len(r.unorderedChunks) && !sna32GT(r.unorderedChunks[drop].tsn, newCumulativeTSN) {
		drop++
	}
	if drop == 0 {
		return
	}

	for _, c := range r.unorderedChunks[:drop] {
		r.subtractNumBytes(len(c.userData))
	}
	r.unorderedChunks = r.unorderedChunks[drop:]
}
// subtractNumBytes lowers the byte counter by nBytes. If the counter
// currently holds less than nBytes it is reset to zero instead of being
// allowed to wrap around.
func (r *reassemblyQueue) subtractNumBytes(nBytes int) {
	if current := atomic.LoadUint64(&r.nBytes); int(current) < nBytes {
		atomic.StoreUint64(&r.nBytes, 0)
		return
	}
	// Adding ^uint64(n-1) is the sync/atomic idiom for subtracting n from
	// an unsigned counter; it is the same value as -uint64(n).
	atomic.AddUint64(&r.nBytes, ^uint64(nBytes-1))
}
// getNumBytes returns the current value of the byte counter, read
// atomically and converted to int.
func (r *reassemblyQueue) getNumBytes() int {
	total := atomic.LoadUint64(&r.nBytes)
	return int(total)
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.