// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_impl.h"

#include <optional>
#include <string_view>

#include "base/logging.h"
#include "base/metrics/histogram_macros.h"
#include "base/profiler/thread_group_profiler.h"
#include "base/sequence_token.h"
#include "base/strings/stringprintf.h"
#include "base/task/common/checked_lock.h"
#include "base/task/thread_pool/worker_thread.h"
#include "base/threading/platform_thread_metrics.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/time/time_override.h"
#include "base/trace_event/trace_event.h"
#include "third_party/abseil-cpp/absl/container/inlined_vector.h"

#if BUILDFLAG(ARKWEB_PERFORMANCE_SCHEDULING) && !BUILDFLAG(ARKWEB_TEST)
#include "arkweb/chromium_ext/content/public/common/content_switches_ext.h"
#include "base/command_line.h"
#include "base/ohos/sys_info_utils_ext.h"
#endif

namespace base::internal {

// Upon destruction, executes actions that control the number of active workers.
// Useful to satisfy locking requirements of these actions.
class ThreadGroupImpl::ScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit ScopedCommandsExecutor(ThreadGroupImpl* outer)
      : BaseScopedCommandsExecutor(outer) {}

  ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
  ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
  ~ScopedCommandsExecutor() override {
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    // Wake up workers.
    for (auto& worker : workers_to_wake_up_) {
      worker->WakeUp();
    }
  }

  void ScheduleWakeUp(scoped_refptr<WorkerThread> worker) {
    workers_to_wake_up_.emplace_back(std::move(worker));
  }

 private:
  absl::InlinedVector<scoped_refptr<WorkerThread>, 2> workers_to_wake_up_;
};

class ThreadGroupImpl::WorkerDelegate : public WorkerThread::Delegate,
                                        public BlockingObserver {
 public:
  // |outer| owns the worker for which this delegate is constructed. If
  // |is_excess| is true, this worker will be eligible for reclaim.
  explicit WorkerDelegate(TrackedRef<ThreadGroupImpl> outer, bool is_excess);
  WorkerDelegate(const WorkerDelegate&) = delete;
  WorkerDelegate& operator=(const WorkerDelegate&) = delete;

  // WorkerThread::Delegate:
  void OnMainEntry(WorkerThread* worker) override;
  void OnMainExit(WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  RegisteredTaskSource SwapProcessedTask(RegisteredTaskSource task_source,
                                         WorkerThread* worker) override;
  void RecordUnnecessaryWakeup() override;
  TimeDelta GetSleepTimeout() override;

  // BlockingObserver:
  void BlockingStarted(BlockingType blocking_type) override;
  void BlockingTypeUpgraded() override;
  void BlockingEnded() override;

  // WorkerThread::Delegate:

  // Notifies the worker of shutdown, possibly marking the running task as
  // MAY_BLOCK.
  void OnShutdownStartedLockRequired(BaseScopedCommandsExecutor* executor)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks iff this worker has been within a
  // ScopedBlockingCall for more than |may_block_threshold|.
  void MaybeIncrementMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks.
  void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  TaskPriority current_task_priority_lock_required() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return *read_any().current_task_priority;
  }

  // Exposed for AnnotateAcquiredLockAlias.
  const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
    return outer_->lock_;
  }

 private:
  // Returns true iff the worker can get work. Cleans up the worker or puts it
  // on the idle set if it can't get work.
  bool CanGetWorkLockRequired(BaseScopedCommandsExecutor* executor,
                              WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Calls cleanup on |worker| and removes it from the thread group. Called from
  // GetWork() when no work is available and CanCleanupLockRequired() returns
  // true.
  void CleanupLockRequired(BaseScopedCommandsExecutor* executor,
                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Called in GetWork() when a worker becomes idle.
  void OnWorkerBecomesIdleLockRequired(BaseScopedCommandsExecutor* executor,
                                       WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  RegisteredTaskSource GetWorkLockRequired(BaseScopedCommandsExecutor* executor,
                                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Returns true if |worker| is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Only used in DCHECKs.
  template <typename Worker>
  bool ContainsWorker(const std::vector<scoped_refptr<Worker>>& workers,
                      const WorkerThread* worker) {
    auto it = std::ranges::find_if(
        workers,
        [worker](const scoped_refptr<Worker>& i) { return i.get() == worker; });
    return it != workers.end();
  }

  // Accessed only from the worker thread.
  struct WorkerOnly {
    WorkerOnly();
    ~WorkerOnly();
    // Associated WorkerThread, if any, initialized in OnMainEntry().
    raw_ptr<WorkerThread> worker_thread_;

#if BUILDFLAG(IS_WIN)
    std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
#endif  // BUILDFLAG(IS_WIN)
  } worker_only_;

  // Writes from the worker thread protected by |outer_->lock_|. Reads from any
  // thread, protected by |outer_->lock_| when not on the worker thread.
  struct WriteWorkerReadAny {
    // The priority of the task the worker is currently running if any.
    std::optional<TaskPriority> current_task_priority;
    // The shutdown behavior of the task the worker is currently running if any.
    std::optional<TaskShutdownBehavior> current_shutdown_behavior;

    // Time when MayBlockScopeEntered() was last called. Reset when
    // BlockingScopeExited() is called.
    TimeTicks blocking_start_time;

    // Whether the worker is currently running a task (i.e. GetWork() has
    // returned a non-empty task source and DidProcessTask() hasn't been called
    // yet).
    bool is_running_task() const { return !!current_shutdown_behavior; }
  } write_worker_read_any_;

  WorkerOnly& worker_only() {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return worker_only_;
  }

  WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_any() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_worker() const {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const TrackedRef<ThreadGroupImpl> outer_;

  // Whether the worker is in excess. This must be decided at worker creation
  // time to prevent unnecessarily discarding TLS state, as well as any behavior
  // the OS has learned about a given thread.
  const bool is_excess_;

  // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| were
  // incremented due to a ScopedBlockingCall on the thread.
  bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
  bool incremented_max_best_effort_tasks_since_blocked_
      GUARDED_BY(outer_->lock_) = false;
  // Whether |outer_->max_tasks_| and |outer_->max_best_effort_tasks_| was
  // incremented due to running CONTINUE_ON_SHUTDOWN on the thread during
  // shutdown.
  bool incremented_max_tasks_for_shutdown_ GUARDED_BY(outer_->lock_) = false;

  // Verifies that specific calls are always made from the worker thread.
  THREAD_CHECKER(worker_thread_checker_);
};

ThreadGroupImpl::ThreadGroupImpl(std::string_view histogram_label,
                                 std::string_view thread_group_label,
                                 ThreadType thread_type_hint,
                                 int64_t thread_group_type,
                                 TrackedRef<TaskTracker> task_tracker,
                                 TrackedRef<Delegate> delegate,
                                 bool monitor_worker_thread_priorities)
    : ThreadGroup(histogram_label,
                  thread_group_label,
                  thread_type_hint,
                  std::move(task_tracker),
                  std::move(delegate)),
      thread_group_type_(thread_group_type),
      tracked_ref_factory_(this),
      monitor_worker_thread_priorities_(monitor_worker_thread_priorities) {
  DCHECK(!thread_group_label_.empty());
#if BUILDFLAG(ARKWEB_PERFORMANCE_SCHEDULING) && !BUILDFLAG(ARKWEB_TEST)
  base::CommandLine* command_line = base::CommandLine::ForCurrentProcess();
  cmd_enable_report_thread_pool_ = false;
  if (command_line) {
    cmd_enable_report_thread_pool_ = command_line->HasSwitch(switches::kEnableReportThreadPoolForeg);
  }
#endif
}

void ThreadGroupImpl::Start(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    std::optional<TimeDelta> may_block_threshold) {
#if DCHECK_IS_ON()
  DCHECK(!in_start().start_called);
  in_start().start_called = true;
#endif
  {
    ScopedCommandsExecutor executor(this);
    CheckedAutoLock auto_lock(lock_);

    ThreadGroup::StartImplLockRequired(
        max_tasks, max_best_effort_tasks, suggested_reclaim_time,
        service_thread_task_runner, worker_thread_observer, worker_environment,
        synchronous_thread_start_for_testing, may_block_threshold);

    DCHECK(workers_.empty());
    EnsureEnoughWorkersLockRequired(&executor);
  }

  if (ThreadGroupProfiler::IsProfilingEnabled()) {
    // This call posts a task, so do it outside of the lock.
    thread_group_profiler_.emplace(service_thread_task_runner,
                                   thread_group_type_);
  }
}

ThreadGroupImpl::~ThreadGroupImpl() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases |workers_| should be empty.
  DCHECK(workers_.empty());
}

void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
    RegisteredTaskSourceAndTransaction transaction_with_task_source) {
  ScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

ThreadGroupImpl::WorkerDelegate::WorkerDelegate(
    TrackedRef<ThreadGroupImpl> outer,
    bool is_excess)
    : outer_(outer), is_excess_(is_excess) {
  // Bound in OnMainEntry().
  DETACH_FROM_THREAD(worker_thread_checker_);
}

ThreadGroupImpl::WorkerDelegate::WorkerOnly::WorkerOnly() = default;
ThreadGroupImpl::WorkerDelegate::WorkerOnly::~WorkerOnly() = default;

TimeDelta ThreadGroupImpl::WorkerDelegate::GetSleepTimeout() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess_) {
    return TimeDelta::Max();
  }
  // Sleep for an extra 10% to avoid the following pathological case:
  //   0) A task is running on a timer which matches
  //      |after_start().suggested_reclaim_time|.
  //   1) The timer fires and this worker is created by
  //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
  //      worker was assigned the task.
  //   2) This worker begins sleeping |after_start().suggested_reclaim_time|
  //      (at the front of the idle set).
  //   3) The task assigned to the other worker completes and the worker goes
  //      back in the idle set (this worker may now second on the idle set;
  //      its GetLastUsedTime() is set to Now()).
  //   4) The sleep in (2) expires. Since (3) was fast this worker is likely
  //      to have been second on the idle set long enough for
  //      CanCleanupLockRequired() to be satisfied in which case this worker
  //      is cleaned up.
  //   5) The timer fires at roughly the same time and we're back to (1) if
  //      (4) resulted in a clean up; causing thread churn.
  //
  //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
  //   before (5). In that case (5) will cause (3) and refresh this worker's
  //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
  //   and avoiding churn.
  //
  //   Of course the same problem arises if in (0) the timer matches
  //   |after_start().suggested_reclaim_time * 1.1| but it's expected that any
  //   timer slower than |after_start().suggested_reclaim_time| will cause
  //   such churn during long idle periods. If this is a problem in practice,
  //   the standby thread configuration and algorithm should be revisited.
  return outer_->after_start().suggested_reclaim_time * 1.1;
}

void ThreadGroupImpl::WorkerDelegate::OnMainEntry(WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  {
#if DCHECK_IS_ON()
    CheckedAutoLock auto_lock(outer_->lock_);
    DCHECK(
        ContainsWorker(outer_->workers_, static_cast<WorkerThread*>(worker)));
#endif
  }

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
      outer_->after_start().worker_environment);
#endif  // BUILDFLAG(IS_WIN)

  std::string thread_name =
      StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str());
  PlatformThread::SetName(thread_name);
#if BUILDFLAG(IS_ANDROID)
  if (outer_->monitor_worker_thread_priorities_) {
    PlatformThreadPriorityMonitor::Get().RegisterCurrentThread(thread_name);
  }
#endif  // BUILDFLAG(IS_ANDROID)

  outer_->BindToCurrentThread();
  worker_only().worker_thread_ = static_cast<WorkerThread*>(worker);
  SetBlockingObserverForCurrentThread(this);

  if (outer_->thread_group_profiler_) {
    outer_->thread_group_profiler_->OnWorkerThreadStarted(worker);
  }

  if (outer_->worker_started_for_testing_) {
    // When |worker_started_for_testing_| is set, the thread that starts
    // workers should wait for a worker to have started before starting the
    // next one, and there should only be one thread that wakes up workers at
    // a time.
    DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
    outer_->worker_started_for_testing_->Signal();
  }
}

void ThreadGroupImpl::WorkerDelegate::OnMainExit(WorkerThread* worker_base) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  WorkerThread* worker = static_cast<WorkerThread*>(worker_base);
  {
    bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer_->lock_);

    // |worker| should already have been removed from the idle workers set and
    // |workers_| by the time the thread is about to exit. (except in the
    // cases where the thread group is no longer going to be used - in which
    // case, it's fine for there to be invalid workers in the thread group).
    if (!shutdown_complete && !outer_->join_for_testing_started_) {
      DCHECK(!outer_->idle_workers_set_.Contains(worker));
      DCHECK(!ContainsWorker(outer_->workers_, worker));
    }
  }
#endif

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // BUILDFLAG(IS_WIN)

  if (outer_->thread_group_profiler_) {
    outer_->thread_group_profiler_->OnWorkerThreadExiting(worker_base);
  }

  // Count cleaned up workers for tests. It's important to do this here
  // instead of at the end of CleanupLockRequired() because some side-effects
  // of cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer_->lock_);
  ++outer_->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer_->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer_->num_workers_cleaned_up_for_testing_cv_) {
    outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
  }
}

bool ThreadGroupImpl::WorkerDelegate::CanGetWorkLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThread* worker = static_cast<WorkerThread*>(worker_base);

  const bool is_on_idle_workers_set = outer_->IsOnIdleSetLockRequired(worker);
  DCHECK_EQ(is_on_idle_workers_set, outer_->idle_workers_set_.Contains(worker));

  // This occurs when the when WorkerThread::Delegate::WaitForWork() times out
  // (i.e. when the worker's wakes up after GetSleepTimeout()).
  if (is_on_idle_workers_set) {
    if (CanCleanupLockRequired(worker)) {
      CleanupLockRequired(executor, worker);
    }
    return false;
  }

  // If too many workers are running, this worker should not get work, until
  // tasks are no longer in excess (i.e. max tasks increases). This ensures that
  // if this worker is in excess, it gets a chance to being cleaned up.
  if (outer_->GetNumAwakeWorkersLockRequired() > outer_->max_tasks_) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return false;
  }

  return true;
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);

  ScopedCommandsExecutor executor(outer_.get());
  RegisteredTaskSource task_source;
  {
    CheckedAutoLock auto_lock(outer_->lock_);
    task_source = GetWorkLockRequired(&executor, worker);
  }
  // Notify the profiler on the worker thread status when profiling is enabled.
  // This must be called without holding lock_ as lock_ is not a universal
  // predecessor that does not satisfy OnWorkerThreadIdle's CheckedLock.
  if (outer_->thread_group_profiler_) {
    // GetWork is only called when waking up, i.e. from an idle state. No need
    // to mark it idle again if no task source available.
    if (task_source) {
      outer_->thread_group_profiler_->OnWorkerThreadActive(worker);
    }
  }
  return task_source;
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::GetWorkLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(ContainsWorker(outer_->workers_, worker));

  if (!CanGetWorkLockRequired(executor, worker)) {
    return nullptr;
  }

  RegisteredTaskSource task_source;
  TaskPriority priority;
  while (!task_source && !outer_->priority_queue_.IsEmpty()) {
    // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
    // BEST_EFFORT tasks run concurrently.
    priority = outer_->priority_queue_.PeekSortKey().priority();
    if (!outer_->task_tracker_->CanRunPriority(priority) ||
        (priority == TaskPriority::BEST_EFFORT &&
         outer_->num_running_best_effort_tasks_ >=
             outer_->max_best_effort_tasks_)) {
      break;
    }

    task_source = outer_->TakeRegisteredTaskSource(executor);
  }
  if (!task_source) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return nullptr;
  }

  // Running task bookkeeping.
  outer_->IncrementTasksRunningLockRequired(priority);

  write_worker().current_task_priority = priority;
  write_worker().current_shutdown_behavior = task_source->shutdown_behavior();

  // Subtle: This must be after the call to WillRunTask() inside
  // TakeRegisteredTaskSource(), so that any state used by WillRunTask() to
  // determine that the task source must remain in the TaskQueue is also used
  // to determine the desired number of workers. Concretely, this wouldn't
  // work:
  //
  //   Thread 1: GetWork() calls EnsureEnoughWorkers(). No worker woken up
  //             because the queue contains a job with max concurrency = 1 and
  //             the current worker is awake.
  //   Thread 2: Increases the job's max concurrency.
  //             ShouldQueueUponCapacityIncrease() returns false because the
  //             job is already queued.
  //   Thread 1: Calls WillRunTask() on the job. It returns
  //             kAllowedNotSaturated because max concurrency is not reached.
  //             But no extra worker is woken up to run the job!
  outer_->EnsureEnoughWorkersLockRequired(executor);

  return task_source;
}

RegisteredTaskSource ThreadGroupImpl::WorkerDelegate::SwapProcessedTask(
    RegisteredTaskSource task_source,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(read_worker().current_task_priority);
  DCHECK(read_worker().current_shutdown_behavior);

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
  // |TaskSource::lock_| is a UniversalPredecessor and must always be acquired
  // prior to acquiring a second lock
  std::optional<RegisteredTaskSourceAndTransaction>
      transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        RegisteredTaskSourceAndTransaction::FromTaskSource(
            std::move(task_source)));
  }

  // Calling WakeUp() guarantees that this WorkerThread will run Tasks from
  // TaskSources returned by the GetWork() method of |delegate_| until it
  // returns nullptr. Resetting |wake_up_event_| here doesn't break this
  // invariant and avoids a useless loop iteration before going to sleep if
  // WakeUp() is called while this WorkerThread is awake.
  wake_up_event_.Reset();

  ScopedCommandsExecutor workers_executor(outer_.get());
  ScopedReenqueueExecutor reenqueue_executor;
  RegisteredTaskSource next_task_source;
  {
    CheckedAutoLock auto_lock(outer_->lock_);

    // During shutdown, max_tasks may have been incremented in
    // OnShutdownStartedLockRequired().
    if (incremented_max_tasks_for_shutdown_) {
      DCHECK(outer_->shutdown_started_);
      outer_->DecrementMaxTasksLockRequired();
      if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
        outer_->DecrementMaxBestEffortTasksLockRequired();
      }
      incremented_max_tasks_since_blocked_ = false;
      incremented_max_best_effort_tasks_since_blocked_ = false;
      incremented_max_tasks_for_shutdown_ = false;
    }

    DCHECK(read_worker().blocking_start_time.is_null());
    DCHECK(!incremented_max_tasks_since_blocked_);
    DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

    // Running task bookkeeping.
    outer_->DecrementTasksRunningLockRequired(
        *read_worker().current_task_priority);
    write_worker().current_shutdown_behavior = std::nullopt;
    write_worker().current_task_priority = std::nullopt;

    if (transaction_with_task_source) {
      outer_->ReEnqueueTaskSourceLockRequired(
          &workers_executor, &reenqueue_executor,
          std::move(transaction_with_task_source.value()));
    }

    next_task_source = GetWorkLockRequired(&workers_executor,
                                           static_cast<WorkerThread*>(worker));
  }
  // Must be called without holding a lock.
  if (outer_->thread_group_profiler_ && !task_source) {
    outer_->thread_group_profiler_->OnWorkerThreadIdle(worker);
  }
  return next_task_source;
}

bool ThreadGroupImpl::WorkerDelegate::CanCleanupLockRequired(
    const WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess_) {
    return false;
  }

  const TimeTicks last_used_time = worker->GetLastUsedTime();
  if (last_used_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - last_used_time <
          outer_->after_start().suggested_reclaim_time) {
    return false;
  }
  if (!outer_->worker_cleanup_disallowed_for_testing_) [[likely]] {
    return true;
  }
  return false;
}

void ThreadGroupImpl::WorkerDelegate::CleanupLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThread* worker = static_cast<WorkerThread*>(worker_base);
  DCHECK(!outer_->join_for_testing_started_);
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  worker->Cleanup();

  if (outer_->IsOnIdleSetLockRequired(worker)) {
    outer_->idle_workers_set_.Remove(worker);
  }

  // Remove the worker from |workers_|.
  auto worker_iter = std::ranges::find(outer_->workers_, worker);
  CHECK(worker_iter != outer_->workers_.end());
  outer_->workers_.erase(worker_iter);

#if BUILDFLAG(ARKWEB_PERFORMANCE_SCHEDULING) && !BUILDFLAG(ARKWEB_TEST)
  if (worker->GetRealTid() != kInvalidThreadId && base::ohos::IsPcDevice()) {
    outer_->destroy_workers_ids_.push_back(worker->GetRealTid());
  }
#endif
}

void ThreadGroupImpl::WorkerDelegate::OnWorkerBecomesIdleLockRequired(
    BaseScopedCommandsExecutor* executor,
    WorkerThread* worker_base) {
  WorkerThread* worker = static_cast<WorkerThread*>(worker_base);

  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));

  // Add the worker to the idle set.
  outer_->idle_workers_set_.Insert(worker);
  DCHECK_LE(outer_->idle_workers_set_.Size(), outer_->workers_.size());
  outer_->idle_workers_set_cv_for_testing_.Broadcast();
}

void ThreadGroupImpl::WorkerDelegate::RecordUnnecessaryWakeup() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  base::BooleanHistogram::FactoryGet(
      outer_->unnecessary_wakeup_histogram_label_,
      base::Histogram::kUmaTargetedHistogramFlag)
      ->Add(true);

  TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
}

void ThreadGroupImpl::WorkerDelegate::BlockingStarted(
    BlockingType blocking_type) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().worker_thread_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  worker_only().worker_thread_->MaybeUpdateThreadType();

  // WillBlock is always used when time overrides is active. crbug.com/1038867
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    blocking_type = BlockingType::WILL_BLOCK;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
  DCHECK(read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();

  if (incremented_max_tasks_for_shutdown_) {
    return;
  }

  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT) {
    ++outer_->num_unresolved_best_effort_may_block_;
  }

  if (blocking_type == BlockingType::WILL_BLOCK) {
    incremented_max_tasks_since_blocked_ = true;
    outer_->IncrementMaxTasksLockRequired();
    outer_->EnsureEnoughWorkersLockRequired(&executor);
  } else {
    ++outer_->num_unresolved_may_block_;
  }

  outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}

void ThreadGroupImpl::WorkerDelegate::BlockingTypeUpgraded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  // The blocking type always being WILL_BLOCK in this experiment and with
  // time overrides, it should never be considered "upgraded".
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    return;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope already caused the max tasks to be incremented.
  if (incremented_max_tasks_since_blocked_) {
    return;
  }

  // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope.
  --outer_->num_unresolved_may_block_;

  incremented_max_tasks_since_blocked_ = true;
  outer_->IncrementMaxTasksLockRequired();
  outer_->EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::WorkerDelegate::BlockingEnded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  CheckedAutoLock auto_lock(outer_->lock_);
  DCHECK(!read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = TimeTicks();
  if (!incremented_max_tasks_for_shutdown_) {
    if (incremented_max_tasks_since_blocked_) {
      outer_->DecrementMaxTasksLockRequired();
    } else {
      --outer_->num_unresolved_may_block_;
    }

    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      if (incremented_max_best_effort_tasks_since_blocked_) {
        outer_->DecrementMaxBestEffortTasksLockRequired();
      } else {
        --outer_->num_unresolved_best_effort_may_block_;
      }
    }
  }

  incremented_max_tasks_since_blocked_ = false;
  incremented_max_best_effort_tasks_since_blocked_ = false;
}

// BlockingObserver:
// Notifies the worker of shutdown, possibly marking the running task as
// MAY_BLOCK.
void ThreadGroupImpl::WorkerDelegate::OnShutdownStartedLockRequired(
    BaseScopedCommandsExecutor* executor) {
  if (!read_any().is_running_task()) {
    return;
  }
  // Workers running a CONTINUE_ON_SHUTDOWN tasks are replaced by incrementing
  // max_tasks/max_best_effort_tasks. The effect is reverted in
  // DidProcessTask().
  if (*read_any().current_shutdown_behavior ==
      TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
    incremented_max_tasks_for_shutdown_ = true;
    IncrementMaxTasksLockRequired();
  }
}

// Increments max [best effort] tasks iff this worker has been within a
// ScopedBlockingCall for more than |may_block_threshold|.
void ThreadGroupImpl::WorkerDelegate::MaybeIncrementMaxTasksLockRequired()
    EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
  if (read_any().blocking_start_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
          outer_->after_start().may_block_threshold) {
    return;
  }
  IncrementMaxTasksLockRequired();
}

// Increments max [best effort] tasks.
void ThreadGroupImpl::WorkerDelegate::IncrementMaxTasksLockRequired()
    EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
  if (!incremented_max_tasks_since_blocked_) {
    outer_->IncrementMaxTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_tasks_since_blocked_ = true;
      --outer_->num_unresolved_may_block_;
    }
  }
  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
      !incremented_max_best_effort_tasks_since_blocked_) {
    outer_->IncrementMaxBestEffortTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_best_effort_tasks_since_blocked_ = true;
      --outer_->num_unresolved_best_effort_may_block_;
    }
  }
}

void ThreadGroupImpl::JoinForTesting() {
  // profiler needs to shutdown first to not block worker thread joins.
  if (thread_group_profiler_) {
    thread_group_profiler_->Shutdown();
  }
  decltype(workers_) workers_copy;
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_for_testing_started_ = true;

    // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThread::JoinForTesting() without holding |lock_| since
    // WorkerThreads may need to access |workers_|.
    workers_copy = workers_;
  }
  for (const auto& worker : workers_copy) {
    static_cast<WorkerThread*>(worker.get())->JoinForTesting();
  }

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release |workers_| to clear their TrackedRef against |this|.
  workers_.clear();
}

size_t ThreadGroupImpl::NumberOfIdleWorkersLockRequiredForTesting() const {
  return idle_workers_set_.Size();
}

void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers) {
    return;
  }
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);

  if (!idle_workers_set_.IsEmpty()) {
    return;
  }

  if (workers_.size() >= max_tasks_) {
    return;
  }

  scoped_refptr<WorkerThread> new_worker =
      CreateAndRegisterWorkerLockRequired(executor);
  DCHECK(new_worker);
  idle_workers_set_.Insert(new_worker.get());
}

scoped_refptr<WorkerThread>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  DCHECK(!join_for_testing_started_);
  DCHECK_LT(workers_.size(), max_tasks_);
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  DCHECK(idle_workers_set_.IsEmpty());

  // WorkerThread needs |lock_| as a predecessor for its thread lock because in
  // GetWork(), |lock_| is first acquired and then the thread lock is acquired
  // when GetLastUsedTime() is called on the worker by CanGetWorkLockRequired().
  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      thread_type_hint_,
      std::make_unique<WorkerDelegate>(
          tracked_ref_factory_.GetTrackedRef(),
          /* is_excess=*/workers_.size() >= after_start().initial_max_tasks),
      task_tracker_, worker_sequence_num_++, &lock_);

  workers_.push_back(worker);
#if BUILDFLAG(ARKWEB_PERFORMANCE_SCHEDULING) && !BUILDFLAG(ARKWEB_TEST)
  if (worker && (base::ohos::IsPcDevice() || cmd_enable_report_thread_pool_)) {
    create_workers_.push_back(worker);
  }
#endif
  executor->ScheduleStart(worker);
  DCHECK_LE(workers_.size(), max_tasks_);

  return worker;
}

size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
  DCHECK_GE(workers_.size(), idle_workers_set_.Size());
  size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
  DCHECK_GE(num_awake_workers, num_running_tasks_);
  return num_awake_workers;
}

void ThreadGroupImpl::DidUpdateCanRunPolicy() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::OnShutdownStarted() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0) {
    return;
  }
  if (join_for_testing_started_) [[unlikely]] {
    return;
  }

  if (thread_group_profiler_) {
    thread_group_profiler_->Shutdown();
  }

  // Start a MAY_BLOCK scope on each worker that is already running a task.
  for (scoped_refptr<WorkerThread>& worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // `WorkerDelegate`s.
    WorkerDelegate* delegate = static_cast<WorkerDelegate*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->OnShutdownStartedLockRequired(&executor);
  }
  EnsureEnoughWorkersLockRequired(&executor);

  shutdown_started_ = true;
}

void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0) {
    return;
  }
#if DCHECK_IS_ON()
  // CHECK() that Start() is complete, if workers are to be created.
  after_start();
#endif
  if (join_for_testing_started_) [[unlikely]] {
    return;
  }

  ScopedCommandsExecutor* executor =
      static_cast<ScopedCommandsExecutor*>(base_executor);

  const size_t desired_num_awake_workers =
      GetDesiredNumAwakeWorkersLockRequired();
  const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();

  size_t num_workers_to_wake_up =
      ClampSub(desired_num_awake_workers, num_awake_workers);
  num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));

  // Wake up the appropriate number of workers.
  for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
    WorkerThread* worker_to_wakeup = idle_workers_set_.Take();
    DCHECK(worker_to_wakeup);
    executor->ScheduleWakeUp(worker_to_wakeup);
  }

  // In the case where the loop above didn't wake up any worker and we don't
  // have excess workers, the idle worker should be maintained. This happens
  // when called from the last worker awake, or a recent increase in |max_tasks|
  // now makes it possible to keep an idle worker.
  if (desired_num_awake_workers == num_awake_workers) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
  }

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

bool ThreadGroupImpl::IsOnIdleSetLockRequired(WorkerThread* worker) const {
  // To avoid searching through the idle set : use GetLastUsedTime() not being
  // null (or being directly on top of the idle set) as a proxy for being on
  // the idle set.
  return idle_workers_set_.Peek() == worker ||
         !worker->GetLastUsedTime().is_null();
}

void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
  // |adjust_max_tasks_posted_| can't change before the task posted below runs.
  DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));

  after_start().service_thread_task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
      after_start().blocked_workers_poll_period);
}

void ThreadGroupImpl::AdjustMaxTasks() {
  DCHECK(
      after_start().service_thread_task_runner->RunsTasksInCurrentSequence());

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(adjust_max_tasks_posted_);
  adjust_max_tasks_posted_ = false;

  // Increment max tasks for each worker that has been within a MAY_BLOCK
  // ScopedBlockingCall for more than may_block_threshold.
  for (scoped_refptr<WorkerThread> worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // `WorkerDelegate`s.
    WorkerDelegate* delegate = static_cast<WorkerDelegate*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->MaybeIncrementMaxTasksLockRequired();
  }

  // Wake up workers according to the updated |max_tasks_|. This will also
  // reschedule AdjustMaxTasks() if necessary.
  EnsureEnoughWorkersLockRequired(&executor);
}

}  // namespace base::internal