package history
import (
"context"
"errors"
"fmt"
"slices"
"sync"
"time"
"github.com/orsinium-labs/enum"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
)
var (
ErrIncompatibleType = fmt .Errorf ("incompatible type" )
ErrCondition = fmt .Errorf ("incorrect condition" )
)
var ErrConfig = fmt .Errorf ("incorrect config" )
type MatcherFn func (now *am .TimeIndex , db []*MemoryRecord ) []*MemoryRecord
type Query struct {
Active am .S
Activated am .S
Inactive am .S
Deactivated am .S
Start ConditionTime
End ConditionTime
}
type ConditionTime struct {
MTimeStates am .S
MTime am .Time
HTime time .Time
MTimeSum uint64
MTimeTrackedSum uint64
MTimeDiff uint64
MTimeTrackedDiff uint64
MTimeRecordDiff uint64
MachTick uint32
}
type ConditionTx struct {
Query
Called am .S
SourceTx string
SourceMach string
IsAuto bool
IsAccepted bool
IsCheck bool
IsBroken bool
QueueLen uint16
QueuedAt uint64
ExecutedAt uint64
}
type BaseConfig struct {
Log bool
Called am .S
CalledExclude bool
Changed am .S
ChangedExclude bool
TrackRejected bool
TrackedStates am .S
StoreTransitions bool
StoreSchema bool
MaxRecords int
}
type Config = BaseConfig
type Backend enum .Member [string ]
var (
BackendMemory = Backend {"memory" }
BackendSqlite = Backend {"sqlite" }
BackendBbolt = Backend {"bbolt" }
BackendEnum = enum .New (BackendMemory , BackendSqlite , BackendBbolt )
)
type MachineRecord struct {
MachId string `msgpack:"mi"`
StateNames am .S `msgpack:"sn"`
Schema am .Schema `msgpack:"s"`
FirstTracking time .Time `msgpack:"ft"`
LastTracking time .Time `msgpack:"lt"`
LastSync time .Time `msgpack:"ls"`
MTime am .Time `msgpack:"mt"`
MTimeSum uint64 `msgpack:"mts"`
MachTick uint32 `msgpack:"mt2"`
NextId uint64 `msgpack:"ni"`
}
type MemoryRecord struct {
Time *TimeRecord
Transition *TransitionRecord
}
type TimeRecord struct {
MutType am .MutationType `msgpack:"mt"`
MTimeSum uint64 `msgpack:"mts"`
MTimeTrackedSum uint64 `msgpack:"mtts"`
MTimeDiffSum uint64 `msgpack:"mtds"`
MTimeTrackedDiffSum uint64 `msgpack:"mttds"`
MTimeRecordDiffSum uint64 `msgpack:"mtrds"`
HTime time .Time `msgpack:"ht"`
MTimeTracked am .Time `msgpack:"mtt"`
MTimeTrackedDiff am .Time `msgpack:"mttd"`
MachTick uint32 `msgpack:"mt2"`
}
type TransitionRecord struct {
TransitionId string `msgpack:"ti"`
TimeRecordId uint64 `msgpack:"tri"`
SourceTx string `msgpack:"st"`
SourceMach string `msgpack:"sm"`
IsAuto bool `msgpack:"ia"`
IsAccepted bool `msgpack:"ia2"`
IsCheck bool `msgpack:"ic"`
IsBroken bool `msgpack:"ib"`
QueueLen uint16 `msgpack:"ql"`
QueuedAt uint64 `msgpack:"qa"`
ExecutedAt uint64 `msgpack:"ea"`
Called []int `msgpack:"c"`
Arguments map [string ]string `msgpack:"a"`
}
type tracer struct {
*am .TracerNoOp
mem *Memory
}
func (t *tracer ) MachineInit (mach am .Api ) context .Context {
m := t .mem
now := time .Now ().UTC ()
if m .machRec == nil {
m .machRec = &MachineRecord {
MachId : mach .Id (),
FirstTracking : now ,
NextId : 1 ,
}
}
rec := m .machRec
mTime := mach .Time (nil )
rec .MTimeSum = mTime .Sum (nil )
rec .MTime = mTime
rec .MachTick = mach .MachineTick ()
rec .LastTracking = now
rec .LastSync = now
if m .Cfg .StoreSchema {
rec .Schema = mach .Schema ()
rec .StateNames = mach .StateNames ()
}
return nil
}
func (t *tracer ) SchemaChange (machine am .Api , old am .Schema ) {
m := t .mem
m .mx .Lock ()
defer m .mx .Unlock ()
if !m .Cfg .StoreSchema {
return
}
rec := m .machRec
rec .Schema = m .Mach .Schema ()
rec .StateNames = m .Mach .StateNames ()
}
func (t *tracer ) TransitionEnd (tx *am .Transition ) {
m := t .mem
if (!tx .IsAccepted .Load () && !m .Cfg .TrackRejected ) || tx .Mutation .IsCheck {
return
}
mach := m .Mach
called := tx .CalledStates ()
changed := tx .TimeAfter .DiffSince (tx .TimeBefore ).
ToIndex (mach .StateNames ()).NonZeroStates ()
mut := tx .Mutation
cfg := m .Cfg
match := (cfg .ChangedExclude || len (cfg .Changed ) == 0 ) &&
(cfg .CalledExclude || len (cfg .Called ) == 0 )
mTime := tx .TimeAfter
mTimeTracked := mTime .Filter (m .cacheTrackedIdxs )
mTimeTrackedBefore := tx .TimeBefore .Filter (m .cacheTrackedIdxs )
sum := mTime .Sum (nil )
sumTracked := mTimeTracked .Sum (nil )
for _ , name := range cfg .Called {
listed := slices .Contains (called , name )
if listed && cfg .CalledExclude {
match = false
break
} else if !listed && !cfg .CalledExclude {
match = true
break
}
}
for _ , name := range cfg .Changed {
listed := slices .Contains (changed , name )
if listed && cfg .ChangedExclude {
match = false
break
} else if !listed && !cfg .ChangedExclude {
match = true
break
}
}
if !match {
return
}
m .mx .Lock ()
defer m .mx .Unlock ()
var recordDiff uint64
if len (m .db ) > 0 {
lastRec := m .db [len (m .db )-1 ]
recordDiff = sum - lastRec .Time .MTimeSum
}
machTick := mach .MachineTick ()
now := time .Now ().UTC ()
record := &MemoryRecord {
Time : &TimeRecord {
MutType : mut .Type ,
MTimeSum : sum ,
MTimeTrackedSum : sumTracked ,
MTimeDiffSum : sum - tx .TimeBefore .Sum (nil ),
MTimeTrackedDiffSum : sumTracked - tx .TimeBefore .Sum (m .cacheTrackedIdxs ),
MTimeRecordDiffSum : recordDiff ,
MachTick : machTick ,
HTime : now ,
MTimeTracked : mTimeTracked ,
MTimeTrackedDiff : mTimeTracked .DiffSince (mTimeTrackedBefore ),
},
}
if cfg .StoreTransitions {
record .Transition = &TransitionRecord {
TransitionId : tx .Id ,
Called : m .Index (called ),
IsAuto : mut .IsAuto ,
IsAccepted : tx .IsAccepted .Load (),
IsCheck : mut .IsCheck ,
IsBroken : tx .IsBroken .Load (),
QueueLen : tx .QueueLen ,
QueuedAt : mut .QueueTick ,
Arguments : tx .Mutation .MapArgs (mach .SemLogger ().ArgsMapper ()),
}
if mut .Source != nil {
record .Transition .SourceTx = mut .Source .TxId
record .Transition .SourceMach = mut .Source .MachId
}
if mut .QueueTick > 0 {
record .Transition .ExecutedAt = mach .QueueTick ()
}
}
if len (m .db ) >= cfg .MaxRecords {
m .db = m .db [1 :]
}
m .db = append (m .db , record )
m .machRec .MTime = mTime
m .machRec .MTimeSum = sum
m .machRec .LastSync = now
m .machRec .MachTick = machTick
m .machRec .NextId ++
}
type MemoryApi interface {
ActivatedBetween (ctx context .Context , state string , start, end time .Time ) bool
ActiveBetween (ctx context .Context , state string , start, end time .Time ) bool
DeactivatedBetween (
ctx context .Context , state string , start, end time .Time ,
) bool
InactiveBetween (ctx context .Context , state string , start, end time .Time ) bool
FindLatest (
ctx context .Context , retTx bool , limit int , query Query ,
) ([]*MemoryRecord , error )
Sync () error
ToTimeRecord (format any ) (*TimeRecord , error )
ToMachineRecord (format any ) (*MachineRecord , error )
ToTransitionRecord (format any ) (*TransitionRecord , error )
IsTracked (states am .S ) bool
IsTracked1 (state string ) bool
Index (states am .S ) []int
Index1 (state string ) int
Machine () am .Api
Config () BaseConfig
Context () context .Context
MachineRecord () *MachineRecord
Dispose () error
}
type BaseMemory struct {
Ctx context .Context
Mach am .Api
Cfg *BaseConfig
memImpl MemoryApi
mx sync .RWMutex
db []*MemoryRecord
tr *tracer
}
var _ MemoryApi = &BaseMemory {}
func NewBaseMemory (
ctx context .Context , mach am .Api , config BaseConfig , memImpl MemoryApi ,
) *BaseMemory {
return &BaseMemory {
memImpl : memImpl ,
Mach : mach ,
Cfg : &config ,
Ctx : ctx ,
}
}
func (m *BaseMemory ) ActivatedBetween (
ctx context .Context , state string , start , end time .Time ,
) bool {
ret , err := m .memImpl .FindLatest (ctx , false , 1 , Query {
Activated : am .S {state },
Start : ConditionTime {
HTime : start ,
},
End : ConditionTime {
HTime : end ,
},
})
return err == nil && ret != nil
}
func (m *BaseMemory ) Sync () error {
return nil
}
func (m *BaseMemory ) ActiveBetween (
ctx context .Context , state string , start , end time .Time ,
) bool {
ret , err := m .memImpl .FindLatest (ctx , false , 1 , Query {
Active : am .S {state },
Start : ConditionTime {
HTime : start ,
},
End : ConditionTime {
HTime : end ,
},
})
return err == nil && ret != nil
}
func (m *BaseMemory ) DeactivatedBetween (
ctx context .Context , state string , start , end time .Time ,
) bool {
ret , err := m .memImpl .FindLatest (ctx , false , 1 , Query {
Deactivated : am .S {state },
Start : ConditionTime {
HTime : start ,
},
End : ConditionTime {
HTime : end ,
},
})
return err == nil && ret != nil
}
func (m *BaseMemory ) InactiveBetween (
ctx context .Context , state string , start , end time .Time ,
) bool {
ret , err := m .memImpl .FindLatest (ctx , false , 1 , Query {
Inactive : am .S {state },
Start : ConditionTime {
HTime : start ,
},
End : ConditionTime {
HTime : end ,
},
})
return err == nil && ret != nil
}
func (m *BaseMemory ) FindLatest (
ctx context .Context , retTx bool , limit int , query Query ,
) ([]*MemoryRecord , error ) {
panic ("implement in subclass" )
}
func (m *BaseMemory ) IsTracked (states am .S ) bool {
for _ , state := range states {
if !slices .Contains (m .Cfg .TrackedStates , state ) {
return false
}
}
return true
}
func (m *BaseMemory ) IsTracked1 (state string ) bool {
return m .IsTracked (am .S {state })
}
func (m *BaseMemory ) Index (states am .S ) []int {
result := make ([]int , len (states ))
for i , state := range states {
result [i ] = slices .Index (m .Cfg .TrackedStates , state )
}
return result
}
func (m *BaseMemory ) Index1 (state string ) int {
return slices .Index (m .Cfg .TrackedStates , state )
}
func (m *BaseMemory ) ToTimeRecord (format any ) (*TimeRecord , error ) {
tr , ok := format .(*TimeRecord )
if !ok {
return nil , ErrIncompatibleType
}
return tr , nil
}
func (m *BaseMemory ) ToMachineRecord (format any ) (*MachineRecord , error ) {
mr , ok := format .(*MachineRecord )
if !ok {
return nil , ErrIncompatibleType
}
return mr , nil
}
func (m *BaseMemory ) ToTransitionRecord (format any ) (*TransitionRecord , error ) {
tr , ok := format .(*TransitionRecord )
if !ok {
return nil , ErrIncompatibleType
}
return tr , nil
}
func (m *BaseMemory ) ToMemoryRecord (format any ) (*MemoryRecord , error ) {
mr , ok := format .(*MemoryRecord )
if !ok {
return nil , ErrIncompatibleType
}
return mr , nil
}
func (m *BaseMemory ) Dispose () error {
panic ("implement in subclass" )
}
func (m *BaseMemory ) MachineRecord () *MachineRecord {
panic ("implement in subclass" )
}
func (m *BaseMemory ) Machine () am .Api {
panic ("implement in subclass" )
}
func (m *BaseMemory ) Config () BaseConfig {
panic ("implement in subclass" )
}
func (m *BaseMemory ) Context () context .Context {
return m .Ctx
}
func (m *BaseMemory ) ValidateQuery (query Query ) error {
names := slices .Concat (query .Active , query .Activated , query .Inactive ,
query .Deactivated , query .Start .MTimeStates , query .End .MTimeStates )
for _ , state := range names {
if !m .IsTracked1 (state ) {
return fmt .Errorf ("%w: %w: %s not tracked" ,
ErrCondition , am .ErrStateMissing , state )
}
}
if len (query .Start .MTimeStates ) != len (query .Start .MTime ) {
return fmt .Errorf ("%w: state list and machine time mismatch: %d != %d" ,
ErrCondition , len (query .Start .MTimeStates ), len (query .Start .MTime ))
}
if len (query .End .MTimeStates ) != len (query .End .MTime ) {
return fmt .Errorf ("%w: state list and machine time mismatch: %d != %d" ,
ErrCondition , len (query .End .MTimeStates ), len (query .End .MTime ))
}
return nil
}
type Memory struct {
*BaseMemory
onErr func (err error )
machRec *MachineRecord
cacheTrackedIdxs []int
}
func NewMemory (
ctx context .Context , machRecord *MachineRecord , mach am .Api ,
config BaseConfig , onErr func (err error ),
) (*Memory , error ) {
c := config
if c .MaxRecords <= 0 {
c .MaxRecords = 1000
}
if !c .CalledExclude {
c .TrackedStates = slices .Concat (c .TrackedStates , c .Called )
}
if !c .ChangedExclude {
c .TrackedStates = slices .Concat (c .TrackedStates , c .Changed )
}
c .TrackedStates = mach .ParseStates (c .TrackedStates )
if len (c .TrackedStates ) == 0 {
return nil , fmt .Errorf ("%w: no states to track" , am .ErrStateMissing )
}
mem := &Memory {
onErr : onErr ,
cacheTrackedIdxs : mach .Index (c .TrackedStates ),
}
mem .BaseMemory = NewBaseMemory (ctx , mach , c , mem )
tr := &tracer {
mem : mem ,
}
mem .tr = tr
if machRecord != nil {
cp := *machRecord
tr .mem .machRec = &cp
}
tr .MachineInit (mach )
mach .OnDispose (func (id string , ctx context .Context ) {
err := mem .Dispose ()
if err != nil {
mem .onErr (err )
}
})
return mem , mach .BindTracer (tr )
}
func (m *Memory ) MachineRecord () *MachineRecord {
m .mx .Lock ()
defer m .mx .Unlock ()
cp := *m .machRec
return &cp
}
func (m *Memory ) Machine () am .Api {
return m .Mach
}
func (m *Memory ) Config () BaseConfig {
return *m .Cfg
}
func (m *Memory ) Dispose () error {
m .mx .Lock ()
defer m .mx .Unlock ()
m .db = nil
return m .Mach .DetachTracer (m .tr )
}
func (m *Memory ) FindLatest (
ctx context .Context , _ bool , limit int , query Query ,
) ([]*MemoryRecord , error ) {
if err := m .ValidateQuery (query ); err != nil {
return nil , err
}
s := query .Start
e := query .End
mach := m .Mach
return m .Match (ctx , func (
now *am .TimeIndex , db []*MemoryRecord ,
) []*MemoryRecord {
mTimeIdxs := m .Index (s .MTimeStates )
var older *MemoryRecord
var ret []*MemoryRecord
for i := len (db ) - 1 ; i >= 0 ; i -- {
if ctx .Err () != nil {
return nil
}
r := db [i ]
older = nil
if i > 0 {
older = db [i -1 ]
}
for _ , state := range query .Active {
if !am .IsActiveTick (r .Time .MTimeTracked [m .Index1 (state )]) {
continue
}
}
for _ , state := range query .Activated {
idx := m .Index1 (state )
if !am .IsActiveTick (r .Time .MTimeTracked [idx ]) {
continue
}
if older != nil && am .IsActiveTick (older .Time .MTimeTracked [idx ]) {
continue
}
}
for _ , state := range query .Inactive {
if am .IsActiveTick (r .Time .MTimeTracked [mach .Index1 (state )]) {
continue
}
}
for _ , state := range query .Deactivated {
idx := m .Index1 (state )
if am .IsActiveTick (r .Time .MTimeTracked [idx ]) {
continue
}
if older != nil && !am .IsActiveTick (older .Time .MTimeTracked [idx ]) {
continue
}
}
if len (s .MTimeStates ) > 0 {
mTimeTrackedCond := r .Time .MTimeTracked .Filter (mTimeIdxs )
if mTimeTrackedCond .Before (false , s .MTime ) ||
mTimeTrackedCond .After (false , e .MTime ) {
continue
}
}
t := r .Time
if !s .HTime .IsZero () && !e .HTime .IsZero () &&
(t .HTime .Before (s .HTime ) || t .HTime .After (e .HTime )) {
continue
}
if s .MTimeSum != 0 && e .MTimeSum != 0 &&
(t .MTimeSum < s .MTimeSum || t .MTimeSum > e .MTimeSum ) {
continue
}
if s .MTimeTrackedSum != 0 && e .MTimeTrackedSum != 0 &&
(t .MTimeTrackedSum < s .MTimeTrackedSum ||
t .MTimeTrackedSum > e .MTimeTrackedSum ) {
continue
}
if s .MTimeDiff != 0 && e .MTimeDiff != 0 &&
(t .MTimeDiffSum < s .MTimeDiff || t .MTimeDiffSum > e .MTimeDiff ) {
continue
}
if s .MTimeTrackedDiff != 0 && e .MTimeTrackedDiff != 0 &&
(t .MTimeTrackedDiffSum < s .MTimeTrackedDiff ||
t .MTimeTrackedDiffSum > e .MTimeTrackedDiff ) {
continue
}
if s .MTimeRecordDiff != 0 && e .MTimeRecordDiff != 0 &&
(t .MTimeRecordDiffSum < s .MTimeRecordDiff ||
t .MTimeRecordDiffSum > e .MTimeRecordDiff ) {
continue
}
if s .MachTick != 0 && e .MachTick != 0 &&
(t .MachTick < s .MachTick || t .MachTick > e .MachTick ) {
continue
}
ret = append (ret , r )
if limit > 0 && len (ret ) >= limit {
break
}
}
return ret
})
}
func (m *Memory ) Match (
ctx context .Context , matcherFn MatcherFn ,
) ([]*MemoryRecord , error ) {
m .mx .RLock ()
db := make ([]*MemoryRecord , len (m .db ))
copy (db , m .db )
m .mx .RUnlock ()
ret := matcherFn (m .Mach .Time (nil ).ToIndex (m .Mach .StateNames ()), db )
if ctx .Err () != nil || m .Ctx .Err () != nil {
return nil , errors .Join (ctx .Err (), m .Ctx .Err ())
}
return ret , nil
}
func (m *Memory ) Export () []*MemoryRecord {
m .mx .RLock ()
defer m .mx .RUnlock ()
db := make ([]*MemoryRecord , len (m .db ))
copy (db , m .db )
return db
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .