package pubsubimport (pubsubamhelpam)const (// EnvAmPubsubLog enables machine logging for pubsub.EnvAmPubsubLog = "AM_PUBSUB_LOG"// EnvAmPubsubDbg exposes PubSub workers over dbg telemetry.EnvAmPubsubDbg = "AM_PUBSUB_DBG")var ss = states.TopicStates// ///// ///// /////// ///// ARGS// ///// ///// /////// TODO keep with states// PARTIALStypeMsgTypestringconst (MsgTypeUpdatesMsgType = "updates"MsgTypeByeMsgType = "bye"MsgTypeInfoMsgType = "info"MsgTypeReqInfoMsgType = "reqinfo"MsgTypeReqUpdatesMsgType = "requpdates"MsgTypeGossipMsgType = "gossip")typeMsgstruct { Type MsgType}type (Gossipsmap[int]uint64// PeerGossips is peer ID => machine time sum // TODO check local time sumPeerGossipsmap[string]Gossips)// Info is sent when a peer exposes state machines in the topic.typeInfostruct { Id string Schema am.Schema States am.S MTime am.Time Tags []string Parent string}type (MachInfomap[int]*InfoPeerInfomap[string]MachInfo)// Machine clocks indexed by a local peer index.type (MachClocksmap[int]am.TimePeerClocksmap[string]MachClocks)// WRAPPERS// MsgByeMach is sent when a peer un-exposes state machines in the topic.typeMsgByeMachstruct { PeerBye bool Id string MTime am.Time}typeMsgInfostruct {Msg// peer ID => mach idx => schema PeerInfo PeerInfo// Number of workers from each peer PeerGossips PeerGossips}typeMsgUpdatesstruct {Msg PeerClocks PeerClocks// TODO removed machs}typeMsgGossipstruct {Msg// Number of workers from each peer PeerGossips PeerGossips}typeMsgReqUpdatesstruct {Msg// TODO list specific mach indexes PeerIds []string}typeMsgReqInfostruct {Msg PeerIds []string}// TODO implementtypeMsgByestruct {Msg Machs map[int]MsgByeMach}// ///// ///// /////// ///// TRACER// ///// ///// /////typeTracerstruct { *am.TracerNoOp// This machine's clock has been updated and needs to be synced. dirty atomic.Bool}// TransitionEnd sends a message when a transition endsfunc ( *Tracer) ( *am.Transition) { .dirty.Store(true)}typeTopicOptsstruct {// Parent is a parent state machine for a new Topic state machine. See // [am.Opts]. Parent am.Api}// ///// ///// /////// ///// ARGS// ///// ///// /////constAPrefix = "am_pubsub"// A is a struct for node arguments. It's a typesafe alternative to [am.A].typeAstruct {// MsgInfo happens when a peer introduces it's exposed state machines. MsgInfo *MsgInfo MsgUpdates *MsgUpdates MsgBye *MsgBye MsgReqInfo *MsgReqInfo MsgReqUpdates *MsgReqUpdates MsgGossip *MsgGossip// Msgs is a list of PubSub messages. Msgs []*pubsub.Message// Length is a general length used for logging Length int`log:"length"`// Msg is a raw msg. Msg []byte// PeerId is a peer ID Peer string`log:"peer"` PeerId string PeerIds []string// MachId is a state machine ID MachId string`log:"mach_id"`// MTime is machine time MTime am.Time`log:"mtime"`// HTime is human time HTime time.Time// Addrs is a list of addresses. Addrs []multiaddr.Multiaddr`log:"addrs"`// NetMachs is a return channel for a list of [rpc.NetworkMachine]. It has to // be buffered or the mutation will fail. NetMachs chan<- []*rpc.NetworkMachine ListFilters *ListFilters MachClocks MachClocks MsgType string`log:"msg_type"` PeersGossip PeerGossips}// TODO merge with REPL, add tag-based queries (to find workers)typeListFiltersstruct { IdExact string IdPartial string IdRegexp *regexp.Regexp Parent string PeerId string// Level to traverse towards the tree root. DepthLevel int ChildrenMax int ChildrenMin int}// ParseArgs extracts A from [am.Event.Args][APrefix].func ( am.A) *A {if , := [APrefix].(*A); != nil {return }return &A{}}// Pass prepares [am.A] from A to pass to further mutations.func ( *A) am.A {returnam.A{APrefix: }}// LogArgs is an args logger for A and pubsub.A.func ( am.A) map[string]string { := ParseArgs()if == nil {returnnil }returnamhelp.ArgsToLogMap(, 0)}// ///// ///// /////// ///// ERRORS// ///// ///// /////// sentinel errorsvar (ErrJoining = errors.New("error joining")ErrListening = errors.New("error listening"))// error mutations// AddErrJoining wraps an error in the ErrJoining sentinel and adds to a// machine.func ( *am.Event, *am.Machine, error, am.A,) error { = fmt.Errorf("%w: %w", ErrJoining, ) .EvAddErrState(, ss.ErrJoining, , )return}// AddErrListening wraps an error in the ErrListening sentinel and adds to a// machine.func ( *am.Event, *am.Machine, error, am.A,) error { = fmt.Errorf("%w: %w", ErrListening, ) .EvAddErrState(, ss.ErrListening, , )return}// ///// ///// /////// ///// DISCOVERY// ///// ///// /////// discoveryNotifee gets notified when we find a new peer via mDNS discovery// type discoveryNotifee struct {// h host.Host// }//// // TODO enable// func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) {// fmt.Printf("discovered new peer %s", pi.ID.MutString())// err := n.h.Connect(context.Background(), pi)// if err != nil {// fmt.Printf("error connecting to peer %s: %s", pi.ID.MutString(), err)// }// }
The pages are generated with Goldsv0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.