/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * bak_build.c
 *
 *
 * IDENTIFICATION
 * src/kernel/backup/bak_build.c
 *
 * -------------------------------------------------------------------------
 */
#include "knl_backup_module.h"
#include "bak_build.h"
#include "cm_file.h"
#include "cm_kmc.h"
#include "knl_backup.h"
#include "bak_restore.h"
#include "knl_context.h"
#include "dtc_database.h"

static void bak_set_build_params(bak_t *bak, cs_pipe_t *pipe, cs_packet_t *send_pack, cs_packet_t *recv_pack,
                                 build_param_ctrl_t *ctrl)
{
    bak_remote_t *remote = &bak->remote;
    bak_attr_t *attr = &bak->record.attr;
    char ipstr[CM_MAX_IP_LEN];

    bak->record.start_time = (uint64)cm_now();
    bak->is_building = OG_TRUE;
    bak->record.log_only = OG_FALSE;
    bak->record.data_only = OG_FALSE;
    bak->record.is_repair = ctrl->is_repair;
    bak->record.is_increment = ctrl->is_increment;
    attr->backup_type = BACKUP_MODE_FULL;
    attr->level = (ctrl->is_increment || ctrl->is_repair) ? OG_TRUE : OG_FALSE;
    attr->base_lsn = ctrl->base_lsn;
    attr->compress = ctrl->compress;
    bak->compress_ctx.compress_level = ctrl->compress_level;
    bak->encrypt_info.encrypt_alg = ENCRYPT_NONE;
    bak->record.device = DEVICE_DISK;
    bak->cumulative = OG_FALSE;
    bak->is_first_link = OG_TRUE;
    bak->need_retry = OG_FALSE;
    bak->need_check = OG_FALSE;
    bak->failed = OG_FALSE;
    bak->proc_count = ctrl->parallelism;
    bak->backup_buf_size = ctrl->buffer_size;

    remote->pipe = pipe;
    remote->send_pack = send_pack;
    remote->recv_pack = recv_pack;

    errno_t ret = strncpy_s(bak->peer_host, OG_HOST_NAME_BUFFER_SIZE, cm_inet_ntop(
        (struct sockaddr *)&pipe->link.tcp.remote.addr, ipstr, CM_MAX_IP_LEN), OG_HOST_NAME_BUFFER_SIZE - 1);
    knl_securec_check(ret);

    OG_LOG_RUN_INF("[BUILD] is increment %u, base lsn %llu ", ctrl->is_increment, ctrl->base_lsn);
}

bool32 knl_brain_repair_check(knl_session_t *session)
{
    reset_log_t *reset_log = &session->kernel->lrcv_ctx.primary_resetlog;
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    log_point_t *rcy = &dtc_my_ctrl(session)->rcy_point;

    if ((kernel->lrcv_ctx.dbid != 0) && (kernel->lrcv_ctx.dbid != kernel->db.ctrl.core.dbid)) {
        OG_LOG_RUN_ERR("[REPAIR] primary dbid [%u] is not equal to standby dbid [%u], need repair",
            kernel->lrcv_ctx.dbid, kernel->db.ctrl.core.dbid);
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", primary dbid is not equal to standby dbid.");
        return OG_FALSE;
    }

    if (reset_log->last_asn > rcy->asn || rcy->rst_id >= reset_log->rst_id) {
        OG_LOG_RUN_ERR("[REPAIR] reset_log and rcy point not matched. reset_log rst_id-asn-lfn: %u-%u-%llu,"
            "rcy point rst_id-asn-lfn: %u-%u-%llu.", reset_log->rst_id, reset_log->last_asn, reset_log->last_lfn,
            rcy->rst_id, rcy->asn, (uint64)rcy->lfn);
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", asn of reset_log is larger than asn of rcy or rst_id of rcy is larger "
            "than rst_id of reset_log.");
        return OG_FALSE;
    }

    uint32 log_count = log_get_count(session);
    if (rcy->asn - reset_log->last_asn >= log_count) {
        OG_LOG_RUN_ERR("[REPAIR] the count of log between rcy(asn: %u) and reset_log(asn: %u) is larger than "
            "log_count %u.", rcy->asn, reset_log->last_asn, log_count);
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", log generated by standby is too large.");
        return OG_FALSE;
    }

    if (reset_log->rst_id - rcy->rst_id > 1) {
        OG_LOG_RUN_ERR("[REPAIR] reset_log and rcy point not matched. reset_log rst_id-asn-lfn: %u-%u-%llu,"
            "rcy point rst_id-asn-lfn: %u-%u-%llu.", reset_log->rst_id, reset_log->last_asn, reset_log->last_lfn,
            rcy->rst_id, rcy->asn, (uint64)rcy->lfn);
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", rst_id of reset_log and rcy is not mapped.");
        return OG_FALSE;
    }

    if (reset_log->last_lfn > rcy->lfn) {
        OG_LOG_RUN_ERR("[REPAIR] primary lfn [%llu] is larger than standby rcy lfn [%llu]", reset_log->last_lfn, (uint64)rcy->lfn);
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", primary lfn is larger than standby rcy lfn.");
        return OG_FALSE;
    }

    return OG_TRUE;
}

static status_t build_analyze_mem_init(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    build_analyse_bucket_t *free_list = &bak->build_aly_free_list;
    int64 buf_size = BUILD_ALY_MAX_PAGE_SIZE + BUILD_ALY_MAX_ITEM_SIZE + BUILD_ALY_MAX_BUCKET_SIZE;
    errno_t ret;

    if (cm_aligned_malloc(buf_size, "build log analysis", &bak->build_aly_mem) != OG_SUCCESS) {
        return OG_ERROR;
    }
    bak->build_aly_pages = (page_id_t *)bak->build_aly_mem.aligned_buf;
    bak->build_aly_items = (build_analyse_item_t *)(bak->build_aly_mem.aligned_buf + BUILD_ALY_MAX_PAGE_SIZE);
    bak->build_aly_buckets = (build_analyse_bucket_t *)(bak->build_aly_mem.aligned_buf + BUILD_ALY_MAX_PAGE_SIZE +
                                                        BUILD_ALY_MAX_ITEM_SIZE);

    ret = memset_sp(bak->build_aly_mem.aligned_buf, (size_t)buf_size, 0, (size_t)buf_size);
    knl_securec_check(ret);

    free_list->count = BUILD_ALY_MAX_ITEM;
    free_list->first = &bak->build_aly_items[0];
    for (uint32 i = 0; i < BUILD_ALY_MAX_ITEM - 1; i++) {
        bak->build_aly_items[i].next = &bak->build_aly_items[i + 1];  // init free list
        bak->build_aly_items[i].page_id = &bak->build_aly_pages[i];
    }
    bak->build_aly_items[BUILD_ALY_MAX_ITEM - 1].page_id = &bak->build_aly_pages[BUILD_ALY_MAX_ITEM - 1];
    bak->page_count = 0;

    if (cm_aligned_malloc(OG_MAX_BATCH_SIZE, "build analyze log buffer", &bak->log_buf) != OG_SUCCESS) {
        cm_aligned_free(&bak->build_aly_mem);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static void build_analyze_mem_free(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;

    cm_aligned_free(&bak->build_aly_mem);
    bak->build_aly_pages = NULL;
    bak->build_aly_items = NULL;
    bak->build_aly_buckets = NULL;
    bak->page_count = 0;
    cm_aligned_free(&bak->log_buf);
}

build_analyse_item_t *build_analyze_pop_free_item(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    build_analyse_bucket_t *free_list = &bak->build_aly_free_list;
    build_analyse_item_t *item = NULL;

    if (free_list->first == NULL) {
        return NULL;
    }

    item = free_list->first;
    free_list->first = item->next;
    free_list->count--;
    item->next = NULL;

    return item;
}

static void brain_repair_invalidate_page(bak_t *bak, page_id_t *page_id)
{
    uint32 file_hash = page_id->file % BUILD_ALY_MAX_FILE;
    uint32 page_hash = page_id->page % BUILD_ALY_MAX_BUCKET_PER_FILE;
    build_analyse_bucket_t *bucket = &bak->build_aly_buckets[file_hash * BUILD_ALY_MAX_BUCKET_PER_FILE + page_hash];

    build_analyse_item_t *item = bucket->first;
    while (item != NULL) {
        if (IS_SAME_PAGID(*item->page_id, *page_id)) {
            item->page_id->file = OG_INVALID_FILEID;
            break;
        }
        item = item->next;
    }
}

void brain_repair_filer_page_from_remote(knl_session_t *session, page_head_t *page, uint32 page_count)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    page_head_t *page_head = page;
    if (!bak->record.is_repair) {
        return;
    }
    char *page_buff = NULL;
    for (uint32 i = 0; i < page_count; i++) {
        page_id_t *page_id = AS_PAGID_PTR(page_head->id);
        brain_repair_invalidate_page(bak, page_id);
        page_buff = (char*)page_head + DEFAULT_PAGE_SIZE(session);
        page_head = (page_head_t *)page_buff;
    }

    return;
}

static status_t build_aly_add_page(knl_session_t *session, page_id_t page_id)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    uint32 file_hash = page_id.file % BUILD_ALY_MAX_FILE;
    uint32 page_hash = page_id.page % BUILD_ALY_MAX_BUCKET_PER_FILE;
    build_analyse_bucket_t *bucket = &bak->build_aly_buckets[file_hash * BUILD_ALY_MAX_BUCKET_PER_FILE + page_hash];

    build_analyse_item_t *item = bucket->first;
    while (item != NULL) {
        if (IS_SAME_PAGID(*item->page_id, page_id)) {
            return OG_SUCCESS;
        }
        item = item->next;
    }

    build_analyse_item_t *new_item = build_analyze_pop_free_item(session);
    if (new_item == NULL) {
        OG_LOG_RUN_ERR("[BUILD] pages changed by standby is too much.");
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", pages changed by standby is too much.");
        return OG_ERROR;
    }

    OG_LOG_DEBUG_INF("[REPAIR] standby find page %u-%u", page_id.file, page_id.page);

    new_item->next = bucket->first;
    *new_item->page_id = page_id;
    bucket->first = new_item;
    bucket->count++;
    bak->page_count++;
    return OG_SUCCESS;
}

static void build_analyze_construct_hash(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    build_analyse_item_t *item = NULL;
    build_analyse_bucket_t *bucket = NULL;
    uint32 file_hash;
    uint32 page_hash;

    for (uint32 i = 0; i < bak->page_count; i++) {
        item = &bak->build_aly_items[i];
        file_hash = item->page_id->file % BUILD_ALY_MAX_FILE;
        page_hash = item->page_id->page % BUILD_ALY_MAX_BUCKET_PER_FILE;
        bucket = &bak->build_aly_buckets[file_hash * BUILD_ALY_MAX_BUCKET_PER_FILE + page_hash];

        item->next = bucket->first;
        bucket->first = item;
        bucket->count++;
    }
}

static status_t build_find_batch(knl_session_t *session, device_type_t type, int32 *handle, log_file_head_t *head,
                          log_point_t *point, uint32 *data_size, aligned_buf_t *align_buf)
{
    uint64 size;
    uint64 remain_size;
    uint64 buf_size = (uint64)align_buf->buf_size;
    bool32 finished = OG_FALSE;
    log_batch_t *batch = NULL;
    log_batch_tail_t *tail = NULL;
    char *buf = align_buf->aligned_buf;
    uint64 last_lfn = 0;

    int64 file_size = cm_file_size(*handle);
    if (file_size == -1) {
        OG_LOG_RUN_ERR("[BUILD] failed to get log [%u-%u] size ", point->rst_id, point->asn);
        OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
        return OG_ERROR;
    }
    uint64 offset = CM_CALC_ALIGN(sizeof(log_file_head_t), (uint32)head->block_size);
    do {
        size = (uint64)file_size - offset;
        size = size > buf_size ? buf_size : size;
        if (size == 0 || finished) {
            break;
        }

        if (cm_read_device(type, *handle, (int64)offset, buf, (int32)size) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[BUILD] Failed to read log [%u-%u] file", point->rst_id, point->asn);
            return OG_ERROR;
        }

        batch = (log_batch_t *)buf;
        tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));

        remain_size = size;
        while (remain_size >= sizeof(log_batch_t)) {
            if (!rcy_validate_batch(batch, tail) || batch->head.point.rst_id != head->rst_id ||
                (last_lfn != 0 && batch->head.point.lfn != last_lfn + 1)) {
                finished = OG_TRUE;
                OG_LOG_RUN_INF("[BUILD] log [%u-%u] offset %llu invalid batch size %u "
                               "head [%llu/%u-%u/%llu/%llu] latest_lfn %llu",
                               head->rst_id, head->asn, offset, batch->size, batch->head.magic_num,
                               batch->head.point.rst_id, batch->head.point.asn,
                               (uint64)batch->head.point.lfn, batch->raft_index, last_lfn);

                break;
            }
            last_lfn = batch->head.point.lfn;
            if (batch->head.point.lfn == point->lfn) {
                finished = OG_TRUE;
                if (session->kernel->lrcv_ctx.primary_reset_log_scn == batch->scn ||
                    session->kernel->lrcv_ctx.primary_reset_log_scn == 0) {
                    *data_size = batch->size;
                } else {
                    *data_size = 0;
                    OG_LOG_RUN_ERR("[BUILD] For same lfn [%llu] in resetlog, scn from primary and standby [%llu/%llu] "
                                   "is different, break-brain build is not allowed", (uint64)point->lfn,
                                   session->kernel->lrcv_ctx.primary_reset_log_scn, batch->scn);
                }
                break;
            }
            offset += batch->space_size;
            remain_size -= batch->space_size;
            batch = (log_batch_t *)((char *)batch + batch->space_size);
            if (remain_size < batch->space_size) {
                break;
            }
            tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
        }
    } while (1);

    if (*data_size != 0) {
        errno_t ret = memmove_s(buf, (uint32)buf_size, batch, batch->size);
        knl_securec_check(ret);
    }
    return OG_SUCCESS;
}

static status_t build_load_from_online(knl_session_t *session, uint32 file_id, log_point_t *point, uint32 *data_size,
    aligned_buf_t *align_buf)
{
    log_file_head_t head;
    int32 handle = OG_INVALID_HANDLE;
    log_file_t *file = &session->kernel->redo_ctx.files[file_id];

    if (cm_open_device(file->ctrl->name, file->ctrl->type, knl_redo_io_flag(session), &handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[BUILD] failed to open %s", file->ctrl->name);
        return OG_ERROR;
    }

    uint64 size = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
    log_file_head_t *tmp_head = (log_file_head_t *)align_buf->aligned_buf;
    if (cm_read_device(file->ctrl->type, handle, 0, tmp_head, (int32)size) != OG_SUCCESS) {
        cm_close_device(file->ctrl->type, &handle);
        OG_LOG_RUN_ERR("[BUILD] failed to read %s ", file->ctrl->name);
        return OG_ERROR;
    }

    head.block_size = tmp_head->block_size;
    head.rst_id = tmp_head->rst_id;
    head.asn = tmp_head->asn;
    if (build_find_batch(session, file->ctrl->type, &handle, &head, point, data_size, align_buf) != OG_SUCCESS) {
        cm_close_device(file->ctrl->type, &handle);
        return OG_ERROR;
    }

    cm_close_device(file->ctrl->type, &handle);
    return OG_SUCCESS;
}

static status_t build_load_from_arch(knl_session_t *session, log_point_t *point, uint32 *data_size, aligned_buf_t *align_buf)
{
    arch_file_t arch_file = { .name = { 0 }, .handle = OG_INVALID_HANDLE };
    bool32 is_compress = OG_FALSE;
    if (rcy_load_arch(session, (uint32)point->rst_id, point->asn, &arch_file, &is_compress) != OG_SUCCESS)  {
        /* max rst_id <= 2^18, cannot overflow */
        cm_close_device(cm_device_type(arch_file.name), &arch_file.handle);
        rcy_close_file(session);
        return OG_ERROR;
    }

    if (build_find_batch(session, cm_device_type(arch_file.name), &arch_file.handle, &arch_file.head, point, data_size,
                         align_buf) != OG_SUCCESS) {
        cm_close_device(cm_device_type(arch_file.name), &arch_file.handle);
        rcy_close_file(session);
        return OG_ERROR;
    }

    cm_close_device(cm_device_type(arch_file.name), &arch_file.handle);
    return OG_SUCCESS;
}

static status_t build_load_batch(knl_session_t *session, log_point_t *point, uint32 *data_size, aligned_buf_t *buf)
{
    status_t status = OG_SUCCESS;
    knl_instance_t *kernel = session->kernel;
    bak_context_t *ogx = &kernel->backup_ctx;
    bak_t *bak = &ogx->bak;

    log_lock_logfile(session);
    uint32 file_id = bak_log_get_id(session, bak->record.data_type, (uint32)point->rst_id, point->asn);
    log_unlock_logfile(session);

    if (file_id != OG_INVALID_ID32) {
        status = build_load_from_online(session, file_id, point, data_size, buf);
        log_unlatch_file(session, file_id);
    } else {
        status = build_load_from_arch(session, point, data_size, buf);
    }

    return status;
}

static status_t bak_build_prepare(knl_session_t *session)
{
    struct st_knl_instance *kernel = session->kernel;
    bak_context_t *ogx = &kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    bak_remote_t *remote = &bak->remote;
    /*
     * malloc a memory space to store four parts as follows:
     * |--bak->backup_buf--------|--bak->compress_buf------|--remote->send_pack----|--remote->recv_pack----|
     * |--BACKUP_BUFFER_SIZE(bak)--|--BACKUP_BUFFER_SIZE(bak)--|--sizeof(cs_packet_t)--|--sizeof(cs_packet_t)--|
     * so the total space size is BACKUP_BUFFER_SIZE(bak) * 2 and sizeof(cs_packet_t) * 2
     */
    size_t size = BACKUP_BUFFER_SIZE(bak) + BACKUP_BUFFER_SIZE(bak) + sizeof(cs_packet_t) + sizeof(cs_packet_t);
    char *buf = (char *)malloc(size);
    if (buf == NULL) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)size, "backup");
        return OG_ERROR;
    }

    size_t offset = 0;
    bak->backup_buf = buf + offset;
    offset += BACKUP_BUFFER_SIZE(bak);

    bak_reset_process_ctrl(bak, OG_TRUE);
    bak->compress_buf = buf + offset;
    offset += BACKUP_BUFFER_SIZE(bak);
    /*
     * between the memory addresses of remote->send_pack and bak->backup_buf are bak->backup_buf and bak->compress_buf,
     * so remote->send_pack is offset by 2 * OG_BACKUP_BUFFER_SIZE compared to bak->backup_buf
     */
    remote->send_pack = (cs_packet_t *)(buf + offset);
    offset += sizeof(cs_packet_t);
    /*
     * between the memory addresses of remote->recv_pack and bak->backup_buf are bak->backup_buf,bak->compress_buf and
     * remote->send_pack, so remote->recv_pack is offset by 2 * OG_BACKUP_BUFFER_SIZE and sizeof(cs_packet_t) compared
     * to bak->backup_buf
     */
    remote->recv_pack = (cs_packet_t *)(buf + offset);
    remote->pipe = &remote->send_pipe;
    remote->pipe->link.tcp.sock = CS_INVALID_SOCKET;
    bak->record.start_time = (uint64)cm_now();
    bak->record.attr.backup_type = BACKUP_MODE_FULL;
    bak->is_building = OG_TRUE;

    // if return og_error, free ctrl_data_buf in bak_build_end
    bak->ctrl_data_buf = (char *)malloc(CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE);
    if (bak->ctrl_data_buf == NULL) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint32)(CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE), "build");
        return OG_ERROR;
    }

    if (bak->proc_count > BUILD_SINGLE_THREAD) {
        if (cm_aligned_malloc(BACKUP_BUFFER_SIZE(bak), "rst stream buf0", &bak->recv_stream.bufs[0]) != OG_SUCCESS) {
            free(bak->ctrl_data_buf);
            bak->ctrl_data_buf = NULL;
            return OG_ERROR;
        }
        if (cm_aligned_malloc(BACKUP_BUFFER_SIZE(bak), "rst stream buf1", &bak->recv_stream.bufs[1]) != OG_SUCCESS) {
            free(bak->ctrl_data_buf);
            bak->ctrl_data_buf = NULL;
            cm_aligned_free(&bak->recv_stream.bufs[0]);
            return OG_ERROR;
        }
        bak->recv_stream.buf_size = BACKUP_BUFFER_SIZE(bak);
    }

    if (rst_start_write_thread(&ogx->process[BAK_COMMON_PROC]) != OG_SUCCESS) {
        free(bak->ctrl_data_buf);
        bak->ctrl_data_buf = NULL;
        cm_aligned_free(&bak->recv_stream.bufs[0]);
        cm_aligned_free(&bak->recv_stream.bufs[1]);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

void bak_build_end(knl_session_t *session)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    bak_error_t *error_info = &bak->error_info;

    OG_LOG_RUN_INF("[BUILD] remote backup finished, release resources");

    // reset common process finally
    for (int32 i = OG_MAX_BACKUP_PROCESS - 1; i >= 0; i--) {
        bak_reset_process(&ogx->process[i]);
    }

    if (bak->backup_buf != NULL) {
        free(bak->backup_buf);
        bak->backup_buf = NULL;
        bak->depends = NULL;
        bak->compress_buf = NULL;
        bak->ctrl_backup_buf = NULL;
    }

    if (bak->ctrl_data_buf != NULL) {
        free(bak->ctrl_data_buf);
        bak->ctrl_data_buf = NULL;
    }

    /* running on standby, standby decompress file */
    bak_free_compress_context(session, OG_FALSE);

    if (bak->proc_count > BUILD_SINGLE_THREAD) {
        cm_aligned_free(&bak->recv_stream.bufs[0]);
        cm_aligned_free(&bak->recv_stream.bufs[1]);
        bak->recv_stream.buf_size = 0;
    }

    cm_close_device(bak->local.type, &bak->local.handle);
    bak->local.handle = OG_INVALID_HANDLE;

    if (bak->failed && !bak->need_retry) {
        bak_set_fail_error(error_info, "build");
        OG_LOG_RUN_INF("[BUILD] build failed, error code :%d, %s", error_info->err_code, error_info->err_msg);
    }

    build_disconnect(bak);

    bak->failed = OG_FALSE;
    bak->is_building = OG_FALSE;
    bak->record.is_increment = OG_FALSE;
    bak->record.is_repair = OG_FALSE;
    if (rst_delete_track_file(session, bak, OG_TRUE) != OG_SUCCESS) {
        OG_LOG_RUN_WAR("[BUILD] failed to delete bak process file.");
    }

    bak_unset_running(session, ogx);
}

static status_t bak_build_connect(knl_session_t *session)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    bak_remote_t *remote = &bak->remote;
    uint16 port;
    char url[OG_HOST_NAME_BUFFER_SIZE + OG_TCP_PORT_MAX_LENGTH + 1] = { 0 };
    char server_host[OG_HOST_NAME_BUFFER_SIZE];
    char bind_host[OG_HOST_NAME_BUFFER_SIZE];
    int32 ret;

    if (lrcv_get_primary_server(session, OG_BACKUP_RETRY_COUNT, server_host, OG_HOST_NAME_BUFFER_SIZE,
        &port) != OG_SUCCESS) {
        return OG_ERROR;
    }

    ret = snprintf_s(url, sizeof(url), sizeof(url) - 1, "%s:%u", server_host, (uint32)port);
    knl_securec_check_ss(ret);
    OG_LOG_RUN_INF("[BUILD] remote backup, standby connect to primary %s", url);

    remote->pipe->connect_timeout = REPL_CONNECT_TIMEOUT;
    remote->pipe->socket_timeout = REPL_SOCKET_TIMEOUT;
    arch_get_bind_host(session, server_host, bind_host, OG_HOST_NAME_BUFFER_SIZE);
    if (cs_connect(url, remote->pipe, bind_host, NULL, NULL) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[BACKUP] failed to connect %s", url);
        return OG_ERROR;
    }

    if (knl_login(session, remote->pipe, REP_LOGIN_BACKUP, (const char *)bind_host, NULL) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[BACKUP] login failed");
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static status_t bak_build_start(knl_session_t *session)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;

    if (arch_init(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_alloc_compress_context(session, OG_FALSE) != OG_SUCCESS) {
        bak->failed = OG_TRUE;
        return OG_ERROR;
    }

    if (bak_build_prepare(session) != OG_SUCCESS) {
        bak->failed = OG_TRUE;
        return OG_ERROR;
    }

    for (uint32 i = 0; i < BUILD_MAX_RETRY; i++) {
        if (bak_build_connect(session) == OG_SUCCESS) {
            return OG_SUCCESS;
        }
    }

    bak->need_retry = OG_TRUE;
    return OG_ERROR;
}

static bak_file_type_t rst_calc_file_type(uint32 type)
{
    bak_file_type_t file_type;

    switch (type) {
        case BAK_MSG_TYPE_CTRL:
            file_type = BACKUP_CTRL_FILE;
            break;
        case BAK_MSG_TYPE_DATA:
            file_type = BACKUP_DATA_FILE;
            break;
        case BAK_MSG_TYPE_ARCH:
            file_type = BACKUP_ARCH_FILE;
            break;
        case BAK_MSG_TYPE_LOG:
            file_type = BACKUP_LOG_FILE;
            break;
        default:
            file_type = BACKUP_HEAD_FILE;
            break;
    }

    return file_type;
}

static inline bool32 rst_restart_from_file_end(bak_t *bak, bak_start_msg_t *start_msg)
{
    return (BAK_IS_FULL_BUILDING(bak) && bak->need_check && bak->file_count != start_msg->curr_file_index);
}

static status_t rst_read_start_msg(knl_session_t *session, bak_start_msg_t *start_msg)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bool32 read_end = OG_FALSE;

    if (rst_agent_read_head(bak, BAK_PKG_FILE_START, &bak->remote.remain_data_size, &read_end) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_agent_recv(bak, (char *)start_msg, sizeof(bak_start_msg_t)) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[RESTORE]: type:%u, file_id:%u, curr_file_index:%u", start_msg->type, start_msg->file_id,
        start_msg->curr_file_index);
    if (start_msg->type != BAK_MSG_TYPE_HEAD) {
        if (rst_restart_from_file_end(bak, start_msg)) {
            OG_LOG_RUN_INF("[BUILD] break-point build restart from end of file [%u/%u]", bak->file_count,
                start_msg->curr_file_index);
            if (bak->file_count != start_msg->curr_file_index - 1) {
                OG_LOG_RUN_INF("[BUILD] break-point build restart from unexpected position");
                return OG_ERROR;
            }
            bak->file_count++;
        }
        bak->need_check = OG_FALSE;
        bak->files[bak->file_count].type = rst_calc_file_type(start_msg->type);
        bak->files[bak->file_count].id = start_msg->file_id;
        bak->curr_file_index = bak->file_count;
        bak->file_count++;
    }

    return OG_SUCCESS;
}

static bak_stage_t rst_calc_stage(uint32 type)
{
    bak_stage_t stage;

    switch (type) {
        case BAK_MSG_TYPE_CTRL:
            stage = BACKUP_CTRL_STAGE;
            break;
        case BAK_MSG_TYPE_DATA:
            stage = BACKUP_DATA_STAGE;
            break;
        case BAK_MSG_TYPE_HEAD:
            stage = BACKUP_HEAD_STAGE;
            break;
        default:
            stage = BACKUP_LOG_STAGE;
            break;
    }

    return stage;
}

static status_t rst_build_receive_files(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_start_msg_t start_msg;
    bak_stage_t stage = BACKUP_CTRL_STAGE;

    bak->head_is_built = OG_FALSE;
    while (!bak->failed && stage != BACKUP_HEAD_STAGE) {
        if (bak_check_session_status(session) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (rst_read_start_msg(session, &start_msg) != OG_SUCCESS) {
            return OG_ERROR;
        }

        stage = rst_calc_stage(start_msg.type);
        if (stage == BACKUP_HEAD_STAGE) {
            bak->remote.remain_data_size = 0;
            bak->progress.stage = BACKUP_READ_FINISHED;
            break;
        }

        if (bak->progress.stage != stage) {
            bak->progress.stage = stage;
        }

        bak_file_type_t type = rst_calc_file_type(start_msg.type);
        if (type >= BACKUP_DATA_FILE && type <= BACKUP_ARCH_FILE && bak_paral_task_enable(session)) {
            if (rst_stream_read_file(session) != OG_SUCCESS) {
                return OG_ERROR;
            }
        } else {
            if (rst_read_file(session, bak->curr_file_index) != OG_SUCCESS) {
                return OG_ERROR;
            }
        }

        if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return bak->failed ? OG_ERROR : OG_SUCCESS;
}

static status_t rst_build_keyfile(knl_session_t *session, bak_head_t *head, int32 read_size)
{
    return OG_SUCCESS;
}

static status_t rst_build_head(knl_session_t *session)
{
    bak_context_t *ctx = &session->kernel->backup_ctx;
    bak_t *bak = &ctx->bak;
    int32 read_size;
    bool32 read_end = OG_FALSE;
    int32 buf_size = sizeof(bak_head_t);
    bak_head_t *head = (bak_head_t *)bak->backup_buf;
    int32 expect_size = (int32)sizeof(bak_head_t);

    if (rst_read_data(ctx, (void *)head, buf_size, &read_size, &read_end, 0) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_read_check_size(read_size, expect_size, "backupset head") != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak->is_building) {
        if (rst_build_keyfile(session, head, read_size) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[BUILD] rst build keyfile failed.");
            return OG_ERROR;
        }
    }

    if (rst_set_head(session, head, OG_FALSE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_wait_agent_process(bak) != OG_SUCCESS) {
        return OG_ERROR;
    }
    bak->head_is_built = OG_TRUE;

    return OG_SUCCESS;
}

static status_t build_analyze_entry(knl_session_t *session, log_entry_t *log)
{
    if (RD_TYPE_IS_ENTER_PAGE(log->type)) {
        rd_enter_page_t *redo = (rd_enter_page_t *)log->data;
        page_id_t page_id = MAKE_PAGID(redo->file, redo->page);
        if (build_aly_add_page(session, page_id) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}

static status_t build_analyze_group(knl_session_t *session, log_group_t *group)
{
    uint32 offset = sizeof(log_group_t);
    log_entry_t *log = NULL;

    while (offset < LOG_GROUP_ACTUAL_SIZE(group)) {
        log = (log_entry_t *)((char *)group + offset);
        if (build_analyze_entry(session, log) != OG_SUCCESS) {
            return OG_ERROR;
        }
        offset += log->size;
    }
    return OG_SUCCESS;
}

static status_t build_analyze_batch(knl_session_t *session, log_batch_t *batch)
{
    knl_session_t *se = session->kernel->sessions[SESSION_ID_KERNEL];
    log_context_t *ogx = &session->kernel->redo_ctx;
    log_cursor_t cursor;

    rcy_init_log_cursor(&cursor, batch);

    log_group_t *group = log_fetch_group(ogx, &cursor);
    while (group != NULL) {
        if (build_analyze_group(se, group) != OG_SUCCESS) {
            return OG_ERROR;
        }

        group = log_fetch_group(ogx, &cursor);
    }

    return OG_SUCCESS;
}

static inline void build_next_file(log_point_t *point, bool32 *need_more_log)
{
    point->asn++;
    point->block_id = 0;
    *need_more_log = OG_TRUE;
}

static bool32 build_check_batch(log_point_t *point, log_batch_t *batch, bool32 *need_more_log)
{
    if (batch->head.magic_num != LOG_MAGIC_NUMBER || !LFN_IS_CONTINUOUS(batch->head.point.lfn, point->lfn)) {
        build_next_file(point, need_more_log);
        return OG_FALSE;
    }

    return OG_TRUE;
}

static status_t build_analyze(knl_session_t *session, log_point_t *point, log_batch_t *batch, uint32 block_size,
    uint32 data_size_input, bool32 *need_more_log)
{
    uint32 data_size = data_size_input;
    log_context_t *ogx = &session->kernel->redo_ctx;
    log_batch_tail_t *tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
    bool32 first_batch = OG_TRUE;
    if (data_size == 0) {
        build_next_file(point, need_more_log);
        return OG_SUCCESS;
    }

    while (data_size >= sizeof(log_batch_id_t)) {
        if (!build_check_batch(point, batch, need_more_log)) {
            return OG_SUCCESS;
        }
        if (data_size < batch->space_size) {
            if (first_batch) {
                *need_more_log = OG_FALSE;
                OG_LOG_RUN_ERR("[BUILD] analyze failed, invalid batch(%u,%u).", point->asn, point->block_id);
                return OG_ERROR;
            }
            *need_more_log = OG_TRUE;
            return OG_SUCCESS;
        }

        if (!rcy_validate_batch(batch, tail)) {
            *need_more_log = OG_FALSE;
            OG_LOG_RUN_ERR("[BUILD] analyze failed, invalid batch(%u,%u).", point->asn, point->block_id);
            return OG_ERROR;
        }
        first_batch = OG_FALSE;
        if (rcy_verify_checksum(session, batch) != OG_SUCCESS) {
            *need_more_log = OG_FALSE;
            OG_LOG_RUN_ERR("[BUILD] analyze failed");
            return OG_ERROR;
        }

        if (batch->encrypted) {
            if (log_decrypt(session, batch, ogx->logwr_cipher_buf, ogx->logwr_cipher_buf_size) != OG_SUCCESS) {
                *need_more_log = OG_FALSE;
                OG_LOG_RUN_ERR("[BUILD] analyze failed");
                return OG_ERROR;
            }
        }

        if (build_analyze_batch(session, batch) != OG_SUCCESS) {
            *need_more_log = OG_FALSE;
            OG_LOG_RUN_ERR("[BUILD] analyze failed");
            return OG_ERROR;
        }

        data_size -= batch->space_size;
        point->lfn = batch->head.point.lfn;

        point->block_id += batch->space_size / block_size;
        batch = (log_batch_t *)((char *)batch + batch->space_size);
        tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
    }

    *need_more_log = OG_TRUE;
    return OG_SUCCESS;
}

static status_t build_analyze_redo(knl_session_t *session, log_point_t *curr_point)
{
    aligned_buf_t log_buf  = session->kernel->backup_ctx.bak.log_buf;
    log_batch_t *batch = NULL;
    bool32 need_more_log = OG_FALSE;
    uint32 data_size = 0;
    uint32 block_size;

    while (bak_load_log_batch(session, curr_point, &data_size, &log_buf, &block_size) == OG_SUCCESS) {
        batch = (log_batch_t *)log_buf.aligned_buf;

        if (build_analyze(session, curr_point, batch, block_size, data_size, &need_more_log) != OG_SUCCESS) {
            cm_aligned_free(&log_buf);
            OG_THROW_ERROR(ERR_INVALID_OPERATION, ", analyze log failed");
            return OG_ERROR;
        }

        if (!need_more_log) {
            break;
        }
    }

    return OG_SUCCESS;
}

static status_t build_repair_prepare(knl_session_t *session, uint64 *base_lsn)
{
    reset_log_t resetlog = session->kernel->lrcv_ctx.primary_resetlog;
    aligned_buf_t log_buf = session->kernel->backup_ctx.bak.log_buf;
    log_point_t target_point;
    uint32 data_size = 0;
    log_point_t *lrp = &dtc_my_ctrl(session)->lrp_point;

    target_point.rst_id = resetlog.rst_id - 1;
    target_point.asn = resetlog.last_asn;
    target_point.lfn = resetlog.last_lfn;
    target_point.block_id = 0;

    OG_LOG_RUN_INF("[REPAIR] analysis primary resetlog rstid-asn-lfn: %u-%u-%llu", resetlog.rst_id, resetlog.last_asn,
        resetlog.last_lfn);
    if (log_load(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (build_load_batch(session, &target_point, &data_size, &log_buf) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", can not find resetlog point in standby.");
        return OG_ERROR;
    }

    if (data_size != 0) {
        log_batch_t *batch = (log_batch_t *)log_buf.aligned_buf;
        *base_lsn = rcy_fetch_batch_lsn(session, batch);
        target_point.block_id = batch->head.point.block_id;
    } else {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", can not find resetlog point in standby.");
        return OG_ERROR;
    }

    target_point.lfn -= 1;

    OG_LOG_RUN_INF("[REPAIR] analysis start point rstid-asn-lfn: %u-%u-%llu", target_point.rst_id, target_point.asn,
        (uint64)target_point.lfn);
    if (build_analyze_redo(session, &target_point) != OG_SUCCESS) {
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[REPAIR] analysis end point rstid-asn-lfn: %u-%u-%llu", target_point.rst_id, target_point.asn,
        (uint64)target_point.lfn);

    if (target_point.lfn < lrp->lfn) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", analysis end lfn is smaller than lrp lfn.");
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

static status_t rst_build_get_base_lsn(knl_session_t *session, uint64 *base_lsn)
{
    log_point_t start_point = dtc_my_ctrl(session)->rcy_point;
    log_batch_t *batch = NULL;
    log_batch_tail_t *tail = NULL;
    uint32 data_size = 0;
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    database_t *db = &session->kernel->db;
    reset_log_t rst_log = db->ctrl.core.resetlogs;
    aligned_buf_t log_buf;
    uint32 block_size;

    if (log_load(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (dtc_my_ctrl(session)->rcy_point.lfn == dtc_my_ctrl(session)->lrp_point.lfn) {
        *base_lsn = DB_CURR_LSN(session);
        return OG_SUCCESS;
    }

    if (cm_aligned_malloc(OG_MAX_BATCH_SIZE, "backup log buffer", &log_buf) != OG_SUCCESS) {
        return OG_ERROR;
    }

    for (;;) {
        if (bak_load_log_batch(session, &start_point, &data_size, &log_buf, &block_size) != OG_SUCCESS) {
            cm_aligned_free(&log_buf);
            return OG_ERROR;
        }

        batch = (log_batch_t *)log_buf.aligned_buf;
        if (data_size >= sizeof(log_batch_t) && data_size >= batch->size) {
            tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
            if (rcy_validate_batch(batch, tail)) {
                break;
            }
        }

        start_point.asn++;
        start_point.rst_id = bak_get_rst_id(bak, start_point.asn, &(rst_log));
        start_point.block_id = 0;
    }

    *base_lsn = rcy_fetch_batch_lsn(session, batch);
    cm_aligned_free(&log_buf);
    return OG_SUCCESS;
}

static bool32 rst_logfile_not_changed(knl_instance_t *kernel, log_file_ctrl_t *log_old, log_file_ctrl_t *log_new)
{
    char name[OG_FILE_NAME_BUFFER_SIZE];
    errno_t ret;

    ret = strcpy_sp(name, OG_FILE_NAME_BUFFER_SIZE, log_new->name);
    knl_securec_check(ret);

    if (LOG_IS_DROPPED(log_old->flg) != LOG_IS_DROPPED(log_new->flg)) {
        OG_LOG_RUN_ERR("[BUILD] primary's log flag has changed old: %u, new: %u", log_old->flg, log_new->flg);
        return OG_FALSE;
    }
    if (db_change_storage_path(&kernel->attr.log_file_convert, name, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
        return OG_FALSE;
    }
    if (!cm_str_equal(log_old->name, name)) {
        OG_LOG_RUN_ERR("[BUILD] primary's log amount/name has changed old: %s, new: %s", log_old->name, name);
        return OG_FALSE;
    }

    return OG_TRUE;
}

static bool32 rst_datafile_not_changed(knl_instance_t *kernel, datafile_ctrl_t *df_old, datafile_ctrl_t *df_new)
{
    char name[OG_FILE_NAME_BUFFER_SIZE];
    errno_t ret;

    ret = strcpy_sp(name, OG_FILE_NAME_BUFFER_SIZE, df_new->name);
    knl_securec_check(ret);

    if (df_old->used != df_new->used || (df_old->flag & DATAFILE_FLAG_ONLINE) != (df_new->flag & DATAFILE_FLAG_ONLINE))
        {
        OG_LOG_RUN_ERR("[BUILD] primary's data flag has changed old_used: %u, new_used: %u, old_flag: %u,"
            "new_flag: %u", df_old->used, df_new->used, df_old->flag, df_new->flag);
        return OG_FALSE;
    }

    if (df_old->create_version != df_new->create_version) {
        OG_LOG_RUN_ERR("[BUILD] primary's data version has changed old: %u, new: %u", df_old->create_version,
            df_new->create_version);
        return OG_FALSE;
    }
    if (db_change_storage_path(&kernel->attr.data_file_convert, name, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
        return OG_FALSE;
    }
    if (!cm_str_equal(df_old->name, name)) {
        OG_LOG_RUN_ERR("[BUILD] primary's data amount/name has changed old: %s, new: %s", df_old->name, name);
        return OG_FALSE;
    }

    return OG_TRUE;
}


bool32 rst_db_files_not_changed(knl_session_t *session, ctrl_page_t *new_ctrl)
{
    knl_instance_t *kernel = session->kernel;
    datafile_ctrl_t *df_old = NULL;
    datafile_ctrl_t *df_new = NULL;
    log_file_ctrl_t *log_old = NULL;
    log_file_ctrl_t *log_new = NULL;
    uint32 offset = CTRL_LOG_SEGMENT;
    uint32 count;
    uint32 i;

    for (i = 0; i < OG_MAX_LOG_FILES; i++) {
        log_new = (log_file_ctrl_t *)db_get_ctrl_item(new_ctrl, i, sizeof(log_file_ctrl_t), offset);
        log_old = MY_LOGFILE_SET(session)->items[i].ctrl;
        if (rst_logfile_not_changed(kernel, log_old, log_new) != OG_TRUE) {
            return OG_FALSE;
        }
    }

    count = CTRL_MAX_BUF_SIZE / sizeof(log_file_ctrl_t);
    offset = offset + (OG_MAX_LOG_FILES - 1) / count + 1;

    count = CTRL_MAX_BUF_SIZE / sizeof(space_ctrl_t);
    offset = offset + (OG_MAX_SPACES - 1) / count + 1;

    for (i = 0; i < OG_MAX_DATA_FILES; i++) {
        df_new = (datafile_ctrl_t *)db_get_ctrl_item(new_ctrl, i, sizeof(datafile_ctrl_t), offset);
        df_old = kernel->db.datafiles[i].ctrl;
        if (rst_datafile_not_changed(kernel, df_old, df_new) != OG_TRUE) {
            return OG_FALSE;
        }
    }

    return OG_TRUE;
}

status_t rst_build_update_logfiles(knl_session_t *session, bak_process_t *ogx)
{
    log_file_ctrl_t *logfile = NULL;
    int32 *handle = NULL;
    bool32 is_dbstor = knl_dbs_is_enable_dbs();

    for (uint32 i = 0; i < dtc_my_ctrl(session)->log_hwm; i++) {
        logfile = MY_LOGFILE_SET(session)->items[i].ctrl;
        handle = &MY_LOGFILE_SET(session)->items[i].handle;
        if (LOG_IS_DROPPED(logfile->flg)) {
            if (strlen(ogx->logfile_name[i]) == 0) {
                continue;
            }

            cm_close_device(ogx->log_type[i], handle);
            if (cm_remove_device(ogx->log_type[i], ogx->logfile_name[i]) != OG_SUCCESS) {
                return OG_ERROR;
            }
            continue;
        }

        if (cm_exist_device(logfile->type, logfile->name) && *handle != OG_INVALID_HANDLE) {
            cm_close_device(ogx->log_type[i], handle);
            if (cm_open_device(logfile->name, logfile->type, knl_redo_io_flag(session), handle) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[RESTORE] failed to open %s ", logfile->name);
                return OG_ERROR;
            }
            continue;
        }

        if (strlen(ogx->logfile_name[i]) > 0) {
            cm_close_device(ogx->log_type[i], handle);
            if (cm_remove_device(ogx->log_type[i], ogx->logfile_name[i]) != OG_SUCCESS) {
                return OG_ERROR;
            }
        }

        if (cm_create_device(logfile->name, logfile->type, knl_redo_io_flag(session), handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to create %s ", logfile->name);
            return OG_ERROR;
        }
        if (is_dbstor) {
            break;
        }
    }

    return OG_SUCCESS;
}

static status_t rst_load_ctrlfile(knl_session_t *session)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    text_t ctrlfiles;
    database_t *db = &session->kernel->db;
    dtc_node_ctrl_t *node = dtc_my_ctrl(session);

    char *param = cm_get_config_value(session->kernel->attr.config, "CONTROL_FILES");
    cm_str2text(param, &ctrlfiles);
    if (ctrlfiles.len == 0) {
        OG_LOG_RUN_INF("[BUILD] ctrl file does not exist");
        OG_THROW_ERROR(ERR_LOAD_CONTROL_FILE, "ctrl file does not exist");
        return OG_ERROR;
    }

    if (db_load_ctrlspace(session, &ctrlfiles) != OG_SUCCESS) {
        return OG_ERROR;
    }

    node->archived_start = 0;
    node->archived_end = 0;
    db->ctrl.core.build_completed = OG_FALSE;
    db->ctrl.core.db_role = bak->is_building ? REPL_ROLE_PHYSICAL_STANDBY : REPL_ROLE_PRIMARY;
    bak->ctrlfile_completed = OG_TRUE;

    return OG_SUCCESS;
}

static status_t rst_check_ctrl_file(knl_session_t *session)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    bool32 read_end = OG_FALSE;
    int32 read_size;

    if (!BAK_IS_FULL_BUILDING(bak) || bak->is_first_link) {
        return OG_SUCCESS;
    }

    OG_LOG_RUN_INF("[BUILD] start read ctrl file");
    if (rst_read_data(ogx, bak->ctrl_data_buf, CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE,
                      &read_size, &read_end, 0) != OG_SUCCESS) {
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[BUILD] ctrl file read_size : %u", read_size);

    if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_load_ctrlfile(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (!db_sysdata_version_is_equal(session, OG_FALSE)) {
        return OG_ERROR;
    }

    OG_LOG_RUN_INF("[BUILD] start check file amount/name");
    if (!rst_db_files_not_changed(session, (ctrl_page_t *)bak->ctrl_data_buf)) {
        OG_LOG_RUN_ERR("[BUILD] primary's log/data amount/name has changed");
        if (rst_delete_track_file(session, bak, OG_FALSE) != OG_SUCCESS) {
            return OG_ERROR;
        }

        OG_THROW_ERROR(ERR_INVALID_OPERATION, ": primary's log/data amount/name has changed");
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[BUILD] check file amount/name successfully");

    return OG_SUCCESS;
}

static status_t rst_build_restore(knl_session_t *session)
{
    if (rst_restore_config_param(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_check_ctrl_file(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_build_receive_files(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_build_head(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static char *bak_get_compress_name(compress_algo_e compress)
{
    switch (compress) {
        case COMPRESS_NONE:
            return "NONE";
        case COMPRESS_ZLIB:
            return "ZLIB";
        case COMPRESS_ZSTD:
            return "ZSTD";
        case COMPRESS_LZ4:
            return "LZ4";
        default:
            cm_assert(OG_FALSE);
            return "NULL";
    }
}

bool32 bak_parameter_is_valid(build_progress_t *build_progress)
{
    uint32 stage = bak_get_build_stage(&build_progress->stage);
    if (stage != BUILD_LOG_STAGE && stage != BUILD_DATA_STAGE) {
        return OG_FALSE;
    }

    return OG_TRUE;
}

static status_t bak_obtain_build_progress(bak_t *bak, bak_process_t *proc, const char *file_name)
{
    build_progress_t *build_progress = &bak->progress.build_progress;
    int32 handle = OG_INVALID_HANDLE;

    if (cm_open_file(file_name, O_BINARY | O_RDWR, &handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[BACKUP] failed to open %s ", file_name);
        return OG_ERROR;
    }

    if (cm_read_file(handle, build_progress, sizeof(build_progress_t), NULL) != OG_SUCCESS) {
        cm_close_file(handle);
        OG_LOG_RUN_ERR("[BACKUP] failed to read %s ", file_name);
        return OG_ERROR;
    }
    cm_close_file(handle);
    OG_LOG_RUN_INF("[BUILD] write build progress [%u/%u/%llu/%u/%u/%u/%d]", build_progress->stage,
                   build_progress->file_id, build_progress->data_offset, build_progress->asn,
                   build_progress->curr_file_index, build_progress->last_file_index, build_progress->start_time);

    if (!bak_parameter_is_valid(build_progress)) {
        OG_LOG_RUN_ERR("[BACKUP] parameters are not effective in %s ", file_name);
        return OG_ERROR;
    }

    bak->file_count = build_progress->curr_file_index;

    return OG_SUCCESS;
}

static status_t bak_agent_send_build_ctrl(knl_session_t *session, bak_t *bak, bak_process_t *proc)
{
    bak_agent_head_t head;
    bak_attr_t *attr = &bak->record.attr;
    build_param_ctrl_t ctrl;
    build_progress_t *build_progress = &bak->progress.build_progress;
    char bak_process_file[OG_FILE_NAME_BUFFER_SIZE];
    errno_t ret;
    int32 primary_start_time;
    bool32 read_end = OG_FALSE;
    int32 read_size;

    head.ver = BAK_AGENT_PROTOCOL;
    head.serial_number = bak->remote.serial_number++;
    head.cmd = BAK_PKG_START;
    head.len = sizeof(bak_agent_head_t) + sizeof(build_param_ctrl_t) + sizeof(build_progress_t);
    head.flags = 0;
    head.reserved = 0;
    ctrl.compress = attr->compress;
    ctrl.compress_level = bak->compress_ctx.compress_level;
    ctrl.parallelism = bak->proc_count;
    ctrl.is_increment = bak->record.is_increment;
    ctrl.base_lsn = attr->base_lsn;
    ctrl.is_repair = bak->record.is_repair;
    ctrl.buffer_size = bak->backup_buf_size;

    OG_LOG_DEBUG_INF("[BACKUP] send type %d", head.cmd);
    // send head
    if (bak_agent_send(bak, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
        return OG_ERROR;
    }
    // send compress param
    if (bak_agent_send(bak, (char *)&ctrl, sizeof(build_param_ctrl_t)) != OG_SUCCESS) {
        return OG_ERROR;
    }
    // send break-point info if necessary
    ret = snprintf_s(bak_process_file, OG_FILE_NAME_BUFFER_SIZE, OG_FILE_NAME_BUFFER_SIZE - 1, "%s/bak_process_file",
        session->kernel->home);
    knl_securec_check_ss(ret);
    if (cm_file_exist((const char *)bak_process_file)) {
        if (bak_obtain_build_progress(bak, proc, bak_process_file) != OG_SUCCESS) {
            OG_THROW_ERROR(ERR_INVALID_OPERATION, ", building process file is not effective.");
            return OG_ERROR;
        }
        bak->need_check = OG_TRUE;
        bak->is_first_link = OG_FALSE;
    } else {
        ret = memset_sp(build_progress, sizeof(build_progress_t), 0, sizeof(build_progress_t));
        knl_securec_check(ret);
        bak->is_first_link = OG_TRUE;
    }

    if (bak_agent_send(bak, (char *)build_progress, sizeof(build_progress_t)) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_agent_read(bak, (char *)&primary_start_time, sizeof(int32), &read_size, &read_end) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak->is_first_link) {
        build_progress->start_time = primary_start_time;
        OG_LOG_RUN_INF("[BUILD] first link [primary/standby] start time : [%d/%d]", primary_start_time,
            build_progress->start_time);
    } else {
        if (build_progress->start_time != primary_start_time) {
            OG_LOG_RUN_INF("[BUILD] [primary/standby] start time : [%d/%d]", primary_start_time,
                build_progress->start_time);
            OG_LOG_RUN_INF("[BUILD] primary has restart or timeout, only non-breakpoint building is allowed");
            if (rst_delete_track_file(session, bak, OG_FALSE) != OG_SUCCESS) {
                return OG_ERROR;
            }

            OG_THROW_ERROR(ERR_INVALID_OPERATION,
                "primary has restart or timeout, only non-breakpoint building is allowed");
            return OG_ERROR;
        }
        OG_LOG_RUN_INF("[BUILD] [primary/standby] start time : [%d/%d]", primary_start_time,
            build_progress->start_time);
    }

    return OG_SUCCESS;
}

static status_t rst_build_prepare_basefiles(knl_session_t *session, bak_process_t *ogx)
{
    knl_instance_t *kernel = session->kernel;
    datafile_t *df = NULL;
    log_file_ctrl_t *log = NULL;
    uint32 i;
    errno_t ret;

    for (i = 0; i < OG_MAX_DATA_FILES; i++) {
        ogx->datafile_size[i] = OG_INVALID_INT64;
        df = &kernel->db.datafiles[i];
        if (!df->ctrl->used || !DATAFILE_IS_ONLINE(df)) {
            ogx->datafile_name[i][0] = '\0';
            continue;
        }

        if (cm_open_device(df->ctrl->name, df->ctrl->type, knl_io_flag(session), &ogx->datafiles[i]) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[BUILD] failed to open %s ", df->ctrl->name);
            return OG_ERROR;
        }

        ret = strcpy_sp(ogx->datafile_name[i], OG_FILE_NAME_BUFFER_SIZE, df->ctrl->name);
        knl_securec_check(ret);
        ogx->file_type[i] = df->ctrl->type;
        ogx->datafile_version[i] = df->ctrl->create_version;

        ogx->datafile_size[i] = cm_device_size(cm_device_type(ogx->datafile_name[i]), ogx->datafiles[i]);
        if (ogx->datafile_size[i] == -1) {
            OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
            return OG_ERROR;
        }
    }

    for (i = 0; i < OG_MAX_LOG_FILES; i++) {
        ogx->logfile_name[i][0] = '\0';
        ogx->log_type[i] = 0;
    }

    for (i = 0; i < MY_LOGFILE_SET(session)->logfile_hwm; i++) {
        log = MY_LOGFILE_SET(session)->items[i].ctrl;
        if (LOG_IS_DROPPED(log->flg)) {
            continue;
        }

        ret = strcpy_sp(ogx->logfile_name[i], OG_FILE_NAME_BUFFER_SIZE, log->name);
        knl_securec_check(ret);
        ogx->log_type[i] = log->type;
    }

    return OG_SUCCESS;
}

static status_t bak_check_keep_alive(knl_session_t *session)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    uint32 build_keep_alive_timeout = session->kernel->attr.build_keep_alive_timeout;
    bak_t *bak = &ogx->bak;

    if (bak->record.attr.level != 0) {
        return OG_SUCCESS;
    }

    // will not retry while timeout
    bak->build_retry_time++;
    if (bak->build_retry_time == build_keep_alive_timeout) {
        bak->need_retry = OG_FALSE;
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", please check run log for information of building timeout.");
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static status_t bak_agent_send_diff_pages(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_agent_head_t head;

    uint32 page_count = BUILD_ALY_MAX_ITEM - bak->build_aly_free_list.count;
    knl_panic(page_count == bak->page_count);

    OG_LOG_RUN_INF("[BUILD] start to send page list to primary, page_count: %u", page_count);
    head.ver = BAK_AGENT_PROTOCOL;
    head.cmd = BAK_PKG_DATA;
    head.len = sizeof(bak_agent_head_t) + page_count * sizeof(page_id_t);
    head.serial_number = 0;
    head.flags = 0;
    head.reserved = 0;

    if (bak_agent_send(bak, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (bak_agent_send(bak, (char *)bak->build_aly_pages, page_count * sizeof(page_id_t)) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (bak_agent_wait_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[BUILD] send page list to primary successfully");
    return OG_SUCCESS;
}

inline static void bak_brain_close_file(int32 *handle, uint32 count)
{
    for (uint32 i = 0; i < count; i++) {
        cm_close_file(handle[i]);
    }
}

static status_t bak_brain_reset_page(knl_session_t *session)
{
    int32 handles[OG_MAX_DATA_FILES];
    errno_t ret = memset_sp(handles, OG_MAX_DATA_FILES * sizeof(int32), -1, OG_MAX_DATA_FILES * sizeof(int32));
    knl_securec_check(ret);
    build_analyse_item_t *item = NULL;
    bak_t *bak = &session->kernel->backup_ctx.bak;
    char *buf = session->kernel->attr.xpurpose_buf;
    ret = memset_sp(buf, DEFAULT_PAGE_SIZE(session), 0, DEFAULT_PAGE_SIZE(session));
    knl_securec_check(ret);

    for (uint32 i = 0; i < bak->page_count; i++) {
        item = &bak->build_aly_items[i];
        if (item->page_id->file == OG_INVALID_FILEID) {
            continue;
        }

        datafile_t *df = DATAFILE_GET(session, item->page_id->file);
        if (cm_open_device(df->ctrl->name, df->ctrl->type, knl_io_flag(session), &handles[item->page_id->file]) !=
            OG_SUCCESS) {
            OG_LOG_RUN_ERR("[REPAIR] failed to open datafile %s", df->ctrl->name);
            return OG_ERROR;
        }
        int64 offset = (int64)item->page_id->page * DEFAULT_PAGE_SIZE(session);
        if (cm_write_device(df->ctrl->type, handles[item->page_id->file], offset, buf, DEFAULT_PAGE_SIZE(session)) !=
            OG_SUCCESS) {
            OG_LOG_RUN_ERR("[REPAIR] failed to write datafile %s", df->ctrl->name);
            bak_brain_close_file(handles, OG_MAX_DATA_FILES);
            return OG_ERROR;
        }
        OG_LOG_DEBUG_INF("[REPAIR] brain reset page %u-%u", item->page_id->file, item->page_id->page);
        item->page_id->file = OG_INVALID_FILEID;
    }

    for (uint32 i = 0; i < OG_MAX_DATA_FILES; i++) {
        if (handles[i] == OG_INVALID_HANDLE) {
            continue;
        }
        if (db_fdatasync_file(session, handles[i]) != OG_SUCCESS) {
            bak_brain_close_file(handles, OG_MAX_DATA_FILES);
            OG_LOG_RUN_ERR("[REPAIR] failed to fdatasync datafile");
            return OG_ERROR;
        }
    }

    bak_brain_close_file(handles, OG_MAX_DATA_FILES);
    return OG_SUCCESS;
}

static status_t bak_build_from_remote(knl_session_t *session)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    bak_process_t *proc = &ogx->process[BAK_COMMON_PROC];

    if (bak_check_keep_alive(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_build_start(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_agent_send_build_ctrl(session, bak, proc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak->record.is_repair && bak_agent_send_diff_pages(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak->record.attr.level > 0) {
        if (rst_build_prepare_basefiles(session, &ogx->process[BAK_COMMON_PROC]) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    if (rst_build_restore(session) != OG_SUCCESS) {
        if (!bak->need_retry) {
            OG_LOG_RUN_INF("[RESTORE] restore failed, no more retry");
            (void)bak_agent_send_pkg(bak, BAK_PKG_ACK);
        }
        return OG_ERROR;
    }

    if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
        return OG_ERROR;
    }

    rst_wait_write_end(bak);

    if (bak->record.is_repair) {
        if (bak_brain_reset_page(session) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return bak->failed ? OG_ERROR : OG_SUCCESS;
}

static status_t rst_build_clean_arch(knl_session_t *session)
{
    uint32 arch_num = 0;
    uint32 clean_locator = 0;
    arch_ctrl_t *arch_ctrl = NULL;
    arch_proc_context_t *proc_ctx = &session->kernel->arch_ctx.arch_proc[0];
    dtc_node_ctrl_t *node = dtc_my_ctrl(session);

    arch_get_files_num(session, ARCH_DEFAULT_DEST, session->kernel->id, &arch_num);

    for (uint32 i = 0; i < arch_num; i++) {
        clean_locator = (node->archived_start + i) % OG_MAX_ARCH_NUM;
        arch_ctrl = db_get_arch_ctrl(session, clean_locator, session->kernel->id);
        if (arch_ctrl->recid == 0) {
            continue;
        }

        if (!cm_exist_device(cm_device_type(arch_ctrl->name), arch_ctrl->name)) {
            OG_LOG_RUN_INF("[RESTORE] archive file %s does not exist", arch_ctrl->name);
        } else {
            if (cm_remove_device(cm_device_type(arch_ctrl->name), arch_ctrl->name) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[RESTORE] Failed to remove archive file %s", arch_ctrl->name);
                return OG_ERROR;
            }
        }

        arch_ctrl->recid = 0;
    }
    proc_ctx->curr_arch_size = 0;
    return OG_SUCCESS;
}

static status_t bak_build_prepare_attr(knl_session_t *session, build_param_ctrl_t *ctrl)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_attr_t *attr = &bak->record.attr;

    attr->compress = ctrl->compress;
    attr->level = (ctrl->is_increment || ctrl->is_repair) ? OG_TRUE : OG_FALSE;
    bak->compress_ctx.compress_level = ctrl->compress_level;
    bak->record.is_repair = ctrl->is_repair;
    bak->record.is_increment = ctrl->is_increment;
    bak->proc_count = (ctrl->parallelism == 0) ? BUILD_DEFAULT_PARALLELISM : ctrl->parallelism;
    bak->backup_buf_size = ctrl->buffer_size;

    if (ctrl->base_lsn != OG_INVALID_LSN) {
        attr->base_lsn = ctrl->base_lsn;
        return OG_SUCCESS;
    }

    if (ctrl->is_increment) {
        if (rst_build_get_base_lsn(session, &ctrl->base_lsn) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (rst_build_clean_arch(session) != OG_SUCCESS) {
            return OG_ERROR;
        }
    } else if (ctrl->is_repair) {
        if (build_analyze_mem_init(session) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (build_repair_prepare(session, &ctrl->base_lsn) != OG_SUCCESS) {
            build_analyze_mem_free(session);
            return OG_ERROR;
        }

        if (rst_build_clean_arch(session) != OG_SUCCESS) {
            build_analyze_mem_free(session);
            return OG_ERROR;
        }
    }
    attr->base_lsn = ctrl->base_lsn;

    return OG_SUCCESS;
}

status_t bak_build_restore(knl_session_t *session, build_param_ctrl_t *ctrl)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    lrcv_context_t *lrcv_ctx = &session->kernel->lrcv_ctx;
    bak_t *bak = &ogx->bak;

    OG_LOG_RUN_INF("[BUILD] build start, compress method %s, compress level %u, is increment %u, is repair %u, "
                   "buffer size %uM", bak_get_compress_name(ctrl->compress), ctrl->compress_level, ctrl->is_increment,
                   ctrl->is_repair, ctrl->buffer_size / SIZE_M(1));
    bak->need_retry = OG_FALSE;

    if (lrcv_ctx->thread.closed) {
        OG_THROW_ERROR(ERR_LRCV_NOT_READY);
        return OG_ERROR;
    }

    if (bak_set_running(session, ogx) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_BACKUP_IN_PROGRESS, "restore");
        return OG_ERROR;
    }

    if (bak_build_prepare_attr(session, ctrl) != OG_SUCCESS) {
        bak_unset_running(session, ogx);
        return OG_ERROR;
    }

    status_t status = bak_build_from_remote(session);
    if (status != OG_SUCCESS) {
        bak->failed = OG_TRUE;
    }

    if (!bak->need_retry) {
        OG_LOG_RUN_INF("[BUILD] backup remote finished, failed :%d", bak->failed);
    }

    if (bak->error_info.err_code != ERR_BACKUP_IN_PROGRESS && !bak->need_retry) {
        build_analyze_mem_free(session);
    }

    bak_build_end(session);
    return status;
}

static void bak_send_error(bak_t *bak, cs_pipe_t *pipe, int32 max_pkg_size)
{
    bak_agent_head_t head;
    int32 err_code;
    const char *error_msg = NULL;
    int32 receive_size;

    if (!bak->build_stopped && bak->need_retry) {
        return;
    }

    head.ver = BAK_AGENT_PROTOCOL;
    head.cmd = BAK_PKG_ERROR;

    if (bak->build_stopped) {
        err_code = ERR_BUILD_CANCELLED;
        error_msg = "Build has been cancelled";
    } else {
        cm_get_error(&err_code, &error_msg, NULL);
    }

    head.len = (uint32)(sizeof(bak_agent_head_t) + sizeof(int32) + strlen(error_msg) + 1);
    OG_LOG_RUN_WAR("[BACKUP] build failed, error:%d, %s", err_code, error_msg);

    if (cs_write_stream(pipe, (char *)&head, sizeof(bak_agent_head_t), max_pkg_size) != OG_SUCCESS) {
        return;
    }

    if (cs_write_stream(pipe, (char *)&err_code, sizeof(int32), max_pkg_size) != OG_SUCCESS) {
        return;
    }

    if (cs_write_stream(pipe, (char *)error_msg, (uint32)strlen(error_msg) + 1, max_pkg_size) != OG_SUCCESS) {
        return;
    }

    (void)cs_read_stream(pipe, (char *)&head, OG_NETWORK_IO_TIMEOUT, sizeof(bak_agent_head_t), (int32 *)&receive_size);
}

static status_t bak_build_wait_start(knl_session_t *session, cs_pipe_t *pipe, build_param_ctrl_t *ctrl,
    build_progress_t *build_progress)
{
    bak_agent_head_t head;
    int32 receive_size;

    if (cs_read_stream(pipe, (char *)&head, OG_NETWORK_IO_TIMEOUT, sizeof(bak_agent_head_t),
                       (int32 *)&receive_size) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (receive_size != sizeof(bak_agent_head_t)) {
        OG_THROW_ERROR(ERR_TCP_TIMEOUT_REMAIN, (int32)(sizeof(bak_agent_head_t) - receive_size));
        return OG_ERROR;
    }

    if (head.cmd != BAK_PKG_START) {
        OG_THROW_ERROR(ERR_NOT_EXPECTED_BACKUP_PACKET, BAK_PKG_START, head.cmd);
        return OG_ERROR;
    }

    if (cs_read_stream(pipe, (char *)ctrl, OG_NETWORK_IO_TIMEOUT, sizeof(build_param_ctrl_t),
                       (int32 *)&receive_size) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (receive_size != sizeof(build_param_ctrl_t)) {
        OG_THROW_ERROR(ERR_TCP_TIMEOUT_REMAIN, (int32)(sizeof(build_param_ctrl_t) - receive_size));
        return OG_ERROR;
    }

    if (cs_read_stream(pipe, (char *)build_progress, OG_NETWORK_IO_TIMEOUT, sizeof(build_progress_t),
        (int32 *)&receive_size) != OG_SUCCESS) {
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[BUILD] received standby build progress [%u/%u/%llu/%u/%u/%u/%d]", build_progress->stage,
        build_progress->file_id, build_progress->data_offset, build_progress->asn, build_progress->curr_file_index,
        build_progress->last_file_index, build_progress->start_time);

    if (receive_size != sizeof(build_progress_t)) {
        OG_THROW_ERROR(ERR_TCP_TIMEOUT_REMAIN, (int32)(sizeof(build_progress_t) - receive_size));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static status_t bak_set_break_point_params(knl_session_t *session, bak_context_t *ogx, build_progress_t *build_progress)
{
    build_progress_t *local_build_progress = &ogx->bak.progress.build_progress;
    int32 temp_time_start;
    errno_t ret;

    switch (build_progress->stage) {
        case BACKUP_START:
            ogx->bak.is_first_link = OG_TRUE;
            ogx->bak.need_check = OG_FALSE;
            ret = memset_sp(local_build_progress, sizeof(build_progress_t), 0, sizeof(build_progress_t));
            knl_securec_check(ret);
            local_build_progress->start_time = (int32)session->kernel->scn;
            break;
        case BACKUP_DATA_STAGE:
        case BACKUP_LOG_STAGE:
        case BACKUP_HEAD_STAGE:
            ogx->bak.is_first_link = OG_FALSE;
            ogx->bak.need_check = OG_TRUE;
            ogx->bak.send_buf.buf_size = 0;
            temp_time_start = local_build_progress->start_time;
            *local_build_progress = *build_progress;
            local_build_progress->start_time = temp_time_start;
            ogx->bak.file_count = build_progress->curr_file_index;
            ogx->bak.curr_file_index = build_progress->curr_file_index;
            ogx->bak.progress.stage = build_progress->stage;
            break;
        default:
            OG_LOG_RUN_ERR("[BUILD] primary DB received unexpected build process stage, %u",
                ogx->bak.progress.build_progress.stage);
            return OG_ERROR;
    }
    OG_LOG_RUN_INF("[BUILD] received standby build progress [%u/%u/%llu/%u/%u/%u/%d]", build_progress->stage,
        build_progress->file_id, build_progress->data_offset, build_progress->asn,
        build_progress->curr_file_index, build_progress->last_file_index, build_progress->start_time);

    return OG_SUCCESS;
}

static status_t bak_get_page_list(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    uint32 read_size = 0;
    bool32 read_end = OG_FALSE;
    bak->remote.remain_data_size = 0;

    OG_LOG_RUN_INF("[BUILD] start to get page list from standby");
    if (rst_agent_read_head(bak, BAK_PKG_DATA, &read_size, &read_end) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (read_size > BUILD_ALY_MAX_PAGE_SIZE) {
        OG_LOG_RUN_ERR("[BUILD] get page list from standby failed, received size: %u, max size: %llu", read_size,
            BUILD_ALY_MAX_PAGE_SIZE);
        return OG_ERROR;
    }

    if (bak_agent_recv(bak, (char *)bak->build_aly_pages, (int32)read_size) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
        return OG_ERROR;
    }

    bak->page_count = read_size / sizeof(page_id_t);
    build_analyze_construct_hash(session);
    OG_LOG_RUN_INF("[BUILD] get page list from standby successfully, page count: %u", bak->page_count);
    return OG_SUCCESS;
}

status_t bak_build_backup(knl_session_t *session, cs_pipe_t *pipe, cs_packet_t *send_pack, cs_packet_t *recv_pack)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    build_param_ctrl_t ctrl;
    bak_t *bak = &ogx->bak;
    build_progress_t *local_build_progress = &ogx->bak.progress.build_progress;
    build_progress_t build_progress;

    if (bak_build_wait_start(session, pipe, &ctrl, &build_progress) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_precheck(session) != OG_SUCCESS) {
        bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
        return OG_ERROR;
    }

    if (bak_set_build_running(session, ogx, &build_progress) != OG_SUCCESS) {
        bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
        return OG_ERROR;
    }

    bak_set_build_params(&ogx->bak, pipe, send_pack, recv_pack, &ctrl);

    if (bak_set_break_point_params(session, ogx, &build_progress) != OG_SUCCESS) {
        bak_unset_build_running(session, ogx);
        bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
        return OG_ERROR;
    }

    if (bak_agent_write(bak, (char *)&local_build_progress->start_time, sizeof(int32)) != OG_SUCCESS) {
        bak_unset_build_running(session, ogx);
        bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
        return OG_ERROR;
    }

    if (ctrl.is_repair) {
        if (build_analyze_mem_init(session) != OG_SUCCESS) {
            bak_unset_build_running(session, ogx);
            bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
            return OG_ERROR;
        }
        if (bak_get_page_list(session) != OG_SUCCESS) {
            bak_unset_build_running(session, ogx);
            build_analyze_mem_free(session);
            bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
            return OG_ERROR;
        }
    }

    status_t status = bak_backup_proc(session);
    if (ctrl.is_repair) {
        build_analyze_mem_free(session);
    }

    if (status != OG_SUCCESS) {
        bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
    }

    return status;
}

void bak_stream_read_prepare(knl_session_t *session, bak_process_t *process, datafile_t *datafile, uint32 sec_id)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_assignment_t *assign_ctrl = &process->assign_ctrl;
    bak_ctrl_t *ctrl = &process->ctrl;
    errno_t ret;

    ret = strcpy_sp(ctrl->name, OG_FILE_NAME_BUFFER_SIZE, datafile->ctrl->name);
    knl_securec_check(ret);
    ctrl->type = datafile->ctrl->type;
    ctrl->offset = DEFAULT_PAGE_SIZE(session);

    assign_ctrl->start = ctrl->offset;
    assign_ctrl->end = ctrl->offset;
    assign_ctrl->file_id = datafile->ctrl->id;
    assign_ctrl->sec_id = 0;

    bak_record_new_file(bak, BACKUP_DATA_FILE, assign_ctrl->file_id, sec_id, 0, OG_FALSE, 0, 0);
}

status_t bak_send_stream_data(knl_session_t *session, bak_t *bak, bak_assignment_t *assign_ctrl)
{
    bak_process_t *proc = &session->kernel->backup_ctx.process[BAK_COMMON_PROC];
    bak_stream_buf_t *stream = &bak->send_stream;
    date_t start;
    bool32 file_end = OG_FALSE;
    uint32 send_size;
    uint64 last_read_offset = 0;
    uint64 curr_read_offset = 0;
    char *send_buf = NULL;

    for (;;) {
        if (bak_check_session_status(session) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (!cm_spin_try_lock(&stream->lock)) {
            cm_spin_sleep();
            continue;
        }

        file_end = (stream->read_offset == assign_ctrl->file_size);
        if (stream->data_size[stream->wid] == 0 && !file_end) {
            // no data in working buffer, need retry
            cm_spin_unlock(&stream->lock);
            cm_spin_sleep();
            continue;
        }
        curr_read_offset = stream->read_offset;

        stream->wid = !stream->wid;
        stream->fid = !stream->fid;
        stream->data_size[stream->wid] = 0; // reset working buffer data size
        cm_spin_unlock(&stream->lock);

        send_size = stream->data_size[stream->fid];
        send_buf = stream->bufs[stream->fid].aligned_buf;
        if (send_size > 0) {
            start = g_timer()->now;
            if (bak_agent_write(bak, send_buf, send_size) != OG_SUCCESS) {
                return OG_ERROR;
            }
            proc->stat.write_time += (g_timer()->now - start);
            proc->stat.write_size += send_size;
        }

        bak_update_progress(bak, (curr_read_offset - last_read_offset));
        last_read_offset = curr_read_offset;
        if (file_end) {
            break;
        }
    }

    return OG_SUCCESS;
}

void bak_init_send_stream(bak_t *bak, uint32 start, uint64 filesize, uint32 file_id)
{
    bak_stream_buf_t *stream_buf = &bak->send_stream;
    bak_read_cursor_t *cursor = &bak->read_cursor;

    stream_buf->curr_block_id = 0;
    stream_buf->read_offset = start;
    stream_buf->bakfile_size = 0;
    stream_buf->wid = 0;
    stream_buf->fid = 1;
    stream_buf->data_size[0] = 0;
    stream_buf->data_size[1] = 0;

    cursor->block_id = 0;
    cursor->file_id = file_id;
    cursor->file_size = filesize;
    cursor->offset = start;
    cursor->read_size = 0;
}

status_t bak_stream_send_end(bak_t *bak, bak_stream_buf_t *stream_buf)
{
    bak->backup_size = stream_buf->bakfile_size;

    if (bak_agent_send_pkg(bak, BAK_PKG_FILE_END) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_agent_wait_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
        return OG_ERROR;
    }

    knl_panic(bak->files[bak->curr_file_index].type != BACKUP_HEAD_FILE);
    knl_panic(bak->curr_file_index < BAK_MAX_FILE_NUM);
    bak->files[bak->curr_file_index].size = bak->backup_size;
    bak->files[bak->curr_file_index].sec_start = 0;
    bak->files[bak->curr_file_index].sec_end = 0;
    bak->curr_file_index++;

    return OG_SUCCESS;
}

static void rst_assign_uds_restore_task(knl_session_t *session, bak_process_t *proc)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    uint32 bak_index = bak->file_count - 1;
    bak_file_t *bak_file = &bak->files[bak_index];
    bak_assignment_t *assign_ctrl = &proc->assign_ctrl;

    knl_panic(proc->is_free);
    assign_ctrl->task = bak->is_building ? BAK_BUILD_RESTORE_TASK : BAK_STREAM_RESTORE_TASK;
    bak_file->sec_start = assign_ctrl->section_start;
    bak_file->sec_end = assign_ctrl->section_end;

    for (uint32 j = 0; j < OG_MAX_DATA_FILES; j++) {
        knl_panic(proc->datafiles[j] == OG_INVALID_HANDLE);
    }

    CM_MFENCE;
    proc->is_free = OG_FALSE;
}

void rst_assign_stream_restore_task(knl_session_t *session, bak_ctrl_t *init_ctrl)
{
    bak_context_t *backup_ctx = &session->kernel->backup_ctx;
    bak_t *bak = &backup_ctx->bak;
    bak_assignment_t *assign_ctrl = NULL;
    bak_process_t *proc = NULL;
    bak_ctrl_t *ctrl = NULL;
    uint32 bak_index = bak->curr_file_index;

    for (uint32 id = 0; id < bak->proc_count; id++) {
        proc = &backup_ctx->process[id + 1];
        knl_panic(proc->is_free);

        ctrl = &proc->ctrl;
        *ctrl = *init_ctrl;
        ctrl->handle = OG_INVALID_HANDLE;

        assign_ctrl = &proc->assign_ctrl;
        assign_ctrl->sec_id = 0;
        assign_ctrl->start = 0;
        assign_ctrl->end = 0;
        assign_ctrl->section_start = 0;
        assign_ctrl->section_end = 0;
        assign_ctrl->bak_index = bak_index;
        assign_ctrl->bak_file.handle = bak->local.handle;
        assign_ctrl->type = bak->files[bak_index].type;

        rst_assign_uds_restore_task(session, proc);
    }
}

void rst_init_recv_stream(bak_t *bak)
{
    rst_stream_buf_t *recv_stream = &bak->recv_stream;

    recv_stream->prev_block = 0;
    recv_stream->curr_block_offset = 0;
    recv_stream->wid = 0;
    recv_stream->fid = 1;
    recv_stream->usable_size[0] = 0;
    recv_stream->usable_size[1] = 0;
    recv_stream->recv_size[0] = 0;
    recv_stream->recv_size[1] = 0;
    recv_stream->curr_file_tail = 0;
    recv_stream->is_eof = OG_FALSE;
}

status_t rst_stream_read_prepare(knl_session_t *session, rst_stream_buf_t *recv_stream, log_file_head_t *head,
                                 bool32 *ignore_logfile)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_process_t *common_proc = &session->kernel->backup_ctx.process[BAK_COMMON_PROC];
    uint32 bak_index = bak->curr_file_index;
    uint32 file_id = bak->files[bak_index].id;
    bool32 ignore_data = OG_FALSE;

    if (bak->files[bak_index].type == BACKUP_DATA_FILE) {
        recv_stream->base_filesize = common_proc->datafile_size[file_id];  // for incremental restore, set base file
                                                                           // size
    } else {
        int32 read_size;
        bool32 log_end = OG_FALSE;

        if (rst_agent_read(bak, (char *)head, sizeof(log_file_head_t), &read_size, &log_end) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (read_size < sizeof(log_file_head_t)) {
            OG_LOG_RUN_ERR("[RESTOR] failed to read log file head, read size %d less than %u", read_size,
                           (uint32)sizeof(log_file_head_t));
            return OG_ERROR;
        }

        if (log_verify_head_checksum(session, head, "") != OG_SUCCESS) {
            return OG_ERROR;
        }

        uint32 fill_size = CM_CALC_ALIGN(sizeof(log_file_head_t), (uint32)head->block_size) - sizeof(log_file_head_t);
        if (rst_agent_read(bak, ((char *)head) + sizeof(log_file_head_t), fill_size, &read_size, &log_end) !=
            OG_SUCCESS) {
            return OG_ERROR;
        }

        if (read_size != fill_size) {
            OG_LOG_RUN_ERR("[RESTOR] failed to read log file head, read size %d less than %u", read_size, fill_size);
            return OG_ERROR;
        }
        common_proc->ctrl.handle = OG_INVALID_HANDLE;
        if (rst_set_logfile_ctrl(session, bak->curr_file_index, head, &common_proc->ctrl, &ignore_data) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (!ignore_data) {
            if (rst_write_data(session, &common_proc->ctrl, (const char *)head, fill_size + sizeof(log_file_head_t)) !=
                OG_SUCCESS) {
                cm_close_device(common_proc->ctrl.type, &common_proc->ctrl.handle);
                return OG_ERROR;
            }
        }
        cm_close_device(common_proc->ctrl.type, &common_proc->ctrl.handle);
        bak_update_progress(bak, (uint64)(fill_size + sizeof(log_file_head_t)));
    }

    *ignore_logfile = ignore_data;
    return OG_SUCCESS;
}

/*
 * recieve buffer tail may be incomplete block, need set uasble size
 */
static status_t rst_stream_aligned_blocks(rst_stream_buf_t *stream, uint32 recv_size, bool32 ignore_data)
{
    knl_panic(recv_size <= stream->buf_size);
    if (ignore_data || recv_size == 0) {
        stream->usable_size[stream->fid] = 0;
        stream->recv_size[stream->fid] = 0;
        return OG_SUCCESS;
    }

    uint32 usable_size = 0;
    char *buf = stream->bufs[stream->fid].aligned_buf;
    bak_block_head_t *block = (bak_block_head_t *)(buf + usable_size);

    while (block->block_size + usable_size <= recv_size) {
        if (block->magic_num != LOG_MAGIC_NUMBER) {
            OG_LOG_RUN_ERR("[RESTOR] invalid received block with id %u, size is %u", block->block_id,
                           block->block_size);
            return OG_ERROR;
        }

        usable_size += block->block_size;
        if (sizeof(bak_block_head_t) + usable_size > recv_size) {
            break;
        }
        block = (bak_block_head_t *)(buf + usable_size);
    }

    if (usable_size < sizeof(bak_block_head_t)) {
        OG_LOG_RUN_ERR("[RESTOR] invalid received block with id %u, size is %u", block->block_id, block->block_size);
        return OG_ERROR;
    }
    knl_panic(usable_size <= recv_size);

    stream->usable_size[stream->fid] = usable_size;
    stream->recv_size[stream->fid] = recv_size;
    return OG_SUCCESS;
}

// previous recieve buffer tail may be incomplete block, need copy it to current buffer head
static uint32 rst_fix_incomplete_block(rst_stream_buf_t *stream)
{
    char *curr_recv_buf = stream->bufs[stream->fid].aligned_buf;
    char *prev_buf_tail = stream->bufs[stream->wid].aligned_buf + stream->usable_size[stream->wid];
    uint32 tail_size = stream->recv_size[stream->wid] - stream->usable_size[stream->wid];
    errno_t ret;

    if (tail_size > 0) {
        ret = memmove_s(curr_recv_buf, stream->buf_size, prev_buf_tail, tail_size);
        knl_securec_check(ret);
    }

    return tail_size;
}

static status_t rst_stream_read_internal(knl_session_t *session, char *buf, uint32 buf_size, int32 *recv_size,
                                  bool32 *file_end)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_process_t *proc = &session->kernel->backup_ctx.process[BAK_COMMON_PROC];
    date_t start = g_timer()->now;

    if (rst_agent_read(bak, buf, buf_size, recv_size, file_end) != OG_SUCCESS) {
        return OG_ERROR;
    }
    proc->stat.read_time += (g_timer()->now - start);
    proc->stat.read_size += (*recv_size);
    bak_update_progress(bak, (uint64)(*recv_size));

    return OG_SUCCESS;
}

status_t rst_recv_stream_data(knl_session_t *session, bool32 ignore_data)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    rst_stream_buf_t *stream = &bak->recv_stream;
    char *recv_buf = NULL;
    bool32 file_end = OG_FALSE;
    uint32 last_tail_size;
    int32 recv_size = 0;

    for (;;) {
        if (bak_check_session_status(session) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (!cm_spin_try_lock(&stream->lock)) {
            cm_spin_sleep();
            continue;
        }

        // make sure the data of working buffer has been distributed
        if (stream->usable_size[stream->wid] > stream->curr_block_offset) {
            // some data of working buffer is not distributed, need wait
            cm_spin_unlock(&stream->lock);
            cm_spin_sleep();
            continue;
        }

        knl_panic(stream->usable_size[stream->wid] == stream->curr_block_offset);
        stream->wid = !stream->wid;
        stream->fid = !stream->fid;
        stream->curr_block_offset = 0;
        recv_buf = stream->bufs[stream->fid].aligned_buf;

        if (file_end) {
            knl_panic(stream->recv_size[stream->wid] == stream->usable_size[stream->wid]);
            stream->is_eof = OG_TRUE;
            cm_spin_unlock(&stream->lock);
            break;
        }
        cm_spin_unlock(&stream->lock);

        last_tail_size = rst_fix_incomplete_block(stream);
        if (rst_stream_read_internal(session, recv_buf + last_tail_size, stream->buf_size - last_tail_size, &recv_size,
                                     &file_end) != OG_SUCCESS) {
            return OG_ERROR;
        }
        OG_LOG_DEBUG_INF("[BACKUP] read size :%u, last block tail size %u", recv_size, last_tail_size);
        recv_size += last_tail_size;
        if (rst_stream_aligned_blocks(stream, (uint32)recv_size, ignore_data) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}