package nats

Import Path
	github.com/nats-io/nats.go (on go.dev)

Dependency Relation
	imports 38 packages, and imported by one package

Involved Source Files context.go enc.go js.go jserrors.go jsm.go kv.go nats.go (package doc: A Go client for the NATS messaging system (https://nats.io).) nats_iter.go netchan.go object.go parser.go timer.go ws.go
Code Examples { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, err := nc.JetStream() if err != nil { log.Fatal(err) } js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("Hello JS!")) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) msg.InProgress(nats.AckWait(2)) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() msg.Ack(nats.Context(ctx)) } { nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("Hello JS!"), nats.AckWait(2*time.Second)) sub, _ := js.SubscribeSync("foo", nats.AckWait(10*time.Second)) msg, _ := sub.NextMsg(2 * time.Second) msg.AckSync(nats.AckWait(2 * time.Second)) } { nc, _ := nats.Connect(nats.DefaultURL) nc.Close() } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} for i := 0; i < 1000; i++ { nc.PublishMsg(msg) } err := nc.Flush() if err == nil { } } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} for i := 0; i < 1000; i++ { nc.PublishMsg(msg) } err := nc.FlushTimeout(1 * time.Second) if err == nil { } } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }) nc.ForceReconnect() } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Publish("foo", []byte("Hello World!")) } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World!")} nc.PublishMsg(msg) } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() received := 0 nc.QueueSubscribe("foo", "worker_group", func(_ *nats.Msg) { received++ }) } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() 
nc.Subscribe("foo", func(m *nats.Msg) { nc.Publish(m.Reply, []byte("I will help you")) }) nc.Request("foo", []byte("help"), 50*time.Millisecond) } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() nc.Subscribe("foo", func(m *nats.Msg) { fmt.Printf("Received a message: %s\n", string(m.Data)) }) } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") m, err := sub.NextMsg(1 * time.Second) if err == nil { fmt.Printf("Received a message: %s\n", string(m.Data)) } else { fmt.Println("NextMsg timed out.") } } { nc, _ := nats.Connect("demo.nats.io") nc.Close() nc, _ = nats.Connect("nats://derek:secretpassword@demo.nats.io:4222") nc.Close() nc, _ = nats.Connect("tls://derek:secretpassword@demo.nats.io:4443") nc.Close() opts := nats.Options{ AllowReconnect: true, MaxReconnect: 10, ReconnectWait: 5 * time.Second, Timeout: 1 * time.Second, } nc, _ = opts.Connect() nc.Close() } { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, _ := nc.JetStream() ctx, cancel := context.WithCancel(context.Background()) defer cancel() nctx := nats.Context(ctx) js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }, nctx) tctx, tcancel := context.WithTimeout(nctx, 2*time.Second) defer tcancel() deadlineCtx := nats.Context(tctx) js.Publish("foo", []byte("Hello JS!"), deadlineCtx) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsgWithContext(deadlineCtx) msg.Ack(deadlineCtx) } { sd := &skipTLSDialer{dialer: &net.Dialer{Timeout: 2 * time.Second}, skipTLS: true} nc, _ := nats.Connect("demo.nats.io", nats.SetCustomDialer(sd)) defer nc.Close() } { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, err := nc.JetStream(nats.APIPrefix("dlc"), nats.MaxWait(5*time.Second)) if err != nil { log.Fatal(err) } sub, _ := js.SubscribeSync("foo") js.Publish("foo", []byte("Hello JS!")) sub.NextMsg(2 * time.Second) } { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, err := 
nc.JetStream(nats.PublishAsyncMaxPending(256)) if err != nil { log.Fatal(err) } js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("Hello JS!")) for i := 0; i < 500; i++ { js.PublishAsync("foo", []byte("Hello JS Async!")) } select { case <-js.PublishAsyncComplete(): case <-time.After(5 * time.Second): fmt.Println("Did not resolve in time") } js.Subscribe("foo", func(msg *nats.Msg) { meta, _ := msg.Metadata() fmt.Printf("Stream Sequence : %v\n", meta.Sequence.Stream) fmt.Printf("Consumer Sequence: %v\n", meta.Sequence.Consumer) }) js.Subscribe("foo", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) js.QueueSubscribe("foo", "group", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) msg.Ack() sub, _ = js.QueueSubscribeSync("foo", "group") msg, _ = sub.NextMsg(2 * time.Second) msg.Ack() msgCh := make(chan *nats.Msg, 8192) sub, _ = js.ChanSubscribe("foo", msgCh) select { case msg := <-msgCh: fmt.Println("[Received]", msg) case <-time.After(1 * time.Second): } sub, _ = js.PullSubscribe("foo", "wq", nats.PullMaxWaiting(128)) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for { select { case <-ctx.Done(): return default: } msgs, _ := sub.Fetch(10, nats.Context(ctx)) for _, msg := range msgs { msg.Ack() } } } { nc, _ := nats.Connect("localhost") var js nats.JetStream var jsm nats.JetStreamManager var jsctx nats.JetStreamContext js, _ = nc.JetStream() js.Publish("foo", []byte("hello")) jsm, _ = nc.JetStream() jsm.AddStream(&nats.StreamConfig{Name: "FOO"}) jsctx, _ = nc.JetStream() jsctx.AddStream(&nats.StreamConfig{Name: "BAR"}) jsctx.Publish("bar", []byte("hello world")) } { nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, MaxBytes: 1024, }) js.UpdateStream(&nats.StreamConfig{ Name: "FOO", MaxBytes: 2048, }) 
js.AddConsumer("FOO", &nats.ConsumerConfig{ Durable: "BAR", }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() for info := range js.StreamsInfo(nats.Context(ctx)) { fmt.Println("stream name:", info.Config.Name) } for info := range js.ConsumersInfo("FOO", nats.MaxWait(10*time.Second)) { fmt.Println("consumer name:", info.Name) } js.DeleteConsumer("FOO", "BAR") js.DeleteStream("FOO") } { nc, _ := nats.Connect("localhost") js, _ := nc.JetStream(nats.MaxWait(3 * time.Second)) js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }, nats.MaxWait(2*time.Second)) sub, _ := js.PullSubscribe("foo", "my-durable-name") msgs, _ := sub.Fetch(1) msgs, _ = sub.Fetch(1, nats.MaxWait(2*time.Second)) for _, msg := range msgs { msg.Ack() } } { nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) msg.AckSync() } { nc, _ := nats.Connect("localhost") js, _ := nc.JetStream() js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("hello")) sub, _ := js.SubscribeSync("foo") msg, _ := sub.NextMsg(2 * time.Second) meta, _ := msg.Metadata() fmt.Printf("Stream seq: %s:%d, Consumer seq: %s:%d\n", meta.Stream, meta.Sequence.Stream, meta.Consumer, meta.Sequence.Consumer) fmt.Printf("Pending: %d\n", meta.NumPending) fmt.Printf("Pending: %d\n", meta.NumDelivered) } { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, err := nc.JetStream() if err != nil { log.Fatal(err) } js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("Hello JS!")) js.Publish("foo", []byte("Hello JS!"), nats.AckWait(500*time.Millisecond)) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() js.Publish("foo", []byte("Hello JS!"), nats.Context(ctx)) js.Publish("foo", 
[]byte("Hello JS!"), nats.ExpectStream("FOO")) js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastSequence(5)) js.Publish("foo", []byte("Hello JS!"), nats.MsgId("foo:6")) js.Publish("foo", []byte("Hello JS!"), nats.ExpectLastMsgId("foo:6")) } { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, err := nc.JetStream() if err != nil { log.Fatal(err) } js.AddStream(&nats.StreamConfig{ Name: "FOO", Subjects: []string{"foo"}, }) js.Publish("foo", []byte("Hello JS!")) sub, _ := js.PullSubscribe("foo", "wq") msgs, _ := sub.Fetch(1, nats.MaxWait(2*time.Second)) for _, msg := range msgs { msg.Ack() } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() msgs, _ = sub.Fetch(1, nats.Context(ctx)) for _, msg := range msgs { msg.Ack() } } { nc, err := nats.Connect("localhost") if err != nil { log.Fatal(err) } js, err := nc.JetStream() if err != nil { log.Fatal(err) } js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckAll()) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckExplicit()) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.AckNone()) js.Subscribe("foo", func(msg *nats.Msg) { msg.Ack() }, nats.ManualAck()) sub, _ := js.SubscribeSync("origin", nats.BindStream("m1")) msg, _ := sub.NextMsg(2 * time.Second) msg.Ack() js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverAll()) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverLast()) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.DeliverNew()) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a 
message: %s\n", string(msg.Data)) }, nats.Durable("FOO")) js.SubscribeSync("foo", nats.AckWait(30*time.Second), nats.MaxDeliver(1), nats.EnableFlowControl(), nats.IdleHeartbeat(500*time.Millisecond), ) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.MaxAckPending(5)) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.MaxDeliver(5)) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.PullMaxWaiting(5)) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.PullMaxWaiting(5)) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.RateLimit(1024)) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.ReplayOriginal()) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.StartSequence(10)) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.StartTime(time.Now().Add(-2*time.Hour))) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.ManualAck(), nats.MaxDeliver(2), nats.BackOff([]time.Duration{50 * time.Millisecond, 250 * time.Millisecond})) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.ConsumerReplicas(1)) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.ConsumerMemoryStorage()) js.Subscribe("foo", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, nats.Durable("FOO"), nats.SkipConsumerLookup()) js.Subscribe("", func(msg *nats.Msg) { fmt.Printf("Received a message: %s\n", string(msg.Data)) }, 
nats.Durable("FOO"), nats.ConsumerFilterSubjects("foo", "bar"), nats.BindStream("test_stream")) } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() received, wanted, total := 0, 10, 100 sub, _ := nc.Subscribe("foo", func(_ *nats.Msg) { received++ }) sub.AutoUnsubscribe(wanted) for i := 0; i < total; i++ { nc.Publish("foo", []byte("Hello")) } nc.Flush() fmt.Printf("Received = %d", received) } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") m, err := sub.NextMsg(1 * time.Second) if err == nil { fmt.Printf("Received a message: %s\n", string(m.Data)) } else { fmt.Println("NextMsg timed out.") } } { nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() sub, _ := nc.SubscribeSync("foo") sub.Unsubscribe() }
Package-Level Type Names (total 108)
/* sort by: | */
AccountInfo contains info about the JetStream usage from the current account. API APIStats Domain string Tier Tier Tier.Consumers int Tier.Limits AccountLimits Tier.Memory uint64 Tier.ReservedMemory uint64 Tier.ReservedStore uint64 Tier.Store uint64 Tier.Streams int Tiers map[string]Tier func JetStreamContext.AccountInfo(opts ...JSOpt) (*AccountInfo, error) func JetStreamManager.AccountInfo(opts ...JSOpt) (*AccountInfo, error)
AccountLimits includes the JetStream limits of the current account. MaxAckPending int MaxBytesRequired bool MaxConsumers int MaxMemory int64 MaxStore int64 MaxStreams int MemoryMaxStreamBytes int64 StoreMaxStreamBytes int64
AckOpt are the options that can be passed when acknowledging a message. AckWait ContextOpt func (*Msg).Ack(opts ...AckOpt) error func (*Msg).AckSync(opts ...AckOpt) error func (*Msg).InProgress(opts ...AckOpt) error func (*Msg).Nak(opts ...AckOpt) error func (*Msg).NakWithDelay(delay time.Duration, opts ...AckOpt) error func (*Msg).Term(opts ...AckOpt) error
AckPolicy determines how the consumer should acknowledge delivered messages. ( AckPolicy) MarshalJSON() ([]byte, error) ( AckPolicy) String() string (*AckPolicy) UnmarshalJSON(data []byte) error AckPolicy : github.com/goccy/go-json.Marshaler *AckPolicy : github.com/goccy/go-json.Unmarshaler AckPolicy : encoding/json.Marshaler *AckPolicy : encoding/json.Unmarshaler AckPolicy : expvar.Var AckPolicy : fmt.Stringer const AckAllPolicy const AckExplicitPolicy const AckNonePolicy
AckWait sets the maximum amount of time we will wait for an ack. AckWait : AckOpt AckWait : PubOpt AckWait : SubOpt
APIError is included in all API responses if there was an error. Code int Description string ErrorCode ErrorCode APIError implements the JetStreamError interface. Error prints the JetStream API error code and description Is matches against an APIError. *APIError : JetStreamError *APIError : error func (*APIError).APIError() *APIError func JetStreamError.APIError() *APIError
APIStats reports on API calls to JetStream for this account. Errors uint64 Total uint64
AuthTokenHandler is used to generate a new token. func TokenHandler(cb AuthTokenHandler) Option
ClientTrace can be used to trace API interactions for the JetStream Context. RequestSent func(subj string, payload []byte) ResponseReceived func(subj string, payload []byte, hdr Header) ClientTrace : JSOpt
ClusterInfo shows information about the underlying set of servers that make up the stream or consumer. Leader string Name string Replicas []*PeerInfo
A Conn represents a bare connection to a nats-server. It can send and receive []byte payloads. The connection is safe to use in multiple Go routines concurrently. Opts holds the configuration of the Conn. Modifying the configuration of a running Conn is a race. Keep all members for which we use atomic at the beginning of the struct and make sure they are all 64bits (or use padding if necessary). atomic.* functions crash on 32bit machines if operand is not aligned at 64bit. See https://github.com/golang/go/issues/599 Statistics.InBytes uint64 Statistics.InMsgs uint64 Statistics.OutBytes uint64 Statistics.OutMsgs uint64 Statistics.Reconnects uint64 AuthRequired will return if the connected server requires authorization. Barrier schedules the given function `f` to all registered asynchronous subscriptions. Only the last subscription to see this barrier will invoke the function. If no subscription is registered at the time of this call, `f()` is invoked right away. ErrConnectionClosed is returned if the connection is closed prior to the call. Buffered will return the number of bytes buffered to be sent to the server. FIXME(dlc) take into account disconnected state. ChanQueueSubscribe will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same than QueueSubscribeSyncWithChan. ChanSubscribe will express interest in the given subject and place all messages received on the channel. You should not close the channel until sub.Unsubscribe() has been called. Close will close the connection to the server. This call will release all blocking calls, such as Flush() and NextMsg() ClosedHandler will return the closed event handler. 
ConnectedAddr returns the connected server's IP ConnectedClusterName reports the connected server's cluster name if any ConnectedServerId reports the connected server's Id ConnectedServerName reports the connected server's name ConnectedServerVersion reports the connected server's version as a string ConnectedUrl reports the connected server's URL ConnectedUrlRedacted reports the connected server's URL with passwords redacted DisconnectErrHandler will return the disconnect event handler. DiscoveredServers returns only the server urls that have been discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls. DiscoveredServersHandler will return the discovered servers handler. Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedCB option to know when the connection has moved from draining to closed. See note in Subscription.Drain for JetStream subscriptions. ErrorHandler will return the async error handler. Flush will perform a round trip to the server and return when it receives the internal reply. FlushTimeout allows a Flush operation to have an associated timeout. FlushWithContext will allow a context to control the duration of a Flush() call. This context should be non-nil and should have a deadline set. We will return an error if none is present. ForceReconnect forces a reconnect attempt to the server. This is a non-blocking call and will start the reconnect process without waiting for it to complete. If the connection is already in the process of reconnecting, this call will force an immediate reconnect attempt (bypassing the current reconnect delay). GetClientID returns the client ID assigned by the server to which the client is currently connected to. 
Note that the value may change if the client reconnects. This function returns ErrClientIDNotSupported if the server is of a version prior to 1.2.0. GetClientIP returns the client IP as known by the server. Supported as of server version 2.1.6. HeadersSupported will return if the server supports headers IsClosed tests if a Conn has been closed. IsConnected tests if a Conn is connected. IsDraining tests if a Conn is in the draining state. IsReconnecting tests if a Conn is reconnecting. JetStream returns a JetStreamContext for messaging and stream management. Errors are only returned if inconsistent options are provided. NOTE: JetStreamContext is part of legacy API. Users are encouraged to switch to the new JetStream API for enhanced capabilities and simplified API. Please refer to the `jetstream` package. See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md LastError reports the last error encountered via the connection. It can be used reliably within ClosedCB in order to find out reason why connection was closed for example. MaxPayload returns the size limit that a message payload can have. This is set by the server configuration and delivered to the client upon connect. Create a new inbox that is prefix aware. NewRespInbox is the new format used for _INBOX. NumSubscriptions returns active number of subscriptions. Publish publishes the data argument to the given subject. The data argument is left untouched and needs to be correctly interpreted on the receiver. PublishMsg publishes the Msg structure, which includes the Subject, an optional Reply and an optional Data field. PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline. QueueSubscribe creates an asynchronous queue subscriber on the given subject. 
All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message asynchronously. QueueSubscribeSync creates a synchronous queue subscriber on the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message synchronously using Subscription.NextMsg(). QueueSubscribeSyncWithChan will express interest in the given subject. All subscribers with the same queue name will form the queue group and only one member of the group will be selected to receive any given message, which will be placed on the channel. You should not close the channel until sub.Unsubscribe() has been called. Note: This is the same than ChanQueueSubscribe. RTT calculates the round trip time between this client and the server. ReconnectHandler will return the reconnect event handler. RemoveStatusListener removes a status change listener. If the channel is not closed, it will be closed. Listeners will be removed automatically on status change as well, but this is a way to remove them manually. Request will send a request payload and deliver the response message, or an error, including a timeout if no message was received properly. RequestMsg will send a request payload including optional headers and deliver the response message, or an error, including a timeout if no message was received properly. RequestMsgWithContext takes a context, a subject and payload in bytes and request expecting a single response. RequestWithContext takes a context, a subject and payload in bytes and request expecting a single response. Servers returns the list of known server urls, including additional servers discovered after a connection has been established. If authentication is enabled, use UserInfo or Token when connecting with these urls. SetClosedHandler will set the closed event handler. 
SetDisconnectErrHandler will set the disconnect event handler. SetDisconnectHandler will set the disconnect event handler. Deprecated: Use SetDisconnectErrHandler SetDiscoveredServersHandler will set the discovered servers handler. SetErrorHandler will set the async error handler. SetReconnectHandler will set the reconnect event handler. Stats will return a race safe copy of the Statistics section for the connection. Status returns the current state of the connection. StatusChanged returns a channel on which given list of connection status changes will be reported. If no statuses are provided, defaults will be used: CONNECTED, RECONNECTING, DISCONNECTED, CLOSED. Subscribe will express interest in the given subject. The subject can have wildcards. There are two type of wildcards: * for partial, and > for full. A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east. A subscription on subject time.us.> would receive messages sent to time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east since it can't match more than one token. Messages will be delivered to the associated MsgHandler. SubscribeSync will express interest on the given subject. Messages will be received synchronously using Subscription.NextMsg(). TLSConnectionState retrieves the state of the TLS connection to the server TLSRequired will return if the connected server requires TLS connections. 
*Conn : github.com/apache/thrift/lib/go/thrift.Flusher func Connect(url string, options ...Option) (*Conn, error) func Options.Connect() (*Conn, error) func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error) func github.com/pancsta/asyncmachine-go/pkg/integrations/nats.Add(ctx context.Context, nc *Conn, topic, machID string, states am.S, args am.A) (am.Result, error) func github.com/pancsta/asyncmachine-go/pkg/integrations/nats.ExposeMachine(ctx context.Context, mach am.Api, nc *Conn, topic, queue string) error func github.com/pancsta/asyncmachine-go/pkg/integrations/nats.Remove(ctx context.Context, nc *Conn, machID, topic string, states am.S, args am.A) (am.Result, error)
ConnErrHandler is used to process asynchronous events, such as a disconnected connection, together with the associated error (if any). func (*Conn).DisconnectErrHandler() ConnErrHandler func DisconnectErrHandler(cb ConnErrHandler) Option func ReconnectErrHandler(cb ConnErrHandler) Option func (*Conn).SetDisconnectErrHandler(dcb ConnErrHandler)
ConnHandler is used for asynchronous events such as disconnected and closed connections. func (*Conn).ClosedHandler() ConnHandler func (*Conn).DiscoveredServersHandler() ConnHandler func (*Conn).ReconnectHandler() ConnHandler func ClosedHandler(cb ConnHandler) Option func ConnectHandler(cb ConnHandler) Option func DisconnectHandler(cb ConnHandler) Option func DiscoveredServersHandler(cb ConnHandler) Option func LameDuckModeHandler(cb ConnHandler) Option func ReconnectHandler(cb ConnHandler) Option func (*Conn).SetClosedHandler(cb ConnHandler) func (*Conn).SetDisconnectHandler(dcb ConnHandler) func (*Conn).SetDiscoveredServersHandler(dscb ConnHandler) func (*Conn).SetReconnectHandler(rcb ConnHandler)
ConsumerConfig is the configuration of a JetStream consumer. AckPolicy AckPolicy AckWait time.Duration BackOff []time.Duration DeliverGroup string DeliverPolicy DeliverPolicy Push based consumers. Description string Durable string FilterSubject string FilterSubjects []string FlowControl bool HeadersOnly bool Heartbeat time.Duration Inactivity threshold. MaxAckPending int MaxDeliver int Pull based options. MaxRequestExpires time.Duration MaxRequestMaxBytes int MaxWaiting int Force memory storage. Metadata is additional metadata for the Consumer. Keys starting with `_nats` are reserved. NOTE: Metadata requires nats-server v2.10.0+ Name string OptStartSeq uint64 OptStartTime *time.Time // Bits per sec ReplayPolicy ReplayPolicy Generally inherited by parent stream and other markers, now can be configured directly. SampleFrequency string func JetStreamContext.AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamContext.UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamManager.AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamManager.UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error)
ConsumerInfo is the info from a JetStream consumer. AckFloor SequenceInfo Cluster *ClusterInfo Config ConsumerConfig Created time.Time Delivered SequenceInfo Name string NumAckPending int NumPending uint64 NumRedelivered int NumWaiting int PushBound bool Stream string func JetStreamContext.AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamContext.ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamContext.Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo func JetStreamContext.ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo func JetStreamContext.UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamManager.AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamManager.ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamManager.Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo func JetStreamManager.ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo func JetStreamManager.UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func (*Subscription).ConsumerInfo() (*ConsumerInfo, error)
ContextOpt is an option used to set a context.Context. Context context.Context Deadline returns the time when work done on behalf of this context should be canceled. Deadline returns ok==false when no deadline is set. Successive calls to Deadline return the same results. Done returns a channel that's closed when work done on behalf of this context should be canceled. Done may return nil if this context can never be canceled. Successive calls to Done return the same value. The close of the Done channel may happen asynchronously, after the cancel function returns. WithCancel arranges for Done to be closed when cancel is called; WithDeadline arranges for Done to be closed when the deadline expires; WithTimeout arranges for Done to be closed when the timeout elapses. Done is provided for use in select statements: // Stream generates values with DoSomething and sends them to out // until DoSomething returns an error or ctx.Done is closed. func Stream(ctx context.Context, out chan<- Value) error { for { v, err := DoSomething(ctx) if err != nil { return err } select { case <-ctx.Done(): return ctx.Err() case out <- v: } } } See https://blog.golang.org/pipelines for more examples of how to use a Done channel for cancellation. If Done is not yet closed, Err returns nil. If Done is closed, Err returns a non-nil error explaining why: DeadlineExceeded if the context's deadline passed, or Canceled if the context was canceled for some other reason. After Err returns a non-nil error, successive calls to Err return the same error. Value returns the value associated with this context for key, or nil if no value is associated with key. Successive calls to Value with the same key returns the same result. Use context values only for request-scoped data that transits processes and API boundaries, not for passing optional parameters to functions. A key identifies a specific value in a Context. 
Functions that wish to store values in Context typically allocate a key in a global variable then use that key as the argument to context.WithValue and Context.Value. A key can be any type that supports equality; packages should define keys as an unexported type to avoid collisions. Packages that define a Context key should provide type-safe accessors for the values stored using that key: // Package user defines a User type that's stored in Contexts. package user import "context" // User is the type of value stored in the Contexts. type User struct {...} // key is an unexported type for keys defined in this package. // This prevents collisions with keys defined in other packages. type key int // userKey is the key for user.User values in Contexts. It is // unexported; clients use user.NewContext and user.FromContext // instead of using this key directly. var userKey key // NewContext returns a new Context that carries value u. func NewContext(ctx context.Context, u *User) context.Context { return context.WithValue(ctx, userKey, u) } // FromContext returns the User value stored in ctx, if any. func FromContext(ctx context.Context) (*User, bool) { u, ok := ctx.Value(userKey).(*User) return u, ok } ContextOpt : AckOpt ContextOpt : GetObjectInfoOpt ContextOpt : GetObjectOpt ContextOpt : JSOpt ContextOpt : ListObjectsOpt ContextOpt : ObjectOpt ContextOpt : PubOpt ContextOpt : PullOpt ContextOpt : PurgeOpt ContextOpt : SubOpt ContextOpt : WatchOpt ContextOpt : context.Context func Context(ctx context.Context) ContextOpt
CustomDialer can be used to specify any dialer, not necessarily a *net.Dialer. A CustomDialer may also implement `SkipTLSHandshake() bool` in order to skip the TLS handshake in case not required. ( CustomDialer) Dial(network, address string) (net.Conn, error) github.com/pion/transport/v2.Dialer (interface) github.com/pion/transport/v2.Net (interface) *github.com/pion/transport/v2/stdnet.Net github.com/pion/transport/v3.Dialer (interface) github.com/pion/transport/v3.Net (interface) *github.com/pion/transport/v3/stdnet.Net *github.com/pion/transport/v3/vnet.Net *github.com/pion/turn/v4/internal/client.TCPAllocation *crypto/tls.Dialer *golang.org/x/crypto/ssh.Client *golang.org/x/net/internal/socks.Dialer golang.org/x/net/proxy.Dialer (interface) *golang.org/x/net/proxy.PerHost *net.Dialer CustomDialer : github.com/pion/transport/v2.Dialer CustomDialer : github.com/pion/transport/v3.Dialer CustomDialer : golang.org/x/net/proxy.Dialer func SetCustomDialer(dialer CustomDialer) Option
DeleteMarkersOlderThan indicates that delete or purge markers older than this value will be deleted as part of the PurgeDeletes() operation; otherwise, only the data will be removed and markers that are recent will be kept. Note that if no option is specified, the default is 30 minutes. You can set this option to a negative value to always remove the markers, regardless of their age. DeleteMarkersOlderThan : PurgeOpt
func LastRevision(revision uint64) DeleteOpt func KeyValue.Delete(key string, opts ...DeleteOpt) error func KeyValue.Purge(key string, opts ...DeleteOpt) error
DeliverPolicy determines how the consumer should select the first message to deliver. ( DeliverPolicy) MarshalJSON() ([]byte, error) (*DeliverPolicy) UnmarshalJSON(data []byte) error DeliverPolicy : github.com/goccy/go-json.Marshaler *DeliverPolicy : github.com/goccy/go-json.Unmarshaler DeliverPolicy : encoding/json.Marshaler *DeliverPolicy : encoding/json.Unmarshaler const DeliverAllPolicy const DeliverByStartSequencePolicy const DeliverByStartTimePolicy const DeliverLastPerSubjectPolicy const DeliverLastPolicy const DeliverNewPolicy
DiscardPolicy determines how to proceed when limits of messages or bytes are reached. ( DiscardPolicy) MarshalJSON() ([]byte, error) ( DiscardPolicy) String() string (*DiscardPolicy) UnmarshalJSON(data []byte) error DiscardPolicy : github.com/goccy/go-json.Marshaler *DiscardPolicy : github.com/goccy/go-json.Unmarshaler DiscardPolicy : encoding/json.Marshaler *DiscardPolicy : encoding/json.Unmarshaler DiscardPolicy : expvar.Var DiscardPolicy : fmt.Stringer const DiscardNew const DiscardOld
EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to a nats server and have an extendable encoder system that will encode and decode messages from raw Go types. Deprecated: Encoded connections are no longer supported. Conn *Conn Enc Encoder BindRecvChan binds a channel for receive operations from NATS. Deprecated: Encoded connections are no longer supported. BindRecvQueueChan binds a channel for queue-based receive operations from NATS. Deprecated: Encoded connections are no longer supported. BindSendChan binds a channel for send operations to NATS. Deprecated: Encoded connections are no longer supported. Close will close the connection to the server. This call will release all blocking calls, such as Flush(), etc. Deprecated: Encoded connections are no longer supported. Drain will put a connection into a drain state. All subscriptions will immediately be put into a drain state. Upon completion, the publishers will be drained and can not publish any additional messages. Upon draining of the publishers, the connection will be closed. Use the ClosedCB() option to know when the connection has moved from draining to closed. Deprecated: Encoded connections are no longer supported. Flush will perform a round trip to the server and return when it receives the internal reply. Deprecated: Encoded connections are no longer supported. FlushTimeout allows a Flush operation to have an associated timeout. Deprecated: Encoded connections are no longer supported. LastError reports the last error encountered via the Connection. Deprecated: Encoded connections are no longer supported. Publish publishes the data argument to the given subject. The data argument will be encoded using the associated encoder. Deprecated: Encoded connections are no longer supported. PublishRequest will perform a Publish() expecting a response on the reply subject. Use Request() for automatically waiting for a response inline. 
Deprecated: Encoded connections are no longer supported. QueueSubscribe will create a queue subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above. Deprecated: Encoded connections are no longer supported. Request will create an Inbox and perform a Request() call with the Inbox reply for the data v. A response will be decoded into the vPtr Response. Deprecated: Encoded connections are no longer supported. RequestWithContext will create an Inbox and perform a Request using the provided cancellation context with the Inbox reply for the data v. A response will be decoded into the vPtr last parameter. Deprecated: Encoded connections are no longer supported. Subscribe will create a subscription on the given subject and process incoming messages using the specified Handler. The Handler should be a func that matches a signature from the description of Handler from above. Deprecated: Encoded connections are no longer supported. *EncodedConn : github.com/apache/thrift/lib/go/thrift.Flusher func NewEncodedConn(c *Conn, encType string) (*EncodedConn, error)
Encoder interface is for all registered encoders. Deprecated: Encoded connections are no longer supported. ( Encoder) Decode(subject string, data []byte, vPtr any) error ( Encoder) Encode(subject string, v any) ([]byte, error) *github.com/nats-io/nats.go/encoders/builtin.DefaultEncoder *github.com/nats-io/nats.go/encoders/builtin.GobEncoder *github.com/nats-io/nats.go/encoders/builtin.JsonEncoder func EncoderForType(encType string) Encoder func RegisterEncoder(encType string, enc Encoder)
ErrConsumerSequenceMismatch represents an error from a consumer that received a Heartbeat including a sequence different from the one expected from the view of the client. ConsumerSequence is the sequence of the consumer that is behind. LastConsumerSequence is the sequence of the consumer when the heartbeat was received. StreamResumeSequence is the stream sequence from which the consumer should resume consuming. (*ErrConsumerSequenceMismatch) Error() string *ErrConsumerSequenceMismatch : error
ErrHandler is used to process asynchronous errors encountered while processing inbound messages. func (*Conn).ErrorHandler() ErrHandler func ErrorHandler(cb ErrHandler) Option func (*Conn).SetErrorHandler(cb ErrHandler)
ExternalStream allows you to qualify access to a stream source in another account. APIPrefix string DeliverPrefix string
ContextOpt func GetObjectInfoShowDeleted() GetObjectInfoOpt func ObjectStore.GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error)
ContextOpt func GetObjectShowDeleted() GetObjectOpt func ObjectStore.Get(name string, opts ...GetObjectOpt) (ObjectResult, error) func ObjectStore.GetBytes(name string, opts ...GetObjectOpt) ([]byte, error) func ObjectStore.GetFile(name, file string, opts ...GetObjectOpt) error func ObjectStore.GetString(name string, opts ...GetObjectOpt) (string, error)
Handler is a specific callback used for Subscribe. It is generalized to an any, but we will discover its format and arguments at runtime and perform the correct callback, including demarshaling encoded data back into the appropriate struct based on the signature of the Handler. Handlers are expected to have one of four signatures. type person struct { Name string `json:"name,omitempty"` Age uint `json:"age,omitempty"` } handler := func(m *Msg) handler := func(p *person) handler := func(subject string, o *obj) handler := func(subject, reply string, o *obj) These forms allow a callback to request a raw Msg ptr, where the processing of the message from the wire is untouched. Process a JSON representation and demarshal it into the given struct, e.g. person. There are also variants where the callback wants either the subject, or the subject and the reply subject. Deprecated: Encoded connections are no longer supported. func (*EncodedConn).QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) func (*EncodedConn).Subscribe(subject string, cb Handler) (*Subscription, error)
Header represents the optional Header for a NATS message, based on the implementation of http.Header. Add adds the key, value pair to the header. It is case-sensitive and appends to any existing values associated with key. Del deletes the values associated with a key. It is case-sensitive. Get gets the first value associated with the given key. It is case-sensitive. Set sets the header entries associated with key to the single element value. It is case-sensitive and replaces any existing values associated with key. Values returns all values associated with the given key. It is case-sensitive. Header : github.com/redis/go-redis/v9.ConsistentHash Header : go.opentelemetry.io/otel/propagation.ValuesGetter func DecodeHeadersMsg(data []byte) (Header, error)
( InProcessConnProvider) InProcessConn() (net.Conn, error) func InProcessServer(server InProcessConnProvider) Option
JetStream allows persistent messaging through JetStream. NOTE: JetStream is part of legacy API. Users are encouraged to switch to the new JetStream API for enhanced capabilities and simplified API. Please refer to the `jetstream` package. See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md ChanQueueSubscribe creates channel based Subscription with a queue group. See important note in QueueSubscribe() ChanSubscribe creates channel based Subscription. See important note in Subscribe() CleanupPublisher will cleanup the publishing side of JetStreamContext. This will unsubscribe from the internal reply subject if needed. All pending async publishes will fail with ErrJetStreamPublisherClosed. If an error handler was provided, it will be called for each pending async publish and PublishAsyncComplete will be closed. After completing JetStreamContext is still usable - internal subscription will be recreated on next publish, but the acks from previous publishes will be lost. Publish publishes a message to JetStream. PublishAsync publishes a message to JetStream and returns a PubAckFuture. The data should not be changed until the PubAckFuture has been processed. PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. PublishAsyncPending returns the number of async publishes outstanding for this context. PublishMsg publishes a Msg to JetStream. PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture. The message should not be changed until the PubAckFuture has been processed. PullSubscribe creates a Subscription that can fetch messages. See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be set to an empty string. When using PullSubscribe, the messages are fetched using Fetch() and FetchBatch() methods. QueueSubscribe creates a Subscription with a queue group. 
If no optional durable name nor binding options are specified, the queue name will be used as a durable name. See important note in Subscribe() QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. See important note in QueueSubscribe() Subscribe creates an async Subscription for JetStream. The stream and consumer names can be provided with the nats.Bind() option. For creating an ephemeral (where the consumer name is picked by the server), you can provide the stream name with nats.BindStream(). If no stream name is specified, the library will attempt to figure out which stream the subscription is for. See important notes below for more details. IMPORTANT NOTES: * If none of the options Bind() nor Durable() are specified, the library will send a request to the server to create an ephemeral JetStream consumer, which will be deleted after an Unsubscribe() or Drain(), or automatically by the server after a short period of time after the NATS subscription is gone. * If Durable() option is specified, the library will attempt to lookup a JetStream consumer with this name, and if found, will bind to it and not attempt to delete it. However, if not found, the library will send a request to create such durable JetStream consumer. Note that the library will delete the JetStream consumer after an Unsubscribe() or Drain() only if it created the durable consumer while subscribing. If the durable consumer already existed prior to subscribing it won't be deleted. * If Bind() option is provided, the library will attempt to lookup the consumer with the given name, and if successful, bind to it. If the lookup fails, then the Subscribe() call will return an error. SubscribeSync creates a Subscription that can be used to process messages synchronously. See important note in Subscribe() JetStreamContext (interface)
JetStreamContext allows JetStream messaging and stream management. NOTE: JetStreamContext is part of legacy API. Users are encouraged to switch to the new JetStream API for enhanced capabilities and simplified API. Please refer to the `jetstream` package. See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md AccountInfo retrieves info about the JetStream usage from an account. AddConsumer adds a consumer to a stream. If the consumer already exists, and the configuration is the same, it will return the existing consumer. If the consumer already exists, and the configuration is different, it will return ErrConsumerNameAlreadyInUse. AddStream creates a stream. ChanQueueSubscribe creates channel based Subscription with a queue group. See important note in QueueSubscribe() ChanSubscribe creates channel based Subscription. See important note in Subscribe() CleanupPublisher will cleanup the publishing side of JetStreamContext. This will unsubscribe from the internal reply subject if needed. All pending async publishes will fail with ErrJetStreamPublisherClosed. If an error handler was provided, it will be called for each pending async publish and PublishAsyncComplete will be closed. After completing JetStreamContext is still usable - internal subscription will be recreated on next publish, but the acks from previous publishes will be lost. ConsumerInfo retrieves information of a consumer from a stream. ConsumerNames is used to retrieve a list of Consumer names. Consumers is used to retrieve a list of ConsumerInfo objects. ConsumersInfo is used to retrieve a list of ConsumerInfo objects. Deprecated: Use Consumers() instead. CreateKeyValue will create a KeyValue store with the following configuration. CreateObjectStore will create an object store. DeleteConsumer deletes a consumer. DeleteKeyValue will delete this KeyValue store (JetStream stream). DeleteMsg deletes a message from a stream. The message is marked as erased, but its value is not overwritten. 
DeleteObjectStore will delete the underlying stream for the named object. DeleteStream deletes a stream. GetLastMsg retrieves the last raw stream message stored in JetStream by subject. Use option nats.DirectGet() to trigger retrieval directly from a distributed group of servers (leader and replicas). The stream must have been created/updated with the AllowDirect boolean. GetMsg retrieves a raw stream message stored in JetStream by sequence number. Use options nats.DirectGet() or nats.DirectGetNext() to trigger retrieval directly from a distributed group of servers (leader and replicas). The stream must have been created/updated with the AllowDirect boolean. KeyValue will lookup and bind to an existing KeyValue store. KeyValueStoreNames is used to retrieve a list of key value store names KeyValueStores is used to retrieve a list of key value store statuses ObjectStore will look up and bind to an existing object store instance. ObjectStoreNames is used to retrieve a list of bucket names ObjectStores is used to retrieve a list of bucket statuses Publish publishes a message to JetStream. PublishAsync publishes a message to JetStream and returns a PubAckFuture. The data should not be changed until the PubAckFuture has been processed. PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. PublishAsyncPending returns the number of async publishes outstanding for this context. PublishMsg publishes a Msg to JetStream. PublishMsgAsync publishes a Msg to JetStream and returns a PubAckFuture. The message should not be changed until the PubAckFuture has been processed. PullSubscribe creates a Subscription that can fetch messages. See important note in Subscribe(). Additionally, for an ephemeral pull consumer, the "durable" value must be set to an empty string. When using PullSubscribe, the messages are fetched using Fetch() and FetchBatch() methods. PurgeStream purges a stream messages. 
QueueSubscribe creates a Subscription with a queue group. If no optional durable name nor binding options are specified, the queue name will be used as a durable name. See important note in Subscribe() QueueSubscribeSync creates a Subscription with a queue group that can be used to process messages synchronously. See important note in QueueSubscribe() SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data As a result, this operation is slower than DeleteMsg() StreamInfo retrieves information from a stream. StreamNameBySubject returns a stream matching given subject. StreamNames is used to retrieve a list of Stream names. Streams can be used to retrieve a list of StreamInfo objects. StreamsInfo can be used to retrieve a list of StreamInfo objects. Deprecated: Use Streams() instead. Subscribe creates an async Subscription for JetStream. The stream and consumer names can be provided with the nats.Bind() option. For creating an ephemeral (where the consumer name is picked by the server), you can provide the stream name with nats.BindStream(). If no stream name is specified, the library will attempt to figure out which stream the subscription is for. See important notes below for more details. IMPORTANT NOTES: * If none of the options Bind() nor Durable() are specified, the library will send a request to the server to create an ephemeral JetStream consumer, which will be deleted after an Unsubscribe() or Drain(), or automatically by the server after a short period of time after the NATS subscription is gone. * If Durable() option is specified, the library will attempt to lookup a JetStream consumer with this name, and if found, will bind to it and not attempt to delete it. However, if not found, the library will send a request to create such durable JetStream consumer. Note that the library will delete the JetStream consumer after an Unsubscribe() or Drain() only if it created the durable consumer while subscribing. 
If the durable consumer already existed prior to subscribing it won't be deleted. * If Bind() option is provided, the library will attempt to lookup the consumer with the given name, and if successful, bind to it. If the lookup fails, then the Subscribe() call will return an error. SubscribeSync creates a Subscription that can be used to process messages synchronously. See important note in Subscribe() UpdateConsumer updates an existing consumer. UpdateStream updates a stream. JetStreamContext : JetStream JetStreamContext : JetStreamManager JetStreamContext : KeyValueManager JetStreamContext : ObjectStoreManager func (*Conn).JetStream(opts ...JSOpt) (JetStreamContext, error)
JetStreamError is an error result that happens when using JetStream. In case of client-side error, `APIError()` returns nil ( JetStreamError) APIError() *APIError ( JetStreamError) Error() builtin.string *APIError JetStreamError : error var ErrAsyncPublishTimeout var ErrBadRequest var ErrCantAckIfConsumerAckNone var ErrConsumerConfigRequired var ErrConsumerDeleted var ErrConsumerLeadershipChanged var ErrConsumerMultipleFilterSubjectsNotSupported var ErrConsumerNameAlreadyInUse var ErrConsumerNameRequired var ErrConsumerNotActive var ErrConsumerNotFound var ErrContextAndTimeout var ErrDuplicateFilterSubjects var ErrEmptyFilter var ErrInvalidConsumerName var ErrInvalidFilterSubject var ErrInvalidJSAck var ErrInvalidStreamName var ErrJetStreamNotEnabled var ErrJetStreamNotEnabledForAccount var ErrJetStreamPublisherClosed var ErrKeyExists var ErrMsgAlreadyAckd var ErrMsgNotFound var ErrNoHeartbeat var ErrNoMatchingStream var ErrNoStreamResponse var ErrNotJSMessage var ErrOverlappingFilterSubjects var ErrPullSubscribeRequired var ErrPullSubscribeToPushConsumer var ErrStreamConfigRequired var ErrStreamNameAlreadyInUse var ErrStreamNameRequired var ErrStreamNotFound var ErrStreamSourceMultipleSubjectTransformsNotSupported var ErrStreamSourceNotSupported var ErrStreamSourceSubjectTransformNotSupported var ErrStreamSubjectTransformNotSupported var ErrSubjectMismatch var ErrSubscriptionClosed var ErrTooManyStalledMsgs
JetStreamManager manages JetStream Streams and Consumers. AccountInfo retrieves info about the JetStream usage from an account. AddConsumer adds a consumer to a stream. If the consumer already exists, and the configuration is the same, it will return the existing consumer. If the consumer already exists, and the configuration is different, it will return ErrConsumerNameAlreadyInUse. AddStream creates a stream. ConsumerInfo retrieves information of a consumer from a stream. ConsumerNames is used to retrieve a list of Consumer names. Consumers is used to retrieve a list of ConsumerInfo objects. ConsumersInfo is used to retrieve a list of ConsumerInfo objects. Deprecated: Use Consumers() instead. DeleteConsumer deletes a consumer. DeleteMsg deletes a message from a stream. The message is marked as erased, but its value is not overwritten. DeleteStream deletes a stream. GetLastMsg retrieves the last raw stream message stored in JetStream by subject. Use option nats.DirectGet() to trigger retrieval directly from a distributed group of servers (leader and replicas). The stream must have been created/updated with the AllowDirect boolean. GetMsg retrieves a raw stream message stored in JetStream by sequence number. Use options nats.DirectGet() or nats.DirectGetNext() to trigger retrieval directly from a distributed group of servers (leader and replicas). The stream must have been created/updated with the AllowDirect boolean. PurgeStream purges a stream's messages. SecureDeleteMsg deletes a message from a stream. The deleted message is overwritten with random data. As a result, this operation is slower than DeleteMsg(). StreamInfo retrieves information from a stream. StreamNameBySubject returns a stream matching given subject. StreamNames is used to retrieve a list of Stream names. Streams can be used to retrieve a list of StreamInfo objects. StreamsInfo can be used to retrieve a list of StreamInfo objects. Deprecated: Use Streams() instead. 
UpdateConsumer updates an existing consumer. UpdateStream updates a stream. JetStreamContext (interface)
JSOpt configures a JetStreamContext. ClientTrace ContextOpt MaxWait *StreamInfoRequest *StreamPurgeRequest func APIPrefix(pre string) JSOpt func DirectGet() JSOpt func DirectGetNext(subject string) JSOpt func Domain(domain string) JSOpt func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt func PublishAsyncMaxPending(max int) JSOpt func PublishAsyncTimeout(dur time.Duration) JSOpt func StreamListFilter(subject string) JSOpt func UseLegacyDurableConsumers() JSOpt func (*Conn).JetStream(opts ...JSOpt) (JetStreamContext, error) func JetStreamContext.AccountInfo(opts ...JSOpt) (*AccountInfo, error) func JetStreamContext.AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamContext.AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamContext.ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamContext.ConsumerNames(stream string, opts ...JSOpt) <-chan string func JetStreamContext.Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo func JetStreamContext.ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo func JetStreamContext.DeleteConsumer(stream, consumer string, opts ...JSOpt) error func JetStreamContext.DeleteMsg(name string, seq uint64, opts ...JSOpt) error func JetStreamContext.DeleteStream(name string, opts ...JSOpt) error func JetStreamContext.GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) func JetStreamContext.GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) func JetStreamContext.PurgeStream(name string, opts ...JSOpt) error func JetStreamContext.SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error func JetStreamContext.StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) func JetStreamContext.StreamNameBySubject(string, ...JSOpt) (string, error) func JetStreamContext.StreamNames(opts ...JSOpt) <-chan string func JetStreamContext.Streams(opts ...JSOpt) <-chan *StreamInfo func 
JetStreamContext.StreamsInfo(opts ...JSOpt) <-chan *StreamInfo func JetStreamContext.UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamContext.UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamManager.AccountInfo(opts ...JSOpt) (*AccountInfo, error) func JetStreamManager.AddConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamManager.AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamManager.ConsumerInfo(stream, name string, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamManager.ConsumerNames(stream string, opts ...JSOpt) <-chan string func JetStreamManager.Consumers(stream string, opts ...JSOpt) <-chan *ConsumerInfo func JetStreamManager.ConsumersInfo(stream string, opts ...JSOpt) <-chan *ConsumerInfo func JetStreamManager.DeleteConsumer(stream, consumer string, opts ...JSOpt) error func JetStreamManager.DeleteMsg(name string, seq uint64, opts ...JSOpt) error func JetStreamManager.DeleteStream(name string, opts ...JSOpt) error func JetStreamManager.GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) func JetStreamManager.GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) func JetStreamManager.PurgeStream(name string, opts ...JSOpt) error func JetStreamManager.SecureDeleteMsg(name string, seq uint64, opts ...JSOpt) error func JetStreamManager.StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) func JetStreamManager.StreamNameBySubject(string, ...JSOpt) (string, error) func JetStreamManager.StreamNames(opts ...JSOpt) <-chan string func JetStreamManager.Streams(opts ...JSOpt) <-chan *StreamInfo func JetStreamManager.StreamsInfo(opts ...JSOpt) <-chan *StreamInfo func JetStreamManager.UpdateConsumer(stream string, cfg *ConsumerConfig, opts ...JSOpt) (*ConsumerInfo, error) func JetStreamManager.UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
KeyLister is used to retrieve a list of key value store keys ( KeyLister) Keys() <-chan string ( KeyLister) Stop() error func KeyValue.ListKeys(opts ...WatchOpt) (KeyLister, error)
KeyValue contains methods to operate on a KeyValue store. Bucket returns the current bucket name. Create will add the key/value pair if and only if it does not exist. Delete will place a delete marker and leave all revisions. Get returns the latest value for the key. GetRevision returns a specific revision value for the key. History will return all historical values for the key. Keys will return all keys. Deprecated: Use ListKeys instead to avoid memory issues. ListKeys will return all keys in a channel. Purge will place a delete marker and remove all previous revisions. PurgeDeletes will remove all current delete markers. Put will place the new value for the key into the store. PutString will place the string for the key into the store. Status retrieves the status and configuration of a bucket. Update will update the value if and only if the latest revision matches. Update also resets the TTL associated with the key (if any). Watch for any updates to keys that match the keys argument which could include wildcards. Watch will send a nil entry when it has received all initial values. WatchAll will invoke the callback for all updates. WatchFiltered will watch for any updates to keys that match the keys argument. It can be configured with the same options as Watch. func JetStreamContext.CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) func JetStreamContext.KeyValue(bucket string) (KeyValue, error) func KeyValueManager.CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) func KeyValueManager.KeyValue(bucket string) (KeyValue, error)
KeyValueBucketStatus represents the status of a Bucket and implements KeyValueStatus. BackingStore indicates what technology is used for storage of the bucket. Bucket is the name of the bucket. Bytes is the size of the stream. History returns the configured history kept per key. IsCompressed indicates if the data is compressed on disk. StreamInfo is the stream info retrieved to create the status. TTL is how long the bucket keeps values for. Values is how many messages are in the bucket, including historical values. *KeyValueBucketStatus : KeyValueStatus
KeyValueConfig is for configuring a KeyValue store. Bucket string Enable underlying stream compression. NOTE: Compression is supported for nats-server 2.10.0+ Description string History uint8 MaxBytes int64 MaxValueSize int32 Mirror *StreamSource Placement *Placement RePublish *RePublish Replicas int Sources []*StreamSource Storage StorageType TTL time.Duration func JetStreamContext.CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) func KeyValueManager.CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error)
KeyValueEntry is a retrieved entry for Get or List or Watch. Bucket is the bucket the data was loaded from. Created is the time the data was put in the bucket. Delta is distance from the latest value. Key is the key that was retrieved. Operation returns Put or Delete or Purge. Revision is a unique sequence for this value. Value is the retrieved value. func KeyValue.Get(key string) (entry KeyValueEntry, err error) func KeyValue.GetRevision(key string, revision uint64) (entry KeyValueEntry, err error) func KeyValue.History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) func KeyWatcher.Updates() <-chan KeyValueEntry
KeyValueManager is used to manage KeyValue stores. CreateKeyValue will create a KeyValue store with the given configuration. DeleteKeyValue will delete this KeyValue store (JetStream stream). KeyValue will look up and bind to an existing KeyValue store. KeyValueStoreNames is used to retrieve a list of key value store names. KeyValueStores is used to retrieve a list of key value store statuses. JetStreamContext (interface)
( KeyValueOp) String() string KeyValueOp : expvar.Var KeyValueOp : fmt.Stringer func KeyValueEntry.Operation() KeyValueOp const KeyValueDelete const KeyValuePurge const KeyValuePut
KeyValueStatus is the run-time status of a Key-Value bucket. BackingStore indicates what technology is used for storage of the bucket. Bucket is the name of the bucket. Bytes returns the size in bytes of the bucket. History returns the configured history kept per key. IsCompressed indicates if the data is compressed on disk. TTL is how long the bucket keeps values for. Values is how many messages are in the bucket, including historical values. *KeyValueBucketStatus func JetStreamContext.KeyValueStores() <-chan KeyValueStatus func KeyValue.Status() (KeyValueStatus, error) func KeyValueManager.KeyValueStores() <-chan KeyValueStatus
KeyWatcher is what is returned when doing a watch. Context returns watcher context optionally provided by nats.Context option. Stop will stop this watcher. Updates returns a channel to read any updates to entries. func KeyValue.Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) func KeyValue.WatchAll(opts ...WatchOpt) (KeyWatcher, error) func KeyValue.WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error)
ContextOpt func ListObjectsShowDeleted() ListObjectsOpt func ObjectStore.List(opts ...ListObjectsOpt) ([]*ObjectInfo, error)
MaxWait sets the maximum amount of time we will wait for a response. MaxWait : JSOpt MaxWait : PullOpt
MessageBatch provides methods to retrieve messages consumed using [Subscription.FetchBatch]. Done signals end of execution. Error returns an error encountered when fetching messages. Messages returns a channel on which messages will be published. func (*Subscription).FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error)
Msg represents a message delivered by NATS. This structure is used by Subscribers and PublishMsg(). # Types of Acknowledgements When using JetStream, there are multiple ways to ack a Msg: // Acknowledgement that a message has been processed. msg.Ack() // Negatively acknowledges a message. msg.Nak() // Terminate a message so that it is not redelivered further. msg.Term() // Signal the server that the message is being worked on and reset redelivery timer. msg.InProgress() Data []byte Header Header Reply string Sub *Subscription Subject string Ack acknowledges a message. This tells the server that the message was successfully processed and it can move on to the next message. AckSync is the synchronous version of Ack. This indicates successful message processing. Equal compares two msgs, ignores sub but checks all other public fields. InProgress tells the server that this message is being worked on. It resets the redelivery timer on the server. Metadata retrieves the metadata from a JetStream message. This method will return an error for non-JetStream Msgs. Nak negatively acknowledges a message. This tells the server to redeliver the message. You can configure the number of redeliveries by passing nats.MaxDeliver when you Subscribe. The default is infinite redeliveries. NakWithDelay negatively acknowledges a message. This tells the server to redeliver the message after the given `delay` duration. You can configure the number of redeliveries by passing nats.MaxDeliver when you Subscribe. The default is infinite redeliveries. Respond allows a convenient way to respond to requests in service based subscriptions. RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers. Size returns a message size in bytes. Term tells the server to not redeliver this message, regardless of the value of nats.MaxDeliver. 
*Msg : github.com/gogo/protobuf/proto.Sizer func NewMsg(subject string) *Msg func (*Conn).Request(subj string, data []byte, timeout time.Duration) (*Msg, error) func (*Conn).RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) func (*Conn).RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error) func (*Conn).RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) func MessageBatch.Messages() <-chan *Msg func PubAckFuture.Msg() *Msg func (*Subscription).Fetch(batch int, opts ...PullOpt) ([]*Msg, error) func (*Subscription).NextMsg(timeout time.Duration) (*Msg, error) func (*Subscription).NextMsgWithContext(ctx context.Context) (*Msg, error) func (*Conn).ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) func (*Conn).ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) func (*Conn).PublishMsg(m *Msg) error func (*Conn).QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) func (*Conn).RequestMsg(msg *Msg, timeout time.Duration) (*Msg, error) func (*Conn).RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error) func JetStream.ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStream.ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStream.PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) func JetStream.PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) func JetStreamContext.ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStreamContext.ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStreamContext.PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) func JetStreamContext.PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) func (*Msg).Equal(msg *Msg) bool func (*Msg).RespondMsg(msg *Msg) error
MsgErrHandler is used to process asynchronous errors from JetStream PublishAsync. It will return the original message sent to the server for possible retransmitting and the error encountered. func PublishAsyncErrHandler(cb MsgErrHandler) JSOpt
MsgHandler is a callback function that processes messages delivered to asynchronous subscribers. func (*Conn).QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) func (*Conn).Subscribe(subj string, cb MsgHandler) (*Subscription, error) func JetStream.QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStream.Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStreamContext.QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStreamContext.Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error)
MsgMetadata is the JetStream metadata associated with received messages. Consumer string Domain string NumDelivered uint64 NumPending uint64 Sequence SequencePair Stream string Timestamp time.Time func (*Msg).Metadata() (*MsgMetadata, error)
ObjectBucketStatus represents status of a Bucket, implements ObjectStoreStatus BackingStore indicates what technology is used for storage of the bucket Bucket is the name of the bucket Description is the description supplied when creating the bucket IsCompressed indicates if the data is compressed on disk Metadata is the metadata supplied when creating the bucket Replicas indicates how many storage replicas are kept for the data in the bucket Sealed indicates the stream is sealed and cannot be modified in any way Size is the combined size of all data in the bucket including metadata, in bytes Storage indicates the underlying JetStream storage technology used to store data StreamInfo is the stream info retrieved to create the status TTL indicates how long objects are kept in the bucket *ObjectBucketStatus : ObjectStoreStatus *ObjectBucketStatus : github.com/alexflint/go-arg.Described
ObjectInfo is meta plus instance information. Bucket string Chunks uint32 Deleted bool Digest string ModTime time.Time NUID string ObjectMeta ObjectMeta ObjectMeta.Description string ObjectMeta.Headers Header ObjectMeta.Metadata map[string]string ObjectMeta.Name string Optional options. Size uint64 func ObjectResult.Info() (*ObjectInfo, error) func ObjectStore.AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error) func ObjectStore.AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error) func ObjectStore.GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, error) func ObjectStore.List(opts ...ListObjectsOpt) ([]*ObjectInfo, error) func ObjectStore.Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectStore.PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectStore.PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectStore.PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectWatcher.Updates() <-chan *ObjectInfo func ObjectStore.AddLink(name string, obj *ObjectInfo) (*ObjectInfo, error)
ObjectMeta is high level information about an object. Description string Headers Header Metadata map[string]string Name string Optional options. func ObjectStore.Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectStore.UpdateMeta(name string, meta *ObjectMeta) error
ObjectMetaOptions ChunkSize uint32 Link *ObjectLink
ContextOpt func JetStreamContext.ObjectStoreNames(opts ...ObjectOpt) <-chan string func JetStreamContext.ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus func ObjectStore.Put(obj *ObjectMeta, reader io.Reader, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectStore.PutBytes(name string, data []byte, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectStore.PutFile(file string, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectStore.PutString(name string, data string, opts ...ObjectOpt) (*ObjectInfo, error) func ObjectStoreManager.ObjectStoreNames(opts ...ObjectOpt) <-chan string func ObjectStoreManager.ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus
ObjectResult will return the underlying stream info and also be an io.ReadCloser. ( ObjectResult) Close() error ( ObjectResult) Error() error ( ObjectResult) Info() (*ObjectInfo, error) ( ObjectResult) Read(p []byte) (n int, err error) ObjectResult : github.com/prometheus/common/expfmt.Closer ObjectResult : io.Closer ObjectResult : io.ReadCloser ObjectResult : io.Reader func ObjectStore.Get(name string, opts ...GetObjectOpt) (ObjectResult, error)
ObjectStore is a blob store capable of storing large objects efficiently in JetStream streams. AddBucketLink will add a link to another object store. AddLink will add a link to another object. Delete will delete the named object. Get will pull the named object from the object store. GetBytes is a convenience function to pull an object from this object store and return it as a byte slice. GetFile is a convenience function to pull an object from this object store and place it in a file. GetInfo will retrieve the current information for the object. GetString is a convenience function to pull an object from this object store and return it as a string. List will list all the objects in this store. Put will place the contents from the reader into a new object. PutBytes is a convenience function to put a byte slice into this object store. PutFile is a convenience function to put a file into this object store. PutString is a convenience function to put a string into this object store. Seal will seal the object store, no further modifications will be allowed. Status retrieves run-time status about the backing store of the bucket. UpdateMeta will update the metadata for the object. Watch for changes in the underlying store and receive meta information updates. func JetStreamContext.CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) func JetStreamContext.ObjectStore(bucket string) (ObjectStore, error) func ObjectStoreManager.CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) func ObjectStoreManager.ObjectStore(bucket string) (ObjectStore, error) func ObjectStore.AddBucketLink(name string, bucket ObjectStore) (*ObjectInfo, error)
ObjectStoreConfig is the config for the object store. Bucket string Enable underlying stream compression. NOTE: Compression is supported for nats-server 2.10.0+ Description string MaxBytes int64 Bucket-specific metadata NOTE: Metadata requires nats-server v2.10.0+ Placement *Placement Replicas int Storage StorageType TTL time.Duration func JetStreamContext.CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) func ObjectStoreManager.CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error)
ObjectStoreManager creates, loads and deletes Object Stores CreateObjectStore will create an object store. DeleteObjectStore will delete the underlying stream for the named object. ObjectStore will look up and bind to an existing object store instance. ObjectStoreNames is used to retrieve a list of bucket names ObjectStores is used to retrieve a list of bucket statuses JetStreamContext (interface)
BackingStore provides details about the underlying storage Bucket is the name of the bucket Description is the description supplied when creating the bucket IsCompressed indicates if the data is compressed on disk Metadata is the user supplied metadata for the bucket Replicas indicates how many storage replicas are kept for the data in the bucket Sealed indicates the stream is sealed and cannot be modified in any way Size is the combined size of all data in the bucket including metadata, in bytes Storage indicates the underlying JetStream storage technology used to store data TTL indicates how long objects are kept in the bucket *ObjectBucketStatus ObjectStoreStatus : github.com/alexflint/go-arg.Described func JetStreamContext.ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus func ObjectStore.Status() (ObjectStoreStatus, error) func ObjectStoreManager.ObjectStores(opts ...ObjectOpt) <-chan ObjectStoreStatus
ObjectWatcher is what is returned when doing a watch. Stop will stop this watcher. Updates returns a channel to read any updates to entries. func ObjectStore.Watch(opts ...WatchOpt) (ObjectWatcher, error)
Option is a function on the options for a connection. func ClientCert(certFile, keyFile string) Option func ClientTLSConfig(certCB TLSCertHandler, rootCAsCB RootCAsHandler) Option func ClosedHandler(cb ConnHandler) Option func Compression(enabled bool) Option func ConnectHandler(cb ConnHandler) Option func CustomInboxPrefix(p string) Option func CustomReconnectDelay(cb ReconnectDelayHandler) Option func Dialer(dialer *net.Dialer) Option func DisconnectErrHandler(cb ConnErrHandler) Option func DisconnectHandler(cb ConnHandler) Option func DiscoveredServersHandler(cb ConnHandler) Option func DontRandomize() Option func DrainTimeout(t time.Duration) Option func ErrorHandler(cb ErrHandler) Option func FlusherTimeout(t time.Duration) Option func IgnoreAuthErrorAbort() Option func InProcessServer(server InProcessConnProvider) Option func LameDuckModeHandler(cb ConnHandler) Option func MaxPingsOutstanding(max int) Option func MaxReconnects(max int) Option func Name(name string) Option func Nkey(pubKey string, sigCB SignatureHandler) Option func NkeyOptionFromSeed(seedFile string) (Option, error) func NoCallbacksAfterClientClose() Option func NoEcho() Option func NoReconnect() Option func PermissionErrOnSubscribe(enabled bool) Option func PingInterval(t time.Duration) Option func ProxyPath(path string) Option func ReconnectBufSize(size int) Option func ReconnectErrHandler(cb ConnErrHandler) Option func ReconnectHandler(cb ConnHandler) Option func ReconnectJitter(jitter, jitterForTLS time.Duration) Option func ReconnectWait(t time.Duration) Option func RetryOnFailedConnect(retry bool) Option func RootCAs(file ...string) Option func Secure(tls ...*tls.Config) Option func SetCustomDialer(dialer CustomDialer) Option func SkipHostLookup() Option func SyncQueueLen(max int) Option func Timeout(t time.Duration) Option func TLSHandshakeFirst() Option func Token(token string) Option func TokenHandler(cb AuthTokenHandler) Option func UseOldRequestStyle() Option func 
UserCredentials(userOrChainedFile string, seedFiles ...string) Option func UserInfo(user, password string) Option func UserInfoHandler(cb UserInfoCB) Option func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option func UserJWTAndSeed(jwt string, seed string) Option func Connect(url string, options ...Option) (*Conn, error)
Options can be used to create a customized connection. AllowReconnect enables reconnection logic to be used when we encounter a disconnect from the current server. AsyncErrorCB sets the async error handler (e.g. slow consumer errors). ClosedCB sets the closed handler that is called when a client will no longer be connected. For websocket connections, indicates to the server that the connection supports compression. If the server does too, then data will be compressed. ConnectedCB sets the connected handler called when the initial connection is established. It is not invoked on successful reconnects - for reconnections, use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect to detect whether the initial connect was successful. CustomDialer allows specifying a custom dialer (not necessarily a *net.Dialer). CustomReconnectDelayCB is invoked after the library tried every URL in the server list and failed to reconnect. It passes to the user the current number of attempts. This function returns the amount of time the library will sleep before attempting to reconnect again. It is strongly recommended that this value contains some jitter to prevent all connections from attempting to reconnect at the same time. Dialer allows a custom net.Dialer when forming connections. Deprecated: should use CustomDialer instead. DisconnectedCB sets the disconnected handler that is called whenever the connection is disconnected. Will not be called if DisconnectedErrCB is set. Deprecated: Use DisconnectedErrCB, which passes the error that caused the disconnect event. DisconnectedErrCB sets the disconnected error handler that is called whenever the connection is disconnected. Disconnected error could be nil, for instance when user explicitly closes the connection. DisconnectedCB will not be called if DisconnectedErrCB is set. DiscoveredServersCB sets the callback that is invoked whenever a new server has joined the cluster. 
DrainTimeout sets the timeout for a Drain Operation to complete. Defaults to 30s. FlusherTimeout is the maximum time to wait for write operations to the underlying connection to complete (including the flusher loop). Defaults to 1m. IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy). InProcessServer represents a NATS server running within the same process. If this is set then we will attempt to connect to the server directly rather than using external TCP conns. InboxPrefix allows the default _INBOX prefix to be customized LameDuckModeHandler sets the callback to invoke when the server notifies the connection that it entered lame duck mode, that is, going to gradually disconnect all its connections before shutting down. This is often used in deployments when upgrading NATS Servers. MaxPingsOut is the maximum number of pending ping commands that can be awaiting a response before raising an ErrStaleConnection error. Defaults to 2. MaxReconnect sets the number of reconnect attempts that will be tried before giving up. If negative, then it will never give up trying to reconnect. Defaults to 60. Name is an optional name label which will be sent to the server on CONNECT to identify the client. Nkey sets the public nkey that will be used to authenticate when connecting to the server. UserJWT and Nkey are mutually exclusive and if defined, UserJWT will take precedence. NoCallbacksAfterClientClose allows preventing the invocation of callbacks after Close() is called. Client won't receive notifications when Close is invoked by user code. Default is to invoke the callbacks. NoEcho configures whether the server will echo back messages that are sent on this connection if we also have matching subscriptions. Note this is supported on servers >= version 1.2. Proto 1 or greater. 
NoRandomize configures whether we will randomize the server pool. Password sets the password to be used when connecting to a server. Pedantic signals the server whether it should be doing further validation of subjects. PermissionErrOnSubscribe - if set to true, the client will return ErrPermissionViolation from SubscribeSync if the server returns a permissions error for a subscription. Defaults to false. PingInterval is the period at which the client will be sending ping commands to the server, disabled if 0 or negative. Defaults to 2m. For websocket connections, adds a path to the connection's URL. This is useful when connecting to NATS behind a proxy. ReconnectBufSize is the size of the backing bufio during reconnect. Once this has been exhausted publish operations will return an error. Defaults to 8388608 bytes (8MB). ReconnectErrCB sets the callback that is invoked whenever a reconnect attempt fails. ReconnectJitter sets the upper bound for a random delay added to ReconnectWait during a reconnect when no TLS is used. Defaults to 100ms. ReconnectJitterTLS sets the upper bound for a random delay added to ReconnectWait during a reconnect when TLS is used. Defaults to 1s. ReconnectWait sets the time to backoff after attempting a reconnect to a server that we were already connected to previously. Defaults to 2s. ReconnectedCB sets the reconnected handler called whenever the connection is successfully reconnected. RetryOnFailedConnect sets the connection in reconnecting state right away if it can't connect to a server in the initial set. The MaxReconnect and ReconnectWait options are used for this process, similarly to when an established connection is disconnected. If a ReconnectHandler is set, it will be invoked on the first successful reconnect attempt (if the initial connect fails), and if a ClosedHandler is set, it will be invoked if it fails to connect (after exhausting the MaxReconnect attempts). 
RootCAsCB is used to fetch and return a set of root certificate authorities that clients use when verifying server certificates. Secure enables TLS secure connections that skip server verification by default. NOT RECOMMENDED. Servers is a configured set of servers which this client will use when attempting to connect. SignatureCB designates the function used to sign the nonce presented from the server. SkipHostLookup skips the DNS lookup for the server hostname. SubChanLen is the size of the buffered channel used between the socket Go routine and the message delivery for SyncSubscriptions. NOTE: This does not affect AsyncSubscriptions which are dictated by PendingLimits() Defaults to 65536. TLSCertCB is used to fetch and return a custom TLS certificate. TLSConfig is a custom TLS configuration to use for secure transports. TLSHandshakeFirst is used to instruct the library to perform the TLS handshake right after the connect and before receiving the INFO protocol from the server. If this option is enabled but the server is not configured to perform the TLS handshake first, the connection will fail. Timeout sets the timeout for a Dial operation on a connection. Defaults to 2s. Token sets the token to be used when connecting to a server. TokenHandler designates the function used to generate the token to be used when connecting to a server. Url represents a single NATS server url to which the client will be connecting. If the Servers option is also set, it then becomes the first server in the Servers array. UseOldRequestStyle forces the old method of Requests that utilize a new Inbox and a new Subscription for each request. User sets the username to be used when connecting to the server. UserInfo sets the callback handler that will fetch the username and password. UserJWT sets the callback handler that will fetch a user's JWT. Verbose signals the server to send an OK ack for commands successfully processed by the server. 
Connect will attempt to connect to a NATS server with multiple options. func GetDefaultOptions() Options var DefaultOptions
PeerInfo shows information about all the peers in the cluster that are supporting the stream or consumer. Active time.Duration Current bool Lag uint64 Name string Offline bool
Placement is used to guide placement of streams in clustered JetStream. Cluster string Tags []string
PubAck is an ack received after successfully publishing a message. Domain string Duplicate bool Sequence uint64 Stream string func JetStream.Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) func JetStream.PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) func JetStreamContext.Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) func JetStreamContext.PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) func PubAckFuture.Ok() <-chan *PubAck
PubAckFuture is a future for a PubAck. Err returns a receive only channel that can be used to get the error from an async publish. Msg returns the message that was sent to the server. Ok returns a receive only channel that can be used to get a PubAck. func JetStream.PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) func JetStream.PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) func JetStreamContext.PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) func JetStreamContext.PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
PubOpt configures options for publishing JetStream messages. AckWait ContextOpt func ExpectLastMsgId(id string) PubOpt func ExpectLastSequence(seq uint64) PubOpt func ExpectLastSequencePerSubject(seq uint64) PubOpt func ExpectStream(stream string) PubOpt func MsgId(id string) PubOpt func MsgTTL(dur time.Duration) PubOpt func RetryAttempts(num int) PubOpt func RetryWait(dur time.Duration) PubOpt func StallWait(ttl time.Duration) PubOpt func JetStream.Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) func JetStream.PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) func JetStream.PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) func JetStream.PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) func JetStreamContext.Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error) func JetStreamContext.PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) func JetStreamContext.PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) func JetStreamContext.PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error)
PullHeartbeat : PullOpt
PullMaxBytes defines the max bytes allowed for a fetch request. PullMaxBytes : PullOpt
PullOpt are the options that can be passed when pulling a batch of messages. ContextOpt MaxWait PullHeartbeat PullMaxBytes func (*Subscription).Fetch(batch int, opts ...PullOpt) ([]*Msg, error) func (*Subscription).FetchBatch(batch int, opts ...PullOpt) (MessageBatch, error)
ContextOpt DeleteMarkersOlderThan func KeyValue.PurgeDeletes(opts ...PurgeOpt) error
RawStreamMsg is a raw message stored in JetStream. Data []byte Header Header Sequence uint64 Subject string Time time.Time func JetStreamContext.GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) func JetStreamContext.GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error) func JetStreamManager.GetLastMsg(name, subject string, opts ...JSOpt) (*RawStreamMsg, error) func JetStreamManager.GetMsg(name string, seq uint64, opts ...JSOpt) (*RawStreamMsg, error)
ReconnectDelayHandler is used to get from the user the desired delay the library should pause before attempting to reconnect again. Note that this is invoked after the library tried the whole list of URLs and failed to reconnect. func CustomReconnectDelay(cb ReconnectDelayHandler) Option
ReplayPolicy determines how the consumer should replay messages it already has queued in the stream. ( ReplayPolicy) MarshalJSON() ([]byte, error) (*ReplayPolicy) UnmarshalJSON(data []byte) error ReplayPolicy : github.com/goccy/go-json.Marshaler *ReplayPolicy : github.com/goccy/go-json.Unmarshaler ReplayPolicy : encoding/json.Marshaler *ReplayPolicy : encoding/json.Unmarshaler const ReplayInstantPolicy const ReplayOriginalPolicy
RePublish is for republishing messages once committed to a stream. The original subject is remapped from the subject pattern to the destination pattern. Destination string HeadersOnly bool Source string
RetentionPolicy determines how messages in a set are retained. ( RetentionPolicy) MarshalJSON() ([]byte, error) ( RetentionPolicy) String() string (*RetentionPolicy) UnmarshalJSON(data []byte) error RetentionPolicy : github.com/goccy/go-json.Marshaler *RetentionPolicy : github.com/goccy/go-json.Unmarshaler RetentionPolicy : encoding/json.Marshaler *RetentionPolicy : encoding/json.Unmarshaler RetentionPolicy : expvar.Var RetentionPolicy : fmt.Stringer const InterestPolicy const LimitsPolicy const WorkQueuePolicy
RootCAsHandler is used to fetch and return a set of root certificate authorities that clients use when verifying server certificates. func ClientTLSConfig(certCB TLSCertHandler, rootCAsCB RootCAsHandler) Option
SequenceInfo has both the consumer and the stream sequence and last activity. Consumer uint64 Last *time.Time Stream uint64
SequencePair includes the consumer and stream sequence info from a JetStream consumer. Consumer uint64 Stream uint64
SignatureHandler is used to sign a nonce from the server while authenticating with nkeys. The user should sign the nonce and return the raw signature. The client will base64 encode this to send to the server. func Nkey(pubKey string, sigCB SignatureHandler) Option func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option
Tracks various stats received and sent on this connection, including counts for messages and bytes. InBytes uint64 InMsgs uint64 OutBytes uint64 OutMsgs uint64 Reconnects uint64 func (*Conn).Stats() Statistics
Status represents the state of the connection. ( Status) String() string Status : expvar.Var Status : fmt.Stringer func (*Conn).Status() Status func (*Conn).StatusChanged(statuses ...Status) chan Status func (*Conn).RemoveStatusListener(ch chan (Status)) func (*Conn).StatusChanged(statuses ...Status) chan Status const CLOSED const CONNECTED const CONNECTING const DISCONNECTED const DRAINING_PUBS const DRAINING_SUBS const RECONNECTING
StorageType determines how messages are stored for retention. ( StorageType) MarshalJSON() ([]byte, error) ( StorageType) String() string (*StorageType) UnmarshalJSON(data []byte) error StorageType : github.com/goccy/go-json.Marshaler *StorageType : github.com/goccy/go-json.Unmarshaler StorageType : encoding/json.Marshaler *StorageType : encoding/json.Unmarshaler StorageType : expvar.Var StorageType : fmt.Stringer func (*ObjectBucketStatus).Storage() StorageType func ObjectStoreStatus.Storage() StorageType const FileStorage const MemoryStorage
( StoreCompression) MarshalJSON() ([]byte, error) ( StoreCompression) String() string (*StoreCompression) UnmarshalJSON(b []byte) error StoreCompression : github.com/goccy/go-json.Marshaler *StoreCompression : github.com/goccy/go-json.Unmarshaler StoreCompression : encoding/json.Marshaler *StoreCompression : encoding/json.Unmarshaler StoreCompression : expvar.Var StoreCompression : fmt.Stringer const NoCompression const S2Compression
StreamAlternate is an alternate stream represented by a mirror. Cluster string Domain string Name string
StreamConfig will determine the properties for a stream. There are sensible defaults for most. If no subjects are given the name will be used as the only subject. AllowDirect enables direct access to individual messages using direct get API. Defaults to false. AllowMsgTTL allows header initiated per-message TTLs. This feature requires nats-server v2.11.0 or later. AllowRollup allows the use of the Nats-Rollup header to replace all contents of a stream, or subject in a stream, with a single new message. Compression specifies the message storage compression algorithm. Defaults to NoCompression. ConsumerLimits defines limits of certain values that consumers can set, defaults for those who don't set these settings DenyDelete restricts the ability to delete messages from a stream via the API. Defaults to false. DenyPurge restricts the ability to purge messages from a stream via the API. Defaults to false. Description is an optional description of the stream. Discard defines the policy for handling messages when the stream reaches its limits in terms of number of messages or total bytes. DiscardNewPerSubject is a flag to enable discarding new messages per subject when limits are reached. Requires DiscardPolicy to be DiscardNew and the MaxMsgsPerSubject to be set. Duplicates is the window within which to track duplicate messages. If not set, server default is 2 minutes. FirstSeq is the initial sequence number of the first message in the stream. MaxAge is the maximum age of messages that the stream will retain. MaxBytes is the maximum total size of messages the stream will store. After reaching the limit, stream adheres to the discard policy. If not set, server default is -1 (unlimited). MaxConsumers specifies the maximum number of consumers allowed for the stream. MaxMsgSize is the maximum size of any single message in the stream. MaxMsgs is the maximum number of messages the stream will store. After reaching the limit, stream adheres to the discard policy. 
If not set, server default is -1 (unlimited). MaxMsgsPerSubject is the maximum number of messages per subject that the stream will retain. Metadata is a set of application-defined key-value pairs for associating metadata on the stream. This feature requires nats-server v2.10.0 or later. Mirror defines the configuration for mirroring another stream. MirrorDirect enables direct access to individual messages from the origin stream using direct get API. Defaults to false. Name is the name of the stream. It is required and must be unique across the JetStream account. Names cannot contain whitespace, ., *, >, path separators (forward or backwards slash), and non-printable characters. NoAck is a flag to disable acknowledging messages received by this stream. If set to true, publish methods from the JetStream client will not work as expected, since they rely on acknowledgements. Core NATS publish methods should be used instead. Note that this will make message delivery less reliable. Placement is used to declare where the stream should be placed via tags and/or an explicit cluster name. RePublish allows immediate republishing a message to the configured subject after it's stored. Replicas is the number of stream replicas in clustered JetStream. Defaults to 1, maximum is 5. Retention defines the message retention policy for the stream. Defaults to LimitsPolicy. Sealed streams do not allow messages to be published or deleted via limits or API, sealed streams can not be unsealed via configuration update. Can only be set on already created streams via the Update API. Sources is a list of other streams this stream sources messages from. Storage specifies the type of storage backend used for the stream (file or memory). Enables and sets a duration for adding server markers for delete, purge and max age limits. This feature requires nats-server v2.11.0 or later. SubjectTransform allows applying a transformation to matching messages' subjects. 
Subjects is a list of subjects that the stream is listening on. Wildcards are supported. Subjects cannot be set if the stream is created as a mirror. Template identifies the template that manages the Stream. Deprecated: This feature is no longer supported. func JetStreamContext.AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamContext.UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamManager.AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamManager.UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error)
StreamConsumerLimits are the limits for a consumer on a stream. These can be overridden on a per consumer basis. InactiveThreshold time.Duration MaxAckPending int
StreamInfo shows config and current state for this stream. Alternates []*StreamAlternate Cluster *ClusterInfo Config StreamConfig Created time.Time Mirror *StreamSourceInfo Sources []*StreamSourceInfo State StreamState func JetStreamContext.AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamContext.StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) func JetStreamContext.Streams(opts ...JSOpt) <-chan *StreamInfo func JetStreamContext.StreamsInfo(opts ...JSOpt) <-chan *StreamInfo func JetStreamContext.UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamManager.AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func JetStreamManager.StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) func JetStreamManager.Streams(opts ...JSOpt) <-chan *StreamInfo func JetStreamManager.StreamsInfo(opts ...JSOpt) <-chan *StreamInfo func JetStreamManager.UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) func (*KeyValueBucketStatus).StreamInfo() *StreamInfo func (*ObjectBucketStatus).StreamInfo() *StreamInfo
StreamInfoRequest contains additional option to return apiPagedRequest.Offset int DeletedDetails when true includes information about deleted messages SubjectsFilter when set, returns information on the matched subjects *StreamInfoRequest : JSOpt
StreamPurgeRequest is optional request information to the purge API. Number of messages to keep. Purge up to but not including sequence. Subject to match against messages for the purge command. *StreamPurgeRequest : JSOpt
StreamSource dictates how streams can source from other streams. Domain string External *ExternalStream FilterSubject string Name string OptStartSeq uint64 OptStartTime *time.Time SubjectTransforms []SubjectTransformConfig
StreamSourceInfo shows information about an upstream stream source. Active time.Duration Error *APIError External *ExternalStream FilterSubject string Lag uint64 Name string SubjectTransforms []SubjectTransformConfig
StreamState is information about the given stream. Bytes uint64 Consumers int Deleted []uint64 FirstSeq uint64 FirstTime time.Time LastSeq uint64 LastTime time.Time Msgs uint64 NumDeleted int NumSubjects uint64 Subjects map[string]uint64
SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received. Destination string Source string
SubOpt configures options for subscribing to JetStream consumers. AckWait ContextOpt func AckAll() SubOpt func AckExplicit() SubOpt func AckNone() SubOpt func BackOff(backOff []time.Duration) SubOpt func Bind(stream, consumer string) SubOpt func BindStream(stream string) SubOpt func ConsumerFilterSubjects(subjects ...string) SubOpt func ConsumerMemoryStorage() SubOpt func ConsumerName(name string) SubOpt func ConsumerReplicas(replicas int) SubOpt func DeliverAll() SubOpt func DeliverLast() SubOpt func DeliverLastPerSubject() SubOpt func DeliverNew() SubOpt func DeliverSubject(subject string) SubOpt func Description(description string) SubOpt func Durable(consumer string) SubOpt func EnableFlowControl() SubOpt func HeadersOnly() SubOpt func IdleHeartbeat(duration time.Duration) SubOpt func InactiveThreshold(threshold time.Duration) SubOpt func ManualAck() SubOpt func MaxAckPending(n int) SubOpt func MaxDeliver(n int) SubOpt func MaxRequestBatch(max int) SubOpt func MaxRequestExpires(max time.Duration) SubOpt func MaxRequestMaxBytes(bytes int) SubOpt func OrderedConsumer() SubOpt func PullMaxWaiting(n int) SubOpt func RateLimit(n uint64) SubOpt func ReplayInstant() SubOpt func ReplayOriginal() SubOpt func SkipConsumerLookup() SubOpt func StartSequence(seq uint64) SubOpt func StartTime(startTime time.Time) SubOpt func JetStream.ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStream.ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStream.PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) func JetStream.QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStream.QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) func JetStream.Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStream.SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) func 
JetStreamContext.ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStreamContext.ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStreamContext.PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) func JetStreamContext.QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStreamContext.QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) func JetStreamContext.Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStreamContext.SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
Subscription represents interest in a given subject. Optional queue group name. If present, all subscriptions with the same name will form a distributed queue, and each message will only be processed by one member of the group. Subject that represents this subscription. This can be different than the received subject inside a Msg if this is a wildcard. AutoUnsubscribe will issue an automatic Unsubscribe that is processed by the server when max messages have been received. This can be useful when sending a request to an unknown number of subscribers. ClearMaxPending resets the maximums seen so far. (*Subscription) ConsumerInfo() (*ConsumerInfo, error) Delivered returns the number of delivered messages for this subscription. Drain will remove interest but continue callbacks until all messages have been processed. For a JetStream subscription, if the library has created the JetStream consumer, the library will send a DeleteConsumer request to the server when the Drain operation completes. If a failure occurs when deleting the JetStream consumer, an error will be reported to the asynchronous error callback. If you do not wish the JetStream consumer to be automatically deleted, ensure that the consumer is not created by the library, which means create the consumer with AddConsumer and bind to this consumer. Dropped returns the number of known dropped messages for this subscription. This will correspond to messages dropped by violations of PendingLimits. If the server declares the connection a SlowConsumer, this number may not be valid. Fetch pulls a batch of messages from a stream for a pull consumer. FetchBatch pulls a batch of messages from a stream for a pull consumer. Unlike [Subscription.Fetch], it is non blocking and returns [MessageBatch], allowing to retrieve incoming messages from a channel. The returned channel is always closed after all messages for a batch have been delivered by the server - it is safe to iterate over it using range. 
To avoid using default JetStream timeout as fetch expiry time, use [nats.MaxWait] or [nats.Context] (with deadline set). This method will not return error in case of pull request expiry (even if there are no messages). Any other error encountered when receiving messages will cause FetchBatch to stop receiving new messages. InitialConsumerPending returns the number of messages pending to be delivered to the consumer when the subscription was created. IsDraining returns a boolean indicating whether the subscription is being drained. This will return false if the subscription has already been closed. IsValid returns a boolean indicating whether the subscription is still active. This will return false if the subscription has already been closed. MaxPending returns the maximum number of queued messages and queued bytes seen so far. Msgs returns an iter.Seq2[*Msg, error] that can be used to iterate over messages. It can only be used with a subscription that has been created with SubscribeSync or QueueSubscribeSync, otherwise it will return an error on the first iteration. The iterator will block until a message is available. The subscription will not be closed when the iterator is done. MsgsTimeout returns an iter.Seq2[*Msg, error] that can be used to iterate over messages. It can only be used with a subscription that has been created with SubscribeSync or QueueSubscribeSync, otherwise it will return an error on the first iteration. The iterator will block until a message is available or the timeout is reached. If the timeout is reached, the iterator will return nats.ErrTimeout but it will not be closed. NextMsg will return the next message available to a synchronous subscriber or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), or if there were no responders (ErrNoResponders) when used in the context of a request/reply. 
NextMsgWithContext takes a context and returns the next message available to a synchronous subscriber, blocking until it is delivered or context gets canceled. Pending returns the number of queued messages and queued bytes in the client for this subscription. PendingLimits returns the current limits for this subscription. If no error is returned, a negative value indicates that the given metric is not limited. Queued returns the number of queued messages in the client for this subscription. Deprecated: Use Pending() SetClosedHandler will set the closed handler for when a subscription is closed (either unsubscribed or drained). SetPendingLimits sets the limits for pending msgs and bytes for this subscription. Zero is not allowed. Any negative value means that the given metric is not limited. StatusChanged returns a channel on which given list of subscription status changes will be sent. If no status is provided, all status changes will be sent. Available statuses are SubscriptionActive, SubscriptionDraining, SubscriptionClosed, and SubscriptionSlowConsumer. The returned channel will be closed when the subscription is closed. Type returns the type of Subscription. Unsubscribe will remove interest in the given subject. For a JetStream subscription, if the library has created the JetStream consumer, it will send a DeleteConsumer request to the server (if the unsubscribe itself was successful). If the delete operation fails, the error will be returned. If you do not wish the JetStream consumer to be automatically deleted, ensure that the consumer is not created by the library, which means create the consumer with AddConsumer and bind to this consumer (using the nats.Bind() option). 
*Subscription : database/sql/driver.Validator func (*Conn).ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) func (*Conn).ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) func (*Conn).QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) func (*Conn).QueueSubscribeSync(subj, queue string) (*Subscription, error) func (*Conn).QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) func (*Conn).Subscribe(subj string, cb MsgHandler) (*Subscription, error) func (*Conn).SubscribeSync(subj string) (*Subscription, error) func (*EncodedConn).BindRecvChan(subject string, channel any) (*Subscription, error) func (*EncodedConn).BindRecvQueueChan(subject, queue string, channel any) (*Subscription, error) func (*EncodedConn).QueueSubscribe(subject, queue string, cb Handler) (*Subscription, error) func (*EncodedConn).Subscribe(subject string, cb Handler) (*Subscription, error) func JetStream.ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStream.ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStream.PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) func JetStream.QueueSubscribe(subj, queue string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStream.QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) func JetStream.Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStream.SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error) func JetStreamContext.ChanQueueSubscribe(subj, queue string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStreamContext.ChanSubscribe(subj string, ch chan *Msg, opts ...SubOpt) (*Subscription, error) func JetStreamContext.PullSubscribe(subj, durable string, opts ...SubOpt) (*Subscription, error) func JetStreamContext.QueueSubscribe(subj, queue string, cb 
MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStreamContext.QueueSubscribeSync(subj, queue string, opts ...SubOpt) (*Subscription, error) func JetStreamContext.Subscribe(subj string, cb MsgHandler, opts ...SubOpt) (*Subscription, error) func JetStreamContext.SubscribeSync(subj string, opts ...SubOpt) (*Subscription, error)
SubscriptionType is the type of the Subscription. func (*Subscription).Type() SubscriptionType const AsyncSubscription const ChanSubscription const NilSubscription const PullSubscription const SyncSubscription
SubStatus represents the state of the subscription. ( SubStatus) String() string SubStatus : expvar.Var SubStatus : fmt.Stringer func (*Subscription).StatusChanged(statuses ...SubStatus) <-chan SubStatus func (*Subscription).StatusChanged(statuses ...SubStatus) <-chan SubStatus const SubscriptionActive const SubscriptionClosed const SubscriptionDraining const SubscriptionSlowConsumer
TLSCertHandler is used to fetch and return tls certificate. func ClientTLSConfig(certCB TLSCertHandler, rootCAsCB RootCAsHandler) Option
UserInfoCB is used to pass the username and password when establishing connection. func UserInfoHandler(cb UserInfoCB) Option
UserJWTHandler is used to fetch and return the account signed JWT for this user. func UserJWT(userCB UserJWTHandler, sigCB SignatureHandler) Option
ContextOpt func IgnoreDeletes() WatchOpt func IncludeHistory() WatchOpt func MetaOnly() WatchOpt func UpdatesOnly() WatchOpt func KeyValue.History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) func KeyValue.Keys(opts ...WatchOpt) ([]string, error) func KeyValue.ListKeys(opts ...WatchOpt) (KeyLister, error) func KeyValue.Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) func KeyValue.WatchAll(opts ...WatchOpt) (KeyWatcher, error) func KeyValue.WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error) func ObjectStore.Watch(opts ...WatchOpt) (ObjectWatcher, error)
Package-Level Functions (total 122)
AckAll when acking a sequence number, this implicitly acks all sequences below this one as well.
AckExplicit requires ack or nack for all messages.
AckNone requires no acks for delivered messages.
APIPrefix changes the default prefix used for the JetStream API.
BackOff is an array of time durations that represent the time to delay based on delivery count.
Bind binds a subscription to an existing consumer from a stream without attempting to create. The first argument is the stream name and the second argument will be the consumer name.
BindStream binds a consumer to a stream explicitly based on a name. When a stream name is not specified, the library uses the subscribe subject as a way to find the stream name. It is done by making a request to the server to get list of stream names that have a filter for this subject. If the returned list contains a single stream, then this stream name will be used, otherwise the `ErrNoMatchingStream` is returned. To avoid the stream lookup, provide the stream name with this function. See also `Bind()`.
ClientCert is a helper option to provide the client certificate from a file. If Secure is not already set this will set it as well.
ClientTLSConfig is an Option to set the TLS configuration for secure connections. It can be used to e.g. set TLS config with cert and root CAs from memory. For simple use case of loading cert and CAs from file, ClientCert and RootCAs options are more convenient. If Secure is not already set this will set it as well.
ClosedHandler is an Option to set the closed handler.
Compression is an Option to indicate if this connection supports compression. Currently only supported for Websocket connections.
Connect will attempt to connect to the NATS system. The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222 Comma separated arrays are also supported, e.g. urlA, urlB. Options start with the defaults but can be overridden. To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).
ConnectHandler is an Option to set the connected handler.
ConsumerFilterSubjects can be used to set multiple subject filters on the consumer. It has to be used in conjunction with [nats.BindStream] and with empty 'subject' parameter.
ConsumerMemoryStorage sets the memory storage to true for a consumer.
ConsumerName sets the name for a consumer.
ConsumerReplicas sets the number of replicas for a consumer.
Context returns an option that can be used to configure a context for APIs that are context aware such as those part of the JetStream interface.
CustomInboxPrefix configures the request + reply inbox prefix
CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option. See CustomReconnectDelayCB Option for more details.
DecodeHeadersMsg will decode and return the headers.
DecodeObjectDigest decodes base64 hash
DeliverAll will configure a Consumer to receive all the messages from a Stream.
DeliverLast configures a Consumer to receive messages starting with the latest one.
DeliverLastPerSubject configures a Consumer to receive messages starting with the latest one for each filtered subject.
DeliverNew configures a Consumer to receive messages published after the subscription.
DeliverSubject specifies the JetStream consumer deliver subject. This option is used only in situations where the consumer does not exist and a creation request is sent to the server. If not provided, an inbox will be selected. If a consumer exists, then the NATS subscription will be created on the JetStream consumer's DeliverSubject, not necessarily this subject.
Description will set the description for the created consumer.
Dialer is an Option to set the dialer which will be used when attempting to establish a connection. Deprecated: Should use CustomDialer instead.
DirectGet is an option that can be used to make GetMsg() or GetLastMsg() retrieve message directly from a group of servers (leader and replicas) if the stream was created with the AllowDirect option.
DirectGetNext is an option that can be used to make GetMsg() retrieve message directly from a group of servers (leader and replicas) if the stream was created with the AllowDirect option. The server will find the next message matching the filter `subject` starting at the start sequence (argument in GetMsg()). The filter `subject` can be a wildcard.
DisconnectErrHandler is an Option to set the disconnected error handler.
DisconnectHandler is an Option to set the disconnected handler. Deprecated: Use DisconnectErrHandler.
DiscoveredServersHandler is an Option to set the new servers handler.
Domain changes the domain part of JetStream API prefix.
DontRandomize is an Option to turn off randomizing the server pool.
DrainTimeout is an Option to set the timeout for draining a connection. Defaults to 30s.
Durable defines the consumer name for JetStream durable subscribers. This function will return ErrInvalidConsumerName if the name contains any dot ".".
EnableFlowControl enables flow control for a push based consumer.
EncoderForType will return the registered Encoder for the encType. Deprecated: Encoded connections are no longer supported.
ErrorHandler is an Option to set the async error handler.
ExpectLastMsgId sets the expected last msgId in the response from the publish.
ExpectLastSequence sets the expected sequence in the response from the publish.
ExpectLastSequencePerSubject sets the expected sequence per subject in the response from the publish.
ExpectStream sets the expected stream to respond from the publish.
FlusherTimeout is an Option to set the write (and flush) timeout on a connection.
GetDefaultOptions returns default configuration options for the client.
GetObjectDigestValue calculates the base64 value of hashed data
GetObjectInfoShowDeleted makes GetInfo() return object if it was marked as deleted.
GetObjectShowDeleted makes Get() return object if it was marked as deleted.
HeadersOnly() will instruct the consumer to only deliver headers and no payloads.
IdleHeartbeat enables push based consumers to have idle heartbeats delivered. For pull consumers, idle heartbeat has to be set on each [Fetch] call.
IgnoreAuthErrorAbort opts out of the default connect behavior of aborting subsequent reconnect attempts if server returns the same auth error twice.
IgnoreDeletes will have the key watcher not pass any deleted keys.
InactiveThreshold indicates how long the server should keep a consumer after detecting a lack of activity. In NATS Server 2.8.4 and earlier, this option only applies to ephemeral consumers. In NATS Server 2.9.0 and later, this option applies to both ephemeral and durable consumers, allowing durable consumers to also be deleted automatically after the inactivity threshold has passed.
IncludeHistory instructs the key watcher to include historical values as well.
InProcessServer is an Option that will try to establish a direct connection to a NATS server running within the process instead of dialing via TCP.
LameDuckModeHandler sets the callback to invoke when the server notifies the connection that it entered lame duck mode, that is, going to gradually disconnect all its connections before shutting down. This is often used in deployments when upgrading NATS Servers.
LastRevision deletes if the latest revision matches.
ListObjectsShowDeleted makes ListObjects() return deleted objects.
ManualAck disables auto ack functionality for async subscriptions.
MaxAckPending sets the number of outstanding acks that are allowed before message delivery is halted.
MaxDeliver sets the number of redeliveries for a message.
MaxPingsOutstanding is an Option to set the maximum number of ping requests that can go unanswered by the server before closing the connection. Defaults to 2.
MaxReconnects is an Option to set the maximum number of reconnect attempts. If negative, it will never stop trying to reconnect. Defaults to 60.
MaxRequestBatch sets the maximum pull consumer batch size that a Fetch() can request.
MaxRequestExpires sets the maximum pull consumer request expiration that a Fetch() can request (using the Fetch's timeout value).
MaxRequestMaxBytes sets the maximum pull consumer request bytes that a Fetch() can receive.
MetaOnly instructs the key watcher to retrieve only the entry meta data, not the entry value
MsgId sets the message ID used for deduplication.
MsgTTL sets per msg TTL. Requires [StreamConfig.AllowMsgTTL] to be enabled.
Name is an Option to set the client name.
NewEncodedConn will wrap an existing Connection and utilize the appropriate registered encoder. Deprecated: Encoded connections are no longer supported.
NewInbox will return an inbox string which can be used for directed replies from subscribers. These are guaranteed to be unique, but can be shared and subscribed to by others.
NewMsg creates a message for publishing that will use headers.
Nkey will set the public Nkey and the signature callback to sign the server nonce.
NkeyOptionFromSeed will load an nkey pair from a seed file. It will return the NKey Option and will handle signing of nonce challenges from the server. It will take care to not hold keys in memory and to wipe memory.
NoCallbacksAfterClientClose is an Option to disable callbacks when user code calls Close(). If close is initiated by any other condition, callbacks if any will be invoked.
NoEcho is an Option to turn off messages echoing back from a server. Note this is supported on servers >= version 1.2. Proto 1 or greater.
NoReconnect is an Option to turn off reconnect behavior.
OrderedConsumer will create a FIFO direct/ephemeral consumer for in order delivery of messages. There are no redeliveries and no acks, and flow control and heartbeats will be added but will be taken care of without additional client code.
PingInterval is an Option to set the period for client ping commands. Defaults to 2m.
ProxyPath is an option for websocket connections that adds a path to connections url. This is useful when connecting to NATS behind a proxy.
PublishAsyncErrHandler sets the error handler for async publishes in JetStream.
PublishAsyncMaxPending sets the maximum outstanding async publishes that can be inflight at one time.
PublishAsyncTimeout sets the timeout for async message publish. If not provided, timeout is disabled.
PullMaxWaiting defines the max inflight pull requests.
RateLimit is the Bits per sec rate limit applied to a push consumer.
ReconnectBufSize sets the buffer size of messages kept while busy reconnecting. Defaults to 8388608 bytes (8MB). It can be disabled by setting it to -1.
ReconnectErrHandler is an Option to set the reconnect error handler.
ReconnectHandler is an Option to set the reconnected handler.
ReconnectJitter is an Option to set the upper bound of a random delay added to ReconnectWait. Defaults to 100ms and 1s for non-TLS and TLS connections, respectively.
ReconnectWait is an Option to set the wait time between reconnect attempts. Defaults to 2s.
RegisterEncoder will register the encType with the given Encoder. Useful for customization. Deprecated: Encoded connections are no longer supported.
ReplayInstant replays the messages as fast as possible.
ReplayOriginal replays the messages at the original speed.
RetryAttempts sets the retry number of attempts when ErrNoResponders is encountered.
RetryOnFailedConnect sets the connection in reconnecting state right away if it can't connect to a server in the initial set. See RetryOnFailedConnect option for more details.
RetryWait sets the retry wait time when ErrNoResponders is encountered.
RootCAs is a helper option to provide the RootCAs pool from a list of filenames. If Secure is not already set this will set it as well.
Secure is an Option to enable TLS secure connections that skip server verification by default. Pass a TLS Configuration for proper TLS. A TLS Configuration using InsecureSkipVerify should NOT be used in a production setting.
SetCustomDialer is an Option to set a custom dialer which will be used when attempting to establish a connection. If both Dialer and CustomDialer are specified, CustomDialer takes precedence.
SkipConsumerLookup will omit looking up consumer when [Bind], [Durable] or [ConsumerName] are provided. NOTE: This setting may cause an existing consumer to be overwritten. Also, because consumer lookup is skipped, all consumer options like AckPolicy, DeliverSubject etc. need to be provided even if consumer already exists.
SkipHostLookup is an Option to skip the host lookup when connecting to a server.
StallWait sets the max wait when the producer becomes stalled producing messages.
StartSequence configures a Consumer to receive messages from a start sequence.
StartTime configures a Consumer to receive messages from a start time.
StreamListFilter is an option that can be used to configure `StreamsInfo()` and `StreamNames()` requests. It allows filtering the returned streams by subject associated with each stream. Wildcards can be used. For example, `StreamListFilter(FOO.*.A)` will return all streams which have at least one subject matching the provided pattern (e.g. FOO.TEST.A).
SyncQueueLen will set the maximum queue len for the internal channel used for SubscribeSync(). Defaults to 65536.
Timeout is an Option to set the timeout for Dial on a connection. Defaults to 2s.
TLSHandshakeFirst is an Option to perform the TLS handshake first, that is before receiving the INFO protocol. This requires the server to also be configured with such option, otherwise the connection will fail.
Token is an Option to set the token to use when a token is not included directly in the URLs and when a token handler is not provided.
TokenHandler is an Option to set the token handler to use when a token is not included directly in the URLs and when a token is not set.
UpdatesOnly instructs the key watcher to only include updates on values (without latest values when started).
UseLegacyDurableConsumers makes JetStream use the legacy (pre nats-server v2.9.0) subjects for consumer creation. If this option is used when creating JetStreamContext, $JS.API.CONSUMER.DURABLE.CREATE.<stream>.<consumer> will be used to create a consumer with Durable provided, rather than $JS.API.CONSUMER.CREATE.<stream>.<consumer>.
UseOldRequestStyle is an Option to force usage of the old Request style.
UserCredentials is a convenience function that takes a filename for a user's JWT and a filename for the user's private Nkey seed.
UserInfo is an Option to set the username and password to use when not included directly in the URLs.
UserJWT will set the callbacks to retrieve the user's JWT and the signature callback to sign the server nonce. This and the Nkey option are mutually exclusive.
UserJWTAndSeed is a convenience function that takes the JWT and seed values as strings.
Package-Level Variables (total 125)
Deprecated: Use GetDefaultOptions() instead. DefaultOptions is not safe for use by multiple clients. For details see #308.
ErrAsyncPublishTimeout is returned when waiting for ack on async publish
Errors
Errors
Errors
Errors
Errors
Errors
ErrBadRequest is returned when invalid request is sent to JetStream API.
Errors
Errors
ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set.
Errors
ErrConsumerConfigRequired is returned when empty consumer configuration is supplied to add/update consumer.
ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist
ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid configuration was already created in the server.
ErrConsumerNameAlreadyInUse is an error returned when consumer with given name already exists.
ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNotActive is an error returned when consumer is not active.
ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrContextAndTimeout is returned when attempting to use both context and timeout.
Errors
Errors
ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer.
ErrEmptyFilter is returned when a filter in FilterSubjects is empty.
ErrFetchDisconnected is returned when the connection to the server is lost while waiting for messages to be delivered on PullSubscribe.
Errors
ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.' or ' ').
Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases. Use ErrInvalidConsumerName instead.
ErrInvalidFilterSubject is returned when the provided filter subject is invalid.
ErrInvalidJSAck is returned when JetStream ack from message publish is invalid.
Errors
Errors
ErrInvalidStreamName is returned when the provided stream name is invalid (contains '.' or ' ').
ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account. Note: This error will not be returned in clustered mode, even if each server in the cluster does not have JetStream enabled. In clustered mode, requests will time out instead.
ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called.
Errors
Errors
Errors
Errors
Errors
ErrMsgAlreadyAckd is returned when attempting to acknowledge message more than once.
Errors
Errors
ErrMsgNotFound is returned when a message with the provided sequence number does not exist.
Errors
ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer.
Errors
ErrNoMatchingStream is returned when stream lookup by subject is unsuccessful.
Errors
Errors
ErrNoStreamResponse is returned when there is no response from stream (e.g. no responders error).
ErrNotJSMessage is returned when attempting to get metadata from a non-JetStream message.
Errors
ErrOverlappingFilterSubjects is returned when filter subjects overlap when creating consumer.
ErrPullSubscribeRequired is returned when attempting to use, on a pull consumer, subscribe methods that are not suitable for pull consumers.
ErrPullSubscribeToPushConsumer is returned when attempting to use PullSubscribe on push consumer.
Errors
ErrStreamConfigRequired is returned when empty stream configuration is supplied to add/update stream.
ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameRequired is returned when the provided stream name is empty.
ErrStreamNotFound is an error returned when stream with given name does not exist.
ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting multiple subject transforms on a stream source. If this error is returned when executing AddStream(), the stream with invalid configuration was already created in the server.
ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting the stream sources. If this error is returned when executing AddStream(), the stream with invalid configuration was already created in the server.
ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid configuration was already created in the server.
ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid configuration was already created in the server.
ErrSubjectMismatch is returned when the provided subject does not match consumer's filter subject.
ErrSubscriptionClosed is returned when attempting to send pull request to a closed subscription.
Errors
ErrTooManyStalledMsgs is returned when too many outstanding async messages are waiting for ack.
Package-Level Constants (total 136)
ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.
AckAllPolicy means that acking a sequence number implicitly acks all sequences below it as well.
AckExplicitPolicy requires ack or nack for all messages.
AckNonePolicy requires no acks for delivered messages.
Used to watch all keys.
The different types of subscription types.
AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired.
AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked.
AUTHORIZATION_ERR is for when nats server user authorization has failed.
The different types of subscription types.
const CLOSED Status = 2
const CONNECTED Status = 1
const CONNECTING Status = 4
Indexed names into the Registered Encoders.
Default Constants
Default Constants
Default Constants
Default Constants
Default Constants
Default Constants
Default Constants
Default number of retries
Default time wait between retries on Publish iff err is NoResponders.
Default Constants
Default Constants
Default Constants
Default Constants
DefaultSubPendingBytesLimit is 64MB
DefaultSubPendingMsgsLimit will be 512k msgs.
Default Constants
Default Constants
DeliverAllPolicy starts delivering messages from the very beginning of a stream. This is the default.
DeliverByStartSequencePolicy will deliver messages starting from a given sequence.
DeliverByStartTimePolicy will deliver messages starting from a given time.
DeliverLastPerSubjectPolicy will start the consumer with the last message for all subjects received.
DeliverLastPolicy will start the consumer with the last sequence received.
DeliverNewPolicy will only deliver new messages that are sent after the consumer is created.
DiscardNew will fail to store new messages.
DiscardOld will remove older messages to return to the limits. This is the default.
const DISCONNECTED Status = 0
Headers for published messages.
Headers for published messages.
Headers for published messages.
Headers for published messages.
FileStorage specifies on disk storage. It's the default.
Indexed names into the Registered Encoders.
InboxPrefix is the prefix for all inbox subjects.
const INFO_ARG = 30
InterestPolicy specifies that when all known observables have acknowledged a message it can be removed.
Headers for republished messages and direct gets.
Indexed names into the Registered Encoders.
Headers for republished messages and direct gets.
Headers for republished messages and direct gets.
Headers for republished messages and direct gets.
Headers for republished messages and direct gets.
Used to watch all keys.
Default Constants
LimitsPolicy (default) means that messages are retained until any given limit is reached. This could be one of MaxMsgs, MaxBytes, or MaxAge.
MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit.
const MAX_CONTROL_LINE_SIZE = 4096
MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit.
MemoryStorage specifies in memory only.
const MINUS_ERR_ARG = 9
const MSG_ARG = 14
const MSG_END = 16
const MSG_PAYLOAD = 15
Headers for published messages.
Headers for published messages.
Rollups, can be subject only or all messages.
Rollups, can be subject only or all messages.
MsgSize is a header that will be part of a consumer's delivered message if HeadersOnly requested.
Headers for published messages.
The different types of subscription types.
const OP_H = 17
const OP_I = 25
const OP_IN = 26
const OP_INF = 27
const OP_INFO = 28
const OP_INFO_SPC = 29
const OP_M = 10
const OP_MINUS = 4
const OP_MINUS_E = 5
const OP_MINUS_ER = 6
const OP_MINUS_ERR = 7
const OP_MINUS_ERR_SPC = 8
const OP_MS = 11
const OP_MSG = 12
const OP_MSG_SPC = 13
const OP_P = 18
const OP_PI = 19
const OP_PIN = 20
const OP_PING = 21
const OP_PLUS = 1
const OP_PLUS_O = 2
const OP_PLUS_OK = 3
const OP_PO = 22
const OP_PON = 23
const OP_PONG = 24
const OP_START = 0
PERMISSIONS_ERR is for when nats server subject authorization has failed.
The different types of subscription types.
const RECONNECTING Status = 3
ReplayInstantPolicy will replay messages as fast as possible.
ReplayOriginalPolicy will maintain the same timing as the messages were received.
Default Constants
STALE_CONNECTION is for detection and proper handling of stale connections.
The different types of subscription types.
Default Constants
WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed.