* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* knl_tran.c
*
*
* IDENTIFICATION
* src/kernel/xact/knl_tran.c
*
* -------------------------------------------------------------------------
*/
#include "knl_xact_module.h"
#include "knl_tran.h"
#include "knl_lob.h"
#include "rcr_btree.h"
#include "pcr_btree.h"
#include "knl_context.h"
#include "temp_btree.h"
#include "pcr_heap.h"
#include "pcr_heap_undo.h"
#include "knl_table.h"
#include "knl_xa.h"
#include "index_common.h"
#include "dtc_dls.h"
#include "dtc_tran.h"
#include "dtc_dc.h"
#include "dtc_drc.h"
#include "dtc_context.h"
#include "oGRAC_fdsa.h"
pcr_itl_t g_init_pcr_itl = { .scn = 0, .xid.value = 0, .undo_page.value = 0, .undo_slot = 0, .flags = 0 };
static inline void tx_reset_rm(knl_session_t *session, knl_rm_t *rm)
{
lock_reset(rm);
lob_items_reset(rm);
rm->tx_id.value = OG_INVALID_ID64;
rm->txn = NULL;
rm->xid.value = OG_INVALID_ID64;
rm->svpt_count = 0;
rm->ssn = 0;
rm->begin_lsn = OG_INVALID_ID64;
rm->temp_has_undo = OG_FALSE;
rm->noredo_undo_pages.count = 0;
rm->noredo_undo_pages.first = INVALID_UNDO_PAGID;
rm->noredo_undo_pages.last = INVALID_UNDO_PAGID;
if (rm->large_page_id != OG_INVALID_ID32) {
mpool_free_page(session->kernel->attr.large_pool, rm->large_page_id);
rm->large_page_id = OG_INVALID_ID32;
}
}
void knl_tx_reset_rm(knl_handle_t session, void *rm)
{
tx_reset_rm((knl_session_t *)session, (knl_rm_t *)rm);
}
status_t tx_area_init_impl(knl_session_t *session, undo_set_t *undo_set, uint32 lseg_no, uint32 rseg_no,
bool32 is_extend)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = NULL;
tx_item_t *item = NULL;
uint32 txn_no;
uint32 page_no;
uint32 seg_no;
uint32 id;
if (is_extend && ogx->extend_cnt == 0) {
ogx->extend_segno = lseg_no;
knl_panic(session->kernel->id == undo_set->inst_id);
}
for (seg_no = lseg_no; seg_no < rseg_no; seg_no++) {
undo = &undo_set->undos[seg_no];
undo->lock = 0;
undo->ow_scn = DB_CURR_SCN(session);
undo->capacity = UNDO_DEF_TXN_PAGE(session) * TXN_PER_PAGE(session);
if (is_extend) {
uint64 buf_size = knl_txn_buffer_size(session->kernel->attr.page_size, 1);
undo->items = (tx_item_t *)malloc((size_t)buf_size);
if (undo->items == NULL) {
OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)buf_size, "extend undo segments");
return OG_ERROR;
}
ogx->extend_cnt++;
} else {
undo->items = (tx_item_t *)(undo_set->tx_buf + seg_no * undo->capacity * sizeof(tx_item_t));
}
undo->free_items.count = 0;
undo->free_items.first = OG_INVALID_ID32;
undo->free_items.last = OG_INVALID_ID32;
id = 0;
for (txn_no = 0; txn_no < TXN_PER_PAGE(session); txn_no++) {
for (page_no = 0; page_no < UNDO_DEF_TXN_PAGE(session); page_no++) {
item = &undo->items[id];
item->xmap.seg_id = seg_no + OG_MAX_UNDO_SEGMENT * undo_set->inst_id;
item->xmap.slot = (uint16)(page_no * TXN_PER_PAGE(session) + txn_no);
item->lock = 0;
item->prev = OG_INVALID_ID32;
item->next = OG_INVALID_ID32;
item->rmid = OG_INVALID_ID16;
item->in_progress = OG_FALSE;
item->systime = KNL_NOW(session);
id++;
}
}
}
return OG_SUCCESS;
}
status_t tx_area_init(knl_session_t *session, uint32 lseg_no, uint32 rseg_no)
{
tx_area_t *area = &session->kernel->tran_ctx;
undo_set_t *undo_set = MY_UNDO_SET(session);
undo_context_t *undo_ctx = &session->kernel->undo_ctx;
area->scn_lock = 0;
area->seri_lock = 0;
area->rollback_num = 0;
undo_set->active_workers = 0;
undo_set->tx_buf = session->kernel->attr.tran_buf;
undo_set->assign_workers = session->kernel->attr.tx_rollback_proc_num;
undo_ctx->active_workers = 0;
return tx_area_init_impl(session, undo_set, lseg_no, rseg_no, OG_FALSE);
}
void tx_extend_deinit(knl_session_t *session)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
undo_t *undo = NULL;
for (uint32 i = ogx->extend_segno; i < ogx->extend_segno + ogx->extend_cnt; i++) {
undo = &ogx->undos[i];
CM_FREE_PTR(undo->items);
}
}
static inline tx_id_t xmap_get_txid(knl_session_t *session, xmap_t xmap)
{
tx_id_t tx_id;
tx_id.seg_id = XMAP_SEG_ID(xmap);
tx_id.item_id = xmap.slot % TXN_PER_PAGE(session) * UNDO_DEF_TXN_PAGE(session) + xmap.slot / TXN_PER_PAGE(session);
return tx_id;
}
tx_id_t tx_xmap_get_txid(knl_session_t *session, xmap_t xmap)
{
return xmap_get_txid(session, xmap);
}
static inline tx_item_t *xmap_get_item(knl_session_t *session, xmap_t xmap)
{
tx_id_t tx_id = xmap_get_txid(session, xmap);
undo_set_t *undo_set = UNDO_SET(session, XMAP_INST_ID(xmap));
undo_t *undo = &undo_set->undos[tx_id.seg_id];
return &undo->items[tx_id.item_id];
}
uint8 xid_get_inst_id(knl_session_t *session, xid_t xid)
{
uint8 inst_id = XID_INST_ID(xid);
if (inst_id == session->kernel->id) {
return inst_id;
}
return drc_get_deposit_id(inst_id);
}
uint8 xmap_get_inst_id(knl_session_t *session, xmap_t xmap)
{
uint8 inst_id = XMAP_INST_ID(xmap);
if (inst_id == session->kernel->id) {
return inst_id;
}
return drc_get_deposit_id(inst_id);
}
static inline void tx_bind_segid(knl_session_t *session, knl_rm_t *rm, uint64 global_segid)
{
uint32 active_undo_segments = UNDO_ACTIVE_SEGMENT_COUNT(session);
uint32 auton_trans_segments = UNDO_AUTON_TRANS_SEGMENT_COUNT(session);
if (!UNDO_IS_AUTON_BIND_OWN(session) || active_undo_segments <= auton_trans_segments) {
rm->undo_segid = (uint32)(global_segid % active_undo_segments);
} else {
rm->undo_segid = (uint32)(global_segid % (active_undo_segments - auton_trans_segments) + auton_trans_segments);
}
rm->tx_id.seg_id = (uint32)(global_segid % (UNDO_SEGMENT_COUNT(session) - auton_trans_segments) +
auton_trans_segments);
}
static inline void tx_bind_auton_segid(knl_session_t *session, knl_rm_t *rm, uint64 global_segid)
{
uint32 active_undo_segments = UNDO_ACTIVE_SEGMENT_COUNT(session);
uint32 auton_trans_segments = UNDO_AUTON_TRANS_SEGMENT_COUNT(session);
if (!UNDO_IS_AUTON_BIND_OWN(session) || active_undo_segments <= auton_trans_segments) {
rm->undo_segid = (uint32)(global_segid % active_undo_segments);
} else {
rm->undo_segid = (uint32)(global_segid % auton_trans_segments);
}
rm->tx_id.seg_id = (uint32)(global_segid % auton_trans_segments);
}
static inline undo_t *tx_bind_undo(knl_session_t *session, knl_rm_t *rm)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
uint64 global_segid;
rm->undo_page_info.undo_rid = g_invalid_undo_rowid;
rm->undo_page_info.undo_fs = 0;
rm->undo_page_info.encrypt_enable = OG_FALSE;
rm->undo_page_info.undo_log_encrypt = OG_FALSE;
rm->noredo_undo_page_info.undo_rid = g_invalid_undo_rowid;
rm->noredo_undo_page_info.undo_fs = 0;
rm->noredo_undo_page_info.encrypt_enable = OG_FALSE;
rm->noredo_undo_page_info.undo_log_encrypt = OG_FALSE;
global_segid = (uint64)cm_atomic_inc(&session->kernel->undo_segid);
if (rm->prev == OG_INVALID_ID16) {
tx_bind_segid(session, rm, global_segid);
} else {
tx_bind_auton_segid(session, rm, global_segid);
}
return &ogx->undos[rm->tx_id.seg_id];
}
static status_t txn_alloc(knl_session_t *session, knl_rm_t *rm)
{
undo_t *undo = tx_bind_undo(session, rm);
cm_spin_lock(&undo->lock, &session->stat->spin_stat.stat_txn_list);
if (undo->free_items.count == 0) {
cm_spin_unlock(&undo->lock);
OG_THROW_ERROR(ERR_TOO_MANY_PENDING_TRANS);
return OG_ERROR;
}
rm->tx_id.item_id = undo->free_items.first;
undo->stat.txn_cnts++;
undo->free_items.count--;
if (undo->free_items.count == 0) {
undo->free_items.first = OG_INVALID_ID32;
undo->free_items.last = OG_INVALID_ID32;
} else {
undo->free_items.first = undo->items[rm->tx_id.item_id].next;
knl_panic(undo->free_items.first != OG_INVALID_ID32);
undo->items[undo->free_items.first].prev = OG_INVALID_ID32;
}
cm_spin_unlock(&undo->lock);
return OG_SUCCESS;
}
static void txn_release(knl_session_t *session, undo_set_t *undo_set, tx_id_t tx_id)
{
CM_ASSERT(tx_id.seg_id < OG_MAX_UNDO_SEGMENT);
undo_t *undo = &undo_set->undos[tx_id.seg_id];
if (tx_id.item_id >= undo->capacity) {
return;
}
knl_temp_cache_t *temp_table_ptr = NULL;
for (uint32 i = 0; i < session->temp_table_count; i++) {
temp_table_ptr = &session->temp_table_cache[i];
if (temp_table_ptr->hold_rmid == session->rmid) {
temp_table_ptr->hold_rmid = OG_INVALID_ID32;
}
}
cm_spin_lock(&undo->lock, &session->stat->spin_stat.stat_txn_list);
if (undo->free_items.count == 0) {
undo->free_items.count = 1;
undo->free_items.first = tx_id.item_id;
undo->free_items.last = tx_id.item_id;
undo->items[tx_id.item_id].prev = OG_INVALID_ID32;
} else {
undo->items[undo->free_items.last].next = tx_id.item_id;
undo->items[tx_id.item_id].prev = undo->free_items.last;
undo->free_items.last = tx_id.item_id;
undo->free_items.count++;
}
undo->items[tx_id.item_id].next = OG_INVALID_ID32;
cm_spin_unlock(&undo->lock);
}
void tx_area_release_impl(knl_session_t *session, uint32 lseg_no, uint32 rseg_no, uint32 inst_id)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
tx_item_t *item = NULL;
undo_t *undo = NULL;
txn_t *txn = NULL;
uint32 i;
uint32 seg_no;
tx_id_t tx_id;
undo_set_t *undo_set = UNDO_SET(session, inst_id);
for (seg_no = lseg_no; seg_no < rseg_no; seg_no++) {
undo = &ogx->undos[seg_no];
for (i = 0; i < undo->capacity; i++) {
item = &undo->items[i];
txn = txn_addr(session, item->xmap);
if (txn->status == (uint8)XACT_END) {
tx_id.seg_id = XMAP_SEG_ID(item->xmap);
tx_id.item_id = i;
txn_release(session, undo_set, tx_id);
}
}
}
}
void tx_area_release(knl_session_t *session, undo_set_t *undo_set)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
core_ctrl_t *core_ctrl = DB_CORE_CTRL(session);
uint32 rcy_rm_id = 0;
bool32 need_rcy = OG_FALSE;
undo_t *undo = NULL;
tx_item_t *item = NULL;
txn_t *txn = NULL;
uint32 i;
uint32 seg_no;
tx_id_t tx_id;
for (seg_no = 0; seg_no < core_ctrl->undo_segments; seg_no++) {
undo = &undo_set->undos[seg_no];
for (i = 0; i < undo->capacity; i++) {
item = &undo->items[i];
txn = txn_addr(session, item->xmap);
if (txn->status == (uint8)XACT_END) {
tx_id.seg_id = XMAP_SEG_ID(item->xmap);
tx_id.item_id = i;
txn_release(session, undo_set, tx_id);
} else {
item->rmid = undo_set->rb_ctx[rcy_rm_id % undo_set->assign_workers].session->rmid;
rcy_rm_id++;
need_rcy = OG_TRUE;
}
}
}
OG_LOG_RUN_INF("[tx_area_release] undo_set->active_workers=%lld, undo_ctx->active_workers=%lld",
undo_set->active_workers, ogx->active_workers);
undo_set->active_workers = rcy_rm_id > 0 ? undo_set->assign_workers : 0;
cm_atomic_add(&ogx->active_workers, undo_set->active_workers);
OG_LOG_RUN_INF("[tx_area_release] add active_workers in undo_ctx, undo_set->active_workers=%lld, "
"undo_ctx->active_workers=%lld", undo_set->active_workers, ogx->active_workers);
if (session->kernel->id == undo_set->inst_id) {
tx_area_t *area = &session->kernel->tran_ctx;
area->rollback_num = need_rcy ? session->kernel->attr.tx_rollback_proc_num : 0;
}
}
static void tx_rollback_items(knl_session_t *session, thread_t *thread, undo_t *undo)
{
knl_rm_t *rm = session->rm;
tx_item_t *item = NULL;
txn_t *txn = NULL;
uint32 id;
status_t status;
for (id = 0; id < undo->capacity; id++) {
if (thread->closed) {
break;
}
item = &undo->items[id];
if (item->rmid != session->rmid) {
continue;
}
txn = txn_addr(session, item->xmap);
switch (txn->status) {
case XACT_PHASE1:
status = xa_recover(session, item, txn, id);
knl_panic(status == OG_SUCCESS);
break;
case XACT_PHASE2:
case XACT_BEGIN:
tx_rm_attach_trans(rm, item, txn, id);
knl_rollback(session, NULL);
break;
case XACT_END:
default:
break;
}
}
}
void tx_area_rollback(knl_session_t *session, thread_t *thread, undo_set_t *undo_set)
{
tx_area_t *area = &session->kernel->tran_ctx;
uint32 seg_no;
if ((!DB_IS_READONLY(session) || DB_IS_MAXFIX(session) ||
(!DB_IS_PRIMARY(&session->kernel->db) && session->kernel->lrpl_ctx.is_promoting == OG_TRUE)) &&
DB_IS_BG_ROLLBACK_SE(session) && DB_IN_BG_ROLLBACK(session)) {
for (seg_no = 0; seg_no < UNDO_SEGMENT_COUNT(session); seg_no++) {
if (thread->closed) {
break;
}
tx_rollback_items(session, thread, &undo_set->undos[seg_no]);
}
(void)cm_atomic_dec(&area->rollback_num);
}
}
inline txn_t *txn_addr(knl_session_t *session, xmap_t xmap)
{
uint32 page_capacity = TXN_PER_PAGE(session);
uint8 deposit_id = xmap_get_inst_id(session, xmap);
knl_panic(deposit_id == (uint8)session->kernel->id || (!DB_IS_PRIMARY(&session->kernel->db) && rc_is_master()));
undo_set_t *undo_set = UNDO_SET(session, XMAP_INST_ID(xmap));
undo_t *undo = &undo_set->undos[XMAP_SEG_ID(xmap)];
txn_page_t *txn_page = undo->txn_pages[xmap.slot / page_capacity];
return &txn_page->items[xmap.slot % page_capacity];
}
status_t tx_begin(knl_session_t *session)
{
knl_rm_t *rm = session->rm;
undo_t *undo = NULL;
tx_item_t *tx_item = NULL;
txn_t *txn = NULL;
undo_page_id_t page_id;
if (session->kernel->undo_ctx.is_switching) {
OG_THROW_ERROR(ERR_INVALID_OPERATION, ",when swithching undo space");
return OG_ERROR;
}
uint64 begin_time = KNL_NOW(session);
if (txn_alloc(session, rm) != OG_SUCCESS) {
return OG_ERROR;
}
if (KNL_IS_AUTON_SE(session)) {
session->kernel->stat.auto_txn_alloc_times += KNL_NOW(session) - begin_time;
} else {
session->kernel->stat.txn_alloc_times += KNL_NOW(session) - begin_time;
}
undo = &session->kernel->undo_ctx.undos[rm->tx_id.seg_id];
tx_item = &undo->items[rm->tx_id.item_id];
page_id = undo->segment->txn_page[tx_item->xmap.slot / TXN_PER_PAGE(session)];
rm->xid.xmap = tx_item->xmap;
log_atomic_op_begin(session);
begin_time = KNL_NOW(session);
buf_enter_page(session, PAGID_U2N(page_id), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
txn = txn_addr(session, tx_item->xmap);
cm_spin_lock(&tx_item->lock, &session->stat->spin_stat.stat_txn);
txn->xnum++;
txn->status = (uint8)XACT_BEGIN;
txn->undo_pages.count = 0;
txn->undo_pages.first = INVALID_UNDO_PAGID;
txn->undo_pages.last = INVALID_UNDO_PAGID;
tx_item->rmid = session->rmid;
rm->xid.xnum = txn->xnum;
cm_spin_unlock(&tx_item->lock);
rm->txn = txn;
log_put(session, RD_TX_BEGIN, &rm->xid, sizeof(xid_t), LOG_ENTRY_FLAG_NONE);
buf_leave_page(session, OG_TRUE);
if (KNL_IS_AUTON_SE(session)) {
session->kernel->stat.auto_txn_page_waits += KNL_NOW(session) - begin_time;
} else {
session->kernel->stat.txn_page_waits += KNL_NOW(session) - begin_time;
}
log_atomic_op_end(session);
knl_panic(XID_INST_ID(rm->xid) == session->kernel->id);
rm->begin_lsn = session->curr_lsn;
tx_item->systime = KNL_NOW(session);
return OG_SUCCESS;
}
* if call this function, must lock scn_lock first
*/
knl_scn_t tx_inc_scn(knl_session_t *session, uint32 seg_id, txn_t *txn, knl_scn_t xa_scn)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
tx_area_t *area = &session->kernel->tran_ctx;
knl_scn_t scn;
timeval_t now;
time_t init_time;
uint64 seq = 1;
undo_t *undo = &ogx->undos[seg_id];
init_time = DB_INIT_TIME(session);
#ifdef OG_RAC_ING
knl_scn_t gts_scn;
if (TX_XA_CONSISTENCY(session)) {
status_t status = gts_get_lcl_timestamp(>s_scn);
KNL_SCN_TO_TIMESEQ(gts_scn, &now, seq, CM_GTS_BASETIME);
seq++;
knl_panic(status == OG_SUCCESS);
} else
#endif
{
(void)cm_gettimeofday(&now);
}
cm_spin_lock(&area->scn_lock, &session->stat->spin_stat.stat_inc_scn);
if (xa_scn != OG_INVALID_ID64) {
scn = xa_scn;
if (scn > KNL_GET_SCN(&session->kernel->scn)) {
KNL_SET_SCN(&session->kernel->scn, scn);
}
} else {
scn = knl_inc_scn(init_time, &now, seq, &session->kernel->scn, session->kernel->attr.systime_inc_threshold);
}
if (undo->ow_scn < txn->scn && txn->status != (uint8)XACT_PHASE1) {
undo->ow_scn = txn->scn;
}
cm_spin_unlock(&area->scn_lock);
return scn;
}
static inline void tx_end_stat(knl_session_t *session, txn_t *txn, tx_item_t *item)
{
if (txn->status == (uint8)XACT_BEGIN) {
session->stat->local_txn_times += (KNL_NOW(session) - item->systime);
} else if (txn->status == (uint8)XACT_PHASE1 || txn->status == (uint8)XACT_PHASE2) {
session->stat->xa_txn_times += (KNL_NOW(session) - item->systime);
} else {
knl_panic(0);
}
}
* end transaction
* From now on, we are going to overwrite commit scn to transaction,
* save the max overwritten scn to global transaction area. If we are
* in rollback process, overwrite 0 to transaction, so the following
* allocation of itl can reuse itl related to current transaction
* immediately 'causing no rows or keys are related with the itl.
*/
static void tx_end(knl_session_t *session, bool32 is_commit, knl_scn_t xa_scn, bool32 dis_ckpt)
{
undo_context_t *ogx = &session->kernel->undo_ctx;
tx_area_t *area = &session->kernel->tran_ctx;
knl_rm_t *rm = session->rm;
undo_set_t *undo_set = UNDO_SET(session, XID_INST_ID(rm->xid));
txn_t *txn = rm->txn;
undo_t *tx_undo = &undo_set->undos[rm->tx_id.seg_id];
tx_item_t *tx_item = &tx_undo->items[rm->tx_id.item_id];
undo_page_id_t page_id = tx_undo->segment->txn_page[tx_item->xmap.slot / TXN_PER_PAGE(session)];
bool32 has_logic = (session->kernel->db.ctrl.core.lrep_mode == LOG_REPLICATION_ON);
rd_tx_end_t redo;
knl_panic((XID_INST_ID(rm->xid) == session->kernel->id) || DB_IS_BG_ROLLBACK_SE(session));
rm->need_copy_logic_log = LOG_HAS_LOGIC_DATA(session);
rm->nolog_insert = OG_FALSE;
rm->nolog_type = LOGGING_LEVEL;
rm->logging = OG_TRUE;
undo_t *undo = &ogx->undos[UNDO_GET_SESSION_UNDO_SEGID(session)];
redo.xmap = rm->xid.xmap;
redo.is_auton = 0;
redo.is_commit = (uint8)is_commit;
tx_end_stat(session, txn, tx_item);
tx_item->in_progress = OG_TRUE;
if (session->kernel->attr.serialized_commit) {
cm_spin_lock(&area->seri_lock, &session->stat->spin_stat.stat_seri_commit);
}
uint64 begin_time = KNL_NOW(session);
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(page_id), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
cm_spin_lock(&tx_item->lock, &session->stat->spin_stat.stat_txn);
txn->scn = tx_inc_scn(session, rm->tx_id.seg_id, txn, xa_scn);
tx_item->rmid = OG_INVALID_ID16;
txn->status = (uint8)XACT_END;
cm_spin_unlock(&tx_item->lock);
cm_atomic_set(&session->kernel->commit_scn, (int64)txn->scn);
redo.scn = txn->scn;
redo.aligned = 0;
log_put(session, RD_TX_END, &redo, sizeof(rd_tx_end_t), LOG_ENTRY_FLAG_NONE);
if (has_logic && knl_xa_xid_valid(&rm->xa_xid)) {
log_append_data(session, &rm->xa_xid, sizeof(knl_xa_xid_t));
}
buf_leave_page(session, OG_TRUE);
if (KNL_IS_AUTON_SE(session)) {
session->kernel->stat.auto_txn_page_end_waits += KNL_NOW(session) - begin_time;
} else {
session->kernel->stat.txn_page_end_waits += KNL_NOW(session) - begin_time;
}
if (txn->undo_pages.count > 0) {
undo_release_pages(session, undo, &txn->undo_pages, OG_TRUE);
session->rm->txn_alarm_enable = OG_TRUE;
}
if (session->rm->noredo_undo_pages.count > 0) {
undo_release_pages(session, undo, &rm->noredo_undo_pages, OG_FALSE);
}
if (dis_ckpt) {
knl_begin_session_wait(session, CKPT_DISABLE_WAIT, OG_TRUE);
ckpt_disable_update_point(session);
knl_end_session_wait(session, CKPT_DISABLE_WAIT);
}
log_atomic_op_end(session);
if (session->kernel->attr.serialized_commit) {
cm_spin_unlock(&area->seri_lock);
}
tx_item->in_progress = OG_FALSE;
dls_release_txn(session, rm);
}
static inline void tx_release(knl_session_t *session)
{
knl_rm_t *rm = session->rm;
undo_set_t *undo_set = UNDO_SET(session, XID_INST_ID(rm->xid));
lock_free(session, rm);
txn_release(session, undo_set, rm->tx_id);
tx_reset_rm(session, rm);
session->tx_fpl.count = 0;
session->tx_fpl.index = 0;
}
void tx_copy_logic_log(knl_session_t *session)
{
knl_rm_t *rm = session->rm;
rm->need_copy_logic_log = LOG_HAS_LOGIC_DATA(session);
if (rm->need_copy_logic_log) {
log_atomic_op_begin(session);
log_atomic_op_end(session);
log_commit(session);
}
}
static inline void tx_delete_xa_xid(knl_session_t *session)
{
if (knl_xa_xid_valid(&session->rm->xa_xid)) {
g_knl_callback.delete_xa_xid(&session->rm->xa_xid);
}
}
void tx_commit(knl_session_t *session, knl_scn_t xa_scn)
{
knl_rm_t *rm = session->rm;
rm->isolevel = session->kernel->attr.db_isolevel;
rm->query_scn = OG_INVALID_ID64;
bool32 has_logic = session->logic_log_size > 0 || session->rm->logic_log_size > 0;
if (session->temp_table_count != 0) {
knl_close_temp_tables(session, DICT_TYPE_TEMP_TABLE_TRANS);
}
if (rm->txn == NULL || rm->txn->status == (uint8)XACT_END) {
tx_copy_logic_log(session);
if (has_logic) {
dtc_sync_ddl(session);
}
rm->svpt_count = 0;
return;
}
if (rm->lob_items.count != 0) {
lob_free_delete_pages(session);
lob_items_free(session);
}
tx_end(session, OG_TRUE, xa_scn, has_logic);
log_commit(session);
if (has_logic) {
SYNC_POINT_GLOBAL_START(OGRAC_DDL_BEFORE_SYNC_DDL_ABORT, NULL, 0);
SYNC_POINT_GLOBAL_END;
dtc_sync_ddl(session);
ckpt_enable_update_point(session);
}
tx_release(session);
g_knl_callback.accumate_io(session, IO_TYPE_COMMIT);
}
static inline status_t tx_is_invalid_xid(knl_session_t *session, xid_t xid)
{
if ((xid.xmap.seg_id >= UNDO_SEGMENT_COUNT(session) && !DB_ATTR_CLUSTER(session)) ||
(xid.xmap.slot / TXN_PER_PAGE(session)) >= UNDO_DEF_TXN_PAGE(session)) {
OG_THROW_ERROR(ERR_INVALID_DATABASE_DEF, "invalid xid , exceed max segment count or def txn pages");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t knl_commit_force(knl_handle_t handle, knl_xid_t *xid)
{
knl_session_t *session = (knl_session_t *)handle;
knl_rm_t *rm = session->rm;
tx_item_t *item = NULL;
txn_t *txn = NULL;
xid_t force_xid;
tx_id_t tx_id;
uint32 i;
if (!DB_IS_RESTRICT(session)) {
OG_THROW_ERROR(ERR_INVALID_OPERATION, ",operation only supported in restrict mode");
return OG_ERROR;
}
if (rm->txn != NULL) {
OG_THROW_ERROR(ERR_TXN_IN_PROGRESS, "cur session is in transaction,can't commit force transaction.");
return OG_ERROR;
}
force_xid.xmap.seg_id = xid->seg_id;
force_xid.xmap.slot = xid->slot;
force_xid.xnum = xid->xnum;
if (tx_is_invalid_xid(session, force_xid) != OG_SUCCESS) {
return OG_ERROR;
}
item = xmap_get_item(session, force_xid.xmap);
txn = txn_addr(session, force_xid.xmap);
for (i = SESSION_ID_ROLLBACK; i <= SESSION_ID_ROLLBACK_EDN; i++) {
if (item->rmid == session->kernel->sessions[i]->rmid) {
break;
}
}
if (i > SESSION_ID_ROLLBACK_EDN ||
txn->status == (uint8)XACT_END ||
txn->xnum != xid->xnum) {
OG_THROW_ERROR(ERR_INVALID_DATABASE_DEF, "invalid xid , not found residual transaction");
return OG_ERROR;
}
if (txn->status == (uint8)XACT_PHASE1) {
OG_THROW_ERROR(ERR_XATXN_IN_PROGRESS, "can't commit force residual XA transaction.");
return OG_ERROR;
}
tx_id = xmap_get_txid(session, force_xid.xmap);
tx_rm_attach_trans(rm, item, txn, tx_id.item_id);
knl_commit(handle);
return OG_SUCCESS;
}
void knl_commit(knl_handle_t handle)
{
status_t ret = OG_ERROR;
io_id_t io_id = {0};
if (cm_dbs_is_enable_dbs()) {
io_id.io_no = GetFdsaIoNo();
io_id.fdsa_type = FDSA_KNL_COMMIT;
ret = AddIo2FdsaHashTable(io_id);
}
SYNC_POINT_GLOBAL_START(OGRAC_KNL_COMMIT_DELAY, NULL, 200000);
SYNC_POINT_GLOBAL_END;
SYNC_POINT_GLOBAL_START(OGRAC_KNL_COMMIT_DELAY_ONCE, NULL, 660000);
SYNC_POINT_GLOBAL_END;
knl_session_t *session = (knl_session_t *)handle;
g_knl_callback.before_commit(handle);
tx_commit(session, OG_INVALID_ID64);
session->stat->commits++;
tx_delete_xa_xid(session);
if (cm_dbs_is_enable_dbs() && ret == OG_SUCCESS) {
RemovetIoFromFdsaHashtable(io_id);
}
if (!DB_IS_CLUSTER(session) || session->logic_log_num == 0) {
return;
}
if (knl_begin_auton_rm(session) != OG_SUCCESS) {
return;
}
status_t status = db_clean_ddl_op(session, DDL_CLEAN_SESSION);
knl_end_auton_rm(session, status);
}
static void tx_undo_one_row(knl_session_t *session, undo_row_t *row, undo_page_t *page, int32 slot,
knl_dictionary_t *dc, heap_undo_assist_t *heap_assist)
{
switch (row->type) {
case UNDO_HEAP_INSERT:
heap_undo_insert(session, row, page, slot, dc, heap_assist);
break;
case UNDO_HEAP_DELETE:
case UNDO_HEAP_DELETE_ORG:
case UNDO_HEAP_COMPACT_DELETE:
case UNDO_HEAP_COMPACT_DELETE_ORG:
heap_undo_delete(session, row, page, slot);
break;
case UNDO_HEAP_UPDATE:
case UNDO_HEAP_UPDATE_FULL:
heap_undo_update(session, row, page, slot, dc, heap_assist);
break;
case UNDO_BTREE_INSERT:
btree_undo_insert(session, row, page, slot, dc);
break;
case UNDO_BTREE_DELETE:
btree_undo_delete(session, row, page, slot, dc);
break;
case UNDO_CREATE_INDEX:
btree_undo_create(session, row, page, slot);
break;
case UNDO_LOB_INSERT:
lob_undo_insert(session, row, page, slot, dc);
break;
case UNDO_LOB_DELETE_COMMIT:
lob_undo_delete_commit(session, row, page, slot);
break;
case UNDO_TEMP_HEAP_INSERT:
temp_heap_undo_insert(session, row, page, slot);
break;
case UNDO_TEMP_HEAP_BINSERT:
temp_heap_undo_batch_insert(session, row, page, slot);
break;
case UNDO_TEMP_HEAP_DELETE:
temp_heap_undo_delete(session, row, page, slot);
break;
case UNDO_TEMP_HEAP_UPDATE:
case UNDO_TEMP_HEAP_UPDATE_FULL:
temp_heap_undo_update(session, row, page, slot);
break;
case UNDO_TEMP_BTREE_INSERT:
temp_btree_undo_insert(session, row, page, slot, dc);
break;
case UNDO_TEMP_BTREE_BINSERT:
temp_btree_undo_batch_insert(session, row, page, slot, dc);
break;
case UNDO_TEMP_BTREE_DELETE:
temp_btree_undo_delete(session, row, page, slot, dc);
break;
case UNDO_LOB_DELETE:
lob_undo_delete(session, row, page, slot);
break;
case UNDO_HEAP_INSERT_MIGR:
heap_undo_insert_migr(session, row, page, slot, dc, heap_assist);
break;
case UNDO_HEAP_UPDATE_LINKRID:
heap_undo_update_linkrid(session, row, page, slot);
break;
case UNDO_HEAP_DELETE_MIGR:
heap_undo_delete_migr(session, row, page, slot, dc, heap_assist);
break;
case UNDO_PCRH_ITL:
pcrh_ud_itl(session, row, page, slot, dc, heap_assist);
break;
case UNDO_PCRH_INSERT:
pcrh_undo_ins(session, row, page, slot);
break;
case UNDO_PCRH_DELETE:
case UNDO_PCRH_COMPACT_DELETE:
pcrh_undo_del(session, row, page, slot);
break;
case UNDO_PCRH_UPDATE:
case UNDO_PCRH_UPDATE_FULL:
pcrh_undo_upd(session, row, page, slot);
break;
case UNDO_PCRH_UPDATE_LINK_SSN:
pcrh_undo_upd_link_ssn(session, row, page, slot);
break;
case UNDO_PCRH_UPDATE_NEXT_RID:
pcrh_undo_upd_next_rowid(session, row, page, slot);
break;
case UNDO_PCRH_BATCH_INSERT:
pcrh_undo_batch_ins(session, row, page, slot);
break;
case UNDO_PCRB_ITL:
pcrb_undo_itl(session, row, page, slot);
break;
case UNDO_PCRB_INSERT:
pcrb_undo_insert(session, row, page, slot, dc);
break;
case UNDO_PCRB_DELETE:
pcrb_undo_delete(session, row, page, slot, dc);
break;
case UNDO_PCRB_BATCH_INSERT:
pcrb_undo_batch_insert(session, row, page, slot, dc);
break;
case UNDO_LOB_DELETE_COMMIT_RECYCLE:
lob_undo_delete_commit_recycle(session, row, page, slot);
break;
case UNDO_LOB_ALLOC_PAGE:
lob_undo_write_page(session, row, page, slot);
break;
case UNDO_CREATE_HEAP:
heap_undo_create_part(session, row, page, slot);
break;
case UNDO_CREATE_LOB:
lob_undo_create_part(session, row, page, slot);
break;
case UNDO_LOB_TEMP_ALLOC_PAGE:
lob_temp_undo_write_page(session, row, page, slot);
break;
case UNDO_LOB_TEMP_DELETE:
lob_temp_undo_delete(session, row, page, slot);
break;
default:
knl_panic_log(0, "row type is unknown, panic info: page %u-%u type %u row type %u",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page, page->head.type, row->type);
break;
}
}
static void tx_rollback_one_row(knl_session_t *session, undo_row_t *row, undo_page_t *page, int32 slot)
{
knl_dictionary_t dc;
heap_undo_assist_t heap_assist;
page_id_t page_id;
uint32 i;
heap_assist.rows = 0;
heap_assist.heap = NULL;
heap_assist.need_latch = OG_FALSE;
dc.handle = NULL;
page_id = AS_PAGID(page->head.id);
log_atomic_op_begin(session);
tx_undo_one_row(session, row, page, slot, &dc, &heap_assist);
if (heap_assist.need_latch) {
dls_latch_x(session, &heap_assist.heap->latch, session->id, &session->stat_heap);
tx_undo_one_row(session, row, page, slot, &dc, &heap_assist);
dls_unlatch(session, &heap_assist.heap->latch, &session->stat_heap);
}
buf_enter_page(session, page_id, LATCH_MODE_X, ENTER_PAGE_PINNED);
row->is_cleaned = 1;
if (SPC_IS_LOGGING_BY_PAGEID(session, page_id)) {
log_put(session, RD_UNDO_CLEAN, &slot, sizeof(int32), LOG_ENTRY_FLAG_NONE);
}
buf_leave_page(session, OG_TRUE);
log_atomic_op_end(session);
for (i = 0; i < heap_assist.rows; i++) {
session->change_list = heap_assist.change_list[i];
heap_try_change_map(session, heap_assist.heap, heap_assist.page_id[i]);
}
if (dc.handle != NULL) {
dc_close(&dc);
}
}
static void tx_free_undo_pages(knl_session_t *session, undo_page_list_t *free_list, page_id_t last_page_id,
bool32 need_redo)
{
knl_rm_t *rm = session->rm;
undo_set_t *undo_set = UNDO_SET(session, XID_INST_ID(rm->xid));
undo_t *undo = &undo_set->undos[UNDO_GET_SESSION_UNDO_SEGID(session)];
txn_t *txn = rm->txn;
undo_page_id_t txn_page_id;
undo_page_list_t *tx_undo_page_list = NULL;
rd_undo_chg_txn_t redo;
txn_get_owner(session, rm->xid.xmap, &txn_page_id);
log_atomic_op_begin(session);
buf_enter_page(session, PAGID_U2N(txn_page_id), LATCH_MODE_X, ENTER_PAGE_RESIDENT);
if (need_redo) {
tx_undo_page_list = &txn->undo_pages;
} else {
tx_undo_page_list = &rm->noredo_undo_pages;
}
knl_panic_log(tx_undo_page_list->count >= free_list->count, "undo page count is smaller than free count, "
"panic info: page %u-%u type %u undo page count %u free count %u", txn_page_id.file,
txn_page_id.page, ((page_head_t *)CURR_PAGE(session))->type,
tx_undo_page_list->count, free_list->count);
tx_undo_page_list->count -= free_list->count;
if (tx_undo_page_list->count == 0) {
tx_undo_page_list->first = INVALID_UNDO_PAGID;
tx_undo_page_list->last = INVALID_UNDO_PAGID;
} else {
knl_panic_log(!IS_INVALID_PAGID(last_page_id), "last page id is invalid, panic info: txn_page %u-%u type %u",
txn_page_id.file, txn_page_id.page, ((page_head_t *)CURR_PAGE(session))->type);
tx_undo_page_list->last = PAGID_N2U(last_page_id);
}
if (need_redo) {
redo.xmap = rm->xid.xmap;
redo.undo_pages = *tx_undo_page_list;
log_put(session, RD_UNDO_CHANGE_TXN, &redo, sizeof(rd_undo_chg_txn_t), LOG_ENTRY_FLAG_NONE);
}
buf_leave_page(session, need_redo);
undo_release_pages(session, undo, free_list, need_redo);
log_atomic_op_end(session);
}
* rollback undo record on undo pages
* rollback from current_slot in begin page to target_slot in end page,
* if end page is a invalid page_id, rollback all undo-chained-pages generated by current transaction
* only in end transaction scenario could we free undo pages in rollback
*/
static void tx_rollback_pages(knl_session_t *session, undo_page_id_t undo_page_id, undo_rowid_t *svpt_urid,
bool32 need_redo)
{
knl_rm_t *rm = session->rm;
int32 slot;
int32 min_slot;
uint16 end_slot;
undo_page_t *page = NULL;
undo_row_t *row = NULL;
buf_ctrl_t *ctrl = NULL;
page_id_t page_id;
page_id_t prev;
undo_page_list_t free_list;
bool32 need_release = (svpt_urid == NULL && rm->svpt_count == 0);
free_list.count = 0;
page_id = PAGID_U2N(undo_page_id);
while (!IS_INVALID_PAGID(page_id)) {
if (buf_read_page(session, page_id, LATCH_MODE_S, ENTER_PAGE_PINNED) != OG_SUCCESS) {
CM_ABORT(DB_IS_CLUSTER(session) && DB_IS_MAXFIX(session), "[BUFFER] ABORT INFO: failed to read page %u-%u", page_id.file, page_id.page);
OG_LOG_RUN_WAR("page: %u-%u can't be loaded, ignore rollback the reset undo pages of this tx.",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page);
break;
}
page = (undo_page_t *)CURR_PAGE(session);
if (page_is_damaged(&page->head)) {
CM_ABORT(DB_IS_CLUSTER(session) && DB_IS_MAXFIX(session), "[BUFFER] ABORT INFO: page damaged %u-%u",
page_id.file, page_id.page);
OG_LOG_RUN_WAR("page: %u-%u was damaged, ignore rollback the reset undo pages of this tx.",
AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page);
buf_leave_page(session, OG_FALSE);
break;
}
end_slot = page->begin_slot;
prev = PAGID_U2N(page->prev);
ctrl = session->curr_page_ctrl;
buf_leave_page(session, OG_FALSE);
if (svpt_urid != NULL && IS_SAME_PAGID(svpt_urid->page_id, page_id)) {
knl_panic_log(svpt_urid->slot >= end_slot, "slot abnormal, panic info: page %u-%u type %u "
"svpt_urid slot %u end_slot %u", AS_PAGID(page->head.id).file, AS_PAGID(page->head.id).page,
page->head.type, svpt_urid->slot, end_slot);
end_slot = svpt_urid->slot;
}
min_slot = (int32)end_slot;
for (slot = (int32)page->rows - 1; slot >= min_slot; slot--) {
row = UNDO_ROW(session, page, slot);
if (row->xid.value == OG_INVALID_ID64) {
continue;
}
knl_panic_log(row->xid.value == rm->xid.value, "the xid of row and rm are not equal, panic info: "
"page %u-%u type %u row xid %llu rm xid %llu", AS_PAGID(page->head.id).file,
AS_PAGID(page->head.id).page, page->head.type, row->xid.value, rm->xid.value);
if (!row->is_cleaned) {
tx_rollback_one_row(session, row, page, slot);
}
}
BUF_UNPIN(ctrl);
if (svpt_urid != NULL && IS_SAME_PAGID(svpt_urid->page_id, page_id)) {
break;
}
if (need_release) {
if (free_list.count == 0) {
free_list.first = as_undo_page_id(page_id);
free_list.last = as_undo_page_id(page_id);
} else {
free_list.first = as_undo_page_id(page_id);
}
free_list.count++;
if (free_list.count == OG_EXTENT_SIZE) {
tx_free_undo_pages(session, &free_list, prev, need_redo);
free_list.count = 0;
}
}
page_id = prev;
}
}
* release savepoint whose lsn bigger than the parameter lsn on rm when rollback.
*/
static void tx_release_named_savepoint(knl_session_t *session, knl_savepoint_t *savepoint)
{
int i;
knl_rm_t *rm = session->rm;
if (savepoint == NULL) {
rm->svpt_count = 0;
return;
}
if (rm->svpt_count == 0) {
return;
}
if (savepoint->name[0] != '\0') {
for (i = rm->svpt_count - 1; i >= 0; i--) {
if (cm_str_equal_ins(savepoint->name, rm->save_points[i].name)) {
break;
}
}
} else {
for (i = rm->svpt_count - 1; i >= 0; i--) {
if (rm->save_points[i].lsn <= savepoint->lsn) {
break;
}
}
}
if (i < 0) {
rm->svpt_count = 0;
} else {
rm->svpt_count = i + 1;
}
}
void tx_rollback(knl_session_t *session, knl_savepoint_t *savepoint)
{
knl_rm_t *rm = session->rm;
tx_release_named_savepoint(session, savepoint);
if (rm->txn == NULL || rm->txn->status == (uint8)XACT_END) {
if (savepoint == NULL) {
rm->isolevel = session->kernel->attr.db_isolevel;
rm->query_scn = OG_INVALID_ID64;
}
return;
}
if (savepoint != NULL && savepoint->xid == rm->xid.value) {
knl_panic(savepoint->lsn != OG_INVALID_ID64 || DB_IS_BG_ROLLBACK_SE(session)
|| IS_INVALID_PAGID(savepoint->urid.page_id) || knl_xa_xid_valid(&rm->xa_xid));
tx_rollback_pages(session, rm->undo_page_info.undo_rid.page_id, &savepoint->urid, OG_TRUE);
tx_rollback_pages(session, rm->noredo_undo_page_info.undo_rid.page_id, &savepoint->noredo_urid, OG_FALSE);
g_knl_callback.invalidate_cursor(session, savepoint->lsn);
} else {
knl_panic(rm->begin_lsn != OG_INVALID_ID64 || DB_IS_BG_ROLLBACK_SE(session) || knl_xa_xid_valid(&rm->xa_xid));
tx_rollback_pages(session, rm->undo_page_info.undo_rid.page_id, NULL, OG_TRUE);
tx_rollback_pages(session, rm->noredo_undo_page_info.undo_rid.page_id, NULL, OG_FALSE);
g_knl_callback.invalidate_cursor(session, rm->begin_lsn);
}
if (savepoint != NULL && (savepoint->xid == rm->xid.value || rm->svpt_count > 0)) {
lob_reset_svpt(session, savepoint);
lock_free_to_svpt(session, savepoint);
lock_reset_to_svpt(session, savepoint);
} else {
if (rm->lob_items.count != 0) {
lob_items_free(session);
}
tx_end(session, OG_FALSE, OG_INVALID_ID64, OG_FALSE);
tx_release(session);
if (session->temp_table_count != 0) {
knl_close_temp_tables(session, DICT_TYPE_TEMP_TABLE_TRANS);
}
}
session->logic_log_size = 0;
if (savepoint == NULL) {
rm->isolevel = session->kernel->attr.db_isolevel;
rm->query_scn = OG_INVALID_ID64;
if (!DB_IS_CLUSTER(session) || session->logic_log_num == 0) {
return;
}
if (knl_begin_auton_rm(session) != OG_SUCCESS) {
return;
}
status_t status = db_clean_ddl_op(session, DDL_CLEAN_SESSION);
knl_end_auton_rm(session, status);
}
}
void knl_rollback(knl_handle_t handle, knl_savepoint_t *savepoint)
{
knl_session_t *session = (knl_session_t *)handle;
knl_rm_t *rm = session->rm;
if (session->rm->nolog_insert && session->rm->nolog_type == SESSION_LEVEL) {
OG_LOG_RUN_WAR("The rollback does not take effect because the transaction has executed "
"session level nologging insert, rmid: %d, xid(%d, %d, %d).",
session->rmid, rm->xid.xmap.seg_id, rm->xid.xmap.slot, rm->xid.xnum);
return;
}
tx_rollback(session, savepoint);
if (savepoint == NULL) {
tx_delete_xa_xid(session);
}
session->dist_ddl_id = NULL;
session->stat->rollbacks++;
}
* get transaction info
* get transaction info by transaction xid
* @param kernel session, is_scan, xid, trans info
*/
void tx_get_info(knl_session_t *session, bool32 is_scan, xid_t xid, txn_info_t *txn_info)
{
txn_snapshot_t snapshot;
tx_get_local_snapshot(session, xid.xmap, &snapshot);
txn_info->xid.xmap = xid.xmap;
txn_info->xid.xnum = snapshot.xnum;
if (xid.xnum == snapshot.xnum) {
* Transaction version is same with us, we get trans info directly from
* current transaction. If transaction is in XACT_END status, we just return it.
* If transaction is active or transaction is ending in progress and current
* behavior is itl-reuse, we will read history version or reuse other itl.
*/
txn_info->is_owscn = OG_FALSE;
if (snapshot.status == (uint8)XACT_PHASE1 || snapshot.status == (uint8)XACT_PHASE2) {
txn_info->scn = snapshot.scn;
txn_info->status = (uint8)snapshot.status;
} else if (snapshot.status != (uint8)XACT_END || (snapshot.in_progress && !is_scan)) {
txn_info->scn = DB_CURR_SCN(session);
txn_info->status = (uint8)XACT_BEGIN;
} else {
txn_info->scn = snapshot.scn;
txn_info->status = (uint8)XACT_END;
}
} else if (xid.xnum + 1 == snapshot.xnum && snapshot.status == (uint8)XACT_BEGIN) {
* To increase transaction info retention time, we would not overwrite
* transaction scn when we are reusing a committed transaction. So, we
* can get commit version from current transaction directly.
*/
txn_info->scn = snapshot.scn;
txn_info->is_owscn = OG_FALSE;
txn_info->status = (uint8)XACT_END;
} else {
undo_set_t *undo_set = UNDO_SET(session, XID_INST_ID(xid));
undo_t *undo = &undo_set->undos[XID_SEG_ID(xid)];
txn_info->status = (uint8)XACT_END;
txn_info->is_owscn = OG_TRUE;
txn_info->scn = undo->ow_scn;
}
}
void tx_get_itl_info(knl_session_t *session, bool32 is_scan, itl_t *itl, txn_info_t *txn_info)
{
if (itl->is_active) {
tx_get_info(session, is_scan, itl->xid, txn_info);
} else {
txn_info->scn = itl->scn;
txn_info->is_owscn = (bool8)itl->is_owscn;
txn_info->status = (uint8)XACT_END;
}
}
void tx_get_pcr_itl_info(knl_session_t *session, bool32 is_scan, pcr_itl_t *itl, txn_info_t *txn_info)
{
if (itl->is_active) {
if (!itl->is_hist) {
if (DB_IS_CLUSTER(session)) {
dtc_get_txn_info(session, is_scan, itl->xid, txn_info);
} else {
tx_get_info(session, is_scan, itl->xid, txn_info);
}
} else {
txn_info->scn = DB_CURR_SCN(session);
txn_info->is_owscn = OG_FALSE;
txn_info->status = (uint8)XACT_BEGIN;
}
} else {
txn_info->scn = itl->scn;
txn_info->is_owscn = (bool8)itl->is_owscn;
txn_info->status = (uint8)XACT_END;
}
}
static status_t tx_check_wait_valid(knl_session_t *session)
{
if (session->dead_locked) {
OG_THROW_ERROR(ERR_DEAD_LOCK, "transaction", session->id);
OG_LOG_ALARM(WARN_DEADLOCK, "'instance-name':'%s'}", session->kernel->instance_name);
return OG_ERROR;
}
if (session->itl_dead_locked) {
OG_THROW_ERROR(ERR_DEAD_LOCK, "itl", session->id);
OG_LOG_ALARM(WARN_DEADLOCK, "'instance-name':'%s'}", session->kernel->instance_name);
return OG_ERROR;
}
if (session->lock_dead_locked) {
OG_THROW_ERROR(ERR_DEAD_LOCK, "table", session->id);
OG_LOG_ALARM(WARN_DEADLOCK, "'instance-name':'%s'}", session->kernel->instance_name);
return OG_ERROR;
}
if (session->canceled) {
OG_THROW_ERROR(ERR_OPERATION_CANCELED);
return OG_ERROR;
}
if (session->killed) {
OG_THROW_ERROR(ERR_OPERATION_KILLED);
return OG_ERROR;
}
return OG_SUCCESS;
}
static inline void tx_reset_deadlock_flag(knl_session_t *session)
{
session->itl_dead_locked = OG_FALSE;
session->dead_locked = OG_FALSE;
session->lock_dead_locked = OG_FALSE;
}
* transaction wait
* transaction concurrency control interface
* Wait for the end of the transaction which hold the heap row or btree key.
* @param kernel session, timeout(in milliseconds)
*/
status_t tx_wait(knl_session_t *session, uint32 timeout, wait_event_t event)
{
txn_snapshot_t snapshot;
date_t begin_time;
status_t status;
tx_get_snapshot(session, session->wxid.xmap, &snapshot);
if (snapshot.xnum != session->wxid.xnum || snapshot.status == (uint8)XACT_END) {
session->wxid.value = OG_INVALID_ID64;
return OG_SUCCESS;
}
begin_time = KNL_NOW(session);
tx_reset_deadlock_flag(session);
session->wrmid = snapshot.rmid;
knl_begin_session_wait(session, event, OG_TRUE);
for (;;) {
if (dls_wait_txn(session, snapshot.rmid)) {
status = OG_SUCCESS;
break;
}
if (timeout != 0 && (KNL_NOW(session) - begin_time) / (date_t)MICROSECS_PER_MILLISEC > (date_t)timeout) {
OG_THROW_ERROR(ERR_LOCK_TIMEOUT);
status = OG_ERROR;
break;
}
if (tx_check_wait_valid(session) != OG_SUCCESS) {
status = OG_ERROR;
break;
}
tx_get_snapshot(session, session->wxid.xmap, &snapshot);
if (snapshot.xnum != session->wxid.xnum || snapshot.status == (uint8)XACT_END) {
status = OG_SUCCESS;
break;
}
}
dls_wait_txn_recyle(session);
knl_end_session_wait(session, event);
session->stat->con_wait_time += session->wait_pool[event].usecs;
tx_reset_deadlock_flag(session);
session->wrmid = OG_INVALID_ID16;
session->wxid.value = OG_INVALID_ID64;
return status;
}
inline void tx_get_local_snapshot(knl_session_t *session, xmap_t xmap, txn_snapshot_t *snapshot)
{
tx_item_t *tx_item = xmap_get_item(session, xmap);
txn_t *txn = txn_addr(session, xmap);
cm_spin_lock(&tx_item->lock, &session->stat->spin_stat.stat_txn);
snapshot->xnum = txn->xnum;
snapshot->scn = txn->scn;
snapshot->rmid = tx_item->rmid;
snapshot->status = txn->status;
snapshot->in_progress = tx_item->in_progress;
cm_spin_unlock(&tx_item->lock);
}
inline void tx_get_snapshot(knl_session_t *session, xmap_t xmap, txn_snapshot_t *snapshot)
{
uint8 inst_id;
uint8 curr_id;
inst_id = XMAP_INST_ID(xmap);
if (inst_id == session->kernel->id) {
tx_get_local_snapshot(session, xmap, snapshot);
return;
}
for (;;) {
curr_id = xmap_get_inst_id(session, xmap);
if (curr_id == session->kernel->id) {
if (rc_instance_accessible(inst_id)) {
tx_get_local_snapshot(session, xmap, snapshot);
} else {
cm_sleep(MES_MSG_RETRY_TIME);
continue;
}
} else {
if (g_dtc->profile.node_count <= curr_id) {
OG_LOG_RUN_ERR("current id get from xmap is invalid, curr_id(%u)", curr_id);
break;
}
if (dtc_get_remote_txn_snapshot(session, xmap, curr_id, snapshot) != OG_SUCCESS) {
cm_reset_error();
cm_sleep(MES_MSG_RETRY_TIME);
continue;
}
}
return;
}
}
void txn_get_owner(knl_session_t *session, xmap_t xmap, undo_page_id_t *page_id)
{
undo_set_t *undo_set = UNDO_SET(session, XMAP_INST_ID(xmap));
undo_t *undo = &undo_set->undos[XMAP_SEG_ID(xmap)];
*page_id = undo->segment->txn_page[xmap.slot / TXN_PER_PAGE(session)];
}
void tx_rollback_proc(thread_t *thread)
{
rollback_ctx_t *rb_ctx = (rollback_ctx_t *)thread->argument;
knl_session_t *session = rb_ctx->session;
undo_set_t *undo_set = UNDO_SET(session, rb_ctx->inst_id);
undo_context_t *ogx = &session->kernel->undo_ctx;
session->bg_rollback = OG_TRUE;
cm_set_thread_name("rollback");
OG_LOG_RUN_INF("rollback %u thread started", rb_ctx->inst_id);
KNL_SESSION_SET_CURR_THREADID(session, cm_get_current_thread_id());
while (!thread->closed) {
* make it works when it reach to WAIT_CLEAN,
* because we will drop nologging tables during `db_drop_nologging_table',
* if we want to lock a row which is locked by a running transaction,
* we can wait tx_rollback_proc to undo it,
* otherwise, deadlock maybe occurred, example(1->2->3->1):
* 1. tx_rollback_proc wait db_open;
* 2. db_open wait db_drop_nologging_table;
* 3. db_clean_nologging_guts wait tx_rollback_proc to rollback a running transaction;
*/
if (session->kernel->db.status >= DB_STATUS_WAIT_CLEAN) {
if (DB_IS_MAXFIX(session)) {
break;
}
if (!DB_IS_READONLY(session) && !DB_IS_MAINTENANCE(session)) {
break;
}
if (!DB_IS_PRIMARY(&session->kernel->db) && session->kernel->lrpl_ctx.is_promoting == OG_TRUE) {
break;
}
}
cm_sleep(200);
}
if (!thread->closed) {
tx_area_rollback(session, thread, undo_set);
OG_LOG_RUN_INF("[tx_rollback_proc] undo_set->active_workers=%lld, undo_ctx->active_workers=%lld",
undo_set->active_workers, ogx->active_workers);
if (undo_set->active_workers > 0) {
(void)cm_atomic_dec(&ogx->active_workers);
OG_LOG_RUN_INF("[tx_rollback_proc] dec active_workers in undo ogx, undo_set->active_workers=%lld, "
"undo_ctx->active_workers=%lld", undo_set->active_workers, ogx->active_workers);
}
}
session->bg_rollback = OG_FALSE;
OG_LOG_RUN_INF("rollback thread closed");
KNL_SESSION_CLEAR_THREADID(session);
}
status_t tx_rollback_start(knl_session_t *session)
{
knl_instance_t *kernel = session->kernel;
undo_set_t *undo_set = MY_UNDO_SET(session);
uint32 i;
for (i = 0; i < session->kernel->attr.tx_rollback_proc_num; i++) {
undo_set->rb_ctx[i].session = kernel->sessions[SESSION_ID_ROLLBACK + i];
undo_set->rb_ctx[i].inst_id = session->kernel->id;
if (cm_create_thread(tx_rollback_proc, 0, &undo_set->rb_ctx[i], &kernel->tran_ctx.rollback_proc[i]) !=
OG_SUCCESS) {
return OG_ERROR;
}
undo_set->rb_ctx[i].thread = kernel->tran_ctx.rollback_proc[i];
}
return OG_SUCCESS;
}
void tx_rollback_close(knl_session_t *session)
{
knl_instance_t *kernel = session->kernel;
tx_area_t *ogx = &kernel->tran_ctx;
undo_set_t *undo_set = MY_UNDO_SET(session);
for (uint32 i = 0; i < session->kernel->attr.tx_rollback_proc_num; i++) {
knl_panic(undo_set->rb_ctx[i].session == kernel->sessions[SESSION_ID_ROLLBACK + i]);
cm_close_thread(&ogx->rollback_proc[i]);
}
knl_panic(undo_set->active_workers == 0);
}
status_t txn_dump_page(knl_session_t *session, page_head_t *page_head, cm_dump_t *dump)
{
txn_page_t *page = (txn_page_t *)page_head;
txn_t *txn = NULL;
page_id_t first;
page_id_t last;
uint32 count = (PAGE_SIZE(page->head) - sizeof(page_head_t) - sizeof(page_tail_t)) / sizeof(txn_t);
cm_dump(dump, "txn page information\n");
CM_DUMP_WRITE_FILE(dump);
for (uint32 slot = 0; slot < count; slot++) {
txn = &page->items[slot];
first = PAGID_U2N(txn->undo_pages.first);
last = PAGID_U2N(txn->undo_pages.last);
cm_dump(dump, "\titems[%u] ", slot);
cm_dump(dump, "\txnum: %-3u", txn->xnum);
cm_dump(dump, "\tstatus: %s", txn_status((xact_status_t)txn->status));
cm_dump(dump, "\tscn: %llu", txn->scn);
cm_dump(dump, "\tundo_pages: count %u first %u-%u last %u-%u\n", txn->undo_pages.count,
(uint32)first.file, (uint32)first.page, (uint32)last.file, (uint32)last.page);
CM_DUMP_WRITE_FILE(dump);
}
return OG_SUCCESS;
}
void tx_record_sql(knl_session_t *session)
{
text_t sql_text;
sql_text.str = (char *)cm_push(session->stack, RECORD_SQL_SIZE);
sql_text.len = RECORD_SQL_SIZE;
if (sql_text.str == NULL || g_knl_callback.get_sql_text(session->id, &sql_text) != OG_SUCCESS) {
cm_reset_error();
} else {
OG_LOG_RUN_ERR("sql detail: %s", T2S(&sql_text));
}
cm_pop(session->stack);
}
void tx_shutdown(knl_session_t *session)
{
for (uint8 id = 0; id < OG_MAX_INSTANCES; id++) {
undo_set_t *undo_set = UNDO_SET(session, id);
if (undo_set->assign_workers == 0) {
continue;
}
if (id == session->kernel->id) {
tx_rollback_close(session);
} else {
dtc_rollback_close(session, id);
}
}
}