package graph
import (
"fmt"
"slices"
"strings"
"github.com/dominikbraun/graph"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
"github.com/pancsta/asyncmachine-go/pkg/telemetry"
ss "github.com/pancsta/asyncmachine-go/tools/debugger/states"
ssdbg "github.com/pancsta/asyncmachine-go/tools/debugger/states"
)
type Vertex struct {
StateName string
MachId string
}
type Edge = graph .Edge [*Vertex ]
type EdgeData struct {
MachHas *MachineHas
MachConnectedTo bool
MachChildOf bool
MachPipesTo []*MachPipeTo
StateRelation []*StateRelation
}
type MachineHas struct {
Inherited string
Auto bool
Multi bool
}
type StateRelation struct {
RelType am .Relation
}
type MachPipeTo struct {
FromState string
ToState string
MutType am .MutationType
}
type Connection struct {
Edge *EdgeData
Source *Vertex
Target *Vertex
}
func hash(c *Vertex ) string {
if c .StateName != "" {
return c .MachId + ":" + c .StateName
}
return c .MachId
}
type Exportable struct {
MsgTxs []*telemetry .DbgMsgTx
}
type Client struct {
Id string
MsgSchema *telemetry .DbgMsgStruct
LatestMsgTx *telemetry .DbgMsgTx
LatestTimeSum uint64
LatestClock am .Time
ConnId string
}
type Graph struct {
Server *am .Machine
Clients map [string ]*Client
G graph .Graph [string , *Vertex ]
Map graph .Graph [string , *Vertex ]
}
func New (server *am .Machine ) (*Graph , error ) {
if !server .Has (ssdbg .ServerStates .Names ()) {
return nil , fmt .Errorf (
"Graph.New: server machine %s does not implement ssdbg.ServerStates" ,
server .Id ())
}
g := &Graph {
Server : server ,
G : graph .New (hash , graph .Directed ()),
Map : graph .New (hash ),
Clients : make (map [string ]*Client ),
}
return g , nil
}
func (g *Graph ) Clone () (*Graph , error ) {
c1 , err := g .G .Clone ()
if err != nil {
return nil , err
}
c2 , err := g .Map .Clone ()
if err != nil {
return nil , err
}
g2 := &Graph {
G : c1 ,
Map : c2 ,
Clients : make (map [string ]*Client , len (g .Clients )),
}
for id , c := range g .Clients {
g2 .Clients [id ] = &Client {
Id : id ,
MsgSchema : c .MsgSchema ,
LatestClock : c .LatestClock ,
LatestTimeSum : c .LatestTimeSum ,
}
}
return g2 , nil
}
func (g *Graph ) Clear () {
g .Clients = make (map [string ]*Client )
g .G = graph .New (hash , graph .Directed ())
g .Map = graph .New (hash )
}
func (g *Graph ) Connection (source , target string ) (*Connection , error ) {
edge , err := g .G .Edge (source , target )
if err != nil {
return nil , err
}
data := edge .Properties .Data .(*EdgeData )
targetVert , err := g .G .Vertex (target )
if err != nil {
return nil , err
}
sourceVert , err := g .G .Vertex (source )
if err != nil {
return nil , err
}
return &Connection {
Edge : data ,
Source : sourceVert ,
Target : targetVert ,
}, nil
}
func (g *Graph ) ParseMsg (id string , msgTx *telemetry .DbgMsgTx ) {
c := g .Clients [id ]
var sum uint64
for _ , v := range msgTx .Clocks {
sum += v
}
index := c .MsgSchema .StatesIndex
if len (msgTx .CalledStates ) > 0 {
msgTx .CalledStatesIdxs = amhelp .StatesToIndexes (index ,
msgTx .CalledStates )
msgTx .MachineID = ""
msgTx .CalledStates = nil
}
if c .LatestMsgTx != nil {
prevTx := c .LatestMsgTx
fakeTx := &am .Transition {
TimeBefore : prevTx .Clocks ,
TimeAfter : msgTx .Clocks ,
}
added , _ , _ := amhelp .GetTransitionStates (fakeTx , index )
isRpcServer := slices .Contains (c .MsgSchema .Tags , "rpc-server" )
if slices .Contains (added , ssrpc .ServerStates .HandshakeDone ) && isRpcServer {
for _ , item := range msgTx .LogEntries {
if !strings .HasPrefix (item .Text , "[add] " ) {
continue
}
line := strings .Split (strings .TrimRight (item .Text , ")\n" ), "(" )
for _ , arg := range strings .Split (line [1 ], " " ) {
a := strings .Split (arg , "=" )
if a [0 ] == "id" {
data := graph .EdgeData (&EdgeData {MachConnectedTo : true })
err := g .G .AddEdge (a [1 ], c .Id , data )
if err != nil {
when := g .Server .WhenArgs (ss .InitClient , am .A {"id" : a [1 ]}, nil )
go func () {
<-when
if err := g .G .AddEdge (a [1 ], c .Id , data ); err != nil {
g .Server .AddErr (fmt .Errorf ("Graph.ParseMsg: %w" , err ), nil )
return
}
if err = g .Map .AddEdge (a [1 ], c .Id ); err != nil {
g .Server .AddErr (fmt .Errorf ("Graph.ParseMsg: %w" , err ), nil )
return
}
}()
} else {
if err = g .Map .AddEdge (a [1 ], c .Id ); err != nil {
g .Server .AddErr (fmt .Errorf ("Graph.ParseMsg: %w" , err ), nil )
return
}
}
}
}
}
}
}
_ = g .parseMsgLog (c , msgTx )
c .LatestMsgTx = msgTx
c .LatestClock = msgTx .Clocks
c .LatestTimeSum = sum
}
func (g *Graph ) RemoveClient (id string ) error {
return nil
}
func (g *Graph ) AddClient (msg *telemetry .DbgMsgStruct ) error {
id := msg .ID
c := &Client {
Id : id ,
MsgSchema : msg ,
LatestClock : make (am .Time , len (msg .States )),
}
g .Clients [id ] = c
err := g .G .AddVertex (&Vertex {
MachId : c .Id ,
})
if err != nil {
return err
}
_ = g .Map .AddVertex (&Vertex {
MachId : c .Id ,
})
if c .MsgSchema .Parent != "" {
data := graph .EdgeData (&EdgeData {MachChildOf : true })
err = g .G .AddEdge (c .Id , c .MsgSchema .Parent , data )
if err != nil {
when := g .Server .WhenArgs (ss .InitClient ,
am .A {"id" : c .MsgSchema .Parent }, nil )
go func () {
<-when
err = g .G .AddEdge (c .Id , c .MsgSchema .Parent , data )
if err == nil {
_ = g .Map .AddEdge (c .Id , c .MsgSchema .Parent )
}
}()
} else {
_ = g .Map .AddEdge (c .Id , c .MsgSchema .Parent )
}
}
for name , props := range c .MsgSchema .States {
err = g .G .AddVertex (&Vertex {
MachId : id ,
StateName : name ,
})
if err != nil {
return err
}
_ = g .Map .AddVertex (&Vertex {
MachId : id ,
StateName : name ,
})
err = g .G .AddEdge (id , id +":" +name , graph .EdgeData (&EdgeData {
MachHas : &MachineHas {
Auto : props .Auto ,
Multi : props .Multi ,
Inherited : "" ,
},
}))
if err != nil {
return err
}
_ = g .Map .AddEdge (id , id +":" +name )
}
type relation struct {
states am .S
relType am .Relation
}
for name , state := range c .MsgSchema .States {
toAdd := []relation {
{states : state .Require , relType : am .RelationRequire },
{states : state .Add , relType : am .RelationAdd },
{states : state .Remove , relType : am .RelationRemove },
}
for _ , item := range toAdd {
for _ , relState := range item .states {
from := id + ":" + name
to := id + ":" + relState
if edge , err := g .G .Edge (from , to ); err == nil {
data := edge .Properties .Data .(*EdgeData )
data .StateRelation = append (data .StateRelation , &StateRelation {
RelType : item .relType ,
})
err = g .G .UpdateEdge (from , to , graph .EdgeData (data ))
if err != nil {
panic (err )
}
continue
}
err = g .G .AddEdge (from , to , graph .EdgeData (&EdgeData {
StateRelation : []*StateRelation {
{RelType : item .relType },
},
}))
if err != nil {
panic (err )
}
_ = g .Map .AddEdge (from , to )
}
}
}
return nil
}
func (g *Graph ) parseMsgLog (c *Client , msgTx *telemetry .DbgMsgTx ) error {
for _ , entry := range msgTx .PreLogEntries {
err := g .parseMsgReader (c , entry , msgTx )
if err != nil {
return err
}
}
for _ , entry := range msgTx .LogEntries {
err := g .parseMsgReader (c , entry , msgTx )
if err != nil {
return err
}
}
return nil
}
func (g *Graph ) parseMsgReader (
c *Client , log *am .LogEntry , tx *telemetry .DbgMsgTx ,
) error {
if strings .HasPrefix (log .Text , "[pipe-in:add] " ) ||
strings .HasPrefix (log .Text , "[pipe-in:remove] " ) ||
strings .HasPrefix (log .Text , "[pipe-out:add] " ) ||
strings .HasPrefix (log .Text , "[pipe-out:remove] " ) {
isAdd := strings .HasPrefix (log .Text , "[pipe-in:add] " ) ||
strings .HasPrefix (log .Text , "[pipe-out:add] " )
isPipeOut := strings .HasPrefix (log .Text , "[pipe-out" )
var msg []string
if isPipeOut && isAdd {
msg = strings .Split (log .Text [len ("[pipe-out:add] " ):], " to " )
} else if !isPipeOut && isAdd {
msg = strings .Split (log .Text [len ("[pipe-in:add] " ):], " from " )
} else if isPipeOut && !isAdd {
msg = strings .Split (log .Text [len ("[pipe-out:remove] " ):], " to " )
} else if !isPipeOut && !isAdd {
msg = strings .Split (log .Text [len ("[pipe-in:remove] " ):], " from " )
}
mut := am .MutationRemove
if isAdd {
mut = am .MutationAdd
}
state := msg [0 ]
var sourceMachId string
var targetMachId string
if isPipeOut {
sourceMachId = c .Id
targetMachId = msg [1 ]
} else {
sourceMachId = msg [1 ]
targetMachId = c .Id
}
link , linkErr := g .G .Edge (sourceMachId , targetMachId )
if linkErr == nil {
data := link .Properties .Data .(*EdgeData )
found := false
for _ , pipe := range data .MachPipesTo {
if !isPipeOut && pipe .MutType == mut && pipe .ToState == "" {
pipe .ToState = state
found = true
break
}
if isPipeOut && pipe .MutType == mut && pipe .FromState == "" {
pipe .FromState = state
found = true
break
}
}
if !found {
var pipe *MachPipeTo
if isPipeOut {
pipe = &MachPipeTo {
FromState : state ,
MutType : mut ,
}
} else {
pipe = &MachPipeTo {
ToState : state ,
MutType : mut ,
}
}
data .MachPipesTo = append (data .MachPipesTo , pipe )
}
} else {
pipe := &MachPipeTo {
ToState : state ,
MutType : mut ,
}
if isPipeOut {
pipe = &MachPipeTo {
FromState : state ,
}
}
data := &EdgeData {MachPipesTo : []*MachPipeTo {pipe }}
err := g .G .AddEdge (sourceMachId , targetMachId , graph .EdgeData (data ))
if err != nil {
return err
}
_ = g .Map .AddEdge (sourceMachId , targetMachId )
}
} else if strings .HasPrefix (log .Text , "[pipe:gc] " ) {
l := strings .Split (log .Text , " " )
id := l [1 ]
adjs , err := g .G .AdjacencyMap ()
if err != nil {
panic (err )
}
for _ , edge := range adjs [id ] {
err := g .G .RemoveEdge (id , edge .Target )
if err != nil {
panic (err )
}
_ = g .Map .RemoveEdge (id , edge .Target )
}
preds , err := g .G .PredecessorMap ()
if err != nil {
panic (err )
}
for _ , edge := range preds [id ] {
err := g .G .RemoveEdge (edge .Source , id )
if err != nil {
panic (err )
}
_ = g .Map .RemoveEdge (edge .Source , id )
}
}
return nil
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .