// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
	
	
	
	
	
	
	
)

// JetStreamManager manages JetStream Streams and Consumers.
//
// Methods returning a channel (Streams, StreamNames, Consumers,
// ConsumerNames and their deprecated aliases) deliver results
// asynchronously; all other methods return their result directly.
type JetStreamManager interface {
	// AddStream creates a stream.
	AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)

	// UpdateStream updates a stream.
	UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)

	// DeleteStream deletes a stream.
	DeleteStream(name string, opts ...JSOpt) error

	// StreamInfo retrieves information from a stream.
	StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)

	// PurgeStream purges messages from a stream.
	PurgeStream(name string, opts ...JSOpt) error

	// StreamsInfo can be used to retrieve a list of StreamInfo objects.
	//
	// Deprecated: Use Streams() instead.
	StreamsInfo(opts ...JSOpt) <-chan *StreamInfo

	// Streams can be used to retrieve a list of StreamInfo objects.
	Streams(opts ...JSOpt) <-chan *StreamInfo

	// StreamNames is used to retrieve a list of Stream names.
	StreamNames(opts ...JSOpt) <-chan string

	// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
	// Use options nats.DirectGet() or nats.DirectGetNext() to trigger retrieval
	// directly from a distributed group of servers (leader and replicas).
	// The stream must have been created/updated with the AllowDirect boolean.
	GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)

	// GetLastMsg retrieves the last raw stream message stored in JetStream by subject.
	// Use option nats.DirectGet() to trigger retrieval
	// directly from a distributed group of servers (leader and replicas).
	// The stream must have been created/updated with the AllowDirect boolean.
	GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error)

	// DeleteMsg deletes a message from a stream. The message is marked as
	// erased, but its value is not overwritten.
	DeleteMsg(name string, seq uint64, opts ...JSOpt) error

	// SecureDeleteMsg deletes a message from a stream. The deleted message is
	// overwritten with random data. As a result, this operation is slower
	// than DeleteMsg().
	SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error

	// AddConsumer adds a consumer to a stream.
	// If the consumer already exists, and the configuration is the same, it
	// will return the existing consumer.
	// If the consumer already exists, and the configuration is different, it
	// will return ErrConsumerNameAlreadyInUse.
	AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

	// UpdateConsumer updates an existing consumer.
	UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)

	// DeleteConsumer deletes a consumer.
	DeleteConsumer(stream, consumer string, opts ...JSOpt) error

	// ConsumerInfo retrieves information of a consumer from a stream.
	ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error)

	// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
	//
	// Deprecated: Use Consumers() instead.
	ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo

	// Consumers is used to retrieve a list of ConsumerInfo objects.
	Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo

	// ConsumerNames is used to retrieve a list of Consumer names.
	ConsumerNames(stream string, opts ...JSOpt) <-chan string

	// AccountInfo retrieves info about the JetStream usage from an account.
	AccountInfo(opts ...JSOpt) (*AccountInfo, error)

	// StreamNameBySubject returns the name of a stream matching the given
	// subject (first argument).
	StreamNameBySubject(string, ...JSOpt) (string, error)
}

// StreamConfig will determine the properties for a stream.
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
	// Name is the name of the stream. It is required and must be unique
	// across the JetStream account.
	//
	// Names cannot contain whitespace, ., *, >, path separators
	// (forward or backwards slash), and non-printable characters.
	Name string `json:"name"`

	// Description is an optional description of the stream.
	Description string `json:"description,omitempty"`

	// Subjects is a list of subjects that the stream is listening on.
	// Wildcards are supported. Subjects cannot be set if the stream is
	// created as a mirror.
	Subjects []string `json:"subjects,omitempty"`

	// Retention defines the message retention policy for the stream.
	// Defaults to LimitsPolicy.
	Retention RetentionPolicy `json:"retention"`

	// MaxConsumers specifies the maximum number of consumers allowed for
	// the stream.
	MaxConsumers int `json:"max_consumers"`

	// MaxMsgs is the maximum number of messages the stream will store.
	// After reaching the limit, stream adheres to the discard policy.
	// If not set, server default is -1 (unlimited).
	MaxMsgs int64 `json:"max_msgs"`

	// MaxBytes is the maximum total size of messages the stream will store.
	// After reaching the limit, stream adheres to the discard policy.
	// If not set, server default is -1 (unlimited).
	MaxBytes int64 `json:"max_bytes"`

	// Discard defines the policy for handling messages when the stream
	// reaches its limits in terms of number of messages or total bytes.
	Discard DiscardPolicy `json:"discard"`

	// DiscardNewPerSubject is a flag to enable discarding new messages per
	// subject when limits are reached. Requires DiscardPolicy to be
	// DiscardNew and the MaxMsgsPerSubject to be set.
	DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`

	// MaxAge is the maximum age of messages that the stream will retain.
	MaxAge time.Duration `json:"max_age"`

	// MaxMsgsPerSubject is the maximum number of messages per subject that
	// the stream will retain.
	MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`

	// MaxMsgSize is the maximum size of any single message in the stream.
	MaxMsgSize int32 `json:"max_msg_size,omitempty"`

	// Storage specifies the type of storage backend used for the stream
	// (file or memory).
	Storage StorageType `json:"storage"`

	// Replicas is the number of stream replicas in clustered JetStream.
	// Defaults to 1, maximum is 5.
	Replicas int `json:"num_replicas"`

	// NoAck is a flag to disable acknowledging messages received by this
	// stream.
	//
	// If set to true, publish methods from the JetStream client will not
	// work as expected, since they rely on acknowledgements. Core NATS
	// publish methods should be used instead. Note that this will make
	// message delivery less reliable.
	NoAck bool `json:"no_ack,omitempty"`

	// Duplicates is the window within which to track duplicate messages.
	// If not set, server default is 2 minutes.
	Duplicates time.Duration `json:"duplicate_window,omitempty"`

	// Placement is used to declare where the stream should be placed via
	// tags and/or an explicit cluster name.
	Placement *Placement `json:"placement,omitempty"`

	// Mirror defines the configuration for mirroring another stream.
	Mirror *StreamSource `json:"mirror,omitempty"`

	// Sources is a list of other streams this stream sources messages from.
	Sources []*StreamSource `json:"sources,omitempty"`

	// Sealed streams do not allow messages to be published or deleted via
	// limits or API. A sealed stream cannot be unsealed via configuration
	// update. Sealed can only be set on already created streams via the
	// Update API.
	Sealed bool `json:"sealed,omitempty"`

	// DenyDelete restricts the ability to delete messages from a stream via
	// the API. Defaults to false.
	DenyDelete bool `json:"deny_delete,omitempty"`

	// DenyPurge restricts the ability to purge messages from a stream via
	// the API. Defaults to false.
	DenyPurge bool `json:"deny_purge,omitempty"`

	// AllowRollup allows the use of the Nats-Rollup header to replace all
	// contents of a stream, or subject in a stream, with a single new
	// message.
	AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`

	// Compression specifies the message storage compression algorithm.
	// Defaults to NoCompression.
	Compression StoreCompression `json:"compression"`

	// FirstSeq is the initial sequence number of the first message in the
	// stream.
	FirstSeq uint64 `json:"first_seq,omitempty"`

	// SubjectTransform allows applying a transformation to matching
	// messages' subjects.
	SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`

	// RePublish allows immediate republishing a message to the configured
	// subject after it's stored.
	RePublish *RePublish `json:"republish,omitempty"`

	// AllowDirect enables direct access to individual messages using direct
	// get API. Defaults to false.
	AllowDirect bool `json:"allow_direct"`

	// MirrorDirect enables direct access to individual messages from the
	// origin stream using direct get API. Defaults to false.
	MirrorDirect bool `json:"mirror_direct"`

	// ConsumerLimits defines limits on certain values that consumers of
	// this stream can set; the same values act as defaults for consumers
	// that do not set them.
	ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`

	// Metadata is a set of application-defined key-value pairs for
	// associating metadata on the stream. This feature requires nats-server
	// v2.10.0 or later.
	Metadata map[string]string `json:"metadata,omitempty"`

	// Template identifies the template that manages the Stream.
	//
	// Deprecated: This feature is no longer supported.
	Template string `json:"template_owner,omitempty"`

	// AllowMsgTTL allows header initiated per-message TTLs.
	// This feature requires nats-server v2.11.0 or later.
	AllowMsgTTL bool `json:"allow_msg_ttl"`

	// SubjectDeleteMarkerTTL enables and sets a duration for adding server
	// markers for delete, purge and max age limits.
	// This feature requires nats-server v2.11.0 or later.
	SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"`
}

// SubjectTransformConfig is for applying a subject transform (to matching
// messages) before doing anything else when a new message is received.
type SubjectTransformConfig struct {
	// Source is serialized as "src" and omitted from the JSON when empty.
	Source string `json:"src,omitempty"`
	// Destination is serialized as "dest" and is always present in the JSON.
	Destination string `json:"dest"`
}

// RePublish is for republishing messages once committed to a stream. The
// original subject is remapped from the subject pattern to the destination
// pattern.
type RePublish struct {
	// Source is the subject pattern to match (JSON "src", omitted when empty).
	Source string `json:"src,omitempty"`
	// Destination is the destination pattern (JSON "dest").
	Destination string `json:"dest"`
	// HeadersOnly is serialized as "headers_only" and omitted when false.
	// NOTE(review): presumably republishes only message headers — confirm
	// against server documentation.
	HeadersOnly bool `json:"headers_only,omitempty"`
}

// Placement is used to guide placement of streams in clustered JetStream.
type Placement struct {
	// Cluster is always serialized (JSON "cluster"), even when empty.
	Cluster string `json:"cluster"`
	// Tags is omitted from the JSON when empty.
	Tags []string `json:"tags,omitempty"`
}

// StreamSource dictates how streams can source from other streams.
//
// Domain and External are treated as mutually exclusive by the domain
// conversion helper in this file, which rejects a source that sets both.
type StreamSource struct {
	Name              string                   `json:"name"`
	OptStartSeq       uint64                   `json:"opt_start_seq,omitempty"`
	OptStartTime      *time.Time               `json:"opt_start_time,omitempty"`
	FilterSubject     string                   `json:"filter_subject,omitempty"`
	SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
	External          *ExternalStream          `json:"external,omitempty"`
	// Domain is never serialized (json:"-"); it is client-side only and is
	// converted into External.APIPrefix before a request is sent.
	Domain string `json:"-"`
}

// ExternalStream allows you to qualify access to a stream source in another
// account.
type ExternalStream struct {
	// APIPrefix is serialized as "api" and is always present in the JSON.
	APIPrefix string `json:"api"`
	// DeliverPrefix is serialized as "deliver" and omitted when empty.
	DeliverPrefix string `json:"deliver,omitempty"`
}

// StreamConsumerLimits are the limits for a consumer on a stream.
// These can be overridden on a per consumer basis.
// Both fields are omitted from the JSON when they hold their zero value.
type StreamConsumerLimits struct {
	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
	MaxAckPending     int           `json:"max_ack_pending,omitempty"`
}

// Helper for copying when we do not want to change user's version.
//
// The StreamSource is first copied by value; the two pointer fields that are
// handled here (OptStartTime and External) are then replaced by pointers to
// fresh copies so the result does not share them with the original.
// SubjectTransforms is not re-copied here, so the slice contents stay shared
// with the original.
//
// NOTE(review): identifiers (receiver, method name, locals) appear to have
// been stripped from this block; call sites in this file invoke it as
// `.copy()`. Restore the names before compiling.
func ( *StreamSource) () *StreamSource {
	// Shallow value copy of the whole struct.
	 := *
	// Check pointers
	if .OptStartTime != nil {
		 := *.OptStartTime
		.OptStartTime = &
	}
	if .External != nil {
		 := *.External
		.External = &
	}
	return &
}

// If we have a Domain, convert to the appropriate ext.APIPrefix.
// This will change the stream source, so should be a copy passed in.
//
// Returns nil without changes when Domain is empty, and an error when both
// Domain and External are set. Otherwise External is replaced by a new
// ExternalStream whose APIPrefix is built from the jsExtDomainT format and
// the Domain value.
//
// NOTE(review): identifiers (receiver, method name) appear to have been
// stripped from this block; call sites invoke it as `.convertDomain()`.
func ( *StreamSource) () error {
	// Nothing to convert.
	if .Domain == _EMPTY_ {
		return nil
	}
	if .External != nil {
		// These should be mutually exclusive.
		// TODO(dlc) - Make generic?
		return errors.New("nats: domain and external are both set")
	}
	.External = &ExternalStream{APIPrefix: fmt.Sprintf(jsExtDomainT, .Domain)}
	return nil
}

// apiResponse is a standard response from the JetStream JSON API.
// It is embedded in the concrete response types in this file, which check
// Error for nil after unmarshalling.
type apiResponse struct {
	Type  string    `json:"type"`
	Error *APIError `json:"error,omitempty"`
}

// apiPaged includes variables used to create paged responses from the JSON API.
// The listers in this file compare their running offset against Total to
// decide when to stop requesting pages.
type apiPaged struct {
	Total  int `json:"total"`
	Offset int `json:"offset"`
	Limit  int `json:"limit"`
}

// apiPagedRequest includes parameters allowing specific pages to be requested
// from APIs responding with apiPaged.
type apiPagedRequest struct {
	// Offset is omitted from the JSON when zero (i.e. for the first page).
	Offset int `json:"offset,omitempty"`
}

// AccountInfo contains info about the JetStream usage from the current account.
type AccountInfo struct {
	// Tier is embedded, so its fields are promoted onto AccountInfo and are
	// read from the top level of the JSON object.
	Tier
	Domain string   `json:"domain"`
	API    APIStats `json:"api"`
	// Tiers maps a tier name to its usage and limits.
	// NOTE(review): the meaning of the map keys is not visible in this file.
	Tiers map[string]Tier `json:"tiers"`
}

// Tier groups JetStream usage figures together with the limits that apply to
// them. It is embedded in AccountInfo and is also the value type of
// AccountInfo.Tiers.
type Tier struct {
	Memory         uint64        `json:"memory"`
	Store          uint64        `json:"storage"`
	ReservedMemory uint64        `json:"reserved_memory"`
	ReservedStore  uint64        `json:"reserved_storage"`
	Streams        int           `json:"streams"`
	Consumers      int           `json:"consumers"`
	Limits         AccountLimits `json:"limits"`
}

// APIStats reports on API calls to JetStream for this account.
type APIStats struct {
	// Total is serialized as "total".
	Total uint64 `json:"total"`
	// Errors is serialized as "errors".
	Errors uint64 `json:"errors"`
}

// AccountLimits includes the JetStream limits of the current account.
// NOTE(review): the sentinel used for "unlimited" is not visible in this
// file; confirm against server documentation before interpreting values.
type AccountLimits struct {
	MaxMemory            int64 `json:"max_memory"`
	MaxStore             int64 `json:"max_storage"`
	MaxStreams           int   `json:"max_streams"`
	MaxConsumers         int   `json:"max_consumers"`
	MaxAckPending        int   `json:"max_ack_pending"`
	MemoryMaxStreamBytes int64 `json:"memory_max_stream_bytes"`
	StoreMaxStreamBytes  int64 `json:"storage_max_stream_bytes"`
	MaxBytesRequired     bool  `json:"max_bytes_required"`
}

// accountInfoResponse is the response to an account info request: the common
// apiResponse envelope plus the AccountInfo payload, both embedded so their
// fields are decoded from the top level of the JSON object.
type accountInfoResponse struct {
	apiResponse
	AccountInfo
}

// AccountInfo fetches account information from the server, containing details
// about the account associated with this JetStream connection. If account is
// not enabled for JetStream, ErrJetStreamNotEnabledForAccount is returned.
//
// If the server does not have JetStream enabled, ErrJetStreamNotEnabled is
// returned (for a single server setup). For clustered topologies, AccountInfo
// will time out.
//
// NOTE(review): identifiers (receiver, method name, locals) appear to have
// been stripped from this block; restore them before compiling.
func ( *js) ( ...JSOpt) (*AccountInfo, error) {
	// Resolve per-call options; the second result is an optional cancel
	// function that is deferred when non-nil.
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil, 
	}
	if  != nil {
		defer ()
	}

	// The account info request carries no payload.
	,  := .apiRequestWithContext(.ctx, .apiSubj(apiAccountInfo), nil)
	if  != nil {
		// TODO: maybe nats-server should always have a responder on this
		// subject and reply explicitly when JetStream is unavailable.
		// Until then, "no responders" is translated to ErrJetStreamNotEnabled.
		if errors.Is(, ErrNoResponders) {
			 = ErrJetStreamNotEnabled
		}
		return nil, 
	}
	var  accountInfoResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return nil, 
	}
	if .Error != nil {
		// Internally checks based on error code instead of description match.
		if errors.Is(.Error, ErrJetStreamNotEnabledForAccount) {
			return nil, ErrJetStreamNotEnabledForAccount
		}
		return nil, .Error
	}

	return &.AccountInfo, nil
}

// createConsumerRequest is the JSON payload sent when creating or updating a
// consumer: the target stream name and the consumer configuration.
type createConsumerRequest struct {
	Stream string          `json:"stream_name"`
	Config *ConsumerConfig `json:"config"`
}

// consumerResponse is the response to a consumer create/update request: the
// common apiResponse envelope plus an embedded *ConsumerInfo. The embedded
// pointer can be nil after decoding, which the consumer creation path in
// this file checks for explicitly.
type consumerResponse struct {
	apiResponse
	*ConsumerInfo
}

// AddConsumer adds a consumer to a stream.
// If the consumer already exists, and the configuration is the same, it
// will return the existing consumer.
// If the consumer already exists, and the configuration is different, it
// will return ErrConsumerNameAlreadyInUse.
//
// A nil config is replaced by an empty ConsumerConfig. The consumer name is
// taken from Name, falling back to Durable. When a name is available, the
// existing consumer is looked up first; lookup errors other than
// ErrConsumerNotFound and ErrStreamNotFound are returned as-is.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) ( string,  *ConsumerConfig,  ...JSOpt) (*ConsumerInfo, error) {
	if  == nil {
		 = &ConsumerConfig{}
	}
	 := .Name
	if  == _EMPTY_ {
		 = .Durable
	}
	if  != _EMPTY_ {
		,  := .ConsumerInfo(, , ...)
		if  != nil && !errors.Is(, ErrConsumerNotFound) && !errors.Is(, ErrStreamNotFound) {
			return nil, 
		}

		// A consumer with this name already exists: compare configurations.
		if  != nil {
			 := checkConfig(&.Config, )
			if  != nil {
				return nil, fmt.Errorf("%w: creating consumer %q on stream %q", ErrConsumerNameAlreadyInUse, , )
			} else {
				// NOTE(review): `else` after a returning `if` is redundant.
				return , nil
			}
		}
	}

	return .upsertConsumer(, , , ...)
}

// UpdateConsumer updates an existing consumer.
//
// Returns ErrConsumerConfigRequired when the config is nil and
// ErrConsumerNameRequired when neither Name nor Durable is set; otherwise the
// request is delegated to upsertConsumer.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) ( string,  *ConsumerConfig,  ...JSOpt) (*ConsumerInfo, error) {
	if  == nil {
		return nil, ErrConsumerConfigRequired
	}
	// Name takes precedence over Durable.
	 := .Name
	if  == _EMPTY_ {
		 = .Durable
	}
	if  == _EMPTY_ {
		return nil, ErrConsumerNameRequired
	}
	return .upsertConsumer(, , , ...)
}

// upsertConsumer sends a consumer create/update request for the given stream
// and returns the ConsumerInfo reported by the server.
//
// The API subject is chosen from the consumer name, the server version and
// the config:
//   - empty name: legacy ephemeral endpoint;
//   - server >= 2.9.0: DURABLE.CREATE when Durable is set and the
//     useDurableConsumerCreate feature flag is on, otherwise the named
//     endpoint with or without the filter subject;
//   - older servers: DURABLE.CREATE when Durable is set, else the legacy
//     ephemeral endpoint.
//
// ErrNoResponders is translated to ErrJetStreamNotEnabled. API errors
// matching ErrStreamNotFound or ErrConsumerNotFound are returned as those
// sentinels.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) (,  string,  *ConsumerConfig,  ...JSOpt) (*ConsumerInfo, error) {
	if  := checkStreamName();  != nil {
		return nil, 
	}
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil, 
	}
	if  != nil {
		defer ()
	}

	,  := json.Marshal(&createConsumerRequest{Stream: , Config: })
	if  != nil {
		return nil, 
	}

	var  string
	if  == _EMPTY_ {
		// if consumer name is empty (neither Durable nor Name is set), use the legacy ephemeral endpoint
		 = fmt.Sprintf(apiLegacyConsumerCreateT, )
	} else if  := checkConsumerName();  != nil {
		return nil, 
	} else if .nc.serverMinVersion(2, 9, 0) {
		if .Durable != "" && .opts.featureFlags.useDurableConsumerCreate {
			// if user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
			 = fmt.Sprintf(apiDurableCreateT, , )
		} else if .FilterSubject == _EMPTY_ || .FilterSubject == ">" {
			// if filter subject is empty or ">", use the endpoint without filter subject
			 = fmt.Sprintf(apiConsumerCreateT, , )
		} else {
			// safeguard against passing invalid filter subject in request subject
			// (a leading or trailing "." would produce an empty subject token;
			// a single "." matches both checks)
			if .FilterSubject[0] == '.' || .FilterSubject[len(.FilterSubject)-1] == '.' {
				return nil, fmt.Errorf("%w: %q", ErrInvalidFilterSubject, .FilterSubject)
			}
			// if filter subject is not empty, use the endpoint with filter subject
			 = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, , , .FilterSubject)
		}
	} else {
		if .Durable != "" {
			// if Durable is set, use the DURABLE.CREATE endpoint
			 = fmt.Sprintf(apiDurableCreateT, , )
		} else {
			// if Durable is not set, use the legacy ephemeral endpoint
			 = fmt.Sprintf(apiLegacyConsumerCreateT, )
		}
	}

	,  := .apiRequestWithContext(.ctx, .apiSubj(), )
	if  != nil {
		if errors.Is(, ErrNoResponders) {
			 = ErrJetStreamNotEnabled
		}
		return nil, 
	}
	var  consumerResponse
	 = json.Unmarshal(.Data, &)
	if  != nil {
		return nil, 
	}
	if .Error != nil {
		if errors.Is(.Error, ErrStreamNotFound) {
			return nil, ErrStreamNotFound
		}
		if errors.Is(.Error, ErrConsumerNotFound) {
			return nil, ErrConsumerNotFound
		}
		return nil, .Error
	}

	// NOTE(review): the Error == nil half of this condition is always true
	// here because the non-nil case returned above.
	if .Error == nil && .ConsumerInfo == nil {
		return nil, ErrConsumerCreationResponseEmpty
	}

	// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
	if len(.FilterSubjects) != 0 && len(.Config.FilterSubjects) == 0 {
		return nil, ErrConsumerMultipleFilterSubjectsNotSupported
	}
	return .ConsumerInfo, nil
}

// consumerDeleteResponse is the response for a Consumer delete request.
type consumerDeleteResponse struct {
	apiResponse
	// Success is serialized as "success" and omitted when false.
	Success bool `json:"success,omitempty"`
}

// checkStreamName validates a stream name on the client side.
// It returns ErrStreamNameRequired when the name is empty and
// ErrInvalidStreamName when it contains a "." or a space; otherwise nil.
//
// NOTE(review): the parameter identifier appears to have been stripped from
// this block; restore it before compiling.
func checkStreamName( string) error {
	if  == _EMPTY_ {
		return ErrStreamNameRequired
	}
	// Reject names containing a dot or a space.
	if strings.ContainsAny(, ". ") {
		return ErrInvalidStreamName
	}
	return nil
}

// checkConsumerName checks that the consumer name is not empty and is valid
// (does not contain "." or " ").
// Additional consumer name validation is done in nats-server.
// Returns ErrConsumerNameRequired if the consumer name is empty,
// ErrInvalidConsumerName if it is invalid, otherwise nil.
//
// NOTE(review): the parameter identifier appears to have been stripped from
// this block; restore it before compiling.
func checkConsumerName( string) error {
	if  == _EMPTY_ {
		return ErrConsumerNameRequired
	}
	if strings.ContainsAny(, ". ") {
		return ErrInvalidConsumerName
	}
	return nil
}

// DeleteConsumer deletes a Consumer.
//
// Both the stream name and the consumer name are validated before the
// request is sent. An API error matching ErrConsumerNotFound is returned as
// that sentinel; any other API error is returned as-is.
//
// NOTE(review): unlike the consumer create path, a request error matching
// ErrNoResponders is not translated to ErrJetStreamNotEnabled here, and the
// Success field of the response is not inspected — confirm this is intended.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) (,  string,  ...JSOpt) error {
	if  := checkStreamName();  != nil {
		return 
	}
	if  := checkConsumerName();  != nil {
		return 
	}
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return 
	}
	if  != nil {
		defer ()
	}

	// The delete request carries no payload.
	 := .apiSubj(fmt.Sprintf(apiConsumerDeleteT, , ))
	,  := .apiRequestWithContext(.ctx, , nil)
	if  != nil {
		return 
	}
	var  consumerDeleteResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return 
	}

	if .Error != nil {
		if errors.Is(.Error, ErrConsumerNotFound) {
			return ErrConsumerNotFound
		}
		return .Error
	}
	return nil
}

// ConsumerInfo returns information about a Consumer.
//
// The stream and consumer names are validated first; the lookup itself is
// delegated to getConsumerInfoContext using the context from the resolved
// options.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) (,  string,  ...JSOpt) (*ConsumerInfo, error) {
	if  := checkStreamName();  != nil {
		return nil, 
	}
	if  := checkConsumerName();  != nil {
		return nil, 
	}
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil, 
	}
	if  != nil {
		defer ()
	}
	return .getConsumerInfoContext(.ctx, , )
}

// consumerLister fetches pages of ConsumerInfo objects. This object is not
// safe for concurrent use by multiple goroutines.
type consumerLister struct {
	stream string
	js     *js

	// err is the first error encountered; once set, no more pages are fetched.
	err error
	// offset is the number of entries received so far and is sent as the
	// offset of the next page request.
	offset int
	// page is the most recently fetched page.
	page []*ConsumerInfo
	// pageInfo is the paging data of the last response; nil before the
	// first successful fetch.
	pageInfo *apiPaged
}

// consumersRequest is the type used for Consumers requests.
// It currently carries only the embedded paging parameters.
type consumersRequest struct {
	apiPagedRequest
}

// consumerListResponse is the response for a Consumers List request.
// It combines the common response envelope, the paging data and one page of
// consumers.
type consumerListResponse struct {
	apiResponse
	apiPaged
	Consumers []*ConsumerInfo `json:"consumers"`
}

// Next fetches the next ConsumerInfo page.
//
// It returns false when a previous call failed, when the stream name is
// invalid, when the running offset has reached the Total reported by the
// last response, or when the request fails; failures are recorded in err.
// On success the page and paging data are stored, the offset is advanced by
// the page length, and true is returned.
//
// NOTE(review): if a response carries an empty page while offset is still
// below Total, this returns true without advancing the offset — confirm the
// server never does that, otherwise callers loop forever.
//
// NOTE(review): identifiers (receiver, method name, locals) appear to have
// been stripped from this block; restore before compiling.
func ( *consumerLister) () bool {
	if .err != nil {
		return false
	}
	if  := checkStreamName(.stream);  != nil {
		.err = 
		return false
	}
	// All entries reported by the last response have been received.
	if .pageInfo != nil && .offset >= .pageInfo.Total {
		return false
	}

	,  := json.Marshal(consumersRequest{
		apiPagedRequest: apiPagedRequest{Offset: .offset},
	})
	if  != nil {
		.err = 
		return false
	}

	// Use the configured context if any; otherwise create one bounded by
	// the configured wait, cancelled when this call returns.
	var  context.CancelFunc
	 := .js.opts.ctx
	if  == nil {
		,  = context.WithTimeout(context.Background(), .js.opts.wait)
		defer ()
	}

	 := .js.apiSubj(fmt.Sprintf(apiConsumerListT, .stream))
	,  := .js.apiRequestWithContext(, , )
	if  != nil {
		.err = 
		return false
	}
	var  consumerListResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		.err = 
		return false
	}
	if .Error != nil {
		.err = .Error
		return false
	}

	.pageInfo = &.apiPaged
	.page = .Consumers
	.offset += len(.page)
	return true
}

// Page returns the current ConsumerInfo page, i.e. the page stored by the
// last successful fetch (nil before the first one).
func ( *consumerLister) () []*ConsumerInfo {
	return .page
}

// Err returns the error recorded while fetching pages, or nil if none
// occurred.
func ( *consumerLister) () error {
	return .err
}

// Consumers is used to retrieve a list of ConsumerInfo objects.
//
// Results are delivered on the returned unbuffered channel by a goroutine
// that fetches pages through a consumerLister; the channel is closed when
// there are no more pages or when the options' context is done.
//
// NOTE(review): when option processing fails, a nil channel is returned and
// the error is dropped. Receiving from a nil channel blocks forever, so
// callers that range over the result may hang — confirm callers handle this.
// Errors recorded by the lister are likewise not surfaced to the caller.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) ( string,  ...JSOpt) <-chan *ConsumerInfo {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil
	}

	 := make(chan *ConsumerInfo)
	// The lister gets its own js value carrying the per-call options.
	 := &consumerLister{js: &js{nc: .nc, opts: }, stream: }
	go func() {
		if  != nil {
			defer ()
		}
		defer close()
		for .Next() {
			for ,  := range .Page() {
				// Stop early if the context ends before the receiver
				// takes the next item.
				select {
				case  <- :
				case <-.ctx.Done():
					return
				}
			}
		}
	}()

	return 
}

// ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
// It simply forwards to Consumers.
//
// Deprecated: Use Consumers() instead.
func ( *js) ( string,  ...JSOpt) <-chan *ConsumerInfo {
	return .Consumers(, ...)
}

// consumerNamesLister fetches pages of consumer names for a stream.
// It mirrors consumerLister but pages over names instead of ConsumerInfo
// objects.
type consumerNamesLister struct {
	stream string
	js     *js

	// err is the first error encountered; once set, no more pages are fetched.
	err error
	// offset is the number of names received so far and is sent as the
	// offset of the next page request.
	offset int
	// page is the most recently fetched page of names.
	page []string
	// pageInfo is the paging data of the last response; nil before the
	// first successful fetch.
	pageInfo *apiPaged
}

// consumerNamesListResponse is the response for a Consumers Names List request.
// It combines the common response envelope, the paging data and one page of
// consumer names.
type consumerNamesListResponse struct {
	apiResponse
	apiPaged
	Consumers []string `json:"consumers"`
}

// Next fetches the next consumer names page.
//
// It returns false when a previous call failed, when the stream name is
// invalid, when the running offset has reached the Total reported by the
// last response, or when the request fails; failures are recorded in err.
// On success the page and paging data are stored, the offset is advanced by
// the page length, and true is returned.
//
// NOTE(review): if a response carries an empty page while offset is still
// below Total, this returns true without advancing the offset — confirm the
// server never does that, otherwise callers loop forever.
//
// NOTE(review): identifiers (receiver, method name, locals) appear to have
// been stripped from this block; restore before compiling.
func ( *consumerNamesLister) () bool {
	if .err != nil {
		return false
	}
	if  := checkStreamName(.stream);  != nil {
		.err = 
		return false
	}
	// All names reported by the last response have been received.
	if .pageInfo != nil && .offset >= .pageInfo.Total {
		return false
	}

	// Use the configured context if any; otherwise create one bounded by
	// the configured wait, cancelled when this call returns.
	var  context.CancelFunc
	 := .js.opts.ctx
	if  == nil {
		,  = context.WithTimeout(context.Background(), .js.opts.wait)
		defer ()
	}

	,  := json.Marshal(consumersRequest{
		apiPagedRequest: apiPagedRequest{Offset: .offset},
	})
	if  != nil {
		.err = 
		return false
	}
	 := .js.apiSubj(fmt.Sprintf(apiConsumerNamesT, .stream))
	,  := .js.apiRequestWithContext(, , )
	if  != nil {
		.err = 
		return false
	}
	var  consumerNamesListResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		.err = 
		return false
	}
	if .Error != nil {
		.err = .Error
		return false
	}

	.pageInfo = &.apiPaged
	.page = .Consumers
	.offset += len(.page)
	return true
}

// Page returns the current page of consumer names, i.e. the page stored by
// the last successful fetch (nil before the first one).
func ( *consumerNamesLister) () []string {
	return .page
}

// Err returns the error recorded while fetching pages, or nil if none
// occurred.
func ( *consumerNamesLister) () error {
	return .err
}

// ConsumerNames is used to retrieve a list of Consumer names.
//
// Names are delivered on the returned unbuffered channel by a goroutine that
// fetches pages through a consumerNamesLister; the channel is closed when
// there are no more pages or when the options' context is done.
//
// NOTE(review): when option processing fails, a nil channel is returned and
// the error is dropped. Receiving from a nil channel blocks forever, so
// callers that range over the result may hang — confirm callers handle this.
// Errors recorded by the lister are likewise not surfaced to the caller.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) ( string,  ...JSOpt) <-chan string {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil
	}

	 := make(chan string)
	// The lister gets its own js value carrying the per-call options.
	 := &consumerNamesLister{stream: , js: &js{nc: .nc, opts: }}
	go func() {
		if  != nil {
			defer ()
		}
		defer close()
		for .Next() {
			for ,  := range .Page() {
				// Stop early if the context ends before the receiver
				// takes the next name.
				select {
				case  <- :
				case <-.ctx.Done():
					return
				}
			}
		}
	}()

	return 
}

// streamCreateResponse is the response for a Stream create request: the
// common apiResponse envelope plus an embedded *StreamInfo.
type streamCreateResponse struct {
	apiResponse
	*StreamInfo
}

// AddStream creates a stream.
//
// Returns ErrStreamConfigRequired for a nil config and the stream-name
// validation error for an invalid name. The config is copied before any
// Domain on the mirror or on a source is converted to an external API
// prefix, so the caller's config is left untouched. An API error matching
// ErrStreamNameAlreadyInUse is returned as that sentinel.
//
// After a successful response, the returned StreamInfo is checked to confirm
// that requested subject transforms and sources were applied.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) ( *StreamConfig,  ...JSOpt) (*StreamInfo, error) {
	if  == nil {
		return nil, ErrStreamConfigRequired
	}
	if  := checkStreamName(.Name);  != nil {
		return nil, 
	}
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil, 
	}
	if  != nil {
		defer ()
	}

	// In case we need to change anything, copy so we do not change the caller's version.
	 := *

	// If we have a mirror and an external domain, convert to ext.APIPrefix.
	if .Mirror != nil && .Mirror.Domain != _EMPTY_ {
		// Copy so we do not change the caller's version.
		.Mirror = .Mirror.copy()
		if  := .Mirror.convertDomain();  != nil {
			return nil, 
		}
	}
	// Check sources for the same.
	if len(.Sources) > 0 {
		// Clone the slice first so replacing elements below does not write
		// into the caller's backing array.
		.Sources = append([]*StreamSource(nil), .Sources...)
		for ,  := range .Sources {
			if .Domain != _EMPTY_ {
				.Sources[] = .copy()
				if  := .Sources[].convertDomain();  != nil {
					return nil, 
				}
			}
		}
	}

	,  := json.Marshal(&)
	if  != nil {
		return nil, 
	}

	 := .apiSubj(fmt.Sprintf(apiStreamCreateT, .Name))
	,  := .apiRequestWithContext(.ctx, , )
	if  != nil {
		return nil, 
	}
	var  streamCreateResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return nil, 
	}
	if .Error != nil {
		if errors.Is(.Error, ErrStreamNameAlreadyInUse) {
			return nil, ErrStreamNameAlreadyInUse
		}
		return nil, .Error
	}

	// check that input subject transform (if used) is reflected in the returned StreamInfo
	// NOTE(review): the embedded *StreamInfo is dereferenced without a nil
	// check; confirm a successful response always carries stream info.
	if .SubjectTransform != nil && .StreamInfo.Config.SubjectTransform == nil {
		return nil, ErrStreamSubjectTransformNotSupported
	}
	if len(.Sources) != 0 {
		if len(.Sources) != len(.Config.Sources) {
			return nil, ErrStreamSourceNotSupported
		}
		for  := range .Sources {
			if len(.Sources[].SubjectTransforms) != 0 && len(.Sources[].SubjectTransforms) == 0 {
				return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
			}
		}
	}

	return .StreamInfo, nil
}

type (
	// StreamInfoRequest contains additional options for a stream info
	// request, controlling what extra detail is returned.
	StreamInfoRequest struct {
		apiPagedRequest
		// DeletedDetails when true includes information about deleted messages
		DeletedDetails bool `json:"deleted_details,omitempty"`
		// SubjectsFilter when set, returns information on the matched subjects
		SubjectsFilter string `json:"subjects_filter,omitempty"`
	}
	// streamInfoResponse is the response for a stream info (and stream
	// update) request: the common envelope, paging data and an embedded
	// *StreamInfo. It is declared as a type alias for an anonymous struct.
	streamInfoResponse = struct {
		apiResponse
		apiPaged
		*StreamInfo
	}
)

// StreamInfo retrieves information from a stream.
//
// When streamInfoOpts is set on the resolved options, it is sent as the
// request payload and the subjects reported in State.Subjects are collected
// across pages into a single map, which replaces State.Subjects on the
// returned StreamInfo. Without those options the request carries no payload.
// An API error matching ErrStreamNotFound is returned as that sentinel.
//
// NOTE(review): the loop exits only once the accumulated count reaches the
// total, so it relies on the server eventually returning every subject.
//
// NOTE(review): identifiers (receiver, method name, parameters, locals)
// appear to have been stripped from this block; restore before compiling.
func ( *js) ( string,  ...JSOpt) (*StreamInfo, error) {
	if  := checkStreamName();  != nil {
		return nil, 
	}
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil, 
	}
	if  != nil {
		defer ()
	}

	// Paging state: running offset, accumulated subjects, request payload
	// and whether a request payload is sent at all.
	var  int
	var  map[string]uint64
	var  []byte
	var  bool

	var  StreamInfoRequest
	if .streamInfoOpts != nil {
		 = true
		 = *.streamInfoOpts
	}

	for {
		// Re-marshal the request on every iteration so it carries the
		// current offset.
		if  {
			.Offset = 
			if ,  = json.Marshal(&);  != nil {
				return nil, 
			}
		}

		 := .apiSubj(fmt.Sprintf(apiStreamInfoT, ))

		,  := .apiRequestWithContext(.ctx, , )
		if  != nil {
			return nil, 
		}

		var  streamInfoResponse
		if  := json.Unmarshal(.Data, &);  != nil {
			return nil, 
		}

		if .Error != nil {
			if errors.Is(.Error, ErrStreamNotFound) {
				return nil, ErrStreamNotFound
			}
			return nil, .Error
		}

		var  int
		// for backwards compatibility: fall back to the number of subjects
		// in this response when no total is reported
		if .Total != 0 {
			 = .Total
		} else {
			 = len(.State.Subjects)
		}

		if  && len(.StreamInfo.State.Subjects) > 0 {
			// Allocate the accumulator lazily, sized by the total.
			if  == nil {
				 = make(map[string]uint64, )
			}

			for ,  := range .State.Subjects {
				[] = 
				++
			}
		}

		if  >=  {
			if  {
				.StreamInfo.State.Subjects = 
			}
			return .StreamInfo, nil
		}
	}
}

// StreamInfo shows config and current state for this stream.
// Cluster, Mirror, Sources and Alternates are omitted from the JSON when
// empty.
type StreamInfo struct {
	Config     StreamConfig        `json:"config"`
	Created    time.Time           `json:"created"`
	State      StreamState         `json:"state"`
	Cluster    *ClusterInfo        `json:"cluster,omitempty"`
	Mirror     *StreamSourceInfo   `json:"mirror,omitempty"`
	Sources    []*StreamSourceInfo `json:"sources,omitempty"`
	Alternates []*StreamAlternate  `json:"alternates,omitempty"`
}

// StreamAlternate is an alternate stream represented by a mirror.
type StreamAlternate struct {
	Name string `json:"name"`
	// Domain is omitted from the JSON when empty.
	Domain  string `json:"domain,omitempty"`
	Cluster string `json:"cluster"`
}

// StreamSourceInfo shows information about an upstream stream source.
// External and Error are pointers and may be nil.
// NOTE(review): the units/meaning of Lag and Active are not visible in this
// file — confirm against server documentation.
type StreamSourceInfo struct {
	Name              string                   `json:"name"`
	Lag               uint64                   `json:"lag"`
	Active            time.Duration            `json:"active"`
	External          *ExternalStream          `json:"external"`
	Error             *APIError                `json:"error"`
	FilterSubject     string                   `json:"filter_subject,omitempty"`
	SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

// StreamState is information about the given stream.
type StreamState struct {
	Msgs       uint64    `json:"messages"`
	Bytes      uint64    `json:"bytes"`
	FirstSeq   uint64    `json:"first_seq"`
	FirstTime  time.Time `json:"first_ts"`
	LastSeq    uint64    `json:"last_seq"`
	LastTime   time.Time `json:"last_ts"`
	Consumers  int       `json:"consumer_count"`
	Deleted    []uint64  `json:"deleted"`
	NumDeleted int       `json:"num_deleted"`
	NumSubjects uint64   `json:"num_subjects"`
	// Subjects maps a subject to a count. When a subjects filter is
	// requested, the stream info call in this file merges this map across
	// response pages.
	Subjects map[string]uint64 `json:"subjects"`
}

// ClusterInfo shows information about the underlying set of servers
// that make up the stream or consumer.
// Every field is omitted from the JSON when it holds its zero value.
type ClusterInfo struct {
	Name        string      `json:"name,omitempty"`
	RaftGroup   string      `json:"raft_group,omitempty"`
	Leader      string      `json:"leader,omitempty"`
	LeaderSince *time.Time  `json:"leader_since,omitempty"`
	SystemAcc   bool        `json:"system_account,omitempty"`
	TrafficAcc  string      `json:"traffic_account,omitempty"`
	Replicas    []*PeerInfo `json:"replicas,omitempty"`
}

// PeerInfo shows information about all the peers in the cluster that
// are supporting the stream or consumer.
// NOTE(review): the units/meaning of Active and Lag are not visible in this
// file — confirm against server documentation.
type PeerInfo struct {
	Name    string        `json:"name"`
	Current bool          `json:"current"`
	Offline bool          `json:"offline,omitempty"`
	Active  time.Duration `json:"active"`
	Lag     uint64        `json:"lag,omitempty"`
}

// UpdateStream updates a Stream.
//
// It returns ErrStreamConfigRequired when the config is nil, and returns the
// error from checkStreamName or getJSContextOpts unchanged if either rejects
// the input. The config is JSON-encoded and sent as a request to the
// apiStreamUpdateT API subject formatted with the stream name.
//
// An API error matching ErrStreamNotFound is returned as the
// ErrStreamNotFound sentinel; any other API error is returned as received.
//
// Even when the server reports success, the reply is compared with the
// request and an error is returned when a requested feature is not echoed
// back:
//   - ErrStreamSubjectTransformNotSupported if a subject transform was
//     requested but the returned config has none;
//   - ErrStreamSourceNotSupported if the number of sources differs;
//   - ErrStreamSourceMultipleSubjectTransformsNotSupported if a source's
//     subject transforms were requested but are empty on the other side of
//     the comparison.
func ( *js) ( *StreamConfig,  ...JSOpt) (*StreamInfo, error) {
	if  == nil {
		return nil, ErrStreamConfigRequired
	}
	if  := checkStreamName(.Name);  != nil {
		return nil, 
	}
	// Merge per-call options with the context defaults; the cancel function
	// is only non-nil when a timeout context was created for this call.
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil, 
	}
	if  != nil {
		defer ()
	}

	,  := json.Marshal()
	if  != nil {
		return nil, 
	}

	 := .apiSubj(fmt.Sprintf(apiStreamUpdateT, .Name))
	,  := .apiRequestWithContext(.ctx, , )
	if  != nil {
		return nil, 
	}
	var  streamInfoResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return nil, 
	}
	if .Error != nil {
		// Normalize "stream not found" to the package sentinel so callers
		// can compare against it directly.
		if errors.Is(.Error, ErrStreamNotFound) {
			return nil, ErrStreamNotFound
		}
		return nil, .Error
	}

	// check that input subject transform (if used) is reflected in the returned StreamInfo
	if .SubjectTransform != nil && .StreamInfo.Config.SubjectTransform == nil {
		return nil, ErrStreamSubjectTransformNotSupported
	}

	// Same idea for sources: the source count must match, and per-source
	// subject transforms must not have been dropped.
	if len(.Sources) != 0 {
		if len(.Sources) != len(.Config.Sources) {
			return nil, ErrStreamSourceNotSupported
		}
		for  := range .Sources {
			if len(.Sources[].SubjectTransforms) != 0 && len(.Sources[].SubjectTransforms) == 0 {
				return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
			}
		}
	}

	return .StreamInfo, nil
}

// streamDeleteResponse is the response for a Stream delete request.
// It embeds apiResponse and adds a Success flag ("success"), which is
// omitted from the JSON encoding when false.
type streamDeleteResponse struct {
	apiResponse
	Success bool `json:"success,omitempty"`
}

// DeleteStream deletes a Stream.
//
// The stream name is validated with checkStreamName and the options are
// resolved with getJSContextOpts; an error from either is returned before
// any request is sent. The request carries no payload and goes to the
// apiStreamDeleteT API subject formatted with the stream name.
//
// An API error matching ErrStreamNotFound is returned as the
// ErrStreamNotFound sentinel; any other API error is returned as received.
// The Success field of the response is not inspected: the absence of an
// error is treated as success.
func ( *js) ( string,  ...JSOpt) error {
	if  := checkStreamName();  != nil {
		return 
	}
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return 
	}
	// cancel is only set when getJSContextOpts created a timeout context.
	if  != nil {
		defer ()
	}

	 := .apiSubj(fmt.Sprintf(apiStreamDeleteT, ))
	,  := .apiRequestWithContext(.ctx, , nil)
	if  != nil {
		return 
	}
	var  streamDeleteResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return 
	}

	if .Error != nil {
		if errors.Is(.Error, ErrStreamNotFound) {
			return ErrStreamNotFound
		}
		return .Error
	}
	return nil
}

// apiMsgGetRequest is the JSON request body used by getMsg to fetch a stored
// message. Seq selects by sequence number, LastFor ("last_by_subj") and
// NextFor ("next_by_subj") select by subject. All fields are omitted from the
// encoding when zero/empty.
type apiMsgGetRequest struct {
	Seq     uint64 `json:"seq,omitempty"`
	LastFor string `json:"last_by_subj,omitempty"`
	NextFor string `json:"next_by_subj,omitempty"`
}

// RawStreamMsg is a raw message stored in JetStream.
// It is the value returned by GetMsg and GetLastMsg: the subject, stream
// sequence, decoded headers, payload and timestamp of the stored message.
// The struct has no JSON tags; it is built from a storedMsg or from the
// headers of a direct-get reply rather than decoded directly.
type RawStreamMsg struct {
	Subject  string
	Sequence uint64
	Header   Header
	Data     []byte
	Time     time.Time
}

// storedMsg is a raw message stored in JetStream, in the wire form carried
// inside apiMsgGetResponse. Header holds the still-encoded header block
// ("hdrs"); getMsg decodes it with DecodeHeadersMsg when it is non-empty.
// Header and Data are omitted from the JSON encoding when empty.
type storedMsg struct {
	Subject  string    `json:"subject"`
	Sequence uint64    `json:"seq"`
	Header   []byte    `json:"hdrs,omitempty"`
	Data     []byte    `json:"data,omitempty"`
	Time     time.Time `json:"time"`
}

// apiMsgGetResponse is the response for a Stream get request.
// It embeds apiResponse and carries the stored message under "message".
// Message is a pointer tagged omitempty, so it is nil when the reply does
// not include a message.
type apiMsgGetResponse struct {
	apiResponse
	Message *storedMsg `json:"message,omitempty"`
}

// GetLastMsg retrieves the last raw stream message stored in JetStream by subject.
// It is a thin wrapper that delegates to getMsg with an apiMsgGetRequest
// whose LastFor field is set; all options are forwarded unchanged.
func ( *js) (,  string,  ...JSOpt) (*RawStreamMsg, error) {
	return .getMsg(, &apiMsgGetRequest{LastFor: }, ...)
}

// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
// It is a thin wrapper that delegates to getMsg with an apiMsgGetRequest
// whose Seq field is set; all options are forwarded unchanged.
func ( *js) ( string,  uint64,  ...JSOpt) (*RawStreamMsg, error) {
	return .getMsg(, &apiMsgGetRequest{Seq: }, ...)
}

// getMsg is the low-level message fetch shared by GetMsg and GetLastMsg.
//
// Options are resolved first (getJSContextOpts), then the stream name is
// validated (checkStreamName). Three request shapes are possible:
//
//   - direct get + LastFor set: a payload-less request is sent to the
//     apiDirectMsgGetLastBySubjectT subject (stream and subject are part of
//     the subject itself) and the reply is converted with
//     convertDirectGetMsgResponseToMsg;
//   - direct get otherwise: NextFor is filled from the directNextFor option,
//     the JSON-encoded request goes to apiDirectMsgGetT, and the reply is
//     converted with convertDirectGetMsgResponseToMsg;
//   - regular get: the JSON-encoded request goes to apiMsgGetT and the reply
//     is decoded as an apiMsgGetResponse.
//
// For the regular path, API errors matching ErrMsgNotFound or
// ErrStreamNotFound are returned as those sentinels and any other API error
// is returned as received. Headers are decoded with DecodeHeadersMsg only
// when the stored header block is non-empty, so the returned Header may be
// nil.
func ( *js) ( string,  *apiMsgGetRequest,  ...JSOpt) (*RawStreamMsg, error) {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil, 
	}
	if  != nil {
		defer ()
	}

	if  := checkStreamName();  != nil {
		return nil, 
	}

	var  string
	// Direct "last by subject" lookups encode everything in the request
	// subject, so no request body is sent.
	if .directGet && .LastFor != _EMPTY_ {
		 = apiDirectMsgGetLastBySubjectT
		 := .apiSubj(fmt.Sprintf(, , .LastFor))
		,  := .apiRequestWithContext(.ctx, , nil)
		if  != nil {
			return nil, 
		}
		return convertDirectGetMsgResponseToMsg(, )
	}

	// Pick the API subject template; for direct gets the "next by subject"
	// filter comes from the options.
	if .directGet {
		 = apiDirectMsgGetT
		.NextFor = .directNextFor
	} else {
		 = apiMsgGetT
	}

	,  := json.Marshal()
	if  != nil {
		return nil, 
	}

	 := .apiSubj(fmt.Sprintf(, ))
	,  := .apiRequestWithContext(.ctx, , )
	if  != nil {
		return nil, 
	}

	// Direct-get replies are raw messages with metadata in headers, not a
	// JSON API envelope.
	if .directGet {
		return convertDirectGetMsgResponseToMsg(, )
	}

	var  apiMsgGetResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return nil, 
	}
	if .Error != nil {
		if errors.Is(.Error, ErrMsgNotFound) {
			return nil, ErrMsgNotFound
		}
		if errors.Is(.Error, ErrStreamNotFound) {
			return nil, ErrStreamNotFound
		}
		return nil, .Error
	}

	// NOTE(review): Message is an optional pointer in apiMsgGetResponse; a
	// reply with neither an error nor a message would be dereferenced as nil
	// below — confirm the server always sets one of the two.
	 := .Message

	var  Header
	if len(.Header) > 0 {
		,  = DecodeHeadersMsg(.Header)
		if  != nil {
			return nil, 
		}
	}

	return &RawStreamMsg{
		Subject:  .Subject,
		Sequence: .Sequence,
		Header:   ,
		Data:     .Data,
		Time:     .Time,
	}, nil
}

