cf03c126创建于 2025年4月16日历史提交
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "base/task/thread_pool/thread_group_impl.h"

#include <stddef.h>

#include <algorithm>
#include <type_traits>
#include <utility>

#include "base/atomicops.h"
#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/containers/stack_container.h"
#include "base/feature_list.h"
#include "base/functional/bind.h"
#include "base/functional/callback_helpers.h"
#include "base/location.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/metrics/histogram_macros.h"
#include "base/numerics/clamped_math.h"
#include "base/ranges/algorithm.h"
#include "base/sequence_token.h"
#include "base/strings/string_piece.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/threading/platform_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/scoped_blocking_call_internal.h"
#include "base/threading/thread_checker.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/time_override.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"

#if BUILDFLAG(IS_WIN)
#include "base/win/scoped_com_initializer.h"
#include "base/win/scoped_windows_thread_environment.h"
#include "base/win/scoped_winrt_initializer.h"
#include "base/win/windows_version.h"
#endif  // BUILDFLAG(IS_WIN)

namespace base {
namespace internal {

namespace {

constexpr size_t kMaxNumberOfWorkers = 256;

// In a background thread group:
// - Blocking calls take more time than in a foreground thread group.
// - We want to minimize impact on foreground work, not maximize execution
//   throughput.
// For these reasons, the timeout to increase the maximum number of concurrent
// tasks when there is a MAY_BLOCK ScopedBlockingCall is *long*. It is not
// infinite because execution throughput should not be reduced forever if a task
// blocks forever.
//
// TODO(fdoray): On platforms without background thread groups, blocking in a
// BEST_EFFORT task should:
// 1. Increment the maximum number of concurrent tasks after a *short* timeout,
//    to allow scheduling of USER_VISIBLE/USER_BLOCKING tasks.
// 2. Increment the maximum number of concurrent BEST_EFFORT tasks after a
//    *long* timeout, because we only want to allow more BEST_EFFORT tasks to be
//    be scheduled concurrently when we believe that a BEST_EFFORT task is
//    blocked forever.
// Currently, only 1. is true as the configuration is per thread group.
// TODO(https://crbug.com/927755): Fix racy condition when MayBlockThreshold ==
// BlockedWorkersPoll.
constexpr TimeDelta kForegroundMayBlockThreshold = Milliseconds(1000);
constexpr TimeDelta kForegroundBlockedWorkersPoll = Milliseconds(1200);
constexpr TimeDelta kBackgroundMayBlockThreshold = Seconds(10);
constexpr TimeDelta kBackgroundBlockedWorkersPoll = Seconds(12);

// Only used in DCHECKs.
bool ContainsWorker(const std::vector<scoped_refptr<WorkerThread>>& workers,
                    const WorkerThread* worker) {
  auto it =
      ranges::find_if(workers, [worker](const scoped_refptr<WorkerThread>& i) {
        return i.get() == worker;
      });
  return it != workers.end();
}

}  // namespace

// Upon destruction, executes actions that control the number of active workers.
// Useful to satisfy locking requirements of these actions.
class ThreadGroupImpl::ScopedCommandsExecutor
    : public ThreadGroup::BaseScopedCommandsExecutor {
 public:
  explicit ScopedCommandsExecutor(ThreadGroupImpl* outer) : outer_(outer) {}

  ScopedCommandsExecutor(const ScopedCommandsExecutor&) = delete;
  ScopedCommandsExecutor& operator=(const ScopedCommandsExecutor&) = delete;
  ~ScopedCommandsExecutor() { FlushImpl(); }

  void ScheduleWakeUp(scoped_refptr<WorkerThread> worker) {
    workers_to_wake_up_.AddWorker(std::move(worker));
  }

  void ScheduleStart(scoped_refptr<WorkerThread> worker) {
    workers_to_start_.AddWorker(std::move(worker));
  }

  void FlushWorkerCreation(CheckedLock* held_lock) {
    if (workers_to_wake_up_.empty() && workers_to_start_.empty())
      return;
    CheckedAutoUnlock auto_unlock(*held_lock);
    FlushImpl();
    workers_to_wake_up_.clear();
    workers_to_start_.clear();
    must_schedule_adjust_max_tasks_ = false;
  }

  void ScheduleAdjustMaxTasks() {
    DCHECK(!must_schedule_adjust_max_tasks_);
    must_schedule_adjust_max_tasks_ = true;
  }

 private:
  class WorkerContainer {
   public:
    WorkerContainer() = default;
    WorkerContainer(const WorkerContainer&) = delete;
    WorkerContainer& operator=(const WorkerContainer&) = delete;

    void AddWorker(scoped_refptr<WorkerThread> worker) {
      if (!worker)
        return;
      if (!first_worker_)
        first_worker_ = std::move(worker);
      else
        additional_workers_.push_back(std::move(worker));
    }

    template <typename Action>
    void ForEachWorker(Action action) {
      if (first_worker_) {
        action(first_worker_.get());
        for (scoped_refptr<WorkerThread> worker : additional_workers_)
          action(worker.get());
      } else {
        DCHECK(additional_workers_.empty());
      }
    }

    bool empty() const { return first_worker_ == nullptr; }

    void clear() {
      first_worker_.reset();
      additional_workers_.clear();
    }

   private:
    // The purpose of |first_worker| is to avoid a heap allocation by the vector
    // in the case where there is only one worker in the container.
    scoped_refptr<WorkerThread> first_worker_;
    std::vector<scoped_refptr<WorkerThread>> additional_workers_;
  };

  void FlushImpl() {
    CheckedLock::AssertNoLockHeldOnCurrentThread();

    // Wake up workers.
    workers_to_wake_up_.ForEachWorker(
        [](WorkerThread* worker) { worker->WakeUp(); });

    // Start workers. Happens after wake ups to prevent the case where a worker
    // enters its main function, is descheduled because it wasn't woken up yet,
    // and is woken up immediately after.
    workers_to_start_.ForEachWorker([&](WorkerThread* worker) {
      worker->Start(outer_->after_start().service_thread_task_runner,
                    outer_->after_start().worker_thread_observer);
      if (outer_->worker_started_for_testing_)
        outer_->worker_started_for_testing_->Wait();
    });

    if (must_schedule_adjust_max_tasks_)
      outer_->ScheduleAdjustMaxTasks();
  }

  const raw_ptr<ThreadGroupImpl> outer_;

  WorkerContainer workers_to_wake_up_;
  WorkerContainer workers_to_start_;
  bool must_schedule_adjust_max_tasks_ = false;
};

class ThreadGroupImpl::WorkerThreadDelegateImpl : public WorkerThread::Delegate,
                                                  public BlockingObserver {
 public:
  // |outer| owns the worker for which this delegate is constructed. If
  // |is_excess| is true, this worker will be eligible for reclaim.
  explicit WorkerThreadDelegateImpl(TrackedRef<ThreadGroupImpl> outer,
                                    bool is_excess);
  WorkerThreadDelegateImpl(const WorkerThreadDelegateImpl&) = delete;
  WorkerThreadDelegateImpl& operator=(const WorkerThreadDelegateImpl&) = delete;

  // OnMainExit() handles the thread-affine cleanup; WorkerThreadDelegateImpl
  // can thereafter safely be deleted from any thread.
  ~WorkerThreadDelegateImpl() override = default;

  // WorkerThread::Delegate:
  WorkerThread::ThreadLabel GetThreadLabel() const override;
  void OnMainEntry(WorkerThread* worker) override;
  RegisteredTaskSource GetWork(WorkerThread* worker) override;
  void DidProcessTask(RegisteredTaskSource task_source) override;
  TimeDelta GetSleepTimeout() override;
  void OnMainExit(WorkerThread* worker) override;
  void RecordUnnecessaryWakeup() override;

  // BlockingObserver:
  void BlockingStarted(BlockingType blocking_type) override;
  void BlockingTypeUpgraded() override;
  void BlockingEnded() override;

  // Notifies the worker of shutdown, possibly marking the running task as
  // MAY_BLOCK.
  void OnShutdownStartedLockRequired(ScopedCommandsExecutor* executor)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Returns true iff the worker can get work. Cleans up the worker or puts it
  // on the idle set if it can't get work.
  bool CanGetWorkLockRequired(ScopedCommandsExecutor* executor,
                              WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks iff this worker has been within a
  // ScopedBlockingCall for more than |may_block_threshold|.
  void MaybeIncrementMaxTasksLockRequired()
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Increments max [best effort] tasks.
  void IncrementMaxTasksLockRequired() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  TaskPriority current_task_priority_lock_required() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return *read_any().current_task_priority;
  }

  // True if this worker is be eligible for reclaim.
  bool is_excess() const { return is_excess_; }

  // Exposed for AnnotateAcquiredLockAlias
  const CheckedLock& lock() const LOCK_RETURNED(outer_->lock_) {
    return outer_->lock_;
  }

 private:
  // Returns true if |worker| is allowed to cleanup and remove itself from the
  // thread group. Called from GetWork() when no work is available.
  bool CanCleanupLockRequired(const WorkerThread* worker) const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Calls cleanup on |worker| and removes it from the thread group. Called from
  // GetWork() when no work is available and CanCleanupLockRequired() returns
  // true.
  void CleanupLockRequired(ScopedCommandsExecutor* executor,
                           WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Called in GetWork() when a worker becomes idle.
  void OnWorkerBecomesIdleLockRequired(ScopedCommandsExecutor* executor,
                                       WorkerThread* worker)
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_);

  // Accessed only from the worker thread.
  struct WorkerOnly {
    // Associated WorkerThread, if any, initialized in OnMainEntry().
    raw_ptr<WorkerThread> worker_thread_;

#if BUILDFLAG(IS_WIN)
    std::unique_ptr<win::ScopedWindowsThreadEnvironment> win_thread_environment;
#endif  // BUILDFLAG(IS_WIN)
  } worker_only_;

  // Writes from the worker thread protected by |outer_->lock_|. Reads from any
  // thread, protected by |outer_->lock_| when not on the worker thread.
  struct WriteWorkerReadAny {
    // The priority of the task the worker is currently running if any.
    absl::optional<TaskPriority> current_task_priority;
    // The shutdown behavior of the task the worker is currently running if any.
    absl::optional<TaskShutdownBehavior> current_shutdown_behavior;

    // Time when MayBlockScopeEntered() was last called. Reset when
    // BlockingScopeExited() is called.
    TimeTicks blocking_start_time;

    // Whether the worker is currently running a task (i.e. GetWork() has
    // returned a non-empty task source and DidProcessTask() hasn't been called
    // yet).
    bool is_running_task() const { return !!current_shutdown_behavior; }
  } write_worker_read_any_;

  WorkerOnly& worker_only() {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return worker_only_;
  }

  WriteWorkerReadAny& write_worker() EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_any() const
      EXCLUSIVE_LOCKS_REQUIRED(outer_->lock_) {
    return write_worker_read_any_;
  }

  const WriteWorkerReadAny& read_worker() const {
    DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
    return write_worker_read_any_;
  }

  const TrackedRef<ThreadGroupImpl> outer_;

  const bool is_excess_;

  // Whether |outer_->max_tasks_|/|outer_->max_best_effort_tasks_| were
  // incremented due to a ScopedBlockingCall on the thread.
  bool incremented_max_tasks_since_blocked_ GUARDED_BY(outer_->lock_) = false;
  bool incremented_max_best_effort_tasks_since_blocked_
      GUARDED_BY(outer_->lock_) = false;
  // Whether |outer_->max_tasks_| and |outer_->max_best_effort_tasks_| was
  // incremented due to running CONTINUE_ON_SHUTDOWN on the thread during
  // shutdown.
  bool incremented_max_tasks_for_shutdown_ GUARDED_BY(outer_->lock_) = false;

  // Verifies that specific calls are always made from the worker thread.
  THREAD_CHECKER(worker_thread_checker_);
};

ThreadGroupImpl::ThreadGroupImpl(StringPiece histogram_label,
                                 StringPiece thread_group_label,
                                 ThreadType thread_type_hint,
                                 TrackedRef<TaskTracker> task_tracker,
                                 TrackedRef<Delegate> delegate,
                                 ThreadGroup* predecessor_thread_group)
    : ThreadGroup(std::move(task_tracker),
                  std::move(delegate),
                  predecessor_thread_group),
      histogram_label_(histogram_label),
      thread_group_label_(thread_group_label),
      thread_type_hint_(thread_type_hint),
      idle_workers_set_cv_for_testing_(lock_.CreateConditionVariable()),
      tracked_ref_factory_(this) {
  DCHECK(!thread_group_label_.empty());
}

void ThreadGroupImpl::Start(
    size_t max_tasks,
    size_t max_best_effort_tasks,
    TimeDelta suggested_reclaim_time,
    scoped_refptr<SingleThreadTaskRunner> service_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer,
    WorkerEnvironment worker_environment,
    bool synchronous_thread_start_for_testing,
    absl::optional<TimeDelta> may_block_threshold) {
  ThreadGroup::Start();

  DCHECK(!replacement_thread_group_);

  in_start().no_worker_reclaim = FeatureList::IsEnabled(kNoWorkerThreadReclaim);
  in_start().may_block_threshold =
      may_block_threshold ? may_block_threshold.value()
                          : (thread_type_hint_ != ThreadType::kBackground
                                 ? kForegroundMayBlockThreshold
                                 : kBackgroundMayBlockThreshold);
  in_start().blocked_workers_poll_period =
      thread_type_hint_ != ThreadType::kBackground
          ? kForegroundBlockedWorkersPoll
          : kBackgroundBlockedWorkersPoll;

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  DCHECK(workers_.empty());
  max_tasks_ = max_tasks;
  DCHECK_GE(max_tasks_, 1U);
  in_start().initial_max_tasks = max_tasks_;
  DCHECK_LE(in_start().initial_max_tasks, kMaxNumberOfWorkers);
  max_best_effort_tasks_ = max_best_effort_tasks;
  in_start().suggested_reclaim_time = suggested_reclaim_time;
  in_start().worker_environment = worker_environment;
  in_start().service_thread_task_runner = std::move(service_thread_task_runner);
  in_start().worker_thread_observer = worker_thread_observer;

#if DCHECK_IS_ON()
  in_start().initialized = true;
#endif

  if (synchronous_thread_start_for_testing) {
    worker_started_for_testing_.emplace(WaitableEvent::ResetPolicy::AUTOMATIC);
    // Don't emit a ScopedBlockingCallWithBaseSyncPrimitives from this
    // WaitableEvent or it defeats the purpose of having threads start without
    // externally visible side-effects.
    worker_started_for_testing_->declare_only_used_while_idle();
  }

  EnsureEnoughWorkersLockRequired(&executor);
}

ThreadGroupImpl::~ThreadGroupImpl() {
  // ThreadGroup should only ever be deleted:
  //  1) In tests, after JoinForTesting().
  //  2) In production, iff initialization failed.
  // In both cases |workers_| should be empty.
  DCHECK(workers_.empty());
}

void ThreadGroupImpl::UpdateSortKey(TaskSource::Transaction transaction) {
  ScopedCommandsExecutor executor(this);
  UpdateSortKeyImpl(&executor, std::move(transaction));
}

void ThreadGroupImpl::PushTaskSourceAndWakeUpWorkers(
    TransactionWithRegisteredTaskSource transaction_with_task_source) {
  ScopedCommandsExecutor executor(this);
  PushTaskSourceAndWakeUpWorkersImpl(&executor,
                                     std::move(transaction_with_task_source));
}

size_t ThreadGroupImpl::GetMaxConcurrentNonBlockedTasksDeprecated() const {
#if DCHECK_IS_ON()
  CheckedAutoLock auto_lock(lock_);
  DCHECK_NE(after_start().initial_max_tasks, 0U)
      << "GetMaxConcurrentTasksDeprecated() should only be called after the "
      << "thread group has started.";
#endif
  return after_start().initial_max_tasks;
}

void ThreadGroupImpl::WaitForWorkersIdleForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

#if DCHECK_IS_ON()
  DCHECK(!some_workers_cleaned_up_for_testing_)
      << "Workers detached prior to waiting for a specific number of idle "
         "workers. Doing the wait under such conditions is flaky. Consider "
         "setting the suggested reclaim time to TimeDelta::Max() in Start().";
#endif

  WaitForWorkersIdleLockRequiredForTesting(n);
}

void ThreadGroupImpl::WaitForAllWorkersIdleForTesting() {
  CheckedAutoLock auto_lock(lock_);
  WaitForWorkersIdleLockRequiredForTesting(workers_.size());
}

void ThreadGroupImpl::WaitForWorkersCleanedUpForTesting(size_t n) {
  CheckedAutoLock auto_lock(lock_);

  if (!num_workers_cleaned_up_for_testing_cv_)
    num_workers_cleaned_up_for_testing_cv_ = lock_.CreateConditionVariable();

  while (num_workers_cleaned_up_for_testing_ < n)
    num_workers_cleaned_up_for_testing_cv_->Wait();

  num_workers_cleaned_up_for_testing_ = 0;
}

void ThreadGroupImpl::JoinForTesting() {
  decltype(workers_) workers_copy;
  {
    CheckedAutoLock auto_lock(lock_);
    priority_queue_.EnableFlushTaskSourcesOnDestroyForTesting();

    DCHECK_GT(workers_.size(), size_t(0))
        << "Joined an unstarted thread group.";

    join_for_testing_started_ = true;

    // Ensure WorkerThreads in |workers_| do not attempt to cleanup while
    // being joined.
    worker_cleanup_disallowed_for_testing_ = true;

    // Make a copy of the WorkerThreads so that we can call
    // WorkerThread::JoinForTesting() without holding |lock_| since
    // WorkerThreads may need to access |workers_|.
    workers_copy = workers_;
  }
  for (const auto& worker : workers_copy)
    worker->JoinForTesting();

  CheckedAutoLock auto_lock(lock_);
  DCHECK(workers_ == workers_copy);
  // Release |workers_| to clear their TrackedRef against |this|.
  workers_.clear();
}

size_t ThreadGroupImpl::NumberOfWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return workers_.size();
}

size_t ThreadGroupImpl::GetMaxTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_tasks_;
}

size_t ThreadGroupImpl::GetMaxBestEffortTasksForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return max_best_effort_tasks_;
}

size_t ThreadGroupImpl::NumberOfIdleWorkersForTesting() const {
  CheckedAutoLock auto_lock(lock_);
  return idle_workers_set_.Size();
}

ThreadGroupImpl::WorkerThreadDelegateImpl::WorkerThreadDelegateImpl(
    TrackedRef<ThreadGroupImpl> outer,
    bool is_excess)
    : outer_(std::move(outer)), is_excess_(is_excess) {
  // Bound in OnMainEntry().
  DETACH_FROM_THREAD(worker_thread_checker_);
}

WorkerThread::ThreadLabel
ThreadGroupImpl::WorkerThreadDelegateImpl::GetThreadLabel() const {
  return WorkerThread::ThreadLabel::POOLED;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainEntry(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  {
#if DCHECK_IS_ON()
    CheckedAutoLock auto_lock(outer_->lock_);
    DCHECK(ContainsWorker(outer_->workers_, worker));
#endif
  }

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment = GetScopedWindowsThreadEnvironment(
      outer_->after_start().worker_environment);
#endif  // BUILDFLAG(IS_WIN)

  PlatformThread::SetName(
      StringPrintf("ThreadPool%sWorker", outer_->thread_group_label_.c_str()));

  outer_->BindToCurrentThread();
  worker_only().worker_thread_ = worker;
  SetBlockingObserverForCurrentThread(this);

  if (outer_->worker_started_for_testing_) {
    // When |worker_started_for_testing_| is set, the thread that starts workers
    // should wait for a worker to have started before starting the next one,
    // and there should only be one thread that wakes up workers at a time.
    DCHECK(!outer_->worker_started_for_testing_->IsSignaled());
    outer_->worker_started_for_testing_->Signal();
  }
}

RegisteredTaskSource ThreadGroupImpl::WorkerThreadDelegateImpl::GetWork(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!read_worker().current_task_priority);
  DCHECK(!read_worker().current_shutdown_behavior);

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(ContainsWorker(outer_->workers_, worker));

  // Use this opportunity, before assigning work to this worker, to create/wake
  // additional workers if needed (doing this here allows us to reduce
  // potentially expensive create/wake directly on PostTask()).
  //
  // Note: FlushWorkerCreation() below releases |outer_->lock_|. It is thus
  // important that all other operations come after it to keep this method
  // transactional.
  outer_->EnsureEnoughWorkersLockRequired(&executor);
  executor.FlushWorkerCreation(&outer_->lock_);

  if (!CanGetWorkLockRequired(&executor, worker))
    return nullptr;

  RegisteredTaskSource task_source;
  TaskPriority priority;
  while (!task_source && !outer_->priority_queue_.IsEmpty()) {
    // Enforce the CanRunPolicy and that no more than |max_best_effort_tasks_|
    // BEST_EFFORT tasks run concurrently.
    priority = outer_->priority_queue_.PeekSortKey().priority();
    if (!outer_->task_tracker_->CanRunPriority(priority) ||
        (priority == TaskPriority::BEST_EFFORT &&
         outer_->num_running_best_effort_tasks_ >=
             outer_->max_best_effort_tasks_)) {
      break;
    }

    task_source = outer_->TakeRegisteredTaskSource(&executor);
  }
  if (!task_source) {
    OnWorkerBecomesIdleLockRequired(&executor, worker);
    return nullptr;
  }

  // Running task bookkeeping.
  outer_->IncrementTasksRunningLockRequired(priority);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));
  write_worker().current_task_priority = priority;
  write_worker().current_shutdown_behavior = task_source->shutdown_behavior();

  return task_source;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::DidProcessTask(
    RegisteredTaskSource task_source) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(read_worker().current_task_priority);
  DCHECK(read_worker().current_shutdown_behavior);

  // A transaction to the TaskSource to reenqueue, if any. Instantiated here as
  // |TaskSource::lock_| is a UniversalPredecessor and must always be acquired
  // prior to acquiring a second lock
  absl::optional<TransactionWithRegisteredTaskSource>
      transaction_with_task_source;
  if (task_source) {
    transaction_with_task_source.emplace(
        TransactionWithRegisteredTaskSource::FromTaskSource(
            std::move(task_source)));
  }

  ScopedCommandsExecutor workers_executor(outer_.get());
  ScopedReenqueueExecutor reenqueue_executor;
  CheckedAutoLock auto_lock(outer_->lock_);

  // During shutdown, max_tasks may have been incremented in StartShutdown().
  if (incremented_max_tasks_for_shutdown_) {
    DCHECK(outer_->shutdown_started_);
    outer_->DecrementMaxTasksLockRequired();
    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      outer_->DecrementMaxBestEffortTasksLockRequired();
    }
    incremented_max_tasks_since_blocked_ = false;
    incremented_max_best_effort_tasks_since_blocked_ = false;
    incremented_max_tasks_for_shutdown_ = false;
  }

  DCHECK(read_worker().blocking_start_time.is_null());
  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);

  // Running task bookkeeping.
  outer_->DecrementTasksRunningLockRequired(
      *read_worker().current_task_priority);
  write_worker().current_shutdown_behavior = absl::nullopt;
  write_worker().current_task_priority = absl::nullopt;

  if (transaction_with_task_source) {
    outer_->ReEnqueueTaskSourceLockRequired(
        &workers_executor, &reenqueue_executor,
        std::move(transaction_with_task_source.value()));
  }
}

TimeDelta ThreadGroupImpl::WorkerThreadDelegateImpl::GetSleepTimeout() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess())
    return TimeDelta::Max();
  // Sleep for an extra 10% to avoid the following pathological case:
  //   0) A task is running on a timer which matches
  //      |after_start().suggested_reclaim_time|.
  //   1) The timer fires and this worker is created by
  //      MaintainAtLeastOneIdleWorkerLockRequired() because the last idle
  //      worker was assigned the task.
  //   2) This worker begins sleeping |after_start().suggested_reclaim_time| (at
  //      the front of the idle set).
  //   3) The task assigned to the other worker completes and the worker goes
  //      back in the idle set (this worker may now second on the idle set;
  //      its GetLastUsedTime() is set to Now()).
  //   4) The sleep in (2) expires. Since (3) was fast this worker is likely to
  //      have been second on the idle set long enough for
  //      CanCleanupLockRequired() to be satisfied in which case this worker is
  //      cleaned up.
  //   5) The timer fires at roughly the same time and we're back to (1) if (4)
  //      resulted in a clean up; causing thread churn.
  //
  //   Sleeping 10% longer in (2) makes it much less likely that (4) occurs
  //   before (5). In that case (5) will cause (3) and refresh this worker's
  //   GetLastUsedTime(), making CanCleanupLockRequired() return false in (4)
  //   and avoiding churn.
  //
  //   Of course the same problem arises if in (0) the timer matches
  //   |after_start().suggested_reclaim_time * 1.1| but it's expected that any
  //   timer slower than |after_start().suggested_reclaim_time| will cause such
  //   churn during long idle periods. If this is a problem in practice, the
  //   standby thread configuration and algorithm should be revisited.
  return outer_->after_start().suggested_reclaim_time * 1.1;
}

bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanCleanupLockRequired(
    const WorkerThread* worker) const {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  if (!is_excess())
    return false;

  const TimeTicks last_used_time = worker->GetLastUsedTime();
  return !last_used_time.is_null() &&
         subtle::TimeTicksNowIgnoringOverride() - last_used_time >=
             outer_->after_start().suggested_reclaim_time &&
         LIKELY(!outer_->worker_cleanup_disallowed_for_testing_);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::CleanupLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK(!outer_->join_for_testing_started_);
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

  worker->Cleanup();

  if (outer_->IsOnIdleSetLockRequired(worker))
    outer_->idle_workers_set_.Remove(worker);

  // Remove the worker from |workers_|.
  auto worker_iter = ranges::find(outer_->workers_, worker);
  DCHECK(worker_iter != outer_->workers_.end());
  outer_->workers_.erase(worker_iter);

#if BUILDFLAG(IS_OHOS)
  outer_->destroy_workers_.push_back(base::WrapRefCounted(worker));
#endif
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnWorkerBecomesIdleLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(!outer_->idle_workers_set_.Contains(worker));

  // Add the worker to the idle set.
  outer_->idle_workers_set_.Insert(worker);
  DCHECK_LE(outer_->idle_workers_set_.Size(), outer_->workers_.size());
  outer_->idle_workers_set_cv_for_testing_->Broadcast();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnMainExit(
    WorkerThread* worker) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);

#if DCHECK_IS_ON()
  {
    bool shutdown_complete = outer_->task_tracker_->IsShutdownComplete();
    CheckedAutoLock auto_lock(outer_->lock_);

    // |worker| should already have been removed from the idle workers set and
    // |workers_| by the time the thread is about to exit. (except in the cases
    // where the thread group is no longer going to be used - in which case,
    // it's fine for there to be invalid workers in the thread group.
    if (!shutdown_complete && !outer_->join_for_testing_started_) {
      DCHECK(!outer_->idle_workers_set_.Contains(worker));
      DCHECK(!ContainsWorker(outer_->workers_, worker));
    }
  }
#endif

#if BUILDFLAG(IS_WIN)
  worker_only().win_thread_environment.reset();
#endif  // BUILDFLAG(IS_WIN)

  // Count cleaned up workers for tests. It's important to do this here instead
  // of at the end of CleanupLockRequired() because some side-effects of
  // cleaning up happen outside the lock (e.g. recording histograms) and
  // resuming from tests must happen-after that point or checks on the main
  // thread will be flaky (crbug.com/1047733).
  CheckedAutoLock auto_lock(outer_->lock_);
  ++outer_->num_workers_cleaned_up_for_testing_;
#if DCHECK_IS_ON()
  outer_->some_workers_cleaned_up_for_testing_ = true;
#endif
  if (outer_->num_workers_cleaned_up_for_testing_cv_)
    outer_->num_workers_cleaned_up_for_testing_cv_->Signal();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::RecordUnnecessaryWakeup() {
  base::BooleanHistogram::FactoryGet(
      std::string("ThreadPool.UnnecessaryWakeup.") + outer_->histogram_label_,
      base::Histogram::kUmaTargetedHistogramFlag)
      ->Add(true);

  TRACE_EVENT_INSTANT("wakeup.flow", "ThreadPool.UnnecessaryWakeup");
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingStarted(
    BlockingType blocking_type) {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  DCHECK(worker_only().worker_thread_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  worker_only().worker_thread_->MaybeUpdateThreadType();

  // WillBlock is always used when time overrides is active. crbug.com/1038867
  if (base::subtle::ScopedTimeClockOverrides::overrides_active()) {
    blocking_type = BlockingType::WILL_BLOCK;
  }

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  DCHECK(!incremented_max_tasks_since_blocked_);
  DCHECK(!incremented_max_best_effort_tasks_since_blocked_);
  DCHECK(read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = subtle::TimeTicksNowIgnoringOverride();

  if (incremented_max_tasks_for_shutdown_)
    return;

  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT)
    ++outer_->num_unresolved_best_effort_may_block_;

  if (blocking_type == BlockingType::WILL_BLOCK) {
    incremented_max_tasks_since_blocked_ = true;
    outer_->IncrementMaxTasksLockRequired();
    outer_->EnsureEnoughWorkersLockRequired(&executor);
  } else {
    ++outer_->num_unresolved_may_block_;
  }

  outer_->MaybeScheduleAdjustMaxTasksLockRequired(&executor);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingTypeUpgraded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  // The blocking type always being WILL_BLOCK in this experiment and with time
  // overrides, it should never be considered "upgraded".
  if (base::subtle::ScopedTimeClockOverrides::overrides_active())
    return;

  ScopedCommandsExecutor executor(outer_.get());
  CheckedAutoLock auto_lock(outer_->lock_);

  // Don't do anything if a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope already caused the max tasks to be incremented.
  if (incremented_max_tasks_since_blocked_)
    return;

  // Cancel the effect of a MAY_BLOCK ScopedBlockingCall instantiated in the
  // same scope.
  --outer_->num_unresolved_may_block_;

  incremented_max_tasks_since_blocked_ = true;
  outer_->IncrementMaxTasksLockRequired();
  outer_->EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::BlockingEnded() {
  DCHECK_CALLED_ON_VALID_THREAD(worker_thread_checker_);
  // Skip if this blocking scope happened outside of a RunTask.
  if (!read_worker().current_task_priority) {
    return;
  }

  CheckedAutoLock auto_lock(outer_->lock_);
  DCHECK(!read_worker().blocking_start_time.is_null());
  write_worker().blocking_start_time = TimeTicks();
  if (!incremented_max_tasks_for_shutdown_) {
    if (incremented_max_tasks_since_blocked_)
      outer_->DecrementMaxTasksLockRequired();
    else
      --outer_->num_unresolved_may_block_;

    if (*read_worker().current_task_priority == TaskPriority::BEST_EFFORT) {
      if (incremented_max_best_effort_tasks_since_blocked_)
        outer_->DecrementMaxBestEffortTasksLockRequired();
      else
        --outer_->num_unresolved_best_effort_may_block_;
    }
  }

  incremented_max_tasks_since_blocked_ = false;
  incremented_max_best_effort_tasks_since_blocked_ = false;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::OnShutdownStartedLockRequired(
    ScopedCommandsExecutor* executor) {
  if (!read_any().is_running_task())
    return;
  // Workers running a CONTINUE_ON_SHUTDOWN tasks are replaced by incrementing
  // max_tasks/max_best_effort_tasks. The effect is reverted in
  // DidProcessTask().
  if (*read_any().current_shutdown_behavior ==
      TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
    incremented_max_tasks_for_shutdown_ = true;
    IncrementMaxTasksLockRequired();
  }
}

bool ThreadGroupImpl::WorkerThreadDelegateImpl::CanGetWorkLockRequired(
    ScopedCommandsExecutor* executor,
    WorkerThread* worker) {
  const bool is_on_idle_workers_set = outer_->IsOnIdleSetLockRequired(worker);
  DCHECK_EQ(is_on_idle_workers_set, outer_->idle_workers_set_.Contains(worker));

  if (is_on_idle_workers_set) {
    if (CanCleanupLockRequired(worker))
      CleanupLockRequired(executor, worker);
    return false;
  }

  // If too many workers are running, this worker should not get work, until
  // tasks are no longer in excess (i.e. max tasks increases). This ensures that
  // if this worker is in excess, it gets a chance to being cleaned up.
  if (outer_->GetNumAwakeWorkersLockRequired() > outer_->max_tasks_) {
    OnWorkerBecomesIdleLockRequired(executor, worker);
    return false;
  }

  return true;
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::
    MaybeIncrementMaxTasksLockRequired() {
  if (read_any().blocking_start_time.is_null() ||
      subtle::TimeTicksNowIgnoringOverride() - read_any().blocking_start_time <
          outer_->after_start().may_block_threshold) {
    return;
  }
  IncrementMaxTasksLockRequired();
}

void ThreadGroupImpl::WorkerThreadDelegateImpl::
    IncrementMaxTasksLockRequired() {
  if (!incremented_max_tasks_since_blocked_) {
    outer_->IncrementMaxTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_tasks_since_blocked_ = true;
      --outer_->num_unresolved_may_block_;
    }
  }
  if (*read_any().current_task_priority == TaskPriority::BEST_EFFORT &&
      !incremented_max_best_effort_tasks_since_blocked_) {
    outer_->IncrementMaxBestEffortTasksLockRequired();
    // Update state for an unresolved ScopedBlockingCall.
    if (!read_any().blocking_start_time.is_null()) {
      incremented_max_best_effort_tasks_since_blocked_ = true;
      --outer_->num_unresolved_best_effort_may_block_;
    }
  }
}

void ThreadGroupImpl::WaitForWorkersIdleLockRequiredForTesting(size_t n) {
  // Make sure workers do not cleanup while watching the idle count.
  AutoReset<bool> ban_cleanups(&worker_cleanup_disallowed_for_testing_, true);

  while (idle_workers_set_.Size() < n)
    idle_workers_set_cv_for_testing_->Wait();
}

void ThreadGroupImpl::MaintainAtLeastOneIdleWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  if (workers_.size() == kMaxNumberOfWorkers)
    return;
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);

  if (!idle_workers_set_.IsEmpty())
    return;

  if (workers_.size() >= max_tasks_)
    return;

  scoped_refptr<WorkerThread> new_worker =
      CreateAndRegisterWorkerLockRequired(executor);
  DCHECK(new_worker);
  idle_workers_set_.Insert(new_worker.get());
}

scoped_refptr<WorkerThread>
ThreadGroupImpl::CreateAndRegisterWorkerLockRequired(
    ScopedCommandsExecutor* executor) {
  DCHECK(!join_for_testing_started_);
  DCHECK_LT(workers_.size(), max_tasks_);
  DCHECK_LT(workers_.size(), kMaxNumberOfWorkers);
  DCHECK(idle_workers_set_.IsEmpty());

  // WorkerThread needs |lock_| as a predecessor for its thread lock
  // because in WakeUpOneWorker, |lock_| is first acquired and then
  // the thread lock is acquired when WakeUp is called on the worker.
  scoped_refptr<WorkerThread> worker = MakeRefCounted<WorkerThread>(
      thread_type_hint_,
      std::make_unique<WorkerThreadDelegateImpl>(
          tracked_ref_factory_.GetTrackedRef(),
          /* is_excess=*/after_start().no_worker_reclaim
              ? workers_.size() >= after_start().initial_max_tasks
              : true),
      task_tracker_, worker_sequence_num_++, &lock_);

  workers_.push_back(worker);
#if BUILDFLAG(IS_OHOS)
  create_workers_.push_back(worker);
#endif
  executor->ScheduleStart(worker);
  DCHECK_LE(workers_.size(), max_tasks_);

  return worker;
}

size_t ThreadGroupImpl::GetNumAwakeWorkersLockRequired() const {
  DCHECK_GE(workers_.size(), idle_workers_set_.Size());
  size_t num_awake_workers = workers_.size() - idle_workers_set_.Size();
  DCHECK_GE(num_awake_workers, num_running_tasks_);
  return num_awake_workers;
}

size_t ThreadGroupImpl::GetDesiredNumAwakeWorkersLockRequired() const {
  // Number of BEST_EFFORT task sources that are running or queued and allowed
  // to run by the CanRunPolicy.
  const size_t num_running_or_queued_can_run_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();

  const size_t workers_for_best_effort_task_sources =
      std::max(std::min(num_running_or_queued_can_run_best_effort_task_sources,
                        max_best_effort_tasks_),
               num_running_best_effort_tasks_);

  // Number of USER_{VISIBLE|BLOCKING} task sources that are running or queued.
  const size_t num_running_or_queued_foreground_task_sources =
      (num_running_tasks_ - num_running_best_effort_tasks_) +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();

  const size_t workers_for_foreground_task_sources =
      num_running_or_queued_foreground_task_sources;

  return std::min({workers_for_best_effort_task_sources +
                       workers_for_foreground_task_sources,
                   max_tasks_, kMaxNumberOfWorkers});
}

void ThreadGroupImpl::DidUpdateCanRunPolicy() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::OnShutdownStarted() {
  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);

  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
    return;

  // Start a MAY_BLOCK scope on each worker that is already running a task.
  for (scoped_refptr<WorkerThread>& worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // WorkerThreadDelegateImpls.
    WorkerThreadDelegateImpl* delegate =
        static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->OnShutdownStartedLockRequired(&executor);
  }
  EnsureEnoughWorkersLockRequired(&executor);

  shutdown_started_ = true;
}

void ThreadGroupImpl::EnsureEnoughWorkersLockRequired(
    BaseScopedCommandsExecutor* base_executor) {
  // Don't do anything if the thread group isn't started.
  if (max_tasks_ == 0 || UNLIKELY(join_for_testing_started_))
    return;

  ScopedCommandsExecutor* executor =
      static_cast<ScopedCommandsExecutor*>(base_executor);

  const size_t desired_num_awake_workers =
      GetDesiredNumAwakeWorkersLockRequired();
  const size_t num_awake_workers = GetNumAwakeWorkersLockRequired();

  size_t num_workers_to_wake_up =
      ClampSub(desired_num_awake_workers, num_awake_workers);
  num_workers_to_wake_up = std::min(num_workers_to_wake_up, size_t(2U));

  // Wake up the appropriate number of workers.
  for (size_t i = 0; i < num_workers_to_wake_up; ++i) {
    MaintainAtLeastOneIdleWorkerLockRequired(executor);
    WorkerThread* worker_to_wakeup = idle_workers_set_.Take();
    DCHECK(worker_to_wakeup);
    executor->ScheduleWakeUp(worker_to_wakeup);
  }

  // In the case where the loop above didn't wake up any worker and we don't
  // have excess workers, the idle worker should be maintained. This happens
  // when called from the last worker awake, or a recent increase in |max_tasks|
  // now makes it possible to keep an idle worker.
  if (desired_num_awake_workers == num_awake_workers)
    MaintainAtLeastOneIdleWorkerLockRequired(executor);

  // This function is called every time a task source is (re-)enqueued,
  // hence the minimum priority needs to be updated.
  UpdateMinAllowedPriorityLockRequired();

  // Ensure that the number of workers is periodically adjusted if needed.
  MaybeScheduleAdjustMaxTasksLockRequired(executor);
}

void ThreadGroupImpl::AdjustMaxTasks() {
  DCHECK(
      after_start().service_thread_task_runner->RunsTasksInCurrentSequence());

  ScopedCommandsExecutor executor(this);
  CheckedAutoLock auto_lock(lock_);
  DCHECK(adjust_max_tasks_posted_);
  adjust_max_tasks_posted_ = false;

  // Increment max tasks for each worker that has been within a MAY_BLOCK
  // ScopedBlockingCall for more than may_block_threshold.
  for (scoped_refptr<WorkerThread> worker : workers_) {
    // The delegates of workers inside a ThreadGroupImpl should be
    // WorkerThreadDelegateImpls.
    WorkerThreadDelegateImpl* delegate =
        static_cast<WorkerThreadDelegateImpl*>(worker->delegate());
    AnnotateAcquiredLockAlias annotate(lock_, delegate->lock());
    delegate->MaybeIncrementMaxTasksLockRequired();
  }

  // Wake up workers according to the updated |max_tasks_|. This will also
  // reschedule AdjustMaxTasks() if necessary.
  EnsureEnoughWorkersLockRequired(&executor);
}

void ThreadGroupImpl::ScheduleAdjustMaxTasks() {
  // |adjust_max_tasks_posted_| can't change before the task posted below runs.
  // Skip check on NaCl to avoid unsafe reference acquisition warning.
#if !BUILDFLAG(IS_NACL)
  DCHECK(TS_UNCHECKED_READ(adjust_max_tasks_posted_));
#endif

  after_start().service_thread_task_runner->PostDelayedTask(
      FROM_HERE, BindOnce(&ThreadGroupImpl::AdjustMaxTasks, Unretained(this)),
      after_start().blocked_workers_poll_period);
}

void ThreadGroupImpl::MaybeScheduleAdjustMaxTasksLockRequired(
    ScopedCommandsExecutor* executor) {
  if (!adjust_max_tasks_posted_ &&
      ShouldPeriodicallyAdjustMaxTasksLockRequired()) {
    executor->ScheduleAdjustMaxTasks();
    adjust_max_tasks_posted_ = true;
  }
}

bool ThreadGroupImpl::ShouldPeriodicallyAdjustMaxTasksLockRequired() {
  // AdjustMaxTasks() should be scheduled to periodically adjust |max_tasks_|
  // and |max_best_effort_tasks_| when (1) the concurrency limits are not large
  // enough to accommodate all queued and running task sources and an idle
  // worker and (2) there are unresolved MAY_BLOCK ScopedBlockingCalls.
  // - When (1) is false: No worker would be created or woken up if the
  //   concurrency limits were increased, so there is no hurry to increase them.
  // - When (2) is false: The concurrency limits could not be increased by
  //   AdjustMaxTasks().

  const size_t num_running_or_queued_best_effort_task_sources =
      num_running_best_effort_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired();
  if (num_running_or_queued_best_effort_task_sources > max_best_effort_tasks_ &&
      num_unresolved_best_effort_may_block_ > 0) {
    return true;
  }

  const size_t num_running_or_queued_task_sources =
      num_running_tasks_ +
      GetNumAdditionalWorkersForBestEffortTaskSourcesLockRequired() +
      GetNumAdditionalWorkersForForegroundTaskSourcesLockRequired();
  constexpr size_t kIdleWorker = 1;
  return num_running_or_queued_task_sources + kIdleWorker > max_tasks_ &&
         num_unresolved_may_block_ > 0;
}

void ThreadGroupImpl::UpdateMinAllowedPriorityLockRequired() {
  if (priority_queue_.IsEmpty() || num_running_tasks_ < max_tasks_) {
    max_allowed_sort_key_.store(kMaxYieldSortKey, std::memory_order_relaxed);
  } else {
    max_allowed_sort_key_.store({priority_queue_.PeekSortKey().priority(),
                                 priority_queue_.PeekSortKey().worker_count()},
                                std::memory_order_relaxed);
  }
}

bool ThreadGroupImpl::IsOnIdleSetLockRequired(WorkerThread* worker) const {
  // To avoid searching through the idle set : use GetLastUsedTime() not being
  // null (or being directly on top of the idle set) as a proxy for being on
  // the idle set.
  return idle_workers_set_.Peek() == worker ||
         !worker->GetLastUsedTime().is_null();
}

void ThreadGroupImpl::DecrementTasksRunningLockRequired(TaskPriority priority) {
  DCHECK_GT(num_running_tasks_, 0U);
  --num_running_tasks_;
  if (priority == TaskPriority::BEST_EFFORT) {
    DCHECK_GT(num_running_best_effort_tasks_, 0U);
    --num_running_best_effort_tasks_;
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementTasksRunningLockRequired(TaskPriority priority) {
  ++num_running_tasks_;
  DCHECK_LE(num_running_tasks_, max_tasks_);
  DCHECK_LE(num_running_tasks_, kMaxNumberOfWorkers);
  if (priority == TaskPriority::BEST_EFFORT) {
    ++num_running_best_effort_tasks_;
    DCHECK_LE(num_running_best_effort_tasks_, num_running_tasks_);
    DCHECK_LE(num_running_best_effort_tasks_, max_best_effort_tasks_);
  }
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::DecrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_tasks_, 0U);
  --max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementMaxTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::DecrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  DCHECK_GT(max_best_effort_tasks_, 0U);
  --max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

void ThreadGroupImpl::IncrementMaxBestEffortTasksLockRequired() {
  DCHECK_GT(num_running_tasks_, 0U);
  ++max_best_effort_tasks_;
  UpdateMinAllowedPriorityLockRequired();
}

ThreadGroupImpl::InitializedInStart::InitializedInStart() = default;
ThreadGroupImpl::InitializedInStart::~InitializedInStart() = default;

}  // namespace internal
}  // namespace base