package eventbus

import (
	
	
	
	
	
	

	logging 
	
)

// logInterface is the logging surface this file declares for its package
// logger: printf-style error and warning output.
type logInterface interface {
	// Errorf logs a formatted message at error level.
	Errorf(string, ...interface{})
	// Warnf logs a formatted message at warning level.
	Warnf(string, ...interface{})
}

// log is the package-level logger, created under the name "eventbus".
var log logInterface = logging.Logger("eventbus")

// slowConsumerWarningTimeout is how long emitAndLogError waits on a blocked
// send before it logs a slow-consumer warning.
const slowConsumerWarningTimeout = time.Second

// /////////////////////
// BUS

// basicBus is a type-based event delivery system.
//
// It keeps one node per event type, plus a single wildcard node.
type basicBus struct {
	// lk is held while nodes is read or modified.
	lk            sync.RWMutex
	// nodes maps an event type to the node that tracks its emitters and sinks.
	nodes         map[reflect.Type]*node
	// wildcard holds the sinks of wildcard subscriptions.
	wildcard      *wildcardNode
	// metricsTracer, when non-nil, is handed to the nodes, subscriptions and
	// emitters created by this bus.
	metricsTracer MetricsTracer
}

// Compile-time check that *basicBus satisfies event.Bus.
var _ event.Bus = (*basicBus)(nil)

// emitter publishes values of a single event type to that type's node and to
// the wildcard node.
type emitter struct {
	// n is the per-type node this emitter publishes to.
	n             *node
	// w is the wildcard node, which also receives every published value.
	w             *wildcardNode
	// typ is the event type, passed to dropper and to the metrics tracer.
	typ           reflect.Type
	// closed is set exactly once when the emitter is closed.
	closed        atomic.Bool
	// dropper is called with typ when closing this emitter brings the node's
	// emitter count to zero.
	dropper       func(reflect.Type)
	// metricsTracer, when non-nil, is told about each emitted event.
	metricsTracer MetricsTracer
}

// Publish method of *emitter (presumably Emit; the node-level panic message in
// this file reads "Emit called with wrong type").
//
// NOTE(review): the receiver, method and parameter identifiers are blank in
// this copy of the file, so it does not compile as shown. Restore them before
// building.
//
// Returns an "emitter is closed" error once the closed flag has been set.
// Otherwise it calls emit on the per-type node and on the wildcard node,
// reports the emission to the metrics tracer when one is configured, and
// returns nil.
func ( *emitter) ( interface{}) error {
	if .closed.Load() {
		return fmt.Errorf("emitter is closed")
	}

	.n.emit()
	.w.emit()

	if .metricsTracer != nil {
		.metricsTracer.EventEmitted(.typ)
	}
	return nil
}

// Close-style method of *emitter (presumably Close).
//
// NOTE(review): the receiver and method identifiers are blank in this copy of
// the file. Restore them before building.
//
// The first call flips closed from false to true. Any later call fails the
// compare-and-swap and returns "closed an emitter more than once". On the
// first call the node's emitter count is decremented, and if that brings it to
// zero the dropper callback is called with the event type.
func ( *emitter) () error {
	if !.closed.CompareAndSwap(false, true) {
		return fmt.Errorf("closed an emitter more than once")
	}
	if .n.nEmitters.Add(-1) == 0 {
		.dropper(.typ)
	}
	return nil
}

// Constructor returning an event.Bus backed by a basicBus (presumably NewBus).
//
// NOTE(review): the function, parameter and local identifiers are blank in
// this copy of the file. Restore them before building.
//
// The bus starts with an empty node map and a fresh wildcardNode. The variadic
// Option values are then ranged over; presumably each one is applied to the
// new bus, but the call's operands are missing here, so confirm.
func ( ...Option) event.Bus {
	 := &basicBus{
		nodes:    map[reflect.Type]*node{},
		wildcard: &wildcardNode{},
	}
	for ,  := range  {
		()
	}
	return 
}

