// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
	
	
	
	
	
	
	
	
	

	
)

// KeyValueManager is used to manage KeyValue stores: binding to, creating,
// deleting and listing buckets.
type KeyValueManager interface {
	// KeyValue will lookup and bind to an existing KeyValue store.
	KeyValue(bucket string) (KeyValue, error)
	// CreateKeyValue will create a KeyValue store with the following configuration.
	CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
	// DeleteKeyValue will delete this KeyValue store (JetStream stream).
	DeleteKeyValue(bucket string) error
	// KeyValueStoreNames is used to retrieve a list of key value store names.
	// Names are delivered on the returned channel.
	KeyValueStoreNames() <-chan string
	// KeyValueStores is used to retrieve a list of key value store statuses.
	// Statuses are delivered on the returned channel.
	KeyValueStores() <-chan KeyValueStatus
}

// KeyValue contains methods to operate on a KeyValue store.
type KeyValue interface {
	// Get returns the latest value for the key.
	Get(key string) (entry KeyValueEntry, err error)
	// GetRevision returns a specific revision value for the key.
	GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
	// Put will place the new value for the key into the store.
	// It returns the revision assigned to the new value.
	Put(key string, value []byte) (revision uint64, err error)
	// PutString will place the string for the key into the store.
	PutString(key string, value string) (revision uint64, err error)
	// Create will add the key/value pair iff it does not exist.
	Create(key string, value []byte) (revision uint64, err error)
	// Update will update the value iff the latest revision matches.
	// Update also resets the TTL associated with the key (if any).
	Update(key string, value []byte, last uint64) (revision uint64, err error)
	// Delete will place a delete marker and leave all revisions.
	Delete(key string, opts ...DeleteOpt) error
	// Purge will place a delete marker and remove all previous revisions.
	Purge(key string, opts ...DeleteOpt) error
	// Watch for any updates to keys that match the keys argument which could include wildcards.
	// Watch will send a nil entry when it has received all initial values.
	Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
	// WatchAll will watch for updates to all keys (equivalent to Watch with
	// AllKeys). Updates are delivered through the returned KeyWatcher.
	WatchAll(opts ...WatchOpt) (KeyWatcher, error)
	// WatchFiltered will watch for any updates to keys that match the keys
	// argument. It can be configured with the same options as Watch.
	WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error)
	// Keys will return all keys.
	//
	// Deprecated: Use ListKeys instead to avoid memory issues.
	Keys(opts ...WatchOpt) ([]string, error)
	// ListKeys will return all keys on the channel exposed by the returned
	// KeyLister.
	ListKeys(opts ...WatchOpt) (KeyLister, error)
	// History will return all historical values for the key.
	History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
	// Bucket returns the current bucket name.
	Bucket() string
	// PurgeDeletes will remove all current delete markers.
	// See DeleteMarkersOlderThan for how the age of a marker is taken into account.
	PurgeDeletes(opts ...PurgeOpt) error
	// Status retrieves the status and configuration of a bucket.
	Status() (KeyValueStatus, error)
}

// KeyValueStatus is run-time status about a Key-Value bucket.
type KeyValueStatus interface {
	// Bucket returns the name of the bucket.
	Bucket() string

	// Values is how many messages are in the bucket, including historical values.
	Values() uint64

	// History returns the configured history kept per key.
	History() int64

	// TTL is how long the bucket keeps values for.
	TTL() time.Duration

	// BackingStore indicates what technology is used for storage of the bucket.
	BackingStore() string

	// Bytes returns the size in bytes of the bucket.
	Bytes() uint64

	// IsCompressed indicates if the data is compressed on disk.
	IsCompressed() bool
}

// KeyWatcher is what is returned when doing a watch.
type KeyWatcher interface {
	// Context returns watcher context optionally provided by nats.Context option.
	// It may be nil when no context option was supplied.
	Context() context.Context
	// Updates returns a channel to read any updates to entries.
	// A nil entry on this channel marks the end of the initial values.
	Updates() <-chan KeyValueEntry
	// Stop will stop this watcher.
	Stop() error
	// Error returns a channel that will receive any error that occurs during
	// watching. In particular, this will receive an error if the watcher times
	// out while expecting more initial keys. The channel is closed when the
	// watch operation completes or when Stop() is called.
	Error() <-chan error
}

// KeyLister is used to retrieve a list of key value store keys.
type KeyLister interface {
	// Keys returns the channel on which the keys are delivered.
	Keys() <-chan string
	// Stop stops the listing operation.
	Stop() error
	// Error returns a channel that will receive any error that occurs during
	// key listing. In particular, this will receive an error if the underlying
	// watcher times out while expecting more keys. The channel is closed when
	// the listing operation completes or when Stop() is called.
	Error() <-chan error
}

// WatchOpt is an option that configures a key watcher. Implementations apply
// themselves to the watchOpts being built and may return an error to reject
// an invalid combination of options.
type WatchOpt interface {
	configureWatcher(opts *watchOpts) error
}

// For nats.Context() support.
//
// NOTE(review): the receiver, method and parameter identifiers are blank in
// this copy of the file. From the signature this is presumably ContextOpt's
// configureWatcher (WatchOpt) implementation, storing the context in the
// options' ctx field - confirm and restore the identifiers before building.
func ( ContextOpt) ( *watchOpts) error {
	.ctx = 
	return nil
}

// watchOpts collects the settings applied by WatchOpt values before a watcher
// is created.
type watchOpts struct {
	// Optional context supplied through the nats.Context() option.
	ctx context.Context
	// Do not send delete markers to the update channel.
	ignoreDeletes bool
	// Include all history per subject, not just last one.
	includeHistory bool
	// Include only updates for keys.
	// Mutually exclusive with includeHistory (see IncludeHistory/UpdatesOnly).
	updatesOnly bool
	// retrieve only the meta data of the entry
	metaOnly bool
}

// watchOptFn is a function type that operates on *watchOpts; the option
// constructors in this file wrap closures in it to return them as WatchOpt.
type watchOptFn func(opts *watchOpts) error

// NOTE(review): identifiers are blank below in this copy of the file.
// Presumably this is the configureWatcher method, which calls the function
// value itself with the supplied options - confirm and restore.
func ( watchOptFn) ( *watchOpts) error {
	return ()
}

// IncludeHistory instructs the key watcher to include historical values as well.
// The returned option fails if updatesOnly has already been set on the options.
//
// NOTE(review): the function name and the closure parameter name are blank in
// this copy of the file; the name is presumably IncludeHistory (it is called
// under that name elsewhere in this file) - restore before building.
func () WatchOpt {
	return watchOptFn(func( *watchOpts) error {
		if .updatesOnly {
			return errors.New("nats: include history can not be used with updates only")
		}
		.includeHistory = true
		return nil
	})
}

// UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).
// The returned option fails if includeHistory has already been set on the options.
//
// NOTE(review): the function name and the closure parameter name are blank in
// this copy of the file; the name is presumably UpdatesOnly per the comment
// above - restore before building.
func () WatchOpt {
	return watchOptFn(func( *watchOpts) error {
		if .includeHistory {
			return errors.New("nats: updates only can not be used with include history")
		}
		.updatesOnly = true
		return nil
	})
}

// IgnoreDeletes will have the key watcher not pass any deleted keys.
//
// NOTE(review): the function name and the closure parameter name are blank in
// this copy of the file; the name is presumably IgnoreDeletes (it is called
// under that name elsewhere in this file) - restore before building.
func () WatchOpt {
	return watchOptFn(func( *watchOpts) error {
		.ignoreDeletes = true
		return nil
	})
}

// MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value.
//
// NOTE(review): the function name and the closure parameter name are blank in
// this copy of the file; the name is presumably MetaOnly (it is called under
// that name elsewhere in this file) - restore before building.
func () WatchOpt {
	return watchOptFn(func( *watchOpts) error {
		.metaOnly = true
		return nil
	})
}

// PurgeOpt is an option accepted by PurgeDeletes. Implementations apply
// themselves to the purgeOpts being built.
type PurgeOpt interface {
	configurePurge(opts *purgeOpts) error
}

// purgeOpts collects the settings applied by PurgeOpt values.
type purgeOpts struct {
	dmthr time.Duration // Delete markers threshold
	ctx   context.Context
}

// DeleteMarkersOlderThan indicates that delete or purge markers older than that
// will be deleted as part of PurgeDeletes() operation, otherwise, only the data
// will be removed but markers that are recent will be kept.
// Note that if no option is specified, the default is 30 minutes. You can set
// this option to a negative value to instruct to always remove the markers,
// regardless of their age.
type DeleteMarkersOlderThan time.Duration

// NOTE(review): identifiers are blank below in this copy of the file.
// Presumably this is the configurePurge (PurgeOpt) implementation, copying
// the receiver's duration into the options' dmthr field - confirm and restore.
func ( DeleteMarkersOlderThan) ( *purgeOpts) error {
	.dmthr = time.Duration()
	return nil
}

// For nats.Context() support.
//
// NOTE(review): the receiver, method and parameter identifiers are blank in
// this copy of the file. From the signature this is presumably ContextOpt's
// configurePurge (PurgeOpt) implementation, storing the context in the
// options' ctx field - confirm and restore the identifiers before building.
func ( ContextOpt) ( *purgeOpts) error {
	.ctx = 
	return nil
}

// DeleteOpt is an option accepted by Delete and Purge. Implementations apply
// themselves to the deleteOpts being built.
type DeleteOpt interface {
	configureDelete(opts *deleteOpts) error
}

// deleteOpts collects the settings applied by DeleteOpt values.
type deleteOpts struct {
	// Remove all previous revisions.
	purge bool

	// Delete only if the latest revision matches.
	// Zero means no revision check is requested.
	revision uint64
}

// deleteOptFn is a function type that operates on *deleteOpts; the option
// constructors in this file wrap closures in it to return them as DeleteOpt.
type deleteOptFn func(opts *deleteOpts) error

// NOTE(review): identifiers are blank below in this copy of the file.
// Presumably this is the configureDelete method, which calls the function
// value itself with the supplied options - confirm and restore.
func ( deleteOptFn) ( *deleteOpts) error {
	return ()
}

// LastRevision deletes if the latest revision matches.
// The returned option records the given revision in the delete options.
//
// NOTE(review): the function name, its uint64 parameter name and the closure
// parameter name are blank in this copy of the file; the name is presumably
// LastRevision per the comment above - restore before building.
func ( uint64) DeleteOpt {
	return deleteOptFn(func( *deleteOpts) error {
		.revision = 
		return nil
	})
}

// purge removes all previous revisions.
// It is the internal DeleteOpt that sets the purge flag; Purge appends it to
// the caller's options before delegating to Delete.
//
// NOTE(review): the closure parameter name is blank in this copy of the file -
// restore before building.
func purge() DeleteOpt {
	return deleteOptFn(func( *deleteOpts) error {
		.purge = true
		return nil
	})
}

// KeyValueConfig is for configuring a KeyValue store.
//
// Bucket is the bucket name; the backing stream is named with the "KV_"
// prefix. History is the number of values kept per key and is limited to
// KeyValueMaxHistory; when zero, CreateKeyValue uses 1. MaxBytes and
// MaxValueSize are sent as -1 when left at zero, and Replicas as 1 when left
// at zero. TTL is used as the stream's MaxAge. Mirror and Sources allow the
// bucket to be built from other streams; Mirror takes precedence when both
// are set.
type KeyValueConfig struct {
	Bucket       string          `json:"bucket"`
	Description  string          `json:"description,omitempty"`
	MaxValueSize int32           `json:"max_value_size,omitempty"`
	History      uint8           `json:"history,omitempty"`
	TTL          time.Duration   `json:"ttl,omitempty"`
	MaxBytes     int64           `json:"max_bytes,omitempty"`
	Storage      StorageType     `json:"storage,omitempty"`
	Replicas     int             `json:"num_replicas,omitempty"`
	Placement    *Placement      `json:"placement,omitempty"`
	RePublish    *RePublish      `json:"republish,omitempty"`
	Mirror       *StreamSource   `json:"mirror,omitempty"`
	Sources      []*StreamSource `json:"sources,omitempty"`

	// Enable underlying stream compression.
	// NOTE: Compression is supported for nats-server 2.10.0+
	Compression bool `json:"compression,omitempty"`
}

// Key-Value constants.
//
// KeyValueMaxHistory is the maximum history that can be configured per key.
// AllKeys is the wildcard used to watch all keys. kvLatestRevision is the
// revision value that requests the latest entry. kvop is the message header
// that carries the operation marker, and kvdel/kvpurge are its values for
// delete and purge markers.
const (
	KeyValueMaxHistory = 64
	AllKeys            = ">"
	kvLatestRevision   = 0
	kvop               = "KV-Operation"
	kvdel              = "DEL"
	kvpurge            = "PURGE"
)

// KeyValueOp identifies the kind of operation an entry represents.
type KeyValueOp uint8

// Supported operations. KeyValuePut is the zero value, so an entry without an
// operation marker is reported as a put.
const (
	KeyValuePut KeyValueOp = iota
	KeyValueDelete
	KeyValuePurge
)

