package pubsub
import (
"context"
"errors"
"fmt"
"maps"
"math/rand"
"os"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/libp2p/go-libp2p"
ps "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
quicTransport "github.com/libp2p/go-libp2p/p2p/transport/quic"
ma "github.com/multiformats/go-multiaddr"
"github.com/vmihailenco/msgpack/v5"
"golang.org/x/sync/errgroup"
"github.com/pancsta/asyncmachine-go/internal/utils"
amhelp "github.com/pancsta/asyncmachine-go/pkg/helpers"
am "github.com/pancsta/asyncmachine-go/pkg/machine"
"github.com/pancsta/asyncmachine-go/pkg/pubsub/states"
udsTransport "github.com/pancsta/asyncmachine-go/pkg/pubsub/uds"
"github.com/pancsta/asyncmachine-go/pkg/rpc"
ssam "github.com/pancsta/asyncmachine-go/pkg/states"
)
// Topic is a pubsub (GossipSub) topic which announces the local ExposedMachs
// to other peers and keeps rpc.Worker replicas of the machines announced by
// them. Its lifecycle is driven by Mach, whose handlers are Topic's methods.
type Topic struct {
	// default Exception handler; ExceptionState delegates to it
	*am.ExceptionHandler
	// T is the joined pubsub topic, set in JoiningState.
	T *ps.Topic
	// Mach is the topic's own state machine, created in NewTopic.
	Mach *am.Machine
	// MachMetrics is an optional metrics machine; when set, metric() and
	// syncMetrics() activate per-peer states on it.
	MachMetrics atomic.Pointer[am.Machine]
	// HostToPeer maps peer IDs to display names (see peerName).
	HostToPeer map[string]string
	// ListenAddrs are the addresses the libp2p host listens on (required by
	// StartEnter).
	ListenAddrs []ma.Multiaddr
	// Addrs holds this host's addresses with the /p2p/<id> suffix, stored
	// in StartState.
	Addrs atomic.Pointer[[]ma.Multiaddr]
	// Name is the pubsub topic name.
	Name string
	// ConnAddrs are dialed in ConnectingState (required by ConnectingEnter).
	ConnAddrs []ma.Multiaddr
	// ExposedMachs are the local machines announced to other peers.
	ExposedMachs []*am.Machine
	// NOTE(review): not referenced in the visible code.
	Debounce time.Time
	// LogEnabled gates t.log (defaults to EnvAmPubsubLog being set).
	LogEnabled bool
	// HeartbeatFreq is the base heartbeat period (multiplied by
	// Multiplayer); 0 disables the heartbeat.
	HeartbeatFreq time.Duration
	// MaxMsgsPerWin limits outgoing messages per time window (SendMsgEnter).
	MaxMsgsPerWin int
	// DebugWorkerTelemetry enables debug telemetry for workers of the peer
	// named "P5" (defaults to EnvAmPubsubDbg being set).
	DebugWorkerTelemetry bool
	// ConnectionsToReady is the minimum connection count required by Ready.
	ConnectionsToReady int
	// Multiplayer scales heartbeat, debounce, request and rate-limit timings.
	Multiplayer int
	// SendInfoDebounceMs is the base for the random delay of SendInfo.
	SendInfoDebounceMs int
	// MaxQueueLen: incoming messages and PeerJoined-triggered SendInfo are
	// skipped when Mach's queue is longer than this.
	MaxQueueLen uint16
	// GossipAmount is how many random peers are sampled per gossip message.
	GossipAmount int
	// TestSchema and TestStates override schemas / state names in tests.
	TestSchema am.Schema
	TestStates am.S
	// libp2p host and GossipSub router, set in StartState, cleared in StartEnd
	host host.Host
	ps   *ps.PubSub
	// topic subscription, set in JoiningState
	sub *ps.Subscription
	// topic peer-event handler, set in JoinedState, cancelled in JoinedEnd
	handler *ps.TopicEventHandler
	// workers are remote machine replicas indexed by peer ID and machine index
	workers map[string]map[int]*rpc.Worker
	// info is the Info received per peer ID and machine index
	info map[string]map[int]*Info
	// pool runs background sends (created via amhelp.Pool(10))
	pool *errgroup.Group
	// missingUpdates holds clock sums advertised via gossip which are newer
	// than the local worker replicas
	missingUpdates PeerGossips
	// tracers has one tracer per exposed machine; dirty flags are read by
	// HeartbeatState
	tracers []*Tracer
	// pendingMachUpdates buffers clocks for peers without workers yet; they
	// are applied when MsgInfoState creates the worker
	pendingMachUpdates map[string]map[int]am.Time
	// missingPeers are peers known to exist, but without received info
	missingPeers map[string]struct{}
	// times of the last sent ReqInfo / ReqUpdates messages
	lastReqHello  time.Time
	lastReqUpdate time.Time
	// per-peer time of the last info / updates request
	reqInfo   map[string]time.Time
	reqUpdate map[string]time.Time
	// retried guards a single ErrJoining-timeout retry in ExceptionState
	retried bool
	// rate-limit window and message count in it (SendMsgEnter)
	msgsWin      int
	msgsWinCount int
	// transportUds selects the UDS transport (no security) instead of QUIC
	transportUds bool
}
// NewTopic creates a Topic called name with default tuning.
//
// It binds a change tracer to each of exposedMachs, then creates the topic's
// state machine with the ID "ps-<name>-<suffix>". A random suffix is used
// when suffix is empty, and opts may be nil. It returns an error if binding a
// tracer or creating the machine fails.
func NewTopic(
	ctx context.Context, name, suffix string, exposedMachs []*am.Machine,
	opts *TopicOpts,
) (*Topic, error) {
	if opts == nil {
		opts = &TopicOpts{}
	}

	topic := &Topic{
		Name:         name,
		ExposedMachs: exposedMachs,

		// env toggles
		LogEnabled:           os.Getenv(EnvAmPubsubLog) != "",
		DebugWorkerTelemetry: os.Getenv(EnvAmPubsubDbg) != "",

		// tuning defaults
		Multiplayer:        5,
		MaxQueueLen:        10,
		GossipAmount:       5,
		HeartbeatFreq:      time.Second,
		MaxMsgsPerWin:      10,
		ConnectionsToReady: 5,
		SendInfoDebounceMs: 500,

		// internal state
		tracers:            make([]*Tracer, len(exposedMachs)),
		pool:               amhelp.Pool(10),
		workers:            make(map[string]map[int]*rpc.Worker),
		info:               make(map[string]map[int]*Info),
		pendingMachUpdates: make(map[string]map[int]am.Time),
		missingPeers:       make(map[string]struct{}),
		missingUpdates:     make(PeerGossips),
		reqInfo:            make(map[string]time.Time),
		reqUpdate:          make(map[string]time.Time),
	}

	// one tracer per exposed machine, used to detect clock changes
	for idx, exposed := range exposedMachs {
		tr := &Tracer{}
		if err := exposed.BindTracer(tr); err != nil {
			return nil, err
		}
		topic.tracers[idx] = tr
	}

	if suffix == "" {
		suffix = utils.RandId(2)
	}
	machId := "ps-" + name + "-" + suffix
	mach, err := am.NewCommon(ctx, machId, states.TopicSchema, ss.Names(),
		topic, opts.Parent, &am.Opts{Tags: []string{"pubsub:" + name}})
	if err != nil {
		return nil, err
	}
	mach.SemLogger().SetArgsMapper(LogArgs)
	mach.SetGroups(states.TopicGroups, states.TopicStates)
	topic.Mach = mach

	return topic, nil
}
// ExceptionEnter rejects Exception when it was caused by an evaluation
// timeout; any other error is accepted.
func (t *Topic) ExceptionEnter(e *am.Event) bool {
	if errors.Is(am.ParseArgs(e.Args).Err, am.ErrEvalTimeout) {
		return false
	}
	return true
}
// ExceptionState delegates to the embedded ExceptionHandler. It then removes
// Exception once (guarded by t.retried) when the error was a handler timeout
// while targeting ErrJoining, which gives joining a single retry.
func (t *Topic) ExceptionState(e *am.Event) {
	t.ExceptionHandler.ExceptionState(e)

	parsed := am.ParseArgs(e.Args)
	if !errors.Is(parsed.Err, am.ErrHandlerTimeout) {
		return
	}
	if !slices.Contains(parsed.TargetStates, ss.ErrJoining) || t.retried {
		return
	}
	t.retried = true
	t.Mach.EvRemove1(e, ss.Exception, nil)
}
// ReadyEnter allows Ready only with at least ConnectionsToReady connections.
func (t *Topic) ReadyEnter(e *am.Event) bool {
	enough := t.ConnCount() >= t.ConnectionsToReady
	return enough
}
// StartEnter requires at least one listen address.
func (t *Topic) StartEnter(e *am.Event) bool {
	return len(t.ListenAddrs) != 0
}
// StartState creates the libp2p host and the GossipSub router in a background
// goroutine bound to the Start state's context.
//
// The transport is QUIC with a fresh Ed25519 identity, or a UDS transport
// without security when t.transportUds is set. On success it does the
// following:
//   - stores the host and router on t
//   - tags the machine with "peer:<id>"
//   - stores the dialable addresses in t.Addrs
//   - adds Connecting and Started
//   - re-triggers Heartbeat every HeartbeatFreq*Multiplayer until Start
//     ends (a non-positive period disables the heartbeat)
//
// Setup failures, and panics inside the goroutine, are reported via
// ErrJoining.
func (t *Topic) StartState(e *am.Event) {
	ctx := t.Mach.NewStateCtx(ss.Start)
	go func() {
		if ctx.Err() != nil {
			return
		}
		// recover() only works inside a deferred call, so PanicToErrState must
		// be deferred to actually turn a panic here into ErrJoining.
		defer t.Mach.PanicToErrState(ss.ErrJoining, nil)

		var security libp2p.Option
		transport := libp2p.Transport(quicTransport.NewTransport)
		if t.transportUds {
			transport = libp2p.Transport(udsTransport.NewUDSTransport)
			security = libp2p.NoSecurity
		} else {
			privk, _, err := crypto.GenerateKeyPair(crypto.Ed25519, 0)
			if err != nil {
				_ = AddErrJoining(e, t.Mach, err, nil)
				return
			}
			security = libp2p.Identity(privk)
		}

		h, err := libp2p.New(transport,
			libp2p.ListenAddrs(t.ListenAddrs...),
			security,
		)
		if ctx.Err() != nil {
			// Start got deactivated meanwhile - don't leak the new host
			if err == nil {
				_ = h.Close()
			}
			return
		}
		if err != nil {
			_ = AddErrJoining(e, t.Mach, err, nil)
			return
		}
		t.host = h

		pid := h.ID().String()
		tags := append(t.Mach.Tags(), "peer:"+pid)
		t.Mach.SetTags(tags)
		amhelp.MachDebugEnv(t.Mach)

		gossip, err := ps.NewGossipSub(ctx, h)
		if err != nil {
			_ = AddErrJoining(e, t.Mach, err, nil)
			return
		}
		t.ps = gossip

		addrs, err := t.GetPeerAddrs()
		if err != nil {
			_ = AddErrJoining(e, t.Mach, err, nil)
			return
		}
		t.Addrs.Store(&addrs)
		t.Mach.Log("Listening on %s", addrs)
		t.Mach.EvAdd1(e, ss.Connecting, nil)
		t.Mach.EvAdd1(e, ss.Started, Pass(&A{
			PeerId: pid,
			Peer:   t.peerName(pid),
		}))

		// heartbeat loop; HeartbeatFreq == 0 disables it, and a non-positive
		// period (e.g. Multiplayer == 0) would make time.NewTicker panic
		period := t.HeartbeatFreq * time.Duration(t.Multiplayer)
		if period <= 0 {
			return
		}
		tick := time.NewTicker(period)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-tick.C:
				t.Mach.EvRemove1(e, ss.Heartbeat, nil)
				t.Mach.EvAdd1(e, ss.Heartbeat, nil)
			}
		}
	}()
}
// StartEnd closes the libp2p host (if any) and drops the host and router.
func (t *Topic) StartEnd(e *am.Event) {
	if h := t.host; h != nil {
		h.Close()
	}
	t.host, t.ps = nil, nil
}
// ConnectingEnter requires at least one address to dial.
func (t *Topic) ConnectingEnter(e *am.Event) bool {
	return len(t.ConnAddrs) != 0
}
// ConnectingState dials all ConnAddrs in the background, with at most 10
// dials in parallel.
//
// Addresses which can't be parsed into peer infos are reported via
// ErrConnecting. Dial failures are ignored, and each address stops at its
// first successful dial. Afterwards it adds Connected if the host has any
// connection, or ErrConnecting otherwise.
func (t *Topic) ConnectingState(e *am.Event) {
	ctx := t.Mach.NewStateCtx(ss.Connecting)
	go func() {
		if ctx.Err() != nil {
			return
		}

		var dialers errgroup.Group
		dialers.SetLimit(10)
		for _, addr := range t.ConnAddrs {
			dialers.Go(func() error {
				if err := ctx.Err(); err != nil {
					return err
				}
				infos, err := peer.AddrInfosFromP2pAddrs(addr)
				if err != nil {
					wrapped := fmt.Errorf("%w: %s", err, addr)
					_ = ssam.AddErrConnecting(e, t.Mach, wrapped, nil)
					return nil
				}
				for _, info := range infos {
					t.log("Trying %s", info)
					if t.host.Connect(ctx, info) == nil {
						break
					}
				}
				return nil
			})
		}
		_ = dialers.Wait()

		if ctx.Err() != nil {
			return
		}
		if t.ConnCount() > 0 {
			t.Mach.EvAdd1(e, ss.Connected, nil)
			return
		}
		_ = ssam.AddErrConnecting(e, t.Mach,
			errors.New("failed to establish any connections"), nil)
	}()
}
// JoiningState joins the topic t.Name and subscribes to it (1024-message
// buffer) in the background. It stores both on t and adds Joined. Failures are
// reported via ErrJoining; results arriving after Joining ended are dropped.
func (t *Topic) JoiningState(e *am.Event) {
	ctx := t.Mach.NewStateCtx(ss.Joining)
	go func() {
		if ctx.Err() != nil {
			return
		}

		joined, err := t.ps.Join(t.Name)
		switch {
		case ctx.Err() != nil:
			return
		case err != nil:
			_ = AddErrJoining(e, t.Mach, err, nil)
			return
		}
		t.T = joined

		sub, err := joined.Subscribe(ps.WithBufferSize(1024))
		switch {
		case ctx.Err() != nil:
			return
		case err != nil:
			_ = AddErrJoining(e, t.Mach, err, nil)
			return
		}
		t.sub = sub

		t.Mach.EvAdd1(e, ss.Joined, nil)
	}()
}
// ProcessMsgsState decodes a batch of raw pubsub messages and dispatches each
// one by its Msg.Type:
//   - MsgTypeInfo -> MsgInfo, MissPeersByGossip, MissUpdatesByGossip
//   - MsgTypeBye -> MsgBye
//   - MsgTypeUpdates -> MsgUpdates
//   - MsgTypeGossip -> MissPeersByGossip, MissUpdatesByGossip
//   - MsgTypeReqInfo -> MsgReqInfo
//   - MsgTypeReqUpdates -> MsgReqUpdates
//
// Nil slots (unused buffer space) are skipped. Messages which don't decode as
// a protocol Msg are passed on via MsgReceived. Decoding errors of known types
// are reported via EvAddErr, unknown types via ErrListening.
func (t *Topic) ProcessMsgsState(e *am.Event) {
	mach := t.Mach
	for _, psMsg := range ParseArgs(e.Args).Msgs {
		if psMsg == nil {
			continue
		}
		fromId := psMsg.GetFrom().String()

		var base Msg
		if err := msgpack.Unmarshal(psMsg.Data, &base); err != nil {
			// not a protocol message, pass it on as-is
			mach.Add1(ss.MsgReceived, Pass(&A{
				Msgs:   []*ps.Message{psMsg},
				Length: 1,
			}))
			continue
		}

		switch base.Type {
		case MsgTypeInfo:
			var msg MsgInfo
			if err := msgpack.Unmarshal(psMsg.Data, &msg); err == nil {
				mach.Add1(ss.MsgInfo, Pass(&A{
					MsgInfo: &msg,
					PeerId:  fromId,
					Peer:    t.peerName(fromId),
				}))
				amhelp.AskAdd1(mach, ss.MissPeersByGossip, Pass(&A{
					PeersGossip: msg.PeerGossips,
					PeerId:      fromId,
					Peer:        t.peerName(fromId),
				}))
				amhelp.AskAdd1(mach, ss.MissUpdatesByGossip, Pass(&A{
					PeersGossip: msg.PeerGossips,
					PeerId:      fromId,
				}))
			} else {
				mach.EvAddErr(e, err, nil)
			}

		case MsgTypeBye:
			var msg MsgBye
			if err := msgpack.Unmarshal(psMsg.Data, &msg); err == nil {
				mach.Add1(ss.MsgBye, Pass(&A{
					MsgBye: &msg,
					PeerId: fromId,
				}))
			} else {
				// report like every other message type, instead of dropping
				mach.EvAddErr(e, err, nil)
			}

		case MsgTypeUpdates:
			var msg MsgUpdates
			if err := msgpack.Unmarshal(psMsg.Data, &msg); err == nil {
				mach.Add1(ss.MsgUpdates, Pass(&A{
					MsgUpdates: &msg,
					PeerId:     fromId,
				}))
			} else {
				mach.EvAddErr(e, err, nil)
			}

		case MsgTypeGossip:
			var msg MsgGossip
			if err := msgpack.Unmarshal(psMsg.Data, &msg); err == nil {
				amhelp.AskAdd1(mach, ss.MissPeersByGossip, Pass(&A{
					PeersGossip: msg.PeerGossips,
					PeerId:      fromId,
				}))
				amhelp.AskAdd1(mach, ss.MissUpdatesByGossip, Pass(&A{
					PeersGossip: msg.PeerGossips,
					PeerId:      fromId,
				}))
			} else {
				mach.EvAddErr(e, err, nil)
			}

		case MsgTypeReqInfo:
			var msg MsgReqInfo
			if err := msgpack.Unmarshal(psMsg.Data, &msg); err == nil {
				amhelp.AskAdd1(mach, ss.MsgReqInfo, Pass(&A{
					MsgReqInfo: &msg,
					PeerId:     fromId,
					Peer:       t.peerName(fromId),
					HTime:      time.Now(),
				}))
			} else {
				mach.EvAddErr(e, err, nil)
			}

		case MsgTypeReqUpdates:
			var msg MsgReqUpdates
			if err := msgpack.Unmarshal(psMsg.Data, &msg); err == nil {
				amhelp.AskAdd1(mach, ss.MsgReqUpdates, Pass(&A{
					MsgReqUpdates: &msg,
					PeerId:        fromId,
					Peer:          t.peerName(fromId),
				}))
			} else {
				mach.EvAddErr(e, err, nil)
			}

		default:
			err := fmt.Errorf("unsupported msg type: %s", base.Type)
			_ = AddErrListening(nil, mach, err, nil)
		}
	}
}
// JoinedState starts the background workers of a joined topic, all bound to
// the Joined state's context:
//   - a subscription reader, which batches incoming messages (skipping own
//     messages and shedding load when Mach's queue is longer than
//     MaxQueueLen) and passes full batches to ProcessMsgs
//   - a 1s flusher, which passes partial batches to ProcessMsgs
//   - a peer-event listener, which adds PeerJoined / PeerLeft
//   - when there are exposed machines, a one-off SendInfo of this host after
//     a random 0.1s-2.9s delay
func (t *Topic) JoinedState(e *am.Event) {
	mach := t.Mach
	ctx := mach.NewStateCtx(ss.Joined)
	self := t.host.ID().String()
	t.retried = false
	if !e.IsValid() {
		return
	}

	const bufSize = 50
	msgs := make([]*ps.Message, bufSize)
	msgsMx := sync.Mutex{}
	msgsI := 0
	// flushLocked passes the buffered batch to ProcessMsgs and starts a fresh
	// buffer. Must be called with msgsMx held.
	flushLocked := func() {
		mach.Add1(ss.ProcessMsgs, Pass(&A{
			Msgs:   msgs,
			Length: msgsI,
		}))
		msgs = make([]*ps.Message, bufSize)
		msgsI = 0
	}

	// subscription reader
	go func() {
		for {
			psMsg, err := t.sub.Next(ctx)
			if ctx.Err() != nil {
				return
			}
			if err != nil {
				_ = AddErrListening(e, mach, err, nil)
				continue
			}
			// skip own messages, and shed load when the queue is too long
			if psMsg.GetFrom().String() == self ||
				mach.QueueLen() > t.MaxQueueLen {
				continue
			}

			msgsMx.Lock()
			// store first, then flush a full buffer, so that no message is lost
			msgs[msgsI] = psMsg
			msgsI++
			if msgsI >= bufSize {
				flushLocked()
			}
			msgsMx.Unlock()
		}
	}()

	// periodic flusher of partial batches
	go func() {
		if ctx.Err() != nil {
			return
		}
		tick := time.NewTicker(1 * time.Second)
		defer tick.Stop()
		for {
			select {
			case <-ctx.Done():
				// without this return the loop would spin forever on the
				// closed Done channel
				return
			case <-tick.C:
				msgsMx.Lock()
				if msgsI > 0 {
					flushLocked()
				}
				msgsMx.Unlock()
			}
		}
	}()

	// topic peer events
	go func() {
		if ctx.Err() != nil {
			return
		}
		handler, err := t.T.EventHandler()
		if err != nil {
			_ = AddErrJoining(nil, mach, err, nil)
			return
		}
		t.handler = handler
		for {
			pEv, err := t.handler.NextPeerEvent(ctx)
			if ctx.Err() != nil {
				return
			}
			if err != nil {
				_ = AddErrListening(nil, mach, err, nil)
				continue
			}
			fromId := pEv.Peer.String()
			switch pEv.Type {
			case ps.PeerJoin:
				mach.Add1(ss.PeerJoined, Pass(&A{
					PeerId: fromId,
					Peer:   t.peerName(fromId),
				}))
			case ps.PeerLeave:
				mach.Add1(ss.PeerLeft, Pass(&A{
					PeerId: fromId,
					Peer:   t.peerName(fromId),
				}))
			}
		}
	}()

	// announce own machines after a random delay
	if len(t.ExposedMachs) > 0 {
		go func() {
			minDelay := 1
			maxDelay := 30
			delay := minDelay + rand.Intn(maxDelay-minDelay)
			if !amhelp.Wait(ctx, time.Duration(delay*100*int(time.Millisecond))) {
				return
			}
			mach.Add1(ss.SendInfo, Pass(&A{
				PeerIds: []string{t.host.ID().String()},
			}))
		}()
	}
}
// JoinedEnd cancels the topic's peer-event handler, if one was created.
func (t *Topic) JoinedEnd(e *am.Event) {
	if h := t.handler; h != nil {
		h.Cancel()
	}
}
// PeerLeftEnter requires a peer ID.
func (t *Topic) PeerLeftEnter(e *am.Event) bool {
	peerId := ParseArgs(e.Args).PeerId
	return peerId != ""
}
// MsgInfoEnter accepts a MsgInfo only if it carries something new. That is
// one of:
//   - a peer currently marked as missing
//   - an unknown peer
//   - an unknown machine index of a known peer
func (t *Topic) MsgInfoEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil || args.MsgInfo == nil || args.PeerId == "" {
		return false
	}

	for pid, machInfos := range args.MsgInfo.PeerInfo {
		if _, missing := t.missingPeers[pid]; missing {
			return true
		}
		known, ok := t.workers[pid]
		if !ok {
			return true
		}
		for idx := range machInfos {
			if _, have := known[idx]; !have {
				t.log("Missing mach %d for peer %s", idx, pid)
				return true
			}
		}
	}

	return false
}
// MsgInfoState materializes the remote machines announced in a MsgInfo.
//
// For every announced peer except self it does the following:
//   - ensures the per-peer worker and info maps exist
//   - clears the peer from missingPeers (removing MissPeersByGossip)
//   - for each machine index, either fast-forwards the existing worker's
//     clock when the announced clock sum is higher, or creates a new
//     rpc.Worker from the announced schema and state names (TestSchema and
//     TestStates override them when set)
//
// A new worker's clock is taken from a buffered pendingMachUpdates entry if
// one exists (which also removes MissPeersByUpdates), otherwise from the
// announced MTime. If the sender itself is still unknown afterwards, it is
// marked as missing.
func (t *Topic) MsgInfoState(e *am.Event) {
	// NOTE(review): workers are bound to the Start state's context (not
	// MsgInfo's), presumably so they outlive this transition - confirm.
	ctx := t.Mach.NewStateCtx(ss.Start)
	args := ParseArgs(e.Args)
	added := 0
	self := t.host.ID().String()
	for peerId, machs := range args.MsgInfo.PeerInfo {
		if peerId == self {
			continue
		}
		// first info from this peer
		if _, ok := t.workers[peerId]; !ok {
			t.workers[peerId] = make(map[int]*rpc.Worker)
			t.info[peerId] = make(map[int]*Info)
		}
		if _, ok := t.missingPeers[peerId]; ok {
			delete(t.missingPeers, peerId)
			t.Mach.EvRemove1(e, ss.MissPeersByGossip, nil)
		}
		t.metric("Info", peerId)
		t.metric("Peer", "")
		for machIdx, info := range machs {
			// known machine - only fast-forward its clock
			if w, ok := t.workers[peerId][machIdx]; ok {
				if info.MTime.Sum(nil) > w.Time(nil).Sum(nil) {
					w.InternalUpdateClock(info.MTime, 0, true)
				}
				continue
			}
			tags := []string{"pubsub-worker", "src-id:" + info.Id}
			id := "ps-" + info.Id + "-" + utils.RandId(4)
			schema := info.Schema
			if t.TestSchema != nil {
				schema = t.TestSchema
			}
			names := info.States
			if t.TestStates != nil {
				names = t.TestStates
			}
			worker, err := rpc.NewWorker(ctx, id, nil, schema, names, t.Mach, tags)
			if err != nil {
				t.Mach.EvAddErr(e, err, nil)
				continue
			}
			// debug telemetry is hard-coded to the peer named "P5"
			if t.DebugWorkerTelemetry {
				if t.peerName(peerId) == "P5" {
					amhelp.MachDebugEnv(worker)
				}
			}
			// prefer a clock buffered by MissPeersByUpdates over the announced one
			if mtime, ok := t.pendingMachUpdates[peerId][machIdx]; ok {
				worker.InternalUpdateClock(mtime, 0, true)
				delete(t.pendingMachUpdates[peerId], machIdx)
				if len(t.pendingMachUpdates[peerId]) == 0 {
					delete(t.pendingMachUpdates, peerId)
				}
				t.Mach.EvRemove1(e, ss.MissPeersByUpdates, nil)
			} else {
				worker.InternalUpdateClock(info.MTime, 0, true)
			}
			t.workers[peerId][machIdx] = worker
			t.info[peerId][machIdx] = info
			added++
		}
	}
	// the sender may have announced only other peers
	fromPeerId := args.PeerId
	if _, ok := t.workers[fromPeerId]; !ok {
		t.missingPeers[fromPeerId] = struct{}{}
		t.metric("Gossip", fromPeerId)
	}
	if added > 0 {
		t.log("Added %d new workers (total %d)", added, t.workersCount())
		t.log("Known peers == %d (missing == %d)",
			len(t.workers), len(t.missingPeers))
		// log a single (random) missing peer as an example
		for pid := range t.missingPeers {
			name := t.peerName(pid)
			if name == "" {
				name = pid
			}
			t.log("Missing example: %s", name)
			break
		}
	}
}
// MissPeersByGossipEnter accepts gossip mentioning a peer (other than self and
// not already missing) which is unknown, or whose advertised machine count
// differs from the local worker count.
func (t *Topic) MissPeersByGossipEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil || args.PeersGossip == nil {
		return false
	}

	selfId := t.host.ID().String()
	for pid, gossip := range args.PeersGossip {
		if pid == selfId {
			continue
		}
		if _, alreadyMissing := t.missingPeers[pid]; alreadyMissing {
			continue
		}
		known, ok := t.workers[pid]
		if !ok || len(gossip) != len(known) {
			return true
		}
	}

	return false
}
// MissPeersByGossipState marks every gossiped peer without workers (other than
// self) as missing, and logs each newly missing one.
func (t *Topic) MissPeersByGossipState(e *am.Event) {
	args := ParseArgs(e.Args)
	selfId := t.host.ID().String()

	for pid := range args.PeersGossip {
		if pid == selfId {
			continue
		}
		if _, missing := t.missingPeers[pid]; missing {
			continue
		}
		if _, known := t.workers[pid]; known {
			continue
		}

		t.missingPeers[pid] = struct{}{}
		label := t.peerName(pid)
		if label == "" {
			label = pid
		}
		t.log("New missing %s", label)
		t.log("Known: %d; Missing: %d",
			len(t.workers), len(t.missingPeers))
	}
}
// MissPeersByGossipExit allows leaving only when no peers are missing.
func (t *Topic) MissPeersByGossipExit(e *am.Event) bool {
	noneMissing := len(t.missingPeers) == 0
	return noneMissing
}
// MissUpdatesByGossipEnter accepts gossip advertising a clock sum higher than
// that of a local worker of a known peer (self excluded).
func (t *Topic) MissUpdatesByGossipEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil || args.PeersGossip == nil {
		return false
	}

	selfId := t.host.ID().String()
	for pid, sums := range args.PeersGossip {
		if pid == selfId {
			continue
		}
		known, ok := t.workers[pid]
		if !ok {
			continue
		}
		for idx, remoteSum := range sums {
			w, found := known[idx]
			if found && remoteSum > w.Time(nil).Sum(nil) {
				return true
			}
		}
	}

	return false
}
// MissUpdatesByGossipState records, per known peer, every machine index whose
// gossiped clock sum is newer than the local worker (or has no worker). Peers
// left with nothing missing are removed from missingUpdates.
func (t *Topic) MissUpdatesByGossipState(e *am.Event) {
	args := ParseArgs(e.Args)
	selfId := t.host.ID().String()

	for pid, sums := range args.PeersGossip {
		if pid == selfId {
			continue
		}
		known, ok := t.workers[pid]
		if !ok {
			continue
		}

		pending, exists := t.missingUpdates[pid]
		if !exists {
			pending = make(Gossips)
			t.missingUpdates[pid] = pending
		}
		for idx, remoteSum := range sums {
			w, found := known[idx]
			if found && remoteSum <= w.Time(nil).Sum(nil) {
				continue
			}
			pending[idx] = remoteSum
			t.metric("Gossip", pid)
		}
		if len(pending) == 0 {
			delete(t.missingUpdates, pid)
		}
	}
}
// MissUpdatesByGossipExit allows leaving only when no updates are missing.
func (t *Topic) MissUpdatesByGossipExit(e *am.Event) bool {
	noneMissing := len(t.missingUpdates) == 0
	return noneMissing
}
// PeerJoinedEnter requires a peer ID.
func (t *Topic) PeerJoinedEnter(e *am.Event) bool {
	peerId := ParseArgs(e.Args).PeerId
	return peerId != ""
}
// PeerJoinedState is a one-shot state. It marks the joined peer as missing
// and, unless the queue is over MaxQueueLen, schedules SendInfo for this host.
func (t *Topic) PeerJoinedState(e *am.Event) {
	t.Mach.EvRemove1(e, ss.PeerJoined, nil)
	t.missingPeers[ParseArgs(e.Args).PeerId] = struct{}{}

	if t.Mach.QueueLen() <= t.MaxQueueLen {
		t.Mach.EvAdd1(e, ss.SendInfo, Pass(&A{
			PeerIds: []string{t.host.ID().String()},
		}))
	}
}
// MsgByeEnter requires a MsgBye payload.
func (t *Topic) MsgByeEnter(e *am.Event) bool {
	bye := ParseArgs(e.Args).MsgBye
	return bye != nil
}
// MsgReceivedEnter requires a (non-nil) message list.
func (t *Topic) MsgReceivedEnter(e *am.Event) bool {
	received := ParseArgs(e.Args).Msgs
	return received != nil
}
// SendMsgEnter requires a non-empty payload. It also rate-limits outgoing
// messages to MaxMsgsPerWin per time window of Multiplayer seconds, logging
// and rejecting the ones over the limit. Only valid messages count toward the
// limit.
func (t *Topic) SendMsgEnter(e *am.Event) bool {
	a := ParseArgs(e.Args)
	if a == nil || len(a.Msg) == 0 {
		return false
	}

	// Absolute Unix seconds keep windows from wrapping every minute (which
	// time.Now().Second() did), and max() guards against Multiplayer == 0.
	win := int(time.Now().Unix()) / max(t.Multiplayer, 1)
	if t.msgsWin != win {
		t.msgsWin = win
		t.msgsWinCount = 0
	}
	t.msgsWinCount++
	if t.msgsWinCount > t.MaxMsgsPerWin {
		t.log("Too many messages in last time window, dropping..")
		return false
	}

	return true
}
// SendMsgState publishes the encoded message on the topic via the worker
// pool. For ReqInfo / ReqUpdates messages it records the time of the request.
// The publish result is reported via EvAddErr, unless Start ended meanwhile.
func (t *Topic) SendMsgState(e *am.Event) {
	ctx := t.Mach.NewStateCtx(ss.Start)
	args := ParseArgs(e.Args)
	payload := args.Msg

	if args.MsgType == string(MsgTypeReqInfo) {
		t.lastReqHello = time.Now()
	} else if args.MsgType == string(MsgTypeReqUpdates) {
		t.lastReqUpdate = time.Now()
	}

	if !e.IsValid() {
		return
	}
	t.pool.Go(func() error {
		if ctx.Err() == nil {
			err := t.T.Publish(ctx, payload)
			if ctx.Err() == nil {
				t.Mach.EvAddErr(e, err, nil)
			}
		}
		return nil
	})
}
// SendInfoEnter requires at least one peer ID.
func (t *Topic) SendInfoEnter(e *am.Event) bool {
	peers := ParseArgs(e.Args).PeerIds
	return len(peers) != 0
}
// SendInfoState debounces info announcements. After a random delay in
// [0, 2*SendInfoDebounceMs*Multiplayer) milliseconds it adds DoSendInfo for
// the requested peer IDs. SendInfo is removed afterwards, or right away when
// the event is no longer valid.
func (t *Topic) SendInfoState(e *am.Event) {
	ctx := t.Mach.NewStateCtx(ss.SendInfo)
	args := ParseArgs(e.Args)

	debounce := t.SendInfoDebounceMs * t.Multiplayer
	debounceMax := debounce * 2
	// rand.Intn panics for n <= 0, e.g. with SendInfoDebounceMs == 0
	var delay time.Duration
	if debounceMax > 0 {
		delay = time.Duration(rand.Intn(debounceMax)) * time.Millisecond
	}

	if !e.IsValid() {
		t.Mach.EvRemove1(e, ss.SendInfo, nil)
		return
	}
	go func() {
		defer t.Mach.EvRemove1(e, ss.SendInfo, nil)
		if !amhelp.Wait(ctx, delay) {
			return
		}
		t.Mach.EvAdd1(e, ss.DoSendInfo, Pass(&A{
			PeerIds: args.PeerIds,
		}))
	}()
}
// MsgUpdatesEnter requires a MsgUpdates payload and a sender peer ID.
func (t *Topic) MsgUpdatesEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}
	return args.MsgUpdates != nil && args.PeerId != ""
}
// MsgUpdatesState applies the machine clocks from a MsgUpdates message.
//
// Clocks of unknown peers are forwarded to MissPeersByUpdates, which buffers
// them. For known peers, each worker's clock is fast-forwarded when the
// received clock sum is higher. Machine indexes without a worker are only
// logged. Entries in missingUpdates satisfied by the received clock are
// cleared. An unknown sender is marked as missing.
func (t *Topic) MsgUpdatesState(e *am.Event) {
	args := ParseArgs(e.Args)
	self := t.host.ID().String()
	for peerId, machs := range args.MsgUpdates.PeerClocks {
		if peerId == self {
			continue
		}
		workers, ok := t.workers[peerId]
		// unknown peer - buffer the clocks until its info arrives
		if !ok {
			t.Mach.EvAdd1(e, ss.MissPeersByUpdates, Pass(&A{
				MachClocks: machs,
				PeerId:     peerId,
				Peer:       t.peerName(peerId),
			}))
			continue
		}
		missing := t.missingUpdates[peerId]
		t.metric("Update", peerId)
		for machIdx, mtime := range machs {
			w, ok := workers[machIdx]
			if !ok {
				t.log("worker %s:%d not found, delaying..", peerId, machIdx)
				continue
			}
			// only move clocks forward
			if mtime.Sum(nil) > w.Time(nil).Sum(nil) {
				w.InternalUpdateClock(mtime, 0, true)
			}
			if missing == nil {
				continue
			}
			// the advertised (gossiped) sum has been reached
			if missingMTime, ok := missing[machIdx]; ok &&
				missingMTime <= mtime.Sum(nil) {
				delete(missing, machIdx)
				if len(missing) == 0 {
					delete(t.missingUpdates, peerId)
				}
			}
		}
	}
	fromPeerId := args.PeerId
	if _, ok := t.workers[fromPeerId]; !ok {
		t.missingPeers[fromPeerId] = struct{}{}
	}
}
// MsgReqInfoEnter accepts an info request which names at least one peer this
// host can answer for (itself or a known peer). When workers are known, the
// request is also randomly ignored with probability 1/len(workers), so that
// not every peer replies.
func (t *Topic) MsgReqInfoEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil || args.MsgReqInfo == nil || args.PeerId == "" {
		return false
	}

	known := len(t.workers)
	selfId := t.host.ID().String()
	idx := slices.IndexFunc(args.MsgReqInfo.PeerIds, func(pid string) bool {
		_, ok := t.workers[pid]
		return ok || pid == selfId
	})
	if idx < 0 {
		return false
	}

	return known == 0 || rand.Intn(known) != 0
}
// MsgReqInfoState answers an info request. It records the request time for
// every requested peer, marks an unknown sender as missing, and adds SendInfo
// for the requested peers this host can answer for.
func (t *Topic) MsgReqInfoState(e *am.Event) {
	args := ParseArgs(e.Args)
	selfId := t.host.ID().String()

	answerable := []string{}
	for _, pid := range args.MsgReqInfo.PeerIds {
		_, known := t.workers[pid]
		if known || pid == selfId {
			answerable = append(answerable, pid)
		}
		t.reqInfo[pid] = time.Now()
	}

	if _, known := t.workers[args.PeerId]; !known {
		t.missingPeers[args.PeerId] = struct{}{}
	}

	t.Mach.Add1(ss.SendInfo, Pass(&A{PeerIds: answerable}))
}
// MsgReqUpdatesEnter accepts an updates request which names at least one peer
// this host can answer for (itself or a known peer). When workers are known,
// the request is also randomly ignored with probability 1/len(workers), so
// that not every peer replies.
func (t *Topic) MsgReqUpdatesEnter(e *am.Event) bool {
	a := ParseArgs(e.Args)
	// requests arrive as MsgReqUpdates (see ProcessMsgsState); checking
	// MsgReqInfo here meant this state could never activate
	if a == nil || a.MsgReqUpdates == nil || a.PeerId == "" {
		return false
	}

	wCount := len(t.workers)
	self := t.host.ID().String()
	for _, peerId := range a.MsgReqUpdates.PeerIds {
		if _, ok := t.workers[peerId]; ok || peerId == self {
			// rand.Intn panics for 0, which happens when only self matched
			return wCount == 0 || rand.Intn(wCount) != 0
		}
	}

	return false
}
// MsgReqUpdatesState answers an updates request. It collects the current
// clocks of the requested peers this host knows (its exposed machines for
// self, worker replicas for others), and records the request time per peer.
// It marks an unknown sender as missing. Unless nothing was collected, it
// encodes a MsgUpdates in the worker pool and adds SendMsg.
func (t *Topic) MsgReqUpdatesState(e *am.Event) {
	args := ParseArgs(e.Args)
	ctx := t.Mach.NewStateCtx(ss.Start)
	self := t.host.ID().String()
	update := &MsgUpdates{
		Msg:        Msg{MsgTypeUpdates},
		PeerClocks: make(map[string]MachClocks),
	}
	for _, peerId := range args.MsgReqUpdates.PeerIds {
		// own exposed machines
		if peerId == self {
			update.PeerClocks[peerId] = make(MachClocks)
			for i, mach := range t.ExposedMachs {
				update.PeerClocks[peerId][i] = mach.Time(nil)
			}
		}
		t.reqUpdate[peerId] = time.Now()
		// replicas of a known peer (overwrites the entry above, if any)
		if machs, ok := t.workers[peerId]; ok {
			update.PeerClocks[peerId] = make(MachClocks)
			for i, mach := range machs {
				update.PeerClocks[peerId][i] = mach.Time(nil)
			}
		}
	}
	fromPeerId := args.PeerId
	if _, ok := t.workers[fromPeerId]; !ok {
		t.missingPeers[fromPeerId] = struct{}{}
	}
	if len(update.PeerClocks) <= 0 {
		return
	}
	if !e.IsValid() {
		return
	}
	t.pool.Go(func() error {
		if ctx.Err() != nil {
			return nil
		}
		encoded, err := msgpack.Marshal(update)
		if err != nil {
			t.Mach.EvAddErr(e, err, nil)
			return nil
		}
		t.Mach.EvAdd1(e, ss.SendMsg, Pass(&A{
			Msg:     encoded,
			MsgType: string(MsgTypeUpdates),
		}))
		return nil
	})
}
// MissPeersByUpdatesEnter requires machine clocks and a peer ID.
func (t *Topic) MissPeersByUpdatesEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil {
		return false
	}
	return args.MachClocks != nil && args.PeerId != ""
}
// MissPeersByUpdatesState buffers clocks received for a peer without workers
// in pendingMachUpdates. For each machine index it keeps the clock with the
// highest sum.
func (t *Topic) MissPeersByUpdatesState(e *am.Event) {
	args := ParseArgs(e.Args)
	incoming := args.MachClocks
	pid := args.PeerId

	stored, ok := t.pendingMachUpdates[pid]
	if !ok {
		t.pendingMachUpdates[pid] = incoming
		t.metric("Gossip", pid)
		return
	}

	for idx, fresh := range incoming {
		old, seen := stored[idx]
		if seen && fresh.Sum(nil) <= old.Sum(nil) {
			continue
		}
		stored[idx] = fresh
		t.metric("Gossip", pid)
	}
}
// MissPeersByUpdatesExit allows leaving only when no updates are buffered.
func (t *Topic) MissPeersByUpdatesExit(e *am.Event) bool {
	nonePending := len(t.pendingMachUpdates) == 0
	return nonePending
}
// ListMachinesEnter requires a buffered result channel.
func (t *Topic) ListMachinesEnter(e *am.Event) bool {
	args := ParseArgs(e.Args)
	if args == nil || args.WorkersCh == nil {
		return false
	}
	return cap(args.WorkersCh) > 0
}
// ListMachinesState is a one-shot state. It sends all workers matching every
// set filter (exact ID, ID regexp, partial ID, peer ID, parent ID) to the
// WorkersCh channel from the args.
func (t *Topic) ListMachinesState(e *am.Event) {
	t.Mach.EvRemove1(e, ss.ListMachines, nil)
	args := ParseArgs(e.Args)
	f := args.ListFilters
	if f == nil {
		f = &ListFilters{}
	}

	matches := func(peerId string, w *rpc.Worker) bool {
		switch {
		case f.IdExact != "" && w.Id() != f.IdExact:
			return false
		case f.IdRegexp != nil && !f.IdRegexp.MatchString(w.Id()):
			return false
		case f.IdPartial != "" && !strings.Contains(w.Id(), f.IdPartial):
			return false
		case f.PeerId != "" && peerId != f.PeerId:
			return false
		case f.Parent != "" && w.ParentId() != f.Parent:
			return false
		}
		return true
	}

	found := make([]*rpc.Worker, 0)
	for peerId, workers := range t.workers {
		for _, w := range workers {
			if matches(peerId, w) {
				found = append(found, w)
			}
		}
	}

	t.log("listing %d workers", len(found))
	args.WorkersCh <- found
}
func (t *Topic ) HeartbeatState (e *am .Event ) {
mach := t .Mach
mach .EvRemove1 (e , ss .Heartbeat , nil )
mach .EvRemove1 (e , ss .MissPeersByGossip , nil )
mach .EvRemove1 (e , ss .MissPeersByUpdates , nil )
delegated := am .S {ss .SendGossips , ss .ReqMissingUpdates , ss .ReqMissingPeers }
for _ , state := range delegated {
if mach .Not1 (state ) && !mach .WillBe1 (state ) {
mach .EvAdd1 (e , state , nil )
}
}
if mach .Is1 (ss .SendUpdates ) || mach .WillBe1 (ss .SendUpdates ) {
return
}
clocks := make (MachClocks )
for i , mach := range t .ExposedMachs {
if !t .tracers [i ].dirty .Load () {
continue
}
mtime := mach .Time (nil )
t .tracers [i ].dirty .Store (false )
clocks [i ] = mtime
}
mach .EvAdd1 (e , ss .SendUpdates , Pass (&A {
MachClocks : clocks ,
}))
}
// SendUpdatesEnter requires at least one machine clock.
func (t *Topic) SendUpdatesEnter(e *am.Event) bool {
	clocks := ParseArgs(e.Args).MachClocks
	return len(clocks) != 0
}
// SendUpdatesState is a one-shot state. It encodes the given clocks of this
// host's exposed machines as a MsgUpdates and adds SendMsg.
func (t *Topic) SendUpdatesState(e *am.Event) {
	defer t.Mach.EvRemove1(e, ss.SendUpdates, nil)

	clocks := ParseArgs(e.Args).MachClocks
	selfId := t.host.ID().String()
	update := &MsgUpdates{
		Msg:        Msg{MsgTypeUpdates},
		PeerClocks: map[string]MachClocks{selfId: clocks},
	}

	if !e.IsValid() {
		return
	}
	encoded, err := msgpack.Marshal(update)
	if err != nil {
		t.Mach.EvAddErr(e, err, nil)
		return
	}
	t.Mach.EvAdd1(e, ss.SendMsg, Pass(&A{
		Msg:     encoded,
		MsgType: string(MsgTypeUpdates),
	}))
}
// SendGossipsEnter requires known workers. It then passes with probability
// 1/len(workers), so that the gossip load is spread across peers.
func (t *Topic) SendGossipsEnter(e *am.Event) bool {
	n := len(t.workers)
	return n > 0 && rand.Intn(n) == 0
}
// SendGossipsState samples GossipAmount random known peers (with repetition)
// and collects their per-machine clock sums. It then encodes them as a
// MsgGossip in the background and adds SendMsg. SendGossips is removed
// afterwards.
func (t *Topic) SendGossipsState(e *am.Event) {
	ctx := t.Mach.NewStateCtx(ss.SendGossips)
	candidates := slices.Collect(maps.Keys(t.workers))

	gossips := PeerGossips{}
	for i := 0; i < t.GossipAmount; i++ {
		pid := candidates[rand.Intn(len(candidates))]
		workers := t.workers[pid]
		if len(workers) == 0 {
			continue
		}
		sums := make(map[int]uint64, len(workers))
		for idx, w := range workers {
			sums[idx] = w.Time(nil).Sum(nil)
		}
		gossips[pid] = sums
	}

	if !e.IsValid() {
		t.Mach.EvRemove1(e, ss.SendGossips, nil)
		return
	}
	go func() {
		defer t.Mach.EvRemove1(e, ss.SendGossips, nil)
		if ctx.Err() != nil {
			return
		}
		encoded, err := msgpack.Marshal(&MsgGossip{
			Msg:         Msg{MsgTypeGossip},
			PeerGossips: gossips,
		})
		if err != nil {
			t.Mach.EvAddErr(e, err, nil)
			return
		}
		t.Mach.EvAdd1(e, ss.SendMsg, Pass(&A{
			Msg:     encoded,
			MsgType: string(MsgTypeGossip),
		}))
	}()
}
// reqMissPeersFreq is the minimum interval between two ReqInfo broadcasts.
var reqMissPeersFreq = 5 * time.Second
// ReqMissingPeersEnter requires missing peers (or buffered updates of unknown
// peers), and that the last ReqInfo was sent more than reqMissPeersFreq ago.
func (t *Topic) ReqMissingPeersEnter(e *am.Event) bool {
	nothingMissing := len(t.missingPeers) == 0 && len(t.pendingMachUpdates) == 0
	if nothingMissing {
		return false
	}
	return time.Since(t.lastReqHello) > reqMissPeersFreq
}
// ReqMissingPeersState requests info about up to 5 randomly picked missing
// peers, using at most 15 random draws. Peers requested within the last
// 5s*Multiplayer are skipped. It encodes a MsgReqInfo in the worker pool and
// adds SendMsg. ReqMissingPeers is removed afterwards, or right away when
// nothing was picked.
func (t *Topic) ReqMissingPeersState(e *am.Event) {
	mach := t.Mach
	ctx := mach.NewStateCtx(ss.ReqMissingPeers)
	const (
		amount   = 5
		maxTries = 15
	)
	cooldown := 5 * time.Second * time.Duration(t.Multiplayer)

	candidates := slices.Concat(
		slices.Collect(maps.Keys(t.missingPeers)),
		slices.Collect(maps.Keys(t.pendingMachUpdates)))
	picked := make([]string, 0, amount)
	for attempt := 0; attempt < maxTries && len(picked) < amount; attempt++ {
		pid := candidates[rand.Intn(len(candidates))]
		if slices.Contains(picked, pid) {
			continue
		}
		if last, ok := t.reqInfo[pid]; ok && time.Since(last) < cooldown {
			continue
		}
		picked = append(picked, pid)
		t.reqInfo[pid] = time.Now()
	}

	if len(picked) == 0 {
		mach.EvRemove1(e, ss.ReqMissingPeers, nil)
		return
	}
	t.log("missing peers: %d", len(picked))
	if !e.IsValid() {
		mach.EvRemove1(e, ss.ReqMissingPeers, nil)
		return
	}

	t.pool.Go(func() error {
		defer mach.EvRemove1(e, ss.ReqMissingPeers, nil)
		if ctx.Err() != nil {
			return nil
		}
		req := &MsgReqInfo{
			Msg:     Msg{MsgTypeReqInfo},
			PeerIds: picked,
		}
		encoded, err := msgpack.Marshal(req)
		if err != nil {
			mach.EvAddErr(e, err, nil)
			return nil
		}
		mach.EvAdd1(e, ss.SendMsg, Pass(&A{
			Msg:     encoded,
			MsgType: string(req.Type),
			PeerId:  picked[0],
			Peer:    t.peerName(picked[0]),
		}))
		return nil
	})
}
// reqMissUpdatesFreq is the minimum interval between two ReqUpdates broadcasts.
var reqMissUpdatesFreq = 5 * time.Second
// ReqMissingUpdatesEnter requires missing updates, and that the last
// ReqUpdates was sent more than reqMissUpdatesFreq ago.
func (t *Topic) ReqMissingUpdatesEnter(e *am.Event) bool {
	if len(t.missingUpdates) == 0 {
		return false
	}
	return time.Since(t.lastReqUpdate) > reqMissUpdatesFreq
}
// ReqMissingUpdatesState requests updates from up to 5 randomly picked peers
// with missing updates, using at most 15 random draws. Peers requested within
// the last 5s*Multiplayer are skipped. It encodes a MsgReqUpdates in the
// worker pool and adds SendMsg. ReqMissingUpdates is removed afterwards, or
// right away when nothing was picked.
func (t *Topic) ReqMissingUpdatesState(e *am.Event) {
	ctx := t.Mach.NewStateCtx(ss.ReqMissingUpdates)
	const (
		amount   = 5
		maxTries = 15
	)
	cooldown := time.Duration(5*t.Multiplayer) * time.Second

	candidates := slices.Collect(maps.Keys(t.missingUpdates))
	picked := make([]string, 0, amount)
	for attempt := 0; attempt < maxTries && len(picked) < amount; attempt++ {
		pid := candidates[rand.Intn(len(candidates))]
		if slices.Contains(picked, pid) {
			continue
		}
		if last, ok := t.reqUpdate[pid]; ok && time.Since(last) < cooldown {
			continue
		}
		picked = append(picked, pid)
		t.reqUpdate[pid] = time.Now()
	}

	if len(picked) == 0 || !e.IsValid() {
		t.Mach.EvRemove1(e, ss.ReqMissingUpdates, nil)
		return
	}

	t.pool.Go(func() error {
		defer t.Mach.EvRemove1(e, ss.ReqMissingUpdates, nil)
		if ctx.Err() != nil {
			return nil
		}
		req := &MsgReqUpdates{
			Msg:     Msg{MsgTypeReqUpdates},
			PeerIds: picked,
		}
		encoded, err := msgpack.Marshal(req)
		if err != nil {
			t.Mach.EvAddErr(e, err, nil)
			return nil
		}
		t.Mach.EvAdd1(e, ss.SendMsg, Pass(&A{
			Msg:     encoded,
			MsgType: string(req.Type),
			PeerId:  picked[0],
			Peer:    t.peerName(picked[0]),
		}))
		return nil
	})
}
// DoSendInfoEnter requires at least one peer ID.
func (t *Topic) DoSendInfoEnter(e *am.Event) bool {
	peers := ParseArgs(e.Args).PeerIds
	return len(peers) != 0
}
// DoSendInfoState is a one-shot state. It builds a MsgInfo for the requested
// peer IDs, encodes it, and adds SendMsg.
//
// For self, it announces each exposed machine's ID, schema, state names,
// clock, tags and parent. Schema and names are omitted when TestSchema /
// TestStates are set. For other peers, it forwards the Info received earlier;
// unknown peers are skipped. The message also gossips the per-machine clock
// sums of up to GossipAmount randomly sampled known peers.
func (t *Topic) DoSendInfoState(e *am.Event) {
	t.Mach.EvRemove1(e, ss.DoSendInfo, nil)
	args := ParseArgs(e.Args)
	peerIds := args.PeerIds
	self := t.host.ID().String()
	exposed := slices.Clone(t.ExposedMachs)

	// sample known peers; the key list is loop-invariant, so collect it once
	gossips := PeerGossips{}
	ids := slices.Collect(maps.Keys(t.workers))
	for range min(t.GossipAmount, len(ids)) {
		pid := ids[rand.Intn(len(ids))]
		if len(t.workers[pid]) == 0 {
			continue
		}
		sums := map[int]uint64{}
		for i, w := range t.workers[pid] {
			sums[i] = w.Time(nil).Sum(nil)
		}
		gossips[pid] = sums
	}

	if !e.IsValid() {
		return
	}

	msg := &MsgInfo{
		Msg:         Msg{MsgTypeInfo},
		PeerInfo:    PeerInfo{},
		PeerGossips: gossips,
	}
	for _, peerId := range peerIds {
		machs := MachInfo{}
		if peerId == self {
			for machIdx, mach := range exposed {
				// with test overrides, receivers use their own schema / names
				var schema am.Schema
				if t.TestSchema == nil {
					schema = mach.Schema()
				}
				var names am.S
				if t.TestStates == nil {
					names = mach.StateNames()
				}
				machs[machIdx] = &Info{
					Id:     mach.Id(),
					Schema: schema,
					States: names,
					MTime:  mach.Time(nil),
					Tags:   mach.Tags(),
					Parent: mach.ParentId(),
				}
			}
		} else {
			info, ok := t.info[peerId]
			if !ok {
				continue
			}
			machs = info
		}
		msg.PeerInfo[peerId] = machs
	}

	encoded, err := msgpack.Marshal(msg)
	if err != nil {
		t.Mach.EvAddErr(e, err, nil)
		return
	}
	t.Mach.EvAdd1(e, ss.SendMsg, Pass(&A{
		Msg:     encoded,
		MsgType: string(MsgTypeInfo),
	}))
}
// Start activates the Start state, which brings up the libp2p host.
func (t *Topic) Start() am.Result {
	res := t.Mach.Add1(ss.Start, nil)
	return res
}
// ConnCount returns the number of open connections of the libp2p host. It
// returns 0 before the host exists (e.g. when Ready is evaluated before Start
// finished).
func (t *Topic) ConnCount() int {
	if t.host == nil {
		return 0
	}
	// libp2p's default connection manager is a BasicConnMgr, but a custom
	// one must not panic on the type assertion
	if cm, ok := t.host.ConnManager().(*connmgr.BasicConnMgr); ok {
		return cm.GetInfo().ConnCount
	}
	return len(t.host.Network().Conns())
}
// Join activates the Joining state, which joins and subscribes to the topic.
func (t *Topic) Join() am.Result {
	res := t.Mach.Add1(ss.Joining, nil)
	return res
}
// StartAndJoin activates Start, waits (bounded by ctx and
// amhelp.WaitForAll's 1s timeout) for Connected, and then calls Join. It
// returns am.Canceled when Start is canceled, the wait fails, or ctx is done.
func (t *Topic) StartAndJoin(ctx context.Context) am.Result {
	if t.Mach.Add1(ss.Start, nil) == am.Canceled {
		return am.Canceled
	}

	err := amhelp.WaitForAll(ctx, time.Second, t.Mach.When1(ss.Connected, ctx))
	if ctx.Err() != nil || err != nil {
		return am.Canceled
	}

	return t.Join()
}
// Dispose activates the Disposing state.
func (t *Topic) Dispose() am.Result {
	res := t.Mach.Add1(ss.Disposing, nil)
	return res
}
// GetPeerAddrs returns the host's listen addresses, each suffixed with
// /p2p/<peer ID>, so they can be dialed directly.
//
// It errors when Start isn't active, or when the host has no addresses.
func (t *Topic) GetPeerAddrs() ([]ma.Multiaddr, error) {
	if t.Mach.Not1(ss.Start) {
		return nil, fmt.Errorf("%w: %s", am.ErrStateInactive, ss.Start)
	}

	h := t.host
	selfId := h.ID()
	listen := h.Addrs()
	if len(listen) == 0 {
		return nil, errors.New("no listen addresses available")
	}

	p2pSuffix := ma.StringCast("/p2p/" + selfId.String())
	out := make([]ma.Multiaddr, 0, len(listen))
	for _, addr := range listen {
		out = append(out, addr.Encapsulate(p2pSuffix))
	}

	return out, nil
}
// workersCount returns the total number of workers across all peers.
func (t *Topic) workersCount() int {
	total := 0
	for peerId := range t.workers {
		total += len(t.workers[peerId])
	}
	return total
}
// log forwards to the machine's logger when LogEnabled is set.
func (t *Topic) log(msg string, args ...any) {
	if t.LogEnabled {
		t.Mach.Log(msg, args...)
	}
}
// peerName returns the display name mapped to pid in HostToPeer, or "" when
// there is none.
func (t *Topic) peerName(pid string) string {
	// a missing key (or nil map) yields the zero value ""
	return t.HostToPeer[pid]
}
// metric activates the "<PeerName><msg>" state (or "<msg><host>" when the
// host has no name) on MachMetrics. It returns true only if the metrics
// machine is set and has that state.
func (t *Topic) metric(msg, host string) bool {
	m := t.MachMetrics.Load()
	if m == nil {
		return false
	}

	state := msg + host
	if alias := t.peerName(host); alias != "" {
		state = alias + msg
	}

	if m.Has1(state) {
		m.Add1(state, nil)
		return true
	}
	return false
}
// syncMetrics re-activates, inside a machine Eval, the MachMetrics states
// for the current topic bookkeeping:
//   - a Gossip state per host in missingUpdates, pendingMachUpdates and
//     missingPeers
//   - an Info state and the Peer state per host in info
//
// Hosts with an entry in HostToPeer use "<PeerName><kind>", others use
// "<kind><hostId>". States missing from the metrics machine are skipped.
func (t *Topic) syncMetrics() {
	metric := t.MachMetrics.Load()
	if metric == nil {
		return
	}

	activate := func(state string) {
		if metric.Has1(state) {
			metric.Add1(state, nil)
		}
	}
	stateFor := func(kind, hostId string) string {
		if id, ok := t.HostToPeer[hostId]; ok {
			return id + kind
		}
		return kind + hostId
	}

	t.Mach.Eval("syncMetrics", func() {
		for hostId := range t.missingUpdates {
			activate(stateFor("Gossip", hostId))
		}
		for hostId := range t.pendingMachUpdates {
			activate(stateFor("Gossip", hostId))
		}
		for hostId := range t.missingPeers {
			activate(stateFor("Gossip", hostId))
		}
		for hostId := range t.info {
			activate(stateFor("Info", hostId))
			activate("Peer")
		}
	}, nil)
}
The pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the QR code on the left) to get the latest news about Golds.