/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * mes_queue.c
 *
 *
 * IDENTIFICATION
 * src/mec/mes_queue.c
 *
 * -------------------------------------------------------------------------
 */
#include "mes_log_module.h"
#include "mes_queue.h"
#include "srv_instance.h"
#include "dtc_context.h"
#include "mes_func.h"
#include "tms_monitor.h"

#define MES_QUEUE_LOG_LENGTH (1024)

static void put_msgitem_nolock(dtc_msgqueue_t *queue, dtc_msgitem_t *msgitem)
{
    if (queue->count == 0) {
        queue->first = msgitem;
        queue->last = msgitem;
        msgitem->next = NULL;
    } else {
        queue->last->next = msgitem;
        queue->last = msgitem;
    }
    queue->count++;
}

void put_msgitem(dtc_msgqueue_t *queue, dtc_msgitem_t *msgitem)
{
    cm_spin_lock(&queue->lock, NULL);
    if (queue->count == 0) {
        queue->first = msgitem;
        queue->last = msgitem;
        msgitem->next = NULL;
    } else {
        queue->last->next = msgitem;
        queue->last = msgitem;
    }
    queue->count++;
    cm_spin_unlock(&queue->lock);
}

dtc_msgitem_t *mes_alloc_msgitem_nolock(dtc_msgqueue_t *queue)
{
    dtc_msgitem_t *ret = NULL;

    if (queue->count == 0) {
        if (alloc_msgitems(&g_mes.mq_ctx.pool, queue) != OG_SUCCESS) {
            OG_THROW_ERROR_EX(ERR_MES_CREATE_AREA, "alloc msg item failed");
            return NULL;
        }
    }

    if (queue->count > 0) {
        ret = queue->first;
        queue->first = ret->next;
        queue->count--;
    }

    return ret;
}

dtc_msgitem_t *get_msgitem(dtc_msgqueue_t *queue)
{
    dtc_msgitem_t *ret = NULL;

    cm_spin_lock(&queue->lock, NULL);
    if (queue->count > 0) {
        ret = queue->first;
        queue->first = ret->next;
        queue->count--;
    }
    cm_spin_unlock(&queue->lock);
    return ret;
}

status_t alloc_msgitems(dtc_msgitem_pool_t *pool, dtc_msgqueue_t *msgitems)
{
    dtc_msgitem_t *item;

    cm_spin_lock(&pool->free_list.lock, NULL);
    if (pool->free_list.count == 0) {
        cm_spin_unlock(&pool->free_list.lock);
        cm_spin_lock(&pool->lock, NULL);
        if (pool->buf_idx == OG_INVALID_ID16 || pool->hwm >= INIT_MSGITEM_BUFFER_SIZE) {
            pool->buf_idx++;
            if (pool->buf_idx >= MAX_POOL_BUFFER_COUNT) {
                cm_spin_unlock(&pool->lock);
                OG_LOG_RUN_ERR("pool->buf_idx exceed.");
                return OG_ERROR;
            }
            pool->hwm = 0;
            pool->buffer[pool->buf_idx] = (dtc_msgitem_t *)malloc(INIT_MSGITEM_BUFFER_SIZE * sizeof(dtc_msgitem_t));
            if (pool->buffer[pool->buf_idx] == NULL) {
                cm_spin_unlock(&pool->lock);
                return OG_ERROR;
            }
        }
        item = (dtc_msgitem_t *)(pool->buffer[pool->buf_idx] + pool->hwm);
        pool->hwm += MSG_ITEM_BATCH_SIZE;
        cm_spin_unlock(&pool->lock);

        msgitems->first = item;
        for (uint32 loop = 0; loop < MSG_ITEM_BATCH_SIZE - 1; loop++) {
            item->next = item + 1;
            item = item->next;
        }
        item->next = NULL;
        msgitems->last = item;
        msgitems->count = MSG_ITEM_BATCH_SIZE;
        return OG_SUCCESS;
    }

    knl_panic(pool->free_list.count >= MSG_ITEM_BATCH_SIZE);

    msgitems->first = pool->free_list.first;
    for (uint32 loop = 0; loop < MSG_ITEM_BATCH_SIZE - 1; loop++) {
        pool->free_list.first = pool->free_list.first->next;
    }

    msgitems->last = pool->free_list.first;
    pool->free_list.first = pool->free_list.first->next;
    msgitems->last->next = NULL;
    msgitems->count = MSG_ITEM_BATCH_SIZE;

    pool->free_list.count -= MSG_ITEM_BATCH_SIZE;
    if (pool->free_list.count == 0) {
        pool->free_list.last = NULL;
    }

    cm_spin_unlock(&pool->free_list.lock);

    return OG_SUCCESS;
}

static void free_msgitems(dtc_msgitem_pool_t *pool, dtc_msgqueue_t *msgitems)
{
    cm_spin_lock(&pool->free_list.lock, NULL);
    if (pool->free_list.count > 0) {
        pool->free_list.last->next = msgitems->first;
        pool->free_list.last = msgitems->last;
        pool->free_list.count += msgitems->count;
    } else {
        pool->free_list.first = msgitems->first;
        pool->free_list.last = msgitems->last;
        pool->free_list.count = msgitems->count;
    }
    cm_spin_unlock(&pool->free_list.lock);
    init_msgqueue(msgitems);
}

dtc_msgitem_t *mes_alloc_msgitem(dtc_msgqueue_t *queue)
{
    dtc_msgitem_t *item = NULL;

    cm_spin_lock(&queue->lock, NULL);
    if (queue->count == 0) {
        if (alloc_msgitems(&g_mes.mq_ctx.pool, queue) != OG_SUCCESS) {
            cm_spin_unlock(&queue->lock);
            OG_THROW_ERROR_EX(ERR_MES_CREATE_AREA, "alloc msg item failed");
            return NULL;
        }
    }

    item = queue->first;
    queue->first = item->next;
    queue->count--;
    cm_spin_unlock(&queue->lock);

    return item;
}

void init_msgqueue(dtc_msgqueue_t *queue)
{
    queue->lock = 0;
    queue->first = NULL;
    queue->last = NULL;
    queue->count = 0;
}

void init_msgitem_pool(dtc_msgitem_pool_t *pool)
{
    pool->lock = 0;
    pool->buf_idx = OG_INVALID_ID16;
    pool->hwm = 0;
    init_msgqueue(&pool->free_list);
}

void free_msgitem_pool(dtc_msgitem_pool_t *pool)
{
    if (pool->buf_idx == OG_INVALID_ID16) {
        return;
    }

    for (uint16 i = 0; i <= pool->buf_idx; i++) {
        CM_FREE_PTR(pool->buffer[i]);
    }
}

uint32 dtc_get_rand_value(void)
{
    uint32 randvalue;

    randvalue = cm_random(1024 * 1024);

    return randvalue;
}

dtc_msgitem_t *mes_get_task_msg(mes_task_group_t *group)
{
    dtc_msgqueue_t *group_queue = &group->queue;
    dtc_msgitem_t *msgitem;

    if (group_queue->count != 0) {
        msgitem = get_msgitem(group_queue);
        if (msgitem != NULL) {
            return msgitem;
        }
    }

    if (group->queue.count > 100) {
        MES_LOGGING(MES_LOGGING_GET_QUEUE, "[mes]: group %u queue length num %u.", group->group_id, group->queue.count);
    }

    return NULL;
}

void dtc_task_proc(thread_t *thread)
{
    uint32 index = *(uint32 *)thread->argument;
    dtc_msgitem_t *msgitem;
    mes_task_group_t *group;

    cm_set_thread_name("dtc_task_proc");

    dtc_msgqueue_t finished_msgitem_queue;
    init_msgqueue(&finished_msgitem_queue);

    group = mes_get_task_group(index);
    if (group == NULL) {
        OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes]: task index %u not belong any group.", index);
        return;
    }

    tms_monitor_handle monitor_handler =
        tms_sig_event_reg("dtc_task_proc", tms_monitor_cb, TMS_MONITOR_DEFAULT_STEP);
    if (monitor_handler == NULL) {
        OG_LOG_RUN_ERR("[mes]: task regist monitor event failed.");
        return;
    }
    tms_monitor_t *monitor_event = (tms_monitor_t *)monitor_handler;

    while (!thread->closed) {
        msgitem = mes_get_task_msg(group);
        if (msgitem == NULL) {
            monitor_event->monitor_is_running = OG_FALSE;
            cm_wait_cond_no_timeout(&group->work_thread_cond);
            continue;
        }
        tms_update_monitor_start_time(monitor_handler);
        monitor_event->monitor_is_running = OG_TRUE;

        if (msgitem->msg.head->src_inst != msgitem->msg.head->dst_inst) {  // ignores the consume time of message send
                                                                           // inter, shoule use a new view if need
            mes_consume_with_time(msgitem->msg.head->cmd, MES_TIME_GET_QUEUE, msgitem->start_time);
        }

        MES_LOG_DEBUG(
            msgitem->msg.head->cmd,
            "[mes]cmd=%u, rsn=%u, src_inst=%u, dst_inst=%u, src_sid=%u, dst_sid=%u, start_time=%llu, thread_id=%u, group_id=%u, queue_len=%u.",
            msgitem->msg.head->cmd, msgitem->msg.head->rsn, msgitem->msg.head->src_inst, msgitem->msg.head->dst_inst,
            msgitem->msg.head->src_sid, msgitem->msg.head->dst_sid, msgitem->start_time, index, group->group_id,
            g_mes.mq_ctx.group.task_group[group->group_id].queue.count);

        g_mes.proc(index, &msgitem->msg);

        if (msgitem->msg.head->src_inst != msgitem->msg.head->dst_inst) {
            mes_consume_with_time(msgitem->msg.head->cmd, MES_TIME_QUEUE_PROC, msgitem->start_time);
        }

        put_msgitem_nolock(&finished_msgitem_queue, msgitem);
        if (MSG_ITEM_BATCH_SIZE == finished_msgitem_queue.count) {
            free_msgitems(&g_mes.mq_ctx.pool, &finished_msgitem_queue);
        }
        tms_update_monitor_end_time(monitor_handler);
    }
}

status_t init_dtc_mq_instance(void)
{
    uint32 loop;

    for (loop = 0; loop < DTC_MSG_QUEUE_NUM; loop++) {
        init_msgqueue(&g_mes.mq_ctx.queue[loop]);
    }

    for (loop = 0; loop < OG_DTC_MAX_TASK_NUM; loop++) {
        init_msgqueue(&g_mes.mq_ctx.tasks[loop].queue);
        g_mes.mq_ctx.tasks[loop].choice = 0;
    }

    for (loop = 0; loop < MES_TASK_GROUP_ALL; loop++) {
        init_msgqueue(&g_mes.mq_ctx.group.task_group[loop].queue);
        cm_init_cond(&g_mes.mq_ctx.group.task_group[loop].work_thread_cond);
    }

    init_msgitem_pool(&g_mes.mq_ctx.pool);

    for (loop = 0; loop < g_mes.profile.work_thread_num; loop++) {
        g_mes.mes_ctx.work_thread_idx[loop] = loop;
        if (OG_SUCCESS != cm_create_thread(dtc_task_proc, DB_THREAD_STACK_SIZE, &g_mes.mes_ctx.work_thread_idx[loop],
                                           &g_mes.mq_ctx.tasks[loop].thread)) {
            OG_LOG_RUN_ERR("create work thread %u failed.", loop);
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}

void free_dtc_mq_instance(void)
{
    uint32 loop;

    for (loop = 0; loop < g_mes.profile.work_thread_num; loop++) {
        g_mes.mq_ctx.tasks[loop].thread.closed = OG_TRUE;
    }

    for (loop = 0; loop < MES_TASK_GROUP_ALL; loop++) {
        cm_release_cond(&g_mes.mq_ctx.group.task_group[loop].work_thread_cond);
    }

    for (loop = 0; loop < g_mes.profile.work_thread_num; loop++) {
        cm_close_thread(&g_mes.mq_ctx.tasks[loop].thread);
    }

    for (loop = 0; loop < MES_TASK_GROUP_ALL; loop++) {
        cm_destory_cond(&g_mes.mq_ctx.group.task_group[loop].work_thread_cond);
    }

    free_msgitem_pool(&g_mes.mq_ctx.pool);
}

status_t mes_put_inter_msg(mes_message_t *msg)
{
    dtc_msgitem_t *msgitem;

    msgitem = mes_alloc_msgitem(&g_mes.mq_ctx.local_queue);
    if (msgitem == NULL) {
        OG_LOG_RUN_ERR("mes_alloc_msgitem failed.");
        return OG_ERROR;
    }
    uint64 start_time = 0;
    mes_get_consume_time_start(&start_time);

    MES_LOG_WITH_MSG(msg);

    msgitem->msg.head = msg->head;
    msgitem->msg.buffer = msg->buffer;
    msgitem->start_time = start_time;

    mes_put_msgitem(msgitem);
    return OG_SUCCESS;
}

void mes_put_msgitem(dtc_msgitem_t *msgitem)
{
    mes_task_group_id_t group_id = g_mes.mq_ctx.command_attr[msgitem->msg.head->cmd].group_id;
    dtc_msgqueue_t *queue = &g_mes.mq_ctx.group.task_group[group_id].queue;

    put_msgitem(queue, msgitem);

    cm_release_cond_signal(&g_mes.mq_ctx.group.task_group[group_id].work_thread_cond);

    return;
}

void mes_set_command_task_group(mes_command_t command, mes_task_group_id_t group_id)
{
    g_mes.mq_ctx.command_attr[command].group_id = group_id;
}

status_t mes_set_group_task_num(mes_task_group_id_t group_id, uint32 task_num)
{
    mes_task_group_t *task_group = &g_mes.mq_ctx.group.task_group[group_id];

    if (task_num == 0) {
        OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes]: group_id %u can't set task_num 0.", group_id);
        return OG_ERROR;
    }

    if (task_group->is_set) {
        OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes]: group_id %u has been set already.", group_id);
        return OG_ERROR;
    }

    if ((g_mes.mq_ctx.group.assign_task_idx + task_num) > g_mes.mq_ctx.task_num) {
        OG_THROW_ERROR_EX(ERR_MES_PARAMETER, "[mes]: group %u task num %u has excced total task num.", group_id,
                          task_num);
        return OG_ERROR;
    }

    task_group->group_id = group_id;
    task_group->task_num = task_num;
    task_group->start_task_idx = g_mes.mq_ctx.group.assign_task_idx;
    g_mes.mq_ctx.group.assign_task_idx += task_num;
    task_group->is_set = OG_TRUE;

    OG_LOG_DEBUG_INF("[mes] set group %u start_task_idx %u task num %u.", group_id, task_group->start_task_idx,
        task_num);
    return OG_SUCCESS;
}

mes_task_group_t *mes_get_task_group(uint32 task_index)
{
    mes_task_group_t *group;

    for (uint32 i = 0; i < MES_TASK_GROUP_ALL; i++) {
        group = &g_mes.mq_ctx.group.task_group[i];
        if (task_index < (group->start_task_idx + group->task_num)) {
            return group;
        }
    }

    return NULL;
}