package server
import (
"encoding/gob"
"fmt"
"log"
"net"
"net/http"
"net/rpc"
"os"
"slices"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/coder/websocket"
"github.com/soheilhy/cmux"
"github.com/pancsta/asyncmachine-go/internal/utils"
"github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/telemetry"
ss "github.com/pancsta/asyncmachine-go/tools/debugger/states"
"github.com/pancsta/asyncmachine-go/tools/debugger/types"
)
// Exportable groups the telemetry data received for one client: its schema
// message and the transaction messages. StartRpc registers the type with gob.
type Exportable struct {
	// MsgStruct is the schema message; its StatesIndex is used to translate
	// between state names and indexes.
	MsgStruct *telemetry.DbgMsgStruct
	// MsgTxs holds the transaction messages, addressed by index throughout
	// the Client methods.
	MsgTxs []*telemetry.DbgMsgTx
}
// Client is the debugger-side record of one connected machine. It embeds the
// Exportable telemetry data and adds derived and UI-related state.
type Client struct {
	*Exportable

	ReaderCollapsed bool
	Id              string
	MTimeSum        uint64
	SelectedGroup   string
	Connected       atomic.Bool
	ConnId          string
	SchemaHash      string
	// MsgTxsFiltered is searched by FilterIndexByCursor1 for the value
	// cursor1-1 (presumably indexes into MsgTxs - TODO confirm).
	MsgTxsFiltered []int
	LogMsgs        [][]*am.LogEntry
	// Errors is binary-searched by HadErrSinceTx with the predicate
	// Errors[i] < tx, which presumably requires descending order - TODO
	// confirm.
	Errors []int
	// MsgTxsParsed is binary-searched by TimeSum in TxByMachTime, so it is
	// expected to be ordered by ascending TimeSum.
	MsgTxsParsed []*types.MsgTxParsed
	// MsgSchemaParsed is (re)built by ParseSchema.
	MsgSchemaParsed *types.MsgSchemaParsed

	// txCache maps a transaction ID to its index in MsgTxs; TxIndex fills it
	// lazily.
	txCache map[string]int
	// txCacheMx is held by TxIndex while it reads and writes txCache.
	txCacheMx sync.Mutex
}
// ClearCache drops the transaction-ID lookup cache used by TxIndex. The next
// TxIndex call re-creates the map and repopulates it lazily.
//
// The cache is reset under txCacheMx, the same mutex TxIndex holds while
// reading and writing the map, so both methods can be called concurrently.
func (c *Client) ClearCache() {
	c.txCacheMx.Lock()
	defer c.txCacheMx.Unlock()

	c.txCache = nil
}
// LastActive returns the timestamp of the most recent transaction message,
// or the zero time.Time when the client has no transactions yet.
func (c *Client) LastActive() time.Time {
	if n := len(c.MsgTxs); n > 0 {
		return *c.MsgTxs[n-1].Time
	}

	return time.Time{}
}
// HadErrSinceTx reports whether transaction index tx is itself listed in
// c.Errors, or whether the error entry found by the binary search below lies
// fewer than distance transactions before tx.
//
// NOTE(review): sort.Search needs a predicate that flips from false to true
// exactly once; with "Errors[i] < tx" that presumably means c.Errors is kept
// in descending order - confirm against the code that appends to it.
func (c *Client) HadErrSinceTx(tx, distance int) bool {
	errs := c.Errors
	if slices.Contains(errs, tx) {
		return true
	}

	// first position holding an error index lower than tx
	pos := sort.Search(len(errs), func(i int) bool {
		return errs[i] < tx
	})
	if pos < len(errs) {
		return tx-errs[pos] < distance
	}

	return false
}
// LastTxTill returns the index of the transaction whose timestamp best
// matches hTime, or -1 when the client has no transactions.
//
// The binary search locates the first transaction at or after hTime. If every
// transaction is older than hTime, the last index is returned. Otherwise the
// found transaction is compared with its predecessor and the one closer in
// time to hTime wins (ties go to the later transaction).
//
// Previously both branches of the final comparison returned the same index,
// so the "predecessor is closer" case was computed but never honored.
//
// The search assumes c.MsgTxs is ordered by ascending Time.
func (c *Client) LastTxTill(hTime time.Time) int {
	l := len(c.MsgTxs)
	if l == 0 {
		return -1
	}

	// first tx with Time >= hTime
	i := sort.Search(l, func(i int) bool {
		return !c.MsgTxs[i].Time.Before(hTime)
	})
	if i == l {
		// everything is older than hTime
		return l - 1
	}

	// pick the closer one of the found tx and its predecessor
	if i > 0 && c.MsgTxs[i].Time.Sub(hTime) > hTime.Sub(*c.MsgTxs[i-1].Time) {
		return i - 1
	}

	return i
}
// StatesToIndexes converts state names to indexes by passing the client's
// schema StatesIndex and the given states to helpers.StatesToIndexes.
func (c *Client) StatesToIndexes(states am.S) []int {
	index := c.MsgStruct.StatesIndex

	return helpers.StatesToIndexes(index, states)
}
// IndexesToStates converts state indexes to names by passing the client's
// schema StatesIndex and the given indexes to helpers.IndexesToStates.
func (c *Client) IndexesToStates(indexes []int) am.S {
	index := c.MsgStruct.StatesIndex

	return helpers.IndexesToStates(index, indexes)
}
// TxIndex returns the position in c.MsgTxs of the transaction with the given
// ID, or -1 when there is none. Successful lookups are memoized in txCache;
// misses are not cached. The whole call runs under txCacheMx.
func (c *Client) TxIndex(id string) int {
	c.txCacheMx.Lock()
	defer c.txCacheMx.Unlock()

	// reading from a nil map is fine, so check for a hit first
	if cached, hit := c.txCache[id]; hit {
		return cached
	}
	if c.txCache == nil {
		c.txCache = make(map[string]int)
	}

	// linear scan, remembering the position once found
	for pos := range c.MsgTxs {
		if c.MsgTxs[pos].ID != id {
			continue
		}
		c.txCache[id] = pos

		return pos
	}

	return -1
}
// Tx returns the transaction message at idx, or nil when idx is outside the
// bounds of c.MsgTxs.
func (c *Client) Tx(idx int) *telemetry.DbgMsgTx {
	if idx >= 0 && idx < len(c.MsgTxs) {
		return c.MsgTxs[idx]
	}

	return nil
}
// TxParsed returns the parsed transaction at idx, or nil when idx is outside
// the bounds of c.MsgTxsParsed.
func (c *Client) TxParsed(idx int) *types.MsgTxParsed {
	if idx >= 0 && idx < len(c.MsgTxsParsed) {
		return c.MsgTxsParsed[idx]
	}

	return nil
}
// TxByMachTime binary-searches c.MsgTxsParsed for an entry whose TimeSum
// equals sum and returns its index. When no entry matches it returns 0, which
// is indistinguishable from a match at index 0.
func (c *Client) TxByMachTime(sum uint64) int {
	target := &types.MsgTxParsed{TimeSum: sum}

	// three-way comparison on TimeSum
	byTimeSum := func(a, b *types.MsgTxParsed) int {
		switch {
		case a.TimeSum < b.TimeSum:
			return -1
		case a.TimeSum > b.TimeSum:
			return 1
		default:
			return 0
		}
	}

	pos, found := slices.BinarySearchFunc(c.MsgTxsParsed, target, byTimeSum)
	if found {
		return pos
	}

	return 0
}
// FilterIndexByCursor1 maps a 1-based cursor to a position in
// c.MsgTxsFiltered: it looks up the value cursor1-1 and returns its index, or
// -1 when the value is absent. A cursor of 0 always yields 0.
func (c *Client) FilterIndexByCursor1(cursor1 int) int {
	if cursor1 != 0 {
		return slices.Index(c.MsgTxsFiltered, cursor1-1)
	}

	return 0
}
// ParseSchema rebuilds c.MsgSchemaParsed from the schema message.
//
// The result always starts with an "all" group holding every state from
// StatesIndex. Each schema group is then added under a display name: the
// "StatesDef" suffix is trimmed, "self" becomes "- self", and every group
// listed after "self" gets a "- " prefix.
func (c *Client) ParseSchema() {
	src := c.MsgStruct
	parsed := &types.MsgSchemaParsed{
		GroupsOrder: []string{"all"},
		Groups:      map[string]am.S{"all": src.StatesIndex},
	}
	c.MsgSchemaParsed = parsed

	if len(src.GroupsOrder) == 0 {
		return
	}

	var (
		// afterSelf flips once the "self" group has been processed
		afterSelf bool
		// previous holds the states of the preceding post-"self" group
		previous = am.S{}
	)
	for _, group := range src.GroupsOrder {
		isSelf := group == "self"

		label := strings.TrimSuffix(group, "StatesDef")
		switch {
		case isSelf:
			label = "- self"
		case afterSelf:
			label = "- " + label
		}

		parsed.GroupsOrder = append(parsed.GroupsOrder, label)
		parsed.Groups[label] = c.IndexesToStates(src.Groups[group])

		if afterSelf {
			// NOTE(review): the merged list is stored under the raw group
			// name, not under label, so it is not reachable through
			// GroupsOrder - confirm this is intended.
			parsed.Groups[group] = slices.Concat(parsed.Groups[label], previous)
			previous = parsed.Groups[label]
		}

		afterSelf = afterSelf || isSelf
	}
}
// StartRpc starts the debugger's RPC server and blocks serving it.
//
// It listens on addr (telemetry.DbgAddr when empty), dials every address in
// fwdAdds as an RPC forwarding target, and accepts connections via tcpAccept.
// With enableHttp it also serves HTTP on the same host and port+1, routing
// "/diagrams/mach.ws" to wsHandler and everything else to httpHandler. When
// mux is non-nil, the RPC cmux instance is sent on it after a 100ms delay.
//
// Failures are fatal: listen errors are logged, added to mach and panic;
// dial and serve errors terminate the process with exit code 1.
func StartRpc(
	mach *am.Machine, addr string, mux chan<- cmux.CMux, fwdAdds []string,
	enableHttp bool,
) {
	gob.Register(am.Relation(0))
	gob.Register(Exportable{})

	if addr == "" {
		addr = telemetry.DbgAddr
	}

	rpcLis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Println(err)
		mach.AddErr(err, nil)
		panic(err)
	}
	mach.Log("dbg server started at %s", addr)

	// connect to all the forwarding targets up front
	forwards := make([]*rpc.Client, len(fwdAdds))
	for i, target := range fwdAdds {
		client, dialErr := rpc.Dial("tcp", target)
		if dialErr != nil {
			fmt.Printf("Cant fwd to %s: %s\n", target, dialErr)
			os.Exit(1)
		}
		forwards[i] = client
	}

	// RPC traffic
	rpcMux := cmux.New(rpcLis)
	go tcpAccept(rpcMux.Match(cmux.Any()), mach, forwards)

	// HTTP traffic on the next port
	if enableHttp {
		// NOTE(review): parse errors are discarded here; an unparsable port
		// yields 0 and the HTTP listener ends up on port 1.
		_, portStr, _ := net.SplitHostPort(addr)
		port, _ := strconv.Atoi(portStr)
		host := addr[:strings.LastIndex(addr, ":")]
		httpAddr := fmt.Sprintf("%s:%d", host, port+1)

		httpLis, err := net.Listen("tcp", httpAddr)
		if err != nil {
			log.Println(err)
			mach.AddErr(err, nil)
			panic(err)
		}

		httpMux := cmux.New(httpLis)
		httpL := httpMux.Match(cmux.HTTP1())

		route := func(w http.ResponseWriter, r *http.Request) {
			if r.RequestURI == "/diagrams/mach.ws" {
				wsHandler(w, r, mach)
				return
			}
			httpHandler(w, r, mach)
		}
		httpS := &http.Server{Handler: http.HandlerFunc(route)}

		go func() {
			mach.AddErr(httpS.Serve(httpL), nil)
		}()
		go func() {
			if err := httpMux.Serve(); err != nil {
				fmt.Println(err)
				mach.AddErr(err, nil)
				os.Exit(1)
			}
		}()
	}

	// hand the RPC mux over to the caller shortly after serving starts
	if mux != nil {
		go func() {
			time.Sleep(100 * time.Millisecond)
			mux <- rpcMux
		}()
	}

	// blocks for the lifetime of the server
	if err := rpcMux.Serve(); err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}
// RPCServer is the net/rpc receiver handling debugger telemetry. tcpAccept
// creates and registers one instance per accepted connection.
type RPCServer struct {
	// Mach is the machine that receives the connect, message, disconnect and
	// error mutations.
	Mach *am.Machine
	// ConnID is the random ID tcpAccept assigns to the connection.
	ConnID string
	// FwdTo lists RPC clients to which incoming messages are re-sent.
	FwdTo []*rpc.Client
}
// DbgMsgSchemaFwd is the argument of RPCServer.DbgMsgSchemaFwd: a schema
// message paired with the ID of the connection it originally arrived on.
type DbgMsgSchemaFwd struct {
	MsgStruct *telemetry.DbgMsgStruct
	ConnId    string
}
// DbgMsgSchemaFwd is the RPC endpoint for schema messages forwarded by
// another server. It adds ss.ConnectEvent to the machine with the schema,
// the connection ID carried in msg, and the schema's ID as the client ID.
// It always returns nil.
func (r *RPCServer) DbgMsgSchemaFwd(
	msg *DbgMsgSchemaFwd, _ *string,
) error {
	schema := msg.MsgStruct
	args := am.A{
		"msg_struct": schema,
		"conn_id":    msg.ConnId,
		"Client.id":  schema.ID,
	}
	r.Mach.Add1(ss.ConnectEvent, args)

	return nil
}
// DbgMsgSchema is the RPC endpoint for a client's schema message. It adds
// ss.ConnectEvent to the machine (tagged with this connection's ID and the
// schema's ID), then re-sends the schema to every host in r.FwdTo through
// "RPCServer.DbgMsgSchemaFwd". The first forwarding error aborts the loop
// and is returned.
func (r *RPCServer) DbgMsgSchema(
	msgSchema *telemetry.DbgMsgStruct, _ *string,
) error {
	connID := r.ConnID

	r.Mach.Add1(ss.ConnectEvent, am.A{
		"msg_struct": msgSchema,
		"conn_id":    connID,
		"Client.id":  msgSchema.ID,
	})

	// pass the schema on, keeping the original connection ID
	for i := range r.FwdTo {
		fwd := &DbgMsgSchemaFwd{MsgStruct: msgSchema, ConnId: connID}
		err := r.FwdTo[i].Call("RPCServer.DbgMsgSchemaFwd", fwd, nil)
		if err != nil {
			return err
		}
	}

	return nil
}
// Package-level batching state shared by every RPCServer instance.
var (
	// queue collects incoming transaction messages until the next flush.
	queue []*telemetry.DbgMsgTx
	// queueConnId holds, at the same position, the connection ID of each
	// queued message.
	queueConnId []string
	// queueMx is held by DbgMsgTx / DbgMsgTxFwd and by the flush goroutine
	// while they touch queue, queueConnId and scheduled.
	queueMx sync.Mutex
	// scheduled is true while a flush goroutine is pending.
	scheduled bool
)
// DbgMsgTx is the RPC endpoint for a client's transaction message.
//
// Handling is identical to a forwarded message tagged with this connection's
// ID, so the call is delegated to DbgMsgTxFwd. That method timestamps the
// message, appends it to the shared queue (flushed to the machine as
// ss.ClientMsg about a second after the first queued message) and re-sends
// it to every host in r.FwdTo, returning the first forwarding error.
//
// Delegating removes a verbatim copy of that logic and makes both endpoints
// pass the forward argument the same way (by pointer).
func (r *RPCServer) DbgMsgTx(msgTx *telemetry.DbgMsgTx, _ *string) error {
	return r.DbgMsgTxFwd(&DbgMsgTx{
		MsgTx:  msgTx,
		ConnId: r.ConnID,
	}, nil)
}
// DbgMsgTx is the argument of RPCServer.DbgMsgTxFwd: a transaction message
// paired with the ID of the connection it originally arrived on.
type DbgMsgTx struct {
	MsgTx  *telemetry.DbgMsgTx
	ConnId string
}
// DbgMsgTxFwd is the RPC endpoint for a transaction message carrying an
// explicit connection ID. It stamps the message with the current time,
// appends it to the shared queue and re-sends it to every host in r.FwdTo,
// returning the first forwarding error.
//
// The first message queued after a flush starts a goroutine which, one second
// later, passes the whole batch to the machine as ss.ClientMsg and resets the
// queue.
//
// NOTE(review): queueMx stays locked during the forwarding calls, so a slow
// forward host delays both other senders and the flush.
func (r *RPCServer) DbgMsgTxFwd(msg *DbgMsgTx, _ *string) error {
	queueMx.Lock()
	defer queueMx.Unlock()

	tx, connID := msg.MsgTx, msg.ConnId

	// schedule a single delayed flush per batch
	if !scheduled {
		scheduled = true

		flush := func() {
			time.Sleep(time.Second)

			queueMx.Lock()
			defer queueMx.Unlock()

			r.Mach.Add1(ss.ClientMsg, am.A{
				"msgs_tx":  queue,
				"conn_ids": queueConnId,
			})
			queue, queueConnId, scheduled = nil, nil, false
		}
		go flush()
	}

	// receive time, not send time
	stamp := time.Now()
	tx.Time = &stamp
	queue = append(queue, tx)
	queueConnId = append(queueConnId, connID)

	for i := range r.FwdTo {
		fwd := &DbgMsgTx{MsgTx: tx, ConnId: connID}
		if err := r.FwdTo[i].Call("RPCServer.DbgMsgTxFwd", fwd, nil); err != nil {
			return err
		}
	}

	return nil
}
// tcpAccept accepts connections from l forever. Each connection gets its own
// goroutine with a fresh rpc.Server and an RPCServer receiver identified by a
// random 8-character connection ID. When serving the connection ends,
// ss.DisconnectEvent is added to mach with that ID.
//
// Accept errors are added to mach and the loop continues; a failed Register
// is added to mach, printed, and terminates the process with exit code 1.
func tcpAccept(l net.Listener, mach *am.Machine, fwdTo []*rpc.Client) {
	for {
		conn, err := l.Accept()
		if err != nil {
			mach.AddErr(err, nil)
			continue
		}

		go func(conn net.Conn) {
			srv := rpc.NewServer()
			id := utils.RandId(8)
			handler := &RPCServer{Mach: mach, ConnID: id, FwdTo: fwdTo}

			if err := srv.Register(handler); err != nil {
				handler.Mach.AddErr(err, nil)
				fmt.Println(err)
				os.Exit(1)
			}

			// blocks until the client hangs up
			srv.ServeConn(conn)
			handler.Mach.Add1(ss.DisconnectEvent, am.A{"conn_id": id})
		}(conn)
	}
}
// httpHandler serves a plain HTTP request. It writes the shared headers via
// middleware, answers OPTIONS requests with 200 right away, and otherwise
// adds ss.WebReq to mach with the request, the response writer and a done
// channel, then blocks until a value arrives on that channel or it is closed.
func httpHandler(w http.ResponseWriter, r *http.Request, mach *am.Machine) {
	middleware(w)

	// preflight requests need only the headers
	if r.Method == http.MethodOptions {
		w.WriteHeader(http.StatusOK)
		return
	}

	finished := make(chan struct{})
	args := am.A{
		"uri":                 r.RequestURI,
		"*http.Request":       r,
		"http.ResponseWriter": w,
		"doneChan":            finished,
		"addr":                r.RemoteAddr,
	}
	mach.Add1(ss.WebReq, args)

	<-finished
}
// wsHandler upgrades the request to a WebSocket and passes the connection to
// the machine. It writes the shared headers via middleware, accepts the
// socket, adds ss.WebSocket to mach with the connection, the request, the
// response writer and a done channel, then blocks until a value arrives on
// that channel or it is closed. A failed upgrade is reported as ss.ErrWeb.
func wsHandler(w http.ResponseWriter, r *http.Request, mach *am.Machine) {
	middleware(w)

	// InsecureSkipVerify turns off the library's origin check, so the socket
	// is reachable from any origin.
	opts := websocket.AcceptOptions{InsecureSkipVerify: true}
	conn, err := websocket.Accept(w, r, &opts)
	if err != nil {
		mach.AddErrState(ss.ErrWeb, err, nil)
		return
	}
	defer conn.Close(websocket.StatusInternalError, "internal error")

	finished := make(chan struct{})
	args := am.A{
		"*websocket.Conn":     conn,
		"*http.Request":       r,
		"http.ResponseWriter": w,
		"doneChan":            finished,
		"addr":                r.RemoteAddr,
	}
	mach.Add1(ss.WebSocket, args)

	<-finished
}
// middleware sets the response headers shared by all HTTP endpoints:
// permissive CORS (any origin, common methods and headers) and directives
// that disable caching.
func middleware(w http.ResponseWriter) {
	hdr := w.Header()

	for _, kv := range [...][2]string{
		// CORS
		{"Access-Control-Allow-Origin", "*"},
		{"Access-Control-Allow-Methods", "POST, GET, OPTIONS, PUT, DELETE"},
		{"Access-Control-Allow-Headers",
			"Accept, Content-Type, Content-Length, Accept-Encoding, " +
				"X-CSRF-Token, Authorization"},
		// no caching
		{"Cache-Control", "no-cache, no-store, must-revalidate"},
		{"Pragma", "no-cache"},
		{"Expires", "0"},
	} {
		hdr.Set(kv[0], kv[1])
	}
}
// GetField is an integer enum naming a value that can be requested; its valid
// values are the Get* constants. Encode and Decode convert it to and from a
// one-character string, and String returns the constant's name.
type GetField int
// Valid GetField values. Numbering starts at 1 (iota + 1), so the zero value
// of GetField matches none of them and String reports it as "!UNKNOWN!".
const (
GetCursorTx GetField = iota + 1
GetCursorStep
GetClientCount
GetMsgCount
GetOpts
GetSelectedState
)
// Decode converts the first byte of s back into a GetField, reversing Encode
// for the defined constants. Bytes after the first are ignored.
//
// An empty string yields the zero GetField, which is not a valid field and
// stringifies as "!UNKNOWN!"; previously this case panicked with an index
// out of range.
func Decode(s string) GetField {
	if s == "" {
		return 0
	}

	return GetField(s[0])
}
// String returns the name of the constant matching n, or "!UNKNOWN!" when n
// is not one of the defined fields.
func (n GetField) String() string {
	// indexed by the constants themselves; slot 0 stays empty
	names := [...]string{
		GetCursorTx:      "GetCursorTx",
		GetCursorStep:    "GetCursorStep",
		GetClientCount:   "GetClientCount",
		GetMsgCount:      "GetMsgCount",
		GetOpts:          "GetOpts",
		GetSelectedState: "GetSelectedState",
	}

	if n >= 0 && int(n) < len(names) && names[n] != "" {
		return names[n]
	}

	return "!UNKNOWN!"
}
// Encode returns n as a string holding the single rune whose code point is
// n's numeric value. For the defined constants this is one byte, which
// Decode reads back.
func (n GetField) Encode() string {
	codePoint := rune(n)

	return string(codePoint)
}
// The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
// Golds is a Go 101 project developed by Tapir Liu.
// PRs and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news about Golds.