// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"hash"
	"io"
	"net"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/nats-io/nats.go/internal/parser"
	"github.com/nats-io/nuid"
)

// ObjectStoreManager creates, loads and deletes Object Stores.
type ObjectStoreManager interface {
	// ObjectStore will look up and bind to an existing object store instance.
	ObjectStore(bucket string) (ObjectStore, error)
	// CreateObjectStore will create an object store.
	CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
	// DeleteObjectStore will delete the underlying stream for the named
	// object store bucket.
	DeleteObjectStore(bucket string) error
	// ObjectStoreNames is used to retrieve a list of bucket names.
	// NOTE(review): the implementation in this file sends the backing stream
	// name (including the "OBJ_" prefix) on the channel — confirm intended.
	ObjectStoreNames(opts ...ObjectOpt) <-chan string
	// ObjectStores is used to retrieve a list of bucket statuses.
	ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus
}

// ObjectStore is a blob store capable of storing large objects efficiently in
// JetStream streams.
type ObjectStore interface {
	// Put will place the contents from the reader into a new object.
	Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error)
	// Get will pull the named object from the object store.
	Get(name string, opts ...GetObjectOpt) (ObjectResult, error)

	// PutBytes is a convenience function to put a byte slice into this object store.
	PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error)
	// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
	GetBytes(name string, opts ...GetObjectOpt) ([]byte, error)

	// PutString is a convenience function to put a string into this object store.
	PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error)
	// GetString is a convenience function to pull an object from this object store and return it as a string.
	GetString(name string, opts ...GetObjectOpt) (string, error)

	// PutFile is a convenience function to put a file into this object store.
	PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error)
	// GetFile is a convenience function to pull an object from this object store and place it in a file.
	GetFile(name, file string, opts ...GetObjectOpt) error

	// GetInfo will retrieve the current information for the object.
	GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error)
	// UpdateMeta will update the metadata for the object.
	UpdateMeta(name string, meta *ObjectMeta) error

	// Delete will delete the named object.
	Delete(name string) error

	// AddLink will add a link to another object.
	AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error)

	// AddBucketLink will add a link to another object store.
	AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error)

	// Seal will seal the object store, no further modifications will be allowed.
	Seal() error

	// Watch for changes in the underlying store and receive meta information updates.
	Watch(opts ...WatchOpt) (ObjectWatcher, error)

	// List will list all the objects in this store.
	List(opts ...ListObjectsOpt) ([]*ObjectInfo, error)

	// Status retrieves run-time status about the backing store of the bucket.
	Status() (ObjectStoreStatus, error)
}

// ObjectOpt is an option accepted by object store operations that take
// ...ObjectOpt. Each option is applied to an objOpts value before the
// operation runs; a non-nil error aborts the operation.
type ObjectOpt interface {
	configureObject(opts *objOpts) error
}

// objOpts collects the settings applied by ObjectOpt values.
type objOpts struct {
	// ctx is the optional context supplied via nats.Context(); nil when unset.
	ctx context.Context
}

// For nats.Context() support.
func ( ContextOpt) ( *objOpts) error {
	.ctx = 
	return nil
}

// ObjectWatcher is what is returned when doing a watch.
type ObjectWatcher interface {
	// Updates returns a channel to read any updates to entries.
	// The watcher in this file sends a nil entry once to mark the end of
	// the initial values.
	Updates() <-chan *ObjectInfo
	// Stop will stop this watcher.
	Stop() error
}

// Sentinel errors returned by object store operations.
var (
	ErrObjectConfigRequired = errors.New("nats: object-store config required")
	ErrBadObjectMeta        = errors.New("nats: object-store meta information invalid")
	ErrObjectNotFound       = errors.New("nats: object not found")
	ErrInvalidStoreName     = errors.New("nats: invalid object-store name")
	ErrDigestMismatch       = errors.New("nats: received a corrupt object, digests do not match")
	ErrInvalidDigestFormat  = errors.New("nats: object digest hash has invalid format")
	ErrNoObjectsFound       = errors.New("nats: no objects found")
	ErrObjectAlreadyExists  = errors.New("nats: an object already exists with that name")
	ErrNameRequired         = errors.New("nats: name is required")
	ErrNeeds262             = errors.New("nats: object-store requires at least server version 2.6.2")
	ErrLinkNotAllowed       = errors.New("nats: link cannot be set when putting the object in bucket")
	ErrObjectRequired       = errors.New("nats: object required")
	ErrNoLinkToDeleted      = errors.New("nats: not allowed to link to a deleted object")
	ErrNoLinkToLink         = errors.New("nats: not allowed to link to another link")
	ErrCantGetBucket        = errors.New("nats: invalid Get, object is a link to a bucket")
	ErrBucketRequired       = errors.New("nats: bucket required")
	ErrBucketMalformed      = errors.New("nats: bucket malformed")
	ErrUpdateMetaDeleted    = errors.New("nats: cannot update meta for a deleted object")
)

// ObjectStoreConfig is the config for the object store.
//
// When a store is created from this config, Bucket must match the bucket
// name pattern, TTL becomes the backing stream's MaxAge, a zero Replicas is
// treated as 1 and a zero MaxBytes as -1.
type ObjectStoreConfig struct {
	Bucket      string        `json:"bucket"`
	Description string        `json:"description,omitempty"`
	TTL         time.Duration `json:"max_age,omitempty"`
	MaxBytes    int64         `json:"max_bytes,omitempty"`
	Storage     StorageType   `json:"storage,omitempty"`
	Replicas    int           `json:"num_replicas,omitempty"`
	Placement   *Placement    `json:"placement,omitempty"`

	// Bucket-specific metadata
	// NOTE: Metadata requires nats-server v2.10.0+
	Metadata map[string]string `json:"metadata,omitempty"`
	// Enable underlying stream compression (S2 is selected when true).
	// NOTE: Compression is supported for nats-server 2.10.0+
	Compression bool `json:"compression,omitempty"`
}

// ObjectStoreStatus is run-time status about an object store bucket.
type ObjectStoreStatus interface {
	// Bucket is the name of the bucket
	Bucket() string
	// Description is the description supplied when creating the bucket
	Description() string
	// TTL indicates how long objects are kept in the bucket
	TTL() time.Duration
	// Storage indicates the underlying JetStream storage technology used to store data
	Storage() StorageType
	// Replicas indicates how many storage replicas are kept for the data in the bucket
	Replicas() int
	// Sealed indicates the stream is sealed and cannot be modified in any way
	Sealed() bool
	// Size is the combined size of all data in the bucket including metadata, in bytes
	Size() uint64
	// BackingStore provides details about the underlying storage
	BackingStore() string
	// Metadata is the user supplied metadata for the bucket
	Metadata() map[string]string
	// IsCompressed indicates if the data is compressed on disk
	IsCompressed() bool
}

// ObjectMetaOptions holds optional settings carried in an object's meta.
// Link, when set, marks the object as a link to another object or bucket.
// ChunkSize is the size of the read buffer used when uploading the object;
// Put substitutes the default chunk size when it is zero.
type ObjectMetaOptions struct {
	Link      *ObjectLink `json:"link,omitempty"`
	ChunkSize uint32      `json:"max_chunk_size,omitempty"`
}

// ObjectMeta is high level information about an object.
// Name identifies the object within its bucket and must be non-empty for Put.
type ObjectMeta struct {
	Name        string            `json:"name"`
	Description string            `json:"description,omitempty"`
	Headers     Header            `json:"headers,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`

	// Optional options.
	Opts *ObjectMetaOptions `json:"options,omitempty"`
}

// ObjectInfo is meta plus instance information.
// NUID identifies the chunk subject holding the object's data, Size and
// Chunks are the totals recorded at upload time, Digest is the encoded
// SHA-256 of the content and Deleted marks a delete marker.
type ObjectInfo struct {
	ObjectMeta
	Bucket  string    `json:"bucket"`
	NUID    string    `json:"nuid"`
	Size    uint64    `json:"size"`
	ModTime time.Time `json:"mtime"`
	Chunks  uint32    `json:"chunks"`
	Digest  string    `json:"digest,omitempty"`
	Deleted bool      `json:"deleted,omitempty"`
}

// ObjectLink is used to embed links to other buckets and objects.
type ObjectLink struct {
	// Bucket is the name of the other object store.
	Bucket string `json:"bucket"`
	// Name can be used to link to a single object.
	// If empty, this is a link to the whole store, like a directory.
	Name string `json:"name,omitempty"`
}

// ObjectResult will return the underlying stream info and also be an io.ReadCloser.
type ObjectResult interface {
	io.ReadCloser
	// Info returns the object's info together with any error recorded so far.
	Info() (*ObjectInfo, error)
	// Error returns any error recorded while retrieving the object.
	Error() error
}

// Stream/subject templates and defaults used by the object store.
const (
	objNameTmpl         = "OBJ_%s"     // OBJ_<bucket> // stream name
	objAllChunksPreTmpl = "$O.%s.C.>"  // $O.<bucket>.C.> // chunk stream subject
	objAllMetaPreTmpl   = "$O.%s.M.>"  // $O.<bucket>.M.> // meta stream subject
	objChunksPreTmpl    = "$O.%s.C.%s" // $O.<bucket>.C.<object-nuid> // chunk message subject
	objMetaPreTmpl      = "$O.%s.M.%s" // $O.<bucket>.M.<name-encoded> // meta message subject
	objNoPending        = "0"          // num-pending token value that marks the last chunk in Get
	objDefaultChunkSize = uint32(128 * 1024) // 128k
	objDigestType       = "SHA-256="   // prefix of an encoded digest
	objDigestTmpl       = objDigestType + "%s"
)

// obs is the ObjectStore implementation bound to one bucket.
type obs struct {
	name   string // bucket name
	stream string // name of the backing stream
	js     *js    // JetStream context used for all operations
}

// CreateObjectStore will create an object store.
func ( *js) ( *ObjectStoreConfig) (ObjectStore, error) {
	if !.nc.serverMinVersion(2, 6, 2) {
		return nil, ErrNeeds262
	}
	if  == nil {
		return nil, ErrObjectConfigRequired
	}
	if !validBucketRe.MatchString(.Bucket) {
		return nil, ErrInvalidStoreName
	}

	 := .Bucket
	 := fmt.Sprintf(objAllChunksPreTmpl, )
	 := fmt.Sprintf(objAllMetaPreTmpl, )

	// We will set explicitly some values so that we can do comparison
	// if we get an "already in use" error and need to check if it is same.
	// See kv
	 := .Replicas
	if  == 0 {
		 = 1
	}
	 := .MaxBytes
	if  == 0 {
		 = -1
	}
	var  StoreCompression
	if .Compression {
		 = S2Compression
	}
	 := &StreamConfig{
		Name:        fmt.Sprintf(objNameTmpl, ),
		Description: .Description,
		Subjects:    []string{, },
		MaxAge:      .TTL,
		MaxBytes:    ,
		Storage:     .Storage,
		Replicas:    ,
		Placement:   .Placement,
		Discard:     DiscardNew,
		AllowRollup: true,
		AllowDirect: true,
		Metadata:    .Metadata,
		Compression: ,
	}

	// Create our stream.
	,  := .AddStream()
	if  != nil {
		return nil, 
	}

	return &obs{name: , stream: .Name, js: }, nil
}

// ObjectStore will look up and bind to an existing object store instance.
func ( *js) ( string) (ObjectStore, error) {
	if !validBucketRe.MatchString() {
		return nil, ErrInvalidStoreName
	}
	if !.nc.serverMinVersion(2, 6, 2) {
		return nil, ErrNeeds262
	}

	 := fmt.Sprintf(objNameTmpl, )
	,  := .StreamInfo()
	if  != nil {
		return nil, 
	}
	return &obs{name: , stream: .Config.Name, js: }, nil
}

// DeleteObjectStore will delete the underlying stream for the named object.
func ( *js) ( string) error {
	 := fmt.Sprintf(objNameTmpl, )
	return .DeleteStream()
}

// encodeName returns the URL-safe base64 encoding (with padding) of name,
// which is used as the final token of an object's meta subject.
func encodeName(name string) string {
	return base64.URLEncoding.EncodeToString([]byte(name))
}

// Put will place the contents from the reader into this object-store.
func ( *obs) ( *ObjectMeta,  io.Reader,  ...ObjectOpt) (*ObjectInfo, error) {
	if  == nil || .Name == "" {
		return nil, ErrBadObjectMeta
	}

	if .Opts == nil {
		.Opts = &ObjectMetaOptions{ChunkSize: objDefaultChunkSize}
	} else if .Opts.Link != nil {
		return nil, ErrLinkNotAllowed
	} else if .Opts.ChunkSize == 0 {
		.Opts.ChunkSize = objDefaultChunkSize
	}

	var  objOpts
	for ,  := range  {
		if  != nil {
			if  := .configureObject(&);  != nil {
				return nil, 
			}
		}
	}
	 := .ctx

	// Create the new nuid so chunks go on a new subject if the name is re-used
	 := nuid.Next()

	// These will be used in more than one place
	 := fmt.Sprintf(objChunksPreTmpl, .name, )

	// Grab existing meta info (einfo). Ok to be found or not found, any other error is a problem
	// Chunks on the old nuid can be cleaned up at the end
	,  := .GetInfo(.Name, GetObjectInfoShowDeleted()) // GetInfo will encode the name
	if  != nil &&  != ErrObjectNotFound {
		return nil, 
	}

	// For async error handling
	var  error
	var  sync.Mutex
	 := func( error) {
		.Lock()
		defer .Unlock()
		 = 
	}
	 := func() error {
		.Lock()
		defer .Unlock()
		return 
	}

	// Create our own JS context to handle errors etc.
	,  := .js.nc.JetStream(PublishAsyncErrHandler(func( JetStream,  *Msg,  error) { () }))
	if  != nil {
		return nil, 
	}

	defer .(*js).cleanupReplySub()

	 := func() error {
		// wait until all pubs are complete or up to default timeout before attempting purge
		select {
		case <-.PublishAsyncComplete():
		case <-time.After(.js.opts.wait):
		}
		if  := .js.purgeStream(.stream, &StreamPurgeRequest{Subject: });  != nil {
			return fmt.Errorf("could not cleanup bucket after erroneous put operation: %w", )
		}
		return nil
	}

	,  := NewMsg(), sha256.New()
	, ,  := make([]byte, .Opts.ChunkSize), 0, uint64(0)

	// set up the info object. The chunk upload sets the size and digest
	 := &ObjectInfo{Bucket: .name, NUID: , ObjectMeta: *}

	for  != nil {
		if  != nil {
			select {
			case <-.Done():
				if .Err() == context.Canceled {
					 = .Err()
				} else {
					 = ErrTimeout
				}
			default:
			}
			if  != nil {
				if  := ();  != nil {
					return nil, errors.Join(, )
				}
				return nil, 
			}
		}

		// Actual read.
		// TODO(dlc) - Deadline?
		,  := .Read()

		// Handle all non EOF errors
		if  != nil &&  != io.EOF {
			if  := ();  != nil {
				return nil, errors.Join(, )
			}
			return nil, 
		}

		// Add chunk only if we received data
		if  > 0 {
			// Chunk processing.
			.Data = [:]
			.Write(.Data)

			// Send msg itself.
			if ,  := .PublishMsgAsync();  != nil {
				if  := ();  != nil {
					return nil, errors.Join(, )
				}
				return nil, 
			}
			if  := ();  != nil {
				if  := ();  != nil {
					return nil, errors.Join(, )
				}
				return nil, 
			}
			// Update totals.
			++
			 += uint64()
		}

		// EOF Processing.
		if  == io.EOF {
			// Place meta info.
			.Size, .Chunks = uint64(), uint32()
			.Digest = GetObjectDigestValue()
			break
		}
	}

	// Prepare the meta message
	 := fmt.Sprintf(objMetaPreTmpl, .name, encodeName(.Name))
	 := NewMsg()
	.Header.Set(MsgRollup, MsgRollupSubject)
	.Data,  = json.Marshal()
	if  != nil {
		if  != nil {
			if  := ();  != nil {
				return nil, errors.Join(, )
			}
		}
		return nil, 
	}

	// Publish the meta message.
	_,  = .PublishMsgAsync()
	if  != nil {
		if  != nil {
			if  := ();  != nil {
				return nil, errors.Join(, )
			}
		}
		return nil, 
	}

	// Wait for all to be processed.
	select {
	case <-.PublishAsyncComplete():
		if  := ();  != nil {
			if  != nil {
				if  := ();  != nil {
					return nil, errors.Join(, )
				}
			}
			return nil, 
		}
	case <-time.After(.js.opts.wait):
		return nil, ErrTimeout
	}

	.ModTime = time.Now().UTC() // This time is not actually the correct time

	// Delete any original chunks.
	if  != nil && !.Deleted {
		 := fmt.Sprintf(objChunksPreTmpl, .name, .NUID)
		if  := .js.purgeStream(.stream, &StreamPurgeRequest{Subject: });  != nil {
			return , 
		}
	}

	// TODO would it be okay to do this to return the info with the correct time?
	// With the understanding that it is an extra call to the server.
	// Otherwise the time the user gets back is the client time, not the server time.
	// return obs.GetInfo(info.Name)

	return , nil
}

// GetObjectDigestValue calculates the base64 value of hashed data
func ( hash.Hash) string {
	 := .Sum(nil)
	return fmt.Sprintf(objDigestTmpl, base64.URLEncoding.EncodeToString([:]))
}

// DecodeObjectDigest decodes base64 hash
func ( string) ([]byte, error) {
	 := strings.SplitN(, "=", 2)
	if len() != 2 {
		return nil, ErrInvalidDigestFormat
	}
	return base64.URLEncoding.DecodeString([1])
}

// objResult is the ObjectResult implementation returned by Get.
// The embedded mutex guards the fields below.
type objResult struct {
	sync.Mutex
	info        *ObjectInfo     // meta information of the object being read
	r           io.ReadCloser   // read side of the chunk pipe; nil for empty objects
	err         error           // first error recorded while retrieving the object
	ctx         context.Context // optional caller context; may be nil
	digest      hash.Hash       // running SHA-256 of the received chunks
	readTimeout time.Duration   // per-Read timeout used when ctx has no deadline
}

func ( *ObjectInfo) () bool {
	return .ObjectMeta.Opts != nil && .ObjectMeta.Opts.Link != nil
}

// GetObjectOpt is an option accepted by Get and the Get* convenience
// functions. A non-nil error from configureGetObject aborts the call.
type GetObjectOpt interface {
	configureGetObject(opts *getObjectOpts) error
}
// getObjectOpts collects the settings applied by GetObjectOpt values.
type getObjectOpts struct {
	// ctx is the optional context supplied via nats.Context(); nil when unset.
	ctx context.Context
	// Include deleted object in the result.
	showDeleted bool
}

// getObjectFn adapts a plain function to the GetObjectOpt interface.
type getObjectFn func(opts *getObjectOpts) error

func ( getObjectFn) ( *getObjectOpts) error {
	return ()
}

// GetObjectShowDeleted makes Get() return object if it was marked as deleted.
func () GetObjectOpt {
	return getObjectFn(func( *getObjectOpts) error {
		.showDeleted = true
		return nil
	})
}

// For nats.Context() support.
func ( ContextOpt) ( *getObjectOpts) error {
	.ctx = 
	return nil
}

// Get will pull the object from the underlying stream.
func ( *obs) ( string,  ...GetObjectOpt) (ObjectResult, error) {
	var  getObjectOpts
	for ,  := range  {
		if  != nil {
			if  := .configureGetObject(&);  != nil {
				return nil, 
			}
		}
	}
	 := .ctx
	 := make([]GetObjectInfoOpt, 0)
	if  != nil {
		 = append(, Context())
	}
	if .showDeleted {
		 = append(, GetObjectInfoShowDeleted())
	}

	// Grab meta info.
	,  := .GetInfo(, ...)
	if  != nil {
		return nil, 
	}
	if .NUID == _EMPTY_ {
		return nil, ErrBadObjectMeta
	}

	// Check for object links. If single objects we do a pass through.
	if .isLink() {
		if .ObjectMeta.Opts.Link.Name == _EMPTY_ {
			return nil, ErrCantGetBucket
		}

		// is the link in the same bucket?
		 := .ObjectMeta.Opts.Link.Bucket
		if  == .name {
			return .(.ObjectMeta.Opts.Link.Name)
		}

		// different bucket
		,  := .js.ObjectStore()
		if  != nil {
			return nil, 
		}
		return .Get(.ObjectMeta.Opts.Link.Name)
	}

	 := &objResult{info: , ctx: , readTimeout: .js.opts.wait}
	if .Size == 0 {
		return , nil
	}

	,  := net.Pipe()
	.r = 

	 := func( *Msg,  error) {
		.Close()
		.Sub.Unsubscribe()
		.setErr()
	}

	// For calculating sum256
	.digest = sha256.New()

	 := func( *Msg) {
		var  error
		if  != nil {
			select {
			case <-.Done():
				if errors.Is(.Err(), context.Canceled) {
					 = .Err()
				} else {
					 = ErrTimeout
				}
			default:
			}
			if  != nil {
				(, )
				return
			}
		}

		,  := parser.GetMetadataFields(.Reply)
		if  != nil {
			(, )
			return
		}

		// Write to our pipe.
		for  := .Data; len() > 0; {
			,  := .Write()
			if  != nil {
				(, )
				return
			}
			 = [:]
		}
		// Update sha256
		.digest.Write(.Data)

		// Check if we are done.
		if [parser.AckNumPendingTokenPos] == objNoPending {
			.Close()
			.Sub.Unsubscribe()
		}
	}

	 := fmt.Sprintf(objChunksPreTmpl, .name, .NUID)
	 := fmt.Sprintf(objNameTmpl, .name)
	 := []SubOpt{
		OrderedConsumer(),
		BindStream(),
	}
	_,  = .js.Subscribe(, , ...)
	if  != nil {
		return nil, 
	}

	return , nil
}

// Delete will delete the object.
func ( *obs) ( string) error {
	// Grab meta info.
	,  := .GetInfo(, GetObjectInfoShowDeleted())
	if  != nil {
		return 
	}
	if .NUID == _EMPTY_ {
		return ErrBadObjectMeta
	}

	// Place a rollup delete marker and publish the info
	.Deleted = true
	.Size, .Chunks, .Digest = 0, 0, _EMPTY_

	if  = publishMeta(, .js);  != nil {
		return 
	}

	// Purge chunks for the object.
	 := fmt.Sprintf(objChunksPreTmpl, .name, .NUID)
	return .js.purgeStream(.stream, &StreamPurgeRequest{Subject: })
}

func publishMeta( *ObjectInfo,  JetStreamContext) error {
	// marshal the object into json, don't store an actual time
	.ModTime = time.Time{}
	,  := json.Marshal()
	if  != nil {
		return 
	}

	// Prepare and publish the message.
	 := NewMsg(fmt.Sprintf(objMetaPreTmpl, .Bucket, encodeName(.ObjectMeta.Name)))
	.Header.Set(MsgRollup, MsgRollupSubject)
	.Data = 
	if ,  := .PublishMsg();  != nil {
		return 
	}

	// set the ModTime in case it's returned to the user, even though it's not the correct time.
	.ModTime = time.Now().UTC()
	return nil
}

// AddLink will add a link to another object if it's not deleted and not another link
// name is the name of this link object
// obj is what is being linked too
func ( *obs) ( string,  *ObjectInfo) (*ObjectInfo, error) {
	if  == "" {
		return nil, ErrNameRequired
	}

	// TODO Handle stale info

	if  == nil || .Name == "" {
		return nil, ErrObjectRequired
	}
	if .Deleted {
		return nil, ErrNoLinkToDeleted
	}
	if .isLink() {
		return nil, ErrNoLinkToLink
	}

	// If object with link's name is found, error.
	// If link with link's name is found, that's okay to overwrite.
	// If there was an error that was not ErrObjectNotFound, error.
	,  := .GetInfo(, GetObjectInfoShowDeleted())
	if  != nil {
		if !.isLink() {
			return nil, ErrObjectAlreadyExists
		}
	} else if  != ErrObjectNotFound {
		return nil, 
	}

	// create the meta for the link
	 := &ObjectMeta{
		Name: ,
		Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: .Bucket, Name: .Name}},
	}
	 := &ObjectInfo{Bucket: .name, NUID: nuid.Next(), ModTime: time.Now().UTC(), ObjectMeta: *}

	// put the link object
	if  = publishMeta(, .js);  != nil {
		return nil, 
	}

	return , nil
}

// AddBucketLink will add a link to another object store.
func ( *obs) ( string,  ObjectStore) (*ObjectInfo, error) {
	if  == "" {
		return nil, ErrNameRequired
	}
	if  == nil {
		return nil, ErrBucketRequired
	}
	,  := .(*obs)
	if ! {
		return nil, ErrBucketMalformed
	}

	// If object with link's name is found, error.
	// If link with link's name is found, that's okay to overwrite.
	// If there was an error that was not ErrObjectNotFound, error.
	,  := .GetInfo(, GetObjectInfoShowDeleted())
	if  != nil {
		if !.isLink() {
			return nil, ErrObjectAlreadyExists
		}
	} else if  != ErrObjectNotFound {
		return nil, 
	}

	// create the meta for the link
	 := &ObjectMeta{
		Name: ,
		Opts: &ObjectMetaOptions{Link: &ObjectLink{Bucket: .name}},
	}
	 := &ObjectInfo{Bucket: .name, NUID: nuid.Next(), ObjectMeta: *}

	// put the link object
	 = publishMeta(, .js)
	if  != nil {
		return nil, 
	}

	return , nil
}

// PutBytes is convenience function to put a byte slice into this object store.
func ( *obs) ( string,  []byte,  ...ObjectOpt) (*ObjectInfo, error) {
	return .Put(&ObjectMeta{Name: }, bytes.NewReader(), ...)
}

// GetBytes is a convenience function to pull an object from this object store and return it as a byte slice.
func ( *obs) ( string,  ...GetObjectOpt) ([]byte, error) {
	,  := .Get(, ...)
	if  != nil {
		return nil, 
	}
	defer .Close()

	var  bytes.Buffer
	if ,  := .ReadFrom();  != nil {
		return nil, 
	}
	return .Bytes(), nil
}

// PutString is convenience function to put a string into this object store.
func ( *obs) ( string,  string,  ...ObjectOpt) (*ObjectInfo, error) {
	return .Put(&ObjectMeta{Name: }, strings.NewReader(), ...)
}

// GetString is a convenience function to pull an object from this object store and return it as a string.
func ( *obs) ( string,  ...GetObjectOpt) (string, error) {
	,  := .Get(, ...)
	if  != nil {
		return _EMPTY_, 
	}
	defer .Close()

	var  bytes.Buffer
	if ,  := .ReadFrom();  != nil {
		return _EMPTY_, 
	}
	return .String(), nil
}

// PutFile is convenience function to put a file into an object store.
func ( *obs) ( string,  ...ObjectOpt) (*ObjectInfo, error) {
	,  := os.Open()
	if  != nil {
		return nil, 
	}
	defer .Close()
	return .Put(&ObjectMeta{Name: }, , ...)
}

// GetFile is a convenience function to pull and object and place in a file.
func ( *obs) (,  string,  ...GetObjectOpt) error {
	// Expect file to be new.
	,  := os.OpenFile(, os.O_WRONLY|os.O_CREATE, 0600)
	if  != nil {
		return 
	}
	defer .Close()

	,  := .Get(, ...)
	if  != nil {
		os.Remove(.Name())
		return 
	}
	defer .Close()

	// Stream copy to the file.
	_,  = io.Copy(, )
	return 
}

// GetObjectInfoOpt is an option accepted by GetInfo. A non-nil error from
// configureGetInfo aborts the call.
type GetObjectInfoOpt interface {
	configureGetInfo(opts *getObjectInfoOpts) error
}
// getObjectInfoOpts collects the settings applied by GetObjectInfoOpt values.
type getObjectInfoOpts struct {
	// ctx is the optional context supplied via nats.Context(); nil when unset.
	ctx context.Context
	// Include deleted object in the result.
	showDeleted bool
}

// getObjectInfoFn adapts a plain function to the GetObjectInfoOpt interface.
type getObjectInfoFn func(opts *getObjectInfoOpts) error

func ( getObjectInfoFn) ( *getObjectInfoOpts) error {
	return ()
}

// GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted.
func () GetObjectInfoOpt {
	return getObjectInfoFn(func( *getObjectInfoOpts) error {
		.showDeleted = true
		return nil
	})
}

// For nats.Context() support.
func ( ContextOpt) ( *getObjectInfoOpts) error {
	.ctx = 
	return nil
}

// GetInfo will retrieve the current information for the object.
func ( *obs) ( string,  ...GetObjectInfoOpt) (*ObjectInfo, error) {
	// Grab last meta value we have.
	if  == "" {
		return nil, ErrNameRequired
	}
	var  getObjectInfoOpts
	for ,  := range  {
		if  != nil {
			if  := .configureGetInfo(&);  != nil {
				return nil, 
			}
		}
	}

	 := fmt.Sprintf(objMetaPreTmpl, .name, encodeName()) // used as data in a JS API call
	 := fmt.Sprintf(objNameTmpl, .name)

	,  := .js.GetLastMsg(, )
	if  != nil {
		if errors.Is(, ErrMsgNotFound) {
			 = ErrObjectNotFound
		}
		return nil, 
	}
	var  ObjectInfo
	if  := json.Unmarshal(.Data, &);  != nil {
		return nil, ErrBadObjectMeta
	}
	if !.showDeleted && .Deleted {
		return nil, ErrObjectNotFound
	}
	.ModTime = .Time
	return &, nil
}

// UpdateMeta will update the meta for the object.
func ( *obs) ( string,  *ObjectMeta) error {
	if  == nil {
		return ErrBadObjectMeta
	}

	// Grab the current meta.
	,  := .GetInfo()
	if  != nil {
		if errors.Is(, ErrObjectNotFound) {
			return ErrUpdateMetaDeleted
		}
		return 
	}

	// If the new name is different from the old, and it exists, error
	// If there was an error that was not ErrObjectNotFound, error.
	if  != .Name {
		,  := .GetInfo(.Name, GetObjectInfoShowDeleted())
		if  != nil && !errors.Is(, ErrObjectNotFound) {
			return 
		}
		if  == nil && !.Deleted {
			return ErrObjectAlreadyExists
		}
	}

	// Update Meta prevents update of ObjectMetaOptions (Link, ChunkSize)
	// These should only be updated internally when appropriate.
	.Name = .Name
	.Description = .Description
	.Headers = .Headers
	.Metadata = .Metadata

	// Prepare the meta message
	if  = publishMeta(, .js);  != nil {
		return 
	}

	// did the name of this object change? We just stored the meta under the new name
	// so delete the meta from the old name via purge stream for subject
	if  != .Name {
		 := fmt.Sprintf(objMetaPreTmpl, .name, encodeName())
		return .js.purgeStream(.stream, &StreamPurgeRequest{Subject: })
	}

	return nil
}

// Seal will seal the object store, no further modifications will be allowed.
func ( *obs) () error {
	 := fmt.Sprintf(objNameTmpl, .name)
	,  := .js.StreamInfo()
	if  != nil {
		return 
	}
	// Seal the stream from being able to take on more messages.
	 := .Config
	.Sealed = true
	_,  = .js.UpdateStream(&)
	return 
}

// objWatcher is the ObjectWatcher implementation returned by Watch.
type objWatcher struct {
	updates chan *ObjectInfo // buffered channel on which updates are delivered
	sub     *Subscription    // subscription feeding the updates channel
}

// Updates returns the interior channel.
func ( *objWatcher) () <-chan *ObjectInfo {
	if  == nil {
		return nil
	}
	return .updates
}

// Stop will unsubscribe from the watcher.
func ( *objWatcher) () error {
	if  == nil {
		return nil
	}
	return .sub.Unsubscribe()
}

// Watch for changes in the underlying store and receive meta information updates.
func ( *obs) ( ...WatchOpt) (ObjectWatcher, error) {
	var  watchOpts
	for ,  := range  {
		if  != nil {
			if  := .configureWatcher(&);  != nil {
				return nil, 
			}
		}
	}

	var  bool

	 := &objWatcher{updates: make(chan *ObjectInfo, 32)}

	 := func( *Msg) {
		var  ObjectInfo
		if  := json.Unmarshal(.Data, &);  != nil {
			return // TODO(dlc) - Communicate this upwards?
		}
		,  := .Metadata()
		if  != nil {
			return
		}

		if !.ignoreDeletes || !.Deleted {
			.ModTime = .Timestamp
			.updates <- &
		}

		// if UpdatesOnly is set, no not send nil to the channel
		// as it would always be triggered after initializing the watcher
		if ! && .NumPending == 0 {
			 = true
			.updates <- nil
		}
	}

	 := fmt.Sprintf(objAllMetaPreTmpl, .name)
	,  := .js.GetLastMsg(.stream, )
	// if there are no messages on the stream and we are not watching
	// updates only, send nil to the channel to indicate that the initial
	// watch is done
	if !.updatesOnly {
		if errors.Is(, ErrMsgNotFound) {
			 = true
			.updates <- nil
		}
	} else {
		// if UpdatesOnly was used, mark initialization as complete
		 = true
	}

	// Used ordered consumer to deliver results.
	 := fmt.Sprintf(objNameTmpl, .name)
	 := []SubOpt{OrderedConsumer(), BindStream()}
	if !.includeHistory {
		 = append(, DeliverLastPerSubject())
	}
	if .updatesOnly {
		 = append(, DeliverNew())
	}
	,  := .js.Subscribe(, , ...)
	if  != nil {
		return nil, 
	}
	// Set us up to close when the waitForMessages func returns.
	.pDone = func( string) {
		close(.updates)
	}
	.sub = 
	return , nil
}

// ListObjectsOpt is an option accepted by List. A non-nil error from
// configureListObjects aborts the call.
type ListObjectsOpt interface {
	configureListObjects(opts *listObjectOpts) error
}
// listObjectOpts collects the settings applied by ListObjectsOpt values.
type listObjectOpts struct {
	// ctx is the optional context supplied via nats.Context(); nil when unset.
	ctx context.Context
	// Include deleted objects in the result channel.
	showDeleted bool
}

// listObjectsFn adapts a plain function to the ListObjectsOpt interface.
type listObjectsFn func(opts *listObjectOpts) error

func ( listObjectsFn) ( *listObjectOpts) error {
	return ()
}

// ListObjectsShowDeleted makes ListObjects() return deleted objects.
func () ListObjectsOpt {
	return listObjectsFn(func( *listObjectOpts) error {
		.showDeleted = true
		return nil
	})
}

// For nats.Context() support.
func ( ContextOpt) ( *listObjectOpts) error {
	.ctx = 
	return nil
}

// List will list all the objects in this store.
func ( *obs) ( ...ListObjectsOpt) ([]*ObjectInfo, error) {
	var  listObjectOpts
	for ,  := range  {
		if  != nil {
			if  := .configureListObjects(&);  != nil {
				return nil, 
			}
		}
	}
	 := make([]WatchOpt, 0)
	if !.showDeleted {
		 = append(, IgnoreDeletes())
	}
	,  := .Watch(...)
	if  != nil {
		return nil, 
	}
	defer .Stop()
	if .ctx == nil {
		.ctx = context.Background()
	}

	var  []*ObjectInfo
	 := .Updates()
:
	for {
		select {
		case  := <-:
			if  == nil {
				break 
			}
			 = append(, )
		case <-.ctx.Done():
			return nil, .ctx.Err()
		}
	}
	if len() == 0 {
		return nil, ErrNoObjectsFound
	}
	return , nil
}

// ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus.
type ObjectBucketStatus struct {
	nfo    *StreamInfo // info of the stream backing the bucket
	bucket string      // bucket name
}

// Bucket is the name of the bucket
func ( *ObjectBucketStatus) () string { return .bucket }

// Description is the description supplied when creating the bucket
func ( *ObjectBucketStatus) () string { return .nfo.Config.Description }

