// Package graph provides a graph of interconnected state-machines and their
// states, based on the dbg telemetry protocol.
package graph

// TODO fix GC

// NOTE(review): in this copy of the file many identifiers (function and
// method names, receivers, parameters, locals) and the import paths are
// absent. The comments below describe only what the remaining tokens show;
// anything inferred is marked as such. Restore the identifiers before
// building.

import (
	amhelp
	am
	ssrpc
	ss
	ssdbg
)

// Vertex is a node of the graph. Per hash, a vertex with an empty StateName
// is keyed by MachId alone, otherwise by "MachId:StateName".
type Vertex struct {
	StateName string
	MachId string
}

// Edge is an alias for a graph edge between *Vertex values.
type Edge = graph.Edge[*Vertex]

// EdgeData is the payload attached to edges of Graph.G (passed through
// graph.EdgeData(...) when edges are added or updated). One edge can carry
// several kinds of information at the same time.
type EdgeData struct {
	// machine has a state
	MachHas *MachineHas
	// machine has an RPC connection to another machine
	MachConnectedTo bool
	// machine is a child of another machine
	MachChildOf bool
	// machine has pipes going to another machine
	MachPipesTo []*MachPipeTo
	// state has relations with other states
	StateRelation []*StateRelation
}

// Client:
// - has State inherited:string auto:bool multi:bool
// - connectedTo Client addr:string
// - pipeTo Client states:map[string]string
// - childOf Client
//
// State:
// - relation type:require|add|remove State
// - pipeTo Client|state add:bool

// MachineHas describes a machine-to-state edge. In this file Auto and Multi
// are copied from the state's schema, and Inherited is always set to "".
type MachineHas struct {
	Inherited string
	Auto bool
	Multi bool
}

// StateRelation describes one relation between two states. The relation
// types added in this file are am.RelationRequire, am.RelationAdd and
// am.RelationRemove.
type StateRelation struct {
	RelType am.Relation
}

// MachPipeTo describes a single pipe between two machines. FromState and
// ToState are filled in separately while parsing pipe log lines, so one of
// them may be empty until the log line from the other side has been seen.
type MachPipeTo struct {
	FromState string
	ToState string
	MutType am.MutationType
}

// Connection bundles the data of an edge with its source and target vertices.
type Connection struct {
	Edge *EdgeData
	Source *Vertex
	Target *Vertex
}

// hash returns the graph key of a vertex: "MachId:StateName" when StateName
// is set, otherwise just MachId.
func hash( *Vertex) string {
	if .StateName != "" {
		return .MachId + ":" + .StateName
	}
	return .MachId
}

// Exportable wraps a list of transaction messages.
// NOTE(review): not referenced anywhere else in this view of the file.
type Exportable struct {
	MsgTxs []*telemetry.DbgMsgTx
}

// Client represents a single state machine within the network graph.
type Client struct {
	Id string
	// TODO version schemas
	MsgSchema *telemetry.DbgMsgStruct
	// LatestMsgTx, LatestTimeSum and LatestClock are overwritten at the end of
	// every parsed transaction message.
	LatestMsgTx *telemetry.DbgMsgTx
	LatestTimeSum uint64
	LatestClock am.Time
	ConnId string
}

// ///// ///// /////

// ///// GRAPH

// ///// ///// /////

// Graph holds the known clients and two graphs keyed by hash: G with edge
// metadata and Map without.
type Graph struct {
	Server *am.Machine
	Clients map[string]*Client

	// TODO export G and Map

	// G is a directed graph of machines and states with metadata.
	G graph.Graph[string, *Vertex]
	// Map is a unidirectional mirror of g, without metadata.
	// NOTE(review): Map is created with graph.New(hash), without
	// graph.Directed() - "unidirectional" presumably means "undirected".
	Map graph.Graph[string, *Vertex]
}

// Constructor (name not visible in this copy; the error string suggests
// Graph.New). It takes the server machine, rejects it when it does not have
// all of ssdbg.ServerStates.Names(), and otherwise returns a Graph with an
// empty client map, a directed G and a default Map.
func ( *am.Machine) (*Graph, error) {
	if !.Has(ssdbg.ServerStates.Names()) {
		return nil, fmt.Errorf(
			"Graph.New: server machine %s does not implement ssdbg.ServerStates",
			.Id())
	}

	:= &Graph{
		Server: ,
		G: graph.New(hash, graph.Directed()),
		Map: graph.New(hash),
		Clients: make(map[string]*Client),
	}

	// err := m.BindHandlers(g)
	// if err != nil {
	// 	return nil, err
	// }

	return , nil
}

// Clone returns a deep clone of the graph.
//
// Both G and Map are cloned via their Clone methods and the first error is
// returned. Clients are re-created with Id, MsgSchema, LatestClock and
// LatestTimeSum only.
//
// NOTE(review): Server is not set on the clone, LatestMsgTx and ConnId are
// not copied, and MsgSchema / LatestClock are assigned without copying, so
// they stay shared with the original.
func ( *Graph) () (*Graph, error) {
	, := .G.Clone()
	if != nil {
		return nil,
	}

	, := .Map.Clone()
	if != nil {
		return nil,
	}

	:= &Graph{
		G: ,
		Map: ,
		Clients: make(map[string]*Client, len(.Clients)),
	}

	for , := range .Clients {
		.Clients[] = &Client{
			Id: ,
			MsgSchema: .MsgSchema,
			LatestClock: .LatestClock,
			LatestTimeSum: .LatestTimeSum,
		}
	}

	return , nil
}

// Reset method (name not visible in this copy): replaces Clients with an
// empty map and G / Map with fresh, empty graphs. Server is left untouched.
func ( *Graph) () {
	.Clients = make(map[string]*Client)
	.G = graph.New(hash, graph.Directed())
	.Map = graph.New(hash)
}

// Connection returns a Connection for the given source-target.
//
// The edge and both vertices are looked up in G; the first lookup error is
// returned as-is.
func ( *Graph) (, string) (*Connection, error) {
	, := .G.Edge(, )
	if != nil {
		return nil,
	}
	// unchecked type assertion: panics if the edge data is not *EdgeData
	:= .Properties.Data.(*EdgeData)

	, := .G.Vertex()
	if != nil {
		return nil,
	}

	, := .G.Vertex()
	if != nil {
		return nil,
	}

	return &Connection{
		Edge: ,
		Source: ,
		Target: ,
	}, nil
}

// Transaction-message parser (name not visible in this copy; the error
// strings suggest Graph.ParseMsg). It takes a string key into Clients and a
// transaction message, and:
//   - sums the message's Clocks,
//   - replaces CalledStates with indexes to save space,
//   - detects RPC connections from "[add] " log entries and adds a
//     MachConnectedTo edge,
//   - parses pipe-related log entries via parseMsgLog (error ignored),
//   - stores the message, its clocks and the time sum on the client.
//
// NOTE(review): the result of the Clients lookup is used without a check, so
// an unknown key presumably ends in a nil dereference.
func ( *Graph) ( string, *telemetry.DbgMsgTx) {
	:= .Clients[]

	var uint64
	for , := range .Clocks {
		+=
	}
	:= .MsgSchema.StatesIndex

	// optimize space
	if len(.CalledStates) > 0 {
		.CalledStatesIdxs = amhelp.StatesToIndexes(, .CalledStates)
		.MachineID = ""
		.CalledStates = nil
	}

	// detect RPC connections - read arg "id" for HandshakeDone, being the ID of
	// the RPC client
	// TODO extract to a func
	if .LatestMsgTx != nil {
		:= .LatestMsgTx
		:= &am.Transition{
			TimeBefore: .Clocks,
			TimeAfter: .Clocks,
		}
		, , := amhelp.GetTransitionStates(, )

		// RPC conns (requires LogLevel2)
		:= slices.Contains(.MsgSchema.Tags, "rpc-server")
		if slices.Contains(, ssrpc.ServerStates.HandshakeDone) && {
			for , := range .LogEntries {
				if !strings.HasPrefix(.Text, "[add] ") {
					continue
				}

				// the entry is split on "(" and the 2nd part is read as
				// space-separated key=value pairs
				// NOTE(review): both [1] indexes below are unguarded and panic
				// when the text has no "(" or a pair has no "="
				:= strings.Split(strings.TrimRight(.Text, ")\n"), "(")
				for , := range strings.Split([1], " ") {
					:= strings.Split(, "=")
					if [0] == "id" {

						:= graph.EdgeData(&EdgeData{MachConnectedTo: true})
						:= .G.AddEdge([1], .Id, )
						if != nil {

							// wait for the other mach to show up TODO better approach
							// NOTE(review): the goroutine blocks on this channel
							// with no visible cancellation path
							:= .Server.WhenArgs(ss.InitClient, am.A{"id": [1]}, nil)
							go func() {
								<-
								if := .G.AddEdge([1], .Id, ); != nil {
									.Server.AddErr(fmt.Errorf("Graph.ParseMsg: %w", ), nil)
									return
								}
								if = .Map.AddEdge([1], .Id); != nil {
									.Server.AddErr(fmt.Errorf("Graph.ParseMsg: %w", ), nil)
									return
								}
							}()
						} else {
							if = .Map.AddEdge([1], .Id); != nil {
								.Server.AddErr(fmt.Errorf("Graph.ParseMsg: %w", ), nil)
								// returns from the whole function, skipping the
								// log parsing and the Latest* updates below
								return
							}
						}
					}
				}
			}
		}
	}

	// TODO errors
	// var isErr bool
	// for _, name := range index {
	// 	if strings.HasPrefix(name, "Err") && msgTx.Is1(index, name) {
	// 		isErr = true
	// 		break
	// 	}
	// }
	// if isErr || msgTx.Is1(index, am.StateException) {
	// 	// prepend to errors TODO DB errors
	// 	// idx := SQL COUNT
	// 	c.errors = append([]int{idx}, c.errors...)
	// }

	// the parsing error is deliberately dropped for now (see the TODO below)
	_ = .parseMsgLog(, )
	// TODO dedicated error state, enable once stable
	// if err != nil {
	// 	g.Mach.AddErr(fmt.Errorf("Graph.parseMsgLog: %w", err), nil)
	// }

	.LatestMsgTx =
	// TODO assert clocks
	.LatestClock = .Clocks
	.LatestTimeSum =
}

