package redis

import (
	
	
	
	
	

	
	
	
)

// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
//
// PubSub automatically reconnects to Redis Server and resubscribes
// to the channels in case of network errors.
type PubSub struct {
	// opt holds the client options; this file reads opt.WriteTimeout and opt.Addr.
	opt *Options

	// newConn opens a connection for the given channels and closeConn
	// disposes of a connection. Both are supplied by whoever builds the PubSub.
	newConn   func(ctx context.Context, channels []string) (*pool.Conn, error)
	closeConn func(*pool.Conn) error

	// mu is held by the methods that read or change cn, the three
	// subscription sets and closed.
	mu sync.Mutex
	// cn is the current connection, or nil when none is established.
	cn *pool.Conn
	// channels, patterns and schannels record the names subscribed through
	// "subscribe", "psubscribe" and "ssubscribe" respectively, so that they
	// can be replayed on a fresh connection.
	channels  map[string]struct{}
	patterns  map[string]struct{}
	schannels map[string]struct{}

	// closed is set once the PubSub has been closed; exit is closed at the
	// same moment to stop the health-check goroutine.
	closed bool
	exit   chan struct{}

	// cmd is created on first receive and reused for reading every reply.
	cmd *Cmd

	// chOnce ensures that only one of msgCh / allCh is ever initialised.
	chOnce sync.Once
	msgCh  *channel
	allCh  *channel
}

// Allocates the exit channel. The channel is closed when the PubSub is
// closed and is watched by the health-check goroutine.
func ( *PubSub) () {
	.exit = make(chan struct{})
}

// Returns a description of the form "PubSub(a, b, c)" that lists the
// recorded channels, then patterns, then shard channels. The order inside
// each group is not stable because it comes from map iteration.
func ( *PubSub) () string {
	 := mapKeys(.channels)
	 = append(, mapKeys(.patterns)...)
	 = append(, mapKeys(.schannels)...)
	return fmt.Sprintf("PubSub(%s)", strings.Join(, ", "))
}

// Locking wrapper around conn: takes mu, calls conn with no extra channels
// (nil), and releases mu before returning conn's result.
func ( *PubSub) ( context.Context) (*pool.Conn, error) {
	.mu.Lock()
	,  := .conn(, nil)
	.mu.Unlock()
	return , 
}

// Returns the current connection, creating one when there is none.
//
// It returns pool.ErrClosed once closed is set. A newly created connection
// is resubscribed before it is stored in cn; if resubscribing fails the new
// connection is closed and the error is returned without caching anything.
// Every call site in this file invokes it with mu held.
func ( *PubSub) ( context.Context,  []string) (*pool.Conn, error) {
	if .closed {
		return nil, pool.ErrClosed
	}
	if .cn != nil {
		return .cn, nil
	}

	// NOTE(review): only the keys of channels are collected here (patterns
	// and schannels are not); presumably the slice argument is appended to
	// them before the list is handed to newConn — confirm.
	 := mapKeys(.channels)
	 = append(, ...)

	,  := .newConn(, )
	if  != nil {
		return nil, 
	}

	if  := .resubscribe(, );  != nil {
		// The close error is explicitly discarded; the resubscribe error wins.
		_ = .closeConn()
		return nil, 
	}

	.cn = 
	return , nil
}

// Writes a single command through the connection's WithWriter, bounded by
// opt.WriteTimeout.
//
// NOTE(review): the write is issued with context.Background() rather than
// the context parameter — confirm this is intended.
func ( *PubSub) ( context.Context,  *pool.Conn,  Cmder) error {
	return .WithWriter(context.Background(), .opt.WriteTimeout, func( *proto.Writer) error {
		return writeCmd(, )
	})
}

// Replays the recorded subscriptions on the given connection: "subscribe"
// for channels, "psubscribe" for patterns and "ssubscribe" for schannels.
// Empty groups are skipped. All non-empty groups are attempted even when an
// earlier one fails; the nil checks below appear to keep the first error
// encountered as the return value.
func ( *PubSub) ( context.Context,  *pool.Conn) error {
	var  error

	if len(.channels) > 0 {
		 = ._subscribe(, , "subscribe", mapKeys(.channels))
	}

	if len(.patterns) > 0 {
		 := ._subscribe(, , "psubscribe", mapKeys(.patterns))
		if  != nil &&  == nil {
			 = 
		}
	}

	if len(.schannels) > 0 {
		 := ._subscribe(, , "ssubscribe", mapKeys(.schannels))
		if  != nil &&  == nil {
			 = 
		}
	}

	return 
}

