package pubsub

import (
	
	
	
)

// Sentinel errors used by the rpc queue in this file:
//
//   - ErrQueueCancelled is returned by the blocking pop when its context is
//     done while the queue is still empty.
//   - ErrQueueClosed is returned by the blocking pop when the queue is closed
//     on entry or gets closed while the call is waiting.
//   - ErrQueueFull is returned by a non-blocking push that finds the queue at
//     its maximum size.
//   - ErrQueuePushOnClosed is not returned; it is the value push panics with
//     when the queue is closed.
var (
	ErrQueueCancelled    = errors.New("rpc queue operation cancelled")
	ErrQueueClosed       = errors.New("rpc queue closed")
	ErrQueueFull         = errors.New("rpc queue full")
	ErrQueuePushOnClosed = errors.New("push on closed rpc queue")
)

// priorityQueue holds pending RPCs in two FIFO slices: normal and priority.
// Its pop method always drains the priority slice before touching the normal
// one. The type does no locking of its own; in the code visible in this file
// it is only used as a field of rpcQueue and accessed while queueMu is held.
type priorityQueue struct {
	normal   []*RPC
	priority []*RPC
}

// Returns the total number of queued RPCs, i.e. the combined length of the
// normal and priority slices.
func ( *priorityQueue) () int {
	return len(.normal) + len(.priority)
}

// Appends an RPC to the tail of the normal (non-priority) slice. RPCs stored
// here are only popped once the priority slice is empty.
func ( *priorityQueue) ( *RPC) {
	.normal = append(.normal, )
}

// Appends an RPC to the tail of the priority slice. RPCs stored here are
// popped before anything in the normal slice, in the order they were pushed.
func ( *priorityQueue) ( *RPC) {
	.priority = append(.priority, )
}

// Removes and returns the RPC at the head of the queue. The head of the
// priority slice is taken if that slice is non-empty; otherwise the head of
// the normal slice is taken. Returns nil when both slices are empty.
func ( *priorityQueue) () *RPC {
	var  *RPC

	if len(.priority) > 0 {
		 = .priority[0]
		// Clear the slot before re-slicing so the backing array no longer
		// holds a reference to the popped RPC.
		.priority[0] = nil
		.priority = .priority[1:]
	} else if len(.normal) > 0 {
		 = .normal[0]
		// Same as above: drop the backing array's reference to the popped RPC.
		.normal[0] = nil
		.normal = .normal[1:]
	}

	return 
}

// rpcQueue is a size-limited queue of RPCs built on priorityQueue. All state
// is guarded by queueMu, and blocking push/pop calls wait on the two
// condition variables below.
type rpcQueue struct {
	// dataAvailable is signalled after every successful push and waited on by
	// a pop that finds the queue empty. spaceAvailable is signalled after
	// every successful pop and waited on by a blocking push that finds the
	// queue full. newRpcQueue sets queueMu as the Locker of both.
	dataAvailable  sync.Cond
	spaceAvailable sync.Cond
	// Mutex used to access queue
	queueMu sync.Mutex
	queue   priorityQueue

	// closed is set to true by the close method and is never reset in this
	// file. maxSize is the queue length at which push treats the queue as
	// full.
	closed  bool
	maxSize int
}

// newRpcQueue returns an empty, open rpcQueue with maxSize initialised in the
// struct literal (presumably from the int argument — confirm). Both condition
// variables are bound to queueMu, so callers of Wait on either of them must
// hold queueMu.
func newRpcQueue( int) *rpcQueue {
	 := &rpcQueue{maxSize: }
	.dataAvailable.L = &.queueMu
	.spaceAvailable.L = &.queueMu
	return 
}

// Thin wrapper around push that passes false as push's first bool argument.
// NOTE(review): that argument is presumably the "urgent" flag, which would
// make this the normal-priority push, with the bool parameter forwarded as
// the blocking flag — confirm against push's parameter names.
func ( *rpcQueue) ( *RPC,  bool) error {
	return .push(, false, )
}

// Thin wrapper around push that passes true as push's first bool argument.
// NOTE(review): that argument is presumably the "urgent" flag, which would
// make this the priority push, with the bool parameter forwarded as the
// blocking flag — confirm against push's parameter names.
func ( *rpcQueue) ( *RPC,  bool) error {
	return .push(, true, )
}

// Shared implementation behind the two push wrappers. It holds queueMu for
// the whole call (Wait releases it while blocked).
//
// While the queue length equals maxSize, a blocking call waits on
// spaceAvailable and a non-blocking call returns ErrQueueFull. Once there is
// room, the RPC is appended to either the priority or the normal slice and
// one waiter on dataAvailable is signalled; the call then returns nil.
//
// Panics with ErrQueuePushOnClosed if the queue is closed on entry or is
// closed while the call is waiting for space.
//
// Because fullness is tested with ==, a maxSize of 0 makes every push see a
// full queue, and a negative maxSize never matches, leaving the queue
// unbounded.
//
// NOTE(review): the two bool parameters are presumably "urgent" followed by
// "block", matching the false/true literals the wrappers pass in the second
// position — confirm.
func ( *rpcQueue) ( *RPC,  bool,  bool) error {
	.queueMu.Lock()
	defer .queueMu.Unlock()

	if .closed {
		panic(ErrQueuePushOnClosed)
	}

	// Re-check fullness after every wake-up: another push may have taken the
	// freed slot before this goroutine reacquired the lock.
	for .queue.Len() == .maxSize {
		if  {
			.spaceAvailable.Wait()
			// It can receive a signal because the queue is closed.
			if .closed {
				panic(ErrQueuePushOnClosed)
			}
		} else {
			return ErrQueueFull
		}
	}
	if  {
		.queue.PriorityPush()
	} else {
		.queue.NormalPush()
	}

	// Wake one goroutine blocked in pop, if any.
	.dataAvailable.Signal()
	return nil
}

// Removes and returns the next RPC, blocking while the queue is empty. It
// holds queueMu for the whole call (Wait releases it while blocked).
//
// Returns ErrQueueClosed if the queue is closed on entry or is closed while
// the call is waiting, and ErrQueueCancelled if the context is done while the
// queue is still empty. The context is only consulted while the queue is
// empty, so a non-empty, open queue yields an item even when the context is
// already done. After a successful pop, one waiter on spaceAvailable is
// signalled.
//
// Note that, when the queue is empty and there are two blocked Pop calls, it
// doesn't mean that the first Pop will get the item from the next Push. The
// second Pop will probably get it instead.
func ( *rpcQueue) ( context.Context) (*RPC, error) {
	.queueMu.Lock()
	defer .queueMu.Unlock()

	if .closed {
		return nil, ErrQueueClosed
	}

	// sync.Cond cannot wait on a context directly, so register a callback
	// that wakes the waiters once the context is done. The deferred stop
	// function unregisters the callback when this call returns first.
	 := context.AfterFunc(, func() {
		// Wake up all the waiting routines. The only routine that corresponds
		// to this Pop call will return from the function. Note that this can
		// be expensive, if there are too many waiting routines.
		.dataAvailable.Broadcast()
	})
	defer ()

	for .queue.Len() == 0 {
		// Non-blocking cancellation check, run before every Wait, including
		// after being woken by the Broadcast above.
		select {
		case <-.Done():
			return nil, ErrQueueCancelled
		default:
		}
		.dataAvailable.Wait()
		// It can receive a signal because the queue is closed.
		if .closed {
			return nil, ErrQueueClosed
		}
	}
	 := .queue.Pop()
	// Wake one goroutine blocked in a blocking push, if any.
	.spaceAvailable.Signal()
	return , nil
}

// Marks the queue as closed and wakes every goroutine blocked on either
// condition variable. Woken pops return ErrQueueClosed; woken blocking pushes
// panic with ErrQueuePushOnClosed. Items still queued are left in place, and
// pop calls made after closing return ErrQueueClosed without draining them.
func ( *rpcQueue) () {
	.queueMu.Lock()
	defer .queueMu.Unlock()

	.closed = true
	.dataAvailable.Broadcast()
	.spaceAvailable.Broadcast()
}