package pubsub

Import Path
	/pkg/pubsub (on go.dev)

Dependency Relation
	imports 29 packages, and imported by 0 packages

Involved Source Files
	pubsub.go
	topic.go
Package-Level Type Names (total 21)
A is a struct for node arguments. It's a typesafe alternative to [am.A].
	Addrs is a list of addresses.
	HTime is human time.
	Length is a general length used for logging.
	ListFilters *ListFilters
	MTime is machine time.
	MachClocks MachClocks
	MachId is a state machine ID.
	Msg is a raw msg.
	MsgBye *MsgBye
	MsgGossip *MsgGossip
	MsgInfo happens when a peer introduces its exposed state machines.
	MsgReqInfo *MsgReqInfo
	MsgReqUpdates *MsgReqUpdates
	MsgType string
	MsgUpdates *MsgUpdates
	Msgs is a list of PubSub messages.
	NetMachs is a return channel for a list of [rpc.NetworkMachine]. It has to be buffered or the mutation will fail.
	PeerId is a peer ID.
	PeerId string
	PeerIds []string
	PeersGossip PeerGossips
	func ParseArgs(args am.A) *A
	func Pass(args *A) am.A
type Gossips (map)
Info is sent when a peer exposes state machines in the topic.
	Id string
	MTime am.Time
	Parent string
	Schema am.Schema
	States am.S
	Tags []string
TODO merge with REPL, add tag-based queries (to find workers)
	ChildrenMax int
	ChildrenMin int
	Level to traverse towards the tree root.
	IdExact string
	IdPartial string
	IdRegexp *regexp.Regexp
	Parent string
	PeerId string
Machine clocks indexed by a local peer index.
type MachInfo (map)
Type MsgType
TODO implement Machs map[int]MsgByeMach Msg Msg Msg.Type MsgType
MsgByeMach is sent when a peer un-exposes state machines in the topic. Id string MTime am.Time PeerBye bool
Msg Msg Msg.Type MsgType Number of workers from each peer
Msg Msg Msg.Type MsgType Number of workers from each peer peer ID => mach idx => schema
Msg Msg Msg.Type MsgType TODO list specific mach indexes
Msg Msg Msg.Type MsgType TODO list specific mach indexes
const MsgTypeBye const MsgTypeGossip const MsgTypeInfo const MsgTypeReqInfo const MsgTypeReqUpdates const MsgTypeUpdates
Msg Msg Msg.Type MsgType PeerClocks PeerClocks
Machine clocks indexed by a local peer index.
PeerGossips is peer ID => machine time sum TODO check local time sum
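The time-sum comparison that PeerGossips implies can be sketched as follows. This is only an illustration, assuming that machine time is a list of per-state tick counters, that a peer's sum covers all of its exposed machines, and that a peer counts as stale when the gossiped sum exceeds the locally known one. `Time`, `timeSum`, and `stalePeers` are hypothetical stand-ins, not part of this package.

```go
package main

import (
	"fmt"
	"sort"
)

// Time is a hypothetical stand-in for am.Time: one tick counter per state.
type Time []uint64

// PeerGossips mirrors the documented shape: peer ID => machine time sum.
type PeerGossips map[string]uint64

// timeSum adds up all tick counters of a single machine clock.
func timeSum(t Time) uint64 {
	var sum uint64
	for _, tick := range t {
		sum += tick
	}
	return sum
}

// stalePeers returns the sorted IDs of peers whose gossiped time sum is
// higher than the sum of the clocks known locally (an assumed rule).
func stalePeers(local map[string][]Time, gossips PeerGossips) []string {
	var stale []string
	for peerID, remoteSum := range gossips {
		var localSum uint64
		for _, clock := range local[peerID] {
			localSum += timeSum(clock)
		}
		if remoteSum > localSum {
			stale = append(stale, peerID)
		}
	}
	sort.Strings(stale)
	return stale
}

func main() {
	local := map[string][]Time{
		"peerA": {{1, 2, 3}, {4}},
		"peerB": {{5, 5}},
	}
	gossips := PeerGossips{"peerA": 10, "peerB": 12, "peerC": 1}
	fmt.Println(stalePeers(local, gossips))
}
```

A single sum per peer keeps gossip messages small; the trade-off in this sketch is that it only signals that something changed, not which machine changed.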
type PeerInfo (map)
Topic is a single-topic PubSub based on libp2p-pubsub with manual discovery. Each peer can have many exposed state machines and broadcasts their clock changes on the channel, introduces them when joining and via private messages to newly joined peers.
TODO optimizations:
- avoid txs which will get cancelled (CPU)
- rate limit the TOTAL amount of msgs in the network (CPU, network)
- hook into libp2p errors, eg queue full, and act accordingly
	Addrs is a list of addresses for this peer.
	ConnAddrs contains a list of addresses for initial connections to discovery nodes in the network.
	ConnectionsToReady int
	DbgNetMachs exposes local network machines to am-dbg.
	Debounce for clock broadcasts (separate per each exposed state machine).
	ExceptionHandler *am.ExceptionHandler
	List of exposed state machines, index => mach_id, indexes are used on the channel. Indexes should NOT change, as they are used for addressing. `nil` value means empty.
	Number of gossips to send in SendGossipsState.
	HeartbeatFreq broadcasts changed clocks of exposed machines.
	HostToPeer map[string]string
	ListenAddrs contains the list of multiaddresses that this topic listens on. None will allocate one automatically.
	LogEnabled bool
	Mach is a state machine for this PubSub.
	MachMetrics atomic.Pointer[am.Machine]
	Maximum msgs per minute sent to the network. Does not include MsgInfo, which are debounced.
	Max allowed queue length to send MsgInfo to newly joined peers, as well as received msgs.
	All amounts and delays are multiplied by this factor.
	Name indicates the name of the channel or topic instance, typically associated with a given process.
	SendInfoDebounceMs int
	T represents the PubSub topic used for communication.
	Use this hardcoded schema instead of exchanging real ones. Limits all the workers to this one. TODO catalog of named schemas, exchange unknown ones
	TestStates am.S
	(*Topic) ConnCount() int
	(*Topic) ConnectingEnter(e *am.Event) bool
	(*Topic) ConnectingState(e *am.Event)
	(*Topic) Dispose() am.Result
	(*Topic) DoSendInfoEnter(e *am.Event) bool
	(*Topic) DoSendInfoState(e *am.Event)
	(*Topic) ExceptionEnter(e *am.Event) bool
	(*Topic) ExceptionState(e *am.Event)
	(*Topic) GetPeerAddrs() ([]ma.Multiaddr, error)
	(*Topic) HeartbeatState(e *am.Event)
	(*Topic) Join() am.Result
	(*Topic) JoinedEnd(e *am.Event)
	(*Topic) JoinedState(e *am.Event)
	(*Topic) JoiningState(e *am.Event)
	(*Topic) ListMachinesEnter(e *am.Event) bool
	(*Topic) ListMachinesState(e *am.Event)
	(*Topic) MissPeersByGossipEnter(e *am.Event) bool
	(*Topic) MissPeersByGossipExit(e *am.Event) bool
	(*Topic) MissPeersByGossipState(e *am.Event)
	(*Topic) MissPeersByUpdatesEnter(e *am.Event) bool
	(*Topic) MissPeersByUpdatesExit(e *am.Event) bool
	(*Topic) MissPeersByUpdatesState(e *am.Event)
	(*Topic) MissUpdatesByGossipEnter(e *am.Event) bool
	(*Topic) MissUpdatesByGossipExit(e *am.Event) bool
	(*Topic) MissUpdatesByGossipState(e *am.Event)
	(*Topic) MsgByeEnter(e *am.Event) bool
	(*Topic) MsgInfoEnter(e *am.Event) bool
	(*Topic) MsgInfoState(e *am.Event)
	(*Topic) MsgReceivedEnter(e *am.Event) bool
	(*Topic) MsgReqInfoEnter(e *am.Event) bool
	(*Topic) MsgReqInfoState(e *am.Event)
	(*Topic) MsgReqUpdatesEnter(e *am.Event) bool
	TODO this is never fired
	(*Topic) MsgUpdatesEnter(e *am.Event) bool
	(*Topic) MsgUpdatesState(e *am.Event)
	(*Topic) PeerJoinedEnter(e *am.Event) bool
	(*Topic) PeerJoinedState(e *am.Event)
	(*Topic) PeerLeftEnter(e *am.Event) bool
	(*Topic) ProcessMsgsState(e *am.Event)
	TODO exit
	(*Topic) ReqMissingPeersEnter(e *am.Event) bool
	(*Topic) ReqMissingPeersState(e *am.Event)
	(*Topic) ReqMissingUpdatesEnter(e *am.Event) bool
	(*Topic) ReqMissingUpdatesState(e *am.Event)
	(*Topic) SendGossipsEnter(e *am.Event) bool
	(*Topic) SendGossipsState(e *am.Event)
	(*Topic) SendInfoEnter(e *am.Event) bool
	(*Topic) SendInfoState(e *am.Event)
	(*Topic) SendMsgEnter(e *am.Event) bool
	(*Topic) SendMsgState(e *am.Event)
	(*Topic) SendUpdatesEnter(e *am.Event) bool
	(*Topic) SendUpdatesState(e *am.Event)
	(*Topic) Start() am.Result
	(*Topic) StartAndJoin(ctx context.Context) am.Result
	(*Topic) StartEnd(e *am.Event)
	(*Topic) StartEnter(e *am.Event) bool
	(*Topic) StartState(e *am.Event)
	func NewTopic(ctx context.Context, name, suffix string, exposedMachs []*am.Machine, opts *TopicOpts) (*Topic, error)
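The rule that exposed-machine indexes must not change, with nil marking an empty slot, can be illustrated with a small slot list. This is a sketch under stated assumptions, not the Topic implementation: `slots`, `expose`, `unexpose`, and `lookup` are hypothetical names, and the sketch never reuses an emptied slot.

```go
package main

import "fmt"

// slots is a hypothetical index => mach_id list; a nil entry is an empty slot.
type slots []*string

// expose stores id in a new slot at the end and returns its index.
// Existing indexes are never shifted or reused (a conservative assumption).
func (s *slots) expose(id string) int {
	*s = append(*s, &id)
	return len(*s) - 1
}

// unexpose clears the slot but keeps it in place, so other indexes stay valid.
func (s slots) unexpose(idx int) {
	if idx >= 0 && idx < len(s) {
		s[idx] = nil
	}
}

// lookup resolves an index received on the channel to a machine ID.
func (s slots) lookup(idx int) (string, bool) {
	if idx < 0 || idx >= len(s) || s[idx] == nil {
		return "", false
	}
	return *s[idx], true
}

func main() {
	var s slots
	a := s.expose("mach-a")
	b := s.expose("mach-b")
	s.unexpose(a)

	id, ok := s.lookup(b)
	fmt.Println(a, b, id, ok)
}
```

Because removal only nils the entry, an index that peers already use for addressing keeps pointing at the same machine for as long as that machine stays exposed.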
Parent is a parent state machine for a new Topic state machine. See [am.Opts].
	func NewTopic(ctx context.Context, name, suffix string, exposedMachs []*am.Machine, opts *TopicOpts) (*Topic, error)
type Tracer (struct)
	TracerNoOp *am.TracerNoOp
	( Tracer) HandlerEnd(transition *machine.Transition, emitter string, handler string)
	( Tracer) HandlerStart(transition *machine.Transition, emitter string, handler string)
	( Tracer) Inheritable() bool
	( Tracer) MachineDispose(machID string)
	( Tracer) MachineInit(machine machine.Api) context.Context
	( Tracer) MutationQueued(machine machine.Api, mutation *machine.Mutation)
	( Tracer) NewSubmachine(parent, machine machine.Api)
	( Tracer) QueueEnd(machine machine.Api)
	( Tracer) SchemaChange(machine machine.Api, old machine.Schema)
	TransitionEnd sends a message when a transition ends
	( Tracer) TransitionInit(transition *machine.Transition)
	( Tracer) TransitionStart(transition *machine.Transition)
	( Tracer) VerifyStates(machine machine.Api)
	*Tracer : github.com/pancsta/asyncmachine-go/pkg/machine.Tracer
Package-Level Functions (total 6)
AddErrJoining wraps an error in the ErrJoining sentinel and adds to a machine.
AddErrListening wraps an error in the ErrListening sentinel and adds to a machine.
LogArgs is an args logger for A and pubsub.A.
func NewTopic(ctx context.Context, name, suffix string, exposedMachs []*am.Machine, opts *TopicOpts) (*Topic, error)
ParseArgs extracts A from [am.Event.Args][APrefix].
Pass prepares [am.A] from A to pass to further mutations.
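The round trip between Pass and ParseArgs can be sketched with local stand-ins. This is a minimal illustration, assuming [am.A] is a string-keyed map and that the typed struct travels under the APrefix key; `AmA` and the trimmed-down `A` below are hypothetical, and returning an empty `A` when the key is missing is an assumption of this sketch.

```go
package main

import "fmt"

// AmA is a hypothetical stand-in for am.A, assumed to be a string-keyed map.
type AmA map[string]any

// APrefix is the key under which the typed args travel (value from the docs).
const APrefix = "am_pubsub"

// A is a trimmed-down stand-in for the package's typed args struct.
type A struct {
	PeerId  string
	MsgType string
}

// Pass wraps typed args into the untyped map for a further mutation.
func Pass(args *A) AmA {
	return AmA{APrefix: args}
}

// ParseArgs extracts the typed args, returning an empty A when absent.
func ParseArgs(args AmA) *A {
	if a, ok := args[APrefix].(*A); ok && a != nil {
		return a
	}
	return &A{}
}

func main() {
	in := Pass(&A{PeerId: "peer-1", MsgType: "info"})
	out := ParseArgs(in)
	fmt.Println(out.PeerId, out.MsgType)
}
```

Keeping all typed fields under one prefixed key lets several packages share the same untyped args map without key collisions.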
Package-Level Variables (total 2)
Package-Level Constants (total 9)
const APrefix = "am_pubsub"
EnvAmPubsubDbg exposes PubSub workers over dbg telemetry.
EnvAmPubsubLog enables machine logging for pubsub.
const MsgTypeBye MsgType = "bye"
const MsgTypeGossip MsgType = "gossip"
const MsgTypeInfo MsgType = "info"
const MsgTypeReqInfo MsgType = "reqinfo"
const MsgTypeReqUpdates MsgType = "requpdates"
const MsgTypeUpdates MsgType = "updates"
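A receiver usually has to validate the raw type string before dispatching on it. The sketch below redeclares the six documented constants locally so that it runs on its own; `parseMsgType` and `isRequest` are hypothetical helpers, and grouping the two "req" types as requests is an assumption based only on their names.

```go
package main

import "fmt"

// MsgType and its values are copied from the documented constants.
type MsgType string

const (
	MsgTypeBye        MsgType = "bye"
	MsgTypeGossip     MsgType = "gossip"
	MsgTypeInfo       MsgType = "info"
	MsgTypeReqInfo    MsgType = "reqinfo"
	MsgTypeReqUpdates MsgType = "requpdates"
	MsgTypeUpdates    MsgType = "updates"
)

// parseMsgType validates a raw string against the known message types.
func parseMsgType(raw string) (MsgType, error) {
	switch t := MsgType(raw); t {
	case MsgTypeBye, MsgTypeGossip, MsgTypeInfo,
		MsgTypeReqInfo, MsgTypeReqUpdates, MsgTypeUpdates:
		return t, nil
	default:
		return "", fmt.Errorf("unknown msg type %q", raw)
	}
}

// isRequest reports whether the type asks other peers for data
// (an assumed grouping, inferred from the constant names).
func isRequest(t MsgType) bool {
	return t == MsgTypeReqInfo || t == MsgTypeReqUpdates
}

func main() {
	for _, raw := range []string{"gossip", "reqinfo", "hello"} {
		t, err := parseMsgType(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println(t, "request:", isRequest(t))
	}
}
```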