package pond

import (
	
	
	
	
	
	

	
	
)

const (
	// Unbounded is the queue size used to represent a queue without a capacity limit.
	Unbounded = math.MaxInt

	// DefaultQueueSize is the queue size newPool assigns before options are applied.
	DefaultQueueSize = Unbounded

	// DefaultNonBlocking is the non-blocking flag newPool assigns before options are applied.
	DefaultNonBlocking = false

	// LinkedBufferInitialSize is the initial size handed to the task buffer constructor.
	LinkedBufferInitialSize = 1024

	// LinkedBufferMaxCapacity is the maximum capacity handed to the task buffer constructor.
	LinkedBufferMaxCapacity = 100 * 1024
)

var (
	ErrQueueFull             = errors.New("queue is full")
	ErrQueueEmpty            = errors.New("queue is empty")
	ErrPoolStopped           = errors.New("pool stopped")
	ErrMaxConcurrencyReached = errors.New("max concurrency reached")

	poolStoppedFuture = func() Task {
		,  := future.NewFuture(context.Background())
		(ErrPoolStopped)
		return 
	}()
)

// BasePool defines methods common to all pool types.
type BasePool interface {
	// RunningWorkers returns the number of worker goroutines that are currently active (executing a task) in the pool.
	RunningWorkers() int64

	// SubmittedTasks returns the total number of tasks submitted to the pool since its creation.
	SubmittedTasks() uint64

	// WaitingTasks returns the number of tasks that are currently waiting in the pool's queue.
	WaitingTasks() uint64

	// FailedTasks returns the number of tasks that have completed with an error.
	FailedTasks() uint64

	// SuccessfulTasks returns the number of tasks that have completed successfully.
	SuccessfulTasks() uint64

	// CompletedTasks returns the total number of tasks that have completed (either successfully or with an error).
	// Tasks accepted by the pool but canceled before execution are excluded.
	CompletedTasks() uint64

	// DroppedTasks returns the number of tasks that have been dropped because they could not be
	// accepted by the pool (for example, because the queue was full).
	DroppedTasks() uint64

	// CanceledTasks returns the number of tasks accepted by the pool that were canceled
	// before executing user code due to pool context cancellation.
	CanceledTasks() uint64

	// MaxConcurrency returns the maximum concurrency of the pool.
	MaxConcurrency() int

	// QueueSize returns the size of the task queue.
	QueueSize() int

	// NonBlocking returns true if the pool is non-blocking, meaning that it will not block when the task queue is full.
	// In a non-blocking pool, tasks that cannot be submitted to the queue will be dropped.
	// By default, pools are blocking, meaning that they will block when the task queue is full.
	NonBlocking() bool

	// Context returns the context associated with this pool.
	Context() context.Context

	// Stop stops the pool and returns a future that can be used to wait for all pending tasks to complete.
	// The pool will not accept new tasks after it has been stopped.
	Stop() Task

	// StopAndWait stops the pool and waits for all tasks to complete.
	StopAndWait()

	// Stopped returns true if the pool has been stopped or its context has been cancelled.
	Stopped() bool

	// Resize changes the maximum concurrency (number of workers) of the pool.
	// The new max concurrency must be greater than or equal to 0 (0 means no limit);
	// the implementation in this file panics on a negative value.
	// If the new max concurrency is less than the current number of running workers, the pool will continue to run with the new max concurrency.
	Resize(maxConcurrency int)
}

// Pool represents a pool of goroutines that can execute tasks concurrently.
type Pool interface {
	BasePool

	// Go submits a task to the pool without waiting for it to complete.
	// The pool will not accept new tasks after it has been stopped.
	// If the pool has been stopped, this method will return ErrPoolStopped.
	Go(task func()) error

	// Submit submits a task to the pool and returns a future that can be used to wait for the task to complete.
	// The pool will not accept new tasks after it has been stopped.
	// If the pool has been stopped, the returned future will resolve to ErrPoolStopped.
	Submit(task func()) Task

	// SubmitErr submits a task to the pool and returns a future that can be used to wait for the task to complete.
	// The task function must return an error.
	// The pool will not accept new tasks after it has been stopped.
	// If the pool has been stopped, the returned future will resolve to ErrPoolStopped.
	SubmitErr(task func() error) Task

	// TrySubmit attempts to submit a task to the pool without blocking and returns a future that can be
	// used to wait for the task to complete and a boolean indicating whether the task was submitted successfully.
	// The pool will not accept new tasks after it has been stopped.
	// If the pool has been stopped, the returned future will resolve to ErrPoolStopped.
	TrySubmit(task func()) (Task, bool)

	// TrySubmitErr attempts to submit a task to the pool without blocking and returns a future that can be
	// used to wait for the task to complete and a boolean indicating whether the task was submitted successfully.
	// The task function must return an error.
	// The pool will not accept new tasks after it has been stopped.
	// If the pool has been stopped, the returned future will resolve to ErrPoolStopped.
	TrySubmitErr(task func() error) (Task, bool)

	// NewSubpool creates a new subpool with the specified maximum concurrency and options.
	NewSubpool(maxConcurrency int, options ...Option) Pool

	// NewGroup creates a new task group.
	NewGroup() TaskGroup

	// NewGroupContext creates a new task group with the specified context.
	NewGroupContext(ctx context.Context) TaskGroup
}

// pool is the implementation behind the Pool interface. A pool with a non-nil
// parent is a subpool: its workers are submitted as tasks to the parent pool
// instead of being started as goroutines.
type pool struct {
	// mutex is held while reading or writing maxConcurrency and the tasks queue,
	// and while marking the pool as closed.
	mutex               sync.Mutex
	// parent is nil for root pools and set to the owning pool for subpools.
	parent              *pool
	// ctx and cancel are created with context.WithCancelCause when the pool is built;
	// cancel is invoked with ErrPoolStopped once the pool has been stopped and drained.
	ctx                 context.Context
	cancel              context.CancelCauseFunc
	// nonBlocking selects the non-blocking submission path for Go/Submit/SubmitErr.
	nonBlocking         bool
	// panicRecovery is forwarded to the task invocation helpers.
	panicRecovery       bool
	// maxConcurrency is the upper bound for workerCount (math.MaxInt when unlimited).
	maxConcurrency      int
	// closed is set to true when the pool is stopped.
	closed              atomic.Bool
	// workerCount tracks the number of live workers; workerWaitGroup holds one
	// count per live worker so that stopping can wait for all of them.
	workerCount         atomic.Int64
	workerWaitGroup     sync.WaitGroup
	// submitWaiters is a channel with a buffer of 1 used to wake up a submitter
	// blocked on a full queue.
	submitWaiters       chan struct{}
	// queueSize is the maximum number of queued tasks; the queue is treated as
	// disabled when it is not greater than 0.
	queueSize           int
	// tasks is the queue of tasks waiting for a worker.
	tasks               *linkedbuffer.LinkedBuffer[any]
	// Task counters exposed through the BasePool metrics methods.
	submittedTaskCount  atomic.Uint64
	successfulTaskCount atomic.Uint64
	failedTaskCount     atomic.Uint64
	droppedTaskCount    atomic.Uint64
	canceledTaskCount   atomic.Uint64
}

func ( *pool) () context.Context {
	return .ctx
}

func ( *pool) () bool {
	return .closed.Load() || .ctx.Err() != nil
}

func ( *pool) () int {
	.mutex.Lock()
	defer .mutex.Unlock()

	return .maxConcurrency
}

func ( *pool) ( int) {
	if  == 0 {
		 = math.MaxInt
	}

	if  < 0 {
		panic(errors.New("maxConcurrency must be greater than or equal to 0"))
	}

	.mutex.Lock()

	// Calculate the number of new workers to launch to reach the new max concurrency or the number of tasks in the queue, whichever is smaller
	 := int(math.Min(float64(-.maxConcurrency), float64(.tasks.Len())))

	.maxConcurrency = 

	if  > 0 {
		.workerCount.Add(int64())
		.workerWaitGroup.Add()
	}

	.mutex.Unlock()

	// Launch the new workers
	for  := 0;  < ; ++ {
		.launchWorker(nil)
	}
}

func ( *pool) () int {
	return .queueSize
}

func ( *pool) () bool {
	return .nonBlocking
}

func ( *pool) () int64 {
	return .workerCount.Load()
}

func ( *pool) () uint64 {
	return .submittedTaskCount.Load()
}

func ( *pool) () uint64 {
	return .tasks.Len()
}

func ( *pool) () uint64 {
	return .failedTaskCount.Load()
}

func ( *pool) () uint64 {
	return .successfulTaskCount.Load()
}

func ( *pool) () uint64 {
	return .successfulTaskCount.Load() + .failedTaskCount.Load()
}

func ( *pool) () uint64 {
	return .droppedTaskCount.Load()
}

func ( *pool) () uint64 {
	return .canceledTaskCount.Load()
}

func ( *pool) ( any) {
	var ,  error
	 := false
	defer func() {
		if ! {
			// In case of abnormal exit (e.g. runtime.Goexit() in the task),
			// launch a new worker to execute the next task in the queue.
			.updateMetrics(fmt.Errorf("worker exited abnormally: %w", ))

			,  := .readTask()
			if  != nil {
				return
			}

			if  != nil {
				.launchWorker()
				.notifySubmitWaiter()
			}
		}
	}()
	for {
		if  != nil {
			_,  = invokeTask[any](, .panicRecovery)

			.updateMetrics()
		}

		,  = .readTask()

		if  != nil {
			 = true
			return
		}
	}
}

func ( *pool) ( any) func() ( any,  error) {
	return func() ( any,  error) {
		if  != nil {
			,  = invokeTask[any](, .panicRecovery)

			.updateMetrics()
		}

		// Attempt to submit the next task to the parent pool
		if ,  := .readTask();  == nil {
			for {
				 := .parent.submit(.(), .nonBlocking)
				if  == nil {
					break
				}

				// Wrap the error with the context canceled error to reflect that the task was canceled.
				if errors.Is(, ErrPoolStopped) {
					 = errors.Join(ErrContextCanceled, )
					.updateMetrics()
					.parent.updateMetrics()
				}

				// If the parent pool is stopped/canceled it won't accept submissions.
				// Keep draining the subpool queue so workers can exit cleanly.
				,  = .readTask()
				if  != nil {
					break
				}
			}
		}

		return
	}
}

func ( *pool) ( func()) error {
	return .submit(, .nonBlocking)
}

func ( *pool) ( func()) Task {
	,  := .wrapAndSubmit(, .nonBlocking)
	return 
}

func ( *pool) ( func() error) Task {
	,  := .wrapAndSubmit(, .nonBlocking)
	return 
}

func ( *pool) ( func()) (Task, bool) {
	return .wrapAndSubmit(, true)
}

func ( *pool) ( func() error) (Task, bool) {
	return .wrapAndSubmit(, true)
}

func ( *pool) ( any,  bool) (Task, bool) {
	if .Stopped() {
		return poolStoppedFuture, false
	}

	, ,  := .wrapTask()

	if  := .submit(, );  != nil {
		()
		return , false
	}

	return , true
}

func ( *pool) ( any) (Task, func() error, func(error)) {
	 := .Context()
	,  := future.NewFuture()

	 := wrapTask[struct{}, func(error)](, , , .panicRecovery)

	return , , 
}

func ( *pool) ( any,  bool) ( error) {

	.submittedTaskCount.Add(1)

	if  {
		 = .trySubmit()
	} else {
		 = .blockingTrySubmit()
	}

	if  != nil {
		.droppedTaskCount.Add(1)
	}

	return
}

func ( *pool) ( any) error {
	for {
		if  := .trySubmit();  != ErrQueueFull {
			return 
		}

		// No space left in the queue, wait until a slot is released
		select {
		case <-.ctx.Done():
			return .ctx.Err()
		case <-.submitWaiters:
			select {
			case <-.ctx.Done():
				return .ctx.Err()
			default:
			}
		}
	}
}

func ( *pool) ( any) error {
	.mutex.Lock()

	// Check if the pool has been stopped while holding the lock
	// to avoid race conditions on the workers wait group if the pool is being stopped.
	if .Stopped() {
		.mutex.Unlock()
		return ErrPoolStopped
	}

	 := .queueSize > 0
	 := int(.tasks.Len())

	// When queue is enabled, check if it is full
	if  &&  >= .queueSize {
		.mutex.Unlock()
		return ErrQueueFull
	}

	if int(.workerCount.Load()) >= .maxConcurrency {
		// When queue is disabled, return an error immediately if max concurrency is reached
		if ! {
			.mutex.Unlock()
			return ErrQueueFull
		}

		// If queue is enabled, push the task at the back of the queue
		.tasks.Write()
		.mutex.Unlock()
		return nil
	}

	.workerCount.Add(1)
	.workerWaitGroup.Add(1)

	if  &&  > 0 {
		// Push the task at the back of the queue
		.tasks.Write()

		// Pop the front task
		, _ = .tasks.Read()
	}

	.mutex.Unlock()

	.launchWorker()

	// Notify a submit waiter there is room in the queue for a new task
	.notifySubmitWaiter()

	return nil
}

func ( *pool) ( any) {
	if .parent == nil {
		// Launch a new worker to execute the task
		go .worker()
	} else {
		// Submit task to the parent pool wrapped in a function that will
		// submit the next task to the parent pool when it completes (subpool worker)
		.parent.submit(.subpoolWorker(), .nonBlocking)
	}
}

func ( *pool) () ( any,  error) {
	.mutex.Lock()

	if .tasks.Len() == 0 {
		// No more tasks in the queue, worker will exit
		.workerCount.Add(-1)
		.workerWaitGroup.Done()
		.mutex.Unlock()

		// Notify a submit waiter there is room in the queue for a new task
		.notifySubmitWaiter()

		 = ErrQueueEmpty
		return
	}

	if .maxConcurrency > 0 && int(.workerCount.Load()) > .maxConcurrency {
		// Max concurrency reached, kill the worker
		.workerCount.Add(-1)
		.workerWaitGroup.Done()
		.mutex.Unlock()

		 = ErrMaxConcurrencyReached
		return
	}

	, _ = .tasks.Read()

	.mutex.Unlock()

	// Notify a submit waiter there is room in the queue for a new task
	.notifySubmitWaiter()

	return
}

func ( *pool) () {
	// Wake up one of the waiters (if any)
	select {
	case .submitWaiters <- struct{}{}:
	default:
		return
	}
}

func ( *pool) ( error) {
	if  != nil {
		if errors.Is(, ErrContextCanceled) {
			.canceledTaskCount.Add(1)
		} else {
			.failedTaskCount.Add(1)
		}
	} else {
		.successfulTaskCount.Add(1)
	}
}

func ( *pool) () Task {
	return Submit(func() {
		// Stop accepting new tasks while holding the lock to avoid race conditions.
		.mutex.Lock()
		.closed.Store(true)
		.mutex.Unlock()

		// Wait for all workers to finish executing all tasks (including the ones in the queue)
		.workerWaitGroup.Wait()

		// Cancel the context with a pool stopped error to signal that the pool has been stopped
		.cancel(ErrPoolStopped)
	})
}

func ( *pool) () {
	.Stop().Wait()
}

func ( *pool) ( int,  ...Option) Pool {
	return newPool(, , ...)
}

func ( *pool) () TaskGroup {
	return newTaskGroup(, .ctx)
}

func ( *pool) ( context.Context) TaskGroup {
	return newTaskGroup(, )
}

func newPool( int,  *pool,  ...Option) *pool {

	if  != nil {
		if  > .MaxConcurrency() {
			panic(fmt.Errorf("maxConcurrency cannot be greater than the parent pool's maxConcurrency (%d)", .MaxConcurrency()))
		}

		if  == 0 {
			 = .MaxConcurrency()
		}
	}

	if  == 0 {
		 = math.MaxInt
	}

	if  < 0 {
		panic(errors.New("maxConcurrency must be greater than or equal to 0"))
	}

	 := &pool{
		ctx:            context.Background(),
		nonBlocking:    DefaultNonBlocking,
		panicRecovery:  true,
		maxConcurrency: ,
		queueSize:      DefaultQueueSize,
		// Buffer size of 1 to prevent deadlock when read on the submitWaiters channel happens
		// after the write on the same channel in the notifySubmitWaiter method.
		// See https://github.com/alitto/pond/issues/108
		submitWaiters: make(chan struct{}, 1),
	}

	if  != nil {
		.parent = 
		.ctx = .Context()
		.queueSize = .queueSize
		.nonBlocking = .nonBlocking
		.panicRecovery = .panicRecovery
	}

	for ,  := range  {
		()
	}

	.ctx, .cancel = context.WithCancelCause(.ctx)

	.tasks = linkedbuffer.NewLinkedBuffer[any](LinkedBufferInitialSize, LinkedBufferMaxCapacity)

	return 
}

// NewPool creates a new pool with the given maximum concurrency and options.
// The new maximum concurrency must be greater than or equal to 0 (0 means no limit).
func ( int,  ...Option) Pool {
	return newPool(, nil, ...)
}