package frostdb
import (
"context"
"slices"
"sync/atomic"
"time"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/polarsignals/frostdb"
"golang.org/x/sync/errgroup"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
)
type Time struct {
Id uint64
MutType uint64
TimeSum uint64
TimeDiff uint64
TransitionId string
States map [string ]uint64 `frostdb:",rle_dict,asc(1),null_first"`
}
type Transition struct {
Id string
SourceTx string
SourceMach string
IsAuto bool
IsAccepted bool
IsCheck bool
IsBroken bool
QueueLen uint16
QueuedAt uint64
ExecutedAt uint64
Called []uint64
Arguments map [string ]string
}
type Memory struct {
Store *frostdb .ColumnStore
Db *frostdb .DB
}
func NewMemory (ctx context .Context ) (*Memory , error ) {
store , err := frostdb .New (frostdb .WithWAL (), frostdb .WithStoragePath ("." ))
if err != nil {
return nil , err
}
db , err := store .DB (ctx , "amhist" )
if err != nil {
return nil , err
}
return &Memory {
Db : db ,
Store : store ,
}, nil
}
func (m *Memory ) Close () error {
return m .Db .Close ()
}
func (m *Memory ) Track (
onErr func (err error ), mach *am .Machine , calledAllowlist ,
changedAllowlist am .S , maxEntries int ,
) (*Tracer , error ) {
t := &Tracer {
TracerNoOp : &am .TracerNoOp {},
loop : &errgroup .Group {},
db : m .Db ,
lastActivated : make (map [string ]time .Time ),
calledAllowlist : calledAllowlist ,
changedAllowlist : changedAllowlist ,
maxEntries : maxEntries ,
onErr : onErr ,
tTimes : make (map [string ]*frostdb .GenericTable [*Time ]),
tTransitions : make (map [string ]*frostdb .GenericTable [*Transition ]),
}
t .loop .SetLimit (1 )
t .MachineInit (mach )
return t , mach .BindTracer (t )
}
type Tracer struct {
*am .TracerNoOp
StoreChecks bool
Active atomic .Int32
db *frostdb .DB
lastActivated map [string ]time .Time
calledAllowlist am .S
changedAllowlist am .S
maxEntries int
loop *errgroup .Group
onErr func (err error )
tTimes map [string ]*frostdb .GenericTable [*Time ]
tTransitions map [string ]*frostdb .GenericTable [*Transition ]
}
func (t *Tracer ) TransitionEnd (tx *am .Transition ) {
if tx .Mutation .IsCheck && !t .StoreChecks {
return
}
mach := tx .Machine
called := tx .CalledStates ()
match := true
for _ , name := range t .calledAllowlist {
if slices .Contains (called , name ) {
match = true
break
}
}
if !match {
return
}
calledIdxs := make ([]uint64 , len (called ))
for i , idx := range tx .Mutation .Called {
calledIdxs [i ] = uint64 (idx )
}
recTime := &Time {
MutType : uint64 (tx .Mutation .Type ),
TimeSum : tx .Machine .Time (nil ).Sum (nil ),
TimeDiff : tx .TimeAfter .Sum (nil ) - tx .TimeBefore .Sum (nil ),
TransitionId : tx .Id ,
States : make (map [string ]uint64 , len (ssB .Names ())),
}
recTx := &Transition {
Id : tx .Id ,
IsAuto : tx .Mutation .IsAuto ,
IsAccepted : tx .IsAccepted .Load (),
IsCheck : tx .Mutation .IsCheck ,
IsBroken : tx .IsBroken .Load (),
QueuedAt : tx .Mutation .QueueTick ,
Called : calledIdxs ,
Arguments : tx .Mutation .MapArgs (mach .SemLogger ().ArgsMapper ()),
QueueLen : mach .QueueLen (),
}
if tx .Mutation .QueueTick > 0 {
qt := mach .QueueTick ()
recTx .ExecutedAt = qt
}
if tx .Mutation .Source != nil {
recTx .SourceTx = tx .Mutation .Source .TxId
recTx .SourceMach = tx .Mutation .Source .MachId
}
for _ , state := range ssB .Names () {
recTime .States [state ] = tx .Machine .Tick (state )
}
t .Active .Add (1 )
go t .loop .Go (func () error {
_ , err := t .tTransitions [mach .Id ()].Write (mach .Ctx (), recTx )
if err != nil {
t .onErr (err )
return err
}
_, err = t .tTimes [mach .Id ()].Write (mach .Ctx (), recTime )
if err != nil {
t .onErr (err )
return err
}
t .Active .Add (-1 )
return nil
})
}
func (t *Tracer ) MachineInit (mach am .Api ) context .Context {
times , err := frostdb .NewGenericTable [*Time ](
t .db , "times" +mach .Id (), memory .DefaultAllocator ,
)
if err != nil {
t .onErr (err )
return nil
}
t .tTimes [mach .Id ()] = times
transitions , err := frostdb .NewGenericTable [*Transition ](
t .db , "transitions" +mach .Id (), memory .DefaultAllocator ,
)
if err != nil {
t .onErr (err )
return nil
}
t .tTransitions [mach .Id ()] = transitions
return nil
}
type BasicStatesDef struct {
*am .StatesBase
ErrNetwork string
ErrHandlerTimeout string
Start string
Ready string
Healthcheck string
Heartbeat string
}
var BasicSchema = am .Schema {
ssB .Exception : {Multi : true },
ssB .ErrNetwork : {
Multi : true ,
Require : am .S {am .StateException },
},
ssB .ErrHandlerTimeout : {
Multi : true ,
Require : am .S {am .StateException },
},
ssB .Start : {},
ssB .Ready : {Require : am .S {ssB .Start }},
ssB .Healthcheck : {Multi : true },
ssB .Heartbeat : {},
}
var (
ssB = am .NewStates (BasicStatesDef {})
BasicStates = ssB
)
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .