/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * dtc_buffer.c
 *
 *
 * IDENTIFICATION
 * src/cluster/dtc_buffer.c
 *
 * -------------------------------------------------------------------------
 */
#include "knl_cluster_module.h"
#include <knl_buffer.h>
#include "dtc_buffer.h"
#include "dtc_drc.h"
#include "dtc_dcs.h"
#include "dtc_context.h"
#include "dtc_trace.h"
#include "knl_datafile.h"
#include "knl_buflatch.h"

static inline bool32 dtc_buf_prepare_ctrl(knl_session_t *session, buf_read_assist_t *ra, buf_ctrl_t **ctrl)
{
    *ctrl = buf_alloc_ctrl(session, ra->page_id, ra->mode, ra->options);
    if (SECUREC_UNLIKELY(*ctrl == NULL)) {
        knl_panic(ra->options & ENTER_PAGE_TRY);
        session->curr_page = NULL;
        session->curr_page_ctrl = NULL;
        return OG_FALSE;
    }
    return OG_TRUE;
}

static inline bool32 dtc_buf_try_local(knl_session_t *session, buf_read_assist_t *ra, buf_ctrl_t *ctrl)
{
    return dcs_local_page_usable(session, ctrl, ra->mode);
}

static inline bool32 dtc_buf_try_edp(knl_session_t *session, buf_read_assist_t *ra, buf_ctrl_t *ctrl)
{
    if (ra->try_edp) {
        if (ctrl->is_edp && ra->query_scn <= ctrl->edp_scn) {
            DTC_DCS_DEBUG_INF(
                "dtc_buf_try_edp, [%u-%llu], query_scn:%llu, edp_scn:%llu, load_status:%d, lock_mode:%d, pcn:%u, lsn:%llu",
                (uint32)ctrl->page_id.file, (uint64)ctrl->page_id.page, ra->query_scn, ctrl->edp_scn, ctrl->load_status,
                ctrl->lock_mode, ctrl->page->pcn, ctrl->page->lsn);
            return OG_TRUE;
        }
    }
    return OG_FALSE;
}

static inline bool32 dtc_buf_give_up_try(knl_session_t *session, buf_read_assist_t *ra, buf_ctrl_t *ctrl)
{
    if ((ra->options & ENTER_PAGE_TRY) && !ctrl->force_request) {
        if (ctrl->load_status == (uint8)BUF_NEED_LOAD) {
            ctrl->load_status = (uint8)BUF_LOAD_FAILED;
        }
        buf_unlatch(session, ctrl, OG_TRUE);
        session->curr_page = NULL;
        session->curr_page_ctrl = NULL;
        return OG_TRUE;
    }
    return OG_FALSE;
}

static inline bool32 dtc_buf_try_remote(knl_session_t *session, buf_read_assist_t *ra, buf_ctrl_t *ctrl)
{
    drc_lock_mode_e req_mode = (ra->mode == LATCH_MODE_S ? DRC_LOCK_SHARE : DRC_LOCK_EXCLUSIVE);
    ctrl->transfer_status = BUF_TRANS_TRY_REMOTE;

    if (dcs_request_page(session, ctrl, ra->page_id, req_mode) == OG_SUCCESS) {
        ctrl->transfer_status = BUF_TRANS_NONE;
        return OG_TRUE;
    }

    ctrl->transfer_status = BUF_TRANS_NONE;
    if (ctrl->load_status != BUF_IS_LOADED) {
        ctrl->load_status = (uint8)BUF_LOAD_FAILED;
    }
    buf_unlatch(session, ctrl, OG_TRUE);
    return OG_FALSE;
}

