#include "Heap/GcThreadPool.h"
#if defined(__linux__) || defined(hongmeng) || defined(__APPLE__)
#include <sys/resource.h>
#endif
#include <sched.h>
#include <vector>
#include "Base/Log.h"
#include "Base/Panic.h"
#include "Base/SysCall.h"
#include "Mutator/MutatorManager.h"
#include "securec.h"
namespace MapleRuntime {
// Creates a pool worker and immediately starts its OS thread in WorkerFunc.
//
// threadPool: pool this worker belongs to; WorkerFunc reads the pool state through this pointer.
// threadName: name stored in `name` and applied to the OS thread.
// threadId:   value handed to HeapWork::Execute for every task this worker runs.
// stackSize:  stack size requested for the new thread through the pthread attributes.
GCPoolThread::GCPoolThread(GCThreadPool* threadPool, const char* threadName, size_t threadId, size_t stackSize)
    : id(threadId), tid(-1), name(threadName), pool(threadPool)
{
    pthread_attr_t attr;
    CHECK_PTHREAD_CALL(pthread_attr_init, (&attr), "");
    CHECK_PTHREAD_CALL(pthread_attr_setstacksize, (&attr, stackSize), stackSize);
    // The attribute object must be handed to pthread_create; passing a null attribute pointer would
    // silently discard the stack size configured above and start the thread with default attributes.
    CHECK_PTHREAD_CALL(pthread_create, (&pthread, &attr, &WorkerFunc, this), "GCPoolThread init");
#ifdef __WIN64
    CHECK_PTHREAD_CALL(pthread_setname_np, (pthread, threadName), "GCPoolThread SetName");
#endif
    // The attribute object is only needed while creating the thread.
    CHECK_PTHREAD_CALL(pthread_attr_destroy, (&attr), "GCPoolThread init");
}
// Joins the worker's OS thread, then drops the back-pointer to the pool.
GCPoolThread::~GCPoolThread()
{
    // Join comes first so the pointer is cleared only after the worker thread has finished.
    CHECK_PTHREAD_CALL(pthread_join, (pthread, nullptr), "thread deinit");
    this->pool = nullptr;
}
#if defined(__linux__) || defined(hongmeng)
// Applies `priority` to this worker, identified by the thread id stored in `tid`.
void GCPoolThread::SetPriority(int32_t priority) const
{
    GCPoolThread::SetThreadPriority(tid, priority);
}
// Calls ::setpriority for the thread identified by `tid` and reports a failure through CHECK_E.
//
// tid:      thread id forwarded to ::setpriority as the `who` argument.
// priority: priority value forwarded unchanged to ::setpriority.
void GCPoolThread::SetThreadPriority(pid_t tid, int32_t priority)
{
    const int which = static_cast<int>(PRIO_PROCESS);
    const uint32_t who = static_cast<uint32_t>(tid);
    // errno is cleared up front because the failure check below also tests it.
    errno = 0;
    int ret = ::setpriority(which, who, priority);
    CHECK_E(UNLIKELY(ret != 0 && errno != 0), "::setpriority(tid %d, priority %d) failed: %s", tid, priority,
            ::strerror(errno));
}
#endif
// Thread entry point for a pool worker; the GCPoolThread constructor passes it to pthread_create.
// `param` is the GCPoolThread that owns this OS thread. The worker tags itself as a GC thread, applies
// its name, and on Linux/hongmeng records its tid and applies the pool priority. It then loops until the
// pool is marked exited, taking HeapWork items from pool->workQueue and executing them.
// Always returns nullptr.
void* GCPoolThread::WorkerFunc(void* param)
{
ThreadLocal::SetThreadType(ThreadType::GC_THREAD);
GCPoolThread* thread = reinterpret_cast<GCPoolThread*>(param);
GCThreadPool* pool = thread->pool;
// Name the thread from inside the thread itself on Apple and Linux/hongmeng.
#ifdef __APPLE__
CHECK_PTHREAD_CALL(pthread_setname_np, (thread->name.Str()), "GCPoolThread SetName");
#elif defined(__linux__) || defined(hongmeng)
CHECK_PTHREAD_CALL(prctl, (PR_SET_NAME, thread->name.Str()), "GCPoolThread SetName");
#endif
#if defined(__linux__) || defined(hongmeng)
// Record the tid so GCPoolThread::SetPriority can address this thread later.
thread->tid = MapleRuntime::GetTid();
SetThreadPriority(thread->tid, pool->priority);
#endif
while (!pool->IsExited()) {
HeapWork* task = nullptr;
{
// All pool counters and the work queue are accessed under poolMutex.
std::unique_lock<std::mutex> poolLock(pool->poolMutex);
// Park on threadSleepingCondVar while the pool is not running or more threads are active than
// maxActiveThreadNum allows. The worker leaves the active count before sleeping and re-enters it
// after waking; the thread that brings the count to zero signals allThreadStopped, which
// GCThreadPool::Stop waits on.
while (((pool->currActiveThreadNum > pool->maxActiveThreadNum) || !pool->IsRunning()) &&
!pool->IsExited()) {
--(pool->currActiveThreadNum);
if (pool->currActiveThreadNum == 0) {
pool->allThreadStopped.notify_all();
}
pool->threadSleepingCondVar.wait(poolLock);
++(pool->currActiveThreadNum);
}
// Wait for work while the pool is running and the queue is empty. When the number of waiting
// workers reaches maxActiveThreadNum, signal allWorkDoneCondVar, which GCThreadPool::WaitFinish
// waits on.
while (pool->workQueue.empty() && pool->IsRunning() && !pool->IsExited()) {
++(pool->waitingThreadNum);
if (pool->waitingThreadNum == pool->maxActiveThreadNum) {
pool->allWorkDoneCondVar.notify_all();
}
pool->taskEmptyCondVar.wait(poolLock);
--(pool->waitingThreadNum);
}
// Dequeue only if the pool is still running and not exited; otherwise the task stays queued.
if (!pool->workQueue.empty() && pool->IsRunning() && !pool->IsExited()) {
task = pool->workQueue.front();
pool->workQueue.pop();
}
}
// The lock scope has ended: the task runs without poolMutex held and is deleted afterwards.
if (task != nullptr) {
task->Execute(thread->id);
delete task;
}
}
{
// Pool exited: leave the active count for good and wake any waiter on allThreadStopped.
std::unique_lock<std::mutex> poolLock(pool->poolMutex);
--(pool->currActiveThreadNum);
if (pool->currActiveThreadNum == 0) {
pool->allThreadStopped.notify_all();
}
}
return nullptr;
}
// Size of the stack buffer used to format worker thread names.
const int MAX_NAME_LEN = 256;

// Builds a pool of `threadNum` workers named "<poolName>-pool-t<N>" (N starts at 1), then calls Stop()
// so the constructor returns only after the active-thread count has dropped to zero.
//
// poolName:  prefix for the worker names; also stored in `name`.
// threadNum: number of workers to create; also the initial maximum number of active workers.
// prior:     priority stored in `priority`, which each worker applies to itself on Linux/hongmeng.
GCThreadPool::GCThreadPool(const char* poolName, int32_t threadNum, int32_t prior)
    : priority(prior), name(poolName), running(false), exit(false), maxThreadNum(threadNum),
      maxActiveThreadNum(threadNum), currActiveThreadNum(maxThreadNum), waitingThreadNum(0)
{
    char threadName[MAX_NAME_LEN];
    for (int32_t i = 0; i < maxThreadNum; ++i) {
        // Worker ids are 1-based; the same value is used in the name and as the thread id.
        const int32_t workerSeq = i + 1;
        errno_t ret = snprintf_s(threadName, MAX_NAME_LEN, (MAX_NAME_LEN - 1), "%s-pool-t%d", poolName, workerSeq);
        CHECK_E(ret < 0, "snprintf_s name = %s threadId%d in GCThreadPool::GCThreadPool return %d rather than 0.",
                name.Str(), workerSeq, ret);
        GCPoolThread* threadItem = new (std::nothrow) GCPoolThread(this, threadName, workerSeq, DEFAULT_STACK_SIZE);
        CHECK_DETAIL(threadItem != nullptr, "new GCPoolThread failed");
        threads.push_back(threadItem);
    }
    Stop();
    LOG(RTLOG_DEBUG, "GCThreadPool init");
}
// Marks the pool as exited and wakes every thread blocked on any of the pool's condition variables
// so they can observe the flag.
void GCThreadPool::Exit()
{
    std::lock_guard<std::mutex> guard(poolMutex);
    exit.store(true, std::memory_order_relaxed);

    // Wake workers waiting for tasks or parked while the pool is stopped.
    taskEmptyCondVar.notify_all();
    threadSleepingCondVar.notify_all();

    // Wake callers blocked in WaitFinish() or Stop().
    allWorkDoneCondVar.notify_all();
    allThreadStopped.notify_all();

    LOG(RTLOG_DEBUG, "GCThreadPool Exit");
}
// Shuts the pool down: signals exit, joins and frees every worker, then discards queued work.
GCThreadPool::~GCThreadPool()
{
    // Workers only leave their loops once the exit flag is set. Without this call, destroying a pool
    // whose owner never called Exit() would block forever in the joins below. Exit() only sets the
    // flag and notifies waiters, so a repeated call is harmless.
    Exit();
    // Deleting a GCPoolThread joins its OS thread, so this waits for every worker to finish.
    for (auto thread : threads) {
        delete thread;
    }
    threads.clear();
    // Free any tasks that were still queued when the pool exited.
    ClearAllWork();
}
#if defined(__linux__) || defined(hongmeng)
void GCThreadPool::SetPriority(int32_t prior) const
{
for (auto thread : threads) {
thread->SetPriority(prior);
}
}
#endif
// Updates the limit on concurrently active workers.
//
// num: requested limit. Values >= maxThreadNum are clamped to maxThreadNum. Any other value <= 0 is
//      rejected with an error log and leaves the limit unchanged.
void GCThreadPool::SetMaxActiveThreadNum(int32_t num)
{
    std::lock_guard<std::mutex> guard(poolMutex);
    const int32_t previousLimit = maxActiveThreadNum;
    const bool coversAllThreads = (num >= maxThreadNum);
    if (!coversAllThreads && (num <= 0)) {
        LOG(RTLOG_ERROR, "SetMaxActiveThreadNum invalid input val");
        return;
    }
    if (coversAllThreads) {
        maxActiveThreadNum = maxThreadNum;
    } else {
        maxActiveThreadNum = num;
    }
    // If the limit was raised while the pool is running and workers are waiting, wake parked workers.
    const bool limitRaised = (maxActiveThreadNum > previousLimit);
    if (limitRaised && (waitingThreadNum > 0) && IsRunning()) {
        threadSleepingCondVar.notify_all();
    }
}
// Appends `task` to the work queue. A waiting worker is notified only when the pool is running and at
// least one worker is waiting for a task.
void GCThreadPool::AddWork(HeapWork* task)
{
    CHECK_DETAIL(task != nullptr, "failed to add a null task");
    std::lock_guard<std::mutex> guard(poolMutex);
    workQueue.push(task);
    const bool shouldWakeWorker = IsRunning() && (waitingThreadNum > 0);
    if (shouldWakeWorker) {
        taskEmptyCondVar.notify_one();
    }
}
// Switches the pool to the running state and wakes all workers parked on threadSleepingCondVar.
void GCThreadPool::Start()
{
    std::lock_guard<std::mutex> guard(poolMutex);
    running.store(true, std::memory_order_relaxed);
    threadSleepingCondVar.notify_all();
}
void GCThreadPool::DrainWorkQueue()
{
MRT_ASSERT(!IsRunning(), "thread pool is running");
HeapWork* task = nullptr;
do {
task = nullptr;
poolMutex.lock();
if (!workQueue.empty()) {
task = workQueue.front();
workQueue.pop();
}
poolMutex.unlock();
if (task != nullptr) {
task->Execute(0);
delete task;
}
} while (task != nullptr);
}
void GCThreadPool::WaitFinish()
{
HeapWork* task = nullptr;
do {
task = nullptr;
poolMutex.lock();
if (!workQueue.empty() && IsRunning() && !IsExited()) {
task = workQueue.front();
workQueue.pop();
}
poolMutex.unlock();
if (task != nullptr) {
task->Execute(0);
delete task;
}
} while (task != nullptr);
{
std::unique_lock<std::mutex> poolLock(poolMutex);
while ((waitingThreadNum != maxActiveThreadNum) && IsRunning() && !IsExited()) {
allWorkDoneCondVar.wait(poolLock);
}
}
Stop();
DrainWorkQueue();
}
void GCThreadPool::Stop()
{
std::unique_lock<std::mutex> poolLock(poolMutex);
running.store(false, std::memory_order_relaxed);
taskEmptyCondVar.notify_all();
while (currActiveThreadNum != 0) {
allThreadStopped.wait(poolLock);
}
}
void GCThreadPool::ClearAllWork()
{
std::unique_lock<std::mutex> poolLock(poolMutex);
while (!workQueue.empty()) {
HeapWork* task = workQueue.front();
workQueue.pop();
delete task;
}
}
#if defined(MRT_DEBUG) && (MRT_DEBUG == 1)
// When logging for `type` is enabled, records the starting CPU-clock reading of the calling thread
// (slot 0) and of every worker in `threadPool` (slots 1..N). Only implemented for Linux/hongmeng;
// elsewhere it just logs that the feature is unsupported.
ScopedCpuTime::ScopedCpuTime(GCThreadPool& threadPool, LogType type) : logType(type)
{
    if (!(ENABLE_LOG(logType))) {
        return;
    }
#if defined(__linux__) || defined(hongmeng)
    // One slot per worker plus one for the calling thread.
    threadCount = threadPool.GetMaxThreadNum() + 1;
    cid.resize(threadCount);
    workerStart.resize(threadCount);

    pthread_getcpuclockid(pthread_self(), &cid[0]);
    clock_gettime(cid[0], &workerStart[0]);

    size_t slot = 0;
    for (auto worker : threadPool.GetThreads()) {
        ++slot;
        pthread_getcpuclockid(worker->GetThread(), &cid[slot]);
        clock_gettime(cid[slot], &workerStart[slot]);
    }
#else
    VLOG(logType, "ScopedCpuTime is not supported in windows yet.");
#endif
}
// When logging for `logType` is enabled (Linux/hongmeng only), reads each recorded CPU clock again and
// logs the CPU time, in nanoseconds, consumed per slot since construction. Slot 0 is the thread that
// constructed this object; the remaining slots are the pool workers.
ScopedCpuTime::~ScopedCpuTime()
{
    if (ENABLE_LOG(logType)) {
#if defined(__linux__) || defined(hongmeng)
        // std::vector replaces the former variable-length arrays, which are a compiler extension
        // rather than standard C++.
        std::vector<uint64_t> workerCpuTime(threadCount);
        // All clocks are sampled first and logged afterwards, so logging does not run between samples.
        for (size_t i = 0; i < threadCount; ++i) {
            // Zero-initialised so a failed clock_gettime never leaves an indeterminate value to read.
            struct timespec workerEnd = {};
            clock_gettime(cid[i], &workerEnd);
            // Unsigned wrap-around keeps the sum correct when the nanosecond difference is negative.
            workerCpuTime[i] = static_cast<uint64_t>(workerEnd.tv_sec - workerStart[i].tv_sec) *
                                   MapleRuntime::SECOND_TO_NANO_SECOND +
                               static_cast<uint64_t>((workerEnd.tv_nsec - workerStart[i].tv_nsec));
        }
        for (size_t i = 0; i < threadCount; ++i) {
            VLOG(logType, "worker %zu cputime: %lu,\t", i, workerCpuTime[i]);
        }
#endif
    }
}
#endif
}