// Package prometheus provides Prometheus metrics for asyncmachine.
// Metrics are collected from the machine's transitions and states.
package prometheus // TODO measure auto added states // TODO collect also total numbers? // TODO race in NewSUbmachine BindToPusher import ( amhelp am ) const EnvPromPushUrl = "AM_PROM_PUSH_URL" type PromInheritTracer struct { *am.NoOpTracer m *Metrics } // PromTracer is [am.Tracer] for tracing state machines. type PromTracer struct { *am.NoOpTracer m *Metrics txStartTime time.Time prevTime uint64 } func ( *PromTracer) ( am.Api, am.Schema) { // TODO refresh state and relation metrics } func ( *PromTracer) ( string) { .m.Close() } func ( *PromTracer) (, am.Api) { // skip RPC machines := os.Getenv("AM_RPC_DBG") != "" for , := range .Tags() { if strings.HasPrefix(, "rpc-") && ! { return } } // bind metrics and configured exporters := BindMach() if .m.pusher != nil { BindToPusher(, .m.pusher) } if .m.registry != nil { BindToRegistry(, .m.registry) } // bind inheritance tracer := .BindTracer(&PromInheritTracer{ m: , }) if != nil { .AddErr(, nil) return } .m.childMetrics = append(.m.childMetrics, ) } func ( *PromTracer) ( *am.Transition) { .txStartTime = time.Now() } func ( *PromTracer) ( *am.Transition) { if .m.closed || !.IsAccepted.Load() { return } := .Api.StateNames() .m.mx.Lock() defer .m.mx.Unlock() := .QueueLen .m.queueSize = uint64() .m.queueSizeLen++ .m.stepsAmount += uint64(len(.Steps)) .m.stepsAmountLen++ // TODO log slow txs (Opts and default to 1ms) .m.txTime += uint64(time.Since(.txStartTime).Microseconds()) .m.txTimeLen++ // executed handlers := 0 for , := range .Steps { if .Type == am.StepHandler { ++ } } .m.handlersAmount += uint64() .m.handlersAmountLen++ // tx states , , := amhelp.GetTransitionStates(, ) switch .Mutation.Type { case am.MutationAdd: .m.statesAdded += uint64(len()) .m.statesAddedLen++ case am.MutationRemove: .m.statesRemoved += uint64(len()) .m.statesRemovedLen++ case am.MutationSet: .m.statesAdded += uint64(len()) .m.statesAddedLen++ .m.statesRemoved += uint64(len()) .m.statesRemovedLen++ } .m.statesTouched += uint64(len()) 
.m.statesTouchedLen++ // time sum := .Api.Time(nil).Sum(nil) .m.txTick += - .prevTime .m.txTickLen++ .prevTime = // active / inactive states := 0 := 0 for , := range .TimeBefore { if am.IsActiveTick() { ++ } else { ++ } } .m.statesActiveAmount += uint64() .m.statesActiveAmountLen++ .m.statesInactiveAmount += uint64() .m.statesInactiveAmountLen++ // Exception if slices.Contains(.TargetStates(), am.StateException) { .m.exceptionsCount++ } .m.transitionsCount++ } // Metrics is a set of Prometheus metrics for asyncmachine. type Metrics struct { // Tracer is a Prometheus tracer for the machine. You can detach it any time // using Machine.DetachTracer. Tracer am.Tracer mx sync.Mutex closed bool // mach definition // number of registered states StatesAmount prometheus.Gauge // number of relations for all registered states RelAmount prometheus.Gauge // number of state referenced by relations for all registered states RefStatesAmount prometheus.Gauge // tx data // current number of queued transitions (per transition) QueueSize prometheus.Gauge queueSize uint64 queueSizeLen uint // transition duration in machine's clock (ticks per tx) TxTick prometheus.Gauge txTick uint64 txTickLen uint // number of active states (per transition) StatesActiveAmount prometheus.Gauge statesActiveAmount uint64 statesActiveAmountLen uint // number of inactive states (per transition) StatesInactiveAmount prometheus.Gauge statesInactiveAmount uint64 statesInactiveAmountLen uint // number of states added (per transition) StatesAdded prometheus.Gauge statesAdded uint64 statesAddedLen uint // number of states removed (per transition) StatesRemoved prometheus.Gauge statesRemoved uint64 statesRemovedLen uint // number of states touched (per transition) StatesTouched prometheus.Gauge statesTouched uint64 statesTouchedLen uint // number of errors ExceptionsCount prometheus.Gauge exceptionsCount uint64 // number of transitions TransitionsCount prometheus.Gauge transitionsCount uint64 // stats // steps per 
transition StepsAmount prometheus.Gauge stepsAmount uint64 stepsAmountLen uint // amount of executed handlers per tx HandlersAmount prometheus.Gauge handlersAmount uint64 handlersAmountLen uint // transition time TxTime prometheus.Gauge txTime uint64 txTimeLen uint pusher *push.Pusher registry *prometheus.Registry childMetrics []*Metrics } func newMetrics( am.Api) *Metrics { := telemetry.NormalizeId(.Id()) return &Metrics{ // mach definition StatesAmount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "states_" + , Help: "Number of registered states", Namespace: "am", }), RelAmount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "relations_" + , Help: "Number of relations for all states", Namespace: "am", }), RefStatesAmount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "ref_states_" + , Help: "Number of states referenced by relations", Namespace: "am", }), // tx data QueueSize: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "queue_size_" + , Help: "Average queue size", Namespace: "am", }), TxTick: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "tx_ticks_" + , Help: "Average transition machine time taken (ticks)", Namespace: "am", }), ExceptionsCount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "exceptions_" + , Help: "Number of transitions with Exception active", Namespace: "am", }), TransitionsCount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "transitions_" + , Help: "Number of transitions", Namespace: "am", }), StatesAdded: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "states_added_" + , Help: "Average amount of states added", Namespace: "am", }), StatesRemoved: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "states_removed_" + , Help: "Average amount of states removed", Namespace: "am", }), StatesTouched: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "states_touched_" + , Help: "Average amount of states touched", Namespace: "am", }), StepsAmount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "steps_" + , Help: "Average amount of steps 
per transition", Namespace: "am", }), HandlersAmount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "handlers_" + , Help: "Average amount of executed handlers per tx", Namespace: "am", }), TxTime: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "tx_time_" + , Help: "Average transition human time taken (μs)", Namespace: "am", }), StatesActiveAmount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "states_active_" + , Help: "Average amount of active states", Namespace: "am", }), StatesInactiveAmount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "states_inactive_" + , Help: "Average amount of inactive states", Namespace: "am", }), } } // Sync synchronizes the metrics with the current counters, averaging certain // values and using totals of others. After that the counters are reset to 0. // // Sync should be called right before pushing or scraping. func ( *Metrics) () { if .closed { return } .mx.Lock() defer .mx.Unlock() // update the gauges .QueueSize.Set(average(.queueSize, .queueSizeLen)) .TxTick.Set(average(.txTick, .txTickLen)) .StatesActiveAmount.Set( average(.statesActiveAmount, .statesActiveAmountLen)) .StatesInactiveAmount.Set( average(.statesInactiveAmount, .statesInactiveAmountLen)) .StatesAdded.Set(average(.statesAdded, .statesAddedLen)) .StatesRemoved.Set(average(.statesRemoved, .statesRemovedLen)) .StatesTouched.Set(average(.statesTouched, .statesTouchedLen)) .ExceptionsCount.Set(float64(.exceptionsCount)) .TransitionsCount.Set(float64(.transitionsCount)) .StepsAmount.Set(average(.stepsAmount, .stepsAmountLen)) .HandlersAmount.Set(average(.handlersAmount, .handlersAmountLen)) .TxTime.Set(average(.txTime, .txTimeLen)) // reset buffers .queueSize = 0 .queueSizeLen = 0 .txTick = 0 .txTickLen = 0 .statesActiveAmount = 0 .statesActiveAmountLen = 0 .statesInactiveAmount = 0 .statesInactiveAmountLen = 0 .statesAdded = 0 .statesAddedLen = 0 .statesRemoved = 0 .statesRemovedLen = 0 .statesTouched = 0 .statesTouchedLen = 0 .exceptionsCount = 0 .stepsAmount = 
0 .stepsAmountLen = 0 .handlersAmount = 0 .handlersAmountLen = 0 .txTime = 0 .txTimeLen = 0 .transitionsCount = 0 // sync children for , := range .childMetrics { .() } } // Close sets all gauges to 0. func ( *Metrics) () { // close only once if .closed { return } .closed = true // set all gauges to 0 .StatesAmount.Set(0) .RelAmount.Set(0) .RefStatesAmount.Set(0) .QueueSize.Set(0) .TxTick.Set(0) .StatesActiveAmount.Set(0) .StatesInactiveAmount.Set(0) .StatesAdded.Set(0) .StatesRemoved.Set(0) .StatesTouched.Set(0) .ExceptionsCount.Set(0) .TransitionsCount.Set(0) .StepsAmount.Set(0) .HandlersAmount.Set(0) .TxTime.Set(0) } func ( *Metrics) ( *push.Pusher) { .pusher = } func ( *Metrics) ( *prometheus.Registry) { .registry = } // BindMach bind transitions to Prometheus metrics. func ( am.Api) *Metrics { := newMetrics() .Log("[bind] prometheus metrics") // state & relations // TODO bind in SchemaChange .StatesAmount.Set(float64(len(.StateNames()))) := 0 := 0 for , := range .Schema() { if .Add != nil { ++ += len(.Add) } if .Remove != nil { ++ += len(.Remove) } if .Require != nil { ++ += len(.Require) } if .After != nil { ++ += len(.After) } } .RelAmount.Set(float64()) .RefStatesAmount.Set(float64()) .Tracer = &PromTracer{m: } _ = .BindTracer(.Tracer) return } func ( *Metrics, *push.Pusher) { .SetPusher() // transition .Collector(.TransitionsCount) .Collector(.QueueSize) .Collector(.TxTick) .Collector(.StepsAmount) .Collector(.HandlersAmount) .Collector(.StatesAdded) .Collector(.StatesRemoved) .Collector(.StatesTouched) // machine states .Collector(.StatesAmount) .Collector(.RelAmount) .Collector(.RefStatesAmount) .Collector(.StatesActiveAmount) .Collector(.StatesInactiveAmount) // tx time .Collector(.TxTime) // errors .Collector(.ExceptionsCount) } func ( *Metrics, *prometheus.Registry) { .SetRegistry() // transition .MustRegister(.TransitionsCount) .MustRegister(.QueueSize) .MustRegister(.TxTick) .MustRegister(.StepsAmount) .MustRegister(.HandlersAmount) 
.MustRegister(.StatesAdded) .MustRegister(.StatesRemoved) .MustRegister(.StatesTouched) // machine states .MustRegister(.StatesAmount) .MustRegister(.RelAmount) .MustRegister(.RefStatesAmount) .MustRegister(.StatesActiveAmount) .MustRegister(.StatesInactiveAmount) // tx time .MustRegister(.TxTime) // errors .MustRegister(.ExceptionsCount) } func average( uint64, uint) float64 { if == 0 { return 0 } return float64( / uint64()) } // MachMetricsEnv bind an OpenTelemetry tracer to [mach], based on environment // variables: // - AM_SERVICE (required) // - AM_PROM_PUSH_URL (required) // // This tracer is inherited by submachines, and this function applies only to // top-level machines. func ( am.Api) *Metrics { if .ParentId() != "" { return nil } := os.Getenv(EnvPromPushUrl) := os.Getenv(telemetry.EnvService) // return early if any required variables are empty if == "" || == "" { return nil } // bind metrics via a pusher // TODO global ticker := push.New(, telemetry.NormalizeId()) // bind transition to metrics := BindMach() BindToPusher(, ) // TODO config := time.NewTicker(15 * time.Second) go func() { for { select { // sync every 15 secs case <-.C: .Sync() := .Push() if != nil { .AddErr(, nil) } case <-.Ctx().Done(): // pass } } }() return }