package nats
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
)
// JetStreamManager is the management API for JetStream streams, consumers,
// stored messages and account information. The js type in this file provides
// an implementation of every method listed here.
type JetStreamManager interface {
// AddStream creates a stream from cfg and returns the resulting stream info.
AddStream (cfg *StreamConfig , opts ...JSOpt ) (*StreamInfo , error )
// UpdateStream sends cfg as the new configuration of the stream cfg.Name.
UpdateStream (cfg *StreamConfig , opts ...JSOpt ) (*StreamInfo , error )
// DeleteStream deletes the named stream.
DeleteStream (name string , opts ...JSOpt ) error
// StreamInfo returns information about the named stream.
StreamInfo (stream string , opts ...JSOpt ) (*StreamInfo , error )
// PurgeStream purges the named stream; a *StreamPurgeRequest passed among
// opts is used as the request body.
PurgeStream (name string , opts ...JSOpt ) error
// StreamsInfo is an alias of Streams.
StreamsInfo (opts ...JSOpt ) <-chan *StreamInfo
// Streams returns a channel delivering the info of every listed stream.
Streams (opts ...JSOpt ) <-chan *StreamInfo
// StreamNames returns a channel delivering the name of every listed stream.
StreamNames (opts ...JSOpt ) <-chan string
// GetMsg fetches the message stored at sequence seq of the named stream.
GetMsg (name string , seq uint64 , opts ...JSOpt ) (*RawStreamMsg , error )
// GetLastMsg fetches the last message stored for subject in the named stream.
GetLastMsg (name, subject string , opts ...JSOpt ) (*RawStreamMsg , error )
// DeleteMsg requests deletion of the message at seq with no_erase set.
DeleteMsg (name string , seq uint64 , opts ...JSOpt ) error
// SecureDeleteMsg requests deletion of the message at seq without no_erase.
SecureDeleteMsg (name string , seq uint64 , opts ...JSOpt ) error
// AddConsumer creates a consumer on stream, or returns the existing one when
// a consumer with the same name and an equivalent configuration exists.
AddConsumer (stream string , cfg *ConsumerConfig , opts ...JSOpt ) (*ConsumerInfo , error )
// UpdateConsumer sends cfg for the consumer named by cfg.Name or cfg.Durable.
UpdateConsumer (stream string , cfg *ConsumerConfig , opts ...JSOpt ) (*ConsumerInfo , error )
// DeleteConsumer deletes the named consumer from stream.
DeleteConsumer (stream, consumer string , opts ...JSOpt ) error
// ConsumerInfo returns information about the named consumer of stream.
ConsumerInfo (stream, name string , opts ...JSOpt ) (*ConsumerInfo , error )
// ConsumersInfo is an alias of Consumers.
ConsumersInfo (stream string , opts ...JSOpt ) <-chan *ConsumerInfo
// Consumers returns a channel delivering the info of every consumer of stream.
Consumers (stream string , opts ...JSOpt ) <-chan *ConsumerInfo
// ConsumerNames returns a channel delivering the name of every consumer of stream.
ConsumerNames (stream string , opts ...JSOpt ) <-chan string
// AccountInfo returns JetStream account information.
AccountInfo (opts ...JSOpt ) (*AccountInfo , error )
// StreamNameBySubject returns the name of the single stream matching the
// given subject, or ErrNoMatchingStream when there is not exactly one.
StreamNameBySubject (string , ...JSOpt ) (string , error )
}
// StreamConfig is the configuration of a stream. AddStream and UpdateStream
// marshal it to JSON as the request body, and it is returned to callers in
// StreamInfo.Config. Fields tagged omitempty are left out of the request when
// they hold their zero value.
type StreamConfig struct {
// Name is validated with checkStreamName and is also used to build the
// create/update API subject.
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time .Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Duplicates time .Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
// Mirror and Sources: AddStream converts a non-empty Domain on these into
// an External API prefix (on a copy) before sending the request.
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Compression StoreCompression `json:"compression"`
FirstSeq uint64 `json:"first_seq,omitempty"`
// SubjectTransform: AddStream and UpdateStream return
// ErrStreamSubjectTransformNotSupported when this is set but the response
// configuration does not carry it.
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
RePublish *RePublish `json:"republish,omitempty"`
AllowDirect bool `json:"allow_direct"`
MirrorDirect bool `json:"mirror_direct"`
ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`
Metadata map [string ]string `json:"metadata,omitempty"`
Template string `json:"template_owner,omitempty"`
AllowMsgTTL bool `json:"allow_msg_ttl"`
SubjectDeleteMarkerTTL time .Duration `json:"subject_delete_marker_ttl,omitempty"`
}
// SubjectTransformConfig is the JSON shape of a subject transform ("src" and
// "dest"). It is used by StreamConfig.SubjectTransform and by the
// SubjectTransforms lists of StreamSource and StreamSourceInfo.
type SubjectTransformConfig struct {
Source string `json:"src,omitempty"`
Destination string `json:"dest"`
}
// RePublish is the JSON shape of the "republish" section of a StreamConfig.
type RePublish struct {
Source string `json:"src,omitempty"`
Destination string `json:"dest"`
HeadersOnly bool `json:"headers_only,omitempty"`
}
// Placement is the JSON shape of the "placement" section of a StreamConfig:
// a cluster name and an optional list of tags.
type Placement struct {
Cluster string `json:"cluster"`
Tags []string `json:"tags,omitempty"`
}
// StreamSource describes a stream used as a mirror or as a source in a
// StreamConfig.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time .Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
// Domain is never serialized (json:"-"). convertDomain turns it into
// External.APIPrefix; it is an error for both Domain and External to be set.
Domain string `json:"-"`
}
// ExternalStream is the JSON shape of the "external" section of a stream
// source: an API prefix and an optional deliver prefix.
type ExternalStream struct {
APIPrefix string `json:"api"`
DeliverPrefix string `json:"deliver,omitempty"`
}
// StreamConsumerLimits is the JSON shape of the "consumer_limits" section of
// a StreamConfig.
type StreamConsumerLimits struct {
InactiveThreshold time .Duration `json:"inactive_threshold,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
}
// copy returns a duplicate of ss whose OptStartTime and External pointers,
// when set, point at freshly allocated copies instead of the originals. All
// other fields (including the SubjectTransforms slice header) are copied by
// plain struct assignment.
func (ss *StreamSource) copy() *StreamSource {
	dup := new(StreamSource)
	*dup = *ss
	if start := ss.OptStartTime; start != nil {
		startCopy := *start
		dup.OptStartTime = &startCopy
	}
	if external := ss.External; external != nil {
		externalCopy := *external
		dup.External = &externalCopy
	}
	return dup
}
// convertDomain rewrites a non-empty Domain into an External stream whose
// APIPrefix is built from the jsExtDomainT template. It does nothing when
// Domain is empty and fails when both Domain and External are set.
func (ss *StreamSource) convertDomain() error {
	switch {
	case ss.Domain == _EMPTY_:
		return nil
	case ss.External != nil:
		return errors.New("nats: domain and external are both set")
	}
	ss.External = &ExternalStream{APIPrefix: fmt.Sprintf(jsExtDomainT, ss.Domain)}
	return nil
}
// apiResponse holds the fields common to the API responses decoded in this
// file: the response type and an optional error. It is embedded in the
// concrete response structs, which check Error after unmarshalling.
type apiResponse struct {
Type string `json:"type"`
Error *APIError `json:"error,omitempty"`
}
// apiPaged is the paging metadata embedded in list responses. The listers in
// this file stop requesting pages once their offset reaches Total.
type apiPaged struct {
Total int `json:"total"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
// apiPagedRequest carries the offset of the page being requested; it is
// embedded in the paged request structs of this file.
type apiPagedRequest struct {
Offset int `json:"offset,omitempty"`
}
// AccountInfo is the payload returned by the AccountInfo method. It embeds a
// Tier, so the Tier fields are decoded from the top level of the response,
// alongside the domain, the API statistics and a map of named tiers.
type AccountInfo struct {
Tier
Domain string `json:"domain"`
API APIStats `json:"api"`
Tiers map [string ]Tier `json:"tiers"`
}
// Tier is the JSON shape of one tier of account usage figures together with
// its AccountLimits. It is embedded in AccountInfo and used as the value type
// of AccountInfo.Tiers.
type Tier struct {
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
ReservedMemory uint64 `json:"reserved_memory"`
ReservedStore uint64 `json:"reserved_storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Limits AccountLimits `json:"limits"`
}
// APIStats is the JSON shape of the "api" section of AccountInfo: a total
// counter and an error counter.
type APIStats struct {
Total uint64 `json:"total"`
Errors uint64 `json:"errors"`
}
// AccountLimits is the JSON shape of the "limits" section of a Tier.
type AccountLimits struct {
MaxMemory int64 `json:"max_memory"`
MaxStore int64 `json:"max_storage"`
MaxStreams int `json:"max_streams"`
MaxConsumers int `json:"max_consumers"`
MaxAckPending int `json:"max_ack_pending"`
MemoryMaxStreamBytes int64 `json:"memory_max_stream_bytes"`
StoreMaxStreamBytes int64 `json:"storage_max_stream_bytes"`
MaxBytesRequired bool `json:"max_bytes_required"`
}
// accountInfoResponse is the decoded reply of the account info request: the
// common apiResponse fields plus the AccountInfo payload.
type accountInfoResponse struct {
apiResponse
AccountInfo
}
// AccountInfo requests the account information on the apiAccountInfo subject
// and returns the decoded payload.
//
// A request failing with ErrNoResponders is reported as
// ErrJetStreamNotEnabled. An API error matching
// ErrJetStreamNotEnabledForAccount is returned as that sentinel; any other
// API error is returned as received.
func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return nil, err
	}
	if cancel != nil {
		defer cancel()
	}

	reply, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
	switch {
	case err == nil:
		// Proceed to decoding below.
	case errors.Is(err, ErrNoResponders):
		return nil, ErrJetStreamNotEnabled
	default:
		return nil, err
	}

	var decoded accountInfoResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return nil, err
	}
	if apiErr := decoded.Error; apiErr != nil {
		if errors.Is(apiErr, ErrJetStreamNotEnabledForAccount) {
			return nil, ErrJetStreamNotEnabledForAccount
		}
		return nil, apiErr
	}
	return &decoded.AccountInfo, nil
}
// createConsumerRequest is the JSON request body built by upsertConsumer: the
// stream name and the consumer configuration.
type createConsumerRequest struct {
Stream string `json:"stream_name"`
Config *ConsumerConfig `json:"config"`
}
// consumerResponse is the decoded reply of a consumer create/update request.
// The embedded *ConsumerInfo stays nil when the reply carries no consumer
// fields; upsertConsumer checks for that case.
type consumerResponse struct {
apiResponse
*ConsumerInfo
}
// AddConsumer creates a consumer on stream. A nil cfg is treated as an empty
// configuration.
//
// The consumer name is cfg.Name, falling back to cfg.Durable. When a name is
// present the consumer is looked up first: if it already exists, it is
// returned as-is when checkConfig reports no difference from cfg, and
// ErrConsumerNameAlreadyInUse (wrapped) is returned otherwise. Lookup errors
// other than ErrConsumerNotFound and ErrStreamNotFound are returned directly.
// In every other case the consumer is created through upsertConsumer.
func (js *js) AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
	if cfg == nil {
		cfg = &ConsumerConfig{}
	}
	name := cfg.Name
	if name == _EMPTY_ {
		name = cfg.Durable
	}
	if name == _EMPTY_ {
		return js.upsertConsumer(stream, name, cfg, opts...)
	}

	existing, err := js.ConsumerInfo(stream, name, opts...)
	if err != nil {
		notFound := errors.Is(err, ErrConsumerNotFound) || errors.Is(err, ErrStreamNotFound)
		if !notFound {
			return nil, err
		}
	}
	if existing == nil {
		return js.upsertConsumer(stream, name, cfg, opts...)
	}
	if mismatch := checkConfig(&existing.Config, cfg); mismatch != nil {
		return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, name, stream)
	}
	return existing, nil
}
// UpdateConsumer sends cfg for an existing consumer of stream via
// upsertConsumer. It returns ErrConsumerConfigRequired for a nil cfg and
// ErrConsumerNameRequired when neither cfg.Name nor cfg.Durable is set;
// cfg.Name takes precedence over cfg.Durable.
func (js *js) UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
	if cfg == nil {
		return nil, ErrConsumerConfigRequired
	}
	name := cfg.Durable
	if cfg.Name != _EMPTY_ {
		name = cfg.Name
	}
	if name == _EMPTY_ {
		return nil, ErrConsumerNameRequired
	}
	return js.upsertConsumer(stream, name, cfg, opts...)
}
// upsertConsumer sends a consumer create/update request for stream and
// returns the consumer info from the reply.
//
// The API subject is chosen as follows:
//   - no consumer name: the legacy create subject;
//   - server at least 2.9.0: the durable subject when cfg.Durable is set and
//     the useDurableConsumerCreate feature flag is on; otherwise the named
//     create subject, with the filter subject appended unless it is empty or
//     ">" (a filter starting or ending with '.' is rejected with
//     ErrInvalidFilterSubject);
//   - older server: the durable subject when cfg.Durable is set, the legacy
//     create subject otherwise.
//
// ErrNoResponders is reported as ErrJetStreamNotEnabled. API errors matching
// ErrStreamNotFound or ErrConsumerNotFound are returned as those sentinels.
// A reply without consumer info yields ErrConsumerCreationResponseEmpty, and
// ErrConsumerMultipleFilterSubjectsNotSupported is returned when
// cfg.FilterSubjects was set but is absent from the reply.
func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) {
	if err := checkStreamName(stream); err != nil {
		return nil, err
	}
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return nil, err
	}
	if cancel != nil {
		defer cancel()
	}

	payload, err := json.Marshal(&createConsumerRequest{Stream: stream, Config: cfg})
	if err != nil {
		return nil, err
	}

	var createSubj string
	if consumerName == _EMPTY_ {
		createSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
	} else {
		if err := checkConsumerName(consumerName); err != nil {
			return nil, err
		}
		modern := js.nc.serverMinVersion(2, 9, 0)
		filter := cfg.FilterSubject
		switch {
		case modern && cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate:
			createSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
		case modern && (filter == _EMPTY_ || filter == ">"):
			createSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
		case modern:
			// filter is non-empty here, so indexing its ends is safe.
			if filter[0] == '.' || filter[len(filter)-1] == '.' {
				return nil, fmt.Errorf("%w: %q", ErrInvalidFilterSubject, cfg.FilterSubject)
			}
			createSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, filter)
		case cfg.Durable != "":
			createSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
		default:
			createSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
		}
	}

	reply, err := js.apiRequestWithContext(o.ctx, js.apiSubj(createSubj), payload)
	if err != nil {
		if errors.Is(err, ErrNoResponders) {
			err = ErrJetStreamNotEnabled
		}
		return nil, err
	}

	var decoded consumerResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return nil, err
	}
	if apiErr := decoded.Error; apiErr != nil {
		switch {
		case errors.Is(apiErr, ErrStreamNotFound):
			return nil, ErrStreamNotFound
		case errors.Is(apiErr, ErrConsumerNotFound):
			return nil, ErrConsumerNotFound
		}
		return nil, apiErr
	}
	if decoded.ConsumerInfo == nil {
		return nil, ErrConsumerCreationResponseEmpty
	}

	// The requested filter subjects must be reflected in the reply.
	if len(cfg.FilterSubjects) != 0 && len(decoded.Config.FilterSubjects) == 0 {
		return nil, ErrConsumerMultipleFilterSubjectsNotSupported
	}
	return decoded.ConsumerInfo, nil
}
// consumerDeleteResponse is the decoded reply of a consumer delete request.
// DeleteConsumer only inspects the embedded Error field.
type consumerDeleteResponse struct {
apiResponse
Success bool `json:"success,omitempty"`
}
// checkStreamName validates a stream name: it returns ErrStreamNameRequired
// for an empty name, ErrInvalidStreamName when the name contains a '.' or a
// space, and nil otherwise.
func checkStreamName(stream string) error {
	switch {
	case stream == _EMPTY_:
		return ErrStreamNameRequired
	case strings.ContainsAny(stream, ". "):
		return ErrInvalidStreamName
	default:
		return nil
	}
}
// checkConsumerName validates a consumer name: it returns
// ErrConsumerNameRequired for an empty name, ErrInvalidConsumerName when the
// name contains a '.' or a space, and nil otherwise.
func checkConsumerName(consumer string) error {
	switch {
	case consumer == _EMPTY_:
		return ErrConsumerNameRequired
	case strings.ContainsAny(consumer, ". "):
		return ErrInvalidConsumerName
	default:
		return nil
	}
}
// DeleteConsumer deletes the named consumer from stream. Both names are
// validated before the request is sent. An API error matching
// ErrConsumerNotFound is returned as that sentinel; any other API error is
// returned as received.
func (js *js) DeleteConsumer(stream, consumer string, opts ...JSOpt) error {
	if err := checkStreamName(stream); err != nil {
		return err
	}
	if err := checkConsumerName(consumer); err != nil {
		return err
	}
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return err
	}
	if cancel != nil {
		defer cancel()
	}

	deleteSubj := js.apiSubj(fmt.Sprintf(apiConsumerDeleteT, stream, consumer))
	reply, err := js.apiRequestWithContext(o.ctx, deleteSubj, nil)
	if err != nil {
		return err
	}

	var decoded consumerDeleteResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return err
	}
	switch apiErr := decoded.Error; {
	case apiErr == nil:
		return nil
	case errors.Is(apiErr, ErrConsumerNotFound):
		return ErrConsumerNotFound
	default:
		return apiErr
	}
}
// ConsumerInfo returns information about the named consumer of stream. Both
// names are validated, the call options are resolved, and the lookup itself
// is delegated to getConsumerInfoContext.
func (js *js) ConsumerInfo(stream, consumer string, opts ...JSOpt) (*ConsumerInfo, error) {
	err := checkStreamName(stream)
	if err == nil {
		err = checkConsumerName(consumer)
	}
	if err != nil {
		return nil, err
	}

	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return nil, err
	}
	if cancel != nil {
		defer cancel()
	}
	return js.getConsumerInfoContext(o.ctx, stream, consumer)
}
// consumerLister pages through the consumer infos of one stream.
type consumerLister struct {
stream string
js *js
// err is the first error hit by Next; once set, Next returns false.
err error
// offset is the number of consumers received so far and is sent as the
// offset of the next page request.
offset int
// page holds the consumers of the most recently fetched page.
page []*ConsumerInfo
// pageInfo is the paging metadata of the most recent reply.
pageInfo *apiPaged
}
// consumersRequest is the paged request body used by both consumer listers.
type consumersRequest struct {
apiPagedRequest
}
// consumerListResponse is the decoded reply of a consumer list request: the
// common response fields, the paging metadata and one page of consumers.
type consumerListResponse struct {
apiResponse
apiPaged
Consumers []*ConsumerInfo `json:"consumers"`
}
// Next fetches the next page of consumer infos. It returns true when a page
// was fetched (available through Page) and false when listing is finished or
// failed; Err reports the failure, if any.
//
// When the lister's options carry no context, a timeout context based on the
// configured wait is used for the request.
func (c *consumerLister) Next() bool {
	if c.err != nil {
		return false
	}
	// fail records the error and reports that no page is available.
	fail := func(err error) bool {
		c.err = err
		return false
	}

	if err := checkStreamName(c.stream); err != nil {
		return fail(err)
	}
	if info := c.pageInfo; info != nil && c.offset >= info.Total {
		return false
	}

	payload, err := json.Marshal(consumersRequest{
		apiPagedRequest: apiPagedRequest{Offset: c.offset},
	})
	if err != nil {
		return fail(err)
	}

	ctx := c.js.opts.ctx
	if ctx == nil {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
		defer cancel()
	}

	listSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerListT, c.stream))
	reply, err := c.js.apiRequestWithContext(ctx, listSubj, payload)
	if err != nil {
		return fail(err)
	}

	var decoded consumerListResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return fail(err)
	}
	if decoded.Error != nil {
		return fail(decoded.Error)
	}

	c.pageInfo = &decoded.apiPaged
	c.page = decoded.Consumers
	c.offset += len(c.page)
	return true
}
// Page returns the consumers stored by the most recent successful call to
// Next.
func (c *consumerLister) Page() []*ConsumerInfo {
	return c.page
}
// Err returns the error recorded by Next, or nil when none occurred.
func (c *consumerLister) Err() error {
	return c.err
}
// Consumers returns a channel on which the info of every consumer of stream
// is delivered, page by page, from a background goroutine. The channel is
// closed when listing ends, fails, or the request context is done.
//
// A nil channel is returned when the options cannot be resolved.
func (jsc *js) Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
	o, cancel, err := getJSContextOpts(jsc.opts, opts...)
	if err != nil {
		return nil
	}

	out := make(chan *ConsumerInfo)
	lister := &consumerLister{stream: stream, js: &js{nc: jsc.nc, opts: o}}
	go func() {
		if cancel != nil {
			defer cancel()
		}
		defer close(out)

		for lister.Next() {
			for _, consumer := range lister.Page() {
				select {
				case <-o.ctx.Done():
					return
				case out <- consumer:
				}
			}
		}
	}()
	return out
}
// ConsumersInfo is an alias of Consumers: it forwards its arguments unchanged
// and returns the same channel.
func (jsc *js) ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo {
	return jsc.Consumers(stream, opts...)
}
// consumerNamesLister pages through the consumer names of one stream.
type consumerNamesLister struct {
stream string
js *js
// err is the first error hit by Next; once set, Next returns false.
err error
// offset is the number of names received so far and is sent as the offset
// of the next page request.
offset int
// page holds the names of the most recently fetched page.
page []string
// pageInfo is the paging metadata of the most recent reply.
pageInfo *apiPaged
}
// consumerNamesListResponse is the decoded reply of a consumer names request:
// the common response fields, the paging metadata and one page of names.
type consumerNamesListResponse struct {
apiResponse
apiPaged
Consumers []string `json:"consumers"`
}
// Next fetches the next page of consumer names. It returns true when a page
// was fetched (available through Page) and false when listing is finished or
// failed; Err reports the failure, if any.
//
// When the lister's options carry no context, a timeout context based on the
// configured wait is used for the request.
func (c *consumerNamesLister) Next() bool {
	if c.err != nil {
		return false
	}
	// fail records the error and reports that no page is available.
	fail := func(err error) bool {
		c.err = err
		return false
	}

	if err := checkStreamName(c.stream); err != nil {
		return fail(err)
	}
	if info := c.pageInfo; info != nil && c.offset >= info.Total {
		return false
	}

	ctx := c.js.opts.ctx
	if ctx == nil {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(context.Background(), c.js.opts.wait)
		defer cancel()
	}

	payload, err := json.Marshal(consumersRequest{
		apiPagedRequest: apiPagedRequest{Offset: c.offset},
	})
	if err != nil {
		return fail(err)
	}

	namesSubj := c.js.apiSubj(fmt.Sprintf(apiConsumerNamesT, c.stream))
	reply, err := c.js.apiRequestWithContext(ctx, namesSubj, payload)
	if err != nil {
		return fail(err)
	}

	var decoded consumerNamesListResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return fail(err)
	}
	if decoded.Error != nil {
		return fail(decoded.Error)
	}

	c.pageInfo = &decoded.apiPaged
	c.page = decoded.Consumers
	c.offset += len(c.page)
	return true
}
// Page returns the consumer names stored by the most recent successful call
// to Next.
func (c *consumerNamesLister) Page() []string {
	return c.page
}
// Err returns the error recorded by Next, or nil when none occurred.
func (c *consumerNamesLister) Err() error {
	return c.err
}
// ConsumerNames returns a channel on which the name of every consumer of
// stream is delivered, page by page, from a background goroutine. The channel
// is closed when listing ends, fails, or the request context is done.
//
// A nil channel is returned when the options cannot be resolved.
func (jsc *js) ConsumerNames(stream string, opts ...JSOpt) <-chan string {
	o, cancel, err := getJSContextOpts(jsc.opts, opts...)
	if err != nil {
		return nil
	}

	out := make(chan string)
	lister := &consumerNamesLister{js: &js{nc: jsc.nc, opts: o}, stream: stream}
	go func() {
		if cancel != nil {
			defer cancel()
		}
		defer close(out)

		for lister.Next() {
			for _, name := range lister.Page() {
				select {
				case <-o.ctx.Done():
					return
				case out <- name:
				}
			}
		}
	}()
	return out
}
// streamCreateResponse is the decoded reply of a stream create request: the
// common response fields plus the stream info, whose fields are promoted
// through the embedded pointer.
type streamCreateResponse struct {
apiResponse
*StreamInfo
}
// AddStream creates a stream from cfg and returns the stream info from the
// reply.
//
// cfg must be non-nil (ErrStreamConfigRequired) and carry a valid name. The
// request is built from a shallow copy of cfg; a mirror or source with a
// non-empty Domain is copied and its Domain converted into an External API
// prefix, so the caller's configuration is never modified.
//
// An API error matching ErrStreamNameAlreadyInUse is returned as that
// sentinel. After a successful reply the echoed configuration is checked
// against the request:
//   - ErrStreamSubjectTransformNotSupported when cfg.SubjectTransform is set
//     but missing from the reply;
//   - ErrStreamSourceNotSupported when the number of echoed sources differs;
//   - ErrStreamSourceMultipleSubjectTransformsNotSupported when a source's
//     subject transforms are missing from the echoed configuration.
func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
	if cfg == nil {
		return nil, ErrStreamConfigRequired
	}
	if err := checkStreamName(cfg.Name); err != nil {
		return nil, err
	}
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return nil, err
	}
	if cancel != nil {
		defer cancel()
	}

	// Work on a copy so that domain conversion does not alter the caller's cfg.
	ncfg := *cfg
	if cfg.Mirror != nil && cfg.Mirror.Domain != _EMPTY_ {
		ncfg.Mirror = ncfg.Mirror.copy()
		if err := ncfg.Mirror.convertDomain(); err != nil {
			return nil, err
		}
	}
	if len(ncfg.Sources) > 0 {
		ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...)
		for i, ss := range ncfg.Sources {
			// A nil entry is passed through untouched instead of panicking here.
			if ss == nil || ss.Domain == _EMPTY_ {
				continue
			}
			ncfg.Sources[i] = ss.copy()
			if err := ncfg.Sources[i].convertDomain(); err != nil {
				return nil, err
			}
		}
	}

	req, err := json.Marshal(&ncfg)
	if err != nil {
		return nil, err
	}

	csSubj := js.apiSubj(fmt.Sprintf(apiStreamCreateT, cfg.Name))
	r, err := js.apiRequestWithContext(o.ctx, csSubj, req)
	if err != nil {
		return nil, err
	}
	var resp streamCreateResponse
	if err := json.Unmarshal(r.Data, &resp); err != nil {
		return nil, err
	}
	if resp.Error != nil {
		if errors.Is(resp.Error, ErrStreamNameAlreadyInUse) {
			return nil, ErrStreamNameAlreadyInUse
		}
		return nil, resp.Error
	}

	// The requested subject transform must be reflected in the reply.
	if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
		return nil, ErrStreamSubjectTransformNotSupported
	}
	if len(cfg.Sources) != 0 {
		echoed := resp.Config.Sources
		if len(cfg.Sources) != len(echoed) {
			return nil, ErrStreamSourceNotSupported
		}
		// Compare against the echoed configuration, i.e. the slice whose
		// length was just validated. The StreamInfo.Sources status list is a
		// different slice whose length and order are not checked here.
		for i, want := range cfg.Sources {
			if want == nil || len(want.SubjectTransforms) == 0 {
				continue
			}
			if got := echoed[i]; got == nil || len(got.SubjectTransforms) == 0 {
				return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
			}
		}
	}
	return resp.StreamInfo, nil
}
type (
// StreamInfoRequest is the optional JSON request body of a stream info
// request. The StreamInfo method sends it when the resolved options
// carry one, updating the embedded Offset for each page it requests.
StreamInfoRequest struct {
apiPagedRequest
DeletedDetails bool `json:"deleted_details,omitempty"`
SubjectsFilter string `json:"subjects_filter,omitempty"`
}
// streamInfoResponse is the decoded reply of a stream info or stream
// update request: the common response fields, the paging metadata and
// the stream info.
streamInfoResponse = struct {
apiResponse
apiPaged
*StreamInfo
}
)
// StreamInfo returns information about the named stream.
//
// When the resolved options carry a StreamInfoRequest, it is sent as the
// request body and the per-subject message counts are collected across
// pages: each page's State.Subjects entries are merged into one map, the
// request offset advances by the number of entries received, and the loop
// ends once that count reaches the total reported by the reply. The merged
// map then replaces State.Subjects in the returned info.
//
// An API error matching ErrStreamNotFound is returned as that sentinel.
func (js *js ) StreamInfo (stream string , opts ...JSOpt ) (*StreamInfo , error ) {
if err := checkStreamName (stream ); err != nil {
return nil , err
}
o , cancel , err := getJSContextOpts (js .opts , opts ...)
if err != nil {
return nil , err
}
if cancel != nil {
defer cancel ()
}
// i counts the subject entries received so far and doubles as the offset
// of the next page request.
var i int
var subjectMessagesMap map [string ]uint64
var req []byte
var requestPayload bool
var siOpts StreamInfoRequest
if o .streamInfoOpts != nil {
requestPayload = true
siOpts = *o .streamInfoOpts
}
for {
if requestPayload {
siOpts .Offset = i
if req , err = json .Marshal (&siOpts ); err != nil {
return nil , err
}
}
siSubj := js .apiSubj (fmt .Sprintf (apiStreamInfoT , stream ))
r , err := js .apiRequestWithContext (o .ctx , siSubj , req )
if err != nil {
return nil , err
}
var resp streamInfoResponse
if err := json .Unmarshal (r .Data , &resp ); err != nil {
return nil , err
}
if resp .Error != nil {
if errors .Is (resp .Error , ErrStreamNotFound ) {
return nil , ErrStreamNotFound
}
return nil , resp .Error
}
// Fall back to the size of this page when the reply reports no total.
var total int
if resp .Total != 0 {
total = resp .Total
} else {
total = len (resp .State .Subjects )
}
if requestPayload && len (resp .StreamInfo .State .Subjects ) > 0 {
if subjectMessagesMap == nil {
subjectMessagesMap = make (map [string ]uint64 , total )
}
for k , j := range resp .State .Subjects {
subjectMessagesMap [k ] = j
i ++
}
}
// Done once every reported subject entry has been received.
if i >= total {
if requestPayload {
resp .StreamInfo .State .Subjects = subjectMessagesMap
}
return resp .StreamInfo , nil
}
}
}
// StreamInfo is the information about a stream returned by the stream
// create, update, info and list calls in this file: its configuration,
// creation time and state, plus optional cluster, mirror, sources and
// alternates sections.
type StreamInfo struct {
Config StreamConfig `json:"config"`
Created time .Time `json:"created"`
State StreamState `json:"state"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
Alternates []*StreamAlternate `json:"alternates,omitempty"`
}
// StreamAlternate is the JSON shape of one entry of StreamInfo.Alternates.
type StreamAlternate struct {
Name string `json:"name"`
Domain string `json:"domain,omitempty"`
Cluster string `json:"cluster"`
}
// StreamSourceInfo is the JSON shape of the mirror and source entries
// reported in StreamInfo.
type StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time .Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}
// StreamState is the JSON shape of the "state" section of StreamInfo.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time .Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time .Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Deleted []uint64 `json:"deleted"`
NumDeleted int `json:"num_deleted"`
NumSubjects uint64 `json:"num_subjects"`
// Subjects maps a subject to a count. The StreamInfo method merges this
// map across pages when a StreamInfoRequest is supplied.
Subjects map [string ]uint64 `json:"subjects"`
}
// ClusterInfo is the JSON shape of the optional "cluster" section of
// StreamInfo, including the list of replica peers.
type ClusterInfo struct {
Name string `json:"name,omitempty"`
RaftGroup string `json:"raft_group,omitempty"`
Leader string `json:"leader,omitempty"`
LeaderSince *time .Time `json:"leader_since,omitempty"`
SystemAcc bool `json:"system_account,omitempty"`
TrafficAcc string `json:"traffic_account,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
// PeerInfo is the JSON shape of one entry of ClusterInfo.Replicas.
type PeerInfo struct {
Name string `json:"name"`
Current bool `json:"current"`
Offline bool `json:"offline,omitempty"`
Active time .Duration `json:"active"`
Lag uint64 `json:"lag,omitempty"`
}
// UpdateStream sends cfg as the new configuration of the stream cfg.Name and
// returns the stream info from the reply.
//
// cfg must be non-nil (ErrStreamConfigRequired) and carry a valid name. cfg
// is marshalled as given.
// NOTE(review): unlike AddStream, a Domain set on Mirror/Sources is not
// converted here, and Domain itself is never serialized (json:"-") - confirm
// this is intended.
//
// An API error matching ErrStreamNotFound is returned as that sentinel. After
// a successful reply the echoed configuration is checked against the request:
//   - ErrStreamSubjectTransformNotSupported when cfg.SubjectTransform is set
//     but missing from the reply;
//   - ErrStreamSourceNotSupported when the number of echoed sources differs;
//   - ErrStreamSourceMultipleSubjectTransformsNotSupported when a source's
//     subject transforms are missing from the echoed configuration.
func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
	if cfg == nil {
		return nil, ErrStreamConfigRequired
	}
	if err := checkStreamName(cfg.Name); err != nil {
		return nil, err
	}
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return nil, err
	}
	if cancel != nil {
		defer cancel()
	}

	req, err := json.Marshal(cfg)
	if err != nil {
		return nil, err
	}

	usSubj := js.apiSubj(fmt.Sprintf(apiStreamUpdateT, cfg.Name))
	r, err := js.apiRequestWithContext(o.ctx, usSubj, req)
	if err != nil {
		return nil, err
	}
	var resp streamInfoResponse
	if err := json.Unmarshal(r.Data, &resp); err != nil {
		return nil, err
	}
	if resp.Error != nil {
		if errors.Is(resp.Error, ErrStreamNotFound) {
			return nil, ErrStreamNotFound
		}
		return nil, resp.Error
	}

	// The requested subject transform must be reflected in the reply.
	if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
		return nil, ErrStreamSubjectTransformNotSupported
	}
	if len(cfg.Sources) != 0 {
		echoed := resp.Config.Sources
		if len(cfg.Sources) != len(echoed) {
			return nil, ErrStreamSourceNotSupported
		}
		// Compare against the echoed configuration, i.e. the slice whose
		// length was just validated. The StreamInfo.Sources status list is a
		// different slice whose length and order are not checked here.
		for i, want := range cfg.Sources {
			if want == nil || len(want.SubjectTransforms) == 0 {
				continue
			}
			if got := echoed[i]; got == nil || len(got.SubjectTransforms) == 0 {
				return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
			}
		}
	}
	return resp.StreamInfo, nil
}
// streamDeleteResponse is the decoded reply of a stream delete request.
// DeleteStream only inspects the embedded Error field.
type streamDeleteResponse struct {
apiResponse
Success bool `json:"success,omitempty"`
}
// DeleteStream deletes the named stream. An API error matching
// ErrStreamNotFound is returned as that sentinel; any other API error is
// returned as received.
func (js *js) DeleteStream(name string, opts ...JSOpt) error {
	if err := checkStreamName(name); err != nil {
		return err
	}
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return err
	}
	if cancel != nil {
		defer cancel()
	}

	deleteSubj := js.apiSubj(fmt.Sprintf(apiStreamDeleteT, name))
	reply, err := js.apiRequestWithContext(o.ctx, deleteSubj, nil)
	if err != nil {
		return err
	}

	var decoded streamDeleteResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return err
	}
	switch apiErr := decoded.Error; {
	case apiErr == nil:
		return nil
	case errors.Is(apiErr, ErrStreamNotFound):
		return ErrStreamNotFound
	default:
		return apiErr
	}
}
// apiMsgGetRequest is the JSON request body of a message get request. GetMsg
// fills Seq, GetLastMsg fills LastFor, and getMsg sets NextFor from the
// resolved options when direct get is enabled.
type apiMsgGetRequest struct {
Seq uint64 `json:"seq,omitempty"`
LastFor string `json:"last_by_subj,omitempty"`
NextFor string `json:"next_by_subj,omitempty"`
}
// RawStreamMsg is a message read from a stream, as returned by GetMsg and
// GetLastMsg: its subject, stream sequence, headers, payload and timestamp.
type RawStreamMsg struct {
Subject string
Sequence uint64
Header Header
Data []byte
Time time .Time
}
// storedMsg is the JSON form of a message inside apiMsgGetResponse. getMsg
// decodes the raw Header bytes with DecodeHeadersMsg when they are present.
type storedMsg struct {
Subject string `json:"subject"`
Sequence uint64 `json:"seq"`
Header []byte `json:"hdrs,omitempty"`
Data []byte `json:"data,omitempty"`
Time time .Time `json:"time"`
}
// apiMsgGetResponse is the decoded reply of a (non-direct) message get
// request: the common response fields plus the stored message.
type apiMsgGetResponse struct {
apiResponse
Message *storedMsg `json:"message,omitempty"`
}
// GetLastMsg fetches the last message stored for subject in the named
// stream, by issuing a get request with LastFor set to subject.
func (js *js) GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) {
	lastBySubject := &apiMsgGetRequest{LastFor: subject}
	return js.getMsg(name, lastBySubject, opts...)
}
// GetMsg fetches the message stored at sequence seq of the named stream, by
// issuing a get request with Seq set to seq.
func (js *js) GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) {
	bySequence := &apiMsgGetRequest{Seq: seq}
	return js.getMsg(name, bySequence, opts...)
}
// getMsg performs the message get request described by mreq against the named
// stream.
//
// With the direct-get option:
//   - a request with LastFor set is sent, without a body, on the
//     last-by-subject direct subject;
//   - any other request has its NextFor set from the options (mreq is
//     modified) and is sent on the direct get subject.
//
// In both cases the reply is decoded by convertDirectGetMsgResponseToMsg.
//
// Without direct get, the request goes to the regular get subject and the
// JSON reply is decoded; API errors matching ErrMsgNotFound or
// ErrStreamNotFound are returned as those sentinels, and stored header bytes
// are decoded with DecodeHeadersMsg.
func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawStreamMsg, error) {
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return nil, err
	}
	if cancel != nil {
		defer cancel()
	}
	if err := checkStreamName(name); err != nil {
		return nil, err
	}

	var subjT string
	switch {
	case o.directGet && mreq.LastFor != _EMPTY_:
		lastSubj := js.apiSubj(fmt.Sprintf(apiDirectMsgGetLastBySubjectT, name, mreq.LastFor))
		reply, err := js.apiRequestWithContext(o.ctx, lastSubj, nil)
		if err != nil {
			return nil, err
		}
		return convertDirectGetMsgResponseToMsg(name, reply)
	case o.directGet:
		subjT = apiDirectMsgGetT
		mreq.NextFor = o.directNextFor
	default:
		subjT = apiMsgGetT
	}

	payload, err := json.Marshal(mreq)
	if err != nil {
		return nil, err
	}
	reply, err := js.apiRequestWithContext(o.ctx, js.apiSubj(fmt.Sprintf(subjT, name)), payload)
	if err != nil {
		return nil, err
	}
	if o.directGet {
		return convertDirectGetMsgResponseToMsg(name, reply)
	}

	var decoded apiMsgGetResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return nil, err
	}
	if apiErr := decoded.Error; apiErr != nil {
		switch {
		case errors.Is(apiErr, ErrMsgNotFound):
			return nil, ErrMsgNotFound
		case errors.Is(apiErr, ErrStreamNotFound):
			return nil, ErrStreamNotFound
		}
		return nil, apiErr
	}

	stored := decoded.Message
	out := &RawStreamMsg{
		Subject:  stored.Subject,
		Sequence: stored.Sequence,
		Data:     stored.Data,
		Time:     stored.Time,
	}
	if len(stored.Header) > 0 {
		if out.Header, err = DecodeHeadersMsg(stored.Header); err != nil {
			return nil, err
		}
	}
	return out, nil
}
// convertDirectGetMsgResponseToMsg builds a RawStreamMsg from the reply to a
// direct get request. The name parameter is not used.
//
// For a reply with no payload, a non-empty status header signals failure:
// the noMessagesSts status yields ErrMsgNotFound, any other status yields an
// error built from the description header (or a generic text when that is
// empty). Otherwise the reply must carry the stream, sequence, timestamp and
// subject headers; the sequence must parse as an unsigned integer and the
// timestamp as RFC 3339 (nanoseconds) or, failing that, in the
// "2006-01-02 15:04:05.999999999 +0000 UTC" layout.
//
// The returned message references the reply's headers and data directly.
func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error) {
	if len(r.Data) == 0 {
		if status := r.Header.Get(statusHdr); status != _EMPTY_ {
			if status == noMessagesSts {
				return nil, ErrMsgNotFound
			}
			desc := r.Header.Get(descrHdr)
			if desc == _EMPTY_ {
				desc = "unable to get message"
			}
			return nil, fmt.Errorf("nats: %s", desc)
		}
	}
	if len(r.Header) == 0 {
		return nil, errors.New("nats: response should have headers")
	}

	if r.Header.Get(JSStream) == _EMPTY_ {
		return nil, errors.New("nats: missing stream header")
	}

	rawSeq := r.Header.Get(JSSequence)
	if rawSeq == _EMPTY_ {
		return nil, errors.New("nats: missing sequence header")
	}
	seq, err := strconv.ParseUint(rawSeq, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("nats: invalid sequence header '%s': %v", rawSeq, err)
	}

	rawTime := r.Header.Get(JSTimeStamp)
	if rawTime == _EMPTY_ {
		return nil, errors.New("nats: missing timestamp header")
	}
	stamp, err := time.Parse(time.RFC3339Nano, rawTime)
	if err != nil {
		// Fall back to the alternative timestamp layout.
		stamp, err = time.Parse("2006-01-02 15:04:05.999999999 +0000 UTC", rawTime)
		if err != nil {
			return nil, fmt.Errorf("nats: invalid timestamp header '%s': %v", rawTime, err)
		}
	}

	subject := r.Header.Get(JSSubject)
	if subject == _EMPTY_ {
		return nil, errors.New("nats: missing subject header")
	}

	msg := &RawStreamMsg{
		Subject:  subject,
		Sequence: seq,
		Header:   r.Header,
		Data:     r.Data,
		Time:     stamp,
	}
	return msg, nil
}
// msgDeleteRequest is the JSON request body of a message delete request.
// DeleteMsg sets NoErase; SecureDeleteMsg leaves it unset.
type msgDeleteRequest struct {
Seq uint64 `json:"seq"`
NoErase bool `json:"no_erase,omitempty"`
}
// msgDeleteResponse is the decoded reply of a message delete request.
// deleteMsg only inspects the embedded Error field.
type msgDeleteResponse struct {
apiResponse
Success bool `json:"success,omitempty"`
}
// DeleteMsg requests deletion of the message at sequence seq of the named
// stream, with the no_erase flag set in the request.
func (js *js) DeleteMsg(name string, seq uint64, opts ...JSOpt) error {
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return err
	}
	if cancel != nil {
		defer cancel()
	}
	request := &msgDeleteRequest{Seq: seq, NoErase: true}
	return js.deleteMsg(o.ctx, name, request)
}
// SecureDeleteMsg requests deletion of the message at sequence seq of the
// named stream, leaving the no_erase flag unset in the request.
func (js *js) SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error {
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return err
	}
	if cancel != nil {
		defer cancel()
	}
	request := &msgDeleteRequest{Seq: seq}
	return js.deleteMsg(o.ctx, name, request)
}
// deleteMsg validates the stream name, sends req as a message delete request
// for stream using ctx, and returns the API error carried by the reply, if
// any.
func (js *js) deleteMsg(ctx context.Context, stream string, req *msgDeleteRequest) error {
	if err := checkStreamName(stream); err != nil {
		return err
	}
	payload, err := json.Marshal(req)
	if err != nil {
		return err
	}

	deleteSubj := js.apiSubj(fmt.Sprintf(apiMsgDeleteT, stream))
	reply, err := js.apiRequestWithContext(ctx, deleteSubj, payload)
	if err != nil {
		return err
	}

	var decoded msgDeleteResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return err
	}
	if apiErr := decoded.Error; apiErr != nil {
		return apiErr
	}
	return nil
}
// StreamPurgeRequest is the optional JSON request body of a stream purge
// request. PurgeStream picks it up when a *StreamPurgeRequest is passed among
// its options.
type StreamPurgeRequest struct {
Sequence uint64 `json:"seq,omitempty"`
Subject string `json:"filter,omitempty"`
Keep uint64 `json:"keep,omitempty"`
}
// streamPurgeResponse is the decoded reply of a stream purge request.
// purgeStream only inspects the embedded Error field.
type streamPurgeResponse struct {
apiResponse
Success bool `json:"success,omitempty"`
Purged uint64 `json:"purged"`
}
// PurgeStream purges the named stream. The first option that is a
// *StreamPurgeRequest is used as the request body; the options themselves are
// not forwarded to purgeStream.
func (js *js) PurgeStream(stream string, opts ...JSOpt) error {
	if err := checkStreamName(stream); err != nil {
		return err
	}

	var body *StreamPurgeRequest
	for _, opt := range opts {
		if purgeReq, ok := opt.(*StreamPurgeRequest); ok {
			body = purgeReq
			break
		}
	}
	return js.purgeStream(stream, body)
}
// purgeStream sends a purge request for stream. A non-nil req is marshalled
// as the request body; otherwise the request is sent without one.
//
// An API error matching ErrBadRequest is reported as ErrBadRequest wrapped
// with "invalid purge request body"; any other API error is returned as
// received.
func (js *js) purgeStream(stream string, req *StreamPurgeRequest, opts ...JSOpt) error {
	o, cancel, err := getJSContextOpts(js.opts, opts...)
	if err != nil {
		return err
	}
	if cancel != nil {
		defer cancel()
	}

	var payload []byte
	if req != nil {
		payload, err = json.Marshal(req)
		if err != nil {
			return err
		}
	}

	purgeSubj := js.apiSubj(fmt.Sprintf(apiStreamPurgeT, stream))
	reply, err := js.apiRequestWithContext(o.ctx, purgeSubj, payload)
	if err != nil {
		return err
	}

	var decoded streamPurgeResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return err
	}
	switch apiErr := decoded.Error; {
	case apiErr == nil:
		return nil
	case errors.Is(apiErr, ErrBadRequest):
		return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
	default:
		return apiErr
	}
}
// streamLister pages through stream infos.
type streamLister struct {
js *js
// page holds the streams of the most recently fetched page.
page []*StreamInfo
// err is the first error hit by Next; once set, Next returns false.
err error
// offset is the number of streams received so far and is sent as the
// offset of the next page request.
offset int
// pageInfo is the paging metadata of the most recent reply.
pageInfo *apiPaged
}
// streamListResponse is the decoded reply of a stream list request: the
// common response fields, the paging metadata and one page of streams.
type streamListResponse struct {
apiResponse
apiPaged
Streams []*StreamInfo `json:"streams"`
}
// streamNamesRequest is the paged request body used by both stream listers;
// Subject is filled from the streamListSubject option.
type streamNamesRequest struct {
apiPagedRequest
Subject string `json:"subject,omitempty"`
}
// Next fetches the next page of stream infos. It returns true when a page
// was fetched (available through Page) and false when listing is finished or
// failed; Err reports the failure, if any.
//
// When the lister's options carry no context, a timeout context based on the
// configured wait is used for the request.
func (s *streamLister) Next() bool {
	if s.err != nil {
		return false
	}
	// fail records the error and reports that no page is available.
	fail := func(err error) bool {
		s.err = err
		return false
	}

	if info := s.pageInfo; info != nil && s.offset >= info.Total {
		return false
	}

	payload, err := json.Marshal(streamNamesRequest{
		apiPagedRequest: apiPagedRequest{Offset: s.offset},
		Subject:         s.js.opts.streamListSubject,
	})
	if err != nil {
		return fail(err)
	}

	ctx := s.js.opts.ctx
	if ctx == nil {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(context.Background(), s.js.opts.wait)
		defer cancel()
	}

	reply, err := s.js.apiRequestWithContext(ctx, s.js.apiSubj(apiStreamListT), payload)
	if err != nil {
		return fail(err)
	}

	var decoded streamListResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return fail(err)
	}
	if decoded.Error != nil {
		return fail(decoded.Error)
	}

	s.pageInfo = &decoded.apiPaged
	s.page = decoded.Streams
	s.offset += len(s.page)
	return true
}
// Page returns the streams stored by the most recent successful call to Next.
func (s *streamLister) Page() []*StreamInfo {
	return s.page
}
// Err returns the error recorded by Next, or nil when none occurred.
func (s *streamLister) Err() error {
	return s.err
}
// Streams returns a channel on which the info of every listed stream is
// delivered, page by page, from a background goroutine. The channel is closed
// when listing ends, fails, or the request context is done.
//
// A nil channel is returned when the options cannot be resolved.
func (jsc *js) Streams(opts ...JSOpt) <-chan *StreamInfo {
	o, cancel, err := getJSContextOpts(jsc.opts, opts...)
	if err != nil {
		return nil
	}

	out := make(chan *StreamInfo)
	lister := &streamLister{js: &js{nc: jsc.nc, opts: o}}
	go func() {
		if cancel != nil {
			defer cancel()
		}
		defer close(out)

		for lister.Next() {
			for _, stream := range lister.Page() {
				select {
				case <-o.ctx.Done():
					return
				case out <- stream:
				}
			}
		}
	}()
	return out
}
// StreamsInfo is an alias of Streams: it forwards its options unchanged and
// returns the same channel.
func (jsc *js) StreamsInfo(opts ...JSOpt) <-chan *StreamInfo {
	return jsc.Streams(opts...)
}
// streamNamesLister pages through stream names.
type streamNamesLister struct {
js *js
// err is the first error hit by Next; once set, Next returns false.
err error
// offset is the number of names received so far and is sent as the offset
// of the next page request.
offset int
// page holds the names of the most recently fetched page.
page []string
// pageInfo is the paging metadata of the most recent reply.
pageInfo *apiPaged
}
// Next fetches the next page of stream names. It returns true when a page
// was fetched (available through Page) and false when listing is finished or
// failed; Err reports the failure, if any.
//
// When the lister's options carry no context, a timeout context based on the
// configured wait is used for the request.
func (l *streamNamesLister) Next() bool {
	if l.err != nil {
		return false
	}
	// fail records the error and reports that no page is available.
	fail := func(err error) bool {
		l.err = err
		return false
	}

	if info := l.pageInfo; info != nil && l.offset >= info.Total {
		return false
	}

	ctx := l.js.opts.ctx
	if ctx == nil {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(context.Background(), l.js.opts.wait)
		defer cancel()
	}

	payload, err := json.Marshal(streamNamesRequest{
		apiPagedRequest: apiPagedRequest{Offset: l.offset},
		Subject:         l.js.opts.streamListSubject,
	})
	if err != nil {
		return fail(err)
	}

	reply, err := l.js.apiRequestWithContext(ctx, l.js.apiSubj(apiStreams), payload)
	if err != nil {
		return fail(err)
	}

	var decoded streamNamesResponse
	if err := json.Unmarshal(reply.Data, &decoded); err != nil {
		return fail(err)
	}
	if decoded.Error != nil {
		return fail(decoded.Error)
	}

	l.pageInfo = &decoded.apiPaged
	l.page = decoded.Streams
	l.offset += len(l.page)
	return true
}
// Page returns the stream names stored by the most recent successful call to
// Next.
func (l *streamNamesLister) Page() []string {
	return l.page
}
// Err returns the error recorded by Next, or nil when none occurred.
func (l *streamNamesLister) Err() error {
	return l.err
}
// StreamNames returns a channel on which the name of every listed stream is
// delivered, page by page, from a background goroutine. The channel is closed
// when listing ends, fails, or the request context is done.
//
// A nil channel is returned when the options cannot be resolved.
func (jsc *js) StreamNames(opts ...JSOpt) <-chan string {
	o, cancel, err := getJSContextOpts(jsc.opts, opts...)
	if err != nil {
		return nil
	}

	out := make(chan string)
	lister := &streamNamesLister{js: &js{nc: jsc.nc, opts: o}}
	go func() {
		if cancel != nil {
			defer cancel()
		}
		defer close(out)

		for lister.Next() {
			for _, name := range lister.Page() {
				select {
				case <-o.ctx.Done():
					return
				case out <- name:
				}
			}
		}
	}()
	return out
}
// StreamNameBySubject asks the stream names API for the streams matching subj
// and returns the name when exactly one stream matches.
//
// ErrNoResponders is reported as ErrJetStreamNotEnabled. ErrNoMatchingStream
// is returned when the reply carries an API error or does not list exactly
// one stream.
func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) {
	o, cancel, err := getJSContextOpts(jsc.opts, opts...)
	if err != nil {
		return "", err
	}
	if cancel != nil {
		defer cancel()
	}

	payload, err := json.Marshal(&streamRequest{subj})
	if err != nil {
		return _EMPTY_, err
	}

	reply, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), payload)
	if err != nil {
		if errors.Is(err, ErrNoResponders) {
			err = ErrJetStreamNotEnabled
		}
		return _EMPTY_, err
	}

	var names streamNamesResponse
	if err := json.Unmarshal(reply.Data, &names); err != nil {
		return _EMPTY_, err
	}
	if names.Error == nil && len(names.Streams) == 1 {
		return names.Streams[0], nil
	}
	return _EMPTY_, ErrNoMatchingStream
}
// getJSContextOpts builds the options of a single API call by applying opts
// to a fresh jsOpts and filling the gaps from the defaults in defs.
//
//   - Setting both a context and a wait is rejected with
//     ErrContextAndTimeout.
//   - With neither set, the wait defaults to defs.wait.
//   - Without a context, a positive wait produces a timeout context.
//   - An empty API prefix defaults to defs.pre.
//   - A context that has no deadline is wrapped with a defs.wait timeout.
//
// The returned cancel function is non-nil only when a context was created
// here; callers are expected to invoke it in that case.
func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) {
	o := new(jsOpts)
	for _, opt := range opts {
		if err := opt.configureJSContext(o); err != nil {
			return nil, nil, err
		}
	}

	userCtx := o.ctx != nil
	if userCtx && o.wait != 0 {
		return nil, nil, ErrContextAndTimeout
	}
	if !userCtx && o.wait == 0 {
		o.wait = defs.wait
	}

	var cancel context.CancelFunc
	if !userCtx && o.wait > 0 {
		o.ctx, cancel = context.WithTimeout(context.Background(), o.wait)
	}
	if o.pre == _EMPTY_ {
		o.pre = defs.pre
	}

	if o.ctx != nil {
		if _, bounded := o.ctx.Deadline(); !bounded {
			o.ctx, cancel = context.WithTimeout(o.ctx, defs.wait)
		}
	}
	return o, cancel, nil
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.