// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package asynq

import (
	
	
	

	
	
	
	
)

// heartbeater is responsible for writing process info to redis periodically to
// indicate that the background worker process is up.
//
// Each heartbeat also extends the leases held by the active workers it is
// tracking. All of this work happens on one long-running goroutine, which
// additionally listens on done, starting and finished.
type heartbeater struct {
	logger *log.Logger
	broker base.Broker
	clock  timeutil.Clock

	// done carries the stop signal to the long-running heartbeater goroutine.
	// It is created unbuffered by newHeartbeater, so a send on it blocks until
	// that goroutine receives the value.
	done chan struct{}

	// interval is the period between heartbeats. Server state is written with
	// a TTL of twice this value.
	interval time.Duration

	// The following fields are initialized at construction time and are
	// immutable afterwards. They identify this server in the data written to
	// the broker.
	host           string
	pid            int
	serverID       string
	concurrency    int
	queues         map[string]int
	strictPriority bool

	// The following fields are mutable and must be accessed only by the
	// heartbeater goroutine; no lock protects them.
	//
	// started is set from clock.Now() when the goroutine begins running.
	// workers is keyed by task message ID: entries are added for values
	// received on starting and removed for values received on finished.
	started time.Time
	workers map[string]*workerInfo

	// state is shared with other goroutines; its value is read while holding
	// state.mu.
	state *serverState

	// Channels on which updates about active workers are received: starting
	// delivers a workerInfo to begin tracking, finished delivers the task
	// message whose worker should no longer be tracked.
	starting <-chan *workerInfo
	finished <-chan *base.TaskMessage
}

// heartbeaterParams groups the caller-supplied settings for newHeartbeater.
// Every field is copied as-is into the heartbeater field of the same name.
type heartbeaterParams struct {
	logger         *log.Logger
	broker         base.Broker
	interval       time.Duration // period between heartbeats
	concurrency    int
	queues         map[string]int
	strictPriority bool
	state          *serverState             // shared server state; read under its mutex
	starting       <-chan *workerInfo       // workers that began processing a task
	finished       <-chan *base.TaskMessage // tasks whose worker is done
}

// newHeartbeater returns a heartbeater configured from the given params.
//
// In addition to the copied params it fills in:
//   - host from os.Hostname, with "unknown-host" used on the error path;
//   - pid from os.Getpid;
//   - serverID from uuid.New().String();
//   - clock from timeutil.NewRealClock();
//   - an unbuffered done channel and an empty workers map.
//
// It does not start any goroutine.
func newHeartbeater( heartbeaterParams) *heartbeater {
	// A hostname lookup failure is not fatal: fall back to a placeholder.
	,  := os.Hostname()
	if  != nil {
		 = "unknown-host"
	}

	return &heartbeater{
		logger:   .logger,
		broker:   .broker,
		clock:    timeutil.NewRealClock(),
		done:     make(chan struct{}), // unbuffered: senders wait for the receiver
		interval: .interval,

		host:           ,
		pid:            os.Getpid(),
		serverID:       uuid.New().String(),
		concurrency:    .concurrency,
		queues:         .queues,
		strictPriority: .strictPriority,

		state:    .state,
		workers:  make(map[string]*workerInfo),
		starting: .starting,
		finished: .finished,
	}
}

// Asks the heartbeater goroutine to stop by sending a value on done.
//
// done is unbuffered, so this call blocks until the goroutine receives the
// signal. It does not wait for the goroutine to finish its cleanup; the
// goroutine signals completion through the sync.WaitGroup it was started
// with.
func ( *heartbeater) () {
	.logger.Debug("Heartbeater shutting down...")
	// Signal the heartbeater goroutine to stop.
	.done <- struct{}{}
}

// A workerInfo holds information about one active worker, i.e. one task that
// is currently being processed. The heartbeater keeps these in a map keyed by
// msg.ID.
type workerInfo struct {
	// msg is the task message the worker is processing.
	msg *base.TaskMessage
	// started is the time the worker started processing the message.
	started time.Time
	// deadline is the time by which the worker has to finish processing the task.
	deadline time.Time
	// lease is the lease the worker holds for the task. On each heartbeat it is
	// checked with IsValid: a valid lease is submitted for extension and then
	// Reset, an invalid one triggers NotifyExpiration.
	lease *base.Lease
}

// Launches the heartbeater goroutine and returns immediately.
//
// The given WaitGroup is incremented before the goroutine is spawned and
// marked Done when the goroutine exits, so callers can wait on it for the
// heartbeater to finish.
//
// The goroutine records its start time, performs one heartbeat right away,
// and then serves four events until told to stop:
//   - done: clear this server's state from the broker, stop the timer, exit;
//   - timer: perform a heartbeat and re-arm the timer;
//   - starting: begin tracking the received worker under its message ID;
//   - finished: stop tracking the worker for the received task message.
func ( *heartbeater) ( *sync.WaitGroup) {
	.Add(1)
	go func() {
		defer .Done()

		.started = .clock.Now()

		// Heartbeat once up front instead of waiting a full interval.
		.beat()

		// A one-shot timer re-armed after each beat: the next beat is scheduled
		// one interval after the previous beat has returned.
		 := time.NewTimer(.interval)
		for {
			select {
			case <-.done:
				// Remove this server's entry on the way out; any result of
				// ClearServerState is not inspected here.
				.broker.ClearServerState(.host, .pid, .serverID)
				.logger.Debug("Heartbeater done")
				.Stop()
				return

			case <-.C:
				.beat()
				.Reset(.interval)

			case  := <-.starting:
				// Track the worker, keyed by the ID of the message it processes.
				.workers[.msg.ID] = 

			case  := <-.finished:
				// The task is no longer being processed; drop its worker entry.
				delete(.workers, .ID)
			}
		}
	}()
}

// beat extends lease for workers and writes server/worker info to redis.
//
// It runs on the heartbeater goroutine, which is also the only place the
// workers map is modified, so the map is read here without locking.
//
// Steps, in order:
//  1. Read the server status string while holding state.mu.
//  2. Build a base.ServerInfo and one base.WorkerInfo per tracked worker,
//     collecting per-queue entries for the workers whose lease is still valid
//     and notifying those whose lease is not.
//  3. Write the server state with a TTL of 2*interval.
//  4. Call ExtendLease once per queue group, then Reset the corresponding
//     leases.
//
// Broker failures are logged and otherwise ignored; a failed state write does
// not prevent the lease extension step from running.
func ( *heartbeater) () {
	// Hold the lock only long enough to take the status string.
	.state.mu.Lock()
	 := .state.value.String()
	.state.mu.Unlock()

	 := base.ServerInfo{
		Host:              .host,
		PID:               .pid,
		ServerID:          .serverID,
		Concurrency:       .concurrency,
		Queues:            .queues,
		StrictPriority:    .strictPriority,
		Status:            ,
		Started:           .started,
		ActiveWorkerCount: len(.workers),
	}

	// Worker records to publish, plus a map from queue name to the entries
	// (presumably task IDs — confirm) whose leases should be extended.
	var  []*base.WorkerInfo
	 := make(map[string][]string)
	for ,  := range .workers {
		 = append(, &base.WorkerInfo{
			Host:     .host,
			PID:      .pid,
			ServerID: .serverID,
			ID:       ,
			Type:     .msg.Type,
			Queue:    .msg.Queue,
			Payload:  .msg.Payload,
			Started:  .started,
			Deadline: .deadline,
		})
		// Check lease before adding to the set to make sure not to extend the lease if the lease is already expired.
		if .lease.IsValid() {
			[.msg.Queue] = append([.msg.Queue], )
		} else {
			.lease.NotifyExpiration() // notify processor if the lease is expired
		}
	}

	// Note: Set TTL to be long enough so that it won't expire before we write again
	// and short enough to expire quickly once the process is shut down or killed.
	if  := .broker.WriteServerState(&, , .interval*2);  != nil {
		.logger.Errorf("Failed to write server state data: %v", )
	}

	// Extend leases one queue at a time; a failure for one queue is logged and
	// the remaining queues are still processed.
	for ,  := range  {
		,  := .broker.ExtendLease(, ...)
		if  != nil {
			.logger.Errorf("Failed to extend lease for tasks %v: %v", , )
			continue
		}
		// After a successful extension, reset each affected worker's lease
		// (presumably to the value returned by ExtendLease — confirm). A false
		// result from Reset is only logged.
		for ,  := range  {
			if  := .workers[].lease; !.Reset() {
				.logger.Warnf("Lease reset failed for %s; lease deadline: %v", , .Deadline())
			}
		}
	}
}