/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * dc_util.c
 *
 *
 * IDENTIFICATION
 * src/kernel/catalog/dc_util.c
 *
 * -------------------------------------------------------------------------
 */
#include "knl_dc_module.h"
#include "dc_util.h"
#include "cm_log.h"
#include "knl_context.h"
#include "dc_tbl.h"

void dc_list_add(dc_list_t *list, dc_list_node_t *node)
{
    knl_panic(node != NULL);

    cm_spin_lock(&list->lock, NULL);
    node->next = list->first;
    list->first = (void *)node;
    list->count++;
    cm_spin_unlock(&list->lock);
}

void *dc_list_remove(dc_list_t *list)
{
    dc_list_node_t *node = NULL;

    if (list->count == 0) {
        return NULL;
    }

    cm_spin_lock(&list->lock, NULL);
    if (list->count == 0) {
        cm_spin_unlock(&list->lock);
        return NULL;
    }
    node = (dc_list_node_t *)list->first;
    list->first = node->next;
    list->count--;
    cm_spin_unlock(&list->lock);

    return (void *)node;
}

void dc_lru_add(dc_lru_queue_t *queue, dc_entity_t *entity)
{
    knl_panic_log(entity->lru_prev == NULL && entity->lru_next == NULL,
                  "current entity's lru_prev or lru_next is not NULL, panic info: table %s", entity->table.desc.name);

    if (queue->head == NULL) {
        queue->head = entity;
        queue->tail = entity;
        entity->lru_prev = NULL;
        entity->lru_next = NULL;
    } else {
        entity->lru_next = queue->head;
        entity->lru_prev = NULL;
        queue->head->lru_prev = entity;
        queue->head = entity;
    }

    queue->count++;
}

void dc_lru_remove(dc_lru_queue_t *queue, dc_entity_t *entity)
{
    if (dc_is_reserved_entry(entity->entry->uid, entity->entry->id)) {
        return;
    }

    if (queue->head == entity) {
        queue->head = entity->lru_next;
    }

    if (queue->tail == entity) {
        queue->tail = entity->lru_prev;
    }

    if (entity->lru_prev != NULL) {
        entity->lru_prev->lru_next = entity->lru_next;
    }

    if (entity->lru_next != NULL) {
        entity->lru_next->lru_prev = entity->lru_prev;
    }

    queue->count--;
    entity->lru_next = NULL;
    entity->lru_prev = NULL;
}

void dc_lru_shift(dc_lru_queue_t *dc_lru, dc_entity_t *entity)
{
    dc_lru_remove(dc_lru, entity);
    dc_lru_add(dc_lru, entity);
}

status_t dc_init_lru(dc_context_t *ogx)
{
    dc_lru_queue_t *dc_lru = NULL;
    errno_t err;

    if (dc_alloc_mem(ogx, ogx->memory, sizeof(dc_lru_queue_t), (void **)&ogx->lru_queue) != OG_SUCCESS) {
        return OG_ERROR;
    }

    dc_lru = ogx->lru_queue;
    err = memset_sp(dc_lru, sizeof(dc_lru_queue_t), 0, sizeof(dc_lru_queue_t));
    knl_securec_check(err);
    dc_lru->count = 0;
    dc_lru->lock = 0;
    dc_lru->head = NULL;
    dc_lru->tail = NULL;
    return OG_SUCCESS;
}

static void dc_free_entity(dc_context_t *ogx, dc_entry_t *entry)
{
    if (entry->sch_lock != NULL) {
        dc_list_add(&ogx->free_schema_locks, (dc_list_node_t *)entry->sch_lock);
        entry->sch_lock = NULL;
    }

    if (entry->entity != NULL) {
        mctx_destroy(entry->entity->memory);
        entry->entity = NULL;
    }
}

/* nologging table dc entity can only recyclable if its entry has been emptied */
#define DC_ENTITY_RECYCLABLE(entry, entity)                                 \
    ((entity)->ref_count == 0 && (entity)->valid &&                         \
    (entity) == (entry)->entity && ((entry)->need_empty_entry == OG_FALSE))

bool32 dc_try_recycle(dc_context_t *ogx, dc_lru_queue_t *queue, dc_entity_t *entity)
{
    dc_entry_t *entry = NULL;

    if (entity == NULL) {
        return OG_FALSE;
    }

    entry = entity->entry;

    if (!DC_ENTITY_RECYCLABLE(entry, entity)) {
        return OG_FALSE;
    }

    cm_spin_lock(&entry->lock, NULL);
    if (!DC_ENTITY_RECYCLABLE(entry, entity)) {
        cm_spin_unlock(&entry->lock);
        return OG_FALSE;
    }

    /*
    * entries with table lock should not be recycled.
    * because anonymous block of procedure may open dc then close it before transaction end,
    * in this case, if entity is recyled, empty entity may be got when transaction releasing itl locks.
    */
    if (dc_is_locked(entry)) {
        cm_spin_unlock(&entry->lock);
        return OG_FALSE;
    }

    dc_lru_remove(queue, entity);
    cm_spin_lock(&entry->ref_lock, NULL);
    if (entry->type == DICT_TYPE_TABLE) {
        if (entry->ref_count == 1) {
            dc_segment_recycle(ogx, entity);
        }
        entry->ref_count--;
        knl_panic_log(entry->ref_count >= 0, "the table's ref_count is abnormal, panic info: table %s ref_count %u",
                      entity->table.desc.name, entry->ref_count);
    }
    dc_free_entity(ogx, entry);
    cm_spin_unlock(&entry->ref_lock);
    cm_spin_unlock(&entry->lock);

    return OG_TRUE;
}

static status_t dc_lru_recycle(dc_context_t *ogx)
{
    dc_lru_queue_t *queue;
    dc_entity_t *curr = NULL;
    dc_entity_t *head = NULL;
    dc_entity_t *prev = NULL;

    queue = ogx->lru_queue;
    cm_spin_lock(&queue->lock, NULL);

    if (queue->count == 0) {
        cm_spin_unlock(&queue->lock);
        OG_THROW_ERROR(ERR_ALLOC_GA_MEMORY, ogx->pool.name);
        return OG_ERROR;
    }

    head = queue->head;
    curr = queue->tail;

    while (curr != NULL) {
        if (curr == head) {
            break;
        }

        prev = curr->lru_prev;

        if (dc_try_recycle(ogx, queue, curr)) {
            cm_spin_unlock(&queue->lock);
            return OG_SUCCESS;
        }

        dc_lru_shift(queue, curr);
        curr = prev;
    }

    cm_spin_unlock(&queue->lock);
    OG_THROW_ERROR(ERR_ALLOC_GA_MEMORY, ogx->pool.name);
    return OG_ERROR;
}

dc_entity_t *dc_get_entity_from_lru(knl_session_t *session, uint32 pos, bool32 *is_found)
{
    dc_lru_queue_t *queue;
    dc_entry_t *entry = NULL;
    dc_entity_t *entity = NULL;
    dc_entity_t *prev = NULL;
    uint32       i = 0;
    dc_context_t *ogx = &session->kernel->dc_ctx;

    queue = ogx->lru_queue;
    cm_spin_lock(&queue->lock, NULL);

    if (queue->count == 0 || pos > queue->count) {
        cm_spin_unlock(&queue->lock);
        return NULL;
    }

    entity = queue->tail;
    while (entity != NULL) {
        if (i == pos) {
            break;
        }
        prev = entity->lru_prev;
        entity = prev;
        i++;
    }

    if (entity != NULL && DC_ENTRY_IS_MONITORED(entity->entry)) {
        entry = entity->entry;
        if (!DC_ENTITY_RECYCLABLE(entry, entity)) {
            cm_spin_unlock(&queue->lock);
            return NULL;
        }

        if (!cm_spin_try_lock(&entry->lock)) {
            cm_spin_unlock(&queue->lock);
            return NULL;
        }

        if (!DC_ENTITY_RECYCLABLE(entry, entity)) {
            cm_spin_unlock(&entry->lock);
            cm_spin_unlock(&queue->lock);
            return NULL;
        }

        cm_spin_lock(&entity->ref_lock, NULL);
        entity->ref_count++;
        *is_found = OG_TRUE;
        cm_spin_unlock(&entity->ref_lock);
        cm_spin_unlock(&entry->lock);
    }

    cm_spin_unlock(&queue->lock);
    return entity;
}

static status_t dc_recycle_ctx(dc_context_t *ogx)
{
    if (dc_lru_recycle(ogx) == OG_SUCCESS) {
        return OG_SUCCESS;
    }

    cm_reset_error();
    g_knl_callback.dc_recycle_external();

    return dc_lru_recycle(ogx);
}

status_t dc_alloc_mem(dc_context_t *ogx, memory_context_t *mem, uint32 size, void **buf)
{
    for (;;) {
        /* 1. try to alloc mem from dc pool */
        if (mctx_try_alloc(mem, size, buf)) {
            break;
        }

        /* 2. try to recycle from dc_lru queue */
        if (dc_recycle_ctx(ogx) == OG_SUCCESS) {
            continue;
        }

        /* 3. due to sql_area recycled, so make the last attempt to alloc mem from dc pool  */
        if (mctx_try_alloc(mem, size, buf)) {
            cm_reset_error();
            break;
        }

        return OG_ERROR;
    }

    return OG_SUCCESS;
}

status_t dc_alloc_page(dc_context_t *ogx, char **page)
{
    uint32 page_id;
    errno_t err;

    for (;;) {
        if (mpool_try_alloc_page(&ogx->pool, &page_id)) {
            break;
        }

        if (dc_recycle_ctx(ogx) == OG_SUCCESS) {
            continue;
        }

        if (mpool_try_alloc_page(&ogx->pool, &page_id)) {
            cm_reset_error();
            break;
        }

        return OG_ERROR;
    }

    *page = mpool_page_addr(&ogx->pool, page_id);
    err = memset_sp(*page, OG_SHARED_PAGE_SIZE, 0, OG_SHARED_PAGE_SIZE);
    knl_securec_check(err);

    return OG_SUCCESS;
}

status_t dc_alloc_memory_page(dc_context_t *ogx, uint32 *page_id)
{
    for (;;) {
        if (mpool_try_alloc_page(&ogx->pool, page_id)) {
            break;
        }

        if (dc_recycle_ctx(ogx) == OG_SUCCESS) {
            continue;
        }

        if (mpool_try_alloc_page(&ogx->pool, page_id)) {
            cm_reset_error();
            break;
        }

        return OG_ERROR;
    }

    return OG_SUCCESS;
}

status_t dc_create_memory_context(dc_context_t *ogx, memory_context_t **memory)
{
    for (;;) {
        if (mctx_try_create(&ogx->pool, memory)) {
            break;
        }

        if (dc_recycle_ctx(ogx) == OG_SUCCESS) {
            continue;
        }

        if (mctx_try_create(&ogx->pool, memory)) {
            cm_reset_error();
            break;
        }

        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static status_t dc_alloc_from_ctx(knl_session_t *session, dc_list_t *list, uint32 size, void **buf)
{
    dc_context_t *ogx = &session->kernel->dc_ctx;
    errno_t err;

    cm_spin_lock(&ogx->lock, NULL);

    *buf = dc_list_remove(list);

    if (*buf == NULL) {
        if (dc_alloc_mem(ogx, ogx->memory, size, buf) != OG_SUCCESS) {
            cm_spin_unlock(&ogx->lock);
            return OG_ERROR;
        }
    }

    cm_spin_unlock(&ogx->lock);

    err = memset_sp(*buf, size, 0, size);
    knl_securec_check(err);

    return OG_SUCCESS;
}

status_t dc_copy_text2str(knl_session_t *session, memory_context_t *context, text_t *src, char **dst)
{
    if (src->len == 0) {
        *dst = NULL;
        return OG_SUCCESS;
    }

    if (dc_alloc_mem(&session->kernel->dc_ctx, context, src->len + 1, (void **)dst) != OG_SUCCESS) {
        return OG_ERROR;
    }

    (void)cm_text2str(src, *dst, src->len + 1);

    return OG_SUCCESS;
}

status_t dc_alloc_appendix(knl_session_t *session, dc_entry_t *entry)
{
    return dc_alloc_from_ctx(session, &session->kernel->dc_ctx.free_appendixes, sizeof(dc_appendix_t),
        (void **)&entry->appendix);
}

status_t dc_alloc_synonym_link(knl_session_t *session, dc_entry_t *entry)
{
    return dc_alloc_from_ctx(session, &session->kernel->dc_ctx.free_synonym_links, sizeof(synonym_link_t),
        (void **)&entry->appendix->synonym_link);
}

status_t dc_alloc_schema_lock(knl_session_t *session, dc_entry_t *entry)
{
    status_t ret;
    ret = dc_alloc_from_ctx(session, &session->kernel->dc_ctx.free_schema_locks, (uint32)SCHEMA_LOCK_SIZE,
                            (void **)&entry->sch_lock);
    if (ret == OG_SUCCESS) {
        entry->sch_lock->inst_id = OG_INVALID_ID8;
    }
    return ret;
}

bool32 dc_try_reuse_entry(dc_user_t *user, dc_entry_t **entry)
{
    cm_spin_lock(&user->free_entries_lock, NULL);
    *entry = (dc_entry_t *)cm_bilist_remove_head(&user->free_entries);
    if (*entry == NULL) {
        cm_spin_unlock(&user->free_entries_lock);
        return OG_FALSE;
    }
    (*entry)->is_free = OG_FALSE;
    (*entry)->ready = OG_FALSE;
    (*entry)->used = OG_TRUE;
    cm_spin_unlock(&user->free_entries_lock);
    return OG_TRUE;
}

// if entry in free list, remove it
void dc_try_remove_entry(dc_user_t *user, dc_entry_t *entry)
{
    cm_spin_lock(&user->free_entries_lock, NULL);
    if (entry->is_free) {
        cm_bilist_del(&entry->node, &user->free_entries);
        entry->is_free = OG_FALSE;
    }
    cm_spin_unlock(&user->free_entries_lock);
}