// Stub (name not visible in this copy; presumably client removal): takes a
// string and always returns nil.
func ( *Graph) ( string) error {
	// TODO
	return nil
}

// Client registration (name not visible in this copy). From a schema message
// it:
//   - creates a Client and stores it in Clients,
//   - adds the machine vertex,
//   - adds a MachChildOf edge to the parent, if the schema names one,
//   - adds a vertex and a MachHas edge for every state,
//   - adds or extends edges for the Require / Add / Remove relations.
//
// Errors from G are returned for vertices and machine-to-state edges, while
// errors from the Map mirror are discarded.
//
// NOTE(review): relation edges panic on error instead of returning it.
func ( *Graph) ( *telemetry.DbgMsgStruct) error {
	// init
	:= .ID
	:= &Client{
		Id: ,
		MsgSchema: ,
		LatestClock: make(am.Time, len(.States)),
	}
	.Clients[] =

	// add machine
	:= .G.AddVertex(&Vertex{
		MachId: .Id,
	})
	if != nil {
		return
	}
	_ = .Map.AddVertex(&Vertex{
		MachId: .Id,
	})

	// parent
	if .MsgSchema.Parent != "" {
		:= graph.EdgeData(&EdgeData{MachChildOf: true})
		= .G.AddEdge(.Id, .MsgSchema.Parent, )
		if != nil {

			// wait for the parent to show up
			// NOTE(review): the goroutine blocks with no visible cancellation
			// path and assigns with "=" to a variable captured from the
			// enclosing function; if that is the shared error variable, it
			// races with the loops below
			:= .Server.WhenArgs(ss.InitClient, am.A{"id": .MsgSchema.Parent}, nil)
			go func() {
				<-
				= .G.AddEdge(.Id, .MsgSchema.Parent, )
				if == nil {
					_ = .Map.AddEdge(.Id, .MsgSchema.Parent)
				}
			}()
		} else {
			_ = .Map.AddEdge(.Id, .MsgSchema.Parent)
		}
	}

	// add states
	for , := range .MsgSchema.States {
		// vertex
		= .G.AddVertex(&Vertex{
			MachId: ,
			StateName: ,
		})
		if != nil {
			return
		}
		_ = .Map.AddVertex(&Vertex{
			MachId: ,
			StateName: ,
		})

		// edge
		// the target key has the same "a:b" form that hash produces for
		// state vertices
		= .G.AddEdge(, +":"+, graph.EdgeData(&EdgeData{
			MachHas: &MachineHas{
				Auto: .Auto,
				Multi: .Multi,
				// TODO
				Inherited: "",
			},
		}))
		if != nil {
			return
		}
		_ = .Map.AddEdge(, +":"+)
	}

	// local helper type pairing a list of states (am.S) with a relation type
	type struct {
		am.S
		am.Relation
	}

	// add relations
	for , := range .MsgSchema.States {

		// define
		:= []{
			{: .Require, : am.RelationRequire},
			{: .Add, : am.RelationAdd},
			{: .Remove, : am.RelationRemove},
		}

		// per relation
		for , := range {

			// per state
			for , := range . {
				:= + ":" +
				:= + ":" +

				// update an existing edge
				if , := .G.Edge(, ); == nil {
					:= .Properties.Data.(*EdgeData)
					.StateRelation = append(.StateRelation, &StateRelation{
						RelType: .,
					})
					= .G.UpdateEdge(, , graph.EdgeData())
					if != nil {
						panic()
					}

					continue
				}

				// add if doesn't exist
				= .G.AddEdge(, , graph.EdgeData(&EdgeData{
					StateRelation: []*StateRelation{
						{RelType: .},
					},
				}))
				if != nil {
					panic()
				}
				_ = .Map.AddEdge(, )
			}
		}
	}

	return nil
}

// private

// parseMsgLog (name taken from its call site above) passes every pre-tx and
// tx log entry of the message to parseMsgReader, stopping at the first error.
func ( *Graph) ( *Client, *telemetry.DbgMsgTx) error {
	// pre-tx log entries
	for , := range .PreLogEntries {
		:= .parseMsgReader(, , )
		if != nil {
			return
		}
	}

	// tx log entries
	for , := range .LogEntries {
		:= .parseMsgReader(, , )
		if != nil {
			return
		}
	}

	return nil
}

// parseMsgReader (name taken from its call sites above) handles a single log
// entry:
//   - "[pipe-in:add] ", "[pipe-in:remove] ", "[pipe-out:add] " and
//     "[pipe-out:remove] " entries create or update a MachPipesTo edge,
//   - "[pipe:gc] " entries remove all outbound and inbound edges of the
//     vertex named in the entry.
//
// Other entries are ignored. Only a failed AddEdge on G is returned as an
// error; the "[pipe:gc] " branch panics on graph errors.
func ( *Graph) (
	*Client, *am.LogEntry, *telemetry.DbgMsgTx,
) error {
	// NEW PIPE

	if strings.HasPrefix(.Text, "[pipe-in:add] ") ||
		strings.HasPrefix(.Text, "[pipe-in:remove] ") ||
		strings.HasPrefix(.Text, "[pipe-out:add] ") ||
		strings.HasPrefix(.Text, "[pipe-out:remove] ") {

		// two flags: "is an add" and "is a pipe-out"
		:= strings.HasPrefix(.Text, "[pipe-in:add] ") ||
			strings.HasPrefix(.Text, "[pipe-out:add] ")
		:= strings.HasPrefix(.Text, "[pipe-out")

		// strip the prefix, then split on " to " (pipe-out) or " from "
		// (pipe-in)
		var []string
		if && {
			= strings.Split(.Text[len("[pipe-out:add] "):], " to ")
		} else if ! && {
			= strings.Split(.Text[len("[pipe-in:add] "):], " from ")
		} else if && ! {
			= strings.Split(.Text[len("[pipe-out:remove] "):], " to ")
		} else if ! && ! {
			= strings.Split(.Text[len("[pipe-in:remove] "):], " from ")
		}

		:= am.MutationRemove
		if {
			= am.MutationAdd
		}

		// define what we know from this log line
		// NOTE(review): [1] is read below without a length check, so it
		// panics when the separator is missing from the entry
		:= [0]
		var string
		var string
		if {
			= .Id
			= [1]
		} else {
			= [1]
			= .Id
		}
		, := .G.Edge(, )

		// edge exists - update
		if == nil {
			// the edge data is a pointer, so it is modified in place
			:= .Properties.Data.(*EdgeData)
			:= false

			// update the missing state from the other side of the pipe
			for , := range .MachPipesTo {
				if ! && .MutType == && .ToState == "" {
					.ToState =
					= true
					break
				}
				if && .MutType == && .FromState == "" {
					.FromState =
					= true
					break
				}
			}

			// add a new pipe to an existing edge TODO extract
			if ! {
				var *MachPipeTo
				if {
					= &MachPipeTo{
						FromState: ,
						MutType: ,
					}
				} else {
					= &MachPipeTo{
						ToState: ,
						MutType: ,
					}
				}

				.MachPipesTo = append(.MachPipesTo, )
			}

		} else {
			// create a new edge with a single pipe
			:= &MachPipeTo{
				ToState: ,
				MutType: ,
			}
			if {
				// NOTE(review): unlike the other MachPipeTo literals, this one
				// does not set MutType, so it keeps the zero value - confirm
				// whether that is intended
				= &MachPipeTo{
					FromState: ,
				}
			}
			:= &EdgeData{MachPipesTo: []*MachPipeTo{}}
			:= .G.AddEdge(, , graph.EdgeData())
			if != nil {
				return
			}
			_ = .Map.AddEdge(, )
		}

		// REMOVE PIPE

	} else if strings.HasPrefix(.Text, "[pipe:gc] ") {
		// the 2nd space-separated field is used as the vertex key; the prefix
		// check above guarantees that at least two fields exist
		:= strings.Split(.Text, " ")
		:= [1]

		// TODO make it safe

		// outbound
		, := .G.AdjacencyMap()
		if != nil {
			panic()
		}
		for , := range [] {
			:= .G.RemoveEdge(, .Target)
			if != nil {
				panic()
			}
			_ = .Map.RemoveEdge(, .Target)
		}

		// inbound
		, := .G.PredecessorMap()
		if != nil {
			panic()
		}
		for , := range [] {
			:= .G.RemoveEdge(.Source, )
			if != nil {
				panic()
			}
			_ = .Map.RemoveEdge(.Source, )
		}
	}

	// TODO detached pipe handlers

	return nil
}