// Copyright 2013-2023 The NATS Authors// Licensed under the Apache License, Version 2.0 (the "License");// you may not use this file except in compliance with the License.// You may obtain a copy of the License at//// http://www.apache.org/licenses/LICENSE-2.0//// Unless required by applicable law or agreed to in writing, software// distributed under the License is distributed on an "AS IS" BASIS,// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.// See the License for the specific language governing permissions and// limitations under the License.package natsimport ()// This allows the functionality for network channels by binding send and receive Go chans// to subjects and optionally queue groups.// Data will be encoded and decoded via the EncodedConn and its associated encoders.// BindSendChan binds a channel for send operations to NATS.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) ( string, any) error { := reflect.ValueOf()if .Kind() != reflect.Chan {returnErrChanArg }gochPublish(, , )returnnil}// Publish all values that arrive on the channel until it is closed or we// encounter an error.func chPublish( *EncodedConn, reflect.Value, string) {for { , := .Recv()if ! {// Channel has most likely been closed.return }if := .Publish(, .Interface()); != nil {// Do this under lock. .Conn.mu.Lock()defer .Conn.mu.Unlock()if .Conn.Opts.AsyncErrorCB != nil {// FIXME(dlc) - Not sure this is the right thing to do. 
// FIXME(ivan) - If the connection is not yet closed, try to schedule the callbackif .Conn.isClosed() {go .Conn.Opts.AsyncErrorCB(.Conn, nil, ) } else { .Conn.ach.push(func() { .Conn.Opts.AsyncErrorCB(.Conn, nil, ) }) } }return } }}// BindRecvChan binds a channel for receive operations from NATS.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) ( string, any) (*Subscription, error) {return .bindRecvChan(, _EMPTY_, )}// BindRecvQueueChan binds a channel for queue-based receive operations from NATS.//// Deprecated: Encoded connections are no longer supported.func ( *EncodedConn) (, string, any) (*Subscription, error) {return .bindRecvChan(, , )}// Internal function to bind receive operations for a channel.func ( *EncodedConn) (, string, any) (*Subscription, error) { := reflect.ValueOf()if .Kind() != reflect.Chan {returnnil, ErrChanArg } := .Type().Elem() := func( *Msg) {varreflect.Valueif .Kind() != reflect.Ptr { = reflect.New() } else { = reflect.New(.Elem()) }if := .Enc.Decode(.Subject, .Data, .Interface()); != nil { .Conn.err = errors.New("nats: Got an error trying to unmarshal: " + .Error())if .Conn.Opts.AsyncErrorCB != nil { .Conn.ach.push(func() { .Conn.Opts.AsyncErrorCB(.Conn, .Sub, .Conn.err) }) }return }if .Kind() != reflect.Ptr { = reflect.Indirect() }// This is a bit hacky, but in this instance we may be trying to send to a closed channel. // and the user does not know when it is safe to close the channel.deferfunc() {// If we have panicked, recover and close the subscription.if := recover(); != nil { .Sub.Unsubscribe() } }()// Actually do the send to the channel. .Send() }return .Conn.subscribe(, , , nil, nil, false, nil)}
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.