package nats
import (
"context"
"errors"
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go/internal/parser"
)
// KeyValueManager groups the bucket-level operations: looking up, creating,
// deleting and listing key-value buckets. In this file it is implemented by
// the methods declared on *js.
type KeyValueManager interface {
// KeyValue looks up an existing bucket by name and returns a handle to it.
KeyValue (bucket string ) (KeyValue , error )
// CreateKeyValue creates a bucket described by cfg and returns a handle to it.
CreateKeyValue (cfg *KeyValueConfig ) (KeyValue , error )
// DeleteKeyValue deletes the stream that backs the named bucket.
DeleteKeyValue (bucket string ) error
// KeyValueStoreNames streams bucket names with the "KV_" stream prefix removed.
KeyValueStoreNames () <-chan string
// KeyValueStores streams one status value per bucket.
KeyValueStores () <-chan KeyValueStatus
}
// KeyValue is the handle to a single bucket. The notes on each method describe
// the behavior of the kvs implementation found in this file.
type KeyValue interface {
// Get returns the latest entry for key. A key whose latest entry is a
// delete or purge marker is reported as ErrKeyNotFound.
Get (key string ) (entry KeyValueEntry , err error )
// GetRevision returns the entry stored at the given revision. It reports
// ErrKeyNotFound when that revision belongs to a different key.
GetRevision (key string , revision uint64 ) (entry KeyValueEntry , err error )
// Put stores value under key and returns the sequence of the publish ack.
Put (key string , value []byte ) (revision uint64 , err error )
// PutString is Put for a string value.
PutString (key string , value string ) (revision uint64 , err error )
// Create stores value only if the key has no current value; a key whose
// latest entry is a delete/purge marker may be re-created.
Create (key string , value []byte ) (revision uint64 , err error )
// Update stores value only if the latest revision of key equals last.
Update (key string , value []byte , last uint64 ) (revision uint64 , err error )
// Delete publishes a delete marker for key. LastRevision makes it conditional.
Delete (key string , opts ...DeleteOpt ) error
// Purge publishes a purge marker for key together with a subject rollup header.
Purge (key string , opts ...DeleteOpt ) error
// Watch returns a watcher for the given key or key pattern.
Watch (keys string , opts ...WatchOpt ) (KeyWatcher , error )
// WatchAll returns a watcher for every key (AllKeys).
WatchAll (opts ...WatchOpt ) (KeyWatcher , error )
// WatchFiltered returns a watcher for several keys or key patterns; an
// empty list watches all keys.
WatchFiltered (keys []string , opts ...WatchOpt ) (KeyWatcher , error )
// Keys collects all keys into a slice, or returns ErrNoKeysFound when
// there are none.
Keys (opts ...WatchOpt ) ([]string , error )
// ListKeys delivers the keys on a channel through a KeyLister.
ListKeys (opts ...WatchOpt ) (KeyLister , error )
// History returns the stored entries for key, or ErrKeyNotFound when
// there are none.
History (key string , opts ...WatchOpt ) ([]KeyValueEntry , error )
// Bucket returns the bucket name.
Bucket () string
// PurgeDeletes removes delete and purge markers from the bucket.
PurgeDeletes (opts ...PurgeOpt ) error
// Status fetches the current stream info and wraps it as a KeyValueStatus.
Status () (KeyValueStatus , error )
}
// KeyValueStatus describes the state of a bucket. The notes on each method
// describe KeyValueBucketStatus, the implementation in this file.
type KeyValueStatus interface {
// Bucket returns the bucket name.
Bucket () string
// Values returns the message count of the backing stream.
Values () uint64
// History returns the stream's per-subject message limit.
History () int64
// TTL returns the stream's maximum message age.
TTL () time .Duration
// BackingStore names the storage technology ("JetStream" here).
BackingStore () string
// Bytes returns the byte size reported by the stream state.
Bytes () uint64
// IsCompressed reports whether the stream uses any compression.
IsCompressed () bool
}
// KeyWatcher delivers entries observed on a bucket. The notes describe the
// watcher implementation in this file.
type KeyWatcher interface {
// Context returns the context supplied through the watch options, if any.
Context () context .Context
// Updates returns the channel of entries. A nil entry is sent once the
// initial values have been delivered (or the wait for them timed out).
Updates () <-chan KeyValueEntry
// Stop unsubscribes the underlying subscription.
Stop () error
// Error returns a channel carrying watcher errors such as ErrKeyWatcherTimeout.
Error () <-chan error
}
// KeyLister delivers the keys of a bucket on a channel. The notes describe
// the keyLister implementation in this file.
type KeyLister interface {
// Keys returns the channel of keys; it is closed when listing ends.
Keys () <-chan string
// Stop stops the underlying watcher.
Stop () error
// Error returns the underlying watcher's error channel.
Error () <-chan error
}
// WatchOpt is an option that can be passed to the watch, list and history
// calls of a bucket.
type WatchOpt interface {
// configureWatcher applies the option to opts, or returns an error when
// it cannot be applied.
configureWatcher(opts *watchOpts ) error
}
// configureWatcher lets a ContextOpt be used as a WatchOpt: it records the
// context on the watcher options and never fails.
func (ctx ContextOpt) configureWatcher(opts *watchOpts) error {
	opts.ctx = ctx
	return nil
}
// watchOpts collects the settings produced by WatchOpt values.
type watchOpts struct {
// ctx is forwarded to the subscription when non-nil.
ctx context .Context
// ignoreDeletes suppresses delete and purge entries on the updates channel.
ignoreDeletes bool
// includeHistory disables the "last per subject" delivery policy.
includeHistory bool
// updatesOnly requests delivery of new messages only.
updatesOnly bool
// metaOnly requests headers-only delivery.
metaOnly bool
}
// watchOptFn adapts a plain function to the WatchOpt interface.
type watchOptFn func(opts *watchOpts) error

// configureWatcher runs the wrapped function against opts.
func (opt watchOptFn) configureWatcher(opts *watchOpts) error {
	return opt(opts)
}
// IncludeHistory returns a WatchOpt that sets includeHistory on the watcher
// options. Applying it after UpdatesOnly is rejected with an error.
func IncludeHistory() WatchOpt {
	apply := func(opts *watchOpts) error {
		if !opts.updatesOnly {
			opts.includeHistory = true
			return nil
		}
		return errors.New("nats: include history can not be used with updates only")
	}
	return watchOptFn(apply)
}
// UpdatesOnly returns a WatchOpt that sets updatesOnly on the watcher
// options. Applying it after IncludeHistory is rejected with an error.
func UpdatesOnly() WatchOpt {
	apply := func(opts *watchOpts) error {
		if !opts.includeHistory {
			opts.updatesOnly = true
			return nil
		}
		return errors.New("nats: updates only can not be used with include history")
	}
	return watchOptFn(apply)
}
// IgnoreDeletes returns a WatchOpt that sets ignoreDeletes on the watcher
// options.
func IgnoreDeletes() WatchOpt {
	var fn watchOptFn = func(opts *watchOpts) error {
		opts.ignoreDeletes = true
		return nil
	}
	return fn
}
// MetaOnly returns a WatchOpt that sets metaOnly on the watcher options.
func MetaOnly() WatchOpt {
	var fn watchOptFn = func(opts *watchOpts) error {
		opts.metaOnly = true
		return nil
	}
	return fn
}
// PurgeOpt is an option accepted by PurgeDeletes.
type PurgeOpt interface {
// configurePurge applies the option to opts, or returns an error when it
// cannot be applied.
configurePurge(opts *purgeOpts ) error
}
// purgeOpts collects the settings produced by PurgeOpt values.
type purgeOpts struct {
// dmthr is the delete-marker age threshold set by DeleteMarkersOlderThan.
dmthr time .Duration
// ctx, when non-nil, is passed to the watcher and to the purge requests.
ctx context .Context
}
// DeleteMarkersOlderThan is a PurgeOpt carrying the age threshold used by
// PurgeDeletes. As PurgeDeletes reads it: zero selects the default threshold,
// a positive value keeps the marker itself for keys whose marker is newer than
// the threshold, and a negative value removes markers regardless of age.
type DeleteMarkersOlderThan time.Duration

// configurePurge stores the threshold on the purge options.
func (ttl DeleteMarkersOlderThan) configurePurge(opts *purgeOpts) error {
	threshold := time.Duration(ttl)
	opts.dmthr = threshold
	return nil
}
// configurePurge lets a ContextOpt be used as a PurgeOpt: it records the
// context on the purge options and never fails.
func (ctx ContextOpt) configurePurge(opts *purgeOpts) error {
	opts.ctx = ctx
	return nil
}
// DeleteOpt is an option accepted by Delete and Purge.
type DeleteOpt interface {
// configureDelete applies the option to opts, or returns an error when it
// cannot be applied.
configureDelete(opts *deleteOpts ) error
}
// deleteOpts collects the settings produced by DeleteOpt values.
type deleteOpts struct {
// purge selects a purge marker (with subject rollup) instead of a delete marker.
purge bool
// revision, when non-zero, is sent as the expected last sequence for the key.
revision uint64
}
// deleteOptFn adapts a plain function to the DeleteOpt interface.
type deleteOptFn func(opts *deleteOpts) error

// configureDelete runs the wrapped function against opts.
func (opt deleteOptFn) configureDelete(opts *deleteOpts) error {
	return opt(opts)
}
// LastRevision returns a DeleteOpt that records revision on the delete
// options. Delete sends a non-zero value as the expected last sequence for
// the key's subject.
func LastRevision(revision uint64) DeleteOpt {
	var fn deleteOptFn = func(opts *deleteOpts) error {
		opts.revision = revision
		return nil
	}
	return fn
}
// purge returns the DeleteOpt that turns a delete into a purge; Purge appends
// it to the caller's options.
func purge() DeleteOpt {
	var fn deleteOptFn = func(opts *deleteOpts) error {
		opts.purge = true
		return nil
	}
	return fn
}
// KeyValueConfig describes a bucket to be created with CreateKeyValue. The
// field notes describe how CreateKeyValue maps each field onto the stream
// configuration.
type KeyValueConfig struct {
// Bucket is the bucket name; the stream is named "KV_" + Bucket.
Bucket string `json:"bucket"`
// Description is copied to the stream description.
Description string `json:"description,omitempty"`
// MaxValueSize becomes the stream's MaxMsgSize; zero is sent as -1.
MaxValueSize int32 `json:"max_value_size,omitempty"`
// History becomes MaxMsgsPerSubject; zero means 1 and values above
// KeyValueMaxHistory are rejected.
History uint8 `json:"history,omitempty"`
// TTL becomes the stream's MaxAge; when positive and below two minutes
// it is also used as the duplicate window.
TTL time .Duration `json:"ttl,omitempty"`
// MaxBytes becomes the stream's MaxBytes; zero is sent as -1.
MaxBytes int64 `json:"max_bytes,omitempty"`
// Storage is copied to the stream configuration.
Storage StorageType `json:"storage,omitempty"`
// Replicas is copied to the stream configuration; zero means 1.
Replicas int `json:"num_replicas,omitempty"`
// Placement is copied to the stream configuration.
Placement *Placement `json:"placement,omitempty"`
// RePublish is copied to the stream configuration.
RePublish *RePublish `json:"republish,omitempty"`
// Mirror, when set, makes the stream a mirror; the "KV_" prefix is added
// to the mirror name when missing. Sources is ignored in that case.
Mirror *StreamSource `json:"mirror,omitempty"`
// Sources lists buckets to source from; used only when Mirror is nil.
Sources []*StreamSource `json:"sources,omitempty"`
// Compression selects S2Compression for the stream when true.
Compression bool `json:"compression,omitempty"`
}
const (
// KeyValueMaxHistory is the largest History value CreateKeyValue accepts.
KeyValueMaxHistory = 64
// AllKeys is the key pattern used by WatchAll and by WatchFiltered when no
// keys are given.
AllKeys = ">"
// kvLatestRevision is the revision value that makes get fetch the latest
// message for the key instead of a specific sequence.
kvLatestRevision = 0
// kvop is the message header that carries the operation marker.
kvop = "KV-Operation"
// kvdel is the kvop header value written by Delete.
kvdel = "DEL"
// kvpurge is the kvop header value written by Purge.
kvpurge = "PURGE"
)
// KeyValueOp identifies the kind of operation an entry represents.
type KeyValueOp uint8

// The known operations. KeyValuePut is the zero value, so an entry without an
// operation marker is a put.
const (
	KeyValuePut KeyValueOp = iota
	KeyValueDelete
	KeyValuePurge
)

