// Package badger provides machine history tracking and traversal using // the Badger K/V database.
package badger import ( amhelp amhist am ) type MatcherFn func(now *am.TimeIndex, txn *badger.Txn) []*amhist.MemoryRecord type Config struct { amhist.BaseConfig EncJson bool // amount of records to save in bulk (default: 100) QueueBatch int32 } const ( prefixMachines = "_machines/" suffixTimes = "/times/" suffixTxs = "/txs/" // gcSize is the max number of deletes per GC transaction. gcSize = 1000 ) func machineKey( string) []byte { return []byte(prefixMachines + ) } func timeKey( string, uint64) []byte { := make([]byte, len()+len(suffixTimes)+8) copy(, ) copy([len():], suffixTimes) binary.BigEndian.PutUint64([len()+len(suffixTimes):], ) return } func txKey( string, uint64) []byte { := make([]byte, len()+len(suffixTxs)+8) copy(, ) copy([len():], suffixTxs) binary.BigEndian.PutUint64([len()+len(suffixTxs):], ) return } // ///// ///// ///// // ///// TRACER // ///// ///// ///// type tracer struct { *am.TracerNoOp mem *Memory id string } func ( *tracer) () string { return "badger" + .id } func ( *tracer) ( am.Api) context.Context { := .mem := time.Now().UTC() // locks .mx.Lock() defer .mx.Unlock() // upsert machine record := .Time(nil) // TODO handle DB errs , := GetMachine(.Db, .Id()) if == nil { .machRec = &amhist.MachineRecord{ MachId: .Id(), FirstTracking: , NextId: 1, } = .machRec } else { .machRec = } .MTimeSum = .Sum(nil) .MTime = .LastTracking = .MachTick = .MachineTick() .LastSync = .nextId.Store(.NextId) // schema if .Cfg.StoreSchema { .Schema = .Schema() .StateNames = .StateNames() } // save machine record := .Db.Update(func( *badger.Txn) error { , := .encode(.machRec) if != nil { return } return .Set(machineKey(.Id()), ) }) if != nil { .onErr() } return nil } func ( *tracer) ( am.Api, am.Schema) { := .mem if .Ctx.Err() != nil { _ = .Dispose() return } // locks .mx.Lock() defer .mx.Unlock() // update schema if !.Cfg.StoreSchema { return } := .machRec .Schema = .Mach.Schema() .StateNames = .Mach.StateNames() := .Db.Update(func( *badger.Txn) error { , := .encode() if != nil { return } return .Set(machineKey(.MachId), ) }) if != nil { .onErr() } } func ( *tracer) ( *am.Transition) { := .mem if .Ctx.Err() != nil { _ = .Dispose() return } if (!.IsAccepted.Load() && !.Cfg.TrackRejected) || .Mutation.IsCheck { return } := .Mach := .CalledStates() := .TimeAfter.DiffSince(.TimeBefore). ToIndex(.StateNames()).NonZeroStates() := .Mutation := .Cfg := (.ChangedExclude || len(.Changed) == 0) && (.CalledExclude || len(.Called) == 0) := .TimeAfter := .Filter(.cacheTrackedIdxs) := .TimeBefore.Filter(.cacheTrackedIdxs) := .Sum(nil) := .Sum(nil) // process called for , := range .Called { := slices.Contains(, ) if && .CalledExclude { = false break } else if ! && !.CalledExclude { = true break } } // process changed for , := range .Changed { := slices.Contains(, ) if && .ChangedExclude { = false break } else if ! && !.ChangedExclude { = true break } } if ! { return } // lock .mx.Lock() defer .mx.Unlock() // time record var uint64 if .lastRec != nil { = - .lastRec.MTimeSum } := .MachineTick() := time.Now().UTC() := &amhist.TimeRecord{ MutType: .Type, MTimeSum: , MTimeTrackedSum: , MTimeDiffSum: - .TimeBefore.Sum(nil), MTimeTrackedDiffSum: - .TimeBefore.Sum(.cacheTrackedIdxs), MTimeRecordDiffSum: , MachTick: , HTime: , MTimeTracked: , MTimeTrackedDiff: .DiffSince(), } // optional tx record var *amhist.TransitionRecord if .Cfg.StoreTransitions { // link time record = &amhist.TransitionRecord{ TransitionId: .Id, Called: .Called, IsAuto: .IsAuto, IsAccepted: .IsAccepted.Load(), IsCheck: .IsCheck, IsBroken: .IsBroken.Load(), QueueLen: .QueueLen(), QueuedAt: .QueueTick, // TODO optimize? Arguments: .MapArgs(.SemLogger().ArgsMapper()), } // optional fields if .Source != nil { .SourceTx = .Source.TxId .SourceMach = .Source.MachId } if .QueueTick > 0 { := .QueueTick() .ExecutedAt = } } // update machine record := .machRec .MTime = .MTimeSum = .LastSync = .MachTick = .NextId++ // queue, cache, GC .queue.ids = append(.queue.ids, .nextId.Load()) .nextId.Add(1) .queue.times = append(.queue.times, ) .queue.txs = append(.queue.txs, ) .SavePending.Add(1) .lastRec = // TODO ensure save after a delay if .SavePending.Load() >= .Cfg.QueueBatch { .syncMx.RLock() .writeDb(true) .checkGc() } } // ///// ///// ///// // ///// MEMORY // ///// ///// ///// func ( string) (*badger.DB, error) { if == "" { = "amhist" } := badger.DefaultOptions( + ".badger") if amhelp.IsWasm() || amhelp.IsWasi() { = badger.DefaultOptions("").WithInMemory(true) } // TODO handle logger .Logger = nil return badger.Open() } type queue struct { times []*amhist.TimeRecord txs []*amhist.TransitionRecord ids []uint64 } type Memory struct { *amhist.BaseMemory Db *badger.DB // read-only config for this history Cfg *Config SavePending atomic.Int32 SaveInProgress atomic.Bool Saved atomic.Uint64 // Value of Saved at the end of the last GC SavedGc atomic.Uint64 // sync lock (read: flush, write: sync) syncMx sync.RWMutex // nextId sequence ID nextId atomic.Uint64 // garbage collector lock (read: query, write: GC) gcMx sync.RWMutex // TODO use Context disposed atomic.Bool queue *queue onErr func(err error) machRec *amhist.MachineRecord // global lock, needed mostly for [Memory.MachineRecord]. mx sync.Mutex tr *tracer cacheTrackedIdxs []int lastRec *amhist.TimeRecord } func ( context.Context, *badger.DB, am.Api, Config, func( error), ) (*Memory, error) { := if .MaxRecords <= 0 { .MaxRecords = 1000 } if .QueueBatch <= 0 { .QueueBatch = 100 } // include allowlists in tracked states if !.CalledExclude { .TrackedStates = slices.Concat(.TrackedStates, .Called) } if !.ChangedExclude { .TrackedStates = slices.Concat(.TrackedStates, .Changed) } .TrackedStates = .ParseStates(.TrackedStates) if len(.TrackedStates) == 0 { return nil, fmt.Errorf("%w: no states to track", am.ErrStateMissing) } // init and bind tracer := &Memory{ Cfg: &, Db: , cacheTrackedIdxs: .Index(.TrackedStates), onErr: , queue: &queue{}, } .BaseMemory = amhist.NewBaseMemory(, , .BaseConfig, ) := &tracer{ mem: , id: amhelp.RandId(4), } .tr = .MachineInit() amhelp.DisposeBind(, func( string, context.Context) { := .Dispose() if != nil { .onErr() } }) , := .BindTracer() return , } // FindLatest is [amhist.BaseMemory.FindLatest]. func ( *Memory) ( context.Context, bool, int, amhist.Query, ) ([]*amhist.MemoryRecord, error) { if := .ValidateQuery(); != nil { return nil, } := .Start := .End := .Mach := .Cfg return .Match(, func( *am.TimeIndex, *badger.Txn) []*amhist.MemoryRecord { := .Id() := .Index(.MTimeStates) var *amhist.MemoryRecord := &amhist.MemoryRecord{ Time: &amhist.TimeRecord{}, } var []*amhist.MemoryRecord for := .nextId.Load() - 1; > 0; -- { if .Err() != nil || .Ctx.Err() != nil { return nil } , := getVal(, timeKey(, )) if != nil { .log("empty hit for %d", ) break } // read TimeRecord // 1st pass, move 1 more down if == nil { -- .Time, = DecTimeRecord(, , , .EncJson) , := getVal(, timeKey(, )) if == nil { = &amhist.MemoryRecord{ Time: &amhist.TimeRecord{}, } .Time, = DecTimeRecord(, , , .EncJson) if != nil { .onErr() return nil } } // 2nd and later passes } else if != nil { = = &amhist.MemoryRecord{ Time: &amhist.TimeRecord{}, } .Time, = DecTimeRecord(, , , .EncJson) // TODO tx // last pass } else { = = nil } // err if != nil { .onErr() return nil } // states conditions := .Time // Active for , := range .Active { if !am.IsActiveTick(.MTimeTracked[.Index1()]) { continue } } // Activated for , := range .Activated { := .Index1() if !am.IsActiveTick(.MTimeTracked[]) { continue } // if has previously been active if != nil && am.IsActiveTick(.Time.MTimeTracked[]) { continue } } // Inactive for , := range .Inactive { if am.IsActiveTick(.MTimeTracked[.Index1()]) { continue } } // Deactivated for , := range .Deactivated { := .Index1() if am.IsActiveTick(.MTimeTracked[]) { continue } // if has previously been inactive if != nil && !am.IsActiveTick(.Time.MTimeTracked[]) { continue } } // MTimeStates if len(.MTimeStates) > 0 { // caution: slice a sliced time slice := .MTimeTracked.Filter() if .Before(false, .MTime) || .After(false, .MTime) { continue } } // time conditions // HTime if !.HTime.IsZero() && !.HTime.IsZero() && (.HTime.Before(.HTime) || .HTime.After(.HTime)) { continue } // MTimeSum if .MTimeSum != 0 && .MTimeSum != 0 && (.MTimeSum < .MTimeSum || .MTimeSum > .MTimeSum) { continue } // MTimeTrackedSum if .MTimeTrackedSum != 0 && .MTimeTrackedSum != 0 && (.MTimeTrackedSum < .MTimeTrackedSum || .MTimeTrackedSum > .MTimeTrackedSum) { continue } // MTimeDiff if .MTimeDiff != 0 && .MTimeDiff != 0 && (.MTimeDiffSum < .MTimeDiff || .MTimeDiffSum > .MTimeDiff) { continue } // MTimeTrackedDiff if .MTimeTrackedDiff != 0 && .MTimeTrackedDiff != 0 && (.MTimeTrackedDiffSum < .MTimeTrackedDiff || .MTimeTrackedDiffSum > .MTimeTrackedDiff) { continue } // MTimeRecordDiff if .MTimeRecordDiff != 0 && .MTimeRecordDiff != 0 && (.MTimeRecordDiffSum < .MTimeRecordDiff || .MTimeRecordDiffSum > .MTimeRecordDiff) { continue } // MachTick if .MachTick != 0 && .MachTick != 0 && (.MachTick < .MachTick || .MachTick > .MachTick) { continue } // read TransitionRecord if && .StoreTransitions { , := getVal(, txKey(, )) if != nil { .Transition, = DecTransitionRecord(, , , .EncJson) if != nil { .onErr() } } } // collect and return = append(, ) if > 0 && len() >= { break } } return }) } // Sync is [amhist.BaseMemory.Sync]. func ( *Memory) () error { .log("sync...") // locks .mx.Lock() defer .mx.Unlock() .syncMx.Lock() defer .syncMx.Unlock() .writeDb(false) .log("sync OK") return nil } // Match returns records that match the MatcherFn. func ( *Memory) ( context.Context, MatcherFn, ) ([]*amhist.MemoryRecord, error) { // stop GC and query .gcMx.RLock() var []*amhist.MemoryRecord := .Db.View(func( *badger.Txn) error { := .Mach.Time(nil).ToIndex(.Mach.StateNames()) = (, ) return nil }) .gcMx.RUnlock() // err if .Err() != nil || .Ctx.Err() != nil { return nil, errors.Join(.Err(), .Ctx.Err()) } else if != nil { return nil, } else if == nil { return nil, nil } return , nil } // MachineRecord is [amhist.BaseMemory.MachineRecord]. func ( *Memory) () *amhist.MachineRecord { .mx.Lock() defer .mx.Unlock() // link to a copy := *.machRec return & } // Dispose is [amhist.BaseMemory.Dispose]. TODO merge with ctx func ( *Memory) () error { if !.disposed.CompareAndSwap(false, true) { return nil } .mx.Lock() defer .mx.Unlock() .gcMx.Lock() defer .gcMx.Unlock() return errors.Join( .Db.Close(), .Mach.DetachTracer(.tr.TracerId()), ) } // Config is [amhist.BaseMemory.Config]. func ( *Memory) () amhist.BaseConfig { return .Cfg.BaseConfig } // Machine is [amhist.BaseMemory.Machine]. func ( *Memory) () am.Api { return .Mach } func ( *Memory) ( any) ([]byte, error) { if .Cfg.EncJson { return json.MarshalIndent(, "", " ") } return msgpack.Marshal() } // writeDb requires [Memory.mx]. func ( *Memory) ( bool) { if .SavePending.Load() <= 0 { return } := .queue // copy := *.machRec := .times .times = nil := .txs .txs = nil := .ids .ids = nil .log("writeDb for %d record", len()) := len() .SavePending.Add(-int32()) // fork go func() { if { defer .syncMx.RUnlock() } := .Db.NewWriteBatch() // update machine record , := .encode() if != nil { .Cancel() .onErr() return } if := .Set(machineKey(.MachId), ); != nil { .Cancel() .onErr() return } for , := range { // insert time , := .encode() if != nil { .Cancel() .onErr() return } if := .Set(timeKey(.MachId, []), ); != nil { .Cancel() .onErr() return } // insert tx if !.Cfg.StoreTransitions { continue } := [] , := .encode() if != nil { .Cancel() .onErr() return } if := .Set(txKey(.MachId, []), ); != nil { .Cancel() .onErr() return } } if := .Flush(); != nil { .onErr() return } // stats := .Saved.Add(uint64()) .log("saved %d records (total %d)", , ) }() } func ( *Memory) () { // maybe GC TODO cap to max diff := .SavedGc.Load() := .Saved.Load() if float32(-) <= float32(.Cfg.MaxRecords)*1.5 || !.gcMx.TryLock() { return } .log("gc...") // dont block tracer go func() { defer .gcMx.Unlock() := .Mach.Id() := .nextId.Load() - uint64(.Cfg.MaxRecords) := 0 // delete in batches to stay within transaction size limits for := uint64(1); <= ; += gcSize { := min(+gcSize, +1) := .Db.Update(func( *badger.Txn) error { for := ; < ; ++ { ++ if := .Delete(timeKey(, )); != nil && !errors.Is(, badger.ErrKeyNotFound) { return } if !.Cfg.StoreTransitions { continue } if := .Delete(txKey(, )); != nil && !errors.Is(, badger.ErrKeyNotFound) { // show err, but dont stop deleting .onErr() } } return nil }) if != nil { .onErr() } } .log("gc done for %d records", ) .SavedGc.Store(.Saved.Load()) }() } func ( *Memory) ( string, ...any) { if !.Cfg.Log { return } log.Printf(, ...) } // ///// ///// ///// // ///// DB // ///// ///// ///// // getVal is a helper to read a value from badger by key, returning nil if not // found. func getVal( *badger.Txn, []byte) ([]byte, error) { , := .Get() if errors.Is(, badger.ErrKeyNotFound) { return nil, } if != nil { return nil, } return .ValueCopy(nil) } // GetMachine returns a machine record for a given machine id. func ( *badger.DB, string) (*amhist.MachineRecord, error) { var *amhist.MachineRecord := .View(func( *badger.Txn) error { , := getVal(, machineKey()) if errors.Is(, badger.ErrKeyNotFound) { return nil } if != nil { return } = &amhist.MachineRecord{} return Decode(, , true) }) return , } // ListMachines returns a list of all machines in a database. func ( *badger.DB) ([]*amhist.MachineRecord, error) { := []byte(prefixMachines) var []*amhist.MachineRecord := .View(func( *badger.Txn) error { := badger.DefaultIteratorOptions .Prefix = := .NewIterator() defer .Close() for .Rewind(); .ValidForPrefix(); .Next() { := .Item() := .Value(func( []byte) error { := &amhist.MachineRecord{} if := Decode(, , true); != nil { return } = append(, ) return nil }) if != nil { return } } return nil }) return , } func ( string, uint64, []byte, bool, ) (*amhist.TimeRecord, error) { := &amhist.TimeRecord{} := Decode(, , ) if != nil { return nil, } return , nil } func ( string, uint64, []byte, bool, ) (*amhist.TransitionRecord, error) { := &amhist.TransitionRecord{} := Decode(, , ) if != nil { return nil, } return , nil } func ( []byte, any, bool) error { if { if := json.Unmarshal(, ); == nil { return nil } } return msgpack.Unmarshal(, ) }