* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* knl_undo.c
*
*
* IDENTIFICATION
* src/kernel/xact/knl_undo.c
*
* -------------------------------------------------------------------------
*/
#include "knl_xact_module.h"
#include "knl_undo.h"
#include "knl_buffer_access.h"
#include "knl_context.h"
#include "knl_space_manage.h"
#include "dtc_database.h"
undo_type_descriptor_t g_undo_type_desc[] = {
{ UNDO_HEAP_INSERT, "heap_insert" },
{ UNDO_HEAP_DELETE, "heap_delete" },
{ UNDO_HEAP_COMPACT_DELETE, "heap_compact_delete" },
{ UNDO_HEAP_UPDATE, "heap_update" },
{ UNDO_HEAP_UPDATE_FULL, "heap_update_full" },
{ UNDO_BTREE_INSERT, "btree_insert" },
{ UNDO_BTREE_DELETE, "btree_delete" },
{ UNDO_CREATE_INDEX, "create_index" },
{ UNDO_LOB_INSERT, "lob_insert" },
{ UNDO_LOB_DELETE_COMMIT, "lob_delete_commit" },
{ UNDO_TEMP_HEAP_INSERT, "temp_heap_insert" },
{ UNDO_TEMP_HEAP_BINSERT, "temp_heap_batch_insert" },
{ UNDO_TEMP_HEAP_DELETE, "temp_heap_delete" },
{ UNDO_TEMP_HEAP_UPDATE, "temp_heap_delete" },
{ UNDO_TEMP_HEAP_UPDATE_FULL, "temp_heap_update_full" },
{ UNDO_TEMP_BTREE_INSERT, "temp_btree_insert" },
{ UNDO_TEMP_BTREE_DELETE, "temp_btree_delete" },
{ UNDO_TEMP_BTREE_BINSERT, "temp_btree_batch_insert" },
{ UNDO_LOB_DELETE, "lob_delete" },
{ UNDO_HEAP_INSERT_MIGR, "heap_insert_migr" },
{ UNDO_HEAP_UPDATE_LINKRID, "heap_update_linkrid" },
{ UNDO_HEAP_DELETE_MIGR, "heap_delete_migr" },
{ UNDO_HEAP_DELETE_ORG, "heap_delete_org" },
{ UNDO_HEAP_COMPACT_DELETE_ORG, "heap_compact_delete_org" },
{ UNDO_PCRH_ITL, "pcrh_itl" },
{ UNDO_PCRH_INSERT, "pcrh_insert" },
{ UNDO_PCRH_DELETE, "pcrh_delete" },
{ UNDO_PCRH_COMPACT_DELETE, "pcrh_compact_delete" },
{ UNDO_PCRH_UPDATE, "pcrh_update" },
{ UNDO_PCRH_UPDATE_FULL, "pcrh_update_full" },
{ UNDO_PCRH_UPDATE_LINK_SSN, "pcrh_update_link_ssn" },
{ UNDO_PCRH_UPDATE_NEXT_RID, "pcrh_update_next_rid" },
{ UNDO_PCRH_BATCH_INSERT, "pcrh_batch_insert" },
{ UNDO_PCRB_ITL, "pcrb_itl" },
{ UNDO_PCRB_INSERT, "pcrb_insert" },
{ UNDO_PCRB_DELETE, "pcrb_delete" },
{ UNDO_PCRB_BATCH_INSERT, "pcrb_batch_insert" },
{ UNDO_LOB_DELETE_COMMIT_RECYCLE, "lob_delete_commit_recycle" },
{ UNDO_LOB_ALLOC_PAGE, "lob_alloc_page" },
{ UNDO_CREATE_HEAP, "create hash partition heap entry" },
{ UNDO_CREATE_LOB, "create hash partition lob entry" },
};
const char *undo_type(uint8 type)
{
uint32 undo_count = sizeof(g_undo_type_desc) / sizeof(undo_type_descriptor_t);
for (uint32 i = 0; i < undo_count; i++) {
if (type == g_undo_type_desc[i].undo_type) {
return g_undo_type_desc[i].type_desc;
}
}
return "invalid";
}
* print the given undo page for debug
* @note caller should hold the undo page
*/
status_t undo_dump_page(knl_session_t *session, page_head_t *page_head, cm_dump_t *dump)
{
undo_page_t *page = (undo_page_t *)page_head;
undo_row_t *row = NULL;
undo_page_id_t seg_id;
cm_dump(dump, "undo page information\n");
cm_dump(dump, "\tprev: %u-%u", page->prev.file, page->prev.page);
cm_dump(dump, "\tss_time: %lld", page->ss_time);
cm_dump(dump, "\trows: %u", page->rows);
cm_dump(dump, "\tfree_size: %u", page->free_size);
cm_dump(dump, "\tfree_begin: %u", page->free_begin);
cm_dump(dump, "\tbegin_slot: %u\n", page->begin_slot);
cm_dump(dump, "row information on this page\n");
cm_dump(dump, "slot\trow_type\tis_cleaned\tis_xfirst\tscn\tis_owscn\txmap\txnum\tssn");
cm_dump(dump, "\tseg_id\tis_shadow\tuser_id\ttable_id\tindex_id\trow_page\trow_slot\tprev_undo\tprev_slot\n");
CM_DUMP_WRITE_FILE(dump);
for (uint32 slot = 0; slot < page->rows; slot++) {
row = UNDO_ROW(session, page, slot);
cm_dump(dump, "#%u\t%s", slot, undo_type((uint8)row->type));
cm_dump(dump, "\t%u\t%u\t%llu\t%u\t%u-%u\t%u\t%u", row->is_cleaned, row->is_xfirst, row->scn, row->is_owscn,
row->xid.xmap.seg_id, row->xid.xmap.slot, row->xid.xnum, row->ssn);
if (row->type == UNDO_BTREE_INSERT || row->type == UNDO_BTREE_DELETE) {
seg_id.value = row->prev_page.value;
cm_dump(dump, "\t%u-%u\t%u", seg_id.file, seg_id.page, (row->index_id == OG_SHADOW_INDEX_ID ? 1 : 0));
} else if (row->type == UNDO_TEMP_BTREE_INSERT || row->type == UNDO_TEMP_BTREE_DELETE) {
cm_dump(dump, "\t%u\t%u\t%u", row->user_id, (uint32)row->seg_page, (uint32)row->index_id);
} else {
cm_dump(dump, "\t%u-%u\t%u", (uint32)row->rowid.file, (uint32)row->rowid.page, (uint32)row->rowid.slot);
}
cm_dump(dump, "\t%u-%u\t%u\n", row->prev_page.file, row->prev_page.page, row->prev_slot);
CM_DUMP_WRITE_FILE(dump);
}
return OG_SUCCESS;
}
static inline bool32 undo_cipher_reserve_valid(knl_session_t *session, undo_page_t *page, uint8 cipher_reserve_size)
{
uint16 cipher_offset = sizeof(undo_page_t) + cipher_reserve_size;
if (page->free_begin < cipher_offset) {
return OG_FALSE;
} else if (page->rows > 0 && *UNDO_SLOT(session, page, 0) < cipher_offset) {
return OG_FALSE;
}
return OG_TRUE;
}
static inline bool32 undo_is_formated_page(knl_session_t *session, undo_page_t *page)
{
if (page->rows == 0 && page->begin_slot == 0 &&
page->free_size == (uint16)(DEFAULT_PAGE_SIZE(session) - sizeof(undo_page_t) - sizeof(page_tail_t)) &&
page->free_begin == sizeof(undo_page_t)) {
return OG_TRUE;
}
return OG_FALSE;
}
bool32 undo_valid_encrypt(knl_session_t *session, page_head_t *page)
{
space_t *space = SPACE_GET(session, DATAFILE_GET(session, AS_PAGID_PTR(page->id)->file)->space_id);
if (space->ctrl->cipher_reserve_size == 0) {
return OG_FALSE;
}
return undo_cipher_reserve_valid(session, (undo_page_t *)page, space->ctrl->cipher_reserve_size);
}
void temp2_undo_init(knl_session_t *session)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
uint32 i;
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
ogx->undos[i].temp_free_page_list.count = 0;
ogx->undos[i].temp_free_page_list.first = INVALID_UNDO_PAGID;
ogx->undos[i].temp_free_page_list.last = INVALID_UNDO_PAGID;
}
}
static void undo_init_proc(thread_t *thread)
{
knl_session_t *session = NULL;
if (g_knl_callback.alloc_knl_session(OG_TRUE, (knl_handle_t *)&session) != OG_SUCCESS) {
knl_panic(0);
}
undo_init_worker_t *worker = (undo_init_worker_t *)thread->argument;
undo_init_ctx_t *undo_ctx = worker->undo_ctx;
undo_page_id_t *entry = worker->entry;
uint32 lseg_no = worker->lseg_no;
uint32 rseg_no = worker->rseg_no;
undo_set_t *undo_set = worker->undo_set;
OG_LOG_RUN_INF("prefetch start thread %u", lseg_no);
if (!thread->closed) {
for (uint32 i = lseg_no; i < rseg_no; i += UNDO_INIT_THREAD_NUMS) {
undo_set->undos[i].entry = entry[i];
buf_enter_page(session, PAGID_U2N(entry[i]), LATCH_MODE_S, ENTER_PAGE_RESIDENT);
undo_set->undos[i].segment = (undo_segment_t *)(CURR_PAGE(session) + PAGE_HEAD_SIZE);
buf_leave_page(session, OG_FALSE);
for (uint32 j = 0; j < undo_set->undos[i].segment->txn_page_count; j++) {
buf_enter_page(session, PAGID_U2N(undo_set->undos[i].segment->txn_page[j]), LATCH_MODE_S,
ENTER_PAGE_RESIDENT);
undo_set->undos[i].txn_pages[j] = (txn_page_t *)CURR_PAGE(session);
buf_leave_page(session, OG_FALSE);
}
}
}
thread->closed = OG_TRUE;
if (undo_ctx->undo_init_active_workers > 0) {
(void)cm_atomic_dec(&undo_ctx->undo_init_active_workers);
}
g_knl_callback.release_knl_session(session);
OG_LOG_RUN_INF("prefetch end thread %u", lseg_no);
return;
}
void undo_init_impl(knl_session_t *session, undo_set_t *undo_set, uint32 lseg_no, uint32 rseg_no)
{
undo_page_id_t *entry = NULL;
undo_init_ctx_t undo_ctx = { 0 };
undo_ctx.undo_init_active_workers = UNDO_INIT_THREAD_NUMS;
undo_set->used = OG_TRUE;
buf_enter_page(session, undo_set->space->entry, LATCH_MODE_S, ENTER_PAGE_RESIDENT);
entry = (undo_page_id_t *)(CURR_PAGE(session) + PAGE_HEAD_SIZE + sizeof(space_head_t));
for (int k = 0; k < UNDO_INIT_THREAD_NUMS; k++) {
undo_init_worker_t *undo_init_worker = &undo_ctx.workers[k];
undo_init_worker->entry = entry;
undo_init_worker->lseg_no = lseg_no + k;
undo_init_worker->rseg_no = rseg_no;
undo_init_worker->undo_ctx = &undo_ctx;
undo_init_worker->undo_set = undo_set;
if (cm_create_thread(undo_init_proc, 0, undo_init_worker, &undo_init_worker->thread) != OG_SUCCESS) {
OG_LOG_RUN_ERR("failed to create tx_rollback_proc %u", k);
return;
}
}
while (undo_ctx.undo_init_active_workers > 0) {
OG_LOG_RUN_INF("wait for prefetch end");
cm_sleep(5);
}
for (int k = 0; k < UNDO_INIT_THREAD_NUMS; k++) {
cm_close_thread(&undo_ctx.workers[k].thread);
}
buf_leave_page(session, OG_FALSE);
}
void undo_set_release(knl_session_t *session, undo_set_t *undo_set)
{
undo_t *undo = NULL;
buf_ctrl_t *ctrl = NULL;
for (uint32 i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
undo = &undo_set->undos[i];
for (uint32 j = 0; j < undo->segment->txn_page_count; j++) {
buf_enter_page(session, PAGID_U2N(undo->segment->txn_page[j]), LATCH_MODE_S, ENTER_PAGE_RESIDENT);
ctrl = session->curr_page_ctrl;
buf_leave_page(session, OG_FALSE);
buf_unreside(session, ctrl);
undo->txn_pages[j] = NULL;
}
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_S, ENTER_PAGE_RESIDENT);
ctrl = session->curr_page_ctrl;
buf_leave_page(session, OG_FALSE);
buf_unreside(session, ctrl);
undo->segment = NULL;
}
undo_set->inst_id = OG_INVALID_ID32;
undo_set->space = NULL;
undo_set->used = OG_FALSE;
}
* init the undo context
*/
void undo_init(knl_session_t *session, uint32 lseg_no, uint32 rseg_no)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_set_t *undo_set = MY_UNDO_SET(session);
ogx->retention = session->kernel->attr.undo_retention_time;
ogx->space = spc_get_undo_space(session, session->kernel->id);
ogx->temp_space = spc_get_temp_undo_space(session, session->kernel->id);
ogx->is_switching = OG_FALSE;
ogx->extend_segno = 0;
ogx->extend_cnt = 0;
ogx->undos = undo_set->undos;
undo_set->space = ogx->space;
undo_set->inst_id = session->kernel->id;
if (!cm_dbs_is_enable_dbs() || DB_IS_PRIMARY(&session->kernel->db) || rc_is_master()) {
undo_init_impl(session, undo_set, lseg_no, rseg_no);
}
temp2_undo_init(session);
}
* close the undo pre-loading thread
*/
void undo_close(knl_session_t *session)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
cm_close_thread(&ogx->thread);
}
* format the specified txn page during undo create
* @param kernel session, txn page, page id
*/
static inline void undo_format_txn(knl_session_t *session, page_head_t *page, page_id_t page_id)
{
page_init(session, page, page_id, PAGE_TYPE_TXN);
log_put(session, RD_UNDO_FORMAT_TXN, page, sizeof(page_head_t), LOG_ENTRY_FLAG_NONE);
}
* format the specified undo page
* @param kernel session, undo page, page id, prev undo page, next undo page
* @note as undo page management is page level rather than extent level, we
* link formated undo page to the specified undo page list.
*/
void undo_format_page(knl_session_t *session, undo_page_t *page, page_id_t page_id, undo_page_id_t prev,
undo_page_id_t next)
{
page_init(session, (page_head_t *)page, page_id, PAGE_TYPE_UNDO);
AS_PAGID_PTR(page->head.next_ext)->file = next.file;
AS_PAGID_PTR(page->head.next_ext)->page = next.page;
page->ss_time = 0;
page->rows = 0;
page->begin_slot = 0;
page->prev = prev;
page->free_size = (uint16)(DEFAULT_PAGE_SIZE(session) - sizeof(undo_page_t) - sizeof(page_tail_t));
page->free_begin = sizeof(undo_page_t);
}
static void undo_extend_txn_impl(knl_session_t *session, space_t *space, undo_t *undo, page_id_t *extent)
{
rd_undo_alloc_txn_page_t rd;
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
buf_enter_page(session, *extent, LATCH_MODE_X, ENTER_PAGE_RESIDENT | ENTER_PAGE_NO_READ);
undo_format_txn(session, (page_head_t *)CURR_PAGE(session), *extent);
rd.slot = undo->segment->txn_page_count;
rd.txn_extent = *extent;
undo->segment->txn_page[undo->segment->txn_page_count++] = PAGID_N2U(*extent);
buf_leave_page(session, OG_TRUE);
log_put(session, RD_UNDO_EXTEND_TXN, &rd, sizeof(rd_undo_alloc_txn_page_t), LOG_ENTRY_FLAG_NONE);
buf_leave_page(session, OG_TRUE);
}
* extend a txn page for the specified undo segment
* @param kernel session, undo segment id
* @note we use space alloc extent interface to catch error
*/
static status_t undo_extend_txn(knl_session_t *session, space_t *space, undo_t *undo)
{
page_id_t extent;
log_atomic_op_begin(session);
if (OG_SUCCESS != spc_alloc_extent(session, space, space->ctrl->extent_size, &extent, OG_FALSE)) {
OG_THROW_ERROR(ERR_ALLOC_EXTENT, space->ctrl->name);
log_atomic_op_end(session);
return OG_ERROR;
}
undo_extend_txn_impl(session, space, undo, &extent);
log_atomic_op_end(session);
return OG_SUCCESS;
}
static status_t undo_df_extend_txn(knl_session_t *session, space_t *space, undo_t *undo, datafile_t *df)
{
page_id_t extent;
log_atomic_op_begin(session);
if (OG_SUCCESS != spc_df_alloc_extent(session, space, space->ctrl->extent_size, &extent, df)) {
OG_THROW_ERROR(ERR_ALLOC_EXTENT, space->ctrl->name);
log_atomic_op_end(session);
return OG_ERROR;
}
undo_extend_txn_impl(session, space, undo, &extent);
log_atomic_op_end(session);
return OG_SUCCESS;
}
static void undo_create_segment_impl(knl_session_t *session, space_t *space, undo_t *undo, uint32 id, page_id_t *extent)
{
undo_page_id_t *undo_entry = NULL;
rd_undo_alloc_seg_t redo;
undo->entry = PAGID_N2U(*extent);
buf_enter_page(session, space->entry, LATCH_MODE_X, ENTER_PAGE_RESIDENT);
undo_entry = (undo_page_id_t *)(CURR_PAGE(session) + PAGE_HEAD_SIZE + sizeof(space_head_t));
undo_entry[id] = undo->entry;
redo.entry = undo->entry;
redo.id = id;
log_put(session, RD_UNDO_ALLOC_SEGMENT, &redo, sizeof(rd_undo_alloc_seg_t), LOG_ENTRY_FLAG_NONE);
buf_leave_page(session, OG_TRUE);
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT | ENTER_PAGE_NO_READ);
page_init(session, (page_head_t *)CURR_PAGE(session), PAGID_U2N(undo->entry), PAGE_TYPE_UNDO_HEAD);
undo->segment = UNDO_GET_SEGMENT(session);
undo->segment->page_list.count = 0;
undo->segment->page_list.first = INVALID_UNDO_PAGID;
undo->segment->page_list.last = INVALID_UNDO_PAGID;
undo->segment->txn_page_count = 0;
log_put(session, RD_UNDO_CREATE_SEGMENT, CURR_PAGE(session), sizeof(page_head_t), LOG_ENTRY_FLAG_NONE);
log_append_data(session, undo->segment, sizeof(undo_segment_t));
buf_leave_page(session, OG_TRUE);
}
* create and init an undo segment during undo create
* @param kernel session, undo segment id
* @note we use space alloc extent interface to catch error
*/
static status_t undo_create_segment(knl_session_t *session, space_t *space, undo_t *undo, uint32 id)
{
page_id_t page_id;
log_atomic_op_begin(session);
if (OG_SUCCESS != spc_alloc_extent(session, space, space->ctrl->extent_size, &page_id, OG_FALSE)) {
OG_THROW_ERROR(ERR_ALLOC_EXTENT, space->ctrl->name);
log_atomic_op_end(session);
return OG_ERROR;
}
undo_create_segment_impl(session, space, undo, id, &page_id);
log_atomic_op_end(session);
return OG_SUCCESS;
}
static status_t undo_df_create_segment(knl_session_t *session, space_t *space, undo_t *undo, uint32 id, datafile_t *df)
{
page_id_t page_id;
log_atomic_op_begin(session);
if (OG_SUCCESS != spc_df_alloc_extent(session, space, space->ctrl->extent_size, &page_id, df)) {
OG_THROW_ERROR(ERR_ALLOC_EXTENT, space->ctrl->name);
log_atomic_op_end(session);
return OG_ERROR;
}
undo_create_segment_impl(session, space, undo, id, &page_id);
log_atomic_op_end(session);
return OG_SUCCESS;
}
static void undo_get_stat_records(knl_session_t *session, undo_context_t *ogx, undo_stat_t *undo_stat)
{
uint32 i;
undo_t *undo = NULL;
uint64 buf_busy_wait = 0;
uint32 page_list_cnt = 0;
uint32 stat_cnt = ogx->stat_cnt % OG_MAX_UNDO_STAT_RECORDS;
if (ogx->stat_cnt == 0) {
undo_stat->begin_time = cm_now();
} else if (stat_cnt == 0) {
undo_stat->begin_time = ogx->stat[OG_MAX_UNDO_STAT_RECORDS - 1].end_time;
} else {
undo_stat->begin_time = ogx->stat[stat_cnt - 1].end_time;
}
undo_stat->end_time = cm_now();
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
undo = &ogx->undos[i];
undo_stat->total_undo_pages += undo->segment->page_list.count;
undo_stat->reuse_expire_pages += undo->stat.reuse_expire_pages;
undo_stat->reuse_unexpire_pages += undo->stat.reuse_unexpire_pages;
undo_stat->use_space_pages += undo->stat.use_space_pages;
undo_stat->steal_expire_pages += undo->stat.steal_expire_pages;
undo_stat->steal_unexpire_pages += undo->stat.steal_unexpire_pages;
undo_stat->txn_cnts += undo->stat.txn_cnts;
undo_stat->total_buf_busy_waits += undo->stat.buf_busy_waits;
if ((undo->stat.buf_busy_waits > buf_busy_wait) || (undo->segment->page_list.count > page_list_cnt)) {
buf_busy_wait = undo->stat.buf_busy_waits;
page_list_cnt = undo->segment->page_list.count;
undo_stat->busy_wait_segment = i;
}
}
undo = &ogx->undos[undo_stat->busy_wait_segment];
undo_stat->busy_seg_pages = undo->segment->page_list.count;
undo_stat->longest_sql_time = ogx->longest_sql_time;
return;
}
static void undo_set_stat_records(undo_context_t *ogx, undo_stat_t undo_stat)
{
uint32 stat_cnt = ogx->stat_cnt % OG_MAX_UNDO_STAT_RECORDS;
ogx->stat[stat_cnt].begin_time = undo_stat.begin_time;
ogx->stat[stat_cnt].end_time = undo_stat.end_time;
ogx->stat[stat_cnt].total_undo_pages = undo_stat.total_undo_pages;
ogx->stat[stat_cnt].reuse_expire_pages = undo_stat.reuse_expire_pages;
ogx->stat[stat_cnt].reuse_unexpire_pages = undo_stat.reuse_unexpire_pages;
ogx->stat[stat_cnt].use_space_pages = undo_stat.use_space_pages;
ogx->stat[stat_cnt].steal_expire_pages = undo_stat.steal_expire_pages;
ogx->stat[stat_cnt].steal_unexpire_pages = undo_stat.steal_unexpire_pages;
ogx->stat[stat_cnt].txn_cnts = undo_stat.txn_cnts;
ogx->stat[stat_cnt].longest_sql_time = undo_stat.longest_sql_time;
ogx->stat[stat_cnt].total_buf_busy_waits = undo_stat.total_buf_busy_waits;
ogx->stat[stat_cnt].busy_wait_segment = undo_stat.busy_wait_segment;
ogx->stat[stat_cnt].busy_seg_pages = undo_stat.busy_seg_pages;
ogx->stat_cnt++;
if (ogx->stat_cnt >= OG_MAX_UNDO_STAT_RECORDS) {
ogx->stat_cnt = ogx->stat_cnt % OG_MAX_UNDO_STAT_RECORDS + OG_MAX_UNDO_STAT_RECORDS;
}
return;
}
void undo_timed_task(knl_session_t *session)
{
uint32 i;
undo_context_t *ogx = &session->kernel->undo_ctx;
uint32 stat_cnt = ogx->stat_cnt % OG_MAX_UNDO_STAT_RECORDS;
undo_stat_t stat = { 0 };
undo_get_stat_records(session, ogx, &stat);
cm_spin_lock(&ogx->stat[stat_cnt].lock, NULL);
undo_set_stat_records(ogx, stat);
cm_spin_unlock(&ogx->stat[stat_cnt].lock);
ogx->longest_sql_time = 0;
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
undo_t *undo = &ogx->undos[i];
MEMS_RETVOID_IFERR(memset_sp(&undo->stat, sizeof(undo_seg_stat_t), 0, sizeof(undo_seg_stat_t)));
undo->stat.begin_time = cm_now();
}
return;
}
* init undo pages for the specified undo segment during undo pre-load
* @param kernel session, undo segment id, number of init pages
*/
static void undo_init_segment(knl_session_t *session, uint32 id, uint32 init_pages)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = &ogx->undos[id];
rd_undo_fmt_page_t redo;
page_id_t page_id;
uint32 extent_size;
uint32 i;
for (;;) {
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
if (undo->segment->page_list.count >= init_pages) {
buf_leave_page(session, OG_FALSE);
log_atomic_op_end(session);
return;
}
if (!spc_alloc_undo_extent(session, ogx->space, &page_id, &extent_size)) {
buf_leave_page(session, OG_FALSE);
log_atomic_op_end(session);
return;
}
for (i = 0; i < extent_size; i++) {
buf_enter_page(session, page_id, LATCH_MODE_X, ENTER_PAGE_NO_READ);
undo_format_page(session, (undo_page_t *)CURR_PAGE(session), page_id, INVALID_UNDO_PAGID,
undo->segment->page_list.first);
redo.page_id = PAGID_N2U(page_id);
redo.prev = g_invalid_undo_pagid;
redo.next = undo->segment->page_list.first;
log_put(session, RD_UNDO_FORMAT_PAGE, &redo, sizeof(rd_undo_fmt_page_t), LOG_ENTRY_FLAG_NONE);
buf_leave_page(session, OG_TRUE);
undo->segment->page_list.first = PAGID_N2U(page_id);
if (undo->segment->page_list.count == 0) {
undo->segment->page_list.last = undo->segment->page_list.first;
}
undo->segment->page_list.count++;
page_id.page++;
}
log_put(session, RD_UNDO_CHANGE_SEGMENT, &undo->segment->page_list, sizeof(undo_page_list_t),
LOG_ENTRY_FLAG_NONE);
buf_leave_page(session, OG_TRUE);
log_atomic_op_end(session);
}
MEMS_RETVOID_IFERR(memset_sp(&undo->stat, sizeof(undo_seg_stat_t), 0, sizeof(undo_seg_stat_t)));
undo->stat.begin_time = cm_now();
return;
}
* warm up the undo space by init some undo pages for each undo segment
*/
static void undo_preload_proc(thread_t *thread)
{
knl_session_t *session = (knl_session_t *)thread->argument;
undo_context_t *ogx = &session->kernel->undo_ctx;
uint64 total_pages;
uint32 init_pages;
uint32 i;
cm_set_thread_name("undo preload");
OG_LOG_RUN_INF("undo preload thread started");
KNL_SESSION_SET_CURR_THREADID(session, cm_get_current_thread_id());
knl_qos_begin(session);
total_pages = spc_count_pages(session, ogx->space, OG_FALSE);
init_pages = UNDO_INIT_PAGES(session, total_pages);
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
undo_init_segment(session, i, init_pages);
}
knl_qos_end(session);
OG_LOG_RUN_INF("undo preload thread closed");
KNL_SESSION_CLEAR_THREADID(session);
}
* undo pre-load thread
* @note only called after the first startup after database create
*/
status_t undo_preload(knl_session_t *session)
{
knl_instance_t *kernel = session->kernel;
undo_context_t *ogx = &kernel->undo_ctx;
if (OG_SUCCESS != cm_create_thread(undo_preload_proc, 0, kernel->sessions[SESSION_ID_UNDO], &ogx->thread)) {
return OG_ERROR;
}
return OG_SUCCESS;
}
* undo create interface
*/
status_t undo_create(knl_session_t *session, uint32 inst_id, uint32 space_id, uint32 lseg_no, uint32 count)
{
undo_set_t *undo_set = UNDO_SET(session, inst_id);
undo_t *undo = NULL;
uint32 i;
uint32 j;
space_t *space = SPACE_GET(session, space_id);
for (i = lseg_no; i < count; i++) {
undo = &undo_set->undos[i];
if (OG_SUCCESS != undo_create_segment(session, space, undo, i)) {
return OG_ERROR;
}
for (j = 0; j < UNDO_DEF_TXN_PAGE(session); j++) {
if (OG_SUCCESS != undo_extend_txn(session, space, undo)) {
return OG_ERROR;
}
}
}
return OG_SUCCESS;
}
status_t temp_undo_create(knl_session_t *session, uint32 inst_id, uint32 space_id, uint32 lseg_no, uint32 count)
{
undo_set_t *temp_undo_set = TEMP_UNDO_SET(session, inst_id);
undo_t *undo = NULL;
uint32 i;
uint32 j;
space_t *space = SPACE_GET(session, space_id);
for (i = lseg_no; i < count; i++) {
undo = &temp_undo_set->undos[i];
if (OG_SUCCESS != undo_create_segment(session, space, undo, i)) {
return OG_ERROR;
}
for (j = 0; j < UNDO_DEF_TXN_PAGE(session); j++) {
if (OG_SUCCESS != undo_extend_txn(session, space, undo)) {
return OG_ERROR;
}
}
}
return OG_SUCCESS;
}
status_t undo_df_create(knl_session_t *session, uint32 space_id, uint32 lseg_no, uint32 count, datafile_t *df)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = NULL;
uint32 i;
uint32 j;
ogx->space = SPACE_GET(session, space_id);
for (i = lseg_no; i < count; i++) {
undo = &ogx->undos[i];
if (OG_SUCCESS != undo_df_create_segment(session, ogx->space, undo, i, df)) {
return OG_ERROR;
}
for (j = 0; j < UNDO_DEF_TXN_PAGE(session); j++) {
if (OG_SUCCESS != undo_df_extend_txn(session, ogx->space, undo, df)) {
return OG_ERROR;
}
}
}
return OG_SUCCESS;
}
* extend undo pages for the specified undo segment
* As we use space alloc undo extent, we may get a normal extent allocated from space hwm
* or an undo page allocated from space free undo page lists. Here, we skip the last
* allocated page which would be inited by the caller, and format the rest undo pages and
* link the rest undo pages to undo segment.
* @param kernel session, undo, allocated undo page id (output param), extent size (output param)
* @note when extend segment failed, we don't throw an error, the caller would take other
* undo strategies into consideration.
*/
static bool32 undo_extend_segment(knl_session_t *session, undo_page_list_t *page_list, space_t *space,
page_id_t *page_id, uint32 *extent_size)
{
uint32 i;
knl_begin_session_wait(session, UNDO_EXTEND_SEGMENT, OG_FALSE);
if (!spc_alloc_undo_extent(session, space, page_id, extent_size)) {
knl_end_session_wait(session, UNDO_EXTEND_SEGMENT);
return OG_FALSE;
}
knl_end_session_wait(session, UNDO_EXTEND_SEGMENT);
for (i = 0; i < *extent_size - 1; i++) {
buf_enter_page(session, *page_id, LATCH_MODE_X, ENTER_PAGE_NO_READ);
undo_format_page(session, (undo_page_t *)CURR_PAGE(session), *page_id, INVALID_UNDO_PAGID, page_list->first);
if (SPACE_IS_LOGGING(space)) {
rd_undo_fmt_page_t redo;
redo.page_id = PAGID_N2U(*page_id);
redo.prev = INVALID_UNDO_PAGID;
redo.next = page_list->first;
log_put(session, RD_UNDO_FORMAT_PAGE, &redo, sizeof(rd_undo_fmt_page_t), LOG_ENTRY_FLAG_NONE);
}
buf_leave_page(session, OG_TRUE);
page_list->first = PAGID_N2U(*page_id);
if (page_list->count == 0) {
page_list->last = page_list->first;
}
page_list->count++;
page_id->page++;
}
if (*extent_size > 1) {
if (SPACE_IS_LOGGING(space)) {
log_put(session, RD_UNDO_CHANGE_SEGMENT, page_list, sizeof(undo_page_list_t), LOG_ENTRY_FLAG_NONE);
}
* while extent_size > 1 means it is extended from datafile, for temp2_undo,
* because of it has no recovery when restart,
* and undo page in temp2_undo is not latest, so, it must be formated.
*/
if (SPACE_IS_NOLOGGING(space)) {
undo_page_t *page = NULL;
buf_enter_page(session, *page_id, LATCH_MODE_X, ENTER_PAGE_NO_READ);
page = (undo_page_t *)CURR_PAGE(session);
undo_format_page(session, page, *page_id, INVALID_UNDO_PAGID, INVALID_UNDO_PAGID);
buf_leave_page(session, OG_TRUE);
}
}
return OG_TRUE;
}
static inline bool32 undo_shrink_need_suspend(knl_session_t *session)
{
return db_in_switch(&session->kernel->switch_ctrl);
}
* shrink the specified undo segment
* Shrink extra undo pages of the undo segment to undo space one by one.
* If undo page count is less than reserve pages, just skip shrink.
* During shrinking, we only shrink undo pages which exceed undo retention time.
* @param kernel session, undo, reserved undo pages
*/
static void undo_shrink_segment(knl_session_t *session, undo_t *undo, bool32 need_redo, uint32 reserve_pages)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_page_t *page = NULL;
undo_page_list_t *page_list = NULL;
space_t *space = NULL;
undo_page_id_t first;
undo_page_id_t next;
uint32 i;
uint64 begin_time = KNL_NOW(session);
space = need_redo ? ogx->space : ogx->temp_space;
if (space == NULL) {
page_list = UNDO_GET_FREE_PAGELIST(undo, need_redo);
knl_panic_log(page_list->count == 0, "undo shrink segment has free %llu pages, time consuming: %llu",
session->kernel->stat.undo_free_pages, session->kernel->stat.undo_shrink_times);
return;
}
for (i = 0; i < UNDO_SHRINK_PAGES; i++) {
if (undo_shrink_need_suspend(session)) {
break;
}
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
page_list = UNDO_GET_FREE_PAGELIST(undo, need_redo);
if (page_list->count <= reserve_pages) {
buf_leave_page(session, OG_FALSE);
log_atomic_op_end(session);
return;
}
first = page_list->first;
buf_enter_page(session, PAGID_U2N(first), LATCH_MODE_S, ENTER_PAGE_NORMAL);
page = (undo_page_t *)CURR_PAGE(session);
if (KNL_NOW(session) - page->ss_time < (date_t)ogx->retention * MICROSECS_PER_SECOND) {
buf_leave_page(session, OG_FALSE);
buf_leave_page(session, OG_FALSE);
log_atomic_op_end(session);
return;
}
next = PAGID_N2U(AS_PAGID(page->head.next_ext));
buf_leave_page(session, OG_FALSE);
page_list->count--;
if (page_list->count > 0) {
knl_panic_log(!IS_INVALID_PAGID(next), "undo shrink segment has free %llu pages, time consuming: %llu",
session->kernel->stat.undo_free_pages, session->kernel->stat.undo_shrink_times);
}
page_list->first = next;
if (need_redo) {
log_put(session, RD_UNDO_CHANGE_SEGMENT, page_list, sizeof(undo_page_list_t), LOG_ENTRY_FLAG_NONE);
}
buf_leave_page(session, OG_TRUE && need_redo);
spc_free_extent(session, space, PAGID_U2N(first));
log_atomic_op_end(session);
session->kernel->stat.undo_free_pages++;
session->kernel->stat.undo_shrink_times += (KNL_NOW(session) - begin_time);
}
}
* shrink page_list of all undo segment
*/
void undo_shrink_inactive_segments(knl_session_t *session)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = NULL;
uint32 i;
uint32 active_segment = session->kernel->attr.undo_active_segments;
uint64 begin_time = KNL_NOW(session);
for (i = active_segment; i < UNDO_SEGMENT_COUNT(session); i++) {
undo = &ogx->undos[i];
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
if (undo->segment->page_list.count == 0) {
buf_leave_page(session, OG_FALSE);
log_atomic_op_end(session);
continue;
}
spc_free_undo_extents(session, ogx->space, &undo->segment->page_list);
undo->segment->page_list.count = 0;
undo->segment->page_list.first = g_invalid_undo_pagid;
undo->segment->page_list.last = g_invalid_undo_pagid;
log_put(session, RD_UNDO_CHANGE_SEGMENT, &undo->segment->page_list, sizeof(undo_page_list_t),
LOG_ENTRY_FLAG_NONE);
buf_leave_page(session, OG_TRUE);
log_atomic_op_end(session);
session->kernel->stat.undo_free_pages++;
session->kernel->stat.undo_shrink_times += (KNL_NOW(session) - begin_time);
}
}
* undo shrink interface
* @note only cyclically caller by SMON thread
*/
void undo_shrink_segments(knl_session_t *session)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = NULL;
uint64 undo1_total_pages;
uint32 undo1_reserve_pages;
uint64 undo2_total_pages;
uint32 undo2_reserve_pages;
uint32 i;
undo1_total_pages = spc_count_pages(session, ogx->space, OG_FALSE);
undo1_reserve_pages = UNDO_RESERVE_PAGES(session, undo1_total_pages);
undo2_total_pages = (ogx->temp_space == NULL) ? 0 : spc_count_pages(session, ogx->temp_space, OG_FALSE);
undo2_reserve_pages = UNDO_RESERVE_TEMP_PAGES(session, undo2_total_pages);
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
undo = &ogx->undos[i];
undo_shrink_segment(session, undo, OG_TRUE, undo1_reserve_pages);
undo_shrink_segment(session, undo, OG_FALSE, undo2_reserve_pages);
}
}
* find the undo segment which holds the most undo pages
* @param kernel session, undo pointer
*/
static bool32 undo_find_free_segment(knl_session_t *session, bool32 need_redo, undo_t **undo)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
uint32 page_count = 0;
undo_t *curr = NULL;
uint32 i;
undo_page_list_t *page_list = NULL;
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
if (i == UNDO_GET_SESSION_UNDO_SEGID(session)) {
continue;
}
curr = &ogx->undos[i];
buf_enter_page(session, PAGID_U2N(curr->entry), LATCH_MODE_S, ENTER_PAGE_RESIDENT);
page_list = UNDO_GET_FREE_PAGELIST(curr, need_redo);
if (page_count < page_list->count) {
page_count = page_list->count;
*undo = curr;
}
buf_leave_page(session, OG_FALSE);
}
if (page_count == 0) {
return OG_FALSE;
}
return OG_TRUE;
}
static void undo_force_shrink_pages(knl_session_t *session, undo_page_list_t *page_list, undo_page_list_t *shrink_pages,
undo_t *undo_need_shrink, bool32 need_redo)
{
uint32 i;
undo_page_t *page = NULL;
undo_page_id_t last;
undo_page_id_t next;
uint32 page_count;
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = UNDO_GET_SESSION_UNDO_SEGMENT(session);
page_count = MIN(page_list->count, OG_EXTENT_SIZE);
next = page_list->first;
for (i = 0; i < page_count; i++) {
last = next;
buf_enter_page(session, PAGID_U2N(last), LATCH_MODE_S, ENTER_PAGE_NORMAL);
page = (undo_page_t *)CURR_PAGE(session);
if (KNL_NOW(session) - page->ss_time < (date_t)ogx->retention * MICROSECS_PER_SECOND) {
undo->stat.steal_unexpire_pages++;
undo_need_shrink->stat.stealed_unexpire_pages++;
} else {
undo->stat.steal_expire_pages++;
undo_need_shrink->stat.stealed_expire_pages++;
}
next = PAGID_N2U(AS_PAGID(((page_head_t *)CURR_PAGE(session))->next_ext));
buf_leave_page(session, OG_FALSE);
}
shrink_pages->count = page_count;
shrink_pages->first = page_list->first;
shrink_pages->last = last;
page_list->count -= page_count;
page_list->first = next;
if (need_redo) {
log_put(session, RD_UNDO_CHANGE_SEGMENT, page_list, sizeof(undo_page_list_t), LOG_ENTRY_FLAG_NONE);
}
return;
}
* force to shrink undo pages from other undo segment to ours
* In force mode, it means that we don't care about the undo retention time,
* we don't care about the reserved pages. The only thing we care is that
* we should shrink a normal extent from the undo segment which holds the
* most free undo pages to our undo segment *directly*.
*/
static bool32 undo_force_shrink_segment(knl_session_t *session, undo_context_t *ogx, bool32 need_redo)
{
undo_t *undo_need_shrink = NULL;
undo_t *undo = UNDO_GET_SESSION_UNDO_SEGMENT(session);
undo_page_list_t shrink_pages;
uint32 page_count;
undo_page_list_t *page_list = NULL;
uint64 begin_time = KNL_NOW(session);
if (!undo_find_free_segment(session, need_redo, &undo_need_shrink)) {
return OG_FALSE;
}
if (!cm_latch_timed_x(&ogx->latch, session->id, 100, NULL)) {
return OG_TRUE;
}
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(undo_need_shrink->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
page_list = UNDO_GET_FREE_PAGELIST(undo_need_shrink, need_redo);
if (page_list->count == 0) {
buf_leave_page(session, OG_FALSE);
log_atomic_op_end(session);
cm_unlatch(&ogx->latch, NULL);
return OG_FALSE;
}
page_count = MIN(page_list->count, OG_EXTENT_SIZE);
undo_force_shrink_pages(session, page_list, &shrink_pages, undo_need_shrink, need_redo);
buf_leave_page(session, OG_TRUE && need_redo);
undo_release_pages(session, undo, &shrink_pages, need_redo);
log_atomic_op_end(session);
cm_unlatch(&ogx->latch, NULL);
session->kernel->stat.undo_free_pages += page_count;
session->kernel->stat.undo_shrink_times += (KNL_NOW(session) - begin_time);
return OG_TRUE;
}
bool32 undo_check_active_transaction(knl_session_t *session)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = NULL;
uint32 seg_no;
for (seg_no = 0; seg_no < UNDO_SEGMENT_COUNT(session); seg_no++) {
undo = &ogx->undos[seg_no];
if (undo->free_items.count != TXN_PER_PAGE(session) * UNDO_DEF_TXN_PAGE(session)) {
return OG_TRUE;
}
}
return OG_FALSE;
}
void undo_get_txn_hwms(knl_session_t *session, space_t *space, uint32 *hwms)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = NULL;
uint32 i;
uint32 j;
undo_page_id_t ud_pageid;
datafile_t *df = NULL;
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
undo = &ogx->undos[i];
for (j = 0; j < undo->segment->txn_page_count; j++) {
ud_pageid = undo->segment->txn_page[j];
df = DATAFILE_GET(session, ud_pageid.file);
if (ud_pageid.page >= hwms[df->file_no]) {
hwms[df->file_no] = ud_pageid.page + 1;
}
}
}
}
void undo_clean_segment_pagelist(knl_session_t *session, space_t *space)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = NULL;
undo_page_list_t *page_list = NULL;
uint32 i;
bool32 need_redo = SPACE_IS_LOGGING(space);
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
undo = &ogx->undos[i];
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
page_list = UNDO_GET_FREE_PAGELIST(undo, need_redo);
page_list->count = 0;
page_list->first = INVALID_UNDO_PAGID;
page_list->last = INVALID_UNDO_PAGID;
if (need_redo) {
log_put(session, RD_UNDO_CHANGE_SEGMENT, page_list, sizeof(undo_page_list_t), LOG_ENTRY_FLAG_NONE);
}
buf_leave_page(session, OG_TRUE && need_redo);
log_atomic_op_end(session);
}
}
static inline bool32 undo_need_alloc_from_space(knl_session_t *session, undo_context_t *ogx, undo_page_t *page,
uint32 reserved_size, uint8 cipher_reserve_size)
{
if (page->free_size < reserved_size &&
KNL_NOW(session) - page->ss_time < (date_t)ogx->retention * MICROSECS_PER_SECOND) {
return OG_TRUE;
}
if (undo_cipher_reserve_valid(session, page, cipher_reserve_size)) {
return OG_FALSE;
}
if (undo_is_formated_page(session, page)) {
return OG_FALSE;
}
return OG_TRUE;
}
static void undo_alloc_from_free_list(knl_session_t *session, space_t *space, undo_page_list_t *page_list,
undo_page_t *page, undo_page_id_t next_page)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = &ogx->undos[UNDO_GET_SESSION_UNDO_SEGID(session)];
bool32 need_redo = SPACE_IS_LOGGING(space);
if (KNL_NOW(session) - page->ss_time < (date_t)ogx->retention * MICROSECS_PER_SECOND) {
undo->stat.reuse_unexpire_pages++;
} else {
undo->stat.reuse_expire_pages++;
}
page_list->count--;
if (page_list->count == 0) {
page_list->first = INVALID_UNDO_PAGID;
page_list->last = INVALID_UNDO_PAGID;
} else {
knl_panic(!IS_INVALID_PAGID(next_page));
page_list->first = next_page;
}
if (need_redo) {
log_put(session, RD_UNDO_CHANGE_SEGMENT, page_list, sizeof(undo_page_list_t), LOG_ENTRY_FLAG_NONE);
}
return;
}
* alloc undo page interface
* In undo alloc page, we use three hierarchical allocation strategies:
* 1. Alloc page from undo segment page list which exceeds retention time.
* 2. Alloc page from undo space.
* 3. Alloc page from undo segment page list which does not exceed undo retention time.
* @param kernel session, undo page reserved size, undo page id (output)
*/
static bool32 undo_alloc_page(knl_session_t *session, space_t *space, uint32 reserved_size, uint8 cipher_reserve_size,
page_id_t *page_id, bool32 *new_extent)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = &ogx->undos[UNDO_GET_SESSION_UNDO_SEGID(session)];
undo_page_t *page = NULL;
undo_page_id_t next_page;
uint32 extent_size;
bool32 from_space = OG_FALSE;
undo_page_list_t *page_list = NULL;
bool32 need_redo = SPACE_IS_LOGGING(space);
uint64 buf_busy_waits = 0;
next_page = INVALID_UNDO_PAGID;
if (STATS_ENABLE_MONITOR_TABLE(session)) {
buf_busy_waits = session->stat_page.misses;
}
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
page_list = UNDO_GET_FREE_PAGELIST(undo, need_redo);
if (page_list->count == 0) {
from_space = OG_TRUE;
} else {
buf_enter_prefetch_page_num(session, PAGID_U2N(page_list->first), session->kernel->attr.undo_prefetch_page_num,
LATCH_MODE_S, ENTER_PAGE_HIGH_AGE);
page = (undo_page_t *)CURR_PAGE(session);
next_page = PAGID_N2U(AS_PAGID(page->head.next_ext));
from_space = undo_need_alloc_from_space(session, ogx, page, reserved_size, cipher_reserve_size);
buf_leave_page(session, OG_FALSE);
}
if (from_space) {
if (undo_extend_segment(session, page_list, space, page_id, &extent_size)) {
*new_extent = extent_size > 1 ? OG_TRUE : OG_FALSE;
buf_leave_page(session, *new_extent && need_redo);
undo->stat.use_space_pages++;
return OG_TRUE;
}
}
if (STATS_ENABLE_MONITOR_TABLE(session)) {
buf_busy_waits = session->stat_page.misses - buf_busy_waits;
undo->stat.buf_busy_waits += buf_busy_waits;
}
if (page_list->count == 0) {
buf_leave_page(session, OG_FALSE);
return OG_FALSE;
}
*page_id = PAGID_U2N(page_list->first);
undo_alloc_from_free_list(session, space, page_list, page, next_page);
buf_leave_page(session, OG_TRUE && need_redo);
return OG_TRUE;
}
* release undo pages to undo segment free page list
* @param kernel session, undo, free undo page list
*/
void undo_release_pages(knl_session_t *session, undo_t *undo, undo_page_list_t *undo_pages, bool32 need_redo)
{
undo_page_list_t *page_list = NULL;
buf_enter_page(session, PAGID_U2N(undo->entry), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
page_list = UNDO_GET_FREE_PAGELIST(undo, need_redo);
if (page_list->count == 0) {
*page_list = *undo_pages;
} else {
spc_concat_extent(session, PAGID_U2N(page_list->last), PAGID_U2N(undo_pages->first));
page_list->last = undo_pages->last;
page_list->count += undo_pages->count;
}
if (need_redo) {
log_put(session, RD_UNDO_CHANGE_SEGMENT, page_list, sizeof(undo_page_list_t), LOG_ENTRY_FLAG_NONE);
}
buf_leave_page(session, OG_TRUE && need_redo);
}
static inline void undo_page_encrypt_format(knl_session_t *session, undo_page_t *page, uint8 cipher_reserve_size,
bool32 need_redo)
{
page->free_size -= cipher_reserve_size;
page->free_begin += cipher_reserve_size;
if (need_redo) {
rd_undo_cipher_reserve_t reserve_redo;
reserve_redo.cipher_reserve_size = cipher_reserve_size;
reserve_redo.unused = 0;
reserve_redo.aligned = 0;
log_put(session, RD_UNDO_CIPHER_RESERVE, &reserve_redo, sizeof(rd_undo_cipher_reserve_t), LOG_ENTRY_FLAG_NONE);
}
}
static void trigger_undo_usage_alram_log(knl_session_t *session)
{
xid_t xid = session->rm->xid;
text_t sql_text;
CM_SAVE_STACK(session->stack);
sql_text.str = (char *)cm_push(session->stack, RECORD_SQL_SIZE);
sql_text.len = RECORD_SQL_SIZE;
if (sql_text.str == NULL || g_knl_callback.get_sql_text(session->id, &sql_text) != OG_SUCCESS) {
OG_LOG_RUN_WAR(
"[TxnUndospaceUsage] session-id: %u, rmid: %u, transaction-id: %u.%u.%u, txn-alarm-threshold: %u%%, sql-detail: [fail to record sql]",
session->id, session->rm->id, xid.xmap.seg_id, xid.xmap.slot, xid.xnum,
session->kernel->attr.txn_undo_usage_alarm_threshold);
cm_reset_error();
} else {
OG_LOG_RUN_WAR(
"[TxnUndospaceUsage] session-id: %u, rmid: %u, transaction-id: %u.%u.%u, txn-alarm-threshold: %u%%, sql-detail: [%s]",
session->id, session->rm->id, xid.xmap.seg_id, xid.xmap.slot, xid.xnum,
session->kernel->attr.txn_undo_usage_alarm_threshold, T2S(&sql_text));
}
CM_RESTORE_STACK(session->stack);
}
static inline void txn_undo_page_check(knl_session_t *session, uint32 undo_page_count)
{
uint64 undo_total_pages = spc_count_pages_with_ext(session, session->kernel->undo_ctx.space, OG_FALSE);
if (undo_page_count >=
undo_total_pages * session->kernel->attr.txn_undo_usage_alarm_threshold / OG_MAX_TXN_UNDO_ALARM_THRESHOLD) {
trigger_undo_usage_alram_log(session);
session->rm->txn_alarm_enable = OG_FALSE;
}
}
* allocate undo page for txn
* Here, we use four hierarchical allocation strategies:
* 1. Alloc undo page whose page free size satisfies with request size or
* page last change time exceed undo retention time from our undo segment.
* 2. Alloc undo page from undo space, append the extra pages to undo segment.
* 3. Force to alloc undo page from undo segment free page list if any.
* 4. Try to shrink undo page from other undo segment directly if any.
* After allocation, format the undo page, append it to txn undo page list.
* @param kernel session, txn, txn reserved size
* @note we may not format the undo page we just alloc if its free size
* is over reserve size to keep the undo data as much as possible to avoid
* snapshot too old error.
*/
static status_t undo_alloc_page_for_txn(knl_session_t *session, txn_t *txn, uint32 reserve_size_input, bool32 need_redo,
uint8 cipher_reserve_size)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
knl_rm_t *rm = session->rm;
undo_page_list_t *tx_undo_page_list = NULL;
undo_page_info_t *curr_undo_page = NULL;
undo_page_t *page = NULL;
uint32 reserve_size = reserve_size_input;
page_id_t page_id;
undo_page_id_t txn_page_id;
rd_undo_fmt_page_t fmt_redo;
rd_undo_chg_page_t chg_redo;
rd_undo_chg_txn_t redo;
bool32 new_extent = OG_FALSE;
space_t *space = need_redo ? ogx->space : ogx->temp_space;
reserve_size = MAX(reserve_size, session->kernel->attr.undo_reserve_size);
txn_get_owner(session, rm->xid.xmap, &txn_page_id);
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(txn_page_id), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
for (;;) {
if (undo_alloc_page(session, space, reserve_size, cipher_reserve_size, &page_id, &new_extent)) {
break;
}
buf_leave_page(session, OG_FALSE);
log_atomic_op_end(session);
if (!undo_force_shrink_segment(session, ogx, need_redo)) {
OG_THROW_ERROR(ERR_NO_FREE_UNDO_PAGE);
return OG_ERROR;
}
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(txn_page_id), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
}
buf_enter_page(session, page_id, LATCH_MODE_X, new_extent ? ENTER_PAGE_NO_READ : ENTER_PAGE_NORMAL);
page = (undo_page_t *)CURR_PAGE(session);
if (need_redo) {
tx_undo_page_list = &txn->undo_pages;
curr_undo_page = &rm->undo_page_info;
} else {
tx_undo_page_list = &rm->noredo_undo_pages;
curr_undo_page = &rm->noredo_undo_page_info;
}
if (!new_extent && page->free_size >= reserve_size &&
undo_cipher_reserve_valid(session, page, cipher_reserve_size)) {
TO_PAGID_DATA(INVALID_PAGID, page->head.next_ext);
page->prev = tx_undo_page_list->last;
page->begin_slot = page->rows;
if (need_redo) {
chg_redo.prev = page->prev;
chg_redo.slot = page->begin_slot;
chg_redo.aligned = 0;
log_put(session, RD_UNDO_CHANGE_PAGE, &chg_redo, sizeof(rd_undo_chg_page_t), LOG_ENTRY_FLAG_NONE);
}
} else {
undo_format_page(session, page, page_id, tx_undo_page_list->last, INVALID_UNDO_PAGID);
if (need_redo) {
fmt_redo.page_id = PAGID_N2U(page_id);
fmt_redo.prev = tx_undo_page_list->last;
fmt_redo.next = INVALID_UNDO_PAGID;
log_put(session, RD_UNDO_FORMAT_PAGE, &fmt_redo, sizeof(rd_undo_fmt_page_t), LOG_ENTRY_FLAG_NONE);
}
if (cipher_reserve_size > 0) {
undo_page_encrypt_format(session, page, cipher_reserve_size, need_redo);
}
}
curr_undo_page->undo_rid.page_id = PAGID_N2U(page_id);
curr_undo_page->undo_rid.slot = page->begin_slot;
curr_undo_page->undo_fs = page->free_size;
curr_undo_page->encrypt_enable = undo_valid_encrypt(session, (page_head_t *)page);
buf_leave_page(session, OG_TRUE);
if (tx_undo_page_list->count == 0) {
tx_undo_page_list->first = PAGID_N2U(page_id);
tx_undo_page_list->last = PAGID_N2U(page_id);
} else {
spc_concat_extent(session, PAGID_U2N(tx_undo_page_list->last), page_id);
tx_undo_page_list->last = PAGID_N2U(page_id);
}
tx_undo_page_list->count++;
if (need_redo) {
redo.xmap = rm->xid.xmap;
redo.undo_pages = *tx_undo_page_list;
log_put(session, RD_UNDO_CHANGE_TXN, &redo, sizeof(rd_undo_chg_txn_t), LOG_ENTRY_FLAG_NONE);
}
buf_leave_page(session, OG_TRUE && need_redo);
log_atomic_op_end(session);
if (session->kernel->attr.txn_undo_usage_alarm_threshold != 0 && session->rm->txn_alarm_enable) {
txn_undo_page_check(session, txn->undo_pages.count);
}
return OG_SUCCESS;
}
* prepare the undo page and begin transaction if necessary
* If current undo page free size is enough, return success.
* @param kernel session, undo count, total undo request size
* need_redo flags if generate redo for undo
*/
status_t undo_multi_prepare(knl_session_t *session, uint32 count, uint32 size, bool32 need_redo, bool32 need_encrypt)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
knl_rm_t *rm = session->rm;
uint32 cost_size;
uint32 undo_freespace;
space_t *space = need_redo ? ogx->space : ogx->temp_space;
bool32 encrypt_enable = OG_FALSE;
uint8 cipher_reserve_size = need_encrypt ? space->ctrl->cipher_reserve_size : 0;
if (DB_NOT_READY(session)) {
return OG_SUCCESS;
}
if (!need_redo && ogx->temp_space == NULL) {
OG_THROW_ERROR(ERR_NOLOGGING_SPACE, "TEMP2_UNDO");
return OG_ERROR;
}
knl_panic_log(!DB_IS_READONLY(session), "current DB is readonly!");
if (rm->txn == NULL) {
if (tx_begin(session) != OG_SUCCESS) {
return OG_ERROR;
}
}
cost_size = size + count * (UNDO_ROW_HEAD_SIZE + sizeof(uint16));
if (cost_size > (uint32)(UNDO_PAGE_MAX_FREE_SIZE(session) - (uint16)cipher_reserve_size)) {
OG_THROW_ERROR(ERR_RECORD_SIZE_OVERFLOW, "undo cost", cost_size,
UNDO_PAGE_MAX_FREE_SIZE(session) - cipher_reserve_size);
return OG_ERROR;
}
if (need_redo) {
undo_freespace = rm->undo_page_info.undo_fs;
encrypt_enable = rm->undo_page_info.encrypt_enable;
rm->undo_page_info.undo_log_encrypt = need_encrypt;
} else {
undo_freespace = rm->noredo_undo_page_info.undo_fs;
encrypt_enable = rm->noredo_undo_page_info.encrypt_enable;
rm->noredo_undo_page_info.undo_log_encrypt = OG_FALSE;
}
if (encrypt_enable || !need_encrypt) {
if (undo_freespace >= cost_size) {
return OG_SUCCESS;
}
}
knl_panic_log(rm->txn != NULL, "rm's txn is NULL.");
return undo_alloc_page_for_txn(session, rm->txn, cost_size, need_redo, cipher_reserve_size);
}
static void undo_check_log_encrypt(knl_session_t *session, undo_page_t *page, space_t *space, bool32 need_redo)
{
undo_page_info_t *undo_info = NULL;
if (session->log_encrypt) {
return;
}
if (need_redo) {
undo_info = &session->rm->undo_page_info;
} else {
undo_info = &session->rm->noredo_undo_page_info;
}
#ifdef LOG_DIAG
knl_panic_log(undo_info->encrypt_enable == undo_valid_encrypt(session, (page_head_t *)page),
"undo's encrypt status is abnormal, panic info: page %u-%u type %u", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type);
#endif
if (undo_info->undo_log_encrypt) {
knl_panic_log(undo_info->encrypt_enable, "curr undo page must encryptable");
session->log_encrypt = OG_TRUE;
}
}
* undo write interface
* Organize undo row with undo data, write undo row to current undo page.
* @param kernel session, undo data
*/
void undo_write(knl_session_t *session, undo_data_t *undo_data, bool32 need_redo, bool32 nolog_insert)
{
undo_page_t *page = NULL;
undo_row_t *row = NULL;
uint16 *slot = NULL;
uint16 actual_size;
undo_rowid_t undo_rid;
undo_page_info_t *undo_page_info = UNDO_GET_PAGE_INFO(session, need_redo);
errno_t ret;
if (DB_NOT_READY(session)) {
return;
}
knl_panic_log(!DB_IS_READONLY(session), "current DB is readonly!");
undo_rid = undo_page_info->undo_rid;
space_t *space = SPACE_GET(session, DATAFILE_GET(session, undo_rid.page_id.file)->space_id);
buf_enter_page(session, PAGID_U2N(undo_rid.page_id), LATCH_MODE_X, ENTER_PAGE_NORMAL);
page = (undo_page_t *)CURR_PAGE(session);
undo_check_log_encrypt(session, page, space, need_redo);
knl_panic_log(page->free_begin + page->free_size == UNDO_PAGE_FREE_END(session, page),
"the page's free size is abnormal, "
"panic info: page %u-%u type %u free_begin %u free_size %u free_end %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_begin,
page->free_size, UNDO_PAGE_FREE_END(session, page));
knl_panic_log(undo_data->size + UNDO_ROW_HEAD_SIZE + sizeof(uint16) <= page->free_size,
"the undo_data is abnormal, panic info: page %u-%u type %u undo_data size %u page's free_size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, undo_data->size,
page->free_size);
row = (undo_row_t *)((char *)page + page->free_begin);
row->type = undo_data->type;
row->rowid = undo_data->rowid;
row->data_size = undo_data->size;
row->prev_page = undo_data->snapshot.undo_page;
row->prev_slot = undo_data->snapshot.undo_slot;
row->scn = undo_data->snapshot.scn;
row->ssn = undo_data->ssn;
row->xid = session->rm->xid;
row->is_owscn = undo_data->snapshot.is_owscn;
row->is_xfirst = undo_data->snapshot.is_xfirst;
row->is_cleaned = 0;
if (undo_data->snapshot.contain_subpartno) {
row->contain_subpartno = OG_TRUE;
}
knl_panic_log(row->xid.value != OG_INVALID_ID64,
"the xid of row is invalid, panic info: page %u-%u type %u row xid %llu",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, row->xid.value);
knl_panic_log(undo_rid.slot == page->rows,
"the undo_rid's slot is not match to page's rows, panic info: "
"page %u-%u type %u undo_rid slot %u page rows %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, undo_rid.slot,
page->rows);
actual_size = (uint16)(UNDO_ROW_HEAD_SIZE + undo_data->size);
if (undo_data->size > 0) {
ret = memcpy_sp(row->data, page->free_size, undo_data->data, undo_data->size);
knl_securec_check(ret);
if (session->delete_ptrans) {
if (row->type == UNDO_HEAP_DELETE || row->type == UNDO_PCRH_DELETE) {
row_head_t *ptrans_row = (row_head_t *)row->data;
*(uint64 *)((char *)ptrans_row + ptrans_row->size - sizeof(uint64)) = session->xa_scn;
}
}
}
slot = UNDO_SLOT(session, page, page->rows);
*slot = page->free_begin;
* free_size less than DEFAULT_PAGE_SIZE(8192),
* actual_size is less than page size(8192) + UNDO_ROW_HEAD_SIZE,
* the sum is less than max value(65535) of uint16
*/
page->free_begin += actual_size;
page->free_size -= (actual_size + sizeof(uint16));
page->ss_time = KNL_NOW(session);
page->rows++;
undo_page_info->undo_fs = page->free_size;
undo_page_info->undo_rid.slot = page->rows;
if (need_redo) {
if (nolog_insert) {
log_put(session, RD_UNDO_NOLOG_WRITE, &undo_data->size, sizeof(uint16), LOG_ENTRY_FLAG_NONE);
} else {
log_put(session, RD_UNDO_WRITE, &page->ss_time, sizeof(date_t), LOG_ENTRY_FLAG_NONE);
log_append_data(session, row, actual_size);
}
}
buf_leave_page(session, OG_TRUE);
}
status_t undo_segment_dump(knl_session_t *session, page_head_t *page_head, cm_dump_t *dump)
{
undo_segment_t *segment = UNDO_GET_SEGMENT(session);
cm_dump(dump, "undo segment information\n");
cm_dump(dump, "\tpage lists: count %u first %u-%u last %u-%u\n", segment->page_list.count,
segment->page_list.first.file, segment->page_list.first.page, segment->page_list.last.file,
segment->page_list.last.page);
cm_dump(dump, "\ttxn_page_count: %u\n", segment->txn_page_count);
cm_dump(dump, "txn_page information on this page");
CM_DUMP_WRITE_FILE(dump);
for (uint32 slot = 0; slot < segment->txn_page_count; slot++) {
if (slot % UNDO_PAGE_PER_LINE == 0) {
cm_dump(dump, "\n\t");
}
cm_dump(dump, "%u-%u ", segment->txn_page[slot].file, segment->txn_page[slot].page);
CM_DUMP_WRITE_FILE(dump);
}
return OG_SUCCESS;
}
void undo_invalid_segments(knl_session_t *session)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
uint32 i;
uint32 j;
undo_t *undo = NULL;
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
undo = &ogx->undos[i];
for (j = 0; j < UNDO_DEF_TXN_PAGE(session); j++) {
buf_unreside_page(session, PAGID_U2N(undo->segment->txn_page[j]));
}
buf_unreside_page(session, PAGID_U2N(undo->entry));
}
}
static void undo_move_txn(knl_session_t *session, undo_t *undo, uint32 id)
{
txn_t *old_txn = NULL;
txn_t *new_txn = NULL;
txn_page_t *new_txnpage = NULL;
txn_page_t *old_txnpage = NULL;
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(undo->segment->txn_page[id]), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
new_txnpage = (txn_page_t *)CURR_PAGE(session);
old_txnpage = undo->txn_pages[id];
for (uint32 i = 0; i < TXN_PER_PAGE(session); i++) {
old_txn = &old_txnpage->items[i];
new_txn = &new_txnpage->items[i];
new_txn->scn = old_txn->scn;
new_txn->xnum = old_txn->xnum;
}
log_put(session, RD_UNDO_MOVE_TXN, CURR_PAGE(session), sizeof(page_head_t), LOG_ENTRY_FLAG_NONE);
log_append_data(session, new_txnpage->items, DEFAULT_PAGE_SIZE(session) - sizeof(page_head_t));
buf_leave_page(session, OG_TRUE);
log_atomic_op_end(session);
}
void undo_reload_segment(knl_session_t *session, page_id_t entry)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_page_id_t *undo_entry = NULL;
undo_t *undo = NULL;
buf_enter_page(session, entry, LATCH_MODE_S, ENTER_PAGE_RESIDENT);
undo_entry = (undo_page_id_t *)(CURR_PAGE(session) + PAGE_HEAD_SIZE + sizeof(space_head_t));
buf_leave_page(session, OG_FALSE);
for (uint32 i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
buf_enter_page(session, PAGID_U2N(undo_entry[i]), LATCH_MODE_S, ENTER_PAGE_RESIDENT | ENTER_PAGE_NO_READ);
undo = &ogx->undos[i];
undo->entry = undo_entry[i];
undo->segment = UNDO_GET_SEGMENT(session);
buf_leave_page(session, OG_FALSE);
}
}
* 1. create segment and alloc txn pages for new undo space
* 2. change undo segments of undo context in memory
* 3. move txn from old undo to new undo
* @param kernel session, undo, new undo space id
*/
static status_t undo_switch(knl_session_t *session, uint32 space_id)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undos = ogx->undos;
undo_t undo;
space_t *space = SPACE_GET(session, space_id);
uint32 i;
uint32 j;
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
if (OG_SUCCESS != undo_create_segment(session, space, &undo, i)) {
return OG_ERROR;
}
for (j = 0; j < UNDO_DEF_TXN_PAGE(session); j++) {
if (OG_SUCCESS != undo_extend_txn(session, space, &undo)) {
return OG_ERROR;
}
}
}
undo_reload_segment(session, space->entry);
for (i = 0; i < UNDO_SEGMENT_COUNT(session); i++) {
for (j = 0; j < UNDO_DEF_TXN_PAGE(session); j++) {
undo_move_txn(session, &undos[i], j);
}
}
ogx->space = space;
return OG_SUCCESS;
}
status_t undo_switch_space(knl_session_t *session, uint32 space_id)
{
core_ctrl_t *core_ctrl = DB_CORE_CTRL(session);
rd_switch_undo_space_t rd;
space_t *space = NULL;
space_t *old_undo_space = SPACE_GET(session, dtc_my_ctrl(session)->undo_space);
space_t *new_undo_space = NULL;
undo_set_t *undo_set = MY_UNDO_SET(session);
knl_panic(0);
session->kernel->undo_ctx.is_switching = OG_TRUE;
undo_invalid_segments(session);
if (undo_switch(session, space_id) != OG_SUCCESS) {
session->kernel->undo_ctx.space = old_undo_space;
session->kernel->undo_ctx.is_switching = OG_FALSE;
return OG_ERROR;
}
dtc_my_ctrl(session)->undo_space = space_id;
undo_init(session, 0, core_ctrl->undo_segments);
if (tx_area_init(session, 0, core_ctrl->undo_segments) != OG_SUCCESS) {
session->kernel->undo_ctx.is_switching = OG_FALSE;
return OG_ERROR;
}
tx_area_release(session, undo_set);
space = SPACE_GET(session, space_id);
new_undo_space = SPACE_GET(session, space_id);
new_undo_space->ctrl->type = SPACE_TYPE_UNDO | SPACE_TYPE_DEFAULT;
old_undo_space->ctrl->type = SPACE_TYPE_UNDO;
new_undo_space->ctrl->cipher_reserve_size = old_undo_space->ctrl->cipher_reserve_size;
new_undo_space->ctrl->encrypt_version = old_undo_space->ctrl->encrypt_version;
session->kernel->undo_ctx.is_switching = OG_FALSE;
rd.op_type = RD_SWITCH_UNDO_SPACE;
rd.space_id = space_id;
rd.space_entry = space->entry;
log_put(session, RD_LOGIC_OPERATION, &rd, sizeof(rd_switch_undo_space_t), LOG_ENTRY_FLAG_NONE);
knl_commit(session);
ckpt_trigger(session, OG_TRUE, CKPT_TRIGGER_FULL);
if (db_save_space_ctrl(session, old_undo_space->ctrl->id) != OG_SUCCESS) {
CM_ABORT(0, "[DB] ABORT INFO: failed to save space ctrl file when load tablespace %s",
old_undo_space->ctrl->name);
}
if (db_save_space_ctrl(session, new_undo_space->ctrl->id) != OG_SUCCESS) {
CM_ABORT(0, "[DB] ABORT INFO: failed to save space ctrl file when load tablespace %s",
new_undo_space->ctrl->name);
}
if (db_save_core_ctrl(session) != OG_SUCCESS) {
CM_ABORT(0, "[DB] ABORT INFO: failed to save core ctrl file when load tablespace");
}
OG_LOG_RUN_INF("[UNDO] succeed to switch undo tablespace %s", new_undo_space->ctrl->name);
return OG_SUCCESS;
}
uint32 undo_max_prepare_size(knl_session_t *session, uint32 count)
{
return UNDO_PAGE_MAX_FREE_SIZE(session) - count * (UNDO_ROW_HEAD_SIZE + sizeof(uint16));
}
uint32 undo_part_locate_size(knl_handle_t knl_table)
{
table_t *table = (table_t *)knl_table;
if (!IS_PART_TABLE(table)) {
return sizeof(uint32);
}
if (IS_COMPART_TABLE(table->part_table)) {
return sizeof(knl_part_locate_t);
} else {
return sizeof(uint32);
}
}