package redis

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"net/url"
	"runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/redis/go-redis/v9/internal"
	"github.com/redis/go-redis/v9/internal/hashtag"
	"github.com/redis/go-redis/v9/internal/pool"
	"github.com/redis/go-redis/v9/internal/proto"
	"github.com/redis/go-redis/v9/internal/rand"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
	ClientName string

	// NewClient creates a cluster node client with provided name and options.
	NewClient func(opt *Options) *Client

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 3 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to the random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between the master and slaves.
	// It can use a service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	// See the sketch after this struct definition for an example.
	ClusterSlots func(context.Context) ([]ClusterSlot, error)

	// The following options are copied from the Options struct.

	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)

	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout           time.Duration
	ReadTimeout           time.Duration
	WriteTimeout          time.Duration
	ContextTimeoutEnabled bool

	PoolFIFO        bool
	PoolSize        int // applies per cluster node and not for the whole cluster
	PoolTimeout     time.Duration
	MinIdleConns    int
	MaxIdleConns    int
	ConnMaxIdleTime time.Duration
	ConnMaxLifetime time.Duration

	TLSConfig *tls.Config
}
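
// The sketch below illustrates one way the ClusterSlots option can be used to
// describe a manually managed setup of standalone servers (the addresses and
// slot ranges are assumptions, not defaults):
//
//	opt := &ClusterOptions{
//		ClusterSlots: func(ctx context.Context) ([]ClusterSlot, error) {
//			slots := []ClusterSlot{
//				// First shard: one master and one slave.
//				{
//					Start: 0,
//					End:   8191,
//					Nodes: []ClusterNode{
//						{Addr: ":7000"}, // master
//						{Addr: ":8000"}, // slave
//					},
//				},
//				// Second shard: one master and one slave.
//				{
//					Start: 8192,
//					End:   16383,
//					Nodes: []ClusterNode{
//						{Addr: ":7001"}, // master
//						{Addr: ":8001"}, // slave
//					},
//				},
//			}
//			return slots, nil
//		},
//	}
//	rdb := NewClusterClient(opt)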

func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 3
	}

	if opt.RouteByLatency || opt.RouteRandomly {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.GOMAXPROCS(0)
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	if opt.MaxRetries == 0 {
		opt.MaxRetries = -1
	}
	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}

	if opt.NewClient == nil {
		opt.NewClient = NewClient
	}
}

// ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
// The URL must be in the form:
//
//	redis://<user>:<password>@<host>:<port>
//	or
//	rediss://<user>:<password>@<host>:<port>
//
// To add additional addresses, specify the query parameter "addr" one or more times, e.g.:
//
//	redis://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
//	or
//	rediss://<user>:<password>@<host>:<port>?addr=<host2>:<port2>&addr=<host3>:<port3>
//
// Most ClusterOptions fields can be set using query parameters, with the following restrictions:
//   - field names are mapped using snake-case conversion: to set MaxRetries, use max_retries
//   - only scalar type fields are supported (bool, int, time.Duration)
//   - for time.Duration fields, values must be a valid input for time.ParseDuration();
//     additionally a plain integer as value (i.e. without unit) is interpreted as seconds
//   - to disable a duration field, use a value less than or equal to 0; to use the default
//     value, leave the value blank or remove the parameter
//   - only the last value is interpreted if a parameter is given multiple times
//   - fields "network", "addr", "username" and "password" can only be set using other
//     URL attributes (scheme, host, userinfo, respectively); query parameters using these
//     names will be treated as unknown parameters
//   - unknown parameter names will result in an error
//
// Example:
//
//	redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791
//	is equivalent to:
//	&ClusterOptions{
//		Addrs:       ["localhost:6789", "localhost:6790", "localhost:6791"]
//		DialTimeout: 3 * time.Second, // no time unit = seconds
//		ReadTimeout: 6 * time.Second,
//	}
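//
// A minimal usage sketch (the URL below is an assumption, not a default):
//
//	opt, err := ParseClusterURL("redis://user:password@localhost:6789?dial_timeout=3")
//	if err != nil {
//		panic(err)
//	}
//	rdb := NewClusterClient(opt)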
func ParseClusterURL(redisURL string) (*ClusterOptions, error) {
	o := &ClusterOptions{}

	u, err := url.Parse(redisURL)
	if err != nil {
		return nil, err
	}

	// add base URL to the array of addresses
	// more addresses may be added through the URL params
	h, p := getHostPortWithDefaults(u)
	o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))

	// setup username, password, and other configurations
	o, err = setupClusterConn(u, h, o)
	if err != nil {
		return nil, err
	}

	return o, nil
}

// setupClusterConn gets the username and password from the URL and the query parameters.
func setupClusterConn(u *url.URL, host string, o *ClusterOptions) (*ClusterOptions, error) {
	switch u.Scheme {
	case "rediss":
		o.TLSConfig = &tls.Config{ServerName: host}
		fallthrough
	case "redis":
		o.Username, o.Password = getUserPassword(u)
	default:
		return nil, fmt.Errorf("redis: invalid URL scheme: %s", u.Scheme)
	}

	// retrieve the configuration from the query parameters
	o, err := setupClusterQueryParams(u, o)
	if err != nil {
		return nil, err
	}

	return o, nil
}

// setupClusterQueryParams converts query parameters in u to option values in o.
func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, error) {
	q := queryOptions{q: u.Query()}

	o.ClientName = q.string("client_name")
	o.MaxRedirects = q.int("max_redirects")
	o.ReadOnly = q.bool("read_only")
	o.RouteByLatency = q.bool("route_by_latency")
	o.RouteRandomly = q.bool("route_randomly")
	o.MaxRetries = q.int("max_retries")
	o.MinRetryBackoff = q.duration("min_retry_backoff")
	o.MaxRetryBackoff = q.duration("max_retry_backoff")
	o.DialTimeout = q.duration("dial_timeout")
	o.ReadTimeout = q.duration("read_timeout")
	o.WriteTimeout = q.duration("write_timeout")
	o.PoolFIFO = q.bool("pool_fifo")
	o.PoolSize = q.int("pool_size")
	o.MinIdleConns = q.int("min_idle_conns")
	o.PoolTimeout = q.duration("pool_timeout")
	o.ConnMaxLifetime = q.duration("conn_max_lifetime")
	o.ConnMaxIdleTime = q.duration("conn_max_idle_time")

	if q.err != nil {
		return nil, q.err
	}

	// addr can be specified as many times as needed
	addrs := q.strings("addr")
	for _, addr := range addrs {
		h, p, err := net.SplitHostPort(addr)
		if err != nil || h == "" || p == "" {
			return nil, fmt.Errorf("redis: unable to parse addr param: %s", addr)
		}

		o.Addrs = append(o.Addrs, net.JoinHostPort(h, p))
	}

	// any parameters left?
	if r := q.remaining(); len(r) > 0 {
		return nil, fmt.Errorf("redis: unexpected option: %s", strings.Join(r, ", "))
	}

	return o, nil
}

func (opt *ClusterOptions) clientOptions() *Options {
	return &Options{
		ClientName: opt.ClientName,
		Dialer:     opt.Dialer,
		OnConnect:  opt.OnConnect,

		Username: opt.Username,
		Password: opt.Password,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolFIFO:        opt.PoolFIFO,
		PoolSize:        opt.PoolSize,
		PoolTimeout:     opt.PoolTimeout,
		MinIdleConns:    opt.MinIdleConns,
		MaxIdleConns:    opt.MaxIdleConns,
		ConnMaxIdleTime: opt.ConnMaxIdleTime,
		ConnMaxLifetime: opt.ConnMaxLifetime,

		TLSConfig: opt.TLSConfig,
		// If ClusterSlots is populated, then we probably have an artificial
		// cluster whose nodes are not in clustering mode (otherwise there isn't
		// much use for ClusterSlots config). This means we cannot execute the
		// READONLY command against that node -- setting readOnly to false in such
		// situations in the options below will prevent that from happening.
		readOnly: opt.ReadOnly && opt.ClusterSlots == nil,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	failing    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: clOpt.NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

func (n *clusterNode) updateLatency() {
	const numProbe = 10
	var dur uint64

	successes := 0
	for i := 0; i < numProbe; i++ {
		time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)

		start := time.Now()
		err := n.Client.Ping(context.TODO()).Err()
		if err == nil {
			dur += uint64(time.Since(start) / time.Microsecond)
			successes++
		}
	}

	var latency float64
	if successes == 0 {
		// If none of the pings worked, set latency to some arbitrarily high value
		// so this node gets the least priority.
		latency = float64((1 * time.Minute) / time.Microsecond)
	} else {
		latency = float64(dur) / float64(successes)
	}
	atomic.StoreUint32(&n.latency, uint32(latency+0.5))
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsFailing() {
	atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
}

func (n *clusterNode) Failing() bool {
	const timeout = 15 // 15 seconds

	failing := atomic.LoadUint32(&n.failing)
	if failing == 0 {
		return false
	}
	if time.Now().Unix()-int64(failing) < timeout {
		return true
	}
	atomic.StoreUint32(&n.failing, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu          sync.RWMutex
	addrs       []string
	nodes       map[string]*clusterNode
	activeAddrs []string
	closed      bool
	onNewNode   []func(rdb *Client)

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		addrs: opt.Addrs,
		nodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.nodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.nodes = nil
	c.activeAddrs = nil

	return firstErr
}

func (c *clusterNodes) OnNewNode(fn func(rdb *Client)) {
	c.mu.Lock()
	c.onNewNode = append(c.onNewNode, fn)
	c.mu.Unlock()
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string

	c.mu.RLock()
	closed := c.closed //nolint:ifshort
	if !closed {
		if len(c.activeAddrs) > 0 {
			addrs = c.activeAddrs
		} else {
			addrs = c.addrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	//nolint:prealloc
	var collected []*clusterNode

	c.mu.Lock()

	c.activeAddrs = c.activeAddrs[:0]
	for addr, node := range c.nodes {
		if node.Generation() >= generation {
			c.activeAddrs = append(c.activeAddrs, addr)
			if c.opt.RouteByLatency {
				go node.updateLatency()
			}
			continue
		}

		delete(c.nodes, addr)
		collected = append(collected, node)
	}

	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
	node, err := c.get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.nodes[addr]
	if ok {
		return node, nil
	}

	node = newClusterNode(c.opt, addr)
	for _, fn := range c.onNewNode {
		fn(node.Client)
	}

	c.addrs = appendIfNotExists(c.addrs, addr)
	c.nodes[addr] = node

	return node, nil
}

func (c *clusterNodes) get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.nodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.nodes))
	for _, node := range c.nodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.GetOrCreate(addrs[n])
}

//------------------------------------------------------------------------------

type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.GetOrCreate(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use origin host which is not loopback and node port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Failing() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Failing() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Failing() {
			continue
		}
		if node == nil || n.Latency() < node.Latency() {
			node = n
		}
	}
	if node != nil {
		return node, nil
	}

	// If all nodes are failing - return random node
	return c.nodes.Random()
}

func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}
	if len(nodes) == 1 {
		return nodes[0], nil
	}
	randomNodes := rand.Perm(len(nodes))
	for _, idx := range randomNodes {
		if node := nodes[idx]; !node.Failing() {
			return node, nil
		}
	}
	return nodes[randomNodes[0]], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}

//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func(ctx context.Context) (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
	state, err := c.load(ctx)
	if err != nil {
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload(context.Background())
		if err != nil {
			return
		}
		time.Sleep(200 * time.Millisecond)
	}()
}

func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
	v := c.state.Load()
	if v == nil {
		return c.Reload(ctx)
	}

	state := v.(*clusterState)
	if time.Since(state.createdAt) > 10*time.Second {
		c.LazyReload()
	}
	return state, nil
}

func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
	state, err := c.Reload(ctx)
	if err == nil {
		return state, nil
	}
	return c.Get(ctx)
}

//------------------------------------------------------------------------------

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder
	cmdsInfoCache *cmdsInfoCache
	cmdable
	hooksMixin
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
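//
// A minimal usage sketch (the addresses are assumptions, not defaults):
//
//	rdb := NewClusterClient(&ClusterOptions{
//		Addrs: []string{":7000", ":7001", ":7002"},
//	})
//	defer rdb.Close()
//
//	if err := rdb.Set(context.Background(), "key", "value", 0).Err(); err != nil {
//		// handle error
//	}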
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		opt:   opt,
		nodes: newClusterNodes(opt),
	}

	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	c.initHooks(hooks{
		dial:       nil,
		process:    c.process,
		pipeline:   c.processPipeline,
		txPipeline: c.processTxPipeline,
	})

	return c
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

// ReloadState reloads cluster state. If available, it calls the ClusterSlots
// func to get cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) {
	c.state.LazyReload()
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	_ = c.Process(ctx, cmd)
	return cmd
}

func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
	err := c.processHook(ctx, cmd)
	cmd.SetErr(err)
	return err
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	cmdInfo := c.cmdInfo(ctx, cmd.Name())
	slot := c.cmdSlot(ctx, cmd)
	var node *clusterNode
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmdInfo, slot)
			if err != nil {
				return err
			}
		}

		if ask {
			ask = false

			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node = nil
			continue
		}

		// If slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			c.state.LazyReload()

			var err error
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First retry the same node.
			if attempt == 0 {
				continue
			}

			// Second try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}

func (c *ClusterClient) OnNewNode(fn func(rdb *Client)) {
	c.nodes.OnNewNode(fn)
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
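//
// A minimal sketch (flushing every master is an illustrative choice, not a
// recommendation):
//
//	err := rdb.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
//		return master.FlushDB(ctx).Err()
//	})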
func (c *ClusterClient) ForEachMaster(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachShard concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachShard(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(ctx, node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get(context.TODO())
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots(ctx)
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	for _, idx := range rand.Perm(len(addrs)) {
		addr := addrs[idx]

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots(ctx).Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}

	/*
	 * No node is reachable. It's possible that all of the nodes' IP addresses
	 * have changed. Clear activeAddrs so the client can reconnect using the
	 * initial list of addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
	 * which may resolve the domain names again and pick up the updated IPs.
	 */
	c.nodes.mu.Lock()
	c.nodes.activeAddrs = nil
	c.nodes.mu.Unlock()

	return nil, firstErr
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: pipelineExecer(c.processPipelineHook),
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}

func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	cmdsMap := newCmdsMap()

	if err := c.mapCmdsByNode(ctx, cmdsMap, cmds); err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				setCmdsErr(cmds, err)
				return err
			}
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()
				c.processPipelineNode(ctx, node, cmds, failedCmds)
			}(node, cmds)
		}

		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
	state, err := c.state.Get(ctx)
	if err != nil {
		return err
	}

	if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
		for _, cmd := range cmds {
			slot := c.cmdSlot(ctx, cmd)
			node, err := c.slotReadOnlyNode(state, slot)
			if err != nil {
				return err
			}
			cmdsMap.Add(node, cmd)
		}
		return nil
	}

	for _, cmd := range cmds {
		slot := c.cmdSlot(ctx, cmd)
		node, err := state.slotMasterNode(slot)
		if err != nil {
			return err
		}
		cmdsMap.Add(node, cmd)
	}
	return nil
}

func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(ctx, cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) processPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) {
	_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		cn, err := node.Client.getConn(ctx)
		if err != nil {
			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
			setCmdsErr(cmds, err)
			return err
		}

		var processErr error
		defer func() {
			node.Client.releaseConn(ctx, cn, processErr)
		}()
		processErr = c.processPipelineNodeConn(ctx, node, cn, cmds, failedCmds)

		return processErr
	})
}

func (c *ClusterClient) processPipelineNodeConn(
	ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
	if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmds(wr, cmds)
	}); err != nil {
		if shouldRetry(err, true) {
			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
		}
		setCmdsErr(cmds, err)
		return err
	}

	return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
		return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
	})
}

func (c *ClusterClient) pipelineReadCmds(
	ctx context.Context,
	node *clusterNode,
	rd *proto.Reader,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	for i, cmd := range cmds {
		err := cmd.readReply(rd)
		cmd.SetErr(err)

		if err == nil {
			continue
		}

		if c.checkMovedErr(ctx, cmd, err, failedCmds) {
			continue
		}

		if c.opt.ReadOnly {
			node.MarkAsFailing()
		}

		if !isRedisError(err) {
			if shouldRetry(err, true) {
				_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
			}
			setCmdsErr(cmds[i+1:], err)
			return err
		}
	}

	if err := cmds[0].Err(); err != nil && shouldRetry(err, true) {
		_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
		return err
	}

	return nil
}

func (c *ClusterClient) checkMovedErr(
	ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
	moved, ask, addr := isMovedError(err)
	if !moved && !ask {
		return false
	}

	node, err := c.nodes.GetOrCreate(addr)
	if err != nil {
		return false
	}

	if moved {
		c.state.LazyReload()
		failedCmds.Add(node, cmd)
		return true
	}

	if ask {
		failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		return true
	}

	panic("not reached")
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		exec: func(ctx context.Context, cmds []Cmder) error {
			cmds = wrapMultiExec(ctx, cmds)
			return c.processTxPipelineHook(ctx, cmds)
		},
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}

func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	// Trim multi .. exec.
	cmds = cmds[1 : len(cmds)-1]

	state, err := c.state.Get(ctx)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsMap := c.mapCmdsBySlot(ctx, cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
					setCmdsErr(cmds, err)
					return err
				}
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()
					c.processTxPipelineNode(ctx, node, cmds, failedCmds)
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(ctx context.Context, cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(ctx, cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) processTxPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) {
	cmds = wrapMultiExec(ctx, cmds)
	_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		cn, err := node.Client.getConn(ctx)
		if err != nil {
			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
			setCmdsErr(cmds, err)
			return err
		}

		var processErr error
		defer func() {
			node.Client.releaseConn(ctx, cn, processErr)
		}()
		processErr = c.processTxPipelineNodeConn(ctx, node, cn, cmds, failedCmds)

		return processErr
	})
}

func (c *ClusterClient) processTxPipelineNodeConn(
	ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error {
	if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmds(wr, cmds)
	}); err != nil {
		if shouldRetry(err, true) {
			_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
		}
		setCmdsErr(cmds, err)
		return err
	}

	return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
		statusCmd := cmds[0].(*StatusCmd)
		// Trim multi and exec.
		trimmedCmds := cmds[1 : len(cmds)-1]

		if err := c.txPipelineReadQueued(
			ctx, rd, statusCmd, trimmedCmds, failedCmds,
		); err != nil {
			setCmdsErr(cmds, err)

			moved, ask, addr := isMovedError(err)
			if moved || ask {
				return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
			}

			return err
		}

		return pipelineReadCmds(rd, trimmedCmds)
	})
}

func (c *ClusterClient) txPipelineReadQueued(
	ctx context.Context,
	rd *proto.Reader,
	statusCmd *StatusCmd,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	// Parse queued replies.
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
			continue
		}
		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	if line[0] != proto.RespArray {
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}

	return nil
}

func (c *ClusterClient) cmdsMoved(
	ctx context.Context, cmds []Cmder,
	moved, ask bool,
	addr string,
	failedCmds *cmdsMap,
) error {
	node, err := c.nodes.GetOrCreate(addr)
	if err != nil {
		return err
	}

	if moved {
		c.state.LazyReload()
		for _, cmd := range cmds {
			failedCmds.Add(node, cmd)
		}
		return nil
	}

	if ask {
		for _, cmd := range cmds {
			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		}
		return nil
	}

	return nil
}

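// Watch prepares a transaction and marks the given keys to be watched
// for conditional execution of the transaction. All keys must hash to
// the same cluster slot.
//
// A minimal optimistic-locking sketch (the key name "counter" is an
// illustrative assumption):
//
//	err := rdb.Watch(ctx, func(tx *Tx) error {
//		n, err := tx.Get(ctx, "counter").Int()
//		if err != nil && err != Nil {
//			return err
//		}
//		_, err = tx.TxPipelined(ctx, func(pipe Pipeliner) error {
//			pipe.Set(ctx, "counter", n+1, 0)
//			return nil
//		})
//		return err
//	}, "counter")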
func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		err = node.Client.Watch(ctx, fn, keys...)
		if err == nil {
			break
		}

		moved, ask, addr := isMovedError(err)
		if moved || ask {
			node, err = c.nodes.GetOrCreate(addr)
			if err != nil {
				return err
			}
			continue
		}

		if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload()
			}
			node, err = c.slotMasterNode(ctx, slot)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(err, true) {
			continue
		}

		return err
	}

	return err
}

func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
			if node != nil {
				panic("node != nil")
			}

			var err error
			if len(channels) > 0 {
				slot := hashtag.Slot(channels[0])
				node, err = c.slotMasterNode(ctx, slot)
			} else {
				node, err = c.nodes.Random()
			}
			if err != nil {
				return nil, err
			}

			cn, err := node.Client.newConn(context.TODO())
			if err != nil {
				node = nil

				return nil, err
			}

			return cn, nil
		},
		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
			node = nil
			return err
		},
	}
	pubsub.init()

	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
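//
// A minimal usage sketch (the channel name is an assumption):
//
//	pubsub := rdb.Subscribe(ctx, "mychannel")
//	defer pubsub.Close()
//	for msg := range pubsub.Channel() {
//		fmt.Println(msg.Channel, msg.Payload)
//	}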
func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(ctx, channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(ctx, channels...)
	}
	return pubsub
}

// SSubscribe subscribes the client to the specified shard channels.
func (c *ClusterClient) SSubscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.SSubscribe(ctx, channels...)
	}
	return pubsub
}

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
	// Try 3 random nodes.
	const nodeLimit = 3

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	perm := rand.Perm(len(addrs))
	if len(perm) > nodeLimit {
		perm = perm[:nodeLimit]
	}

	for _, idx := range perm {
		addr := addrs[idx]

		node, err := c.nodes.GetOrCreate(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		info, err := node.Client.Command(ctx).Result()
		if err == nil {
			return info, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}

	if firstErr == nil {
		panic("not reached")
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
	if err != nil {
		internal.Logger.Printf(context.TODO(), "getting command info: %s", err)
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logger.Printf(context.TODO(), "info for cmd=%s not found", name)
	}
	return info
}

func (c *ClusterClient) cmdSlot(ctx context.Context, cmd Cmder) int {
	args := cmd.Args()
	if args[0] == "cluster" && args[1] == "getkeysinslot" {
		return args[2].(int)
	}

	cmdInfo := c.cmdInfo(ctx, cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}

func (c *ClusterClient) cmdNode(
	ctx context.Context,
	cmdInfo *CommandInfo,
	slot int,
) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}

	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
		return c.slotReadOnlyNode(state, slot)
	}
	return state.slotMasterNode(slot)
}

func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
	if c.opt.RouteByLatency {
		return state.slotClosestNode(slot)
	}
	if c.opt.RouteRandomly {
		return state.slotRandomNode(slot)
	}
	return state.slotSlaveNode(slot)
}

func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	return state.slotMasterNode(slot)
}

// SlaveForKey gets a client for a replica node to run any command on it.
// This is especially useful if you want to run a particular Lua script,
// which contains only read-only commands, on a replica.
// Most other Redis commands carry a flag that marks them as read-only,
// so they are routed to replica nodes automatically when the
// ClusterOptions.ReadOnly flag is set to true.
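//
// A minimal sketch (the script body and key are assumptions):
//
//	script := NewScript(`return redis.call("GET", KEYS[1])`)
//	replica, err := rdb.SlaveForKey(ctx, "key")
//	if err != nil {
//		// handle error
//	}
//	val, err := script.Run(ctx, replica, []string{"key"}).Result()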
func (c *ClusterClient) SlaveForKey(ctx context.Context, key string) (*Client, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	slot := hashtag.Slot(key)
	node, err := c.slotReadOnlyNode(state, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}

// MasterForKey returns a client to the master node for a particular key.
func (c *ClusterClient) MasterForKey(ctx context.Context, key string) (*Client, error) {
	slot := hashtag.Slot(key)
	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return nil, err
	}
	return node.Client, err
}

func (c *ClusterClient) context(ctx context.Context) context.Context {
	if c.opt.ContextTimeoutEnabled {
		return ctx
	}
	return context.Background()
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

//------------------------------------------------------------------------------

type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
	return &cmdsMap{
		m: make(map[*clusterNode][]Cmder),
	}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
	m.mu.Lock()
	m.m[node] = append(m.m[node], cmds...)
	m.mu.Unlock()
}