#ifndef CC_RASTER_CATEGORIZED_WORKER_POOL_H_
#define CC_RASTER_CATEGORIZED_WORKER_POOL_H_
#include <memory>
#include <vector>
#include "base/containers/span.h"
#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/synchronization/condition_variable.h"
#include "base/task/post_job.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/task_runner.h"
#include "base/thread_annotations.h"
#include "base/threading/platform_thread.h"
#include "base/threading/simple_thread.h"
#include "cc/cc_export.h"
#include "cc/raster/task_category.h"
#include "cc/raster/task_graph_runner.h"
#include "cc/raster/task_graph_work_queue.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
namespace cc {
class CC_EXPORT CategorizedWorkerPool : public base::TaskRunner,
public TaskGraphRunner {
public:
class CC_EXPORT Delegate {
public:
virtual ~Delegate() = default;
virtual void NotifyThreadWillRun(base::PlatformThreadId tid) = 0;
};
CategorizedWorkerPool();
static CategorizedWorkerPool* GetOrCreate(Delegate* delegate = nullptr);
NamespaceToken GenerateNamespaceToken() override;
void WaitForTasksToFinishRunning(NamespaceToken token) override;
void CollectCompletedTasks(NamespaceToken token,
Task::Vector* completed_tasks) override;
virtual void FlushForTesting() = 0;
virtual void Start(int max_concurrency_foreground) = 0;
virtual void Shutdown() = 0;
TaskGraphRunner* GetTaskGraphRunner() { return this; }
scoped_refptr<base::SequencedTaskRunner> CreateSequencedTaskRunner();
protected:
class CategorizedWorkerPoolSequencedTaskRunner;
friend class CategorizedWorkerPoolSequencedTaskRunner;
~CategorizedWorkerPool() override;
class ClosureTask : public Task {
public:
explicit ClosureTask(base::OnceClosure closure);
ClosureTask(const ClosureTask&) = delete;
ClosureTask& operator=(const ClosureTask&) = delete;
void RunOnWorkerThread() override;
protected:
~ClosureTask() override;
private:
base::OnceClosure closure_;
};
void CollectCompletedTasksWithLockAcquired(NamespaceToken token,
Task::Vector* completed_tasks)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
bool ShouldRunTaskForCategoryWithLockAcquired(TaskCategory category)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
mutable base::Lock lock_;
TaskGraphWorkQueue work_queue_ GUARDED_BY(lock_);
const NamespaceToken namespace_token_;
Task::Vector tasks_ GUARDED_BY(lock_);
TaskGraph graph_ GUARDED_BY(lock_);
Task::Vector completed_tasks_ GUARDED_BY(lock_);
base::ConditionVariable has_namespaces_with_finished_running_tasks_cv_;
};
class CC_EXPORT CategorizedWorkerPoolImpl : public CategorizedWorkerPool {
public:
explicit CategorizedWorkerPoolImpl(Delegate* delegate = nullptr);
void ThreadWillRun(base::PlatformThreadId tid);
bool PostDelayedTask(const base::Location& from_here,
base::OnceClosure task,
base::TimeDelta delay) override;
void ScheduleTasks(NamespaceToken token, TaskGraph* graph) override;
void Run(const std::vector<TaskCategory>& categories,
base::ConditionVariable* has_ready_to_run_tasks_cv);
void FlushForTesting() override;
void Start(int max_concurrency_foreground) override;
void Shutdown() override;
private:
~CategorizedWorkerPoolImpl() override;
void ScheduleTasksWithLockAcquired(NamespaceToken token, TaskGraph* graph)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
bool RunTaskWithLockAcquired(const std::vector<TaskCategory>& categories)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
void RunTaskInCategoryWithLockAcquired(TaskCategory category)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
void SignalHasReadyToRunTasksWithLockAcquired()
EXCLUSIVE_LOCKS_REQUIRED(lock_);
const raw_ptr<Delegate> delegate_;
std::vector<std::unique_ptr<base::SimpleThread>> threads_;
base::ConditionVariable has_task_for_normal_priority_thread_cv_;
base::ConditionVariable has_task_for_background_priority_thread_cv_;
bool shutdown_ GUARDED_BY(lock_);
};
class CC_EXPORT CategorizedWorkerPoolJob : public CategorizedWorkerPool {
public:
CategorizedWorkerPoolJob();
bool PostDelayedTask(const base::Location& from_here,
base::OnceClosure task,
base::TimeDelta delay) override;
void ScheduleTasks(NamespaceToken token, TaskGraph* graph) override;
void Run(base::span<const TaskCategory> categories,
base::JobDelegate* job_delegate);
void FlushForTesting() override;
void Start(int max_concurrency_foreground) override;
void Shutdown() override;
#if BUILDFLAG(IS_OHOS)
base::JobHandle* GetForegroundJobHandle() {
return &foreground_job_handle_;
}
#endif
private:
~CategorizedWorkerPoolJob() override;
absl::optional<TaskGraphWorkQueue::PrioritizedTask>
GetNextTaskToRunWithLockAcquired(base::span<const TaskCategory> categories);
base::JobHandle* ScheduleTasksWithLockAcquired(NamespaceToken token,
TaskGraph* graph)
EXCLUSIVE_LOCKS_REQUIRED(lock_);
base::JobHandle* GetJobHandleToNotifyWithLockAcquired()
EXCLUSIVE_LOCKS_REQUIRED(lock_);
size_t GetMaxJobConcurrency(base::span<const TaskCategory> categories) const;
size_t max_concurrency_foreground_ = 0;
base::JobHandle background_job_handle_;
base::JobHandle foreground_job_handle_;
};
}
#endif