package relay
import (
"context"
"encoding/gob"
"os"
"path"
"time"
"github.com/andybalholm/brotli"
"github.com/joho/godotenv"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/telemetry"
"github.com/pancsta/asyncmachine-go/tools/debugger/server"
"github.com/pancsta/asyncmachine-go/tools/relay/states"
"github.com/pancsta/asyncmachine-go/tools/relay/types"
)
var ss = states .RelayStates
type Relay struct {
Mach *am .Machine
Args *types .Args
clients map [string ]*server .Client
out types .OutputFunc
lastExport time .Time
}
func New (
ctx context .Context , args *types .Args , out types .OutputFunc ,
) (*Relay , error ) {
if args .Debug {
out ("debugging enabled\n" )
_ = godotenv .Load ()
}
r := &Relay {
Args : args ,
lastExport : time .Now (),
out : out ,
clients : make (map [string ]*server .Client ),
}
mach , err := am .NewCommon (ctx , "relay" , states .RelaySchema , ss .Names (),
r , nil , &am .Opts {
DontPanicToException : args .Debug ,
})
if err != nil {
return nil , err
}
if args .Debug {
amhelp .MachDebugEnv (mach )
}
r .Mach = mach
return r , nil
}
func (r *Relay ) StartState (e *am .Event ) {
a := r .Args .RotateDbg
r .out ("starting relay server on %s\n" , a .ListenAddr )
for _ , addr := range a .FwdAddr {
r .out ("forwarding to %s\n" , addr )
}
go server .StartRpc (r .Mach , a .ListenAddr , nil , a .FwdAddr , false )
}
func (r *Relay ) ClientMsgEnter (e *am .Event ) bool {
_ , ok1 := e .Args ["msgs_tx" ].([]*telemetry .DbgMsgTx )
_ , ok2 := e .Args ["conn_ids" ].([]string )
return ok1 && ok2
}
func (r *Relay ) ClientMsgState (e *am .Event ) {
msgs := e .Args ["msgs_tx" ].([]*telemetry .DbgMsgTx )
connIds := e .Args ["conn_ids" ].([]string )
for i , msg := range msgs {
machId := msg .MachineID
c := r .clients [machId ]
if _ , ok := r .clients [machId ]; !ok {
r .Mach .Log ("Error: client not found: %s\n" , machId )
continue
}
if c .MsgStruct == nil {
r .Mach .Log ("Error: schema missing for %s, ignoring tx\n" , machId )
continue
}
if c .ConnId != connIds [i ] {
r .Mach .Log ("Error: conn_id mismatch for %s, ignoring tx\n" , machId )
continue
}
c .MsgTxs = append (c .MsgTxs , msg )
}
a := r .Args .RotateDbg
if time .Since (r .lastExport ) > a .IntervalTime || r .msgsCount () > a .IntervalTx {
r .lastExport = time .Now ()
if err := r .hExportData (); err != nil {
r .out ("Error: export failed %s\n" , err )
r .Mach .AddErr (err , nil )
}
}
}
func (r *Relay ) ConnectEventEnter (e *am .Event ) bool {
msg , ok1 := e .Args ["msg_struct" ].(*telemetry .DbgMsgStruct )
_ , ok2 := e .Args ["conn_id" ].(string )
if !ok1 || !ok2 || msg .ID == "" {
r .Mach .Log ("Error: msg_struct malformed\n" )
return false
}
return true
}
func (r *Relay ) ConnectEventState (e *am .Event ) {
msg := e .Args ["msg_struct" ].(*telemetry .DbgMsgStruct )
connId := e .Args ["conn_id" ].(string )
var c *server .Client
if existing , ok := r .clients [msg .ID ]; ok {
if existing .ConnId != "" && existing .ConnId == connId {
r .Mach .Log ("schema changed for %s" , msg .ID )
r .out ("schema changed for %s\n" , msg .ID )
existing .MsgStruct = msg
c = existing
c .ParseSchema ()
} else {
r .Mach .Log ("client %s already exists, overriding" , msg .ID )
r .out ("client %s already exists, overriding\n" , msg .ID )
}
}
if c == nil {
r .out ("new client %s\n" , msg .ID )
c = &server .Client {
Id : msg .ID ,
ConnId : connId ,
SchemaHash : amhelp .SchemaHash (msg .States ),
Exportable : &server .Exportable {
MsgStruct : msg ,
},
}
c .Connected .Store (true )
r .clients [msg .ID ] = c
}
r .Mach .Add1 (ss .InitClient , am .A {"id" : msg .ID })
}
func (r *Relay ) DisconnectEventEnter (e *am .Event ) bool {
_ , ok := e .Args ["conn_id" ].(string )
if !ok {
r .Mach .Log ("Error: DisconnectEvent malformed\n" )
return false
}
return true
}
func (r *Relay ) DisconnectEventState (e *am .Event ) {
connID := e .Args ["conn_id" ].(string )
for _ , c := range r .clients {
if c .ConnId != "" && c .ConnId == connID {
c .Connected .Store (false )
r .Mach .Log ("client %s disconnected" , c .Id )
r .out ("client %s disconnected\n" , c .Id )
break
}
}
}
func (r *Relay ) hExportData () error {
count := r .msgsCount ()
r .out ("exporting %d transitions for %d clients\n" , count , len (r .clients ))
a := r .Args .RotateDbg
filename := a .Filename + "-" + time .Now ().Format ("2006-01-02_15-04-05" )
gobPath := path .Join (a .Dir , filename +".gob.br" )
fw , err := os .Create (gobPath )
if err != nil {
return err
}
defer fw .Close ()
data := make ([]*server .Exportable , len (r .clients ))
i := 0
for _ , c := range r .clients {
data [i ] = c .Exportable
i ++
}
brCompress := brotli .NewWriter (fw )
defer brCompress .Close ()
encoder := gob .NewEncoder (brCompress )
err = encoder .Encode (data )
if err != nil {
return err
}
for _ , c := range r .clients {
c .Exportable .MsgTxs = nil
}
return nil
}
func (r *Relay ) msgsCount () int {
count := 0
for _ , c := range r .clients {
count += len (c .MsgTxs )
}
return count
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .