/* -------------------------------------------------------------------------
 *  This file is part of the oGRAC project.
 * Copyright (c) 2024 Huawei Technologies Co.,Ltd.
 *
 * oGRAC is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * rb_purge.c
 *
 *
 * IDENTIFICATION
 * src/kernel/flashback/rb_purge.c
 *
 * -------------------------------------------------------------------------
 */
#include "knl_flash_module.h"
#include "rb_purge.h"
#include "index_common.h"
#include "knl_context.h"
#include "knl_table.h"
#include "knl_ctlg.h"
#include "dc_log.h"

/*
 * fetch a recycle bin object by origin name
 * @param kernel session, purge object, object type, object description(output)
 * @note if there are two object whose origin name are same, we fetch the oldest object
 * to purge.
 */
status_t rb_purge_fetch_name(knl_session_t *session, knl_purge_def_t *def, rb_object_type_t type, knl_rb_desc_t *desc)
{
    char part_name[OG_NAME_BUFFER_SIZE];
    knl_cursor_t *cursor = NULL;
    uint32 uid;

    if (!dc_get_user_id(session, &def->owner, &uid)) {
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, T2S(&def->owner), T2S_EX(&def->name));
        return OG_ERROR;
    }

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);
    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, 0);
    knl_init_index_scan(cursor, OG_FALSE);
    knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, 0);
    knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, 0);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, desc);

        if (desc->type == type && desc->uid == uid &&
            cm_text_str_equal(&def->name, desc->org_name) &&
            (def->part_name.str == NULL || cm_text_str_equal(&def->part_name, desc->part_name))) {
            CM_RESTORE_STACK(session->stack);
            return OG_SUCCESS;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);

    if (def->part_name.str == NULL) {
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, T2S(&def->owner), T2S_EX(&def->name));
    } else {
        (void)cm_text2str(&def->part_name, part_name, OG_NAME_BUFFER_SIZE);
        OG_THROW_ERROR(ERR_RECYCLE_PARTITION_NOT_EXIST, T2S(&def->owner), T2S_EX(&def->name), part_name);
    }

    return OG_ERROR;
}

/*
 * fetch a recycle bin object by object name
 * @param kernel session, purge object, object type, object description(output)
 */
status_t rb_purge_fetch_object(knl_session_t *session, knl_purge_def_t *def,
                               rb_object_type_t type, knl_rb_desc_t *desc)
{
    char buf[OG_NAME_BUFFER_SIZE];
    knl_cursor_t *cursor = NULL;
    errno_t ret;

    desc->id = OG_INVALID_ID64;
    (void)cm_text2str(&def->name, buf, OG_NAME_BUFFER_SIZE);
    ret = sscanf_s(buf, "BIN$%u$%llX", &desc->table_id, &desc->id);
    knl_securec_check_ss(ret);

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);
    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, IX_SYS_RB001_ID);
    knl_init_index_scan(cursor, OG_TRUE);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key,
                     OG_TYPE_BIGINT, &desc->id, sizeof(uint64), IX_COL_SYS_RB001_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    if (cursor->eof) {
        CM_RESTORE_STACK(session->stack);
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, T2S(&def->owner), T2S_EX(&def->name));
        return OG_ERROR;
    }

    rb_convert_desc(cursor, desc);
    CM_RESTORE_STACK(session->stack);

    if (desc->type != type) {
        OG_THROW_ERROR_EX(ERR_RECYCLEBIN_MISMATCH, "%s type does not match expected type", T2S(&def->name));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * fetch a recycle bin drop table
 * @param kernel session, user, table name, object description(output)
 * @note if there are two object whose origin names are same, we fetch the newest table
 * to flashback.
 */
static status_t rb_fetch_drop_table(knl_session_t *session, text_t *user, text_t *name, knl_rb_desc_t *desc)
{
    knl_cursor_t *cursor = NULL;
    uint32 uid;

    if (!dc_get_user_id(session, user, &uid)) {
        OG_THROW_ERROR(ERR_USER_NOT_EXIST, T2S(user));
        return OG_ERROR;
    }

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);
    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, 0);
    cursor->index_dsc = OG_TRUE;

    knl_init_index_scan(cursor, OG_FALSE);
    knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, 0);
    knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, 0);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, desc);

        if (desc->type == RB_TABLE_OBJECT && desc->oper == RB_OPER_DROP &&
            desc->uid == uid && cm_text_str_equal(name, desc->org_name)) {
            CM_RESTORE_STACK(session->stack);
            return OG_SUCCESS;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);
    OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, T2S(user), T2S_EX(name));
    return OG_ERROR;
}

/*
 * fetch a recycle bin truncate table
 * @param kernel session, kernel dictionary, object description(output)
 * @note if there are two object whose origin names are same, we fetch the newest table
 * to flashback.
 */
static status_t rb_fetch_truncate_table(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    knl_cursor_t *cursor = NULL;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);
    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, IX_SYS_RB001_ID);
    cursor->index_dsc = OG_TRUE;

    knl_init_index_scan(cursor, OG_FALSE);
    knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, IX_COL_SYS_RB001_ID);
    knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, IX_COL_SYS_RB001_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, desc);

        if (desc->type == RB_TABLE_OBJECT && desc->oper == RB_OPER_TRUNCATE &&
            desc->uid == dc->uid && desc->table_id == dc->oid) {
            CM_RESTORE_STACK(session->stack);
            return OG_SUCCESS;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);
    OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
    return OG_ERROR;
}

/*
 * fetch a recycle bin truncate table part
 * @param kernel session, kernel dictionary, object description(output)
 * @note if there are two object whose origin names are same, we fetch the newest table part
 * to flashback.
 */
static status_t rb_fetch_truncate_tabpart(knl_session_t *session, knl_dictionary_t *dc,
                                   table_part_t *table_part, knl_rb_desc_t *desc)
{
    knl_cursor_t *cursor = NULL;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);
    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, IX_SYS_RB001_ID);
    cursor->index_dsc = OG_TRUE;

    knl_init_index_scan(cursor, OG_FALSE);
    knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, IX_COL_SYS_RB001_ID);
    knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, IX_COL_SYS_RB001_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, desc);

        if (desc->type == RB_TABLE_PART_OBJECT && desc->oper == RB_OPER_TRUNCATE &&
            desc->uid == dc->uid && desc->table_id == dc->oid &&
            desc->id == desc->base_id && desc->org_scn == table_part->desc.org_scn &&
            cm_str_equal_ins(desc->part_name, table_part->desc.name)) {
            CM_RESTORE_STACK(session->stack);
            return OG_SUCCESS;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);
    OG_THROW_ERROR(ERR_RECYCLE_PARTITION_NOT_EXIST, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc), table_part->desc.name);
    return OG_ERROR;
}

/*
 * fetch the base object of current object
 * We can use table_id decoded from current object and base_id to construct the
 * base object name, so that we can use the index on obj_name column to scan
 * the related object quickly.
 * @param kernel session, object description(input with output)
 * @notes this may cause recursive function call.
 */
static status_t rb_fetch_base_object(knl_session_t *session, knl_rb_desc_t *desc)
{
    knl_cursor_t *cursor = NULL;

    if (desc->id == desc->base_id) {
        return OG_SUCCESS;
    }

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);
    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, IX_SYS_RB001_ID);
    knl_init_index_scan(cursor, OG_TRUE);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key,
                     OG_TYPE_BIGINT, &desc->base_id, sizeof(uint64), IX_COL_SYS_RB001_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    if (cursor->eof) {
        CM_RESTORE_STACK(session->stack);
        OG_THROW_ERROR(ERR_OBJECT_ID_NOT_EXIST, "recyclebin base object", desc->base_id);
        return OG_ERROR;
    }

    rb_convert_desc(cursor, desc);
    CM_RESTORE_STACK(session->stack);

    return OG_SUCCESS;
}

/*
 * fetch the base object of current object
 * We can use table_id decoded from current object and base_id to construct the
 * base object name, so that we can use the index on obj_name column to scan
 * the related object quickly.
 * @param kernel session, object description(input with output)
 * @notes this may cause recursive function call.
 */
static status_t rb_fetch_purge_object(knl_session_t *session, knl_rb_desc_t *desc)
{
    knl_cursor_t *cursor = NULL;

    if (desc->id == desc->purge_id) {
        if (!desc->can_purge || desc->is_cons) {
            if (rb_fetch_base_object(session, desc) != OG_SUCCESS) {
                return OG_ERROR;
            }
        }
        return OG_SUCCESS;
    }

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);

    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, IX_SYS_RB001_ID);
    knl_init_index_scan(cursor, OG_TRUE);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_BIGINT,
                     &desc->purge_id, sizeof(uint64), IX_COL_SYS_RB001_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    if (cursor->eof) {
        CM_RESTORE_STACK(session->stack);
        OG_THROW_ERROR(ERR_OBJECT_ID_NOT_EXIST, "recyclebin purge object", desc->purge_id);
        return OG_ERROR;
    }

    rb_convert_desc(cursor, desc);
    CM_RESTORE_STACK(session->stack);

    if (!desc->can_purge || desc->is_cons) {
        if (rb_fetch_base_object(session, desc) != OG_SUCCESS) {
            return OG_SUCCESS;
        }
    }

    return OG_SUCCESS;
}

/*
 * fetch a recycle bin object
 * Fetch a recycle bin object to purge, so we should fetch the base object.
 * @param kernel session, object description(output), found(output)
 */
static status_t rb_purge_fetch(knl_session_t *session, knl_rb_desc_t *desc, bool32 *found)
{
    knl_cursor_t *cursor = NULL;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);

    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, OG_INVALID_ID32);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    if (cursor->eof) {
        *found = OG_FALSE;
        CM_RESTORE_STACK(session->stack);
        return OG_SUCCESS;
    }

    *found = OG_TRUE;
    rb_convert_desc(cursor, desc);
    CM_RESTORE_STACK(session->stack);

    if (rb_fetch_base_object(session, desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * fetch a space recycle bin object
 * In fetch space, we use current scn instead of session query scn as cursor query scn
 * 'causing in space auto purging, we want to check if there is any objects in recycle bin
 * now, but we should not change the current session query scn which is being used for
 * visibility judgment.
 * @param kernel session, space id, object description(output), found(output)
 * @note if there are two object whose origin names are same, we fetch the oldest table
 * to purge.
 */
status_t rb_purge_fetch_space(knl_session_t *session, uint32 space_id, knl_rb_desc_t *desc, bool32 *found)
{
    knl_cursor_t *cursor = NULL;
    knl_rb_desc_t temp;
    errno_t ret;

    *found = OG_FALSE;
    desc->rec_scn = OG_INVALID_ID64;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);

    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, IX_SYS_RB003_ID);
    cursor->query_scn = DB_CURR_SCN(session);
    cursor->query_lsn = DB_CURR_LSN(session);

    knl_init_index_scan(cursor, OG_TRUE);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key,
                     OG_TYPE_INTEGER, &space_id, sizeof(uint32), IX_COL_SYS_RB003_SPACE_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, &temp);

        if (temp.rec_scn < desc->rec_scn) {
            ret = memcpy_sp(desc, sizeof(knl_rb_desc_t), &temp, sizeof(knl_rb_desc_t));
            knl_securec_check(ret);
            *found = OG_TRUE;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);

    if (*found) {
        if (rb_fetch_purge_object(session, desc) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

/*
 * delete the specified recycle bin object and its related objects
 * @param kernel session, object description
 */
static status_t rb_purge_delete(knl_session_t *session, knl_rb_desc_t *desc)
{
    dc_context_t *ogx = &session->kernel->dc_ctx;
    knl_rb_desc_t temp;
    knl_cursor_t *cursor = NULL;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);

    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_DELETE, SYS_RB_ID, IX_SYS_RB002_ID);

    if (desc->purge_id == desc->base_id) {
        knl_init_index_scan(cursor, OG_FALSE);
        knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_BIGINT, &desc->base_id,
                         sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
        knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, IX_COL_SYS_RB002_PURGE_ID);
        knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.r_key, OG_TYPE_BIGINT, &desc->base_id,
                         sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
        knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, IX_COL_SYS_RB002_PURGE_ID);
    } else {
        knl_init_index_scan(cursor, OG_TRUE);
        knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_BIGINT, &desc->base_id,
                         sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
        knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_BIGINT, &desc->purge_id,
                         sizeof(uint64), IX_COL_SYS_RB002_PURGE_ID);
    }

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    if (cursor->eof) {
        CM_RESTORE_STACK(session->stack);
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, ogx->users[desc->uid]->desc.name, desc->org_name);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, &temp);

        if (knl_internal_delete(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);

    return OG_SUCCESS;
}

/*
 * fetch truncate recycle bin object
 * @param kernel session, user id, table id, object description, found(output)
 */
static status_t rb_fetch_truncate(knl_session_t *session, uint32 uid, uint32 oid, knl_rb_desc_t *desc, bool32 *found)
{
    knl_cursor_t *cursor = NULL;

    *found = OG_FALSE;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);

    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, OG_INVALID_ID32);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, desc);

        if (desc->oper == RB_OPER_TRUNCATE &&
            desc->uid == uid && desc->table_id == oid &&
            desc->id == desc->base_id) {
            *found = OG_TRUE;
            break;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);

    return OG_SUCCESS;
}

/*
 * fetch a recycle bin truncate table subpart
 * @param kernel session, kernel dictionary, object description(output)
 * @note if there are two object whose origin names are same, we fetch the newest table part
 * to flashback.
 */
static status_t rb_fetch_truncate_subtabpart(knl_session_t *session, knl_dictionary_t *dc, table_part_t *subpart,
    knl_rb_desc_t *desc)
{
    CM_SAVE_STACK(session->stack);

    knl_cursor_t *cursor = knl_push_cursor(session);
    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, IX_SYS_RB001_ID);
    cursor->index_dsc = OG_TRUE;

    knl_init_index_scan(cursor, OG_FALSE);
    knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, IX_COL_SYS_RB001_ID);
    knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, IX_COL_SYS_RB001_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, desc);

        if (desc->type == RB_TABLE_SUBPART_OBJECT && desc->oper == RB_OPER_TRUNCATE &&
            desc->uid == dc->uid && desc->table_id == dc->oid &&
            desc->id == desc->base_id && desc->org_scn == subpart->desc.org_scn &&
            cm_str_equal_ins(desc->part_name, subpart->desc.name)) {
            CM_RESTORE_STACK(session->stack);
            return OG_SUCCESS;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);
    OG_THROW_ERROR(ERR_RECYCLE_PARTITION_NOT_EXIST, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc), subpart->desc.name);
    return OG_ERROR;
}

/*
 * purge truncate object and related objects
 */