// mapKeys returns the keys of m as a newly allocated slice.
//
// The result is never nil, even for a nil or empty map. The order of the
// keys is unspecified because it follows Go's randomized map iteration;
// callers that need a stable order must sort the result.
func mapKeys(m map[string]struct{}) []string {
	keys := make([]string, len(m))
	i := 0
	for k := range m {
		keys[i] = k
		i++
	}
	return keys
}

// Builds one command from the string parameter followed by every element of
// the slice parameter and writes it to the connection via writeCmd. Call
// sites pass the Redis command name (for example "subscribe") and the
// channel names. Only the write is performed here; no reply is read.
func ( *PubSub) (
	 context.Context,  *pool.Conn,  string,  []string,
) error {
	// Capacity is sized for the command name plus one entry per channel.
	 := make([]interface{}, 0, 1+len())
	 = append(, )
	for ,  := range  {
		 = append(, )
	}
	 := NewSliceCmd(, ...)
	return .writeCmd(, , )
}

// Locking wrapper around releaseConn: takes mu, forwards all four arguments
// unchanged, and releases mu.
func ( *PubSub) (
	 context.Context,
	 *pool.Conn,
	 error,
	 bool,
) {
	.mu.Lock()
	.releaseConn(, , , )
	.mu.Unlock()
}

// Decides whether the connection must be replaced after an operation.
//
// Nothing happens when the given connection is no longer the current cn
// (it has already been replaced or closed). Otherwise reconnect is invoked
// if isBadConn reports the connection as bad for opt.Addr.
func ( *PubSub) ( context.Context,  *pool.Conn,  error,  bool) {
	if .cn !=  {
		return
	}
	if isBadConn(, , .opt.Addr) {
		.reconnect(, )
	}
}

// Closes the current connection via closeTheCn and immediately tries to
// obtain a new one via conn. Both results are explicitly discarded, so a
// failed attempt leaves cn nil until conn is called again.
func ( *PubSub) ( context.Context,  error) {
	_ = .closeTheCn()
	_, _ = .conn(, nil)
}

// Closes the current connection through closeConn, clears cn, and returns
// closeConn's error. It is a no-op returning nil when cn is already nil.
// Unless the PubSub is closed, a "discarding bad PubSub connection" message
// is logged first.
func ( *PubSub) ( error) error {
	if .cn == nil {
		return nil
	}
	if !.closed {
		internal.Logger.Printf(.getContext(), "redis: discarding bad PubSub connection: %s", )
	}
	 := .closeConn(.cn)
	.cn = nil
	return 
}

// Closes the PubSub: sets closed, closes the exit channel so the
// health-check goroutine stops, and closes the current connection.
// It returns pool.ErrClosed if the PubSub was already closed, which also
// guards against closing exit twice.
func ( *PubSub) () error {
	.mu.Lock()
	defer .mu.Unlock()

	if .closed {
		return pool.ErrClosed
	}
	.closed = true
	close(.exit)

	return .closeTheCn(pool.ErrClosed)
}

// Subscribe the client to the specified channels.
//
// The "subscribe" command is sent first; the names are then recorded in the
// channels set regardless of whether sending succeeded, so they are included
// when subscriptions are replayed on a new connection. The returned error is
// the one from sending the command.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	 := .subscribe(, "subscribe", ...)
	// The set is created lazily on first use.
	if .channels == nil {
		.channels = make(map[string]struct{})
	}
	for ,  := range  {
		.channels[] = struct{}{}
	}
	return 
}

// PSubscribe the client to the given patterns.
//
// The "psubscribe" command is sent first; the names are then recorded in the
// patterns set regardless of whether sending succeeded, so they are included
// when subscriptions are replayed on a new connection. The returned error is
// the one from sending the command.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	 := .subscribe(, "psubscribe", ...)
	// The set is created lazily on first use.
	if .patterns == nil {
		.patterns = make(map[string]struct{})
	}
	for ,  := range  {
		.patterns[] = struct{}{}
	}
	return 
}

// SSubscribe subscribes the client to the specified shard channels.
//
// The "ssubscribe" command is sent first; the names are then recorded in the
// schannels set regardless of whether sending succeeded, so they are
// included when subscriptions are replayed on a new connection. The returned
// error is the one from sending the command.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	 := .subscribe(, "ssubscribe", ...)
	// The set is created lazily on first use.
	if .schannels == nil {
		.schannels = make(map[string]struct{})
	}
	for ,  := range  {
		.schannels[] = struct{}{}
	}
	return 
}

