/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * repl_log_replay.c
 *
 *
 * IDENTIFICATION
 * src/kernel/replication/repl_log_replay.c
 *
 * -------------------------------------------------------------------------
 */
#include "knl_replication_module.h"
#include "repl_log_replay.h"
#include "cm_log.h"
#include "cm_file.h"
#include "cm_dbs_ulog.h"
#include "knl_context.h"
#include "dtc_database.h"
#include "dtc_dls.h"

#define REPLAY_FAIL_THRESHOLD  100
#define LOAD_FAIL_THRESHOLD    10

/*
 * redo apply from file routine, including from archive files and log files
 * @param kernel session, log point, data size, need more log flag
 */
static status_t lrpl_replay(knl_session_t *session, log_point_t *point, uint32 data_size,
    log_batch_t *batch, uint32 block_size)
{
    bool32 need_more = OG_FALSE;
    bool32 replay_fail = OG_FALSE;
    lrpl_context_t *ogx = &session->kernel->lrpl_ctx;

    if (rcy_replay(session, point, data_size, batch, block_size, &need_more, &replay_fail, OG_FALSE) != OG_SUCCESS) {
        OG_LOG_RUN_INF("[Log Replayer] failed to replay log at point [%u-%u/%u/%llu]",
                       point->rst_id, point->asn, point->block_id, (uint64)point->lfn);
        return OG_ERROR;
    }

    if (replay_fail) {
        if (ogx->replay_fail_cnt < REPLAY_FAIL_THRESHOLD) {
            ogx->replay_fail_cnt++;
        }
    } else {
        ogx->replay_fail_cnt = 0;
    }

    if (!need_more) {
        OG_LOG_RUN_INF("[Log Replayer] failed to replay log at point [%u-%u/%u/%llu], no more log needed",
                       point->rst_id, point->asn, point->block_id, (uint64)point->lfn);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static bool32 lrpl_check_gap_exist(knl_session_t *session, log_point_t *point)
{
    uint32 file_id;

    log_lock_logfile(session);
    file_id = log_get_id_by_asn(session, (uint32)point->rst_id, point->asn, NULL);
    if (file_id == OG_INVALID_ID32) {
        log_unlock_logfile(session);
        return OG_TRUE;
    }

    log_unlatch_file(session, file_id);
    log_unlock_logfile(session);
    return OG_FALSE;
}

status_t lrpl_prepare_archfile(knl_session_t *session, log_point_t *point, bool32 *reset)
{
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
    thread_t *thread = &lrpl_ctx->thread;
    char arch_name[OG_FILE_NAME_BUFFER_SIZE] = {0};
    lftc_task_handle_t task_handle;
    bool32 fetch_done = OG_FALSE;

    *reset = OG_FALSE;

    bool32 file_exist = arch_get_archived_log_name(session, (uint32)point->rst_id, point->asn, ARCH_DEFAULT_DEST,
                                                   arch_name, OG_FILE_NAME_BUFFER_SIZE, session->kernel->id);
    if (file_exist && cm_file_exist(arch_name)) {
        return OG_SUCCESS;
    }

    arch_set_archive_log_name(session, (uint32)point->rst_id, point->asn, ARCH_DEFAULT_DEST, arch_name,
                              OG_FILE_NAME_BUFFER_SIZE, session->kernel->id);

    if (cm_file_exist(arch_name)) {
        return OG_SUCCESS;
    }

    if (lftc_clt_create_task(session, (uint32)point->rst_id, point->asn, arch_name, &task_handle) != OG_SUCCESS) {
        return OG_ERROR;
    }

    for (;;) {
        if (thread->closed) {
            return OG_ERROR;
        }

        if (lftc_clt_task_running(session, &task_handle, &fetch_done)) {
            cm_sleep(100);
            continue;
        }

        if (fetch_done || session->kernel->lftc_client_ctx.arch_lost) {
            lrpl_ctx->has_gap = session->kernel->lftc_client_ctx.arch_lost;
            break;
        }

        // Check replay point is in online
        uint32 file_id = log_get_id_by_asn(session, (uint32)point->rst_id, point->asn, NULL);
        if (file_id != OG_INVALID_ID32) {
            log_unlatch_file(session, file_id);
            *reset = OG_TRUE;
            *point = session->kernel->redo_ctx.curr_point;
            return OG_SUCCESS;
        }

        // Check whether restlog has changed.
        if (point->rst_id < session->kernel->db.ctrl.core.resetlogs.rst_id &&
            point->asn > session->kernel->db.ctrl.core.resetlogs.last_asn) {
            OG_LOG_RUN_INF("[LRPL] point rstid/asn [%u/%u], resetlogs rstid/last_asn [%u/%u], "
                "curr_file rstid/asn [%u/%u], current point [%u-%u/%u/%llu]",
                point->rst_id, point->asn, session->kernel->db.ctrl.core.resetlogs.rst_id,
                session->kernel->db.ctrl.core.resetlogs.last_asn,
                session->kernel->redo_ctx.files[session->kernel->redo_ctx.curr_file].head.rst_id,
                session->kernel->redo_ctx.files[session->kernel->redo_ctx.curr_file].head.asn,
                session->kernel->redo_ctx.curr_point.rst_id, session->kernel->redo_ctx.curr_point.asn,
                session->kernel->redo_ctx.curr_point.block_id,
                (uint64)session->kernel->redo_ctx.curr_point.lfn);
            *reset = OG_TRUE;
            *point = session->kernel->redo_ctx.curr_point;
            return OG_SUCCESS;
        }

        if (knl_failover_triggered(session->kernel)) {
            lrpl_ctx->has_gap = lrpl_check_gap_exist(session, point);
            return OG_ERROR;
        }

        // Sleep 3 seconds and retry
        cm_sleep(3000);

        arch_set_archive_log_name(session, (uint32)point->rst_id, point->asn, ARCH_DEFAULT_DEST, arch_name,
                                  OG_FILE_NAME_BUFFER_SIZE, session->kernel->id);
        if (cm_file_exist(arch_name)) {
            OG_LOG_RUN_INF("[Log Replayer] Archive log %s already exists", arch_name);
            return OG_SUCCESS;
        }

        if (lftc_clt_create_task(session, (uint32)point->rst_id, point->asn,
            arch_name, &task_handle) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

static inline void lrpl_switch_buf(lrpl_context_t *lrpl_ctx, rcy_context_t *rcy_ctx)
{
    if (rcy_ctx->paral_rcy) {
        if (rcy_ctx->swich_buf) {
            lrpl_ctx->read_buf = &rcy_ctx->read_buf2;
        } else {
            lrpl_ctx->read_buf = &rcy_ctx->read_buf;
        }
        rcy_ctx->swich_buf = !rcy_ctx->swich_buf;
    }
}

static bool32 lrpl_need_realloc_buf(log_batch_t *batch, lrpl_context_t *lrpl_ctx, rcy_context_t *rcy_ctx)
{
    if (rcy_ctx->paral_rcy) {
        if (rcy_ctx->swich_buf) {
            if (log_need_realloc_buf(batch, &rcy_ctx->read_buf, "rcy", OG_MAX_BATCH_SIZE)) {
                lrpl_ctx->read_buf = &rcy_ctx->read_buf;
                return OG_TRUE;
            }
        } else {
            if (log_need_realloc_buf(batch, &rcy_ctx->read_buf2, "rcy second buf", OG_MAX_BATCH_SIZE)) {
                lrpl_ctx->read_buf = &rcy_ctx->read_buf2;
                return OG_TRUE;
            }
        }
    } else {
        if (log_need_realloc_buf(batch, &rcy_ctx->read_buf, "rcy", OG_MAX_BATCH_SIZE)) {
            lrpl_ctx->read_buf = &rcy_ctx->read_buf;
            return OG_TRUE;
        }
    }
    return OG_FALSE;
}

static status_t lrpl_load(knl_session_t *session, log_point_t *point, uint32 *data_size, uint32 *block_size)
{
    rcy_context_t *rcy = &session->kernel->rcy_ctx;
    lrpl_context_t *ogx = &session->kernel->lrpl_ctx;
    log_context_t *log = &session->kernel->redo_ctx;
    lrcv_context_t *lrcv = &session->kernel->lrcv_ctx;
    uint32 file_id = OG_INVALID_ID32;
    rcy->loading_curr_file = OG_FALSE;

    /*
     * In redo switch waiting, the waiting point's asn may be far different from the current file's asn.
     * If current file's redo is read from online file, the replayed point can not be moved to the next
     * file successfully, causing replay blocked.
     * And replay from archive file can ensure the continuity of the archive on tandby node. So we set
     * file_id with invalid, to catchup from archive.
     */

    if (!lrcv->wait_info.waiting) {
        log_lock_logfile(session);
        file_id = log_get_id_by_asn(session, (uint32)point->rst_id, point->asn, &rcy->loading_curr_file);
        log_unlock_logfile(session);
    }

    if (file_id == OG_INVALID_ID32) {
        bool32 reset = OG_FALSE;
        if (lrpl_prepare_archfile(session, point, &reset) != OG_SUCCESS) {
            OG_LOG_RUN_INF("[Log Replayer] failed to prepare archive log at point [%u-%u/%u/%llu]",
                point->rst_id, point->asn, point->block_id, (uint64)point->lfn);
            return OG_ERROR;
        }

        if (reset) {
            rcy->loading_curr_file = lrcv->wait_info.waiting;
            return OG_SUCCESS;
        }

        if (rcy_load_from_arch(session, point, data_size, &rcy->arch_file, ogx->read_buf) != OG_SUCCESS) {
            OG_LOG_RUN_INF("[Log Replayer] failed to load archive log at point [%u-%u/%u/%llu]",
                point->rst_id, point->asn, point->block_id, (uint64)point->lfn);
            return OG_ERROR;
        }
        *block_size = (uint32)rcy->arch_file.head.block_size;
    } else {
        /*
         * Close archive logfile descriptor as soon as possible,
         * to free disk space after it has been cleaned automatically.
         */
        cm_close_file(rcy->arch_file.handle);
        rcy->arch_file.handle = OG_INVALID_HANDLE;

        /* rcy->read_buf.buf_size <= 64M, cannot overflow */
        if (rcy_load_from_online(session, file_id, point, data_size, ogx->log_handle + file_id,
                                 ogx->read_buf) != OG_SUCCESS) {
            OG_LOG_RUN_INF("[Log Replayer] failed to load online log[%u] at point [%u-%u/%u/%llu]",
                file_id, point->rst_id, point->asn, point->block_id, (uint64)point->lfn);
            return OG_ERROR;
        }
        *block_size = log->files[file_id].ctrl->block_size;
    }

    return OG_SUCCESS;
}

static status_t lrpl_do_replay(knl_session_t *session, log_point_t *point, uint32 data_size,
    log_batch_t *batch, uint32 block_size)
{
    lrpl_context_t *ogx = &session->kernel->lrpl_ctx;
    rcy_context_t *rcy = &session->kernel->rcy_ctx;
    date_t start_time = g_timer()->now;
    status_t status;

    rcy->last_lrpl_time = start_time;

    if (rcy->paral_rcy) {
        rcy_wait_preload_complete(session);
        rcy->wait_stats_view[PRELOAD_WAIT_TIME] = (uint64)(g_timer()->now - start_time) / MILLISECS_PER_SECOND;

        rcy_wait_replay_complete(session);
        if (rcy->last_point.asn != OG_INVALID_ASN) {
            ckpt_set_trunc_point(session, &rcy->last_point);
        }
    }

    rcy->wait_stats_view[PARAL_PROC_WAIT_TIME] = (uint64)(g_timer()->now - start_time) / MILLISECS_PER_SECOND;
    start_time = g_timer()->now;
    rcy->add_page_time = 0;
    rcy->add_bucket_time = 0;

    rcy->curr_group = rcy->group_list;
    rcy->curr_group_id = 0;
    /*
     * data_size less than 0.9 * buf_size, means no more log to relpay. When switch log file, the log tail is also
     * less than buf_size, but mostly log tail > 0.9 * buf_size
     */
    rcy->replay_no_lag = (data_size < RCY_NO_LAG_TRESHOLD(ogx->read_buf->buf_size));
    status = lrpl_replay(session, point, data_size, batch, block_size);

    rcy->wait_stats_view[GROUP_ANALYZE_TIME] = (uint64)(g_timer()->now - start_time) / MILLISECS_PER_SECOND;
    rcy->wait_stats_view[ADD_PAGE_TIME] = (uint64)rcy->add_page_time / MILLISECS_PER_SECOND;
    rcy->wait_stats_view[ADD_BUCKET_TIME] = (uint64)rcy->add_bucket_time / MILLISECS_PER_SECOND;

    rcy->last_point = *point;

    return status;
}

static void inline lrpl_record_load_err(lrpl_context_t *lrpl_ctx)
{
    if (lrpl_ctx->load_fail_cnt < LOAD_FAIL_THRESHOLD) {
        lrpl_ctx->load_fail_cnt++;
    }

    if (lrpl_ctx->load_fail_cnt >= LOAD_FAIL_THRESHOLD) {
        cm_reset_error();
        OG_LOG_RUN_ERR("[Log Replayer] log loading failures is %u, need repair", lrpl_ctx->load_fail_cnt);
    }
}

static status_t lrpl_perform(knl_session_t *session, log_point_t *point)
{
    uint32 data_size = 0;
    rcy_context_t *rcy = &session->kernel->rcy_ctx;
    lrpl_context_t *ogx = &session->kernel->lrpl_ctx;
    log_batch_t *batch = NULL;
    date_t start_time = g_timer()->now;
    date_t curr_time;
    uint32 block_size = 0;

    lrpl_switch_buf(ogx, rcy);

    if (lrpl_load(session, point, &data_size, &block_size) != OG_SUCCESS) {
        lrpl_record_load_err(ogx);
        return OG_ERROR;
    }

    ogx->load_fail_cnt = 0;
    batch = (log_batch_t *)ogx->read_buf->aligned_buf;
    if (lrpl_need_realloc_buf(batch, ogx, rcy)) {
        rcy_wait_replay_complete(session);
        return OG_SUCCESS;
    }

    curr_time = g_timer()->now;
    rcy->wait_stats_view[READ_LOG_TIME] = (uint64)(curr_time - start_time) / MILLISECS_PER_SECOND;
    rcy->wait_stats_view[READ_LOG_SIZE] = data_size / SIZE_M(1);
    rcy->wait_stats_view[REPALY_SPEED] = (uint64)data_size * MICROSECS_PER_SECOND / SIZE_M(1)
                                         / MAX(curr_time - rcy->last_lrpl_time, 1);
    
    if (lrpl_do_replay(session, point, data_size, batch, block_size) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

bool32 lrpl_need_replay(knl_session_t *session, log_point_t *point)
{
    log_context_t *redo_ctx = &session->kernel->redo_ctx;
    lrcv_context_t *lrcv = &session->kernel->lrcv_ctx;
    log_file_t *file = &redo_ctx->files[redo_ctx->curr_file];
    log_point_t *flush_point = &lrcv->flush_point;
    log_point_t *primary_point = &lrcv->primary_curr_point;
    log_point_t *wait_point = &lrcv->wait_info.wait_point;

    if (db_terminate_lfn_reached(session, point->lfn)) {
        return OG_FALSE;
    }

    // If standby is waiting for log switch, replay point should stop at wait point.
    if (lrcv->wait_info.waiting && log_cmp_point(point, wait_point) == 0) {
        return OG_FALSE;
    }

    if (session->kernel->rcy_ctx.log_decrypt_failed) {
        return OG_FALSE;
    }

    // IF flush_point is valid, choose the latest log point to check whether LRPL has finished its work
    if (flush_point->asn != OG_INVALID_ASN && lrcv->status > LRCV_DISCONNECTED && lrcv->status != LRCV_NEED_REPAIR) {
        if (log_cmp_point(primary_point, flush_point) >= 0) {
            return (bool32)(log_cmp_point(point, primary_point) < 0 && !LOG_POINT_LFN_EQUAL(point, primary_point));
        } else {
            return (log_cmp_point(point, flush_point) < 0);
        }
    } else {  // Else need to use the latest log point on current log file to check
        // If status is LOG_FILE_INACTIVE, the asn is 0
        if (file != NULL && (file->ctrl->status == LOG_FILE_CURRENT || file->ctrl->status == LOG_FILE_ACTIVE)) {
            return !log_point_equal(point, redo_ctx);
        }

        return (log_cmp_point(point, flush_point) < 0);
    }
}

static inline uint16 lrpl_get_new_curr_fileid(knl_session_t *session, log_point_t *replay_point)
{
    log_context_t *redo_ctx = &session->kernel->redo_ctx;
    log_file_t *file = &redo_ctx->files[redo_ctx->curr_file];

    if (file->head.asn == OG_INVALID_ASN) {
        return redo_ctx->curr_file;
    }

    knl_panic(file->head.asn > replay_point->asn);
    return (redo_ctx->curr_file + redo_ctx->logfile_hwm - (file->head.asn - replay_point->asn) %
           redo_ctx->logfile_hwm) % redo_ctx->logfile_hwm;
}

static void lrpl_reset_single_loginfo(knl_session_t *session, uint32 file_id, uint32 replay_asn)
{
    log_context_t *redo_ctx = &session->kernel->redo_ctx;
    log_file_t *file = redo_ctx->files + file_id;
    uint64 start_pos;

    if (file->head.asn > replay_asn) {
        OG_LOG_RUN_INF("[Log Replayer] reset logfile [%d]-[%u/%u/%llu]",
            file_id, file->head.rst_id, file->head.asn, file->head.write_pos);

        start_pos = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
        file->head.asn = OG_INVALID_ASN;
        redo_ctx->free_size += (file_id == redo_ctx->curr_file) ? (file->head.write_pos - start_pos) : 0;
        file->head.write_pos = start_pos;
        file->ctrl->status = LOG_FILE_INACTIVE;
        file->ctrl->archived = OG_FALSE;
        log_flush_head(session, file);
        if (db_save_log_ctrl(session, file_id, session->kernel->id) != OG_SUCCESS) {
            CM_ABORT(0, "[Log Replayer] ABORT INFO: save control space file failed when reset log file");
        }
    }
}

static void lrpl_reset_loginfo(knl_session_t *session, uint32 replay_asn)
{
    log_context_t *redo_ctx = &session->kernel->redo_ctx;
    uint32 file_id = redo_ctx->active_file;

    log_lock_logfile(session);

    while (file_id != redo_ctx->curr_file) {
        lrpl_reset_single_loginfo(session, file_id, replay_asn);
        log_get_next_file(session, &file_id, OG_FALSE);
    }

    log_flush_head(session, redo_ctx->files + redo_ctx->curr_file);
    lrpl_reset_single_loginfo(session, redo_ctx->curr_file, replay_asn);

    log_unlock_logfile(session);
}

static void lrpl_process_for_failover(knl_session_t *session)
{
    log_context_t *redo_ctx = &session->kernel->redo_ctx;
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
    log_file_t *file = NULL;
    uint16 new_curr_file;

    if (!lrpl_ctx->has_gap) {
        return;
    }

    new_curr_file = lrpl_get_new_curr_fileid(session, &lrpl_ctx->curr_point);

    OG_LOG_RUN_INF("[Log Replayer] replay has gap, active file [%u]-[%u/%u/%llu], "
                   "current file [%u]-[%u/%u/%llu], new current file %u",
                   redo_ctx->active_file, redo_ctx->files[redo_ctx->active_file].head.rst_id,
                   redo_ctx->files[redo_ctx->active_file].head.asn,
                   redo_ctx->files[redo_ctx->active_file].head.write_pos,
                   redo_ctx->curr_file, redo_ctx->files[redo_ctx->curr_file].head.rst_id,
                   redo_ctx->files[redo_ctx->curr_file].head.asn,
                   redo_ctx->files[redo_ctx->curr_file].head.write_pos, new_curr_file);

    arch_reset_archfile(session, lrpl_ctx->curr_point.asn);
    lrpl_reset_loginfo(session, lrpl_ctx->curr_point.asn);
    ckpt_trigger(session, OG_TRUE, CKPT_TRIGGER_FULL);

    file = &redo_ctx->files[new_curr_file];
    redo_ctx->curr_file = new_curr_file;
    redo_ctx->active_file = new_curr_file;
    file->head.asn = lrpl_ctx->curr_point.asn;
    file->ctrl->status = LOG_FILE_CURRENT;
    file->ctrl->archived = OG_FALSE;
    log_flush_head(session, file);
    if (db_save_log_ctrl(session, new_curr_file, session->kernel->id) != OG_SUCCESS) {
        CM_ABORT(0, "[Log Replayer] ABORT INFO: save control space file failed when reset log file");
    }

    dtc_my_ctrl(session)->log_first = redo_ctx->active_file;
    dtc_my_ctrl(session)->log_last = redo_ctx->curr_file;
    if (db_save_core_ctrl(session) != OG_SUCCESS) {
        CM_ABORT(0, "[Log Replayer] ABORT INFO: save core control space file failed when reset log file");
    }
}

static void lrpl_try_repair_file_offset(knl_session_t *session)
{
    log_context_t *log_ctx = &session->kernel->redo_ctx;
    rcy_context_t *rcy = &session->kernel->rcy_ctx;
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
    switch_ctrl_t *ctrl = &session->kernel->switch_ctrl;
    log_file_t *file = &log_ctx->files[log_ctx->curr_file];
    int64 buf_size = rcy->read_buf.buf_size;
    uint64 lfn = 0;
    uint64 scn;

    if (session->kernel->lrcv_ctx.reconnected || DB_IS_RAFT_ENABLED(session->kernel)) {
        return;
    }

    OG_LOG_RUN_INF("[Log Replayer] start to repair file offset");
    if (log_get_file_offset(session, file->ctrl->name, &rcy->read_buf,
        (uint64 *)&file->head.write_pos, &lfn, &scn) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Replayer] failed to get file offset for logfile[%u] %s, latest lfn %llu",
            log_ctx->curr_file, file->ctrl->name, lfn);
        if (buf_size != rcy->read_buf.buf_size) {
            lrpl_ctx->read_buf = &rcy->read_buf;
        }

        cm_spin_lock(&ctrl->lock, NULL);
        ctrl->handling = OG_FALSE;
        ctrl->request = SWITCH_REQ_NONE;
        cm_spin_unlock(&ctrl->lock);
    }
    OG_LOG_RUN_INF("[Log Replayer] end to repair file offset");

    /* Add current file's remain size into free size. */
    log_ctx->free_size += log_file_freesize(file);

    /*
     * Set reconnected to true, to prevent parsing current redo file
     * and increasing free size repeatably.
     */
    session->kernel->lrcv_ctx.reconnected = OG_TRUE;
}

static inline bool32 lrpl_can_failover(knl_session_t *session)
{
    lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
    lrcv_context_t *lrcv = &session->kernel->lrcv_ctx;
    
    return (bool32)(lrcv->status == LRCV_NEED_REPAIR || lrpl->has_gap ||
            !lrpl_need_replay(session, &lrpl->curr_point) ||
            lrpl->load_fail_cnt >= LOAD_FAIL_THRESHOLD);
}

/* Get redo log size between begin point and end point */
status_t lrpl_log_size_btw_2points(knl_session_t *session, log_point_t begin, log_point_t end, uint64 *file_size)
{
    log_context_t *ogx = &session->kernel->redo_ctx;
    log_point_t point = begin;
    uint32 file_id;
    uint64 size;
    uint64 end_pos;

    *file_size = 0;
    point.block_id = (point.block_id == 0) ? 1 : point.block_id;

    while (LOG_POINT_FILE_LT(point, end) || LOG_POINT_FILE_EQUAL(point, end)) {
        size = 0;
        file_id = log_get_id_by_asn(session, (uint32)point.rst_id, point.asn, NULL);
        if (file_id == OG_INVALID_ID32) {
            arch_file_t arch_file;
            arch_file.handle = OG_INVALID_HANDLE;
            bool32 is_compress = OG_FALSE;
            if (rcy_load_arch(session, (uint32)point.rst_id, point.asn, &arch_file, &is_compress) != OG_SUCCESS) {
                cm_close_file(arch_file.handle);
                return OG_ERROR;
            }
            cm_close_file(arch_file.handle);

            if (LOG_POINT_FILE_EQUAL(point, end)) {
                end_pos = (uint64)end.block_id * arch_file.head.block_size;
            } else {
                end_pos = arch_file.head.write_pos;
            }

            if (end_pos > (uint64)point.block_id * arch_file.head.block_size) {
                size = end_pos - (uint64)point.block_id * arch_file.head.block_size;
            }
        } else {
            log_file_t *online_file = &ogx->files[file_id];

            if (LOG_POINT_FILE_EQUAL(point, end)) {
                end_pos = (uint64)end.block_id * online_file->ctrl->block_size;
            } else {
                end_pos = online_file->head.write_pos;
            }

            if (end_pos > (uint64)point.block_id * online_file->ctrl->block_size) {
                size = end_pos - (uint64)point.block_id * online_file->ctrl->block_size;
            }
            log_unlatch_file(session, file_id);
        }

        *file_size += size;
        /* switch to first point of next file */
        if (point.rst_id < end.rst_id && point.asn >= session->kernel->db.ctrl.core.resetlogs.last_asn) {
            point.rst_id++;
        }
        point.asn++;
        point.block_id = 1;
    }
    return OG_SUCCESS;
}

static void lrpl_wait_replay_complete(knl_session_t *session)
{
    rcy_context_t *rcy = &session->kernel->rcy_ctx;

    if (rcy->paral_rcy && rcy->last_point.asn != OG_INVALID_ASN) {
        rcy_wait_replay_complete(session);
        ckpt_set_trunc_point(session, &rcy->last_point);
        rcy->last_point.asn = OG_INVALID_ASN;
    }
}

static void lrpl_proc_loop(thread_t *thread)
{
    knl_session_t *session = (knl_session_t *)thread->argument;
    lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
    bool32 sleep_needed = OG_FALSE;

    while (!thread->closed) {
        if (lrpl->is_closing) {
            break;
        }

        if (sleep_needed) {
            if (knl_failover_triggered(session->kernel)) {
                lrpl_try_repair_file_offset(session);
                if (lrpl_can_failover(session)) {
                    OG_LOG_RUN_INF("[Log Replayer] failover triggered");
                    break;
                }
            }

            cm_sleep(20);
        }

        if (!lrpl_need_replay(session, &lrpl->curr_point)) {
            lrpl_wait_replay_complete(session);
            sleep_needed = OG_TRUE;
            lrpl->replay_fail_cnt = 0;
            lrpl->load_fail_cnt = 0;
            continue;
        }

        if (session->kernel->lftc_client_ctx.arch_lost || lrpl->load_fail_cnt >= LOAD_FAIL_THRESHOLD) {
            sleep_needed = OG_TRUE;
            continue;
        }

        if (lrpl_perform(session, &lrpl->curr_point) != OG_SUCCESS) {
            sleep_needed = OG_TRUE;
            continue;
        }

        sleep_needed = OG_FALSE;
    }

    lrpl_wait_replay_complete(session);
}

static void lrpl_update_resetlog_scn(knl_session_t *session)
{
    log_context_t *redo_ctx = &session->kernel->redo_ctx;
    core_ctrl_t *core = &session->kernel->db.ctrl.core;
    switch_ctrl_t *ctrl = &session->kernel->switch_ctrl;

    if (ctrl->request != SWITCH_REQ_NONE || OG_INVALID_SCN(redo_ctx->curr_scn)) {
        return;
    }

    core->reset_log_scn = redo_ctx->curr_scn;
    if (db_save_core_ctrl(session) != OG_SUCCESS) {
        CM_ABORT(0, "[Log Replayer] ABORT INFO: failed to save core ctrlfile");
    }
}

void dtc_lrpl_proc_loop(thread_t *thread)
{
    knl_session_t *session = (knl_session_t *)thread->argument;
    lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
    dtc_node_ctrl_t *node_ctrl = NULL;
    for (uint32 i = 0; i < g_dtc->profile.node_count; i++) {
        node_ctrl = dtc_get_ctrl(session, i);
        lrpl->dtc_curr_point[i] = node_ctrl->rcy_point;
    }

    while (!thread->closed) {
        if (rc_is_master() == OG_TRUE && lrpl->is_done == OG_FALSE) {
            OG_LOG_RUN_INF("[DTC LRPL]oGRAC master start lrpl replay");
            lrpl->is_replaying = OG_TRUE;
            if (dtc_recover(session) != OG_SUCCESS) {
                CM_ABORT_REASONABLE(0, "ABORT INFO: [DTC LRPL]dtc lrpl recovery failed");
                return;
            }
            lrpl->is_replaying = OG_FALSE;
        }
        cm_sleep(STANDBY_LRPL_WAIT_SLEEP_TIME);
    }
    
    return;
}

/*
 * lrpl thread global apply routine, include real time apply and archive apply
 * @param thread
 */
void lrpl_proc(thread_t *thread)
{
    knl_session_t *session = (knl_session_t *)thread->argument;
    lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
    rcy_context_t *rcy = &session->kernel->rcy_ctx;

    cm_set_thread_name("log_replayer");
    OG_LOG_RUN_INF("log replayer thread started");
    KNL_SESSION_SET_CURR_THREADID(session, cm_get_current_thread_id());

    lrpl->curr_point = session->kernel->redo_ctx.curr_point;
    lrpl->begin_point = lrpl->curr_point;
    rcy->last_lrpl_time = 0;

    if (DB_IS_CLUSTER(session)) {
        dtc_lrpl_proc_loop(thread);
        OG_LOG_RUN_INF("log replayer thread closed");
        KNL_SESSION_CLEAR_THREADID(session);
        return;
    }

    lrpl_proc_loop(thread);
    OG_LOG_RUN_INF("[Log Replayer] recovery end with log point: rst_id %u asn %u lfn %llu offset %u",
        lrpl->curr_point.rst_id, lrpl->curr_point.asn, (uint64)lrpl->curr_point.lfn, lrpl->curr_point.block_id);

    lrpl_process_for_failover(session);

    thread->closed = OG_TRUE;
    lrpl->read_buf = NULL;

    /* rcy_ctx->file is opened in lrpl_perform=>rcy_load_from_arch */
    cm_close_file(session->kernel->rcy_ctx.arch_file.handle);
    session->kernel->rcy_ctx.arch_file.handle = INVALID_FILE_HANDLE;
    for (uint32 i = 0; i < OG_MAX_LOG_FILES; i++) {
        cm_close_file(lrpl->log_handle[i]);
        lrpl->log_handle[i] = INVALID_FILE_HANDLE;
    }
    lrpl->end_time = cm_now();

    lrpl_update_resetlog_scn(session);
    OG_LOG_RUN_INF("log replayer thread closed");
    KNL_SESSION_CLEAR_THREADID(session);
}

status_t lrpl_init(knl_session_t *session)
{
    knl_session_t *replay_session = session->kernel->sessions[SESSION_ID_REPLAY];
    lrpl_context_t *lrpl = &replay_session->kernel->lrpl_ctx;
    uint32 i;

    lrpl->has_gap = OG_FALSE;
    lrpl->replay_fail_cnt = 0;
    lrpl->load_fail_cnt = 0;

    if (lrpl->read_buf == NULL) {
        lrpl->read_buf = &session->kernel->rcy_ctx.read_buf;
    }
    lrpl->is_closing = OG_FALSE;
    lrpl->is_promoting = OG_FALSE;
    lrpl->is_done = OG_FALSE;
    lrpl->curr_point.asn = OG_INVALID_ASN;
    lrpl->begin_point.asn = OG_INVALID_ASN;
    lrpl->begin_time = cm_now();
    lrpl->end_time = 0;
    lrpl->is_replaying = OG_FALSE;

    for (i = 0; i < OG_MAX_LOG_FILES; i++) {
        lrpl->log_handle[i] = -1;
    }

    if (rc_is_master() == OG_TRUE && OG_SUCCESS != cm_create_thread(lrpl_proc, 0, replay_session, &lrpl->thread)) {
        OG_LOG_RUN_INF("[Log Replayer] failed to start log replayer thread");
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

void lrpl_close(knl_session_t *session)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    lrpl_context_t *lrpl = &kernel->lrpl_ctx;
    lrpl->is_closing = OG_TRUE;
    cm_close_thread(&lrpl->thread);
}

bool32 lrpl_replay_blocked(knl_session_t *session)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    lrpl_context_t *lrpl = &kernel->lrpl_ctx;
    lrcv_context_t *lrcv = &kernel->lrcv_ctx;

    if (DB_STATUS(session) != DB_STATUS_OPEN || lrcv->status < LRCV_READY) {
        return OG_FALSE;
    }

    if (lrpl_need_replay(session, &lrpl->curr_point) &&
        (lrpl->replay_fail_cnt >= REPLAY_FAIL_THRESHOLD || lrpl->load_fail_cnt >= LOAD_FAIL_THRESHOLD)) {
        return OG_TRUE;
    }

    return OG_FALSE;
}

char* dtc_get_lrpl_status(knl_session_t *session)
{
    if (DB_IS_PRIMARY(&session->kernel->db)) {
        return "STOP_REPLAY";
    }
    if (cm_dbs_log_recycled()) {
        return "LOG_RECYCLED";
    }
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
    if (lrpl_ctx->is_replaying) {
        return "START_REPLAY";
    }
    return "STOP_REPLAY";
}

status_t dtc_cal_redo_size_by_node_id(knl_session_t *session, uint32 node_id, uint32* redo_recovery_size)
{
    logfile_set_t *log_set = LOGFILE_SET(session, node_id);
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
    log_file_t *log_file = &log_set->items[0];
    device_type_t type = cm_device_type(log_file->ctrl->name);
    int32 handle = -1;
    if (cm_open_device_no_retry(log_file->ctrl->name, type, knl_io_flag(session), &handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DB] failed to open redo log file %s ", log_file->ctrl->name);
        return OG_ERROR;
    }
    if (cm_device_get_used_cap_no_retry(log_file->ctrl->type, handle, lrpl_ctx->dtc_curr_point[node_id].lsn + 1,
                                        redo_recovery_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("failed to fetch rcy redo log size of rcy point lsn(%llu) from DBStor",
                       lrpl_ctx->dtc_curr_point[node_id].lsn + 1);
        return OG_ERROR;
    }
    OG_LOG_DEBUG_INF("succ to fetch rcy redo log size (%u) of rcy point lsn(%llu) from DBStor", *redo_recovery_size,
                     lrpl_ctx->dtc_curr_point[node_id].lsn + 1);
    return OG_SUCCESS;
}

status_t dtc_cal_lrpl_redo_size(knl_session_t *session, uint32* redo_recovery_size, double* redo_recovery_time)
{
    if (DB_IS_PRIMARY(&session->kernel->db)) {
        return OG_SUCCESS;
    }
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
    if (lrpl_ctx->is_replaying == OG_FALSE) {
        return OG_SUCCESS;
    }
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    uint32 rcy_log_size = 0;
    for (uint32 node_id = 0; node_id < dtc_rcy->node_count; node_id++) {
        if (dtc_cal_redo_size_by_node_id(session, node_id, &rcy_log_size) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("failed to fetch rcy redo log size of node %u", node_id);
            return OG_ERROR;
        }
        *redo_recovery_size += rcy_log_size / SIZE_K(1);
    }

    if (lrpl_ctx->lrpl_speed != 0) {
        *redo_recovery_time = ((double)(*redo_recovery_size) / lrpl_ctx->lrpl_speed) / MICROSECS_PER_SECOND;
    }
    return OG_SUCCESS;
}