package redis

import (
	
	
	
	
	
	
	

	
	
	
	
)

// Scanner re-exports the Scanner type of the internal hscan package
// (internal/hscan) as a type alias.
type Scanner = hscan.Scanner

// Nil is the reply returned by Redis when a key does not exist.
// It is the same value as proto.Nil.
const Nil = proto.Nil

// SetLogger installs a custom logger. It accepts a value of type
// internal.Logging and assigns to the package-level internal.Logger.
func ( internal.Logging) {
	internal.Logger = 
}

//------------------------------------------------------------------------------

// Hook is the interface a client hook implements. Each method receives the
// next function in the chain and returns a function of the same type that
// wraps it. When the chain is built, a nil return value leaves the function
// already in place unchanged.
type Hook interface {
	// DialHook wraps the function used to dial a connection.
	DialHook(next DialHook) DialHook
	// ProcessHook wraps the function used to process a single command.
	ProcessHook(next ProcessHook) ProcessHook
	// ProcessPipelineHook wraps the function used to process a slice of
	// commands. The chain builder applies it to both the pipeline and the
	// transactional pipeline functions.
	ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
}

// DialHook is the function type that opens a connection to addr on the given
// network.
type DialHook func(ctx context.Context, network, addr string) (net.Conn, error)

// ProcessHook is the function type that processes a single command.
type ProcessHook func(ctx context.Context, cmd Cmder) error

// ProcessPipelineHook is the function type that processes a slice of commands.
type ProcessPipelineHook func(ctx context.Context, cmds []Cmder) error

// hooksMixin holds the registered hooks together with the base functions and
// the composed functions built from them.
type hooksMixin struct {
	// slice is the list of registered hooks, in the order they were appended.
	slice   []Hook
	// initial holds the base functions that sit at the innermost end of the chain.
	initial hooks
	// current holds the functions obtained by wrapping initial with every hook in slice.
	current hooks
}

// Sets the initial (base) hooks value and rebuilds the composed chain by
// calling chain().
func ( *hooksMixin) ( hooks) {
	.initial = 
	.chain()
}

// hooks groups the four functions a client dispatches through.
type hooks struct {
	// dial opens a network connection.
	dial       DialHook
	// process handles a single command.
	process    ProcessHook
	// pipeline handles a slice of commands.
	pipeline   ProcessPipelineHook
	// txPipeline handles a slice of commands; it has the same function type as
	// pipeline but is kept as a separate field.
	txPipeline ProcessPipelineHook
}

// Replaces every nil function field with a no-op so the fields can be called
// unconditionally. The default dial returns (nil, nil); the other defaults
// return a nil error.
func ( *hooks) () {
	if .dial == nil {
		.dial = func( context.Context, ,  string) (net.Conn, error) { return nil, nil }
	}
	if .process == nil {
		.process = func( context.Context,  Cmder) error { return nil }
	}
	if .pipeline == nil {
		.pipeline = func( context.Context,  []Cmder) error { return nil }
	}
	if .txPipeline == nil {
		.txPipeline = func( context.Context,  []Cmder) error { return nil }
	}
}

// AddHook appends a hook to the list of registered hooks and rebuilds the
// hook chain.
//
// Hooks wrap the functions used for dialing, for processing a single command,
// and for processing pipelines. They are entered in the order in which they
// were added (first in, first entered), and each hook is responsible for
// calling the next one unless it wants to stop the command from executing.
// For example, after adding hook-1 and then hook-2:
//
//	client.AddHook(hook1)
//	client.AddHook(hook2)
//
// hook-1:
//
//	func (Hook1) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
//		return func(ctx context.Context, cmd redis.Cmder) error {
//			print("hook-1 start")
//			err := next(ctx, cmd)
//			print("hook-1 end")
//			return err
//		}
//	}
//
// hook-2:
//
//	func (Hook2) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
//		return func(ctx context.Context, cmd redis.Cmder) error {
//			print("hook-2 start")
//			err := next(ctx, cmd)
//			print("hook-2 end")
//			return err
//		}
//	}
//
// The execution sequence is:
//
//	hook-1 start -> hook-2 start -> exec redis cmd -> hook-2 end -> hook-1 end
//
// Please note: "next(ctx, cmd)" is very important, it calls the next hook in
// the chain; if "next(ctx, cmd)" is not executed, the redis command will not
// be executed.
func ( *hooksMixin) ( Hook) {
	.slice = append(.slice, )
	.chain()
}

// Rebuilds the composed functions in current from initial and the registered
// hooks. Missing initial functions are first replaced with no-op defaults.
func ( *hooksMixin) () {
	.initial.setDefaults()

	// Start from the base functions.
	.current.dial = .initial.dial
	.current.process = .initial.process
	.current.pipeline = .initial.pipeline
	.current.txPipeline = .initial.txPipeline

	// Wrap from the last registered hook down to the first, so the hook that
	// was added first ends up outermost. A hook method that returns nil leaves
	// the corresponding function unchanged.
	for  := len(.slice) - 1;  >= 0; -- {
		if  := .slice[].DialHook(.current.dial);  != nil {
			.current.dial = 
		}
		if  := .slice[].ProcessHook(.current.process);  != nil {
			.current.process = 
		}
		if  := .slice[].ProcessPipelineHook(.current.pipeline);  != nil {
			.current.pipeline = 
		}
		// The transactional pipeline is also wrapped via ProcessPipelineHook.
		if  := .slice[].ProcessPipelineHook(.current.txPipeline);  != nil {
			.current.txPipeline = 
		}
	}
}

// Returns a copy of the hooksMixin value. The slice field of the copy is
// re-sliced with a three-index expression after taking its length.
// NOTE(review): the slice bounds are not visible here; presumably the capacity
// is clamped to the length so that appending to the copy cannot write into the
// backing array shared with the original. Confirm.
func ( *hooksMixin) () hooksMixin {
	 := *
	 := len(.slice)
	.slice = .slice[::]
	return 
}

// Takes a context, a command and a ProcessHook, walks the registered hooks
// from the last one to the first calling ProcessHook on each, keeps every
// non-nil result, and finally returns the result of a two-argument call.
// NOTE(review): the operands are not visible here; presumably the supplied
// ProcessHook is wrapped by each hook and then invoked with the context and
// command. Confirm.
func ( *hooksMixin) ( context.Context,  Cmder,  ProcessHook) error {
	for  := len(.slice) - 1;  >= 0; -- {
		if  := .slice[].ProcessHook();  != nil {
			 = 
		}
	}
	return (, )
}

// Takes a context, a slice of commands and a ProcessPipelineHook, walks the
// registered hooks from the last one to the first calling ProcessPipelineHook
// on each, keeps every non-nil result, and finally returns the result of a
// two-argument call.
// NOTE(review): the operands are not visible here; presumably the supplied
// hook is wrapped by each registered hook and then invoked with the context
// and commands. Confirm.
func ( *hooksMixin) (
	 context.Context,  []Cmder,  ProcessPipelineHook,
) error {
	for  := len(.slice) - 1;  >= 0; -- {
		if  := .slice[].ProcessPipelineHook();  != nil {
			 = 
		}
	}
	return (, )
}

// Dials through the composed hook chain by calling current.dial.
func ( *hooksMixin) ( context.Context, ,  string) (net.Conn, error) {
	return .current.dial(, , )
}

// Processes a single command through the composed hook chain by calling
// current.process.
func ( *hooksMixin) ( context.Context,  Cmder) error {
	return .current.process(, )
}

// Processes a slice of commands through the composed hook chain by calling
// current.pipeline.
func ( *hooksMixin) ( context.Context,  []Cmder) error {
	return .current.pipeline(, )
}

// Processes a slice of commands through the composed hook chain by calling
// current.txPipeline.
func ( *hooksMixin) ( context.Context,  []Cmder) error {
	return .current.txPipeline(, )
}

//------------------------------------------------------------------------------

// baseClient bundles the options and the connection pool that the
// connection-handling and command-processing methods operate on.
type baseClient struct {
	// opt holds the client options.
	opt      *Options
	// connPool supplies and takes back connections.
	connPool pool.Pooler

	onClose func() error // hook called when client is closed
}

// Returns a pointer to a shallow copy of the baseClient; pointer and interface
// fields (opt, connPool) are shared with the original.
func ( *baseClient) () *baseClient {
	 := *
	return &
}

// Returns a copy of the client that uses a cloned Options value whose
// ReadTimeout and WriteTimeout have been reassigned. The receiver's own
// options are not modified, since the assignments go to the clone.
func ( *baseClient) ( time.Duration) *baseClient {
	 := .opt.clone()
	.ReadTimeout = 
	.WriteTimeout = 

	 := .clone()
	.opt = 

	return 
}

// Returns a description of the form "Redis<addr db:N>", built from getAddr()
// and opt.DB.
func ( *baseClient) () string {
	return fmt.Sprintf("Redis<%s db:%d>", .getAddr(), .opt.DB)
}

// Creates a new connection via connPool.NewConn and initialises it with
// initConn. If initialisation fails, the connection is closed through the
// pool (the close error is deliberately discarded) and an error is returned.
func ( *baseClient) ( context.Context) (*pool.Conn, error) {
	,  := .connPool.NewConn()
	if  != nil {
		return nil, 
	}

	 = .initConn(, )
	if  != nil {
		_ = .connPool.CloseConn()
		return nil, 
	}

	return , nil
}

// Obtains a connection, consulting the optional limiter first. When
// opt.Limiter is set, a non-nil error from Allow is returned without touching
// the pool. If acquiring the connection then fails, the outcome is reported
// to the limiter before the error is returned.
func ( *baseClient) ( context.Context) (*pool.Conn, error) {
	if .opt.Limiter != nil {
		 := .opt.Limiter.Allow()
		if  != nil {
			return nil, 
		}
	}

	,  := ._getConn()
	if  != nil {
		if .opt.Limiter != nil {
			.opt.Limiter.ReportResult()
		}
		return nil, 
	}

	return , nil
}

// Takes a connection from the pool and makes sure it is initialised. A
// connection already marked Inited is returned as is. Otherwise initConn is
// run; on failure the connection is removed from the pool and an error is
// returned.
func ( *baseClient) ( context.Context) (*pool.Conn, error) {
	,  := .connPool.Get()
	if  != nil {
		return nil, 
	}

	if .Inited {
		return , nil
	}

	if  := .initConn(, );  != nil {
		.connPool.Remove(, , )
		// When errors.Unwrap yields a non-nil error, that error is returned in
		// place of the wrapping one.
		if  := errors.Unwrap();  != nil {
			return nil, 
		}
		return nil, 
	}

	return , nil
}

// Performs the one-time handshake on a connection. It returns immediately
// when the connection is already marked Inited. Otherwise it marks it Inited,
// tries Hello with protocol version 3, then sends the optional Auth/AuthACL,
// Select, ReadOnly and ClientSetName commands in one pipeline, and finally
// runs opt.OnConnect when it is set.
//
// Note that Inited is set before the handshake runs, so it stays true even if
// a later step fails.
func ( *baseClient) ( context.Context,  *pool.Conn) error {
	if .Inited {
		return nil
	}
	.Inited = true

	// Static credentials from the options, overridden by the credentials
	// provider when one is configured.
	,  := .opt.Username, .opt.Password
	if .opt.CredentialsProvider != nil {
		,  = .opt.CredentialsProvider()
	}

	// Build a temporary Conn on a single-connection pool so the handshake
	// commands are issued through the regular command API.
	 := pool.NewSingleConnPool(.connPool, )
	 := newConn(.opt, )

	// Set to true when Hello succeeds; the pipeline below skips Auth/AuthACL
	// in that case.
	var  bool

	// For redis-server < 6.0 that does not support the Hello command,
	// we continue to provide services with RESP2.
	if  := .Hello(, 3, , , "").Err();  == nil {
		 = true
	} else if !strings.HasPrefix(.Error(), "ERR unknown command") &&
		// this check is for compatibility DragonflyDB.
		!strings.HasPrefix(.Error(), "NOAUTH Authentication") {
		// Any other Hello failure aborts initialisation.
		return 
	}

	,  := .Pipelined(, func( Pipeliner) error {
		// Authenticate explicitly only when Hello did not succeed and a
		// non-empty credential is present; AuthACL is used when the second
		// checked value is non-empty too, plain Auth otherwise.
		if ! &&  != "" {
			if  != "" {
				.AuthACL(, , )
			} else {
				.Auth(, )
			}
		}

		if .opt.DB > 0 {
			.Select(, .opt.DB)
		}

		if .opt.readOnly {
			.ReadOnly()
		}

		if .opt.ClientName != "" {
			.ClientSetName(, .opt.ClientName)
		}

		return nil
	})
	if  != nil {
		return 
	}

	if .opt.OnConnect != nil {
		return .opt.OnConnect(, )
	}
	return nil
}

// Gives a connection back after use. The outcome is reported to the limiter
// when one is configured. The connection is removed from the pool when
// isBadConn reports it as bad, and put back into the pool otherwise.
func ( *baseClient) ( context.Context,  *pool.Conn,  error) {
	if .opt.Limiter != nil {
		.opt.Limiter.ReportResult()
	}

	if isBadConn(, false, .opt.Addr) {
		.connPool.Remove(, , )
	} else {
		.connPool.Put(, )
	}
}

// Acquires a connection with getConn, invokes the supplied callback, and
// releases the connection afterwards. An error from getConn is returned
// without invoking the callback.
func ( *baseClient) (
	 context.Context,  func(context.Context, *pool.Conn) error,
) error {
	,  := .getConn()
	if  != nil {
		return 
	}

	// The error variable is declared before the defer so the deferred
	// releaseConn call observes the value assigned after the callback runs.
	var  error
	defer func() {
		.releaseConn(, , )
	}()

	 = (, )

	return 
}

// Opens a network connection by delegating to opt.Dialer.
func ( *baseClient) ( context.Context, ,  string) (net.Conn, error) {
	return .opt.Dialer(, , )
}

// Executes a command with retries. _process is called for attempt numbers 0
// through opt.MaxRetries inclusive; the loop returns as soon as an attempt
// yields a nil error or reports that no retry should follow. When all
// attempts are used up, the error variable declared before the loop is
// returned.
func ( *baseClient) ( context.Context,  Cmder) error {
	var  error
	for  := 0;  <= .opt.MaxRetries; ++ {
		 := 

		,  := ._process(, , )
		if  == nil || ! {
			return 
		}

		 = 
	}
	return 
}

// Runs a single attempt of a command: sleeps for the retry backoff when the
// integer argument is greater than zero, then writes the command and reads
// its reply on a connection obtained through withConn. The boolean result is
// the value returned by shouldRetry on failure and false on success.
func ( *baseClient) ( context.Context,  Cmder,  int) (bool, error) {
	if  > 0 {
		// A non-nil error from internal.Sleep ends the attempt without retry.
		if  := internal.Sleep(, .retryBackoff());  != nil {
			return false, 
		}
	}

	// Flag later handed to shouldRetry as its second argument (as "== 1").
	 := uint32(0)
	if  := .withConn(, func( context.Context,  *pool.Conn) error {
		if  := .WithWriter(.context(), .opt.WriteTimeout, func( *proto.Writer) error {
			return writeCmd(, )
		});  != nil {
			// Write failed: set the flag.
			atomic.StoreUint32(&, 1)
			return 
		}

		if  := .WithReader(.context(), .cmdTimeout(), .readReply);  != nil {
			// Read failed: the flag is set only when readTimeout() returns nil.
			if .readTimeout() == nil {
				atomic.StoreUint32(&, 1)
			} else {
				atomic.StoreUint32(&, 0)
			}
			return 
		}

		return nil
	});  != nil {
		 := shouldRetry(, atomic.LoadUint32(&) == 1)
		return , 
	}

	return false, nil
}

// Computes the backoff duration via internal.RetryBackoff, bounded by
// opt.MinRetryBackoff and opt.MaxRetryBackoff.
func ( *baseClient) ( int) time.Duration {
	return internal.RetryBackoff(, .opt.MinRetryBackoff, .opt.MaxRetryBackoff)
}

// Chooses the read timeout for a command. When readTimeout() returns a
// non-nil pointer, the pointed-to value decides: zero is returned unchanged,
// any other value is returned with 10 seconds added. Otherwise
// opt.ReadTimeout is used.
// NOTE(review): the extra 10 seconds is presumably slack for commands that
// block on the server side. Confirm.
func ( *baseClient) ( Cmder) time.Duration {
	if  := .readTimeout();  != nil {
		 := *
		if  == 0 {
			return 0
		}
		return  + 10*time.Second
	}
	return .opt.ReadTimeout
}

// Close closes the client, releasing any open resources.
//
// The onClose hook, when set, runs first, and the connection pool is closed
// afterwards regardless of the hook's result. An error from closing the pool
// is recorded only when no earlier error was recorded.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func ( *baseClient) () error {
	var  error
	if .onClose != nil {
		if  := .onClose();  != nil {
			 = 
		}
	}
	if  := .connPool.Close();  != nil &&  == nil {
		 = 
	}
	return 
}

// Returns the address configured in opt.Addr.
func ( *baseClient) () string {
	return .opt.Addr
}

// Runs a slice of commands through generalProcessPipeline using
// pipelineProcessCmds. An error from that call is returned directly;
// otherwise the result of cmdsFirstErr is returned.
func ( *baseClient) ( context.Context,  []Cmder) error {
	if  := .generalProcessPipeline(, , .pipelineProcessCmds);  != nil {
		return 
	}
	return cmdsFirstErr()
}

// Runs a slice of commands through generalProcessPipeline using
// txPipelineProcessCmds. An error from that call is returned directly;
// otherwise the result of cmdsFirstErr is returned.
func ( *baseClient) ( context.Context,  []Cmder) error {
	if  := .generalProcessPipeline(, , .txPipelineProcessCmds);  != nil {
		return 
	}
	return cmdsFirstErr()
}

// pipelineProcessor is the function type that writes and reads a slice of
// commands on a given connection. The retry loop that calls it stops when the
// boolean result is false.
type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)

// Executes a pipeline with retries. The supplied pipelineProcessor is called
// on a connection obtained through withConn for attempt numbers 0 through
// opt.MaxRetries inclusive, sleeping for the retry backoff before every
// attempt after the first.
func ( *baseClient) (
	 context.Context,  []Cmder,  pipelineProcessor,
) error {
	var  error
	for  := 0;  <= .opt.MaxRetries; ++ {
		if  > 0 {
			// A failed sleep is recorded on the commands via setCmdsErr and
			// ends the loop.
			if  := internal.Sleep(, .retryBackoff());  != nil {
				setCmdsErr(, )
				return 
			}
		}

		// Enable retries by default to retry dial errors returned by withConn.
		 := true
		 = .withConn(, func( context.Context,  *pool.Conn) error {
			var  error
			,  = (, , )
			return 
		})
		// Stop on success, when the boolean is false, or when shouldRetry
		// rejects the error.
		if  == nil || ! || !shouldRetry(, true) {
			return 
		}
	}
	return 
}

// Writes a slice of commands to the connection and then reads their replies
// with pipelineReadCmds. Both a write failure and a read failure return true
// as the first result; success returns (false, nil).
func ( *baseClient) (
	 context.Context,  *pool.Conn,  []Cmder,
) (bool, error) {
	if  := .WithWriter(.context(), .opt.WriteTimeout, func( *proto.Writer) error {
		return writeCmds(, )
	});  != nil {
		// On a write failure setCmdsErr is called before returning.
		setCmdsErr(, )
		return true, 
	}

	if  := .WithReader(.context(), .opt.ReadTimeout, func( *proto.Reader) error {
		return pipelineReadCmds(, )
	});  != nil {
		return true, 
	}

	return false, nil
}

// pipelineReadCmds reads one reply per command and stores each result with
// SetErr. A non-nil error for which isRedisError is false stops the loop: it
// is passed to setCmdsErr together with the commands after the current one,
// and returned. Otherwise, after all replies are read, the error of the first
// command is returned.
//
// NOTE(review): element 0 is indexed unconditionally at the end, so an empty
// slice would panic; callers must pass at least one command.
func pipelineReadCmds( *proto.Reader,  []Cmder) error {
	for ,  := range  {
		 := .readReply()
		.SetErr()
		if  != nil && !isRedisError() {
			setCmdsErr([+1:], )
			return 
		}
	}
	// Retry errors like "LOADING redis is loading the dataset in memory".
	return [0].Err()
}

// Writes a slice of commands to the connection and reads the replies. The
// first element is asserted to be a *StatusCmd, and the first and last
// elements are trimmed off (per the inline comment, multi and exec) before
// the remaining replies are read with pipelineReadCmds.
//
// A write failure returns true as the first result. Unlike
// pipelineProcessCmds, a read failure returns false.
func ( *baseClient) (
	 context.Context,  *pool.Conn,  []Cmder,
) (bool, error) {
	if  := .WithWriter(.context(), .opt.WriteTimeout, func( *proto.Writer) error {
		return writeCmds(, )
	});  != nil {
		setCmdsErr(, )
		return true, 
	}

	if  := .WithReader(.context(), .opt.ReadTimeout, func( *proto.Reader) error {
		// Panics if element 0 is not a *StatusCmd (unchecked type assertion).
		 := [0].(*StatusCmd)
		// Trim multi and exec.
		 := [1 : len()-1]

		if  := txPipelineReadQueued(, , );  != nil {
			setCmdsErr(, )
			return 
		}

		return pipelineReadCmds(, )
	});  != nil {
		return false, 
	}

	return false, nil
}

// txPipelineReadQueued reads the replies that precede the per-command
// results: one reply first (+OK per the inline comment), then one reply per
// element of the command slice (+QUEUED), and finally a line that must start
// with proto.RespArray.
//
// A Nil error from reading that final line is replaced with TxFailedErr. A
// first byte other than proto.RespArray produces a formatted error.
func txPipelineReadQueued( *proto.Reader,  *StatusCmd,  []Cmder) error {
	// Parse +OK.
	if  := .readReply();  != nil {
		return 
	}

	// Parse +QUEUED.
	for range  {
		// Errors for which isRedisError is true do not stop the loop.
		if  := .readReply();  != nil && !isRedisError() {
			return 
		}
	}

	// Parse number of replies.
	,  := .ReadLine()
	if  != nil {
		if  == Nil {
			 = TxFailedErr
		}
		return 
	}

	if [0] != proto.RespArray {
		return fmt.Errorf("redis: expected '*', but got line %q", )
	}

	return nil
}

// Selects the context used for connection reads and writes: when
// opt.ContextTimeoutEnabled is false, context.Background() is returned
// instead of the value returned in the enabled branch.
// NOTE(review): the enabled branch presumably returns the context passed in.
// Confirm.
func ( *baseClient) ( context.Context) context.Context {
	if .opt.ContextTimeoutEnabled {
		return 
	}
	return context.Background()
}

//------------------------------------------------------------------------------

// Client is a Redis client representing a pool of zero or more underlying connections.
// It's safe for concurrent use by multiple goroutines.
//
// Client creates and frees connections automatically; it also maintains a free pool
// of idle connections. You can control the pool size with Config.PoolSize option.
//
// It embeds a pointer to baseClient together with the cmdable and hooksMixin
// values.
type Client struct {
	*baseClient
	cmdable
	hooksMixin
}

// NewClient returns a client to the Redis Server specified by Options.
//
// It calls init on the options, builds the Client around a new baseClient,
// initialises the client, and only then creates the connection pool with
// newConnPool, passing the client's dialHook.
func ( *Options) *Client {
	.init()

	 := Client{
		baseClient: &baseClient{
			opt: ,
		},
	}
	.init()
	.connPool = newConnPool(, .dialHook)

	return &
}

// Wires the client together: cmdable is set to the Process method, and the
// hook chain is initialised with the baseClient's dial, process,
// processPipeline and processTxPipeline methods as its base functions.
func ( *Client) () {
	.cmdable = .Process
	.initHooks(hooks{
		dial:       .baseClient.dial,
		process:    .baseClient.process,
		pipeline:   .baseClient.processPipeline,
		txPipeline: .baseClient.processTxPipeline,
	})
}

// Returns a copy of the client whose baseClient has been replaced by the
// result of baseClient.withTimeout; the copy is re-initialised before it is
// returned. The receiver itself is not modified.
func ( *Client) ( time.Duration) *Client {
	 := *
	.baseClient = .baseClient.withTimeout()
	.init()
	return &
}

// Returns a *Conn built with newConn from the client's options and a sticky
// connection pool (pool.NewStickyConnPool) layered over the client's pool.
func ( *Client) () *Conn {
	return newConn(.opt, pool.NewStickyConnPool(.connPool))
}

// Do creates a Cmd from the args and processes it.
// The error returned by Process is deliberately discarded here.
func ( *Client) ( context.Context,  ...interface{}) *Cmd {
	 := NewCmd(, ...)
	_ = .Process(, )
	return 
}

// Runs a command through the hook chain via processHook, calls SetErr with
// the outcome, and returns the error.
func ( *Client) ( context.Context,  Cmder) error {
	 := .processHook(, )
	.SetErr()
	return 
}

// Options returns read-only Options that were used to create the client.
// The returned pointer is the client's own opt field, not a copy, so callers
// must not modify it.
func ( *Client) () *Options {
	return .opt
}

// PoolStats is a defined type whose underlying type is pool.Stats.
type PoolStats pool.Stats

// PoolStats returns connection pool stats.
// The value returned by connPool.Stats is converted to *PoolStats.
func ( *Client) () *PoolStats {
	 := .connPool.Stats()
	return (*PoolStats)()
}

// Creates a pipeline with Pipeline() and forwards the context and callback to
// its Pipelined method, returning that method's results.
func ( *Client) ( context.Context,  func(Pipeliner) error) ([]Cmder, error) {
	return .Pipeline().Pipelined(, )
}

// Returns a new, initialised Pipeline whose exec function is the client's
// processPipelineHook.
func ( *Client) () Pipeliner {
	 := Pipeline{
		exec: pipelineExecer(.processPipelineHook),
	}
	.init()
	return &
}

// Creates a transactional pipeline with TxPipeline() and forwards the context
// and callback to its Pipelined method, returning that method's results.
func ( *Client) ( context.Context,  func(Pipeliner) error) ([]Cmder, error) {
	return .TxPipeline().Pipelined(, )
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
// The wrapping is done by wrapMultiExec inside the exec function, before the
// commands are handed to processTxPipelineHook.
func ( *Client) () Pipeliner {
	 := Pipeline{
		exec: func( context.Context,  []Cmder) error {
			 = wrapMultiExec(, )
			return .processTxPipelineHook(, )
		},
	}
	.init()
	return &
}

// Builds and initialises a PubSub that shares the client's options. Its
// newConn callback creates a connection through the client's newConn (the
// []string argument is not used by the visible call), and its closeConn
// callback is the pool's CloseConn.
func ( *Client) () *PubSub {
	 := &PubSub{
		opt: .opt,

		newConn: func( context.Context,  []string) (*pool.Conn, error) {
			return .newConn()
		},
		closeConn: .connPool.CloseConn,
	}
	.init()
	return 
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
// Any error from the initial subscribe call is discarded.
// Note that this method does not wait on a response from Redis, so the
// subscription may not be active immediately. To force the connection to wait,
// you may call the Receive() method on the returned *PubSub like so:
//
//	sub := client.Subscribe(ctx, "mychannel")
//	iface, err := sub.Receive()
//	if err != nil {
//	    // handle error
//	}
//
//	// Should be *Subscription, but others are possible if other actions have been
//	// taken on sub since it was created.
//	switch iface.(type) {
//	case *Subscription:
//	    // subscribe succeeded
//	case *Message:
//	    // received first message
//	case *Pong:
//	    // pong received
//	default:
//	    // handle error
//	}
//
//	ch := sub.Channel()
func ( *Client) ( context.Context,  ...string) *PubSub {
	 := .pubSub()
	if len() > 0 {
		_ = .Subscribe(, ...)
	}
	return 
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
// Any error from the initial PSubscribe call is discarded.
func ( *Client) ( context.Context,  ...string) *PubSub {
	 := .pubSub()
	if len() > 0 {
		_ = .PSubscribe(, ...)
	}
	return 
}

// SSubscribe subscribes the client to the specified shard channels.
// Channels can be omitted to create empty subscription.
// Any error from the initial SSubscribe call is discarded.
func ( *Client) ( context.Context,  ...string) *PubSub {
	 := .pubSub()
	if len() > 0 {
		_ = .SSubscribe(, ...)
	}
	return 
}

//------------------------------------------------------------------------------

// Conn represents a single Redis connection rather than a pool of connections.
// Prefer running commands from Client unless there is a specific need
// for a continuous single Redis connection.
//
// Unlike Client, it embeds baseClient by value, and it additionally embeds
// statefulCmdable.
type Conn struct {
	baseClient
	cmdable
	statefulCmdable
	hooksMixin
}

// newConn builds a Conn from the given options and pooler. Both cmdable and
// statefulCmdable are set to the Process method, and the hook chain is
// initialised with the baseClient's dial, process, processPipeline and
// processTxPipeline methods as its base functions.
func newConn( *Options,  pool.Pooler) *Conn {
	 := Conn{
		baseClient: baseClient{
			opt:      ,
			connPool: ,
		},
	}

	.cmdable = .Process
	.statefulCmdable = .Process
	.initHooks(hooks{
		dial:       .baseClient.dial,
		process:    .baseClient.process,
		pipeline:   .baseClient.processPipeline,
		txPipeline: .baseClient.processTxPipeline,
	})

	return &
}

// Runs a command through the hook chain via processHook, calls SetErr with
// the outcome, and returns the error.
func ( *Conn) ( context.Context,  Cmder) error {
	 := .processHook(, )
	.SetErr()
	return 
}

// Creates a pipeline with Pipeline() and forwards the context and callback to
// its Pipelined method, returning that method's results.
func ( *Conn) ( context.Context,  func(Pipeliner) error) ([]Cmder, error) {
	return .Pipeline().Pipelined(, )
}

// Returns a new, initialised Pipeline whose exec function is the connection's
// processPipelineHook.
func ( *Conn) () Pipeliner {
	 := Pipeline{
		exec: .processPipelineHook,
	}
	.init()
	return &
}

// Creates a transactional pipeline with TxPipeline() and forwards the context
// and callback to its Pipelined method, returning that method's results.
func ( *Conn) ( context.Context,  func(Pipeliner) error) ([]Cmder, error) {
	return .TxPipeline().Pipelined(, )
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
// The wrapping is done by wrapMultiExec inside the exec function, before the
// commands are handed to processTxPipelineHook.
func ( *Conn) () Pipeliner {
	 := Pipeline{
		exec: func( context.Context,  []Cmder) error {
			 = wrapMultiExec(, )
			return .processTxPipelineHook(, )
		},
	}
	.init()
	return &
}