// Package bbolt provides machine history tracking and traversal using // the bbolt K/V database.
package bbolt import ( amhist am ) type MatcherFn func( now *am.TimeIndex, machBucket *bbolt.Bucket, ) []*amhist.MemoryRecord type Config struct { amhist.BaseConfig EncJson bool // amount of records to save in bulk (default: 100) QueueBatch int32 } const ( BuckMachines = "_machines" BuckTransitions = "transitions" BuckTimes = "times" ) // ///// ///// ///// // ///// TRACER // ///// ///// ///// type tracer struct { *am.TracerNoOp mem *Memory } func ( *tracer) ( am.Api) context.Context { := .mem := time.Now().UTC() // locks .mx.Lock() defer .mx.Unlock() // upsert machine record := .Time(nil) // TODO handle DB errs , := GetMachine(.Db, .Id()) if == nil { .machRec = &amhist.MachineRecord{ MachId: .Id(), FirstTracking: , NextId: 1, } = .machRec } else { .machRec = } .MTimeSum = .Sum(nil) .MTime = .LastTracking = .MachTick = .MachineTick() .LastSync = .nextId.Store(.NextId) // schema if .Cfg.StoreSchema { .Schema = .Schema() .StateNames = .StateNames() } // save := .Db.Update(func( *bbolt.Tx) error { // insert machine := .Bucket([]byte(BuckMachines)) , := .encode(.machRec) if != nil { return } := []byte(.Id()) = .Put(, ) if != nil { return } // create buckets , := .CreateBucketIfNotExists() if != nil { return } _, = .CreateBucketIfNotExists([]byte(BuckTransitions)) if != nil { return } _, = .CreateBucketIfNotExists([]byte(BuckTimes)) if != nil { return } return nil }) if != nil { .onErr() } return nil } func ( *tracer) ( am.Api, am.Schema) { := .mem if .Ctx.Err() != nil { _ = .Dispose() return } // locks .mx.Lock() defer .mx.Unlock() // update schema if !.Cfg.StoreSchema { return } := .machRec .Schema = .Mach.Schema() .StateNames = .Mach.StateNames() // sync mach record := .Db.Update(func( *bbolt.Tx) error { := .Bucket([]byte(BuckMachines)) , := .encode() if != nil { return } return .Put([]byte(.MachId), ) }) if != nil { .onErr() } } func ( *tracer) ( *am.Transition) { := .mem if .Ctx.Err() != nil { _ = .Dispose() return } if (!.IsAccepted.Load() && !.Cfg.TrackRejected) || .Mutation.IsCheck { return } := .Mach := .CalledStates() := .TimeAfter.DiffSince(.TimeBefore). ToIndex(.StateNames()).NonZeroStates() := .Mutation := .Cfg := (.ChangedExclude || len(.Changed) == 0) && (.CalledExclude || len(.Called) == 0) := .TimeAfter := .Filter(.cacheTrackedIdxs) := .TimeBefore.Filter(.cacheTrackedIdxs) := .Sum(nil) := .Sum(nil) // process called for , := range .Called { := slices.Contains(, ) if && .CalledExclude { = false break } else if ! && !.CalledExclude { = true break } } // process changed for , := range .Changed { := slices.Contains(, ) if && .ChangedExclude { = false break } else if ! && !.ChangedExclude { = true break } } if ! { return } // lock .mx.Lock() defer .mx.Unlock() // time record var uint64 if .lastRec != nil { = - .lastRec.MTimeSum } := .MachineTick() := time.Now().UTC() := &amhist.TimeRecord{ MutType: .Type, MTimeSum: , MTimeTrackedSum: , MTimeDiffSum: - .TimeBefore.Sum(nil), MTimeTrackedDiffSum: - .TimeBefore.Sum(.cacheTrackedIdxs), MTimeRecordDiffSum: , MachTick: , HTime: , MTimeTracked: , MTimeTrackedDiff: .DiffSince(), } // optional tx record var *amhist.TransitionRecord if .Cfg.StoreTransitions { // link time record = &amhist.TransitionRecord{ TransitionId: .Id, Called: .Called, IsAuto: .IsAuto, IsAccepted: .IsAccepted.Load(), IsCheck: .IsCheck, IsBroken: .IsBroken.Load(), QueueLen: .QueueLen(), QueuedAt: .QueueTick, // TODO optimize? Arguments: .MapArgs(.SemLogger().ArgsMapper()), } // optional fields if .Source != nil { .SourceTx = .Source.TxId .SourceMach = .Source.MachId } if .QueueTick > 0 { := .QueueTick() .ExecutedAt = } } // update machine record := .machRec .MTime = .MTimeSum = .LastSync = .MachTick = .NextId++ // queue, cache, GC .queue.ids = append(.queue.ids, .nextId.Load()) .nextId.Add(1) .queue.times = append(.queue.times, ) .queue.txs = append(.queue.txs, ) .SavePending.Add(1) // toSave := m.SavePending.Add(1) .lastRec = // TODO ensure save after a delay if .SavePending.Load() >= .Cfg.QueueBatch { .syncMx.RLock() .writeDb(true) .checkGc() } } // itob returns an 8-byte big endian representation of v. func itob( uint64) []byte { := make([]byte, 8) binary.BigEndian.PutUint64(, ) return } // ///// ///// ///// // ///// MEMORY // ///// ///// ///// func ( string) (*bbolt.DB, error) { if == "" { = "amhist" } // TODO optimize: use NoSync? return bbolt.Open(+".db", 0600, &bbolt.Options{ Timeout: 1 * time.Second, }) } type queue struct { times []*amhist.TimeRecord txs []*amhist.TransitionRecord ids []uint64 } type Memory struct { *amhist.BaseMemory Db *bbolt.DB // read-only config for this history Cfg *Config SavePending atomic.Int32 SaveInProgress atomic.Bool Saved atomic.Uint64 // Value of Saved at the end of the last GC SavedGc atomic.Uint64 // sync lock (read: flush, write: sync) syncMx sync.RWMutex // nextId sequence ID nextId atomic.Uint64 // garbage collector lock (read: query, write: GC) gcMx sync.RWMutex // TODO use Ctx disposed atomic.Bool queue *queue queueWorker *errgroup.Group onErr func(err error) machRec *amhist.MachineRecord // global lock, needed mostly for [Memory.MachineRecord]. mx sync.Mutex tr *tracer cacheTrackedIdxs []int lastRec *amhist.TimeRecord } func ( context.Context, *bbolt.DB, am.Api, Config, func( error), ) (*Memory, error) { // init DB := .Update(func( *bbolt.Tx) error { , := .CreateBucketIfNotExists([]byte(BuckMachines)) return }) if != nil { return nil, } := if .MaxRecords <= 0 { .MaxRecords = 1000 } if .QueueBatch <= 0 { .QueueBatch = 100 } // include allowlists in tracked states if !.CalledExclude { .TrackedStates = slices.Concat(.TrackedStates, .Called) } if !.ChangedExclude { .TrackedStates = slices.Concat(.TrackedStates, .Changed) } .TrackedStates = .ParseStates(.TrackedStates) if len(.TrackedStates) == 0 { return nil, fmt.Errorf("%w: no states to track", am.ErrStateMissing) } // init and bind tracer := &Memory{ Cfg: &, Db: , cacheTrackedIdxs: .Index(.TrackedStates), onErr: , queue: &queue{}, queueWorker: &errgroup.Group{}, } .BaseMemory = amhist.NewBaseMemory(, , .BaseConfig, ) .queueWorker.SetLimit(1) := &tracer{ mem: , } .tr = .MachineInit() return , .BindTracer() } // FindLatest is [amhist.BaseMemory.FindLatest]. func ( *Memory) ( context.Context, bool, int, amhist.Query, ) ([]*amhist.MemoryRecord, error) { if := .ValidateQuery(); != nil { return nil, } := .Start := .End := .Mach := .Cfg return .Match(, func( *am.TimeIndex, *bbolt.Bucket) []*amhist.MemoryRecord { := .Index(.MTimeStates) := .Bucket([]byte(BuckTimes)) var *amhist.MemoryRecord := &amhist.MemoryRecord{ Time: &amhist.TimeRecord{}, } var []*amhist.MemoryRecord for := .nextId.Load() - 1; > 0; -- { if .Err() != nil || .Ctx.Err() != nil { return nil } := .Get(itob()) if == nil { .log("empty hit for %d", ) break } // read TimeRecord var error // 1st pass, move 1 more down if == nil { -- .Time, = DecTimeRecord(.Id(), , , .EncJson) := .Get(itob()) if != nil { = &amhist.MemoryRecord{ Time: &amhist.TimeRecord{}, } .Time, = DecTimeRecord(.Id(), , , .EncJson) if != nil { .onErr() return nil } } // 2nd and later passes } else if != nil { = = &amhist.MemoryRecord{ Time: &amhist.TimeRecord{}, } .Time, = DecTimeRecord(.Id(), , , .EncJson) // TODO tx // last pass } else { = = nil } // err if != nil { .onErr() return nil } // states conditions := .Time // Active for , := range .Active { if !am.IsActiveTick(.MTimeTracked[.Index1()]) { continue } } // Activated for , := range .Activated { := .Index1() if !am.IsActiveTick(.MTimeTracked[]) { continue } // if has previously been active if != nil && am.IsActiveTick(.Time.MTimeTracked[]) { continue } } // Inactive for , := range .Inactive { if am.IsActiveTick(.MTimeTracked[.Index1()]) { continue } } // Deactivated for , := range .Deactivated { := .Index1() if am.IsActiveTick(.MTimeTracked[]) { continue } // if has previously been inactive if != nil && !am.IsActiveTick(.Time.MTimeTracked[]) { continue } } // MTimeStates if len(.MTimeStates) > 0 { // caution: slice a sliced time slice := .MTimeTracked.Filter() if .Before(false, .MTime) || .After(false, .MTime) { continue } } // time conditions // HTime if !.HTime.IsZero() && !.HTime.IsZero() && (.HTime.Before(.HTime) || .HTime.After(.HTime)) { continue } // MTimeSum if .MTimeSum != 0 && .MTimeSum != 0 && (.MTimeSum < .MTimeSum || .MTimeSum > .MTimeSum) { continue } // MTimeTrackedSum if .MTimeTrackedSum != 0 && .MTimeTrackedSum != 0 && (.MTimeTrackedSum < .MTimeTrackedSum || .MTimeTrackedSum > .MTimeTrackedSum) { continue } // MTimeDiff if .MTimeDiff != 0 && .MTimeDiff != 0 && (.MTimeDiffSum < .MTimeDiff || .MTimeDiffSum > .MTimeDiff) { continue } // MTimeTrackedDiff if .MTimeTrackedDiff != 0 && .MTimeTrackedDiff != 0 && (.MTimeTrackedDiffSum < .MTimeTrackedDiff || .MTimeTrackedDiffSum > .MTimeTrackedDiff) { continue } // MTimeRecordDiff if .MTimeRecordDiff != 0 && .MTimeRecordDiff != 0 && (.MTimeRecordDiffSum < .MTimeRecordDiff || .MTimeRecordDiffSum > .MTimeRecordDiff) { continue } // MachTick if .MachTick != 0 && .MachTick != 0 && (.MachTick < .MachTick || .MachTick > .MachTick) { continue } // read TransitionRecord if && .StoreTransitions { .Transition, = DecTransitionRecord(.Id(), , .Get(itob()), .EncJson) if != nil { .onErr() } } // collect and return = append(, ) if > 0 && len() >= { break } } return }) } // Sync is [amhist.BaseMemory.Sync]. func ( *Memory) () error { .log("sync...") // locks .mx.Lock() defer .mx.Unlock() .syncMx.Lock() defer .syncMx.Unlock() .writeDb(false) .log("sync OK") return nil } // Match returns the first record that matches the MatcherFn. func ( *Memory) ( context.Context, MatcherFn, ) ([]*amhist.MemoryRecord, error) { // stop GC and query .gcMx.RLock() var []*amhist.MemoryRecord := .Db.View(func( *bbolt.Tx) error { := .Bucket([]byte(.Mach.Id())) := .Mach.Time(nil).ToIndex(.Mach.StateNames()) = (, ) return nil }) .gcMx.RUnlock() // err if .Err() != nil || .Ctx.Err() != nil { return nil, errors.Join(.Err(), .Ctx.Err()) } else if != nil { return nil, } else if == nil { return nil, nil } return , nil } // MachineRecord is [amhist.BaseMemory.MachineRecord]. func ( *Memory) () *amhist.MachineRecord { .mx.Lock() defer .mx.Unlock() // link to a copy := *.machRec return & } // Dispose is [amhist.BaseMemory.Dispose]. TODO merge with ctx func ( *Memory) () error { if !.disposed.CompareAndSwap(false, true) { return nil } .mx.Lock() defer .mx.Unlock() .gcMx.Lock() defer .gcMx.Unlock() return errors.Join( .Db.Close(), .Mach.DetachTracer(.tr), ) } // Config is [amhist.BaseMemory.Config]. func ( *Memory) () amhist.BaseConfig { return .Cfg.BaseConfig } // Machine is [amhist.BaseMemory.Machine]. func ( *Memory) () am.Api { return .Mach } func ( *Memory) ( any) ([]byte, error) { if .Cfg.EncJson { return json.MarshalIndent(, "", " ") } return msgpack.Marshal() } // writeDb requires [Memory.mx]. func ( *Memory) ( bool) { if .SavePending.Load() <= 0 { return } := .queue // copy := *.machRec := .times .times = nil := .txs .txs = nil := .ids .ids = nil .log("writeDb for %d record", len()) := len() .SavePending.Add(-int32()) // fork go func() { if { defer .syncMx.RUnlock() } := .Db.Batch(func( *bbolt.Tx) error { // buckets := .Bucket([]byte(BuckMachines)) := []byte(.MachId) := .Bucket() := .Bucket([]byte(BuckTimes)) var *bbolt.Bucket if .Cfg.StoreTransitions { = .Bucket([]byte(BuckTransitions)) } // update machine , := .encode() if != nil { return } = .Put(, ) if != nil { return } for , := range { := itob([]) // insert time , := .encode() if != nil { return } = .Put(, ) if != nil { return } // insert tx if !.Cfg.StoreTransitions { continue } := [] , := .encode() if != nil { return } = .Put(, ) if != nil { return } } // stats := .Saved.Add(uint64()) .log("saved %d records (total %d)", , ) return nil }) if != nil { .onErr() } }() } func ( *Memory) () { // maybe GC TODO cap to max diff := .SavedGc.Load() := .Saved.Load() if float32(-) <= float32(.Cfg.MaxRecords)*1.5 || !.gcMx.TryLock() { return } .log("gc...") // dont block tracer go func() { defer .gcMx.Unlock() // trim the bottom := .Db.Update(func( *bbolt.Tx) error { := .Bucket([]byte(.Mach.Id())) := .Bucket([]byte(BuckTimes)) var *bbolt.Bucket if .Cfg.StoreTransitions { = .Bucket([]byte(BuckTransitions)) } := 0 // go 1 by 1 and delete stuff for := .nextId.Load() - uint64(.Cfg.MaxRecords); > 0; -- { ++ // time := .Delete(itob()) if != nil { return } // tx if !.Cfg.StoreTransitions { continue } = .Delete(itob()) if != nil { // show err, but dont stop deleting .onErr() } } .log("gc done for %d records", ) return nil }) if != nil { .onErr() } .SavedGc.Store(.Saved.Load()) }() } func ( *Memory) ( string, ...any) { if !.Cfg.Log { return } log.Printf(, ...) } // ///// ///// ///// // ///// DB // ///// ///// ///// // GetMachine returns a machine record for a given machine id. func ( *bbolt.DB, string) (*amhist.MachineRecord, error) { var *amhist.MachineRecord := .View(func( *bbolt.Tx) error { := .Bucket([]byte(BuckMachines)) := .Get([]byte()) if == nil { return nil } return Decode(, , true) }) return , } // ListMachines returns a list of all machines in a database. func ( *bbolt.DB) ([]*amhist.MachineRecord, error) { := make([]*amhist.MachineRecord, 0) := .View(func( *bbolt.Tx) error { := .Bucket([]byte(BuckMachines)) := .Cursor() for , := .First(); != nil; , = .Next() { := &amhist.MachineRecord{} := Decode(, , true) if != nil { return } = append(, ) } return nil }) return , } func ( string, uint64, []byte, bool, ) (*amhist.TimeRecord, error) { // TODO optimize: cache [machId-key] := &amhist.TimeRecord{} := Decode(, , ) if != nil { return nil, } return , nil } func ( string, uint64, []byte, bool, ) (*amhist.TransitionRecord, error) { // TODO optimize: cache [machId-key] := &amhist.TransitionRecord{} := Decode(, , ) if != nil { return nil, } return , nil } func ( []byte, any, bool) error { if { if := json.Unmarshal(, ); == nil { return nil } } return msgpack.Unmarshal(, ) }