package swarm
import (
"context"
"sync"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)
type connectednessEventEmitter struct {
mx sync .RWMutex
newConns chan peer .ID
removeConnsMx sync .Mutex
removeConns []peer .ID
lastEvent map [peer .ID ]network .Connectedness
connectedness func (peer .ID ) network .Connectedness
emitter event .Emitter
wg sync .WaitGroup
removeConnNotif chan struct {}
ctx context .Context
cancel context .CancelFunc
}
func newConnectednessEventEmitter(connectedness func (peer .ID ) network .Connectedness , emitter event .Emitter ) *connectednessEventEmitter {
ctx , cancel := context .WithCancel (context .Background ())
c := &connectednessEventEmitter {
newConns : make (chan peer .ID , 32 ),
lastEvent : make (map [peer .ID ]network .Connectedness ),
removeConnNotif : make (chan struct {}, 1 ),
connectedness : connectedness ,
emitter : emitter ,
ctx : ctx ,
cancel : cancel ,
}
c .wg .Add (1 )
go c .runEmitter ()
return c
}
func (c *connectednessEventEmitter ) AddConn (p peer .ID ) {
c .mx .RLock ()
defer c .mx .RUnlock ()
if c .ctx .Err () != nil {
return
}
c .newConns <- p
}
func (c *connectednessEventEmitter ) RemoveConn (p peer .ID ) {
c .mx .RLock ()
defer c .mx .RUnlock ()
if c .ctx .Err () != nil {
return
}
c .removeConnsMx .Lock ()
c .removeConns = append (c .removeConns , p )
c .removeConnsMx .Unlock ()
select {
case c .removeConnNotif <- struct {}{}:
default :
}
}
func (c *connectednessEventEmitter ) Close () {
c .cancel ()
c .wg .Wait ()
}
func (c *connectednessEventEmitter ) runEmitter () {
defer c .wg .Done ()
for {
select {
case p := <- c .newConns :
c .notifyPeer (p , true )
case <- c .removeConnNotif :
c .sendConnRemovedNotifications ()
case <- c .ctx .Done ():
c .mx .Lock ()
defer c .mx .Unlock ()
for {
select {
case p := <- c .newConns :
c .notifyPeer (p , true )
case <- c .removeConnNotif :
c .sendConnRemovedNotifications ()
default :
return
}
}
}
}
}
func (c *connectednessEventEmitter ) notifyPeer (p peer .ID , forceNotConnectedEvent bool ) {
oldState := c .lastEvent [p ]
c .lastEvent [p ] = c .connectedness (p )
if c .lastEvent [p ] == network .NotConnected {
delete (c .lastEvent , p )
}
if (forceNotConnectedEvent && c .lastEvent [p ] == network .NotConnected ) || c .lastEvent [p ] != oldState {
c .emitter .Emit (event .EvtPeerConnectednessChanged {
Peer : p ,
Connectedness : c .lastEvent [p ],
})
}
}
func (c *connectednessEventEmitter ) sendConnRemovedNotifications () {
c .removeConnsMx .Lock ()
removeConns := c .removeConns
c .removeConns = nil
c .removeConnsMx .Unlock ()
for _ , p := range removeConns {
c .notifyPeer (p , false )
}
}
The pages are generated with Golds v0.8.2 . (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu .
PR and bug reports are welcome and can be submitted to the issue list .
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds .