package rpc
import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"reflect"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/rpc2"
"github.com/coder/websocket"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/rpc/states"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
// Shorthand aliases for state sets declared in the rpc states package.
var (
// ssS aliases states.ServerStates; its names are used to build the server's
// own state machine in NewServer.
ssS = states .ServerStates
// ssW aliases states.StateSourceStates; a net source machine with handlers
// has to contain these states (checked in NewServer).
ssW = states .StateSourceStates
)
// Server is an RPC server which exposes a source state machine to a remote
// client. It is driven by its own state machine (Mach), whose handlers are
// the *State / *Enter / *End methods of this struct.
type Server struct {
*ExceptionHandler
// Mach is the server's own state machine, created in NewServer.
Mach *am .Machine
// Source is the machine being exposed (netSrcMach passed to NewServer).
Source am .Api
// Addr is the address to listen on or dial. It is overwritten with the
// actual listener / connection address once RPC is starting.
Addr string
// DeliveryTimeout bounds a single SendPayload call made by the payload
// handlers. Defaults to 5 seconds.
DeliveryTimeout time .Duration
// Listener is the active listener, if any. It may be preset by the caller,
// otherwise it is created in RpcStartingState.
Listener atomic .Pointer [net .Listener ]
// Conn is an existing connection to serve instead of accepting on a
// listener. It is also set by the WebSocket code paths.
Conn net .Conn
// LogEnabled gates Server.log. Initialized from the EnvAmRpcLogServer env
// variable.
LogEnabled bool
// CallCount is incremented for every update pushed to the client.
CallCount uint64
// Args is copied from ServerOpts.Args; its struct fields are returned by
// RemoteArgs.
Args any
// Opts is a copy of the options passed to NewServer.
Opts ServerOpts
// PushInterval is the interval of pushing updates to the client. Zero
// disables pushing. Defaults to 250ms.
PushInterval atomic .Pointer [time .Duration ]
// AllowId, when set, makes RemoteHello reject clients with a different ID.
AllowId string
// WsTunReconn allows more than one dial round for the WebSocket tunnel and
// enables the retry on a network exception.
WsTunReconn bool
// WsTunConnTimeout is the timeout of a single tunnel dial. Defaults to 3s.
WsTunConnTimeout time .Duration
// WsTunConnRetries is the max number of tunnel dial rounds. Defaults to 50.
WsTunConnRetries int
// WsTunConnRetryTimeout is the total time budget of the tunnel dial loop.
// Defaults to 5 minutes, multiplied by 10 in debug mode.
WsTunConnRetryTimeout time .Duration
// WsTunConnRetryDelay is the initial delay before a tunnel dial. Defaults
// to 1 second.
WsTunConnRetryDelay time .Duration
// WsTunConnRetryBackoff caps the delay, which doubles after every failed
// dial. Zero disables the doubling. Defaults to 10 seconds.
WsTunConnRetryBackoff time .Duration
// syncMutations, syncAllowedStates, syncSkippedStates, syncSchema and
// syncShallowClocks are the sync options requested by the client in
// RemoteHello.
syncMutations bool
syncAllowedStates am .S
syncSkippedStates am .S
syncSchema bool
syncShallowClocks bool
// rpcServer is (re)created by bindRpcHandlers.
rpcServer *rpc2 .Server
// rpcClient is the connected client, stored in RemoteHandshake and cleared
// in RemoteBye.
rpcClient atomic .Pointer [rpc2 .Client ]
// tracer is bound to Source and provides the data for updates.
tracer *sourceTracer
// ticker drives periodic pushClient calls while RpcReady is active.
ticker *time .Ticker
// lockCollection is held for the duration of RemoteHello.
lockCollection sync .Mutex
// lockExport is held while mutating Source via RPC and while pushing.
lockExport sync .Mutex
// clientId is the ID of the client from the last successful RemoteHello.
clientId atomic .Pointer [string ]
// deliveryHandlers is passed to Source.DetachHandlers on dispose.
deliveryHandlers any
// lastPush is the time of the last stored push.
lastPush time .Time
// lastPushData is the tracer data the client is assumed to have.
lastPushData *tracerData
// httpSrv is the HTTP server used when Opts.WebSocket is set.
httpSrv *http .Server
// wsTunRetryRound counts tunnel dial rounds; reset to 0 on success.
wsTunRetryRound atomic .Int32
// wsConn is the dialed WebSocket tunnel connection.
wsConn *websocket .Conn
}
// Compile-time checks that *Server implements the RPC method interfaces.
var (
_ serverRpcMethods = &Server {}
_ clientServerMethods = &Server {}
)
// NewServer creates an RPC server which exposes netSrcMach to a remote
// client.
//
// ctx is passed to the server's own state machine. addr is the address to
// listen on; it is required when opts.WebSocketTunnel is set. name becomes
// part of the server machine's ID ("rs-"+name) and defaults to "rpc".
// netSrcMach is required, has to have verified states and, when it has
// handlers, has to contain all the states from StateSourceStates. opts may be
// nil.
//
// The returned server is not started. The source machine stays owned by the
// caller: disposing the server only detaches the server's tracer and payload
// handlers from it.
func NewServer(
	ctx context.Context, addr string, name string, netSrcMach am.Api,
	opts *ServerOpts,
) (*Server, error) {
	if name == "" {
		name = "rpc"
	}
	if opts == nil {
		opts = &ServerOpts{}
	}

	// validate
	if opts.WebSocketTunnel != "" && addr == "" {
		return nil, fmt.Errorf("addr required for WebSocketTunnel")
	}
	if netSrcMach == nil {
		return nil, fmt.Errorf("netSrcMach required")
	}
	if !netSrcMach.StatesVerified() {
		return nil, fmt.Errorf(
			"net source states not verified, call VerifyStates()")
	}
	hasHandlers := netSrcMach.HasHandlers()
	if hasHandlers && !netSrcMach.Has(ssW.Names()) {
		err := fmt.Errorf(
			"%w: NetSourceMach with handlers has to implement "+
				"pkg/rpc/states/StateSourceStatesDef",
			am.ErrSchema)
		return nil, err
	}

	s := &Server{
		ExceptionHandler: &ExceptionHandler{},
		Addr:             addr,
		DeliveryTimeout:  5 * time.Second,
		LogEnabled:       os.Getenv(EnvAmRpcLogServer) != "",
		Source:           netSrcMach,
		Args:             opts.Args,
		Opts:             *opts,

		WsTunConnTimeout:      3 * time.Second,
		WsTunConnRetryTimeout: 5 * time.Minute,
		WsTunConnRetries:      50,
		WsTunConnRetryDelay:   time.Second,
		WsTunConnRetryBackoff: 10 * time.Second,

		lastPushData: &tracerData{},
	}
	interval := 250 * time.Millisecond
	s.PushInterval.Store(&interval)

	// the server's own state machine, with s as the handlers struct
	mach, err := am.NewCommon(ctx, "rs-"+name, states.ServerSchema, ssS.Names(),
		s, opts.Parent, &am.Opts{
			Tags: []string{TagRpcServer},
		})
	if err != nil {
		return nil, err
	}
	mach.SemLogger().SetArgsMapper(LogArgs)
	mach.SetGroups(states.ServerGroups, ssS)
	mach.OnDispose(func(id string, ctx context.Context) {
		if l := s.Listener.Load(); l != nil {
			_ = (*l).Close()
			s.Listener.Store(nil)
		}
		s.rpcServer = nil
		// The source machine is owned by the caller, so it is only unhooked
		// here, never disposed. Errors are ignored as this is best-effort
		// cleanup.
		_ = s.Source.DetachTracer(s.tracer)
		_ = s.Source.DetachHandlers(s.deliveryHandlers)
	})
	s.Mach = mach
	if os.Getenv(EnvAmRpcDbg) != "" {
		_ = amhelp.MachDebugEnv(mach)
	}
	if opts.WebSocketTunnel != "" {
		mach.Add1(ssS.WebSocketTunnel, nil)
	}

	// bind the tracer, which collects the data for updates
	s.tracer = &sourceTracer{
		s: s,
		dataLatest: &tracerData{
			queueTick: 1,
		},
	}
	if err = netSrcMach.BindTracer(s.tracer); err != nil {
		return nil, err
	}

	// payload delivery handlers, only for sources which have handlers
	if hasHandlers {
		payloadState := ssW.SendPayload
		if opts.PayloadState != "" {
			payloadState = opts.PayloadState
		}

		var h any
		if payloadState == ssW.SendPayload {
			// default state name, static handlers struct
			h = &SendPayloadHandlers{
				SendPayloadState: getSendPayloadState(s, ssW.SendPayload),
			}
		} else {
			// custom state name, handlers struct built via reflection
			h = createSendPayloadHandlers(s, payloadState)
		}
		err = netSrcMach.BindHandlers(h)
		if err != nil {
			return nil, err
		}
		// remember the handlers, so the dispose hook can detach them
		s.deliveryHandlers = h
	}

	if amhelp.IsDebug() {
		s.log("debug, extending timeouts")
		s.WsTunConnRetryTimeout *= 10
	}

	return s, nil
}
// ExceptionState is the handler for the Exception state. It delegates to the
// embedded ExceptionHandler and then, only in the WebSocket tunnel mode,
// closes the tunnel connections and retries RpcStarting for network errors
// (as long as reconnecting is enabled and the retry rounds are not used up).
func (s *Server) ExceptionState(e *am.Event) {
	s.ExceptionHandler.ExceptionState(e)

	if !s.Mach.Is1(ssS.WebSocketTunnel) {
		return
	}

	// drop both layers of the tunnel, ignoring close errors
	if ws := s.wsConn; ws != nil {
		_ = ws.Close(websocket.StatusInternalError, "")
	}
	if conn := s.Conn; conn != nil {
		_ = conn.Close()
	}

	// states added by the transition which caused this exception
	added, _ := e.Transition().TimeIndexDiff()
	roundsLeft := s.wsTunRetryRound.Load() < int32(s.WsTunConnRetries)
	canRetry := roundsLeft && s.WsTunReconn
	if added.Is1(ssS.ErrNetwork) && canRetry {
		s.log("WebSocket exception retry")
		s.Mach.Remove1(ssS.Exception, nil)
		s.Mach.Add1(ssS.RpcStarting, nil)
	}
}
// StartState is the handler for the Start state. With Opts.WebSocket set, it
// creates an HTTP server on Addr with wsHandlerServer as the handler, and
// serves it in a goroutine. Otherwise it does nothing.
//
// A serving failure is reported as a network error and removes RpcStarting.
// A graceful shutdown (see StartEnd) is not an error.
func (s *Server) StartState(e *am.Event) {
	ctx := s.Mach.NewStateCtx(ssS.Start)
	if !s.Opts.WebSocket {
		return
	}

	srv := &http.Server{
		Addr: s.Addr,
		Handler: &wsHandlerServer{
			s:     s,
			event: e.Export(),
		},
	}
	s.httpSrv = srv

	go func() {
		if ctx.Err() != nil {
			return // expired
		}
		// ListenAndServe always returns a non-nil error; after Shutdown or
		// Close it is exactly http.ErrServerClosed, which is a regular stop.
		err := srv.ListenAndServe()
		if err == nil || err == http.ErrServerClosed {
			return
		}
		AddErrNetwork(e, s.Mach, err)
		s.Mach.Remove1(ssS.RpcStarting, nil)
	}()
}
// StartEnd is the handler for the end of the Start state. It shuts down the
// HTTP server (WebSocket mode) or closes the tunnel connection (WebSocket
// tunnel mode), and disposes the machine when the Dispose arg was passed.
func (s *Server) StartEnd(e *am.Event) {
	switch {
	case s.Opts.WebSocket:
		// shutdown in the background, the result is ignored
		go func() {
			_ = s.httpSrv.Shutdown(s.Mach.Context())
		}()
	case s.Mach.Is1(ssS.WebSocketTunnel) && s.wsConn != nil:
		_ = s.wsConn.Close(websocket.StatusNormalClosure, "")
	}

	if args := ParseArgs(e.Args); args.Dispose {
		s.Mach.Dispose()
	}
}
// RpcStartingEnter is the negotiation handler for RpcStarting. The state is
// accepted only when there is something to start on: an address, a listener,
// or an existing connection.
func (s *Server) RpcStartingEnter(e *am.Event) bool {
	return s.Addr != "" || s.Listener.Load() != nil || s.Conn != nil
}
// RpcStartingState is the handler for the RpcStarting state. It (re)creates
// the rpc2 server with all the handlers and then, in a goroutine, obtains a
// transport in one of these ways:
//
//   - WebSocket tunnel: dials "ws://"+Addr+Opts.WebSocketTunnel with retries
//     (see the WsTun* fields)
//   - an existing Conn
//   - an existing Listener
//   - a new "tcp4" listener on Addr
//
// On success RpcAccepting is added, on failure RpcStarting is removed. In the
// WebSocket (server) mode it returns right after binding the handlers, as the
// connection is provided by wsHandlerServer.
func (s *Server) RpcStartingState(e *am.Event) {
	ctxStart := s.Mach.NewStateCtx(ssS.Start)
	s.log("Starting RPC on %s", s.Addr)
	s.bindRpcHandlers()

	if s.Opts.WebSocket {
		return
	}

	go func() {
		if ctxStart.Err() != nil {
			return // expired
		}

		if s.Opts.WebSocketTunnel != "" {
			addr := "ws://" + s.Addr + s.Opts.WebSocketTunnel
			delay := s.WsTunConnRetryDelay
			start := time.Now()
			s.Conn = nil

			// dial once, or up to WsTunConnRetries rounds with WsTunReconn
			for ctxStart.Err() == nil &&
				(s.WsTunReconn || s.wsTunRetryRound.Load() == 0) &&
				s.wsTunRetryRound.Load() < int32(s.WsTunConnRetries) {

				s.wsTunRetryRound.Add(1)
				if !amhelp.Wait(ctxStart, delay) {
					return
				}

				ctxWs, cancel := context.WithTimeout(ctxStart, s.WsTunConnTimeout)
				s.log("Dialing (round %d) %s", s.wsTunRetryRound.Load(), addr)
				ws, _, err := websocket.Dial(ctxWs, addr, nil)
				if err == nil {
					s.log("WebSocket OK")
					s.wsConn = ws
					s.log("Tunnel OK")
					// the tunnel lives as long as Start, not as the dial ctx
					s.Conn = websocket.NetConn(ctxStart, ws, websocket.MessageBinary)
					s.wsTunRetryRound.Store(0)
					cancel()
					break
				}

				s.log("WebSocket err")
				// tell a dial timeout from other errors before releasing ctxWs
				if ctxStart.Err() == nil && ctxWs.Err() != nil {
					AddErrNetworkTimeout(e, s.Mach, err)
				} else {
					AddErrNetwork(e, s.Mach, err)
				}
				cancel()

				// double the delay, capped by the backoff
				if s.WsTunConnRetryBackoff > 0 {
					delay *= 2
					if delay > s.WsTunConnRetryBackoff {
						delay = s.WsTunConnRetryBackoff
					}
				}

				if t := s.WsTunConnRetryTimeout; t > 0 && time.Since(start) > t {
					s.log("WebSocket RetryTimeout")
					break
				}
			}

			if s.Conn == nil {
				s.log("WebSocket tunnel failure")
				s.Mach.EvRemove1(e, ssS.RpcStarting, nil)
				return
			}

		} else if s.Conn != nil {
			s.log("Using existing connection %s", s.Conn.LocalAddr())
			s.Addr = s.Conn.LocalAddr().String()

		} else if l := s.Listener.Load(); l != nil {
			// update Addr for preset listeners
			s.Addr = (*l).Addr().String()

		} else {
			cfg := net.ListenConfig{}
			lis, err := cfg.Listen(ctxStart, "tcp4", s.Addr)
			if err != nil {
				AddErrNetwork(e, s.Mach, err)
				s.Mach.EvRemove1(e, ssS.RpcStarting, nil)
				return
			}
			s.Listener.Store(&lis)
			// update Addr, as the requested one may have had a dynamic port
			s.Addr = lis.Addr().String()
		}

		s.Mach.EvAdd1(e, ssS.RpcAccepting, nil)
	}()
}
// RpcAcceptingEnter is the negotiation handler for RpcAccepting. Accepting
// requires either an existing connection or a listener.
func (s *Server) RpcAcceptingEnter(e *am.Event) bool {
	if s.Conn != nil {
		return true
	}

	return s.Listener.Load() != nil
}
// RpcAcceptingState is the handler for the RpcAccepting state. In the
// background it adds RpcReady and serves either the existing Conn or the
// Listener using the rpc2 server. Once serving returns while Start is still
// active, the listener is closed and RPC is restarted via RpcStarting.
// It also registers connect / disconnect callbacks, which add / remove
// ClientConnected.
func (s *Server ) RpcAcceptingState (e *am .Event ) {
ctxRpcAccepting := s .Mach .NewStateCtx (ssS .RpcAccepting )
ctxStart := s .Mach .NewStateCtx (ssS .Start )
// keep a local ref, as the field is replaced by bindRpcHandlers
srv := s .rpcServer
s .log ("RPC started on %s" , s .Addr )
go func () {
if ctxRpcAccepting .Err () != nil {
return
}
// serving goroutine
go func () {
if ctxRpcAccepting .Err () != nil {
return
}
s .Mach .EvAdd1 (e , ssS .RpcReady , Pass (&A {Addr : s .Addr }))
lis := s .Listener .Load ()
// NOTE(review): ServeConn / Accept presumably block until the
// connection / listener is closed - confirm against rpc2.
if s .Conn != nil {
srv .ServeConn (s .Conn )
} else if lis != nil {
srv .Accept (*lis )
} else {
AddErrNetwork (e , s .Mach , fmt .Errorf ("no listener" ))
s .Mach .EvRemove1 (e , ssS .RpcReady , nil )
return
}
// serving is over, skip the cleanup when Start has ended
if ctxStart .Err () != nil {
return
}
if lis != nil {
(*lis ).Close ()
s .Listener .Store (nil )
}
if ctxStart .Err () != nil {
return
}
// restart RPC when still supposed to be running
if s .Mach .Is1 (ssS .Start ) {
s .log ("restarting on failed listener" )
s .Mach .EvRemove1 (e , ssS .RpcReady , nil )
s .Mach .EvAdd1 (e , ssS .RpcStarting , nil )
}
}()
// reflect client connections in the ClientConnected state
srv .OnDisconnect (func (client *rpc2 .Client ) {
s .Mach .EvRemove1 (e , ssS .ClientConnected , Pass (&A {Client : client }))
})
srv .OnConnect (func (client *rpc2 .Client ) {
s .Mach .EvAdd1 (e , ssS .ClientConnected , Pass (&A {Client : client }))
})
}()
}
// RpcReadyEnter is the negotiation handler for RpcReady. The state is
// accepted only while RpcAccepting is active.
func (s *Server ) RpcReadyEnter (e *am .Event ) bool {
return s .Mach .Is1 (ssS .RpcAccepting )
}
// RpcReadyState is the handler for the RpcReady state. Unless pushing is
// disabled (PushInterval of 0), it starts a goroutine which calls pushClient
// every PushInterval, for as long as RpcReady stays active.
//
// A fresh ticker is created for every activation of the state. A ticker
// stopped by a previous activation would never fire again, which would
// silently disable pushes after an RPC restart.
func (s *Server) RpcReadyState(e *am.Event) {
	if s.Opts.WebSocketTunnel != "" {
		s.log("ws tunnel ok: %s", s.Opts.WebSocketTunnel)
	}

	// non-positive intervals disable pushing (time.NewTicker would panic)
	interval := *s.PushInterval.Load()
	if interval <= 0 {
		return
	}

	ctx := s.Mach.NewStateCtx(ssS.RpcReady)
	t := time.NewTicker(interval)
	s.ticker = t

	go func() {
		// each goroutine stops only its own ticker
		defer t.Stop()
		if ctx.Err() != nil {
			return // expired
		}

		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				s.pushClient()
			}
		}
	}()
}
// HandshakeDoneEnd is the handler for the end of the HandshakeDone state. It
// closes the current RPC client, if there is one, ignoring the close error.
func (s *Server) HandshakeDoneEnd(e *am.Event) {
	client := s.rpcClient.Load()
	if client == nil {
		return
	}

	_ = client.Close()
}
// Start requests the Start state on the server's machine, passing e as the
// source event, and returns the result of that mutation.
func (s *Server ) Start (e *am .Event ) am .Result {
return s .Mach .EvAdd1 (e , ssS .Start , nil )
}
// Stop stops the server. e is the source event (optional).
//
// With dispose set, the server's machine is disposed and am.Executed is
// returned. Otherwise only the Start state is removed, which lets the server
// be started again, and the result of that mutation is returned. Returns
// am.Canceled when the server has no machine.
func (s *Server) Stop(e *am.Event, dispose bool) am.Result {
	if s.Mach == nil {
		return am.Canceled
	}

	if dispose {
		s.log("disposing")
		amhelp.DisposeEv(s.Mach, e)

		return am.Executed
	}

	// a plain stop must leave the machine usable
	return s.Mach.EvRemove1(e, ssS.Start, nil)
}
// SendPayload sends a payload to the connected RPC client and waits for the
// delivery, bounded by ctx.
//
// srcEvent is optional; when passed, its machine ID and transition ID are set
// as the source of the payload. The payload gets a fresh random token.
//
// Returns ErrInvalidParams for a nil payload, ErrNoConn when there is no
// fully connected client, and ErrDestination when payload.Destination is set
// and differs from the connected client's ID.
func (s *Server) SendPayload(
	ctx context.Context, srcEvent *am.Event, payload *MsgSrvPayload,
) error {
	if payload == nil {
		return ErrInvalidParams
	}
	if s.Mach.Not1(ssS.ClientConnected) || s.Mach.Not1(ssS.HandshakeDone) {
		return ErrNoConn
	}
	// RemoteBye clears the client before HandshakeDone gets removed, so the
	// states alone do not guarantee a client. A nil dereference here would be
	// swallowed by PanicToErr and reported as a successful delivery.
	client := s.rpcClient.Load()
	if client == nil {
		return ErrNoConn
	}

	// check the destination
	id := s.ClientId()
	if payload.Destination != "" && id != payload.Destination {
		return fmt.Errorf("%w: %s != %s", ErrDestination, payload.Destination, id)
	}

	defer s.Mach.PanicToErr(nil)

	payload.Token = utils.RandId(0)
	if srcEvent != nil {
		payload.Source = srcEvent.MachineId
		payload.SourceTx = srcEvent.TransitionId
	}
	s.log("sending payload %s from %s to %s", payload.Name, payload.Source,
		payload.Destination)

	return client.CallWithContext(ctx,
		ClientSendPayload.Value, payload, &MsgEmpty{})
}
// ClientId returns the ID of the currently stored client, or an empty string
// when there is none.
func (s *Server) ClientId() string {
	if id := s.clientId.Load(); id != nil {
		return *id
	}

	return ""
}
// GetKind returns the kind of this RPC component, which is always
// KindServer.
func (s *Server ) GetKind () Kind {
return KindServer
}
// log writes a formatted message to the machine's log, but only when
// LogEnabled is set.
func (s *Server) log(msg string, args ...any) {
	if s.LogEnabled {
		s.Mach.Log(msg, args...)
	}
}
// bindRpcHandlers replaces the rpc2 server with a fresh instance and
// registers all the Remote* methods on it. It also turns on rpc2 debug
// logging when the EnvAmRpcLogServer env variable is set.
func (s *Server) bindRpcHandlers() {
	srv := rpc2.NewServer()
	s.rpcServer = srv
	if logEnv := os.Getenv(EnvAmRpcLogServer); logEnv != "" {
		rpc2.DebugLog = true
	}

	// handshake
	srv.Handle(ServerHello.Value, s.RemoteHello)
	srv.Handle(ServerHandshake.Value, s.RemoteHandshake)
	// mutations
	srv.Handle(ServerAdd.Value, s.RemoteAdd)
	srv.Handle(ServerAddNS.Value, s.RemoteAddNS)
	srv.Handle(ServerRemove.Value, s.RemoteRemove)
	srv.Handle(ServerSet.Value, s.RemoteSet)
	// misc
	srv.Handle(ServerSync.Value, s.RemoteSync)
	srv.Handle(ServerArgs.Value, s.RemoteArgs)
	srv.Handle(ServerBye.Value, s.RemoteBye)
}
// pushClient pushes the latest tracer data to the connected client, either
// as a list of mutations (syncMutations) or as a single update. It is a no-op
// when there is no client, pushing is disabled, the handshake is not done,
// another export is in progress, nothing has changed since the last push, or
// the last push happened less than PushInterval ago. A failed push removes
// ClientConnected and adds an error to the machine.
func (s *Server ) pushClient () {
c := s .rpcClient .Load ()
if c == nil {
return
}
if *s .PushInterval .Load () == 0 || s .Mach .Not1 (ssS .HandshakeDone ) {
return
}
// never wait for the lock, skip this round instead
if !s .lockExport .TryLock () {
s .log ("skip parallel export" )
return
}
defer s .lockExport .Unlock ()
data := s .tracer .DataLatest ()
if data == nil {
return
}
// nothing new: same tracked time sum and the same queue tick
if s .lastPushData .mTrackedTimeSum == data .mTrackedTimeSum &&
s .lastPushData .queueTick == data .queueTick {
return
}
// too soon after the previous push
if time .Since (s .lastPush ) < *s .PushInterval .Load () {
return
}
s .log ("pushClient:try t%d" , data .mTrackedTimeSum )
var err error
if s .syncMutations {
err = s .pushUpdateMutations (s .tracer .DataQueue ())
} else {
err = s .pushUpdateLatest (data )
}
if err != nil {
s .Mach .Remove1 (ssS .ClientConnected , nil )
AddErr (nil , s .Mach , "pushClient" , err )
return
}
s .log ("pushClient:ok t%d" , data .mTrackedTimeSum )
// remember what the client has now
s .storeLastPush (data )
}
// pushUpdateMutations notifies the client about the passed mutations,
// encoded relative to the last pushed data. It does nothing (and returns nil)
// when there is no client or there are no mutations to send.
func (s *Server) pushUpdateMutations(muts []tracerMutation) error {
	client := s.rpcClient.Load()
	if client == nil {
		return nil
	}
	defer s.Mach.PanicToErr(nil)

	msg := calcUpdateMutations(s.syncSchema, muts, s.lastPushData)
	if len(msg.MutationType) > 0 {
		s.CallCount++

		return client.Notify(ClientUpdateMutations.Value, msg)
	}

	return nil
}
// pushUpdateLatest notifies the client about the difference between data and
// the last pushed data. It does nothing (and returns nil) when there is no
// client or no state index has changed.
func (s *Server) pushUpdateLatest(data *tracerData) error {
	client := s.rpcClient.Load()
	if client == nil {
		return nil
	}
	defer s.Mach.PanicToErr(nil)

	msg := calcUpdate(s.syncSchema, data, s.lastPushData, s.syncShallowClocks)
	if len(msg.Indexes) > 0 {
		s.CallCount++

		return client.Notify(ClientUpdate.Value, msg)
	}

	return nil
}
// newMsgMutation builds a response to a mutation request: the mutation's
// result plus either the queued mutations (syncMutations) or a single update,
// both relative to the last pushed data. It then stores data as the last
// push.
func (s *Server) newMsgMutation(
	mut am.Result, data *tracerData,
) *MsgSrvMutation {
	msg := &MsgSrvMutation{Result: mut}

	if !s.syncMutations {
		msg.Update = calcUpdate(s.syncSchema, data, s.lastPushData,
			s.syncShallowClocks)
	} else {
		msg.Mutations = calcUpdateMutations(s.syncSchema, s.tracer.DataQueue(),
			s.lastPushData)
	}
	s.storeLastPush(data)

	return msg
}
// storeLastPush records data as the last state sent to the client, together
// with the current time.
func (s *Server) storeLastPush(data *tracerData) {
	s.lastPushData = data
	s.lastPush = time.Now()
}
// RemoteHello is the RPC handler for the first message of the handshake. It
// validates the client's ID, stores the sync options requested by the client,
// and responds with the exported state of the source machine, limited to the
// tracked states. With req.SyncSchema, the response keeps all the states (with
// untracked clocks zeroed) and includes the schema, unless the client already
// has it (matching req.SchemaHash). On success it adds Handshaking.
//
// Returns am.ErrCanceled when the server is not started, ErrInvalidParams for
// a missing ID and ErrNoAccess for an ID other than AllowId.
func (s *Server) RemoteHello(
	client *rpc2.Client, req *MsgCliHello, resp *MsgSrvHello,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}

	// validate
	if req == nil || req.Id == "" {
		s.Mach.Remove1(ssS.Handshaking, nil)
		AddErrRpcStr(nil, s.Mach, "handshake failed: ID missing")

		return ErrInvalidParams
	}

	s.lockCollection.Lock()
	defer s.lockCollection.Unlock()

	if s.AllowId != "" && req.Id != s.AllowId {
		s.Mach.Remove1(ssS.Handshaking, nil)

		return fmt.Errorf("%w: %s != %s", ErrNoAccess, req.Id, s.AllowId)
	}

	// Sync options are per-client, so every one of them is overwritten,
	// including syncSchema. A stale syncSchema from a previous client would
	// make updates use the wrong kind of state indexes.
	s.syncAllowedStates = req.AllowedStates
	s.syncSkippedStates = req.SkippedStates
	s.syncShallowClocks = req.ShallowClocks
	s.syncMutations = req.SyncMutations
	s.syncSchema = req.SyncSchema

	// export the source and activate the tracer
	export, schema, _ := s.Source.Export()
	s.tracer.calcTrackedStates(export.StateNames)
	s.tracer.active = true
	statesCount := len(export.StateNames)
	tTrackedSum := export.Time.Filter(s.tracer.trackedStateIdxs).Sum(nil)

	if !req.SyncSchema {
		// without the schema, send only the tracked states
		export.StateNames = am.StatesShared(export.StateNames,
			s.tracer.trackedStates)
		export.Time = export.Time.Filter(s.tracer.trackedStateIdxs)
	} else {
		// with the schema, keep all the states, but zero untracked clocks
		for i := range export.StateNames {
			if slices.Contains(s.tracer.trackedStateIdxs, i) {
				continue
			}
			export.Time[i] = 0
		}
	}

	*resp = MsgSrvHello{
		Serialized:  export,
		StatesCount: uint32(statesCount),
	}

	// attach the schema only when the client does not have it yet
	if req.SyncSchema {
		if req.SchemaHash == "" ||
			req.SchemaHash != amhelp.SchemaHash(schema) {
			resp.Schema = schema
		}
	}

	// the client is now in sync with the export
	s.lastPushData.mTime = export.Time
	s.lastPushData.queueTick = export.QueueTick
	s.lastPushData.mTrackedTimeSum = tTrackedSum
	s.lastPush = time.Now()
	s.clientId.Store(&req.Id)

	s.log("RemoteHello: t%v q%d", tTrackedSum, export.QueueTick)
	s.Mach.Add1(ssS.Handshaking, nil)

	return nil
}
// RemoteHandshake is the RPC handler which completes the handshake started by
// RemoteHello. It stores the calling client and adds HandshakeDone with the
// client's ID. Returns am.ErrCanceled when the server is not started and
// ErrNoConn when no client ID has been stored.
func (s *Server) RemoteHandshake(
	client *rpc2.Client, _ *MsgEmpty, _ *MsgEmpty,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}

	clientId := s.clientId.Load()
	if clientId == nil {
		return ErrNoConn
	}

	timeSum := s.Source.Time(nil).Sum(nil)
	queueTick := s.Source.QueueTick()
	s.log("RemoteHandshake: t%v q%d", timeSum, queueTick)

	// accept the client
	s.rpcClient.Store(client)
	handshakeArgs := &A{
		Id: *clientId,
	}
	s.Mach.Add1(ssS.HandshakeDone, Pass(handshakeArgs))

	return nil
}
// RemoteAdd is the RPC handler which adds states (passed as indexes) to the
// source machine and responds with the result and an update. Args go through
// Opts.ParseRpc, when set. With req.Event present, the mutation is linked to
// that event. Returns am.ErrCanceled when the server is not started and
// ErrInvalidParams when no states were passed.
func (s *Server) RemoteAdd(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.lockExport.Lock()
	defer s.lockExport.Unlock()

	if req.States == nil {
		return ErrInvalidParams
	}

	mutArgs := req.Args
	if parse := s.Opts.ParseRpc; parse != nil {
		mutArgs = parse(mutArgs)
	}

	// translate indexes into state names and mutate
	names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
	var result am.Result
	if req.Event == nil {
		result = s.Source.Add(names, mutArgs)
	} else {
		result = s.Source.EvAdd(req.Event, names, mutArgs)
	}

	// respond with the result and what has changed
	latest := s.tracer.DataLatest()
	*resp = *s.newMsgMutation(result, latest)

	return nil
}
// RemoteAddNS is the RPC handler which adds states (passed as indexes) to the
// source machine without sending the result or an update back. Args go
// through Opts.ParseRpc, when set. Returns am.ErrCanceled when the server is
// not started and ErrInvalidParams when no states were passed.
func (s *Server) RemoteAddNS(
	_ *rpc2.Client, req *MsgCliMutation, _ *MsgEmpty,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.lockExport.Lock()
	defer s.lockExport.Unlock()

	if req.States == nil {
		return ErrInvalidParams
	}

	mutArgs := req.Args
	if parse := s.Opts.ParseRpc; parse != nil {
		mutArgs = parse(mutArgs)
	}

	// the result is deliberately dropped, nothing is sent back
	names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
	_ = s.Source.Add(names, mutArgs)

	return nil
}
// RemoteRemove is the RPC handler which removes states (passed as indexes)
// from the source machine and responds with the result and an update. Args go
// through Opts.ParseRpc, when set. Returns am.ErrCanceled when the server is
// not started and ErrInvalidParams when no states were passed.
func (s *Server) RemoteRemove(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.lockExport.Lock()
	defer s.lockExport.Unlock()

	if req.States == nil {
		return ErrInvalidParams
	}

	mutArgs := req.Args
	if parse := s.Opts.ParseRpc; parse != nil {
		mutArgs = parse(mutArgs)
	}

	// translate indexes into state names and mutate
	names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
	result := s.Source.Remove(names, mutArgs)

	// respond with the result and what has changed
	latest := s.tracer.DataLatest()
	*resp = *s.newMsgMutation(result, latest)

	return nil
}
// RemoteSet is the RPC handler which sets the states (passed as indexes) on
// the source machine and responds with the result and an update. Args go
// through Opts.ParseRpc, when set. Returns am.ErrCanceled when the server is
// not started and ErrInvalidParams when no states were passed.
func (s *Server) RemoteSet(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.lockExport.Lock()
	defer s.lockExport.Unlock()

	if req.States == nil {
		return ErrInvalidParams
	}

	mutArgs := req.Args
	if parse := s.Opts.ParseRpc; parse != nil {
		mutArgs = parse(mutArgs)
	}

	// translate indexes into state names and mutate
	names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
	result := s.Source.Set(names, mutArgs)

	// respond with the result and what has changed
	latest := s.tracer.DataLatest()
	*resp = *s.newMsgMutation(result, latest)

	return nil
}
// RemoteSync is the RPC handler which responds with the full machine time and
// the queue tick of the source machine. It also adds the MetricSync state.
// Returns am.ErrCanceled when the server is not started.
func (s *Server) RemoteSync(
	_ *rpc2.Client, _ *MsgEmpty, resp *MsgSrvSync,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.Mach.Add1(ssS.MetricSync, nil)

	mTime := s.Source.Time(nil)
	queueTick := s.Source.QueueTick()
	*resp = MsgSrvSync{
		Time:      mTime,
		QueueTick: queueTick,
	}
	s.log("RemoteSync: [%v]", resp.Time)

	return nil
}
// RemoteArgs is the RPC handler which responds with the struct fields of
// Server.Args, when those are set. Returns am.ErrCanceled when the server is
// not started, or the error from reading the fields.
func (s *Server) RemoteArgs(
	_ *rpc2.Client, _ *MsgEmpty, resp *MsgSrvArgs,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}

	// no args, leave the response untouched
	if s.Args == nil {
		return nil
	}

	fields, err := utils.StructFields(s.Args)
	if err != nil {
		return err
	}
	resp.Args = fields

	return nil
}
// RemoteBye is the RPC handler for a client saying goodbye. It removes
// ClientConnected, forgets the client and its ID, and (in the background,
// after a delay) removes HandshakeDone. Returns am.ErrCanceled when the
// server is not started.
func (s *Server ) RemoteBye (
_ *rpc2 .Client , _ *MsgEmpty , _ *MsgEmpty ,
) error {
if s .Mach .Not1 (ssS .Start ) {
return am .ErrCanceled
}
s .log ("RemoteBye" )
s .Mach .Remove1 (ssS .ClientConnected , Pass (&A {
Addr : s .Addr ,
}))
go func () {
// try to close the client, but wait no longer than 100ms
// NOTE(review): rpcClient is loaded inside this goroutine, while the
// handler clears it right after starting the goroutine, so the load may
// observe nil and skip the close - confirm whether that is intended.
select {
case <- time .After (100 * time .Millisecond ):
s .log ("rpc.Close timeout" )
case <- amhelp .ExecAndClose (func () {
if c := s .rpcClient .Load (); c != nil {
_ = c .Close ()
}
}):
s .log ("rpc.Close" )
}
// delay the removal of HandshakeDone by another 100ms
time .Sleep (100 * time .Millisecond )
s .Mach .Remove1 (ssS .HandshakeDone , nil )
}()
// forget the client
s .rpcClient .Store (nil )
s .clientId .Store (nil )
return nil
}
// BindServer pipes states of an RPC server machine (source) into target:
// RpcReady is mirrored as rpcReady (added and removed), and ClientConnected
// is mirrored as clientReady (added and removed). The pipes are bound to
// source as handlers named after RpcReady and HandshakeDone. Both target
// state names are required.
func BindServer (
source , target *am .Machine , rpcReady , clientReady string ,
) error {
if rpcReady == "" || clientReady == "" {
return fmt .Errorf ("rpcReady and clientConn must be set" )
}
// the field names are the handler names used for binding
h := &struct {
RpcReadyState am .HandlerFinal
RpcReadyEnd am .HandlerFinal
HandshakeDoneState am .HandlerFinal
HandshakeDoneEnd am .HandlerFinal
}{
RpcReadyState : ampipe .Add (source , target , ssS .RpcReady , rpcReady ),
RpcReadyEnd : ampipe .Remove (source , target , ssS .RpcReady , rpcReady ),
HandshakeDoneState : ampipe .Add (source , target , ssS .ClientConnected ,
clientReady ),
HandshakeDoneEnd : ampipe .Remove (source , target , ssS .ClientConnected ,
clientReady ),
}
return source .BindHandlers (h )
}
// BindServerMulti pipes states of an RPC server machine (source) into target:
// RpcReady is mirrored as rpcReady (added and removed), while the client's
// connection results in adding clientConn and the disconnection in adding
// clientDisconn (nothing is removed for the client). The pipes are bound to
// source as handlers named after RpcReady and HandshakeDone. All three target
// state names are required.
func BindServerMulti (
source , target *am .Machine , rpcReady , clientConn , clientDisconn string ,
) error {
if rpcReady == "" || clientConn == "" || clientDisconn == "" {
return fmt .Errorf ("rpcReady, clientConn, and clientDisconn must be set" )
}
// the field names are the handler names used for binding
h := &struct {
RpcReadyState am .HandlerFinal
RpcReadyEnd am .HandlerFinal
HandshakeDoneState am .HandlerFinal
HandshakeDoneEnd am .HandlerFinal
}{
RpcReadyState : ampipe .Add (source , target , ssS .RpcReady , rpcReady ),
RpcReadyEnd : ampipe .Remove (source , target , ssS .RpcReady , rpcReady ),
HandshakeDoneState : ampipe .Add (source , target ,
ssS .ClientConnected , clientConn ),
// the end handler adds (not removes) the disconnect state
HandshakeDoneEnd : ampipe .Add (source , target ,
ssS .ClientConnected , clientDisconn ),
}
return source .BindHandlers (h )
}
// BindServerRpcReady pipes RpcReady of an RPC server machine (source) into
// target as an addition of the rpcReady state. Nothing is piped when RpcReady
// ends.
func BindServerRpcReady (source , target *am .Machine , rpcReady string ) error {
h := &struct {
RpcReadyState am .HandlerFinal
}{
RpcReadyState : ampipe .Add (source , target , ssS .RpcReady , rpcReady ),
}
return source .BindHandlers (h )
}
// ServerOpts are the optional settings for NewServer.
type ServerOpts struct {
// PayloadState is the name of the state on the source machine which
// triggers sending a payload. Defaults to SendPayload from
// StateSourceStates.
PayloadState string
// Parent is passed as the parent when creating the server's machine.
Parent am .Api
// Args is copied to Server.Args; its struct fields are exposed to the
// client via RemoteArgs.
Args any
// ParseRpc, when set, transforms the args of every mutation received over
// RPC before they reach the source machine.
ParseRpc func (args am .A ) am .A
// WebSocket makes the server accept the client over an HTTP server with a
// WebSocket handler, instead of a plain listener.
WebSocket bool
// WebSocketTunnel, when set, makes the server dial
// "ws://" + addr + WebSocketTunnel and serve RPC over that connection.
// Requires an address.
WebSocketTunnel string
}
// SendPayloadHandlers is the handlers struct bound to the source machine
// when the default SendPayload state is used for delivering payloads.
type SendPayloadHandlers struct {
// SendPayloadState is the final handler of the SendPayload state.
SendPayloadState am .HandlerFinal
}
// getSendPayloadState returns a final handler for the payload state called
// stateName. The handler removes the state right away (self-removal),
// validates the args (both the name and the payload are required) and sends
// the payload via s in a goroutine, bounded by s.DeliveryTimeout. Failures
// are reported as ErrSendPayload on the event's machine.
func getSendPayloadState(s *Server, stateName string) am.HandlerFinal {
	return func(e *am.Event) {
		mach := e.Machine()
		mach.EvRemove1(e, stateName, nil)

		ctxStart := s.Mach.NewStateCtx(ssS.Start)
		in := ParseArgs(e.Args)
		out := &A{Name: in.Name}

		// validate
		if in.Payload == nil || in.Name == "" {
			err := fmt.Errorf("invalid payload args [name, payload]")
			e.Machine().EvAddErrState(e, ssW.ErrSendPayload, err, Pass(out))

			return
		}

		// deliver in the background
		go func() {
			ctxSend, cancel := context.WithTimeout(ctxStart, s.DeliveryTimeout)
			defer cancel()

			if err := s.SendPayload(ctxSend, e, in.Payload); err != nil {
				e.Machine().EvAddErrState(e, ssW.ErrSendPayload, err, Pass(out))
			}
		}()
	}
}
// createSendPayloadHandlers builds, via reflection, a pointer to a struct
// with a single field named stateName+am.SuffixState, which holds the payload
// handler for that state. It is used when the payload state has a custom
// name.
func createSendPayloadHandlers(s *Server, stateName string) any {
	handler := getSendPayloadState(s, stateName)

	// struct type with one handler field
	field := reflect.StructField{
		Name: stateName + am.SuffixState,
		Type: reflect.TypeOf(handler),
	}
	typ := reflect.StructOf([]reflect.StructField{field})

	// instantiate and set the handler
	inst := reflect.New(typ).Elem()
	inst.Field(0).Set(reflect.ValueOf(handler))

	return inst.Addr().Interface()
}
// calcUpdate builds an update message describing the difference between data
// and lastPush. The ticks of changed states are generated either as shallow
// (shallowClocks) or deep; queue and machine ticks are sent as differences.
func calcUpdate(
	syncSchema bool, data, lastPush *tracerData, shallowClocks bool,
) *MsgSrvUpdate {
	generate := genDeepUpdate
	if shallowClocks {
		generate = genShallowUpdate
	}
	indexes, ticks := generate(syncSchema, data, lastPush)

	update := &MsgSrvUpdate{
		QueueTick: uint16(data.queueTick - lastPush.queueTick),
		MachTick:  uint8(data.machTick - lastPush.machTick),
		Indexes:   indexes,
		Ticks:     ticks,
		Checksum:  data.checksum,
	}

	return update
}
// calcUpdateMutations builds a message with one entry per mutation: its type,
// the called state indexes, and a deep update. The first update is relative
// to prev, every following one to the data of the preceding mutation.
func calcUpdateMutations(
	syncSchema bool, muts []tracerMutation, prev *tracerData,
) *MsgSrvUpdateMuts {
	msg := &MsgSrvUpdateMuts{}
	base := prev

	for i := range muts {
		// pointer into the slice, as base keeps referring to the data
		m := &muts[i]

		calledIdxs := make([]uint16, 0, len(m.calledIdxs))
		for _, idx := range m.calledIdxs {
			calledIdxs = append(calledIdxs, uint16(idx))
		}
		update := calcUpdate(syncSchema, &m.data, base, false)

		msg.MutationType = append(msg.MutationType, m.mutType)
		msg.CalledStates = append(msg.CalledStates, calledIdxs)
		msg.Updates = append(msg.Updates, *update)

		base = &m.data
	}

	return msg
}
// genDeepUpdate lists the tracked states whose clocks differ between lastPush
// and data, along with their ticks. Returned indexes are state indexes with
// syncSchema, and positions on the tracked list otherwise.
//
// For states known to lastPush, a tick is the difference between both clocks.
// For states missing from lastPush (no previous time, or the index is out of
// its range), a tick is the full current clock, and states with a zero clock
// are skipped.
//
// Both machine times are indexed with the same index which gets pushed to
// the client, so all the checks use it as well.
func genDeepUpdate(
	syncSchema bool, data, lastPush *tracerData,
) (indexes []uint16, ticks []uint32) {
	indexes = make([]uint16, 0, len(data.tracked))
	ticks = make([]uint32, 0, len(data.tracked))

	for trackedIdx := range data.tracked {
		stateIdx := data.trackedIdxs[trackedIdx]
		prev := lastPush.mTime
		now := data.mTime

		pushedIdx := uint16(stateIdx)
		if !syncSchema {
			pushedIdx = uint16(trackedIdx)
		}

		if prev == nil || int(pushedIdx) >= len(prev) {
			// new state, send the full clock (unless never activated)
			if now[pushedIdx] == 0 {
				continue
			}
			indexes = append(indexes, pushedIdx)
			ticks = append(ticks, uint32(now[pushedIdx]))

		} else if prev[pushedIdx] != now[pushedIdx] {
			// known state, send the difference
			indexes = append(indexes, pushedIdx)
			ticks = append(ticks, uint32(now[pushedIdx]-prev[pushedIdx]))
		}
	}

	return indexes, ticks
}
// genShallowUpdate lists the tracked states whose activity has flipped
// between lastPush and data (their clocks differ in parity). Returned indexes
// are state indexes with syncSchema, and positions on the tracked list
// otherwise.
//
// A flipped state gets a tick of 1. For states missing from lastPush, states
// with a zero clock are skipped, and the tick is 1 for the currently active
// ones and 0 otherwise.
func genShallowUpdate(
	syncSchema bool, data, lastPush *tracerData,
) (indexes []uint16, ticks []uint32) {
	indexes = make([]uint16, 0, len(data.tracked))
	ticks = make([]uint32, 0, len(data.tracked))

	for trackedIdx := range data.tracked {
		idx := uint16(data.trackedIdxs[trackedIdx])
		if !syncSchema {
			idx = uint16(trackedIdx)
		}
		prev, now := lastPush.mTime, data.mTime
		isNew := prev == nil || int(idx) >= len(prev)

		switch {
		case isNew && now[idx] == 0:
			// never activated, nothing to send
			continue

		case isNew:
			var tick uint32
			if am.IsActiveTick(now[idx]) {
				tick = 1
			}
			indexes = append(indexes, idx)
			ticks = append(ticks, tick)

		case prev[idx]%2 != now[idx]%2:
			// activity flipped
			indexes = append(indexes, idx)
			ticks = append(ticks, 1)
		}
	}

	return indexes, ticks
}
// wsHandlerServer is the HTTP handler used in the WebSocket mode. It turns an
// incoming WebSocket connection into the server's RPC connection.
type wsHandlerServer struct {
// s is the server receiving the connection.
s *Server
// event is the exported Start event, used as the source event of the
// mutation made by the handler.
event *am .Event
}
// ServeHTTP upgrades the request to a WebSocket connection, sets it as the
// server's connection (as a net.Conn carrying binary messages) and adds
// RpcAccepting. It then blocks until either the server's Start state ends or
// the request's context is done, keeping the connection alive for that time.
// A failed upgrade is only logged.
func (h *wsHandlerServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	mach := h.s.Mach

	// NOTE(review): InsecureSkipVerify presumably disables the origin
	// verification of the WebSocket handshake - confirm it is intended for
	// all deployments.
	connWs, err := websocket.Accept(w, r, &websocket.AcceptOptions{
		InsecureSkipVerify: true,
	})
	if err != nil {
		log.Printf("Upgrade error: %v", err)

		return
	}

	conn := websocket.NetConn(mach.Context(), connWs, websocket.MessageBinary)
	h.s.Conn = conn
	mach.EvAdd1(h.event, ssS.RpcAccepting, nil)

	// hold the handler, returning from it would end the connection
	select {
	case <-mach.WhenNot1(ssS.Start, nil):
	case <-r.Context().Done():
	}
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.