// Copyright 2016-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package nats

import (
	
	
)

// RequestMsgWithContext takes a context and a message and sends the
// message's subject, headers and data as a request, expecting a single
// response.
//
// It returns ErrInvalidMsg when the message is nil, and returns the error
// from headerBytes unchanged if the header bytes cannot be produced. The
// context and connection checks are performed by requestWithContext.
func ( *Conn) ( context.Context,  *Msg) (*Msg, error) {
	if  == nil {
		return nil, ErrInvalidMsg
	}
	// Obtain the header bytes first so that a header problem is reported
	// before any request is attempted.
	,  := .headerBytes()
	if  != nil {
		return nil, 
	}
	return .requestWithContext(, .Subject, , .Data)
}

// RequestWithContext takes a context, a subject and a payload in bytes,
// sends a request, and expects a single response.
//
// It delegates to requestWithContext, passing nil for the header bytes.
func ( *Conn) ( context.Context,  string,  []byte) (*Msg, error) {
	return .requestWithContext(, , nil, )
}

// requestWithContext is the shared implementation used by
// RequestWithContext and RequestMsgWithContext. It takes a context, a
// subject and two byte slices (callers in this file pass header bytes,
// then payload data).
//
// It returns ErrInvalidContext or ErrInvalidConnection when the
// corresponding nil check fails, and the context's error if the context is
// already done. Otherwise it sends the request, using the old style when
// useOldRequestStyle reports true and createNewRequestAndSend otherwise,
// and waits for either a reply or the context to be done. A reply with
// empty data whose status header equals noResponders is reported as
// ErrNoResponders.
func ( *Conn) ( context.Context,  string, ,  []byte) (*Msg, error) {
	if  == nil {
		return nil, ErrInvalidContext
	}
	if  == nil {
		return nil, ErrInvalidConnection
	}
	// Check whether the context is done already before making
	// the request.
	if .Err() != nil {
		return nil, .Err()
	}

	var  *Msg
	var  error

	// If user wants the old style.
	if .useOldRequestStyle() {
		,  = .oldRequestWithContext(, , , )
	} else {
		// New style: createNewRequestAndSend returns three values; the code
		// below receives the reply from a channel and, on cancellation,
		// deletes an entry from respMap.
		, ,  := .createNewRequestAndSend(, , )
		if  != nil {
			return nil, 
		}

		var  bool

		select {
		case ,  = <-:
			// A receive that reports "not ok" means the channel was closed
			// without delivering a reply.
			if ! {
				return nil, ErrConnectionClosed
			}
		case <-.Done():
			// The context finished first: remove the pending entry from
			// respMap while holding the lock, then report the context error.
			.mu.Lock()
			delete(.respMap, )
			.mu.Unlock()
			return nil, .Err()
		}
	}
	// Check for no responder status.
	// This only applies when no error is set, the reply carries no data,
	// and its status header equals noResponders; the reply is then dropped
	// in favor of ErrNoResponders.
	if  == nil && len(.Data) == 0 && .Header.Get(statusHdr) == noResponders {
		,  = nil, ErrNoResponders
	}
	return , 
}

// oldRequestWithContext utilizes inbox and subscription per request.
//
// It creates a new inbox with NewInbox and a message channel with capacity
// RequestChanLen, subscribes, publishes the request, and then waits for the
// reply with NextMsgWithContext. Errors from subscribe and publish are
// returned unchanged.
func ( *Conn) ( context.Context,  string, ,  []byte) (*Msg, error) {
	 := .NewInbox()
	 := make(chan *Msg, RequestChanLen)

	,  := .subscribe(, _EMPTY_, nil, , nil, true, nil)
	if  != nil {
		return nil, 
	}
	// Only one message is expected, so auto-unsubscribe after one; the
	// deferred Unsubscribe runs on every return path after this point. The
	// results of both calls are discarded.
	.AutoUnsubscribe(1)
	defer .Unsubscribe()

	 = .publish(, , false, , )
	if  != nil {
		return nil, 
	}

	return .NextMsgWithContext()
}

// nextMsgWithContext returns the next message from the subscription's
// message channel, or an error.
//
// It takes a context and two bool flags. NOTE(review): judging by the
// comment on the non-blocking branch below, the flags appear to mean
// "internal caller" and "wait for a message" — confirm against callers.
// NextMsgWithContext passes false, true.
//
// It returns ErrInvalidContext or ErrBadSubscription when the corresponding
// nil check fails, the context's error if the context is already done or
// finishes while waiting, the error from validateNextMsgState or
// processNextMsgDelivered, the result of getNextMsgErr when the channel is
// closed, and errNoMessages on the non-waiting path when nothing is
// immediately available.
func ( *Subscription) ( context.Context, ,  bool) (*Msg, error) {
	if  == nil {
		return nil, ErrInvalidContext
	}
	if  == nil {
		return nil, ErrBadSubscription
	}
	if .Err() != nil {
		return nil, .Err()
	}

	// Validate the state while holding the lock; on failure the lock is
	// released before returning the error.
	.mu.Lock()
	 := .validateNextMsgState()
	if  != nil {
		.mu.Unlock()
		return nil, 
	}

	// snapshot
	// The message channel (mch) is copied while the lock is held and is
	// read below after the lock has been released.
	 := .mch
	.mu.Unlock()

	var  bool
	var  *Msg

	// If something is available right away, let's optimize that case.
	select {
	case ,  = <-:
		// A receive that reports "not ok" means the channel was closed.
		if ! {
			return nil, .getNextMsgErr()
		}
		if  := .processNextMsgDelivered();  != nil {
			return nil, 
		}
		return , nil
	default:
		// If internal and we don't want to wait, signal that there is no
		// message in the internal queue.
		if  && ! {
			return nil, errNoMessages
		}
	}

	// Nothing was immediately available: block until a message arrives, the
	// channel is closed, or the context is done.
	select {
	case ,  = <-:
		if ! {
			return nil, .getNextMsgErr()
		}
		if  := .processNextMsgDelivered();  != nil {
			return nil, 
		}
	case <-.Done():
		return nil, .Err()
	}

	return , nil
}

// NextMsgWithContext takes a context and returns the next message
// available to a synchronous subscriber, blocking until it is delivered
// or the context is canceled.
//
// It delegates to nextMsgWithContext with the flags false and true.
func ( *Subscription) ( context.Context) (*Msg, error) {
	return .nextMsgWithContext(, false, true)
}

// FlushWithContext will allow a context to control the duration
// of a Flush() call. This context should be non-nil and should
// have a deadline set. We will return an error if none is present.
//
// It returns ErrInvalidConnection or ErrInvalidContext when the
// corresponding nil check fails, ErrNoDeadlineContext when the context has
// no deadline, ErrConnectionClosed when the connection is closed or the
// signal channel is closed before a value arrives, and the context's error
// when the context is done first. It returns nil once a value is received
// on the signal channel.
func ( *Conn) ( context.Context) error {
	if  == nil {
		return ErrInvalidConnection
	}
	if  == nil {
		return ErrInvalidContext
	}
	// Only the presence of a deadline is checked here; the deadline value
	// itself is enforced through the Done channel below.
	,  := .Deadline()
	if ! {
		return ErrNoDeadlineContext
	}

	.mu.Lock()
	if .isClosed() {
		.mu.Unlock()
		return ErrConnectionClosed
	}
	// Create a buffered channel to prevent chan send to block
	// in processPong()
	 := make(chan struct{}, 1)
	.sendPing()
	.mu.Unlock()

	var  error

	select {
	case ,  := <-:
		if ! {
			// The channel was closed without a value being delivered.
			 = ErrConnectionClosed
		} else {
			// A value was received, so the flush completed; the channel is
			// closed here on the receiving side.
			close()
		}
	case <-.Done():
		 = .Err()
	}

	// On any failure, removeFlushEntry is called to clean up the entry for
	// this flush.
	if  != nil {
		.removeFlushEntry()
	}

	return 
}

// RequestWithContext will create an Inbox and perform a Request
// using the provided cancellation context with the Inbox reply
// for the data v. A response will be decoded into the vPtr last parameter.
//
// It returns ErrInvalidContext when the context is nil, and otherwise
// returns unchanged any error from encoding the value, performing the
// request on the underlying Conn, or decoding the response.
//
// Deprecated: Encoded connections are no longer supported.
func ( *EncodedConn) ( context.Context,  string,  any,  any) error {
	if  == nil {
		return ErrInvalidContext
	}

	// Encode the outgoing value with the connection's encoder (Enc).
	,  := .Enc.Encode(, )
	if  != nil {
		return 
	}
	,  := .Conn.RequestWithContext(, , )
	if  != nil {
		return 
	}
	// When the destination's dynamic type equals emptyMsgType, it is
	// asserted to *Msg and the response is copied into it without decoding;
	// otherwise the response data is decoded with Enc.Decode.
	if reflect.TypeOf() == emptyMsgType {
		 := .(*Msg)
		* = *
	} else {
		 := .Enc.Decode(.Subject, .Data, )
		if  != nil {
			return 
		}
	}

	return nil
}