* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "thread_manager.h"
#include <errno.h>
#include <pthread.h>
#include <stdatomic.h>
#include "appspawn_utils.h"
#include "list.h"
/* Bookkeeping for one thread slot in ThreadManager::threadNode. */
typedef struct {
/* Exit request: initialised to 0, set to 1 by DestroyThreadMgr and polled by the thread loops. */
atomic_uint threadExit;
/* Position of this slot inside the threadNode array. */
uint32_t index;
/* pthread handle; holds INVALID_THREAD_ID until a thread is created for the slot. */
pthread_t threadId;
} ThreadNode;
/* State of the thread manager singleton: task lists, pending executors and thread slots. */
typedef struct {
/* Locked by most helpers in this file around accesses to the lists below and executorCount. */
pthread_mutex_t mutex;
/* Broadcast when tasks are queued or shutdown is requested; the manager and worker threads wait on it. */
pthread_cond_t cond;
/* Tasks created by ThreadMgrAddTask that have not been submitted yet. */
ListNode taskList;
/* Tasks submitted by TaskExecute/TaskSyncExecute, waiting for ExecuteTask to dispatch them. */
ListNode waitingTaskQueue;
/* Tasks whose executors have been queued; polled by CheckTaskComplete. */
ListNode executingTaskQueue;
/* Pending executors, linked through TaskExecuteNode::executeNode. */
ListNode executorQueue;
/* Number of entries currently linked in executorQueue. */
uint32_t executorCount;
/* Upper bound on worker threads, as passed to CreateThreadMgr. */
uint32_t maxThreadCount;
/* Id handed to the next task created by ThreadMgrAddTask. */
uint32_t currTaskId;
/* Not referenced by the code visible in this file. */
struct timespec lastAdjust;
/* Number of worker threads created so far by CheckAndCreateNewThread. */
uint32_t validThreadCount;
/*
 * Thread slots. Slot 0 is used for the manager thread; CreateThreadMgr allocates
 * maxThreadCount additional slots behind this one-element array.
 */
ThreadNode threadNode[1];
} ThreadManager;
/* One task: a group of executors plus the state needed to detect and report completion. */
typedef struct {
/* Id assigned from ThreadManager::currTaskId; also returned to the caller as the task handle. */
uint32_t taskId;
/* Link into one of the manager's task lists (taskList, waitingTaskQueue or executingTaskQueue). */
ListNode node;
/* Executors belonging to this task, linked through TaskExecuteNode::node. */
ListNode executorList;
/* Number of executors added with ThreadMgrAddExecutor. */
uint32_t totalTask;
/* Initialised to 0; not otherwise used by the code visible in this file. */
atomic_uint taskFlags;
/* Incremented by RunExecutor for each executor it handles; compared with totalTask in CheckTaskComplete. */
atomic_uint finishTaskCount;
/* Context passed to finishProcess; set by TaskExecute. */
const ThreadContext *context;
/* Completion callback set by TaskExecute; NULL for tasks that are waited on via cond. */
TaskFinishProcessor finishProcess;
/* mutex/cond pair used by TaskSyncExecute to wait for the signal sent by CheckTaskComplete. */
pthread_mutex_t mutex;
pthread_cond_t cond;
} TaskNode;
/* One unit of work (callback + context) belonging to a task. */
typedef struct {
/* Link in the owning task's executorList. */
ListNode node;
/* Link in ThreadManager::executorQueue while the executor is pending. */
ListNode executeNode;
/* Task this executor belongs to. */
TaskNode *task;
/* Caller-supplied context handed to the executor callback. */
const ThreadContext *context;
/* Callback invoked by RunExecutor with the task id and context. */
TaskExecutor executor;
} TaskExecuteNode;
/* Singleton manager; published by CreateThreadMgr and read by both thread entry points. */
static ThreadManager *g_threadManager = NULL;
/* Entry point of the manager thread started by CreateThreadMgr on thread slot 0. */
static void *ManagerThreadProc(void *args);
/* Entry point of the worker threads started by CheckAndCreateNewThread. */
static void *ThreadExecute(void *args);
/*
 * Initialise a condition variable whose timed waits are measured against
 * CLOCK_MONOTONIC, matching the deadlines built by ConvertToTimespec.
 * The return codes of the pthread calls are not inspected.
 *
 * @param cond  condition variable to initialise
 */
static void SetCondAttr(pthread_cond_t *cond)
{
    pthread_condattr_t monotonicAttr;

    pthread_condattr_init(&monotonicAttr);
    pthread_condattr_setclock(&monotonicAttr, CLOCK_MONOTONIC);
    pthread_cond_init(cond, &monotonicAttr);
    // The attribute object is only needed while the condition variable is being set up.
    pthread_condattr_destroy(&monotonicAttr);
}
/*
 * Build an absolute CLOCK_MONOTONIC deadline that lies `time` milliseconds in
 * the future, suitable for pthread_cond_timedwait on a condition variable set
 * up by SetCondAttr.
 *
 * @param time  relative timeout in milliseconds; negative values are treated as 0
 * @param tm    receives the absolute deadline
 */
static void ConvertToTimespec(int time, struct timespec *tm)
{
    struct timespec now = { 0 };
    clock_gettime(CLOCK_MONOTONIC, &now);

    // A negative timeout would otherwise wrap to a huge unsigned value.
    uint64_t ns = (time > 0) ? (uint64_t)time : 0;
    ns *= APPSPAWN_MSEC_TO_NSEC;
    // Widen before multiplying: where time_t is 32 bits wide the product
    // tv_sec * APPSPAWN_SEC_TO_NSEC would overflow after a few seconds of uptime.
    ns += (uint64_t)now.tv_sec * APPSPAWN_SEC_TO_NSEC + (uint64_t)now.tv_nsec;
    tm->tv_sec = (time_t)(ns / APPSPAWN_SEC_TO_NSEC);
    tm->tv_nsec = (long)(ns % APPSPAWN_SEC_TO_NSEC);
}
/*
 * Detach the executor at the head of the manager's executor queue.
 * The queue and executorCount are updated while holding mgr->mutex.
 *
 * @param mgr  thread manager
 * @return the detached executor, or NULL when the queue is empty
 */
static TaskExecuteNode *PopTaskExecutor(ThreadManager *mgr)
{
    TaskExecuteNode *popped = NULL;

    pthread_mutex_lock(&mgr->mutex);
    ListNode *head = &mgr->executorQueue;
    ListNode *first = head->next;
    if (first != head) {
        OH_ListRemove(first);
        // Re-initialise the link so the executor reads as "not queued" afterwards.
        OH_ListInit(first);
        mgr->executorCount--;
        popped = ListEntry(first, TaskExecuteNode, executeNode);
    }
    pthread_mutex_unlock(&mgr->mutex);

    return popped;
}
/*
 * Queue every executor of a task on the manager's executor queue.
 * Each executor is appended at the tail under mgr->mutex and executorCount is
 * bumped accordingly.
 *
 * @param mgr   thread manager
 * @param task  task whose executorList is walked
 * @return always 0
 */
static int AddExecutor(ThreadManager *mgr, const TaskNode *task)
{
    const ListNode *head = &task->executorList;

    for (ListNode *cur = head->next; cur != head; cur = cur->next) {
        TaskExecuteNode *entry = ListEntry(cur, TaskExecuteNode, node);
        APPSPAWN_LOGV("AddExecutor task: %{public}u executorCount: %{public}u executor: %{public}u",
            task->taskId, mgr->executorCount, entry->task->taskId);

        pthread_mutex_lock(&mgr->mutex);
        // Detach first so an executor is never linked into the queue twice.
        OH_ListRemove(&entry->executeNode);
        OH_ListInit(&entry->executeNode);
        OH_ListAddTail(&mgr->executorQueue, &entry->executeNode);
        mgr->executorCount++;
        pthread_mutex_unlock(&mgr->mutex);
    }
    return 0;
}
/*
 * Run pending executors on the calling thread.
 *
 * Executors are popped one at a time from the manager's executor queue and
 * their callback is invoked with the owning task's id and the executor's
 * context. The loop stops when the queue is empty, when the thread has been
 * asked to exit, or after maxCount executors have been run. An executor that
 * was popped after the exit flag was raised is not run.
 *
 * @param mgr         thread manager
 * @param threadNode  slot of the calling thread (exit flag and index)
 * @param maxCount    maximum number of executors to run in this call
 */
static void RunExecutor(ThreadManager *mgr, ThreadNode *threadNode, uint32_t maxCount)
{
    APPSPAWN_LOGV("RunExecutor in thread: %{public}d executorCount: %{public}u ",
        threadNode->index, mgr->executorCount);
    TaskExecuteNode *executor = PopTaskExecutor(mgr);
    uint32_t count = 0;
    while (executor != NULL && !threadNode->threadExit) {
        TaskNode *task = executor->task;
        APPSPAWN_LOGV("RunExecutor task: %{public}u", task->taskId);
        executor->executor(task->taskId, executor->context);
        // Count the executor as finished only after its callback has returned.
        // CheckTaskComplete treats finishTaskCount >= totalTask as "task done"
        // and may release the task, so counting earlier would let the task be
        // torn down while this callback is still running.
        atomic_fetch_add(&task->finishTaskCount, 1);
        count++;
        if (count >= maxCount) {
            break;
        }
        executor = PopTaskExecutor(mgr);
    }
    APPSPAWN_LOGV("RunExecutor executorCount: %{public}u end", mgr->executorCount);
}
/*
 * List comparison callback used with OH_ListFind.
 *
 * @param node  list link embedded in a TaskNode
 * @param data  pointer to the uint32_t task id being searched for
 * @return 0 when the task's id equals the searched id, non-zero otherwise
 */
static int TaskCompareTaskId(ListNode *node, void *data)
{
    const TaskNode *candidate = ListEntry(node, TaskNode, node);
    uint32_t wantedId = *(uint32_t *)data;

    return candidate->taskId - wantedId;
}
/*
 * Look up a task by id in one of the manager's task lists.
 * The search itself runs under mgr->mutex; the task stays linked in the list.
 *
 * @param mgr     thread manager
 * @param queue   list to search
 * @param taskId  id of the task
 * @return the matching task, or NULL when the list holds no such id
 */
static TaskNode *GetTask(ThreadManager *mgr, ListNode *queue, uint32_t taskId)
{
    pthread_mutex_lock(&mgr->mutex);
    ListNode *found = OH_ListFind(queue, &taskId, TaskCompareTaskId);
    pthread_mutex_unlock(&mgr->mutex);

    return (found != NULL) ? ListEntry(found, TaskNode, node) : NULL;
}
/*
 * Release a task that is no longer linked into any task list.
 * A task whose node is still linked is left untouched.
 *
 * NOTE(review): the executor list is cleared with a NULL destroy callback, so
 * the TaskExecuteNode objects themselves do not appear to be freed here -
 * confirm against OH_ListRemoveAll and the callers.
 *
 * @param task  task to release
 */
static void DeleteTask(TaskNode *task)
{
    APPSPAWN_LOGV("DeleteTask task: %{public}u ", task->taskId);

    if (ListEmpty(task->node)) {
        OH_ListRemoveAll(&task->executorList, NULL);
        pthread_cond_destroy(&task->cond);
        pthread_mutex_destroy(&task->mutex);
        free(task);
    }
}
/*
 * Detach the first task of a task list while holding mgr->mutex.
 *
 * @param mgr    thread manager
 * @param queue  list to pop from
 * @return the detached task, or NULL when the list is empty
 */
static TaskNode *PopTask(ThreadManager *mgr, ListNode *queue)
{
    TaskNode *popped = NULL;

    pthread_mutex_lock(&mgr->mutex);
    ListNode *first = queue->next;
    if (first != queue) {
        OH_ListRemove(first);
        // Leave the link self-referencing so the task reads as unlinked.
        OH_ListInit(first);
        popped = ListEntry(first, TaskNode, node);
    }
    pthread_mutex_unlock(&mgr->mutex);

    return popped;
}
/*
 * Append a task to the tail of a task list and wake every thread waiting on
 * the manager's condition variable. Both steps happen under mgr->mutex.
 *
 * @param mgr    thread manager
 * @param task   task to append; expected to be unlinked
 * @param queue  destination list
 */
static void PushTask(ThreadManager *mgr, TaskNode *task, ListNode *queue)
{
    pthread_mutex_lock(&mgr->mutex);

    OH_ListAddTail(queue, &task->node);
    pthread_cond_broadcast(&mgr->cond);

    pthread_mutex_unlock(&mgr->mutex);
}
/*
 * Unlink a task from whichever list holds it and free all of its executors.
 * Executors that are still pending are also removed from the manager's
 * executor queue, with executorCount adjusted. The task itself is not freed.
 *
 * @param mgr   thread manager
 * @param task  task being cancelled
 */
static void SafeRemoveTask(ThreadManager *mgr, TaskNode *task)
{
    pthread_mutex_lock(&mgr->mutex);
    OH_ListRemove(&task->node);
    OH_ListInit(&task->node);
    pthread_mutex_unlock(&mgr->mutex);

    ListNode *head = &task->executorList;
    // Always take the current head: each pass unlinks the entry it processes.
    for (ListNode *link = head->next; link != head; link = head->next) {
        OH_ListRemove(link);
        OH_ListInit(link);
        TaskExecuteNode *entry = ListEntry(link, TaskExecuteNode, node);

        pthread_mutex_lock(&mgr->mutex);
        if (!ListEmpty(entry->executeNode)) {
            // Still pending in the executor queue: pull it out before freeing.
            OH_ListRemove(&entry->executeNode);
            OH_ListInit(&entry->executeNode);
            mgr->executorCount--;
        }
        pthread_mutex_unlock(&mgr->mutex);

        free(entry);
    }
}
/*
 * Dispatch one waiting task: queue its executors and move the task to the
 * executing queue. Does nothing when no task is waiting.
 *
 * @param mgr  thread manager
 */
static void ExecuteTask(ThreadManager *mgr)
{
    TaskNode *next = PopTask(mgr, &mgr->waitingTaskQueue);
    if (next != NULL) {
        APPSPAWN_LOGV("ExecuteTask task: %{public}u ", next->taskId);
        (void)AddExecutor(mgr, next);
        PushTask(mgr, next, &mgr->executingTaskQueue);
    }
}
/*
 * Inspect the first executing task and report its completion if it is done.
 *
 * A task counts as done once finishTaskCount has reached totalTask. A done
 * task with a finish callback gets the callback invoked and is then deleted;
 * a done task without one has its condition variable signalled. A task that
 * is not done yet is appended to the executing queue again.
 *
 * @param mgr  thread manager
 */
static void CheckTaskComplete(ThreadManager *mgr)
{
    TaskNode *task = PopTask(mgr, &mgr->executingTaskQueue);
    if (task == NULL) {
        return;
    }

    uint32_t finished = atomic_load(&task->finishTaskCount);
    if (finished < task->totalTask) {
        // Not done yet: put it back for a later check.
        PushTask(mgr, task, &mgr->executingTaskQueue);
        return;
    }

    if (task->finishProcess == NULL) {
        // No callback registered: wake whoever waits on the task's condition variable.
        pthread_mutex_lock(&task->mutex);
        pthread_cond_signal(&task->cond);
        pthread_mutex_unlock(&task->mutex);
        return;
    }

    task->finishProcess(task->taskId, task->context);
    DeleteTask(task);
}
/*
 * Destroy callback for OH_ListRemoveAll on a list of TaskNode entries:
 * unlinks the node and hands the owning task to DeleteTask.
 *
 * NOTE(review): DeleteTask only frees a task whose node satisfies ListEmpty;
 * whether that holds right after OH_ListRemove depends on list.h - confirm.
 *
 * @param node  list link embedded in a TaskNode
 */
static void TaskQueueDestroyProc(ListNode *node)
{
    OH_ListRemove(node);
    DeleteTask(ListEntry(node, TaskNode, node));
}
/*
 * Create the thread manager singleton and start its manager thread.
 * When a manager already exists it is returned unchanged and maxThreadCount
 * is ignored.
 *
 * @param maxThreadCount  maximum number of worker threads
 * @param instance        receives the manager handle; must not be NULL
 * @return 0 on success, -1 on invalid argument, allocation failure or when the
 *         manager thread cannot be created
 */
int CreateThreadMgr(uint32_t maxThreadCount, ThreadMgr *instance)
{
    APPSPAWN_CHECK(instance != NULL, return -1, "Invalid thread manager instance");
    if (g_threadManager != NULL) {
        *instance = (ThreadMgr)g_threadManager;
        return 0;
    }

    // ThreadManager already embeds one slot (used by the manager thread), so
    // maxThreadCount extra slots give maxThreadCount + 1 in total.
    // calloc keeps fields that are not set below (lastAdjust) well defined.
    ThreadManager *mgr = (ThreadManager *)calloc(1, sizeof(ThreadManager) + maxThreadCount * sizeof(ThreadNode));
    APPSPAWN_CHECK(mgr != NULL, return -1, "Failed to create thread manager");

    mgr->executorCount = 0;
    mgr->currTaskId = 0;
    mgr->validThreadCount = 0;
    mgr->maxThreadCount = maxThreadCount;
    OH_ListInit(&mgr->taskList);
    OH_ListInit(&mgr->waitingTaskQueue);
    OH_ListInit(&mgr->executingTaskQueue);
    OH_ListInit(&mgr->executorQueue);
    pthread_mutex_init(&mgr->mutex, NULL);
    SetCondAttr(&mgr->cond);

    for (uint32_t index = 0; index < maxThreadCount + 1; index++) {
        mgr->threadNode[index].index = index;
        mgr->threadNode[index].threadId = INVALID_THREAD_ID;
        atomic_init(&mgr->threadNode[index].threadExit, 0);
    }

    // Publish before starting the thread: ManagerThreadProc reads g_threadManager.
    g_threadManager = mgr;
    int ret = pthread_create(&mgr->threadNode[0].threadId, NULL, ManagerThreadProc, (void *)&mgr->threadNode[0]);
    if (ret != 0) {
        APPSPAWN_LOGE("Failed to create thread for manager");
        g_threadManager = NULL;
        // Release the synchronisation objects as well, not just the memory.
        pthread_cond_destroy(&mgr->cond);
        pthread_mutex_destroy(&mgr->mutex);
        free(mgr);
        return -1;
    }
    *instance = (ThreadMgr)mgr;
    APPSPAWN_LOGV("Create thread manager success maxThreadCount: %{public}u", maxThreadCount);
    return 0;
}
/*
 * Stop all threads of the manager and release it.
 *
 * Every started thread gets its exit flag raised, is woken through the
 * manager's condition variable and is joined. The remaining queue entries are
 * then torn down, the synchronisation objects destroyed and the manager
 * memory freed. The handle must not be used afterwards.
 *
 * @param instance  manager handle returned by CreateThreadMgr
 * @return 0 on success, -1 when instance is NULL
 */
int DestroyThreadMgr(ThreadMgr instance)
{
    APPSPAWN_LOGV("DestroyThreadMgr");
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");

    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
            atomic_store(&mgr->threadNode[index].threadExit, 1);
            APPSPAWN_LOGV("DestroyThreadMgr index %{public}d %{public}d", index, mgr->threadNode[index].threadExit);
        }
    }
    // Wake every thread blocked on the condition variable so it sees the exit flag.
    pthread_mutex_lock(&mgr->mutex);
    pthread_cond_broadcast(&mgr->cond);
    pthread_mutex_unlock(&mgr->mutex);
    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
        if (mgr->threadNode[index].threadId != INVALID_THREAD_ID) {
            pthread_join(mgr->threadNode[index].threadId, NULL);
            APPSPAWN_LOGV("DestroyThreadMgr index %{public}d end", index);
        }
    }

    pthread_mutex_lock(&mgr->mutex);
    // executorQueue links TaskExecuteNode::executeNode, not TaskNode::node, so
    // it must not go through TaskQueueDestroyProc. Only unlink the entries,
    // and do so before the owning tasks are torn down.
    OH_ListRemoveAll(&mgr->executorQueue, NULL);
    mgr->executorCount = 0;
    OH_ListRemoveAll(&mgr->taskList, TaskQueueDestroyProc);
    OH_ListRemoveAll(&mgr->waitingTaskQueue, TaskQueueDestroyProc);
    OH_ListRemoveAll(&mgr->executingTaskQueue, TaskQueueDestroyProc);
    pthread_mutex_unlock(&mgr->mutex);

    pthread_cond_destroy(&mgr->cond);
    pthread_mutex_destroy(&mgr->mutex);

    // Drop the singleton so a later CreateThreadMgr builds a fresh manager
    // instead of handing out this dead one.
    if (g_threadManager == mgr) {
        g_threadManager = NULL;
    }
    free(mgr);
    return 0;
}
/*
 * Create an empty task and register it in the manager's task list.
 * Executors are attached afterwards with ThreadMgrAddExecutor and the task is
 * started with TaskExecute or TaskSyncExecute.
 *
 * @param instance    manager handle
 * @param taskHandle  receives the id of the new task; must not be NULL
 * @return 0 on success, -1 on invalid argument or allocation failure
 */
int ThreadMgrAddTask(ThreadMgr instance, ThreadTaskHandle *taskHandle)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
    // Reject a NULL out-parameter up front, before a task is allocated and published.
    APPSPAWN_CHECK(taskHandle != NULL, return -1, "Invalid thread task handle");

    TaskNode *task = (TaskNode *)malloc(sizeof(TaskNode));
    APPSPAWN_CHECK(task != NULL, return -1, "Failed to create thread task");
    task->context = NULL;
    task->finishProcess = NULL;
    task->totalTask = 0;
    atomic_init(&task->taskFlags, 0);
    atomic_init(&task->finishTaskCount, 0);
    OH_ListInit(&task->node);
    OH_ListInit(&task->executorList);
    pthread_mutex_init(&task->mutex, NULL);
    SetCondAttr(&task->cond);

    pthread_mutex_lock(&mgr->mutex);
    // Keep a local copy of the id so the task is not read again once it is
    // visible to other threads through the list.
    uint32_t taskId = mgr->currTaskId++;
    task->taskId = taskId;
    OH_ListAddTail(&mgr->taskList, &task->node);
    pthread_mutex_unlock(&mgr->mutex);

    *taskHandle = taskId;
    APPSPAWN_LOGV("Create thread task success task id: %{public}u", taskId);
    return 0;
}
/*
 * Attach one executor (callback + context) to a task that has not been
 * submitted yet.
 *
 * @param instance    manager handle
 * @param taskHandle  id of a task created with ThreadMgrAddTask
 * @param executor    callback to run for this executor; must not be NULL
 * @param context     context handed to the callback
 * @return 0 on success, -1 on invalid argument, unknown task or allocation failure
 */
int ThreadMgrAddExecutor(ThreadMgr instance,
    ThreadTaskHandle taskHandle, TaskExecutor executor, const ThreadContext *context)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
    // RunExecutor calls the callback unconditionally, so a NULL one must be rejected here.
    APPSPAWN_CHECK(executor != NULL, return -1, "Invalid executor for task %{public}u", taskHandle);
    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);

    TaskExecuteNode *node = (TaskExecuteNode *)malloc(sizeof(TaskExecuteNode));
    APPSPAWN_CHECK(node != NULL, return -1, "Failed to create thread executor for task %{public}u", taskHandle);
    node->task = task;
    OH_ListInit(&node->node);
    OH_ListInit(&node->executeNode);
    node->context = context;
    node->executor = executor;

    // totalTask is what CheckTaskComplete compares finishTaskCount against.
    task->totalTask++;
    OH_ListAddTail(&task->executorList, &node->node);
    return 0;
}
/*
 * Cancel a task wherever it currently lives.
 * The not-yet-submitted list, the waiting queue and the executing queue are
 * searched in that order; the first match is unlinked together with its
 * executors and deleted.
 *
 * @param instance    manager handle
 * @param taskHandle  id of the task to cancel
 * @return -1 when instance is NULL, otherwise 0 (also when no task matched)
 */
int ThreadMgrCancelTask(ThreadMgr instance, ThreadTaskHandle taskHandle)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");

    ListNode *searchOrder[] = { &mgr->taskList, &mgr->waitingTaskQueue, &mgr->executingTaskQueue };
    const size_t queueCount = sizeof(searchOrder) / sizeof(searchOrder[0]);

    for (size_t i = 0; i < queueCount; i++) {
        TaskNode *found = GetTask(mgr, searchOrder[i], taskHandle);
        if (found == NULL) {
            continue;
        }
        SafeRemoveTask(mgr, found);
        DeleteTask(found);
        break;
    }
    return 0;
}
/*
 * Submit a task and block until the manager thread reports it complete.
 * The task is deleted before returning.
 *
 * @param instance    manager handle
 * @param taskHandle  id of a task created with ThreadMgrAddTask
 * @return -1 on invalid manager or unknown task, otherwise the result of the
 *         final pthread_cond_timedwait (0 when the completion signal arrived)
 *
 * NOTE(review): a wake-up without a signal is still treated as completion;
 * ruling that out needs a completion predicate shared with CheckTaskComplete.
 */
int TaskSyncExecute(ThreadMgr instance, ThreadTaskHandle taskHandle)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");
    TaskNode *task = GetTask(mgr, &mgr->taskList, taskHandle);
    APPSPAWN_CHECK(task != NULL, return -1, "Invalid thread task %{public}u", taskHandle);

    // Hold the task mutex from before the task is published until the wait
    // below releases it. CheckTaskComplete takes this mutex before signalling,
    // so the signal cannot be sent (and lost) before this thread is waiting.
    pthread_mutex_lock(&task->mutex);

    // The manager's queues are protected by mgr->mutex, not by the task mutex.
    pthread_mutex_lock(&mgr->mutex);
    OH_ListRemove(&task->node);
    OH_ListInit(&task->node);
    OH_ListAddTail(&mgr->waitingTaskQueue, &task->node);
    pthread_cond_broadcast(&mgr->cond);
    pthread_mutex_unlock(&mgr->mutex);
    APPSPAWN_LOGV("TaskSyncExecute task: %{public}u", task->taskId);

    struct timespec abstime;
    int ret = 0;
    do {
        ConvertToTimespec(60 * 1000, &abstime);
        // The mutex stays held between retries, so there is no gap in which a
        // signal could slip through.
        ret = pthread_cond_timedwait(&task->cond, &task->mutex, &abstime);
        APPSPAWN_LOGV("TaskSyncExecute success task id: %{public}u ret: %{public}d", task->taskId, ret);
    } while (ret == ETIMEDOUT);
    pthread_mutex_unlock(&task->mutex);

    DeleteTask(task);
    return ret;
}
/*
 * Submit a task for asynchronous execution.
 * The task is moved from the not-yet-submitted list to the waiting queue and
 * the manager's condition variable is broadcast; the call does not wait for
 * the task to run.
 *
 * @param instance    manager handle
 * @param taskHandle  id of a task created with ThreadMgrAddTask
 * @param process     callback stored as the task's finish callback
 * @param context     context stored for the finish callback
 * @return 0 on success, -1 on invalid manager or unknown task
 */
int TaskExecute(ThreadMgr instance,
    ThreadTaskHandle taskHandle, TaskFinishProcessor process, const ThreadContext *context)
{
    ThreadManager *mgr = (ThreadManager *)instance;
    APPSPAWN_CHECK(mgr != NULL, return -1, "Invalid thread manager");

    TaskNode *submitted = GetTask(mgr, &mgr->taskList, taskHandle);
    APPSPAWN_CHECK(submitted != NULL, return -1, "Invalid thread task %{public}u", taskHandle);

    // Completion details are recorded before the task becomes visible in the waiting queue.
    submitted->finishProcess = process;
    submitted->context = context;

    pthread_mutex_lock(&mgr->mutex);
    ListNode *link = &submitted->node;
    OH_ListRemove(link);
    OH_ListInit(link);
    OH_ListAddTail(&mgr->waitingTaskQueue, link);
    pthread_cond_broadcast(&mgr->cond);
    pthread_mutex_unlock(&mgr->mutex);

    APPSPAWN_LOGV("TaskExecute task: %{public}u", submitted->taskId);
    return 0;
}
/*
 * Start additional worker threads when more executors are pending than worker
 * threads exist, up to maxThreadCount.
 *
 * The target thread count is the smaller of executorCount and maxThreadCount.
 * Free slots (threadId == INVALID_THREAD_ID) are filled until that target is
 * reached or a thread cannot be created.
 *
 * @param mgr  thread manager
 */
static void CheckAndCreateNewThread(ThreadManager *mgr)
{
    if (mgr->maxThreadCount <= mgr->validThreadCount) {
        return;
    }
    if (mgr->executorCount <= mgr->validThreadCount) {
        return;
    }
    APPSPAWN_LOGV("CheckAndCreateNewThread maxThreadCount: %{public}u validThreadCount: %{public}u %{public}u",
        mgr->maxThreadCount, mgr->validThreadCount, mgr->executorCount);

    uint32_t totalThread = mgr->maxThreadCount;
    if (mgr->executorCount <= mgr->maxThreadCount) {
        totalThread = mgr->executorCount;
    }

    for (uint32_t index = 0; index < mgr->maxThreadCount + 1; index++) {
        ThreadNode *slot = &mgr->threadNode[index];
        if (slot->threadId != INVALID_THREAD_ID) {
            continue;  // slot already in use (slot 0 is the manager thread)
        }
        int ret = pthread_create(&slot->threadId, NULL, ThreadExecute, (void *)slot);
        if (ret != 0) {
            APPSPAWN_LOGE("Failed to create thread for %{public}u", index);
            // pthread_create leaves the id undefined on failure; restore the
            // sentinel so DestroyThreadMgr does not try to join this slot.
            slot->threadId = INVALID_THREAD_ID;
            return;
        }
        APPSPAWN_LOGV("Create thread success index: %{public}u", slot->index);
        mgr->validThreadCount++;
        if (mgr->validThreadCount >= totalThread) {
            return;
        }
    }
}
static void *ManagerThreadProc(void *args)
{
ThreadManager *mgr = g_threadManager;
ThreadNode *threadNode = (ThreadNode *)args;
struct timespec abstime;
while (!threadNode->threadExit) {
pthread_mutex_lock(&mgr->mutex);
do {
uint32_t timeout = 60 * 1000;
if (!ListEmpty(mgr->waitingTaskQueue)) {
break;
}
if (!ListEmpty(mgr->executingTaskQueue)) {
timeout = 500;
}
ConvertToTimespec(timeout, &abstime);
int ret = pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
if (!ListEmpty(mgr->executingTaskQueue) || ret == ETIMEDOUT) {
break;
}
if (threadNode->threadExit) {
break;
}
} while (1);
pthread_mutex_unlock(&mgr->mutex);
ExecuteTask(mgr);
CheckAndCreateNewThread(mgr);
if (mgr->validThreadCount == 0) {
RunExecutor(mgr, threadNode, 5);
}
CheckTaskComplete(mgr);
}
return 0;
}
static void *ThreadExecute(void *args)
{
ThreadManager *mgr = g_threadManager;
ThreadNode *threadNode = (ThreadNode *)args;
struct timespec abstime;
while (!threadNode->threadExit) {
pthread_mutex_lock(&mgr->mutex);
while (ListEmpty(mgr->executorQueue) && !threadNode->threadExit) {
ConvertToTimespec(60 * 1000, &abstime);
pthread_cond_timedwait(&mgr->cond, &mgr->mutex, &abstime);
}
pthread_mutex_unlock(&mgr->mutex);
APPSPAWN_LOGV("bbbb threadNode->threadExit %{public}d", threadNode->threadExit);
RunExecutor(mgr, threadNode, 1);
}
return NULL;
}