package tcpreuse

import (
	
	
	
	
	
	

	
	
	logging 
	
	ma 
	manet 
)

const acceptQueueSize = 64 // It is fine to read 3 bytes from 64 connections in parallel.

// How long we wait for a connection to be accepted before dropping it.
const acceptTimeout = 30 * time.Second

var log = logging.Logger("tcp-demultiplex")

// ConnMgr enables you to share the same listen address between TCP and WebSocket transports.
type ConnMgr struct {
	enableReuseport bool
	reuse           reuseport.Transport
	upgrader        transport.Upgrader

	mx        sync.Mutex
	listeners map[string]*multiplexedListener
}

func ( bool,  transport.Upgrader) *ConnMgr {
	return &ConnMgr{
		enableReuseport: ,
		reuse:           reuseport.Transport{},
		upgrader:        ,
		listeners:       make(map[string]*multiplexedListener),
	}
}

func ( *ConnMgr) ( ma.Multiaddr) (transport.GatedMaListener, error) {
	var  manet.Listener
	var  error
	if .useReuseport() {
		,  = .reuse.Listen()
		if  != nil {
			return nil, 
		}
	} else {
		,  = manet.Listen()
		if  != nil {
			return nil, 
		}
	}
	return .upgrader.GateMaListener(), nil
}

func ( *ConnMgr) () bool {
	return .enableReuseport && ReuseportIsAvailable()
}

func getTCPAddr( ma.Multiaddr) (ma.Multiaddr, error) {
	 := false
	,  := ma.SplitFunc(, func( ma.Component) bool {
		if  {
			return true
		}
		if .Protocol().Code == ma.P_TCP {
			 = true
		}
		return false
	})
	if ! {
		return nil, fmt.Errorf("invalid listen addr %s, need tcp address", )
	}
	return , nil
}

// DemultiplexedListen returns a listener for laddr listening for `connType` connections. The connections
// accepted from returned listeners need to be upgraded with a `transport.Upgrader`.
// NOTE: All listeners for port 0 share the same underlying socket, so they have the same specific port.
func ( *ConnMgr) ( ma.Multiaddr,  DemultiplexedConnType) (transport.GatedMaListener, error) {
	if !.IsKnown() {
		return nil, fmt.Errorf("unknown connection type: %s", )
	}
	,  := getTCPAddr()
	if  != nil {
		return nil, 
	}

	.mx.Lock()
	defer .mx.Unlock()
	,  := .listeners[.String()]
	if  {
		,  := .DemultiplexedListen()
		if  != nil {
			return nil, 
		}
		return , nil
	}

	,  := .gatedMaListen()
	if  != nil {
		return nil, 
	}

	,  := context.WithCancel(context.Background())
	 := func() error {
		()
		.mx.Lock()
		defer .mx.Unlock()
		delete(.listeners, .String())
		delete(.listeners, .Multiaddr().String())
		return .Close()
	}
	 = &multiplexedListener{
		GatedMaListener: ,
		listeners:       make(map[DemultiplexedConnType]*demultiplexedListener),
		ctx:             ,
		closeFn:         ,
	}
	.listeners[.String()] = 
	.listeners[.Multiaddr().String()] = 

	,  := .DemultiplexedListen()
	if  != nil {
		 := .Close()
		return nil, errors.Join(, )
	}

	.wg.Add(1)
	go .run()

	return , nil
}

var _ transport.GatedMaListener = &demultiplexedListener{}

type multiplexedListener struct {
	transport.GatedMaListener
	listeners map[DemultiplexedConnType]*demultiplexedListener
	mx        sync.RWMutex

	ctx     context.Context
	closeFn func() error
	wg      sync.WaitGroup
}

var ErrListenerExists = errors.New("listener already exists for this conn type on this address")

func ( *multiplexedListener) ( DemultiplexedConnType) (transport.GatedMaListener, error) {
	if !.IsKnown() {
		return nil, fmt.Errorf("unknown connection type: %s", )
	}

	.mx.Lock()
	defer .mx.Unlock()
	if ,  := .listeners[];  {
		return nil, ErrListenerExists
	}

	,  := context.WithCancel(.ctx)
	 := &demultiplexedListener{
		buffer:     make(chan *connWithScope),
		inner:      .GatedMaListener,
		ctx:        ,
		cancelFunc: ,
		closeFn:    func() error { .removeDemultiplexedListener(); return nil },
	}

	.listeners[] = 

	return , nil
}

func ( *multiplexedListener) () error {
	defer .Close()
	defer .wg.Done()
	 := make(chan struct{}, acceptQueueSize)
	for {
		, ,  := .GatedMaListener.Accept()
		if  != nil {
			return 
		}
		,  := context.WithTimeout(.ctx, acceptTimeout)
		select {
		case  <- struct{}{}:
		case <-.Done():
			()
			.Done()
			.Close()
			log.Debug("accept queue full, dropping connection", "remote_addr", .RemoteMultiaddr())
			continue
		case <-.ctx.Done():
			()
			.Done()
			.Close()
			log.Debug("listener closed; dropping connection", "remote_addr", .RemoteMultiaddr())
			continue
		}

		.wg.Add(1)
		go func() {
			defer func() { <- }()
			defer .wg.Done()
			defer ()
			, ,  := identifyConnType()
			if  != nil {
				// conn closed by identifyConnType
				.Done()
				log.Debug("error demultiplexing connection", "error", )
				return
			}

			,  := manetConnWithScope(, )
			if  != nil {
				.Done()
				 := .Close()
				 = errors.Join(, )
				log.Debug("error wrapping connection with scope", "error", )
				return
			}

			.mx.RLock()
			,  := .listeners[]
			.mx.RUnlock()
			if ! {
				 := .Close()
				if  != nil {
					log.Debug("no registered listener for demultiplex connection. Error closing the connection", "type", , "close_error", )
				} else {
					log.Debug("no registered listener for demultiplex connection", "type", )
				}
				return
			}

			select {
			case .buffer <- :
			case <-.Done():
				log.Debug("accept timeout; dropping connection", "remote", .RemoteMultiaddr())
				.Close()
			}
		}()
	}
}

func ( *multiplexedListener) () error {
	.mx.Lock()
	for ,  := range .listeners {
		.cancelFunc()
	}
	 := .closeListener()
	.mx.Unlock()
	.wg.Wait()
	return 
}

func ( *multiplexedListener) () error {
	 := .GatedMaListener.Close()
	 := .closeFn()
	return errors.Join(, )
}

func ( *multiplexedListener) ( DemultiplexedConnType) {
	.mx.Lock()
	defer .mx.Unlock()

	delete(.listeners, )
	if len(.listeners) == 0 {
		.closeListener()
		.mx.Unlock()
		.wg.Wait()
		.mx.Lock()
	}
}

type demultiplexedListener struct {
	buffer     chan *connWithScope
	inner      transport.GatedMaListener
	ctx        context.Context
	cancelFunc context.CancelFunc
	closeFn    func() error
}

func ( *demultiplexedListener) () (manet.Conn, network.ConnManagementScope, error) {
	select {
	case  := <-.buffer:
		return .ManetTCPConnInterface, .ConnScope, nil
	case <-.ctx.Done():
		return nil, nil, transport.ErrListenerClosed
	}
}

func ( *demultiplexedListener) () error {
	.cancelFunc()
	return .closeFn()
}

func ( *demultiplexedListener) () ma.Multiaddr {
	return .inner.Multiaddr()
}

func ( *demultiplexedListener) () net.Addr {
	return .inner.Addr()
}