package redis

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/cespare/xxhash/v2"
	"github.com/dgryski/go-rendezvous" //nolint

	"github.com/redis/go-redis/v9/internal"
	"github.com/redis/go-redis/v9/internal/hashtag"
	"github.com/redis/go-redis/v9/internal/pool"
	"github.com/redis/go-redis/v9/internal/rand"
)

var errRingShardsDown = errors.New("redis: all ring shards are down")

//------------------------------------------------------------------------------

type ConsistentHash interface {
	Get(string) string
}

type rendezvousWrapper struct {
	*rendezvous.Rendezvous
}

// Get implements ConsistentHash by delegating to rendezvous.Lookup.
func (w rendezvousWrapper) Get(key string) string {
	return w.Lookup(key)
}

func newRendezvous(shards []string) ConsistentHash {
	return rendezvousWrapper{rendezvous.New(shards, xxhash.Sum64String)}
}
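
// A custom ConsistentHash can be plugged in via RingOptions.NewConsistentHash.
// A minimal sketch, for illustration only (the modHash type below is
// hypothetical, and unlike the default rendezvous hash, a plain modulo hash
// remaps most keys whenever the shard set changes):
//
//	type modHash []string
//
//	func (h modHash) Get(key string) string {
//		if len(h) == 0 {
//			return ""
//		}
//		return h[xxhash.Sum64String(key)%uint64(len(h))]
//	}
//
//	func newModHash(shards []string) ConsistentHash {
//		sort.Strings(shards) // shard order from a map is not deterministic
//		return modHash(shards)
//	}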

//------------------------------------------------------------------------------

// RingOptions are used to configure a ring client and should be
// passed to NewRing.
type RingOptions struct {
	// Map of name => host:port addresses of ring shards.
	Addrs map[string]string

	// NewClient creates a shard client with provided options.
	NewClient func(opt *Options) *Client

	// ClientName will execute the `CLIENT SETNAME ClientName` command for each conn.
	ClientName string

	// Frequency of PING commands sent to check shards availability.
	// Shard is considered down after 3 subsequent failed checks.
	HeartbeatFrequency time.Duration

	// NewConsistentHash returns a consistent hash that is used
	// to distribute keys across the shards.
	//
	// See https://medium.com/@dgryski/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8
	// for consistent hashing algorithmic tradeoffs.
	NewConsistentHash func(shards []string) ConsistentHash

	// Following options are copied from Options struct.

	Dialer    func(ctx context.Context, network, addr string) (net.Conn, error)
	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string
	DB       int

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
	PoolFIFO bool

	PoolSize        int
	PoolTimeout     time.Duration
	MinIdleConns    int
	MaxIdleConns    int
	ConnMaxIdleTime time.Duration
	ConnMaxLifetime time.Duration

	TLSConfig *tls.Config
	Limiter   Limiter
}

func (opt *RingOptions) init() {
	if opt.NewClient == nil {
		opt.NewClient = func(opt *Options) *Client {
			return NewClient(opt)
		}
	}

	if opt.HeartbeatFrequency == 0 {
		opt.HeartbeatFrequency = 500 * time.Millisecond
	}

	if opt.NewConsistentHash == nil {
		opt.NewConsistentHash = newRendezvous
	}

	// -1 is a sentinel that disables retries; 0 means "use the default".
	if opt.MaxRetries == -1 {
		opt.MaxRetries = 0
	} else if opt.MaxRetries == 0 {
		opt.MaxRetries = 3
	}
	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}
}

func (opt *RingOptions) clientOptions() *Options {
	return &Options{
		ClientName: opt.ClientName,
		Dialer:     opt.Dialer,
		OnConnect:  opt.OnConnect,

		Username: opt.Username,
		Password: opt.Password,
		DB:       opt.DB,

		// Disable retries in shard clients; the Ring retries at a higher level.
		MaxRetries: -1,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolFIFO:        opt.PoolFIFO,
		PoolSize:        opt.PoolSize,
		PoolTimeout:     opt.PoolTimeout,
		MinIdleConns:    opt.MinIdleConns,
		MaxIdleConns:    opt.MaxIdleConns,
		ConnMaxIdleTime: opt.ConnMaxIdleTime,
		ConnMaxLifetime: opt.ConnMaxLifetime,

		TLSConfig: opt.TLSConfig,
		Limiter:   opt.Limiter,
	}
}

//------------------------------------------------------------------------------

type ringShard struct {
	Client *Client
	down   int32
	addr   string
}

func newRingShard(opt *RingOptions, addr string) *ringShard {
	clopt := opt.clientOptions()
	clopt.Addr = addr

	return &ringShard{
		Client: opt.NewClient(clopt),
		addr:   addr,
	}
}

func (shard *ringShard) String() string {
	var state string
	if shard.IsUp() {
		state = "up"
	} else {
		state = "down"
	}
	return fmt.Sprintf("%s is %s", shard.Client, state)
}

func (shard *ringShard) IsDown() bool {
	const threshold = 3
	return atomic.LoadInt32(&shard.down) >= threshold
}

func (shard *ringShard) IsUp() bool {
	return !shard.IsDown()
}

// Vote votes to set shard state and returns true if state was changed.
func (shard *ringShard) Vote(up bool) bool {
	if up {
		changed := shard.IsDown()
		atomic.StoreInt32(&shard.down, 0)
		return changed
	}

	if shard.IsDown() {
		return false
	}

	atomic.AddInt32(&shard.down, 1)
	return shard.IsDown()
}

//------------------------------------------------------------------------------

type ringSharding struct {
	opt *RingOptions

	mu        sync.RWMutex
	shards    *ringShards
	closed    bool
	hash      ConsistentHash
	numShard  int
	onNewNode []func(rdb *Client)

	// ensures exclusive access to SetAddrs so there is no need
	// to hold mu for the duration of potentially long shard creation
	setAddrsMu sync.Mutex
}

type ringShards struct {
	m    map[string]*ringShard
	list []*ringShard
}

func newRingSharding(opt *RingOptions) *ringSharding {
	c := &ringSharding{
		opt: opt,
	}
	c.SetAddrs(opt.Addrs)

	return c
}

func (c *ringSharding) OnNewNode(fn func(rdb *Client)) {
	c.mu.Lock()
	c.onNewNode = append(c.onNewNode, fn)
	c.mu.Unlock()
}

// SetAddrs replaces the shards in use, so that the number of shards can be
// increased or decreased at runtime. It reuses shards that existed before
// and closes the ones that are no longer used.
func (c *ringSharding) SetAddrs(addrs map[string]string) {
	c.setAddrsMu.Lock()
	defer c.setAddrsMu.Unlock()

	cleanup := func(shards map[string]*ringShard) {
		for addr, shard := range shards {
			if err := shard.Client.Close(); err != nil {
				internal.Logger.Printf(context.Background(), "shard.Close %s failed: %s", addr, err)
			}
		}
	}

	c.mu.RLock()
	if c.closed {
		c.mu.RUnlock()
		return
	}
	existing := c.shards
	c.mu.RUnlock()

	shards, created, unused := c.newRingShards(addrs, existing)

	c.mu.Lock()
	if c.closed {
		cleanup(created)
		c.mu.Unlock()
		return
	}
	c.shards = shards
	c.rebalanceLocked()
	c.mu.Unlock()

	cleanup(unused)
}

func (c *ringSharding) newRingShards(
	addrs map[string]string, existing *ringShards,
) (shards *ringShards, created, unused map[string]*ringShard) {
	shards = &ringShards{m: make(map[string]*ringShard, len(addrs))}
	created = make(map[string]*ringShard) // indexed by addr
	unused = make(map[string]*ringShard)  // indexed by addr

	if existing != nil {
		for _, shard := range existing.list {
			unused[shard.addr] = shard
		}
	}

	for name, addr := range addrs {
		if shard, ok := unused[addr]; ok {
			shards.m[name] = shard
			delete(unused, addr)
		} else {
			shard := newRingShard(c.opt, addr)
			shards.m[name] = shard
			created[addr] = shard

			for _, fn := range c.onNewNode {
				fn(shard.Client)
			}
		}
	}

	for _, shard := range shards.m {
		shards.list = append(shards.list, shard)
	}

	return
}

func (c *ringSharding) List() []*ringShard {
	var list []*ringShard

	c.mu.RLock()
	if !c.closed {
		list = c.shards.list
	}
	c.mu.RUnlock()

	return list
}

func (c *ringSharding) Hash(key string) string {
	key = hashtag.Key(key)

	var hash string

	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.numShard > 0 {
		hash = c.hash.Get(key)
	}

	return hash
}

func (c *ringSharding) GetByKey(key string) (*ringShard, error) {
	key = hashtag.Key(key)

	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	if c.numShard == 0 {
		return nil, errRingShardsDown
	}

	shardName := c.hash.Get(key)
	if shardName == "" {
		return nil, errRingShardsDown
	}
	return c.shards.m[shardName], nil
}

func (c *ringSharding) GetByName(shardName string) (*ringShard, error) {
	if shardName == "" {
		return c.Random()
	}

	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.shards.m[shardName], nil
}

func (c *ringSharding) Random() (*ringShard, error) {
	return c.GetByKey(strconv.Itoa(rand.Int()))
}

// Heartbeat monitors state of each shard in the ring.
func (c *ringSharding) Heartbeat(ctx context.Context, frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			var rebalance bool

			for _, shard := range c.List() {
				err := shard.Client.Ping(ctx).Err()
				isUp := err == nil || err == pool.ErrPoolTimeout
				if shard.Vote(isUp) {
					internal.Logger.Printf(ctx, "ring shard state changed: %s", shard)
					rebalance = true
				}
			}

			if rebalance {
				c.mu.Lock()
				c.rebalanceLocked()
				c.mu.Unlock()
			}
		case <-ctx.Done():
			return
		}
	}
}

// rebalanceLocked removes dead shards from the Ring.
// Requires c.mu locked.
func (c *ringSharding) rebalanceLocked() {
	if c.closed {
		return
	}
	if c.shards == nil {
		return
	}

	liveShards := make([]string, 0, len(c.shards.m))

	for name, shard := range c.shards.m {
		if shard.IsUp() {
			liveShards = append(liveShards, name)
		}
	}

	c.hash = c.opt.NewConsistentHash(liveShards)
	c.numShard = len(liveShards)
}

func (c *ringSharding) Len() int {
	c.mu.RLock()
	defer c.mu.RUnlock()

	return c.numShard
}

func (c *ringSharding) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error

	for _, shard := range c.shards.list {
		if err := shard.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.hash = nil
	c.shards = nil
	c.numShard = 0

	return firstErr
}

//------------------------------------------------------------------------------

// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
// the ring. When a shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any
// coordination when shard state is changed.
//
// Ring should be used when you need multiple Redis servers for caching
// and can tolerate losing data when one of the servers dies.
// Otherwise you should use Redis Cluster.
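//
// A minimal usage sketch (shard names and addresses are illustrative):
//
//	ring := NewRing(&RingOptions{
//		Addrs: map[string]string{
//			"shard1": "localhost:7000",
//			"shard2": "localhost:7001",
//		},
//	})
//	if err := ring.Set(ctx, "key", "value", 0).Err(); err != nil {
//		// handle error
//	}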
type Ring struct {
	cmdable
	hooksMixin

	opt               *RingOptions
	sharding          *ringSharding
	cmdsInfoCache     *cmdsInfoCache
	heartbeatCancelFn context.CancelFunc
}

// NewRing returns a Ring client configured with the given options and
// starts a background goroutine that heartbeats the shards.
func NewRing(opt *RingOptions) *Ring {
	opt.init()

	hbCtx, hbCancel := context.WithCancel(context.Background())

	ring := Ring{
		opt:               opt,
		sharding:          newRingSharding(opt),
		heartbeatCancelFn: hbCancel,
	}

	ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
	ring.cmdable = ring.Process

	ring.initHooks(hooks{
		process: ring.process,
		pipeline: func(ctx context.Context, cmds []Cmder) error {
			return ring.generalProcessPipeline(ctx, cmds, false)
		},
		txPipeline: func(ctx context.Context, cmds []Cmder) error {
			return ring.generalProcessPipeline(ctx, cmds, true)
		},
	})

	go ring.sharding.Heartbeat(hbCtx, opt.HeartbeatFrequency)

	return &ring
}

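// SetAddrs replaces the ring shards with the given addresses: shards whose
// addresses are kept are reused, dropped shards are closed. A minimal
// resharding sketch (names and addresses are illustrative):
//
//	ring.SetAddrs(map[string]string{
//		"shard1": "localhost:7000",
//		"shard2": "localhost:7001",
//		"shard3": "localhost:7002", // newly added shard
//	})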
func (c *Ring) SetAddrs(addrs map[string]string) {
	c.sharding.SetAddrs(addrs)
}

// Do creates a Cmd from the args and processes the cmd.
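//
// A usage sketch for running an arbitrary command (command and key are
// illustrative):
//
//	val, err := ring.Do(ctx, "get", "example-key").Result()
//	if err == Nil {
//		// example-key does not exist
//	}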
func (c *Ring) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	_ = c.Process(ctx, cmd)
	return cmd
}

func (c *Ring) Process(ctx context.Context, cmd Cmder) error {
	err := c.processHook(ctx, cmd)
	cmd.SetErr(err)
	return err
}

// Options returns read-only Options that were used to create the client.
func (c *Ring) Options() *RingOptions {
	return c.opt
}

func (c *Ring) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

// PoolStats returns accumulated connection pool stats.
func (c *Ring) PoolStats() *PoolStats {
	shards := c.sharding.List()
	var acc PoolStats
	for _, shard := range shards {
		s := shard.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts
		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
	}
	return &acc
}

// Len returns the current number of shards in the ring.
func (c *Ring) Len() int {
	return c.sharding.Len()
}

// Subscribe subscribes the client to the specified channels.
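//
// The first channel determines which shard serves the subscription. A usage
// sketch (the channel name is illustrative):
//
//	pubsub := ring.Subscribe(ctx, "example-channel")
//	defer pubsub.Close()
//	msg, err := pubsub.ReceiveMessage(ctx)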
func (c *Ring) Subscribe(ctx context.Context, channels ...string) *PubSub {
	if len(channels) == 0 {
		panic("at least one channel is required")
	}

	shard, err := c.sharding.GetByKey(channels[0])
	if err != nil {
		// TODO: return PubSub with sticky error
		panic(err)
	}
	return shard.Client.Subscribe(ctx, channels...)
}

// PSubscribe subscribes the client to the given patterns.
func (c *Ring) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	if len(channels) == 0 {
		panic("at least one channel is required")
	}

	shard, err := c.sharding.GetByKey(channels[0])
	if err != nil {
		// TODO: return PubSub with sticky error
		panic(err)
	}
	return shard.Client.PSubscribe(ctx, channels...)
}

// SSubscribe subscribes the client to the specified shard channels.
func (c *Ring) SSubscribe(ctx context.Context, channels ...string) *PubSub {
	if len(channels) == 0 {
		panic("at least one channel is required")
	}
	shard, err := c.sharding.GetByKey(channels[0])
	if err != nil {
		// TODO: return PubSub with sticky error
		panic(err)
	}
	return shard.Client.SSubscribe(ctx, channels...)
}

// OnNewNode registers a hook that is called when a new shard client is created.
func (c *Ring) OnNewNode(fn func(rdb *Client)) {
	c.sharding.OnNewNode(fn)
}

// ForEachShard concurrently calls the fn on each live shard in the ring.
// It returns the first error if any.
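//
// A usage sketch (the command run on each shard is illustrative):
//
//	err := ring.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
//		return shard.FlushDB(ctx).Err()
//	})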
func (c *Ring) ForEachShard(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	shards := c.sharding.List()
	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, shard := range shards {
		if shard.IsDown() {
			continue
		}

		wg.Add(1)
		go func(shard *ringShard) {
			defer wg.Done()
			err := fn(ctx, shard.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(shard)
	}
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

func (c *Ring) cmdsInfo(ctx context.Context) (map[string]*CommandInfo, error) {
	shards := c.sharding.List()
	var firstErr error
	for _, shard := range shards {
		cmdsInfo, err := shard.Client.Command(ctx).Result()
		if err == nil {
			return cmdsInfo, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}
	if firstErr == nil {
		return nil, errRingShardsDown
	}
	return nil, firstErr
}

func (c *Ring) cmdInfo(ctx context.Context, name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
	if err != nil {
		return nil
	}
	info := cmdsInfo[name]
	if info == nil {
		internal.Logger.Printf(ctx, "info for cmd=%s not found", name)
	}
	return info
}

func (c *Ring) cmdShard(ctx context.Context, cmd Cmder) (*ringShard, error) {
	cmdInfo := c.cmdInfo(ctx, cmd.Name())
	pos := cmdFirstKeyPos(cmd, cmdInfo)
	if pos == 0 {
		return c.sharding.Random()
	}
	firstKey := cmd.stringArg(pos)
	return c.sharding.GetByKey(firstKey)
}

func (c *Ring) process(ctx context.Context, cmd Cmder) error {
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		shard, err := c.cmdShard(ctx, cmd)
		if err != nil {
			return err
		}

		lastErr = shard.Client.Process(ctx, cmd)
		if lastErr == nil || !shouldRetry(lastErr, cmd.readTimeout() == nil) {
			return lastErr
		}
	}
	return lastErr
}

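// Pipelined executes the commands queued by fn in a single batch; the Ring
// groups them by shard and sends each group in parallel. A usage sketch
// (keys are illustrative):
//
//	cmds, err := ring.Pipelined(ctx, func(pipe Pipeliner) error {
//		pipe.Set(ctx, "key1", "value1", 0)
//		pipe.Set(ctx, "key2", "value2", 0)
//		return nil
//	})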
func (c *Ring) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}

func (c *Ring) Pipeline() Pipeliner {
	pipe := Pipeline{
		exec: pipelineExecer(c.processPipelineHook),
	}
	pipe.init()
	return &pipe
}

func (c *Ring) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}

func (c *Ring) TxPipeline() Pipeliner {
	pipe := Pipeline{
		exec: func(ctx context.Context, cmds []Cmder) error {
			cmds = wrapMultiExec(ctx, cmds)
			return c.processTxPipelineHook(ctx, cmds)
		},
	}
	pipe.init()
	return &pipe
}

func (c *Ring) generalProcessPipeline(
	ctx context.Context, cmds []Cmder, tx bool,
) error {
	if tx {
		// Trim multi .. exec.
		cmds = cmds[1 : len(cmds)-1]
	}

	// Group commands by the shard that owns their first key; commands
	// without a key hash to the empty string and go to a random shard.
	cmdsMap := make(map[string][]Cmder)

	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(ctx, cmd.Name())
		hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
		if hash != "" {
			hash = c.sharding.Hash(hash)
		}
		cmdsMap[hash] = append(cmdsMap[hash], cmd)
	}

	var wg sync.WaitGroup
	for hash, cmds := range cmdsMap {
		wg.Add(1)
		go func(hash string, cmds []Cmder) {
			defer wg.Done()

			// TODO: retry?
			shard, err := c.sharding.GetByName(hash)
			if err != nil {
				setCmdsErr(cmds, err)
				return
			}

			if tx {
				cmds = wrapMultiExec(ctx, cmds)
				_ = shard.Client.processTxPipelineHook(ctx, cmds)
			} else {
				_ = shard.Client.processPipelineHook(ctx, cmds)
			}
		}(hash, cmds)
	}

	wg.Wait()
	return cmdsFirstErr(cmds)
}

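// Watch runs fn inside a transaction on the shard that owns the given keys;
// all keys must hash to the same shard. A minimal optimistic-locking sketch
// (the key and the increment logic are illustrative):
//
//	err := ring.Watch(ctx, func(tx *Tx) error {
//		n, err := tx.Get(ctx, "counter").Int()
//		if err != nil && err != Nil {
//			return err
//		}
//		_, err = tx.TxPipelined(ctx, func(pipe Pipeliner) error {
//			pipe.Set(ctx, "counter", n+1, 0)
//			return nil
//		})
//		return err
//	}, "counter")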
func (c *Ring) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	var shards []*ringShard

	for _, key := range keys {
		if key != "" {
			shard, err := c.sharding.GetByKey(hashtag.Key(key))
			if err != nil {
				return err
			}

			shards = append(shards, shard)
		}
	}

	if len(shards) == 0 {
		return fmt.Errorf("redis: Watch requires at least one shard")
	}

	if len(shards) > 1 {
		for _, shard := range shards[1:] {
			if shard.Client != shards[0].Client {
				err := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
				return err
			}
		}
	}

	return shards[0].Client.Watch(ctx, fn, keys...)
}

// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func (c *Ring) Close() error {
	c.heartbeatCancelFn()

	return c.sharding.Close()
}