package rcmgr

import (
	
	
	
	
	
	
	
	

	
	
	
	

	logging 
	
	manet 
)

var log = logging.Logger("rcmgr")

type resourceManager struct {
	limits Limiter

	connLimiter                    *connLimiter
	connRateLimiter                *rate.Limiter
	verifySourceAddressRateLimiter *rate.Limiter

	trace          *trace
	metrics        *metrics
	disableMetrics bool

	allowlist *Allowlist

	system    *systemScope
	transient *transientScope

	allowlistedSystem    *systemScope
	allowlistedTransient *transientScope

	cancelCtx context.Context
	cancel    func()
	wg        sync.WaitGroup

	mx    sync.Mutex
	svc   map[string]*serviceScope
	proto map[protocol.ID]*protocolScope
	peer  map[peer.ID]*peerScope

	stickyProto map[protocol.ID]struct{}
	stickyPeer  map[peer.ID]struct{}

	connId, streamId int64
}

var _ network.ResourceManager = (*resourceManager)(nil)

type systemScope struct {
	*resourceScope
}

var _ network.ResourceScope = (*systemScope)(nil)

type transientScope struct {
	*resourceScope

	system *systemScope
}

var _ network.ResourceScope = (*transientScope)(nil)

type serviceScope struct {
	*resourceScope

	service string
	rcmgr   *resourceManager

	peers map[peer.ID]*resourceScope
}

var _ network.ServiceScope = (*serviceScope)(nil)

type protocolScope struct {
	*resourceScope

	proto protocol.ID
	rcmgr *resourceManager

	peers map[peer.ID]*resourceScope
}

var _ network.ProtocolScope = (*protocolScope)(nil)

type peerScope struct {
	*resourceScope

	peer  peer.ID
	rcmgr *resourceManager
}

var _ network.PeerScope = (*peerScope)(nil)

type connectionScope struct {
	*resourceScope

	dir           network.Direction
	usefd         bool
	isAllowlisted bool
	rcmgr         *resourceManager
	peer          *peerScope
	endpoint      multiaddr.Multiaddr
	ip            netip.Addr
}

var _ network.ConnScope = (*connectionScope)(nil)
var _ network.ConnManagementScope = (*connectionScope)(nil)

type streamScope struct {
	*resourceScope

	dir   network.Direction
	rcmgr *resourceManager
	peer  *peerScope
	svc   *serviceScope
	proto *protocolScope

	peerProtoScope *resourceScope
	peerSvcScope   *resourceScope
}

var _ network.StreamScope = (*streamScope)(nil)
var _ network.StreamManagementScope = (*streamScope)(nil)

type Option func(*resourceManager) error

func ( Limiter,  ...Option) (network.ResourceManager, error) {
	 := newAllowlist()
	 := &resourceManager{
		limits:          ,
		connLimiter:     newConnLimiter(),
		allowlist:       &,
		svc:             make(map[string]*serviceScope),
		proto:           make(map[protocol.ID]*protocolScope),
		peer:            make(map[peer.ID]*peerScope),
		connRateLimiter: newConnRateLimiter(),
	}

	for ,  := range  {
		if  := ();  != nil {
			return nil, 
		}
	}

	 := make(map[string]struct{})
	for ,  := range .connLimiter.networkPrefixLimitV4 {
		[.Network.String()] = struct{}{}
	}
	for ,  := range .connLimiter.networkPrefixLimitV6 {
		[.Network.String()] = struct{}{}
	}
	for ,  := range .allowedNetworks {
		,  := netip.ParsePrefix(.String())
		if  != nil {
			log.Debugf("failed to parse prefix from allowlist %s, %s", , )
			continue
		}
		if ,  := [.String()]; ! {
			// connlimiter doesn't know about this network. Let's fix that
			.connLimiter.addNetworkPrefixLimit(.Addr().Is6(), NetworkPrefixLimit{
				Network:   ,
				ConnCount: .limits.GetAllowlistedSystemLimits().GetConnTotalLimit(),
			})
		}
	}
	.verifySourceAddressRateLimiter = newVerifySourceAddressRateLimiter(.connLimiter)

	if !.disableMetrics {
		var  TraceReporter
		,  := NewStatsTraceReporter()
		if  != nil {
			log.Errorf("failed to initialise StatsTraceReporter %s", )
		} else {
			if .trace == nil {
				.trace = &trace{}
			}
			 := false
			for ,  := range .trace.reporters {
				if  ==  {
					 = true
					break
				}
			}
			if ! {
				.trace.reporters = append(.trace.reporters, )
			}
		}
	}

	if  := .trace.Start();  != nil {
		return nil, 
	}

	.system = newSystemScope(.GetSystemLimits(), , "system")
	.system.IncRef()
	.transient = newTransientScope(.GetTransientLimits(), , "transient", .system.resourceScope)
	.transient.IncRef()

	.allowlistedSystem = newSystemScope(.GetAllowlistedSystemLimits(), , "allowlistedSystem")
	.allowlistedSystem.IncRef()
	.allowlistedTransient = newTransientScope(.GetAllowlistedTransientLimits(), , "allowlistedTransient", .allowlistedSystem.resourceScope)
	.allowlistedTransient.IncRef()

	.cancelCtx, .cancel = context.WithCancel(context.Background())

	.wg.Add(1)
	go .background()

	return , nil
}

func ( *resourceManager) () *Allowlist {
	return .allowlist
}

// GetAllowlist tries to get the allowlist from the given resourcemanager
// interface by checking to see if its concrete type is a resourceManager.
// Returns nil if it fails to get the allowlist.
func ( network.ResourceManager) *Allowlist {
	,  := .(*resourceManager)
	if ! {
		return nil
	}

	return .allowlist
}

func ( *resourceManager) ( func(network.ResourceScope) error) error {
	return (.system)
}

func ( *resourceManager) ( func(network.ResourceScope) error) error {
	return (.transient)
}

func ( *resourceManager) ( string,  func(network.ServiceScope) error) error {
	 := .getServiceScope()
	defer .DecRef()

	return ()
}

func ( *resourceManager) ( protocol.ID,  func(network.ProtocolScope) error) error {
	 := .getProtocolScope()
	defer .DecRef()

	return ()
}

func ( *resourceManager) ( peer.ID,  func(network.PeerScope) error) error {
	 := .getPeerScope()
	defer .DecRef()

	return ()
}

func ( *resourceManager) ( string) *serviceScope {
	.mx.Lock()
	defer .mx.Unlock()

	,  := .svc[]
	if ! {
		 = newServiceScope(, .limits.GetServiceLimits(), )
		.svc[] = 
	}

	.IncRef()
	return 
}

func ( *resourceManager) ( protocol.ID) *protocolScope {
	.mx.Lock()
	defer .mx.Unlock()

	,  := .proto[]
	if ! {
		 = newProtocolScope(, .limits.GetProtocolLimits(), )
		.proto[] = 
	}

	.IncRef()
	return 
}

func ( *resourceManager) ( protocol.ID) {
	.mx.Lock()
	defer .mx.Unlock()

	if .stickyProto == nil {
		.stickyProto = make(map[protocol.ID]struct{})
	}
	.stickyProto[] = struct{}{}
}

func ( *resourceManager) ( peer.ID) *peerScope {
	.mx.Lock()
	defer .mx.Unlock()

	,  := .peer[]
	if ! {
		 = newPeerScope(, .limits.GetPeerLimits(), )
		.peer[] = 
	}

	.IncRef()
	return 
}

func ( *resourceManager) ( peer.ID) {
	.mx.Lock()
	defer .mx.Unlock()

	if .stickyPeer == nil {
		.stickyPeer = make(map[peer.ID]struct{})
	}

	.stickyPeer[] = struct{}{}
}

func ( *resourceManager) () int64 {
	.mx.Lock()
	defer .mx.Unlock()

	.connId++
	return .connId
}

func ( *resourceManager) () int64 {
	.mx.Lock()
	defer .mx.Unlock()

	.streamId++
	return .streamId
}

// VerifySourceAddress tells the transport to verify the peer's IP address before
// initiating a handshake.
func ( *resourceManager) ( net.Addr) bool {
	if .verifySourceAddressRateLimiter == nil {
		return false
	}
	,  := netip.ParseAddrPort(.String())
	if  != nil {
		return true
	}
	return !.verifySourceAddressRateLimiter.Allow(.Addr())
}

// OpenConnectionNoIP is deprecated and will be removed in the next release
//
// Deprecated: Use OpenConnection instead
func ( *resourceManager) ( network.Direction,  bool,  multiaddr.Multiaddr) (network.ConnManagementScope, error) {
	return .openConnection(, , , netip.Addr{})
}

func ( *resourceManager) ( network.Direction,  bool,  multiaddr.Multiaddr) (network.ConnManagementScope, error) {
	,  := manet.ToIP()
	if  != nil {
		// No IP address
		return .openConnection(, , , netip.Addr{})
	}

	,  := netip.AddrFromSlice()
	if ! {
		return nil, fmt.Errorf("failed to convert ip to netip.Addr")
	}
	return .openConnection(, , , )
}

func ( *resourceManager) ( network.Direction,  bool,  multiaddr.Multiaddr,  netip.Addr) (network.ConnManagementScope, error) {
	if !.connRateLimiter.Allow() {
		return nil, errors.New("rate limit exceeded")
	}

	if .IsValid() {
		if  := .connLimiter.addConn(); ! {
			return nil, fmt.Errorf("connections per ip limit exceeded for %s", )
		}
	}

	var  *connectionScope
	 = newConnectionScope(, , .limits.GetConnLimits(), , , )

	 := .AddConn(, )
	if  != nil && .IsValid() {
		// Try again if this is an allowlisted connection
		// Failed to open connection, let's see if this was allowlisted and try again
		 := .allowlist.Allowed()
		if  {
			.Done()
			 = newAllowListedConnectionScope(, , .limits.GetConnLimits(), , )
			 = .AddConn(, )
		}
	}

	if  != nil {
		.Done()
		.metrics.BlockConn(, )
		return nil, 
	}

	.metrics.AllowConn(, )
	return , nil
}

func ( *resourceManager) ( peer.ID,  network.Direction) (network.StreamManagementScope, error) {
	 := .getPeerScope()
	 := newStreamScope(, .limits.GetStreamLimits(), , )
	.DecRef() // we have the reference in edges

	 := .AddStream()
	if  != nil {
		.Done()
		.metrics.BlockStream(, )
		return nil, 
	}

	.metrics.AllowStream(, )
	return , nil
}

func ( *resourceManager) () error {
	.cancel()
	.wg.Wait()
	.trace.Close()

	return nil
}

func ( *resourceManager) () {
	defer .wg.Done()

	// periodically garbage collects unused peer and protocol scopes
	 := time.NewTicker(time.Minute)
	defer .Stop()

	for {
		select {
		case <-.C:
			.gc()
		case <-.cancelCtx.Done():
			return
		}
	}
}

func ( *resourceManager) () {
	.mx.Lock()
	defer .mx.Unlock()

	for ,  := range .proto {
		,  := .stickyProto[]
		if  {
			continue
		}
		if .IsUnused() {
			.Done()
			delete(.proto, )
		}
	}

	var  []peer.ID
	for ,  := range .peer {
		,  := .stickyPeer[]
		if  {
			continue
		}

		if .IsUnused() {
			.Done()
			delete(.peer, )
			 = append(, )
		}
	}

	for ,  := range .svc {
		.Lock()
		for ,  := range  {
			,  := .peers[]
			if  {
				.Done()
				delete(.peers, )
			}
		}
		.Unlock()
	}

	for ,  := range .proto {
		.Lock()
		for ,  := range  {
			,  := .peers[]
			if  {
				.Done()
				delete(.peers, )
			}
		}
		.Unlock()
	}
}

func newSystemScope( Limit,  *resourceManager,  string) *systemScope {
	return &systemScope{
		resourceScope: newResourceScope(, nil, , .trace, .metrics),
	}
}

func newTransientScope( Limit,  *resourceManager,  string,  *resourceScope) *transientScope {
	return &transientScope{
		resourceScope: newResourceScope(,
			[]*resourceScope{},
			, .trace, .metrics),
		system: .system,
	}
}

func newServiceScope( string,  Limit,  *resourceManager) *serviceScope {
	return &serviceScope{
		resourceScope: newResourceScope(,
			[]*resourceScope{.system.resourceScope},
			fmt.Sprintf("service:%s", ), .trace, .metrics),
		service: ,
		rcmgr:   ,
	}
}

func newProtocolScope( protocol.ID,  Limit,  *resourceManager) *protocolScope {
	return &protocolScope{
		resourceScope: newResourceScope(,
			[]*resourceScope{.system.resourceScope},
			fmt.Sprintf("protocol:%s", ), .trace, .metrics),
		proto: ,
		rcmgr: ,
	}
}

func newPeerScope( peer.ID,  Limit,  *resourceManager) *peerScope {
	return &peerScope{
		resourceScope: newResourceScope(,
			[]*resourceScope{.system.resourceScope},
			peerScopeName(), .trace, .metrics),
		peer:  ,
		rcmgr: ,
	}
}

func newConnectionScope( network.Direction,  bool,  Limit,  *resourceManager,  multiaddr.Multiaddr,  netip.Addr) *connectionScope {
	return &connectionScope{
		resourceScope: newResourceScope(,
			[]*resourceScope{.transient.resourceScope, .system.resourceScope},
			connScopeName(.nextConnId()), .trace, .metrics),
		dir:      ,
		usefd:    ,
		rcmgr:    ,
		endpoint: ,
		ip:       ,
	}
}

func newAllowListedConnectionScope( network.Direction,  bool,  Limit,  *resourceManager,  multiaddr.Multiaddr) *connectionScope {
	return &connectionScope{
		resourceScope: newResourceScope(,
			[]*resourceScope{.allowlistedTransient.resourceScope, .allowlistedSystem.resourceScope},
			connScopeName(.nextConnId()), .trace, .metrics),
		dir:           ,
		usefd:         ,
		rcmgr:         ,
		endpoint:      ,
		isAllowlisted: true,
	}
}

func newStreamScope( network.Direction,  Limit,  *peerScope,  *resourceManager) *streamScope {
	return &streamScope{
		resourceScope: newResourceScope(,
			[]*resourceScope{.resourceScope, .transient.resourceScope, .system.resourceScope},
			streamScopeName(.nextStreamId()), .trace, .metrics),
		dir:   ,
		rcmgr: .rcmgr,
		peer:  ,
	}
}

func ( string) bool {
	return  == "system"
}

func ( string) bool {
	return  == "transient"
}

func streamScopeName( int64) string {
	return fmt.Sprintf("stream-%d", )
}

func ( string) bool {
	return strings.HasPrefix(, "stream-") && !IsSpan()
}

func connScopeName( int64) string {
	return fmt.Sprintf("conn-%d", )
}

func ( string) bool {
	return strings.HasPrefix(, "conn-") && !IsSpan()
}

func peerScopeName( peer.ID) string {
	return fmt.Sprintf("peer:%s", )
}

// PeerStrInScopeName returns "" if name is not a peerScopeName. Returns a string to avoid allocating a peer ID object
func ( string) string {
	if !strings.HasPrefix(, "peer:") || IsSpan() {
		return ""
	}
	// Index to avoid allocating a new string
	 := strings.Index(, "peer:")
	if  == -1 {
		return ""
	}
	 := ([+len("peer:"):])
	return 
}

// ParseProtocolScopeName returns the service name if name is a serviceScopeName.
// Otherwise returns ""
func ( string) string {
	if strings.HasPrefix(, "protocol:") && !IsSpan() {
		if strings.Contains(, "peer:") {
			// This is a protocol peer scope
			return ""
		}

		// Index to avoid allocating a new string
		 := strings.Index(, ":")
		if  == -1 {
			return ""
		}
		return [+1:]
	}
	return ""
}

func ( *serviceScope) () string {
	return .service
}

func ( *serviceScope) ( peer.ID) *resourceScope {
	.Lock()
	defer .Unlock()

	,  := .peers[]
	if  {
		.IncRef()
		return 
	}

	 := .rcmgr.limits.GetServicePeerLimits(.service)

	if .peers == nil {
		.peers = make(map[peer.ID]*resourceScope)
	}

	 = newResourceScope(, nil, fmt.Sprintf("%s.peer:%s", .name, ), .rcmgr.trace, .rcmgr.metrics)
	.peers[] = 

	.IncRef()
	return 
}

func ( *protocolScope) () protocol.ID {
	return .proto
}

func ( *protocolScope) ( peer.ID) *resourceScope {
	.Lock()
	defer .Unlock()

	,  := .peers[]
	if  {
		.IncRef()
		return 
	}

	 := .rcmgr.limits.GetProtocolPeerLimits(.proto)

	if .peers == nil {
		.peers = make(map[peer.ID]*resourceScope)
	}

	 = newResourceScope(, nil, fmt.Sprintf("%s.peer:%s", .name, ), .rcmgr.trace, .rcmgr.metrics)
	.peers[] = 

	.IncRef()
	return 
}

func ( *peerScope) () peer.ID {
	return .peer
}

func ( *connectionScope) () network.PeerScope {
	.Lock()
	defer .Unlock()

	// avoid nil is not nil footgun; go....
	if .peer == nil {
		return nil
	}

	return .peer
}

func ( *connectionScope) () {
	.Lock()
	defer .Unlock()
	if .done {
		return
	}
	if .ip.IsValid() {
		.rcmgr.connLimiter.rmConn(.ip)
	}
	.resourceScope.doneUnlocked()
}

// transferAllowedToStandard transfers this connection scope from being part of
// the allowlist set of scopes to being part of the standard set of scopes.
// Happens when we first allowlisted this connection due to its IP, but later
// discovered that the peer id not what we expected.
func ( *connectionScope) () ( error) {

	 := .rcmgr.system.resourceScope
	 := .rcmgr.transient.resourceScope

	 := .resourceScope.rc.stat()

	for ,  := range .edges {
		.ReleaseForChild()
		.DecRef() // removed from edges
	}
	.edges = nil

	if  := .ReserveForChild();  != nil {
		return 
	}
	.IncRef()

	// Undo this if we fail later
	defer func() {
		if  != nil {
			.ReleaseForChild()
			.DecRef()
		}
	}()

	if  := .ReserveForChild();  != nil {
		return 
	}
	.IncRef()

	// Update edges
	.edges = []*resourceScope{
		,
		,
	}
	return nil
}

func ( *connectionScope) ( peer.ID) error {
	.Lock()
	defer .Unlock()

	if .peer != nil {
		return fmt.Errorf("connection scope already attached to a peer")
	}

	 := .rcmgr.system
	 := .rcmgr.transient

	if .isAllowlisted {
		 = .rcmgr.allowlistedSystem
		 = .rcmgr.allowlistedTransient

		if !.rcmgr.allowlist.AllowedPeerAndMultiaddr(, .endpoint) {
			.isAllowlisted = false

			// This is not an allowed peer + multiaddr combination. We need to
			// transfer this connection to the general scope. We'll do this first by
			// transferring the connection to the system and transient scopes, then
			// continue on with this function. The idea is that a connection
			// shouldn't get the benefit of evading the transient scope because it
			// was _almost_ an allowlisted connection.
			if  := .transferAllowedToStandard();  != nil {
				// Failed to transfer this connection to the standard scopes
				return 
			}

			// set the system and transient scopes to the non-allowlisted ones
			 = .rcmgr.system
			 = .rcmgr.transient
		}
	}

	.peer = .rcmgr.getPeerScope()

	// juggle resources from transient scope to peer scope
	 := .resourceScope.rc.stat()
	if  := .peer.ReserveForChild();  != nil {
		.peer.DecRef()
		.peer = nil
		.rcmgr.metrics.BlockPeer()
		return 
	}

	.ReleaseForChild()
	.DecRef() // removed from edges

	// update edges
	 := []*resourceScope{
		.peer.resourceScope,
		.resourceScope,
	}
	.resourceScope.edges = 

	.rcmgr.metrics.AllowPeer()
	return nil
}

func ( *streamScope) () network.ProtocolScope {
	.Lock()
	defer .Unlock()

	// avoid nil is not nil footgun; go....
	if .proto == nil {
		return nil
	}

	return .proto
}

func ( *streamScope) ( protocol.ID) error {
	.Lock()
	defer .Unlock()

	if .proto != nil {
		return fmt.Errorf("stream scope already attached to a protocol")
	}

	.proto = .rcmgr.getProtocolScope()

	// juggle resources from transient scope to protocol scope
	 := .resourceScope.rc.stat()
	if  := .proto.ReserveForChild();  != nil {
		.proto.DecRef()
		.proto = nil
		.rcmgr.metrics.BlockProtocol()
		return 
	}

	.peerProtoScope = .proto.getPeerScope(.peer.peer)
	if  := .peerProtoScope.ReserveForChild();  != nil {
		.proto.ReleaseForChild()
		.proto.DecRef()
		.proto = nil
		.peerProtoScope.DecRef()
		.peerProtoScope = nil
		.rcmgr.metrics.BlockProtocolPeer(, .peer.peer)
		return 
	}

	.rcmgr.transient.ReleaseForChild()
	.rcmgr.transient.DecRef() // removed from edges

	// update edges
	 := []*resourceScope{
		.peer.resourceScope,
		.peerProtoScope,
		.proto.resourceScope,
		.rcmgr.system.resourceScope,
	}
	.resourceScope.edges = 

	.rcmgr.metrics.AllowProtocol()
	return nil
}

func ( *streamScope) () network.ServiceScope {
	.Lock()
	defer .Unlock()

	// avoid nil is not nil footgun; go....
	if .svc == nil {
		return nil
	}

	return .svc
}

func ( *streamScope) ( string) error {
	.Lock()
	defer .Unlock()

	if .svc != nil {
		return fmt.Errorf("stream scope already attached to a service")
	}
	if .proto == nil {
		return fmt.Errorf("stream scope not attached to a protocol")
	}

	.svc = .rcmgr.getServiceScope()

	// reserve resources in service
	 := .resourceScope.rc.stat()
	if  := .svc.ReserveForChild();  != nil {
		.svc.DecRef()
		.svc = nil
		.rcmgr.metrics.BlockService()
		return 
	}

	// get the per peer service scope constraint, if any
	.peerSvcScope = .svc.getPeerScope(.peer.peer)
	if  := .peerSvcScope.ReserveForChild();  != nil {
		.svc.ReleaseForChild()
		.svc.DecRef()
		.svc = nil
		.peerSvcScope.DecRef()
		.peerSvcScope = nil
		.rcmgr.metrics.BlockServicePeer(, .peer.peer)
		return 
	}

	// update edges
	 := []*resourceScope{
		.peer.resourceScope,
		.peerProtoScope,
		.peerSvcScope,
		.proto.resourceScope,
		.svc.resourceScope,
		.rcmgr.system.resourceScope,
	}
	.resourceScope.edges = 

	.rcmgr.metrics.AllowService()
	return nil
}

func ( *streamScope) () network.PeerScope {
	.Lock()
	defer .Unlock()

	// avoid nil is not nil footgun; go....
	if .peer == nil {
		return nil
	}

	return .peer
}