// String returns the name of the operation, or "Unknown Operation" for a
// value outside the declared constants.
func (op KeyValueOp) String() string {
	names := [...]string{
		KeyValuePut:    "KeyValuePutOp",
		KeyValueDelete: "KeyValueDeleteOp",
		KeyValuePurge:  "KeyValuePurgeOp",
	}
	if int(op) < len(names) {
		return names[op]
	}
	return "Unknown Operation"
}
// KeyValueEntry is a single stored value together with its metadata. The
// notes describe how the kve implementation in this file is populated.
type KeyValueEntry interface {
// Bucket returns the name of the bucket the entry belongs to.
Bucket () string
// Key returns the entry's key.
Key () string
// Value returns the message payload.
Value () []byte
// Revision returns the stream sequence of the message.
Revision () uint64
// Created returns the message timestamp.
Created () time .Time
// Delta returns the pending count reported with a watched message; it is
// left at zero for entries returned by get.
Delta () uint64
// Operation returns the kind of operation the entry represents.
Operation () KeyValueOp
}
// Sentinel errors returned by the key-value API in this file.
var (
// ErrKeyValueConfigRequired is returned by CreateKeyValue for a nil config.
ErrKeyValueConfigRequired = errors .New ("nats: config required" )
// ErrInvalidBucketName is returned when a bucket name fails bucketValid.
ErrInvalidBucketName = errors .New ("nats: invalid bucket name" )
// ErrInvalidKey is returned when a key fails keyValid or searchKeyValid.
ErrInvalidKey = errors .New ("nats: invalid key" )
// ErrBucketNotFound is returned by KeyValue when the backing stream does not exist.
ErrBucketNotFound = errors .New ("nats: bucket not found" )
// ErrBadBucket is returned by KeyValue when the stream's per-subject limit is below 1.
ErrBadBucket = errors .New ("nats: bucket not valid key-value store" )
// ErrKeyNotFound is returned when no value is available for a key.
ErrKeyNotFound = errors .New ("nats: key not found" )
// ErrKeyDeleted is returned by get when the latest entry is a delete or purge marker.
ErrKeyDeleted = errors .New ("nats: key was deleted" )
// ErrHistoryToLarge is returned by CreateKeyValue when History exceeds KeyValueMaxHistory.
ErrHistoryToLarge = errors .New ("nats: history limited to a max of 64" )
// ErrNoKeysFound is returned by Keys when the bucket yields no keys.
ErrNoKeysFound = errors .New ("nats: no keys found" )
// ErrKeyWatcherTimeout is sent on a watcher's error channel when the
// initial values do not arrive before the init timer fires.
ErrKeyWatcherTimeout = errors .New ("nats: key watcher timed out waiting for initial keys" )
)
var (
// ErrKeyExists is the JetStream error that Create checks for (and wraps)
// when its conditional write fails. It carries the "wrong last sequence"
// API error code with HTTP-style code 400.
ErrKeyExists JetStreamError = &jsError {apiErr : &APIError {ErrorCode : JSErrCodeStreamWrongLastSequence , Code : 400 }, message : "key exists" }
)
// Naming templates that map a bucket onto its stream and subjects.
const (
// kvBucketNamePre is the stream-name prefix of every bucket.
kvBucketNamePre = "KV_"
// kvBucketNameTmpl formats a bucket name into its stream name.
kvBucketNameTmpl = "KV_%s"
// kvSubjectsTmpl formats a bucket name into the wildcard subject covering all its keys.
kvSubjectsTmpl = "$KV.%s.>"
// kvSubjectsPreTmpl formats a bucket name into the prefix placed before a key.
kvSubjectsPreTmpl = "$KV.%s."
// kvSubjectsPreDomainTmpl is the key prefix for a mirror with an external
// API prefix: the API prefix first, then the bucket prefix.
kvSubjectsPreDomainTmpl = "%s.$KV.%s."
)
// Patterns used by bucketValid, keyValid and searchKeyValid.
var (
// validBucketRe accepts one or more letters, digits, '_' or '-'.
validBucketRe = regexp .MustCompile (`^[a-zA-Z0-9_-]+$` )
// validKeyRe accepts one or more letters, digits, '-', '/', '_', '=' or '.'.
validKeyRe = regexp .MustCompile (`^[-/_=\.a-zA-Z0-9]+$` )
// validSearchKeyRe additionally accepts '*' anywhere and a single '>' at
// the end. The pattern itself also matches the empty string;
// searchKeyValid rejects that case separately.
validSearchKeyRe = regexp .MustCompile (`^[-/_=\.a-zA-Z0-9*]*[>]?$` )
)
// KeyValue looks up an existing bucket and returns a handle to it.
//
// It fails when the server is older than 2.6.2, when the bucket name is
// invalid (ErrInvalidBucketName), when the backing stream does not exist
// (ErrBucketNotFound), or when the stream's per-subject message limit is
// below 1 (ErrBadBucket). Any other stream lookup error is returned as is.
func (js *js) KeyValue(bucket string) (KeyValue, error) {
	switch {
	case !js.nc.serverMinVersion(2, 6, 2):
		return nil, errors.New("nats: key-value requires at least server version 2.6.2")
	case !bucketValid(bucket):
		return nil, ErrInvalidBucketName
	}

	info, err := js.StreamInfo(fmt.Sprintf(kvBucketNameTmpl, bucket))
	switch {
	case err == nil:
		// Stream found; validate it below.
	case errors.Is(err, ErrStreamNotFound):
		return nil, ErrBucketNotFound
	default:
		return nil, err
	}

	// A stream without a positive per-subject limit is not treated as a bucket.
	if info.Config.MaxMsgsPerSubject < 1 {
		return nil, ErrBadBucket
	}
	return mapStreamToKVS(js, info), nil
}
// CreateKeyValue creates the stream backing a bucket and returns a handle to
// the bucket.
//
// It fails when the server is older than 2.6.2, when cfg is nil
// (ErrKeyValueConfigRequired), when the bucket name is invalid
// (ErrInvalidBucketName), when the account info request fails, or when
// cfg.History exceeds KeyValueMaxHistory (ErrHistoryToLarge).
//
// If the stream name is already in use, the existing configuration is
// compared with the requested one after aligning Discard and AllowDirect;
// when they are otherwise equal the stream is updated instead of failing.
//
// The entries of cfg.Sources are copied before being adjusted, so the
// caller's configuration is left untouched.
func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
	if !js.nc.serverMinVersion(2, 6, 2) {
		return nil, errors.New("nats: key-value requires at least server version 2.6.2")
	}
	if cfg == nil {
		return nil, ErrKeyValueConfigRequired
	}
	if !bucketValid(cfg.Bucket) {
		return nil, ErrInvalidBucketName
	}
	if _, err := js.AccountInfo(); err != nil {
		return nil, err
	}

	// Default to a single value per key.
	history := int64(1)
	if cfg.History > 0 {
		if cfg.History > KeyValueMaxHistory {
			return nil, ErrHistoryToLarge
		}
		history = int64(cfg.History)
	}

	replicas := cfg.Replicas
	if replicas == 0 {
		replicas = 1
	}

	// Unset limits are sent explicitly as -1 so that the requested
	// configuration can be compared with the one reported by the server in
	// the "already in use" path below.
	maxBytes := cfg.MaxBytes
	if maxBytes == 0 {
		maxBytes = -1
	}
	maxMsgSize := cfg.MaxValueSize
	if maxMsgSize == 0 {
		maxMsgSize = -1
	}

	// The duplicate window is two minutes, shortened to the TTL when the TTL
	// is positive and smaller.
	duplicateWindow := 2 * time.Minute
	if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
		duplicateWindow = cfg.TTL
	}

	var compression StoreCompression
	if cfg.Compression {
		compression = S2Compression
	}

	scfg := &StreamConfig{
		Name:              fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
		Description:       cfg.Description,
		MaxMsgsPerSubject: history,
		MaxBytes:          maxBytes,
		MaxAge:            cfg.TTL,
		MaxMsgSize:        maxMsgSize,
		Storage:           cfg.Storage,
		Replicas:          replicas,
		Placement:         cfg.Placement,
		AllowRollup:       true,
		DenyDelete:        true,
		Duplicates:        duplicateWindow,
		MaxMsgs:           -1,
		MaxConsumers:      -1,
		AllowDirect:       true,
		RePublish:         cfg.RePublish,
		Compression:       compression,
	}

	if cfg.Mirror != nil {
		// Work on a copy so the caller's mirror configuration is not modified.
		m := cfg.Mirror.copy()
		if !strings.HasPrefix(m.Name, kvBucketNamePre) {
			m.Name = fmt.Sprintf(kvBucketNameTmpl, m.Name)
		}
		scfg.Mirror = m
		scfg.MirrorDirect = true
	} else {
		for _, src := range cfg.Sources {
			// Same as for the mirror: adjust a copy, not the caller's value.
			ss := src.copy()

			var sourceBucketName string
			if strings.HasPrefix(ss.Name, kvBucketNamePre) {
				sourceBucketName = ss.Name[len(kvBucketNamePre):]
			} else {
				sourceBucketName = ss.Name
				ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
			}

			// Map the source bucket's subjects onto this bucket's subjects,
			// except for an external source that has the same bucket name.
			if ss.External == nil || sourceBucketName != cfg.Bucket {
				ss.SubjectTransforms = []SubjectTransformConfig{{
					Source:      fmt.Sprintf(kvSubjectsTmpl, sourceBucketName),
					Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket),
				}}
			}
			scfg.Sources = append(scfg.Sources, ss)
		}
		// A non-mirror bucket listens on its own subject space, with or
		// without sources.
		scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
	}

	// DiscardNew is only requested from servers at version 2.7.2 or later.
	if js.nc.serverMinVersion(2, 7, 2) {
		scfg.Discard = DiscardNew
	}

	si, err := js.AddStream(scfg)
	if err != nil {
		if errors.Is(err, ErrStreamNameAlreadyInUse) {
			// The lookup error is deliberately dropped: when the lookup
			// fails, si is nil and the AddStream error is returned below.
			if si, _ = js.StreamInfo(scfg.Name); si != nil {
				// Ignore differences in Discard and AllowDirect when
				// comparing; if nothing else differs, update the stream
				// to the requested configuration.
				si.Config.Discard = scfg.Discard
				si.Config.AllowDirect = scfg.AllowDirect
				if reflect.DeepEqual(&si.Config, scfg) {
					si, err = js.UpdateStream(scfg)
				}
			}
		}
		if err != nil {
			return nil, err
		}
	}
	return mapStreamToKVS(js, si), nil
}
// DeleteKeyValue deletes the stream backing the named bucket. An invalid
// bucket name yields ErrInvalidBucketName; otherwise the result of the stream
// deletion is returned.
func (js *js) DeleteKeyValue(bucket string) error {
	if bucketValid(bucket) {
		return js.DeleteStream(fmt.Sprintf(kvBucketNameTmpl, bucket))
	}
	return ErrInvalidBucketName
}
// kvs is the KeyValue implementation backed by a JetStream stream. Values are
// filled in by mapStreamToKVS.
type kvs struct {
// name is the bucket name (stream name without the "KV_" prefix).
name string
// stream is the name of the backing stream.
stream string
// pre is the subject prefix placed before a key for reads, watches and Update.
pre string
// putPre, when non-empty, replaces pre for Put and Delete (set for mirrors).
putPre string
// js is the JetStream context used for all requests.
js *js
// useJSPfx makes publishes prepend js.opts.pre to the subject.
useJSPfx bool
// useDirect makes get pass the DirectGet option.
useDirect bool
}
// kve is the KeyValueEntry implementation used by get and by the watcher.
type kve struct {
	bucket   string
	key      string
	value    []byte
	revision uint64
	delta    uint64
	created  time.Time
	op       KeyValueOp
}

// Bucket returns the name of the bucket the entry belongs to.
func (e *kve) Bucket() string {
	return e.bucket
}

// Key returns the entry's key.
func (e *kve) Key() string {
	return e.key
}

// Value returns the stored payload.
func (e *kve) Value() []byte {
	return e.value
}

// Revision returns the entry's revision number.
func (e *kve) Revision() uint64 {
	return e.revision
}

// Created returns the entry's timestamp.
func (e *kve) Created() time.Time {
	return e.created
}

// Delta returns the pending count recorded with the entry.
func (e *kve) Delta() uint64 {
	return e.delta
}

// Operation returns the kind of operation the entry represents.
func (e *kve) Operation() KeyValueOp {
	return e.op
}
// bucketValid reports whether bucket is non-empty and matches validBucketRe.
func bucketValid(bucket string) bool {
	return bucket != "" && validBucketRe.MatchString(bucket)
}
// keyValid reports whether key is non-empty, neither starts nor ends with a
// '.', and matches validKeyRe.
func keyValid(key string) bool {
	n := len(key)
	if n == 0 {
		return false
	}
	if key[0] == '.' || key[n-1] == '.' {
		return false
	}
	return validKeyRe.MatchString(key)
}
// searchKeyValid reports whether key is usable as a watch pattern: non-empty,
// neither starting nor ending with a '.', and matching validSearchKeyRe.
func searchKeyValid(key string) bool {
	n := len(key)
	if n == 0 {
		return false
	}
	if key[0] == '.' || key[n-1] == '.' {
		return false
	}
	return validSearchKeyRe.MatchString(key)
}
// Get returns the latest entry for key. A key whose latest entry is a delete
// or purge marker is reported as ErrKeyNotFound; other errors from the lookup
// are returned unchanged.
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
	entry, err := kv.get(key, kvLatestRevision)
	switch {
	case err == nil:
		return entry, nil
	case errors.Is(err, ErrKeyDeleted):
		return nil, ErrKeyNotFound
	default:
		return nil, err
	}
}
// GetRevision returns the entry for key stored at the given revision. An
// entry that is a delete or purge marker is reported as ErrKeyNotFound; other
// errors from the lookup are returned unchanged.
func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
	entry, err := kv.get(key, revision)
	switch {
	case err == nil:
		return entry, nil
	case errors.Is(err, ErrKeyDeleted):
		return nil, ErrKeyNotFound
	default:
		return nil, err
	}
}
// get fetches the message for key and converts it to an entry.
//
// With revision == kvLatestRevision the last message on the key's subject is
// fetched; otherwise the message at that stream sequence is fetched and must
// be on the key's subject, else ErrKeyNotFound is returned. A missing message
// is also reported as ErrKeyNotFound.
//
// When the message carries a delete or purge marker, the populated entry is
// returned together with ErrKeyDeleted so that callers can still read its
// revision.
func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
	if !keyValid(key) {
		return nil, ErrInvalidKey
	}
	subject := kv.pre + key

	var opts []JSOpt
	if kv.useDirect {
		opts = []JSOpt{DirectGet()}
	}

	var (
		msg *RawStreamMsg
		err error
	)
	if revision == kvLatestRevision {
		msg, err = kv.js.GetLastMsg(kv.stream, subject, opts...)
	} else {
		msg, err = kv.js.GetMsg(kv.stream, revision, opts...)
		// The sequence may belong to a different key of the same bucket.
		if err == nil && msg.Subject != subject {
			return nil, ErrKeyNotFound
		}
	}
	if err != nil {
		if errors.Is(err, ErrMsgNotFound) {
			return nil, ErrKeyNotFound
		}
		return nil, err
	}

	entry := &kve{
		bucket:   kv.name,
		key:      key,
		value:    msg.Data,
		revision: msg.Sequence,
		created:  msg.Time,
	}
	if len(msg.Header) == 0 {
		return entry, nil
	}
	switch msg.Header.Get(kvop) {
	case kvdel:
		entry.op = KeyValueDelete
	case kvpurge:
		entry.op = KeyValuePurge
	default:
		return entry, nil
	}
	return entry, ErrKeyDeleted
}
// Put publishes value under key and returns the sequence from the publish
// ack as the new revision. An invalid key yields ErrInvalidKey.
//
// The subject is the key prefixed with putPre when that is set, otherwise
// with pre; when useJSPfx is set the JetStream prefix from js.opts.pre goes
// in front of that.
func (kv *kvs) Put(key string, value []byte) (revision uint64, err error) {
	if !keyValid(key) {
		return 0, ErrInvalidKey
	}

	prefix := kv.pre
	if kv.putPre != _EMPTY_ {
		prefix = kv.putPre
	}
	subject := prefix + key
	if kv.useJSPfx {
		subject = kv.js.opts.pre + subject
	}

	ack, err := kv.js.Publish(subject, value)
	if err != nil {
		return 0, err
	}
	return ack.Sequence, nil
}
// PutString is Put for a string value.
func (kv *kvs) PutString(key string, value string) (revision uint64, err error) {
	data := []byte(value)
	return kv.Put(key, data)
}
// Create stores value under key only if the key has no current value.
//
// It first attempts an Update expecting revision 0. If that fails and the
// latest entry for the key is a delete or purge marker, the Update is retried
// expecting the marker's revision. Otherwise the original failure is
// returned; when it is ErrKeyExists it is wrapped with the "key exists"
// message.
func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
	rev, err := kv.Update(key, value, 0)
	if err == nil {
		return rev, nil
	}

	// From here on err holds the failure of the first Update; the lookup
	// below uses its own error variable so err is not overwritten.
	latest, getErr := kv.get(key, kvLatestRevision)
	if errors.Is(getErr, ErrKeyDeleted) {
		return kv.Update(key, value, latest.Revision())
	}

	if !errors.Is(err, ErrKeyExists) {
		return 0, err
	}
	jserr := ErrKeyExists.(*jsError)
	return 0, fmt.Errorf("%w: %s", err, jserr.message)
}
// Update publishes value under key with the expected-last-subject-sequence
// header set to revision, and returns the sequence from the publish ack. An
// invalid key yields ErrInvalidKey.
//
// The subject is always built from pre (putPre is not consulted here); when
// useJSPfx is set the JetStream prefix from js.opts.pre goes in front.
func (kv *kvs) Update(key string, value []byte, revision uint64) (uint64, error) {
	if !keyValid(key) {
		return 0, ErrInvalidKey
	}

	subject := kv.pre + key
	if kv.useJSPfx {
		subject = kv.js.opts.pre + subject
	}

	hdr := Header{}
	hdr.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(revision, 10))
	msg := Msg{Subject: subject, Header: hdr, Data: value}

	ack, err := kv.js.PublishMsg(&msg)
	if err != nil {
		return 0, err
	}
	return ack.Sequence, nil
}
// Delete publishes a marker message for key. An invalid key yields
// ErrInvalidKey and a failing option aborts with that option's error.
//
// Without the purge option the message carries the delete marker; with it,
// the purge marker plus a subject rollup header. A non-zero revision option
// is sent as the expected last sequence for the key's subject.
func (kv *kvs) Delete(key string, opts ...DeleteOpt) error {
	if !keyValid(key) {
		return ErrInvalidKey
	}

	prefix := kv.pre
	if kv.putPre != _EMPTY_ {
		prefix = kv.putPre
	}
	subject := prefix + key
	if kv.useJSPfx {
		subject = kv.js.opts.pre + subject
	}
	msg := NewMsg(subject)

	var o deleteOpts
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		if err := opt.configureDelete(&o); err != nil {
			return err
		}
	}

	if !o.purge {
		msg.Header.Set(kvop, kvdel)
	} else {
		msg.Header.Set(kvop, kvpurge)
		msg.Header.Set(MsgRollup, MsgRollupSubject)
	}
	if o.revision != 0 {
		msg.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(o.revision, 10))
	}

	_, err := kv.js.PublishMsg(msg)
	return err
}
// Purge is Delete with the purge option added after the caller's options.
//
// The caller's slice is capped before appending so that the extra option is
// always written to a fresh backing array; a plain append could overwrite an
// element of the caller's array when the slice passed as opts... has spare
// capacity.
func (kv *kvs) Purge(key string, opts ...DeleteOpt) error {
	all := append(opts[:len(opts):len(opts)], purge())
	return kv.Delete(key, all...)
}
// kvDefaultPurgeDeletesMarkerThreshold is the marker age threshold used by
// PurgeDeletes when no DeleteMarkersOlderThan option is given.
const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes purges the subjects of all keys whose latest entry is a delete
// or purge marker.
//
// It watches all keys until the initial values have been delivered, collects
// the marker entries, stops the watcher and then issues one purge request per
// collected key. With a positive threshold, a marker created after
// now-threshold is kept (Keep = 1) and only what precedes it is purged; older
// markers, and all markers when the threshold is negative, are purged
// entirely (Keep = 0). The first failing purge request aborts the call.
func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
	var o purgeOpts
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		if err := opt.configurePurge(&o); err != nil {
			return err
		}
	}

	var wopts []WatchOpt
	if o.ctx != nil {
		wopts = append(wopts, Context(o.ctx))
	}
	watcher, err := kv.WatchAll(wopts...)
	if err != nil {
		return err
	}
	defer watcher.Stop()

	// Zero selects the default; a negative value disables the age check.
	threshold := o.dmthr
	if threshold == 0 {
		threshold = kvDefaultPurgeDeletesMarkerThreshold
	}
	checkAge := threshold > 0
	var cutoff time.Time
	if checkAge {
		cutoff = time.Now().Add(-threshold)
	}

	// Gather markers until the nil entry that ends the initial values.
	var markers []KeyValueEntry
	for entry := range watcher.Updates() {
		if entry == nil {
			break
		}
		switch entry.Operation() {
		case KeyValueDelete, KeyValuePurge:
			markers = append(markers, entry)
		}
	}
	// Stop watching before any purge request is issued.
	watcher.Stop()

	jsOpts := []JSOpt{}
	if o.ctx != nil {
		jsOpts = append(jsOpts, Context(o.ctx))
	}

	var req StreamPurgeRequest
	for _, marker := range markers {
		req.Subject = kv.pre + marker.Key()
		req.Keep = 0
		if checkAge && marker.Created().After(cutoff) {
			req.Keep = 1
		}
		if err := kv.js.purgeStream(kv.stream, &req, jsOpts...); err != nil {
			return err
		}
	}
	return nil
}
// Keys returns the keys of the bucket as a slice.
//
// It watches all keys with IgnoreDeletes and MetaOnly added after the
// caller's options and collects keys until the nil entry that ends the
// initial values. It returns ErrNoKeysFound when nothing was collected.
//
// The caller's option slice is capped before appending so the added options
// never overwrite elements of the caller's backing array.
func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) {
	opts = append(opts[:len(opts):len(opts)], IgnoreDeletes(), MetaOnly())
	watcher, err := kv.WatchAll(opts...)
	if err != nil {
		return nil, err
	}
	defer watcher.Stop()

	var keys []string
	for entry := range watcher.Updates() {
		if entry == nil {
			break
		}
		keys = append(keys, entry.Key())
	}
	if len(keys) == 0 {
		return nil, ErrNoKeysFound
	}
	return keys, nil
}
// keyLister is the KeyLister implementation returned by ListKeys.
type keyLister struct {
// watcher is the underlying watcher; Stop and Error delegate to it.
watcher KeyWatcher
// keys receives each key and is closed when listing ends.
keys chan string
}
// ListKeys returns a KeyLister that delivers the keys of the bucket on a
// channel buffered to 256 keys.
//
// It watches all keys with IgnoreDeletes and MetaOnly added after the
// caller's options. A goroutine forwards keys until the nil entry that ends
// the initial values (or until the updates channel is closed), then stops the
// watcher and closes the keys channel.
//
// The caller's option slice is capped before appending so the added options
// never overwrite elements of the caller's backing array.
func (kv *kvs) ListKeys(opts ...WatchOpt) (KeyLister, error) {
	opts = append(opts[:len(opts):len(opts)], IgnoreDeletes(), MetaOnly())
	watcher, err := kv.WatchAll(opts...)
	if err != nil {
		return nil, err
	}
	kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}

	go func() {
		// Deferred calls run in reverse order: stop the watcher first,
		// then close the keys channel.
		defer close(kl.keys)
		defer watcher.Stop()
		for entry := range watcher.Updates() {
			if entry == nil {
				return
			}
			kl.keys <- entry.Key()
		}
	}()
	return kl, nil
}
// Keys returns the channel on which keys are delivered.
func (kl *keyLister) Keys() <-chan string {
	return kl.keys
}

// Stop stops the underlying watcher and returns its result.
func (kl *keyLister) Stop() error {
	return kl.watcher.Stop()
}

// Error returns the underlying watcher's error channel.
func (kl *keyLister) Error() <-chan error {
	return kl.watcher.Error()
}
// History returns the stored entries for key.
//
// It watches key with IncludeHistory added after the caller's options and
// collects entries until the nil entry that ends the initial values. It
// returns ErrKeyNotFound when nothing was collected.
//
// The caller's option slice is capped before appending so the added option
// never overwrites an element of the caller's backing array.
func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
	opts = append(opts[:len(opts):len(opts)], IncludeHistory())
	watcher, err := kv.Watch(key, opts...)
	if err != nil {
		return nil, err
	}
	defer watcher.Stop()

	var entries []KeyValueEntry
	for entry := range watcher.Updates() {
		if entry == nil {
			break
		}
		entries = append(entries, entry)
	}
	if len(entries) == 0 {
		return nil, ErrKeyNotFound
	}
	return entries, nil
}
// watcher is the KeyWatcher implementation created by WatchFiltered.
type watcher struct {
// mu is held by the message callback, the init timer callback, the
// subscription's done callback and during subscription setup.
mu sync .Mutex
// updates delivers entries; a nil entry marks the end of the initial values.
updates chan KeyValueEntry
// sub is the underlying subscription; Stop unsubscribes it.
sub *Subscription
// initDone records that the end-of-initial-values marker was sent (or
// that none will be sent, for updates-only watchers).
initDone bool
// initPending is the pending count read from the subscription right
// after it was created.
initPending uint64
// received counts messages handled while initDone is false.
received uint64
// ctx is the context supplied through the watch options, if any.
ctx context .Context
// initDoneTimer fires when the initial values take too long to arrive.
initDoneTimer *time .Timer
// errCh carries watcher errors; it has room for one error.
errCh chan error
}
// Context returns the context supplied through the watch options. It returns
// nil for a nil watcher.
func (w *watcher) Context() context.Context {
	if w != nil {
		return w.ctx
	}
	return nil
}
// Updates returns the channel on which entries are delivered. It returns a
// nil channel for a nil watcher.
func (w *watcher) Updates() <-chan KeyValueEntry {
	if w != nil {
		return w.updates
	}
	return nil
}
// Stop unsubscribes the underlying subscription and returns the result. It is
// a no-op returning nil for a nil watcher.
func (w *watcher) Stop() error {
	if w != nil {
		return w.sub.Unsubscribe()
	}
	return nil
}
// Error returns the channel on which watcher errors are delivered. For a nil
// watcher it returns a fresh channel that is already closed, so receivers
// never block on it.
func (w *watcher) Error() <-chan error {
	if w != nil {
		return w.errCh
	}
	closed := make(chan error)
	close(closed)
	return closed
}
// WatchAll watches every key of the bucket by delegating to Watch with the
// AllKeys pattern.
func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
	const pattern = AllKeys
	return kv.Watch(pattern, opts...)
}
// WatchFiltered creates a watcher that delivers entries for the given keys or
// key patterns; an empty list watches all keys.
//
// Every key must pass searchKeyValid, otherwise an error wrapping
// ErrInvalidKey is returned. A failing option aborts with that option's
// error, and a subscription failure is returned as is.
//
// Unless updates-only is requested, a nil entry is sent on the updates
// channel once the initial values have been delivered. If they do not arrive
// within js.opts.wait of the last message, ErrKeyWatcherTimeout is offered on
// the error channel and the nil entry is sent anyway.
//
// The keys slice is only read: the prefixed subjects are built in a separate
// slice, so the caller can reuse keys for another call.
func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error) {
	for _, key := range keys {
		if !searchKeyValid(key) {
			return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "key cannot be empty and must be a valid NATS subject")
		}
	}
	var o watchOpts
	for _, opt := range opts {
		if opt != nil {
			if err := opt.configureWatcher(&o); err != nil {
				return nil, err
			}
		}
	}

	// Build the filter subjects without touching the caller's slice.
	subjects := make([]string, len(keys))
	for i, key := range keys {
		subjects[i] = kv.pre + key
	}
	if len(subjects) == 0 {
		subjects = []string{kv.pre + AllKeys}
	}

	// Sends on updates happen while w.mu is held and may block once the
	// buffer is full.
	w := &watcher{
		updates: make(chan KeyValueEntry, 256),
		ctx:     o.ctx,
		errCh:   make(chan error, 1),
	}

	update := func(m *Msg) {
		// Messages whose reply subject does not parse as metadata, or whose
		// subject carries no key after the prefix, are ignored.
		tokens, err := parser.GetMetadataFields(m.Reply)
		if err != nil {
			return
		}
		if len(m.Subject) <= len(kv.pre) {
			return
		}
		subj := m.Subject[len(kv.pre):]

		var op KeyValueOp
		if len(m.Header) > 0 {
			switch m.Header.Get(kvop) {
			case kvdel:
				op = KeyValueDelete
			case kvpurge:
				op = KeyValuePurge
			}
		}
		delta := parser.ParseNum(tokens[parser.AckNumPendingTokenPos])

		w.mu.Lock()
		defer w.mu.Unlock()
		if !o.ignoreDeletes || (op != KeyValueDelete && op != KeyValuePurge) {
			entry := &kve{
				bucket:   kv.name,
				key:      subj,
				value:    m.Data,
				revision: parser.ParseNum(tokens[parser.AckStreamSeqTokenPos]),
				created:  time.Unix(0, int64(parser.ParseNum(tokens[parser.AckTimestampSeqTokenPos]))),
				delta:    delta,
				op:       op,
			}
			w.updates <- entry
		}

		// Track the initial values. Suppressed markers still count, since
		// the counter is advanced outside the branch above.
		if !w.initDone {
			w.received++
			if w.received >= w.initPending || delta == 0 {
				if w.initDoneTimer != nil {
					w.initDoneTimer.Stop()
				}
				w.initDone = true
				w.updates <- nil
			} else if w.initDoneTimer != nil {
				// More initial values expected: restart the timeout.
				w.initDoneTimer.Reset(kv.js.opts.wait)
			}
		}
	}

	subOpts := []SubOpt{BindStream(kv.stream), OrderedConsumer()}
	if !o.includeHistory {
		subOpts = append(subOpts, DeliverLastPerSubject())
	}
	if o.updatesOnly {
		subOpts = append(subOpts, DeliverNew())
	}
	if o.metaOnly {
		subOpts = append(subOpts, HeadersOnly())
	}
	if o.ctx != nil {
		subOpts = append(subOpts, Context(o.ctx))
	}

	// Subscribe and finish the setup under the lock so the update callback
	// cannot observe a partially initialized watcher.
	w.mu.Lock()
	defer w.mu.Unlock()
	var sub *Subscription
	var err error
	if len(subjects) == 1 {
		sub, err = kv.js.Subscribe(subjects[0], update, subOpts...)
	} else {
		subOpts = append(subOpts, ConsumerFilterSubjects(subjects...))
		sub, err = kv.js.Subscribe("", update, subOpts...)
	}
	if err != nil {
		return nil, err
	}

	sub.mu.Lock()
	if !o.updatesOnly {
		if sub.jsi != nil {
			if sub.jsi.pending == 0 {
				// Nothing to deliver initially: send the marker right away.
				w.initDone = true
				w.updates <- nil
			} else {
				w.initPending = sub.jsi.pending
				w.initDoneTimer = time.AfterFunc(kv.js.opts.wait, func() {
					w.mu.Lock()
					defer w.mu.Unlock()
					if !w.initDone {
						w.initDone = true
						// Offer the error without blocking; it is dropped
						// when the error channel is already full.
						select {
						case w.errCh <- ErrKeyWatcherTimeout:
						default:
						}
						w.updates <- nil
					}
				})
			}
		}
	} else {
		// Updates-only watchers never send the initial-values marker.
		w.initDone = true
	}
	// Close both channels when the subscription reports it is done.
	sub.pDone = func(_ string) {
		w.mu.Lock()
		defer w.mu.Unlock()
		if w.initDoneTimer != nil {
			w.initDoneTimer.Stop()
		}
		close(w.updates)
		close(w.errCh)
	}
	sub.mu.Unlock()

	w.sub = sub
	return w, nil
}
// Watch watches a single key or key pattern by delegating to WatchFiltered
// with a one-element list.
func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
	filters := []string{keys}
	return kv.WatchFiltered(filters, opts...)
}
// Bucket returns the name of the bucket.
func (kv *kvs) Bucket() string {
	return kv.name
}
// KeyValueBucketStatus is the KeyValueStatus implementation backed by the
// stream info of the bucket's stream.
type KeyValueBucketStatus struct {
	nfo    *StreamInfo
	bucket string
}

// Bucket returns the bucket name.
func (s *KeyValueBucketStatus) Bucket() string {
	return s.bucket
}

// Values returns the message count from the stream state.
func (s *KeyValueBucketStatus) Values() uint64 {
	return s.nfo.State.Msgs
}

// History returns the stream's per-subject message limit.
func (s *KeyValueBucketStatus) History() int64 {
	return s.nfo.Config.MaxMsgsPerSubject
}

// TTL returns the stream's maximum message age.
func (s *KeyValueBucketStatus) TTL() time.Duration {
	return s.nfo.Config.MaxAge
}

// BackingStore always reports "JetStream".
func (s *KeyValueBucketStatus) BackingStore() string {
	return "JetStream"
}

// StreamInfo returns the stream info the status was built from.
func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo {
	return s.nfo
}

// Bytes returns the byte size from the stream state.
func (s *KeyValueBucketStatus) Bytes() uint64 {
	return s.nfo.State.Bytes
}

// IsCompressed reports whether the stream's compression setting is anything
// other than NoCompression.
func (s *KeyValueBucketStatus) IsCompressed() bool {
	return s.nfo.Config.Compression != NoCompression
}
// Status fetches the current info of the backing stream and wraps it as a
// KeyValueStatus. A failing stream info request is returned as is.
func (kv *kvs) Status() (KeyValueStatus, error) {
	info, err := kv.js.StreamInfo(kv.stream)
	if err == nil {
		return &KeyValueBucketStatus{nfo: info, bucket: kv.name}, nil
	}
	return nil, err
}
// KeyValueStoreNames returns an unbuffered channel that delivers the name of
// every bucket, i.e. every listed stream whose name starts with "KV_", with
// that prefix removed. The channel is closed when the listing ends.
//
// NOTE(review): the list filter is written through lister.js, which is the
// receiver itself, so it is stored on js.opts — confirm that later stream
// listings on the same context are not affected.
func (js *js) KeyValueStoreNames() <-chan string {
	names := make(chan string)
	lister := &streamNamesLister{js: js}
	lister.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")

	go func() {
		defer close(names)
		for lister.Next() {
			for _, stream := range lister.Page() {
				if strings.HasPrefix(stream, kvBucketNamePre) {
					names <- strings.TrimPrefix(stream, kvBucketNamePre)
				}
			}
		}
	}()
	return names
}
// KeyValueStores returns an unbuffered channel that delivers a status value
// for every bucket, i.e. every listed stream whose name starts with "KV_".
// The channel is closed when the listing ends.
//
// NOTE(review): the list filter is written through lister.js, which is the
// receiver itself, so it is stored on js.opts — confirm that later stream
// listings on the same context are not affected.
func (js *js) KeyValueStores() <-chan KeyValueStatus {
	statuses := make(chan KeyValueStatus)
	lister := &streamLister{js: js}
	lister.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")

	go func() {
		defer close(statuses)
		for lister.Next() {
			for _, info := range lister.Page() {
				stream := info.Config.Name
				if strings.HasPrefix(stream, kvBucketNamePre) {
					statuses <- &KeyValueBucketStatus{
						nfo:    info,
						bucket: strings.TrimPrefix(stream, kvBucketNamePre),
					}
				}
			}
		}
	}()
	return statuses
}
// mapStreamToKVS builds the bucket handle for a stream.
//
// The bucket name is the stream name without the "KV_" prefix. useJSPfx is
// set when the context's API prefix differs from the default one, and
// useDirect mirrors the stream's AllowDirect setting.
//
// For a mirror, writes go to the mirrored bucket: putPre is set to that
// bucket's prefix. When the mirror also has an external API prefix, pre is
// switched to the mirrored bucket's prefix, putPre additionally gets the
// external API prefix in front, and useJSPfx is turned off.
func mapStreamToKVS(js *js, info *StreamInfo) *kvs {
	bucket := strings.TrimPrefix(info.Config.Name, kvBucketNamePre)
	kv := &kvs{
		name:      bucket,
		stream:    info.Config.Name,
		pre:       fmt.Sprintf(kvSubjectsPreTmpl, bucket),
		js:        js,
		useJSPfx:  js.opts.pre != defaultAPIPrefix,
		useDirect: info.Config.AllowDirect,
	}

	mirror := info.Config.Mirror
	if mirror == nil {
		return kv
	}

	mirrorBucket := strings.TrimPrefix(mirror.Name, kvBucketNamePre)
	if ext := mirror.External; ext != nil && ext.APIPrefix != _EMPTY_ {
		kv.useJSPfx = false
		kv.pre = fmt.Sprintf(kvSubjectsPreTmpl, mirrorBucket)
		kv.putPre = fmt.Sprintf(kvSubjectsPreDomainTmpl, ext.APIPrefix, mirrorBucket)
		return kv
	}
	kv.putPre = fmt.Sprintf(kvSubjectsPreTmpl, mirrorBucket)
	return kv
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.