package webtransport

import (
	
	
	
	
	
	
	

	
	
	
)

// sessionID is the WebTransport Session ID.
// It is converted to uint64 and varint-encoded into the stream headers that
// newSession precomputes.
type sessionID uint64

// closeWebtransportSessionCapsuleType is the capsule type (0x2843) used for
// closing a session: parseNextCapsule converts a received capsule of this type
// into a SessionError, and closeWithError writes a capsule of this type to the
// request stream.
const closeWebtransportSessionCapsuleType http3.CapsuleType = 0x2843

// acceptQueue is a first-in, first-out queue of pending items protected by a
// mutex, paired with a notification channel that signals consumers when an
// item has been added. Session instantiates it for Stream and ReceiveStream.
type acceptQueue[ any] struct {
	// mx is held while queue is read or modified.
	mx sync.Mutex
	// The channel is used to notify consumers (via Chan) about new incoming items.
	// Needs to be buffered to preserve the notification if an item is enqueued
	// between a call to Next and to Chan.
	c chan struct{}
	// Contains all the streams waiting to be accepted.
	// There's no explicit limit to the length of the queue, but it is implicitly
	// limited by the stream flow control provided by QUIC.
	queue []
}

// newAcceptQueue returns an empty acceptQueue whose notification channel has
// a buffer of one, so that a single pending notification is retained even
// when no consumer is currently receiving from the channel.
func newAcceptQueue[ any]() *acceptQueue[] {
	return &acceptQueue[]{c: make(chan struct{}, 1)}
}

// Add appends an item to the end of the queue and then signals the
// notification channel.
// The signal is sent after the mutex has been released.
func ( *acceptQueue[]) ( ) {
	.mx.Lock()
	.queue = append(.queue, )
	.mx.Unlock()

	// Non-blocking send: if a notification is already buffered, this one is
	// dropped instead of blocking the caller.
	select {
	case .c <- struct{}{}:
	default:
	}
}

// Next removes and returns the item at the front of the queue.
// If the queue is empty, it returns the zero value of the element type
// without blocking; the callers in this file compare the result against nil.
func ( *acceptQueue[]) ()  {
	.mx.Lock()
	defer .mx.Unlock()

	if len(.queue) == 0 {
		// Zero value of the element type.
		return *new()
	}
	 := .queue[0]
	// NOTE(review): re-slicing does not clear the removed slot in the backing
	// array, so the returned item stays referenced until the array is replaced
	// by a later append — confirm this is acceptable.
	.queue = .queue[1:]
	return 
}

// Chan returns the receive-only notification channel that Add signals after
// enqueuing an item. A receive on it only indicates that an item was added at
// some point; callers in this file call Next again afterwards.
func ( *acceptQueue[]) () <-chan struct{} { return .c }

// Session holds the state of one WebTransport session: its session ID, the
// underlying HTTP/3 connection and request stream, the close state, the
// queues of incoming streams waiting to be accepted, and the map of streams
// that belong to the session.
type Session struct {
	sessionID  sessionID
	qconn      http3.Connection
	requestStr http3.Stream

	// Headers precomputed in newSession from a type value followed by the
	// session ID: streamHdr for bidirectional streams, uniStreamHdr for
	// unidirectional send streams.
	streamHdr    []byte
	uniStreamHdr []byte

	// ctx is the context returned by Context.
	// closeMx is held around writes to closeErr and accesses to streamCtxs.
	ctx      context.Context
	closeMx  sync.Mutex
	closeErr error // not nil once the session is closed
	// streamCtxs holds all the context.CancelFuncs of calls to Open{Uni}StreamSync calls currently active.
	// When the session is closed, this allows us to cancel all these contexts and make those calls return.
	streamCtxs map[int]context.CancelFunc

	// Streams opened by the remote peer that have not been accepted yet.
	bidiAcceptQueue acceptQueue[Stream]
	uniAcceptQueue  acceptQueue[ReceiveStream]

	// TODO: garbage collect streams from when they are closed
	streams streamsMap
}

