package rpc
import (
"context"
"fmt"
"net"
"os"
"reflect"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/cenkalti/rpc2"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/rpc/states"
ampipe "github.com/pancsta/asyncmachine-go/pkg/states/pipes"
)
// Shorthand aliases for the state sets referenced throughout this file.
var (
// ssS aliases states.ServerStates; its names are used to build the server's
// own state machine in NewServer.
ssS = states .ServerStates
// ssW aliases states.NetSourceStates; a source machine with handlers has to
// include these states (checked in NewServer).
ssW = states .NetSourceStates
)
// Server is an RPC server exposing a source state machine (Source) to a
// remote client. Its own lifecycle is driven by the Mach state machine, for
// which Server provides the handlers defined in this file.
type Server struct {
*ExceptionHandler
// Mach is the server's own state machine, created in NewServer.
Mach *am .Machine
// Source is the machine exposed over RPC; remote mutations are applied to it.
Source am .Api
// Addr is the address to listen on. RpcStartingState overwrites it with the
// actual address of the listener or connection once known.
Addr string
// DeliveryTimeout bounds a single payload delivery started by the
// SendPayload handler. NewServer sets it to 5 seconds.
DeliveryTimeout time .Duration
// Listener is the listener accepting client connections. When already set,
// RpcStartingState reuses it instead of creating a new one.
Listener atomic .Pointer [net .Listener ]
// Conn, when set, is served directly instead of accepting on a listener.
Conn net .Conn
// NoNewListener makes RpcStartingEnter reject the transition when no
// Listener has been provided.
NoNewListener bool
// LogEnabled gates the output of the internal log helper. NewServer
// initializes it from the EnvAmRpcLogServer environment variable.
LogEnabled bool
// CallCount is incremented for every update notification pushed to the
// client.
CallCount uint64
// Args is an optional typed-args value. It is returned (as field names) by
// RemoteArgs and, together with ArgsPrefix, used to wrap incoming mutation
// args.
Args any
// ArgsPrefix is the key under which wrapped mutation args are stored. Both
// ArgsPrefix and Args have to be set for the wrapping to happen.
ArgsPrefix string
// PushInterval is the minimal interval between pushed updates. NewServer
// sets it to 250ms; a zero value disables pushing.
PushInterval atomic .Pointer [time .Duration ]
// syncMutations, syncAllowedStates, syncSkippedStates and syncShallowClocks
// are copied from the client's hello request in RemoteHello.
syncMutations bool
syncAllowedStates am .S
syncSkippedStates am .S
// syncSchema is set in RemoteHello when the client requested schema sync.
syncSchema bool
syncShallowClocks bool
// AllowId, when not empty, restricts RemoteHello to the client with this ID.
AllowId string
// rpcServer is the rpc2 server created by bindRpcHandlers.
rpcServer *rpc2 .Server
// rpcClient is the connected client, stored by RemoteHandshake and cleared
// by RemoteBye.
rpcClient atomic .Pointer [rpc2 .Client ]
// tracer is bound to Source in NewServer and provides the data for pushes.
tracer *sourceTracer
// ticker triggers periodic pushes while RpcReady is active.
ticker *time .Ticker
// lockCollection is held for the duration of RemoteHello.
lockCollection sync .Mutex
// lockExport serializes remote mutations and client pushes.
lockExport sync .Mutex
// clientId is the ID of the client from the last successful RemoteHello.
clientId atomic .Pointer [string ]
// deliveryHandlers is detached from Source on dispose.
// NOTE(review): nothing in the visible code assigns this field - confirm
// whether it is set elsewhere.
deliveryHandlers any
// lastPush is the time of the last update sent to the client.
lastPush time .Time
// lastPushData is the tracer data the last update was calculated against.
lastPushData *tracerData
}
// Compile-time checks that *Server satisfies the RPC method interfaces.
var (
_ serverRpcMethods = &Server {}
_ clientServerMethods = &Server {}
)
// NewServer creates an RPC server which exposes netSrcMach to a remote
// client.
//
// ctx is passed to the server's own state machine. addr is the address to
// listen on. name is used to build the machine ID ("rs-"+name) and defaults
// to "rpc". netSrcMach has to have its states verified, and when it has
// handlers bound, it also has to contain all the states from
// states.NetSourceStates. opts is optional.
//
// The returned server has a tracer bound to netSrcMach and, for source
// machines with handlers, a payload-sending handler bound to either
// ssW.SendPayload or opts.PayloadState. An error is returned for a nil or
// invalid source machine, or when any of the bindings fails; in the latter
// case the already created server machine is disposed.
func NewServer(
	ctx context.Context, addr string, name string, netSrcMach am.Api,
	opts *ServerOpts,
) (*Server, error) {
	if netSrcMach == nil {
		return nil, fmt.Errorf("net source machine is nil")
	}
	if name == "" {
		name = "rpc"
	}
	if opts == nil {
		opts = &ServerOpts{}
	}

	// validate the source machine
	if !netSrcMach.StatesVerified() {
		return nil, fmt.Errorf(
			"net source states not verified, call VerifyStates()")
	}
	hasHandlers := netSrcMach.HasHandlers()
	if hasHandlers && !netSrcMach.Has(ssW.Names()) {
		err := fmt.Errorf(
			"%w: NetSourceMach with handlers has to implement "+
				"pkg/rpc/states/NetSourceStatesDef",
			am.ErrSchema)
		return nil, err
	}

	s := &Server{
		ExceptionHandler: &ExceptionHandler{},
		Addr:             addr,
		DeliveryTimeout:  5 * time.Second,
		LogEnabled:       os.Getenv(EnvAmRpcLogServer) != "",
		Source:           netSrcMach,
		Args:             opts.Args,
		ArgsPrefix:       opts.ArgsPrefix,
		lastPushData:     &tracerData{},
	}
	interval := 250 * time.Millisecond
	s.PushInterval.Store(&interval)

	// the server's own state machine
	mach, err := am.NewCommon(ctx, "rs-"+name, states.ServerSchema, ssS.Names(),
		s, opts.Parent, &am.Opts{Tags: []string{"rpc-server"}})
	if err != nil {
		return nil, err
	}
	mach.SemLogger().SetArgsMapper(LogArgs)
	mach.SetGroups(states.ServerGroups, ssS)
	mach.OnDispose(func(id string, ctx context.Context) {
		if l := s.Listener.Load(); l != nil {
			// best effort, the listener may already be closed
			_ = (*l).Close()
			s.Listener.Store(nil)
		}
		s.rpcServer = nil
		// best effort, the bindings may already be gone
		_ = s.Source.DetachTracer(s.tracer)
		_ = s.Source.DetachHandlers(s.deliveryHandlers)
	})
	s.Mach = mach
	if os.Getenv(EnvAmRpcDbg) != "" {
		_ = amhelp.MachDebugEnv(mach)
	}

	// trace the source machine
	s.tracer = &sourceTracer{
		s: s,
		dataLatest: &tracerData{
			queueTick: 1,
		},
	}
	if err = netSrcMach.BindTracer(s.tracer); err != nil {
		// don't leak the server machine, the caller gets no handle to it
		mach.Dispose()
		return nil, err
	}
	netSrcMach.OnDispose(func(id string, ctx context.Context) {
		_ = netSrcMach.DetachTracer(s.tracer)
	})

	// payload delivery handlers
	if hasHandlers {
		payloadState := ssW.SendPayload
		if opts.PayloadState != "" {
			payloadState = opts.PayloadState
		}
		var h any
		if payloadState == ssW.SendPayload {
			h = &SendPayloadHandlers{
				SendPayloadState: getSendPayloadState(s, ssW.SendPayload),
			}
		} else {
			// custom state name requires a dynamically built handler struct
			h = createSendPayloadHandlers(s, payloadState)
		}
		if err = netSrcMach.BindHandlers(h); err != nil {
			// don't leak the server machine, the caller gets no handle to it
			mach.Dispose()
			return nil, err
		}
		mach.OnDispose(func(id string, ctx context.Context) {
			_ = netSrcMach.DetachHandlers(h)
		})
	}

	return s, nil
}
// StartEnd is the End handler of the Start state. It disposes the server's
// machine when the transition args carry the Dispose flag.
func (s *Server) StartEnd(e *am.Event) {
	args := ParseArgs(e.Args)
	if !args.Dispose {
		return
	}
	s.Mach.Dispose()
}
// RpcStartingEnter is the Enter handler of RpcStarting. It rejects the
// transition when a listener is required but neither provided nor allowed to
// be created, or when no address is set.
func (s *Server) RpcStartingEnter(e *am.Event) bool {
	switch {
	case s.Listener.Load() == nil && s.NoNewListener:
		// no listener given and creating one is forbidden
		return false
	case s.Addr == "":
		return false
	default:
		return true
	}
}
// RpcStartingState is the final handler of RpcStarting. It creates a fresh
// rpc2 server and, in a goroutine, resolves the transport (a pre-set Conn, a
// pre-set Listener, or a new TCP listener on Addr), registers the
// connect / disconnect hooks and starts serving.
//
// Once serving begins, RpcReady is added. When serving ends while Start is
// still active, the listener is closed and RpcStarting is re-added to restart
// the server.
func (s *Server) RpcStartingState(e *am.Event) {
	ctxRpcStarting := s.Mach.NewStateCtx(ssS.RpcStarting)
	ctxStart := s.Mach.NewStateCtx(ssS.Start)
	s.log("Starting RPC on %s", s.Addr)
	s.bindRpcHandlers()
	srv := s.rpcServer

	// unblock the handler
	go func() {
		if ctxStart.Err() != nil {
			return // expired
		}

		// resolve the transport and the effective address
		if s.Conn != nil {
			s.Addr = s.Conn.LocalAddr().String()
		} else if l := s.Listener.Load(); l != nil {
			s.Addr = (*l).Addr().String()
		} else {
			cfg := net.ListenConfig{}
			lis, err := cfg.Listen(ctxStart, "tcp4", s.Addr)
			if err != nil {
				AddErrNetwork(e, s.Mach, err)
				s.Mach.Remove1(ssS.RpcStarting, nil)
				return
			}
			s.Listener.Store(&lis)
			s.Addr = lis.Addr().String()
		}
		s.log("RPC started on %s", s.Addr)

		// Register the hooks BEFORE serving starts, so a client which
		// connects right away (especially via a pre-set Conn) isn't missed.
		srv.OnDisconnect(func(client *rpc2.Client) {
			s.Mach.EvRemove1(e, ssS.ClientConnected, Pass(&A{Client: client}))
		})
		srv.OnConnect(func(client *rpc2.Client) {
			s.Mach.EvAdd1(e, ssS.ClientConnected, Pass(&A{Client: client}))
		})

		// serve (blocking)
		go func() {
			if ctxRpcStarting.Err() != nil {
				return // expired
			}
			s.Mach.EvAdd1(e, ssS.RpcReady, Pass(&A{Addr: s.Addr}))

			lisP := s.Listener.Load()
			switch {
			case s.Conn != nil:
				srv.ServeConn(s.Conn)
			case lisP != nil:
				// the listener may have been cleared by a concurrent dispose
				srv.Accept(*lisP)
			}
			if ctxStart.Err() != nil {
				return // expired
			}

			// clean up
			if lisP != nil {
				// best effort, serving has already ended
				_ = (*lisP).Close()
				s.Listener.Store(nil)
			}
			if ctxStart.Err() != nil {
				return // expired
			}

			// restart on failed listener
			if s.Mach.Is1(ssS.Start) {
				s.Mach.EvRemove1(e, ssS.RpcReady, nil)
				s.Mach.EvAdd1(e, ssS.RpcStarting, nil)
			}
		}()
	}()
}
// RpcReadyEnter is the Enter handler of RpcReady. It accepts the transition
// only while RpcStarting is active.
func (s *Server ) RpcReadyEnter (e *am .Event ) bool {
return s .Mach .Is1 (ssS .RpcStarting )
}
// RpcReadyState is the final handler of RpcReady. It starts a goroutine which
// periodically pushes updates to the client for as long as RpcReady stays
// active. Nothing is started when PushInterval is unset or not positive.
//
// A fresh ticker is created on every activation: a ticker stopped by a
// previous activation never fires again, so reusing it would silently disable
// pushes after the server restarts.
func (s *Server) RpcReadyState(e *am.Event) {
	interval := s.PushInterval.Load()
	// time.NewTicker panics on non-positive durations
	if interval == nil || *interval <= 0 {
		return
	}
	ctx := s.Mach.NewStateCtx(ssS.RpcReady)

	// release the previous ticker, if any (Stop is safe to call repeatedly)
	if s.ticker != nil {
		s.ticker.Stop()
	}
	t := time.NewTicker(*interval)
	s.ticker = t

	go func() {
		// the goroutine only touches its own ticker, never s.ticker
		defer t.Stop()
		if ctx.Err() != nil {
			return // expired
		}
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				s.pushClient()
			}
		}
	}()
}
// HandshakeDoneEnd is the End handler of HandshakeDone. It closes the
// connected RPC client, if there is one.
func (s *Server) HandshakeDoneEnd(e *am.Event) {
	client := s.rpcClient.Load()
	if client == nil {
		return
	}
	// best effort, the connection may already be closed
	_ = client.Close()
}
// Start requests the server to start by adding the Start state to its
// machine, and returns the result of that mutation.
func (s *Server ) Start () am .Result {
return s .Mach .Add1 (ssS .Start , nil )
}
// Stop requests the server to stop by removing the Start state from its
// machine. The dispose flag is forwarded in the mutation args (see StartEnd).
// Returns am.Canceled when the server has no machine.
func (s *Server) Stop(dispose bool) am.Result {
	mach := s.Mach
	if mach == nil {
		return am.Canceled
	}
	if dispose {
		s.log("disposing")
	}

	args := Pass(&A{
		Dispose: dispose,
	})
	return mach.Remove1(ssS.Start, args)
}
// SendPayload sends the payload to the connected client and waits for the
// call to complete or for ctx to expire.
//
// event is optional; when given, its machine and transition IDs are recorded
// as the payload's source. The payload's Token is overwritten with a fresh
// random ID.
//
// Returns ErrNoConn when there is no connected client with a completed
// handshake, ErrInvalidParams for a nil payload, and an error wrapping
// ErrDestination when the payload is addressed to a different client.
func (s *Server) SendPayload(
	ctx context.Context, event *am.Event, payload *MsgSrvPayload,
) error {
	if s.Mach.Not1(ssS.ClientConnected) || s.Mach.Not1(ssS.HandshakeDone) {
		return ErrNoConn
	}
	if payload == nil {
		return ErrInvalidParams
	}

	// check the destination
	id := s.ClientId()
	if payload.Destination != "" && id != payload.Destination {
		return fmt.Errorf("%w: %s != %s", ErrDestination, payload.Destination, id)
	}

	// The client can be cleared (see RemoteBye) while the states are still
	// active. Without this check the nil dereference would be swallowed by the
	// deferred PanicToErr and reported to the caller as success.
	client := s.rpcClient.Load()
	if client == nil {
		return ErrNoConn
	}
	defer s.Mach.PanicToErr(nil)

	payload.Token = utils.RandId(0)
	if event != nil {
		payload.Source = event.MachineId
		payload.SourceTx = event.TransitionId
	}
	s.log("sending payload %s from %s to %s", payload.Name, payload.Source,
		payload.Destination)

	return client.CallWithContext(ctx,
		ClientSendPayload.Value, payload, &MsgEmpty{})
}
// ClientId returns the ID of the client stored by the last successful hello,
// or an empty string when there is none.
func (s *Server) ClientId() string {
	if id := s.clientId.Load(); id != nil {
		return *id
	}
	return ""
}
// GetKind returns KindServer.
func (s *Server ) GetKind () Kind {
return KindServer
}
// log forwards the message to the machine's logger, but only when LogEnabled
// is set.
func (s *Server) log(msg string, args ...any) {
	if s.LogEnabled {
		s.Mach.Log(msg, args...)
	}
}
// bindRpcHandlers replaces rpcServer with a new rpc2 server and registers all
// the Remote* methods under their RPC method names.
func (s *Server) bindRpcHandlers() {
	srv := rpc2.NewServer()
	s.rpcServer = srv

	// registration order is kept as listed
	routes := []struct {
		method  string
		handler any
	}{
		{ServerHello.Value, s.RemoteHello},
		{ServerHandshake.Value, s.RemoteHandshake},
		{ServerAdd.Value, s.RemoteAdd},
		{ServerAddNS.Value, s.RemoteAddNS},
		{ServerRemove.Value, s.RemoteRemove},
		{ServerSet.Value, s.RemoteSet},
		{ServerSync.Value, s.RemoteSync},
		{ServerArgs.Value, s.RemoteArgs},
		{ServerBye.Value, s.RemoteBye},
	}
	for _, r := range routes {
		srv.Handle(r.method, r.handler)
	}
}
// pushClient sends the latest tracer data to the connected client, either as
// a list of mutations (syncMutations) or as a single update.
//
// The push is skipped when there is no client, pushing is disabled
// (PushInterval of 0), the handshake isn't done, another export is in
// progress, nothing changed since the last push, or the last push happened
// less than PushInterval ago. On a send error ClientConnected is removed and
// the error is added to the server's machine.
func (s *Server ) pushClient () {
c := s .rpcClient .Load ()
if c == nil {
return
}
if *s .PushInterval .Load () == 0 || s .Mach .Not1 (ssS .HandshakeDone ) {
return
}
// don't wait for a mutation or another push holding the lock, just skip
if !s .lockExport .TryLock () {
s .log ("skip parallel export" )
return
}
defer s .lockExport .Unlock ()
data := s .tracer .DataLatest ()
if data == nil {
return
}
// nothing new since the last push
if s .lastPushData .mTrackedTimeSum == data .mTrackedTimeSum &&
s .lastPushData .queueTick == data .queueTick {
return
}
// too early for another push
if time .Since (s .lastPush ) < *s .PushInterval .Load () {
return
}
s .log ("pushClient:try t%d" , data .mTrackedTimeSum )
var err error
if s .syncMutations {
err = s .pushUpdateMutations (s .tracer .DataQueue ())
} else {
err = s .pushUpdateLatest (data )
}
if err != nil {
s .Mach .Remove1 (ssS .ClientConnected , nil )
AddErr (nil , s .Mach , "pushClient" , err )
return
}
s .log ("pushClient:ok t%d" , data .mTrackedTimeSum )
// remember what the client has seen
s .storeLastPush (data )
}
// pushUpdateMutations notifies the client about the passed mutations, encoded
// relative to the last pushed data. It's a no-op without a client or when the
// encoded message holds no mutations.
func (s *Server) pushUpdateMutations(muts []tracerMutation) error {
	client := s.rpcClient.Load()
	if client == nil {
		return nil
	}
	defer s.Mach.PanicToErr(nil)

	msg := calcUpdateMutations(s.syncSchema, muts, s.lastPushData)
	if len(msg.MutationType) < 1 {
		return nil
	}

	s.CallCount += 1
	return client.Notify(ClientUpdateMutations.Value, msg)
}
// pushUpdateLatest notifies the client about the difference between data and
// the last pushed data. It's a no-op without a client or when no state index
// has changed.
func (s *Server) pushUpdateLatest(data *tracerData) error {
	client := s.rpcClient.Load()
	if client == nil {
		return nil
	}
	defer s.Mach.PanicToErr(nil)

	msg := calcUpdate(s.syncSchema, data, s.lastPushData, s.syncShallowClocks)
	if len(msg.Indexes) < 1 {
		return nil
	}

	s.CallCount += 1
	return client.Notify(ClientUpdate.Value, msg)
}
// newMsgMutation builds the response to a remote mutation: the mutation's
// result plus either the queued mutations (syncMutations) or a single update,
// both relative to the last pushed data. The passed data is then recorded as
// the last push.
func (s *Server) newMsgMutation(
	mut am.Result, data *tracerData,
) *MsgSrvMutation {
	msg := &MsgSrvMutation{Result: mut}

	switch {
	case s.syncMutations:
		msg.Mutations = calcUpdateMutations(s.syncSchema, s.tracer.DataQueue(),
			s.lastPushData)
	default:
		msg.Update = calcUpdate(s.syncSchema, data, s.lastPushData,
			s.syncShallowClocks)
	}
	s.storeLastPush(data)

	return msg
}
// storeLastPush records data as the baseline for the next update, along with
// the current time.
func (s *Server) storeLastPush(data *tracerData) {
	s.lastPushData, s.lastPush = data, time.Now()
}
// RemoteHello is the RPC handler for the first message from a client. It
// validates the client's ID, stores the requested sync settings, exports the
// source machine and responds with the serialized state (plus the schema,
// when requested and the client's hash doesn't match). On success it adds
// Handshaking to the server's machine.
//
// Returns am.ErrCanceled when the server isn't started, ErrInvalidParams for
// a missing ID, and an error wrapping ErrNoAccess when AllowId is set and
// doesn't match.
func (s *Server ) RemoteHello (
client *rpc2 .Client , req *MsgCliHello , resp *MsgSrvHello ,
) error {
if s .Mach .Not1 (ssS .Start ) {
return am .ErrCanceled
}
if req == nil || req .Id == "" {
s .Mach .Remove1 (ssS .Handshaking , nil )
AddErrRpcStr (nil , s .Mach , "handshake failed: ID missing" )
return ErrInvalidParams
}
s .lockCollection .Lock ()
defer s .lockCollection .Unlock ()
// access control
if s .AllowId != "" && req .Id != s .AllowId {
s .Mach .Remove1 (ssS .Handshaking , nil )
return fmt .Errorf ("%w: %s != %s" , ErrNoAccess , req .Id , s .AllowId )
}
// remember the sync settings requested by the client
s .syncAllowedStates = req .AllowedStates
s .syncSkippedStates = req .SkippedStates
s .syncShallowClocks = req .ShallowClocks
s .syncMutations = req .SyncMutations
// NOTE(review): the third value returned by Export is discarded - confirm
// it can't be an error worth handling.
export , schema , _ := s .Source .Export ()
s .tracer .calcTrackedStates (export .StateNames )
s .tracer .active = true
// count all the states, before any filtering below
statesCount := len (export .StateNames )
tTrackedSum := export .Time .Filter (s .tracer .trackedStateIdxs ).Sum (nil )
if !req .SyncSchema {
// without the schema, send the tracked states only
export .StateNames = am .StatesShared (export .StateNames ,
s .tracer .trackedStates )
export .Time = export .Time .Filter (s .tracer .trackedStateIdxs )
} else {
// with the schema, keep all the states, but zero the untracked clocks
for i := range export .StateNames {
if slices .Contains (s .tracer .trackedStateIdxs , i ) {
continue
}
export .Time [i ] = 0
}
}
*resp = MsgSrvHello {
Serialized : export ,
StatesCount : uint32 (statesCount ),
}
if req .SyncSchema {
s .syncSchema = true
// send the schema only when the client doesn't have this version
if req .SchemaHash == "" ||
req .SchemaHash != amhelp .SchemaHash (schema ) {
resp .Schema = schema
}
}
// the exported data becomes the baseline for the following updates
s .lastPushData .mTime = export .Time
s .lastPushData .queueTick = export .QueueTick
s .lastPushData .mTrackedTimeSum = tTrackedSum
s .lastPush = time .Now ()
s .clientId .Store (&req .Id )
s .log ("RemoteHello: t%v q%d" , tTrackedSum , export .QueueTick )
s .Mach .Add1 (ssS .Handshaking , nil )
return nil
}
// RemoteHandshake is the RPC handler completing the handshake. It stores the
// calling client and adds HandshakeDone with the client's ID.
//
// Returns am.ErrCanceled when the server isn't started and ErrNoConn when no
// client ID has been stored by a previous hello.
func (s *Server) RemoteHandshake(
	client *rpc2.Client, _ *MsgEmpty, _ *MsgEmpty,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}

	clientId := s.clientId.Load()
	if clientId == nil {
		return ErrNoConn
	}

	// the source's clock and queue tick are collected for logging only
	s.log("RemoteHandshake: t%v q%d",
		s.Source.Time(nil).Sum(nil), s.Source.QueueTick())

	s.rpcClient.Store(client)
	args := Pass(&A{
		Id: *clientId,
	})
	s.Mach.Add1(ssS.HandshakeDone, args)

	return nil
}
// RemoteAdd is the RPC handler adding states to the source machine. State
// indexes from the request are translated to names, and the args are wrapped
// under ArgsPrefix when both ArgsPrefix and Args are set. When the request
// carries an event, the mutation is applied via EvAdd. The response holds the
// mutation's result and the resulting update.
//
// Returns am.ErrCanceled when the server isn't started and ErrInvalidParams
// when the request has no states.
func (s *Server) RemoteAdd(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.lockExport.Lock()
	defer s.lockExport.Unlock()

	if req.States == nil {
		return ErrInvalidParams
	}

	// optionally wrap the args
	mutArgs := req.Args
	if s.ArgsPrefix != "" && s.Args != nil {
		mutArgs = am.A{
			s.ArgsPrefix: amhelp.ArgsFromMap(req.Args, s.Args),
		}
	}

	// mutate
	var res am.Result
	if req.Event == nil {
		names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
		res = s.Source.Add(names, mutArgs)
	} else {
		names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
		res = s.Source.EvAdd(req.Event, names, mutArgs)
	}

	// respond with the latest data
	latest := s.tracer.DataLatest()
	*resp = *s.newMsgMutation(res, latest)

	return nil
}
// RemoteAddNS is the RPC handler adding states to the source machine without
// reporting back: the mutation's result is discarded and the response stays
// empty. Args are wrapped under ArgsPrefix when both ArgsPrefix and Args are
// set.
//
// Returns am.ErrCanceled when the server isn't started and ErrInvalidParams
// when the request has no states.
//
// NOTE(review): unlike RemoteAdd, req.Event isn't forwarded here - confirm
// that's intended.
func (s *Server) RemoteAddNS(
	_ *rpc2.Client, req *MsgCliMutation, _ *MsgEmpty,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.lockExport.Lock()
	defer s.lockExport.Unlock()

	if req.States == nil {
		return ErrInvalidParams
	}

	// optionally wrap the args
	mutArgs := req.Args
	if s.ArgsPrefix != "" && s.Args != nil {
		mutArgs = am.A{
			s.ArgsPrefix: amhelp.ArgsFromMap(req.Args, s.Args),
		}
	}

	names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
	_ = s.Source.Add(names, mutArgs)

	return nil
}
// RemoteRemove is the RPC handler removing states from the source machine.
// State indexes from the request are translated to names, and the args are
// wrapped under ArgsPrefix when both ArgsPrefix and Args are set. The
// response holds the mutation's result and the resulting update.
//
// Returns am.ErrCanceled when the server isn't started and ErrInvalidParams
// when the request has no states.
func (s *Server) RemoteRemove(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.lockExport.Lock()
	defer s.lockExport.Unlock()

	if req.States == nil {
		return ErrInvalidParams
	}

	// optionally wrap the args
	mutArgs := req.Args
	if s.ArgsPrefix != "" && s.Args != nil {
		mutArgs = am.A{
			s.ArgsPrefix: amhelp.ArgsFromMap(req.Args, s.Args),
		}
	}

	// mutate
	names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
	res := s.Source.Remove(names, mutArgs)

	// respond with the latest data
	latest := s.tracer.DataLatest()
	*resp = *s.newMsgMutation(res, latest)

	return nil
}
// RemoteSet is the RPC handler setting the active states of the source
// machine. State indexes from the request are translated to names, and the
// args are wrapped under ArgsPrefix when both ArgsPrefix and Args are set.
// The response holds the mutation's result and the resulting update.
//
// Returns am.ErrCanceled when the server isn't started and ErrInvalidParams
// when the request has no states.
func (s *Server) RemoteSet(
	_ *rpc2.Client, req *MsgCliMutation, resp *MsgSrvMutation,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.lockExport.Lock()
	defer s.lockExport.Unlock()

	if req.States == nil {
		return ErrInvalidParams
	}

	// optionally wrap the args
	mutArgs := req.Args
	if s.ArgsPrefix != "" && s.Args != nil {
		mutArgs = am.A{
			s.ArgsPrefix: amhelp.ArgsFromMap(req.Args, s.Args),
		}
	}

	// mutate
	names := amhelp.IndexesToStates(s.Source.StateNames(), req.States)
	res := s.Source.Set(names, mutArgs)

	// respond with the latest data
	latest := s.tracer.DataLatest()
	*resp = *s.newMsgMutation(res, latest)

	return nil
}
// RemoteSync is the RPC handler returning the source machine's current time
// and queue tick. It also adds MetricSync to the server's machine.
//
// Returns am.ErrCanceled when the server isn't started.
func (s *Server) RemoteSync(
	_ *rpc2.Client, _ *MsgEmpty, resp *MsgSrvSync,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.Mach.Add1(ssS.MetricSync, nil)

	msg := MsgSrvSync{
		Time:      s.Source.Time(nil),
		QueueTick: s.Source.QueueTick(),
	}
	*resp = msg
	s.log("RemoteSync: [%v]", resp.Time)

	return nil
}
// RemoteArgs is the RPC handler returning the fields of the server's Args
// value, as produced by utils.StructFields. The response is left untouched
// when Args is nil. It also adds MetricSync to the server's machine.
//
// Returns am.ErrCanceled when the server isn't started, or the error from
// utils.StructFields.
func (s *Server) RemoteArgs(
	_ *rpc2.Client, _ *MsgEmpty, resp *MsgSrvArgs,
) error {
	if s.Mach.Not1(ssS.Start) {
		return am.ErrCanceled
	}
	s.Mach.Add1(ssS.MetricSync, nil)

	if s.Args == nil {
		return nil
	}
	fields, err := utils.StructFields(s.Args)
	if err != nil {
		return err
	}
	resp.Args = fields

	return nil
}
// RemoteBye is the RPC handler for a client announcing its disconnection. It
// removes ClientConnected, forgets the client and its ID, and (in the
// background) tries to close the client's connection and then removes
// HandshakeDone.
//
// Returns am.ErrCanceled when the server isn't started.
func (s *Server ) RemoteBye (
_ *rpc2 .Client , _ *MsgEmpty , _ *MsgEmpty ,
) error {
if s .Mach .Not1 (ssS .Start ) {
return am .ErrCanceled
}
s .log ("RemoteBye" )
s .Mach .Remove1 (ssS .ClientConnected , Pass (&A {
Addr : s .Addr ,
}))
go func () {
// give the close 100ms, then move on regardless
select {
case <- time .After (100 * time .Millisecond ):
s .log ("rpc.Close timeout" )
case <- amhelp .ExecAndClose (func () {
// NOTE(review): rpcClient is loaded here, inside the goroutine, while
// the handler clears it right after starting the goroutine (below). The
// load can therefore see nil and skip the close - confirm whether the
// client should be captured before the goroutine starts.
if c := s .rpcClient .Load (); c != nil {
_ = c .Close ()
}
}):
s .log ("rpc.Close" )
}
// delay the removal of HandshakeDone by another 100ms
time .Sleep (100 * time .Millisecond )
s .Mach .Remove1 (ssS .HandshakeDone , nil )
}()
// forget the client
s .rpcClient .Store (nil )
s .clientId .Store (nil )
return nil
}
// BindServer pipes server states from source to custom states of target: the
// rpcReady state follows RpcReady, and the clientConn state is added and
// removed by handlers bound to HandshakeDone. Both target state names are
// required.
//
// NOTE(review): the client handlers are bound to HandshakeDone, while the
// pipes are created with ssS.ClientConnected as their source state - confirm
// that's intended.
func BindServer(source, target *am.Machine, rpcReady, clientConn string) error {
	if clientConn == "" || rpcReady == "" {
		return fmt.Errorf("rpcReady and clientConn must be set")
	}

	handlers := &struct {
		RpcReadyState      am.HandlerFinal
		RpcReadyEnd        am.HandlerFinal
		HandshakeDoneState am.HandlerFinal
		HandshakeDoneEnd   am.HandlerFinal
	}{}
	handlers.RpcReadyState = ampipe.Add(source, target, ssS.RpcReady, rpcReady)
	handlers.RpcReadyEnd = ampipe.Remove(source, target, ssS.RpcReady, rpcReady)
	handlers.HandshakeDoneState = ampipe.Add(source, target,
		ssS.ClientConnected, clientConn)
	handlers.HandshakeDoneEnd = ampipe.Remove(source, target,
		ssS.ClientConnected, clientConn)

	return source.BindHandlers(handlers)
}
// BindServerMulti pipes server states from source to custom states of target.
// The rpcReady state follows RpcReady. Unlike in BindServer, both client
// pipes only add states: clientConn is added by the HandshakeDone state
// handler and clientDisconn is added by the HandshakeDone end handler. All
// three target state names are required.
func BindServerMulti(
	source, target *am.Machine, rpcReady, clientConn, clientDisconn string,
) error {
	anyEmpty := rpcReady == "" || clientConn == "" || clientDisconn == ""
	if anyEmpty {
		return fmt.Errorf("rpcReady, clientConn, and clientDisconn must be set")
	}

	handlers := &struct {
		RpcReadyState      am.HandlerFinal
		RpcReadyEnd        am.HandlerFinal
		HandshakeDoneState am.HandlerFinal
		HandshakeDoneEnd   am.HandlerFinal
	}{}
	handlers.RpcReadyState = ampipe.Add(source, target, ssS.RpcReady, rpcReady)
	handlers.RpcReadyEnd = ampipe.Remove(source, target, ssS.RpcReady, rpcReady)
	handlers.HandshakeDoneState = ampipe.Add(source, target,
		ssS.ClientConnected, clientConn)
	handlers.HandshakeDoneEnd = ampipe.Add(source, target,
		ssS.ClientConnected, clientDisconn)

	return source.BindHandlers(handlers)
}
// BindServerRpcReady pipes the activation of RpcReady on source to the
// rpcReady state of target (add only, there's no removal pipe). The target
// state name is required, the same as in BindServer and BindServerMulti.
func BindServerRpcReady(source, target *am.Machine, rpcReady string) error {
	if rpcReady == "" {
		return fmt.Errorf("rpcReady must be set")
	}

	h := &struct {
		RpcReadyState am.HandlerFinal
	}{
		RpcReadyState: ampipe.Add(source, target, ssS.RpcReady, rpcReady),
	}

	return source.BindHandlers(h)
}
// ServerOpts are the optional settings accepted by NewServer.
type ServerOpts struct {
// PayloadState is the name of the source machine's state which triggers
// sending a payload. NewServer falls back to ssW.SendPayload when empty.
PayloadState string
// Parent is passed to am.NewCommon as the parent of the server's machine.
Parent am .Api
// Args is copied to Server.Args.
Args any
// ArgsPrefix is copied to Server.ArgsPrefix.
ArgsPrefix string
}
// SendPayloadHandlers is the handlers struct NewServer binds to the source
// machine when the default payload state (ssW.SendPayload) is used.
type SendPayloadHandlers struct {
// SendPayloadState is the final handler of the SendPayload state.
SendPayloadState am .HandlerFinal
}
// getSendPayloadState returns a final handler for the stateName state of the
// source machine. The handler immediately removes stateName again, validates
// the args (both a name and a payload are required) and sends the payload in
// a goroutine, bound by the server's Start state and DeliveryTimeout. Invalid
// args and delivery errors are reported via ssW.ErrSendPayload.
func getSendPayloadState(s *Server, stateName string) am.HandlerFinal {
	return func(e *am.Event) {
		// self-remove, the state acts as a trigger
		e.Machine().EvRemove1(e, stateName, nil)

		startCtx := s.Mach.NewStateCtx(ssS.Start)
		in := ParseArgs(e.Args)
		out := &A{Name: in.Name}

		if valid := in.Payload != nil && in.Name != ""; !valid {
			err := fmt.Errorf("invalid payload args [name, payload]")
			e.Machine().EvAddErrState(e, ssW.ErrSendPayload, err, Pass(out))
			return
		}

		// deliver without blocking the handler
		go func() {
			sendCtx, cancel := context.WithTimeout(startCtx, s.DeliveryTimeout)
			defer cancel()

			if err := s.SendPayload(sendCtx, e, in.Payload); err != nil {
				e.Machine().EvAddErrState(e, ssW.ErrSendPayload, err, Pass(out))
			}
		}()
	}
}
// createSendPayloadHandlers builds, via reflection, a pointer to a struct
// with a single field named stateName+am.SuffixState, holding the handler
// from getSendPayloadState. It's used when the payload state has a custom
// name, so a statically typed handlers struct can't be used.
func createSendPayloadHandlers(s *Server, stateName string) any {
	handler := getSendPayloadState(s, stateName)

	field := reflect.StructField{
		Name: stateName + am.SuffixState,
		Type: reflect.TypeOf(handler),
	}
	ptr := reflect.New(reflect.StructOf([]reflect.StructField{field}))
	ptr.Elem().Field(0).Set(reflect.ValueOf(handler))

	return ptr.Interface()
}
// calcUpdate builds an update message describing how data differs from
// lastPush. The changed state indexes and their ticks come from either
// genShallowUpdate (shallowClocks) or genDeepUpdate. The queue tick and the
// machine tick are sent as differences, converted to uint16 and uint8
// respectively.
func calcUpdate(
	syncSchema bool, data, lastPush *tracerData, shallowClocks bool,
) *MsgSrvUpdate {
	gen := genDeepUpdate
	if shallowClocks {
		gen = genShallowUpdate
	}
	indexes, ticks := gen(syncSchema, data, lastPush)

	msg := &MsgSrvUpdate{
		QueueTick: uint16(data.queueTick - lastPush.queueTick),
		MachTick:  uint8(data.machTick - lastPush.machTick),
		Indexes:   indexes,
		Ticks:     ticks,
		Checksum:  data.checksum,
	}

	return msg
}
// calcUpdateMutations encodes a list of mutations into a single message. Each
// mutation contributes its type, its called state indexes and a deep update
// calculated against the data of the preceding mutation (prev for the first
// one).
func calcUpdateMutations(
	syncSchema bool, muts []tracerMutation, prev *tracerData,
) *MsgSrvUpdateMuts {
	msg := &MsgSrvUpdateMuts{}
	base := prev

	for i := range muts {
		m := &muts[i]

		calledIdxs := make([]uint16, len(m.calledIdxs))
		for j, idx := range m.calledIdxs {
			calledIdxs[j] = uint16(idx)
		}
		update := calcUpdate(syncSchema, &m.data, base, false)

		msg.MutationType = append(msg.MutationType, m.mutType)
		msg.CalledStates = append(msg.CalledStates, calledIdxs)
		msg.Updates = append(msg.Updates, *update)

		// the next mutation is relative to this one
		base = &m.data
	}

	return msg
}
// genDeepUpdate lists the tracked states whose clocks differ between lastPush
// and data, together with their ticks.
//
// The index used for both reading the clocks and reporting is the state's
// index in the machine when syncSchema is set, and its index among the
// tracked states otherwise. For a state which isn't covered by lastPush (no
// previous clocks, or an index beyond them) the absolute tick is reported,
// unless it's 0. For all the other states the difference is reported.
//
// Every check uses the same index as the clock reads, the same as in
// genShallowUpdate. Checking by the tracked index while reading by the state
// index would skip changed states or read beyond lastPush's clocks.
func genDeepUpdate(
	syncSchema bool, data, lastPush *tracerData,
) (indexes []uint16, ticks []uint32) {
	indexes = make([]uint16, 0, len(data.tracked))
	ticks = make([]uint32, 0, len(data.tracked))
	prev := lastPush.mTime
	now := data.mTime

	for trackedIdx := range data.tracked {
		stateIdx := data.trackedIdxs[trackedIdx]
		pushedIdx := uint16(stateIdx)
		if !syncSchema {
			pushedIdx = uint16(trackedIdx)
		}

		// not covered by the previous push, send the absolute tick
		if prev == nil || int(pushedIdx) >= len(prev) {
			if now[pushedIdx] == 0 {
				continue
			}
			indexes = append(indexes, pushedIdx)
			ticks = append(ticks, uint32(now[pushedIdx]))
			continue
		}

		// send the difference
		if prev[pushedIdx] != now[pushedIdx] {
			indexes = append(indexes, pushedIdx)
			ticks = append(ticks, uint32(now[pushedIdx]-prev[pushedIdx]))
		}
	}

	return indexes, ticks
}
// genShallowUpdate lists the tracked states whose activity differs between
// lastPush and data. Ticks carry no real clock values here: for a state not
// covered by lastPush the tick is 1 when the state is active and 0 otherwise
// (states with a clock of 0 are skipped), and for all the other states a tick
// of 1 is sent whenever the parity of the clock has changed.
//
// The index used for both reading the clocks and reporting is the state's
// index in the machine when syncSchema is set, and its index among the
// tracked states otherwise.
func genShallowUpdate(
	syncSchema bool, data, lastPush *tracerData,
) (indexes []uint16, ticks []uint32) {
	indexes = make([]uint16, 0, len(data.tracked))
	ticks = make([]uint32, 0, len(data.tracked))
	prev, now := lastPush.mTime, data.mTime

	for trackedIdx := range data.tracked {
		stateIdx := data.trackedIdxs[trackedIdx]
		pushedIdx := uint16(trackedIdx)
		if syncSchema {
			pushedIdx = uint16(stateIdx)
		}

		uncovered := prev == nil || int(pushedIdx) >= len(prev)
		switch {
		case uncovered && now[pushedIdx] == 0:
			// never activated, nothing to report
			continue

		case uncovered:
			var tick uint32
			if am.IsActiveTick(now[pushedIdx]) {
				tick = 1
			}
			indexes = append(indexes, pushedIdx)
			ticks = append(ticks, tick)

		case prev[pushedIdx]%2 != now[pushedIdx]%2:
			// activity flipped since the last push
			indexes = append(indexes, pushedIdx)
			ticks = append(ticks, uint32(1))
		}
	}

	return indexes, ticks
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.