/* Web-page residue, not part of the source: Zzhuofan0129 "feat: Fix alarm"; commit bcc3c578, created 2024-09-05 (commit history). */
/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "utils_work_queue.h"

#include <pthread.h>
#include <stddef.h>
#include <sys/prctl.h>

#include "securec.h"

#include "utils_dslm_list.h"
#include "utils_log.h"
#include "utils_mem.h"

// Values stored in WorkQueue.state.
// RUN: assigned by CreateWorkQueue; the worker thread keeps looping while the state has this value.
#define RUN 0
// DIE: assigned by DestroyWorkQueue; the worker thread leaves its loop once it observes this value.
#define DIE 1

/*
 * A queue of pending Worker items together with the single thread that consumes them.
 * Instances are allocated by CreateWorkQueue and released by DestroyWorkQueue.
 */
typedef struct WorkQueue {
    ListHead head;           // list of pending Worker nodes; new items are added before head, the thread takes head.next
    pthread_mutex_t mutex;   // locked by the worker thread, QueueWork and DestroyWorkQueue around list/size/state updates
    pthread_cond_t cond;     // the worker thread waits on it while the list is empty; broadcast on enqueue and on destroy
    volatile int32_t state;  // RUN or DIE
    uint32_t capacity;       // QueueWork rejects new items once size reaches this value
    uint32_t size;           // number of Worker nodes currently linked into head
    pthread_t pthreadId;     // thread running WorkQueueThread; joined by DestroyWorkQueue
    const char *name;        // pointer given to CreateWorkQueue (stored, not copied); passed to prctl(PR_SET_NAME)
} WorkQueue;

/*
 * One queued work item. Allocated in QueueWork and freed by the worker thread,
 * either after the callback has run or when the queue is stopped with items still pending.
 */
typedef struct {
    ListNode linkNode;   // links this item into WorkQueue.head
    WorkProcess process; // callback invoked by the worker thread as process(dataBuff, dataLen)
    uint32_t dataLen;    // length value passed through to the callback
    uint8_t *dataBuff;   // caller-supplied pointer passed through to the callback; only the Worker node is freed here
} Worker;

static void *WorkQueueThread(void *data)
{
    WorkQueue *queue = (WorkQueue *)data;
    Worker *worker = NULL;

#ifndef L0_MINI
    prctl(PR_SET_NAME, queue->name, 0, 0, 0);
#endif

    int ret = pthread_mutex_lock(&queue->mutex);
    if (ret != 0) {
        SECURITY_LOG_ERROR("pthread_mutex_lock error");
        return NULL;
    }
    while (queue->state == RUN) {
        while ((IsEmptyList(&queue->head)) && (queue->state == RUN)) {
            pthread_cond_wait(&queue->cond, &queue->mutex);
        }
        // need to check again
        if (queue->state != RUN) {
            break;
        }

        worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
        RemoveListNode(&worker->linkNode);
        queue->size--;

        ret = pthread_mutex_unlock(&queue->mutex);
        if (ret != 0) {
            SECURITY_LOG_ERROR("pthread_mutex_unlock error");
        }
        worker->process(worker->dataBuff, worker->dataLen);
        FREE(worker);
        ret = pthread_mutex_lock(&queue->mutex);
        if (ret != 0) {
            SECURITY_LOG_ERROR("pthread_mutex_lock error");
            return NULL;
        }
    }

    // now the queue is stopped, just remove the nodes.
    while (!IsEmptyList(&queue->head)) {
        worker = LIST_ENTRY(queue->head.next, Worker, linkNode);
        RemoveListNode(&worker->linkNode);
        queue->size--;
        FREE(worker);
    }

    ret = pthread_mutex_unlock(&queue->mutex);
    if (ret != 0) {
        SECURITY_LOG_ERROR("pthread_mutex_unlock error");
    }
    return NULL;
}

#ifdef L0_MINI
// stack size requested for the worker thread on L0_MINI builds
#define WORK_QUEUE_MINI_STACK_SIZE 0x10000
#endif

/*
 * Starts the worker thread of an otherwise fully initialized queue.
 *
 * Without L0_MINI the thread is created with default attributes. With L0_MINI an attribute
 * object requesting WORK_QUEUE_MINI_STACK_SIZE bytes of stack is used. The attribute object is
 * destroyed only after pthread_attr_init has succeeded, because destroying an uninitialized
 * attribute object is undefined.
 *
 * Returns 0 on success, otherwise the error code of the pthread call that failed.
 */
static int32_t StartWorkQueueThread(WorkQueue *queue)
{
#ifndef L0_MINI
    return pthread_create(&queue->pthreadId, NULL, WorkQueueThread, queue);
#else
    pthread_attr_t attr;
    int32_t iRet = pthread_attr_init(&attr);
    if (iRet != 0) {
        return iRet;
    }

    iRet = pthread_attr_setstacksize(&attr, WORK_QUEUE_MINI_STACK_SIZE);
    if (iRet == 0) {
        iRet = pthread_create(&queue->pthreadId, &attr, WorkQueueThread, queue);
    }
    (void)pthread_attr_destroy(&attr);
    return iRet;
#endif
}

/*
 * Allocates a work queue and starts its worker thread.
 *
 * capacity: number of pending items after which QueueWork rejects new work.
 * name:     stored by pointer, not copied; when L0_MINI is not defined the worker thread passes
 *           it to prctl(PR_SET_NAME), so the string must stay valid while the thread may read it.
 *
 * Returns the new queue, or NULL when the allocation, the mutex/condition-variable
 * initialization or the thread creation fails. Everything set up before the failure is released.
 */
WorkQueue *CreateWorkQueue(uint32_t capacity, const char *name)
{
    WorkQueue *queue = MALLOC(sizeof(WorkQueue));
    if (queue == NULL) {
        return NULL;
    }
    (void)memset_s(queue, sizeof(WorkQueue), 0, sizeof(WorkQueue));

    InitListHead(&(queue->head));
    queue->state = RUN;
    queue->capacity = capacity;
    queue->size = 0;
    queue->name = name;

    int32_t iRet = pthread_mutex_init(&(queue->mutex), NULL);
    if (iRet != 0) {
        FREE(queue);
        return NULL;
    }

    iRet = pthread_cond_init(&queue->cond, NULL);
    if (iRet != 0) {
        (void)pthread_mutex_destroy(&(queue->mutex));
        FREE(queue);
        return NULL;
    }

    // the thread is started last so that it only ever sees a fully initialized queue
    iRet = StartWorkQueueThread(queue);
    if (iRet != 0) {
        (void)pthread_cond_destroy(&(queue->cond));
        (void)pthread_mutex_destroy(&(queue->mutex));
        FREE(queue);
        return NULL;
    }

    return queue;
}

/*
 * Stops the worker thread of a queue and releases the queue.
 *
 * The state is switched to DIE under the mutex and the condition variable is broadcast so a
 * sleeping worker thread wakes up and exits; the thread frees any items still pending without
 * running their callbacks. After the thread has been joined, the condition variable and the
 * mutex created by CreateWorkQueue are destroyed and the queue memory is freed.
 *
 * Returns WORK_QUEUE_OK on success, WORK_QUEUE_NULL_PTR when queue is NULL,
 * WORK_QUEUE_THREAD_COND_ERR when the broadcast fails and WORK_QUEUE_THREAD_JOIN_ERR when the
 * join fails. In the last two cases the queue is not freed.
 */
uint32_t DestroyWorkQueue(WorkQueue *queue)
{
    if (queue == NULL) {
        return WORK_QUEUE_NULL_PTR;
    }

    (void)pthread_mutex_lock(&queue->mutex);
    queue->state = DIE;
    int32_t iRet = pthread_cond_broadcast(&queue->cond);
    (void)pthread_mutex_unlock(&queue->mutex);
    if (iRet != 0) {
        return WORK_QUEUE_THREAD_COND_ERR;
    }

    iRet = pthread_join(queue->pthreadId, NULL);
    if (iRet != 0) {
        return WORK_QUEUE_THREAD_JOIN_ERR;
    }

    // the worker thread has exited, so nothing can be waiting on or holding these any more
    (void)pthread_cond_destroy(&queue->cond);
    (void)pthread_mutex_destroy(&queue->mutex);

    FREE(queue);
    return WORK_QUEUE_OK;
}

/*
 * Appends a work item to the tail of the queue and wakes the worker thread.
 *
 * The state check, the capacity check and the insertion all happen while queue->mutex is held,
 * so concurrent callers cannot push the queue past its capacity and size is never read while
 * another thread is changing it.
 *
 * queue:   queue returned by CreateWorkQueue.
 * process: callback the worker thread invokes as process(data, length).
 * data:    pointer handed to the callback unchanged; it is stored, not copied, and the queue
 *          only frees its own bookkeeping node, never this pointer.
 * length:  value handed to the callback as its second argument.
 *
 * Returns WORK_QUEUE_OK on success, WORK_QUEUE_NULL_PTR when queue or process is NULL,
 * WORK_QUEUE_STATE_ERR when the queue is not in the RUN state, WORK_QUEUE_FULL when size has
 * reached capacity and WORK_QUEUE_MALLOC_ERR when the bookkeeping node cannot be allocated.
 */
uint32_t QueueWork(WorkQueue *queue, WorkProcess process, uint8_t *data, uint32_t length)
{
    if ((queue == NULL) || (process == NULL)) {
        return WORK_QUEUE_NULL_PTR;
    }

    uint32_t result = WORK_QUEUE_OK;
    Worker *worker = NULL;

    (void)pthread_mutex_lock(&queue->mutex);
    if (queue->state != RUN) {
        result = WORK_QUEUE_STATE_ERR;
    } else if (queue->size >= queue->capacity) {
        result = WORK_QUEUE_FULL;
    } else {
        worker = MALLOC(sizeof(Worker));
        if (worker == NULL) {
            result = WORK_QUEUE_MALLOC_ERR;
        }
    }

    if (worker != NULL) {
        (void)memset_s(worker, sizeof(Worker), 0, sizeof(Worker));

        InitListHead(&worker->linkNode);
        worker->dataLen = length;
        worker->dataBuff = data;
        worker->process = process;

        // inserting before head puts the item at the tail; the worker thread consumes from head.next
        AddListNodeBefore(&worker->linkNode, &queue->head);
        queue->size++;
        (void)pthread_cond_broadcast(&queue->cond);
    }
    (void)pthread_mutex_unlock(&queue->mutex);

    return result;
}