// newSession creates the Session for the given session ID, HTTP/3 connection
// and HTTP/3 request stream, precomputes the headers used for streams opened
// locally, and starts a goroutine that runs handleConn.
func newSession( sessionID,  http3.Connection,  http3.Stream) *Session {
	// Look up the connection tracing ID so it can be carried into the session
	// context. The single-value type assertion panics if the value is missing
	// or has a different type.
	 := .Context().Value(quic.ConnectionTracingKey).(quic.ConnectionTracingID)
	// The session context is rooted in context.Background() and carries only
	// the tracing ID, so it is cancelled solely through the cancel function
	// returned here.
	,  := context.WithCancel(context.WithValue(context.Background(), quic.ConnectionTracingKey, ))
	 := &Session{
		sessionID:       ,
		qconn:           ,
		requestStr:      ,
		ctx:             ,
		streamCtxs:      make(map[int]context.CancelFunc),
		bidiAcceptQueue: *newAcceptQueue[Stream](),
		uniAcceptQueue:  *newAcceptQueue[ReceiveStream](),
		streams:         *newStreamsMap(),
	}
	// NOTE(review): the capacity hint of 2 presumably reserves room for the
	// varint-encoded type value; it is only a hint, append grows as needed.
	// precompute the headers for unidirectional streams
	.uniStreamHdr = make([]byte, 0, 2+quicvarint.Len(uint64(.sessionID)))
	.uniStreamHdr = quicvarint.Append(.uniStreamHdr, webTransportUniStreamType)
	.uniStreamHdr = quicvarint.Append(.uniStreamHdr, uint64(.sessionID))
	// precompute the headers for bidirectional streams
	.streamHdr = make([]byte, 0, 2+quicvarint.Len(uint64(.sessionID)))
	.streamHdr = quicvarint.Append(.streamHdr, webTransportFrameType)
	.streamHdr = quicvarint.Append(.streamHdr, uint64(.sessionID))

	// handleConn blocks until the session ends; the deferred call runs once it
	// returns.
	// NOTE(review): the deferred call is presumably the cancel function of the
	// session context — confirm.
	go func() {
		defer ()
		.handleConn()
	}()
	return 
}

// handleConn waits for parseNextCapsule to return and then marks the session
// as closed: it records the close error (unless one is already set), cancels
// every context registered in streamCtxs and calls streams.CloseSession.
// It is run in its own goroutine by newSession.
func ( *Session) () {
	var  *SessionError
	// parseNextCapsule only returns with a non-nil error. Anything that is not
	// a *SessionError is replaced by a generic SessionError marked as remote.
	 := .parseNextCapsule()
	if !errors.As(, &) {
		 = &SessionError{Remote: true}
	}

	.closeMx.Lock()
	defer .closeMx.Unlock()
	// If we closed the connection, the closeErr will be set in Close.
	if .closeErr == nil {
		.closeErr = 
	}
	// Make pending Open{Uni}StreamSync calls return.
	for ,  := range .streamCtxs {
		()
	}
	.streams.CloseSession()
}

// parseNextCapsule parses the next Capsule sent on the request stream.
// It returns a SessionError, if the capsule received is a CLOSE_WEBTRANSPORT_SESSION Capsule.
// Capsules of any other type are read to the end and discarded, after which
// the next capsule is parsed. The function never returns nil: it loops until
// either a close capsule has been decoded or parsing/reading fails, in which
// case that error is returned.
func ( *Session) () error {
	for {
		// TODO: enforce max size
		, ,  := http3.ParseCapsule(quicvarint.NewReader(.requestStr))
		if  != nil {
			return 
		}
		switch  {
		case closeWebtransportSessionCapsuleType:
			// The payload starts with a 4-byte big-endian error code ...
			 := make([]byte, 4)
			if ,  := io.ReadFull(, );  != nil {
				return 
			}
			 := binary.BigEndian.Uint32()
			// ... and the remainder of the payload is the error message.
			// NOTE(review): io.ReadAll is unbounded here; see the TODO above.
			,  := io.ReadAll()
			if  != nil {
				return 
			}
			return &SessionError{
				Remote:    true,
				ErrorCode: SessionErrorCode(),
				Message:   string(),
			}
		default:
			// unknown capsule, skip it
			if ,  := io.ReadAll();  != nil {
				return 
			}
		}
	}
}

// addStream wraps a QUIC stream in a Stream and registers it in the session's
// streams map under its stream ID.
// When the bool argument is true, the precomputed bidirectional stream header
// is handed to newStream; otherwise a nil header is passed. The callers in
// this file pass true for locally opened streams and false for incoming ones,
// and all of them hold closeMx while calling.
func ( *Session) ( quic.Stream,  bool) Stream {
	var  []byte
	if  {
		 = .streamHdr
	}
	// The callback given to newStream removes the stream from the map again.
	 := newStream(, , func() { .streams.RemoveStream(.StreamID()) })
	// closeWithSession is the function registered with the streams map for
	// this stream.
	.streams.AddStream(.StreamID(), .closeWithSession)
	return 
}

// addReceiveStream wraps a QUIC receive stream in a ReceiveStream and
// registers it in the session's streams map under its stream ID.
// The callback given to newReceiveStream removes the stream from the map
// again; the function registered with the map calls closeWithSession on the
// wrapped stream.
func ( *Session) ( quic.ReceiveStream) ReceiveStream {
	 := newReceiveStream(, func() { .streams.RemoveStream(.StreamID()) })
	.streams.AddStream(.StreamID(), func() {
		.closeWithSession()
	})
	return 
}

// addSendStream wraps a QUIC send stream in a SendStream, passing the
// precomputed unidirectional stream header to newSendStream, and registers
// the result in the session's streams map under its stream ID.
// The callback given to newSendStream removes the stream from the map again.
func ( *Session) ( quic.SendStream) SendStream {
	 := newSendStream(, .uniStreamHdr, func() { .streams.RemoveStream(.StreamID()) })
	.streams.AddStream(.StreamID(), .closeWithSession)
	return 
}

// addIncomingStream adds a bidirectional stream that the remote peer opened
// to the session and queues it for acceptance.
// If the session is already closed, the stream is not added; instead both of
// its directions are cancelled with sessionCloseErrorCode.
func ( *Session) ( quic.Stream) {
	.closeMx.Lock()
	 := .closeErr
	if  != nil {
		.closeMx.Unlock()
		.CancelRead(sessionCloseErrorCode)
		.CancelWrite(sessionCloseErrorCode)
		return
	}
	// Incoming stream: no header is passed to newStream (flag is false).
	 := .addStream(, false)
	.closeMx.Unlock()

	// Enqueue after releasing closeMx.
	.bidiAcceptQueue.Add()
}

// addIncomingUniStream adds a unidirectional stream that the remote peer opened
// to the session and queues it for acceptance.
// If the session is already closed, the stream is not added; instead reading
// from it is cancelled with sessionCloseErrorCode.
func ( *Session) ( quic.ReceiveStream) {
	.closeMx.Lock()
	 := .closeErr
	if  != nil {
		.closeMx.Unlock()
		.CancelRead(sessionCloseErrorCode)
		return
	}
	 := .addReceiveStream()
	.closeMx.Unlock()

	// Enqueue after releasing closeMx.
	.uniAcceptQueue.Add()
}

// Context returns a context that is closed when the session is closed.
// It is the context created in newSession, which is derived from
// context.Background() and carries the connection tracing ID.
func ( *Session) () context.Context {
	return .ctx
}

// Returns the next bidirectional stream from bidiAcceptQueue, blocking until
// one is available, the session context is done, or the supplied context is
// done.
// If the session is already closed, or its context ends while waiting, the
// session's close error is returned; if the supplied context ends first, that
// context's error is returned.
func ( *Session) ( context.Context) (Stream, error) {
	// Fail fast if the session has already been closed.
	.closeMx.Lock()
	 := .closeErr
	.closeMx.Unlock()
	if  != nil {
		return nil, 
	}

	for {
		// If there's a stream in the accept queue, return it immediately.
		if  := .bidiAcceptQueue.Next();  != nil {
			return , nil
		}
		// No stream in the accept queue. Wait until we accept one.
		select {
		case <-.ctx.Done():
			// NOTE(review): closeErr is read here without holding closeMx,
			// unlike the check at the top of the function — confirm this is safe.
			return nil, .closeErr
		case <-.Done():
			return nil, .Err()
		case <-.bidiAcceptQueue.Chan():
			// Something was enqueued; loop around and try Next again.
		}
	}
}

// Returns the next unidirectional stream from uniAcceptQueue, blocking until
// one is available, the session context is done, or the supplied context is
// done.
// If the session is already closed, or its context ends while waiting, the
// session's close error is returned; if the supplied context ends first, that
// context's error is returned.
func ( *Session) ( context.Context) (ReceiveStream, error) {
	// Fail fast if the session has already been closed.
	.closeMx.Lock()
	 := .closeErr
	.closeMx.Unlock()
	if  != nil {
		// NOTE(review): this re-reads the closeErr field after the mutex was
		// released rather than returning the copy taken under the lock, unlike
		// the bidirectional variant — confirm whether that is intended.
		return nil, .closeErr
	}

	for {
		// If there's a stream in the accept queue, return it immediately.
		if  := .uniAcceptQueue.Next();  != nil {
			return , nil
		}
		// No stream in the accept queue. Wait until we accept one.
		select {
		case <-.ctx.Done():
			// NOTE(review): closeErr is read here without holding closeMx —
			// confirm this is safe.
			return nil, .closeErr
		case <-.Done():
			return nil, .Err()
		case <-.uniAcceptQueue.Chan():
			// Something was enqueued; loop around and try Next again.
		}
	}
}

// Opens a new bidirectional stream by calling qconn.OpenStream and registers
// it with the session.
// Returns the session's close error if the session is already closed, or the
// error from qconn.OpenStream if opening fails. closeMx is held for the whole
// call, including the call to qconn.OpenStream.
func ( *Session) () (Stream, error) {
	.closeMx.Lock()
	defer .closeMx.Unlock()

	if .closeErr != nil {
		return nil, .closeErr
	}

	,  := .qconn.OpenStream()
	if  != nil {
		return nil, 
	}
	// Locally opened stream: the stream header is passed along (flag is true).
	return .addStream(, true), nil
}

// addStreamCtxCancel stores the given cancel function in streamCtxs under a
// random int key that is not yet in use, and returns that key so the entry
// can be deleted later.
// It accesses streamCtxs without locking; the callers in this file hold
// closeMx while calling it.
func ( *Session) ( context.CancelFunc) ( int) {
:
	 = rand.Int()
	// Key already taken: jump back and draw another one.
	if ,  := .streamCtxs[];  {
		goto 
	}
	.streamCtxs[] = 
	return 
}

// OpenStreamSync opens a new bidirectional stream by calling
// qconn.OpenStreamSync and registers it with the session.
// A cancellable context is derived from the supplied one and its cancel
// function is registered in streamCtxs, so that closing the session can
// cancel it (see handleConn). closeMx is not held while qconn.OpenStreamSync
// is in progress.
// Returns the session's close error if the session is closed before or during
// the call, or the error from qconn.OpenStreamSync otherwise.
func ( *Session) ( context.Context) (Stream, error) {
	.closeMx.Lock()
	if .closeErr != nil {
		.closeMx.Unlock()
		return nil, .closeErr
	}
	,  := context.WithCancel()
	 := .addStreamCtxCancel()
	.closeMx.Unlock()

	,  := .qconn.OpenStreamSync()
	if  != nil {
		// NOTE(review): on this path the streamCtxs entry registered above is
		// not deleted (the delete below is only reached on success), and
		// closeErr is read without holding closeMx — confirm both are intended.
		if .closeErr != nil {
			return nil, .closeErr
		}
		return nil, 
	}

	.closeMx.Lock()
	defer .closeMx.Unlock()
	delete(.streamCtxs, )
	// Some time might have passed. Check if the session is still alive
	if .closeErr != nil {
		// The session was closed in the meantime: reset the freshly opened
		// stream in both directions instead of handing it out.
		.CancelWrite(sessionCloseErrorCode)
		.CancelRead(sessionCloseErrorCode)
		return nil, .closeErr
	}
	return .addStream(, true), nil
}

// Opens a new unidirectional send stream by calling qconn.OpenUniStream and
// registers it with the session.
// Returns the session's close error if the session is already closed, or the
// error from qconn.OpenUniStream if opening fails. closeMx is held for the
// whole call, including the call to qconn.OpenUniStream.
func ( *Session) () (SendStream, error) {
	.closeMx.Lock()
	defer .closeMx.Unlock()

	if .closeErr != nil {
		return nil, .closeErr
	}
	,  := .qconn.OpenUniStream()
	if  != nil {
		return nil, 
	}
	return .addSendStream(), nil
}

// OpenUniStreamSync opens a new unidirectional send stream by calling
// qconn.OpenUniStreamSync and registers it with the session.
// A cancellable context is derived from the supplied one and its cancel
// function is registered in streamCtxs, so that closing the session can
// cancel it (see handleConn). closeMx is not held while
// qconn.OpenUniStreamSync is in progress.
// Returns the session's close error if the session is closed before or during
// the call, or the error from qconn.OpenUniStreamSync otherwise.
func ( *Session) ( context.Context) ( SendStream,  error) {
	.closeMx.Lock()
	if .closeErr != nil {
		.closeMx.Unlock()
		return nil, .closeErr
	}
	,  := context.WithCancel()
	 := .addStreamCtxCancel()
	.closeMx.Unlock()

	,  := .qconn.OpenUniStreamSync()
	if  != nil {
		// NOTE(review): on this path the streamCtxs entry registered above is
		// not deleted (the delete below is only reached on success), and
		// closeErr is read without holding closeMx — confirm both are intended.
		if .closeErr != nil {
			return nil, .closeErr
		}
		return nil, 
	}

	.closeMx.Lock()
	defer .closeMx.Unlock()
	delete(.streamCtxs, )
	// Some time might have passed. Check if the session is still alive
	if .closeErr != nil {
		// The session was closed in the meantime: reset the freshly opened
		// stream instead of handing it out.
		.CancelWrite(sessionCloseErrorCode)
		return nil, .closeErr
	}
	return .addSendStream(), nil
}

// Returns the local address reported by the underlying HTTP/3 connection
// (qconn.LocalAddr).
func ( *Session) () net.Addr {
	return .qconn.LocalAddr()
}

// Returns the remote address reported by the underlying HTTP/3 connection
// (qconn.RemoteAddr).
func ( *Session) () net.Addr {
	return .qconn.RemoteAddr()
}

// Closes the session with the given error code and message.
// The work of recording the close error and writing the close capsule is done
// by closeWithError. If that fails, or if the session was already closed
// (duplicate call or closed by the remote), the error returned by
// closeWithError is returned directly, which is nil in the already-closed case.
// Otherwise the request stream is shut down and the call blocks until the
// session context is done.
func ( *Session) ( SessionErrorCode,  string) error {
	,  := .closeWithError(, )
	if  != nil || ! {
		return 
	}

	// NOTE(review): 1337 is an unexplained error code; cancelling the read side
	// presumably makes the capsule parser in handleConn return so that the
	// session context gets cancelled — confirm.
	.requestStr.CancelRead(1337)
	 = .requestStr.Close()
	// Wait for the session context to be done before returning.
	<-.ctx.Done()
	return 
}

// Sends the given bytes as a datagram by delegating to
// requestStr.SendDatagram and returns its error.
func ( *Session) ( []byte) error {
	return .requestStr.SendDatagram()
}

// Receives a datagram by delegating to requestStr.ReceiveDatagram and returns
// its result unchanged.
func ( *Session) ( context.Context) ([]byte, error) {
	return .requestStr.ReceiveDatagram()
}

// closeWithError records a local close of the session and writes the
// corresponding close capsule to the request stream.
// The bool result is true only for the first call that closes the session. If
// closeErr is already set, it returns (false, nil) without writing anything.
// On the first call it returns true together with the error from
// http3.WriteCapsule. closeMx is held for the whole call, including the write.
func ( *Session) ( SessionErrorCode,  string) (bool /* first call to close session */, error) {
	.closeMx.Lock()
	defer .closeMx.Unlock()
	// Duplicate call, or the remote already closed this session.
	if .closeErr != nil {
		return false, nil
	}
	// Remote is left at its zero value here, in contrast to the errors created
	// when parsing a received close capsule.
	.closeErr = &SessionError{
		ErrorCode: ,
		Message:   ,
	}

	// Capsule payload: 4-byte big-endian error code followed by the message
	// bytes, mirroring what parseNextCapsule reads.
	 := make([]byte, 4, 4+len())
	binary.BigEndian.PutUint32(, uint32())
	 = append(, []byte()...)

	return true, http3.WriteCapsule(
		quicvarint.NewWriter(.requestStr),
		closeWebtransportSessionCapsuleType,
		,
	)
}

// Returns the connection state reported by the underlying HTTP/3 connection
// (qconn.ConnectionState).
func ( *Session) () quic.ConnectionState {
	return .qconn.ConnectionState()
}