package pubsub

import (
	
	
	
	
	

	pb 
	

	
)

// ErrTopicClosed is returned if a Topic is utilized after it has been closed
var ErrTopicClosed = errors.New("this Topic is closed, try opening a new one")

// ErrNilSignKey is returned if a nil private key was provided
var ErrNilSignKey = errors.New("nil sign key")

// ErrEmptyPeerID is returned if an empty peer ID was provided
var ErrEmptyPeerID = errors.New("empty peer ID")

// Topic is the handle for a pubsub topic
type Topic struct {
	p     *PubSub
	topic string

	evtHandlerMux sync.RWMutex
	evtHandlers   map[*TopicEventHandler]struct{}

	mux    sync.RWMutex
	closed bool
}

// String returns the topic associated with t
func ( *Topic) () string {
	return .topic
}

// SetScoreParams sets the topic score parameters if the pubsub router supports peer
// scoring
func ( *Topic) ( *TopicScoreParams) error {
	 := .validate()
	if  != nil {
		return fmt.Errorf("invalid topic score parameters: %w", )
	}

	.mux.Lock()
	defer .mux.Unlock()

	if .closed {
		return ErrTopicClosed
	}

	 := make(chan error, 1)
	 := func() {
		,  := .p.rt.(*GossipSubRouter)
		if ! {
			 <- fmt.Errorf("pubsub router is not gossipsub")
			return
		}

		if .score == nil {
			 <- fmt.Errorf("peer scoring is not enabled in router")
			return
		}

		 := .score.SetTopicScoreParams(.topic, )
		 <- 
	}

	select {
	case .p.eval <- :
		 = <-
		return 

	case <-.p.ctx.Done():
		return .p.ctx.Err()
	}
}

// EventHandler creates a handle for topic specific events
// Multiple event handlers may be created and will operate independently of each other
func ( *Topic) ( ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
	.mux.RLock()
	defer .mux.RUnlock()
	if .closed {
		return nil, ErrTopicClosed
	}

	 := &TopicEventHandler{
		topic: ,
		err:   nil,

		evtLog:   make(map[peer.ID]EventType),
		evtLogCh: make(chan struct{}, 1),
	}

	for ,  := range  {
		 := ()
		if  != nil {
			return nil, 
		}
	}

	 := make(chan struct{}, 1)

	select {
	case .p.eval <- func() {
		 := .p.topics[.topic]
		for  := range  {
			.evtLog[] = PeerJoin
		}

		.evtHandlerMux.Lock()
		.evtHandlers[] = struct{}{}
		.evtHandlerMux.Unlock()
		 <- struct{}{}
	}:
	case <-.p.ctx.Done():
		return nil, .p.ctx.Err()
	}

	<-

	return , nil
}

func ( *Topic) ( PeerEvent) {
	.evtHandlerMux.RLock()
	defer .evtHandlerMux.RUnlock()

	for  := range .evtHandlers {
		.sendNotification()
	}
}

// Subscribe returns a new Subscription for the topic.
// Note that subscription is not an instantaneous operation. It may take some time
// before the subscription is processed by the pubsub main loop and propagated to our peers.
func ( *Topic) ( ...SubOpt) (*Subscription, error) {
	.mux.RLock()
	defer .mux.RUnlock()
	if .closed {
		return nil, ErrTopicClosed
	}

	 := &Subscription{
		topic: .topic,
		ctx:   .p.ctx,
	}

	for ,  := range  {
		 := ()
		if  != nil {
			return nil, 
		}
	}

	if .ch == nil {
		// apply the default size
		.ch = make(chan *Message, 32)
	}

	 := make(chan *Subscription, 1)

	.p.disc.Discover(.topic)

	select {
	case .p.addSub <- &addSubReq{
		sub:  ,
		resp: ,
	}:
	case <-.p.ctx.Done():
		return nil, .p.ctx.Err()
	}

	return <-, nil
}

// Relay enables message relaying for the topic and returns a reference
// cancel function. Subsequent calls increase the reference counter.
// To completely disable the relay, all references must be cancelled.
func ( *Topic) () (RelayCancelFunc, error) {
	.mux.RLock()
	defer .mux.RUnlock()
	if .closed {
		return nil, ErrTopicClosed
	}

	 := make(chan RelayCancelFunc, 1)

	.p.disc.Discover(.topic)

	select {
	case .p.addRelay <- &addRelayReq{
		topic: .topic,
		resp:  ,
	}:
	case <-.p.ctx.Done():
		return nil, .p.ctx.Err()
	}

	return <-, nil
}

// RouterReady is a function that decides if a router is ready to publish
type RouterReady func(rt PubSubRouter, topic string) (bool, error)

// ProvideKey is a function that provides a private key and its associated peer ID when publishing a new message
type ProvideKey func() (crypto.PrivKey, peer.ID)

type PublishOptions struct {
	ready     RouterReady
	customKey ProvideKey
	local     bool
}

type PubOpt func(pub *PublishOptions) error

// Publish publishes data to topic.
func ( *Topic) ( context.Context,  []byte,  ...PubOpt) error {
	.mux.RLock()
	defer .mux.RUnlock()
	if .closed {
		return ErrTopicClosed
	}

	 := .p.signID
	 := .p.signKey

	 := &PublishOptions{}
	for ,  := range  {
		 := ()
		if  != nil {
			return 
		}
	}

	if .customKey != nil && !.local {
		,  = .customKey()
		if  == nil {
			return ErrNilSignKey
		}
		if len() == 0 {
			return ErrEmptyPeerID
		}
	}

	 := &pb.Message{
		Data:  ,
		Topic: &.topic,
		From:  nil,
		Seqno: nil,
	}
	if  != "" {
		.From = []byte()
		.Seqno = .p.nextSeqno()
	}
	if  != nil {
		.From = []byte()
		 := signMessage(, , )
		if  != nil {
			return 
		}
	}

	if .ready != nil {
		if .p.disc.discovery != nil {
			.p.disc.Bootstrap(, .topic, .ready)
		} else {
			// TODO: we could likely do better than polling every 200ms.
			// For example, block this goroutine on a channel,
			// and check again whenever events tell us that the number of
			// peers has increased.
			var  *time.Ticker
		:
			for {
				// Check if ready for publishing.
				// Similar to what disc.Bootstrap does.
				 := make(chan bool, 1)
				select {
				case .p.eval <- func() {
					,  := .ready(.p.rt, .topic)
					 <- 
				}:
					if <- {
						break 
					}
				case <-.p.ctx.Done():
					return .p.ctx.Err()
				case <-.Done():
					return .Err()
				}
				if  == nil {
					 = time.NewTicker(200 * time.Millisecond)
					defer .Stop()
				}

				select {
				case <-.C:
				case <-.Done():
					return fmt.Errorf("router is not ready: %w", .Err())
				}
			}
		}
	}

	return .p.val.PushLocal(&Message{, "", .p.host.ID(), nil, .local})
}

// WithReadiness returns a publishing option for only publishing when the router is ready.
// This option is not useful unless PubSub is also using WithDiscovery
func ( RouterReady) PubOpt {
	return func( *PublishOptions) error {
		.ready = 
		return nil
	}
}

// WithLocalPublication returns a publishing option to notify in-process subscribers only.
// It prevents message publication to mesh peers.
// Useful in edge cases where the msg needs to be only delivered to the in-process subscribers,
// e.g. not to spam the network with outdated msgs.
// Should not be used specifically for in-process pubsubing.
func ( bool) PubOpt {
	return func( *PublishOptions) error {
		.local = 
		return nil
	}
}

// WithSecretKeyAndPeerId returns a publishing option for providing a custom private key and its corresponding peer ID
// This option is useful when we want to send messages from "virtual", never-connectable peers in the network
func ( crypto.PrivKey,  peer.ID) PubOpt {
	return func( *PublishOptions) error {
		.customKey = func() (crypto.PrivKey, peer.ID) {
			return , 
		}

		return nil
	}
}

// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
// Does not error if the topic is already closed.
func ( *Topic) () error {
	.mux.Lock()
	defer .mux.Unlock()
	if .closed {
		return nil
	}

	 := &rmTopicReq{, make(chan error, 1)}

	select {
	case .p.rmTopic <- :
	case <-.p.ctx.Done():
		return .p.ctx.Err()
	}

	 := <-.resp

	if  == nil {
		.closed = true
	}

	return 
}

// ListPeers returns a list of peers we are connected to in the given topic.
func ( *Topic) () []peer.ID {
	.mux.RLock()
	defer .mux.RUnlock()
	if .closed {
		return []peer.ID{}
	}

	return .p.ListPeers(.topic)
}

type EventType int

const (
	PeerJoin EventType = iota
	PeerLeave
)

// TopicEventHandler is used to manage topic specific events. No Subscription is required to receive events.
type TopicEventHandler struct {
	topic *Topic
	err   error

	evtLogMx sync.Mutex
	evtLog   map[peer.ID]EventType
	evtLogCh chan struct{}
}

type TopicEventHandlerOpt func(t *TopicEventHandler) error

type PeerEvent struct {
	Type EventType
	Peer peer.ID
}

// Cancel closes the topic event handler
func ( *TopicEventHandler) () {
	 := .topic
	.err = fmt.Errorf("topic event handler cancelled by calling handler.Cancel()")

	.evtHandlerMux.Lock()
	delete(.evtHandlers, )
	.topic.evtHandlerMux.Unlock()
}

func ( *TopicEventHandler) ( PeerEvent) {
	.evtLogMx.Lock()
	.addToEventLog()
	.evtLogMx.Unlock()
}

// addToEventLog assumes a lock has been taken to protect the event log
func ( *TopicEventHandler) ( PeerEvent) {
	,  := .evtLog[.Peer]
	if ! {
		.evtLog[.Peer] = .Type
		// send signal that an event has been added to the event log
		select {
		case .evtLogCh <- struct{}{}:
		default:
		}
	} else if  != .Type {
		delete(.evtLog, .Peer)
	}
}

// pullFromEventLog assumes a lock has been taken to protect the event log
func ( *TopicEventHandler) () (PeerEvent, bool) {
	for ,  := range .evtLog {
		 := PeerEvent{Peer: , Type: }
		delete(.evtLog, )
		return , true
	}
	return PeerEvent{}, false
}

// NextPeerEvent returns the next event regarding subscribed peers
// Guarantees: Peer Join and Peer Leave events for a given peer will fire in order.
// Unless a peer both Joins and Leaves before NextPeerEvent emits either event
// all events will eventually be received from NextPeerEvent.
func ( *TopicEventHandler) ( context.Context) (PeerEvent, error) {
	for {
		.evtLogMx.Lock()
		,  := .pullFromEventLog()
		if  {
			// make sure an event log signal is available if there are events in the event log
			if len(.evtLog) > 0 {
				select {
				case .evtLogCh <- struct{}{}:
				default:
				}
			}
			.evtLogMx.Unlock()
			return , nil
		}
		.evtLogMx.Unlock()

		select {
		case <-.evtLogCh:
			continue
		case <-.Done():
			return PeerEvent{}, .Err()
		}
	}
}