// Returns a human readable name for the operation, or "Unknown Operation" for
// values outside the declared constants.
//
// NOTE(review): the receiver name, the method name and the switch operand are
// blank in this copy of the file; given the () string signature this is
// presumably the String method switching on the receiver - confirm and restore.
func ( KeyValueOp) () string {
	switch  {
	case KeyValuePut:
		return "KeyValuePutOp"
	case KeyValueDelete:
		return "KeyValueDeleteOp"
	case KeyValuePurge:
		return "KeyValuePurgeOp"
	default:
		return "Unknown Operation"
	}
}

// KeyValueEntry is a retrieved entry for Get or List or Watch.
type KeyValueEntry interface {
	// Bucket is the bucket the data was loaded from.
	Bucket() string
	// Key is the key that was retrieved.
	Key() string
	// Value is the retrieved value.
	Value() []byte
	// Revision is a unique sequence for this value.
	Revision() uint64
	// Created is the time the data was put in the bucket.
	Created() time.Time
	// Delta is distance from the latest value.
	Delta() uint64
	// Operation returns Put or Delete or Purge
	// (KeyValuePut, KeyValueDelete or KeyValuePurge).
	Operation() KeyValueOp
}

// Errors returned by the Key-Value API. Compare with errors.Is.
// Note: the "To" in ErrHistoryToLarge is a long-standing misspelling of "Too";
// the identifier is exported and therefore kept as is.
var (
	ErrKeyValueConfigRequired = errors.New("nats: config required")
	ErrInvalidBucketName      = errors.New("nats: invalid bucket name")
	ErrInvalidKey             = errors.New("nats: invalid key")
	ErrBucketNotFound         = errors.New("nats: bucket not found")
	ErrBadBucket              = errors.New("nats: bucket not valid key-value store")
	ErrKeyNotFound            = errors.New("nats: key not found")
	ErrKeyDeleted             = errors.New("nats: key was deleted")
	ErrHistoryToLarge         = errors.New("nats: history limited to a max of 64")
	ErrNoKeysFound            = errors.New("nats: no keys found")
	ErrKeyWatcherTimeout      = errors.New("nats: key watcher timed out waiting for initial keys")
)

// ErrKeyExists is the JetStream "wrong last sequence" API error (code 400)
// with the message "key exists"; Create checks for it after a failed attempt.
var (
	ErrKeyExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamWrongLastSequence, Code: 400}, message: "key exists"}
)

// Naming templates that map a bucket to its backing stream and subjects.
// The stream is named "KV_<bucket>" and keys live under "$KV.<bucket>.";
// kvSubjectsPreDomainTmpl additionally takes a leading prefix (an external
// API prefix in mapStreamToKVS).
const (
	kvBucketNamePre         = "KV_"
	kvBucketNameTmpl        = "KV_%s"
	kvSubjectsTmpl          = "$KV.%s.>"
	kvSubjectsPreTmpl       = "$KV.%s."
	kvSubjectsPreDomainTmpl = "%s.$KV.%s."
)

// Regex for valid keys and buckets.
// Buckets allow letters, digits, '_' and '-'. Keys additionally allow '/',
// '=' and '.'. Search keys also allow the '*' wildcard and an optional single
// trailing '>'. Note that validSearchKeyRe on its own also matches the empty
// string; searchKeyValid rejects empty input before consulting it.
var (
	validBucketRe    = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)
	validKeyRe       = regexp.MustCompile(`^[-/_=\.a-zA-Z0-9]+$`)
	validSearchKeyRe = regexp.MustCompile(`^[-/_=\.a-zA-Z0-9*]*[>]?$`)
)

// KeyValue will lookup and bind to an existing KeyValue store.
//
// It fails when the server is older than 2.6.2, returns ErrInvalidBucketName
// for an invalid bucket name, ErrBucketNotFound when the backing stream
// ("KV_<bucket>") does not exist, and ErrBadBucket when the stream does not
// look like a KV stream.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *js) ( string) (KeyValue, error) {
	if !.nc.serverMinVersion(2, 6, 2) {
		return nil, errors.New("nats: key-value requires at least server version 2.6.2")
	}
	if !bucketValid() {
		return nil, ErrInvalidBucketName
	}
	 := fmt.Sprintf(kvBucketNameTmpl, )
	,  := .StreamInfo()
	if  != nil {
		// Translate the stream-level "not found" into the KV-level error.
		if errors.Is(, ErrStreamNotFound) {
			 = ErrBucketNotFound
		}
		return nil, 
	}
	// Do some quick sanity checks that this is a correctly formed stream for KV.
	// Max msgs per subject should be > 0.
	if .Config.MaxMsgsPerSubject < 1 {
		return nil, ErrBadBucket
	}

	return mapStreamToKVS(, ), nil
}

// CreateKeyValue will create a KeyValue store with the following configuration.
//
// It fails when the server is older than 2.6.2, returns
// ErrKeyValueConfigRequired for a nil config, ErrInvalidBucketName for an
// invalid bucket name and ErrHistoryToLarge when History exceeds
// KeyValueMaxHistory. The KeyValueConfig is translated into a StreamConfig
// named "KV_<bucket>" and the stream is added; if the name is already in use
// and the existing configuration differs only by discard policy and/or
// AllowDirect, the existing stream is updated instead.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *js) ( *KeyValueConfig) (KeyValue, error) {
	if !.nc.serverMinVersion(2, 6, 2) {
		return nil, errors.New("nats: key-value requires at least server version 2.6.2")
	}
	if  == nil {
		return nil, ErrKeyValueConfigRequired
	}
	if !bucketValid(.Bucket) {
		return nil, ErrInvalidBucketName
	}
	// Fail early if account info cannot be retrieved.
	if ,  := .AccountInfo();  != nil {
		return nil, 
	}

	// Default to 1 for history. Max is 64 for now.
	 := int64(1)
	if .History > 0 {
		if .History > KeyValueMaxHistory {
			return nil, ErrHistoryToLarge
		}
		 = int64(.History)
	}

	 := .Replicas
	if  == 0 {
		 = 1
	}

	// We will set explicitly some values so that we can do comparison
	// if we get an "already in use" error and need to check if it is same.
	 := .MaxBytes
	if  == 0 {
		 = -1
	}
	 := .MaxValueSize
	if  == 0 {
		 = -1
	}
	// When stream's MaxAge is not set, server uses 2 minutes as the default
	// for the duplicate window. If MaxAge is set, and lower than 2 minutes,
	// then the duplicate window will be set to that. If MaxAge is greater,
	// we will cap the duplicate window to 2 minutes (to be consistent with
	// previous behavior).
	 := 2 * time.Minute
	if .TTL > 0 && .TTL <  {
		 = .TTL
	}
	// Zero value is used when compression is not requested.
	var  StoreCompression
	if .Compression {
		 = S2Compression
	}
	 := &StreamConfig{
		Name:              fmt.Sprintf(kvBucketNameTmpl, .Bucket),
		Description:       .Description,
		MaxMsgsPerSubject: ,
		MaxBytes:          ,
		MaxAge:            .TTL,
		MaxMsgSize:        ,
		Storage:           .Storage,
		Replicas:          ,
		Placement:         .Placement,
		AllowRollup:       true,
		DenyDelete:        true,
		Duplicates:        ,
		MaxMsgs:           -1,
		MaxConsumers:      -1,
		AllowDirect:       true,
		RePublish:         .RePublish,
		Compression:       ,
	}
	if .Mirror != nil {
		// Copy in case we need to make changes so we do not change caller's version.
		 := .Mirror.copy()
		// Ensure the mirrored stream name carries the "KV_" prefix.
		if !strings.HasPrefix(.Name, kvBucketNamePre) {
			.Name = fmt.Sprintf(kvBucketNameTmpl, .Name)
		}
		.Mirror = 
		.MirrorDirect = true
	} else if len(.Sources) > 0 {
		// NOTE(review): unlike the mirror branch, no copy() call is visible
		// here, so the Name/SubjectTransforms assignments below presumably
		// modify the caller's StreamSource values - confirm.
		for ,  := range .Sources {
			var  string
			if strings.HasPrefix(.Name, kvBucketNamePre) {
				 = .Name[len(kvBucketNamePre):]
			} else {
				 = .Name
				.Name = fmt.Sprintf(kvBucketNameTmpl, .Name)
			}

			// Map the source bucket's subject space onto this bucket's.
			if .External == nil ||  != .Bucket {
				.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, ), Destination: fmt.Sprintf(kvSubjectsTmpl, .Bucket)}}
			}
			.Sources = append(.Sources, )
		}
		.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, .Bucket)}
	} else {
		.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, .Bucket)}
	}

	// If we are at server version 2.7.2 or above use DiscardNew. We can not use DiscardNew for 2.7.1 or below.
	if .nc.serverMinVersion(2, 7, 2) {
		.Discard = DiscardNew
	}

	,  := .AddStream()
	if  != nil {
		// If we have a failure to add, it could be because we have
		// a config change if the KV was created against a pre 2.7.2
		// and we are now moving to a v2.7.2+. If that is the case
		// and the only difference is the discard policy, then update
		// the stream.
		// The same logic applies for KVs created pre 2.9.x and
		// the AllowDirect setting.
		if errors.Is(, ErrStreamNameAlreadyInUse) {
			// The StreamInfo error is deliberately ignored here: only a
			// non-nil result is considered for the comparison.
			if , _ = .StreamInfo(.Name);  != nil {
				// To compare, make the server's stream info discard
				// policy same than ours.
				.Config.Discard = .Discard
				// Also need to set allow direct for v2.9.x+
				.Config.AllowDirect = .AllowDirect
				if reflect.DeepEqual(&.Config, ) {
					,  = .UpdateStream()
				}
			}
		}
		if  != nil {
			return nil, 
		}
	}
	return mapStreamToKVS(, ), nil
}

// DeleteKeyValue will delete this KeyValue store (JetStream stream).
// It returns ErrInvalidBucketName for an invalid bucket name; otherwise it
// returns the result of deleting the stream named "KV_<bucket>".
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *js) ( string) error {
	if !bucketValid() {
		return ErrInvalidBucketName
	}
	 := fmt.Sprintf(kvBucketNameTmpl, )
	return .DeleteStream()
}

// kvs is the KeyValue implementation bound to a single bucket.
//
// name is the bucket name (stream name without the "KV_" prefix) and stream
// is the backing stream name. pre is the subject prefix used to build key
// subjects; putPre, when not empty, is used instead of pre by Put and Delete
// (it is set by mapStreamToKVS when the stream is a mirror).
type kvs struct {
	name   string
	stream string
	pre    string
	putPre string
	js     *js
	// If true, it means that APIPrefix/Domain was set in the context
	// and we need to add something to some of our high level protocols
	// (such as Put, etc..)
	useJSPfx bool
	// To know if we can use the stream direct get API
	useDirect bool
}

// Underlying entry.
// kve is the concrete entry type built by get and by the watcher callback.
type kve struct {
	bucket   string
	key      string
	value    []byte
	revision uint64
	delta    uint64
	created  time.Time
	op       KeyValueOp
}

// Field accessors, one per field of kve.
//
// NOTE(review): receiver and method names are blank in this copy of the file.
// By return type and field these are presumably the KeyValueEntry methods
// Bucket, Key, Value, Revision, Created, Delta and Operation, in that order -
// confirm and restore.
func ( *kve) () string        { return .bucket }
func ( *kve) () string           { return .key }
func ( *kve) () []byte         { return .value }
func ( *kve) () uint64      { return .revision }
func ( *kve) () time.Time    { return .created }
func ( *kve) () uint64         { return .delta }
func ( *kve) () KeyValueOp { return .op }

// bucketValid reports whether its string argument is a valid bucket name:
// non-empty and matching validBucketRe.
//
// NOTE(review): the parameter name is blank in this copy of the file -
// restore before building.
func bucketValid( string) bool {
	if len() == 0 {
		return false
	}
	return validBucketRe.MatchString()
}

// keyValid reports whether its string argument is a valid key: non-empty,
// not starting or ending with '.', and matching validKeyRe.
//
// NOTE(review): the parameter name is blank in this copy of the file -
// restore before building.
func keyValid( string) bool {
	if len() == 0 || [0] == '.' || [len()-1] == '.' {
		return false
	}
	return validKeyRe.MatchString()
}

// searchKeyValid reports whether its string argument is a valid watch
// pattern: non-empty, not starting or ending with '.', and matching
// validSearchKeyRe (which also permits wildcards).
//
// NOTE(review): the parameter name is blank in this copy of the file -
// restore before building.
func searchKeyValid( string) bool {
	if len() == 0 || [0] == '.' || [len()-1] == '.' {
		return false
	}
	return validSearchKeyRe.MatchString()
}

// Get returns the latest value for the key.
// A key whose latest entry is a delete or purge marker is reported as
// ErrKeyNotFound rather than ErrKeyDeleted.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( string) (KeyValueEntry, error) {
	,  := .get(, kvLatestRevision)
	if  != nil {
		if errors.Is(, ErrKeyDeleted) {
			return nil, ErrKeyNotFound
		}
		return nil, 
	}

	return , nil
}

// GetRevision returns a specific revision value for the key.
// A revision that is a delete or purge marker is reported as ErrKeyNotFound
// rather than ErrKeyDeleted.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( string,  uint64) (KeyValueEntry, error) {
	,  := .get(, )
	if  != nil {
		if errors.Is(, ErrKeyDeleted) {
			return nil, ErrKeyNotFound
		}
		return nil, 
	}

	return , nil
}

// Internal lookup shared by Get, GetRevision and Create (they call it as
// "get"). It returns ErrInvalidKey for an invalid key and ErrKeyNotFound when
// the message does not exist. When the stored message carries a delete or
// purge marker, the entry is returned together with ErrKeyDeleted so callers
// can still read its revision.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( string,  uint64) (KeyValueEntry, error) {
	if !keyValid() {
		return nil, ErrInvalidKey
	}

	// Build the subject for the key from the bucket's subject prefix.
	var  strings.Builder
	.WriteString(.pre)
	.WriteString()

	var  *RawStreamMsg
	var  error
	// Fixed-size backing array for at most one option (DirectGet).
	var  [1]JSOpt
	 := [:0]
	if .useDirect {
		 = append(, DirectGet())
	}

	if  == kvLatestRevision {
		,  = .js.GetLastMsg(.stream, .String(), ...)
	} else {
		,  = .js.GetMsg(.stream, , ...)
		// If a sequence was provided, just make sure that the retrieved
		// message subject matches the request.
		if  == nil && .Subject != .String() {
			return nil, ErrKeyNotFound
		}
	}
	if  != nil {
		if errors.Is(, ErrMsgNotFound) {
			 = ErrKeyNotFound
		}
		return nil, 
	}

	 := &kve{
		bucket:   .name,
		key:      ,
		value:    .Data,
		revision: .Sequence,
		created:  .Time,
	}

	// Double check here that this is not a DEL Operation marker.
	if len(.Header) > 0 {
		switch .Header.Get(kvop) {
		case kvdel:
			.op = KeyValueDelete
			return , ErrKeyDeleted
		case kvpurge:
			.op = KeyValuePurge
			return , ErrKeyDeleted
		}
	}

	return , nil
}

// Put will place the new value for the key into the store.
// It returns ErrInvalidKey for an invalid key; on success it returns the
// sequence reported by the publish acknowledgement as the revision.
//
// NOTE(review): the receiver, method name, parameters, named results and
// local identifiers are blank in this copy of the file - restore before
// building.
func ( *kvs) ( string,  []byte) ( uint64,  error) {
	if !keyValid() {
		return 0, ErrInvalidKey
	}

	// Subject is [JS API prefix] + (putPre if set, otherwise pre) + key.
	var  strings.Builder
	if .useJSPfx {
		.WriteString(.js.opts.pre)
	}
	if .putPre != _EMPTY_ {
		.WriteString(.putPre)
	} else {
		.WriteString(.pre)
	}
	.WriteString()

	,  := .js.Publish(.String(), )
	if  != nil {
		return 0, 
	}
	return .Sequence, 
}

// PutString will place the string for the key into the store.
// It converts the string to bytes and delegates to Put.
//
// NOTE(review): the receiver, method name, parameters and named results are
// blank in this copy of the file - restore before building.
func ( *kvs) ( string,  string) ( uint64,  error) {
	return .Put(, []byte())
}

// Create will add the key/value pair if it does not exist.
//
// It first tries Update with an expected last revision of 0. If that fails
// and the latest entry for the key is a delete/purge marker, it retries
// Update against the marker's revision. A failure matching ErrKeyExists is
// returned wrapped with the "key exists" message.
//
// NOTE(review): the receiver, method name, parameters, named results and
// local identifiers are blank in this copy of the file - restore before
// building.
func ( *kvs) ( string,  []byte) ( uint64,  error) {
	,  := .Update(, , 0)
	if  == nil {
		return , nil
	}

	// TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
	// so we need to double check.
	if ,  := .get(, kvLatestRevision); errors.Is(, ErrKeyDeleted) {
		return .Update(, , .Revision())
	}

	// Check if the expected last subject sequence is not zero which implies
	// the key already exists.
	if errors.Is(, ErrKeyExists) {
		 := ErrKeyExists.(*jsError)
		return 0, fmt.Errorf("%w: %s", , .message)
	}

	return 0, 
}

// Update will update the value if the latest revision matches.
// The expected revision is sent in the ExpectedLastSubjSeqHdr header of the
// published message. It returns ErrInvalidKey for an invalid key; on success
// it returns the sequence reported by the publish acknowledgement.
//
// NOTE(review): unlike Put and Delete, the subject here is always built from
// pre and never from putPre - confirm this is intended for mirrored buckets.
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( string,  []byte,  uint64) (uint64, error) {
	if !keyValid() {
		return 0, ErrInvalidKey
	}

	var  strings.Builder
	if .useJSPfx {
		.WriteString(.js.opts.pre)
	}
	.WriteString(.pre)
	.WriteString()

	 := Msg{Subject: .String(), Header: Header{}, Data: }
	.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(, 10))

	,  := .js.PublishMsg(&)
	if  != nil {
		return 0, 
	}
	return .Sequence, 
}

// Delete will place a delete marker and leave all revisions.
//
// The marker is a message published on the key's subject with the kvop
// header set to kvdel. When the purge option is set, kvpurge is used instead
// and a subject rollup header is added. When a revision option is set, the
// ExpectedLastSubjSeqHdr header is added. Nil options are skipped. It returns
// ErrInvalidKey for an invalid key.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( string,  ...DeleteOpt) error {
	if !keyValid() {
		return ErrInvalidKey
	}

	// Subject is [JS API prefix] + (putPre if set, otherwise pre) + key.
	var  strings.Builder
	if .useJSPfx {
		.WriteString(.js.opts.pre)
	}
	if .putPre != _EMPTY_ {
		.WriteString(.putPre)
	} else {
		.WriteString(.pre)
	}
	.WriteString()

	// DEL op marker. For watch functionality.
	 := NewMsg(.String())

	var  deleteOpts
	for ,  := range  {
		if  != nil {
			if  := .configureDelete(&);  != nil {
				return 
			}
		}
	}

	if .purge {
		.Header.Set(kvop, kvpurge)
		.Header.Set(MsgRollup, MsgRollupSubject)
	} else {
		.Header.Set(kvop, kvdel)
	}

	if .revision != 0 {
		.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(.revision, 10))
	}

	,  := .js.PublishMsg()
	return 
}

// Purge will place a purge marker for the key and remove all previous
// revisions. It delegates to Delete with the internal purge() option appended
// to the caller's options.
//
// NOTE(review): the receiver, method name and parameter identifiers are blank
// in this copy of the file - restore before building.
func ( *kvs) ( string,  ...DeleteOpt) error {
	return .Delete(, append(, purge())...)
}

// kvDefaultPurgeDeletesMarkerThreshold is the marker age threshold used by
// PurgeDeletes when no (or a zero) DeleteMarkersOlderThan option is given.
const kvDefaultPurgeDeletesMarkerThreshold = 30 * time.Minute

// PurgeDeletes will remove all current delete markers.
// This is a maintenance option if there is a larger buildup of delete markers.
// See DeleteMarkersOlderThan() option for more information.
//
// It watches all keys to collect the entries that are delete or purge
// markers, then purges the stream per collected key, keeping one message
// (Keep = 1) when a positive threshold applies and the marker was created
// after the computed cut-off time.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( ...PurgeOpt) error {
	var  purgeOpts
	for ,  := range  {
		if  != nil {
			if  := .configurePurge(&);  != nil {
				return 
			}
		}
	}
	// Transfer possible context purge option to the watcher. This is the
	// only option that matters for the PurgeDeletes() feature.
	var  []WatchOpt
	if .ctx != nil {
		 = append(, Context(.ctx))
	}
	,  := .WatchAll(...)
	if  != nil {
		return 
	}
	defer .Stop()

	var  time.Time
	 := .dmthr
	// Negative value is used to instruct to always remove markers, regardless
	// of age. If set to 0 (or not set), use our default value.
	if  == 0 {
		 = kvDefaultPurgeDeletesMarkerThreshold
	}
	if  > 0 {
		 = time.Now().Add(-)
	}

	// Read until the nil entry that marks the end of the initial values.
	var  []KeyValueEntry
	for  := range .Updates() {
		if  == nil {
			break
		}
		if  := .Operation();  == KeyValueDelete ||  == KeyValuePurge {
			 = append(, )
		}
	}
	// Stop watcher here so as we purge we do not have the system continually updating numPending.
	// (Stop is also deferred above; the error returned here is not checked.)
	.Stop()

	var (
		 StreamPurgeRequest
		  strings.Builder
	)
	// Do actual purges here.
	 := []JSOpt{}
	if .ctx != nil {
		 = append(, Context(.ctx))
	}
	for ,  := range  {
		.WriteString(.pre)
		.WriteString(.Key())
		.Subject = .String()
		.Keep = 0
		if  > 0 && .Created().After() {
			.Keep = 1
		}
		if  := .js.purgeStream(.stream, &, ...);  != nil {
			return 
		}
		// Reuse the builder for the next key.
		.Reset()
	}
	return nil
}

// Keys() will return all keys.
// It watches all keys with IgnoreDeletes and MetaOnly appended to the
// caller's options, collects keys until the nil end-of-initial-values marker
// (or until the updates channel is closed), and returns ErrNoKeysFound when
// nothing was collected.
//
// NOTE(review): the Error() channel of the watcher is not consulted here, so
// a watcher timeout is indistinguishable from a complete listing - confirm
// whether that is acceptable for this deprecated method.
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( ...WatchOpt) ([]string, error) {
	 = append(, IgnoreDeletes(), MetaOnly())
	,  := .WatchAll(...)
	if  != nil {
		return nil, 
	}
	defer .Stop()

	var  []string
	for  := range .Updates() {
		if  == nil {
			break
		}
		 = append(, .Key())
	}
	if len() == 0 {
		return nil, ErrNoKeysFound
	}
	return , nil
}

// keyLister is the KeyLister returned by ListKeys. It pairs the underlying
// watcher with the channel on which keys are delivered.
type keyLister struct {
	watcher KeyWatcher
	keys    chan string
}

// ListKeys will return all keys.
// It watches all keys with IgnoreDeletes and MetaOnly appended to the
// caller's options and starts a goroutine that forwards each key to a
// channel buffered for 256 keys. The goroutine ends at the nil
// end-of-initial-values marker or when the updates channel is closed; on exit
// it stops the watcher and closes the keys channel.
//
// NOTE(review): the send on the keys channel blocks once the buffer is full,
// so a consumer that stops reading presumably leaves the goroutine blocked -
// confirm.
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( ...WatchOpt) (KeyLister, error) {
	 = append(, IgnoreDeletes(), MetaOnly())
	,  := .WatchAll(...)
	if  != nil {
		return nil, 
	}
	 := &keyLister{watcher: , keys: make(chan string, 256)}

	go func() {
		// Deferred calls run in reverse order: stop the watcher, then close keys.
		defer close(.keys)
		defer .Stop()
		for  := range .Updates() {
			if  == nil {
				return
			}
			.keys <- .Key()
		}
	}()
	return , nil
}

// Returns the channel on which keys are delivered.
//
// NOTE(review): receiver and method names are blank in this copy of the file
// for the three methods below; by signature and body they are presumably the
// KeyLister methods Keys, Stop and Error - confirm and restore.
func ( *keyLister) () <-chan string {
	return .keys
}

// Stops the underlying watcher and returns its result.
func ( *keyLister) () error {
	return .watcher.Stop()
}

// Returns the error channel of the underlying watcher.
func ( *keyLister) () <-chan error {
	return .watcher.Error()
}

// History will return all values for the key.
// It watches the key with IncludeHistory appended to the caller's options,
// collects entries until the nil end-of-initial-values marker (or until the
// updates channel is closed), and returns ErrKeyNotFound when nothing was
// collected.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( string,  ...WatchOpt) ([]KeyValueEntry, error) {
	 = append(, IncludeHistory())
	,  := .Watch(, ...)
	if  != nil {
		return nil, 
	}
	defer .Stop()

	var  []KeyValueEntry
	for  := range .Updates() {
		if  == nil {
			break
		}
		 = append(, )
	}
	if len() == 0 {
		return nil, ErrKeyNotFound
	}
	return , nil
}

// Implementation for Watch
//
// updates carries entries (and the nil end-of-initial-values marker) to the
// consumer; errCh carries watcher errors. initPending is the pending count
// recorded when the subscription was created and received counts messages
// seen during the initial phase; initDone is set once the marker was sent.
// initDoneTimer, when set, fires if the initial values do not arrive in time.
type watcher struct {
	mu            sync.Mutex
	updates       chan KeyValueEntry
	sub           *Subscription
	initDone      bool
	initPending   uint64
	received      uint64
	ctx           context.Context
	initDoneTimer *time.Timer
	errCh         chan error
}

// NOTE(review): receiver and method names are blank in this copy of the file
// for the four methods below; per their comments and the KeyWatcher interface
// they are presumably Context, Updates, Stop and Error - confirm and restore.
// Each method tolerates a nil receiver.

// Context returns the context for the watcher if set.
func ( *watcher) () context.Context {
	if  == nil {
		return nil
	}
	return .ctx
}

// Updates returns the interior channel.
func ( *watcher) () <-chan KeyValueEntry {
	if  == nil {
		return nil
	}
	return .updates
}

// Stop will unsubscribe from the watcher.
func ( *watcher) () error {
	if  == nil {
		return nil
	}
	return .sub.Unsubscribe()
}

// Error returns a channel that will receive any error that occurs during watching.
// For a nil watcher it returns an already closed channel.
func ( *watcher) () <-chan error {
	if  == nil {
		 := make(chan error)
		close()
		return 
	}
	return .errCh
}

// WatchAll watches all keys.
// It is shorthand for Watch with the AllKeys wildcard.
//
// NOTE(review): the receiver, method name and parameter identifiers are blank
// in this copy of the file - restore before building.
func ( *kvs) ( ...WatchOpt) (KeyWatcher, error) {
	return .Watch(AllKeys, ...)
}

// Watches updates for every key pattern in the given slice (all keys when the
// slice is empty) and returns a KeyWatcher delivering them. Presumably this
// is WatchFiltered, which Watch delegates to under that name.
//
// Each pattern must pass searchKeyValid, otherwise an error wrapping
// ErrInvalidKey is returned. Entries are delivered through an ordered
// consumer bound to the bucket's stream; a nil entry is sent once the initial
// values have been delivered, unless the updatesOnly option is set.
//
// NOTE(review): the receiver, method name, parameter and local identifiers
// are blank in this copy of the file - restore before building.
func ( *kvs) ( []string,  ...WatchOpt) (KeyWatcher, error) {
	for ,  := range  {
		if !searchKeyValid() {
			return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "key cannot be empty and must be a valid NATS subject")
		}
	}
	var  watchOpts
	for ,  := range  {
		if  != nil {
			if  := .configureWatcher(&);  != nil {
				return nil, 
			}
		}
	}

	// Could be a pattern so don't check for validity as we normally do.
	// NOTE(review): this appears to overwrite the elements of the caller's
	// slice in place with the prefixed subjects - confirm.
	for ,  := range  {
		var  strings.Builder
		.WriteString(.pre)
		.WriteString()
		[] = .String()
	}

	// if no keys are provided, watch all keys
	if len() == 0 {
		var  strings.Builder
		.WriteString(.pre)
		.WriteString(AllKeys)
		 = []string{.String()}
	}

	// We will block below on placing items on the chan. That is by design.
	 := &watcher{
		updates: make(chan KeyValueEntry, 256),
		ctx:     .ctx,
		errCh:   make(chan error, 1),
	}

	// Message callback: converts each delivered message into an entry and
	// tracks completion of the initial values.
	 := func( *Msg) {
		// Messages whose reply subject cannot be parsed as metadata are dropped.
		,  := parser.GetMetadataFields(.Reply)
		if  != nil {
			return
		}
		// The subject must be longer than the prefix to contain a key.
		if len(.Subject) <= len(.pre) {
			return
		}
		 := .Subject[len(.pre):]

		var  KeyValueOp
		if len(.Header) > 0 {
			switch .Header.Get(kvop) {
			case kvdel:
				 = KeyValueDelete
			case kvpurge:
				 = KeyValuePurge
			}
		}
		 := parser.ParseNum([parser.AckNumPendingTokenPos])
		.mu.Lock()
		defer .mu.Unlock()
		if !.ignoreDeletes || ( != KeyValueDelete &&  != KeyValuePurge) {
			 := &kve{
				bucket:   .name,
				key:      ,
				value:    .Data,
				revision: parser.ParseNum([parser.AckStreamSeqTokenPos]),
				created:  time.Unix(0, int64(parser.ParseNum([parser.AckTimestampSeqTokenPos]))),
				delta:    ,
				op:       ,
			}
			// This send happens while the mutex is held and blocks when the
			// updates buffer is full (by design, see above).
			.updates <- 
		}
		// Check if done and initial values.
		// Skip if UpdatesOnly() is set, since there will never be updates initially.
		if !.initDone {
			.received++
			// Use the stable initPending value set at consumer creation.
			// We're done if we've received all expected messages OR there are no more pending
			if .received >= .initPending ||  == 0 {
				// Avoid possible race setting up timer.
				if .initDoneTimer != nil {
					.initDoneTimer.Stop()
				}
				.initDone = true
				.updates <- nil
			} else if .initDoneTimer != nil {
				// More initial values expected: push the timeout back.
				.initDoneTimer.Reset(.js.opts.wait)
			}
		}
	}

	// Used ordered consumer to deliver results.
	 := []SubOpt{BindStream(.stream), OrderedConsumer()}
	if !.includeHistory {
		 = append(, DeliverLastPerSubject())
	}
	if .updatesOnly {
		 = append(, DeliverNew())
	}
	if .metaOnly {
		 = append(, HeadersOnly())
	}
	if .ctx != nil {
		 = append(, Context(.ctx))
	}
	// Create the sub and rest of initialization under the lock.
	// We want to prevent the race between this code and the
	// update() callback.
	.mu.Lock()
	defer .mu.Unlock()
	var  *Subscription
	var  error
	// A single pattern is used as the subscription subject; several patterns
	// are passed as consumer filter subjects with an empty subject.
	if len() == 1 {
		,  = .js.Subscribe([0], , ...)
	} else {
		 = append(, ConsumerFilterSubjects(...))
		,  = .js.Subscribe("", , ...)
	}
	if  != nil {
		return nil, 
	}
	// NOTE(review): with identifiers blank it cannot be seen whose mutex this
	// is; it is presumably the subscription's (it guards jsi/pDone below and
	// is explicitly unlocked), not the one locked with defer above - confirm.
	.mu.Lock()
	// If there were no pending messages at the time of the creation
	// of the consumer, send the marker.
	// Skip if UpdatesOnly() is set, since there will never be updates initially.
	if !.updatesOnly {
		if .jsi != nil {
			if .jsi.pending == 0 {
				.initDone = true
				.updates <- nil
			} else {
				.initPending = .jsi.pending
				// Set a timer to send the marker if we do not get any messages.
				.initDoneTimer = time.AfterFunc(.js.opts.wait, func() {
					.mu.Lock()
					defer .mu.Unlock()
					if !.initDone {
						.initDone = true
						// Non-blocking: the error is dropped if errCh is full.
						select {
						case .errCh <- ErrKeyWatcherTimeout:
						default:
						}
						.updates <- nil
					}
				})
			}
		}
	} else {
		// if UpdatesOnly was used, mark initialization as complete
		.initDone = true
	}
	// Set us up to close when the waitForMessages func returns.
	.pDone = func( string) {
		.mu.Lock()
		defer .mu.Unlock()
		if .initDoneTimer != nil {
			.initDoneTimer.Stop()
		}
		close(.updates)
		close(.errCh)
	}

	.mu.Unlock()

	.sub = 
	return , nil
}

