#include "base/test/task_environment.h"
#include <algorithm>
#include <memory>
#include <ostream>
#include "base/check.h"
#include "base/debug/stack_trace.h"
#include "base/functional/callback_forward.h"
#include "base/functional/callback_helpers.h"
#include "base/lazy_instance.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/raw_ref.h"
#include "base/message_loop/message_pump.h"
#include "base/message_loop/message_pump_type.h"
#include "base/no_destructor.h"
#include "base/process/process.h"
#include "base/run_loop.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/task/common/lazy_now.h"
#include "base/task/sequence_manager/sequence_manager.h"
#include "base/task/sequence_manager/sequence_manager_impl.h"
#include "base/task/sequence_manager/time_domain.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/thread_pool/thread_pool_impl.h"
#include "base/task/thread_pool/thread_pool_instance.h"
#include "base/test/bind.h"
#include "base/test/test_mock_time_task_runner.h"
#include "base/test/test_timeouts.h"
#include "base/thread_annotations.h"
#include "base/threading/platform_thread.h"
#include "base/threading/sequence_local_storage_map.h"
#include "base/threading/thread_checker_impl.h"
#include "base/threading/thread_local.h"
#include "base/threading/thread_restrictions.h"
#include "base/time/clock.h"
#include "base/time/tick_clock.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"
#if BUILDFLAG(IS_POSIX) || BUILDFLAG(IS_FUCHSIA)
#include "base/files/file_descriptor_watcher_posix.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#endif
#if BUILDFLAG(ENABLE_BASE_TRACING)
#include "base/trace_event/trace_log.h"
#endif
namespace base {
namespace test {
namespace {
// Returns the list of DestructionObservers notified from
// TaskEnvironment::DestroyTaskEnvironment(). The list is a function-local
// static wrapped in NoDestructor.
ObserverList<TaskEnvironment::DestructionObserver>& GetDestructionObservers() {
  using DestructionObserverList =
      ObserverList<TaskEnvironment::DestructionObserver>;
  static NoDestructor<DestructionObserverList> observers;
  return *observers;
}
// Task tracker of the thread pool installed by
// TaskEnvironment::CreateThreadPool(). Reset to nullptr by
// TaskEnvironment::ShutdownAndJoinThreadPool(). Consulted by
// TaskEnvironment::ParallelExecutionFence.
TaskEnvironment::TestTaskTracker* g_task_tracker = nullptr;
// Maps a TaskEnvironment::MainThreadType onto the MessagePumpType of the same
// name. Values outside the enum hit NOTREACHED() and fall back to DEFAULT.
base::MessagePumpType GetMessagePumpTypeForMainThreadType(
    TaskEnvironment::MainThreadType main_thread_type) {
  using MainThreadType = TaskEnvironment::MainThreadType;

  if (main_thread_type == MainThreadType::DEFAULT) {
    return MessagePumpType::DEFAULT;
  }
  if (main_thread_type == MainThreadType::UI) {
    return MessagePumpType::UI;
  }
  if (main_thread_type == MainThreadType::IO) {
    return MessagePumpType::IO;
  }

  NOTREACHED();
  return MessagePumpType::DEFAULT;
}
// Creates a SequenceManager bound to the current thread. Both the message pump
// and the manager's settings use the pump type that corresponds to
// |main_thread_type|; |priority_settings| is forwarded into the settings.
std::unique_ptr<sequence_manager::SequenceManager>
CreateSequenceManagerForMainThreadType(
    TaskEnvironment::MainThreadType main_thread_type,
    sequence_manager::SequenceManager::PrioritySettings priority_settings) {
  const auto pump_type = GetMessagePumpTypeForMainThreadType(main_thread_type);

  auto settings = base::sequence_manager::SequenceManager::Settings::Builder()
                      .SetMessagePumpType(pump_type)
                      .SetPrioritySettings(std::move(priority_settings))
                      .Build();

  return sequence_manager::CreateSequenceManagerOnCurrentThreadWithPump(
      MessagePump::Create(pump_type), std::move(settings));
}
class TickClockBasedClock : public Clock {
public:
explicit TickClockBasedClock(const TickClock* tick_clock)
: tick_clock_(*tick_clock),
start_ticks_(tick_clock_->NowTicks()),
start_time_(Time::UnixEpoch()) {}
Time Now() const override {
return start_time_ + (tick_clock_->NowTicks() - start_ticks_);
}
private:
const raw_ref<const TickClock> tick_clock_;
const TimeTicks start_ticks_;
const Time start_time_;
};
}
// Task tracker handed to the ThreadPoolImpl created by
// TaskEnvironment::CreateThreadPool(). On top of TaskTrackerImpl it can hold
// back task execution (RunTask() blocks while |can_run_tasks_| is false) and
// it records the posting Location of every task currently running so that
// they can be listed in diagnostics.
class TaskEnvironment::TestTaskTracker
: public internal::ThreadPoolImpl::TaskTrackerImpl {
public:
TestTaskTracker();
TestTaskTracker(const TestTaskTracker&) = delete;
TestTaskTracker& operator=(const TestTaskTracker&) = delete;
// Allows tasks to run and wakes up threads blocked in RunTask(). Returns
// whether tasks were already allowed to run before the call.
bool AllowRunTasks();
// Waits up to |timeout| for running tasks to complete and, if none remain,
// disallows tasks from running and returns true. Returns false, without
// changing the state, if tasks were still running when the wait ended.
bool DisallowRunTasks(TimeDelta timeout = Milliseconds(1));
// Returns the current value of |can_run_tasks_|.
bool TasksAllowedToRun() const;
// Returns a human-readable list of the posting locations of running tasks.
std::string DescribeRunningTasks() const;
// Returns whether the caller is on the thread |controller_thread_checker_| is
// bound to.
bool OnControllerThread() const {
return controller_thread_checker_.CalledOnValidThread();
}
private:
// TaskEnvironment::DetachFromThread() accesses |controller_thread_checker_|.
friend class TaskEnvironment;
// internal::ThreadPoolImpl::TaskTrackerImpl:
void RunTask(internal::Task task,
internal::TaskSource* sequence,
const TaskTraits& traits) override;
void BeginCompleteShutdown(base::WaitableEvent& shutdown_event) override;
void AssertFlushForTestingAllowed() override;
// Synchronizes accesses to all members below that are GUARDED_BY it.
mutable Lock lock_;
// True if running tasks is allowed.
bool can_run_tasks_ GUARDED_BY(lock_) = true;
// Broadcast by AllowRunTasks(); waited on by RunTask().
ConditionVariable can_run_tasks_cv_ GUARDED_BY(lock_);
// Broadcast by RunTask() when a task completes; waited on by
// DisallowRunTasks().
ConditionVariable task_completed_cv_ GUARDED_BY(lock_);
// Number assigned to the next task to run; keys |running_tasks_|.
int64_t next_task_number_ GUARDED_BY(lock_) = 1;
// Posting location of each running task, keyed by task number.
base::flat_map<int64_t, Location> running_tasks_ GUARDED_BY(lock_);
// Backs OnControllerThread().
ThreadCheckerImpl controller_thread_checker_;
};
// TimeDomain whose NowTicks() is a manually advanced value (|now_ticks_|)
// rather than the system tick clock. At most one instance may exist at a time;
// it is reachable through |current_mock_time_domain_| so that the static
// GetTime()/GetTimeTicks() functions can be used as plain function pointers.
class TaskEnvironment::MockTimeDomain : public sequence_manager::TimeDomain {
public:
// Registers |this| as the current mock time domain. |sequence_manager| is the
// main thread's manager; it is asked to ScheduleWork() once thread pool work
// triggered by a fast-forward has been flushed.
explicit MockTimeDomain(
sequence_manager::internal::SequenceManagerImpl* sequence_manager)
: sequence_manager_(sequence_manager) {
DCHECK_EQ(nullptr, current_mock_time_domain_);
current_mock_time_domain_ = this;
}
~MockTimeDomain() override {
DCHECK_EQ(this, current_mock_time_domain_);
current_mock_time_domain_ = nullptr;
}
// The single live instance, or nullptr.
static MockTimeDomain* current_mock_time_domain_;
// Mock wall time: Time::UnixEpoch() plus the mock ticks elapsed since the
// null TimeTicks. Requires a live MockTimeDomain.
static Time GetTime() {
return Time::UnixEpoch() +
(current_mock_time_domain_->NowTicks() - TimeTicks());
}
// Mock ticks of the live MockTimeDomain. Requires a live MockTimeDomain.
static TimeTicks GetTimeTicks() {
return current_mock_time_domain_->NowTicks();
}
// Advances mock time by |delta| without running main thread tasks, then lets
// the thread pool (if any) process delayed tasks that became ripe.
void AdvanceClock(TimeDelta delta) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
{
AutoLock lock(now_ticks_lock_);
now_ticks_ += delta;
}
if (thread_pool_) {
thread_pool_->ProcessRipeDelayedTasksForTesting();
}
}
// Associates the thread pool and its task tracker with this time domain. May
// only be called once (DCHECKs that neither was previously set).
void SetThreadPool(internal::ThreadPoolImpl* thread_pool,
const TestTaskTracker* thread_pool_task_tracker) {
DCHECK(!thread_pool_);
DCHECK(!thread_pool_task_tracker_);
thread_pool_ = thread_pool;
thread_pool_task_tracker_ = thread_pool_task_tracker;
}
// sequence_manager::TimeDomain:
// Never fast-forwards when |quit_when_idle_requested|. Otherwise
// fast-forwards without a cap and reports whether the main thread has work
// as a result.
bool MaybeFastForwardToWakeUp(
absl::optional<sequence_manager::WakeUp> next_wake_up,
bool quit_when_idle_requested) override {
if (quit_when_idle_requested) {
return false;
}
return FastForwardToNextTaskOrCap(next_wake_up, TimeTicks::Max()) ==
NextTaskSource::kMainThreadHasWork;
}
const char* GetName() const override { return "MockTimeDomain"; }
// Returns the current mock ticks; takes |now_ticks_lock_|.
TimeTicks NowTicks() const override {
AutoLock lock(now_ticks_lock_);
return now_ticks_;
}
// Outcome of FastForwardToNextTaskOrCap().
enum class NextTaskSource {
// No task at or before the cap.
kNone,
// Time was advanced to a main thread wake-up (the thread pool may also have
// a task at that same time).
kMainThreadHasWork,
// Time was advanced to a task that belongs to the thread pool only.
kThreadPoolOnly,
};
// Advances mock time to the earliest of |next_main_thread_wake_up| and the
// thread pool's next scheduled run time, provided it is not after
// |fast_forward_cap|. If there is no such task, advances to
// |fast_forward_cap| instead (unless the cap is TimeTicks::Max()). Mock time
// never moves backwards. Returns where the next task comes from.
NextTaskSource FastForwardToNextTaskOrCap(
absl::optional<sequence_manager::WakeUp> next_main_thread_wake_up,
TimeTicks fast_forward_cap) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Thread pool delayed tasks are only considered while the pool is allowed to
// run tasks.
absl::optional<TimeTicks> next_thread_pool_task_time;
if (thread_pool_ && thread_pool_task_tracker_->TasksAllowedToRun()) {
next_thread_pool_task_time =
thread_pool_->NextScheduledRunTimeForTesting();
}
// Minimum of the two candidate times where an empty optional means "no
// task" (i.e. compares as the largest value).
absl::optional<TimeTicks> next_task_time;
if (!next_main_thread_wake_up) {
next_task_time = next_thread_pool_task_time;
} else if (!next_thread_pool_task_time) {
next_task_time = next_main_thread_wake_up->time;
} else {
next_task_time =
std::min(next_main_thread_wake_up->time, *next_thread_pool_task_time);
}
if (next_task_time && *next_task_time <= fast_forward_cap) {
{
AutoLock lock(now_ticks_lock_);
// std::max() keeps time monotonic should |next_task_time| be in the past.
now_ticks_ = std::max(now_ticks_, *next_task_time);
}
// |next_task_time| is set here, so equality implies
// |next_thread_pool_task_time| is set, which implies |thread_pool_| is
// non-null.
if (next_task_time == next_thread_pool_task_time) {
thread_pool_->ProcessRipeDelayedTasksForTesting();
}
if (next_main_thread_wake_up &&
next_task_time == next_main_thread_wake_up->time) {
return NextTaskSource::kMainThreadHasWork;
}
// The next task belongs to the thread pool only: ask for the main thread's
// SequenceManager to be woken up (ScheduleWork) once the pool is flushed.
thread_pool_->FlushAsyncForTesting(BindOnce(
&sequence_manager::internal::SequenceManagerImpl::ScheduleWork,
Unretained(sequence_manager_)));
return NextTaskSource::kThreadPoolOnly;
}
// No task within the cap: jump to the cap itself, unless it is unbounded.
if (!fast_forward_cap.is_max()) {
AutoLock lock(now_ticks_lock_);
now_ticks_ = std::max(now_ticks_, fast_forward_cap);
}
return NextTaskSource::kNone;
}
private:
SEQUENCE_CHECKER(sequence_checker_);
// Set by SetThreadPool(); null when there is no thread pool.
raw_ptr<internal::ThreadPoolImpl, DanglingUntriaged> thread_pool_ = nullptr;
raw_ptr<const TestTaskTracker, DanglingUntriaged> thread_pool_task_tracker_ =
nullptr;
const raw_ptr<sequence_manager::internal::SequenceManagerImpl,
DanglingUntriaged>
sequence_manager_;
// Protects |now_ticks_|.
mutable Lock now_ticks_lock_;
// Current mock ticks. Starts from the real (non-overridden) tick count
// snapped to the next millisecond tick.
TimeTicks now_ticks_ GUARDED_BY(now_ticks_lock_){
base::subtle::TimeTicksNowIgnoringOverride()
.SnappedToNextTick(TimeTicks(), Milliseconds(1))};
};
// Definition of the static pointer to the live MockTimeDomain. It is set by
// the MockTimeDomain constructor and cleared by its destructor.
TaskEnvironment::MockTimeDomain*
TaskEnvironment::MockTimeDomain::current_mock_time_domain_ = nullptr;
// Sets up the main thread's SequenceManager, the optional mock time
// machinery and, unless |threading_mode| is MAIN_THREAD_ONLY, the thread
// pool. When |subclass_creates_default_taskrunner| is true, creation of the
// default task queue/runner is skipped here; DeferredInitFromSubclass()
// supplies the task runner instead.
TaskEnvironment::TaskEnvironment(
sequence_manager::SequenceManager::PrioritySettings priority_settings,
TimeSource time_source,
MainThreadType main_thread_type,
ThreadPoolExecutionMode thread_pool_execution_mode,
ThreadingMode threading_mode,
ThreadPoolCOMEnvironment thread_pool_com_environment,
bool subclass_creates_default_taskrunner,
trait_helpers::NotATraitTag)
: main_thread_type_(main_thread_type),
thread_pool_execution_mode_(thread_pool_execution_mode),
threading_mode_(threading_mode),
thread_pool_com_environment_(thread_pool_com_environment),
subclass_creates_default_taskrunner_(subclass_creates_default_taskrunner),
sequence_manager_(
CreateSequenceManagerForMainThreadType(main_thread_type,
std::move(priority_settings))),
// A MockTimeDomain is created for every TimeSource other than SYSTEM_TIME.
mock_time_domain_(
time_source != TimeSource::SYSTEM_TIME
? std::make_unique<TaskEnvironment::MockTimeDomain>(
static_cast<
sequence_manager::internal::SequenceManagerImpl*>(
sequence_manager_.get()))
: nullptr),
// Clock overrides backed by the MockTimeDomain are installed only for
// TimeSource::MOCK_TIME; the third override is left null.
time_overrides_(time_source == TimeSource::MOCK_TIME
? std::make_unique<subtle::ScopedTimeClockOverrides>(
&MockTimeDomain::GetTime,
&MockTimeDomain::GetTimeTicks,
nullptr)
: nullptr),
// The mock Clock exists whenever the mock time domain does.
mock_clock_(mock_time_domain_ ? std::make_unique<TickClockBasedClock>(
mock_time_domain_.get())
: nullptr),
scoped_lazy_task_runner_list_for_testing_(
std::make_unique<internal::ScopedLazyTaskRunnerListForTesting>()),
// A RunLoop timeout is only installed when there is no mock time domain. On
// timeout it is given a callback describing the main thread's pending tasks.
run_loop_timeout_(
mock_time_domain_
? nullptr
: std::make_unique<ScopedRunLoopTimeout>(
FROM_HERE,
TestTimeouts::action_timeout(),
BindRepeating(&sequence_manager::SequenceManager::
DescribeAllPendingTasks,
Unretained(sequence_manager_.get())))) {
// No default task runner may already be set on this thread.
CHECK(!base::SingleThreadTaskRunner::HasCurrentDefault());
if (!subclass_creates_default_taskrunner) {
task_queue_ =
sequence_manager_->CreateTaskQueue(sequence_manager::TaskQueue::Spec(
sequence_manager::QueueName::TASK_ENVIRONMENT_DEFAULT_TQ));
task_runner_ = task_queue_->task_runner();
sequence_manager_->SetDefaultTaskRunner(task_runner_);
if (mock_time_domain_) {
sequence_manager_->SetTimeDomain(mock_time_domain_.get());
}
CHECK(base::SingleThreadTaskRunner::HasCurrentDefault())
<< "SingleThreadTaskRunner::CurrentDefaultHandle should've been set "
"now.";
CompleteInitialization();
}
if (threading_mode_ != ThreadingMode::MAIN_THREAD_ONLY) {
InitializeThreadPool();
}
// In QUEUED mode thread pool tasks start out blocked. |task_tracker_| is only
// set by InitializeThreadPool(), hence the null check.
if (thread_pool_execution_mode_ == ThreadPoolExecutionMode::QUEUED &&
task_tracker_) {
CHECK(task_tracker_->DisallowRunTasks());
}
}
// Installs a new ThreadPoolImpl, driven by a fresh TestTaskTracker, as the
// process' ThreadPoolInstance and publishes the tracker through
// |g_task_tracker|. CHECKs that no ThreadPoolInstance is installed yet.
// Returns the tracker, which is owned by the thread pool. The pool is not
// started here.
TaskEnvironment::TestTaskTracker* TaskEnvironment::CreateThreadPool() {
  CHECK(!ThreadPoolInstance::Get())
      << "Someone has already installed a ThreadPoolInstance. If nothing in "
         "your test does so, then a test that ran earlier may have installed "
         "one and leaked it. base::TestSuite will trap leaked globals, unless "
         "someone has explicitly disabled it with "
         "DisableCheckForLeakedGlobals().";

  auto owned_tracker = std::make_unique<TestTaskTracker>();
  // Keep a raw pointer around: ownership moves into the thread pool below.
  TestTaskTracker* const tracker = owned_tracker.get();

  ThreadPoolInstance::Set(std::make_unique<internal::ThreadPoolImpl>(
      std::string(), std::move(owned_tracker), false));

  DCHECK(!g_task_tracker);
  g_task_tracker = tracker;
  return tracker;
}
void TaskEnvironment::InitializeThreadPool() {
#if BUILDFLAG(ENABLE_BASE_TRACING)
trace_event::TraceLog::GetInstance();
#endif
task_tracker_ = CreateThreadPool();
if (mock_time_domain_) {
mock_time_domain_->SetThreadPool(
static_cast<internal::ThreadPoolImpl*>(ThreadPoolInstance::Get()),
task_tracker_);
}
ThreadPoolInstance::InitParams init_params(kNumForegroundThreadPoolThreads);
init_params.suggested_reclaim_time = TimeDelta::Max();
#if BUILDFLAG(IS_WIN)
if (thread_pool_com_environment_ == ThreadPoolCOMEnvironment::COM_MTA) {
init_params.common_thread_pool_environment =
ThreadPoolInstance::InitParams::CommonThreadPoolEnvironment::COM_MTA;
}
#endif
ThreadPoolInstance::Get()->Start(init_params);
}
// Final initialization step that needs the main thread task runner: on
// POSIX/Fuchsia with MainThreadType::IO, creates the FileDescriptorWatcher.
void TaskEnvironment::CompleteInitialization() {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);

#if BUILDFLAG(IS_POSIX) || BUILDFLAG(IS_FUCHSIA)
  if (main_thread_type() != MainThreadType::IO) {
    return;
  }
  file_descriptor_watcher_ =
      std::make_unique<FileDescriptorWatcher>(GetMainThreadTaskRunner());
#endif
}
// Defaulted (member-wise) move constructor.
// NOTE(review): DestroyTaskEnvironment() returns early when |owns_instance_|
// is unset, which presumably is the state of a moved-from instance - confirm
// against the declaration of |owns_instance_|.
TaskEnvironment::TaskEnvironment(TaskEnvironment&& other) = default;
// Tears the environment down through DestroyTaskEnvironment(), which is a
// no-op if this instance no longer owns the environment.
TaskEnvironment::~TaskEnvironment() {
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
DestroyTaskEnvironment();
}
// Tears down everything this TaskEnvironment set up. Only the first call on an
// owning instance does any work; later calls return immediately.
void TaskEnvironment::DestroyTaskEnvironment() {
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
// Nothing to do if ownership was already given up.
if (!owns_instance_) {
return;
}
owns_instance_.reset();
// Observers are notified while the environment is still fully functional.
for (auto& observer : GetDestructionObservers()) {
observer.WillDestroyCurrentTaskEnvironment();
}
// Teardown order: the thread pool is flushed, shut down and joined first, then
// the main thread's queue and SequenceManager are released, and only then is
// the ThreadPoolInstance itself destroyed.
ShutdownAndJoinThreadPool();
task_queue_ = nullptr;
sequence_manager_.reset();
DestroyThreadPool();
}
void TaskEnvironment::ShutdownAndJoinThreadPool() {
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
if (threading_mode_ == ThreadingMode::MAIN_THREAD_ONLY) {
return;
}
DCHECK(ThreadPoolInstance::Get());
task_tracker_->AllowRunTasks();
ThreadPoolInstance::Get()->FlushForTesting();
ThreadPoolInstance::Get()->Shutdown();
ThreadPoolInstance::Get()->JoinForTesting();
DCHECK_EQ(g_task_tracker, task_tracker_);
g_task_tracker = nullptr;
}
// Destroys the ThreadPoolInstance installed by this TaskEnvironment. No-op in
// ThreadingMode::MAIN_THREAD_ONLY.
void TaskEnvironment::DestroyThreadPool() {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);

  if (threading_mode_ != ThreadingMode::MAIN_THREAD_ONLY) {
    DCHECK(ThreadPoolInstance::Get());

    // The lazy task runner list is released before the instance is unset.
    scoped_lazy_task_runner_list_for_testing_.reset();

    // Base sync primitives are explicitly allowed on this thread for the
    // duration of the thread pool's destruction.
    ScopedAllowBaseSyncPrimitivesForTesting allow_waits_to_destroy_task_tracker;
    ThreadPoolInstance::Set(nullptr);
  }
}
// Returns the mock time domain, or nullptr when none was created (i.e. the
// environment was constructed with TimeSource::SYSTEM_TIME).
sequence_manager::TimeDomain* TaskEnvironment::GetMockTimeDomain() const {
return mock_time_domain_.get();
}
// Returns the main thread's SequenceManager. DCHECKs that this environment was
// constructed with |subclass_creates_default_taskrunner| set.
sequence_manager::SequenceManager* TaskEnvironment::sequence_manager() const {
DCHECK(subclass_creates_default_taskrunner_);
return sequence_manager_.get();
}
// Completes the initialization that the constructor skips when
// |subclass_creates_default_taskrunner| is true: adopts |task_runner| as the
// main thread task runner, makes it the SequenceManager's default task runner
// and runs CompleteInitialization().
void TaskEnvironment::DeferredInitFromSubclass(
scoped_refptr<base::SingleThreadTaskRunner> task_runner) {
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
task_runner_ = std::move(task_runner);
sequence_manager_->SetDefaultTaskRunner(task_runner_);
CompleteInitialization();
}
// Returns the main thread's task runner. DCHECKs that it has been set, either
// by the constructor or by DeferredInitFromSubclass().
scoped_refptr<base::SingleThreadTaskRunner>
TaskEnvironment::GetMainThreadTaskRunner() {
DCHECK(task_runner_);
return task_runner_;
}
// Returns the result of the main thread SequenceManager's IsIdleForTesting(),
// queried right after a ReclaimMemory() pass.
bool TaskEnvironment::MainThreadIsIdle() const {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);

  auto* const manager_impl =
      static_cast<sequence_manager::internal::SequenceManagerImpl*>(
          sequence_manager_.get());
  manager_impl->ReclaimMemory();
  return manager_impl->IsIdleForTesting();
}
// Returns a closure that quits the RunLoop used by the next RunUntilQuit()
// call. The RunLoop is created on first use and shared by subsequent calls
// until RunUntilQuit() resets it.
RepeatingClosure TaskEnvironment::QuitClosure() {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);

  if (run_until_quit_loop_ == nullptr) {
    run_until_quit_loop_ =
        std::make_unique<RunLoop>(RunLoop::Type::kNestableTasksAllowed);
  }
  return run_until_quit_loop_->QuitClosure();
}
// Runs the RunLoop created by QuitClosure() until that closure is invoked.
// QuitClosure() must have been called first. Thread pool tasks are allowed to
// run for the duration of the call; if they were disallowed beforehand, that
// state is restored afterwards (with a test failure if running tasks prevent
// it within TestTimeouts::action_max_timeout()).
void TaskEnvironment::RunUntilQuit() {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
  DCHECK(run_until_quit_loop_)
      << "QuitClosure() not called before RunUntilQuit()";

  // |task_tracker_| is only set by InitializeThreadPool(), which is skipped in
  // ThreadingMode::MAIN_THREAD_ONLY. Without a thread pool there is nothing to
  // unblock now nor to restore later, so treat tasks as "already allowed".
  const bool could_run_tasks =
      !task_tracker_ || task_tracker_->AllowRunTasks();

  run_until_quit_loop_->Run();
  // Drop the RunLoop so that the next QuitClosure() creates a fresh one.
  run_until_quit_loop_.reset();

  // |could_run_tasks| can only be false when |task_tracker_| is non-null.
  if (!could_run_tasks) {
    EXPECT_TRUE(
        task_tracker_->DisallowRunTasks(TestTimeouts::action_max_timeout()))
        << "Could not bring ThreadPool back to ThreadPoolExecutionMode::QUEUED "
           "after Quit() because some tasks were long running:\n"
        << task_tracker_->DescribeRunningTasks();
  }
}
// Runs main thread and thread pool tasks until neither has anything left to
// do. If thread pool tasks were disallowed on entry, they are disallowed again
// on return.
void TaskEnvironment::RunUntilIdle() {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);

  // Without a thread pool, a single main thread pass is all there is to do.
  if (threading_mode_ == ThreadingMode::MAIN_THREAD_ONLY) {
    RunLoop(RunLoop::Type::kNestableTasksAllowed).RunUntilIdle();
    return;
  }

  const bool could_run_tasks = task_tracker_->AllowRunTasks();

  while (true) {
    // Run the main thread while the thread pool is free to run.
    task_tracker_->AllowRunTasks();
    RunLoop(RunLoop::Type::kNestableTasksAllowed).RunUntilIdle();

    // Try to halt the thread pool. Failure means pool tasks were still
    // running, so go around again.
    if (task_tracker_->DisallowRunTasks()) {
      // With the pool halted, drain whatever it posted to the main thread.
      RunLoop(RunLoop::Type::kNestableTasksAllowed).RunUntilIdle();

      // Done once the pool has no incomplete task sources left.
      if (!task_tracker_->HasIncompleteTaskSourcesForTesting()) {
        break;
      }
    }
  }

  // The loop exits with tasks disallowed; undo that if they were allowed on
  // entry.
  if (could_run_tasks) {
    task_tracker_->AllowRunTasks();
  }
}
// Advances mock time by |delta| (which must be non-negative), running every
// task that becomes due along the way. Requires a mock time domain. Thread
// pool tasks are allowed to run during the call; if they were disallowed
// beforehand, they are disallowed again before returning.
void TaskEnvironment::FastForwardBy(TimeDelta delta) {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
  DCHECK(mock_time_domain_);
  DCHECK_GE(delta, TimeDelta());

  const bool could_run_tasks = task_tracker_ && task_tracker_->AllowRunTasks();
  const TimeTicks deadline = mock_time_domain_->NowTicks() + delta;

  for (;;) {
    RunUntilIdle();
    // Reclaim memory before asking for the next delayed wake-up.
    sequence_manager_->ReclaimMemory();

    const MockTimeDomain::NextTaskSource next_source =
        mock_time_domain_->FastForwardToNextTaskOrCap(
            sequence_manager_->GetNextDelayedWakeUp(), deadline);
    if (next_source == MockTimeDomain::NextTaskSource::kNone) {
      break;
    }
  }

  if (task_tracker_ && !could_run_tasks) {
    task_tracker_->DisallowRunTasks();
  }
}
// Fast-forwards with an unbounded delta, i.e. until FastForwardBy() finds no
// further task to advance to.
void TaskEnvironment::FastForwardUntilNoTasksRemain() {
FastForwardBy(TimeDelta::Max());
}
// Advances mock time by |delta| (which must be non-negative) through
// MockTimeDomain::AdvanceClock(). Requires a mock time domain.
void TaskEnvironment::AdvanceClock(TimeDelta delta) {
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
DCHECK(mock_time_domain_);
DCHECK_GE(delta, TimeDelta());
mock_time_domain_->AdvanceClock(delta);
}
// Returns the mock time domain as a TickClock. Requires a mock time domain.
const TickClock* TaskEnvironment::GetMockTickClock() const {
DCHECK(mock_time_domain_);
return mock_time_domain_.get();
}
// Returns the mock time domain's current ticks. Requires a mock time domain.
base::TimeTicks TaskEnvironment::NowTicks() const {
DCHECK(mock_time_domain_);
return mock_time_domain_->NowTicks();
}
// Returns the mock Clock. DCHECKs that it exists; the constructor only creates
// it alongside the mock time domain.
const Clock* TaskEnvironment::GetMockClock() const {
DCHECK(mock_clock_);
return mock_clock_.get();
}
// Returns the main thread SequenceManager's pending task count, queried right
// after a ReclaimMemory() pass.
size_t TaskEnvironment::GetPendingMainThreadTaskCount() const {
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
sequence_manager_->ReclaimMemory();
return sequence_manager_->GetPendingTaskCountForTesting();
}
// Returns how far in the mock future the next main thread task is:
//  - zero if the main thread isn't idle,
//  - TimeDelta::Max() if there is no delayed wake-up,
//  - otherwise the wake-up time minus the current mock ticks.
// Requires a mock time domain.
TimeDelta TaskEnvironment::NextMainThreadPendingTaskDelay() const {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);

  sequence_manager_->ReclaimMemory();
  DCHECK(mock_time_domain_);
  LazyNow lazy_now(mock_time_domain_->NowTicks());

  if (!sequence_manager_->IsIdleForTesting()) {
    return TimeDelta();
  }

  const absl::optional<sequence_manager::WakeUp> next_wake_up =
      sequence_manager_->GetNextDelayedWakeUp();
  if (!next_wake_up) {
    return TimeDelta::Max();
  }
  return next_wake_up->time - lazy_now.Now();
}
bool TaskEnvironment::NextTaskIsDelayed() const {
DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);
TimeDelta delay = NextMainThreadPendingTaskDelay();
return !delay.is_zero() && !delay.is_max();
}
// Logs (at INFO level) the thread pool tasks currently running, if there is a
// thread pool, followed by the main thread's pending tasks.
void TaskEnvironment::DescribeCurrentTasks() const {
  DCHECK_CALLED_ON_VALID_THREAD(main_thread_checker_);

  // |task_tracker_| is only set by InitializeThreadPool(), which is skipped in
  // ThreadingMode::MAIN_THREAD_ONLY; there are no thread pool tasks to describe
  // in that mode.
  if (task_tracker_) {
    LOG(INFO) << task_tracker_->DescribeRunningTasks();
  }
  LOG(INFO) << sequence_manager_->DescribeAllPendingTasks();
}
// Detaches the main thread checker and, when a thread pool exists, the task
// tracker's controller thread checker from their current thread.
void TaskEnvironment::DetachFromThread() {
  DETACH_FROM_THREAD(main_thread_checker_);

  if (!task_tracker_) {
    return;
  }
  task_tracker_->controller_thread_checker_.DetachFromThread();
}
// Registers |observer| in the shared destruction observer list; it is notified
// from DestroyTaskEnvironment().
void TaskEnvironment::AddDestructionObserver(DestructionObserver* observer) {
GetDestructionObservers().AddObserver(observer);
}
// Removes |observer| from the shared destruction observer list.
void TaskEnvironment::RemoveDestructionObserver(DestructionObserver* observer) {
GetDestructionObservers().RemoveObserver(observer);
}
// Halts thread pool task execution until this fence is destroyed. Must be
// created on the task tracker's controller thread (CHECKed, with
// |error_message| appended to the failure). Only warns if a ThreadPoolInstance
// exists that isn't managed by a TaskEnvironment.
TaskEnvironment::ParallelExecutionFence::ParallelExecutionFence(
    const char* error_message) {
  CHECK(!g_task_tracker || g_task_tracker->OnControllerThread())
      << error_message;

  if (!g_task_tracker) {
    // No tracker to halt; a foreign thread pool can't be fenced.
    if (ThreadPoolInstance::Get()) {
      LOG(WARNING)
          << "ParallelExecutionFence is ineffective when ThreadPoolInstance is "
             "not managed by a TaskEnvironment.\n"
             "Test fixtures should use a TaskEnvironment member or statically "
             "invoke TaskEnvironment::CreateThreadPool() + "
             "ThreadPoolInstance::Get()->StartWithDefaultParams() when the "
             "former is not possible.";
    }
    return;
  }

  // Nothing is fenced if tasks were already disallowed or shutdown completed.
  previously_allowed_to_run_ = g_task_tracker->TasksAllowedToRun() &&
                               !g_task_tracker->IsShutdownComplete();
  if (!previously_allowed_to_run_) {
    return;
  }

  // Keep retrying, logging what is holding the fence up after each failed
  // 5 second attempt.
  while (!g_task_tracker->DisallowRunTasks(Seconds(5))) {
    LOG(WARNING) << "Installing ParallelExecutionFence is slow because of "
                    "these running tasks:\n"
                 << g_task_tracker->DescribeRunningTasks()
                 << "\nParallelExecutionFence requested by:\n"
                 << debug::StackTrace();
  }
}
// Re-allows thread pool tasks to run, but only if the constructor found them
// allowed when the fence was installed.
TaskEnvironment::ParallelExecutionFence::~ParallelExecutionFence() {
  if (!previously_allowed_to_run_) {
    return;
  }
  g_task_tracker->AllowRunTasks();
}
// Binds both condition variables to |lock_| and declares them as only used
// while idle.
TaskEnvironment::TestTaskTracker::TestTaskTracker()
: can_run_tasks_cv_(&lock_), task_completed_cv_(&lock_) {
can_run_tasks_cv_.declare_only_used_while_idle();
task_completed_cv_.declare_only_used_while_idle();
}
bool TaskEnvironment::TestTaskTracker::AllowRunTasks() {
AutoLock auto_lock(lock_);
const bool could_run_tasks = can_run_tasks_;
can_run_tasks_ = true;
can_run_tasks_cv_.Broadcast();
return could_run_tasks;
}
// Returns whether tasks are currently allowed to run (read under |lock_|).
bool TaskEnvironment::TestTaskTracker::TasksAllowedToRun() const {
AutoLock auto_lock(lock_);
return can_run_tasks_;
}
// Waits up to |timeout| (measured on the real, non-overridden tick clock) for
// all running tasks to complete. Returns true, with tasks now disallowed, if
// none remain. Returns false, leaving the state untouched, if tasks were still
// running once the wait ended. Must be called on the controller thread.
bool TaskEnvironment::TestTaskTracker::DisallowRunTasks(TimeDelta timeout) {
  DCHECK(OnControllerThread());

  AutoLock auto_lock(lock_);

  TimeTicks now = subtle::TimeTicksNowIgnoringOverride();
  const TimeTicks end = now + timeout;
  while (!running_tasks_.empty() && now < end) {
    // Woken by RunTask() on task completion, or by the remaining time elapsing.
    task_completed_cv_.TimedWait(end - now);
    now = subtle::TimeTicksNowIgnoringOverride();
  }

  if (!running_tasks_.empty()) {
    DCHECK(!IsShutdownComplete());
    return false;
  }

  can_run_tasks_ = false;
  return true;
}
// internal::ThreadPoolImpl::TaskTrackerImpl:
// Blocks until tasks are allowed to run, then runs |task| through the base
// implementation while tracking it in |running_tasks_|. Adds a test failure if
// the task took longer than TestTimeouts::action_max_timeout().
void TaskEnvironment::TestTaskTracker::RunTask(internal::Task task,
                                               internal::TaskSource* sequence,
                                               const TaskTraits& traits) {
  // Copied up front: |task| is moved into the base RunTask() below.
  const Location posted_from = task.posted_from;

  // Same type as |next_task_number_| and the key of |running_tasks_| so that
  // the task number is never narrowed.
  int64_t task_number = 0;
  {
    AutoLock auto_lock(lock_);

    while (!can_run_tasks_) {
      can_run_tasks_cv_.Wait();
    }

    task_number = next_task_number_++;
    auto pair = running_tasks_.emplace(task_number, posted_from);
    // False would mean |task_number| was already present.
    CHECK(pair.second);
  }

  // The duration is measured with the non-overridden tick clock.
  base::TimeTicks before = base::subtle::TimeTicksNowIgnoringOverride();
  internal::ThreadPoolImpl::TaskTrackerImpl::RunTask(std::move(task), sequence,
                                                     traits);
  base::TimeTicks after = base::subtle::TimeTicksNowIgnoringOverride();

  const TimeDelta kTimeout = TestTimeouts::action_max_timeout();
  if ((after - before) > kTimeout) {
    ADD_FAILURE() << "TaskEnvironment: RunTask took more than "
                  << kTimeout.InSeconds() << " seconds. Posted from "
                  << posted_from.ToString();
  }

  {
    AutoLock auto_lock(lock_);
    CHECK(can_run_tasks_);
    size_t found = running_tasks_.erase(task_number);
    CHECK_EQ(1u, found);

    // Wake DisallowRunTasks(), which waits for running tasks to complete.
    task_completed_cv_.Broadcast();
  }
}
std::string TaskEnvironment::TestTaskTracker::DescribeRunningTasks() const {
base::flat_map<int64_t, Location> running_tasks_copy;
{
AutoLock auto_lock(lock_);
running_tasks_copy = running_tasks_;
}
std::string running_tasks_str = "ThreadPool currently running tasks:";
if (running_tasks_copy.empty()) {
running_tasks_str += " none.";
} else {
for (auto& pair : running_tasks_copy) {
running_tasks_str += "\n Task posted from: " + pair.second.ToString();
}
}
return running_tasks_str;
}
// internal::ThreadPoolImpl::TaskTrackerImpl:
// Waits up to TestTimeouts::action_max_timeout() for |shutdown_event|. If the
// wait times out, reports the running tasks as a test failure and terminates
// the process immediately.
void TaskEnvironment::TestTaskTracker::BeginCompleteShutdown(
    base::WaitableEvent& shutdown_event) {
  const TimeDelta kTimeout = TestTimeouts::action_max_timeout();

  if (!shutdown_event.TimedWait(kTimeout)) {
    const std::string running_tasks = DescribeRunningTasks();
    ADD_FAILURE() << "TaskEnvironment: CompleteShutdown took more than "
                  << kTimeout.InSeconds() << " seconds.\n"
                  << running_tasks;
    base::Process::TerminateCurrentProcessImmediately(-1);
  }
}
// internal::ThreadPoolImpl::TaskTrackerImpl:
// Raises a fatal gtest assertion failure if tasks are currently disallowed
// from running.
void TaskEnvironment::TestTaskTracker::AssertFlushForTestingAllowed() {
AutoLock auto_lock(lock_);
ASSERT_TRUE(can_run_tasks_)
<< "FlushForTesting() requires ThreadPool tasks to be allowed to run or "
"it will hang. Note: DisallowRunTasks happens implicitly on-and-off "
"during TaskEnvironment::RunUntilIdle and main thread tasks running "
"under it should thus never FlushForTesting().";
}
}
}