// Copyright 2012-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package nats is a Go client for the NATS messaging system (https://nats.io).
package nats

import ()

// Default Constants
const (
	Version                   = "1.41.2"
	DefaultURL                = "nats://127.0.0.1:4222"
	DefaultPort               = 4222
	DefaultMaxReconnect       = 60
	DefaultReconnectWait      = 2 * time.Second
	DefaultReconnectJitter    = 100 * time.Millisecond
	DefaultReconnectJitterTLS = time.Second
	DefaultTimeout            = 2 * time.Second
	DefaultPingInterval       = 2 * time.Minute
	DefaultMaxPingOut         = 2
	DefaultMaxChanLen         = 64 * 1024       // 64k
	DefaultReconnectBufSize   = 8 * 1024 * 1024 // 8MB
	RequestChanLen            = 8
	DefaultDrainTimeout       = 30 * time.Second
	DefaultFlusherTimeout     = time.Minute
	LangString                = "go"
)

const (
	// STALE_CONNECTION is for detection and proper handling of stale connections.
	STALE_CONNECTION = "stale connection"

	// PERMISSIONS_ERR is for when nats server subject authorization has failed.
	PERMISSIONS_ERR = "permissions violation"

	// AUTHORIZATION_ERR is for when nats server user authorization has failed.
	AUTHORIZATION_ERR = "authorization violation"

	// AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired.
	AUTHENTICATION_EXPIRED_ERR = "user authentication expired"

	// AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked.
	AUTHENTICATION_REVOKED_ERR = "user authentication revoked"

	// ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.
	ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"

	// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit
	MAX_CONNECTIONS_ERR = "maximum connections exceeded"

	// MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit
	MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded"
)

// Errors
var (
	ErrConnectionClosed            = errors.New("nats: connection closed")
	ErrConnectionDraining          = errors.New("nats: connection draining")
	ErrDrainTimeout                = errors.New("nats: draining connection timed out")
	ErrConnectionReconnecting      = errors.New("nats: connection reconnecting")
	ErrSecureConnRequired          = errors.New("nats: secure connection required")
	ErrSecureConnWanted            = errors.New("nats: secure connection not available")
	ErrBadSubscription             = errors.New("nats: invalid subscription")
	ErrTypeSubscription            = errors.New("nats: invalid subscription type")
	ErrBadSubject                  = errors.New("nats: invalid subject")
	ErrBadQueueName                = errors.New("nats: invalid queue name")
	ErrSlowConsumer                = errors.New("nats: slow consumer, messages dropped")
	ErrTimeout                     = errors.New("nats: timeout")
	ErrBadTimeout                  = errors.New("nats: timeout invalid")
	ErrAuthorization               = errors.New("nats: authorization violation")
	ErrAuthExpired                 = errors.New("nats: authentication expired")
	ErrAuthRevoked                 = errors.New("nats: authentication revoked")
	ErrPermissionViolation         = errors.New("nats: permissions violation")
	ErrAccountAuthExpired          = errors.New("nats: account authentication expired")
	ErrNoServers                   = errors.New("nats: no servers available for connection")
	ErrJsonParse                   = errors.New("nats: connect message, json parse error")
	ErrChanArg                     = errors.New("nats: argument needs to be a channel type")
	ErrMaxPayload                  = errors.New("nats: maximum payload exceeded")
	ErrMaxMessages                 = errors.New("nats: maximum messages delivered")
	ErrSyncSubRequired             = errors.New("nats: illegal call on an async subscription")
	ErrMultipleTLSConfigs          = errors.New("nats: multiple tls.Configs not allowed")
	ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set")
	ErrNoInfoReceived              = errors.New("nats: protocol exception, INFO not received")
	ErrReconnectBufExceeded        = errors.New("nats: outbound buffer limit exceeded")
	ErrInvalidConnection           = errors.New("nats: invalid connection")
	ErrInvalidMsg                  = errors.New("nats: invalid message or message nil")
	ErrInvalidArg                  = errors.New("nats: invalid argument")
	ErrInvalidContext              = errors.New("nats: invalid context")
	ErrNoDeadlineContext           = errors.New("nats: context requires a deadline")
	ErrNoEchoNotSupported          = errors.New("nats: no echo option not supported by this server")
	ErrClientIDNotSupported        = errors.New("nats: client ID not supported by this server")
	ErrUserButNoSigCB              = errors.New("nats: user callback defined without a signature handler")
	ErrNkeyButNoSigCB              = errors.New("nats: nkey defined without a signature handler")
	ErrNoUserCB                    = errors.New("nats: user callback not defined")
	ErrNkeyAndUser                 = errors.New("nats: user callback and nkey defined")
	ErrNkeysNotSupported           = errors.New("nats: nkeys not supported by the server")
	ErrStaleConnection             = errors.New("nats: " + STALE_CONNECTION)
	ErrTokenAlreadySet             = errors.New("nats: token and token handler both set")
	ErrUserInfoAlreadySet          = errors.New("nats: cannot set user info callback and user/pass")
	ErrMsgNotBound                 = errors.New("nats: message is not bound to subscription/connection")
	ErrMsgNoReply                  = errors.New("nats: message does not have a reply")
	ErrClientIPNotSupported        = errors.New("nats: client IP not supported by this server")
	ErrDisconnected                = errors.New("nats: server is disconnected")
	ErrHeadersNotSupported         = errors.New("nats: headers not supported by this server")
	ErrBadHeaderMsg                = errors.New("nats: message could not decode headers")
	ErrNoResponders                = errors.New("nats: no responders available for request")
	ErrMaxConnectionsExceeded      = errors.New("nats: server maximum connections exceeded")
	ErrConnectionNotTLS            = errors.New("nats: connection is not tls")
	ErrMaxSubscriptionsExceeded    = errors.New("nats: server maximum subscriptions exceeded")
)

// GetDefaultOptions returns default configuration options for the client.
// Every call builds a fresh Options value from the Default* constants, with
// AllowReconnect enabled.
func GetDefaultOptions() Options {
	return Options{
		AllowReconnect:     true,
		MaxReconnect:       DefaultMaxReconnect,
		ReconnectWait:      DefaultReconnectWait,
		ReconnectJitter:    DefaultReconnectJitter,
		ReconnectJitterTLS: DefaultReconnectJitterTLS,
		Timeout:            DefaultTimeout,
		PingInterval:       DefaultPingInterval,
		MaxPingsOut:        DefaultMaxPingOut,
		SubChanLen:         DefaultMaxChanLen,
		ReconnectBufSize:   DefaultReconnectBufSize,
		DrainTimeout:       DefaultDrainTimeout,
		FlusherTimeout:     DefaultFlusherTimeout,
	}
}

// DefaultOptions is not safe for use by multiple clients.
// For details see #308.
//
// Deprecated: Use GetDefaultOptions() instead.
var DefaultOptions = GetDefaultOptions()

// Status represents the state of the connection.
type Status int

const (
	DISCONNECTED = Status(iota)
	CONNECTED
	CLOSED
	RECONNECTING
	CONNECTING
	DRAINING_SUBS
	DRAINING_PUBS
)

// String returns the name of the status constant, or "unknown status" for
// any value that is not one of the declared constants.
//
// NOTE(review): the method name and receiver were missing in this copy; they
// are restored as the conventional fmt.Stringer method - confirm.
func (s Status) String() string {
	switch s {
	case DISCONNECTED:
		return "DISCONNECTED"
	case CONNECTED:
		return "CONNECTED"
	case CLOSED:
		return "CLOSED"
	case RECONNECTING:
		return "RECONNECTING"
	case CONNECTING:
		return "CONNECTING"
	case DRAINING_SUBS:
		return "DRAINING_SUBS"
	case DRAINING_PUBS:
		return "DRAINING_PUBS"
	}
	return "unknown status"
}

// ConnHandler is used for asynchronous events such as
// disconnected and closed connections.
type ConnHandler func(*Conn)

// ConnErrHandler is used to process asynchronous events like
// disconnected connection with the error (if any).
type ConnErrHandler func(*Conn, error)

// ErrHandler is used to process asynchronous errors encountered
// while processing inbound messages.
type ErrHandler func(*Conn, *Subscription, error)

// UserJWTHandler is used to fetch and return the account signed
// JWT for this user.
type UserJWTHandler func() (string, error)

// TLSCertHandler is used to fetch and return tls certificate.
type TLSCertHandler func() (tls.Certificate, error) // RootCAsHandler is used to fetch and return a set of root certificate // authorities that clients use when verifying server certificates. type RootCAsHandler func() (*x509.CertPool, error) // SignatureHandler is used to sign a nonce from the server while // authenticating with nkeys. The user should sign the nonce and // return the raw signature. The client will base64 encode this to // send to the server. type SignatureHandler func([]byte) ([]byte, error) // AuthTokenHandler is used to generate a new token. type AuthTokenHandler func() string // UserInfoCB is used to pass the username and password when establishing connection. type UserInfoCB func() (string, string) // ReconnectDelayHandler is used to get from the user the desired // delay the library should pause before attempting to reconnect // again. Note that this is invoked after the library tried the // whole list of URLs and failed to reconnect. type ReconnectDelayHandler func(attempts int) time.Duration // asyncCB is used to preserve order for async callbacks. type asyncCB struct { f func() next *asyncCB } type asyncCallbacksHandler struct { mu sync.Mutex cond *sync.Cond head *asyncCB tail *asyncCB } // Option is a function on the options for a connection. type Option func(*Options) error // CustomDialer can be used to specify any dialer, not necessarily a // *net.Dialer. A CustomDialer may also implement `SkipTLSHandshake() bool` // in order to skip the TLS handshake in case not required. type CustomDialer interface { Dial(network, address string) (net.Conn, error) } type InProcessConnProvider interface { InProcessConn() (net.Conn, error) } // Options can be used to create a customized connection. type Options struct { // Url represents a single NATS server url to which the client // will be connecting. If the Servers option is also set, it // then becomes the first server in the Servers array. 
Url string // InProcessServer represents a NATS server running within the // same process. If this is set then we will attempt to connect // to the server directly rather than using external TCP conns. InProcessServer InProcessConnProvider // Servers is a configured set of servers which this client // will use when attempting to connect. Servers []string // NoRandomize configures whether we will randomize the // server pool. NoRandomize bool // NoEcho configures whether the server will echo back messages // that are sent on this connection if we also have matching subscriptions. // Note this is supported on servers >= version 1.2. Proto 1 or greater. NoEcho bool // Name is an optional name label which will be sent to the server // on CONNECT to identify the client. Name string // Verbose signals the server to send an OK ack for commands // successfully processed by the server. Verbose bool // Pedantic signals the server whether it should be doing further // validation of subjects. Pedantic bool // Secure enables TLS secure connections that skip server // verification by default. NOT RECOMMENDED. Secure bool // TLSConfig is a custom TLS configuration to use for secure // transports. TLSConfig *tls.Config // TLSCertCB is used to fetch and return custom tls certificate. TLSCertCB TLSCertHandler // TLSHandshakeFirst is used to instruct the library perform // the TLS handshake right after the connect and before receiving // the INFO protocol from the server. If this option is enabled // but the server is not configured to perform the TLS handshake // first, the connection will fail. TLSHandshakeFirst bool // RootCAsCB is used to fetch and return a set of root certificate // authorities that clients use when verifying server certificates. RootCAsCB RootCAsHandler // AllowReconnect enables reconnection logic to be used when we // encounter a disconnect from the current server. 
AllowReconnect bool // MaxReconnect sets the number of reconnect attempts that will be // tried before giving up. If negative, then it will never give up // trying to reconnect. // Defaults to 60. MaxReconnect int // ReconnectWait sets the time to backoff after attempting a reconnect // to a server that we were already connected to previously. // Defaults to 2s. ReconnectWait time.Duration // CustomReconnectDelayCB is invoked after the library tried every // URL in the server list and failed to reconnect. It passes to the // user the current number of attempts. This function returns the // amount of time the library will sleep before attempting to reconnect // again. It is strongly recommended that this value contains some // jitter to prevent all connections to attempt reconnecting at the same time. CustomReconnectDelayCB ReconnectDelayHandler // ReconnectJitter sets the upper bound for a random delay added to // ReconnectWait during a reconnect when no TLS is used. // Defaults to 100ms. ReconnectJitter time.Duration // ReconnectJitterTLS sets the upper bound for a random delay added to // ReconnectWait during a reconnect when TLS is used. // Defaults to 1s. ReconnectJitterTLS time.Duration // Timeout sets the timeout for a Dial operation on a connection. // Defaults to 2s. Timeout time.Duration // DrainTimeout sets the timeout for a Drain Operation to complete. // Defaults to 30s. DrainTimeout time.Duration // FlusherTimeout is the maximum time to wait for write operations // to the underlying connection to complete (including the flusher loop). // Defaults to 1m. FlusherTimeout time.Duration // PingInterval is the period at which the client will be sending ping // commands to the server, disabled if 0 or negative. // Defaults to 2m. PingInterval time.Duration // MaxPingsOut is the maximum number of pending ping commands that can // be awaiting a response before raising an ErrStaleConnection error. // Defaults to 2. 
MaxPingsOut int // ClosedCB sets the closed handler that is called when a client will // no longer be connected. ClosedCB ConnHandler // DisconnectedCB sets the disconnected handler that is called // whenever the connection is disconnected. // Will not be called if DisconnectedErrCB is set // Deprecated. Use DisconnectedErrCB which passes error that caused // the disconnect event. DisconnectedCB ConnHandler // DisconnectedErrCB sets the disconnected error handler that is called // whenever the connection is disconnected. // Disconnected error could be nil, for instance when user explicitly closes the connection. // DisconnectedCB will not be called if DisconnectedErrCB is set DisconnectedErrCB ConnErrHandler // ConnectedCB sets the connected handler called when the initial connection // is established. It is not invoked on successful reconnects - for reconnections, // use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect // to detect whether the initial connect was successful. ConnectedCB ConnHandler // ReconnectedCB sets the reconnected handler called whenever // the connection is successfully reconnected. ReconnectedCB ConnHandler // DiscoveredServersCB sets the callback that is invoked whenever a new // server has joined the cluster. DiscoveredServersCB ConnHandler // AsyncErrorCB sets the async error handler (e.g. slow consumer errors) AsyncErrorCB ErrHandler // ReconnectErrCB sets the callback that is invoked whenever a // reconnect attempt failed ReconnectErrCB ConnErrHandler // ReconnectBufSize is the size of the backing bufio during reconnect. // Once this has been exhausted publish operations will return an error. // Defaults to 8388608 bytes (8MB). ReconnectBufSize int // SubChanLen is the size of the buffered channel used between the socket // Go routine and the message delivery for SyncSubscriptions. // NOTE: This does not affect AsyncSubscriptions which are // dictated by PendingLimits() // Defaults to 65536. 
SubChanLen int // UserJWT sets the callback handler that will fetch a user's JWT. UserJWT UserJWTHandler // Nkey sets the public nkey that will be used to authenticate // when connecting to the server. UserJWT and Nkey are mutually exclusive // and if defined, UserJWT will take precedence. Nkey string // SignatureCB designates the function used to sign the nonce // presented from the server. SignatureCB SignatureHandler // User sets the username to be used when connecting to the server. User string // Password sets the password to be used when connecting to a server. Password string // UserInfo sets the callback handler that will fetch the username and password. UserInfo UserInfoCB // Token sets the token to be used when connecting to a server. Token string // TokenHandler designates the function used to generate the token to be used when connecting to a server. TokenHandler AuthTokenHandler // Dialer allows a custom net.Dialer when forming connections. // Deprecated: should use CustomDialer instead. Dialer *net.Dialer // CustomDialer allows to specify a custom dialer (not necessarily // a *net.Dialer). CustomDialer CustomDialer // UseOldRequestStyle forces the old method of Requests that utilize // a new Inbox and a new Subscription for each request. UseOldRequestStyle bool // NoCallbacksAfterClientClose allows preventing the invocation of // callbacks after Close() is called. Client won't receive notifications // when Close is invoked by user code. Default is to invoke the callbacks. NoCallbacksAfterClientClose bool // LameDuckModeHandler sets the callback to invoke when the server notifies // the connection that it entered lame duck mode, that is, going to // gradually disconnect all its connections before shutting down. This is // often used in deployments when upgrading NATS Servers. LameDuckModeHandler ConnHandler // RetryOnFailedConnect sets the connection in reconnecting state right // away if it can't connect to a server in the initial set. 
The // MaxReconnect and ReconnectWait options are used for this process, // similarly to when an established connection is disconnected. // If a ReconnectHandler is set, it will be invoked on the first // successful reconnect attempt (if the initial connect fails), // and if a ClosedHandler is set, it will be invoked if // it fails to connect (after exhausting the MaxReconnect attempts). RetryOnFailedConnect bool // For websocket connections, indicates to the server that the connection // supports compression. If the server does too, then data will be compressed. Compression bool // For websocket connections, adds a path to connections url. // This is useful when connecting to NATS behind a proxy. ProxyPath string // InboxPrefix allows the default _INBOX prefix to be customized InboxPrefix string // IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting // subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy). IgnoreAuthErrorAbort bool // SkipHostLookup skips the DNS lookup for the server hostname. SkipHostLookup bool // PermissionErrOnSubscribe - if set to true, the client will return ErrPermissionViolation // from SubscribeSync if the server returns a permissions error for a subscription. // Defaults to false. PermissionErrOnSubscribe bool } const ( // Scratch storage for assembling protocol headers scratchSize = 512 // The size of the bufio reader/writer on top of the socket. defaultBufSize = 32768 // The buffered size of the flush "kick" channel flushChanSize = 1 // Default server pool size srvPoolSize = 4 // NUID size nuidSize = 22 // Default ports used if none is specified in given URL(s) defaultWSPortString = "80" defaultWSSPortString = "443" defaultPortString = "4222" ) // A Conn represents a bare connection to a nats-server. // It can send and receive []byte payloads. // The connection is safe to use in multiple Go routines concurrently. 
type Conn struct { // Keep all members for which we use atomic at the beginning of the // struct and make sure they are all 64bits (or use padding if necessary). // atomic.* functions crash on 32bit machines if operand is not aligned // at 64bit. See https://github.com/golang/go/issues/599 Statistics mu sync.RWMutex // Opts holds the configuration of the Conn. // Modifying the configuration of a running Conn is a race. Opts Options wg sync.WaitGroup srvPool []*srv current *srv urls map[string]struct{} // Keep track of all known URLs (used by processInfo) conn net.Conn bw *natsWriter br *natsReader fch chan struct{} info serverInfo ssid int64 subsMu sync.RWMutex subs map[int64]*Subscription ach *asyncCallbacksHandler pongs []chan struct{} scratch [scratchSize]byte status Status statListeners map[Status]map[chan Status]struct{} initc bool // true if the connection is performing the initial connect err error ps *parseState ptmr *time.Timer pout int ar bool // abort reconnect rqch chan struct{} ws bool // true if a websocket connection // New style response handler respSub string // The wildcard subject respSubPrefix string // the wildcard prefix including trailing . respSubLen int // the length of the wildcard prefix excluding trailing . respMux *Subscription // A single response subscription respMap map[string]chan *Msg // Request map for the response msg channels respRand *rand.Rand // Used for generating suffix // Msg filters for testing. // Protected by subsMu filters map[string]msgFilter } type natsReader struct { r io.Reader buf []byte off int n int } type natsWriter struct { w io.Writer bufs []byte limit int pending *bytes.Buffer plimit int } // Subscription represents interest in a given subject. type Subscription struct { mu sync.Mutex sid int64 // Subject that represents this subscription. This can be different // than the received subject inside a Msg if this is a wildcard. Subject string // Optional queue group name. 
If present, all subscriptions with the // same name will form a distributed queue, and each message will // only be processed by one member of the group. Queue string // For holding information about a JetStream consumer. jsi *jsSub delivered uint64 max uint64 conn *Conn mcb MsgHandler mch chan *Msg errCh chan (error) closed bool sc bool connClosed bool draining bool status SubStatus statListeners map[chan SubStatus][]SubStatus permissionsErr error // Type of Subscription typ SubscriptionType // Async linked list pHead *Msg pTail *Msg pCond *sync.Cond pDone func(subject string) // Pending stats, async subscriptions, high-speed etc. pMsgs int pBytes int pMsgsMax int pBytesMax int pMsgsLimit int pBytesLimit int dropped int } // Status represents the state of the connection. type SubStatus int const ( SubscriptionActive = SubStatus(iota) SubscriptionDraining SubscriptionClosed SubscriptionSlowConsumer ) func ( SubStatus) () string { switch { case SubscriptionActive: return "Active" case SubscriptionDraining: return "Draining" case SubscriptionClosed: return "Closed" case SubscriptionSlowConsumer: return "SlowConsumer" } return "unknown status" } // Msg represents a message delivered by NATS. This structure is used // by Subscribers and PublishMsg(). // // # Types of Acknowledgements // // In case using JetStream, there are multiple ways to ack a Msg: // // // Acknowledgement that a message has been processed. // msg.Ack() // // // Negatively acknowledges a message. // msg.Nak() // // // Terminate a message so that it is not redelivered further. // msg.Term() // // // Signal the server that the message is being worked on and reset redelivery timer. // msg.InProgress() type Msg struct { Subject string Reply string Header Header Data []byte Sub *Subscription // Internal next *Msg wsz int barrier *barrierInfo ackd uint32 } // Compares two msgs, ignores sub but checks all other public fields. 
func ( *Msg) ( *Msg) bool { if == { return true } if == nil || == nil { return false } if .Subject != .Subject || .Reply != .Reply { return false } if !bytes.Equal(.Data, .Data) { return false } if len(.Header) != len(.Header) { return false } for , := range .Header { , := .Header[] if ! || len() != len() { return false } for , := range { if != [] { return false } } } return true } // Size returns a message size in bytes. func ( *Msg) () int { if .wsz != 0 { return .wsz } , := .headerBytes() return len(.Subject) + len(.Reply) + len() + len(.Data) } func ( *Msg) () ([]byte, error) { var []byte if len(.Header) == 0 { return , nil } var bytes.Buffer , := .WriteString(hdrLine) if != nil { return nil, ErrBadHeaderMsg } = http.Header(.Header).Write(&) if != nil { return nil, ErrBadHeaderMsg } _, = .WriteString(crlf) if != nil { return nil, ErrBadHeaderMsg } return .Bytes(), nil } type barrierInfo struct { refs int64 f func() } // Tracks various stats received and sent on this connection, // including counts for messages and bytes. type Statistics struct { InMsgs uint64 OutMsgs uint64 InBytes uint64 OutBytes uint64 Reconnects uint64 } // Tracks individual backend servers. type srv struct { url *url.URL didConnect bool reconnects int lastErr error isImplicit bool tlsName string } // The INFO block received from the server. 
type serverInfo struct { ID string `json:"server_id"` Name string `json:"server_name"` Proto int `json:"proto"` Version string `json:"version"` Host string `json:"host"` Port int `json:"port"` Headers bool `json:"headers"` AuthRequired bool `json:"auth_required,omitempty"` TLSRequired bool `json:"tls_required,omitempty"` TLSAvailable bool `json:"tls_available,omitempty"` MaxPayload int64 `json:"max_payload"` CID uint64 `json:"client_id,omitempty"` ClientIP string `json:"client_ip,omitempty"` Nonce string `json:"nonce,omitempty"` Cluster string `json:"cluster,omitempty"` ConnectURLs []string `json:"connect_urls,omitempty"` LameDuckMode bool `json:"ldm,omitempty"` } const ( // clientProtoZero is the original client protocol from 2009. // http://nats.io/documentation/internals/nats-protocol/ /* clientProtoZero */ _ = iota // clientProtoInfo signals a client can receive more then the original INFO block. // This can be used to update clients on other cluster members, etc. clientProtoInfo ) type connectInfo struct { Verbose bool `json:"verbose"` Pedantic bool `json:"pedantic"` UserJWT string `json:"jwt,omitempty"` Nkey string `json:"nkey,omitempty"` Signature string `json:"sig,omitempty"` User string `json:"user,omitempty"` Pass string `json:"pass,omitempty"` Token string `json:"auth_token,omitempty"` TLS bool `json:"tls_required"` Name string `json:"name"` Lang string `json:"lang"` Version string `json:"version"` Protocol int `json:"protocol"` Echo bool `json:"echo"` Headers bool `json:"headers"` NoResponders bool `json:"no_responders"` } // MsgHandler is a callback function that processes messages delivered to // asynchronous subscribers. type MsgHandler func(msg *Msg) // Connect will attempt to connect to the NATS system. // The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222 // Comma separated arrays are also supported, e.g. urlA, urlB. // Options start with the defaults but can be overridden. 
// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as // `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls). func ( string, ...Option) (*Conn, error) { := GetDefaultOptions() .Servers = processUrlString() for , := range { if != nil { if := (&); != nil { return nil, } } } return .Connect() } // Options that can be passed to Connect. // Name is an Option to set the client name. func ( string) Option { return func( *Options) error { .Name = return nil } } // InProcessServer is an Option that will try to establish a direction to a NATS server // running within the process instead of dialing via TCP. func ( InProcessConnProvider) Option { return func( *Options) error { .InProcessServer = return nil } } // Secure is an Option to enable TLS secure connections that skip server verification by default. // Pass a TLS Configuration for proper TLS. // A TLS Configuration using InsecureSkipVerify should NOT be used in a production setting. func ( ...*tls.Config) Option { return func( *Options) error { .Secure = true // Use of variadic just simplifies testing scenarios. We only take the first one. if len() > 1 { return ErrMultipleTLSConfigs } if len() == 1 { .TLSConfig = [0] } return nil } } // ClientTLSConfig is an Option to set the TLS configuration for secure // connections. It can be used to e.g. set TLS config with cert and root CAs // from memory. For simple use case of loading cert and CAs from file, // ClientCert and RootCAs options are more convenient. // If Secure is not already set this will set it as well. func ( TLSCertHandler, RootCAsHandler) Option { return func( *Options) error { .Secure = true if == nil && == nil { return ErrClientCertOrRootCAsRequired } // Smoke test the callbacks to fail early // if they are not valid. 
if != nil { if , := (); != nil { return } } if != nil { if , := (); != nil { return } } if .TLSConfig == nil { .TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} } .TLSCertCB = .RootCAsCB = return nil } } // RootCAs is a helper option to provide the RootCAs pool from a list of filenames. // If Secure is not already set this will set it as well. func ( ...string) Option { return func( *Options) error { := func() (*x509.CertPool, error) { := x509.NewCertPool() for , := range { , := os.ReadFile() if != nil || == nil { return nil, fmt.Errorf("nats: error loading or parsing rootCA file: %w", ) } := .AppendCertsFromPEM() if ! { return nil, fmt.Errorf("nats: failed to parse root certificate from %q", ) } } return , nil } if .TLSConfig == nil { .TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} } if , := (); != nil { return } .RootCAsCB = .Secure = true return nil } } // ClientCert is a helper option to provide the client certificate from a file. // If Secure is not already set this will set it as well. func (, string) Option { return func( *Options) error { := func() (tls.Certificate, error) { , := tls.LoadX509KeyPair(, ) if != nil { return tls.Certificate{}, fmt.Errorf("nats: error loading client certificate: %w", ) } .Leaf, = x509.ParseCertificate(.Certificate[0]) if != nil { return tls.Certificate{}, fmt.Errorf("nats: error parsing client certificate: %w", ) } return , nil } if .TLSConfig == nil { .TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} } if , := (); != nil { return } .TLSCertCB = .Secure = true return nil } } // NoReconnect is an Option to turn off reconnect behavior. func () Option { return func( *Options) error { .AllowReconnect = false return nil } } // DontRandomize is an Option to turn off randomizing the server pool. func () Option { return func( *Options) error { .NoRandomize = true return nil } } // NoEcho is an Option to turn off messages echoing back from a server. // Note this is supported on servers >= version 1.2. 
Proto 1 or greater. func () Option { return func( *Options) error { .NoEcho = true return nil } } // ReconnectWait is an Option to set the wait time between reconnect attempts. // Defaults to 2s. func ( time.Duration) Option { return func( *Options) error { .ReconnectWait = return nil } } // MaxReconnects is an Option to set the maximum number of reconnect attempts. // If negative, it will never stop trying to reconnect. // Defaults to 60. func ( int) Option { return func( *Options) error { .MaxReconnect = return nil } } // ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait. // Defaults to 100ms and 1s, respectively. func (, time.Duration) Option { return func( *Options) error { .ReconnectJitter = .ReconnectJitterTLS = return nil } } // CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option. // See CustomReconnectDelayCB Option for more details. func ( ReconnectDelayHandler) Option { return func( *Options) error { .CustomReconnectDelayCB = return nil } } // PingInterval is an Option to set the period for client ping commands. // Defaults to 2m. func ( time.Duration) Option { return func( *Options) error { .PingInterval = return nil } } // MaxPingsOutstanding is an Option to set the maximum number of ping requests // that can go unanswered by the server before closing the connection. // Defaults to 2. func ( int) Option { return func( *Options) error { .MaxPingsOut = return nil } } // ReconnectBufSize sets the buffer size of messages kept while busy reconnecting. // Defaults to 8388608 bytes (8MB). It can be disabled by setting it to -1. func ( int) Option { return func( *Options) error { .ReconnectBufSize = return nil } } // Timeout is an Option to set the timeout for Dial on a connection. // Defaults to 2s. func ( time.Duration) Option { return func( *Options) error { .Timeout = return nil } } // FlusherTimeout is an Option to set the write (and flush) timeout on a connection. 
func ( time.Duration) Option { return func( *Options) error { .FlusherTimeout = return nil } } // DrainTimeout is an Option to set the timeout for draining a connection. // Defaults to 30s. func ( time.Duration) Option { return func( *Options) error { .DrainTimeout = return nil } } // DisconnectErrHandler is an Option to set the disconnected error handler. func ( ConnErrHandler) Option { return func( *Options) error { .DisconnectedErrCB = return nil } } // DisconnectHandler is an Option to set the disconnected handler. // Deprecated: Use DisconnectErrHandler. func ( ConnHandler) Option { return func( *Options) error { .DisconnectedCB = return nil } } // ConnectHandler is an Option to set the connected handler. func ( ConnHandler) Option { return func( *Options) error { .ConnectedCB = return nil } } // ReconnectHandler is an Option to set the reconnected handler. func ( ConnHandler) Option { return func( *Options) error { .ReconnectedCB = return nil } } // ReconnectErrHandler is an Option to set the reconnect error handler. func ( ConnErrHandler) Option { return func( *Options) error { .ReconnectErrCB = return nil } } // ClosedHandler is an Option to set the closed handler. func ( ConnHandler) Option { return func( *Options) error { .ClosedCB = return nil } } // DiscoveredServersHandler is an Option to set the new servers handler. func ( ConnHandler) Option { return func( *Options) error { .DiscoveredServersCB = return nil } } // ErrorHandler is an Option to set the async error handler. func ( ErrHandler) Option { return func( *Options) error { .AsyncErrorCB = return nil } } // UserInfo is an Option to set the username and password to // use when not included directly in the URLs. 
func (, string) Option { return func( *Options) error { .User = .Password = return nil } } func ( UserInfoCB) Option { return func( *Options) error { .UserInfo = return nil } } // Token is an Option to set the token to use // when a token is not included directly in the URLs // and when a token handler is not provided. func ( string) Option { return func( *Options) error { if .TokenHandler != nil { return ErrTokenAlreadySet } .Token = return nil } } // TokenHandler is an Option to set the token handler to use // when a token is not included directly in the URLs // and when a token is not set. func ( AuthTokenHandler) Option { return func( *Options) error { if .Token != "" { return ErrTokenAlreadySet } .TokenHandler = return nil } } // UserCredentials is a convenience function that takes a filename // for a user's JWT and a filename for the user's private Nkey seed. func ( string, ...string) Option { := func() (string, error) { return userFromFile() } var string if len() > 0 { = [0] } else { = } := func( []byte) ([]byte, error) { return sigHandler(, ) } return UserJWT(, ) } // UserJWTAndSeed is a convenience function that takes the JWT and seed // values as strings. func ( string, string) Option { := func() (string, error) { return , nil } := func( []byte) ([]byte, error) { , := nkeys.FromSeed([]byte()) if != nil { return nil, fmt.Errorf("unable to extract key pair from seed: %w", ) } // Wipe our key on exit. defer .Wipe() , := .Sign() return , nil } return UserJWT(, ) } // UserJWT will set the callbacks to retrieve the user's JWT and // the signature callback to sign the server nonce. This an the Nkey // option are mutually exclusive. func ( UserJWTHandler, SignatureHandler) Option { return func( *Options) error { if == nil { return ErrNoUserCB } if == nil { return ErrUserButNoSigCB } // Smoke test the user callback to ensure it is setup properly // when processing options. 
if , := (); != nil { return } .UserJWT = .SignatureCB = return nil } } // Nkey will set the public Nkey and the signature callback to // sign the server nonce. func ( string, SignatureHandler) Option { return func( *Options) error { .Nkey = .SignatureCB = if != "" && == nil { return ErrNkeyButNoSigCB } return nil } } // SyncQueueLen will set the maximum queue len for the internal // channel used for SubscribeSync(). // Defaults to 65536. func ( int) Option { return func( *Options) error { .SubChanLen = return nil } } // Dialer is an Option to set the dialer which will be used when // attempting to establish a connection. // Deprecated: Should use CustomDialer instead. func ( *net.Dialer) Option { return func( *Options) error { .Dialer = return nil } } // SetCustomDialer is an Option to set a custom dialer which will be // used when attempting to establish a connection. If both Dialer // and CustomDialer are specified, CustomDialer takes precedence. func ( CustomDialer) Option { return func( *Options) error { .CustomDialer = return nil } } // UseOldRequestStyle is an Option to force usage of the old Request style. func () Option { return func( *Options) error { .UseOldRequestStyle = true return nil } } // NoCallbacksAfterClientClose is an Option to disable callbacks when user code // calls Close(). If close is initiated by any other condition, callbacks // if any will be invoked. func () Option { return func( *Options) error { .NoCallbacksAfterClientClose = true return nil } } // LameDuckModeHandler sets the callback to invoke when the server notifies // the connection that it entered lame duck mode, that is, going to // gradually disconnect all its connections before shutting down. This is // often used in deployments when upgrading NATS Servers. 
func ( ConnHandler) Option { return func( *Options) error { .LameDuckModeHandler = return nil } } // RetryOnFailedConnect sets the connection in reconnecting state right away // if it can't connect to a server in the initial set. // See RetryOnFailedConnect option for more details. func ( bool) Option { return func( *Options) error { .RetryOnFailedConnect = return nil } } // Compression is an Option to indicate if this connection supports // compression. Currently only supported for Websocket connections. func ( bool) Option { return func( *Options) error { .Compression = return nil } } // ProxyPath is an option for websocket connections that adds a path to connections url. // This is useful when connecting to NATS behind a proxy. func ( string) Option { return func( *Options) error { .ProxyPath = return nil } } // CustomInboxPrefix configures the request + reply inbox prefix func ( string) Option { return func( *Options) error { if == "" || strings.Contains(, ">") || strings.Contains(, "*") || strings.HasSuffix(, ".") { return errors.New("nats: invalid custom prefix") } .InboxPrefix = return nil } } // IgnoreAuthErrorAbort opts out of the default connect behavior of aborting // subsequent reconnect attempts if server returns the same auth error twice. func () Option { return func( *Options) error { .IgnoreAuthErrorAbort = true return nil } } // SkipHostLookup is an Option to skip the host lookup when connecting to a server. func () Option { return func( *Options) error { .SkipHostLookup = true return nil } } func ( bool) Option { return func( *Options) error { .PermissionErrOnSubscribe = return nil } } // TLSHandshakeFirst is an Option to perform the TLS handshake first, that is // before receiving the INFO protocol. This requires the server to also be // configured with such option, otherwise the connection will fail. 
func () Option { return func( *Options) error { .TLSHandshakeFirst = true .Secure = true return nil } } // Handler processing // SetDisconnectHandler will set the disconnect event handler. // Deprecated: Use SetDisconnectErrHandler func ( *Conn) ( ConnHandler) { if == nil { return } .mu.Lock() defer .mu.Unlock() .Opts.DisconnectedCB = } // SetDisconnectErrHandler will set the disconnect event handler. func ( *Conn) ( ConnErrHandler) { if == nil { return } .mu.Lock() defer .mu.Unlock() .Opts.DisconnectedErrCB = } // DisconnectErrHandler will return the disconnect event handler. func ( *Conn) () ConnErrHandler { if == nil { return nil } .mu.Lock() defer .mu.Unlock() return .Opts.DisconnectedErrCB } // SetReconnectHandler will set the reconnect event handler. func ( *Conn) ( ConnHandler) { if == nil { return } .mu.Lock() defer .mu.Unlock() .Opts.ReconnectedCB = } // ReconnectHandler will return the reconnect event handler. func ( *Conn) () ConnHandler { if == nil { return nil } .mu.Lock() defer .mu.Unlock() return .Opts.ReconnectedCB } // SetDiscoveredServersHandler will set the discovered servers handler. func ( *Conn) ( ConnHandler) { if == nil { return } .mu.Lock() defer .mu.Unlock() .Opts.DiscoveredServersCB = } // DiscoveredServersHandler will return the discovered servers handler. func ( *Conn) () ConnHandler { if == nil { return nil } .mu.Lock() defer .mu.Unlock() return .Opts.DiscoveredServersCB } // SetClosedHandler will set the closed event handler. func ( *Conn) ( ConnHandler) { if == nil { return } .mu.Lock() defer .mu.Unlock() .Opts.ClosedCB = } // ClosedHandler will return the closed event handler. func ( *Conn) () ConnHandler { if == nil { return nil } .mu.Lock() defer .mu.Unlock() return .Opts.ClosedCB } // SetErrorHandler will set the async error handler. func ( *Conn) ( ErrHandler) { if == nil { return } .mu.Lock() defer .mu.Unlock() .Opts.AsyncErrorCB = } // ErrorHandler will return the async error handler. 
func ( *Conn) () ErrHandler { if == nil { return nil } .mu.Lock() defer .mu.Unlock() return .Opts.AsyncErrorCB } // Process the url string argument to Connect. // Return an array of urls, even if only one. func processUrlString( string) []string { := strings.Split(, ",") var int for , := range { := strings.TrimSuffix(strings.TrimSpace(), "/") if len() > 0 { [] = ++ } } return [:] } // Connect will attempt to connect to a NATS server with multiple options. func ( Options) () (*Conn, error) { := &Conn{Opts: } // Some default options processing. if .Opts.MaxPingsOut == 0 { .Opts.MaxPingsOut = DefaultMaxPingOut } // Allow old default for channel length to work correctly. if .Opts.SubChanLen == 0 { .Opts.SubChanLen = DefaultMaxChanLen } // Default ReconnectBufSize if .Opts.ReconnectBufSize == 0 { .Opts.ReconnectBufSize = DefaultReconnectBufSize } // Ensure that Timeout is not 0 if .Opts.Timeout == 0 { .Opts.Timeout = DefaultTimeout } // Check first for user jwt callback being defined and nkey. if .Opts.UserJWT != nil && .Opts.Nkey != "" { return nil, ErrNkeyAndUser } // Check if we have an nkey but no signature callback defined. if .Opts.Nkey != "" && .Opts.SignatureCB == nil { return nil, ErrNkeyButNoSigCB } // Allow custom Dialer for connecting using a timeout by default if .Opts.Dialer == nil { .Opts.Dialer = &net.Dialer{ Timeout: .Opts.Timeout, } } // If the TLSHandshakeFirst option is specified, make sure that // the Secure boolean is true. if .Opts.TLSHandshakeFirst { .Opts.Secure = true } if := .setupServerPool(); != nil { return nil, } // Create the async callback handler. .ach = &asyncCallbacksHandler{} .ach.cond = sync.NewCond(&.ach.mu) // Set a default error handler that will print to stderr. 
if .Opts.AsyncErrorCB == nil { .Opts.AsyncErrorCB = defaultErrHandler } // Create reader/writer .newReaderWriter() , := .connect() if != nil { return nil, } // Spin up the async cb dispatcher on success go .ach.asyncCBDispatcher() if && .Opts.ConnectedCB != nil { .ach.push(func() { .Opts.ConnectedCB() }) } return , nil } func defaultErrHandler( *Conn, *Subscription, error) { var uint64 if != nil { .mu.RLock() = .info.CID .mu.RUnlock() } var string if != nil { var string .mu.Lock() if .jsi != nil { = .jsi.psubj } else { = .Subject } .mu.Unlock() = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", .Error(), , ) } else { = fmt.Sprintf("%s on connection [%d]\n", .Error(), ) } os.Stderr.WriteString() } const ( _CRLF_ = "\r\n" _EMPTY_ = "" _SPC_ = " " _PUB_P_ = "PUB " _HPUB_P_ = "HPUB " ) var _CRLF_BYTES_ = []byte(_CRLF_) const ( _OK_OP_ = "+OK" _ERR_OP_ = "-ERR" _PONG_OP_ = "PONG" _INFO_OP_ = "INFO" ) const ( connectProto = "CONNECT %s" + _CRLF_ pingProto = "PING" + _CRLF_ pongProto = "PONG" + _CRLF_ subProto = "SUB %s %s %d" + _CRLF_ unsubProto = "UNSUB %d %s" + _CRLF_ okProto = _OK_OP_ + _CRLF_ ) // Return the currently selected server func ( *Conn) () (int, *srv) { for , := range .srvPool { if == nil { continue } if == .current { return , } } return -1, nil } // Pop the current server and put onto the end of the list. Select head of list as long // as number of reconnect attempts under MaxReconnect. 
func ( *Conn) () (*srv, error) { , := .currentServer() if < 0 { return nil, ErrNoServers } := .srvPool := len() copy([:-1], [+1:]) := .Opts.MaxReconnect if < 0 || .reconnects < { .srvPool[-1] = } else { .srvPool = [0 : -1] } if len(.srvPool) <= 0 { .current = nil return nil, ErrNoServers } .current = .srvPool[0] return .srvPool[0], nil } // Will assign the correct server to nc.current func ( *Conn) () error { .current = nil if len(.srvPool) <= 0 { return ErrNoServers } for , := range .srvPool { if != nil { .current = return nil } } return ErrNoServers } const tlsScheme = "tls" // Create the server pool using the options given. // We will place a Url option first, followed by any // Server Options. We will randomize the server pool unless // the NoRandomize flag is set. func ( *Conn) () error { .srvPool = make([]*srv, 0, srvPoolSize) .urls = make(map[string]struct{}, srvPoolSize) // Create srv objects from each url string in nc.Opts.Servers // and add them to the pool. for , := range .Opts.Servers { if := .addURLToPool(, false, false); != nil { return } } // Randomize if allowed to if !.Opts.NoRandomize { .shufflePool(0) } // Normally, if this one is set, Options.Servers should not be, // but we always allowed that, so continue to do so. if .Opts.Url != _EMPTY_ { // Add to the end of the array if := .addURLToPool(.Opts.Url, false, false); != nil { return } // Then swap it with first to guarantee that Options.Url is tried first. := len(.srvPool) - 1 if > 0 { .srvPool[0], .srvPool[] = .srvPool[], .srvPool[0] } } else if len(.srvPool) <= 0 { // Place default URL if pool is empty. if := .addURLToPool(DefaultURL, false, false); != nil { return } } // Check for Scheme hint to move to TLS mode. for , := range .srvPool { if .url.Scheme == tlsScheme || .url.Scheme == wsSchemeTLS { // FIXME(dlc), this is for all in the pool, should be case by case. 
.Opts.Secure = true if .Opts.TLSConfig == nil { .Opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} } } } return .pickServer() } // Helper function to return scheme func ( *Conn) () string { if .ws { if .Opts.Secure { return wsSchemeTLS } return wsScheme } if .Opts.Secure { return tlsScheme } return "nats" } // Return true iff u.Hostname() is an IP address. func hostIsIP( *url.URL) bool { return net.ParseIP(.Hostname()) != nil } // addURLToPool adds an entry to the server pool func ( *Conn) ( string, , bool) error { if !strings.Contains(, "://") { = fmt.Sprintf("%s://%s", .connScheme(), ) } var ( *url.URL error ) for := 0; < 2; ++ { , = url.Parse() if != nil { return } if .Port() != "" { break } // In case given URL is of the form "localhost:", just add // the port number at the end, otherwise, add ":4222". if [len()-1] != ':' { += ":" } switch .Scheme { case wsScheme: += defaultWSPortString case wsSchemeTLS: += defaultWSSPortString default: += defaultPortString } } := isWebsocketScheme() // We don't support mix and match of websocket and non websocket URLs. // If this is the first URL, then we accept and switch the global state // to websocket. After that, we will know how to reject mixed URLs. if len(.srvPool) == 0 { .ws = } else if && !.ws || ! && .ws { return errors.New("mixing of websocket and non websocket URLs is not allowed") } var string if { := .current.url // Check to see if we do not have a url.User but current connected // url does. If so copy over. if .User == nil && .User != nil { .User = .User } // We are checking to see if we have a secure connection and are // adding an implicit server that just has an IP. If so we will remember // the current hostname we are connected to. 
if && hostIsIP() { = .Hostname() } } := &srv{url: , isImplicit: , tlsName: } .srvPool = append(.srvPool, ) .urls[.Host] = struct{}{} return nil } // shufflePool swaps randomly elements in the server pool // The `offset` value indicates that the shuffling should start at // this offset and leave the elements from [0..offset) intact. func ( *Conn) ( int) { if len(.srvPool) <= +1 { return } := rand.NewSource(time.Now().UnixNano()) := rand.New() for := ; < len(.srvPool); ++ { := + .Intn(+1-) .srvPool[], .srvPool[] = .srvPool[], .srvPool[] } } func ( *Conn) () { .br = &natsReader{ buf: make([]byte, defaultBufSize), off: -1, } .bw = &natsWriter{ limit: defaultBufSize, plimit: .Opts.ReconnectBufSize, } } func ( *Conn) () { := .bw .w, .bufs = .newWriter(), nil := .br .r, .n, .off = .conn, 0, -1 } func ( *Conn) () io.Writer { var io.Writer = .conn if .Opts.FlusherTimeout > 0 { = &timeoutWriter{conn: .conn, timeout: .Opts.FlusherTimeout} } return } func ( *natsWriter) ( string) error { return .appendBufs([]byte()) } func ( *natsWriter) ( ...[]byte) error { for , := range { if len() == 0 { continue } if .pending != nil { .pending.Write() } else { .bufs = append(.bufs, ...) } } if .pending == nil && len(.bufs) >= .limit { return .flush() } return nil } func ( *natsWriter) ( ...string) error { for , := range { if , := .w.Write([]byte()); != nil { return } } return nil } func ( *natsWriter) () error { // If a pending buffer is set, we don't flush. Code that needs to // write directly to the socket, by-passing buffers during (re)connect, // will use the writeDirect() API. if .pending != nil { return nil } // Do not skip calling w.w.Write() here if len(w.bufs) is 0 because // the actual writer (if websocket for instance) may have things // to do such as sending control frames, etc.. 
, := .w.Write(.bufs) .bufs = .bufs[:0] return } func ( *natsWriter) () int { if .pending != nil { return .pending.Len() } return len(.bufs) } func ( *natsWriter) () { .pending = new(bytes.Buffer) } func ( *natsWriter) () error { if .pending == nil || .pending.Len() == 0 { return nil } , := .w.Write(.pending.Bytes()) // Reset the pending buffer at this point because we don't want // to take the risk of sending duplicates or partials. .pending.Reset() return } func ( *natsWriter) () bool { if .pending == nil { return false } return .pending.Len() >= .plimit } func ( *natsWriter) () { .pending = nil } // Notify the reader that we are done with the connect, where "read" operations // happen synchronously and under the connection lock. After this point, "read" // will be happening from the read loop, without the connection lock. // // Note: this runs under the connection lock. func ( *natsReader) () { if , := .r.(*websocketReader); { .doneWithConnect() } } func ( *natsReader) () ([]byte, error) { if .off >= 0 { := .off .off = -1 return .buf[:.n], nil } var error .n, = .r.Read(.buf) return .buf[:.n], } func ( *natsReader) ( byte) (string, error) { var string : // First look if we have something in the buffer if .off >= 0 { := bytes.IndexByte(.buf[.off:.n], ) if >= 0 { := .off + + 1 += string(.buf[.off:]) .off = if .off >= .n { .off = -1 } return , nil } // We did not find the delim, so will have to read more. += string(.buf[.off:.n]) .off = -1 } if , := .Read(); != nil { return , } .off = 0 goto } // createConn will connect to the server and wrap the appropriate // bufio structures. It will do the right thing when an existing // connection is in place. func ( *Conn) () ( error) { if .Opts.Timeout < 0 { return ErrBadTimeout } if , := .currentServer(); == nil { return ErrNoServers } // If we have a reference to an in-process server then establish a // connection using that. 
if .Opts.InProcessServer != nil { , := .Opts.InProcessServer.InProcessConn() if != nil { return fmt.Errorf("failed to get in-process connection: %w", ) } .conn = .bindToNewConn() return nil } // We will auto-expand host names if they resolve to multiple IPs := []string{} := .current.url if !.Opts.SkipHostLookup && net.ParseIP(.Hostname()) == nil { , := net.LookupHost(.Hostname()) for , := range { = append(, net.JoinHostPort(, .Port())) } } // Fall back to what we were given. if len() == 0 { = append(, .Host) } // CustomDialer takes precedence. If not set, use Opts.Dialer which // is set to a default *net.Dialer (in Connect()) if not explicitly // set by the user. := .Opts.CustomDialer if == nil { // We will copy and shorten the timeout if we have multiple hosts to try. := *.Opts.Dialer .Timeout = .Timeout / time.Duration(len()) = & } if len() > 1 && !.Opts.NoRandomize { rand.Shuffle(len(), func(, int) { [], [] = [], [] }) } for , := range { .conn, = .Dial("tcp", ) if == nil { break } } if != nil { return } // If scheme starts with "ws" then branch out to websocket code. if isWebsocketScheme() { return .wsInitHandshake() } // Reset reader/writer to this new TCP connection .bindToNewConn() return nil } type skipTLSDialer interface { SkipTLSHandshake() bool } // makeTLSConn will wrap an existing Conn using TLS func ( *Conn) () error { if .Opts.CustomDialer != nil { // we do nothing when asked to skip the TLS wrapper , := .Opts.CustomDialer.(skipTLSDialer) if && .SkipTLSHandshake() { return nil } } // Allow the user to configure their own tls.Config structure. 
:= &tls.Config{} if .Opts.TLSConfig != nil { = util.CloneTLSConfig(.Opts.TLSConfig) } if .Opts.TLSCertCB != nil { , := .Opts.TLSCertCB() if != nil { return } .Certificates = []tls.Certificate{} } if .Opts.RootCAsCB != nil { , := .Opts.RootCAsCB() if != nil { return } .RootCAs = } // If its blank we will override it with the current host if .ServerName == _EMPTY_ { if .current.tlsName != _EMPTY_ { .ServerName = .current.tlsName } else { , , := net.SplitHostPort(.current.url.Host) .ServerName = } } .conn = tls.Client(.conn, ) := .conn.(*tls.Conn) if := .Handshake(); != nil { return } .bindToNewConn() return nil } // TLSConnectionState retrieves the state of the TLS connection to the server func ( *Conn) () (tls.ConnectionState, error) { if !.isConnected() { return tls.ConnectionState{}, ErrDisconnected } .mu.RLock() := .conn .mu.RUnlock() , := .(*tls.Conn) if ! { return tls.ConnectionState{}, ErrConnectionNotTLS } return .ConnectionState(), nil } // waitForExits will wait for all socket watcher Go routines to // be shutdown before proceeding. func ( *Conn) () { // Kick old flusher forcefully. select { case .fch <- struct{}{}: default: } // Wait for any previous go routines. .wg.Wait() } // ForceReconnect forces a reconnect attempt to the server. // This is a non-blocking call and will start the reconnect // process without waiting for it to complete. // // If the connection is already in the process of reconnecting, // this call will force an immediate reconnect attempt (bypassing // the current reconnect delay). func ( *Conn) () error { .mu.Lock() defer .mu.Unlock() if .isClosed() { return ErrConnectionClosed } if .isReconnecting() { // if we're already reconnecting, force a reconnect attempt // even if we're in the middle of a backoff if .rqch != nil { close(.rqch) .rqch = nil } return nil } // Clear any queued pongs .clearPendingFlushCalls() // Clear any queued and blocking requests. .clearPendingRequestCalls() // Stop ping timer if set. 
.stopPingTimer() // Go ahead and make sure we have flushed the outbound .bw.flush() .conn.Close() .changeConnStatus(RECONNECTING) go .doReconnect(nil, true) return nil } // ConnectedUrl reports the connected server's URL func ( *Conn) () string { if == nil { return _EMPTY_ } .mu.RLock() defer .mu.RUnlock() if .status != CONNECTED { return _EMPTY_ } return .current.url.String() } // ConnectedUrlRedacted reports the connected server's URL with passwords redacted func ( *Conn) () string { if == nil { return _EMPTY_ } .mu.RLock() defer .mu.RUnlock() if .status != CONNECTED { return _EMPTY_ } return .current.url.Redacted() } // ConnectedAddr returns the connected server's IP func ( *Conn) () string { if == nil { return _EMPTY_ } .mu.RLock() defer .mu.RUnlock() if .status != CONNECTED { return _EMPTY_ } return .conn.RemoteAddr().String() } // ConnectedServerId reports the connected server's Id func ( *Conn) () string { if == nil { return _EMPTY_ } .mu.RLock() defer .mu.RUnlock() if .status != CONNECTED { return _EMPTY_ } return .info.ID } // ConnectedServerName reports the connected server's name func ( *Conn) () string { if == nil { return _EMPTY_ } .mu.RLock() defer .mu.RUnlock() if .status != CONNECTED { return _EMPTY_ } return .info.Name } var semVerRe = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`) func versionComponents( string) (, , int, error) { := semVerRe.FindStringSubmatch() if == nil { return 0, 0, 0, errors.New("invalid semver") } , = strconv.Atoi([1]) if != nil { return -1, -1, -1, } , = strconv.Atoi([2]) if != nil { return -1, -1, -1, } , = strconv.Atoi([3]) if != nil { return -1, -1, -1, } return , , , } // Check for minimum server requirement. 
func ( *Conn) (, , int) bool { , , , := versionComponents(.ConnectedServerVersion()) if < || ( == && < ) || ( == && == && < ) { return false } return true } // ConnectedServerVersion reports the connected server's version as a string func ( *Conn) () string { if == nil { return _EMPTY_ } .mu.RLock() defer .mu.RUnlock() if .status != CONNECTED { return _EMPTY_ } return .info.Version } // ConnectedClusterName reports the connected server's cluster name if any func ( *Conn) () string { if == nil { return _EMPTY_ } .mu.RLock() defer .mu.RUnlock() if .status != CONNECTED { return _EMPTY_ } return .info.Cluster } // Low level setup for structs, etc func ( *Conn) () { .subs = make(map[int64]*Subscription) .pongs = make([]chan struct{}, 0, 8) .fch = make(chan struct{}, flushChanSize) .rqch = make(chan struct{}) // Setup scratch outbound buffer for PUB/HPUB := .scratch[:len(_HPUB_P_)] copy(, _HPUB_P_) } // Process a connected connection and initialize properly. func ( *Conn) () error { // Set our deadline for the whole connect process .conn.SetDeadline(time.Now().Add(.Opts.Timeout)) defer .conn.SetDeadline(time.Time{}) // Set our status to connecting. .changeConnStatus(CONNECTING) // If we need to have a TLS connection and want the TLS handshake to occur // first, do it now. if .Opts.Secure && .Opts.TLSHandshakeFirst { if := .makeTLSConn(); != nil { return } } // Process the INFO protocol received from the server := .processExpectedInfo() if != nil { return } // Send the CONNECT protocol along with the initial PING protocol. // Wait for the PONG response (or any error that we get from the server). = .sendConnect() if != nil { return } // Reset the number of PING sent out .pout = 0 // Start or reset Timer if .Opts.PingInterval > 0 { if .ptmr == nil { .ptmr = time.AfterFunc(.Opts.PingInterval, .processPingTimer) } else { .ptmr.Reset(.Opts.PingInterval) } } // Start the readLoop and flusher go routines, we will wait on both on a reconnect event. 
.wg.Add(2) go .readLoop() go .flusher() // Notify the reader that we are done with the connect handshake, where // reads were done synchronously and under the connection lock. .br.doneWithConnect() return nil } // Main connect function. Will connect to the nats-server. func ( *Conn) () (bool, error) { var error var bool // Create actual socket connection // For first connect we walk all servers in the pool and try // to connect immediately. .mu.Lock() defer .mu.Unlock() .initc = true // The pool may change inside the loop iteration due to INFO protocol. for := 0; < len(.srvPool); ++ { .current = .srvPool[] if = .createConn(); == nil { // This was moved out of processConnectInit() because // that function is now invoked from doReconnect() too. .setup() = .processConnectInit() if == nil { .current.didConnect = true .current.reconnects = 0 .current.lastErr = nil break } else { .mu.Unlock() .close(DISCONNECTED, false, ) .mu.Lock() // Do not reset nc.current here since it would prevent // RetryOnFailedConnect to work should this be the last server // to try before starting doReconnect(). } } else { // Cancel out default connection refused, will trigger the // No servers error conditional if strings.Contains(.Error(), "connection refused") { = nil } } } if == nil && .status != CONNECTED { = ErrNoServers } if == nil { = true .initc = false } else if .Opts.RetryOnFailedConnect { .setup() .changeConnStatus(RECONNECTING) .bw.switchToPending() go .doReconnect(ErrNoServers, false) = nil } else { .current = nil } return , } // This will check to see if the connection should be // secure. This can be dictated from either end and should // only be called after the INIT protocol has been received. func ( *Conn) () error { // Check to see if we need to engage TLS := .Opts // Check for mismatch in setups if .Secure && !.info.TLSRequired && !.info.TLSAvailable { return ErrSecureConnWanted } else if .info.TLSRequired && !.Secure { // Switch to Secure since server needs TLS. 
.Secure = true } if .Secure { // If TLS handshake first is true, we have already done // the handshake, so we are done here. if .TLSHandshakeFirst { return nil } // Need to rewrap with bufio if := .makeTLSConn(); != nil { return } } return nil } // processExpectedInfo will look for the expected first INFO message // sent when a connection is established. The lock should be held entering. func ( *Conn) () error { := &control{} // Read the protocol := .readOp() if != nil { return } // The nats protocol should send INFO first always. if .op != _INFO_OP_ { return ErrNoInfoReceived } // Parse the protocol if := .processInfo(.args); != nil { return } if .Opts.Nkey != "" && .info.Nonce == "" { return ErrNkeysNotSupported } // For websocket connections, we already switched to TLS if need be, // so we are done here. if .ws { return nil } return .checkForSecure() } // Sends a protocol control message by queuing into the bufio writer // and kicking the flush Go routine. These writes are protected. func ( *Conn) ( string) { .mu.Lock() .bw.appendString() .kickFlusher() .mu.Unlock() } // Generate a connect protocol message, issuing user/password if // applicable. The lock is assumed to be held upon entering. func ( *Conn) () (string, error) { := .Opts var , , , , , string := .current.url.User if != nil { // if no password, assume username is authToken if , := .Password(); ! { = .Username() } else { = .Username() , _ = .Password() } } else { // Take from options (possibly all empty strings) = .User = .Password = .Token = .Nkey if .Opts.UserInfo != nil { if != _EMPTY_ || != _EMPTY_ { return _EMPTY_, ErrUserInfoAlreadySet } , = .Opts.UserInfo() } } // Look for user jwt. 
if .UserJWT != nil { if , := .UserJWT(); != nil { return _EMPTY_, } else { = } if != _EMPTY_ { return _EMPTY_, ErrNkeyAndUser } } if != _EMPTY_ || != _EMPTY_ { if .SignatureCB == nil { if == _EMPTY_ { return _EMPTY_, ErrNkeyButNoSigCB } return _EMPTY_, ErrUserButNoSigCB } , := .SignatureCB([]byte(.info.Nonce)) if != nil { return _EMPTY_, fmt.Errorf("error signing nonce: %w", ) } = base64.RawURLEncoding.EncodeToString() } if .Opts.TokenHandler != nil { if != _EMPTY_ { return _EMPTY_, ErrTokenAlreadySet } = .Opts.TokenHandler() } // If our server does not support headers then we can't do them or no responders. := .info.Headers := connectInfo{ .Verbose, .Pedantic, , , , , , , .Secure, .Name, LangString, Version, clientProtoInfo, !.NoEcho, , , } , := json.Marshal() if != nil { return _EMPTY_, ErrJsonParse } // Check if NoEcho is set and we have a server that supports it. if .NoEcho && .info.Proto < 1 { return _EMPTY_, ErrNoEchoNotSupported } return fmt.Sprintf(connectProto, ), nil } // normalizeErr removes the prefix -ERR, trim spaces and remove the quotes. func normalizeErr( string) string { := strings.TrimSpace(strings.TrimPrefix(, _ERR_OP_)) = strings.TrimLeft(strings.TrimRight(, "'"), "'") return } // natsProtoErr represents an -ERR protocol message sent by the server. type natsProtoErr struct { description string } func ( *natsProtoErr) () string { return fmt.Sprintf("nats: %s", .description) } func ( *natsProtoErr) ( error) bool { return strings.ToLower(.Error()) == .Error() } // Send a connect protocol message to the server, issue user/password if // applicable. Will wait for a flush to return from the server for error // processing. func ( *Conn) () error { // Construct the CONNECT protocol string , := .connectProto() if != nil { if !.initc && .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) } return } // Write the protocol and PING directly to the underlying writer. 
if := .bw.writeDirect(, pingProto); != nil { return } // We don't want to read more than we need here, otherwise // we would need to transfer the excess read data to the readLoop. // Since in normal situations we just are looking for a PONG\r\n, // reading byte-by-byte here is ok. , := .readProto() if != nil { if !.initc && .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) } return } // If opts.Verbose is set, handle +OK if .Opts.Verbose && == okProto { // Read the rest now... , = .readProto() if != nil { if !.initc && .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) } return } } // We expect a PONG if != pongProto { // But it could be something else, like -ERR // Since we no longer use ReadLine(), trim the trailing "\r\n" = strings.TrimRight(, "\r\n") // If it's a server error... if strings.HasPrefix(, _ERR_OP_) { // Remove -ERR, trim spaces and quotes, and convert to lower case. = normalizeErr() // Check if this is an auth error if := checkAuthError(strings.ToLower()); != nil { // This will schedule an async error if we are in reconnect, // and keep track of the auth error for the current server. // If we have got the same error twice, this sets nc.ar to true to // indicate that the reconnect should be aborted (will be checked // in doReconnect()). .processAuthError() } return &natsProtoErr{} } // Notify that we got an unexpected protocol. return fmt.Errorf("nats: expected '%s', got '%s'", _PONG_OP_, ) } // This is where we are truly connected. .changeConnStatus(CONNECTED) return nil } // reads a protocol line. func ( *Conn) () (string, error) { return .br.ReadString('\n') } // A control protocol line. type control struct { op, args string } // Read a control line and process the intended op. func ( *Conn) ( *control) error { , := .readProto() if != nil { return } parseControl(, ) return nil } // Parse a control line from the server. 
func parseControl( string, *control) { := strings.SplitN(, _SPC_, 2) if len() == 1 { .op = strings.TrimSpace([0]) .args = _EMPTY_ } else if len() == 2 { .op, .args = strings.TrimSpace([0]), strings.TrimSpace([1]) } else { .op = _EMPTY_ } } // flushReconnectPendingItems will push the pending items that were // gathered while we were in a RECONNECTING state to the socket. func ( *Conn) () error { return .bw.flushPendingBuffer() } // Stops the ping timer if set. // Connection lock is held on entry. func ( *Conn) () { if .ptmr != nil { .ptmr.Stop() } } // Try to reconnect using the option parameters. // This function assumes we are allowed to reconnect. func ( *Conn) ( error, bool) { // We want to make sure we have the other watchers shutdown properly // here before we proceed past this point. .waitForExits() // FIXME(dlc) - We have an issue here if we have // outstanding flush points (pongs) and they were not // sent out, but are still in the pipe. // Hold the lock manually and release where needed below, // can't do defer here. .mu.Lock() // Clear any errors. .err = nil // Perform appropriate callback if needed for a disconnect. // DisconnectedErrCB has priority over deprecated DisconnectedCB if !.initc { if .Opts.DisconnectedErrCB != nil { .ach.push(func() { .Opts.DisconnectedErrCB(, ) }) } else if .Opts.DisconnectedCB != nil { .ach.push(func() { .Opts.DisconnectedCB() }) } } // This is used to wait on go routines exit if we start them in the loop // but an error occurs after that. := false var *time.Timer // Channel used to kick routine out of sleep when conn is closed. := .rqch // if rqch is nil, we need to set it up to signal // the reconnect loop to reconnect immediately // this means that `ForceReconnect` was called // before entering doReconnect if == nil { = make(chan struct{}) close() } // Counter that is increased when the whole list of servers has been tried. 
var int var time.Duration var time.Duration // If a custom reconnect delay handler is set, this takes precedence. := .Opts.CustomReconnectDelayCB if == nil { = .Opts.ReconnectWait // TODO: since we sleep only after the whole list has been tried, we can't // rely on individual *srv to know if it is a TLS or non-TLS url. // We have to pick which type of jitter to use, for now, we use these hints: = .Opts.ReconnectJitter if .Opts.Secure || .Opts.TLSConfig != nil { = .Opts.ReconnectJitterTLS } } for := 0; len(.srvPool) > 0; { , := .selectNextServer() if != nil { .err = break } := +1 >= len(.srvPool) && ! = false .mu.Unlock() if ! { ++ // Release the lock to give a chance to a concurrent nc.Close() to break the loop. runtime.Gosched() } else { = 0 var time.Duration if != nil { ++ = () } else { = if > 0 { += time.Duration(rand.Int63n(int64())) } } if == nil { = time.NewTimer() } else { .Reset() } select { case <-: .Stop() // we need to reset the rqch channel to avoid // closing a closed channel in the next iteration .mu.Lock() .rqch = make(chan struct{}) .mu.Unlock() case <-.C: } } // If the readLoop, etc.. go routines were started, wait for them to complete. if { .waitForExits() = false } .mu.Lock() // Check if we have been closed first. if .isClosed() { break } // Mark that we tried a reconnect .reconnects++ // Try to create a new connection = .createConn() // Not yet connected, retry... // Continue to hold the lock if != nil { // Perform appropriate callback for a failed connection attempt. if .Opts.ReconnectErrCB != nil { .ach.push(func() { .Opts.ReconnectErrCB(, ) }) } .err = nil continue } // We are reconnected .Reconnects++ // Process connect logic if .err = .processConnectInit(); .err != nil { // Check if we should abort reconnect. If so, break out // of the loop and connection will be closed. if .ar { break } .changeConnStatus(RECONNECTING) continue } // Clear possible lastErr under the connection lock after // a successful processConnectInit(). 
.current.lastErr = nil // Clear out server stats for the server we connected to.. .didConnect = true .reconnects = 0 // Send existing subscription state .resendSubscriptions() // Now send off and clear pending buffer .err = .flushReconnectPendingItems() if .err != nil { .changeConnStatus(RECONNECTING) // Stop the ping timer (if set) .stopPingTimer() // Since processConnectInit() returned without error, the // go routines were started, so wait for them to return // on the next iteration (after releasing the lock). = true continue } // Done with the pending buffer .bw.doneWithPending() // Queue up the correct callback. If we are in initial connect state // (using retry on failed connect), we will call the ConnectedCB, // otherwise the ReconnectedCB. if .Opts.ReconnectedCB != nil && !.initc { .ach.push(func() { .Opts.ReconnectedCB() }) } else if .Opts.ConnectedCB != nil && .initc { .ach.push(func() { .Opts.ConnectedCB() }) } // If we are here with a retry on failed connect, indicate that the // initial connect is now complete. .initc = false // Release lock here, we will return below. .mu.Unlock() // Make sure to flush everything .Flush() return } // Call into close.. We have no servers left.. if .err == nil { .err = ErrNoServers } .mu.Unlock() .close(CLOSED, true, nil) } // processOpErr handles errors from reading or parsing the protocol. // The lock should not be held entering this function. func ( *Conn) ( error) bool { .mu.Lock() defer .mu.Unlock() if .isConnecting() || .isClosed() || .isReconnecting() { return false } if .Opts.AllowReconnect && .status == CONNECTED { // Set our new status .changeConnStatus(RECONNECTING) // Stop ping timer if set .stopPingTimer() if .conn != nil { .conn.Close() .conn = nil } // Create pending buffer before reconnecting. .bw.switchToPending() // Clear any queued pongs, e.g. pending flush calls. 
.clearPendingFlushCalls() go .doReconnect(, false) return false } .changeConnStatus(DISCONNECTED) .err = return true } // dispatch is responsible for calling any async callbacks func ( *asyncCallbacksHandler) () { for { .mu.Lock() // Protect for spurious wakeups. We should get out of the // wait only if there is an element to pop from the list. for .head == nil { .cond.Wait() } := .head .head = .next if == .tail { .tail = nil } .mu.Unlock() // This signals that the dispatcher has been closed and all // previous callbacks have been dispatched. if .f == nil { return } // Invoke callback outside of handler's lock .f() } } // Add the given function to the tail of the list and // signals the dispatcher. func ( *asyncCallbacksHandler) ( func()) { .pushOrClose(, false) } // Signals that we are closing... func ( *asyncCallbacksHandler) () { .pushOrClose(nil, true) } // Add the given function to the tail of the list and // signals the dispatcher. func ( *asyncCallbacksHandler) ( func(), bool) { .mu.Lock() defer .mu.Unlock() // Make sure that library is not calling push with nil function, // since this is used to notify the dispatcher that it should stop. if ! && == nil { panic("pushing a nil callback") } := &asyncCB{f: } if .tail != nil { .tail.next = } else { .head = } .tail = if { .cond.Broadcast() } else { .cond.Signal() } } // readLoop() will sit on the socket reading and processing the // protocol from the server. It will dispatch appropriately based // on the op type. func ( *Conn) () { // Release the wait group on exit defer .wg.Done() // Create a parseState if needed. .mu.Lock() if .ps == nil { .ps = &parseState{} } := .conn := .br .mu.Unlock() if == nil { return } for { , := .Read() if == nil { // With websocket, it is possible that there is no error but // also no buffer returned (either WS control message or read of a // partial compressed message). We could call parse(buf) which // would ignore an empty buffer, but simply go back to top of the loop. 
if len() == 0 { continue } = .parse() } if != nil { if := .processOpErr(); { .close(CLOSED, true, nil) } break } } // Clear the parseState here.. .mu.Lock() .ps = nil .mu.Unlock() } // waitForMsgs waits on the conditional shared with readLoop and processMsg. // It is used to deliver messages to asynchronous subscribers. func ( *Conn) ( *Subscription) { var bool var , uint64 // Used to account for adjustments to sub.pBytes when we wrap back around. := -1 for { .mu.Lock() // Do accounting for last msg delivered here so we only lock once // and drain state trips after callback has returned. if >= 0 { .pMsgs-- .pBytes -= = -1 } if .pHead == nil && !.closed { .pCond.Wait() } // Pop the msg off the list := .pHead if != nil { .pHead = .next if .pHead == nil { .pTail = nil } if .barrier != nil { .mu.Unlock() if atomic.AddInt64(&.barrier.refs, -1) == 0 { .barrier.f() } continue } = len(.Data) } := .mcb = .max = .closed var string if !.closed { .delivered++ = .delivered if .jsi != nil { = .checkForFlowControlResponse() } } .mu.Unlock() // Respond to flow control if applicable if != _EMPTY_ { .Publish(, nil) } if { break } // Deliver the message. if != nil && ( == 0 || <= ) { () } // If we have hit the max for delivered msgs, remove sub. if > 0 && >= { .mu.Lock() .removeSub() .mu.Unlock() break } } // Check for barrier messages .mu.Lock() for := .pHead; != nil; = .pHead { if .barrier != nil { .mu.Unlock() if atomic.AddInt64(&.barrier.refs, -1) == 0 { .barrier.f() } .mu.Lock() } .pHead = .next } // Now check for pDone := .pDone .mu.Unlock() if != nil { (.Subject) } } // Used for debugging and simulating loss for certain tests. // Return what is to be used. If we return nil the message will be dropped. type msgFilter func(m *Msg) *Msg // processMsg is called by parse and will place the msg on the // appropriate channel/pending queue for processing. If the channel is full, // or the pending queue is over the pending limits, the connection is // considered a slow consumer. 
func ( *Conn) ( []byte) { // Stats atomic.AddUint64(&.InMsgs, 1) atomic.AddUint64(&.InBytes, uint64(len())) // Don't lock the connection to avoid server cutting us off if the // flusher is holding the connection lock, trying to send to the server // that is itself trying to send data to us. .subsMu.RLock() := .subs[.ps.ma.sid] var msgFilter if .filters != nil { = .filters[string(.ps.ma.subject)] } .subsMu.RUnlock() if == nil { return } // Copy them into string := string(.ps.ma.subject) := string(.ps.ma.reply) // Doing message create outside of the sub's lock to reduce contention. // It's possible that we end-up not using the message, but that's ok. // FIXME(dlc): Need to copy, should/can do COW? := if !.ps.msgCopied { = make([]byte, len()) copy(, ) } // Check if we have headers encoded here. var Header var error var bool var int var string if .ps.ma.hdr > 0 { := [:.ps.ma.hdr] = [.ps.ma.hdr:] , = DecodeHeadersMsg() if != nil { // We will pass the message through but send async error. .mu.Lock() .err = ErrBadHeaderMsg if .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, , ErrBadHeaderMsg) }) } .mu.Unlock() } } // FIXME(dlc): Should we recycle these containers? := &Msg{ Subject: , Reply: , Header: , Data: , Sub: , wsz: len() + len() + len(), } // Check for message filters. if != nil { if = (); == nil { // Drop message. return } } .mu.Lock() // Check if closed. if .closed { .mu.Unlock() return } // Skip flow control messages in case of using a JetStream context. := .jsi if != nil { // There has to be a header for it to be a control message. if != nil { , = isJSControlMessage() if && == jsCtrlHB { // Check if the heartbeat has a "Consumer Stalled" header, if // so, the value is the FC reply to send a nil message to. // We will send it at the end of this function. = .Header.Get(consumerStalledHdr) } } // Check for ordered consumer here. If checkOrderedMsgs returns true that means it detected a gap. if ! 
&& .ordered && .checkOrderedMsgs() { .mu.Unlock() return } } // Skip processing if this is a control message and // if not a pull consumer heartbeat. For pull consumers, // heartbeats have to be handled on per request basis. if ! || ( != nil && .pull) { var bool // Subscription internal stats (applicable only for non ChanSubscription's) if .typ != ChanSubscription { .pMsgs++ if .pMsgs > .pMsgsMax { .pMsgsMax = .pMsgs } .pBytes += len(.Data) if .pBytes > .pBytesMax { .pBytesMax = .pBytes } // Check for a Slow Consumer if (.pMsgsLimit > 0 && .pMsgs > .pMsgsLimit) || (.pBytesLimit > 0 && .pBytes > .pBytesLimit) { goto } } else if != nil { = true } // We have two modes of delivery. One is the channel, used by channel // subscribers and syncSubscribers, the other is a linked list for async. if .mch != nil { select { case .mch <- : default: goto } } else { // Push onto the async pList if .pHead == nil { .pHead = .pTail = if .pCond != nil { .pCond.Signal() } } else { .pTail.next = .pTail = } } if != nil { // Store the ACK metadata from the message to // compare later on with the received heartbeat. .trackSequences(.Reply) if { // For ChanSubscription, since we can't call this when a message // is "delivered" (since user is pull from their own channel), // we have a go routine that does this check, however, we do it // also here to make it much more responsive. The go routine is // really to avoid stalling when there is no new messages coming. = .checkForFlowControlResponse() } } } else if == jsCtrlFC && .Reply != _EMPTY_ { // This is a flow control message. // We will schedule the send of the FC reply once we have delivered the // DATA message that was received before this flow control message, which // has sequence `jsi.fciseq`. However, it is possible that this message // has already been delivered, in that case, we need to send the FC reply now. if .getJSDelivered() >= .fciseq { = .Reply } else { // Schedule a reply after the previous message is delivered. 
.scheduleFlowControlResponse(.Reply) } } // Clear any SlowConsumer status. if .sc { .changeSubStatus(SubscriptionActive) } .sc = false .mu.Unlock() if != _EMPTY_ { .Publish(, nil) } // Handle control heartbeat messages. if && == jsCtrlHB && .Reply == _EMPTY_ { .checkForSequenceMismatch(, , ) } return : .dropped++ := !.sc .sc = true // Undo stats from above if .typ != ChanSubscription { .pMsgs-- .pBytes -= len(.Data) } if { .changeSubStatus(SubscriptionSlowConsumer) .mu.Unlock() // Now we need connection's lock and we may end-up in the situation // that we were trying to avoid, except that in this case, the client // is already experiencing client-side slow consumer situation. .mu.Lock() .err = ErrSlowConsumer if .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, , ErrSlowConsumer) }) } .mu.Unlock() } else { .mu.Unlock() } } var ( permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`) permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`) ) // processTransientError is called when the server signals a non terminal error // which does not close the connection or trigger a reconnect. // This will trigger the async error callback if set. // These errors include the following: // - permissions violation on publish or subscribe // - maximum subscriptions exceeded func ( *Conn) ( error) { .mu.Lock() .err = if errors.Is(, ErrPermissionViolation) { := permissionsRe.FindStringSubmatch(.Error()) if len() >= 2 { := permissionsQueueRe.FindStringSubmatch(.Error()) var string if len() >= 2 { = [1] } := [1] for , := range .subs { if .Subject == && .Queue == && .permissionsErr == nil { .mu.Lock() if .errCh != nil { .errCh <- } .permissionsErr = .mu.Unlock() } } } } if .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) } .mu.Unlock() } // processAuthError generally processing for auth errors. We want to do retries // unless we get the same error again. 
This allows us for instance to swap credentials // and have the app reconnect, but if nothing is changing we should bail. // This function will return true if the connection should be closed, false otherwise. // Connection lock is held on entry func ( *Conn) ( error) bool { .err = if !.initc && .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) } // We should give up if we tried twice on this server and got the // same error. This behavior can be modified using IgnoreAuthErrorAbort. if .current.lastErr == && !.Opts.IgnoreAuthErrorAbort { .ar = true } else { .current.lastErr = } return .ar } // flusher is a separate Go routine that will process flush requests for the write // bufio. This allows coalescing of writes to the underlying socket. func ( *Conn) () { // Release the wait group defer .wg.Done() // snapshot the bw and conn since they can change from underneath of us. .mu.Lock() := .bw := .conn := .fch .mu.Unlock() if == nil || == nil { return } for { if , := <-; ! { return } .mu.Lock() // Check to see if we should bail out. if !.isConnected() || .isConnecting() || != .conn { .mu.Unlock() return } if .buffered() > 0 { if := .flush(); != nil { if .err == nil { .err = } if .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) } } } .mu.Unlock() } } // processPing will send an immediate pong protocol response to the // server. The server uses this mechanism to detect dead clients. func ( *Conn) () { .sendProto(pongProto) } // processPong is used to process responses to the client's ping // messages. We use pings for the flush mechanism as well. func ( *Conn) () { var chan struct{} .mu.Lock() if len(.pongs) > 0 { = .pongs[0] .pongs = append(.pongs[:0], .pongs[1:]...) } .pout = 0 .mu.Unlock() if != nil { <- struct{}{} } } // processOK is a placeholder for processing OK messages. func ( *Conn) () { // do nothing } // processInfo is used to parse the info messages sent // from the server. 
// This function may update the server pool. func ( *Conn) ( string) error { if == _EMPTY_ { return nil } var serverInfo if := json.Unmarshal([]byte(), &); != nil { return } // Copy content into connection's info structure. .info = // The array could be empty/not present on initial connect, // if advertise is disabled on that server, or servers that // did not include themselves in the async INFO protocol. // If empty, do not remove the implicit servers from the pool. if len(.info.ConnectURLs) == 0 { if !.initc && .LameDuckMode && .Opts.LameDuckModeHandler != nil { .ach.push(func() { .Opts.LameDuckModeHandler() }) } return nil } // Note about pool randomization: when the pool was first created, // it was randomized (if allowed). We keep the order the same (removing // implicit servers that are no longer sent to us). New URLs are sent // to us in no specific order so don't need extra randomization. := false // This is what we got from the server we are connected to. := .info.ConnectURLs // Transform that to a map for easy lookups := make(map[string]struct{}, len()) for , := range { [] = struct{}{} } // Walk the pool and removed the implicit servers that are no longer in the // given array/map := .srvPool for := 0; < len(); ++ { := [] := .url.Host // Check if this URL is in the INFO protocol , := [] // Remove from the temp map so that at the end we are left with only // new (or restarted) servers that need to be added to the pool. delete(, ) // Keep servers that were set through Options, but also the one that // we are currently connected to (even if it is a discovered server). if !.isImplicit || .url == .current.url { continue } if ! { // Remove from server pool. Keep current order. copy([:], [+1:]) .srvPool = [:len()-1] = .srvPool -- } } // Figure out if we should save off the current non-IP hostname if we encounter a bare IP. 
:= .current != nil && !hostIsIP(.current.url) // If there are any left in the tmp map, these are new (or restarted) servers // and need to be added to the pool. for := range { // Before adding, check if this is a new (as in never seen) URL. // This is used to figure out if we invoke the DiscoveredServersCB if , := .urls[]; ! { = true } .addURLToPool(fmt.Sprintf("%s://%s", .connScheme(), ), true, ) } if { // Randomize the pool if allowed but leave the first URL in place. if !.Opts.NoRandomize { .shufflePool(1) } if !.initc && .Opts.DiscoveredServersCB != nil { .ach.push(func() { .Opts.DiscoveredServersCB() }) } } if !.initc && .LameDuckMode && .Opts.LameDuckModeHandler != nil { .ach.push(func() { .Opts.LameDuckModeHandler() }) } return nil } // processAsyncInfo does the same than processInfo, but is called // from the parser. Calls processInfo under connection's lock // protection. func ( *Conn) ( []byte) { .mu.Lock() // Ignore errors, we will simply not update the server pool... .processInfo(string()) .mu.Unlock() } // LastError reports the last error encountered via the connection. // It can be used reliably within ClosedCB in order to find out reason // why connection was closed for example. func ( *Conn) () error { if == nil { return ErrInvalidConnection } .mu.RLock() := .err .mu.RUnlock() return } // Check if the given error string is an auth error, and if so returns // the corresponding ErrXXX error, nil otherwise func checkAuthError( string) error { if strings.HasPrefix(, AUTHORIZATION_ERR) { return ErrAuthorization } if strings.HasPrefix(, AUTHENTICATION_EXPIRED_ERR) { return ErrAuthExpired } if strings.HasPrefix(, AUTHENTICATION_REVOKED_ERR) { return ErrAuthRevoked } if strings.HasPrefix(, ACCOUNT_AUTHENTICATION_EXPIRED_ERR) { return ErrAccountAuthExpired } return nil } // processErr processes any error messages from the server and // sets the connection's LastError. 
func ( *Conn) ( string) { // Trim, remove quotes := normalizeErr() // convert to lower case. := strings.ToLower() var bool // FIXME(dlc) - process Slow Consumer signals special. if == STALE_CONNECTION { = .processOpErr(ErrStaleConnection) } else if == MAX_CONNECTIONS_ERR { = .processOpErr(ErrMaxConnectionsExceeded) } else if strings.HasPrefix(, PERMISSIONS_ERR) { .processTransientError(fmt.Errorf("%w: %s", ErrPermissionViolation, )) } else if strings.HasPrefix(, MAX_SUBSCRIPTIONS_ERR) { .processTransientError(ErrMaxSubscriptionsExceeded) } else if := checkAuthError(); != nil { .mu.Lock() = .processAuthError() .mu.Unlock() } else { = true .mu.Lock() .err = errors.New("nats: " + ) .mu.Unlock() } if { .close(CLOSED, true, nil) } } // kickFlusher will send a bool on a channel to kick the // flush Go routine to flush data to the server. func ( *Conn) () { if .bw != nil { select { case .fch <- struct{}{}: default: } } } // Publish publishes the data argument to the given subject. The data // argument is left untouched and needs to be correctly interpreted on // the receiver. func ( *Conn) ( string, []byte) error { return .publish(, _EMPTY_, nil, ) } // Header represents the optional Header for a NATS message, // based on the implementation of http.Header. type Header map[string][]string // Add adds the key, value pair to the header. It is case-sensitive // and appends to any existing values associated with key. func ( Header) (, string) { [] = append([], ) } // Set sets the header entries associated with key to the single // element value. It is case-sensitive and replaces any existing // values associated with key. func ( Header) (, string) { [] = []string{} } // Get gets the first value associated with the given key. // It is case-sensitive. func ( Header) ( string) string { if == nil { return _EMPTY_ } if := []; != nil { return [0] } return _EMPTY_ } // Values returns all values associated with the given key. // It is case-sensitive. 
func ( Header) ( string) []string { return [] } // Del deletes the values associated with a key. // It is case-sensitive. func ( Header) ( string) { delete(, ) } // NewMsg creates a message for publishing that will use headers. func ( string) *Msg { return &Msg{ Subject: , Header: make(Header), } } const ( hdrLine = "NATS/1.0\r\n" crlf = "\r\n" hdrPreEnd = len(hdrLine) - len(crlf) statusHdr = "Status" descrHdr = "Description" lastConsumerSeqHdr = "Nats-Last-Consumer" lastStreamSeqHdr = "Nats-Last-Stream" consumerStalledHdr = "Nats-Consumer-Stalled" noResponders = "503" noMessagesSts = "404" reqTimeoutSts = "408" jetStream409Sts = "409" controlMsg = "100" statusLen = 3 // e.g. 20x, 40x, 50x ) // DecodeHeadersMsg will decode and headers. func ( []byte) (Header, error) { := bufio.NewReaderSize(bytes.NewReader(), 128) := textproto.NewReader() , := .ReadLine() if != nil || len() < hdrPreEnd || [:hdrPreEnd] != hdrLine[:hdrPreEnd] { return nil, ErrBadHeaderMsg } , := readMIMEHeader() if != nil { return nil, } // Check if we have an inlined status. if len() > hdrPreEnd { var string := strings.TrimSpace([hdrPreEnd:]) if len() != statusLen { = strings.TrimSpace([statusLen:]) = [:statusLen] } .Add(statusHdr, ) if len() > 0 { .Add(descrHdr, ) } } return Header(), nil } // readMIMEHeader returns a MIMEHeader that preserves the // original case of the MIME header, based on the implementation // of textproto.ReadMIMEHeader. // // https://golang.org/pkg/net/textproto/#Reader.ReadMIMEHeader func readMIMEHeader( *textproto.Reader) (textproto.MIMEHeader, error) { := make(textproto.MIMEHeader) for { , := .ReadLine() if len() == 0 { return , } // Process key fetching original case. := strings.IndexByte(, ':') if < 0 { return nil, ErrBadHeaderMsg } := [:] if == "" { // Skip empty keys. 
continue } ++ for < len() && ([] == ' ' || [] == '\t') { ++ } [] = append([], [:]) if != nil { return , } } } // PublishMsg publishes the Msg structure, which includes the // Subject, an optional Reply and an optional Data field. func ( *Conn) ( *Msg) error { if == nil { return ErrInvalidMsg } , := .headerBytes() if != nil { return } return .publish(.Subject, .Reply, , .Data) } // PublishRequest will perform a Publish() expecting a response on the // reply subject. Use Request() for automatically waiting for a response // inline. func ( *Conn) (, string, []byte) error { return .publish(, , nil, ) } // Used for handrolled Itoa const digits = "0123456789" // publish is the internal function to publish messages to a nats-server. // Sends a protocol data message by queuing into the bufio writer // and kicking the flush go routine. These writes should be protected. func ( *Conn) (, string, , []byte) error { if == nil { return ErrInvalidConnection } if == "" { return ErrBadSubject } .mu.Lock() // Check if headers attempted to be sent to server that does not support them. if len() > 0 && !.info.Headers { .mu.Unlock() return ErrHeadersNotSupported } if .isClosed() { .mu.Unlock() return ErrConnectionClosed } if .isDrainingPubs() { .mu.Unlock() return ErrConnectionDraining } // Proactively reject payloads over the threshold set by server. := int64(len() + len()) // Skip this check if we are not yet connected (RetryOnFailedConnect) if !.initc && > .info.MaxPayload { .mu.Unlock() return ErrMaxPayload } // Check if we are reconnecting, and if so check if // we have exceeded our reconnect outbound buffer limits. if .bw.atLimitIfUsingPending() { .mu.Unlock() return ErrReconnectBufExceeded } var []byte if != nil { = .scratch[:len(_HPUB_P_)] } else { = .scratch[1:len(_HPUB_P_)] } = append(, ...) = append(, ' ') if != "" { = append(, ...) = append(, ' ') } // We could be smarter here, but simple loop is ok, // just avoid strconv in fast path. // FIXME(dlc) - Find a better way here. 
// msgh = strconv.AppendInt(msgh, int64(len(data)), 10) // go 1.14 some values strconv faster, may be able to switch over. var [12]byte := len() if != nil { if len() > 0 { for := len(); > 0; /= 10 { -- [] = digits[%10] } } else { -- [] = digits[0] } = append(, [:]...) = append(, ' ') // reset for below. = len() } if > 0 { for := ; > 0; /= 10 { -- [] = digits[%10] } } else { -- [] = digits[0] } = append(, [:]...) = append(, _CRLF_...) if := .bw.appendBufs(, , , _CRLF_BYTES_); != nil { .mu.Unlock() return } .OutMsgs++ .OutBytes += uint64(len() + len()) if len(.fch) == 0 { .kickFlusher() } .mu.Unlock() return nil } // respHandler is the global response handler. It will look up // the appropriate channel based on the last token and place // the message on the channel if possible. func ( *Conn) ( *Msg) { .mu.Lock() // Just return if closed. if .isClosed() { .mu.Unlock() return } var chan *Msg // Grab mch := .respToken(.Subject) if != _EMPTY_ { = .respMap[] // Delete the key regardless, one response only. delete(.respMap, ) } else if len(.respMap) == 1 { // If the server has rewritten the subject, the response token (rt) // will not match (could be the case with JetStream). If that is the // case and there is a single entry, use that. for , := range .respMap { = delete(.respMap, ) break } } .mu.Unlock() // Don't block, let Request timeout instead, mch is // buffered and we should delete the key before a // second response is processed. select { case <- : default: return } } // Helper to setup and send new request style requests. Return the chan to receive the response. func ( *Conn) ( string, , []byte) (chan *Msg, string, error) { .mu.Lock() // Create new literal Inbox and map to a chan msg. := make(chan *Msg, RequestChanLen) := .newRespInbox() := [.respSubLen:] .respMap[] = if .respMux == nil { // Create the response subscription we will use for all new style responses. // This will be on an _INBOX with an additional terminal token. 
The subscription // will be on a wildcard. , := .subscribeLocked(.respSub, _EMPTY_, .respHandler, nil, nil, false, nil) if != nil { .mu.Unlock() return nil, , } .respMux = } .mu.Unlock() if := .publish(, , , ); != nil { return nil, , } return , , nil } // RequestMsg will send a request payload including optional headers and deliver // the response message, or an error, including a timeout if no message was received properly. func ( *Conn) ( *Msg, time.Duration) (*Msg, error) { if == nil { return nil, ErrInvalidMsg } , := .headerBytes() if != nil { return nil, } return .request(.Subject, , .Data, ) } // Request will send a request payload and deliver the response message, // or an error, including a timeout if no message was received properly. func ( *Conn) ( string, []byte, time.Duration) (*Msg, error) { return .request(, nil, , ) } func ( *Conn) () bool { .mu.RLock() := .Opts.UseOldRequestStyle .mu.RUnlock() return } func ( *Conn) ( string, , []byte, time.Duration) (*Msg, error) { if == nil { return nil, ErrInvalidConnection } var *Msg var error if .useOldRequestStyle() { , = .oldRequest(, , , ) } else { , = .newRequest(, , , ) } // Check for no responder status. if == nil && len(.Data) == 0 && .Header.Get(statusHdr) == noResponders { , = nil, ErrNoResponders } return , } func ( *Conn) ( string, , []byte, time.Duration) (*Msg, error) { , , := .createNewRequestAndSend(, , ) if != nil { return nil, } := globalTimerPool.Get() defer globalTimerPool.Put() var bool var *Msg select { case , = <-: if ! { return nil, ErrConnectionClosed } case <-.C: .mu.Lock() delete(.respMap, ) .mu.Unlock() return nil, ErrTimeout } return , nil } // oldRequest will create an Inbox and perform a Request() call // with the Inbox reply and return the first reply received. // This is optimized for the case of multiple responses. 
func ( *Conn) ( string, , []byte, time.Duration) (*Msg, error) { := .NewInbox() := make(chan *Msg, RequestChanLen) , := .subscribe(, _EMPTY_, nil, , nil, true, nil) if != nil { return nil, } .AutoUnsubscribe(1) defer .Unsubscribe() = .publish(, , , ) if != nil { return nil, } return .NextMsg() } // InboxPrefix is the prefix for all inbox subjects. const ( InboxPrefix = "_INBOX." inboxPrefixLen = len(InboxPrefix) replySuffixLen = 8 // Gives us 62^8 rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" base = 62 ) // NewInbox will return an inbox string which can be used for directed replies from // subscribers. These are guaranteed to be unique, but can be shared and subscribed // to by others. func () string { var [inboxPrefixLen + nuidSize]byte := [:inboxPrefixLen] copy(, InboxPrefix) := [inboxPrefixLen:] copy(, nuid.Next()) return string([:]) } // Create a new inbox that is prefix aware. func ( *Conn) () string { if .Opts.InboxPrefix == _EMPTY_ { return NewInbox() } var strings.Builder .WriteString(.Opts.InboxPrefix) .WriteByte('.') .WriteString(nuid.Next()) return .String() } // Function to init new response structures. func ( *Conn) () { .respSubPrefix = fmt.Sprintf("%s.", .NewInbox()) .respSubLen = len(.respSubPrefix) .respSub = fmt.Sprintf("%s*", .respSubPrefix) .respMap = make(map[string]chan *Msg) .respRand = rand.New(rand.NewSource(time.Now().UnixNano())) } // newRespInbox creates a new literal response subject // that will trigger the mux subscription handler. // Lock should be held. func ( *Conn) () string { if .respMap == nil { .initNewResp() } var strings.Builder .WriteString(.respSubPrefix) := .respRand.Int63() for := 0; < replySuffixLen; ++ { .WriteByte(rdigits[%base]) /= base } return .String() } // NewRespInbox is the new format used for _INBOX. 
func ( *Conn) () string { .mu.Lock() := .newRespInbox() .mu.Unlock() return } // respToken will return the last token of a literal response inbox // which we use for the message channel lookup. This needs to verify the subject // prefix matches to protect itself against the server changing the subject. // Lock should be held. func ( *Conn) ( string) string { if , := strings.CutPrefix(, .respSubPrefix); { return } return "" } // Subscribe will express interest in the given subject. The subject // can have wildcards. // There are two type of wildcards: * for partial, and > for full. // A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east. // A subscription on subject time.us.> would receive messages sent to // time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east // since it can't match more than one token. // Messages will be delivered to the associated MsgHandler. func ( *Conn) ( string, MsgHandler) (*Subscription, error) { return .subscribe(, _EMPTY_, , nil, nil, false, nil) } // ChanSubscribe will express interest in the given subject and place // all messages received on the channel. // You should not close the channel until sub.Unsubscribe() has been called. func ( *Conn) ( string, chan *Msg) (*Subscription, error) { return .subscribe(, _EMPTY_, nil, , nil, false, nil) } // ChanQueueSubscribe will express interest in the given subject. // All subscribers with the same queue name will form the queue group // and only one member of the group will be selected to receive any given message, // which will be placed on the channel. // You should not close the channel until sub.Unsubscribe() has been called. // Note: This is the same than QueueSubscribeSyncWithChan. func ( *Conn) (, string, chan *Msg) (*Subscription, error) { return .subscribe(, , nil, , nil, false, nil) } // SubscribeSync will express interest on the given subject. 
Messages will // be received synchronously using Subscription.NextMsg(). func ( *Conn) ( string) (*Subscription, error) { if == nil { return nil, ErrInvalidConnection } := make(chan *Msg, .Opts.SubChanLen) var chan error if .Opts.PermissionErrOnSubscribe { = make(chan error, 100) } return .subscribe(, _EMPTY_, nil, , , true, nil) } // QueueSubscribe creates an asynchronous queue subscriber on the given subject. // All subscribers with the same queue name will form the queue group and // only one member of the group will be selected to receive any given // message asynchronously. func ( *Conn) (, string, MsgHandler) (*Subscription, error) { return .subscribe(, , , nil, nil, false, nil) } // QueueSubscribeSync creates a synchronous queue subscriber on the given // subject. All subscribers with the same queue name will form the queue // group and only one member of the group will be selected to receive any // given message synchronously using Subscription.NextMsg(). func ( *Conn) (, string) (*Subscription, error) { := make(chan *Msg, .Opts.SubChanLen) var chan error if .Opts.PermissionErrOnSubscribe { = make(chan error, 100) } return .subscribe(, , nil, , , true, nil) } // QueueSubscribeSyncWithChan will express interest in the given subject. // All subscribers with the same queue name will form the queue group // and only one member of the group will be selected to receive any given message, // which will be placed on the channel. // You should not close the channel until sub.Unsubscribe() has been called. // Note: This is the same than ChanQueueSubscribe. func ( *Conn) (, string, chan *Msg) (*Subscription, error) { return .subscribe(, , nil, , nil, false, nil) } // badSubject will do quick test on whether a subject is acceptable. // Spaces are not allowed and all tokens should be > 0 in len. 
func badSubject( string) bool { if strings.ContainsAny(, " \t\r\n") { return true } := strings.Split(, ".") for , := range { if len() == 0 { return true } } return false } // badQueue will check a queue name for whitespace. func badQueue( string) bool { return strings.ContainsAny(, " \t\r\n") } // subscribe is the internal subscribe function that indicates interest in a subject. func ( *Conn) (, string, MsgHandler, chan *Msg, chan (error), bool, *jsSub) (*Subscription, error) { if == nil { return nil, ErrInvalidConnection } .mu.Lock() defer .mu.Unlock() return .subscribeLocked(, , , , , , ) } func ( *Conn) (, string, MsgHandler, chan *Msg, chan (error), bool, *jsSub) (*Subscription, error) { if == nil { return nil, ErrInvalidConnection } if badSubject() { return nil, ErrBadSubject } if != _EMPTY_ && badQueue() { return nil, ErrBadQueueName } // Check for some error conditions. if .isClosed() { return nil, ErrConnectionClosed } if .isDraining() { return nil, ErrConnectionDraining } if == nil && == nil { return nil, ErrBadSubscription } := &Subscription{ Subject: , Queue: , mcb: , conn: , jsi: , } // Set pending limits. if != nil { .pMsgsLimit = cap() } else { .pMsgsLimit = DefaultSubPendingMsgsLimit } .pBytesLimit = DefaultSubPendingBytesLimit // If we have an async callback, start up a sub specific // Go routine to deliver the messages. var bool if != nil { .typ = AsyncSubscription .pCond = sync.NewCond(&.mu) = true } else if ! { .typ = ChanSubscription .mch = } else { // Sync Subscription .typ = SyncSubscription .mch = .errCh = } .subsMu.Lock() .ssid++ .sid = .ssid .subs[.sid] = .subsMu.Unlock() // Let's start the go routine now that it is fully setup and registered. if { go .waitForMsgs() } // We will send these for all subs when we reconnect // so that we can suppress here if reconnecting. 
if !.isReconnecting() { .bw.appendString(fmt.Sprintf(subProto, , , .sid)) .kickFlusher() } .changeSubStatus(SubscriptionActive) return , nil } // NumSubscriptions returns active number of subscriptions. func ( *Conn) () int { .mu.RLock() defer .mu.RUnlock() return len(.subs) } // Lock for nc should be held here upon entry func ( *Conn) ( *Subscription) { .subsMu.Lock() delete(.subs, .sid) .subsMu.Unlock() .mu.Lock() defer .mu.Unlock() // Release callers on NextMsg for SyncSubscription only if .mch != nil && .typ == SyncSubscription { close(.mch) } .mch = nil // If JS subscription then stop HB timer. if := .jsi; != nil { if .hbc != nil { .hbc.Stop() .hbc = nil } if .csfct != nil { .csfct.Stop() .csfct = nil } } if .typ != AsyncSubscription { := .pDone if != nil { (.Subject) } } // Mark as invalid .closed = true .changeSubStatus(SubscriptionClosed) if .pCond != nil { .pCond.Broadcast() } } // SubscriptionType is the type of the Subscription. type SubscriptionType int // The different types of subscription types. const ( AsyncSubscription = SubscriptionType(iota) SyncSubscription ChanSubscription NilSubscription PullSubscription ) // Type returns the type of Subscription. func ( *Subscription) () SubscriptionType { if == nil { return NilSubscription } .mu.Lock() defer .mu.Unlock() // Pull subscriptions are really a SyncSubscription and we want this // type to be set internally for all delivered messages management, etc.. // So check when to return PullSubscription to the user. if .jsi != nil && .jsi.pull { return PullSubscription } return .typ } // IsValid returns a boolean indicating whether the subscription // is still active. This will return false if the subscription has // already been closed. func ( *Subscription) () bool { if == nil { return false } .mu.Lock() defer .mu.Unlock() return .conn != nil && !.closed } // Drain will remove interest but continue callbacks until all messages // have been processed. 
// // For a JetStream subscription, if the library has created the JetStream // consumer, the library will send a DeleteConsumer request to the server // when the Drain operation completes. If a failure occurs when deleting // the JetStream consumer, an error will be reported to the asynchronous // error callback. // If you do not wish the JetStream consumer to be automatically deleted, // ensure that the consumer is not created by the library, which means // create the consumer with AddConsumer and bind to this consumer. func ( *Subscription) () error { if == nil { return ErrBadSubscription } .mu.Lock() := .conn .mu.Unlock() if == nil { return ErrBadSubscription } return .unsubscribe(, 0, true) } // IsDraining returns a boolean indicating whether the subscription // is being drained. // This will return false if the subscription has already been closed. func ( *Subscription) () bool { if == nil { return false } .mu.Lock() defer .mu.Unlock() return .draining } // StatusChanged returns a channel on which given list of subscription status // changes will be sent. If no status is provided, all status changes will be sent. // Available statuses are SubscriptionActive, SubscriptionDraining, SubscriptionClosed, // and SubscriptionSlowConsumer. // The returned channel will be closed when the subscription is closed. func ( *Subscription) ( ...SubStatus) <-chan SubStatus { if len() == 0 { = []SubStatus{SubscriptionActive, SubscriptionDraining, SubscriptionClosed, SubscriptionSlowConsumer} } := make(chan SubStatus, 10) .mu.Lock() defer .mu.Unlock() for , := range { .registerStatusChangeListener(, ) // initial status if == .status { <- } } return } // registerStatusChangeListener registers a channel waiting for a specific status change event. // Status change events are non-blocking - if no receiver is waiting for the status change, // it will not be sent on the channel. Closed channels are ignored. // Lock should be held entering. 
func ( *Subscription) ( SubStatus, chan SubStatus) { if .statListeners == nil { .statListeners = make(map[chan SubStatus][]SubStatus) } if , := .statListeners[]; ! { .statListeners[] = make([]SubStatus, 0) } .statListeners[] = append(.statListeners[], ) } // sendStatusEvent sends subscription status event to all channels. // If there is no listener, sendStatusEvent // will not block. Lock should be held entering. func ( *Subscription) ( SubStatus) { for , := range .statListeners { if !containsStatus(, ) { continue } // only send event if someone's listening select { case <- : default: } if == SubscriptionClosed { close() } } } func containsStatus( []SubStatus, SubStatus) bool { for , := range { if == { return true } } return false } // changeSubStatus changes subscription status and sends events // to all listeners. Lock should be held entering. func ( *Subscription) ( SubStatus) { if == nil { return } .sendStatusEvent() .status = } // Unsubscribe will remove interest in the given subject. // // For a JetStream subscription, if the library has created the JetStream // consumer, it will send a DeleteConsumer request to the server (if the // unsubscribe itself was successful). If the delete operation fails, the // error will be returned. // If you do not wish the JetStream consumer to be automatically deleted, // ensure that the consumer is not created by the library, which means // create the consumer with AddConsumer and bind to this consumer (using // the nats.Bind() option). func ( *Subscription) () error { if == nil { return ErrBadSubscription } .mu.Lock() := .conn := .closed := .jsi != nil && .jsi.dc .mu.Unlock() if == nil || .IsClosed() { return ErrConnectionClosed } if { return ErrBadSubscription } if .IsDraining() { return ErrConnectionDraining } := .unsubscribe(, 0, false) if == nil && { = .deleteConsumer() } return } // checkDrained will watch for a subscription to be fully drained // and then remove it. 
func ( *Conn) ( *Subscription) { if == nil || == nil { return } defer func() { .mu.Lock() defer .mu.Unlock() .draining = false }() // This allows us to know that whatever we have in the client pending // is correct and the server will not send additional information. .Flush() .mu.Lock() // For JS subscriptions, check if we are going to delete the // JS consumer when drain completes. := .jsi != nil && .jsi.dc .mu.Unlock() // Once we are here we just wait for Pending to reach 0 or // any other state to exit this go routine. for { // check connection is still valid. if .IsClosed() { return } // Check subscription state .mu.Lock() := .conn := .closed := .pMsgs .mu.Unlock() if == nil || || == 0 { .mu.Lock() .removeSub() .mu.Unlock() if { if := .deleteConsumer(); != nil { .mu.Lock() if := .Opts.AsyncErrorCB; != nil { .ach.push(func() { (, , ) }) } .mu.Unlock() } } return } time.Sleep(100 * time.Millisecond) } } // AutoUnsubscribe will issue an automatic Unsubscribe that is // processed by the server when max messages have been received. // This can be useful when sending a request to an unknown number // of subscribers. func ( *Subscription) ( int) error { if == nil { return ErrBadSubscription } .mu.Lock() := .conn := .closed .mu.Unlock() if == nil || { return ErrBadSubscription } return .unsubscribe(, , false) } // SetClosedHandler will set the closed handler for when a subscription // is closed (either unsubscribed or drained). func ( *Subscription) ( func( string)) { .mu.Lock() .pDone = .mu.Unlock() } // unsubscribe performs the low level unsubscribe to the server. 
// Use Subscription.Unsubscribe() func ( *Conn) ( *Subscription, int, bool) error { var string if > 0 { .mu.Lock() .max = uint64() if .delivered < .max { = strconv.Itoa() } .mu.Unlock() } .mu.Lock() // ok here, but defer is expensive defer .mu.Unlock() if .isClosed() { return ErrConnectionClosed } .subsMu.RLock() := .subs[.sid] .subsMu.RUnlock() // Already unsubscribed if == nil { return nil } if == _EMPTY_ && ! { .removeSub() } if { .mu.Lock() .draining = true .changeSubStatus(SubscriptionDraining) .mu.Unlock() go .checkDrained() } // We will send these for all subs when we reconnect // so that we can suppress here. if !.isReconnecting() { .bw.appendString(fmt.Sprintf(unsubProto, .sid, )) .kickFlusher() } // For JetStream subscriptions cancel the attached context if there is any. var func() .mu.Lock() := .jsi if != nil { = .cancel .cancel = nil } .mu.Unlock() if != nil { () } return nil } // NextMsg will return the next message available to a synchronous subscriber // or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), // the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), // or if there were no responders (ErrNoResponders) when used in the context of a request/reply. func ( *Subscription) ( time.Duration) (*Msg, error) { if == nil { return nil, ErrBadSubscription } .mu.Lock() := .validateNextMsgState(false) if != nil { .mu.Unlock() return nil, } // snapshot := .mch .mu.Unlock() var bool var *Msg // If something is available right away, let's optimize that case. select { case , = <-: if ! { return nil, .getNextMsgErr() } if := .processNextMsgDelivered(); != nil { return nil, } else { return , nil } default: } // If we are here a message was not immediately available, so lets loop // with a timeout. := globalTimerPool.Get() defer globalTimerPool.Put() if .errCh != nil { select { case , = <-: if ! 
{ return nil, .getNextMsgErr() } if := .processNextMsgDelivered(); != nil { return nil, } case := <-.errCh: return nil, case <-.C: return nil, ErrTimeout } } else { select { case , = <-: if ! { return nil, .getNextMsgErr() } if := .processNextMsgDelivered(); != nil { return nil, } case <-.C: return nil, ErrTimeout } } return , nil } // nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not // time out. It is only used internally for non-timeout subscription iterator. func ( *Subscription) () (*Msg, error) { if == nil { return nil, ErrBadSubscription } .mu.Lock() := .validateNextMsgState(false) if != nil { .mu.Unlock() return nil, } // snapshot := .mch .mu.Unlock() var bool var *Msg // If something is available right away, let's optimize that case. select { case , = <-: if ! { return nil, .getNextMsgErr() } if := .processNextMsgDelivered(); != nil { return nil, } else { return , nil } default: } if .errCh != nil { select { case , = <-: if ! { return nil, .getNextMsgErr() } if := .processNextMsgDelivered(); != nil { return nil, } case := <-.errCh: return nil, } } else { , = <- if ! { return nil, .getNextMsgErr() } if := .processNextMsgDelivered(); != nil { return nil, } } return , nil } // validateNextMsgState checks whether the subscription is in a valid // state to call NextMsg and be delivered another message synchronously. // This should be called while holding the lock. 
func ( *Subscription) ( bool) error { if .connClosed { return ErrConnectionClosed } if .mch == nil { if .max > 0 && .delivered >= .max { return ErrMaxMessages } else if .closed { return ErrBadSubscription } } if .mcb != nil { return ErrSyncSubRequired } // if this subscription previously had a permissions error // and no reconnect has been attempted, return the permissions error // since the subscription does not exist on the server if .conn.Opts.PermissionErrOnSubscribe && .permissionsErr != nil { return .permissionsErr } if .sc { .changeSubStatus(SubscriptionActive) .sc = false return ErrSlowConsumer } // Unless this is from an internal call, reject use of this API. // Users should use Fetch() instead. if ! && .jsi != nil && .jsi.pull { return ErrTypeSubscription } return nil } // This is called when the sync channel has been closed. // The error returned will be either connection or subscription // closed depending on what caused NextMsg() to fail. func ( *Subscription) () error { .mu.Lock() defer .mu.Unlock() if .connClosed { return ErrConnectionClosed } return ErrBadSubscription } // processNextMsgDelivered takes a message and applies the needed // accounting to the stats from the subscription, returning an // error in case we have the maximum number of messages have been // delivered already. It should not be called while holding the lock. func ( *Subscription) ( *Msg) error { .mu.Lock() := .conn := .max var string // Update some stats. .delivered++ := .delivered if .jsi != nil { = .checkForFlowControlResponse() } if .typ == SyncSubscription { .pMsgs-- .pBytes -= len(.Data) } .mu.Unlock() if != _EMPTY_ { .Publish(, nil) } if > 0 { if > { return ErrMaxMessages } // Remove subscription if we have reached max. if == { .mu.Lock() .removeSub() .mu.Unlock() } } if len(.Data) == 0 && .Header.Get(statusHdr) == noResponders { return ErrNoResponders } return nil } // Queued returns the number of queued messages in the client for this subscription. 
// // Deprecated: Use Pending() func ( *Subscription) () (int, error) { , , := .Pending() return int(), } // Pending returns the number of queued messages and queued bytes in the client for this subscription. func ( *Subscription) () (int, int, error) { if == nil { return -1, -1, ErrBadSubscription } .mu.Lock() defer .mu.Unlock() if .conn == nil || .closed { return -1, -1, ErrBadSubscription } if .typ == ChanSubscription { return -1, -1, ErrTypeSubscription } return .pMsgs, .pBytes, nil } // MaxPending returns the maximum number of queued messages and queued bytes seen so far. func ( *Subscription) () (int, int, error) { if == nil { return -1, -1, ErrBadSubscription } .mu.Lock() defer .mu.Unlock() if .conn == nil || .closed { return -1, -1, ErrBadSubscription } if .typ == ChanSubscription { return -1, -1, ErrTypeSubscription } return .pMsgsMax, .pBytesMax, nil } // ClearMaxPending resets the maximums seen so far. func ( *Subscription) () error { if == nil { return ErrBadSubscription } .mu.Lock() defer .mu.Unlock() if .conn == nil || .closed { return ErrBadSubscription } if .typ == ChanSubscription { return ErrTypeSubscription } .pMsgsMax, .pBytesMax = 0, 0 return nil } // Pending Limits const ( // DefaultSubPendingMsgsLimit will be 512k msgs. DefaultSubPendingMsgsLimit = 512 * 1024 // DefaultSubPendingBytesLimit is 64MB DefaultSubPendingBytesLimit = 64 * 1024 * 1024 ) // PendingLimits returns the current limits for this subscription. // If no error is returned, a negative value indicates that the // given metric is not limited. func ( *Subscription) () (int, int, error) { if == nil { return -1, -1, ErrBadSubscription } .mu.Lock() defer .mu.Unlock() if .conn == nil || .closed { return -1, -1, ErrBadSubscription } if .typ == ChanSubscription { return -1, -1, ErrTypeSubscription } return .pMsgsLimit, .pBytesLimit, nil } // SetPendingLimits sets the limits for pending msgs and bytes for this subscription. // Zero is not allowed. 
Any negative value means that the given metric is not limited. func ( *Subscription) (, int) error { if == nil { return ErrBadSubscription } .mu.Lock() defer .mu.Unlock() if .conn == nil || .closed { return ErrBadSubscription } if .typ == ChanSubscription { return ErrTypeSubscription } if == 0 || == 0 { return ErrInvalidArg } .pMsgsLimit, .pBytesLimit = , return nil } // Delivered returns the number of delivered messages for this subscription. func ( *Subscription) () (int64, error) { if == nil { return -1, ErrBadSubscription } .mu.Lock() defer .mu.Unlock() if .conn == nil || .closed { return -1, ErrBadSubscription } return int64(.delivered), nil } // Dropped returns the number of known dropped messages for this subscription. // This will correspond to messages dropped by violations of PendingLimits. If // the server declares the connection a SlowConsumer, this number may not be // valid. func ( *Subscription) () (int, error) { if == nil { return -1, ErrBadSubscription } .mu.Lock() defer .mu.Unlock() if .conn == nil || .closed { return -1, ErrBadSubscription } return .dropped, nil } // Respond allows a convenient way to respond to requests in service based subscriptions. func ( *Msg) ( []byte) error { if == nil || .Sub == nil { return ErrMsgNotBound } if .Reply == "" { return ErrMsgNoReply } .Sub.mu.Lock() := .Sub.conn .Sub.mu.Unlock() // No need to check the connection here since the call to publish will do all the checking. return .Publish(.Reply, ) } // RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers func ( *Msg) ( *Msg) error { if == nil || .Sub == nil { return ErrMsgNotBound } if .Reply == "" { return ErrMsgNoReply } .Subject = .Reply .Sub.mu.Lock() := .Sub.conn .Sub.mu.Unlock() // No need to check the connection here since the call to publish will do all the checking. 
return .PublishMsg() } // FIXME: This is a hack // removeFlushEntry is needed when we need to discard queued up responses // for our pings as part of a flush call. This happens when we have a flush // call outstanding and we call close. func ( *Conn) ( chan struct{}) bool { .mu.Lock() defer .mu.Unlock() if .pongs == nil { return false } for , := range .pongs { if == { .pongs[] = nil return true } } return false } // The lock must be held entering this function. func ( *Conn) ( chan struct{}) { .pongs = append(.pongs, ) .bw.appendString(pingProto) // Flush in place. .bw.flush() } // This will fire periodically and send a client origin // ping to the server. Will also check that we have received // responses from the server. func ( *Conn) () { .mu.Lock() if .status != CONNECTED { .mu.Unlock() return } // Check for violation .pout++ if .pout > .Opts.MaxPingsOut { .mu.Unlock() if := .processOpErr(ErrStaleConnection); { .close(CLOSED, true, nil) } return } .sendPing(nil) .ptmr.Reset(.Opts.PingInterval) .mu.Unlock() } // FlushTimeout allows a Flush operation to have an associated timeout. func ( *Conn) ( time.Duration) ( error) { if == nil { return ErrInvalidConnection } if <= 0 { return ErrBadTimeout } .mu.Lock() if .isClosed() { .mu.Unlock() return ErrConnectionClosed } := globalTimerPool.Get() defer globalTimerPool.Put() // Create a buffered channel to prevent chan send to block // in processPong() if this code here times out just when // PONG was received. := make(chan struct{}, 1) .sendPing() .mu.Unlock() select { case , := <-: if ! { = ErrConnectionClosed } else { close() } case <-.C: = ErrTimeout } if != nil { .removeFlushEntry() } return } // RTT calculates the round trip time between this client and the server. 
func ( *Conn) () (time.Duration, error) { if .IsClosed() { return 0, ErrConnectionClosed } if .IsReconnecting() { return 0, ErrDisconnected } := time.Now() if := .FlushTimeout(10 * time.Second); != nil { return 0, } return time.Since(), nil } // Flush will perform a round trip to the server and return when it // receives the internal reply. func ( *Conn) () error { return .FlushTimeout(10 * time.Second) } // Buffered will return the number of bytes buffered to be sent to the server. // FIXME(dlc) take into account disconnected state. func ( *Conn) () (int, error) { .mu.RLock() defer .mu.RUnlock() if .isClosed() || .bw == nil { return -1, ErrConnectionClosed } return .bw.buffered(), nil } // resendSubscriptions will send our subscription state back to the // server. Used in reconnects func ( *Conn) () { // Since we are going to send protocols to the server, we don't want to // be holding the subsMu lock (which is used in processMsg). So copy // the subscriptions in a temporary array. .subsMu.RLock() := make([]*Subscription, 0, len(.subs)) for , := range .subs { = append(, ) } .subsMu.RUnlock() for , := range { := uint64(0) .mu.Lock() // when resending subscriptions, the permissions error should be cleared // since the user may have fixed the permissions issue .permissionsErr = nil if .max > 0 { if .delivered < .max { = .max - .delivered } // adjustedMax could be 0 here if the number of delivered msgs // reached the max, if so unsubscribe. if == 0 { .mu.Unlock() .bw.writeDirect(fmt.Sprintf(unsubProto, .sid, _EMPTY_)) continue } } , , := .Subject, .Queue, .sid .mu.Unlock() .bw.writeDirect(fmt.Sprintf(subProto, , , )) if > 0 { := strconv.Itoa(int()) .bw.writeDirect(fmt.Sprintf(unsubProto, , )) } } } // This will clear any pending flush calls and release pending calls. // Lock is assumed to be held by the caller. func ( *Conn) () { // Clear any queued pongs, e.g. pending flush calls. 
for , := range .pongs { if != nil { close() } } .pongs = nil } // This will clear any pending Request calls. // Lock is assumed to be held by the caller. func ( *Conn) () { for , := range .respMap { if != nil { close() delete(.respMap, ) } } } // Low level close call that will do correct cleanup and set // desired status. Also controls whether user defined callbacks // will be triggered. The lock should not be held entering this // function. This function will handle the locking manually. func ( *Conn) ( Status, bool, error) { .mu.Lock() if .isClosed() { .status = .mu.Unlock() return } .status = CLOSED // Kick the Go routines so they fall out. .kickFlusher() // If the reconnect timer is waiting between a reconnect attempt, // this will kick it out. if .rqch != nil { close(.rqch) .rqch = nil } // Clear any queued pongs, e.g. pending flush calls. .clearPendingFlushCalls() // Clear any queued and blocking Requests. .clearPendingRequestCalls() // Stop ping timer if set. .stopPingTimer() .ptmr = nil // Need to close and set TCP conn to nil if reconnect loop has stopped, // otherwise we would incorrectly invoke Disconnect handler (if set) // down below. if .ar && .conn != nil { .conn.Close() .conn = nil } else if .conn != nil { // Go ahead and make sure we have flushed the outbound .bw.flush() defer .conn.Close() } // Close sync subscriber channels and release any // pending NextMsg() calls. .subsMu.Lock() for , := range .subs { .mu.Lock() // Release callers on NextMsg for SyncSubscription only if .mch != nil && .typ == SyncSubscription { close(.mch) } .mch = nil // Mark as invalid, for signaling to waitForMsgs .closed = true // Mark connection closed in subscription .connClosed = true // If we have an async subscription, signals it to exit if .typ == AsyncSubscription && .pCond != nil { .pCond.Signal() } .mu.Unlock() } .subs = nil .subsMu.Unlock() .changeConnStatus() // Perform appropriate callback if needed for a disconnect. 
if { if .conn != nil { if := .Opts.DisconnectedErrCB; != nil { .ach.push(func() { (, ) }) } else if := .Opts.DisconnectedCB; != nil { .ach.push(func() { () }) } } if .Opts.ClosedCB != nil { .ach.push(func() { .Opts.ClosedCB() }) } } // If this is terminal, then we have to notify the asyncCB handler that // it can exit once all async callbacks have been dispatched. if == CLOSED { .ach.close() } .mu.Unlock() } // Close will close the connection to the server. This call will release // all blocking calls, such as Flush() and NextMsg() func ( *Conn) () { if != nil { // This will be a no-op if the connection was not websocket. // We do this here as opposed to inside close() because we want // to do this only for the final user-driven close of the client. // Otherwise, we would need to change close() to pass a boolean // indicating that this is the case. .wsClose() .close(CLOSED, !.Opts.NoCallbacksAfterClientClose, nil) } } // IsClosed tests if a Conn has been closed. func ( *Conn) () bool { .mu.RLock() defer .mu.RUnlock() return .isClosed() } // IsReconnecting tests if a Conn is reconnecting. func ( *Conn) () bool { .mu.RLock() defer .mu.RUnlock() return .isReconnecting() } // IsConnected tests if a Conn is connected. func ( *Conn) () bool { .mu.RLock() defer .mu.RUnlock() return .isConnected() } // drainConnection will run in a separate Go routine and will // flush all publishes and drain all active subscriptions. func ( *Conn) () { // Snapshot subs list. .mu.Lock() // Check again here if we are in a state to not process. if .isClosed() { .mu.Unlock() return } if .isConnecting() || .isReconnecting() { .mu.Unlock() // Move to closed state. .Close() return } := make([]*Subscription, 0, len(.subs)) for , := range .subs { if == .respMux { // Skip since might be in use while messages // are being processed (can miss responses). continue } = append(, ) } := .Opts.AsyncErrorCB := .Opts.DrainTimeout := .respMux .mu.Unlock() // for pushing errors with context. 
:= func( error) { .mu.Lock() .err = if != nil { .ach.push(func() { (, nil, ) }) } .mu.Unlock() } // Do subs first, skip request handler if present. for , := range { if := .Drain(); != nil { // We will notify about these but continue. () } } // Wait for the subscriptions to drop to zero. := time.Now().Add() var int if != nil { = 1 } else { = 0 } for time.Now().Before() { if .NumSubscriptions() == { break } time.Sleep(10 * time.Millisecond) } // In case there was a request/response handler // then need to call drain at the end. if != nil { if := .Drain(); != nil { // We will notify about these but continue. () } for time.Now().Before() { if .NumSubscriptions() == 0 { break } time.Sleep(10 * time.Millisecond) } } // Check if we timed out. if .NumSubscriptions() != 0 { (ErrDrainTimeout) } // Flip State .mu.Lock() .changeConnStatus(DRAINING_PUBS) .mu.Unlock() // Do publish drain via Flush() call. := .FlushTimeout(5 * time.Second) if != nil { () } // Move to closed state. .Close() } // Drain will put a connection into a drain state. All subscriptions will // immediately be put into a drain state. Upon completion, the publishers // will be drained and can not publish any additional messages. Upon draining // of the publishers, the connection will be closed. Use the ClosedCB // option to know when the connection has moved from draining to closed. // // See note in Subscription.Drain for JetStream subscriptions. func ( *Conn) () error { .mu.Lock() if .isClosed() { .mu.Unlock() return ErrConnectionClosed } if .isConnecting() || .isReconnecting() { .mu.Unlock() .Close() return ErrConnectionReconnecting } if .isDraining() { .mu.Unlock() return nil } .changeConnStatus(DRAINING_SUBS) go .drainConnection() .mu.Unlock() return nil } // IsDraining tests if a Conn is in the draining state. 
func ( *Conn) () bool { .mu.RLock() defer .mu.RUnlock() return .isDraining() } // caller must lock func ( *Conn) ( bool) []string { := len(.srvPool) := make([]string, 0) for := 0; < ; ++ { if && !.srvPool[].isImplicit { continue } := .srvPool[].url = append(, fmt.Sprintf("%s://%s", .Scheme, .Host)) } return } // Servers returns the list of known server urls, including additional // servers discovered after a connection has been established. If // authentication is enabled, use UserInfo or Token when connecting with // these urls. func ( *Conn) () []string { .mu.RLock() defer .mu.RUnlock() return .getServers(false) } // DiscoveredServers returns only the server urls that have been discovered // after a connection has been established. If authentication is enabled, // use UserInfo or Token when connecting with these urls. func ( *Conn) () []string { .mu.RLock() defer .mu.RUnlock() return .getServers(true) } // Status returns the current state of the connection. func ( *Conn) () Status { .mu.RLock() defer .mu.RUnlock() return .status } // Test if Conn has been closed Lock is assumed held. func ( *Conn) () bool { return .status == CLOSED } // Test if Conn is in the process of connecting func ( *Conn) () bool { return .status == CONNECTING } // Test if Conn is being reconnected. func ( *Conn) () bool { return .status == RECONNECTING } // Test if Conn is connected or connecting. func ( *Conn) () bool { return .status == CONNECTED || .isDraining() } // Test if Conn is in the draining state. func ( *Conn) () bool { return .status == DRAINING_SUBS || .status == DRAINING_PUBS } // Test if Conn is in the draining state for pubs. func ( *Conn) () bool { return .status == DRAINING_PUBS } // Stats will return a race safe copy of the Statistics section for the connection. func ( *Conn) () Statistics { // Stats are updated either under connection's mu or with atomic operations // for inbound stats in processMsg(). 
.mu.Lock() := Statistics{ InMsgs: atomic.LoadUint64(&.InMsgs), InBytes: atomic.LoadUint64(&.InBytes), OutMsgs: .OutMsgs, OutBytes: .OutBytes, Reconnects: .Reconnects, } .mu.Unlock() return } // MaxPayload returns the size limit that a message payload can have. // This is set by the server configuration and delivered to the client // upon connect. func ( *Conn) () int64 { .mu.RLock() defer .mu.RUnlock() return .info.MaxPayload } // HeadersSupported will return if the server supports headers func ( *Conn) () bool { .mu.RLock() defer .mu.RUnlock() return .info.Headers } // AuthRequired will return if the connected server requires authorization. func ( *Conn) () bool { .mu.RLock() defer .mu.RUnlock() return .info.AuthRequired } // TLSRequired will return if the connected server requires TLS connections. func ( *Conn) () bool { .mu.RLock() defer .mu.RUnlock() return .info.TLSRequired } // Barrier schedules the given function `f` to all registered asynchronous // subscriptions. // Only the last subscription to see this barrier will invoke the function. // If no subscription is registered at the time of this call, `f()` is invoked // right away. // ErrConnectionClosed is returned if the connection is closed prior to // the call. func ( *Conn) ( func()) error { .mu.Lock() if .isClosed() { .mu.Unlock() return ErrConnectionClosed } .subsMu.Lock() // Need to figure out how many non chan subscriptions there are := 0 for , := range .subs { if .typ == AsyncSubscription { ++ } } if == 0 { .subsMu.Unlock() .mu.Unlock() () return nil } := &barrierInfo{refs: int64(), f: } for , := range .subs { .mu.Lock() if .mch == nil { := &Msg{barrier: } // Push onto the async pList if .pTail != nil { .pTail.next = } else { .pHead = .pCond.Signal() } .pTail = } .mu.Unlock() } .subsMu.Unlock() .mu.Unlock() return nil } // GetClientIP returns the client IP as known by the server. // Supported as of server version 2.1.6. 
func ( *Conn) () (net.IP, error) { .mu.RLock() defer .mu.RUnlock() if .isClosed() { return nil, ErrConnectionClosed } if .info.ClientIP == "" { return nil, ErrClientIPNotSupported } := net.ParseIP(.info.ClientIP) return , nil } // GetClientID returns the client ID assigned by the server to which // the client is currently connected to. Note that the value may change if // the client reconnects. // This function returns ErrClientIDNotSupported if the server is of a // version prior to 1.2.0. func ( *Conn) () (uint64, error) { .mu.RLock() defer .mu.RUnlock() if .isClosed() { return 0, ErrConnectionClosed } if .info.CID == 0 { return 0, ErrClientIDNotSupported } return .info.CID, nil } // StatusChanged returns a channel on which given list of connection status changes will be reported. // If no statuses are provided, defaults will be used: CONNECTED, RECONNECTING, DISCONNECTED, CLOSED. func ( *Conn) ( ...Status) chan Status { if len() == 0 { = []Status{CONNECTED, RECONNECTING, DISCONNECTED, CLOSED} } := make(chan Status, 10) .mu.Lock() defer .mu.Unlock() for , := range { .registerStatusChangeListener(, ) } return } // RemoveStatusListener removes a status change listener. // If the channel is not closed, it will be closed. // Listeners will be removed automatically on status change // as well, but this is a way to remove them manually. func ( *Conn) ( chan (Status)) { .mu.Lock() defer .mu.Unlock() select { case <-: default: close() } for , := range .statListeners { for := range { delete(, ) } } } // registerStatusChangeListener registers a channel waiting for a specific status change event. // Status change events are non-blocking - if no receiver is waiting for the status change, // it will not be sent on the channel. Closed channels are ignored. // The lock should be held entering. func ( *Conn) ( Status, chan Status) { if .statListeners == nil { .statListeners = make(map[Status]map[chan Status]struct{}) } if , := .statListeners[]; ! 
{ .statListeners[] = make(map[chan Status]struct{}) } .statListeners[][] = struct{}{} } // sendStatusEvent sends connection status event to all channels. // If channel is closed, or there is no listener, sendStatusEvent // will not block. Lock should be held entering. func ( *Conn) ( Status) { : for := range .statListeners[] { // make sure channel is not closed select { case <-: // if chan is closed, remove it delete(.statListeners[], ) continue default: } // only send event if someone's listening select { case <- : default: } } } // changeConnStatus changes connections status and sends events // to all listeners. Lock should be held entering. func ( *Conn) ( Status) { if == nil { return } .sendStatusEvent() .status = } // NkeyOptionFromSeed will load an nkey pair from a seed file. // It will return the NKey Option and will handle // signing of nonce challenges from the server. It will take // care to not hold keys in memory and to wipe memory. func ( string) (Option, error) { , := nkeyPairFromSeedFile() if != nil { return nil, } // Wipe our key on exit. defer .Wipe() , := .PublicKey() if != nil { return nil, } if !nkeys.IsValidPublicUserKey() { return nil, errors.New("nats: Not a valid nkey user seed") } := func( []byte) ([]byte, error) { return sigHandler(, ) } return Nkey(string(), ), nil } // Just wipe slice with 'x', for clearing contents of creds or nkey seed file. 
func wipeSlice( []byte) { for := range { [] = 'x' } } func userFromFile( string) (string, error) { , := expandPath() if != nil { return _EMPTY_, fmt.Errorf("nats: %w", ) } , := os.ReadFile() if != nil { return _EMPTY_, fmt.Errorf("nats: %w", ) } defer wipeSlice() return nkeys.ParseDecoratedJWT() } func homeDir() (string, error) { if runtime.GOOS == "windows" { , := os.Getenv("HOMEDRIVE"), os.Getenv("HOMEPATH") := os.Getenv("USERPROFILE") var string if == "" || == "" { if == "" { return _EMPTY_, errors.New("nats: failed to get home dir, require %HOMEDRIVE% and %HOMEPATH% or %USERPROFILE%") } = } else { = filepath.Join(, ) } return , nil } := os.Getenv("HOME") if == "" { return _EMPTY_, errors.New("nats: failed to get home dir, require $HOME") } return , nil } func expandPath( string) (string, error) { = os.ExpandEnv() if !strings.HasPrefix(, "~") { return , nil } , := homeDir() if != nil { return _EMPTY_, } return filepath.Join(, [1:]), nil } func nkeyPairFromSeedFile( string) (nkeys.KeyPair, error) { , := os.ReadFile() if != nil { return nil, fmt.Errorf("nats: %w", ) } defer wipeSlice() return nkeys.ParseDecoratedNKey() } // Sign authentication challenges from the server. // Do not keep private seed in memory. func sigHandler( []byte, string) ([]byte, error) { , := nkeyPairFromSeedFile() if != nil { return nil, fmt.Errorf("unable to extract key pair from file %q: %w", , ) } // Wipe our key on exit. defer .Wipe() , := .Sign() return , nil } type timeoutWriter struct { timeout time.Duration conn net.Conn err error } // Write implements the io.Writer interface. func ( *timeoutWriter) ( []byte) (int, error) { if .err != nil { return 0, .err } var int .conn.SetWriteDeadline(time.Now().Add(.timeout)) , .err = .conn.Write() .conn.SetWriteDeadline(time.Time{}) return , .err }