#include "base/task/sequence_manager/thread_controller_impl.h"
#include <algorithm>
#include "base/functional/bind.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_pump.h"
#include "base/notreached.h"
#include "base/run_loop.h"
#include "base/task/common/lazy_now.h"
#include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/sequenced_task_source.h"
#include "base/trace_event/trace_event.h"
#include "build/build_config.h"
namespace base::sequence_manager::internal {
// Shorthand for the WorkDeduplicator result enum that the scheduling code in
// this file compares against (kScheduleImmediate / kNotNeeded).
using ShouldScheduleWork = WorkDeduplicator::ShouldScheduleWork;
// Constructs a controller that posts its DoWork() calls to |task_runner|.
//
// |funneled_sequence_manager| may be null; when it is set, its current task
// runner is remembered in |message_loop_task_runner_| so that
// RestoreDefaultTaskRunner() can reinstate it later. |time_source| is
// forwarded to the ThreadController base class.
ThreadControllerImpl::ThreadControllerImpl(
    SequenceManagerImpl* funneled_sequence_manager,
    scoped_refptr<SingleThreadTaskRunner> task_runner,
    const TickClock* time_source)
    : ThreadController(time_source),
      funneled_sequence_manager_(funneled_sequence_manager),
      task_runner_(task_runner),
      message_loop_task_runner_(funneled_sequence_manager
                                    ? funneled_sequence_manager->GetTaskRunner()
                                    : nullptr),
      work_deduplicator_(associated_thread_) {
  // The deduplicator is bound only when there is something to post to or
  // funnel through.
  const bool has_runner_or_manager =
      task_runner_ || funneled_sequence_manager_;
  if (has_runner_or_manager) {
    work_deduplicator_.BindToCurrentThread();
  }

  // Both closures invoke DoWork() through a weak pointer to |this|; they
  // differ only in the WorkType they carry.
  immediate_do_work_closure_ =
      BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(),
                    WorkType::kImmediate);
  delayed_do_work_closure_ =
      BindRepeating(&ThreadControllerImpl::DoWork, weak_factory_.GetWeakPtr(),
                    WorkType::kDelayed);

  // Open the outermost run level in the idle state. The destructor issues the
  // matching OnRunLoopEnded().
  LazyNow now(time_source_);
  run_level_tracker_.OnRunLoopStarted(RunLevelTracker::kIdle, now);
}
// Closes the run level that the constructor opened with OnRunLoopStarted().
ThreadControllerImpl::~ThreadControllerImpl() {
  run_level_tracker_.OnRunLoopEnded();
}
// Out-of-line defaulted constructor and destructor for the MainSequenceOnly
// state holder (accessed in this file through main_sequence_only()).
ThreadControllerImpl::MainSequenceOnly::MainSequenceOnly() = default;
ThreadControllerImpl::MainSequenceOnly::~MainSequenceOnly() = default;
// Factory for ThreadControllerImpl.
//
// When |funneled_sequence_manager| is non-null the controller posts to that
// manager's current task runner; otherwise it is created with neither a
// manager nor a task runner. |time_source| is passed through unchanged.
std::unique_ptr<ThreadControllerImpl> ThreadControllerImpl::Create(
    SequenceManagerImpl* funneled_sequence_manager,
    const TickClock* time_source) {
  if (!funneled_sequence_manager) {
    return WrapUnique(new ThreadControllerImpl(nullptr, nullptr, time_source));
  }
  return WrapUnique(new ThreadControllerImpl(
      funneled_sequence_manager, funneled_sequence_manager->GetTaskRunner(),
      time_source));
}
// Installs |sequence| as the task source that DoWork() selects tasks from.
// Debug builds assert that the call is on the associated sequence, that
// |sequence| is non-null, and that no source has been installed before.
void ThreadControllerImpl::SetSequencedTaskSource(
    SequencedTaskSource* sequence) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
  DCHECK(sequence);
  DCHECK(!sequence_);

  sequence_ = sequence;
}
void ThreadControllerImpl::ScheduleWork() {
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
"ThreadControllerImpl::ScheduleWork::PostTask");
if (work_deduplicator_.OnWorkRequested() ==
ShouldScheduleWork::kScheduleImmediate) {
task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
}
}
// Arms, re-arms or cancels the delayed DoWork() task.
//
// |wake_up| == nullopt cancels any delayed DoWork() that is currently armed.
// Otherwise |wake_up| must not be immediate (DCHECKed), and a delayed task is
// posted for |wake_up->time| unless the deduplicator reports that none is
// needed or that exact time is already armed. |lazy_now| supplies the current
// time used to turn the wake-up time into a delay.
void ThreadControllerImpl::SetNextDelayedDoWork(LazyNow* lazy_now,
                                                std::optional<WakeUp> wake_up) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
  DCHECK(sequence_);
  DCHECK(!wake_up || !wake_up->is_immediate());

  if (!wake_up) {
    // TimeTicks::Max() is the "nothing armed" marker.
    const bool delayed_work_armed =
        !main_sequence_only().next_delayed_do_work.is_max();
    if (delayed_work_armed) {
      cancelable_delayed_do_work_closure_.Cancel();
      main_sequence_only().next_delayed_do_work = TimeTicks::Max();
    }
    return;
  }

  // The deduplicator is asked first; the already-armed comparison is only
  // evaluated when it did not answer kNotNeeded.
  const bool not_needed = work_deduplicator_.OnDelayedWorkRequested() ==
                          ShouldScheduleWork::kNotNeeded;
  if (not_needed ||
      main_sequence_only().next_delayed_do_work == wake_up->time) {
    return;
  }

  // A wake-up time that is already in the past is clamped to a zero delay.
  const base::TimeDelta until_wake_up = wake_up->time - lazy_now->Now();
  base::TimeDelta delay = std::max(TimeDelta(), until_wake_up);
  TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
               "ThreadControllerImpl::SetNextDelayedDoWork::PostDelayedTask",
               "delay_ms", delay.InMillisecondsF());

  main_sequence_only().next_delayed_do_work = wake_up->time;
  // Re-point the cancelable wrapper at the delayed closure before posting its
  // callback.
  cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
  task_runner_->PostDelayedTask(
      FROM_HERE, cancelable_delayed_do_work_closure_.callback(), delay);
}
// Forwards the question to |task_runner_|, the runner DoWork() is posted to.
bool ThreadControllerImpl::RunsTasksInCurrentSequence() {
  return task_runner_->RunsTasksInCurrentSequence();
}
// Makes |task_runner| the task runner of the funneled SequenceManager, if
// there is one. Without a funneled manager this only records (in DCHECK
// builds) that a default task runner was set.
void ThreadControllerImpl::SetDefaultTaskRunner(
    scoped_refptr<SingleThreadTaskRunner> task_runner) {
#if DCHECK_IS_ON()
  default_task_runner_set_ = true;
#endif
  if (funneled_sequence_manager_) {
    funneled_sequence_manager_->SetTaskRunner(task_runner);
  }
}
// Returns the task runner currently installed on the funneled
// SequenceManager.
//
// |funneled_sequence_manager_| is nullable: Create() and the constructor
// accept a null manager, and SetDefaultTaskRunner() and
// RestoreDefaultTaskRunner() are no-ops in that case. Mirror that here and
// report "no default task runner" (a null scoped_refptr) instead of
// dereferencing a null pointer.
scoped_refptr<SingleThreadTaskRunner>
ThreadControllerImpl::GetDefaultTaskRunner() {
  if (!funneled_sequence_manager_) {
    return nullptr;
  }
  return funneled_sequence_manager_->GetTaskRunner();
}
// Reinstates, on the funneled SequenceManager, the task runner that was
// captured into |message_loop_task_runner_| at construction. Does nothing
// when there is no funneled manager.
void ThreadControllerImpl::RestoreDefaultTaskRunner() {
  if (funneled_sequence_manager_) {
    funneled_sequence_manager_->SetTaskRunner(message_loop_task_runner_);
  }
}
// Not supported by this controller: reaching this is treated as a programming
// error. |message_pump| is not used.
void ThreadControllerImpl::BindToCurrentThread(
    std::unique_ptr<MessagePump> message_pump) {
  NOTREACHED();
}
// Hands |pending_task| to the task annotator before it is queued, tagged with
// the "SequenceManager PostTask" label.
void ThreadControllerImpl::WillQueueTask(PendingTask* pending_task) {
  task_annotator_.WillQueueTask("SequenceManager PostTask", pending_task);
}
// Runs a batch of tasks from |sequence_| and then arranges the next DoWork().
//
// Phase 1 (the loop): select and run up to |work_batch_size_| tasks, stopping
// early when no task is selected or when more than one run level is active.
// Phase 2 (after the loop): ask the task source for its pending wake-up and
// either post an immediate continuation, arm a delayed DoWork(), or cancel the
// armed one.
//
// |work_type| is not read anywhere in this body.
void ThreadControllerImpl::DoWork(WorkType work_type) {
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("sequence_manager"),
"ThreadControllerImpl::DoWork");
DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
DCHECK(sequence_);
work_deduplicator_.OnWorkStarted();
// Timestamp handed from the end of one iteration to the LazyNow at the start
// of the next (see the bottom of the loop body).
std::optional<base::TimeTicks> recent_time;
// Checked right after a task has run; if it has been invalidated by then,
// this function returns without touching any member again.
WeakPtr<ThreadControllerImpl> weak_ptr = weak_factory_.GetWeakPtr();
for (int i = 0; i < main_sequence_only().work_batch_size_; i++) {
LazyNow lazy_now_select_task(recent_time, time_source_);
DCHECK_GT(run_level_tracker_.num_run_levels(), 0U);
// The work item is opened before task selection, so SelectNextTask() falls
// inside the OnWorkStarted()/OnWorkEnded() pair.
run_level_tracker_.OnWorkStarted(lazy_now_select_task);
// Depth captured before the task runs; it is handed back to OnWorkEnded().
int run_depth = static_cast<int>(run_level_tracker_.num_run_levels());
std::optional<SequencedTaskSource::SelectedTask> selected_task =
sequence_->SelectNextTask(lazy_now_select_task);
LazyNow lazy_now_task_selected(time_source_);
// The task's queue time is reported only when a task was selected and its
// delayed_run_time is null; otherwise a null TimeTicks is passed.
run_level_tracker_.OnApplicationTaskSelected(
(selected_task && selected_task->task.delayed_run_time.is_null())
? selected_task->task.queue_time
: TimeTicks(),
lazy_now_task_selected);
if (!selected_task) {
// Nothing to run: close the work item and leave the batch loop.
run_level_tracker_.OnWorkEnded(lazy_now_task_selected, run_depth);
break;
}
{
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("devtools.timeline"), "RunTask");
SequencedTaskSource* source = sequence_;
// NOTE(review): the lambda captures locals by reference, so it presumably
// runs synchronously inside RunTask() -- confirm against TaskAnnotator.
task_annotator_.RunTask(
"ThreadControllerImpl::RunTask", selected_task->task,
[&selected_task, &source](perfetto::EventContext& ctx) {
if (selected_task->task_execution_trace_logger) {
selected_task->task_execution_trace_logger.Run(
ctx, selected_task->task);
}
source->MaybeEmitTaskDetails(ctx, *selected_task);
});
// |weak_ptr| no longer valid: bail out before any member access.
if (!weak_ptr) {
return;
}
LazyNow lazy_now_after_run_task(time_source_);
sequence_->DidRunTask(lazy_now_after_run_task);
run_level_tracker_.OnWorkEnded(lazy_now_after_run_task, run_depth);
// Reuse the timestamp for the next iteration only if the LazyNow already
// holds one; otherwise clear |recent_time|.
if (lazy_now_after_run_task.has_value()) {
recent_time =
std::optional<base::TimeTicks>(lazy_now_after_run_task.Now());
} else {
recent_time.reset();
}
}
// Stop batching while more than one run level is active.
// OnBeginNestedRunLoop() is what pushes an additional level in this file.
if (run_level_tracker_.num_run_levels() > 1) {
break;
}
}
work_deduplicator_.WillCheckForMoreWork();
LazyNow lazy_now_after_work(time_source_);
std::optional<WakeUp> next_wake_up =
sequence_->GetPendingWakeUp(&lazy_now_after_work);
// Immediate work is pending if the wake-up is immediate or OnIdle() returns
// true. OnIdle() is only called when the first condition is false.
if ((next_wake_up && next_wake_up->is_immediate()) || sequence_->OnIdle()) {
// Post a continuation only if the deduplicator asks for one.
if (work_deduplicator_.DidCheckForMoreWork(
WorkDeduplicator::NextTask::kIsImmediate) ==
ShouldScheduleWork::kScheduleImmediate) {
task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
}
return;
}
// The next task is delayed (or absent), but the deduplicator can still ask
// for an immediate DoWork().
if (work_deduplicator_.DidCheckForMoreWork(
WorkDeduplicator::NextTask::kIsDelayed) ==
ShouldScheduleWork::kScheduleImmediate) {
task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
return;
}
// No immediate continuation was posted: report idle.
run_level_tracker_.OnIdle(lazy_now_after_work);
if (!next_wake_up) {
// No future work: mark nothing as armed and cancel the delayed DoWork().
main_sequence_only().next_delayed_do_work = TimeTicks::Max();
cancelable_delayed_do_work_closure_.Cancel();
return;
}
TimeTicks next_wake_up_time = next_wake_up->time;
// A delayed DoWork() for exactly this time is already armed.
if (next_wake_up_time == main_sequence_only().next_delayed_do_work) {
return;
}
main_sequence_only().next_delayed_do_work = next_wake_up_time;
cancelable_delayed_do_work_closure_.Reset(delayed_do_work_closure_);
// NOTE(review): unlike SetNextDelayedDoWork(), this delay is not clamped to
// zero; presumably GetPendingWakeUp() never returns a non-immediate wake-up
// in the past relative to |lazy_now_after_work| -- confirm.
task_runner_->PostDelayedTask(FROM_HERE,
cancelable_delayed_do_work_closure_.callback(),
next_wake_up_time - lazy_now_after_work.Now());
}
// Stores |observer| as the nesting observer to forward to, and registers
// |this| with RunLoop as a nesting observer on the current thread.
void ThreadControllerImpl::AddNestingObserver(
    RunLoop::NestingObserver* observer) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);

  nesting_observer_ = observer;
  RunLoop::AddNestingObserverOnCurrentThread(this);
}
// Clears the stored nesting observer and unregisters |this| from RunLoop on
// the current thread. |observer| must be the one currently stored (DCHECKed).
void ThreadControllerImpl::RemoveNestingObserver(
    RunLoop::NestingObserver* observer) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(associated_thread_->sequence_checker);
  DCHECK_EQ(observer, nesting_observer_);

  nesting_observer_ = nullptr;
  RunLoop::RemoveNestingObserverOnCurrentThread(this);
}
void ThreadControllerImpl::OnBeginNestedRunLoop() {
LazyNow lazy_now(time_source_);
run_level_tracker_.OnRunLoopStarted(RunLevelTracker::kInBetweenWorkItems,
lazy_now);
work_deduplicator_.OnWorkRequested();
task_runner_->PostTask(FROM_HERE, immediate_do_work_closure_);
if (nesting_observer_) {
nesting_observer_->OnBeginNestedRunLoop();
}
}
// Nested run loop exit: forwards the notification to the stored observer (if
// any) first, then closes the run level opened by OnBeginNestedRunLoop().
void ThreadControllerImpl::OnExitNestedRunLoop() {
  if (nesting_observer_) {
    nesting_observer_->OnExitNestedRunLoop();
  }

  run_level_tracker_.OnRunLoopEnded();
}
// Sets the upper bound on the number of tasks one DoWork() call runs (the
// limit of DoWork()'s batch loop).
void ThreadControllerImpl::SetWorkBatchSize(int work_batch_size) {
  main_sequence_only().work_batch_size_ = work_batch_size;
}
// Not supported by this controller: reaching this is treated as a programming
// error. |allowed| is not used.
void ThreadControllerImpl::SetTaskExecutionAllowedInNativeNestedLoop(
    bool allowed) {
  NOTREACHED();
}
// This controller always reports that task execution is allowed.
bool ThreadControllerImpl::IsTaskExecutionAllowed() const {
  return true;
}
// This controller never asks for the run loop to quit when idle.
bool ThreadControllerImpl::ShouldQuitRunLoopWhenIdle() {
  return false;
}
// Always returns null. In this class BindToCurrentThread(), the entry point
// that would receive a pump, is NOTREACHED().
MessagePump* ThreadControllerImpl::GetBoundMessagePump() const {
  return nullptr;
}
#if BUILDFLAG(IS_IOS) || BUILDFLAG(IS_ANDROID) || BUILDFLAG(IS_ARKWEB)
// Compiled only for iOS, Android and ArkWeb builds. Not supported by this
// controller: reaching this is treated as a programming error.
void ThreadControllerImpl::AttachToMessagePump() {
  NOTREACHED();
}
#endif
#if BUILDFLAG(IS_IOS)
// Compiled only for iOS builds. Not supported by this controller: reaching
// this is treated as a programming error.
void ThreadControllerImpl::DetachFromMessagePump() {
  NOTREACHED();
}
#endif
}