package yamux

import (
	"bufio"
	"context"
	"fmt"
	"io"
	"log"
	"math"
	"net"
	"os"
	"runtime/debug"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	pool "github.com/libp2p/go-buffer-pool"
)

// The MemoryManager allows management of memory allocations.
// Memory is allocated:
// 1. When opening / accepting a new stream. This uses the highest priority.
// 2. When trying to increase the stream receive window. This uses a lower priority.
// This is a subset of libp2p's resource manager ResourceScopeSpan interface.
type MemoryManager interface {
	// ReserveMemory reserves memory / buffer space.
	ReserveMemory(size int, prio uint8) error

	// ReleaseMemory explicitly releases memory previously reserved with ReserveMemory
	ReleaseMemory(size int)

	// Done ends the span and releases associated resources.
	Done()
}

type nullMemoryManagerImpl struct{}

func (nullMemoryManagerImpl) ReserveMemory(size int, prio uint8) error { return nil }
func (nullMemoryManagerImpl) ReleaseMemory(size int)                   {}
func (nullMemoryManagerImpl) Done()                                    {}

var nullMemoryManager = &nullMemoryManagerImpl{}
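
// Callers plug in memory accounting by passing a factory to newSession; the
// factory is invoked once per stream, so each stream gets its own span.
// A minimal sketch (myMemoryScope and cfg are illustrative placeholders, not
// part of this package):
//
//	factory := func() (MemoryManager, error) { return myMemoryScope(), nil }
//	sess := newSession(cfg, conn, true /* client side */, 0, factory)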

// Session is used to wrap a reliable ordered connection and to
// multiplex it into multiple streams.
type Session struct {
	rtt int64 // to be accessed atomically, in nanoseconds

	// localGoAway indicates that we should stop
	// accepting further connections. Must be first for alignment.
	localGoAway int32

	// nextStreamID is the next stream we should
	// send. This depends on whether we are a client or a server.
	nextStreamID uint32

	// config holds our configuration
	config *Config

	// logger is used for our logs
	logger *log.Logger

	// conn is the underlying connection
	conn net.Conn

	// reader is a buffered reader
	reader io.Reader

	newMemoryManager func() (MemoryManager, error)

	// pings is used to track inflight pings
	pingLock   sync.Mutex
	pingID     uint32
	activePing *ping

	// streams maps a stream id to a stream, and inflight has an entry
	// for any outgoing stream that has not yet been established. Both are
	// protected by streamLock.
	numIncomingStreams uint32
	streams            map[uint32]*Stream
	inflight           map[uint32]struct{}
	streamLock         sync.Mutex

	// synCh acts like a semaphore. It is sized to the AcceptBacklog which
	// is assumed to be symmetric between the client and server. This allows
	// the client to avoid exceeding the backlog and instead blocks the open.
	synCh chan struct{}

	// acceptCh is used to pass ready streams to the client
	acceptCh chan *Stream

	// sendCh is used to send messages
	sendCh chan []byte

	// pongCh and pingCh are used to send pongs and pings
	pongCh, pingCh chan uint32

	// recvDoneCh is closed when recv() exits to avoid a race
	// between stream registration and stream shutdown
	recvDoneCh chan struct{}
	// recvErr is the error the receive loop ended with
	recvErr error

	// sendDoneCh is closed when send() exits to avoid a race
	// between returning from a Stream.Write and exiting from the send loop
	// (which may be reading a buffer on loan from Stream.Write).
	sendDoneCh chan struct{}

	// client is true if we're the client and our stream IDs should be odd.
	client bool

	// shutdown is used to safely close a session
	shutdown     bool
	shutdownErr  error
	shutdownCh   chan struct{}
	shutdownLock sync.Mutex

	// keepaliveTimer is a periodic timer for keepalive messages. It's nil
	// when keepalives are disabled.
	keepaliveLock   sync.Mutex
	keepaliveTimer  *time.Timer
	keepaliveActive bool
}

// newSession is used to construct a new session
func newSession(config *Config, conn net.Conn, client bool, readBuf int, newMemoryManager func() (MemoryManager, error)) *Session {
	var reader io.Reader = conn
	if readBuf > 0 {
		reader = bufio.NewReaderSize(conn, readBuf)
	}
	if newMemoryManager == nil {
		newMemoryManager = func() (MemoryManager, error) { return nullMemoryManager, nil }
	}
	s := &Session{
		config:           config,
		client:           client,
		logger:           log.New(config.LogOutput, "", log.LstdFlags),
		conn:             conn,
		reader:           reader,
		streams:          make(map[uint32]*Stream),
		inflight:         make(map[uint32]struct{}),
		synCh:            make(chan struct{}, config.AcceptBacklog),
		acceptCh:         make(chan *Stream, config.AcceptBacklog),
		sendCh:           make(chan []byte, 64),
		pongCh:           make(chan uint32, config.PingBacklog),
		pingCh:           make(chan uint32),
		recvDoneCh:       make(chan struct{}),
		sendDoneCh:       make(chan struct{}),
		shutdownCh:       make(chan struct{}),
		newMemoryManager: newMemoryManager,
	}
	if client {
		s.nextStreamID = 1
	} else {
		s.nextStreamID = 2
	}
	if config.EnableKeepAlive {
		s.startKeepalive()
	}
	go s.recv()
	go s.send()
	go s.startMeasureRTT()
	return s
}
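
// newSessionPairSketch is an illustrative helper, not part of the upstream API:
// it wires a client and a server session over an in-memory pipe, the way the
// package-level Client/Server constructors are expected to wrap newSession.
// It assumes DefaultConfig is defined elsewhere in this package.
func newSessionPairSketch() (client, server *Session) {
	clientConn, serverConn := net.Pipe()
	cfg := DefaultConfig()
	// A nil memory manager factory falls back to nullMemoryManager.
	client = newSession(cfg, clientConn, true, 0, nil)
	server = newSession(cfg, serverConn, false, 0, nil)
	return client, server
}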

// IsClosed does a safe check to see if we have shutdown
func (s *Session) IsClosed() bool {
	select {
	case <-s.shutdownCh:
		return true
	default:
		return false
	}
}

// CloseChan returns a read-only channel which is closed as
// soon as the session is closed.
func (s *Session) CloseChan() <-chan struct{} {
	return s.shutdownCh
}

// NumStreams returns the number of currently open streams
func (s *Session) NumStreams() int {
	s.streamLock.Lock()
	num := len(s.streams)
	s.streamLock.Unlock()
	return num
}

// Open is used to create a new stream as a net.Conn
func (s *Session) Open(ctx context.Context) (net.Conn, error) {
	conn, err := s.OpenStream(ctx)
	if err != nil {
		return nil, err
	}
	return conn, nil
}

// OpenStream is used to create a new stream
func (s *Session) OpenStream(ctx context.Context) (*Stream, error) {
	if s.IsClosed() {
		return nil, s.shutdownErr
	}

	// Block if we have too many inflight SYNs
	select {
	case s.synCh <- struct{}{}:
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-s.shutdownCh:
		return nil, s.shutdownErr
	}

	span, err := s.newMemoryManager()
	if err != nil {
		return nil, fmt.Errorf("failed to create resource scope span: %w", err)
	}
	if err := span.ReserveMemory(initialStreamWindow, 255); err != nil {
		return nil, err
	}

GET_ID:
	// Get an ID, and check for stream exhaustion
	id := atomic.LoadUint32(&s.nextStreamID)
	if id >= math.MaxUint32-1 {
		span.Done()
		return nil, ErrStreamsExhausted
	}
	if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
		goto GET_ID
	}

	// Register the stream
	stream := newStream(s, id, streamInit, initialStreamWindow, span)
	s.streamLock.Lock()
	s.streams[id] = stream
	s.inflight[id] = struct{}{}
	s.streamLock.Unlock()

	// Send the window update to create the stream on the remote side
	if err := stream.sendWindowUpdate(ctx.Done()); err != nil {
		defer span.Done()
		select {
		case <-s.synCh:
		default:
			s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
		}
		return nil, err
	}
	return stream, nil
}
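
// openStreamSketch is an illustrative example, not part of the upstream API:
// it opens a stream with a bounded wait and writes a small payload to it.
func openStreamSketch(s *Session) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := s.OpenStream(ctx)
	if err != nil {
		return err
	}
	defer stream.Close()
	_, err = stream.Write([]byte("hello"))
	return err
}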

// Accept is used to block until the next available stream
// is ready to be accepted.
func (s *Session) Accept() (net.Conn, error) {
	conn, err := s.AcceptStream()
	if err != nil {
		return nil, err
	}
	return conn, err
}

// AcceptStream is used to block until the next available stream
// is ready to be accepted.
func (s *Session) AcceptStream() (*Stream, error) {
	for {
		select {
		case stream := <-s.acceptCh:
			if err := stream.sendWindowUpdate(nil); err != nil {
				// don't return accept errors.
				s.logger.Printf("[WARN] error sending window update before accepting: %s", err)
				continue
			}
			return stream, nil
		case <-s.shutdownCh:
			return nil, s.shutdownErr
		}
	}
}
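
// acceptLoopSketch is an illustrative example, not part of the upstream API:
// it accepts inbound streams until the session shuts down and echoes each one
// back to the peer on its own goroutine.
func acceptLoopSketch(s *Session) {
	for {
		stream, err := s.AcceptStream()
		if err != nil {
			return // session closed; err carries the shutdown reason
		}
		go func() {
			defer stream.Close()
			_, _ = io.Copy(stream, stream) // echo until the peer closes its side
		}()
	}
}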

// Close is used to close the session and all streams. It doesn't send a GoAway before
// closing the connection.
func (s *Session) Close() error {
	return s.close(ErrSessionShutdown, false, goAwayNormal)
}

// CloseWithError is used to close the session and all streams after sending a GoAway message with errCode.
// Blocks for ConnectionWriteTimeout to write the GoAway message.
//
// The GoAway may not actually be sent depending on the semantics of the underlying net.Conn.
// For TCP connections, it may be dropped depending on LINGER value or if there's unread data in the kernel
// receive buffer.
func (s *Session) CloseWithError(errCode uint32) error {
	return s.close(&GoAwayError{Remote: false, ErrorCode: errCode}, true, errCode)
}
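
// For example, an application could propagate its own shutdown reason to the
// peer like this (the error code value is illustrative):
//
//	const errCodeGoingAway = 0x42
//	_ = session.CloseWithError(errCodeGoingAway)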

func (s *Session) close(shutdownErr error, sendGoAway bool, errCode uint32) error {
	s.shutdownLock.Lock()
	defer s.shutdownLock.Unlock()

	if s.shutdown {
		return nil
	}
	s.shutdown = true
	if s.shutdownErr == nil {
		s.shutdownErr = shutdownErr
	}
	close(s.shutdownCh)
	s.stopKeepalive()

	// Only send GoAway if we have an error code.
	if sendGoAway && errCode != goAwayNormal {
		// wait for write loop to exit
		// We need to write the current frame completely before sending a goaway.
		// This will wait for at most s.config.ConnectionWriteTimeout
		<-s.sendDoneCh
		ga := s.goAway(errCode)
		if err := s.conn.SetWriteDeadline(time.Now().Add(goAwayWaitTime)); err == nil {
			_, _ = s.conn.Write(ga[:]) // there's nothing we can do on error here
		}
		s.conn.SetWriteDeadline(time.Time{})
	}

	s.conn.Close()
	<-s.sendDoneCh
	<-s.recvDoneCh

	resetErr := shutdownErr
	if _, ok := resetErr.(*GoAwayError); !ok {
		resetErr = fmt.Errorf("%w: connection closed: %w", ErrStreamReset, shutdownErr)
	}
	s.streamLock.Lock()
	defer s.streamLock.Unlock()
	for id, stream := range s.streams {
		stream.forceClose(resetErr)
		delete(s.streams, id)
		stream.memorySpan.Done()
	}
	return nil
}

// GoAway can be used to prevent accepting further
// connections. It does not close the underlying conn.
func (s *Session) GoAway() error {
	return s.sendMsg(s.goAway(goAwayNormal), nil, nil, true)
}

// goAway is used to send a goAway message
func (s *Session) goAway(reason uint32) header {
	atomic.SwapInt32(&s.localGoAway, 1)
	hdr := encode(typeGoAway, 0, 0, reason)
	return hdr
}

// measureRTT pings the peer and updates the smoothed RTT estimate.
func (s *Session) measureRTT() {
	rtt, err := s.Ping()
	if err != nil {
		return
	}
	if !atomic.CompareAndSwapInt64(&s.rtt, 0, rtt.Nanoseconds()) {
		prev := atomic.LoadInt64(&s.rtt)
		// Smooth the estimate with a simple running average: new = old/2 + sample/2.
		smoothed := prev/2 + rtt.Nanoseconds()/2
		atomic.StoreInt64(&s.rtt, smoothed)
	}
}

// startMeasureRTT periodically re-measures the RTT until the session is closed.
func (s *Session) startMeasureRTT() {
	s.measureRTT()
	t := time.NewTicker(s.config.MeasureRTTInterval)
	defer t.Stop()
	for {
		select {
		case <-s.CloseChan():
			return
		case <-t.C:
			s.measureRTT()
		}
	}
}

// getRTT returns the smoothed RTT estimate, or 0 if we don't yet have a measurement.
func (s *Session) getRTT() time.Duration {
	return time.Duration(atomic.LoadInt64(&s.rtt))
}

// Ping is used to measure the RTT response time
func (s *Session) Ping() (dur time.Duration, err error) {
	// Prepare a ping.
	s.pingLock.Lock()
	// If there's an active ping, jump on the bandwagon.
	if activePing := s.activePing; activePing != nil {
		s.pingLock.Unlock()
		return activePing.wait()
	}

	// Ok, our job to send the ping.
	activePing := newPing(s.pingID)
	s.pingID++
	s.activePing = activePing
	s.pingLock.Unlock()

	defer func() {
		// complete ping promise
		activePing.finish(dur, err)

		// Unset it.
		s.pingLock.Lock()
		s.activePing = nil
		s.pingLock.Unlock()
	}()

	// Send the ping request, waiting at most one connection write timeout
	// to flush it.
	timer := time.NewTimer(s.config.ConnectionWriteTimeout)
	defer timer.Stop()
	select {
	case s.pingCh <- activePing.id:
	case <-timer.C:
		return 0, ErrTimeout
	case <-s.shutdownCh:
		return 0, s.shutdownErr
	}

	// The "time" starts once we've actually sent the ping. Otherwise, we'll
	// measure the time it takes to flush the queue as well.
	start := time.Now()

	// Wait for a response, again waiting at most one write timeout.
	if !timer.Stop() {
		<-timer.C
	}
	timer.Reset(s.config.ConnectionWriteTimeout)
	select {
	case <-activePing.pingResponse:
	case <-timer.C:
		return 0, ErrTimeout
	case <-s.shutdownCh:
		return 0, s.shutdownErr
	}

	// Compute the RTT
	return time.Since(start), nil
}
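
// pingSketch is an illustrative example, not part of the upstream API: it
// sends a single ping and logs the measured round-trip time.
func pingSketch(s *Session) {
	if rtt, err := s.Ping(); err != nil {
		s.logger.Printf("[WARN] yamux: ping failed: %v", err)
	} else {
		s.logger.Printf("[INFO] yamux: measured RTT: %s", rtt)
	}
}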

// startKeepalive starts the keepalive process.
func (s *Session) startKeepalive() {
	s.keepaliveLock.Lock()
	defer s.keepaliveLock.Unlock()
	s.keepaliveTimer = time.AfterFunc(s.config.KeepAliveInterval, func() {
		s.keepaliveLock.Lock()
		if s.keepaliveTimer == nil || s.keepaliveActive {
			// keepalives have been stopped or a keepalive is active.
			s.keepaliveLock.Unlock()
			return
		}
		s.keepaliveActive = true
		s.keepaliveLock.Unlock()

		_, err := s.Ping()

		s.keepaliveLock.Lock()
		s.keepaliveActive = false
		if s.keepaliveTimer != nil {
			s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
		}
		s.keepaliveLock.Unlock()

		if err != nil {
			s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
			s.close(ErrKeepAliveTimeout, false, 0)
		}
	})
}

// stopKeepalive stops the keepalive process.
func (s *Session) stopKeepalive() {
	s.keepaliveLock.Lock()
	defer s.keepaliveLock.Unlock()
	if s.keepaliveTimer != nil {
		s.keepaliveTimer.Stop()
		s.keepaliveTimer = nil
	}
}

// extendKeepalive resets the keepalive timer; called whenever we receive data.
func (s *Session) extendKeepalive() {
	s.keepaliveLock.Lock()
	if s.keepaliveTimer != nil && !s.keepaliveActive {
		// Don't stop the timer and drain the channel. This is an
		// AfterFunc, not a normal timer, and any attempts to drain the
		// channel will block forever.
		//
		// Go will stop the timer for us internally anyways. The docs
		// say one must stop the timer before calling reset but that's
		// to ensure that the timer doesn't end up firing immediately
		// after calling Reset.
		s.keepaliveTimer.Reset(s.config.KeepAliveInterval)
	}
	s.keepaliveLock.Unlock()
}

// sendMsg sends the header and body.
// If waitForShutdown is true, it will wait for shutdown to complete even if the send loop has exited. This
// ensures accurate error reporting. waitForShutdown should be true for callers other than the recvLoop.
// The recvLoop should set waitForShutdown to false to avoid a deadlock.
// For details see: https://github.com/libp2p/go-yamux/issues/129
// and the test `TestSessionCloseDeadlock`
func (s *Session) sendMsg(hdr header, body []byte, deadline <-chan struct{}, waitForShutdown bool) error {
	select {
	case <-s.shutdownCh:
		return s.shutdownErr
	default:
	}

	select {
	case <-deadline:
		return ErrTimeout
	default:
	}

	// duplicate as we're sending this async.
	buf := pool.Get(headerSize + len(body))
	copy(buf[:headerSize], hdr[:])
	copy(buf[headerSize:], body)

	select {
	case <-s.shutdownCh:
		pool.Put(buf)
		return s.shutdownErr
	case <-s.sendDoneCh:
		pool.Put(buf)
		if waitForShutdown {
			<-s.shutdownCh
			return s.shutdownErr
		}
		return errSendLoopDone
	case s.sendCh <- buf:
		return nil
	case <-deadline:
		pool.Put(buf)
		return ErrTimeout
	}
}

// send is a long running goroutine that sends data
func (s *Session) send() {
	if err := s.sendLoop(); err != nil {
		// If we are shutting down because the remote closed the connection, prefer the recvLoop error
		// over the sendLoop error. The receive loop might have an error code received in a GoAway frame,
		// which arrived just before the TCP RST that closed the sendLoop.
		//
		// If we are closing because of a write error, we use the error from the sendLoop and not the recvLoop.
		// We hold the shutdownLock, close the connection, wait for the receive loop to finish, and then
		// use the sendLoop error. Holding the shutdownLock ensures that the recvLoop doesn't trigger
		// connection close, but the sendLoop does.
		s.shutdownLock.Lock()
		if s.shutdownErr == nil {
			s.conn.Close()
			<-s.recvDoneCh
			if _, ok := s.recvErr.(*GoAwayError); ok {
				err = s.recvErr
			}
			s.shutdownErr = err
		}
		s.shutdownLock.Unlock()
		s.close(err, false, 0)
	}
}

func (s *Session) sendLoop() (err error) {
	defer func() {
		if rerr := recover(); rerr != nil {
			fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack())
			err = fmt.Errorf("panic in yamux send loop: %s", rerr)
		}
	}()

	defer close(s.sendDoneCh)

	// Extend the write deadline if we've passed the halfway point. This can
	// be expensive so this ensures we only have to do this once every
	// ConnectionWriteTimeout/2 (usually 5s).
	var lastWriteDeadline time.Time
	extendWriteDeadline := func() error {
		now := time.Now()
		// If over half of the deadline has elapsed, extend it.
		if now.Add(s.config.ConnectionWriteTimeout / 2).After(lastWriteDeadline) {
			lastWriteDeadline = now.Add(s.config.ConnectionWriteTimeout)
			return s.conn.SetWriteDeadline(lastWriteDeadline)
		}
		return nil
	}

	writer := s.conn

	// FIXME: https://github.com/libp2p/go-libp2p/issues/644
	// Write coalescing is disabled for now.

	// writer := pool.Writer{W: s.conn}

	// var writeTimeout *time.Timer
	// var writeTimeoutCh <-chan time.Time
	// if s.config.WriteCoalesceDelay > 0 {
	//	writeTimeout = time.NewTimer(s.config.WriteCoalesceDelay)
	//	defer writeTimeout.Stop()

	//	writeTimeoutCh = writeTimeout.C
	// } else {
	//	ch := make(chan time.Time)
	//	close(ch)
	//	writeTimeoutCh = ch
	// }

	for {
		// yield after processing the last message, if we've shutdown.
		// s.sendCh is a buffered channel and Go doesn't guarantee select order.
		select {
		case <-s.shutdownCh:
			return nil
		default:
		}

		var buf []byte
		// Make sure to send any pings & pongs first so they don't get stuck behind writes.
		select {
		case pingID := <-s.pingCh:
			buf = pool.Get(headerSize)
			hdr := encode(typePing, flagSYN, 0, pingID)
			copy(buf, hdr[:])
		case pingID := <-s.pongCh:
			buf = pool.Get(headerSize)
			hdr := encode(typePing, flagACK, 0, pingID)
			copy(buf, hdr[:])
		default:
			// Then send normal data.
			select {
			case buf = <-s.sendCh:
			case pingID := <-s.pingCh:
				buf = pool.Get(headerSize)
				hdr := encode(typePing, flagSYN, 0, pingID)
				copy(buf, hdr[:])
			case pingID := <-s.pongCh:
				buf = pool.Get(headerSize)
				hdr := encode(typePing, flagACK, 0, pingID)
				copy(buf, hdr[:])
			case <-s.shutdownCh:
				return nil
				// default:
				//	select {
				//	case buf = <-s.sendCh:
				//	case <-s.shutdownCh:
				//		return nil
				//	case <-writeTimeoutCh:
				//		if err := writer.Flush(); err != nil {
				//			if os.IsTimeout(err) {
				//				err = ErrConnectionWriteTimeout
				//			}
				//			return err
				//		}

				//		select {
				//		case buf = <-s.sendCh:
				//		case <-s.shutdownCh:
				//			return nil
				//		}

				//		if writeTimeout != nil {
				//			writeTimeout.Reset(s.config.WriteCoalesceDelay)
				//		}
				//	}
			}
		}

		if err := extendWriteDeadline(); err != nil {
			pool.Put(buf)
			return err
		}

		_, err := writer.Write(buf)
		pool.Put(buf)

		if err != nil {
			if os.IsTimeout(err) {
				err = ErrConnectionWriteTimeout
			}
			return err
		}
	}
}

// recv is a long running goroutine that accepts new data
func (s *Session) recv() {
	if err := s.recvLoop(); err != nil {
		s.close(err, false, 0)
	}
}

// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
var (
	handlers = []func(*Session, header) error{
		typeData:         (*Session).handleStreamMessage,
		typeWindowUpdate: (*Session).handleStreamMessage,
		typePing:         (*Session).handlePing,
		typeGoAway:       (*Session).handleGoAway,
	}
)

// recvLoop continues to receive data until a fatal error is encountered
func (s *Session) recvLoop() (err error) {
	defer func() {
		if rerr := recover(); rerr != nil {
			fmt.Fprintf(os.Stderr, "caught panic: %s\n%s\n", rerr, debug.Stack())
			err = fmt.Errorf("panic in yamux receive loop: %s", rerr)
		}
	}()
	defer func() {
		s.recvErr = err
		close(s.recvDoneCh)
	}()
	var hdr header
	for {
		// fmt.Printf("ReadFull from %#v\n", s.reader)
		// Read the header
		if _, err := io.ReadFull(s.reader, hdr[:]); err != nil {
			if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
				s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
			}
			return err
		}

		// Reset the keepalive timer every time we receive data.
		// There's no reason to keepalive if we're active. Worse, if the
		// peer is busy sending us stuff, the pong might get stuck
		// behind a bunch of data.
		s.extendKeepalive()

		// Verify the version
		if hdr.Version() != protoVersion {
			s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
			return ErrInvalidVersion
		}

		mt := hdr.MsgType()
		if mt < typeData || mt > typeGoAway {
			return ErrInvalidMsgType
		}

		if err := handlers[mt](s, hdr); err != nil {
			return err
		}
	}
}

// handleStreamMessage handles either a data or window update frame
func (s *Session) handleStreamMessage(hdr header) error {
	// Check for a new stream creation
	id := hdr.StreamID()
	flags := hdr.Flags()
	if flags&flagSYN == flagSYN {
		if err := s.incomingStream(id); err != nil {
			return err
		}
	}

	// Get the stream
	s.streamLock.Lock()
	stream := s.streams[id]
	s.streamLock.Unlock()

	// If we do not have a stream, likely we sent a RST and/or closed the stream for reading.
	if stream == nil {
		// Drain any data on the wire
		if hdr.MsgType() == typeData && hdr.Length() > 0 {
			if _, err := io.CopyN(io.Discard, s.reader, int64(hdr.Length())); err != nil {
				return nil
			}
		}
		return nil
	}

	// Check if this is a window update
	if hdr.MsgType() == typeWindowUpdate {
		stream.incrSendWindow(hdr, flags)
		return nil
	}

	// Read the new data
	if err := stream.readData(hdr, flags, s.reader); err != nil {
		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, false); sendErr != nil && sendErr != errSendLoopDone {
			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
		}
		return err
	}
	return nil
}

// handlePing is invoked for a typePing frame
func (s *Session) handlePing(hdr header) error {
	flags := hdr.Flags()
	pingID := hdr.Length()

	// Check if this is a query, respond back in a separate context so we
	// don't interfere with the receiving thread blocking for the write.
	if flags&flagSYN == flagSYN {
		select {
		case s.pongCh <- pingID:
		default:
			s.logger.Printf("[WARN] yamux: dropped ping reply")
		}
		return nil
	}

	// Handle a response
	s.pingLock.Lock()
	// If we have an active ping, and this is a response to that active
	// ping, complete the ping.
	if s.activePing != nil && s.activePing.id == pingID {
		// Don't assume that the peer won't send multiple responses for
		// the same ping.
		select {
		case s.activePing.pingResponse <- struct{}{}:
		default:
		}
	}
	s.pingLock.Unlock()
	return nil
}

// handleGoAway is invoked for a typeGoAway frame
func (s *Session) handleGoAway(hdr header) error {
	code := hdr.Length()
	switch code {
	case goAwayNormal:
		return ErrRemoteGoAway
	case goAwayProtoErr:
		s.logger.Printf("[ERR] yamux: received protocol error go away")
	case goAwayInternalErr:
		s.logger.Printf("[ERR] yamux: received internal error go away")
	default:
		s.logger.Printf("[ERR] yamux: received go away with error code: %d", code)
	}
	return &GoAwayError{Remote: true, ErrorCode: code}
}

// incomingStream is used to create a new incoming stream
func (s *Session) incomingStream(id uint32) error {
	if s.client != (id%2 == 0) {
		s.logger.Printf("[ERR] yamux: both endpoints are clients")
		return fmt.Errorf("both yamux endpoints are clients")
	}
	// Reject immediately if we are doing a go away
	if atomic.LoadInt32(&s.localGoAway) == 1 {
		hdr := encode(typeWindowUpdate, flagRST, id, 0)
		return s.sendMsg(hdr, nil, nil, false)
	}

	// Allocate a new stream
	span, err := s.newMemoryManager()
	if err != nil {
		return fmt.Errorf("failed to create resource span: %w", err)
	}
	if err := span.ReserveMemory(initialStreamWindow, 255); err != nil {
		return err
	}
	stream := newStream(s, id, streamSYNReceived, initialStreamWindow, span)

	s.streamLock.Lock()
	defer s.streamLock.Unlock()

	// Check if stream already exists
	if _, ok := s.streams[id]; ok {
		s.logger.Printf("[ERR] yamux: duplicate stream declared")
		if sendErr := s.sendMsg(s.goAway(goAwayProtoErr), nil, nil, false); sendErr != nil && sendErr != errSendLoopDone {
			s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
		}
		span.Done()
		return ErrDuplicateStream
	}

	if s.numIncomingStreams >= s.config.MaxIncomingStreams {
		// too many active streams at the same time
		s.logger.Printf("[WARN] yamux: MaxIncomingStreams exceeded, forcing stream reset")
		defer span.Done()
		hdr := encode(typeWindowUpdate, flagRST, id, 0)
		return s.sendMsg(hdr, nil, nil, false)
	}

	s.numIncomingStreams++
	// Register the stream
	s.streams[id] = stream

	// Check if we've exceeded the backlog
	select {
	case s.acceptCh <- stream:
		return nil
	default:
		// Backlog exceeded! RST the stream
		defer span.Done()
		s.logger.Printf("[WARN] yamux: backlog exceeded, forcing stream reset")
		s.deleteStream(id)
		hdr := encode(typeWindowUpdate, flagRST, id, 0)
		return s.sendMsg(hdr, nil, nil, false)
	}
}

// closeStream is used to close a stream once both sides have
// issued a close. If there was an in-flight SYN and the stream
// was not yet established, then this will give the credit back.
func (s *Session) closeStream(id uint32) {
	s.streamLock.Lock()
	defer s.streamLock.Unlock()
	if _, ok := s.inflight[id]; ok {
		select {
		case <-s.synCh:
		default:
			s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
		}
		delete(s.inflight, id)
	}
	s.deleteStream(id)
}

// deleteStream removes the stream and releases its memory reservation.
// Must be called with streamLock held.
func (s *Session) deleteStream(id uint32) {
	str, ok := s.streams[id]
	if !ok {
		return
	}
	if s.client == (id%2 == 0) {
		if s.numIncomingStreams == 0 {
			s.logger.Printf("[ERR] yamux: numIncomingStreams underflow")
			// prevent the creation of any new streams
			s.numIncomingStreams = math.MaxUint32
		} else {
			s.numIncomingStreams--
		}
	}
	delete(s.streams, id)
	str.memorySpan.Done()
}

// establishStream is used to mark a stream that was in the
// SYN Sent state as established.
func (s *Session) establishStream(id uint32) {
	s.streamLock.Lock()
	if _, ok := s.inflight[id]; ok {
		delete(s.inflight, id)
	} else {
		s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
	}
	select {
	case <-s.synCh:
	default:
		s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
	}
	s.streamLock.Unlock()
}