// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package sctp

import (
	
	
	
	
	
	
	
	
	
	

	
	
	
)

// defaultSCTPSrcDstPort is the SCTP port number this package uses for both the
// source and the destination port of an outgoing INIT.
//
// Port 5000 shows up in examples for SDPs used by WebRTC. Since this implementation
// assumes it will be used by DTLS over UDP, the port is only meaningful for
// de-multiplexing and, more so, for verification.
// Example usage: https://www.rfc-editor.org/rfc/rfc8841.html#section-13.1-2
const defaultSCTPSrcDstPort = 5000

// globalMathRandomGenerator is the package-wide random source used when an
// association is created (for example for its verification tag).
// Use global random generator to properly seed by crypto grade random.
var globalMathRandomGenerator = randutil.NewMathRandomGenerator() // nolint:gochecknoglobals

// Association errors.
//
// These are sentinel values. Some are returned wrapped with extra context
// (Shutdown wraps ErrShutdownNonEstablished with %w), so callers should compare
// with errors.Is rather than ==.
var (
	ErrChunk                         = errors.New("abort chunk, with following errors")
	ErrShutdownNonEstablished        = errors.New("shutdown called in non-established state")
	ErrAssociationClosedBeforeConn   = errors.New("association closed before connecting")
	ErrAssociationClosed             = errors.New("association closed")
	ErrSilentlyDiscard               = errors.New("silently discard")
	ErrInitNotStoredToSend           = errors.New("the init not stored to send")
	ErrCookieEchoNotStoredToSend     = errors.New("cookieEcho not stored to send")
	ErrSCTPPacketSourcePortZero      = errors.New("sctp packet must not have a source port of 0")
	ErrSCTPPacketDestinationPortZero = errors.New("sctp packet must not have a destination port of 0")
	ErrInitChunkBundled              = errors.New("init chunk must not be bundled with any other chunk")
	ErrInitChunkVerifyTagNotZero     = errors.New(
		"init chunk expects a verification tag of 0 on the packet when out-of-the-blue",
	)
	ErrHandleInitState            = errors.New("todo: handle Init when in state")
	ErrInitAckNoCookie            = errors.New("no cookie in InitAck")
	ErrInflightQueueTSNPop        = errors.New("unable to be popped from inflight queue TSN")
	ErrTSNRequestNotExist         = errors.New("requested non-existent TSN")
	ErrResetPacketInStateNotExist = errors.New("sending reset packet in non-established state")
	// NOTE(review): "Paramter" is a misspelling of "Parameter". The identifier is
	// exported, so renaming it would break existing callers.
	ErrParamterType               = errors.New("unexpected parameter type")
	ErrPayloadDataStateNotExist   = errors.New("sending payload data in non-established state")
	ErrChunkTypeUnhandled         = errors.New("unhandled chunk type")
	ErrHandshakeInitAck           = errors.New("handshake failed (INIT ACK)")
	ErrHandshakeCookieEcho        = errors.New("handshake failed (COOKIE ECHO)")
	ErrTooManyReconfigRequests    = errors.New("too many outstanding reconfig requests")
)

// Sizing defaults. The three "initial"/"default" values are the fallbacks that
// createAssociation applies when the matching Config field is left at zero.
const (
	receiveMTU            uint32 = 8192 // MTU for inbound packet (from DTLS)
	initialMTU            uint32 = 1228 // initial MTU for outgoing packets (to DTLS)
	// initialRecvBufSize is the default for Config.MaxReceiveBufferSize.
	initialRecvBufSize    uint32 = 1024 * 1024
	// commonHeaderSize and dataChunkHeaderSize are subtracted from the MTU to
	// obtain the maximum DATA chunk payload size (see createAssociation).
	commonHeaderSize      uint32 = 12
	dataChunkHeaderSize   uint32 = 16
	// defaultMaxMessageSize is the default for Config.MaxMessageSize.
	defaultMaxMessageSize uint32 = 65536
)

// association state enums.
//
// The values are stored in Association.state and accessed atomically through
// getState/setState. getAssociationStateString maps each value to its name.
// The numeric values follow declaration order (closed == 0), so the zero value of
// the state field means closed.
const (
	closed uint32 = iota
	cookieWait
	cookieEchoed
	established
	shutdownAckSent
	shutdownPending
	shutdownReceived
	shutdownSent
)

// retransmission timer IDs.
//
// Each ID is passed as the first argument of newRTXTimer in createAssociation to
// identify the corresponding timer (t1Init, t1Cookie, t2Shutdown, t3RTX,
// tReconfig).
const (
	timerT1Init int = iota
	timerT1Cookie
	timerT2Shutdown
	timerT3RTX
	timerReconfig
)

// ack mode (for testing).
//
// Values for Association.ackMode. ackModeNormal is the zero value and therefore
// the mode of a freshly created association.
const (
	ackModeNormal int = iota
	ackModeNoDelay
	ackModeAlwaysDelay
)

// ack transmission state.
//
// Values for Association.ackState. When the state is ackStateImmediate the
// outbound gathering step emits a SACK packet and resets the state to
// ackStateIdle.
const (
	ackStateIdle      int = iota // ack timer is off
	ackStateImmediate            // will send ack immediately
	ackStateDelay                // ack timer is on (ack is being delayed)
)

// other constants.
const (
	// acceptChSize is the buffer capacity of Association.acceptCh.
	acceptChSize = 16
	// avgChunkSize is an estimate of the average chunk size. There is no theory behind
	// this estimate.
	avgChunkSize = 500
	// minTSNOffset is the minimum offset over the cumulative TSN that we will enqueue
	// irrespective of the receive buffer size
	// see getMaxTSNOffset.
	minTSNOffset = 2000
	// maxTSNOffset is the maximum offset over the cumulative TSN that we will enqueue
	// irrespective of the receive buffer size
	// see getMaxTSNOffset.
	maxTSNOffset = 40000
	// maxReconfigRequests is the maximum number of reconfig requests we will keep outstanding.
	maxReconfigRequests = 1000
)

// getAssociationStateString returns the human-readable name of an association
// state constant ("Closed", "CookieWait", ..., "ShutdownAckSent").
//
// A value outside the known set does not fail; it yields
// "Invalid association state <n>", which makes the function safe to call
// directly from log statements.
func getAssociationStateString( uint32) string {
	switch  {
	case closed:
		return "Closed"
	case cookieWait:
		return "CookieWait"
	case cookieEchoed:
		return "CookieEchoed"
	case established:
		return "Established"
	case shutdownPending:
		return "ShutdownPending"
	case shutdownSent:
		return "ShutdownSent"
	case shutdownReceived:
		return "ShutdownReceived"
	case shutdownAckSent:
		return "ShutdownAckSent"
	default:
		return fmt.Sprintf("Invalid association state %d", )
	}
}

// Association represents an SCTP association
// 13.2.  Parameters Necessary per Association (i.e., the TCB)
//
//	Peer        : Tag value to be sent in every packet and is received
//	Verification: in the INIT or INIT ACK chunk.
//	Tag         :
//	State       : A state variable indicating what state the association
//	            : is in, i.e., COOKIE-WAIT, COOKIE-ECHOED, ESTABLISHED,
//	            : SHUTDOWN-PENDING, SHUTDOWN-SENT, SHUTDOWN-RECEIVED,
//	            : SHUTDOWN-ACK-SENT.
//
// Note: No "CLOSED" state is illustrated since if a
// association is "CLOSED" its TCB SHOULD be removed.
// Note: By nature of an Association being constructed with one net.Conn,
// it is not a multi-home supporting implementation of SCTP.
//
// An Association contains a mutex and must not be copied; it is always handled
// through a pointer. Fields documented below as "atomic" are accessed with
// sync/atomic.
type Association struct {
	// Byte counters (atomic); exposed through BytesReceived and BytesSent.
	bytesReceived uint64
	bytesSent     uint64

	lock sync.RWMutex

	// netConn is the underlying transport that the read and write loops use.
	netConn net.Conn

	peerVerificationTag    uint32
	myVerificationTag      uint32
	state                  uint32 // atomic; one of the association state enums
	initialTSN             uint32
	myNextTSN              uint32 // nextTSN
	minTSN2MeasureRTT      uint32 // for RTT measurement
	// will* flags request work from the next outbound gathering pass, which
	// clears each flag as it acts on it.
	willSendForwardTSN     bool
	willRetransmitFast     bool
	willRetransmitReconfig bool

	willSendShutdown         bool
	willSendShutdownAck      bool
	willSendShutdownComplete bool

	willSendAbort      bool
	willSendAbortCause errorCause

	// Reconfig
	myNextRSN        uint32
	reconfigs        map[uint32]*chunkReconfig // outstanding RECONFIG chunks, kept for retransmission
	reconfigRequests map[uint32]*paramOutgoingResetRequest

	// Non-RFC internal data
	sourcePort              uint16
	destinationPort         uint16
	myMaxNumInboundStreams  uint16
	myMaxNumOutboundStreams uint16
	myCookie                *paramStateCookie
	payloadQueue            *receivePayloadQueue
	inflightQueue           *payloadQueue
	pendingQueue            *pendingQueue
	controlQueue            *controlQueue
	mtu                     uint32       // atomic; see MTU
	maxPayloadSize          uint32       // max DATA chunk payload size
	srtt                    atomic.Value // type float64
	cumulativeTSNAckPoint   uint32
	advancedPeerTSNAckPoint uint32
	useForwardTSN           bool
	sendZeroChecksum        bool
	recvZeroChecksum        bool // set from Config.EnableZeroChecksum

	// Congestion control parameters
	maxReceiveBufferSize uint32
	maxMessageSize       uint32
	cwnd                 uint32 // my congestion window size (atomic; see CWND)
	rwnd                 uint32 // calculated peer's receiver windows size (atomic; see RWND)
	ssthresh             uint32 // slow start threshold
	partialBytesAcked    uint32
	inFastRecovery       bool
	fastRecoverExitPoint uint32
	minCwnd              uint32 // Minimum congestion window
	fastRtxWnd           uint32 // Send window for fast retransmit
	cwndCAStep           uint32 // Step of congestion window increase at Congestion Avoidance

	// RTX & Ack timer
	rtoMgr     *rtoManager
	t1Init     *rtxTimer
	t1Cookie   *rtxTimer
	t2Shutdown *rtxTimer
	t3RTX      *rtxTimer
	tReconfig  *rtxTimer
	ackTimer   *ackTimer

	// Chunks stored for retransmission
	storedInit       *chunkInit
	storedCookieEcho *chunkCookieEcho

	streams              map[uint16]*Stream
	acceptCh             chan *Stream  // closed when the read loop exits
	readLoopCloseCh      chan struct{} // closed when the read loop exits
	awakeWriteLoopCh     chan struct{} // capacity 1; wakes the write loop
	closeWriteLoopCh     chan struct{} // closed to make the write loop exit
	handshakeCompletedCh chan error    // delivers the handshake result to Server/Client

	// closeWriteLoopOnce ensures closeWriteLoopCh is closed at most once, since
	// more than one code path attempts to close it.
	closeWriteLoopOnce sync.Once

	// local error
	silentError error

	ackState int
	ackMode  int // for testing

	// stats
	stats *associationStats

	// per inbound packet context
	delayedAckTriggered   bool
	immediateAckTriggered bool

	blockWrite   bool // set from Config.BlockWrite
	writePending bool
	writeNotify  chan struct{} // capacity 1

	name string
	log  logging.LeveledLogger
}

// Config collects the arguments to createAssociation construction into
// a single structure.
type Config struct {
	// Name identifies the association in log output. When empty, the
	// association's pointer value (formatted with %p) is used instead.
	Name                 string
	// NetConn is the transport the association reads from and writes to.
	NetConn              net.Conn
	// MaxReceiveBufferSize defaults to initialRecvBufSize when zero.
	MaxReceiveBufferSize uint32
	// MaxMessageSize defaults to defaultMaxMessageSize when zero.
	MaxMessageSize       uint32
	// EnableZeroChecksum is copied to the association's recvZeroChecksum field.
	EnableZeroChecksum   bool
	// LoggerFactory creates the "sctp" logger. createAssociation calls it
	// unconditionally, so it presumably must be non-nil — TODO confirm.
	LoggerFactory        logging.LoggerFactory
	BlockWrite           bool
	// MTU defaults to initialMTU when zero.
	MTU                  uint32

	// congestion control configuration
	// RTOMax is the maximum retransmission timeout in milliseconds
	RTOMax float64
	// Minimum congestion window
	MinCwnd uint32
	// Send window for fast retransmit
	FastRtxWnd uint32
	// Step of congestion window increase at Congestion Avoidance
	CwndCAStep uint32
}

// Server accepts a SCTP stream over a conn.
//
// It creates an association from the given Config and starts it without sending
// an INIT (init(false)), then blocks until one of two things happens:
//   - a handshake result arrives: the association is returned on a nil result,
//     otherwise the reported error is returned;
//   - the read loop terminates first: ErrAssociationClosedBeforeConn is returned.
func ( Config) (*Association, error) {
	 := createAssociation()
	.init(false)

	select {
	case  := <-.handshakeCompletedCh:
		if  != nil {
			return nil, 
		}

		return , nil
	case <-.readLoopCloseCh:
		return nil, ErrAssociationClosedBeforeConn
	}
}

// Client opens a SCTP stream over a conn.
//
// It delegates to createClientWithContext with context.Background(), so the wait
// for the handshake cannot be canceled by the caller through a context.
func ( Config) (*Association, error) {
	return createClientWithContext(context.Background(), )
}

// createClientWithContext creates an association from the given Config, starts
// it as the initiating side (init(true), which sends an INIT) and blocks until
// the handshake finishes, the read loop ends, or the context is done.
//
// Returns:
//   - the association, when the handshake result is nil;
//   - the handshake error, when one is reported;
//   - ErrAssociationClosedBeforeConn, when the read loop ends first;
//   - the context's error, when the context is done first. In that case the
//     association is closed before returning; the error from Close is
//     deliberately ignored.
func createClientWithContext( context.Context,  Config) (*Association, error) {
	 := createAssociation()
	.init(true)

	select {
	case <-.Done():
		.log.Errorf("[%s] client handshake canceled: state=%s", .name, getAssociationStateString(.getState()))
		.Close() // nolint:errcheck,gosec

		return nil, .Err()
	case  := <-.handshakeCompletedCh:
		if  != nil {
			return nil, 
		}

		return , nil
	case <-.readLoopCloseCh:
		return nil, ErrAssociationClosedBeforeConn
	}
}

// createAssociation builds a new Association from the given Config without
// starting any goroutine or sending anything; the caller starts it with init.
//
// Zero-valued MaxReceiveBufferSize, MaxMessageSize and MTU fall back to
// initialRecvBufSize, defaultMaxMessageSize and initialMTU respectively. The
// association starts in the closed state with freshly allocated queues, maps,
// channels and timers, a random verification tag, and an initial congestion
// window computed from the MTU.
func createAssociation( Config) *Association {
	// Apply defaults for unset (zero) configuration values.
	 := .MaxReceiveBufferSize
	if  == 0 {
		 = initialRecvBufSize
	}

	 := .MaxMessageSize
	if  == 0 {
		 = defaultMaxMessageSize
	}

	 := .MTU
	if  == 0 {
		 = initialMTU
	}

	 := globalMathRandomGenerator.Uint32()
	 := &Association{
		netConn:              .NetConn,
		maxReceiveBufferSize: ,
		maxMessageSize:       ,
		minCwnd:              .MinCwnd,
		fastRtxWnd:           .FastRtxWnd,
		cwndCAStep:           .CwndCAStep,

		// These two max values have us not need to follow
		// 5.1.1 where this peer may be incapable of supporting
		// the requested amount of outbound streams from the other
		// peer.
		myMaxNumOutboundStreams: math.MaxUint16,
		myMaxNumInboundStreams:  math.MaxUint16,

		payloadQueue:            newReceivePayloadQueue(getMaxTSNOffset()),
		inflightQueue:           newPayloadQueue(),
		pendingQueue:            newPendingQueue(),
		controlQueue:            newControlQueue(),
		mtu:                     ,
		// Payload budget of a DATA chunk: what is left of the MTU after the
		// common header and the DATA chunk header.
		maxPayloadSize:           - (commonHeaderSize + dataChunkHeaderSize),
		myVerificationTag:       globalMathRandomGenerator.Uint32(),
		initialTSN:              ,
		myNextTSN:               ,
		myNextRSN:               ,
		minTSN2MeasureRTT:       ,
		state:                   closed,
		rtoMgr:                  newRTOManager(.RTOMax),
		streams:                 map[uint16]*Stream{},
		reconfigs:               map[uint32]*chunkReconfig{},
		reconfigRequests:        map[uint32]*paramOutgoingResetRequest{},
		acceptCh:                make(chan *Stream, acceptChSize),
		readLoopCloseCh:         make(chan struct{}),
		// Capacity 1 so that a wake-up can be posted without blocking.
		awakeWriteLoopCh:        make(chan struct{}, 1),
		closeWriteLoopCh:        make(chan struct{}),
		handshakeCompletedCh:    make(chan error),
		cumulativeTSNAckPoint:    - 1,
		advancedPeerTSNAckPoint:  - 1,
		recvZeroChecksum:        .EnableZeroChecksum,
		silentError:             ErrSilentlyDiscard,
		stats:                   &associationStats{},
		log:                     .LoggerFactory.NewLogger("sctp"),
		name:                    .Name,
		blockWrite:              .BlockWrite,
		writeNotify:             make(chan struct{}, 1),
	}

	// Fall back to the pointer value so that log lines remain attributable.
	if .name == "" {
		.name = fmt.Sprintf("%p", )
	}

	// RFC 4960 Sec 7.2.1
	//  o  The initial cwnd before DATA transmission or after a sufficiently
	//     long idle period MUST be set to min(4*MTU, max (2*MTU, 4380
	//     bytes)).
	.setCWND(min32(4*.MTU(), max32(2*.MTU(), 4380)))
	.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)",
		.name, .CWND(), .ssthresh, .inflightQueue.getNumBytes())

	// Store a float64 up front: SRTT type-asserts the stored value to float64.
	.srtt.Store(float64(0))
	.t1Init = newRTXTimer(timerT1Init, , maxInitRetrans, .RTOMax)
	.t1Cookie = newRTXTimer(timerT1Cookie, , maxInitRetrans, .RTOMax)
	.t2Shutdown = newRTXTimer(timerT2Shutdown, , noMaxRetrans, .RTOMax)
	.t3RTX = newRTXTimer(timerT3RTX, , noMaxRetrans, .RTOMax)
	.tReconfig = newRTXTimer(timerReconfig, , noMaxRetrans, .RTOMax)
	.ackTimer = newAckTimer()

	return 
}

// init starts the association. It launches the read loop and the write loop in
// their own goroutines and, when the boolean argument is true (the initiating
// side, as used by the client path), builds the INIT chunk, stores it for
// retransmission, queues it for sending, enters the cookieWait state and starts
// the T1-init timer. With false (as used by Server) no INIT is sent.
//
// The lock is held for the whole call. A failure to queue the INIT is only
// logged; the state change and timer start still happen.
func ( *Association) ( bool) {
	.lock.Lock()
	defer .lock.Unlock()

	go .readLoop()
	go .writeLoop()

	if  {
		 := &chunkInit{}
		.initialTSN = .myNextTSN
		.numOutboundStreams = .myMaxNumOutboundStreams
		.numInboundStreams = .myMaxNumInboundStreams
		.initiateTag = .myVerificationTag
		.advertisedReceiverWindowCredit = .maxReceiveBufferSize
		setSupportedExtensions(&.chunkInitCommon)

		// Advertise zero-checksum acceptance only when it was enabled in Config.
		if .recvZeroChecksum {
			.params = append(.params, &paramZeroChecksumAcceptable{edmid: dtlsErrorDetectionMethod})
		}

		.storedInit = 

		 := .sendInit()
		if  != nil {
			.log.Errorf("[%s] failed to send init: %s", .name, .Error())
		}

		// After sending the INIT chunk, "A" starts the T1-init timer and enters the COOKIE-WAIT state.
		// Note: ideally we would set state after the timer starts but since we don't do this in an atomic
		// set + timer-start, it's safer to just set the state first so that we don't have a timer expiration
		// race.
		.setState(cookieWait)
		.t1Init.start(.rtoMgr.getRTO())
	}
}

// sendInit queues the stored INIT chunk on the control queue, in a packet with a
// zero verification tag, and wakes the write loop. The packet is not written to
// the transport here; the write loop does that.
//
// It returns ErrInitNotStoredToSend when no INIT chunk has been stored, and nil
// otherwise.
//
// caller must hold a.lock.
func ( *Association) () error {
	.log.Debugf("[%s] sending INIT", .name)
	if .storedInit == nil {
		return ErrInitNotStoredToSend
	}

	 := &packet{}
	.verificationTag = 0
	.sourcePort = defaultSCTPSrcDstPort
	.destinationPort = defaultSCTPSrcDstPort
	.sourcePort = .sourcePort
	.destinationPort = .destinationPort

	.chunks = []chunk{.storedInit}

	.controlQueue.push()
	.awakeWriteLoop()

	return nil
}

// Queues the stored COOKIE-ECHO chunk on the control queue, in a packet carrying
// the peer's verification tag and the association's ports, and wakes the write
// loop. The packet is written to the transport by the write loop, not here.
//
// It returns ErrCookieEchoNotStoredToSend when no COOKIE-ECHO chunk has been
// stored, and nil otherwise.
//
// caller must hold a.lock.
func ( *Association) () error {
	if .storedCookieEcho == nil {
		return ErrCookieEchoNotStoredToSend
	}

	.log.Debugf("[%s] sending COOKIE-ECHO", .name)

	 := &packet{}
	.verificationTag = .peerVerificationTag
	.sourcePort = .sourcePort
	.destinationPort = .destinationPort
	.chunks = []chunk{.storedCookieEcho}

	.controlQueue.push()
	.awakeWriteLoop()

	return nil
}

// Shutdown initiates the shutdown sequence. The method blocks until the
// shutdown sequence is completed and the connection is closed, or until the
// passed context is done, in which case the context's error is returned.
//
// If the association is not in the established state, an error wrapping
// ErrShutdownNonEstablished is returned immediately and nothing else happens.
//
// NOTE(review): the state check and the transition to shutdownPending are two
// separate atomic operations performed before the lock is taken, so they are not
// atomic as a pair.
func ( *Association) ( context.Context) error {
	.log.Debugf("[%s] closing association..", .name)

	 := .getState()

	if  != established {
		return fmt.Errorf("%w: shutdown %s", ErrShutdownNonEstablished, .name)
	}

	// Attempt a graceful shutdown.
	.setState(shutdownPending)

	.lock.Lock()

	// With data still in flight the association stays in shutdownPending; the
	// SHUTDOWN is not requested here in that case.
	if .inflightQueue.size() == 0 {
		// No more outstanding, send shutdown.
		.willSendShutdown = true
		.awakeWriteLoop()
		.setState(shutdownSent)
	}

	.lock.Unlock()

	// closeWriteLoopCh being closed signals that the write loop has been told to
	// stop, which is treated as completion of the shutdown.
	select {
	case <-.closeWriteLoopCh:
		return nil
	case <-.Done():
		return .Err()
	}
}

// Close ends the SCTP Association and cleans up any state.
//
// It performs the internal close (state set to closed, transport closed, timers
// closed, write loop told to exit), then blocks until the read loop has exited,
// logs the association statistics at debug level, and returns the error produced
// by closing the underlying transport.
func ( *Association) () error {
	.log.Debugf("[%s] closing association..", .name)

	 := .close()

	// Wait for readLoop to end
	<-.readLoopCloseCh

	.log.Debugf("[%s] association closed", .name)
	.log.Debugf("[%s] stats nPackets (in) : %d", .name, .stats.getNumPacketsReceived())
	.log.Debugf("[%s] stats nPackets (out) : %d", .name, .stats.getNumPacketsSent())
	.log.Debugf("[%s] stats nDATAs (in) : %d", .name, .stats.getNumDATAs())
	.log.Debugf("[%s] stats nSACKs (in) : %d", .name, .stats.getNumSACKsReceived())
	.log.Debugf("[%s] stats nSACKs (out) : %d", .name, .stats.getNumSACKsSent())
	.log.Debugf("[%s] stats nT3Timeouts : %d", .name, .stats.getNumT3Timeouts())
	.log.Debugf("[%s] stats nAckTimeouts: %d", .name, .stats.getNumAckTimeouts())
	.log.Debugf("[%s] stats nFastRetrans: %d", .name, .stats.getNumFastRetrans())

	return 
}

// close is the non-blocking part of closing the association: it sets the state
// to closed, closes the underlying transport, closes all timers and closes
// closeWriteLoopCh (at most once) so that the write loop exits. Unlike Close it
// does not wait for the read loop.
//
// It returns the error from closing the transport.
func ( *Association) () error {
	.log.Debugf("[%s] closing association..", .name)

	.setState(closed)

	 := .netConn.Close()

	.closeAllTimers()

	// awake writeLoop to exit
	.closeWriteLoopOnce.Do(func() { close(.closeWriteLoopCh) })

	return 
}

// Abort sends the abort packet with user initiated abort and immediately
// closes the connection.
//
// The given string is carried as the upper-layer abort reason. The method only
// records the request and wakes the write loop; the write loop sends the ABORT
// and then closes the association. Abort blocks until the read loop has exited.
func ( *Association) ( string) {
	.log.Debugf("[%s] aborting association: %s", .name, )

	.lock.Lock()

	.willSendAbort = true
	.willSendAbortCause = &errorCauseUserInitiatedAbort{
		upperLayerAbortReason: []byte(),
	}

	.lock.Unlock()

	.awakeWriteLoop()

	// Wait for readLoop to end
	<-.readLoopCloseCh
}

// closeAllTimers closes every timer owned by the association: the five
// retransmission timers (T1-init, T1-cookie, T2-shutdown, T3-rtx, reconfig) and
// the ack timer.
func ( *Association) () {
	// Close all retransmission & ack timers
	.t1Init.close()
	.t1Cookie.close()
	.t2Shutdown.close()
	.t3RTX.close()
	.tReconfig.close()
	.ackTimer.close()
}

// readLoop reads packets from the underlying transport and hands each one to
// handleInbound until a read fails or handleInbound returns an error.
//
// On exit (via the deferred function) it tells the write loop to stop, sets the
// state to closed, unregisters every stream with the error that ended the loop,
// and closes acceptCh and readLoopCloseCh. Closing readLoopCloseCh is what
// unblocks Close, Abort and the Server/Client handshake wait.
func ( *Association) () {
	var  error
	defer func() {
		// also stop writeLoop, otherwise writeLoop can be leaked
		// if connection is lost when there is no writing packet.
		.closeWriteLoopOnce.Do(func() { close(.closeWriteLoopCh) })

		.lock.Lock()
		.setState(closed)
		for ,  := range .streams {
			.unregisterStream(, )
		}
		.lock.Unlock()
		close(.acceptCh)
		close(.readLoopCloseCh)

		.log.Debugf("[%s] association closed", .name)
		.log.Debugf("[%s] stats nDATAs (in) : %d", .name, .stats.getNumDATAs())
		.log.Debugf("[%s] stats nSACKs (in) : %d", .name, .stats.getNumSACKsReceived())
		.log.Debugf("[%s] stats nT3Timeouts : %d", .name, .stats.getNumT3Timeouts())
		.log.Debugf("[%s] stats nAckTimeouts: %d", .name, .stats.getNumAckTimeouts())
		.log.Debugf("[%s] stats nFastRetrans: %d", .name, .stats.getNumFastRetrans())
	}()

	.log.Debugf("[%s] readLoop entered", .name)
	// One receive buffer of receiveMTU bytes is reused for every read.
	 := make([]byte, receiveMTU)

	for {
		,  := .netConn.Read()
		if  != nil {
			 = 

			break
		}
		// Make a buffer sized to what we read, then copy the data we
		// read from the underlying transport. We do this because the
		// user data is passed to the reassembly queue without
		// copying.
		 := make([]byte, )
		copy(, [:])
		atomic.AddUint64(&.bytesReceived, uint64()) //nolint:gosec // G115
		if  = .handleInbound();  != nil {
			 = 

			break
		}
	}

	.log.Debugf("[%s] readLoop exited %s", .name, )
}

// writeLoop repeatedly gathers outbound packets and writes them to the
// underlying transport.
//
// Each iteration:
//  1. gathers the raw packets to send (gatherOutbound);
//  2. writes them one by one, counting bytes and packets sent; a write error
//     ends the loop (it is logged as a warning unless it is io.EOF);
//  3. if gatherOutbound reported that the association must be closed after this
//     final send, closes the association and returns;
//  4. otherwise sleeps until it is woken up or told to close.
//
// When the loop is left through a write error or the close signal, the state is
// set to closed and all timers are closed.
func ( *Association) () {
	.log.Debugf("[%s] writeLoop entered", .name)
	defer .log.Debugf("[%s] writeLoop exited", .name)

:
	for {
		,  := .gatherOutbound()

		for ,  := range  {
			,  := .netConn.Write()
			if  != nil {
				if !errors.Is(, io.EOF) {
					.log.Warnf("[%s] failed to write packets on netConn: %v", .name, )
				}
				.log.Debugf("[%s] writeLoop ended", .name)

				break 
			}
			atomic.AddUint64(&.bytesSent, uint64(len()))
			.stats.incPacketsSent()
		}

		if ! {
			if  := .close();  != nil {
				.log.Warnf("[%s] failed to close association: %v", .name, )
			}

			return
		}

		select {
		case <-.awakeWriteLoopCh:
		case <-.closeWriteLoopCh:
			break 
		}
	}

	.setState(closed)
	.closeAllTimers()
}

// awakeWriteLoop signals the write loop that there may be something to send.
// The send is non-blocking: if a wake-up is already pending on the channel, this
// call is a no-op, so it never blocks the caller.
func ( *Association) () {
	select {
	case .awakeWriteLoopCh <- struct{}{}:
	default:
	}
}

// Reports the value of the blockWrite field, which createAssociation sets from
// Config.BlockWrite.
func ( *Association) () bool {
	return .blockWrite
}

// Mark the association as writable and unblock the waiting write,
// the caller should hold the association write lock.
//
// It clears writePending and posts a notification on writeNotify without
// blocking; if a notification is already buffered, the new one is dropped.
func ( *Association) () {
	.writePending = false
	select {
	case .writeNotify <- struct{}{}:
	default:
	}
}

// unregisterStream un-registers a stream from the association
// The caller should hold the association write lock.
//
// It removes the stream's identifier from the streams map, records the given
// error as the stream's read error and broadcasts on the stream's read notifier
// (presumably to wake up blocked readers — TODO confirm). The lock taken inside
// the body is released on return.
func ( *Association) ( *Stream,  error) {
	.lock.Lock()
	defer .lock.Unlock()

	delete(.streams, .streamIdentifier)
	.readErr = 
	.readNotifier.Broadcast()
}

// chunkMandatoryChecksum reports whether the given chunk list contains an INIT
// (*chunkInit) or a COOKIE ECHO (*chunkCookieEcho) chunk. Packets carrying such a
// chunk are always marshalled with the checksum enabled, even when the
// association is configured to send a zero checksum. An empty or nil list
// yields false.
func chunkMandatoryChecksum( []chunk) bool {
	for ,  := range  {
		switch .(type) {
		case *chunkInit, *chunkCookieEcho:
			return true
		}
	}

	return false
}

// marshalPacket serializes the given packet into raw bytes. The boolean passed
// to the packet's marshal method is true unless sendZeroChecksum is set and none
// of the packet's chunks makes the checksum mandatory (see
// chunkMandatoryChecksum).
func ( *Association) ( *packet) ([]byte, error) {
	return .marshal(!.sendZeroChecksum || chunkMandatoryChecksum(.chunks))
}

// unmarshalPacket parses raw bytes into a packet. The boolean passed to the
// packet's unmarshal method is the negation of recvZeroChecksum. On a parse
// failure it returns (nil, error); a partially parsed packet is never returned.
func ( *Association) ( []byte) (*packet, error) {
	 := &packet{}
	if  := .unmarshal(!.recvZeroChecksum, );  != nil {
		return nil, 
	}

	return , nil
}

// handleInbound parses incoming raw packets.
//
// A packet that cannot be parsed, or that fails checkPacket validation, is
// logged and dropped: nil is returned so that the read loop keeps running.
// Otherwise every chunk of the packet is passed to handleChunk, bracketed by
// handleChunksStart and handleChunksEnd. The first error returned by handleChunk
// is returned immediately (handleChunksEnd is then skipped), which terminates
// the read loop.
func ( *Association) ( []byte) error {
	,  := .unmarshalPacket()
	if  != nil {
		.log.Warnf("[%s] unable to parse SCTP packet %s", .name, )

		return nil
	}

	if  := checkPacket();  != nil {
		.log.Warnf("[%s] failed validating packet %s", .name, )

		return nil
	}

	.handleChunksStart()

	for ,  := range .chunks {
		if  := .handleChunk(, );  != nil {
			return 
		}
	}

	.handleChunksEnd()

	return nil
}

// gatherDataPacketsToRetransmit serializes the DATA packets that are due for
// retransmission (as returned by getDataPacketsToRetransmit) and appends them to
// the given list of raw packets, which it returns. A packet that fails to
// serialize is logged and skipped.
//
// The caller should hold the lock.
func ( *Association) ( [][]byte) [][]byte {
	for ,  := range .getDataPacketsToRetransmit() {
		,  := .marshalPacket()
		if  != nil {
			.log.Warnf("[%s] failed to serialize a DATA packet to be retransmitted", .name)

			continue
		}
		 = append(, )
	}

	return 
}

// gatherOutboundDataAndReconfigPackets appends to the given list of raw packets,
// and returns it, in this order:
//  1. new DATA packets popped from the pending queue (starting the T3-rtx timer
//     when there is anything to send);
//  2. the stored RECONFIG chunks, when a RECONFIG retransmission was requested;
//  3. a new RECONFIG chunk carrying an outgoing reset request, when the pending
//     queue also returned something to reset.
//
// The reconfig timer is started whenever RECONFIG chunks remain stored.
// Packets that fail to serialize are logged and skipped.
//
// The caller should hold the lock.
//
//nolint:cyclop
func ( *Association) ( [][]byte) [][]byte {
	// Pop unsent data chunks from the pending queue to send as much as
	// cwnd and rwnd allow.
	,  := .popPendingDataChunksToSend()
	if len() > 0 {
		// Start timer. (noop if already started)
		.log.Tracef("[%s] T3-rtx timer start (pt1)", .name)
		.t3RTX.start(.rtoMgr.getRTO())
		for ,  := range .bundleDataChunksIntoPackets() {
			,  := .marshalPacket()
			if  != nil {
				.log.Warnf("[%s] failed to serialize a DATA packet", .name)

				continue
			}
			 = append(, )
		}
	}

	if len() > 0 || .willRetransmitReconfig { //nolint:nestif
		if .willRetransmitReconfig {
			// The retransmission request is consumed here; each stored RECONFIG
			// chunk is sent in its own packet.
			.willRetransmitReconfig = false
			.log.Debugf("[%s] retransmit %d RECONFIG chunk(s)", .name, len(.reconfigs))
			for ,  := range .reconfigs {
				 := .createPacket([]chunk{})
				,  := .marshalPacket()
				if  != nil {
					.log.Warnf("[%s] failed to serialize a RECONFIG packet to be retransmitted", .name)
				} else {
					 = append(, )
				}
			}
		}

		if len() > 0 {
			// The last TSN assigned so far is myNextTSN-1.
			 := .generateNextRSN()
			 := .myNextTSN - 1
			 := &chunkReconfig{
				paramA: &paramOutgoingResetRequest{
					reconfigRequestSequenceNumber: ,
					senderLastTSN:                 ,
					streamIdentifiers:             ,
				},
			}
			.reconfigs[] =  // store in the map for retransmission
			.log.Debugf("[%s] sending RECONFIG: rsn=%d tsn=%d streams=%v",
				.name, , .myNextTSN-1, )
			 := .createPacket([]chunk{})
			,  := .marshalPacket()
			if  != nil {
				.log.Warnf("[%s] failed to serialize a RECONFIG packet to be transmitted", .name)
			} else {
				 = append(, )
			}
		}

		if len(.reconfigs) > 0 {
			.tReconfig.start(.rtoMgr.getRTO())
		}
	}

	return 
}

// gatherOutboundFastRetransmissionPackets appends fast-retransmission DATA
// packets to the given list of raw packets and returns it. It does nothing
// unless a fast retransmission was requested (willRetransmitFast), and it clears
// that request.
//
// It walks the inflight queue upwards from cumulativeTSNAckPoint+1 and selects
// chunks that are not acked, not abandoned, have been sent at most once and have
// a miss indicator of at least 3. Each selected chunk has its send count
// incremented and its partial-reliability status re-checked. The size budget for
// the selection is the larger of the MTU and fastRtxWnd.
//
// The caller should hold the lock.
//
//nolint:cyclop
func ( *Association) ( [][]byte) [][]byte {
	if .willRetransmitFast { //nolint:nestif
		.willRetransmitFast = false

		 := []*chunkPayloadData{}
		// Running size of the packet being filled, starting with the common header.
		 := commonHeaderSize

		 := .MTU()
		if  < .fastRtxWnd {
			 = .fastRtxWnd
		}
		for  := 0; ; ++ {
			,  := .inflightQueue.get(.cumulativeTSNAckPoint + uint32() + 1) //nolint:gosec // G115
			if ! {
				break // end of pending data
			}

			if .acked || .abandoned() {
				continue
			}

			// Skip chunks that were already retransmitted or that have not yet
			// been reported missing often enough.
			if .nSent > 1 || .missIndicator < 3 {
				continue
			}

			// RFC 4960 Sec 7.2.4 Fast Retransmit on Gap Reports
			//  3)  Determine how many of the earliest (i.e., lowest TSN) DATA chunks
			//      marked for retransmission will fit into a single packet, subject
			//      to constraint of the path MTU of the destination transport
			//      address to which the packet is being sent.  Call this value K.
			//      Retransmit those K DATA chunks in a single packet.  When a Fast
			//      Retransmit is being performed, the sender SHOULD ignore the value
			//      of cwnd and SHOULD NOT delay retransmission for this single
			//		packet.

			 := dataChunkHeaderSize + uint32(len(.userData)) //nolint:gosec // G115
			if  < + {
				break
			}

			 += 
			.stats.incFastRetrans()
			.nSent++
			.checkPartialReliabilityStatus()
			 = append(, )
			.log.Tracef("[%s] fast-retransmit: tsn=%d sent=%d htna=%d",
				.name, .tsn, .nSent, .fastRecoverExitPoint)
		}

		if len() > 0 {
			for ,  := range .bundleDataChunksIntoPackets() {
				,  := .marshalPacket()
				if  != nil {
					.log.Warnf("[%s] failed to serialize a DATA packet to be fast-retransmitted", .name)

					continue
				}
				 = append(, )
			}
		}
	}

	return 
}

// gatherOutboundSackPackets appends a SACK packet to the given list of raw
// packets, and returns the list, when the ack state is ackStateImmediate. The ack
// state is reset to ackStateIdle and the SACK-sent counter is incremented before
// serialization, so both happen even if serialization fails (the failure is only
// logged). In any other ack state the list is returned unchanged.
//
// The caller should hold the lock.
func ( *Association) ( [][]byte) [][]byte {
	if .ackState == ackStateImmediate {
		.ackState = ackStateIdle
		 := .createSelectiveAckChunk()
		.stats.incSACKsSent()
		.log.Debugf("[%s] sending SACK: %s", .name, )
		,  := .marshalPacket(.createPacket([]chunk{}))
		if  != nil {
			.log.Warnf("[%s] failed to serialize a SACK packet", .name)
		} else {
			 = append(, )
		}
	}

	return 
}

// gatherOutboundForwardTSNPackets appends a FORWARD TSN packet to the given list
// of raw packets, and returns the list, when one was requested
// (willSendForwardTSN) and the advanced peer TSN ack point is ahead of the
// cumulative TSN ack point (compared with sna32GT). The request flag is cleared
// in either case. A serialization failure is only logged.
//
// The caller should hold the lock.
func ( *Association) ( [][]byte) [][]byte {
	if .willSendForwardTSN {
		.willSendForwardTSN = false
		if sna32GT(.advancedPeerTSNAckPoint, .cumulativeTSNAckPoint) {
			 := .createForwardTSN()
			,  := .marshalPacket(.createPacket([]chunk{}))
			if  != nil {
				.log.Warnf("[%s] failed to serialize a Forward TSN packet", .name)
			} else {
				 = append(, )
			}
		}
	}

	return 
}

// gatherOutboundShutdownPackets appends at most one shutdown-sequence packet to
// the given list of raw packets, chosen by the first pending request flag in
// this order: SHUTDOWN, SHUTDOWN ACK, SHUTDOWN COMPLETE. The chosen flag is
// cleared. Sending SHUTDOWN or SHUTDOWN ACK also starts the T2-shutdown timer.
//
// It returns the (possibly extended) list and a boolean that is false only when
// a SHUTDOWN COMPLETE was successfully serialized, meaning the association
// should be closed after this final send. Serialization failures are logged and
// leave the boolean true.
func ( *Association) ( [][]byte) ([][]byte, bool) {
	 := true

	switch {
	case .willSendShutdown:
		.willSendShutdown = false

		 := &chunkShutdown{
			cumulativeTSNAck: .cumulativeTSNAckPoint,
		}

		,  := .marshalPacket(.createPacket([]chunk{}))
		if  != nil {
			.log.Warnf("[%s] failed to serialize a Shutdown packet", .name)
		} else {
			.t2Shutdown.start(.rtoMgr.getRTO())
			 = append(, )
		}
	case .willSendShutdownAck:
		.willSendShutdownAck = false

		 := &chunkShutdownAck{}

		,  := .marshalPacket(.createPacket([]chunk{}))
		if  != nil {
			.log.Warnf("[%s] failed to serialize a ShutdownAck packet", .name)
		} else {
			.t2Shutdown.start(.rtoMgr.getRTO())
			 = append(, )
		}
	case .willSendShutdownComplete:
		.willSendShutdownComplete = false

		 := &chunkShutdownComplete{}

		,  := .marshalPacket(.createPacket([]chunk{}))
		if  != nil {
			.log.Warnf("[%s] failed to serialize a ShutdownComplete packet", .name)
		} else {
			 = append(, )
			// This is the final packet of the association.
			 = false
		}
	}

	return , 
}

// gatherAbortPacket builds and serializes an ABORT packet. It consumes the
// pending abort request: willSendAbort and willSendAbortCause are reset before
// serialization, so the request is cleared even when serialization fails. If a
// cause was recorded it becomes the ABORT chunk's only error cause; otherwise
// the chunk carries none.
//
// It returns the raw packet bytes and the serialization error, if any.
func ( *Association) () ([]byte, error) {
	 := .willSendAbortCause

	.willSendAbort = false
	.willSendAbortCause = nil

	 := &chunkAbort{}

	if  != nil {
		.errorCauses = []errorCause{}
	}

	,  := .marshalPacket(.createPacket([]chunk{}))

	return , 
}

// gatherOutbound gathers outgoing packets. The returned bool value set to
// false means the association should be closed down after the final send.
//
// The lock is held for the whole call. A pending abort takes precedence over
// everything else: only the ABORT packet is returned (or nothing, if it fails to
// serialize), together with false. Otherwise all queued control packets are
// serialized first, followed by whatever the current state allows:
//   - established: retransmissions, new DATA and RECONFIG, fast retransmissions,
//     SACK and FORWARD TSN;
//   - shutdownPending, shutdownSent, shutdownReceived: retransmissions, fast
//     retransmissions, SACK and shutdown packets (no new DATA);
//   - shutdownAckSent: shutdown packets only;
//   - any other state: control packets only.
func ( *Association) () ([][]byte, bool) {
	.lock.Lock()
	defer .lock.Unlock()

	if .willSendAbort {
		,  := .gatherAbortPacket()
		if  != nil {
			.log.Warnf("[%s] failed to serialize an abort packet", .name)

			return nil, false
		}

		return [][]byte{}, false
	}

	 := [][]byte{}

	if .controlQueue.size() > 0 {
		for ,  := range .controlQueue.popAll() {
			,  := .marshalPacket()
			if  != nil {
				.log.Warnf("[%s] failed to serialize a control packet", .name)

				continue
			}
			 = append(, )
		}
	}

	 := .getState()

	 := true

	switch  {
	case established:
		 = .gatherDataPacketsToRetransmit()
		 = .gatherOutboundDataAndReconfigPackets()
		 = .gatherOutboundFastRetransmissionPackets()
		 = .gatherOutboundSackPackets()
		 = .gatherOutboundForwardTSNPackets()
	case shutdownPending, shutdownSent, shutdownReceived:
		 = .gatherDataPacketsToRetransmit()
		 = .gatherOutboundFastRetransmissionPackets()
		 = .gatherOutboundSackPackets()
		,  = .gatherOutboundShutdownPackets()
	case shutdownAckSent:
		,  = .gatherOutboundShutdownPackets()
	}

	return , 
}

// checkPacket validates a parsed packet against rules that apply to every
// inbound packet, and returns the first violation found:
//   - ErrSCTPPacketSourcePortZero if the source port is 0;
//   - ErrSCTPPacketDestinationPortZero if the destination port is 0;
//   - ErrInitChunkBundled if an INIT chunk is not the only chunk in the packet;
//   - ErrInitChunkVerifyTagNotZero if a packet carrying an INIT chunk has a
//     non-zero verification tag.
//
// It returns nil when the packet passes all checks.
func checkPacket( *packet) error {
	// All packets must adhere to these rules

	// This is the SCTP sender's port number.  It can be used by the
	// receiver in combination with the source IP address, the SCTP
	// destination port, and possibly the destination IP address to
	// identify the association to which this packet belongs.  The port
	// number 0 MUST NOT be used.
	if .sourcePort == 0 {
		return ErrSCTPPacketSourcePortZero
	}

	// This is the SCTP port number to which this packet is destined.
	// The receiving host will use this port number to de-multiplex the
	// SCTP packet to the correct receiving endpoint/application.  The
	// port number 0 MUST NOT be used.
	if .destinationPort == 0 {
		return ErrSCTPPacketDestinationPortZero
	}

	// Check values on the packet that are specific to a particular chunk type
	for ,  := range .chunks {
		switch .(type) { // nolint:gocritic
		case *chunkInit:
			// An INIT or INIT ACK chunk MUST NOT be bundled with any other chunk.
			// They MUST be the only chunks present in the SCTP packets that carry
			// them.
			if len(.chunks) != 1 {
				return ErrInitChunkBundled
			}

			// A packet containing an INIT chunk MUST have a zero Verification
			// Tag.
			if .verificationTag != 0 {
				return ErrInitChunkVerifyTagNotZero
			}
		}
	}

	return nil
}

// min16 returns the smaller of its two uint16 arguments.
func min16(,  uint16) uint16 {
	if  <  {
		return 
	}

	return 
}

// max32 returns the larger of its two uint32 arguments.
func max32(,  uint32) uint32 {
	if  >  {
		return 
	}

	return 
}

// min32 returns the smaller of its two uint32 arguments.
func min32(,  uint32) uint32 {
	if  <  {
		return 
	}

	return 
}

// peerLastTSN returns the last received cumulative TSN, as tracked by the
// receive payload queue.
func ( *Association) () uint32 {
	return .payloadQueue.getcumulativeTSN()
}

// setState atomically sets the state of the Association.
// The caller should hold the lock.
//
// The previous state is obtained with an atomic swap; a debug line is logged
// only when the new state differs from the previous one.
func ( *Association) ( uint32) {
	 := atomic.SwapUint32(&.state, )
	if  !=  {
		.log.Debugf("[%s] state change: '%s' => '%s'",
			.name,
			getAssociationStateString(),
			getAssociationStateString())
	}
}

// getState atomically returns the state of the Association.
// The result is one of the association state enums (closed, cookieWait, ...);
// it does not take the lock.
func ( *Association) () uint32 {
	return atomic.LoadUint32(&.state)
}

// BytesSent returns the number of bytes sent.
// The counter is read atomically; the write loop increases it by the length of
// each raw packet successfully written to the transport.
func ( *Association) () uint64 {
	return atomic.LoadUint64(&.bytesSent)
}

// BytesReceived returns the number of bytes received.
// The counter is read atomically; the read loop increases it by the size of each
// successful read from the transport, before the packet is parsed.
func ( *Association) () uint64 {
	return atomic.LoadUint64(&.bytesReceived)
}

// MTU returns the association's current MTU.
// The value is read atomically and does not require the lock.
func ( *Association) () uint32 {
	return atomic.LoadUint32(&.mtu)
}

// CWND returns the association's current congestion window (cwnd).
// The value is read atomically and does not require the lock.
func ( *Association) () uint32 {
	return atomic.LoadUint32(&.cwnd)
}

// setCWND atomically stores a new congestion window. A value below minCwnd is
// raised to minCwnd first, so the stored window never drops under the configured
// minimum.
func ( *Association) ( uint32) {
	if  < .minCwnd {
		 = .minCwnd
	}
	atomic.StoreUint32(&.cwnd, )
}

// RWND returns the association's current receiver window (rwnd).
// The value is read atomically and does not require the lock.
func ( *Association) () uint32 {
	return atomic.LoadUint32(&.rwnd)
}

// setRWND atomically stores a new receiver window (rwnd) value. Unlike setCWND,
// no lower bound is applied.
func ( *Association) ( uint32) {
	atomic.StoreUint32(&.rwnd, )
}

// SRTT returns the latest smoothed round-trip time (srtt).
//
// The unchecked type assertion relies on the stored value always being a
// float64; createAssociation stores float64(0) before the association is used,
// so the value is never unset.
func ( *Association) () float64 {
	return .srtt.Load().(float64) //nolint:forcetypeassert
}

// getMaxTSNOffset returns the maximum offset over the current cumulative TSN that
// we are willing to enqueue. This ensures that we keep the bytes utilized in the receive
// buffer within a small multiple of the user provided max receive buffer size.
//
// The result is (size * 4) / avgChunkSize, clamped to the range
// [minTSNOffset, maxTSNOffset].
//
// NOTE(review): the multiplication is performed in uint32, so it wraps around
// for arguments of 2^30 and above.
func getMaxTSNOffset( uint32) uint32 {
	// 4 is a magic number here. There is no theory behind this.
	 := ( * 4) / avgChunkSize
	if  < minTSNOffset {
		 = minTSNOffset
	}
	if  > maxTSNOffset {
		 = maxTSNOffset
	}

	return 
}

// setSupportedExtensions appends a Supported Extensions parameter to the given
// INIT/INIT-ACK common part, advertising the RECONFIG and FORWARD TSN chunk
// types. Existing parameters are kept.
func setSupportedExtensions( *chunkInitCommon) {
	// nolint:godox
	// TODO RFC5061 https://tools.ietf.org/html/rfc6525#section-5.2
	// An implementation supporting this (Supported Extensions Parameter)
	// extension MUST list the ASCONF, the ASCONF-ACK, and the AUTH chunks
	// in its INIT and INIT-ACK parameters.
	.params = append(.params, &paramSupportedExtensions{
		ChunkTypes: []chunkType{ctReconfig, ctForwardTSN},
	})
}

// Processes a received INIT chunk and builds the INIT ACK reply.
//
// It records the peer's stream limits, verification tag, initial TSN, rwnd and
// supported extensions on the association, then returns the packed INIT ACK
// carrying our state cookie. An error wrapping ErrHandleInitState is returned
// when the association is not in the closed, cookieWait or cookieEchoed state;
// a failure to generate the state cookie is returned as-is.
//
// The caller should hold the lock.
//
//nolint:cyclop
func ( *Association) ( *packet,  *chunkInit) ([]*packet, error) {
	 := .getState()
	.log.Debugf("[%s] chunkInit received in state '%s'", .name, getAssociationStateString())

	// https://tools.ietf.org/html/rfc4960#section-5.2.1
	// Upon receipt of an INIT in the COOKIE-WAIT state, an endpoint MUST
	// respond with an INIT ACK using the same parameters it sent in its
	// original INIT chunk (including its Initiate Tag, unchanged).  When
	// responding, the endpoint MUST send the INIT ACK back to the same
	// address that the original INIT (sent by this endpoint) was sent.

	if  != closed &&  != cookieWait &&  != cookieEchoed {
		// 5.2.2.  Unexpected INIT in States Other than CLOSED, COOKIE-ECHOED,
		//        COOKIE-WAIT, and SHUTDOWN-ACK-SENT
		return nil, fmt.Errorf("%w: %s", ErrHandleInitState, getAssociationStateString())
	}

	// NOTE: Setting these prior to a reception of a COOKIE ECHO chunk containing
	// our cookie is not compliant with https://www.rfc-editor.org/rfc/rfc9260#section-5.1-2.2.3.
	// It makes us more vulnerable to resource attacks, albeit minimally so.
	//  https://www.rfc-editor.org/rfc/rfc9260#sec_handle_stream_parameters
	.myMaxNumInboundStreams = min16(.numInboundStreams, .myMaxNumInboundStreams)
	.myMaxNumOutboundStreams = min16(.numOutboundStreams, .myMaxNumOutboundStreams)
	.peerVerificationTag = .initiateTag
	// Ports are copied crosswise: the source port is taken from a destination
	// port and the destination port from a source port.
	.sourcePort = .destinationPort
	.destinationPort = .sourcePort

	// 13.2 This is the last TSN received in sequence.  This value
	// is set initially by taking the peer's initial TSN,
	// received in the INIT or INIT ACK chunk, and
	// subtracting one from it.
	.payloadQueue.init(.initialTSN - 1)

	.setRWND(.advertisedReceiverWindowCredit)
	.log.Debugf("[%s] initial rwnd=%d", .name, .RWND())

	// Scan the INIT parameters for the optional features we understand:
	// ForwardTSN support and zero-checksum acceptance.
	for ,  := range .params {
		switch v := .(type) { // nolint:gocritic
		case *paramSupportedExtensions:
			for ,  := range .ChunkTypes {
				if  == ctForwardTSN {
					.log.Debugf("[%s] use ForwardTSN (on init)", .name)
					.useForwardTSN = true
				}
			}
		case *paramZeroChecksumAcceptable:
			// Zero checksum is only sent when the peer names the DTLS error
			// detection method.
			.sendZeroChecksum = .edmid == dtlsErrorDetectionMethod
		}
	}

	if !.useForwardTSN {
		.log.Warnf("[%s] not using ForwardTSN (on init)", .name)
	}

	// Build the outbound packet that will carry the INIT ACK.
	 := &packet{}
	.verificationTag = .peerVerificationTag
	.sourcePort = .sourcePort
	.destinationPort = .destinationPort

	 := &chunkInitAck{}
	.log.Debug("sending INIT ACK")

	.initialTSN = .myNextTSN
	.numOutboundStreams = .myMaxNumOutboundStreams
	.numInboundStreams = .myMaxNumInboundStreams
	.initiateTag = .myVerificationTag
	.advertisedReceiverWindowCredit = .maxReceiveBufferSize

	// The cookie is generated once and then reused for later INITs.
	if .myCookie == nil {
		var  error
		// NOTE: This generation process is not compliant with
		// 5.1.3.  Generating State Cookie (https://www.rfc-editor.org/rfc/rfc4960#section-5.1.3)
		if .myCookie,  = newRandomStateCookie();  != nil {
			return nil, 
		}
	}

	.params = []param{.myCookie}

	if .recvZeroChecksum {
		.params = append(.params, &paramZeroChecksumAcceptable{edmid: dtlsErrorDetectionMethod})
	}
	.log.Debugf("[%s] sendZeroChecksum=%t (on init)", .name, .sendZeroChecksum)

	setSupportedExtensions(&.chunkInitCommon)

	.chunks = []chunk{}

	return pack(), nil
}

// Processes a received INIT ACK chunk.
//
// Outside the cookieWait state the chunk is discarded and nil is returned.
// Otherwise the peer's parameters are recorded, the T1-init timer is stopped,
// a COOKIE ECHO carrying the peer's state cookie is stored and sent, the
// T1-cookie timer is started and the state moves to cookieEchoed.
// ErrInitAckNoCookie is returned when the INIT ACK has no state cookie.
//
// NOTE(review): the stream limits, peer verification tag and payload queue are
// updated before the port-mismatch check below, so a mismatching INIT ACK
// still changes association state before being dropped. Confirm this is
// intended.
//
// The caller should hold the lock.
func ( *Association) ( *packet,  *chunkInitAck) error { //nolint:cyclop
	 := .getState()
	.log.Debugf("[%s] chunkInitAck received in state '%s'", .name, getAssociationStateString())
	if  != cookieWait {
		// RFC 4960
		// 5.2.3.  Unexpected INIT ACK
		//   If an INIT ACK is received by an endpoint in any state other than the
		//   COOKIE-WAIT state, the endpoint should discard the INIT ACK chunk.
		//   An unexpected INIT ACK usually indicates the processing of an old or
		//   duplicated INIT chunk.
		return nil
	}

	.myMaxNumInboundStreams = min16(.numInboundStreams, .myMaxNumInboundStreams)
	.myMaxNumOutboundStreams = min16(.numOutboundStreams, .myMaxNumOutboundStreams)
	.peerVerificationTag = .initiateTag
	.payloadQueue.init(.initialTSN - 1)
	// The ports must mirror each other (source vs. destination); otherwise
	// the INIT ACK is ignored with a warning.
	if .sourcePort != .destinationPort ||
		.destinationPort != .sourcePort {
		.log.Warnf("[%s] handleInitAck: port mismatch", .name)

		return nil
	}

	.setRWND(.advertisedReceiverWindowCredit)
	.log.Debugf("[%s] initial rwnd=%d", .name, .RWND())

	// RFC 4960 Sec 7.2.1
	//  o  The initial value of ssthresh MAY be arbitrarily high (for
	//     example, implementations MAY use the size of the receiver
	//     advertised window).
	.ssthresh = .RWND()
	.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (INI)",
		.name, .CWND(), .ssthresh, .inflightQueue.getNumBytes())

	// The INIT has been answered, so its retransmission timer and the stored
	// copy are no longer needed.
	.t1Init.stop()
	.storedInit = nil

	// Pick out the state cookie and the optional features we understand.
	var  *paramStateCookie
	for ,  := range .params {
		switch v := .(type) {
		case *paramStateCookie:
			 = 
		case *paramSupportedExtensions:
			for ,  := range .ChunkTypes {
				if  == ctForwardTSN {
					.log.Debugf("[%s] use ForwardTSN (on initAck)", .name)
					.useForwardTSN = true
				}
			}
		case *paramZeroChecksumAcceptable:
			.sendZeroChecksum = .edmid == dtlsErrorDetectionMethod
		}
	}

	.log.Debugf("[%s] sendZeroChecksum=%t (on initAck)", .name, .sendZeroChecksum)

	if !.useForwardTSN {
		.log.Warnf("[%s] not using ForwardTSN (on initAck)", .name)
	}
	if  == nil {
		return ErrInitAckNoCookie
	}

	// Keep the COOKIE ECHO around so it can be retransmitted.
	.storedCookieEcho = &chunkCookieEcho{}
	.storedCookieEcho.cookie = .cookie

	// A send failure is only logged; the timer started below still runs.
	// NOTE(review): the log text says "failed to send init" although the call
	// that failed is sendCookieEcho. The message looks misleading.
	 := .sendCookieEcho()
	if  != nil {
		.log.Errorf("[%s] failed to send init: %s", .name, .Error())
	}

	.t1Cookie.start(.rtoMgr.getRTO())
	.setState(cookieEchoed)

	return nil
}

// Processes a received HEARTBEAT chunk and builds the HEARTBEAT ACK reply,
// echoing the heartbeat information back to the peer.
//
// If the first parameter is not a heartbeat-info parameter, a warning is
// logged and nil is returned (no reply is sent).
//
// NOTE(review): params[0] is indexed without a length check; this assumes the
// chunk always carries at least one parameter. Confirm against the chunk
// parser.
//
// The caller should hold the lock.
func ( *Association) ( *chunkHeartbeat) []*packet {
	.log.Tracef("[%s] chunkHeartbeat", .name)
	,  := .params[0].(*paramHeartbeatInfo)
	if ! {
		.log.Warnf("[%s] failed to handle Heartbeat, no ParamHeartbeatInfo", .name)

		// Without the info parameter there is nothing to echo; bail out
		// instead of dereferencing the nil result of the failed assertion.
		return nil
	}

	return pack(&packet{
		verificationTag: .peerVerificationTag,
		sourcePort:      .sourcePort,
		destinationPort: .destinationPort,
		chunks: []chunk{&chunkHeartbeatAck{
			params: []param{
				&paramHeartbeatInfo{
					heartbeatInformation: .heartbeatInformation,
				},
			},
		}},
	})
}

// Processes a received COOKIE ECHO chunk and builds the COOKIE ACK reply.
//
// nil is returned (no reply) when no cookie has been generated yet, when the
// echoed cookie does not match ours, when the state is none of closed,
// cookieWait, cookieEchoed or established, or when completeHandshake reports
// false. In the established state a matching cookie is simply acknowledged
// again without touching timers or state.
//
// The caller should hold the lock.
func ( *Association) ( *chunkCookieEcho) []*packet {
	 := .getState()
	.log.Debugf("[%s] COOKIE-ECHO received in state '%s'", .name, getAssociationStateString())

	if .myCookie == nil {
		.log.Debugf("[%s] COOKIE-ECHO received before initialization", .name)

		return nil
	}
	switch  {
	default:
		return nil
	case established:
		// Already established: only re-acknowledge a matching cookie.
		if !bytes.Equal(.myCookie.cookie, .cookie) {
			return nil
		}
	case closed, cookieWait, cookieEchoed:
		if !bytes.Equal(.myCookie.cookie, .cookie) {
			return nil
		}

		// RFC wise, these do not seem to belong here, but removing them
		// causes TestCookieEchoRetransmission to break
		.t1Init.stop()
		.storedInit = nil

		.t1Cookie.stop()
		.storedCookieEcho = nil

		.setState(established)
		if !.completeHandshake(nil) {
			return nil
		}
	}

	 := &packet{
		verificationTag: .peerVerificationTag,
		sourcePort:      .sourcePort,
		destinationPort: .destinationPort,
		chunks:          []chunk{&chunkCookieAck{}},
	}

	return pack()
}

// Processes a received COOKIE ACK chunk. In the cookieEchoed state it stops
// the T1-cookie timer, drops the stored COOKIE ECHO, moves to established and
// completes the handshake; in any other state the chunk is ignored.
//
// The caller should hold the lock.
func ( *Association) () {
	 := .getState()
	.log.Debugf("[%s] COOKIE-ACK received in state '%s'", .name, getAssociationStateString())
	if  != cookieEchoed {
		// RFC 4960
		// 5.2.5.  Handle Duplicate COOKIE-ACK.
		//   At any state other than COOKIE-ECHOED, an endpoint should silently
		//   discard a received COOKIE ACK chunk.
		return
	}

	.t1Cookie.stop()
	.storedCookieEcho = nil

	.setState(established)
	// The boolean result of completeHandshake is not used here.
	.completeHandshake(nil)
}

// Processes a received DATA chunk.
//
// If the payload queue accepts the TSN, the chunk is handed to its stream
// (created on demand) when there is receive-window credit, or when the buffer
// is full but the TSN is older than the last TSN received (a missing chunk).
// Otherwise the chunk is dropped. Unless the stream could not be obtained, the
// function finishes by running the common peer-last-TSN / acknowledgement
// routine with the chunk's immediateSack flag and returns its packets.
//
// The caller should hold the lock.
func ( *Association) ( *chunkPayloadData) []*packet {
	.log.Tracef("[%s] DATA: tsn=%d immediateSack=%v len=%d",
		.name, .tsn, .immediateSack, len(.userData))
	.stats.incDATAs()

	 := .payloadQueue.canPush(.tsn)
	if  { //nolint:nestif
		 := .getOrCreateStream(.streamIdentifier, true, PayloadTypeUnknown)
		if  == nil {
			// silently discard the data. (sender will retry on T3-rtx timeout)
			// see pion/sctp#30
			.log.Debugf("[%s] discard %d", .name, .streamSequenceNumber)

			return nil
		}

		if .getMyReceiverWindowCredit() > 0 {
			// Pass the new chunk to stream level as soon as it arrives
			.payloadQueue.push(.tsn)
			.handleData()
		} else {
			// Receive buffer is full
			,  := .payloadQueue.getLastTSNReceived()
			if  && sna32LT(.tsn, ) {
				.log.Debugf(
					"[%s] receive buffer full, but accepted as this is a missing chunk with tsn=%d ssn=%d",
					.name, .tsn, .streamSequenceNumber,
				)
				.payloadQueue.push(.tsn)
				.handleData()
			} else {
				.log.Debugf(
					"[%s] receive buffer full. dropping DATA with tsn=%d ssn=%d",
					.name, .tsn, .streamSequenceNumber,
				)
			}
		}
	}

	return .handlePeerLastTSNAndAcknowledgement(.immediateSack)
}

// A common routine for handleData and handleForwardTSN routines
//
// It advances the cumulative TSN point as far as the payload queue allows,
// retries pending stream-reset requests after each advance, and then decides
// whether the acknowledgement should be delayed or sent immediately by
// setting delayedAckTriggered / immediateAckTriggered. The returned packets
// are the reset responses produced along the way (nil if none).
//
// The caller should hold the lock.
func ( *Association) ( bool) []*packet { //nolint:cyclop
	var  []*packet

	// Try to advance peerLastTSN

	// From RFC 3758 Sec 3.6:
	//   .. and then MUST further advance its cumulative TSN point locally
	//   if possible
	// Meaning, if peerLastTSN+1 points to a chunk that is received,
	// advance peerLastTSN until peerLastTSN+1 points to unreceived chunk.
	for {
		if  := .payloadQueue.pop(false); ! {
			break
		}

		// Each advance may satisfy an outstanding reset request.
		for ,  := range .reconfigRequests {
			 := .resetStreamsIfAny()
			if  != nil {
				.log.Debugf("[%s] RESET RESPONSE: %+v", .name, )
				 = append(, )
			}
		}
	}

	// Anything still left in the payload queue is treated as a gap
	// (packet loss) and logged.
	 := (.payloadQueue.size() > 0)
	if  {
		.log.Tracef("[%s] packetloss: %s", .name, .payloadQueue.getGapAckBlocksString())
	}

	// Delay the ack only in ackModeAlwaysDelay, or in normal mode when no
	// immediate ack is pending or requested and no loss was seen. Even then,
	// the ack is delayed only if the ack state is idle.
	if (.ackState != ackStateImmediate && ! && ! && .ackMode == ackModeNormal) ||
		.ackMode == ackModeAlwaysDelay {
		if .ackState == ackStateIdle {
			.delayedAckTriggered = true
		} else {
			.immediateAckTriggered = true
		}
	} else {
		.immediateAckTriggered = true
	}

	return 
}

// Returns the remaining receive-window credit: maxReceiveBufferSize minus the
// bytes currently held in the reassembly queues of all streams, or 0 when the
// queues already hold that much or more.
//
// The caller should hold the lock.
func ( *Association) () uint32 {
	var  uint32
	for ,  := range .streams {
		 += uint32(.getNumBytesInReassemblyQueue()) //nolint:gosec // G115
	}

	// Saturate at zero instead of letting the unsigned subtraction wrap.
	if  >= .maxReceiveBufferSize {
		return 0
	}

	return .maxReceiveBufferSize - 
}

// OpenStream opens a stream.
//
// It takes the association lock, returns ErrAssociationClosed when the
// association is closed or in any shutdown state, and otherwise returns the
// existing stream for the identifier or a newly created one, with the given
// payload protocol identifier set as its default payload type.
func ( *Association) (
	 uint16,
	 PayloadProtocolIdentifier,
) (*Stream, error) {
	.lock.Lock()
	defer .lock.Unlock()

	switch .getState() {
	case shutdownAckSent, shutdownPending, shutdownReceived, shutdownSent, closed:
		return nil, ErrAssociationClosed
	}

	// accept=false: a locally opened stream is not pushed to the accept channel.
	return .getOrCreateStream(, false, ), nil
}

// AcceptStream accepts a stream.
//
// It blocks on the accept channel until a stream is available and returns
// io.EOF once that channel has been closed.
func ( *Association) () (*Stream, error) {
	,  := <-.acceptCh
	if ! {
		return nil, io.EOF // no more incoming streams
	}

	return , nil
}

// createStream creates a stream. The caller should hold the lock and check no stream exists for this id.
//
// When the accept flag is true the stream is offered to the accept channel
// without blocking; if the channel cannot take it, the stream is dropped and
// nil is returned. When the flag is false the stream is registered directly.
func ( *Association) ( uint16,  bool) *Stream {
	 := &Stream{
		association:      ,
		streamIdentifier: ,
		reassemblyQueue:  newReassemblyQueue(),
		log:              .log,
		name:             fmt.Sprintf("%d:%s", , .name),
		writeDeadline:    deadline.New(),
	}

	// Condition variable for the stream's readers (per the field name).
	.readNotifier = sync.NewCond(&.lock)

	if  {
		select {
		case .acceptCh <- :
			// Register the stream only after it was handed to the accept channel.
			.streams[] = 
			.log.Debugf("[%s] accepted a new stream (streamIdentifier: %d)",
				.name, )
		default:
			.log.Debugf("[%s] dropped a new stream (acceptCh size: %d)",
				.name, len(.acceptCh))

			return nil
		}
	} else {
		.streams[] = 
	}

	return 
}

// getOrCreateStream gets or creates a stream. The caller should hold the lock.
//
// The given payload protocol identifier is applied as the default payload
// type both to an existing stream and to a newly created one. nil is returned
// when stream creation fails (see createStream).
func ( *Association) (
	 uint16,
	 bool,
	 PayloadProtocolIdentifier,
) *Stream {
	if ,  := .streams[];  {
		.SetDefaultPayloadType()

		return 
	}

	 := .createStream(, )
	if  != nil {
		.SetDefaultPayloadType()
	}

	return 
}

// Processes the cumulative ack and the gap-ack blocks of a SACK.
//
// Chunks up to the SACK's cumulative TSN ack are popped from the inflight
// queue, and chunks covered by gap-ack blocks are marked as acked. Returns:
//   - a map from stream identifier to the number of bytes newly acknowledged;
//   - a TSN that starts at the SACK's cumulative TSN ack and is raised to any
//     newly gap-acked TSN above it (the caller passes it on as the "highest
//     TSN newly acknowledged");
//   - an error wrapping ErrInflightQueueTSNPop or ErrTSNRequestNotExist when
//     an acknowledged TSN is not in the inflight queue.
//
// RTT samples are taken only from chunks sent exactly once (Karn's algorithm).
//
// The caller should hold the lock.
//
//nolint:gocognit,cyclop
func ( *Association) ( *chunkSelectiveAck) (map[uint16]int, uint32, error) {
	 := map[uint16]int{}

	// New ack point, so pop all ACKed packets from inflightQueue
	// We add 1 because the "currentAckPoint" has already been popped from the inflight queue
	// For the first SACK we take care of this by setting the ackpoint to cumAck - 1
	for  := .cumulativeTSNAckPoint + 1; sna32LTE(, .cumulativeTSNAck); ++ {
		,  := .inflightQueue.pop()
		if ! {
			return nil, 0, fmt.Errorf("%w: %v", ErrInflightQueueTSNPop, )
		}

		// Chunks already marked acked by an earlier gap-ack block were
		// accounted for then; only count the others.
		if !.acked {
			// RFC 4960 sec 6.3.2.  Retransmission Timer Rules
			//   R3)  Whenever a SACK is received that acknowledges the DATA chunk
			//        with the earliest outstanding TSN for that address, restart the
			//        T3-rtx timer for that address with its current RTO (if there is
			//        still outstanding data on that address).
			if  == .cumulativeTSNAckPoint+1 {
				// T3 timer needs to be reset. Stop it for now.
				.t3RTX.stop()
			}

			 := len(.userData)

			// Sum the number of bytes acknowledged per stream
			if ,  := [.streamIdentifier];  {
				[.streamIdentifier] =  + 
			} else {
				[.streamIdentifier] = 
			}

			// RFC 4960 sec 6.3.1.  RTO Calculation
			//   C4)  When data is in flight and when allowed by rule C5 below, a new
			//        RTT measurement MUST be made each round trip.  Furthermore, new
			//        RTT measurements SHOULD be made no more than once per round trip
			//        for a given destination transport address.
			//   C5)  Karn's algorithm: RTT measurements MUST NOT be made using
			//        packets that were retransmitted (and thus for which it is
			//        ambiguous whether the reply was for the first instance of the
			//        chunk or for a later instance)
			if .nSent == 1 && sna32GTE(.tsn, .minTSN2MeasureRTT) {
				.minTSN2MeasureRTT = .myNextTSN
				// RTT is measured in milliseconds.
				 := time.Since(.since).Seconds() * 1000.0
				 := .rtoMgr.setNewRTT()
				.srtt.Store()
				.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
					.name, , , .rtoMgr.getRTO())
			}
		}

		if .inFastRecovery && .tsn == .fastRecoverExitPoint {
			.log.Debugf("[%s] exit fast-recovery", .name)
			.inFastRecovery = false
		}
	}

	 := .cumulativeTSNAck

	// Mark selectively acknowledged chunks as "acked"
	for ,  := range .gapAckBlocks {
		// Block bounds are offsets relative to the cumulative TSN ack.
		for  := .start;  <= .end; ++ {
			 := .cumulativeTSNAck + uint32()
			,  := .inflightQueue.get()
			if ! {
				return nil, 0, fmt.Errorf("%w: %v", ErrTSNRequestNotExist, )
			}

			if !.acked {
				 := .inflightQueue.markAsAcked()

				// Sum the number of bytes acknowledged per stream
				if ,  := [.streamIdentifier];  {
					[.streamIdentifier] =  + 
				} else {
					[.streamIdentifier] = 
				}

				.log.Tracef("[%s] tsn=%d has been sacked", .name, .tsn)

				// NOTE(review): unlike the cumulative-ack loop above, this RTT
				// sample is not gated on minTSN2MeasureRTT. Confirm that the
				// difference is intentional.
				if .nSent == 1 {
					.minTSN2MeasureRTT = .myNextTSN
					 := time.Since(.since).Seconds() * 1000.0
					 := .rtoMgr.setNewRTT()
					.srtt.Store()
					.log.Tracef("[%s] SACK: measured-rtt=%f srtt=%f new-rto=%f",
						.name, , , .rtoMgr.getRTO())
				}

				if sna32LT(, ) {
					 = 
				}
			}
		}
	}

	return , , nil
}

// Runs when a SACK has advanced the cumulative TSN ack point. It restarts or
// stops the T3-rtx timer depending on whether data is still in flight, then
// grows cwnd according to slow-start (cwnd <= ssthresh) or congestion
// avoidance (cwnd > ssthresh) using the given number of newly acked bytes.
//
// The caller should hold the lock.
func ( *Association) ( int) {
	// RFC 4960, sec 6.3.2.  Retransmission Timer Rules
	//   R2)  Whenever all outstanding data sent to an address have been
	//        acknowledged, turn off the T3-rtx timer of that address.
	if .inflightQueue.size() == 0 {
		.log.Tracef("[%s] SACK: no more packet in-flight (pending=%d)", .name, .pendingQueue.size())
		.t3RTX.stop()
	} else {
		.log.Tracef("[%s] T3-rtx timer start (pt2)", .name)
		.t3RTX.start(.rtoMgr.getRTO())
	}

	// Update congestion control parameters
	if .CWND() <= .ssthresh { //nolint:nestif
		// RFC 4960, sec 7.2.1.  Slow-Start
		//   o  When cwnd is less than or equal to ssthresh, an SCTP endpoint MUST
		//		use the slow-start algorithm to increase cwnd only if the current
		//      congestion window is being fully utilized, an incoming SACK
		//      advances the Cumulative TSN Ack Point, and the data sender is not
		//      in Fast Recovery.  Only when these three conditions are met can
		//      the cwnd be increased; otherwise, the cwnd MUST not be increased.
		//		If these conditions are met, then cwnd MUST be increased by, at
		//      most, the lesser of 1) the total size of the previously
		//      outstanding DATA chunk(s) acknowledged, and 2) the destination's
		//      path MTU.
		// A non-empty pending queue is used here as the sign that the
		// window is being fully utilized.
		if !.inFastRecovery &&
			.pendingQueue.size() > 0 {
			// Growth is capped by the current cwnd rather than by the MTU;
			// the RFC form is kept in the commented-out line below.
			.setCWND(.CWND() + min32(uint32(), .CWND())) //nolint:gosec // G115
			// a.cwnd += min32(uint32(totalBytesAcked), a.MTU()) // SCTP way (slow)
			.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (SS)",
				.name, .CWND(), .ssthresh, )
		} else {
			.log.Tracef("[%s] cwnd did not grow: cwnd=%d ssthresh=%d acked=%d FR=%v pending=%d",
				.name, .CWND(), .ssthresh, , .inFastRecovery, .pendingQueue.size())
		}
	} else {
		// RFC 4960, sec 7.2.2.  Congestion Avoidance
		//   o  Whenever cwnd is greater than ssthresh, upon each SACK arrival
		//      that advances the Cumulative TSN Ack Point, increase
		//      partial_bytes_acked by the total number of bytes of all new chunks
		//      acknowledged in that SACK including chunks acknowledged by the new
		//      Cumulative TSN Ack and by Gap Ack Blocks.
		.partialBytesAcked += uint32() //nolint:gosec // G115

		//   o  When partial_bytes_acked is equal to or greater than cwnd and
		//      before the arrival of the SACK the sender had cwnd or more bytes
		//      of data outstanding (i.e., before arrival of the SACK, flight size
		//      was greater than or equal to cwnd), increase cwnd by MTU, and
		//      reset partial_bytes_acked to (partial_bytes_acked - cwnd).
		if .partialBytesAcked >= .CWND() && .pendingQueue.size() > 0 {
			.partialBytesAcked -= .CWND()
			// The step is the MTU, raised to cwndCAStep when that is larger.
			 := .MTU()
			if  < .cwndCAStep {
				 = .cwndCAStep
			}
			.setCWND(.CWND() + )
			.log.Tracef("[%s] updated cwnd=%d ssthresh=%d acked=%d (CA)",
				.name, .CWND(), .ssthresh, )
		}
	}
}

// Updates miss indicators after a SACK and enters fast recovery when a chunk
// has been reported missing three times.
//
// Parameters, in order: the SACK's cumulative TSN ack, its gap-ack blocks, the
// highest TSN newly acknowledged (HTNA), and whether the cumulative TSN ack
// point advanced. Returns an error wrapping ErrTSNRequestNotExist when a TSN
// in the scanned range is not in the inflight queue.
//
// The caller should hold the lock.
//
//nolint:cyclop
func ( *Association) (
	 uint32,
	 []gapAckBlock,
	 uint32,
	 bool,
) error {
	// HTNA algorithm - RFC 4960 Sec 7.2.4
	// Increment missIndicator of each chunks that the SACK reported missing
	// when either of the following is met:
	// a)  Not in fast-recovery
	//     miss indications are incremented only for missing TSNs prior to the
	//     highest TSN newly acknowledged in the SACK.
	// b)  In fast-recovery AND the Cumulative TSN Ack Point advanced
	//     the miss indications are incremented for all TSNs reported missing
	//     in the SACK.
	//nolint:nestif
	if !.inFastRecovery ||
		(.inFastRecovery && ) {
		var  uint32
		if !.inFastRecovery {
			// a) increment only for missing TSNs prior to the HTNA
			 = 
		} else {
			// b) increment for all TSNs reported missing
			 = 
			if len() > 0 {
				 += uint32([len()-1].end)
			}
		}

		// Scan from just after the cumulative ack up to (excluding) the limit
		// chosen above.
		for  :=  + 1; sna32LT(, ); ++ {
			,  := .inflightQueue.get()
			if ! {
				return fmt.Errorf("%w: %v", ErrTSNRequestNotExist, )
			}
			if !.acked && !.abandoned() && .missIndicator < 3 {
				.missIndicator++
				if .missIndicator == 3 {
					if !.inFastRecovery {
						// 2)  If not in Fast Recovery, adjust the ssthresh and cwnd of the
						//     destination address(es) to which the missing DATA chunks were
						//     last sent, according to the formula described in Section 7.2.3.
						.inFastRecovery = true
						.fastRecoverExitPoint = 
						.ssthresh = max32(.CWND()/2, 4*.MTU())
						.setCWND(.ssthresh)
						.partialBytesAcked = 0
						.willRetransmitFast = true

						.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (FR)",
							.name, .CWND(), .ssthresh, .inflightQueue.getNumBytes())
					}
				}
			}
		}
	}

	// While in fast recovery, every SACK that advances the ack point triggers
	// another fast retransmission.
	if .inFastRecovery &&  {
		.willRetransmitFast = true
	}

	return nil
}

// Processes a received SACK chunk.
//
// SACKs are ignored outside the established, shutdownPending and
// shutdownReceived states, and when their cumulative TSN ack is older than the
// current ack point. Otherwise the function, in order: processes cumulative
// and gap acks, advances the ack point (updating congestion control), notifies
// streams of released buffer space, recomputes rwnd, runs fast-retransmission
// processing, updates the ForwardTSN ack point when ForwardTSN is in use, and
// finally runs the post-processing step (timer / shutdown handling).
// Errors from the ack and fast-retransmission processing are returned as-is.
//
// The caller should hold the lock.
//
//nolint:cyclop
func ( *Association) ( *chunkSelectiveAck) error {
	.log.Tracef(
		"[%s] SACK: cumTSN=%d a_rwnd=%d",
		.name, .cumulativeTSNAck, .advertisedReceiverWindowCredit,
	)
	 := .getState()
	if  != established &&  != shutdownPending &&  != shutdownReceived {
		return nil
	}

	.stats.incSACKsReceived()

	if sna32GT(.cumulativeTSNAckPoint, .cumulativeTSNAck) {
		// RFC 4960 sec 6.2.1.  Processing a Received SACK
		// D)
		//   i) If Cumulative TSN Ack is less than the Cumulative TSN Ack
		//      Point, then drop the SACK.  Since Cumulative TSN Ack is
		//      monotonically increasing, a SACK whose Cumulative TSN Ack is
		//      less than the Cumulative TSN Ack Point indicates an out-of-
		//      order SACK.

		.log.Debugf("[%s] SACK Cumulative ACK %v is older than ACK point %v",
			.name,
			.cumulativeTSNAck,
			.cumulativeTSNAckPoint)

		return nil
	}

	// Process selective ack
	, ,  := .processSelectiveAck()
	if  != nil {
		return 
	}

	// Total newly acknowledged bytes across all streams.
	var  int
	for ,  := range  {
		 += 
	}

	 := false
	if sna32LT(.cumulativeTSNAckPoint, .cumulativeTSNAck) {
		.log.Tracef("[%s] SACK: cumTSN advanced: %d -> %d",
			.name,
			.cumulativeTSNAckPoint,
			.cumulativeTSNAck)

		.cumulativeTSNAckPoint = .cumulativeTSNAck
		 = true
		.onCumulativeTSNAckPointAdvanced()
	}

	// Tell each affected stream how many of its bytes were acknowledged.
	// The association lock is released around the call — presumably because
	// the callback may call back into the association; confirm.
	for ,  := range  {
		if ,  := .streams[];  {
			.lock.Unlock()
			.onBufferReleased()
			.lock.Lock()
		}
	}

	// New rwnd value
	// RFC 4960 sec 6.2.1.  Processing a Received SACK
	// D)
	//   ii) Set rwnd equal to the newly received a_rwnd minus the number
	//       of bytes still outstanding after processing the Cumulative
	//       TSN Ack and the Gap Ack Blocks.

	// bytes acked were already subtracted by markAsAcked() method
	 := uint32(.inflightQueue.getNumBytes()) //nolint:gosec // G115
	if  >= .advertisedReceiverWindowCredit {
		.setRWND(0)
	} else {
		.setRWND(.advertisedReceiverWindowCredit - )
	}

	 = .processFastRetransmission(
		.cumulativeTSNAck, .gapAckBlocks, , ,
	)
	if  != nil {
		return 
	}

	if .useForwardTSN {
		// RFC 3758 Sec 3.5 C1
		if sna32LT(.advancedPeerTSNAckPoint, .cumulativeTSNAckPoint) {
			.advancedPeerTSNAckPoint = .cumulativeTSNAckPoint
		}

		// RFC 3758 Sec 3.5 C2
		// Walk forward over consecutive abandoned chunks; stop at the first
		// TSN that is missing from the inflight queue or not abandoned.
		for  := .advancedPeerTSNAckPoint + 1; ; ++ {
			,  := .inflightQueue.get()
			if ! {
				break
			}
			if !.abandoned() {
				break
			}
			.advancedPeerTSNAckPoint = 
		}

		// RFC 3758 Sec 3.5 C3
		if sna32GT(.advancedPeerTSNAckPoint, .cumulativeTSNAckPoint) {
			.willSendForwardTSN = true
		}
		.awakeWriteLoop()
	}

	.postprocessSack(, )

	return nil
}

// The caller must hold the lock. This method was only added because the
// linter was complaining about the "cognitive complexity" of handleSack.
//
// Given the association state observed by the SACK handler and a flag asking
// for the write loop to be woken: if data is still in flight the T3-rtx timer
// is (re)started; otherwise a pending shutdown progresses (SHUTDOWN is
// scheduled in shutdownPending, SHUTDOWN ACK in shutdownReceived) and the
// write loop is woken regardless of the flag.
func ( *Association) ( uint32,  bool) {
	switch {
	case .inflightQueue.size() > 0:
		// Start timer. (noop if already started)
		.log.Tracef("[%s] T3-rtx timer start (pt3)", .name)
		.t3RTX.start(.rtoMgr.getRTO())
	case  == shutdownPending:
		// No more outstanding, send shutdown.
		 = true
		.willSendShutdown = true
		.setState(shutdownSent)
	case  == shutdownReceived:
		// No more outstanding, send shutdown ack.
		 = true
		.willSendShutdownAck = true
		.setState(shutdownAckSent)
	}

	if  {
		.awakeWriteLoop()
	}
}

// Processes a received SHUTDOWN chunk.
//
// In the established state the association moves to shutdownReceived while
// data is still in flight, or schedules a SHUTDOWN ACK and moves to
// shutdownAckSent when nothing is outstanding. In shutdownSent a SHUTDOWN ACK
// is scheduled as well. Other states ignore the chunk.
//
// The caller should hold the lock.
func ( *Association) ( *chunkShutdown) {
	 := .getState()

	switch  {
	case established:
		if .inflightQueue.size() > 0 {
			.setState(shutdownReceived)
		} else {
			// No more outstanding, send shutdown ack.
			.willSendShutdownAck = true
			.setState(shutdownAckSent)

			.awakeWriteLoop()
		}

		// a.cumulativeTSNAckPoint = c.cumulativeTSNAck
	case shutdownSent:
		.willSendShutdownAck = true
		.setState(shutdownAckSent)

		.awakeWriteLoop()
	}
}

// Processes a received SHUTDOWN ACK chunk. In the shutdownSent or
// shutdownAckSent state it stops the T2-shutdown timer, schedules a
// SHUTDOWN COMPLETE and wakes the write loop; other states ignore the chunk.
//
// The caller should hold the lock.
func ( *Association) ( *chunkShutdownAck) {
	 := .getState()
	if  == shutdownSent ||  == shutdownAckSent {
		.t2Shutdown.stop()
		.willSendShutdownComplete = true

		.awakeWriteLoop()
	}
}

// Processes a received SHUTDOWN COMPLETE chunk. In the shutdownAckSent state
// it stops the T2-shutdown timer and closes the association, returning the
// result of close; in any other state the chunk is ignored and nil returned.
func ( *Association) ( *chunkShutdownComplete) error {
	 := .getState()
	if  == shutdownAckSent {
		.t2Shutdown.stop()

		return .close()
	}

	return nil
}

// Processes a received ABORT chunk: closes the association and always returns
// an error wrapping ErrChunk that lists the chunk's error causes, each
// formatted as "(cause)".
func ( *Association) ( *chunkAbort) error {
	var  string
	for ,  := range .errorCauses {
		 += fmt.Sprintf("(%s)", )
	}

	// The close error is deliberately discarded; the abort error below is
	// what gets reported to the caller.
	_ = .close()

	return fmt.Errorf("[%s] %w: %s", .name, ErrChunk, )
}

// createForwardTSN generates ForwardTSN chunk.
//
// The chunk carries advancedPeerTSNAckPoint as the new cumulative TSN and, for
// every stream with chunks between the cumulative ack point and that TSN, one
// entry holding the greatest stream sequence number seen.
//
// NOTE(review): this comment previously said the method is called "if
// useForwardTSN is set to false". The SACK handler in this file only requests
// a ForwardTSN when useForwardTSN is true, so "true" is presumably what was
// meant. Confirm against the call site.
//
// The caller should hold the lock.
func ( *Association) () *chunkForwardTSN {
	// RFC 3758 Sec 3.5 C4
	 := map[uint16]uint16{} // to report only once per SI
	for  := .cumulativeTSNAckPoint + 1; sna32LTE(, .advancedPeerTSNAckPoint); ++ {
		,  := .inflightQueue.get()
		if ! {
			break
		}

		,  := [.streamIdentifier]
		if ! {
			[.streamIdentifier] = .streamSequenceNumber
		} else if sna16LT(, .streamSequenceNumber) {
			// to report only once with greatest SSN
			[.streamIdentifier] = .streamSequenceNumber
		}
	}

	 := &chunkForwardTSN{
		newCumulativeTSN: .advancedPeerTSNAckPoint,
		streams:          []chunkForwardTSNStream{},
	}

	// Build the per-stream entries and, alongside, a summary string that is
	// only used for the trace log below.
	var  string
	for ,  := range  {
		 += fmt.Sprintf("(si=%d ssn=%d)", , )
		.streams = append(.streams, chunkForwardTSNStream{
			identifier: ,
			sequence:   ,
		})
	}
	.log.Tracef(
		"[%s] building fwdtsn: newCumulativeTSN=%d cumTSN=%d - %s",
		.name, .newCumulativeTSN, .cumulativeTSNAckPoint, ,
	)

	return 
}

// createPacket wraps chunks in a packet.
// The packet is addressed with the association's peer verification tag and
// its source and destination ports.
// The caller should hold the read lock.
func ( *Association) ( []chunk) *packet {
	return &packet{
		verificationTag: .peerVerificationTag,
		sourcePort:      .sourcePort,
		destinationPort: .destinationPort,
		chunks:          ,
	}
}

// Processes a received RE-CONFIG chunk by handling paramA and, when present,
// paramB. Returns the response packets produced (an empty, non-nil slice when
// there are none), or the first error from handling a parameter.
//
// The caller should hold the lock.
func ( *Association) ( *chunkReconfig) ([]*packet, error) {
	.log.Tracef("[%s] handleReconfig", .name)

	 := make([]*packet, 0)

	,  := .handleReconfigParam(.paramA)
	if  != nil {
		return nil, 
	}
	if  != nil {
		 = append(, )
	}

	// paramB is optional.
	if .paramB != nil {
		,  = .handleReconfigParam(.paramB)
		if  != nil {
			return nil, 
		}
		if  != nil {
			 = append(, )
		}
	}

	return , nil
}

// Processes a received FORWARD TSN chunk.
//
// When ForwardTSN is not in use, a packet carrying an ERROR chunk with an
// "unrecognized chunk type" cause is returned. When the chunk's new cumulative
// TSN is at or behind the current peer-last TSN, an immediate SACK is
// scheduled and nil is returned. Otherwise the cumulative TSN point is
// advanced, the streams are told which chunks were abandoned, and the common
// peer-last-TSN / acknowledgement routine produces the returned packets.
//
// The caller should hold the lock.
func ( *Association) ( *chunkForwardTSN) []*packet {
	.log.Tracef("[%s] FwdTSN: %s", .name, .String())

	if !.useForwardTSN {
		// Warnf (not Warn) so the "[%s]" prefix is filled in with the
		// association name instead of being logged literally.
		.log.Warnf("[%s] received FwdTSN but not enabled", .name)
		// Return an error chunk
		 := &chunkError{
			errorCauses: []errorCause{&errorCauseUnrecognizedChunkType{}},
		}
		 := &packet{}
		.verificationTag = .peerVerificationTag
		.sourcePort = .sourcePort
		.destinationPort = .destinationPort
		.chunks = []chunk{}

		return []*packet{}
	}

	// From RFC 3758 Sec 3.6:
	//   Note, if the "New Cumulative TSN" value carried in the arrived
	//   FORWARD TSN chunk is found to be behind or at the current cumulative
	//   TSN point, the data receiver MUST treat this FORWARD TSN as out-of-
	//   date and MUST NOT update its Cumulative TSN.  The receiver SHOULD
	//   send a SACK to its peer (the sender of the FORWARD TSN) since such a
	//   duplicate may indicate the previous SACK was lost in the network.

	.log.Tracef("[%s] should send ack? newCumTSN=%d peerLastTSN=%d",
		.name, .newCumulativeTSN, .peerLastTSN())
	if sna32LTE(.newCumulativeTSN, .peerLastTSN()) {
		.log.Tracef("[%s] sending ack on Forward TSN", .name)
		.ackState = ackStateImmediate
		.ackTimer.stop()
		.awakeWriteLoop()

		return nil
	}

	// From RFC 3758 Sec 3.6:
	//   the receiver MUST perform the same TSN handling, including duplicate
	//   detection, gap detection, SACK generation, cumulative TSN
	//   advancement, etc. as defined in RFC 2960 [2]---with the following
	//   exceptions and additions.

	//   When a FORWARD TSN chunk arrives, the data receiver MUST first update
	//   its cumulative TSN point to the value carried in the FORWARD TSN
	//   chunk,

	// Advance peerLastTSN
	for sna32LT(.peerLastTSN(), .newCumulativeTSN) {
		.payloadQueue.pop(true) // may not exist
	}

	// Report new peerLastTSN value and abandoned largest SSN value to
	// corresponding streams so that the abandoned chunks can be removed
	// from the reassemblyQueue.
	for ,  := range .streams {
		if ,  := .streams[.identifier];  {
			.handleForwardTSNForOrdered(.sequence)
		}
	}

	// TSN may be forwarded for unordered chunks. ForwardTSN chunk does not
	// report which stream identifier it skipped for unordered chunks.
	// Therefore, we need to broadcast this event to all existing streams for
	// unordered chunks.
	// See https://github.com/pion/sctp/issues/106
	for ,  := range .streams {
		.handleForwardTSNForUnordered(.newCumulativeTSN)
	}

	return .handlePeerLastTSNAndAcknowledgement(false)
}

// Queues a stream-reset request for the given stream identifier.
//
// It takes the association lock and returns an error wrapping
// ErrResetPacketInStateNotExist unless the association is established.
// The request is represented by an empty DATA chunk pushed onto the pending
// queue, after which the write loop is woken.
func ( *Association) ( uint16) error {
	.lock.Lock()
	defer .lock.Unlock()

	 := .getState()
	if  != established {
		return fmt.Errorf("%w: state=%s", ErrResetPacketInStateNotExist,
			getAssociationStateString())
	}

	// Create DATA chunk which only contains valid stream identifier with
	// nil userData and use it as a EOS from the stream.
	 := &chunkPayloadData{
		streamIdentifier:  ,
		beginningFragment: true,
		endingFragment:    true,
		userData:          nil,
	}

	.pendingQueue.push()
	.awakeWriteLoop()

	return nil
}

// Processes one parameter of a RE-CONFIG chunk.
//
// An outgoing reset request is recorded and attempted immediately; the
// response packet, if any, is returned. When too many requests are
// outstanding the request is dropped with an error wrapping
// ErrTooManyReconfigRequests. A reconfig response either restarts the
// re-configuration timer ("in progress") or removes the matching outstanding
// reconfig, stopping the timer when none remain; it never produces a packet.
// Any other parameter type yields an error wrapping ErrParamterType that names
// the parameter's Go type.
//
// The caller should hold the lock.
func ( *Association) ( param) (*packet, error) {
	switch par := .(type) {
	case *paramOutgoingResetRequest:
		.log.Tracef("[%s] handleReconfigParam (OutgoingResetRequest)", .name)
		// NOTE(review): this compares TSNs with a plain "<" while the rest of
		// the file uses the serial-number helpers (sna32LT etc.). Confirm the
		// behavior around TSN wrap-around is intended.
		if .peerLastTSN() < .senderLastTSN && len(.reconfigRequests) >= maxReconfigRequests {
			// We have too many reconfig requests outstanding. Drop the request and let
			// the peer retransmit. A well behaved peer should only have 1 outstanding
			// reconfig request.
			//
			// RFC 6525: https://www.rfc-editor.org/rfc/rfc6525.html#section-5.1.1
			//    At any given time, there MUST NOT be more than one request in flight.
			//    So, if the Re-configuration Timer is running and the RE-CONFIG chunk
			//    contains at least one request parameter, the chunk MUST be buffered.
			// chrome:
			// https://chromium.googlesource.com/external/webrtc/+/refs/heads/main/net/dcsctp/socket/stream_reset_handler.cc#271
			return nil, fmt.Errorf("%w: %d", ErrTooManyReconfigRequests, len(.reconfigRequests))
		}
		.reconfigRequests[.reconfigRequestSequenceNumber] = 
		 := .resetStreamsIfAny()
		if  != nil {
			return , nil
		}

		return nil, nil //nolint:nilnil
	case *paramReconfigResponse:
		.log.Tracef("[%s] handleReconfigParam (ReconfigResponse)", .name)
		if .result == reconfigResultInProgress {
			// RFC 6525: https://www.rfc-editor.org/rfc/rfc6525.html#section-5.2.7
			//
			//   If the Result field indicates "In progress", the timer for the
			//   Re-configuration Request Sequence Number is started again.  If
			//   the timer runs out, the RE-CONFIG chunk MUST be retransmitted
			//   but the corresponding error counters MUST NOT be incremented.
			if ,  := .reconfigs[.reconfigResponseSequenceNumber];  {
				.tReconfig.stop()
				.tReconfig.start(.rtoMgr.getRTO())
			}

			return nil, nil //nolint:nilnil
		}
		delete(.reconfigs, .reconfigResponseSequenceNumber)
		if len(.reconfigs) == 0 {
			.tReconfig.stop()
		}

		return nil, nil //nolint:nilnil
	default:
		// %T prints the dynamic type of the unexpected parameter; the
		// boolean verb %t would render as "%!t(...)" for a non-bool value.
		return nil, fmt.Errorf("%w: %T", ErrParamterType, )
	}
}

// Attempts to carry out an outgoing reset request and builds the RE-CONFIG
// response packet.
//
// When the request's senderLastTSN is at or behind the current peer-last TSN,
// every listed stream that exists is notified of the inbound reset and removed
// from the stream map, the request is forgotten, and the response result is
// "success performed". Otherwise nothing is reset and the response result is
// "in progress".
//
// The caller should hold the lock.
func ( *Association) ( *paramOutgoingResetRequest) *packet {
	 := reconfigResultSuccessPerformed
	if sna32LTE(.senderLastTSN, .peerLastTSN()) {
		.log.Debugf("[%s] resetStream(): senderLastTSN=%d <= peerLastTSN=%d",
			.name, .senderLastTSN, .peerLastTSN())
		for ,  := range .streamIdentifiers {
			,  := .streams[]
			if ! {
				continue
			}
			// The lock is released around the stream callback — presumably
			// because it may call back into the association; confirm.
			.lock.Unlock()
			.onInboundStreamReset()
			.lock.Lock()
			.log.Debugf("[%s] deleting stream %d", .name, )
			delete(.streams, .streamIdentifier)
		}
		delete(.reconfigRequests, .reconfigRequestSequenceNumber)
	} else {
		.log.Debugf("[%s] resetStream(): senderLastTSN=%d > peerLastTSN=%d",
			.name, .senderLastTSN, .peerLastTSN())
		 = reconfigResultInProgress
	}

	return .createPacket([]chunk{&chunkReconfig{
		paramA: &paramReconfigResponse{
			reconfigResponseSequenceNumber: .reconfigRequestSequenceNumber,
			result:                         ,
		},
	}})
}

// Move the chunk peeked with a.pendingQueue.peek() to the inflightQueue.
//
// The chunk is popped from the pending queue (a pop failure is only logged),
// given the next TSN, stamped with the send time and a send count of 1,
// checked for partial-reliability status and pushed onto the inflight queue.
//
// The caller should hold the lock.
func ( *Association) ( *chunkPayloadData) {
	if  := .pendingQueue.pop();  != nil {
		.log.Errorf("[%s] failed to pop from pending queue: %s", .name, .Error())
	}

	// Mark all fragments are in-flight now
	if .endingFragment {
		.setAllInflight()
	}

	// Assign TSN
	.tsn = .generateNextTSN()

	.since = time.Now() // use to calculate RTT and also for maxPacketLifeTime
	.nSent = 1          // being sent for the first time

	.checkPartialReliabilityStatus()

	.log.Tracef(
		"[%s] sending ppi=%d tsn=%d ssn=%d sent=%d len=%d (%v,%v)",
		.name,
		.payloadType,
		.tsn,
		.streamSequenceNumber,
		.nSent,
		len(.userData),
		.beginningFragment,
		.endingFragment,
	)

	.inflightQueue.pushNoCheck()
}

// popPendingDataChunksToSend pops as many chunks from the pending queue as
// the cwnd and rwnd allow to be sent.
//
// It returns the chunks that were moved to the inflight queue and the stream
// identifiers of zero-length pending chunks, which are collected as streams
// to reset instead of being sent.
//
// If nothing could be moved and nothing is in flight, one pending chunk is
// moved anyway as a zero window probe. In blockWrite mode, writers are
// notified once chunks were moved and the pending queue has been drained.
//
// The caller should hold the lock.
//
//nolint:cyclop
func ( *Association) () ([]*chunkPayloadData, []uint16) {
	 := []*chunkPayloadData{}
	var  []uint16 // stream identifiers to reset

	if .pendingQueue.size() > 0 { //nolint:nestif
		// RFC 4960 sec 6.1.  Transmission of DATA Chunks
		//   A) At any given time, the data sender MUST NOT transmit new data to
		//      any destination transport address if its peer's rwnd indicates
		//      that the peer has no buffer space (i.e., rwnd is 0; see Section
		//      6.2.1).  However, regardless of the value of rwnd (including if it
		//      is 0), the data sender can always have one DATA chunk in flight to
		//      the receiver if allowed by cwnd (see rule B, below).

		for {
			 := .pendingQueue.peek()
			if  == nil {
				break // no more pending data
			}

			 := uint32(len(.userData)) //nolint:gosec // G115
			if  == 0 {
				// A chunk without user data is not transmitted; its stream
				// identifier is recorded for reset and the chunk is dropped
				// from the pending queue.
				 = append(, .streamIdentifier)
				 := .pendingQueue.pop()
				if  != nil {
					.log.Errorf("failed to pop from pending queue: %s", .Error())
				}

				continue
			}

			if uint32(.inflightQueue.getNumBytes())+ > .CWND() { //nolint:gosec // G115
				break // would exceed cwnd
			}

			if  > .RWND() {
				break // no more rwnd
			}

			// Consume the receiver window locally for the bytes about to be sent.
			.setRWND(.RWND() - )

			.movePendingDataChunkToInflightQueue()
			 = append(, )
		}

		// the data sender can always have one DATA chunk in flight to the receiver
		if len() == 0 && .inflightQueue.size() == 0 {
			// Send zero window probe
			 := .pendingQueue.peek()
			if  != nil {
				.movePendingDataChunkToInflightQueue()
				 = append(, )
			}
		}
	}

	if .blockWrite && len() > 0 && .pendingQueue.size() == 0 {
		.log.Tracef("[%s] all pending data have been sent, notify writable", .name)
		.notifyBlockWritable()
	}

	return , 
}

// bundleDataChunksIntoPackets packs DATA chunks into packets. It tries to bundle
// DATA chunks into a packet so long as the resulting packet size does not exceed
// the path MTU.
//
// Chunks keep their input order. The running size of a packet starts at
// commonHeaderSize and grows by each chunk's header, user data and padding.
// A chunk that would push the size past MTU() closes the current packet and
// starts a new one. An empty input yields an empty (non-nil) slice.
//
// The caller should hold the lock.
func ( *Association) ( []*chunkPayloadData) []*packet {
	 := []*packet{}
	 := []chunk{}
	 := int(commonHeaderSize)

	for ,  := range  {
		// RFC 4960 sec 6.1.  Transmission of DATA Chunks
		//   Multiple DATA chunks committed for transmission MAY be bundled in a
		//   single packet.  Furthermore, DATA chunks being retransmitted MAY be
		//   bundled with new DATA chunks, as long as the resulting packet size
		//   does not exceed the path MTU.
		 := int(dataChunkHeaderSize) + len(.userData)
		 += getPadding()
		if + > int(.MTU()) {
			// Flush what has been collected so far and start over with an
			// empty packet.
			// NOTE(review): if the very first chunk alone exceeds the MTU this
			// flushes an empty chunk list - confirm callers never pass such a
			// chunk.
			 = append(, .createPacket())
			 = []chunk{}
			 = int(commonHeaderSize)
		}
		 = append(, )
		 += 
	}

	// Flush the trailing, partially filled packet.
	if len() > 0 {
		 = append(, .createPacket())
	}

	return 
}

// sendPayloadData queues the given data chunks for transmission and wakes the
// write loop; the chunks are not written to the wire by this call itself.
//
// It returns ErrPayloadDataStateNotExist (wrapped with the current state) when
// the association is not in the established state. In blockWrite mode it waits
// until no other write is pending, and returns the context's error if the
// context is done first. On success it returns nil.
func ( *Association) ( context.Context,  []*chunkPayloadData) error {
	.lock.Lock()

	 := .getState()
	if  != established {
		.lock.Unlock()

		return fmt.Errorf("%w: state=%s", ErrPayloadDataStateNotExist,
			getAssociationStateString())
	}

	if .blockWrite {
		// Wait for the in-progress write to finish. The lock is released while
		// waiting and re-acquired only after a writeNotify signal, so the
		// writePending check is always made with the lock held.
		for .writePending {
			.lock.Unlock()
			select {
			case <-.Done():
				// The lock is not held on this path, so nothing to unlock.
				return .Err()
			case <-.writeNotify:
				.lock.Lock()
			}
		}
		.writePending = true
	}

	// Push the chunks into the pending queue first.
	for ,  := range  {
		.pendingQueue.push()
	}

	.lock.Unlock()
	.awakeWriteLoop()

	return nil
}

// Marks the given chunk as abandoned when the partial reliability (PR-SCTP)
// limit of its stream has been reached.
//
// Nothing happens when useForwardTSN is false, when the chunk's payload type
// is PayloadTypeWebRTCDCEP, or when the chunk's stream is not in the streams
// map. Otherwise, under the stream's read lock:
//   - ReliabilityTypeRexmit: abandoned once nSent >= reliabilityValue.
//   - ReliabilityTypeTimed: abandoned once the milliseconds elapsed since the
//     chunk's `since` timestamp are >= reliabilityValue.
//
// The caller should hold the lock.
func ( *Association) ( *chunkPayloadData) {
	if !.useForwardTSN {
		return
	}

	// draft-ietf-rtcweb-data-protocol-09.txt section 6
	//	6.  Procedures
	//		All Data Channel Establishment Protocol messages MUST be sent using
	//		ordered delivery and reliable transmission.
	//
	if .payloadType == PayloadTypeWebRTCDCEP {
		return
	}

	// PR-SCTP
	if ,  := .streams[.streamIdentifier];  { //nolint:nestif
		.lock.RLock()
		if .reliabilityType == ReliabilityTypeRexmit {
			if .nSent >= .reliabilityValue {
				.setAbandoned(true)
				.log.Tracef(
					"[%s] marked as abandoned: tsn=%d ppi=%d (remix: %d)",
					.name, .tsn, .payloadType, .nSent,
				)
			}
		} else if .reliabilityType == ReliabilityTypeTimed {
			// Elapsed time in whole milliseconds (fraction truncated).
			 := int64(time.Since(.since).Seconds() * 1000)
			if  >= int64(.reliabilityValue) {
				.setAbandoned(true)
				.log.Tracef(
					"[%s] marked as abandoned: tsn=%d ppi=%d (timed: %d)",
					.name, .tsn, .payloadType, ,
				)
			}
		}
		.lock.RUnlock()
	} else {
		// Remote has reset its send side of the stream, we can still send data.
		.log.Tracef("[%s] stream %d not found, remote reset", .name, .streamIdentifier)
	}
}

// getDataPacketsToRetransmit is called when T3-rtx has timed out and collects
// the outstanding data chunks that are flagged for retransmission, bundled
// into packets.
//
// The inflight queue is walked over consecutive TSNs starting at
// cumulativeTSNAckPoint+1 and stops at the first TSN that is not in the
// queue. Chunks whose retransmit flag is not set are skipped. The total user
// data selected is bounded by min32(CWND(), RWND()), except that the first
// selected chunk is sent alone as a zero window probe when it is larger than
// RWND().
//
// Each selected chunk has its retransmit flag cleared, its nSent incremented,
// and its partial reliability status re-checked.
//
// The caller should hold the lock.
func ( *Association) () []*packet {
	 := min32(.CWND(), .RWND())
	 := []*chunkPayloadData{}
	var  int
	var  bool

	for  := 0; !; ++ {
		,  := .inflightQueue.get(.cumulativeTSNAckPoint + uint32() + 1) //nolint:gosec // G115
		if ! {
			break // end of pending data
		}

		if !.retransmit {
			continue
		}

		if  == 0 && int(.RWND()) < len(.userData) {
			// Send it as a zero window probe
			// Setting the flag ends the loop after this chunk is queued.
			 = true
		} else if +len(.userData) > int() {
			// Adding this chunk would exceed the byte budget.
			break
		}

		// reset the retransmit flag not to retransmit again before the next
		// t3-rtx timer fires
		.retransmit = false
		 += len(.userData)

		.nSent++

		.checkPartialReliabilityStatus()

		.log.Tracef(
			"[%s] retransmitting tsn=%d ssn=%d sent=%d",
			.name, .tsn, .streamSequenceNumber, .nSent,
		)

		 = append(, )
	}

	return .bundleDataChunksIntoPackets()
}

// generateNextTSN returns the current value of myNextTSN and then increments
// it. The counter is a uint32, so it wraps around on overflow.
// The caller should hold the lock.
func ( *Association) () uint32 {
	 := .myNextTSN
	.myNextTSN++

	return 
}

// generateNextRSN returns the current value of myNextRSN and then increments
// it. The counter is a uint32, so it wraps around on overflow.
// The caller should hold the lock.
func ( *Association) () uint32 {
	 := .myNextRSN
	.myNextRSN++

	return 
}

// Builds a new chunkSelectiveAck from the current receive state:
//   - cumulativeTSNAck from peerLastTSN()
//   - advertisedReceiverWindowCredit from getMyReceiverWindowCredit()
//   - duplicateTSN from payloadQueue.popDuplicates()
//   - gapAckBlocks from payloadQueue.getGapAckBlocks()
//
// NOTE(review): popDuplicates presumably drains the recorded duplicates, which
// would make this call non-idempotent - confirm against payloadQueue.
// NOTE(review): no lock is taken here; presumably the caller holds it - confirm.
func ( *Association) () *chunkSelectiveAck {
	 := &chunkSelectiveAck{}
	.cumulativeTSNAck = .peerLastTSN()
	.advertisedReceiverWindowCredit = .getMyReceiverWindowCredit()
	.duplicateTSN = .payloadQueue.popDuplicates()
	.gapAckBlocks = .payloadQueue.getGapAckBlocks()

	return 
}

// pack returns the given packet as a []*packet.
func pack( *packet) []*packet {
	return []*packet{}
}

// Prepares for processing the chunks of a received packet: takes the lock,
// counts the packet in the stats, and clears the delayedAckTriggered and
// immediateAckTriggered flags.
// NOTE(review): presumably called once per inbound packet before its chunks
// are handled - confirm against the caller.
func ( *Association) () {
	.lock.Lock()
	defer .lock.Unlock()

	.stats.incPacketsReceived()

	.delayedAckTriggered = false
	.immediateAckTriggered = false
}

// Applies the ack decision recorded in the ack-trigger flags, under the lock.
//
// immediateAckTriggered takes precedence: the ack state becomes
// ackStateImmediate, the ack timer is stopped and the write loop is woken.
// Otherwise, if delayedAckTriggered is set, the ack state becomes
// ackStateDelay and the ack timer is started. If neither flag is set nothing
// changes.
func ( *Association) () {
	.lock.Lock()
	defer .lock.Unlock()

	if .immediateAckTriggered {
		.ackState = ackStateImmediate
		.ackTimer.stop()
		.awakeWriteLoop()
	} else if .delayedAckTriggered {
		// Will send delayed ack in the next ack timeout
		.ackState = ackStateDelay
		.ackTimer.start()
	}
}

// Dispatches one received chunk to its type-specific handler while holding
// the lock.
//
// A chunk that fails check() is logged and dropped, and nil is returned.
// Handler errors are likewise logged and swallowed, with one exception: an
// error produced while handling a chunkAbort is returned to the caller.
// Unknown chunk types yield ErrChunkTypeUnhandled, which is logged like any
// other non-abort error.
//
// Any reply packets produced by a handler are pushed onto the control queue
// and the write loop is woken.
func ( *Association) ( *packet,  chunk) error { //nolint:cyclop
	.lock.Lock()
	defer .lock.Unlock()

	var  []*packet
	var  error

	if _,  = .check();  != nil {
		.log.Errorf("[%s] failed validating chunk: %s ", .name, )

		return nil
	}

	// Set only for chunkAbort; decides below whether an error is returned.
	 := false

	switch receivedChunk := .(type) {
	// Note: We do not do the following for chunkInit, chunkInitAck, and chunkCookieEcho:
	// If an endpoint receives an INIT, INIT ACK, or COOKIE ECHO chunk but decides not to establish the
	// new association due to missing mandatory parameters in the received INIT or INIT ACK chunk, invalid
	// parameter values, or lack of local resources, it SHOULD respond with an ABORT chunk.

	case *chunkInit:
		,  = .handleInit(, )

	case *chunkInitAck:
		 = .handleInitAck(, )

	case *chunkAbort:
		 = true
		 = .handleAbort()

	case *chunkError:
		// ERROR chunks are only logged; no reply is generated.
		var  string
		for ,  := range .errorCauses {
			 += fmt.Sprintf("(%s)", )
		}
		.log.Debugf("[%s] Error chunk, with following errors: %s", .name, )

	// Note: chunkHeartbeatAck not handled?
	case *chunkHeartbeat:
		 = .handleHeartbeat()

	case *chunkCookieEcho:
		 = .handleCookieEcho()

	case *chunkCookieAck:
		.handleCookieAck()

	case *chunkPayloadData:
		 = .handleData()

	case *chunkSelectiveAck:
		 = .handleSack()

	case *chunkReconfig:
		,  = .handleReconfig()

	case *chunkForwardTSN:
		 = .handleForwardTSN()

	case *chunkShutdown:
		.handleShutdown()
	case *chunkShutdownAck:
		.handleShutdownAck()
	case *chunkShutdownComplete:
		 = .handleShutdownComplete()

	default:
		 = ErrChunkTypeUnhandled
	}

	// Log and return; the only condition that is fatal is an ABORT chunk
	if  != nil {
		if  {
			return 
		}

		.log.Errorf("Failed to handle chunk: %v", )

		return nil
	}

	if len() > 0 {
		.controlQueue.pushAll()
		.awakeWriteLoop()
	}

	return nil
}

// Reacts to a retransmission timer firing, while holding the lock.
//
// The first argument selects the timer; the second is the retransmission
// count that the log messages report as nRtos.
//
//   - timerT1Init / timerT1Cookie: the INIT / COOKIE-ECHO is sent again; a
//     send failure is only logged.
//   - timerT2Shutdown: depending on the current state, SHUTDOWN or
//     SHUTDOWN-ACK is flagged to be sent again and the write loop is woken.
//   - timerT3RTX: ssthresh and cwnd are adjusted per RFC 4960 sec 7.2.3, the
//     advanced peer ack point is advanced over abandoned chunks when
//     useForwardTSN is set, and all inflight chunks are marked for
//     retransmission.
//   - timerReconfig: RE-CONFIG retransmission is flagged and the write loop
//     is woken.
func ( *Association) ( int,  uint) { //nolint:cyclop
	.lock.Lock()
	defer .lock.Unlock()

	// TSN hasn't been incremented in 3 attempts. Speculatively
	// toggle ZeroChecksum because old Pion versions had a broken implementation
	if .cumulativeTSNAckPoint+1 == .initialTSN && %3 == 0 {
		.sendZeroChecksum = !.sendZeroChecksum
	}

	if  == timerT1Init {
		 := .sendInit()
		if  != nil {
			.log.Debugf("[%s] failed to retransmit init (nRtos=%d): %v", .name, , )
		}

		return
	}

	if  == timerT1Cookie {
		 := .sendCookieEcho()
		if  != nil {
			.log.Debugf("[%s] failed to retransmit cookie-echo (nRtos=%d): %v", .name, , )
		}

		return
	}

	if  == timerT2Shutdown {
		// Exactly two verbs for the two arguments; there is no error value to
		// report on this path.
		.log.Debugf("[%s] retransmission of shutdown timeout (nRtos=%d)", .name, )
		 := .getState()

		switch  {
		case shutdownSent:
			.willSendShutdown = true
			.awakeWriteLoop()
		case shutdownAckSent:
			.willSendShutdownAck = true
			.awakeWriteLoop()
		}
	}

	if  == timerT3RTX { //nolint:nestif
		.stats.incT3Timeouts()

		// RFC 4960 sec 6.3.3
		//  E1)  For the destination address for which the timer expires, adjust
		//       its ssthresh with rules defined in Section 7.2.3 and set the
		//       cwnd <- MTU.
		// RFC 4960 sec 7.2.3
		//   When the T3-rtx timer expires on an address, SCTP should perform slow
		//   start by:
		//      ssthresh = max(cwnd/2, 4*MTU)
		//      cwnd = 1*MTU

		.ssthresh = max32(.CWND()/2, 4*.MTU())
		.setCWND(.MTU())
		.log.Tracef("[%s] updated cwnd=%d ssthresh=%d inflight=%d (RTO)",
			.name, .CWND(), .ssthresh, .inflightQueue.getNumBytes())

		// RFC 3758 sec 3.5
		//  A5) Any time the T3-rtx timer expires, on any destination, the sender
		//  SHOULD try to advance the "Advanced.Peer.Ack.Point" by following
		//  the procedures outlined in C2 - C5.
		if .useForwardTSN {
			// RFC 3758 Sec 3.5 C2
			// Advance over consecutive inflight chunks that are abandoned;
			// stop at the first missing or non-abandoned TSN.
			for  := .advancedPeerTSNAckPoint + 1; ; ++ {
				,  := .inflightQueue.get()
				if ! {
					break
				}
				if !.abandoned() {
					break
				}
				.advancedPeerTSNAckPoint = 
			}

			// RFC 3758 Sec 3.5 C3
			if sna32GT(.advancedPeerTSNAckPoint, .cumulativeTSNAckPoint) {
				.willSendForwardTSN = true
			}
		}

		.log.Debugf("[%s] T3-rtx timed out: nRtos=%d cwnd=%d ssthresh=%d", .name, , .CWND(), .ssthresh)

		/*
			a.log.Debugf("   - advancedPeerTSNAckPoint=%d", a.advancedPeerTSNAckPoint)
			a.log.Debugf("   - cumulativeTSNAckPoint=%d", a.cumulativeTSNAckPoint)
			a.inflightQueue.updateSortedKeys()
			for i, tsn := range a.inflightQueue.sorted {
				if c, ok := a.inflightQueue.get(tsn); ok {
					a.log.Debugf("   - [%d] tsn=%d acked=%v abandoned=%v (%v,%v) len=%d",
						i, c.tsn, c.acked, c.abandoned(), c.beginningFragment, c.endingFragment, len(c.userData))
				}
			}
		*/

		.inflightQueue.markAllToRetrasmit()
		.awakeWriteLoop()

		return
	}

	if  == timerReconfig {
		.willRetransmitReconfig = true
		.awakeWriteLoop()
	}
}

// Reacts to a retransmission timer giving up, while holding the lock. The
// argument selects the timer.
//
//   - timerT1Init: logs and completes the handshake with ErrHandshakeInitAck.
//   - timerT1Cookie: logs and completes the handshake with
//     ErrHandshakeCookieEcho.
//   - timerT2Shutdown and timerT3RTX: log only.
//
// Any other timer id is ignored.
func ( *Association) ( int) {
	.lock.Lock()
	defer .lock.Unlock()

	if  == timerT1Init {
		.log.Errorf("[%s] retransmission failure: T1-init", .name)
		.completeHandshake(ErrHandshakeInitAck)

		return
	}

	if  == timerT1Cookie {
		.log.Errorf("[%s] retransmission failure: T1-cookie", .name)
		.completeHandshake(ErrHandshakeCookieEcho)

		return
	}

	if  == timerT2Shutdown {
		.log.Errorf("[%s] retransmission failure: T2-shutdown", .name)

		return
	}

	if  == timerT3RTX {
		// T3-rtx timer will not fail by design
		// Justifications:
		//  * ICE would fail if the connectivity is lost
		//  * WebRTC spec is not clear how this incident should be reported to ULP
		.log.Errorf("[%s] retransmission failure: T3-rtx (DATA)", .name)

		return
	}
}

// Handles expiry of the ack timer: under the lock it records the timeout in
// the stats, switches the ack state to ackStateImmediate and wakes the write
// loop.
func ( *Association) () {
	.lock.Lock()
	defer .lock.Unlock()

	.log.Tracef("[%s] ack timed out (ackState: %d)", .name, .ackState)
	.stats.incAckTimeouts()

	.ackState = ackStateImmediate
	.awakeWriteLoop()
}

// BufferedAmount returns total amount (in bytes) of currently buffered user data.
// It is the sum of the byte counts of the pending queue and the inflight
// queue, read under the read lock.
func ( *Association) () int {
	.lock.RLock()
	defer .lock.RUnlock()

	return .pendingQueue.getNumBytes() + .inflightQueue.getNumBytes()
}

// MaxMessageSize returns the maximum message size you can send.
// The value is read with an atomic load; no lock is taken.
func ( *Association) () uint32 {
	return atomic.LoadUint32(&.maxMessageSize)
}

// SetMaxMessageSize sets the maximum message size you can send.
// The value is written with an atomic store; no lock is taken.
func ( *Association) ( uint32) {
	atomic.StoreUint32(&.maxMessageSize, )
}

// completeHandshake sends the given error to handshakeCompletedCh unless the read/write
// side of the association closes before that can happen. It returns whether it was able
// to send on the channel or not.
//
// The call blocks until one of the three channel operations can proceed.
func ( *Association) ( error) bool {
	select {
	// Note: This is a future place where the user could be notified (COMMUNICATION UP)
	case .handshakeCompletedCh <- :
		return true
	case <-.closeWriteLoopCh: // check the read/write sides for closure
	case <-.readLoopCloseCh:
	}

	// One of the loops closed first; the error was not delivered.
	return false
}