package nats
import (
"context"
"reflect"
)
func (nc *Conn ) RequestMsgWithContext (ctx context .Context , msg *Msg ) (*Msg , error ) {
if msg == nil {
return nil , ErrInvalidMsg
}
hdr , err := msg .headerBytes ()
if err != nil {
return nil , err
}
return nc .requestWithContext (ctx , msg .Subject , hdr , msg .Data )
}
func (nc *Conn ) RequestWithContext (ctx context .Context , subj string , data []byte ) (*Msg , error ) {
return nc .requestWithContext (ctx , subj , nil , data )
}
func (nc *Conn ) requestWithContext (ctx context .Context , subj string , hdr , data []byte ) (*Msg , error ) {
if ctx == nil {
return nil , ErrInvalidContext
}
if nc == nil {
return nil , ErrInvalidConnection
}
if ctx .Err () != nil {
return nil , ctx .Err ()
}
var m *Msg
var err error
if nc .useOldRequestStyle () {
m , err = nc .oldRequestWithContext (ctx , subj , hdr , data )
} else {
mch , token , err := nc .createNewRequestAndSend (subj , hdr , data )
if err != nil {
return nil , err
}
var ok bool
select {
case m , ok = <- mch :
if !ok {
return nil , ErrConnectionClosed
}
case <- ctx .Done ():
nc .mu .Lock ()
delete (nc .respMap , token )
nc .mu .Unlock ()
return nil , ctx .Err ()
}
}
if err == nil && len (m .Data ) == 0 && m .Header .Get (statusHdr ) == noResponders {
m , err = nil , ErrNoResponders
}
return m , err
}
func (nc *Conn ) oldRequestWithContext (ctx context .Context , subj string , hdr , data []byte ) (*Msg , error ) {
inbox := nc .NewInbox ()
ch := make (chan *Msg , RequestChanLen )
s , err := nc .subscribe (inbox , _EMPTY_ , nil , ch , nil , true , nil )
if err != nil {
return nil , err
}
s .AutoUnsubscribe (1 )
defer s .Unsubscribe ()
err = nc .publish (subj , inbox , false , hdr , data )
if err != nil {
return nil , err
}
return s .NextMsgWithContext (ctx )
}
func (s *Subscription ) nextMsgWithContext (ctx context .Context , pullSubInternal , waitIfNoMsg bool ) (*Msg , error ) {
if ctx == nil {
return nil , ErrInvalidContext
}
if s == nil {
return nil , ErrBadSubscription
}
if ctx .Err () != nil {
return nil , ctx .Err ()
}
s .mu .Lock ()
err := s .validateNextMsgState (pullSubInternal )
if err != nil {
s .mu .Unlock ()
return nil , err
}
mch := s .mch
s .mu .Unlock ()
var ok bool
var msg *Msg
select {
case msg , ok = <- mch :
if !ok {
return nil , s .getNextMsgErr ()
}
if err := s .processNextMsgDelivered (msg ); err != nil {
return nil , err
}
return msg , nil
default :
if pullSubInternal && !waitIfNoMsg {
return nil , errNoMessages
}
}
select {
case msg , ok = <- mch :
if !ok {
return nil , s .getNextMsgErr ()
}
if err := s .processNextMsgDelivered (msg ); err != nil {
return nil , err
}
case <- ctx .Done ():
return nil , ctx .Err ()
}
return msg , nil
}
func (s *Subscription ) NextMsgWithContext (ctx context .Context ) (*Msg , error ) {
return s .nextMsgWithContext (ctx , false , true )
}
func (nc *Conn ) FlushWithContext (ctx context .Context ) error {
if nc == nil {
return ErrInvalidConnection
}
if ctx == nil {
return ErrInvalidContext
}
_ , ok := ctx .Deadline ()
if !ok {
return ErrNoDeadlineContext
}
nc .mu .Lock ()
if nc .isClosed () {
nc .mu .Unlock ()
return ErrConnectionClosed
}
ch := make (chan struct {}, 1 )
nc .sendPing (ch )
nc .mu .Unlock ()
var err error
select {
case _ , ok := <- ch :
if !ok {
err = ErrConnectionClosed
} else {
close (ch )
}
case <- ctx .Done ():
err = ctx .Err ()
}
if err != nil {
nc .removeFlushEntry (ch )
}
return err
}
func (c *EncodedConn ) RequestWithContext (ctx context .Context , subject string , v any , vPtr any ) error {
if ctx == nil {
return ErrInvalidContext
}
b , err := c .Enc .Encode (subject , v )
if err != nil {
return err
}
m , err := c .Conn .RequestWithContext (ctx , subject , b )
if err != nil {
return err
}
if reflect .TypeOf (vPtr ) == emptyMsgType {
mPtr := vPtr .(*Msg )
*mPtr = *m
} else {
err := c .Enc .Decode (m .Subject , m .Data , vPtr )
if err != nil {
return err
}
}
return nil
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .