package swarm
import (
"context"
"errors"
"fmt"
"sync"
"time"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
)
// ErrConnClosed is the sentinel error returned by addStream when a stream is
// added to a connection whose stream set has already been torn down.
var ErrConnClosed = errors.New("connection closed")
// Conn wraps a transport.CapableConn on behalf of a Swarm. It tracks the
// streams opened on the connection and implements network.Conn.
type Conn struct {
// id is the numeric component of the string returned by ID.
id uint64
// conn is the underlying transport connection that most methods delegate to.
conn transport .CapableConn
// swarm is the Swarm this connection belongs to.
swarm *Swarm
// closeOnce ensures the teardown in doClose runs at most once, whether it is
// triggered by Close or by CloseWithError.
closeOnce sync .Once
// err holds the result of closing the underlying connection; it is written by
// doClose and returned by Close and CloseWithError.
err error
// notifyLk is held while doClose delivers the Disconnected notification.
// NOTE(review): presumably also held while Connected notifications are sent
// elsewhere, so the two cannot interleave — confirm against the rest of the package.
notifyLk sync .Mutex
// streams is the set of streams currently registered on this connection,
// guarded by its embedded mutex. doClose sets m to nil, which addStream treats
// as "connection closed".
streams struct {
sync .Mutex
m map [*Stream ]struct {}
}
// stat holds the connection statistics returned by Stat. Within this file it
// is read and updated (NumStreams) while holding the streams mutex.
stat network .ConnStats
}
// Compile-time check that *Conn satisfies network.Conn.
var _ network.Conn = (*Conn)(nil)
// IsClosed reports the closed state of the underlying transport connection.
func (c *Conn) IsClosed() bool {
	closed := c.conn.IsClosed()
	return closed
}
// ID returns an identifier for this connection of the form
// "<peer prefix>-<id>", where the prefix is at most the first 10 characters of
// the remote peer's string form and id is the connection's numeric id.
//
// Peer ID strings shorter than 10 characters (including the empty string) are
// used whole rather than sliced, so ID never panics on a short peer ID.
func (c *Conn) ID() string {
	const prefixLen = 10
	prefix := c.RemotePeer().String()
	if len(prefix) > prefixLen {
		prefix = prefix[:prefixLen]
	}
	return fmt.Sprintf("%s-%d", prefix, c.id)
}
// Close tears the connection down without an error code. The teardown runs at
// most once per Conn (the same sync.Once is used by CloseWithError); every call
// returns the error recorded by that single teardown.
func (c *Conn) Close() error {
	teardown := func() { c.doClose(0) }
	c.closeOnce.Do(teardown)
	return c.err
}
// CloseWithError tears the connection down, passing errCode to doClose. The
// teardown runs at most once per Conn (the same sync.Once is used by Close), so
// errCode is ignored if the connection was already closed. Every call returns
// the error recorded by the single teardown.
func (c *Conn) CloseWithError(errCode network.ConnErrorCode) error {
	teardown := func() { c.doClose(errCode) }
	c.closeOnce.Do(teardown)
	return c.err
}
// doClose performs the actual teardown of the connection. Close and
// CloseWithError invoke it through closeOnce, so it runs at most once.
//
// An errCode of 0 closes the underlying connection with Close; any other value
// closes it with CloseWithError(errCode). The result is stored in c.err.
func (c *Conn ) doClose (errCode network .ConnErrorCode ) {
c .swarm .removeConn (c )
// Detach the stream set while holding the lock. Leaving the map nil makes any
// later addStream call fail with ErrConnClosed.
c .streams .Lock ()
streams := c .streams .m
c .streams .m = nil
c .streams .Unlock ()
if errCode != 0 {
c .err = c .conn .CloseWithError (errCode )
} else {
c .err = c .conn .Close ()
}
// The connectedness emitter is told about the removal only after the
// underlying connection has been closed.
c .swarm .connectednessEventEmitter .RemoveConn (c .RemotePeer ())
// Reset every stream that was still registered when the set was detached.
for s := range streams {
s .Reset ()
}
// Deliver the Disconnected notification asynchronously, holding notifyLk for
// the duration, then release one swarm reference.
// NOTE(review): presumably done in a goroutine so that calling Close from
// inside a notification callback cannot deadlock on notifyLk — confirm.
go func () {
c .notifyLk .Lock ()
defer c .notifyLk .Unlock ()
c .swarm .notifyAll (func (f network .Notifiee ) {
f .Disconnected (c .swarm , c )
})
c .swarm .refs .Done ()
}()
}
// removeStream drops s from the connection's stream set, decrements the stream
// count, and then marks the stream's resource scope as done.
func (c *Conn) removeStream(s *Stream) {
	func() {
		c.streams.Lock()
		defer c.streams.Unlock()
		delete(c.streams.m, s)
		c.stat.NumStreams--
	}()
	// The scope is released only after the streams lock has been dropped.
	s.scope.Done()
}
// start launches the goroutine that accepts inbound streams on the underlying
// connection and dispatches each one to the swarm's stream handler. The loop
// ends when AcceptStream returns an error.
func (c *Conn ) start () {
go func () {
// Deferred calls run in reverse order: the connection is closed first, then
// one swarm reference is released.
// NOTE(review): the matching refs.Add is not in this function — presumably
// taken by the caller before start; confirm.
defer c .swarm .refs .Done ()
defer c .Close ()
for {
ts , err := c .conn .AcceptStream ()
if err != nil {
return
}
// Reserve an inbound stream scope with the resource manager. If that fails,
// the stream is reset with StreamResourceLimitExceeded and accepting continues.
scope , err := c .swarm .ResourceManager ().OpenStream (c .RemotePeer (), network .DirInbound )
if err != nil {
ts .ResetWithError (network .StreamResourceLimitExceeded )
continue
}
// Hold a swarm reference until the stream has been registered by addStream.
c .swarm .refs .Add (1 )
go func () {
s , err := c .addStream (ts , network .DirInbound , scope )
// Released immediately rather than deferred, so this reference does not
// cover the time spent in the stream handler.
// NOTE(review): presumably so a slow handler cannot block swarm shutdown — confirm.
c .swarm .refs .Done ()
if err != nil {
// The stream was not registered; release the scope reserved for it.
scope .Done ()
return
}
if h := c .swarm .StreamHandler (); h != nil {
h (s )
}
s .completeAcceptStreamGoroutine ()
}()
}
}()
}
// String renders the connection for logs and debugging: the transport's type
// followed by the local and remote multiaddr/peer pairs, all taken from the
// underlying transport connection.
func (c *Conn) String() string {
	const layout = "<swarm.Conn[%T] %s (%s) <-> %s (%s)>"
	tc := c.conn
	return fmt.Sprintf(
		layout,
		tc.Transport(),
		tc.LocalMultiaddr(),
		tc.LocalPeer(),
		tc.RemoteMultiaddr(),
		tc.RemotePeer(),
	)
}
// LocalMultiaddr returns the local multiaddr reported by the underlying
// transport connection.
func (c *Conn) LocalMultiaddr() ma.Multiaddr {
	addr := c.conn.LocalMultiaddr()
	return addr
}
// LocalPeer returns the local peer ID reported by the underlying transport
// connection.
func (c *Conn) LocalPeer() peer.ID {
	local := c.conn.LocalPeer()
	return local
}
// RemoteMultiaddr returns the remote multiaddr reported by the underlying
// transport connection.
func (c *Conn) RemoteMultiaddr() ma.Multiaddr {
	addr := c.conn.RemoteMultiaddr()
	return addr
}
// RemotePeer returns the remote peer ID reported by the underlying transport
// connection.
func (c *Conn) RemotePeer() peer.ID {
	remote := c.conn.RemotePeer()
	return remote
}
// RemotePublicKey returns the remote peer's public key as reported by the
// underlying transport connection.
func (c *Conn) RemotePublicKey() ic.PubKey {
	key := c.conn.RemotePublicKey()
	return key
}
// ConnState returns the connection state reported by the underlying transport
// connection.
func (c *Conn) ConnState() network.ConnectionState {
	state := c.conn.ConnState()
	return state
}
// Stat returns a copy of the connection statistics, taken while holding the
// streams mutex.
func (c *Conn) Stat() network.ConnStats {
	c.streams.Lock()
	snapshot := c.stat
	c.streams.Unlock()
	return snapshot
}
// NewStream opens a new outbound stream on this connection.
//
// If the connection's stats mark it as Limited, the call fails with
// network.ErrLimitedConn unless ctx allows limited connections (as reported by
// network.GetAllowLimitedConn). An outbound stream scope is reserved with the
// swarm's resource manager before opening; it is released again if opening
// fails. A ctx without a deadline is given defaultNewStreamTimeout. Errors
// matching context.DeadlineExceeded are wrapped with a "timed out" prefix.
func (c *Conn) NewStream(ctx context.Context) (network.Stream, error) {
	if limited := c.Stat().Limited; limited {
		allowed, _ := network.GetAllowLimitedConn(ctx)
		if !allowed {
			return nil, network.ErrLimitedConn
		}
	}

	rm := c.swarm.ResourceManager()
	scope, err := rm.OpenStream(c.RemotePeer(), network.DirOutbound)
	if err != nil {
		return nil, err
	}

	// Bound the open attempt when the caller did not supply a deadline.
	if _, hasDeadline := ctx.Deadline(); !hasDeadline {
		bounded, cancel := context.WithTimeout(ctx, defaultNewStreamTimeout)
		defer cancel()
		ctx = bounded
	}

	stream, err := c.openAndAddStream(ctx, scope)
	if err == nil {
		return stream, nil
	}

	// Opening failed: give the reserved scope back before reporting the error.
	scope.Done()
	if errors.Is(err, context.DeadlineExceeded) {
		err = fmt.Errorf("timed out: %w", err)
	}
	return nil, err
}
// openAndAddStream opens a muxed stream on the underlying connection and
// registers it on this Conn as an outbound stream using the given scope.
//
// On failure it returns a literal nil network.Stream together with the error.
// The result of addStream is not returned directly: addStream yields a *Stream,
// and converting a nil *Stream to network.Stream would produce a non-nil
// interface value that wraps a nil pointer.
func (c *Conn) openAndAddStream(ctx context.Context, scope network.StreamManagementScope) (network.Stream, error) {
	ts, err := c.conn.OpenStream(ctx)
	if err != nil {
		return nil, err
	}
	s, err := c.addStream(ts, network.DirOutbound, scope)
	if err != nil {
		return nil, err
	}
	return s, nil
}
// addStream wraps the muxed stream ts in a *Stream and registers it on this
// connection.
//
// If the stream set has already been torn down (the map is nil), ts is reset
// and ErrConnClosed is returned. Otherwise the new stream is recorded, the
// stream count is incremented, and one swarm reference is taken for it, all
// while holding the streams lock.
func (c *Conn) addStream(ts network.MuxedStream, dir network.Direction, scope network.StreamManagementScope) (*Stream, error) {
	c.streams.Lock()
	if closed := c.streams.m == nil; closed {
		// Drop the lock before touching the muxed stream.
		c.streams.Unlock()
		ts.Reset()
		return nil, ErrConnClosed
	}

	stats := network.Stats{
		Direction: dir,
		Opened:    time.Now(),
	}
	streamID := c.swarm.nextStreamID.Add(1)
	// Only inbound streams have an accept goroutine left to complete.
	inbound := dir == network.DirInbound

	s := &Stream{
		stream:                         ts,
		conn:                           c,
		scope:                          scope,
		stat:                           stats,
		id:                             streamID,
		acceptStreamGoroutineCompleted: !inbound,
	}

	c.streams.m[s] = struct{}{}
	c.stat.NumStreams++
	// The swarm reference is taken before the lock is released.
	c.swarm.refs.Add(1)
	c.streams.Unlock()

	return s, nil
}
// GetStreams returns a snapshot of the streams currently registered on this
// connection. The result is a fresh, non-nil slice; its order follows map
// iteration and is therefore unspecified.
func (c *Conn) GetStreams() []network.Stream {
	c.streams.Lock()
	defer c.streams.Unlock()

	out := make([]network.Stream, len(c.streams.m))
	i := 0
	for s := range c.streams.m {
		out[i] = s
		i++
	}
	return out
}
// Scope returns the connection scope reported by the underlying transport
// connection.
func (c *Conn) Scope() network.ConnScope {
	scope := c.conn.Scope()
	return scope
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.