// Copyright 2012-2024 The NATS Authors// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.
// A Go client for the NATS messaging system (https://nats.io).
package natsimport ()// Default Constantsconst (Version = "1.41.2"DefaultURL = "nats://127.0.0.1:4222"DefaultPort = 4222DefaultMaxReconnect = 60DefaultReconnectWait = 2 * time.SecondDefaultReconnectJitter = 100 * time.MillisecondDefaultReconnectJitterTLS = time.SecondDefaultTimeout = 2 * time.SecondDefaultPingInterval = 2 * time.MinuteDefaultMaxPingOut = 2DefaultMaxChanLen = 64 * 1024// 64kDefaultReconnectBufSize = 8 * 1024 * 1024// 8MBRequestChanLen = 8DefaultDrainTimeout = 30 * time.SecondDefaultFlusherTimeout = time.MinuteLangString = "go")const (// STALE_CONNECTION is for detection and proper handling of stale connections.STALE_CONNECTION = "stale connection"// PERMISSIONS_ERR is for when nats server subject authorization has failed.PERMISSIONS_ERR = "permissions violation"// AUTHORIZATION_ERR is for when nats server user authorization has failed.AUTHORIZATION_ERR = "authorization violation"// AUTHENTICATION_EXPIRED_ERR is for when nats server user authorization has expired.AUTHENTICATION_EXPIRED_ERR = "user authentication expired"// AUTHENTICATION_REVOKED_ERR is for when user authorization has been revoked.AUTHENTICATION_REVOKED_ERR = "user authentication revoked"// ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limitMAX_CONNECTIONS_ERR = "maximum connections exceeded"// MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limitMAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded")// Errorsvar (ErrConnectionClosed = errors.New("nats: connection closed")ErrConnectionDraining = errors.New("nats: connection draining")ErrDrainTimeout = errors.New("nats: draining connection timed out")ErrConnectionReconnecting = errors.New("nats: connection reconnecting")ErrSecureConnRequired = errors.New("nats: secure connection required")ErrSecureConnWanted = errors.New("nats: secure connection not available")ErrBadSubscription = errors.New("nats: invalid subscription")ErrTypeSubscription = errors.New("nats: invalid subscription type")ErrBadSubject = errors.New("nats: invalid subject")ErrBadQueueName = errors.New("nats: invalid queue name")ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")ErrTimeout = errors.New("nats: timeout")ErrBadTimeout = errors.New("nats: timeout invalid")ErrAuthorization = errors.New("nats: authorization violation")ErrAuthExpired = errors.New("nats: authentication expired")ErrAuthRevoked = errors.New("nats: authentication revoked")ErrPermissionViolation = errors.New("nats: permissions violation")ErrAccountAuthExpired = errors.New("nats: account authentication expired")ErrNoServers = errors.New("nats: no servers available for connection")ErrJsonParse = errors.New("nats: connect message, json parse error")ErrChanArg = errors.New("nats: argument needs to be a channel type")ErrMaxPayload = errors.New("nats: maximum payload exceeded")ErrMaxMessages = errors.New("nats: maximum messages delivered")ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set")ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")ErrInvalidConnection = errors.New("nats: invalid connection")ErrInvalidMsg = errors.New("nats: invalid message or message nil")ErrInvalidArg = errors.New("nats: invalid argument")ErrInvalidContext = errors.New("nats: invalid context")ErrNoDeadlineContext = errors.New("nats: context requires a deadline")ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")ErrNoUserCB = errors.New("nats: user callback not defined")ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)ErrTokenAlreadySet = errors.New("nats: token and token handler both set")ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass")ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")ErrMsgNoReply = errors.New("nats: message does not have a reply")ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")ErrDisconnected = errors.New("nats: server is disconnected")ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")ErrBadHeaderMsg = errors.New("nats: message could not decode headers")ErrNoResponders = errors.New("nats: no responders available for request")ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")ErrConnectionNotTLS = errors.New("nats: connection is not tls")ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded"))// GetDefaultOptions returns default configuration options for the client.func () Options {returnOptions{AllowReconnect: true,MaxReconnect: DefaultMaxReconnect,ReconnectWait: DefaultReconnectWait,ReconnectJitter: DefaultReconnectJitter,ReconnectJitterTLS: DefaultReconnectJitterTLS,Timeout: DefaultTimeout,PingInterval: DefaultPingInterval,MaxPingsOut: DefaultMaxPingOut,SubChanLen: DefaultMaxChanLen,ReconnectBufSize: DefaultReconnectBufSize,DrainTimeout: DefaultDrainTimeout,FlusherTimeout: DefaultFlusherTimeout, }}// Deprecated: Use GetDefaultOptions() instead.// DefaultOptions is not safe for use by multiple clients.// For details see #308.varDefaultOptions = GetDefaultOptions()// Status represents the state of the connection.typeStatusintconst (DISCONNECTED = Status(iota)CONNECTEDCLOSEDRECONNECTINGCONNECTINGDRAINING_SUBSDRAINING_PUBS)func ( Status) () string {switch {caseDISCONNECTED:return"DISCONNECTED"caseCONNECTED:return"CONNECTED"caseCLOSED:return"CLOSED"caseRECONNECTING:return"RECONNECTING"caseCONNECTING:return"CONNECTING"caseDRAINING_SUBS:return"DRAINING_SUBS"caseDRAINING_PUBS:return"DRAINING_PUBS" }return"unknown status"}// ConnHandler is used for asynchronous events such as// disconnected and closed connections.typeConnHandlerfunc(*Conn)// ConnErrHandler is used to process asynchronous events like// disconnected connection with the error (if any).typeConnErrHandlerfunc(*Conn, error)// ErrHandler is used to process asynchronous errors encountered// while processing inbound messages.typeErrHandlerfunc(*Conn, *Subscription, error)// UserJWTHandler is used to fetch and return the account signed// JWT for this user.typeUserJWTHandlerfunc() (string, error)// TLSCertHandler is used to fetch and return tls certificate.typeTLSCertHandlerfunc() (tls.Certificate, error)// RootCAsHandler is used to fetch and return a set of root certificate// authorities that clients use when verifying server certificates.typeRootCAsHandlerfunc() (*x509.CertPool, error)// SignatureHandler is used to sign a nonce from the server while// authenticating with nkeys. The user should sign the nonce and// return the raw signature. The client will base64 encode this to// send to the server.typeSignatureHandlerfunc([]byte) ([]byte, error)// AuthTokenHandler is used to generate a new token.typeAuthTokenHandlerfunc() string// UserInfoCB is used to pass the username and password when establishing connection.typeUserInfoCBfunc() (string, string)// ReconnectDelayHandler is used to get from the user the desired// delay the library should pause before attempting to reconnect// again. Note that this is invoked after the library tried the// whole list of URLs and failed to reconnect.typeReconnectDelayHandlerfunc(attempts int) time.Duration// asyncCB is used to preserve order for async callbacks.type asyncCB struct { f func() next *asyncCB}type asyncCallbacksHandler struct { mu sync.Mutex cond *sync.Cond head *asyncCB tail *asyncCB}// Option is a function on the options for a connection.typeOptionfunc(*Options) error// CustomDialer can be used to specify any dialer, not necessarily a// *net.Dialer. A CustomDialer may also implement `SkipTLSHandshake() bool`// in order to skip the TLS handshake in case not required.typeCustomDialerinterface {Dial(network, address string) (net.Conn, error)}typeInProcessConnProviderinterface {InProcessConn() (net.Conn, error)}// Options can be used to create a customized connection.typeOptionsstruct {// Url represents a single NATS server url to which the client // will be connecting. If the Servers option is also set, it // then becomes the first server in the Servers array. Url string// InProcessServer represents a NATS server running within the // same process. If this is set then we will attempt to connect // to the server directly rather than using external TCP conns. InProcessServer InProcessConnProvider// Servers is a configured set of servers which this client // will use when attempting to connect. Servers []string// NoRandomize configures whether we will randomize the // server pool. NoRandomize bool// NoEcho configures whether the server will echo back messages // that are sent on this connection if we also have matching subscriptions. // Note this is supported on servers >= version 1.2. Proto 1 or greater. NoEcho bool// Name is an optional name label which will be sent to the server // on CONNECT to identify the client. Name string// Verbose signals the server to send an OK ack for commands // successfully processed by the server. Verbose bool// Pedantic signals the server whether it should be doing further // validation of subjects. Pedantic bool// Secure enables TLS secure connections that skip server // verification by default. NOT RECOMMENDED. Secure bool// TLSConfig is a custom TLS configuration to use for secure // transports. TLSConfig *tls.Config// TLSCertCB is used to fetch and return custom tls certificate. TLSCertCB TLSCertHandler// TLSHandshakeFirst is used to instruct the library perform // the TLS handshake right after the connect and before receiving // the INFO protocol from the server. If this option is enabled // but the server is not configured to perform the TLS handshake // first, the connection will fail. TLSHandshakeFirst bool// RootCAsCB is used to fetch and return a set of root certificate // authorities that clients use when verifying server certificates. RootCAsCB RootCAsHandler// AllowReconnect enables reconnection logic to be used when we // encounter a disconnect from the current server. AllowReconnect bool// MaxReconnect sets the number of reconnect attempts that will be // tried before giving up. If negative, then it will never give up // trying to reconnect. // Defaults to 60. MaxReconnect int// ReconnectWait sets the time to backoff after attempting a reconnect // to a server that we were already connected to previously. // Defaults to 2s. ReconnectWait time.Duration// CustomReconnectDelayCB is invoked after the library tried every // URL in the server list and failed to reconnect. It passes to the // user the current number of attempts. This function returns the // amount of time the library will sleep before attempting to reconnect // again. It is strongly recommended that this value contains some // jitter to prevent all connections to attempt reconnecting at the same time. CustomReconnectDelayCB ReconnectDelayHandler// ReconnectJitter sets the upper bound for a random delay added to // ReconnectWait during a reconnect when no TLS is used. // Defaults to 100ms. ReconnectJitter time.Duration// ReconnectJitterTLS sets the upper bound for a random delay added to // ReconnectWait during a reconnect when TLS is used. // Defaults to 1s. ReconnectJitterTLS time.Duration// Timeout sets the timeout for a Dial operation on a connection. // Defaults to 2s. Timeout time.Duration// DrainTimeout sets the timeout for a Drain Operation to complete. // Defaults to 30s. DrainTimeout time.Duration// FlusherTimeout is the maximum time to wait for write operations // to the underlying connection to complete (including the flusher loop). // Defaults to 1m. FlusherTimeout time.Duration// PingInterval is the period at which the client will be sending ping // commands to the server, disabled if 0 or negative. // Defaults to 2m. PingInterval time.Duration// MaxPingsOut is the maximum number of pending ping commands that can // be awaiting a response before raising an ErrStaleConnection error. // Defaults to 2. MaxPingsOut int// ClosedCB sets the closed handler that is called when a client will // no longer be connected. ClosedCB ConnHandler// DisconnectedCB sets the disconnected handler that is called // whenever the connection is disconnected. // Will not be called if DisconnectedErrCB is set // Deprecated. Use DisconnectedErrCB which passes error that caused // the disconnect event. DisconnectedCB ConnHandler// DisconnectedErrCB sets the disconnected error handler that is called // whenever the connection is disconnected. // Disconnected error could be nil, for instance when user explicitly closes the connection. // DisconnectedCB will not be called if DisconnectedErrCB is set DisconnectedErrCB ConnErrHandler// ConnectedCB sets the connected handler called when the initial connection // is established. It is not invoked on successful reconnects - for reconnections, // use ReconnectedCB. ConnectedCB can be used in conjunction with RetryOnFailedConnect // to detect whether the initial connect was successful. ConnectedCB ConnHandler// ReconnectedCB sets the reconnected handler called whenever // the connection is successfully reconnected. ReconnectedCB ConnHandler// DiscoveredServersCB sets the callback that is invoked whenever a new // server has joined the cluster. DiscoveredServersCB ConnHandler// AsyncErrorCB sets the async error handler (e.g. slow consumer errors) AsyncErrorCB ErrHandler// ReconnectErrCB sets the callback that is invoked whenever a // reconnect attempt failed ReconnectErrCB ConnErrHandler// ReconnectBufSize is the size of the backing bufio during reconnect. // Once this has been exhausted publish operations will return an error. // Defaults to 8388608 bytes (8MB). ReconnectBufSize int// SubChanLen is the size of the buffered channel used between the socket // Go routine and the message delivery for SyncSubscriptions. // NOTE: This does not affect AsyncSubscriptions which are // dictated by PendingLimits() // Defaults to 65536. SubChanLen int// UserJWT sets the callback handler that will fetch a user's JWT. UserJWT UserJWTHandler// Nkey sets the public nkey that will be used to authenticate // when connecting to the server. UserJWT and Nkey are mutually exclusive // and if defined, UserJWT will take precedence. Nkey string// SignatureCB designates the function used to sign the nonce // presented from the server. SignatureCB SignatureHandler// User sets the username to be used when connecting to the server. User string// Password sets the password to be used when connecting to a server. Password string// UserInfo sets the callback handler that will fetch the username and password. UserInfo UserInfoCB// Token sets the token to be used when connecting to a server. Token string// TokenHandler designates the function used to generate the token to be used when connecting to a server. TokenHandler AuthTokenHandler// Dialer allows a custom net.Dialer when forming connections. // Deprecated: should use CustomDialer instead. Dialer *net.Dialer// CustomDialer allows to specify a custom dialer (not necessarily // a *net.Dialer). CustomDialer CustomDialer// UseOldRequestStyle forces the old method of Requests that utilize // a new Inbox and a new Subscription for each request. UseOldRequestStyle bool// NoCallbacksAfterClientClose allows preventing the invocation of // callbacks after Close() is called. Client won't receive notifications // when Close is invoked by user code. Default is to invoke the callbacks. NoCallbacksAfterClientClose bool// LameDuckModeHandler sets the callback to invoke when the server notifies // the connection that it entered lame duck mode, that is, going to // gradually disconnect all its connections before shutting down. This is // often used in deployments when upgrading NATS Servers. LameDuckModeHandler ConnHandler// RetryOnFailedConnect sets the connection in reconnecting state right // away if it can't connect to a server in the initial set. The // MaxReconnect and ReconnectWait options are used for this process, // similarly to when an established connection is disconnected. // If a ReconnectHandler is set, it will be invoked on the first // successful reconnect attempt (if the initial connect fails), // and if a ClosedHandler is set, it will be invoked if // it fails to connect (after exhausting the MaxReconnect attempts). RetryOnFailedConnect bool// For websocket connections, indicates to the server that the connection // supports compression. If the server does too, then data will be compressed. Compression bool// For websocket connections, adds a path to connections url. // This is useful when connecting to NATS behind a proxy. ProxyPath string// InboxPrefix allows the default _INBOX prefix to be customized InboxPrefix string// IgnoreAuthErrorAbort - if set to true, client opts out of the default connect behavior of aborting // subsequent reconnect attempts if server returns the same auth error twice (regardless of reconnect policy). IgnoreAuthErrorAbort bool// SkipHostLookup skips the DNS lookup for the server hostname. SkipHostLookup bool// PermissionErrOnSubscribe - if set to true, the client will return ErrPermissionViolation // from SubscribeSync if the server returns a permissions error for a subscription. // Defaults to false. PermissionErrOnSubscribe bool}const (// Scratch storage for assembling protocol headers scratchSize = 512// The size of the bufio reader/writer on top of the socket. defaultBufSize = 32768// The buffered size of the flush "kick" channel flushChanSize = 1// Default server pool size srvPoolSize = 4// NUID size nuidSize = 22// Default ports used if none is specified in given URL(s) defaultWSPortString = "80" defaultWSSPortString = "443" defaultPortString = "4222")// A Conn represents a bare connection to a nats-server.// It can send and receive []byte payloads.// The connection is safe to use in multiple Go routines concurrently.typeConnstruct {// Keep all members for which we use atomic at the beginning of the // struct and make sure they are all 64bits (or use padding if necessary). // atomic.* functions crash on 32bit machines if operand is not aligned // at 64bit. See https://github.com/golang/go/issues/599Statistics mu sync.RWMutex// Opts holds the configuration of the Conn. // Modifying the configuration of a running Conn is a race. Opts Options wg sync.WaitGroup srvPool []*srv current *srv urls map[string]struct{} // Keep track of all known URLs (used by processInfo) conn net.Conn bw *natsWriter br *natsReader fch chanstruct{} info serverInfo ssid int64 subsMu sync.RWMutex subs map[int64]*Subscription ach *asyncCallbacksHandler pongs []chanstruct{} scratch [scratchSize]byte status Status statListeners map[Status]map[chanStatus]struct{} initc bool// true if the connection is performing the initial connect err error ps *parseState ptmr *time.Timer pout int ar bool// abort reconnect rqch chanstruct{} ws bool// true if a websocket connection// New style response handler respSub string// The wildcard subject respSubPrefix string// the wildcard prefix including trailing . respSubLen int// the length of the wildcard prefix excluding trailing . respMux *Subscription// A single response subscription respMap map[string]chan *Msg// Request map for the response msg channels respRand *rand.Rand// Used for generating suffix// Msg filters for testing. // Protected by subsMu filters map[string]msgFilter}type natsReader struct { r io.Reader buf []byte off int n int}type natsWriter struct { w io.Writer bufs []byte limit int pending *bytes.Buffer plimit int}// Subscription represents interest in a given subject.typeSubscriptionstruct { mu sync.Mutex sid int64// Subject that represents this subscription. This can be different // than the received subject inside a Msg if this is a wildcard. Subject string// Optional queue group name. If present, all subscriptions with the // same name will form a distributed queue, and each message will // only be processed by one member of the group. Queue string// For holding information about a JetStream consumer. jsi *jsSub delivered uint64 max uint64 conn *Conn mcb MsgHandler mch chan *Msg errCh chan (error) closed bool sc bool connClosed bool draining bool status SubStatus statListeners map[chanSubStatus][]SubStatus permissionsErr error// Type of Subscription typ SubscriptionType// Async linked list pHead *Msg pTail *Msg pCond *sync.Cond pDone func(subject string)// Pending stats, async subscriptions, high-speed etc. pMsgs int pBytes int pMsgsMax int pBytesMax int pMsgsLimit int pBytesLimit int dropped int}// Status represents the state of the connection.typeSubStatusintconst (SubscriptionActive = SubStatus(iota)SubscriptionDrainingSubscriptionClosedSubscriptionSlowConsumer)func ( SubStatus) () string {switch {caseSubscriptionActive:return"Active"caseSubscriptionDraining:return"Draining"caseSubscriptionClosed:return"Closed"caseSubscriptionSlowConsumer:return"SlowConsumer" }return"unknown status"}// Msg represents a message delivered by NATS. This structure is used// by Subscribers and PublishMsg().//// # Types of Acknowledgements//// In case using JetStream, there are multiple ways to ack a Msg://// // Acknowledgement that a message has been processed.// msg.Ack()//// // Negatively acknowledges a message.// msg.Nak()//// // Terminate a message so that it is not redelivered further.// msg.Term()//// // Signal the server that the message is being worked on and reset redelivery timer.// msg.InProgress()typeMsgstruct { Subject string Reply string Header Header Data []byte Sub *Subscription// Internal next *Msg wsz int barrier *barrierInfo ackd uint32}// Compares two msgs, ignores sub but checks all other public fields.func ( *Msg) ( *Msg) bool {if == {returntrue }if == nil || == nil {returnfalse }if .Subject != .Subject || .Reply != .Reply {returnfalse }if !bytes.Equal(.Data, .Data) {returnfalse }iflen(.Header) != len(.Header) {returnfalse }for , := range .Header { , := .Header[]if ! || len() != len() {returnfalse }for , := range {if != [] {returnfalse } } }returntrue}// Size returns a message size in bytes.func ( *Msg) () int {if .wsz != 0 {return .wsz } , := .headerBytes()returnlen(.Subject) + len(.Reply) + len() + len(.Data)}func ( *Msg) () ([]byte, error) {var []byteiflen(.Header) == 0 {return , nil }varbytes.Buffer , := .WriteString(hdrLine)if != nil {returnnil, ErrBadHeaderMsg } = http.Header(.Header).Write(&)if != nil {returnnil, ErrBadHeaderMsg } _, = .WriteString(crlf)if != nil {returnnil, ErrBadHeaderMsg }return .Bytes(), nil}type barrierInfo struct { refs int64 f func()}// Tracks various stats received and sent on this connection,// including counts for messages and bytes.typeStatisticsstruct { InMsgs uint64 OutMsgs uint64 InBytes uint64 OutBytes uint64 Reconnects uint64}// Tracks individual backend servers.type srv struct { url *url.URL didConnect bool reconnects int lastErr error isImplicit bool tlsName string}// The INFO block received from the server.type serverInfo struct { ID string`json:"server_id"` Name string`json:"server_name"` Proto int`json:"proto"` Version string`json:"version"` Host string`json:"host"` Port int`json:"port"` Headers bool`json:"headers"` AuthRequired bool`json:"auth_required,omitempty"` TLSRequired bool`json:"tls_required,omitempty"` TLSAvailable bool`json:"tls_available,omitempty"` MaxPayload int64`json:"max_payload"` CID uint64`json:"client_id,omitempty"` ClientIP string`json:"client_ip,omitempty"` Nonce string`json:"nonce,omitempty"` Cluster string`json:"cluster,omitempty"` ConnectURLs []string`json:"connect_urls,omitempty"` LameDuckMode bool`json:"ldm,omitempty"`}const (// clientProtoZero is the original client protocol from 2009. // http://nats.io/documentation/internals/nats-protocol/ /* clientProtoZero */ _ = iota// clientProtoInfo signals a client can receive more then the original INFO block. // This can be used to update clients on other cluster members, etc. clientProtoInfo)type connectInfo struct { Verbose bool`json:"verbose"` Pedantic bool`json:"pedantic"` UserJWT string`json:"jwt,omitempty"` Nkey string`json:"nkey,omitempty"` Signature string`json:"sig,omitempty"` User string`json:"user,omitempty"` Pass string`json:"pass,omitempty"` Token string`json:"auth_token,omitempty"` TLS bool`json:"tls_required"` Name string`json:"name"` Lang string`json:"lang"` Version string`json:"version"` Protocol int`json:"protocol"` Echo bool`json:"echo"` Headers bool`json:"headers"` NoResponders bool`json:"no_responders"`}// MsgHandler is a callback function that processes messages delivered to// asynchronous subscribers.typeMsgHandlerfunc(msg *Msg)// Connect will attempt to connect to the NATS system.// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222// Comma separated arrays are also supported, e.g. urlA, urlB.// Options start with the defaults but can be overridden.// To connect to a NATS Server's websocket port, use the `ws` or `wss` scheme, such as// `ws://localhost:8080`. Note that websocket schemes cannot be mixed with others (nats/tls).func ( string, ...Option) (*Conn, error) { := GetDefaultOptions() .Servers = processUrlString()for , := range {if != nil {if := (&); != nil {returnnil, } } }return .Connect()}// Options that can be passed to Connect.// Name is an Option to set the client name.func ( string) Option {returnfunc( *Options) error { .Name = returnnil }}// InProcessServer is an Option that will try to establish a direction to a NATS server// running within the process instead of dialing via TCP.func ( InProcessConnProvider) Option {returnfunc( *Options) error { .InProcessServer = returnnil }}// Secure is an Option to enable TLS secure connections that skip server verification by default.// Pass a TLS Configuration for proper TLS.// A TLS Configuration using InsecureSkipVerify should NOT be used in a production setting.func ( ...*tls.Config) Option {returnfunc( *Options) error { .Secure = true// Use of variadic just simplifies testing scenarios. We only take the first one.iflen() > 1 {returnErrMultipleTLSConfigs }iflen() == 1 { .TLSConfig = [0] }returnnil }}// ClientTLSConfig is an Option to set the TLS configuration for secure// connections. It can be used to e.g. set TLS config with cert and root CAs// from memory. For simple use case of loading cert and CAs from file,// ClientCert and RootCAs options are more convenient.// If Secure is not already set this will set it as well.func ( TLSCertHandler, RootCAsHandler) Option {returnfunc( *Options) error { .Secure = trueif == nil && == nil {returnErrClientCertOrRootCAsRequired }// Smoke test the callbacks to fail early // if they are not valid.if != nil {if , := (); != nil {return } }if != nil {if , := (); != nil {return } }if .TLSConfig == nil { .TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} } .TLSCertCB = .RootCAsCB = returnnil }}// RootCAs is a helper option to provide the RootCAs pool from a list of filenames.// If Secure is not already set this will set it as well.func ( ...string) Option {returnfunc( *Options) error { := func() (*x509.CertPool, error) { := x509.NewCertPool()for , := range { , := os.ReadFile()if != nil || == nil {returnnil, fmt.Errorf("nats: error loading or parsing rootCA file: %w", ) } := .AppendCertsFromPEM()if ! {returnnil, fmt.Errorf("nats: failed to parse root certificate from %q", ) } }return , nil }if .TLSConfig == nil { .TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} }if , := (); != nil {return } .RootCAsCB = .Secure = truereturnnil }}// ClientCert is a helper option to provide the client certificate from a file.// If Secure is not already set this will set it as well.func (, string) Option {returnfunc( *Options) error { := func() (tls.Certificate, error) { , := tls.LoadX509KeyPair(, )if != nil {returntls.Certificate{}, fmt.Errorf("nats: error loading client certificate: %w", ) } .Leaf, = x509.ParseCertificate(.Certificate[0])if != nil {returntls.Certificate{}, fmt.Errorf("nats: error parsing client certificate: %w", ) }return , nil }if .TLSConfig == nil { .TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} }if , := (); != nil {return } .TLSCertCB = .Secure = truereturnnil }}// NoReconnect is an Option to turn off reconnect behavior.func () Option {returnfunc( *Options) error { .AllowReconnect = falsereturnnil }}// DontRandomize is an Option to turn off randomizing the server pool.func () Option {returnfunc( *Options) error { .NoRandomize = truereturnnil }}// NoEcho is an Option to turn off messages echoing back from a server.// Note this is supported on servers >= version 1.2. Proto 1 or greater.func () Option {returnfunc( *Options) error { .NoEcho = truereturnnil }}// ReconnectWait is an Option to set the wait time between reconnect attempts.// Defaults to 2s.func ( time.Duration) Option {returnfunc( *Options) error { .ReconnectWait = returnnil }}// MaxReconnects is an Option to set the maximum number of reconnect attempts.// If negative, it will never stop trying to reconnect.// Defaults to 60.func ( int) Option {returnfunc( *Options) error { .MaxReconnect = returnnil }}// ReconnectJitter is an Option to set the upper bound of a random delay added ReconnectWait.// Defaults to 100ms and 1s, respectively.func (, time.Duration) Option {returnfunc( *Options) error { .ReconnectJitter = .ReconnectJitterTLS = returnnil }}// CustomReconnectDelay is an Option to set the CustomReconnectDelayCB option.// See CustomReconnectDelayCB Option for more details.func ( ReconnectDelayHandler) Option {returnfunc( *Options) error { .CustomReconnectDelayCB = returnnil }}// PingInterval is an Option to set the period for client ping commands.// Defaults to 2m.func ( time.Duration) Option {returnfunc( *Options) error { .PingInterval = returnnil }}// MaxPingsOutstanding is an Option to set the maximum number of ping requests// that can go unanswered by the server before closing the connection.// Defaults to 2.func ( int) Option {returnfunc( *Options) error { .MaxPingsOut = returnnil }}// ReconnectBufSize sets the buffer size of messages kept while busy reconnecting.// Defaults to 8388608 bytes (8MB). It can be disabled by setting it to -1.func ( int) Option {returnfunc( *Options) error { .ReconnectBufSize = returnnil }}// Timeout is an Option to set the timeout for Dial on a connection.// Defaults to 2s.func ( time.Duration) Option {returnfunc( *Options) error { .Timeout = returnnil }}// FlusherTimeout is an Option to set the write (and flush) timeout on a connection.func ( time.Duration) Option {returnfunc( *Options) error { .FlusherTimeout = returnnil }}// DrainTimeout is an Option to set the timeout for draining a connection.// Defaults to 30s.func ( time.Duration) Option {returnfunc( *Options) error { .DrainTimeout = returnnil }}// DisconnectErrHandler is an Option to set the disconnected error handler.func ( ConnErrHandler) Option {returnfunc( *Options) error { .DisconnectedErrCB = returnnil }}// DisconnectHandler is an Option to set the disconnected handler.// Deprecated: Use DisconnectErrHandler.func ( ConnHandler) Option {returnfunc( *Options) error { .DisconnectedCB = returnnil }}// ConnectHandler is an Option to set the connected handler.func ( ConnHandler) Option {returnfunc( *Options) error { .ConnectedCB = returnnil }}// ReconnectHandler is an Option to set the reconnected handler.func ( ConnHandler) Option {returnfunc( *Options) error { .ReconnectedCB = returnnil }}// ReconnectErrHandler is an Option to set the reconnect error handler.func ( ConnErrHandler) Option {returnfunc( *Options) error { .ReconnectErrCB = returnnil }}// ClosedHandler is an Option to set the closed handler.func ( ConnHandler) Option {returnfunc( *Options) error { .ClosedCB = returnnil }}// DiscoveredServersHandler is an Option to set the new servers handler.func ( ConnHandler) Option {returnfunc( *Options) error { .DiscoveredServersCB = returnnil }}// ErrorHandler is an Option to set the async error handler.func ( ErrHandler) Option {returnfunc( *Options) error { .AsyncErrorCB = returnnil }}// UserInfo is an Option to set the username and password to// use when not included directly in the URLs.func (, string) Option {returnfunc( *Options) error { .User = .Password = returnnil }}func ( UserInfoCB) Option {returnfunc( *Options) error { .UserInfo = returnnil }}// Token is an Option to set the token to use// when a token is not included directly in the URLs// and when a token handler is not provided.func ( string) Option {returnfunc( *Options) error {if .TokenHandler != nil {returnErrTokenAlreadySet } .Token = returnnil }}// TokenHandler is an Option to set the token handler to use// when a token is not included directly in the URLs// and when a token is not set.func ( AuthTokenHandler) Option {returnfunc( *Options) error {if .Token != "" {returnErrTokenAlreadySet } .TokenHandler = returnnil }}// UserCredentials is a convenience function that takes a filename// for a user's JWT and a filename for the user's private Nkey seed.func ( string, ...string) Option { := func() (string, error) {returnuserFromFile() }varstringiflen() > 0 { = [0] } else { = } := func( []byte) ([]byte, error) {returnsigHandler(, ) }returnUserJWT(, )}// UserJWTAndSeed is a convenience function that takes the JWT and seed// values as strings.func ( string, string) Option { := func() (string, error) {return , nil } := func( []byte) ([]byte, error) { , := nkeys.FromSeed([]byte())if != nil {returnnil, fmt.Errorf("unable to extract key pair from seed: %w", ) }// Wipe our key on exit.defer .Wipe() , := .Sign()return , nil }returnUserJWT(, )}// UserJWT will set the callbacks to retrieve the user's JWT and// the signature callback to sign the server nonce. This an the Nkey// option are mutually exclusive.func ( UserJWTHandler, SignatureHandler) Option {returnfunc( *Options) error {if == nil {returnErrNoUserCB }if == nil {returnErrUserButNoSigCB }// Smoke test the user callback to ensure it is setup properly // when processing options.if , := (); != nil {return } .UserJWT = .SignatureCB = returnnil }}// Nkey will set the public Nkey and the signature callback to// sign the server nonce.func ( string, SignatureHandler) Option {returnfunc( *Options) error { .Nkey = .SignatureCB = if != "" && == nil {returnErrNkeyButNoSigCB }returnnil }}// SyncQueueLen will set the maximum queue len for the internal// channel used for SubscribeSync().// Defaults to 65536.func ( int) Option {returnfunc( *Options) error { .SubChanLen = returnnil }}// Dialer is an Option to set the dialer which will be used when// attempting to establish a connection.// Deprecated: Should use CustomDialer instead.func ( *net.Dialer) Option {returnfunc( *Options) error { .Dialer = returnnil }}// SetCustomDialer is an Option to set a custom dialer which will be// used when attempting to establish a connection. If both Dialer// and CustomDialer are specified, CustomDialer takes precedence.func ( CustomDialer) Option {returnfunc( *Options) error { .CustomDialer = returnnil }}// UseOldRequestStyle is an Option to force usage of the old Request style.func () Option {returnfunc( *Options) error { .UseOldRequestStyle = truereturnnil }}// NoCallbacksAfterClientClose is an Option to disable callbacks when user code// calls Close(). If close is initiated by any other condition, callbacks// if any will be invoked.func () Option {returnfunc( *Options) error { .NoCallbacksAfterClientClose = truereturnnil }}// LameDuckModeHandler sets the callback to invoke when the server notifies// the connection that it entered lame duck mode, that is, going to// gradually disconnect all its connections before shutting down. This is// often used in deployments when upgrading NATS Servers.func ( ConnHandler) Option {returnfunc( *Options) error { .LameDuckModeHandler = returnnil }}// RetryOnFailedConnect sets the connection in reconnecting state right away// if it can't connect to a server in the initial set.// See RetryOnFailedConnect option for more details.func ( bool) Option {returnfunc( *Options) error { .RetryOnFailedConnect = returnnil }}// Compression is an Option to indicate if this connection supports// compression. Currently only supported for Websocket connections.func ( bool) Option {returnfunc( *Options) error { .Compression = returnnil }}// ProxyPath is an option for websocket connections that adds a path to connections url.// This is useful when connecting to NATS behind a proxy.func ( string) Option {returnfunc( *Options) error { .ProxyPath = returnnil }}// CustomInboxPrefix configures the request + reply inbox prefixfunc ( string) Option {returnfunc( *Options) error {if == "" || strings.Contains(, ">") || strings.Contains(, "*") || strings.HasSuffix(, ".") {returnerrors.New("nats: invalid custom prefix") } .InboxPrefix = returnnil }}// IgnoreAuthErrorAbort opts out of the default connect behavior of aborting// subsequent reconnect attempts if server returns the same auth error twice.func () Option {returnfunc( *Options) error { .IgnoreAuthErrorAbort = truereturnnil }}// SkipHostLookup is an Option to skip the host lookup when connecting to a server.func () Option {returnfunc( *Options) error { .SkipHostLookup = truereturnnil }}func ( bool) Option {returnfunc( *Options) error { .PermissionErrOnSubscribe = returnnil }}// TLSHandshakeFirst is an Option to perform the TLS handshake first, that is// before receiving the INFO protocol. This requires the server to also be// configured with such option, otherwise the connection will fail.func () Option {returnfunc( *Options) error { .TLSHandshakeFirst = true .Secure = truereturnnil }}// Handler processing// SetDisconnectHandler will set the disconnect event handler.// Deprecated: Use SetDisconnectErrHandlerfunc ( *Conn) ( ConnHandler) {if == nil {return } .mu.Lock()defer .mu.Unlock() .Opts.DisconnectedCB = }// SetDisconnectErrHandler will set the disconnect event handler.func ( *Conn) ( ConnErrHandler) {if == nil {return } .mu.Lock()defer .mu.Unlock() .Opts.DisconnectedErrCB = }// DisconnectErrHandler will return the disconnect event handler.func ( *Conn) () ConnErrHandler {if == nil {returnnil } .mu.Lock()defer .mu.Unlock()return .Opts.DisconnectedErrCB}// SetReconnectHandler will set the reconnect event handler.func ( *Conn) ( ConnHandler) {if == nil {return } .mu.Lock()defer .mu.Unlock() .Opts.ReconnectedCB = }// ReconnectHandler will return the reconnect event handler.func ( *Conn) () ConnHandler {if == nil {returnnil } .mu.Lock()defer .mu.Unlock()return .Opts.ReconnectedCB}// SetDiscoveredServersHandler will set the discovered servers handler.func ( *Conn) ( ConnHandler) {if == nil {return } .mu.Lock()defer .mu.Unlock() .Opts.DiscoveredServersCB = }// DiscoveredServersHandler will return the discovered servers handler.func ( *Conn) () ConnHandler {if == nil {returnnil } .mu.Lock()defer .mu.Unlock()return .Opts.DiscoveredServersCB}// SetClosedHandler will set the closed event handler.func ( *Conn) ( ConnHandler) {if == nil {return } .mu.Lock()defer .mu.Unlock() .Opts.ClosedCB = }// ClosedHandler will return the closed event handler.func ( *Conn) () ConnHandler {if == nil {returnnil } .mu.Lock()defer .mu.Unlock()return .Opts.ClosedCB}// SetErrorHandler will set the async error handler.func ( *Conn) ( ErrHandler) {if == nil {return } .mu.Lock()defer .mu.Unlock() .Opts.AsyncErrorCB = }// ErrorHandler will return the async error handler.func ( *Conn) () ErrHandler {if == nil {returnnil } .mu.Lock()defer .mu.Unlock()return .Opts.AsyncErrorCB}// Process the url string argument to Connect.// Return an array of urls, even if only one.func processUrlString( string) []string { := strings.Split(, ",")varintfor , := range { := strings.TrimSuffix(strings.TrimSpace(), "/")iflen() > 0 { [] = ++ } }return [:]}// Connect will attempt to connect to a NATS server with multiple options.func ( Options) () (*Conn, error) { := &Conn{Opts: }// Some default options processing.if .Opts.MaxPingsOut == 0 { .Opts.MaxPingsOut = DefaultMaxPingOut }// Allow old default for channel length to work correctly.if .Opts.SubChanLen == 0 { .Opts.SubChanLen = DefaultMaxChanLen }// Default ReconnectBufSizeif .Opts.ReconnectBufSize == 0 { .Opts.ReconnectBufSize = DefaultReconnectBufSize }// Ensure that Timeout is not 0if .Opts.Timeout == 0 { .Opts.Timeout = DefaultTimeout }// Check first for user jwt callback being defined and nkey.if .Opts.UserJWT != nil && .Opts.Nkey != "" {returnnil, ErrNkeyAndUser }// Check if we have an nkey but no signature callback defined.if .Opts.Nkey != "" && .Opts.SignatureCB == nil {returnnil, ErrNkeyButNoSigCB }// Allow custom Dialer for connecting using a timeout by defaultif .Opts.Dialer == nil { .Opts.Dialer = &net.Dialer{Timeout: .Opts.Timeout, } }// If the TLSHandshakeFirst option is specified, make sure that // the Secure boolean is true.if .Opts.TLSHandshakeFirst { .Opts.Secure = true }if := .setupServerPool(); != nil {returnnil, }// Create the async callback handler. .ach = &asyncCallbacksHandler{} .ach.cond = sync.NewCond(&.ach.mu)// Set a default error handler that will print to stderr.if .Opts.AsyncErrorCB == nil { .Opts.AsyncErrorCB = defaultErrHandler }// Create reader/writer .newReaderWriter() , := .connect()if != nil {returnnil, }// Spin up the async cb dispatcher on successgo .ach.asyncCBDispatcher()if && .Opts.ConnectedCB != nil { .ach.push(func() { .Opts.ConnectedCB() }) }return , nil}func defaultErrHandler( *Conn, *Subscription, error) {varuint64if != nil { .mu.RLock() = .info.CID .mu.RUnlock() }varstringif != nil {varstring .mu.Lock()if .jsi != nil { = .jsi.psubj } else { = .Subject } .mu.Unlock() = fmt.Sprintf("%s on connection [%d] for subscription on %q\n", .Error(), , ) } else { = fmt.Sprintf("%s on connection [%d]\n", .Error(), ) }os.Stderr.WriteString()}const ( _CRLF_ = "\r\n" _EMPTY_ = "" _SPC_ = " " _PUB_P_ = "PUB " _HPUB_P_ = "HPUB ")var _CRLF_BYTES_ = []byte(_CRLF_)const ( _OK_OP_ = "+OK" _ERR_OP_ = "-ERR" _PONG_OP_ = "PONG" _INFO_OP_ = "INFO")const ( connectProto = "CONNECT %s" + _CRLF_ pingProto = "PING" + _CRLF_ pongProto = "PONG" + _CRLF_ subProto = "SUB %s %s %d" + _CRLF_ unsubProto = "UNSUB %d %s" + _CRLF_ okProto = _OK_OP_ + _CRLF_)// Return the currently selected serverfunc ( *Conn) () (int, *srv) {for , := range .srvPool {if == nil {continue }if == .current {return , } }return -1, nil}// Pop the current server and put onto the end of the list. Select head of list as long// as number of reconnect attempts under MaxReconnect.func ( *Conn) () (*srv, error) { , := .currentServer()if < 0 {returnnil, ErrNoServers } := .srvPool := len()copy([:-1], [+1:]) := .Opts.MaxReconnectif < 0 || .reconnects < { .srvPool[-1] = } else { .srvPool = [0 : -1] }iflen(.srvPool) <= 0 { .current = nilreturnnil, ErrNoServers } .current = .srvPool[0]return .srvPool[0], nil}// Will assign the correct server to nc.currentfunc ( *Conn) () error { .current = niliflen(.srvPool) <= 0 {returnErrNoServers }for , := range .srvPool {if != nil { .current = returnnil } }returnErrNoServers}const tlsScheme = "tls"// Create the server pool using the options given.// We will place a Url option first, followed by any// Server Options. We will randomize the server pool unless// the NoRandomize flag is set.func ( *Conn) () error { .srvPool = make([]*srv, 0, srvPoolSize) .urls = make(map[string]struct{}, srvPoolSize)// Create srv objects from each url string in nc.Opts.Servers // and add them to the pool.for , := range .Opts.Servers {if := .addURLToPool(, false, false); != nil {return } }// Randomize if allowed toif !.Opts.NoRandomize { .shufflePool(0) }// Normally, if this one is set, Options.Servers should not be, // but we always allowed that, so continue to do so.if .Opts.Url != _EMPTY_ {// Add to the end of the arrayif := .addURLToPool(.Opts.Url, false, false); != nil {return }// Then swap it with first to guarantee that Options.Url is tried first. := len(.srvPool) - 1if > 0 { .srvPool[0], .srvPool[] = .srvPool[], .srvPool[0] } } elseiflen(.srvPool) <= 0 {// Place default URL if pool is empty.if := .addURLToPool(DefaultURL, false, false); != nil {return } }// Check for Scheme hint to move to TLS mode.for , := range .srvPool {if .url.Scheme == tlsScheme || .url.Scheme == wsSchemeTLS {// FIXME(dlc), this is for all in the pool, should be case by case. .Opts.Secure = trueif .Opts.TLSConfig == nil { .Opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12} } } }return .pickServer()}// Helper function to return schemefunc ( *Conn) () string {if .ws {if .Opts.Secure {returnwsSchemeTLS }returnwsScheme }if .Opts.Secure {returntlsScheme }return"nats"}// Return true iff u.Hostname() is an IP address.func hostIsIP( *url.URL) bool {returnnet.ParseIP(.Hostname()) != nil}// addURLToPool adds an entry to the server poolfunc ( *Conn) ( string, , bool) error {if !strings.Contains(, "://") { = fmt.Sprintf("%s://%s", .connScheme(), ) }var ( *url.URLerror )for := 0; < 2; ++ { , = url.Parse()if != nil {return }if .Port() != "" {break }// In case given URL is of the form "localhost:", just add // the port number at the end, otherwise, add ":4222".if [len()-1] != ':' { += ":" }switch .Scheme {casewsScheme: += defaultWSPortStringcasewsSchemeTLS: += defaultWSSPortStringdefault: += defaultPortString } } := isWebsocketScheme()// We don't support mix and match of websocket and non websocket URLs. // If this is the first URL, then we accept and switch the global state // to websocket. After that, we will know how to reject mixed URLs.iflen(.srvPool) == 0 { .ws = } elseif && !.ws || ! && .ws {returnerrors.New("mixing of websocket and non websocket URLs is not allowed") }varstringif { := .current.url// Check to see if we do not have a url.User but current connected // url does. If so copy over.if .User == nil && .User != nil { .User = .User }// We are checking to see if we have a secure connection and are // adding an implicit server that just has an IP. If so we will remember // the current hostname we are connected to.if && hostIsIP() { = .Hostname() } } := &srv{url: , isImplicit: , tlsName: } .srvPool = append(.srvPool, ) .urls[.Host] = struct{}{}returnnil}// shufflePool swaps randomly elements in the server pool// The `offset` value indicates that the shuffling should start at// this offset and leave the elements from [0..offset) intact.func ( *Conn) ( int) {iflen(.srvPool) <= +1 {return } := rand.NewSource(time.Now().UnixNano()) := rand.New()for := ; < len(.srvPool); ++ { := + .Intn(+1-) .srvPool[], .srvPool[] = .srvPool[], .srvPool[] }}func ( *Conn) () { .br = &natsReader{buf: make([]byte, defaultBufSize),off: -1, } .bw = &natsWriter{limit: defaultBufSize,plimit: .Opts.ReconnectBufSize, }}func ( *Conn) () { := .bw .w, .bufs = .newWriter(), nil := .br .r, .n, .off = .conn, 0, -1}func ( *Conn) () io.Writer {vario.Writer = .connif .Opts.FlusherTimeout > 0 { = &timeoutWriter{conn: .conn, timeout: .Opts.FlusherTimeout} }return}func ( *natsWriter) ( string) error {return .appendBufs([]byte())}func ( *natsWriter) ( ...[]byte) error {for , := range {iflen() == 0 {continue }if .pending != nil { .pending.Write() } else { .bufs = append(.bufs, ...) } }if .pending == nil && len(.bufs) >= .limit {return .flush() }returnnil}func ( *natsWriter) ( ...string) error {for , := range {if , := .w.Write([]byte()); != nil {return } }returnnil}func ( *natsWriter) () error {// If a pending buffer is set, we don't flush. Code that needs to // write directly to the socket, by-passing buffers during (re)connect, // will use the writeDirect() API.if .pending != nil {returnnil }// Do not skip calling w.w.Write() here if len(w.bufs) is 0 because // the actual writer (if websocket for instance) may have things // to do such as sending control frames, etc.. , := .w.Write(.bufs) .bufs = .bufs[:0]return}func ( *natsWriter) () int {if .pending != nil {return .pending.Len() }returnlen(.bufs)}func ( *natsWriter) () { .pending = new(bytes.Buffer)}func ( *natsWriter) () error {if .pending == nil || .pending.Len() == 0 {returnnil } , := .w.Write(.pending.Bytes())// Reset the pending buffer at this point because we don't want // to take the risk of sending duplicates or partials. .pending.Reset()return}func ( *natsWriter) () bool {if .pending == nil {returnfalse }return .pending.Len() >= .plimit}func ( *natsWriter) () { .pending = nil}// Notify the reader that we are done with the connect, where "read" operations// happen synchronously and under the connection lock. After this point, "read"// will be happening from the read loop, without the connection lock.//// Note: this runs under the connection lock.func ( *natsReader) () {if , := .r.(*websocketReader); { .doneWithConnect() }}func ( *natsReader) () ([]byte, error) {if .off >= 0 { := .off .off = -1return .buf[:.n], nil }varerror .n, = .r.Read(.buf)return .buf[:.n], }func ( *natsReader) ( byte) (string, error) {varstring:// First look if we have something in the bufferif .off >= 0 { := bytes.IndexByte(.buf[.off:.n], )if >= 0 { := .off + + 1 += string(.buf[.off:]) .off = if .off >= .n { .off = -1 }return , nil }// We did not find the delim, so will have to read more. += string(.buf[.off:.n]) .off = -1 }if , := .Read(); != nil {return , } .off = 0goto}// createConn will connect to the server and wrap the appropriate// bufio structures. It will do the right thing when an existing// connection is in place.func ( *Conn) () ( error) {if .Opts.Timeout < 0 {returnErrBadTimeout }if , := .currentServer(); == nil {returnErrNoServers }// If we have a reference to an in-process server then establish a // connection using that.if .Opts.InProcessServer != nil { , := .Opts.InProcessServer.InProcessConn()if != nil {returnfmt.Errorf("failed to get in-process connection: %w", ) } .conn = .bindToNewConn()returnnil }// We will auto-expand host names if they resolve to multiple IPs := []string{} := .current.urlif !.Opts.SkipHostLookup && net.ParseIP(.Hostname()) == nil { , := net.LookupHost(.Hostname())for , := range { = append(, net.JoinHostPort(, .Port())) } }// Fall back to what we were given.iflen() == 0 { = append(, .Host) }// CustomDialer takes precedence. If not set, use Opts.Dialer which // is set to a default *net.Dialer (in Connect()) if not explicitly // set by the user. := .Opts.CustomDialerif == nil {// We will copy and shorten the timeout if we have multiple hosts to try. := *.Opts.Dialer .Timeout = .Timeout / time.Duration(len()) = & }iflen() > 1 && !.Opts.NoRandomize {rand.Shuffle(len(), func(, int) { [], [] = [], [] }) }for , := range { .conn, = .Dial("tcp", )if == nil {break } }if != nil {return }// If scheme starts with "ws" then branch out to websocket code.ifisWebsocketScheme() {return .wsInitHandshake() }// Reset reader/writer to this new TCP connection .bindToNewConn()returnnil}type skipTLSDialer interface { SkipTLSHandshake() bool}// makeTLSConn will wrap an existing Conn using TLSfunc ( *Conn) () error {if .Opts.CustomDialer != nil {// we do nothing when asked to skip the TLS wrapper , := .Opts.CustomDialer.(skipTLSDialer)if && .SkipTLSHandshake() {returnnil } }// Allow the user to configure their own tls.Config structure. := &tls.Config{}if .Opts.TLSConfig != nil { = util.CloneTLSConfig(.Opts.TLSConfig) }if .Opts.TLSCertCB != nil { , := .Opts.TLSCertCB()if != nil {return } .Certificates = []tls.Certificate{} }if .Opts.RootCAsCB != nil { , := .Opts.RootCAsCB()if != nil {return } .RootCAs = }// If its blank we will override it with the current hostif .ServerName == _EMPTY_ {if .current.tlsName != _EMPTY_ { .ServerName = .current.tlsName } else { , , := net.SplitHostPort(.current.url.Host) .ServerName = } } .conn = tls.Client(.conn, ) := .conn.(*tls.Conn)if := .Handshake(); != nil {return } .bindToNewConn()returnnil}// TLSConnectionState retrieves the state of the TLS connection to the serverfunc ( *Conn) () (tls.ConnectionState, error) {if !.isConnected() {returntls.ConnectionState{}, ErrDisconnected } .mu.RLock() := .conn .mu.RUnlock() , := .(*tls.Conn)if ! {returntls.ConnectionState{}, ErrConnectionNotTLS }return .ConnectionState(), nil}// waitForExits will wait for all socket watcher Go routines to// be shutdown before proceeding.func ( *Conn) () {// Kick old flusher forcefully.select {case .fch<-struct{}{}:default: }// Wait for any previous go routines. .wg.Wait()}// ForceReconnect forces a reconnect attempt to the server.// This is a non-blocking call and will start the reconnect// process without waiting for it to complete.//// If the connection is already in the process of reconnecting,// this call will force an immediate reconnect attempt (bypassing// the current reconnect delay).func ( *Conn) () error { .mu.Lock()defer .mu.Unlock()if .isClosed() {returnErrConnectionClosed }if .isReconnecting() {// if we're already reconnecting, force a reconnect attempt // even if we're in the middle of a backoffif .rqch != nil {close(.rqch) .rqch = nil }returnnil }// Clear any queued pongs .clearPendingFlushCalls()// Clear any queued and blocking requests. .clearPendingRequestCalls()// Stop ping timer if set. .stopPingTimer()// Go ahead and make sure we have flushed the outbound .bw.flush() .conn.Close() .changeConnStatus(RECONNECTING)go .doReconnect(nil, true)returnnil}// ConnectedUrl reports the connected server's URLfunc ( *Conn) () string {if == nil {return_EMPTY_ } .mu.RLock()defer .mu.RUnlock()if .status != CONNECTED {return_EMPTY_ }return .current.url.String()}// ConnectedUrlRedacted reports the connected server's URL with passwords redactedfunc ( *Conn) () string {if == nil {return_EMPTY_ } .mu.RLock()defer .mu.RUnlock()if .status != CONNECTED {return_EMPTY_ }return .current.url.Redacted()}// ConnectedAddr returns the connected server's IPfunc ( *Conn) () string {if == nil {return_EMPTY_ } .mu.RLock()defer .mu.RUnlock()if .status != CONNECTED {return_EMPTY_ }return .conn.RemoteAddr().String()}// ConnectedServerId reports the connected server's Idfunc ( *Conn) () string {if == nil {return_EMPTY_ } .mu.RLock()defer .mu.RUnlock()if .status != CONNECTED {return_EMPTY_ }return .info.ID}// ConnectedServerName reports the connected server's namefunc ( *Conn) () string {if == nil {return_EMPTY_ } .mu.RLock()defer .mu.RUnlock()if .status != CONNECTED {return_EMPTY_ }return .info.Name}var semVerRe = regexp.MustCompile(`\Av?([0-9]+)\.?([0-9]+)?\.?([0-9]+)?`)func versionComponents( string) (, , int, error) { := semVerRe.FindStringSubmatch()if == nil {return0, 0, 0, errors.New("invalid semver") } , = strconv.Atoi([1])if != nil {return -1, -1, -1, } , = strconv.Atoi([2])if != nil {return -1, -1, -1, } , = strconv.Atoi([3])if != nil {return -1, -1, -1, }return , , , }// Check for minimum server requirement.func ( *Conn) (, , int) bool { , , , := versionComponents(.ConnectedServerVersion())if < || ( == && < ) || ( == && == && < ) {returnfalse }returntrue}// ConnectedServerVersion reports the connected server's version as a stringfunc ( *Conn) () string {if == nil {return_EMPTY_ } .mu.RLock()defer .mu.RUnlock()if .status != CONNECTED {return_EMPTY_ }return .info.Version}// ConnectedClusterName reports the connected server's cluster name if anyfunc ( *Conn) () string {if == nil {return_EMPTY_ } .mu.RLock()defer .mu.RUnlock()if .status != CONNECTED {return_EMPTY_ }return .info.Cluster}// Low level setup for structs, etcfunc ( *Conn) () { .subs = make(map[int64]*Subscription) .pongs = make([]chanstruct{}, 0, 8) .fch = make(chanstruct{}, flushChanSize) .rqch = make(chanstruct{})// Setup scratch outbound buffer for PUB/HPUB := .scratch[:len(_HPUB_P_)]copy(, _HPUB_P_)}// Process a connected connection and initialize properly.func ( *Conn) () error {// Set our deadline for the whole connect process .conn.SetDeadline(time.Now().Add(.Opts.Timeout))defer .conn.SetDeadline(time.Time{})// Set our status to connecting. .changeConnStatus(CONNECTING)// If we need to have a TLS connection and want the TLS handshake to occur // first, do it now.if .Opts.Secure && .Opts.TLSHandshakeFirst {if := .makeTLSConn(); != nil {return } }// Process the INFO protocol received from the server := .processExpectedInfo()if != nil {return }// Send the CONNECT protocol along with the initial PING protocol. // Wait for the PONG response (or any error that we get from the server). = .sendConnect()if != nil {return }// Reset the number of PING sent out .pout = 0// Start or reset Timerif .Opts.PingInterval > 0 {if .ptmr == nil { .ptmr = time.AfterFunc(.Opts.PingInterval, .processPingTimer) } else { .ptmr.Reset(.Opts.PingInterval) } }// Start the readLoop and flusher go routines, we will wait on both on a reconnect event. .wg.Add(2)go .readLoop()go .flusher()// Notify the reader that we are done with the connect handshake, where // reads were done synchronously and under the connection lock. .br.doneWithConnect()returnnil}// Main connect function. Will connect to the nats-server.func ( *Conn) () (bool, error) {varerrorvarbool// Create actual socket connection // For first connect we walk all servers in the pool and try // to connect immediately. .mu.Lock()defer .mu.Unlock() .initc = true// The pool may change inside the loop iteration due to INFO protocol.for := 0; < len(.srvPool); ++ { .current = .srvPool[]if = .createConn(); == nil {// This was moved out of processConnectInit() because // that function is now invoked from doReconnect() too. .setup() = .processConnectInit()if == nil { .current.didConnect = true .current.reconnects = 0 .current.lastErr = nilbreak } else { .mu.Unlock() .close(DISCONNECTED, false, ) .mu.Lock()// Do not reset nc.current here since it would prevent // RetryOnFailedConnect to work should this be the last server // to try before starting doReconnect(). } } else {// Cancel out default connection refused, will trigger the // No servers error conditionalifstrings.Contains(.Error(), "connection refused") { = nil } } }if == nil && .status != CONNECTED { = ErrNoServers }if == nil { = true .initc = false } elseif .Opts.RetryOnFailedConnect { .setup() .changeConnStatus(RECONNECTING) .bw.switchToPending()go .doReconnect(ErrNoServers, false) = nil } else { .current = nil }return , }// This will check to see if the connection should be// secure. This can be dictated from either end and should// only be called after the INIT protocol has been received.func ( *Conn) () error {// Check to see if we need to engage TLS := .Opts// Check for mismatch in setupsif .Secure && !.info.TLSRequired && !.info.TLSAvailable {returnErrSecureConnWanted } elseif .info.TLSRequired && !.Secure {// Switch to Secure since server needs TLS. .Secure = true }if .Secure {// If TLS handshake first is true, we have already done // the handshake, so we are done here.if .TLSHandshakeFirst {returnnil }// Need to rewrap with bufioif := .makeTLSConn(); != nil {return } }returnnil}// processExpectedInfo will look for the expected first INFO message// sent when a connection is established. The lock should be held entering.func ( *Conn) () error { := &control{}// Read the protocol := .readOp()if != nil {return }// The nats protocol should send INFO first always.if .op != _INFO_OP_ {returnErrNoInfoReceived }// Parse the protocolif := .processInfo(.args); != nil {return }if .Opts.Nkey != "" && .info.Nonce == "" {returnErrNkeysNotSupported }// For websocket connections, we already switched to TLS if need be, // so we are done here.if .ws {returnnil }return .checkForSecure()}// Sends a protocol control message by queuing into the bufio writer// and kicking the flush Go routine. These writes are protected.func ( *Conn) ( string) { .mu.Lock() .bw.appendString() .kickFlusher() .mu.Unlock()}// Generate a connect protocol message, issuing user/password if// applicable. The lock is assumed to be held upon entering.func ( *Conn) () (string, error) { := .Optsvar , , , , , string := .current.url.Userif != nil {// if no password, assume username is authTokenif , := .Password(); ! { = .Username() } else { = .Username() , _ = .Password() } } else {// Take from options (possibly all empty strings) = .User = .Password = .Token = .Nkeyif .Opts.UserInfo != nil {if != _EMPTY_ || != _EMPTY_ {return_EMPTY_, ErrUserInfoAlreadySet } , = .Opts.UserInfo() } }// Look for user jwt.if .UserJWT != nil {if , := .UserJWT(); != nil {return_EMPTY_, } else { = }if != _EMPTY_ {return_EMPTY_, ErrNkeyAndUser } }if != _EMPTY_ || != _EMPTY_ {if .SignatureCB == nil {if == _EMPTY_ {return_EMPTY_, ErrNkeyButNoSigCB }return_EMPTY_, ErrUserButNoSigCB } , := .SignatureCB([]byte(.info.Nonce))if != nil {return_EMPTY_, fmt.Errorf("error signing nonce: %w", ) } = base64.RawURLEncoding.EncodeToString() }if .Opts.TokenHandler != nil {if != _EMPTY_ {return_EMPTY_, ErrTokenAlreadySet } = .Opts.TokenHandler() }// If our server does not support headers then we can't do them or no responders. := .info.Headers := connectInfo{ .Verbose, .Pedantic, , , , , , , .Secure, .Name, LangString, Version, clientProtoInfo, !.NoEcho, , , } , := json.Marshal()if != nil {return_EMPTY_, ErrJsonParse }// Check if NoEcho is set and we have a server that supports it.if .NoEcho && .info.Proto < 1 {return_EMPTY_, ErrNoEchoNotSupported }returnfmt.Sprintf(connectProto, ), nil}// normalizeErr removes the prefix -ERR, trim spaces and remove the quotes.func normalizeErr( string) string { := strings.TrimSpace(strings.TrimPrefix(, _ERR_OP_)) = strings.TrimLeft(strings.TrimRight(, "'"), "'")return}// natsProtoErr represents an -ERR protocol message sent by the server.type natsProtoErr struct { description string}func ( *natsProtoErr) () string {returnfmt.Sprintf("nats: %s", .description)}func ( *natsProtoErr) ( error) bool {returnstrings.ToLower(.Error()) == .Error()}// Send a connect protocol message to the server, issue user/password if// applicable. Will wait for a flush to return from the server for error// processing.func ( *Conn) () error {// Construct the CONNECT protocol string , := .connectProto()if != nil {if !.initc && .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) }return }// Write the protocol and PING directly to the underlying writer.if := .bw.writeDirect(, pingProto); != nil {return }// We don't want to read more than we need here, otherwise // we would need to transfer the excess read data to the readLoop. // Since in normal situations we just are looking for a PONG\r\n, // reading byte-by-byte here is ok. , := .readProto()if != nil {if !.initc && .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) }return }// If opts.Verbose is set, handle +OKif .Opts.Verbose && == okProto {// Read the rest now... , = .readProto()if != nil {if !.initc && .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) }return } }// We expect a PONGif != pongProto {// But it could be something else, like -ERR// Since we no longer use ReadLine(), trim the trailing "\r\n" = strings.TrimRight(, "\r\n")// If it's a server error...ifstrings.HasPrefix(, _ERR_OP_) {// Remove -ERR, trim spaces and quotes, and convert to lower case. = normalizeErr()// Check if this is an auth errorif := checkAuthError(strings.ToLower()); != nil {// This will schedule an async error if we are in reconnect, // and keep track of the auth error for the current server. // If we have got the same error twice, this sets nc.ar to true to // indicate that the reconnect should be aborted (will be checked // in doReconnect()). .processAuthError() }return &natsProtoErr{} }// Notify that we got an unexpected protocol.returnfmt.Errorf("nats: expected '%s', got '%s'", _PONG_OP_, ) }// This is where we are truly connected. .changeConnStatus(CONNECTED)returnnil}// reads a protocol line.func ( *Conn) () (string, error) {return .br.ReadString('\n')}// A control protocol line.type control struct { op, args string}// Read a control line and process the intended op.func ( *Conn) ( *control) error { , := .readProto()if != nil {return }parseControl(, )returnnil}// Parse a control line from the server.func parseControl( string, *control) { := strings.SplitN(, _SPC_, 2)iflen() == 1 { .op = strings.TrimSpace([0]) .args = _EMPTY_ } elseiflen() == 2 { .op, .args = strings.TrimSpace([0]), strings.TrimSpace([1]) } else { .op = _EMPTY_ }}// flushReconnectPendingItems will push the pending items that were// gathered while we were in a RECONNECTING state to the socket.func ( *Conn) () error {return .bw.flushPendingBuffer()}// Stops the ping timer if set.// Connection lock is held on entry.func ( *Conn) () {if .ptmr != nil { .ptmr.Stop() }}// Try to reconnect using the option parameters.// This function assumes we are allowed to reconnect.func ( *Conn) ( error, bool) {// We want to make sure we have the other watchers shutdown properly // here before we proceed past this point. .waitForExits()// FIXME(dlc) - We have an issue here if we have // outstanding flush points (pongs) and they were not // sent out, but are still in the pipe.// Hold the lock manually and release where needed below, // can't do defer here. .mu.Lock()// Clear any errors. .err = nil// Perform appropriate callback if needed for a disconnect. // DisconnectedErrCB has priority over deprecated DisconnectedCBif !.initc {if .Opts.DisconnectedErrCB != nil { .ach.push(func() { .Opts.DisconnectedErrCB(, ) }) } elseif .Opts.DisconnectedCB != nil { .ach.push(func() { .Opts.DisconnectedCB() }) } }// This is used to wait on go routines exit if we start them in the loop // but an error occurs after that. := falsevar *time.Timer// Channel used to kick routine out of sleep when conn is closed. := .rqch// if rqch is nil, we need to set it up to signal // the reconnect loop to reconnect immediately // this means that `ForceReconnect` was called // before entering doReconnectif == nil { = make(chanstruct{})close() }// Counter that is increased when the whole list of servers has been tried.varintvartime.Durationvartime.Duration// If a custom reconnect delay handler is set, this takes precedence. := .Opts.CustomReconnectDelayCBif == nil { = .Opts.ReconnectWait// TODO: since we sleep only after the whole list has been tried, we can't // rely on individual *srv to know if it is a TLS or non-TLS url. // We have to pick which type of jitter to use, for now, we use these hints: = .Opts.ReconnectJitterif .Opts.Secure || .Opts.TLSConfig != nil { = .Opts.ReconnectJitterTLS } }for := 0; len(.srvPool) > 0; { , := .selectNextServer()if != nil { .err = break } := +1 >= len(.srvPool) && ! = false .mu.Unlock()if ! { ++// Release the lock to give a chance to a concurrent nc.Close() to break the loop.runtime.Gosched() } else { = 0vartime.Durationif != nil { ++ = () } else { = if > 0 { += time.Duration(rand.Int63n(int64())) } }if == nil { = time.NewTimer() } else { .Reset() }select {case<-: .Stop()// we need to reset the rqch channel to avoid // closing a closed channel in the next iteration .mu.Lock() .rqch = make(chanstruct{}) .mu.Unlock()case<-.C: } }// If the readLoop, etc.. go routines were started, wait for them to complete.if { .waitForExits() = false } .mu.Lock()// Check if we have been closed first.if .isClosed() {break }// Mark that we tried a reconnect .reconnects++// Try to create a new connection = .createConn()// Not yet connected, retry... // Continue to hold the lockif != nil {// Perform appropriate callback for a failed connection attempt.if .Opts.ReconnectErrCB != nil { .ach.push(func() { .Opts.ReconnectErrCB(, ) }) } .err = nilcontinue }// We are reconnected .Reconnects++// Process connect logicif .err = .processConnectInit(); .err != nil {// Check if we should abort reconnect. If so, break out // of the loop and connection will be closed.if .ar {break } .changeConnStatus(RECONNECTING)continue }// Clear possible lastErr under the connection lock after // a successful processConnectInit(). .current.lastErr = nil// Clear out server stats for the server we connected to.. .didConnect = true .reconnects = 0// Send existing subscription state .resendSubscriptions()// Now send off and clear pending buffer .err = .flushReconnectPendingItems()if .err != nil { .changeConnStatus(RECONNECTING)// Stop the ping timer (if set) .stopPingTimer()// Since processConnectInit() returned without error, the // go routines were started, so wait for them to return // on the next iteration (after releasing the lock). = truecontinue }// Done with the pending buffer .bw.doneWithPending()// Queue up the correct callback. If we are in initial connect state // (using retry on failed connect), we will call the ConnectedCB, // otherwise the ReconnectedCB.if .Opts.ReconnectedCB != nil && !.initc { .ach.push(func() { .Opts.ReconnectedCB() }) } elseif .Opts.ConnectedCB != nil && .initc { .ach.push(func() { .Opts.ConnectedCB() }) }// If we are here with a retry on failed connect, indicate that the // initial connect is now complete. .initc = false// Release lock here, we will return below. .mu.Unlock()// Make sure to flush everything .Flush()return }// Call into close.. We have no servers left..if .err == nil { .err = ErrNoServers } .mu.Unlock() .close(CLOSED, true, nil)}// processOpErr handles errors from reading or parsing the protocol.// The lock should not be held entering this function.func ( *Conn) ( error) bool { .mu.Lock()defer .mu.Unlock()if .isConnecting() || .isClosed() || .isReconnecting() {returnfalse }if .Opts.AllowReconnect && .status == CONNECTED {// Set our new status .changeConnStatus(RECONNECTING)// Stop ping timer if set .stopPingTimer()if .conn != nil { .conn.Close() .conn = nil }// Create pending buffer before reconnecting. .bw.switchToPending()// Clear any queued pongs, e.g. pending flush calls. .clearPendingFlushCalls()go .doReconnect(, false)returnfalse } .changeConnStatus(DISCONNECTED) .err = returntrue}// dispatch is responsible for calling any async callbacksfunc ( *asyncCallbacksHandler) () {for { .mu.Lock()// Protect for spurious wakeups. We should get out of the // wait only if there is an element to pop from the list.for .head == nil { .cond.Wait() } := .head .head = .nextif == .tail { .tail = nil } .mu.Unlock()// This signals that the dispatcher has been closed and all // previous callbacks have been dispatched.if .f == nil {return }// Invoke callback outside of handler's lock .f() }}// Add the given function to the tail of the list and// signals the dispatcher.func ( *asyncCallbacksHandler) ( func()) { .pushOrClose(, false)}// Signals that we are closing...func ( *asyncCallbacksHandler) () { .pushOrClose(nil, true)}// Add the given function to the tail of the list and// signals the dispatcher.func ( *asyncCallbacksHandler) ( func(), bool) { .mu.Lock()defer .mu.Unlock()// Make sure that library is not calling push with nil function, // since this is used to notify the dispatcher that it should stop.if ! && == nil {panic("pushing a nil callback") } := &asyncCB{f: }if .tail != nil { .tail.next = } else { .head = } .tail = if { .cond.Broadcast() } else { .cond.Signal() }}// readLoop() will sit on the socket reading and processing the// protocol from the server. It will dispatch appropriately based// on the op type.func ( *Conn) () {// Release the wait group on exitdefer .wg.Done()// Create a parseState if needed. .mu.Lock()if .ps == nil { .ps = &parseState{} } := .conn := .br .mu.Unlock()if == nil {return }for { , := .Read()if == nil {// With websocket, it is possible that there is no error but // also no buffer returned (either WS control message or read of a // partial compressed message). We could call parse(buf) which // would ignore an empty buffer, but simply go back to top of the loop.iflen() == 0 {continue } = .parse() }if != nil {if := .processOpErr(); { .close(CLOSED, true, nil) }break } }// Clear the parseState here.. .mu.Lock() .ps = nil .mu.Unlock()}// waitForMsgs waits on the conditional shared with readLoop and processMsg.// It is used to deliver messages to asynchronous subscribers.func ( *Conn) ( *Subscription) {varboolvar , uint64// Used to account for adjustments to sub.pBytes when we wrap back around. := -1for { .mu.Lock()// Do accounting for last msg delivered here so we only lock once // and drain state trips after callback has returned.if >= 0 { .pMsgs-- .pBytes -= = -1 }if .pHead == nil && !.closed { .pCond.Wait() }// Pop the msg off the list := .pHeadif != nil { .pHead = .nextif .pHead == nil { .pTail = nil }if .barrier != nil { .mu.Unlock()ifatomic.AddInt64(&.barrier.refs, -1) == 0 { .barrier.f() }continue } = len(.Data) } := .mcb = .max = .closedvarstringif !.closed { .delivered++ = .deliveredif .jsi != nil { = .checkForFlowControlResponse() } } .mu.Unlock()// Respond to flow control if applicableif != _EMPTY_ { .Publish(, nil) }if {break }// Deliver the message.if != nil && ( == 0 || <= ) { () }// If we have hit the max for delivered msgs, remove sub.if > 0 && >= { .mu.Lock() .removeSub() .mu.Unlock()break } }// Check for barrier messages .mu.Lock()for := .pHead; != nil; = .pHead {if .barrier != nil { .mu.Unlock()ifatomic.AddInt64(&.barrier.refs, -1) == 0 { .barrier.f() } .mu.Lock() } .pHead = .next }// Now check for pDone := .pDone .mu.Unlock()if != nil { (.Subject) }}// Used for debugging and simulating loss for certain tests.// Return what is to be used. If we return nil the message will be dropped.type msgFilter func(m *Msg) *Msg// processMsg is called by parse and will place the msg on the// appropriate channel/pending queue for processing. If the channel is full,// or the pending queue is over the pending limits, the connection is// considered a slow consumer.func ( *Conn) ( []byte) {// Statsatomic.AddUint64(&.InMsgs, 1)atomic.AddUint64(&.InBytes, uint64(len()))// Don't lock the connection to avoid server cutting us off if the // flusher is holding the connection lock, trying to send to the server // that is itself trying to send data to us. .subsMu.RLock() := .subs[.ps.ma.sid]varmsgFilterif .filters != nil { = .filters[string(.ps.ma.subject)] } .subsMu.RUnlock()if == nil {return }// Copy them into string := string(.ps.ma.subject) := string(.ps.ma.reply)// Doing message create outside of the sub's lock to reduce contention. // It's possible that we end-up not using the message, but that's ok.// FIXME(dlc): Need to copy, should/can do COW? := if !.ps.msgCopied { = make([]byte, len())copy(, ) }// Check if we have headers encoded here.varHeadervarerrorvarboolvarintvarstringif .ps.ma.hdr > 0 { := [:.ps.ma.hdr] = [.ps.ma.hdr:] , = DecodeHeadersMsg()if != nil {// We will pass the message through but send async error. .mu.Lock() .err = ErrBadHeaderMsgif .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, , ErrBadHeaderMsg) }) } .mu.Unlock() } }// FIXME(dlc): Should we recycle these containers? := &Msg{Subject: ,Reply: ,Header: ,Data: ,Sub: ,wsz: len() + len() + len(), }// Check for message filters.if != nil {if = (); == nil {// Drop message.return } } .mu.Lock()// Check if closed.if .closed { .mu.Unlock()return }// Skip flow control messages in case of using a JetStream context. := .jsiif != nil {// There has to be a header for it to be a control message.if != nil { , = isJSControlMessage()if && == jsCtrlHB {// Check if the heartbeat has a "Consumer Stalled" header, if // so, the value is the FC reply to send a nil message to. // We will send it at the end of this function. = .Header.Get(consumerStalledHdr) } }// Check for ordered consumer here. If checkOrderedMsgs returns true that means it detected a gap.if ! && .ordered && .checkOrderedMsgs() { .mu.Unlock()return } }// Skip processing if this is a control message and // if not a pull consumer heartbeat. For pull consumers, // heartbeats have to be handled on per request basis.if ! || ( != nil && .pull) {varbool// Subscription internal stats (applicable only for non ChanSubscription's)if .typ != ChanSubscription { .pMsgs++if .pMsgs > .pMsgsMax { .pMsgsMax = .pMsgs } .pBytes += len(.Data)if .pBytes > .pBytesMax { .pBytesMax = .pBytes }// Check for a Slow Consumerif (.pMsgsLimit > 0 && .pMsgs > .pMsgsLimit) || (.pBytesLimit > 0 && .pBytes > .pBytesLimit) {goto } } elseif != nil { = true }// We have two modes of delivery. One is the channel, used by channel // subscribers and syncSubscribers, the other is a linked list for async.if .mch != nil {select {case .mch<- :default:goto } } else {// Push onto the async pListif .pHead == nil { .pHead = .pTail = if .pCond != nil { .pCond.Signal() } } else { .pTail.next = .pTail = } }if != nil {// Store the ACK metadata from the message to // compare later on with the received heartbeat. .trackSequences(.Reply)if {// For ChanSubscription, since we can't call this when a message // is "delivered" (since user is pull from their own channel), // we have a go routine that does this check, however, we do it // also here to make it much more responsive. The go routine is // really to avoid stalling when there is no new messages coming. = .checkForFlowControlResponse() } } } elseif == jsCtrlFC && .Reply != _EMPTY_ {// This is a flow control message. // We will schedule the send of the FC reply once we have delivered the // DATA message that was received before this flow control message, which // has sequence `jsi.fciseq`. However, it is possible that this message // has already been delivered, in that case, we need to send the FC reply now.if .getJSDelivered() >= .fciseq { = .Reply } else {// Schedule a reply after the previous message is delivered. .scheduleFlowControlResponse(.Reply) } }// Clear any SlowConsumer status.if .sc { .changeSubStatus(SubscriptionActive) } .sc = false .mu.Unlock()if != _EMPTY_ { .Publish(, nil) }// Handle control heartbeat messages.if && == jsCtrlHB && .Reply == _EMPTY_ { .checkForSequenceMismatch(, , ) }return: .dropped++ := !.sc .sc = true// Undo stats from aboveif .typ != ChanSubscription { .pMsgs-- .pBytes -= len(.Data) }if { .changeSubStatus(SubscriptionSlowConsumer) .mu.Unlock()// Now we need connection's lock and we may end-up in the situation // that we were trying to avoid, except that in this case, the client // is already experiencing client-side slow consumer situation. .mu.Lock() .err = ErrSlowConsumerif .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, , ErrSlowConsumer) }) } .mu.Unlock() } else { .mu.Unlock() }}var ( permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`) permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`))// processTransientError is called when the server signals a non terminal error// which does not close the connection or trigger a reconnect.// This will trigger the async error callback if set.// These errors include the following:// - permissions violation on publish or subscribe// - maximum subscriptions exceededfunc ( *Conn) ( error) { .mu.Lock() .err = iferrors.Is(, ErrPermissionViolation) { := permissionsRe.FindStringSubmatch(.Error())iflen() >= 2 { := permissionsQueueRe.FindStringSubmatch(.Error())varstringiflen() >= 2 { = [1] } := [1]for , := range .subs {if .Subject == && .Queue == && .permissionsErr == nil { .mu.Lock()if .errCh != nil { .errCh <- } .permissionsErr = .mu.Unlock() } } } }if .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) } .mu.Unlock()}// processAuthError generally processing for auth errors. We want to do retries// unless we get the same error again. This allows us for instance to swap credentials// and have the app reconnect, but if nothing is changing we should bail.// This function will return true if the connection should be closed, false otherwise.// Connection lock is held on entryfunc ( *Conn) ( error) bool { .err = if !.initc && .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) }// We should give up if we tried twice on this server and got the // same error. This behavior can be modified using IgnoreAuthErrorAbort.if .current.lastErr == && !.Opts.IgnoreAuthErrorAbort { .ar = true } else { .current.lastErr = }return .ar}// flusher is a separate Go routine that will process flush requests for the write// bufio. This allows coalescing of writes to the underlying socket.func ( *Conn) () {// Release the wait groupdefer .wg.Done()// snapshot the bw and conn since they can change from underneath of us. .mu.Lock() := .bw := .conn := .fch .mu.Unlock()if == nil || == nil {return }for {if , := <-; ! {return } .mu.Lock()// Check to see if we should bail out.if !.isConnected() || .isConnecting() || != .conn { .mu.Unlock()return }if .buffered() > 0 {if := .flush(); != nil {if .err == nil { .err = }if .Opts.AsyncErrorCB != nil { .ach.push(func() { .Opts.AsyncErrorCB(, nil, ) }) } } } .mu.Unlock() }}// processPing will send an immediate pong protocol response to the// server. The server uses this mechanism to detect dead clients.func ( *Conn) () { .sendProto(pongProto)}// processPong is used to process responses to the client's ping// messages. We use pings for the flush mechanism as well.func ( *Conn) () {varchanstruct{} .mu.Lock()iflen(.pongs) > 0 { = .pongs[0] .pongs = append(.pongs[:0], .pongs[1:]...) } .pout = 0 .mu.Unlock()if != nil { <- struct{}{} }}// processOK is a placeholder for processing OK messages.func ( *Conn) () {// do nothing}// processInfo is used to parse the info messages sent// from the server.// This function may update the server pool.func ( *Conn) ( string) error {if == _EMPTY_ {returnnil }varserverInfoif := json.Unmarshal([]byte(), &); != nil {return }// Copy content into connection's info structure. .info = // The array could be empty/not present on initial connect, // if advertise is disabled on that server, or servers that // did not include themselves in the async INFO protocol. // If empty, do not remove the implicit servers from the pool.iflen(.info.ConnectURLs) == 0 {if !.initc && .LameDuckMode && .Opts.LameDuckModeHandler != nil { .ach.push(func() { .Opts.LameDuckModeHandler() }) }returnnil }// Note about pool randomization: when the pool was first created, // it was randomized (if allowed). We keep the order the same (removing // implicit servers that are no longer sent to us). New URLs are sent // to us in no specific order so don't need extra randomization. := false// This is what we got from the server we are connected to. := .info.ConnectURLs// Transform that to a map for easy lookups := make(map[string]struct{}, len())for , := range { [] = struct{}{} }// Walk the pool and removed the implicit servers that are no longer in the // given array/map := .srvPoolfor := 0; < len(); ++ { := [] := .url.Host// Check if this URL is in the INFO protocol , := []// Remove from the temp map so that at the end we are left with only // new (or restarted) servers that need to be added to the pool.delete(, )// Keep servers that were set through Options, but also the one that // we are currently connected to (even if it is a discovered server).if !.isImplicit || .url == .current.url {continue }if ! {// Remove from server pool. Keep current order.copy([:], [+1:]) .srvPool = [:len()-1] = .srvPool -- } }// Figure out if we should save off the current non-IP hostname if we encounter a bare IP. := .current != nil && !hostIsIP(.current.url)// If there are any left in the tmp map, these are new (or restarted) servers // and need to be added to the pool.for := range {// Before adding, check if this is a new (as in never seen) URL. // This is used to figure out if we invoke the DiscoveredServersCBif , := .urls[]; ! { = true } .addURLToPool(fmt.Sprintf("%s://%s", .connScheme(), ), true, ) }if {// Randomize the pool if allowed but leave the first URL in place.if !.Opts.NoRandomize { .shufflePool(1) }if !.initc && .Opts.DiscoveredServersCB != nil { .ach.push(func() { .Opts.DiscoveredServersCB() }) } }if !.initc && .LameDuckMode && .Opts.LameDuckModeHandler != nil { .ach.push(func() { .Opts.LameDuckModeHandler() }) }returnnil}// processAsyncInfo does the same than processInfo, but is called// from the parser. Calls processInfo under connection's lock// protection.func ( *Conn) ( []byte) { .mu.Lock()// Ignore errors, we will simply not update the server pool... .processInfo(string()) .mu.Unlock()}// LastError reports the last error encountered via the connection.// It can be used reliably within ClosedCB in order to find out reason// why connection was closed for example.func ( *Conn) () error {if == nil {returnErrInvalidConnection } .mu.RLock() := .err .mu.RUnlock()return}// Check if the given error string is an auth error, and if so returns// the corresponding ErrXXX error, nil otherwisefunc checkAuthError( string) error {ifstrings.HasPrefix(, AUTHORIZATION_ERR) {returnErrAuthorization }ifstrings.HasPrefix(, AUTHENTICATION_EXPIRED_ERR) {returnErrAuthExpired }ifstrings.HasPrefix(, AUTHENTICATION_REVOKED_ERR) {returnErrAuthRevoked }ifstrings.HasPrefix(, ACCOUNT_AUTHENTICATION_EXPIRED_ERR) {returnErrAccountAuthExpired }returnnil}// processErr processes any error messages from the server and// sets the connection's LastError.func ( *Conn) ( string) {// Trim, remove quotes := normalizeErr()// convert to lower case. := strings.ToLower()varbool// FIXME(dlc) - process Slow Consumer signals special.if == STALE_CONNECTION { = .processOpErr(ErrStaleConnection) } elseif == MAX_CONNECTIONS_ERR { = .processOpErr(ErrMaxConnectionsExceeded) } elseifstrings.HasPrefix(, PERMISSIONS_ERR) { .processTransientError(fmt.Errorf("%w: %s", ErrPermissionViolation, )) } elseifstrings.HasPrefix(, MAX_SUBSCRIPTIONS_ERR) { .processTransientError(ErrMaxSubscriptionsExceeded) } elseif := checkAuthError(); != nil { .mu.Lock() = .processAuthError() .mu.Unlock() } else { = true .mu.Lock() .err = errors.New("nats: " + ) .mu.Unlock() }if { .close(CLOSED, true, nil) }}// kickFlusher will send a bool on a channel to kick the// flush Go routine to flush data to the server.func ( *Conn) () {if .bw != nil {select {case .fch<-struct{}{}:default: } }}// Publish publishes the data argument to the given subject. The data// argument is left untouched and needs to be correctly interpreted on// the receiver.func ( *Conn) ( string, []byte) error {return .publish(, _EMPTY_, nil, )}// Header represents the optional Header for a NATS message,// based on the implementation of http.Header.typeHeadermap[string][]string// Add adds the key, value pair to the header. It is case-sensitive// and appends to any existing values associated with key.func ( Header) (, string) { [] = append([], )}// Set sets the header entries associated with key to the single// element value. It is case-sensitive and replaces any existing// values associated with key.func ( Header) (, string) { [] = []string{}}// Get gets the first value associated with the given key.// It is case-sensitive.func ( Header) ( string) string {if == nil {return_EMPTY_ }if := []; != nil {return [0] }return_EMPTY_}// Values returns all values associated with the given key.// It is case-sensitive.func ( Header) ( string) []string {return []}// Del deletes the values associated with a key.// It is case-sensitive.func ( Header) ( string) {delete(, )}// NewMsg creates a message for publishing that will use headers.func ( string) *Msg {return &Msg{Subject: ,Header: make(Header), }}const ( hdrLine = "NATS/1.0\r\n" crlf = "\r\n" hdrPreEnd = len(hdrLine) - len(crlf) statusHdr = "Status" descrHdr = "Description" lastConsumerSeqHdr = "Nats-Last-Consumer" lastStreamSeqHdr = "Nats-Last-Stream" consumerStalledHdr = "Nats-Consumer-Stalled" noResponders = "503" noMessagesSts = "404" reqTimeoutSts = "408" jetStream409Sts = "409" controlMsg = "100" statusLen = 3// e.g. 20x, 40x, 50x)// DecodeHeadersMsg will decode and headers.func ( []byte) (Header, error) { := bufio.NewReaderSize(bytes.NewReader(), 128) := textproto.NewReader() , := .ReadLine()if != nil || len() < hdrPreEnd || [:hdrPreEnd] != hdrLine[:hdrPreEnd] {returnnil, ErrBadHeaderMsg } , := readMIMEHeader()if != nil {returnnil, }// Check if we have an inlined status.iflen() > hdrPreEnd {varstring := strings.TrimSpace([hdrPreEnd:])iflen() != statusLen { = strings.TrimSpace([statusLen:]) = [:statusLen] } .Add(statusHdr, )iflen() > 0 { .Add(descrHdr, ) } }returnHeader(), nil}// readMIMEHeader returns a MIMEHeader that preserves the// original case of the MIME header, based on the implementation// of textproto.ReadMIMEHeader.//// https://golang.org/pkg/net/textproto/#Reader.ReadMIMEHeaderfunc readMIMEHeader( *textproto.Reader) (textproto.MIMEHeader, error) { := make(textproto.MIMEHeader)for { , := .ReadLine()iflen() == 0 {return , }// Process key fetching original case. := strings.IndexByte(, ':')if < 0 {returnnil, ErrBadHeaderMsg } := [:]if == "" {// Skip empty keys.continue } ++for < len() && ([] == ' ' || [] == '\t') { ++ } [] = append([], [:])if != nil {return , } }}// PublishMsg publishes the Msg structure, which includes the// Subject, an optional Reply and an optional Data field.func ( *Conn) ( *Msg) error {if == nil {returnErrInvalidMsg } , := .headerBytes()if != nil {return }return .publish(.Subject, .Reply, , .Data)}// PublishRequest will perform a Publish() expecting a response on the// reply subject. Use Request() for automatically waiting for a response// inline.func ( *Conn) (, string, []byte) error {return .publish(, , nil, )}// Used for handrolled Itoaconst digits = "0123456789"// publish is the internal function to publish messages to a nats-server.// Sends a protocol data message by queuing into the bufio writer// and kicking the flush go routine. These writes should be protected.func ( *Conn) (, string, , []byte) error {if == nil {returnErrInvalidConnection }if == "" {returnErrBadSubject } .mu.Lock()// Check if headers attempted to be sent to server that does not support them.iflen() > 0 && !.info.Headers { .mu.Unlock()returnErrHeadersNotSupported }if .isClosed() { .mu.Unlock()returnErrConnectionClosed }if .isDrainingPubs() { .mu.Unlock()returnErrConnectionDraining }// Proactively reject payloads over the threshold set by server. := int64(len() + len())// Skip this check if we are not yet connected (RetryOnFailedConnect)if !.initc && > .info.MaxPayload { .mu.Unlock()returnErrMaxPayload }// Check if we are reconnecting, and if so check if // we have exceeded our reconnect outbound buffer limits.if .bw.atLimitIfUsingPending() { .mu.Unlock()returnErrReconnectBufExceeded }var []byteif != nil { = .scratch[:len(_HPUB_P_)] } else { = .scratch[1:len(_HPUB_P_)] } = append(, ...) = append(, ' ')if != "" { = append(, ...) = append(, ' ') }// We could be smarter here, but simple loop is ok, // just avoid strconv in fast path. // FIXME(dlc) - Find a better way here. // msgh = strconv.AppendInt(msgh, int64(len(data)), 10) // go 1.14 some values strconv faster, may be able to switch over.var [12]byte := len()if != nil {iflen() > 0 {for := len(); > 0; /= 10 { -- [] = digits[%10] } } else { -- [] = digits[0] } = append(, [:]...) = append(, ' ')// reset for below. = len() }if > 0 {for := ; > 0; /= 10 { -- [] = digits[%10] } } else { -- [] = digits[0] } = append(, [:]...) = append(, _CRLF_...)if := .bw.appendBufs(, , , _CRLF_BYTES_); != nil { .mu.Unlock()return } .OutMsgs++ .OutBytes += uint64(len() + len())iflen(.fch) == 0 { .kickFlusher() } .mu.Unlock()returnnil}// respHandler is the global response handler. It will look up// the appropriate channel based on the last token and place// the message on the channel if possible.func ( *Conn) ( *Msg) { .mu.Lock()// Just return if closed.if .isClosed() { .mu.Unlock()return }varchan *Msg// Grab mch := .respToken(.Subject)if != _EMPTY_ { = .respMap[]// Delete the key regardless, one response only.delete(.respMap, ) } elseiflen(.respMap) == 1 {// If the server has rewritten the subject, the response token (rt) // will not match (could be the case with JetStream). If that is the // case and there is a single entry, use that.for , := range .respMap { = delete(.respMap, )break } } .mu.Unlock()// Don't block, let Request timeout instead, mch is // buffered and we should delete the key before a // second response is processed.select {case<- :default:return }}// Helper to setup and send new request style requests. Return the chan to receive the response.func ( *Conn) ( string, , []byte) (chan *Msg, string, error) { .mu.Lock()// Create new literal Inbox and map to a chan msg. := make(chan *Msg, RequestChanLen) := .newRespInbox() := [.respSubLen:] .respMap[] = if .respMux == nil {// Create the response subscription we will use for all new style responses. // This will be on an _INBOX with an additional terminal token. The subscription // will be on a wildcard. , := .subscribeLocked(.respSub, _EMPTY_, .respHandler, nil, nil, false, nil)if != nil { .mu.Unlock()returnnil, , } .respMux = } .mu.Unlock()if := .publish(, , , ); != nil {returnnil, , }return , , nil}// RequestMsg will send a request payload including optional headers and deliver// the response message, or an error, including a timeout if no message was received properly.func ( *Conn) ( *Msg, time.Duration) (*Msg, error) {if == nil {returnnil, ErrInvalidMsg } , := .headerBytes()if != nil {returnnil, }return .request(.Subject, , .Data, )}// Request will send a request payload and deliver the response message,// or an error, including a timeout if no message was received properly.func ( *Conn) ( string, []byte, time.Duration) (*Msg, error) {return .request(, nil, , )}func ( *Conn) () bool { .mu.RLock() := .Opts.UseOldRequestStyle .mu.RUnlock()return}func ( *Conn) ( string, , []byte, time.Duration) (*Msg, error) {if == nil {returnnil, ErrInvalidConnection }var *Msgvarerrorif .useOldRequestStyle() { , = .oldRequest(, , , ) } else { , = .newRequest(, , , ) }// Check for no responder status.if == nil && len(.Data) == 0 && .Header.Get(statusHdr) == noResponders { , = nil, ErrNoResponders }return , }func ( *Conn) ( string, , []byte, time.Duration) (*Msg, error) { , , := .createNewRequestAndSend(, , )if != nil {returnnil, } := globalTimerPool.Get()deferglobalTimerPool.Put()varboolvar *Msgselect {case , = <-:if ! {returnnil, ErrConnectionClosed }case<-.C: .mu.Lock()delete(.respMap, ) .mu.Unlock()returnnil, ErrTimeout }return , nil}// oldRequest will create an Inbox and perform a Request() call// with the Inbox reply and return the first reply received.// This is optimized for the case of multiple responses.func ( *Conn) ( string, , []byte, time.Duration) (*Msg, error) { := .NewInbox() := make(chan *Msg, RequestChanLen) , := .subscribe(, _EMPTY_, nil, , nil, true, nil)if != nil {returnnil, } .AutoUnsubscribe(1)defer .Unsubscribe() = .publish(, , , )if != nil {returnnil, }return .NextMsg()}// InboxPrefix is the prefix for all inbox subjects.const (InboxPrefix = "_INBOX." inboxPrefixLen = len(InboxPrefix) replySuffixLen = 8// Gives us 62^8 rdigits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" base = 62)// NewInbox will return an inbox string which can be used for directed replies from// subscribers. These are guaranteed to be unique, but can be shared and subscribed// to by others.func () string {var [inboxPrefixLen + nuidSize]byte := [:inboxPrefixLen]copy(, InboxPrefix) := [inboxPrefixLen:]copy(, nuid.Next())returnstring([:])}// Create a new inbox that is prefix aware.func ( *Conn) () string {if .Opts.InboxPrefix == _EMPTY_ {returnNewInbox() }varstrings.Builder .WriteString(.Opts.InboxPrefix) .WriteByte('.') .WriteString(nuid.Next())return .String()}// Function to init new response structures.func ( *Conn) () { .respSubPrefix = fmt.Sprintf("%s.", .NewInbox()) .respSubLen = len(.respSubPrefix) .respSub = fmt.Sprintf("%s*", .respSubPrefix) .respMap = make(map[string]chan *Msg) .respRand = rand.New(rand.NewSource(time.Now().UnixNano()))}// newRespInbox creates a new literal response subject// that will trigger the mux subscription handler.// Lock should be held.func ( *Conn) () string {if .respMap == nil { .initNewResp() }varstrings.Builder .WriteString(.respSubPrefix) := .respRand.Int63()for := 0; < replySuffixLen; ++ { .WriteByte(rdigits[%base]) /= base }return .String()}// NewRespInbox is the new format used for _INBOX.func ( *Conn) () string { .mu.Lock() := .newRespInbox() .mu.Unlock()return}// respToken will return the last token of a literal response inbox// which we use for the message channel lookup. This needs to verify the subject// prefix matches to protect itself against the server changing the subject.// Lock should be held.func ( *Conn) ( string) string {if , := strings.CutPrefix(, .respSubPrefix); {return }return""}// Subscribe will express interest in the given subject. The subject// can have wildcards.// There are two type of wildcards: * for partial, and > for full.// A subscription on subject time.*.east would receive messages sent to time.us.east and time.eu.east.// A subscription on subject time.us.> would receive messages sent to// time.us.east and time.us.east.atlanta, while time.us.* would only match time.us.east// since it can't match more than one token.// Messages will be delivered to the associated MsgHandler.func ( *Conn) ( string, MsgHandler) (*Subscription, error) {return .subscribe(, _EMPTY_, , nil, nil, false, nil)}// ChanSubscribe will express interest in the given subject and place// all messages received on the channel.// You should not close the channel until sub.Unsubscribe() has been called.func ( *Conn) ( string, chan *Msg) (*Subscription, error) {return .subscribe(, _EMPTY_, nil, , nil, false, nil)}// ChanQueueSubscribe will express interest in the given subject.// All subscribers with the same queue name will form the queue group// and only one member of the group will be selected to receive any given message,// which will be placed on the channel.// You should not close the channel until sub.Unsubscribe() has been called.// Note: This is the same than QueueSubscribeSyncWithChan.func ( *Conn) (, string, chan *Msg) (*Subscription, error) {return .subscribe(, , nil, , nil, false, nil)}// SubscribeSync will express interest on the given subject. Messages will// be received synchronously using Subscription.NextMsg().func ( *Conn) ( string) (*Subscription, error) {if == nil {returnnil, ErrInvalidConnection } := make(chan *Msg, .Opts.SubChanLen)varchanerrorif .Opts.PermissionErrOnSubscribe { = make(chanerror, 100) }return .subscribe(, _EMPTY_, nil, , , true, nil)}// QueueSubscribe creates an asynchronous queue subscriber on the given subject.// All subscribers with the same queue name will form the queue group and// only one member of the group will be selected to receive any given// message asynchronously.func ( *Conn) (, string, MsgHandler) (*Subscription, error) {return .subscribe(, , , nil, nil, false, nil)}// QueueSubscribeSync creates a synchronous queue subscriber on the given// subject. All subscribers with the same queue name will form the queue// group and only one member of the group will be selected to receive any// given message synchronously using Subscription.NextMsg().func ( *Conn) (, string) (*Subscription, error) { := make(chan *Msg, .Opts.SubChanLen)varchanerrorif .Opts.PermissionErrOnSubscribe { = make(chanerror, 100) }return .subscribe(, , nil, , , true, nil)}// QueueSubscribeSyncWithChan will express interest in the given subject.// All subscribers with the same queue name will form the queue group// and only one member of the group will be selected to receive any given message,// which will be placed on the channel.// You should not close the channel until sub.Unsubscribe() has been called.// Note: This is the same than ChanQueueSubscribe.func ( *Conn) (, string, chan *Msg) (*Subscription, error) {return .subscribe(, , nil, , nil, false, nil)}// badSubject will do quick test on whether a subject is acceptable.// Spaces are not allowed and all tokens should be > 0 in len.func badSubject( string) bool {ifstrings.ContainsAny(, " \t\r\n") {returntrue } := strings.Split(, ".")for , := range {iflen() == 0 {returntrue } }returnfalse}// badQueue will check a queue name for whitespace.func badQueue( string) bool {returnstrings.ContainsAny(, " \t\r\n")}// subscribe is the internal subscribe function that indicates interest in a subject.func ( *Conn) (, string, MsgHandler, chan *Msg, chan (error), bool, *jsSub) (*Subscription, error) {if == nil {returnnil, ErrInvalidConnection } .mu.Lock()defer .mu.Unlock()return .subscribeLocked(, , , , , , )}func ( *Conn) (, string, MsgHandler, chan *Msg, chan (error), bool, *jsSub) (*Subscription, error) {if == nil {returnnil, ErrInvalidConnection }ifbadSubject() {returnnil, ErrBadSubject }if != _EMPTY_ && badQueue() {returnnil, ErrBadQueueName }// Check for some error conditions.if .isClosed() {returnnil, ErrConnectionClosed }if .isDraining() {returnnil, ErrConnectionDraining }if == nil && == nil {returnnil, ErrBadSubscription } := &Subscription{Subject: ,Queue: ,mcb: ,conn: ,jsi: , }// Set pending limits.if != nil { .pMsgsLimit = cap() } else { .pMsgsLimit = DefaultSubPendingMsgsLimit } .pBytesLimit = DefaultSubPendingBytesLimit// If we have an async callback, start up a sub specific // Go routine to deliver the messages.varboolif != nil { .typ = AsyncSubscription .pCond = sync.NewCond(&.mu) = true } elseif ! { .typ = ChanSubscription .mch = } else { // Sync Subscription .typ = SyncSubscription .mch = .errCh = } .subsMu.Lock() .ssid++ .sid = .ssid .subs[.sid] = .subsMu.Unlock()// Let's start the go routine now that it is fully setup and registered.if {go .waitForMsgs() }// We will send these for all subs when we reconnect // so that we can suppress here if reconnecting.if !.isReconnecting() { .bw.appendString(fmt.Sprintf(subProto, , , .sid)) .kickFlusher() } .changeSubStatus(SubscriptionActive)return , nil}// NumSubscriptions returns active number of subscriptions.func ( *Conn) () int { .mu.RLock()defer .mu.RUnlock()returnlen(.subs)}// Lock for nc should be held here upon entryfunc ( *Conn) ( *Subscription) { .subsMu.Lock()delete(.subs, .sid) .subsMu.Unlock() .mu.Lock()defer .mu.Unlock()// Release callers on NextMsg for SyncSubscription onlyif .mch != nil && .typ == SyncSubscription {close(.mch) } .mch = nil// If JS subscription then stop HB timer.if := .jsi; != nil {if .hbc != nil { .hbc.Stop() .hbc = nil }if .csfct != nil { .csfct.Stop() .csfct = nil } }if .typ != AsyncSubscription { := .pDoneif != nil { (.Subject) } }// Mark as invalid .closed = true .changeSubStatus(SubscriptionClosed)if .pCond != nil { .pCond.Broadcast() }}// SubscriptionType is the type of the Subscription.typeSubscriptionTypeint// The different types of subscription types.const (AsyncSubscription = SubscriptionType(iota)SyncSubscriptionChanSubscriptionNilSubscriptionPullSubscription)// Type returns the type of Subscription.func ( *Subscription) () SubscriptionType {if == nil {returnNilSubscription } .mu.Lock()defer .mu.Unlock()// Pull subscriptions are really a SyncSubscription and we want this // type to be set internally for all delivered messages management, etc.. // So check when to return PullSubscription to the user.if .jsi != nil && .jsi.pull {returnPullSubscription }return .typ}// IsValid returns a boolean indicating whether the subscription// is still active. This will return false if the subscription has// already been closed.func ( *Subscription) () bool {if == nil {returnfalse } .mu.Lock()defer .mu.Unlock()return .conn != nil && !.closed}// Drain will remove interest but continue callbacks until all messages// have been processed.//// For a JetStream subscription, if the library has created the JetStream// consumer, the library will send a DeleteConsumer request to the server// when the Drain operation completes. If a failure occurs when deleting// the JetStream consumer, an error will be reported to the asynchronous// error callback.// If you do not wish the JetStream consumer to be automatically deleted,// ensure that the consumer is not created by the library, which means// create the consumer with AddConsumer and bind to this consumer.func ( *Subscription) () error {if == nil {returnErrBadSubscription } .mu.Lock() := .conn .mu.Unlock()if == nil {returnErrBadSubscription }return .unsubscribe(, 0, true)}// IsDraining returns a boolean indicating whether the subscription// is being drained.// This will return false if the subscription has already been closed.func ( *Subscription) () bool {if == nil {returnfalse } .mu.Lock()defer .mu.Unlock()return .draining}// StatusChanged returns a channel on which given list of subscription status// changes will be sent. If no status is provided, all status changes will be sent.// Available statuses are SubscriptionActive, SubscriptionDraining, SubscriptionClosed,// and SubscriptionSlowConsumer.// The returned channel will be closed when the subscription is closed.func ( *Subscription) ( ...SubStatus) <-chanSubStatus {iflen() == 0 { = []SubStatus{SubscriptionActive, SubscriptionDraining, SubscriptionClosed, SubscriptionSlowConsumer} } := make(chanSubStatus, 10) .mu.Lock()defer .mu.Unlock()for , := range { .registerStatusChangeListener(, )// initial statusif == .status { <- } }return}// registerStatusChangeListener registers a channel waiting for a specific status change event.// Status change events are non-blocking - if no receiver is waiting for the status change,// it will not be sent on the channel. Closed channels are ignored.// Lock should be held entering.func ( *Subscription) ( SubStatus, chanSubStatus) {if .statListeners == nil { .statListeners = make(map[chanSubStatus][]SubStatus) }if , := .statListeners[]; ! { .statListeners[] = make([]SubStatus, 0) } .statListeners[] = append(.statListeners[], )}// sendStatusEvent sends subscription status event to all channels.// If there is no listener, sendStatusEvent// will not block. Lock should be held entering.func ( *Subscription) ( SubStatus) {for , := range .statListeners {if !containsStatus(, ) {continue }// only send event if someone's listeningselect {case<- :default: }if == SubscriptionClosed {close() } }}func containsStatus( []SubStatus, SubStatus) bool {for , := range {if == {returntrue } }returnfalse}// changeSubStatus changes subscription status and sends events// to all listeners. Lock should be held entering.func ( *Subscription) ( SubStatus) {if == nil {return } .sendStatusEvent() .status = }// Unsubscribe will remove interest in the given subject.//// For a JetStream subscription, if the library has created the JetStream// consumer, it will send a DeleteConsumer request to the server (if the// unsubscribe itself was successful). If the delete operation fails, the// error will be returned.// If you do not wish the JetStream consumer to be automatically deleted,// ensure that the consumer is not created by the library, which means// create the consumer with AddConsumer and bind to this consumer (using// the nats.Bind() option).func ( *Subscription) () error {if == nil {returnErrBadSubscription } .mu.Lock() := .conn := .closed := .jsi != nil && .jsi.dc .mu.Unlock()if == nil || .IsClosed() {returnErrConnectionClosed }if {returnErrBadSubscription }if .IsDraining() {returnErrConnectionDraining } := .unsubscribe(, 0, false)if == nil && { = .deleteConsumer() }return}// checkDrained will watch for a subscription to be fully drained// and then remove it.func ( *Conn) ( *Subscription) {if == nil || == nil {return }deferfunc() { .mu.Lock()defer .mu.Unlock() .draining = false }()// This allows us to know that whatever we have in the client pending // is correct and the server will not send additional information. .Flush() .mu.Lock()// For JS subscriptions, check if we are going to delete the // JS consumer when drain completes. := .jsi != nil && .jsi.dc .mu.Unlock()// Once we are here we just wait for Pending to reach 0 or // any other state to exit this go routine.for {// check connection is still valid.if .IsClosed() {return }// Check subscription state .mu.Lock() := .conn := .closed := .pMsgs .mu.Unlock()if == nil || || == 0 { .mu.Lock() .removeSub() .mu.Unlock()if {if := .deleteConsumer(); != nil { .mu.Lock()if := .Opts.AsyncErrorCB; != nil { .ach.push(func() { (, , ) }) } .mu.Unlock() } }return }time.Sleep(100 * time.Millisecond) }}// AutoUnsubscribe will issue an automatic Unsubscribe that is// processed by the server when max messages have been received.// This can be useful when sending a request to an unknown number// of subscribers.func ( *Subscription) ( int) error {if == nil {returnErrBadSubscription } .mu.Lock() := .conn := .closed .mu.Unlock()if == nil || {returnErrBadSubscription }return .unsubscribe(, , false)}// SetClosedHandler will set the closed handler for when a subscription// is closed (either unsubscribed or drained).func ( *Subscription) ( func( string)) { .mu.Lock() .pDone = .mu.Unlock()}// unsubscribe performs the low level unsubscribe to the server.// Use Subscription.Unsubscribe()func ( *Conn) ( *Subscription, int, bool) error {varstringif > 0 { .mu.Lock() .max = uint64()if .delivered < .max { = strconv.Itoa() } .mu.Unlock() } .mu.Lock()// ok here, but defer is expensivedefer .mu.Unlock()if .isClosed() {returnErrConnectionClosed } .subsMu.RLock() := .subs[.sid] .subsMu.RUnlock()// Already unsubscribedif == nil {returnnil }if == _EMPTY_ && ! { .removeSub() }if { .mu.Lock() .draining = true .changeSubStatus(SubscriptionDraining) .mu.Unlock()go .checkDrained() }// We will send these for all subs when we reconnect // so that we can suppress here.if !.isReconnecting() { .bw.appendString(fmt.Sprintf(unsubProto, .sid, )) .kickFlusher() }// For JetStream subscriptions cancel the attached context if there is any.varfunc() .mu.Lock() := .jsiif != nil { = .cancel .cancel = nil } .mu.Unlock()if != nil { () }returnnil}// NextMsg will return the next message available to a synchronous subscriber// or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription),// the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout),// or if there were no responders (ErrNoResponders) when used in the context of a request/reply.func ( *Subscription) ( time.Duration) (*Msg, error) {if == nil {returnnil, ErrBadSubscription } .mu.Lock() := .validateNextMsgState(false)if != nil { .mu.Unlock()returnnil, }// snapshot := .mch .mu.Unlock()varboolvar *Msg// If something is available right away, let's optimize that case.select {case , = <-:if ! {returnnil, .getNextMsgErr() }if := .processNextMsgDelivered(); != nil {returnnil, } else {return , nil }default: }// If we are here a message was not immediately available, so lets loop // with a timeout. := globalTimerPool.Get()deferglobalTimerPool.Put()if .errCh != nil {select {case , = <-:if ! {returnnil, .getNextMsgErr() }if := .processNextMsgDelivered(); != nil {returnnil, }case := <-.errCh:returnnil, case<-.C:returnnil, ErrTimeout } } else {select {case , = <-:if ! {returnnil, .getNextMsgErr() }if := .processNextMsgDelivered(); != nil {returnnil, }case<-.C:returnnil, ErrTimeout } }return , nil}// nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not// time out. It is only used internally for non-timeout subscription iterator.func ( *Subscription) () (*Msg, error) {if == nil {returnnil, ErrBadSubscription } .mu.Lock() := .validateNextMsgState(false)if != nil { .mu.Unlock()returnnil, }// snapshot := .mch .mu.Unlock()varboolvar *Msg// If something is available right away, let's optimize that case.select {case , = <-:if ! {returnnil, .getNextMsgErr() }if := .processNextMsgDelivered(); != nil {returnnil, } else {return , nil }default: }if .errCh != nil {select {case , = <-:if ! {returnnil, .getNextMsgErr() }if := .processNextMsgDelivered(); != nil {returnnil, }case := <-.errCh:returnnil, } } else { , = <-if ! {returnnil, .getNextMsgErr() }if := .processNextMsgDelivered(); != nil {returnnil, } }return , nil}// validateNextMsgState checks whether the subscription is in a valid// state to call NextMsg and be delivered another message synchronously.// This should be called while holding the lock.func ( *Subscription) ( bool) error {if .connClosed {returnErrConnectionClosed }if .mch == nil {if .max > 0 && .delivered >= .max {returnErrMaxMessages } elseif .closed {returnErrBadSubscription } }if .mcb != nil {returnErrSyncSubRequired }// if this subscription previously had a permissions error // and no reconnect has been attempted, return the permissions error // since the subscription does not exist on the serverif .conn.Opts.PermissionErrOnSubscribe && .permissionsErr != nil {return .permissionsErr }if .sc { .changeSubStatus(SubscriptionActive) .sc = falsereturnErrSlowConsumer }// Unless this is from an internal call, reject use of this API. // Users should use Fetch() instead.if ! && .jsi != nil && .jsi.pull {returnErrTypeSubscription }returnnil}// This is called when the sync channel has been closed.// The error returned will be either connection or subscription// closed depending on what caused NextMsg() to fail.func ( *Subscription) () error { .mu.Lock()defer .mu.Unlock()if .connClosed {returnErrConnectionClosed }returnErrBadSubscription}// processNextMsgDelivered takes a message and applies the needed// accounting to the stats from the subscription, returning an// error in case we have the maximum number of messages have been// delivered already. It should not be called while holding the lock.func ( *Subscription) ( *Msg) error { .mu.Lock() := .conn := .maxvarstring// Update some stats. .delivered++ := .deliveredif .jsi != nil { = .checkForFlowControlResponse() }if .typ == SyncSubscription { .pMsgs-- .pBytes -= len(.Data) } .mu.Unlock()if != _EMPTY_ { .Publish(, nil) }if > 0 {if > {returnErrMaxMessages }// Remove subscription if we have reached max.if == { .mu.Lock() .removeSub() .mu.Unlock() } }iflen(.Data) == 0 && .Header.Get(statusHdr) == noResponders {returnErrNoResponders }returnnil}// Queued returns the number of queued messages in the client for this subscription.//// Deprecated: Use Pending()func ( *Subscription) () (int, error) { , , := .Pending()returnint(), }// Pending returns the number of queued messages and queued bytes in the client for this subscription.func ( *Subscription) () (int, int, error) {if == nil {return -1, -1, ErrBadSubscription } .mu.Lock()defer .mu.Unlock()if .conn == nil || .closed {return -1, -1, ErrBadSubscription }if .typ == ChanSubscription {return -1, -1, ErrTypeSubscription }return .pMsgs, .pBytes, nil}// MaxPending returns the maximum number of queued messages and queued bytes seen so far.func ( *Subscription) () (int, int, error) {if == nil {return -1, -1, ErrBadSubscription } .mu.Lock()defer .mu.Unlock()if .conn == nil || .closed {return -1, -1, ErrBadSubscription }if .typ == ChanSubscription {return -1, -1, ErrTypeSubscription }return .pMsgsMax, .pBytesMax, nil}// ClearMaxPending resets the maximums seen so far.func ( *Subscription) () error {if == nil {returnErrBadSubscription } .mu.Lock()defer .mu.Unlock()if .conn == nil || .closed {returnErrBadSubscription }if .typ == ChanSubscription {returnErrTypeSubscription } .pMsgsMax, .pBytesMax = 0, 0returnnil}// Pending Limitsconst (// DefaultSubPendingMsgsLimit will be 512k msgs.DefaultSubPendingMsgsLimit = 512 * 1024// DefaultSubPendingBytesLimit is 64MBDefaultSubPendingBytesLimit = 64 * 1024 * 1024)// PendingLimits returns the current limits for this subscription.// If no error is returned, a negative value indicates that the// given metric is not limited.func ( *Subscription) () (int, int, error) {if == nil {return -1, -1, ErrBadSubscription } .mu.Lock()defer .mu.Unlock()if .conn == nil || .closed {return -1, -1, ErrBadSubscription }if .typ == ChanSubscription {return -1, -1, ErrTypeSubscription }return .pMsgsLimit, .pBytesLimit, nil}// SetPendingLimits sets the limits for pending msgs and bytes for this subscription.// Zero is not allowed. Any negative value means that the given metric is not limited.func ( *Subscription) (, int) error {if == nil {returnErrBadSubscription } .mu.Lock()defer .mu.Unlock()if .conn == nil || .closed {returnErrBadSubscription }if .typ == ChanSubscription {returnErrTypeSubscription }if == 0 || == 0 {returnErrInvalidArg } .pMsgsLimit, .pBytesLimit = , returnnil}// Delivered returns the number of delivered messages for this subscription.func ( *Subscription) () (int64, error) {if == nil {return -1, ErrBadSubscription } .mu.Lock()defer .mu.Unlock()if .conn == nil || .closed {return -1, ErrBadSubscription }returnint64(.delivered), nil}// Dropped returns the number of known dropped messages for this subscription.// This will correspond to messages dropped by violations of PendingLimits. If// the server declares the connection a SlowConsumer, this number may not be// valid.func ( *Subscription) () (int, error) {if == nil {return -1, ErrBadSubscription } .mu.Lock()defer .mu.Unlock()if .conn == nil || .closed {return -1, ErrBadSubscription }return .dropped, nil}// Respond allows a convenient way to respond to requests in service based subscriptions.func ( *Msg) ( []byte) error {if == nil || .Sub == nil {returnErrMsgNotBound }if .Reply == "" {returnErrMsgNoReply } .Sub.mu.Lock() := .Sub.conn .Sub.mu.Unlock()// No need to check the connection here since the call to publish will do all the checking.return .Publish(.Reply, )}// RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headersfunc ( *Msg) ( *Msg) error {if == nil || .Sub == nil {returnErrMsgNotBound }if .Reply == "" {returnErrMsgNoReply } .Subject = .Reply .Sub.mu.Lock() := .Sub.conn .Sub.mu.Unlock()// No need to check the connection here since the call to publish will do all the checking.return .PublishMsg()}// FIXME: This is a hack// removeFlushEntry is needed when we need to discard queued up responses// for our pings as part of a flush call. This happens when we have a flush// call outstanding and we call close.func ( *Conn) ( chanstruct{}) bool { .mu.Lock()defer .mu.Unlock()if .pongs == nil {returnfalse }for , := range .pongs {if == { .pongs[] = nilreturntrue } }returnfalse}// The lock must be held entering this function.func ( *Conn) ( chanstruct{}) { .pongs = append(.pongs, ) .bw.appendString(pingProto)// Flush in place. .bw.flush()}// This will fire periodically and send a client origin// ping to the server. Will also check that we have received// responses from the server.func ( *Conn) () { .mu.Lock()if .status != CONNECTED { .mu.Unlock()return }// Check for violation .pout++if .pout > .Opts.MaxPingsOut { .mu.Unlock()if := .processOpErr(ErrStaleConnection); { .close(CLOSED, true, nil) }return } .sendPing(nil) .ptmr.Reset(.Opts.PingInterval) .mu.Unlock()}// FlushTimeout allows a Flush operation to have an associated timeout.func ( *Conn) ( time.Duration) ( error) {if == nil {returnErrInvalidConnection }if <= 0 {returnErrBadTimeout } .mu.Lock()if .isClosed() { .mu.Unlock()returnErrConnectionClosed } := globalTimerPool.Get()deferglobalTimerPool.Put()// Create a buffered channel to prevent chan send to block // in processPong() if this code here times out just when // PONG was received. := make(chanstruct{}, 1) .sendPing() .mu.Unlock()select {case , := <-:if ! { = ErrConnectionClosed } else {close() }case<-.C: = ErrTimeout }if != nil { .removeFlushEntry() }return}// RTT calculates the round trip time between this client and the server.func ( *Conn) () (time.Duration, error) {if .IsClosed() {return0, ErrConnectionClosed }if .IsReconnecting() {return0, ErrDisconnected } := time.Now()if := .FlushTimeout(10 * time.Second); != nil {return0, }returntime.Since(), nil}// Flush will perform a round trip to the server and return when it// receives the internal reply.func ( *Conn) () error {return .FlushTimeout(10 * time.Second)}// Buffered will return the number of bytes buffered to be sent to the server.// FIXME(dlc) take into account disconnected state.func ( *Conn) () (int, error) { .mu.RLock()defer .mu.RUnlock()if .isClosed() || .bw == nil {return -1, ErrConnectionClosed }return .bw.buffered(), nil}// resendSubscriptions will send our subscription state back to the// server. Used in reconnectsfunc ( *Conn) () {// Since we are going to send protocols to the server, we don't want to // be holding the subsMu lock (which is used in processMsg). So copy // the subscriptions in a temporary array. .subsMu.RLock() := make([]*Subscription, 0, len(.subs))for , := range .subs { = append(, ) } .subsMu.RUnlock()for , := range { := uint64(0) .mu.Lock()// when resending subscriptions, the permissions error should be cleared // since the user may have fixed the permissions issue .permissionsErr = nilif .max > 0 {if .delivered < .max { = .max - .delivered }// adjustedMax could be 0 here if the number of delivered msgs // reached the max, if so unsubscribe.if == 0 { .mu.Unlock() .bw.writeDirect(fmt.Sprintf(unsubProto, .sid, _EMPTY_))continue } } , , := .Subject, .Queue, .sid .mu.Unlock() .bw.writeDirect(fmt.Sprintf(subProto, , , ))if > 0 { := strconv.Itoa(int()) .bw.writeDirect(fmt.Sprintf(unsubProto, , )) } }}// This will clear any pending flush calls and release pending calls.// Lock is assumed to be held by the caller.func ( *Conn) () {// Clear any queued pongs, e.g. pending flush calls.for , := range .pongs {if != nil {close() } } .pongs = nil}// This will clear any pending Request calls.// Lock is assumed to be held by the caller.func ( *Conn) () {for , := range .respMap {if != nil {close()delete(.respMap, ) } }}// Low level close call that will do correct cleanup and set// desired status. Also controls whether user defined callbacks// will be triggered. The lock should not be held entering this// function. This function will handle the locking manually.func ( *Conn) ( Status, bool, error) { .mu.Lock()if .isClosed() { .status = .mu.Unlock()return } .status = CLOSED// Kick the Go routines so they fall out. .kickFlusher()// If the reconnect timer is waiting between a reconnect attempt, // this will kick it out.if .rqch != nil {close(.rqch) .rqch = nil }// Clear any queued pongs, e.g. pending flush calls. .clearPendingFlushCalls()// Clear any queued and blocking Requests. .clearPendingRequestCalls()// Stop ping timer if set. .stopPingTimer() .ptmr = nil// Need to close and set TCP conn to nil if reconnect loop has stopped, // otherwise we would incorrectly invoke Disconnect handler (if set) // down below.if .ar && .conn != nil { .conn.Close() .conn = nil } elseif .conn != nil {// Go ahead and make sure we have flushed the outbound .bw.flush()defer .conn.Close() }// Close sync subscriber channels and release any // pending NextMsg() calls. .subsMu.Lock()for , := range .subs { .mu.Lock()// Release callers on NextMsg for SyncSubscription onlyif .mch != nil && .typ == SyncSubscription {close(.mch) } .mch = nil// Mark as invalid, for signaling to waitForMsgs .closed = true// Mark connection closed in subscription .connClosed = true// If we have an async subscription, signals it to exitif .typ == AsyncSubscription && .pCond != nil { .pCond.Signal() } .mu.Unlock() } .subs = nil .subsMu.Unlock() .changeConnStatus()// Perform appropriate callback if needed for a disconnect.if {if .conn != nil {if := .Opts.DisconnectedErrCB; != nil { .ach.push(func() { (, ) }) } elseif := .Opts.DisconnectedCB; != nil { .ach.push(func() { () }) } }if .Opts.ClosedCB != nil { .ach.push(func() { .Opts.ClosedCB() }) } }// If this is terminal, then we have to notify the asyncCB handler that // it can exit once all async callbacks have been dispatched.if == CLOSED { .ach.close() } .mu.Unlock()}// Close will close the connection to the server. This call will release// all blocking calls, such as Flush() and NextMsg()func ( *Conn) () {if != nil {// This will be a no-op if the connection was not websocket. // We do this here as opposed to inside close() because we want // to do this only for the final user-driven close of the client. // Otherwise, we would need to change close() to pass a boolean // indicating that this is the case. .wsClose() .close(CLOSED, !.Opts.NoCallbacksAfterClientClose, nil) }}// IsClosed tests if a Conn has been closed.func ( *Conn) () bool { .mu.RLock()defer .mu.RUnlock()return .isClosed()}// IsReconnecting tests if a Conn is reconnecting.func ( *Conn) () bool { .mu.RLock()defer .mu.RUnlock()return .isReconnecting()}// IsConnected tests if a Conn is connected.func ( *Conn) () bool { .mu.RLock()defer .mu.RUnlock()return .isConnected()}// drainConnection will run in a separate Go routine and will// flush all publishes and drain all active subscriptions.func ( *Conn) () {// Snapshot subs list. .mu.Lock()// Check again here if we are in a state to not process.if .isClosed() { .mu.Unlock()return }if .isConnecting() || .isReconnecting() { .mu.Unlock()// Move to closed state. .Close()return } := make([]*Subscription, 0, len(.subs))for , := range .subs {if == .respMux {// Skip since might be in use while messages // are being processed (can miss responses).continue } = append(, ) } := .Opts.AsyncErrorCB := .Opts.DrainTimeout := .respMux .mu.Unlock()// for pushing errors with context. := func( error) { .mu.Lock() .err = if != nil { .ach.push(func() { (, nil, ) }) } .mu.Unlock() }// Do subs first, skip request handler if present.for , := range {if := .Drain(); != nil {// We will notify about these but continue. () } }// Wait for the subscriptions to drop to zero. := time.Now().Add()varintif != nil { = 1 } else { = 0 }fortime.Now().Before() {if .NumSubscriptions() == {break }time.Sleep(10 * time.Millisecond) }// In case there was a request/response handler // then need to call drain at the end.if != nil {if := .Drain(); != nil {// We will notify about these but continue. () }fortime.Now().Before() {if .NumSubscriptions() == 0 {break }time.Sleep(10 * time.Millisecond) } }// Check if we timed out.if .NumSubscriptions() != 0 { (ErrDrainTimeout) }// Flip State .mu.Lock() .changeConnStatus(DRAINING_PUBS) .mu.Unlock()// Do publish drain via Flush() call. := .FlushTimeout(5 * time.Second)if != nil { () }// Move to closed state. .Close()}// Drain will put a connection into a drain state. All subscriptions will// immediately be put into a drain state. Upon completion, the publishers// will be drained and can not publish any additional messages. Upon draining// of the publishers, the connection will be closed. Use the ClosedCB// option to know when the connection has moved from draining to closed.//// See note in Subscription.Drain for JetStream subscriptions.func ( *Conn) () error { .mu.Lock()if .isClosed() { .mu.Unlock()returnErrConnectionClosed }if .isConnecting() || .isReconnecting() { .mu.Unlock() .Close()returnErrConnectionReconnecting }if .isDraining() { .mu.Unlock()returnnil } .changeConnStatus(DRAINING_SUBS)go .drainConnection() .mu.Unlock()returnnil}// IsDraining tests if a Conn is in the draining state.func ( *Conn) () bool { .mu.RLock()defer .mu.RUnlock()return .isDraining()}// caller must lockfunc ( *Conn) ( bool) []string { := len(.srvPool) := make([]string, 0)for := 0; < ; ++ {if && !.srvPool[].isImplicit {continue } := .srvPool[].url = append(, fmt.Sprintf("%s://%s", .Scheme, .Host)) }return}// Servers returns the list of known server urls, including additional// servers discovered after a connection has been established. If// authentication is enabled, use UserInfo or Token when connecting with// these urls.func ( *Conn) () []string { .mu.RLock()defer .mu.RUnlock()return .getServers(false)}// DiscoveredServers returns only the server urls that have been discovered// after a connection has been established. If authentication is enabled,// use UserInfo or Token when connecting with these urls.func ( *Conn) () []string { .mu.RLock()defer .mu.RUnlock()return .getServers(true)}// Status returns the current state of the connection.func ( *Conn) () Status { .mu.RLock()defer .mu.RUnlock()return .status}// Test if Conn has been closed Lock is assumed held.func ( *Conn) () bool {return .status == CLOSED}// Test if Conn is in the process of connectingfunc ( *Conn) () bool {return .status == CONNECTING}// Test if Conn is being reconnected.func ( *Conn) () bool {return .status == RECONNECTING}// Test if Conn is connected or connecting.func ( *Conn) () bool {return .status == CONNECTED || .isDraining()}// Test if Conn is in the draining state.func ( *Conn) () bool {return .status == DRAINING_SUBS || .status == DRAINING_PUBS}// Test if Conn is in the draining state for pubs.func ( *Conn) () bool {return .status == DRAINING_PUBS}// Stats will return a race safe copy of the Statistics section for the connection.func ( *Conn) () Statistics {// Stats are updated either under connection's mu or with atomic operations // for inbound stats in processMsg(). .mu.Lock() := Statistics{InMsgs: atomic.LoadUint64(&.InMsgs),InBytes: atomic.LoadUint64(&.InBytes),OutMsgs: .OutMsgs,OutBytes: .OutBytes,Reconnects: .Reconnects, } .mu.Unlock()return}// MaxPayload returns the size limit that a message payload can have.// This is set by the server configuration and delivered to the client// upon connect.func ( *Conn) () int64 { .mu.RLock()defer .mu.RUnlock()return .info.MaxPayload}// HeadersSupported will return if the server supports headersfunc ( *Conn) () bool { .mu.RLock()defer .mu.RUnlock()return .info.Headers}// AuthRequired will return if the connected server requires authorization.func ( *Conn) () bool { .mu.RLock()defer .mu.RUnlock()return .info.AuthRequired}// TLSRequired will return if the connected server requires TLS connections.func ( *Conn) () bool { .mu.RLock()defer .mu.RUnlock()return .info.TLSRequired}// Barrier schedules the given function `f` to all registered asynchronous// subscriptions.// Only the last subscription to see this barrier will invoke the function.// If no subscription is registered at the time of this call, `f()` is invoked// right away.// ErrConnectionClosed is returned if the connection is closed prior to// the call.func ( *Conn) ( func()) error { .mu.Lock()if .isClosed() { .mu.Unlock()returnErrConnectionClosed } .subsMu.Lock()// Need to figure out how many non chan subscriptions there are := 0for , := range .subs {if .typ == AsyncSubscription { ++ } }if == 0 { .subsMu.Unlock() .mu.Unlock() ()returnnil } := &barrierInfo{refs: int64(), f: }for , := range .subs { .mu.Lock()if .mch == nil { := &Msg{barrier: }// Push onto the async pListif .pTail != nil { .pTail.next = } else { .pHead = .pCond.Signal() } .pTail = } .mu.Unlock() } .subsMu.Unlock() .mu.Unlock()returnnil}// GetClientIP returns the client IP as known by the server.// Supported as of server version 2.1.6.func ( *Conn) () (net.IP, error) { .mu.RLock()defer .mu.RUnlock()if .isClosed() {returnnil, ErrConnectionClosed }if .info.ClientIP == "" {returnnil, ErrClientIPNotSupported } := net.ParseIP(.info.ClientIP)return , nil}// GetClientID returns the client ID assigned by the server to which// the client is currently connected to. Note that the value may change if// the client reconnects.// This function returns ErrClientIDNotSupported if the server is of a// version prior to 1.2.0.func ( *Conn) () (uint64, error) { .mu.RLock()defer .mu.RUnlock()if .isClosed() {return0, ErrConnectionClosed }if .info.CID == 0 {return0, ErrClientIDNotSupported }return .info.CID, nil}// StatusChanged returns a channel on which given list of connection status changes will be reported.// If no statuses are provided, defaults will be used: CONNECTED, RECONNECTING, DISCONNECTED, CLOSED.func ( *Conn) ( ...Status) chanStatus {iflen() == 0 { = []Status{CONNECTED, RECONNECTING, DISCONNECTED, CLOSED} } := make(chanStatus, 10) .mu.Lock()defer .mu.Unlock()for , := range { .registerStatusChangeListener(, ) }return}// RemoveStatusListener removes a status change listener.// If the channel is not closed, it will be closed.// Listeners will be removed automatically on status change// as well, but this is a way to remove them manually.func ( *Conn) ( chan (Status)) { .mu.Lock()defer .mu.Unlock()select {case<-:default:close() }for , := range .statListeners {for := range {delete(, ) } }}// registerStatusChangeListener registers a channel waiting for a specific status change event.// Status change events are non-blocking - if no receiver is waiting for the status change,// it will not be sent on the channel. Closed channels are ignored.// The lock should be held entering.func ( *Conn) ( Status, chanStatus) {if .statListeners == nil { .statListeners = make(map[Status]map[chanStatus]struct{}) }if , := .statListeners[]; ! { .statListeners[] = make(map[chanStatus]struct{}) } .statListeners[][] = struct{}{}}// sendStatusEvent sends connection status event to all channels.// If channel is closed, or there is no listener, sendStatusEvent// will not block. Lock should be held entering.func ( *Conn) ( Status) {:for := range .statListeners[] {// make sure channel is not closedselect {case<-:// if chan is closed, remove itdelete(.statListeners[], )continuedefault: }// only send event if someone's listeningselect {case<- :default: } }}// changeConnStatus changes connections status and sends events// to all listeners. Lock should be held entering.func ( *Conn) ( Status) {if == nil {return } .sendStatusEvent() .status = }// NkeyOptionFromSeed will load an nkey pair from a seed file.// It will return the NKey Option and will handle// signing of nonce challenges from the server. It will take// care to not hold keys in memory and to wipe memory.func ( string) (Option, error) { , := nkeyPairFromSeedFile()if != nil {returnnil, }// Wipe our key on exit.defer .Wipe() , := .PublicKey()if != nil {returnnil, }if !nkeys.IsValidPublicUserKey() {returnnil, errors.New("nats: Not a valid nkey user seed") } := func( []byte) ([]byte, error) {returnsigHandler(, ) }returnNkey(string(), ), nil}// Just wipe slice with 'x', for clearing contents of creds or nkey seed file.func wipeSlice( []byte) {for := range { [] = 'x' }}func userFromFile( string) (string, error) { , := expandPath()if != nil {return_EMPTY_, fmt.Errorf("nats: %w", ) } , := os.ReadFile()if != nil {return_EMPTY_, fmt.Errorf("nats: %w", ) }deferwipeSlice()returnnkeys.ParseDecoratedJWT()}func homeDir() (string, error) {ifruntime.GOOS == "windows" { , := os.Getenv("HOMEDRIVE"), os.Getenv("HOMEPATH") := os.Getenv("USERPROFILE")varstringif == "" || == "" {if == "" {return_EMPTY_, errors.New("nats: failed to get home dir, require %HOMEDRIVE% and %HOMEPATH% or %USERPROFILE%") } = } else { = filepath.Join(, ) }return , nil } := os.Getenv("HOME")if == "" {return_EMPTY_, errors.New("nats: failed to get home dir, require $HOME") }return , nil}func expandPath( string) (string, error) { = os.ExpandEnv()if !strings.HasPrefix(, "~") {return , nil } , := homeDir()if != nil {return_EMPTY_, }returnfilepath.Join(, [1:]), nil}func nkeyPairFromSeedFile( string) (nkeys.KeyPair, error) { , := os.ReadFile()if != nil {returnnil, fmt.Errorf("nats: %w", ) }deferwipeSlice()returnnkeys.ParseDecoratedNKey()}// Sign authentication challenges from the server.// Do not keep private seed in memory.func sigHandler( []byte, string) ([]byte, error) { , := nkeyPairFromSeedFile()if != nil {returnnil, fmt.Errorf("unable to extract key pair from file %q: %w", , ) }// Wipe our key on exit.defer .Wipe() , := .Sign()return , nil}type timeoutWriter struct { timeout time.Duration conn net.Conn err error}// Write implements the io.Writer interface.func ( *timeoutWriter) ( []byte) (int, error) {if .err != nil {return0, .err }varint .conn.SetWriteDeadline(time.Now().Add(.timeout)) , .err = .conn.Write() .conn.SetWriteDeadline(time.Time{})return , .err}
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.