package pubsub
import (
"context"
"errors"
"sync"
)
// Sentinel errors reported by rpcQueue operations.
var (
	// ErrQueueCancelled is returned by Pop when its context is done while
	// the queue is empty.
	ErrQueueCancelled = errors.New("rpc queue operation cancelled")

	// ErrQueueClosed is returned by Pop once the queue has been closed.
	ErrQueueClosed = errors.New("rpc queue closed")

	// ErrQueueFull is returned by a non-blocking push when the queue already
	// holds maxSize entries.
	ErrQueueFull = errors.New("rpc queue full")

	// ErrQueuePushOnClosed is the value push panics with when the queue is
	// closed.
	ErrQueuePushOnClosed = errors.New("push on closed rpc queue")
)
// priorityQueue holds RPCs in two FIFO lanes. It performs no locking of its
// own.
type priorityQueue struct {
	// normal and priority are the two lanes; each is appended to at the tail
	// and consumed from the head.
	normal, priority []*RPC
}
// Len reports the total number of RPCs held across both lanes.
func (q *priorityQueue) Len() int {
	total := len(q.priority)
	total += len(q.normal)
	return total
}
// NormalPush appends rpc to the tail of the normal lane.
func (q *priorityQueue) NormalPush(rpc *RPC) {
	lane := append(q.normal, rpc)
	q.normal = lane
}
// PriorityPush appends rpc to the tail of the priority lane.
func (q *priorityQueue) PriorityPush(rpc *RPC) {
	lane := append(q.priority, rpc)
	q.priority = lane
}
// Pop removes and returns the RPC at the head of the priority lane, or of the
// normal lane when the priority lane is empty. It returns nil when both lanes
// are empty.
func (q *priorityQueue) Pop() *RPC {
	// Pick the lane to serve: priority first, normal as the fallback.
	lane := &q.priority
	if len(*lane) == 0 {
		lane = &q.normal
	}
	if len(*lane) == 0 {
		return nil
	}

	head := (*lane)[0]
	// Clear the vacated slot so the backing array no longer references the
	// popped RPC.
	(*lane)[0] = nil
	*lane = (*lane)[1:]
	return head
}
// rpcQueue is a bounded queue of RPCs with blocking push and pop operations.
// All state is guarded by queueMu.
type rpcQueue struct {
	// dataAvailable is signalled after every successful push; spaceAvailable
	// is signalled after every successful Pop. Both are broadcast by Close.
	dataAvailable, spaceAvailable sync.Cond

	// queueMu guards queue and closed.
	queueMu sync.Mutex
	queue   priorityQueue
	closed  bool

	// maxSize is the queue length at which push considers the queue full.
	maxSize int
}
// newRpcQueue returns an empty, open rpcQueue that treats maxSize queued RPCs
// as full. Both condition variables are bound to the queue's own mutex.
func newRpcQueue(maxSize int) *rpcQueue {
	q := new(rpcQueue)
	q.maxSize = maxSize

	mu := &q.queueMu
	q.dataAvailable.L, q.spaceAvailable.L = mu, mu
	return q
}
// Push enqueues rpc on the normal lane. When the queue is full it waits for
// space if block is true and returns ErrQueueFull otherwise. It panics with
// ErrQueuePushOnClosed if the queue is closed.
func (q *rpcQueue) Push(rpc *RPC, block bool) error {
	const urgent = false
	return q.push(rpc, urgent, block)
}
// UrgentPush enqueues rpc on the priority lane, which Pop serves before the
// normal lane. When the queue is full it waits for space if block is true and
// returns ErrQueueFull otherwise. It panics with ErrQueuePushOnClosed if the
// queue is closed.
func (q *rpcQueue) UrgentPush(rpc *RPC, block bool) error {
	const urgent = true
	return q.push(rpc, urgent, block)
}
// push enqueues rpc on the priority lane when urgent is true and on the
// normal lane otherwise, then signals one goroutine waiting in Pop.
//
// While the queue holds exactly maxSize entries, a blocking push waits on
// spaceAvailable and a non-blocking push returns ErrQueueFull. push panics
// with ErrQueuePushOnClosed if the queue is closed on entry or is found
// closed after waking from a wait.
func (q *rpcQueue) push(rpc *RPC, urgent bool, block bool) error {
	q.queueMu.Lock()
	defer q.queueMu.Unlock()

	if q.closed {
		panic(ErrQueuePushOnClosed)
	}

	for q.queue.Len() == q.maxSize {
		if !block {
			return ErrQueueFull
		}
		q.spaceAvailable.Wait()
		// Close broadcasts on spaceAvailable, so re-check after every wakeup.
		if q.closed {
			panic(ErrQueuePushOnClosed)
		}
	}

	switch {
	case urgent:
		q.queue.PriorityPush(rpc)
	default:
		q.queue.NormalPush(rpc)
	}
	q.dataAvailable.Signal()
	return nil
}
// Pop removes and returns the next RPC, serving the priority lane before the
// normal lane. If the queue is empty it blocks until an RPC is pushed, the
// queue is closed, or ctx is done.
//
// It returns ErrQueueClosed if the queue is closed on entry or is found
// closed after a wakeup, and ErrQueueCancelled if ctx is done while the queue
// is empty. A successful Pop signals one goroutine waiting for space in push.
func (q *rpcQueue) Pop(ctx context.Context) (*RPC, error) {
	q.queueMu.Lock()
	defer q.queueMu.Unlock()

	if q.closed {
		return nil, ErrQueueClosed
	}

	// sync.Cond cannot wait on a context, so wake the waiters when ctx is
	// done. Broadcast is needed because Signal could wake a different waiter
	// than the one whose context ended; the others re-check and sleep again.
	unregisterAfterFunc := context.AfterFunc(ctx, func() {
		// Hold queueMu while broadcasting. Without it the Broadcast could
		// run after the ctx check below but before Wait has registered this
		// goroutine, and the wakeup would be lost, leaving Pop blocked on an
		// already-cancelled context.
		q.queueMu.Lock()
		defer q.queueMu.Unlock()
		q.dataAvailable.Broadcast()
	})
	defer unregisterAfterFunc()

	for q.queue.Len() == 0 {
		// Checked before every Wait, so a wakeup caused by cancellation is
		// noticed on the next iteration.
		if ctx.Err() != nil {
			return nil, ErrQueueCancelled
		}
		q.dataAvailable.Wait()
		if q.closed {
			return nil, ErrQueueClosed
		}
	}

	rpc := q.queue.Pop()
	q.spaceAvailable.Signal()
	return rpc, nil
}
// Close marks the queue as closed and wakes every goroutine blocked on either
// condition variable. A woken Pop returns ErrQueueClosed; a woken push panics
// with ErrQueuePushOnClosed.
func (q *rpcQueue) Close() {
	q.queueMu.Lock()
	defer q.queueMu.Unlock()

	q.closed = true
	// Both broadcasts happen under the lock, so waiters only observe the
	// closed flag after it has been set.
	q.spaceAvailable.Broadcast()
	q.dataAvailable.Broadcast()
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.