// Package history provides machine history tracking and traversal using // the process' memory and structs.
package history import ( am ) // ///// ///// ///// // ///// TYPES // ///// ///// ///// var ( ErrIncompatibleType = fmt.Errorf("incompatible type") ErrCondition = fmt.Errorf("incorrect condition") ) // TODO bind ErrConfig var ErrConfig = fmt.Errorf("incorrect config") type MatcherFn func(now *am.TimeIndex, db []*MemoryRecord) []*MemoryRecord // Query represents various conditions for a single mutation. All are // optional, but at least one must be set. type Query struct { // states // Active is a set of states that were active AFTER the mutation Active am.S // Activated is a set of states that were activated during the transition. Activated am.S // Inactive is a set of states that were inactive AFTER the mutation Inactive am.S // Deactivated is a set of states that were deactivated during the // transition. Deactivated am.S // times // Start is the beginning of a scalar time condition and requires an // equivalent in [Query.End]. Start ConditionTime // End is the end of a scalar time condition and requires an equivalent in // [Query.Start]. End ConditionTime // TODO support transitions // StartTx ConditionTx // EndTx ConditionTx } type ConditionTime struct { // MTimeStates is a set of states for MTime. MTimeStates am.S // MTime is the machine time for a mutation to match. MTime am.Time // HTime is the human time for a mutation to match. HTime time.Time // MTimeSum is a machine time sum after this transition. MTimeSum uint64 // MTimeSum is a machine time sum after this transition for tracked states // only. MTimeTrackedSum uint64 // MTimeDiff is a machine time difference for this transition, compared to the // previous transition. MTimeDiff uint64 // MTimeDiff is a machine time difference for this transition for tracked // states only. MTimeTrackedDiff uint64 // MTimeRecordDiff is a machine time difference since the previous // [TimeRecord]. MTimeRecordDiff uint64 // MachTick is the machine tick at the time of this transition. MachTick uint32 } // TODO ConditionTx // ConditionTx represents a condition for a single transition. Requires // [BaseConfig.StoreTransitions] to be true. type ConditionTx struct { Query // Called is a set of states that were called in the mutation. Called am.S // tx info SourceTx string SourceMach string IsAuto bool IsAccepted bool IsCheck bool IsBroken bool QueueLen uint16 // normal queue props QueuedAt uint64 ExecutedAt uint64 } // BaseConfig describes the tracking configuration - conditions, list of tracked // states and data expiration TTLs. type BaseConfig struct { Log bool // tracking conditions // Called is a list of mutation states (called ones) required to track a // transition. See also CalledExclude. Optional. Called am.S // CalledExclude flips Called to be a blocklist. CalledExclude bool // Changed is a list of transition states which had clock changes, required to // track a transition. See also ChangedExclude. A state can be called, but not // changed. Optional. Changed am.S // ChangedExclude flips Changed to be a blocklist. ChangedExclude bool // TrackRejected is a flag to track rejected transitions. TrackRejected bool // stored data // TrackedStates is a list of states to store clock values of. TrackedStates am.S // StoreTransitions is a flag to store TransitionRecord, in addition to // TimeRecord, for each tracked transition. StoreTransitions bool // TODO StoreSchema keeps the latest machine schema and state names within // [MachineRecord]. Useful for dynamic machines. StoreSchema bool // TODO SkipHumanTime won't store [Time.HTime], to save space. // SkipHumanTime bool // TODO MachineOnly will store only [MachineRecord], making history queries // impossible. // MachineOnly bool // GC & TTL // MaxRecords is the maximum number of records to keep in the history before // a rotation begins. MaxRecords int // TODO MaxHumanAge is the maximum age of records in human time. // MaxHumanAge time.Duration // TODO MaxMachAge is the maximum machine time sum of the tracked machine. // MaxMachAge uint64 // TODO MaxMachTrackedAge is the maximum machine time sum of the tracked // machine, counted from tracked states only. // MaxMachTrackedAge uint64 } // Config for the in-process memory. type Config = BaseConfig // Backend enumerates all available asyncmachine history backends. type Backend enum.Member[string] var ( BackendMemory = Backend{"memory"} BackendSqlite = Backend{"sqlite"} BackendBbolt = Backend{"bbolt"} BackendEnum = enum.New(BackendMemory, BackendSqlite, BackendBbolt) ) // ///// ///// ///// // ///// RECORDS // ///// ///// ///// type MachineRecord struct { // ID of the tracked machine MachId string `msgpack:"mi"` StateNames am.S `msgpack:"sn"` Schema am.Schema `msgpack:"s"` // human times // first time the machine has been tracked FirstTracking time.Time `msgpack:"ft"` // last time a tracking of this machine has started LastTracking time.Time `msgpack:"lt"` // last time a sync has been performed LastSync time.Time `msgpack:"ls"` // machine times // current (total) machine time MTime am.Time `msgpack:"mt"` // sum of the current machine time MTimeSum uint64 `msgpack:"mts"` // current machine start tick MachTick uint32 `msgpack:"mt2"` // next ID for time records NextId uint64 `msgpack:"ni"` } type MemoryRecord struct { Time *TimeRecord Transition *TransitionRecord } type TimeRecord struct { // MutType is a mutation type. MutType am.MutationType `msgpack:"mt"` // MTimeSum is a machine time sum after this transition. MTimeSum uint64 `msgpack:"mts"` // MTimeSum is a machine time sum after this transition for tracked states // only. MTimeTrackedSum uint64 `msgpack:"mtts"` // MTimeDiffSum is a machine time difference for this transition. MTimeDiffSum uint64 `msgpack:"mtds"` // MTimeDiffSum is a machine time difference for this transition for tracked // states only. MTimeTrackedDiffSum uint64 `msgpack:"mttds"` // MTimeRecordDiffSum is a machine time difference since the previous // [TimeRecord]. MTimeRecordDiffSum uint64 `msgpack:"mtrds"` // HTime is a human time in UTC. HTime time.Time `msgpack:"ht"` // MTime is a machine time after this mutation. MTimeTracked am.Time `msgpack:"mtt"` // MTimeTrackedDiff is a machine time diff compared to the previous mutation // (not a record). MTimeTrackedDiff am.Time `msgpack:"mttd"` // MachTick is the machine tick at the time of this transition. MachTick uint32 `msgpack:"mt2"` // TODO distance since the last change // DistanceLastChange am.Time eg [13, 23, 34] for [Foo, Bar, Baz] in // machine time ticks (not tracked-only ticks) // DistanceLastActive am.Time } type TransitionRecord struct { TransitionId string `msgpack:"ti"` TimeRecordId uint64 `msgpack:"tri"` SourceTx string `msgpack:"st"` SourceMach string `msgpack:"sm"` IsAuto bool `msgpack:"ia"` IsAccepted bool `msgpack:"ia2"` IsCheck bool `msgpack:"ic"` IsBroken bool `msgpack:"ib"` QueueLen uint16 `msgpack:"ql"` // normal queue props QueuedAt uint64 `msgpack:"qa"` ExecutedAt uint64 `msgpack:"ea"` // extra Called []int `msgpack:"c"` Arguments map[string]string `msgpack:"a"` } // ///// ///// ///// // ///// TRACER // ///// ///// ///// type tracer struct { *am.TracerNoOp mem *Memory } func ( *tracer) ( am.Api) context.Context { := .mem := time.Now().UTC() if .machRec == nil { .machRec = &MachineRecord{ MachId: .Id(), FirstTracking: , // compat only NextId: 1, } } := .machRec := .Time(nil) .MTimeSum = .Sum(nil) .MTime = .MachTick = .MachineTick() .LastTracking = .LastSync = // schema if .Cfg.StoreSchema { .Schema = .Schema() .StateNames = .StateNames() } return nil } func ( *tracer) ( am.Api, am.Schema) { := .mem // locks .mx.Lock() defer .mx.Unlock() // update schema if !.Cfg.StoreSchema { return } := .machRec .Schema = .Mach.Schema() .StateNames = .Mach.StateNames() } func ( *tracer) ( *am.Transition) { := .mem if (!.IsAccepted.Load() && !.Cfg.TrackRejected) || .Mutation.IsCheck { return } := .Mach := .CalledStates() := .TimeAfter.DiffSince(.TimeBefore). ToIndex(.StateNames()).NonZeroStates() := .Mutation := .Cfg := (.ChangedExclude || len(.Changed) == 0) && (.CalledExclude || len(.Called) == 0) := .TimeAfter := .Filter(.cacheTrackedIdxs) := .TimeBefore.Filter(.cacheTrackedIdxs) := .Sum(nil) := .Sum(nil) // process called for , := range .Called { := slices.Contains(, ) if && .CalledExclude { = false break } else if ! && !.CalledExclude { = true break } } // process changed for , := range .Changed { := slices.Contains(, ) if && .ChangedExclude { = false break } else if ! && !.ChangedExclude { = true break } } if ! { return } // lock .mx.Lock() defer .mx.Unlock() // create record and count diffs var uint64 if len(.db) > 0 { := .db[len(.db)-1] = - .Time.MTimeSum } := .MachineTick() := time.Now().UTC() := &MemoryRecord{ Time: &TimeRecord{ MutType: .Type, MTimeSum: , MTimeTrackedSum: , MTimeDiffSum: - .TimeBefore.Sum(nil), MTimeTrackedDiffSum: - .TimeBefore.Sum(.cacheTrackedIdxs), MTimeRecordDiffSum: , MachTick: , HTime: , MTimeTracked: , MTimeTrackedDiff: .DiffSince(), }, } // optional tx record if .StoreTransitions { .Transition = &TransitionRecord{ TransitionId: .Id, Called: .Index(), IsAuto: .IsAuto, IsAccepted: .IsAccepted.Load(), IsCheck: .IsCheck, IsBroken: .IsBroken.Load(), QueueLen: .QueueLen, QueuedAt: .QueueTick, // TODO optimize? Arguments: .Mutation.MapArgs(.SemLogger().ArgsMapper()), } // optional fields if .Source != nil { .Transition.SourceTx = .Source.TxId .Transition.SourceMach = .Source.MachId } if .QueueTick > 0 { .Transition.ExecutedAt = .QueueTick() } } // rotate and store if len(.db) >= .MaxRecords { .db = .db[1:] } .db = append(.db, ) // update machine record .machRec.MTime = .machRec.MTimeSum = .machRec.LastSync = .machRec.MachTick = .machRec.NextId++ } // ///// ///// ///// // ///// BASE MEMORY // ///// ///// ///// type MemoryApi interface { // predefined queries // ActivatedBetween returns true if the state become active withing the // passed conditions. ActivatedBetween(ctx context.Context, state string, start, end time.Time) bool // ActiveBetween returns true if the state was active at least once // within the passed conditions. Always true is ActivatedBetween is true. ActiveBetween(ctx context.Context, state string, start, end time.Time) bool DeactivatedBetween( ctx context.Context, state string, start, end time.Time, ) bool InactiveBetween(ctx context.Context, state string, start, end time.Time) bool // DB queries FindLatest( ctx context.Context, retTx bool, limit int, query Query, ) ([]*MemoryRecord, error) // Sync synchronizes the batch buffer with the underlying backend, making // those new records appear in queries. Doesn't guarantee persistence. // Useful when queries very fresh records. Sync() error // converters ToTimeRecord(format any) (*TimeRecord, error) ToMachineRecord(format any) (*MachineRecord, error) ToTransitionRecord(format any) (*TransitionRecord, error) // states IsTracked(states am.S) bool IsTracked1(state string) bool Index(states am.S) []int Index1(state string) int // misc Machine() am.Api Config() BaseConfig Context() context.Context MachineRecord() *MachineRecord Dispose() error } // BaseMemory are the common methods for all memory implementations, operating // on common models, like [TimeRecord] or [Query]. type BaseMemory struct { // TODO move panic methods to an interface Ctx context.Context Mach am.Api // read-only config for this history Cfg *BaseConfig // private // the top level memory implementing MemoryApi face memImpl MemoryApi // in-process mem only // lock for the machine record mx sync.RWMutex db []*MemoryRecord tr *tracer } var _ MemoryApi = &BaseMemory{} func ( context.Context, am.Api, BaseConfig, MemoryApi, ) *BaseMemory { return &BaseMemory{ memImpl: , Mach: , Cfg: &, Ctx: , } } // predefined queries // ActivatedBetween returns true if the state was activated within the passed // human time range. func ( *BaseMemory) ( context.Context, string, , time.Time, ) bool { , := .memImpl.FindLatest(, false, 1, Query{ Activated: am.S{}, Start: ConditionTime{ HTime: , }, End: ConditionTime{ HTime: , }, }) return == nil && != nil } func ( *BaseMemory) () error { return nil } // ActiveBetween returns true if the state was activated all the time within // the passed human time range. func ( *BaseMemory) ( context.Context, string, , time.Time, ) bool { , := .memImpl.FindLatest(, false, 1, Query{ Active: am.S{}, Start: ConditionTime{ HTime: , }, End: ConditionTime{ HTime: , }, }) return == nil && != nil } func ( *BaseMemory) ( context.Context, string, , time.Time, ) bool { , := .memImpl.FindLatest(, false, 1, Query{ Deactivated: am.S{}, Start: ConditionTime{ HTime: , }, End: ConditionTime{ HTime: , }, }) return == nil && != nil } func ( *BaseMemory) ( context.Context, string, , time.Time, ) bool { , := .memImpl.FindLatest(, false, 1, Query{ Inactive: am.S{}, Start: ConditionTime{ HTime: , }, End: ConditionTime{ HTime: , }, }) return == nil && != nil } // DB queries // FindLatest returns the latest records matching the given conditions, in order // from the newest to the oldest. If the limit is 0, all records are returned. func ( *BaseMemory) ( context.Context, bool, int, Query, ) ([]*MemoryRecord, error) { panic("implement in subclass") } // state methods // IsTracked returns true if the given states are all being tracked by this // memory instance. func ( *BaseMemory) ( am.S) bool { for , := range { if !slices.Contains(.Cfg.TrackedStates, ) { return false } } return true } // IsTracked1 is IsTracked for a single state. func ( *BaseMemory) ( string) bool { return .IsTracked(am.S{}) } // Index returns the indexes of the given states in the history records, or -1 // if the state is not being tracked. func ( *BaseMemory) ( am.S) []int { := make([]int, len()) for , := range { [] = slices.Index(.Cfg.TrackedStates, ) } return } // Index1 is [BaseMemory.Index] for a single state. func ( *BaseMemory) ( string) int { return slices.Index(.Cfg.TrackedStates, ) } // records methods func ( *BaseMemory) ( any) (*TimeRecord, error) { , := .(*TimeRecord) if ! { return nil, ErrIncompatibleType } return , nil } func ( *BaseMemory) ( any) (*MachineRecord, error) { , := .(*MachineRecord) if ! { return nil, ErrIncompatibleType } return , nil } func ( *BaseMemory) ( any) (*TransitionRecord, error) { , := .(*TransitionRecord) if ! { return nil, ErrIncompatibleType } return , nil } func ( *BaseMemory) ( any) (*MemoryRecord, error) { , := .(*MemoryRecord) if ! { return nil, ErrIncompatibleType } return , nil } // misc func ( *BaseMemory) () error { panic("implement in subclass") } // MachineRecord returns a copy of the history record for the tracked machine. func ( *BaseMemory) () *MachineRecord { panic("implement in subclass") } func ( *BaseMemory) () am.Api { panic("implement in subclass") } func ( *BaseMemory) () BaseConfig { panic("implement in subclass") } func ( *BaseMemory) () context.Context { return .Ctx } func ( *BaseMemory) ( Query) error { // validate states tracked := slices.Concat(.Active, .Activated, .Inactive, .Deactivated, .Start.MTimeStates, .End.MTimeStates) for , := range { if !.IsTracked1() { return fmt.Errorf("%w: %w: %s not tracked", ErrCondition, am.ErrStateMissing, ) } } // validate MTime len == MTimeStates len if len(.Start.MTimeStates) != len(.Start.MTime) { return fmt.Errorf("%w: state list and machine time mismatch: %d != %d", ErrCondition, len(.Start.MTimeStates), len(.Start.MTime)) } if len(.End.MTimeStates) != len(.End.MTime) { return fmt.Errorf("%w: state list and machine time mismatch: %d != %d", ErrCondition, len(.End.MTimeStates), len(.End.MTime)) } return nil } // ///// ///// ///// // ///// IN-PROCESS MEMORY // ///// ///// ///// type Memory struct { *BaseMemory onErr func(err error) machRec *MachineRecord cacheTrackedIdxs []int } // NewMemory returns a new memory instance that tracks the given machine // according to the given tracking configuration. All states are tracked by // default, which often is not desired. Keeps 1000 records by default. func ( context.Context, *MachineRecord, am.Api, BaseConfig, func( error), ) (*Memory, error) { // TODO validate mach.Id() == machRecord.MachId := if .MaxRecords <= 0 { .MaxRecords = 1000 } // include allowlists in tracked states if !.CalledExclude { .TrackedStates = slices.Concat(.TrackedStates, .Called) } if !.ChangedExclude { .TrackedStates = slices.Concat(.TrackedStates, .Changed) } .TrackedStates = .ParseStates(.TrackedStates) if len(.TrackedStates) == 0 { return nil, fmt.Errorf("%w: no states to track", am.ErrStateMissing) } // init and bind := &Memory{ onErr: , cacheTrackedIdxs: .Index(.TrackedStates), } .BaseMemory = NewBaseMemory(, , , ) := &tracer{ mem: , } .tr = if != nil { // clone := * .mem.machRec = & } .MachineInit() .OnDispose(func( string, context.Context) { := .Dispose() if != nil { .onErr() } }) return , .BindTracer() } func ( *Memory) () *MachineRecord { .mx.Lock() defer .mx.Unlock() := *.machRec return & } func ( *Memory) () am.Api { return .Mach } func ( *Memory) () BaseConfig { return *.Cfg } func ( *Memory) () error { .mx.Lock() defer .mx.Unlock() .db = nil return .Mach.DetachTracer(.tr) } // FindLatest is [BaseMemory.FindLatest] for in-process memory. func ( *Memory) ( context.Context, bool, int, Query, ) ([]*MemoryRecord, error) { if := .ValidateQuery(); != nil { return nil, } := .Start := .End := .Mach return .Match(, func( *am.TimeIndex, []*MemoryRecord, ) []*MemoryRecord { := .Index(.MTimeStates) var *MemoryRecord var []*MemoryRecord for := len() - 1; >= 0; -- { if .Err() != nil { return nil } := [] = nil if > 0 { = [-1] } // states conditions // Active for , := range .Active { if !am.IsActiveTick(.Time.MTimeTracked[.Index1()]) { continue } } // Activated for , := range .Activated { := .Index1() if !am.IsActiveTick(.Time.MTimeTracked[]) { continue } // if has previously been active if != nil && am.IsActiveTick(.Time.MTimeTracked[]) { continue } } // Inactive for , := range .Inactive { if am.IsActiveTick(.Time.MTimeTracked[.Index1()]) { continue } } // Deactivated for , := range .Deactivated { := .Index1() if am.IsActiveTick(.Time.MTimeTracked[]) { continue } // if has previously been inactive if != nil && !am.IsActiveTick(.Time.MTimeTracked[]) { continue } } // MTimeStates if len(.MTimeStates) > 0 { // caution: slice a sliced time slice := .Time.MTimeTracked.Filter() if .Before(false, .MTime) || .After(false, .MTime) { continue } } // time conditions := .Time // HTime if !.HTime.IsZero() && !.HTime.IsZero() && (.HTime.Before(.HTime) || .HTime.After(.HTime)) { continue } // MTimeSum if .MTimeSum != 0 && .MTimeSum != 0 && (.MTimeSum < .MTimeSum || .MTimeSum > .MTimeSum) { continue } // MTimeTrackedSum if .MTimeTrackedSum != 0 && .MTimeTrackedSum != 0 && (.MTimeTrackedSum < .MTimeTrackedSum || .MTimeTrackedSum > .MTimeTrackedSum) { continue } // MTimeDiff if .MTimeDiff != 0 && .MTimeDiff != 0 && (.MTimeDiffSum < .MTimeDiff || .MTimeDiffSum > .MTimeDiff) { continue } // MTimeTrackedDiff if .MTimeTrackedDiff != 0 && .MTimeTrackedDiff != 0 && (.MTimeTrackedDiffSum < .MTimeTrackedDiff || .MTimeTrackedDiffSum > .MTimeTrackedDiff) { continue } // MTimeRecordDiff if .MTimeRecordDiff != 0 && .MTimeRecordDiff != 0 && (.MTimeRecordDiffSum < .MTimeRecordDiff || .MTimeRecordDiffSum > .MTimeRecordDiff) { continue } // MachTick if .MachTick != 0 && .MachTick != 0 && (.MachTick < .MachTick || .MachTick > .MachTick) { continue } // collect and return = append(, ) if > 0 && len() >= { break } } return }) } // Match returns the first record that matches the MatcherFn. func ( *Memory) ( context.Context, MatcherFn, ) ([]*MemoryRecord, error) { // clone DB index .mx.RLock() := make([]*MemoryRecord, len(.db)) copy(, .db) .mx.RUnlock() := (.Mach.Time(nil).ToIndex(.Mach.StateNames()), ) if .Err() != nil || .Ctx.Err() != nil { return nil, errors.Join(.Err(), .Ctx.Err()) } return , nil } func ( *Memory) () []*MemoryRecord { .mx.RLock() defer .mx.RUnlock() // copy := make([]*MemoryRecord, len(.db)) copy(, .db) return }