package machine
import (
"context"
"maps"
"slices"
"strings"
"sync"
)
type (
InternalLogFunc func (level LogLevel , msg string , args ...any )
InternalCheckFunc func (states S ) bool
)
type Subscriptions struct {
Mx sync .Mutex
mach Api
clock Clock
is InternalCheckFunc
not InternalCheckFunc
log InternalLogFunc
stateCtx IndexStateCtx
when IndexWhen
whenCtx map [context .Context ][]*WhenBinding
whenTime IndexWhenTime
whenTimeCtx map [context .Context ][]*WhenTimeBinding
whenArgs IndexWhenArgs
whenArgsCtx map [context .Context ][]*WhenArgsBinding
whenQuery []*whenQueryBinding
whenQueryCtx map [context .Context ][]*whenQueryBinding
whenQueueEnds []*whenQueueEndsBinding
whenQueue []*whenQueueBinding
}
func NewSubscriptionManager (
mach Api , clock Clock , is , not InternalCheckFunc , log InternalLogFunc ,
) *Subscriptions {
return &Subscriptions {
mach : mach ,
clock : clock ,
is : is ,
not : not ,
log : log ,
when : IndexWhen {},
whenTime : IndexWhenTime {},
whenArgs : IndexWhenArgs {},
stateCtx : IndexStateCtx {},
whenCtx : map [context .Context ][]*WhenBinding {},
whenTimeCtx : map [context .Context ][]*WhenTimeBinding {},
whenArgsCtx : map [context .Context ][]*WhenArgsBinding {},
}
}
func (sm *Subscriptions ) ProcessStateCtx (deactivated S ) []context .CancelFunc {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
var toCancel []context .CancelFunc
for _ , s := range deactivated {
if _ , ok := sm .stateCtx [s ]; !ok {
continue
}
toCancel = append (toCancel , sm .stateCtx [s ].Cancel )
sm .log (LogOps , "[ctx:match] %s" , s )
delete (sm .stateCtx , s )
}
return toCancel
}
func (sm *Subscriptions ) ProcessWhen (activated , deactivated S ) []chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ret := sm .processWhenCtx ()
all := slices .Concat (activated , deactivated )
for _ , s := range all {
for _ , binding := range slices .Clone (sm .when [s ]) {
if slices .Contains (activated , s ) {
if !binding .Negation {
if !binding .States [s ] {
binding .Matched ++
}
} else {
if !binding .States [s ] {
binding .Matched --
}
}
binding .States [s ] = true
} else {
if !binding .Negation {
if binding .States [s ] {
binding .Matched --
}
} else {
if binding .States [s ] {
binding .Matched ++
}
}
binding .States [s ] = false
}
expired := binding .Ctx != nil && binding .Ctx .Err () != nil
if binding .Matched < binding .Total && !expired {
continue
}
sm .gcWhenBinding (binding , true )
ret = append (ret , binding .Ch )
}
}
return ret
}
func (sm *Subscriptions ) processWhenCtx () []chan struct {} {
var ret []chan struct {}
for ctx , bindings := range sm .whenCtx {
if ctx .Err () == nil {
continue
}
delete (sm .whenCtx , ctx )
for _ , binding := range bindings {
sm .gcWhenBinding (binding , false )
}
}
return ret
}
func (sm *Subscriptions ) gcWhenBinding (binding *WhenBinding , gcCtx bool ) {
var names []string
for state := range binding .States {
if binding .Ctx != nil && gcCtx {
sm .whenCtx [binding .Ctx ] = slicesWithout (
sm .whenCtx [binding .Ctx ], binding )
if len (sm .whenCtx [binding .Ctx ]) == 0 {
delete (sm .whenCtx , binding .Ctx )
}
}
names = append (names , state )
if len (sm .when [state ]) == 1 {
delete (sm .when , state )
continue
}
sm .when [state ] = slicesWithout (sm .when [state ], binding )
}
if binding .Negation {
sm .log (LogOps , "[whenNot:match] %s" , j (names ))
} else {
sm .log (LogOps , "[when:match] %s" , j (names ))
}
}
func (sm *Subscriptions ) ProcessWhenTime (before Clock ) []chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ret := sm .processWhenTimeCtx ()
allTicked := S {}
for state , t := range before {
if sm .clock [state ] != t {
allTicked = append (allTicked , state )
}
}
for _ , s := range allTicked {
for _ , binding := range slices .Clone (sm .whenTime [s ]) {
if !binding .Completed [s ] &&
sm .clock [s ] >= binding .Times [binding .Index [s ]] {
binding .Matched ++
binding .Completed [s ] = true
}
expired := binding .Ctx != nil && binding .Ctx .Err () != nil
if binding .Matched < binding .Total && !expired {
continue
}
sm .gcWhenTimeBinding (binding , true )
ret = append (ret , binding .Ch )
}
}
return ret
}
func (sm *Subscriptions ) processWhenTimeCtx () []chan struct {} {
var ret []chan struct {}
for ctx , bindings := range sm .whenTimeCtx {
if ctx .Err () == nil {
continue
}
delete (sm .whenTimeCtx , ctx )
for _ , binding := range bindings {
sm .gcWhenTimeBinding (binding , false )
}
}
return ret
}
func (sm *Subscriptions ) gcWhenTimeBinding (
binding *WhenTimeBinding , gcCtx bool ,
) {
var names []string
for state := range binding .Index {
if binding .Ctx != nil && gcCtx {
sm .whenTimeCtx [binding .Ctx ] = slicesWithout (
sm .whenTimeCtx [binding .Ctx ], binding )
if len (sm .whenTimeCtx [binding .Ctx ]) == 0 {
delete (sm .whenTimeCtx , binding .Ctx )
}
}
names = append (names , state )
if len (sm .whenTime [state ]) == 1 {
delete (sm .whenTime , state )
continue
}
sm .whenTime [state ] = slicesWithout (sm .whenTime [state ], binding )
}
sm .log (LogOps , "[whenTime:match] %s %d" , j (names ), binding .Times )
}
func (sm *Subscriptions ) ProcessWhenArgs (e *Event ) []chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ret := sm .processWhenArgsCtx ()
for _ , binding := range slices .Clone (sm .whenArgs [e .Name ]) {
expired := binding .ctx != nil && binding .ctx .Err () != nil
if !compareArgs (e .Args , binding .args ) && !expired {
continue
}
sm .gcWhenArgsBinding (binding , true )
ret = append (ret , binding .ch )
}
return ret
}
func (sm *Subscriptions ) processWhenArgsCtx () []chan struct {} {
var ret []chan struct {}
for ctx , bindings := range sm .whenArgsCtx {
if ctx .Err () == nil {
continue
}
delete (sm .whenArgsCtx , ctx )
for _ , binding := range bindings {
sm .gcWhenArgsBinding (binding , false )
}
}
return ret
}
func (sm *Subscriptions ) gcWhenArgsBinding (
binding *WhenArgsBinding , gcCtx bool ,
) {
if binding .ctx != nil && gcCtx {
sm .whenArgsCtx [binding .ctx ] = slicesWithout (
sm .whenArgsCtx [binding .ctx ], binding )
if len (sm .whenArgsCtx [binding .ctx ]) == 0 {
delete (sm .whenArgsCtx , binding .ctx )
}
}
if len (sm .whenArgs [binding .handler ]) == 1 {
delete (sm .whenArgs , binding .handler )
} else {
sm .whenArgs [binding .handler ] = slicesWithout (
sm .whenArgs [binding .handler ], binding )
}
argNames := jw (slices .Collect (maps .Keys (binding .args )), "," )
name , _ := strings .CutSuffix (binding .handler , SuffixState )
sm .log (LogOps , "[whenArgs:match] %s (%s)" , name , argNames )
}
func (sm *Subscriptions ) ProcessWhenQuery () []chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ret := sm .processWhenQueryCtx ()
for _ , binding := range slices .Clone (sm .whenQuery ) {
expired := binding .ctx != nil && binding .ctx .Err () != nil
if !binding .fn (sm .clock ) && !expired {
continue
}
sm .gcWhenQueryBinding (binding , true )
ret = append (ret , binding .ch )
}
return ret
}
func (sm *Subscriptions ) processWhenQueryCtx () []chan struct {} {
var ret []chan struct {}
for ctx , bindings := range sm .whenQueryCtx {
if ctx .Err () == nil {
continue
}
delete (sm .whenArgsCtx , ctx )
for _ , binding := range bindings {
sm .gcWhenQueryBinding (binding , false )
}
}
return ret
}
func (sm *Subscriptions ) gcWhenQueryBinding (
binding *whenQueryBinding , gcCtx bool ,
) {
if binding .ctx != nil && gcCtx {
sm .whenQueryCtx [binding .ctx ] = slicesWithout (
sm .whenQueryCtx [binding .ctx ], binding )
if len (sm .whenQueryCtx [binding .ctx ]) == 0 {
delete (sm .whenQueryCtx , binding .ctx )
}
}
idx := 0
if len (sm .whenQuery ) == 1 {
sm .whenQuery = nil
} else {
idx = slices .Index (sm .whenQuery , binding )
sm .whenQuery = slices .Delete (sm .whenQuery , idx , idx +1 )
}
sm .log (LogOps , "[whenQuery:match] %d" , idx )
}
func (sm *Subscriptions ) ProcessWhenQueueEnds () []chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ret := make ([]chan struct {}, len (sm .whenQueueEnds ))
for i , binding := range sm .whenQueueEnds {
ret [i ] = binding .ch
}
sm .whenQueueEnds = nil
return ret
}
func (sm *Subscriptions ) ProcessWhenQueue (queueTick uint64 ) []chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
var toClose []chan struct {}
var toCloseIdx []int
for i , binding := range sm .whenQueue {
if uint64 (binding .tick ) > queueTick {
continue
}
sm .log (LogOps , "[whenQueue:match] %d" , binding .tick )
toClose = append (toClose , binding .ch )
toCloseIdx = append (toCloseIdx , i )
}
slices .Reverse (toCloseIdx )
for _ , idx := range toCloseIdx {
sm .whenQueue = slices .Delete (sm .whenQueue , idx , idx +1 )
}
return toClose
}
func (sm *Subscriptions ) NewStateCtx (state string ) context .Context {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
if _ , ok := sm .stateCtx [state ]; ok {
return sm .stateCtx [state ].Ctx
}
v := CtxValue {
Id : sm .mach .Id (),
State : state ,
Tick : sm .clock [state ],
}
stateCtx , cancel := context .WithCancel (context .WithValue (sm .mach .Ctx (),
CtxKey , v ))
if !sm .is (S {state }) {
cancel ()
return stateCtx
}
binding := &CtxBinding {
Ctx : stateCtx ,
Cancel : cancel ,
}
sm .stateCtx [state ] = binding
sm .log (LogOps , "[ctx:new] %s" , state )
return stateCtx
}
func (sm *Subscriptions ) When (states S , ctx context .Context ) <-chan struct {} {
if sm .is (states ) {
return newClosedChan ()
}
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ch := make (chan struct {})
setMap := StateIsActive {}
matched := 0
for _ , s := range states {
setMap [s ] = sm .is (S {s })
if setMap [s ] {
matched ++
}
}
binding := &WhenBinding {
Ch : ch ,
Negation : false ,
States : setMap ,
Total : len (states ),
Matched : matched ,
Ctx : ctx ,
}
sm .log (LogOps , "[when:new] %s" , j (states ))
for _ , s := range states {
sm .when [s ] = append (sm .when [s ], binding )
if ctx != nil {
sm .whenCtx [ctx ] = append (sm .whenCtx [ctx ], binding )
}
}
return ch
}
func (sm *Subscriptions ) WhenNot (
states S , ctx context .Context ,
) <-chan struct {} {
if sm .not (states ) {
return newClosedChan ()
}
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ch := make (chan struct {})
setMap := StateIsActive {}
matched := 0
for _ , s := range states {
setMap [s ] = sm .is (S {s })
if !setMap [s ] {
matched ++
}
}
binding := &WhenBinding {
Ch : ch ,
Negation : true ,
States : setMap ,
Total : len (states ),
Matched : matched ,
Ctx : ctx ,
}
sm .log (LogOps , "[whenNot:new] %s" , j (states ))
for _ , s := range states {
if _ , ok := sm .when [s ]; !ok {
sm .when [s ] = []*WhenBinding {binding }
} else {
sm .when [s ] = append (sm .when [s ], binding )
}
}
if ctx != nil {
sm .whenCtx [ctx ] = append (sm .whenCtx [ctx ], binding )
}
return ch
}
func (sm *Subscriptions ) WhenArgs (
state string , args A , ctx context .Context ,
) <-chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ch := make (chan struct {})
handler := state + SuffixState
argNames := jw (slices .Collect (maps .Keys (args )), "," )
sm .log (LogOps , "[whenArgs:new] %s (%s)" , state , argNames )
for _ , binding := range sm .whenArgs [handler ] {
if compareArgs (binding .args , args ) {
return binding .ch
}
}
binding := &WhenArgsBinding {
ch : ch ,
handler : handler ,
args : args ,
ctx : ctx ,
}
sm .whenArgs [handler ] = append (sm .whenArgs [handler ], binding )
if ctx != nil {
sm .whenArgsCtx [ctx ] = append (sm .whenArgsCtx [ctx ], binding )
}
return ch
}
func (sm *Subscriptions ) WhenTime (
states S , times Time , ctx context .Context ,
) <-chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ch := make (chan struct {})
indexWhenTime := sm .whenTime
passed := true
for i , s := range states {
if sm .clock [s ] < times [i ] {
passed = false
break
}
}
if passed {
close (ch )
return ch
}
completed := StateIsActive {}
matched := 0
index := map [string ]int {}
for i , s := range states {
completed [s ] = sm .clock [s ] >= times [i ]
if completed [s ] {
matched ++
}
index [s ] = i
}
binding := &WhenTimeBinding {
Ch : ch ,
Index : index ,
Completed : completed ,
Total : len (states ),
Matched : matched ,
Times : times ,
Ctx : ctx ,
}
sm .log (LogOps , "[whenTime:new] %s %s" , jw (states , "," ), times )
for _ , s := range states {
indexWhenTime [s ] = append (indexWhenTime [s ], binding )
}
if ctx != nil {
sm .whenTimeCtx [ctx ] = append (sm .whenTimeCtx [ctx ], binding )
}
return ch
}
func (sm *Subscriptions ) WhenQuery (
fn func (clock Clock ) bool , ctx context .Context ,
) <-chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ch := make (chan struct {})
binding := &whenQueryBinding {
ch : ch ,
ctx : ctx ,
fn : fn ,
}
sm .log (LogOps , "[whenQuery:new] %d" , len (sm .whenQuery ))
sm .whenQuery = append (sm .whenQuery , binding )
if ctx != nil {
sm .whenQueryCtx [ctx ] = append (sm .whenQueryCtx [ctx ], binding )
}
return ch
}
func (sm *Subscriptions ) WhenQueueEnds (
ctx context .Context , mx *sync .RWMutex ,
) <-chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ch := make (chan struct {})
binding := &whenQueueEndsBinding {
ch : ch ,
ctx : ctx ,
}
sm .whenQueueEnds = append (sm .whenQueueEnds , binding )
return ch
}
func (sm *Subscriptions ) WhenQueue (tick Result ) <-chan struct {} {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
ch := make (chan struct {})
binding := &whenQueueBinding {
ch : ch ,
tick : tick ,
}
sm .log (LogOps , "[whenQueue:new] %d" , tick )
sm .whenQueue = append (sm .whenQueue , binding )
return ch
}
func (sm *Subscriptions ) HasWhenArgs () bool {
sm .Mx .Lock ()
defer sm .Mx .Unlock ()
return len (sm .whenArgs ) > 0
}
func (sm *Subscriptions ) dispose () {
for _ , binding := range sm .stateCtx {
binding .Cancel ()
}
for state := range sm .when {
for _ , binding := range sm .when [state ] {
closeSafe (binding .Ch )
}
}
for state := range sm .whenTime {
for _ , binding := range sm .whenTime [state ] {
closeSafe (binding .Ch )
}
}
for state := range sm .whenArgs {
for _ , binding := range sm .whenArgs [state ] {
closeSafe (binding .ch )
}
}
for _ , bind := range sm .whenQueueEnds {
closeSafe (bind .ch )
}
for _ , bind := range sm .whenQueue {
closeSafe (bind .ch )
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .