// Copyright 2012-2023 The NATS Authors// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.package natsimport (// Default Encoders)//lint:file-ignore SA1019 Ignore deprecation warnings for EncodedConn// Encoder interface is for all register encoders//// Deprecated: Encoded connections are no longer supported.typeEncoderinterface {Encode(subject string, v any) ([]byte, error)Decode(subject string, data []byte, vPtr any) error}var encMap map[string]Encodervar encLock sync.Mutex// Indexed names into the Registered Encoders.const (JSON_ENCODER = "json"GOB_ENCODER = "gob"DEFAULT_ENCODER = "default")func init() {encMap = make(map[string]Encoder)// Register json, gob and default encoderRegisterEncoder(JSON_ENCODER, &builtin.JsonEncoder{})RegisterEncoder(GOB_ENCODER, &builtin.GobEncoder{})RegisterEncoder(DEFAULT_ENCODER, &builtin.DefaultEncoder{})}// EncodedConn are the preferred way to interface with NATS. They wrap a bare connection to// a nats server and have an extendable encoder system that will encode and decode messages// from raw Go types.//// Deprecated: Encoded connections are no longer supported.typeEncodedConnstruct { Conn *Conn Enc Encoder}// NewEncodedConn will wrap an existing Connection and utilize the appropriate registered// encoder.//// Deprecated: Encoded connections are no longer supported.func ( *Conn, string) (*EncodedConn, error) {if == nil {returnnil, errors.New("nats: Nil Connection") }if .IsClosed() {returnnil, ErrConnectionClosed } := &EncodedConn{Conn: , Enc: EncoderForType()}if .Enc == nil {returnnil, fmt.Errorf("no encoder registered for '%s'", ) }return , nil}// RegisterEncoder will register the encType with the given Encoder. Useful for customization.//// Deprecated: Encoded connections are no longer supported.func ( string, Encoder) {encLock.Lock()deferencLock.Unlock()encMap[] = }// EncoderForType will return the registered Encoder for the encType.//// Deprecated: Encoded connections are no longer supported.func ( string) Encoder {encLock.Lock()deferencLock.Unlock()returnencMap[]}// Publish publishes the data argument to the given subject. The data argument// will be encoded using the associated encoder.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) ( string, any) error { , := .Enc.Encode(, )if != nil {return }return .Conn.publish(, _EMPTY_, nil, )}// PublishRequest will perform a Publish() expecting a response on the// reply subject. Use Request() for automatically waiting for a response// inline.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) (, string, any) error { , := .Enc.Encode(, )if != nil {return }return .Conn.publish(, , nil, )}// Request will create an Inbox and perform a Request() call// with the Inbox reply for the data v. A response will be// decoded into the vPtr Response.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) ( string, any, any, time.Duration) error { , := .Enc.Encode(, )if != nil {return } , := .Conn.Request(, , )if != nil {return }ifreflect.TypeOf() == emptyMsgType { := .(*Msg) * = * } else { = .Enc.Decode(.Subject, .Data, ) }return}// Handler is a specific callback used for Subscribe. It is generalized to// an any, but we will discover its format and arguments at runtime// and perform the correct callback, including demarshaling encoded data// back into the appropriate struct based on the signature of the Handler.//// Handlers are expected to have one of four signatures.//// type person struct {// Name string `json:"name,omitempty"`// Age uint `json:"age,omitempty"`// }//// handler := func(m *Msg)// handler := func(p *person)// handler := func(subject string, o *obj)// handler := func(subject, reply string, o *obj)//// These forms allow a callback to request a raw Msg ptr, where the processing// of the message from the wire is untouched. Process a JSON representation// and demarshal it into the given struct, e.g. person.// There are also variants where the callback wants either the subject, or the// subject and the reply subject.//// Deprecated: Encoded connections are no longer supported.typeHandlerany// Dissect the cb Handler's signaturefunc argInfo( Handler) (reflect.Type, int) { := reflect.TypeOf()if .Kind() != reflect.Func {panic("nats: Handler needs to be a func") } := .NumIn()if == 0 {returnnil, }return .In( - 1), }var emptyMsgType = reflect.TypeOf(&Msg{})// Subscribe will create a subscription on the given subject and process incoming// messages using the specified Handler. The Handler should be a func that matches// a signature from the description of Handler from above.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) ( string, Handler) (*Subscription, error) {return .subscribe(, _EMPTY_, )}// QueueSubscribe will create a queue subscription on the given subject and process// incoming messages using the specified Handler. The Handler should be a func that// matches a signature from the description of Handler from above.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) (, string, Handler) (*Subscription, error) {return .subscribe(, , )}// Internal implementation that all public functions will use.func ( *EncodedConn) (, string, Handler) (*Subscription, error) {if == nil {returnnil, errors.New("nats: Handler required for EncodedConn Subscription") } , := argInfo()if == nil {returnnil, errors.New("nats: Handler requires at least one argument") } := reflect.ValueOf() := ( == emptyMsgType) := func( *Msg) {var []reflect.Valueif { = []reflect.Value{reflect.ValueOf()} } else {varreflect.Valueif .Kind() != reflect.Ptr { = reflect.New() } else { = reflect.New(.Elem()) }if := .Enc.Decode(.Subject, .Data, .Interface()); != nil {if .Conn.Opts.AsyncErrorCB != nil { .Conn.ach.push(func() { .Conn.Opts.AsyncErrorCB(.Conn, .Sub, errors.New("nats: Got an error trying to unmarshal: "+.Error())) }) }return }if .Kind() != reflect.Ptr { = reflect.Indirect() }// Callback Arityswitch {case1: = []reflect.Value{}case2: := reflect.ValueOf(.Subject) = []reflect.Value{, }case3: := reflect.ValueOf(.Subject) := reflect.ValueOf(.Reply) = []reflect.Value{, , } } } .Call() }return .Conn.subscribe(, , , nil, nil, false, nil)}// FlushTimeout allows a Flush operation to have an associated timeout.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) ( time.Duration) ( error) {return .Conn.FlushTimeout()}// Flush will perform a round trip to the server and return when it// receives the internal reply.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) () error {return .Conn.Flush()}// Close will close the connection to the server. This call will release// all blocking calls, such as Flush(), etc.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) () { .Conn.Close()}// Drain will put a connection into a drain state. All subscriptions will// immediately be put into a drain state. Upon completion, the publishers// will be drained and can not publish any additional messages. Upon draining// of the publishers, the connection will be closed. Use the ClosedCB()// option to know when the connection has moved from draining to closed.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) () error {return .Conn.Drain()}// LastError reports the last error encountered via the Connection.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) () error {return .Conn.LastError()}
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.