package http3

import (
	
	
	
	
	
	
	
	
	
	
	

	
	
	

	
)

// errGoAway is returned when a client would have to open a request stream whose
// stream ID is equal to or greater than the stream ID carried in a received GOAWAY frame.
var errGoAway = errors.New("connection in graceful shutdown")

// Connection is an HTTP/3 connection.
// It has all methods from the quic.Connection except for AcceptStream, AcceptUniStream,
// SendDatagram and ReceiveDatagram.
type Connection interface {
	OpenStream() (quic.Stream, error)
	OpenStreamSync(context.Context) (quic.Stream, error)
	OpenUniStream() (quic.SendStream, error)
	OpenUniStreamSync(context.Context) (quic.SendStream, error)
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	CloseWithError(quic.ApplicationErrorCode, string) error
	Context() context.Context
	ConnectionState() quic.ConnectionState

	// ReceivedSettings returns a channel that is closed once the peer's SETTINGS frame was received.
	ReceivedSettings() <-chan struct{}
	// Settings returns the settings received on this connection.
	// It is only valid to call it after the channel returned by ReceivedSettings was closed.
	Settings() *Settings
}

// connection wraps a quic.Connection and keeps the HTTP/3 state layered on top of it:
// the set of active streams (for datagram dispatch, idle handling and graceful shutdown),
// the peer's SETTINGS, and an optional idle timer.
type connection struct {
	quic.Connection
	// ctx is the context handed out by the Context method.
	ctx context.Context

	// perspective tells whether this endpoint acts as client or as server.
	// logger may be nil; every use in this file is guarded by a nil check.
	perspective protocol.Perspective
	logger      *slog.Logger

	// enableDatagrams records whether datagram support was enabled on our side.
	enableDatagrams bool

	// decoder is the QPACK decoder used for trailers and handed to request streams.
	decoder *qpack.Decoder

	// streamMx guards streams, lastStreamID and maxStreamID.
	// streams maps the ID of every tracked stream to its datagrammer.
	// lastStreamID is the ID of the stream most recently opened by us
	// (protocol.InvalidStreamID until the first one is opened).
	// maxStreamID is the stream ID from the last GOAWAY frame received
	// (protocol.InvalidStreamID as long as none was received).
	streamMx     sync.Mutex
	streams      map[protocol.StreamID]*datagrammer
	lastStreamID protocol.StreamID
	maxStreamID  protocol.StreamID

	// settings is written once when the peer's SETTINGS frame is parsed on the control stream;
	// receivedSettings is closed right after that write.
	settings         *Settings
	receivedSettings chan struct{}

	// idleTimer is only created when idleTimeout is greater than zero, and is nil otherwise.
	idleTimeout time.Duration
	idleTimer   *time.Timer
}

// newConnection builds the HTTP/3 state around an established QUIC connection.
//
// The arguments populate the struct fields of the matching type: the context (ctx),
// the underlying quic.Connection, the local datagram switch (enableDatagrams),
// the perspective, an optional logger, and the idle timeout.
//
// If the idle timeout is greater than zero, the idle timer is started right away
// and fires onIdleTimer once it expires.
func newConnection(
	 context.Context,
	 quic.Connection,
	 bool,
	 protocol.Perspective,
	 *slog.Logger,
	 time.Duration,
) *connection {
	 := &connection{
		ctx:              ,
		Connection:       ,
		perspective:      ,
		logger:           ,
		idleTimeout:      ,
		enableDatagrams:  ,
		// The emit callback is a no-op; this file only calls DecodeFull on the decoder.
		decoder:          qpack.NewDecoder(func( qpack.HeaderField) {}),
		receivedSettings: make(chan struct{}),
		streams:          make(map[protocol.StreamID]*datagrammer),
		// No GOAWAY received and no stream opened yet.
		maxStreamID:      protocol.InvalidStreamID,
		lastStreamID:     protocol.InvalidStreamID,
	}
	if  > 0 {
		// time.AfterFunc arms the timer immediately.
		.idleTimer = time.AfterFunc(, .onIdleTimer)
	}
	return 
}

// onIdleTimer is the callback of the idle timer created in newConnection.
// It closes the connection with ErrCodeNoError and the reason "idle timeout".
func ( *connection) () {
	.CloseWithError(quic.ApplicationErrorCode(ErrCodeNoError), "idle timeout")
}

// This method removes the given stream ID from the set of tracked streams.
// NOTE(review): presumably invoked by the state-tracking stream wrapper once a stream
// is done — confirm against newStateTrackingStream.
//
// When the last tracked stream is removed:
//   - the idle timer is re-armed, if an idle timeout is configured, and
//   - the connection is closed with ErrCodeNoError, if a GOAWAY frame was received before.
//
// The whole method runs with streamMx held.
func ( *connection) ( quic.StreamID) {
	.streamMx.Lock()
	defer .streamMx.Unlock()

	delete(.streams, )
	if .idleTimeout > 0 && len(.streams) == 0 {
		.idleTimer.Reset(.idleTimeout)
	}
	// The server is performing a graceful shutdown.
	// If no more streams are remaining, close the connection.
	if .maxStreamID != protocol.InvalidStreamID {
		if len(.streams) == 0 {
			.CloseWithError(quic.ApplicationErrorCode(ErrCodeNoError), "")
		}
	}
}

// This method opens a new bidirectional QUIC stream and wraps it into a *requestStream.
//
// On the client side it first predicts the ID of the stream about to be opened
// (0 for the first stream, otherwise the last opened ID plus 4) and returns errGoAway
// if a GOAWAY frame was received whose stream ID is less than or equal to that prediction.
//
// After the stream is opened, its datagrammer is registered in the streams map and
// lastStreamID is updated. Errors from OpenStreamSync are returned unchanged.
func ( *connection) (
	 context.Context,
	 *requestWriter,
	 chan<- struct{},
	 bool,
	 uint64,
) (*requestStream, error) {
	if .perspective == protocol.PerspectiveClient {
		// Snapshot the GOAWAY limit and compute the next stream ID under the lock.
		.streamMx.Lock()
		 := .maxStreamID
		var  quic.StreamID
		if .lastStreamID == protocol.InvalidStreamID {
			 = 0
		} else {
			 = .lastStreamID + 4
		}
		.streamMx.Unlock()
		// Streams with stream ID equal to or greater than the stream ID carried in the GOAWAY frame
		// will be rejected, see section 5.2 of RFC 9114.
		if  != protocol.InvalidStreamID &&  >=  {
			return nil, errGoAway
		}
	}

	,  := .OpenStreamSync()
	if  != nil {
		return nil, 
	}
	// Datagrams sent through this datagrammer are forwarded to sendDatagram with the stream's ID.
	 := newDatagrammer(func( []byte) error { return .sendDatagram(.StreamID(), ) })
	.streamMx.Lock()
	.streams[.StreamID()] = 
	.lastStreamID = .StreamID()
	.streamMx.Unlock()
	 := newStateTrackingStream(, , )
	 := &http.Response{}
	// The callback given to newStream decodes a trailer section via decodeTrailers and
	// stores the result in a Trailer field (presumably that of the response allocated above).
	 := newStream(, , , func( io.Reader,  uint64) error {
		,  := .decodeTrailers(, , )
		if  != nil {
			return 
		}
		.Trailer = 
		return nil
	})
	 := httptrace.ContextClientTrace()
	return newRequestStream(, , , .decoder, , , , ), nil
}

// decodeTrailers reads a HEADERS frame payload from the reader, QPACK-decodes it
// and converts the decoded fields into an http.Header via parseTrailers.
//
// The two uint64 arguments are the frame length and the maximum accepted length;
// a frame longer than the maximum is rejected before any byte is read.
// Read and decode errors are returned unchanged.
func ( *connection) ( io.Reader, ,  uint64) (http.Header, error) {
	if  >  {
		return nil, fmt.Errorf("HEADERS frame too large: %d bytes (max: %d)", , )
	}

	// The size check above bounds this allocation.
	 := make([]byte, )
	if ,  := io.ReadFull(, );  != nil {
		return nil, 
	}
	,  := .decoder.DecodeFull()
	if  != nil {
		return nil, 
	}
	return parseTrailers()
}

// This method accepts the next bidirectional stream from the peer and creates
// a datagrammer for it. Errors from AcceptStream are returned unchanged.
//
// Only on the server side is the stream registered in the streams map and wrapped
// with newStateTrackingStream. If an idle timeout is configured, the idle timer is
// stopped as soon as the first tracked stream exists.
func ( *connection) ( context.Context) (quic.Stream, *datagrammer, error) {
	,  := .AcceptStream()
	if  != nil {
		return nil, nil, 
	}
	// Datagrams sent through this datagrammer are forwarded to sendDatagram with the stream's ID.
	 := newDatagrammer(func( []byte) error { return .sendDatagram(.StreamID(), ) })
	if .perspective == protocol.PerspectiveServer {
		 := .StreamID()
		.streamMx.Lock()
		.streams[] = 
		if .idleTimeout > 0 {
			// Going from zero to one tracked stream: the connection is no longer idle.
			if len(.streams) == 1 {
				.idleTimer.Stop()
			}
		}
		.streamMx.Unlock()
		 = newStateTrackingStream(, , )
	}
	return , , nil
}

// This method stops the idle timer (if one was created) and then closes the underlying
// QUIC connection with the given application error code and reason string.
// It returns whatever the embedded Connection's CloseWithError returns.
func ( *connection) ( quic.ApplicationErrorCode,  string) error {
	if .idleTimer != nil {
		.idleTimer.Stop()
	}
	return .Connection.CloseWithError(, )
}

// This method accepts unidirectional streams in a loop until AcceptUniStream fails,
// and handles each accepted stream in its own goroutine.
//
// Each stream starts with a varint stream type:
//   - control stream: handed to handleControlStream; a second one closes the connection
//   - QPACK encoder / decoder stream: accepted once each and otherwise left unread;
//     a duplicate closes the connection
//   - push stream: always closes the connection (the error code depends on the perspective)
//   - anything else: offered to the callback argument if it is non-nil; unless the
//     callback returns true, reading is cancelled with ErrCodeStreamCreationError
//
// The callback may be nil. It is also invoked when reading the stream type fails;
// in both cases a true result makes this method stop processing that stream.
func ( *connection) ( func(StreamType, quic.ConnectionTracingID, quic.ReceiveStream, error) ( bool)) {
	// Flags recording which of the once-only stream types have already been seen.
	// They are shared by all per-stream goroutines, hence atomic.
	var (
		      atomic.Bool
		 atomic.Bool
		 atomic.Bool
	)

	for {
		,  := .AcceptUniStream(context.Background())
		if  != nil {
			if .logger != nil {
				.logger.Debug("accepting unidirectional stream failed", "error", )
			}
			return
		}

		go func( quic.ReceiveStream) {
			,  := quicvarint.Read(quicvarint.NewReader())
			if  != nil {
				 := .Context().Value(quic.ConnectionTracingKey).(quic.ConnectionTracingID)
				if  != nil && (StreamType(), , , ) {
					return
				}
				if .logger != nil {
					.logger.Debug("reading stream type on stream failed", "stream ID", .StreamID(), "error", )
				}
				return
			}
			// We're only interested in the control stream here.
			switch  {
			case streamTypeControlStream:
			case streamTypeQPACKEncoderStream:
				if  := .CompareAndSwap(false, true); ! {
					.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate QPACK encoder stream")
				}
				// Our QPACK implementation doesn't use the dynamic table yet.
				return
			case streamTypeQPACKDecoderStream:
				if  := .CompareAndSwap(false, true); ! {
					.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate QPACK decoder stream")
				}
				// Our QPACK implementation doesn't use the dynamic table yet.
				return
			case streamTypePushStream:
				switch .perspective {
				case protocol.PerspectiveClient:
					// we never increased the Push ID, so we don't expect any push streams
					.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
				case protocol.PerspectiveServer:
					// only the server can push
					.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "")
				}
				return
			default:
				// Unknown stream type: give the callback a chance to handle it.
				if  != nil {
					if (
						StreamType(),
						.Context().Value(quic.ConnectionTracingKey).(quic.ConnectionTracingID),
						,
						nil,
					) {
						return
					}
				}
				.CancelRead(quic.StreamErrorCode(ErrCodeStreamCreationError))
				return
			}
			// Only a single control stream is allowed.
			if  := .CompareAndSwap(false, true); ! {
				.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate control stream")
				return
			}
			.handleControlStream()
		}()
	}
}

// handleControlStream processes the peer's control stream.
//
// The first frame must be a SETTINGS frame: its values are stored in the settings field
// and the receivedSettings channel is closed. If the peer enabled datagrams, a goroutine
// running receiveDatagrams is started (after verifying QUIC-level datagram support when
// datagrams are enabled locally as well).
//
// A server returns after that. A client keeps reading and accepts only GOAWAY frames:
// the announced stream ID must be a multiple of 4 and must not exceed a previously
// announced one. It is stored in maxStreamID, and the connection is closed right away
// if no streams are currently tracked.
//
// Every protocol violation, and the end of the control stream, closes the connection.
func ( *connection) ( quic.ReceiveStream) {
	 := &frameParser{conn: .Connection, r: }
	,  := .ParseNext()
	if  != nil {
		var  *quic.StreamError
		// EOF or a stream error means the peer closed or reset the control stream.
		if  == io.EOF || errors.As(, &) {
			.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeClosedCriticalStream), "")
			return
		}
		.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
		return
	}
	,  := .(*settingsFrame)
	if ! {
		.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeMissingSettings), "")
		return
	}
	// The settings must be stored before the channel is closed:
	// readers are told to call Settings only after the channel was closed.
	.settings = &Settings{
		EnableDatagrams:       .Datagram,
		EnableExtendedConnect: .ExtendedConnect,
		Other:                 .Other,
	}
	close(.receivedSettings)
	if .Datagram {
		// If datagram support was enabled on our side as well as on the server side,
		// we can expect it to have been negotiated both on the transport and on the HTTP/3 layer.
		// Note: ConnectionState() will block until the handshake is complete (relevant when using 0-RTT).
		if .enableDatagrams && !.ConnectionState().SupportsDatagrams {
			.CloseWithError(quic.ApplicationErrorCode(ErrCodeSettingsError), "missing QUIC Datagram support")
			return
		}
		// receiveDatagrams loops until receiving fails; its error is only logged.
		go func() {
			if  := .receiveDatagrams();  != nil {
				if .logger != nil {
					.logger.Debug("receiving datagrams failed", "error", )
				}
			}
		}()
	}

	// we don't support server push, hence we don't expect any GOAWAY frames from the client
	if .perspective == protocol.PerspectiveServer {
		return
	}

	for {
		,  := .ParseNext()
		if  != nil {
			var  *quic.StreamError
			if  == io.EOF || errors.As(, &) {
				.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeClosedCriticalStream), "")
				return
			}
			.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
			return
		}
		// GOAWAY is the only frame allowed at this point:
		// * unexpected frames are ignored by the frame parser
		// * we don't support any extension that might add support for more frames
		,  := .(*goAwayFrame)
		if ! {
			.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameUnexpected), "")
			return
		}
		if .StreamID%4 != 0 { // client-initiated, bidirectional streams
			.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
			return
		}
		.streamMx.Lock()
		// A later GOAWAY must not announce a larger stream ID than an earlier one.
		if .maxStreamID != protocol.InvalidStreamID && .StreamID > .maxStreamID {
			.streamMx.Unlock()
			.Connection.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
			return
		}
		.maxStreamID = .StreamID
		 := len(.streams) > 0
		.streamMx.Unlock()

		// immediately close the connection if there are currently no active requests
		if ! {
			.CloseWithError(quic.ApplicationErrorCode(ErrCodeNoError), "")
			return
		}
	}
}

// sendDatagram sends the payload as a QUIC datagram associated with the given stream.
// The datagram consists of the stream ID divided by 4, encoded as a varint,
// followed by the payload. The error from SendDatagram is returned unchanged.
func ( *connection) ( protocol.StreamID,  []byte) error {
	// TODO: this creates a lot of garbage and an additional copy
	// 8 extra bytes of capacity: room for the varint prefix (a QUIC varint is at most 8 bytes).
	 := make([]byte, 0, len()+8)
	 = quicvarint.Append(, uint64(/4))
	 = append(, ...)
	return .SendDatagram()
}

// receiveDatagrams reads QUIC datagrams in a loop and dispatches each one to the
// datagrammer of the stream it belongs to. It only returns on error:
//   - an error from ReceiveDatagram is returned unchanged
//   - a datagram whose varint prefix cannot be parsed, or whose value exceeds
//     maxQuarterStreamID, closes the connection with ErrCodeDatagramError
//
// Datagrams for streams that are not tracked are silently dropped.
func ( *connection) () error {
	for {
		,  := .ReceiveDatagram(context.Background())
		if  != nil {
			return 
		}
		// The datagram starts with a varint; it is multiplied by 4 below to obtain the stream ID.
		, ,  := quicvarint.Parse()
		if  != nil {
			.CloseWithError(quic.ApplicationErrorCode(ErrCodeDatagramError), "")
			return fmt.Errorf("could not read quarter stream id: %w", )
		}
		if  > maxQuarterStreamID {
			.CloseWithError(quic.ApplicationErrorCode(ErrCodeDatagramError), "")
			// NOTE(review): %w requires an error operand. On this path the parse error is nil,
			// so confirm the argument is not that nil error (or the numeric ID) — %d with the
			// ID would be the expected form.
			return fmt.Errorf("invalid quarter stream id: %w", )
		}
		 := protocol.StreamID(4 * )
		// Hold the lock only for the map lookup, not while enqueueing.
		.streamMx.Lock()
		,  := .streams[]
		.streamMx.Unlock()
		if ! {
			continue
		}
		// Presumably the bytes following the varint prefix — confirm the slice bounds.
		.enqueue([:])
	}
}

// ReceivedSettings returns a channel that is closed once the peer's SETTINGS frame was received.
// Settings can be obtained from the Settings method after the channel was closed.
func ( *connection) () <-chan struct{} { return .receivedSettings }

// Settings returns the settings received on this connection.
// It is only valid to call this function after the channel returned by ReceivedSettings was closed.
// The field is read without synchronization; the close of that channel is what orders
// this read after the write in handleControlStream.
func ( *connection) () *Settings { return .settings }

// Context returns the context stored in the ctx field, i.e. the context that was
// passed to newConnection.
// NOTE(review): the previous comment described it as the context of the underlying
// QUIC connection — confirm that callers of newConnection pass exactly that context.
func ( *connection) () context.Context { return .ctx }