// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
	"errors"
	"reflect"
)

// This file implements network channels: Go channels for sending and receiving are
// bound to subjects and, optionally, queue groups.
// Data is encoded and decoded via the EncodedConn and its associated encoders.

// BindSendChan binds a channel for send operations to NATS.
//
// Deprecated: Encoded connections are no longer supported.
func ( *EncodedConn) ( string,  any) error {
	 := reflect.ValueOf()
	if .Kind() != reflect.Chan {
		return ErrChanArg
	}
	go chPublish(, , )
	return nil
}

// Publish all values that arrive on the channel until it is closed or we
// encounter an error.
func chPublish( *EncodedConn,  reflect.Value,  string) {
	for {
		,  := .Recv()
		if ! {
			// Channel has most likely been closed.
			return
		}
		if  := .Publish(, .Interface());  != nil {
			// Do this under lock.
			.Conn.mu.Lock()
			defer .Conn.mu.Unlock()

			if .Conn.Opts.AsyncErrorCB != nil {
				// FIXME(dlc) - Not sure this is the right thing to do.
				// FIXME(ivan) - If the connection is not yet closed, try to schedule the callback
				if .Conn.isClosed() {
					go .Conn.Opts.AsyncErrorCB(.Conn, nil, )
				} else {
					.Conn.ach.push(func() { .Conn.Opts.AsyncErrorCB(.Conn, nil, ) })
				}
			}
			return
		}
	}
}

// BindRecvChan binds a channel for receive operations from NATS.
//
// Deprecated: Encoded connections are no longer supported.
func ( *EncodedConn) ( string,  any) (*Subscription, error) {
	return .bindRecvChan(, _EMPTY_, )
}

// BindRecvQueueChan binds a channel for queue-based receive operations from NATS.
//
// Deprecated: Encoded connections are no longer supported.
func ( *EncodedConn) (,  string,  any) (*Subscription, error) {
	return .bindRecvChan(, , )
}

// Internal function to bind receive operations for a channel.
func ( *EncodedConn) (,  string,  any) (*Subscription, error) {
	 := reflect.ValueOf()
	if .Kind() != reflect.Chan {
		return nil, ErrChanArg
	}
	 := .Type().Elem()

	 := func( *Msg) {
		var  reflect.Value
		if .Kind() != reflect.Ptr {
			 = reflect.New()
		} else {
			 = reflect.New(.Elem())
		}
		if  := .Enc.Decode(.Subject, .Data, .Interface());  != nil {
			.Conn.err = errors.New("nats: Got an error trying to unmarshal: " + .Error())
			if .Conn.Opts.AsyncErrorCB != nil {
				.Conn.ach.push(func() { .Conn.Opts.AsyncErrorCB(.Conn, .Sub, .Conn.err) })
			}
			return
		}
		if .Kind() != reflect.Ptr {
			 = reflect.Indirect()
		}
		// This is a bit hacky, but in this instance we may be trying to send to a closed channel.
		// and the user does not know when it is safe to close the channel.
		defer func() {
			// If we have panicked, recover and close the subscription.
			if  := recover();  != nil {
				.Sub.Unsubscribe()
			}
		}()
		// Actually do the send to the channel.
		.Send()
	}

	return .Conn.subscribe(, , , nil, nil, false, nil)
}