package webtransport

import (
	
	
	
	
	
	
	
	

	
	
	
)

// sessionID is the WebTransport Session ID
type sessionID uint64

const closeSessionCapsuleType http3.CapsuleType = 0x2843

const maxCloseCapsuleErrorMsgLen = 1024

type acceptQueue[ any] struct {
	mx sync.Mutex
	// The channel is used to notify consumers (via Chan) about new incoming items.
	// Needs to be buffered to preserve the notification if an item is enqueued
	// between a call to Next and to Chan.
	c chan struct{}
	// Contains all the streams waiting to be accepted.
	// There's no explicit limit to the length of the queue, but it is implicitly
	// limited by the stream flow control provided by QUIC.
	queue []
}

func newAcceptQueue[ any]() *acceptQueue[] {
	return &acceptQueue[]{c: make(chan struct{}, 1)}
}

func ( *acceptQueue[]) ( ) {
	.mx.Lock()
	.queue = append(.queue, )
	.mx.Unlock()

	select {
	case .c <- struct{}{}:
	default:
	}
}

func ( *acceptQueue[]) ()  {
	.mx.Lock()
	defer .mx.Unlock()

	if len(.queue) == 0 {
		return *new()
	}
	 := .queue[0]
	.queue = .queue[1:]
	return 
}

func ( *acceptQueue[]) () <-chan struct{} { return .c }

type http3Stream interface {
	io.ReadWriteCloser
	ReceiveDatagram(context.Context) ([]byte, error)
	SendDatagram([]byte) error
	CancelRead(quic.StreamErrorCode)
	CancelWrite(quic.StreamErrorCode)
	SetWriteDeadline(time.Time) error
}

var (
	_ http3Stream = &http3.Stream{}
	_ http3Stream = &http3.RequestStream{}
)

// SessionState contains the state of a WebTransport session
type SessionState struct {
	// ConnectionState contains the QUIC connection state, including TLS handshake information
	ConnectionState quic.ConnectionState

	// ApplicationProtocol contains the application protocol negotiated for the session
	ApplicationProtocol string
}

type Session struct {
	sessionID           sessionID
	conn                *quic.Conn
	str                 http3Stream
	applicationProtocol string

	streamHdr    []byte
	uniStreamHdr []byte

	ctx      context.Context
	closeMx  sync.Mutex
	closeErr error // not nil once the session is closed
	// streamCtxs holds all the context.CancelFuncs of calls to Open{Uni}StreamSync calls currently active.
	// When the session is closed, this allows us to cancel all these contexts and make those calls return.
	streamCtxs map[int]context.CancelFunc

	bidiAcceptQueue acceptQueue[*Stream]
	uniAcceptQueue  acceptQueue[*ReceiveStream]

	streams streamsMap
}

func newSession( context.Context,  sessionID,  *quic.Conn,  http3Stream,  string) *Session {
	,  := context.WithCancel()
	 := &Session{
		sessionID:           ,
		conn:                ,
		str:                 ,
		applicationProtocol: ,
		ctx:                 ,
		streamCtxs:          make(map[int]context.CancelFunc),
		bidiAcceptQueue:     *newAcceptQueue[*Stream](),
		uniAcceptQueue:      *newAcceptQueue[*ReceiveStream](),
		streams:             *newStreamsMap(),
	}
	// precompute the headers for unidirectional streams
	.uniStreamHdr = make([]byte, 0, 2+quicvarint.Len(uint64(.sessionID)))
	.uniStreamHdr = quicvarint.Append(.uniStreamHdr, webTransportUniStreamType)
	.uniStreamHdr = quicvarint.Append(.uniStreamHdr, uint64(.sessionID))
	// precompute the headers for bidirectional streams
	.streamHdr = make([]byte, 0, 2+quicvarint.Len(uint64(.sessionID)))
	.streamHdr = quicvarint.Append(.streamHdr, webTransportFrameType)
	.streamHdr = quicvarint.Append(.streamHdr, uint64(.sessionID))

	go func() {
		defer ()
		.handleConn()
	}()
	return 
}

func ( *Session) () {
	 := .parseNextCapsule()
	.closeWithError()
}

// parseNextCapsule parses the next Capsule sent on the request stream.
// It returns a SessionError, if the capsule received is a WT_CLOSE_SESSION Capsule.
func ( *Session) () error {
	for {
		, ,  := http3.ParseCapsule(quicvarint.NewReader(.str))
		if  != nil {
			return 
		}
		switch  {
		case closeSessionCapsuleType:
			var  [4]byte
			if ,  := io.ReadFull(, [:]);  != nil {
				return 
			}
			 := binary.BigEndian.Uint32([:])
			// the length of the error message is limited to 1024 bytes
			,  := io.ReadAll(io.LimitReader(, maxCloseCapsuleErrorMsgLen))
			if  != nil {
				return 
			}
			return &SessionError{
				Remote:    true,
				ErrorCode: SessionErrorCode(),
				Message:   string(),
			}
		default:
			// unknown capsule, skip it
			if ,  := io.ReadAll();  != nil {
				return 
			}
		}
	}
}

func ( *Session) ( *quic.Stream,  bool) *Stream {
	var  []byte
	if  {
		 = .streamHdr
	}
	 := newStream(, , func() { .streams.RemoveStream(.StreamID()) })
	.streams.AddStream(.StreamID(), .closeWithSession)
	return 
}

func ( *Session) ( *quic.ReceiveStream) *ReceiveStream {
	 := newReceiveStream(, func() { .streams.RemoveStream(.StreamID()) })
	.streams.AddStream(.StreamID(), .closeWithSession)
	return 
}

func ( *Session) ( *quic.SendStream) *SendStream {
	 := newSendStream(, .uniStreamHdr, func() { .streams.RemoveStream(.StreamID()) })
	.streams.AddStream(.StreamID(), .closeWithSession)
	return 
}

// addIncomingStream adds a bidirectional stream that the remote peer opened
func ( *Session) ( *quic.Stream) {
	.closeMx.Lock()
	 := .closeErr
	if  != nil {
		.closeMx.Unlock()
		.CancelRead(WTSessionGoneErrorCode)
		.CancelWrite(WTSessionGoneErrorCode)
		return
	}
	 := .addStream(, false)
	.closeMx.Unlock()

	.bidiAcceptQueue.Add()
}

// addIncomingUniStream adds a unidirectional stream that the remote peer opened
func ( *Session) ( *quic.ReceiveStream) {
	.closeMx.Lock()
	 := .closeErr
	if  != nil {
		.closeMx.Unlock()
		.CancelRead(WTSessionGoneErrorCode)
		return
	}
	 := .addReceiveStream()
	.closeMx.Unlock()

	.uniAcceptQueue.Add()
}

// Context returns a context that is closed when the session is closed.
func ( *Session) () context.Context {
	return .ctx
}

func ( *Session) ( context.Context) (*Stream, error) {
	.closeMx.Lock()
	 := .closeErr
	.closeMx.Unlock()
	if  != nil {
		return nil, 
	}

	for {
		// If there's a stream in the accept queue, return it immediately.
		if  := .bidiAcceptQueue.Next();  != nil {
			return , nil
		}
		// No stream in the accept queue. Wait until we accept one.
		select {
		case <-.ctx.Done():
			return nil, .closeErr
		case <-.Done():
			return nil, .Err()
		case <-.bidiAcceptQueue.Chan():
		}
	}
}

func ( *Session) ( context.Context) (*ReceiveStream, error) {
	.closeMx.Lock()
	 := .closeErr
	.closeMx.Unlock()
	if  != nil {
		return nil, .closeErr
	}

	for {
		// If there's a stream in the accept queue, return it immediately.
		if  := .uniAcceptQueue.Next();  != nil {
			return , nil
		}
		// No stream in the accept queue. Wait until we accept one.
		select {
		case <-.ctx.Done():
			return nil, .closeErr
		case <-.Done():
			return nil, .Err()
		case <-.uniAcceptQueue.Chan():
		}
	}
}

func ( *Session) () (*Stream, error) {
	.closeMx.Lock()
	defer .closeMx.Unlock()

	if .closeErr != nil {
		return nil, .closeErr
	}

	,  := .conn.OpenStream()
	if  != nil {
		return nil, 
	}
	return .addStream(, true), nil
}

func ( *Session) ( context.CancelFunc) ( int) {
:
	 = rand.Int()
	if ,  := .streamCtxs[];  {
		goto 
	}
	.streamCtxs[] = 
	return 
}

func ( *Session) ( context.Context) (*Stream, error) {
	.closeMx.Lock()
	if .closeErr != nil {
		.closeMx.Unlock()
		return nil, .closeErr
	}
	,  := context.WithCancel()
	 := .addStreamCtxCancel()
	.closeMx.Unlock()

	// open a new bidirectional stream without holding the mutex: this call might block
	,  := .conn.OpenStreamSync()

	.closeMx.Lock()
	defer .closeMx.Unlock()
	delete(.streamCtxs, )

	// the session might have been closed concurrently with OpenStreamSync returning
	if  != nil && .closeErr != nil {
		.CancelRead(WTSessionGoneErrorCode)
		.CancelWrite(WTSessionGoneErrorCode)
		return nil, .closeErr
	}
	if  != nil {
		if .closeErr != nil {
			return nil, .closeErr
		}
		return nil, 
	}
	return .addStream(, true), nil
}

func ( *Session) () (*SendStream, error) {
	.closeMx.Lock()
	defer .closeMx.Unlock()

	if .closeErr != nil {
		return nil, .closeErr
	}
	,  := .conn.OpenUniStream()
	if  != nil {
		return nil, 
	}
	return .addSendStream(), nil
}

func ( *Session) ( context.Context) ( *SendStream,  error) {
	.closeMx.Lock()
	if .closeErr != nil {
		.closeMx.Unlock()
		return nil, .closeErr
	}
	,  := context.WithCancel()
	 := .addStreamCtxCancel()
	.closeMx.Unlock()

	// open a new unidirectional stream without holding the mutex: this call might block
	,  := .conn.OpenUniStreamSync()

	.closeMx.Lock()
	defer .closeMx.Unlock()
	delete(.streamCtxs, )

	// the session might have been closed concurrently with OpenStreamSync returning
	if  != nil && .closeErr != nil {
		.CancelWrite(WTSessionGoneErrorCode)
		return nil, .closeErr
	}
	if  != nil {
		if .closeErr != nil {
			return nil, .closeErr
		}
		return nil, 
	}
	return .addSendStream(), nil
}

func ( *Session) () net.Addr {
	return .conn.LocalAddr()
}

func ( *Session) () net.Addr {
	return .conn.RemoteAddr()
}

func ( *Session) ( SessionErrorCode,  string) error {
	,  := .closeWithError(&SessionError{ErrorCode: , Message: })
	if  != nil || ! {
		return 
	}

	// truncate the message if it's too long
	if len() > maxCloseCapsuleErrorMsgLen {
		 = truncateUTF8(, maxCloseCapsuleErrorMsgLen)
	}

	 := make([]byte, 4, 4+len())
	binary.BigEndian.PutUint32(, uint32())
	 = append(, []byte()...)

	// Optimistically send the WT_CLOSE_SESSION Capsule:
	// If we're flow-control limited, we don't want to wait for the receiver to issue new flow control credits.
	// There's no idiomatic way to do a non-blocking write in Go, so we set a short deadline.
	.str.SetWriteDeadline(time.Now().Add(10 * time.Millisecond))
	if  := http3.WriteCapsule(quicvarint.NewWriter(.str), closeSessionCapsuleType, );  != nil {
		.str.CancelWrite(WTSessionGoneErrorCode)
	}

	.str.CancelRead(WTSessionGoneErrorCode)
	 = .str.Close()
	<-.ctx.Done()
	return 
}

func ( *Session) ( []byte) error {
	return .str.SendDatagram()
}

func ( *Session) ( context.Context) ([]byte, error) {
	return .str.ReceiveDatagram()
}

func ( *Session) ( error) (bool /* first call to close session */, error) {
	.closeMx.Lock()
	defer .closeMx.Unlock()
	// Duplicate call, or the remote already closed this session.
	if .closeErr != nil {
		return false, nil
	}
	.closeErr = 

	for ,  := range .streamCtxs {
		()
	}
	.streams.CloseSession()

	return true, nil
}

// SessionState returns the current state of the session
func ( *Session) () SessionState {
	return SessionState{
		ConnectionState:     .conn.ConnectionState(),
		ApplicationProtocol: .applicationProtocol,
	}
}

// truncateUTF8 cuts a string to max n bytes without breaking UTF-8 characters.
func truncateUTF8( string,  int) string {
	if len() <=  {
		return 
	}

	for  > 0 && !utf8.RuneStart([]) {
		--
	}
	return [:]
}