/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * knl_db_ctrl.c
 *
 *
 * IDENTIFICATION
 * src/kernel/knl_db_ctrl.c
 *
 * -------------------------------------------------------------------------
 */
#include "cm_common_module.h"
#include "knl_db_ctrl.h"
#include "cm_file.h"
#include "knl_context.h"
#include "knl_ctrl_restore.h"
#include "dtc_database.h"
#include "dtc_dls.h"

#ifdef __cplusplus
extern "C" {
#endif

void db_init_logfile_ctrl(knl_session_t *session, uint32 *offset)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(log_file_ctrl_t);
    uint32 pages_per_inst = (OG_MAX_LOG_FILES - 1) / count + 1;
    uint32 i;
    logfile_set_t *logfile_set = MY_LOGFILE_SET(session);

    for (i = 0; i < OG_MAX_LOG_FILES; i++) {
        logfile_set->items[i].ctrl = (log_file_ctrl_t *)db_get_log_ctrl_item(db->ctrl.pages, i, sizeof(log_file_ctrl_t),
                                                                             *offset, kernel->id);
        logfile_set->items[i].handle = OG_INVALID_HANDLE;
    }

    uint32 inst_count = kernel->attr.clustered ? OG_MAX_INSTANCES : 1;
    *offset = *offset + pages_per_inst * inst_count;
}

void db_init_space_ctrl(knl_session_t *session, uint32 *offset)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(space_ctrl_t);
    uint32 i;
    errno_t err;

    for (i = 0; i < OG_MAX_SPACES; i++) {
        db->spaces[i].ctrl = (space_ctrl_t *)db_get_ctrl_item(db->ctrl.pages, i, sizeof(space_ctrl_t), *offset);
        db->spaces[i].ctrl->used = OG_FALSE;
        db->spaces[i].ctrl->id = i;
        err = memset_sp(db->spaces[i].ctrl->files, OG_MAX_SPACE_FILES * sizeof(uint32), 0xFF,
                        OG_MAX_SPACE_FILES * sizeof(uint32));
        knl_securec_check(err);
    }
    *offset = *offset + (OG_MAX_SPACES - 1) / count + 1;
}

void db_init_datafile_ctrl(knl_session_t *session, uint32 *offset)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(datafile_ctrl_t);
    uint32 i;

    for (i = 0; i < OG_MAX_DATA_FILES; i++) {
        db->datafiles[i].ctrl = (datafile_ctrl_t *)db_get_ctrl_item(db->ctrl.pages, i, sizeof(datafile_ctrl_t),
                                                                    *offset);
        db->datafiles[i].ctrl->id = i;
        db->datafiles[i].ctrl->used = OG_FALSE;
        db->datafiles[i].file_no = OG_INVALID_ID32;
        db->datafiles[i].block_num = 0;
    }

    *offset = *offset + (OG_MAX_DATA_FILES - 1) / count + 1;
}

static inline void db_calc_ctrl_checksum(knl_session_t *session, ctrl_page_t *page, uint32 size)
{
    page->tail.checksum = OG_INVALID_CHECKSUM;
    if (size == 0 || DB_IS_CHECKSUM_OFF(session)) {
        return;
    }

    page_calc_checksum((page_head_t *)page, size);
}

static bool32 db_verify_ctrl_checksum(knl_session_t *session, ctrl_page_t *page, uint32 size, uint32 id)
{
    uint32 cks_level = session->kernel->attr.db_block_checksum;
    if (DB_IS_CHECKSUM_OFF(session) || page->tail.checksum == OG_INVALID_CHECKSUM) {
        return OG_TRUE;
    }

    if (size == 0 || !page_verify_checksum((page_head_t *)page, size)) {
        OG_LOG_RUN_ERR("the %d's ctrl page corrupted.size %u cks %u checksum level %s", id, size, page->tail.checksum,
                       knl_checksum_level(cks_level));
        return OG_FALSE;
    }

    return OG_TRUE;
}

static status_t db_try_sync_ctrl_files(knl_session_t *session, uint32 main_file_id)
{
    uint32 i;
    ctrlfile_t *ctrlfile = NULL;
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;

    for (i = 0; i < db->ctrlfiles.count; i++) {
        ctrlfile = &db->ctrlfiles.items[i];
        if (i == main_file_id) {
            continue;
        }

        if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open %s ", ctrlfile->name);
            return OG_ERROR;
        }

        if (cm_write_device(ctrlfile->type, ctrlfile->handle, 0, db->ctrl.pages,
                            (int32)ctrlfile->blocks * ctrlfile->block_size) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to write %s ", ctrlfile->name);
            cm_close_device(ctrlfile->type, &ctrlfile->handle);
            return OG_ERROR;
        }

        if (db_fdatasync_file(session, ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to fdatasync datafile %s", ctrlfile->name);
            cm_close_device(ctrlfile->type, &ctrlfile->handle);
            return OG_ERROR;
        }

        cm_close_device(ctrlfile->type, &ctrlfile->handle);
    }

    return OG_SUCCESS;
}

/*
 * Description     : fetch ctrl file name from file name list
 * Input           : files: file name list seperated by \0
 * Output          : name: one ctrl file name
 * Return Value    : void
 * History         : 1.2017/4/26,  add description
 */
static void db_fetch_ctrlfile_name(text_t *files, text_t *name)
{
    if (!cm_fetch_text(files, ',', '\0', name)) {
        return;
    }

    cm_trim_text(name);
    if (name->str[0] == '\'') {
        name->str++;
        /* reduce the length of "\'XXX\'" */
        name->len -= (uint32)strlen("\'\'");

        cm_trim_text(name);
    }
}