// Unsubscribe the client from the given channels, or from all of
// them if none is given.
//
// The names are removed from the channels set before the "unsubscribe"
// command is sent, so they are forgotten locally even if sending fails.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	if len() > 0 {
		for ,  := range  {
			delete(.channels, )
		}
	} else {
		// Unsubscribe from all channels.
		for  := range .channels {
			delete(.channels, )
		}
	}

	 := .subscribe(, "unsubscribe", ...)
	return 
}

// PUnsubscribe the client from the given patterns, or from all of
// them if none is given.
//
// The names are removed from the patterns set before the "punsubscribe"
// command is sent, so they are forgotten locally even if sending fails.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	if len() > 0 {
		for ,  := range  {
			delete(.patterns, )
		}
	} else {
		// Unsubscribe from all patterns.
		for  := range .patterns {
			delete(.patterns, )
		}
	}

	 := .subscribe(, "punsubscribe", ...)
	return 
}

// SUnsubscribe unsubscribes the client from the given shard channels,
// or from all of them if none is given.
//
// The names are removed from the schannels set before the "sunsubscribe"
// command is sent, so they are forgotten locally even if sending fails.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	if len() > 0 {
		for ,  := range  {
			delete(.schannels, )
		}
	} else {
		// Unsubscribe from all shard channels.
		for  := range .schannels {
			delete(.schannels, )
		}
	}

	 := .subscribe(, "sunsubscribe", ...)
	return 
}

// Shared implementation behind the public subscribe/unsubscribe methods.
//
// It obtains a connection via conn, sends the command via _subscribe and
// then passes the connection and the send error to releaseConn (with the
// final argument false) so a bad connection is replaced. The send error is
// returned. The public callers invoke it with mu held.
func ( *PubSub) ( context.Context,  string,  ...string) error {
	,  := .conn(, )
	if  != nil {
		return 
	}

	 = ._subscribe(, , , )
	.releaseConn(, , , false)
	return 
}

// Sends a "ping" command on the subscription connection. The first variadic
// value is attached as the ping argument only when exactly one value is
// given; with zero or several values a bare "ping" is sent. Only the write
// is performed here; the reply is read by the receive methods.
func ( *PubSub) ( context.Context,  ...string) error {
	 := []interface{}{"ping"}
	if len() == 1 {
		 = append(, [0])
	}
	 := NewCmd(, ...)

	.mu.Lock()
	defer .mu.Unlock()

	,  := .conn(, nil)
	if  != nil {
		return 
	}

	 = .writeCmd(, , )
	// Let releaseConn replace the connection if the write marked it bad.
	.releaseConn(, , , false)
	return 
}

// Subscription is the confirmation received after a subscribe or
// unsubscribe command has been processed for a channel.
type Subscription struct {
	// Can be "subscribe", "unsubscribe", "psubscribe", "punsubscribe",
	// "ssubscribe" or "sunsubscribe".
	Kind string
	// Channel name we have subscribed to. It is empty when the reply carries
	// no channel name, which can happen for "unsubscribe".
	Channel string
	// Number of channels we are currently subscribed to.
	Count int
}

// String returns the subscription formatted as "<Kind>: <Channel>".
func (s *Subscription) String() string {
	return fmt.Sprintf("%s: %s", s.Kind, s.Channel)
}

// Message received as result of a PUBLISH command issued by another client.
type Message struct {
	// Channel is the channel the message was published to.
	Channel string
	// Pattern is the matching pattern; it is set only for messages that
	// arrive through a pattern subscription ("pmessage").
	Pattern string
	// Payload is the message body when it arrives as a single string.
	Payload string
	// PayloadSlice is set instead of Payload when the body arrives as an
	// array of strings.
	PayloadSlice []string
}

// String returns the message formatted as "Message<Channel: Payload>".
// PayloadSlice is not included in the output.
func (m *Message) String() string {
	return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
}

// Pong is received as the reply to a PING command sent on the
// subscription connection.
type Pong struct {
	// Payload is the optional argument echoed back by the server; it is
	// empty when the ping carried no argument.
	Payload string
}

// String returns "Pong<Payload>" when a payload is present and plain
// "Pong" otherwise.
func (p *Pong) String() string {
	if p.Payload != "" {
		return fmt.Sprintf("Pong<%s>", p.Payload)
	}
	return "Pong"
}

// Converts a decoded reply into *Subscription, *Message or *Pong.
//
// A bare string reply becomes a Pong. An array reply is dispatched on its
// first element: the six (un)subscribe kinds become a Subscription,
// "message"/"smessage" and "pmessage" become a Message, and "pong" becomes
// a Pong. Anything else yields an "unsupported pubsub message" error.
//
// NOTE(review): apart from the channel name of a Subscription, the element
// accesses use unchecked indexing and single-value type assertions, so a
// reply with an unexpected shape or element type panics instead of
// returning an error.
func ( *PubSub) ( interface{}) (interface{}, error) {
	switch reply := .(type) {
	case string:
		return &Pong{
			Payload: ,
		}, nil
	case []interface{}:
		switch  := [0].(string);  {
		case "subscribe", "unsubscribe", "psubscribe", "punsubscribe", "ssubscribe", "sunsubscribe":
			// Can be nil in case of "unsubscribe".
			,  := [1].(string)
			return &Subscription{
				Kind:    ,
				Channel: ,
				Count:   int([2].(int64)),
			}, nil
		case "message", "smessage":
			// The payload is either a single string or an array of strings.
			switch payload := [2].(type) {
			case string:
				return &Message{
					Channel: [1].(string),
					Payload: ,
				}, nil
			case []interface{}:
				 := make([]string, len())
				for ,  := range  {
					[] = .(string)
				}
				return &Message{
					Channel:      [1].(string),
					PayloadSlice: ,
				}, nil
			default:
				return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", )
			}
		case "pmessage":
			// Layout: kind, pattern, channel, payload.
			return &Message{
				Pattern: [1].(string),
				Channel: [2].(string),
				Payload: [3].(string),
			}, nil
		case "pong":
			return &Pong{
				Payload: [1].(string),
			}, nil
		default:
			return nil, fmt.Errorf("redis: unsupported pubsub message: %q", )
		}
	default:
		return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", )
	}
}

// ReceiveTimeout acts like Receive but returns an error if message
// is not received in time. This is low-level API and in most cases
// Channel should be used instead.
//
// The reply is read into the shared cmd field, which is created on first
// use and accessed without holding mu.
func ( *PubSub) ( context.Context,  time.Duration) (interface{}, error) {
	if .cmd == nil {
		.cmd = NewCmd()
	}

	// Don't hold the lock to allow subscriptions and pings.

	,  := .connWithLock()
	if  != nil {
		return nil, 
	}

	// NOTE(review): the read is issued with context.Background() rather than
	// the context parameter — confirm this is intended.
	 = .WithReader(context.Background(), , func( *proto.Reader) error {
		return .cmd.readReply()
	})

	// The boolean passed last is a "> 0" comparison, presumably on the
	// timeout — confirm against releaseConn/isBadConn.
	.releaseConnWithLock(, , ,  > 0)

	if  != nil {
		return nil, 
	}

	return .newMessage(.cmd.Val())
}

// Receive returns a message as a Subscription, Message, Pong or error.
// See PubSub example for details. This is low-level API and in most cases
// Channel should be used instead.
//
// It is ReceiveTimeout called with a timeout of 0.
func ( *PubSub) ( context.Context) (interface{}, error) {
	return .ReceiveTimeout(, 0)
}

// ReceiveMessage returns a Message or error ignoring Subscription and Pong
// messages. This is low-level API and in most cases Channel should be used
// instead.
//
// It loops on Receive until a *Message arrives or Receive fails; any other
// value type produces an "unknown message" error.
func ( *PubSub) ( context.Context) (*Message, error) {
	for {
		,  := .Receive()
		if  != nil {
			return nil, 
		}

		switch msg := .(type) {
		case *Subscription:
			// Ignore.
		case *Pong:
			// Ignore.
		case *Message:
			return , nil
		default:
			 := fmt.Errorf("redis: unknown message: %T", )
			return nil, 
		}
	}
}

// Returns the context stored on cmd once cmd has been created, and
// context.Background() before that. Used as the logging context when a bad
// connection is discarded.
func ( *PubSub) () context.Context {
	if .cmd != nil {
		return .cmd.ctx
	}
	return context.Background()
}

//------------------------------------------------------------------------------

// Channel returns a Go channel for concurrently receiving messages.
// The channel is closed together with the PubSub. If the Go channel
// stays full for the send timeout (1 minute by default, see
// WithChannelSendTimeout) the message is dropped.
// Receive* APIs can not be used after channel is created.
//
// go-redis sends a ping when no message has arrived within the health
// check interval and reconnects (re-subscribing) if the ping fails.
//
// Options only take effect on the first call; later calls return the same
// channel. Channel panics if ChannelWithSubscriptions was called first.
func ( *PubSub) ( ...ChannelOption) <-chan *Message {
	.chOnce.Do(func() {
		.msgCh = newChannel(, ...)
		.msgCh.initMsgChan()
	})
	// msgCh is still nil when chOnce was consumed by ChannelWithSubscriptions.
	if .msgCh == nil {
		 := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
		panic()
	}
	return .msgCh.msgCh
}

// ChannelSize is like Channel, but creates a Go channel
// with specified buffer size. It is a thin wrapper that calls
// Channel(WithChannelSize(size)).
//
// Deprecated: use Channel(WithChannelSize(size)), remove in v9.
func ( *PubSub) ( int) <-chan *Message {
	return .Channel(WithChannelSize())
}

// ChannelWithSubscriptions is like Channel, but message type can be either
// *Subscription or *Message. Subscription messages can be used to detect
// reconnections.
//
// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
// It panics if Channel was called first. Options only take effect on the
// first call; later calls return the same channel.
func ( *PubSub) ( ...ChannelOption) <-chan interface{} {
	.chOnce.Do(func() {
		.allCh = newChannel(, ...)
		.allCh.initAllChan()
	})
	// allCh is still nil when chOnce was consumed by Channel.
	if .allCh == nil {
		 := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
		panic()
	}
	return .allCh.allCh
}

// ChannelOption configures the channel created by Channel or
// ChannelWithSubscriptions. Options are applied in order on top of the
// defaults set in newChannel.
type ChannelOption func(c *channel)

// WithChannelSize specifies the Go chan size that is used to buffer incoming messages.
// The value is stored in chanSize and used as the buffer capacity when the
// Go channel is created.
//
// The default is 100 messages.
func ( int) ChannelOption {
	return func( *channel) {
		.chanSize = 
	}
}

// WithChannelHealthCheckInterval specifies the health check interval.
// PubSub will ping Redis Server if it does not receive any messages within the interval.
// To disable health check, use zero interval (the health check is only
// started for intervals greater than zero).
//
// The default is 3 seconds.
func ( time.Duration) ChannelOption {
	return func( *channel) {
		.checkInterval = 
	}
}

// WithChannelSendTimeout specifies the channel send timeout after which
// the message is dropped. The value is stored in chanSendTimeout and bounds
// each attempt to deliver a message to the Go channel.
//
// The default is 60 seconds.
func ( time.Duration) ChannelOption {
	return func( *channel) {
		.chanSendTimeout = 
	}
}

// channel pumps messages received from a PubSub into a Go channel and runs
// the optional connection health check.
type channel struct {
	// pubSub is the PubSub whose messages are forwarded.
	pubSub *PubSub

	// msgCh carries *Message values only; allCh carries both *Subscription
	// and *Message values. Only one of them is created for a given PubSub.
	msgCh chan *Message
	allCh chan interface{}
	// ping is signalled by the pump whenever any reply is received; it is
	// only created when the health check is enabled.
	ping chan struct{}

	// chanSize is the buffer capacity of msgCh/allCh.
	chanSize int
	// chanSendTimeout is how long a send may block before the message is dropped.
	chanSendTimeout time.Duration
	// checkInterval is the idle time after which a ping is sent; values
	// that are not greater than zero disable the health check.
	checkInterval time.Duration
}

// newChannel builds a channel for the given PubSub with the defaults
// (buffer of 100, one-minute send timeout, 3-second health check interval),
// applies the options in order, and starts the health-check goroutine when
// the resulting checkInterval is greater than zero. The Go channel itself is
// created later by initMsgChan or initAllChan.
func newChannel( *PubSub,  ...ChannelOption) *channel {
	 := &channel{
		pubSub: ,

		chanSize:        100,
		chanSendTimeout: time.Minute,
		checkInterval:   3 * time.Second,
	}
	for ,  := range  {
		()
	}
	if .checkInterval > 0 {
		.initHealthCheck()
	}
	return 
}

