// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package ice

import (
	
	
	
	
	
	
	

	
	
)

type bufferedConn struct {
	net.Conn
	buf    *packetio.Buffer
	logger logging.LeveledLogger
	closed int32
}

func newBufferedConn( net.Conn,  int,  logging.LeveledLogger) net.Conn {
	 := packetio.NewBuffer()
	if  > 0 {
		.SetLimitSize()
	}

	 := &bufferedConn{
		Conn:   ,
		buf:    ,
		logger: ,
	}

	go .writeProcess()

	return 
}

func ( *bufferedConn) ( []byte) (int, error) {
	,  := .buf.Write()
	if  != nil {
		return , 
	}

	return , nil
}

func ( *bufferedConn) () {
	 := make([]byte, receiveMTU)
	for atomic.LoadInt32(&.closed) == 0 {
		,  := .buf.Read()
		if errors.Is(, io.EOF) {
			return
		}

		if  != nil {
			.logger.Warnf("Failed to read from buffer: %s", )

			continue
		}

		if ,  := .Conn.Write([:]);  != nil {
			.logger.Warnf("Failed to write: %s", )

			continue
		}
	}
}

func ( *bufferedConn) () error {
	atomic.StoreInt32(&.closed, 1)
	_ = .buf.Close()

	return .Conn.Close()
}

type tcpPacketConn struct {
	params *tcpPacketParams

	// conns is a map of net.Conns indexed by remote net.Addr.String()
	conns map[string]net.Conn

	recvChan chan streamingPacket

	mu         sync.Mutex
	wg         sync.WaitGroup
	closedChan chan struct{}
	closeOnce  sync.Once
	aliveTimer *time.Timer
}

type streamingPacket struct {
	Data  []byte
	RAddr net.Addr
	Err   error
}

type tcpPacketParams struct {
	ReadBuffer    int
	LocalAddr     net.Addr
	Logger        logging.LeveledLogger
	WriteBuffer   int
	AliveDuration time.Duration
}

func newTCPPacketConn( tcpPacketParams) *tcpPacketConn {
	 := &tcpPacketConn{
		params: &,

		conns: map[string]net.Conn{},

		recvChan:   make(chan streamingPacket, .ReadBuffer),
		closedChan: make(chan struct{}),
	}

	if .AliveDuration > 0 {
		.aliveTimer = time.AfterFunc(.AliveDuration, func() {
			.params.Logger.Warn("close tcp packet conn by alive timeout")
			_ = .Close()
		})
	}

	return 
}

func ( *tcpPacketConn) () {
	.mu.Lock()
	if .aliveTimer != nil {
		.aliveTimer.Stop()
	}
	.mu.Unlock()
}

func ( *tcpPacketConn) ( net.Conn,  []byte) error {
	.params.Logger.Infof(
		"Added connection: %s remote %s to local %s",
		.RemoteAddr().Network(),
		.RemoteAddr(),
		.LocalAddr(),
	)

	.mu.Lock()
	defer .mu.Unlock()

	select {
	case <-.closedChan:
		return io.ErrClosedPipe
	default:
	}

	if ,  := .conns[.RemoteAddr().String()];  {
		return fmt.Errorf("%w: %s", errConnectionAddrAlreadyExist, .RemoteAddr().String())
	}

	if .params.WriteBuffer > 0 {
		 = newBufferedConn(, .params.WriteBuffer, .params.Logger)
	}
	.conns[.RemoteAddr().String()] = 

	.wg.Add(1)
	go func() {
		defer .wg.Done()
		if  != nil {
			select {
			case <-.closedChan:
				// NOTE: recvChan can fill up and never drain in edge
				// cases while closing a connection, which can cause the
				// packetConn to never finish closing. Bail out early
				// here to prevent that.
				return
			case .recvChan <- streamingPacket{, .RemoteAddr(), nil}:
			}
		}
		.startReading()
	}()

	return nil
}

func ( *tcpPacketConn) ( net.Conn) {
	 := make([]byte, receiveMTU)

	for {
		,  := readStreamingPacket(, )
		if  != nil {
			.params.Logger.Warnf("Failed to read streaming packet: %s", )
			 := .removeConn()
			// Only propagate connection closure errors if no other open connection exists.
			if  || !(errors.Is(, io.EOF) || errors.Is(, net.ErrClosed)) {
				.handleRecv(streamingPacket{nil, .RemoteAddr(), })
			}

			return
		}

		 := make([]byte, )
		copy(, [:])

		.handleRecv(streamingPacket{, .RemoteAddr(), nil})
	}
}

func ( *tcpPacketConn) ( streamingPacket) {
	.mu.Lock()

	 := .recvChan
	if .isClosed() {
		 = nil
	}

	.mu.Unlock()

	select {
	case  <- :
	case <-.closedChan:
	}
}

func ( *tcpPacketConn) () bool {
	select {
	case <-.closedChan:
		return true
	default:
		return false
	}
}

// WriteTo is for passive and s-o candidates.
func ( *tcpPacketConn) ( []byte) ( int,  net.Addr,  error) {
	,  := <-.recvChan

	if ! {
		return 0, nil, io.ErrClosedPipe
	}

	if .Err != nil {
		return 0, .RAddr, .Err
	}

	if cap() < len(.Data) {
		return 0, .RAddr, io.ErrShortBuffer
	}

	 = len(.Data)
	copy(, .Data[:])

	return , .RAddr, 
}

// WriteTo is for active and s-o candidates.
func ( *tcpPacketConn) ( []byte,  net.Addr) ( int,  error) {
	.mu.Lock()
	,  := .conns[.String()]
	.mu.Unlock()

	if ! {
		return 0, io.ErrClosedPipe
	}

	,  = writeStreamingPacket(, )
	if  != nil {
		.params.Logger.Tracef("%w %s", errWrite, )

		return , 
	}

	return , 
}

func ( *tcpPacketConn) ( io.Closer) {
	 := .Close()
	if  != nil {
		.params.Logger.Warnf("%v: %s", errClosingConnection, )
	}
}

func ( *tcpPacketConn) ( net.Conn) bool {
	.mu.Lock()
	defer .mu.Unlock()

	.closeAndLogError()

	delete(.conns, .RemoteAddr().String())

	return len(.conns) == 0
}

func ( *tcpPacketConn) () error {
	.mu.Lock()

	var  bool
	.closeOnce.Do(func() {
		close(.closedChan)
		 = true
		if .aliveTimer != nil {
			.aliveTimer.Stop()
		}
	})

	for ,  := range .conns {
		.closeAndLogError()
		delete(.conns, .RemoteAddr().String())
	}

	.mu.Unlock()

	.wg.Wait()

	if  {
		close(.recvChan)
	}

	return nil
}

func ( *tcpPacketConn) () net.Addr {
	return .params.LocalAddr
}

func ( *tcpPacketConn) (time.Time) error {
	return nil
}

func ( *tcpPacketConn) (time.Time) error {
	return nil
}

func ( *tcpPacketConn) (time.Time) error {
	return nil
}

func ( *tcpPacketConn) () <-chan struct{} {
	return .closedChan
}

func ( *tcpPacketConn) () string {
	return fmt.Sprintf("tcpPacketConn{LocalAddr: %s}", .params.LocalAddr)
}