static status_t rb_purge_truncate(knl_session_t *session, knl_rb_desc_t *desc)
{
    dc_context_t *ogx = &session->kernel->dc_ctx;
    knl_cursor_t *cursor = NULL;
    status_t status = OG_ERROR;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);

    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_DELETE, SYS_RB_ID, IX_SYS_RB002_ID);
    knl_init_index_scan(cursor, OG_FALSE);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key,
                     OG_TYPE_BIGINT, &desc->base_id, sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
    knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, IX_COL_SYS_RB002_PURGE_ID);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.r_key,
                     OG_TYPE_BIGINT, &desc->base_id, sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
    knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, IX_COL_SYS_RB002_PURGE_ID);
    
    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    if (cursor->eof) {
        CM_RESTORE_STACK(session->stack);
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, ogx->users[desc->uid]->desc.name, desc->org_name);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, desc);

        switch (desc->type) {
            case RB_TABLE_OBJECT:
            case RB_TABLE_PART_OBJECT:
            case RB_TABLE_SUBPART_OBJECT:
                status = heap_purge_prepare(session, desc);
                break;

            case RB_INDEX_OBJECT:
            case RB_INDEX_PART_OBJECT:
            case RB_INDEX_SUBPART_OBJECT:
                status = btree_purge_prepare(session, desc);
                break;

            case RB_LOB_OBJECT:
            case RB_LOB_PART_OBJECT:
            case RB_LOB_SUBPART_OBJECT:
                status = lob_purge_prepare(session, desc);
                break;

            default:
                CM_RESTORE_STACK(session->stack);
                OG_THROW_ERROR(ERR_INVALID_PURGE_TYPE, desc->type);
                return OG_ERROR;
        }

        if (status != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }

        if (knl_internal_delete(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);

    knl_commit(session);

    if (db_garbage_segment_handle(session, desc->uid, desc->table_id, OG_TRUE) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("failed to handle garbage segment");
        cm_spin_lock(&session->kernel->rmon_ctx.mark_mutex, NULL);
        session->kernel->rmon_ctx.delay_clean_segments = OG_TRUE;
        cm_spin_unlock(&session->kernel->rmon_ctx.mark_mutex);
    }

    return OG_SUCCESS;
}

/*
 * purge drop related
 * Truncated objects in recycle bin should be purged when drop table
 * or purge drop table
 * @param kernel session, user id, table id
 */
status_t rb_purge_drop_related(knl_session_t *session, uint32 uid, uint32 oid)
{
    knl_rb_desc_t desc;
    bool32 found = OG_FALSE;

    knl_set_session_scn(session, OG_INVALID_ID64);

    if (rb_fetch_truncate(session, uid, oid, &desc, &found) != OG_SUCCESS) {
        return OG_ERROR;
    }

    while (found) {
        if (rb_purge_truncate(session, &desc) != OG_SUCCESS) {
            return OG_ERROR;
        }

        knl_set_session_scn(session, OG_INVALID_ID64);

        if (rb_fetch_truncate(session, uid, oid, &desc, &found) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

/*
 * purge drop table from recycle bin
 * Causing we are still holding the dc entry, so we can use the db drop table
 * interface directly.
 * @param kernel session, kernel dictionary, object description
 * @note when purge drop table, we should purge truncate object related to current table
 */
static status_t rb_purge_drop_table(knl_session_t *session, knl_rb_desc_t *desc)
{
    dc_context_t *ogx = &session->kernel->dc_ctx;
    knl_dictionary_t dc;

    if (knl_open_dc_by_id(session, desc->uid, desc->table_id, &dc, OG_FALSE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (desc->org_scn != dc.org_scn) {
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, ogx->users[desc->uid]->desc.name, desc->org_name);
        dc_close(&dc);
        return OG_ERROR;
    }

    uint32 timeout = session->kernel->attr.ddl_lock_timeout;
    if (lock_table_directly(session, &dc, timeout) != OG_SUCCESS) {
        dc_close(&dc);
        return OG_ERROR;
    }

    if (rb_purge_drop_related(session, dc.uid, dc.oid) != OG_SUCCESS) {
        unlock_tables_directly(session);
        dc_close(&dc);
        return OG_ERROR;
    }

    if (rb_purge_delete(session, desc) != OG_SUCCESS) {
        unlock_tables_directly(session);
        dc_close(&dc);
        return OG_ERROR;
    }

    if (db_drop_table(session, NULL, &dc, OG_TRUE) != OG_SUCCESS) {
        unlock_tables_directly(session);
        dc_close(&dc);
        return OG_ERROR;
    }

    unlock_tables_directly(session);
    dc_free_entry(session, DC_ENTRY(&dc));
    dc_close(&dc);
    return OG_SUCCESS;
}

/*
 * purge drop index from recycle bin
 * @param kernel session, kernel dictionary, object description
 */
static status_t rb_purge_drop_index(knl_session_t *session, knl_rb_desc_t *desc)
{
    dc_context_t *ogx = &session->kernel->dc_ctx;
    knl_rb_desc_t bo_desc;
    knl_dictionary_t dc;
    knl_index_desc_t index_desc;
    text_t index_name;
    rd_table_t redo;
    index_t *index = NULL;
    errno_t ret;

    ret = memcpy_sp(&bo_desc, sizeof(knl_rb_desc_t), desc, sizeof(knl_rb_desc_t));
    knl_securec_check(ret);
    if (rb_fetch_base_object(session, &bo_desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (knl_open_dc_by_id(session, desc->uid, desc->table_id, &dc, OG_FALSE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bo_desc.org_scn != dc.org_scn) {
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, ogx->users[desc->uid]->desc.name, desc->org_name);
        dc_close(&dc);
        return OG_ERROR;
    }

    uint32 timeout = session->kernel->attr.ddl_lock_timeout;
    if (lock_table_directly(session, &dc, timeout) != OG_SUCCESS) {
        dc_close(&dc);
        return OG_ERROR;
    }

    cm_str2text(desc->name, &index_name);
    if (db_fetch_index_desc(session, desc->uid, &index_name, &index_desc) != OG_SUCCESS) {
        unlock_tables_directly(session);
        dc_close(&dc);
        return OG_ERROR;
    }

    if (rb_purge_delete(session, desc) != OG_SUCCESS) {
        unlock_tables_directly(session);
        dc_close(&dc);
        return OG_ERROR;
    }

    index = dc_find_index_by_id((dc_entity_t *)dc.handle, index_desc.id);
    if (index == NULL) {
        unlock_tables_directly(session);
        dc_close(&dc);
        OG_THROW_ERROR(ERR_OBJECT_ID_NOT_EXIST, "index", index_desc.id);
        return OG_ERROR;
    }
    if (index->desc.is_enforced) {
        unlock_tables_directly(session);
        dc_close(&dc);
        OG_THROW_ERROR(ERR_INDEX_ENFORCEMENT);
        return OG_ERROR;
    }

    if (db_drop_index(session, index, &dc) != OG_SUCCESS) {
        unlock_tables_directly(session);
        dc_close(&dc);
        return OG_ERROR;
    }

    redo.op_type = RD_ALTER_TABLE;
    redo.uid = dc.uid;
    redo.oid = dc.oid;
    log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_table_t), LOG_ENTRY_FLAG_NONE);

    SYNC_POINT(session, "SP_B1_PURGE_INDEX");
    knl_commit(session);

    if (db_garbage_segment_handle(session, dc.uid, dc.oid, OG_FALSE) != OG_SUCCESS) {
        cm_spin_lock(&session->kernel->rmon_ctx.mark_mutex, NULL);
        session->kernel->rmon_ctx.delay_clean_segments = OG_TRUE;
        cm_spin_unlock(&session->kernel->rmon_ctx.mark_mutex);
        OG_LOG_RUN_ERR("failed to handle garbage segment");
    }

    dc_invalidate(session, (dc_entity_t *)dc.handle);
    unlock_tables_directly(session);
    dc_close(&dc);
    return OG_SUCCESS;
}

/*
 * purge table object from recycle bin
 * The table object may be a truncated table or a dropped table
 * @param kernel session, object description
 * @note we should check the origin scn before we lock the table,
 * maybe someone has just purged the table we fetched and then creates
 * a new table which reuses the same table id.
 */
status_t rb_purge_table(knl_session_t *session, knl_rb_desc_t *desc)
{
    switch (desc->oper) {
        case RB_OPER_DROP:
            return rb_purge_drop_table(session, desc);

        case RB_OPER_TRUNCATE:
            return rb_purge_truncate(session, desc);

        default:
            OG_THROW_ERROR(ERR_INVALID_PURGE_OPER, "invalid purge table operation");
            return OG_ERROR;
    }
}

/*
 * purge index object from recycle bin
 * The index object may be a truncated index or a dropped index
 * @param kernel session, object description
 * @note if the index object is a truncated index, it should be truncated
 * with table to recycle bin at the same time. we must fetch the base truncate
 * table object, and purge the table with its related object.
 */
status_t rb_purge_index(knl_session_t *session, knl_rb_desc_t *desc)
{
    switch (desc->oper) {
        case RB_OPER_DROP:
            return rb_purge_drop_index(session, desc);

        case RB_OPER_TRUNCATE:
            OG_THROW_ERROR(ERR_INVALID_PURGE_OPER, "cannot purge index");
            return OG_ERROR;

        default:
            OG_THROW_ERROR(ERR_INVALID_PURGE_OPER, "invalid purge index operation");
            return OG_ERROR;
    }
}

status_t rb_purge_table_part(knl_session_t *session, knl_rb_desc_t *desc)
{
    if (desc->id != desc->purge_id) {
        OG_THROW_ERROR(ERR_INVALID_PURGE_OPER, "cannot purge table or partition");
        return OG_ERROR;
    }

    return rb_purge_truncate(session, desc);
}

/*
 * purge a recycle bin object
 * Using an autonomous session to purge the object, to avoid the
 * purging process effect current process of the session.
 * @param kernel session, origin description
 */
status_t rb_purge(knl_session_t *session, knl_rb_desc_t *desc)
{
    status_t status;

    if (knl_begin_auton_rm(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    switch (desc->type) {
        case RB_TABLE_OBJECT:
            status = rb_purge_table(session, desc);
            break;

        case RB_INDEX_OBJECT:
            status = rb_purge_index(session, desc);
            break;

        case RB_TABLE_PART_OBJECT:
        case RB_TABLE_SUBPART_OBJECT:
            status = rb_purge_table_part(session, desc);
            break;

        default:
            OG_THROW_ERROR(ERR_INVALID_PURGE_TYPE, desc->type);
            status = OG_ERROR;
            break;
    }

    knl_end_auton_rm(session, status);

    return status;
}

/*
 * purge all objects in recycle bin
 */
status_t rb_purge_recyclebin(knl_session_t *session)
{
    knl_rb_desc_t desc;
    bool32 found = OG_FALSE;

    knl_set_session_scn(session, OG_INVALID_ID64);

    if (rb_purge_fetch(session, &desc, &found) != OG_SUCCESS) {
        return OG_ERROR;
    }

    while (found) {
        if (rb_purge(session, &desc) != OG_SUCCESS) {
            return OG_ERROR;
        }

        knl_set_session_scn(session, OG_INVALID_ID64);

        if (rb_purge_fetch(session, &desc, &found) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

/*
 * purge all objects of the specified space in recycle bin
 */
status_t rb_purge_space(knl_session_t *session, uint32 space_id)
{
    knl_rb_desc_t desc;
    bool32 found = OG_FALSE;

    knl_set_session_scn(session, OG_INVALID_ID64);

    if (rb_purge_fetch_space(session, space_id, &desc, &found) != OG_SUCCESS) {
        return OG_ERROR;
    }

    while (found) {
        if (rb_purge(session, &desc) != OG_SUCCESS) {
            return OG_ERROR;
        }

        knl_set_session_scn(session, OG_INVALID_ID64);

        if (rb_purge_fetch_space(session, space_id, &desc, &found) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

static status_t rb_fetch_uid(knl_session_t *session, uint32 uid, knl_rb_desc_t *desc, bool32 *found)
{
    knl_cursor_t *cursor = NULL;
    knl_rb_desc_t temp;
    errno_t ret;

    *found = OG_FALSE;
    desc->rec_scn = OG_INVALID_ID64;

    CM_SAVE_STACK(session->stack)

    cursor = knl_push_cursor(session);
    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_RB_ID, IX_SYS_RB004_ID);
    cursor->query_scn = DB_CURR_SCN(session);
    cursor->query_lsn = DB_CURR_LSN(session);

    knl_init_index_scan(cursor, OG_TRUE);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key,
                     OG_TYPE_INTEGER, &uid, sizeof(uint32), IX_COL_SYS_RB004_USER_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, &temp);

        if (temp.rec_scn < desc->rec_scn) {
            ret = memcpy_sp(desc, sizeof(knl_rb_desc_t), &temp, sizeof(knl_rb_desc_t));
            knl_securec_check(ret);
            *found = OG_TRUE;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);

    if (*found) {
        if (rb_fetch_purge_object(session, desc) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

/*
 * purge all objects of the specified user id in recycle bin
 */
status_t rb_purge_user(knl_session_t *session, uint32 uid)
{
    knl_rb_desc_t desc;
    bool32 found = OG_FALSE;

    knl_set_session_scn(session, OG_INVALID_ID64);

    if (rb_fetch_uid(session, uid, &desc, &found) != OG_SUCCESS) {
        return OG_ERROR;
    }

    while (found) {
        if (rb_purge(session, &desc) != OG_SUCCESS) {
            return OG_ERROR;
        }

        knl_set_session_scn(session, OG_INVALID_ID64);

        if (rb_fetch_uid(session, uid, &desc, &found) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

/*
 * restore drop table from recycle bin
 * @param kernel session, object description, new name
 * @note if we specified a new name, we should rename the index either
 */
static status_t rb_restore_drop(knl_session_t *session, knl_rb_desc_t *desc, text_t *new_name)
{
    dc_context_t *ogx = &session->kernel->dc_ctx;
    knl_rb_desc_t temp;
    text_t temp_name;
    knl_cursor_t *cursor = NULL;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);

    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_DELETE, SYS_RB_ID, IX_SYS_RB002_ID);
    knl_init_index_scan(cursor, OG_FALSE);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key,
                     OG_TYPE_BIGINT, &desc->base_id, sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
    knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, IX_COL_SYS_RB002_PURGE_ID);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.r_key,
                     OG_TYPE_BIGINT, &desc->base_id, sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
    knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, IX_COL_SYS_RB002_PURGE_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    if (cursor->eof) {
        CM_RESTORE_STACK(session->stack);
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, ogx->users[desc->uid]->desc.name, desc->org_name);
        return OG_ERROR;
    }

    while (!cursor->eof) {
        rb_convert_desc(cursor, &temp);

        switch (temp.type) {
            case RB_TABLE_OBJECT:
                if (new_name->str == NULL) {
                    cm_str2text(temp.org_name, &temp_name);
                } else {
                    temp_name = *new_name;
                }

                if (db_update_table_name(session, temp.uid, temp.name, &temp_name, OG_FALSE) != OG_SUCCESS) {
                    CM_RESTORE_STACK(session->stack);
                    OG_THROW_ERROR(ERR_OBJECT_EXISTS, "table", T2S(&temp_name));
                    return OG_ERROR;
                }
                break;

            case RB_INDEX_OBJECT:
                if (new_name->str == NULL) {
                    cm_str2text(temp.org_name, &temp_name);
                    if (db_update_index_name(session, temp.uid, temp.table_id, temp.name, &temp_name) != OG_SUCCESS) {
                        CM_RESTORE_STACK(session->stack);
                        OG_THROW_ERROR(ERR_OBJECT_EXISTS, "index", T2S(&temp_name));
                        return OG_ERROR;
                    }
                }
                break;

            default:
                OG_THROW_ERROR(ERR_INVALID_FLASHBACK_TYPE, temp.type);
                CM_RESTORE_STACK(session->stack);
                return OG_ERROR;
        }

        if (knl_internal_delete(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    }

    CM_RESTORE_STACK(session->stack);

    return OG_SUCCESS;
}

/*
 * flashback drop table from recycle bin
 * Restore table and all its related drop object from recycle bin, restore
 * the dc entry, so user can see it from dc interface.
 * @param kernel session, flashback option
 * @note we should check the origin scn before we lock the table,
 * maybe someone has just purged the table we fetched and then creates
 * a new table which reuses the same table id.
 */
status_t rb_flashback_drop_table(knl_session_t *session, knl_flashback_def_t *def)
{
    dc_context_t *ogx = &session->kernel->dc_ctx;
    knl_rb_desc_t desc;
    knl_dictionary_t dc;
    rd_flashback_drop_t rd;

    if (rb_fetch_drop_table(session, &def->owner, &def->name, &desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (knl_open_dc_by_id(session, desc.uid, desc.table_id, &dc, OG_FALSE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (dc.type == DICT_TYPE_TABLE_EXTERNAL) {
        dc_close(&dc);
        OG_THROW_ERROR(ERR_OPERATIONS_NOT_SUPPORT, "flashback table", "external organized table");
        return OG_ERROR;
    }
    
    if (desc.org_scn != dc.org_scn) {
        dc_close(&dc);
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, ogx->users[desc.uid]->desc.name, desc.org_name);
        return OG_ERROR;
    }

    uint32 timeout = session->kernel->attr.ddl_lock_timeout;
    if (lock_table_directly(session, &dc, timeout) != OG_SUCCESS) {
        dc_close(&dc);
        return OG_ERROR;
    }

    if (rb_restore_drop(session, &desc, &def->ext_name) != OG_SUCCESS) {
        unlock_tables_directly(session);
        dc_close(&dc);
        return OG_ERROR;
    }

    if (def->ext_name.str == NULL) {
        cm_str2text(desc.org_name, &def->ext_name);
    }

    dc_entity_t *entity = DC_ENTITY(&dc);
    if (!dc_restore(session, entity, &def->ext_name)) {
        unlock_tables_directly(session);
        dc_close(&dc);
        OG_THROW_ERROR(ERR_DUPLICATE_TABLE, T2S(&def->owner), T2S_EX(&def->ext_name));
        return OG_ERROR;
    }

    rd.op_type = RD_FLASHBACK_DROP;
    rd.uid = desc.uid;
    rd.table_id = desc.table_id;
    (void)cm_text2str(&def->ext_name, rd.new_name, OG_NAME_BUFFER_SIZE);
    log_put(session, RD_LOGIC_OPERATION, &rd, sizeof(rd_flashback_drop_t), LOG_ENTRY_FLAG_NONE);

    knl_commit(session);
    dc_invalidate(session, entity);
    unlock_tables_directly(session);
    dc_close(&dc);

    return OG_SUCCESS;
}

void rd_flashback_drop_table(knl_session_t *session, log_entry_t *log)
{
    if (log->size != CM_ALIGN4(sizeof(rd_flashback_drop_t)) + LOG_ENTRY_SIZE) {
        OG_LOG_RUN_ERR("[DC] no need to replay drop table, log size %u is wrong", log->size);
        return;
    }
    rd_flashback_drop_t *rd = (rd_flashback_drop_t *)log->data;
    rd->new_name[OG_NAME_BUFFER_SIZE - 1] = 0;
    knl_dictionary_t dc;
    text_t new_name;

    if (rd->table_id >= DC_GROUP_SIZE * DC_GROUP_COUNT) {
        OG_LOG_RUN_ERR("[RB] no need to replay drop table, invalid table id %u", rd->table_id);
        return;
    }
    if (knl_open_dc_by_id(session, rd->uid, rd->table_id, &dc, OG_FALSE) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RB] failed to replay flashback drop table id %u", rd->table_id);
        rd_check_dc_replay_err(session);
        return;
    }

    cm_str2text(rd->new_name, &new_name);
    (void)dc_restore(session, DC_ENTITY(&dc), &new_name);
    dc_invalidate(session, DC_ENTITY(&dc));
    dc_close(&dc);
}

void print_flashback_drop_table(log_entry_t *log)
{
    rd_flashback_drop_t *rd = (rd_flashback_drop_t *)log->data;
    printf("flashback drop uid:%u,oid:%u,name:%s\n", rd->uid, rd->table_id, rd->new_name);
}
/*
 * restore truncate table segment from recycle bin
 * This interface is simple, but reserved for future design
 * @param kernel session, kernel dictionary, object description
 */
static inline status_t rb_restore_table_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    table_t *table = DC_TABLE(dc);

    if (db_update_table_entry(session, &table->desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * restore truncate table part segment from recycle bin
 * This interface is simple, but reserved for future design
 * @param kernel session, kernel dictionary, object description
 */
static status_t rb_restore_table_part_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    table_t *table;
    table_part_t *table_part = NULL;
    text_t name;

    table = DC_TABLE(dc);
    knl_panic_log(IS_PART_TABLE(table), "current table is not part table, panic info: table %s rb_table %s",
                  table->desc.name, desc->name);
    cm_str2text(desc->part_name, &name);

    if (!part_table_find_by_name(table->part_table, &name, &table_part)) {
        OG_THROW_ERROR(ERR_PARTITION_NOT_EXIST, "table", desc->part_name);
        return OG_ERROR;
    }

    /* for parent part, there is not need to update entry */
    if (IS_PARENT_TABPART(&table_part->desc)) {
        return OG_SUCCESS;
    }

    if (db_update_table_part_entry(session, &table_part->desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * restore truncate table subpart segment from recycle bin
 * @param kernel session, kernel dictionary, object description
 */
static status_t rb_restore_tabsubpart_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    table_t *table = DC_TABLE(dc);
    text_t name;

    knl_panic_log(IS_PART_TABLE(table) && IS_COMPART_TABLE(table->part_table), "current table is not part table or "
        "the part_table is not compart_table, panic info: table %s rb_table %s", table->desc.name, desc->name);
    cm_str2text(desc->part_name, &name);

    table_part_t *compart = NULL;
    table_part_t *subpart = NULL;
    if (!subpart_table_find_by_name(table->part_table, &name, &compart, &subpart)) {
        OG_THROW_ERROR(ERR_PARTITION_NOT_EXIST, "table", desc->part_name);
        return OG_ERROR;
    }

    if (db_update_subtabpart_entry(session, &subpart->desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * restore truncate index segment from recycle bin
 * The index id can be decoded from origin name in recycle bin.
 * So we can find the index we need to restore directly.
 * @param kernel session, kernel dictionary, object description
 */
static status_t rb_restore_index_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    table_t *table = NULL;
    index_t *index;
    bool32 is_changed = OG_FALSE;
    errno_t ret;
    knl_index_desc_t index_desc;

    index = dc_find_index_by_scn((dc_entity_t *)dc->handle, desc->org_scn);
    if (index == NULL) {
        OG_THROW_ERROR_EX(ERR_RECYCLEBIN_MISMATCH, "index %s does not match flashback table %s.%s index",
                          desc->org_name, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    table = DC_TABLE(dc);

    /*
     * global_index_invalid is for compatible with old version in these cases:
     * case 1 : index must was invalid if table can be truncated into recyclebin
     *     but recyclebin index entry is invalid
     * case 2 : global index is invalid after truncate partition,
     *     flashback to before truncate table should reset index to valid
     */
    bool32 global_index_invalid = (IS_PART_TABLE(table) && !IS_PART_INDEX(index) && IS_INVALID_PAGID(desc->entry));
    if (desc->is_invalid || global_index_invalid) {
        if (db_update_index_status(session, index, OG_TRUE, &is_changed) != OG_SUCCESS) {
            return OG_ERROR;
        }
    } else {
        if (db_update_index_status(session, index, OG_FALSE, &is_changed) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    /*
     * since rebuild index may change the index space id, when restore,we have to set the space id to the
     * original one. Could not update the desc->space_id directly since it will change dictionary cache
     */
    ret = memcpy_sp(&index_desc, sizeof(knl_index_desc_t), &index->desc, sizeof(knl_index_desc_t));
    knl_securec_check(ret);
    index_desc.space_id = desc->space_id;
    if (db_update_index_entry(session, &index_desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * restore truncate index part segment from recycle bin
 * The index id can be decoded from origin name in recycle bin.
 * So we can find the index we need to restore directly.
 * @param kernel session, kernel dictionary, object description
 */
static status_t rb_restore_index_part_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    table_t *table;
    index_t *index = NULL;
    index_part_t *index_part = NULL;
    uint32 index_id;
    uint32 part_no;
    bool32 is_changed = OG_FALSE;

    table = DC_TABLE(dc);
    knl_panic_log(IS_PART_TABLE(table), "current table is not part table, panic info: table %s rb_table %s",
                  table->desc.name, desc->name);
    errno_t ret = sscanf_s(desc->part_name, "INDEX%uP%u", &index_id, &part_no);
    knl_securec_check_ss(ret);
    knl_panic_log(part_no < table->part_table->desc.partcnt + OG_SPLIT_PART_COUNT,
                  "current part_no is abnormal, panic info: table %s rb_table %s part_no %u partcnt %u",
                  table->desc.name, desc->name, part_no, table->part_table->desc.partcnt);

    index = dc_find_index_by_id((dc_entity_t *)dc->handle, index_id);
    if (index == NULL) {
        OG_THROW_ERROR_EX(ERR_RECYCLEBIN_MISMATCH, "index %s part_no %u does not match flashback table %s.%s index",
                          desc->org_name, part_no, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    knl_panic_log(IS_PART_INDEX(index), "current index is not part index, panic info: table %s rb_table %s index %s",
                  table->desc.name, desc->name, index->desc.name);

    index_part = INDEX_GET_PART(index, part_no);
    if (desc->is_invalid) {
        if (db_update_idxpart_status(session, index_part, OG_TRUE, &is_changed) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    /* for parent part, there is not need to update entry */
    if (IS_PARENT_IDXPART(&index_part->desc)) {
        return OG_SUCCESS;
    }
    index_part->desc.space_id = desc->space_id;
    if (db_upd_idx_part_entry(session, &index_part->desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
* restore truncate index subpart segment from recycle bin
* The index id, compart_no and subpart_no can be decoded from origin name in recycle bin.
* So we can find the index we need to restore directly.
* @param kernel session, kernel dictionary, object description
*/
static status_t rb_restore_idxsubpart_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    uint32 idx_id;
    uint32 compart_no;
    uint32 part_no;
    table_t *table = DC_TABLE(dc);
    dc_entity_t *entity = DC_ENTITY(dc);

    knl_panic_log(IS_PART_TABLE(table) && IS_COMPART_TABLE(table->part_table), "current table is not part table or "
        "the part_table is not compart_table, panic info: table %s rb_table %s", entity->table.desc.name, desc->name);
    errno_t ret = sscanf_s(desc->part_name, "INDEX%uP%uSUBP%u", &idx_id, &compart_no, &part_no);
    knl_securec_check_ss(ret);
    
    index_t *index = dc_find_index_by_id(entity, idx_id);
    if (index == NULL) {
        OG_THROW_ERROR_EX(ERR_RECYCLEBIN_MISMATCH, "index %s does not match flashback table %s.%s index",
            desc->org_name, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    knl_panic_log(IS_PART_INDEX(index) && IS_COMPART_INDEX(index->part_index), "current index is not part index or "
                  "the part_index is not compart_index, panic info: table %s rb_table %s index %s",
                  entity->table.desc.name, desc->name, index->desc.name);
    index_part_t *index_compart = INDEX_GET_PART(index, compart_no);
    index_part_t *index_subpart = PART_GET_SUBENTITY(index->part_index, index_compart->subparts[part_no]);
    if (desc->is_invalid) {
        bool32 is_changed = OG_FALSE;
        if (db_update_sub_idxpart_status(session, index_subpart, OG_TRUE, &is_changed) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    index_subpart->desc.space_id = desc->space_id;
    if (db_upd_sub_idx_part_entry(session, &index_subpart->desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * restore a truncate lob segment from recycle bin
 * The lob column id can be decoded from origin name in recycle bin.
 * So we can find the lob we need to restore directly.
 * @param kernel session, kernel dictionary, object description
 */
static status_t rb_restore_lob_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    dc_entity_t *entity;
    lob_t *lob = NULL;
    knl_column_t *column = NULL;
    uint32 table_id;
    uint32 col_id;
    errno_t ret;

    entity = DC_ENTITY(dc);
    ret = sscanf_s(desc->org_name, "LOB%uC%u", &table_id, &col_id);
    knl_securec_check_ss(ret);
    knl_panic_log(desc->table_id == table_id, "rb's table_id is not equal table_id, panic info: table %s table_id %u "
                  "rb_table %s rb_table_id %u", entity->table.desc.name, table_id, desc->name, desc->table_id);

    column = dc_get_column(entity, col_id);
    if (!COLUMN_IS_LOB(column)) {
        OG_THROW_ERROR_EX(ERR_RECYCLEBIN_MISMATCH, "lob column id %u does not match table %s.%s column id",
                          col_id, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    lob = (lob_t *)column->lob;
    if (db_update_lob_entry(session, &lob->desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * restore a truncate lob part segment from recycle bin
 * The lob column id can be decoded from origin name in recycle bin.
 * So we can find the lob we need to restore directly.
 * @param kernel session, kernel dictionary, object description
 */
static status_t rb_restore_lob_part_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    dc_entity_t *entity;
    table_t *table;
    lob_t *lob = NULL;
    lob_part_t *lob_part = NULL;
    knl_column_t *column = NULL;
    uint32 table_id;
    uint32 col_id;
    uint32 part_no;
    errno_t ret;

    entity = DC_ENTITY(dc);
    table = &entity->table;
    knl_panic_log(IS_PART_TABLE(table), "current table is not part_table, panic info: table %s rb_table %s",
                  table->desc.name, desc->name);
    ret = sscanf_s(desc->part_name, "LOB%uC%uP%u", &table_id, &col_id, &part_no);
    knl_securec_check_ss(ret);
    knl_panic_log(desc->table_id == table_id, "rb's table_id is not equal table_id, panic info: table %s table_id %u "
                  "rb_table %s rb_table_id %u", entity->table.desc.name, table_id, desc->name, desc->table_id);
    knl_panic_log(part_no < table->part_table->desc.partcnt + OG_SPLIT_PART_COUNT,
                  "current part_no is abnormal, panic info: table %s rb_table %s part_no %u partcnt %u",
                  table->desc.name, desc->name, part_no, table->part_table->desc.partcnt);

    column = dc_get_column(entity, col_id);
    if (!COLUMN_IS_LOB(column)) {
        OG_THROW_ERROR_EX(ERR_RECYCLEBIN_MISMATCH, "lob id %u partition no %u does not match table %s.%s column id",
                          col_id, part_no, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    lob = (lob_t *)column->lob;
    lob_part = LOB_GET_PART(lob, part_no);
    /* for parent part, there is not need to update entry */
    if (IS_PARENT_LOBPART(&lob_part->desc)) {
        return OG_SUCCESS;
    }

    if (db_update_lob_part_entry(session, &lob_part->desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
* restore a truncate sublob part segment from recycle bin
* The lob column id, compart_no and subpart_no can be decoded from origin name in recycle bin.
* So we can find the lob we need to restore directly.
* @param kernel session, kernel dictionary, object description
*/
static status_t rb_restore_lobsubpart_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    dc_entity_t *entity = DC_ENTITY(dc);
    table_t *table = &entity->table;
    uint32 table_id;
    uint32 col_id;
    uint32 compart_no;
    uint32 part_no;

    knl_panic_log(IS_PART_TABLE(table) && IS_COMPART_TABLE(table->part_table), "current table is not part table or "
        "the part_table is not compart table, panic info: table %s rb_table %s", table->desc.name, desc->name);
    errno_t ret = sscanf_s(desc->part_name, "LOB%uC%uP%uSUBP%u", &table_id, &col_id, &compart_no, &part_no);
    knl_securec_check_ss(ret);
    knl_panic_log(desc->table_id == table_id, "rb's table_id is not equal table_id, panic info: table %s table_id %u "
                  "rb_table %s rb_table_id %u", entity->table.desc.name, table_id, desc->name, desc->table_id);
    knl_panic_log(compart_no < table->part_table->desc.partcnt + OG_SPLIT_PART_COUNT,
                  "current compart_no is abnormal, panic info: table %s rb_table %s compart_no %u partcnt %u",
                  table->desc.name, desc->name, compart_no, table->part_table->desc.partcnt);

    knl_column_t *column = dc_get_column(entity, col_id);
    if (!COLUMN_IS_LOB(column)) {
        OG_THROW_ERROR_EX(ERR_RECYCLEBIN_MISMATCH, "lob id %u does not match table %s.%s column id",
            col_id, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    lob_t *lob = column->lob;
    lob_part_t *lob_compart = LOB_GET_PART(lob, compart_no);
    lob_part_t *lob_subpart = PART_GET_SUBENTITY(lob->part_lob, lob_compart->subparts[part_no]);
    if (db_update_sublobpart_entry(session, &lob_subpart->desc, desc->entry) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

static status_t rb_restore_object_segment(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    status_t status = OG_SUCCESS;
    
    switch (desc->type) {
        case RB_TABLE_OBJECT:
            status = rb_restore_table_segment(session, dc, desc);
            break;

        case RB_INDEX_OBJECT:
            status = rb_restore_index_segment(session, dc, desc);
            break;

        case RB_LOB_OBJECT:
            status = rb_restore_lob_segment(session, dc, desc);
            break;

        case RB_TABLE_PART_OBJECT:
            status = rb_restore_table_part_segment(session, dc, desc);
            break;

        case RB_INDEX_PART_OBJECT:
            status = rb_restore_index_part_segment(session, dc, desc);
            break;

        case RB_LOB_PART_OBJECT:
            status = rb_restore_lob_part_segment(session, dc, desc);
            break;

        case RB_TABLE_SUBPART_OBJECT:
            status = rb_restore_tabsubpart_segment(session, dc, desc);
            break;

        case RB_INDEX_SUBPART_OBJECT:
            status = rb_restore_idxsubpart_segment(session, dc, desc);
            break;

        case RB_LOB_SUBPART_OBJECT:
            status = rb_restore_lobsubpart_segment(session, dc, desc);
            break;

        default:
            OG_THROW_ERROR(ERR_INVALID_FLASHBACK_TYPE, desc->type);
            return OG_ERROR;
    }
    
    return status;
}

/*
 * restore truncate table and its related object from recycle bin
 * @param kernel session, kernel dictionary, object description
 * @note here we use db drop table segments interface to release segments we
 * are holding. In case we are releasing segments, crash happens, the transaction
 * roll backed, we have released some segments, we can use flashback table to
 * before truncate again to recover, the segments we released would not be released
 * again.
 */
static status_t rb_restore_truncate(knl_session_t *session, knl_dictionary_t *dc, knl_rb_desc_t *desc)
{
    dc_context_t *ogx = &session->kernel->dc_ctx;
    knl_cursor_t *cursor = NULL;

    CM_SAVE_STACK(session->stack);

    cursor = knl_push_cursor(session);

    knl_open_sys_cursor(session, cursor, CURSOR_ACTION_DELETE, SYS_RB_ID, IX_SYS_RB002_ID);
    knl_init_index_scan(cursor, OG_FALSE);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_BIGINT,
                     &desc->base_id, sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
    knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, IX_COL_SYS_RB002_PURGE_ID);
    knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.r_key, OG_TYPE_BIGINT,
                     &desc->base_id, sizeof(uint64), IX_COL_SYS_RB002_BASE_ID);
    knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, IX_COL_SYS_RB002_PURGE_ID);

    if (knl_fetch(session, cursor) != OG_SUCCESS) {
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    if (cursor->eof) {
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, ogx->users[desc->uid]->desc.name, desc->org_name);
        CM_RESTORE_STACK(session->stack);
        return OG_ERROR;
    }

    do {
        rb_convert_desc(cursor, desc);

        if (rb_restore_object_segment(session, dc, desc) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }

        if (knl_internal_delete(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }

        if (knl_fetch(session, cursor) != OG_SUCCESS) {
            CM_RESTORE_STACK(session->stack);
            return OG_ERROR;
        }
    } while (!cursor->eof);

    CM_RESTORE_STACK(session->stack);

    return OG_SUCCESS;
}

/*
 * flashback truncate table interface
 * flashback table to before truncate, before flashback, we should
 * check the table change scn, if table definition has changed, we are
 * not allowed to do flashback truncate.
 * @param kernel session, dictionary, force mode
 * @note force mode is that, maybe there are data in current table, it's
 * necessary to notice user to decide whether to do flashback truncate 'causing
 * once we flashback table to before truncate, current segments would converted
 * by segments in recycle bin.
 */
status_t rb_flashback_truncate_table(knl_session_t *session, knl_dictionary_t *dc, bool32 is_force)
{
    knl_rb_desc_t desc;
    rd_table_t redo;

    if (rb_fetch_truncate_table(session, dc, &desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (desc.tchg_scn != dc->chg_scn) {
        OG_THROW_ERROR(ERR_DEF_CHANGED, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    if (db_table_has_segment(session, dc)) {
        if (!is_force) {
            OG_THROW_ERROR(ERR_TABLE_NOT_EMPTY, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
            return OG_ERROR;
        }

        if (rb_truncate_table(session, dc) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    if (rb_restore_truncate(session, dc, &desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    redo.op_type = RD_ALTER_TABLE;
    redo.uid = dc->uid;
    redo.oid = dc->oid;
    log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_table_t), LOG_ENTRY_FLAG_NONE);
    knl_commit(session);
    dc_invalidate(session, DC_ENTITY(dc));
    return OG_SUCCESS;
}

static status_t rb_prepare_invalid_indexes(knl_session_t *session, table_t *table)
{
    index_t *index = NULL;
    bool32 is_changed = OG_FALSE;

    for (uint32 i = 0; i < table->index_set.total_count; i++) {
        index = table->index_set.items[i];

        if (!IS_PART_INDEX(index)) {
            if (db_update_index_status(session, index, OG_TRUE, &is_changed) != OG_SUCCESS) {
                return OG_ERROR;
            }
            if (btree_segment_prepare(session, index, OG_FALSE, BTREE_DROP_SEGMENT) != OG_SUCCESS) {
                return OG_ERROR;
            }
        }
    }
    return OG_SUCCESS;
}

static status_t rb_flashback_idx_chk(table_t *table)
{
    uint32 valid_count = table->index_set.total_count;

    for (uint32 i = 0; i < valid_count; i++) {
        index_t *index = table->index_set.items[i];
        if (index == NULL) {
            continue;
        }

        if (IS_PART_INDEX(index)) {
            continue;
        }

        if (IS_UNIQUE_PRIMARY_INDEX(index)) {
            OG_THROW_ERROR(ERR_OPERATIONS_NOT_ALLOW, "flashback force when there is unique index");
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}

static status_t rb_flashback_partition_check(table_t *table, knl_dictionary_t *dc, text_t *partition_name,
                                        table_part_t **table_part)
{
    if (!IS_PART_TABLE(table)) {
        OG_THROW_ERROR(ERR_OPERATIONS_NOT_SUPPORT, "flashback table partition", DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    if (!part_table_find_by_name(table->part_table, partition_name, table_part)) {
        OG_THROW_ERROR(ERR_PARTITION_NOT_EXIST, "table", T2S(partition_name));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

/*
 * flashback table part interface
 * flashback table part to before truncate, before flashback, we should
 * check the table change scn, if table definition has changed, we are
 * not allowed to do flashback truncate.
 * @param kernel session, dictionary, part name, force mode
 */
status_t rb_flashback_truncate_tabpart(knl_session_t *session, knl_dictionary_t *dc,
                                       text_t *part_name, bool32 is_force)
{
    knl_rb_desc_t desc;
    table_t *table = DC_TABLE(dc);
    table_part_t *table_part = NULL;
    rd_table_t redo;

    if (rb_flashback_partition_check(table, dc, part_name, &table_part) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rb_fetch_truncate_tabpart(session, dc, table_part, &desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (desc.tchg_scn != dc->chg_scn) {
        OG_THROW_ERROR(ERR_DEF_CHANGED, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    if (db_tabpart_has_segment(table->part_table, table_part)) {
        if (!is_force) {
            OG_THROW_ERROR(ERR_TABLE_NOT_EMPTY, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
            return OG_ERROR;
        }

        if (rb_flashback_idx_chk(table) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (rb_truncate_table_part(session, dc, table_part) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    if (rb_restore_truncate(session, dc, &desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rb_prepare_invalid_indexes(session, table) != OG_SUCCESS) {
        return OG_ERROR;
    }

    redo.op_type = RD_ALTER_TABLE;
    redo.uid = dc->uid;
    redo.oid = dc->oid;
    log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_table_t), LOG_ENTRY_FLAG_NONE);
    knl_commit(session);
    if (db_garbage_segment_handle(session, dc->uid, dc->oid, OG_FALSE) != OG_SUCCESS) {
        cm_spin_lock(&session->kernel->rmon_ctx.mark_mutex, NULL);
        session->kernel->rmon_ctx.delay_clean_segments = OG_TRUE;
        cm_spin_unlock(&session->kernel->rmon_ctx.mark_mutex);
        OG_LOG_RUN_ERR("failed to handle garbage segment");
    }
    dc_invalidate(session, DC_ENTITY(dc));
    return OG_SUCCESS;
}

/*
* flashback table subpart interface
* flashback table subpart to before truncate. before flashback, we should
* check the table change scn, if table definition has changed, we are
* not allowed to do flashback truncate.
* @param kernel session, dictionary, part name, force mode
*/
status_t rb_flashback_truncate_tabsubpart(knl_session_t *session, knl_dictionary_t *dc, text_t *part_name,
    bool32 is_force)
{
    table_t *table = DC_TABLE(dc);

    table_part_t *compart = NULL;
    table_part_t *subpart = NULL;
    if (!subpart_table_find_by_name(table->part_table, part_name, &compart, &subpart)) {
        OG_THROW_ERROR(ERR_PARTITION_NOT_EXIST, "table", T2S(part_name));
        return OG_ERROR;
    }

    knl_rb_desc_t desc = { 0 };
    if (rb_fetch_truncate_subtabpart(session, dc, subpart, &desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (desc.tchg_scn != dc->chg_scn) {
        OG_THROW_ERROR(ERR_DEF_CHANGED, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
        return OG_ERROR;
    }

    if (subpart->heap.segment != NULL) {
        if (!is_force) {
            OG_THROW_ERROR(ERR_TABLE_NOT_EMPTY, DC_ENTRY_USER_NAME(dc), DC_ENTRY_NAME(dc));
            return OG_ERROR;
        }

        if (rb_flashback_idx_chk(table) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (rb_truncate_table_subpart(session, dc, subpart, compart->part_no) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    if (rb_restore_truncate(session, dc, &desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    rd_table_t redo;
    if (rb_prepare_invalid_indexes(session, table) != OG_SUCCESS) {
        return OG_ERROR;
    }

    redo.op_type = RD_ALTER_TABLE;
    redo.uid = dc->uid;
    redo.oid = dc->oid;
    log_put(session, RD_LOGIC_OPERATION, &redo, sizeof(rd_table_t), LOG_ENTRY_FLAG_NONE);
    knl_commit(session);
    if (db_garbage_segment_handle(session, dc->uid, dc->oid, OG_FALSE) != OG_SUCCESS) {
        cm_spin_lock(&session->kernel->rmon_ctx.mark_mutex, NULL);
        session->kernel->rmon_ctx.delay_clean_segments = OG_TRUE;
        cm_spin_unlock(&session->kernel->rmon_ctx.mark_mutex);
        OG_LOG_RUN_ERR("failed to handle garbage segment");
    }
    dc_invalidate(session, DC_ENTITY(dc));
    return OG_SUCCESS;
}

static status_t rb_fetch_index_by_name(knl_session_t *session, knl_purge_def_t *def, knl_rb_desc_t *desc)
{
    uint32 table_id = OG_INVALID_ID32;
    
    if (!CM_IS_EMPTY(&def->ext_name)) {
        text_t name = def->name;
        def->name = def->ext_name;
        if (rb_purge_fetch_name(session, def, RB_TABLE_OBJECT, desc) != OG_SUCCESS) {
            return OG_ERROR;
        }
        def->name = name;
        table_id = desc->table_id;
    }

    if (rb_purge_fetch_name(session, def, RB_INDEX_OBJECT, desc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if ((table_id != OG_INVALID_ID32) && (table_id != desc->table_id)) {
        OG_THROW_ERROR(ERR_RECYCLE_OBJ_NOT_EXIST, T2S(&def->owner), T2S_EX(&def->name));
        return OG_ERROR;
    }

    return OG_SUCCESS;
}

status_t db_purge(knl_session_t *session, knl_purge_def_t *def)
{
    knl_rb_desc_t desc;
    uint32 space_id;

    switch (def->type) {
        case PURGE_TABLE:
            if (rb_purge_fetch_name(session, def, RB_TABLE_OBJECT, &desc) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return rb_purge_table(session, &desc);

        case PURGE_TABLE_OBJECT:
            if (rb_purge_fetch_object(session, def, RB_TABLE_OBJECT, &desc) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return rb_purge_table(session, &desc);

        case PURGE_INDEX:
            if (rb_fetch_index_by_name(session, def, &desc) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return rb_purge_index(session, &desc);

        case PURGE_INDEX_OBJECT:
            if (rb_purge_fetch_object(session, def, RB_INDEX_OBJECT, &desc) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return rb_purge_index(session, &desc);

        case PURGE_PART:
            if (rb_purge_fetch_name(session, def, RB_TABLE_PART_OBJECT, &desc) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return rb_purge_table_part(session, &desc);

        case PURGE_PART_OBJECT:
            if (rb_purge_fetch_object(session, def, RB_TABLE_PART_OBJECT, &desc) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return rb_purge_table_part(session, &desc);

        case PURGE_TABLESPACE:
            if (spc_get_space_id(session, &def->name, OG_FALSE, &space_id) != OG_SUCCESS) {
                return OG_ERROR;
            }
            if (spc_check_by_uid(session, &def->name, space_id, session->uid) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return rb_purge_space(session, space_id);

        case PURGE_RECYCLEBIN:
            return rb_purge_recyclebin(session);

        default:
            OG_THROW_ERROR(ERR_INVALID_PURGE_TYPE, def->type);
            return OG_ERROR;
    }
}