#ifndef BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_TASK_QUEUE_IMPL_H_
#include <stddef.h>
#include <deque>
#include <functional>
#include <memory>
#include <optional>
#include <set>
#include <utility>
#include <vector>
#include "base/base_export.h"
#include "base/compiler_specific.h"
#include "base/containers/flat_map.h"
#include "base/containers/intrusive_heap.h"
#include "base/dcheck_is_on.h"
#include "base/functional/callback.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/raw_ptr_exclusion.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/observer_list.h"
#include "base/pending_task.h"
#include "base/task/common/checked_lock.h"
#include "base/task/common/operations_controller.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/atomic_flag_set.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/fence.h"
#include "base/task/sequence_manager/sequenced_task_source.h"
#include "base/task/sequence_manager/task_queue.h"
#include "base/task/sequence_manager/tasks.h"
#include "base/threading/thread_checker.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing_forward.h"
#include "base/values.h"
namespace base {
class LazyNow;
namespace sequence_manager::internal {
class SequenceManagerImpl;
class WorkQueue;
class WorkQueueSets;
class WakeUpQueue;
class BASE_EXPORT TaskQueueImpl : public TaskQueue {
public:
static void InitializeFeatures();
TaskQueueImpl(SequenceManagerImpl* sequence_manager,
WakeUpQueue* wake_up_queue,
const TaskQueue::Spec& spec);
TaskQueueImpl(const TaskQueueImpl&) = delete;
TaskQueueImpl& operator=(const TaskQueueImpl&) = delete;
~TaskQueueImpl() override;
enum class WorkQueueType { kImmediate, kDelayed };
enum class CurrentThread { kMainThread, kNotMainThread };
struct DeferredNonNestableTask {
Task task;
RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue;
WorkQueueType work_queue_type;
};
using OnNextWakeUpChangedCallback = RepeatingCallback<void(TimeTicks)>;
using OnTaskStartedHandler =
RepeatingCallback<void(const Task&, const TaskQueue::TaskTiming&)>;
using OnTaskCompletedHandler =
RepeatingCallback<void(const Task&, TaskQueue::TaskTiming*, LazyNow*)>;
using OnTaskPostedHandler = RepeatingCallback<void(const Task&)>;
using TaskExecutionTraceLogger =
RepeatingCallback<void(perfetto::EventContext&, const Task&)>;
const char* GetName() const override;
bool IsQueueEnabled() const override;
bool IsEmpty() const override;
size_t GetNumberOfPendingTasks() const override;
bool HasTaskToRunImmediatelyOrReadyDelayedTask() const override;
std::optional<WakeUp> GetNextDesiredWakeUp() override;
void SetQueuePriority(TaskQueue::QueuePriority priority) override;
TaskQueue::QueuePriority GetQueuePriority() const override;
void AddTaskObserver(TaskObserver* task_observer) override;
void RemoveTaskObserver(TaskObserver* task_observer) override;
void InsertFence(TaskQueue::InsertFencePosition position) override;
void InsertFenceAt(TimeTicks time) override;
void RemoveFence() override;
bool HasActiveFence() override;
bool BlockedByFence() const override;
void SetThrottler(TaskQueue::Throttler* throttler) override;
void ResetThrottler() override;
void UpdateWakeUp(LazyNow* lazy_now) override;
void SetShouldReportPostedTasksWhenDisabled(bool should_report) override;
scoped_refptr<SingleThreadTaskRunner> CreateTaskRunner(
TaskType task_type) const override;
const scoped_refptr<SingleThreadTaskRunner>& task_runner() const override;
void SetOnTaskStartedHandler(OnTaskStartedHandler handler) override;
void SetOnTaskCompletedHandler(OnTaskCompletedHandler handler) override;
[[nodiscard]] std::unique_ptr<TaskQueue::OnTaskPostedCallbackHandle>
AddOnTaskPostedHandler(OnTaskPostedHandler handler) override;
void SetTaskExecutionTraceLogger(TaskExecutionTraceLogger logger) override;
std::unique_ptr<QueueEnabledVoter> CreateQueueEnabledVoter() override;
void RemoveCancelledTasks() override;
bool IsBlockedByScopedExecutionFences() override;
void SetQueueEnabled(bool enabled);
void UnregisterTaskQueue();
QueueName GetProtoName() const;
bool CouldTaskRun(EnqueueOrder enqueue_order) const;
bool WasBlockedOrLowPriority(EnqueueOrder enqueue_order) const;
void ReloadEmptyImmediateWorkQueue();
Value::Dict AsValue(TimeTicks now, bool force_verbose) const;
bool GetQuiescenceMonitored() const { return should_monitor_quiescence_; }
bool GetShouldNotifyObservers() const { return should_notify_observers_; }
void NotifyWillProcessTask(const Task& task,
bool was_blocked_or_low_priority);
void NotifyDidProcessTask(const Task& task);
bool HasTaskToRunImmediately() const;
bool HasTaskToRunImmediatelyLocked() const
EXCLUSIVE_LOCKS_REQUIRED(any_thread_lock_);
WorkQueue* delayed_work_queue() {
return main_thread_only().delayed_work_queue.get();
}
const WorkQueue* delayed_work_queue() const {
return main_thread_only().delayed_work_queue.get();
}
WorkQueue* immediate_work_queue() {
return main_thread_only().immediate_work_queue.get();
}
const WorkQueue* immediate_work_queue() const {
return main_thread_only().immediate_work_queue.get();
}
TaskExecutionTraceLogger task_execution_trace_logger() const {
return main_thread_only().task_execution_trace_logger;
}
bool RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now);
void MoveReadyDelayedTasksToWorkQueue(LazyNow* lazy_now,
EnqueueOrder enqueue_order);
void OnWakeUp(LazyNow* lazy_now, EnqueueOrder enqueue_order);
const WakeUpQueue* wake_up_queue() const {
return main_thread_only().wake_up_queue;
}
HeapHandle heap_handle() const { return main_thread_only().heap_handle; }
void set_heap_handle(HeapHandle heap_handle) {
main_thread_only().heap_handle = heap_handle;
}
void RequeueDeferredNonNestableTask(DeferredNonNestableTask task);
void PushImmediateIncomingTaskForTest(Task task);
void ReclaimMemory(TimeTicks now);
void OnTaskStarted(const Task& task,
const TaskQueue::TaskTiming& task_timing);
void OnTaskCompleted(const Task& task,
TaskQueue::TaskTiming* task_timing,
LazyNow* lazy_now);
bool RequiresTaskTiming() const;
WeakPtr<SequenceManagerImpl> GetSequenceManagerWeakPtr();
SequenceManagerImpl* sequence_manager() const { return sequence_manager_; }
bool IsUnregistered() const;
void CompleteInitializationOnBoundThread();
void AddQueueEnabledVoter(bool voter_is_enabled,
TaskQueue::QueueEnabledVoter& voter);
void RemoveQueueEnabledVoter(bool voter_is_enabled,
TaskQueue::QueueEnabledVoter& voter);
void OnQueueEnabledVoteChanged(bool enabled);
protected:
void SetNextWakeUp(LazyNow* lazy_now, std::optional<WakeUp> wake_up);
private:
friend class WorkQueue;
friend class WorkQueueTest;
friend class DelayedTaskHandleDelegate;
class GuardedTaskPoster : public RefCountedThreadSafe<GuardedTaskPoster> {
public:
explicit GuardedTaskPoster(TaskQueueImpl* outer);
bool PostTask(PostedTask task);
DelayedTaskHandle PostCancelableTask(PostedTask task);
bool RunOrPostTask(PostedTask task);
void StartAcceptingOperations() {
operations_controller_.StartAcceptingOperations();
}
void ShutdownAndWaitForZeroOperations() {
operations_controller_.ShutdownAndWaitForZeroOperations();
outer_ = nullptr;
}
private:
friend class RefCountedThreadSafe<GuardedTaskPoster>;
~GuardedTaskPoster();
base::internal::OperationsController operations_controller_;
RAW_PTR_EXCLUSION TaskQueueImpl* outer_ = nullptr;
};
class TaskRunner final : public SingleThreadTaskRunner {
public:
TaskRunner(scoped_refptr<GuardedTaskPoster> task_poster,
scoped_refptr<AssociatedThreadId> associated_thread,
TaskType task_type);
bool PostDelayedTask(const Location& location,
OnceClosure callback,
TimeDelta delay) final;
bool PostDelayedTaskAt(subtle::PostDelayedTaskPassKey,
const Location& location,
OnceClosure callback,
TimeTicks delayed_run_time,
base::subtle::DelayPolicy delay_policy) final;
DelayedTaskHandle PostCancelableDelayedTaskAt(
subtle::PostDelayedTaskPassKey,
const Location& location,
OnceClosure callback,
TimeTicks delayed_run_time,
base::subtle::DelayPolicy delay_policy) final;
DelayedTaskHandle PostCancelableDelayedTask(subtle::PostDelayedTaskPassKey,
const Location& location,
OnceClosure callback,
TimeDelta delay) final;
bool PostNonNestableDelayedTask(const Location& location,
OnceClosure callback,
TimeDelta delay) final;
bool RunOrPostTask(subtle::RunOrPostTaskPassKey,
const Location& from_here,
OnceClosure task) final;
bool BelongsToCurrentThread() const final;
bool RunsTasksInCurrentSequence() const final;
private:
~TaskRunner() final;
const scoped_refptr<GuardedTaskPoster> task_poster_;
const scoped_refptr<AssociatedThreadId> associated_thread_;
const TaskType task_type_;
};
class OnTaskPostedCallbackHandleImpl
: public TaskQueue::OnTaskPostedCallbackHandle {
public:
OnTaskPostedCallbackHandleImpl(
TaskQueueImpl* task_queue_impl,
scoped_refptr<const AssociatedThreadId> associated_thread_);
~OnTaskPostedCallbackHandleImpl() override;
void UnregisterTaskQueue() { task_queue_impl_ = nullptr; }
private:
RAW_PTR_EXCLUSION TaskQueueImpl* task_queue_impl_ = nullptr;
const scoped_refptr<const AssociatedThreadId> associated_thread_;
};
struct DelayedIncomingQueue {
public:
DelayedIncomingQueue();
DelayedIncomingQueue(const DelayedIncomingQueue&) = delete;
DelayedIncomingQueue& operator=(const DelayedIncomingQueue&) = delete;
~DelayedIncomingQueue();
void push(Task task);
void remove(HeapHandle heap_handle);
Task take_top();
bool empty() const { return queue_.empty(); }
size_t size() const { return queue_.size(); }
const Task& top() const LIFETIME_BOUND { return queue_.top(); }
void swap(DelayedIncomingQueue* other);
void SweepCancelledTasks(SequenceManagerImpl* sequence_manager);
Value::List AsValue(TimeTicks now) const;
private:
struct Compare {
bool operator()(const Task& lhs, const Task& rhs) const;
};
IntrusiveHeap<Task, Compare> queue_;
};
struct MainThreadOnly {
MainThreadOnly(TaskQueueImpl* task_queue, WakeUpQueue* wake_up_queue);
~MainThreadOnly();
raw_ptr<WakeUpQueue> wake_up_queue;
raw_ptr<TaskQueue::Throttler> throttler = nullptr;
std::unique_ptr<WorkQueue> delayed_work_queue;
std::unique_ptr<WorkQueue> immediate_work_queue;
DelayedIncomingQueue delayed_incoming_queue;
ObserverList<TaskObserver>::UncheckedAndDanglingUntriaged task_observers;
HeapHandle heap_handle;
bool is_enabled = true;
std::optional<Fence> current_fence;
std::optional<TimeTicks> delayed_fence;
EnqueueOrder enqueue_order_at_which_we_became_unblocked;
EnqueueOrder
enqueue_order_at_which_we_became_unblocked_with_normal_priority;
OnTaskStartedHandler on_task_started_handler;
OnTaskCompletedHandler on_task_completed_handler;
TaskExecutionTraceLogger task_execution_trace_logger;
std::optional<WakeUp> scheduled_wake_up;
bool is_enabled_for_test = true;
std::optional<TimeTicks> disabled_time;
bool should_report_posted_tasks_when_disabled = false;
int enabled_voter_count = 0;
int voter_count = 0;
};
void PostTask(PostedTask task);
void RemoveCancelableTask(HeapHandle heap_handle);
void PostImmediateTaskImpl(PostedTask task, CurrentThread current_thread);
void PostDelayedTaskImpl(PostedTask task, CurrentThread current_thread);
void PushOntoDelayedIncomingQueueFromMainThread(Task pending_task,
LazyNow* lazy_now,
bool notify_task_annotator);
void PushOntoDelayedIncomingQueue(Task pending_task);
void ScheduleDelayedWorkTask(Task pending_task);
void MoveReadyImmediateTasksToImmediateWorkQueueLocked()
EXCLUSIVE_LOCKS_REQUIRED(any_thread_lock_);
using TaskDeque = std::deque<Task>;
void TakeImmediateIncomingQueueTasks(TaskDeque* queue);
void TraceQueueSize() const;
static Value::List QueueAsValue(const TaskDeque& queue, TimeTicks now);
static Value::Dict TaskAsValue(const Task& task, TimeTicks now);
Task MakeDelayedTask(PostedTask delayed_task, LazyNow* lazy_now) const;
void ActivateDelayedFenceIfNeeded(const Task& task);
void UpdateCrossThreadQueueStateLocked()
EXCLUSIVE_LOCKS_REQUIRED(any_thread_lock_);
TimeDelta GetTaskDelayAdjustment(CurrentThread current_thread);
void MaybeReportIpcTaskQueuedFromMainThread(const Task& pending_task);
bool ShouldReportIpcTaskQueuedFromAnyThreadLocked(
base::TimeDelta* time_since_disabled)
EXCLUSIVE_LOCKS_REQUIRED(any_thread_lock_);
void MaybeReportIpcTaskQueuedFromAnyThreadLocked(const Task& pending_task)
EXCLUSIVE_LOCKS_REQUIRED(any_thread_lock_);
void MaybeReportIpcTaskQueuedFromAnyThreadUnlocked(const Task& pending_task);
void ReportIpcTaskQueued(const Task& pending_task,
const base::TimeDelta& time_since_disabled);
void OnQueueUnblocked();
void InsertFence(Fence fence);
void RemoveOnTaskPostedHandler(
OnTaskPostedCallbackHandleImpl* on_task_posted_callback_handle);
TaskQueue::QueuePriority DefaultPriority() const;
bool AreAllQueueEnabledVotersEnabled() const {
return main_thread_only().enabled_voter_count ==
main_thread_only().voter_count;
}
bool IsQueueEnabledFromAnyThread() const;
QueueName name_;
const raw_ptr<SequenceManagerImpl, AcrossTasksDanglingUntriaged>
sequence_manager_;
const scoped_refptr<AssociatedThreadId> associated_thread_;
const scoped_refptr<GuardedTaskPoster> task_poster_;
mutable base::internal::CheckedLock any_thread_lock_;
struct AnyThread {
struct TracingOnly {
TracingOnly();
~TracingOnly();
std::optional<TimeTicks> disabled_time;
bool should_report_posted_tasks_when_disabled = false;
};
AnyThread();
~AnyThread();
TaskDeque immediate_incoming_queue;
bool immediate_work_queue_empty = true;
bool post_immediate_task_should_schedule_work = true;
bool unregistered = false;
bool is_enabled = true;
base::flat_map<raw_ptr<OnTaskPostedCallbackHandleImpl>, OnTaskPostedHandler>
on_task_posted_handlers;
#if DCHECK_IS_ON()
size_t queue_set_index = 0;
#endif
TracingOnly tracing_only;
};
AnyThread any_thread_ GUARDED_BY(any_thread_lock_);
MainThreadOnly main_thread_only_;
MainThreadOnly& main_thread_only() {
associated_thread_->AssertInSequenceWithCurrentThread();
return main_thread_only_;
}
const MainThreadOnly& main_thread_only() const LIFETIME_BOUND {
associated_thread_->AssertInSequenceWithCurrentThread();
return main_thread_only_;
}
AtomicFlagSet::AtomicFlag empty_queues_to_reload_handle_;
const bool should_monitor_quiescence_;
const bool should_notify_observers_;
const bool delayed_fence_allowed_;
const bool scoped_execution_fence_allowed_;
const scoped_refptr<SingleThreadTaskRunner> default_task_runner_;
base::WeakPtrFactory<TaskQueueImpl> voter_weak_ptr_factory_{this};
};
}
}
#endif