package main
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/ic2hrmk/promtail"
"github.com/joho/godotenv"
"github.com/prometheus/client_golang/prometheus/push"
"github.com/sethvargo/go-envconfig"
"github.com/pancsta/asyncmachine-go/examples/tree_state_source/states"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
amtele "github.com/pancsta/asyncmachine-go/pkg/telemetry"
amprom "github.com/pancsta/asyncmachine-go/pkg/telemetry/prometheus"
)
// ss is a package-level shorthand for states.FlightsStates; the rest of the
// file uses it to obtain state names (ss.Names(), ss.Ready).
var ss = states .FlightsStates
// Node-wide settings shared by every function in this file.
const (
	// serviceName is the base of the telemetry service identifier; ReadEnv
	// appends the node name to it.
	serviceName = "tree_state_source"

	// mutationFreq is the period of the ticker driving blockEcho.
	mutationFreq = time.Second

	// promPushFreq is the minimum time between two periodic metric pushes
	// performed by blockEcho.
	promPushFreq = 15 * time.Second
)
// Node holds the configuration of a single tree node together with the
// telemetry handles created for it. The tagged fields are filled from the
// environment by ReadEnv; the remaining fields are set at runtime.
type Node struct {
// Name is read from TST_NAME. ReadEnv overrides it with "root" when
// ParentAddr is empty.
Name string `env:"TST_NAME"`
// ParentAddr is read from TST_PARENT_ADDR. When empty, main creates a local
// worker; otherwise it is the address handed to the RPC client.
ParentAddr string `env:"TST_PARENT_ADDR"`
// Addr is read from TST_ADDR and is required by main. It is the address
// assigned to the RPC mux and its servers in exportState.
Addr string `env:"TST_ADDR"`
// HttpAddr is read from TST_HTTP_ADDR. httpServer listens on it and does
// nothing when it is empty.
HttpAddr string `env:"TST_HTTP_ADDR"`
// LokiAddr is read from LOKI_ADDR. initTelemetry creates the Loki client
// only when it is non-empty.
LokiAddr string `env:"LOKI_ADDR"`
// PushGatewayUrl is read from PUSH_GATEWAY_URL. initTelemetry creates the
// Prometheus pusher only when it is non-empty.
PushGatewayUrl string `env:"PUSH_GATEWAY_URL"`
// Service is computed by ReadEnv: serviceName, followed by "_" and Name
// when Name is not empty.
Service string
// Loki is the promtail client created by initTelemetry; nil when LokiAddr
// is empty.
Loki promtail .Client
// Metrics collects one entry per machine bound in exportTelemetry; blockEcho
// calls Sync on each of them before pushing.
Metrics []*amprom .Metrics
// Prom is the pusher created by initTelemetry; nil when PushGatewayUrl is
// empty.
Prom *push .Pusher
}
// init calls godotenv.Load with no arguments before main runs, so that
// ReadEnv can see values loaded by it.
func init() {
// The returned error is discarded on purpose — presumably so that a missing
// .env file is not fatal; confirm against the godotenv documentation.
_ = godotenv .Load ()
}
// main configures a single tree node and runs it until SIGINT or SIGTERM.
//
// The configuration is read from the environment and telemetry is set up.
// The state machine is either a local worker (no parent address) or the
// worker of an RPC client connected to the parent. That machine is exported
// over RPC, optionally exposed over HTTP, and main then blocks in blockEcho
// until the context is cancelled. On the way out the metrics are pushed one
// last time and the Loki client is closed. Any setup failure panics.
func main() {
	ctx, stop := context.WithCancel(context.Background())
	defer stop()

	cfg := ReadEnv(ctx)
	if cfg.Addr == "" {
		panic("TST_ADDR required")
	}

	// Turn termination signals into a context cancellation.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-signals
		stop()
	}()

	initTelemetry(cfg)

	// Pick the machine to expose: our own worker, or a replica of the parent.
	var mach am.Api
	switch cfg.ParentAddr {
	case "":
		worker, err := localWorker(ctx, cfg)
		if err != nil {
			panic(err)
		}
		mach = worker
	default:
		client, err := replicant(ctx, cfg)
		if err != nil {
			panic(err)
		}
		mach = client.Worker
	}

	exportState(ctx, cfg, mach)
	go httpServer(ctx, cfg, mach)

	// Blocks until the context is cancelled.
	blockEcho(ctx, cfg, mach)

	// Final metrics push, padded with short pauses on both sides.
	if pusher := cfg.Prom; pusher != nil {
		const pause = 500 * time.Millisecond

		time.Sleep(pause)
		if err := pusher.Push(); err != nil {
			panic(err)
		}
		time.Sleep(pause)
	}

	if logs := cfg.Loki; logs != nil {
		logs.Close()
	}

	println("bye")
}
// ReadEnv builds the node configuration from environment variables.
//
// It panics when envconfig cannot process the environment. A node without a
// parent address is always named "root". Service is set to serviceName,
// followed by "_" and the node name when that name is not empty.
func ReadEnv(ctx context.Context) *Node {
	var node Node
	if err := envconfig.Process(ctx, &node); err != nil {
		panic("Failed to load config: " + err.Error())
	}

	// The node at the top of the tree has no parent and a fixed name.
	if node.ParentAddr == "" {
		node.Name = "root"
	}

	service := serviceName
	if node.Name != "" {
		service = service + "_" + node.Name
	}
	node.Service = service

	return &node
}
// localWorker creates the node's own state machine from the flights schema,
// binds the node's telemetry to it and adds the Ready state.
//
// A failure to create the machine is returned to the caller (wrapped with
// the node name) instead of panicking here, so that the declared error
// result is meaningful. On error the returned machine is nil.
func localWorker(ctx context.Context, node *Node) (*am.Machine, error) {
	worker, err := am.NewCommon(ctx, node.Name, states.FlightsSchema,
		ss.Names(), nil, nil, nil)
	if err != nil {
		return nil, fmt.Errorf("creating local worker %q: %w", node.Name, err)
	}

	exportTelemetry(node, worker)
	worker.Add1(ss.Ready, nil)

	return worker, nil
}
// replicant creates an RPC client for the parent node at node.ParentAddr,
// starts it and waits up to 5 seconds for the client machine to reach the
// Ready state. Telemetry is bound to the client machine before the
// connection attempt and to the client's worker once the client is ready.
//
// Failures are returned to the caller (wrapped with the parent address)
// instead of panicking here, so that the declared error result is
// meaningful. On error the returned client is nil.
func replicant(ctx context.Context, node *Node) (*arpc.Client, error) {
	client, err := arpc.NewClient(ctx, node.ParentAddr, node.Name,
		states.FlightsSchema, states.FlightsStates.Names(), nil)
	if err != nil {
		return nil, fmt.Errorf("creating RPC client for %s: %w",
			node.ParentAddr, err)
	}
	exportTelemetry(node, client.Mach)

	fmt.Println("Connecting to " + node.ParentAddr)
	client.Start()

	err = amhelp.WaitForAll(ctx, 5*time.Second,
		client.Mach.When1(ssrpc.ClientStates.Ready, ctx))
	if err != nil {
		return nil, fmt.Errorf("waiting for RPC client to %s to become ready: %w",
			node.ParentAddr, err)
	}

	// The worker's telemetry is bound only after the client is ready.
	exportTelemetry(node, client.Worker)

	return client, nil
}
// exportTelemetry attaches the node's telemetry to mach.
//
// amhelp.MachDebugEnv is always applied. The Loki logger is bound only when
// the node has a Loki client. When the node has a Prometheus pusher, metrics
// are bound to the machine, attached to the pusher and remembered in
// node.Metrics.
func exportTelemetry(node *Node, mach am.Api) {
	amhelp.MachDebugEnv(mach)

	if logs := node.Loki; logs != nil {
		amtele.BindLokiLogger(mach, logs)
	}

	pusher := node.Prom
	if pusher == nil {
		return
	}

	bound := amprom.BindMach(mach)
	amprom.BindToPusher(bound, pusher)
	node.Metrics = append(node.Metrics, bound)
}
// initTelemetry creates the telemetry clients selected by the configuration.
//
// With a Loki address set, a promtail JSON v1 client labelled with the
// normalized service name is stored in node.Loki; a failure to create it
// panics. node.Prom receives a pusher for the configured push gateway, or
// nil when no gateway URL is set.
func initTelemetry(node *Node) {
	if addr := node.LokiAddr; addr != "" {
		labels := map[string]string{
			"service_name": amtele.NormalizeId(node.Service),
		}

		var err error
		node.Loki, err = promtail.NewJSONv1Client(addr, labels)
		if err != nil {
			panic(err)
		}
	}

	// No pusher unless a gateway is configured.
	node.Prom = nil
	if gateway := node.PushGatewayUrl; gateway != "" {
		node.Prom = push.New(gateway, amtele.NormalizeId(node.Service))
	}
}
// exportState exposes mach over RPC on node.Addr.
//
// It creates an RPC mux with mach as its parent and installs a factory that
// builds a server named "<node name>-<num>" for mach, with the mux machine
// as parent and telemetry bound to the server's machine. The mux itself
// also gets telemetry bound before it is started. A failure to create the
// mux panics.
func exportState(ctx context.Context, node *Node, mach am.Api) {
	mux, err := arpc.NewMux(ctx, node.Name, nil, &arpc.MuxOpts{Parent: mach})
	if err != nil {
		panic(err)
	}
	mux.Addr = node.Addr

	// The connection argument is not needed to build the server.
	mux.NewServerFn = func(num int, _ net.Conn) (*arpc.Server, error) {
		opts := &arpc.ServerOpts{Parent: mux.Mach}
		name := fmt.Sprintf("%s-%d", node.Name, num)

		srv, err := arpc.NewServer(ctx, node.Addr, name, mach, opts)
		if err != nil {
			return nil, err
		}
		exportTelemetry(node, srv.Mach)

		return srv, nil
	}

	exportTelemetry(node, mux.Mach)

	fmt.Println("Starting on " + node.Addr)
	mux.Start()
}
// blockEcho blocks until ctx is cancelled, waking up every mutationFreq.
//
// On every tick a machine whose ID is "root" receives a random mutation.
// At most once per promPushFreq the summed machine time is printed and,
// when the node has a Prometheus pusher, all collected metrics are synced
// and pushed. A failed push panics.
func blockEcho(ctx context.Context, node *Node, mach am.Api) {
	ticker := time.NewTicker(mutationFreq)
	// Stop the ticker when the loop ends so it does not outlive this call.
	defer ticker.Stop()

	// The zero value makes the first tick report immediately.
	var lastPush time.Time

	for ctx.Err() == nil {
		select {
		case <-ctx.Done():
			return

		case <-ticker.C:
			if mach.Id() == "root" {
				randMut(mach)
			}

			// Reporting runs on a slower cadence than mutations.
			if time.Since(lastPush) < promPushFreq {
				continue
			}
			fmt.Printf("Time: %d\n", mach.Time(nil).Sum(nil))
			lastPush = time.Now()

			if node.Prom == nil {
				continue
			}
			for _, m := range node.Metrics {
				m.Sync()
			}
			if err := node.Prom.Push(); err != nil {
				panic(err)
			}
		}
	}
}
// randMut adds three randomly chosen states (repeats allowed) to mach.
//
// Nothing is added when any of the picks is am.StateException or
// ssrpc.WorkerStates.SendPayload, or when there are no state names to pick
// from. If the machine is in an error state after the mutation, the error
// is printed and the exception state is removed.
func randMut(mach am.Api) {
	// Fetch the name list once instead of once per pick.
	names := ss.Names()
	amount := len(names)
	if amount == 0 {
		// rand.Intn panics when its argument is not positive.
		return
	}

	state1 := names[rand.Intn(amount)]
	state2 := names[rand.Intn(amount)]
	state3 := names[rand.Intn(amount)]

	// These states are never added by a random mutation.
	skip := am.S{am.StateException, ssrpc.WorkerStates.SendPayload}
	for _, s := range skip {
		if state1 == s || state2 == s || state3 == s {
			return
		}
	}

	mach.Add(am.S{state1, state2, state3}, nil)
	if mach.IsErr() {
		// Terminate the line so the message does not run into later output.
		fmt.Printf("Error: %s\n", mach.Err())
		mach.Remove1(am.StateException, nil)
	}
}
// httpServer serves the active states of worker over HTTP on node.HttpAddr
// and blocks until ctx is cancelled, after which the server is shut down.
// It returns immediately when no HTTP address is configured.
//
// The handler is registered for "/" on http.DefaultServeMux, so this
// function must be called at most once per process. A listen failure other
// than http.ErrServerClosed panics in the serving goroutine.
func httpServer(ctx context.Context, node *Node, worker am.Api) {
	if node.HttpAddr == "" {
		return
	}

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// A failed write only means the client went away; nothing to do.
		_, _ = fmt.Fprintln(w, worker.ActiveStates(nil))
	})

	server := &http.Server{
		Addr:    node.HttpAddr,
		Handler: http.DefaultServeMux,
		// Bound the time a client may take to send its request headers, so
		// slow clients cannot hold connections open indefinitely.
		ReadHeaderTimeout: 5 * time.Second,
	}

	go func() {
		fmt.Println("Starting http on " + node.HttpAddr)
		err := server.ListenAndServe()
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			panic(err)
		}
	}()

	<-ctx.Done()

	// Give in-flight requests a bounded amount of time to finish rather than
	// waiting for them forever.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Best effort: the process is exiting, so a shutdown error is ignored.
	_ = server.Shutdown(shutdownCtx)
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.