package bbolt
import (
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"log"
"slices"
"sync"
"sync/atomic"
"time"
"github.com/vmihailenco/msgpack/v5"
"go.etcd.io/bbolt"
"golang.org/x/sync/errgroup"
amhist "github.com/pancsta/asyncmachine-go/pkg/history"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
)
// MatcherFn is the callback executed by Memory.Match inside a read-only bbolt
// transaction. It receives the machine's current time converted to an index
// (now) and the bucket looked up under the machine's ID (machBucket), and
// returns the records it selected.
type MatcherFn func (
now *am .TimeIndex , machBucket *bbolt .Bucket ,
) []*amhist .MemoryRecord
// Config configures a bbolt-backed Memory. It extends the shared
// amhist.BaseConfig with storage-specific options.
type Config struct {
amhist .BaseConfig
// EncJson makes records be encoded as indented JSON instead of msgpack. It
// is also passed to the decoders as the "try JSON first" flag.
EncJson bool
// QueueBatch is the number of pending records that triggers a DB write.
// NewMemory defaults it to 100 when it is <= 0.
QueueBatch int32
}
// Bucket names used in the bbolt database.
const (
// BuckMachines is the top-level bucket holding one encoded machine record
// per machine ID.
BuckMachines = "_machines"
// BuckTransitions is the sub-bucket of a machine's bucket holding encoded
// transition records, keyed by record ID.
BuckTransitions = "transitions"
// BuckTimes is the sub-bucket of a machine's bucket holding encoded time
// records, keyed by record ID.
BuckTimes = "times"
)
// tracer is the machine tracer which feeds a Memory. It embeds am.TracerNoOp
// and defines MachineInit, SchemaChange and TransitionEnd itself.
type tracer struct {
*am .TracerNoOp
// mem is the Memory that receives the traced records.
mem *Memory
}
// MachineInit loads the stored machine record for mach (or creates a new one
// with NextId 1), refreshes it with the machine's current time, tick and
// tracking timestamps, and persists it. It also makes sure the per-machine
// bucket and its "transitions" and "times" sub-buckets exist.
//
// Errors are reported through the memory's onErr callback. The returned
// context is always nil.
func (t *tracer) MachineInit(mach am.Api) context.Context {
	m := t.mem
	now := time.Now().UTC()

	m.mx.Lock()
	defer m.mx.Unlock()

	mTime := mach.Time(nil)
	rec, err := GetMachine(m.Db, mach.Id())
	if err != nil {
		// Report the failed lookup instead of silently swallowing it; tracking
		// still proceeds with whatever record is available below.
		m.onErr(err)
	}
	if rec == nil {
		rec = &amhist.MachineRecord{
			MachId:        mach.Id(),
			FirstTracking: now,
			NextId:        1,
		}
	}
	m.machRec = rec

	// refresh the volatile part of the record
	rec.MTimeSum = mTime.Sum(nil)
	rec.MTime = mTime
	rec.LastTracking = now
	rec.MachTick = mach.MachineTick()
	rec.LastSync = now
	m.nextId.Store(rec.NextId)
	if m.Cfg.StoreSchema {
		rec.Schema = mach.Schema()
		rec.StateNames = mach.StateNames()
	}

	err = m.Db.Update(func(dbTx *bbolt.Tx) error {
		mb := dbTx.Bucket([]byte(BuckMachines))
		enc, err := m.encode(m.machRec)
		if err != nil {
			return err
		}
		machIdBt := []byte(mach.Id())
		if err := mb.Put(machIdBt, enc); err != nil {
			return err
		}

		// per-machine bucket with its two sub-buckets
		b, err := dbTx.CreateBucketIfNotExists(machIdBt)
		if err != nil {
			return err
		}
		if _, err := b.CreateBucketIfNotExists([]byte(BuckTransitions)); err != nil {
			return err
		}
		if _, err := b.CreateBucketIfNotExists([]byte(BuckTimes)); err != nil {
			return err
		}

		return nil
	})
	if err != nil {
		m.onErr(err)
	}

	return nil
}
// SchemaChange re-reads the schema and state names from the tracked machine
// and stores the updated machine record. It does nothing when StoreSchema is
// disabled, and disposes the memory when its context has already ended.
// Storage errors go to the onErr callback.
func (t *tracer) SchemaChange(machine am.Api, old am.Schema) {
	mem := t.mem
	if mem.Ctx.Err() != nil {
		_ = mem.Dispose()
		return
	}

	mem.mx.Lock()
	defer mem.mx.Unlock()

	if !mem.Cfg.StoreSchema {
		return
	}

	machRec := mem.machRec
	machRec.Schema = mem.Mach.Schema()
	machRec.StateNames = mem.Mach.StateNames()

	// persist overwrites the machine's entry in the machines bucket
	persist := func(dbTx *bbolt.Tx) error {
		machines := dbTx.Bucket([]byte(BuckMachines))
		payload, encErr := mem.encode(machRec)
		if encErr != nil {
			return encErr
		}

		return machines.Put([]byte(machRec.MachId), payload)
	}
	if err := mem.Db.Update(persist); err != nil {
		mem.onErr(err)
	}
}
// TransitionEnd queues a history record for a finished transition.
//
// Transitions are skipped when the memory's context has ended (the memory is
// disposed instead), when they are checks, when they were rejected and
// TrackRejected is off, or when they do not pass the Called / Changed filters
// from the config. For matching transitions a TimeRecord (and, with
// StoreTransitions, a TransitionRecord) is appended to the in-memory queue,
// and the queue is written to the DB once it holds QueueBatch records.
func (t *tracer) TransitionEnd(tx *am.Transition) {
	m := t.mem
	if m.Ctx.Err() != nil {
		// best-effort teardown, there is no caller to hand the error to
		_ = m.Dispose()
		return
	}
	if (!tx.IsAccepted.Load() && !m.Cfg.TrackRejected) || tx.Mutation.IsCheck {
		return
	}

	mach := m.Mach
	mut := tx.Mutation
	cfg := m.Cfg
	called := tx.CalledStates()
	changed := tx.TimeAfter.DiffSince(tx.TimeBefore).
		ToIndex(mach.StateNames()).NonZeroStates()

	// With no include-lists configured everything matches by default.
	match := (cfg.ChangedExclude || len(cfg.Changed) == 0) &&
		(cfg.CalledExclude || len(cfg.Called) == 0)

	// The first configured state which IS present decides: in exclude mode it
	// rejects the transition, in include mode it accepts it.
	for _, name := range cfg.Called {
		if !slices.Contains(called, name) {
			continue
		}
		match = !cfg.CalledExclude
		break
	}
	for _, name := range cfg.Changed {
		if !slices.Contains(changed, name) {
			continue
		}
		match = !cfg.ChangedExclude
		break
	}
	if !match {
		return
	}

	mTime := tx.TimeAfter
	mTimeTracked := mTime.Filter(m.cacheTrackedIdxs)
	mTimeTrackedBefore := tx.TimeBefore.Filter(m.cacheTrackedIdxs)
	sum := mTime.Sum(nil)
	sumTracked := mTimeTracked.Sum(nil)

	m.mx.Lock()
	defer m.mx.Unlock()

	// distance from the previously queued record, 0 for the first one
	var recordDiff uint64
	if m.lastRec != nil {
		recordDiff = sum - m.lastRec.MTimeSum
	}
	machTick := mach.MachineTick()
	now := time.Now().UTC()

	recTime := &amhist.TimeRecord{
		MutType:             mut.Type,
		MTimeSum:            sum,
		MTimeTrackedSum:     sumTracked,
		MTimeDiffSum:        sum - tx.TimeBefore.Sum(nil),
		MTimeTrackedDiffSum: sumTracked - tx.TimeBefore.Sum(m.cacheTrackedIdxs),
		MTimeRecordDiffSum:  recordDiff,
		MachTick:            machTick,
		HTime:               now,
		MTimeTracked:        mTimeTracked,
		MTimeTrackedDiff:    mTimeTracked.DiffSince(mTimeTrackedBefore),
	}

	// optional transition details; stays nil when not stored
	var recTx *amhist.TransitionRecord
	if m.Cfg.StoreTransitions {
		recTx = &amhist.TransitionRecord{
			TransitionId: tx.Id,
			Called:       mut.Called,
			IsAuto:       mut.IsAuto,
			IsAccepted:   tx.IsAccepted.Load(),
			IsCheck:      mut.IsCheck,
			IsBroken:     tx.IsBroken.Load(),
			QueueLen:     mach.QueueLen(),
			QueuedAt:     mut.QueueTick,
			Arguments:    mut.MapArgs(mach.SemLogger().ArgsMapper()),
		}
		if mut.Source != nil {
			recTx.SourceTx = mut.Source.TxId
			recTx.SourceMach = mut.Source.MachId
		}
		if mut.QueueTick > 0 {
			recTx.ExecutedAt = mach.QueueTick()
		}
	}

	// keep the machine record in sync with the newest transition
	machRec := m.machRec
	machRec.MTime = mTime
	machRec.MTimeSum = sum
	machRec.LastSync = now
	machRec.MachTick = machTick
	machRec.NextId++

	// enqueue; the three queue slices are index-aligned
	m.queue.ids = append(m.queue.ids, m.nextId.Load())
	m.nextId.Add(1)
	m.queue.times = append(m.queue.times, recTime)
	m.queue.txs = append(m.queue.txs, recTx)
	m.SavePending.Add(1)
	m.lastRec = recTime

	if m.SavePending.Load() >= m.Cfg.QueueBatch {
		// the read lock is released by writeDb once the write has finished
		m.syncMx.RLock()
		m.writeDb(true)
		m.checkGc()
	}
}
// itob returns the 8-byte big-endian encoding of v. Big-endian keys keep
// byte-wise ordering equal to numeric ordering.
func itob(v uint64) []byte {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], v)

	return key[:]
}
// NewDb opens the bbolt file "<name>.db" with mode 0600 and a one second open
// timeout. An empty name falls back to "amhist".
func NewDb(name string) (*bbolt.DB, error) {
	base := name
	if base == "" {
		base = "amhist"
	}
	opts := &bbolt.Options{
		Timeout: time.Second,
	}

	return bbolt.Open(base+".db", 0600, opts)
}
// queue buffers records which were created by the tracer but not yet written
// to the DB. The three slices are appended to together, so the same index in
// each refers to the same record.
type queue struct {
// times holds the pending time records.
times []*amhist .TimeRecord
// txs holds the pending transition records; an entry is nil when transitions
// are not stored.
txs []*amhist .TransitionRecord
// ids holds the record IDs assigned to the pending records.
ids []uint64
}
// Memory is a history memory of a single machine, persisted in a bbolt DB.
// Records are collected by a tracer, queued in memory and written in batches.
type Memory struct {
*amhist .BaseMemory
// Db is the bbolt database handle. Dispose closes it.
Db *bbolt .DB
// Cfg is the normalized copy of the config passed to NewMemory.
Cfg *Config
// SavePending counts queued records which were not yet handed to a DB write.
SavePending atomic .Int32
// SaveInProgress is not referenced by the code visible in this file.
SaveInProgress atomic .Bool
// Saved counts the records written by writeDb.
Saved atomic .Uint64
// SavedGc is the value of Saved at the end of the last GC run.
SavedGc atomic .Uint64
// syncMx is read-locked by TransitionEnd for the duration of a batch write
// and write-locked by Sync.
syncMx sync .RWMutex
// nextId is the ID which will be assigned to the next record.
nextId atomic .Uint64
// gcMx is read-locked by Match and write-locked by a GC run and by Dispose.
gcMx sync .RWMutex
// disposed makes Dispose run at most once.
disposed atomic .Bool
// queue holds the records waiting for a DB write.
queue *queue
// queueWorker is created with a limit of 1 in NewMemory; it is not used
// otherwise in the code visible in this file.
queueWorker *errgroup .Group
// onErr receives errors from tracer callbacks and background writes.
onErr func (err error )
// machRec is the in-memory machine record; access it under mx.
machRec *amhist .MachineRecord
// mx guards machRec, queue and lastRec.
mx sync .Mutex
// tr is the tracer bound to the machine.
tr *tracer
// cacheTrackedIdxs are the machine indexes of the tracked states.
cacheTrackedIdxs []int
// lastRec is the most recently queued time record, used to compute
// MTimeRecordDiffSum of the next one.
lastRec *amhist .TimeRecord
}
// NewMemory creates a bbolt-backed history memory for mach and binds its
// tracer to the machine.
//
// The config is copied and normalized: MaxRecords defaults to 1000 and
// QueueBatch to 100; in include mode the Called and Changed lists are added to
// TrackedStates, which are then parsed by the machine. An error wrapping
// am.ErrStateMissing is returned when no tracked states remain.
//
// onErr receives errors from tracer callbacks and background writes. When nil,
// errors are logged with the standard logger instead of causing a nil-func
// panic. A nil db is rejected with an error.
func NewMemory(
	ctx context.Context, db *bbolt.DB, mach am.Api, cfg Config,
	onErr func(err error),
) (*Memory, error) {
	if db == nil {
		return nil, errors.New("bbolt history: nil db")
	}
	if onErr == nil {
		onErr = func(err error) {
			log.Printf("bbolt history: %v", err)
		}
	}

	// the machines bucket has to exist before the tracer is initialized
	err := db.Update(func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte(BuckMachines))
		return err
	})
	if err != nil {
		return nil, err
	}

	// normalize a private copy of the config
	c := cfg
	if c.MaxRecords <= 0 {
		c.MaxRecords = 1000
	}
	if c.QueueBatch <= 0 {
		c.QueueBatch = 100
	}
	if !c.CalledExclude {
		c.TrackedStates = slices.Concat(c.TrackedStates, c.Called)
	}
	if !c.ChangedExclude {
		c.TrackedStates = slices.Concat(c.TrackedStates, c.Changed)
	}
	c.TrackedStates = mach.ParseStates(c.TrackedStates)
	if len(c.TrackedStates) == 0 {
		return nil, fmt.Errorf("%w: no states to track", am.ErrStateMissing)
	}

	mem := &Memory{
		Cfg:              &c,
		Db:               db,
		cacheTrackedIdxs: mach.Index(c.TrackedStates),
		onErr:            onErr,
		queue:            &queue{},
		queueWorker:      &errgroup.Group{},
	}
	// NOTE(review): the base memory receives the caller's cfg.BaseConfig, not
	// the normalized c.BaseConfig - confirm this is intended.
	mem.BaseMemory = amhist.NewBaseMemory(ctx, mach, cfg.BaseConfig, mem)
	mem.queueWorker.SetLimit(1)

	tr := &tracer{
		mem: mem,
	}
	mem.tr = tr
	// initialize the machine record before any transition gets traced
	tr.MachineInit(mach)

	return mem, mach.BindTracer(tr)
}
// FindLatest returns stored records matching query, newest first.
//
// Record IDs are walked from nextId-1 downwards until a missing record is
// hit, the contexts end, or limit (when > 0) results were collected. Each
// record is checked together with its direct predecessor ("older"), which is
// needed for the Activated / Deactivated conditions; the oldest available
// record is checked with no predecessor. When retTx is set and transitions
// are stored, the matching transition record is attached to each result.
//
// The query is validated first and a validation error is returned as is.
// Decoding errors of time records are reported via onErr and abort the search
// with an empty result.
func (m *Memory) FindLatest(
	ctx context.Context, retTx bool, limit int, query amhist.Query,
) ([]*amhist.MemoryRecord, error) {
	if err := m.ValidateQuery(query); err != nil {
		return nil, err
	}

	s := query.Start
	e := query.End
	mach := m.Mach
	cfg := m.Cfg

	return m.Match(ctx, func(
		now *am.TimeIndex, machBuck *bbolt.Bucket,
	) []*amhist.MemoryRecord {
		// nothing stored for this machine yet
		if machBuck == nil {
			return nil
		}
		bTimes := machBuck.Bucket([]byte(BuckTimes))
		if bTimes == nil {
			return nil
		}
		var bTxs *bbolt.Bucket
		if retTx && cfg.StoreTransitions {
			bTxs = machBuck.Bucket([]byte(BuckTransitions))
		}
		mTimeIdxs := mach.Index(s.MTimeStates)

		// matches reports whether t (with its predecessor older, possibly nil)
		// satisfies every condition of the query.
		matches := func(t, older *amhist.TimeRecord) bool {
			for _, state := range query.Active {
				if !am.IsActiveTick(t.MTimeTracked[m.Index1(state)]) {
					return false
				}
			}
			for _, state := range query.Activated {
				idx := m.Index1(state)
				if !am.IsActiveTick(t.MTimeTracked[idx]) {
					return false
				}
				if older != nil && am.IsActiveTick(older.MTimeTracked[idx]) {
					return false
				}
			}
			for _, state := range query.Inactive {
				if am.IsActiveTick(t.MTimeTracked[mach.Index1(state)]) {
					return false
				}
			}
			for _, state := range query.Deactivated {
				idx := m.Index1(state)
				if am.IsActiveTick(t.MTimeTracked[idx]) {
					return false
				}
				if older != nil && !am.IsActiveTick(older.MTimeTracked[idx]) {
					return false
				}
			}

			// machine time of selected states
			if len(s.MTimeStates) > 0 {
				cond := t.MTimeTracked.Filter(mTimeIdxs)
				if cond.Before(false, s.MTime) || cond.After(false, e.MTime) {
					return false
				}
			}

			// ranges; each one applies only when both ends are set
			if !s.HTime.IsZero() && !e.HTime.IsZero() &&
				(t.HTime.Before(s.HTime) || t.HTime.After(e.HTime)) {
				return false
			}
			if s.MTimeSum != 0 && e.MTimeSum != 0 &&
				(t.MTimeSum < s.MTimeSum || t.MTimeSum > e.MTimeSum) {
				return false
			}
			if s.MTimeTrackedSum != 0 && e.MTimeTrackedSum != 0 &&
				(t.MTimeTrackedSum < s.MTimeTrackedSum ||
					t.MTimeTrackedSum > e.MTimeTrackedSum) {
				return false
			}
			if s.MTimeDiff != 0 && e.MTimeDiff != 0 &&
				(t.MTimeDiffSum < s.MTimeDiff || t.MTimeDiffSum > e.MTimeDiff) {
				return false
			}
			if s.MTimeTrackedDiff != 0 && e.MTimeTrackedDiff != 0 &&
				(t.MTimeTrackedDiffSum < s.MTimeTrackedDiff ||
					t.MTimeTrackedDiffSum > e.MTimeTrackedDiff) {
				return false
			}
			if s.MTimeRecordDiff != 0 && e.MTimeRecordDiff != 0 &&
				(t.MTimeRecordDiffSum < s.MTimeRecordDiff ||
					t.MTimeRecordDiffSum > e.MTimeRecordDiff) {
				return false
			}
			if s.MachTick != 0 && e.MachTick != 0 &&
				(t.MachTick < s.MachTick || t.MachTick > e.MachTick) {
				return false
			}

			return true
		}

		// load decodes the time record stored under id; a missing record
		// yields nil, nil.
		load := func(id uint64) (*amhist.TimeRecord, error) {
			v := bTimes.Get(itob(id))
			if v == nil {
				return nil, nil
			}

			return DecTimeRecord(mach.Id(), id, v, cfg.EncJson)
		}

		id := m.nextId.Load() - 1
		if id == 0 {
			// no record was ever assigned an ID
			return nil
		}
		cur, err := load(id)
		if err != nil {
			m.onErr(err)
			return nil
		}
		if cur == nil {
			m.log("empty hit for %d", id)
		}

		var ret []*amhist.MemoryRecord
		// Invariant: cur is the record stored under id.
		for ; cur != nil; id-- {
			if ctx.Err() != nil || m.Ctx.Err() != nil {
				return nil
			}

			// the predecessor, nil when id is the oldest available record
			var older *amhist.TimeRecord
			if id > 1 {
				older, err = load(id - 1)
				if err != nil {
					m.onErr(err)
					return nil
				}
				if older == nil {
					m.log("empty hit for %d", id-1)
				}
			}

			// advance before filtering, so skipping a record cannot stall
			t := cur
			cur = older
			if !matches(t, older) {
				continue
			}

			rec := &amhist.MemoryRecord{Time: t}
			if bTxs != nil {
				// transitions live in their own sub-bucket, under the same ID
				if v := bTxs.Get(itob(id)); v != nil {
					rec.Transition, err = DecTransitionRecord(
						mach.Id(), id, v, cfg.EncJson)
					if err != nil {
						m.onErr(err)
					}
				}
			}

			ret = append(ret, rec)
			if limit > 0 && len(ret) >= limit {
				break
			}
		}

		return ret
	})
}
// Sync hands all queued records over to writeDb while holding both the memory
// mutex and the exclusive sync lock, so it first waits for batch writes which
// still hold the read side of that lock. It always returns nil.
func (m *Memory) Sync() error {
	flush := func() {
		m.mx.Lock()
		defer m.mx.Unlock()
		m.syncMx.Lock()
		defer m.syncMx.Unlock()

		m.writeDb(false)
	}

	m.log("sync...")
	flush()
	m.log("sync OK")

	return nil
}
// Match runs matcherFn inside a read-only DB transaction and returns the
// records it selected. The matcher receives the machine's current time as an
// index and the bucket stored under the machine's ID.
//
// GC is held off with a read lock for the duration of the transaction; the
// lock is released with defer, so a panicking matcher cannot leave it held.
// When either ctx or the memory's context has ended, their joined errors are
// returned; otherwise an error of the transaction is returned.
func (m *Memory) Match(
	ctx context.Context, matcherFn MatcherFn,
) ([]*amhist.MemoryRecord, error) {
	var ret []*amhist.MemoryRecord

	err := func() error {
		m.gcMx.RLock()
		defer m.gcMx.RUnlock()

		return m.Db.View(func(tx *bbolt.Tx) error {
			b := tx.Bucket([]byte(m.Mach.Id()))
			now := m.Mach.Time(nil).ToIndex(m.Mach.StateNames())
			ret = matcherFn(now, b)

			return nil
		})
	}()

	// cancellation takes precedence over a transaction error
	if ctx.Err() != nil || m.Ctx.Err() != nil {
		return nil, errors.Join(ctx.Err(), m.Ctx.Err())
	}
	if err != nil {
		return nil, err
	}

	return ret, nil
}
// MachineRecord returns a shallow copy of the current machine record, taken
// under the memory mutex.
func (m *Memory) MachineRecord() *amhist.MachineRecord {
	m.mx.Lock()
	defer m.mx.Unlock()

	snapshot := new(amhist.MachineRecord)
	*snapshot = *m.machRec

	return snapshot
}
// Dispose closes the DB and detaches the tracer from the machine, returning
// both errors joined. Only the first call does any work; later calls return
// nil. It waits for the memory mutex and for exclusive access to the GC lock.
func (m *Memory) Dispose() error {
	firstCall := m.disposed.CompareAndSwap(false, true)
	if !firstCall {
		return nil
	}

	m.mx.Lock()
	defer m.mx.Unlock()
	m.gcMx.Lock()
	defer m.gcMx.Unlock()

	// close first, detach second
	closeErr := m.Db.Close()
	detachErr := m.Mach.DetachTracer(m.tr)

	return errors.Join(closeErr, detachErr)
}
// Config returns a copy of the base part of the memory's config.
func (m *Memory) Config() amhist.BaseConfig {
	base := m.Cfg.BaseConfig

	return base
}
// Machine returns the machine this memory was created for.
func (m *Memory) Machine() am.Api {
	mach := m.Mach

	return mach
}
// encode serializes v with msgpack, or as indented JSON when EncJson is set.
func (m *Memory) encode(v any) ([]byte, error) {
	if !m.Cfg.EncJson {
		return msgpack.Marshal(v)
	}

	return json.MarshalIndent(v, "", " ")
}
// writeDb drains the in-memory queue and stores the drained records together
// with a snapshot of the machine record. The caller must hold m.mx.
//
// With rLocked set, the caller holds a read lock on syncMx; the write then
// runs in a background goroutine via Db.Batch and the read lock is released
// once it finishes (or right away when there is nothing to write). Without
// rLocked (the Sync path) the write runs in the calling goroutine via
// Db.Update, so it has completed when writeDb returns.
//
// Errors are reported through onErr; records of a failed write are dropped.
func (m *Memory) writeDb(rLocked bool) {
	if m.SavePending.Load() <= 0 {
		// nothing to do, but the caller's read lock still has to be released
		if rLocked {
			m.syncMx.RUnlock()
		}
		return
	}

	// take over the queue contents and a snapshot of the machine record
	q := m.queue
	machRec := *m.machRec
	times, txs, ids := q.times, q.txs, q.ids
	q.times, q.txs, q.ids = nil, nil, nil

	l := len(times)
	m.log("writeDb for %d record", l)
	m.SavePending.Add(-int32(l))

	// store only touches the transaction, because Db.Batch may call it more
	// than once.
	store := func(dbTx *bbolt.Tx) error {
		bMachs := dbTx.Bucket([]byte(BuckMachines))
		if bMachs == nil {
			return fmt.Errorf("bucket %q not found", BuckMachines)
		}
		machIdBt := []byte(machRec.MachId)
		b := dbTx.Bucket(machIdBt)
		if b == nil {
			return fmt.Errorf("bucket %q not found", machRec.MachId)
		}
		bTimes := b.Bucket([]byte(BuckTimes))
		if bTimes == nil {
			return fmt.Errorf("bucket %q not found", BuckTimes)
		}
		var bTxs *bbolt.Bucket
		if m.Cfg.StoreTransitions {
			bTxs = b.Bucket([]byte(BuckTransitions))
			if bTxs == nil {
				return fmt.Errorf("bucket %q not found", BuckTransitions)
			}
		}

		// machine record
		enc, err := m.encode(machRec)
		if err != nil {
			return err
		}
		if err := bMachs.Put(machIdBt, enc); err != nil {
			return err
		}

		// time records and, optionally, transition records
		for i, recTime := range times {
			id := itob(ids[i])
			encTime, err := m.encode(recTime)
			if err != nil {
				return err
			}
			if err := bTimes.Put(id, encTime); err != nil {
				return err
			}

			if !m.Cfg.StoreTransitions {
				continue
			}
			encTx, err := m.encode(txs[i])
			if err != nil {
				return err
			}
			if err := bTxs.Put(id, encTx); err != nil {
				return err
			}
		}

		return nil
	}

	write := func(run func(func(*bbolt.Tx) error) error) {
		if rLocked {
			defer m.syncMx.RUnlock()
		}
		if err := run(store); err != nil {
			m.onErr(err)
			return
		}
		// count only after the transaction has been committed
		all := m.Saved.Add(uint64(l))
		m.log("saved %d records (total %d)", l, all)
	}

	if rLocked {
		go write(m.Db.Batch)
		return
	}
	write(m.Db.Update)
}
// checkGc starts a background GC run once more than 1.5 * MaxRecords records
// were saved since the previous run, unless a run (or any other holder of
// gcMx) is already active.
//
// The run deletes the time records (and, with StoreTransitions, the
// transition records) with IDs from nextId-MaxRecords down to 1 in a single
// write transaction, then remembers the current Saved counter as the new
// baseline. Errors are reported through onErr.
func (m *Memory) checkGc() {
	lastGc := m.SavedGc.Load()
	saved := m.Saved.Load()
	if float32(saved-lastGc) <= float32(m.Cfg.MaxRecords)*1.5 ||
		!m.gcMx.TryLock() {
		return
	}

	m.log("gc...")
	go func() {
		defer m.gcMx.Unlock()

		err := m.Db.Update(func(dbTx *bbolt.Tx) error {
			next := m.nextId.Load()
			keep := uint64(m.Cfg.MaxRecords)
			if next <= keep {
				// Fewer IDs handed out than records to keep; the unsigned
				// subtraction below would wrap around.
				m.log("gc done for %d records", 0)
				return nil
			}

			b := dbTx.Bucket([]byte(m.Mach.Id()))
			if b == nil {
				return fmt.Errorf("bucket %q not found", m.Mach.Id())
			}
			bTimes := b.Bucket([]byte(BuckTimes))
			if bTimes == nil {
				return fmt.Errorf("bucket %q not found", BuckTimes)
			}
			var bTxs *bbolt.Bucket
			if m.Cfg.StoreTransitions {
				bTxs = b.Bucket([]byte(BuckTransitions))
				if bTxs == nil {
					return fmt.Errorf("bucket %q not found", BuckTransitions)
				}
			}

			visited := 0
			for id := next - keep; id > 0; id-- {
				visited++
				key := itob(id)
				if err := bTimes.Delete(key); err != nil {
					return err
				}
				if bTxs == nil {
					continue
				}
				// fail the whole transaction, same as for time records
				if err := bTxs.Delete(key); err != nil {
					return err
				}
			}
			m.log("gc done for %d records", visited)

			return nil
		})
		if err != nil {
			m.onErr(err)
		}

		m.SavedGc.Store(m.Saved.Load())
	}()
}
// log prints a formatted message with the standard logger, but only when
// logging is enabled in the config.
func (m *Memory) log(msg string, args ...any) {
	if m.Cfg.Log {
		log.Printf(msg, args...)
	}
}
// GetMachine reads the machine record stored under id.
//
// It returns nil, nil when the record (or the whole machines bucket) does not
// exist, and nil plus the error when reading or decoding fails. JSON decoding
// is tried first, then msgpack.
func GetMachine(db *bbolt.DB, id string) (*amhist.MachineRecord, error) {
	var ret *amhist.MachineRecord

	err := db.View(func(dbTx *bbolt.Tx) error {
		b := dbTx.Bucket([]byte(BuckMachines))
		if b == nil {
			return nil
		}
		pack := b.Get([]byte(id))
		if pack == nil {
			return nil
		}

		// The decoders need a non-nil target; publish it only on success.
		rec := &amhist.MachineRecord{}
		if err := Decode(pack, rec, true); err != nil {
			return err
		}
		ret = rec

		return nil
	})
	if err != nil {
		return nil, err
	}

	return ret, nil
}
// ListMachines returns the records of all machines stored in db, in key
// order. The result is an empty, non-nil slice when there are no machines or
// the machines bucket does not exist yet. On a read or decode error it
// returns nil and the error. JSON decoding is tried first, then msgpack.
func ListMachines(db *bbolt.DB) ([]*amhist.MachineRecord, error) {
	ret := make([]*amhist.MachineRecord, 0)

	err := db.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(BuckMachines))
		if b == nil {
			// a DB which was never used by a Memory
			return nil
		}

		c := b.Cursor()
		for k, v := c.First(); k != nil; k, v = c.Next() {
			rec := &amhist.MachineRecord{}
			if err := Decode(v, rec, true); err != nil {
				return err
			}
			ret = append(ret, rec)
		}

		return nil
	})
	if err != nil {
		return nil, err
	}

	return ret, nil
}
// DecTimeRecord decodes v into a new time record. With tryJson, JSON is tried
// before msgpack. machId and id are accepted but not used by the decoding.
func DecTimeRecord(
	machId string, id uint64, v []byte, tryJson bool,
) (*amhist.TimeRecord, error) {
	decoded := new(amhist.TimeRecord)
	if err := Decode(v, decoded, tryJson); err != nil {
		return nil, err
	}

	return decoded, nil
}
// DecTransitionRecord decodes v into a new transition record. With tryJson,
// JSON is tried before msgpack. machId and id are accepted but not used by
// the decoding.
func DecTransitionRecord(
	machId string, id uint64, v []byte, tryJson bool,
) (*amhist.TransitionRecord, error) {
	decoded := new(amhist.TransitionRecord)
	if err := Decode(v, decoded, tryJson); err != nil {
		return nil, err
	}

	return decoded, nil
}
// Decode unmarshals v into out. With tryJson it attempts JSON first and falls
// back to msgpack when that fails; otherwise it uses msgpack directly. Only
// the msgpack error is returned.
func Decode(v []byte, out any, tryJson bool) error {
	jsonOK := tryJson && json.Unmarshal(v, out) == nil
	if jsonOK {
		return nil
	}

	return msgpack.Unmarshal(v, out)
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.