// Looks up, or creates, the node for an event type and runs callbacks against
// it. Call sites in this file refer to it as withNode.
//
// NOTE(review): the receiver, method, parameter and local identifiers are
// blank in this copy of the file. Restore them before building.
//
// Parameters, in order: the event type; a callback that is run synchronously
// with the node; and an optional second callback that, when non-nil, is run in
// a new goroutine.
func ( *basicBus) ( reflect.Type,  func(*node),  func(*node)) {
	.lk.Lock()

	// Fetch the node for this type, creating and registering it on first use.
	,  := .nodes[]
	if ! {
		 = newNode(, .metricsTracer)
		.nodes[] = 
	}

	// NOTE(review): with the receivers blank these two lines read as a lock and
	// unlock of the same mutex. Presumably the first takes the node's lock and
	// the second releases the bus lock taken above; that would fit the node
	// struct's rule that the bus lock is never acquired while a node lock is
	// held. Confirm.
	.lk.Lock()
	.lk.Unlock()

	// Synchronous callback.
	()

	if  == nil {
		// No asynchronous callback: release the remaining lock right away.
		.lk.Unlock()
	} else {
		// Run the second callback in its own goroutine; the lock is released
		// only after that callback returns.
		go func() {
			defer .lk.Unlock()
			()
		}()
	}
}

// Removes the node for an event type from the bus when nothing uses it any
// more. It is passed elsewhere in this file as tryDropNode, the "dropper"
// callback of emitters and subscriptions.
//
// NOTE(review): the receiver, method, parameter and local identifiers are
// blank in this copy of the file. Restore them before building.
//
// It returns without changes when the type has no node in the map, or when the
// node still has a positive emitter count or at least one sink. Otherwise the
// map entry is deleted.
func ( *basicBus) ( reflect.Type) {
	.lk.Lock()
	,  := .nodes[]
	if ! { // already dropped
		.lk.Unlock()
		return
	}

	// Inspect the node's usage under a second lock (presumably the node's own
	// lock; the receiver is blank).
	.lk.Lock()
	if .nEmitters.Load() > 0 || len(.sinks) > 0 {
		.lk.Unlock()
		.lk.Unlock()
		return // still in use
	}
	.lk.Unlock()

	delete(.nodes, )
	.lk.Unlock()
}

// wildcardSub is the subscription handle returned for a wildcard subscription.
type wildcardSub struct {
	// ch is the channel events are delivered on.
	ch            chan interface{}
	// w is the wildcard node this subscription's sink is registered with.
	w             *wildcardNode
	// metricsTracer, when non-nil, is notified when the subscription is closed.
	metricsTracer MetricsTracer
	// name is the subscription's name, taken from the subscription settings.
	name          string
	// closeOnce makes the close logic run at most once.
	closeOnce     sync.Once
}

// Returns the subscription's channel as receive-only (presumably Out).
//
// NOTE(review): the receiver and method identifiers are blank in this copy of
// the file. Restore them before building.
func ( *wildcardSub) () <-chan interface{} {
	return .ch
}

// Close-style method of *wildcardSub (presumably Close). It always returns nil.
//
// NOTE(review): the receiver and method identifiers are blank in this copy of
// the file. Restore them before building.
//
// The body runs at most once (guarded by closeOnce): it removes this
// subscription's channel from the wildcard node and, when a metrics tracer is
// set, reports the removal under the type of event.WildcardSubscription.
// Later calls do nothing.
func ( *wildcardSub) () error {
	.closeOnce.Do(func() {
		.w.removeSink(.ch)
		if .metricsTracer != nil {
			.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription))
		}
	})

	return nil
}

// Returns the subscription's name (presumably Name).
//
// NOTE(review): the receiver and method identifiers are blank in this copy of
// the file. Restore them before building.
func ( *wildcardSub) () string {
	return .name
}

// namedSink pairs a subscriber's delivery channel with the subscriber's name.
// The name is used in the slow-consumer warning and in subscriber metrics.
type namedSink struct {
	name string
	ch   chan interface{}
}

// sub is the subscription handle for one or more concrete event types.
type sub struct {
	// ch is the channel events are delivered on; it is closed when the
	// subscription is closed.
	ch            chan interface{}
	// nodes holds the node of each subscribed event type.
	nodes         []*node
	// dropper is called with a node's type when closing this subscription
	// leaves that node with no sinks and no emitters.
	dropper       func(reflect.Type)
	// metricsTracer, when non-nil, is notified when a sink is removed.
	metricsTracer MetricsTracer
	// name is the subscription's name, taken from the subscription settings.
	name          string
	// closeOnce makes the close logic run at most once.
	closeOnce     sync.Once
}

// Returns the subscription's name (presumably Name).
//
// NOTE(review): the receiver and method identifiers are blank in this copy of
// the file. Restore them before building.
func ( *sub) () string {
	return .name
}

// Returns the subscription's channel as receive-only (presumably Out).
//
// NOTE(review): the receiver and method identifiers are blank in this copy of
// the file. Restore them before building.
func ( *sub) () <-chan interface{} {
	return .ch
}

// Close-style method of *sub (presumably Close). It always returns nil.
//
// NOTE(review): the receiver, method and local identifiers are blank in this
// copy of the file. Restore them before building.
//
// Every call starts a goroutine that drains the channel until it is closed.
// The teardown itself runs at most once (guarded by closeOnce): the
// subscription's sink is removed from each of its nodes, the dropper callback
// is called for any node left with no sinks and no emitters, and finally the
// channel is closed.
func ( *sub) () error {
	go func() {
		// drain the event channel, will return when closed and drained.
		// this is necessary to unblock publishes to this channel.
		for range .ch {
		}
	}()
	.closeOnce.Do(func() {
		for ,  := range .nodes {
			.lk.Lock()

			// Find the sink that uses this subscription's channel.
			for  := 0;  < len(.sinks); ++ {
				if .sinks[].ch == .ch {
					// Swap-remove: move the last sink into the matched slot,
					// clear the tail entry, then shrink the slice by one.
					.sinks[], .sinks[len(.sinks)-1] = .sinks[len(.sinks)-1], nil
					.sinks = .sinks[:len(.sinks)-1]

					if .metricsTracer != nil {
						.metricsTracer.RemoveSubscriber(.typ)
					}
					break
				}
			}

			// Decide while the lock is held whether the node is now unused.
			 := len(.sinks) == 0 && .nEmitters.Load() == 0

			.lk.Unlock()

			// The dropper is called only after the lock above was released.
			if  {
				.dropper(.typ)
			}
		}
		close(.ch)
	})
	return nil
}

// Compile-time check that *sub satisfies event.Subscription.
var _ event.Subscription = (*sub)(nil)

// Subscribe creates a new subscription. Failing to drain the channel will
// cause publishers to get blocked.
//
// NOTE(review): the previous comment also promised that "CancelFunc is
// guaranteed to return after last send to the channel". No CancelFunc appears
// in this signature; presumably that refers to closing the returned
// subscription. Confirm.
//
// NOTE(review): the receiver, method, parameter and local identifiers are
// blank in this copy of the file. Restore them before building.
//
// The first argument is either event.WildcardSubscription, a single value, or
// a []interface{} of values. Every non-wildcard value must be a pointer; its
// element type is the event type subscribed to. The options are applied to a
// fresh settings value and the first option error is returned as-is.
//
// Errors returned: an option's error, "wildcard subscriptions must be started
// separately" when a multi-element list contains the wildcard value, and
// "subscribe called with non-pointer type".
func ( *basicBus) ( interface{},  ...event.SubscriptionOpt) ( event.Subscription,  error) {
	 := newSubSettings()
	for ,  := range  {
		if  := (&);  != nil {
			return nil, 
		}
	}

	// Wildcard subscription: register one sink on the wildcard node and return.
	if  == event.WildcardSubscription {
		 := &wildcardSub{
			ch:            make(chan interface{}, .buffer),
			w:             .wildcard,
			metricsTracer: .metricsTracer,
			name:          .name,
		}
		.wildcard.addSink(&namedSink{ch: .ch, name: .name})
		return , nil
	}

	// Accept either a list of types or a single one.
	// NOTE(review): the element of the fallback slice literal is blank here;
	// presumably it wraps the single argument. Confirm.
	,  := .([]interface{})
	if ! {
		 = []interface{}{}
	}

	// The wildcard value may not be mixed into a list of several types.
	if len() > 1 {
		for ,  := range  {
			if  == event.WildcardSubscription {
				return nil, fmt.Errorf("wildcard subscriptions must be started separately")
			}
		}
	}

	 := &sub{
		ch:    make(chan interface{}, .buffer),
		nodes: make([]*node, len()),

		dropper:       .tryDropNode,
		metricsTracer: .metricsTracer,
		name:          .name,
	}

	// Validate every entry before any node is touched, so a bad entry returns
	// without having registered any sink.
	for ,  := range  {
		if reflect.TypeOf().Kind() != reflect.Ptr {
			return nil, errors.New("subscribe called with non-pointer type")
		}
	}

	for ,  := range  {
		 := reflect.TypeOf()

		.withNode(.Elem(), func( *node) {
			// Synchronous part: add the sink to the node, remember the node
			// on the subscription, and record the new subscriber.
			.sinks = append(.sinks, &namedSink{ch: .ch, name: .name})
			.nodes[] = 
			if .metricsTracer != nil {
				.metricsTracer.AddSubscriber(.Elem())
			}
		}, func( *node) {
			// Asynchronous part: when the node keeps its last value and has
			// one, send that value on the new subscription's channel.
			if .keepLast {
				 := .last
				if  == nil {
					return
				}
				.ch <- 
			}
		})
	}

	return , nil
}

// Emitter creates a new emitter.
//
// eventType accepts typed nil pointers, and uses the type information to
// select output type
//
// Example:
// emit, err := eventbus.Emitter(new(EventT))
// defer emit.Close() // MUST call this after being done with the emitter
//
// emit(EventT{})
//
// NOTE(review): the receiver, method, parameter and local identifiers are
// blank in this copy of the file. Restore them before building.
//
// Errors returned: "illegal emitter for wildcard subscription" when the
// argument equals event.WildcardSubscription, any error returned by an option,
// and "emitter called with non-pointer type" when the argument's kind is not a
// pointer. On success the node for the pointed-to type has its emitter count
// incremented, and its keepLast flag is OR-ed with the settings' makeStateful.
func ( *basicBus) ( interface{},  ...event.EmitterOpt) ( event.Emitter,  error) {
	if  == event.WildcardSubscription {
		return nil, fmt.Errorf("illegal emitter for wildcard subscription")
	}

	var  emitterSettings
	for ,  := range  {
		if  := (&);  != nil {
			return nil, 
		}
	}

	 := reflect.TypeOf()
	if .Kind() != reflect.Ptr {
		return nil, errors.New("emitter called with non-pointer type")
	}
	 = .Elem()

	// The named results are assigned inside the callback, which withNode runs
	// synchronously; the bare return below hands them back.
	.withNode(, func( *node) {
		.nEmitters.Add(1)
		.keepLast = .keepLast || .makeStateful
		 = &emitter{n: , typ: , dropper: .tryDropNode, w: .wildcard, metricsTracer: .metricsTracer}
	}, nil)
	return
}

// GetAllEventTypes returns all the event types that this bus has emitters
// or subscribers for.
//
// NOTE(review): the receiver, method and local identifiers are blank in this
// copy of the file. Restore them before building.
//
// The result is a new slice holding the keys of the node map, collected under
// the bus read lock. It comes from a map range, so its order is unspecified.
func ( *basicBus) () []reflect.Type {
	.lk.RLock()
	defer .lk.RUnlock()

	 := make([]reflect.Type, 0, len(.nodes))
	for  := range .nodes {
		 = append(, )
	}
	return 
}