// convertDirectGetMsgResponseToMsg turns the reply of a direct-get request
// into a RawStreamMsg.
//
// A reply with an empty payload and a non-empty status header is treated as
// a failure: the noMessagesSts status maps to ErrMsgNotFound, any other
// status yields an error built from the description header (or the generic
// text "unable to get message" when that header is empty).
//
// Otherwise the message is rebuilt from the reply headers, all of which are
// required: JSStream, JSSequence (parsed as a base-10 uint64), JSTimeStamp
// (RFC3339Nano, with a fallback to the legacy
// "2006-01-02 15:04:05.999999999 +0000 UTC" layout) and JSSubject. A missing
// or unparsable header produces a descriptive "nats: ..." error.
//
// The returned message reuses the reply's Header and Data as-is; they are
// not copied. The string parameter is passed by callers as the stream name
// but is not compared with the stream header (see the comment in the body).
func convertDirectGetMsgResponseToMsg( string,  *Msg) (*RawStreamMsg, error) {
	// Check for 404/408. We would get a no-payload message and a "Status" header
	if len(.Data) == 0 {
		 := .Header.Get(statusHdr)
		if  != _EMPTY_ {
			switch  {
			case noMessagesSts:
				return nil, ErrMsgNotFound
			default:
				 := .Header.Get(descrHdr)
				if  == _EMPTY_ {
					 = "unable to get message"
				}
				return nil, fmt.Errorf("nats: %s", )
			}
		}
	}
	// Check for headers that give us the required information to
	// reconstruct the message.
	if len(.Header) == 0 {
		return nil, errors.New("nats: response should have headers")
	}
	 := .Header.Get(JSStream)
	if  == _EMPTY_ {
		return nil, errors.New("nats: missing stream header")
	}

	// Mirrors can now answer direct gets, so removing check for name equality.
	// TODO(dlc) - We could have server also have a header with origin and check that?

	 := .Header.Get(JSSequence)
	if  == _EMPTY_ {
		return nil, errors.New("nats: missing sequence header")
	}
	,  := strconv.ParseUint(, 10, 64)
	if  != nil {
		return nil, fmt.Errorf("nats: invalid sequence header '%s': %v", , )
	}
	 := .Header.Get(JSTimeStamp)
	if  == _EMPTY_ {
		return nil, errors.New("nats: missing timestamp header")
	}
	// Temporary code: the server in main branch is sending with format
	// "2006-01-02 15:04:05.999999999 +0000 UTC", but will be changed
	// to use format RFC3339Nano. Because of server test deps/cycle,
	// support both until the server PR lands.
	,  := time.Parse(time.RFC3339Nano, )
	if  != nil {
		,  = time.Parse("2006-01-02 15:04:05.999999999 +0000 UTC", )
		if  != nil {
			return nil, fmt.Errorf("nats: invalid timestamp header '%s': %v", , )
		}
	}
	 := .Header.Get(JSSubject)
	if  == _EMPTY_ {
		return nil, errors.New("nats: missing subject header")
	}
	return &RawStreamMsg{
		Subject:  ,
		Sequence: ,
		Header:   .Header,
		Data:     .Data,
		Time:     ,
	}, nil
}

// msgDeleteRequest is the JSON request body for deleting a single message
// from a stream. Seq ("seq") is always encoded; NoErase ("no_erase") is
// omitted when false. DeleteMsg sets NoErase to true, SecureDeleteMsg leaves
// it false.
type msgDeleteRequest struct {
	Seq     uint64 `json:"seq"`
	NoErase bool   `json:"no_erase,omitempty"`
}

// msgDeleteResponse is the response for a message delete request.
// It embeds apiResponse and adds a Success flag ("success"), which is
// omitted from the JSON encoding when false.
type msgDeleteResponse struct {
	apiResponse
	Success bool `json:"success,omitempty"`
}

// DeleteMsg deletes a message from a stream.
// The message is marked as erased, but not overwritten: the request is sent
// with NoErase set to true.
//
// Options are resolved with getJSContextOpts (its error is returned as-is)
// and the actual work is delegated to deleteMsg using the resolved context.
func ( *js) ( string,  uint64,  ...JSOpt) error {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return 
	}
	// cancel is only set when getJSContextOpts created a timeout context.
	if  != nil {
		defer ()
	}

	return .deleteMsg(.ctx, , &msgDeleteRequest{Seq: , NoErase: true})
}

// SecureDeleteMsg deletes a message from a stream. The deleted message is
// overwritten with random data. As a result, this operation is slower than
// DeleteMsg().
//
// It differs from DeleteMsg only in the request body: NoErase is left at its
// zero value (false). Options are resolved with getJSContextOpts and the
// work is delegated to deleteMsg using the resolved context.
func ( *js) ( string,  uint64,  ...JSOpt) error {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return 
	}
	// cancel is only set when getJSContextOpts created a timeout context.
	if  != nil {
		defer ()
	}

	return .deleteMsg(.ctx, , &msgDeleteRequest{Seq: })
}

// deleteMsg is the shared implementation behind DeleteMsg and
// SecureDeleteMsg.
//
// It validates the stream name with checkStreamName, JSON-encodes the
// msgDeleteRequest and sends it to the apiMsgDeleteT API subject formatted
// with the stream name, using the supplied context for the request.
// An API error in the reply is returned as received (no sentinel mapping is
// done here). The Success field of the response is not inspected.
func ( *js) ( context.Context,  string,  *msgDeleteRequest) error {
	if  := checkStreamName();  != nil {
		return 
	}
	,  := json.Marshal()
	if  != nil {
		return 
	}

	 := .apiSubj(fmt.Sprintf(apiMsgDeleteT, ))
	,  := .apiRequestWithContext(, , )
	if  != nil {
		return 
	}
	var  msgDeleteResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return 
	}
	if .Error != nil {
		return .Error
	}
	return nil
}

// StreamPurgeRequest is optional request information to the purge API.
//
// PurgeStream looks for a *StreamPurgeRequest among its options and, when
// one is found, sends it as the JSON request body. All fields are omitted
// from the encoding when zero/empty.
type StreamPurgeRequest struct {
	// Purge up to but not including sequence.
	Sequence uint64 `json:"seq,omitempty"`
	// Subject to match against messages for the purge command.
	Subject string `json:"filter,omitempty"`
	// Number of messages to keep.
	Keep uint64 `json:"keep,omitempty"`
}

// streamPurgeResponse is the response for a Stream purge request.
// It embeds apiResponse and adds a Success flag ("success", omitted when
// false) and the Purged count ("purged", always encoded).
type streamPurgeResponse struct {
	apiResponse
	Success bool   `json:"success,omitempty"`
	Purged  uint64 `json:"purged"`
}

// PurgeStream purges messages on a Stream.
//
// The stream name is validated with checkStreamName. The options are then
// scanned for the first *StreamPurgeRequest, which (if present) becomes the
// request body; when none is found a nil request is passed on and the purge
// is sent without a body.
//
// NOTE(review): the variadic options are not forwarded to purgeStream, so
// any context/timeout option supplied here appears to be ignored and the
// defaults of the JetStream context are used instead — confirm whether this
// is intended.
func ( *js) ( string,  ...JSOpt) error {
	if  := checkStreamName();  != nil {
		return 
	}
	var  *StreamPurgeRequest
	var  bool
	for ,  := range  {
		// For PurgeStream, only request body opt is relevant
		if ,  = .(*StreamPurgeRequest);  {
			break
		}
	}
	return .purgeStream(, )
}

// purgeStream sends the purge request for a stream.
//
// Options are resolved with getJSContextOpts. When the request is non-nil it
// is JSON-encoded and used as the payload; a nil request results in a
// payload-less call. The request goes to the apiStreamPurgeT API subject
// formatted with the stream name.
//
// An API error matching ErrBadRequest is returned wrapped as
// "<ErrBadRequest>: invalid purge request body" (still matchable with
// errors.Is); any other API error is returned as received. The Success and
// Purged fields of the response are not inspected.
//
// The stream name is not validated here; PurgeStream does that before
// calling.
func ( *js) ( string,  *StreamPurgeRequest,  ...JSOpt) error {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return 
	}
	// cancel is only set when getJSContextOpts created a timeout context.
	if  != nil {
		defer ()
	}

	// The body stays nil unless a purge request was supplied.
	var  []byte
	if  != nil {
		if ,  = json.Marshal();  != nil {
			return 
		}
	}

	 := .apiSubj(fmt.Sprintf(apiStreamPurgeT, ))
	,  := .apiRequestWithContext(.ctx, , )
	if  != nil {
		return 
	}
	var  streamPurgeResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return 
	}
	if .Error != nil {
		if errors.Is(.Error, ErrBadRequest) {
			return fmt.Errorf("%w: %s", ErrBadRequest, "invalid purge request body")
		}
		return .Error
	}
	return nil
}

// streamLister fetches pages of StreamInfo objects. This object is not safe
// for concurrent use by multiple goroutines.
//
// Typical use is a loop over Next, reading Page after each successful call
// and checking Err once Next returns false. page holds the most recently
// fetched page, err the first error encountered, offset the number of
// entries fetched so far, and pageInfo the paging metadata of the last
// response (nil until the first page has been fetched).
type streamLister struct {
	js   *js
	page []*StreamInfo
	err  error

	offset   int
	pageInfo *apiPaged
}

// streamListResponse is a list of detailed stream information, as returned
// for one page of a stream list request.
// A nil request is valid and means all streams.
// It embeds apiResponse (error reporting) and apiPaged (paging metadata);
// the page content is carried under "streams".
type streamListResponse struct {
	apiResponse
	apiPaged
	Streams []*StreamInfo `json:"streams"`
}

// streamNamesRequest is used for Stream Name requests.
// Both streamLister and streamNamesLister send it as their request body:
// the embedded apiPagedRequest carries the page offset and Subject
// ("subject", omitted when empty) is an optional filter.
type streamNamesRequest struct {
	apiPagedRequest
	// These are filters that can be applied to the list.
	Subject string `json:"subject,omitempty"`
}

