package nats
import (
"bytes"
"context"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"hash"
"io"
"net"
"os"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go/internal/parser"
"github.com/nats-io/nuid"
)
type ObjectStoreManager interface {
ObjectStore (bucket string ) (ObjectStore , error )
CreateObjectStore (cfg *ObjectStoreConfig ) (ObjectStore , error )
DeleteObjectStore (bucket string ) error
ObjectStoreNames (opts ...ObjectOpt ) <-chan string
ObjectStores (opts ...ObjectOpt ) <-chan ObjectStoreStatus
}
type ObjectStore interface {
Put (obj *ObjectMeta , reader io .Reader , opts ...ObjectOpt ) (*ObjectInfo , error )
Get (name string , opts ...GetObjectOpt ) (ObjectResult , error )
PutBytes (name string , data []byte , opts ...ObjectOpt ) (*ObjectInfo , error )
GetBytes (name string , opts ...GetObjectOpt ) ([]byte , error )
PutString (name string , data string , opts ...ObjectOpt ) (*ObjectInfo , error )
GetString (name string , opts ...GetObjectOpt ) (string , error )
PutFile (file string , opts ...ObjectOpt ) (*ObjectInfo , error )
GetFile (name, file string , opts ...GetObjectOpt ) error
GetInfo (name string , opts ...GetObjectInfoOpt ) (*ObjectInfo , error )
UpdateMeta (name string , meta *ObjectMeta ) error
Delete (name string ) error
AddLink (name string , obj *ObjectInfo ) (*ObjectInfo , error )
AddBucketLink (name string , bucket ObjectStore ) (*ObjectInfo , error )
Seal () error
Watch (opts ...WatchOpt ) (ObjectWatcher , error )
List (opts ...ListObjectsOpt ) ([]*ObjectInfo , error )
Status () (ObjectStoreStatus , error )
}
type ObjectOpt interface {
configureObject(opts *objOpts ) error
}
type objOpts struct {
ctx context .Context
}
func (ctx ContextOpt ) configureObject (opts *objOpts ) error {
opts .ctx = ctx
return nil
}
type ObjectWatcher interface {
Updates () <-chan *ObjectInfo
Stop () error
}
var (
ErrObjectConfigRequired = errors .New ("nats: object-store config required" )
ErrBadObjectMeta = errors .New ("nats: object-store meta information invalid" )
ErrObjectNotFound = errors .New ("nats: object not found" )
ErrInvalidStoreName = errors .New ("nats: invalid object-store name" )
ErrDigestMismatch = errors .New ("nats: received a corrupt object, digests do not match" )
ErrInvalidDigestFormat = errors .New ("nats: object digest hash has invalid format" )
ErrNoObjectsFound = errors .New ("nats: no objects found" )
ErrObjectAlreadyExists = errors .New ("nats: an object already exists with that name" )
ErrNameRequired = errors .New ("nats: name is required" )
ErrNeeds262 = errors .New ("nats: object-store requires at least server version 2.6.2" )
ErrLinkNotAllowed = errors .New ("nats: link cannot be set when putting the object in bucket" )
ErrObjectRequired = errors .New ("nats: object required" )
ErrNoLinkToDeleted = errors .New ("nats: not allowed to link to a deleted object" )
ErrNoLinkToLink = errors .New ("nats: not allowed to link to another link" )
ErrCantGetBucket = errors .New ("nats: invalid Get, object is a link to a bucket" )
ErrBucketRequired = errors .New ("nats: bucket required" )
ErrBucketMalformed = errors .New ("nats: bucket malformed" )
ErrUpdateMetaDeleted = errors .New ("nats: cannot update meta for a deleted object" )
)
type ObjectStoreConfig struct {
Bucket string `json:"bucket"`
Description string `json:"description,omitempty"`
TTL time .Duration `json:"max_age,omitempty"`
MaxBytes int64 `json:"max_bytes,omitempty"`
Storage StorageType `json:"storage,omitempty"`
Replicas int `json:"num_replicas,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Metadata map [string ]string `json:"metadata,omitempty"`
Compression bool `json:"compression,omitempty"`
}
type ObjectStoreStatus interface {
Bucket () string
Description () string
TTL () time .Duration
Storage () StorageType
Replicas () int
Sealed () bool
Size () uint64
BackingStore () string
Metadata () map [string ]string
IsCompressed () bool
}
type ObjectMetaOptions struct {
Link *ObjectLink `json:"link,omitempty"`
ChunkSize uint32 `json:"max_chunk_size,omitempty"`
}
type ObjectMeta struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Headers Header `json:"headers,omitempty"`
Metadata map [string ]string `json:"metadata,omitempty"`
Opts *ObjectMetaOptions `json:"options,omitempty"`
}
type ObjectInfo struct {
ObjectMeta
Bucket string `json:"bucket"`
NUID string `json:"nuid"`
Size uint64 `json:"size"`
ModTime time .Time `json:"mtime"`
Chunks uint32 `json:"chunks"`
Digest string `json:"digest,omitempty"`
Deleted bool `json:"deleted,omitempty"`
}
type ObjectLink struct {
Bucket string `json:"bucket"`
Name string `json:"name,omitempty"`
}
type ObjectResult interface {
io .ReadCloser
Info () (*ObjectInfo , error )
Error () error
}
const (
objNameTmpl = "OBJ_%s"
objAllChunksPreTmpl = "$O.%s.C.>"
objAllMetaPreTmpl = "$O.%s.M.>"
objChunksPreTmpl = "$O.%s.C.%s"
objMetaPreTmpl = "$O.%s.M.%s"
objNoPending = "0"
objDefaultChunkSize = uint32 (128 * 1024 )
objDigestType = "SHA-256="
objDigestTmpl = objDigestType + "%s"
)
type obs struct {
name string
stream string
js *js
}
func (js *js ) CreateObjectStore (cfg *ObjectStoreConfig ) (ObjectStore , error ) {
if !js .nc .serverMinVersion (2 , 6 , 2 ) {
return nil , ErrNeeds262
}
if cfg == nil {
return nil , ErrObjectConfigRequired
}
if !validBucketRe .MatchString (cfg .Bucket ) {
return nil , ErrInvalidStoreName
}
name := cfg .Bucket
chunks := fmt .Sprintf (objAllChunksPreTmpl , name )
meta := fmt .Sprintf (objAllMetaPreTmpl , name )
replicas := cfg .Replicas
if replicas == 0 {
replicas = 1
}
maxBytes := cfg .MaxBytes
if maxBytes == 0 {
maxBytes = -1
}
var compression StoreCompression
if cfg .Compression {
compression = S2Compression
}
scfg := &StreamConfig {
Name : fmt .Sprintf (objNameTmpl , name ),
Description : cfg .Description ,
Subjects : []string {chunks , meta },
MaxAge : cfg .TTL ,
MaxBytes : maxBytes ,
Storage : cfg .Storage ,
Replicas : replicas ,
Placement : cfg .Placement ,
Discard : DiscardNew ,
AllowRollup : true ,
AllowDirect : true ,
Metadata : cfg .Metadata ,
Compression : compression ,
}
_ , err := js .AddStream (scfg )
if err != nil {
return nil , err
}
return &obs {name : name , stream : scfg .Name , js : js }, nil
}
func (js *js ) ObjectStore (bucket string ) (ObjectStore , error ) {
if !validBucketRe .MatchString (bucket ) {
return nil , ErrInvalidStoreName
}
if !js .nc .serverMinVersion (2 , 6 , 2 ) {
return nil , ErrNeeds262
}
stream := fmt .Sprintf (objNameTmpl , bucket )
si , err := js .StreamInfo (stream )
if err != nil {
return nil , err
}
return &obs {name : bucket , stream : si .Config .Name , js : js }, nil
}
func (js *js ) DeleteObjectStore (bucket string ) error {
stream := fmt .Sprintf (objNameTmpl , bucket )
return js .DeleteStream (stream )
}
func encodeName(name string ) string {
return base64 .URLEncoding .EncodeToString ([]byte (name ))
}
func (obs *obs ) Put (meta *ObjectMeta , r io .Reader , opts ...ObjectOpt ) (*ObjectInfo , error ) {
if meta == nil || meta .Name == "" {
return nil , ErrBadObjectMeta
}
if meta .Opts == nil {
meta .Opts = &ObjectMetaOptions {ChunkSize : objDefaultChunkSize }
} else if meta .Opts .Link != nil {
return nil , ErrLinkNotAllowed
} else if meta .Opts .ChunkSize == 0 {
meta .Opts .ChunkSize = objDefaultChunkSize
}
var o objOpts
for _ , opt := range opts {
if opt != nil {
if err := opt .configureObject (&o ); err != nil {
return nil , err
}
}
}
ctx := o .ctx
newnuid := nuid .Next ()
chunkSubj := fmt .Sprintf (objChunksPreTmpl , obs .name , newnuid )
einfo , err := obs .GetInfo (meta .Name , GetObjectInfoShowDeleted ())
if err != nil && err != ErrObjectNotFound {
return nil , err
}
var perr error
var mu sync .Mutex
setErr := func (err error ) {
mu .Lock ()
defer mu .Unlock ()
perr = err
}
getErr := func () error {
mu .Lock ()
defer mu .Unlock ()
return perr
}
jetStream , err := obs .js .nc .JetStream (PublishAsyncErrHandler (func (js JetStream , _ *Msg , err error ) { setErr (err ) }))
if err != nil {
return nil , err
}
defer jetStream .(*js ).cleanupReplySub ()
purgePartial := func () error {
select {
case <- jetStream .PublishAsyncComplete ():
case <- time .After (obs .js .opts .wait ):
}
if err := obs .js .purgeStream (obs .stream , &StreamPurgeRequest {Subject : chunkSubj }); err != nil {
return fmt .Errorf ("could not cleanup bucket after erroneous put operation: %w" , err )
}
return nil
}
m , h := NewMsg (chunkSubj ), sha256 .New ()
chunk , sent , total := make ([]byte , meta .Opts .ChunkSize ), 0 , uint64 (0 )
info := &ObjectInfo {Bucket : obs .name , NUID : newnuid , ObjectMeta : *meta }
for r != nil {
if ctx != nil {
select {
case <- ctx .Done ():
if ctx .Err () == context .Canceled {
err = ctx .Err ()
} else {
err = ErrTimeout
}
default :
}
if err != nil {
if purgeErr := purgePartial (); purgeErr != nil {
return nil , errors .Join (err , purgeErr )
}
return nil , err
}
}
n , readErr := r .Read (chunk )
if readErr != nil && readErr != io .EOF {
if purgeErr := purgePartial (); purgeErr != nil {
return nil , errors .Join (readErr , purgeErr )
}
return nil , readErr
}
if n > 0 {
m .Data = chunk [:n ]
h .Write (m .Data )
if _ , err := jetStream .PublishMsgAsync (m ); err != nil {
if purgeErr := purgePartial (); purgeErr != nil {
return nil , errors .Join (err , purgeErr )
}
return nil , err
}
if err := getErr (); err != nil {
if purgeErr := purgePartial (); purgeErr != nil {
return nil , errors .Join (err , purgeErr )
}
return nil , err
}
sent ++
total += uint64 (n )
}
if readErr == io .EOF {
info .Size , info .Chunks = uint64 (total ), uint32 (sent )
info .Digest = GetObjectDigestValue (h )
break
}
}
metaSubj := fmt .Sprintf (objMetaPreTmpl , obs .name , encodeName (meta .Name ))
mm := NewMsg (metaSubj )
mm .Header .Set (MsgRollup , MsgRollupSubject )
mm .Data , err = json .Marshal (info )
if err != nil {
if r != nil {
if purgeErr := purgePartial (); purgeErr != nil {
return nil , errors .Join (err , purgeErr )
}
}
return nil , err
}
_, err = jetStream .PublishMsgAsync (mm )
if err != nil {
if r != nil {
if purgeErr := purgePartial (); purgeErr != nil {
return nil , errors .Join (err , purgeErr )
}
}
return nil , err
}
select {
case <- jetStream .PublishAsyncComplete ():
if err := getErr (); err != nil {
if r != nil {
if purgeErr := purgePartial (); purgeErr != nil {
return nil , errors .Join (err , purgeErr )
}
}
return nil , err
}
case <- time .After (obs .js .opts .wait ):
return nil , ErrTimeout
}
info .ModTime = time .Now ().UTC ()
if einfo != nil && !einfo .Deleted {
echunkSubj := fmt .Sprintf (objChunksPreTmpl , obs .name , einfo .NUID )
if err := obs .js .purgeStream (obs .stream , &StreamPurgeRequest {Subject : echunkSubj }); err != nil {
return info , err
}
}
return info , nil
}
func GetObjectDigestValue (data hash .Hash ) string {
sha := data .Sum (nil )
return fmt .Sprintf (objDigestTmpl , base64 .URLEncoding .EncodeToString (sha [:]))
}
func DecodeObjectDigest (data string ) ([]byte , error ) {
digest := strings .SplitN (data , "=" , 2 )
if len (digest ) != 2 {
return nil , ErrInvalidDigestFormat
}
return base64 .URLEncoding .DecodeString (digest [1 ])
}
type objResult struct {
sync .Mutex
info *ObjectInfo
r io .ReadCloser
err error
ctx context .Context
digest hash .Hash
readTimeout time .Duration
}
func (info *ObjectInfo ) isLink () bool {
return info .ObjectMeta .Opts != nil && info .ObjectMeta .Opts .Link != nil
}
type GetObjectOpt interface {
configureGetObject(opts *getObjectOpts ) error
}
type getObjectOpts struct {
ctx context .Context
showDeleted bool
}
type getObjectFn func (opts *getObjectOpts ) error
func (opt getObjectFn ) configureGetObject (opts *getObjectOpts ) error {
return opt (opts )
}
func GetObjectShowDeleted () GetObjectOpt {
return getObjectFn (func (opts *getObjectOpts ) error {
opts .showDeleted = true
return nil
})
}
func (ctx ContextOpt ) configureGetObject (opts *getObjectOpts ) error {
opts .ctx = ctx
return nil
}
func (obs *obs ) Get (name string , opts ...GetObjectOpt ) (ObjectResult , error ) {
var o getObjectOpts
for _ , opt := range opts {
if opt != nil {
if err := opt .configureGetObject (&o ); err != nil {
return nil , err
}
}
}
ctx := o .ctx
infoOpts := make ([]GetObjectInfoOpt , 0 )
if ctx != nil {
infoOpts = append (infoOpts , Context (ctx ))
}
if o .showDeleted {
infoOpts = append (infoOpts , GetObjectInfoShowDeleted ())
}
info , err := obs .GetInfo (name , infoOpts ...)
if err != nil {
return nil , err
}
if info .NUID == _EMPTY_ {
return nil , ErrBadObjectMeta
}
if info .isLink () {
if info .ObjectMeta .Opts .Link .Name == _EMPTY_ {
return nil , ErrCantGetBucket
}
lbuck := info .ObjectMeta .Opts .Link .Bucket
if lbuck == obs .name {
return obs .Get (info .ObjectMeta .Opts .Link .Name )
}
lobs , err := obs .js .ObjectStore (lbuck )
if err != nil {
return nil , err
}
return lobs .Get (info .ObjectMeta .Opts .Link .Name )
}
result := &objResult {info : info , ctx : ctx , readTimeout : obs .js .opts .wait }
if info .Size == 0 {
return result , nil
}
pr , pw := net .Pipe ()
result .r = pr
gotErr := func (m *Msg , err error ) {
pw .Close ()
m .Sub .Unsubscribe ()
result .setErr (err )
}
result .digest = sha256 .New ()
processChunk := func (m *Msg ) {
var err error
if ctx != nil {
select {
case <- ctx .Done ():
if errors .Is (ctx .Err (), context .Canceled ) {
err = ctx .Err ()
} else {
err = ErrTimeout
}
default :
}
if err != nil {
gotErr (m , err )
return
}
}
tokens , err := parser .GetMetadataFields (m .Reply )
if err != nil {
gotErr (m , err )
return
}
for b := m .Data ; len (b ) > 0 ; {
n , err := pw .Write (b )
if err != nil {
gotErr (m , err )
return
}
b = b [n :]
}
result .digest .Write (m .Data )
if tokens [parser .AckNumPendingTokenPos ] == objNoPending {
pw .Close ()
m .Sub .Unsubscribe ()
}
}
chunkSubj := fmt .Sprintf (objChunksPreTmpl , obs .name , info .NUID )
streamName := fmt .Sprintf (objNameTmpl , obs .name )
subscribeOpts := []SubOpt {
OrderedConsumer (),
BindStream (streamName ),
}
_, err = obs .js .Subscribe (chunkSubj , processChunk , subscribeOpts ...)
if err != nil {
return nil , err
}
return result , nil
}
func (obs *obs ) Delete (name string ) error {
info , err := obs .GetInfo (name , GetObjectInfoShowDeleted ())
if err != nil {
return err
}
if info .NUID == _EMPTY_ {
return ErrBadObjectMeta
}
info .Deleted = true
info .Size , info .Chunks , info .Digest = 0 , 0 , _EMPTY_
if err = publishMeta (info , obs .js ); err != nil {
return err
}
chunkSubj := fmt .Sprintf (objChunksPreTmpl , obs .name , info .NUID )
return obs .js .purgeStream (obs .stream , &StreamPurgeRequest {Subject : chunkSubj })
}
func publishMeta(info *ObjectInfo , js JetStreamContext ) error {
info .ModTime = time .Time {}
data , err := json .Marshal (info )
if err != nil {
return err
}
mm := NewMsg (fmt .Sprintf (objMetaPreTmpl , info .Bucket , encodeName (info .ObjectMeta .Name )))
mm .Header .Set (MsgRollup , MsgRollupSubject )
mm .Data = data
if _ , err := js .PublishMsg (mm ); err != nil {
return err
}
info .ModTime = time .Now ().UTC ()
return nil
}
func (obs *obs ) AddLink (name string , obj *ObjectInfo ) (*ObjectInfo , error ) {
if name == "" {
return nil , ErrNameRequired
}
if obj == nil || obj .Name == "" {
return nil , ErrObjectRequired
}
if obj .Deleted {
return nil , ErrNoLinkToDeleted
}
if obj .isLink () {
return nil , ErrNoLinkToLink
}
einfo , err := obs .GetInfo (name , GetObjectInfoShowDeleted ())
if einfo != nil {
if !einfo .isLink () {
return nil , ErrObjectAlreadyExists
}
} else if err != ErrObjectNotFound {
return nil , err
}
meta := &ObjectMeta {
Name : name ,
Opts : &ObjectMetaOptions {Link : &ObjectLink {Bucket : obj .Bucket , Name : obj .Name }},
}
info := &ObjectInfo {Bucket : obs .name , NUID : nuid .Next (), ModTime : time .Now ().UTC (), ObjectMeta : *meta }
if err = publishMeta (info , obs .js ); err != nil {
return nil , err
}
return info , nil
}
func (ob *obs ) AddBucketLink (name string , bucket ObjectStore ) (*ObjectInfo , error ) {
if name == "" {
return nil , ErrNameRequired
}
if bucket == nil {
return nil , ErrBucketRequired
}
bos , ok := bucket .(*obs )
if !ok {
return nil , ErrBucketMalformed
}
einfo , err := ob .GetInfo (name , GetObjectInfoShowDeleted ())
if einfo != nil {
if !einfo .isLink () {
return nil , ErrObjectAlreadyExists
}
} else if err != ErrObjectNotFound {
return nil , err
}
meta := &ObjectMeta {
Name : name ,
Opts : &ObjectMetaOptions {Link : &ObjectLink {Bucket : bos .name }},
}
info := &ObjectInfo {Bucket : ob .name , NUID : nuid .Next (), ObjectMeta : *meta }
err = publishMeta (info , ob .js )
if err != nil {
return nil , err
}
return info , nil
}
func (obs *obs ) PutBytes (name string , data []byte , opts ...ObjectOpt ) (*ObjectInfo , error ) {
return obs .Put (&ObjectMeta {Name : name }, bytes .NewReader (data ), opts ...)
}
func (obs *obs ) GetBytes (name string , opts ...GetObjectOpt ) ([]byte , error ) {
result , err := obs .Get (name , opts ...)
if err != nil {
return nil , err
}
defer result .Close ()
var b bytes .Buffer
if _ , err := b .ReadFrom (result ); err != nil {
return nil , err
}
return b .Bytes (), nil
}
func (obs *obs ) PutString (name string , data string , opts ...ObjectOpt ) (*ObjectInfo , error ) {
return obs .Put (&ObjectMeta {Name : name }, strings .NewReader (data ), opts ...)
}
func (obs *obs ) GetString (name string , opts ...GetObjectOpt ) (string , error ) {
result , err := obs .Get (name , opts ...)
if err != nil {
return _EMPTY_ , err
}
defer result .Close ()
var b bytes .Buffer
if _ , err := b .ReadFrom (result ); err != nil {
return _EMPTY_ , err
}
return b .String (), nil
}
func (obs *obs ) PutFile (file string , opts ...ObjectOpt ) (*ObjectInfo , error ) {
f , err := os .Open (file )
if err != nil {
return nil , err
}
defer f .Close ()
return obs .Put (&ObjectMeta {Name : file }, f , opts ...)
}
func (obs *obs ) GetFile (name , file string , opts ...GetObjectOpt ) error {
f , err := os .OpenFile (file , os .O_WRONLY |os .O_CREATE , 0600 )
if err != nil {
return err
}
defer f .Close ()
result , err := obs .Get (name , opts ...)
if err != nil {
os .Remove (f .Name ())
return err
}
defer result .Close ()
_, err = io .Copy (f , result )
return err
}
type GetObjectInfoOpt interface {
configureGetInfo(opts *getObjectInfoOpts ) error
}
type getObjectInfoOpts struct {
ctx context .Context
showDeleted bool
}
type getObjectInfoFn func (opts *getObjectInfoOpts ) error
func (opt getObjectInfoFn ) configureGetInfo (opts *getObjectInfoOpts ) error {
return opt (opts )
}
func GetObjectInfoShowDeleted () GetObjectInfoOpt {
return getObjectInfoFn (func (opts *getObjectInfoOpts ) error {
opts .showDeleted = true
return nil
})
}
func (ctx ContextOpt ) configureGetInfo (opts *getObjectInfoOpts ) error {
opts .ctx = ctx
return nil
}
func (obs *obs ) GetInfo (name string , opts ...GetObjectInfoOpt ) (*ObjectInfo , error ) {
if name == "" {
return nil , ErrNameRequired
}
var o getObjectInfoOpts
for _ , opt := range opts {
if opt != nil {
if err := opt .configureGetInfo (&o ); err != nil {
return nil , err
}
}
}
metaSubj := fmt .Sprintf (objMetaPreTmpl , obs .name , encodeName (name ))
stream := fmt .Sprintf (objNameTmpl , obs .name )
m , err := obs .js .GetLastMsg (stream , metaSubj )
if err != nil {
if errors .Is (err , ErrMsgNotFound ) {
err = ErrObjectNotFound
}
return nil , err
}
var info ObjectInfo
if err := json .Unmarshal (m .Data , &info ); err != nil {
return nil , ErrBadObjectMeta
}
if !o .showDeleted && info .Deleted {
return nil , ErrObjectNotFound
}
info .ModTime = m .Time
return &info , nil
}
func (obs *obs ) UpdateMeta (name string , meta *ObjectMeta ) error {
if meta == nil {
return ErrBadObjectMeta
}
info , err := obs .GetInfo (name )
if err != nil {
if errors .Is (err , ErrObjectNotFound ) {
return ErrUpdateMetaDeleted
}
return err
}
if name != meta .Name {
existingInfo , err := obs .GetInfo (meta .Name , GetObjectInfoShowDeleted ())
if err != nil && !errors .Is (err , ErrObjectNotFound ) {
return err
}
if err == nil && !existingInfo .Deleted {
return ErrObjectAlreadyExists
}
}
info .Name = meta .Name
info .Description = meta .Description
info .Headers = meta .Headers
info .Metadata = meta .Metadata
if err = publishMeta (info , obs .js ); err != nil {
return err
}
if name != meta .Name {
metaSubj := fmt .Sprintf (objMetaPreTmpl , obs .name , encodeName (name ))
return obs .js .purgeStream (obs .stream , &StreamPurgeRequest {Subject : metaSubj })
}
return nil
}
func (obs *obs ) Seal () error {
stream := fmt .Sprintf (objNameTmpl , obs .name )
si , err := obs .js .StreamInfo (stream )
if err != nil {
return err
}
cfg := si .Config
cfg .Sealed = true
_, err = obs .js .UpdateStream (&cfg )
return err
}
type objWatcher struct {
updates chan *ObjectInfo
sub *Subscription
}
func (w *objWatcher ) Updates () <-chan *ObjectInfo {
if w == nil {
return nil
}
return w .updates
}
func (w *objWatcher ) Stop () error {
if w == nil {
return nil
}
return w .sub .Unsubscribe ()
}
func (obs *obs ) Watch (opts ...WatchOpt ) (ObjectWatcher , error ) {
var o watchOpts
for _ , opt := range opts {
if opt != nil {
if err := opt .configureWatcher (&o ); err != nil {
return nil , err
}
}
}
var initDoneMarker bool
w := &objWatcher {updates : make (chan *ObjectInfo , 32 )}
update := func (m *Msg ) {
var info ObjectInfo
if err := json .Unmarshal (m .Data , &info ); err != nil {
return
}
meta , err := m .Metadata ()
if err != nil {
return
}
if !o .ignoreDeletes || !info .Deleted {
info .ModTime = meta .Timestamp
w .updates <- &info
}
if !initDoneMarker && meta .NumPending == 0 {
initDoneMarker = true
w .updates <- nil
}
}
allMeta := fmt .Sprintf (objAllMetaPreTmpl , obs .name )
_ , err := obs .js .GetLastMsg (obs .stream , allMeta )
if !o .updatesOnly {
if errors .Is (err , ErrMsgNotFound ) {
initDoneMarker = true
w .updates <- nil
}
} else {
initDoneMarker = true
}
streamName := fmt .Sprintf (objNameTmpl , obs .name )
subOpts := []SubOpt {OrderedConsumer (), BindStream (streamName )}
if !o .includeHistory {
subOpts = append (subOpts , DeliverLastPerSubject ())
}
if o .updatesOnly {
subOpts = append (subOpts , DeliverNew ())
}
sub , err := obs .js .Subscribe (allMeta , update , subOpts ...)
if err != nil {
return nil , err
}
sub .pDone = func (_ string ) {
close (w .updates )
}
w .sub = sub
return w , nil
}
type ListObjectsOpt interface {
configureListObjects(opts *listObjectOpts ) error
}
type listObjectOpts struct {
ctx context .Context
showDeleted bool
}
type listObjectsFn func (opts *listObjectOpts ) error
func (opt listObjectsFn ) configureListObjects (opts *listObjectOpts ) error {
return opt (opts )
}
func ListObjectsShowDeleted () ListObjectsOpt {
return listObjectsFn (func (opts *listObjectOpts ) error {
opts .showDeleted = true
return nil
})
}
func (ctx ContextOpt ) configureListObjects (opts *listObjectOpts ) error {
opts .ctx = ctx
return nil
}
func (obs *obs ) List (opts ...ListObjectsOpt ) ([]*ObjectInfo , error ) {
var o listObjectOpts
for _ , opt := range opts {
if opt != nil {
if err := opt .configureListObjects (&o ); err != nil {
return nil , err
}
}
}
watchOpts := make ([]WatchOpt , 0 )
if !o .showDeleted {
watchOpts = append (watchOpts , IgnoreDeletes ())
}
watcher , err := obs .Watch (watchOpts ...)
if err != nil {
return nil , err
}
defer watcher .Stop ()
if o .ctx == nil {
o .ctx = context .Background ()
}
var objs []*ObjectInfo
updates := watcher .Updates ()
Updates :
for {
select {
case entry := <- updates :
if entry == nil {
break Updates
}
objs = append (objs , entry )
case <- o .ctx .Done ():
return nil , o .ctx .Err ()
}
}
if len (objs ) == 0 {
return nil , ErrNoObjectsFound
}
return objs , nil
}
type ObjectBucketStatus struct {
nfo *StreamInfo
bucket string
}
func (s *ObjectBucketStatus ) Bucket () string { return s .bucket }
func (s *ObjectBucketStatus ) Description () string { return s .nfo .Config .Description }
func (s *ObjectBucketStatus ) TTL () time .Duration { return s .nfo .Config .MaxAge }
func (s *ObjectBucketStatus ) Storage () StorageType { return s .nfo .Config .Storage }
func (s *ObjectBucketStatus ) Replicas () int { return s .nfo .Config .Replicas }
func (s *ObjectBucketStatus ) Sealed () bool { return s .nfo .Config .Sealed }
func (s *ObjectBucketStatus ) Size () uint64 { return s .nfo .State .Bytes }
func (s *ObjectBucketStatus ) BackingStore () string { return "JetStream" }
func (s *ObjectBucketStatus ) Metadata () map [string ]string { return s .nfo .Config .Metadata }
func (s *ObjectBucketStatus ) StreamInfo () *StreamInfo { return s .nfo }
func (s *ObjectBucketStatus ) IsCompressed () bool { return s .nfo .Config .Compression != NoCompression }
func (obs *obs ) Status () (ObjectStoreStatus , error ) {
nfo , err := obs .js .StreamInfo (obs .stream )
if err != nil {
return nil , err
}
status := &ObjectBucketStatus {
nfo : nfo ,
bucket : obs .name ,
}
return status , nil
}
func (o *objResult ) Read (p []byte ) (n int , err error ) {
o .Lock ()
defer o .Unlock ()
readDeadline := time .Now ().Add (o .readTimeout )
if ctx := o .ctx ; ctx != nil {
if deadline , ok := ctx .Deadline (); ok {
readDeadline = deadline
}
select {
case <- ctx .Done ():
if ctx .Err () == context .Canceled {
o .err = ctx .Err ()
} else {
o .err = ErrTimeout
}
default :
}
}
if o .err != nil {
return 0 , o .err
}
if o .r == nil {
return 0 , io .EOF
}
r := o .r .(net .Conn )
r .SetReadDeadline (readDeadline )
n , err = r .Read (p )
if err , ok := err .(net .Error ); ok && err .Timeout () {
if ctx := o .ctx ; ctx != nil {
select {
case <- ctx .Done ():
if ctx .Err () == context .Canceled {
return 0 , ctx .Err ()
} else {
return 0 , ErrTimeout
}
default :
err = nil
}
}
}
if err == io .EOF {
sha := o .digest .Sum (nil )
rsha , decodeErr := DecodeObjectDigest (o .info .Digest )
if decodeErr != nil {
o .err = decodeErr
return 0 , o .err
}
if !bytes .Equal (sha [:], rsha ) {
o .err = ErrDigestMismatch
return 0 , o .err
}
}
return n , err
}
func (o *objResult ) Close () error {
o .Lock ()
defer o .Unlock ()
if o .r == nil {
return nil
}
return o .r .Close ()
}
func (o *objResult ) setErr (err error ) {
o .Lock ()
defer o .Unlock ()
o .err = err
}
func (o *objResult ) Info () (*ObjectInfo , error ) {
o .Lock ()
defer o .Unlock ()
return o .info , o .err
}
func (o *objResult ) Error () error {
o .Lock ()
defer o .Unlock ()
return o .err
}
func (js *js ) ObjectStoreNames (opts ...ObjectOpt ) <-chan string {
var o objOpts
for _ , opt := range opts {
if opt != nil {
if err := opt .configureObject (&o ); err != nil {
return nil
}
}
}
ch := make (chan string )
var cancel context .CancelFunc
if o .ctx == nil {
o .ctx , cancel = context .WithTimeout (context .Background (), defaultRequestWait )
}
l := &streamLister {js : js }
l .js .opts .streamListSubject = fmt .Sprintf (objAllChunksPreTmpl , "*" )
l .js .opts .ctx = o .ctx
go func () {
if cancel != nil {
defer cancel ()
}
defer close (ch )
for l .Next () {
for _ , info := range l .Page () {
if !strings .HasPrefix (info .Config .Name , "OBJ_" ) {
continue
}
select {
case ch <- info .Config .Name :
case <- o .ctx .Done ():
return
}
}
}
}()
return ch
}
func (js *js ) ObjectStores (opts ...ObjectOpt ) <-chan ObjectStoreStatus {
var o objOpts
for _ , opt := range opts {
if opt != nil {
if err := opt .configureObject (&o ); err != nil {
return nil
}
}
}
ch := make (chan ObjectStoreStatus )
var cancel context .CancelFunc
if o .ctx == nil {
o .ctx , cancel = context .WithTimeout (context .Background (), defaultRequestWait )
}
l := &streamLister {js : js }
l .js .opts .streamListSubject = fmt .Sprintf (objAllChunksPreTmpl , "*" )
l .js .opts .ctx = o .ctx
go func () {
if cancel != nil {
defer cancel ()
}
defer close (ch )
for l .Next () {
for _ , info := range l .Page () {
if !strings .HasPrefix (info .Config .Name , "OBJ_" ) {
continue
}
select {
case ch <- &ObjectBucketStatus {
nfo : info ,
bucket : strings .TrimPrefix (info .Config .Name , "OBJ_" ),
}:
case <- o .ctx .Done ():
return
}
}
}
}()
return ch
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .