package relay
import (
"context"
"encoding/gob"
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"time"
"github.com/andybalholm/brotli"
"github.com/coder/websocket"
"github.com/joho/godotenv"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
arpc "github.com/pancsta/asyncmachine-go/pkg/rpc"
ssrpc "github.com/pancsta/asyncmachine-go/pkg/rpc/states"
"github.com/pancsta/asyncmachine-go/pkg/telemetry/dbg"
"github.com/pancsta/asyncmachine-go/tools/debugger/server"
typesDbg "github.com/pancsta/asyncmachine-go/tools/debugger/types"
"github.com/pancsta/asyncmachine-go/tools/relay/states"
"github.com/pancsta/asyncmachine-go/tools/relay/types"
)
// Package-level shorthands for names imported from the relay's states and
// types packages.
var (
	// ssR is the relay state set; it names the relay machine's states
	// (e.g. ssR.Start, ssR.HttpReady).
	ssR = states.RelayStates
	// ssT is the state set used with WebSocket-to-TCP tunnel machines
	// (e.g. ssT.Start).
	ssT = states.WsTcpTunStates
	// Pass re-exports types.Pass; in this file it converts an *A into the
	// args value handed to a mutation.
	Pass = types.Pass
)

// A is an alias of types.A, the typed arguments struct accepted by Pass.
type A = types.A
// Relay bundles the relay's state machine with the resources its state
// handlers operate on: the HTTP server for WASM clients, the WebSocket-to-TCP
// tunnels, and the debugger clients whose transitions get exported to disk.
type Relay struct {
	// Mach is the relay's state machine; it is assigned by New.
	Mach *am.Machine
	// Args holds the arguments the relay was constructed with.
	Args types.Args
	// HttpMux is the request multiplexer, (re)created in HttpStartingState.
	HttpMux *http.ServeMux

	// wsTcpTuns maps a target TCP address to its current tunnel.
	wsTcpTuns map[string]*WsTcpTun
	// cWsTcpTuns counts created tunnels; it provides the numeric suffix of
	// tunnel machine IDs.
	cWsTcpTuns int
	// dbgClients maps a machine ID to its debugger client.
	dbgClients map[string]*server.Client
	// out prints user-facing messages (fmt.Printf when Args.Output is nil).
	out types.OutputFunc
	// lastExport is the time of the most recent interval-triggered export.
	lastExport time.Time
	// http is the HTTP server created in HttpStartingState.
	http *http.Server
}
// New builds a Relay and its state machine.
//
// The machine ID is "relay", or "relay-<name>" when args.Name is set. When
// args.Output is nil, messages are printed with fmt.Printf. With args.Debug
// set, a .env file is loaded (best effort) before the machine is created and
// the machine's debug environment is applied afterwards.
//
// It returns the error from machine construction, if any.
func New(ctx context.Context, args types.Args) (*Relay, error) {
	// default printer writes to stdout
	printf := args.Output
	if printf == nil {
		printf = func(format string, v ...any) {
			fmt.Printf(format, v...)
		}
	}

	if args.Debug {
		printf("WASM relay debug, loading .env\n")
		// best effort: a missing .env file is not an error
		_ = godotenv.Load()
	}

	machID := "relay"
	if name := args.Name; name != "" {
		machID = machID + "-" + name
	}

	relay := &Relay{
		Args:       args,
		out:        printf,
		lastExport: time.Now(),
		wsTcpTuns:  make(map[string]*WsTcpTun),
		dbgClients: make(map[string]*server.Client),
	}

	// the relay itself is passed to the machine constructor along with the
	// schema and state names
	opts := &am.Opts{
		DontPanicToException: args.Debug,
	}
	mach, err := am.NewCommon(ctx, machID, states.RelaySchema, ssR.Names(),
		relay, args.Parent, opts)
	if err != nil {
		return nil, err
	}
	if args.Debug {
		// best effort: debugging setup failures do not block startup
		_ = amhelp.MachDebugEnv(mach)
	}
	relay.Mach = mach

	// best effort: REPL setup errors are ignored on purpose
	_, _ = arpc.MachReplEnv(mach, &arpc.ReplOpts{
		Args: types.ARpc{},
	})
	mach.SemLogger().SetArgsMapper(types.LogArgs)

	return relay, nil
}
// StartState is the handler for the Start state. When the RotateDbg arguments
// are present it announces the listen and forward addresses and launches the
// debugger RPC server in a goroutine. When the Wasm arguments are present it
// requests the HttpStarting state.
func (r *Relay) StartState(e *am.Event) {
	if cmd := r.Args.RotateDbg; cmd != nil {
		r.out("starting relay server on %s\n", cmd.ListenAddr)
		for _, fwd := range cmd.FwdAddr {
			r.out("forwarding to %s\n", fwd)
		}

		// the same forward addresses are handed to the server as FwdData
		go server.StartRpc(r.Mach, cmd.ListenAddr, nil, typesDbg.Params{
			FwdData: cmd.FwdAddr,
		})
	}

	if r.Args.Wasm != nil {
		r.Mach.Add1(ssR.HttpStarting, nil)
	}
}
// Start requests the Start state on the relay machine, passing e as the
// source event, and returns the mutation's result.
func (r *Relay) Start(e *am.Event) am.Result {
	res := r.Mach.EvAdd1(e, ssR.Start, nil)
	return res
}
// Stop requests removal of the Start state from the relay machine, passing e
// as the source event, and returns the mutation's result.
func (r *Relay) Stop(e *am.Event) am.Result {
	res := r.Mach.EvRemove1(e, ssR.Start, nil)
	return res
}
// HttpStartingState is the handler for the HttpStarting state. It creates a
// fresh mux and HTTP server for the WASM listen address and serves it from a
// goroutine. HttpReady is requested right before the blocking ListenAndServe
// call and removed again once the server stops; any error other than
// http.ErrServerClosed is added to the machine.
//
// NOTE(review): HttpReady is requested before the socket is actually bound,
// so a bind failure is reported only after readiness was announced — confirm
// this ordering is acceptable.
func (r *Relay) HttpStartingState(e *am.Event) {
	ctx := r.Mach.NewStateCtx(ssR.HttpStarting)
	addr := r.Args.Wasm.ListenAddr

	r.HttpMux = http.NewServeMux()
	srv := &http.Server{
		Addr:    addr,
		Handler: r.HttpMux,
		// Bound how long a client may take to send its request headers, so
		// slow or idle connections cannot hold server resources forever. Only
		// the header read is limited; the read deadline is reset afterwards,
		// which keeps long-lived WebSocket connections working.
		ReadHeaderTimeout: 10 * time.Second,
	}
	r.http = srv

	go func() {
		// the state may have been left before this goroutine got scheduled
		if ctx.Err() != nil {
			return
		}

		go r.Mach.Add1(ssR.HttpReady, nil)

		// serve the server built above, even if r.http is replaced later
		err := srv.ListenAndServe()
		if err != nil && err != http.ErrServerClosed {
			r.Mach.AddErr(err, nil)
		}
		r.Mach.Remove1(ssR.HttpReady, nil)
	}()
}
// HttpReadyState is the handler for the HttpReady state. It announces the
// listen address and registers the HTTP routes: an optional static file
// server at "/" (when a static directory is configured) and the WebSocket
// tunnel endpoint at "/listen/".
func (r *Relay) HttpReadyState(e *am.Event) {
	wasm := r.Args.Wasm
	r.out("WASM relay listening on http://%s\n", wasm.ListenAddr)

	staticDir := wasm.StaticDir
	if staticDir != "" {
		r.out("WASM relay serving %s\n", staticDir)
		files := http.FileServer(http.Dir(staticDir))
		r.HttpMux.Handle("/", files)
	}

	// every tunnel request is handled with this state's event as its source
	listen := func(w http.ResponseWriter, req *http.Request) {
		r.HandleWsTcpListen(e, w, req)
	}
	r.HttpMux.HandleFunc("/listen/", listen)
}
// HandleWsTcpListen serves one WebSocket "listen" request and blocks until the
// connection's client or tunnel is done.
//
// The request path, after the arpc.WsPathListen prefix, is split with
// path.Split: the directory part (without the trailing slash) is the client
// ID, and the last element is the TCP address to tunnel.
//
// After the WebSocket upgrade one of two things happens:
//   - If a configured client matcher accepts the ID, an RPC client is created
//     over the connection. The handler waits for it to connect and then to
//     disconnect, stops it, and returns.
//   - Otherwise a WsTcpTun machine is created for the TCP address (disposing
//     any previous tunnel for that address), started, and awaited until it is
//     disposed.
//
// Errors are reported to the relay machine; nothing is returned.
//
// NOTE(review): entries of r.wsTcpTuns are only ever replaced, never deleted
// in this function, so a disposed tunnel stays referenced until the same
// address is requested again — confirm this is intended.
func (r *Relay) HandleWsTcpListen(
	e *am.Event, w http.ResponseWriter, req *http.Request,
) {
	ctx, cancel := context.WithCancel(req.Context())
	defer cancel()
	r.Mach.EvAdd1(e, ssR.WsTunListenConn, nil)

	// parse "<id>/<tcpAddr>" from the path
	uri, ok := strings.CutPrefix(req.URL.Path, arpc.WsPathListen)
	if !ok {
		r.Mach.EvAddErrState(e, ssR.ErrNetwork, fmt.Errorf(
			"invalid /listen path: %s", req.URL.Path), nil)
		return
	}
	id, tcpAddr := path.Split(uri)
	id = strings.TrimSuffix(id, "/")
	r.Mach.Log("WS TCP tunnel for %s at %s", id, tcpAddr)

	// NOTE(review): InsecureSkipVerify presumably turns off the library's
	// origin check for the upgrade — confirm this is acceptable for the
	// deployments this relay targets.
	conn, err := websocket.Accept(w, req, &websocket.AcceptOptions{
		InsecureSkipVerify: true,
	})
	if err != nil {
		r.Mach.EvAddErrState(e, ssR.ErrNetwork, err, Pass(&A{
			Addr: tcpAddr,
			Id:   id,
		}))
		return
	}

	// client matchers take precedence over plain tunnels
	for _, m := range r.Args.Wasm.ClientMatchers {
		if !m.Id.MatchString(id) {
			continue
		}
		r.Mach.Log("WS tunnel for %s accepted by client matcher", id)

		netConn := websocket.NetConn(ctx, conn, websocket.MessageBinary)
		client, err := m.NewClient(ctx, id, netConn)
		if err != nil {
			// wrap with %w so the cause stays inspectable via errors.Is/As;
			// the next matcher (if any) gets a chance afterwards
			r.Mach.EvAddErr(e,
				fmt.Errorf("failed to create RPC client for %s: %w", id, err), nil)
			continue
		}

		// block for the lifetime of the RPC connection
		<-client.Mach.When1(ssrpc.ClientStates.Connected, ctx)
		<-client.Mach.WhenNot1(ssrpc.ClientStates.Connected, ctx)
		client.Stop(nil, e, true)
		return
	}

	// create the tunnel inside Eval, where the tunnel map and counter are
	// read and written
	var tunMach *am.Machine
	ok = r.Mach.Eval("listen_init", func() {
		// only one tunnel per TCP address: replace the previous one
		if tun, ok2 := r.wsTcpTuns[tcpAddr]; ok2 {
			r.Mach.Log("disposing existing WS TCP tunnel for %s at %s", id, tcpAddr)
			amhelp.Dispose(tun.Mach)
			// presumably gives the old tunnel time to release its listener —
			// TODO confirm
			time.Sleep(100 * time.Millisecond)
		}

		idTun := fmt.Sprintf("%s-wtt-%d", r.Mach.Id(), r.cWsTcpTuns)
		r.cWsTcpTuns++
		tun, err := NewWsTcpTun(ctx, conn, id, tcpAddr, req.RemoteAddr, idTun,
			r.Mach, r.Args.Debug)
		if err != nil {
			r.Mach.EvAddErr(e,
				fmt.Errorf("failed to create tun mach %s: %w", idTun, err), nil)
			return
		}
		tunMach = tun.Mach
		r.wsTcpTuns[tcpAddr] = tun
	}, ctx)
	if !ok || tunMach == nil {
		_ = conn.Close(websocket.StatusInternalError, "tun creation failed")
		return
	}

	// publish the address of REPL clients as a file, so tools can find it
	if d := r.Args.Wasm.ReplAddrDir; d != "" &&
		strings.HasPrefix(id, "repl-") {

		name := id + ".addr"
		r.Mach.Log("REPL detected, creating %s", name)
		err := os.WriteFile(filepath.Join(d, name), []byte(tcpAddr), 0o644)
		if err != nil {
			r.Mach.EvAddErr(e, fmt.Errorf(
				"failed to write REPL addr file %s: %w", name, err), nil)
		}
	}

	// run the tunnel and block until it is gone
	tunMach.EvAdd1(e, ssT.Start, nil)
	<-tunMach.WhenDisposed()

	r.Mach.Eval("listen_end", func() {
		amhelp.Dispose(tunMach)
		r.Mach.EvAdd1(e, ssR.WsTunListenDisconn, nil)
	}, ctx)
}
// HttpReadyEnd is the handler for leaving the HttpReady state. It closes the
// HTTP server, if one was created, and passes the result of Close to the
// machine's AddErr.
func (r *Relay) HttpReadyEnd(e *am.Event) {
	srv := r.http
	if srv == nil {
		return
	}

	closeErr := srv.Close()
	r.Mach.AddErr(closeErr, nil)
}
// ClientMsgEnter reports whether the event carries well-typed ClientMsg
// arguments: "msgs_tx" as []*dbg.DbgMsgTx and "conn_ids" as []string.
func (r *Relay) ClientMsgEnter(e *am.Event) bool {
	if _, isTxs := e.Args["msgs_tx"].([]*dbg.DbgMsgTx); !isTxs {
		return false
	}
	_, isIds := e.Args["conn_ids"].([]string)

	return isIds
}
// ClientMsgState is the handler for the ClientMsg state. It appends each
// incoming transition message to the buffer of the client it belongs to, and
// exports all buffered data when either the export interval has elapsed or
// the number of buffered transitions exceeds the configured limit.
//
// Message i is paired with connection ID i from "conn_ids". A message is
// skipped (and logged) when its client is unknown, the client has no schema
// yet, no connection ID was supplied for it, or the connection ID does not
// match the client's.
func (r *Relay) ClientMsgState(e *am.Event) {
	msgs := e.Args["msgs_tx"].([]*dbg.DbgMsgTx)
	connIds := e.Args["conn_ids"].([]string)

	for i, msg := range msgs {
		machId := msg.MachineID
		c, ok := r.dbgClients[machId]
		if !ok {
			r.Mach.Log("Error: client not found: %s\n", machId)
			continue
		}
		if c.MsgStruct == nil {
			r.Mach.Log("Error: schema missing for %s, ignoring tx\n", machId)
			continue
		}
		// conn_ids may be shorter than msgs_tx; never index past its end
		if i >= len(connIds) {
			r.Mach.Log("Error: conn_id missing for %s, ignoring tx\n", machId)
			continue
		}
		if c.ConnId != connIds[i] {
			r.Mach.Log("Error: conn_id mismatch for %s, ignoring tx\n", machId)
			continue
		}

		c.MsgTxs = append(c.MsgTxs, msg)
	}

	// rotate on time or on volume, whichever comes first
	a := r.Args.RotateDbg
	if time.Since(r.lastExport) > a.IntervalTime || r.msgsCount() > a.IntervalTx {
		r.lastExport = time.Now()
		if err := r.hExportData(); err != nil {
			r.out("Error: export failed %s\n", err)
			r.Mach.AddErr(err, nil)
		}
	}
}
// ConnectEventEnter reports whether the event carries valid ConnectEvent
// arguments: "msg_struct" as a *dbg.DbgMsgStruct with a non-empty ID, and
// "conn_id" as a string. Malformed arguments are logged.
func (r *Relay) ConnectEventEnter(e *am.Event) bool {
	schema, isSchema := e.Args["msg_struct"].(*dbg.DbgMsgStruct)
	_, hasConnId := e.Args["conn_id"].(string)

	if isSchema && hasConnId && schema.ID != "" {
		return true
	}

	r.Mach.Log("Error: msg_struct malformed\n")
	return false
}
// ConnectEventState is the handler for the ConnectEvent state. It registers
// the debugger client described by the "msg_struct" argument:
//   - A known client on the same (non-empty) connection ID gets its schema
//     replaced and re-parsed.
//   - Otherwise a new client is stored under the machine ID, overriding any
//     previous entry with that ID.
//
// In both cases the InitClient state is requested with the client's ID.
func (r *Relay) ConnectEventState(e *am.Event) {
	msg := e.Args["msg_struct"].(*dbg.DbgMsgStruct)
	connId := e.Args["conn_id"].(string)

	prev, known := r.dbgClients[msg.ID]
	sameConn := known && prev.ConnId != "" && prev.ConnId == connId

	if sameConn {
		// same connection re-sent its schema
		r.Mach.Log("schema changed for %s", msg.ID)
		r.out("schema changed for %s\n", msg.ID)
		prev.MsgStruct = msg
		prev.ParseSchema()
	} else {
		if known {
			r.Mach.Log("client %s already exists, overriding", msg.ID)
			r.out("client %s already exists, overriding\n", msg.ID)
		}

		r.out("new client %s\n", msg.ID)
		fresh := &server.Client{
			Id:         msg.ID,
			ConnId:     connId,
			SchemaHash: amhelp.SchemaHash(msg.States),
			Exportable: &server.Exportable{
				MsgStruct: msg,
			},
		}
		fresh.Connected.Store(true)
		r.dbgClients[msg.ID] = fresh
	}

	r.Mach.Add1(ssR.InitClient, am.A{"id": msg.ID})
}
// DisconnectEventEnter reports whether the event carries a string "conn_id"
// argument, logging when it does not.
func (r *Relay) DisconnectEventEnter(e *am.Event) bool {
	if _, isStr := e.Args["conn_id"].(string); isStr {
		return true
	}

	r.Mach.Log("Error: DisconnectEvent malformed\n")
	return false
}
// DisconnectEventState is the handler for the DisconnectEvent state. It marks
// the first client whose (non-empty) connection ID equals the "conn_id"
// argument as disconnected, then exports the buffered data when the client
// map is empty.
//
// NOTE(review): nothing visible in this file removes entries from
// r.dbgClients, so the "empty map" condition looks unreachable once any
// client has connected — confirm whether "no connected clients" was meant.
func (r *Relay) DisconnectEventState(e *am.Event) {
	connID := e.Args["conn_id"].(string)

	for _, client := range r.dbgClients {
		if client.ConnId == "" || client.ConnId != connID {
			continue
		}

		client.Connected.Store(false)
		r.Mach.Log("client %s disconnected", client.Id)
		r.out("client %s disconnected\n", client.Id)
		break
	}

	if len(r.dbgClients) != 0 {
		return
	}
	err := r.hExportData()
	if err != nil {
		r.out("Error: export failed %s\n", err)
		r.Mach.AddErr(err, nil)
	}
}
// hExportData writes the exportable data of all debugger clients to a new
// brotli-compressed gob file and, on success, clears the clients' buffered
// transitions.
//
// The file is created in the configured directory as
// "<Filename>-<timestamp>.gob.br", with the timestamp formatted as
// 2006-01-02T15-04-05.
//
// It returns the first error hit while creating, encoding, flushing or
// closing the file. The buffers are cleared only when every step succeeded,
// so a failed flush never discards transitions that did not reach the disk.
// Further errors after the first one are added to the machine.
func (r *Relay) hExportData() error {
	count := r.msgsCount()
	r.out("exporting %d transitions for %d clients\n", count, len(r.dbgClients))

	a := r.Args.RotateDbg
	filename := a.Filename + "-" + time.Now().Format("2006-01-02T15-04-05")
	gobPath := filepath.Join(a.Dir, filename+".gob.br")

	data := make([]*server.Exportable, 0, len(r.dbgClients))
	for _, c := range r.dbgClients {
		data = append(data, c.Exportable)
	}

	fh, err := os.Create(gobPath)
	if err != nil {
		return err
	}

	brCompress := brotli.NewWriter(fh)
	err = gob.NewEncoder(brCompress).Encode(data)

	// Close the compressor before the file: closing it flushes the remaining
	// compressed bytes into the file. Both are closed even after a failed
	// encode, so the file handle is never leaked.
	collect := func(closeErr error) {
		switch {
		case closeErr == nil:
		case err == nil:
			err = closeErr
		default:
			// keep secondary failures visible on the machine
			r.Mach.AddErr(closeErr, nil)
		}
	}
	collect(brCompress.Close())
	collect(fh.Close())
	if err != nil {
		return err
	}

	// everything is on disk, drop the exported transitions
	for _, c := range r.dbgClients {
		c.Exportable.MsgTxs = nil
	}

	return nil
}
// msgsCount returns the number of transition messages currently buffered
// across all debugger clients.
func (r *Relay) msgsCount() int {
	var total int
	for _, client := range r.dbgClients {
		total += len(client.MsgTxs)
	}

	return total
}
// The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
// Golds is a Go 101 project developed by Tapir Liu.
// PR and bug reports are welcome and can be submitted to the issue list.
// Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.