#include "base/task/thread_pool/task_tracker.h"
#include <stdint.h>
#include <memory>
#include <utility>
#include <vector>
#include "base/barrier_closure.h"
#include "base/check_op.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/ref_counted.h"
#include "base/metrics/histogram_base.h"
#include "base/metrics/histogram_samples.h"
#include "base/sequence_token.h"
#include "base/synchronization/atomic_flag.h"
#include "base/task/common/checked_lock.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool/task.h"
#include "base/task/thread_pool/test_utils.h"
#include "base/test/bind.h"
#include "base/test/gtest_util.h"
#include "base/test/metrics/histogram_tester.h"
#include "base/test/test_simple_task_runner.h"
#include "base/test/test_timeouts.h"
#include "base/test/test_waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread_restrictions.h"
#include "testing/gmock/include/gmock/gmock.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base {
namespace internal {
namespace {
constexpr size_t kLoadTestNumIterations = 75;
class CallbackThread : public SimpleThread {
public:
explicit CallbackThread(OnceClosure closure)
: SimpleThread("CallbackThread"), closure_(std::move(closure)) {}
CallbackThread(const CallbackThread&) = delete;
CallbackThread& operator=(const CallbackThread&) = delete;
bool has_returned() { return has_returned_.IsSet(); }
private:
void Run() override {
std::move(closure_).Run();
has_returned_.Set();
}
OnceClosure closure_;
AtomicFlag has_returned_;
};
class ThreadPostingAndRunningTask : public SimpleThread {
public:
enum class Action {
WILL_POST,
RUN,
WILL_POST_AND_RUN,
};
ThreadPostingAndRunningTask(TaskTracker* tracker,
scoped_refptr<Sequence> sequence,
Action action,
bool expect_post_succeeds,
Task task)
: SimpleThread("ThreadPostingAndRunningTask"),
tracker_(tracker),
task_(std::move(task)),
sequence_(std::move(sequence)),
action_(action),
expect_post_succeeds_(expect_post_succeeds) {
EXPECT_TRUE(task_.task);
EXPECT_TRUE(sequence_);
EXPECT_NE(Action::RUN, action_);
}
ThreadPostingAndRunningTask(TaskTracker* tracker,
RegisteredTaskSource task_source)
: SimpleThread("ThreadPostingAndRunningTask"),
tracker_(tracker),
task_source_(std::move(task_source)),
action_(Action::RUN),
expect_post_succeeds_(false) {
EXPECT_TRUE(task_source_);
}
ThreadPostingAndRunningTask(const ThreadPostingAndRunningTask&) = delete;
ThreadPostingAndRunningTask& operator=(const ThreadPostingAndRunningTask&) =
delete;
RegisteredTaskSource TakeTaskSource() { return std::move(task_source_); }
private:
void Run() override {
bool post_and_queue_succeeded = true;
if (action_ == Action::WILL_POST || action_ == Action::WILL_POST_AND_RUN) {
EXPECT_TRUE(task_.task);
post_and_queue_succeeded =
tracker_->WillPostTask(&task_, sequence_->shutdown_behavior());
{
auto transaction = sequence_->BeginTransaction();
transaction.WillPushImmediateTask();
transaction.PushImmediateTask(std::move(task_));
}
task_source_ = tracker_->RegisterTaskSource(std::move(sequence_));
post_and_queue_succeeded &= !!task_source_;
EXPECT_EQ(expect_post_succeeds_, post_and_queue_succeeded);
}
if (post_and_queue_succeeded &&
(action_ == Action::RUN || action_ == Action::WILL_POST_AND_RUN)) {
EXPECT_TRUE(task_source_);
task_source_.WillRunTask();
EXPECT_FALSE(tracker_->RunAndPopNextTask(std::move(task_source_)));
}
}
const raw_ptr<TaskTracker> tracker_;
Task task_;
scoped_refptr<Sequence> sequence_;
RegisteredTaskSource task_source_;
const Action action_;
const bool expect_post_succeeds_;
};
class ThreadPoolTaskTrackerTest
: public testing::TestWithParam<TaskShutdownBehavior> {
public:
ThreadPoolTaskTrackerTest(const ThreadPoolTaskTrackerTest&) = delete;
ThreadPoolTaskTrackerTest& operator=(const ThreadPoolTaskTrackerTest&) =
delete;
protected:
ThreadPoolTaskTrackerTest() = default;
Task CreateTask() {
return Task(
FROM_HERE,
BindOnce(&ThreadPoolTaskTrackerTest::RunTaskCallback, Unretained(this)),
TimeTicks::Now(), TimeDelta());
}
RegisteredTaskSource WillPostTaskAndQueueTaskSource(
Task task,
const TaskTraits& traits) {
if (!tracker_.WillPostTask(&task, traits.shutdown_behavior()))
return nullptr;
auto sequence = test::CreateSequenceWithTask(std::move(task), traits);
return tracker_.RegisterTaskSource(std::move(sequence));
}
RegisteredTaskSource RunAndPopNextTask(RegisteredTaskSource task_source) {
task_source.WillRunTask();
return tracker_.RunAndPopNextTask(std::move(task_source));
}
void ExpectAsyncCompleteShutdownBlocks() {
ASSERT_FALSE(thread_calling_shutdown_);
ASSERT_TRUE(tracker_.HasShutdownStarted());
thread_calling_shutdown_ = std::make_unique<CallbackThread>(
BindOnce(&TaskTracker::CompleteShutdown, Unretained(&tracker_)));
thread_calling_shutdown_->Start();
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
VerifyAsyncShutdownInProgress();
}
void WaitForAsyncIsShutdownComplete() {
ASSERT_TRUE(thread_calling_shutdown_);
thread_calling_shutdown_->Join();
EXPECT_TRUE(thread_calling_shutdown_->has_returned());
EXPECT_TRUE(tracker_.IsShutdownComplete());
}
void VerifyAsyncShutdownInProgress() {
ASSERT_TRUE(thread_calling_shutdown_);
EXPECT_FALSE(thread_calling_shutdown_->has_returned());
EXPECT_TRUE(tracker_.HasShutdownStarted());
EXPECT_FALSE(tracker_.IsShutdownComplete());
}
void CallFlushFromAnotherThread() {
threads_calling_flush_.push_back(std::make_unique<CallbackThread>(
BindOnce(&TaskTracker::FlushForTesting, Unretained(&tracker_))));
threads_calling_flush_.back()->Start();
}
void WaitForAsyncFlushesReturned() {
ASSERT_GE(threads_calling_flush_.size(), 1U);
for (auto& thread_calling_flush : threads_calling_flush_) {
thread_calling_flush->Join();
EXPECT_TRUE(thread_calling_flush->has_returned());
}
}
void VerifyAsyncFlushesInProgress() {
ASSERT_GE(threads_calling_flush_.size(), 1U);
for (auto& thread_calling_flush : threads_calling_flush_) {
EXPECT_FALSE(thread_calling_flush->has_returned());
}
}
size_t NumTasksExecuted() {
CheckedAutoLock auto_lock(lock_);
return num_tasks_executed_;
}
TaskTracker tracker_;
private:
void RunTaskCallback() {
CheckedAutoLock auto_lock(lock_);
++num_tasks_executed_;
}
std::unique_ptr<CallbackThread> thread_calling_shutdown_;
std::vector<std::unique_ptr<CallbackThread>> threads_calling_flush_;
CheckedLock lock_;
size_t num_tasks_executed_ = 0;
};
#define WAIT_FOR_ASYNC_SHUTDOWN_COMPLETED() \
do { \
SCOPED_TRACE(""); \
WaitForAsyncIsShutdownComplete(); \
} while (false)
#define VERIFY_ASYNC_SHUTDOWN_IN_PROGRESS() \
do { \
SCOPED_TRACE(""); \
VerifyAsyncShutdownInProgress(); \
} while (false)
#define WAIT_FOR_ASYNC_FLUSHES_RETURNED() \
do { \
SCOPED_TRACE(""); \
WaitForAsyncFlushesReturned(); \
} while (false)
#define VERIFY_ASYNC_FLUSHES_IN_PROGRESS() \
do { \
SCOPED_TRACE(""); \
VerifyAsyncFlushesInProgress(); \
} while (false)
}
TEST_P(ThreadPoolTaskTrackerTest, WillPostAndRunBeforeShutdown) {
Task task(CreateTask());
EXPECT_TRUE(tracker_.WillPostTask(&task, GetParam()));
EXPECT_EQ(0U, NumTasksExecuted());
test::QueueAndRunTaskSource(
&tracker_, test::CreateSequenceWithTask(std::move(task), {GetParam()}));
EXPECT_EQ(1U, NumTasksExecuted());
test::ShutdownTaskTracker(&tracker_);
}
TEST_P(ThreadPoolTaskTrackerTest, WillPostAndRunLongTaskBeforeShutdown) {
TestWaitableEvent task_running;
TestWaitableEvent task_barrier;
Task blocked_task(FROM_HERE, BindLambdaForTesting([&]() {
task_running.Signal();
task_barrier.Wait();
}),
TimeTicks::Now(), TimeDelta());
auto sequence =
WillPostTaskAndQueueTaskSource(std::move(blocked_task), {GetParam()});
EXPECT_TRUE(sequence);
ThreadPostingAndRunningTask thread_running_task(&tracker_,
std::move(sequence));
thread_running_task.Start();
task_running.Wait();
tracker_.StartShutdown();
if (GetParam() == TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN) {
tracker_.CompleteShutdown();
} else {
ExpectAsyncCompleteShutdownBlocks();
}
task_barrier.Signal();
thread_running_task.Join();
if (GetParam() != TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN)
WAIT_FOR_ASYNC_SHUTDOWN_COMPLETED();
}
TEST_P(ThreadPoolTaskTrackerTest, WillPostBeforeShutdownQueueDuringShutdown) {
Task task{CreateTask()};
EXPECT_TRUE(tracker_.WillPostTask(&task, GetParam()));
auto sequence = test::CreateSequenceWithTask(std::move(task), {GetParam()});
auto block_shutdown_sequence = WillPostTaskAndQueueTaskSource(
CreateTask(), {TaskShutdownBehavior::BLOCK_SHUTDOWN});
EXPECT_TRUE(block_shutdown_sequence);
tracker_.StartShutdown();
ExpectAsyncCompleteShutdownBlocks();
const bool should_run = GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN;
if (should_run) {
test::QueueAndRunTaskSource(&tracker_, std::move(sequence));
EXPECT_EQ(1U, NumTasksExecuted());
VERIFY_ASYNC_SHUTDOWN_IN_PROGRESS();
} else {
EXPECT_FALSE(tracker_.RegisterTaskSource(std::move(sequence)));
}
RunAndPopNextTask(std::move(block_shutdown_sequence));
EXPECT_EQ(should_run ? 2U : 1U, NumTasksExecuted());
WAIT_FOR_ASYNC_SHUTDOWN_COMPLETED();
}
TEST_P(ThreadPoolTaskTrackerTest, WillPostBeforeShutdownRunDuringShutdown) {
auto sequence = WillPostTaskAndQueueTaskSource(CreateTask(), {GetParam()});
EXPECT_TRUE(sequence);
auto block_shutdown_sequence = WillPostTaskAndQueueTaskSource(
CreateTask(), {TaskShutdownBehavior::BLOCK_SHUTDOWN});
EXPECT_TRUE(block_shutdown_sequence);
tracker_.StartShutdown();
ExpectAsyncCompleteShutdownBlocks();
EXPECT_EQ(0U, NumTasksExecuted());
const bool should_run = GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN;
RunAndPopNextTask(std::move(sequence));
EXPECT_EQ(should_run ? 1U : 0U, NumTasksExecuted());
VERIFY_ASYNC_SHUTDOWN_IN_PROGRESS();
RunAndPopNextTask(std::move(block_shutdown_sequence));
EXPECT_EQ(should_run ? 2U : 1U, NumTasksExecuted());
WAIT_FOR_ASYNC_SHUTDOWN_COMPLETED();
}
TEST_P(ThreadPoolTaskTrackerTest, WillPostBeforeShutdownRunAfterShutdown) {
auto sequence = WillPostTaskAndQueueTaskSource(CreateTask(), {GetParam()});
EXPECT_TRUE(sequence);
tracker_.StartShutdown();
EXPECT_EQ(0U, NumTasksExecuted());
if (GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
ExpectAsyncCompleteShutdownBlocks();
RunAndPopNextTask(std::move(sequence));
EXPECT_EQ(1U, NumTasksExecuted());
WAIT_FOR_ASYNC_SHUTDOWN_COMPLETED();
} else {
tracker_.CompleteShutdown();
RunAndPopNextTask(std::move(sequence));
EXPECT_EQ(0U, NumTasksExecuted());
}
}
TEST_P(ThreadPoolTaskTrackerTest, WillPostAndRunDuringShutdown) {
auto block_shutdown_sequence = WillPostTaskAndQueueTaskSource(
CreateTask(), {TaskShutdownBehavior::BLOCK_SHUTDOWN});
EXPECT_TRUE(block_shutdown_sequence);
tracker_.StartShutdown();
if (GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
auto sequence = WillPostTaskAndQueueTaskSource(CreateTask(), {GetParam()});
EXPECT_TRUE(sequence);
EXPECT_EQ(0U, NumTasksExecuted());
RunAndPopNextTask(std::move(sequence));
EXPECT_EQ(1U, NumTasksExecuted());
} else {
auto sequence = WillPostTaskAndQueueTaskSource(CreateTask(), {GetParam()});
EXPECT_FALSE(sequence);
}
ExpectAsyncCompleteShutdownBlocks();
RunAndPopNextTask(std::move(block_shutdown_sequence));
EXPECT_EQ(GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN ? 2U : 1U,
NumTasksExecuted());
WAIT_FOR_ASYNC_SHUTDOWN_COMPLETED();
}
TEST_P(ThreadPoolTaskTrackerTest, WillPostAfterShutdown) {
test::ShutdownTaskTracker(&tracker_);
Task task(CreateTask());
if (GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN) {
tracker_.BeginFizzlingBlockShutdownTasks();
EXPECT_FALSE(tracker_.WillPostTask(&task, GetParam()));
tracker_.EndFizzlingBlockShutdownTasks();
EXPECT_DCHECK_DEATH(tracker_.WillPostTask(&task, GetParam()));
} else {
EXPECT_FALSE(tracker_.WillPostTask(&task, GetParam()));
}
}
TEST_P(ThreadPoolTaskTrackerTest, SingletonAllowed) {
const bool can_use_singletons =
(GetParam() != TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
Task task(FROM_HERE, BindOnce(&internal::AssertSingletonAllowed),
TimeTicks::Now(), TimeDelta());
auto sequence = WillPostTaskAndQueueTaskSource(std::move(task), {GetParam()});
EXPECT_TRUE(sequence);
if (can_use_singletons) {
EXPECT_FALSE(RunAndPopNextTask(std::move(sequence)));
} else {
EXPECT_DCHECK_DEATH({ RunAndPopNextTask(std::move(sequence)); });
}
}
TEST_P(ThreadPoolTaskTrackerTest, IOAllowed) {
Task task_with_may_block(FROM_HERE, BindOnce([]() {
ScopedBlockingCall scope_blocking_call(
FROM_HERE, BlockingType::WILL_BLOCK);
}),
TimeTicks::Now(), TimeDelta());
TaskTraits traits_with_may_block{MayBlock(), GetParam()};
auto sequence_with_may_block = WillPostTaskAndQueueTaskSource(
std::move(task_with_may_block), traits_with_may_block);
EXPECT_TRUE(sequence_with_may_block);
RunAndPopNextTask(std::move(sequence_with_may_block));
Task task_without_may_block(FROM_HERE, BindOnce([]() {
EXPECT_DCHECK_DEATH({
ScopedBlockingCall scope_blocking_call(
FROM_HERE, BlockingType::WILL_BLOCK);
});
}),
TimeTicks::Now(), TimeDelta());
TaskTraits traits_without_may_block = TaskTraits(GetParam());
auto sequence_without_may_block = WillPostTaskAndQueueTaskSource(
std::move(task_without_may_block), traits_without_may_block);
EXPECT_TRUE(sequence_without_may_block);
RunAndPopNextTask(std::move(sequence_without_may_block));
}
static void RunTaskRunnerCurrentDefaultHandleVerificationTask(
TaskTracker* tracker,
Task verify_task,
TaskTraits traits,
scoped_refptr<TaskRunner> task_runner,
TaskSourceExecutionMode execution_mode) {
EXPECT_TRUE(tracker->WillPostTask(&verify_task, traits.shutdown_behavior()));
EXPECT_FALSE(SingleThreadTaskRunner::HasCurrentDefault());
EXPECT_FALSE(SequencedTaskRunner::HasCurrentDefault());
test::QueueAndRunTaskSource(
tracker,
test::CreateSequenceWithTask(std::move(verify_task), traits,
std::move(task_runner), execution_mode));
EXPECT_FALSE(SingleThreadTaskRunner::HasCurrentDefault());
EXPECT_FALSE(SequencedTaskRunner::HasCurrentDefault());
}
static void VerifyNoTaskRunnerCurrentDefaultHandle() {
EXPECT_FALSE(SingleThreadTaskRunner::HasCurrentDefault());
EXPECT_FALSE(SequencedTaskRunner::HasCurrentDefault());
}
TEST_P(ThreadPoolTaskTrackerTest, TaskRunnerHandleIsNotSetOnParallel) {
Task verify_task(FROM_HERE, BindOnce(&VerifyNoTaskRunnerCurrentDefaultHandle),
TimeTicks::Now(), TimeDelta());
RunTaskRunnerCurrentDefaultHandleVerificationTask(
&tracker_, std::move(verify_task), TaskTraits(GetParam()), nullptr,
TaskSourceExecutionMode::kParallel);
}
static void VerifySequencedTaskRunnerCurrentDefaultHandle(
const SequencedTaskRunner* expected_task_runner) {
EXPECT_FALSE(SingleThreadTaskRunner::HasCurrentDefault());
EXPECT_TRUE(SequencedTaskRunner::HasCurrentDefault());
EXPECT_EQ(expected_task_runner, SequencedTaskRunner::GetCurrentDefault());
}
TEST_P(ThreadPoolTaskTrackerTest,
SequencedTaskRunnerHasCurrentDefaultOnSequenced) {
scoped_refptr<SequencedTaskRunner> test_task_runner(new TestSimpleTaskRunner);
Task verify_task(FROM_HERE,
BindOnce(&VerifySequencedTaskRunnerCurrentDefaultHandle,
Unretained(test_task_runner.get())),
TimeTicks::Now(), TimeDelta());
RunTaskRunnerCurrentDefaultHandleVerificationTask(
&tracker_, std::move(verify_task), TaskTraits(GetParam()),
std::move(test_task_runner), TaskSourceExecutionMode::kSequenced);
}
static void VerifySingleThreadTaskRunnerCurrentDefaultHandle(
const SingleThreadTaskRunner* expected_task_runner) {
EXPECT_TRUE(SingleThreadTaskRunner::HasCurrentDefault());
EXPECT_TRUE(SequencedTaskRunner::HasCurrentDefault());
EXPECT_EQ(expected_task_runner, SingleThreadTaskRunner::GetCurrentDefault());
}
TEST_P(ThreadPoolTaskTrackerTest,
SingleThreadTaskRunnerCurrentDefaultHandleIsSetOnSingleThreaded) {
scoped_refptr<SingleThreadTaskRunner> test_task_runner(
new TestSimpleTaskRunner);
Task verify_task(FROM_HERE,
BindOnce(&VerifySingleThreadTaskRunnerCurrentDefaultHandle,
Unretained(test_task_runner.get())),
TimeTicks::Now(), TimeDelta());
RunTaskRunnerCurrentDefaultHandleVerificationTask(
&tracker_, std::move(verify_task), TaskTraits(GetParam()),
std::move(test_task_runner), TaskSourceExecutionMode::kSingleThread);
}
TEST_P(ThreadPoolTaskTrackerTest, FlushPendingDelayedTask) {
Task delayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), Days(1));
tracker_.WillPostTask(&delayed_task, GetParam());
tracker_.FlushForTesting();
}
TEST_P(ThreadPoolTaskTrackerTest, FlushAsyncForTestingPendingDelayedTask) {
Task delayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), Days(1));
tracker_.WillPostTask(&delayed_task, GetParam());
bool called_back = false;
tracker_.FlushAsyncForTesting(
BindOnce([](bool* called_back) { *called_back = true; },
Unretained(&called_back)));
EXPECT_TRUE(called_back);
}
TEST_P(ThreadPoolTaskTrackerTest, FlushPendingUndelayedTask) {
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
CallFlushFromAnotherThread();
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
VERIFY_ASYNC_FLUSHES_IN_PROGRESS();
RunAndPopNextTask(std::move(undelayed_sequence));
WAIT_FOR_ASYNC_FLUSHES_RETURNED();
}
TEST_P(ThreadPoolTaskTrackerTest, MultipleFlushes) {
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
CallFlushFromAnotherThread();
CallFlushFromAnotherThread();
CallFlushFromAnotherThread();
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
VERIFY_ASYNC_FLUSHES_IN_PROGRESS();
RunAndPopNextTask(std::move(undelayed_sequence));
WAIT_FOR_ASYNC_FLUSHES_RETURNED();
}
TEST_P(ThreadPoolTaskTrackerTest, FlushAsyncForTestingPendingUndelayedTask) {
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
TestWaitableEvent event;
tracker_.FlushAsyncForTesting(
BindOnce(&TestWaitableEvent::Signal, Unretained(&event)));
EXPECT_FALSE(event.IsSignaled());
RunAndPopNextTask(std::move(undelayed_sequence));
event.Wait();
}
TEST_P(ThreadPoolTaskTrackerTest, MultipleFlushAsyncForTesting) {
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
TestWaitableEvent three_callbacks_ran;
auto on_flush_done = BarrierClosure(
3,
BindOnce(&TestWaitableEvent::Signal, Unretained(&three_callbacks_ran)));
tracker_.FlushAsyncForTesting(on_flush_done);
tracker_.FlushAsyncForTesting(on_flush_done);
tracker_.FlushAsyncForTesting(on_flush_done);
EXPECT_FALSE(three_callbacks_ran.IsSignaled());
RunAndPopNextTask(std::move(undelayed_sequence));
three_callbacks_ran.Wait();
}
TEST_P(ThreadPoolTaskTrackerTest, PostTaskDuringFlush) {
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
CallFlushFromAnotherThread();
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
VERIFY_ASYNC_FLUSHES_IN_PROGRESS();
Task other_undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(),
TimeDelta());
auto other_undelayed_sequence = WillPostTaskAndQueueTaskSource(
std::move(other_undelayed_task), {GetParam()});
RunAndPopNextTask(std::move(undelayed_sequence));
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
VERIFY_ASYNC_FLUSHES_IN_PROGRESS();
RunAndPopNextTask(std::move(other_undelayed_sequence));
WAIT_FOR_ASYNC_FLUSHES_RETURNED();
}
TEST_P(ThreadPoolTaskTrackerTest, PostTaskDuringFlushAsyncForTesting) {
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
TestWaitableEvent event;
tracker_.FlushAsyncForTesting(
BindOnce(&TestWaitableEvent::Signal, Unretained(&event)));
EXPECT_FALSE(event.IsSignaled());
Task other_undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(),
TimeDelta());
auto other_undelayed_sequence = WillPostTaskAndQueueTaskSource(
std::move(other_undelayed_task), {GetParam()});
RunAndPopNextTask(std::move(undelayed_sequence));
EXPECT_FALSE(event.IsSignaled());
RunAndPopNextTask(std::move(other_undelayed_sequence));
event.Wait();
}
TEST_P(ThreadPoolTaskTrackerTest, RunDelayedTaskDuringFlush) {
Task delayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), Days(1));
auto delayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(delayed_task), {GetParam()});
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
CallFlushFromAnotherThread();
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
VERIFY_ASYNC_FLUSHES_IN_PROGRESS();
RunAndPopNextTask(std::move(delayed_sequence));
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
VERIFY_ASYNC_FLUSHES_IN_PROGRESS();
RunAndPopNextTask(std::move(undelayed_sequence));
WAIT_FOR_ASYNC_FLUSHES_RETURNED();
}
TEST_P(ThreadPoolTaskTrackerTest, RunDelayedTaskDuringFlushAsyncForTesting) {
Task delayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), Days(1));
auto delayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(delayed_task), {GetParam()});
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
TestWaitableEvent event;
tracker_.FlushAsyncForTesting(
BindOnce(&TestWaitableEvent::Signal, Unretained(&event)));
EXPECT_FALSE(event.IsSignaled());
RunAndPopNextTask(std::move(delayed_sequence));
EXPECT_FALSE(event.IsSignaled());
RunAndPopNextTask(std::move(undelayed_sequence));
event.Wait();
}
TEST_P(ThreadPoolTaskTrackerTest, FlushAfterShutdown) {
if (GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN)
return;
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
tracker_.WillPostTask(&undelayed_task, GetParam());
test::ShutdownTaskTracker(&tracker_);
tracker_.FlushForTesting();
}
TEST_P(ThreadPoolTaskTrackerTest, FlushAfterShutdownAsync) {
if (GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN)
return;
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
tracker_.WillPostTask(&undelayed_task, GetParam());
test::ShutdownTaskTracker(&tracker_);
bool called_back = false;
tracker_.FlushAsyncForTesting(
BindOnce([](bool* called_back) { *called_back = true; },
Unretained(&called_back)));
EXPECT_TRUE(called_back);
}
TEST_P(ThreadPoolTaskTrackerTest, ShutdownDuringFlush) {
if (GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN)
return;
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
CallFlushFromAnotherThread();
PlatformThread::Sleep(TestTimeouts::tiny_timeout());
VERIFY_ASYNC_FLUSHES_IN_PROGRESS();
test::ShutdownTaskTracker(&tracker_);
WAIT_FOR_ASYNC_FLUSHES_RETURNED();
}
TEST_P(ThreadPoolTaskTrackerTest, ShutdownDuringFlushAsyncForTesting) {
if (GetParam() == TaskShutdownBehavior::BLOCK_SHUTDOWN)
return;
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
auto undelayed_sequence =
WillPostTaskAndQueueTaskSource(std::move(undelayed_task), {GetParam()});
TestWaitableEvent event;
tracker_.FlushAsyncForTesting(
BindOnce(&TestWaitableEvent::Signal, Unretained(&event)));
EXPECT_FALSE(event.IsSignaled());
test::ShutdownTaskTracker(&tracker_);
event.Wait();
}
TEST_P(ThreadPoolTaskTrackerTest, PostTasksDoNotBlockShutdown) {
Task undelayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
EXPECT_TRUE(tracker_.WillPostTask(&undelayed_task, GetParam()));
test::ShutdownTaskTracker(&tracker_);
}
TEST_P(ThreadPoolTaskTrackerTest, DelayedRunTasks) {
Task delayed_task(FROM_HERE, DoNothing(), TimeTicks::Now(), Days(1));
auto sequence =
WillPostTaskAndQueueTaskSource(std::move(delayed_task), {GetParam()});
EXPECT_TRUE(sequence);
RunAndPopNextTask(std::move(sequence));
test::ShutdownTaskTracker(&tracker_);
}
INSTANTIATE_TEST_SUITE_P(
ContinueOnShutdown,
ThreadPoolTaskTrackerTest,
::testing::Values(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN));
INSTANTIATE_TEST_SUITE_P(
SkipOnShutdown,
ThreadPoolTaskTrackerTest,
::testing::Values(TaskShutdownBehavior::SKIP_ON_SHUTDOWN));
INSTANTIATE_TEST_SUITE_P(
BlockShutdown,
ThreadPoolTaskTrackerTest,
::testing::Values(TaskShutdownBehavior::BLOCK_SHUTDOWN));
namespace {
void ExpectSequenceToken(SequenceToken sequence_token) {
EXPECT_EQ(sequence_token, SequenceToken::GetForCurrentThread());
}
}
TEST_F(ThreadPoolTaskTrackerTest, CurrentSequenceToken) {
scoped_refptr<Sequence> sequence = MakeRefCounted<Sequence>(
TaskTraits(), nullptr, TaskSourceExecutionMode::kParallel);
const SequenceToken sequence_token = sequence->token();
Task task(FROM_HERE, BindOnce(&ExpectSequenceToken, sequence_token),
TimeTicks::Now(), TimeDelta());
tracker_.WillPostTask(&task, sequence->shutdown_behavior());
{
Sequence::Transaction sequence_transaction(sequence->BeginTransaction());
sequence_transaction.WillPushImmediateTask();
sequence_transaction.PushImmediateTask(std::move(task));
EXPECT_FALSE(SequenceToken::GetForCurrentThread().IsValid());
}
test::QueueAndRunTaskSource(&tracker_, std::move(sequence));
EXPECT_FALSE(SequenceToken::GetForCurrentThread().IsValid());
}
TEST_F(ThreadPoolTaskTrackerTest, LoadWillPostAndRunBeforeShutdown) {
std::vector<std::unique_ptr<ThreadPostingAndRunningTask>> threads;
for (size_t i = 0; i < kLoadTestNumIterations; ++i) {
threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_,
MakeRefCounted<Sequence>(
TaskTraits{TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN}, nullptr,
TaskSourceExecutionMode::kParallel),
ThreadPostingAndRunningTask::Action::WILL_POST_AND_RUN, true,
CreateTask()));
threads.back()->Start();
threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_,
MakeRefCounted<Sequence>(
TaskTraits{TaskShutdownBehavior::SKIP_ON_SHUTDOWN}, nullptr,
TaskSourceExecutionMode::kParallel),
ThreadPostingAndRunningTask::Action::WILL_POST_AND_RUN, true,
CreateTask()));
threads.back()->Start();
threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_,
MakeRefCounted<Sequence>(
TaskTraits{TaskShutdownBehavior::BLOCK_SHUTDOWN}, nullptr,
TaskSourceExecutionMode::kParallel),
ThreadPostingAndRunningTask::Action::WILL_POST_AND_RUN, true,
CreateTask()));
threads.back()->Start();
}
for (const auto& thread : threads)
thread->Join();
EXPECT_EQ(kLoadTestNumIterations * 3, NumTasksExecuted());
test::ShutdownTaskTracker(&tracker_);
}
TEST_F(ThreadPoolTaskTrackerTest,
LoadWillPostBeforeShutdownAndRunDuringShutdown) {
constexpr TaskTraits traits_continue_on_shutdown =
TaskTraits(TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
constexpr TaskTraits traits_skip_on_shutdown =
TaskTraits(TaskShutdownBehavior::SKIP_ON_SHUTDOWN);
constexpr TaskTraits traits_block_shutdown =
TaskTraits(TaskShutdownBehavior::BLOCK_SHUTDOWN);
std::vector<std::unique_ptr<ThreadPostingAndRunningTask>> post_threads;
{
std::vector<scoped_refptr<Sequence>> sequences_continue_on_shutdown;
std::vector<scoped_refptr<Sequence>> sequences_skip_on_shutdown;
std::vector<scoped_refptr<Sequence>> sequences_block_shutdown;
for (size_t i = 0; i < kLoadTestNumIterations; ++i) {
sequences_continue_on_shutdown.push_back(
MakeRefCounted<Sequence>(traits_continue_on_shutdown, nullptr,
TaskSourceExecutionMode::kParallel));
sequences_skip_on_shutdown.push_back(
MakeRefCounted<Sequence>(traits_skip_on_shutdown, nullptr,
TaskSourceExecutionMode::kParallel));
sequences_block_shutdown.push_back(MakeRefCounted<Sequence>(
traits_block_shutdown, nullptr, TaskSourceExecutionMode::kParallel));
}
for (size_t i = 0; i < kLoadTestNumIterations; ++i) {
post_threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_, sequences_continue_on_shutdown[i],
ThreadPostingAndRunningTask::Action::WILL_POST, true, CreateTask()));
post_threads.back()->Start();
post_threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_, sequences_skip_on_shutdown[i],
ThreadPostingAndRunningTask::Action::WILL_POST, true, CreateTask()));
post_threads.back()->Start();
post_threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_, sequences_block_shutdown[i],
ThreadPostingAndRunningTask::Action::WILL_POST, true, CreateTask()));
post_threads.back()->Start();
}
}
for (const auto& thread : post_threads)
thread->Join();
tracker_.StartShutdown();
ExpectAsyncCompleteShutdownBlocks();
std::vector<std::unique_ptr<ThreadPostingAndRunningTask>> run_threads;
for (size_t i = 0; i < kLoadTestNumIterations; ++i) {
run_threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_, post_threads[i * 3]->TakeTaskSource()));
run_threads.back()->Start();
run_threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_, post_threads[i * 3 + 1]->TakeTaskSource()));
run_threads.back()->Start();
run_threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_, post_threads[i * 3 + 2]->TakeTaskSource()));
run_threads.back()->Start();
}
for (const auto& thread : run_threads)
thread->Join();
WAIT_FOR_ASYNC_SHUTDOWN_COMPLETED();
EXPECT_EQ(kLoadTestNumIterations, NumTasksExecuted());
}
TEST_F(ThreadPoolTaskTrackerTest, LoadWillPostAndRunDuringShutdown) {
auto block_shutdown_sequence = WillPostTaskAndQueueTaskSource(
CreateTask(), {TaskShutdownBehavior::BLOCK_SHUTDOWN});
EXPECT_TRUE(block_shutdown_sequence);
tracker_.StartShutdown();
ExpectAsyncCompleteShutdownBlocks();
std::vector<std::unique_ptr<ThreadPostingAndRunningTask>> threads;
for (size_t i = 0; i < kLoadTestNumIterations; ++i) {
threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_,
MakeRefCounted<Sequence>(
TaskTraits{TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN}, nullptr,
TaskSourceExecutionMode::kParallel),
ThreadPostingAndRunningTask::Action::WILL_POST_AND_RUN, false,
CreateTask()));
threads.back()->Start();
threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_,
MakeRefCounted<Sequence>(
TaskTraits{TaskShutdownBehavior::SKIP_ON_SHUTDOWN}, nullptr,
TaskSourceExecutionMode::kParallel),
ThreadPostingAndRunningTask::Action::WILL_POST_AND_RUN, false,
CreateTask()));
threads.back()->Start();
threads.push_back(std::make_unique<ThreadPostingAndRunningTask>(
&tracker_,
MakeRefCounted<Sequence>(
TaskTraits{TaskShutdownBehavior::BLOCK_SHUTDOWN}, nullptr,
TaskSourceExecutionMode::kParallel),
ThreadPostingAndRunningTask::Action::WILL_POST_AND_RUN, true,
CreateTask()));
threads.back()->Start();
}
for (const auto& thread : threads)
thread->Join();
EXPECT_EQ(kLoadTestNumIterations, NumTasksExecuted());
VERIFY_ASYNC_SHUTDOWN_IN_PROGRESS();
RunAndPopNextTask(std::move(block_shutdown_sequence));
EXPECT_EQ(kLoadTestNumIterations + 1, NumTasksExecuted());
WAIT_FOR_ASYNC_SHUTDOWN_COMPLETED();
}
TEST_F(ThreadPoolTaskTrackerTest,
RunAndPopNextTaskReturnsSequenceToReschedule) {
TaskTraits default_traits;
Task task_1(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
EXPECT_TRUE(
tracker_.WillPostTask(&task_1, default_traits.shutdown_behavior()));
Task task_2(FROM_HERE, DoNothing(), TimeTicks::Now(), TimeDelta());
EXPECT_TRUE(
tracker_.WillPostTask(&task_2, default_traits.shutdown_behavior()));
scoped_refptr<Sequence> sequence =
test::CreateSequenceWithTask(std::move(task_1), default_traits);
{
auto transaction = sequence->BeginTransaction();
transaction.WillPushImmediateTask();
transaction.PushImmediateTask(std::move(task_2));
}
EXPECT_EQ(sequence,
test::QueueAndRunTaskSource(&tracker_, sequence).Unregister());
}
namespace {
class WaitAllowedTestThread : public SimpleThread {
public:
WaitAllowedTestThread() : SimpleThread("WaitAllowedTestThread") {}
WaitAllowedTestThread(const WaitAllowedTestThread&) = delete;
WaitAllowedTestThread& operator=(const WaitAllowedTestThread&) = delete;
private:
void Run() override {
auto task_tracker = std::make_unique<TaskTracker>();
internal::AssertBaseSyncPrimitivesAllowed();
Task task_without_sync_primitives(
FROM_HERE, BindOnce([]() {
EXPECT_DCHECK_DEATH({ internal::AssertBaseSyncPrimitivesAllowed(); });
}),
TimeTicks::Now(), TimeDelta());
TaskTraits default_traits;
EXPECT_TRUE(task_tracker->WillPostTask(&task_without_sync_primitives,
default_traits.shutdown_behavior()));
auto sequence_without_sync_primitives = test::CreateSequenceWithTask(
std::move(task_without_sync_primitives), default_traits);
test::QueueAndRunTaskSource(task_tracker.get(),
std::move(sequence_without_sync_primitives));
internal::AssertBaseSyncPrimitivesAllowed();
Task task_with_sync_primitives(
FROM_HERE, BindOnce([]() {
internal::AssertBaseSyncPrimitivesAllowed();
}),
TimeTicks::Now(), TimeDelta());
TaskTraits traits_with_sync_primitives =
TaskTraits(WithBaseSyncPrimitives());
EXPECT_TRUE(task_tracker->WillPostTask(
&task_with_sync_primitives,
traits_with_sync_primitives.shutdown_behavior()));
auto sequence_with_sync_primitives = test::CreateSequenceWithTask(
std::move(task_with_sync_primitives), traits_with_sync_primitives);
test::QueueAndRunTaskSource(task_tracker.get(),
std::move(sequence_with_sync_primitives));
ScopedAllowBaseSyncPrimitivesForTesting
allow_wait_in_task_tracker_destructor;
task_tracker.reset();
}
};
}
TEST(ThreadPoolTaskTrackerWaitAllowedTest, WaitAllowed) {
testing::GTEST_FLAG(death_test_style) = "threadsafe";
WaitAllowedTestThread wait_allowed_test_thread;
wait_allowed_test_thread.Start();
wait_allowed_test_thread.Join();
}
}
}