static bool32 db_is_ctrl_size_valid(knl_session_t *session, ctrlfile_t *ctrlfile)
{
    int64 filesize;
    int64 max_size;

    max_size = (int64)(ctrlfile->blocks * ctrlfile->block_size);
    filesize = cm_device_size(ctrlfile->type, ctrlfile->handle);
    if (filesize != max_size) {
        OG_LOG_RUN_ERR("[DB] the size of ctrl file %s is abnormal, the expected size is: %lld, "
            "the actual size is: %lld",
            ctrlfile->name, max_size, filesize);
        return OG_FALSE;
    }

    return OG_TRUE;
}

static bool32 db_validate_ctrl(knl_session_t *session, ctrlfile_t *ctrlfile)
{
    uint32 i;
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    ctrl_page_t *pages = kernel->db.ctrl.pages;

    for (i = 0; i < CTRL_MAX_PAGES(session); i++) {
        if (pages[i].head.pcn != pages[i].tail.pcn) {
            return OG_FALSE;
        }

        if (!db_verify_ctrl_checksum(session, &pages[i], (uint32)ctrlfile->block_size, i)) {
            return OG_FALSE;
        }
    }

    return OG_TRUE;
}

static status_t db_try_load_oldctrl(ctrlfile_t *ctrlfile, knl_instance_t *kernel, bool32 *updated)
{
    int32 error_code = cm_get_error_code();
    if (error_code == ERR_READ_DEVICE_INCOMPLETE) {
        OG_LOG_RUN_ERR("[DB] failed to read %s try read old version", ctrlfile->name);
        if (cm_read_device(ctrlfile->type, ctrlfile->handle, 0, kernel->db.ctrl.pages,
                           CTRL_OLD_MAX_PAGE * ctrlfile->block_size) == OG_SUCCESS) {
            cm_reset_error();
            *updated = OG_TRUE;
            return OG_SUCCESS;
        }
    }
    return OG_ERROR;
}

static status_t db_check_undo_space(knl_instance_t *kernel, database_t *db)
{
    dtc_node_ctrl_t *ctrl = dtc_my_kernel_ctrl(kernel);
    uint32 undo_id = ctrl->undo_space;
    space_t *undo_space = &db->spaces[undo_id];
    text_t undo_name;
    char *param = cm_get_config_value(kernel->attr.config, "UNDO_TABLESPACE");
    uint32 len = (uint32)strlen(param);
    if (len == 0) {
        return cm_alter_config(kernel->attr.config, "UNDO_TABLESPACE", undo_space->ctrl->name, CONFIG_SCOPE_MEMORY,
                               OG_TRUE);
    }

    cm_str2text(param, &undo_name);
    if (!cm_text_str_equal_ins(&undo_name, undo_space->ctrl->name)) {
        OG_THROW_ERROR(ERR_UNDO_TABLESPACE_NOT_MATCH, undo_name.str, undo_space->ctrl->name);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}

static status_t db_check_ctrl_attr(knl_instance_t *kernel, database_t *db)
{
    if (!db->recover_for_restore && !(db->status == DB_STATUS_CREATING || db->status == DB_STATUS_MOUNT) &&
        !db->ctrl.core.build_completed) {
        OG_THROW_ERROR(ERR_DATABASE_NOT_COMPLETED);
        knl_panic_log(0, "database is not create completed ,db status is : %d", db->status);
        return OG_ERROR;
    }

    if (kernel->attr.page_size != db->ctrl.core.page_size) {
        OG_THROW_ERROR(ERR_PARAMETER_NOT_MATCH, "PAGE_SIZE", kernel->attr.page_size, db->ctrl.core.page_size);
        return OG_ERROR;
    }

    if (kernel->attr.clustered != db->ctrl.core.clustered) {
        //        OG_THROW_ERROR(ERR_PARAMETER_NOT_MATCH, "CLUSTER_DATABASE", kernel->attr.clustered,
        //        db->ctrl.core.clustered);
        OG_LOG_RUN_WAR("CLUSTER_DATABASE not match, in cnf:%d, in ctrl:%d", kernel->attr.clustered,
                       db->ctrl.core.clustered);
    }

    if (kernel->attr.max_column_count < db->ctrl.core.max_column_count) {
        OG_THROW_ERROR(ERR_PARAMETER_NOT_MATCH, "MAX_COLUMN_COUNT", kernel->attr.max_column_count,
                       db->ctrl.core.max_column_count);
        return OG_ERROR;
    }

    if (kernel->attr.max_column_count > db->ctrl.core.max_column_count) {
        db->ctrl.core.max_column_count = kernel->attr.max_column_count;
    }

    if (kernel->attr.undo_segments != db->ctrl.core.undo_segments) {
        OG_THROW_ERROR(ERR_PARAMETER_NOT_MATCH, "UNDO_SEGMENTS", kernel->attr.undo_segments,
                       db->ctrl.core.undo_segments);
        return OG_ERROR;
    }

    if (db_check_undo_space(kernel, db) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

status_t db_load_ctrlspace(knl_session_t *session, text_t *files)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    ctrlfile_t *ctrlfile = NULL;
    text_t file_name;
    uint32 main_file_id = OG_INVALID_ID32;
    uint32 id = 0;
    bool32 loaded = OG_FALSE;
    bool32 upgrade = OG_FALSE;

    cm_remove_brackets(files);

    db_fetch_ctrlfile_name(files, &file_name);
    while (file_name.len > 0) {
        CM_ABORT((id < OG_MAX_CTRL_FILES), "number of ctrl file exceeded the limit %d.", OG_MAX_CTRL_FILES);
        ctrlfile = &db->ctrlfiles.items[id];
        (void)cm_text2str(&file_name, ctrlfile->name, OG_FILE_NAME_BUFFER_SIZE);
        ctrlfile->type = cm_device_type(ctrlfile->name);
        ctrlfile->block_size = OG_DFLT_CTRL_BLOCK_SIZE;
        ctrlfile->blocks = CTRL_MAX_PAGES(session);

        if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open %s ", ctrlfile->name);
            return OG_ERROR;
        }

        if (loaded) {
            id++;
            cm_close_device(ctrlfile->type, &ctrlfile->handle);
            db_fetch_ctrlfile_name(files, &file_name);
            continue;
        }

        if (cm_read_device(ctrlfile->type, ctrlfile->handle, 0, db->ctrl.pages,
                           (int32)ctrlfile->blocks * ctrlfile->block_size) != OG_SUCCESS) {
            /*
             * old version ctrl file size is 512*16k, new version change to 640*16k
             * need to adapte old version and read 512*16k again when read error!
             */
            if (db_try_load_oldctrl(ctrlfile, kernel, &upgrade) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[DB] failed to read %s ", ctrlfile->name);
                OG_THROW_ERROR(ERR_LOAD_CONTROL_FILE, ctrlfile->name);
                cm_close_device(ctrlfile->type, &ctrlfile->handle);
                return OG_ERROR;
            }
        }

        if (!db_is_ctrl_size_valid(session, ctrlfile)) {
            OG_THROW_ERROR(ERR_LOAD_CONTROL_FILE, "control file size is not correct");
            return OG_ERROR;
        }

        cm_close_device(ctrlfile->type, &ctrlfile->handle);

        if (!db_validate_ctrl(session, ctrlfile)) {
            OG_LOG_RUN_WAR("control file %s is corrupted", ctrlfile->name);
        } else {
            main_file_id = id;
            loaded = OG_TRUE;
        }

        id++;
        db_fetch_ctrlfile_name(files, &file_name);
    }

    if (!loaded) {
        OG_THROW_ERROR(ERR_LOAD_CONTROL_FILE, "no usable control file");
        return OG_ERROR;
    }

    bool is_slave = !DB_IS_PRIMARY(db);
    db_load_core(db);
    if (is_slave) {
        OG_LOG_RUN_INF("Manually set to STANDBY.");
        db->ctrl.core.db_role = REPL_ROLE_PHYSICAL_STANDBY;
    } else {
        if (db->ctrl.core.db_role == REPL_ROLE_PHYSICAL_STANDBY) {
            OG_LOG_RUN_INF("[INST] [SWITCHOVER] db role is PHYSICAL_STANDBY, need promote role");
            tx_rollback_close(session);
            lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
            g_standby_will_promote = OG_TRUE;
            status_t status = cm_create_thread(db_promote_cluster_role, 0, NULL, &lrpl->promote_thread);
            if (status != OG_SUCCESS) {
                CM_ABORT_REASONABLE(0, "[INST] [SWITCHOVER] promote cm_create_thread failed");
                return OG_ERROR;
            }
        } else {
            OG_LOG_RUN_INF("Manually set to PRIMARY.");
            db->ctrl.core.db_role = REPL_ROLE_PRIMARY;
        }
    }

    if (db_check_ctrl_attr(kernel, db) != OG_SUCCESS) {
        return OG_ERROR;
    }

    db->ctrlfiles.count = id;
    db->cluster_ready = db->ctrl.core.clustered;

    /* load core system info to kernel */
    kernel->scn = dtc_my_ctrl(session)->scn;
    kernel->lsn = dtc_my_ctrl(session)->lsn;
    kernel->lfn = dtc_my_ctrl(session)->lfn;

    g_timer()->db_init_time = db->ctrl.core.init_time;
    g_timer()->system_scn = &kernel->scn;
    CM_MFENCE;
    cm_atomic_set(&(g_timer()->sys_scn_valid), (int64)OG_TRUE);

    if (upgrade) {
        main_file_id = db->ctrlfiles.count;
    }
    return db_try_sync_ctrl_files(session, main_file_id);
}

status_t db_generate_ctrlitems(knl_session_t *session)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    ctrlfile_t *ctrlfile = NULL;
    char *param;
    text_t files;
    text_t file_name;
    uint32 id = 0;

    param = cm_get_config_value(kernel->attr.config, "CONTROL_FILES");
    cm_str2text(param, &files);

    if (files.len == 0) {
        OG_THROW_ERROR(ERR_LOAD_CONTROL_FILE, "CONTROL_FILES is not set!");
        return OG_ERROR;
    }

    cm_remove_brackets(&files);
    db_fetch_ctrlfile_name(&files, &file_name);
    while (file_name.len > 0) {
        ctrlfile = &db->ctrlfiles.items[id];
        (void)cm_text2str(&file_name, ctrlfile->name, OG_FILE_NAME_BUFFER_SIZE);
        ctrlfile->type = cm_device_type(ctrlfile->name);
        ctrlfile->block_size = OG_DFLT_CTRL_BLOCK_SIZE;
        id++;
        db_fetch_ctrlfile_name(&files, &file_name);
    }

    db->ctrlfiles.count = id;
    return OG_SUCCESS;
}

