package nats
import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nats-io/nats.go/internal/parser"
"github.com/nats-io/nuid"
)
type JetStream interface {
Publish (subj string , data []byte , opts ...PubOpt ) (*PubAck , error )
PublishMsg (m *Msg , opts ...PubOpt ) (*PubAck , error )
PublishAsync (subj string , data []byte , opts ...PubOpt ) (PubAckFuture , error )
PublishMsgAsync (m *Msg , opts ...PubOpt ) (PubAckFuture , error )
PublishAsyncPending () int
PublishAsyncComplete () <-chan struct {}
CleanupPublisher ()
Subscribe (subj string , cb MsgHandler , opts ...SubOpt ) (*Subscription , error )
SubscribeSync (subj string , opts ...SubOpt ) (*Subscription , error )
ChanSubscribe (subj string , ch chan *Msg , opts ...SubOpt ) (*Subscription , error )
ChanQueueSubscribe (subj, queue string , ch chan *Msg , opts ...SubOpt ) (*Subscription , error )
QueueSubscribe (subj, queue string , cb MsgHandler , opts ...SubOpt ) (*Subscription , error )
QueueSubscribeSync (subj, queue string , opts ...SubOpt ) (*Subscription , error )
PullSubscribe (subj, durable string , opts ...SubOpt ) (*Subscription , error )
}
type JetStreamContext interface {
JetStream
JetStreamManager
KeyValueManager
ObjectStoreManager
}
const (
defaultAPIPrefix = "$JS.API."
jsDomainT = "$JS.%s.API."
jsExtDomainT = "$JS.%s.API"
apiAccountInfo = "INFO"
apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"
apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"
apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s"
apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"
apiConsumerInfoT = "CONSUMER.INFO.%s.%s"
apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"
apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"
apiConsumerListT = "CONSUMER.LIST.%s"
apiConsumerNamesT = "CONSUMER.NAMES.%s"
apiStreams = "STREAM.NAMES"
apiStreamCreateT = "STREAM.CREATE.%s"
apiStreamInfoT = "STREAM.INFO.%s"
apiStreamUpdateT = "STREAM.UPDATE.%s"
apiStreamDeleteT = "STREAM.DELETE.%s"
apiStreamPurgeT = "STREAM.PURGE.%s"
apiStreamListT = "STREAM.LIST"
apiMsgGetT = "STREAM.MSG.GET.%s"
apiDirectMsgGetT = "DIRECT.GET.%s"
apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
orderedHeartbeatsInterval = 5 * time .Second
hbcThresh = 2
chanSubFCCheckInterval = 250 * time .Millisecond
DefaultPubRetryWait = 250 * time .Millisecond
DefaultPubRetryAttempts = 2
defaultAsyncPubAckInflight = 4000
)
const (
jsCtrlHB = 1
jsCtrlFC = 2
)
type js struct {
nc *Conn
opts *jsOpts
mu sync .RWMutex
rpre string
rsub *Subscription
pafs map [string ]*pubAckFuture
stc chan struct {}
dch chan struct {}
rr *rand .Rand
connStatusCh chan (Status )
replyPrefix string
replyPrefixLen int
}
type jsOpts struct {
ctx context .Context
pre string
wait time .Duration
aecb MsgErrHandler
maxpa int
ackTimeout time .Duration
domain string
ctrace ClientTrace
shouldTrace bool
purgeOpts *StreamPurgeRequest
streamInfoOpts *StreamInfoRequest
streamListSubject string
directGet bool
directNextFor string
featureFlags featureFlags
}
const (
defaultRequestWait = 5 * time .Second
defaultAccountCheck = 20 * time .Second
)
func (nc *Conn ) JetStream (opts ...JSOpt ) (JetStreamContext , error ) {
js := &js {
nc : nc ,
opts : &jsOpts {
pre : defaultAPIPrefix ,
wait : defaultRequestWait ,
maxpa : defaultAsyncPubAckInflight ,
},
}
inboxPrefix := InboxPrefix
if js .nc .Opts .InboxPrefix != _EMPTY_ {
inboxPrefix = js .nc .Opts .InboxPrefix + "."
}
js .replyPrefix = inboxPrefix
js .replyPrefixLen = len (js .replyPrefix ) + aReplyTokensize + 1
for _ , opt := range opts {
if err := opt .configureJSContext (js .opts ); err != nil {
return nil , err
}
}
return js , nil
}
type JSOpt interface {
configureJSContext(opts *jsOpts ) error
}
type jsOptFn func (opts *jsOpts ) error
func (opt jsOptFn ) configureJSContext (opts *jsOpts ) error {
return opt (opts )
}
type featureFlags struct {
useDurableConsumerCreate bool
}
func UseLegacyDurableConsumers () JSOpt {
return jsOptFn (func (opts *jsOpts ) error {
opts .featureFlags .useDurableConsumerCreate = true
return nil
})
}
type ClientTrace struct {
RequestSent func (subj string , payload []byte )
ResponseReceived func (subj string , payload []byte , hdr Header )
}
func (ct ClientTrace ) configureJSContext (js *jsOpts ) error {
js .ctrace = ct
js .shouldTrace = true
return nil
}
func Domain (domain string ) JSOpt {
if domain == _EMPTY_ {
return APIPrefix (_EMPTY_ )
}
return jsOptFn (func (js *jsOpts ) error {
js .domain = domain
js .pre = fmt .Sprintf (jsDomainT , domain )
return nil
})
}
func (s *StreamPurgeRequest ) configureJSContext (js *jsOpts ) error {
js .purgeOpts = s
return nil
}
func (s *StreamInfoRequest ) configureJSContext (js *jsOpts ) error {
js .streamInfoOpts = s
return nil
}
func APIPrefix (pre string ) JSOpt {
return jsOptFn (func (js *jsOpts ) error {
if pre == _EMPTY_ {
return nil
}
js .pre = pre
if !strings .HasSuffix (js .pre , "." ) {
js .pre = js .pre + "."
}
return nil
})
}
func DirectGet () JSOpt {
return jsOptFn (func (js *jsOpts ) error {
js .directGet = true
return nil
})
}
func DirectGetNext (subject string ) JSOpt {
return jsOptFn (func (js *jsOpts ) error {
js .directGet = true
js .directNextFor = subject
return nil
})
}
func StreamListFilter (subject string ) JSOpt {
return jsOptFn (func (opts *jsOpts ) error {
opts .streamListSubject = subject
return nil
})
}
func (js *js ) apiSubj (subj string ) string {
if js .opts .pre == _EMPTY_ {
return subj
}
var b strings .Builder
b .WriteString (js .opts .pre )
b .WriteString (subj )
return b .String ()
}
type PubOpt interface {
configurePublish(opts *pubOpts ) error
}
type pubOptFn func (opts *pubOpts ) error
func (opt pubOptFn ) configurePublish (opts *pubOpts ) error {
return opt (opts )
}
type pubOpts struct {
ctx context .Context
ttl time .Duration
id string
lid string
str string
seq *uint64
lss *uint64
msgTTL time .Duration
rwait time .Duration
rnum int
stallWait time .Duration
pafRetry *pubAckFuture
}
type pubAckResponse struct {
apiResponse
*PubAck
}
type PubAck struct {
Stream string `json:"stream"`
Sequence uint64 `json:"seq"`
Duplicate bool `json:"duplicate,omitempty"`
Domain string `json:"domain,omitempty"`
}
const (
MsgIdHdr = "Nats-Msg-Id"
ExpectedStreamHdr = "Nats-Expected-Stream"
ExpectedLastSeqHdr = "Nats-Expected-Last-Sequence"
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
MsgTTLHdr = "Nats-TTL"
)
const (
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
JSSubject = "Nats-Subject"
JSLastSequence = "Nats-Last-Sequence"
)
const MsgSize = "Nats-Msg-Size"
const (
MsgRollupSubject = "sub"
MsgRollupAll = "all"
)
func (js *js ) PublishMsg (m *Msg , opts ...PubOpt ) (*PubAck , error ) {
var o = pubOpts {rwait : DefaultPubRetryWait , rnum : DefaultPubRetryAttempts }
if len (opts ) > 0 {
if m .Header == nil {
m .Header = Header {}
}
for _ , opt := range opts {
if err := opt .configurePublish (&o ); err != nil {
return nil , err
}
}
}
if o .ctx != nil && o .ttl != 0 {
return nil , ErrContextAndTimeout
}
if o .ttl == 0 && o .ctx == nil {
o .ttl = js .opts .wait
}
if o .stallWait > 0 {
return nil , errors .New ("nats: stall wait cannot be set to sync publish" )
}
if o .id != _EMPTY_ {
m .Header .Set (MsgIdHdr , o .id )
}
if o .lid != _EMPTY_ {
m .Header .Set (ExpectedLastMsgIdHdr , o .lid )
}
if o .str != _EMPTY_ {
m .Header .Set (ExpectedStreamHdr , o .str )
}
if o .seq != nil {
m .Header .Set (ExpectedLastSeqHdr , strconv .FormatUint (*o .seq , 10 ))
}
if o .lss != nil {
m .Header .Set (ExpectedLastSubjSeqHdr , strconv .FormatUint (*o .lss , 10 ))
}
if o .msgTTL > 0 {
m .Header .Set (MsgTTLHdr , o .msgTTL .String ())
}
var resp *Msg
var err error
if o .ttl > 0 {
resp , err = js .nc .RequestMsg (m , time .Duration (o .ttl ))
} else {
resp , err = js .nc .RequestMsgWithContext (o .ctx , m )
}
if err != nil {
for r , ttl := 0 , o .ttl ; errors .Is (err , ErrNoResponders ) && (r < o .rnum || o .rnum < 0 ); r ++ {
if o .ctx != nil {
select {
case <- o .ctx .Done ():
case <- time .After (o .rwait ):
}
} else {
time .Sleep (o .rwait )
}
if o .ttl > 0 {
ttl -= o .rwait
if ttl <= 0 {
err = ErrTimeout
break
}
resp , err = js .nc .RequestMsg (m , time .Duration (ttl ))
} else {
resp , err = js .nc .RequestMsgWithContext (o .ctx , m )
}
}
if err != nil {
if errors .Is (err , ErrNoResponders ) {
err = ErrNoStreamResponse
}
return nil , err
}
}
var pa pubAckResponse
if err := json .Unmarshal (resp .Data , &pa ); err != nil {
return nil , ErrInvalidJSAck
}
if pa .Error != nil {
return nil , pa .Error
}
if pa .PubAck == nil || pa .PubAck .Stream == _EMPTY_ {
return nil , ErrInvalidJSAck
}
return pa .PubAck , nil
}
func (js *js ) Publish (subj string , data []byte , opts ...PubOpt ) (*PubAck , error ) {
return js .PublishMsg (&Msg {Subject : subj , Data : data }, opts ...)
}
type PubAckFuture interface {
Ok () <-chan *PubAck
Err () <-chan error
Msg () *Msg
}
type pubAckFuture struct {
js *js
msg *Msg
pa *PubAck
st time .Time
err error
errCh chan error
doneCh chan *PubAck
retries int
maxRetries int
retryWait time .Duration
reply string
timeout *time .Timer
}
func (paf *pubAckFuture ) Ok () <-chan *PubAck {
paf .js .mu .Lock ()
defer paf .js .mu .Unlock ()
if paf .doneCh == nil {
paf .doneCh = make (chan *PubAck , 1 )
if paf .pa != nil {
paf .doneCh <- paf .pa
}
}
return paf .doneCh
}
func (paf *pubAckFuture ) Err () <-chan error {
paf .js .mu .Lock ()
defer paf .js .mu .Unlock ()
if paf .errCh == nil {
paf .errCh = make (chan error , 1 )
if paf .err != nil {
paf .errCh <- paf .err
}
}
return paf .errCh
}
func (paf *pubAckFuture ) Msg () *Msg {
paf .js .mu .RLock ()
defer paf .js .mu .RUnlock ()
return paf .msg
}
const aReplyTokensize = 6
func (js *js ) newAsyncReply () string {
js .mu .Lock ()
if js .rsub == nil {
sha := sha256 .New ()
sha .Write ([]byte (nuid .Next ()))
b := sha .Sum (nil )
for i := 0 ; i < aReplyTokensize ; i ++ {
b [i ] = rdigits [int (b [i ]%base )]
}
js .rpre = fmt .Sprintf ("%s%s." , js .replyPrefix , b [:aReplyTokensize ])
sub , err := js .nc .Subscribe (fmt .Sprintf ("%s*" , js .rpre ), js .handleAsyncReply )
if err != nil {
js .mu .Unlock ()
return _EMPTY_
}
js .rsub = sub
js .rr = rand .New (rand .NewSource (time .Now ().UnixNano ()))
}
if js .connStatusCh == nil {
js .connStatusCh = js .nc .StatusChanged (RECONNECTING , CLOSED )
go js .resetPendingAcksOnReconnect ()
}
var sb strings .Builder
sb .WriteString (js .rpre )
for {
rn := js .rr .Int63 ()
var b [aReplyTokensize ]byte
for i , l := 0 , rn ; i < len (b ); i ++ {
b [i ] = rdigits [l %base ]
l /= base
}
if _ , ok := js .pafs [string (b [:])]; ok {
continue
}
sb .Write (b [:])
break
}
js .mu .Unlock ()
return sb .String ()
}
func (js *js ) resetPendingAcksOnReconnect () {
js .mu .Lock ()
connStatusCh := js .connStatusCh
js .mu .Unlock ()
for {
newStatus , ok := <-connStatusCh
if !ok || newStatus == CLOSED {
return
}
js .mu .Lock ()
errCb := js .opts .aecb
for id , paf := range js .pafs {
paf .err = ErrDisconnected
if paf .errCh != nil {
paf .errCh <- paf .err
}
if errCb != nil {
defer errCb (js , paf .msg , ErrDisconnected )
}
delete (js .pafs , id )
}
if js .dch != nil {
close (js .dch )
js .dch = nil
}
js .mu .Unlock ()
}
}
func (js *js ) CleanupPublisher () {
js .cleanupReplySub ()
js .mu .Lock ()
errCb := js .opts .aecb
for id , paf := range js .pafs {
paf .err = ErrJetStreamPublisherClosed
if paf .errCh != nil {
paf .errCh <- paf .err
}
if errCb != nil {
defer errCb (js , paf .msg , ErrJetStreamPublisherClosed )
}
delete (js .pafs , id )
}
if js .dch != nil {
close (js .dch )
js .dch = nil
}
js .mu .Unlock ()
}
func (js *js ) cleanupReplySub () {
js .mu .Lock ()
if js .rsub != nil {
js .rsub .Unsubscribe ()
js .rsub = nil
}
if js .connStatusCh != nil {
close (js .connStatusCh )
js .connStatusCh = nil
}
js .mu .Unlock ()
}
func (js *js ) registerPAF (id string , paf *pubAckFuture ) (int , int ) {
js .mu .Lock ()
if js .pafs == nil {
js .pafs = make (map [string ]*pubAckFuture )
}
paf .js = js
js .pafs [id ] = paf
np := len (js .pafs )
maxpa := js .opts .maxpa
js .mu .Unlock ()
return np , maxpa
}
func (js *js ) getPAF (id string ) *pubAckFuture {
if js .pafs == nil {
return nil
}
return js .pafs [id ]
}
func (js *js ) clearPAF (id string ) {
js .mu .Lock ()
delete (js .pafs , id )
js .mu .Unlock ()
}
func (js *js ) PublishAsyncPending () int {
js .mu .RLock ()
defer js .mu .RUnlock ()
return len (js .pafs )
}
func (js *js ) asyncStall () <-chan struct {} {
js .mu .Lock ()
if js .stc == nil {
js .stc = make (chan struct {})
}
stc := js .stc
js .mu .Unlock ()
return stc
}
func (js *js ) handleAsyncReply (m *Msg ) {
if len (m .Subject ) <= js .replyPrefixLen {
return
}
id := m .Subject [js .replyPrefixLen :]
js .mu .Lock ()
paf := js .getPAF (id )
if paf == nil {
js .mu .Unlock ()
return
}
closeStc := func () {
if js .stc != nil && len (js .pafs ) < js .opts .maxpa {
close (js .stc )
js .stc = nil
}
}
closeDchFn := func () func () {
var dch chan struct {}
if js .dch != nil && len (js .pafs ) == 0 {
dch = js .dch
js .dch = nil
}
return func () {
if dch != nil {
close (dch )
}
}
}
doErr := func (err error ) {
paf .err = err
if paf .errCh != nil {
paf .errCh <- paf .err
}
cb := js .opts .aecb
js .mu .Unlock ()
if cb != nil {
cb (paf .js , paf .msg , err )
}
}
if paf .timeout != nil {
paf .timeout .Stop ()
}
if len (m .Data ) == 0 && m .Header .Get (statusHdr ) == noResponders {
if paf .retries < paf .maxRetries {
paf .retries ++
time .AfterFunc (paf .retryWait , func () {
js .mu .Lock ()
paf := js .getPAF (id )
js .mu .Unlock ()
if paf == nil {
return
}
_ , err := js .PublishMsgAsync (paf .msg , pubOptFn (func (po *pubOpts ) error {
po .pafRetry = paf
return nil
}))
if err != nil {
js .mu .Lock ()
doErr (err )
}
})
js .mu .Unlock ()
return
}
delete (js .pafs , id )
closeStc ()
defer closeDchFn ()()
doErr (ErrNoResponders )
return
}
delete (js .pafs , id )
closeStc ()
defer closeDchFn ()()
var pa pubAckResponse
if err := json .Unmarshal (m .Data , &pa ); err != nil {
doErr (ErrInvalidJSAck )
return
}
if pa .Error != nil {
doErr (pa .Error )
return
}
if pa .PubAck == nil || pa .PubAck .Stream == _EMPTY_ {
doErr (ErrInvalidJSAck )
return
}
paf .pa = pa .PubAck
if paf .doneCh != nil {
paf .doneCh <- paf .pa
}
js .mu .Unlock ()
}
type MsgErrHandler func (JetStream , *Msg , error )
func PublishAsyncErrHandler (cb MsgErrHandler ) JSOpt {
return jsOptFn (func (js *jsOpts ) error {
js .aecb = cb
return nil
})
}
func PublishAsyncMaxPending (max int ) JSOpt {
return jsOptFn (func (js *jsOpts ) error {
if max < 1 {
return errors .New ("nats: max ack pending should be >= 1" )
}
js .maxpa = max
return nil
})
}
func PublishAsyncTimeout (dur time .Duration ) JSOpt {
return jsOptFn (func (opts *jsOpts ) error {
opts .ackTimeout = dur
return nil
})
}
func (js *js ) PublishAsync (subj string , data []byte , opts ...PubOpt ) (PubAckFuture , error ) {
return js .PublishMsgAsync (&Msg {Subject : subj , Data : data }, opts ...)
}
const defaultStallWait = 200 * time .Millisecond
func (js *js ) PublishMsgAsync (m *Msg , opts ...PubOpt ) (PubAckFuture , error ) {
var o pubOpts
if len (opts ) > 0 {
if m .Header == nil {
m .Header = Header {}
}
for _ , opt := range opts {
if err := opt .configurePublish (&o ); err != nil {
return nil , err
}
}
}
if o .rnum < 0 {
return nil , fmt .Errorf ("%w: retry attempts cannot be negative" , ErrInvalidArg )
}
if o .ttl != 0 || o .ctx != nil {
return nil , ErrContextAndTimeout
}
stallWait := defaultStallWait
if o .stallWait > 0 {
stallWait = o .stallWait
}
if o .id != _EMPTY_ {
m .Header .Set (MsgIdHdr , o .id )
}
if o .lid != _EMPTY_ {
m .Header .Set (ExpectedLastMsgIdHdr , o .lid )
}
if o .str != _EMPTY_ {
m .Header .Set (ExpectedStreamHdr , o .str )
}
if o .seq != nil {
m .Header .Set (ExpectedLastSeqHdr , strconv .FormatUint (*o .seq , 10 ))
}
if o .lss != nil {
m .Header .Set (ExpectedLastSubjSeqHdr , strconv .FormatUint (*o .lss , 10 ))
}
if o .msgTTL > 0 {
m .Header .Set (MsgTTLHdr , o .msgTTL .String ())
}
paf := o .pafRetry
if paf == nil && m .Reply != _EMPTY_ {
return nil , errors .New ("nats: reply subject should be empty" )
}
var id string
var reply string
if paf == nil {
reply = js .newAsyncReply ()
if reply == _EMPTY_ {
return nil , errors .New ("nats: error creating async reply handler" )
}
id = reply [js .replyPrefixLen :]
paf = &pubAckFuture {msg : m , st : time .Now (), maxRetries : o .rnum , retryWait : o .rwait , reply : reply }
numPending , maxPending := js .registerPAF (id , paf )
if maxPending > 0 && numPending > maxPending {
select {
case <- js .asyncStall ():
case <- time .After (stallWait ):
js .clearPAF (id )
return nil , ErrTooManyStalledMsgs
}
}
if js .opts .ackTimeout > 0 {
paf .timeout = time .AfterFunc (js .opts .ackTimeout , func () {
js .mu .Lock ()
defer js .mu .Unlock ()
if _ , ok := js .pafs [id ]; !ok {
return
}
delete (js .pafs , id )
if js .stc != nil && len (js .pafs ) < js .opts .maxpa {
close (js .stc )
js .stc = nil
}
paf .err = ErrAsyncPublishTimeout
if paf .errCh != nil {
paf .errCh <- paf .err
}
if js .opts .aecb != nil {
js .opts .aecb (js , paf .msg , ErrAsyncPublishTimeout )
}
if js .dch != nil && len (js .pafs ) == 0 {
close (js .dch )
js .dch = nil
}
})
}
} else {
reply = paf .reply
if paf .timeout != nil {
paf .timeout .Reset (js .opts .ackTimeout )
}
id = reply [js .replyPrefixLen :]
}
hdr , err := m .headerBytes ()
if err != nil {
return nil , err
}
if err := js .nc .publish (m .Subject , reply , false , hdr , m .Data ); err != nil {
js .clearPAF (id )
return nil , err
}
return paf , nil
}
func (js *js ) PublishAsyncComplete () <-chan struct {} {
js .mu .Lock ()
defer js .mu .Unlock ()
if js .dch == nil {
js .dch = make (chan struct {})
}
dch := js .dch
if len (js .pafs ) == 0 {
close (js .dch )
js .dch = nil
}
return dch
}
func MsgId (id string ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
opts .id = id
return nil
})
}
func ExpectStream (stream string ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
opts .str = stream
return nil
})
}
func ExpectLastSequence (seq uint64 ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
opts .seq = &seq
return nil
})
}
func ExpectLastSequencePerSubject (seq uint64 ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
opts .lss = &seq
return nil
})
}
func ExpectLastMsgId (id string ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
opts .lid = id
return nil
})
}
func RetryWait (dur time .Duration ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
opts .rwait = dur
return nil
})
}
func RetryAttempts (num int ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
opts .rnum = num
return nil
})
}
func StallWait (ttl time .Duration ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
if ttl <= 0 {
return errors .New ("nats: stall wait should be more than 0" )
}
opts .stallWait = ttl
return nil
})
}
func MsgTTL (dur time .Duration ) PubOpt {
return pubOptFn (func (opts *pubOpts ) error {
opts .msgTTL = dur
return nil
})
}
type ackOpts struct {
ttl time .Duration
ctx context .Context
nakDelay time .Duration
}
type AckOpt interface {
configureAck(opts *ackOpts ) error
}
type MaxWait time .Duration
func (ttl MaxWait ) configureJSContext (js *jsOpts ) error {
js .wait = time .Duration (ttl )
return nil
}
func (ttl MaxWait ) configurePull (opts *pullOpts ) error {
opts .ttl = time .Duration (ttl )
return nil
}
type AckWait time .Duration
func (ttl AckWait ) configurePublish (opts *pubOpts ) error {
opts .ttl = time .Duration (ttl )
return nil
}
func (ttl AckWait ) configureSubscribe (opts *subOpts ) error {
opts .cfg .AckWait = time .Duration (ttl )
return nil
}
func (ttl AckWait ) configureAck (opts *ackOpts ) error {
opts .ttl = time .Duration (ttl )
return nil
}
type ContextOpt struct {
context .Context
}
func (ctx ContextOpt ) configureJSContext (opts *jsOpts ) error {
opts .ctx = ctx
return nil
}
func (ctx ContextOpt ) configurePublish (opts *pubOpts ) error {
opts .ctx = ctx
return nil
}
func (ctx ContextOpt ) configureSubscribe (opts *subOpts ) error {
opts .ctx = ctx
return nil
}
func (ctx ContextOpt ) configurePull (opts *pullOpts ) error {
opts .ctx = ctx
return nil
}
func (ctx ContextOpt ) configureAck (opts *ackOpts ) error {
opts .ctx = ctx
return nil
}
func Context (ctx context .Context ) ContextOpt {
return ContextOpt {ctx }
}
type nakDelay time .Duration
func (d nakDelay ) configureAck (opts *ackOpts ) error {
opts .nakDelay = time .Duration (d )
return nil
}
type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time .Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time .Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time .Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
FilterSubjects []string `json:"filter_subjects,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"`
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time .Duration `json:"idle_heartbeat,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
MaxRequestBatch int `json:"max_batch,omitempty"`
MaxRequestExpires time .Duration `json:"max_expires,omitempty"`
MaxRequestMaxBytes int `json:"max_bytes,omitempty"`
DeliverSubject string `json:"deliver_subject,omitempty"`
DeliverGroup string `json:"deliver_group,omitempty"`
InactiveThreshold time .Duration `json:"inactive_threshold,omitempty"`
Replicas int `json:"num_replicas"`
MemoryStorage bool `json:"mem_storage,omitempty"`
Metadata map [string ]string `json:"metadata,omitempty"`
}
type ConsumerInfo struct {
Stream string `json:"stream_name"`
Name string `json:"name"`
Created time .Time `json:"created"`
Config ConsumerConfig `json:"config"`
Delivered SequenceInfo `json:"delivered"`
AckFloor SequenceInfo `json:"ack_floor"`
NumAckPending int `json:"num_ack_pending"`
NumRedelivered int `json:"num_redelivered"`
NumWaiting int `json:"num_waiting"`
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
}
type SequenceInfo struct {
Consumer uint64 `json:"consumer_seq"`
Stream uint64 `json:"stream_seq"`
Last *time .Time `json:"last_active,omitempty"`
}
type SequencePair struct {
Consumer uint64 `json:"consumer_seq"`
Stream uint64 `json:"stream_seq"`
}
type nextRequest struct {
Expires time .Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
Heartbeat time .Duration `json:"idle_heartbeat,omitempty"`
}
type jsSub struct {
js *js
nms string
psubj string
consumer string
stream string
deliver string
pull bool
dc bool
ackNone bool
pending uint64
ordered bool
dseq uint64
sseq uint64
ccreq *createConsumerRequest
hbc *time .Timer
hbi time .Duration
active bool
cmeta string
fcr string
fcd uint64
fciseq uint64
csfct *time .Timer
ctx context .Context
cancel func ()
}
func (sub *Subscription ) deleteConsumer () error {
sub .mu .Lock ()
jsi := sub .jsi
if jsi == nil {
sub .mu .Unlock ()
return nil
}
if jsi .stream == _EMPTY_ || jsi .consumer == _EMPTY_ {
sub .mu .Unlock ()
return nil
}
stream , consumer := jsi .stream , jsi .consumer
js := jsi .js
sub .mu .Unlock ()
return js .DeleteConsumer (stream , consumer )
}
type SubOpt interface {
configureSubscribe(opts *subOpts ) error
}
type subOptFn func (opts *subOpts ) error
func (opt subOptFn ) configureSubscribe (opts *subOpts ) error {
return opt (opts )
}
func (js *js ) Subscribe (subj string , cb MsgHandler , opts ...SubOpt ) (*Subscription , error ) {
if cb == nil {
return nil , ErrBadSubscription
}
return js .subscribe (subj , _EMPTY_ , cb , nil , false , false , opts )
}
func (js *js ) SubscribeSync (subj string , opts ...SubOpt ) (*Subscription , error ) {
mch := make (chan *Msg , js .nc .Opts .SubChanLen )
return js .subscribe (subj , _EMPTY_ , nil , mch , true , false , opts )
}
func (js *js ) QueueSubscribe (subj , queue string , cb MsgHandler , opts ...SubOpt ) (*Subscription , error ) {
if cb == nil {
return nil , ErrBadSubscription
}
return js .subscribe (subj , queue , cb , nil , false , false , opts )
}
func (js *js ) QueueSubscribeSync (subj , queue string , opts ...SubOpt ) (*Subscription , error ) {
mch := make (chan *Msg , js .nc .Opts .SubChanLen )
return js .subscribe (subj , queue , nil , mch , true , false , opts )
}
func (js *js ) ChanSubscribe (subj string , ch chan *Msg , opts ...SubOpt ) (*Subscription , error ) {
return js .subscribe (subj , _EMPTY_ , nil , ch , false , false , opts )
}
func (js *js ) ChanQueueSubscribe (subj , queue string , ch chan *Msg , opts ...SubOpt ) (*Subscription , error ) {
return js .subscribe (subj , queue , nil , ch , false , false , opts )
}
func (js *js ) PullSubscribe (subj , durable string , opts ...SubOpt ) (*Subscription , error ) {
mch := make (chan *Msg , js .nc .Opts .SubChanLen )
if durable != "" {
opts = append (opts , Durable (durable ))
}
return js .subscribe (subj , _EMPTY_ , nil , mch , true , true , opts )
}
func processConsInfo(info *ConsumerInfo , userCfg *ConsumerConfig , isPullMode bool , subj , queue string ) (string , error ) {
ccfg := &info .Config
if ccfg .FilterSubject != _EMPTY_ && subj != ccfg .FilterSubject {
return _EMPTY_ , ErrSubjectMismatch
}
if isPullMode && ccfg .DeliverSubject != _EMPTY_ {
return _EMPTY_ , ErrPullSubscribeToPushConsumer
} else if !isPullMode && ccfg .DeliverSubject == _EMPTY_ {
return _EMPTY_ , ErrPullSubscribeRequired
}
if isPullMode {
return _EMPTY_ , checkConfig (ccfg , userCfg )
}
dg := info .Config .DeliverGroup
if dg == _EMPTY_ {
if queue != _EMPTY_ {
return _EMPTY_ , errors .New ("cannot create a queue subscription for a consumer without a deliver group" )
} else if info .PushBound {
return _EMPTY_ , errors .New ("consumer is already bound to a subscription" )
}
} else {
if queue == _EMPTY_ {
return _EMPTY_ , fmt .Errorf ("cannot create a subscription for a consumer with a deliver group %q" , dg )
} else if queue != dg {
return _EMPTY_ , fmt .Errorf ("cannot create a queue subscription %q for a consumer with a deliver group %q" ,
queue , dg )
}
}
if err := checkConfig (ccfg , userCfg ); err != nil {
return _EMPTY_ , err
}
return ccfg .DeliverSubject , nil
}
func checkConfig(s , u *ConsumerConfig ) error {
makeErr := func (fieldName string , usrVal , srvVal any ) error {
return fmt .Errorf ("nats: configuration requests %s to be %v, but consumer's value is %v" , fieldName , usrVal , srvVal )
}
if u .Durable != _EMPTY_ && u .Durable != s .Durable {
return makeErr ("durable" , u .Durable , s .Durable )
}
if u .Description != _EMPTY_ && u .Description != s .Description {
return makeErr ("description" , u .Description , s .Description )
}
if u .DeliverPolicy != deliverPolicyNotSet && u .DeliverPolicy != s .DeliverPolicy {
return makeErr ("deliver policy" , u .DeliverPolicy , s .DeliverPolicy )
}
if u .OptStartSeq > 0 && u .OptStartSeq != s .OptStartSeq {
return makeErr ("optional start sequence" , u .OptStartSeq , s .OptStartSeq )
}
if u .OptStartTime != nil && !u .OptStartTime .IsZero () && !(*u .OptStartTime ).Equal (*s .OptStartTime ) {
return makeErr ("optional start time" , u .OptStartTime , s .OptStartTime )
}
if u .AckPolicy != ackPolicyNotSet && u .AckPolicy != s .AckPolicy {
return makeErr ("ack policy" , u .AckPolicy , s .AckPolicy )
}
if u .AckWait > 0 && u .AckWait != s .AckWait {
return makeErr ("ack wait" , u .AckWait , s .AckWait )
}
if u .MaxDeliver > 0 && u .MaxDeliver != s .MaxDeliver {
return makeErr ("max deliver" , u .MaxDeliver , s .MaxDeliver )
}
if u .ReplayPolicy != replayPolicyNotSet && u .ReplayPolicy != s .ReplayPolicy {
return makeErr ("replay policy" , u .ReplayPolicy , s .ReplayPolicy )
}
if u .RateLimit > 0 && u .RateLimit != s .RateLimit {
return makeErr ("rate limit" , u .RateLimit , s .RateLimit )
}
if u .SampleFrequency != _EMPTY_ && u .SampleFrequency != s .SampleFrequency {
return makeErr ("sample frequency" , u .SampleFrequency , s .SampleFrequency )
}
if u .MaxWaiting > 0 && u .MaxWaiting != s .MaxWaiting {
return makeErr ("max waiting" , u .MaxWaiting , s .MaxWaiting )
}
if u .MaxAckPending > 0 && u .MaxAckPending != s .MaxAckPending {
return makeErr ("max ack pending" , u .MaxAckPending , s .MaxAckPending )
}
if u .FlowControl && !s .FlowControl {
return makeErr ("flow control" , u .FlowControl , s .FlowControl )
}
if u .Heartbeat > 0 && u .Heartbeat != s .Heartbeat {
return makeErr ("heartbeat" , u .Heartbeat , s .Heartbeat )
}
if u .Replicas > 0 && u .Replicas != s .Replicas {
return makeErr ("replicas" , u .Replicas , s .Replicas )
}
if u .MemoryStorage && !s .MemoryStorage {
return makeErr ("memory storage" , u .MemoryStorage , s .MemoryStorage )
}
return nil
}
func (js *js ) subscribe (subj , queue string , cb MsgHandler , ch chan *Msg , isSync , isPullMode bool , opts []SubOpt ) (*Subscription , error ) {
cfg := ConsumerConfig {
DeliverPolicy : deliverPolicyNotSet ,
AckPolicy : ackPolicyNotSet ,
ReplayPolicy : replayPolicyNotSet ,
}
o := subOpts {cfg : &cfg }
if len (opts ) > 0 {
for _ , opt := range opts {
if opt == nil {
continue
}
if err := opt .configureSubscribe (&o ); err != nil {
return nil , err
}
}
}
if subj == _EMPTY_ && o .stream == _EMPTY_ {
return nil , errors .New ("nats: subject required" )
}
hasHeartbeats := o .cfg .Heartbeat > 0
hasFC := o .cfg .FlowControl
if isPullMode {
if o .cfg .DeliverSubject != _EMPTY_ {
return nil , ErrPullSubscribeToPushConsumer
}
}
if queue != _EMPTY_ {
if o .cfg .Heartbeat > 0 || o .cfg .FlowControl {
return nil , errors .New ("nats: queue subscription doesn't support idle heartbeat nor flow control" )
}
if o .consumer == _EMPTY_ && o .cfg .Durable == _EMPTY_ {
if err := checkConsumerName (queue ); err != nil {
return nil , err
}
o .cfg .Durable = queue
}
}
var (
err error
shouldCreate bool
info *ConsumerInfo
deliver string
stream = o .stream
consumer = o .consumer
isDurable = o .cfg .Durable != _EMPTY_
consumerBound = o .bound
ctx = o .ctx
skipCInfo = o .skipCInfo
notFoundErr bool
lookupErr bool
nc = js .nc
nms string
hbi time .Duration
ccreq *createConsumerRequest
maxap int
)
if o .ordered {
if isDurable {
return nil , errors .New ("nats: durable can not be set for an ordered consumer" )
}
if o .cfg .AckPolicy != ackPolicyNotSet {
return nil , errors .New ("nats: ack policy can not be set for an ordered consumer" )
}
if o .cfg .MaxDeliver != 1 && o .cfg .MaxDeliver != 0 {
return nil , errors .New ("nats: max deliver can not be set for an ordered consumer" )
}
if o .cfg .DeliverSubject != _EMPTY_ {
return nil , errors .New ("nats: deliver subject can not be set for an ordered consumer" )
}
if queue != _EMPTY_ {
return nil , errors .New ("nats: queues not be set for an ordered consumer" )
}
if consumer != _EMPTY_ {
return nil , errors .New ("nats: can not bind existing consumer for an ordered consumer" )
}
if isPullMode {
return nil , errors .New ("nats: can not use pull mode for an ordered consumer" )
}
o .cfg .FlowControl = true
o .cfg .AckPolicy = AckNonePolicy
o .cfg .MaxDeliver = 1
o .cfg .AckWait = 22 * time .Hour
o .cfg .Replicas = 1
o .cfg .MemoryStorage = true
if !hasHeartbeats {
o .cfg .Heartbeat = orderedHeartbeatsInterval
}
hasFC , hasHeartbeats = true , true
o .mack = true
hbi = o .cfg .Heartbeat
}
if consumer == _EMPTY_ {
consumer = o .cfg .Durable
}
if stream == _EMPTY_ {
stream , err = js .StreamNameBySubject (subj )
if err != nil {
return nil , err
}
}
if consumer != _EMPTY_ && !o .skipCInfo {
info , err = js .ConsumerInfo (stream , consumer )
notFoundErr = errors .Is (err , ErrConsumerNotFound )
lookupErr = err == ErrJetStreamNotEnabled || errors .Is (err , ErrTimeout ) || errors .Is (err , context .DeadlineExceeded )
}
switch {
case info != nil :
deliver , err = processConsInfo (info , o .cfg , isPullMode , subj , queue )
if err != nil {
return nil , err
}
icfg := &info .Config
hasFC , hbi = icfg .FlowControl , icfg .Heartbeat
hasHeartbeats = hbi > 0
maxap = icfg .MaxAckPending
case (err != nil && !notFoundErr ) || (notFoundErr && consumerBound ):
if !(isPullMode && lookupErr && consumerBound ) {
return nil , err
}
case skipCInfo :
hasFC , hbi = o .cfg .FlowControl , o .cfg .Heartbeat
hasHeartbeats = hbi > 0
maxap = o .cfg .MaxAckPending
deliver = o .cfg .DeliverSubject
if consumerBound {
break
}
fallthrough
default :
shouldCreate = true
if o .cfg .DeliverSubject != _EMPTY_ {
deliver = o .cfg .DeliverSubject
} else if !isPullMode {
deliver = nc .NewInbox ()
cfg .DeliverSubject = deliver
}
cfg .FilterSubject = subj
if queue != _EMPTY_ {
cfg .DeliverGroup = queue
}
if cfg .DeliverPolicy == deliverPolicyNotSet {
cfg .DeliverPolicy = DeliverAllPolicy
}
if cfg .AckPolicy == ackPolicyNotSet {
cfg .AckPolicy = AckExplicitPolicy
}
if cfg .ReplayPolicy == replayPolicyNotSet {
cfg .ReplayPolicy = ReplayInstantPolicy
}
if cfg .MaxAckPending == 0 && ch != nil && cfg .AckPolicy != AckNonePolicy {
cfg .MaxAckPending = cap (ch )
}
ccreq = &createConsumerRequest {
Stream : stream ,
Config : &cfg ,
}
hbi = cfg .Heartbeat
}
if isPullMode {
nms = fmt .Sprintf (js .apiSubj (apiRequestNextT ), stream , consumer )
deliver = nc .NewInbox ()
deliver += ".*"
}
var cancel func ()
if ctx != nil {
ctx , cancel = context .WithCancel (ctx )
}
jsi := &jsSub {
js : js ,
stream : stream ,
consumer : consumer ,
deliver : deliver ,
hbi : hbi ,
ordered : o .ordered ,
ccreq : ccreq ,
dseq : 1 ,
pull : isPullMode ,
nms : nms ,
psubj : subj ,
cancel : cancel ,
ackNone : o .cfg .AckPolicy == AckNonePolicy ,
ctx : o .ctx ,
}
if cb != nil && !o .mack && o .cfg .AckPolicy != AckNonePolicy {
ocb := cb
cb = func (m *Msg ) { ocb (m ); m .Ack () }
}
sub , err := nc .subscribe (deliver , queue , cb , ch , nil , isSync , jsi )
if err != nil {
return nil , err
}
cleanUpSub := func () {
if sub != nil {
sub .mu .Lock ()
sub .jsi = nil
sub .mu .Unlock ()
sub .Unsubscribe ()
}
}
consName := o .cfg .Name
if shouldCreate {
if cfg .Durable != "" {
consName = cfg .Durable
} else if consName == "" {
consName = getHash (nuid .Next ())
}
var info *ConsumerInfo
if o .ctx != nil {
info , err = js .upsertConsumer (stream , consName , ccreq .Config , Context (o .ctx ))
} else {
info , err = js .upsertConsumer (stream , consName , ccreq .Config )
}
if err != nil {
var apiErr *APIError
if ok := errors .As (err , &apiErr ); !ok {
cleanUpSub ()
return nil , err
}
if consumer == _EMPTY_ ||
(apiErr .ErrorCode != JSErrCodeConsumerAlreadyExists && apiErr .ErrorCode != JSErrCodeConsumerNameExists ) {
cleanUpSub ()
if errors .Is (apiErr , ErrStreamNotFound ) {
return nil , ErrStreamNotFound
}
return nil , err
}
if !isPullMode {
cleanUpSub ()
}
info , err = js .ConsumerInfo (stream , consumer )
if err != nil {
return nil , err
}
deliver , err = processConsInfo (info , o .cfg , isPullMode , subj , queue )
if err != nil {
return nil , err
}
if !isPullMode {
if isSync {
ch = make (chan *Msg , cap (ch ))
} else if ch != nil {
for done := false ; !done ; {
select {
case <- ch :
default :
done = true
}
}
}
jsi .deliver = deliver
jsi .hbi = info .Config .Heartbeat
sub , err = nc .subscribe (jsi .deliver , queue , cb , ch , nil , isSync , jsi )
if err != nil {
return nil , err
}
hasFC = info .Config .FlowControl
hasHeartbeats = info .Config .Heartbeat > 0
}
} else {
sub .mu .Lock ()
sub .jsi .dc = true
sub .jsi .pending = info .NumPending + info .Delivered .Consumer
if consumer == _EMPTY_ {
sub .jsi .consumer = info .Name
if isPullMode {
sub .jsi .nms = fmt .Sprintf (js .apiSubj (apiRequestNextT ), stream , info .Name )
}
}
sub .mu .Unlock ()
}
maxap = info .Config .MaxAckPending
}
if maxap > DefaultSubPendingMsgsLimit {
bl := maxap * 1024 * 1024
if bl < DefaultSubPendingBytesLimit {
bl = DefaultSubPendingBytesLimit
}
if err := sub .SetPendingLimits (maxap , bl ); err != nil {
return nil , err
}
}
if hasHeartbeats {
sub .scheduleHeartbeatCheck ()
}
if sub .Type () == ChanSubscription && hasFC {
sub .chanSubcheckForFlowControlResponse ()
}
if ctx != nil {
go func () {
<-ctx .Done ()
sub .Unsubscribe ()
}()
}
return sub , nil
}
func (sub *Subscription ) InitialConsumerPending () (uint64 , error ) {
sub .mu .Lock ()
defer sub .mu .Unlock ()
if sub .jsi == nil || sub .jsi .consumer == _EMPTY_ {
return 0 , fmt .Errorf ("%w: not a JetStream subscription" , ErrTypeSubscription )
}
return sub .jsi .pending , nil
}
func (sub *Subscription ) chanSubcheckForFlowControlResponse () {
sub .mu .Lock ()
if sub .closed {
sub .mu .Unlock ()
return
}
var fcReply string
var nc *Conn
jsi := sub .jsi
if jsi .csfct == nil {
jsi .csfct = time .AfterFunc (chanSubFCCheckInterval , sub .chanSubcheckForFlowControlResponse )
} else {
fcReply = sub .checkForFlowControlResponse ()
nc = sub .conn
jsi .csfct .Reset (chanSubFCCheckInterval )
}
sub .mu .Unlock ()
nc .Publish (fcReply , nil )
}
type ErrConsumerSequenceMismatch struct {
StreamResumeSequence uint64
ConsumerSequence uint64
LastConsumerSequence uint64
}
func (ecs *ErrConsumerSequenceMismatch ) Error () string {
return fmt .Sprintf ("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d" ,
ecs .ConsumerSequence ,
ecs .LastConsumerSequence -ecs .ConsumerSequence ,
ecs .StreamResumeSequence ,
)
}
func isJSControlMessage(msg *Msg ) (bool , int ) {
if len (msg .Data ) > 0 || msg .Header .Get (statusHdr ) != controlMsg {
return false , 0
}
val := msg .Header .Get (descrHdr )
if strings .HasPrefix (val , "Idle" ) {
return true , jsCtrlHB
}
if strings .HasPrefix (val , "Flow" ) {
return true , jsCtrlFC
}
return true , 0
}
func (sub *Subscription ) trackSequences (reply string ) {
sub .jsi .fciseq ++
sub .jsi .cmeta = reply
}
func (sub *Subscription ) checkOrderedMsgs (m *Msg ) bool {
if m .Reply == _EMPTY_ {
return false
}
tokens , err := parser .GetMetadataFields (m .Reply )
if err != nil {
return false
}
sseq , dseq := parser .ParseNum (tokens [parser .AckStreamSeqTokenPos ]), parser .ParseNum (tokens [parser .AckConsumerSeqTokenPos ])
jsi := sub .jsi
if dseq != jsi .dseq {
sub .resetOrderedConsumer (jsi .sseq + 1 )
return true
}
jsi .dseq , jsi .sseq = dseq +1 , sseq
return false
}
func (sub *Subscription ) applyNewSID () (osid int64 ) {
nc := sub .conn
sub .mu .Unlock ()
nc .subsMu .Lock ()
osid = sub .sid
delete (nc .subs , osid )
nc .ssid ++
nsid := nc .ssid
nc .subs [nsid ] = sub
nc .subsMu .Unlock ()
sub .mu .Lock ()
sub .sid = nsid
return osid
}
func (sub *Subscription ) resetOrderedConsumer (sseq uint64 ) {
nc := sub .conn
if sub .jsi == nil || nc == nil || sub .closed {
return
}
var maxStr string
if sub .max > 0 {
if sub .jsi .fciseq < sub .max {
adjustedMax := sub .max - sub .jsi .fciseq
maxStr = strconv .Itoa (int (adjustedMax ))
} else {
go func (sid int64 ) {
nc .mu .Lock ()
nc .bw .appendString (fmt .Sprintf (unsubProto , sid , _EMPTY_ ))
nc .kickFlusher ()
nc .mu .Unlock ()
}(sub .sid )
return
}
}
osid := sub .applyNewSID ()
newDeliver := nc .NewInbox ()
sub .Subject = newDeliver
nsid := sub .sid
go func () {
nc .mu .Lock ()
nc .bw .appendString (fmt .Sprintf (unsubProto , osid , _EMPTY_ ))
nc .bw .appendString (fmt .Sprintf (subProto , newDeliver , _EMPTY_ , nsid ))
if maxStr != _EMPTY_ {
nc .bw .appendString (fmt .Sprintf (unsubProto , nsid , maxStr ))
}
nc .kickFlusher ()
nc .mu .Unlock ()
pushErr := func (err error ) {
nc .handleConsumerSequenceMismatch (sub , fmt .Errorf ("%w: recreating ordered consumer" , err ))
nc .unsubscribe (sub , 0 , true )
}
sub .mu .Lock ()
jsi := sub .jsi
jsi .dseq = 1
jsi .cmeta = _EMPTY_
jsi .fcr , jsi .fcd = _EMPTY_ , 0
jsi .deliver = newDeliver
cfg := jsi .ccreq .Config
cfg .DeliverSubject = newDeliver
cfg .DeliverPolicy = DeliverByStartSequencePolicy
cfg .OptStartSeq = sseq
cfg .OptStartTime = nil
js := jsi .js
sub .mu .Unlock ()
sub .mu .Lock ()
if jsi .consumer != _EMPTY_ {
go js .DeleteConsumer (jsi .stream , jsi .consumer )
}
jsi .consumer = ""
sub .mu .Unlock ()
consName := getHash (nuid .Next ())
var cinfo *ConsumerInfo
var err error
if js .opts .ctx != nil {
cinfo , err = js .upsertConsumer (jsi .stream , consName , cfg , Context (js .opts .ctx ))
} else {
cinfo , err = js .upsertConsumer (jsi .stream , consName , cfg )
}
if err != nil {
var apiErr *APIError
if errors .Is (err , ErrJetStreamNotEnabled ) || errors .Is (err , ErrTimeout ) || errors .Is (err , context .DeadlineExceeded ) {
return
} else if errors .As (err , &apiErr ) && apiErr .ErrorCode == JSErrCodeInsufficientResourcesErr {
return
} else if errors .As (err , &apiErr ) && apiErr .ErrorCode == JSErrCodeJetStreamNotAvailable {
return
}
pushErr (err )
return
}
sub .mu .Lock ()
jsi .consumer = cinfo .Name
sub .mu .Unlock ()
}()
}
func (sub *Subscription ) getJSDelivered () uint64 {
if sub .typ == ChanSubscription {
return sub .jsi .fciseq - uint64 (len (sub .mch ))
}
return sub .delivered
}
func (sub *Subscription ) checkForFlowControlResponse () string {
jsi := sub .jsi
jsi .active = true
if sub .getJSDelivered () >= jsi .fcd {
fcr := jsi .fcr
jsi .fcr , jsi .fcd = _EMPTY_ , 0
return fcr
}
return _EMPTY_
}
func (sub *Subscription ) scheduleFlowControlResponse (reply string ) {
sub .jsi .fcr , sub .jsi .fcd = reply , sub .jsi .fciseq
}
func (sub *Subscription ) activityCheck () {
sub .mu .Lock ()
jsi := sub .jsi
if jsi == nil || sub .closed {
sub .mu .Unlock ()
return
}
active := jsi .active
jsi .hbc .Reset (jsi .hbi * hbcThresh )
jsi .active = false
nc := sub .conn
sub .mu .Unlock ()
if !active {
if !jsi .ordered || nc .Status () != CONNECTED {
nc .mu .Lock ()
if errCB := nc .Opts .AsyncErrorCB ; errCB != nil {
nc .ach .push (func () { errCB (nc , sub , ErrConsumerNotActive ) })
}
nc .mu .Unlock ()
return
}
sub .mu .Lock ()
sub .resetOrderedConsumer (jsi .sseq + 1 )
sub .mu .Unlock ()
}
}
func (sub *Subscription ) scheduleHeartbeatCheck () {
sub .mu .Lock ()
defer sub .mu .Unlock ()
jsi := sub .jsi
if jsi == nil {
return
}
if jsi .hbc == nil {
jsi .hbc = time .AfterFunc (jsi .hbi *hbcThresh , sub .activityCheck )
} else {
jsi .hbc .Reset (jsi .hbi * hbcThresh )
}
}
func (nc *Conn ) handleConsumerSequenceMismatch (sub *Subscription , err error ) {
nc .mu .Lock ()
errCB := nc .Opts .AsyncErrorCB
if errCB != nil {
nc .ach .push (func () { errCB (nc , sub , err ) })
}
nc .mu .Unlock ()
}
func (nc *Conn ) checkForSequenceMismatch (msg *Msg , s *Subscription , jsi *jsSub ) {
s .mu .Lock ()
ctrl , ordered := jsi .cmeta , jsi .ordered
jsi .active = true
s .mu .Unlock ()
if ctrl == _EMPTY_ {
return
}
tokens , err := parser .GetMetadataFields (ctrl )
if err != nil {
return
}
var ldseq string
dseq := tokens [parser .AckConsumerSeqTokenPos ]
hdr := msg .Header [lastConsumerSeqHdr ]
if len (hdr ) == 1 {
ldseq = hdr [0 ]
}
if ldseq != dseq {
sseq := parser .ParseNum (tokens [parser .AckStreamSeqTokenPos ])
if ordered {
s .mu .Lock ()
s .resetOrderedConsumer (jsi .sseq + 1 )
s .mu .Unlock ()
} else {
ecs := &ErrConsumerSequenceMismatch {
StreamResumeSequence : uint64 (sseq ),
ConsumerSequence : parser .ParseNum (dseq ),
LastConsumerSequence : parser .ParseNum (ldseq ),
}
nc .handleConsumerSequenceMismatch (s , ecs )
}
}
}
type streamRequest struct {
Subject string `json:"subject,omitempty"`
}
type streamNamesResponse struct {
apiResponse
apiPaged
Streams []string `json:"streams"`
}
type subOpts struct {
stream, consumer string
cfg *ConsumerConfig
bound bool
mack bool
ordered bool
ctx context .Context
skipCInfo bool
}
func SkipConsumerLookup () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .skipCInfo = true
return nil
})
}
func OrderedConsumer () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .ordered = true
return nil
})
}
func ManualAck () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .mack = true
return nil
})
}
func Description (description string ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .Description = description
return nil
})
}
func Durable (consumer string ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
if opts .cfg .Durable != _EMPTY_ {
return errors .New ("nats: option Durable set more than once" )
}
if opts .consumer != _EMPTY_ && opts .consumer != consumer {
return fmt .Errorf ("nats: duplicate consumer names (%s and %s)" , opts .consumer , consumer )
}
if err := checkConsumerName (consumer ); err != nil {
return err
}
opts .cfg .Durable = consumer
return nil
})
}
func DeliverAll () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .DeliverPolicy = DeliverAllPolicy
return nil
})
}
func DeliverLast () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .DeliverPolicy = DeliverLastPolicy
return nil
})
}
func DeliverLastPerSubject () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .DeliverPolicy = DeliverLastPerSubjectPolicy
return nil
})
}
func DeliverNew () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .DeliverPolicy = DeliverNewPolicy
return nil
})
}
func StartSequence (seq uint64 ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .DeliverPolicy = DeliverByStartSequencePolicy
opts .cfg .OptStartSeq = seq
return nil
})
}
func StartTime (startTime time .Time ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .DeliverPolicy = DeliverByStartTimePolicy
opts .cfg .OptStartTime = &startTime
return nil
})
}
func AckNone () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .AckPolicy = AckNonePolicy
return nil
})
}
func AckAll () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .AckPolicy = AckAllPolicy
return nil
})
}
func AckExplicit () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .AckPolicy = AckExplicitPolicy
return nil
})
}
func MaxDeliver (n int ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .MaxDeliver = n
return nil
})
}
func MaxAckPending (n int ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .MaxAckPending = n
return nil
})
}
func ReplayOriginal () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .ReplayPolicy = ReplayOriginalPolicy
return nil
})
}
func ReplayInstant () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .ReplayPolicy = ReplayInstantPolicy
return nil
})
}
func RateLimit (n uint64 ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .RateLimit = n
return nil
})
}
func BackOff (backOff []time .Duration ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .BackOff = backOff
return nil
})
}
func BindStream (stream string ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
if opts .stream != _EMPTY_ && opts .stream != stream {
return fmt .Errorf ("nats: duplicate stream name (%s and %s)" , opts .stream , stream )
}
opts .stream = stream
return nil
})
}
func Bind (stream , consumer string ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
if stream == _EMPTY_ {
return ErrStreamNameRequired
}
if consumer == _EMPTY_ {
return ErrConsumerNameRequired
}
if opts .cfg .Durable != _EMPTY_ && opts .cfg .Durable != consumer {
return fmt .Errorf ("nats: duplicate consumer names (%s and %s)" , opts .cfg .Durable , consumer )
}
if opts .stream != _EMPTY_ && opts .stream != stream {
return fmt .Errorf ("nats: duplicate stream name (%s and %s)" , opts .stream , stream )
}
opts .stream = stream
opts .consumer = consumer
opts .bound = true
return nil
})
}
func EnableFlowControl () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .FlowControl = true
return nil
})
}
func IdleHeartbeat (duration time .Duration ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .Heartbeat = duration
return nil
})
}
func DeliverSubject (subject string ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .DeliverSubject = subject
return nil
})
}
func HeadersOnly () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .HeadersOnly = true
return nil
})
}
func MaxRequestBatch (max int ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .MaxRequestBatch = max
return nil
})
}
func MaxRequestExpires (max time .Duration ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .MaxRequestExpires = max
return nil
})
}
func MaxRequestMaxBytes (bytes int ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .MaxRequestMaxBytes = bytes
return nil
})
}
func InactiveThreshold (threshold time .Duration ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
if threshold < 0 {
return fmt .Errorf ("invalid InactiveThreshold value (%v), needs to be greater or equal to 0" , threshold )
}
opts .cfg .InactiveThreshold = threshold
return nil
})
}
func ConsumerReplicas (replicas int ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
if replicas < 1 {
return fmt .Errorf ("invalid ConsumerReplicas value (%v), needs to be greater than 0" , replicas )
}
opts .cfg .Replicas = replicas
return nil
})
}
func ConsumerMemoryStorage () SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .MemoryStorage = true
return nil
})
}
func ConsumerName (name string ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .Name = name
return nil
})
}
func ConsumerFilterSubjects (subjects ...string ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .FilterSubjects = subjects
return nil
})
}
func (sub *Subscription ) ConsumerInfo () (*ConsumerInfo , error ) {
sub .mu .Lock ()
if sub .jsi == nil || sub .jsi .consumer == _EMPTY_ {
if sub .jsi .ordered {
sub .mu .Unlock ()
return nil , ErrConsumerInfoOnOrderedReset
}
sub .mu .Unlock ()
return nil , ErrTypeSubscription
}
js := sub .jsi .js
stream , consumer := sub .jsi .stream , sub .jsi .consumer
sub .mu .Unlock ()
return js .getConsumerInfo (stream , consumer )
}
type pullOpts struct {
maxBytes int
ttl time .Duration
ctx context .Context
hb time .Duration
}
type PullOpt interface {
configurePull(opts *pullOpts ) error
}
func PullMaxWaiting (n int ) SubOpt {
return subOptFn (func (opts *subOpts ) error {
opts .cfg .MaxWaiting = n
return nil
})
}
type PullHeartbeat time .Duration
func (h PullHeartbeat ) configurePull (opts *pullOpts ) error {
if h <= 0 {
return fmt .Errorf ("%w: idle heartbeat has to be greater than 0" , ErrInvalidArg )
}
opts .hb = time .Duration (h )
return nil
}
type PullMaxBytes int
func (n PullMaxBytes ) configurePull (opts *pullOpts ) error {
opts .maxBytes = int (n )
return nil
}
var (
errNoMessages = errors .New ("nats: no messages" )
errRequestsPending = errors .New ("nats: requests pending" )
)
func checkMsg(msg *Msg , checkSts , isNoWait bool ) (usrMsg bool , err error ) {
usrMsg = true
if len (msg .Data ) > 0 || len (msg .Header ) == 0 {
return
}
val := msg .Header .Get (statusHdr )
if val == _EMPTY_ {
return
}
usrMsg = false
if !checkSts {
return
}
if isHb , _ := isJSControlMessage (msg ); isHb {
return
}
switch val {
case noResponders :
err = ErrNoResponders
case noMessagesSts :
err = errNoMessages
case reqTimeoutSts :
if isNoWait {
err = errRequestsPending
} else {
err = ErrTimeout
}
case jetStream409Sts :
if strings .Contains (strings .ToLower (msg .Header .Get (descrHdr )), "consumer deleted" ) {
err = ErrConsumerDeleted
break
}
if strings .Contains (strings .ToLower (msg .Header .Get (descrHdr )), "leadership change" ) {
err = ErrConsumerLeadershipChanged
break
}
fallthrough
default :
err = fmt .Errorf ("nats: %s" , msg .Header .Get (descrHdr ))
}
return
}
func (sub *Subscription ) Fetch (batch int , opts ...PullOpt ) ([]*Msg , error ) {
if sub == nil {
return nil , ErrBadSubscription
}
if batch < 1 {
return nil , ErrInvalidArg
}
var o pullOpts
for _ , opt := range opts {
if err := opt .configurePull (&o ); err != nil {
return nil , err
}
}
if o .ctx != nil && o .ttl != 0 {
return nil , ErrContextAndTimeout
}
sub .mu .Lock ()
jsi := sub .jsi
if jsi == nil || !jsi .pull {
sub .mu .Unlock ()
return nil , ErrTypeSubscription
}
nc := sub .conn
nms := sub .jsi .nms
rply , _ := newFetchInbox (jsi .deliver )
js := sub .jsi .js
pmc := len (sub .mch ) > 0
ttl := o .ttl
if ttl == 0 {
ttl = js .opts .wait
}
sub .mu .Unlock ()
var (
ctx = o .ctx
err error
cancel context .CancelFunc
)
if ctx == nil {
ctx , cancel = context .WithTimeout (context .Background (), ttl )
} else if _ , hasDeadline := ctx .Deadline (); !hasDeadline {
if octx , ok := ctx .(ContextOpt ); ok && octx .Context == context .Background () {
return nil , ErrNoDeadlineContext
}
ctx , cancel = context .WithTimeout (ctx , ttl )
} else {
ctx , cancel = context .WithCancel (ctx )
}
defer cancel ()
if o .hb > 0 {
deadline , _ := ctx .Deadline ()
if 2 *o .hb >= time .Until (deadline ) {
return nil , fmt .Errorf ("%w: idle heartbeat value too large" , ErrInvalidArg )
}
}
select {
case <- ctx .Done ():
if o .ctx != nil {
err = ctx .Err ()
} else {
err = ErrTimeout
}
default :
}
if err != nil {
return nil , err
}
var (
msgs = make ([]*Msg , 0 , batch )
msg *Msg
)
for pmc && len (msgs ) < batch {
msg , err = sub .nextMsgWithContext (ctx , true , false )
if err != nil {
if errors .Is (err , errNoMessages ) {
err = nil
}
break
}
if usrMsg , _ := checkMsg (msg , false , false ); usrMsg {
msgs = append (msgs , msg )
}
}
var hbTimer *time .Timer
defer func () {
if hbTimer != nil {
hbTimer .Stop ()
}
}()
var hbErr error
sub .mu .Lock ()
subClosed := sub .closed || sub .draining
sub .mu .Unlock ()
if subClosed {
err = errors .Join (ErrBadSubscription , ErrSubscriptionClosed )
}
hbLock := sync .Mutex {}
var disconnected atomic .Bool
if err == nil && len (msgs ) < batch && !subClosed {
noWait := batch -len (msgs ) > 1
var nr nextRequest
sendReq := func () error {
deadline , _ := ctx .Deadline ()
ttl = time .Until (deadline )
select {
case <- ctx .Done ():
return ctx .Err ()
default :
}
expiresDiff := time .Duration (float64 (ttl ) * 0.1 )
if expiresDiff > 5 *time .Second {
expiresDiff = 5 * time .Second
}
expires := ttl - expiresDiff
nr .Batch = batch - len (msgs )
nr .Expires = expires
nr .NoWait = noWait
nr .MaxBytes = o .maxBytes
if 2 *o .hb < expires {
nr .Heartbeat = o .hb
} else {
nr .Heartbeat = 0
}
req , _ := json .Marshal (nr )
if err := nc .PublishRequest (nms , rply , req ); err != nil {
return err
}
if o .hb > 0 {
if hbTimer == nil {
hbTimer = time .AfterFunc (2 *o .hb , func () {
hbLock .Lock ()
hbErr = ErrNoHeartbeat
hbLock .Unlock ()
cancel ()
})
} else {
hbTimer .Reset (2 * o .hb )
}
}
return nil
}
connStatusChanged := nc .StatusChanged ()
go func () {
select {
case <- ctx .Done ():
case <- connStatusChanged :
disconnected .Store (true )
cancel ()
}
nc .RemoveStatusListener (connStatusChanged )
}()
err = sendReq ()
for err == nil && len (msgs ) < batch {
msg , err = sub .nextMsgWithContext (ctx , true , true )
if err == nil {
if hbTimer != nil {
hbTimer .Reset (2 * o .hb )
}
var usrMsg bool
usrMsg , err = checkMsg (msg , true , noWait )
if err == nil && usrMsg {
msgs = append (msgs , msg )
} else if noWait && (errors .Is (err , errNoMessages ) || errors .Is (err , errRequestsPending )) && len (msgs ) == 0 {
noWait = false
err = sendReq ()
} else if errors .Is (err , ErrTimeout ) && len (msgs ) == 0 {
err = nil
}
}
}
}
if err != nil && len (msgs ) == 0 {
hbLock .Lock ()
defer hbLock .Unlock ()
if hbErr != nil {
return nil , hbErr
}
if disconnected .Load () {
return nil , ErrFetchDisconnected
}
return nil , o .checkCtxErr (err )
}
return msgs , nil
}
func newFetchInbox(subj string ) (string , string ) {
if !strings .HasSuffix (subj , ".*" ) {
return subj , ""
}
reqID := nuid .Next ()
var sb strings .Builder
sb .WriteString (subj [:len (subj )-1 ])
sb .WriteString (reqID )
return sb .String (), reqID
}
func subjectMatchesReqID(subject , reqID string ) bool {
subjectParts := strings .Split (subject , "." )
if len (subjectParts ) < 2 {
return false
}
return subjectParts [len (subjectParts )-1 ] == reqID
}
type MessageBatch interface {
Messages () <-chan *Msg
Error () error
Done () <-chan struct {}
}
type messageBatch struct {
sync .Mutex
msgs chan *Msg
err error
done chan struct {}
}
func (mb *messageBatch ) Messages () <-chan *Msg {
mb .Lock ()
defer mb .Unlock ()
return mb .msgs
}
func (mb *messageBatch ) Error () error {
mb .Lock ()
defer mb .Unlock ()
return mb .err
}
func (mb *messageBatch ) Done () <-chan struct {} {
mb .Lock ()
defer mb .Unlock ()
return mb .done
}
func (sub *Subscription ) FetchBatch (batch int , opts ...PullOpt ) (MessageBatch , error ) {
if sub == nil {
return nil , ErrBadSubscription
}
if batch < 1 {
return nil , ErrInvalidArg
}
var o pullOpts
for _ , opt := range opts {
if err := opt .configurePull (&o ); err != nil {
return nil , err
}
}
if o .ctx != nil && o .ttl != 0 {
return nil , ErrContextAndTimeout
}
sub .mu .Lock ()
jsi := sub .jsi
if jsi == nil || !jsi .pull {
sub .mu .Unlock ()
return nil , ErrTypeSubscription
}
nc := sub .conn
nms := sub .jsi .nms
rply , reqID := newFetchInbox (sub .jsi .deliver )
js := sub .jsi .js
pmc := len (sub .mch ) > 0
ttl := o .ttl
if ttl == 0 {
ttl = js .opts .wait
}
sub .mu .Unlock ()
var (
ctx = o .ctx
cancel context .CancelFunc
cancelContext = true
)
if ctx == nil {
ctx , cancel = context .WithTimeout (context .Background (), ttl )
} else if _ , hasDeadline := ctx .Deadline (); !hasDeadline {
if octx , ok := ctx .(ContextOpt ); ok && octx .Context == context .Background () {
return nil , ErrNoDeadlineContext
}
ctx , cancel = context .WithTimeout (ctx , ttl )
} else {
ctx , cancel = context .WithCancel (ctx )
}
defer func () {
if cancelContext {
cancel ()
}
}()
if o .hb > 0 {
deadline , _ := ctx .Deadline ()
if 2 *o .hb >= time .Until (deadline ) {
return nil , fmt .Errorf ("%w: idle heartbeat value too large" , ErrInvalidArg )
}
}
select {
case <- ctx .Done ():
if o .ctx != nil {
return nil , ctx .Err ()
} else {
return nil , ErrTimeout
}
default :
}
result := &messageBatch {
msgs : make (chan *Msg , batch ),
done : make (chan struct {}, 1 ),
}
var msg *Msg
for pmc && len (result .msgs ) < batch {
msg , err := sub .nextMsgWithContext (ctx , true , false )
if err != nil {
if errors .Is (err , errNoMessages ) {
err = nil
}
result .err = err
break
}
if usrMsg , _ := checkMsg (msg , false , false ); usrMsg {
result .msgs <- msg
}
}
sub .mu .Lock ()
subClosed := sub .closed || sub .draining
sub .mu .Unlock ()
if len (result .msgs ) == batch || result .err != nil || subClosed {
close (result .msgs )
if subClosed && len (result .msgs ) == 0 {
return nil , errors .Join (ErrBadSubscription , ErrSubscriptionClosed )
}
result .done <- struct {}{}
return result , nil
}
deadline , _ := ctx .Deadline ()
ttl = time .Until (deadline )
expiresDiff := time .Duration (float64 (ttl ) * 0.1 )
if expiresDiff > 5 *time .Second {
expiresDiff = 5 * time .Second
}
expires := ttl - expiresDiff
connStatusChanged := nc .StatusChanged ()
var disconnected atomic .Bool
go func () {
select {
case <- ctx .Done ():
case <- connStatusChanged :
disconnected .Store (true )
cancel ()
}
nc .RemoveStatusListener (connStatusChanged )
}()
requestBatch := batch - len (result .msgs )
req := nextRequest {
Expires : expires ,
Batch : requestBatch ,
MaxBytes : o .maxBytes ,
Heartbeat : o .hb ,
}
reqJSON , err := json .Marshal (req )
if err != nil {
close (result .msgs )
result .done <- struct {}{}
result .err = err
return result , nil
}
if err := nc .PublishRequest (nms , rply , reqJSON ); err != nil {
if len (result .msgs ) == 0 {
return nil , err
}
close (result .msgs )
result .done <- struct {}{}
result .err = err
return result , nil
}
var hbTimer *time .Timer
defer func () {
if hbTimer != nil {
hbTimer .Stop ()
}
}()
var hbErr error
if o .hb > 0 {
hbTimer = time .AfterFunc (2 *o .hb , func () {
result .Lock ()
hbErr = ErrNoHeartbeat
result .Unlock ()
cancel ()
})
}
cancelContext = false
go func () {
defer cancel ()
var requestMsgs int
for requestMsgs < requestBatch {
msg , err = sub .nextMsgWithContext (ctx , true , true )
if err != nil {
break
}
if hbTimer != nil {
hbTimer .Reset (2 * o .hb )
}
var usrMsg bool
usrMsg , err = checkMsg (msg , true , false )
if err != nil {
if errors .Is (err , ErrTimeout ) {
if reqID != "" && !subjectMatchesReqID (msg .Subject , reqID ) {
continue
}
err = nil
}
break
}
if usrMsg {
result .Lock ()
result .msgs <- msg
result .Unlock ()
requestMsgs ++
}
}
if err != nil {
result .Lock ()
if hbErr != nil {
result .err = hbErr
} else if disconnected .Load () {
result .err = ErrFetchDisconnected
} else {
result .err = o .checkCtxErr (err )
}
result .Unlock ()
}
close (result .msgs )
result .Lock ()
result .done <- struct {}{}
result .Unlock ()
}()
return result , nil
}
func (o *pullOpts ) checkCtxErr (err error ) error {
if o .ctx == nil && errors .Is (err , context .DeadlineExceeded ) {
return ErrTimeout
}
return err
}
func (js *js ) getConsumerInfo (stream , consumer string ) (*ConsumerInfo , error ) {
ctx , cancel := context .WithTimeout (context .Background (), js .opts .wait )
defer cancel ()
return js .getConsumerInfoContext (ctx , stream , consumer )
}
func (js *js ) getConsumerInfoContext (ctx context .Context , stream , consumer string ) (*ConsumerInfo , error ) {
ccInfoSubj := fmt .Sprintf (apiConsumerInfoT , stream , consumer )
resp , err := js .apiRequestWithContext (ctx , js .apiSubj (ccInfoSubj ), nil )
if err != nil {
if errors .Is (err , ErrNoResponders ) {
err = ErrJetStreamNotEnabled
}
return nil , err
}
var info consumerResponse
if err := json .Unmarshal (resp .Data , &info ); err != nil {
return nil , err
}
if info .Error != nil {
if errors .Is (info .Error , ErrConsumerNotFound ) {
return nil , ErrConsumerNotFound
}
if errors .Is (info .Error , ErrStreamNotFound ) {
return nil , ErrStreamNotFound
}
return nil , info .Error
}
if info .Error == nil && info .ConsumerInfo == nil {
return nil , ErrConsumerNotFound
}
return info .ConsumerInfo , nil
}
func (js *js ) apiRequestWithContext (ctx context .Context , subj string , data []byte ) (*Msg , error ) {
if js .opts .shouldTrace {
ctrace := js .opts .ctrace
if ctrace .RequestSent != nil {
ctrace .RequestSent (subj , data )
}
}
resp , err := js .nc .RequestWithContext (ctx , subj , data )
if err != nil {
return nil , err
}
if js .opts .shouldTrace {
ctrace := js .opts .ctrace
if ctrace .ResponseReceived != nil {
ctrace .ResponseReceived (subj , resp .Data , resp .Header )
}
}
return resp , nil
}
func (m *Msg ) checkReply () error {
if m == nil || m .Sub == nil {
return ErrMsgNotBound
}
if m .Reply == _EMPTY_ {
return ErrMsgNoReply
}
return nil
}
func (m *Msg ) ackReply (ackType []byte , sync bool , opts ...AckOpt ) error {
var o ackOpts
for _ , opt := range opts {
if err := opt .configureAck (&o ); err != nil {
return err
}
}
if err := m .checkReply (); err != nil {
return err
}
var ackNone bool
var js *js
sub := m .Sub
sub .mu .Lock ()
nc := sub .conn
if jsi := sub .jsi ; jsi != nil {
js = jsi .js
ackNone = jsi .ackNone
}
sub .mu .Unlock ()
if atomic .LoadUint32 (&m .ackd ) == 1 {
return ErrMsgAlreadyAckd
}
if ackNone {
return ErrCantAckIfConsumerAckNone
}
usesCtx := o .ctx != nil
usesWait := o .ttl > 0
if usesWait && usesCtx {
return ErrContextAndTimeout
}
sync = sync || usesCtx || usesWait
ctx := o .ctx
wait := defaultRequestWait
if usesWait {
wait = o .ttl
} else if js != nil {
wait = js .opts .wait
}
var body []byte
var err error
if o .nakDelay > 0 {
body = []byte (fmt .Sprintf ("%s {\"delay\": %d}" , ackType , o .nakDelay .Nanoseconds ()))
} else {
body = ackType
}
if sync {
if usesCtx {
_, err = nc .RequestWithContext (ctx , m .Reply , body )
} else {
_, err = nc .Request (m .Reply , body , wait )
}
} else {
err = nc .Publish (m .Reply , body )
}
if err == nil && !bytes .Equal (ackType , ackProgress ) {
atomic .StoreUint32 (&m .ackd , 1 )
}
return err
}
func (m *Msg ) Ack (opts ...AckOpt ) error {
return m .ackReply (ackAck , false , opts ...)
}
func (m *Msg ) AckSync (opts ...AckOpt ) error {
return m .ackReply (ackAck , true , opts ...)
}
func (m *Msg ) Nak (opts ...AckOpt ) error {
return m .ackReply (ackNak , false , opts ...)
}
func (m *Msg ) NakWithDelay (delay time .Duration , opts ...AckOpt ) error {
if delay > 0 {
opts = append (opts , nakDelay (delay ))
}
return m .ackReply (ackNak , false , opts ...)
}
func (m *Msg ) Term (opts ...AckOpt ) error {
return m .ackReply (ackTerm , false , opts ...)
}
func (m *Msg ) InProgress (opts ...AckOpt ) error {
return m .ackReply (ackProgress , false , opts ...)
}
type MsgMetadata struct {
Sequence SequencePair
NumDelivered uint64
NumPending uint64
Timestamp time .Time
Stream string
Consumer string
Domain string
}
func (m *Msg ) Metadata () (*MsgMetadata , error ) {
if err := m .checkReply (); err != nil {
return nil , err
}
tokens , err := parser .GetMetadataFields (m .Reply )
if err != nil {
return nil , err
}
meta := &MsgMetadata {
Domain : tokens [parser .AckDomainTokenPos ],
NumDelivered : parser .ParseNum (tokens [parser .AckNumDeliveredTokenPos ]),
NumPending : parser .ParseNum (tokens [parser .AckNumPendingTokenPos ]),
Timestamp : time .Unix (0 , int64 (parser .ParseNum (tokens [parser .AckTimestampSeqTokenPos ]))),
Stream : tokens [parser .AckStreamTokenPos ],
Consumer : tokens [parser .AckConsumerTokenPos ],
}
meta .Sequence .Stream = parser .ParseNum (tokens [parser .AckStreamSeqTokenPos ])
meta .Sequence .Consumer = parser .ParseNum (tokens [parser .AckConsumerSeqTokenPos ])
return meta , nil
}
type AckPolicy int
const (
AckNonePolicy AckPolicy = iota
AckAllPolicy
AckExplicitPolicy
ackPolicyNotSet = 99
)
func jsonString(s string ) string {
return "\"" + s + "\""
}
func (p *AckPolicy ) UnmarshalJSON (data []byte ) error {
switch string (data ) {
case jsonString ("none" ):
*p = AckNonePolicy
case jsonString ("all" ):
*p = AckAllPolicy
case jsonString ("explicit" ):
*p = AckExplicitPolicy
default :
return fmt .Errorf ("nats: can not unmarshal %q" , data )
}
return nil
}
func (p AckPolicy ) MarshalJSON () ([]byte , error ) {
switch p {
case AckNonePolicy :
return json .Marshal ("none" )
case AckAllPolicy :
return json .Marshal ("all" )
case AckExplicitPolicy :
return json .Marshal ("explicit" )
default :
return nil , fmt .Errorf ("nats: unknown acknowledgement policy %v" , p )
}
}
func (p AckPolicy ) String () string {
switch p {
case AckNonePolicy :
return "AckNone"
case AckAllPolicy :
return "AckAll"
case AckExplicitPolicy :
return "AckExplicit"
case ackPolicyNotSet :
return "Not Initialized"
default :
return "Unknown AckPolicy"
}
}
type ReplayPolicy int
const (
ReplayInstantPolicy ReplayPolicy = iota
ReplayOriginalPolicy
replayPolicyNotSet = 99
)
func (p *ReplayPolicy ) UnmarshalJSON (data []byte ) error {
switch string (data ) {
case jsonString ("instant" ):
*p = ReplayInstantPolicy
case jsonString ("original" ):
*p = ReplayOriginalPolicy
default :
return fmt .Errorf ("nats: can not unmarshal %q" , data )
}
return nil
}
func (p ReplayPolicy ) MarshalJSON () ([]byte , error ) {
switch p {
case ReplayOriginalPolicy :
return json .Marshal ("original" )
case ReplayInstantPolicy :
return json .Marshal ("instant" )
default :
return nil , fmt .Errorf ("nats: unknown replay policy %v" , p )
}
}
var (
ackAck = []byte ("+ACK" )
ackNak = []byte ("-NAK" )
ackProgress = []byte ("+WPI" )
ackTerm = []byte ("+TERM" )
)
type DeliverPolicy int
const (
DeliverAllPolicy DeliverPolicy = iota
DeliverLastPolicy
DeliverNewPolicy
DeliverByStartSequencePolicy
DeliverByStartTimePolicy
DeliverLastPerSubjectPolicy
deliverPolicyNotSet = 99
)
func (p *DeliverPolicy ) UnmarshalJSON (data []byte ) error {
switch string (data ) {
case jsonString ("all" ), jsonString ("undefined" ):
*p = DeliverAllPolicy
case jsonString ("last" ):
*p = DeliverLastPolicy
case jsonString ("new" ):
*p = DeliverNewPolicy
case jsonString ("by_start_sequence" ):
*p = DeliverByStartSequencePolicy
case jsonString ("by_start_time" ):
*p = DeliverByStartTimePolicy
case jsonString ("last_per_subject" ):
*p = DeliverLastPerSubjectPolicy
}
return nil
}
func (p DeliverPolicy ) MarshalJSON () ([]byte , error ) {
switch p {
case DeliverAllPolicy :
return json .Marshal ("all" )
case DeliverLastPolicy :
return json .Marshal ("last" )
case DeliverNewPolicy :
return json .Marshal ("new" )
case DeliverByStartSequencePolicy :
return json .Marshal ("by_start_sequence" )
case DeliverByStartTimePolicy :
return json .Marshal ("by_start_time" )
case DeliverLastPerSubjectPolicy :
return json .Marshal ("last_per_subject" )
default :
return nil , fmt .Errorf ("nats: unknown deliver policy %v" , p )
}
}
type RetentionPolicy int
const (
LimitsPolicy RetentionPolicy = iota
InterestPolicy
WorkQueuePolicy
)
type DiscardPolicy int
const (
DiscardOld DiscardPolicy = iota
DiscardNew
)
const (
limitsPolicyString = "limits"
interestPolicyString = "interest"
workQueuePolicyString = "workqueue"
)
func (rp RetentionPolicy ) String () string {
switch rp {
case LimitsPolicy :
return "Limits"
case InterestPolicy :
return "Interest"
case WorkQueuePolicy :
return "WorkQueue"
default :
return "Unknown Retention Policy"
}
}
func (rp RetentionPolicy ) MarshalJSON () ([]byte , error ) {
switch rp {
case LimitsPolicy :
return json .Marshal (limitsPolicyString )
case InterestPolicy :
return json .Marshal (interestPolicyString )
case WorkQueuePolicy :
return json .Marshal (workQueuePolicyString )
default :
return nil , fmt .Errorf ("nats: can not marshal %v" , rp )
}
}
func (rp *RetentionPolicy ) UnmarshalJSON (data []byte ) error {
switch string (data ) {
case jsonString (limitsPolicyString ):
*rp = LimitsPolicy
case jsonString (interestPolicyString ):
*rp = InterestPolicy
case jsonString (workQueuePolicyString ):
*rp = WorkQueuePolicy
default :
return fmt .Errorf ("nats: can not unmarshal %q" , data )
}
return nil
}
func (dp DiscardPolicy ) String () string {
switch dp {
case DiscardOld :
return "DiscardOld"
case DiscardNew :
return "DiscardNew"
default :
return "Unknown Discard Policy"
}
}
func (dp DiscardPolicy ) MarshalJSON () ([]byte , error ) {
switch dp {
case DiscardOld :
return json .Marshal ("old" )
case DiscardNew :
return json .Marshal ("new" )
default :
return nil , fmt .Errorf ("nats: can not marshal %v" , dp )
}
}
func (dp *DiscardPolicy ) UnmarshalJSON (data []byte ) error {
switch strings .ToLower (string (data )) {
case jsonString ("old" ):
*dp = DiscardOld
case jsonString ("new" ):
*dp = DiscardNew
default :
return fmt .Errorf ("nats: can not unmarshal %q" , data )
}
return nil
}
type StorageType int
const (
FileStorage StorageType = iota
MemoryStorage
)
const (
memoryStorageString = "memory"
fileStorageString = "file"
)
func (st StorageType ) String () string {
switch st {
case MemoryStorage :
return "Memory"
case FileStorage :
return "File"
default :
return "Unknown Storage Type"
}
}
func (st StorageType ) MarshalJSON () ([]byte , error ) {
switch st {
case MemoryStorage :
return json .Marshal (memoryStorageString )
case FileStorage :
return json .Marshal (fileStorageString )
default :
return nil , fmt .Errorf ("nats: can not marshal %v" , st )
}
}
func (st *StorageType ) UnmarshalJSON (data []byte ) error {
switch string (data ) {
case jsonString (memoryStorageString ):
*st = MemoryStorage
case jsonString (fileStorageString ):
*st = FileStorage
default :
return fmt .Errorf ("nats: can not unmarshal %q" , data )
}
return nil
}
type StoreCompression uint8
const (
NoCompression StoreCompression = iota
S2Compression
)
func (alg StoreCompression ) String () string {
switch alg {
case NoCompression :
return "None"
case S2Compression :
return "S2"
default :
return "Unknown StoreCompression"
}
}
func (alg StoreCompression ) MarshalJSON () ([]byte , error ) {
var str string
switch alg {
case S2Compression :
str = "s2"
case NoCompression :
str = "none"
default :
return nil , errors .New ("unknown compression algorithm" )
}
return json .Marshal (str )
}
func (alg *StoreCompression ) UnmarshalJSON (b []byte ) error {
var str string
if err := json .Unmarshal (b , &str ); err != nil {
return err
}
switch str {
case "s2" :
*alg = S2Compression
case "none" :
*alg = NoCompression
default :
return errors .New ("unknown compression algorithm" )
}
return nil
}
const nameHashLen = 8
func getHash(name string ) string {
sha := sha256 .New ()
sha .Write ([]byte (name ))
b := sha .Sum (nil )
for i := 0 ; i < nameHashLen ; i ++ {
b [i ] = rdigits [int (b [i ]%base )]
}
return string (b [:nameHashLen ])
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .