package swarm

import (
	"context"
	"math"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	tpt "github.com/libp2p/go-libp2p/core/transport"

	ma "github.com/multiformats/go-multiaddr"
	manet "github.com/multiformats/go-multiaddr/net"
)

// dialRequest is the structure used to request a dial to the peer associated
// with a worker loop.
type dialRequest struct {
	// ctx is the context that may be used for the request.
	// If another concurrent request is made, any of the concurrent requests' contexts may be
	// used for dials to the peer's addresses.
	// Contexts of simultaneous connect requests have higher priority than those of normal requests.
	ctx context.Context
	// resch is the channel on which the response for this request is sent.
	resch chan dialResponse
}

// dialResponse is the response sent for a dialRequest on that request's resch channel.
type dialResponse struct {
	// conn is the connection to the peer on success.
	conn *Conn
	// err is the error encountered while dialing the peer.
	// It is nil when the connection succeeded.
	err error
}

// pendRequest tracks the progress of a dialRequest that could not be answered immediately.
type pendRequest struct {
	// req is the original dialRequest.
	req dialRequest
	// err accumulates the errors of all failed dials.
	err *DialError
	// addrs is the set of addresses on which we are still waiting for pending dials.
	// At creation time addrs is initialised to all the addresses of the peer. On a failed dial,
	// the address is removed from the map and err is updated. On a successful dial, the
	// dialRequest is completed and a response carrying the connection is sent.
	addrs map[string]struct{}
}

// addrDial tracks the dial to one particular multiaddress.
type addrDial struct {
	// addr is the address dialed.
	addr ma.Multiaddr
	// ctx is the context used for dialing the address.
	ctx context.Context
	// conn is the established connection on success.
	conn *Conn
	// err is the error from dialing the address.
	err error
	// dialed indicates whether the dial to the address has been triggered.
	dialed bool
	// createdAt is the time this struct was created.
	createdAt time.Time
	// dialRankingDelay is the delay in dialing this address introduced by the ranking logic.
	dialRankingDelay time.Duration
	// expectedTCPUpgradeTime is the expected time by which the security upgrade will complete.
	// The zero value means no upgrade is currently being waited for.
	expectedTCPUpgradeTime time.Time
}

// dialWorker synchronises concurrent dials to a peer. It ensures that we make at most one dial
// to each of the peer's addresses.
type dialWorker struct {
	// s is the swarm this worker dials on behalf of; peer is the peer being dialed.
	s    *Swarm
	peer peer.ID
	// reqch is used to send dial requests to the worker. Close reqch to end the worker loop.
	reqch <-chan dialRequest
	// pendingRequests is the set of requests that are still waiting on at least one dial.
	pendingRequests map[*pendRequest]struct{}
	// trackedDials tracks dials to the peer's addresses, keyed by the address bytes. An entry
	// here is used to ensure that we dial an address at most once.
	trackedDials map[string]*addrDial
	// resch is used to receive updates for dials to the peer's addresses.
	resch chan tpt.DialUpdate

	connected bool // true when a connection has been successfully established

	// for testing
	wg sync.WaitGroup
	cl Clock
}

func newDialWorker( *Swarm,  peer.ID,  <-chan dialRequest,  Clock) *dialWorker {
	if  == nil {
		 = RealClock{}
	}
	return &dialWorker{
		s:               ,
		peer:            ,
		reqch:           ,
		pendingRequests: make(map[*pendRequest]struct{}),
		trackedDials:    make(map[string]*addrDial),
		resch:           make(chan tpt.DialUpdate),
		cl:              ,
	}
}

// loop implements the core dial worker loop. Requests are received on w.reqch.
// The loop exits when w.reqch is closed.
func ( *dialWorker) () {
	.wg.Add(1)
	defer .wg.Done()
	defer .s.limiter.clearAllPeerDials(.peer)

	// dq is used to pace dials to different addresses of the peer
	 := newDialQueue()
	// dialsInFlight is the number of dials in flight.
	 := 0

	 := .cl.Now()
	// dialTimer is the dialTimer used to trigger dials
	 := .cl.InstantTimer(.Add(math.MaxInt64))
	defer .Stop()

	 := true
	// scheduleNextDial updates timer for triggering the next dial
	 := func() {
		if  && !.Stop() {
			<-.Ch()
		}
		 = false
		if .Len() > 0 {
			if  == 0 && !.connected {
				// if there are no dials in flight, trigger the next dials immediately
				.Reset()
			} else {
				 := .Add(.top().Delay)
				for ,  := range .trackedDials {
					if !.expectedTCPUpgradeTime.IsZero() && .expectedTCPUpgradeTime.After() {
						 = .expectedTCPUpgradeTime
					}
				}
				.Reset()
			}
			 = true
		}
	}

	// totalDials is used to track number of dials made by this worker for metrics
	 := 0
:
	for {
		// The loop has three parts
		//  1. Input requests are received on w.reqch. If a suitable connection is not available we create
		//     a pendRequest object to track the dialRequest and add the addresses to dq.
		//  2. Addresses from the dialQueue are dialed at appropriate time intervals depending on delay logic.
		//     We are notified of the completion of these dials on w.resch.
		//  3. Responses for dials are received on w.resch. On receiving a response, we updated the pendRequests
		//     interested in dials on this address.

		select {
		case ,  := <-.reqch:
			if ! {
				if .s.metricsTracer != nil {
					.s.metricsTracer.DialCompleted(.connected, , time.Since())
				}
				return
			}
			// We have received a new request. If we do not have a suitable connection,
			// track this dialRequest with a pendRequest.
			// Enqueue the peer's addresses relevant to this request in dq and
			// track dials to the addresses relevant to this request.

			 := .s.bestAcceptableConnToPeer(.ctx, .peer)
			if  != nil {
				.resch <- dialResponse{conn: }
				continue 
			}

			, ,  := .s.addrsForDial(.ctx, .peer)
			if  != nil {
				.resch <- dialResponse{
					err: &DialError{
						Peer:       .peer,
						DialErrors: ,
						Cause:      ,
					}}
				continue 
			}

			// get the delays to dial these addrs from the swarms dialRanker
			, ,  := network.GetSimultaneousConnect(.ctx)
			 := .rankAddrs(, )
			 := make(map[string]time.Duration, len())

			// create the pending request object
			 := &pendRequest{
				req:   ,
				addrs: make(map[string]struct{}, len()),
				err:   &DialError{Peer: .peer, DialErrors: },
			}
			for ,  := range  {
				.addrs[string(.Addr.Bytes())] = struct{}{}
				[string(.Addr.Bytes())] = .Delay
			}

			// Check if dials to any of the addrs have completed already
			// If they have errored, record the error in pr. If they have succeeded,
			// respond with the connection.
			// If they are pending, add them to tojoin.
			// If we haven't seen any of the addresses before, add them to todial.
			var  []ma.Multiaddr
			var  []*addrDial

			for ,  := range  {
				,  := .trackedDials[string(.Addr.Bytes())]
				if ! {
					 = append(, .Addr)
					continue
				}

				if .conn != nil {
					// dial to this addr was successful, complete the request
					.resch <- dialResponse{conn: .conn}
					continue 
				}

				if .err != nil {
					// dial to this addr errored, accumulate the error
					.err.recordErr(.addr, .err)
					delete(.addrs, string(.addr.Bytes()))
					continue
				}

				// dial is still pending, add to the join list
				 = append(, )
			}

			if len() == 0 && len() == 0 {
				// all request applicable addrs have been dialed, we must have errored
				.err.Cause = ErrAllDialsFailed
				.resch <- dialResponse{err: .err}
				continue 
			}

			// The request has some pending or new dials
			.pendingRequests[] = struct{}{}

			for ,  := range  {
				if !.dialed {
					// we haven't dialed this address. update the ad.ctx to have simultaneous connect values
					// set correctly
					if , ,  := network.GetSimultaneousConnect(.ctx);  {
						if , ,  := network.GetSimultaneousConnect(.ctx); ! {
							.ctx = network.WithSimultaneousConnect(.ctx, , )
							// update the element in dq to use the simultaneous connect delay.
							.UpdateOrAdd(network.AddrDelay{
								Addr:  .addr,
								Delay: [string(.addr.Bytes())],
							})
						}
					}
				}
				// add the request to the addrDial
			}

			if len() > 0 {
				 := time.Now()
				// these are new addresses, track them and add them to dq
				for ,  := range  {
					.trackedDials[string(.Bytes())] = &addrDial{
						addr:      ,
						ctx:       .ctx,
						createdAt: ,
					}
					.Add(network.AddrDelay{Addr: , Delay: [string(.Bytes())]})
				}
			}
			// setup dialTimer for updates to dq
			()

		case <-.Ch():
			// It's time to dial the next batch of addresses.
			// We don't check the delay of the addresses received from the queue here
			// because if the timer triggered before the delay, it means that all
			// the inflight dials have errored and we should dial the next batch of
			// addresses
			 := time.Now()
			for ,  := range .NextBatch() {
				// spawn the dial
				,  := .trackedDials[string(.Addr.Bytes())]
				if ! {
					log.Errorf("SWARM BUG: no entry for address %s in trackedDials", .Addr)
					continue
				}
				.dialed = true
				.dialRankingDelay = .Sub(.createdAt)
				 := .s.dialNextAddr(.ctx, .peer, .addr, .resch)
				if  != nil {
					// Errored without attempting a dial. This happens in case of
					// backoff or black hole.
					.dispatchError(, )
				} else {
					// the dial was successful. update inflight dials
					++
					++
				}
			}
			 = false
			// schedule more dials
			()

		case  := <-.resch:
			// A dial to an address has completed.
			// Update all requests waiting on this address. On success, complete the request.
			// On error, record the error

			,  := .trackedDials[string(.Addr.Bytes())]
			if ! {
				log.Errorf("SWARM BUG: no entry for address %s in trackedDials", .Addr)
				if .Conn != nil {
					.Conn.Close()
				}
				--
				continue
			}

			// TCP Connection has been established. Wait for connection upgrade on this address
			// before making new dials.
			if .Kind == tpt.UpdateKindHandshakeProgressed {
				// Only wait for public addresses to complete dialing since private dials
				// are quick any way
				if manet.IsPublicAddr(.Addr) {
					.expectedTCPUpgradeTime = .cl.Now().Add(PublicTCPDelay)
				}
				()
				continue
			}
			--
			.expectedTCPUpgradeTime = time.Time{}
			if .Conn != nil {
				// we got a connection, add it to the swarm
				,  := .s.addConn(.Conn, network.DirOutbound)
				if  != nil {
					// oops no, we failed to add it to the swarm
					.Conn.Close()
					.dispatchError(, )
					continue 
				}

				for  := range .pendingRequests {
					if ,  := .addrs[string(.addr.Bytes())];  {
						.req.resch <- dialResponse{conn: }
						delete(.pendingRequests, )
					}
				}

				.conn = 
				if !.connected {
					.connected = true
					if .s.metricsTracer != nil {
						.s.metricsTracer.DialRankingDelay(.dialRankingDelay)
					}
				}

				continue 
			}

			// it must be an error -- add backoff if applicable and dispatch
			// ErrDialRefusedBlackHole shouldn't end up here, just a safety check
			if .Err != ErrDialRefusedBlackHole && .Err != context.Canceled && !.connected {
				// we only add backoff if there has not been a successful connection
				// for consistency with the old dialer behavior.
				.s.backf.AddBackoff(.peer, .Addr)
			} else if .Err == ErrDialRefusedBlackHole {
				log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s",
					.peer, .Addr)
			}

			.dispatchError(, .Err)
			// Only schedule next dial on error.
			// If we scheduleNextDial on success, we will end up making one dial more than
			// required because the final successful dial will spawn one more dial
			()
		}
	}
}

// dispatches an error to a specific addr dial
func ( *dialWorker) ( *addrDial,  error) {
	.err = 
	for  := range .pendingRequests {
		// accumulate the error
		if ,  := .addrs[string(.addr.Bytes())];  {
			.err.recordErr(.addr, )
			delete(.addrs, string(.addr.Bytes()))
			if len(.addrs) == 0 {
				// all addrs have erred, dispatch dial error
				// but first do a last one check in case an acceptable connection has landed from
				// a simultaneous dial that started later and added new acceptable addrs
				 := .s.bestAcceptableConnToPeer(.req.ctx, .peer)
				if  != nil {
					.req.resch <- dialResponse{conn: }
				} else {
					.err.Cause = ErrAllDialsFailed
					.req.resch <- dialResponse{err: .err}
				}
				delete(.pendingRequests, )
			}
		}
	}

	// if it was a backoff, clear the address dial so that it doesn't inhibit new dial requests.
	// this is necessary to support active listen scenarios, where a new dial comes in while
	// another dial is in progress, and needs to do a direct connection without inhibitions from
	// dial backoff.
	if  == ErrDialBackoff {
		delete(.trackedDials, string(.addr.Bytes()))
	}
}

// rankAddrs ranks addresses for dialing. if it's a simConnect request we
// dial all addresses immediately without any delay
func ( *dialWorker) ( []ma.Multiaddr,  bool) []network.AddrDelay {
	if  {
		return NoDelayDialRanker()
	}
	return .s.dialRanker()
}

// dialQueue is a priority queue used to schedule dials. Elements with a smaller
// delay are dialed first.
type dialQueue struct {
	// q contains dials ordered by delay, smallest delay first
	q []network.AddrDelay
}

// newDialQueue builds an empty dialQueue whose backing slice is preallocated
// with room for 16 entries.
func newDialQueue() *dialQueue {
	const initialCapacity = 16
	queue := new(dialQueue)
	queue.q = make([]network.AddrDelay, 0, initialCapacity)
	return queue
}

// Add adds a new element to the dialQueue. To update an element use UpdateOrAdd.
func ( *dialQueue) ( network.AddrDelay) {
	for  := .Len() - 1;  >= 0; -- {
		if .q[].Delay <= .Delay {
			// insert at pos i+1
			.q = append(.q, network.AddrDelay{}) // extend the slice
			copy(.q[+2:], .q[+1:])
			.q[+1] = 
			return
		}
	}
	// insert at position 0
	.q = append(.q, network.AddrDelay{}) // extend the slice
	copy(.q[1:], .q[0:])
	.q[0] = 
}

// UpdateOrAdd updates the elements with address adelay.Addr to the new delay
// Useful when hole punching
func ( *dialQueue) ( network.AddrDelay) {
	for  := 0;  < .Len(); ++ {
		if .q[].Addr.Equal(.Addr) {
			if .q[].Delay == .Delay {
				// existing element is the same. nothing to do
				return
			}
			// remove the element
			copy(.q[:], .q[+1:])
			.q = .q[:len(.q)-1]
		}
	}
	.Add()
}

// NextBatch returns all the elements in the queue with the highest priority
func ( *dialQueue) () []network.AddrDelay {
	if .Len() == 0 {
		return nil
	}

	// i is the index of the second highest priority element
	var  int
	for  = 0;  < .Len(); ++ {
		if .q[].Delay != .q[0].Delay {
			break
		}
	}
	 := .q[:]
	.q = .q[:]
	return 
}

// top returns the top element of the queue
func ( *dialQueue) () network.AddrDelay {
	return .q[0]
}

// Len returns the number of elements in the queue
func ( *dialQueue) () int {
	return len(.q)
}