static status_t dtc_buf_try_prefetch(knl_session_t *session, buf_read_assist_t *ra, buf_ctrl_t *ctrl)
{
    if (DTC_BUF_NO_PREFETCH(ra->read_num)) {
        if (buf_load_page(session, ctrl, ra->page_id) != OG_SUCCESS) {
            ctrl->load_status = (uint8)BUF_LOAD_FAILED;
            buf_unlatch(session, ctrl, OG_TRUE);
            return OG_ERROR;
        }
        ctrl->force_request = 0;
        ctrl->load_status = (uint8)BUF_IS_LOADED;
    } else if (DTC_BUF_PREFETCH_EXTENT(ra->read_num)) {
        if (buf_read_prefetch_normal(session, ctrl, ra->page_id, LATCH_MODE_S, ra->options) != OG_SUCCESS) {
            buf_unlatch(session, ctrl, OG_TRUE);
            return OG_ERROR;
        }
    } else {
        // may be not at extent boundary
        if (buf_read_prefetch_num_normal(session, ctrl, ra->page_id, ra->read_num, LATCH_MODE_S, ra->options) !=
            OG_SUCCESS) {
            buf_unlatch(session, ctrl, OG_TRUE);
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}

static status_t dtc_buf_finish(knl_session_t *session, buf_read_assist_t *ra, buf_ctrl_t *ctrl, knl_buf_wait_t
    *temp_stat)
{
    BUF_UNPROTECT_PAGE(ctrl->page);
    if (ctrl->load_status == (uint8)BUF_NEED_LOAD) {
        bool32 try_load = OG_TRUE;
        if (ra->options & ENTER_PAGE_NO_READ) {
            if (ra->options & ENTER_PAGE_TRY_PREFETCH) {
                ra->read_num = DTC_BUF_PREFETCH_EXT_NUM;
            } else {
                try_load = OG_FALSE;
            }
        }
        if (try_load && dtc_buf_try_prefetch(session, ra, ctrl) != OG_SUCCESS) {
            session->curr_page_ctrl = ctrl;
            OG_LOG_RUN_ERR("[DTC_BNUFFER][%u-%u][dtc buf try prefetch] failed, read num:%u",
                ctrl->page_id.file, ctrl->page_id.page, ra->read_num);
            return OG_ERROR;
        }
        if (ra->options & ENTER_PAGE_NO_READ) {
            ctrl->load_status = (uint8)BUF_IS_LOADED;
        }
    } else {
        if (!buf_check_loaded_page_checksum(session, ctrl, ra->mode, ra->options)) {
            OG_THROW_ERROR(ERR_PAGE_CORRUPTED, ctrl->page_id.file, ctrl->page_id.page);
            buf_unlatch(session, ctrl, OG_TRUE);
            return OG_ERROR;
        }
    }

    knl_panic_log(IS_SAME_PAGID(ra->page_id, ctrl->page_id),
                  "page_id and ctrl's page_id are not same, panic info: page %u-%u ctrl page %u-%u type %u",
                  ra->page_id.file, ra->page_id.page, ctrl->page_id.file, ctrl->page_id.page, ctrl->page->type);

    session->curr_page = (char *)ctrl->page;
    session->curr_page_ctrl = ctrl;
    session->stat->buffer_gets++;

    if (SECUREC_UNLIKELY(ctrl->page->type == PAGE_TYPE_UNDO)) {
        session->stat->undo_buf_reads++;
    }

#ifdef __PROTECT_BUF__
    if (mode != LATCH_MODE_X && !ctrl->is_readonly) {
        BUF_PROTECT_PAGE(ctrl->page);
    }
#endif

    //    stats_buf_record(session, &temp_stat, ctrl);
    buf_push_page(session, ctrl, ra->mode);
    buf_log_enter_page(session, ctrl, ra->mode, ra->options);
    
    if (DTC_BUF_PREFETCH_EXTENT(ra->read_num) &&
        session->kernel->attr.enable_asynch && !session->kernel->attr.enable_dss) {
        if (buf_try_prefetch_next_ext(session, ctrl) != OG_SUCCESS) {
            OG_LOG_RUN_WAR("failed to prefetch next extent file : %u , page: %llu",
                           (uint32)ctrl->page_id.file, (uint64)ctrl->page_id.page);
        }
    }

    return OG_SUCCESS;
}

// unit: ms
#define DCS_LOG_LIMIT_INTERVAL (500)
status_t dtc_read_page(knl_session_t *session, buf_read_assist_t *ra)
{
    buf_ctrl_t *ctrl = NULL;
    knl_buf_wait_t temp_stat;

    stats_buf_init(session, &temp_stat);
    date_t last_time = 0;

    for (;;) {
        if (!dtc_dcs_readable(session, ra->page_id)) {
            if (last_time + DCS_LOG_LIMIT_INTERVAL * MICROSECS_PER_MILLISEC <= KNL_NOW(session)) {
                last_time = KNL_NOW(session);
                OG_LOG_DEBUG_ERR("[DCS][%u-%u] dcs not readable, session is hanging.",
                    ra->page_id.file, ra->page_id.page);
            }
            cm_sleep(DCS_RESEND_MSG_INTERVAL);
            continue;
        }

        if (!dtc_buf_prepare_ctrl(session, ra, &ctrl)) {
            return OG_SUCCESS;
        }

        if (dtc_buf_try_local(session, ra, ctrl)) {
            break;
        }

        if (dtc_buf_give_up_try(session, ra, ctrl)) {
            return OG_SUCCESS;
        }

        if (dtc_buf_try_remote(session, ra, ctrl)) {
            break;
        }
        cm_sleep(DCS_RESEND_MSG_INTERVAL);
    }

    return dtc_buf_finish(session, ra, ctrl, &temp_stat);
}

status_t dtc_get_exclusive_owner_pages(knl_session_t *session, buf_ctrl_t **ctrl_array, buf_ctrl_t *ctrl, uint32 count)
{
    uint32 i;
    uint8 master_id = OG_INVALID_ID8;
    page_id_t *page_ids = (page_id_t *)cm_push(session->stack, sizeof(page_id_t) * count);
    if (NULL == page_ids) {
        OG_LOG_RUN_ERR("[BUFFER] page_ids failed to malloc memory");
        return OG_ERROR;
    }
    uint32 valid_count = 0;

    for (i = 0; i < count; i++) {
        page_ids[i] = INVALID_PAGID;
    }

    /* The ownership of target ctrl has been fetched through dtc_buf_try_remote, here we will prefetch all the left
     * ctrls in the extent. */
    for (i = 0; i < count; i++) {
        if (ctrl_array[i] != NULL && ctrl_array[i] != ctrl) {
            if (master_id == OG_INVALID_ID8) {
                (void)drc_get_page_master_id(ctrl_array[i]->page_id, &master_id);
            }
            uint8 master_id_tmp = OG_INVALID_ID8;
            (void)drc_get_page_master_id(ctrl_array[i]->page_id, &master_id_tmp);
            knl_panic(master_id != OG_INVALID_ID8 && ctrl_array[i]->lock_mode == DRC_LOCK_NULL);
            if (master_id_tmp != master_id) {
                break;  // master has changed
            }
            ctrl_array[i]->transfer_status = BUF_TRANS_TRY_REMOTE;
            page_ids[i] = ctrl_array[i]->page_id;
            valid_count++;
        }
    }

    if (valid_count > 0) {
        (void)dcs_try_get_page_exclusive_owner(session, ctrl_array, page_ids, count, master_id, &valid_count);
    }

    for (i = 0; i < count; i++) {
        if (ctrl_array[i] != NULL && (ctrl_array[i] != ctrl) && (ctrl_array[i]->lock_mode == DRC_LOCK_NULL)) {
            knl_panic(!DB_IS_CLUSTER(session) || (!(ctrl_array[i]->is_edp || ctrl_array[i]->is_dirty) &&
                                                  (ctrl_array[i]->load_status == (uint8)(BUF_NEED_LOAD))));
            ctrl_array[i]->load_status = BUF_LOAD_FAILED;
            buf_unlatch(session, ctrl_array[i], OG_TRUE);
            ctrl_array[i]->transfer_status = BUF_TRANS_NONE;
            ctrl_array[i] = NULL;
        }
    }

    if (valid_count > 0) {
        (void)dcs_claim_page_exclusive_owners(session, page_ids, count, master_id);
    }
    cm_pop(session->stack);
    return OG_SUCCESS;
}

static bool32 dtc_lock_in_rcy_space_set(uint16 uid)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    rcy_set_t *rcy_set = &dtc_rcy->rcy_set;
    for (uint32 i = 0; i < rcy_set->space_set_size; i++) {
        if (uid == rcy_set->space_id_set[i]) {
            return OG_TRUE;
        }
    }
    return OG_FALSE;
}

bool32 dtc_dcs_readable(knl_session_t *session, page_id_t page_id)
{
    drc_part_mngr_t *part_mngr = (&g_drc_res_ctx.part_mngr);
    if (part_mngr->remaster_status != REMASTER_DONE) {
        return OG_FALSE;
    }

    bool32 readable = (part_mngr->remaster_status == REMASTER_DONE &&
                       (g_rc_ctx->status >= REFORM_RECOVER_DONE || OGRAC_SESSION_IN_RECOVERY(session) ||
                        g_rc_ctx->status == REFORM_MOUNTING));

    // only check page_rcy_set in primary and partial recovery
    if (readable == OG_TRUE || !OGRAC_PART_RECOVERY(session) || !DB_IS_PRIMARY(&session->kernel->db)) {
        return readable;
    }

    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    if (dtc_rcy->recovery_status <= RECOVERY_ANALYSIS) {
        return OG_FALSE;
    }

    if (IS_INVALID_PAGID(page_id)) {
        return OG_FALSE;
    }

    bool32 page_need_recover = dtc_page_in_rcyset(session, page_id);
    return !page_need_recover;
}

static bool32 is_df_ctrl_lock(knl_session_t *session, drid_t *lock_id)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    drlock_t *df_ctrl_lock = &(db->df_ctrl_lock);
    drid_t *df_ctrl_lock_id = &(df_ctrl_lock->drid);
    if (df_ctrl_lock_id->type == lock_id->type && df_ctrl_lock_id->id == lock_id->id &&
        df_ctrl_lock_id->uid == lock_id->uid) {
        return OG_TRUE;
    }
    return OG_FALSE;
}

bool32 dtc_dls_readable(knl_session_t *session, drid_t *lock_id)
{
    drc_part_mngr_t *part_mngr = (&g_drc_res_ctx.part_mngr);
    if (part_mngr->remaster_status != REMASTER_DONE) {
        return OG_FALSE;
    }

    bool32 readable = (part_mngr->remaster_status == REMASTER_DONE &&
                       (g_rc_ctx->status >= REFORM_RECOVER_DONE || OGRAC_SESSION_IN_RECOVERY(session) ||
                        g_rc_ctx->status == REFORM_MOUNTING));

    // only check lock_space_set in primary and partial recovery
    if (readable == OG_TRUE || !OGRAC_PART_RECOVERY(session) || !DB_IS_PRIMARY(&session->kernel->db)) {
        return readable;
    }

    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    if (dtc_rcy->recovery_status <= RECOVERY_ANALYSIS) {
        return OG_FALSE;
    }

    if (is_df_ctrl_lock(session, lock_id))
    {
        return OG_TRUE;
    }

    uint16 uid = lock_id->uid;
    if (uid == OG_INVALID_ID16) {
        return OG_FALSE;
    }
    bool32 lock_need_recover = dtc_lock_in_rcy_space_set(uid);
    return !lock_need_recover;
}