package rpc
import (
"context"
"errors"
"fmt"
"maps"
"net"
"os"
"slices"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/rpc2"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)
// Shorthand aliases for the state-name definitions from pkg/rpc/states.
var (
// ssC holds the state names of the RPC client machine.
ssC = states .ClientStates
// ssCo holds the state names a consumer machine has to implement.
ssCo = states .ConsumerStates
)
// Client is an RPC client which connects to a remote source machine over TCP
// and tracks its clock in a local NetworkMachine. Its lifecycle is driven by
// the state machine in Mach, for which the Client itself acts as the handler
// struct (see NewClient).
type Client struct {
// ExceptionHandler is embedded; ExceptionState delegates to it.
*ExceptionHandler
// Mach is the client's own state machine, created in NewClient.
Mach *am .Machine
// Name is the name passed to NewClient ("rpc" when empty).
Name string
// Addr is the address dialed (tcp4) in ConnectingState.
Addr string
// NetMach is the network machine created in StartState. Nil before that.
NetMach *NetworkMachine
// Consumer is an optional machine which receives WorkerPayload mutations.
Consumer *am .Machine
// CallCount is incremented on every call and notify attempt.
CallCount uint64
// LogEnabled gates the log helper; initialised from EnvAmRpcLogClient.
LogEnabled bool
// DisconnCooldown is the wait between sending ServerBye and closing the RPC
// client. Default 10ms.
DisconnCooldown time .Duration
// LastMsgAt is not referenced in this part of the file.
// NOTE(review): presumably the time of the last received message - confirm.
LastMsgAt time .Time
// HelloDelay is an optional wait before the first hello in HandshakingState.
HelloDelay time .Duration
// ReconnectOn makes DisconnectedState request RetryingConn when the client
// was Connected or Connecting before. Default true.
ReconnectOn bool
// SyncNoSchema is copied from ClientOpts.NoSchema; when false, the hello
// message asks for the schema and carries the local schema hash.
SyncNoSchema bool
// SyncAllMutations is copied from ClientOpts.SyncMutations and sent in the
// hello message.
SyncAllMutations bool
// SyncAllowedStates, when non-nil, limits the tracked states to the listed
// ones.
SyncAllowedStates am .S
// SyncSkippedStates lists states removed from the tracked states.
SyncSkippedStates am .S
// SyncShallowClocks makes clock checksums cover the tracked states only.
SyncShallowClocks bool
// SyncMutationFiltering is passed to NewNetworkMachine in StartState.
SyncMutationFiltering bool
// ConnTimeout is the dial timeout. Default 3s.
ConnTimeout time .Duration
// ConnRetries is the maximum number of connection retry rounds. Default 15.
ConnRetries int
// ConnRetryTimeout is the total time budget for connection retries.
// Default 1 minute.
ConnRetryTimeout time .Duration
// ConnRetryDelay is the initial delay between connection retries, doubled
// after every round. Default 100ms.
ConnRetryDelay time .Duration
// ConnRetryBackoff is the upper bound of the retry delay. Default 3s.
ConnRetryBackoff time .Duration
// CallTimeout is the default per-call timeout. Default 3s (100s in debug).
CallTimeout time .Duration
// CallRetries is the maximum number of call retries. Default 15.
CallRetries int
// CallRetryTimeout is the total time budget for call retries.
// Default 1 minute.
CallRetryTimeout time .Duration
// CallRetryDelay is the initial delay between call retries, doubled after
// every attempt. Default 100ms.
CallRetryDelay time .Duration
// CallRetryBackoff is the upper bound of the call retry delay. Default 3s.
CallRetryBackoff time .Duration
// DisconnTimeout bounds the wait for the RPC client to close. Default 3s.
DisconnTimeout time .Duration
// netMachInt is the internal handle of NetMach used for clock updates.
netMachInt *NetMachInternal
// lockQueue serialises clock updates.
lockQueue sync .Mutex
// callLock serialises callFailsafe and notifyFailsafe.
callLock sync .Mutex
// rpc is the current rpc2 client, stored by bindRpcHandlers.
rpc atomic .Pointer [rpc2 .Client ]
// schema is the schema of the network source (a clone of the one passed to
// NewClient, replaced when the server sends a new one).
schema am .Schema
// conn is the TCP connection established in ConnectingState.
conn net .Conn
// tmpTestErr, when set, fails the next call once and is then cleared.
tmpTestErr error
// permTestErr, when set, fails every call.
permTestErr error
// connRetryRound counts connection retry rounds; reset in ConnectedState.
connRetryRound atomic .Int32
// trackedStates are the remote state names after applying the allowed and
// skipped filters.
trackedStates am .S
// trackedStateIdxs are the indexes of trackedStates within the remote
// state names.
trackedStateIdxs []int
}
// Compile-time checks that *Client satisfies the package's client interfaces.
var (
_ clientRpcMethods = &Client {}
_ clientServerMethods = &Client {}
)
// NewClient creates an RPC client for the network source listening at
// netSrcAddr. The client is returned stopped; call Start to connect.
//
// name defaults to "rpc" and opts to an empty ClientOpts. Unless
// opts.NoSchema is set, a nil netSrcSchema is replaced with an empty schema.
// An error is returned when netSrcAddr is empty, when the client machine
// cannot be created, or when opts.Consumer does not implement the consumer
// states.
func NewClient(
	ctx context.Context, netSrcAddr string, name string, netSrcSchema am.Schema,
	opts *ClientOpts,
) (*Client, error) {
	// the address is the only mandatory parameter
	if netSrcAddr == "" {
		return nil, errors.New("rpcc: workerAddr required")
	}
	if name == "" {
		name = "rpc"
	}
	if opts == nil {
		opts = &ClientOpts{}
	}
	if netSrcSchema == nil && !opts.NoSchema {
		netSrcSchema = am.Schema{}
	}

	// defaults shared by the connection and call settings
	const (
		defaultTimeout = 3 * time.Second
		retryTimeout   = time.Minute
		retryCount     = 15
		retryDelay     = 100 * time.Millisecond
		retryBackoff   = 3 * time.Second
	)

	c := &Client{
		ExceptionHandler: &ExceptionHandler{},
		Name:             name,
		Addr:             netSrcAddr,
		LogEnabled:       os.Getenv(EnvAmRpcLogClient) != "",
		ReconnectOn:      true,
		schema:           am.CloneSchema(netSrcSchema),

		// timeouts
		CallTimeout:     defaultTimeout,
		ConnTimeout:     defaultTimeout,
		DisconnTimeout:  defaultTimeout,
		DisconnCooldown: 10 * time.Millisecond,

		// sync options
		SyncNoSchema:          opts.NoSchema,
		SyncAllowedStates:     opts.AllowedStates,
		SyncSkippedStates:     opts.SkippedStates,
		SyncAllMutations:      opts.SyncMutations,
		SyncShallowClocks:     opts.SyncShallowClocks,
		SyncMutationFiltering: opts.MutationFiltering,

		// retries
		ConnRetryTimeout: retryTimeout,
		ConnRetries:      retryCount,
		ConnRetryDelay:   retryDelay,
		ConnRetryBackoff: retryBackoff,
		CallRetryTimeout: retryTimeout,
		CallRetries:      retryCount,
		CallRetryDelay:   retryDelay,
		CallRetryBackoff: retryBackoff,
	}

	// generous call timeout while debugging
	if amhelp.IsDebug() {
		c.CallTimeout = 100 * time.Second
	}

	// the client itself is the handler struct of its machine
	machOpts := &am.Opts{Tags: []string{
		"rpc-client",
		"addr:" + netSrcAddr,
	}}
	mach, err := am.NewCommon(ctx, GetClientId(name), states.ClientSchema,
		ssC.Names(), c, opts.Parent, machOpts)
	if err != nil {
		return nil, err
	}
	mach.SemLogger().SetArgsMapper(LogArgs)
	mach.SetGroups(states.ClientGroups, ssC)
	c.Mach = mach

	if os.Getenv(EnvAmRpcDbg) != "" {
		_ = amhelp.MachDebugEnv(mach)
	}

	// an optional consumer has to implement the consumer states
	if consumer := opts.Consumer; consumer != nil {
		implErr := amhelp.Implements(consumer.StateNames(), ssCo.Names())
		if implErr != nil {
			return nil, fmt.Errorf(
				"consumer has to implement pkg/rpc/states/ConsumerStatesDef: %w",
				implErr)
		}
		c.Consumer = consumer
	}

	return c, nil
}
// StartState creates the network machine (with a random ID suffix) bound to
// the Start state context and stores it, together with its internal handle,
// on the client. A creation error is added to the client machine.
func (c *Client) StartState(e *am.Event) {
	stateCtx := c.Mach.NewStateCtx(ssC.Start)

	conn := &clientNetMachConn{rpc: c}
	names := slices.Collect(maps.Keys(c.schema))
	machId := PrefixNetMach + utils.RandId(5)

	nm, internal, err := NewNetworkMachine(stateCtx, machId, conn, c.schema,
		names, c.Mach, nil, c.SyncMutationFiltering)
	if err != nil {
		c.Mach.AddErr(err, nil)
		return
	}

	c.NetMach, c.netMachInt = nm, internal
}
// StartEnd requests a graceful disconnect when Start is removed while the
// client was Connecting or Connected. Nothing happens when, before the
// transition, Connecting was active together with Exception.
func (c *Client) StartEnd(e *am.Event) {
	prev := e.Transition().TimeBefore
	index := e.Machine().Index1

	// failed while connecting - nothing to disconnect
	failedConnecting := prev.Is([]int{index(ssC.Connecting), index(ssC.Exception)})
	if failedConnecting {
		return
	}

	if !prev.Is1(index(ssC.Connecting)) && !prev.Is1(index(ssC.Connected)) {
		return
	}

	// graceful disconnect
	c.Mach.EvAdd1(e, ssC.Disconnecting, nil)
}
// ConnectingState dials Addr over tcp4 in a goroutine bound to the Connecting
// state context. On success the connection is stored, an rpc2 client is bound
// to it and started, and Connected is requested. On a dial error Disconnected
// is requested and a network error is added.
func (c *Client) ConnectingState(e *am.Event) {
	stateCtx := c.Mach.NewStateCtx(ssC.Connecting)

	go func() {
		if stateCtx.Err() != nil {
			return // expired
		}

		// longer dial timeout while debugging
		dialTimeout := c.ConnTimeout
		if amhelp.IsDebug() {
			dialTimeout = 100 * time.Second
		}
		dialer := net.Dialer{Timeout: dialTimeout}

		c.Mach.Log("dialing %s", c.Addr)
		tcpConn, dialErr := dialer.DialContext(stateCtx, "tcp4", c.Addr)
		switch {
		case stateCtx.Err() != nil:
			return // expired
		case dialErr != nil:
			c.Mach.EvAdd1(e, ssC.Disconnected, nil)
			AddErrNetwork(e, c.Mach, dialErr)
			return
		}

		c.conn = tcpConn
		rpcClient := c.bindRpcHandlers(tcpConn)
		go rpcClient.Run()

		c.Mach.EvAdd1(e, ssC.Connected, nil)
	}()
}
// DisconnectingEnter allows the Disconnecting state only when both an RPC
// client and a network connection exist.
func (c *Client) DisconnectingEnter(e *am.Event) bool {
	if c.rpc.Load() == nil {
		return false
	}

	return c.conn != nil
}
// DisconnectingState performs a graceful disconnect in a goroutine bound to
// the Disconnecting state context: it notifies the server with ServerBye,
// waits DisconnCooldown, closes the RPC client (bounded by DisconnTimeout) and
// finally requests the Disconnected state. When the state context expires
// along the way, ensureGroupConnected is called instead.
func (c *Client ) DisconnectingState (e *am .Event ) {
ctx := c .Mach .NewStateCtx (ssC .Disconnecting )
go func () {
if ctx .Err () != nil {
return
}
// best-effort goodbye; the result of notify is ignored
c .notify (ctx , ServerBye .Value , &MsgEmpty {})
// give the server a moment before closing the connection
if !amhelp .Wait (ctx , c .DisconnCooldown ) {
c .ensureGroupConnected (e )
return
}
if c .rpc .Load () != nil {
// whichever comes first: close timeout, close completion, or state exit
select {
case <- time .After (c .DisconnTimeout ):
c .log ("rpc.Close timeout" )
case <- amhelp .ExecAndClose (func () {
_ = c .rpc .Load ().Close ()
}):
c .log ("rpc.Close" )
case <- ctx .Done ():
}
}
if ctx .Err () != nil {
c .ensureGroupConnected (e )
return
}
c .Mach .EvAdd1 (e , ssC .Disconnected , nil )
}()
}
// ConnectedState resets the connection retry counter and watches the RPC
// client's disconnect notification for as long as Connected is active. When
// the notification fires, Disconnected is requested.
func (c *Client) ConnectedState(e *am.Event) {
	stateCtx := c.Mach.NewStateCtx(ssC.Connected)
	dropped := c.rpc.Load().DisconnectNotify()

	// a successful connection starts a fresh retry budget
	c.connRetryRound.Store(0)

	go func() {
		select {
		case <-dropped:
			c.log("rpc.DisconnectNotify")
			c.Mach.EvAdd1(e, ssC.Disconnected, nil)

		case <-stateCtx.Done():
			// expired
		}
	}()
}
// DisconnectedEnter rejects the Disconnected state while a Disconnecting
// mutation is still queued.
func (c *Client) DisconnectedEnter(e *am.Event) bool {
	disconnectingQueued := c.Mach.WillBe1(ssC.Disconnecting)

	return !disconnectingQueued
}
// DisconnectedState either schedules a reconnect (when the client was
// Connected or Connecting before the transition and ReconnectOn is set) or
// closes the network connection, if there is one.
func (c *Client) DisconnectedState(e *am.Event) {
	before := e.Transition().TimeBefore
	wasActive := before.Any1(
		c.Mach.Index1(ssC.Connected), c.Mach.Index1(ssC.Connecting))

	// lost an active connection - try again
	if wasActive && c.ReconnectOn {
		c.Mach.EvAdd1(e, ssC.RetryingConn, nil)
		return
	}

	if conn := c.conn; conn != nil {
		_ = conn.Close()
	}
}
// HandshakingState runs the handshake with the server in a goroutine:
//  1. optionally waits HelloDelay,
//  2. sends ServerHello (with retries and a doubling delay) until it succeeds,
//  3. validates the response and applies the received states and schema,
//  4. sends ServerHandshake,
//  5. requests HandshakeDone with the remote ID, machine time and queue tick.
//
// A failed hello or handshake call requests RetryingConn; an invalid response
// adds an RPC error.
func (c *Client ) HandshakingState (e *am .Event ) {
// NOTE(review): the context is bound to Connected, not Handshaking - confirm
// this is intended.
ctx := c .Mach .NewStateCtx (ssC .Connected )
go func () {
if ctx .Err () != nil {
return
}
resp := &MsgSrvHello {}
if c .HelloDelay > 0 {
if !amhelp .Wait (ctx , c .HelloDelay ) {
return
}
}
ok := false
delay := c .CallRetryDelay
// each hello attempt gets half of the regular call timeout
timeout := c .CallTimeout / 2
// NOTE(review): the loop is bounded by ConnRetries but paced by
// CallRetryDelay and CallRetryBackoff - confirm the mix is intended.
for i := 0 ; i < c .ConnRetries ; i ++ {
msg := MsgCliHello {
Id : c .Mach .Id (),
SyncSchema : !c .SyncNoSchema ,
SyncMutations : c .SyncAllMutations ,
ShallowClocks : c .SyncShallowClocks ,
SkippedStates : c .SyncSkippedStates ,
AllowedStates : c .SyncAllowedStates ,
}
// include the hash of the local schema when syncing schemas
if !c .SyncNoSchema {
msg .SyncSchema = true
msg .SchemaHash = amhelp .SchemaHash (c .NetMach .Schema ())
}
if c .call (ctx , ServerHello .Value , msg , resp , timeout ) {
ok = true
c .log ("hello ok on %d try" , i +1 )
break
}
if !amhelp .Wait (ctx , delay ) {
return
}
// double the delay, capped at CallRetryBackoff
if c .CallRetryBackoff > 0 {
delay *= 2
if delay > c .CallRetryBackoff {
delay = c .CallRetryBackoff
}
}
}
if !ok {
c .Mach .EvAdd1 (e , ssC .RetryingConn , nil )
return
}
// validate the response
stateNames := resp .Serialized .StateNames
if len (stateNames ) == 0 {
AddErrRpcStr (e , c .Mach , "states missing" )
return
}
if resp .Serialized .ID == "" {
AddErrRpcStr (e , c .Mach , "ID missing" )
return
}
if c .NetMach .Schema () == nil && resp .Schema == nil && !c .SyncNoSchema {
AddErrRpcStr (e , c .Mach , "schema missing" )
return
}
c .updateStatesSchema (resp )
// attach the debugger to the network machine on the first handshake only
if c .Mach .Tick (ssC .Handshaking ) == 1 && os .Getenv (EnvAmRpcDbg ) != "" {
_ = amhelp .MachDebugEnv (c .NetMach )
}
// NOTE(review): assumes the network machine has at least 2 tags - confirm.
c .NetMach .tags [1 ] = "src-id:" + resp .Serialized .ID
c .NetMach .remoteId = resp .Serialized .ID
// confirm the handshake using the default call timeout
if !c .call (ctx , ServerHandshake .Value , &MsgEmpty {}, &MsgEmpty {}, 0 ) {
c .Mach .EvAdd1 (e , ssC .RetryingConn , nil )
return
}
c .Mach .EvAdd1 (e , ssC .HandshakeDone , Pass (&A {
Id : resp .Serialized .ID ,
MachTime : resp .Serialized .Time ,
QueueTick : resp .Serialized .QueueTick ,
}))
}()
}
// HandshakeDoneEnter accepts the HandshakeDone state only when the arguments
// carry a remote ID, a machine time and a positive queue tick.
func (c *Client) HandshakeDoneEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)

	switch {
	case args.Id == "":
		return false
	case args.MachTime == nil:
		return false
	default:
		return args.QueueTick > 0
	}
}
// HandshakeDoneState finalises the handshake: it renames the network machine
// after the client, applies the clock received from the server and logs the
// result.
func (c *Client) HandshakeDoneState(e *am.Event) {
	args := ParseArgs(e.Args)
	nm := c.NetMach

	// the final ID of the network machine derives from the client's name
	nm.id = PrefixNetMach + c.Name
	c.clockSet(args.MachTime, args.QueueTick, args.MachTick)

	c.log("connected to %s", nm.remoteId)
	timeSum, queueTick := nm.Time(nil).Sum(nil), nm.QueueTick()
	c.log("time t%d q%d: %v", timeSum, queueTick, args.MachTime)
}
// CallRetryFailedState immediately requests the removal of CallRetryFailed,
// so the state is active only momentarily.
func (c *Client ) CallRetryFailedState (e *am .Event ) {
c .Mach .EvRemove1 (e , ssC .CallRetryFailed , nil )
}
// RetryingCallEnter allows the RetryingCall state only while the client is
// either Connected or RetryingConn.
func (c *Client ) RetryingCallEnter (e *am .Event ) bool {
return c .Mach .Any1 (ssC .Connected , ssC .RetryingConn )
}
// ExceptionState delegates to the embedded ExceptionHandler and then requests
// the removal of the Exception state.
func (c *Client ) ExceptionState (e *am .Event ) {
c .ExceptionHandler .ExceptionState (e )
c .Mach .EvRemove1 (e , am .StateException , nil )
}
// RetryingConnState re-attempts the connection in a goroutine bound to the
// RetryingConn state context. Each round waits (the delay doubles up to
// ConnRetryBackoff), requests Connecting and waits for it to end. The loop
// stops after ConnRetries rounds or once ConnRetryTimeout has elapsed; then
// RetryingConn is removed and ConnRetryFailed is requested.
func (c *Client) RetryingConnState(e *am.Event) {
	stateCtx := c.Mach.NewStateCtx(ssC.RetryingConn)
	wait := c.ConnRetryDelay
	startedAt := time.Now()

	go func() {
		for {
			outOfRounds := c.connRetryRound.Load() >= int32(c.ConnRetries)
			if stateCtx.Err() != nil || outOfRounds {
				break
			}
			c.connRetryRound.Add(1)

			// wait before the next attempt
			if !amhelp.Wait(stateCtx, wait) {
				return // expired
			}

			// try to connect
			amhelp.Add1Sync(stateCtx, c.Mach, ssC.Connecting, nil)
			if stateCtx.Err() != nil {
				return // expired
			}

			// wait for the attempt to finish
			_ = amhelp.WaitForErrAny(stateCtx, c.ConnTimeout*2, c.Mach,
				c.Mach.WhenNot1(ssC.Connecting, stateCtx))
			if stateCtx.Err() != nil {
				return // expired
			}

			// clear the error of the failed attempt
			c.Mach.EvRemove1(e, ssC.Exception, nil)

			// exponential backoff with an upper bound
			if c.ConnRetryBackoff > 0 {
				wait = min(wait*2, c.ConnRetryBackoff)
			}

			// total time budget
			if c.ConnRetryTimeout > 0 && time.Since(startedAt) > c.ConnRetryTimeout {
				break
			}
		}

		if stateCtx.Err() != nil {
			return // expired
		}

		// give up
		c.Mach.EvRemove1(e, ssC.RetryingConn, nil)
		c.Mach.EvAdd1(e, ssC.ConnRetryFailed, nil)
	}()
}
// WorkerPayloadEnter accepts a payload only when a consumer is set and the
// arguments carry a payload. A missing payload adds an ErrDelivery error
// tagged with the payload name.
func (c *Client) WorkerPayloadEnter(e *am.Event) bool {
	if c.Consumer == nil {
		return false
	}

	args := ParseArgs(e.Args)
	if args.Payload != nil {
		return true
	}

	c.Mach.AddErrState(ssC.ErrDelivery, errors.New("invalid payload"),
		Pass(&A{Name: args.Name}))

	return false
}
// WorkerPayloadState forwards the received payload (and its name) to the
// consumer machine as a WorkerPayload mutation.
func (c *Client) WorkerPayloadState(e *am.Event) {
	in := ParseArgs(e.Args)

	c.Consumer.EvAdd1(e, ssCo.WorkerPayload, Pass(&A{
		Name:    in.Name,
		Payload: in.Payload,
	}))
}
// HealthcheckState requests its own removal and makes sure the client is in
// (or heading to) one of the states of the Connected group.
func (c *Client ) HealthcheckState (e *am .Event ) {
c .Mach .EvRemove1 (e , ssC .Healthcheck , nil )
c .ensureGroupConnected (e )
}
// Start activates the client by adding the Start and Connecting states, and
// returns the result of that mutation.
func (c *Client) Start() am.Result {
	initial := am.S{ssC.Start, ssC.Connecting}

	return c.Mach.Add(initial, nil)
}
// Stop deactivates the client by removing the Start state and returns the
// result of that mutation.
//
// When waitTillExit is non-nil and the mutation was not canceled, Stop waits
// (up to 2 seconds, or until waitTillExit expires) for the Disconnected
// state. When dispose is true, the client machine is disposed, as well as the
// network machine if one has been created.
func (c *Client) Stop(waitTillExit context.Context, dispose bool) am.Result {
	res := c.Mach.Remove1(ssC.Start, nil)

	// wait for the disconnect
	if res != am.Canceled && waitTillExit != nil {
		_ = amhelp.WaitForAll(waitTillExit, 2*time.Second,
			c.Mach.When1(ssC.Disconnected, nil))
	}

	if dispose {
		c.log("disposing")
		c.Mach.Dispose()
		// NetMach is only created in StartState, so it is nil for a client
		// which was never started (or whose network machine failed to init).
		if c.NetMach != nil {
			c.NetMach.Dispose()
		}
	}

	return res
}
// IsPartial reports whether the client tracks only a subset of the remote
// states, i.e. whether an allow-list or a skip-list of states is configured.
func (c *Client) IsPartial() bool {
	return len(c.SyncAllowedStates) > 0 || len(c.SyncSkippedStates) > 0
}
// GetKind returns KindClient.
func (c *Client ) GetKind () Kind {
return KindClient
}
// Sync requests the full clock from the server and applies it to the network
// machine. It returns the network machine's time, or nil when the call failed
// or the received clock has a different length than the list of known states.
func (c *Client) Sync() am.Time {
	c.Mach.Add1(ssC.MetricSync, nil)

	reply := &MsgSrvSync{}
	if !c.callFailsafe(c.Mach.Ctx(), ServerSync.Value, &MsgEmpty{}, reply) {
		return nil
	}

	// validate
	if n := len(reply.Time); n > 0 && n != len(c.NetMach.StateNames()) {
		AddErrRpcStr(nil, c.Mach, "wrong clock len")
		return nil
	}

	c.clockSet(reply.Time, reply.QueueTick, reply.MachTick)

	return c.NetMach.machTime
}
// Args requests the list of argument names from the server (ServerArgs).
// It returns nil when the call fails.
func (c *Client) Args() []string {
	reply := &MsgSrvArgs{}
	if c.callFailsafe(c.Mach.Ctx(), ServerArgs.Value, &MsgEmpty{}, reply) {
		return reply.Args
	}

	return nil
}
// updateStatesSchema applies a hello (or schema change) message to the network
// machine: the schema (when included), state names, queue tick and clock.
// It then recomputes the list of tracked states and their indexes, based on
// SyncAllowedStates and SyncSkippedStates.
func (c *Client ) updateStatesSchema (resp *MsgSrvHello ) {
netMach := c .NetMach
// both the schema and the clock are modified, so both locks are held
netMach .schemaMx .Lock ()
defer netMach .schemaMx .Unlock ()
netMach .clockMx .Lock ()
defer netMach .clockMx .Unlock ()
// the schema is optional in the message
if resp .Schema != nil {
c .schema = resp .Schema
netMach .schema = resp .Schema
}
netMach .stateNames = resp .Serialized .StateNames
netMach .queueTick = resp .Serialized .QueueTick
netMach .machTime = resp .Serialized .Time
// NOTE(review): assumes Serialized.Time has at least as many entries as
// Serialized.StateNames, otherwise the indexing below panics - confirm.
for idx , state := range netMach .stateNames {
netMach .machClock [state ] = netMach .machTime [idx ]
}
netMach .subs .SetClock (netMach .machClock )
c .log ("registered %d states" , len (netMach .stateNames ))
// tracked states: all states, narrowed to the allowed ones, minus skipped
c .trackedStates = netMach .stateNames
if c .SyncAllowedStates != nil {
c .trackedStates = am .StatesShared (c .trackedStates , c .SyncAllowedStates )
}
c .trackedStates = am .StatesDiff (c .trackedStates , c .SyncSkippedStates )
// map tracked states to their indexes within all the state names
c .trackedStateIdxs = make ([]int , len (c .trackedStates ))
for i , name := range c .trackedStates {
c .trackedStateIdxs [i ] = slices .Index (netMach .stateNames , name )
}
}
// clockFromUpdate applies a differential update to the passed clock values
// and returns the resulting machine time, queue tick and machine tick.
//
// update.Indexes and update.Ticks are parallel lists: Ticks[i] is the delta
// added to the clock of the state at Indexes[i]. Indexes beyond the current
// clock are skipped, and entries without a matching tick are ignored.
// timeBefore is not modified; a copy is returned.
func (c *Client) clockFromUpdate(
	update *MsgSrvUpdate, timeBefore am.Time, qTickBefore uint64,
	machTickBefore uint32,
) (am.Time, uint64, uint32) {
	timeAfter := slices.Clone(timeBefore)

	for i, idx := range update.Indexes {
		// malformed update: more indexes than ticks
		if i >= len(update.Ticks) {
			break
		}
		// compare as ints, so the clock length is never truncated
		if int(idx) >= len(timeAfter) {
			continue
		}
		timeAfter[idx] += uint64(update.Ticks[i])
	}

	qTickAfter := qTickBefore + uint64(update.QueueTick)
	machTickAfter := machTickBefore + uint32(update.MachTick)

	return timeAfter, qTickAfter, machTickAfter
}
// ensureGroupConnected requests the Disconnected state when none of the
// states from the Connected group is active and none is queued.
func (c *Client) ensureGroupConnected(e *am.Event) {
	group := states.ClientGroups.Connected

	if c.Mach.Any1(group...) {
		return
	}
	if c.Mach.WillBe(group) {
		return
	}

	c.Mach.EvAdd1(e, ssC.Disconnected, nil)
}
// log writes a message to the client machine's log, but only when LogEnabled
// is set.
func (c *Client) log(msg string, args ...any) {
	if c.LogEnabled {
		c.Mach.Log(msg, args...)
	}
}
// bindRpcHandlers creates a new rpc2 client on top of conn, registers the
// handlers for the methods called by the server, switches the client to
// blocking mode and stores it as the current RPC client. The client is
// returned without being started.
func (c *Client) bindRpcHandlers(conn net.Conn) *rpc2.Client {
	c.log("new rpc2 client")
	rc := rpc2.NewClient(conn)

	// server -> client methods
	rc.Handle(ClientUpdate.Value, c.RemoteUpdate)
	rc.Handle(ClientUpdateMutations.Value, c.RemoteUpdateMutations)
	rc.Handle(ClientSendPayload.Value, c.RemoteSendPayload)
	rc.Handle(ClientBye.Value, c.RemoteBye)
	rc.Handle(ClientSchemaChange.Value, c.RemoteSchemaChange)

	rc.SetBlocking(true)
	c.rpc.Store(rc)

	return rc
}
// clockSet replaces the clock of the network machine with a full clock
// received from the server. It is a no-op before HandshakeDone and for a nil
// mTime.
func (c *Client ) clockSet (mTime am .Time , qTick uint64 , machTick uint32 ) {
if c .Mach .Not1 (ssC .HandshakeDone ) {
return
}
// serialise with other clock updates
c .lockQueue .Lock ()
defer c .lockQueue .Unlock ()
if mTime == nil {
return
}
// the sum is used for logging only
var sum uint64
for _ , v := range mTime {
sum += v
}
c .log ("clockUpdate full OK t%d q%d" , sum , qTick )
// NOTE(review): the lock taken here is not released in this function;
// presumably UpdateClock releases it - confirm.
c .netMachInt .Lock ()
c .netMachInt .UpdateClock (mTime , qTick , machTick )
}
// clockUpdate applies a differential clock update to the network machine,
// after verifying the update's checksum against the resulting clock.
//
// queueLocked tells that the caller already holds lockQueue. The function
// returns true when the update was applied (or ignored because the handshake
// is not done yet) and false on a checksum mismatch or a nil resulting time.
func (c *Client ) clockUpdate (update *MsgSrvUpdate , queueLocked bool ) bool {
if c .Mach .Not1 (ssC .HandshakeDone ) {
return true
}
if !queueLocked {
c .lockQueue .Lock ()
defer c .lockQueue .Unlock ()
}
// NOTE(review): the failure paths below release netMach.clockMx directly,
// while the success path leaves the release to UpdateClock; presumably
// netMachInt.Lock() acquires clockMx - confirm.
c .netMachInt .Lock ()
netMach := c .NetMach
mTime , qTick , machTick := c .clockFromUpdate (update , netMach .machTime ,
netMach .queueTick , netMach .machTick )
// with shallow clocks, the checksum covers the tracked states only
checksumTime := mTime
if c .SyncShallowClocks {
checksumTime = am .NewTime (checksumTime , c .trackedStateIdxs )
}
check := Checksum (checksumTime .Sum (nil ), qTick , machTick )
if check != update .Checksum {
c .Mach .Log ("clockUpdate mismatch %d != %d" , update .Checksum , check )
c .log ("msg q%d m%d ch%d %+v" , update .QueueTick , update .MachTick ,
update .Checksum , update .Indexes )
c .log ("clock t%d q%d m%d ch%d (%+v)" , mTime .Sum (nil ), qTick ,
machTick , check , mTime )
netMach .clockMx .Unlock ()
return false
}
if mTime == nil {
netMach .clockMx .Unlock ()
return false
}
c .log ("clockUpdate diff OK tt%d q%d" , mTime .Sum (nil ), qTick )
c .netMachInt .UpdateClock (mTime , qTick , machTick )
return true
}
// clockUpdateMutations applies a list of differential clock updates in order,
// holding lockQueue for the whole batch. It stops at, and returns false for,
// the first update which fails. Before HandshakeDone it is a no-op returning
// true.
func (c *Client) clockUpdateMutations(msgs *MsgSrvUpdateMuts) bool {
	if c.Mach.Not1(ssC.HandshakeDone) {
		return true
	}

	c.lockQueue.Lock()
	defer c.lockQueue.Unlock()

	updates := msgs.Updates
	for i := range updates {
		// the queue is already locked for the whole batch
		if applied := c.clockUpdate(&updates[i], true); !applied {
			return false
		}
	}

	return true
}
// callFailsafe performs an RPC call and retries it on failure. Calls are
// serialised with callLock, which is held for the whole retry sequence.
//
// After a failed first attempt the RetryingCall state is added; each retry
// waits (the delay doubles up to CallRetryBackoff) and then waits for the
// Ready state. Retrying stops after CallRetries attempts, once
// CallRetryTimeout has elapsed, or when ctx expires. A successful retry
// removes RetryingCall, otherwise CallRetryFailed is added.
//
// It returns true when the call eventually succeeded.
func (c *Client) callFailsafe(
	ctx context.Context, method string, args, resp any,
) bool {
	name := ServerMethods.Parse(method).Value

	// validate
	if c.rpc.Load() == nil {
		AddErrNoConn(nil, c.Mach, errors.New(name))
		return false
	}

	c.callLock.Lock()
	defer c.callLock.Unlock()

	// happy path
	if c.call(ctx, method, args, resp, 0) {
		return true
	}

	// failure, start retrying
	startedAt := time.Now()
	succeeded := false
	wait := c.CallRetryDelay
	c.Mach.Add1(ssC.RetryingCall, Pass(&A{
		Method:    name,
		StartedAt: startedAt,
	}))

	// report the outcome of the retries
	defer func() {
		if !succeeded {
			c.Mach.Add1(ssC.CallRetryFailed, Pass(&A{Method: name}))
			return
		}
		c.Mach.Remove1(ssC.RetryingCall, nil)
	}()

	for attempt := 0; attempt < c.CallRetries; attempt++ {
		// wait before the next attempt
		if !amhelp.Wait(ctx, wait) {
			return false
		}

		// wait for the connection to be ready again
		<-c.Mach.When1(ssC.Ready, ctx)
		if ctx.Err() != nil {
			return false
		}

		if c.call(ctx, method, args, resp, 0) {
			succeeded = true
			return true
		}

		// exponential backoff with an upper bound
		if c.CallRetryBackoff > 0 {
			wait = min(wait*2, c.CallRetryBackoff)
		}

		// total time budget
		if c.CallRetryTimeout > 0 && time.Since(startedAt) > c.CallRetryTimeout {
			return false
		}
	}

	return false
}
// call performs a single blocking RPC call and reports whether it succeeded.
//
// timeout limits the duration of the call; 0 means CallTimeout. Failures are
// added to the client machine as errors: a timeout as ErrNetworkTimeout, an
// injected test error as a network error and any other error under the
// method's name. An expired ctx returns false without adding an error.
// Panics (e.g. a missing RPC client) are converted to machine errors.
func (c *Client) call(
	ctx context.Context, method string, args, resp any, timeout time.Duration,
) bool {
	defer c.Mach.PanicToErr(nil)

	mName := ServerMethods.Parse(method).Value
	c.CallCount++
	if timeout == 0 {
		timeout = c.CallTimeout
	}

	callCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// call using the deadline-bound context, so the timeout actually
	// interrupts a hanging call
	err := c.rpc.Load().CallWithContext(callCtx, method, args, resp)
	// parent context expired - not an error
	if ctx.Err() != nil {
		return false
	}
	if callCtx.Err() != nil {
		c.Mach.AddErrState(ssC.ErrNetworkTimeout, callCtx.Err(), nil)
		return false
	}

	// injected test errors: one-shot first, then the permanent one
	if c.tmpTestErr != nil {
		AddErrNetwork(nil, c.Mach, fmt.Errorf("%w: %s", c.tmpTestErr, mName))
		c.tmpTestErr = nil
		return false
	}
	if c.permTestErr != nil {
		AddErrNetwork(nil, c.Mach, fmt.Errorf("%w: %s", c.permTestErr, mName))
		return false
	}

	if err != nil {
		AddErr(nil, c.Mach, mName, err)
		return false
	}

	return true
}
// notifyFailsafe sends an RPC notification (a call without a response) and
// retries it on failure. Notifications are serialised with callLock, which is
// held for the whole retry sequence.
//
// After a failed first attempt the RetryingCall state is added; each retry
// waits first (the delay doubles up to CallRetryBackoff). Retrying stops after
// CallRetries attempts, once CallRetryTimeout has elapsed, or when ctx
// expires. A successful retry removes RetryingCall, otherwise CallRetryFailed
// is added.
//
// It returns true when the notification was eventually sent.
func (c *Client) notifyFailsafe(
	ctx context.Context, method string, args any,
) bool {
	mName := ServerMethods.Parse(method).Value

	// validate
	if c.rpc.Load() == nil {
		AddErrNoConn(nil, c.Mach, errors.New(mName))
		return false
	}

	c.callLock.Lock()
	defer c.callLock.Unlock()

	// happy path
	if c.notify(ctx, method, args) {
		return true
	}

	// failure, start retrying
	start := time.Now()
	worked := false
	delay := c.CallRetryDelay
	c.Mach.Add1(ssC.RetryingCall, Pass(&A{
		Method:    mName,
		StartedAt: start,
	}))

	// report the outcome of the retries
	defer func() {
		if worked {
			c.Mach.Remove1(ssC.RetryingCall, nil)
		} else {
			c.Mach.Add1(ssC.CallRetryFailed, Pass(&A{Method: mName}))
		}
	}()

	for i := 0; i < c.CallRetries; i++ {
		// context-aware wait (same as in callFailsafe), so a canceled ctx
		// does not keep callLock held for the remaining delays
		if !amhelp.Wait(ctx, delay) {
			return false
		}

		if c.notify(ctx, method, args) {
			// mark the success, otherwise the deferred func reports a failure
			worked = true
			return true
		}

		// exponential backoff with an upper bound
		if c.CallRetryBackoff > 0 {
			delay = min(delay*2, c.CallRetryBackoff)
		}

		// total time budget
		if c.CallRetryTimeout > 0 && time.Since(start) > c.CallRetryTimeout {
			break
		}
	}

	return false
}
// notify sends a single RPC notification and reports whether it succeeded.
// The connection deadline is set to CallTimeout from now for the duration of
// the send and cleared afterwards. Failures are added to the client machine
// as errors, except for an expired ctx. Panics are converted to machine
// errors.
func (c *Client) notify(
	ctx context.Context, method string, args any,
) bool {
	defer c.Mach.PanicToErr(nil)

	mName := ServerMethods.Parse(method).Value

	// limit the duration of the send
	deadline := time.Now().Add(c.CallTimeout)
	if err := c.conn.SetDeadline(deadline); err != nil {
		AddErr(nil, c.Mach, mName, err)
		return false
	}

	c.CallCount++
	sendErr := c.rpc.Load().Notify(method, args)
	switch {
	case ctx.Err() != nil:
		return false // expired
	case sendErr != nil:
		AddErr(nil, c.Mach, method, sendErr)
		return false
	}

	// remove the deadline
	if err := c.conn.SetDeadline(time.Time{}); err != nil {
		AddErr(nil, c.Mach, mName, err)
		return false
	}

	return true
}
// RemoteUpdate is the RPC handler for a differential clock update pushed by
// the server. A nil update adds a params error. The handler always returns
// nil.
func (c *Client ) RemoteUpdate (
_ *rpc2 .Client , update *MsgSrvUpdate , _ *MsgEmpty ,
) error {
if update == nil {
AddErrParams (nil , c .Mach , nil )
return nil
}
// NOTE(review): the result is ignored here, while RemoteUpdateMutations
// triggers a full Sync on failure - confirm this is intended.
c .clockUpdate (update , false )
return nil
}
// RemoteUpdateMutations is the RPC handler for a batch of differential clock
// updates pushed by the server. A nil batch adds a params error; a batch
// which fails to apply triggers a full Sync. The handler always returns nil.
func (c *Client) RemoteUpdateMutations(
	_ *rpc2.Client, updates *MsgSrvUpdateMuts, _ *MsgEmpty,
) error {
	if updates == nil {
		AddErrParams(nil, c.Mach, nil)
		return nil
	}

	if applied := c.clockUpdateMutations(updates); applied {
		return nil
	}

	// fall back to a full sync
	c.Sync()

	return nil
}
// RemoteSendingPayload is an RPC handler which adds the WorkerDelivering
// state, passing the payload and its name as arguments. A nil payload adds a
// params error, like in the other Remote* handlers. The handler always
// returns nil.
func (c *Client) RemoteSendingPayload(
	_ *rpc2.Client, payload *MsgSrvPayload, _ *MsgEmpty,
) error {
	// validate, payload.Name is read below
	if payload == nil {
		AddErrParams(nil, c.Mach, nil)
		return nil
	}

	c.log("RemoteSendingPayload %s", payload.Name)
	c.Mach.Add1(ssC.WorkerDelivering, Pass(&A{
		Payload: payload,
		Name:    payload.Name,
	}))

	return nil
}
// RemoteSendPayload is the RPC handler for a payload sent by the server. It
// adds the WorkerPayload state, passing the payload and its name as
// arguments. A nil payload adds a params error, like in the other Remote*
// handlers. The handler always returns nil.
func (c *Client) RemoteSendPayload(
	_ *rpc2.Client, payload *MsgSrvPayload, _ *MsgEmpty,
) error {
	// validate, payload.Name and payload.Token are read below
	if payload == nil {
		AddErrParams(nil, c.Mach, nil)
		return nil
	}

	c.log("RemoteSendPayload %s:%s", payload.Name, payload.Token)
	c.Mach.Add1(ssC.WorkerPayload, Pass(&A{
		Payload: payload,
		Name:    payload.Name,
	}))

	return nil
}
// RemoteBye is the RPC handler for the server's goodbye message. It removes
// the Start state from the client machine and always returns nil.
func (c *Client ) RemoteBye (
_ *rpc2 .Client , _ *MsgEmpty , _ *MsgEmpty ,
) error {
c .Mach .Remove1 (ssC .Start , nil )
return nil
}
// RemoteSchemaChange is the RPC handler for a schema change pushed by the
// server. It applies the new states and schema to the network machine. A nil
// message adds a params error, like in the other Remote* handlers. The
// handler always returns nil.
func (c *Client) RemoteSchemaChange(
	_ *rpc2.Client, msg *MsgSrvHello, _ *MsgEmpty,
) error {
	// validate, msg.Serialized is read below
	if msg == nil {
		AddErrParams(nil, c.Mach, nil)
		return nil
	}

	// the "version" logged is the number of states
	c.log("new schema v" + strconv.Itoa(len(msg.Serialized.StateNames)))
	c.updateStatesSchema(msg)

	return nil
}
// ClientOpts are the optional settings accepted by NewClient.
type ClientOpts struct {
// Consumer is an optional machine for received payloads. It has to
// implement the consumer states.
Consumer *am .Machine
// Parent is passed to am.NewCommon as the parent of the client machine.
Parent am .Api
// NoSchema is copied to Client.SyncNoSchema.
NoSchema bool
// AllowedStates is copied to Client.SyncAllowedStates.
AllowedStates am .S
// SkippedStates is copied to Client.SyncSkippedStates.
SkippedStates am .S
// SyncMutations is copied to Client.SyncAllMutations.
SyncMutations bool
// SyncShallowClocks is copied to Client.SyncShallowClocks.
SyncShallowClocks bool
// MutationFiltering is copied to Client.SyncMutationFiltering.
MutationFiltering bool
}
// GetClientId returns the ID of the client machine for the given client
// name, which is the name prefixed with "rc-".
func GetClientId(name string) string {
	const prefix = "rc-"

	return prefix + name
}
// clientNetMachConn is the connection handed to the network machine in
// StartState; it routes the machine's calls and notifications through the
// Client.
type clientNetMachConn struct {
// rpc is the client used for sending.
rpc *Client
}
// Call performs a failsafe RPC call on behalf of the network machine. For the
// mutation methods (ServerAdd, ServerSet, ServerRemove) the clock update(s)
// included in the response are applied, and a full Sync is triggered when
// they do not apply. It returns false when the call failed or the response of
// a mutation method has an unexpected type.
func (c clientNetMachConn) Call(
	ctx context.Context, method ServerMethod, args any, resp any,
) bool {
	client := c.rpc
	if !client.callFailsafe(ctx, method.Value, args, resp) {
		return false
	}

	switch method {
	case ServerAdd, ServerSet, ServerRemove:
		mutResp, ok := resp.(*MsgSrvMutation)
		if !ok {
			client.Mach.AddErr(errors.New("parsing resp.(*MsgSrvMutation)"), nil)
			return false
		}

		// apply either all the mutations or the summarised update
		var synced bool
		if client.SyncAllMutations {
			synced = client.clockUpdateMutations(mutResp.Mutations)
		} else {
			synced = client.clockUpdate(mutResp.Update, false)
		}

		// fall back to a full sync
		if !synced {
			client.Sync()
		}
	}

	return true
}
// Notify sends a failsafe RPC notification on behalf of the network machine
// and reports whether it was sent.
func (c clientNetMachConn ) Notify (
ctx context .Context , method ServerMethod , args any ,
) bool {
return c .rpc .notifyFailsafe (ctx , method .Value , args )
}
// Compile-time check that clientNetMachConn implements NetMachConn.
var _ NetMachConn = clientNetMachConn {}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.