package webtransport

import (
	
	
	
	
	
	
	
	

	
)

// quicSendStream is the subset of the QUIC send-stream API that SendStream
// relies on. Depending on an interface rather than a concrete quic type lets
// the same wrapper sit on top of a unidirectional send stream or the send
// half of a bidirectional stream.
type quicSendStream interface {
	io.WriteCloser
	CancelWrite(quic.StreamErrorCode)
	Context() context.Context
	SetWriteDeadline(time.Time) error
	SetReliableBoundary()
}

// Compile-time checks that the quic stream types satisfy quicSendStream.
// A typed nil pointer is sufficient for the check and avoids constructing
// a stream value at package initialization.
var (
	_ quicSendStream = (*quic.SendStream)(nil)
	_ quicSendStream = (*quic.Stream)(nil)
)

// quicReceiveStream is the subset of the QUIC receive-stream API that
// ReceiveStream relies on. It is satisfied by a unidirectional receive stream
// as well as by the receive half of a bidirectional stream.
type quicReceiveStream interface {
	io.Reader
	CancelRead(quic.StreamErrorCode)
	SetReadDeadline(time.Time) error
}

// Compile-time checks that the quic stream types satisfy quicReceiveStream.
// A typed nil pointer is sufficient for the check and avoids constructing
// a stream value at package initialization.
var (
	_ quicReceiveStream = (*quic.ReceiveStream)(nil)
	_ quicReceiveStream = (*quic.Stream)(nil)
)

// A SendStream is a unidirectional WebTransport send stream.
// It wraps a QUIC send stream and makes sure the WebTransport stream header
// (if any) is written before the first payload bytes.
type SendStream struct {
	// str is the underlying QUIC stream that all operations are forwarded to.
	str quicSendStream
	// WebTransport stream header.
	// Set by the constructor, set to nil once sent out.
	// Might be initialized to nil if this sendStream is part of an incoming bidirectional stream.
	streamHdr   []byte
	// streamHdrMu guards streamHdr and sendingHdrAsync.
	streamHdrMu sync.Mutex
	// Set to true when a goroutine is spawned to send the header asynchronously.
	// This only happens if the stream is closed / reset immediately after creation.
	sendingHdrAsync bool

	onClose func() // to remove the stream from the streamsMap

	// closeOnce makes closing the stream on behalf of the session a one-time operation.
	closeOnce sync.Once
	// closed is closed once the session has closed this stream.
	closed    chan struct{}
	// closeErr is the session error; it is written before closed is closed
	// and returned to writers that were waiting for the session to go away.
	closeErr  error

	// deadlineMu guards writeDeadline and deadlineNotifyCh.
	deadlineMu       sync.Mutex
	// writeDeadline mirrors the deadline most recently set via SetWriteDeadline.
	writeDeadline    time.Time
	// deadlineNotifyCh is nil until a writer starts waiting for the session to close.
	deadlineNotifyCh chan struct{} // receives a value when deadline changes
}

// newSendStream wraps a QUIC send stream in a SendStream.
// The arguments are, in order: the underlying QUIC stream, the WebTransport
// stream header that still has to be written (may be nil), and the callback
// stored as onClose.
// The closed channel is allocated here; the deadline notification channel is
// left nil.
func newSendStream( quicSendStream,  []byte,  func()) *SendStream {
	return &SendStream{
		str:       ,
		closed:    make(chan struct{}),
		streamHdr: ,
		onClose:   ,
	}
}

// Write writes data to the stream.
// Write can be made to time out using [SendStream.SetWriteDeadline].
// If the stream was canceled, the error is a [StreamError].
//
// If the stream was reset with WTSessionGoneErrorCode, Write blocks until the
// session has been closed (or the write deadline expires) and returns the
// resulting error instead of the raw stream error.
func ( *SendStream) ( []byte) (int, error) {
	,  := .write()
	// A timeout leaves the stream usable; every other error is reported via onClose.
	if  != nil && !isTimeoutError() {
		.onClose()
	}
	// WTSessionGoneErrorCode signals that the whole session is going away:
	// wait for the session close so the caller sees the session error.
	var  *quic.StreamError
	if errors.As(, &) && .ErrorCode == WTSessionGoneErrorCode {
		return , .handleSessionGoneError()
	}
	// Translate QUIC stream errors into WebTransport StreamErrors.
	return , maybeConvertStreamError()
}

// handleSessionGoneError waits for the session to be closed after receiving a WTSessionGoneErrorCode.
// If the peer is initiating the session close, we might need to wait for the CONNECT stream to be closed.
// While a malicious peer might withhold the session close, this is not an interesting attack vector:
// 1. a WebTransport stream consumes very little memory, and
// 2. the number of concurrent WebTransport sessions is limited.
//
// It returns closeErr once the closed channel has been closed, or
// os.ErrDeadlineExceeded if the write deadline has already passed or expires
// while waiting.
func ( *SendStream) () error {
	// Lazily create the notification channel so that SetWriteDeadline can wake
	// up the wait loop below. The buffer of 1 lets the setter signal without blocking.
	.deadlineMu.Lock()
	if .deadlineNotifyCh == nil {
		.deadlineNotifyCh = make(chan struct{}, 1)
	}
	.deadlineMu.Unlock()

	for {
		// Re-read the deadline on every iteration: it may have been changed
		// since the previous wake-up.
		.deadlineMu.Lock()
		 := .writeDeadline
		.deadlineMu.Unlock()

		// With a zero deadline the timer channel stays nil, and receiving from
		// a nil channel blocks forever, i.e. there is no timeout.
		var  <-chan time.Time
		if !.IsZero() {
			if  := time.Until();  > 0 {
				 = time.After()
			} else {
				return os.ErrDeadlineExceeded
			}
		}
		select {
		case <-.closed:
			return .closeErr
		case <-:
			return os.ErrDeadlineExceeded
		case <-.deadlineNotifyCh:
			// The deadline changed: loop around and recompute the timer.
		}
	}
}

// write sends the pending stream header (if there is one) and then forwards
// the payload to the underlying QUIC stream.
// If sending the header fails, no payload is written and 0 is returned
// together with that error.
func ( *SendStream) ( []byte) (int, error) {
	// The header state is shared with Close and CancelWrite, hence the lock.
	// It is released before the (potentially blocking) payload write.
	.streamHdrMu.Lock()
	 := .maybeSendStreamHeader()
	.streamHdrMu.Unlock()
	if  != nil {
		return 0, 
	}
	return .str.Write()
}

// maybeSendStreamHeader writes the WebTransport stream header to the
// underlying stream if it has not been sent yet, and is a no-op otherwise.
// On success streamHdr is set to nil so the header is sent only once.
// The only caller visible in this file holds streamHdrMu while calling it.
func ( *SendStream) () error {
	if len(.streamHdr) == 0 {
		return nil
	}
	,  := .str.Write(.streamHdr)
	// After a partial write the header is re-sliced, presumably so that a
	// retry only sends the remainder (the slice bound is not visible here — TODO confirm).
	if  > 0 {
		.streamHdr = .streamHdr[:]
	}
	// Called regardless of whether the write failed.
	// NOTE(review): presumably marks the header bytes as data that must still be
	// delivered if the stream is reset later — confirm against the quic-go docs.
	.str.SetReliableBoundary()
	if  != nil {
		return 
	}
	.streamHdr = nil
	return nil
}

// CancelWrite aborts sending on this stream.
// Data already written, but not yet delivered to the peer is not guaranteed to be delivered reliably.
// Write will unblock immediately, and future calls to Write will fail.
// When called multiple times it is a no-op.
//
// The WebTransport error code is mapped through webtransportCodeToHTTPCode
// before it is passed to the underlying stream.
// If the stream header has not been sent yet, the header is written and the
// stream is reset from a separate goroutine, so that CancelWrite itself never
// blocks; onClose then runs in that goroutine.
func ( *SendStream) ( StreamErrorCode) {
	// if a Goroutine is already sending the header, return immediately
	.streamHdrMu.Lock()
	if .sendingHdrAsync {
		.streamHdrMu.Unlock()
		return
	}

	if len(.streamHdr) > 0 {
		// Sending the stream header might block if we are blocked by flow control.
		// Send a stream header async so that CancelWrite can return immediately.
		.sendingHdrAsync = true
		// Take ownership of the header and clear the field while still holding
		// the lock, so no other caller attempts to send it as well.
		 := .streamHdr
		.streamHdr = nil
		.streamHdrMu.Unlock()

		go func() {
			// Clear the write deadline before writing the header.
			.SetWriteDeadline(time.Time{})
			// Best effort: the stream is reset right after, so a failed header write is ignored.
			_, _ = .str.Write()
			.str.SetReliableBoundary()
			.str.CancelWrite(webtransportCodeToHTTPCode())
			.onClose()
		}()
		return
	}
	.streamHdrMu.Unlock()

	.str.CancelWrite(webtransportCodeToHTTPCode())
	.onClose()
}

// closeWithSession shuts the stream down on behalf of the session.
// It records the given error as closeErr, cancels the write side with
// WTSessionGoneErrorCode and closes the closed channel, which releases any
// writer waiting for the session to go away.
// Only the first call has an effect; later calls are no-ops.
func ( *SendStream) ( error) {
	.closeOnce.Do(func() {
		// closeErr must be set before the channel is closed: waiters read it
		// right after observing the close.
		.closeErr = 
		.str.CancelWrite(WTSessionGoneErrorCode)
		close(.closed)
	})
}

// Close closes the write-direction of the stream.
// Future calls to Write are not permitted after calling Close.
//
// If the stream header has not been sent yet, the header is written and the
// stream is closed from a separate goroutine; in that case Close returns nil
// immediately and errors from the asynchronous write / close are discarded.
// Otherwise the result of closing the underlying stream is returned, with
// QUIC stream errors converted by maybeConvertStreamError.
func ( *SendStream) () error {
	// if a Goroutine is already sending the header, return immediately
	.streamHdrMu.Lock()
	if .sendingHdrAsync {
		.streamHdrMu.Unlock()
		return nil
	}

	if len(.streamHdr) > 0 {
		// Sending the stream header might block if we are blocked by flow control.
		// Send a stream header async so that Close can return immediately.
		.sendingHdrAsync = true
		// Take ownership of the header and clear the field while still holding
		// the lock, so no other caller attempts to send it as well.
		 := .streamHdr
		.streamHdr = nil
		.streamHdrMu.Unlock()

		go func() {
			// Clear the write deadline before writing the header.
			.SetWriteDeadline(time.Time{})
			// Best effort: there is no caller left to report these errors to.
			_, _ = .str.Write()
			.str.SetReliableBoundary()
			_ = .str.Close()
			.onClose()
		}()
		return nil
	}
	.streamHdrMu.Unlock()

	// onClose runs before the underlying stream is closed.
	.onClose()
	return maybeConvertStreamError(.str.Close())
}

// Context returns the context of the underlying QUIC send stream.
//
// The Context is canceled as soon as the write-side of the stream is closed.
// This happens when Close() or CancelWrite() is called, or when the peer
// cancels the read-side of their stream.
// The cancellation cause is set to the error that caused the stream to
// close, or `context.Canceled` in case the stream is closed without error.
func ( *SendStream) () context.Context {
	return .str.Context()
}

// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
// some data was successfully written.
// A zero value for t means Write will not time out.
//
// The deadline is recorded locally and also forwarded to the underlying
// stream; an error from the underlying stream is returned after conversion by
// maybeConvertStreamError.
func ( *SendStream) ( time.Time) error {
	.deadlineMu.Lock()
	.writeDeadline = 
	// The channel only exists once a writer is waiting for the session to
	// close. The send is non-blocking: if a notification is already pending,
	// the waiter re-reads the deadline anyway.
	if .deadlineNotifyCh != nil {
		select {
		case .deadlineNotifyCh <- struct{}{}:
		default:
		}
	}
	.deadlineMu.Unlock()

	return maybeConvertStreamError(.str.SetWriteDeadline())
}

// A ReceiveStream is a unidirectional WebTransport receive stream.
// It wraps a QUIC receive stream.
type ReceiveStream struct {
	// str is the underlying QUIC stream that all operations are forwarded to.
	str quicReceiveStream

	onClose func() // to remove the stream from the streamsMap

	// closeOnce makes closing the stream on behalf of the session a one-time operation.
	closeOnce sync.Once
	// closed is closed once the session has closed this stream.
	closed    chan struct{}
	// closeErr is the session error; it is written before closed is closed
	// and returned to readers that were waiting for the session to go away.
	closeErr  error

	// deadlineMu guards readDeadline and deadlineNotifyCh.
	deadlineMu       sync.Mutex
	// readDeadline mirrors the deadline most recently set via SetReadDeadline.
	readDeadline     time.Time
	// deadlineNotifyCh is nil until a reader starts waiting for the session to close.
	deadlineNotifyCh chan struct{} // receives a value when deadline changes
}

// newReceiveStream wraps a QUIC receive stream in a ReceiveStream.
// The arguments are, in order: the underlying QUIC stream and the callback
// stored as onClose.
// The closed channel is allocated here; the deadline notification channel is
// left nil.
func newReceiveStream( quicReceiveStream,  func()) *ReceiveStream {
	return &ReceiveStream{
		str:     ,
		closed:  make(chan struct{}),
		onClose: ,
	}
}

// Read reads data from the stream.
// Read can be made to time out using [ReceiveStream.SetReadDeadline].
// If the stream was canceled, the error is a [StreamError].
//
// If the stream was reset with WTSessionGoneErrorCode, Read blocks until the
// session has been closed (or the read deadline expires) and returns the
// resulting error instead of the raw stream error.
func ( *ReceiveStream) ( []byte) (int, error) {
	,  := .str.Read()
	// A timeout leaves the stream usable; every other error (this includes the
	// end of the stream) is reported via onClose.
	if  != nil && !isTimeoutError() {
		.onClose()
	}
	// WTSessionGoneErrorCode signals that the whole session is going away:
	// wait for the session close so the caller sees the session error.
	var  *quic.StreamError
	if errors.As(, &) && .ErrorCode == WTSessionGoneErrorCode {
		return , .handleSessionGoneError()
	}
	// Translate QUIC stream errors into WebTransport StreamErrors.
	return , maybeConvertStreamError()
}

// handleSessionGoneError waits for the session to be closed after receiving a WTSessionGoneErrorCode.
// If the peer is initiating the session close, we might need to wait for the CONNECT stream to be closed.
// While a malicious peer might withhold the session close, this is not an interesting attack vector:
// 1. a WebTransport stream consumes very little memory, and
// 2. the number of concurrent WebTransport sessions is limited.
//
// It returns closeErr once the closed channel has been closed, or
// os.ErrDeadlineExceeded if the read deadline has already passed or expires
// while waiting.
func ( *ReceiveStream) () error {
	// Lazily create the notification channel so that SetReadDeadline can wake
	// up the wait loop below. The buffer of 1 lets the setter signal without blocking.
	.deadlineMu.Lock()
	if .deadlineNotifyCh == nil {
		.deadlineNotifyCh = make(chan struct{}, 1)
	}
	.deadlineMu.Unlock()

	for {
		// Re-read the deadline on every iteration: it may have been changed
		// since the previous wake-up.
		.deadlineMu.Lock()
		 := .readDeadline
		.deadlineMu.Unlock()

		// With a zero deadline the timer channel stays nil, and receiving from
		// a nil channel blocks forever, i.e. there is no timeout.
		var  <-chan time.Time
		if !.IsZero() {
			if  := time.Until();  > 0 {
				 = time.After()
			} else {
				return os.ErrDeadlineExceeded
			}
		}
		select {
		case <-.closed:
			return .closeErr
		case <-:
			return os.ErrDeadlineExceeded
		case <-.deadlineNotifyCh:
			// The deadline changed: loop around and recompute the timer.
		}
	}
}

// CancelRead aborts receiving on this stream.
// It instructs the peer to stop transmitting stream data.
// Read will unblock immediately, and future Read calls will fail.
// When called multiple times it is a no-op.
//
// The WebTransport error code is mapped through webtransportCodeToHTTPCode
// before it is passed to the underlying stream; onClose is invoked afterwards.
func ( *ReceiveStream) ( StreamErrorCode) {
	.str.CancelRead(webtransportCodeToHTTPCode())
	.onClose()
}

// closeWithSession shuts the stream down on behalf of the session.
// It records the given error as closeErr, cancels the read side with
// WTSessionGoneErrorCode and closes the closed channel, which releases any
// reader waiting for the session to go away.
// Only the first call has an effect; later calls are no-ops.
func ( *ReceiveStream) ( error) {
	.closeOnce.Do(func() {
		// closeErr must be set before the channel is closed: waiters read it
		// right after observing the close.
		.closeErr = 
		.str.CancelRead(WTSessionGoneErrorCode)
		close(.closed)
	})
}

// SetReadDeadline sets the deadline for future Read calls and
// any currently-blocked Read call.
// A zero value for t means Read will not time out.
//
// The deadline is recorded locally and also forwarded to the underlying
// stream; an error from the underlying stream is returned after conversion by
// maybeConvertStreamError.
func ( *ReceiveStream) ( time.Time) error {
	.deadlineMu.Lock()
	.readDeadline = 
	// The channel only exists once a reader is waiting for the session to
	// close. The send is non-blocking: if a notification is already pending,
	// the waiter re-reads the deadline anyway.
	if .deadlineNotifyCh != nil {
		select {
		case .deadlineNotifyCh <- struct{}{}:
		default:
		}
	}
	.deadlineMu.Unlock()

	return maybeConvertStreamError(.str.SetReadDeadline())
}

// Stream is a bidirectional WebTransport stream.
// It is composed of a SendStream and a ReceiveStream and forwards each
// operation to the matching half.
type Stream struct {
	// sendStr is the send half of the stream.
	sendStr *SendStream
	// recvStr is the receive half of the stream.
	recvStr *ReceiveStream

	// mx guards sendSideClosed and recvSideClosed.
	mx                             sync.Mutex
	// Each flag is set when the corresponding half reports that it is closed.
	sendSideClosed, recvSideClosed bool
	// onClose is invoked when both halves have been reported closed.
	onClose                        func()
}

// newStream builds a bidirectional Stream on top of a QUIC stream.
// The arguments are, in order: the underlying QUIC stream, the WebTransport
// stream header for the send half, and the callback stored as onClose.
// Both halves are given callbacks that report their closure through
// registerClose: the send half passes true, the receive half passes false.
func newStream( *quic.Stream,  []byte,  func()) *Stream {
	 := &Stream{onClose: }
	.sendStr = newSendStream(, , func() { .registerClose(true) })
	.recvStr = newReceiveStream(, func() { .registerClose(false) })
	return 
}

// Write writes data to the stream.
// Write can be made to time out using [Stream.SetWriteDeadline] or [Stream.SetDeadline].
// If the stream was canceled, the error is a [StreamError].
// The call is forwarded to the send half of the stream.
func ( *Stream) ( []byte) (int, error) {
	return .sendStr.Write()
}

// Read reads data from the stream.
// Read can be made to time out using [Stream.SetReadDeadline] and [Stream.SetDeadline].
// If the stream was canceled, the error is a [StreamError].
// The call is forwarded to the receive half of the stream.
func ( *Stream) ( []byte) (int, error) {
	return .recvStr.Read()
}

// CancelWrite aborts sending on this stream.
// It only affects the send half; the receive half stays open.
// See [SendStream.CancelWrite] for more details.
func ( *Stream) ( StreamErrorCode) {
	.sendStr.CancelWrite()
}

// CancelRead aborts receiving on this stream.
// It only affects the receive half; the send half stays open.
// See [ReceiveStream.CancelRead] for more details.
func ( *Stream) ( StreamErrorCode) {
	.recvStr.CancelRead()
}

// Close closes the send-direction of the stream.
// It does not close the receive-direction of the stream.
// The call is forwarded to the send half and its result is returned.
func ( *Stream) () error {
	return .sendStr.Close()
}

// registerClose records that one half of the stream has been closed.
// The argument selects the half: true marks the send side, false marks the
// receive side. Once both sides are marked, onClose is invoked.
func ( *Stream) ( bool) {
	.mx.Lock()
	if  {
		.sendSideClosed = true
	} else {
		.recvSideClosed = true
	}
	 := .sendSideClosed && .recvSideClosed
	.mx.Unlock()

	// The callback runs after the mutex has been released.
	if  {
		.onClose()
	}
}

// closeWithSession shuts down both halves of the stream on behalf of the
// session, passing the given error to the send half first and then to the
// receive half.
func ( *Stream) ( error) {
	.sendStr.closeWithSession()
	.recvStr.closeWithSession()
}

// Context returns the context of the send half of the stream.
//
// The Context is canceled as soon as the write-side of the stream is closed.
// See [SendStream.Context] for more details.
func ( *Stream) () context.Context {
	return .sendStr.Context()
}

// SetWriteDeadline sets the deadline for future Write calls.
// It only affects the send half of the stream.
// See [SendStream.SetWriteDeadline] for more details.
func ( *Stream) ( time.Time) error {
	return .sendStr.SetWriteDeadline()
}

// SetReadDeadline sets the deadline for future Read calls.
// It only affects the receive half of the stream.
// See [ReceiveStream.SetReadDeadline] for more details.
func ( *Stream) ( time.Time) error {
	return .recvStr.SetReadDeadline()
}

// SetDeadline sets the read and write deadlines associated with the stream.
// It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
// Both deadlines are always applied, write first; the two results are
// combined with errors.Join, so the return value is nil only if both calls
// succeeded.
func ( *Stream) ( time.Time) error {
	 := .SetWriteDeadline()
	 := .SetReadDeadline()
	return errors.Join(, )
}

// maybeConvertStreamError translates QUIC stream errors into WebTransport
// stream errors.
//   - nil is returned as nil.
//   - If the error chain contains a *quic.StreamError, its error code is
//     converted with httpCodeToWebtransportCode and a *StreamError carrying
//     the converted code and the Remote flag is returned. If the conversion
//     fails, a formatted error containing the original code is returned instead.
//   - Any other error is returned unchanged.
func maybeConvertStreamError( error) error {
	if  == nil {
		return nil
	}
	var  *quic.StreamError
	if errors.As(, &) {
		,  := httpCodeToWebtransportCode(.ErrorCode)
		if  != nil {
			return fmt.Errorf("stream reset, but failed to convert stream error %d: %w", .ErrorCode, )
		}
		return &StreamError{
			ErrorCode: ,
			Remote:    .Remote,
		}
	}
	return 
}

// isTimeoutError reports whether err, or any error it wraps, is a net.Error
// whose Timeout method returns true.
// A nil error is not a timeout.
func isTimeoutError(err error) bool {
	// errors.As walks the Unwrap chain, so a deadline error that a lower layer
	// wrapped with additional context is still recognized as a timeout. A bare
	// type assertion would miss it and the caller would treat the stream as
	// closed after a mere timeout.
	var netErr net.Error
	return errors.As(err, &netErr) && netErr.Timeout()
}