// /////////////////////
// NODE

// wildcardNode holds the sinks of all wildcard subscriptions.
type wildcardNode struct {
	// The embedded lock is held while sinks is read or modified.
	sync.RWMutex
	// nSinks counts the sinks; it is updated outside the lock and lets emit
	// return early without locking when there are none.
	nSinks        atomic.Int32
	sinks         []*namedSink
	// metricsTracer, when non-nil, receives subscriber metrics.
	metricsTracer MetricsTracer

	// slowConsumerTimer is the timer passed to, and returned from,
	// emitAndLogError when a sink's channel is full.
	slowConsumerTimer *time.Timer
}

// Registers a sink with the wildcard node. The subscribe path in this file
// calls it as addSink.
//
// NOTE(review): the receiver, method and parameter identifiers are blank in
// this copy of the file. Restore them before building.
//
// The sink counter is incremented first, then the sink is appended under the
// write lock. When a metrics tracer is set, a subscriber is recorded under the
// type of event.WildcardSubscription.
func ( *wildcardNode) ( *namedSink) {
	.nSinks.Add(1) // ok to do outside the lock
	.Lock()
	.sinks = append(.sinks, )
	.Unlock()

	if .metricsTracer != nil {
		.metricsTracer.AddSubscriber(reflect.TypeOf(event.WildcardSubscription))
	}
}

// Removes the sink that uses the given channel. The wildcard subscription's
// close path in this file calls it as removeSink.
//
// NOTE(review): the receiver, method, parameter and local identifiers are
// blank in this copy of the file. Restore them before building.
//
// The sink counter is decremented whether or not a matching sink is found; the
// visible caller guards the call with a sync.Once. This function never closes
// the channel, so the drain goroutine it starts ends only if the channel is
// closed elsewhere.
func ( *wildcardNode) ( chan interface{}) {
	go func() {
		// drain the event channel, will return when closed and drained.
		// this is necessary to unblock publishes to this channel.
		for range  {
		}
	}()
	.nSinks.Add(-1) // ok to do outside the lock
	.Lock()
	for  := 0;  < len(.sinks); ++ {
		if .sinks[].ch ==  {
			// Swap-remove: move the last sink into the matched slot, clear
			// the tail entry, then shrink the slice by one.
			.sinks[], .sinks[len(.sinks)-1] = .sinks[len(.sinks)-1], nil
			.sinks = .sinks[:len(.sinks)-1]
			break
		}
	}
	.Unlock()
}

// wildcardType is the reflect.Type of event.WildcardSubscription. The wildcard
// node passes it to emitAndLogError as the event type for its warnings.
var wildcardType = reflect.TypeOf(event.WildcardSubscription)

// Delivers a value to every wildcard sink. The emitter in this file calls it
// as emit on its wildcard node.
//
// NOTE(review): the receiver, method, parameter and local identifiers are
// blank in this copy of the file. Restore them before building.
//
// It returns immediately when the sink counter is zero. Otherwise it walks the
// sinks under the read lock, trying a non-blocking send first and handing over
// to emitAndLogError, which blocks until the value is delivered, when the
// channel is full.
func ( *wildcardNode) ( interface{}) {
	if .nSinks.Load() == 0 {
		return
	}

	.RLock()
	for ,  := range .sinks {

		// Sending metrics before sending on channel allows us to
		// record channel full events before blocking
		sendSubscriberMetrics(.metricsTracer, )

		select {
		case .ch <- :
		default:
			 := emitAndLogError(.slowConsumerTimer, wildcardType, , )
			// The returned timer is stored under the write lock. Deferred
			// calls run when this function returns, which is after the
			// RUnlock below, so the write lock is not requested while the
			// read lock is still held. One deferred store is queued per
			// slow sink.
			defer func() {
				.Lock()
				.slowConsumerTimer = 
				.Unlock()
			}()
		}
	}
	.RUnlock()
}

// node tracks the emitters and subscriber sinks of a single event type.
type node struct {
	// Note: make sure to NEVER lock basicBus.lk when this lock is held
	lk sync.Mutex

	// typ is the event type this node delivers; emit panics on any other type.
	typ reflect.Type

	// emitter ref count
	nEmitters atomic.Int32

	// keepLast makes emit remember the most recent value in last, which is
	// then sent to new subscribers.
	keepLast bool
	last     interface{}

	sinks         []*namedSink
	// metricsTracer, when non-nil, receives subscriber metrics.
	metricsTracer MetricsTracer

	// slowConsumerTimer is the timer passed to, and returned from,
	// emitAndLogError when a sink's channel is full.
	slowConsumerTimer *time.Timer
}

// newNode returns a node for the given event type, using the given metrics
// tracer. All other fields start at their zero values.
//
// NOTE(review): the parameter identifiers are blank in this copy of the file.
// Restore them before building.
func newNode( reflect.Type,  MetricsTracer) *node {
	return &node{
		typ:           ,
		metricsTracer: ,
	}
}

// Delivers a value to every sink of this node. The emitter in this file calls
// it as emit on its per-type node.
//
// NOTE(review): the receiver, method, parameter and local identifiers are
// blank in this copy of the file. Restore them before building.
//
// Panics when the value's dynamic type differs from the node's type. The node
// lock is held for the whole delivery, including any blocking send made
// through emitAndLogError. When keepLast is set, the value is stored in last
// before delivery.
func ( *node) ( interface{}) {
	 := reflect.TypeOf()
	if  != .typ {
		panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", .typ, ))
	}

	.lk.Lock()
	if .keepLast {
		.last = 
	}

	for ,  := range .sinks {

		// Sending metrics before sending on channel allows us to
		// record channel full events before blocking
		sendSubscriberMetrics(.metricsTracer, )
		select {
		case .ch <- :
		default:
			// Channel full: fall back to the blocking send, which warns
			// after a timeout, and keep the returned timer on the node.
			.slowConsumerTimer = emitAndLogError(.slowConsumerTimer, .typ, , )
		}
	}
	.lk.Unlock()
}

// emitAndLogError performs a blocking send to a sink whose channel was full,
// and logs a warning if the send has not completed within
// slowConsumerWarningTimeout.
//
// NOTE(review): the parameter identifiers are blank in this copy of the file.
// Restore them before building.
//
// Parameters, in order: a timer to use (nil means a new one is created); the
// event type, used only in the warning text; the value to send; and the
// destination sink. The timer that was used is returned. The function returns
// only after the value has been sent.
func emitAndLogError( *time.Timer,  reflect.Type,  interface{},  *namedSink) *time.Timer {
	// Slow consumer. Log a warning if stalled for the timeout
	if  == nil {
		 = time.NewTimer(slowConsumerWarningTimeout)
	} else {
		.Reset(slowConsumerWarningTimeout)
	}

	select {
	case .ch <- :
		// Delivered before the timeout. If Stop reports that the timer was
		// not stopped, receive from its channel.
		if !.Stop() {
			<-.C
		}
	case <-.C:
		log.Warnf("subscriber named \"%s\" is a slow consumer of %s. This can lead to libp2p stalling and hard to debug issues.", .name, )
		// Continue to stall since there's nothing else we can do.
		.ch <- 
	}

	return 
}

// sendSubscriberMetrics reports the state of a sink's queue to the metrics
// tracer just before a value is sent to that sink. It does nothing when the
// tracer is nil.
//
// NOTE(review): the parameter identifiers are blank in this copy of the file.
// Restore them before building.
//
// The reported queue length is the channel's current length plus one, i.e.
// counting the value about to be sent. The queue is reported full when that
// count reaches the channel's capacity.
func sendSubscriberMetrics( MetricsTracer,  *namedSink) {
	if  != nil {
		.SubscriberQueueLength(.name, len(.ch)+1)
		.SubscriberQueueFull(.name, len(.ch)+1 >= cap(.ch))
		.SubscriberEventQueued(.name)
	}
}