AckWait sets the maximum amount of time we will wait for an ack.
AckWait : AckOpt
AckWait : PubOpt
AckWait : SubOpt
APIError is included in all API responses if there was an error.CodeintDescriptionstringErrorCodeErrorCode APIError implements the JetStreamError interface. Error prints the JetStream API error code and description Is matches against an APIError.
*APIError : JetStreamError
*APIError : error
func (*APIError).APIError() *APIError
func JetStreamError.APIError() *APIError
A Conn represents a bare connection to a nats-server.
It can send and receive []byte payloads.
The connection is safe to use in multiple Go routines concurrently. Opts holds the configuration of the Conn.
Modifying the configuration of a running Conn is a race. Keep all members for which we use atomic at the beginning of the
struct and make sure they are all 64bits (or use padding if necessary).
atomic.* functions crash on 32bit machines if operand is not aligned
at 64bit. See https://github.com/golang/go/issues/599Statistics.InBytesuint64Statistics.InMsgsuint64Statistics.OutBytesuint64Statistics.OutMsgsuint64Statistics.Reconnectsuint64 AuthRequired will return if the connected server requires authorization. Barrier schedules the given function `f` to all registered asynchronous
subscriptions.
Only the last subscription to see this barrier will invoke the function.
If no subscription is registered at the time of this call, `f()` is invoked
right away.
ErrConnectionClosed is returned if the connection is closed prior to
the call. Buffered will return the number of bytes buffered to be sent to the server.
FIXME(dlc) take into account disconnected state. ChanQueueSubscribe will express interest in the given subject.
All subscribers with the same queue name will form the queue group
and only one member of the group will be selected to receive any given message,
which will be placed on the channel.
You should not close the channel until sub.Unsubscribe() has been called.
Note: This is the same than QueueSubscribeSyncWithChan. ChanSubscribe will express interest in the given subject and place
all messages received on the channel.
You should not close the channel until sub.Unsubscribe() has been called. Close will close the connection to the server. This call will release
all blocking calls, such as Flush() and NextMsg() ClosedHandler will return the closed event handler. ConnectedAddr returns the connected server's IP ConnectedClusterName reports the connected server's cluster name if any ConnectedServerId reports the connected server's Id ConnectedServerName reports the connected server's name ConnectedServerVersion reports the connected server's version as a string ConnectedUrl reports the connected server's URL ConnectedUrlRedacted reports the connected server's URL with passwords redacted DisconnectErrHandler will return the disconnect event handler. DiscoveredServers returns only the server urls that have been discovered
after a connection has been established. If authentication is enabled,
use UserInfo or Token when connecting with these urls. DiscoveredServersHandler will return the discovered servers handler. Drain will put a connection into a drain state. All subscriptions will
immediately be put into a drain state. Upon completion, the publishers
will be drained and can not publish any additional messages. Upon draining
of the publishers, the connection will be closed. Use the ClosedCB
option to know when the connection has moved from draining to closed.
See note in Subscription.Drain for JetStream subscriptions. ErrorHandler will return the async error handler. Flush will perform a round trip to the server and return when it
receives the internal reply. FlushTimeout allows a Flush operation to have an associated timeout. FlushWithContext will allow a context to control the duration
of a Flush() call. This context should be non-nil and should
have a deadline set. We will return an error if none is present. ForceReconnect forces a reconnect attempt to the server.
This is a non-blocking call and will start the reconnect
process without waiting for it to complete.
If the connection is already in the process of reconnecting,
this call will force an immediate reconnect attempt (bypassing
the current reconnect delay). GetClientID returns the client ID assigned by the server to which
the client is currently connected to. Note that the value may change if
the client reconnects.
This function returns ErrClientIDNotSupported if the server is of a
version prior to 1.2.0. GetClientIP returns the client IP as known by the server.
Supported as of server version 2.1.6. HeadersSupported will return if the server supports headers IsClosed tests if a Conn has been closed. IsConnected tests if a Conn is connected. IsDraining tests if a Conn is in the draining state. IsReconnecting tests if a Conn is reconnecting. JetStream returns a JetStreamContext for messaging and stream management.
Errors are only returned if inconsistent options are provided.
NOTE: JetStreamContext is part of legacy API.
Users are encouraged to switch to the new JetStream API for enhanced capabilities and
simplified API. Please refer to the `jetstream` package.
See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md LastError reports the last error encountered via the connection.
It can be used reliably within ClosedCB in order to find out reason
why connection was closed for example. MaxPayload returns the size limit that a message payload can have.
This is set by the server configuration and delivered to the client
upon connect. Create a new inbox that is prefix aware. NewRespInbox is the new format used for _INBOX. NumSubscriptions returns active number of subscriptions. Publish publishes the data argument to the given subject. The data
argument is left untouched and needs to be correctly interpreted on
the receiver. PublishMsg publishes the Msg structure, which includes the
Subject, an optional Reply and an optional Data field. PublishRequest will perform a Publish() expecting a response on the
reply subject. Use Request() for automatically waiting for a response
inline. QueueSubscribe creates an asynchronous queue subscriber on the given subject.
All subscribers with the same queue name will form the queue group and
only one member of the group will be selected to receive any given
message asynchronously. QueueSubscribeSync creates a synchronous queue subscriber on the given
subject. All subscribers with the same queue name will form the queue
group and only one member of the group will be selected to receive any
given message synchronously using Subscription.NextMsg(). QueueSubscribeSyncWithChan will express interest in the given subject.
All subscribers with the same queue name will form the queue group
and only one member of the group will be selected to receive any given message,
which will be placed on the channel.
You should not close the channel until sub.Unsubscribe() has been called.
Note: This is the same than ChanQueueSubscribe. RTT calculates the round trip time between this client and the server. ReconnectHandler will return the reconnect event handler. RemoveStatusListener removes a status change listener.
If the channel is not closed, it will be closed.
Listeners will be removed automatically on status change
as well, but this is a way to remove them manually. Request will send a request payload and deliver the response message,
or an error, including a timeout if no message was received properly. RequestMsg will send a request payload including optional headers and deliver
the response message, or an error, including a timeout if no message was received properly. RequestMsgWithContext takes a context, a subject and payload
in bytes and request expecting a single response. RequestWithContext takes a context, a subject and payload
in bytes and request expecting a single response. Servers returns the list of known server urls, including additional
servers discovered after a connection has been established. If
authentication is enabled, use UserInfo or Token when connecting with
these urls. SetClosedHandler will set the closed event handler. SetDisconnectErrHandler will set the disconnect event handler. SetDisconnectHandler will set the disconnect event handler.
Deprecated: Use SetDisconnectErrHandler SetDiscoveredServersHandler will set the discovered servers handler. SetErrorHandler will set the async error handler. SetReconnectHandler will set the reconnect event handler. Stats will return a race safe copy of the Statistics section for the connection. Status returns the current state of the connection. StatusChanged returns a channel on which given list of connection status changes will be reported.
If no statuses are provided, defaults will be used: CONNECTED, RECONNECTING, DISCONNECTED, CLOSED. Subscribe will express interest in the given subject. The subject
can have wildcards.
There are two type of wildcards: * for partial, and > for full.
A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east.
A subscription on subject time.us.> would receive messages sent to
time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east
since it can't match more than one token.
Messages will be delivered to the associated MsgHandler. SubscribeSync will express interest on the given subject. Messages will
be received synchronously using Subscription.NextMsg(). TLSConnectionState retrieves the state of the TLS connection to the server TLSRequired will return if the connected server requires TLS connections.
*Conn : github.com/apache/thrift/lib/go/thrift.Flusher
func Connect(url string, options ...Option) (*Conn, error)
func Options.Connect() (*Conn, error)
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)
func github.com/pancsta/asyncmachine-go/pkg/integrations/nats.Add(ctx context.Context, nc *Conn, topic, machID string, states am.S, args am.A) (am.Result, error)
func github.com/pancsta/asyncmachine-go/pkg/integrations/nats.ExposeMachine(ctx context.Context, mach am.Api, nc *Conn, topic, queue string) error
func github.com/pancsta/asyncmachine-go/pkg/integrations/nats.Remove(ctx context.Context, nc *Conn, machID, topic string, states am.S, args am.A) (am.Result, error)
ContextOpt is an option used to set a context.Context.Contextcontext.Context Deadline returns the time when work done on behalf of this context
should be canceled. Deadline returns ok==false when no deadline is
set. Successive calls to Deadline return the same results. Done returns a channel that's closed when work done on behalf of this
context should be canceled. Done may return nil if this context can
never be canceled. Successive calls to Done return the same value.
The close of the Done channel may happen asynchronously,
after the cancel function returns.
WithCancel arranges for Done to be closed when cancel is called;
WithDeadline arranges for Done to be closed when the deadline
expires; WithTimeout arranges for Done to be closed when the timeout
elapses.
Done is provided for use in select statements:
// Stream generates values with DoSomething and sends them to out
// until DoSomething returns an error or ctx.Done is closed.
func Stream(ctx context.Context, out chan<- Value) error {
for {
v, err := DoSomething(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case out <- v:
}
}
}
See https://blog.golang.org/pipelines for more examples of how to use
a Done channel for cancellation. If Done is not yet closed, Err returns nil.
If Done is closed, Err returns a non-nil error explaining why:
DeadlineExceeded if the context's deadline passed,
or Canceled if the context was canceled for some other reason.
After Err returns a non-nil error, successive calls to Err return the same error. Value returns the value associated with this context for key, or nil
if no value is associated with key. Successive calls to Value with
the same key returns the same result.
Use context values only for request-scoped data that transits
processes and API boundaries, not for passing optional parameters to
functions.
A key identifies a specific value in a Context. Functions that wish
to store values in Context typically allocate a key in a global
variable then use that key as the argument to context.WithValue and
Context.Value. A key can be any type that supports equality;
packages should define keys as an unexported type to avoid
collisions.
Packages that define a Context key should provide type-safe accessors
for the values stored using that key:
// Package user defines a User type that's stored in Contexts.
package user
import "context"
// User is the type of value stored in the Contexts.
type User struct {...}
// key is an unexported type for keys defined in this package.
// This prevents collisions with keys defined in other packages.
type key int
// userKey is the key for user.User values in Contexts. It is
// unexported; clients use user.NewContext and user.FromContext
// instead of using this key directly.
var userKey key
// NewContext returns a new Context that carries value u.
func NewContext(ctx context.Context, u *User) context.Context {
return context.WithValue(ctx, userKey, u)
}
// FromContext returns the User value stored in ctx, if any.
func FromContext(ctx context.Context) (*User, bool) {
u, ok := ctx.Value(userKey).(*User)
return u, ok
}
ContextOpt : AckOpt
ContextOpt : GetObjectInfoOpt
ContextOpt : GetObjectOpt
ContextOpt : JSOpt
ContextOpt : ListObjectsOpt
ContextOpt : ObjectOpt
ContextOpt : PubOpt
ContextOpt : PullOpt
ContextOpt : PurgeOpt
ContextOpt : SubOpt
ContextOpt : WatchOpt
ContextOpt : context.Context
func Context(ctx context.Context) ContextOpt
CustomDialer can be used to specify any dialer, not necessarily a
*net.Dialer. A CustomDialer may also implement `SkipTLSHandshake() bool`
in order to skip the TLS handshake in case not required.( CustomDialer) Dial(network, address string) (net.Conn, error)
github.com/pion/transport/v2.Dialer(interface)
github.com/pion/transport/v2.Net(interface)
*github.com/pion/transport/v2/stdnet.Net
github.com/pion/transport/v3.Dialer(interface)
github.com/pion/transport/v3.Net(interface)
*github.com/pion/transport/v3/stdnet.Net
*github.com/pion/transport/v3/vnet.Net
*github.com/pion/turn/v4/internal/client.TCPAllocation
*crypto/tls.Dialer
*golang.org/x/crypto/ssh.Client
*golang.org/x/net/internal/socks.Dialer
golang.org/x/net/proxy.Dialer(interface)
*golang.org/x/net/proxy.PerHost
*net.Dialer
CustomDialer : github.com/pion/transport/v2.Dialer
CustomDialer : github.com/pion/transport/v3.Dialer
CustomDialer : golang.org/x/net/proxy.Dialer
func SetCustomDialer(dialer CustomDialer) Option
DeleteMarkersOlderThan indicates that delete or purge markers older than that
will be deleted as part of PurgeDeletes() operation, otherwise, only the data
will be removed but markers that are recent will be kept.
Note that if no option is specified, the default is 30 minutes. You can set
this option to a negative value to instruct to always remove the markers,
regardless of their age.
DeleteMarkersOlderThan : PurgeOpt
EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to
a nats server and have an extendable encoder system that will encode and decode messages
from raw Go types.
Deprecated: Encoded connections are no longer supported.Conn*ConnEncEncoder BindRecvChan binds a channel for receive operations from NATS.
Deprecated: Encoded connections are no longer supported. BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
Deprecated: Encoded connections are no longer supported. BindSendChan binds a channel for send operations to NATS.
Deprecated: Encoded connections are no longer supported. Close will close the connection to the server. This call will release
all blocking calls, such as Flush(), etc.
Deprecated: Encoded connections are no longer supported. Drain will put a connection into a drain state. All subscriptions will
immediately be put into a drain state. Upon completion, the publishers
will be drained and can not publish any additional messages. Upon draining
of the publishers, the connection will be closed. Use the ClosedCB()
option to know when the connection has moved from draining to closed.
Deprecated: Encoded connections are no longer supported. Flush will perform a round trip to the server and return when it
receives the internal reply.
Deprecated: Encoded connections are no longer supported. FlushTimeout allows a Flush operation to have an associated timeout.
Deprecated: Encoded connections are no longer supported. LastError reports the last error encountered via the Connection.
Deprecated: Encoded connections are no longer supported. Publish publishes the data argument to the given subject. The data argument
will be encoded using the associated encoder.
Deprecated: Encoded connections are no longer supported. PublishRequest will perform a Publish() expecting a response on the
reply subject. Use Request() for automatically waiting for a response
inline.
Deprecated: Encoded connections are no longer supported. QueueSubscribe will create a queue subscription on the given subject and process
incoming messages using the specified Handler. The Handler should be a func that
matches a signature from the description of Handler from above.
Deprecated: Encoded connections are no longer supported. Request will create an Inbox and perform a Request() call
with the Inbox reply for the data v. A response will be
decoded into the vPtr Response.
Deprecated: Encoded connections are no longer supported. RequestWithContext will create an Inbox and perform a Request
using the provided cancellation context with the Inbox reply
for the data v. A response will be decoded into the vPtr last parameter.
Deprecated: Encoded connections are no longer supported. Subscribe will create a subscription on the given subject and process incoming
messages using the specified Handler. The Handler should be a func that matches
a signature from the description of Handler from above.
Deprecated: Encoded connections are no longer supported.
*EncodedConn : github.com/apache/thrift/lib/go/thrift.Flusher
func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)
ErrConsumerSequenceMismatch represents an error from a consumer
that received a Heartbeat including sequence different to the
one expected from the view of the client. ConsumerSequence is the sequence of the consumer that is behind. LastConsumerSequence is the sequence of the consumer when the heartbeat
was received. StreamResumeSequence is the stream sequence from where the consumer
should resume consuming from the stream.(*ErrConsumerSequenceMismatch) Error() string
*ErrConsumerSequenceMismatch : error
ErrHandler is used to process asynchronous errors encountered
while processing inbound messages.
func (*Conn).ErrorHandler() ErrHandler
func ErrorHandler(cb ErrHandler) Option
func (*Conn).SetErrorHandler(cb ErrHandler)
Handler is a specific callback used for Subscribe. It is generalized to
an any, but we will discover its format and arguments at runtime
and perform the correct callback, including demarshaling encoded data
back into the appropriate struct based on the signature of the Handler.
Handlers are expected to have one of four signatures.
type person struct {
Name string `json:"name,omitempty"`
Age uint `json:"age,omitempty"`
}
handler := func(m *Msg)
handler := func(p *person)
handler := func(subject string, o *obj)
handler := func(subject, reply string, o *obj)
These forms allow a callback to request a raw Msg ptr, where the processing
of the message from the wire is untouched. Process a JSON representation
and demarshal it into the given struct, e.g. person.
There are also variants where the callback wants either the subject, or the
subject and the reply subject.
Deprecated: Encoded connections are no longer supported.
func (*EncodedConn).QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)
func (*EncodedConn).Subscribe(subject string, cb Handler) (*Subscription, error)
Header represents the optional Header for a NATS message,
based on the implementation of http.Header. Add adds the key, value pair to the header. It is case-sensitive
and appends to any existing values associated with key. Del deletes the values associated with a key.
It is case-sensitive. Get gets the first value associated with the given key.
It is case-sensitive. Set sets the header entries associated with key to the single
element value. It is case-sensitive and replaces any existing
values associated with key. Values returns all values associated with the given key.
It is case-sensitive.
Header : github.com/redis/go-redis/v9.ConsistentHash
Header : go.opentelemetry.io/otel/propagation.ValuesGetter
func DecodeHeadersMsg(data []byte) (Header, error)
JetStream allows persistent messaging through JetStream.
NOTE: JetStream is part of legacy API.
Users are encouraged to switch to the new JetStream API for enhanced capabilities and
simplified API. Please refer to the `jetstream` package.
See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md ChanQueueSubscribe creates channel based Subscription with a queue group.
See important note in QueueSubscribe() ChanSubscribe creates channel based Subscription.
See important note in Subscribe() CleanupPublisher will cleanup the publishing side of JetStreamContext.
This will unsubscribe from the internal reply subject if needed.
All pending async publishes will fail with ErrJetStreamPublisherClosed.
If an error handler was provided, it will be called for each pending async
publish and PublishAsyncComplete will be closed.
After completing JetStreamContext is still usable - internal subscription
will be recreated on next publish, but the acks from previous publishes will
be lost. Publish publishes a message to JetStream. PublishAsync publishes a message to JetStream and returns a PubAckFuture.
The data should not be changed until the PubAckFuture has been processed. PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. PublishAsyncPending returns the number of async publishes outstanding for this context. PublishMsg publishes a Msg to JetStream. PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture.
The message should not be changed until the PubAckFuture has been processed. PullSubscribe creates a Subscription that can fetch messages.
See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be
set to an empty string.
When using PullSubscribe, the messages are fetched using Fetch() and FetchBatch() methods. QueueSubscribe creates a Subscription with a queue group.
If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
See important note in Subscribe() QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
See important note in QueueSubscribe() Subscribe creates an async Subscription for JetStream.
The stream and consumer names can be provided with the nats.Bind() option.
For creating an ephemeral (where the consumer name is picked by the server),
you can provide the stream name with nats.BindStream().
If no stream name is specified, the library will attempt to figure out which
stream the subscription is for. See important notes below for more details.
IMPORTANT NOTES:
* If none of the options Bind() nor Durable() are specified, the library will
send a request to the server to create an ephemeral JetStream consumer,
which will be deleted after an Unsubscribe() or Drain(), or automatically
by the server after a short period of time after the NATS subscription is
gone.
* If Durable() option is specified, the library will attempt to lookup a JetStream
consumer with this name, and if found, will bind to it and not attempt to
delete it. However, if not found, the library will send a request to
create such durable JetStream consumer. Note that the library will delete
the JetStream consumer after an Unsubscribe() or Drain() only if it
created the durable consumer while subscribing. If the durable consumer
already existed prior to subscribing it won't be deleted.
* If Bind() option is provided, the library will attempt to lookup the
consumer with the given name, and if successful, bind to it. If the lookup fails,
then the Subscribe() call will return an error. SubscribeSync creates a Subscription that can be used to process messages synchronously.
See important note in Subscribe()JetStreamContext(interface)
JetStreamContext allows JetStream messaging and stream management.
NOTE: JetStreamContext is part of legacy API.
Users are encouraged to switch to the new JetStream API for enhanced capabilities and
simplified API. Please refer to the `jetstream` package.
See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md AccountInfo retrieves info about the JetStream usage from an account. AddConsumer adds a consumer to a stream.
If the consumer already exists, and the configuration is the same, it
will return the existing consumer.
If the consumer already exists, and the configuration is different, it
will return ErrConsumerNameAlreadyInUse. AddStream creates a stream. ChanQueueSubscribe creates channel based Subscription with a queue group.
See important note in QueueSubscribe() ChanSubscribe creates channel based Subscription.
See important note in Subscribe() CleanupPublisher will cleanup the publishing side of JetStreamContext.
This will unsubscribe from the internal reply subject if needed.
All pending async publishes will fail with ErrJetStreamPublisherClosed.
If an error handler was provided, it will be called for each pending async
publish and PublishAsyncComplete will be closed.
After completing JetStreamContext is still usable - internal subscription
will be recreated on next publish, but the acks from previous publishes will
be lost. ConsumerInfo retrieves information of a consumer from a stream. ConsumerNames is used to retrieve a list of Consumer names. Consumers is used to retrieve a list of ConsumerInfo objects. ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
Deprecated: Use Consumers() instead. CreateKeyValue will create a KeyValue store with the following configuration. CreateObjectStore will create an object store. DeleteConsumer deletes a consumer. DeleteKeyValue will delete this KeyValue store (JetStream stream). DeleteMsg deletes a message from a stream. The message is marked as erased, but its value is not overwritten. DeleteObjectStore will delete the underlying stream for the named object. DeleteStream deletes a stream. GetLastMsg retrieves the last raw stream message stored in JetStream by subject.
Use option nats.DirectGet() to trigger retrieval
directly from a distributed group of servers (leader and replicas).
The stream must have been created/updated with the AllowDirect boolean. GetMsg retrieves a raw stream message stored in JetStream by sequence number.
Use options nats.DirectGet() or nats.DirectGetNext() to trigger retrieval
directly from a distributed group of servers (leader and replicas).
The stream must have been created/updated with the AllowDirect boolean. KeyValue will lookup and bind to an existing KeyValue store. KeyValueStoreNames is used to retrieve a list of key value store names KeyValueStores is used to retrieve a list of key value store statuses ObjectStore will look up and bind to an existing object store instance. ObjectStoreNames is used to retrieve a list of bucket names ObjectStores is used to retrieve a list of bucket statuses Publish publishes a message to JetStream. PublishAsync publishes a message to JetStream and returns a PubAckFuture.
The data should not be changed until the PubAckFuture has been processed. PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. PublishAsyncPending returns the number of async publishes outstanding for this context. PublishMsg publishes a Msg to JetStream. PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture.
The message should not be changed until the PubAckFuture has been processed. PullSubscribe creates a Subscription that can fetch messages.
See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be
set to an empty string.
When using PullSubscribe, the messages are fetched using Fetch() and FetchBatch() methods. PurgeStream purges a stream messages. QueueSubscribe creates a Subscription with a queue group.
If no optional durable name nor binding options are specified, the queue name will be used as a durable name.
See important note in Subscribe() QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously.
See important note in QueueSubscribe() SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data
As a result, this operation is slower than DeleteMsg() StreamInfo retrieves information from a stream. StreamNameBySubject returns a stream matching given subject. StreamNames is used to retrieve a list of Stream names. Streams can be used to retrieve a list of StreamInfo objects. StreamsInfo can be used to retrieve a list of StreamInfo objects.
Deprecated: Use Streams() instead. Subscribe creates an async Subscription for JetStream.
The stream and consumer names can be provided with the nats.Bind() option.
For creating an ephemeral (where the consumer name is picked by the server),
you can provide the stream name with nats.BindStream().
If no stream name is specified, the library will attempt to figure out which
stream the subscription is for. See important notes below for more details.
IMPORTANT NOTES:
* If none of the options Bind() nor Durable() are specified, the library will
send a request to the server to create an ephemeral JetStream consumer,
which will be deleted after an Unsubscribe() or Drain(), or automatically
by the server after a short period of time after the NATS subscription is
gone.
* If Durable() option is specified, the library will attempt to lookup a JetStream
consumer with this name, and if found, will bind to it and not attempt to
delete it. However, if not found, the library will send a request to
create such durable JetStream consumer. Note that the library will delete
the JetStream consumer after an Unsubscribe() or Drain() only if it
created the durable consumer while subscribing. If the durable consumer
already existed prior to subscribing it won't be deleted.
* If Bind() option is provided, the library will attempt to lookup the
consumer with the given name, and if successful, bind to it. If the lookup fails,
then the Subscribe() call will return an error. SubscribeSync creates a Subscription that can be used to process messages synchronously.
See important note in Subscribe() UpdateConsumer updates an existing consumer. UpdateStream updates a stream.
JetStreamContext : JetStream
JetStreamContext : JetStreamManager
JetStreamContext : KeyValueManager
JetStreamContext : ObjectStoreManager
func (*Conn).JetStream(opts ...JSOpt) (JetStreamContext, error)
JetStreamManager manages JetStream Streams and Consumers. AccountInfo retrieves info about the JetStream usage from an account. AddConsumer adds a consumer to a stream.
If the consumer already exists, and the configuration is the same, it
will return the existing consumer.
If the consumer already exists, and the configuration is different, it
will return ErrConsumerNameAlreadyInUse. AddStream creates a stream. ConsumerInfo retrieves information of a consumer from a stream. ConsumerNames is used to retrieve a list of Consumer names. Consumers is used to retrieve a list of ConsumerInfo objects. ConsumersInfo is used to retrieve a list of ConsumerInfo objects.
Deprecated: Use Consumers() instead. DeleteConsumer deletes a consumer. DeleteMsg deletes a message from a stream. The message is marked as erased, but its value is not overwritten. DeleteStream deletes a stream. GetLastMsg retrieves the last raw stream message stored in JetStream by subject.
Use option nats.DirectGet() to trigger retrieval
directly from a distributed group of servers (leader and replicas).
The stream must have been created/updated with the AllowDirect boolean. GetMsg retrieves a raw stream message stored in JetStream by sequence number.
Use options nats.DirectGet() or nats.DirectGetNext() to trigger retrieval
directly from a distributed group of servers (leader and replicas).
The stream must have been created/updated with the AllowDirect boolean. PurgeStream purges a stream messages. SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data
As a result, this operation is slower than DeleteMsg() StreamInfo retrieves information from a stream. StreamNameBySubject returns a stream matching given subject. StreamNames is used to retrieve a list of Stream names. Streams can be used to retrieve a list of StreamInfo objects. StreamsInfo can be used to retrieve a list of StreamInfo objects.
Deprecated: Use Streams() instead. UpdateConsumer updates an existing consumer. UpdateStream updates a stream.JetStreamContext(interface)
KeyValue contains methods to operate on a KeyValue store. Bucket returns the current bucket name. Create will add the key/value pair iff it does not exist. Delete will place a delete marker and leave all revisions. Get returns the latest value for the key. GetRevision returns a specific revision value for the key. History will return all historical values for the key. Keys will return all keys.
Deprecated: Use ListKeys instead to avoid memory issues. ListKeys will return all keys in a channel. Purge will place a delete marker and remove all previous revisions. PurgeDeletes will remove all current delete markers. Put will place the new value for the key into the store. PutString will place the string for the key into the store. Status retrieves the status and configuration of a bucket Update will update the value iff the latest revision matches.
Update also resets the TTL associated with the key (if any). Watch for any updates to keys that match the keys argument which could include wildcards.
Watch will send a nil entry when it has received all initial values. WatchAll will invoke the callback for all updates. WatchFiltered will watch for any updates to keys that match the keys
argument. It can be configured with the same options as Watch.
func JetStreamContext.CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
func JetStreamContext.KeyValue(bucket string) (KeyValue, error)
func KeyValueManager.CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
func KeyValueManager.KeyValue(bucket string) (KeyValue, error)
KeyValueBucketStatus represents status of a Bucket, implements KeyValueStatus BackingStore indicates what technology is used for storage of the bucket Bucket the name of the bucket Bytes is the size of the stream History returns the configured history kept per key IsCompressed indicates if the data is compressed on disk StreamInfo is the stream info retrieved to create the status TTL is how long the bucket keeps values for Values is how many messages are in the bucket, including historical values
*KeyValueBucketStatus : KeyValueStatus
KeyValueEntry is a retrieved entry for Get or List or Watch. Bucket is the bucket the data was loaded from. Created is the time the data was put in the bucket. Delta is distance from the latest value. Key is the key that was retrieved. Operation returns Put or Delete or Purge. Revision is a unique sequence for this value. Value is the retrieved value.
func KeyValue.Get(key string) (entry KeyValueEntry, err error)
func KeyValue.GetRevision(key string, revision uint64) (entry KeyValueEntry, err error)
func KeyValue.History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
func KeyWatcher.Updates() <-chan KeyValueEntry
KeyValueManager is used to manage KeyValue stores. CreateKeyValue will create a KeyValue store with the following configuration. DeleteKeyValue will delete this KeyValue store (JetStream stream). KeyValue will lookup and bind to an existing KeyValue store. KeyValueStoreNames is used to retrieve a list of key value store names KeyValueStores is used to retrieve a list of key value store statusesJetStreamContext(interface)
KeyValueStatus is run-time status about a Key-Value bucket BackingStore indicates what technology is used for storage of the bucket Bucket the name of the bucket Bytes returns the size in bytes of the bucket History returns the configured history kept per key IsCompressed indicates if the data is compressed on disk TTL is how long the bucket keeps values for Values is how many messages are in the bucket, including historical values
*KeyValueBucketStatus
func JetStreamContext.KeyValueStores() <-chan KeyValueStatus
func KeyValue.Status() (KeyValueStatus, error)
func KeyValueManager.KeyValueStores() <-chan KeyValueStatus
KeyWatcher is what is returned when doing a watch. Context returns watcher context optionally provided by nats.Context option. Stop will stop this watcher. Updates returns a channel to read any updates to entries.
func KeyValue.Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
func KeyValue.WatchAll(opts ...WatchOpt) (KeyWatcher, error)
func KeyValue.WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error)
MaxWait sets the maximum amount of time we will wait for a response.
MaxWait : JSOpt
MaxWait : PullOpt
MessageBatch provides methods to retrieve messages consumed using [Subscribe.FetchBatch]. Done signals end of execution. Error returns an error encountered when fetching messages. Messages returns a channel on which messages will be published.
func (*Subscription).FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error)
MsgErrHandler is used to process asynchronous errors from
JetStream PublishAsync. It will return the original
message sent to the server for possible retransmitting and the error encountered.
func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt
ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus BackingStore indicates what technology is used for storage of the bucket Bucket is the name of the bucket Description is the description supplied when creating the bucket IsCompressed indicates if the data is compressed on disk Metadata is the metadata supplied when creating the bucket Replicas indicates how many storage replicas are kept for the data in the bucket Sealed indicates the stream is sealed and cannot be modified in any way Size is the combined size of all data in the bucket including metadata, in bytes Storage indicates the underlying JetStream storage technology used to store data StreamInfo is the stream info retrieved to create the status TTL indicates how long objects are kept in the bucket
*ObjectBucketStatus : ObjectStoreStatus
*ObjectBucketStatus : github.com/alexflint/go-arg.Described
ObjectLink is used to embed links to other buckets and objects. Bucket is the name of the other object store. Name can be used to link to a single object.
If empty means this is a link to the whole store, like a directory.
ObjectStore is a blob store capable of storing large objects efficiently in
JetStream streams AddBucketLink will add a link to another object store. AddLink will add a link to another object. Delete will delete the named object. Get will pull the named object from the object store. GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. GetFile is a convenience function to pull an object from this object store and place it in a file. GetInfo will retrieve the current information for the object. GetString is a convenience function to pull an object from this object store and return it as a string. List will list all the objects in this store. Put will place the contents from the reader into a new object. PutBytes is convenience function to put a byte slice into this object store. PutFile is convenience function to put a file into this object store. PutString is convenience function to put a string into this object store. Seal will seal the object store, no further modifications will be allowed. Status retrieves run-time status about the backing store of the bucket. UpdateMeta will update the metadata for the object. Watch for changes in the underlying store and receive meta information updates.
func JetStreamContext.CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
func JetStreamContext.ObjectStore(bucket string) (ObjectStore, error)
func ObjectStoreManager.CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
func ObjectStoreManager.ObjectStore(bucket string) (ObjectStore, error)
func ObjectStore.AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error)
ObjectStoreManager creates, loads and deletes Object Stores CreateObjectStore will create an object store. DeleteObjectStore will delete the underlying stream for the named object. ObjectStore will look up and bind to an existing object store instance. ObjectStoreNames is used to retrieve a list of bucket names ObjectStores is used to retrieve a list of bucket statusesJetStreamContext(interface)
BackingStore provides details about the underlying storage Bucket is the name of the bucket Description is the description supplied when creating the bucket IsCompressed indicates if the data is compressed on disk Metadata is the user supplied metadata for the bucket Replicas indicates how many storage replicas are kept for the data in the bucket Sealed indicates the stream is sealed and cannot be modified in any way Size is the combined size of all data in the bucket including metadata, in bytes Storage indicates the underlying JetStream storage technology used to store data TTL indicates how long objects are kept in the bucket
*ObjectBucketStatus
ObjectStoreStatus : github.com/alexflint/go-arg.Described
func JetStreamContext.ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus
func ObjectStore.Status() (ObjectStoreStatus, error)
func ObjectStoreManager.ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus
ObjectWatcher is what is returned when doing a watch. Stop will stop this watcher. Updates returns a channel to read any updates to entries.
func ObjectStore.Watch(opts ...WatchOpt) (ObjectWatcher, error)
Options can be used to create a customized connection. AllowReconnect enables reconnection logic to be used when we
encounter a disconnect from the current server. AsyncErrorCB sets the async error handler (e.g. slow consumer errors) ClosedCB sets the closed handler that is called when a client will
no longer be connected. For websocket connections, indicates to the server that the connection
supports compression. If the server does too, then data will be compressed. ConnectedCB sets the connected handler called when the initial connection
is established. It is not invoked on successful reconnects - for reconnections,
use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect
to detect whether the initial connect was successful. CustomDialer allows to specify a custom dialer (not necessarily
a *net.Dialer). CustomReconnectDelayCB is invoked after the library tried every
URL in the server list and failed to reconnect. It passes to the
user the current number of attempts. This function returns the
amount of time the library will sleep before attempting to reconnect
again. It is strongly recommended that this value contains some
jitter to prevent all connections to attempt reconnecting at the same time. Dialer allows a custom net.Dialer when forming connections.
Deprecated: should use CustomDialer instead. DisconnectedCB sets the disconnected handler that is called
whenever the connection is disconnected.
Will not be called if DisconnectedErrCB is set
Deprecated. Use DisconnectedErrCB which passes error that caused
the disconnect event. DisconnectedErrCB sets the disconnected error handler that is called
whenever the connection is disconnected.
Disconnected error could be nil, for instance when user explicitly closes the connection.
DisconnectedCB will not be called if DisconnectedErrCB is set DiscoveredServersCB sets the callback that is invoked whenever a new
server has joined the cluster. DrainTimeout sets the timeout for a Drain Operation to complete.
Defaults to 30s. FlusherTimeout is the maximum time to wait for write operations
to the underlying connection to complete (including the flusher loop).
Defaults to 1m. IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting
subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy). InProcessServer represents a NATS server running within the
same process. If this is set then we will attempt to connect
to the server directly rather than using external TCP conns. InboxPrefix allows the default _INBOX prefix to be customized LameDuckModeHandler sets the callback to invoke when the server notifies
the connection that it entered lame duck mode, that is, going to
gradually disconnect all its connections before shutting down. This is
often used in deployments when upgrading NATS Servers. MaxPingsOut is the maximum number of pending ping commands that can
be awaiting a response before raising an ErrStaleConnection error.
Defaults to 2. MaxReconnect sets the number of reconnect attempts that will be
tried before giving up. If negative, then it will never give up
trying to reconnect.
Defaults to 60. Name is an optional name label which will be sent to the server
on CONNECT to identify the client. Nkey sets the public nkey that will be used to authenticate
when connecting to the server. UserJWT and Nkey are mutually exclusive
and if defined, UserJWT will take precedence. NoCallbacksAfterClientClose allows preventing the invocation of
callbacks after Close() is called. Client won't receive notifications
when Close is invoked by user code. Default is to invoke the callbacks. NoEcho configures whether the server will echo back messages
that are sent on this connection if we also have matching subscriptions.
Note this is supported on servers >= version 1.2. Proto 1 or greater. NoRandomize configures whether we will randomize the
server pool. Password sets the password to be used when connecting to a server. Pedantic signals the server whether it should be doing further
validation of subjects. PermissionErrOnSubscribe - if set to true, the client will return ErrPermissionViolation
from SubscribeSync if the server returns a permissions error for a subscription.
Defaults to false. PingInterval is the period at which the client will be sending ping
commands to the server, disabled if 0 or negative.
Defaults to 2m. For websocket connections, adds a path to connections url.
This is useful when connecting to NATS behind a proxy. ReconnectBufSize is the size of the backing bufio during reconnect.
Once this has been exhausted publish operations will return an error.
Defaults to 8388608 bytes (8MB). ReconnectErrCB sets the callback that is invoked whenever a
reconnect attempt failed ReconnectJitter sets the upper bound for a random delay added to
ReconnectWait during a reconnect when no TLS is used.
Defaults to 100ms. ReconnectJitterTLS sets the upper bound for a random delay added to
ReconnectWait during a reconnect when TLS is used.
Defaults to 1s. ReconnectWait sets the time to backoff after attempting a reconnect
to a server that we were already connected to previously.
Defaults to 2s. ReconnectedCB sets the reconnected handler called whenever
the connection is successfully reconnected. RetryOnFailedConnect sets the connection in reconnecting state right
away if it can't connect to a server in the initial set. The
MaxReconnect and ReconnectWait options are used for this process,
similarly to when an established connection is disconnected.
If a ReconnectHandler is set, it will be invoked on the first
successful reconnect attempt (if the initial connect fails),
and if a ClosedHandler is set, it will be invoked if
it fails to connect (after exhausting the MaxReconnect attempts). RootCAsCB is used to fetch and return a set of root certificate
authorities that clients use when verifying server certificates. Secure enables TLS secure connections that skip server
verification by default. NOT RECOMMENDED. Servers is a configured set of servers which this client
will use when attempting to connect. SignatureCB designates the function used to sign the nonce
presented from the server. SkipHostLookup skips the DNS lookup for the server hostname. SubChanLen is the size of the buffered channel used between the socket
Go routine and the message delivery for SyncSubscriptions.
NOTE: This does not affect AsyncSubscriptions which are
dictated by PendingLimits()
Defaults to 65536. TLSCertCB is used to fetch and return custom tls certificate. TLSConfig is a custom TLS configuration to use for secure
transports. TLSHandshakeFirst is used to instruct the library perform
the TLS handshake right after the connect and before receiving
the INFO protocol from the server. If this option is enabled
but the server is not configured to perform the TLS handshake
first, the connection will fail. Timeout sets the timeout for a Dial operation on a connection.
Defaults to 2s. Token sets the token to be used when connecting to a server. TokenHandler designates the function used to generate the token to be used when connecting to a server. Url represents a single NATS server url to which the client
will be connecting. If the Servers option is also set, it
then becomes the first server in the Servers array. UseOldRequestStyle forces the old method of Requests that utilize
a new Inbox and a new Subscription for each request. User sets the username to be used when connecting to the server. UserInfo sets the callback handler that will fetch the username and password. UserJWT sets the callback handler that will fetch a user's JWT. Verbose signals the server to send an OK ack for commands
successfully processed by the server. Connect will attempt to connect to a NATS server with multiple options.
func GetDefaultOptions() Options
var DefaultOptions
ReconnectDelayHandler is used to get from the user the desired
delay the library should pause before attempting to reconnect
again. Note that this is invoked after the library tried the
whole list of URLs and failed to reconnect.
func CustomReconnectDelay(cb ReconnectDelayHandler) Option
RePublish is for republishing messages once committed to a stream. The original
subject cis remapped from the subject pattern to the destination pattern.DestinationstringHeadersOnlyboolSourcestring
RootCAsHandler is used to fetch and return a set of root certificate
authorities that clients use when verifying server certificates.
func ClientTLSConfig(certCB TLSCertHandler, rootCAsCB RootCAsHandler) Option
SequencePair includes the consumer and stream sequence info from a JetStream consumer.Consumeruint64Streamuint64
SignatureHandler is used to sign a nonce from the server while
authenticating with nkeys. The user should sign the nonce and
return the raw signature. The client will base64 encode this to
send to the server.
func Nkey(pubKey string, sigCB SignatureHandler) Option
func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option
StreamConfig will determine the properties for a stream.
There are sensible defaults for most. If no subjects are
given the name will be used as the only subject. AllowDirect enables direct access to individual messages using direct
get API. Defaults to false. AllowMsgTTL allows header initiated per-message TTLs.
This feature requires nats-server v2.11.0 or later. AllowRollup allows the use of the Nats-Rollup header to replace all
contents of a stream, or subject in a stream, with a single new
message. Compression specifies the message storage compression algorithm.
Defaults to NoCompression. ConsumerLimits defines limits of certain values that consumers can
set, defaults for those who don't set these settings DenyDelete restricts the ability to delete messages from a stream via
the API. Defaults to false. DenyPurge restricts the ability to purge messages from a stream via
the API. Defaults to false. Description is an optional description of the stream. Discard defines the policy for handling messages when the stream
reaches its limits in terms of number of messages or total bytes. DiscardNewPerSubject is a flag to enable discarding new messages per
subject when limits are reached. Requires DiscardPolicy to be
DiscardNew and the MaxMsgsPerSubject to be set. Duplicates is the window within which to track duplicate messages.
If not set, server default is 2 minutes. FirstSeq is the initial sequence number of the first message in the
stream. MaxAge is the maximum age of messages that the stream will retain. MaxBytes is the maximum total size of messages the stream will store.
After reaching the limit, stream adheres to the discard policy.
If not set, server default is -1 (unlimited). MaxConsumers specifies the maximum number of consumers allowed for
the stream. MaxMsgSize is the maximum size of any single message in the stream. MaxMsgs is the maximum number of messages the stream will store.
After reaching the limit, stream adheres to the discard policy.
If not set, server default is -1 (unlimited). MaxMsgsPerSubject is the maximum number of messages per subject that
the stream will retain. Metadata is a set of application-defined key-value pairs for
associating metadata on the stream. This feature requires nats-server
v2.10.0 or later. Mirror defines the configuration for mirroring another stream. MirrorDirect enables direct access to individual messages from the
origin stream using direct get API. Defaults to false. Name is the name of the stream. It is required and must be unique
across the JetStream account.
Name Names cannot contain whitespace, ., *, >, path separators
(forward or backwards slash), and non-printable characters. NoAck is a flag to disable acknowledging messages received by this
stream.
If set to true, publish methods from the JetStream client will not
work as expected, since they rely on acknowledgements. Core NATS
publish methods should be used instead. Note that this will make
message delivery less reliable. Placement is used to declare where the stream should be placed via
tags and/or an explicit cluster name. RePublish allows immediate republishing a message to the configured
subject after it's stored. Replicas is the number of stream replicas in clustered JetStream.
Defaults to 1, maximum is 5. Retention defines the message retention policy for the stream.
Defaults to LimitsPolicy. Sealed streams do not allow messages to be published or deleted via limits or API,
sealed streams can not be unsealed via configuration update. Can only
be set on already created streams via the Update API. Sources is a list of other streams this stream sources messages from. Storage specifies the type of storage backend used for the stream
(file or memory). Enables and sets a duration for adding server markers for delete, purge and max age limits.
This feature requires nats-server v2.11.0 or later. SubjectTransform allows applying a transformation to matching
messages' subjects. Subjects is a list of subjects that the stream is listening on.
Wildcards are supported. Subjects cannot be set if the stream is
created as a mirror. Template identifies the template that manages the Stream. Deprecated:
This feature is no longer supported.
func JetStreamContext.AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
func JetStreamContext.UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
func JetStreamManager.AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
func JetStreamManager.UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
StreamInfoRequest contains additional option to returnapiPagedRequest.Offsetint DeletedDetails when true includes information about deleted messages SubjectsFilter when set, returns information on the matched subjects
*StreamInfoRequest : JSOpt
StreamPurgeRequest is optional request information to the purge API. Number of messages to keep. Purge up to but not including sequence. Subject to match against messages for the purge command.
*StreamPurgeRequest : JSOpt
SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.DestinationstringSourcestring
Subscription represents interest in a given subject. Optional queue group name. If present, all subscriptions with the
same name will form a distributed queue, and each message will
only be processed by one member of the group. Subject that represents this subscription. This can be different
than the received subject inside a Msg if this is a wildcard. AutoUnsubscribe will issue an automatic Unsubscribe that is
processed by the server when max messages have been received.
This can be useful when sending a request to an unknown number
of subscribers. ClearMaxPending resets the maximums seen so far.(*Subscription) ConsumerInfo() (*ConsumerInfo, error) Delivered returns the number of delivered messages for this subscription. Drain will remove interest but continue callbacks until all messages
have been processed.
For a JetStream subscription, if the library has created the JetStream
consumer, the library will send a DeleteConsumer request to the server
when the Drain operation completes. If a failure occurs when deleting
the JetStream consumer, an error will be reported to the asynchronous
error callback.
If you do not wish the JetStream consumer to be automatically deleted,
ensure that the consumer is not created by the library, which means
create the consumer with AddConsumer and bind to this consumer. Dropped returns the number of known dropped messages for this subscription.
This will correspond to messages dropped by violations of PendingLimits. If
the server declares the connection a SlowConsumer, this number may not be
valid. Fetch pulls a batch of messages from a stream for a pull consumer. FetchBatch pulls a batch of messages from a stream for a pull consumer.
Unlike [Subscription.Fetch], it is non blocking and returns [MessageBatch],
allowing to retrieve incoming messages from a channel.
The returned channel is always closed after all messages for a batch have been
delivered by the server - it is safe to iterate over it using range.
To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait]
or [nats.Context] (with deadline set).
This method will not return error in case of pull request expiry (even if there are no messages).
Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages. InitialConsumerPending returns the number of messages pending to be
delivered to the consumer when the subscription was created. IsDraining returns a boolean indicating whether the subscription
is being drained.
This will return false if the subscription has already been closed. IsValid returns a boolean indicating whether the subscription
is still active. This will return false if the subscription has
already been closed. MaxPending returns the maximum number of queued messages and queued bytes seen so far. Msgs returns an iter.Seq2[*Msg, error] that can be used to iterate over
messages. It can only be used with a subscription that has been created with
SubscribeSync or QueueSubscribeSync, otherwise it will return an error on the
first iteration.
The iterator will block until a message is available. The
subscription will not be closed when the iterator is done. MsgsTimeout returns an iter.Seq2[*Msg, error] that can be used to iterate
over messages. It can only be used with a subscription that has been created
with SubscribeSync or QueueSubscribeSync, otherwise it will return an error
on the first iteration.
The iterator will block until a message is available or the timeout is
reached. If the timeout is reached, the iterator will return nats.ErrTimeout
but it will not be closed. NextMsg will return the next message available to a synchronous subscriber
or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription),
the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout),
or if there were no responders (ErrNoResponders) when used in the context of a request/reply. NextMsgWithContext takes a context and returns the next message
available to a synchronous subscriber, blocking until it is delivered
or context gets canceled. Pending returns the number of queued messages and queued bytes in the client for this subscription. PendingLimits returns the current limits for this subscription.
If no error is returned, a negative value indicates that the
given metric is not limited. Queued returns the number of queued messages in the client for this subscription.
Deprecated: Use Pending() SetClosedHandler will set the closed handler for when a subscription
is closed (either unsubscribed or drained). SetPendingLimits sets the limits for pending msgs and bytes for this subscription.
Zero is not allowed. Any negative value means that the given metric is not limited. StatusChanged returns a channel on which given list of subscription status
changes will be sent. If no status is provided, all status changes will be sent.
Available statuses are SubscriptionActive, SubscriptionDraining, SubscriptionClosed,
and SubscriptionSlowConsumer.
The returned channel will be closed when the subscription is closed. Type returns the type of Subscription. Unsubscribe will remove interest in the given subject.
For a JetStream subscription, if the library has created the JetStream
consumer, it will send a DeleteConsumer request to the server (if the
unsubscribe itself was successful). If the delete operation fails, the
error will be returned.
If you do not wish the JetStream consumer to be automatically deleted,
ensure that the consumer is not created by the library, which means
create the consumer with AddConsumer and bind to this consumer (using
the nats.Bind() option).
*Subscription : database/sql/driver.Validator
func (*Conn).ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error)
func (*Conn).ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
func (*Conn).QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error)
func (*Conn).QueueSubscribeSync(subj, queue string) (*Subscription, error)
func (*Conn).QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error)
func (*Conn).Subscribe(subj string, cb MsgHandler) (*Subscription, error)
func (*Conn).SubscribeSync(subj string) (*Subscription, error)
func (*EncodedConn).BindRecvChan(subject string, channel any) (*Subscription, error)
func (*EncodedConn).BindRecvQueueChan(subject, queue string, channel any) (*Subscription, error)
func (*EncodedConn).QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error)
func (*EncodedConn).Subscribe(subject string, cb Handler) (*Subscription, error)
func JetStream.ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
func JetStream.ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
func JetStream.PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
func JetStream.QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
func JetStream.QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
func JetStream.Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
func JetStream.SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
func JetStreamContext.ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
func JetStreamContext.ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error)
func JetStreamContext.PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error)
func JetStreamContext.QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
func JetStreamContext.QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error)
func JetStreamContext.Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
func JetStreamContext.SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
AckAll when acking a sequence number, this implicitly acks all sequences
below this one as well.
AckExplicit requires ack or nack for all messages.
AckNone requires no acks for delivered messages.
APIPrefix changes the default prefix used for the JetStream API.
BackOff is an array of time durations that represent the time to delay based on delivery count.
Bind binds a subscription to an existing consumer from a stream without attempting to create.
The first argument is the stream name and the second argument will be the consumer name.
BindStream binds a consumer to a stream explicitly based on a name.
When a stream name is not specified, the library uses the subscribe
subject as a way to find the stream name. It is done by making a request
to the server to get list of stream names that have a filter for this
subject. If the returned list contains a single stream, then this
stream name will be used, otherwise the `ErrNoMatchingStream` is returned.
To avoid the stream lookup, provide the stream name with this function.
See also `Bind()`.
ClientCert is a helper option to provide the client certificate from a file.
If Secure is not already set this will set it as well.
ClientTLSConfig is an Option to set the TLS configuration for secure
connections. It can be used to e.g. set TLS config with cert and root CAs
from memory. For simple use case of loading cert and CAs from file,
ClientCert and RootCAs options are more convenient.
If Secure is not already set this will set it as well.
ClosedHandler is an Option to set the closed handler.
Compression is an Option to indicate if this connection supports
compression. Currently only supported for Websocket connections.
Connect will attempt to connect to the NATS system.
The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
Comma separated arrays are also supported, e.g. urlA, urlB.
Options start with the defaults but can be overridden.
To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as
`ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
ConnectHandler is an Option to set the connected handler.
ConsumerFilterSubjects can be used to set multiple subject filters on the consumer.
It has to be used in conjunction with [nats.BindStream] and
with empty 'subject' parameter.
ConsumerMemoryStorage sets the memory storage to true for a consumer.
ConsumerName sets the name for a consumer.
ConsumerReplicas sets the number of replica count for a consumer.
Context returns an option that can be used to configure a context for APIs
that are context aware such as those part of the JetStream interface.
CustomInboxPrefix configures the request + reply inbox prefix
CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option.
See CustomReconnectDelayCB Option for more details.
DecodeHeadersMsg will decode and headers.
DecodeObjectDigest decodes base64 hash
DeliverAll will configure a Consumer to receive all the
messages from a Stream.
DeliverLast configures a Consumer to receive messages
starting with the latest one.
DeliverLastPerSubject configures a Consumer to receive messages
starting with the latest one for each filtered subject.
DeliverNew configures a Consumer to receive messages
published after the subscription.
DeliverSubject specifies the JetStream consumer deliver subject.
This option is used only in situations where the consumer does not exist
and a creation request is sent to the server. If not provided, an inbox
will be selected.
If a consumer exists, then the NATS subscription will be created on
the JetStream consumer's DeliverSubject, not necessarily this subject.
Description will set the description for the created consumer.
Dialer is an Option to set the dialer which will be used when
attempting to establish a connection.
Deprecated: Should use CustomDialer instead.
DirectGet is an option that can be used to make GetMsg() or GetLastMsg()
retrieve message directly from a group of servers (leader and replicas)
if the stream was created with the AllowDirect option.
DirectGetNext is an option that can be used to make GetMsg() retrieve message
directly from a group of servers (leader and replicas) if the stream was
created with the AllowDirect option.
The server will find the next message matching the filter `subject` starting
at the start sequence (argument in GetMsg()). The filter `subject` can be a
wildcard.
DisconnectErrHandler is an Option to set the disconnected error handler.
DisconnectHandler is an Option to set the disconnected handler.
Deprecated: Use DisconnectErrHandler.
DiscoveredServersHandler is an Option to set the new servers handler.
Domain changes the domain part of JetStream API prefix.
DontRandomize is an Option to turn off randomizing the server pool.
DrainTimeout is an Option to set the timeout for draining a connection.
Defaults to 30s.
Durable defines the consumer name for JetStream durable subscribers.
This function will return ErrInvalidConsumerName if the name contains
any dot ".".
EnableFlowControl enables flow control for a push based consumer.
EncoderForType will return the registered Encoder for the encType.
Deprecated: Encoded connections are no longer supported.
ErrorHandler is an Option to set the async error handler.
ExpectLastMsgId sets the expected last msgId in the response from the publish.
ExpectLastSequence sets the expected sequence in the response from the publish.
ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
ExpectStream sets the expected stream to respond from the publish.
FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
GetDefaultOptions returns default configuration options for the client.
GetObjectDigestValue calculates the base64 value of hashed data
GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted.
GetObjectShowDeleted makes Get() return object if it was marked as deleted.
HeadersOnly() will instruct the consumer to only deliver headers and no payloads.
IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
For pull consumers, idle heartbeat has to be set on each [Fetch] call.
IgnoreAuthErrorAbort opts out of the default connect behavior of aborting
subsequent reconnect attempts if server returns the same auth error twice.
IgnoreDeletes will have the key watcher not pass any deleted keys.
InactiveThreshold indicates how long the server should keep a consumer
after detecting a lack of activity. In NATS Server 2.8.4 and earlier, this
option only applies to ephemeral consumers. In NATS Server 2.9.0 and later,
this option applies to both ephemeral and durable consumers, allowing durable
consumers to also be deleted automatically after the inactivity threshold has
passed.
IncludeHistory instructs the key watcher to include historical values as well.
InProcessServer is an Option that will try to establish a direction to a NATS server
running within the process instead of dialing via TCP.
LameDuckModeHandler sets the callback to invoke when the server notifies
the connection that it entered lame duck mode, that is, going to
gradually disconnect all its connections before shutting down. This is
often used in deployments when upgrading NATS Servers.
LastRevision deletes if the latest revision matches.
ListObjectsShowDeleted makes ListObjects() return deleted objects.
ManualAck disables auto ack functionality for async subscriptions.
MaxAckPending sets the number of outstanding acks that are allowed before
message delivery is halted.
MaxDeliver sets the number of redeliveries for a message.
MaxPingsOutstanding is an Option to set the maximum number of ping requests
that can go unanswered by the server before closing the connection.
Defaults to 2.
MaxReconnects is an Option to set the maximum number of reconnect attempts.
If negative, it will never stop trying to reconnect.
Defaults to 60.
MaxRequestBatch sets the maximum pull consumer batch size that a Fetch()
can request.
MaxRequestExpires sets the maximum pull consumer request expiration that a
Fetch() can request (using the Fetch's timeout value).
MaxRequesMaxBytes sets the maximum pull consumer request bytes that a
Fetch() can receive.
MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value
NewEncodedConn will wrap an existing Connection and utilize the appropriate registered
encoder.
Deprecated: Encoded connections are no longer supported.
NewInbox will return an inbox string which can be used for directed replies from
subscribers. These are guaranteed to be unique, but can be shared and subscribed
to by others.
NewMsg creates a message for publishing that will use headers.
Nkey will set the public Nkey and the signature callback to
sign the server nonce.
NkeyOptionFromSeed will load an nkey pair from a seed file.
It will return the NKey Option and will handle
signing of nonce challenges from the server. It will take
care to not hold keys in memory and to wipe memory.
NoCallbacksAfterClientClose is an Option to disable callbacks when user code
calls Close(). If close is initiated by any other condition, callbacks
if any will be invoked.
NoEcho is an Option to turn off messages echoing back from a server.
Note this is supported on servers >= version 1.2. Proto 1 or greater.
NoReconnect is an Option to turn off reconnect behavior.
OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages.
There are no redeliveries and no acks, and flow control and heartbeats will be added but
will be taken care of without additional client code.
PingInterval is an Option to set the period for client ping commands.
Defaults to 2m.
ProxyPath is an option for websocket connections that adds a path to connections url.
This is useful when connecting to NATS behind a proxy.
PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
PublishAsyncTimeout sets the timeout for async message publish.
If not provided, timeout is disabled.
PullMaxWaiting defines the max inflight pull requests.
RateLimit is the Bits per sec rate limit applied to a push consumer.
ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.
Defaults to 8388608 bytes (8MB). It can be disabled by setting it to -1.
ReconnectErrHandler is an Option to set the reconnect error handler.
ReconnectHandler is an Option to set the reconnected handler.
ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait.
Defaults to 100ms and 1s, respectively.
ReconnectWait is an Option to set the wait time between reconnect attempts.
Defaults to 2s.
RegisterEncoder will register the encType with the given Encoder. Useful for customization.
Deprecated: Encoded connections are no longer supported.
ReplayInstant replays the messages as fast as possible.
ReplayOriginal replays the messages at the original speed.
RetryAttempts sets the retry number of attempts when ErrNoResponders is encountered.
RetryOnFailedConnect sets the connection in reconnecting state right away
if it can't connect to a server in the initial set.
See RetryOnFailedConnect option for more details.
RetryWait sets the retry wait time when ErrNoResponders is encountered.
RootCAs is a helper option to provide the RootCAs pool from a list of filenames.
If Secure is not already set this will set it as well.
Secure is an Option to enable TLS secure connections that skip server verification by default.
Pass a TLS Configuration for proper TLS.
A TLS Configuration using InsecureSkipVerify should NOT be used in a production setting.
SetCustomDialer is an Option to set a custom dialer which will be
used when attempting to establish a connection. If both Dialer
and CustomDialer are specified, CustomDialer takes precedence.
SkipConsumerLookup will omit looking up consumer when [Bind], [Durable]
or [ConsumerName] are provided.
NOTE: This setting may cause an existing consumer to be overwritten. Also,
because consumer lookup is skipped, all consumer options like AckPolicy,
DeliverSubject etc. need to be provided even if consumer already exists.
SkipHostLookup is an Option to skip the host lookup when connecting to a server.
StallWait sets the max wait when the producer becomes stall producing messages.
StartSequence configures a Consumer to receive
messages from a start sequence.
StartTime configures a Consumer to receive
messages from a start time.
StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests.
It allows filtering the returned streams by subject associated with each stream.
Wildcards can be used. For example, `StreamListFilter(FOO.*.A) will return
all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A).
SyncQueueLen will set the maximum queue len for the internal
channel used for SubscribeSync().
Defaults to 65536.
Timeout is an Option to set the timeout for Dial on a connection.
Defaults to 2s.
TLSHandshakeFirst is an Option to perform the TLS handshake first, that is
before receiving the INFO protocol. This requires the server to also be
configured with such option, otherwise the connection will fail.
Token is an Option to set the token to use
when a token is not included directly in the URLs
and when a token handler is not provided.
TokenHandler is an Option to set the token handler to use
when a token is not included directly in the URLs
and when a token is not set.
UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).
UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation.
If this option is used when creating JetStremContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used
to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
UseOldRequestStyle is an Option to force usage of the old Request style.
UserCredentials is a convenience function that takes a filename
for a user's JWT and a filename for the user's private Nkey seed.
UserInfo is an Option to set the username and password to
use when not included directly in the URLs.
UserJWT will set the callbacks to retrieve the user's JWT and
the signature callback to sign the server nonce. This an the Nkey
option are mutually exclusive.
UserJWTAndSeed is a convenience function that takes the JWT and seed
values as strings.
Package-Level Variables (total 125)
Deprecated: Use GetDefaultOptions() instead.
DefaultOptions is not safe for use by multiple clients.
For details see #308.
Errors
ErrAsyncPublishTimeout is returned when waiting for ack on async publish
ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist
ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid
configuration was already created in the server.
ErrConsumerNameAlreadyInUse is an error returned when consumer with given name already exists.
ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNotActive is an error returned when consumer is not active.
ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrContextAndTimeout is returned when attempting to use both context and timeout.
ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.' or ' ').
ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account.
Note: This error will not be returned in clustered mode, even if each
server in the cluster does not have JetStream enabled. In clustered mode,
requests will time out instead.
ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called.
ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer.
Errors
ErrPullSubscribeRequired is returned when attempting to use subscribe methods not suitable for pull consumers for pull consumers.
ErrPullSubscribeToPushConsumer is returned when attempting to use PullSubscribe on push consumer.
Errors
Errors
Errors
Errors
Errors
ErrStreamConfigRequired is returned when empty stream configuration is supplied to add/update stream.
ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameRequired is returned when the provided stream name is empty.
ErrStreamNotFound is an error returned when stream with given name does not exist.
ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting
the stream sources. If this error is returned when executing AddStream(), the stream with invalid
configuration was already created in the server.
ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting
the stream sources. If this error is returned when executing AddStream(), the stream with invalid
configuration was already created in the server.
ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid
configuration was already created in the server.
ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid
configuration was already created in the server.
ErrSubjectMismatch is returned when the provided subject does not match consumer's filter subject.
ErrSubscriptionClosed is returned when attempting to send pull request to a closed subscription
Errors
Errors
Errors
ErrTooManyStalledMsgs is returned when too many outstanding async
messages are waiting for ack.
WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.