static status_t db_create_ctrl_device(knl_session_t *session, ctrlfile_t *ctrlfile)
{
    knl_instance_t *kernel = session->kernel;
    bool32 is_dbstor = cm_dbs_is_enable_dbs();
    if (is_dbstor) {
        ctrlfile->blocks = kernel->attr.clustered ? CTRL_MAX_PAGES_CLUSTERED : CTRL_MAX_PAGES_NONCLUSTERED;
        ctrlfile->block_size = OG_DFLT_CTRL_BLOCK_SIZE;
        uint32 flags = (ctrlfile->type == DEV_TYPE_PGPOOL ? 0xFFFFFFFF : knl_io_flag(session));
        if (cm_build_device(ctrlfile->name, ctrlfile->type, kernel->attr.xpurpose_buf,
            OG_XPURPOSE_BUFFER_SIZE, (int64)ctrlfile->blocks * ctrlfile->block_size,
            flags, OG_FALSE, &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to build %s ", ctrlfile->name);
            return OG_ERROR;
        }
        if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open %s ", ctrlfile->name);
            return OG_ERROR;
        }
    } else {
        if (cm_create_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[BACKUP] failed to create %s ", ctrlfile->name);
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}

status_t db_create_ctrl_file(knl_session_t *session)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    ctrlfile_t *ctrlfile = NULL;
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_stat_t *stat = &ogx->bak.stat;
    uint32 id;

    for (id = 0; id < db->ctrlfiles.count; id++) {
        ctrlfile = &db->ctrlfiles.items[id];
        if (db_create_ctrl_device(session, ctrlfile) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (db_fsync(session, ctrlfile->type, ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[BACKUP] failed to fsync datafile %s", ctrlfile->name);
            return OG_ERROR;
        }

        (void)cm_atomic_inc(&stat->writes);
    }

    return OG_SUCCESS;
}

status_t db_save_ctrl_page(knl_session_t *session, ctrlfile_t *ctrlfile, uint32 page_id)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    char *page_buf = (char *)cm_push(session->stack, (uint32)ctrlfile->block_size + (uint32)OG_MAX_ALIGN_SIZE_4K);
    ctrl_page_t *page = (ctrl_page_t *)cm_aligned_buf(page_buf);
    errno_t ret;

    knl_panic(page_id < CTRL_MAX_PAGES(session));
    CM_ABORT(page_id < CTRL_MAX_PAGES(session),
             "[DB] ABORT INFO: the count of control page has reached max control page %u",
             CTRL_MAX_PAGES(session));
    ret = memcpy_sp(page, ctrlfile->block_size, &db->ctrl.pages[page_id], ctrlfile->block_size);
    knl_securec_check(ret);

    page->head.pcn++;
    page->tail.pcn++;

    db_calc_ctrl_checksum(session, page, (uint32)ctrlfile->block_size);

    if (cm_write_device(ctrlfile->type, ctrlfile->handle, (int64)page_id * ctrlfile->block_size, (char *)page,
                        ctrlfile->block_size) != OG_SUCCESS) {
        cm_pop(session->stack);
        OG_LOG_RUN_ERR("[DB] failed to write %s ", ctrlfile->name);
        return OG_ERROR;
    }

    if (db_fdatasync_file(session, ctrlfile->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DB] failed to fdatasync datafile %s", ctrlfile->name);
        cm_pop(session->stack);
        return OG_ERROR;
    }

    cm_pop(session->stack);
    return OG_SUCCESS;
}

status_t db_read_ctrl_page(knl_session_t *session, ctrlfile_t *ctrlfile, uint32 page_id)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;

    knl_panic(page_id < CTRL_MAX_PAGES(session));
    CM_ABORT(page_id < CTRL_MAX_PAGES(session),
             "[DB] ABORT INFO: the count of control page has reached max control page %u",
             CTRL_MAX_PAGES(session));

    if (cm_read_device(ctrlfile->type, ctrlfile->handle, (int64)page_id * ctrlfile->block_size,
                       &db->ctrl.pages[page_id], ctrlfile->block_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DB] failed to read %s offset %lld", ctrlfile->name, (int64)page_id * ctrlfile->block_size);
        return OG_ERROR;
    }

    db_calc_ctrl_checksum(session, &db->ctrl.pages[page_id], (uint32)ctrlfile->block_size);
    return OG_SUCCESS;
}

status_t db_read_log_page(knl_session_t *session, ctrlfile_t *ctrlfile, uint32 start, uint32 end)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;

    knl_panic(end < CTRL_MAX_PAGES(session));
    CM_ABORT(end < CTRL_MAX_PAGES(session),
             "[DB] ABORT INFO: the count of control page has reached max control page %u",
             CTRL_MAX_PAGES(session));

    for (uint32 i = start; i <= end; i++) {
        if (cm_read_device(ctrlfile->type, ctrlfile->handle, (int64)i * ctrlfile->block_size, &db->ctrl.pages[i],
                           ctrlfile->block_size) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to read %s offset %lld", ctrlfile->name, (int64)i * ctrlfile->block_size);
            return OG_ERROR;
        }
        db_calc_ctrl_checksum(session, &db->ctrl.pages[i], (uint32)ctrlfile->block_size);
    }

    return OG_SUCCESS;
}

status_t db_save_core_ctrl(knl_session_t *session)
{
    ctrlfile_t *ctrlfile = NULL;
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 i;
    // standby follower in promote don't need save core ctrl
    if (DB_IS_CLUSTER(session) && !DB_IS_PRIMARY(db) && !rc_is_master() && g_standby_will_promote)
    {
        return OG_SUCCESS;
    }

    cm_spin_lock(&db->ctrl_lock, NULL);
    db_store_core(db);

    for (i = 0; i < db->ctrlfiles.count; i++) {
        ctrlfile = &db->ctrlfiles.items[i];
        knl_panic((uint32)ctrlfile->block_size == OG_DFLT_CTRL_BLOCK_SIZE);

        /* ctrlfile can be opened for a long time, closed in db_close_ctrl_files */
        if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open %s ", ctrlfile->name);
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }

        if (db_save_ctrl_page(session, ctrlfile, CORE_CTRL_PAGE_ID) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to write %s ", ctrlfile->name);
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }
    }

    cm_spin_unlock(&db->ctrl_lock);
    return OG_SUCCESS;
}

status_t db_save_node_ctrl(knl_session_t *session)
{
    return dtc_save_ctrl(session, session->kernel->id);
}

status_t db_save_log_ctrl(knl_session_t *session, uint32 id, uint32 node_id)
{
    ctrlfile_t *ctrlfile = NULL;
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(log_file_ctrl_t);
    uint32 pages_per_inst = (OG_MAX_LOG_FILES - 1) / count + 1;
    uint32 i;
    uint32 page_id;

    cm_spin_lock(&db->ctrl_lock, NULL);
    db_store_core(db);

    for (i = 0; i < db->ctrlfiles.count; i++) {
        ctrlfile = &db->ctrlfiles.items[i];
        /* ctrlfile can be opened for a long time, closed in db_close_ctrl_files */
        if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open %s ", ctrlfile->name);
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }

        page_id = db->ctrl.log_segment + pages_per_inst * node_id + id / count;
        if (db_save_ctrl_page(session, ctrlfile, page_id) != OG_SUCCESS) {
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }

        if (db_save_ctrl_page(session, ctrlfile, CORE_CTRL_PAGE_ID) != OG_SUCCESS) {
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }
    }
    if (db->status >= DB_STATUS_MOUNT) {
        if (ctrl_backup_log_ctrl(session, id, session->kernel->id) != OG_SUCCESS) {
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }
    }

    cm_spin_unlock(&db->ctrl_lock);

    if (session->kernel->attr.clustered) {
        if (dtc_save_ctrl(session, session->kernel->id) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to save ctrl for instance %u", session->kernel->id);
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}
static status_t db_write_ctrl_page(knl_session_t *session, ctrlfile_t *ctrlfile, uint32 page_id, ctrl_page_t *page)
{
    page->head.pcn++;
    page->tail.pcn++;
    db_calc_ctrl_checksum(session, page, (uint32)ctrlfile->block_size);
    if (cm_write_device(ctrlfile->type, ctrlfile->handle, (int64)page_id * ctrlfile->block_size, (char *)page,
                        ctrlfile->block_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DB] failed to write %s ", ctrlfile->name);
        return OG_ERROR;
    }

    if (db_fdatasync_file(session, ctrlfile->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DB] failed to fdatasync datafile %s", ctrlfile->name);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
static status_t db_save_ctrl_page_ograc(knl_session_t *session, ctrlfile_t *ctrlfile, uint32 id)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(datafile_ctrl_t);
    uint32 page_id = db->ctrl.datafile_segment + id / count;
    datafile_ctrl_t *df_ctrl_buf = NULL;
    datafile_ctrl_t *df_ctrl = (datafile_ctrl_t *)db_get_ctrl_item(db->ctrl.pages, id, sizeof(datafile_ctrl_t),
                                                                   db->ctrl.datafile_segment);
    char *page_buf = (char *)cm_push(session->stack, (uint32)OG_DFLT_CTRL_BLOCK_SIZE + (uint32)OG_MAX_ALIGN_SIZE_4K);
    ctrl_page_t *page = (ctrl_page_t *)cm_aligned_buf(page_buf);
    if (cm_read_device(ctrlfile->type, ctrlfile->handle, (int64)page_id * ctrlfile->block_size,
                       page, ctrlfile->block_size) != OG_SUCCESS) {
        cm_pop(session->stack);
        OG_LOG_RUN_ERR("[DB]failed to read %s offset %lld", ctrlfile->name, (int64)page_id * ctrlfile->block_size);
        return OG_ERROR;
    }

    df_ctrl_buf = (datafile_ctrl_t *)(page->buf + id % count * sizeof(datafile_ctrl_t));
    int32 ret = memcpy_sp(df_ctrl_buf, sizeof(datafile_ctrl_t), df_ctrl, sizeof(datafile_ctrl_t));
    knl_securec_check(ret);
    if (db_write_ctrl_page(session, ctrlfile, page_id, page) != OG_SUCCESS) {
        cm_pop(session->stack);
        OG_LOG_RUN_ERR("[DB]failed to write %s offset %lld", ctrlfile->name, (int64)page_id * ctrlfile->block_size);
        return OG_ERROR;
    }
    cm_pop(session->stack);
    return OG_SUCCESS;
}
status_t db_save_datafile_ctrl(knl_session_t *session, uint32 id)
{
    ctrlfile_t *ctrlfile = NULL;
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(datafile_ctrl_t);
    uint32 i;
    uint32 page_id;

    knl_panic(!OGRAC_REPLAY_NODE(session));
    for (;;) {
        if (dls_spin_try_lock(session, &db->df_ctrl_lock)) {
            break;
        }
        cm_sleep(2);
    }
    db_store_core(db);
    for (i = 0; i < db->ctrlfiles.count; i++) {
        ctrlfile = &db->ctrlfiles.items[i];
        /* ctrlfile can be opened for a long time, closed in db_close_ctrl_files */
        if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            dls_spin_unlock(session, &db->df_ctrl_lock);
            OG_LOG_RUN_ERR("[DB]failed to open %s ", ctrlfile->name);
            return OG_ERROR;
        }
        if (DB_IS_CLUSTER(session)) {
            if (db_save_ctrl_page_ograc(session, ctrlfile, id) != OG_SUCCESS) {
                dls_spin_unlock(session, &db->df_ctrl_lock);
                return OG_ERROR;
            }
        } else {
            page_id = db->ctrl.datafile_segment + id / count;
            if (db_save_ctrl_page(session, ctrlfile, page_id) != OG_SUCCESS) {
                dls_spin_unlock(session, &db->df_ctrl_lock);
                return OG_ERROR;
            }
        }
        if (db_save_ctrl_page(session, ctrlfile, CORE_CTRL_PAGE_ID) != OG_SUCCESS) {
            dls_spin_unlock(session, &db->df_ctrl_lock);
            return OG_ERROR;
        }
    }
    if (ctrl_backup_datafile_ctrl(session, id) != OG_SUCCESS) {
        dls_spin_unlock(session, &db->df_ctrl_lock);
        return OG_ERROR;
    }
    dls_spin_unlock(session, &db->df_ctrl_lock);
    return OG_SUCCESS;
}

status_t db_save_space_ctrl(knl_session_t *session, uint32 id)
{
    ctrlfile_t *ctrlfile = NULL;
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(space_ctrl_t);
    uint32 i;
    uint32 page_id;

    knl_panic(!OGRAC_REPLAY_NODE(session));
    cm_spin_lock(&db->ctrl_lock, NULL);
    db_store_core(db);

    for (i = 0; i < db->ctrlfiles.count; i++) {
        ctrlfile = &db->ctrlfiles.items[i];
        /* ctrlfile can be opened for a long time, closed in db_close_ctrl_files */
        if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open %s ", ctrlfile->name);
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }

        page_id = db->ctrl.space_segment + id / count;
        if (db_save_ctrl_page(session, ctrlfile, page_id) != OG_SUCCESS) {
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }

        if (db_save_ctrl_page(session, ctrlfile, CORE_CTRL_PAGE_ID) != OG_SUCCESS) {
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }
    }

    if (ctrl_backup_space_ctrl(session, id) != OG_SUCCESS) {
        cm_spin_unlock(&db->ctrl_lock);
        return OG_ERROR;
    }

    cm_spin_unlock(&db->ctrl_lock);
    return OG_SUCCESS;
}

status_t db_save_arch_ctrl(knl_session_t *session, uint32 id, uint32 node_id, uint32 start_asn, uint32 end_asn)
{
    ctrlfile_t *ctrlfile = NULL;
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(arch_ctrl_t);
    uint32 pages_per_inst = (OG_MAX_ARCH_NUM - 1) / count + 1;
    uint32 page_id = db->ctrl.arch_segment + pages_per_inst * node_id + id / count;

    cm_spin_lock(&db->ctrl_lock, NULL);
    arch_set_arch_start(session, start_asn, node_id);
    arch_set_arch_end(session, end_asn, node_id);
    db_store_core(db);

    for (uint32 i = 0; i < db->ctrlfiles.count; i++) {
        ctrlfile = &db->ctrlfiles.items[i];
        /* ctrlfile can be opened for a long time, closed in db_close_ctrl_files */
        if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open %s ", ctrlfile->name);
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }

        if (db_save_ctrl_page(session, ctrlfile, page_id) != OG_SUCCESS) {
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }

        if (db_save_ctrl_page(session, ctrlfile, CORE_CTRL_PAGE_ID) != OG_SUCCESS) {
            cm_spin_unlock(&db->ctrl_lock);
            return OG_ERROR;
        }

        if (session->kernel->attr.clustered) {
            if (db_save_ctrl_page(session, ctrlfile, CTRL_LOG_SEGMENT + node_id) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[DB] failed to write %s ", ctrlfile->name);
                cm_spin_unlock(&db->ctrl_lock);
                CM_ABORT_REASONABLE(0, "[DB] ABORT INFO: save core control file failed");
                return OG_ERROR;
            }
        }
    }
    cm_spin_unlock(&db->ctrl_lock);
    return OG_SUCCESS;
}

arch_ctrl_t *db_get_arch_ctrl(knl_session_t *session, uint32 id, uint32 node_id)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    database_t *db = &kernel->db;
    uint32 count = CTRL_MAX_BUF_SIZE / sizeof(arch_ctrl_t);
    uint32 pages_per_inst = (OG_MAX_ARCH_NUM - 1) / count + 1;
    uint32 page_id = db->ctrl.arch_segment + pages_per_inst * node_id + id / count;
    uint32 slot = id % count;

    knl_panic(page_id < CTRL_MAX_PAGES(session));
    CM_ABORT(page_id < CTRL_MAX_PAGES(session),
             "[DB] ABORT INFO: the count of control page has reached max control page %u",
             CTRL_MAX_PAGES(session));

    ctrl_page_t *page = &db->ctrl.pages[page_id];
    return (arch_ctrl_t *)(page->buf + slot * sizeof(arch_ctrl_t));
}

/*
 * check if ctrl file readable
 */
status_t db_check(knl_session_t *session, text_t *ctrlfiles, bool32 *is_found)
{
    knl_instance_t *kernel = (knl_instance_t *)session->kernel;
    text_t file_name;
    text_t temp_ctrlfiles;
    int32 fp = OG_INVALID_HANDLE;
    char name[OG_FILE_NAME_BUFFER_SIZE] = { 0 };
    char *param = cm_get_config_value(kernel->attr.config, "CONTROL_FILES");

    cm_str2text(param, ctrlfiles);
    cm_str2text(param, &temp_ctrlfiles);
    if (ctrlfiles->len == 0) {
        *is_found = OG_FALSE;
        return OG_SUCCESS;
    }

    cm_remove_brackets(&temp_ctrlfiles);
    db_fetch_ctrlfile_name(&temp_ctrlfiles, &file_name);
    (void)cm_text2str(&file_name, name, OG_FILE_NAME_BUFFER_SIZE);

    device_type_t type = cm_device_type(name);
    if (cm_open_device(name, type, knl_io_flag(session), &fp) != OG_SUCCESS) {
        *is_found = OG_FALSE;
        OG_LOG_RUN_ERR("[DB] failed to open %s ", name);
        return OG_ERROR;
    }
    cm_close_device(type, &fp);
    *is_found = OG_TRUE;

    return OG_SUCCESS;
}

void db_update_name_by_path(const char *path, char *name, uint32 len)
{
    text_t left;
    text_t right;
    text_t text;
    char right_str[OG_FILE_NAME_BUFFER_SIZE];
    errno_t err;

    cm_str2text(name, &text);
    (void)cm_split_rtext(&text, SLASH, '\0', &left, &right);
    (void)cm_text2str(&right, right_str, OG_FILE_NAME_BUFFER_SIZE);
    err = snprintf_s(name, len, len - 1, "%s/%s", path, right_str);
    knl_securec_check_ss(err);
}

status_t db_update_ctrl_filename(knl_session_t *session)
{
    knl_instance_t *kernel = session->kernel;
    ctrlfile_set_t *ctrlfiles = &kernel->db.ctrlfiles;
    char *param = NULL;
    text_t local_ctrlfiles;
    text_t file_name;
    char path[OG_FILE_NAME_BUFFER_SIZE];
    char name[OG_FILE_NAME_BUFFER_SIZE];
    uint32 i;
    errno_t err;

    param = cm_get_config_value(kernel->attr.config, "CONTROL_FILES");
    cm_str2text(param, &local_ctrlfiles);
    if (local_ctrlfiles.len == 0) {
        OG_LOG_RUN_ERR("the value of CONTROL_FILES is invaild");
        return OG_ERROR;
    }
    if (cm_check_exist_special_char(param, (uint32)strlen(param))) {
        OG_THROW_ERROR(ERR_INVALID_DIR, param);
        return OG_ERROR;
    }

    err = snprintf_s(path, OG_FILE_NAME_BUFFER_SIZE, OG_FILE_NAME_BUFFER_SIZE - 1, "%s/%s", kernel->home, "data");
    knl_securec_check_ss(err);
    cm_remove_brackets(&local_ctrlfiles);
    for (i = 0; i < ctrlfiles->count; i++) {
        db_fetch_ctrlfile_name(&local_ctrlfiles, &file_name);
        if (file_name.len == 0) {
            OG_LOG_RUN_ERR("the value of CONTROL_FILES is invaild");
            return OG_ERROR;
        }
        (void)cm_text2str(&file_name, name, OG_FILE_NAME_BUFFER_SIZE);
        if (cm_device_type(name) == DEV_TYPE_FILE) {
            db_update_name_by_path(path, name, OG_FILE_NAME_BUFFER_SIZE);
        }
        err = strcpy_sp(ctrlfiles->items[i].name, OG_FILE_NAME_BUFFER_SIZE, name);
        knl_securec_check(err);
    }

    return OG_SUCCESS;
}

status_t db_update_config_ctrl_name(knl_session_t *session)
{
    config_t *config = session->kernel->attr.config;
    knl_instance_t *kernel = session->kernel;
    ctrlfile_set_t *ctrlfiles = &kernel->db.ctrlfiles;
    char value[OG_MAX_CONFIG_LINE_SIZE] = { 0 };
    uint32 i;
    errno_t err;

    err = strcpy_sp(value, OG_MAX_CONFIG_LINE_SIZE, "(");
    knl_securec_check(err);
    if (ctrlfiles->count > 1) {
        for (i = 0; i < ctrlfiles->count - 1; i++) {
            err = strcat_sp(value, OG_MAX_CONFIG_LINE_SIZE, ctrlfiles->items[i].name);
            knl_securec_check(err);
            err = strcat_sp(value, OG_MAX_CONFIG_LINE_SIZE, ", ");
            knl_securec_check(err);
        }
    }
    err = strcat_sp(value, OG_MAX_CONFIG_LINE_SIZE, ctrlfiles->items[ctrlfiles->count - 1].name);
    knl_securec_check(err);
    err = strcat_sp(value, OG_MAX_CONFIG_LINE_SIZE, ")");
    knl_securec_check(err);

    if (cm_alter_config(config, "CONTROL_FILES", value, CONFIG_SCOPE_BOTH, OG_TRUE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static status_t db_update_ctrl_logfile_name(knl_session_t *session)
{
    knl_attr_t *attr = &session->kernel->attr;
    log_file_ctrl_t *logfile = NULL;
    uint32 i;
    logfile_set_t *logfile_set = MY_LOGFILE_SET(session);

    if (!attr->log_file_convert.is_convert) {
        return OG_SUCCESS;
    }

    for (i = 0; i < logfile_set->logfile_hwm; i++) {
        logfile = logfile_set->items[i].ctrl;
        if (LOG_IS_DROPPED(logfile->flg)) {
            continue;
        }
        if (db_change_storage_path(&attr->log_file_convert, logfile->name, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

static status_t db_update_ctrl_datafile_name(knl_session_t *session)
{
    knl_attr_t *attr = &session->kernel->attr;
    knl_instance_t *kernel = session->kernel;
    datafile_ctrl_t *datafile = NULL;
    uint32 i;

    if (!attr->data_file_convert.is_convert) {
        return OG_SUCCESS;
    }

    for (i = 0; i < OG_MAX_DATA_FILES; i++) {
        datafile = kernel->db.datafiles[i].ctrl;
        if (!datafile->used) {
            continue;
        }
        if (db_change_storage_path(&attr->data_file_convert, datafile->name, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

status_t db_update_storage_filename(knl_session_t *session)
{
    if (db_update_ctrl_logfile_name(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (db_update_ctrl_datafile_name(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

void db_update_sysdata_version(knl_session_t *session)
{
    database_t *db = &session->kernel->db;
    rd_update_sysdata_t redo;

    redo.op_type = RD_UPDATE_SYSDATA_VERSION;
    redo.sysdata_version = CORE_SYSDATA_VERSION;

    OG_LOG_RUN_INF("[UPGRADE] system data version is update from %u to %u",
        db->ctrl.core.sysdata_version, CORE_SYSDATA_VERSION);
    db->ctrl.core.sysdata_version = CORE_SYSDATA_VERSION;
    if (db_save_core_ctrl(session) != OG_SUCCESS) {
        CM_ABORT(0, "[UPGRADE] ABORT INFO: update system data version failed when perform upgrade");
    }

    log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_update_sysdata_t), LOG_ENTRY_FLAG_NONE);
    knl_commit(session);
}

void rd_update_sysdata_version(knl_session_t *session, log_entry_t *log)
{
    database_t *db = &session->kernel->db;
    if (log->size != CM_ALIGN4(sizeof(rd_update_sysdata_t)) + LOG_ENTRY_SIZE) {
        OG_LOG_RUN_ERR("[UPGRADE]no need to replay update sys version, log size %u is wrong", log->size);
        return;
    }
    rd_update_sysdata_t *redo = (rd_update_sysdata_t *)log->data;

    if (redo->sysdata_version != CORE_SYSDATA_VERSION) {
        OG_LOG_RUN_ERR("[UPGRADE] update sys version failed: the system data's binary version"
                       "is different between primary and standby");
        CM_ASSERT(0);
    }
    OG_LOG_RUN_INF("[UPGRADE] system data version is update from %u to %u",
        db->ctrl.core.sysdata_version, CORE_SYSDATA_VERSION);
    db->ctrl.core.sysdata_version = CORE_SYSDATA_VERSION;
    if (db_save_core_ctrl(session) != OG_SUCCESS) {
        CM_ABORT(0, "[UPGRADE] ABORT INFO: update system data version failed when perform upgrade");
    }
}

void print_update_sysdata_version(log_entry_t *log)
{
    rd_update_sysdata_t *redo = (rd_update_sysdata_t *)log->data;
    printf("update sysdata version:%u\n", redo->sysdata_version);
}

bool32 db_sysdata_version_is_equal(knl_session_t *session, bool32 is_upgrade)
{
    database_t *db = &session->kernel->db;
    knl_attr_t *attr = &session->kernel->attr;

    if (is_upgrade || !attr->check_sysdata_version) {
        return OG_TRUE;
    }
    if (db->ctrl.core.sysdata_version != CORE_SYSDATA_VERSION) {
        OG_LOG_RUN_ERR("[CTRL] the system data's version is different between binary and ctrl file");
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ": the system data's version is different between binary and ctrl file");
        return OG_FALSE;
    }

    return OG_TRUE;
}

bool32 db_cur_ctrl_version_is_higher(knl_session_t *session, ctrl_version_t version)
{
    ctrl_version_t cur_version = DB_CORE_CTRL(session)->version;
    if (cur_version.main == version.main) {
        if (cur_version.major == version.major) {
            return (cur_version.revision == version.revision) ?
                   (cur_version.inner > version.inner) : (cur_version.revision > version.revision);
        } else {
            return (cur_version.major > version.major);
        }
    } else {
        return (cur_version.main > version.main);
    }
}

bool32 db_equal_to_cur_ctrl_version(knl_session_t *session, ctrl_version_t version)
{
    ctrl_version_t cur_version = DB_CORE_CTRL(session)->version;
    return (cur_version.main == version.main && cur_version.major == version.major &&
            cur_version.revision == version.revision && (cur_version.inner == version.inner));
}

bool32 db_cur_ctrl_version_is_higher_or_equal(knl_session_t *session, ctrl_version_t version)
{
    return (db_cur_ctrl_version_is_higher(session, version) || db_equal_to_cur_ctrl_version(session, version));
}

#ifdef __cplusplus
}
#endif