// Next fetches the next StreamInfo page.
//
// It returns false without sending a request when a previous call has
// already recorded an error, or when the running offset has reached the
// total reported by the last response. Otherwise it requests the page at the
// current offset (filtered by the streamListSubject option) from the
// apiStreamListT API subject.
//
// On success the page and paging metadata are stored, the offset is advanced
// by the number of entries received, and true is returned. Any marshal,
// request, unmarshal or API error is stored for Err and false is returned.
func ( *streamLister) () bool {
	if .err != nil {
		return false
	}
	if .pageInfo != nil && .offset >= .pageInfo.Total {
		return false
	}

	,  := json.Marshal(streamNamesRequest{
		apiPagedRequest: apiPagedRequest{Offset: .offset},
		Subject:         .js.opts.streamListSubject,
	})
	if  != nil {
		.err = 
		return false
	}

	// Fall back to a timeout context built from the configured wait when the
	// options carry no context of their own.
	var  context.CancelFunc
	 := .js.opts.ctx
	if  == nil {
		,  = context.WithTimeout(context.Background(), .js.opts.wait)
		defer ()
	}

	 := .js.apiSubj(apiStreamListT)
	,  := .js.apiRequestWithContext(, , )
	if  != nil {
		.err = 
		return false
	}
	var  streamListResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		.err = 
		return false
	}
	if .Error != nil {
		.err = .Error
		return false
	}

	.pageInfo = &.apiPaged
	.page = .Streams
	.offset += len(.page)
	return true
}

// Page returns the current StreamInfo page, i.e. the entries stored by the
// most recent successful call to Next. The slice is returned without
// copying.
func ( *streamLister) () []*StreamInfo {
	return .page
}

// Err returns the error recorded while fetching pages, or nil if none
// occurred. Once an error is recorded, Next keeps returning false.
func ( *streamLister) () error {
	return .err
}

// Streams can be used to retrieve a list of StreamInfo objects.
//
// The returned channel is unbuffered and is fed by a background goroutine
// that pages through the stream list with a streamLister. The channel is
// closed when all pages have been delivered, when the lister stops (for
// example on error), or when the resolved context is done.
//
// Errors are not surfaced to the caller: if the options are invalid a nil
// channel is returned (receiving from a nil channel blocks forever), and a
// paging error simply ends the iteration and closes the channel.
func ( *js) ( ...JSOpt) <-chan *StreamInfo {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil
	}

	 := make(chan *StreamInfo)
	// The lister gets its own js value carrying the per-call options.
	 := &streamLister{js: &js{nc: .nc, opts: }}
	go func() {
		// The cancel function is released by the goroutine, not by the
		// caller, because the context must outlive this function.
		if  != nil {
			defer ()
		}
		defer close()
		for .Next() {
			for ,  := range .Page() {
				select {
				case  <- :
				case <-.ctx.Done():
					return
				}
			}
		}
	}()

	return 
}

// StreamsInfo can be used to retrieve a list of StreamInfo objects.
// It forwards all options to Streams and returns its channel unchanged.
//
// Deprecated: Use Streams() instead.
func ( *js) ( ...JSOpt) <-chan *StreamInfo {
	return .Streams(...)
}

// streamNamesLister fetches pages of stream names. It mirrors streamLister
// but its pages are plain strings.
//
// Typical use is a loop over Next, reading Page after each successful call
// and checking Err once Next returns false. err holds the first error
// encountered, offset the number of names fetched so far, page the most
// recently fetched page, and pageInfo the paging metadata of the last
// response (nil until the first page has been fetched).
type streamNamesLister struct {
	js *js

	err      error
	offset   int
	page     []string
	pageInfo *apiPaged
}

// Next fetches the next stream names page.
//
// It returns false without sending a request when a previous call has
// already recorded an error, or when the running offset has reached the
// total reported by the last response. Otherwise it requests the page at the
// current offset (filtered by the streamListSubject option) from the
// apiStreams API subject.
//
// On success the page and paging metadata are stored, the offset is advanced
// by the number of names received, and true is returned. Any marshal,
// request, unmarshal or API error is stored for Err and false is returned.
func ( *streamNamesLister) () bool {
	if .err != nil {
		return false
	}
	if .pageInfo != nil && .offset >= .pageInfo.Total {
		return false
	}

	// Fall back to a timeout context built from the configured wait when the
	// options carry no context of their own.
	var  context.CancelFunc
	 := .js.opts.ctx
	if  == nil {
		,  = context.WithTimeout(context.Background(), .js.opts.wait)
		defer ()
	}

	,  := json.Marshal(streamNamesRequest{
		apiPagedRequest: apiPagedRequest{Offset: .offset},
		Subject:         .js.opts.streamListSubject,
	})
	if  != nil {
		.err = 
		return false
	}
	,  := .js.apiRequestWithContext(, .js.apiSubj(apiStreams), )
	if  != nil {
		.err = 
		return false
	}
	var  streamNamesResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		.err = 
		return false
	}
	if .Error != nil {
		.err = .Error
		return false
	}

	.pageInfo = &.apiPaged
	.page = .Streams
	.offset += len(.page)
	return true
}

// Page returns the current page of stream names, i.e. the entries stored by
// the most recent successful call to Next. The slice is returned without
// copying.
func ( *streamNamesLister) () []string {
	return .page
}

// Err returns the error recorded while fetching pages, or nil if none
// occurred. Once an error is recorded, Next keeps returning false.
func ( *streamNamesLister) () error {
	return .err
}

// StreamNames is used to retrieve a list of Stream names.
//
// The returned channel is unbuffered and is fed by a background goroutine
// that pages through the names with a streamNamesLister. The channel is
// closed when all pages have been delivered, when the lister stops (for
// example on error), or when the resolved context is done.
//
// Errors are not surfaced to the caller: if the options are invalid a nil
// channel is returned (receiving from a nil channel blocks forever), and a
// paging error simply ends the iteration and closes the channel.
func ( *js) ( ...JSOpt) <-chan string {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return nil
	}

	 := make(chan string)
	// The lister gets its own js value carrying the per-call options.
	 := &streamNamesLister{js: &js{nc: .nc, opts: }}
	go func() {
		// The cancel function is released by the goroutine, not by the
		// caller, because the context must outlive this function.
		if  != nil {
			defer ()
		}
		defer close()
		for .Next() {
			for ,  := range .Page() {
				select {
				case  <- :
				case <-.ctx.Done():
					return
				}
			}
		}
	}()

	return 
}

// StreamNameBySubject returns a stream name that matches the subject.
//
// Options are resolved with getJSContextOpts, then a streamRequest is
// JSON-encoded and sent to the apiStreams API subject. A request error
// matching ErrNoResponders is reported as ErrJetStreamNotEnabled; other
// request errors are returned as received.
//
// ErrNoMatchingStream is returned when the reply carries an API error or
// does not contain exactly one stream name — so both "no match" and "more
// than one match" yield the same error, and the underlying API error is not
// exposed. On failure the returned name is always empty.
func ( *js) ( string,  ...JSOpt) (string, error) {
	, ,  := getJSContextOpts(.opts, ...)
	if  != nil {
		return "", 
	}
	// cancel is only set when getJSContextOpts created a timeout context.
	if  != nil {
		defer ()
	}

	var  streamNamesResponse
	 := &streamRequest{}
	,  := json.Marshal()
	if  != nil {
		return _EMPTY_, 
	}

	,  := .apiRequestWithContext(.ctx, .apiSubj(apiStreams), )
	if  != nil {
		// No responders on the API subject means JetStream is not available.
		if errors.Is(, ErrNoResponders) {
			 = ErrJetStreamNotEnabled
		}
		return _EMPTY_, 
	}
	if  := json.Unmarshal(.Data, &);  != nil {
		return _EMPTY_, 
	}

	if .Error != nil || len(.Streams) != 1 {
		return _EMPTY_, ErrNoMatchingStream
	}
	return .Streams[0], nil
}

// getJSContextOpts builds the effective options for a single API call.
//
// It starts from a zero jsOpts, applies every JSOpt via configureJSContext
// (returning the first error encountered), and then fills gaps from the
// supplied defaults:
//
//   - supplying both a context and a wait timeout is rejected with
//     ErrContextAndTimeout;
//   - when neither is supplied, the default wait is used;
//   - when there is no context but the wait is positive, a timeout context
//     derived from context.Background is created;
//   - an empty API prefix is replaced by the default prefix;
//   - a context without a deadline is wrapped in a timeout context using the
//     default wait.
//
// The returned cancel function is nil when no context was created here (for
// example when the caller's context already has a deadline), so callers must
// nil-check it before deferring. On error both the options and the cancel
// function are nil.
func getJSContextOpts( *jsOpts,  ...JSOpt) (*jsOpts, context.CancelFunc, error) {
	var  jsOpts
	for ,  := range  {
		if  := .configureJSContext(&);  != nil {
			return nil, nil, 
		}
	}

	// Check for option collisions. Right now just timeout and context.
	if .ctx != nil && .wait != 0 {
		return nil, nil, ErrContextAndTimeout
	}
	if .wait == 0 && .ctx == nil {
		.wait = .wait
	}
	var  context.CancelFunc
	if .ctx == nil && .wait > 0 {
		.ctx,  = context.WithTimeout(context.Background(), .wait)
	}
	if .pre == _EMPTY_ {
		.pre = .pre
	}
	if .ctx != nil {
		// if context does not have a deadline, use timeout from js context
		if ,  := .ctx.Deadline(); ! {
			.ctx,  = context.WithTimeout(.ctx, .wait)
		}
	}
	return &, , nil
}