// Watch will watch for updates to keys that match the keys pattern; updates
// are delivered through the returned KeyWatcher.
// keys needs to be a valid NATS subject.
// It delegates to WatchFiltered with a single-element slice.
//
// NOTE(review): the receiver, method name and parameter identifiers are blank
// in this copy of the file - restore before building.
func ( *kvs) ( string,  ...WatchOpt) (KeyWatcher, error) {
	return .WatchFiltered([]string{}, ...)
}

// Bucket returns the current bucket name, i.e. the name of the backing
// JetStream stream without its "KV_" prefix.
//
// NOTE(review): the receiver and method names are blank in this copy of the
// file - restore before building.
func ( *kvs) () string {
	return .name
}

// KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus.
// It wraps the stream info of the backing stream together with the bucket name.
//
// NOTE(review): receiver and method names are blank in this copy of the file
// for all methods below; per their comments they are presumably Bucket,
// Values, History, TTL, BackingStore, StreamInfo, Bytes and IsCompressed -
// confirm and restore. The methods read nfo without a nil check.
type KeyValueBucketStatus struct {
	nfo    *StreamInfo
	bucket string
}

// Bucket the name of the bucket
func ( *KeyValueBucketStatus) () string { return .bucket }

// Values is how many messages are in the bucket, including historical values
func ( *KeyValueBucketStatus) () uint64 { return .nfo.State.Msgs }

// History returns the configured history kept per key
func ( *KeyValueBucketStatus) () int64 { return .nfo.Config.MaxMsgsPerSubject }

// TTL is how long the bucket keeps values for
func ( *KeyValueBucketStatus) () time.Duration { return .nfo.Config.MaxAge }

// BackingStore indicates what technology is used for storage of the bucket
// (always "JetStream" for this implementation).
func ( *KeyValueBucketStatus) () string { return "JetStream" }

// StreamInfo is the stream info retrieved to create the status
func ( *KeyValueBucketStatus) () *StreamInfo { return .nfo }

// Bytes is the size of the stream
func ( *KeyValueBucketStatus) () uint64 { return .nfo.State.Bytes }

// IsCompressed indicates if the data is compressed on disk
func ( *KeyValueBucketStatus) () bool { return .nfo.Config.Compression != NoCompression }

// Status retrieves the status and configuration of a bucket.
// It fetches the stream info of the backing stream and wraps it in a
// KeyValueBucketStatus; a lookup error is returned unchanged.
//
// NOTE(review): the receiver, method name and local identifiers are blank in
// this copy of the file - restore before building.
func ( *kvs) () (KeyValueStatus, error) {
	,  := .js.StreamInfo(.stream)
	if  != nil {
		return nil, 
	}

	return &KeyValueBucketStatus{nfo: , bucket: .name}, nil
}

// KeyValueStoreNames is used to retrieve a list of key value store names.
//
// A goroutine pages through the stream names, restricted by a stream list
// subject of "$KV.*.>", skips names without the "KV_" prefix and sends the
// remaining names with the prefix removed. The channel is closed when paging
// ends.
//
// NOTE(review): the returned channel is unbuffered and no lister error is
// inspected in this function, so callers presumably must drain the channel
// and cannot tell a failed listing from an empty one - confirm.
// NOTE(review): the receiver, method name and local identifiers are blank in
// this copy of the file - restore before building.
func ( *js) () <-chan string {
	 := make(chan string)
	 := &streamNamesLister{js: }
	.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
	go func() {
		defer close()
		for .Next() {
			for ,  := range .Page() {
				if !strings.HasPrefix(, kvBucketNamePre) {
					continue
				}
				 <- strings.TrimPrefix(, kvBucketNamePre)
			}
		}
	}()

	return 
}

// KeyValueStores is used to retrieve a list of key value store statuses.
//
// A goroutine pages through the stream infos, restricted by a stream list
// subject of "$KV.*.>", skips streams whose name lacks the "KV_" prefix and
// sends a KeyValueBucketStatus for each remaining stream. The channel is
// closed when paging ends.
//
// NOTE(review): the returned channel is unbuffered and no lister error is
// inspected in this function, so callers presumably must drain the channel
// and cannot tell a failed listing from an empty one - confirm.
// NOTE(review): the receiver, method name and local identifiers are blank in
// this copy of the file - restore before building.
func ( *js) () <-chan KeyValueStatus {
	 := make(chan KeyValueStatus)
	 := &streamLister{js: }
	.js.opts.streamListSubject = fmt.Sprintf(kvSubjectsTmpl, "*")
	go func() {
		defer close()
		for .Next() {
			for ,  := range .Page() {
				if !strings.HasPrefix(.Config.Name, kvBucketNamePre) {
					continue
				}
				 <- &KeyValueBucketStatus{nfo: , bucket: strings.TrimPrefix(.Config.Name, kvBucketNamePre)}
			}
		}
	}()
	return 
}

// mapStreamToKVS builds the kvs handle for a bucket from the JetStream
// context and the stream info of its backing stream.
//
// The bucket name is the stream name without the "KV_" prefix and the key
// subject prefix is "$KV.<bucket>.". For a mirrored stream, writes are
// directed at the mirrored bucket through putPre; when the mirror is external
// with an API prefix, reads also use the mirrored bucket's prefix and the JS
// API prefix is not prepended.
//
// NOTE(review): the parameter and local identifiers are blank in this copy of
// the file - restore before building.
func mapStreamToKVS( *js,  *StreamInfo) *kvs {
	 := strings.TrimPrefix(.Config.Name, kvBucketNamePre)

	 := &kvs{
		name:   ,
		stream: .Config.Name,
		pre:    fmt.Sprintf(kvSubjectsPreTmpl, ),
		js:     ,
		// Determine if we need to use the JS prefix in front of Put and Delete operations
		useJSPfx:  .opts.pre != defaultAPIPrefix,
		useDirect: .Config.AllowDirect,
	}

	// If we are mirroring, we will have mirror direct on, so just use the mirror name
	// and override use
	if  := .Config.Mirror;  != nil {
		 := strings.TrimPrefix(.Name, kvBucketNamePre)
		if .External != nil && .External.APIPrefix != _EMPTY_ {
			.useJSPfx = false
			.pre = fmt.Sprintf(kvSubjectsPreTmpl, )
			.putPre = fmt.Sprintf(kvSubjectsPreDomainTmpl, .External.APIPrefix, )
		} else {
			.putPre = fmt.Sprintf(kvSubjectsPreTmpl, )
		}
	}

	return 
}