// TTL indicates how long objects are kept in the bucket
func ( *ObjectBucketStatus) () time.Duration { return .nfo.Config.MaxAge }

// Storage indicates the underlying JetStream storage technology used to store data
func ( *ObjectBucketStatus) () StorageType { return .nfo.Config.Storage }

// Replicas indicates how many storage replicas are kept for the data in the bucket
func ( *ObjectBucketStatus) () int { return .nfo.Config.Replicas }

// Sealed indicates the stream is sealed and cannot be modified in any way
func ( *ObjectBucketStatus) () bool { return .nfo.Config.Sealed }

// Size is the combined size of all data in the bucket including metadata, in bytes
func ( *ObjectBucketStatus) () uint64 { return .nfo.State.Bytes }

// BackingStore indicates what technology is used for storage of the bucket
func ( *ObjectBucketStatus) () string { return "JetStream" }

// Metadata is the metadata supplied when creating the bucket
func ( *ObjectBucketStatus) () map[string]string { return .nfo.Config.Metadata }

// StreamInfo is the stream info retrieved to create the status
func ( *ObjectBucketStatus) () *StreamInfo { return .nfo }

// IsCompressed indicates if the data is compressed on disk
func ( *ObjectBucketStatus) () bool { return .nfo.Config.Compression != NoCompression }

// Status retrieves run-time status about a bucket
func ( *obs) () (ObjectStoreStatus, error) {
	,  := .js.StreamInfo(.stream)
	if  != nil {
		return nil, 
	}

	 := &ObjectBucketStatus{
		nfo:    ,
		bucket: .name,
	}

	return , nil
}

// Read impl.
func ( *objResult) ( []byte) ( int,  error) {
	.Lock()
	defer .Unlock()
	 := time.Now().Add(.readTimeout)
	if  := .ctx;  != nil {
		if ,  := .Deadline();  {
			 = 
		}
		select {
		case <-.Done():
			if .Err() == context.Canceled {
				.err = .Err()
			} else {
				.err = ErrTimeout
			}
		default:
		}
	}
	if .err != nil {
		return 0, .err
	}
	if .r == nil {
		return 0, io.EOF
	}

	 := .r.(net.Conn)
	.SetReadDeadline()
	,  = .Read()
	if ,  := .(net.Error);  && .Timeout() {
		if  := .ctx;  != nil {
			select {
			case <-.Done():
				if .Err() == context.Canceled {
					return 0, .Err()
				} else {
					return 0, ErrTimeout
				}
			default:
				 = nil
			}
		}
	}
	if  == io.EOF {
		// Make sure the digest matches.
		 := .digest.Sum(nil)
		,  := DecodeObjectDigest(.info.Digest)
		if  != nil {
			.err = 
			return 0, .err
		}
		if !bytes.Equal([:], ) {
			.err = ErrDigestMismatch
			return 0, .err
		}
	}
	return , 
}

// Close impl.
func ( *objResult) () error {
	.Lock()
	defer .Unlock()
	if .r == nil {
		return nil
	}
	return .r.Close()
}

func ( *objResult) ( error) {
	.Lock()
	defer .Unlock()
	.err = 
}

func ( *objResult) () (*ObjectInfo, error) {
	.Lock()
	defer .Unlock()
	return .info, .err
}

func ( *objResult) () error {
	.Lock()
	defer .Unlock()
	return .err
}

// ObjectStoreNames is used to retrieve a list of bucket names
func ( *js) ( ...ObjectOpt) <-chan string {
	var  objOpts
	for ,  := range  {
		if  != nil {
			if  := .configureObject(&);  != nil {
				return nil
			}
		}
	}
	 := make(chan string)
	var  context.CancelFunc
	if .ctx == nil {
		.ctx,  = context.WithTimeout(context.Background(), defaultRequestWait)
	}
	 := &streamLister{js: }
	.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
	.js.opts.ctx = .ctx
	go func() {
		if  != nil {
			defer ()
		}
		defer close()
		for .Next() {
			for ,  := range .Page() {
				if !strings.HasPrefix(.Config.Name, "OBJ_") {
					continue
				}
				select {
				case  <- .Config.Name:
				case <-.ctx.Done():
					return
				}
			}
		}
	}()

	return 
}

// ObjectStores is used to retrieve a list of bucket statuses
func ( *js) ( ...ObjectOpt) <-chan ObjectStoreStatus {
	var  objOpts
	for ,  := range  {
		if  != nil {
			if  := .configureObject(&);  != nil {
				return nil
			}
		}
	}
	 := make(chan ObjectStoreStatus)
	var  context.CancelFunc
	if .ctx == nil {
		.ctx,  = context.WithTimeout(context.Background(), defaultRequestWait)
	}
	 := &streamLister{js: }
	.js.opts.streamListSubject = fmt.Sprintf(objAllChunksPreTmpl, "*")
	.js.opts.ctx = .ctx
	go func() {
		if  != nil {
			defer ()
		}
		defer close()
		for .Next() {
			for ,  := range .Page() {
				if !strings.HasPrefix(.Config.Name, "OBJ_") {
					continue
				}
				select {
				case  <- &ObjectBucketStatus{
					nfo:    ,
					bucket: strings.TrimPrefix(.Config.Name, "OBJ_"),
				}:
				case <-.ctx.Done():
					return
				}
			}
		}
	}()

	return 
}