package server
import (
"encoding/gob"
"fmt"
"io"
"log"
"net"
"net/http"
"net/rpc"
"os"
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/coder/websocket"
"github.com/pancsta/asyncmachine-go/internal/utils"
"github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/telemetry/dbg"
ss "github.com/pancsta/asyncmachine-go/tools/debugger/states"
"github.com/pancsta/asyncmachine-go/tools/debugger/types"
"github.com/soheilhy/cmux"
)
type Exportable struct {
MsgStruct *dbg .DbgMsgStruct
MsgTxs []*dbg .DbgMsgTx
}
type Client struct {
*Exportable
ReaderCollapsed bool
Id string
MTimeSum uint64
SelectedGroup string
Connected atomic .Bool
ConnId string
SchemaHash string
MsgTxsFiltered []int
LogMsgs [][]*am .LogEntry
Errors []int
MsgTxsParsed []*types .MsgTxParsed
MsgSchemaParsed *types .MsgSchemaParsed
txCache map [string ]int
txCacheMx sync .Mutex
}
func (c *Client ) ClearCache () {
c .txCache = nil
}
func (c *Client ) LastActive () time .Time {
if len (c .MsgTxs ) == 0 {
return time .Time {}
}
return *c .MsgTxs [len (c .MsgTxs )-1 ].Time
}
func (c *Client ) HadErrSinceTx (tx , distance int ) bool {
if slices .Contains (c .Errors , tx ) {
return true
}
index := sort .Search (len (c .Errors ), func (i int ) bool {
return c .Errors [i ] < tx
})
if index >= len (c .Errors ) {
return false
}
return tx -c .Errors [index ] < distance
}
func (c *Client ) LastTxTill (hTime time .Time ) int {
l := len (c .MsgTxs )
if l == 0 {
return -1
}
i := sort .Search (l , func (i int ) bool {
t := c .MsgTxs [i ].Time
return t .After (hTime ) || t .Equal (hTime )
})
if i == l {
i = l - 1
}
tx := c .MsgTxs [i ]
if i > 0 && tx .Time .Sub (hTime ) > hTime .Sub (*c .MsgTxs [i -1 ].Time ) {
return i
} else {
return i
}
}
func (c *Client ) StatesToIndexes (states am .S ) []int {
return helpers .StatesToIndexes (c .MsgStruct .StatesIndex , states )
}
func (c *Client ) IndexesToStates (indexes []int ) am .S {
return helpers .IndexesToStates (c .MsgStruct .StatesIndex , indexes )
}
func (c *Client ) TxIndex (id string ) int {
c .txCacheMx .Lock ()
defer c .txCacheMx .Unlock ()
if c .txCache == nil {
c .txCache = make (map [string ]int )
}
if idx , ok := c .txCache [id ]; ok {
return idx
}
for i , tx := range c .MsgTxs {
if tx .ID == id {
c .txCache [id ] = i
return i
}
}
return -1
}
func (c *Client ) Tx (idx int ) *dbg .DbgMsgTx {
if idx < 0 || idx >= len (c .MsgTxs ) {
return nil
}
return c .MsgTxs [idx ]
}
func (c *Client ) TxParsed (idx int ) *types .MsgTxParsed {
if idx < 0 || idx >= len (c .MsgTxsParsed ) {
return nil
}
return c .MsgTxsParsed [idx ]
}
func (c *Client ) TxByMachTime (sum uint64 ) int {
idx , ok := slices .BinarySearchFunc (c .MsgTxsParsed ,
&types .MsgTxParsed {TimeSum : sum }, func (i , j *types .MsgTxParsed ) int {
if i .TimeSum < j .TimeSum {
return -1
} else if i .TimeSum > j .TimeSum {
return 1
}
return 0
})
if !ok {
return 0
}
return idx
}
func (c *Client ) FilterIndexByCursor1 (cursor1 int ) int {
if cursor1 == 0 {
return 0
}
return slices .Index (c .MsgTxsFiltered , cursor1 -1 )
}
func (c *Client ) ParseSchema () {
schema := c .MsgStruct
sp := &types .MsgSchemaParsed {
GroupsOrder : []string {"all" },
Groups : map [string ]am .S {
"all" : schema .StatesIndex ,
},
}
c .MsgSchemaParsed = sp
if len (schema .GroupsOrder ) == 0 {
return
}
pastSelf := false
prev := am .S {}
for _ , g := range schema .GroupsOrder {
name := strings .TrimSuffix (g , "StatesDef" )
if g == "self" {
name = "- self"
} else if pastSelf {
name = "- " + name
}
sp .GroupsOrder = append (sp .GroupsOrder , name )
sp .Groups [name ] = c .IndexesToStates (schema .Groups [g ])
if pastSelf {
sp .Groups [g ] = slices .Concat (sp .Groups [name ], prev )
prev = sp .Groups [name ]
}
if g == "self" {
pastSelf = true
}
}
}
func StartRpc (
mach *am .Machine , addr string , mux chan <- cmux .CMux , p types .Params ,
) {
var err error
gob .Register (am .Relation (0 ))
gob .Register (Exportable {})
if addr == "" {
addr = dbg .DbgAddr
}
_ , addrPort , _ := net .SplitHostPort (addr )
port , _ := strconv .Atoi (addrPort )
lis , err := net .Listen ("tcp" , addr )
if err != nil {
log .Println (err )
mach .AddErr (err , nil )
fmt .Println (err )
os .Exit (1 )
}
mach .Log ("dbg server started at %s" , addr )
fwdTo := make ([]*rpc .Client , len (p .FwdData ))
for i , a := range p .FwdData {
fwdTo [i ], err = rpc .Dial ("tcp" , a )
if err != nil {
fmt .Printf ("Cant fwd to %s: %s\n" , a , err )
os .Exit (1 )
}
}
mux1 := cmux .New (lis )
tcpL := mux1 .Match (cmux .Any ())
go tcpAccept (tcpL , mach , fwdTo )
if p .UiWeb {
httpAddr := fmt .Sprintf ("%s:%d" ,
addr [:strings .LastIndex (addr , ":" )], port +1 )
httpMux := http .ServeMux {}
httpMux .HandleFunc ("/{$}" , func (w http .ResponseWriter , r *http .Request ) {
httpHandlerIndex (w , r , mach , p )
})
httpMux .Handle ("/" , http .FileServer (http .Dir (p .OutputDir )))
httpMux .HandleFunc ("/diagrams/mach" , func (
w http .ResponseWriter , r *http .Request ,
) {
httpHandlerDiagMach (w , r , mach )
})
httpMux .HandleFunc ("/diagrams/mach.svg" , func (
w http .ResponseWriter , r *http .Request ,
) {
httpHandlerDiagMach (w , r , mach )
})
httpMux .HandleFunc ("/diagrams/mach.ws" ,
func (w http .ResponseWriter , r *http .Request ) {
wsDiagHandler (w , r , mach )
})
httpMux .HandleFunc ("/dbg.ws" , func (w http .ResponseWriter , r *http .Request ) {
wsDbgHandler (w , r , mach , fwdTo )
})
httpSrv := &http .Server {
Handler : &httpMux ,
Addr : httpAddr ,
}
go func () {
mach .AddErr (httpSrv .ListenAndServe (), nil )
}()
}
if mux != nil {
go func () {
time .Sleep (100 * time .Millisecond )
mux <- mux1
}()
}
if err := mux1 .Serve (); err != nil {
fmt .Println (err )
os .Exit (1 )
}
}
type RPCServer struct {
Mach *am .Machine
ConnID string
FwdTo []*rpc .Client
}
type DbgMsgSchemaFwd struct {
MsgStruct *dbg .DbgMsgStruct
ConnId string
}
func (r *RPCServer ) DbgMsgSchemaFwd (
msg *DbgMsgSchemaFwd , _ *string ,
) error {
r .Mach .Add1 (ss .ConnectEvent , am .A {
"msg_struct" : msg .MsgStruct ,
"conn_id" : msg .ConnId ,
"Client.id" : msg .MsgStruct .ID ,
})
return nil
}
func (r *RPCServer ) DbgMsgSchema (
msgSchema *dbg .DbgMsgStruct , _ *string ,
) error {
r .Mach .Add1 (ss .ConnectEvent , am .A {
"msg_struct" : msgSchema ,
"conn_id" : r .ConnID ,
"Client.id" : msgSchema .ID ,
})
for _ , host := range r .FwdTo {
fwdMsg := DbgMsgSchemaFwd {
MsgStruct : msgSchema ,
ConnId : r .ConnID ,
}
err := host .Call ("RPCServer.DbgMsgSchemaFwd" , &fwdMsg , nil )
if err != nil {
return err
}
}
return nil
}
var (
queue []*dbg .DbgMsgTx
queueConnId []string
queueMx sync .Mutex
scheduled bool
)
func (r *RPCServer ) DbgMsgTx (msgTx *dbg .DbgMsgTx , _ *string ) error {
queueMx .Lock ()
defer queueMx .Unlock ()
if !scheduled {
scheduled = true
go func () {
time .Sleep (time .Second )
queueMx .Lock ()
defer queueMx .Unlock ()
r .Mach .Add1 (ss .ClientMsg , am .A {
"msgs_tx" : queue ,
"conn_ids" : queueConnId ,
})
queue = nil
queueConnId = nil
scheduled = false
}()
}
now := time .Now ()
msgTx .Time = &now
queue = append (queue , msgTx )
queueConnId = append (queueConnId , r .ConnID )
for _ , host := range r .FwdTo {
fwdMsg := DbgMsgTx {
MsgTx : msgTx ,
ConnId : r .ConnID ,
}
err := host .Call ("RPCServer.DbgMsgTxFwd" , fwdMsg , nil )
if err != nil {
return err
}
}
return nil
}
type DbgMsgTx struct {
MsgTx *dbg .DbgMsgTx
ConnId string
}
func (r *RPCServer ) DbgMsgTxFwd (msg *DbgMsgTx , _ *string ) error {
queueMx .Lock ()
defer queueMx .Unlock ()
msgTx := msg .MsgTx
connId := msg .ConnId
if !scheduled {
scheduled = true
go func () {
time .Sleep (time .Second )
queueMx .Lock ()
defer queueMx .Unlock ()
r .Mach .Add1 (ss .ClientMsg , am .A {
"msgs_tx" : queue ,
"conn_ids" : queueConnId ,
})
queue = nil
queueConnId = nil
scheduled = false
}()
}
now := time .Now ()
msgTx .Time = &now
queue = append (queue , msgTx )
queueConnId = append (queueConnId , connId )
for _ , host := range r .FwdTo {
fwdMsg := DbgMsgTx {
MsgTx : msgTx ,
ConnId : connId ,
}
err := host .Call ("RPCServer.DbgMsgTxFwd" , &fwdMsg , nil )
if err != nil {
return err
}
}
return nil
}
func tcpAccept(l net .Listener , mach *am .Machine , fwdTo []*rpc .Client ) {
for {
conn , err := l .Accept ()
if err != nil {
mach .AddErr (err , nil )
continue
}
go AcceptConn (conn , mach , fwdTo )
}
}
func AcceptConn (
conn io .ReadWriteCloser , mach *am .Machine , fwdTo []*rpc .Client ,
) {
server := rpc .NewServer ()
connId := utils .RandId (8 )
rcvr := &RPCServer {
Mach : mach ,
ConnID : connId ,
FwdTo : fwdTo ,
}
err := server .Register (rcvr )
if err != nil {
rcvr .Mach .AddErr (err , nil )
fmt .Println (err )
os .Exit (1 )
}
server .ServeConn (conn )
rcvr .Mach .Add1 (ss .DisconnectEvent , am .A {"conn_id" : connId })
}
func httpHandlerIndex(
w http .ResponseWriter , r *http .Request , mach *am .Machine , p types .Params ,
) {
middleware (w )
entries , err := os .ReadDir (p .OutputDir )
if err != nil {
mach .AddErr (err , nil )
http .Error (w , "Failed to read directory" , http .StatusInternalServerError )
return
}
sort .Slice (entries , func (i , j int ) bool {
if entries [i ].IsDir () != entries [j ].IsDir () {
return entries [i ].IsDir ()
}
return entries [i ].Name () < entries [j ].Name ()
})
w .Header ().Set ("Content-Type" , "text/html; charset=utf-8" )
fmt .Fprintf (w , "<html><head><title>am-dbg --dir %s</title></head><body>" ,
p .OutputDir )
fmt .Fprintf (w , "<h1>am-dbg</h1><hr>" )
fmt .Fprintf (w , "<h2>Pages</h2><hr><ul>" )
fmt .Fprintf (w , `<li><a href="/diagrams/mach">/diagrams/mach</a></li>` )
fmt .Fprintf (w , `</ul>` )
fmt .Fprintf (w , "<h2>--dir %s</h2><hr><ul>" , p .OutputDir )
for _ , entry := range entries {
name := entry .Name ()
if entry .IsDir () {
name += "/"
}
fmt .Fprintf (w , `<li><a href="%s">%s</a></li>` , name , name )
}
fmt .Fprintf (w , "</ul><hr></body></html>" )
}
func httpHandlerDiagMach(
w http .ResponseWriter , r *http .Request , mach *am .Machine ,
) {
done := make (chan struct {})
middleware (w )
if r .Method == "OPTIONS" {
w .WriteHeader (http .StatusOK )
return
}
mach .Add1 (ss .WebReq , am .A {
"uri" : r .RequestURI ,
"*http.Request" : r ,
"http.ResponseWriter" : w ,
"doneChan" : done ,
"addr" : r .RemoteAddr ,
})
<-done
}
func wsDiagHandler(w http .ResponseWriter , r *http .Request , mach *am .Machine ) {
middleware (w )
conn , err := websocket .Accept (w , r , &websocket .AcceptOptions {
InsecureSkipVerify : true ,
})
if err != nil {
mach .AddErrState (ss .ErrWeb , err , nil )
return
}
defer conn .Close (websocket .StatusInternalError , "internal error" )
done := make (chan struct {})
mach .Add1 (ss .WebSocketDiag , am .A {
"*websocket.Conn" : conn ,
"*http.Request" : r ,
"http.ResponseWriter" : w ,
"doneChan" : done ,
"addr" : r .RemoteAddr ,
})
<-done
}
func middleware(w http .ResponseWriter ) {
w .Header ().Set ("Access-Control-Allow-Origin" , "localhost" )
w .Header ().Set ("Access-Control-Allow-Methods" ,
"POST, GET, OPTIONS, PUT, DELETE" )
w .Header ().Set ("Access-Control-Allow-Headers" ,
"Accept, Content-Type, Content-Length, Accept-Encoding, " +
"X-CSRF-Token, Authorization" )
w .Header ().Set ("Cache-Control" , "no-cache, no-store, must-revalidate" )
w .Header ().Set ("Pragma" , "no-cache" )
w .Header ().Set ("Expires" , "0" )
}
func wsDbgHandler(
w http .ResponseWriter , r *http .Request , mach *am .Machine , fwdTo []*rpc .Client ,
) {
connWs , err := websocket .Accept (w , r , &websocket .AcceptOptions {
InsecureSkipVerify : true ,
})
if err != nil {
log .Printf ("Upgrade error: %v" , err )
return
}
conn := websocket .NetConn (mach .Context (), connWs , websocket .MessageBinary )
AcceptConn (conn , mach , fwdTo )
}
type GetField int
const (
GetCursorTx GetField = iota + 1
GetCursorStep
GetClientCount
GetMsgCount
GetOpts
GetSelectedState
)
func Decode (s string ) GetField {
return GetField (s [0 ])
}
func (n GetField ) String () string {
switch n {
case GetCursorTx :
return "GetCursorTx"
case GetCursorStep :
return "GetCursorStep"
case GetClientCount :
return "GetClientCount"
case GetMsgCount :
return "GetMsgCount"
case GetOpts :
return "GetOpts"
case GetSelectedState :
return "GetSelectedState"
}
return "!UNKNOWN!"
}
func (n GetField ) Encode () string {
return string (rune (n ))
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .