package swarm

import (
	"context"
	"math"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	tpt "github.com/libp2p/go-libp2p/core/transport"

	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
)

type (
	// dialRequest asks the worker loop that owns a peer for a connection to that peer.
	dialRequest struct {
		// ctx is the context that governs this request. While other requests for the same
		// peer are outstanding, the ctx of any one of them may be the one used to dial the
		// peer's addresses. A simultaneous-connect ctx takes precedence over a normal one.
		ctx context.Context
		// resch is where the worker delivers the dialResponse answering this request.
		resch chan dialResponse
	}

	// dialResponse is what the worker sends back on a dialRequest's resch.
	dialResponse struct {
		// conn holds the connection to the peer when the dial succeeded.
		conn *Conn
		// err describes why dialing the peer failed; it is nil on success.
		err error
	}

	// pendRequest follows a dialRequest while dials on its behalf are still running.
	pendRequest struct {
		// req is the request being served.
		req dialRequest
		// err accumulates the failures of every dial made for req.
		err *DialError
		// addrs is the set of addresses whose dial outcome req is still waiting for.
		// It starts out as every address selected for the peer. A failed dial removes its
		// address from the set and is recorded in err. A successful dial completes the
		// request by sending the connection on req.resch.
		addrs map[string]struct{}
	}

	// addrDial is the bookkeeping for a dial to one multiaddress.
	addrDial struct {
		// addr is the address being dialed.
		addr ma.Multiaddr
		// ctx is the context the dial to addr uses.
		ctx context.Context
		// conn is set to the established connection once the dial succeeds.
		conn *Conn
		// err is set to the dial error once the dial fails.
		err error
		// dialed reports whether the dial to addr has been started.
		dialed bool
		// createdAt records when this addrDial was created.
		createdAt time.Time
		// dialRankingDelay is how long the ranking logic held back the dial to addr.
		dialRankingDelay time.Duration
		// expectedTCPUpgradeTime is when the security upgrade of a TCP handshake on
		// addr is expected to be done; the zero value means no such wait is pending.
		expectedTCPUpgradeTime time.Time
	}

	// dialWorker coordinates concurrent dial requests for one peer so that each of the
	// peer's addresses is dialed at most once.
	dialWorker struct {
		s    *Swarm
		peer peer.ID
		// reqch carries dial requests into the worker; closing it stops the worker loop.
		reqch <-chan dialRequest
		// pendingRequests holds the requests that are still waiting on dials.
		pendingRequests map[*pendRequest]struct{}
		// trackedDials maps an address (its byte form) to its dial state. Keeping an
		// entry here is what guarantees an address is dialed at most once.
		trackedDials map[string]*addrDial
		// resch receives progress and completion updates for dials to the peer's addresses.
		resch chan tpt.DialUpdate

		connected bool // true once a connection to the peer has been established

		// test hooks
		wg sync.WaitGroup
		cl Clock
	}
)

// newDialWorker builds a dialWorker for peer p that reads requests from reqch.
// A nil cl selects RealClock.
func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dialWorker {
	w := &dialWorker{
		s:               s,
		peer:            p,
		reqch:           reqch,
		pendingRequests: make(map[*pendRequest]struct{}),
		trackedDials:    make(map[string]*addrDial),
		resch:           make(chan tpt.DialUpdate),
		cl:              cl,
	}
	if w.cl == nil {
		w.cl = RealClock{}
	}
	return w
}

// loop implements the core dial worker loop.
Requests are received on w.reqch.// The loop exits when w.reqch is closed.func ( *dialWorker) () { .wg.Add(1)defer .wg.Done()defer .s.limiter.clearAllPeerDials(.peer)// dq is used to pace dials to different addresses of the peer := newDialQueue()// dialsInFlight is the number of dials in flight. := 0 := .cl.Now()// dialTimer is the dialTimer used to trigger dials := .cl.InstantTimer(.Add(math.MaxInt64))defer .Stop() := true// scheduleNextDial updates timer for triggering the next dial := func() {if && !.Stop() { <-.Ch() } = falseif .Len() > 0 {if == 0 && !.connected {// if there are no dials in flight, trigger the next dials immediately .Reset() } else { := .Add(.top().Delay)for , := range .trackedDials {if !.expectedTCPUpgradeTime.IsZero() && .expectedTCPUpgradeTime.After() { = .expectedTCPUpgradeTime } } .Reset() } = true } }// totalDials is used to track number of dials made by this worker for metrics := 0:for {// The loop has three parts // 1. Input requests are received on w.reqch. If a suitable connection is not available we create // a pendRequest object to track the dialRequest and add the addresses to dq. // 2. Addresses from the dialQueue are dialed at appropriate time intervals depending on delay logic. // We are notified of the completion of these dials on w.resch. // 3. Responses for dials are received on w.resch. On receiving a response, we updated the pendRequests // interested in dials on this address.select {case , := <-.reqch:if ! {if .s.metricsTracer != nil { .s.metricsTracer.DialCompleted(.connected, , time.Since()) }return }// We have received a new request. If we do not have a suitable connection, // track this dialRequest with a pendRequest. // Enqueue the peer's addresses relevant to this request in dq and // track dials to the addresses relevant to this request. 
:= .s.bestAcceptableConnToPeer(.ctx, .peer)if != nil { .resch <- dialResponse{conn: }continue } , , := .s.addrsForDial(.ctx, .peer)if != nil { .resch <- dialResponse{err: &DialError{Peer: .peer,DialErrors: ,Cause: , }}continue }// get the delays to dial these addrs from the swarms dialRanker , , := network.GetSimultaneousConnect(.ctx) := .rankAddrs(, ) := make(map[string]time.Duration, len())// create the pending request object := &pendRequest{req: ,addrs: make(map[string]struct{}, len()),err: &DialError{Peer: .peer, DialErrors: }, }for , := range { .addrs[string(.Addr.Bytes())] = struct{}{} [string(.Addr.Bytes())] = .Delay }// Check if dials to any of the addrs have completed already // If they have errored, record the error in pr. If they have succeeded, // respond with the connection. // If they are pending, add them to tojoin. // If we haven't seen any of the addresses before, add them to todial.var []ma.Multiaddrvar []*addrDialfor , := range { , := .trackedDials[string(.Addr.Bytes())]if ! { = append(, .Addr)continue }if .conn != nil {// dial to this addr was successful, complete the request .resch <- dialResponse{conn: .conn}continue }if .err != nil {// dial to this addr errored, accumulate the error .err.recordErr(.addr, .err)delete(.addrs, string(.addr.Bytes()))continue }// dial is still pending, add to the join list = append(, ) }iflen() == 0 && len() == 0 {// all request applicable addrs have been dialed, we must have errored .err.Cause = ErrAllDialsFailed .resch <- dialResponse{err: .err}continue }// The request has some pending or new dials .pendingRequests[] = struct{}{}for , := range {if !.dialed {// we haven't dialed this address. update the ad.ctx to have simultaneous connect values // set correctlyif , , := network.GetSimultaneousConnect(.ctx); {if , , := network.GetSimultaneousConnect(.ctx); ! { .ctx = network.WithSimultaneousConnect(.ctx, , )// update the element in dq to use the simultaneous connect delay. 
.UpdateOrAdd(network.AddrDelay{Addr: .addr,Delay: [string(.addr.Bytes())], }) } } }// add the request to the addrDial }iflen() > 0 { := time.Now()// these are new addresses, track them and add them to dqfor , := range { .trackedDials[string(.Bytes())] = &addrDial{addr: ,ctx: .ctx,createdAt: , } .Add(network.AddrDelay{Addr: , Delay: [string(.Bytes())]}) } }// setup dialTimer for updates to dq ()case<-.Ch():// It's time to dial the next batch of addresses. // We don't check the delay of the addresses received from the queue here // because if the timer triggered before the delay, it means that all // the inflight dials have errored and we should dial the next batch of // addresses := time.Now()for , := range .NextBatch() {// spawn the dial , := .trackedDials[string(.Addr.Bytes())]if ! {log.Errorf("SWARM BUG: no entry for address %s in trackedDials", .Addr)continue } .dialed = true .dialRankingDelay = .Sub(.createdAt) := .s.dialNextAddr(.ctx, .peer, .addr, .resch)if != nil {// Errored without attempting a dial. This happens in case of // backoff or black hole. .dispatchError(, ) } else {// the dial was successful. update inflight dials ++ ++ } } = false// schedule more dials ()case := <-.resch:// A dial to an address has completed. // Update all requests waiting on this address. On success, complete the request. // On error, record the error , := .trackedDials[string(.Addr.Bytes())]if ! {log.Errorf("SWARM BUG: no entry for address %s in trackedDials", .Addr)if .Conn != nil { .Conn.Close() } --continue }// TCP Connection has been established. 
Wait for connection upgrade on this address // before making new dials.if .Kind == tpt.UpdateKindHandshakeProgressed {// Only wait for public addresses to complete dialing since private dials // are quick any wayifmanet.IsPublicAddr(.Addr) { .expectedTCPUpgradeTime = .cl.Now().Add(PublicTCPDelay) } ()continue } -- .expectedTCPUpgradeTime = time.Time{}if .Conn != nil {// we got a connection, add it to the swarm , := .s.addConn(.Conn, network.DirOutbound)if != nil {// oops no, we failed to add it to the swarm .Conn.Close() .dispatchError(, )continue }for := range .pendingRequests {if , := .addrs[string(.addr.Bytes())]; { .req.resch <- dialResponse{conn: }delete(.pendingRequests, ) } } .conn = if !.connected { .connected = trueif .s.metricsTracer != nil { .s.metricsTracer.DialRankingDelay(.dialRankingDelay) } }continue }// it must be an error -- add backoff if applicable and dispatch // ErrDialRefusedBlackHole shouldn't end up here, just a safety checkif .Err != ErrDialRefusedBlackHole && .Err != context.Canceled && !.connected {// we only add backoff if there has not been a successful connection // for consistency with the old dialer behavior. .s.backf.AddBackoff(.peer, .Addr) } elseif .Err == ErrDialRefusedBlackHole {log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s", .peer, .Addr) } .dispatchError(, .Err)// Only schedule next dial on error. 
// If we scheduleNextDial on success, we will end up making one dial more than // required because the final successful dial will spawn one more dial () } }}// dispatches an error to a specific addr dialfunc ( *dialWorker) ( *addrDial, error) { .err = for := range .pendingRequests {// accumulate the errorif , := .addrs[string(.addr.Bytes())]; { .err.recordErr(.addr, )delete(.addrs, string(.addr.Bytes()))iflen(.addrs) == 0 {// all addrs have erred, dispatch dial error // but first do a last one check in case an acceptable connection has landed from // a simultaneous dial that started later and added new acceptable addrs := .s.bestAcceptableConnToPeer(.req.ctx, .peer)if != nil { .req.resch <- dialResponse{conn: } } else { .err.Cause = ErrAllDialsFailed .req.resch <- dialResponse{err: .err} }delete(.pendingRequests, ) } } }// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests. // this is necessary to support active listen scenarios, where a new dial comes in while // another dial is in progress, and needs to do a direct connection without inhibitions from // dial backoff.if == ErrDialBackoff {delete(.trackedDials, string(.addr.Bytes())) }}// rankAddrs ranks addresses for dialing. if it's a simConnect request we// dial all addresses immediately without any delayfunc ( *dialWorker) ( []ma.Multiaddr, bool) []network.AddrDelay {if {returnNoDelayDialRanker() }return .s.dialRanker()}// dialQueue is a priority queue used to schedule dialstype dialQueue struct {// q contains dials ordered by delay q []network.AddrDelay}// newDialQueue returns a new dialQueuefunc newDialQueue() *dialQueue {return &dialQueue{q: make([]network.AddrDelay, 0, 16), }}// Add adds a new element to the dialQueue. 
To update an element use UpdateOrAdd.func ( *dialQueue) ( network.AddrDelay) {for := .Len() - 1; >= 0; -- {if .q[].Delay <= .Delay {// insert at pos i+1 .q = append(.q, network.AddrDelay{}) // extend the slicecopy(.q[+2:], .q[+1:]) .q[+1] = return } }// insert at position 0 .q = append(.q, network.AddrDelay{}) // extend the slicecopy(.q[1:], .q[0:]) .q[0] = }// UpdateOrAdd updates the elements with address adelay.Addr to the new delay// Useful when hole punchingfunc ( *dialQueue) ( network.AddrDelay) {for := 0; < .Len(); ++ {if .q[].Addr.Equal(.Addr) {if .q[].Delay == .Delay {// existing element is the same. nothing to doreturn }// remove the elementcopy(.q[:], .q[+1:]) .q = .q[:len(.q)-1] } } .Add()}// NextBatch returns all the elements in the queue with the highest priorityfunc ( *dialQueue) () []network.AddrDelay {if .Len() == 0 {returnnil }// i is the index of the second highest priority elementvarintfor = 0; < .Len(); ++ {if .q[].Delay != .q[0].Delay {break } } := .q[:] .q = .q[:]return}// top returns the top element of the queuefunc ( *dialQueue) () network.AddrDelay {return .q[0]}// Len returns the number of elements in the queuefunc ( *dialQueue) () int {returnlen(.q)}
These pages are generated with Golds v0.8.2 (GOOS=linux GOARCH=amd64).
Golds is a Go 101 project developed by Tapir Liu.
PRs and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable via the QR code on the left) to get the latest news about Golds.