package main

import (
	
	
	
	
	
	
	
	
	
	

	
	
	
	

	
	amhelp 
	am 
	arpc 
	ssrpc 
	amtele 
	amprom 
)

var ss = states.FlightsStates

const (
	serviceName = "tree_state_source"
	// promPushFreq is the frequency of pushing metrics to Prometheus.
	promPushFreq = 15 * time.Second
	// casual viewing
	mutationFreq = 1 * time.Second
	// load testing
	// mutationFreq = 5 * time.Millisecond
)

type Node struct {
	// config
	Name       string `env:"TST_NAME"`
	ParentAddr string `env:"TST_PARENT_ADDR"`
	Addr       string `env:"TST_ADDR"`
	HttpAddr   string `env:"TST_HTTP_ADDR"`

	// telemetry
	LokiAddr       string `env:"LOKI_ADDR"`
	PushGatewayUrl string `env:"PUSH_GATEWAY_URL"`

	Service string
	Loki    promtail.Client
	Metrics []*amprom.Metrics
	Prom    *push.Pusher
}

func init() {
	// load .env
	_ = godotenv.Load()

	// am-dbg is required for debugging, go run it
	// go run github.com/pancsta/asyncmachine-go/tools/cmd/am-dbg@latest
	// amhelp.EnableDebugging(false)
	// amhelp.SetEnvLogLevel(am.LogOps)
}

func main() {
	,  := context.WithCancel(context.Background())
	defer ()
	 := ReadEnv()

	if .Addr == "" {
		panic("TST_ADDR required")
	}

	// handle exit
	 := make(chan os.Signal, 1)
	signal.Notify(, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-
		()
	}()

	initTelemetry()

	// STATE SOURCE

	var  am.Api
	// RPC source
	if .ParentAddr == "" {
		var  error
		,  = localWorker(, )
		if  != nil {
			panic()
		}

	} else {
		// RPC repeater
		,  := replicant(, )
		if  != nil {
			panic()
		}
		 = .Worker
	}

	// EXPORT

	exportState(, , )
	go httpServer(, , )

	// RAND TRAFFIC

	blockEcho(, , )

	// EXIT

	if .Prom != nil {

		time.Sleep(500 * time.Millisecond)
		 := .Prom.Push()
		if  != nil {
			panic()
		}
		time.Sleep(500 * time.Millisecond)
	}

	if .Loki != nil {
		.Loki.Close()
	}

	println("bye")
}

func ( context.Context) *Node {
	 := &Node{}
	 := envconfig.Process(, )
	if  != nil {
		panic("Failed to load config: " + .Error())
	}

	if .ParentAddr == "" {
		.Name = "root"
	}

	.Service = serviceName
	if .Name != "" {
		.Service += "_" + .Name
	}

	return 
}

func localWorker( context.Context,  *Node) (*am.Machine, error) {
	// worker state machine
	,  := am.NewCommon(, .Name, states.FlightsSchema,
		ss.Names(), nil, nil, nil)
	if  != nil {
		panic()
	}
	exportTelemetry(, )
	.Add1(ss.Ready, nil)

	return , 
}

func replicant( context.Context,  *Node) (*arpc.Client, error) {
	// RPC client
	,  := arpc.NewClient(, .ParentAddr, .Name, states.FlightsSchema, states.FlightsStates.Names(), nil)
	if  != nil {
		panic()
	}
	exportTelemetry(, .Mach)
	fmt.Println("Connecting to " + .ParentAddr)
	.Start()
	 = amhelp.WaitForAll(, 5*time.Second,
		.Mach.When1(ssrpc.ClientStates.Ready, ))
	if  != nil {
		panic()
	}
	exportTelemetry(, .Worker)

	return , 
}

func exportTelemetry( *Node,  am.Api) {
	amhelp.MachDebugEnv()

	if .Loki != nil {
		amtele.BindLokiLogger(, .Loki)
	}

	// prom metrics
	if .Prom != nil {
		 := amprom.BindMach()
		amprom.BindToPusher(, .Prom)
		.Metrics = append(.Metrics, )
	}
}

func initTelemetry( *Node) {
	var  error

	// loki logging
	if .LokiAddr != "" {
		 := map[string]string{
			"service_name": amtele.NormalizeId(.Service),
		}
		.Loki,  = promtail.NewJSONv1Client(.LokiAddr, )
		if  != nil {
			panic()
		}
	}

	// prometheus metrics
	if .PushGatewayUrl != "" {
		.Prom = push.New(.PushGatewayUrl, amtele.NormalizeId(.Service))
	} else {
		// envconfig magic...
		.Prom = nil
	}
}

func exportState( context.Context,  *Node,  am.Api) {
	// RPC repeater via mux
	,  := arpc.NewMux(, .Name, nil, &arpc.MuxOpts{Parent: })
	if  != nil {
		panic()
	}
	.NewServerFn = func( int,  net.Conn) (*arpc.Server, error) {
		 := fmt.Sprintf("%s-%d", .Name, )
		,  := arpc.NewServer(, .Addr, , , &arpc.ServerOpts{Parent: .Mach})
		if  != nil {
			return nil, 
		}
		exportTelemetry(, .Mach)

		return , nil
	}
	.Addr = .Addr
	exportTelemetry(, .Mach)
	fmt.Println("Starting on " + .Addr)
	.Start()
}

// blockEcho blocks and:
// - creates a random mutation (for the root state machine only)
// - prints the current machine time
// - pushes to prometheus
func blockEcho( context.Context,  *Node,  am.Api) {
	var  time.Time
	 := time.NewTicker(mutationFreq)
	 := 0

	for .Err() == nil {
		++

		select {
		case <-.Done():

		case <-.C:
			// mutate only the root source machine
			if .Id() == "root" {
				randMut()
			}

			// push prom for all machines
			if time.Since() < promPushFreq {
				continue
			}
			fmt.Printf("Time: %d\n", .Time(nil).Sum(nil))
			 = time.Now()
			if .Prom == nil {
				continue
			}
			for ,  := range .Metrics {
				.Sync()
			}
			 := .Prom.Push()
			if  != nil {
				panic()
			}
		}
	}
}

// randMut causes a random Add mutation of 2 states.
func randMut( am.Api) {
	 := len(ss.Names())

	 := rand.Intn()
	 := ss.Names()[]
	 = rand.Intn()
	 := ss.Names()[]
	 = rand.Intn()
	 := ss.Names()[]

	 := am.S{am.StateException, ssrpc.WorkerStates.SendPayload}
	for ,  := range  {
		if  ==  ||  ==  ||  ==  {
			return
		}
	}

	.Add(am.S{, , }, nil)
	if .IsErr() {
		fmt.Printf("Error: %s", .Err())
		.Remove1(am.StateException, nil)
	}
}

func httpServer( context.Context,  *Node,  am.Api) {
	if .HttpAddr == "" {
		return
	}

	 := &http.Server{
		Addr:    .HttpAddr,
		Handler: http.DefaultServeMux,
	}

	http.HandleFunc("/", func( http.ResponseWriter,  *http.Request) {
		fmt.Fprintln(, .ActiveStates(nil))
	})

	go func() {
		fmt.Println("Starting http on " + .HttpAddr)
		if  := .ListenAndServe();  != nil && !errors.Is(, http.ErrServerClosed) {
			panic()
		}
	}()

	<-.Done()
	_ = .Shutdown(context.Background())
}