package pubsub

import (
	
	
	
	
	

	
)

const (
	defaultValidateQueueSize   = 32
	defaultValidateConcurrency = 1024
	defaultValidateThrottle    = 8192
)

// ValidationError is an error that may be signalled from message publication when the message
// fails validation
type ValidationError struct {
	Reason string
}

func ( ValidationError) () string {
	return .Reason
}

// Validator is a function that validates a message with a binary decision: accept or reject.
type Validator func(context.Context, peer.ID, *Message) bool

// ValidatorEx is an extended validation function that validates a message with an enumerated decision
type ValidatorEx func(context.Context, peer.ID, *Message) ValidationResult

// ValidationResult represents the decision of an extended validator
type ValidationResult int

const (
	// ValidationAccept is a validation decision that indicates a valid message that should be accepted and
	// delivered to the application and forwarded to the network.
	ValidationAccept = ValidationResult(0)
	// ValidationReject is a validation decision that indicates an invalid message that should not be
	// delivered to the application or forwarded to the application. Furthermore the peer that forwarded
	// the message should be penalized by peer scoring routers.
	ValidationReject = ValidationResult(1)
	// ValidationIgnore is a validation decision that indicates a message that should be ignored: it will
	// be neither delivered to the application nor forwarded to the network. However, in contrast to
	// ValidationReject, the peer that forwarded the message must not be penalized by peer scoring routers.
	ValidationIgnore = ValidationResult(2)
	// internal
	validationThrottled = ValidationResult(-1)
)

// ValidatorOpt is an option for RegisterTopicValidator.
type ValidatorOpt func(addVal *addValReq) error

// validation represents the validator pipeline.
// The validator pipeline performs signature validation and runs a
// sequence of user-configured validators per-topic. It is possible to
// adjust various concurrency parameters, such as the number of
// workers and the max number of simultaneous validations. The user
// can also attach inline validators that will be executed
// synchronously; this may be useful to prevent superfluous
// context-switching for lightweight tasks.
type validation struct {
	p *PubSub

	tracer *pubsubTracer

	// mx protects the validator map
	mx sync.Mutex
	// topicVals tracks per topic validators
	topicVals map[string]*validatorImpl

	// defaultVals tracks default validators applicable to all topics
	defaultVals []*validatorImpl

	// validateQ is the front-end to the validation pipeline
	validateQ chan *validateReq

	// validateThrottle limits the number of active validation goroutines
	validateThrottle chan struct{}

	// this is the number of synchronous validation workers
	validateWorkers int
}

// validation requests
type validateReq struct {
	vals []*validatorImpl
	src  peer.ID
	msg  *Message
}

// representation of topic validators
type validatorImpl struct {
	topic            string
	validate         ValidatorEx
	validateTimeout  time.Duration
	validateThrottle chan struct{}
	validateInline   bool
}

// async request to add a topic validators
type addValReq struct {
	topic    string
	validate interface{}
	timeout  time.Duration
	throttle int
	inline   bool
	resp     chan error
}

// async request to remove a topic validator
type rmValReq struct {
	topic string
	resp  chan error
}

// newValidation creates a new validation pipeline
func newValidation() *validation {
	return &validation{
		topicVals:        make(map[string]*validatorImpl),
		validateQ:        make(chan *validateReq, defaultValidateQueueSize),
		validateThrottle: make(chan struct{}, defaultValidateThrottle),
		validateWorkers:  runtime.NumCPU(),
	}
}

// Start attaches the validation pipeline to a pubsub instance and starts background
// workers
func ( *validation) ( *PubSub) {
	.p = 
	.tracer = .tracer
	for  := 0;  < .validateWorkers; ++ {
		go .validateWorker()
	}
}

// AddValidator adds a new validator
func ( *validation) ( *addValReq) {
	,  := .makeValidator()
	if  != nil {
		.resp <- 
		return
	}

	.mx.Lock()
	defer .mx.Unlock()

	 := .topic

	,  := .topicVals[]
	if  {
		.resp <- fmt.Errorf("duplicate validator for topic %s", )
		return
	}

	.topicVals[] = 
	.resp <- nil
}

func ( *validation) ( *addValReq) (*validatorImpl, error) {
	 := func( Validator) ValidatorEx {
		return func( context.Context,  peer.ID,  *Message) ValidationResult {
			if (, , ) {
				return ValidationAccept
			} else {
				return ValidationReject
			}
		}
	}

	var  ValidatorEx
	switch v := .validate.(type) {
	case func( context.Context,  peer.ID,  *Message) bool:
		 = (Validator())
	case Validator:
		 = ()

	case func( context.Context,  peer.ID,  *Message) ValidationResult:
		 = ValidatorEx()
	case ValidatorEx:
		 = 

	default:
		 := .topic
		if .topic == "" {
			 = "(default)"
		}
		return nil, fmt.Errorf("unknown validator type for topic %s; must be an instance of Validator or ValidatorEx", )
	}

	 := &validatorImpl{
		topic:            .topic,
		validate:         ,
		validateTimeout:  0,
		validateThrottle: make(chan struct{}, defaultValidateConcurrency),
		validateInline:   .inline,
	}

	if .timeout > 0 {
		.validateTimeout = .timeout
	}

	if .throttle > 0 {
		.validateThrottle = make(chan struct{}, .throttle)
	}

	return , nil
}

// RemoveValidator removes an existing validator
func ( *validation) ( *rmValReq) {
	.mx.Lock()
	defer .mx.Unlock()

	 := .topic

	,  := .topicVals[]
	if  {
		delete(.topicVals, )
		.resp <- nil
	} else {
		.resp <- fmt.Errorf("no validator for topic %s", )
	}
}

// PushLocal synchronously pushes a locally published message and performs applicable
// validations.
// Returns an error if validation fails
func ( *validation) ( *Message) error {
	.p.tracer.PublishMessage()

	 := .p.checkSigningPolicy()
	if  != nil {
		return 
	}

	 := .getValidators()
	return .validate(, .ReceivedFrom, , true)
}

// Push pushes a message into the validation pipeline.
// It returns true if the message can be forwarded immediately without validation.
func ( *validation) ( peer.ID,  *Message) bool {
	 := .getValidators()

	if len() > 0 || .Signature != nil {
		select {
		case .validateQ <- &validateReq{, , }:
		default:
			log.Debugf("message validation throttled: queue full; dropping message from %s", )
			.tracer.RejectMessage(, RejectValidationQueueFull)
		}
		return false
	}

	return true
}

// getValidators returns all validators that apply to a given message
func ( *validation) ( *Message) []*validatorImpl {
	.mx.Lock()
	defer .mx.Unlock()

	var  []*validatorImpl
	 = append(, .defaultVals...)

	 := .GetTopic()

	,  := .topicVals[]
	if ! {
		return 
	}

	return append(, )
}

// validateWorker is an active goroutine performing inline validation
func ( *validation) () {
	for {
		select {
		case  := <-.validateQ:
			.validate(.vals, .src, .msg, false)
		case <-.p.ctx.Done():
			return
		}
	}
}

// validate performs validation and only sends the message if all validators succeed
func ( *validation) ( []*validatorImpl,  peer.ID,  *Message,  bool) error {
	// If signature verification is enabled, but signing is disabled,
	// the Signature is required to be nil upon receiving the message in PubSub.pushMsg.
	if .Signature != nil {
		if !.validateSignature() {
			log.Debugf("message signature validation failed; dropping message from %s", )
			.tracer.RejectMessage(, RejectInvalidSignature)
			return ValidationError{Reason: RejectInvalidSignature}
		}
	}

	// we can mark the message as seen now that we have verified the signature
	// and avoid invoking user validators more than once
	 := .p.idGen.ID()
	if !.p.markSeen() {
		.tracer.DuplicateMessage()
		return nil
	} else {
		.tracer.ValidateMessage()
	}

	var ,  []*validatorImpl
	for ,  := range  {
		if .validateInline ||  {
			 = append(, )
		} else {
			 = append(, )
		}
	}

	// apply inline (synchronous) validators
	 := ValidationAccept
:
	for ,  := range  {
		switch .validateMsg(.p.ctx, , ) {
		case ValidationAccept:
		case ValidationReject:
			 = ValidationReject
			break 
		case ValidationIgnore:
			 = ValidationIgnore
		}
	}

	if  == ValidationReject {
		log.Debugf("message validation failed; dropping message from %s", )
		.tracer.RejectMessage(, RejectValidationFailed)
		return ValidationError{Reason: RejectValidationFailed}
	}

	// apply async validators
	if len() > 0 {
		select {
		case .validateThrottle <- struct{}{}:
			go func() {
				.doValidateTopic(, , , )
				<-.validateThrottle
			}()
		default:
			log.Debugf("message validation throttled; dropping message from %s", )
			.tracer.RejectMessage(, RejectValidationThrottled)
		}
		return nil
	}

	if  == ValidationIgnore {
		.tracer.RejectMessage(, RejectValidationIgnored)
		return ValidationError{Reason: RejectValidationIgnored}
	}

	// no async validators, accepted message, send it!
	select {
	case .p.sendMsg <- :
		return nil
	case <-.p.ctx.Done():
		return .p.ctx.Err()
	}
}

func ( *validation) ( *Message) bool {
	 := verifyMessageSignature(.Message)
	if  != nil {
		log.Debugf("signature verification error: %s", .Error())
		return false
	}

	return true
}

func ( *validation) ( []*validatorImpl,  peer.ID,  *Message,  ValidationResult) {
	 := .validateTopic(, , )

	if  == ValidationAccept &&  != ValidationAccept {
		 = 
	}

	switch  {
	case ValidationAccept:
		.p.sendMsg <- 
	case ValidationReject:
		log.Debugf("message validation failed; dropping message from %s", )
		.tracer.RejectMessage(, RejectValidationFailed)
		return
	case ValidationIgnore:
		log.Debugf("message validation punted; ignoring message from %s", )
		.tracer.RejectMessage(, RejectValidationIgnored)
		return
	case validationThrottled:
		log.Debugf("message validation throttled; ignoring message from %s", )
		.tracer.RejectMessage(, RejectValidationThrottled)

	default:
		// BUG: this would be an internal programming error, so a panic seems appropiate.
		panic(fmt.Errorf("unexpected validation result: %d", ))
	}
}

func ( *validation) ( []*validatorImpl,  peer.ID,  *Message) ValidationResult {
	if len() == 1 {
		return .validateSingleTopic([0], , )
	}

	,  := context.WithCancel(.p.ctx)
	defer ()

	 := make(chan ValidationResult, len())
	 := 0

	for ,  := range  {
		++

		select {
		case .validateThrottle <- struct{}{}:
			go func( *validatorImpl) {
				 <- .validateMsg(, , )
				<-.validateThrottle
			}()

		default:
			log.Debugf("validation throttled for topic %s", .topic)
			 <- validationThrottled
		}
	}

	 := ValidationAccept
:
	for  := 0;  < ; ++ {
		switch <- {
		case ValidationAccept:
		case ValidationReject:
			 = ValidationReject
			break 
		case ValidationIgnore:
			// throttled validation has the same effect, but takes precedence over Ignore as it is not
			// known whether the throttled validator would have signaled rejection.
			if  != validationThrottled {
				 = ValidationIgnore
			}
		case validationThrottled:
			 = validationThrottled
		}
	}

	return 
}

// fast path for single topic validation that avoids the extra goroutine
func ( *validation) ( *validatorImpl,  peer.ID,  *Message) ValidationResult {
	select {
	case .validateThrottle <- struct{}{}:
		 := .validateMsg(.p.ctx, , )
		<-.validateThrottle
		return 

	default:
		log.Debugf("validation throttled for topic %s", .topic)
		return validationThrottled
	}
}

func ( *validatorImpl) ( context.Context,  peer.ID,  *Message) ValidationResult {
	 := time.Now()
	defer func() {
		log.Debugf("validation done; took %s", time.Since())
	}()

	if .validateTimeout > 0 {
		var  func()
		,  = context.WithTimeout(, .validateTimeout)
		defer ()
	}

	 := .validate(, , )
	switch  {
	case ValidationAccept:
		fallthrough
	case ValidationReject:
		fallthrough
	case ValidationIgnore:
		return 

	default:
		log.Warnf("Unexpected result from validator: %d; ignoring message", )
		return ValidationIgnore
	}
}

// / Options
// WithDefaultValidator adds a validator that applies to all topics by default; it can be used
// more than once and add multiple validators. Having a defult validator does not inhibit registering
// a per topic validator.
func ( interface{},  ...ValidatorOpt) Option {
	return func( *PubSub) error {
		 := &addValReq{
			validate: ,
		}

		for ,  := range  {
			 := ()
			if  != nil {
				return 
			}
		}

		,  := .val.makeValidator()
		if  != nil {
			return 
		}

		.val.defaultVals = append(.val.defaultVals, )
		return nil
	}
}

// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32.
// When queue is full, validation is throttled and new messages are dropped.
func ( int) Option {
	return func( *PubSub) error {
		if  > 0 {
			.val.validateQ = make(chan *validateReq, )
			return nil
		}
		return fmt.Errorf("validate queue size must be > 0")
	}
}

// WithValidateThrottle sets the upper bound on the number of active validation
// goroutines across all topics. The default is 8192.
func ( int) Option {
	return func( *PubSub) error {
		.val.validateThrottle = make(chan struct{}, )
		return nil
	}
}

// WithValidateWorkers sets the number of synchronous validation worker goroutines.
// Defaults to NumCPU.
//
// The synchronous validation workers perform signature validation, apply inline
// user validators, and schedule asynchronous user validators.
// You can adjust this parameter to devote less cpu time to synchronous validation.
func ( int) Option {
	return func( *PubSub) error {
		if  > 0 {
			.val.validateWorkers = 
			return nil
		}
		return fmt.Errorf("number of validation workers must be > 0")
	}
}

// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator.
// By default there is no timeout in asynchronous validators.
func ( time.Duration) ValidatorOpt {
	return func( *addValReq) error {
		.timeout = 
		return nil
	}
}

// WithValidatorConcurrency is an option that sets the topic validator throttle.
// This controls the number of active validation goroutines for the topic; the default is 1024.
func ( int) ValidatorOpt {
	return func( *addValReq) error {
		.throttle = 
		return nil
	}
}

// WithValidatorInline is an option that sets the validation disposition to synchronous:
// it will be executed inline in validation front-end, without spawning a new goroutine.
// This is suitable for simple or cpu-bound validators that do not block.
func ( bool) ValidatorOpt {
	return func( *addValReq) error {
		.inline = 
		return nil
	}
}