* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* knl_temp.c
*
*
* IDENTIFICATION
* src/kernel/table/knl_temp.c
*
* -------------------------------------------------------------------------
*/
#include "knl_table_module.h"
#include "knl_temp.h"
#ifdef __cplusplus
extern "C" {
#endif
#define TEMP_CT_MAX_ROW_SIZE 8000
status_t temp_heap_alloc_page(knl_session_t *session, knl_temp_cache_t *temp_cache, uint32 *curr_vmid);
static void temp_heap_revert_update(knl_session_t *session, undo_row_t *undo_row, row_head_t *row, uint16 *offsets,
uint16 *lens);
bool32 knl_temp_object_isvalid_by_id(knl_session_t *session, uint32 uid, uint32 oid, knl_scn_t org_scn)
{
knl_dictionary_t dc;
if (knl_open_dc_by_id(session, uid, oid, &dc, OG_TRUE) != OG_SUCCESS) {
int32 code = cm_get_error_code();
if (code == ERR_TABLE_OR_VIEW_NOT_EXIST) {
return OG_FALSE;
}
return OG_TRUE;
}
if (dc.org_scn != org_scn) {
dc_close(&dc);
return OG_FALSE;
}
dc_close(&dc);
return OG_TRUE;
}
void temp_mtrl_init_context(knl_session_t *session)
{
uint32 i;
mtrl_context_t *ogx = session->temp_mtrl;
errno_t ret;
ogx->seg_count = 0;
ogx->lock = 0;
for (i = 0; i < OG_MAX_MTRL_OPEN_PAGES; i++) {
ogx->open_pages[i] = OG_INVALID_ID32;
}
for (i = 0; i < session->temp_table_capacity * 2; i++) {
ogx->segments[i] = NULL;
}
ogx->print_page = NULL;
ogx->sort_cmp = NULL;
ogx->session = (handle_t)session;
ogx->pool = NULL;
g_knl_callback.init_vmc((handle_t)ogx);
session->temp_table_count = 0;
ret = memset_sp(session->temp_table_cache, sizeof(knl_temp_cache_t) * session->temp_table_capacity, 0,
sizeof(knl_temp_cache_t) * session->temp_table_capacity);
knl_securec_check(ret);
}
void temp_mtrl_release_context(knl_session_t *session)
{
uint32 i;
mtrl_context_t *ogx = session->temp_mtrl;
if (ogx->seg_count == 0) {
int32 ret = memset_sp(ogx->segments, sizeof(ogx->segments), 0, sizeof(ogx->segments));
knl_securec_check(ret);
vmc_free(&ogx->vmc);
return;
}
for (i = 0; i < OG_MAX_MTRL_OPEN_PAGES; i++) {
if (ogx->open_pages[i] != OG_INVALID_ID32) {
vm_close(ogx->session, ogx->pool, ogx->open_pages[i], VM_ENQUE_HEAD);
ogx->open_pages[i] = OG_INVALID_ID32;
}
}
for (i = 0; i < session->temp_table_capacity * 2; i++) {
if (ogx->segments[i] != NULL) {
ogx->segments[i]->curr_page = NULL;
mtrl_release_segment(ogx, i);
}
ogx->segments[i] = NULL;
}
ogx->seg_count = 0;
ogx->open_hwm = 0;
session->temp_table_count = 0;
vmc_free(&ogx->vmc);
}
vm_page_t *buf_curr_temp_page(knl_session_t *session)
{
knl_panic_log(session->temp_page_stack.depth > 0, "temp_page_stack's depth abnormal, panic info: depth %u",
session->temp_page_stack.depth);
return session->temp_page_stack.pages[session->temp_page_stack.depth - 1];
}
static inline void buf_push_temp_page(knl_session_t *session, vm_page_t *page)
{
knl_panic_log(session->temp_page_stack.depth < KNL_MAX_PAGE_STACK_DEPTH,
"temp_page_stack's depth abnormal, panic info: depth %u", session->temp_page_stack.depth);
session->temp_page_stack.pages[session->temp_page_stack.depth] = page;
session->temp_page_stack.depth++;
}
static inline void buf_pop_temp_page(knl_session_t *session)
{
knl_panic_log(session->temp_page_stack.depth > 0, "temp_page_stack's depth abnormal, panic info: depth %u",
session->temp_page_stack.depth);
session->temp_page_stack.depth--;
}
void temp_page_init(knl_session_t *session, page_head_t *page, uint32 vmid, page_type_t type)
{
temp_page_tail_t *tail = NULL;
errno_t ret;
ret = memset_sp((char *)page, TEMP_PAGE_SIZE, 0, TEMP_PAGE_SIZE);
knl_securec_check(ret);
AS_PAGID_PTR(page->id)->vmid = vmid;
page->size_units = page_size_units(TEMP_PAGE_SIZE);
page->type = type;
AS_PAGID_PTR(page->next_ext)->vmid = OG_INVALID_ID32;
tail = TEMP_PAGE_TAIL(page);
tail->pcn = 0;
}
#ifdef LOG_DIAG
static void temp_heap_validate_page(knl_session_t *session, vm_page_t *vm_page)
{
temp_heap_page_t *page = (temp_heap_page_t *)vm_page->data;
if (page->head.type != PAGE_TYPE_TEMP_HEAP) {
return;
}
knl_panic_log(page->free_begin >= sizeof(temp_heap_page_t),
"page's free size begin is smaller than the size of "
"temp heap page's size, panic info: page %u-%u type %u free_begin %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_begin);
knl_panic_log(page->free_begin <= page->free_end,
"page's free size begin is more than end, panic info: "
"page %u-%u type %u free_begin %u free_end %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_begin,
page->free_end);
knl_panic_log(page->itls == 0, "page's itls is abnormal, panic info: page %u-%u type %u itls %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->itls);
for (uint16 i = 0; i < page->dirs; i++) {
temp_row_dir_t *dir = temp_heap_get_dir(page, i);
if (dir->is_free) {
continue;
}
knl_panic_log(dir->offset >= sizeof(temp_heap_page_t),
"dir's offset is abnormal, panic info: page %u-%u type %u dir offset %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, dir->offset);
knl_panic_log(dir->offset < page->free_begin,
"dir's offset is more than page's free_begin, panic info: "
"page %u-%u type %u free_begin %u dir offset %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_begin,
dir->offset);
row_head_t *row = TEMP_HEAP_GET_ROW(page, dir);
knl_panic_log(row->is_link + row->is_migr < 2, "row is both link and migr, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
knl_panic_log(ROW_ITL_ID(row) == OG_INVALID_ID8, "row itl id is valid, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
}
uint32 total_size = sizeof(temp_heap_page_t);
while (total_size < page->free_begin) {
row_head_t *pos = (row_head_t *)((char *)page + total_size);
knl_panic_log(pos->size >= HEAP_MIN_ROW_SIZE,
"the size pointed by pos is smaller than the min limit, "
"panic info: page %u-%u type %u pos size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, pos->size);
knl_panic_log(pos->size <= TEMP_CT_MAX_ROW_SIZE,
"the size pointed by pos is more than the max limit, panic info: page %u-%u type %u pos size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, pos->size);
total_size += pos->size;
knl_panic_log(
total_size <= page->free_begin,
"total_size is more than page's free_begin, panic info: page %u-%u type %u free_begin %u total_size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_begin, total_size);
}
}
#endif
status_t buf_enter_temp_page_nolock(knl_session_t *session, uint32 vmid)
{
vm_page_t *vm_page = NULL;
if (vmid == OG_INVALID_ID32) {
OG_LOG_RUN_ERR("invalid vm id when buffer enter temp page with nolock");
OG_THROW_ERROR(ERR_INVALID_PAGE_ID, "");
return OG_ERROR;
}
if (mtrl_open_page(session->temp_mtrl, vmid, &vm_page) != OG_SUCCESS) {
OG_LOG_RUN_ERR("buf_enter_temp_page_nolock: "
"mtrl_open_page failed for vmid: %u, error code: %d", vmid, cm_get_error_code());
return OG_ERROR;
}
buf_push_temp_page(session, vm_page);
return OG_SUCCESS;
}
void buf_leave_temp_page_nolock(knl_session_t *session, bool32 changed)
{
vm_page_t *vm_page = NULL;
page_head_t *head;
vm_page = buf_curr_temp_page(session);
head = (page_head_t *)(vm_page->data);
buf_pop_temp_page(session);
knl_panic_log(AS_PAGID_PTR(head->id)->vmid == vm_page->vmid,
"vmid is abnormal, panic info: the vmid record on head %u the vmid record on vm page %u",
AS_PAGID_PTR(head->id)->vmid, vm_page->vmid);
#ifdef LOG_DIAG
temp_heap_validate_page(session, vm_page);
#endif
if (changed) {
head->pcn++;
TEMP_PAGE_TAIL(head)->pcn++;
}
mtrl_close_page(session->temp_mtrl, vm_page->vmid);
}
static void temp_heap_format_page(knl_session_t *session, knl_temp_cache_t *temp_cache, temp_heap_page_t *page,
uint32 id)
{
temp_page_init(session, &page->head, id, PAGE_TYPE_TEMP_HEAP);
AS_PAGID_PTR(page->next)->vmid = OG_INVALID_ID32;
page->itls = 0;
page->first_free_dir = HEAP_NO_FREE_DIR;
page->free_begin = sizeof(temp_heap_page_t);
page->free_end = (uint32)PAGE_SIZE(page->head) - sizeof(temp_page_tail_t);
page->free_size = page->free_end - page->free_begin;
page->oid = OG_INVALID_ID32;
page->uid = OG_INVALID_ID16;
page->seg_scn = OG_INVALID_ID64;
page->org_scn = temp_cache->org_scn;
page->rows = 0;
page->dirs = 0;
}
static inline void temp_heap_init_row(knl_session_t *session, row_assist_t *ra, char *buf, uint32 column_count,
uint16 flags)
{
row_init(ra, buf, KNL_MAX_ROW_SIZE(session), column_count);
ROW_SET_ITL_ID(ra->head, OG_INVALID_ID8);
ra->head->flags = flags;
}
status_t temp_create_segment(knl_session_t *session, uint32 *id)
{
mtrl_context_t *ogx = session->temp_mtrl;
mtrl_segment_t *segment = NULL;
uint32 i;
for (i = 0; i < ogx->seg_count; i++) {
segment = ogx->segments[i];
if (segment->level == OG_INVALID_ID32) {
knl_panic_log(!segment->is_used, "current segment is used.");
break;
}
}
if (i >= session->temp_table_capacity * 2) {
OG_THROW_ERROR(ERR_TOO_MANY_OBJECTS, session->temp_table_capacity * 2, "temp segments");
return OG_ERROR;
}
if (i >= ogx->seg_count) {
ogx->seg_count++;
OG_RETURN_IFERR(vmc_alloc_mem(&ogx->vmc, sizeof(mtrl_segment_t), (void **)&ogx->segments[i]));
}
segment = ogx->segments[i];
segment->vm_list.count = 0;
segment->cmp_items = NULL;
segment->type = MTRL_SEGMENT_TEMP;
segment->level = 0;
segment->is_used = OG_TRUE;
*id = i;
return OG_SUCCESS;
}
void temp_drop_segment(mtrl_context_t *ogx, uint32 id)
{
knl_panic_log(ogx->segments[id]->is_used, "current segment is not used, panic info: id %u", id);
vm_free_list(ogx->session, ogx->pool, &ogx->segments[id]->vm_list);
ogx->segments[id]->is_used = OG_FALSE;
ogx->segments[id]->level = OG_INVALID_ID32;
}
status_t temp_heap_create_segment(knl_session_t *session, knl_temp_cache_t *temp_table_ptr)
{
uint32 table_segid;
mtrl_segment_t *segment = NULL;
uint32 vmid = OG_INVALID_ID32;
if (temp_create_segment(session, &table_segid) != OG_SUCCESS) {
return OG_ERROR;
}
temp_table_ptr->table_segid = table_segid;
segment = session->temp_mtrl->segments[table_segid];
if (temp_heap_alloc_page(session, temp_table_ptr, &vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to alloc page in heap create segment.");
return OG_ERROR;
}
knl_panic_log(segment->vm_list.last == vmid, "the vm list is abnormal, panic info: vm_list's last %u vmid %u.",
segment->vm_list.last, vmid);
knl_panic_log(segment->vm_list.count > 0, "the count of vm list is abnormal, panic info: vm_list's count %u",
segment->vm_list.count);
return OG_SUCCESS;
}
static bool32 temp_heap_check_page(knl_session_t *session, knl_cursor_t *cursor, temp_heap_page_t *page,
page_type_t type)
{
table_t *table = NULL;
if (page->head.type != type) {
return OG_FALSE;
}
table = (table_t *)cursor->table;
return (page->org_scn == table->desc.org_scn);
}
static void temp_heap_insert_into_page(knl_cursor_t *cursor, temp_heap_page_t *page, row_head_t *row, undo_data_t *undo,
rd_temp_heap_insert_t *rd, uint16 *slot)
{
temp_row_dir_t *dir = NULL;
char *row_addr = NULL;
uint16 row_size = row->size;
errno_t ret;
if (page->first_free_dir == HEAP_NO_FREE_DIR || rd->new_dir) {
*slot = page->dirs;
page->dirs++;
dir = temp_heap_get_dir(page, *slot);
page->free_end -= sizeof(temp_row_dir_t);
page->free_size -= sizeof(temp_row_dir_t);
undo->snapshot.scn = 0;
undo->snapshot.is_owscn = 0;
undo->snapshot.undo_page = INVALID_UNDO_PAGID;
undo->snapshot.undo_slot = INVALID_SLOT;
} else {
*slot = page->first_free_dir;
dir = temp_heap_get_dir(page, *slot);
page->first_free_dir = dir->next_slot;
undo->snapshot.scn = dir->scn;
undo->snapshot.is_owscn = dir->is_owscn;
undo->snapshot.undo_page = dir->undo_page;
undo->snapshot.undo_slot = dir->undo_slot;
}
dir->undo_page = rd->undo_page;
dir->undo_slot = rd->undo_slot;
dir->scn = rd->ssn;
dir->is_owscn = 0;
dir->offset = page->free_begin;
row_addr = (char *)page + dir->offset;
row->flags = 0;
ROW_SET_ITL_ID(row, OG_INVALID_ID8);
row->is_changed = 0;
ret = memcpy_sp(row_addr, page->free_size, row, row_size);
knl_securec_check(ret);
page->free_begin += row_size;
page->free_size -= row_size;
page->rows++;
knl_panic_log(page->free_begin <= page->free_end,
"page's free size begin is more than end, panic info: "
"page %u-%u type %u table %s free_begin %u free_end %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name, page->free_begin, page->free_end);
}
static status_t temp_heap_get_row(knl_session_t *session, knl_cursor_t *cursor, temp_heap_page_t *page,
bool32 *is_found)
{
temp_row_dir_t *dir = NULL;
row_head_t *row = NULL;
*is_found = OG_FALSE;
dir = temp_heap_get_dir(page, (uint32)cursor->rowid.vm_slot);
if (dir->is_free || cursor->ssn < dir->scn) {
*is_found = OG_FALSE;
return OG_SUCCESS;
}
if (cursor->ssn == dir->scn && cursor->action <= CURSOR_ACTION_SELECT) {
*is_found = OG_FALSE;
return OG_SUCCESS;
}
row = TEMP_HEAP_GET_ROW(page, dir);
if (row->is_migr) {
*is_found = OG_FALSE;
return OG_SUCCESS;
}
if (cursor->ssn == dir->scn && row->is_changed == 0) {
*is_found = OG_FALSE;
return OG_SUCCESS;
}
*is_found = !row->is_deleted;
if (*is_found) {
HEAP_COPY_ROW(session, cursor, row);
}
knl_panic_log(ROW_ITL_ID(row) == OG_INVALID_ID8, "row itl id is valid, panic info: page %u-%u type %u table %s",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name);
knl_panic_log(row->is_link + row->is_migr < 2, "row is both link and migr, panic info: page %u-%u type %u table %s",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name);
return OG_SUCCESS;
}
static status_t temp_heap_scan_full_page(knl_session_t *session, knl_cursor_t *cursor, temp_heap_page_t *page,
bool32 *is_found)
{
vm_ctrl_t *ctrl = NULL;
*is_found = OG_FALSE;
for (;;) {
cursor->link_rid.vmid = OG_INVALID_ID32;
if (cursor->rowid.vm_slot == OG_INVALID_ID16) {
cursor->rowid.vm_slot = 0;
} else {
cursor->rowid.vm_slot++;
}
if (cursor->rowid.vm_slot == page->dirs) {
ctrl = vm_get_ctrl(session->temp_mtrl->pool, (uint32)cursor->rowid.vmid);
if (session->stat_sample) {
cursor->rowid.vmid = OG_INVALID_ID32;
} else {
cursor->rowid.vmid = ctrl->next;
}
cursor->rowid.vm_slot = OG_INVALID_ID16;
return OG_SUCCESS;
}
cursor->rowid_pos.file = cursor->rowid.file;
cursor->rowid_pos.page = cursor->rowid.page;
cursor->rowid_pos.vmid = cursor->rowid.vmid;
cursor->rowid_pos.vm_slot = cursor->rowid.vm_slot;
if (temp_heap_get_row(session, cursor, page, is_found) != OG_SUCCESS) {
return OG_ERROR;
}
if (*is_found) {
return OG_SUCCESS;
}
}
}
static status_t temp_heap_get_migr_row(knl_session_t *session, knl_cursor_t *cursor)
{
temp_row_dir_t *dir = NULL;
row_head_t *row = NULL;
temp_heap_page_t *page = NULL;
int32 ret;
if (buf_enter_temp_page_nolock(session, (uint32)cursor->link_rid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open heap migr vm page (%u).", cursor->link_rid.vmid);
return OG_ERROR;
}
page = TEMP_HEAP_CURR_PAGE(session);
if (!temp_heap_check_page(session, cursor, page, PAGE_TYPE_TEMP_HEAP)) {
buf_leave_temp_page_nolock(session, OG_FALSE);
OG_THROW_ERROR(ERR_OBJECT_ALREADY_DROPPED, "temp table");
return OG_ERROR;
}
dir = temp_heap_get_dir(page, (uint32)cursor->link_rid.vm_slot);
knl_panic_log(dir->is_free == 0, "dir is free, panic info: page %u-%u type %u table %s",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name);
row = TEMP_HEAP_GET_ROW(page, dir);
knl_panic_log(row->is_migr == 1, "row is not migr, panic info: page %u-%u type %u table %s",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name);
knl_panic_log(cursor->link_rid.vmid != OG_INVALID_ID32,
"the vm id is invalid, panic info: page %u-%u type %u table %s", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type, ((table_t *)cursor->table)->desc.name);
if (row->size != 0) {
ret = memcpy_sp(cursor->row, TEMP_CT_MAX_ROW_SIZE, row, row->size);
knl_securec_check(ret);
}
cursor->row->is_migr = 0;
buf_leave_temp_page_nolock(session, OG_FALSE);
return OG_SUCCESS;
}
static status_t temp_heap_read_by_rowid(knl_session_t *session, knl_cursor_t *cursor, bool32 *is_found)
{
temp_heap_page_t *page = NULL;
*is_found = OG_FALSE;
cursor->snapshot.is_valid = 0;
cursor->snapshot.undo_page = INVALID_UNDO_PAGID;
cursor->snapshot.undo_slot = INVALID_SLOT;
cursor->link_rid.vmid = OG_INVALID_ID32;
if (buf_enter_temp_page_nolock(session, (uint32)cursor->rowid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open heap rowid vm page (%u).", cursor->rowid.vmid);
return OG_ERROR;
}
page = TEMP_HEAP_CURR_PAGE(session);
if (!temp_heap_check_page(session, cursor, page, PAGE_TYPE_TEMP_HEAP)) {
buf_leave_temp_page_nolock(session, OG_FALSE);
OG_THROW_ERROR(ERR_INVALID_ROWID);
return OG_ERROR;
}
if (cursor->rowid.vm_slot >= page->dirs) {
buf_leave_temp_page_nolock(session, OG_FALSE);
OG_THROW_ERROR(ERR_INVALID_ROWID);
return OG_ERROR;
}
if (temp_heap_get_row(session, cursor, page, is_found) != OG_SUCCESS) {
buf_leave_temp_page_nolock(session, OG_FALSE);
return OG_ERROR;
}
if (*is_found && cursor->link_rid.vmid != OG_INVALID_ID32) {
if (temp_heap_get_migr_row(session, cursor) != OG_SUCCESS) {
buf_leave_temp_page_nolock(session, OG_FALSE);
return OG_ERROR;
}
}
buf_leave_temp_page_nolock(session, OG_FALSE);
return OG_SUCCESS;
}
static status_t temp_heap_fetch_by_page(knl_session_t *session, knl_cursor_t *cursor, bool32 *is_found)
{
temp_heap_page_t *page = NULL;
*is_found = OG_FALSE;
if (buf_enter_temp_page_nolock(session, (uint32)cursor->rowid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open heap fetch vm page (%u), "
"error code: %d.", cursor->rowid.vmid, cm_get_error_code());
return OG_ERROR;
}
page = TEMP_HEAP_CURR_PAGE(session);
if (!temp_heap_check_page(session, cursor, page, PAGE_TYPE_TEMP_HEAP)) {
buf_leave_temp_page_nolock(session, OG_FALSE);
OG_THROW_ERROR(ERR_OBJECT_ALREADY_DROPPED, "temp table");
return OG_ERROR;
}
if (temp_heap_scan_full_page(session, cursor, page, is_found) != OG_SUCCESS) {
buf_leave_temp_page_nolock(session, OG_FALSE);
return OG_ERROR;
}
if (*is_found && cursor->link_rid.vmid != OG_INVALID_ID32) {
if (temp_heap_get_migr_row(session, cursor) != OG_SUCCESS) {
buf_leave_temp_page_nolock(session, OG_FALSE);
return OG_ERROR;
}
}
buf_leave_temp_page_nolock(session, OG_FALSE);
return OG_SUCCESS;
}
static status_t temp_heap_try_lock_row(knl_session_t *session, knl_cursor_t *cursor)
{
temp_row_dir_t *dir = NULL;
row_head_t *row = NULL;
temp_heap_page_t *page = NULL;
if (buf_enter_temp_page_nolock(session, (uint32)cursor->rowid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open heap fetch vm page (%u), "
"error code: %d.", cursor->rowid.vmid, cm_get_error_code());
return OG_ERROR;
}
page = TEMP_HEAP_CURR_PAGE(session);
dir = temp_heap_get_dir(page, (uint32)cursor->rowid.vm_slot);
knl_panic_log(!dir->is_free, "dir is free, panic info: page %u-%u type %u table %s", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type, ((table_t *)cursor->table)->desc.name);
row = TEMP_HEAP_GET_ROW(page, dir);
knl_panic_log(!row->is_deleted, "row is deleted, panic info: page %u-%u type %u table %s",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name);
if (dir->scn >= cursor->ssn) {
buf_leave_temp_page_nolock(session, OG_FALSE);
OG_THROW_ERROR(ERR_ROW_SELF_UPDATED);
return OG_ERROR;
}
buf_leave_temp_page_nolock(session, OG_FALSE);
return OG_SUCCESS;
}
status_t temp_heap_lock_row(knl_session_t *session, knl_cursor_t *cursor, bool32 *is_locked)
{
if (lock_table_shared(session, cursor->dc_entity, LOCK_INF_WAIT) != OG_SUCCESS) {
return OG_ERROR;
}
if (temp_heap_try_lock_row(session, cursor) != OG_SUCCESS) {
return OG_ERROR;
}
*is_locked = OG_TRUE;
return OG_SUCCESS;
}
status_t temp_heap_fetch(knl_handle_t handle, knl_cursor_t *cursor)
{
knl_session_t *session = (knl_session_t *)handle;
if (cursor->temp_cache == NULL || CURSOR_TEMP_CACHE(cursor)->table_segid == OG_INVALID_ID32) {
cursor->eof = OG_TRUE;
return OG_SUCCESS;
}
knl_temp_cache_t *temp_table = (knl_temp_cache_t *)cursor->temp_cache;
if (temp_table->table_type == DICT_TYPE_TEMP_TABLE_TRANS && temp_table->hold_rmid != OG_INVALID_ID32 &&
session->rmid != temp_table->hold_rmid) {
cursor->eof = OG_TRUE;
return OG_SUCCESS;
}
for (;;) {
if (cursor->rowid.vmid == OG_INVALID_ID32) {
cursor->eof = OG_TRUE;
return OG_SUCCESS;
}
if (cursor->eof) {
return OG_SUCCESS;
}
if (temp_heap_fetch_by_page(session, cursor, &cursor->is_found) != OG_SUCCESS) {
return OG_ERROR;
}
if (!cursor->is_found) {
continue;
}
if (knl_match_cond(session, cursor, &cursor->is_found) != OG_SUCCESS) {
return OG_ERROR;
}
if (!cursor->is_found) {
continue;
}
if (cursor->action <= CURSOR_ACTION_SELECT) {
return OG_SUCCESS;
}
if (temp_heap_lock_row(session, cursor, &cursor->is_found) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
}
inline bool32 temp_rowid_valid(knl_handle_t session, const rowid_t *rid)
{
knl_session_t *se = (knl_session_t *)session;
vm_pool_t *pool = se->temp_mtrl->pool;
if (rid->vmid >= pool->ctrl_hwm) {
return OG_FALSE;
}
return OG_TRUE;
}
status_t temp_heap_rowid_fetch(knl_handle_t session, knl_cursor_t *knl_cur)
{
if (knl_cur->temp_cache == NULL || CURSOR_TEMP_CACHE(knl_cur)->table_segid == OG_INVALID_ID32) {
knl_cur->eof = OG_TRUE;
return OG_SUCCESS;
}
for (;;) {
if (knl_cur->rowid_no == knl_cur->rowid_count) {
knl_cur->eof = OG_TRUE;
return OG_SUCCESS;
}
knl_cur->rowid = knl_cur->rowid_array[knl_cur->rowid_no];
knl_cur->rowid_no++;
if (temp_heap_fetch_by_rowid((knl_session_t *)session, knl_cur) != OG_SUCCESS) {
return OG_ERROR;
}
if (knl_cur->is_found) {
return OG_SUCCESS;
}
}
return OG_ERROR;
}
status_t temp_heap_fetch_by_rowid(knl_session_t *session, knl_cursor_t *cursor)
{
if (!temp_rowid_valid((knl_session_t *)session, &cursor->rowid)) {
OG_THROW_ERROR(ERR_INVALID_ROWID);
return OG_ERROR;
}
if (temp_heap_read_by_rowid(session, cursor, &cursor->is_found) != OG_SUCCESS) {
return OG_ERROR;
}
if (!cursor->is_found) {
return OG_SUCCESS;
}
if (knl_match_cond(session, cursor, &cursor->is_found) != OG_SUCCESS) {
return OG_ERROR;
}
if (!cursor->is_found || cursor->action <= CURSOR_ACTION_SELECT) {
return OG_SUCCESS;
}
if (temp_heap_lock_row(session, cursor, &cursor->is_found) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t temp_heap_alloc_page(knl_session_t *session, knl_temp_cache_t *temp_cache, uint32 *curr_vmid)
{
temp_heap_page_t *page = NULL;
vm_page_t *vm_page = NULL;
mtrl_segment_t *segment = NULL;
mtrl_context_t *ogx = session->temp_mtrl;
uint32 vmid;
if (temp_cache == NULL || temp_cache->table_segid == OG_INVALID_ID32) {
OG_LOG_RUN_ERR("[TEMP] failed to alloc page");
return OG_ERROR;
}
segment = session->temp_mtrl->segments[temp_cache->table_segid];
if (vm_alloc(ogx->session, ogx->pool, &vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to extend segment when heap alloc page.");
return OG_ERROR;
}
if (buf_enter_temp_page_nolock(session, vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open extend vm (%d) when heap alloc page.", vmid);
return OG_ERROR;
}
vm_page = buf_curr_temp_page(session);
page = (temp_heap_page_t *)vm_page->data;
temp_heap_format_page(session, temp_cache, page, vmid);
buf_leave_temp_page_nolock(session, OG_TRUE);
vm_append(ogx->pool, &segment->vm_list, vmid);
*curr_vmid = vmid;
knl_panic_log(segment->vm_list.last == vmid,
"list's last is not equal to vmid, panic info: page %u-%u type %u vm_list's last %u vmid %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, segment->vm_list.last,
vmid);
return OG_SUCCESS;
}
static status_t temp_enter_heap_insert(knl_session_t *session, knl_cursor_t *cursor, uint32 cost_size,
uint32 *alloc_vmid)
{
temp_heap_page_t *page = NULL;
vm_page_t *vm_page = NULL;
uint32 vmid;
uint64 pct_free_size = ((table_t *)cursor->table)->desc.pctfree * TEMP_PAGE_SIZE / 100;
vmid = session->temp_mtrl->segments[CURSOR_TEMP_CACHE(cursor)->table_segid]->vm_list.last;
if (buf_enter_temp_page_nolock(session, vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open vm page (%u) when enter heap insert.", vmid);
return OG_ERROR;
}
vm_page = buf_curr_temp_page(session);
page = (temp_heap_page_t *)vm_page->data;
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP,
"page type is abnormal, panic info: page %u-%u type %u table %s", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type, ((table_t *)cursor->table)->desc.name);
if (page->free_end < cost_size + page->free_begin + pct_free_size) {
buf_leave_temp_page_nolock(session, OG_FALSE);
if (OG_SUCCESS != temp_heap_alloc_page(session, cursor->temp_cache, &vmid)) {
OG_LOG_RUN_ERR("failed to find free temp buffer page for enter insert size : %u.", cost_size);
return OG_ERROR;
}
if (buf_enter_temp_page_nolock(session, vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open new allocated page (%d) when enter heap insert.", vmid);
return OG_ERROR;
}
vm_page = buf_curr_temp_page(session);
page = (temp_heap_page_t *)vm_page->data;
knl_panic_log(page->free_end >= cost_size + page->free_begin,
"page's free size is abnormal, panic info: "
"page %u-%u type %u table %s free_end %u free_begin %u cost_size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name, page->free_end, page->free_begin, cost_size);
}
*alloc_vmid = vmid;
return OG_SUCCESS;
}
static void temp_batch_insert_rows(knl_session_t *session, knl_cursor_t *cursor, undo_data_t *undo, uint16 *rows_size)
{
rd_temp_heap_insert_t rd;
row_head_t *row = cursor->row;
uint16 vm_slot;
vm_page_t *vm_page = buf_curr_temp_page(session);
temp_heap_page_t *page = (temp_heap_page_t *)vm_page->data;
ROW_SET_ITL_ID(row, OG_INVALID_ID8);
rd.ssn = cursor->ssn;
if (IS_LOGGING_TABLE_BY_TYPE(cursor->dc_type)) {
rd.undo_page = session->rm->undo_page_info.undo_rid.page_id;
rd.undo_slot = session->rm->undo_page_info.undo_rid.slot;
} else {
rd.undo_page = session->rm->noredo_undo_page_info.undo_rid.page_id;
rd.undo_slot = session->rm->noredo_undo_page_info.undo_rid.slot;
}
rd.new_dir = OG_TRUE;
temp_heap_undo_binsert_t *batch_undo = (temp_heap_undo_binsert_t *)undo->data;
batch_undo->obj_info.uid = CURSOR_TEMP_CACHE(cursor)->user_id;
batch_undo->obj_info.table_id = CURSOR_TEMP_CACHE(cursor)->table_id;
batch_undo->obj_info.seg_scn = CURSOR_TEMP_CACHE(cursor)->seg_scn;
batch_undo->count = 0;
batch_undo->begin_slot = page->dirs;
uint32 row_count = MIN(cursor->rowid_count - cursor->rowid_no, KNL_ROWID_ARRAY_SIZE);
uint64 pct_free_size = ((table_t *)cursor->table)->desc.pctfree * TEMP_PAGE_SIZE / 100;
for (uint32 i = 0; i < row_count; i++) {
temp_heap_insert_into_page(cursor, page, row, undo, &rd, &vm_slot);
cursor->rowid.vm_slot = vm_slot;
cursor->rowid_array[cursor->rowid_no].vmid = cursor->rowid.vmid;
cursor->rowid_array[cursor->rowid_no].vm_slot = vm_slot;
cursor->rowid_no++;
batch_undo->count++;
CURSOR_TEMP_CACHE(cursor)->rows++;
row = (row_head_t *)((char *)row + row->size);
if (i == row_count - 1) {
break;
}
uint32 max_cost_size = row->size + sizeof(temp_row_dir_t);
if (row->size > TEMP_CT_MAX_ROW_SIZE || page->free_end < page->free_begin + max_cost_size + pct_free_size) {
break;
}
}
*rows_size = (uint16)((char *)row - (char *)cursor->row);
knl_panic_log(page->dirs - batch_undo->begin_slot == batch_undo->count,
"invalid temp heap batch insert dirs %u, begin slot %u, batch count %u", (uint32)page->dirs,
(uint32)batch_undo->begin_slot, (uint32)batch_undo->count);
knl_panic_log(*rows_size != 0, "invalid temp heap batch insert rows_size %u", (uint32)(*rows_size));
undo->snapshot.is_xfirst = cursor->is_xfirst;
undo->type = UNDO_TEMP_HEAP_BINSERT;
undo->rowid = cursor->rowid;
}
static status_t temp_heap_try_batch_insert(knl_session_t *session, knl_cursor_t *cursor, uint16 *rows_size)
{
row_head_t *row = cursor->row;
uint32 vmid = OG_INVALID_ID32;
undo_data_t undo = { 0 };
bool32 need_redo = IS_TEMPTABLE_HAS_REDO(session);
if (!DB_IS_PRIMARY(&session->kernel->db)) {
need_redo = OG_FALSE;
}
if (row->size > TEMP_CT_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_RECORD_SIZE_OVERFLOW, "insert row", row->size, TEMP_CT_MAX_ROW_SIZE);
return OG_ERROR;
}
knl_panic_log(cursor->rowid_no <= cursor->rowid_count, "invalid temp heap batch insert rowid no %u, rowid count %u",
(uint32)cursor->rowid_no, (uint32)cursor->rowid_count);
undo.size = sizeof(temp_heap_undo_binsert_t);
if (DB_IS_PRIMARY(&session->kernel->db) && undo_prepare(session, undo.size, need_redo, OG_FALSE) != OG_SUCCESS) {
return OG_ERROR;
}
*rows_size = 0;
cursor->is_xfirst = OG_TRUE;
session->rm->temp_has_undo = OG_TRUE;
uint32 max_cost_size = row->size + sizeof(temp_row_dir_t);
if (temp_enter_heap_insert(session, cursor, max_cost_size, &vmid) != OG_SUCCESS) {
return OG_ERROR;
}
cursor->rowid.vmid = vmid;
CM_SAVE_STACK(session->stack);
temp_heap_undo_binsert_t *batch_undo = (temp_heap_undo_binsert_t *)cm_push(session->stack, undo.size);
undo.data = (char *)batch_undo;
temp_batch_insert_rows(session, cursor, &undo, rows_size);
if (DB_IS_PRIMARY(&session->kernel->db)) {
log_atomic_op_begin(session);
undo_write(session, &undo, need_redo, OG_FALSE);
log_atomic_op_end(session);
}
CM_RESTORE_STACK(session->stack);
buf_leave_temp_page_nolock(session, OG_TRUE);
return OG_SUCCESS;
}
static status_t temp_heap_batch_insert(knl_session_t *session, knl_cursor_t *cursor)
{
status_t status;
row_head_t *row_addr = cursor->row;
uint16 offset = 0;
cursor->rowid_no = 0;
do {
status = temp_heap_try_batch_insert(session, cursor, &offset);
cursor->row = (row_head_t *)((char *)cursor->row + offset);
} while (cursor->rowid_count > cursor->rowid_no && status == OG_SUCCESS);
cursor->rowid_no = 0;
cursor->row_offset = 0;
cursor->row = row_addr;
return status;
}
static status_t temp_simple_insert(knl_session_t *session, knl_cursor_t *cursor)
{
temp_heap_page_t *page = NULL;
vm_page_t *vm_page = NULL;
row_head_t *row = cursor->row;
uint32 vmid = OG_INVALID_ID32;
rd_temp_heap_insert_t rd;
uint16 vm_slot;
undo_data_t undo = { 0 };
bool32 need_redo = IS_TEMPTABLE_HAS_REDO(session);
if (!DB_IS_PRIMARY(&session->kernel->db)) {
need_redo = OG_FALSE;
}
undo_page_info_t *undo_page_info = UNDO_GET_PAGE_INFO(session, need_redo);
undo.size = sizeof(temp_heap_extra_undo_t);
if (DB_IS_PRIMARY(&session->kernel->db) && undo_prepare(session, undo.size, need_redo, OG_FALSE) != OG_SUCCESS) {
return OG_ERROR;
}
cursor->is_xfirst = OG_TRUE;
session->rm->temp_has_undo = OG_TRUE;
uint32 max_cost_size = row->size + sizeof(temp_row_dir_t);
knl_panic_log(session->temp_mtrl->segments[CURSOR_TEMP_CACHE(cursor)->table_segid]->vm_list.count > 0,
"vm page count is abnormal, panic info: page %u-%u type %u table %s vm_list's count %u",
cursor->rowid.file, cursor->rowid.page, ((page_head_t *)cursor->page_buf)->type,
((table_t *)cursor->table)->desc.name,
session->temp_mtrl->segments[CURSOR_TEMP_CACHE(cursor)->table_segid]->vm_list.count);
if (temp_enter_heap_insert(session, cursor, max_cost_size, &vmid) != OG_SUCCESS) {
return OG_ERROR;
}
cursor->rowid.vmid = vmid;
vm_page = buf_curr_temp_page(session);
page = (temp_heap_page_t *)vm_page->data;
ROW_SET_ITL_ID(row, OG_INVALID_ID8);
rd.ssn = cursor->ssn;
rd.undo_page = undo_page_info->undo_rid.page_id;
rd.undo_slot = undo_page_info->undo_rid.slot;
rd.new_dir = OG_FALSE;
temp_heap_insert_into_page(cursor, page, cursor->row, &undo, &rd, &vm_slot);
cursor->rowid.vm_slot = vm_slot;
undo.data = (char *)cm_push(session->stack, undo.size);
undo.snapshot.is_xfirst = cursor->is_xfirst;
undo.type = UNDO_TEMP_HEAP_INSERT;
undo.rowid = cursor->rowid;
temp_heap_extra_undo_t *extra_undo =
(temp_heap_extra_undo_t *)(undo.data + undo.size - sizeof(temp_heap_extra_undo_t));
extra_undo->uid = CURSOR_TEMP_CACHE(cursor)->user_id;
extra_undo->table_id = CURSOR_TEMP_CACHE(cursor)->table_id;
extra_undo->seg_scn = CURSOR_TEMP_CACHE(cursor)->seg_scn;
if (DB_IS_PRIMARY(&session->kernel->db)) {
log_atomic_op_begin(session);
undo_write(session, &undo, need_redo, OG_FALSE);
log_atomic_op_end(session);
}
cm_pop(session->stack);
buf_leave_temp_page_nolock(session, OG_TRUE);
CURSOR_TEMP_CACHE(cursor)->rows++;
return OG_SUCCESS;
}
status_t temp_heap_insert(knl_session_t *session, knl_cursor_t *cursor)
{
row_head_t *row = cursor->row;
knl_panic_log(cursor->is_valid, "current cursor is invalid, panic info: page %u-%u type %u table %s",
cursor->rowid.file, cursor->rowid.page, ((page_head_t *)cursor->page_buf)->type,
((table_t *)cursor->table)->desc.name);
cursor->link_rid.vmid = OG_INVALID_ID32;
if (row->size > TEMP_CT_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_RECORD_SIZE_OVERFLOW, "insert row", row->size, TEMP_CT_MAX_ROW_SIZE);
return OG_ERROR;
}
if (row->size < HEAP_MIN_ROW_SIZE) {
row->size = HEAP_MIN_ROW_SIZE;
}
if (lock_table_shared(session, cursor->dc_entity, LOCK_INF_WAIT) != OG_SUCCESS) {
return OG_ERROR;
}
if (cursor->xid != session->rm->xid.value) {
cursor->xid = session->rm->xid.value;
}
cursor->part_loc.part_no = OG_INVALID_ID32;
if (SECUREC_UNLIKELY(cursor->rowid_count > 0)) {
return temp_heap_batch_insert(session, cursor);
}
return temp_simple_insert(session, cursor);
}
static void temp_heap_batch_free_dirs(knl_session_t *session, temp_heap_page_t *page, uint16 begin, uint16 end)
{
uint32 src_count = page->dirs - end;
uint32 free_count = end - begin;
temp_row_dir_t *dir = NULL;
for (uint32 i = end; i < page->dirs; i++) {
dir = temp_heap_get_dir(page, i);
row_head_t *row = TEMP_HEAP_GET_ROW(page, dir);
if (!row->is_link && row->is_migr) {
return;
}
}
knl_panic_log(free_count >= 1, "temp heap batch insert finish one row at least.");
if (src_count > 0) {
temp_row_dir_t *src_dir = temp_heap_get_dir(page, page->dirs - 1);
temp_row_dir_t *dst_dir = temp_heap_get_dir(page, begin + src_count - 1);
temp_row_dir_t *free_dir = temp_heap_get_dir(page, end - 1);
knl_panic_log(free_dir->is_free, "temp heap batch insert invalid dir.");
uint32 copy_size = src_count * sizeof(temp_row_dir_t);
errno_t ret = memmove_s(dst_dir, copy_size, src_dir, copy_size);
knl_securec_check(ret);
}
page->dirs -= free_count;
page->free_size += free_count * sizeof(temp_row_dir_t);
page->free_end += free_count * sizeof(temp_row_dir_t);
if (page->first_free_dir == HEAP_NO_FREE_DIR) {
return;
}
if (page->first_free_dir >= end) {
page->first_free_dir = page->first_free_dir - free_count;
}
knl_panic_log(page->first_free_dir < page->dirs, "temp page invalid first free dir %u, page dirs %u.",
(uint32)page->first_free_dir, (uint32)page->dirs);
dir = temp_heap_get_dir(page, page->first_free_dir);
while (dir->next_slot != HEAP_NO_FREE_DIR) {
if (dir->next_slot >= end) {
dir->next_slot -= free_count;
}
knl_panic_log(dir->next_slot == HEAP_NO_FREE_DIR || dir->next_slot < page->dirs,
"temp page invalid next slot %u, page dirs %u.", (uint32)dir->next_slot, (uint32)page->dirs);
dir = temp_heap_get_dir(page, dir->next_slot);
}
}
static void temp_heap_undo_insert_row(knl_session_t *session, undo_row_t *ud_row, undo_page_t *ud_page, int32 ud_slot,
uint32 vm_slot)
{
vm_page_t *vm_page = buf_curr_temp_page(session);
temp_heap_page_t *page = (temp_heap_page_t *)vm_page->data;
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP, "page type is abnormal, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
temp_row_dir_t *dir = temp_heap_get_dir(page, vm_slot);
knl_panic_log(IS_SAME_PAGID(dir->undo_page, AS_PAGID(ud_page->head.id)),
"dir's undo_page and ud_page are not same, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
knl_panic_log(dir->undo_slot == ud_slot,
"dir's undo_slot is not equal ud_slot, panic info: page %u-%u type %u dir undo_slot %u ud_slot %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, dir->undo_slot, ud_slot);
row_head_t *row = TEMP_HEAP_GET_ROW(page, dir);
knl_panic_log(ROW_ITL_ID(row) == OG_INVALID_ID8, "row itl is valid, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
if (page->free_begin == dir->offset + row->size) {
page->free_begin = dir->offset;
}
if (row->is_link) {
page->free_size += HEAP_MIN_ROW_SIZE;
} else {
page->free_size += row->size;
}
page->rows--;
dir->scn = ud_row->scn;
dir->is_owscn = 0;
dir->undo_page = ud_row->prev_page;
dir->undo_slot = ud_row->prev_slot;
ROW_SET_ITL_ID(row, OG_INVALID_ID8);
row->is_deleted = 1;
dir->is_free = 1;
dir->next_slot = page->first_free_dir;
page->first_free_dir = (uint16)vm_slot;
}
status_t temp_undo_enter_page(knl_session_t *session, uint32 vmid)
{
status_t status = OG_SUCCESS;
knl_begin_session_wait(session, TEMP_ENTER_PAGE, OG_TRUE);
for (;;) {
if (buf_enter_temp_page_nolock(session, vmid) == OG_SUCCESS) {
break;
}
if (cm_get_error_code() != ERR_NO_FREE_VMEM) {
OG_LOG_RUN_ERR("temp_undo_enter_page failed with error code: %d", cm_get_error_code());
status = OG_ERROR;
break;
}
cm_reset_error();
cm_spin_sleep_and_stat2(1);
}
knl_end_session_wait(session, TEMP_ENTER_PAGE);
return status;
}
void temp_heap_undo_insert(knl_session_t *session, undo_row_t *ud_row, undo_page_t *ud_page, int32 ud_slot)
{
rowid_t rid = ud_row->rowid;
if (DB_IS_BG_ROLLBACK_SE(session)) {
return;
}
temp_heap_extra_undo_t *extra_undo =
(temp_heap_extra_undo_t *)(ud_row->data + ud_row->data_size - sizeof(temp_heap_extra_undo_t));
knl_temp_cache_t *knl_temp_cache = knl_get_temp_cache(session, extra_undo->uid, extra_undo->table_id);
if (knl_temp_cache == NULL || knl_temp_cache->seg_scn != extra_undo->seg_scn) {
return;
}
if (temp_undo_enter_page(session, (uint32)rid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("temp heap undo insert enter link vmid %u "
"failed with error code: %d", (uint32)rid.vmid, cm_get_error_code());
knl_panic_log(0, "temp heap undo insert enter link vmid %u failed.", (uint32)rid.vmid);
return;
}
temp_heap_undo_insert_row(session, ud_row, ud_page, ud_slot, (uint32)rid.vm_slot);
buf_leave_temp_page_nolock(session, OG_TRUE);
knl_temp_cache->rows--;
}
void temp_heap_undo_batch_insert(knl_session_t *session, undo_row_t *ud_row, undo_page_t *ud_page, int32 ud_slot)
{
rowid_t rid = ud_row->rowid;
if (DB_IS_BG_ROLLBACK_SE(session)) {
return;
}
temp_heap_undo_binsert_t *batch_undo = (temp_heap_undo_binsert_t *)ud_row->data;
knl_temp_cache_t *knl_temp_cache = knl_get_temp_cache(session, batch_undo->obj_info.uid,
batch_undo->obj_info.table_id);
if (knl_temp_cache == NULL || knl_temp_cache->seg_scn != batch_undo->obj_info.seg_scn) {
return;
}
if (temp_undo_enter_page(session, (uint32)rid.vmid) != OG_SUCCESS) {
knl_panic_log(0, "temp heap undo batch insert enter vmid %u failed.", (uint32)rid.vmid);
return;
}
vm_page_t *vm_page = buf_curr_temp_page(session);
temp_heap_page_t *page = (temp_heap_page_t *)vm_page->data;
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP, "invalid temp page type %u", (uint32)page->head.type);
uint16 first_free_dir = page->first_free_dir;
uint16 begin_slot = batch_undo->begin_slot;
uint16 end_slot = batch_undo->count + begin_slot;
for (uint32 slot = end_slot - 1; slot >= begin_slot; slot--) {
temp_heap_undo_insert_row(session, ud_row, ud_page, ud_slot, slot);
if (slot == 0) {
break;
}
}
page->first_free_dir = first_free_dir;
temp_heap_batch_free_dirs(session, page, begin_slot, end_slot);
buf_leave_temp_page_nolock(session, OG_TRUE);
}
status_t temp_heap_delete(knl_session_t *session, knl_cursor_t *cursor)
{
vm_page_t *vm_page = NULL;
temp_heap_page_t *page = NULL;
row_head_t *row = NULL;
temp_row_dir_t *dir = NULL;
undo_data_t undo = { 0 };
errno_t ret;
dc_entity_t *entity = NULL;
bool32 need_redo = IS_TEMPTABLE_HAS_REDO(session);
if (!DB_IS_PRIMARY(&session->kernel->db)) {
need_redo = OG_FALSE;
}
undo_page_info_t *undo_page_info = UNDO_GET_PAGE_INFO(session, need_redo);
knl_panic_log(cursor->is_valid, "current cursor is invalid, panic info: page %u-%u type %u table %s",
cursor->rowid.file, cursor->rowid.page, ((page_head_t *)cursor->page_buf)->type,
((table_t *)cursor->table)->desc.name);
if (cursor->xid != session->rm->xid.value) {
cursor->xid = session->rm->xid.value;
}
entity = (dc_entity_t *)cursor->dc_entity;
if (entity->contain_lob) {
if (OG_SUCCESS != lob_delete(session, cursor)) {
return OG_ERROR;
}
}
undo.size = cursor->row->size + sizeof(temp_heap_extra_undo_t);
if (DB_IS_PRIMARY(&session->kernel->db) && OG_SUCCESS != undo_prepare(session, undo.size, need_redo, OG_FALSE)) {
return OG_ERROR;
}
session->rm->temp_has_undo = OG_TRUE;
undo.data = (char *)cm_push(session->stack, undo.size);
ret = memcpy_sp(undo.data, undo.size, cursor->row, cursor->row->size);
knl_securec_check(ret);
temp_heap_extra_undo_t *extra_undo =
(temp_heap_extra_undo_t *)(undo.data + undo.size - sizeof(temp_heap_extra_undo_t));
extra_undo->uid = CURSOR_TEMP_CACHE(cursor)->user_id;
extra_undo->table_id = CURSOR_TEMP_CACHE(cursor)->table_id;
extra_undo->seg_scn = CURSOR_TEMP_CACHE(cursor)->seg_scn;
undo.type = UNDO_TEMP_HEAP_DELETE;
undo.rowid = cursor->rowid;
if (buf_enter_temp_page_nolock(session, (uint32)cursor->rowid.vmid) != OG_SUCCESS) {
cm_pop(session->stack);
OG_LOG_RUN_ERR("Fail to open vm page (%u) in heap delete.", cursor->rowid.vmid);
return OG_ERROR;
}
vm_page = buf_curr_temp_page(session);
page = (temp_heap_page_t *)vm_page->data;
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP,
"page type is abnormal, panic info: page %u-%u type %u table %s", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type, ((table_t *)cursor->table)->desc.name);
dir = temp_heap_get_dir(page, (uint32)cursor->rowid.vm_slot);
undo.snapshot.scn = dir->scn;
undo.snapshot.is_owscn = 0;
undo.snapshot.undo_page = dir->undo_page;
undo.snapshot.undo_slot = dir->undo_slot;
undo.snapshot.is_xfirst = cursor->is_xfirst;
dir->undo_page = undo_page_info->undo_rid.page_id;
dir->undo_slot = undo_page_info->undo_rid.slot;
dir->scn = cursor->ssn;
dir->is_owscn = 0;
row = (row_head_t *)TEMP_HEAP_GET_ROW(page, dir);
if (DB_IS_PRIMARY(&session->kernel->db)) {
log_atomic_op_begin(session);
undo_write(session, &undo, need_redo, OG_FALSE);
log_atomic_op_end(session);
}
cm_pop(session->stack);
knl_panic_log(!row->is_deleted, "row is deleted, panic info: page %u-%u type %u table %s",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name);
row->is_deleted = 1;
row->is_changed = 1;
page->rows--;
if (row->is_link) {
page->free_size += HEAP_MIN_ROW_SIZE;
knl_panic_log(row->is_migr == 0, "row is migr, panic info: page %u-%u type %u table %s",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name);
} else {
page->free_size += row->size;
}
buf_leave_temp_page_nolock(session, OG_TRUE);
if (cursor->link_rid.vmid != OG_INVALID_ID32) {
if (buf_enter_temp_page_nolock(session, (uint32)cursor->link_rid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open link_rid vm page (%u) in heap delete.", cursor->link_rid.vmid);
return OG_ERROR;
}
vm_page = buf_curr_temp_page(session);
page = (temp_heap_page_t *)vm_page->data;
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP,
"page type is abnormal, panic info: page %u-%u type %u table %s", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type, ((table_t *)cursor->table)->desc.name);
dir = temp_heap_get_dir(page, (uint32)cursor->link_rid.vm_slot);
row = TEMP_HEAP_GET_ROW(page, dir);
knl_panic_log(!row->is_deleted, "row is deleted, panic info: page %u-%u type %u table %s",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
((table_t *)cursor->table)->desc.name);
row->is_deleted = 1;
row->is_changed = 1;
page->free_size += row->size;
page->rows--;
buf_leave_temp_page_nolock(session, OG_TRUE);
}
CURSOR_TEMP_CACHE(cursor)->rows--;
return OG_SUCCESS;
}
void temp_heap_undo_delete(knl_session_t *session, undo_row_t *ud_row, undo_page_t *ud_page, int32 ud_slot)
{
temp_heap_page_t *page = NULL;
temp_row_dir_t *dir = NULL;
row_head_t *row = NULL;
rowid_t rid = ud_row->rowid;
rowid_t link_rid;
errno_t ret;
if (DB_IS_BG_ROLLBACK_SE(session)) {
return;
}
temp_heap_extra_undo_t *extra_undo =
(temp_heap_extra_undo_t *)(ud_row->data + ud_row->data_size - sizeof(temp_heap_extra_undo_t));
knl_temp_cache_t *knl_temp_cache = knl_get_temp_cache(session, extra_undo->uid, extra_undo->table_id);
if (knl_temp_cache == NULL || knl_temp_cache->seg_scn != extra_undo->seg_scn) {
return;
}
link_rid.vmid = OG_INVALID_ID32;
if (temp_undo_enter_page(session, (uint32)rid.vmid) != OG_SUCCESS) {
knl_panic_log(0, "temp heap undo delete enter vmid %u failed.", (uint32)rid.vmid);
return;
}
page = TEMP_HEAP_CURR_PAGE(session);
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP, "page type is abnormal, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
dir = temp_heap_get_dir(page, (uint32)rid.vm_slot);
knl_panic_log(IS_SAME_PAGID(dir->undo_page, AS_PAGID(ud_page->head.id)),
"dir's undo_page and ud_page are not same, panic info: page %u-%u type %u, ud_page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type,
AS_PAGID(ud_page->head.id).file, AS_PAGID(ud_page->head.id).page, ud_page->head.type);
knl_panic_log(dir->undo_slot == ud_slot,
"dir's undo_slot is not equal to ud_slot, panic info: page %u-%u type %u dir undo_slot %u ud_slot %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, dir->undo_slot, ud_slot);
row = TEMP_HEAP_GET_ROW(page, dir);
knl_panic_log(ROW_ITL_ID(row) == OG_INVALID_ID8, "row itl is valid, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
knl_panic_log(row->is_deleted == 1, "row is not deleted, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
dir->is_owscn = 0;
dir->undo_page = ud_row->prev_page;
dir->undo_slot = ud_row->prev_slot;
dir->scn = ud_row->scn;
row->is_deleted = 0;
page->rows++;
if (row->is_link) {
ret = memcpy_sp(&link_rid, sizeof(rowid_t), (char *)row + sizeof(row_head_t), sizeof(rowid_t));
knl_securec_check(ret);
page->free_size -= HEAP_MIN_ROW_SIZE;
} else {
page->free_size -= row->size;
}
buf_leave_temp_page_nolock(session, OG_TRUE);
if (link_rid.vmid != OG_INVALID_ID32) {
if (temp_undo_enter_page(session, (uint32)link_rid.vmid) != OG_SUCCESS) {
knl_panic_log(0, "temp heap undo delete enter link vmid %u failed.", (uint32)link_rid.vmid);
return;
}
page = TEMP_HEAP_CURR_PAGE(session);
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP, "page type is abnormal, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
dir = temp_heap_get_dir(page, (uint32)link_rid.vm_slot);
row = TEMP_HEAP_GET_ROW(page, dir);
row->is_deleted = 0;
page->rows++;
page->free_size -= row->size;
buf_leave_temp_page_nolock(session, OG_TRUE);
}
knl_temp_cache->rows++;
}
static void temp_heap_insert_into_page_migr(knl_session_t *session, temp_heap_page_t *page, row_head_t *row,
uint16 *slot)
{
temp_row_dir_t *dir = NULL;
char *row_addr = NULL;
errno_t ret;
*slot = page->dirs;
page->dirs++;
dir = temp_heap_get_dir(page, *slot);
page->free_end -= sizeof(temp_row_dir_t);
page->free_size -= sizeof(temp_row_dir_t);
dir->scn = 0;
dir->is_owscn = 0;
dir->undo_page = INVALID_UNDO_PAGID;
dir->undo_slot = INVALID_SLOT;
dir->offset = page->free_begin;
page->free_begin += row->size;
page->free_size -= row->size;
row->is_migr = 1;
ROW_SET_ITL_ID(row, OG_INVALID_ID8);
row_addr = (char *)page + dir->offset;
ret = memcpy_sp(row_addr, page->free_size, row, row->size);
knl_securec_check(ret);
page->rows++;
knl_panic_log(page->free_begin <= page->free_end,
"page's free size begin is more than end, panic info: page %u-%u type %u free_begin %u free_end %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_begin,
page->free_end);
}
static void temp_heap_reorganize_with_update(row_head_t *row, uint16 *offsets, uint16 *lens, knl_update_info_t *info,
row_assist_t *new_ra)
{
uint32 i;
uint32 uid;
uint8 bits;
uid = 0;
for (i = 0; i < ROW_COLUMN_COUNT(new_ra->head); i++) {
if (uid < info->count && i == info->columns[uid]) {
bits = row_get_column_bits2((row_head_t *)info->data, uid);
row_put_column_data(new_ra, bits, info->data + info->offsets[uid], info->lens[uid]);
uid++;
} else if (i < ROW_COLUMN_COUNT(row)) {
bits = row_get_column_bits2(row, i);
row_put_column_data(new_ra, bits, (char *)row + offsets[i], lens[i]);
} else {
row_put_null(new_ra);
}
}
}
static status_t temp_heap_update_migr(knl_session_t *session, knl_cursor_t *cursor, heap_update_assist_t *ua,
undo_data_t *undo)
{
vm_page_t *owner_vm_page = NULL;
vm_page_t *migr_vm_page = NULL;
vm_page_t *old_migr_vm_page = NULL;
temp_heap_page_t *owner_page = NULL;
temp_heap_page_t *migr_page = NULL;
temp_heap_page_t *old_migr_page = NULL;
temp_row_dir_t *owner_dir = NULL;
temp_row_dir_t *old_migr_dir = NULL;
row_head_t *owner_row = NULL;
row_head_t *old_migr_row = NULL;
rowid_t old_link_rid;
uint16 cost_size;
uint16 vm_slot;
uint32 vmid;
char *buf = NULL;
row_assist_t ra;
errno_t ret;
old_link_rid = cursor->link_rid;
knl_panic_log(ua->inc_size > 0,
"row increased size is abnormal, panic info: page %u-%u type %u table %s "
"inc_size %u",
cursor->rowid.file, cursor->rowid.page, ((page_head_t *)cursor->page_buf)->type,
((table_t *)cursor->table)->desc.name, ua->inc_size);
cost_size = (uint16)(ua->new_size + sizeof(temp_row_dir_t));
buf = (char *)cm_push(session->stack, ua->new_size);
temp_heap_init_row(session, &ra, buf, ua->new_cols, 0);
temp_heap_reorganize_with_update(cursor->row, cursor->offsets, cursor->lens, ua->info, &ra);
knl_panic_log(ra.head->size == ua->new_size,
"ra's size and ua's new_size are not equal, panic info: "
"page %u-%u type %u table %s ra size %u new_size %u",
cursor->rowid.file, cursor->rowid.page, ((page_head_t *)cursor->page_buf)->type,
((table_t *)cursor->table)->desc.name, ra.head->size, ua->new_size);
if (temp_enter_heap_insert(session, cursor, cost_size, &vmid) != OG_SUCCESS) {
cm_pop(session->stack);
return OG_ERROR;
}
cursor->link_rid.vmid = vmid;
migr_vm_page = buf_curr_temp_page(session);
migr_page = (temp_heap_page_t *)migr_vm_page->data;
knl_panic_log(migr_page->head.type == PAGE_TYPE_TEMP_HEAP,
"migr_page type is abnormal, panic info: migr_page %u-%u type %u", AS_PAGID(migr_page->head.id).file,
AS_PAGID(migr_page->head.id).page, migr_page->head.type);
knl_panic_log(migr_page->free_end - migr_page->free_begin >= cost_size,
"migr_page's free size is abnormal, "
"panic info: migr_page %u-%u type %u free_end %u free_begin %u cost_size %u",
AS_PAGID(migr_page->head.id).file, AS_PAGID(migr_page->head.id).page, migr_page->head.type,
migr_page->free_end, migr_page->free_begin, cost_size);
temp_heap_insert_into_page_migr(session, migr_page, ra.head, &vm_slot);
cursor->link_rid.vm_slot = vm_slot;
buf_leave_temp_page_nolock(session, OG_TRUE);
if (buf_enter_temp_page_nolock(session, (uint32)cursor->rowid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open owner vm page (%u) in heap update migr.", cursor->rowid.vmid);
cm_pop(session->stack);
return OG_ERROR;
}
owner_vm_page = buf_curr_temp_page(session);
owner_page = (temp_heap_page_t *)owner_vm_page->data;
knl_panic_log(owner_page->head.type == PAGE_TYPE_TEMP_HEAP,
"owner_page type is abnormal, panic info: owner_page %u-%u type %u",
AS_PAGID(owner_page->head.id).file, AS_PAGID(owner_page->head.id).page, owner_page->head.type);
owner_dir = temp_heap_get_dir(owner_page, (uint32)cursor->rowid.vm_slot);
owner_row = TEMP_HEAP_GET_ROW(owner_page, owner_dir);
if (!owner_row->is_link) {
owner_row->is_link = 1;
owner_page->free_size += owner_row->size - HEAP_MIN_ROW_SIZE;
}
ret = memcpy_sp((char *)owner_row + sizeof(row_head_t), sizeof(rowid_t), &cursor->link_rid, sizeof(rowid_t));
knl_securec_check(ret);
buf_leave_temp_page_nolock(session, OG_TRUE);
if (old_link_rid.vmid != OG_INVALID_ID32) {
if (buf_enter_temp_page_nolock(session, (uint32)old_link_rid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open old_link vm page (%d) in heap update migr.", old_link_rid.vmid);
cm_pop(session->stack);
return OG_ERROR;
}
old_migr_vm_page = buf_curr_temp_page(session);
old_migr_page = (temp_heap_page_t *)old_migr_vm_page->data;
knl_panic_log(old_migr_page->head.type == PAGE_TYPE_TEMP_HEAP,
"old_migr_page type is abnormal, panic info: old_migr_page %u-%u type %u",
AS_PAGID(old_migr_page->head.id).file, AS_PAGID(old_migr_page->head.id).page,
old_migr_page->head.type);
old_migr_dir = temp_heap_get_dir(old_migr_page, (uint32)old_link_rid.vm_slot);
old_migr_row = TEMP_HEAP_GET_ROW(old_migr_page, old_migr_dir);
old_migr_row->is_deleted = 1;
old_migr_dir->is_free = 1;
old_migr_page->rows--;
old_migr_dir->next_slot = old_migr_page->first_free_dir;
old_migr_page->first_free_dir = (uint16)old_link_rid.vm_slot;
old_migr_page->free_size += old_migr_row->size;
buf_leave_temp_page_nolock(session, OG_TRUE);
}
cm_pop(session->stack);
return OG_SUCCESS;
}
static void temp_heap_generate_undo_for_update(knl_session_t *session, knl_cursor_t *cursor, temp_heap_page_t *page,
undo_data_t *undo)
{
temp_row_dir_t *dir = NULL;
row_head_t *row = NULL;
bool32 need_redo = IS_TEMPTABLE_HAS_REDO(session);
undo_page_info_t *undo_page_info = UNDO_GET_PAGE_INFO(session, need_redo);
knl_panic_log(undo->size <= TEMP_CT_MAX_ROW_SIZE,
"undo size is more than the max limit, panic info: "
"page %u-%u type %u table %s undo size %u",
cursor->rowid.file, cursor->rowid.page, ((page_head_t *)cursor->page_buf)->type,
((table_t *)cursor->table)->desc.name, undo->size);
dir = temp_heap_get_dir(page, (uint32)cursor->rowid.vm_slot);
row = TEMP_HEAP_GET_ROW(page, dir);
undo->snapshot.scn = dir->scn;
undo->snapshot.is_owscn = 0;
undo->snapshot.undo_page = dir->undo_page;
undo->snapshot.undo_slot = dir->undo_slot;
undo->snapshot.is_xfirst = cursor->is_xfirst;
dir->undo_page = undo_page_info->undo_rid.page_id;
dir->undo_slot = undo_page_info->undo_rid.slot;
dir->scn = cursor->ssn;
dir->is_owscn = 0;
row->is_changed = 1;
temp_heap_extra_undo_t *extra_undo =
(temp_heap_extra_undo_t *)(undo->data + undo->size - sizeof(temp_heap_extra_undo_t));
extra_undo->uid = CURSOR_TEMP_CACHE(cursor)->user_id;
extra_undo->table_id = CURSOR_TEMP_CACHE(cursor)->table_id;
extra_undo->seg_scn = CURSOR_TEMP_CACHE(cursor)->seg_scn;
if (DB_IS_PRIMARY(&session->kernel->db)) {
log_atomic_op_begin(session);
undo_write(session, undo, need_redo, OG_FALSE);
log_atomic_op_end(session);
}
}
static status_t temp_heap_update_link_row(knl_session_t *session, knl_cursor_t *cursor, heap_update_assist_t *ua,
undo_data_t *undo)
{
vm_page_t *migr_vm_page = NULL;
temp_heap_page_t *migr_page = NULL;
temp_heap_page_t *page = NULL;
temp_row_dir_t *migr_dir = NULL;
row_head_t *migr_row = NULL;
if (buf_enter_temp_page_nolock(session, (uint32)cursor->rowid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open vm page (%u) in heap update link row.", cursor->rowid.vmid);
return OG_ERROR;
}
page = TEMP_HEAP_CURR_PAGE(session);
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP,
"page type is abnormal, panic info: page %u-%u type %u table %s", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type, ((table_t *)cursor->table)->desc.name);
temp_heap_generate_undo_for_update(session, cursor, page, undo);
buf_leave_temp_page_nolock(session, OG_TRUE);
if (buf_enter_temp_page_nolock(session, (uint32)cursor->link_rid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open link_rid vm page (%u) in heap update link row.", cursor->link_rid.vmid);
return OG_ERROR;
}
migr_vm_page = buf_curr_temp_page(session);
migr_page = (temp_heap_page_t *)migr_vm_page->data;
migr_dir = temp_heap_get_dir(migr_page, (uint32)cursor->link_rid.vm_slot);
migr_row = TEMP_HEAP_GET_ROW(migr_page, migr_dir);
if (ua->mode == UPDATE_INPLACE) {
heap_update_inplace(session, cursor->offsets, cursor->lens, ua->info, migr_row);
buf_leave_temp_page_nolock(session, OG_TRUE);
return OG_SUCCESS;
}
ua->inc_size -= (int32)(migr_row->size - cursor->data_size);
if (ua->inc_size > 0 && ua->new_size > migr_page->free_end - migr_page->free_begin) {
buf_leave_temp_page_nolock(session, OG_FALSE);
return temp_heap_update_migr(session, cursor, ua, undo);
}
temp_heap_update_inpage(session, cursor->row, cursor->offsets, cursor->lens, ua, migr_page,
(uint16)cursor->link_rid.vm_slot);
buf_leave_temp_page_nolock(session, OG_TRUE);
return OG_SUCCESS;
}
status_t temp_heap_update(knl_session_t *session, knl_cursor_t *cursor)
{
heap_update_assist_t ua;
row_head_t *row = NULL;
temp_row_dir_t *dir = NULL;
dc_entity_t *entity = NULL;
vm_page_t *vm_page = NULL;
temp_heap_page_t *page = NULL;
undo_data_t undo = { 0 };
status_t status;
errno_t ret;
knl_panic_log(cursor->is_valid, "current cursor is invalid, panic info: page %u-%u type %u table %s",
cursor->rowid.file, cursor->rowid.page, ((page_head_t *)cursor->page_buf)->type,
((table_t *)cursor->table)->desc.name);
if (cursor->xid != session->rm->xid.value) {
cursor->xid = session->rm->xid.value;
}
cursor->part_loc.part_no = OG_INVALID_ID32;
entity = (dc_entity_t *)cursor->dc_entity;
ua.old_cols = ROW_COLUMN_COUNT(cursor->row);
ua.new_cols = entity->column_count;
ROWID_COPY(ua.rowid, cursor->rowid);
ua.info = &cursor->update_info;
heap_update_prepare(session, cursor->row, cursor->offsets, cursor->lens, cursor->data_size, &ua);
if (entity->contain_lob) {
if (lob_update(session, cursor, &ua) != OG_SUCCESS) {
return OG_ERROR;
}
}
if (ua.new_size > TEMP_CT_MAX_ROW_SIZE) {
OG_THROW_ERROR(ERR_RECORD_SIZE_OVERFLOW, "update row", ua.new_size, TEMP_CT_MAX_ROW_SIZE);
return OG_ERROR;
}
undo.data = (char *)cm_push(session->stack, TEMP_CT_MAX_ROW_SIZE);
undo.rowid = cursor->rowid;
if (ua.undo_size >= cursor->row->size + 2 * sizeof(uint32)) {
undo.type = UNDO_TEMP_HEAP_UPDATE_FULL;
ret = memcpy_sp(undo.data, TEMP_CT_MAX_ROW_SIZE, cursor->row, cursor->row->size);
knl_securec_check(ret);
undo.size = cursor->row->size;
} else {
undo.type = UNDO_TEMP_HEAP_UPDATE;
heap_get_update_undo_data(session, &ua, &undo, TEMP_CT_MAX_ROW_SIZE);
}
undo.size += sizeof(temp_heap_extra_undo_t);
if (DB_IS_PRIMARY(&session->kernel->db) &&
undo_prepare(session, undo.size, IS_TEMPTABLE_HAS_REDO(session), OG_FALSE) != OG_SUCCESS) {
cm_pop(session->stack);
return OG_ERROR;
}
session->rm->temp_has_undo = OG_TRUE;
if (cursor->link_rid.vmid != OG_INVALID_ID32) {
status = temp_heap_update_link_row(session, cursor, &ua, &undo);
cm_pop(session->stack);
return status;
}
if (buf_enter_temp_page_nolock(session, (uint32)cursor->rowid.vmid) != OG_SUCCESS) {
OG_LOG_RUN_ERR("Fail to open vm page (%u) in heap update.", cursor->rowid.vmid);
cm_pop(session->stack);
return OG_ERROR;
}
vm_page = buf_curr_temp_page(session);
page = (temp_heap_page_t *)vm_page->data;
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP, "page type is abnormal, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
dir = temp_heap_get_dir(page, (uint32)cursor->rowid.vm_slot);
row = (row_head_t *)TEMP_HEAP_GET_ROW(page, dir);
knl_panic_log(ROW_ITL_ID(row) == OG_INVALID_ID8, "row's itl is valid, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
temp_heap_generate_undo_for_update(session, cursor, page, &undo);
if (ua.mode == UPDATE_INPLACE) {
heap_update_inplace(session, cursor->offsets, cursor->lens, ua.info, row);
buf_leave_temp_page_nolock(session, OG_TRUE);
cm_pop(session->stack);
return OG_SUCCESS;
}
ua.inc_size -= (int32)(row->size - cursor->data_size);
if (ua.inc_size > 0 && ua.new_size > page->free_end - page->free_begin) {
buf_leave_temp_page_nolock(session, OG_FALSE);
status = temp_heap_update_migr(session, cursor, &ua, &undo);
cm_pop(session->stack);
return status;
}
temp_heap_update_inpage(session, cursor->row, cursor->offsets, cursor->lens, &ua, page,
(uint16)cursor->rowid.vm_slot);
buf_leave_temp_page_nolock(session, OG_TRUE);
cm_pop(session->stack);
return OG_SUCCESS;
}
static inline void temp_cm_decode_row(char *ptr, uint16 *offsets, uint16 *lens, uint16 *size)
{
row_head_t *row;
uint16 old_flag;
row = (row_head_t *)ptr;
old_flag = row->is_migr;
row->is_migr = 0;
cm_decode_row(ptr, offsets, lens, size);
row->is_migr = old_flag;
}
static void temp_heap_revert_update(knl_session_t *session, undo_row_t *undo_row, row_head_t *row, uint16 *offsets,
uint16 *lens)
{
row_assist_t new_ra;
knl_update_info_t info;
heap_undo_update_info_t *undo_info = NULL;
uint16 row_size;
char *buf = NULL;
errno_t err;
CM_SAVE_STACK(session->stack);
CM_PUSH_UPDATE_INFO(session, info);
buf = (char *)cm_push(session->stack, TEMP_CT_MAX_ROW_SIZE);
undo_info = (heap_undo_update_info_t *)cm_push(session->stack, TEMP_CT_MAX_ROW_SIZE);
#ifdef LOG_DIAG
err = memset_sp(buf, TEMP_CT_MAX_ROW_SIZE, 0, TEMP_CT_MAX_ROW_SIZE);
knl_securec_check(err);
#endif
err = memcpy_sp(undo_info, TEMP_CT_MAX_ROW_SIZE, undo_row->data, undo_row->data_size);
knl_securec_check(err);
row_size = row->size;
info.count = undo_info->count;
err = memcpy_sp(info.columns, session->kernel->attr.max_column_count, undo_info->columns,
info.count * sizeof(uint16));
knl_securec_check(err);
info.data = (char *)undo_info + HEAP_UNDO_UPDATE_INFO_SIZE(info.count);
temp_cm_decode_row(info.data, info.offsets, info.lens, NULL);
temp_cm_decode_row((char *)row, offsets, lens, NULL);
temp_heap_init_row(session, &new_ra, buf, undo_info->old_cols, row->flags);
temp_heap_reorganize_with_update(row, offsets, lens, &info, &new_ra);
knl_panic_log(new_ra.head->size <= row_size,
"the new_ra's size is more than row_size, panic info: new_ra size %u row_size %u", new_ra.head->size,
row_size);
err = memcpy_sp(row, row_size, new_ra.buf, new_ra.head->size);
knl_securec_check(err);
row->size = row_size;
CM_RESTORE_STACK(session->stack);
}
static void temp_heap_undo_link_row(knl_session_t *session, undo_row_t *ud_row, rowid_t *rid, uint16 ud_row_size)
{
temp_heap_page_t *page = NULL;
temp_row_dir_t *dir = NULL;
row_head_t *row = NULL;
knl_update_info_t old_info;
errno_t ret;
if (temp_undo_enter_page(session, (uint32)rid->vmid) != OG_SUCCESS) {
knl_panic_log(0, "temp heap undo link row enter vmid %u failed.", (uint32)rid->vmid);
return;
}
page = TEMP_HEAP_CURR_PAGE(session);
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP, "page type is abnormal, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
dir = temp_heap_get_dir(page, (uint32)rid->vm_slot);
row = TEMP_HEAP_GET_ROW(page, dir);
if (ud_row->type == UNDO_TEMP_HEAP_UPDATE_FULL) {
uint16 row_size = row->size;
uint16 flags = row->flags;
if (ud_row_size != 0) {
ret = memcpy_sp(row, row_size, ud_row->data, (size_t)ud_row_size);
knl_securec_check(ret);
}
knl_panic_log(row->size <= row_size,
"row size is abnormal, panic info: page %u-%u type %u row's size %u row_size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, row->size, row_size);
row->flags = flags;
ROW_SET_ITL_ID(row, OG_INVALID_ID8);
row->size = row_size;
} else {
CM_SAVE_STACK(session->stack);
CM_PUSH_UPDATE_INFO(session, old_info);
temp_heap_revert_update(session, ud_row, row, old_info.offsets, old_info.lens);
CM_RESTORE_STACK(session->stack);
}
buf_leave_temp_page_nolock(session, OG_TRUE);
}
void temp_heap_undo_update(knl_session_t *session, undo_row_t *ud_row, undo_page_t *ud_page, int32 ud_slot)
{
rowid_t rid = ud_row->rowid;
temp_heap_page_t *page = NULL;
temp_row_dir_t *dir = NULL;
row_head_t *row = NULL;
knl_update_info_t old_info;
rowid_t link_rid;
errno_t ret;
uint16 ud_row_size;
if (DB_IS_BG_ROLLBACK_SE(session)) {
return;
}
temp_heap_extra_undo_t *extra_undo =
(temp_heap_extra_undo_t *)(ud_row->data + ud_row->data_size - sizeof(temp_heap_extra_undo_t));
knl_temp_cache_t *knl_temp_cache = knl_get_temp_cache(session, extra_undo->uid, extra_undo->table_id);
ud_row_size = ud_row->data_size - sizeof(temp_heap_extra_undo_t);
if (knl_temp_cache == NULL || knl_temp_cache->seg_scn != extra_undo->seg_scn) {
return;
}
if (temp_undo_enter_page(session, (uint32)rid.vmid) != OG_SUCCESS) {
knl_panic_log(0, "temp heap undo update enter vmid %u failed.", (uint32)rid.vmid);
return;
}
page = TEMP_HEAP_CURR_PAGE(session);
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP, "page type is abnormal, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
dir = temp_heap_get_dir(page, (uint32)rid.vm_slot);
knl_panic_log(IS_SAME_PAGID(dir->undo_page, AS_PAGID(ud_page->head.id)),
"dir's undo_page and ud_page are not same, panic info: page %u-%u type %u",
AS_PAGID(ud_page->head.id).file, AS_PAGID(ud_page->head.id).page, ud_page->head.type);
knl_panic_log(dir->undo_slot == ud_slot,
"the undo_slot is abnormal, panic info: page %u-%u type %u dir's undo_slot %u ud_slot %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, dir->undo_slot, ud_slot);
row = TEMP_HEAP_GET_ROW(page, dir);
knl_panic_log(ROW_ITL_ID(row) == OG_INVALID_ID8, "row's itl is valid, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
knl_panic_log(!dir->is_free, " the dir is free, panic info: page %u-%u type %u", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type);
dir->is_owscn = 0;
dir->undo_page = ud_row->prev_page;
dir->undo_slot = ud_row->prev_slot;
dir->scn = ud_row->scn;
if (row->is_link) {
ret = memcpy_sp(&link_rid, sizeof(rowid_t), (char *)row + sizeof(row_head_t), sizeof(rowid_t));
knl_securec_check(ret);
buf_leave_temp_page_nolock(session, OG_FALSE);
temp_heap_undo_link_row(session, ud_row, &link_rid, ud_row_size);
return;
}
if (ud_row->type == UNDO_TEMP_HEAP_UPDATE_FULL) {
uint16 row_size = row->size;
uint16 flags = row->flags;
ret = memcpy_sp(row, TEMP_CT_MAX_ROW_SIZE, ud_row->data, ud_row_size);
knl_securec_check(ret);
knl_panic_log(row->size <= row_size,
"row size is abnormal, panic info: page %u-%u type %u row's size %u "
"row_size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, row->size, row_size);
row->flags = flags;
ROW_SET_ITL_ID(row, OG_INVALID_ID8);
row->size = row_size;
} else {
CM_SAVE_STACK(session->stack);
CM_PUSH_UPDATE_INFO(session, old_info);
temp_heap_revert_update(session, ud_row, row, old_info.offsets, old_info.lens);
CM_RESTORE_STACK(session->stack);
}
buf_leave_temp_page_nolock(session, OG_TRUE);
}
void temp_heap_update_inpage(knl_session_t *session, row_head_t *ori_row, uint16 *offsets, uint16 *lens,
heap_update_assist_t *ua, temp_heap_page_t *page, uint16 slot)
{
row_assist_t ra;
uint16 flags;
uint16 old_size;
row_head_t *row = NULL;
temp_row_dir_t *dir = NULL;
knl_panic_log(page->head.type == PAGE_TYPE_TEMP_HEAP, "the page type is abnormal, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
dir = temp_heap_get_dir(page, slot);
row = TEMP_HEAP_GET_ROW(page, dir);
flags = row->flags;
old_size = row->size;
knl_panic_log(ROW_ITL_ID(row) == OG_INVALID_ID8, "row's itl is valid, panic info: page %u-%u type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type);
if (ua->inc_size > 0) {
knl_panic_log(page->free_end - page->free_begin >= ua->new_size,
"page's free size is abnormal, panic info: "
"page %u-%u type %u free_end %u free_begin %u ua's new_size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_end,
page->free_begin, ua->new_size);
dir->offset = page->free_begin;
page->free_begin += ua->new_size;
page->free_size -= ua->inc_size;
knl_panic_log(dir->offset < page->free_begin,
"the dir's offset is more than the page's free_begin, "
"panic info: page %u-%u type %u dir offset %u free_begin %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, dir->offset,
page->free_begin);
knl_panic_log(page->free_begin <= page->free_end,
"the page's free size begin is more than end, panic info: "
"page %u-%u type %u free_begin %u free_end %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_begin,
page->free_end);
row = TEMP_HEAP_GET_ROW(page, dir);
temp_heap_init_row(session, &ra, (char *)row, ua->new_cols, flags);
temp_heap_reorganize_with_update(ori_row, offsets, lens, ua->info, &ra);
knl_panic_log(ra.head->size > old_size,
"current row_size is bigger than old_size when row increased size is "
"bigger than 0, panic info: page %u-%u type %u r's size %u old_size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, ra.head->size,
old_size);
} else {
temp_heap_init_row(session, &ra, (char *)row, ua->new_cols, flags);
temp_heap_reorganize_with_update(ori_row, offsets, lens, ua->info, &ra);
knl_panic_log(ra.head->size <= old_size,
"current row_size is bigger than old_size when row increased size "
"is not bigger than 0, panic info: page %u-%u type %u ra's size %u old_size %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, ra.head->size,
old_size);
row->size = old_size;
knl_panic_log(dir->offset < page->free_begin,
"the dir's offset is more than the page's free_begin, "
"panic info: page %u-%u type %u dir offset %u free_begin %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, dir->offset,
page->free_begin);
knl_panic_log(page->free_begin <= page->free_end,
"the page's free size begin is more than end, panic info: "
"page %u-%u type %u free_begin %u free_end %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, page->free_begin,
page->free_end);
}
}
#ifdef __cplusplus
}
#endif