// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
	
	
	
	
	
	
	
	
	
	
	
	

	
	
)

// JetStream allows persistent messaging through JetStream.
//
// NOTE: JetStream is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
type JetStream interface {
	// Publish publishes a message to JetStream.
	Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)

	// PublishMsg publishes a Msg to JetStream.
	PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error)

	// PublishAsync publishes a message to JetStream and returns a PubAckFuture.
	// The data should not be changed until the PubAckFuture has been processed.
	PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error)

	// PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture.
	// The message should not be changed until the PubAckFuture has been processed.
	PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)

	// PublishAsyncPending returns the number of async publishes outstanding for this context.
	PublishAsyncPending() int

	// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
	PublishAsyncComplete() <-chan struct{}

	// CleanupPublisher will clean up the publishing side of JetStreamContext.
	//
	// This will unsubscribe from the internal reply subject if needed.
	// All pending async publishes will fail with ErrJetStreamPublisherClosed.
	//
	// If an error handler was provided, it will be called for each pending async
	// publish. The channel returned by PublishAsyncComplete will be closed.
	//
	// After completion, the JetStreamContext is still usable - the internal
	// subscription will be recreated on the next publish, but the acks from
	// previous publishes will be lost.
	CleanupPublisher()

	// Subscribe creates an async Subscription for JetStream.
	// The stream and consumer names can be provided with the nats.Bind() option.
	// To create an ephemeral consumer (where the consumer name is picked by the server),
	// you can provide the stream name with nats.BindStream().
	// If no stream name is specified, the library will attempt to figure out which
	// stream the subscription is for. See important notes below for more details.
	//
	// IMPORTANT NOTES:
	// * If neither Bind() nor Durable() is specified, the library will
	// send a request to the server to create an ephemeral JetStream consumer,
	// which will be deleted after an Unsubscribe() or Drain(), or automatically
	// by the server after a short period of time after the NATS subscription is
	// gone.
	// * If the Durable() option is specified, the library will attempt to look up a JetStream
	// consumer with this name, and if found, will bind to it and not attempt to
	// delete it. However, if not found, the library will send a request to
	// create such a durable JetStream consumer. Note that the library will delete
	// the JetStream consumer after an Unsubscribe() or Drain() only if it
	// created the durable consumer while subscribing. If the durable consumer
	// already existed prior to subscribing it won't be deleted.
	// * If the Bind() option is provided, the library will attempt to look up the
	// consumer with the given name, and if successful, bind to it. If the lookup fails,
	// then the Subscribe() call will return an error.
	Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

	// SubscribeSync creates a Subscription that can be used to process messages synchronously.
	// See important notes in Subscribe().
	SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)

	// ChanSubscribe creates a channel based Subscription.
	// See important notes in Subscribe().
	ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

	// ChanQueueSubscribe creates a channel based Subscription with a queue group.
	// See important note in QueueSubscribe().
	ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)

	// QueueSubscribe creates a Subscription with a queue group.
	// If neither a durable name nor binding options are specified, the queue name will be used as a durable name.
	// See important notes in Subscribe().
	QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)

	// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
	// See important note in QueueSubscribe().
	QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)

	// PullSubscribe creates a Subscription that can fetch messages.
	// See important notes in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be
	// set to an empty string.
	// When using PullSubscribe, the messages are fetched using the Fetch() and FetchBatch() methods.
	PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
}

// JetStreamContext allows JetStream messaging and stream management.
//
// NOTE: JetStreamContext is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
type JetStreamContext interface {
	JetStream
	JetStreamManager
	KeyValueManager
	ObjectStoreManager
}

// Request API subjects for JetStream.
const (
	// defaultAPIPrefix is the default prefix for the JetStream API.
	defaultAPIPrefix = "$JS.API."

	// jsDomainT is used to create the JetStream API prefix by specifying only the domain.
	jsDomainT = "$JS.%s.API."

	// jsExtDomainT is used to create a StreamSource External APIPrefix.
	// Unlike jsDomainT, it has no trailing dot.
	jsExtDomainT = "$JS.%s.API"

	// apiAccountInfo is for obtaining general information about JetStream.
	apiAccountInfo = "INFO"

	// apiConsumerCreateT is used to create consumers.
	// It accepts stream name and consumer name.
	apiConsumerCreateT = "CONSUMER.CREATE.%s.%s"

	// apiConsumerCreateWithFilterSubjectT is used to create consumers.
	// It accepts stream name, consumer name and filter subject.
	apiConsumerCreateWithFilterSubjectT = "CONSUMER.CREATE.%s.%s.%s"

	// apiLegacyConsumerCreateT is used to create consumers.
	// This is a legacy endpoint to support creating ephemerals before nats-server v2.9.0.
	apiLegacyConsumerCreateT = "CONSUMER.CREATE.%s"

	// apiDurableCreateT is used to create durable consumers.
	// This is a legacy endpoint to support creating durable consumers before nats-server v2.9.0.
	apiDurableCreateT = "CONSUMER.DURABLE.CREATE.%s.%s"

	// apiConsumerInfoT is used to get information on a consumer.
	apiConsumerInfoT = "CONSUMER.INFO.%s.%s"

	// apiRequestNextT is the prefix for the request next message(s) for a consumer in worker/pull mode.
	apiRequestNextT = "CONSUMER.MSG.NEXT.%s.%s"

	// apiConsumerDeleteT is used to delete consumers.
	apiConsumerDeleteT = "CONSUMER.DELETE.%s.%s"

	// apiConsumerListT is used to return all detailed consumer information.
	apiConsumerListT = "CONSUMER.LIST.%s"

	// apiConsumerNamesT is used to return a list with all consumer names for the stream.
	apiConsumerNamesT = "CONSUMER.NAMES.%s"

	// apiStreams can look up a stream by subject.
	apiStreams = "STREAM.NAMES"

	// apiStreamCreateT is the endpoint to create new streams.
	apiStreamCreateT = "STREAM.CREATE.%s"

	// apiStreamInfoT is the endpoint to get information on a stream.
	apiStreamInfoT = "STREAM.INFO.%s"

	// apiStreamUpdateT is the endpoint to update existing streams.
	apiStreamUpdateT = "STREAM.UPDATE.%s"

	// apiStreamDeleteT is the endpoint to delete streams.
	apiStreamDeleteT = "STREAM.DELETE.%s"

	// apiStreamPurgeT is the endpoint to purge streams.
	apiStreamPurgeT = "STREAM.PURGE.%s"

	// apiStreamListT is the endpoint that will return all detailed stream information.
	apiStreamListT = "STREAM.LIST"

	// apiMsgGetT is the endpoint to get a message.
	apiMsgGetT = "STREAM.MSG.GET.%s"

	// apiDirectMsgGetT is the endpoint to perform a direct get of a message.
	apiDirectMsgGetT = "DIRECT.GET.%s"

	// apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.
	apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"

	// apiMsgDeleteT is the endpoint to remove a message.
	apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

	// orderedHeartbeatsInterval is how fast we want HBs from the server during idle.
	orderedHeartbeatsInterval = 5 * time.Second

	// hbcThresh is the scale for the threshold of missed HBs or lack of activity.
	hbcThresh = 2

	// chanSubFCCheckInterval is the interval of the flow control check for
	// channel subscriptions.
	// For ChanSubscription, we can't update sub.delivered as we do for other
	// type of subscriptions, since the channel is user provided.
	// With flow control in play, we will check for flow control on incoming
	// messages (as opposed to when they are delivered), but also from a go
	// routine. Without this, the subscription would possibly stall until
	// a new message or heartbeat/fc are received.
	chanSubFCCheckInterval = 250 * time.Millisecond

	// DefaultPubRetryWait is the default wait time between retries on Publish
	// if and only if the error is ErrNoResponders.
	DefaultPubRetryWait = 250 * time.Millisecond

	// DefaultPubRetryAttempts is the default number of retry attempts.
	DefaultPubRetryAttempts = 2

	// defaultAsyncPubAckInflight is the number of async pub acks inflight.
	defaultAsyncPubAckInflight = 4000
)

// Types of control messages, so far heartbeat and flow control
const (
	// jsCtrlHB marks a heartbeat control message.
	jsCtrlHB = iota + 1
	// jsCtrlFC marks a flow control message.
	jsCtrlFC
)

// js is an internal struct from a JetStreamContext.
type js struct {
	nc   *Conn   // underlying NATS connection used for requests and publishes
	opts *jsOpts // context-wide options collected from JSOpt values

	// For async publish context.
	mu             sync.RWMutex             // guards the async publish state (rsub, pafs, stc, dch, ...)
	rpre           string                   // replyPrefix + random token + "."; prefix of async reply subjects
	rsub           *Subscription            // wildcard subscription on rpre receiving async pub acks
	pafs           map[string]*pubAckFuture // pending async publishes keyed by per-message reply token
	stc            chan struct{}            // closed once pending acks drop below maxpa, releasing stalled publishers
	dch            chan struct{}            // closed once no async publishes remain pending
	rr             *rand.Rand               // random source for per-message reply tokens
	connStatusCh   chan (Status)            // receives RECONNECTING and CLOSED connection status changes
	replyPrefix    string                   // inbox prefix used for reply subjects
	replyPrefixLen int                      // len(replyPrefix) + aReplyTokensize + 1; offset of the per-message token
}

// jsOpts holds the options of a JetStreamContext, populated by JSOpt values.
type jsOpts struct {
	// ctx is an optional context, set through ContextOpt.
	ctx context.Context
	// pre is the JetStream API subject prefix. It defaults to defaultAPIPrefix
	// and can be changed for importing JetStream from other accounts.
	pre string
	// wait is the amount of time to wait for API requests.
	wait time.Duration
	// aecb is the handler used for async publish error handling.
	aecb MsgErrHandler
	// maxpa is the max number of async pub acks in flight.
	maxpa int
	// ackTimeout is the max time to wait for an ack in async publish.
	ackTimeout time.Duration
	// domain is the domain that produced pre, if pre was derived from a domain.
	domain string
	// ctrace holds the protocol tracing callbacks; shouldTrace is set when
	// a ClientTrace option was provided.
	ctrace      ClientTrace
	shouldTrace bool
	// purgeOpts contains optional stream purge options.
	purgeOpts *StreamPurgeRequest
	// streamInfoOpts contains optional stream info options.
	streamInfoOpts *StreamInfoRequest
	// streamListSubject is used for subject filtering when listing streams / stream names.
	streamListSubject string
	// directGet enables direct get message requests.
	directGet bool
	// directNextFor is the subject filter used for direct get next message requests.
	directNextFor string

	// featureFlags are used to enable/disable specific JetStream features.
	featureFlags featureFlags
}

const (
	defaultRequestWait  = 5 * time.Second  // default time to wait for API requests (initial jsOpts.wait)
	defaultAccountCheck = 20 * time.Second // NOTE(review): not referenced in this part of the file; purpose unconfirmed
)

// JetStream returns a JetStreamContext for messaging and stream management.
// Errors are only returned if inconsistent options are provided.
//
// NOTE: JetStreamContext is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to the `jetstream` package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
func ( *Conn) ( ...JSOpt) (JetStreamContext, error) {
	 := &js{
		nc: ,
		opts: &jsOpts{
			pre:   defaultAPIPrefix,
			wait:  defaultRequestWait,
			maxpa: defaultAsyncPubAckInflight,
		},
	}
	 := InboxPrefix
	if .nc.Opts.InboxPrefix != _EMPTY_ {
		 = .nc.Opts.InboxPrefix + "."
	}
	.replyPrefix = 
	.replyPrefixLen = len(.replyPrefix) + aReplyTokensize + 1

	for ,  := range  {
		if  := .configureJSContext(.opts);  != nil {
			return nil, 
		}
	}
	return , nil
}

// JSOpt configures a JetStreamContext.
type JSOpt interface {
	configureJSContext(opts *jsOpts) error
}

// jsOptFn configures an option for the JetStreamContext.
type jsOptFn func(opts *jsOpts) error

func ( jsOptFn) ( *jsOpts) error {
	return ()
}

type featureFlags struct {
	useDurableConsumerCreate bool
}

// UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation.
// If this option is used when creating JetStreamContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used
// to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
func () JSOpt {
	return jsOptFn(func( *jsOpts) error {
		.featureFlags.useDurableConsumerCreate = true
		return nil
	})
}

// ClientTrace can be used to trace API interactions for the JetStream Context.
type ClientTrace struct {
	RequestSent      func(subj string, payload []byte)
	ResponseReceived func(subj string, payload []byte, hdr Header)
}

func ( ClientTrace) ( *jsOpts) error {
	.ctrace = 
	.shouldTrace = true
	return nil
}

// Domain changes the domain part of JetStream API prefix.
func ( string) JSOpt {
	if  == _EMPTY_ {
		return APIPrefix(_EMPTY_)
	}

	return jsOptFn(func( *jsOpts) error {
		.domain = 
		.pre = fmt.Sprintf(jsDomainT, )

		return nil
	})

}

func ( *StreamPurgeRequest) ( *jsOpts) error {
	.purgeOpts = 
	return nil
}

func ( *StreamInfoRequest) ( *jsOpts) error {
	.streamInfoOpts = 
	return nil
}

// APIPrefix changes the default prefix used for the JetStream API.
func ( string) JSOpt {
	return jsOptFn(func( *jsOpts) error {
		if  == _EMPTY_ {
			return nil
		}

		.pre = 
		if !strings.HasSuffix(.pre, ".") {
			.pre = .pre + "."
		}

		return nil
	})
}

// DirectGet is an option that can be used to make GetMsg() or GetLastMsg()
// retrieve message directly from a group of servers (leader and replicas)
// if the stream was created with the AllowDirect option.
func () JSOpt {
	return jsOptFn(func( *jsOpts) error {
		.directGet = true
		return nil
	})
}

// DirectGetNext is an option that can be used to make GetMsg() retrieve message
// directly from a group of servers (leader and replicas) if the stream was
// created with the AllowDirect option.
// The server will find the next message matching the filter `subject` starting
// at the start sequence (argument in GetMsg()). The filter `subject` can be a
// wildcard.
func ( string) JSOpt {
	return jsOptFn(func( *jsOpts) error {
		.directGet = true
		.directNextFor = 
		return nil
	})
}

// StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests.
// It allows filtering the returned streams by subject associated with each stream.
// Wildcards can be used. For example, `StreamListFilter(FOO.*.A)` will return
// all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A).
func ( string) JSOpt {
	return jsOptFn(func( *jsOpts) error {
		.streamListSubject = 
		return nil
	})
}

func ( *js) ( string) string {
	if .opts.pre == _EMPTY_ {
		return 
	}
	var  strings.Builder
	.WriteString(.opts.pre)
	.WriteString()
	return .String()
}

// PubOpt configures options for publishing JetStream messages.
type PubOpt interface {
	configurePublish(opts *pubOpts) error
}

// pubOptFn is a function option used to configure JetStream Publish.
type pubOptFn func(opts *pubOpts) error

func ( pubOptFn) ( *pubOpts) error {
	return ()
}

// pubOpts holds the options collected from PubOpt values for a single publish.
// ctx and ttl bound how long a synchronous publish waits for its ack and are
// mutually exclusive; id is sent as the Nats-Msg-Id header.
type pubOpts struct {
	ctx    context.Context
	ttl    time.Duration
	id     string
	lid    string        // Expected last msgId
	str    string        // Expected stream name
	seq    *uint64       // Expected last sequence
	lss    *uint64       // Expected last sequence per subject
	msgTTL time.Duration // Message TTL

	// Publish retries for NoResponders err.
	rwait time.Duration // Retry wait between attempts
	rnum  int           // Retry attempts

	// stallWait is the max time an async publish waits when too many
	// pub acks are pending.
	stallWait time.Duration

	// pafRetry is an internal option to re-use an existing paf in case of retry.
	pafRetry *pubAckFuture
}

// pubAckResponse is the ack response from the JetStream API when publishing a message.
type pubAckResponse struct {
	apiResponse
	*PubAck
}

// PubAck is an ack received after successfully publishing a message.
type PubAck struct {
	Stream    string `json:"stream"`
	Sequence  uint64 `json:"seq"`
	Duplicate bool   `json:"duplicate,omitempty"`
	Domain    string `json:"domain,omitempty"`
}

// Headers for published messages.
const (
	MsgIdHdr               = "Nats-Msg-Id"
	ExpectedStreamHdr      = "Nats-Expected-Stream"
	ExpectedLastSeqHdr     = "Nats-Expected-Last-Sequence"
	ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
	ExpectedLastMsgIdHdr   = "Nats-Expected-Last-Msg-Id"
	MsgRollup              = "Nats-Rollup"
	MsgTTLHdr              = "Nats-TTL"
)

// Headers for republished messages and direct gets.
const (
	JSStream       = "Nats-Stream"
	JSSequence     = "Nats-Sequence"
	JSTimeStamp    = "Nats-Time-Stamp"
	JSSubject      = "Nats-Subject"
	JSLastSequence = "Nats-Last-Sequence"
)

// MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
const MsgSize = "Nats-Msg-Size"

// Rollups, can be subject only or all messages.
const (
	MsgRollupSubject = "sub"
	MsgRollupAll     = "all"
)

