package rpc
import (
"encoding/gob"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/rpc2"
"github.com/orsinium-labs/enum"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/rpc/states"
)
func init() {
gob .Register (&ARpc {})
gob .Register (am .Relation (0 ))
}
const (
EnvAmRpcLogServer = "AM_RPC_LOG_SERVER"
EnvAmRpcLogClient = "AM_RPC_LOG_CLIENT"
EnvAmRpcLogMux = "AM_RPC_LOG_MUX"
EnvAmRpcDbg = "AM_RPC_DBG"
EnvAmReplAddr = "AM_REPL_ADDR"
EnvAmReplDir = "AM_REPL_DIR"
)
var ss = states .SharedStates
type (
ServerMethod enum .Member [string ]
ClientMethod enum .Member [string ]
)
var (
ServerAdd = ServerMethod {"Add" }
ServerAddNS = ServerMethod {"AddNS" }
ServerRemove = ServerMethod {"Remove" }
ServerSet = ServerMethod {"Set" }
ServerHello = ServerMethod {"Hello" }
ServerHandshake = ServerMethod {"Handshake" }
ServerLog = ServerMethod {"Log" }
ServerSync = ServerMethod {"Sync" }
ServerBye = ServerMethod {"Close" }
ServerMethods = enum .New (ServerAdd , ServerAddNS , ServerRemove , ServerSet ,
ServerHello , ServerHandshake , ServerLog , ServerSync , ServerBye )
ClientSetClock = ClientMethod {"ClientSetClock" }
ClientPushAllTicks = ClientMethod {"ClientPushAllTicks" }
ClientSendPayload = ClientMethod {"ClientSendPayload" }
ClientBye = ClientMethod {"ClientBye" }
ClientSchemaChange = ClientMethod {"SchemaChange" }
ClientMethods = enum .New (ClientSetClock , ClientPushAllTicks ,
ClientSendPayload , ClientBye , ClientSchemaChange )
)
type ArgsHello struct {
ReqSchema bool
}
type ArgsMut struct {
States []int
Args am .A
Event *am .Event
}
type ArgsGet struct {
Name string
}
type ArgsLog struct {
Msg string
Args []any
}
type ArgsPayload struct {
Name string
Source string
SourceTx string
Destination string
Data any
Token string
}
type RespHandshake struct {
Schema am .Schema
Serialized *am .Serialized
}
type RespResult struct {
Clock *ClockMsg
Result am .Result
}
type RespSync struct {
Time am .Time
QueueTick uint64
}
type RespGet struct {
Value any
}
type Empty struct {}
type ClockMsg struct {
Updates [][2 ]int
QueueTick int
Checksum uint8
}
type PushAllTicks struct {
MutationType []am .MutationType
CalledStates [][]int
ClockMsg []*ClockMsg
}
type clientServerMethods interface {
GetKind() Kind
}
type Kind string
const (
KindClient Kind = "client"
KindServer Kind = "server"
)
const APrefix = "am_rpc"
type A struct {
Id string `log:"id"`
Name string `log:"name"`
MachTime am .Time
QueueTick uint64
Payload *ArgsPayload
Addr string `log:"addr"`
Err error
Method string `log:"addr"`
StartedAt time .Time
Dispose bool
Client *rpc2 .Client
}
type ARpc struct {
Id string `log:"id"`
Name string `log:"name"`
MachTime am .Time
Payload *ArgsPayload
Addr string `log:"addr"`
Err error
Method string `log:"addr"`
StartedAt time .Time
Dispose bool
}
func ParseArgs (args am .A ) *A {
if r , _ := args [APrefix ].(*ARpc ); r != nil {
return amhelp .ArgsToArgs (r , &A {})
}
if a , _ := args [APrefix ].(*A ); a != nil {
return a
}
return &A {}
}
func Pass (args *A ) am .A {
return am .A {APrefix : args }
}
func PassRpc (args *A ) am .A {
return am .A {APrefix : amhelp .ArgsToArgs (args , &ARpc {})}
}
func LogArgs (args am .A ) map [string ]string {
a := ParseArgs (args )
if a == nil {
return nil
}
return amhelp .ArgsToLogMap (a , 0 )
}
type serverRpcMethods interface {
RemoteHello(client *rpc2 .Client , args *ArgsHello , resp *RespHandshake ) error
RemoteAdd(client *rpc2 .Client , args *ArgsMut , resp *RespResult ) error
RemoteRemove(client *rpc2 .Client , args *ArgsMut , resp *RespResult ) error
RemoteSet(client *rpc2 .Client , args *ArgsMut , reply *RespResult ) error
}
type clientRpcMethods interface {
RemoteSetClock(worker *rpc2 .Client , args *ClockMsg , resp *Empty ) error
RemoteSendingPayload(
worker *rpc2 .Client , file *ArgsPayload , resp *Empty ,
) error
RemoteSendPayload(worker *rpc2 .Client , file *ArgsPayload , resp *Empty ) error
}
var (
ErrInvalidParams = errors .New ("invalid params" )
ErrInvalidResp = errors .New ("invalid response" )
ErrRpc = errors .New ("rpc" )
ErrNoAccess = errors .New ("no access" )
ErrNoConn = errors .New ("not connected" )
ErrDestination = errors .New ("wrong destination" )
ErrNetwork = errors .New ("network error" )
ErrNetworkTimeout = errors .New ("network timeout" )
)
func AddErrRpcStr (e *am .Event , mach *am .Machine , msg string ) {
err := fmt .Errorf ("%w: %s" , ErrRpc , msg )
mach .EvAddErrState (e , ss .ErrRpc , err , nil )
}
func AddErrParams (e *am .Event , mach *am .Machine , err error ) {
err = fmt .Errorf ("%w: %w" , ErrInvalidParams , err )
mach .AddErrState (ss .ErrRpc , err , nil )
}
func AddErrResp (e *am .Event , mach *am .Machine , err error ) {
err = fmt .Errorf ("%w: %w" , ErrInvalidResp , err )
mach .AddErrState (ss .ErrRpc , err , nil )
}
func AddErrNetwork (e *am .Event , mach *am .Machine , err error ) {
mach .AddErrState (ss .ErrNetwork , err , nil )
}
func AddErrNoConn (e *am .Event , mach *am .Machine , err error ) {
err = fmt .Errorf ("%w: %w" , ErrNoConn , err )
mach .AddErrState (ss .ErrNetwork , err , nil )
}
func AddErr (e *am .Event , mach *am .Machine , msg string , err error ) {
if msg != "" {
err = fmt .Errorf ("%w: %s" , err , msg )
}
if strings .HasPrefix (err .Error(), "gob: " ) {
AddErrResp (e , mach , err )
} else if strings .Contains (err .Error(), "rpc2: can't find method" ) {
AddErrRpcStr (e , mach , err .Error())
} else if strings .Contains (err .Error(), "connection is shut down" ) ||
strings .Contains (err .Error(), "unexpected EOF" ) {
mach .AddErrState (ss .ErrRpc , err , nil )
} else if strings .Contains (err .Error(), "timeout" ) {
AddErrNetwork (e , mach , errors .Join (err , ErrNetworkTimeout ))
} else if _ , ok := err .(*net .OpError ); ok {
AddErrNetwork (e , mach , err )
} else {
mach .AddErr (err , nil )
}
}
type ExceptionHandler struct {
*am .ExceptionHandler
}
func (h *ExceptionHandler ) ExceptionEnter (e *am .Event ) bool {
args := ParseArgs (e .Args )
mach := e .Machine ()
isRpcClient := mach .Has (am .S {ssC .Disconnecting , ssC .Disconnected })
if errors .Is (args .Err , ErrNetwork ) && isRpcClient &&
mach .Any1 (ssC .Disconnecting , ssC .Disconnected ) {
e .Machine ().Log ("ignoring ErrNetwork on Disconnecting/Disconnected" )
return false
}
return true
}
type semLogger struct {
mach *Worker
steps atomic .Bool
graph atomic .Bool
}
var _ am .SemLogger = &semLogger {}
func (s *semLogger ) SetArgsMapper (mapper am .LogArgsMapperFn ) {
}
func (s *semLogger ) ArgsMapper () am .LogArgsMapperFn {
return nil
}
func (s *semLogger ) EnableId (val bool ) {
}
func (s *semLogger ) IsId () bool {
return false
}
func (s *semLogger ) SetLogger (fn am .LoggerFn ) {
if fn == nil {
s .mach .logger .Store (nil )
return
}
s .mach .logger .Store (&fn )
}
func (s *semLogger ) Logger () am .LoggerFn {
if l := s .mach .logger .Load (); l != nil {
return *l
}
return nil
}
func (s *semLogger ) SetLevel (lvl am .LogLevel ) {
s .mach .logLevel .Store (&lvl )
}
func (s *semLogger ) Level () am .LogLevel {
return *s .mach .logLevel .Load ()
}
func (s *semLogger ) SetEmpty (lvl am .LogLevel ) {
var logger am .LoggerFn = func (_ am .LogLevel , msg string , args ...any ) {
}
s .mach .logger .Store (&logger )
s .mach .logLevel .Store (&lvl )
}
func (s *semLogger ) SetSimple (
logf func (format string , args ...any ), level am .LogLevel ,
) {
var logger am .LoggerFn = func (_ am .LogLevel , msg string , args ...any ) {
logf (msg , args ...)
}
s .mach .logger .Store (&logger )
s .mach .logLevel .Store (&level )
}
func (s *semLogger ) AddPipeOut (addMut bool , sourceState , targetMach string ) {
kind := "remove"
if addMut {
kind = "add"
}
s .mach .log (am .LogOps , "[pipe-out:%s] %s to %s" , kind , sourceState ,
targetMach )
}
func (s *semLogger ) AddPipeIn (addMut bool , targetState , sourceMach string ) {
kind := "remove"
if addMut {
kind = "add"
}
s .mach .log (am .LogOps , "[pipe-in:%s] %s from %s" , kind , targetState ,
sourceMach )
}
func (s *semLogger ) RemovePipes (machId string ) {
s .mach .log (am .LogOps , "[pipe:gc] %s" , machId )
}
func (s *semLogger ) IsSteps () bool {
return s .steps .Load ()
}
func (s *semLogger ) EnableSteps (enable bool ) {
s .steps .Store (enable )
}
func (s *semLogger ) IsGraph () bool {
return s .graph .Load ()
}
func (s *semLogger ) EnableGraph (enable bool ) {
s .graph .Store (enable )
}
func (s *semLogger ) EnableStateCtx (val bool ) {
}
func (s *semLogger ) IsStateCtx () bool {
return true
}
func (s *semLogger ) EnableWhen (val bool ) {
}
func (s *semLogger ) IsWhen () bool {
return true
}
func (s *semLogger ) EnableArgs (val bool ) {
}
func (s *semLogger ) IsArgs () bool {
return true
}
func (s *semLogger ) EnableQueued (val bool ) {
}
func (s *semLogger ) IsQueued () bool {
return true
}
func (s *semLogger ) EnableCan (enable bool ) {
}
func (s *semLogger ) IsCan () bool {
return true
}
type remoteHandler struct {
h any
funcNames []string
funcCache map [string ]reflect .Value
missingCache map [string ]struct {}
}
func newRemoteHandler(
h any ,
funcNames []string ,
) *remoteHandler {
return &remoteHandler {
h : h ,
funcNames : funcNames ,
funcCache : make (map [string ]reflect .Value ),
missingCache : make (map [string ]struct {}),
}
}
type WorkerTracer struct {
*am .NoOpTracer
s *Server
}
func (t *WorkerTracer ) TransitionEnd (tx *am .Transition ) {
go func () {
t .s .mutMx .Lock ()
defer t .s .mutMx .Unlock ()
t .s .pushClockUpdate (false )
}()
}
func (t *WorkerTracer ) SchemaChange (mach am .Api , oldSchema am .Schema ) {
go func () {
t .s .mutMx .Lock ()
defer t .s .mutMx .Unlock ()
if c := t .s .rpcClient .Load (); c != nil {
msg := &RespHandshake {
Schema : mach .Schema (),
Serialized : mach .Export (),
}
err := c .CallWithContext (mach .Ctx (),
ClientSchemaChange .Value , msg , &Empty {})
mach .AddErr (err , nil )
}
}()
}
func MachReplEnv (mach am .Api ) <-chan error {
addr := os .Getenv (EnvAmReplAddr )
dir := os .Getenv (EnvAmReplDir )
err := make (chan error )
switch addr {
case "" :
return err
case "1" :
addr = ""
}
MachRepl (mach , addr , dir , nil , nil )
return err
}
func MachRepl (
mach am .Api , addr , addrDir string , addrCh chan <- string , errCh chan <- error ,
) {
if amhelp .IsTestRunner () {
return
}
if addr == "" {
addr = "127.0.0.1:0"
}
if mach .HasHandlers () && !mach .Has (ssW .Names ()) {
err := fmt .Errorf (
"%w: REPL source has to implement pkg/rpc/states/WorkerStatesDef" ,
am .ErrSchema )
panic (err )
}
mux , err := NewMux (mach .Ctx (), "repl-" +mach .Id (), nil , &MuxOpts {
Parent : mach ,
})
if err != nil {
panic (err )
}
mux .Addr = addr
mux .Source = mach
mux .Start ()
if addrCh == nil && addrDir == "" {
if errCh != nil {
close (errCh )
}
return
}
go func () {
defer func () {
if errCh != nil {
close (errCh )
}
if addrCh != nil {
close (addrCh )
}
}()
dirOk := false
if addrDir != "" {
if _ , err := os .Stat (addrDir ); os .IsNotExist (err ) {
err := os .MkdirAll (addrDir , 0o755 )
if err == nil {
dirOk = true
} else if errCh != nil {
errCh <- err
}
} else {
dirOk = true
}
}
<-mux .Mach .When1 (ssM .Ready , nil )
if addrCh != nil {
addrCh <- mux .Addr
}
if dirOk && addrDir != "" {
err = os .WriteFile (
filepath .Join (addrDir , mach .Id ()+".addr" ),
[]byte (mux .Addr ), 0o644 ,
)
if errCh != nil {
errCh <- err
}
}
}()
}
func Checksum (mTime uint64 , qTick uint64 ) uint8 {
return uint8 (mTime +qTick ) % 10
}
func NewClockMsg (
tSum uint64 , tBefore , tAfter am .Time , qBefore , qAfter uint64 ,
) *ClockMsg {
ret := &ClockMsg {
QueueTick : int (qAfter ) - int (qBefore ),
Checksum : Checksum (tSum , qAfter ),
}
for stateIdx := range tAfter {
if tBefore == nil || stateIdx >= len (tBefore ) {
ret .Updates = append (ret .Updates ,
[2 ]int {stateIdx , int (tAfter [stateIdx ])})
} else if tBefore [stateIdx ] != tAfter [stateIdx ] {
ret .Updates = append (ret .Updates ,
[2 ]int {stateIdx , int (tAfter [stateIdx ] - tBefore [stateIdx ])})
}
}
return ret
}
func ClockFromMsg (
timeBefore am .Time , qTickBefore uint64 , msg *ClockMsg ,
) (am .Time , uint64 ) {
timeAfter := slices .Clone (timeBefore )
l := len (timeAfter )
for _ , v := range msg .Updates {
key := v [0 ]
val := v [1 ]
if key >= l {
continue
}
timeAfter [key ] += uint64 (val )
}
qTickAfter := qTickBefore + uint64 (msg .QueueTick )
return timeAfter , qTickAfter
}
func TrafficMeter (
listener net .Listener , fwdTo string , counter chan <- int64 ,
end <-chan struct {},
) {
defer listener .Close ()
destination , err := net .Dial ("tcp4" , fwdTo )
if err != nil {
fmt .Println ("Error connecting to destination:" , err .Error())
return
}
defer destination .Close ()
conn , err := listener .Accept ()
if err != nil {
fmt .Println ("Error accepting connection:" , err .Error())
return
}
defer conn .Close ()
wg := sync .WaitGroup {}
wg .Add (2 )
bytes := atomic .Int64 {}
go func () {
c , _ := io .Copy (destination , conn )
bytes .Add (c )
wg .Done ()
}()
go func () {
c , _ := io .Copy (conn , destination )
bytes .Add (c )
wg .Done ()
}()
<-end
_ = listener .Close ()
_ = destination .Close ()
_ = conn .Close ()
wg .Wait ()
c := bytes .Load ()
counter <- c
}
func newClosedChan() chan struct {} {
ch := make (chan struct {})
close (ch )
return ch
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .