package redis
import (
"context"
"crypto/tls"
"errors"
"net"
"strings"
"sync"
"time"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/rand"
)
// FailoverOptions configures clients that locate their Redis servers through
// Redis Sentinel. The same struct is converted into plain client Options
// (clientOptions), per-sentinel Options (sentinelOptions) and ClusterOptions
// (clusterOptions).
type FailoverOptions struct {
// MasterName is the master name passed to the sentinel commands
// (get-master-addr-by-name, replicas, sentinels).
MasterName string
// SentinelAddrs is the seed list of sentinel addresses. The constructors
// copy it before use.
SentinelAddrs []string
// ClientName is forwarded to both the data client and the sentinel clients.
ClientName string
// SentinelUsername and SentinelPassword are the credentials used for the
// sentinel connections (see sentinelOptions).
SentinelUsername string
SentinelPassword string
// RouteByLatency and RouteRandomly are forwarded to ClusterOptions.
// NewFailoverClient panics if either one is set.
RouteByLatency bool
RouteRandomly bool
// ReplicaOnly makes the failover dialer connect to a randomly chosen
// replica instead of the master.
ReplicaOnly bool
// UseDisconnectedReplicas lets RandomReplicaAddr fall back to replicas
// flagged "disconnected" when no other replica is available.
UseDisconnectedReplicas bool
// Dialer, when non-nil, is used to open connections. The failover dialer
// calls it with the address resolved through sentinel.
Dialer func (ctx context .Context , network, addr string ) (net .Conn , error )
// OnConnect is forwarded unchanged to the derived option structs.
OnConnect func (ctx context .Context , cn *Conn ) error
// Username, Password and DB are the credentials and database for the data
// connections. Sentinel connections always use DB 0.
Username string
Password string
DB int
// Retry settings. MaxRetries is also used as MaxRedirects in clusterOptions.
MaxRetries int
MinRetryBackoff time .Duration
MaxRetryBackoff time .Duration
// Timeouts. DialTimeout is also the timeout of the default net.Dialer used
// by the failover dialer.
DialTimeout time .Duration
ReadTimeout time .Duration
WriteTimeout time .Duration
// ContextTimeoutEnabled is forwarded by clientOptions only.
ContextTimeoutEnabled bool
// Connection pool settings, forwarded unchanged to the derived options.
PoolFIFO bool
PoolSize int
PoolTimeout time .Duration
MinIdleConns int
MaxIdleConns int
ConnMaxIdleTime time .Duration
ConnMaxLifetime time .Duration
// TLSConfig, when non-nil, makes the default failover dialer use TLS.
TLSConfig *tls .Config
}
// clientOptions derives the Options for the data client from the failover
// configuration. Addr is a fixed placeholder ("FailoverClient"); every other
// field is copied from the field of the same name on opt.
func (opt *FailoverOptions) clientOptions() *Options {
	o := &Options{Addr: "FailoverClient"}

	// Identity and connection hooks.
	o.ClientName = opt.ClientName
	o.Dialer = opt.Dialer
	o.OnConnect = opt.OnConnect

	// Authentication and database selection.
	o.DB = opt.DB
	o.Username = opt.Username
	o.Password = opt.Password

	// Retry policy.
	o.MaxRetries = opt.MaxRetries
	o.MinRetryBackoff = opt.MinRetryBackoff
	o.MaxRetryBackoff = opt.MaxRetryBackoff

	// Timeouts.
	o.DialTimeout = opt.DialTimeout
	o.ReadTimeout = opt.ReadTimeout
	o.WriteTimeout = opt.WriteTimeout
	o.ContextTimeoutEnabled = opt.ContextTimeoutEnabled

	// Connection pool.
	o.PoolFIFO = opt.PoolFIFO
	o.PoolSize = opt.PoolSize
	o.PoolTimeout = opt.PoolTimeout
	o.MinIdleConns = opt.MinIdleConns
	o.MaxIdleConns = opt.MaxIdleConns
	o.ConnMaxIdleTime = opt.ConnMaxIdleTime
	o.ConnMaxLifetime = opt.ConnMaxLifetime

	o.TLSConfig = opt.TLSConfig
	return o
}
// sentinelOptions derives the Options used to connect to the sentinel at addr.
//
// Sentinel connections use SentinelUsername/SentinelPassword and always select
// DB 0. Retry, timeout, pool and TLS settings are shared with the data client.
// ContextTimeoutEnabled is forwarded as well, so sentinel queries follow the
// same context-deadline setting as the data client built by clientOptions.
func (opt *FailoverOptions) sentinelOptions(addr string) *Options {
	return &Options{
		Addr:       addr,
		ClientName: opt.ClientName,

		Dialer:    opt.Dialer,
		OnConnect: opt.OnConnect,

		DB:       0,
		Username: opt.SentinelUsername,
		Password: opt.SentinelPassword,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,

		DialTimeout:           opt.DialTimeout,
		ReadTimeout:           opt.ReadTimeout,
		WriteTimeout:          opt.WriteTimeout,
		ContextTimeoutEnabled: opt.ContextTimeoutEnabled,

		PoolFIFO:        opt.PoolFIFO,
		PoolSize:        opt.PoolSize,
		PoolTimeout:     opt.PoolTimeout,
		MinIdleConns:    opt.MinIdleConns,
		MaxIdleConns:    opt.MaxIdleConns,
		ConnMaxIdleTime: opt.ConnMaxIdleTime,
		ConnMaxLifetime: opt.ConnMaxLifetime,

		TLSConfig: opt.TLSConfig,
	}
}
// clusterOptions derives the ClusterOptions used by NewFailoverClusterClient.
// MaxRetries is mapped onto MaxRedirects; the remaining fields are copied from
// the field of the same name on opt.
//
// NOTE(review): ContextTimeoutEnabled is not forwarded here. Confirm whether
// ClusterOptions has such a field and whether it should be set.
func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
	co := &ClusterOptions{}

	// Identity and connection hooks.
	co.ClientName = opt.ClientName
	co.Dialer = opt.Dialer
	co.OnConnect = opt.OnConnect

	// Authentication.
	co.Username = opt.Username
	co.Password = opt.Password

	// Routing and retry policy.
	co.MaxRedirects = opt.MaxRetries
	co.RouteByLatency = opt.RouteByLatency
	co.RouteRandomly = opt.RouteRandomly
	co.MinRetryBackoff = opt.MinRetryBackoff
	co.MaxRetryBackoff = opt.MaxRetryBackoff

	// Timeouts.
	co.DialTimeout = opt.DialTimeout
	co.ReadTimeout = opt.ReadTimeout
	co.WriteTimeout = opt.WriteTimeout

	// Connection pool.
	co.PoolFIFO = opt.PoolFIFO
	co.PoolSize = opt.PoolSize
	co.PoolTimeout = opt.PoolTimeout
	co.MinIdleConns = opt.MinIdleConns
	co.MaxIdleConns = opt.MaxIdleConns
	co.ConnMaxIdleTime = opt.ConnMaxIdleTime
	co.ConnMaxLifetime = opt.ConnMaxLifetime

	co.TLSConfig = opt.TLSConfig
	return co
}
// NewFailoverClient returns a Client whose connections are dialed to the
// address resolved through the sentinels listed in failoverOpt.
//
// It panics when RouteByLatency or RouteRandomly is set; those modes are only
// offered by NewFailoverClusterClient. The sentinel addresses are copied and
// shuffled before use, and closing the returned client also closes the
// sentinel connection.
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
	switch {
	case failoverOpt.RouteByLatency:
		panic("to route commands by latency, use NewFailoverClusterClient")
	case failoverOpt.RouteRandomly:
		panic("to route commands randomly, use NewFailoverClusterClient")
	}

	// Work on a private, shuffled copy so the caller's slice is untouched.
	addrs := make([]string, len(failoverOpt.SentinelAddrs))
	copy(addrs, failoverOpt.SentinelAddrs)
	rand.Shuffle(len(addrs), func(a, b int) {
		addrs[a], addrs[b] = addrs[b], addrs[a]
	})

	failover := &sentinelFailover{opt: failoverOpt, sentinelAddrs: addrs}

	// The dialer ignores the configured address and asks sentinel instead.
	opt := failoverOpt.clientOptions()
	opt.Dialer = masterReplicaDialer(failover)
	opt.init()

	rdb := &Client{baseClient: &baseClient{opt: opt}}
	rdb.init()

	connPool := newConnPool(opt, rdb.dialHook)
	rdb.connPool = connPool
	rdb.onClose = failover.Close

	// After a failover, run the pool filter over connections whose remote
	// address differs from the new master (presumably dropping them).
	onFailover := func(ctx context.Context, addr string) {
		_ = connPool.Filter(func(cn *pool.Conn) bool {
			return cn.RemoteAddr().String() != addr
		})
	}
	failover.mu.Lock()
	failover.onFailover = onFailover
	failover.mu.Unlock()

	return rdb
}
// masterReplicaDialer returns a dial function that ignores the address it is
// given and instead connects to the address resolved through failover: a
// random replica when ReplicaOnly is set, otherwise the current master. A
// resolved master address is also passed to trySwitchMaster.
//
// If a custom Dialer is configured it is called with the resolved address.
// Otherwise a net.Dialer using DialTimeout is used, wrapped in TLS when
// TLSConfig is set.
func masterReplicaDialer(
	failover *sentinelFailover,
) func(ctx context.Context, network, addr string) (net.Conn, error) {
	return func(ctx context.Context, network, _ string) (net.Conn, error) {
		var addr string
		var err error

		if failover.opt.ReplicaOnly {
			addr, err = failover.RandomReplicaAddr(ctx)
		} else {
			addr, err = failover.MasterAddr(ctx)
			if err == nil {
				failover.trySwitchMaster(ctx, addr)
			}
		}
		if err != nil {
			return nil, err
		}

		if failover.opt.Dialer != nil {
			return failover.opt.Dialer(ctx, network, addr)
		}

		netDialer := &net.Dialer{
			Timeout:   failover.opt.DialTimeout,
			KeepAlive: 5 * time.Minute,
		}
		if failover.opt.TLSConfig == nil {
			return netDialer.DialContext(ctx, network, addr)
		}

		// tls.Dialer honours ctx cancellation for the TCP connect and the TLS
		// handshake, which tls.DialWithDialer does not. It also returns a true
		// nil net.Conn on failure instead of a typed-nil *tls.Conn.
		tlsDialer := &tls.Dialer{
			NetDialer: netDialer,
			Config:    failover.opt.TLSConfig,
		}
		return tlsDialer.DialContext(ctx, network, addr)
	}
}
// SentinelClient is a client for a single sentinel address. It embeds
// baseClient and hooksMixin and exposes the "sentinel ..." commands defined
// as methods in this file.
type SentinelClient struct {
*baseClient
hooksMixin
}
// NewSentinelClient creates a SentinelClient from opt. It calls opt.init() on
// the passed-in options, installs the base client's dial and process hooks,
// and creates the connection pool.
func NewSentinelClient(opt *Options) *SentinelClient {
	opt.init()

	base := &baseClient{opt: opt}
	client := &SentinelClient{baseClient: base}

	client.initHooks(hooks{
		dial:    base.dial,
		process: base.process,
	})
	client.connPool = newConnPool(opt, client.dialHook)

	return client
}
// Process runs cmd through the client's process hook, stores the resulting
// error (possibly nil) on cmd, and returns that same error.
func (c *SentinelClient) Process(ctx context.Context, cmd Cmder) error {
	procErr := c.processHook(ctx, cmd)
	cmd.SetErr(procErr)
	return procErr
}
// pubSub builds an initialised PubSub that shares the client's options, opens
// its connections with c.newConn, and closes them through the client's pool.
func (c *SentinelClient) pubSub() *PubSub {
	// The channel list is not needed to open a sentinel connection.
	dial := func(ctx context.Context, _ []string) (*pool.Conn, error) {
		return c.newConn(ctx)
	}

	ps := &PubSub{
		opt:       c.opt,
		newConn:   dial,
		closeConn: c.connPool.CloseConn,
	}
	ps.init()
	return ps
}
// Ping sends the "ping" command to the sentinel and returns the command, whose
// result or error can be read by the caller.
func (c *SentinelClient) Ping(ctx context.Context) *StringCmd {
	ping := NewStringCmd(ctx, "ping")
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, ping)
	return ping
}
// Subscribe returns a new PubSub for this sentinel. When channels are given
// it subscribes to them right away; the subscription error is discarded.
func (c *SentinelClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
	ps := c.pubSub()
	if len(channels) == 0 {
		return ps
	}
	_ = ps.Subscribe(ctx, channels...)
	return ps
}
// PSubscribe returns a new PubSub for this sentinel. When patterns are given
// it pattern-subscribes to them right away; the subscription error is
// discarded.
func (c *SentinelClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	ps := c.pubSub()
	if len(channels) == 0 {
		return ps
	}
	_ = ps.PSubscribe(ctx, channels...)
	return ps
}
// GetMasterAddrByName issues "sentinel get-master-addr-by-name <name>" and
// returns the command. Callers in this file read the reply as [host, port].
func (c *SentinelClient) GetMasterAddrByName(ctx context.Context, name string) *StringSliceCmd {
	query := NewStringSliceCmd(ctx, "sentinel", "get-master-addr-by-name", name)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, query)
	return query
}
// Sentinels issues "sentinel sentinels <name>" and returns the command, whose
// reply is a list of string maps (one per sentinel).
func (c *SentinelClient) Sentinels(ctx context.Context, name string) *MapStringStringSliceCmd {
	query := NewMapStringStringSliceCmd(ctx, "sentinel", "sentinels", name)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, query)
	return query
}
// Failover issues "sentinel failover <name>" and returns the status command.
func (c *SentinelClient) Failover(ctx context.Context, name string) *StatusCmd {
	status := NewStatusCmd(ctx, "sentinel", "failover", name)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, status)
	return status
}
// Reset issues "sentinel reset <pattern>" and returns the integer command.
func (c *SentinelClient) Reset(ctx context.Context, pattern string) *IntCmd {
	count := NewIntCmd(ctx, "sentinel", "reset", pattern)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, count)
	return count
}
// FlushConfig issues "sentinel flushconfig" and returns the status command.
func (c *SentinelClient) FlushConfig(ctx context.Context) *StatusCmd {
	status := NewStatusCmd(ctx, "sentinel", "flushconfig")
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, status)
	return status
}
// Master issues "sentinel master <name>" and returns the command, whose reply
// is a string-to-string map.
func (c *SentinelClient) Master(ctx context.Context, name string) *MapStringStringCmd {
	info := NewMapStringStringCmd(ctx, "sentinel", "master", name)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, info)
	return info
}
// Masters issues "sentinel masters" and returns the slice command.
func (c *SentinelClient) Masters(ctx context.Context) *SliceCmd {
	list := NewSliceCmd(ctx, "sentinel", "masters")
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, list)
	return list
}
// Replicas issues "sentinel replicas <name>" and returns the command, whose
// reply is a list of string maps (one per replica).
func (c *SentinelClient) Replicas(ctx context.Context, name string) *MapStringStringSliceCmd {
	query := NewMapStringStringSliceCmd(ctx, "sentinel", "replicas", name)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, query)
	return query
}
// CkQuorum issues "sentinel ckquorum <name>" and returns the string command.
func (c *SentinelClient) CkQuorum(ctx context.Context, name string) *StringCmd {
	reply := NewStringCmd(ctx, "sentinel", "ckquorum", name)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, reply)
	return reply
}
// Monitor issues "sentinel monitor <name> <ip> <port> <quorum>" and returns
// the string command.
func (c *SentinelClient) Monitor(ctx context.Context, name, ip, port, quorum string) *StringCmd {
	reply := NewStringCmd(ctx, "sentinel", "monitor", name, ip, port, quorum)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, reply)
	return reply
}
// Set issues "sentinel set <name> <option> <value>" and returns the string
// command.
func (c *SentinelClient) Set(ctx context.Context, name, option, value string) *StringCmd {
	reply := NewStringCmd(ctx, "sentinel", "set", name, option, value)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, reply)
	return reply
}
// Remove issues "sentinel remove <name>" and returns the string command.
func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
	reply := NewStringCmd(ctx, "sentinel", "remove", name)
	// The error is recorded on the command by Process.
	_ = c.Process(ctx, reply)
	return reply
}
// sentinelFailover tracks the sentinel connection used to resolve the master
// and replica addresses for opt.MasterName.
type sentinelFailover struct {
// opt is the user-supplied failover configuration.
opt *FailoverOptions
// sentinelAddrs holds the known sentinel addresses. The address that last
// answered is swapped to index 0 and discovered sentinels are appended.
sentinelAddrs []string
// onFailover, if set, is called by trySwitchMaster (with mu held) when the
// master address changes.
onFailover func (ctx context .Context , addr string )
// onUpdate, if set, is called by listen when it starts and after pub/sub
// messages it handles.
onUpdate func (ctx context .Context )
// mu is locked around accesses to _masterAddr, sentinel and pubsub in the
// methods below; the constructors also hold it while setting the callbacks.
mu sync .RWMutex
// _masterAddr is the last master address recorded by trySwitchMaster.
_masterAddr string
// sentinel is the currently connected sentinel client, or nil.
sentinel *SentinelClient
// pubsub is the subscription opened on sentinel by setSentinel.
pubsub *PubSub
}
// Close shuts down the current sentinel connection and its pub/sub, if any,
// while holding the write lock. It returns nil when there is nothing to close.
func (c *sentinelFailover) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.sentinel == nil {
		return nil
	}
	return c.closeSentinel()
}
// closeSentinel closes the pub/sub and then the sentinel client, and clears
// both fields. It returns the pub/sub close error if there was one, otherwise
// the sentinel close error. It does no locking itself; the callers in this
// file hold c.mu.
func (c *sentinelFailover) closeSentinel() error {
	pubsubErr := c.pubsub.Close()
	c.pubsub = nil

	sentinelErr := c.sentinel.Close()
	c.sentinel = nil

	if pubsubErr != nil {
		return pubsubErr
	}
	return sentinelErr
}
// RandomReplicaAddr returns the address of a randomly chosen replica.
//
// If no replica is found and UseDisconnectedReplicas is set, the lookup is
// repeated with disconnected replicas included. If there is still no
// candidate, the master address is returned instead. An error is returned
// when c.opt is nil or when a lookup fails.
func (c *sentinelFailover) RandomReplicaAddr(ctx context.Context) (string, error) {
	if c.opt == nil {
		return "", errors.New("opt is nil")
	}

	candidates, err := c.replicaAddrs(ctx, false)
	if err != nil {
		return "", err
	}

	if len(candidates) == 0 && c.opt.UseDisconnectedReplicas {
		if candidates, err = c.replicaAddrs(ctx, true); err != nil {
			return "", err
		}
	}

	if n := len(candidates); n > 0 {
		return candidates[rand.Intn(n)], nil
	}
	// No replica at all: fall back to the master.
	return c.MasterAddr(ctx)
}
// MasterAddr returns the host:port of the master named c.opt.MasterName.
//
// Resolution proceeds in three stages:
//  1. Query the current sentinel, read under the read lock.
//  2. Take the write lock and query the current sentinel again. If that
//     fails, the sentinel is closed.
//  3. Try every known sentinel address in order. The first one that answers
//     is moved to the front of the list and becomes the current sentinel.
//
// Context cancellation and deadline errors are returned immediately. Other
// errors are logged and the next stage or sentinel is tried. If no sentinel
// answers, an error is returned.
func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
	c.mu.RLock()
	sentinel := c.sentinel
	c.mu.RUnlock()

	if sentinel != nil {
		addr, err := c.getMasterAddr(ctx, sentinel)
		if err != nil {
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return "", err
			}
			// Other errors fall through to the locked path below.
			internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
				c.opt.MasterName, err)
		} else {
			return addr, nil
		}
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.sentinel != nil {
		addr, err := c.getMasterAddr(ctx, c.sentinel)
		if err != nil {
			// The current sentinel is unusable; drop it before rediscovering.
			_ = c.closeSentinel()
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return "", err
			}
			internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName name=%q failed: %s",
				c.opt.MasterName, err)
		} else {
			return addr, nil
		}
	}

	for i, sentinelAddr := range c.sentinelAddrs {
		sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))

		masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
		if err == nil && len(masterAddr) < 2 {
			// A reply without both host and port would panic on the indexing
			// below; treat it like any other failed sentinel.
			err = errors.New("redis: sentinel returned malformed master address")
		}
		if err != nil {
			_ = sentinel.Close()
			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
				return "", err
			}
			internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
				c.opt.MasterName, err)
			continue
		}

		// Push the working sentinel to the front so it is tried first next time.
		c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
		c.setSentinel(ctx, sentinel)

		addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
		return addr, nil
	}

	return "", errors.New("redis: all sentinels specified in configuration are unreachable")
}
// replicaAddrs returns the host:port addresses of the usable replicas of
// c.opt.MasterName.
//
// It first asks the current sentinel (read under the read lock), then asks it
// again under the write lock, and finally walks every known sentinel address.
// Context cancellation and deadline errors are returned immediately; other
// errors are logged and the next source is tried. When at least one sentinel
// answered but none reported a usable replica, an empty slice and a nil error
// are returned. When no sentinel answered, an error is returned.
//
// NOTE(review): useDisconnected is only honoured in the discovery loop. The
// two current-sentinel paths go through getReplicaAddrs, which always calls
// parseReplicaAddrs with keepDisconnected=false.
func (c *sentinelFailover ) replicaAddrs (ctx context .Context , useDisconnected bool ) ([]string , error ) {
c .mu .RLock ()
sentinel := c .sentinel
c .mu .RUnlock ()
if sentinel != nil {
addrs , err := c .getReplicaAddrs (ctx , sentinel )
if err != nil {
if errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ) {
return nil , err
}
// Other errors fall through to the locked path below.
internal .Logger .Printf (ctx , "sentinel: Replicas name=%q failed: %s" ,
c .opt .MasterName , err )
} else if len (addrs ) > 0 {
return addrs , nil
}
}
c .mu .Lock ()
defer c .mu .Unlock ()
if c .sentinel != nil {
addrs , err := c .getReplicaAddrs (ctx , c .sentinel )
if err != nil {
// The current sentinel failed; drop it before rediscovering.
_ = c .closeSentinel ()
if errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ) {
return nil , err
}
internal .Logger .Printf (ctx , "sentinel: Replicas name=%q failed: %s" ,
c .opt .MasterName , err )
} else if len (addrs ) > 0 {
return addrs , nil
} else {
// The sentinel answered but reported no usable replica: close it so the
// loop below can install another one via setSentinel.
_ = c .closeSentinel ()
}
}
// sentinelReachable records whether any sentinel answered without error.
var sentinelReachable bool
for i , sentinelAddr := range c .sentinelAddrs {
sentinel := NewSentinelClient (c .opt .sentinelOptions (sentinelAddr ))
replicas , err := sentinel .Replicas (ctx , c .opt .MasterName ).Result ()
if err != nil {
_ = sentinel .Close ()
if errors .Is (err , context .Canceled ) || errors .Is (err , context .DeadlineExceeded ) {
return nil , err
}
internal .Logger .Printf (ctx , "sentinel: Replicas master=%q failed: %s" ,
c .opt .MasterName , err )
continue
}
sentinelReachable = true
addrs := parseReplicaAddrs (replicas , useDisconnected )
if len (addrs ) == 0 {
// NOTE(review): this sentinel client is not closed before moving on.
continue
}
// Move the working sentinel to the front so it is tried first next time.
c .sentinelAddrs [0 ], c .sentinelAddrs [i ] = c .sentinelAddrs [i ], c .sentinelAddrs [0 ]
c .setSentinel (ctx , sentinel )
return addrs , nil
}
if sentinelReachable {
return []string {}, nil
}
return []string {}, errors .New ("redis: all sentinels specified in configuration are unreachable" )
}
// getMasterAddr asks the given sentinel for the address of c.opt.MasterName
// and returns it as host:port.
//
// It returns the command error unchanged. A reply with fewer than two
// elements is reported as an error instead of panicking on the indexing.
func (c *sentinelFailover) getMasterAddr(ctx context.Context, sentinel *SentinelClient) (string, error) {
	addr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
	if err != nil {
		return "", err
	}
	if len(addr) < 2 {
		return "", errors.New("redis: sentinel returned malformed master address")
	}
	return net.JoinHostPort(addr[0], addr[1]), nil
}
// getReplicaAddrs asks the given sentinel for the replicas of c.opt.MasterName
// and returns the addresses kept by parseReplicaAddrs with keepDisconnected
// set to false. A command error is logged here and then returned.
func (c *sentinelFailover) getReplicaAddrs(ctx context.Context, sentinel *SentinelClient) ([]string, error) {
	replicas, err := sentinel.Replicas(ctx, c.opt.MasterName).Result()
	if err == nil {
		return parseReplicaAddrs(replicas, false), nil
	}

	internal.Logger.Printf(ctx, "sentinel: Replicas name=%q failed: %s",
		c.opt.MasterName, err)
	return nil, err
}
// parseReplicaAddrs converts sentinel replica descriptions into host:port
// strings.
//
// A replica is skipped when its comma-separated "flags" value contains
// "s_down" or "o_down", when it contains "disconnected" and keepDisconnected
// is false, or when its "ip" or "port" entry is empty. The returned slice is
// never nil.
func parseReplicaAddrs(addrs []map[string]string, keepDisconnected bool) []string {
	usable := make([]string, 0, len(addrs))

nextReplica:
	for _, replica := range addrs {
		ip, port := replica["ip"], replica["port"]
		if ip == "" || port == "" {
			continue
		}

		// A missing "flags" entry yields a single empty flag, which matches
		// none of the checks below.
		for _, flag := range strings.Split(replica["flags"], ",") {
			if flag == "s_down" || flag == "o_down" {
				continue nextReplica
			}
			if flag == "disconnected" && !keepDisconnected {
				continue nextReplica
			}
		}

		usable = append(usable, net.JoinHostPort(ip, port))
	}

	return usable
}
// trySwitchMaster records addr as the current master address if it differs
// from the stored one, logs the change, and calls onFailover when set.
//
// The stored address is compared under the read lock first and compared again
// after the write lock is taken. onFailover runs with the write lock held.
func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
	c.mu.RLock()
	unchanged := addr == c._masterAddr
	c.mu.RUnlock()
	if unchanged {
		return
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Another goroutine may have recorded the same address in the meantime.
	if c._masterAddr == addr {
		return
	}
	c._masterAddr = addr

	internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
		c.opt.MasterName, addr)
	if hook := c.onFailover; hook != nil {
		hook(ctx, addr)
	}
}
// setSentinel installs sentinel as the current sentinel, refreshes the list
// of known sentinel addresses, subscribes to the "+switch-master" and
// "+replica-reconf-done" channels, and starts the listen goroutine.
//
// It panics if a sentinel is already installed.
func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
	if c.sentinel != nil {
		panic("not reached")
	}

	c.sentinel = sentinel
	c.discoverSentinels(ctx)

	ps := sentinel.Subscribe(ctx, "+switch-master", "+replica-reconf-done")
	c.pubsub = ps
	go c.listen(ps)
}
// discoverSentinels asks the current sentinel for the other sentinels of
// c.opt.MasterName and appends every address not yet in c.sentinelAddrs.
// Entries with a missing or empty "ip" or "port" are skipped. A failed query
// is logged and otherwise ignored.
func (c *sentinelFailover) discoverSentinels(ctx context.Context) {
	peers, err := c.sentinel.Sentinels(ctx, c.opt.MasterName).Result()
	if err != nil {
		internal.Logger.Printf(ctx, "sentinel: Sentinels master=%q failed: %s", c.opt.MasterName, err)
		return
	}

	for _, peer := range peers {
		// A missing key reads as "", so one check covers absent and empty.
		ip, port := peer["ip"], peer["port"]
		if ip == "" || port == "" {
			continue
		}

		peerAddr := net.JoinHostPort(ip, port)
		if contains(c.sentinelAddrs, peerAddr) {
			continue
		}

		internal.Logger.Printf(ctx, "sentinel: discovered new sentinel=%q for master=%q",
			peerAddr, c.opt.MasterName)
		c.sentinelAddrs = append(c.sentinelAddrs, peerAddr)
	}
}
// listen consumes messages from pubsub until its channel is closed.
//
// onUpdate, if set, is called once on start and again after every handled
// message. A "+switch-master" message is split on spaces. The first field
// must equal c.opt.MasterName, and fields 3 and 4 are used as the new
// master's host and port. Messages for other masters are logged and skipped,
// as are payloads with fewer than five fields, which would otherwise panic
// this goroutine.
func (c *sentinelFailover) listen(pubsub *PubSub) {
	ctx := context.TODO()

	if c.onUpdate != nil {
		c.onUpdate(ctx)
	}

	ch := pubsub.Channel()
	for msg := range ch {
		if msg.Channel == "+switch-master" {
			parts := strings.Split(msg.Payload, " ")
			if parts[0] != c.opt.MasterName {
				internal.Logger.Printf(pubsub.getContext(), "sentinel: ignore addr for master=%q", parts[0])
				continue
			}
			if len(parts) < 5 {
				// Guard the parts[3]/parts[4] accesses below.
				internal.Logger.Printf(pubsub.getContext(),
					"sentinel: ignore malformed +switch-master payload=%q", msg.Payload)
				continue
			}
			addr := net.JoinHostPort(parts[3], parts[4])
			c.trySwitchMaster(pubsub.getContext(), addr)
		}

		if c.onUpdate != nil {
			c.onUpdate(ctx)
		}
	}
}
// contains reports whether str is an element of slice.
func contains(slice []string, str string) bool {
	for i := 0; i < len(slice); i++ {
		if slice[i] == str {
			return true
		}
	}
	return false
}
// NewFailoverClusterClient returns a ClusterClient whose topology comes from
// the sentinels in failoverOpt rather than from a real cluster.
//
// The ClusterSlots callback reports a single slot range, 0-16383, served by
// the current master followed by its replicas. The cluster state is reloaded
// whenever the sentinel listener invokes onUpdate.
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
	// Work on a private copy so the caller's slice is untouched.
	addrs := make([]string, len(failoverOpt.SentinelAddrs))
	copy(addrs, failoverOpt.SentinelAddrs)

	failover := &sentinelFailover{opt: failoverOpt, sentinelAddrs: addrs}

	opt := failoverOpt.clusterOptions()
	opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
		master, err := failover.MasterAddr(ctx)
		if err != nil {
			return nil, err
		}

		replicas, err := failover.replicaAddrs(ctx, false)
		if err != nil {
			return nil, err
		}

		// The master goes first, followed by every replica.
		nodes := []ClusterNode{{Addr: master}}
		for _, replica := range replicas {
			nodes = append(nodes, ClusterNode{Addr: replica})
		}

		return []ClusterSlot{{Start: 0, End: 16383, Nodes: nodes}}, nil
	}

	c := NewClusterClient(opt)

	reload := func(ctx context.Context) {
		c.ReloadState(ctx)
	}
	failover.mu.Lock()
	failover.onUpdate = reload
	failover.mu.Unlock()

	return c
}
The pages are generated with Golds v0.8.2. (GOOS=linux GOARCH=amd64)
Golds is a Go 101 project developed by Tapir Liu.
PR and bug reports are welcome and can be submitted to the issue list.
Please follow @zigo_101 (reachable from the left QR code) to get the latest news of Golds.