package nats
import (
"context"
"encoding/json"
"github.com/nats-io/nats.go"
"github.com/pancsta/asyncmachine-go/pkg/integrations"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
)
func ExposeMachine (
ctx context .Context , mach am .Api , nc *nats .Conn , topic , queue string ,
) error {
var (
sub1 *nats .Subscription
err error
)
bind := func (msg *nats .Msg ) {
dispatcher (ctx , nc , mach , msg )
}
if queue != "" {
sub1 , err = nc .QueueSubscribe (topic , queue , bind )
} else {
sub1 , err = nc .Subscribe (topic , bind )
}
if err != nil {
return err
}
sub2 , err := nc .Subscribe (topic +"." +mach .Id (), bind )
if err != nil {
_ = sub1 .Unsubscribe ()
return err
}
go func () {
<-ctx .Done ()
_ = sub1 .Unsubscribe ()
_ = sub2 .Unsubscribe ()
}()
return err
}
func Add (
ctx context .Context , nc *nats .Conn , topic , machID string , states am .S ,
args am .A ) (am .Result , error ) {
req := integrations .NewMutationReq ()
req .Add = states
req .Args = args
reqJs , err := json .Marshal (req )
if err != nil {
return am .Canceled , err
}
msg , err := nc .RequestWithContext (ctx , topic +"." +machID , reqJs )
if err != nil {
return am .Canceled , err
}
var resp integrations .MutationResp
err = json .Unmarshal (msg .Data , &resp )
return resp .Result , err
}
func Remove (
ctx context .Context , nc *nats .Conn , machID , topic string , states am .S ,
args am .A ,
) (am .Result , error ) {
req := integrations .NewMutationReq ()
req .Remove = states
req .Args = args
reqJs , err := json .Marshal (req )
if err != nil {
return am .Canceled , err
}
msg , err := nc .RequestWithContext (ctx , topic +"." +machID , reqJs )
if err != nil {
return am .Canceled , err
}
var resp integrations .MutationResp
err = json .Unmarshal (msg .Data , &resp )
return resp .Result , err
}
func dispatcher(
ctx context .Context , nc *nats .Conn , mach am .Api , msg *nats .Msg ) {
var (
j []byte
err0 error
)
msgKind := integrations .MsgKindReq {}
if err := json .Unmarshal (msg .Data , &msgKind ); err != nil ||
!integrations .KindEnum .Contains (msgKind .Kind ) {
return
}
get := &integrations .GetterReq {}
mut := &integrations .MutationReq {}
wait := &integrations .WaitingReq {}
switch msgKind .Kind {
case integrations .KindReqGetter :
if err0 = json .Unmarshal (msg .Data , get ); err0 == nil {
resp , err := integrations .HandlerGetter (ctx , mach , get )
if err != nil {
err0 = err
} else {
j , err0 = json .Marshal (resp )
}
}
case integrations .KindReqMutation :
if err0 = json .Unmarshal (msg .Data , mut ); err0 == nil {
resp , err := integrations .HandlerMutation (ctx , mach , mut )
if err != nil {
err0 = err
} else {
j , err0 = json .Marshal (resp )
}
}
case integrations .KindReqWaiting :
if err0 = json .Unmarshal (msg .Data , wait ); err0 == nil {
resp , err := integrations .HandlerWaiting (ctx , mach , wait )
if err != nil {
err0 = err
} else {
j , err0 = json .Marshal (resp )
}
}
}
if err0 != nil {
mach .AddErr (err0 , nil )
return
}
if j != nil {
if msgKind .Kind == integrations .KindReqWaiting {
err0 = nc .Publish (msg .Subject , j )
} else {
err0 = msg .Respond (j )
}
if err0 != nil {
mach .AddErr (err0 , nil )
}
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .