package pubsub

import (
	
	
	

	
	
	
	discimpl 
)

var (
	// Polling parameters for the discovery pipeline. These are package-level
	// variables (not constants), so they can be reassigned.

	// DiscoveryPollInitialDelay is how long the discovery system waits after it first starts before polling.
	DiscoveryPollInitialDelay = 0 * time.Millisecond
	// DiscoveryPollInterval is approximately how long the discovery system waits between checks for whether
	// more peers are needed for any topic.
	DiscoveryPollInterval = 1 * time.Second
)

// discoveryAdvertiseRetryInterval is the interval at which to retry
// advertisements when they fail. The advertise loop substitutes it when an
// advertise attempt returns an error together with a zero duration.
const discoveryAdvertiseRetryInterval = 2 * time.Minute

// DiscoverOpt is a functional option that configures a discoverOptions value
// and may return an error. The options defined in this file always return nil.
type DiscoverOpt func(*discoverOptions) error

// discoverOptions collects the configurable settings of the discovery pipeline.
//
// connFactory builds the BackoffConnector that Start stores as the pipeline's
// connector. opts holds the libp2p discovery options set by WithDiscoveryOpts.
type discoverOptions struct {
	connFactory BackoffConnectorFactory
	opts        []discovery.Option
}

// defaultDiscoverOptions returns a discoverOptions whose connFactory builds a
// BackoffConnector driven by an exponential backoff strategy with full jitter.
// Only connFactory is populated in the returned value; opts is left at its
// zero value.
func defaultDiscoverOptions() *discoverOptions {
	// The random source is created once per call to defaultDiscoverOptions,
	// outside the factory closure below.
	 := rand.NewSource(rand.Int63())
	// NOTE(review): 10s / 1h are presumably the minimum and maximum backoff,
	// 100 a cache size and 2m a dial timeout — confirm against the
	// discimpl.NewExponentialBackoff / NewBackoffConnector signatures.
	,  := time.Second*10, time.Hour
	 := 100
	 := time.Minute * 2
	 := &discoverOptions{
		connFactory: func( host.Host) (*discimpl.BackoffConnector, error) {
			// A fresh backoff strategy is built on every factory invocation.
			 := discimpl.NewExponentialBackoff(, , discimpl.FullJitter, time.Second, 5.0, 0, rand.New())
			return discimpl.NewBackoffConnector(, , , )
		},
	}

	return 
}

// discover represents the discovery pipeline.
// The discovery pipeline handles advertising and discovery of peers.
//
// Start reads discovery and options and initializes every other field, so the
// maps and channels below are nil until Start has run.
type discover struct {
	// p is the pubsub instance the pipeline is attached to (set by Start).
	p *PubSub

	// discovery assists in discovering and advertising peers for a topic.
	// The methods in this file that check it return early when it is nil.
	discovery discovery.Discovery

	// advertising tracks which topics are being advertised; the stored
	// CancelFunc is invoked by StopAdvertise before the entry is removed.
	advertising map[string]context.CancelFunc

	// discoverQ handles continuing peer discovery; Start creates it with a
	// buffer of 32 requests.
	discoverQ chan *discoverReq

	// ongoing tracks ongoing discovery requests, keyed by topic
	ongoing map[string]struct{}

	// done handles completion of a discovery request: the value received from
	// it is used as the key to delete from ongoing. Start creates it unbuffered.
	done chan string

	// connector handles connecting to new peers found via discovery
	connector *discimpl.BackoffConnector

	// options are the set of options to be used to complete struct construction in Start
	options *discoverOptions
}

// MinTopicSize returns a function that checks if a router is ready for publishing based on the topic size.
// The router ultimately decides whether it is ready or not; the given size is just a suggestion. Note
// that the topic size does not include the router in the count.
//
// The returned RouterReady delegates to the router's EnoughPeers method and
// always reports a nil error.
func ( int) RouterReady {
	return func( PubSubRouter,  string) (bool, error) {
		return .EnoughPeers(, ), nil
	}
}

// Start attaches the discovery pipeline to a pubsub instance, initializes discovery and starts event loop.
//
// It is a no-op that returns nil when no discovery service is configured or
// the pubsub instance is nil. Otherwise it allocates the pipeline's maps and
// channels, builds the connector through options.connFactory (returning that
// call's error unchanged on failure) and launches the discoverLoop and
// pollTimer goroutines.
//
// NOTE(review): the variadic DiscoverOpt arguments do not appear to be applied
// anywhere in this body — confirm they are consumed elsewhere.
func ( *discover) ( *PubSub,  ...DiscoverOpt) error {
	if .discovery == nil ||  == nil {
		return nil
	}

	.p = 
	.advertising = make(map[string]context.CancelFunc)
	// Buffered so that up to 32 requests can be queued without blocking the sender.
	.discoverQ = make(chan *discoverReq, 32)
	.ongoing = make(map[string]struct{})
	.done = make(chan string)

	// options is dereferenced without a nil check, so it must be set beforehand.
	,  := .options.connFactory(.host)
	if  != nil {
		return 
	}
	.connector = 

	// Both goroutines return once the pubsub context is done.
	go .discoverLoop()
	go .pollTimer()

	return nil
}

// pollTimer periodically schedules requestDiscovery on the pubsub eval
// channel: once after DiscoveryPollInitialDelay has elapsed, and then on every
// tick of a DiscoveryPollInterval ticker. It returns as soon as the pubsub
// context is done.
func ( *discover) () {
	// Initial delay before the first poll.
	select {
	case <-time.After(DiscoveryPollInitialDelay):
	case <-.p.ctx.Done():
		return
	}

	// First poll: hand requestDiscovery to the eval channel.
	select {
	case .p.eval <- .requestDiscovery:
	case <-.p.ctx.Done():
		return
	}

	 := time.NewTicker(DiscoveryPollInterval)
	defer .Stop()

	for {
		select {
		case <-.C:
			// The send itself is also guarded by the context so that shutdown
			// is not held up if nothing is receiving from eval.
			select {
			case .p.eval <- .requestDiscovery:
			case <-.p.ctx.Done():
				return
			}
		case <-.p.ctx.Done():
			return
		}
	}
}

// requestDiscovery walks the pubsub instance's myTopics and enqueues a
// discovery request for every topic for which the router's EnoughPeers (called
// with a suggested size of 0) reports false. pollTimer submits this method to
// the pubsub eval channel.
func ( *discover) () {
	for  := range .p.myTopics {
		if !.p.rt.EnoughPeers(, 0) {
			// Plain (blocking) send: the caller stalls here if discoverQ is full.
			// The request carries no discovery options, only a topic and a
			// 1-slot done channel.
			.discoverQ <- &discoverReq{topic: , done: make(chan struct{}, 1)}
		}
	}
}

// discoverLoop is the pipeline's event loop. It serializes access to the
// ongoing map by handling three events until the pubsub context is done:
//
//   - a request arriving on discoverQ, which starts a discovery goroutine for
//     its topic unless one is already recorded in ongoing;
//   - a topic arriving on done, which removes that topic from ongoing;
//   - cancellation of the pubsub context, which ends the loop.
func ( *discover) () {
	for {
		select {
		case  := <-.discoverQ:
			 := .topic

			// A discovery for this topic is already in flight: acknowledge the
			// request right away instead of starting a second one.
			if ,  := .ongoing[];  {
				.done <- struct{}{}
				continue
			}

			.ongoing[] = struct{}{}

			go func() {
				.handleDiscovery(.p.ctx, , .opts)
				// Report completion back to the loop so the topic is removed
				// from ongoing; give up if the pubsub context ends first.
				select {
				case .done <- :
				case <-.p.ctx.Done():
				}
				// Signal the requester. Every discoverReq built in this file
				// gives its done channel a buffer of 1.
				.done <- struct{}{}
			}()
		case  := <-.done:
			delete(.ongoing, )
		case <-.p.ctx.Done():
			return
		}
	}
}

// Advertise advertises this node's interest in a topic to a discovery service. Advertise is not thread-safe.
//
// It does nothing when no discovery service is configured or when the topic
// already has an entry in the advertising map. Otherwise it records a cancel
// function for the topic and starts a goroutine that advertises immediately
// and then re-advertises each time a timer fires, until its context (derived
// from the pubsub context) is cancelled. The timer is armed with the duration
// returned by the discovery service; when a call fails and that duration is
// zero, discoveryAdvertiseRetryInterval is used instead. Failures are logged
// as warnings and never returned to the caller.
func ( *discover) ( string) {
	if .discovery == nil {
		return
	}

	,  := context.WithCancel(.p.ctx)

	// Already advertising this topic: nothing new is started.
	// NOTE(review): the cancel invoked here is presumably the one created just
	// above, releasing the unused context — confirm.
	if ,  := .advertising[];  {
		()
		return
	}
	.advertising[] = 

	go func() {
		,  := .discovery.Advertise(, )
		if  != nil {
			log.Warnf("bootstrap: error providing rendezvous for %s: %s", , .Error())
			// Without a delay from the service, fall back to the fixed retry interval.
			if  == 0 {
				 = discoveryAdvertiseRetryInterval
			}
		}

		 := time.NewTimer()
		defer .Stop()

		for .Err() == nil {
			select {
			case <-.C:
				,  = .discovery.Advertise(, )
				if  != nil {
					log.Warnf("bootstrap: error providing rendezvous for %s: %s", , .Error())
					if  == 0 {
						 = discoveryAdvertiseRetryInterval
					}
				}
				// The timer has just fired and its channel was drained by the
				// receive above, so Reset can be called directly.
				.Reset()
			case <-.Done():
				return
			}
		}
	}()
}

// StopAdvertise stops advertising this node's interest in a topic. StopAdvertise is not thread-safe.
//
// It does nothing when no discovery service is configured or when the topic
// has no entry in the advertising map. Otherwise it invokes a cancel function
// and deletes the topic's entry.
func ( *discover) ( string) {
	if .discovery == nil {
		return
	}

	// NOTE(review): the function invoked is presumably the CancelFunc stored
	// for this topic — confirm.
	if ,  := .advertising[];  {
		()
		delete(.advertising, )
	}
}

// Discover searches for additional peers interested in a given topic.
//
// It only enqueues a request on discoverQ and returns; it does not wait for
// the discovery to complete. It does nothing when no discovery service is
// configured.
func ( *discover) ( string,  ...discovery.Option) {
	if .discovery == nil {
		return
	}

	// Plain (blocking) send: the caller stalls here if discoverQ is full.
	.discoverQ <- &discoverReq{, , make(chan struct{}, 1)}
}

// Bootstrap attempts to bootstrap to a given topic. Returns true if bootstrapped successfully, false otherwise.
//
// When no discovery service is configured it returns true immediately.
// Otherwise it loops over four steps:
//
//  1. evaluate the RouterReady check on the pubsub eval channel and return
//     true if it reports ready;
//  2. enqueue a discovery request for the topic;
//  3. wait for that request's done signal;
//  4. wait 100 milliseconds before checking again.
//
// Every blocking step also watches the pubsub context and the caller's
// context, and Bootstrap returns false as soon as either is done.
func ( *discover) ( context.Context,  string,  RouterReady,  ...discovery.Option) bool {
	if .discovery == nil {
		return true
	}

	// Create the timer in a stopped, drained state so the loop below can arm
	// it with Reset; the one-hour duration is only a placeholder.
	 := time.NewTimer(time.Hour)
	if !.Stop() {
		<-.C
	}
	defer .Stop()

	for {
		// Check if ready for publishing
		// The result channel has room for one value, so the closure's send
		// does not block the goroutine that runs it.
		 := make(chan bool, 1)
		select {
		case .p.eval <- func() {
			// The second value returned by the readiness check (its error) is discarded.
			,  := (.p.rt, )
			 <- 
		}:
			if <- {
				return true
			}
		case <-.p.ctx.Done():
			return false
		case <-.Done():
			return false
		}

		// If not ready discover more peers
		 := &discoverReq{, , make(chan struct{}, 1)}
		select {
		case .discoverQ <- :
		case <-.p.ctx.Done():
			return false
		case <-.Done():
			return false
		}

		// Wait until the discovery request has been signalled as done.
		select {
		case <-.done:
		case <-.p.ctx.Done():
			return false
		case <-.Done():
			return false
		}

		// Pause briefly before re-checking readiness.
		.Reset(time.Millisecond * 100)
		select {
		case <-.C:
		case <-.p.ctx.Done():
			return false
		case <-.Done():
			return false
		}
	}
}

// handleDiscovery performs one discovery round for a topic: it asks the
// discovery service for peers under a context limited to 10 seconds and hands
// the result of FindPeers to the connector. If FindPeers fails, the error is
// logged at debug level and nothing is connected.
func ( *discover) ( context.Context,  string,  []discovery.Option) {
	,  := context.WithTimeout(, time.Second*10)
	defer ()

	,  := .discovery.FindPeers(, , ...)
	if  != nil {
		log.Debugf("error finding peers for topic %s: %v", , )
		return
	}

	.connector.Connect(, )
}

// discoverReq is a single discovery request queued on discover.discoverQ.
//
// topic is the topic to find peers for, opts are the discovery options passed
// on to handleDiscovery, and done receives one empty value from discoverLoop
// when the request has been dealt with. The struct is also built with
// positional literals in this file, so the field order must not change.
type discoverReq struct {
	topic string
	opts  []discovery.Option
	done  chan struct{}
}

// pubSubDiscovery wraps a discovery.Discovery. Its Advertise and FindPeers
// methods prefix the namespace with "floodsub:" and append opts to the options
// of each call before delegating to the embedded Discovery.
type pubSubDiscovery struct {
	discovery.Discovery
	opts []discovery.Option
}

// Advertise delegates to the embedded Discovery's Advertise, prefixing the
// namespace with "floodsub:" and appending the wrapper's stored options to the
// options given for this call. The delegate's results are returned unchanged.
func ( *pubSubDiscovery) ( context.Context,  string,  ...discovery.Option) (time.Duration, error) {
	return .Discovery.Advertise(, "floodsub:"+, append(, .opts...)...)
}

// FindPeers delegates to the embedded Discovery's FindPeers, prefixing the
// namespace with "floodsub:" and appending the wrapper's stored options to the
// options given for this call. The delegate's results are returned unchanged.
func ( *pubSubDiscovery) ( context.Context,  string,  ...discovery.Option) (<-chan peer.AddrInfo, error) {
	return .Discovery.FindPeers(, "floodsub:"+, append(, .opts...)...)
}

// WithDiscoveryOpts passes libp2p Discovery options into the PubSub discovery subsystem.
//
// The returned DiscoverOpt assigns the opts field of discoverOptions,
// replacing any previously set value rather than appending to it, and always
// returns nil.
func ( ...discovery.Option) DiscoverOpt {
	return func( *discoverOptions) error {
		.opts = 
		return nil
	}
}

// BackoffConnectorFactory creates a BackoffConnector that is attached to a given host.
// Start calls the configured factory once and returns its error, if any.
type BackoffConnectorFactory func(host host.Host) (*discimpl.BackoffConnector, error)

// WithDiscoverConnector adds a custom connector that deals with how the discovery subsystem connects to peers.
//
// The returned DiscoverOpt assigns the connFactory field of discoverOptions,
// replacing the factory that was there before, and always returns nil.
func ( BackoffConnectorFactory) DiscoverOpt {
	return func( *discoverOptions) error {
		.connFactory = 
		return nil
	}
}