#include "base/task/thread_pool/thread_group_impl.h"
#include <stddef.h>
#include <algorithm>
#include <atomic>
#include <memory>
#include <optional>
#include <unordered_set>
#include <utility>
#include <vector>
#include "base/barrier_closure.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/functional/callback_helpers.h"
#include "base/memory/ptr_util.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/ref_counted.h"
#include "base/metrics/statistics_recorder.h"
#include "base/synchronization/atomic_flag.h"
#include "base/synchronization/condition_variable.h"
#include "base/synchronization/lock.h"
#include "base/task/task_runner.h"
#include "base/task/thread_pool/delayed_task_manager.h"
#include "base/task/thread_pool/environment_config.h"
#include "base/task/thread_pool/pooled_task_runner_delegate.h"
#include "base/task/thread_pool/sequence.h"
#include "base/task/thread_pool/task_source_sort_key.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/task/thread_pool/test_task_factory.h"
#include "base/task/thread_pool/test_utils.h"
#include "base/task/thread_pool/worker_thread_observer.h"
#include "base/test/bind.h"
#include "base/test/gtest_util.h"
#include "base/test/test_simple_task_runner.h"
#include "base/test/test_timeouts.h"
#include "base/test/test_waitable_event.h"
#include "base/threading/platform_thread.h"
#include "base/threading/scoped_blocking_call.h"
#include "base/threading/simple_thread.h"
#include "base/threading/thread.h"
#include "base/threading/thread_checker_impl.h"
#include "base/time/time.h"
#include "base/timer/timer.h"
#include "build/build_config.h"
#include "testing/gtest/include/gtest/gtest.h"
namespace base::internal {
namespace {
// Default |max_tasks| handed to the thread group by the fixtures below; also
// used as the number of tasks needed to saturate it.
constexpr size_t kMaxTasks = 4;
// Number of ThreadPostingTasksWaitIdle threads started by
// PostTasksWaitAllWorkersIdle.
constexpr size_t kNumThreadsPostingTasks = 4;
// Number of tasks posted by each posting loop (per posting thread, or per
// short-task factory).
constexpr size_t kNumTasksPostedPerThread = 150;
// |suggested_reclaim_time| given to thread groups by the standby-policy and
// cleanup tests.
constexpr TimeDelta kReclaimTimeForCleanupTests = Milliseconds(500);
// Not referenced in the visible part of this file; presumably used by tests
// further down.
constexpr size_t kLargeNumber = 512;
// Shared state and helpers for the ThreadGroupImpl fixtures in this file. The
// fixture itself serves as the ThreadGroup::Delegate of the group under test.
class ThreadGroupImplImplTestBase : public ThreadGroup::Delegate {
 public:
  ThreadGroupImplImplTestBase(const ThreadGroupImplImplTestBase&) = delete;
  ThreadGroupImplImplTestBase& operator=(const ThreadGroupImplImplTestBase&) =
      delete;

 protected:
  ThreadGroupImplImplTestBase()
      : service_thread_("ThreadPoolServiceThread"),
        tracked_ref_factory_(this) {}

  // Tears everything down: stops delayed tasks and the service thread, flushes
  // the task tracker, joins the thread group (if one was created), detaches it
  // from the mock delegate and finally destroys it.
  void CommonTearDown() {
    delayed_task_manager_.Shutdown();
    service_thread_.Stop();
    task_tracker_.FlushForTesting();
    if (thread_group_) {
      thread_group_->JoinForTesting();
    }
    mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
    thread_group_.reset();
  }

  // Starts the service thread and delayed task manager, then creates (without
  // starting) a thread group of |thread_type| and registers it with the mock
  // delegate. Must not be called when a thread group already exists.
  void CreateThreadGroup(ThreadType thread_type = ThreadType::kDefault) {
    ASSERT_FALSE(thread_group_);
    service_thread_.Start();
    delayed_task_manager_.Start(service_thread_.task_runner());

    auto new_group = std::make_unique<ThreadGroupImpl>(
        "TestThreadGroup", "A", thread_type, 0, task_tracker_.GetTrackedRef(),
        tracked_ref_factory_.GetTrackedRef());
    thread_group_ = std::move(new_group);
    ASSERT_TRUE(thread_group_);

    mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
  }

  // Starts the previously created thread group. When |max_best_effort_tasks| is
  // not provided, |max_tasks| is used for it.
  void StartThreadGroup(
      TimeDelta suggested_reclaim_time,
      size_t max_tasks,
      std::optional<int> max_best_effort_tasks = std::nullopt,
      WorkerThreadObserver* worker_observer = nullptr,
      std::optional<TimeDelta> may_block_threshold = std::nullopt) {
    ASSERT_TRUE(thread_group_);
    const auto best_effort_limit = max_best_effort_tasks.has_value()
                                       ? max_best_effort_tasks.value()
                                       : max_tasks;
    thread_group_->Start(max_tasks, best_effort_limit, suggested_reclaim_time,
                         service_thread_.task_runner(), worker_observer,
                         ThreadGroup::WorkerEnvironment::NONE, false,
                         may_block_threshold);
  }

  // Convenience wrapper: CreateThreadGroup() with the default thread type
  // followed by StartThreadGroup() with the given arguments.
  void CreateAndStartThreadGroup(
      TimeDelta suggested_reclaim_time = TimeDelta::Max(),
      size_t max_tasks = kMaxTasks,
      std::optional<int> max_best_effort_tasks = std::nullopt,
      WorkerThreadObserver* worker_observer = nullptr,
      std::optional<TimeDelta> may_block_threshold = std::nullopt) {
    CreateThreadGroup();
    StartThreadGroup(suggested_reclaim_time, max_tasks, max_best_effort_tasks,
                     worker_observer, may_block_threshold);
  }

  Thread service_thread_;
  TaskTracker task_tracker_;
  std::unique_ptr<ThreadGroupImpl> thread_group_;
  DelayedTaskManager delayed_task_manager_;
  TrackedRefFactory<ThreadGroup::Delegate> tracked_ref_factory_;
  test::MockPooledTaskRunnerDelegate mock_pooled_task_runner_delegate_ = {
      task_tracker_.GetTrackedRef(), &delayed_task_manager_};

 private:
  // ThreadGroup::Delegate: every set of traits maps to the single thread group
  // owned by this fixture.
  ThreadGroup* GetThreadGroupForTraits(const TaskTraits& traits) override {
    return thread_group_.get();
  }
};
class ThreadGroupImplImplTest : public ThreadGroupImplImplTestBase,
public testing::Test {
public:
ThreadGroupImplImplTest(const ThreadGroupImplImplTest&) = delete;
ThreadGroupImplImplTest& operator=(const ThreadGroupImplImplTest&) = delete;
protected:
ThreadGroupImplImplTest() = default;
void SetUp() override { CreateAndStartThreadGroup(); }
void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
};
// Fixture parameterized on the TaskSourceExecutionMode used to post tasks. A
// default thread group is created and started in SetUp() and torn down in
// TearDown().
class ThreadGroupImplImplTestParam
    : public ThreadGroupImplImplTestBase,
      public testing::TestWithParam<TaskSourceExecutionMode> {
 public:
  ThreadGroupImplImplTestParam(const ThreadGroupImplImplTestParam&) = delete;
  ThreadGroupImplImplTestParam& operator=(const ThreadGroupImplImplTestParam&) =
      delete;

 protected:
  ThreadGroupImplImplTestParam() = default;

  void SetUp() override {
    CreateAndStartThreadGroup();
  }

  void TearDown() override {
    CommonTearDown();
  }
};
// Shorthand for the TestTaskFactory option passed as first argument to
// TestTaskFactory::PostTask() throughout this file.
using PostNestedTask = test::TestTaskFactory::PostNestedTask;
// Thread that posts kNumTasksPostedPerThread tasks through a TestTaskFactory,
// waiting for every worker of the thread group to be idle before each post.
class ThreadPostingTasksWaitIdle : public SimpleThread {
 public:
  // |thread_group| is only used to wait for its workers to become idle and must
  // be non-null. Tasks are posted through a task runner created from
  // |mock_pooled_task_runner_delegate| with |execution_mode|.
  ThreadPostingTasksWaitIdle(
      ThreadGroupImpl* thread_group,
      test::MockPooledTaskRunnerDelegate* mock_pooled_task_runner_delegate,
      TaskSourceExecutionMode execution_mode)
      : SimpleThread("ThreadPostingTasksWaitIdle"),
        thread_group_(thread_group),
        factory_(CreatePooledTaskRunnerWithExecutionMode(
                     execution_mode,
                     mock_pooled_task_runner_delegate),
                 execution_mode) {
    DCHECK(thread_group_);
  }
  ThreadPostingTasksWaitIdle(const ThreadPostingTasksWaitIdle&) = delete;
  ThreadPostingTasksWaitIdle& operator=(const ThreadPostingTasksWaitIdle&) =
      delete;

  // Factory used to post the tasks; lets the caller wait for them to run.
  const test::TestTaskFactory* factory() const { return &factory_; }

 private:
  // SimpleThread: posts the tasks one by one, each time only after all workers
  // have gone idle.
  void Run() override {
    for (size_t i = 0; i < kNumTasksPostedPerThread; ++i) {
      thread_group_->WaitForAllWorkersIdleForTesting();
      EXPECT_TRUE(factory_.PostTask(PostNestedTask::NO, OnceClosure()));
    }
  }

  const raw_ptr<ThreadGroupImpl> thread_group_;
  test::TestTaskFactory factory_;
};
}
// Starts kNumThreadsPostingTasks threads that each post tasks only once all
// workers are idle, then waits for every posted task to run.
TEST_P(ThreadGroupImplImplTestParam, PostTasksWaitAllWorkersIdle) {
  std::vector<std::unique_ptr<ThreadPostingTasksWaitIdle>> posters;
  for (size_t n = 0; n < kNumThreadsPostingTasks; ++n) {
    posters.push_back(std::make_unique<ThreadPostingTasksWaitIdle>(
        thread_group_.get(), &mock_pooled_task_runner_delegate_, GetParam()));
    ThreadPostingTasksWaitIdle& poster = *posters.back();
    poster.Start();
  }

  // Wait for each posting thread to finish and for its tasks to have run.
  for (const auto& poster : posters) {
    poster->Join();
    poster->factory()->WaitForAllTasksToRun();
  }

  // Wait for all workers to be idle before |posters| (and the factories they
  // own) go out of scope.
  thread_group_->WaitForAllWorkersIdleForTesting();
}
// Blocks kMaxTasks - 1 tasks on an event, then checks that
// kNumTasksPostedPerThread short tasks posted afterwards all run anyway.
TEST_P(ThreadGroupImplImplTestParam, PostTasksWithOneAvailableWorker) {
  TestWaitableEvent unblock_tasks;
  std::vector<std::unique_ptr<test::TestTaskFactory>> blocked_factories;
  constexpr size_t kNumBlockedTasks = kMaxTasks - 1;
  for (size_t n = 0; n < kNumBlockedTasks; ++n) {
    blocked_factories.push_back(std::make_unique<test::TestTaskFactory>(
        CreatePooledTaskRunnerWithExecutionMode(
            GetParam(), &mock_pooled_task_runner_delegate_),
        GetParam()));
    test::TestTaskFactory& blocked_factory = *blocked_factories.back();
    EXPECT_TRUE(blocked_factory.PostTask(
        PostNestedTask::NO,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&unblock_tasks))));
    blocked_factory.WaitForAllTasksToRun();
  }

  // Post the short tasks while the blocked tasks are still waiting.
  test::TestTaskFactory short_task_factory(
      CreatePooledTaskRunnerWithExecutionMode(
          GetParam(), &mock_pooled_task_runner_delegate_),
      GetParam());
  size_t remaining = kNumTasksPostedPerThread;
  while (remaining > 0) {
    EXPECT_TRUE(short_task_factory.PostTask(PostNestedTask::NO, OnceClosure()));
    --remaining;
  }
  short_task_factory.WaitForAllTasksToRun();

  // Release the blocked tasks and wait for the workers to go idle before the
  // factories are destroyed.
  unblock_tasks.Signal();
  thread_group_->WaitForAllWorkersIdleForTesting();
}
// Posts kMaxTasks tasks that all block on the same event and checks that each
// of them gets to run while the previous ones are still blocked.
TEST_P(ThreadGroupImplImplTestParam, Saturate) {
  TestWaitableEvent unblock_tasks;
  std::vector<std::unique_ptr<test::TestTaskFactory>> task_factories;
  for (size_t n = 0; n < kMaxTasks; ++n) {
    task_factories.push_back(std::make_unique<test::TestTaskFactory>(
        CreatePooledTaskRunnerWithExecutionMode(
            GetParam(), &mock_pooled_task_runner_delegate_),
        GetParam()));
    test::TestTaskFactory& factory = *task_factories.back();
    EXPECT_TRUE(factory.PostTask(
        PostNestedTask::NO,
        BindOnce(&TestWaitableEvent::Wait, Unretained(&unblock_tasks))));
    factory.WaitForAllTasksToRun();
  }

  // Release the tasks and wait for the workers to go idle before the factories
  // are destroyed.
  unblock_tasks.Signal();
  thread_group_->WaitForAllWorkersIdleForTesting();
}
// Checks the value returned by ShouldYield() for various sort keys while the
// thread group is saturated by a USER_VISIBLE job and extra tasks of
// BEST_EFFORT, USER_VISIBLE and USER_BLOCKING priority are posted.
// NOTE(review): the third sort key field is presumably the worker count of the
// task source asking whether to yield - confirm against TaskSourceSortKey.
TEST_F(ThreadGroupImplImplTest, ShouldYieldFloodedUserVisible) {
TestWaitableEvent threads_running;
TestWaitableEvent threads_continue;
// Signals |threads_running| once kMaxTasks job workers have entered the job.
RepeatingClosure threads_running_barrier = BarrierClosure(
kMaxTasks,
BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
// Job whose workers block on |threads_continue| until the end of the test.
auto job_task = base::MakeRefCounted<test::MockJobTask>(
BindLambdaForTesting(
[&threads_running_barrier, &threads_continue](JobDelegate* delegate) {
threads_running_barrier.Run();
threads_continue.Wait();
}),
kMaxTasks);
scoped_refptr<JobTaskSource> task_source =
job_task->GetJobTaskSource(FROM_HERE, {TaskPriority::USER_VISIBLE},
&mock_pooled_task_runner_delegate_);
// Register the job with the task tracker and hand it to the thread group.
auto registered_task_source = task_tracker_.RegisterTaskSource(task_source);
ASSERT_TRUE(registered_task_source);
static_cast<ThreadGroup*>(thread_group_.get())
->PushTaskSourceAndWakeUpWorkers(
RegisteredTaskSourceAndTransaction::FromTaskSource(
std::move(registered_task_source)));
threads_running.Wait();
// With only a BEST_EFFORT task pending, none of the keys below is expected to
// yield. The posted task itself only gets to run after |threads_continue| is
// signaled.
test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT},
&mock_pooled_task_runner_delegate_)
->PostTask(
FROM_HERE, BindLambdaForTesting([&] {
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::BEST_EFFORT, TimeTicks(), 1}));
}));
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::BEST_EFFORT, TimeTicks(), 2}));
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::BEST_EFFORT, TimeTicks(), 0}));
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::USER_VISIBLE, TimeTicks(), 0}));
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::USER_BLOCKING, TimeTicks(), 0}));
// Posts one USER_VISIBLE task. A fresh task is posted before each of the
// expectations that follow.
auto post_user_visible = [&] {
test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
&mock_pooled_task_runner_delegate_)
->PostTask(FROM_HERE, BindLambdaForTesting([&] {
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::USER_VISIBLE, TimeTicks(),
1}));
}));
};
// With a USER_VISIBLE task pending: USER_VISIBLE with the larger third field
// and BEST_EFFORT are expected to yield; USER_VISIBLE with a third field of 1
// and USER_BLOCKING are not.
post_user_visible();
EXPECT_TRUE(thread_group_->ShouldYield(
{TaskPriority::USER_VISIBLE, TimeTicks(), 2}));
post_user_visible();
EXPECT_TRUE(thread_group_->ShouldYield(
{TaskPriority::BEST_EFFORT, TimeTicks(), 0}));
post_user_visible();
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::USER_VISIBLE, TimeTicks(), 1}));
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::USER_BLOCKING, TimeTicks(), 0}));
// Posts one USER_BLOCKING task. A fresh task is posted before each of the
// expectations that follow.
auto post_user_blocking = [&] {
test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
&mock_pooled_task_runner_delegate_)
->PostTask(FROM_HERE, BindLambdaForTesting([&] {
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::USER_BLOCKING, TimeTicks(),
1}));
}));
};
// With a USER_BLOCKING task pending: USER_BLOCKING with the larger third field,
// BEST_EFFORT and USER_VISIBLE are expected to yield; USER_BLOCKING with a
// third field of 1 is not.
post_user_blocking();
EXPECT_TRUE(thread_group_->ShouldYield(
{TaskPriority::USER_BLOCKING, TimeTicks(), 2}));
post_user_blocking();
EXPECT_TRUE(thread_group_->ShouldYield(
{TaskPriority::BEST_EFFORT, TimeTicks(), 0}));
post_user_blocking();
EXPECT_TRUE(thread_group_->ShouldYield(
{TaskPriority::USER_VISIBLE, TimeTicks(), 0}));
post_user_blocking();
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::USER_BLOCKING, TimeTicks(), 1}));
// Release the job workers and let every posted task run.
threads_continue.Signal();
task_tracker_.FlushForTesting();
}
// Run the ThreadGroupImplImplTestParam tests once for each execution mode:
// parallel, sequenced and job.
INSTANTIATE_TEST_SUITE_P(Parallel,
ThreadGroupImplImplTestParam,
::testing::Values(TaskSourceExecutionMode::kParallel));
INSTANTIATE_TEST_SUITE_P(
Sequenced,
ThreadGroupImplImplTestParam,
::testing::Values(TaskSourceExecutionMode::kSequenced));
INSTANTIATE_TEST_SUITE_P(Job,
ThreadGroupImplImplTestParam,
::testing::Values(TaskSourceExecutionMode::kJob));
namespace {
// Fixture whose SetUp() creates the thread group without starting it, so that
// each test body decides when to call StartThreadGroup().
class ThreadGroupImplImplStartInBodyTest : public ThreadGroupImplImplTest {
 public:
  void SetUp() override { CreateThreadGroup(); }
};
// Task body used by PostTasksBeforeStart: records the current thread in
// |platform_thread_ref|, signals |task_running|, then blocks until |barrier| is
// signaled.
void TaskPostedBeforeStart(PlatformThreadRef* platform_thread_ref,
                           TestWaitableEvent* task_running,
                           TestWaitableEvent* barrier) {
  PlatformThreadRef& recorded_ref = *platform_thread_ref;
  recorded_ref = PlatformThread::CurrentRef();
  task_running->Signal();
  barrier->Wait();
}
}
// Posts two tasks before the thread group is started and checks that they only
// run after StartThreadGroup(), concurrently and on different threads.
TEST_F(ThreadGroupImplImplStartInBodyTest, PostTasksBeforeStart) {
  PlatformThreadRef task_1_thread_ref;
  PlatformThreadRef task_2_thread_ref;
  TestWaitableEvent task_1_running;
  TestWaitableEvent task_2_running;
  TestWaitableEvent barrier;

  // Each task is posted through its own task runner, released right after the
  // post.
  {
    auto first_runner = test::CreatePooledTaskRunner(
        {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
    first_runner->PostTask(
        FROM_HERE,
        BindOnce(&TaskPostedBeforeStart, Unretained(&task_1_thread_ref),
                 Unretained(&task_1_running), Unretained(&barrier)));
  }
  {
    auto second_runner = test::CreatePooledTaskRunner(
        {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
    second_runner->PostTask(
        FROM_HERE,
        BindOnce(&TaskPostedBeforeStart, Unretained(&task_2_thread_ref),
                 Unretained(&task_2_running), Unretained(&barrier)));
  }

  // Nothing may have happened yet: no worker exists and no task has started.
  EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
  EXPECT_FALSE(task_1_running.IsSignaled());
  EXPECT_FALSE(task_2_running.IsSignaled());

  StartThreadGroup(TimeDelta::Max(), kMaxTasks);

  // Both tasks must start running, each on its own thread.
  task_1_running.Wait();
  task_2_running.Wait();
  EXPECT_NE(task_1_thread_ref, task_2_thread_ref);

  barrier.Signal();
  task_tracker_.FlushForTesting();
}
// Posts 2 * kMaxTasks tasks before the thread group is started and checks that,
// once started, the group ends up with as many workers as its max tasks.
TEST_F(ThreadGroupImplImplStartInBodyTest, PostManyTasks) {
  scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
      {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);
  constexpr size_t kNumTasksPosted = 2 * kMaxTasks;

  TestWaitableEvent threads_running;
  TestWaitableEvent threads_continue;
  RepeatingClosure threads_running_barrier = BarrierClosure(
      kMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));

  // The first kMaxTasks tasks block until |threads_continue| is signaled; the
  // remaining ones do nothing.
  size_t num_posted = 0;
  for (; num_posted < kMaxTasks; ++num_posted) {
    task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&] {
                            threads_running_barrier.Run();
                            threads_continue.Wait();
                          }));
  }
  for (; num_posted < kNumTasksPosted; ++num_posted) {
    task_runner->PostTask(FROM_HERE, DoNothing());
  }

  // No worker before the start; at least one right after it.
  EXPECT_EQ(0U, thread_group_->NumberOfWorkersForTesting());
  StartThreadGroup(TimeDelta::Max(), kMaxTasks);
  EXPECT_GT(thread_group_->NumberOfWorkersForTesting(), 0U);
  EXPECT_EQ(kMaxTasks, thread_group_->GetMaxTasksForTesting());

  // Once all blocking tasks are running, there is one worker per allowed task.
  threads_running.Wait();
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(),
            thread_group_->GetMaxTasksForTesting());

  threads_continue.Signal();
  task_tracker_.FlushForTesting();
}
namespace {
// Fixture that creates and starts a ThreadType::kBackground thread group. When
// CanUseBackgroundThreadTypeForWorkerThread() is false, no thread group is
// created at all.
class BackgroundThreadGroupImplTest : public ThreadGroupImplImplTest {
 public:
  // Same parameters as the base-class helper of the same name, but the group is
  // created with ThreadType::kBackground (or not created when unsupported).
  void CreateAndStartThreadGroup(
      TimeDelta suggested_reclaim_time = TimeDelta::Max(),
      size_t max_tasks = kMaxTasks,
      std::optional<int> max_best_effort_tasks = std::nullopt,
      WorkerThreadObserver* worker_observer = nullptr,
      std::optional<TimeDelta> may_block_threshold = std::nullopt) {
    if (CanUseBackgroundThreadTypeForWorkerThread()) {
      CreateThreadGroup(ThreadType::kBackground);
      StartThreadGroup(suggested_reclaim_time, max_tasks,
                       max_best_effort_tasks, worker_observer,
                       may_block_threshold);
    }
  }

  void SetUp() override {
    CreateAndStartThreadGroup();
  }
};
}
// Checks the thread type observed inside a MAY_BLOCK ScopedBlockingCall on a
// background worker: kBackground before TaskTracker::StartShutdown() is called,
// kDefault afterwards.
TEST_F(BackgroundThreadGroupImplTest, UpdatePriorityBlockingStarted) {
// Nothing to test when background worker threads are not supported (the
// fixture did not create a thread group in that case).
if (!CanUseBackgroundThreadTypeForWorkerThread()) {
return;
}
const scoped_refptr<TaskRunner> task_runner = test::CreatePooledTaskRunner(
{MayBlock(), WithBaseSyncPrimitives(), TaskPriority::BEST_EFFORT},
&mock_pooled_task_runner_delegate_);
TestWaitableEvent threads_running;
// Signals |threads_running| once kMaxTasks tasks have reached the barrier.
RepeatingClosure threads_running_barrier = BarrierClosure(
kMaxTasks,
BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
TestWaitableEvent blocking_threads_continue;
// Each task first checks the thread type outside and inside a blocking scope
// (both expected to be kBackground), then waits on
// |blocking_threads_continue|, which is signaled after StartShutdown(), and
// finally expects kDefault inside a second blocking scope.
for (size_t i = 0; i < kMaxTasks; ++i) {
task_runner->PostTask(FROM_HERE, BindLambdaForTesting([&] {
EXPECT_EQ(ThreadType::kBackground,
PlatformThread::GetCurrentThreadType());
{
ScopedBlockingCall scoped_blocking_call(
FROM_HERE, BlockingType::MAY_BLOCK);
EXPECT_EQ(ThreadType::kBackground,
PlatformThread::GetCurrentThreadType());
}
threads_running_barrier.Run();
blocking_threads_continue.Wait();
ScopedBlockingCall scoped_blocking_call(
FROM_HERE, BlockingType::MAY_BLOCK);
EXPECT_EQ(ThreadType::kDefault,
PlatformThread::GetCurrentThreadType());
}));
}
threads_running.Wait();
// Start shutdown only once every task is past its first blocking scope, then
// let the tasks enter the second one.
task_tracker_.StartShutdown();
blocking_threads_continue.Signal();
task_tracker_.FlushForTesting();
}
namespace {
class ThreadGroupImplStandbyPolicyTest : public ThreadGroupImplImplTestBase,
public testing::Test {
public:
ThreadGroupImplStandbyPolicyTest() = default;
ThreadGroupImplStandbyPolicyTest(const ThreadGroupImplStandbyPolicyTest&) =
delete;
ThreadGroupImplStandbyPolicyTest& operator=(
const ThreadGroupImplStandbyPolicyTest&) = delete;
void SetUp() override {
CreateAndStartThreadGroup(kReclaimTimeForCleanupTests);
}
void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
};
}
// A freshly started thread group is expected to have exactly one worker.
TEST_F(ThreadGroupImplStandbyPolicyTest, InitOne) {
  const auto num_workers = thread_group_->NumberOfWorkersForTesting();
  EXPECT_EQ(1U, num_workers);
}
namespace {
// Blocking type of the optional, nested ScopedBlockingCall created by
// NestedScopedBlockingCall. NO_BLOCK means that no nested call is created.
enum class OptionalBlockingType {
NO_BLOCK,
MAY_BLOCK,
WILL_BLOCK,
};
// Describes a pair of nested ScopedBlockingCalls: the type of the outer call,
// the type of the optional inner call, and the blocking type the combination
// is expected to behave as.
struct NestedBlockingType {
  BlockingType first;
  OptionalBlockingType second;
  BlockingType behaves_as;

  NestedBlockingType(BlockingType first_in,
                     OptionalBlockingType second_in,
                     BlockingType behaves_as_in)
      : first(first_in), second(second_in), behaves_as(behaves_as_in) {}
};
// Scoper that instantiates the ScopedBlockingCall(s) described by a
// NestedBlockingType: always an outer call of type |first| and, unless |second|
// is NO_BLOCK, a nested call of type |second|.
class NestedScopedBlockingCall {
 public:
  explicit NestedScopedBlockingCall(
      const NestedBlockingType& nested_blocking_type)
      : first_scoped_blocking_call_(FROM_HERE, nested_blocking_type.first) {
    // The nested call is created after the outer one; it stays null when no
    // nested call is requested.
    switch (nested_blocking_type.second) {
      case OptionalBlockingType::WILL_BLOCK:
        second_scoped_blocking_call_ = std::make_unique<ScopedBlockingCall>(
            FROM_HERE, BlockingType::WILL_BLOCK);
        break;
      case OptionalBlockingType::MAY_BLOCK:
        second_scoped_blocking_call_ = std::make_unique<ScopedBlockingCall>(
            FROM_HERE, BlockingType::MAY_BLOCK);
        break;
      case OptionalBlockingType::NO_BLOCK:
        break;
    }
  }
  NestedScopedBlockingCall(const NestedScopedBlockingCall&) = delete;
  NestedScopedBlockingCall& operator=(const NestedScopedBlockingCall&) = delete;

 private:
  ScopedBlockingCall first_scoped_blocking_call_;
  std::unique_ptr<ScopedBlockingCall> second_scoped_blocking_call_;
};
}
// Fixture for tests exercising ScopedBlockingCall inside thread group tasks.
// Parameterized on the NestedBlockingType used by the blocking tasks. The
// thread group is NOT created in SetUp(); each test creates and starts it.
class ThreadGroupImplBlockingTest
    : public ThreadGroupImplImplTestBase,
      public testing::TestWithParam<NestedBlockingType> {
 public:
  ThreadGroupImplBlockingTest() = default;
  ThreadGroupImplBlockingTest(const ThreadGroupImplBlockingTest&) = delete;
  ThreadGroupImplBlockingTest& operator=(const ThreadGroupImplBlockingTest&) =
      delete;

  // Builds the test name suffix for a parameter: the outer blocking type,
  // followed by "_<inner type>" when a nested call is requested.
  static std::string ParamInfoToString(
      ::testing::TestParamInfo<NestedBlockingType> param_info) {
    const NestedBlockingType& nested = param_info.param;
    std::string name;
    if (nested.first == BlockingType::MAY_BLOCK) {
      name = "MAY_BLOCK";
    } else {
      name = "WILL_BLOCK";
    }
    switch (nested.second) {
      case OptionalBlockingType::MAY_BLOCK:
        name += "_MAY_BLOCK";
        break;
      case OptionalBlockingType::WILL_BLOCK:
        name += "_WILL_BLOCK";
        break;
      case OptionalBlockingType::NO_BLOCK:
        break;
    }
    return name;
  }

  void TearDown() override { CommonTearDown(); }

 protected:
  // Posts kMaxTasks tasks of |priority| that each enter the blocking scope(s)
  // described by |nested_blocking_type| and then wait until
  // UnblockBlockingTasks() is called. Returns once all of them are running.
  void SaturateWithBlockingTasks(
      const NestedBlockingType& nested_blocking_type,
      TaskPriority priority = TaskPriority::USER_BLOCKING) {
    TestWaitableEvent all_running;
    const scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
        {MayBlock(), WithBaseSyncPrimitives(), priority},
        &mock_pooled_task_runner_delegate_);
    RepeatingClosure running_barrier = BarrierClosure(
        kMaxTasks,
        BindOnce(&TestWaitableEvent::Signal, Unretained(&all_running)));

    for (size_t n = 0; n < kMaxTasks; ++n) {
      runner->PostTask(
          FROM_HERE,
          BindLambdaForTesting([this, &running_barrier, nested_blocking_type]() {
            NestedScopedBlockingCall nested_scoped_blocking_call(
                nested_blocking_type);
            running_barrier.Run();
            blocking_threads_continue_.Wait();
          }));
    }
    all_running.Wait();
  }

  // Posts kMaxTasks tasks with |priority| and |shutdown_behavior| that wait,
  // outside of any blocking scope, until UnblockBusyTasks() is called. Returns
  // once all of them are running.
  void SaturateWithBusyTasks(
      TaskPriority priority = TaskPriority::USER_BLOCKING,
      TaskShutdownBehavior shutdown_behavior =
          TaskShutdownBehavior::SKIP_ON_SHUTDOWN) {
    TestWaitableEvent all_running;
    const scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
        {MayBlock(), WithBaseSyncPrimitives(), priority, shutdown_behavior},
        &mock_pooled_task_runner_delegate_);
    RepeatingClosure running_barrier = BarrierClosure(
        kMaxTasks,
        BindOnce(&TestWaitableEvent::Signal, Unretained(&all_running)));

    for (size_t n = 0; n < kMaxTasks; ++n) {
      runner->PostTask(FROM_HERE,
                       BindLambdaForTesting([this, &running_barrier]() {
                         running_barrier.Run();
                         busy_threads_continue_.Wait();
                       }));
    }
    all_running.Wait();
  }

  // Sleep time used while polling for a max tasks change: the larger of the
  // blocked-workers poll period and the MAY_BLOCK threshold, plus a tiny
  // timeout.
  TimeDelta GetMaxTasksChangeSleepTime() {
    const auto poll_period =
        thread_group_->blocked_workers_poll_period_for_testing();
    const auto may_block_threshold =
        thread_group_->may_block_threshold_for_testing();
    return std::max(poll_period, may_block_threshold) +
           TestTimeouts::tiny_timeout();
  }

  // Polls until max tasks equals |expected_max_tasks|, failing the test if max
  // tasks ever decreases between two polls.
  void ExpectMaxTasksIncreasesTo(size_t expected_max_tasks) {
    for (size_t observed = thread_group_->GetMaxTasksForTesting();
         observed != expected_max_tasks;) {
      PlatformThread::Sleep(GetMaxTasksChangeSleepTime());
      const size_t latest = thread_group_->GetMaxTasksForTesting();
      ASSERT_GE(latest, observed);
      observed = latest;
    }
  }

  // Releases the tasks posted by SaturateWithBlockingTasks().
  void UnblockBlockingTasks() { blocking_threads_continue_.Signal(); }

  // Releases the tasks posted by SaturateWithBusyTasks().
  void UnblockBusyTasks() { busy_threads_continue_.Signal(); }

  const scoped_refptr<TaskRunner> task_runner_ =
      test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
                                   &mock_pooled_task_runner_delegate_);

 private:
  TestWaitableEvent blocking_threads_continue_;
  TestWaitableEvent busy_threads_continue_;
};
// Saturates the thread group with blocking tasks, then checks that kMaxTasks
// additional busy tasks can still run (2 * kMaxTasks workers in total) and that
// max tasks is back to kMaxTasks once everything has completed.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblocked) {
CreateAndStartThreadGroup();
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
SaturateWithBlockingTasks(GetParam());
// Only returns once kMaxTasks more tasks are running alongside the blocked
// ones.
SaturateWithBusyTasks();
EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
UnblockBusyTasks();
UnblockBlockingTasks();
task_tracker_.FlushForTesting();
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
// Same as ThreadBlockedUnblocked, but with BEST_EFFORT tasks; also checks that
// max best-effort tasks is kMaxTasks before and after.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedBestEffort) {
CreateAndStartThreadGroup();
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
SaturateWithBlockingTasks(GetParam(), TaskPriority::BEST_EFFORT);
// Only returns once kMaxTasks more BEST_EFFORT tasks are running alongside the
// blocked ones.
SaturateWithBusyTasks(TaskPriority::BEST_EFFORT);
EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
UnblockBusyTasks();
UnblockBlockingTasks();
task_tracker_.FlushForTesting();
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
}
// Runs one more BEST_EFFORT task than the best-effort limit by having the tasks
// go through a blocking scope, then checks worker counts and that a task posted
// with default priority can still run.
TEST_P(ThreadGroupImplBlockingTest, TooManyBestEffortTasks) {
constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
CreateAndStartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);
// Keeps every task of this test alive until the end of the test.
TestWaitableEvent threads_continue;
{
// Signaled once kMaxBestEffortTasks + 1 tasks are inside the blocking scope.
TestWaitableEvent entered_blocking_scope;
RepeatingClosure entered_blocking_scope_barrier = BarrierClosure(
kMaxBestEffortTasks + 1, BindOnce(&TestWaitableEvent::Signal,
Unretained(&entered_blocking_scope)));
TestWaitableEvent exit_blocking_scope;
// Signaled once kMaxBestEffortTasks + 1 tasks have left the blocking scope.
TestWaitableEvent threads_running;
RepeatingClosure threads_running_barrier = BarrierClosure(
kMaxBestEffortTasks + 1,
BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));
const auto best_effort_task_runner =
test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
&mock_pooled_task_runner_delegate_);
for (size_t i = 0; i < kMaxBestEffortTasks + 1; ++i) {
best_effort_task_runner->PostTask(
FROM_HERE, BindLambdaForTesting([&] {
{
NestedScopedBlockingCall scoped_blocking_call(GetParam());
entered_blocking_scope_barrier.Run();
exit_blocking_scope.Wait();
}
threads_running_barrier.Run();
threads_continue.Wait();
}));
}
// All kMaxBestEffortTasks + 1 tasks must get into the blocking scope at the
// same time, then all of them must get out of it.
entered_blocking_scope.Wait();
exit_blocking_scope.Signal();
threads_running.Wait();
}
// kMaxBestEffortTasks + 1 tasks are now running outside of any blocking scope;
// the worker count is allowed to exceed that by one.
EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
kMaxBestEffortTasks + 1);
EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
kMaxBestEffortTasks + 2);
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
// One more task, posted through |task_runner_| (no explicit priority), must
// still be able to run.
TestWaitableEvent threads_running;
task_runner_->PostTask(FROM_HERE, BindLambdaForTesting([&] {
threads_running.Signal();
threads_continue.Wait();
}));
threads_running.Wait();
EXPECT_GE(thread_group_->NumberOfWorkersForTesting(),
kMaxBestEffortTasks + 2);
EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
kMaxBestEffortTasks + 3);
threads_continue.Signal();
task_tracker_.FlushForTesting();
}
// Posts kMaxTasks extra tasks while the thread group is saturated by tasks that
// have not entered their blocking scope yet, then checks that the extra tasks
// run once the first tasks enter the scope.
TEST_P(ThreadGroupImplBlockingTest, PostBeforeBlocking) {
CreateAndStartThreadGroup();
// Auto-reset so that it can be waited on once per posted task.
TestWaitableEvent thread_running(WaitableEvent::ResetPolicy::AUTOMATIC);
TestWaitableEvent thread_can_block;
TestWaitableEvent threads_continue;
// Post the saturating tasks one at a time, waiting for each to start. Each of
// them waits for |thread_can_block| before entering its blocking scope.
for (size_t i = 0; i < kMaxTasks; ++i) {
task_runner_->PostTask(
FROM_HERE,
BindOnce(
[](const NestedBlockingType& nested_blocking_type,
TestWaitableEvent* thread_running,
TestWaitableEvent* thread_can_block,
TestWaitableEvent* threads_continue) {
thread_running->Signal();
thread_can_block->Wait();
NestedScopedBlockingCall nested_scoped_blocking_call(
nested_blocking_type);
threads_continue->Wait();
},
GetParam(), Unretained(&thread_running),
Unretained(&thread_can_block), Unretained(&threads_continue)));
thread_running.Wait();
}
// Saturated, but nothing is in a blocking scope yet.
EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
TestWaitableEvent extra_threads_running;
TestWaitableEvent extra_threads_continue;
RepeatingClosure extra_threads_running_barrier = BarrierClosure(
kMaxTasks,
BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));
// Extra tasks, posted while all kMaxTasks saturating tasks are still running.
for (size_t i = 0; i < kMaxTasks; ++i) {
task_runner_->PostTask(
FROM_HERE, BindOnce(
[](RepeatingClosure* extra_threads_running_barrier,
TestWaitableEvent* extra_threads_continue) {
extra_threads_running_barrier->Run();
extra_threads_continue->Wait();
},
Unretained(&extra_threads_running_barrier),
Unretained(&extra_threads_continue)));
}
// Let the first tasks enter their blocking scope; all extra tasks must then
// get to run.
thread_can_block.Signal();
extra_threads_running.Wait();
EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
extra_threads_continue.Signal();
threads_continue.Signal();
task_tracker_.FlushForTesting();
}
// Checks that, once blocked tasks are unblocked while kMaxTasks busy tasks are
// still running, the freed workers become idle instead of running the pending
// tasks; those tasks only run after the busy tasks are released.
TEST_P(ThreadGroupImplBlockingTest, WorkersIdleWhenOverCapacity) {
  CreateAndStartThreadGroup();
  ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);

  SaturateWithBlockingTasks(GetParam());
  SaturateWithBusyTasks();
  ASSERT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), 0U);
  EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);

  AtomicFlag is_exiting;
  // Posts kMaxTasks tasks that each expect |is_exiting| to already be set when
  // they run.
  const auto post_tasks_expecting_exit = [&] {
    for (size_t n = 0; n < kMaxTasks; ++n) {
      task_runner_->PostTask(
          FROM_HERE,
          BindOnce([](AtomicFlag* flag) { EXPECT_TRUE(flag->IsSet()); },
                   Unretained(&is_exiting)));
    }
  };

  // Pending tasks posted while every worker is occupied.
  post_tasks_expecting_exit();

  // After unblocking, kMaxTasks workers are expected to go idle even though
  // tasks are pending.
  UnblockBlockingTasks();
  thread_group_->WaitForWorkersIdleForTesting(kMaxTasks);
  EXPECT_EQ(thread_group_->NumberOfIdleWorkersForTesting(), kMaxTasks);

  // More pending tasks; give idle workers a chance to (incorrectly) pick them
  // up before |is_exiting| is set.
  post_tasks_expecting_exit();
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());

  is_exiting.Set();
  UnblockBusyTasks();
  task_tracker_.FlushForTesting();
}
// Checks ShouldYield() while the thread group is saturated first by blocking
// tasks and then by busy tasks: nothing has to yield until a task that cannot
// run right away is posted.
TEST_P(ThreadGroupImplBlockingTest, ThreadBlockedUnblockedShouldYield) {
CreateAndStartThreadGroup();
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
EXPECT_FALSE(
thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
SaturateWithBlockingTasks(GetParam());
EXPECT_FALSE(
thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
SaturateWithBusyTasks();
// Every task posted so far is running, so BEST_EFFORT is not expected to yield.
EXPECT_FALSE(
thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
// A pending USER_VISIBLE task makes BEST_EFFORT yield. The posted task itself
// only runs after the busy tasks are released.
test::CreatePooledTaskRunner({TaskPriority::USER_VISIBLE},
&mock_pooled_task_runner_delegate_)
->PostTask(FROM_HERE, BindLambdaForTesting([&] {
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::BEST_EFFORT, TimeTicks()}));
}));
EXPECT_TRUE(
thread_group_->ShouldYield({TaskPriority::BEST_EFFORT, TimeTicks()}));
// A pending USER_BLOCKING task makes USER_VISIBLE yield as well.
test::CreatePooledTaskRunner({TaskPriority::USER_BLOCKING},
&mock_pooled_task_runner_delegate_)
->PostTask(FROM_HERE, BindLambdaForTesting([&] {
EXPECT_FALSE(thread_group_->ShouldYield(
{TaskPriority::USER_VISIBLE, TimeTicks()}));
}));
EXPECT_TRUE(
thread_group_->ShouldYield({TaskPriority::USER_VISIBLE, TimeTicks()}));
UnblockBusyTasks();
UnblockBlockingTasks();
task_tracker_.FlushForTesting();
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
// Run the ThreadGroupImplBlockingTest TEST_P tests with four blocking
// configurations: a single MAY_BLOCK call, a single WILL_BLOCK call, WILL_BLOCK
// nested in MAY_BLOCK, and MAY_BLOCK nested in WILL_BLOCK. Test name suffixes
// come from ParamInfoToString().
INSTANTIATE_TEST_SUITE_P(
All,
ThreadGroupImplBlockingTest,
::testing::Values(NestedBlockingType(BlockingType::MAY_BLOCK,
OptionalBlockingType::NO_BLOCK,
BlockingType::MAY_BLOCK),
NestedBlockingType(BlockingType::WILL_BLOCK,
OptionalBlockingType::NO_BLOCK,
BlockingType::WILL_BLOCK),
NestedBlockingType(BlockingType::MAY_BLOCK,
OptionalBlockingType::WILL_BLOCK,
BlockingType::WILL_BLOCK),
NestedBlockingType(BlockingType::WILL_BLOCK,
OptionalBlockingType::MAY_BLOCK,
BlockingType::WILL_BLOCK)),
ThreadGroupImplBlockingTest::ParamInfoToString);
// With a MAY_BLOCK threshold of TimeDelta::Max(), tasks sitting in a MAY_BLOCK
// scope are expected to change neither the number of workers nor max tasks.
TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPremature) {
// Arguments, in order: suggested reclaim time, max tasks, max best-effort
// tasks (defaults to max tasks), worker observer, MAY_BLOCK threshold.
CreateAndStartThreadGroup(TimeDelta::Max(),
kMaxTasks,
std::nullopt,
nullptr,
TimeDelta::Max());
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
SaturateWithBlockingTasks(NestedBlockingType(BlockingType::MAY_BLOCK,
OptionalBlockingType::NO_BLOCK,
BlockingType::MAY_BLOCK));
// Leave the thread group two poll periods to (incorrectly) react to the
// blocked tasks.
PlatformThread::Sleep(
2 * thread_group_->blocked_workers_poll_period_for_testing());
EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
UnblockBlockingTasks();
task_tracker_.FlushForTesting();
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
// With a MAY_BLOCK threshold of TimeDelta::Max(), saturates the thread group
// with BEST_EFFORT tasks sitting in a WILL_BLOCK scope and expects max tasks to
// double while max best-effort tasks stays at kMaxTasks.
TEST_F(ThreadGroupImplBlockingTest, ThreadBlockUnblockPrematureBestEffort) {
// Arguments, in order: suggested reclaim time, max tasks, max best-effort
// tasks, worker observer, MAY_BLOCK threshold.
CreateAndStartThreadGroup(TimeDelta::Max(),
kMaxTasks,
kMaxTasks,
nullptr,
TimeDelta::Max());
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
ASSERT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
SaturateWithBlockingTasks(NestedBlockingType(BlockingType::WILL_BLOCK,
OptionalBlockingType::NO_BLOCK,
BlockingType::WILL_BLOCK),
TaskPriority::BEST_EFFORT);
// Leave the thread group two poll periods to react to the blocked tasks.
PlatformThread::Sleep(
2 * thread_group_->blocked_workers_poll_period_for_testing());
EXPECT_GE(thread_group_->NumberOfWorkersForTesting(), kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), 2 * kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
UnblockBlockingTasks();
task_tracker_.FlushForTesting();
// Both limits are expected to be back to their initial values.
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
EXPECT_EQ(thread_group_->GetMaxBestEffortTasksForTesting(), kMaxTasks);
}
// Checks that entering a WILL_BLOCK scope nested in a MAY_BLOCK scope that has
// already increased max tasks by one leaves max tasks at kMaxTasks + 1.
TEST_F(ThreadGroupImplBlockingTest, MayBlockIncreaseCapacityNestedWillBlock) {
CreateAndStartThreadGroup();
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
auto task_runner =
test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
&mock_pooled_task_runner_delegate_);
TestWaitableEvent can_return;
// Occupy all but one of the kMaxTasks slots with tasks waiting on
// |can_return| outside of any blocking scope.
for (size_t i = 0; i < kMaxTasks - 1; ++i) {
task_runner->PostTask(
FROM_HERE, BindOnce(&TestWaitableEvent::Wait, Unretained(&can_return)));
}
TestWaitableEvent can_instantiate_will_block;
TestWaitableEvent did_instantiate_will_block;
// The last task enters a MAY_BLOCK scope and only nests a WILL_BLOCK scope in
// it once |can_instantiate_will_block| is signaled.
task_runner->PostTask(
FROM_HERE,
BindOnce(
[](TestWaitableEvent* can_instantiate_will_block,
TestWaitableEvent* did_instantiate_will_block,
TestWaitableEvent* can_return) {
ScopedBlockingCall may_block(FROM_HERE, BlockingType::MAY_BLOCK);
can_instantiate_will_block->Wait();
ScopedBlockingCall will_block(FROM_HERE, BlockingType::WILL_BLOCK);
did_instantiate_will_block->Signal();
can_return->Wait();
},
Unretained(&can_instantiate_will_block),
Unretained(&did_instantiate_will_block), Unretained(&can_return)));
// Wait for the MAY_BLOCK scope alone to raise max tasks by one.
ExpectMaxTasksIncreasesTo(kMaxTasks + 1);
can_instantiate_will_block.Signal();
did_instantiate_will_block.Wait();
// The nested WILL_BLOCK scope must not have raised max tasks a second time.
EXPECT_EQ(kMaxTasks + 1, thread_group_->GetMaxTasksForTesting());
can_return.Signal();
task_tracker_.FlushForTesting();
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
}
// Saturates the thread group with CONTINUE_ON_SHUTDOWN tasks, notifies it that
// shutdown started, and checks that kMaxTasks BLOCK_SHUTDOWN tasks can still
// run (2 * kMaxTasks workers in total).
TEST_F(ThreadGroupImplBlockingTest, ThreadBusyShutdown) {
CreateAndStartThreadGroup();
ASSERT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
TaskShutdownBehavior::CONTINUE_ON_SHUTDOWN);
thread_group_->OnShutdownStarted();
// Only returns once kMaxTasks BLOCK_SHUTDOWN tasks are running alongside the
// CONTINUE_ON_SHUTDOWN ones.
SaturateWithBusyTasks(TaskPriority::BEST_EFFORT,
TaskShutdownBehavior::BLOCK_SHUTDOWN);
EXPECT_EQ(thread_group_->NumberOfWorkersForTesting(), 2 * kMaxTasks);
UnblockBusyTasks();
task_tracker_.FlushForTesting();
// Join and destroy the thread group inside the test body. The fixture's
// TearDown() then finds no thread group to join.
thread_group_->JoinForTesting();
EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks);
mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
thread_group_.reset();
}
class ThreadGroupImplOverCapacityTest : public ThreadGroupImplImplTestBase,
public testing::Test {
public:
ThreadGroupImplOverCapacityTest() = default;
ThreadGroupImplOverCapacityTest(const ThreadGroupImplOverCapacityTest&) =
delete;
ThreadGroupImplOverCapacityTest& operator=(
const ThreadGroupImplOverCapacityTest&) = delete;
void SetUp() override {
CreateThreadGroup();
task_runner_ =
test::CreatePooledTaskRunner({MayBlock(), WithBaseSyncPrimitives()},
&mock_pooled_task_runner_delegate_);
}
void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
protected:
scoped_refptr<TaskRunner> task_runner_;
static constexpr size_t kLocalMaxTasks = 3;
void CreateThreadGroup() {
ASSERT_FALSE(thread_group_);
service_thread_.Start();
delayed_task_manager_.Start(service_thread_.task_runner());
thread_group_ = std::make_unique<ThreadGroupImpl>(
"OverCapacityTestThreadGroup", "A", ThreadType::kDefault,
0, task_tracker_.GetTrackedRef(),
tracked_ref_factory_.GetTrackedRef());
ASSERT_TRUE(thread_group_);
mock_pooled_task_runner_delegate_.SetThreadGroup(thread_group_.get());
}
};
// Verifies that the workers created while the thread group was over capacity
// are cleaned up again once they are no longer needed.
//
// Flow:
//  1. kLocalMaxTasks tasks enter a WILL_BLOCK ScopedBlockingCall.
//  2. kLocalMaxTasks extra tasks are posted and run alongside them; the test
//     asserts that 2 * kLocalMaxTasks workers exist at that point.
//  3. The blocking scopes and the extra tasks are released, while the first
//     batch stays parked on |threads_continue|.
//  4. The test waits for kLocalMaxTasks workers to be cleaned up and expects
//     exactly kLocalMaxTasks workers to remain.
TEST_F(ThreadGroupImplOverCapacityTest, VerifyCleanup) {
  StartThreadGroup(kReclaimTimeForCleanupTests, kLocalMaxTasks);

  TestWaitableEvent threads_running;
  TestWaitableEvent threads_continue;
  RepeatingClosure threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&threads_running)));

  TestWaitableEvent blocked_call_continue;
  RepeatingClosure closure = BindRepeating(
      [](RepeatingClosure* threads_running_barrier,
         TestWaitableEvent* threads_continue,
         TestWaitableEvent* blocked_call_continue) {
        threads_running_barrier->Run();
        {
          ScopedBlockingCall scoped_blocking_call(FROM_HERE,
                                                  BlockingType::WILL_BLOCK);
          blocked_call_continue->Wait();
        }
        // Outside of the blocking scope: the task keeps its worker busy until
        // the test releases it.
        threads_continue->Wait();
      },
      Unretained(&threads_running_barrier), Unretained(&threads_continue),
      Unretained(&blocked_call_continue));

  for (size_t i = 0; i < kLocalMaxTasks; ++i) {
    task_runner_->PostTask(FROM_HERE, closure);
  }

  threads_running.Wait();

  TestWaitableEvent extra_threads_running;
  TestWaitableEvent extra_threads_continue;
  RepeatingClosure extra_threads_running_barrier = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&extra_threads_running)));

  // Post a second batch of tasks while the first batch is inside its blocking
  // scope.
  for (size_t i = 0; i < kLocalMaxTasks; ++i) {
    task_runner_->PostTask(
        FROM_HERE, BindOnce(
                       [](RepeatingClosure* extra_threads_running_barrier,
                          TestWaitableEvent* extra_threads_continue) {
                         extra_threads_running_barrier->Run();
                         extra_threads_continue->Wait();
                       },
                       Unretained(&extra_threads_running_barrier),
                       Unretained(&extra_threads_continue)));
  }
  extra_threads_running.Wait();

  ASSERT_EQ(kLocalMaxTasks * 2, thread_group_->NumberOfWorkersForTesting());
  EXPECT_EQ(kLocalMaxTasks * 2, thread_group_->GetMaxTasksForTesting());

  blocked_call_continue.Signal();
  extra_threads_continue.Signal();

  // Keep posting (delayed) no-op tasks during the cleanup window so that the
  // wait below also covers the case where tasks keep arriving.
  for (int i = 0; i < 16; ++i) {
    task_runner_->PostDelayedTask(FROM_HERE, DoNothing(),
                                  kReclaimTimeForCleanupTests * i * 0.5);
  }

  // Do NOT signal |threads_continue| before this point. The first batch has to
  // stay parked so that kLocalMaxTasks workers are still running a task while
  // the extra workers are cleaned up. Releasing it earlier lets every worker go
  // idle, which makes the exact worker count asserted below depend on timing.
  thread_group_->WaitForWorkersCleanedUpForTesting(kLocalMaxTasks);
  EXPECT_EQ(kLocalMaxTasks, thread_group_->NumberOfWorkersForTesting());

  // Release the first batch and wait for everything to finish before the
  // stack-allocated events and closures go out of scope.
  threads_continue.Signal();
  task_tracker_.FlushForTesting();
}
// Posts kMaxNumberOfWorkers tasks that all sit in a WILL_BLOCK
// ScopedBlockingCall, then kNumExtraTasks more, and expects the number of
// workers never to exceed kMaxNumberOfWorkers. Afterwards it checks that the
// group can still run kMaxTasks additional tasks concurrently.
TEST_F(ThreadGroupImplBlockingTest, MaximumWorkersTest) {
  CreateAndStartThreadGroup();

  constexpr size_t kMaxNumberOfWorkers = 256;
  constexpr size_t kNumExtraTasks = 10;

  // --- Phase 1: kMaxNumberOfWorkers tasks blocked in WILL_BLOCK scopes. ---

  // Signaled once every early task has entered its blocking scope.
  TestWaitableEvent early_tasks_blocked;
  RepeatingClosure on_early_task_blocked = BarrierClosure(
      kMaxNumberOfWorkers,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&early_tasks_blocked)));

  // Signaled once every early task has left its blocking scope.
  TestWaitableEvent early_tasks_done;
  RepeatingClosure on_early_task_done = BarrierClosure(
      kMaxNumberOfWorkers,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&early_tasks_done)));

  TestWaitableEvent release_early_tasks;

  for (size_t n = 0; n < kMaxNumberOfWorkers; ++n) {
    task_runner_->PostTask(
        FROM_HERE,
        BindOnce(
            [](RepeatingClosure* on_blocked, TestWaitableEvent* release,
               RepeatingClosure* on_done) {
              {
                ScopedBlockingCall blocking_scope(FROM_HERE,
                                                  BlockingType::WILL_BLOCK);
                on_blocked->Run();
                release->Wait();
              }
              on_done->Run();
            },
            Unretained(&on_early_task_blocked),
            Unretained(&release_early_tasks),
            Unretained(&on_early_task_done)));
  }

  early_tasks_blocked.Wait();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(),
            kMaxTasks + kMaxNumberOfWorkers);

  // --- Phase 2: kNumExtraTasks more blocking tasks. ---

  TestWaitableEvent release_late_tasks;
  TestWaitableEvent late_tasks_blocked;
  RepeatingClosure on_late_task_blocked = BarrierClosure(
      kNumExtraTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&late_tasks_blocked)));

  for (size_t n = 0; n < kNumExtraTasks; ++n) {
    task_runner_->PostTask(
        FROM_HERE,
        BindOnce(
            [](RepeatingClosure* on_blocked, TestWaitableEvent* release) {
              ScopedBlockingCall blocking_scope(FROM_HERE,
                                                BlockingType::WILL_BLOCK);
              on_blocked->Run();
              release->Wait();
            },
            Unretained(&on_late_task_blocked),
            Unretained(&release_late_tasks)));
  }

  // Leave some time for extra workers to show up, if any were going to.
  PlatformThread::Sleep(TestTimeouts::tiny_timeout());
  EXPECT_LE(thread_group_->NumberOfWorkersForTesting(), kMaxNumberOfWorkers);

  // Let the early tasks finish; the late tasks are then expected to reach
  // their blocking scope.
  release_early_tasks.Signal();
  early_tasks_done.Wait();
  late_tasks_blocked.Wait();

  // --- Phase 3: kMaxTasks plain tasks must be able to run concurrently. ---

  TestWaitableEvent final_tasks_started;
  TestWaitableEvent release_final_tasks;
  RepeatingClosure on_final_task_started = BarrierClosure(
      kMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&final_tasks_started)));

  for (size_t n = 0; n < kMaxTasks; ++n) {
    task_runner_->PostTask(
        FROM_HERE,
        BindOnce(
            [](RepeatingClosure* on_started, TestWaitableEvent* release) {
              on_started->Run();
              release->Wait();
            },
            Unretained(&on_final_task_started),
            Unretained(&release_final_tasks)));
  }

  final_tasks_started.Wait();
  EXPECT_EQ(thread_group_->GetMaxTasksForTesting(), kMaxTasks + kNumExtraTasks);

  // Teardown: release everything that is still parked and drain the tracker.
  release_late_tasks.Signal();
  release_final_tasks.Signal();
  task_tracker_.FlushForTesting();
}
// Starts the thread group with a BEST_EFFORT limit of kMaxTasks / 2 and checks
// that:
//  - that many BEST_EFFORT tasks can run at the same time,
//  - one more BEST_EFFORT task does not run until the test allows it,
//  - a foreground task can still run in the meantime.
TEST_F(ThreadGroupImplImplStartInBodyTest, MaxBestEffortTasks) {
  constexpr int kMaxBestEffortTasks = kMaxTasks / 2;
  StartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);

  const scoped_refptr<TaskRunner> foreground_runner =
      test::CreatePooledTaskRunner({MayBlock()},
                                   &mock_pooled_task_runner_delegate_);
  const scoped_refptr<TaskRunner> best_effort_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // Fill the BEST_EFFORT quota with tasks that park until released.
  TestWaitableEvent quota_tasks_started;
  TestWaitableEvent release_quota_tasks;
  RepeatingClosure on_quota_task_started = BarrierClosure(
      kMaxBestEffortTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&quota_tasks_started)));

  for (int posted = 0; posted < kMaxBestEffortTasks; ++posted) {
    best_effort_runner->PostTask(FROM_HERE, BindLambdaForTesting([&] {
                                   on_quota_task_started.Run();
                                   release_quota_tasks.Wait();
                                 }));
  }
  quota_tasks_started.Wait();

  // One BEST_EFFORT task beyond the quota: it fails the test if it runs before
  // |over_quota_task_allowed| is set.
  AtomicFlag over_quota_task_allowed;
  TestWaitableEvent over_quota_task_ran;
  best_effort_runner->PostTask(
      FROM_HERE, BindLambdaForTesting([&] {
        EXPECT_TRUE(over_quota_task_allowed.IsSet());
        over_quota_task_ran.Signal();
      }));

  // A foreground task must get to run while the quota tasks are parked.
  TestWaitableEvent foreground_task_ran;
  foreground_runner->PostTask(
      FROM_HERE, BindOnce(&TestWaitableEvent::Signal,
                          Unretained(&foreground_task_ran)));
  foreground_task_ran.Wait();

  // Freeing the quota must let the over-quota task run.
  over_quota_task_allowed.Set();
  release_quota_tasks.Signal();
  over_quota_task_ran.Wait();

  // Drain all tasks before the locals captured by reference go away.
  task_tracker_.FlushForTesting();
}
// Posts kLargeNumber BEST_EFFORT tasks to a group whose BEST_EFFORT limit is
// kMaxTasks / 2. Each task checks, while it runs, that the group has at most
// kMaxBestEffortTasks + 1 workers.
TEST_F(ThreadGroupImplImplStartInBodyTest,
       FloodBestEffortTasksDoesNotCreateTooManyWorkers) {
  constexpr size_t kMaxBestEffortTasks = kMaxTasks / 2;
  StartThreadGroup(TimeDelta::Max(), kMaxTasks, kMaxBestEffortTasks);

  const scoped_refptr<TaskRunner> best_effort_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  for (size_t posted = 0; posted < kLargeNumber; ++posted) {
    best_effort_runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&] {
          EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
                    kMaxBestEffortTasks + 1);
        }));
  }

  // Every posted task has to finish before the test body returns.
  task_tracker_.FlushForTesting();
}
// Runs kLargeNumber rounds on a group started with kNumWorkers max tasks. In
// each round a task posts a follow-up task and then enters a WILL_BLOCK
// ScopedBlockingCall. Inside that scope the group is expected to have exactly
// kNumWorkers + 1 workers, and the follow-up task expects at most that many.
TEST_F(ThreadGroupImplImplStartInBodyTest,
       RepeatedWillBlockDoesNotCreateTooManyWorkers) {
  constexpr size_t kNumWorkers = 2U;
  StartThreadGroup(TimeDelta::Max(), kNumWorkers, std::nullopt);

  const scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
      {MayBlock()}, &mock_pooled_task_runner_delegate_);

  for (size_t round = 0; round < kLargeNumber; ++round) {
    runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&] {
          // Queue the follow-up task before blocking.
          runner->PostTask(
              FROM_HERE, BindLambdaForTesting([&] {
                EXPECT_LE(thread_group_->NumberOfWorkersForTesting(),
                          kNumWorkers + 1);
              }));

          ScopedBlockingCall blocking_scope(FROM_HERE,
                                            BlockingType::WILL_BLOCK);
          EXPECT_EQ(kNumWorkers + 1,
                    thread_group_->NumberOfWorkersForTesting());
        }));

    // Finish the round (both tasks) before starting the next one.
    task_tracker_.FlushForTesting();
  }
}
namespace {

// Fixture parameterized on the BlockingType used by the test's
// ScopedBlockingCalls. SetUp() creates the thread group and starts it with
// kMaxTasks max tasks and a BEST_EFFORT limit of kMaxTasks / 2.
class ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest
    : public ThreadGroupImplImplTestBase,
      public testing::TestWithParam<BlockingType> {
 public:
  // BEST_EFFORT limit passed to ThreadGroupImpl::Start() in SetUp().
  static constexpr int kMaxBestEffortTasks = kMaxTasks / 2;

  ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest() = default;
  ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest(
      const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;
  ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest& operator=(
      const ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest&) = delete;

  // testing::Test:
  void SetUp() override {
    CreateThreadGroup();
    thread_group_->Start(kMaxTasks, kMaxBestEffortTasks,
                         base::TimeDelta::Max(), service_thread_.task_runner(),
                         nullptr, ThreadGroup::WorkerEnvironment::NONE, false,
                         {});
  }

  void TearDown() override { ThreadGroupImplImplTestBase::CommonTearDown(); }
};

}  // namespace
// With kMaxBestEffortTasks BEST_EFFORT tasks parked inside a
// ScopedBlockingCall of the parameterized BlockingType, checks that another
// kMaxBestEffortTasks BEST_EFFORT tasks can all be running at the same time.
TEST_P(ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
       BlockingCallAndMaxBestEffortTasksTest) {
  const scoped_refptr<TaskRunner> best_effort_runner =
      test::CreatePooledTaskRunner({TaskPriority::BEST_EFFORT, MayBlock()},
                                   &mock_pooled_task_runner_delegate_);

  // First batch: announce, then park inside a ScopedBlockingCall.
  TestWaitableEvent blocked_batch_started;
  TestWaitableEvent release_blocked_batch;
  RepeatingClosure on_blocked_task_started = BarrierClosure(
      kMaxBestEffortTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&blocked_batch_started)));
  for (int posted = 0; posted < kMaxBestEffortTasks; ++posted) {
    best_effort_runner->PostTask(
        FROM_HERE, BindLambdaForTesting([&] {
          on_blocked_task_started.Run();
          ScopedBlockingCall blocking_scope(FROM_HERE, GetParam());
          release_blocked_batch.Wait();
        }));
  }
  blocked_batch_started.Wait();

  // Second batch: plain BEST_EFFORT tasks. They stay parked until released so
  // that the barrier only fires if all of them run concurrently.
  TestWaitableEvent plain_batch_started;
  TestWaitableEvent release_plain_batch;
  RepeatingClosure on_plain_task_started = BarrierClosure(
      kMaxBestEffortTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&plain_batch_started)));
  for (int posted = 0; posted < kMaxBestEffortTasks; ++posted) {
    best_effort_runner->PostTask(FROM_HERE, BindLambdaForTesting([&] {
                                   on_plain_task_started.Run();
                                   release_plain_batch.Wait();
                                 }));
  }
  plain_batch_started.Wait();

  // Teardown: release both batches and drain the tracker.
  release_blocked_batch.Signal();
  release_plain_batch.Signal();
  task_tracker_.FlushForTesting();
}
// Run the parameterized suite once per BlockingType, each under its own
// instantiation name.
INSTANTIATE_TEST_SUITE_P(
    MayBlock,
    ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
    testing::Values(BlockingType::MAY_BLOCK));

INSTANTIATE_TEST_SUITE_P(
    WillBlock,
    ThreadGroupImplBlockingCallAndMaxBestEffortTasksTest,
    testing::Values(BlockingType::WILL_BLOCK));
// Starts a group with a very short reclaim time, gets kLocalMaxTasks tasks
// running at once, releases them together, sleeps for the reclaim time and
// then joins and destroys the group. The test has no explicit expectations; it
// passes as long as this sequence completes.
TEST_F(ThreadGroupImplImplStartInBodyTest, RacyCleanup) {
  constexpr size_t kLocalMaxTasks = 256;
  constexpr TimeDelta kReclaimTimeForRacyCleanupTest = Milliseconds(10);

  thread_group_->Start(kLocalMaxTasks, kLocalMaxTasks,
                       kReclaimTimeForRacyCleanupTest,
                       service_thread_.task_runner(), nullptr,
                       ThreadGroup::WorkerEnvironment::NONE, false, {});

  scoped_refptr<TaskRunner> runner = test::CreatePooledTaskRunner(
      {WithBaseSyncPrimitives()}, &mock_pooled_task_runner_delegate_);

  TestWaitableEvent all_tasks_started;
  TestWaitableEvent release_tasks;
  RepeatingClosure on_task_started = BarrierClosure(
      kLocalMaxTasks,
      BindOnce(&TestWaitableEvent::Signal, Unretained(&all_tasks_started)));

  // Each task reports that it started and then parks until released.
  for (size_t posted = 0; posted < kLocalMaxTasks; ++posted) {
    runner->PostTask(
        FROM_HERE,
        BindOnce(
            [](OnceClosure report_started, TestWaitableEvent* release) {
              std::move(report_started).Run();
              release->Wait();
            },
            on_task_started, Unretained(&release_tasks)));
  }

  // Once every task is running, let them all go at the same time.
  all_tasks_started.Wait();
  release_tasks.Signal();

  // Wake up around the time the now-idle workers reach their reclaim time.
  PlatformThread::Sleep(kReclaimTimeForRacyCleanupTest);

  thread_group_->JoinForTesting();

  // Detach the delegate from the group, then destroy the group.
  mock_pooled_task_runner_delegate_.SetThreadGroup(nullptr);
  thread_group_.reset();
}
}