#ifndef BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
#define BASE_TASK_SEQUENCE_MANAGER_SEQUENCE_MANAGER_IMPL_H_
#include <array>
#include <atomic>
#include <deque>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
#include "base/atomic_sequence_num.h"
#include "base/base_export.h"
#include "base/callback_list.h"
#include "base/compiler_specific.h"
#include "base/containers/circular_deque.h"
#include "base/debug/crash_logging.h"
#include "base/functional/callback_forward.h"
#include "base/gtest_prod_util.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/raw_ptr_exclusion.h"
#include "base/memory/scoped_refptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump_type.h"
#include "base/observer_list.h"
#include "base/pending_task.h"
#include "base/rand_util.h"
#include "base/run_loop.h"
#include "base/synchronization/lock.h"
#include "base/task/current_thread.h"
#include "base/task/sequence_manager/associated_thread_id.h"
#include "base/task/sequence_manager/enqueue_order.h"
#include "base/task/sequence_manager/enqueue_order_generator.h"
#include "base/task/sequence_manager/sequence_manager.h"
#include "base/task/sequence_manager/task_queue.h"
#include "base/task/sequence_manager/task_queue_impl.h"
#include "base/task/sequence_manager/task_queue_selector.h"
#include "base/task/sequence_manager/thread_controller.h"
#include "base/task/sequence_manager/work_tracker.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/threading/thread_checker.h"
#include "base/time/default_tick_clock.h"
#include "base/types/pass_key.h"
#include "base/values.h"
#include "build/build_config.h"
namespace base {
namespace internal {
class SequenceManagerThreadDelegate;
}
namespace trace_event {
class ConvertableToTraceFormat;
}
namespace sequence_manager {
class SequenceManagerForTest;
class TaskQueue;
class TaskTimeObserver;
class TimeDomain;
namespace internal {
class TaskQueueImpl;
class DefaultWakeUpQueue;
class SequenceManagerImpl;
class ThreadControllerImpl;
std::unique_ptr<SequenceManagerImpl> CreateUnboundSequenceManagerImpl(
PassKey<base::internal::SequenceManagerThreadDelegate>,
SequenceManager::Settings settings);
class BASE_EXPORT SequenceManagerImpl
: public SequenceManager,
public internal::SequencedTaskSource,
public internal::TaskQueueSelector::Observer,
public RunLoop::NestingObserver {
public:
using Observer = SequenceManager::Observer;
SequenceManagerImpl(const SequenceManagerImpl&) = delete;
SequenceManagerImpl& operator=(const SequenceManagerImpl&) = delete;
~SequenceManagerImpl() override;
static void InitializeFeatures();
void BindToCurrentThread() override;
scoped_refptr<SequencedTaskRunner> GetTaskRunnerForCurrentTask() override;
void BindToMessagePump(std::unique_ptr<MessagePump> message_pump) override;
void SetObserver(Observer* observer) override;
void AddTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
void RemoveTaskTimeObserver(TaskTimeObserver* task_time_observer) override;
void SetTimeDomain(TimeDomain* time_domain) override;
void ResetTimeDomain() override;
const TickClock* GetTickClock() const override;
TimeTicks NowTicks() const override;
void SetDefaultTaskRunner(
scoped_refptr<SingleThreadTaskRunner> task_runner) override;
void ReclaimMemory() override;
bool GetAndClearSystemIsQuiescentBit() override;
void SetWorkBatchSize(int work_batch_size) override;
void EnableCrashKeys(const char* async_stack_crash_key) override;
size_t GetPendingTaskCountForTesting() const override;
TaskQueue::Handle CreateTaskQueue(const TaskQueue::Spec& spec) override;
std::string DescribeAllPendingTasks() const override;
void AddTaskObserver(TaskObserver* task_observer) override;
void RemoveTaskObserver(TaskObserver* task_observer) override;
std::optional<WakeUp> GetNextDelayedWakeUp() const override;
TaskQueue::QueuePriority GetPriorityCount() const override;
std::vector<TaskQueue*> GetBestEffortTaskQueues() override;
void SetRunTaskSynchronouslyAllowed(
bool can_run_tasks_synchronously) override;
using internal::SequencedTaskSource::SelectNextTask;
std::optional<SelectedTask> SelectNextTask(LazyNow& lazy_now,
SelectTaskOption option) override;
void DidRunTask(LazyNow& lazy_now) override;
using internal::SequencedTaskSource::GetPendingWakeUp;
std::optional<WakeUp> GetPendingWakeUp(LazyNow* lazy_now,
SelectTaskOption option) override;
#if BUILDFLAG(IS_WIN)
bool NextWakeUpNeedsHighRes() override;
#endif
void OnBeginWork() override;
bool OnIdle() override;
void MaybeEmitTaskDetails(
perfetto::EventContext& ctx,
const SequencedTaskSource::SelectedTask& selected_task) const override;
void AddDestructionObserver(
CurrentThread::DestructionObserver* destruction_observer);
void RemoveDestructionObserver(
CurrentThread::DestructionObserver* destruction_observer);
[[nodiscard]] CallbackListSubscription RegisterOnNextIdleCallback(
OnceClosure on_next_idle_callback);
void SetTaskRunner(scoped_refptr<SingleThreadTaskRunner> task_runner);
scoped_refptr<SingleThreadTaskRunner> GetTaskRunner();
bool IsBoundToCurrentThread() const;
MessagePump* GetMessagePump() const override;
bool IsType(MessagePumpType type) const;
void SetAddQueueTimeToTasks(bool enable);
void SetTaskExecutionAllowedInNativeNestedLoop(bool allowed);
bool IsTaskExecutionAllowedInNativeNestedLoop() const;
#if BUILDFLAG(IS_IOS)
void AttachToMessagePump();
#endif
bool IsIdleForTesting() override;
void EnableMessagePumpTimeKeeperMetrics(
const char* thread_name,
bool wall_time_based_metrics_enabled_for_testing = false);
void ScheduleWork();
internal::TaskQueueImpl* currently_executing_task_queue() const;
void UnregisterTaskQueueImpl(
std::unique_ptr<internal::TaskQueueImpl> task_queue);
scoped_refptr<AssociatedThreadId> associated_thread() const {
return associated_thread_;
}
const Settings& settings() const LIFETIME_BOUND { return settings_; }
WeakPtr<SequenceManagerImpl> GetWeakPtr();
static constexpr TimeDelta kReclaimMemoryInterval = Seconds(30);
static scoped_refptr<SingleThreadTaskRunner> GetCurrentBestEffortTaskRunner(
PassKey<SingleThreadTaskRunner>);
protected:
static std::unique_ptr<ThreadControllerImpl>
CreateThreadControllerImplForCurrentThread(const TickClock* clock);
SequenceManagerImpl(std::unique_ptr<internal::ThreadController> controller,
SequenceManager::Settings settings = Settings());
friend class internal::TaskQueueImpl;
friend class internal::DefaultWakeUpQueue;
friend class ::base::sequence_manager::SequenceManagerForTest;
private:
FRIEND_TEST_ALL_PREFIXES(SequenceManagerTest,
BestEffortPriority_SinglePriority);
FRIEND_TEST_ALL_PREFIXES(SequenceManagerTest,
BestEffortPriority_ManyHighPriorities);
FRIEND_TEST_ALL_PREFIXES(SequenceManagerTest,
BestEffortPriority_ManyLowPriorities);
static SequenceManagerImpl* GetCurrent();
friend class ::base::CurrentThread;
friend std::unique_ptr<SequenceManager>
sequence_manager::CreateSequenceManagerOnCurrentThread(
SequenceManager::Settings);
friend std::unique_ptr<SequenceManager>
sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump(
std::unique_ptr<MessagePump> message_pump,
SequenceManager::Settings);
friend std::unique_ptr<SequenceManager>
sequence_manager::CreateUnboundSequenceManager(SequenceManager::Settings);
friend std::unique_ptr<SequenceManagerImpl>
sequence_manager::internal::CreateUnboundSequenceManagerImpl(
PassKey<base::internal::SequenceManagerThreadDelegate>,
SequenceManager::Settings);
static std::unique_ptr<SequenceManagerImpl> CreateOnCurrentThread(
SequenceManager::Settings settings);
static std::unique_ptr<SequenceManagerImpl> CreateUnbound(
SequenceManager::Settings settings);
enum class ProcessTaskResult {
kDeferred,
kExecuted,
kSequenceManagerDeleted,
};
using NonNestableTaskDeque =
circular_deque<internal::TaskQueueImpl::DeferredNonNestableTask>;
struct ExecutingTask {
ExecutingTask(Task&& task,
internal::TaskQueueImpl* task_queue,
TaskQueue::TaskTiming task_timing)
: pending_task(std::move(task)),
task_queue(task_queue),
task_queue_name(task_queue->GetProtoName()),
task_timing(task_timing),
priority(task_queue->GetQueuePriority()),
task_type(pending_task.task_type) {}
Task pending_task;
RAW_PTR_EXCLUSION internal::TaskQueueImpl* task_queue = nullptr;
QueueName task_queue_name;
TaskQueue::TaskTiming task_timing;
TaskQueue::QueuePriority priority;
int task_type;
};
struct MainThreadOnly {
explicit MainThreadOnly(
SequenceManagerImpl* sequence_manager,
const scoped_refptr<AssociatedThreadId>& associated_thread,
const SequenceManager::Settings& settings,
const base::TickClock* clock);
~MainThreadOnly();
int nesting_depth = 0;
NonNestableTaskDeque non_nestable_task_queue;
raw_ptr<debug::CrashKeyString> file_name_crash_key = nullptr;
raw_ptr<debug::CrashKeyString> function_name_crash_key = nullptr;
raw_ptr<debug::CrashKeyString> async_stack_crash_key = nullptr;
std::array<char, static_cast<size_t>(debug::CrashKeySize::Size64)>
async_stack_buffer = {};
internal::TaskQueueSelector selector;
ObserverList<TaskObserver>::UncheckedAndRawPtrExcluded task_observers;
ObserverList<TaskTimeObserver> task_time_observers;
const raw_ptr<const base::TickClock> default_clock;
raw_ptr<TimeDomain> time_domain = nullptr;
std::unique_ptr<WakeUpQueue> wake_up_queue;
std::unique_ptr<WakeUpQueue> non_waking_wake_up_queue;
bool memory_reclaim_scheduled = false;
TimeTicks next_time_to_reclaim_memory;
RAW_PTR_EXCLUSION std::set<internal::TaskQueueImpl*> active_queues;
std::map<internal::TaskQueueImpl*, std::unique_ptr<internal::TaskQueueImpl>>
queues_to_delete;
bool task_was_run_on_quiescence_monitored_queue = false;
bool nesting_observer_registered_ = false;
std::deque<ExecutingTask> task_execution_stack;
raw_ptr<Observer> observer = nullptr;
ObserverList<CurrentThread::DestructionObserver>::
UncheckedAndDanglingUntriaged destruction_observers;
OnceClosureList on_next_idle_callbacks;
};
void CompleteInitializationOnBoundThread();
void OnTaskQueueEnabled(internal::TaskQueueImpl* queue) override;
void OnWorkAvailable() override;
void OnBeginNestedRunLoop() override;
void OnExitNestedRunLoop() override;
void SetNextWakeUp(LazyNow* lazy_now, std::optional<WakeUp> wake_up);
void WillRequestReloadImmediateWorkQueue();
SyncWorkAuthorization TryAcquireSyncWorkAuthorization();
void WillQueueTask(Task* pending_task);
void MoveReadyDelayedTasksToWorkQueues(LazyNow* lazy_now);
void NotifyWillProcessTask(ExecutingTask* task, LazyNow* time_before_task);
void NotifyDidProcessTask(ExecutingTask* task, LazyNow* time_after_task);
EnqueueOrder GetNextSequenceNumber();
bool GetAddQueueTimeToTasks();
std::unique_ptr<trace_event::ConvertableToTraceFormat>
AsValueWithSelectorResultForTracing(internal::WorkQueue* selected_work_queue,
bool force_verbose) const;
Value::Dict AsValueWithSelectorResult(
internal::WorkQueue* selected_work_queue,
bool force_verbose) const;
AtomicFlagSet::AtomicFlag GetFlagToRequestReloadForEmptyQueue(
TaskQueueImpl* task_queue);
void ReloadEmptyWorkQueues();
std::unique_ptr<internal::TaskQueueImpl> CreateTaskQueueImpl(
const TaskQueue::Spec& spec);
void MaybeReclaimMemory();
void CleanUpQueues();
void RemoveAllCanceledDelayedTasksFromFront(LazyNow* lazy_now);
TaskQueue::TaskTiming::TimeRecordingPolicy ShouldRecordTaskTiming(
const internal::TaskQueueImpl* task_queue);
void RecordCrashKeys(const PendingTask&);
std::optional<SelectedTask> SelectNextTaskImpl(LazyNow& lazy_now,
SelectTaskOption option);
std::optional<WakeUp> GetNextDelayedWakeUpWithOption(
SelectTaskOption option) const;
std::optional<WakeUp> AdjustWakeUp(std::optional<WakeUp> wake_up,
LazyNow* lazy_now) const;
void MaybeAddLeewayToTask(Task& task) const;
#if DCHECK_IS_ON()
void LogTaskDebugInfo(const internal::WorkQueue* work_queue) const;
#endif
TaskQueue::TaskTiming InitializeTaskTiming(
internal::TaskQueueImpl* task_queue);
std::optional<TaskQueue::QueuePriority> GetBestEffortPriority() const;
const scoped_refptr<AssociatedThreadId> associated_thread_;
EnqueueOrderGenerator enqueue_order_generator_;
const std::unique_ptr<internal::ThreadController> controller_;
const Settings settings_;
WorkTracker work_tracker_;
std::atomic<bool> add_queue_time_to_tasks_;
AtomicFlagSet empty_queues_to_reload_;
MainThreadOnly main_thread_only_;
MainThreadOnly& main_thread_only() {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
return main_thread_only_;
}
const MainThreadOnly& main_thread_only() const LIFETIME_BOUND {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
return main_thread_only_;
}
std::atomic<const base::TickClock*> clock_;
const base::TickClock* main_thread_clock() const {
DCHECK_CALLED_ON_VALID_THREAD(associated_thread_->thread_checker);
return clock_.load(std::memory_order_relaxed);
}
const base::TickClock* any_thread_clock() const {
return clock_.load(std::memory_order_acquire);
}
WeakPtrFactory<SequenceManagerImpl> weak_factory_{this};
};
}
}
}
#endif