package upgrader
import (
"context"
"fmt"
"strings"
"sync"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/transport"
tec "github.com/jbenet/go-temp-err-catcher"
logging "github.com/libp2p/go-libp2p/gologshim"
manet "github.com/multiformats/go-multiaddr/net"
)
// log is the package-level logger, created under the name "upgrader".
var log = logging .Logger ("upgrader" )
// listener accepts raw connections from the embedded GatedMaListener, upgrades
// them through the upgrader, and hands the upgraded connections to callers of
// Accept via the incoming channel.
type listener struct {
// Embedded source of raw inbound connections and their resource scopes.
transport .GatedMaListener
// transport is passed to the upgrader for every accepted connection.
transport transport .Transport
// upgrader performs the upgrade and supplies the accept timeout.
upgrader *upgrader
// rcmgr is the resource manager attached to this listener; the methods
// visible in this file do not read it directly.
rcmgr network .ResourceManager
// incoming carries upgraded connections from handleIncoming to Accept.
// handleIncoming closes it on exit; Close drains it.
incoming chan transport .CapableConn
// err is set by handleIncoming before incoming is closed and is reported
// by Accept once incoming has been drained.
err error
// threshold is waited on before each upgrade is started and is
// acquired/released around handing a connection to Accept.
// NOTE(review): presumably it bounds connections awaiting Accept — confirm
// against the threshold type, which is defined elsewhere.
threshold *threshold
// ctx bounds the accept loop and every upgrade; cancel is invoked by Close.
// NOTE(review): cancel is assumed to cancel ctx — confirm at construction.
ctx context .Context
cancel func ()
}
// Compile-time check that *listener satisfies transport.Listener.
var _ transport .Listener = (*listener )(nil )
// Close shuts the listener down and returns the error, if any, reported by
// closing the underlying gated listener.
//
// The underlying listener is closed first so that its error is the one
// captured. After cancelling, Close keeps receiving from the incoming channel,
// closing every connection it gets, until the channel is closed.
func (l *listener) Close() error {
	closeErr := l.GatedMaListener.Close()
	l.cancel()

	// Drain: nobody will Accept these connections any more.
	for {
		pending, ok := <-l.incoming
		if !ok {
			return closeErr
		}
		pending.Close()
	}
}
// handleIncoming is the listener's accept loop. It accepts raw connections
// from the embedded GatedMaListener, upgrades each one in its own goroutine,
// and delivers the upgraded connections on l.incoming.
//
// The loop ends when l.ctx is done or when Accept returns an error that the
// temp-error catcher does not classify as temporary. On exit it closes the
// underlying listener, makes sure l.err is non-nil, waits for all in-flight
// upgrade goroutines, and only then closes l.incoming.
func (l *listener ) handleIncoming () {
// wg tracks the upgrade goroutines started below.
var wg sync .WaitGroup
defer func () {
// Ensure the underlying listener is closed regardless of how the loop ended.
l .GatedMaListener .Close ()
// Accept reads l.err after incoming is closed, so it must be non-nil.
if l .err == nil {
l .err = fmt .Errorf ("listener closed" )
}
// All senders must be finished before the channel is closed.
wg .Wait ()
close (l .incoming )
}()
var catcher tec .TempErrCatcher
for l .ctx .Err () == nil {
maconn , connScope , err := l .GatedMaListener .Accept ()
if err != nil {
// Errors the catcher deems temporary are logged and the accept is retried.
if catcher .IsTemporary (err ) {
log .Info ("temporary accept error" , "err" , err )
continue
}
// Any other error is recorded and terminates the loop.
l .err = err
return
}
catcher .Reset ()
// A connection without a scope is treated as a bug: it is closed and skipped.
if connScope == nil {
log .Error ("BUG: got nil connScope for incoming connection" , "remote_multiaddr" , maconn .RemoteMultiaddr ())
maconn .Close ()
continue
}
// NOTE(review): presumably blocks while too many upgraded connections are
// waiting to be accepted — confirm against the threshold implementation.
l .threshold .Wait ()
log .Debug ("listener got connection" ,
"listener" , l ,
"local_multiaddr" , maconn .LocalMultiaddr (),
"remote_multiaddr" , maconn .RemoteMultiaddr ())
wg .Add (1 )
go func () {
defer wg .Done ()
// One deadline, derived from the listener context, covers both the
// upgrade and the wait for a caller of Accept.
ctx , cancel := context .WithTimeout (l .ctx , l .upgrader .acceptTimeout )
defer cancel ()
conn , err := l .upgrader .Upgrade (ctx , l .transport , maconn , network .DirInbound , "" , connScope )
if err != nil {
// The failure is only logged; the scope is released here.
log .Debug ("accept upgrade error" ,
"err" , err ,
"local_multiaddr" , maconn .LocalMultiaddr (),
"remote_multiaddr" , maconn .RemoteMultiaddr ())
connScope .Done ()
return
}
log .Debug ("listener accepted connection" ,
"listener" , l ,
"connection" , conn )
// Held for as long as this goroutine is trying to hand the connection over.
l .threshold .Acquire ()
defer l .threshold .Release ()
select {
case l .incoming <- conn :
case <- ctx .Done ():
// Warn only if the listener itself is still live, i.e. the per-connection
// deadline expired rather than the listener being shut down.
if l .ctx .Err () == nil {
log .Warn ("listener dropped connection due to slow accept" , "remote_multiaddr" , maconn .RemoteMultiaddr (), "peer" , conn .RemotePeer ())
}
conn .CloseWithError (network .ConnRateLimited )
}
}()
}
}
// Accept returns the next upgraded connection that is still open.
//
// Connections received from the incoming channel that report IsClosed are
// skipped. Once the channel has been closed and drained, the recorded listener
// error is returned; if its message contains "use of closed network
// connection" it is reported as transport.ErrListenerClosed instead.
func (l *listener) Accept() (transport.CapableConn, error) {
	for {
		conn, open := <-l.incoming
		if !open {
			break
		}
		// The connection may have been closed while it sat in the channel.
		if conn.IsClosed() {
			continue
		}
		return conn, nil
	}

	listenerClosed := strings.Contains(l.err.Error(), "use of closed network connection")
	if listenerClosed {
		return nil, transport.ErrListenerClosed
	}
	return nil, l.err
}
// String renders the listener for logging. When the transport implements
// fmt.Stringer its description is included in brackets before the listen
// address; otherwise only the address is shown.
func (l *listener) String() string {
	named, isStringer := l.transport.(fmt.Stringer)
	if !isStringer {
		return fmt.Sprintf("<stream.Listener %s>", l.Multiaddr())
	}
	return fmt.Sprintf("<stream.Listener[%s] %s>", named, l.Multiaddr())
}
// gatedMaListener wraps a manet.Listener so that every accepted connection is
// checked by the connection gater (if one is set) and given a scope by the
// resource manager before it is returned from Accept.
type gatedMaListener struct {
// Embedded listener that produces the raw connections.
manet .Listener
// rcmgr opens the resource-management scope for each accepted connection.
rcmgr network .ResourceManager
// connGater may be nil, in which case no gating is applied.
connGater connmgr .ConnectionGater
}
// Compile-time check that *gatedMaListener satisfies transport.GatedMaListener.
var _ transport .GatedMaListener = &gatedMaListener {}
// Accept returns the next inbound connection that passes both the connection
// gater and the resource manager, together with its resource-management scope.
//
// Connections rejected by the gater (when one is configured) or refused by the
// resource manager are closed and skipped, and the loop moves on to the next
// connection. An error from the underlying listener is returned unchanged,
// with a nil connection and a nil scope.
func (l *gatedMaListener) Accept() (manet.Conn, network.ConnManagementScope, error) {
	for {
		conn, err := l.Listener.Accept()
		if err != nil {
			return nil, nil, err
		}

		// Gating is optional: a nil gater admits everything.
		if l.connGater != nil && !l.connGater.InterceptAccept(conn) {
			log.Debug("gater blocked incoming connection",
				"local_multiaddr", conn.LocalMultiaddr(),
				"remote_multiaddr", conn.RemoteMultiaddr())
			if err := conn.Close(); err != nil {
				log.Warn("failed to close incoming connection rejected by gater", "err", err)
			}
			continue
		}

		connScope, err := l.rcmgr.OpenConnection(network.DirInbound, true, conn.RemoteMultiaddr())
		if err != nil {
			log.Debug("resource manager blocked accept of new connection", "err", err)
			// The operation that can fail here is Close, so the warning says
			// "close" and mirrors the wording used for gater rejections.
			if err := conn.Close(); err != nil {
				log.Warn("failed to close incoming connection rejected by resource manager", "err", err)
			}
			continue
		}
		return conn, connScope, nil
	}
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.