package libp2pwebrtc

import (
	
	
	
	
	
	
	

	ic 
	
	
	tpt 

	ma 
	
	
	
)

// Compile-time assertion that *connection satisfies tpt.CapableConn.
var _ tpt.CapableConn = &connection{}

// maxAcceptQueueLen is a queue-length limit of 256.
// NOTE(review): presumably the capacity of connection.acceptQueue; the channel
// is created outside this file's visible code — confirm at the call site.
const maxAcceptQueueLen = 256

// errConnectionTimeout is the error recorded as the close reason when the
// peer connection is reported as failed or closed. It carries no state and
// implements net.Error.
type errConnectionTimeout struct{}

// Compile-time assertion that errConnectionTimeout satisfies net.Error.
var _ net.Error = &errConnectionTimeout{}

// Error returns the fixed message "connection timeout".
func (errConnectionTimeout) Error() string { return "connection timeout" }

// Timeout always reports true: this error is classified as a timeout.
func (errConnectionTimeout) Timeout() bool { return true }

// Temporary always reports false: the condition is not expected to clear on
// its own.
func (errConnectionTimeout) Temporary() bool { return false }

// errConnClosed is the sentinel error for a closed connection. It is used as
// the close reason for ordinary shutdowns and is wrapped (with %w) when the
// SCTP close callback reports an underlying error, so callers can match it
// with errors.Is.
var errConnClosed = errors.New("connection closed")

// dataChannel pairs a WebRTC data channel with a read/write handle for it.
// Values of this type are delivered through connection.acceptQueue, and both
// fields are handed to newStream when an inbound stream is accepted.
//
// stream is a datachannel.ReadWriteCloser — presumably the result of
// detaching channel; it is produced outside this file's visible code, confirm
// with the producer. channel is the *webrtc.DataChannel whose ID is used as
// the stream ID on removal.
type dataChannel struct {
	stream  datachannel.ReadWriteCloser
	channel *webrtc.DataChannel
}

// connection is a transport connection built on a single WebRTC peer
// connection. A compile-time assertion in this file requires *connection to
// satisfy tpt.CapableConn.
type connection struct {
	// pc is the underlying peer connection: data channels for outbound streams
	// are created on it and it is closed during teardown. transport and scope
	// are exposed through accessor methods; scope.Done is called on close.
	pc        *webrtc.PeerConnection
	transport *WebRTCTransport
	scope     network.ConnManagementScope

	// closeOnce ensures teardown runs once. closeErr is written inside that
	// once-guarded section before ctx is cancelled, so it may be read without
	// the lock after ctx.Done() fires.
	closeOnce sync.Once
	closeErr  error

	// Identity and address of the local side.
	localPeer      peer.ID
	localMultiaddr ma.Multiaddr

	// Identity, public key and address of the remote side.
	remotePeer      peer.ID
	remoteKey       ic.PubKey
	remoteMultiaddr ma.Multiaddr

	// m guards streams. streams maps stream ID to the live stream and is set
	// to nil once the connection is closed. nextStreamID is the next ID handed
	// out to an outbound stream; it starts at 1 or 2 depending on direction
	// and advances by 2 per stream.
	m            sync.Mutex
	streams      map[uint16]*stream
	nextStreamID atomic.Int32

	// acceptQueue delivers inbound data channels to the accept method.
	acceptQueue chan dataChannel

	// ctx is cancelled (via cancel) when the connection is closed; a non-nil
	// ctx.Err() is what marks the connection as closed.
	ctx    context.Context
	cancel context.CancelFunc
}

// newConnection assembles a connection from an established peer connection,
// its owning transport and resource scope, the local and remote identities and
// addresses, the queue of inbound data channels, and a signal channel.
//
// It creates a fresh cancellable context for the connection's lifetime, seeds
// the outbound stream ID counter according to the direction (1 for inbound,
// 2 for outbound), and registers callbacks for peer-connection state changes
// and SCTP closure. If the signal channel is already readable when setup
// finishes, the connection is closed and errConnClosed is returned instead.
//
// NOTE(review): the signal channel presumably reports that the peer connection
// was closed while the handshake was in progress — confirm with the caller.
func newConnection(
	 network.Direction,
	 *webrtc.PeerConnection,
	 *WebRTCTransport,
	 network.ConnManagementScope,

	 peer.ID,
	 ma.Multiaddr,

	 peer.ID,
	 ic.PubKey,
	 ma.Multiaddr,
	 chan dataChannel,
	 chan struct{},
) (*connection, error) {
	,  := context.WithCancel(context.Background())
	 := &connection{
		pc:        ,
		transport: ,
		scope:     ,

		localPeer:      ,
		localMultiaddr: ,

		remotePeer:      ,
		remoteKey:       ,
		remoteMultiaddr: ,
		ctx:             ,
		cancel:          ,
		streams:         make(map[uint16]*stream),

		acceptQueue: ,
	}
	// The two directions start on different counter values; since the counter
	// advances by 2 per stream, inbound connections hand out odd IDs and
	// outbound connections even IDs.
	switch  {
	case network.DirInbound:
		.nextStreamID.Store(1)
	case network.DirOutbound:
		// stream ID 0 is used for the Noise handshake stream
		.nextStreamID.Store(2)
	}

	.OnConnectionStateChange(.onConnectionStateChange)
	// When the SCTP close callback carries an error, both closeWithError calls
	// below run. Teardown is guarded by a sync.Once, so only the first call
	// (the wrapped error) takes effect and the second is a no-op.
	.SCTP().OnClose(func( error) {
		if  != nil {
			.closeWithError(fmt.Errorf("%w: %w", errConnClosed, ))
		}
		.closeWithError(errConnClosed)
	})
	// Non-blocking check of the signal channel: bail out if it already fired.
	select {
	case <-:
		.Close()
		return nil, errConnClosed
	default:
	}
	return , nil
}

// ConnState implements transport.CapableConn. It returns a
// network.ConnectionState with only the Transport field set, to the constant
// "webrtc-direct".
func ( *connection) () network.ConnectionState {
	return network.ConnectionState{Transport: "webrtc-direct"}
}

// Close closes the underlying peerconnection, recording errConnClosed as the
// close reason. It always returns nil; calling it on an already-closed
// connection has no further effect because teardown is guarded by a sync.Once.
func ( *connection) () error {
	.closeWithError(errConnClosed)
	return nil
}

// CloseWithError closes the connection, ignoring the supplied error code:
// there is no way to signal the remote peer when closing the underlying
// peerconnection, so the code is dropped and the call is equivalent to Close.
func ( *connection) ( network.ConnErrorCode) error {
	return .Close()
}

// closeWithError is used to Close the connection when the underlying DTLS connection fails.
// It is also the shared teardown path for ordinary closes. Everything runs
// inside closeOnce, so only the first call has any effect and only that call's
// error is stored in closeErr.
func ( *connection) ( error) {
	.closeOnce.Do(func() {
		.closeErr = 
		// cancel must be called after closeErr is set. This ensures interested goroutines waiting on
		// ctx.Done can read closeErr without holding the conn lock.
		.cancel()
		// closing peerconnection will close the datachannels associated with the streams
		.pc.Close()

		// Take ownership of the stream map under the lock and leave nil behind:
		// a nil map is how addStream detects a closed connection. The streams
		// are then shut down outside the lock.
		.m.Lock()
		 := .streams
		.streams = nil
		.m.Unlock()
		for ,  := range  {
			.closeForShutdown()
		}
		// Release the connection's resource-management scope last.
		.scope.Done()
	})
}

// IsClosed reports whether the connection has been closed, i.e. whether the
// connection's context has been cancelled.
func ( *connection) () bool {
	return .ctx.Err() != nil
}

// OpenStream opens a new outbound stream on the connection.
//
// It reserves the next stream ID, creates a data channel with that ID and an
// empty label, waits for the channel to be detached (bounded by the supplied
// context and by connection closure), wraps the result in a stream and
// registers it in the stream map.
//
// Errors: closeErr if the connection is already closed or the SCTP stream
// turns out to be closed; "exhausted stream ID space" once the ID exceeds
// math.MaxUint16; otherwise the data channel creation error, or a wrapped
// detach or registration error.
func ( *connection) ( context.Context) (network.MuxedStream, error) {
	if .IsClosed() {
		return nil, .closeErr
	}

	// Add returns the post-increment value, so subtracting 2 yields the ID
	// reserved for this call.
	// NOTE(review): the counter keeps advancing on every call even after the
	// ID space is exhausted; being an int32 it could in principle wrap after
	// a very large number of failed calls — confirm whether that matters.
	 := .nextStreamID.Add(2) - 2
	if  > math.MaxUint16 {
		return nil, errors.New("exhausted stream ID space")
	}
	 := uint16()
	,  := .pc.CreateDataChannel("", &webrtc.DataChannelInit{ID: &})
	if  != nil {
		return nil, 
	}
	,  := .detachChannel(, )
	if  != nil {
		// There's a race between webrtc.SCTP.OnClose callback and the underlying
		// association closing. It's nicer to close the connection here.
		if errors.Is(, sctp.ErrStreamClosed) {
			.closeWithError(errConnClosed)
			return nil, .closeErr
		}
		.Close()
		return nil, fmt.Errorf("detach channel failed for stream(%d): %w", , )
	}
	// The callback passed to newStream removes the stream from the map again.
	 := newStream(, , maxSendMessageSize, func() { .removeStream() })
	if  := .addStream();  != nil {
		// Registration failed (connection closed or duplicate ID): reset the
		// freshly created stream before reporting the error.
		.Reset()
		return nil, fmt.Errorf("failed to add stream(%d) to connection: %w", , )
	}
	return , nil
}

// Blocks until an inbound data channel arrives on acceptQueue or the
// connection is closed, whichever happens first.
//
// On closure it returns closeErr. Otherwise it wraps the received data channel
// in a stream (whose cleanup callback removes it from the stream map by the
// channel's ID), registers it, and returns it. If registration fails the
// stream is reset and the registration error is returned unwrapped.
func ( *connection) () (network.MuxedStream, error) {
	select {
	case <-.ctx.Done():
		return nil, .closeErr
	case  := <-.acceptQueue:
		// NOTE(review): the channel ID pointer is dereferenced inside the
		// cleanup callback without a nil check — presumably always set for
		// channels placed on the queue; confirm with the producer.
		 := newStream(.channel, .stream, maxSendMessageSize, func() { .removeStream(*.channel.ID()) })
		if  := .addStream();  != nil {
			.Reset()
			return nil, 
		}
		return , nil
	}
}

// Read-only accessors. Each returns the correspondingly named field unchanged:
// local peer ID, remote peer ID, remote public key, local multiaddr, remote
// multiaddr, the resource scope (as a network.ConnScope) and the owning
// transport (as a tpt.Transport). None of them take the lock; the fields are
// not reassigned anywhere in this file after construction.
func ( *connection) () peer.ID            { return .localPeer }
func ( *connection) () peer.ID           { return .remotePeer }
func ( *connection) () ic.PubKey    { return .remoteKey }
func ( *connection) () ma.Multiaddr  { return .localMultiaddr }
func ( *connection) () ma.Multiaddr { return .remoteMultiaddr }
func ( *connection) () network.ConnScope      { return .scope }
func ( *connection) () tpt.Transport      { return .transport }

// addStream registers a stream in the connection's stream map, keyed by the
// stream's id, while holding the connection lock.
//
// It returns closeErr if the map is nil (the state left behind by teardown),
// and "stream ID already exists" if the id is already registered. On success
// it returns nil.
func ( *connection) ( *stream) error {
	.m.Lock()
	defer .m.Unlock()
	// A nil map means the connection has been torn down.
	if .streams == nil {
		return .closeErr
	}
	if ,  := .streams[.id];  {
		return errors.New("stream ID already exists")
	}
	.streams[.id] = 
	return nil
}

// removeStream deletes the entry for the given stream ID from the stream map
// while holding the connection lock. It is safe to call after teardown or for
// an unknown ID: delete on a nil map or a missing key is a no-op.
func ( *connection) ( uint16) {
	.m.Lock()
	defer .m.Unlock()
	delete(.streams, )
}

// onConnectionStateChange is the peer-connection state callback registered in
// newConnection. When the reported state is Failed or Closed it tears the
// connection down with errConnectionTimeout as the close reason; every other
// state is ignored.
func ( *connection) ( webrtc.PeerConnectionState) {
	if  == webrtc.PeerConnectionStateFailed ||  == webrtc.PeerConnectionStateClosed {
		.closeWithError(errConnectionTimeout{})
	}
}

// detachChannel detaches an outgoing channel, taking into account both the
// context passed to `OpenStream` and the closure of the underlying
// peerconnection.
//
// The underlying SCTP stream for a datachannel implements a net.Conn interface.
// However, the datachannel creates a goroutine which continuously reads from
// the SCTP stream and surfaces the data using an OnMessage callback.
//
// The actual abstractions are as follows: webrtc.DataChannel
// wraps pion.DataChannel, which wraps sctp.Stream.
//
// The goroutine for reading, the Detach method,
// and the OnMessage callback are present at the webrtc.DataChannel level.
// Detach gives us abstracted access to the underlying pion.DataChannel,
// which allows us to issue Read calls to the datachannel.
// This was desired because it was not feasible to introduce backpressure
// with the OnMessage callbacks. The tradeoff is a change in the semantics of
// the OnOpen callback, and having to force close Read locally.
//
// It returns the detached read/write handle and the error reported by Detach,
// or nil together with closeErr if the connection closes first, or nil
// together with the context's error if the supplied context ends first.
func ( *connection) ( context.Context,  *webrtc.DataChannel) (datachannel.ReadWriteCloser, error) {
	// Closed by the OnOpen callback once Detach has returned; the results
	// declared below are only read after that close is observed.
	 := make(chan struct{})

	var  datachannel.ReadWriteCloser
	var  error
	// OnOpen will return immediately for detached datachannels
	// refer: https://github.com/pion/webrtc/blob/7ab3174640b3ce15abebc2516a2ca3939b5f105f/datachannel.go#L278-L282
	.OnOpen(func() {
		,  = .Detach()
		// this is safe since the function should return instantly if the peerconnection is closed
		close()
	})
	// Whichever fires first wins: connection closure, caller cancellation, or
	// completion of the detach.
	select {
	case <-.ctx.Done():
		return nil, .closeErr
	case <-.Done():
		return nil, .Err()
	case <-:
		return , 
	}
}