// Creates the ping channel and starts the health-check goroutine.
//
// On every iteration the timer is re-armed with checkInterval. A signal on
// ping (sent by the pump whenever a reply arrives) simply restarts the
// wait. If the timer fires first, a ping is sent and, when that fails,
// the PubSub is reconnected under its mutex. The goroutine ends when the
// PubSub's exit channel is closed.
func ( *channel) () {
	 := context.TODO()
	// Buffer of one so that the pump's non-blocking send can leave a pending
	// signal.
	.ping = make(chan struct{}, 1)

	go func() {
		// The timer is created stopped; it is armed by Reset inside the loop.
		 := time.NewTimer(time.Minute)
		.Stop()

		for {
			.Reset(.checkInterval)
			select {
			case <-.ping:
				// Drain the timer if it fired concurrently so the next Reset
				// starts from a clean state.
				if !.Stop() {
					<-.C
				}
			case <-.C:
				if  := .pubSub.Ping();  != nil {
					.pubSub.mu.Lock()
					.pubSub.reconnect(, )
					.pubSub.mu.Unlock()
				}
			case <-.pubSub.exit:
				return
			}
		}
	}()
}

// initMsgChan must be in sync with initAllChan.
//
// It creates msgCh with capacity chanSize and starts a goroutine that
// forwards every *Message returned by the PubSub's Receive to msgCh.
// Subscription and Pong replies are ignored. A message that cannot be
// delivered within chanSendTimeout is logged and dropped. The goroutine
// closes msgCh and exits when Receive returns pool.ErrClosed.
func ( *channel) () {
	 := context.TODO()
	.msgCh = make(chan *Message, .chanSize)

	go func() {
		// The timer is created stopped; it is armed per message below.
		 := time.NewTimer(time.Minute)
		.Stop()

		// Counter that appears to track consecutive receive errors: the
		// first failure retries immediately, later ones back off 100ms.
		var  int
		for {
			,  := .pubSub.Receive()
			if  != nil {
				if  == pool.ErrClosed {
					close(.msgCh)
					return
				}
				if  > 0 {
					time.Sleep(100 * time.Millisecond)
				}
				++
				continue
			}

			 = 0

			// Any message is as good as a ping.
			// NOTE(review): ping is nil when the health check is disabled;
			// the send then never proceeds and the default branch is taken.
			select {
			case .ping <- struct{}{}:
			default:
			}

			switch msg := .(type) {
			case *Subscription:
				// Ignore.
			case *Pong:
				// Ignore.
			case *Message:
				.Reset(.chanSendTimeout)
				select {
				case .msgCh <- :
					// Delivered: stop the timer and drain it if it already fired.
					if !.Stop() {
						<-.C
					}
				case <-.C:
					internal.Logger.Printf(
						, "redis: %s channel is full for %s (message is dropped)",
						, .chanSendTimeout)
				}
			default:
				internal.Logger.Printf(, "redis: unknown message type: %T", )
			}
		}
	}()
}

// initAllChan must be in sync with initMsgChan.
//
// It creates allCh with capacity chanSize and starts a goroutine that
// forwards every *Subscription and *Message returned by the PubSub's
// Receive to allCh. Pong replies are ignored. A value that cannot be
// delivered within chanSendTimeout is logged and dropped. The goroutine
// closes allCh and exits when Receive returns pool.ErrClosed.
func ( *channel) () {
	 := context.TODO()
	.allCh = make(chan interface{}, .chanSize)

	go func() {
		// The timer is created stopped; it is armed per message below.
		 := time.NewTimer(time.Minute)
		.Stop()

		// Counter that appears to track consecutive receive errors: the
		// first failure retries immediately, later ones back off 100ms.
		var  int
		for {
			,  := .pubSub.Receive()
			if  != nil {
				if  == pool.ErrClosed {
					close(.allCh)
					return
				}
				if  > 0 {
					time.Sleep(100 * time.Millisecond)
				}
				++
				continue
			}

			 = 0

			// Any message is as good as a ping.
			// NOTE(review): ping is nil when the health check is disabled;
			// the send then never proceeds and the default branch is taken.
			select {
			case .ping <- struct{}{}:
			default:
			}

			switch msg := .(type) {
			case *Pong:
				// Ignore.
			case *Subscription, *Message:
				.Reset(.chanSendTimeout)
				select {
				case .allCh <- :
					// Delivered: stop the timer and drain it if it already fired.
					if !.Stop() {
						<-.C
					}
				case <-.C:
					internal.Logger.Printf(
						, "redis: %s channel is full for %s (message is dropped)",
						, .chanSendTimeout)
				}
			default:
				internal.Logger.Printf(, "redis: unknown message type: %T", )
			}
		}
	}()
}