// PublishMsg publishes a Msg to a stream from JetStream.
func ( *js) ( *Msg,  ...PubOpt) (*PubAck, error) {
	var  = pubOpts{rwait: DefaultPubRetryWait, rnum: DefaultPubRetryAttempts}
	if len() > 0 {
		if .Header == nil {
			.Header = Header{}
		}
		for ,  := range  {
			if  := .configurePublish(&);  != nil {
				return nil, 
			}
		}
	}
	// Check for option collisions. Right now just timeout and context.
	if .ctx != nil && .ttl != 0 {
		return nil, ErrContextAndTimeout
	}
	if .ttl == 0 && .ctx == nil {
		.ttl = .opts.wait
	}
	if .stallWait > 0 {
		return nil, errors.New("nats: stall wait cannot be set to sync publish")
	}

	if .id != _EMPTY_ {
		.Header.Set(MsgIdHdr, .id)
	}
	if .lid != _EMPTY_ {
		.Header.Set(ExpectedLastMsgIdHdr, .lid)
	}
	if .str != _EMPTY_ {
		.Header.Set(ExpectedStreamHdr, .str)
	}
	if .seq != nil {
		.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*.seq, 10))
	}
	if .lss != nil {
		.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*.lss, 10))
	}
	if .msgTTL > 0 {
		.Header.Set(MsgTTLHdr, .msgTTL.String())
	}

	var  *Msg
	var  error

	if .ttl > 0 {
		,  = .nc.RequestMsg(, time.Duration(.ttl))
	} else {
		,  = .nc.RequestMsgWithContext(.ctx, )
	}

	if  != nil {
		for ,  := 0, .ttl; errors.Is(, ErrNoResponders) && ( < .rnum || .rnum < 0); ++ {
			// To protect against small blips in leadership changes etc, if we get a no responders here retry.
			if .ctx != nil {
				select {
				case <-.ctx.Done():
				case <-time.After(.rwait):
				}
			} else {
				time.Sleep(.rwait)
			}
			if .ttl > 0 {
				 -= .rwait
				if  <= 0 {
					 = ErrTimeout
					break
				}
				,  = .nc.RequestMsg(, time.Duration())
			} else {
				,  = .nc.RequestMsgWithContext(.ctx, )
			}
		}
		if  != nil {
			if errors.Is(, ErrNoResponders) {
				 = ErrNoStreamResponse
			}
			return nil, 
		}
	}

	var  pubAckResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return nil, ErrInvalidJSAck
	}
	if .Error != nil {
		return nil, .Error
	}
	if .PubAck == nil || .PubAck.Stream == _EMPTY_ {
		return nil, ErrInvalidJSAck
	}
	return .PubAck, nil
}

// Publish publishes a message to a stream from JetStream.
func ( *js) ( string,  []byte,  ...PubOpt) (*PubAck, error) {
	return .PublishMsg(&Msg{Subject: , Data: }, ...)
}

// PubAckFuture is a future for a PubAck.
type PubAckFuture interface {
	// Ok returns a receive only channel that can be used to get a PubAck.
	Ok() <-chan *PubAck

	// Err returns a receive only channel that can be used to get the error from an async publish.
	Err() <-chan error

	// Msg returns the message that was sent to the server.
	Msg() *Msg
}

// pubAckFuture is the internal implementation of PubAckFuture; it tracks one
// pending async publish. Its fields are accessed under the owning js mutex.
type pubAckFuture struct {
	js         *js           // owning context, set when the future is registered
	msg        *Msg          // message that was published
	pa         *PubAck       // ack, set once a valid pub ack has been received
	st         time.Time     // time the future was created
	err        error         // error the publish failed with, if any
	errCh      chan error    // created lazily (buffer of 1) when the error channel is requested
	doneCh     chan *PubAck  // created lazily (buffer of 1) when the ack channel is requested
	retries    int           // number of no-responders retries performed so far
	maxRetries int           // retry limit, taken from the publish options
	retryWait  time.Duration // delay before a no-responders retry
	reply      string        // reply subject on which the ack is expected
	timeout    *time.Timer   // ack timeout timer; nil when no async ack timeout is configured
}

func ( *pubAckFuture) () <-chan *PubAck {
	.js.mu.Lock()
	defer .js.mu.Unlock()

	if .doneCh == nil {
		.doneCh = make(chan *PubAck, 1)
		if .pa != nil {
			.doneCh <- .pa
		}
	}

	return .doneCh
}

func ( *pubAckFuture) () <-chan error {
	.js.mu.Lock()
	defer .js.mu.Unlock()

	if .errCh == nil {
		.errCh = make(chan error, 1)
		if .err != nil {
			.errCh <- .err
		}
	}

	return .errCh
}

func ( *pubAckFuture) () *Msg {
	.js.mu.RLock()
	defer .js.mu.RUnlock()
	return .msg
}

// For quick token lookup etc.
const aReplyTokensize = 6

func ( *js) () string {
	.mu.Lock()
	if .rsub == nil {
		// Create our wildcard reply subject.
		 := sha256.New()
		.Write([]byte(nuid.Next()))
		 := .Sum(nil)
		for  := 0;  < aReplyTokensize; ++ {
			[] = rdigits[int([]%base)]
		}
		.rpre = fmt.Sprintf("%s%s.", .replyPrefix, [:aReplyTokensize])
		,  := .nc.Subscribe(fmt.Sprintf("%s*", .rpre), .handleAsyncReply)
		if  != nil {
			.mu.Unlock()
			return _EMPTY_
		}
		.rsub = 
		.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
	}
	if .connStatusCh == nil {
		.connStatusCh = .nc.StatusChanged(RECONNECTING, CLOSED)
		go .resetPendingAcksOnReconnect()
	}
	var  strings.Builder
	.WriteString(.rpre)
	for {
		 := .rr.Int63()
		var  [aReplyTokensize]byte
		for ,  := 0, ;  < len(); ++ {
			[] = rdigits[%base]
			 /= base
		}
		if ,  := .pafs[string([:])];  {
			continue
		}
		.Write([:])
		break
	}
	.mu.Unlock()
	return .String()
}

func ( *js) () {
	.mu.Lock()
	 := .connStatusCh
	.mu.Unlock()
	for {
		,  := <-
		if ! ||  == CLOSED {
			return
		}
		.mu.Lock()
		 := .opts.aecb
		for ,  := range .pafs {
			.err = ErrDisconnected
			if .errCh != nil {
				.errCh <- .err
			}
			if  != nil {
				defer (, .msg, ErrDisconnected)
			}
			delete(.pafs, )
		}
		if .dch != nil {
			close(.dch)
			.dch = nil
		}
		.mu.Unlock()
	}
}

// CleanupPublisher will cleanup the publishing side of JetStreamContext.
//
// This will unsubscribe from the internal reply subject if needed.
// All pending async publishes will fail with ErrJetStreamPublisherClosed.
//
// If an error handler was provided, it will be called for each pending async
// publish and PublishAsyncComplete will be closed.
//
// After completing JetStreamContext is still usable - internal subscription
// will be recreated on next publish, but the acks from previous publishes will
// be lost.
func ( *js) () {
	.cleanupReplySub()
	.mu.Lock()
	 := .opts.aecb
	for ,  := range .pafs {
		.err = ErrJetStreamPublisherClosed
		if .errCh != nil {
			.errCh <- .err
		}
		if  != nil {
			defer (, .msg, ErrJetStreamPublisherClosed)
		}
		delete(.pafs, )
	}
	if .dch != nil {
		close(.dch)
		.dch = nil
	}
	.mu.Unlock()
}

func ( *js) () {
	.mu.Lock()
	if .rsub != nil {
		.rsub.Unsubscribe()
		.rsub = nil
	}
	if .connStatusCh != nil {
		close(.connStatusCh)
		.connStatusCh = nil
	}
	.mu.Unlock()
}

// registerPAF will register for a PubAckFuture.
func ( *js) ( string,  *pubAckFuture) (int, int) {
	.mu.Lock()
	if .pafs == nil {
		.pafs = make(map[string]*pubAckFuture)
	}
	.js = 
	.pafs[] = 
	 := len(.pafs)
	 := .opts.maxpa
	.mu.Unlock()
	return , 
}

// Lock should be held.
func ( *js) ( string) *pubAckFuture {
	if .pafs == nil {
		return nil
	}
	return .pafs[]
}

// clearPAF will remove a PubAckFuture that was registered.
func ( *js) ( string) {
	.mu.Lock()
	delete(.pafs, )
	.mu.Unlock()
}

// PublishAsyncPending returns how many PubAckFutures are pending.
func ( *js) () int {
	.mu.RLock()
	defer .mu.RUnlock()
	return len(.pafs)
}

func ( *js) () <-chan struct{} {
	.mu.Lock()
	if .stc == nil {
		.stc = make(chan struct{})
	}
	 := .stc
	.mu.Unlock()
	return 
}

// Handle an async reply from PublishAsync.
func ( *js) ( *Msg) {
	if len(.Subject) <= .replyPrefixLen {
		return
	}
	 := .Subject[.replyPrefixLen:]

	.mu.Lock()
	 := .getPAF()
	if  == nil {
		.mu.Unlock()
		return
	}

	 := func() {
		// Check on anyone stalled and waiting.
		if .stc != nil && len(.pafs) < .opts.maxpa {
			close(.stc)
			.stc = nil
		}
	}

	 := func() func() {
		var  chan struct{}
		// Check on anyone one waiting on done status.
		if .dch != nil && len(.pafs) == 0 {
			 = .dch
			.dch = nil
		}
		// Return function to close done channel which
		// should be deferred so that error is processed and
		// can be checked.
		return func() {
			if  != nil {
				close()
			}
		}
	}

	 := func( error) {
		.err = 
		if .errCh != nil {
			.errCh <- .err
		}
		 := .opts.aecb
		.mu.Unlock()
		if  != nil {
			(.js, .msg, )
		}
	}

	if .timeout != nil {
		.timeout.Stop()
	}

	// Process no responders etc.
	if len(.Data) == 0 && .Header.Get(statusHdr) == noResponders {
		if .retries < .maxRetries {
			.retries++
			time.AfterFunc(.retryWait, func() {
				.mu.Lock()
				 := .getPAF()
				.mu.Unlock()
				if  == nil {
					return
				}
				,  := .PublishMsgAsync(.msg, pubOptFn(func( *pubOpts) error {
					.pafRetry = 
					return nil
				}))
				if  != nil {
					.mu.Lock()
					()
				}
			})
			.mu.Unlock()
			return
		}
		delete(.pafs, )
		()
		defer ()()
		(ErrNoResponders)
		return
	}

	//remove
	delete(.pafs, )
	()
	defer ()()

	var  pubAckResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		(ErrInvalidJSAck)
		return
	}
	if .Error != nil {
		(.Error)
		return
	}
	if .PubAck == nil || .PubAck.Stream == _EMPTY_ {
		(ErrInvalidJSAck)
		return
	}

	// So here we have received a proper puback.
	.pa = .PubAck
	if .doneCh != nil {
		.doneCh <- .pa
	}
	.mu.Unlock()
}

// MsgErrHandler is used to process asynchronous errors from
// JetStream PublishAsync. It will return the original
// message sent to the server for possible retransmitting and the error encountered.
type MsgErrHandler func(JetStream, *Msg, error)

// PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
func ( MsgErrHandler) JSOpt {
	return jsOptFn(func( *jsOpts) error {
		.aecb = 
		return nil
	})
}

// PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
func ( int) JSOpt {
	return jsOptFn(func( *jsOpts) error {
		if  < 1 {
			return errors.New("nats: max ack pending should be >= 1")
		}
		.maxpa = 
		return nil
	})
}

// PublishAsyncTimeout sets the timeout for async message publish.
// If not provided, timeout is disabled.
func ( time.Duration) JSOpt {
	return jsOptFn(func( *jsOpts) error {
		.ackTimeout = 
		return nil
	})
}

// PublishAsync publishes a message to JetStream and returns a PubAckFuture
func ( *js) ( string,  []byte,  ...PubOpt) (PubAckFuture, error) {
	return .PublishMsgAsync(&Msg{Subject: , Data: }, ...)
}

const defaultStallWait = 200 * time.Millisecond

func ( *js) ( *Msg,  ...PubOpt) (PubAckFuture, error) {
	var  pubOpts
	if len() > 0 {
		if .Header == nil {
			.Header = Header{}
		}
		for ,  := range  {
			if  := .configurePublish(&);  != nil {
				return nil, 
			}
		}
	}

	if .rnum < 0 {
		return nil, fmt.Errorf("%w: retry attempts cannot be negative", ErrInvalidArg)
	}

	// Timeouts and contexts do not make sense for these.
	if .ttl != 0 || .ctx != nil {
		return nil, ErrContextAndTimeout
	}
	 := defaultStallWait
	if .stallWait > 0 {
		 = .stallWait
	}

	// FIXME(dlc) - Make common.
	if .id != _EMPTY_ {
		.Header.Set(MsgIdHdr, .id)
	}
	if .lid != _EMPTY_ {
		.Header.Set(ExpectedLastMsgIdHdr, .lid)
	}
	if .str != _EMPTY_ {
		.Header.Set(ExpectedStreamHdr, .str)
	}
	if .seq != nil {
		.Header.Set(ExpectedLastSeqHdr, strconv.FormatUint(*.seq, 10))
	}
	if .lss != nil {
		.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*.lss, 10))
	}
	if .msgTTL > 0 {
		.Header.Set(MsgTTLHdr, .msgTTL.String())
	}

	// Reply
	 := .pafRetry
	if  == nil && .Reply != _EMPTY_ {
		return nil, errors.New("nats: reply subject should be empty")
	}
	var  string
	var  string

	// register new paf if not retrying
	if  == nil {
		 = .newAsyncReply()

		if  == _EMPTY_ {
			return nil, errors.New("nats: error creating async reply handler")
		}

		 = [.replyPrefixLen:]
		 = &pubAckFuture{msg: , st: time.Now(), maxRetries: .rnum, retryWait: .rwait, reply: }
		,  := .registerPAF(, )

		if  > 0 &&  >  {
			select {
			case <-.asyncStall():
			case <-time.After():
				.clearPAF()
				return nil, ErrTooManyStalledMsgs
			}
		}
		if .opts.ackTimeout > 0 {
			.timeout = time.AfterFunc(.opts.ackTimeout, func() {
				.mu.Lock()
				defer .mu.Unlock()

				if ,  := .pafs[]; ! {
					// paf has already been resolved
					// while waiting for the lock
					return
				}

				// ack timed out, remove from pending acks
				delete(.pafs, )

				// check on anyone stalled and waiting.
				if .stc != nil && len(.pafs) < .opts.maxpa {
					close(.stc)
					.stc = nil
				}

				// send error to user
				.err = ErrAsyncPublishTimeout
				if .errCh != nil {
					.errCh <- .err
				}

				// call error callback if set
				if .opts.aecb != nil {
					.opts.aecb(, .msg, ErrAsyncPublishTimeout)
				}

				// check on anyone one waiting on done status.
				if .dch != nil && len(.pafs) == 0 {
					close(.dch)
					.dch = nil
				}
			})
		}
	} else {
		 = .reply
		if .timeout != nil {
			.timeout.Reset(.opts.ackTimeout)
		}
		 = [.replyPrefixLen:]
	}
	,  := .headerBytes()
	if  != nil {
		return nil, 
	}
	if  := .nc.publish(.Subject, , false, , .Data);  != nil {
		.clearPAF()
		return nil, 
	}

	return , nil
}

// PublishAsyncComplete returns a channel that will be closed when all outstanding messages have been ack'd.
func ( *js) () <-chan struct{} {
	.mu.Lock()
	defer .mu.Unlock()
	if .dch == nil {
		.dch = make(chan struct{})
	}
	 := .dch
	if len(.pafs) == 0 {
		close(.dch)
		.dch = nil
	}
	return 
}

// MsgId sets the message ID used for deduplication.
func ( string) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		.id = 
		return nil
	})
}

// ExpectStream sets the expected stream to respond from the publish.
func ( string) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		.str = 
		return nil
	})
}

// ExpectLastSequence sets the expected last sequence, sent to the server in
// the Nats-Expected-Last-Sequence header of the published message.
func ( uint64) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		.seq = &
		return nil
	})
}

// ExpectLastSequencePerSubject sets the expected last sequence per subject, sent to
// the server in the Nats-Expected-Last-Subject-Sequence header of the published message.
func ( uint64) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		.lss = &
		return nil
	})
}

// ExpectLastMsgId sets the expected last msgId, sent to the server in the
// Nats-Expected-Last-Msg-Id header of the published message.
func ( string) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		.lid = 
		return nil
	})
}

// RetryWait sets the retry wait time when ErrNoResponders is encountered.
func ( time.Duration) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		.rwait = 
		return nil
	})
}

// RetryAttempts sets the retry number of attempts when ErrNoResponders is encountered.
func ( int) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		.rnum = 
		return nil
	})
}

// StallWait sets the maximum time an async publish will wait when the producer
// is stalled because too many pub acks are pending.
func ( time.Duration) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		if  <= 0 {
			return errors.New("nats: stall wait should be more than 0")
		}
		.stallWait = 
		return nil
	})
}

// MsgTTL sets per msg TTL.
// Requires [StreamConfig.AllowMsgTTL] to be enabled.
func ( time.Duration) PubOpt {
	return pubOptFn(func( *pubOpts) error {
		.msgTTL = 
		return nil
	})
}

// ackOpts holds the options collected from AckOpt values.
type ackOpts struct {
	ttl      time.Duration   // set by the AckWait option
	ctx      context.Context // set by the ContextOpt option
	nakDelay time.Duration   // set by the nakDelay option
}

// AckOpt are the options that can be passed when acknowledging a message.
type AckOpt interface {
	configureAck(opts *ackOpts) error
}

// MaxWait sets the maximum amount of time we will wait for a response.
// It is a time.Duration that can be applied to both jsOpts and pullOpts.
type MaxWait time.Duration

// Applies the MaxWait value to the given jsOpts by storing it, converted to
// a time.Duration, in the wait field. Always returns nil.
func ( MaxWait) ( *jsOpts) error {
	.wait = time.Duration()
	return nil
}

// Applies the MaxWait value to the given pullOpts by storing it, converted
// to a time.Duration, in the ttl field. Always returns nil.
func ( MaxWait) ( *pullOpts) error {
	.ttl = time.Duration()
	return nil
}

// AckWait sets the maximum amount of time we will wait for an ack.
// It is a time.Duration that can be applied to pubOpts, subOpts and ackOpts.
type AckWait time.Duration

// Applies the AckWait value to the given pubOpts by storing it, converted
// to a time.Duration, in the ttl field. Always returns nil.
func ( AckWait) ( *pubOpts) error {
	.ttl = time.Duration()
	return nil
}

// Applies the AckWait value to the given subOpts by storing it, converted
// to a time.Duration, in the consumer configuration's AckWait field.
// Always returns nil.
func ( AckWait) ( *subOpts) error {
	.cfg.AckWait = time.Duration()
	return nil
}

// Applies the AckWait value to the given ackOpts by storing it, converted
// to a time.Duration, in the ttl field. Always returns nil.
func ( AckWait) ( *ackOpts) error {
	.ttl = time.Duration()
	return nil
}

// ContextOpt is an option used to set a context.Context.
// It embeds the context and can be applied to jsOpts, pubOpts, subOpts,
// pullOpts and ackOpts.
type ContextOpt struct {
	context.Context
}

// Sets the ctx field of the given jsOpts. Always returns nil.
func ( ContextOpt) ( *jsOpts) error {
	.ctx = 
	return nil
}

// Sets the ctx field of the given pubOpts. Always returns nil.
func ( ContextOpt) ( *pubOpts) error {
	.ctx = 
	return nil
}

// Sets the ctx field of the given subOpts. Always returns nil.
func ( ContextOpt) ( *subOpts) error {
	.ctx = 
	return nil
}

// Sets the ctx field of the given pullOpts. Always returns nil.
func ( ContextOpt) ( *pullOpts) error {
	.ctx = 
	return nil
}

// Sets the ctx field of the given ackOpts. Always returns nil.
func ( ContextOpt) ( *ackOpts) error {
	.ctx = 
	return nil
}

// Context returns an option that can be used to configure a context for APIs
// that are context aware such as those part of the JetStream interface.
// The result is a ContextOpt value, which this file applies to JetStream
// context, publish, subscribe, pull and ack options.
func ( context.Context) ContextOpt {
	return ContextOpt{}
}

// nakDelay is an unexported duration option; when applied it is stored in
// the nakDelay field of ackOpts.
type nakDelay time.Duration

// Applies the nakDelay value to the given ackOpts by storing it, converted
// to a time.Duration, in the nakDelay field. Always returns nil.
func ( nakDelay) ( *ackOpts) error {
	.nakDelay = time.Duration()
	return nil
}

// Subscribe

// ConsumerConfig is the configuration of a JetStream consumer.
//
// Notes on how some fields are used by the subscribe logic in this file:
//   - DeliverSubject: a non-empty value marks a push consumer; binding a pull
//     subscription to such a consumer is rejected, and binding a non-pull
//     subscription to a consumer with an empty DeliverSubject is rejected.
//   - DeliverGroup: compared with the queue group name when binding a
//     subscription to an existing consumer.
//   - Heartbeat and FlowControl: not allowed for queue subscriptions; both
//     are enabled for ordered consumers.
//   - Replicas and MemoryStorage: forced to 1 and true for ordered consumers.
//   - DeliverPolicy, AckPolicy and ReplayPolicy are always serialized (their
//     JSON tags carry no omitempty).
type ConsumerConfig struct {
	Durable         string          `json:"durable_name,omitempty"`
	Name            string          `json:"name,omitempty"`
	Description     string          `json:"description,omitempty"`
	DeliverPolicy   DeliverPolicy   `json:"deliver_policy"`
	OptStartSeq     uint64          `json:"opt_start_seq,omitempty"`
	OptStartTime    *time.Time      `json:"opt_start_time,omitempty"`
	AckPolicy       AckPolicy       `json:"ack_policy"`
	AckWait         time.Duration   `json:"ack_wait,omitempty"`
	MaxDeliver      int             `json:"max_deliver,omitempty"`
	BackOff         []time.Duration `json:"backoff,omitempty"`
	FilterSubject   string          `json:"filter_subject,omitempty"`
	FilterSubjects  []string        `json:"filter_subjects,omitempty"`
	ReplayPolicy    ReplayPolicy    `json:"replay_policy"`
	RateLimit       uint64          `json:"rate_limit_bps,omitempty"` // Bits per sec
	SampleFrequency string          `json:"sample_freq,omitempty"`
	MaxWaiting      int             `json:"max_waiting,omitempty"`
	MaxAckPending   int             `json:"max_ack_pending,omitempty"`
	FlowControl     bool            `json:"flow_control,omitempty"`
	Heartbeat       time.Duration   `json:"idle_heartbeat,omitempty"`
	HeadersOnly     bool            `json:"headers_only,omitempty"`

	// Pull based options.
	MaxRequestBatch    int           `json:"max_batch,omitempty"`
	MaxRequestExpires  time.Duration `json:"max_expires,omitempty"`
	MaxRequestMaxBytes int           `json:"max_bytes,omitempty"`

	// Push based consumers.
	DeliverSubject string `json:"deliver_subject,omitempty"`
	DeliverGroup   string `json:"deliver_group,omitempty"`

	// Inactivity threshold.
	InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`

	// Generally inherited by parent stream and other markers, now can be configured directly.
	Replicas int `json:"num_replicas"`
	// Force memory storage.
	MemoryStorage bool `json:"mem_storage,omitempty"`

	// Metadata is additional metadata for the Consumer.
	// Keys starting with `_nats` are reserved.
	// NOTE: Metadata requires nats-server v2.10.0+
	Metadata map[string]string `json:"metadata,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
//
// Within this file, NumPending plus Delivered.Consumer is recorded as the
// initial pending count of a subscription whose consumer was created by the
// library, and PushBound is used to reject a non-queue subscription to a
// consumer that is already bound.
type ConsumerInfo struct {
	Stream         string         `json:"stream_name"`
	Name           string         `json:"name"`
	Created        time.Time      `json:"created"`
	Config         ConsumerConfig `json:"config"`
	Delivered      SequenceInfo   `json:"delivered"`
	AckFloor       SequenceInfo   `json:"ack_floor"`
	NumAckPending  int            `json:"num_ack_pending"`
	NumRedelivered int            `json:"num_redelivered"`
	NumWaiting     int            `json:"num_waiting"`
	NumPending     uint64         `json:"num_pending"`
	Cluster        *ClusterInfo   `json:"cluster,omitempty"`
	PushBound      bool           `json:"push_bound,omitempty"`
}

// SequenceInfo has both the consumer and the stream sequence and last activity.
// Last is a pointer and is omitted from the JSON encoding when nil.
type SequenceInfo struct {
	Consumer uint64     `json:"consumer_seq"`
	Stream   uint64     `json:"stream_seq"`
	Last     *time.Time `json:"last_active,omitempty"`
}

// SequencePair includes the consumer and stream sequence info from a JetStream consumer.
// It uses the same JSON keys as SequenceInfo but carries no last-activity
// timestamp.
type SequencePair struct {
	Consumer uint64 `json:"consumer_seq"`
	Stream   uint64 `json:"stream_seq"`
}

// nextRequest is for getting next messages for pull based consumers from JetStream.
// Every field is tagged omitempty, so zero values are left out of the JSON
// encoding.
type nextRequest struct {
	Expires   time.Duration `json:"expires,omitempty"`
	Batch     int           `json:"batch,omitempty"`
	NoWait    bool          `json:"no_wait,omitempty"`
	MaxBytes  int           `json:"max_bytes,omitempty"`
	Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}

// jsSub includes JetStream subscription info.
// It is attached to a Subscription (as its jsi field) by the subscribe logic
// in this file.
type jsSub struct {
	js *js

	// For pull subscribers, this is the next message subject to send requests to.
	nms string

	psubj    string // the subject that was passed by user to the subscribe calls
	consumer string
	stream   string
	deliver  string
	pull     bool
	dc       bool // Delete JS consumer
	ackNone  bool

	// This is ConsumerInfo's NumPending+Delivered.Consumer that we get from the
	// add consumer response. Note that some versions of the server gather the
	// consumer info *after* the creation of the consumer, which means that
	// some messages may have been already delivered. So the sum of the two
	// is a more accurate representation of the number of messages pending or
	// in the process of being delivered to the subscription when created.
	pending uint64

	// Ordered consumers
	ordered bool
	dseq    uint64
	sseq    uint64
	ccreq   *createConsumerRequest

	// Heartbeats and Flow Control handling from push consumers.
	hbc    *time.Timer
	hbi    time.Duration
	active bool
	cmeta  string
	fcr    string
	fcd    uint64
	fciseq uint64
	csfct  *time.Timer

	// context set on js.Subscribe used e.g. to recreate ordered consumer
	ctx context.Context

	// Cancellation function to cancel context on drain/unsubscribe.
	cancel func()
}

// Deletes the JS Consumer.
// No connection nor subscription lock must be held on entry.
// Returns nil without doing anything when the subscription has no JetStream
// info, or when the stream or consumer name is empty. Otherwise the result
// of DeleteConsumer is returned.
func ( *Subscription) () error {
	.mu.Lock()
	 := .jsi
	if  == nil {
		.mu.Unlock()
		return nil
	}
	if .stream == _EMPTY_ || .consumer == _EMPTY_ {
		.mu.Unlock()
		return nil
	}
	// Capture what is needed under the lock, then release it before the
	// DeleteConsumer call.
	,  := .stream, .consumer
	 := .js
	.mu.Unlock()

	return .DeleteConsumer(, )
}

// SubOpt configures options for subscribing to JetStream consumers.
// Implementations apply their setting to the given subOpts and may return
// an error, which aborts the subscribe call.
type SubOpt interface {
	configureSubscribe(opts *subOpts) error
}

// subOptFn is a function option used to configure a JetStream Subscribe.
// It has the same shape as SubOpt's configureSubscribe method.
type subOptFn func(opts *subOpts) error

// Applies the option by invoking the underlying function and returning its
// result.
func ( subOptFn) ( *subOpts) error {
	return ()
}

// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
// you can provide the stream name with nats.BindStream().
// If no stream name is specified, the library will attempt to figure out which
// stream the subscription is for. See important notes below for more details.
//
// ErrBadSubscription is returned when the nil check on entry fails
// (presumably a nil message handler).
//
// IMPORTANT NOTES:
// * If none of the options Bind() nor Durable() are specified, the library will
// send a request to the server to create an ephemeral JetStream consumer,
// which will be deleted after an Unsubscribe() or Drain(), or automatically
// by the server after a short period of time after the NATS subscription is
// gone.
// * If Durable() option is specified, the library will attempt to lookup a JetStream
// consumer with this name, and if found, will bind to it and not attempt to
// delete it. However, if not found, the library will send a request to create
// such durable JetStream consumer. The library will delete the JetStream consumer
// after an Unsubscribe() or Drain().
// * If Bind() option is provided, the library will attempt to lookup the
// consumer with the given name, and if successful, bind to it. If the lookup fails,
// then the Subscribe() call will return an error.
func ( *js) ( string,  MsgHandler,  ...SubOpt) (*Subscription, error) {
	if  == nil {
		return nil, ErrBadSubscription
	}
	return .subscribe(, _EMPTY_, , nil, false, false, )
}

// SubscribeSync creates a Subscription that can be used to process messages synchronously.
// The internal message channel is allocated with a capacity of the
// connection's Opts.SubChanLen.
// See important note in Subscribe()
func ( *js) ( string,  ...SubOpt) (*Subscription, error) {
	 := make(chan *Msg, .nc.Opts.SubChanLen)
	return .subscribe(, _EMPTY_, nil, , true, false, )
}

// QueueSubscribe creates a Subscription with a queue group.
// If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
// ErrBadSubscription is returned when the nil check on entry fails
// (presumably a nil message handler).
// See important note in Subscribe()
func ( *js) (,  string,  MsgHandler,  ...SubOpt) (*Subscription, error) {
	if  == nil {
		return nil, ErrBadSubscription
	}
	return .subscribe(, , , nil, false, false, )
}

// QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
// The internal message channel is allocated with a capacity of the
// connection's Opts.SubChanLen.
// See important note in QueueSubscribe()
func ( *js) (,  string,  ...SubOpt) (*Subscription, error) {
	 := make(chan *Msg, .nc.Opts.SubChanLen)
	return .subscribe(, , nil, , true, false, )
}

// ChanSubscribe creates a channel based Subscription.
// Using ChanSubscribe without buffered capacity is not recommended since
// it will be prone to dropping messages with a slow consumer error.  Make sure to give the channel enough
// capacity to handle bursts in traffic, for example other Subscribe APIs use a default of 512k capacity in comparison.
// The supplied channel is passed through unchanged; no handler is installed.
// See important note in Subscribe()
func ( *js) ( string,  chan *Msg,  ...SubOpt) (*Subscription, error) {
	return .subscribe(, _EMPTY_, nil, , false, false, )
}

// ChanQueueSubscribe creates a channel based Subscription with a queue group.
// The supplied channel is passed through unchanged; no handler is installed.
// See important note in QueueSubscribe()
func ( *js) (,  string,  chan *Msg,  ...SubOpt) (*Subscription, error) {
	return .subscribe(, , nil, , false, false, )
}

// PullSubscribe creates a Subscription that can fetch messages.
// The internal message channel is allocated with a capacity of the
// connection's Opts.SubChanLen. When the checked string argument
// (presumably the durable name) is non-empty, a Durable option is appended
// after the caller's options.
// See important note in Subscribe()
func ( *js) (,  string,  ...SubOpt) (*Subscription, error) {
	 := make(chan *Msg, .nc.Opts.SubChanLen)
	if  != "" {
		 = append(, Durable())
	}
	return .subscribe(, _EMPTY_, nil, , true, true, )
}

// processConsInfo checks that an existing consumer, described by the given
// ConsumerInfo, is compatible with the subscription being created.
// It takes the consumer info, the requested configuration, a pull-mode flag
// and two strings (the subject and the queue group name, as used by the
// checks below).
// On success it returns the consumer's deliver subject for push mode, or an
// empty string for pull mode. It returns an error on a filter subject
// mismatch, a pull/push type mismatch, a queue group mismatch, an already
// bound consumer, or a configuration mismatch reported by checkConfig.
func processConsInfo( *ConsumerInfo,  *ConsumerConfig,  bool, ,  string) (string, error) {
	 := &.Config

	// Make sure the subject matches the consumer's filter subject exactly
	// when a filter subject is set (no subset matching is performed here).
	if .FilterSubject != _EMPTY_ &&  != .FilterSubject {
		return _EMPTY_, ErrSubjectMismatch
	}

	// Prevent binding a subscription against incompatible consumer types.
	if  && .DeliverSubject != _EMPTY_ {
		return _EMPTY_, ErrPullSubscribeToPushConsumer
	} else if ! && .DeliverSubject == _EMPTY_ {
		return _EMPTY_, ErrPullSubscribeRequired
	}

	// If pull mode, nothing else to check here.
	if  {
		return _EMPTY_, checkConfig(, )
	}

	// At this point, we know the user wants push mode, and the JS consumer is
	// really push mode.

	 := .Config.DeliverGroup
	if  == _EMPTY_ {
		// Prevent a user from attempting to create a queue subscription on
		// a JS consumer that was not created with a deliver group.
		if  != _EMPTY_ {
			return _EMPTY_, errors.New("cannot create a queue subscription for a consumer without a deliver group")
		} else if .PushBound {
			// Need to reject a non queue subscription to a non queue consumer
			// if the consumer is already bound.
			return _EMPTY_, errors.New("consumer is already bound to a subscription")
		}
	} else {
		// If the JS consumer has a deliver group, we need to fail a non queue
		// subscription attempt:
		if  == _EMPTY_ {
			return _EMPTY_, fmt.Errorf("cannot create a subscription for a consumer with a deliver group %q", )
		} else if  !=  {
			// Here the user's queue group name does not match the one associated
			// with the JS consumer.
			return _EMPTY_, fmt.Errorf("cannot create a queue subscription %q for a consumer with a deliver group %q",
				, )
		}
	}
	if  := checkConfig(, );  != nil {
		return _EMPTY_, 
	}
	return .DeliverSubject, nil
}

// checkConfig compares two consumer configurations field by field and
// returns an error for the first mismatch found, or nil if there is none.
// A field is only compared when it carries an explicit (non-default) value
// in the configuration being checked: non-empty strings, positive numbers,
// policies other than their "not set" sentinel, and true booleans.
// The error message has the form
// "nats: configuration requests <field> to be <v1>, but consumer's value is <v2>".
func checkConfig(,  *ConsumerConfig) error {
	// Helper that builds the mismatch error for a named field.
	 := func( string, ,  any) error {
		return fmt.Errorf("nats: configuration requests %s to be %v, but consumer's value is %v", , , )
	}

	if .Durable != _EMPTY_ && .Durable != .Durable {
		return ("durable", .Durable, .Durable)
	}
	if .Description != _EMPTY_ && .Description != .Description {
		return ("description", .Description, .Description)
	}
	if .DeliverPolicy != deliverPolicyNotSet && .DeliverPolicy != .DeliverPolicy {
		return ("deliver policy", .DeliverPolicy, .DeliverPolicy)
	}
	if .OptStartSeq > 0 && .OptStartSeq != .OptStartSeq {
		return ("optional start sequence", .OptStartSeq, .OptStartSeq)
	}
	// Start times are compared with time.Time.Equal rather than ==.
	if .OptStartTime != nil && !.OptStartTime.IsZero() && !(*.OptStartTime).Equal(*.OptStartTime) {
		return ("optional start time", .OptStartTime, .OptStartTime)
	}
	if .AckPolicy != ackPolicyNotSet && .AckPolicy != .AckPolicy {
		return ("ack policy", .AckPolicy, .AckPolicy)
	}
	if .AckWait > 0 && .AckWait != .AckWait {
		return ("ack wait", .AckWait, .AckWait)
	}
	if .MaxDeliver > 0 && .MaxDeliver != .MaxDeliver {
		return ("max deliver", .MaxDeliver, .MaxDeliver)
	}
	if .ReplayPolicy != replayPolicyNotSet && .ReplayPolicy != .ReplayPolicy {
		return ("replay policy", .ReplayPolicy, .ReplayPolicy)
	}
	if .RateLimit > 0 && .RateLimit != .RateLimit {
		return ("rate limit", .RateLimit, .RateLimit)
	}
	if .SampleFrequency != _EMPTY_ && .SampleFrequency != .SampleFrequency {
		return ("sample frequency", .SampleFrequency, .SampleFrequency)
	}
	if .MaxWaiting > 0 && .MaxWaiting != .MaxWaiting {
		return ("max waiting", .MaxWaiting, .MaxWaiting)
	}
	if .MaxAckPending > 0 && .MaxAckPending != .MaxAckPending {
		return ("max ack pending", .MaxAckPending, .MaxAckPending)
	}
	// For flow control, we want to fail if the user explicitly wanted it, but
	// it is not set in the existing consumer. If it is not asked by the user,
	// the library still handles it and so no reason to fail.
	if .FlowControl && !.FlowControl {
		return ("flow control", .FlowControl, .FlowControl)
	}
	if .Heartbeat > 0 && .Heartbeat != .Heartbeat {
		return ("heartbeat", .Heartbeat, .Heartbeat)
	}
	if .Replicas > 0 && .Replicas != .Replicas {
		return ("replicas", .Replicas, .Replicas)
	}
	if .MemoryStorage && !.MemoryStorage {
		return ("memory storage", .MemoryStorage, .MemoryStorage)
	}
	return nil
}

// Shared subscribe implementation; presumably the target of the subscribe
// calls made by Subscribe, SubscribeSync, QueueSubscribe, QueueSubscribeSync,
// ChanSubscribe, ChanQueueSubscribe and PullSubscribe, which pass exactly
// this parameter shape.
// Positional parameters, as passed by those wrappers: subject, queue group
// (may be empty), message handler (async variants), message channel (sync,
// chan and pull variants), a flag that is true for the sync and pull
// variants, a flag that is true only for PullSubscribe, and the SubOpt list.
//
// Overall flow: apply the options, validate pull/queue/ordered constraints,
// resolve the stream name, optionally look up an existing consumer, create
// the NATS subscription and, when needed, create the JetStream consumer.
// Finally pending limits, heartbeat checks, flow control handling and
// context cancellation are set up.
func ( *js) (,  string,  MsgHandler,  chan *Msg, ,  bool,  []SubOpt) (*Subscription, error) {
	 := ConsumerConfig{
		DeliverPolicy: deliverPolicyNotSet,
		AckPolicy:     ackPolicyNotSet,
		ReplayPolicy:  replayPolicyNotSet,
	}
	 := subOpts{cfg: &}
	if len() > 0 {
		for ,  := range  {
			if  == nil {
				continue
			}
			if  := .configureSubscribe(&);  != nil {
				return nil, 
			}
		}
	}

	// If no stream name is specified, the subject cannot be empty.
	if  == _EMPTY_ && .stream == _EMPTY_ {
		return nil, errors.New("nats: subject required")
	}

	// Note that these may change based on the consumer info response we may get.
	 := .cfg.Heartbeat > 0
	 := .cfg.FlowControl

	// Some checks for pull subscribers
	if  {
		// No deliver subject should be provided
		if .cfg.DeliverSubject != _EMPTY_ {
			return nil, ErrPullSubscribeToPushConsumer
		}
	}

	// Some check/setting specific to queue subs
	if  != _EMPTY_ {
		// Queue subscriber cannot have HB or FC (since messages will be randomly dispatched
		// to members). We may in the future have a separate NATS subscription that all members
		// would subscribe to and server would send on.
		if .cfg.Heartbeat > 0 || .cfg.FlowControl {
			// Not making this a public ErrXXX in case we allow in the future.
			return nil, errors.New("nats: queue subscription doesn't support idle heartbeat nor flow control")
		}

		// If this is a queue subscription and no consumer nor durable name was specified,
		// then we will use the queue name as a durable name.
		if .consumer == _EMPTY_ && .cfg.Durable == _EMPTY_ {
			if  := checkConsumerName();  != nil {
				return nil, 
			}
			.cfg.Durable = 
		}
	}

	var (
		           error
		  bool
		          *ConsumerInfo
		       string
		        = .stream
		      = .consumer
		     = .cfg.Durable != _EMPTY_
		 = .bound
		           = .ctx
		     = .skipCInfo
		   bool
		     bool
		            = .nc
		           string
		           time.Duration
		         *createConsumerRequest // In case we need to hold onto it for ordered consumers.
		         int
	)

	// Do some quick checks here for ordered consumers. We do these here instead of spread out
	// in the individual SubOpts.
	if .ordered {
		// Make sure we are not durable.
		if  {
			return nil, errors.New("nats: durable can not be set for an ordered consumer")
		}
		// Check ack policy.
		if .cfg.AckPolicy != ackPolicyNotSet {
			return nil, errors.New("nats: ack policy can not be set for an ordered consumer")
		}
		// Check max deliver.
		if .cfg.MaxDeliver != 1 && .cfg.MaxDeliver != 0 {
			return nil, errors.New("nats: max deliver can not be set for an ordered consumer")
		}
		// No deliver subject, we pick our own.
		if .cfg.DeliverSubject != _EMPTY_ {
			return nil, errors.New("nats: deliver subject can not be set for an ordered consumer")
		}
		// Queue groups not allowed.
		if  != _EMPTY_ {
			return nil, errors.New("nats: queues not be set for an ordered consumer")
		}
		// Check for bound consumers.
		if  != _EMPTY_ {
			return nil, errors.New("nats: can not bind existing consumer for an ordered consumer")
		}
		// Check for pull mode.
		if  {
			return nil, errors.New("nats: can not use pull mode for an ordered consumer")
		}
		// Setup how we need it to be here.
		.cfg.FlowControl = true
		.cfg.AckPolicy = AckNonePolicy
		.cfg.MaxDeliver = 1
		.cfg.AckWait = 22 * time.Hour // Just set to something known, not utilized.
		// Force R1 and MemoryStorage for these.
		.cfg.Replicas = 1
		.cfg.MemoryStorage = true

		if ! {
			.cfg.Heartbeat = orderedHeartbeatsInterval
		}
		,  = true, true
		.mack = true // To avoid auto-ack wrapping call below.
		 = .cfg.Heartbeat
	}

	// In case a consumer has not been set explicitly, then the
	// durable name will be used as the consumer name.
	if  == _EMPTY_ {
		 = .cfg.Durable
	}

	// Find the stream mapped to the subject if not bound to a stream already.
	if  == _EMPTY_ {
		,  = .StreamNameBySubject()
		if  != nil {
			return nil, 
		}
	}

	// With an explicit durable name, we can first look up the consumer
	// to which the subscription should attach.
	// If SkipConsumerLookup was used, do not call consumer info.
	if  != _EMPTY_ && !.skipCInfo {
		,  = .ConsumerInfo(, )
		 = errors.Is(, ErrConsumerNotFound)
		 =  == ErrJetStreamNotEnabled || errors.Is(, ErrTimeout) || errors.Is(, context.DeadlineExceeded)
	}

	switch {
	case  != nil:
		,  = processConsInfo(, .cfg, , , )
		if  != nil {
			return nil, 
		}
		 := &.Config
		,  = .FlowControl, .Heartbeat
		 =  > 0
		 = .MaxAckPending
	case ( != nil && !) || ( && ):
		// If the consumer is being bound and we got an error on pull subscribe then allow the error.
		if !( &&  && ) {
			return nil, 
		}
	case :
		// When skipping consumer info, need to rely on the manually passed sub options
		// to match the expected behavior from the subscription.
		,  = .cfg.FlowControl, .cfg.Heartbeat
		 =  > 0
		 = .cfg.MaxAckPending
		 = .cfg.DeliverSubject
		if  {
			break
		}

		// When not bound to a consumer already, proceed to create.
		fallthrough
	default:
		// Attempt to create consumer if not found nor using Bind.
		 = true
		if .cfg.DeliverSubject != _EMPTY_ {
			 = .cfg.DeliverSubject
		} else if ! {
			 = .NewInbox()
			.DeliverSubject = 
		}
		// Do filtering always, server will clear as needed.
		.FilterSubject = 

		// Pass the queue to the consumer config
		if  != _EMPTY_ {
			.DeliverGroup = 
		}

		// If not set, default to deliver all
		if .DeliverPolicy == deliverPolicyNotSet {
			.DeliverPolicy = DeliverAllPolicy
		}
		// If not set, default to ack explicit.
		if .AckPolicy == ackPolicyNotSet {
			.AckPolicy = AckExplicitPolicy
		}
		// If not set, default to instant
		if .ReplayPolicy == replayPolicyNotSet {
			.ReplayPolicy = ReplayInstantPolicy
		}

		// If we have acks at all and the MaxAckPending is not set go ahead
		// and set to the internal max for channel based consumers
		if .MaxAckPending == 0 &&  != nil && .AckPolicy != AckNonePolicy {
			.MaxAckPending = cap()
		}
		// Create request here.
		 = &createConsumerRequest{
			Stream: ,
			Config: &,
		}
		 = .Heartbeat
	}

	if  {
		 = fmt.Sprintf(.apiSubj(apiRequestNextT), , )
		 = .NewInbox()
		// for pull consumers, create a wildcard subscription to differentiate pull requests
		 += ".*"
	}

	// In case this has a context, then create a child context that
	// is possible to cancel via unsubscribe / drain.
	var  func()
	if  != nil {
		,  = context.WithCancel()
	}

	 := &jsSub{
		js:       ,
		stream:   ,
		consumer: ,
		deliver:  ,
		hbi:      ,
		ordered:  .ordered,
		ccreq:    ,
		dseq:     1,
		pull:     ,
		nms:      ,
		psubj:    ,
		cancel:   ,
		ackNone:  .cfg.AckPolicy == AckNonePolicy,
		ctx:      .ctx,
	}

	// Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy
	if  != nil && !.mack && .cfg.AckPolicy != AckNonePolicy {
		 := 
		 = func( *Msg) { (); .Ack() }
	}
	,  := .subscribe(, , , , nil, , )
	if  != nil {
		return nil, 
	}

	// If we fail and we had the sub we need to cleanup, but can't just do a straight Unsubscribe or Drain.
	// We need to clear the jsi so we do not remove any durables etc.
	 := func() {
		if  != nil {
			.mu.Lock()
			.jsi = nil
			.mu.Unlock()
			.Unsubscribe()
		}
	}

	// If we are creating or updating let's process that request.
	 := .cfg.Name
	if  {
		if .Durable != "" {
			 = .Durable
		} else if  == "" {
			 = getHash(nuid.Next())
		}
		var  *ConsumerInfo
		if .ctx != nil {
			,  = .upsertConsumer(, , .Config, Context(.ctx))
		} else {
			,  = .upsertConsumer(, , .Config)
		}
		if  != nil {
			var  *APIError
			if  := errors.As(, &); ! {
				()
				return nil, 
			}
			if  == _EMPTY_ ||
				(.ErrorCode != JSErrCodeConsumerAlreadyExists && .ErrorCode != JSErrCodeConsumerNameExists) {
				()
				if errors.Is(, ErrStreamNotFound) {
					return nil, ErrStreamNotFound
				}
				return nil, 
			}
			// We will not be using this sub here if we were push based.
			if ! {
				()
			}

			,  = .ConsumerInfo(, )
			if  != nil {
				return nil, 
			}
			,  = processConsInfo(, .cfg, , , )
			if  != nil {
				return nil, 
			}

			if ! {
				// We can't reuse the channel, so if one was passed, we need to create a new one.
				if  {
					 = make(chan *Msg, cap())
				} else if  != nil {
					// User provided (ChanSubscription), simply try to drain it.
					for  := false; !; {
						select {
						case <-:
						default:
							 = true
						}
					}
				}
				.deliver = 
				.hbi = .Config.Heartbeat

				// Recreate the subscription here.
				,  = .subscribe(.deliver, , , , nil, , )
				if  != nil {
					return nil, 
				}
				 = .Config.FlowControl
				 = .Config.Heartbeat > 0
			}
		} else {
			// Since the library created the JS consumer, it will delete it on Unsubscribe()/Drain()
			.mu.Lock()
			.jsi.dc = true
			.jsi.pending = .NumPending + .Delivered.Consumer
			// If this is an ephemeral, we did not have a consumer name, we get it from the info
			// after the AddConsumer returns.
			if  == _EMPTY_ {
				.jsi.consumer = .Name
				if  {
					.jsi.nms = fmt.Sprintf(.apiSubj(apiRequestNextT), , .Name)
				}
			}
			.mu.Unlock()
		}
		// Capture max ack pending from the info response here which covers both
		// success and failure followed by consumer lookup.
		 = .Config.MaxAckPending
	}

	// If maxap is greater than the default sub's pending limit, use that.
	if  > DefaultSubPendingMsgsLimit {
		// For bytes limit, use the larger of maxap*1MB and DefaultSubPendingBytesLimit
		 :=  * 1024 * 1024
		if  < DefaultSubPendingBytesLimit {
			 = DefaultSubPendingBytesLimit
		}
		if  := .SetPendingLimits(, );  != nil {
			return nil, 
		}
	}

	// Do heartbeats last if needed.
	if  {
		.scheduleHeartbeatCheck()
	}
	// For ChanSubscriptions, if we know that there is flow control, we will
	// start a go routine that evaluates the number of delivered messages
	// and process flow control.
	if .Type() == ChanSubscription &&  {
		.chanSubcheckForFlowControlResponse()
	}

	// Wait for context to get canceled if there is one.
	if  != nil {
		go func() {
			<-.Done()
			.Unsubscribe()
		}()
	}

	return , nil
}

// InitialConsumerPending returns the number of messages pending to be
// delivered to the consumer when the subscription was created.
// It returns an error wrapping ErrTypeSubscription when the subscription has
// no JetStream info or no consumer name. The subscription lock is held for
// the duration of the call.
func ( *Subscription) () (uint64, error) {
	.mu.Lock()
	defer .mu.Unlock()
	if .jsi == nil || .jsi.consumer == _EMPTY_ {
		return 0, fmt.Errorf("%w: not a JetStream subscription", ErrTypeSubscription)
	}
	return .jsi.pending, nil
}

// This long-lived routine is used per ChanSubscription to check
// on the number of delivered messages and check for flow control response.
// The first call only arms a timer (period chanSubFCCheckInterval) that
// re-invokes this routine; subsequent calls run the flow control check and
// re-arm the timer. Nothing happens once the subscription is closed.
func ( *Subscription) () {
	.mu.Lock()
	// We don't use defer since if we need to send a flow control reply, we need
	// to do it outside the sub's lock. So doing explicit unlock...
	if .closed {
		.mu.Unlock()
		return
	}
	var  string
	var  *Conn

	 := .jsi
	if .csfct == nil {
		.csfct = time.AfterFunc(chanSubFCCheckInterval, .)
	} else {
		 = .checkForFlowControlResponse()
		 = .conn
		// Do the reset here under the lock, it's ok...
		.csfct.Reset(chanSubFCCheckInterval)
	}
	.mu.Unlock()
	// This call will return an error (which we don't care here)
	// if nc is nil or fcReply is empty.
	.Publish(, nil)
}

// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
// Its message reports LastConsumerSequence minus ConsumerSequence as the
// number of sequences the consumer is behind.
type ErrConsumerSequenceMismatch struct {
	// StreamResumeSequence is the stream sequence from where the consumer
	// should resume consuming from the stream.
	StreamResumeSequence uint64

	// ConsumerSequence is the sequence of the consumer that is behind.
	ConsumerSequence uint64

	// LastConsumerSequence is the sequence of the consumer when the heartbeat
	// was received.
	LastConsumerSequence uint64
}

// Formats the mismatch as a human-readable message containing the consumer
// sequence, how many sequences behind it is (LastConsumerSequence minus
// ConsumerSequence) and the stream sequence to restart from.
func ( *ErrConsumerSequenceMismatch) () string {
	return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
		.ConsumerSequence,
		.LastConsumerSequence-.ConsumerSequence,
		.StreamResumeSequence,
	)
}

// isJSControlMessage will return true if this is an empty control status message
// and indicate what type of control message it is, say jsCtrlHB or jsCtrlFC.
// A message with a non-empty payload, or whose status header is not
// controlMsg, yields (false, 0). A control message whose description starts
// with neither "Idle" nor "Flow" yields (true, 0).
func isJSControlMessage( *Msg) (bool, int) {
	if len(.Data) > 0 || .Header.Get(statusHdr) != controlMsg {
		return false, 0
	}
	 := .Header.Get(descrHdr)
	if strings.HasPrefix(, "Idle") {
		return true, jsCtrlHB
	}
	if strings.HasPrefix(, "Flow") {
		return true, jsCtrlFC
	}
	return true, 0
}

// Keeps track of the incoming message's reply subject so that the consumer's
// state (deliver sequence, etc..) can be checked against heartbeats.
// We will also bump the incoming data message sequence that is used in FC cases.
// Concretely: jsi.fciseq is incremented and the given string is stored in
// jsi.cmeta. The jsi is dereferenced without a nil check.
// Runs under the subscription lock
func ( *Subscription) ( string) {
	// For flow control, keep track of incoming message sequence.
	.jsi.fciseq++
	.jsi.cmeta = 
}

// Check to make sure messages are arriving in order.
// Returns true if the sub had to be replaced. Will cause upper layers to return.
// The caller has verified that sub.jsi != nil and that this is not a control message.
// Messages without a reply subject, or whose reply subject cannot be parsed
// by parser.GetMetadataFields, are ignored (false is returned).
// Lock should be held.
func ( *Subscription) ( *Msg) bool {
	// Ignore msgs with no reply like HBs and flow control, they are handled elsewhere.
	if .Reply == _EMPTY_ {
		return false
	}

	// Normal message here.
	,  := parser.GetMetadataFields(.Reply)
	if  != nil {
		return false
	}
	,  := parser.ParseNum([parser.AckStreamSeqTokenPos]), parser.ParseNum([parser.AckConsumerSeqTokenPos])

	 := .jsi
	// A consumer sequence different from the expected one means a gap:
	// reset the ordered consumer starting after the last stream sequence seen.
	if  != .dseq {
		.resetOrderedConsumer(.sseq + 1)
		return true
	}
	// Update our tracking here.
	.dseq, .sseq = +1, 
	return false
}

// Update and replace sid.
// Lock should be held on entry but will be unlocked to prevent lock inversion.
// The subscription lock is re-acquired before returning, so it is held
// again on exit. Returns the sid that was in use before the swap.
func ( *Subscription) () ( int64) {
	 := .conn
	.mu.Unlock()

	// The connection's subscription map is updated under subsMu only.
	.subsMu.Lock()
	 = .sid
	delete(.subs, )
	// Place new one.
	.ssid++
	 := .ssid
	.subs[] = 
	.subsMu.Unlock()

	.mu.Lock()
	.sid = 
	return 
}

// We are here if we have detected a gap with an ordered consumer.
// We will create a new consumer and rewire the low level subscription.
// The uint64 argument is used as the OptStartSeq of the recreated consumer.
// Does nothing if the subscription has no JetStream info, no connection, or
// is closed. The protocol writes and the consumer creation happen in
// separate goroutines.
// Lock should be held.
func ( *Subscription) ( uint64) {
	 := .conn
	if .jsi == nil ||  == nil || .closed {
		return
	}

	var  string
	// If there was an AUTO_UNSUB done, we need to adjust the new value
	// to send after the SUB for the new sid.
	if .max > 0 {
		if .jsi.fciseq < .max {
			 := .max - .jsi.fciseq
			 = strconv.Itoa(int())
		} else {
			// We are already at the max, so we should just unsub the
			// existing sub and be done
			go func( int64) {
				.mu.Lock()
				.bw.appendString(fmt.Sprintf(unsubProto, , _EMPTY_))
				.kickFlusher()
				.mu.Unlock()
			}(.sid)
			return
		}
	}

	// Quick unsubscribe. Since we know this is a simple push subscriber we do in place.
	 := .applyNewSID()

	// Grab new inbox.
	 := .NewInbox()
	.Subject = 

	// Snapshot the new sid under sub lock.
	 := .sid

	// We are still in the low level readLoop for the connection so we need
	// to spin a go routine to try to create the new consumer.
	go func() {
		// Unsubscribe and subscribe with new inbox and sid.
		// Remap a new low level sub into this sub since its client accessible.
		// This is done here in this go routine to prevent lock inversion.
		.mu.Lock()
		.bw.appendString(fmt.Sprintf(unsubProto, , _EMPTY_))
		.bw.appendString(fmt.Sprintf(subProto, , _EMPTY_, ))
		if  != _EMPTY_ {
			.bw.appendString(fmt.Sprintf(unsubProto, , ))
		}
		.kickFlusher()
		.mu.Unlock()

		// Failure path: report the error through the sequence mismatch
		// handler, then unsubscribe.
		 := func( error) {
			.handleConsumerSequenceMismatch(, fmt.Errorf("%w: recreating ordered consumer", ))
			.unsubscribe(, 0, true)
		}

		.mu.Lock()
		 := .jsi
		// Reset some items in jsi.
		.dseq = 1
		.cmeta = _EMPTY_
		.fcr, .fcd = _EMPTY_, 0
		.deliver = 
		// Reset consumer request for starting policy.
		 := .ccreq.Config
		.DeliverSubject = 
		.DeliverPolicy = DeliverByStartSequencePolicy
		.OptStartSeq = 
		// In case the consumer was created with a start time, we need to clear it
		// since we are now using a start sequence.
		.OptStartTime = nil

		 := .js
		.mu.Unlock()

		.mu.Lock()
		// Attempt to delete the existing consumer.
		// We don't wait for the response since even if it's unsuccessful,
		// inactivity threshold will kick in and delete it.
		if .consumer != _EMPTY_ {
			go .DeleteConsumer(.stream, .consumer)
		}
		.consumer = ""
		.mu.Unlock()
		 := getHash(nuid.Next())
		var  *ConsumerInfo
		var  error
		if .opts.ctx != nil {
			,  = .upsertConsumer(.stream, , , Context(.opts.ctx))
		} else {
			,  = .upsertConsumer(.stream, , )
		}
		if  != nil {
			var  *APIError
			if errors.Is(, ErrJetStreamNotEnabled) || errors.Is(, ErrTimeout) || errors.Is(, context.DeadlineExceeded) {
				// Creating the consumer failed with a possibly transient
				// error. No retry happens here: returning leaves recovery to
				// a later attempt (presumably the heartbeat activity check,
				// which resets the ordered consumer again when inactive).
				return
			} else if errors.As(, &) && .ErrorCode == JSErrCodeInsufficientResourcesErr {
				// Leave for a later retry on insufficient resources, as it may mean that client is connected to a running
				// server in cluster while the server hosting R1 JetStream resources is restarting
				return
			} else if errors.As(, &) && .ErrorCode == JSErrCodeJetStreamNotAvailable {
				// Leave for a later retry if JetStream meta leader is temporarily unavailable
				return
			}
			()
			return
		}

		.mu.Lock()
		.consumer = .Name
		.mu.Unlock()
	}()
}

// For JetStream subscriptions, returns the number of delivered messages.
// For a ChanSubscription, the value is jsi.fciseq (the known number of
// messages added to the channel) minus the number of messages currently
// buffered in the mch channel. For any other subscription type the
// delivered field is returned unchanged.
// The subscription lock must be held on entry.
func ( *Subscription) () uint64 {
	if .typ == ChanSubscription {
		// Subtract what is still queued in the channel.
		return .jsi.fciseq - uint64(len(.mch))
	}
	return .delivered
}

// checkForFlowControlResponse will check to see if we should send a flow control response
// based on the subscription current delivered index and the target.
// It sets the active flag and, once the value returned by getJSDelivered has
// reached the recorded target (fcd), returns the recorded fcr string after
// clearing both fcr and fcd. Otherwise it returns the empty string.
// Runs under subscription lock.
func ( *Subscription) () string {
	// Caller has verified that there is a sub.jsi and fc
	 := .jsi
	.active = true
	if .getJSDelivered() >= .fcd {
		// Target reached: hand back the pending value and clear it so that
		// it is not returned again.
		 := .fcr
		.fcr, .fcd = _EMPTY_, 0
		return 
	}
	return _EMPTY_
}

// Record an inbound flow control message: the given string is stored in
// jsi.fcr and the current jsi.fciseq is stored in jsi.fcd as the target.
// NOTE(review): jsi is dereferenced without a nil check — callers are
// presumably expected to have verified it; confirm.
// Runs under subscription lock.
func ( *Subscription) ( string) {
	.jsi.fcr, .jsi.fcd = , .jsi.fciseq
}

// Checks for activity from our consumer.
// If we do not think we are active send an async error.
//
// Each run re-arms the hbc timer for hbi*hbcThresh and clears the active
// flag, so the flag has to be set again before the next run. When the flag
// was not set, ErrConsumerNotActive is queued for the connection's
// AsyncErrorCB if the consumer is not ordered or the connection status is
// not CONNECTED; otherwise resetOrderedConsumer is called with sseq+1.
func ( *Subscription) () {
	.mu.Lock()
	 := .jsi
	// Nothing to check without jsi or once closed.
	if  == nil || .closed {
		.mu.Unlock()
		return
	}

	// Snapshot the flag, then reset both the timer and the flag while the
	// lock is still held.
	 := .active
	.hbc.Reset(.hbi * hbcThresh)
	.active = false
	 := .conn
	.mu.Unlock()

	if ! {
		if !.ordered || .Status() != CONNECTED {
			.mu.Lock()
			// The callback is queued on ach rather than being invoked inline.
			if  := .Opts.AsyncErrorCB;  != nil {
				.ach.push(func() { (, , ErrConsumerNotActive) })
			}
			.mu.Unlock()
			return
		}
		// Ordered consumer on a connected connection: reset it at sseq+1.
		.mu.Lock()
		.resetOrderedConsumer(.sseq + 1)
		.mu.Unlock()
	}
}

// scheduleHeartbeatCheck sets up the timer check to make sure we are active
// or receiving idle heartbeats.
// It is a no-op when the subscription has no jsi. The hbc timer is created
// on first use with activityCheck as its callback and is reset on later
// calls; in both cases the period is hbi*hbcThresh.
func ( *Subscription) () {
	.mu.Lock()
	defer .mu.Unlock()

	 := .jsi
	if  == nil {
		return
	}

	if .hbc == nil {
		// First call: create the timer.
		.hbc = time.AfterFunc(.hbi*hbcThresh, .activityCheck)
	} else {
		// Timer already exists: just push its deadline out.
		.hbc.Reset(.hbi * hbcThresh)
	}
}

// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
// The error is only dispatched when the connection has an AsyncErrorCB set;
// the callback invocation is pushed onto the connection's ach queue while the
// connection lock is held, rather than being called inline.
func ( *Conn) ( *Subscription,  error) {
	.mu.Lock()
	 := .Opts.AsyncErrorCB
	if  != nil {
		.ach.push(func() { (, , ) })
	}
	.mu.Unlock()
}

// checkForSequenceMismatch will make sure we have not missed any messages since last seen.
//
// It marks the consumer as active, then compares the consumer sequence token
// of the stored control metadata (cmeta) with the single value of the
// lastConsumerSeqHdr header of the given message. On a mismatch an ordered
// consumer is reset via resetOrderedConsumer(sseq+1); otherwise an
// *ErrConsumerSequenceMismatch is reported through
// handleConsumerSequenceMismatch. The function returns silently when there
// is no stored metadata or when it cannot be parsed.
func ( *Conn) ( *Msg,  *Subscription,  *jsSub) {
	// Process heartbeat received, get latest control metadata if present.
	.mu.Lock()
	,  := .cmeta, .ordered
	.active = true
	.mu.Unlock()

	// No control metadata recorded: nothing to compare against.
	if  == _EMPTY_ {
		return
	}

	,  := parser.GetMetadataFields()
	if  != nil {
		return
	}

	// Consumer sequence.
	var  string
	 := [parser.AckConsumerSeqTokenPos]
	 := .Header[lastConsumerSeqHdr]
	// Only a header with exactly one value is used; otherwise the compared
	// string stays empty.
	if len() == 1 {
		 = [0]
	}

	// Detect consumer sequence mismatch and whether
	// should restart the consumer.
	if  !=  {
		// Dispatch async error including details such as
		// from where the consumer could be restarted.
		 := parser.ParseNum([parser.AckStreamSeqTokenPos])
		if  {
			.mu.Lock()
			.resetOrderedConsumer(.sseq + 1)
			.mu.Unlock()
		} else {
			 := &ErrConsumerSequenceMismatch{
				StreamResumeSequence: uint64(),
				ConsumerSequence:     parser.ParseNum(),
				LastConsumerSequence: parser.ParseNum(),
			}
			.handleConsumerSequenceMismatch(, )
		}
	}
}

// streamRequest is a JSON request body carrying an optional subject; the
// "subject" key is omitted when Subject is empty.
type streamRequest struct {
	Subject string `json:"subject,omitempty"`
}

// streamNamesResponse is a JSON response that embeds apiResponse and apiPaged
// and carries a list of stream names under the "streams" key.
type streamNamesResponse struct {
	apiResponse
	apiPaged
	Streams []string `json:"streams"`
}

// subOpts collects the settings that SubOpt functions apply when a
// subscription is configured.
type subOpts struct {
	// For attaching.
	stream, consumer string
	// For creating or updating.
	// This is a pointer that the option functions dereference without a nil
	// check, so it has to be set before options are applied.
	cfg *ConsumerConfig
	// For binding a subscription to a consumer without creating it.
	bound bool
	// For manual ack
	mack bool
	// For an ordered consumer.
	ordered bool
	// Optional context — NOTE(review): how it is consumed is not visible
	// from this declaration; confirm against the subscribe path.
	ctx context.Context

	// To disable calling ConsumerInfo
	skipCInfo bool
}

// SkipConsumerLookup will omit looking up consumer when [Bind], [Durable]
// or [ConsumerName] are provided.
//
// NOTE: This setting may cause an existing consumer to be overwritten. Also,
// because consumer lookup is skipped, all consumer options like AckPolicy,
// DeliverSubject etc. need to be provided even if consumer already exists.
//
// The returned option sets the skipCInfo flag and never returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.skipCInfo = true
		return nil
	})
}

// OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
// There are no redeliveries and no acks, and flow control and heartbeats will be added but
// will be taken care of without additional client code.
//
// The returned option only sets the ordered flag and never returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.ordered = true
		return nil
	})
}

// ManualAck disables auto ack functionality for async subscriptions.
// The returned option sets the mack flag and never returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.mack = true
		return nil
	})
}

// Description will set the description for the created consumer.
// The returned option stores the given string in cfg.Description and never
// returns an error. cfg is dereferenced without a nil check.
func ( string) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.Description = 
		return nil
	})
}

// Durable defines the consumer name for JetStream durable subscribers.
// This function will return ErrInvalidConsumerName if the name contains
// any dot ".".
//
// The returned option also fails when cfg.Durable has already been set, or
// when a different consumer name is already present in the options. Any
// error returned by checkConsumerName is passed through unchanged.
func ( string) SubOpt {
	return subOptFn(func( *subOpts) error {
		// Durable may only be provided once.
		if .cfg.Durable != _EMPTY_ {
			return errors.New("nats: option Durable set more than once")
		}
		// A consumer name set earlier must agree with the durable name.
		if .consumer != _EMPTY_ && .consumer !=  {
			return fmt.Errorf("nats: duplicate consumer names (%s and %s)", .consumer, )
		}
		if  := checkConsumerName();  != nil {
			return 
		}

		.cfg.Durable = 
		return nil
	})
}

// DeliverAll will configure a Consumer to receive all the
// messages from a Stream.
// The returned option sets cfg.DeliverPolicy to DeliverAllPolicy and never
// returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.DeliverPolicy = DeliverAllPolicy
		return nil
	})
}

// DeliverLast configures a Consumer to receive messages
// starting with the latest one.
// The returned option sets cfg.DeliverPolicy to DeliverLastPolicy and never
// returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.DeliverPolicy = DeliverLastPolicy
		return nil
	})
}

// DeliverLastPerSubject configures a Consumer to receive messages
// starting with the latest one for each filtered subject.
// The returned option sets cfg.DeliverPolicy to DeliverLastPerSubjectPolicy
// and never returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.DeliverPolicy = DeliverLastPerSubjectPolicy
		return nil
	})
}

// DeliverNew configures a Consumer to receive messages
// published after the subscription.
// The returned option sets cfg.DeliverPolicy to DeliverNewPolicy and never
// returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.DeliverPolicy = DeliverNewPolicy
		return nil
	})
}

// StartSequence configures a Consumer to receive
// messages from a start sequence.
// The returned option sets cfg.DeliverPolicy to DeliverByStartSequencePolicy,
// stores the given sequence in cfg.OptStartSeq and never returns an error.
// The sequence value is not validated here.
func ( uint64) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.DeliverPolicy = DeliverByStartSequencePolicy
		.cfg.OptStartSeq = 
		return nil
	})
}

// StartTime configures a Consumer to receive
// messages from a start time.
// The returned option sets cfg.DeliverPolicy to DeliverByStartTimePolicy and
// never returns an error.
func ( time.Time) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.DeliverPolicy = DeliverByStartTimePolicy
		// cfg.OptStartTime receives the address of this function's own copy
		// of the time argument, which the closure keeps alive.
		.cfg.OptStartTime = &
		return nil
	})
}

// AckNone requires no acks for delivered messages.
// The returned option sets cfg.AckPolicy to AckNonePolicy and never returns
// an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.AckPolicy = AckNonePolicy
		return nil
	})
}

// AckAll when acking a sequence number, this implicitly acks all sequences
// below this one as well.
// The returned option sets cfg.AckPolicy to AckAllPolicy and never returns
// an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.AckPolicy = AckAllPolicy
		return nil
	})
}

// AckExplicit requires ack or nack for all messages.
// The returned option sets cfg.AckPolicy to AckExplicitPolicy and never
// returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.AckPolicy = AckExplicitPolicy
		return nil
	})
}

// MaxDeliver sets the number of redeliveries for a message.
// The returned option stores the given value in cfg.MaxDeliver without
// validating it and never returns an error.
func ( int) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.MaxDeliver = 
		return nil
	})
}

// MaxAckPending sets the number of outstanding acks that are allowed before
// message delivery is halted.
// The returned option stores the given value in cfg.MaxAckPending without
// validating it and never returns an error.
func ( int) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.MaxAckPending = 
		return nil
	})
}

// ReplayOriginal replays the messages at the original speed.
// The returned option sets cfg.ReplayPolicy to ReplayOriginalPolicy and never
// returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.ReplayPolicy = ReplayOriginalPolicy
		return nil
	})
}

// ReplayInstant replays the messages as fast as possible.
// The returned option sets cfg.ReplayPolicy to ReplayInstantPolicy and never
// returns an error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.ReplayPolicy = ReplayInstantPolicy
		return nil
	})
}

// RateLimit is the Bits per sec rate limit applied to a push consumer.
// The returned option stores the given value in cfg.RateLimit and never
// returns an error.
func ( uint64) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.RateLimit = 
		return nil
	})
}

// BackOff is an array of time durations that represent the time to delay based on delivery count.
// The returned option stores the given slice in cfg.BackOff as is (it is not
// copied, so later changes to the caller's slice remain visible through the
// config) and never returns an error.
func ( []time.Duration) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.BackOff = 
		return nil
	})
}

// BindStream binds a consumer to a stream explicitly based on a name.
// When a stream name is not specified, the library uses the subscribe
// subject as a way to find the stream name. It is done by making a request
// to the server to get list of stream names that have a filter for this
// subject. If the returned list contains a single stream, then this
// stream name will be used, otherwise the `ErrNoMatchingStream` is returned.
// To avoid the stream lookup, provide the stream name with this function.
// See also `Bind()`.
//
// The returned option fails when a different stream name has already been
// set in the options; setting the same name again is accepted.
func ( string) SubOpt {
	return subOptFn(func( *subOpts) error {
		if .stream != _EMPTY_ && .stream !=  {
			return fmt.Errorf("nats: duplicate stream name (%s and %s)", .stream, )
		}

		.stream = 
		return nil
	})
}

// Bind binds a subscription to an existing consumer from a stream without attempting to create.
// The first argument is the stream name and the second argument will be the consumer name.
//
// The returned option fails with ErrStreamNameRequired or
// ErrConsumerNameRequired when the respective name is empty, and with a
// "duplicate" error when a different durable name or stream name has already
// been set. On success it records both names and sets the bound flag.
func (,  string) SubOpt {
	return subOptFn(func( *subOpts) error {
		if  == _EMPTY_ {
			return ErrStreamNameRequired
		}
		if  == _EMPTY_ {
			return ErrConsumerNameRequired
		}

		// In case of pull subscribers, the durable name is a required parameter
		// so check that they are not different.
		if .cfg.Durable != _EMPTY_ && .cfg.Durable !=  {
			return fmt.Errorf("nats: duplicate consumer names (%s and %s)", .cfg.Durable, )
		}
		if .stream != _EMPTY_ && .stream !=  {
			return fmt.Errorf("nats: duplicate stream name (%s and %s)", .stream, )
		}
		.stream = 
		.consumer = 
		.bound = true
		return nil
	})
}

// EnableFlowControl enables flow control for a push based consumer.
// The returned option sets cfg.FlowControl to true and never returns an
// error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.FlowControl = true
		return nil
	})
}

// IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
// For pull consumers, idle heartbeat has to be set on each [Fetch] call.
// The returned option stores the given duration in cfg.Heartbeat without
// validating it and never returns an error.
func ( time.Duration) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.Heartbeat = 
		return nil
	})
}

// DeliverSubject specifies the JetStream consumer deliver subject.
//
// This option is used only in situations where the consumer does not exist
// and a creation request is sent to the server. If not provided, an inbox
// will be selected.
// If a consumer exists, then the NATS subscription will be created on
// the JetStream consumer's DeliverSubject, not necessarily this subject.
//
// The returned option stores the given subject in cfg.DeliverSubject and
// never returns an error.
func ( string) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.DeliverSubject = 
		return nil
	})
}

// HeadersOnly will instruct the consumer to only deliver headers and no payloads.
// The returned option sets cfg.HeadersOnly to true and never returns an
// error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.HeadersOnly = true
		return nil
	})
}

// MaxRequestBatch sets the maximum pull consumer batch size that a Fetch()
// can request.
// The returned option stores the given value in cfg.MaxRequestBatch without
// validating it and never returns an error.
func ( int) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.MaxRequestBatch = 
		return nil
	})
}

// MaxRequestExpires sets the maximum pull consumer request expiration that a
// Fetch() can request (using the Fetch's timeout value).
// The returned option stores the given duration in cfg.MaxRequestExpires
// without validating it and never returns an error.
func ( time.Duration) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.MaxRequestExpires = 
		return nil
	})
}

// MaxRequestMaxBytes sets the maximum pull consumer request bytes that a
// Fetch() can receive.
// The returned option stores the given value in cfg.MaxRequestMaxBytes
// without validating it and never returns an error.
func ( int) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.MaxRequestMaxBytes = 
		return nil
	})
}

// InactiveThreshold indicates how long the server should keep a consumer
// after detecting a lack of activity. In NATS Server 2.8.4 and earlier, this
// option only applies to ephemeral consumers. In NATS Server 2.9.0 and later,
// this option applies to both ephemeral and durable consumers, allowing durable
// consumers to also be deleted automatically after the inactivity threshold has
// passed.
//
// The returned option rejects negative durations with an error; zero is
// accepted. Valid values are stored in cfg.InactiveThreshold.
func ( time.Duration) SubOpt {
	return subOptFn(func( *subOpts) error {
		if  < 0 {
			return fmt.Errorf("invalid InactiveThreshold value (%v), needs to be greater or equal to 0", )
		}
		.cfg.InactiveThreshold = 
		return nil
	})
}

// ConsumerReplicas sets the replica count for a consumer.
// The returned option rejects values lower than 1 with an error; valid
// values are stored in cfg.Replicas.
func ( int) SubOpt {
	return subOptFn(func( *subOpts) error {
		if  < 1 {
			return fmt.Errorf("invalid ConsumerReplicas value (%v), needs to be greater than 0", )
		}
		.cfg.Replicas = 
		return nil
	})
}

// ConsumerMemoryStorage sets the memory storage to true for a consumer.
// The returned option sets cfg.MemoryStorage to true and never returns an
// error.
func () SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.MemoryStorage = true
		return nil
	})
}

// ConsumerName sets the name for a consumer.
// The returned option stores the given string in cfg.Name and never returns
// an error. Unlike the durable option in this file, the name is not checked
// here.
func ( string) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.Name = 
		return nil
	})
}

// ConsumerFilterSubjects can be used to set multiple subject filters on the consumer.
// It has to be used in conjunction with [nats.BindStream] and
// with empty 'subject' parameter.
//
// The returned option stores the variadic slice in cfg.FilterSubjects as is
// (it is not copied) and never returns an error.
func ( ...string) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.FilterSubjects = 
		return nil
	})
}

// Returns the consumer info for this subscription by calling getConsumerInfo
// on jsi.js with jsi.stream and jsi.consumer. The subscription lock is
// released before that call is made.
// ErrConsumerInfoOnOrderedReset is returned when no consumer name is recorded
// and jsi.ordered is set; ErrTypeSubscription is returned in the remaining
// cases where the lookup cannot be made.
func ( *Subscription) () (*ConsumerInfo, error) {
	.mu.Lock()
	// TODO(dlc) - Better way to mark especially if we attach.
	if .jsi == nil || .jsi.consumer == _EMPTY_ {
		// NOTE(review): when jsi is nil the outer condition is true and the
		// read of jsi.ordered below looks like a nil pointer dereference —
		// confirm and guard it with a nil check.
		if .jsi.ordered {
			.mu.Unlock()
			return nil, ErrConsumerInfoOnOrderedReset
		}
		.mu.Unlock()
		return nil, ErrTypeSubscription
	}

	// Consumer info lookup should fail if in direct mode.
	 := .jsi.js
	,  := .jsi.stream, .jsi.consumer
	.mu.Unlock()

	return .getConsumerInfo(, )
}

// pullOpts holds the settings collected from PullOpt values for a single
// pull request.
type pullOpts struct {
	// Maximum number of bytes for the request (set through PullMaxBytes).
	maxBytes int
	// Request timeout; it is rejected together with ctx
	// (ErrContextAndTimeout) when both are set.
	ttl time.Duration
	// Optional context for the request; mutually exclusive with ttl.
	ctx context.Context
	// Idle heartbeat interval (set through PullHeartbeat); 0 means unset.
	hb time.Duration
}

// PullOpt are the options that can be passed when pulling a batch of messages.
// An option applies itself to the given pullOpts and may return an error to
// reject an invalid value.
type PullOpt interface {
	configurePull(opts *pullOpts) error
}

// PullMaxWaiting defines the max inflight pull requests.
// Note that this is a SubOpt, not a PullOpt: the returned option stores the
// given value in cfg.MaxWaiting and never returns an error.
func ( int) SubOpt {
	return subOptFn(func( *subOpts) error {
		.cfg.MaxWaiting = 
		return nil
	})
}

// PullHeartbeat is a duration-based pull option that sets the idle heartbeat
// for a pull request.
type PullHeartbeat time.Duration

// Stores the heartbeat duration in the hb field of the given options.
// Values lower than or equal to 0 are rejected with an error wrapping
// ErrInvalidArg.
func ( PullHeartbeat) ( *pullOpts) error {
	if  <= 0 {
		return fmt.Errorf("%w: idle heartbeat has to be greater than 0", ErrInvalidArg)
	}
	.hb = time.Duration()
	return nil
}

// PullMaxBytes defines the max bytes allowed for a fetch request.
type PullMaxBytes int

// Stores the value in the maxBytes field of the given options. The value is
// not validated and the method always returns nil.
func ( PullMaxBytes) ( *pullOpts) error {
	.maxBytes = int()
	return nil
}

var (
	// errNoMessages is an error that a Fetch request using no_wait can receive to signal
	// that there are no more messages available.
	errNoMessages = errors.New("nats: no messages")

	// errRequestsPending is an error that represents a sub.Fetch request that was using
	// no_wait and an expires time and got discarded by the server.
	errRequestsPending = errors.New("nats: requests pending")
)

// Returns if the given message is a user message or not, and if
// `checkSts` is true, returns appropriate error based on the
// content of the status (404, etc..)
//
// A message that has a payload, has no headers, or has no status header is
// always reported as a user message with a nil error. Everything else is
// reported as a non-user message. When the status is checked, control
// messages recognised by isJSControlMessage yield a nil error, and the
// remaining statuses are mapped to errors by the switch below. The second
// boolean selects which error a request-timeout status maps to.
func checkMsg( *Msg, ,  bool) ( bool,  error) {
	// Assume user message
	 = true

	// If payload or no header, consider this a user message
	if len(.Data) > 0 || len(.Header) == 0 {
		return
	}
	// Look for status header
	 := .Header.Get(statusHdr)
	// If not present, then this is considered a user message
	if  == _EMPTY_ {
		return
	}
	// At this point, this is not a user message since there is
	// no payload and a "Status" header.
	 = false

	// If we don't care about status, we are done.
	if ! {
		return
	}

	// if it's a heartbeat message, report as not user msg
	if ,  := isJSControlMessage();  {
		return
	}
	switch  {
	case noResponders:
		 = ErrNoResponders
	case noMessagesSts:
		// 404 indicates that there are no messages.
		 = errNoMessages
	case reqTimeoutSts:
		// In case of a fetch request with no wait request and expires time,
		// need to skip 408 errors and retry.
		if  {
			 = errRequestsPending
		} else {
			// Older servers may send a 408 when a request in the server was expired
			// and interest is still found, which will be the case for our
			// implementation. Regardless, ignore 408 errors until receiving at least
			// one message when making requests without no_wait.
			 = ErrTimeout
		}
	case jetStream409Sts:
		// The description header is matched case-insensitively against the
		// known 409 reasons; anything else falls through to the generic error.
		if strings.Contains(strings.ToLower(.Header.Get(descrHdr)), "consumer deleted") {
			 = ErrConsumerDeleted
			break
		}

		if strings.Contains(strings.ToLower(.Header.Get(descrHdr)), "leadership change") {
			 = ErrConsumerLeadershipChanged
			break
		}
		fallthrough
	default:
		// Unknown status: surface the description header as the error text.
		 = fmt.Errorf("nats: %s", .Header.Get(descrHdr))
	}
	return
}

// Fetch pulls a batch of messages from a stream for a pull consumer.
//
// The batch size must be at least 1 (ErrInvalidArg otherwise) and the
// subscription must be a pull subscription (ErrTypeSubscription otherwise);
// a nil subscription yields ErrBadSubscription. A context option and a
// timeout option cannot be combined (ErrContextAndTimeout). When a heartbeat
// is set, twice its value must be lower than the time left until the
// deadline.
//
// Messages already buffered on the subscription are collected first; a pull
// request is only published when more messages are still needed and the
// subscription is neither closed nor draining. If at least one message was
// collected, the messages are returned with a nil error even when an error
// occurred afterwards.
func ( *Subscription) ( int,  ...PullOpt) ([]*Msg, error) {
	if  == nil {
		return nil, ErrBadSubscription
	}
	if  < 1 {
		return nil, ErrInvalidArg
	}

	var  pullOpts
	for ,  := range  {
		if  := .configurePull(&);  != nil {
			return nil, 
		}
	}
	if .ctx != nil && .ttl != 0 {
		return nil, ErrContextAndTimeout
	}

	.mu.Lock()
	 := .jsi
	// Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
	// so check for jsi.pull boolean instead.
	if  == nil || !.pull {
		.mu.Unlock()
		return nil, ErrTypeSubscription
	}

	 := .conn
	 := .jsi.nms
	,  := newFetchInbox(.deliver)
	 := .jsi.js
	 := len(.mch) > 0

	// All fetch requests have an expiration, in case of no explicit expiration
	// then the default timeout of the JetStream context is used.
	 := .ttl
	if  == 0 {
		 = .opts.wait
	}
	.mu.Unlock()

	// Use the given context or setup a default one for the span
	// of the pull batch request.
	var (
		    = .ctx
		    error
		 context.CancelFunc
	)
	if  == nil {
		,  = context.WithTimeout(context.Background(), )
	} else if ,  := .Deadline(); ! {
		// Prevent from passing the background context which will just block
		// and cannot be canceled either.
		if ,  := .(ContextOpt);  && .Context == context.Background() {
			return nil, ErrNoDeadlineContext
		}

		// If the context did not have a deadline, then create a new child context
		// that will use the default timeout from the JS context.
		,  = context.WithTimeout(, )
	} else {
		,  = context.WithCancel()
	}
	defer ()

	// if heartbeat is set, validate it against the context timeout
	if .hb > 0 {
		,  := .Deadline()
		if 2*.hb >= time.Until() {
			return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg)
		}
	}

	// Check if context not done already before making the request.
	select {
	case <-.Done():
		if .ctx != nil { // Timeout or Cancel triggered by context object option
			 = .Err()
		} else { // Timeout triggered by timeout option
			 = ErrTimeout
		}
	default:
	}
	if  != nil {
		return nil, 
	}

	var (
		 = make([]*Msg, 0, )
		  *Msg
	)
	for  && len() <  {
		// Check next msg with booleans that say that this is an internal call
		// for a pull subscribe (so don't reject it) and don't wait if there
		// are no messages.
		,  = .nextMsgWithContext(, true, false)
		if  != nil {
			// Running out of buffered messages is not an error at this stage.
			if errors.Is(, errNoMessages) {
				 = nil
			}
			break
		}
		// Check msg but just to determine if this is a user message
		// or status message, however, we don't care about values of status
		// messages at this point in the Fetch() call, so checkMsg can't
		// return an error.
		if ,  := checkMsg(, false, false);  {
			 = append(, )
		}
	}
	// The heartbeat timer is created lazily by the request closure below and
	// is always stopped when Fetch returns.
	var  *time.Timer
	defer func() {
		if  != nil {
			.Stop()
		}
	}()
	var  error
	.mu.Lock()
	 := .closed || .draining
	.mu.Unlock()
	if  {
		 = errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
	}
	 := sync.Mutex{}
	var  atomic.Bool
	if  == nil && len() <  && ! {
		// For batch real size of 1, it does not make sense to set no_wait in
		// the request.
		 := -len() > 1

		var  nextRequest

		 := func() error {
			// The current deadline for the context will be used
			// to set the expires TTL for a fetch request.
			,  := .Deadline()
			 = time.Until()

			// Check if context has already been canceled or expired.
			select {
			case <-.Done():
				return .Err()
			default:
			}

			// Make our request expiration a bit shorter than the current timeout.
			// The margin is 10% of the remaining time, capped at 5 seconds.
			 := time.Duration(float64() * 0.1)
			if  > 5*time.Second {
				 = 5 * time.Second
			}
			 :=  - 

			.Batch =  - len()
			.Expires = 
			.NoWait = 
			.MaxBytes = .maxBytes
			// Only request heartbeats when two of them fit within the
			// request expiration.
			if 2*.hb <  {
				.Heartbeat = .hb
			} else {
				.Heartbeat = 0
			}
			,  := json.Marshal()
			if  := .PublishRequest(, , );  != nil {
				return 
			}
			if .hb > 0 {
				if  == nil {
					// Timer callback: record ErrNoHeartbeat under the lock
					// when nothing re-armed the timer within 2*hb.
					 = time.AfterFunc(2*.hb, func() {
						.Lock()
						 = ErrNoHeartbeat
						.Unlock()
						()
					})
				} else {
					.Reset(2 * .hb)
				}
			}
			return nil
		}
		 := .StatusChanged()
		// Watcher goroutine: ends when the context is done or on the first
		// status change notification, in which case the flag is set.
		go func() {
			select {
			case <-.Done():
			case <-:
				.Store(true)
				()
			}
			.RemoveStatusListener()
		}()
		 = ()
		for  == nil && len() <  {
			// Ask for next message and wait if there are no messages
			,  = .nextMsgWithContext(, true, true)
			if  == nil {
				// Any received message re-arms the heartbeat timer.
				if  != nil {
					.Reset(2 * .hb)
				}
				var  bool

				,  = checkMsg(, true, )
				if  == nil &&  {
					 = append(, )
				} else if  && (errors.Is(, errNoMessages) || errors.Is(, errRequestsPending)) && len() == 0 {
					// If we have a 404/408 for our "no_wait" request and have
					// not collected any message, then resend request to
					// wait this time.
					 = false
					 = ()
				} else if errors.Is(, ErrTimeout) && len() == 0 {
					// If we get a 408, we will bail if we already collected some
					// messages, otherwise ignore and go back calling nextMsg.
					 = nil
				}
			}
		}
	}
	// If there is at least a message added to msgs, then need to return OK and no error
	if  != nil && len() == 0 {
		// Error precedence: the recorded heartbeat error first, then
		// ErrFetchDisconnected when the status watcher fired, then the
		// receive error as mapped by checkCtxErr.
		.Lock()
		defer .Unlock()
		if  != nil {
			return nil, 
		}
		if .Load() {
			return nil, ErrFetchDisconnected
		}
		return nil, .checkCtxErr()
	}
	return , nil
}

// newFetchInbox returns subject used as reply subject when sending pull requests
// as well as request ID. For non-wildcard subject, request ID is empty and
// passed subject is not transformed.
// A subject is treated as wildcard here when it ends in ".*"; in that case
// the trailing "*" is dropped and a value obtained from nuid.Next() is
// appended in its place.
func newFetchInbox( string) (string, string) {
	if !strings.HasSuffix(, ".*") {
		return , ""
	}
	 := nuid.Next()
	var  strings.Builder
	// Keep everything up to and including the final "." and drop the "*".
	.WriteString([:len()-1])
	.WriteString()
	return .String(), 
}

// subjectMatchesReqID reports whether the last "."-separated token of a
// string equals the other argument. It returns false when the split string
// has fewer than two tokens.
// NOTE(review): presumably the first argument is a message subject and the
// second a request ID as produced by newFetchInbox — confirm against the
// call site.
func subjectMatchesReqID(,  string) bool {
	 := strings.Split(, ".")
	if len() < 2 {
		return false
	}
	return [len()-1] == 
}

