#include "base/task/thread_pool/worker_thread.h"
#include <stddef.h>
#include <algorithm>
#include <atomic>
#include <utility>
#include "base/allocator/partition_allocator/partition_alloc_buildflags.h"
#include "base/allocator/partition_allocator/partition_alloc_config.h"
#include "base/check_op.h"
#include "base/compiler_specific.h"
#include "base/debug/alias.h"
#include "base/feature_list.h"
#include "base/functional/callback_helpers.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/task_features.h"
#include "base/task/thread_pool/environment_config.h"
#include "base/task/thread_pool/task_tracker.h"
#include "base/task/thread_pool/worker_thread_observer.h"
#include "base/threading/hang_watcher.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
#include "build/build_config.h"
#include "third_party/abseil-cpp/absl/types/optional.h"
#if BUILDFLAG(IS_OHOS)
#include "base/threading/platform_thread.h"
#endif
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
#include "base/files/file_descriptor_watcher_posix.h"
#endif
#if BUILDFLAG(IS_APPLE)
#include "base/mac/scoped_nsautorelease_pool.h"
#endif
#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
PA_CONFIG(THREAD_CACHE_SUPPORTED)
#include "base/allocator/partition_allocator/thread_cache.h"
#endif
namespace base::internal {
namespace {
#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
PA_CONFIG(THREAD_CACHE_SUPPORTED)
// Returns how long a worker should sleep before waking up to purge its thread
// cache, given that it wants to sleep for at least |min_sleep_time|.
//
// The result is the larger of:
//  - the time until (now + |min_sleep_time|) snapped forward to the next
//    one-second tick, and
//  - the time remaining until one minute after the first call to this
//    function in the process.
TimeDelta GetSleepTimeBeforePurge(TimeDelta min_sleep_time) {
  constexpr TimeDelta kFirstSleepLength = Minutes(1);
  constexpr TimeDelta kPurgeThreadCacheIdleDelay = Seconds(1);

  // Sampled exactly once so every computation below shares one time base.
  const TimeTicks current_time = TimeTicks::Now();

  // Function-local static: fixed on the very first call and reused by every
  // later call, whichever thread makes it.
  static const TimeTicks first_scheduled_wake =
      current_time + kFirstSleepLength;

  // Snap the requested wake-up forward onto a one-second grid anchored at the
  // default (null) TimeTicks.
  const TimeTicks requested_wake = current_time + min_sleep_time;
  const TimeTicks aligned_wake = requested_wake.SnappedToNextTick(
      TimeTicks(), kPurgeThreadCacheIdleDelay);

  const TimeDelta sleep_until_aligned_wake = aligned_wake - current_time;
  const TimeDelta sleep_until_first_wake = first_scheduled_wake - current_time;
  return std::max(sleep_until_aligned_wake, sleep_until_first_wake);
}
#endif
// Returns whether the kDelayFirstWorkerWake feature is enabled. FeatureList is
// queried only on the first call; the answer is cached in a function-local
// static and returned unchanged afterwards.
bool IsDelayFirstWorkerSleepEnabled() {
  static const bool cached_state =
      FeatureList::IsEnabled(kDelayFirstWorkerWake);
  return cached_state;
}
}
// Out-of-line definition of the static constexpr member declared in
// WorkerThread::Delegate. WaitForWork() passes it to std::min(), which binds
// it by reference (an ODR-use), so a definition is required wherever static
// constexpr members are not implicitly inline (i.e. before C++17).
constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;
// Blocks on |wake_up_event| until it is signaled or GetSleepTimeout() elapses.
//
// When PartitionAlloc is used as malloc with thread-cache support, the wait is
// split in two: a first, shorter wait; then, only if that wait timed out, the
// current thread's allocator cache is purged and the thread waits for whatever
// is left of the full timeout. A worker that is woken quickly therefore never
// pays for a purge.
void WorkerThread::Delegate::WaitForWork(WaitableEvent* wake_up_event) {
  DCHECK(wake_up_event);
  const TimeDelta total_sleep = GetSleepTimeout();

#if BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
    PA_CONFIG(THREAD_CACHE_SUPPORTED)
  // Length of the first wait: capped at kPurgeThreadCacheIdleDelay, optionally
  // re-computed by GetSleepTimeBeforePurge() when the feature is enabled.
  TimeDelta time_before_purge =
      std::min(total_sleep, kPurgeThreadCacheIdleDelay);
  if (IsDelayFirstWorkerSleepEnabled()) {
    time_before_purge = GetSleepTimeBeforePurge(time_before_purge);
  }

  // Signaled during the first wait: there is work, nothing else to do.
  if (wake_up_event->TimedWait(time_before_purge)) {
    return;
  }

  // The first wait timed out: release the memory held by this thread's cache.
  partition_alloc::ThreadCache::PurgeCurrentThread();

  // The first wait already covered the whole timeout.
  if (total_sleep <= time_before_purge) {
    return;
  }

  // Wait for the remainder. An infinite timeout stays infinite instead of
  // having the already-elapsed time subtracted from it.
  const TimeDelta remaining_sleep = total_sleep.is_max()
                                        ? base::TimeDelta::Max()
                                        : total_sleep - time_before_purge;
  wake_up_event->TimedWait(remaining_sleep);
#else
  wake_up_event->TimedWait(total_sleep);
#endif
}
// Constructs a worker; no platform thread is created here (see Start()).
//
// |thread_type_hint| is the thread type returned by GetDesiredThreadType()
// while shutdown has not started. |delegate| and |task_tracker| must be
// non-null. |sequence_num| is stored in |sequence_num_|. |predecessor_lock| is
// forwarded to the constructor of |thread_lock_|.
WorkerThread::WorkerThread(ThreadType thread_type_hint,
std::unique_ptr<Delegate> delegate,
TrackedRef<TaskTracker> task_tracker,
size_t sequence_num,
const CheckedLock* predecessor_lock)
: thread_lock_(predecessor_lock),
delegate_(std::move(delegate)),
task_tracker_(std::move(task_tracker)),
thread_type_hint_(thread_type_hint),
// GetDesiredThreadType() reads |task_tracker_| and |thread_type_hint_|.
// NOTE(review): this relies on both members being declared before
// |current_thread_type_| in the class definition - confirm in the header.
current_thread_type_(GetDesiredThreadType()),
sequence_num_(sequence_num) {
DCHECK(delegate_);
DCHECK(task_tracker_);
// A background / utility hint is only acceptable when the corresponding
// thread type can be used for worker threads.
DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
thread_type_hint_ != ThreadType::kBackground);
DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
thread_type_hint != ThreadType::kUtility);
// NOTE(review): presumably informs WaitableEvent instrumentation that this
// event is only waited on while the worker is idle - confirm in
// waitable_event.h.
wake_up_event_.declare_only_used_while_idle();
}
// Creates the platform thread backing this worker.
//
// |io_thread_task_runner| is stored (POSIX / Fuchsia only, where it must be
// non-null) for the FileDescriptorWatcher set up in ThreadMain().
// |worker_thread_observer| may be null; when set it is notified from
// RunWorker().
//
// Returns false only when thread creation left |thread_handle_| null. Returns
// true without creating a thread when Cleanup() or JoinForTesting() has
// already been called.
bool WorkerThread::Start(
    scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
    WorkerThreadObserver* worker_thread_observer) {
  CheckedLock::AssertNoLockHeldOnCurrentThread();

  // Result deliberately unused: the call only primes the function-local cache
  // of the feature state on the thread calling Start(), before |thread_lock_|
  // is taken, instead of leaving the first lookup to the worker thread.
  IsDelayFirstWorkerSleepEnabled();

  CheckedAutoLock auto_lock(thread_lock_);
  DCHECK(thread_handle_.is_null());

#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
  DCHECK(io_thread_task_runner);
  io_thread_task_runner_ = std::move(io_thread_task_runner);
#endif

  // The worker was told to exit before it ever started: nothing to create.
  const bool exit_already_requested =
      should_exit_.IsSet() || join_called_for_testing_.IsSet();
  if (exit_already_requested) {
    return true;
  }

  DCHECK(!worker_thread_observer_);
  worker_thread_observer_ = worker_thread_observer;

  // Set before the thread exists; RunWorker() DCHECKs that |self_| == this and
  // clears it on exit.
  self_ = this;

  constexpr size_t kDefaultStackSize = 0;
  PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
                                 current_thread_type_);

  // A null handle after creation means the thread could not be created; undo
  // the self-reference in that case.
  const bool thread_created = !thread_handle_.is_null();
  if (!thread_created) {
    self_ = nullptr;
  }
  return thread_created;
}
// Signals |wake_up_event_| so that a worker blocked in WaitForWork() resumes.
// Must not be called while the calling thread holds a CheckedLock, nor after
// Cleanup() or JoinForTesting() (both conditions are DCHECKed).
void WorkerThread::WakeUp() {
CheckedLock::AssertNoLockHeldOnCurrentThread();
// Waking a worker that has been asked to exit is a caller error.
DCHECK(!join_called_for_testing_.IsSet());
DCHECK(!should_exit_.IsSet());
// Emits the start of a trace flow keyed on |this|; RunWorker() emits the
// matching TerminatingFlow when the worker becomes active again.
TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThread::WakeUp",
perfetto::Flow::FromPointer(this));
wake_up_event_.Signal();
}
void WorkerThread::JoinForTesting() {
DCHECK(!join_called_for_testing_.IsSet());
join_called_for_testing_.Set();
wake_up_event_.Signal();
PlatformThreadHandle thread_handle;
{
CheckedAutoLock auto_lock(thread_lock_);
if (thread_handle_.is_null())
return;
thread_handle = thread_handle_;
thread_handle_ = PlatformThreadHandle();
}
PlatformThread::Join(thread_handle);
}
// Test-only: returns true while this object still holds a non-null thread
// handle. |thread_handle_| is read under |thread_lock_|.
bool WorkerThread::ThreadAliveForTesting() const {
  CheckedAutoLock auto_lock(thread_lock_);
  const bool holds_thread_handle = !thread_handle_.is_null();
  return holds_thread_handle;
}
// Detaches the platform thread if a handle is still held, i.e. if it was not
// taken by JoinForTesting().
WorkerThread::~WorkerThread() {
  CheckedAutoLock auto_lock(thread_lock_);

  // No handle: the thread was never created or has already been joined.
  if (thread_handle_.is_null()) {
    return;
  }

  // JoinForTesting() resets the handle when it joins, so a live handle here
  // means it was never called.
  DCHECK(!join_called_for_testing_.IsSet());
  PlatformThread::Detach(thread_handle_);
}
// Asks the worker to exit: sets |should_exit_| (which makes ShouldExit()
// return true) and then signals |wake_up_event_| so that a worker blocked in
// WaitForWork() wakes up and observes the flag. May be called at most once
// (DCHECKed).
void WorkerThread::Cleanup() {
DCHECK(!should_exit_.IsSet());
// The flag is set before signaling so the woken worker sees it.
should_exit_.Set();
wake_up_event_.Signal();
}
void WorkerThread::MaybeUpdateThreadType() {
UpdateThreadType(GetDesiredThreadType());
}
void WorkerThread::BeginUnusedPeriod() {
CheckedAutoLock auto_lock(thread_lock_);
DCHECK(last_used_time_.is_null());
last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
}
void WorkerThread::EndUnusedPeriod() {
CheckedAutoLock auto_lock(thread_lock_);
DCHECK(!last_used_time_.is_null());
last_used_time_ = TimeTicks();
}
// Returns the time recorded by BeginUnusedPeriod(), or a null TimeTicks when
// no unused period is in progress. The value is read under |thread_lock_|.
TimeTicks WorkerThread::GetLastUsedTime() const {
  CheckedAutoLock auto_lock(thread_lock_);
  const TimeTicks snapshot = last_used_time_;
  return snapshot;
}
// Returns true when the worker's main loop must stop: after Cleanup(), after
// JoinForTesting(), or once the task tracker reports that shutdown is
// complete.
bool WorkerThread::ShouldExit() const {
  // The two local flags are tested first; |task_tracker_| is dereferenced only
  // when neither is set.
  // NOTE(review): this ordering looks deliberate (the flags guard the access
  // to |task_tracker_|) - keep it unless the header says otherwise.
  if (should_exit_.IsSet()) {
    return true;
  }
  if (join_called_for_testing_.IsSet()) {
    return true;
  }
  return task_tracker_->IsShutdownComplete();
}
// Returns the thread type this worker should currently run with:
// ThreadType::kDefault once shutdown has started, |thread_type_hint_|
// otherwise.
ThreadType WorkerThread::GetDesiredThreadType() const {
  return task_tracker_->HasShutdownStarted() ? ThreadType::kDefault
                                             : thread_type_hint_;
}
// Switches the calling thread to |desired_thread_type| and records it in
// |current_thread_type_|. Does nothing when the type is already current.
// PlatformThread::SetCurrentThreadType() acts on the calling thread (per its
// name), so this is meant to run on the worker's own thread.
void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
  if (desired_thread_type != current_thread_type_) {
    PlatformThread::SetCurrentThreadType(desired_thread_type);
    current_thread_type_ = desired_thread_type;
  }
}
#if BUILDFLAG(IS_OHOS)
// OHOS only: returns the thread id recorded by ThreadMain() via
// PlatformThread::CurrentRealId().
// NOTE(review): |realtid_| is written on the worker thread at the start of
// ThreadMain(); its value before that point, and whether reads from other
// threads are synchronized, depend on its declaration - confirm in the header.
PlatformThreadId WorkerThread::GetRealTid() {
return realtid_;
}
#endif
// Entry point of the platform thread created by Start().
//
// Performs per-thread setup, then dispatches to one of the Run*Worker()
// wrappers selected by (a) whether |thread_type_hint_| is kBackground and
// (b) the delegate's ThreadLabel. Each case returns right after its wrapper
// returns.
void WorkerThread::ThreadMain() {
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
DCHECK(io_thread_task_runner_);
// Lives for the whole duration of this function, i.e. for as long as the
// worker runs.
FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
#endif
#if BUILDFLAG(IS_OHOS)
// Record this thread's id for GetRealTid().
realtid_ = PlatformThread::CurrentRealId();
#endif
// Background workers use the RunBackground*Worker() variants.
if (thread_type_hint_ == ThreadType::kBackground) {
switch (delegate_->GetThreadLabel()) {
case ThreadLabel::POOLED:
RunBackgroundPooledWorker();
return;
case ThreadLabel::SHARED:
RunBackgroundSharedWorker();
return;
case ThreadLabel::DEDICATED:
RunBackgroundDedicatedWorker();
return;
#if BUILDFLAG(IS_WIN)
// The COM labels are only handled on Windows.
case ThreadLabel::SHARED_COM:
RunBackgroundSharedCOMWorker();
return;
case ThreadLabel::DEDICATED_COM:
RunBackgroundDedicatedCOMWorker();
return;
#endif
}
}
// Non-background workers (also reached if no case above matched).
switch (delegate_->GetThreadLabel()) {
case ThreadLabel::POOLED:
RunPooledWorker();
return;
case ThreadLabel::SHARED:
RunSharedWorker();
return;
case ThreadLabel::DEDICATED:
RunDedicatedWorker();
return;
#if BUILDFLAG(IS_WIN)
case ThreadLabel::SHARED_COM:
RunSharedCOMWorker();
return;
case ThreadLabel::DEDICATED_COM:
RunDedicatedCOMWorker();
return;
#endif
}
}
// Wrapper around RunWorker() that ThreadMain() selects for non-background
// workers labelled ThreadLabel::POOLED.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunPooledWorker() {
RunWorker();
NO_CODE_FOLDING();
}
// Wrapper around RunWorker() that ThreadMain() selects for background workers
// labelled ThreadLabel::POOLED.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
RunWorker();
NO_CODE_FOLDING();
}
// Wrapper around RunWorker() that ThreadMain() selects for non-background
// workers labelled ThreadLabel::SHARED.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunSharedWorker() {
RunWorker();
NO_CODE_FOLDING();
}
// Wrapper around RunWorker() that ThreadMain() selects for background workers
// labelled ThreadLabel::SHARED.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
RunWorker();
NO_CODE_FOLDING();
}
// Wrapper around RunWorker() that ThreadMain() selects for non-background
// workers labelled ThreadLabel::DEDICATED.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunDedicatedWorker() {
RunWorker();
NO_CODE_FOLDING();
}
// Wrapper around RunWorker() that ThreadMain() selects for background workers
// labelled ThreadLabel::DEDICATED.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
RunWorker();
NO_CODE_FOLDING();
}
#if BUILDFLAG(IS_WIN)
// Windows only. Wrapper around RunWorker() that ThreadMain() selects for
// non-background workers labelled ThreadLabel::SHARED_COM.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunSharedCOMWorker() {
RunWorker();
NO_CODE_FOLDING();
}
// Windows only. Wrapper around RunWorker() that ThreadMain() selects for
// background workers labelled ThreadLabel::SHARED_COM.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
RunWorker();
NO_CODE_FOLDING();
}
// Windows only. Wrapper around RunWorker() that ThreadMain() selects for
// non-background workers labelled ThreadLabel::DEDICATED_COM.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
RunWorker();
NO_CODE_FOLDING();
}
// Windows only. Wrapper around RunWorker() that ThreadMain() selects for
// background workers labelled ThreadLabel::DEDICATED_COM.
// NOTE(review): NOINLINE plus the trailing NO_CODE_FOLDING() presumably keep
// this wrapper as a distinct, non-merged function so the worker kind can be
// told apart in stack traces - confirm in base/compiler_specific.h.
NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
RunWorker();
NO_CODE_FOLDING();
}
#endif
// Main loop of a worker thread.
//
// Notifies the observer (if any) and the delegate that the thread has entered
// its main function, waits for a first wake-up, then repeatedly asks the
// delegate for a task source and runs its next task through |task_tracker_|
// until ShouldExit() returns true. On the way out it notifies the delegate
// and the observer and clears |self_|.
//
// The "WorkerThread active" trace slice is open whenever the worker is not
// blocked in WaitForWork().
void WorkerThread::RunWorker() {
DCHECK_EQ(self_, this);
TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
TRACE_EVENT_BEGIN0("base", "WorkerThread active");
if (worker_thread_observer_)
worker_thread_observer_->OnWorkerThreadMainEntry();
delegate_->OnMainEntry(this);
// Hang watching is used only when it is enabled for the thread pool and the
// desired thread type is not kBackground. This is evaluated once, up front.
const bool watch_for_hangs =
base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
GetDesiredThreadType() != ThreadType::kBackground;
// Holds the closure returned by RegisterThread() for the rest of this
// function. NOTE(review): presumably the closure unregisters the thread when
// the runner is destroyed - confirm in hang_watcher.h.
base::ScopedClosureRunner unregister_for_hang_watching;
if (watch_for_hangs) {
unregister_for_hang_watching = base::HangWatcher::RegisterThread(
base::HangWatcher::ThreadType::kThreadPoolThread);
}
// The worker starts out waiting for work: close the "active" slice for the
// duration of the wait and reopen it once woken up.
{
TRACE_EVENT_END0("base", "WorkerThread active");
// NOTE(review): purpose not evident from this file - confirm against the
// Perfetto macro's documentation.
PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
delegate_->WaitForWork(&wake_up_event_);
// Terminates the trace flow started by WakeUp().
TRACE_EVENT_BEGIN("base", "WorkerThread active",
perfetto::TerminatingFlow::FromPointer(this));
}
// True once at least one task source has been obtained since the most recent
// wake-up.
bool got_work_this_wakeup = false;
while (!ShouldExit()) {
#if BUILDFLAG(IS_APPLE)
// One autorelease pool per loop iteration.
mac::ScopedNSAutoreleasePool autorelease_pool;
#endif
// A fresh hang-watch scope per iteration, only when hang watching is on.
absl::optional<WatchHangsInScope> hang_watch_scope;
if (watch_for_hangs)
hang_watch_scope.emplace();
// Re-evaluated every iteration: GetDesiredThreadType() switches to kDefault
// once shutdown has started.
UpdateThreadType(GetDesiredThreadType());
RegisteredTaskSource task_source = delegate_->GetWork(this);
if (!task_source) {
// No work. Re-check the exit conditions before going back to sleep.
if (ShouldExit())
break;
// Woken up but never obtained a task source: report it to the delegate.
if (!got_work_this_wakeup)
delegate_->RecordUnnecessaryWakeup();
TRACE_EVENT_END0("base", "WorkerThread active");
PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
// Drop the hang-watch scope before blocking in WaitForWork().
hang_watch_scope.reset();
delegate_->WaitForWork(&wake_up_event_);
got_work_this_wakeup = false;
TRACE_EVENT_BEGIN("base", "WorkerThread active",
perfetto::TerminatingFlow::FromPointer(this));
continue;
}
got_work_this_wakeup = true;
// NOTE(review): the two Alias() calls below presumably keep these raw
// pointers observable in crash dumps - confirm in base/debug/alias.h.
TaskSource* task_source_before_run = task_source.get();
base::debug::Alias(&task_source_before_run);
task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
TaskSource* task_source_before_move = task_source.get();
base::debug::Alias(&task_source_before_move);
delegate_->DidProcessTask(std::move(task_source));
// |task_source| must be empty after having been moved from.
CHECK(!task_source);
// Clear any signal received while the task was running; the loop calls
// GetWork() again before it would sleep, so no wake-up is lost.
wake_up_event_.Reset();
}
delegate_->OnMainExit(this);
if (worker_thread_observer_)
worker_thread_observer_->OnWorkerThreadMainExit();
// Clears the self-reference set in Start(). NOTE(review): if |self_| is an
// owning reference this may destroy |this|; no member is accessed below.
self_ = nullptr;
TRACE_EVENT_END0("base", "WorkerThread active");
TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
PERFETTO_INTERNAL_ADD_EMPTY_EVENT();
}
}