package autorelay

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"math/rand"
	"slices"
	"sync"
	"time"

	"golang.org/x/sync/errgroup"

	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
	circuitv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
	circuitv2_proto "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
	"github.com/libp2p/go-libp2p/p2p/protocol/identify"

	ma "github.com/multiformats/go-multiaddr"
)

const protoIDv2 = circuitv2_proto.ProtoIDv2Hop

// Terminology:
// Candidate: Once we connect to a node and it supports relay protocol,
// we call it a candidate, and consider using it as a relay.
//
// Relay: Out of the list of candidates, the ones we have a reservation with.
// Currently, we just randomly select a candidate, but we can employ more sophisticated
// selection strategies here (e.g. by factoring in the RTT).
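//
// As a rough sketch (descriptive only, derived from the functions below), a peer moves
// through these stages:
//
//	peerSource -> findNodes -> handleNewNode/tryNode -> candidates map
//	candidates -> maybeConnectToRelay -> connectToRelay -> relays map (reservation held)
//	reservation lost or expired -> relay removed, new candidates requested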

const (
	rsvpRefreshInterval = time.Minute
	rsvpExpirationSlack = 2 * time.Minute

	autorelayTag  = "autorelay"
	maxRelayAddrs = 100
)

type candidate struct {
	added           time.Time
	supportsRelayV2 bool
	ai              peer.AddrInfo
}

// relayFinder discovers relay candidates and maintains relay reservations for the host,
// providing relayed connectivity when a NAT or firewall prevents direct connections.
type relayFinder struct {
	bootTime time.Time
	host     host.Host

	conf *config

	refCount sync.WaitGroup

	ctxCancel   context.CancelFunc
	ctxCancelMx sync.Mutex

	peerSource PeerSource

	candidateFound             chan struct{} // receives every time we find a new relay candidate
	candidateMx                sync.Mutex
	candidates                 map[peer.ID]*candidate
	backoff                    map[peer.ID]time.Time
	maybeConnectToRelayTrigger chan struct{} // cap: 1
	// Any time _something_ happens that might cause us to need new candidates.
	// This could be
	// * the disconnection of a relay
	// * the failed attempt to obtain a reservation with a current candidate
	// * a candidate is deleted due to its age
	maybeRequestNewCandidates chan struct{} // cap: 1.

	relayReservationUpdated chan struct{}

	relayMx sync.Mutex
	relays  map[peer.ID]*circuitv2.Reservation

	circuitAddrs []ma.Multiaddr

	// A channel that triggers a run of `runScheduledWork`.
	triggerRunScheduledWork chan struct{}
	metricsTracer           MetricsTracer

	emitter event.Emitter
}
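
// Locking note (descriptive, derived from the methods below): candidateMx guards
// candidates and backoff, relayMx guards relays, and circuitAddrs is only accessed
// from the background goroutine via updateAddrs.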

var errAlreadyRunning = errors.New("relayFinder already running")

func newRelayFinder(host host.Host, conf *config) (*relayFinder, error) {
	if conf.peerSource == nil {
		panic("Cannot create a new relayFinder. Need a Peer Source fn or a list of static relays. Refer to the documentation around `libp2p.EnableAutoRelay`")
	}

	emitter, err := host.EventBus().Emitter(new(event.EvtAutoRelayAddrsUpdated), eventbus.Stateful)
	if err != nil {
		return nil, err
	}

	return &relayFinder{
		bootTime:                   conf.clock.Now(),
		host:                       host,
		conf:                       conf,
		peerSource:                 conf.peerSource,
		candidates:                 make(map[peer.ID]*candidate),
		backoff:                    make(map[peer.ID]time.Time),
		candidateFound:             make(chan struct{}, 1),
		maybeConnectToRelayTrigger: make(chan struct{}, 1),
		maybeRequestNewCandidates:  make(chan struct{}, 1),
		triggerRunScheduledWork:    make(chan struct{}, 1),
		relays:                     make(map[peer.ID]*circuitv2.Reservation),
		relayReservationUpdated:    make(chan struct{}, 1),
		metricsTracer:              &wrappedMetricsTracer{conf.metricsTracer},
		emitter:                    emitter,
	}, nil
}
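
// A minimal sketch of a PeerSource implementation (illustrative only; staticRelays is a
// hypothetical, pre-populated []peer.AddrInfo). num is the maximum number of peers
// findNodes asks for in one batch; closing the returned channel tells findNodes that
// this batch is done (see the !ok branch there):
//
//	func staticPeerSource(staticRelays []peer.AddrInfo) PeerSource {
//		return func(ctx context.Context, num int) <-chan peer.AddrInfo {
//			out := make(chan peer.AddrInfo, num)
//			go func() {
//				defer close(out)
//				for i, ai := range staticRelays {
//					if i == num {
//						return
//					}
//					select {
//					case out <- ai:
//					case <-ctx.Done():
//						return
//					}
//				}
//			}()
//			return out
//		}
//	}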

type scheduledWorkTimes struct {
	leastFrequentInterval       time.Duration
	nextRefresh                 time.Time
	nextBackoff                 time.Time
	nextOldCandidateCheck       time.Time
	nextAllowedCallToPeerSource time.Time
}

// cleanupDisconnectedPeers removes a relay from the relays map when we disconnect from
// it, and triggers the search for a replacement.
func (rf *relayFinder) cleanupDisconnectedPeers(ctx context.Context) {
	subConnectedness, err := rf.host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("autorelay (relay finder)"), eventbus.BufSize(32))
	if err != nil {
		log.Error("failed to subscribe to the EvtPeerConnectednessChanged")
		return
	}
	defer subConnectedness.Close()
	for {
		select {
		case <-ctx.Done():
			return
		case ev, ok := <-subConnectedness.Out():
			if !ok {
				return
			}
			evt := ev.(event.EvtPeerConnectednessChanged)
			if evt.Connectedness != network.NotConnected {
				continue
			}
			lostRelay := false

			rf.relayMx.Lock()
			if rf.usingRelay(evt.Peer) { // we were disconnected from a relay
				log.Debugw("disconnected from relay", "id", evt.Peer)
				delete(rf.relays, evt.Peer)
				rf.notifyMaybeConnectToRelay()
				rf.notifyMaybeNeedNewCandidates()
				lostRelay = true
			}
			rf.relayMx.Unlock()

			if lostRelay {
				rf.notifyRelayReservationUpdated()
				rf.metricsTracer.ReservationEnded(1)
			}
		}
	}
}

// background is the relay finder's main loop. It spawns the candidate discovery and
// reservation workers and dispatches scheduled work until ctx is cancelled.
func (rf *relayFinder) background(ctx context.Context) {
	peerSourceRateLimiter := make(chan struct{}, 1)
	rf.refCount.Add(1)
	go func() {
		defer rf.refCount.Done()
		rf.findNodes(ctx, peerSourceRateLimiter)
	}()

	rf.refCount.Add(1)
	go func() {
		defer rf.refCount.Done()
		rf.handleNewCandidates(ctx)
	}()

	now := rf.conf.clock.Now()
	bootDelayTimer := rf.conf.clock.InstantTimer(now.Add(rf.conf.bootDelay))
	defer bootDelayTimer.Stop()

	// This is the least frequent event. It's our fallback timer if we don't have any other work to do.
	leastFrequentInterval := rf.conf.minInterval
	// Check if leastFrequentInterval is 0 to avoid busy looping
	if rf.conf.backoff > leastFrequentInterval || leastFrequentInterval == 0 {
		leastFrequentInterval = rf.conf.backoff
	}
	if rf.conf.maxCandidateAge > leastFrequentInterval || leastFrequentInterval == 0 {
		leastFrequentInterval = rf.conf.maxCandidateAge
	}
	if rsvpRefreshInterval > leastFrequentInterval || leastFrequentInterval == 0 {
		leastFrequentInterval = rsvpRefreshInterval
	}
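
	// Worked example (illustrative values, not necessarily the defaults): with
	// minInterval=30s, backoff=1h and maxCandidateAge=30m, the folds above leave
	// leastFrequentInterval at 1h, i.e. the largest of the configured intervals
	// and rsvpRefreshInterval.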

	scheduledWork := &scheduledWorkTimes{
		leastFrequentInterval:       leastFrequentInterval,
		nextRefresh:                 now.Add(rsvpRefreshInterval),
		nextBackoff:                 now.Add(rf.conf.backoff),
		nextOldCandidateCheck:       now.Add(rf.conf.maxCandidateAge),
		nextAllowedCallToPeerSource: now.Add(-time.Second), // allow immediately
	}

	workTimer := rf.conf.clock.InstantTimer(rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter))
	defer workTimer.Stop()

	go rf.cleanupDisconnectedPeers(ctx)

	// update addrs on starting the relay finder.
	rf.updateAddrs()
	for {
		select {
		case <-rf.candidateFound:
			rf.notifyMaybeConnectToRelay()
		case <-bootDelayTimer.Ch():
			rf.notifyMaybeConnectToRelay()
		case <-rf.relayReservationUpdated:
			rf.updateAddrs()
		case now := <-workTimer.Ch():
			// Note: `now` is not guaranteed to be the current time. It's the time
			// that the timer was fired. This is okay because we'll schedule
			// future work at a specific time.
			nextTime := rf.runScheduledWork(ctx, now, scheduledWork, peerSourceRateLimiter)
			workTimer.Reset(nextTime)
		case <-rf.triggerRunScheduledWork:
			// Ignore the next time because we aren't scheduling any future work here
			_ = rf.runScheduledWork(ctx, rf.conf.clock.Now(), scheduledWork, peerSourceRateLimiter)
		case <-ctx.Done():
			return
		}
	}
}

// updateAddrs recomputes the circuit addresses and, if they changed, emits an
// event.EvtAutoRelayAddrsUpdated with the new set.
func (rf *relayFinder) updateAddrs() {
	oldAddrs := rf.circuitAddrs
	rf.circuitAddrs = rf.getCircuitAddrs()

	if areSortedAddrsDifferent(rf.circuitAddrs, oldAddrs) {
		log.Debug("relay addresses updated", rf.circuitAddrs)
		rf.metricsTracer.RelayAddressUpdated()
		rf.metricsTracer.RelayAddressCount(len(rf.circuitAddrs))
		if err := rf.emitter.Emit(event.EvtAutoRelayAddrsUpdated{RelayAddrs: slices.Clone(rf.circuitAddrs)}); err != nil {
			log.Error("failed to emit event.EvtAutoRelayAddrsUpdated", rf.circuitAddrs, err)
		}
	}
}

// getCircuitAddrs returns the p2p-circuit addrs for the host.
// The returned addresses are of the form <relay's-addr>/p2p/<relay's-id>/p2p-circuit.
func (rf *relayFinder) getCircuitAddrs() []ma.Multiaddr {
	rf.relayMx.Lock()
	defer rf.relayMx.Unlock()

	raddrs := make([]ma.Multiaddr, 0, 4*len(rf.relays)+4)
	for p := range rf.relays {
		addrs := cleanupAddressSet(rf.host.Peerstore().Addrs(p))
		circuit := ma.StringCast(fmt.Sprintf("/p2p/%s/p2p-circuit", p))
		for _, addr := range addrs {
			pub := addr.Encapsulate(circuit)
			raddrs = append(raddrs, pub)
		}
	}

	// Sort the addresses. We depend on this order for checking diffs to send address update events.
	slices.SortStableFunc(raddrs, func(a, b ma.Multiaddr) int { return bytes.Compare(a.Bytes(), b.Bytes()) })
	if len(raddrs) > maxRelayAddrs {
		raddrs = raddrs[:maxRelayAddrs]
	}
	return raddrs
}

// runScheduledWork runs whichever scheduled tasks are due at `now` and returns the time
// at which the next scheduled work should run.
func (rf *relayFinder) runScheduledWork(ctx context.Context, now time.Time, scheduledWork *scheduledWorkTimes, peerSourceRateLimiter chan<- struct{}) time.Time {
	nextTime := now.Add(scheduledWork.leastFrequentInterval)

	if now.After(scheduledWork.nextRefresh) {
		scheduledWork.nextRefresh = now.Add(rsvpRefreshInterval)
		if rf.refreshReservations(ctx, now) {
			rf.notifyRelayReservationUpdated()
		}
	}

	if now.After(scheduledWork.nextBackoff) {
		scheduledWork.nextBackoff = rf.clearBackoff(now)
	}

	if now.After(scheduledWork.nextOldCandidateCheck) {
		scheduledWork.nextOldCandidateCheck = rf.clearOldCandidates(now)
	}

	if now.After(scheduledWork.nextAllowedCallToPeerSource) {
		select {
		case peerSourceRateLimiter <- struct{}{}:
			scheduledWork.nextAllowedCallToPeerSource = now.Add(rf.conf.minInterval)
			if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) {
				nextTime = scheduledWork.nextAllowedCallToPeerSource
			}
		default:
		}
	} else {
		// We still need to schedule this work if it's sooner than nextTime
		if scheduledWork.nextAllowedCallToPeerSource.Before(nextTime) {
			nextTime = scheduledWork.nextAllowedCallToPeerSource
		}
	}

	// Find the next time we need to run scheduled work.
	if scheduledWork.nextRefresh.Before(nextTime) {
		nextTime = scheduledWork.nextRefresh
	}
	if scheduledWork.nextBackoff.Before(nextTime) {
		nextTime = scheduledWork.nextBackoff
	}
	if scheduledWork.nextOldCandidateCheck.Before(nextTime) {
		nextTime = scheduledWork.nextOldCandidateCheck
	}
	if nextTime.Equal(now) {
		// Only happens in CI with a mock clock
		nextTime = nextTime.Add(1) // avoids an infinite loop
	}

	rf.metricsTracer.ScheduledWorkUpdated(scheduledWork)

	return nextTime
}

// clearOldCandidates clears old candidates from the map. Returns the next time
// to run this function.
func (rf *relayFinder) clearOldCandidates(now time.Time) time.Time {
	// If we don't have any candidates, we should run this again in rf.conf.maxCandidateAge.
	nextTime := now.Add(rf.conf.maxCandidateAge)

	var deleted bool
	rf.candidateMx.Lock()
	defer rf.candidateMx.Unlock()
	for id, cand := range rf.candidates {
		expiry := cand.added.Add(rf.conf.maxCandidateAge)
		if expiry.After(now) {
			if expiry.Before(nextTime) {
				nextTime = expiry
			}
		} else {
			log.Debugw("deleting candidate due to age", "id", id)
			deleted = true
			rf.removeCandidate(id)
		}
	}
	if deleted {
		rf.notifyMaybeNeedNewCandidates()
	}

	return nextTime
}

// clearBackoff clears old backoff entries from the map. Returns the next time
// to run this function.
func (rf *relayFinder) clearBackoff(now time.Time) time.Time {
	nextTime := now.Add(rf.conf.backoff)

	rf.candidateMx.Lock()
	defer rf.candidateMx.Unlock()
	for id, t := range rf.backoff {
		expiry := t.Add(rf.conf.backoff)
		if expiry.After(now) {
			if expiry.Before(nextTime) {
				nextTime = expiry
			}
		} else {
			log.Debugw("removing backoff for node", "id", id)
			delete(rf.backoff, id)
		}
	}

	return nextTime
}

// findNodes accepts nodes from the channel and tests if they support relaying.
// It is run on both public and private nodes.
// It garbage collects old entries, so that the candidate set doesn't overflow.
// This makes sure that as soon as we need to find relay candidates, we have them available.
// peerSourceRateLimiter is used to limit how often we call the peer source.
func (rf *relayFinder) findNodes(ctx context.Context, peerSourceRateLimiter <-chan struct{}) {
	var peerChan <-chan peer.AddrInfo
	var wg sync.WaitGroup
	for {
		rf.candidateMx.Lock()
		numCandidates := len(rf.candidates)
		rf.candidateMx.Unlock()

		if peerChan == nil && numCandidates < rf.conf.minCandidates {
			rf.metricsTracer.CandidateLoopState(peerSourceRateLimited)

			select {
			case <-peerSourceRateLimiter:
				peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
				select {
				case rf.triggerRunScheduledWork <- struct{}{}:
				default:
				}
			case <-ctx.Done():
				return
			}
		}

		if peerChan == nil {
			rf.metricsTracer.CandidateLoopState(waitingForTrigger)
		} else {
			rf.metricsTracer.CandidateLoopState(waitingOnPeerChan)
		}

		select {
		case <-rf.maybeRequestNewCandidates:
			continue
		case pi, ok := <-peerChan:
			if !ok {
				wg.Wait()
				peerChan = nil
				continue
			}
			log.Debugw("found node", "id", pi.ID)
			rf.candidateMx.Lock()
			numCandidates := len(rf.candidates)
			backoffStart, isOnBackoff := rf.backoff[pi.ID]
			rf.candidateMx.Unlock()
			if isOnBackoff {
				log.Debugw("skipping node that we recently failed to obtain a reservation with", "id", pi.ID, "last attempt", rf.conf.clock.Since(backoffStart))
				continue
			}
			if numCandidates >= rf.conf.maxCandidates {
				log.Debugw("skipping node. Already have enough candidates", "id", pi.ID, "num", numCandidates, "max", rf.conf.maxCandidates)
				continue
			}
			rf.refCount.Add(1)
			wg.Add(1)
			go func() {
				defer rf.refCount.Done()
				defer wg.Done()
				if added := rf.handleNewNode(ctx, pi); added {
					rf.notifyNewCandidate()
				}
			}()
		case <-ctx.Done():
			rf.metricsTracer.CandidateLoopState(stopped)
			return
		}
	}
}
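
// Once the peer source channel is drained and closed, findNodes resets peerChan to nil;
// a new call to the peer source is only made after peerSourceRateLimiter fires again,
// and only while the candidate count is below minCandidates.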

func (rf *relayFinder) notifyMaybeConnectToRelay() {
	select {
	case rf.maybeConnectToRelayTrigger <- struct{}{}:
	default:
	}
}

func (rf *relayFinder) notifyMaybeNeedNewCandidates() {
	select {
	case rf.maybeRequestNewCandidates <- struct{}{}:
	default:
	}
}

func (rf *relayFinder) notifyNewCandidate() {
	select {
	case rf.candidateFound <- struct{}{}:
	default:
	}
}

func (rf *relayFinder) notifyRelayReservationUpdated() {
	select {
	case rf.relayReservationUpdated <- struct{}{}:
	default:
	}
}
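
// The notify* helpers above share one pattern: a non-blocking send on a channel with
// capacity 1, so notifications that arrive while the consumer is busy coalesce into a
// single pending signal instead of blocking the notifier.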

// handleNewNode tests if a peer supports circuit v2.
// If it does, the peer is added to the candidates map.
// This method is only run on private nodes.
// Note that just supporting the protocol doesn't guarantee that we can also obtain a reservation.
func (rf *relayFinder) handleNewNode(ctx context.Context, pi peer.AddrInfo) (added bool) {
	rf.relayMx.Lock()
	relayInUse := rf.usingRelay(pi.ID)
	rf.relayMx.Unlock()
	if relayInUse {
		return false
	}

	ctx, cancel := context.WithTimeout(ctx, 20*time.Second)
	defer cancel()
	supportsV2, err := rf.tryNode(ctx, pi)
	if err != nil {
		log.Debugf("node %s not accepted as a candidate: %s", pi.ID, err)
		if err == errProtocolNotSupported {
			rf.metricsTracer.CandidateChecked(false)
		}
		return false
	}
	rf.metricsTracer.CandidateChecked(true)

	rf.candidateMx.Lock()
	if len(rf.candidates) > rf.conf.maxCandidates {
		rf.candidateMx.Unlock()
		return false
	}
	log.Debugw("node supports relay protocol", "peer", pi.ID, "supports circuit v2", supportsV2)
	rf.addCandidate(&candidate{
		added:           rf.conf.clock.Now(),
		ai:              pi,
		supportsRelayV2: supportsV2,
	})
	rf.candidateMx.Unlock()
	return true
}

var errProtocolNotSupported = errors.New("doesn't speak circuit v2")

// tryNode checks if a peer actually supports circuit v2.
// It does not modify any internal state.
func (rf *relayFinder) tryNode(ctx context.Context, pi peer.AddrInfo) (supportsRelayV2 bool, err error) {
	if err := rf.host.Connect(ctx, pi); err != nil {
		return false, fmt.Errorf("error connecting to relay %s: %w", pi.ID, err)
	}

	conns := rf.host.Network().ConnsToPeer(pi.ID)
	for _, conn := range conns {
		if isRelayAddr(conn.RemoteMultiaddr()) {
			return false, errors.New("not a public node")
		}
	}

	// wait for identify to complete in at least one conn so that we can check the supported protocols
	idHost, ok := rf.host.(interface{ IDService() identify.IDService })
	if !ok {
		// if we don't have identify, assume the peer supports relay.
		return true, nil
	}
	ready := make(chan struct{}, 1)
	for _, conn := range conns {
		go func(conn network.Conn) {
			select {
			case <-idHost.IDService().IdentifyWait(conn):
				select {
				case ready <- struct{}{}:
				default:
				}
			case <-ctx.Done():
			}
		}(conn)
	}

	select {
	case <-ready:
	case <-ctx.Done():
		return false, ctx.Err()
	}

	protos, err := rf.host.Peerstore().SupportsProtocols(pi.ID, protoIDv2)
	if err != nil {
		return false, fmt.Errorf("error checking relay protocol support for peer %s: %w", pi.ID, err)
	}
	if len(protos) == 0 {
		return false, errProtocolNotSupported
	}
	return true, nil
}

// When a new node that could be a relay is found, we receive a notification on the maybeConnectToRelayTrigger chan.
// This function makes sure that we only run one instance of maybeConnectToRelay at once, and buffers
// exactly one more trigger event to run maybeConnectToRelay.
func (rf *relayFinder) handleNewCandidates(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-rf.maybeConnectToRelayTrigger:
			rf.maybeConnectToRelay(ctx)
		}
	}
}

// maybeConnectToRelay attempts to obtain reservations with the selected candidates until
// we have the desired number of relays.
func (rf *relayFinder) maybeConnectToRelay(ctx context.Context) {
	rf.relayMx.Lock()
	numRelays := len(rf.relays)
	rf.relayMx.Unlock()
	// We're already connected to our desired number of relays. Nothing to do here.
	if numRelays == rf.conf.desiredRelays {
		return
	}

	rf.candidateMx.Lock()
	if len(rf.relays) == 0 && len(rf.candidates) < rf.conf.minCandidates && rf.conf.clock.Since(rf.bootTime) < rf.conf.bootDelay {
		// During the startup phase, we don't want to connect to the first candidate that we find.
		// Instead, we wait until we've found at least minCandidates, and then select the best of those.
		// However, if that takes too long (longer than bootDelay), we still go ahead.
		rf.candidateMx.Unlock()
		return
	}
	if len(rf.candidates) == 0 {
		rf.candidateMx.Unlock()
		return
	}
	candidates := rf.selectCandidates()
	rf.candidateMx.Unlock()

	// We now iterate over the candidates, attempting (sequentially) to get reservations with them, until
	// we reach the desired number of relays.
	for _, cand := range candidates {
		id := cand.ai.ID
		rf.relayMx.Lock()
		usingRelay := rf.usingRelay(id)
		rf.relayMx.Unlock()
		if usingRelay {
			rf.candidateMx.Lock()
			rf.removeCandidate(id)
			rf.candidateMx.Unlock()
			rf.notifyMaybeNeedNewCandidates()
			continue
		}
		rsvp, err := rf.connectToRelay(ctx, cand)
		if err != nil {
			log.Debugw("failed to connect to relay", "peer", id, "error", err)
			rf.notifyMaybeNeedNewCandidates()
			rf.metricsTracer.ReservationRequestFinished(false, err)
			continue
		}
		log.Debugw("adding new relay", "id", id)
		rf.relayMx.Lock()
		rf.relays[id] = rsvp
		numRelays := len(rf.relays)
		rf.relayMx.Unlock()
		rf.notifyMaybeNeedNewCandidates()

		rf.host.ConnManager().Protect(id, autorelayTag) // protect the connection

		rf.notifyRelayReservationUpdated()

		rf.metricsTracer.ReservationRequestFinished(false, nil)

		if numRelays >= rf.conf.desiredRelays {
			break
		}
	}
}

// connectToRelay connects to the candidate (if we aren't connected already) and tries to
// obtain a relay reservation with it.
func (rf *relayFinder) connectToRelay(ctx context.Context, cand *candidate) (*circuitv2.Reservation, error) {
	id := cand.ai.ID

	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var rsvp *circuitv2.Reservation

	// make sure we're still connected.
	if rf.host.Network().Connectedness(id) != network.Connected {
		if err := rf.host.Connect(ctx, cand.ai); err != nil {
			rf.candidateMx.Lock()
			rf.removeCandidate(cand.ai.ID)
			rf.candidateMx.Unlock()
			return nil, fmt.Errorf("failed to connect: %w", err)
		}
	}

	rf.candidateMx.Lock()
	rf.backoff[id] = rf.conf.clock.Now()
	rf.candidateMx.Unlock()
	var err error
	if cand.supportsRelayV2 {
		rsvp, err = circuitv2.Reserve(ctx, rf.host, cand.ai)
		if err != nil {
			err = fmt.Errorf("failed to reserve slot: %w", err)
		}
	}
	rf.candidateMx.Lock()
	rf.removeCandidate(id)
	rf.candidateMx.Unlock()
	return rsvp, err
}
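
// connectToRelay records a backoff entry before attempting the reservation, so a peer
// that fails or times out is not retried immediately when the peer source returns it
// again; clearBackoff removes the entry once conf.backoff has elapsed.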

// refreshReservations refreshes all reservations that are about to expire.
// It returns true if we dropped any relay because its refresh failed.
func (rf *relayFinder) refreshReservations(ctx context.Context, now time.Time) bool {
	rf.relayMx.Lock()

	// find reservations about to expire and refresh them in parallel
	g := new(errgroup.Group)
	for p, rsvp := range rf.relays {
		if now.Add(rsvpExpirationSlack).Before(rsvp.Expiration) {
			continue
		}

		p := p
		g.Go(func() error {
			err := rf.refreshRelayReservation(ctx, p)
			rf.metricsTracer.ReservationRequestFinished(true, err)
			return err
		})
	}
	rf.relayMx.Unlock()

	err := g.Wait()
	return err != nil
}
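
// Reservations are refreshed in parallel via the errgroup. relayMx is released before
// g.Wait so that refreshRelayReservation can re-acquire it to update or delete entries.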

func (rf *relayFinder) refreshRelayReservation(ctx context.Context, p peer.ID) error {
	rsvp, err := circuitv2.Reserve(ctx, rf.host, peer.AddrInfo{ID: p})

	rf.relayMx.Lock()
	if err != nil {
		log.Debugw("failed to refresh relay slot reservation", "relay", p, "error", err)
		_, exists := rf.relays[p]
		delete(rf.relays, p)
		// unprotect the connection
		rf.host.ConnManager().Unprotect(p, autorelayTag)
		rf.relayMx.Unlock()
		if exists {
			rf.metricsTracer.ReservationEnded(1)
		}
		return err
	}

	log.Debugw("refreshed relay slot reservation", "relay", p)
	rf.relays[p] = rsvp
	rf.relayMx.Unlock()
	return nil
}

// usingRelay returns whether we're currently using the given relay.
// The caller must hold relayMx.
func (rf *relayFinder) usingRelay(p peer.ID) bool {
	_, ok := rf.relays[p]
	return ok
}

// addCandidate adds a candidate to the candidates set. Assumes the caller holds candidateMx.
func (rf *relayFinder) addCandidate(cand *candidate) {
	_, exists := rf.candidates[cand.ai.ID]
	rf.candidates[cand.ai.ID] = cand
	if !exists {
		rf.metricsTracer.CandidateAdded(1)
	}
}

// removeCandidate removes a candidate from the candidates set. Assumes the caller holds candidateMx.
func (rf *relayFinder) removeCandidate(id peer.ID) {
	_, exists := rf.candidates[id]
	if exists {
		delete(rf.candidates, id)
		rf.metricsTracer.CandidateRemoved(1)
	}
}

// selectCandidates returns an ordered slice of relay candidates.
// Callers should attempt to obtain reservations with the candidates in this order.
func (rf *relayFinder) selectCandidates() []*candidate {
	now := rf.conf.clock.Now()
	candidates := make([]*candidate, 0, len(rf.candidates))
	for _, cand := range rf.candidates {
		if cand.added.Add(rf.conf.maxCandidateAge).After(now) {
			candidates = append(candidates, cand)
		}
	}

	// TODO: better relay selection strategy; this just selects random relays,
	// but we should probably use ping latency as the selection metric
	rand.Shuffle(len(candidates), func(i, j int) {
		candidates[i], candidates[j] = candidates[j], candidates[i]
	})
	return candidates
}

// Start launches the relay finder's background worker. It returns errAlreadyRunning if
// the relay finder is already running.
func (rf *relayFinder) Start() error {
	rf.ctxCancelMx.Lock()
	defer rf.ctxCancelMx.Unlock()
	if rf.ctxCancel != nil {
		return errAlreadyRunning
	}
	log.Debug("starting relay finder")

	rf.initMetrics()

	ctx, cancel := context.WithCancel(context.Background())
	rf.ctxCancel = cancel
	rf.refCount.Add(1)
	go func() {
		defer rf.refCount.Done()
		rf.background(ctx)
	}()
	return nil
}

// Stop cancels the background worker and waits for all spawned goroutines to exit.
func (rf *relayFinder) Stop() error {
	rf.ctxCancelMx.Lock()
	defer rf.ctxCancelMx.Unlock()
	log.Debug("stopping relay finder")
	if rf.ctxCancel != nil {
		rf.ctxCancel()
	}
	rf.refCount.Wait()
	rf.ctxCancel = nil

	rf.resetMetrics()
	return nil
}

func (rf *relayFinder) initMetrics() {
	rf.metricsTracer.DesiredReservations(rf.conf.desiredRelays)

	rf.relayMx.Lock()
	rf.metricsTracer.ReservationOpened(len(rf.relays))
	rf.relayMx.Unlock()

	rf.candidateMx.Lock()
	rf.metricsTracer.CandidateAdded(len(rf.candidates))
	rf.candidateMx.Unlock()
}

func (rf *relayFinder) resetMetrics() {
	rf.relayMx.Lock()
	rf.metricsTracer.ReservationEnded(len(rf.relays))
	rf.relayMx.Unlock()

	rf.candidateMx.Lock()
	rf.metricsTracer.CandidateRemoved(len(rf.candidates))
	rf.candidateMx.Unlock()

	rf.metricsTracer.RelayAddressCount(0)
	rf.metricsTracer.ScheduledWorkUpdated(&scheduledWorkTimes{})
}

// areSortedAddrsDifferent reports whether the two sorted address slices differ.
func areSortedAddrsDifferent(a, b []ma.Multiaddr) bool {
	if len(a) != len(b) {
		return true
	}
	for i, addr := range a {
		if !addr.Equal(b[i]) {
			return true
		}
	}
	return false
}