#include "base/profiler/thread_group_profiler.h"
#include <memory>
#include "base/check.h"
#include "base/functional/bind.h"
#include "base/memory/ptr_util.h"
#include "base/numerics/safe_conversions.h"
#include "base/profiler/periodic_sampling_scheduler.h"
#include "base/profiler/sample_metadata.h"
#include "base/profiler/sampling_profiler_thread_token.h"
#include "base/profiler/stack_sampling_profiler.h"
#include "base/profiler/thread_group_profiler_client.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/bind_post_task.h"
#include "base/task/sequenced_task_runner.h"
#include "base/task/thread_pool/worker_thread.h"
#include "base/time/time.h"
namespace base {
namespace {
// Process-wide client. Written only by ThreadGroupProfiler::SetClient(), which
// deletes the previous value before storing the new one, and read through
// ThreadGroupProfiler::GetClient(). Null until a client is installed.
ThreadGroupProfilerClient* g_thread_group_profiler_client = nullptr;
// Passed as the second constructor argument of the default
// PeriodicSamplingScheduler. Going by the name this is the fraction of
// execution time spent sampling (0.02 == 2%) -- confirm against
// PeriodicSamplingScheduler.
constexpr double kFractionOfExecutionTimeToSample = 0.02;
// Metadata name under which the thread group type is recorded for every
// thread that gets a profiler.
constexpr char kProfilerMetadataThreadGroupType[] = "ThreadGroupType";
// A worker thread that becomes active while a collection is running is only
// given a profiler if at least this much time remains before the collection's
// end time.
const TimeDelta kMinRemainingTimeForNewThreadSampling = Seconds(1);
}
// Installs `client` as the process-wide ThreadGroupProfilerClient and destroys
// whichever client was installed before. A null `client` clears the global.
void ThreadGroupProfiler::SetClient(
    std::unique_ptr<ThreadGroupProfilerClient> client) {
  {
    // Adopt the outgoing client so it is destroyed (if any) before the new
    // one is published; the global still points at it during destruction.
    std::unique_ptr<ThreadGroupProfilerClient> outgoing(
        g_thread_group_profiler_client);
  }
  // The global holds the client through a raw owning pointer.
  g_thread_group_profiler_client = client.release();
}
bool ThreadGroupProfiler::IsProfilingEnabled() {
return GetClient() && GetClient()->IsProfilerEnabledForCurrentProcess();
}
// Builds a profiler for one thread group.
//
// `task_runner` is the sequence on which all the *Task() methods run.
// `thread_group_type` is attached as metadata to every profiled thread.
// `periodic_sampling_scheduler` may be null, in which case a default scheduler
// is created from the client's sampling parameters (this requires a client to
// have been installed via SetClient()).
// `profiler_factory` creates the per-thread profilers.
//
// The first collection is scheduled by posting StartTask() to `task_runner`.
ThreadGroupProfiler::ThreadGroupProfiler(
    scoped_refptr<base::SequencedTaskRunner> task_runner,
    int64_t thread_group_type,
    std::unique_ptr<PeriodicSamplingScheduler> periodic_sampling_scheduler,
    ProfilerFactory profiler_factory)
    : thread_group_type_(thread_group_type),
      periodic_sampling_scheduler_(std::move(periodic_sampling_scheduler)),
      task_runner_(std::move(task_runner)),
      stack_sampling_profiler_factory_(std::move(profiler_factory)) {
  // The object is constructed off `task_runner_`; the checker binds to that
  // sequence the first time one of the *Task() methods runs.
  DETACH_FROM_SEQUENCE(task_runner_sequence_checker_);
  if (!periodic_sampling_scheduler_) {
    // GetSamplingDuration() is sampling_interval * samples_per_profile; reuse
    // it rather than recomputing the product from two separate client queries.
    periodic_sampling_scheduler_ = std::make_unique<PeriodicSamplingScheduler>(
        GetSamplingDuration(), kFractionOfExecutionTimeToSample,
        TimeTicks::Now());
  }
  task_runner_->PostTask(
      FROM_HERE, BindOnce(&ThreadGroupProfiler::StartTask, Unretained(this)));
}
// Destruction requires that no collection is active. Shutdown() is the method
// in this file that unconditionally resets `active_collection_`.
ThreadGroupProfiler::~ThreadGroupProfiler() {
CHECK(!active_collection_);
}
// Tears down any in-flight collection and releases worker threads blocked in
// OnWorkerThreadExiting(). Must run on the construction sequence.
void ThreadGroupProfiler::Shutdown() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(construction_sequence_checker_);

  // Posting must fail here: the CHECK enforces that `task_runner_` no longer
  // accepts tasks (presumably because it has already been shut down), which is
  // what makes touching `active_collection_` from this sequence acceptable.
  const bool task_was_accepted =
      task_runner_->PostTask(FROM_HERE, DoNothing());
  CHECK(!task_was_accepted);

  TS_UNCHECKED_READ(active_collection_).reset();

  // Wake every worker thread waiting in OnWorkerThreadExiting().
  thread_group_profiler_shutdown_.Signal();
}
// Registers `worker_thread` with the profiler. The sampling token is captured
// on the calling thread and handed to OnWorkerThreadStartedTask() on
// `task_runner_`.
void ThreadGroupProfiler::OnWorkerThreadStarted(
    internal::WorkerThread* worker_thread) {
  // Must be taken here, on the caller's thread, not inside the posted task.
  const SamplingProfilerThreadToken current_thread_token =
      GetSamplingProfilerCurrentThreadToken();
  auto register_thread =
      BindOnce(&ThreadGroupProfiler::OnWorkerThreadStartedTask,
               Unretained(this), Unretained(worker_thread),
               current_thread_token);
  task_runner_->PostTask(FROM_HERE, std::move(register_thread));
}
// Forwards "worker thread became active" to OnWorkerThreadActiveTask() on
// `task_runner_`.
void ThreadGroupProfiler::OnWorkerThreadActive(
    internal::WorkerThread* worker_thread) {
  auto mark_active =
      BindOnce(&ThreadGroupProfiler::OnWorkerThreadActiveTask,
               Unretained(this), Unretained(worker_thread));
  task_runner_->PostTask(FROM_HERE, std::move(mark_active));
}
// Forwards "worker thread became idle" to OnWorkerThreadIdleTask() on
// `task_runner_`.
void ThreadGroupProfiler::OnWorkerThreadIdle(
    internal::WorkerThread* worker_thread) {
  auto mark_idle = BindOnce(&ThreadGroupProfiler::OnWorkerThreadIdleTask,
                            Unretained(this), Unretained(worker_thread));
  task_runner_->PostTask(FROM_HERE, std::move(mark_idle));
}
// Blocks the caller until the profiler has processed the exit of
// `worker_thread` on `task_runner_`, or until Shutdown() has signalled,
// whichever happens first.
void ThreadGroupProfiler::OnWorkerThreadExiting(
    internal::WorkerThread* worker_thread) {
  // Signalled by OnWorkerThreadExitingTask() once it has handled this thread.
  WaitableEvent exit_processed;

  auto process_exit =
      BindOnce(&ThreadGroupProfiler::OnWorkerThreadExitingTask,
               Unretained(this), Unretained(worker_thread),
               Unretained(&exit_processed));
  task_runner_->PostTask(FROM_HERE, std::move(process_exit));

  // Also wait on the shutdown event so the caller is released by Shutdown()
  // even when the posted task never signals.
  base::WaitableEvent* events_to_await[] = {&exit_processed,
                                            &thread_group_profiler_shutdown_};
  WaitableEvent::WaitMany(events_to_await);
}
// ThreadGroupProfiler::Profiler implementation that owns a
// StackSamplingProfiler and forwards Start() to it. Produced by the factory
// returned from GetDefaultProfilerFactory().
class ThreadGroupProfiler::ProfilerImpl : public ThreadGroupProfiler::Profiler {
 public:
  // All arguments are handed straight to the StackSamplingProfiler
  // constructor.
  ProfilerImpl(SamplingProfilerThreadToken thread_token,
               const StackSamplingProfiler::SamplingParams& params,
               std::unique_ptr<ProfileBuilder> profile_builder,
               StackSamplingProfiler::UnwindersFactory unwinder_factory)
      : sampling_profiler_(thread_token,
                           params,
                           std::move(profile_builder),
                           std::move(unwinder_factory)) {}

  ~ProfilerImpl() override = default;

  // ThreadGroupProfiler::Profiler:
  void Start() override {
    sampling_profiler_.Start();
  }

 private:
  StackSamplingProfiler sampling_profiler_;
};
// Starts one collection: creates and starts a profiler for every worker thread
// in `worker_thread_context_set` that is not idle.
//
// `sampling_duration` determines `collection_end_time_` (now + duration).
// `task_runner` is used for the delayed "empty collection" task and for
// profiler completion callbacks.
// `factory` creates the per-thread profilers.
// `collection_complete_callback` is run once the collection is over.
ThreadGroupProfiler::ActiveCollection::ActiveCollection(
    const flat_map<internal::WorkerThread*, WorkerThreadContext>&
        worker_thread_context_set,
    int64_t thread_group_type,
    const TimeDelta& sampling_duration,
    SequencedTaskRunner* task_runner,
    ProfilerFactory factory,
    OnceClosure collection_complete_callback)
    : thread_group_type_(thread_group_type),
      task_runner_(task_runner),
      // `factory` is a by-value sink parameter: move it rather than copying.
      stack_sampling_profiler_factory_(std::move(factory)),
      collection_complete_callback_(std::move(collection_complete_callback)),
      sampling_duration_(sampling_duration),
      collection_end_time_(TimeTicks::Now() + sampling_duration),
      empty_collection_closure_{
          BindOnce(&ActiveCollection::OnEmptyCollectionCompleted,
                   Unretained(this))} {
  // Collect into the underlying container first and build the flat_map in one
  // step afterwards.
  decltype(profilers_)::container_type new_profilers;
  for (const auto& [worker_thread, context] : worker_thread_context_set) {
    if (context.is_idle) {
      continue;
    }
    std::unique_ptr<Profiler> profiler = CreateSamplingProfilerForThread(
        worker_thread, context.token, GetClient()->GetSamplingParams());
    profiler->Start();
    AddProfileMetadataForThread(kProfilerMetadataThreadGroupType,
                                thread_group_type_, context.token.id);
    new_profilers.emplace_back(worker_thread, std::move(profiler));
  }
  profilers_ = flat_map(std::move(new_profilers));

  if (profilers_.empty()) {
    // No profiler will ever report completion, so end the collection with a
    // timer after the full sampling duration.
    task_runner_->PostDelayedTask(
        FROM_HERE, empty_collection_closure_.callback(), sampling_duration_);
  } else {
    // Completion is driven by the profilers; the timer closure is not needed.
    empty_collection_closure_.Cancel();
  }
}
// Starts profiling `worker_thread` for the rest of the running collection.
// Does nothing when less than kMinRemainingTimeForNewThreadSampling is left
// before `collection_end_time_`, or when the thread already has a profiler.
void ThreadGroupProfiler::ActiveCollection::MaybeAddWorkerThread(
    internal::WorkerThread* worker_thread,
    const SamplingProfilerThreadToken& token) {
  // Read the clock once so the guard below and the sample count are derived
  // from the same remaining time.
  const TimeDelta remaining_time = collection_end_time_ - TimeTicks::Now();
  if (remaining_time < kMinRemainingTimeForNewThreadSampling) {
    return;
  }
  if (profilers_.contains(worker_thread)) {
    return;
  }

  // Shrink the sample count so this profiler stops around the collection end
  // time instead of running for a full profile.
  StackSamplingProfiler::SamplingParams sampling_params =
      GetClient()->GetSamplingParams();
  sampling_params.samples_per_profile =
      ClampFloor(remaining_time / sampling_params.sampling_interval);

  std::unique_ptr<Profiler> profiler =
      CreateSamplingProfilerForThread(worker_thread, token, sampling_params);
  profiler->Start();
  AddProfileMetadataForThread(kProfilerMetadataThreadGroupType,
                              thread_group_type_, token.id);
  profilers_.emplace(worker_thread, std::move(profiler));

  // A profiler now exists, so its completion ends the collection; drop any
  // pending "empty collection" timer.
  empty_collection_closure_.Cancel();
}
// Drops the profiler of `worker_thread`, if it has one. When that leaves the
// collection without any profiler, schedules OnEmptyCollectionCompleted() for
// the collection's end time so the collection still finishes.
void ThreadGroupProfiler::ActiveCollection::RemoveWorkerThread(
    internal::WorkerThread* worker_thread) {
  const bool was_present = profilers_.erase(worker_thread) == 1;
  if (!was_present || !profilers_.empty()) {
    return;
  }

  // The closure may have been cancelled earlier; re-arm it before posting.
  empty_collection_closure_.Reset(BindOnce(
      &ActiveCollection::OnEmptyCollectionCompleted, Unretained(this)));

  // The end time may already have passed (e.g. the last profiler was removed
  // before its completion callback was delivered). Never hand a negative
  // delay to the task runner; finish as soon as possible instead.
  TimeDelta remaining_time = collection_end_time_ - TimeTicks::Now();
  if (remaining_time.is_negative()) {
    remaining_time = TimeDelta();
  }
  task_runner_->PostDelayedTask(FROM_HERE, empty_collection_closure_.callback(),
                                remaining_time);
}
// Creates (but does not start) a profiler for `worker_thread` using the
// injected factory. The profile builder obtained from the client is given a
// callback that runs OnProfilerCollectionCompleted(worker_thread) on
// `task_runner_`.
std::unique_ptr<ThreadGroupProfiler::Profiler>
ThreadGroupProfiler::ActiveCollection::CreateSamplingProfilerForThread(
    internal::WorkerThread* worker_thread,
    const SamplingProfilerThreadToken& token,
    const StackSamplingProfiler::SamplingParams& sampling_params) {
  ThreadGroupProfilerClient* const client = ThreadGroupProfiler::GetClient();

  // BindPostTask wraps the callback so that running it posts to
  // `task_runner_`.
  auto on_profile_completed = BindPostTask(
      task_runner_,
      BindOnce(&ActiveCollection::OnProfilerCollectionCompleted,
               Unretained(this), Unretained(worker_thread)));
  auto profile_builder =
      client->CreateProfileBuilder(std::move(on_profile_completed));

  return stack_sampling_profiler_factory_.Run(token, sampling_params,
                                              std::move(profile_builder),
                                              client->GetUnwindersFactory());
}
// Called when the profiler of `worker_thread` has finished. Removes that
// profiler and, once none are left, reports the collection as complete.
void ThreadGroupProfiler::ActiveCollection::OnProfilerCollectionCompleted(
    internal::WorkerThread* worker_thread) {
  DCHECK(!profilers_.empty());
  profilers_.erase(worker_thread);
  if (!profilers_.empty()) {
    // Other profilers are still running.
    return;
  }
  std::move(collection_complete_callback_).Run();
}
void ThreadGroupProfiler::ActiveCollection::OnEmptyCollectionCompleted() {
DCHECK(profilers_.empty());
std::move(collection_complete_callback_).Run();
}
// Defaulted: members (including the per-thread profilers held in `profilers_`)
// are destroyed by their own destructors.
ThreadGroupProfiler::ActiveCollection::~ActiveCollection() = default;
// Returns the client installed by SetClient(), or null if none has been
// installed. The returned pointer is not owned by the caller.
ThreadGroupProfilerClient* ThreadGroupProfiler::GetClient() {
return g_thread_group_profiler_client;
}
// Returns a factory that builds ProfilerImpl instances, i.e. profilers backed
// by a real StackSamplingProfiler.
ThreadGroupProfiler::ProfilerFactory
ThreadGroupProfiler::GetDefaultProfilerFactory() {
  // Captureless, so it can be bound directly.
  auto create_profiler =
      [](SamplingProfilerThreadToken token,
         const StackSamplingProfiler::SamplingParams& sampling_params,
         std::unique_ptr<ProfileBuilder> builder,
         StackSamplingProfiler::UnwindersFactory unwinders)
      -> std::unique_ptr<Profiler> {
    return std::make_unique<ProfilerImpl>(token, sampling_params,
                                          std::move(builder),
                                          std::move(unwinders));
  };
  return BindRepeating(create_profiler);
}
// Length of one collection: the client's sampling interval multiplied by the
// number of samples per profile. Requires a client to be installed.
TimeDelta ThreadGroupProfiler::GetSamplingDuration() {
  const StackSamplingProfiler::SamplingParams sampling_params =
      GetClient()->GetSamplingParams();
  const TimeDelta interval = sampling_params.sampling_interval;
  return interval * sampling_params.samples_per_profile;
}
// Runs on `task_runner_` right after construction and schedules the first
// CollectProfilesTask() after the delay chosen by the periodic sampling
// scheduler.
//
// The definition is qualified once, matching every other member definition in
// this file; it used to repeat the class name.
void ThreadGroupProfiler::StartTask() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(task_runner_sequence_checker_);
  task_runner_->PostDelayedTask(
      FROM_HERE,
      base::BindOnce(&ThreadGroupProfiler::CollectProfilesTask,
                     Unretained(this)),
      periodic_sampling_scheduler_->GetTimeToNextCollection());
}
// Records a newly started worker thread together with its sampling token. Each
// worker thread is expected to be registered at most once.
void ThreadGroupProfiler::OnWorkerThreadStartedTask(
    internal::WorkerThread* worker_thread,
    SamplingProfilerThreadToken token) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(task_runner_sequence_checker_);
  // The `true` initializes the member after the token -- presumably `is_idle`,
  // i.e. the thread starts out idle; confirm against WorkerThreadContext.
  WorkerThreadContext initial_context{token, true};
  const bool inserted =
      worker_thread_context_set_
          .emplace(worker_thread, std::move(initial_context))
          .second;
  DCHECK(inserted);
}
// Marks `worker_thread` as no longer idle and, if a collection is running,
// offers the thread to it.
void ThreadGroupProfiler::OnWorkerThreadActiveTask(
    internal::WorkerThread* worker_thread) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(task_runner_sequence_checker_);
  const auto entry = worker_thread_context_set_.find(worker_thread);
  DCHECK(entry != worker_thread_context_set_.end());

  WorkerThreadContext& context = entry->second;
  context.is_idle = false;

  if (!active_collection_) {
    return;
  }
  active_collection_->MaybeAddWorkerThread(worker_thread, context.token);
}
// Marks `worker_thread` as idle. A profiler already attached to the thread by
// a running collection is left untouched.
void ThreadGroupProfiler::OnWorkerThreadIdleTask(
    internal::WorkerThread* worker_thread) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(task_runner_sequence_checker_);
  const auto entry = worker_thread_context_set_.find(worker_thread);
  DCHECK(entry != worker_thread_context_set_.end());
  WorkerThreadContext& context = entry->second;
  context.is_idle = true;
}
// Forgets `worker_thread`: removes it from the running collection (if any) and
// from the set of known threads, then signals `profiling_has_stopped` to
// release the thread waiting in OnWorkerThreadExiting().
void ThreadGroupProfiler::OnWorkerThreadExitingTask(
    internal::WorkerThread* worker_thread,
    WaitableEvent* profiling_has_stopped) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(task_runner_sequence_checker_);

  if (active_collection_.has_value()) {
    active_collection_->RemoveWorkerThread(worker_thread);
  }
  worker_thread_context_set_.erase(worker_thread);

  // Signal last, after all bookkeeping for the thread is gone.
  profiling_has_stopped->Signal();
}
void ThreadGroupProfiler::CollectProfilesTask() {
DCHECK_CALLED_ON_VALID_SEQUENCE(task_runner_sequence_checker_);
DCHECK(!active_collection_);
active_collection_.emplace(
worker_thread_context_set_, thread_group_type_, GetSamplingDuration(),
task_runner_.get(), stack_sampling_profiler_factory_,
BindOnce(&ThreadGroupProfiler::EndActiveCollectionTask,
Unretained(this)));
}
// Destroys the finished collection and schedules the next
// CollectProfilesTask() after the delay chosen by the periodic sampling
// scheduler.
void ThreadGroupProfiler::EndActiveCollectionTask() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(task_runner_sequence_checker_);
  DCHECK(active_collection_);
  active_collection_.reset();

  auto start_next_collection = base::BindOnce(
      &ThreadGroupProfiler::CollectProfilesTask, Unretained(this));
  task_runner_->PostDelayedTask(
      FROM_HERE, std::move(start_next_collection),
      periodic_sampling_scheduler_->GetTimeToNextCollection());
}
}