// MessageBatch provides methods to retrieve messages consumed using [Subscription.FetchBatch].
type MessageBatch interface {
	// Messages returns a channel on which messages will be published.
	// The channel is closed once the batch is finished, so it can be
	// iterated with range.
	Messages() <-chan *Msg

	// Error returns an error encountered when fetching messages.
	// It is nil when no error was recorded.
	Error() error

	// Done signals end of execution.
	Done() <-chan struct{}
}

// messageBatch is the MessageBatch value handed out by FetchBatch. The
// embedded mutex guards access to the fields through the accessor methods.
type messageBatch struct {
	sync.Mutex
	// Channel on which fetched messages are delivered.
	msgs chan *Msg
	// Error recorded while fetching, if any.
	err error
	// Receives a value when fetching has finished.
	done chan struct{}
}

// Returns the msgs channel as a receive-only channel; the field is read
// while holding the batch lock.
func ( *messageBatch) () <-chan *Msg {
	.Lock()
	defer .Unlock()
	return .msgs
}

// Returns the error currently recorded on the batch (nil if none); the
// field is read while holding the batch lock.
func ( *messageBatch) () error {
	.Lock()
	defer .Unlock()
	return .err
}

// Returns the done channel as a receive-only channel; the field is read
// while holding the batch lock.
func ( *messageBatch) () <-chan struct{} {
	.Lock()
	defer .Unlock()
	return .done
}

// FetchBatch pulls a batch of messages from a stream for a pull consumer.
// Unlike [Subscription.Fetch], it is non blocking and returns [MessageBatch],
// allowing to retrieve incoming messages from a channel.
// The returned channel is always closed after all messages for a batch have been
// delivered by the server - it is safe to iterate over it using range.
//
// To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait]
// or [nats.Context] (with deadline set).
//
// This method will not return error in case of pull request expiry (even if there are no messages).
// Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages.
//
// Argument validation mirrors Fetch: a nil subscription yields
// ErrBadSubscription, a batch size lower than 1 yields ErrInvalidArg, a
// non-pull subscription yields ErrTypeSubscription, and a context option
// combined with a timeout option yields ErrContextAndTimeout. Messages that
// are already buffered on the subscription are placed on the channel before
// a pull request is published; if they complete the batch, the batch is
// returned already closed and done.
func ( *Subscription) ( int,  ...PullOpt) (MessageBatch, error) {
	if  == nil {
		return nil, ErrBadSubscription
	}
	if  < 1 {
		return nil, ErrInvalidArg
	}

	var  pullOpts
	for ,  := range  {
		if  := .configurePull(&);  != nil {
			return nil, 
		}
	}
	if .ctx != nil && .ttl != 0 {
		return nil, ErrContextAndTimeout
	}
	.mu.Lock()
	 := .jsi
	// Reject if this is not a pull subscription. Note that sub.typ is SyncSubscription,
	// so check for jsi.pull boolean instead.
	if  == nil || !.pull {
		.mu.Unlock()
		return nil, ErrTypeSubscription
	}

	 := .conn
	 := .jsi.nms
	,  := newFetchInbox(.jsi.deliver)
	 := .jsi.js
	 := len(.mch) > 0

	// All fetch requests have an expiration, in case of no explicit expiration
	// then the default timeout of the JetStream context is used.
	 := .ttl
	if  == 0 {
		 = .opts.wait
	}
	.mu.Unlock()

	// Use the given context or setup a default one for the span
	// of the pull batch request.
	var (
		           = .ctx
		        context.CancelFunc
		 = true
	)
	if  == nil {
		,  = context.WithTimeout(context.Background(), )
	} else if ,  := .Deadline(); ! {
		// Prevent from passing the background context which will just block
		// and cannot be canceled either.
		if ,  := .(ContextOpt);  && .Context == context.Background() {
			return nil, ErrNoDeadlineContext
		}

		// If the context did not have a deadline, then create a new child context
		// that will use the default timeout from the JS context.
		,  = context.WithTimeout(, )
	} else {
		,  = context.WithCancel()
	}
	defer func() {
		// only cancel the context here if we are sure the fetching goroutine has not been started yet
		if  {
			()
		}
	}()

	// if heartbeat is set, validate it against the context timeout
	if .hb > 0 {
		,  := .Deadline()
		if 2*.hb >= time.Until() {
			return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg)
		}
	}

	// Check if context not done already before making the request.
	select {
	case <-.Done():
		if .ctx != nil { // Timeout or Cancel triggered by context object option
			return nil, .Err()
		} else { // Timeout triggered by timeout option
			return nil, ErrTimeout
		}
	default:
	}

	// The message channel is buffered for the whole batch; done has room
	// for the single completion signal.
	 := &messageBatch{
		msgs: make(chan *Msg, ),
		done: make(chan struct{}, 1),
	}
	var  *Msg
	for  && len(.msgs) <  {
		// Check next msg with booleans that say that this is an internal call
		// for a pull subscribe (so don't reject it) and don't wait if there
		// are no messages.
		,  := .nextMsgWithContext(, true, false)
		if  != nil {
			// Running out of buffered messages is not an error at this stage.
			if errors.Is(, errNoMessages) {
				 = nil
			}
			.err = 
			break
		}
		// Check msg but just to determine if this is a user message
		// or status message, however, we don't care about values of status
		// messages at this point in the Fetch() call, so checkMsg can't
		// return an error.
		if ,  := checkMsg(, false, false);  {
			.msgs <- 
		}
	}
	.mu.Lock()
	 := .closed || .draining
	.mu.Unlock()
	// Finish right away when the batch is already complete, an error was
	// recorded, or the subscription is closed or draining.
	if len(.msgs) ==  || .err != nil ||  {
		close(.msgs)
		if  && len(.msgs) == 0 {
			return nil, errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
		}
		.done <- struct{}{}
		return , nil
	}

	,  := .Deadline()
	 = time.Until()

	// Make our request expiration a bit shorter than the current timeout.
	// The margin is 10% of the remaining time, capped at 5 seconds.
	 := time.Duration(float64() * 0.1)
	if  > 5*time.Second {
		 = 5 * time.Second
	}
	 :=  - 

	 := .StatusChanged()
	var  atomic.Bool
	// Watcher goroutine: ends when the context is done or on the first
	// status change notification, in which case the flag is set.
	go func() {
		select {
		case <-.Done():
		case <-:
			.Store(true)
			()
		}
		.RemoveStatusListener()
	}()
	 :=  - len(.msgs)
	 := nextRequest{
		Expires:   ,
		Batch:     ,
		MaxBytes:  .maxBytes,
		Heartbeat: .hb,
	}
	,  := json.Marshal()
	if  != nil {
		close(.msgs)
		.done <- struct{}{}
		.err = 
		return , nil
	}
	if  := .PublishRequest(, , );  != nil {
		// Without any buffered message the publish error is returned
		// directly; otherwise it is attached to the finished batch.
		if len(.msgs) == 0 {
			return nil, 
		}
		close(.msgs)
		.done <- struct{}{}
		.err = 
		return , nil
	}
	var  *time.Timer
	// NOTE(review): this defer belongs to FetchBatch itself, so it runs when
	// the function returns, which is right after the receive goroutine below
	// has been started. A heartbeat timer created below is therefore stopped
	// at that point and only re-armed by the Reset call inside the
	// goroutine — confirm this is intended.
	defer func() {
		if  != nil {
			.Stop()
		}
	}()
	var  error
	if .hb > 0 {
		// Timer callback: record ErrNoHeartbeat under the batch lock when
		// nothing re-armed the timer within 2*hb.
		 = time.AfterFunc(2*.hb, func() {
			.Lock()
			 = ErrNoHeartbeat
			.Unlock()
			()
		})
	}
	 = false
	go func() {
		defer ()
		var  int
		for  <  {
			// Ask for next message and wait if there are no messages
			,  = .nextMsgWithContext(, true, true)
			if  != nil {
				break
			}
			// Any received message re-arms the heartbeat timer.
			if  != nil {
				.Reset(2 * .hb)
			}
			var  bool

			,  = checkMsg(, true, false)
			if  != nil {
				if errors.Is(, ErrTimeout) {
					if  != "" && !subjectMatchesReqID(.Subject, ) {
						// ignore timeout message from server if it comes from a different pull request
						continue
					}
					// Expiry of our own request ends the batch without an error.
					 = nil
				}
				break
			}
			if  {
				.Lock()
				.msgs <- 
				.Unlock()
				++
			}
		}
		if  != nil {
			// Error precedence: the recorded heartbeat error first, then
			// ErrFetchDisconnected when the status watcher fired, then the
			// receive error as mapped by checkCtxErr.
			.Lock()
			if  != nil {
				.err = 
			} else if .Load() {
				.err = ErrFetchDisconnected
			} else {
				.err = .checkCtxErr()
			}
			.Unlock()
		}
		close(.msgs)
		.Lock()
		.done <- struct{}{}
		.Unlock()
	}()
	return , nil
}

// checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
// It returns ErrTimeout when no context was supplied through the options
// (ctx is nil) and the given error matches context.DeadlineExceeded; in
// every other case the given error is returned unchanged.
func ( *pullOpts) ( error) error {
	if .ctx == nil && errors.Is(, context.DeadlineExceeded) {
		return ErrTimeout
	}
	return 
}

// Looks up consumer info using a fresh context that times out after
// opts.wait, delegating to getConsumerInfoContext. The context is released
// when the call returns.
func ( *js) (,  string) (*ConsumerInfo, error) {
	,  := context.WithTimeout(context.Background(), .opts.wait)
	defer ()
	return .getConsumerInfoContext(, , )
}

// Requests consumer info from the server using the given context. The
// request subject is built from apiConsumerInfoT and passed through apiSubj.
//
// Error mapping: ErrNoResponders from the request becomes
// ErrJetStreamNotEnabled; a response error matching ErrConsumerNotFound or
// ErrStreamNotFound is returned as that sentinel; any other response error
// is returned as is. A response with neither an error nor consumer info is
// reported as ErrConsumerNotFound.
func ( *js) ( context.Context, ,  string) (*ConsumerInfo, error) {
	 := fmt.Sprintf(apiConsumerInfoT, , )
	,  := .apiRequestWithContext(, .apiSubj(), nil)
	if  != nil {
		if errors.Is(, ErrNoResponders) {
			 = ErrJetStreamNotEnabled
		}
		return nil, 
	}

	var  consumerResponse
	if  := json.Unmarshal(.Data, &);  != nil {
		return nil, 
	}
	if .Error != nil {
		if errors.Is(.Error, ErrConsumerNotFound) {
			return nil, ErrConsumerNotFound
		}
		if errors.Is(.Error, ErrStreamNotFound) {
			return nil, ErrStreamNotFound
		}
		return nil, .Error
	}
	// Error is known to be nil at this point, so only the missing consumer
	// info decides.
	if .Error == nil && .ConsumerInfo == nil {
		return nil, ErrConsumerNotFound
	}
	return .ConsumerInfo, nil
}

// a RequestWithContext with tracing via TraceCB
// When opts.shouldTrace is set, the RequestSent hook of opts.ctrace is
// called before the request and the ResponseReceived hook after a successful
// response; each hook is skipped when nil. A request error is returned
// without calling the response hook.
func ( *js) ( context.Context,  string,  []byte) (*Msg, error) {
	if .opts.shouldTrace {
		 := .opts.ctrace
		if .RequestSent != nil {
			.RequestSent(, )
		}
	}
	,  := .nc.RequestWithContext(, , )
	if  != nil {
		return nil, 
	}
	if .opts.shouldTrace {
		 := .opts.ctrace
		if .ResponseReceived != nil {
			.ResponseReceived(, .Data, .Header)
		}
	}

	return , nil
}

// Verifies that the message can be replied to: it returns ErrMsgNotBound
// when the message is nil or has no subscription, ErrMsgNoReply when the
// reply subject is empty, and nil otherwise.
func ( *Msg) () error {
	if  == nil || .Sub == nil {
		return ErrMsgNotBound
	}
	if .Reply == _EMPTY_ {
		return ErrMsgNoReply
	}
	return nil
}

// ackReply handles all acks. Will do the right thing for pull and sync mode.
// It ensures that an ack is only sent a single time, regardless of
// how many times it is being called to avoid duplicated acks.
//
// The ack is sent as a request (waiting for a response) when the sync
// argument is true or when a context or a positive ttl option is given;
// otherwise it is published without waiting. Supplying both a context and a
// ttl yields ErrContextAndTimeout. ErrMsgAlreadyAckd is returned when the
// message was already marked as acked, and ErrCantAckIfConsumerAckNone when
// the subscription's jsi has ackNone set. After a successful send the
// message is marked as acked, except for ackProgress payloads.
func ( *Msg) ( []byte,  bool,  ...AckOpt) error {
	var  ackOpts
	for ,  := range  {
		if  := .configureAck(&);  != nil {
			return 
		}
	}

	if  := .checkReply();  != nil {
		return 
	}

	var  bool
	var  *js

	 := .Sub
	.mu.Lock()
	 := .conn
	if  := .jsi;  != nil {
		 = .js
		 = .ackNone
	}
	.mu.Unlock()

	// Skip if already acked.
	if atomic.LoadUint32(&.ackd) == 1 {
		return ErrMsgAlreadyAckd
	}
	if  {
		return ErrCantAckIfConsumerAckNone
	}

	 := .ctx != nil
	 := .ttl > 0

	// Only allow either AckWait or Context option to set the timeout.
	if  &&  {
		return ErrContextAndTimeout
	}

	 =  ||  || 
	 := .ctx
	// Timeout precedence: explicit ttl option, then the JetStream context's
	// wait when one is available, then defaultRequestWait.
	 := defaultRequestWait
	if  {
		 = .ttl
	} else if  != nil {
		 = .opts.wait
	}

	var  []byte
	var  error
	// This will be > 0 only when called from NakWithDelay()
	if .nakDelay > 0 {
		// The delay is appended to the payload as a JSON object expressed in
		// nanoseconds.
		 = []byte(fmt.Sprintf("%s {\"delay\": %d}", , .nakDelay.Nanoseconds()))
	} else {
		 = 
	}

	if  {
		if  {
			_,  = .RequestWithContext(, .Reply, )
		} else {
			_,  = .Request(.Reply, , )
		}
	} else {
		 = .Publish(.Reply, )
	}

	// Mark that the message has been acked unless it is ackProgress
	// which can be sent many times.
	if  == nil && !bytes.Equal(, ackProgress) {
		atomic.StoreUint32(&.ackd, 1)
	}

	return 
}

// Ack acknowledges a message. This tells the server that the message was
// successfully processed and it can move on to the next message.
// It calls ackReply with the ackAck payload and the sync argument set to
// false.
func ( *Msg) ( ...AckOpt) error {
	return .ackReply(ackAck, false, ...)
}

// AckSync is the synchronous version of Ack. This indicates successful message
// processing.
// It calls ackReply with the ackAck payload and the sync argument set to
// true.
func ( *Msg) ( ...AckOpt) error {
	return .ackReply(ackAck, true, ...)
}

// Nak negatively acknowledges a message. This tells the server to redeliver
// the message. You can configure the number of redeliveries by passing
// nats.MaxDeliver when you Subscribe. The default is infinite redeliveries.
//
// It forwards the ackNak payload and the given options to ackReply, passing
// false as ackReply's bool argument.
func ( *Msg) ( ...AckOpt) error {
	return .ackReply(ackNak, false, ...)
}

// NakWithDelay negatively acknowledges a message. This tells the server to
// redeliver the message after the given `delay` duration. You can configure
// the number of redeliveries by passing nats.MaxDeliver when you Subscribe.
// The default is infinite redeliveries.
//
// When the duration is greater than zero a nakDelay option is appended to the
// supplied options before they are forwarded, together with the ackNak
// payload, to ackReply; otherwise the call is equivalent to a plain Nak.
func ( *Msg) ( time.Duration,  ...AckOpt) error {
	// Only a positive delay adds the extra option.
	if  > 0 {
		 = append(, nakDelay())
	}
	return .ackReply(ackNak, false, ...)
}

// Term tells the server to not redeliver this message, regardless of the value
// of nats.MaxDeliver.
//
// It forwards the ackTerm payload and the given options to ackReply, passing
// false as ackReply's bool argument.
func ( *Msg) ( ...AckOpt) error {
	return .ackReply(ackTerm, false, ...)
}

// InProgress tells the server that this message is being worked on. It resets
// the redelivery timer on the server.
//
// It forwards the ackProgress payload and the given options to ackReply,
// passing false as ackReply's bool argument. ackReply does not mark the
// message as acked for this payload, so it can be sent repeatedly.
func ( *Msg) ( ...AckOpt) error {
	return .ackReply(ackProgress, false, ...)
}

// MsgMetadata is the JetStream metadata associated with received messages.
// In this file it is populated from the tokens of a message's reply subject.
type MsgMetadata struct {
	// Sequence holds the stream and consumer sequence numbers.
	Sequence     SequencePair
	// NumDelivered is parsed from the "num delivered" reply-subject token.
	NumDelivered uint64
	// NumPending is parsed from the "num pending" reply-subject token.
	NumPending   uint64
	// Timestamp is built from a nanosecond Unix timestamp token.
	Timestamp    time.Time
	// Stream is the stream token of the reply subject.
	Stream       string
	// Consumer is the consumer token of the reply subject.
	Consumer     string
	// Domain is the domain token of the reply subject.
	Domain       string
}

// Metadata retrieves the metadata from a JetStream message. This method will
// return an error for non-JetStream Msgs.
//
// The metadata is derived entirely from the message's Reply subject: the
// subject is handed to parser.GetMetadataFields and the resulting tokens are
// read at the parser.Ack*TokenPos positions. Errors from checkReply and from
// parser.GetMetadataFields are returned unchanged with a nil result.
func ( *Msg) () (*MsgMetadata, error) {
	// The message must be bound to a subscription and have a reply subject.
	if  := .checkReply();  != nil {
		return nil, 
	}

	,  := parser.GetMetadataFields(.Reply)
	if  != nil {
		return nil, 
	}

	// Numeric tokens go through parser.ParseNum; the timestamp token is
	// interpreted as nanoseconds since the Unix epoch.
	 := &MsgMetadata{
		Domain:       [parser.AckDomainTokenPos],
		NumDelivered: parser.ParseNum([parser.AckNumDeliveredTokenPos]),
		NumPending:   parser.ParseNum([parser.AckNumPendingTokenPos]),
		Timestamp:    time.Unix(0, int64(parser.ParseNum([parser.AckTimestampSeqTokenPos]))),
		Stream:       [parser.AckStreamTokenPos],
		Consumer:     [parser.AckConsumerTokenPos],
	}
	.Sequence.Stream = parser.ParseNum([parser.AckStreamSeqTokenPos])
	.Sequence.Consumer = parser.ParseNum([parser.AckConsumerSeqTokenPos])
	return , nil
}

// AckPolicy determines how the consumer should acknowledge delivered messages.
type AckPolicy int

// Acknowledgement policies, numbered from zero in declaration order.
const (
	// AckNonePolicy means delivered messages require no acks.
	AckNonePolicy = AckPolicy(iota)

	// AckAllPolicy means that acking a sequence number implicitly acks all
	// sequences below it as well.
	AckAllPolicy

	// AckExplicitPolicy means every message requires its own ack or nack.
	AckExplicitPolicy
)

// ackPolicyNotSet is the untyped sentinel used for the configuration
// mismatch check.
const ackPolicyNotSet = 99

// jsonString returns a string that starts and ends with a double quote
// character, built by plain concatenation (no escaping is performed). The
// unmarshal methods in this file compare raw JSON input against its result.
func jsonString( string) string {
	return "\"" +  + "\""
}

// Decodes an AckPolicy from raw JSON bytes. The quoted strings "none", "all"
// and "explicit" select AckNonePolicy, AckAllPolicy and AckExplicitPolicy;
// anything else yields a "nats: can not unmarshal" error. The comparison is
// an exact, case-sensitive match against jsonString values.
// NOTE(review): the signature matches json.Unmarshaler's UnmarshalJSON —
// confirm.
func ( *AckPolicy) ( []byte) error {
	switch string() {
	case jsonString("none"):
		* = AckNonePolicy
	case jsonString("all"):
		* = AckAllPolicy
	case jsonString("explicit"):
		* = AckExplicitPolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", )
	}

	return nil
}

// Encodes an AckPolicy as a JSON string: "none", "all" or "explicit". Any
// other value (including the not-set sentinel) yields a
// "nats: unknown acknowledgement policy" error and a nil slice.
// NOTE(review): the signature matches json.Marshaler's MarshalJSON — confirm.
func ( AckPolicy) () ([]byte, error) {
	switch  {
	case AckNonePolicy:
		return json.Marshal("none")
	case AckAllPolicy:
		return json.Marshal("all")
	case AckExplicitPolicy:
		return json.Marshal("explicit")
	default:
		return nil, fmt.Errorf("nats: unknown acknowledgement policy %v", )
	}
}

// Returns a human-readable name for an AckPolicy: "AckNone", "AckAll" or
// "AckExplicit". The ackPolicyNotSet sentinel is rendered as
// "Not Initialized" and every other value as "Unknown AckPolicy".
// NOTE(review): the signature matches fmt.Stringer's String — confirm.
func ( AckPolicy) () string {
	switch  {
	case AckNonePolicy:
		return "AckNone"
	case AckAllPolicy:
		return "AckAll"
	case AckExplicitPolicy:
		return "AckExplicit"
	case ackPolicyNotSet:
		return "Not Initialized"
	default:
		return "Unknown AckPolicy"
	}
}

// ReplayPolicy determines how the consumer should replay messages it already has queued in the stream.
type ReplayPolicy int

// Replay policies, numbered from zero in declaration order.
const (
	// ReplayInstantPolicy replays messages as fast as possible.
	ReplayInstantPolicy = ReplayPolicy(iota)

	// ReplayOriginalPolicy keeps the same timing as when the messages were
	// received.
	ReplayOriginalPolicy
)

// replayPolicyNotSet is the untyped sentinel used for the configuration
// mismatch check.
const replayPolicyNotSet = 99

// Decodes a ReplayPolicy from raw JSON bytes. The quoted strings "instant"
// and "original" select ReplayInstantPolicy and ReplayOriginalPolicy;
// anything else yields a "nats: can not unmarshal" error. The comparison is
// an exact, case-sensitive match against jsonString values.
// NOTE(review): the signature matches json.Unmarshaler's UnmarshalJSON —
// confirm.
func ( *ReplayPolicy) ( []byte) error {
	switch string() {
	case jsonString("instant"):
		* = ReplayInstantPolicy
	case jsonString("original"):
		* = ReplayOriginalPolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", )
	}

	return nil
}

// Encodes a ReplayPolicy as a JSON string: "original" or "instant". Any
// other value yields a "nats: unknown replay policy" error and a nil slice.
// NOTE(review): the signature matches json.Marshaler's MarshalJSON — confirm.
func ( ReplayPolicy) () ([]byte, error) {
	switch  {
	case ReplayOriginalPolicy:
		return json.Marshal("original")
	case ReplayInstantPolicy:
		return json.Marshal("instant")
	default:
		return nil, fmt.Errorf("nats: unknown replay policy %v", )
	}
}

// Payloads passed to ackReply by the acknowledgement methods on Msg.
var (
	// ackAck is sent by Ack and AckSync.
	ackAck      = []byte("+ACK")
	// ackNak is sent by Nak and NakWithDelay.
	ackNak      = []byte("-NAK")
	// ackProgress is sent by InProgress; ackReply does not mark the message
	// as acked for this payload.
	ackProgress = []byte("+WPI")
	// ackTerm is sent by Term.
	ackTerm     = []byte("+TERM")
)

// DeliverPolicy determines how the consumer should select the first message to deliver.
type DeliverPolicy int

// Deliver policies, numbered from zero in declaration order.
const (
	// DeliverAllPolicy starts delivery from the very beginning of a stream.
	// This is the default.
	DeliverAllPolicy = DeliverPolicy(iota)

	// DeliverLastPolicy starts the consumer with the last sequence received.
	DeliverLastPolicy

	// DeliverNewPolicy delivers only new messages, i.e. those sent after the
	// consumer is created.
	DeliverNewPolicy

	// DeliverByStartSequencePolicy delivers messages starting from a given
	// sequence.
	DeliverByStartSequencePolicy

	// DeliverByStartTimePolicy delivers messages starting from a given time.
	DeliverByStartTimePolicy

	// DeliverLastPerSubjectPolicy starts the consumer with the last message
	// for all subjects received.
	DeliverLastPerSubjectPolicy
)

// deliverPolicyNotSet is the untyped sentinel used for the configuration
// mismatch check.
const deliverPolicyNotSet = 99

// Decodes a DeliverPolicy from raw JSON bytes. Recognized quoted strings are
// "all" and "undefined" (both DeliverAllPolicy), "last", "new",
// "by_start_sequence", "by_start_time" and "last_per_subject".
//
// Unlike the AckPolicy and ReplayPolicy decoders in this file, there is no
// default case: unrecognized input leaves the target unchanged and nil is
// still returned.
// NOTE(review): the signature matches json.Unmarshaler's UnmarshalJSON —
// confirm. Also confirm whether silently accepting unknown values is
// intentional.
func ( *DeliverPolicy) ( []byte) error {
	switch string() {
	case jsonString("all"), jsonString("undefined"):
		* = DeliverAllPolicy
	case jsonString("last"):
		* = DeliverLastPolicy
	case jsonString("new"):
		* = DeliverNewPolicy
	case jsonString("by_start_sequence"):
		* = DeliverByStartSequencePolicy
	case jsonString("by_start_time"):
		* = DeliverByStartTimePolicy
	case jsonString("last_per_subject"):
		* = DeliverLastPerSubjectPolicy
	}

	return nil
}

// Encodes a DeliverPolicy as a JSON string: "all", "last", "new",
// "by_start_sequence", "by_start_time" or "last_per_subject". Any other
// value yields a "nats: unknown deliver policy" error and a nil slice.
// NOTE(review): the signature matches json.Marshaler's MarshalJSON — confirm.
func ( DeliverPolicy) () ([]byte, error) {
	switch  {
	case DeliverAllPolicy:
		return json.Marshal("all")
	case DeliverLastPolicy:
		return json.Marshal("last")
	case DeliverNewPolicy:
		return json.Marshal("new")
	case DeliverByStartSequencePolicy:
		return json.Marshal("by_start_sequence")
	case DeliverByStartTimePolicy:
		return json.Marshal("by_start_time")
	case DeliverLastPerSubjectPolicy:
		return json.Marshal("last_per_subject")
	default:
		return nil, fmt.Errorf("nats: unknown deliver policy %v", )
	}
}

// RetentionPolicy determines how messages in a set are retained.
type RetentionPolicy int

// Retention policies, numbered from zero in declaration order.
const (
	// LimitsPolicy (default) retains messages until any given limit is
	// reached. This could be one of MaxMsgs, MaxBytes, or MaxAge.
	LimitsPolicy = RetentionPolicy(iota)
	// InterestPolicy allows a message to be removed once all known
	// observables have acknowledged it.
	InterestPolicy
	// WorkQueuePolicy allows a message to be removed once the first worker
	// or subscriber acknowledges it.
	WorkQueuePolicy
)

// DiscardPolicy determines how to proceed when limits of messages or bytes are
// reached.
type DiscardPolicy int

// Discard policies, numbered from zero in declaration order.
const (
	// DiscardOld removes older messages to return to the limits. This is
	// the default.
	DiscardOld = DiscardPolicy(iota)
	// DiscardNew fails to store new messages.
	DiscardNew
)

// JSON string forms of the RetentionPolicy values, shared by the
// RetentionPolicy marshal and unmarshal methods.
const (
	limitsPolicyString    = "limits"    // LimitsPolicy
	interestPolicyString  = "interest"  // InterestPolicy
	workQueuePolicyString = "workqueue" // WorkQueuePolicy
)

// Returns a human-readable name for a RetentionPolicy: "Limits", "Interest"
// or "WorkQueue"; every other value is rendered as
// "Unknown Retention Policy".
// NOTE(review): the signature matches fmt.Stringer's String — confirm.
func ( RetentionPolicy) () string {
	switch  {
	case LimitsPolicy:
		return "Limits"
	case InterestPolicy:
		return "Interest"
	case WorkQueuePolicy:
		return "WorkQueue"
	default:
		return "Unknown Retention Policy"
	}
}

// Encodes a RetentionPolicy as a JSON string using limitsPolicyString,
// interestPolicyString or workQueuePolicyString. Any other value yields a
// "nats: can not marshal" error and a nil slice.
// NOTE(review): the signature matches json.Marshaler's MarshalJSON — confirm.
func ( RetentionPolicy) () ([]byte, error) {
	switch  {
	case LimitsPolicy:
		return json.Marshal(limitsPolicyString)
	case InterestPolicy:
		return json.Marshal(interestPolicyString)
	case WorkQueuePolicy:
		return json.Marshal(workQueuePolicyString)
	default:
		return nil, fmt.Errorf("nats: can not marshal %v", )
	}
}

// Decodes a RetentionPolicy from raw JSON bytes by exact, case-sensitive
// comparison with the quoted forms of limitsPolicyString,
// interestPolicyString and workQueuePolicyString. Anything else yields a
// "nats: can not unmarshal" error.
// NOTE(review): the signature matches json.Unmarshaler's UnmarshalJSON —
// confirm.
func ( *RetentionPolicy) ( []byte) error {
	switch string() {
	case jsonString(limitsPolicyString):
		* = LimitsPolicy
	case jsonString(interestPolicyString):
		* = InterestPolicy
	case jsonString(workQueuePolicyString):
		* = WorkQueuePolicy
	default:
		return fmt.Errorf("nats: can not unmarshal %q", )
	}
	return nil
}

// Returns a human-readable name for a DiscardPolicy: "DiscardOld" or
// "DiscardNew"; every other value is rendered as "Unknown Discard Policy".
// NOTE(review): the signature matches fmt.Stringer's String — confirm.
func ( DiscardPolicy) () string {
	switch  {
	case DiscardOld:
		return "DiscardOld"
	case DiscardNew:
		return "DiscardNew"
	default:
		return "Unknown Discard Policy"
	}
}

// Encodes a DiscardPolicy as a JSON string: "old" or "new". Any other value
// yields a "nats: can not marshal" error and a nil slice.
// NOTE(review): the signature matches json.Marshaler's MarshalJSON — confirm.
func ( DiscardPolicy) () ([]byte, error) {
	switch  {
	case DiscardOld:
		return json.Marshal("old")
	case DiscardNew:
		return json.Marshal("new")
	default:
		return nil, fmt.Errorf("nats: can not marshal %v", )
	}
}

// Decodes a DiscardPolicy from raw JSON bytes. The input is lower-cased with
// strings.ToLower before comparison, so the quoted strings "old" and "new"
// are matched case-insensitively (unlike the other policy decoders in this
// file). Anything else yields a "nats: can not unmarshal" error.
// NOTE(review): the signature matches json.Unmarshaler's UnmarshalJSON —
// confirm.
func ( *DiscardPolicy) ( []byte) error {
	switch strings.ToLower(string()) {
	case jsonString("old"):
		* = DiscardOld
	case jsonString("new"):
		* = DiscardNew
	default:
		return fmt.Errorf("nats: can not unmarshal %q", )
	}
	return nil
}

// StorageType determines how messages are stored for retention.
type StorageType int

// Storage types, numbered from zero in declaration order.
const (
	// FileStorage selects on-disk storage. It is the default.
	FileStorage = StorageType(iota)
	// MemoryStorage selects in-memory-only storage.
	MemoryStorage
)

// JSON string forms of the StorageType values, shared by the StorageType
// marshal and unmarshal methods.
const (
	memoryStorageString = "memory" // MemoryStorage
	fileStorageString   = "file"   // FileStorage
)

// Returns a human-readable name for a StorageType: "Memory" or "File";
// every other value is rendered as "Unknown Storage Type".
// NOTE(review): the signature matches fmt.Stringer's String — confirm.
func ( StorageType) () string {
	switch  {
	case MemoryStorage:
		return "Memory"
	case FileStorage:
		return "File"
	default:
		return "Unknown Storage Type"
	}
}

// Encodes a StorageType as a JSON string using memoryStorageString or
// fileStorageString. Any other value yields a "nats: can not marshal" error
// and a nil slice.
// NOTE(review): the signature matches json.Marshaler's MarshalJSON — confirm.
func ( StorageType) () ([]byte, error) {
	switch  {
	case MemoryStorage:
		return json.Marshal(memoryStorageString)
	case FileStorage:
		return json.Marshal(fileStorageString)
	default:
		return nil, fmt.Errorf("nats: can not marshal %v", )
	}
}

// Decodes a StorageType from raw JSON bytes by exact, case-sensitive
// comparison with the quoted forms of memoryStorageString and
// fileStorageString. Anything else yields a "nats: can not unmarshal" error.
// NOTE(review): the signature matches json.Unmarshaler's UnmarshalJSON —
// confirm.
func ( *StorageType) ( []byte) error {
	switch string() {
	case jsonString(memoryStorageString):
		* = MemoryStorage
	case jsonString(fileStorageString):
		* = FileStorage
	default:
		return fmt.Errorf("nats: can not unmarshal %q", )
	}
	return nil
}

// StoreCompression identifies a store compression algorithm.
type StoreCompression uint8

// Compression algorithms, numbered from zero in declaration order.
const (
	// NoCompression is the zero value; it is encoded as "none" in JSON.
	NoCompression = StoreCompression(iota)
	// S2Compression is encoded as "s2" in JSON.
	S2Compression
)

// Returns a human-readable name for a StoreCompression: "None" or "S2";
// every other value is rendered as "Unknown StoreCompression".
// NOTE(review): the signature matches fmt.Stringer's String — confirm.
func ( StoreCompression) () string {
	switch  {
	case NoCompression:
		return "None"
	case S2Compression:
		return "S2"
	default:
		return "Unknown StoreCompression"
	}
}

// Encodes a StoreCompression as a JSON string: "s2" for S2Compression and
// "none" for NoCompression. Any other value yields an
// "unknown compression algorithm" error and a nil slice.
// NOTE(review): the signature matches json.Marshaler's MarshalJSON — confirm.
func ( StoreCompression) () ([]byte, error) {
	var  string
	switch  {
	case S2Compression:
		 = "s2"
	case NoCompression:
		 = "none"
	default:
		return nil, errors.New("unknown compression algorithm")
	}
	return json.Marshal()
}

// Decodes a StoreCompression from raw JSON bytes. The input is first decoded
// into a Go string with json.Unmarshal (whose error is returned unchanged),
// then matched case-sensitively: "s2" selects S2Compression and "none"
// selects NoCompression. Anything else yields an
// "unknown compression algorithm" error.
// NOTE(review): the signature matches json.Unmarshaler's UnmarshalJSON —
// confirm.
func ( *StoreCompression) ( []byte) error {
	var  string
	if  := json.Unmarshal(, &);  != nil {
		return 
	}
	switch  {
	case "s2":
		* = S2Compression
	case "none":
		* = NoCompression
	default:
		return errors.New("unknown compression algorithm")
	}
	return nil
}

// nameHashLen is the length, in characters, of the hash used for named
// consumers. getHash returns exactly this many characters.
const nameHashLen = 8

// Computes a hash for the given `name`.
//
// A SHA-256 digest is computed, then each of its first nameHashLen bytes is
// replaced by the rdigits entry at index (byte % base). The result is those
// nameHashLen characters as a string.
func getHash( string) string {
	 := sha256.New()
	.Write([]byte())
	 := .Sum(nil)
	// Map the leading digest bytes onto the rdigits alphabet in place.
	for  := 0;  < nameHashLen; ++ {
		[] = rdigits[int([]%base)]
	}
	return string([:nameHashLen])
}