/*
* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dtc_recovery.c
*
*
* IDENTIFICATION
* src/cluster/dtc_recovery.c
*
* -------------------------------------------------------------------------
*/
#include "knl_cluster_module.h"
#include "knl_recovery.h"
#include "dtc_database.h"
#include "dtc_drc.h"
#include "dtc_reform.h"
#include "cm_dbs_intf.h"
#include "cm_dbs_ulog.h"
#include "dirent.h"
#include "knl_space_log.h"
#include "knl_map.h"
#include "rcr_btree.h"
#include "knl_create_space.h"
#include "knl_buffer.h"
#include "knl_page.h"
#include "knl_undo.h"
#include "knl_punch_space.h"
#include "cm_io_record.h"
#include "dtc_backup.h"
/* File-level manager object of type dtc_rcy_analyze_paral_node_t (presumably parallel analysis state - confirm). */
dtc_rcy_analyze_paral_node_t g_analyze_paral_mgr;
/* File-level manager object of type dtc_rcy_replay_paral_node_t, explicitly zero-initialized. */
dtc_rcy_replay_paral_node_t g_replay_paral_mgr = { 0 };
/* Page stack whose depth is reset, incremented and decremented by the dtc_rcy_*_page_id helpers below. */
page_stack_t g_dtc_rcy_page_id_stack;
/*
 * Return the log batch at the current read position of one read buffer.
 * idx selects the entry in dtc_rcy->rcy_nodes, index selects that node's read buffer;
 * the result points into the buffer's aligned_buf at offset read_pos[index].
 */
log_batch_t *dtc_rcy_get_curr_batch(dtc_rcy_context_t *dtc_rcy, uint32 idx, uint8 index)
{
    return (log_batch_t *)(dtc_rcy->rcy_nodes[idx].read_buf[index].aligned_buf +
                           dtc_rcy->rcy_nodes[idx].read_pos[index]);
}
/* Add one to need_analysis_leave_page_cnt of the recovery context; does nothing when recover_flag is false. */
void dtc_rcy_inc_need_analysis_leave_page_cnt(bool32 recover_flag)
{
    if (!recover_flag) {
        return;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    ctx->need_analysis_leave_page_cnt++;
}
/* Subtract one from need_analysis_leave_page_cnt of the recovery context; does nothing when recover_flag is false. */
void dtc_rcy_dec_need_analysis_leave_page_cnt(bool32 recover_flag)
{
    if (!recover_flag) {
        return;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    ctx->need_analysis_leave_page_cnt--;
}
/* Reset need_analysis_leave_page_cnt of the recovery context to zero; does nothing when recover_flag is false. */
void dtc_rcy_reset_need_analysis_leave_page_cnt(bool32 recover_flag)
{
    if (!recover_flag) {
        return;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    ctx->need_analysis_leave_page_cnt = 0;
}
/*
 * Tell whether need_analysis_leave_page_cnt is currently positive.
 * Always returns OG_FALSE when recover_flag is false.
 */
bool8 dtc_rcy_is_need_analysis_leave_page(bool32 recover_flag)
{
    if (!recover_flag) {
        return OG_FALSE;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    return (ctx->need_analysis_leave_page_cnt > 0);
}
/*
 * Raise the is_end_restore_recover flag of the recovery context.
 * Returns OG_TRUE when the flag was set, OG_FALSE when recover_flag is false (nothing is changed).
 */
bool8 dtc_rcy_set_pitr_end_analysis(bool32 recover_flag)
{
    if (!recover_flag) {
        return OG_FALSE;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    ctx->is_end_restore_recover = OG_TRUE;
    return OG_TRUE;
}
/* Return the current value of the is_end_restore_recover flag of the recovery context. */
static bool8 dtc_rcy_check_is_end_restore_recovery(void)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;

    return ctx->is_end_restore_recover;
}
/*
 * Raise is_end_restore_recover once lsn has reached end_lsn_restore_recovery.
 * Returns OG_TRUE only when the flag was set by this call; OG_FALSE when recover_flag is false
 * or lsn is still below the end LSN.
 */
bool8 dtc_rcy_set_pitr_end_replay(bool32 recover_flag, uint64 lsn)
{
    if (!recover_flag) {
        return OG_FALSE;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    if (lsn < ctx->end_lsn_restore_recovery) {
        return OG_FALSE;
    }

    ctx->is_end_restore_recover = OG_TRUE;
    return OG_TRUE;
}
/*
 * Map a page id to a bucket index in [0, range).
 * The page number and file number are mixed with HASH_SEED and reduced modulo range.
 * range must be non-zero, since it is used as the divisor.
 */
static inline uint32 dtc_rcy_bucket_hash(page_id_t page_id, uint32 range)
{
return (HASH_SEED * page_id.page + page_id.file) * HASH_SEED % range;
}
/*
 * Search the singly linked item list of a bucket for page_id.
 * Returns the matching item, or NULL when the bucket holds no item for that page.
 * The function itself takes no lock.
 */
rcy_set_item_t *dtc_rcy_get_item(rcy_set_bucket_t *bucket, page_id_t page_id)
{
    for (rcy_set_item_t *cur = bucket->first; cur != NULL; cur = cur->next_item) {
        if (IS_SAME_PAGID(cur->page_id, page_id)) {
            return cur;
        }
    }
    return NULL;
}
/* Link item in as the new head of the bucket's item list and bump the bucket's item count. */
static inline void dtc_rcy_add_to_bucket(rcy_set_bucket_t *bucket, rcy_set_item_t *item)
{
    bucket->count++;
    item->next_item = bucket->first;
    bucket->first = item;
}
/*
 * Put every read buffer slot of every recovery node back into its initial state:
 * not ready, positions at zero, read_size invalid, not_finished set, and both ring indexes at zero.
 * The number of slots per node is taken from the rcy_node_read_buf_size kernel attribute.
 */
static inline void reset_read_buffer(void)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    uint32 read_buf_size = g_instance->kernel.attr.rcy_node_read_buf_size;

    /* unsigned indexes: the bounds are unsigned, so avoid mixed signed/unsigned comparisons */
    for (uint32 i = 0; i < dtc_rcy->node_count; ++i) {
        dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[i];
        for (uint32 j = 0; j < read_buf_size; ++j) {
            rcy_node->read_buf_ready[j] = OG_FALSE;
            rcy_node->write_pos[j] = 0;
            rcy_node->read_pos[j] = 0;
            rcy_node->read_size[j] = OG_INVALID_ID32;
            rcy_node->not_finished[j] = OG_TRUE;
        }
        rcy_node->read_buf_read_index = 0;
        rcy_node->read_buf_write_index = 0;
    }
}
/*
 * Stop the recovery read-log thread and release what it used.
 * Sets the thread's closed flag, waits until the thread reports a non-false result, then resets all
 * read buffers, releases the kernel session and closes the thread. Always returns OG_SUCCESS.
 * A warning is logged each time another OG_DTC_RCY_NODE_READ_BUF_TIMEOUT has been spent waiting.
 */
static status_t close_read_log_proc(thread_t *read_log_thread, knl_session_t *session)
{
    OG_LOG_RUN_INF("[DTC RCY] start close "
                   "rcy read log thread, closed = %d result = %d",
                   read_log_thread->closed, read_log_thread->result);
    read_log_thread->closed = OG_TRUE;

    /*
     * Count the waited time upwards. The previous unsigned countdown compared "<= 0" and would wrap
     * around (and stop warning) whenever the timeout was not an exact multiple of the sleep step.
     */
    uint32 waited = 0;
    while (read_log_thread->result == OG_FALSE) {
        cm_sleep(OG_DTC_RCY_NODE_READ_BUF_SLEEP_TIME);
        waited += OG_DTC_RCY_NODE_READ_BUF_SLEEP_TIME;
        if (waited >= OG_DTC_RCY_NODE_READ_BUF_TIMEOUT) {
            OG_LOG_RUN_WAR("[DTC RCY] dtc rcy close read log proc time out");
            waited = 0;
        }
    }

    reset_read_buffer();
    g_knl_callback.release_knl_session(session);
    cm_close_thread(read_log_thread);
    OG_LOG_RUN_INF("[DTC RCY] finish close read log proc");
    return OG_SUCCESS;
}
/*
 * Block until the read buffer at the node's current read index has a valid read_size.
 * index selects the entry in dtc_rcy->rcy_nodes. The function polls with
 * OG_DTC_RCY_NODE_READ_BUF_SLEEP_TIME sleeps, logs a warning each time another
 * OG_DTC_RCY_NODE_READ_BUF_TIMEOUT has elapsed, and never gives up. Always returns OG_SUCCESS.
 */
static status_t wait_for_read_buf_finish_read(uint32 index)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[index];
    timeval_t begin_time;
    uint64 sleep_time = 0;

    ELAPSED_BEGIN(begin_time);
    OG_LOG_DEBUG_INF("[DTC RCY] dtc fetch log start wait for read buf node_id = %u", rcy_node->node_id);

    /*
     * Count the waited time upwards. The previous unsigned countdown compared "<= 0" and would wrap
     * around (and stop warning) whenever the timeout was not an exact multiple of the sleep step.
     */
    uint32 waited = 0;
    while (SECUREC_UNLIKELY(rcy_node->read_size[rcy_node->read_buf_read_index] == OG_INVALID_ID32)) {
        cm_sleep(OG_DTC_RCY_NODE_READ_BUF_SLEEP_TIME);
        waited += OG_DTC_RCY_NODE_READ_BUF_SLEEP_TIME;
        if (waited >= OG_DTC_RCY_NODE_READ_BUF_TIMEOUT) {
            /* report the node id, as the debug logs in this function do, not the array index */
            OG_LOG_RUN_WAR("[DTC RCY] dtc rcy fetch log batch wait for read buf time out node_id =%u",
                           rcy_node->node_id);
            waited = 0;
        }
    }

    ELAPSED_END(begin_time, sleep_time);
    OG_LOG_DEBUG_INF("[DTC RCY] dtc fetch log finish wait for "
                     "read buf sleep time = %llu node_id = %u",
                     sleep_time, rcy_node->node_id);
    return OG_SUCCESS;
}
/*
 * Update the need_replay flag of the recovery-set item for page_id in the given bucket.
 *
 * The flag is only overwritten when the item's last_dirty_lsn is not newer than the LSN taken as
 * the current page LSN:
 *   - on a primary database the limit is OG_INVALID_ID64, so the flag is always overwritten;
 *   - otherwise the limit is the LSN of the locally buffered page, or 0 when that page cannot be used.
 *
 * Returns OG_SUCCESS when an item for page_id exists (whether or not the flag changed),
 * OG_ERROR when the bucket has no such item. The caller is expected to hold the bucket lock
 * (this function does not take it).
 */
status_t dtc_rcy_set_item_update_need_replay(rcy_set_bucket_t *bucket, page_id_t page_id, bool8 need_replay)
{
    uint64 curr_page_lsn = OG_INVALID_ID64;
    knl_session_t *session = g_instance->kernel.sessions[SESSION_ID_KERNEL];

    if (!DB_IS_PRIMARY(&session->kernel->db)) {
        buf_bucket_t *buf_bucket = buf_find_bucket(session, page_id);
        cm_spin_lock(&buf_bucket->lock, NULL);
        buf_ctrl_t *ctrl = buf_find_from_bucket(buf_bucket, page_id);
        if (!ctrl || ctrl->lock_mode == DRC_LOCK_NULL) {
            /*
             * The page is not buffered locally, or is buffered without a DRC lock.
             * Use LSN 0 so that an item with a non-zero last_dirty_lsn keeps its current flag,
             * as the page on disk may be not the latest one.
             */
            curr_page_lsn = 0;
        } else {
            curr_page_lsn = (ctrl->page)->lsn;
        }
        cm_spin_unlock(&buf_bucket->lock);
    }

    for (rcy_set_item_t *item = bucket->first; item != NULL; item = item->next_item) {
        if (!IS_SAME_PAGID(item->page_id, page_id)) {
            continue;
        }
        if (item->last_dirty_lsn <= curr_page_lsn) {
            item->need_replay = need_replay;
        }
        return OG_SUCCESS;
    }
    return OG_ERROR;
}
/*
 * Look up page_id in the recovery set of the global recovery context.
 * Hashes the page id to its bucket and returns the item found there, or NULL. No lock is taken.
 */
rcy_set_item_t *dtc_rcy_get_item_internal(page_id_t page_id)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    rcy_set_t *set = &ctx->rcy_set;
    uint32 slot = dtc_rcy_bucket_hash(page_id, set->bucket_num);

    return dtc_rcy_get_item(&set->buckets[slot], page_id);
}
/*
 * Clear the "last recovery" statistics of the recovery context and remember the instance list
 * that is about to be recovered. Returns early (via MEMS_RETVOID_IFERR) if a secure memory call fails.
 */
static void dtc_rcy_init_last_recovery_stat(instance_list_t *recover_list)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_stat_t *rcy_stat = &ctx->rcy_stat;

    /* sizes and counters */
    rcy_stat->last_rcy_log_size = 0;
    rcy_stat->last_rcy_set_num = 0;
    rcy_stat->last_rcy_logic_log_group_count = 0;
    rcy_stat->last_rcy_is_full_recovery = OG_FALSE;

    /* elapsed / wait times */
    rcy_stat->last_rcy_analyze_elapsed = 0;
    rcy_stat->last_rcy_set_revise_elapsed = 0;
    rcy_stat->last_rcy_replay_elapsed = 0;
    rcy_stat->last_rcy_elapsed = 0;
    rcy_stat->last_rcy_logic_log_elapsed = 0;
    rcy_stat->latc_rcy_logic_log_wait_time = 0;

    MEMS_RETVOID_IFERR(memset_sp(&rcy_stat->rcy_log_points, sizeof(rcy_node_stat_t) * OG_MAX_INSTANCES, 0,
                                 sizeof(rcy_node_stat_t) * OG_MAX_INSTANCES));
    MEMS_RETVOID_IFERR(memcpy_s(&rcy_stat->last_rcy_inst_list, sizeof(instance_list_t), recover_list,
                                sizeof(instance_list_t)));
}
/*
 * Allocate and zero one item pool: a pool header immediately followed by rcy_set->capacity items.
 * Returns the new pool (hwm 0, next NULL, items pointing just past the header), or NULL with
 * ERR_ALLOC_MEMORY thrown when malloc fails. The caller owns the returned memory.
 */
static rcy_set_item_pool_t *dtc_rcy_alloc_itempool(rcy_set_t *rcy_set)
{
    uint64 item_pool_size = sizeof(rcy_set_item_pool_t) + sizeof(rcy_set_item_t) * rcy_set->capacity;
    rcy_set_item_pool_t *pool = (rcy_set_item_pool_t *)malloc(item_pool_size);

    if (pool == NULL) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to alloc rcy set itempool");
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, item_pool_size, "dtc recovery set itempool");
        return NULL;
    }

    errno_t ret = memset_sp(pool, item_pool_size, 0, item_pool_size);
    knl_securec_check(ret);

    /* the item array starts right behind the pool header */
    pool->items = (rcy_set_item_t *)(pool + 1);
    pool->next = NULL;
    pool->hwm = 0;
    OG_LOG_RUN_INF("[DTC RCY] alloc rcy_set itempool successfully, recovery set capacity=%llu, itempool size=%llu",
                   rcy_set->capacity, item_pool_size);
    return pool;
}
/*
 * Make sure rcy_set->curr_item_pools no longer points at old_pool.
 * One caller allocates a new pool and appends it; every other caller waits until the current pool
 * pointer has changed. Returns OG_ERROR only when the allocating caller fails to allocate.
 */
static status_t dtc_rcy_try_alloc_itempool(rcy_set_t *rcy_set, rcy_set_item_pool_t *old_pool)
{
/* function-local counter shared by all callers of this function */
static atomic32_t count = 0;
/* presumably cm_atomic32_inc returns the incremented value, so 1 means "no other caller is inside" - confirm */
int32 times = cm_atomic32_inc(&count);
if (times == 1 && rcy_set->curr_item_pools == old_pool) {
/* this caller does the allocation */
rcy_set_item_pool_t *item_pool = dtc_rcy_alloc_itempool(rcy_set);
if (item_pool == NULL) {
cm_atomic32_dec(&count);
OG_LOG_RUN_ERR("[DTC RCY] failed to alloc itempool");
return OG_ERROR;
}
/* link the new pool behind the current one before publishing it as the current pool */
rcy_set->curr_item_pools->next = item_pool;
rcy_set->curr_item_pools = item_pool;
cm_atomic32_dec(&count);
} else {
cm_atomic32_dec(&count);
/* poll until the current pool differs from old_pool; exits immediately if it already does */
while (rcy_set->curr_item_pools == old_pool) {
cm_sleep(1);
}
}
return OG_SUCCESS;
}
/*
 * Reconcile the pcn carried by an enter-page redo with the pcn stored in the recovery-set item.
 * item is the existing recovery-set entry of page_id; pcn and lsn come from the redo being analyzed.
 * Depending on how pcn relates to item->pcn this updates item->pcn, counts an equal pcn, marks the
 * item for a leave-page check, or ends the restore analysis via dtc_rcy_set_pitr_end_analysis().
 */
static void dtc_rcy_handle_pcn_discon(knl_session_t *session, rcy_set_item_t *item, page_id_t page_id, uint32 pcn,
uint64 lsn)
{
/* pcn is 0 or exactly one ahead of the recorded pcn: just take it over */
if (pcn == 0 || pcn == (uint32)(item->pcn + 1)) {
item->pcn = pcn;
return;
}
/* same pcn as recorded: only counted in pcn_is_equal_num */
if (pcn == (uint32)(item->pcn)) {
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
dtc_rcy->pcn_is_equal_num++;
return;
}
/* pcn went backwards: flag the item so the matching leave-page redo gets checked */
if (pcn < item->pcn) {
OG_LOG_RUN_INF("[DTC RCY] analyze update page [%u-%u], first_dirty_lsn: %llu,"
"last_dirty_lsn: %llu, curr_dirty_lsn: %llu, pcn[%u-%u]",
page_id.file, page_id.page, item->first_dirty_lsn, item->last_dirty_lsn, lsn, pcn, item->pcn);
item->need_check_leave_changed = OG_TRUE;
dtc_rcy_inc_need_analysis_leave_page_cnt(session->kernel->db.recover_for_restore);
return;
}
/* pcn jumped ahead by more than one: the page itself has to be consulted */
datafile_t *datafile = DATAFILE_GET(session, page_id.file);
/* datafile unused or not online: the page is not read; end the restore analysis */
if (!datafile->ctrl->used || !DATAFILE_IS_ONLINE(datafile)) {
OG_LOG_RUN_ERR("[DTC RCY] analyze update page [%u-%u], first_dirty_lsn: %llu,"
"last_dirty_lsn: %llu, curr_dirty_lsn: %llu, pcn[%u-%u]",
page_id.file, page_id.page, item->first_dirty_lsn, item->last_dirty_lsn, lsn, pcn, item->pcn);
dtc_rcy_set_pitr_end_analysis(session->kernel->db.recover_for_restore);
return;
}
/* enter the page in shared mode and compare the recorded pcn with the page head's pcn */
buf_enter_page(session, page_id, LATCH_MODE_S, ENTER_PAGE_NORMAL);
if (item->pcn < ((page_head_t *)session->curr_page)->pcn) {
item->pcn = pcn;
} else {
OG_LOG_RUN_ERR("[DTC RCY] analyze update page [%u-%u], first_dirty_lsn: %llu,"
"last_dirty_lsn: %llu, curr_dirty_lsn: %llu, pcn[%u-%u]",
page_id.file, page_id.page, item->first_dirty_lsn, item->last_dirty_lsn, lsn, pcn, item->pcn);
dtc_rcy_set_pitr_end_analysis(session->kernel->db.recover_for_restore);
}
buf_leave_page(session, OG_FALSE);
}
/* Empty the global page id stack by resetting its depth; does nothing when recover_flag is false. */
void dtc_rcy_init_page_id_stack(bool32 recover_flag)
{
    if (!recover_flag) {
        return;
    }
    g_dtc_rcy_page_id_stack.depth = 0;
}
/*
 * Grow the global page id stack by one level; panics if the stack is already at
 * KNL_MAX_PAGE_STACK_DEPTH. Does nothing when recover_flag is false.
 * NOTE(review): page_id is not stored anywhere here, only the depth changes - confirm this is intended.
 */
void dtc_rcy_push_page_id(bool32 recover_flag, page_id_t page_id)
{
    if (!recover_flag) {
        return;
    }
    knl_panic(g_dtc_rcy_page_id_stack.depth < KNL_MAX_PAGE_STACK_DEPTH);
    g_dtc_rcy_page_id_stack.depth++;
}
/*
 * Shrink the global page id stack by one level; panics if the stack is empty.
 * Does nothing when recover_flag is false.
 * NOTE(review): *page_id is never written here, so callers get no page id back - confirm this is intended.
 */
void dtc_rcy_pop_page_id(bool32 recover_flag, page_id_t *page_id)
{
    if (!recover_flag) {
        return;
    }
    knl_panic(g_dtc_rcy_page_id_stack.depth > 0);
    g_dtc_rcy_page_id_stack.depth--;
}
/*
 * Check that the global page id stack is not empty (panics otherwise); the stack is left untouched.
 * Does nothing when recover_flag is false.
 * NOTE(review): *page_id is never written here - confirm callers do not rely on it being filled in.
 */
static void dtc_rcy_get_page_id(bool32 recover_flag, page_id_t *page_id)
{
    if (!recover_flag) {
        return;
    }
    knl_panic(g_dtc_rcy_page_id_stack.depth > 0);
}
/*
 * Mark a recovery node as done once its current read buffer is finished or holds zero bytes.
 * node_id indexes dtc_rcy->rcy_nodes. When the node is marked done, the read end point of the
 * current phase is logged; in the recovery phase it is logged only when its LSN differs from
 * latest_rcy_end_lsn, which is then updated.
 */
static void check_node_read_end(uint32 node_id)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *node = &ctx->rcy_nodes[node_id];

    /* still reading and the buffer is not empty: nothing to do */
    if (node->not_finished[node->read_buf_read_index] && node->read_size[node->read_buf_read_index] != 0) {
        return;
    }

    node->recover_done = OG_TRUE;
    if (ctx->phase == PHASE_ANALYSIS) {
        OG_LOG_RUN_INF("[DTC RCY] analysis read end point[asn(%u)-block_id(%u)-rst_id(%llu)-lfn(%llu)-lsn(%llu)]",
                       node->analysis_read_end_point.asn, node->analysis_read_end_point.block_id,
                       (uint64)node->analysis_read_end_point.rst_id, (uint64)node->analysis_read_end_point.lfn,
                       node->analysis_read_end_point.lsn);
    }
    if (ctx->phase == PHASE_RECOVERY && (node->latest_rcy_end_lsn != node->recovery_read_end_point.lsn)) {
        OG_LOG_RUN_INF("[DTC RCY] recovery read end point[asn(%u)-block_id(%u)-rst_id(%llu)-lfn(%llu)-lsn(%llu)]",
                       node->recovery_read_end_point.asn, node->recovery_read_end_point.block_id,
                       (uint64)node->recovery_read_end_point.rst_id, (uint64)node->recovery_read_end_point.lfn,
                       node->recovery_read_end_point.lsn);
        node->latest_rcy_end_lsn = node->recovery_read_end_point.lsn;
    }
}
/*
 * Record one dirtied page in the recovery set.
 *
 * session  - kernel session used for the analysis
 * page_id  - page named by the enter-page redo
 * lsn      - LSN of the log group containing the redo
 * pcn      - pcn carried by the redo
 *
 * If the page is already in the set, its last_dirty_lsn is advanced and, for restore recovery,
 * its pcn is reconciled. Otherwise a new item is reserved from the current item pool (allocating a
 * further pool when the current one is full), initialized, tagged with the page's master id and
 * linked into its hash bucket. The bucket lock is held for the whole operation and released on
 * every return path.
 *
 * Returns OG_SUCCESS, or OG_ERROR when a new item pool cannot be allocated or the master id of
 * the page cannot be determined.
 */
static status_t dtc_rcy_record_page(knl_session_t *session, page_id_t page_id, uint64 lsn, uint32 pcn)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    rcy_set_t *rcy_set = &dtc_rcy->rcy_set;
    rcy_set_item_pool_t *item_pool = NULL;
    uint32 hash_id = dtc_rcy_bucket_hash(page_id, rcy_set->bucket_num);
    rcy_set_bucket_t *bucket = &rcy_set->buckets[hash_id];
    int64 idx;

    dtc_rcy_push_page_id(session->kernel->db.recover_for_restore, page_id);
    cm_spin_lock(&bucket->lock, NULL);
    rcy_set_item_t *item = dtc_rcy_get_item(bucket, page_id);
    if (item != NULL) {
        OG_LOG_DEBUG_INF("[DTC RCY] analyze update page [%u-%u], hash id=%u, first_dirty_lsn=%llu, last_dirty_lsn=%llu"
                         ", curr_dirty_lsn=%llu",
                         page_id.file, page_id.page, hash_id, item->first_dirty_lsn, item->last_dirty_lsn, lsn);
        if (lsn > item->last_dirty_lsn) {
            item->last_dirty_lsn = lsn;
        }
        if (session->kernel->db.recover_for_restore) {
            dtc_rcy_handle_pcn_discon(session, item, page_id, pcn, lsn);
        }
        cm_spin_unlock(&bucket->lock);
        return OG_SUCCESS;
    }

    /*
     * Reserve a slot in the current item pool. Every attempt re-reads the current pool and its
     * high-water mark and checks the capacity BEFORE the compare-and-swap, so a slot beyond the
     * pool can never be reserved, including right after switching to a new pool.
     */
    status_t ret = OG_SUCCESS;
    for (;;) {
        item_pool = rcy_set->curr_item_pools;
        idx = item_pool->hwm;
        if (idx >= rcy_set->capacity) {
            SYNC_POINT_GLOBAL_START(OGRAC_RECOVERY_RCY_SET_ALLOC_ITEMPOOL_FAIL, &ret, OG_ERROR);
            ret = dtc_rcy_try_alloc_itempool(rcy_set, item_pool);
            SYNC_POINT_GLOBAL_END;
            if (ret != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[DTC RCY] failed to alloc itmepool for recovery set");
                /* do not leave the bucket locked on the error path */
                cm_spin_unlock(&bucket->lock);
                return OG_ERROR;
            }
            continue;
        }
        if (cm_atomic_cas(&item_pool->hwm, idx, idx + 1)) {
            break;
        }
    }

    item = &item_pool->items[idx];
    item->page_id = page_id;
    item->first_dirty_lsn = lsn;
    item->last_dirty_lsn = lsn;
    item->pcn = pcn;
    item->need_replay = OG_TRUE;
    OG_LOG_DEBUG_INF("[DTC RCY] analyze record page [%u-%u], hash id=%u, first_dirty_lsn=%llu, last_dirty_lsn=%llu"
                     ", curr_dirty_lsn=%llu, pcn=%u, need replay=%u",
                     page_id.file, page_id.page, hash_id, item->first_dirty_lsn, item->last_dirty_lsn, lsn, item->pcn,
                     item->need_replay);
    if (drc_get_page_master_id(page_id, &item->master_id) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to get master id of page [%u-%u]", page_id.file, page_id.page);
        cm_spin_unlock(&bucket->lock);
        return OG_ERROR;
    }
    dtc_rcy_add_to_bucket(bucket, item);
    cm_spin_unlock(&bucket->lock);
    return OG_SUCCESS;
}
/*
 * Read a page id out of a redo payload.
 * ptr is reinterpreted as a pointer to `type`, and its field `member` is assigned to page_id.
 * Implemented as a GNU statement expression, so it requires a GCC-compatible compiler.
 */
#define DTC_GET_PAGE_ID_REDO(ptr, type, member, page_id) \
({ \
type *__redo = (type *)(ptr); \
page_id = __redo->member; \
})
/*
 * Work out which page a redo entry refers to, based on the redo type.
 * For the format/init redo types the page id is read from the redo payload into *page_id_value
 * and OG_TRUE is returned. Leave-page redo pops the page id stack and returns whether a
 * leave-page check is pending. Any other redo type returns OG_FALSE without touching *page_id_value.
 */
bool8 dtc_get_page_id_by_redo(log_entry_t *log, page_id_t *page_id_value)
{
switch (log->type) {
/* heap format redo: payload is read as rd_heap_format_page_t */
case RD_HEAP_FORMAT_PAGE:
case RD_HEAP_FORMAT_MAP:
case RD_HEAP_FORMAT_ENTRY:
DTC_GET_PAGE_ID_REDO(log->data, rd_heap_format_page_t, page_id, *page_id_value);
break;
case RD_BTREE_FORMAT_PAGE:
DTC_GET_PAGE_ID_REDO(log->data, rd_btree_page_init_t, page_id, *page_id_value);
break;
case RD_BTREE_INIT_ENTRY:
DTC_GET_PAGE_ID_REDO(log->data, rd_btree_init_entry_t, page_id, *page_id_value);
break;
/* the page id is taken from the `entry` field of the payload */
case RD_SPC_UPDATE_HEAD:
DTC_GET_PAGE_ID_REDO(log->data, rd_update_head_t, entry, *page_id_value);
break;
/* payload starts with the page id itself */
case RD_SPC_INIT_MAP_HEAD:
case RD_SPC_INIT_MAP_PAGE:
*page_id_value = AS_PAGID(log->data);
break;
/* payload is read as a page head; the id comes from its `id` field */
case RD_UNDO_CREATE_SEGMENT:
case RD_UNDO_FORMAT_TXN:
case RD_LOB_PAGE_INIT:
case RD_LOB_PAGE_EXT_INIT: {
page_head_t *redo = (page_head_t *)log->data;
*page_id_value = AS_PAGID(redo->id);
break;
}
case RD_UNDO_FORMAT_PAGE: {
rd_undo_fmt_page_t *undo_fmt = (rd_undo_fmt_page_t *)log->data;
*page_id_value = MAKE_PAGID(undo_fmt->page_id.file, undo_fmt->page_id.page);
break;
}
case RD_PUNCH_FORMAT_PAGE:
DTC_GET_PAGE_ID_REDO(log->data, rd_punch_page_t, page_id, *page_id_value);
break;
/* leave-page redo: pop the page id stack; the result says whether a leave-page check is pending */
case RD_LEAVE_PAGE:
case RD_LEAVE_TXN_PAGE: {
dtc_rcy_pop_page_id(OG_TRUE, page_id_value);
return dtc_rcy_is_need_analysis_leave_page(OG_TRUE);
}
/* free-page redo: consult the page id stack without popping it */
case RD_SPC_FREE_PAGE: {
dtc_rcy_get_page_id(OG_TRUE, page_id_value);
break;
}
default:
return OG_FALSE;
}
return OG_TRUE;
}
/*
 * Resolve a pending leave-page check on a recovery-set item.
 * When the item was flagged with need_check_leave_changed, the pending counter is decremented and,
 * if the leave-page redo reports `changed`, the restore analysis is ended. The flag is cleared
 * afterwards in any case. Does nothing when recover_flag is false.
 */
void dtc_rcy_try_set_pitr_end_analysis(bool32 recover_flag, page_id_t *page_id, rcy_set_item_t *item, bool32 changed)
{
    if (!recover_flag) {
        return;
    }

    if (item->need_check_leave_changed) {
        dtc_rcy_dec_need_analysis_leave_page_cnt(recover_flag);
        if (changed) {
            OG_LOG_RUN_ERR("[DTC RCY] analyze update page [%u-%u], first_dirty_lsn: %llu, "
                           "last_dirty_lsn: %llu, pcn %u",
                           page_id->file, page_id->page, item->first_dirty_lsn, item->last_dirty_lsn, item->pcn);
            (void)dtc_rcy_set_pitr_end_analysis(recover_flag);
        }
    }
    item->need_check_leave_changed = OG_FALSE;
}
/*
 * Handle a non-enter-page redo during restore analysis.
 * If the redo identifies a page that is already in the recovery set, a leave-page redo resolves the
 * item's pending leave check, and any other handled redo type resets the item's pcn to 0.
 * Redo types without a page id, and pages not in the set, are ignored. Always returns OG_SUCCESS.
 * NOTE(review): for leave-page and free-page redo the page id is expected to come from
 * dtc_rcy_pop_page_id()/dtc_rcy_get_page_id(); confirm those helpers fill it in before it is hashed here.
 */
static status_t dtc_rcy_reset_page_pcn(knl_session_t *session, log_entry_t *log)
{
    page_id_t page_id;
    if (!dtc_get_page_id_by_redo(log, &page_id)) {
        return OG_SUCCESS;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    rcy_set_t *set = &ctx->rcy_set;
    uint32 hash_id = dtc_rcy_bucket_hash(page_id, set->bucket_num);
    rcy_set_bucket_t *bucket = &set->buckets[hash_id];

    cm_spin_lock(&bucket->lock, NULL);
    rcy_set_item_t *item = dtc_rcy_get_item(bucket, page_id);
    if (item == NULL) {
        cm_spin_unlock(&bucket->lock);
        OG_LOG_DEBUG_INF("[DTC RCY] analyze record page [%u-%u], now is format, but no page enter", page_id.file,
                         page_id.page);
        return OG_SUCCESS;
    }

    OG_LOG_DEBUG_INF("[DTC RCY] analyze update page [%u-%u], hash: %u, first_dirty_lsn: %llu,"
                     "last_dirty_lsn: %llu",
                     page_id.file, page_id.page, hash_id, item->first_dirty_lsn, item->last_dirty_lsn);
    if (RD_TYPE_IS_LEAVE_PAGE(log->type)) {
        /* the leave-page payload starts with the `changed` flag */
        dtc_rcy_try_set_pitr_end_analysis(session->kernel->db.recover_for_restore, &page_id, item,
                                          *(bool32 *)log->data);
    } else {
        item->pcn = 0;
    }
    cm_spin_unlock(&bucket->lock);
    return OG_SUCCESS;
}
/*
 * Remember space_id in the recovery set's list of touched spaces, unless it is already listed.
 * NOTE(review): no upper bound on space_set_size is checked here - confirm the array is sized
 * for the maximum number of spaces.
 */
static void dtc_record_space_id(uint32 space_id)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    rcy_set_t *set = &ctx->rcy_set;
    uint32 *known_ids = set->space_id_set;

    uint32 pos = 0;
    while (pos < set->space_set_size) {
        if (known_ids[pos] == space_id) {
            return;
        }
        pos++;
    }

    known_ids[set->space_set_size++] = space_id;
    OG_LOG_RUN_INF("[DTC RCY] add new space_id %u, space_set_size %u", space_id, set->space_set_size);
}
/*
 * Analyze a single redo entry of a log group.
 *
 * Enter-page redo records the page in the recovery set. In restore recovery this happens without
 * further checks; otherwise the datafile and space must be usable (unless the group creates a
 * datafile) and the space id is recorded as well. Other redo types are only inspected in restore
 * recovery, where they go through dtc_rcy_reset_page_pcn().
 *
 * Returns OG_SUCCESS, or OG_ERROR when the page cannot be recorded. An unusable datafile or
 * space triggers knl_panic.
 */
static status_t dtc_rcy_analyze_entry(knl_session_t *session, log_entry_t *log, uint64 lsn, bool32 is_create_df)
{
    knl_panic(log->type >= RD_ENTER_PAGE);

    if (log->type != RD_ENTER_PAGE && log->type != RD_ENTER_TXN_PAGE) {
        if (session->kernel->db.recover_for_restore) {
            return dtc_rcy_reset_page_pcn(session, log);
        }
        return OG_SUCCESS;
    }

    rd_enter_page_t *enter = (rd_enter_page_t *)log->data;
    page_id_t page_id = MAKE_PAGID(enter->file, enter->page);
    if (session->kernel->db.recover_for_restore) {
        return dtc_rcy_record_page(session, page_id, lsn, enter->pcn);
    }

    datafile_t *df = DATAFILE_GET(session, enter->file);
    if (!is_create_df && (!DATAFILE_IS_ONLINE(df) || !df->ctrl->used || df->file_no == OG_INVALID_ID32)) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to verify df");
        knl_panic(0);
        return OG_ERROR;
    }

    space_t *space = SPACE_GET(session, df->space_id);
    if (!is_create_df && (!SPACE_IS_ONLINE(space) || !space->ctrl->used)) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to verify space cfg");
        knl_panic(0);
        return OG_ERROR;
    }

    dtc_record_space_id(df->space_id);
    status_t status = dtc_rcy_record_page(session, page_id, lsn, enter->pcn);
    if (status != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to record page [%u-%u] in recovery_set", page_id.file, page_id.page);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Analyze every redo entry of one log group.
 * The entries are processed with the kernel session (SESSION_ID_KERNEL), which inherits the caller's
 * dtc_session_type. The page id stack and the pending leave-page counter are reset first (restore
 * recovery only). Once a RD_SPC_CREATE_DATAFILE entry has been seen, the rest of the group is
 * analyzed with is_create_df set. The walk stops early when the end of restore recovery is flagged.
 * Returns OG_ERROR as soon as one entry fails to analyze, OG_SUCCESS otherwise.
 */
status_t dtc_rcy_analyze_group(knl_session_t *session, log_group_t *group)
{
    knl_session_t *kernel_sess = session->kernel->sessions[SESSION_ID_KERNEL];
    bool32 is_create_df = OG_FALSE;
    log_entry_t *entry = NULL;

    kernel_sess->dtc_session_type = session->dtc_session_type;
    dtc_rcy_init_page_id_stack(session->kernel->db.recover_for_restore);
    dtc_rcy_reset_need_analysis_leave_page_cnt(session->kernel->db.recover_for_restore);

    for (uint32 offset = sizeof(log_group_t); offset < LOG_GROUP_ACTUAL_SIZE(group); offset += entry->size) {
        entry = (log_entry_t *)((char *)group + offset);
        knl_panic(entry->size > 0);
        if (entry->type == RD_SPC_CREATE_DATAFILE) {
            is_create_df = OG_TRUE;
        }
        if (dtc_rcy_analyze_entry(kernel_sess, entry, group->lsn, is_create_df) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to analyze redo entry");
            return OG_ERROR;
        }
        if (dtc_rcy_check_is_end_restore_recovery()) {
            break;
        }
    }
    return OG_SUCCESS;
}
/* Increment rcy_set_ref_num of the recovery context while holding the context lock. */
static inline void dtc_rcy_inc_rcy_set_ref_num(void)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;

    cm_spin_lock(&ctx->lock, NULL);
    ctx->rcy_set_ref_num += 1;
    cm_spin_unlock(&ctx->lock);
}
/* Decrement rcy_set_ref_num of the recovery context while holding the context lock. */
static inline void dtc_rcy_dec_rcy_set_ref_num(void)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;

    cm_spin_lock(&ctx->lock, NULL);
    ctx->rcy_set_ref_num -= 1;
    cm_spin_unlock(&ctx->lock);
}
/* Increment the msg_sent counter of the recovery context while holding the context lock. */
static inline void dtc_rcy_inc_msg_sent(void)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;

    cm_spin_lock(&ctx->lock, NULL);
    ctx->msg_sent += 1;
    cm_spin_unlock(&ctx->lock);
}
/* Increment the msg_recv counter of the recovery context while holding the context lock. */
static inline void dtc_rcy_inc_msg_recv(void)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;

    cm_spin_lock(&ctx->lock, NULL);
    ctx->msg_recv += 1;
    cm_spin_unlock(&ctx->lock);
}
/*
 * Check the progress of the recovery-set exchange with the other nodes.
 *
 * If more than msg_sent * DTC_RCY_RECV_RCY_SET_ACK_TIMEOUT has passed since rcy_set_send_time,
 * the recovery is marked failed and OG_ERROR is returned. Otherwise, once as many messages have
 * been received as were sent, the phase advances to PHASE_HANDLE_RCYSET_DONE.
 * The context lock is held during the check and released on every return path.
 */
static status_t dtc_rcy_check_rcyset_msg(knl_session_t *session)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    status_t status = OG_SUCCESS;

    cm_spin_lock(&dtc_rcy->lock, NULL);
    date_t time_now = KNL_NOW(session);
    if (time_now - dtc_rcy->rcy_set_send_time >= dtc_rcy->msg_sent * DTC_RCY_RECV_RCY_SET_ACK_TIMEOUT) {
        OG_LOG_RUN_WAR("[DTC RCY] wait nodes collects page info in rcy_set timeout, %u msg_sent, %u msg_recv, "
                       "time spend=%lld",
                       dtc_rcy->msg_sent, dtc_rcy->msg_recv, time_now - dtc_rcy->rcy_set_send_time);
        dtc_rcy->failed = OG_TRUE;
        /* fall through to the unlock below: the lock must not stay held on the timeout path */
        status = OG_ERROR;
    } else if (dtc_rcy->msg_recv == dtc_rcy->msg_sent) {
        dtc_rcy->phase = PHASE_HANDLE_RCYSET_DONE;
    }
    cm_spin_unlock(&dtc_rcy->lock);
    return status;
}
/*
 * Send a batch of page ids to another instance.
 *
 * pages/count - page ids to send; with count == 0 only the message header is sent
 * finished    - copied into the message's finished field
 * node_id     - destination instance
 * cmd         - message command; MES_CMD_SEND_RCY_SET additionally bumps the msg_sent counter
 *
 * The message carries the current reform trigger version. Returns the status of the send call;
 * msg_sent is incremented regardless of that status.
 */
status_t dtc_send_page_to_node(knl_session_t *session, page_id_t *pages, uint32 count, bool32 finished, uint8 node_id,
                               uint8 cmd)
{
    drc_remaster_mngr_t *remaster_mngr = &g_drc_res_ctx.part_mngr.remaster_mngr;
    dtc_rcy_set_msg_t msg;

    mes_init_send_head(&msg.head, cmd, sizeof(dtc_rcy_set_msg_t), OG_INVALID_ID32, session->kernel->id, node_id,
                       session->id, OG_INVALID_ID16);
    msg.count = count;
    msg.finished = finished;
    msg.buffer_len = msg.count * sizeof(page_id_t);
    /* the header size covers the fixed message plus the page id payload */
    msg.head.size = (uint16)(sizeof(dtc_rcy_set_msg_t) + msg.count * sizeof(page_id_t));
    msg.reform_trigger_version = remaster_mngr->reform_info.trigger_version;

    status_t status = (count > 0) ? mes_send_data3(&msg.head, sizeof(dtc_rcy_set_msg_t), pages)
                                  : mes_send_data(&msg.head);
    if (cmd == MES_CMD_SEND_RCY_SET) {
        dtc_rcy_inc_msg_sent();
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    OG_LOG_RUN_INF("[DTC RCY] send num=%u pages size=%u to instance=%u with status=%d, rcy_set ref num=%u", count,
                   msg.head.size, node_id, status, ctx->rcy_set_ref_num);
    return status;
}
/*
 * Allocate a zero-filled buffer of `size` bytes for page ids.
 * Returns NULL (with ERR_ALLOC_MEMORY thrown and an error logged) when size is 0 or malloc fails.
 * The caller owns the returned buffer.
 */
static page_id_t *dtc_rcy_alloc_page_space(uint32 size)
{
    /* a zero-sized request is reported exactly like an allocation failure; malloc is not called for it */
    page_id_t *buf = (size == 0) ? NULL : (page_id_t *)malloc(size);
    if (buf == NULL) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to alloc page space, size=%u", size);
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, size, "dtc recovery page space");
        return NULL;
    }

    errno_t ret = memset_sp(buf, size, 0, size);
    knl_securec_check(ret);
    return buf;
}
/*
 * Distribute the items of one item pool to the master instances of their pages.
 * Page ids are collected in a per-instance buffer inside rcy_set (allocated on first use) and a
 * message is sent each time a buffer reaches DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM entries.
 * Partially filled buffers are left in rcy_set->pages / rcy_set->page_count for the caller.
 * Returns OG_ERROR when a buffer cannot be allocated or a send fails.
 */
static status_t dtc_send_rcy_set_by_pool(knl_session_t *session, rcy_set_item_pool_t *pool, rcy_set_t *rcy_set)
{
/* byte size of one per-instance page id buffer */
uint32 size = DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM * sizeof(page_id_t);
rcy_set_item_t *item = NULL;
page_id_t *pages = NULL;
uint32 *page_count = NULL;
uint8 node_id;
/* only the first hwm items of the pool are walked */
for (uint32 i = 0; i < pool->hwm; i++) {
item = &pool->items[i];
node_id = item->master_id;
knl_panic(node_id < OG_MAX_INSTANCES);
pages = rcy_set->pages[node_id];
page_count = &rcy_set->page_count[node_id];
/* first page for this instance: allocate its buffer and start counting from zero */
if (pages == NULL) {
pages = dtc_rcy_alloc_page_space(size);
if (pages == NULL) {
OG_LOG_RUN_ERR("[DTC RCY] failed to malloc %u bytes for sending rcyset to instance=%u", size, node_id);
return OG_ERROR;
}
*page_count = 0;
rcy_set->pages[node_id] = pages;
}
pages[*page_count] = item->page_id;
(*page_count)++;
/* buffer full: flush it with finished = OG_FALSE and reuse the buffer */
if (*page_count >= DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM) {
if (dtc_send_page_to_node(session, pages, *page_count, OG_FALSE, node_id, MES_CMD_SEND_RCY_SET) !=
OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY] failed to send num=%u pages from rcy set to node=%u, max_page_count=%lu",
*page_count, node_id, DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM);
return OG_ERROR;
}
*page_count = 0;
}
}
return OG_SUCCESS;
}
/*
 * Send the whole recovery set to the master instances of its pages.
 *
 * The phase is switched to PHASE_HANDLE_RCYSET, every item pool is sent via
 * dtc_send_rcy_set_by_pool(), and the partially filled per-instance buffers are then flushed with
 * finished = OG_TRUE. rcy_set_ref_num is raised around each piece of work on the set.
 * If no message was sent at all the phase moves straight to PHASE_HANDLE_RCYSET_DONE.
 * The send time is recorded at the end. Returns OG_ERROR on the first failed send.
 */
static status_t dtc_send_rcy_set(knl_session_t *session)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    rcy_set_item_pool_t *pool = dtc_rcy->rcy_set.item_pools;
    status_t ret = OG_SUCCESS;

    dtc_rcy->phase = PHASE_HANDLE_RCYSET;
    OG_LOG_RUN_INF("[DTC RCY] start send rcy set to each master, dtc_rcy->phase=%u", dtc_rcy->phase);
    while (pool != NULL) {
        dtc_rcy_inc_rcy_set_ref_num();
        SYNC_POINT_GLOBAL_START(OGRAC_RECOVERY_SEND_RCY_SET_FAIL, &ret, OG_ERROR);
        ret = dtc_send_rcy_set_by_pool(session, pool, &dtc_rcy->rcy_set);
        SYNC_POINT_GLOBAL_END;
        if (ret != OG_SUCCESS) {
            dtc_rcy_dec_rcy_set_ref_num();
            OG_LOG_RUN_ERR("[DTC RCY] failed to send rcy set by pool, pool capacity=%llu, dtc_rcy->phase=%u, "
                           "rcy_set ref num=%u",
                           pool->hwm, dtc_rcy->phase, dtc_rcy->rcy_set_ref_num);
            return OG_ERROR;
        }
        dtc_rcy_dec_rcy_set_ref_num();
        OG_LOG_RUN_INF("[DTC RCY] send rcy set by pool, dtc_rcy->phase=%u, rcy_set ref num=%u", dtc_rcy->phase,
                       dtc_rcy->rcy_set_ref_num);
        pool = pool->next;
    }

    /* flush what is left in the per-instance buffers */
    for (uint32 i = 0; i < OG_MAX_INSTANCES; i++) {
        dtc_rcy_inc_rcy_set_ref_num();
        page_id_t *pages = dtc_rcy->rcy_set.pages[i];
        uint32 *page_count = &dtc_rcy->rcy_set.page_count[i];
        if (pages == NULL || *page_count == 0) {
            dtc_rcy_dec_rcy_set_ref_num();
            continue;
        }
        if (dtc_send_page_to_node(session, pages, *page_count, OG_TRUE, i, MES_CMD_SEND_RCY_SET) != OG_SUCCESS) {
            dtc_rcy_dec_rcy_set_ref_num();
            OG_LOG_RUN_ERR("[DTC RCY] failed to send rcy set to node=%u, page_count=%u, rcy_set ref num=%u", i,
                           *page_count, dtc_rcy->rcy_set_ref_num);
            return OG_ERROR;
        }
        /* remember how many pages went out before the counter is reset, so the log shows the real number */
        uint32 sent_count = *page_count;
        *page_count = 0;
        dtc_rcy_dec_rcy_set_ref_num();
        OG_LOG_RUN_INF("[DTC RCY] send rcy set to node=%u, page_count=%u, rcy_set ref num=%u", i, sent_count,
                       dtc_rcy->rcy_set_ref_num);
    }

    if (dtc_rcy->msg_sent == 0) {
        dtc_rcy->phase = PHASE_HANDLE_RCYSET_DONE;
    }
    dtc_rcy->rcy_set_send_time = KNL_NOW(session);
    OG_LOG_RUN_INF("[DTC RCY] send %u rcy set messages, dtc_rcy->phase=%u, send time=%lld", dtc_rcy->msg_sent,
                   dtc_rcy->phase, dtc_rcy->rcy_set_send_time);
    return OG_SUCCESS;
}
/*
 * Validate the size of a recovery-set error ack.
 * The message must be at least as large as dtc_rcy_set_msg_t, and its total size must equal the
 * fixed part plus the buffer_len announced in the message. Returns OG_SUCCESS or OG_ERROR.
 */
static status_t dtc_check_rcy_set_err_ack_msg(mes_message_t *msg)
{
    if (msg->head->size < sizeof(dtc_rcy_set_msg_t)) {
        return OG_ERROR;
    }

    dtc_rcy_set_msg_t *ack = (dtc_rcy_set_msg_t *)msg->buffer;
    return (sizeof(dtc_rcy_set_msg_t) + ack->buffer_len == msg->head->size) ? OG_SUCCESS : OG_ERROR;
}
/*
 * Message handler for a recovery-set error ack.
 * Malformed messages, messages arriving outside the REFORM_FROZEN..REFORM_RECOVER_DONE status
 * range, and messages with an older reform trigger version are logged and dropped. Otherwise the
 * ack is counted as received, and the recovery is marked failed when the ack is not `finished`.
 * The message buffer is released on every path.
 */
void dtc_process_rcy_set_err_ack(void *sess, mes_message_t *msg)
{
    if (dtc_check_rcy_set_err_ack_msg(msg) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("msg is invalid, msg size %u.", msg->head->size);
        mes_release_message_buf(msg->buffer);
        return;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_set_msg_t *ack = (dtc_rcy_set_msg_t *)msg->buffer;
    bool32 finished = ack->finished;
    drc_remaster_mngr_t *remaster_mngr = &g_drc_res_ctx.part_mngr.remaster_mngr;

    bool32 in_reform = (g_rc_ctx->status <= REFORM_RECOVER_DONE && g_rc_ctx->status >= REFORM_FROZEN);
    if (!in_reform || ack->reform_trigger_version < remaster_mngr->reform_info.trigger_version) {
        OG_LOG_RUN_ERR("[DTC RCY] process rcy set err ack from master=%u, finished=%u, reform status(%u), msg reform "
                       "trigger version(%llu), local reform trigger version(%llu)",
                       ack->head.src_inst, finished, g_rc_ctx->status, ack->reform_trigger_version,
                       remaster_mngr->reform_info.trigger_version);
        mes_release_message_buf(msg->buffer);
        return;
    }

    OG_LOG_RUN_INF("[DTC RCY] process rcy set err ack from master=%u, finished=%u", ack->head.src_inst, finished);
    if (!finished) {
        ctx->failed = OG_TRUE;
    }
    dtc_rcy_inc_msg_recv();
    mes_release_message_buf(msg->buffer);
}
/*
 * Try to clear the need_replay flag for a batch of pages in the recovery set.
 * Each page is looked up in its bucket under the bucket lock and updated through
 * dtc_rcy_set_item_update_need_replay(). Processing stops at the first page that fails, and that
 * status is returned; OG_SUCCESS is returned when all pages (or none, for count == 0) were handled.
 */
status_t dtc_rcy_set_update_no_need_replay_batch(rcy_set_t *rcy_set, page_id_t *no_rcy_pages, uint32 count)
{
    status_t ret = OG_SUCCESS;

    for (uint32 i = 0; i < count; i++) {
        page_id_t *page_id = &no_rcy_pages[i];
        bool8 need_replay = OG_FALSE;
        uint32 hash_id = dtc_rcy_bucket_hash(*page_id, rcy_set->bucket_num);
        rcy_set_bucket_t *bucket = &rcy_set->buckets[hash_id];

        cm_spin_lock(&bucket->lock, NULL);
        ret = dtc_rcy_set_item_update_need_replay(bucket, *page_id, need_replay);
        OG_LOG_RUN_RET_INFO(ret, "[DTC RCY][%u-%u] update need replay(%u) in rcy set", page_id->file, page_id->page,
                            need_replay);
        cm_spin_unlock(&bucket->lock);
        if (ret != OG_SUCCESS) {
            return ret;
        }
    }
    return ret;
}
/*
 * Validate the size of a recovery-set ack.
 * The message must be at least as large as dtc_rcy_set_msg_t, and its total size must equal the
 * fixed part plus the buffer_len announced in the message. Returns OG_SUCCESS or OG_ERROR.
 */
static status_t dtc_check_rcy_set_ack_msg(mes_message_t *msg)
{
    if (msg->head->size < sizeof(dtc_rcy_set_msg_t)) {
        return OG_ERROR;
    }

    dtc_rcy_set_msg_t *ack = (dtc_rcy_set_msg_t *)msg->buffer;
    return ((sizeof(dtc_rcy_set_msg_t) + ack->buffer_len) == msg->head->size) ? OG_SUCCESS : OG_ERROR;
}
/*
 * Message handler for a recovery-set ack carrying the pages that need no replay.
 * After validating the message, the listed pages are passed to
 * dtc_rcy_set_update_no_need_replay_batch() and the ack is counted as received.
 * The message buffer is released on every path.
 */
void dtc_process_rcy_set_ack(void *sess, mes_message_t *msg)
{
if (dtc_check_rcy_set_ack_msg(msg) != OG_SUCCESS) {
OG_LOG_RUN_ERR("msg is invalid, msg size %u.", msg->head->size);
mes_release_message_buf(msg->buffer);
return;
}
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
drc_remaster_mngr_t *remaster_mngr = &g_drc_res_ctx.part_mngr.remaster_mngr;
dtc_rcy_set_msg_t *ack = (dtc_rcy_set_msg_t *)msg->buffer;
uint32 count = ack->count;
uint32 buffer_len = ack->buffer_len;
/* drop the ack if its sizes are inconsistent, the reform status is out of range, or the trigger version differs */
if (buffer_len != count * sizeof(page_id_t) || g_rc_ctx->status >= REFORM_RECOVER_DONE ||
g_rc_ctx->status < REFORM_FROZEN || ack->reform_trigger_version != remaster_mngr->reform_info.trigger_version ||
count > DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM || ack->head.size != sizeof(dtc_rcy_set_msg_t) + buffer_len) {
OG_LOG_RUN_ERR("[DTC RCY] receive page count=%u, max_page_count=%lu, no_rcy_pages buffer len=%u, reform "
"status(%u), msg reform trigger version(%llu), local reform trigger version(%llu), msg size(%u)",
count, DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM, buffer_len, g_rc_ctx->status,
ack->reform_trigger_version, remaster_mngr->reform_info.trigger_version, ack->head.size);
mes_release_message_buf(msg->buffer);
return;
}
/* the page id array follows the fixed message part */
page_id_t *no_rcy_pages = (page_id_t *)(msg->buffer + sizeof(dtc_rcy_set_msg_t));
bool32 finished = ack->finished;
OG_LOG_RUN_INF("[DTC RCY] process rcy set with edp from master=%u, no_rcy page count=%u, finished=%u",
ack->head.src_inst, count, finished);
/* the sender did not finish: mark the recovery failed, but still count the ack */
if (!finished) {
dtc_rcy->failed = OG_TRUE;
dtc_rcy_inc_msg_recv();
mes_release_message_buf(msg->buffer);
/* NOTE(review): ack points into msg->buffer, which was released on the line above - confirm this read is safe */
OG_LOG_RUN_ERR("[DTC RCY] collect page info from inst=%u, finished=%u", ack->head.src_inst, finished);
return;
}
/* recovery already failed: ignore the ack (note that msg_recv is not incremented here) */
if (dtc_rcy->failed) {
mes_release_message_buf(msg->buffer);
return;
}
rcy_set_t *rcy_set = &dtc_rcy->rcy_set;
/* rcy_set_ref_num is raised while the recovery set is being updated */
dtc_rcy_inc_rcy_set_ref_num();
if (dtc_rcy_set_update_no_need_replay_batch(rcy_set, no_rcy_pages, count) != OG_SUCCESS) {
dtc_rcy->failed = OG_TRUE;
}
OG_LOG_RUN_INF("[DTC RCY] finish delete no_rcy page count=%u, rcy_set ref num=%u", count, dtc_rcy->rcy_set_ref_num);
dtc_rcy_inc_msg_recv();
dtc_rcy_dec_rcy_set_ref_num();
OG_LOG_RUN_INF("[DTC RCY] finish process rcy set with edp ack, rcy_set ref num=%u", dtc_rcy->rcy_set_ref_num);
mes_release_message_buf(msg->buffer);
}
/*
 * Ask the DRC layer whether the given page still has to be recovered.
 * This is a thin forwarding wrapper around drc_page_need_recover().
 */
static bool32 dtc_rcy_page_need_recover(knl_session_t *session, page_id_t *page_id)
{
    bool32 need_recover = drc_page_need_recover(session, page_id);
    return need_recover;
}
/*
 * Send a recovery-set reply to instance `node_id`.
 *
 * The message consists of a dtc_rcy_set_msg_t header followed by `count` page ids taken from
 * `pages`. `finished` and the local reform trigger version are copied into the header so the
 * receiver can validate the reply. When `count` is zero only the header is sent.
 *
 * Returns the status reported by the messaging layer.
 */
status_t dtc_send_page_back_to_node(knl_session_t *session, page_id_t *pages, uint32 count, bool32 finished,
                                    uint8 node_id, uint8 cmd)
{
    drc_remaster_mngr_t *remaster = &g_drc_res_ctx.part_mngr.remaster_mngr;
    dtc_rcy_set_msg_t msg;
    status_t ret;

    mes_init_send_head(&msg.head, cmd, sizeof(dtc_rcy_set_msg_t), OG_INVALID_ID32, session->kernel->id, node_id,
                       session->id, OG_INVALID_ID16);
    msg.finished = finished;
    msg.count = count;
    msg.buffer_len = msg.count * sizeof(page_id_t);
    msg.reform_trigger_version = remaster->reform_info.trigger_version;
    /* The head size set by mes_init_send_head() is replaced by header + payload length. */
    msg.head.size = (uint16)(sizeof(dtc_rcy_set_msg_t) + msg.buffer_len);

    if (count == 0) {
        ret = mes_send_data(&msg.head);
    } else {
        ret = mes_send_data3(&msg.head, sizeof(dtc_rcy_set_msg_t), pages);
    }

    OG_LOG_RUN_INF("[DTC RCY] send %u pages no need to rcy to instance=%u with cmd=%d, status=%d", count, node_id, cmd,
                   ret);
    return ret;
}
/*
 * Validate a received recovery-set request against the local reform state.
 *
 * The request is accepted only when the reform status lies in [REFORM_FROZEN, REFORM_RECOVER_DONE),
 * the page count is non-zero and within DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM, the advertised buffer
 * length equals `size` and does not exceed the maximum payload, the reform trigger version matches
 * the local one, and the message head size equals header + `size`.
 *
 * Returns OG_SUCCESS when every check passes; otherwise logs the offending values and returns OG_ERROR.
 */
static status_t dtc_process_rcy_set_parameter_check(dtc_rcy_set_msg_t *req, uint32 size)
{
    drc_remaster_mngr_t *remaster = &g_drc_res_ctx.part_mngr.remaster_mngr;
    uint32 payload_len = req->buffer_len;

    if (g_rc_ctx->status < REFORM_RECOVER_DONE && g_rc_ctx->status >= REFORM_FROZEN && req->count != 0 &&
        req->count <= DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM && payload_len == size &&
        payload_len <= DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM * sizeof(page_id_t) &&
        req->reform_trigger_version == remaster->reform_info.trigger_version &&
        sizeof(dtc_rcy_set_msg_t) + size == req->head.size) {
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[DTC RCY] receive page count=%u, max_page_count=%lu, buffer len=%u, max buffer len=%lu, reform"
                   " status(%u), msg reform trigger version(%llu), local reform trigger version(%llu), msgsize(%u)",
                   req->count, DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM, payload_len,
                   DTC_RCY_SET_SEND_MSG_MAX_PAGE_NUM * sizeof(page_id_t), g_rc_ctx->status,
                   req->reform_trigger_version, remaster->reform_info.trigger_version, req->head.size);
    return OG_ERROR;
}
/*
 * Sanity-check the size of a recovery-set message before its body is interpreted.
 *
 * The message must be large enough to hold a dtc_rcy_set_msg_t, and its total size must be
 * exactly the header plus `count` page ids. Returns OG_SUCCESS if both hold, OG_ERROR otherwise.
 */
static status_t dtc_check_process_rcy_set_msg(mes_message_t *msg)
{
    /* Do not touch the request fields unless the fixed-size part is fully present. */
    if (msg->head->size < sizeof(dtc_rcy_set_msg_t)) {
        return OG_ERROR;
    }

    dtc_rcy_set_msg_t *request = (dtc_rcy_set_msg_t *)msg->buffer;
    bool32 size_match = (msg->head->size == (sizeof(dtc_rcy_set_msg_t) + request->count * sizeof(page_id_t)));
    return size_match ? OG_SUCCESS : OG_ERROR;
}
/*
 * Handle a recovery-set request from another instance.
 *
 * After validating the message, every received page id is checked with
 * dtc_rcy_page_need_recover(); the pages that do NOT need recovery are collected and sent back
 * to the requester with MES_CMD_SEND_RCY_SET_ACK. If the collection buffer cannot be allocated,
 * an empty MES_CMD_SEND_RCY_SET_ERR_ACK with finished=OG_FALSE is sent instead.
 * The received message buffer is released on every path.
 */
void dtc_process_rcy_set(void *sess, mes_message_t *receive_msg)
{
    if (dtc_check_process_rcy_set_msg(receive_msg) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("msg is invalid, msg size %u.", receive_msg->head->size);
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    knl_session_t *session = (knl_session_t *)sess;
    dtc_rcy_set_msg_t *req = (dtc_rcy_set_msg_t *)receive_msg->buffer;
    page_id_t *recv_pages = (page_id_t *)(receive_msg->buffer + sizeof(dtc_rcy_set_msg_t));
    uint8 src_inst = receive_msg->head->src_inst;
    uint32 size = req->count * sizeof(page_id_t);
    uint32 no_rcy_count = 0;

    if (dtc_process_rcy_set_parameter_check(req, size) != OG_SUCCESS) {
        mes_release_message_buf(receive_msg->buffer);
        return;
    }

    page_id_t *no_rcy_pages = dtc_rcy_alloc_page_space(size);
    if (no_rcy_pages == NULL) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to malloc %u bytes to collect do not need rcy page info from instance=%u",
                       size, receive_msg->head->src_inst);
        mes_release_message_buf(receive_msg->buffer);
        /* Report the failure to the requester with an empty, unfinished reply. */
        if (dtc_send_page_back_to_node(session, no_rcy_pages, no_rcy_count, OG_FALSE, src_inst,
                                       MES_CMD_SEND_RCY_SET_ERR_ACK) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to send error msg to instance=%u", src_inst);
        }
        return;
    }

    OG_LOG_RUN_INF("[DTC RCY] process recovery set of %u pages size=%u from instance=%u", req->count, size, src_inst);
    for (uint32 i = 0; i < req->count; i++) {
        page_id_t *page_id = &recv_pages[i];
        if (dtc_rcy_page_need_recover(session, page_id)) {
            continue;
        }
        no_rcy_pages[no_rcy_count++] = *page_id;
        OG_LOG_DEBUG_INF("[DTC RCY] process recovery set, page [%u-%u] no need to rcy in instance=%u",
                         page_id->file, page_id->page, session->inst_id);
    }
    OG_LOG_RUN_INF("[DTC RCY] master process rcy set, total check page count=%u, collect no_rcy page count=%u",
                   req->count, no_rcy_count);

    /* The request is no longer referenced after this point. */
    mes_release_message_buf(receive_msg->buffer);

    if (dtc_send_page_back_to_node(session, no_rcy_pages, no_rcy_count, OG_TRUE, src_inst,
                                   MES_CMD_SEND_RCY_SET_ACK) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to send rcy set result to instance=%u", src_inst);
    }
    CM_FREE_PTR(no_rcy_pages);
}
/*
 * Report whether `page_id` has to be replayed according to the recovery set.
 *
 * During full recovery every page is treated as needing replay. Otherwise the page is looked up
 * in the recovery-set hash table and its need_replay flag is returned; the lookup is expected to
 * succeed and the function panics if the page is missing from the set.
 */
bool8 dtc_rcy_page_in_rcyset(page_id_t page_id)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    if (ctx->full_recovery) {
        return OG_TRUE;
    }

    rcy_set_t *set = &ctx->rcy_set;
    uint32 bucket_idx = dtc_rcy_bucket_hash(page_id, set->bucket_num);
    rcy_set_item_t *item = dtc_rcy_get_item(&set->buckets[bucket_idx], page_id);
    knl_panic_log(item != NULL,
                  "rcy set item is NULL, panic info: page[%u-%u] is not in rcy set, but appears in "
                  "replay",
                  page_id.file, page_id.page);
    return item->need_replay;
}
/*
 * Decide whether `page_id` still needs replay, taking the locally buffered page into account.
 *
 * Returns OG_FALSE when the page is not in the recovery set or is already marked as not needing
 * replay. Otherwise the page's current LSN is compared with the item's last_dirty_lsn: if the
 * buffered page is already at or beyond last_dirty_lsn, the item is marked need_replay = OG_FALSE
 * and OG_FALSE is returned; else the item's need_replay flag is returned.
 */
bool32 dtc_page_in_rcyset(knl_session_t *session, page_id_t page_id)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    rcy_set_t *rcy_set = &dtc_rcy->rcy_set;
    uint32 hash_id = dtc_rcy_bucket_hash(page_id, rcy_set->bucket_num);
    rcy_set_bucket_t *bucket = &rcy_set->buckets[hash_id];
    rcy_set_item_t *item = dtc_rcy_get_item(bucket, page_id);
    uint64 curr_page_lsn = OG_INVALID_ID64;

    if (item == NULL || !item->need_replay) {
        return OG_FALSE;
    }

    buf_bucket_t *buf_bucket = buf_find_bucket(session, page_id);
    cm_spin_lock(&buf_bucket->lock, NULL);
    buf_ctrl_t *ctrl = buf_find_from_bucket(buf_bucket, page_id);
    if (!ctrl || ctrl->lock_mode == DRC_LOCK_NULL) {
        /* The page is not buffered here, or is held with a NULL lock: use LSN 0 for the comparison,
           as the page on disk may be not the latest one. */
        curr_page_lsn = 0;
    } else {
        curr_page_lsn = (ctrl->page)->lsn;
    }
    cm_spin_unlock(&buf_bucket->lock);

    if (item->last_dirty_lsn <= curr_page_lsn) {
        /* The buffered page already contains the last change recorded for it. */
        item->need_replay = OG_FALSE;
        return OG_FALSE;
    }
    return item->need_replay;
}
/*
 * Mark `page_id` as needing replay in the recovery set.
 * The page must already be present in the set; the function panics otherwise.
 */
void dtc_rcy_page_update_need_replay(page_id_t page_id)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    rcy_set_t *set = &ctx->rcy_set;
    uint32 bucket_idx = dtc_rcy_bucket_hash(page_id, set->bucket_num);
    rcy_set_item_t *item = dtc_rcy_get_item(&set->buckets[bucket_idx], page_id);

    knl_panic_log(item != NULL,
                  "rcy set item is NULL, panic info: page[%u-%u] is not in rcy set, but appears in "
                  "replay",
                  page_id.file, page_id.page);
    item->need_replay = OG_TRUE;
}
/*
 * Write one debug-level log line describing a redo log batch read for instance `node_id`:
 * lfn, lsn, scn, head magic number, log point (rst_id-asn/block_id), size and space size.
 * The batch is only read, never modified.
 */
static void dtc_print_batch(log_batch_t *batch, uint8 node_id)
{
OG_LOG_DEBUG_INF("[DTC RCY] Log Batch lfn=%llu, lsn=%llu, scn=%llu, head magic=%llx. point [%u-%u/%u], "
"size=%u, space size=%u for instance=%u",
(uint64)batch->head.point.lfn, batch->lsn, batch->scn, batch->head.magic_num,
batch->head.point.rst_id, batch->head.point.asn, batch->head.point.block_id, batch->size,
batch->space_size, node_id);
}
/*
 * Close the log devices opened by every recovery node.
 *
 * For each node, an open archive file handle is closed and the cached archive file identity
 * (name, rst_id, asn) is cleared; then every online log handle below the node's logfile_hwm
 * that is not OG_INVALID_HANDLE is closed. Does nothing when no recovery nodes are allocated.
 */
static void dtc_rcy_close_logfile(knl_session_t *session)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    if (dtc_rcy->rcy_nodes == NULL) {
        return;
    }

    for (uint32 i = 0; i < dtc_rcy->node_count; i++) {
        /* Address of an array element: it cannot be NULL once rcy_nodes itself is non-NULL,
           so no per-node NULL test is needed. */
        dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[i];

        if (rcy_node->arch_file.handle != OG_INVALID_HANDLE) {
            cm_close_device(cm_device_type(rcy_node->arch_file.name), &rcy_node->arch_file.handle);
            rcy_node->arch_file.handle = OG_INVALID_HANDLE;
            rcy_node->arch_file.name[0] = '\0';
            rcy_node->arch_file.head.rst_id = 0;
            rcy_node->arch_file.head.asn = 0;
        }

        logfile_set_t *log_set = LOGFILE_SET(session, rcy_node->node_id);
        for (uint32 j = 0; j < log_set->logfile_hwm; j++) {
            if (rcy_node->handle[j] != OG_INVALID_HANDLE) {
                cm_close_device(log_set->items[j].ctrl->type, &rcy_node->handle[j]);
            }
        }
    }
}
/*
 * Release every heap buffer owned by the global parallel analyze manager (g_analyze_paral_mgr)
 * and the global parallel replay manager (g_replay_paral_mgr).
 */
static void free_paral_mgr()
{
/* Buffers of the parallel analyze manager. */
CM_FREE_PTR(g_analyze_paral_mgr.free_list.array);
CM_FREE_PTR(g_analyze_paral_mgr.buf_list);
CM_FREE_PTR(g_analyze_paral_mgr.used_list.array);
/* Buffers of the parallel replay manager. */
CM_FREE_PTR(g_replay_paral_mgr.buf_list);
CM_FREE_PTR(g_replay_paral_mgr.group_list);
CM_FREE_PTR(g_replay_paral_mgr.batch_scn);
CM_FREE_PTR(g_replay_paral_mgr.node_id);
CM_FREE_PTR(g_replay_paral_mgr.batch_rpl_start_time);
CM_FREE_PTR(g_replay_paral_mgr.free_list.array);
/* group_num is freed through an explicit (void *) cast and reset by hand instead of CM_FREE_PTR;
   presumably its declared type carries a qualifier the macro cannot take - TODO confirm. */
free((void *)g_replay_paral_mgr.group_num);
g_replay_paral_mgr.group_num = NULL;
}
/*
 * Tear down the DTC recovery context on the master.
 *
 * Non-master instances return right after the start log line. On the master the function closes
 * the log files, waits until rcy_set_ref_num reaches zero, frees the recovery set (pages, item
 * pools, buckets), the per-node read buffers and bookkeeping arrays, and the parallel managers,
 * then stops the parallel recovery procs if they were used and clears the in-progress state.
 */
void dtc_recovery_close(knl_session_t *session)
{
OG_LOG_RUN_INF("[DTC RCY] start dtc recovery close");
if (rc_is_master() == OG_FALSE) {
return;
}
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
dtc_rcy_close_logfile(session);
/* Block until nobody references the recovery set any more; it is freed below. */
while (dtc_rcy->rcy_set_ref_num != 0) {
OG_LOG_RUN_INF("[DTC RCY] wait rcy_set ref num=%u", dtc_rcy->rcy_set_ref_num);
cm_sleep(DTC_RCY_WAIT_REF_NUM_CLEAN_SLEEP_TIME);
}
for (uint32 i = 0; i < OG_MAX_INSTANCES; i++) {
if (dtc_rcy->rcy_set.pages[i] != NULL) {
CM_FREE_PTR(dtc_rcy->rcy_set.pages[i]);
}
}
/* Walk the singly linked list of item pools, saving `next` before each pool is freed. */
rcy_set_item_pool_t *pool = dtc_rcy->rcy_set.item_pools;
rcy_set_item_pool_t *next = NULL;
while (pool != NULL) {
next = pool->next;
CM_FREE_PTR(pool);
pool = next;
}
uint32 read_buf_size = g_instance->kernel.attr.rcy_node_read_buf_size;
if (dtc_rcy->rcy_nodes != NULL) {
for (uint32 i = 0; i < dtc_rcy->node_count; i++) {
/* NOTE(review): `j` is a signed int compared against the uint32 read_buf_size. */
for (int j = 0; j < read_buf_size; ++j) {
cm_aligned_free(&dtc_rcy->rcy_nodes[i].read_buf[j]);
}
CM_FREE_PTR(dtc_rcy->rcy_nodes[i].read_buf_ready);
CM_FREE_PTR(dtc_rcy->rcy_nodes[i].read_pos);
CM_FREE_PTR(dtc_rcy->rcy_nodes[i].write_pos);
CM_FREE_PTR(dtc_rcy->rcy_nodes[i].read_size);
CM_FREE_PTR(dtc_rcy->rcy_nodes[i].not_finished);
}
}
CM_FREE_PTR(dtc_rcy->rcy_nodes);
free_paral_mgr();
rcy_set_t *rcy_set = &dtc_rcy->rcy_set;
if (rcy_set->buckets != NULL) {
CM_FREE_PTR(rcy_set->buckets);
}
/* Parallel recovery resources exist only when paral_rcy was enabled. */
if (dtc_rcy->paral_rcy) {
rcy_close_proc(session);
rcy_free_buffer(&session->kernel->rcy_ctx);
}
/* The kernel session is released only for partial (non-full) recovery. */
if (!dtc_rcy->full_recovery) {
g_knl_callback.release_knl_session(session);
}
dtc_rcy->in_progress = OG_FALSE;
dtc_rcy->ss->dtc_session_type = DTC_TYPE_NONE;
OG_LOG_RUN_INF("[DTC RCY] finish dtc recovery close");
}
/*
 * Tell whether log file index `file` lies outside the node's active range log_first..log_last.
 * The range may wrap around: when log_first > log_last, the unused files are those strictly
 * between log_last and log_first.
 */
static inline bool32 dtc_log_file_not_used(dtc_node_ctrl_t *ctrl, uint32 file)
{
    if (ctrl->log_first > ctrl->log_last) {
        /* Wrapped range: only the gap between the two ends is unused. */
        return (file > ctrl->log_last && file < ctrl->log_first);
    }
    /* Contiguous range: everything before log_first or after log_last is unused. */
    return (file < ctrl->log_first || file > ctrl->log_last);
}
/*
 * Fill the in-memory head of a log file that is outside the node's active range:
 * invalid asn, the database's current reset-log id, the file's block size, and a write
 * position just past the block-aligned file head.
 */
static inline void dtc_init_not_used_log_file(log_file_t *file, database_t *db)
{
    file->head.asn = OG_INVALID_ASN;
    file->head.rst_id = db->ctrl.core.resetlogs.rst_id;
    file->head.block_size = file->ctrl->block_size;
    file->head.write_pos = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
}
/*
 * Fill the in-memory head of a log file for the DBStor case: write position 0 and the
 * database's current reset-log id. No other head field is touched.
 */
static inline void dtc_init_dbs_log_file(log_file_t *file, database_t *db)
{
    file->head.write_pos = 0;
    file->head.rst_id = db->ctrl.core.resetlogs.rst_id;
}
/*
 * Initialise the log file set of the recovery node at index `idx`.
 *
 * Nothing is done for the local instance. For a remote node, every log file below the node's
 * log_hwm is bound to its control item; dropped files are skipped, files outside the active
 * range get a synthetic head, and the remaining files are opened. With DBStor enabled only the
 * first such file is initialised (head set by dtc_init_dbs_log_file) and the loop stops;
 * otherwise the file head is read from the device, its checksum is verified and it is copied
 * into file->head.
 *
 * Returns OG_SUCCESS, or OG_ERROR when a log file cannot be opened, read or verified.
 */
static status_t dtc_init_node_logset(knl_session_t *session, uint8 idx)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[idx];
    logfile_set_t *file_set = LOGFILE_SET(session, rcy_node->node_id);
    dtc_node_ctrl_t *ctrl = dtc_get_ctrl(session, rcy_node->node_id);
    database_t *db = &session->kernel->db;
    log_file_t *file = NULL;
    /* The node's current read buffer doubles as scratch space for the file head. */
    char *buf = rcy_node->read_buf[rcy_node->read_buf_read_index].aligned_buf;

    if (session->kernel->id == rcy_node->node_id) {
        return OG_SUCCESS;
    }

    file_set->logfile_hwm = ctrl->log_hwm;
    file_set->log_count = ctrl->log_count;
    for (uint32 i = 0; i < file_set->logfile_hwm; i++) {
        file = &file_set->items[i];
        file->ctrl = (log_file_ctrl_t *)db_get_log_ctrl_item(db->ctrl.pages, i, sizeof(log_file_ctrl_t),
                                                             db->ctrl.log_segment, rcy_node->node_id);
        if (LOG_IS_DROPPED(file->ctrl->flg)) {
            continue;
        }
        if (dtc_log_file_not_used(ctrl, i)) {
            dtc_init_not_used_log_file(file, db);
            continue;
        }
        if (cm_open_device(file->ctrl->name, file->ctrl->type, knl_io_flag(session), &rcy_node->handle[i]) !=
            OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open redo log file=%s ", file->ctrl->name);
            return OG_ERROR;
        }
        if (cm_dbs_is_enable_dbs() == OG_TRUE) {
            dtc_init_dbs_log_file(file, db);
            OG_LOG_RUN_INF("[DTC RCY] Init logfile=%s, handle=%d, point=[%u-%u] write_pos=%llu for instance=%u",
                           file->ctrl->name, rcy_node->handle[i], file->head.rst_id, file->head.asn,
                           file->head.write_pos, rcy_node->node_id);
            break;
        }
        if (cm_read_device(file->ctrl->type, rcy_node->handle[i], 0, buf,
                           CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size)) != OG_SUCCESS) {
            /* The file was opened successfully above; this failure is a read failure. */
            OG_LOG_RUN_ERR("[DB] failed to read redo log file=%s ", file->ctrl->name);
            return OG_ERROR;
        }
        if (log_verify_head_checksum(session, (log_file_head_t *)buf, file->ctrl->name) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to verify head checksum of log file=%s", file->ctrl->name);
            return OG_ERROR;
        }
        errno_t ret = memcpy_sp(&file->head, sizeof(log_file_head_t), buf, sizeof(log_file_head_t));
        knl_securec_check(ret);
        OG_LOG_RUN_INF("[DTC RCY] Init logfile=%s, handle=%d, point=[%u-%u] write_pos=%llu for instance=%u",
                       file->ctrl->name, rcy_node->handle[i], file->head.rst_id, file->head.asn, file->head.write_pos,
                       rcy_node->node_id);
    }
    return OG_SUCCESS;
}
/*
 * Compare `curr_lsn` with the value remembered in `*lsn_record`.
 * When they differ the record is updated to `curr_lsn` and a true value is returned;
 * when they are equal the record is left untouched and a false value is returned.
 */
static inline bool32 dtc_stats_lsn_is_changed(uint64 *lsn_record, uint64 curr_lsn)
{
    if (*lsn_record == curr_lsn) {
        return OG_FALSE;
    }
    *lsn_record = curr_lsn;
    return OG_TRUE;
}
/*
 * Advance the read point and the replay point of recovery node `idx` to the next log file.
 *
 * Without DBStor, if the write point already sits on the node's last log file, no further log is
 * needed: *need_more_log is set to OG_FALSE and nothing else changes. Otherwise both points move
 * to block 0 of the next asn (and to the next rst_id as well when the write point is exactly at
 * the node's last asn/lfn of an older reset-log generation), *need_more_log is set to OG_TRUE and
 * the node's current file length is reset to 0.
 */
void dtc_rcy_next_file(knl_session_t *session, uint32 idx, bool32 *need_more_log)
{
    OG_LOG_DEBUG_INF("[DTC RCY] dtc rcy next file");
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    reform_rcy_node_t *log_point = &ctx->rcy_log_points[idx];
    dtc_rcy_node_t *node = &ctx->rcy_nodes[idx];
    reset_log_t *reset_log = &session->kernel->db.ctrl.core.resetlogs;
    log_point_t *point = &log_point->rcy_write_point;
    log_point_t *reply_point = &log_point->rcy_point;
    dtc_node_ctrl_t *ctrl = dtc_get_ctrl(session, log_point->node_id);

    if (cm_dbs_is_enable_dbs() == OG_FALSE) {
        logfile_set_t *log_set = LOGFILE_SET(session, log_point->node_id);
        uint32 last_file = ctrl->log_last;
        if (LOG_POINT_FILE_EQUAL(*point, log_set->items[last_file].head)) {
            *need_more_log = OG_FALSE;
            return;
        }
    }

    /* Evaluated before the points are modified below. */
    bool32 cross_reset = (point->rst_id < reset_log->rst_id && point->asn == ctrl->last_asn &&
                          (uint64)point->lfn == ctrl->last_lfn);
    if (cross_reset) {
        point->rst_id++;
        reply_point->rst_id++;
    }
    point->asn++;
    point->block_id = 0;
    reply_point->asn++;
    reply_point->block_id = 0;
    *need_more_log = OG_TRUE;

    /* When staying in the same reset-log generation the move is logged only if the write-point lsn
       differs from the last recorded one; the record is updated as part of that check. */
    if (node->latest_rcy_end_lsn != node->recovery_read_end_point.lsn &&
        (cross_reset ||
         dtc_stats_lsn_is_changed(&(node->lsn_records.move_point_lsn_record), log_point->rcy_write_point.lsn))) {
        OG_LOG_RUN_INF("[DTC RCY] Move log point to [%u-%u/%u/%llu]", (uint32)point->rst_id, point->asn,
                       point->block_id, (uint64)point->lfn);
    }
    node->curr_file_length = 0;
}
/*
 * Check whether the ulog of recovery node `idx` holds data at the LSN following the node's
 * replay point (rcy_point.lsn + 1).
 *
 * The first log file of the node's log set is opened into rcy_node->handle[0] and the device is
 * asked whether that offset is valid. The result is a boolean, never a status code.
 *
 * If the device cannot be opened, validity cannot be determined. The original code returned the
 * status code OG_ERROR here, which callers of this bool32 function would read as true, assuming
 * OG_ERROR is non-zero (TODO confirm against its definition). That effective outcome is kept,
 * but spelled as OG_TRUE, so the subsequent read of the log reports the I/O failure instead of
 * recovery silently treating the log as empty.
 */
static bool32 dtc_rcy_check_ulog(knl_session_t *session, uint32 idx)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[idx];
    reform_rcy_node_t *rcy_log_point = &dtc_rcy->rcy_log_points[idx];
    log_point_t *point = &rcy_log_point->rcy_point;
    uint64 start_lsn = point->lsn + 1;
    int32 *handle = &rcy_node->handle[0];
    logfile_set_t *log_set = LOGFILE_SET(session, rcy_log_point->node_id);
    log_file_t *file = &log_set->items[0];
    device_type_t type = cm_device_type(file->ctrl->name);

    if (cm_open_device(file->ctrl->name, type, knl_io_flag(session), handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DB] failed to open redo log file %s ", file->ctrl->name);
        return OG_TRUE;
    }

    bool32 ulog_is_valid = cm_check_device_offset_valid(type, *handle, start_lsn);
    /* start_lsn is unsigned 64-bit, so it is printed with %llu. */
    OG_LOG_RUN_INF("[DTC RCY] check ulog lsn %llu from %s, handle %d, inst_id %u result %d", start_lsn,
                   file->ctrl->name, *handle, rcy_log_point->node_id, ulog_is_valid);
    return ulog_is_valid;
}
/*
 * Tell whether recovery node `idx` has log data to read.
 * Currently this simply forwards to the ulog check.
 */
static bool32 dtc_rcy_check_log_is_exist(knl_session_t *session, uint32 idx)
{
    bool32 log_exists = dtc_rcy_check_ulog(session, idx);
    return log_exists;
}
/*
 * Find the online log file of recovery node `idx` that matches the node's current write point.
 *
 * Returns the index of the file inside the node's log file set, or OG_INVALID_ID32 when no
 * file matches. Dropped files are skipped.
 *
 * With DBStor enabled, the first non-dropped file decides the result: its index is returned if
 * the node's ulog_exist_data flag is set, OG_INVALID_ID32 otherwise.
 *
 * Without DBStor, a file matches when its head rst_id/asn equal the write point. On a match the
 * file's latch is taken in shared mode and is still held when the index is returned; presumably
 * the caller releases it after reading - verify against callers.
 */
uint32 dtc_rcy_get_logfile_by_node(knl_session_t *session, uint32 idx)
{
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[idx];
reform_rcy_node_t *rcy_log_point = &dtc_rcy->rcy_log_points[idx];
logfile_set_t *log_set = LOGFILE_SET(session, rcy_log_point->node_id);
log_point_t *point = &rcy_log_point->rcy_write_point;
log_file_t *file = NULL;
OG_LOG_DEBUG_INF("[DTC RCY] dtc_rcy_get_logfile_by_node point->rst_id = %u,"
" point->asn = %u siz log_set->logfile_hwm = %u",
point->rst_id, point->asn, log_set->logfile_hwm);
for (uint32 i = 0; i < log_set->logfile_hwm; i++) {
file = &log_set->items[i];
if (LOG_IS_DROPPED(file->ctrl->flg)) {
continue;
}
if (cm_dbs_is_enable_dbs() == OG_TRUE) {
if (rcy_node->ulog_exist_data) {
return i;
}
return OG_INVALID_ID32;
}
/* Cheap check without the latch first ... */
if (file->head.rst_id != point->rst_id || file->head.asn != point->asn) {
continue;
}
/* ... then repeat it under the shared latch, since the head is compared again after locking. */
cm_latch_s(&file->latch, session->id, OG_FALSE, NULL);
if (file->head.rst_id != point->rst_id || file->head.asn != point->asn) {
cm_unlatch(&file->latch, NULL);
continue;
}
/* Match: return with the shared latch still held. */
return i;
}
return OG_INVALID_ID32;
}
/*
 * Mark `batch` as invalidated inside the archive file of the current recovery node.
 *
 * The node's current file length is moved back by the batch's space size, the batch head magic
 * is set to LOG_INVALIDATE_MAGIC_NUMBER, and the batch is written back to the archive file at
 * offset (curr_file_length + blk_size). If the current file length is smaller than the batch's
 * space size nothing is done and OG_SUCCESS is returned.
 *
 * Returns OG_SUCCESS, or OG_ERROR when the write fails.
 */
status_t dtc_rcy_set_batch_invalidate(knl_session_t *session, log_batch_t *batch)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *node = &ctx->rcy_nodes[ctx->curr_node_idx];

    if (node->curr_file_length < batch->space_size) {
        return OG_SUCCESS;
    }
    node->curr_file_length -= batch->space_size;

    arch_file_t *arch = &node->arch_file;
    device_type_t dev_type = cm_device_type(arch->name);
    batch->head.magic_num = LOG_INVALIDATE_MAGIC_NUMBER;
    int64 offset = (int64)(node->curr_file_length + node->blk_size);

    if (cm_write_device(dev_type, arch->handle, offset, (void *)batch, (int32)batch->space_size) == OG_SUCCESS) {
        return OG_SUCCESS;
    }
    OG_LOG_RUN_ERR("[DTC RCY] ABORT INFO: flush batch:%s, offset:%lld, size:%d failed.", arch->name, offset,
                   (int32)batch->space_size);
    return OG_ERROR;
}
/*
 * Read redo log data from file `name` at `offset` into `buf`.
 *
 * At most min(size_need_read, buf_size) bytes are requested; the number of bytes obtained is
 * stored in *size_read (0 when size_need_read is 0). The device is opened through `handle`
 * before reading.
 *
 * With DBStor enabled the read may return fewer bytes than requested. If that read fails while
 * the database is in MAXFIX mode, the buffer is zero-filled, *size_read is set to the requested
 * size and OG_SUCCESS is returned.
 *
 * Returns OG_SUCCESS, or OG_ERROR when the device cannot be opened or read.
 */
status_t dtc_rcy_read_log(knl_session_t *session, int32 *handle, const char *name, int64 offset, void *buf,
                          int64 buf_size, int64 size_need_read, uint32 *size_read)
{
    *size_read = 0;
    if (size_need_read == 0) {
        OG_LOG_DEBUG_WAR("[DTC RCY] read redo log size_need_read=%lld, offset=%lld, logfile handle=%d "
                         "from file=%s",
                         size_need_read, offset, *handle, name);
        return OG_SUCCESS;
    }

    /* Never request more than the buffer can hold. */
    int64 size = (size_need_read > buf_size) ? buf_size : size_need_read;

    device_type_t type = cm_device_type(name);
    if (type != DEV_TYPE_ULOG) {
        type = arch_get_device_type(name);
    }
    if (cm_open_device(name, type, knl_io_flag(session), handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to open redo log, filename=%s", name);
        return OG_ERROR;
    }

    if (cm_dbs_is_enable_dbs() != OG_TRUE) {
        if (cm_read_device(type, *handle, offset, buf, (int32)size) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to read redo log size_need_read=%lld, offset=%lld, logfile handle=%d "
                           "from file=%s",
                           size_need_read, offset, *handle, name);
            return OG_ERROR;
        }
        *size_read = (int32)size;
    } else {
        int32 return_size = 0;
        if (cm_read_device_nocheck(type, *handle, offset, buf, (int32)size, &return_size) == OG_SUCCESS) {
            *size_read = return_size;
        } else {
            OG_LOG_RUN_ERR("[DTC RCY] failed to read redo log size_need_read=%lld, offset=%lld, logfile handle=%d "
                           "from file=%s",
                           size_need_read, offset, *handle, name);
            if (!DB_IS_MAXFIX(session)) {
                return OG_ERROR;
            }
            /* MAXFIX mode: hand back a zeroed buffer instead of failing. */
            errno_t ret = memset_sp(buf, size, 0, size);
            knl_securec_check(ret);
            *size_read = size;
            return OG_SUCCESS;
        }
    }

    OG_LOG_DEBUG_INF("[DTC RCY] read redo log size=%lld, offset=%lld from=%s, size_need_read=%lld", size, offset, name,
                     size_need_read);
    return OG_SUCCESS;
}
/*
 * Read the next chunk of online redo log `file_id` of recovery node `idx` into the node's
 * current write buffer.
 *
 * Without DBStor the read starts at the write point's block (block 0 is bumped to 1) and covers
 * the rest of the file: up to the file's configured size for the current log file, up to the
 * head's write position otherwise. With DBStor the read starts at offset (write point lsn + 1)
 * and asks for a full buffer.
 *
 * The number of bytes read is returned in *size_read; the return value is that of
 * dtc_rcy_read_log().
 */
static status_t dtc_rcy_read_online_log(knl_session_t *session, uint32 file_id, uint32 idx, uint32 *size_read)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *node = &ctx->rcy_nodes[idx];
    reform_rcy_node_t *log_point = &ctx->rcy_log_points[idx];
    logfile_set_t *log_set = LOGFILE_SET(session, log_point->node_id);
    log_file_t *file = &log_set->items[file_id];
    log_point_t *point = &log_point->rcy_write_point;
    char *buf = node->read_buf[node->read_buf_write_index].aligned_buf;
    int64 buf_size = node->read_buf[node->read_buf_write_index].buf_size;

    if (point->block_id == 0) {
        point->block_id = 1;
    }
    if (node->blk_size == 0) {
        node->blk_size = file->ctrl->block_size;
    }

    int64 file_size = (file->ctrl->status == LOG_FILE_CURRENT) ? (int64)file->ctrl->size
                                                               : (int64)file->head.write_pos;
    int64 offset = (int64)point->block_id * file->ctrl->block_size;
    int64 size_need_read = file_size - offset;
    if (cm_dbs_is_enable_dbs() == OG_TRUE) {
        size_need_read = buf_size;
        offset = point->lsn + 1;
        OG_LOG_DEBUG_INF("[DTC RCY] dtc_rcy_read_online_log cm_dbs_is_enable_dbs() == OG_TRUE offset=%llu", offset);
    }

    /* Log the start position only when it differs from the one logged last time. */
    if (node->latest_lsn != offset) {
        OG_LOG_RUN_INF("[DTC RCY] start read online redo log point %u/%u/%lld from %s", point->asn, point->block_id,
                       offset, file->ctrl->name);
        node->latest_lsn = offset;
    }
    return dtc_rcy_read_log(session, &node->handle[file_id], file->ctrl->name, offset, buf, buf_size, size_need_read,
                            size_read);
}
/*
 * Resolve the archived log file name for log point [rst_id-asn] of recovery node `idx`
 * into file->name.
 *
 * The name is first looked up through arch_get_archived_log_name(); if that fails, the default
 * name is generated with arch_set_archive_log_name() and accepted only if such a device exists.
 *
 * Returns OG_SUCCESS when a name was resolved, OG_ERROR otherwise.
 */
static status_t dtc_rcy_load_archfile_no_dbs(knl_session_t *session, uint32 idx, arch_file_t *file, log_point_t *point)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *node = &ctx->rcy_nodes[idx];

    if (arch_get_archived_log_name(session, (uint32)point->rst_id, point->asn, ARCH_DEFAULT_DEST, file->name,
                                   OG_FILE_NAME_BUFFER_SIZE, node->node_id)) {
        return OG_SUCCESS;
    }

    /* Not recorded: fall back to the generated name and check that it exists. */
    arch_set_archive_log_name(session, (uint32)point->rst_id, point->asn, ARCH_DEFAULT_DEST, file->name,
                              OG_FILE_NAME_BUFFER_SIZE, node->node_id);
    if (cm_exist_device(arch_get_device_type(file->name), file->name)) {
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[DTC RCY] failed to get archived redo log file[%u-%u] for instance %u name:%s",
                   (uint32)point->rst_id, point->asn, node->node_id, file->name);
    return OG_ERROR;
}
/*
 * Locate, open and read the head of the archived log file that contains `point` for recovery
 * node `idx`.
 *
 * On success file->name, file->handle and file->head describe the archive file and the head
 * checksum has been verified. *finish is set to OG_TRUE (with OG_SUCCESS returned) only on the
 * DB_CLUSTER_NO_CMS fallback paths where no archive file can be found; it is left untouched
 * otherwise.
 *
 * Returns OG_SUCCESS, or OG_ERROR when the file cannot be resolved, opened or read, or the
 * result of log_verify_head_checksum().
 */
static status_t dtc_rcy_load_archfile(knl_session_t *session, uint32 idx, arch_file_t *file, log_point_t *point,
bool8 *finish)
{
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[idx];
bool32 is_dbstor = cm_dbs_is_enable_dbs();
/* Fast path: the wanted file (same rst_id/asn) is already loaded. Not taken for DBStor or NO_CMS. */
if (!DB_CLUSTER_NO_CMS && !is_dbstor && file->head.rst_id == point->rst_id && file->head.asn == point->asn) {
OG_LOG_RUN_INF("[DTC RCY] dtc rcy load archfile already load the need archived logfile %u/%u/ ", point->asn,
point->block_id);
return OG_SUCCESS;
}
/* Close whatever archive file was open before switching to a new one. */
device_type_t type = arch_get_device_type(file->name);
if (file->handle != OG_INVALID_HANDLE) {
cm_close_device(type, &file->handle);
file->handle = OG_INVALID_HANDLE;
}
if (is_dbstor || DB_CLUSTER_NO_CMS) {
arch_ctrl_t *arch_ctrl = arch_get_archived_log_info_for_recovery(session, (uint32)point->rst_id, point->asn,
ARCH_DEFAULT_DEST, point->lsn,
rcy_node->node_id);
if (arch_ctrl == NULL) {
OG_LOG_RUN_WAR_LIMIT(LOG_PRINT_INTERVAL_SECOND_20,
"[RECOVERY] failed to get archived log for [%u-%u-%u-%llu]", rcy_node->node_id,
point->rst_id, point->asn, point->lsn);
/* Without an archive record this is fatal unless running in NO_CMS mode. */
if (!DB_CLUSTER_NO_CMS) {
return OG_ERROR;
}
/* NO_CMS: fall back to the name-based lookup; a miss ends the read without error. */
if (dtc_rcy_load_archfile_no_dbs(session, idx, file, point) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[DTC RCY] dtc rcy load archfile no dbs is null %u/%u/%s ", point->asn, point->block_id,
file->name);
*finish = OG_TRUE;
return OG_SUCCESS;
}
}
if (arch_ctrl != NULL) {
point->asn = arch_ctrl->asn;
/* NOTE(review): the message below says "arch ctrl is null" although this branch runs only
   when arch_ctrl is non-NULL. */
OG_LOG_RUN_INF("[DTC RCY] dtc rcy load archfile arch ctrl is null %u/%u/%s ", point->asn, point->block_id,
file->name);
arch_file_name_info_t file_name_info = {
arch_ctrl->rst_id, arch_ctrl->asn, rcy_node->node_id, OG_FILE_NAME_BUFFER_SIZE,
arch_ctrl->start_lsn, arch_ctrl->end_lsn, file->name
};
/* Keep a copy of the current name so it can be restored if the lsn-based name does not exist. */
char str_buf[OG_FILE_NAME_BUFFER_SIZE] = { 0 };
status_t ret = snprintf_s(str_buf, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s", file->name);
knl_securec_check_ss(ret);
arch_set_archive_log_name_with_lsn(session, ARCH_DEFAULT_DEST, &file_name_info);
/* NOTE(review): `type` was derived from the previous file->name above, not from the name just
   generated - confirm this is intended. */
if (!cm_exist_device(type, file->name)) {
OG_LOG_RUN_WAR("[DTC RCY] get archived redo log file[%u-%u] for instance %u", (uint32)point->rst_id,
point->asn, rcy_node->node_id);
if (!DB_CLUSTER_NO_CMS) {
return OG_ERROR;
}
/* NO_CMS: restore the saved name, then try the name-based lookup. */
ret = snprintf_s(file->name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s", str_buf);
knl_securec_check_ss(ret);
if (dtc_rcy_load_archfile_no_dbs(session, idx, file, point) != OG_SUCCESS) {
OG_LOG_RUN_INF("[DTC RCY] dtc rcy load archfile no dbs is null %u/%u/%s ", point->asn,
point->block_id, file->name);
*finish = OG_TRUE;
return OG_SUCCESS;
}
}
}
} else {
if (dtc_rcy_load_archfile_no_dbs(session, idx, file, point) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY] dtc rcy load archfile %u/%u/%s ", point->asn, point->block_id, file->name);
return OG_ERROR;
}
}
/* file->name is final now: open it and load the file head through the node's write buffer. */
type = arch_get_device_type(file->name);
if (cm_open_device(file->name, type, knl_io_flag(session), &file->handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY] failed to open archived redo log file %s", file->name);
return OG_ERROR;
}
if (cm_read_device(type, file->handle, 0, rcy_node->read_buf[rcy_node->read_buf_write_index].aligned_buf,
CM_CALC_ALIGN((uint32)sizeof(log_file_head_t), 512)) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY] failed to read %s, offset 0 handle %d", file->name, file->handle);
return OG_ERROR;
}
errno_t errcode;
errcode = memcpy_s(&file->head, (int32)sizeof(log_file_head_t),
rcy_node->read_buf[rcy_node->read_buf_write_index].aligned_buf, (int32)sizeof(log_file_head_t));
knl_securec_check(errcode);
return log_verify_head_checksum(session, &file->head, file->name);
}
/*
 * Check that `batch` looks like a complete, valid redo log batch.
 *
 * Returns OG_TRUE only when the sizes are plausible and both the head and the tail carry
 * LOG_MAGIC_NUMBER with matching lfn. Returns OG_FALSE for a NULL batch, implausible sizes,
 * or a batch whose head carries LOG_INVALIDATE_MAGIC_NUMBER (tail still valid).
 * Any other mismatch is logged; with DBStor enabled it panics unless the database is open in
 * MAX_FIX status, otherwise OG_FALSE is returned.
 */
bool32 dtc_rcy_validate_batch(log_batch_t *batch)
{
if (batch == NULL) {
OG_LOG_RUN_ERR("[DTC RCY] dtc rcy validate batch is NULL");
return OG_FALSE;
}
/* Reject sizes that would make the tail address below fall outside a sane batch. */
if (batch->size < sizeof(log_batch_t) || batch->space_size < batch->size ||
batch->size > OG_MAX_BATCH_SIZE || batch->space_size > OG_MAX_BATCH_SIZE) {
OG_LOG_RUN_ERR("[DTC RCY] validate batch failed: invalid size, magic=%llx size=%u space_size=%u",
(uint64)batch->head.magic_num, batch->size, batch->space_size);
return OG_FALSE;
}
/* The tail occupies the last sizeof(log_batch_tail_t) bytes of the batch's `size` bytes. */
log_batch_tail_t *tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
if (tail == NULL) {
OG_LOG_RUN_ERR("dtc rcy validate batch tail is NULL");
return OG_FALSE;
}
if (batch->head.magic_num == LOG_MAGIC_NUMBER && tail->magic_num == LOG_MAGIC_NUMBER &&
batch->head.point.lfn == tail->point.lfn && batch->size != 0) {
return OG_TRUE;
}
/* A batch with an invalidated head but intact tail is rejected quietly, without the error log below. */
if (batch->head.magic_num == LOG_INVALIDATE_MAGIC_NUMBER && tail->magic_num == LOG_MAGIC_NUMBER &&
batch->head.point.lfn == tail->point.lfn && batch->size != 0) {
return OG_FALSE;
}
OG_LOG_RUN_ERR("[DTC RCY] head magic_num:%llx, lsn:%llu, lfn:%llu, tail magic_num:%llx, lsn:%llu, "
"lfn:%llu, size:%u",
batch->head.magic_num, batch->head.point.lsn, (uint64)batch->head.point.lfn, tail->magic_num,
tail->point.lsn, (uint64)tail->point.lfn, batch->size);
if (cm_dbs_is_enable_dbs() == OG_TRUE) {
if (g_instance->kernel.db.open_status == DB_OPEN_STATUS_MAX_FIX) {
return OG_FALSE;
}
/* With DBStor, a corrupt batch outside MAX_FIX status is treated as fatal. */
knl_panic(0);
}
return OG_FALSE;
}
/*
 * Skip the batches in `buf` whose lsn is not beyond `point->lsn`.
 *
 * `buf` holds `size_read` bytes of log data. Batches are walked from the start of the buffer;
 * every valid batch with head lsn <= point->lsn is skipped. The skipped bytes are added to the
 * node's current file length and converted into blocks added to point->block_id. When a batch
 * with lsn > point->lsn is reached, *is_find_start is set to OG_TRUE and the node's read
 * position in the current write buffer is moved past the skipped bytes.
 *
 * Returns OG_ERROR when `buf` is NULL or an invalid batch is met (in which case the node is
 * marked recover_done and *is_find_start is set), OG_SUCCESS otherwise.
 */
status_t dtc_rcy_find_batch_by_lsn(char *buf, dtc_rcy_node_t *rcy_node, log_point_t *point, int32 size_read,
                                   bool8 *is_find_start)
{
    uint32 invalide_size = 0;
    log_batch_t *batch = NULL;

    if (buf == NULL) {
        OG_LOG_RUN_ERR("[DTC RCY] batch is null, read_size[%d], invalide_size[%u], point[%u/%u/%u%llu/%llu]", size_read,
                       invalide_size, point->rst_id, point->asn, point->block_id, point->lsn, (uint64)point->lfn);
        return OG_ERROR;
    }

    /* Track the unread bytes as an unsigned quantity that never wraps: a signed counter compared
       against sizeof()/uint32 values is converted to unsigned, so once it went negative it would
       look huge and the loop would run past the end of the buffer. */
    uint32 remain = (size_read > 0) ? (uint32)size_read : 0;
    while (remain >= sizeof(log_batch_t)) {
        batch = (log_batch_t *)(buf + invalide_size);
        if (remain < batch->size) {
            /* Only part of this batch is in the buffer. */
            break;
        }
        if (!dtc_rcy_validate_batch(batch)) {
            OG_LOG_RUN_ERR("[DTC RCY] batch is invalidate, read_size[%d], invalide_size[%u], point[%u/%u/%u%llu/%llu]",
                           size_read, invalide_size, point->rst_id, point->asn, point->block_id, point->lsn,
                           (uint64)point->lfn);
            rcy_node->recover_done = OG_TRUE;
            *is_find_start = OG_TRUE;
            rcy_node->read_pos[rcy_node->read_buf_write_index] += invalide_size;
            return OG_ERROR;
        }
        if (batch->head.point.lsn > point->lsn) {
            break;
        }
        invalide_size += batch->space_size;
        /* space_size may exceed what is left (the padding was not read): clamp to 0 to end the walk. */
        remain = (batch->space_size < remain) ? (remain - batch->space_size) : 0;
    }

    point->block_id += invalide_size / rcy_node->blk_size;
    rcy_node->curr_file_length += invalide_size;

    /* `batch` is still NULL when the buffer was too small to hold a single batch header. */
    if (batch != NULL && batch->head.point.lsn > point->lsn) {
        *is_find_start = OG_TRUE;
        rcy_node->read_pos[rcy_node->read_buf_write_index] += invalide_size;
    }
    return OG_SUCCESS;
}
/*
 * Read archived redo log for recovery node `idx` into the node's current write buffer.
 *
 * The archive file containing the node's write point is loaded first. Reading starts at the
 * write point's block (block 0 is bumped to 1). A block_id of OG_INFINITE32 means the start
 * block is unknown: reading then begins at block 1 and chunks are scanned with
 * dtc_rcy_find_batch_by_lsn() until a batch beyond the write point's lsn is found.
 *
 * *size_read receives the size of the last chunk read; it is left untouched when the loader
 * reports that no archive file is available (repair_finish).
 * Returns OG_SUCCESS, or an error status from loading, reading or scanning.
 */
status_t dtc_rcy_read_archived_log(knl_session_t *session, uint32 idx, uint32 *size_read)
{
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[idx];
reform_rcy_node_t *rcy_log_point = &dtc_rcy->rcy_log_points[idx];
arch_file_t *file = &rcy_node->arch_file;
char *buf = rcy_node->read_buf[rcy_node->read_buf_write_index].aligned_buf;
int64 buf_size = rcy_node->read_buf[rcy_node->read_buf_write_index].buf_size;
log_point_t *point = &rcy_log_point->rcy_write_point;
bool8 is_find_start = OG_TRUE;
bool8 repair_finish = OG_FALSE;
if (dtc_rcy_load_archfile(session, idx, file, point, &repair_finish) != OG_SUCCESS) {
return OG_ERROR;
}
if (repair_finish) {
OG_LOG_RUN_INF("repair page read archiver log finish");
return OG_SUCCESS;
}
if (point->block_id == 0) {
point->block_id = 1;
}
/* Unknown start block: scan from block 1 and search for the start batch by lsn. */
if (point->block_id == OG_INFINITE32) {
is_find_start = OG_FALSE;
point->block_id = 1;
}
if (rcy_node->blk_size == 0) {
rcy_node->blk_size = file->head.block_size;
}
do {
/* point->block_id is advanced by dtc_rcy_find_batch_by_lsn(), so each pass reads further on. */
int64 offset = (int64)point->block_id * file->head.block_size;
int64 size_need_read = file->head.write_pos - offset;
status_t status = dtc_rcy_read_log(session, &file->handle, file->name, offset, buf, buf_size, size_need_read,
size_read);
if (status != OG_SUCCESS) {
return status;
}
/* Nothing left to read in this file. */
if (*size_read == 0) {
return status;
}
if (is_find_start) {
break;
}
if (dtc_rcy_find_batch_by_lsn(buf, rcy_node, point, (int32)(*size_read), &is_find_start) != OG_SUCCESS) {
return OG_ERROR;
}
} while (*size_read != 0 && !is_find_start);
return OG_SUCCESS;
}
/*
 * Verify that recovery of the node at index `node_id` read at least up to that node's lrp point.
 *
 * Logs the current recovery/read points, then returns OG_SUCCESS when the write point's lfn is
 * not behind ctrl->lrp_point.lfn. Otherwise the pending error is reset,
 * ERR_INVALID_RCV_END_POINT is raised and OG_ERROR is returned.
 */
static status_t dtc_recover_check_assign_nodeid(knl_session_t *session, uint32_t node_id)
{
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
reform_rcy_node_t *rcy_log_point = NULL;
dtc_node_ctrl_t *ctrl = NULL;
/* NOTE(review): the assertion accepts node_id == node_count while node_id is used as an index
   into rcy_log_points right below - confirm the array capacity or tighten the check. */
knl_panic(node_id <= dtc_rcy->node_count);
rcy_log_point = &dtc_rcy->rcy_log_points[node_id];
ctrl = dtc_get_ctrl(session, rcy_log_point->node_id);
OG_LOG_RUN_INF_LIMIT(LOG_PRINT_INTERVAL_SECOND_20,
"[DTC RCY] node:%u, recovery real end with file: %u, point: %u, lfn: %llu",
rcy_log_point->node_id, rcy_log_point->rcy_point.asn, rcy_log_point->rcy_point.block_id,
(uint64)rcy_log_point->rcy_point.lfn);
OG_LOG_RUN_INF_LIMIT(LOG_PRINT_INTERVAL_SECOND_20,
"[DTC RCY] node:%u, current lfn: %llu, rcy point lfn: %llu, lrp point lfn: %llu",
rcy_log_point->node_id, (uint64)rcy_log_point->rcy_point.lfn, (uint64)ctrl->rcy_point.lfn,
(uint64)(uint64)ctrl->lrp_point.lfn);
OG_LOG_RUN_INF_LIMIT(LOG_PRINT_INTERVAL_SECOND_20,
"[DTC RCY] node:%u, recovery real end with file: %u, read node log proc point: %u, lfn: %llu",
rcy_log_point->node_id, rcy_log_point->rcy_write_point.asn,
rcy_log_point->rcy_write_point.block_id, (uint64)rcy_log_point->rcy_write_point.lfn);
/* Reading reached (or passed) the last replay point recorded for the node: recovery end is valid. */
if (rcy_log_point->rcy_write_point.lfn >= ctrl->lrp_point.lfn) {
return OG_SUCCESS;
}
cm_reset_error();
/* NOTE(review): the same rcy_point asn/block_id pair is passed twice to the error. */
OG_THROW_ERROR(ERR_INVALID_RCV_END_POINT, rcy_log_point->rcy_point.asn, rcy_log_point->rcy_point.block_id,
rcy_log_point->rcy_point.asn, rcy_log_point->rcy_point.block_id);
return OG_ERROR;
}
/*
 * Decide whether recovery of node `idx` can be considered finished without reading any log.
 *
 * This is the case only when DBStor is enabled, the database is not recovering for a restore,
 * and the node's ulog holds no data. In that case the node is marked recover_done and OG_TRUE
 * is returned; otherwise nothing is changed and OG_FALSE is returned.
 */
bool8 dtc_rcy_check_recovery_is_done(knl_session_t *session, uint32 idx)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *node = &ctx->rcy_nodes[idx];

    if (cm_dbs_is_enable_dbs() != OG_TRUE) {
        return OG_FALSE;
    }
    if (session->kernel->db.recover_for_restore != OG_FALSE) {
        return OG_FALSE;
    }
    if (node->ulog_exist_data != OG_FALSE) {
        return OG_FALSE;
    }

    node->recover_done = OG_TRUE;
    return OG_TRUE;
}
/*
 * On a standby that is currently replaying, advance the node's lrp point in the
 * control info to the last complete batch found in the buffer that was just read.
 *
 * session:   kernel session.
 * idx:       recovery node index (also used to fetch and save the node ctrl).
 * size_read: number of bytes of redo available in the current write buffer.
 *
 * Does nothing on a primary, when lrpl is not replaying, or when the buffer holds no
 * complete batch. The buffer read position is used as a scan cursor and is reset to 0
 * once a batch has been found. Aborts the process if saving the ctrl fails.
 */
static void dtc_standby_update_lrp(knl_session_t *session, uint32 idx, uint32 size_read)
{
    OG_LOG_DEBUG_INF("[DTC RCY] dtc start standby update lrp idx=%u size_read=%u", idx, size_read);
    if (DB_IS_PRIMARY(&session->kernel->db)) {
        OG_LOG_DEBUG_INF("[DTC RCY] dtc standby update lrp idx=%u size_read=%u DB_IS_PRIMARY", idx, size_read);
        return;
    }
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
    if (lrpl_ctx->is_replaying == OG_FALSE) {
        OG_LOG_DEBUG_INF("[DTC RCY] dtc standby update lrp idx=%u size_read=%u is not replaying ", idx, size_read);
        return;
    }

    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[idx];
    log_batch_t *batch = NULL;
    log_batch_t *tmp_batch = dtc_rcy_get_curr_batch(dtc_rcy, idx, rcy_node->read_buf_write_index);
    uint32 left_size;

    /* Walk the buffer batch by batch; 'batch' ends up at the last complete one. */
    for (;;) {
        left_size = size_read - rcy_node->read_pos[rcy_node->read_buf_write_index];
        OG_LOG_DEBUG_INF("[DTC RCY] dtc standby update lrp idx=%u size_read=%u process batch left_size=%u", idx,
                         size_read, left_size);
        /*
         * A zero space_size would leave read_pos unchanged and spin here forever,
         * so treat it as the end of the usable data.
         */
        if (left_size < sizeof(log_batch_t) || tmp_batch->space_size == 0 || left_size < tmp_batch->space_size) {
            break;
        }
        batch = dtc_rcy_get_curr_batch(dtc_rcy, idx, rcy_node->read_buf_write_index);
        rcy_node->read_pos[rcy_node->read_buf_write_index] += batch->space_size;
        tmp_batch = dtc_rcy_get_curr_batch(dtc_rcy, idx, rcy_node->read_buf_write_index);
    }

    if (batch == NULL) {
        OG_LOG_DEBUG_INF("[DTC RCY] dtc standby update lrp idx=%u size_read=%u batch==null", idx, size_read);
        return;
    }

    /* The scan only borrowed read_pos; rewind it so the buffer is consumed from its start. */
    rcy_node->read_pos[rcy_node->read_buf_write_index] = 0;

    dtc_node_ctrl_t *ctrl = dtc_get_ctrl(session, idx);
    OG_LOG_DEBUG_INF("[DTC RCY] ctrl lsn %llu lfn %llu ,log end lsn %llu, lfn %llu", ctrl->lsn, ctrl->lfn,
                     batch->head.point.lsn, (uint64)batch->head.point.lfn);
    if (ctrl->lrp_point.lsn < batch->head.point.lsn) {
        ctrl->lrp_point = batch->head.point;
        ctrl->scn = DB_CURR_SCN(session);
        ctrl->lsn = batch->head.point.lsn;
        ctrl->lfn = (uint64)batch->head.point.lfn;
        if (dtc_save_ctrl(session, idx) != OG_SUCCESS) {
            CM_ABORT(0, "ABORT INFO: save core control file failed when update standby cluster ctrl");
        }
    }
    return;
}
/*
 * Read the next chunk of redo for the recovery node at idx into its current
 * write buffer.
 *
 * The buffer's read/write positions are reset first. On a primary the call
 * returns immediately when the node is already done (or can be declared done by
 * dtc_rcy_check_recovery_is_done). Otherwise redo is read from the online log
 * file when dtc_rcy_get_logfile_by_node returns a file id, or from the archived
 * log when it returns OG_INVALID_ID32.
 *
 * size_read: receives the number of bytes read.
 *
 * Returns OG_ERROR when the read status is OG_ERROR; an archived-log read failure is
 * turned into OG_SUCCESS when dtc_recover_check_assign_nodeid succeeds.
 */
status_t dtc_rcy_read_node_log(knl_session_t *session, uint32 idx, uint32 *size_read)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *node = &ogx->rcy_nodes[idx];
    reform_rcy_node_t *log_point = &ogx->rcy_log_points[idx];
    status_t ret;
    uint64_t io_begin;

    node->read_pos[node->read_buf_write_index] = 0;
    node->write_pos[node->read_buf_write_index] = 0;

    if (DB_IS_PRIMARY(&session->kernel->db) &&
        (node->recover_done || dtc_rcy_check_recovery_is_done(session, idx))) {
        return OG_SUCCESS;
    }

    uint32 file_id = dtc_rcy_get_logfile_by_node(session, idx);
    if (file_id == OG_INVALID_ID32) {
        /* No online file covers the current point: read from the archive. */
        ret = dtc_rcy_read_archived_log(session, idx, size_read);
        OG_LOG_DEBUG_INF("[DTC RCY] dtc rcy read archived redo log of crashed node=%u, logfile_id=%u, size_read=%u",
                         node->node_id, file_id, *size_read);
        if ((ret != OG_SUCCESS) && (dtc_recover_check_assign_nodeid(session, idx) == OG_SUCCESS)) {
            return OG_SUCCESS;
        }
    } else {
        oGRAC_record_io_stat_begin(IO_RECORD_EVENT_RECOVERY_READ_ONLINE_LOG, &io_begin);
        ret = dtc_rcy_read_online_log(session, file_id, idx, size_read);
        log_unlatch_file(session, file_id);
        oGRAC_record_io_stat_end(IO_RECORD_EVENT_RECOVERY_READ_ONLINE_LOG, &io_begin);

        if (DB_IS_PRIMARY(&session->kernel->db) || (*size_read != 0)) {
            dtc_standby_update_lrp(session, idx, *size_read);
            if (dtc_stats_lsn_is_changed(&(node->lsn_records.read_log_lsn_record),
                                         log_point->rcy_write_point.lsn)) {
                OG_LOG_RUN_INF("[DTC RCY] finish read online redo log of crashed node=%u, logfile_id=%u, size_read=%u",
                               node->node_id, file_id, *size_read);
            }
        } else {
            /* Standby with nothing read: only leave a debug trace. */
            OG_LOG_DEBUG_INF("[DTC RCY] finish read online redo log of crashed node=%u, logfile_id=%u, size_read=%u",
                             node->node_id, file_id, *size_read);
        }
    }

    if (ret == OG_ERROR) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to load redo log of crashed node=%u", node->node_id);
        return OG_ERROR;
    }

    node->write_pos[node->read_buf_write_index] += *size_read;
    if (ogx->rcy_stat.last_rcy_set_num <= 0) {
        ogx->rcy_stat.last_rcy_log_size += *size_read;
    }
    return OG_SUCCESS;
}
/*
 * Initialise the log set of every recovery node and, when dbstor is enabled,
 * record per node whether its ulog holds data.
 *
 * Returns OG_ERROR as soon as dtc_init_node_logset fails for a node.
 */
static status_t dtc_read_all_logs(knl_session_t *session)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    uint32 node_idx = 0;

    while (node_idx < ogx->node_count) {
        if (dtc_init_node_logset(session, node_idx) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to init logset for crashed node=%u", ogx->rcy_nodes[node_idx].node_id);
            return OG_ERROR;
        }

        if (cm_dbs_is_enable_dbs() == OG_TRUE) {
            ogx->rcy_nodes[node_idx].ulog_exist_data = dtc_rcy_check_log_is_exist(session, node_idx);
        }
        node_idx++;
    }

    return OG_SUCCESS;
}
/*
 * Compare the end point reached by the analysis phase with the one reached by the
 * recovery phase.
 *
 * Returns OG_SUCCESS only when asn, block_id, lfn, lsn and rst_id are all equal;
 * OG_ERROR otherwise.
 */
status_t dtc_rcy_verify_analysis_and_recovery_log_point(log_point_t analysis_read_end_point,
                                                        log_point_t recovery_read_end_point)
{
    if (analysis_read_end_point.asn == recovery_read_end_point.asn &&
        analysis_read_end_point.block_id == recovery_read_end_point.block_id &&
        analysis_read_end_point.lfn == recovery_read_end_point.lfn &&
        analysis_read_end_point.lsn == recovery_read_end_point.lsn &&
        analysis_read_end_point.rst_id == recovery_read_end_point.rst_id) {
        return OG_SUCCESS;
    }
    return OG_ERROR;
}
/*
 * Choose the ddl pitr lsn for a node: the non-zero core.ddl_pitr_lsn from the
 * control file when recovering for restore, otherwise the lsn of the current batch.
 */
static uint64 dtc_rcy_get_ddl_pitr_lsn(knl_session_t *session, uint64 curr_batch_lsn)
{
    if (!session->kernel->db.recover_for_restore) {
        return curr_batch_lsn;
    }
    if (session->kernel->db.ctrl.core.ddl_pitr_lsn == 0) {
        return curr_batch_lsn;
    }
    return session->kernel->db.ctrl.core.ddl_pitr_lsn;
}
/*
 * Drop the rest of the current read buffer of a node and move on to the next
 * available batch.
 *
 * The read position is set to the write position, the recovery point is moved to
 * cur_block_id + 1 (and cur_lsn + 1 when dbstor is enabled), then dtc_update_batch
 * is called. *batch is refreshed only when the node is not marked recover_done
 * afterwards.
 *
 * Returns the dtc_update_batch error, or OG_SUCCESS.
 */
status_t dtc_find_next_batch(knl_session_t *session, log_batch_t **batch, uint32 cur_block_id, uint64 cur_lsn,
                             uint32 node_id)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    reform_rcy_node_t *log_point = &ogx->rcy_log_points[node_id];
    dtc_rcy_node_t *node = &ogx->rcy_nodes[node_id];

    log_point->rcy_point.block_id = cur_block_id + 1;
    if (cm_dbs_is_enable_dbs() == OG_TRUE) {
        log_point->rcy_point.lsn = cur_lsn + 1;
    }
    node->read_pos[node->read_buf_read_index] = node->write_pos[node->read_buf_read_index];

    OG_RETURN_IFERR(dtc_update_batch(session, node_id));

    if (node->recover_done != OG_TRUE) {
        *batch = dtc_rcy_get_curr_batch(ogx, node_id, node->read_buf_read_index);
    } else {
        OG_LOG_RUN_INF("recovery done");
    }
    return OG_SUCCESS;
}
/*
 * Step over the batch *batch points to, using its space_size.
 *
 * Advances the node's read position and recovery block_id by the batch size (and
 * sets the recovery lsn to the batch lsn when dbstor is enabled), then calls
 * dtc_update_batch. *batch is refreshed only when the node is not marked
 * recover_done afterwards.
 *
 * Returns the dtc_update_batch error, or OG_SUCCESS.
 */
status_t dtc_skip_batch(knl_session_t *session, log_batch_t **batch, uint32 node_id)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *node = &ogx->rcy_nodes[node_id];
    reform_rcy_node_t *log_point = &ogx->rcy_log_points[node_id];
    log_batch_t *skipped = *batch;

    node->read_pos[node->read_buf_read_index] += skipped->space_size;
    log_point->rcy_point.block_id += skipped->space_size / node->blk_size;
    if (cm_dbs_is_enable_dbs() == OG_TRUE) {
        log_point->rcy_point.lsn = skipped->lsn;
    }

    OG_RETURN_IFERR(dtc_update_batch(session, node_id));

    if (node->recover_done != OG_TRUE) {
        *batch = dtc_rcy_get_curr_batch(ogx, node_id, node->read_buf_read_index);
    } else {
        OG_LOG_RUN_INF("recovery done");
    }
    return OG_SUCCESS;
}
/*
 * Skip damaged batches of a node until a batch is found that both validates and
 * passes the checksum, or until the node is marked recover_done.
 *
 * session: kernel session.
 * batch:   in/out; on entry the damaged batch, on success the next usable batch
 *          (left untouched when the node becomes recover_done).
 * node_id: index of the recovery node.
 *
 * A batch whose header validates is skipped by its own space_size (dtc_skip_batch);
 * otherwise the remainder of the buffer is dropped and the search continues from the
 * next block id / lsn (dtc_find_next_batch).
 *
 * Returns the first error from dtc_skip_batch / dtc_find_next_batch, else OG_SUCCESS.
 */
status_t dtc_skip_damage_batch(knl_session_t *session, log_batch_t **batch, uint32 node_id)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[node_id];
    reform_rcy_node_t *rcy_log_point = &dtc_rcy->rcy_log_points[node_id];
    /*
     * NOTE(review): these are captured once and only advanced on the
     * dtc_find_next_batch path; after a dtc_skip_batch they no longer match
     * rcy_point - confirm this is intended.
     */
    uint32 cur_block_id = rcy_log_point->rcy_point.block_id;
    uint64 cur_lsn = rcy_log_point->rcy_point.lsn;

    do {
        if (dtc_rcy_validate_batch(*batch)) {
            /* Header is usable, so the batch length can be trusted for the skip. */
            OG_LOG_RUN_ERR("[DTC RCY] failed to verify log batch checksum of instance %u with rcy point"
                           " [%u-%u/%u/%llu], batch_lsn=%llu",
                           rcy_log_point->node_id, rcy_log_point->rcy_point.rst_id, rcy_log_point->rcy_point.asn,
                           rcy_log_point->rcy_point.block_id, (uint64)rcy_log_point->rcy_point.lfn, (*batch)->lsn);
            OG_RETURN_IFERR(dtc_skip_batch(session, batch, node_id));
        } else {
            OG_LOG_RUN_ERR("[DTC RCY] batch is invalid, find next batch by block_id[%u] lsn[%llu]", cur_block_id,
                           cur_lsn);
            OG_RETURN_IFERR(dtc_find_next_batch(session, batch, cur_block_id, cur_lsn, node_id));
            cur_block_id++;
            cur_lsn++;
        }
        if (rcy_node->recover_done == OG_TRUE) {
            OG_LOG_RUN_INF("recovery done");
            return OG_SUCCESS;
        }
    } while (!dtc_rcy_validate_batch(*batch) || (rcy_verify_checksum(session, *batch) != OG_SUCCESS));

    OG_LOG_RUN_INF("find new batch and continue");
    return OG_SUCCESS;
}
/*
 * Report whether every node listed in the core control info has a recovery point
 * lfn that is not below that node's lrp point lfn.
 *
 * Returns OG_FALSE at the first node that is still behind, OG_TRUE otherwise.
 */
static bool32 dtc_standby_rcy_end(knl_session_t *session)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    uint32 node_id = 0;

    while (node_id < session->kernel->db.ctrl.core.node_count) {
        reform_rcy_node_t *log_point = &ogx->rcy_log_points[node_id];
        dtc_node_ctrl_t *node_ctrl = dtc_get_ctrl(session, node_id);

        if (log_point->rcy_point.lfn < node_ctrl->lrp_point.lfn) {
            return OG_FALSE;
        }
        node_id++;
    }
    return OG_TRUE;
}
status_t dtc_update_batch(knl_session_t *session, uint32 node_id)
{
uint32 read_buf_size = g_instance->kernel.attr.rcy_node_read_buf_size;
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[node_id];
log_batch_t *batch = NULL;
uint32 left_size;
if (!DB_IS_PRIMARY(&session->kernel->db) && (DB_NOT_READY(session) || !dtc_rcy->full_recovery) &&
dtc_standby_rcy_end(session)) {
rcy_node->recover_done = OG_TRUE;
rcy_node->read_size[rcy_node->read_buf_read_index] = OG_INVALID_ID32;
rcy_node->read_buf_ready[rcy_node->read_buf_read_index] = OG_FALSE;
if (dtc_rcy->phase == PHASE_ANALYSIS) {
OG_LOG_RUN_INF("[DTC RCY] analysis read end point[asn(%u)-block_id(%u)-rst_id(%llu)-lfn(%llu)-lsn(%llu)]",
rcy_node->analysis_read_end_point.asn, rcy_node->analysis_read_end_point.block_id,
(uint64)rcy_node->analysis_read_end_point.rst_id,
(uint64)rcy_node->analysis_read_end_point.lfn, rcy_node->analysis_read_end_point.lsn);
}
if (dtc_rcy->phase == PHASE_RECOVERY) {
OG_LOG_RUN_INF("[DTC RCY] recovery read end point[asn(%u)-block_id(%u)-rst_id(%llu)-lfn(%llu)-lsn(%llu)]",
rcy_node->recovery_read_end_point.asn, rcy_node->recovery_read_end_point.block_id,
(uint64)rcy_node->recovery_read_end_point.rst_id,
(uint64)rcy_node->recovery_read_end_point.lfn, rcy_node->recovery_read_end_point.lsn);
}
return OG_SUCCESS;
}
wait_for_read_buf_finish_read(node_id);
if (rcy_node->read_size[rcy_node->read_buf_read_index] == 0) {
check_node_read_end(node_id);
rcy_node->read_size[rcy_node->read_buf_read_index] = OG_INVALID_ID32;
rcy_node->read_buf_ready[rcy_node->read_buf_read_index] = OG_FALSE;
OG_LOG_DEBUG_INF("dtc update batch rcy_node->read_size[rcy_node->read_buf_read_index] == 0 node_id=%u",
node_id);
return OG_SUCCESS;
}
batch = dtc_rcy_get_curr_batch(dtc_rcy, node_id, rcy_node->read_buf_read_index);
left_size = rcy_node->write_pos[rcy_node->read_buf_read_index] - rcy_node->read_pos[rcy_node->read_buf_read_index];
if (left_size < sizeof(log_batch_t) || left_size < batch->space_size) {
rcy_node->read_size[rcy_node->read_buf_read_index] = OG_INVALID_ID32;
rcy_node->read_buf_ready[rcy_node->read_buf_read_index] = OG_FALSE;
rcy_node->read_buf_read_index = (rcy_node->read_buf_read_index + 1) % read_buf_size;
OG_LOG_DEBUG_INF("[DTC RCY] dtc update batch left size < sizeof(log_batch_t)"
" node_id = %u read_buf_read_index = %u",
rcy_node->node_id, rcy_node->read_buf_read_index);
wait_for_read_buf_finish_read(node_id);
check_node_read_end(node_id);
}
return OG_SUCCESS;
}
/*
 * Scan the batches just read into a node's write buffer and move the node's
 * rcy_write_point to the last valid batch found.
 *
 * idx:       recovery node index.
 * size_read: in/out; number of bytes in the buffer. Set to 0 when the very first
 *            batch does not validate.
 *
 * rcy_write_point.block_id is advanced by every batch that is walked over, and
 * rcy_write_point.lsn / lfn are taken from the last batch walked. The buffer read
 * position is used as a scan cursor and restored before returning.
 */
static void find_max_lsn_and_move_point(uint32 idx, uint32 *size_read)
{
    OG_LOG_DEBUG_INF("[DTC RCY] start find max lsn and move point idx=%u size_read=%u", idx, *size_read);
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[idx];
    log_batch_t *batch = NULL;
    log_batch_t *tmp_batch = dtc_rcy_get_curr_batch(dtc_rcy, idx, rcy_node->read_buf_write_index);
    uint32 left_size = *size_read - rcy_node->read_pos[rcy_node->read_buf_write_index];

    if (left_size < sizeof(log_batch_t) || tmp_batch == NULL || left_size < tmp_batch->space_size) {
        OG_LOG_RUN_INF("[DTC RCY] before validate batch, find max lsn and move point left_size"
                       " < sizeof(log_batch_t) || left_size < tmp_batch->space_size");
        return;
    }
    if (dtc_rcy_validate_batch(tmp_batch) == OG_FALSE) {
        OG_LOG_RUN_ERR("[DTC RCY] find max lsn and move point batch is invalidate, read_size=%u", *size_read);
        *size_read = 0;
        return;
    }

    reform_rcy_node_t *rcy_log_point = &dtc_rcy->rcy_log_points[idx];
    uint32 old_read_pos = rcy_node->read_pos[rcy_node->read_buf_write_index];

    for (;;) {
        left_size = *size_read - rcy_node->read_pos[rcy_node->read_buf_write_index];
        if (left_size < sizeof(log_batch_t) || tmp_batch == NULL || left_size < tmp_batch->space_size) {
            OG_LOG_RUN_INF("[DTC RCY] after validate batch, find max lsn and move point left_size "
                           "< sizeof(log_batch_t) || left_size < tmp_batch->space_size");
            break;
        }
        /* Account for the batch at the cursor, then peek at the one that follows it. */
        batch = dtc_rcy_get_curr_batch(dtc_rcy, idx, rcy_node->read_buf_write_index);
        rcy_log_point->rcy_write_point.block_id += batch->space_size / rcy_node->blk_size;
        rcy_node->read_pos[rcy_node->read_buf_write_index] += batch->space_size;
        left_size = *size_read - rcy_node->read_pos[rcy_node->read_buf_write_index];
        tmp_batch = dtc_rcy_get_curr_batch(dtc_rcy, idx, rcy_node->read_buf_write_index);
        if (left_size < sizeof(log_batch_t) || tmp_batch == NULL || left_size < tmp_batch->space_size) {
            OG_LOG_DEBUG_INF("[DTC RCY] get next batch, find max lsn and move point left_size "
                             "< sizeof(log_batch_t) || left_size < tmp_batch->space_size");
            break;
        }
        if (dtc_rcy_validate_batch(tmp_batch) == OG_FALSE) {
            OG_LOG_RUN_ERR("[DTC RCY] find max lsn and move point batch is invalidate, read_size=%u", *size_read);
            break;
        }
    }

    if (batch == NULL) {
        return;
    }

    /* The scan only borrowed read_pos; put it back for the consumer. */
    rcy_node->read_pos[rcy_node->read_buf_write_index] = old_read_pos;
    rcy_log_point->rcy_write_point.lsn = batch->lsn;
    rcy_log_point->rcy_write_point.lfn = batch->head.point.lfn;

    /* Report the write point this function just moved. */
    OG_LOG_DEBUG_INF("[DTC RCY] finish find max lsn and move point idx=%u size_read=%u lsn=%llu block_id=%u", idx,
                     *size_read, rcy_log_point->rcy_write_point.lsn, rcy_log_point->rcy_write_point.block_id);
}
/*
 * Read the next chunk of redo for a node, switching to the next log file when the
 * current one yields nothing.
 *
 * dtc_rcy:   recovery context.
 * session:   kernel session.
 * node_id:   index of the recovery node.
 * read_size: receives the number of bytes read.
 *
 * When the first read returns 0 bytes, dtc_rcy_next_file is asked for another file;
 * if it reports one, the read is retried once. The resulting not_finished flag is
 * stored for the current write buffer. When data was read, the node's write point is
 * moved by find_max_lsn_and_move_point.
 *
 * A failed read aborts the process through CM_ABORT; the OG_ERROR return after it is
 * kept for the case where the abort macro returns.
 */
static status_t dtc_read_node_log(dtc_rcy_context_t *dtc_rcy, knl_session_t *session, uint32 node_id, uint32 *read_size)
{
    dtc_rcy_node_t *rcy_node = &dtc_rcy->rcy_nodes[node_id];

    if (dtc_rcy_read_node_log(session, node_id, read_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to load redo log of crashed node=%u", rcy_node->node_id);
        CM_ABORT(0, "ABORT INFO:dtc read node log failed");
        return OG_ERROR;
    }

    if (*read_size == 0) {
        bool32 not_finished = OG_TRUE;
        dtc_rcy_next_file(session, node_id, &not_finished);
        if (not_finished) {
            if (dtc_rcy_read_node_log(session, node_id, read_size) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[DTC RCY] failed to load redo log of instance=%u", rcy_node->node_id);
                CM_ABORT(0, "ABORT INFO:dtc read node log failed");
                return OG_ERROR;
            }
        }
        rcy_node->not_finished[rcy_node->read_buf_write_index] = not_finished;
    }

    if (*read_size != 0) {
        find_max_lsn_and_move_point(node_id, read_size);
    }
    return OG_SUCCESS;
}
/*
 * Decide whether the caller should stop and reload redo, toggling
 * lrpl_ctx->redo_is_reload as a one-shot flag.
 *
 * OG_FALSE is returned (and the flag cleared) on a primary, when the database is not
 * ready, when not in full recovery, for node_id 0, when a batch was already loaded,
 * or when the inspected node's read position is not 0. Otherwise the flag is
 * flipped: if it was set it is cleared and OG_FALSE is returned; if it was clear it
 * is set and OG_TRUE is returned.
 */
bool32 dtc_log_need_reload(knl_session_t *session, uint32 node_id, bool32 batch_loaded)
{
    lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;

    if (DB_IS_PRIMARY(&session->kernel->db) || DB_NOT_READY(session) || !ogx->full_recovery || node_id == 0 ||
        batch_loaded == OG_TRUE) {
        lrpl->redo_is_reload = OG_FALSE;
        return OG_FALSE;
    }

    /* NOTE(review): index 1 is hard-coded here rather than node_id - confirm this is intended. */
    dtc_rcy_node_t *node = &ogx->rcy_nodes[1];
    if (node->read_pos[node->read_buf_read_index] != 0) {
        lrpl->redo_is_reload = OG_FALSE;
        return OG_FALSE;
    }

    OG_LOG_DEBUG_INF("[DTC LRPL] lrpl_ctx->redo_is_reload = %u, node_id = %u", lrpl->redo_is_reload, node_id);
    if (!lrpl->redo_is_reload) {
        lrpl->redo_is_reload = OG_TRUE;
        OG_LOG_DEBUG_INF("[DTC LRPL] redo need reload");
        return OG_TRUE;
    }

    lrpl->redo_is_reload = OG_FALSE;
    OG_LOG_DEBUG_INF("[DTC LRPL] redo no need reload");
    return OG_FALSE;
}
/*
 * Pick the next redo batch to process across all recovery nodes.
 *
 * Every node that is not recover_done and whose read buffer is ready contributes
 * the batch at its read position; the batch with the smallest lsn wins. The scan
 * stops early when dtc_log_need_reload asks for a reload. When a batch is
 * selected, the owning node's read position, file length and recovery point are
 * advanced past it and the phase-specific read end point is recorded.
 *
 * session:       kernel session.
 * batch_out:     receives the selected batch, or NULL when none was selected.
 * curr_node_idx: receives the index of the node owning the selected batch.
 *
 * Returns OG_SUCCESS, the error of dtc_update_batch / dtc_skip_damage_batch, or
 * OG_ERROR on a checksum failure outside MAXFIX mode.
 */
static status_t dtc_rcy_fetch_log_batch(knl_session_t *session, log_batch_t **batch_out, uint32 *curr_node_idx)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    log_batch_t *batch = NULL;
    bool32 batch_loaded = OG_FALSE;
    dtc_rcy_node_t *rcy_node = NULL;
    reform_rcy_node_t *rcy_log_point = NULL;
    uint64 curr_batch_lsn = OG_INVALID_ID64;
    uint8 curr_node = 0; /* only meaningful once batch_loaded is set */

    *batch_out = NULL;
    for (uint32 i = 0; i < dtc_rcy->node_count; i++) {
        dtc_standby_reset_recovery_stat(session);
        rcy_node = &dtc_rcy->rcy_nodes[i];
        rcy_log_point = &dtc_rcy->rcy_log_points[i];
        if (rcy_node->recover_done) {
            OG_LOG_DEBUG_INF("[DTC RCY] dtc fetch log recover done node_id = %u", rcy_node->node_id);
            /* In partial recovery the replay pass must stop where the analysis pass stopped. */
            if (!dtc_rcy->full_recovery && dtc_rcy->phase == PHASE_RECOVERY &&
                dtc_rcy_verify_analysis_and_recovery_log_point(rcy_node->analysis_read_end_point,
                                                               rcy_node->recovery_read_end_point) != OG_SUCCESS) {
                knl_panic_log(
                    0,
                    "[DTC RCY] analysis read end point[asn(%u)-block_id(%u)-rst_id(%llu)-lfn(%llu)-lsn(%llu)] is not "
                    "equal recovery read end point[asn(%u)-block_id(%u)-rst_id(%llu)-lfn(%llu)-lsn(%llu)]",
                    rcy_node->analysis_read_end_point.asn, rcy_node->analysis_read_end_point.block_id,
                    (uint64)rcy_node->analysis_read_end_point.rst_id, (uint64)rcy_node->analysis_read_end_point.lfn,
                    rcy_node->analysis_read_end_point.lsn, rcy_node->recovery_read_end_point.asn,
                    rcy_node->recovery_read_end_point.block_id, (uint64)rcy_node->recovery_read_end_point.rst_id,
                    (uint64)rcy_node->recovery_read_end_point.lfn, rcy_node->recovery_read_end_point.lsn);
            }
            continue;
        }

        OG_RETURN_IFERR(dtc_update_batch(session, i));
        if (rcy_node->recover_done == OG_TRUE) {
            OG_LOG_DEBUG_INF("[DTC RCY] read node log proc node is done node_id = %u", i);
            continue;
        }
        if (rcy_node->read_buf_ready[rcy_node->read_buf_read_index] == OG_FALSE) {
            OG_LOG_DEBUG_INF("[DTC RCY] read node log proc node buf not ready node_id = %u", i);
            continue;
        }

        batch = dtc_rcy_get_curr_batch(dtc_rcy, i, rcy_node->read_buf_read_index);
        OG_LOG_DEBUG_INF(
            "[DTC RCY] fetch batch from instance %u point [%u-%u/%u/%llu],"
            " head lfn:%llu, batch writepos:%u, readpos:%u, space_size:%u, current lsn:%llu, start lsn:%llu",
            rcy_log_point->node_id, rcy_log_point->rcy_point.rst_id, rcy_log_point->rcy_point.asn,
            rcy_log_point->rcy_point.block_id, (uint64)rcy_log_point->rcy_point.lfn, (uint64)batch->head.point.lfn,
            rcy_node->write_pos[rcy_node->read_buf_read_index], rcy_node->read_pos[rcy_node->read_buf_read_index],
            batch->space_size, rcy_log_point->rcy_point.lsn, rcy_log_point->rcy_point_saved.lsn);

        uint32 left_size = rcy_node->write_pos[rcy_node->read_buf_read_index] -
                           rcy_node->read_pos[rcy_node->read_buf_read_index];
        if (left_size < sizeof(log_batch_t) || batch == NULL || left_size < batch->space_size) {
            OG_LOG_DEBUG_INF("[DTC RCY] recover fetch batch, find max lsn and move point left_size "
                             "< sizeof(log_batch_t) || left_size < tmp_batch->space_size");
            continue;
        }

        if (!dtc_rcy_validate_batch(batch)) {
            /* Outside MAXFIX-with-dbstor an invalid batch ends recovery of this node. */
            if (!(DB_IS_MAXFIX(session) && cm_dbs_is_enable_dbs())) {
                rcy_node->recover_done = OG_TRUE;
                OG_LOG_RUN_INF(
                    "[DTC RCY] Invalid batch from instance %u, recovery done with point [%u-%u/%u/%llu],"
                    " head lfn:%llu, batch writepos:%u, readpos:%u, space_size:%u, current lsn:%llu, start lsn:%llu",
                    rcy_log_point->node_id, rcy_log_point->rcy_point.rst_id, rcy_log_point->rcy_point.asn,
                    rcy_log_point->rcy_point.block_id, (uint64)rcy_log_point->rcy_point.lfn,
                    (uint64)batch->head.point.lfn, rcy_node->write_pos[rcy_node->read_buf_read_index],
                    rcy_node->read_pos[rcy_node->read_buf_read_index], batch->space_size, rcy_log_point->rcy_point.lsn,
                    rcy_log_point->rcy_point_saved.lsn);
                continue;
            }
        }

        if (!LFN_IS_CONTINUOUS(batch->head.point.lfn, rcy_log_point->rcy_point.lfn)) {
            if (DB_IS_MAXFIX(session)) {
                /* MAXFIX tolerates the gap: warn and keep going with this batch. */
                OG_LOG_RUN_WAR("[DTC RCY] damage log batch skipped,not continuous batch from instance %u, "
                               "recovery with point [%u-%u/%u/%llu/%llu],current point [%u-%u/%u/%llu/%llu]",
                               rcy_log_point->node_id, batch->head.point.rst_id, batch->head.point.asn,
                               batch->head.point.block_id, (uint64)batch->head.point.lfn, batch->head.point.lsn,
                               rcy_log_point->rcy_point.rst_id, rcy_log_point->rcy_point.asn,
                               rcy_log_point->rcy_point.block_id, (uint64)rcy_log_point->rcy_point.lfn,
                               rcy_log_point->rcy_point.lsn);
            } else {
                OG_LOG_RUN_INF("[DTC RCY] not continuous batch from instance %u, "
                               "recovery done with point [%u-%u/%u/%llu/%llu],current point [%u-%u/%u/%llu/%llu]",
                               rcy_log_point->node_id, batch->head.point.rst_id, batch->head.point.asn,
                               batch->head.point.block_id, (uint64)batch->head.point.lfn, batch->head.point.lsn,
                               rcy_log_point->rcy_point.rst_id, rcy_log_point->rcy_point.asn,
                               rcy_log_point->rcy_point.block_id, (uint64)rcy_log_point->rcy_point.lfn,
                               rcy_log_point->rcy_point.lsn);
                CM_ABORT_REASONABLE(!cm_dbs_is_enable_dbs() || session->kernel->db.recover_for_restore,
                                    "[DTC RCY] ABORT INFO: dbstor batch not continuous");
                rcy_node->recover_done = OG_TRUE;
                continue;
            }
        }

        if (rcy_verify_checksum(session, batch) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to verify log batch checksum of instance %u with rcy point"
                           " [%u-%u/%u/%llu], batch_lsn=%llu",
                           rcy_log_point->node_id, rcy_log_point->rcy_point.rst_id, rcy_log_point->rcy_point.asn,
                           rcy_log_point->rcy_point.block_id, (uint64)rcy_log_point->rcy_point.lfn, batch->lsn);
            if (DB_IS_MAXFIX(session)) {
                OG_RETURN_IFERR(dtc_skip_damage_batch(session, &batch, i));
                if (rcy_node->recover_done == OG_TRUE) {
                    continue;
                }
            } else {
                return OG_ERROR;
            }
        }

        if (dtc_log_need_reload(session, i, batch_loaded)) {
            break;
        }

        /* Keep the candidate with the smallest lsn seen so far. */
        if (batch->lsn < curr_batch_lsn) {
            *curr_node_idx = (uint8)i;
            curr_node = rcy_node->node_id;
            curr_batch_lsn = batch->lsn;
            batch_loaded = OG_TRUE;
            OG_LOG_DEBUG_INF(
                "[DTC RCY] finish fetch batch from instance %u, recovery point [%u-%u/%u/%llu],"
                " head lfn:%llu, batch writepos:%u, readpos:%u, space_size:%u, current lsn:%llu, start lsn:%llu",
                rcy_log_point->node_id, rcy_log_point->rcy_point.rst_id, rcy_log_point->rcy_point.asn,
                rcy_log_point->rcy_point.block_id, (uint64)rcy_log_point->rcy_point.lfn, (uint64)batch->head.point.lfn,
                rcy_node->write_pos[rcy_node->read_buf_read_index], rcy_node->read_pos[rcy_node->read_buf_read_index],
                batch->space_size, rcy_log_point->rcy_point.lsn, rcy_log_point->rcy_point_saved.lsn);
        }
    }

    if (batch_loaded) {
        rcy_node = &dtc_rcy->rcy_nodes[*curr_node_idx];
        rcy_log_point = &dtc_rcy->rcy_log_points[*curr_node_idx];
        *batch_out = dtc_rcy_get_curr_batch(dtc_rcy, *curr_node_idx, rcy_node->read_buf_read_index);
        dtc_print_batch(*batch_out, curr_node);
        dtc_rcy->curr_node_idx = *curr_node_idx;
        dtc_rcy->curr_node = curr_node;
        dtc_rcy->curr_batch_lsn = curr_batch_lsn;

        /* Advance the winning node past the selected batch. */
        rcy_log_point->lsn = curr_batch_lsn;
        rcy_log_point->rcy_point.lfn = (*batch_out)->head.point.lfn;
        rcy_log_point->rcy_point.block_id += (*batch_out)->space_size / rcy_node->blk_size;
        rcy_node->read_pos[rcy_node->read_buf_read_index] += (*batch_out)->space_size;
        rcy_node->curr_file_length += (*batch_out)->space_size;
        if (cm_dbs_is_enable_dbs() == OG_TRUE) {
            rcy_log_point->rcy_point.lsn = curr_batch_lsn;
        }
        OG_LOG_DEBUG_INF("[DTC RCY] fetch batch lfn=%llu lsn=%llu", (uint64)rcy_log_point->rcy_point.lfn,
                         rcy_log_point->rcy_point.lsn);

        /* Fix the node's ddl pitr lsn the first time a batch reaches pitr_lfn. */
        if ((*batch_out)->head.point.lfn >= rcy_node->pitr_lfn && rcy_node->ddl_lsn_pitr == OG_INVALID_ID64) {
            rcy_node->ddl_lsn_pitr = dtc_rcy_get_ddl_pitr_lsn(session, curr_batch_lsn);
            OG_LOG_RUN_INF("[DTC RCY] batch lfn %llu, pitr_lfn %llu, rcy ddl lsn pitr[core %llu/curr %llu], node id %u",
                           (uint64)(*batch_out)->head.point.lfn, rcy_node->pitr_lfn,
                           session->kernel->db.ctrl.core.ddl_pitr_lsn, rcy_node->ddl_lsn_pitr, *curr_node_idx);
        }

        if (dtc_rcy->phase == PHASE_ANALYSIS) {
            rcy_node->analysis_read_end_point = (*batch_out)->head.point;
        } else if (dtc_rcy->phase == PHASE_RECOVERY) {
            rcy_node->recovery_read_end_point = (*batch_out)->head.point;
        }
        OG_LOG_DEBUG_INF("[DTC RCY] Move log point to [%u-%u/%u/%llu] with read pos %u write pos %u for instance %u,"
                         " curr_batch_lsn=%llu",
                         rcy_log_point->rcy_point.rst_id, rcy_log_point->rcy_point.asn,
                         rcy_log_point->rcy_point.block_id, (uint64)rcy_log_point->rcy_point.lfn,
                         rcy_node->read_pos[rcy_node->read_buf_read_index],
                         rcy_node->write_pos[rcy_node->read_buf_read_index], rcy_node->node_id, curr_batch_lsn);
    }
    return OG_SUCCESS;
}
/* Return the ddl_lsn_pitr of the node currently selected by dtc_rcy->curr_node_idx. */
static uint64 dtc_rcy_get_ddl_lsn_pitr(void)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;

    return ogx->rcy_nodes[ogx->curr_node_idx].ddl_lsn_pitr;
}
/*
 * Format the time that corresponds to a batch scn as "YYYY-MM-DD HH24:MI:SS".
 *
 * time_str: output buffer; cm_time2text is given OG_MAX_TIME_STRLEN as its size.
 */
static void dtc_convert_scn_to_time(knl_session_t *session, uint64 batch_scn, char *time_str)
{
    text_t fmt_text = { 0 };
    text_t out_text = { 0 };
    timeval_t scn_tv = { 0 };

    cm_str2text("YYYY-MM-DD HH24:MI:SS", &fmt_text);
    out_text.str = time_str;
    out_text.len = 0;

    KNL_SCN_TO_TIME(batch_scn, &scn_tv, DB_INIT_TIME(session));
    time_t scn_time = cm_date2time(cm_timeval2date(scn_tv));

    cm_time2text(scn_time, &fmt_text, &out_text, OG_MAX_TIME_STRLEN);
}
/*
 * Process every log group of one batch according to the current phase.
 *
 * In PHASE_RECOVERY each group is replayed and the session lsn is updated, until
 * dtc_rcy_set_pitr_end_replay asks to stop. In any other phase each group is
 * analyzed, until dtc_rcy_check_is_end_restore_recovery asks to stop; in that case
 * the lsn of the batch's first group is stored as end_lsn_restore_recovery.
 *
 * Returns OG_ERROR when the batch has no group or a group fails analysis,
 * OG_SUCCESS otherwise.
 */
status_t dtc_rcy_process_batch(knl_session_t *session, log_batch_t *batch)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    log_context_t *redo = &session->kernel->redo_ctx;
    log_cursor_t cursor;
    log_group_t *grp = NULL;

    rcy_init_log_cursor(&cursor, batch);
    grp = log_fetch_group(redo, &cursor);
    if (grp == NULL) {
        OG_LOG_RUN_ERR("[DTC RCY] the group is NULL.");
        return OG_ERROR;
    }

    uint64 first_group_lsn = grp->lsn;
    for (; grp != NULL; grp = log_fetch_group(redo, &cursor)) {
        if (dtc_rcy->phase != PHASE_RECOVERY) {
            /* Analysis pass. */
            if (dtc_rcy_analyze_group(session, grp) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[DTC RCY] failed to analyze redo log group, lsn %llu, rmid=%u", grp->lsn,
                               grp->rmid);
                return OG_ERROR;
            }
            if (!dtc_rcy_check_is_end_restore_recovery()) {
                continue;
            }

            OG_LOG_RUN_INF("[DTC RCY] pcn is invalide, lsn=%llu, rmid=%u, batch_start_lsn=%llu, batch scn=%llu",
                           grp->lsn, grp->rmid, first_group_lsn, batch->scn);
            dtc_rcy->end_lsn_restore_recovery = first_group_lsn;
            uint64 pitr_scn = session->kernel->rcy_ctx.max_scn;
            if (pitr_scn != OG_INVALID_ID64 && batch->scn < pitr_scn) {
                char time_str[OG_MAX_TIME_STRLEN] = { 0 };
                dtc_convert_scn_to_time(session, batch->scn, time_str);
                OG_LOG_RUN_WAR("[DTC RCY] the end replay batch scn %llu is smaller than pitr scn %llu, "
                               "replay batch end time: %s",
                               batch->scn, pitr_scn, time_str);
            }
            break;
        }

        /* Replay pass. */
        if (dtc_rcy_set_pitr_end_replay(session->kernel->db.recover_for_restore, grp->lsn)) {
            OG_LOG_RUN_INF("[DTC RCY] pcn is invalide, lsn=%llu, rmid=%u, batch_start_lsn=%llu", grp->lsn,
                           grp->rmid, first_group_lsn);
            break;
        }
        session->ddl_lsn_pitr = dtc_rcy_get_ddl_lsn_pitr();
        rcy_replay_group(session, redo, grp);
        OG_LOG_DEBUG_INF("[DTC RCY] before redo replay log group, lsn=%llu, rmid=%u, session->kernel->lsn=%llu",
                         grp->lsn, grp->rmid, session->kernel->lsn);
        dtc_update_lsn(session, grp->lsn);
        OG_LOG_DEBUG_INF("[DTC RCY] after redo replay log group, lsn=%llu, rmid=%u, session->kernel->lsn=%llu",
                         grp->lsn, grp->rmid, session->kernel->lsn);
    }

    OG_LOG_DEBUG_INF("[DTC RCY] Log batch lfn=%llu, lsn=%llu, point [%u-%u/%u] has been processed for instance=%u",
                     (uint64)batch->head.point.lfn, batch->lsn, batch->head.point.rst_id, batch->head.point.asn,
                     batch->head.point.block_id, dtc_rcy->curr_node);
    return OG_SUCCESS;
}
/*
 * Verify that every recovery node reached at least its lrp point.
 *
 * All nodes are checked and logged; for each node whose rcy_point.lfn is below
 * ctrl->lrp_point.lfn, ERR_INVALID_RCV_END_POINT is thrown.
 *
 * Returns OG_ERROR when at least one node is behind, OG_SUCCESS otherwise.
 */
static status_t dtc_recover_check(knl_session_t *session)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    status_t result = OG_SUCCESS;

    for (uint32 idx = 0; idx < ogx->node_count; idx++) {
        reform_rcy_node_t *point = &ogx->rcy_log_points[idx];
        dtc_node_ctrl_t *node_ctrl = dtc_get_ctrl(session, point->node_id);

        OG_LOG_RUN_INF("[DTC RCY] node:%u, recovery real end with file:%u,point:%u,lfn:%llu", point->node_id,
                       point->rcy_point.asn, point->rcy_point.block_id, (uint64)point->rcy_point.lfn);
        OG_LOG_RUN_INF("[DTC RCY] node:%u, current lfn %llu, rcy point lfn %llu, lrp point lfn %llu",
                       point->node_id, (uint64)point->rcy_point.lfn, (uint64)node_ctrl->rcy_point.lfn,
                       (uint64)node_ctrl->lrp_point.lfn);

        if (point->rcy_point.lfn < node_ctrl->lrp_point.lfn) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to check dtc recovery rcy point");
            cm_reset_error();
            OG_THROW_ERROR(ERR_INVALID_RCV_END_POINT, point->rcy_point.asn, point->rcy_point.block_id,
                           node_ctrl->lrp_point.asn, node_ctrl->lrp_point.block_id);
            result = OG_ERROR;
        }
    }
    return result;
}
/*
 * Copy a node's recovery point into its control info and save it.
 *
 * rcy_point, lrp_point, consistent_lfn, lsn and lfn of the node ctrl are all set
 * from rcy_log_point. Outside MAXFIX mode the function panics when the ctrl's
 * existing rcy point is ahead of the new one.
 *
 * Returns OG_ERROR when dtc_save_ctrl fails, OG_SUCCESS otherwise.
 */
static status_t dtc_rcy_update_node_info(knl_session_t *session, reform_rcy_node_t *rcy_log_point)
{
    dtc_node_ctrl_t *node_ctrl = dtc_get_ctrl(session, rcy_log_point->node_id);

    knl_panic(DB_IS_MAXFIX(session) || log_cmp_point(&node_ctrl->rcy_point, &rcy_log_point->rcy_point) <= 0);
    knl_panic(DB_IS_MAXFIX(session) || node_ctrl->rcy_point.lfn <= rcy_log_point->rcy_point.lfn);

    node_ctrl->rcy_point = rcy_log_point->rcy_point;
    node_ctrl->lrp_point = rcy_log_point->rcy_point;
    node_ctrl->consistent_lfn = rcy_log_point->rcy_point.lfn;
    node_ctrl->lfn = rcy_log_point->rcy_point.lfn;
    node_ctrl->lsn = rcy_log_point->lsn;

    OG_LOG_RUN_INF("[DTC RCY] Update ctrl rcy point to [%u-%u/%u/%llu/%llu] for instance %u",
                   node_ctrl->rcy_point.rst_id, node_ctrl->rcy_point.asn, node_ctrl->rcy_point.block_id,
                   (uint64)node_ctrl->rcy_point.lfn, node_ctrl->rcy_point.lsn, rcy_log_point->node_id);

    return (dtc_save_ctrl(session, rcy_log_point->node_id) != OG_SUCCESS) ? OG_ERROR : OG_SUCCESS;
}
/*
 * Push the recovery point of every recovery node into its control info.
 *
 * Returns OG_ERROR at the first node whose update fails, OG_SUCCESS otherwise.
 */
static status_t dtc_rcy_update_ckpt_log_point(knl_session_t *session)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    uint32 idx = 0;

    while (idx < ogx->node_count) {
        if (dtc_rcy_update_node_info(session, &ogx->rcy_log_points[idx]) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to update node info");
            return OG_ERROR;
        }
        idx++;
    }
    return OG_SUCCESS;
}
/*
 * Record the checkpoint queue's current trunc point as the partial-recovery trunc
 * point (g_rc_ctx->prcy_trunc_point).
 *
 * The copy is done while holding the checkpoint queue lock. The log line is emitted
 * before the copy, so it shows the value that is about to be replaced.
 */
static void dtc_rcy_update_ckpt_prcy_info(knl_session_t *session)
{
    ckpt_context_t *ogx = &session->kernel->ckpt_ctx;

    OG_LOG_RUN_INF("[DTC RCY] save ckpt end point, prcy_trunc_point.asn=%u, prcy_trunc_point.block_id=%u, "
                   "prcy_trunc_point.rst_id=%d, prcy_trunc_point.lfn=%llu",
                   g_rc_ctx->prcy_trunc_point.asn, g_rc_ctx->prcy_trunc_point.block_id,
                   g_rc_ctx->prcy_trunc_point.rst_id, (uint64)g_rc_ctx->prcy_trunc_point.lfn);

    cm_spin_lock(&ogx->queue.lock, &session->stat->spin_stat.stat_ckpt_queue);
    g_rc_ctx->prcy_trunc_point = ogx->queue.trunc_point;
    cm_spin_unlock(&ogx->queue.lock);
}
/*
 * Tell whether the checkpoint has moved past the partial-recovery trunc point.
 *
 * Returns OG_TRUE right away when the database is not clustered or this node is not
 * the master. Otherwise returns OG_FALSE while the first entry of the checkpoint
 * queue still has a trunc point not beyond g_rc_ctx->prcy_trunc_point, and OG_TRUE
 * once the queue is empty or its first entry is beyond that point.
 */
static bool32 ckpt_prcy_flush_check(knl_session_t *session)
{
    if (!DB_IS_CLUSTER(session) || rc_is_master() == OG_FALSE) {
        return OG_TRUE;
    }

    ckpt_context_t *ckpt = &session->kernel->ckpt_ctx;
    if (ckpt->queue.first == NULL ||
        log_cmp_point(&ckpt->queue.first->trunc_point, &g_rc_ctx->prcy_trunc_point) > 0) {
        OG_LOG_DEBUG_INF("[CKPT] finish checkpoint");
        return OG_TRUE;
    }
    return OG_FALSE;
}
/* Number of wait rounds between two checkpoint triggers in dtc_update_ckpt_log_point. */
#define CHECK_INTERVAL 100

/*
 * Wait until the partial-recovery checkpoint is flushed, then store the recovery
 * points of all recovery nodes in their control info.
 *
 * Every CHECK_INTERVAL rounds an incremental checkpoint is triggered and
 * ckpt_prcy_flush_check is consulted; between rounds the thread sleeps
 * DTC_REFORM_WAIT_TIME.
 *
 * Returns OG_ERROR when the reform is cancelled while waiting, otherwise the result
 * of dtc_rcy_update_ckpt_log_point.
 */
status_t dtc_update_ckpt_log_point(void)
{
    OG_LOG_RUN_INF("[RC][partial start] start waiting prcy ckpt done, session->kernel->lsn=%llu, "
                   "g_rc_ctx->status=%u",
                   ((knl_session_t *)g_rc_ctx->session)->kernel->lsn, g_rc_ctx->status);
    SYNC_POINT_GLOBAL_START(OGRAC_PART_RECOVERY_BEFORE_CKPT_ABORT, NULL, 0);
    SYNC_POINT_GLOBAL_END;

    for (uint32 round = 0;; round++) {
        OG_RETVALUE_IFTRUE(rc_reform_cancled(), OG_ERROR);
        if ((round % CHECK_INTERVAL) == 0) {
            ckpt_trigger(g_rc_ctx->session, OG_FALSE, CKPT_TRIGGER_INC);
            if (ckpt_prcy_flush_check(g_rc_ctx->session)) {
                break;
            }
        }
        cm_sleep(DTC_REFORM_WAIT_TIME);
    }

    OG_LOG_RUN_INF("[RC][partial start] finish waiting prcy ckpt done, session->kernel->lsn=%llu, "
                   "g_rc_ctx->status=%u",
                   ((knl_session_t *)g_rc_ctx->session)->kernel->lsn, g_rc_ctx->status);
    return dtc_rcy_update_ckpt_log_point(g_rc_ctx->session);
}
/* Add the hwm of every recovery-set item pool to rcy_stat.last_rcy_set_num. */
static void dtc_rcy_set_num_stat(void)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;

    for (rcy_set_item_pool_t *cur = ogx->rcy_set.item_pools; cur != NULL; cur = cur->next) {
        ogx->rcy_stat.last_rcy_set_num += cur->hwm;
    }
}
/* Wait for replay to complete, then flag the kernel recovery context as ended. */
static void dtc_rcy_wait_paral_replay_end(knl_session_t *session)
{
    rcy_context_t *rcy_ctx = &session->kernel->rcy_ctx;

    rcy_wait_replay_complete(session);
    rcy_ctx->rcy_end = OG_TRUE;
}
/*
 * Check whether replay must stop at this batch because of the SCN limit.
 * Returns OG_TRUE (and logs) when batch->scn exceeds rcy->max_scn, else OG_FALSE.
 */
static bool32 dtc_rcy_pitr_replay_end(rcy_context_t *rcy, log_batch_t *batch)
{
    if (batch->scn > rcy->max_scn) {
        OG_LOG_RUN_INF("[DTC RCY] until time recover done");
        return OG_TRUE;
    }
    return OG_FALSE;
}
/*
 * Check whether replay must stop at this batch because of the LSN limit.
 * Returns OG_TRUE (and logs the batch LSN) when batch->lsn exceeds
 * rcy->max_lrp_lsn, else OG_FALSE.
 */
static bool32 dtc_rcy_full_recovery_replay_end(rcy_context_t *rcy, log_batch_t *batch)
{
    if (batch->lsn > rcy->max_lrp_lsn) {
        OG_LOG_RUN_INF("[DTC RCY] until lrp[%llu] full_recover done", batch->lsn);
        return OG_TRUE;
    }
    return OG_FALSE;
}
/*
 * Serially fetch redo log batches and hand each one to dtc_rcy_process_batch().
 *
 * Steps: load the logs with dtc_read_all_logs(), allocate a kernel session and
 * start the node-log reader thread on it, then loop over batches until no batch
 * is left, the session is canceled/killed, the recovery task is canceled, one of
 * the SCN/LSN end conditions is met, or dtc_rcy_check_is_end_restore_recovery()
 * reports the end. The reader thread is closed before returning.
 *
 * Returns OG_SUCCESS, or OG_ERROR on any load/alloc/thread/fetch/process failure,
 * on session cancel/kill, or when closing the reader thread fails.
 */
static status_t dtc_rcy_process_batches(knl_session_t *session)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    log_batch_t *batch = NULL;
    status_t status = OG_SUCCESS;
    rcy_context_t *rcy = &session->kernel->rcy_ctx;
    timeval_t elapsed_begin;
    uint64 used_time;
    uint64 fetch_log_time = 0;
    uint64 replay_log_time = 0;
    uint32 curr_node_idx = 0;

    ELAPSED_BEGIN(elapsed_begin);
    if (dtc_read_all_logs(session) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to load log files");
        return OG_ERROR;
    }
    ELAPSED_END(elapsed_begin, used_time);
    OG_LOG_RUN_INF("[DTC RCY] dtc_read_all_logs used %llu", used_time);

    knl_session_t *ss = NULL;
    if (g_knl_callback.alloc_knl_session(OG_TRUE, (knl_handle_t *)&ss) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] dtc rcy proc init failed as alloc session failed");
        return OG_ERROR;
    }
    if (OG_SUCCESS != cm_create_thread(dtc_rcy_read_node_log_proc, 0, ss, &dtc_rcy->read_log_thread)) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to create thread read node log proc");
        return OG_ERROR;
    }

    ELAPSED_BEGIN(elapsed_begin);
    if (dtc_rcy_fetch_log_batch(session, &batch, &curr_node_idx) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to extract log batch");
        /*
         * The reader thread is already running at this point; close it here just
         * like the in-loop failure path does, otherwise it would be left behind.
         */
        if (close_read_log_proc(&dtc_rcy->read_log_thread, ss) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] close read log proc time out");
        }
        return OG_ERROR;
    }
    ELAPSED_END(elapsed_begin, fetch_log_time);

    while (batch != NULL) {
        if (session->canceled) {
            OG_THROW_ERROR(ERR_OPERATION_CANCELED);
            status = OG_ERROR;
            break;
        }
        if (session->killed) {
            OG_THROW_ERROR(ERR_OPERATION_KILLED);
            status = OG_ERROR;
            break;
        }
        /* A canceled recovery task stops the loop without turning status into an error. */
        if (dtc_rcy->canceled) {
            OG_LOG_RUN_ERR("[DTC RCY] required to cancel this dtc recovery task");
            break;
        }
        if (dtc_rcy_pitr_replay_end(rcy, batch)) {
            break;
        }
        if (dtc_rcy_full_recovery_replay_end(rcy, batch)) {
            break;
        }

        ELAPSED_BEGIN(elapsed_begin);
        if (dtc_rcy_process_batch(session, batch) != OG_SUCCESS) {
            status = OG_ERROR;
            ELAPSED_END(elapsed_begin, used_time);
            break;
        }
        ELAPSED_END(elapsed_begin, used_time);
        replay_log_time += used_time;

        ELAPSED_BEGIN(elapsed_begin);
        if (dtc_rcy_check_is_end_restore_recovery()) {
            break;
        }
        if (dtc_rcy_fetch_log_batch(session, &batch, &curr_node_idx) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to extract log batch");
            status = OG_ERROR;
            break;
        }
        ELAPSED_END(elapsed_begin, used_time);
        fetch_log_time += used_time;
    }

    if (close_read_log_proc(&dtc_rcy->read_log_thread, ss) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] close read log proc time out");
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[DTC RCY] dtc_rcy_fetch_log_batch used=%llu", fetch_log_time);
    OG_LOG_RUN_INF("[DTC RCY] dtc_rcy_process_batch used=%llu", replay_log_time);
    return status;
}
/*
 * Reset a dtc_rcy_atomic_list to the empty state: all three cursors
 * (begin, end, writed_end) and the lock word are set to zero.
 */
static void dtc_rcy_atomic_list_init(dtc_rcy_atomic_list *list)
{
    list->lock = 0;

    /* Empty list: consumer cursor and both producer cursors coincide. */
    list->begin = 0;
    list->writed_end = 0;
    list->end = 0;
}
/*
 * Take the oldest value out of the list.
 *
 * The list is a ring over list->array with dtc_rcy_paral_buf_list_size slots.
 * Returns the popped value, or OG_INVALID_INT32 when begin has caught up with
 * writed_end (nothing published yet). The whole operation runs under list->lock.
 */
static uint32 dtc_rcy_atomic_list_pop(dtc_rcy_atomic_list *list)
{
    uint32 capacity = g_instance->kernel.attr.dtc_rcy_paral_buf_list_size;
    int64 head;
    int64 published;
    uint32 item;

    cm_spin_lock(&list->lock, NULL);
    for (;;) {
        head = list->begin;
        published = list->writed_end;
        if (head == published) {
            cm_spin_unlock(&list->lock);
            return OG_INVALID_INT32;
        }
        /* Read the slot first, then try to claim it by advancing begin. */
        item = list->array[head % capacity];
        if (cm_atomic_cas(&list->begin, head, head + 1)) {
            break;
        }
    }
    cm_spin_unlock(&list->lock);
    return item;
}
/*
 * Append val to the list.
 *
 * A slot is reserved by advancing list->end, the value is stored, and the slot is
 * then published by advancing list->writed_end. Returns OG_FALSE when the ring
 * already holds dtc_rcy_paral_buf_list_size entries, OG_TRUE otherwise. The whole
 * operation runs under list->lock.
 */
static bool8 dtc_rcy_atomic_list_push(dtc_rcy_atomic_list *list, uint32 val)
{
    int64 head;
    int64 tail;

    cm_spin_lock(&list->lock, NULL);
    uint32 capacity = g_instance->kernel.attr.dtc_rcy_paral_buf_list_size;
    for (;;) {
        head = list->begin;
        tail = list->end;
        if (head + capacity == tail) {
            /* Ring is full. */
            cm_spin_unlock(&list->lock);
            return OG_FALSE;
        }
        if (cm_atomic_cas(&list->end, tail, tail + 1)) {
            break;
        }
    }

    list->array[tail % capacity] = val;

    /* Publish the slot; retry until writed_end moves from tail to tail + 1. */
    while (!cm_atomic_cas(&list->writed_end, tail, tail + 1)) {
    }

    cm_spin_unlock(&list->lock);
    return OG_TRUE;
}
/*
 * Release the first num aligned buffers of list, in ascending index order,
 * with cm_aligned_free().
 */
static void dtc_rcy_free_list_in_analyze_paral(aligned_buf_t *list, uint32 num)
{
    uint32 pos = 0;

    while (pos < num) {
        cm_aligned_free(&list[pos]);
        pos++;
    }
}
/*
 * Worker thread body for parallel redo analysis.
 *
 * Repeatedly pops a buffer index from g_analyze_paral_mgr.used_list, analyzes
 * every log group of the batch stored in that buffer with dtc_rcy_analyze_group(),
 * and returns the index to the free list. The loop ends when killed_flag is set,
 * or when the used list is empty and read_log_end_flag is set. An analysis failure
 * sets dtc_rcy->failed and abandons the rest of that batch.
 *
 * thread->argument is the knl_session_t the worker runs with.
 */
static void dtc_rcy_analyze_paral_proc(thread_t *thread)
{
    knl_session_t *session = (knl_session_t *)thread->argument;
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    log_cursor_t cursor;
    log_group_t *group = NULL;
    log_batch_t *batch = NULL;
    status_t ret;
    uint32 idx;
    log_context_t *ogx = &session->kernel->redo_ctx;

    dtc_rcy_inc_rcy_set_ref_num();
    while (!g_analyze_paral_mgr.killed_flag) {
        idx = dtc_rcy_atomic_list_pop(&g_analyze_paral_mgr.used_list);
        if (idx == OG_INVALID_INT32) {
            /* Nothing queued: stop if the producer is done, otherwise wait a bit. */
            if (g_analyze_paral_mgr.read_log_end_flag) {
                break;
            }
            cm_sleep(1);
            continue;
        }

        batch = (log_batch_t *)g_analyze_paral_mgr.buf_list[idx].aligned_buf;
        OG_LOG_DEBUG_INF("[DTC RCY] log batch with lsn=%llu, lfn=%llu, rst_id=%u, asn=%u, block_id=%u, idx=%u, start "
                         "process for instance=%u",
                         batch->lsn, (uint64)batch->head.point.lfn, batch->head.point.rst_id, batch->head.point.asn,
                         batch->head.point.block_id, idx, dtc_rcy->curr_node);

        rcy_init_log_cursor(&cursor, batch);
        group = log_fetch_group(ogx, &cursor);
        while (group != NULL) {
            if (dtc_rcy_analyze_group(session, group) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[DTC RCY] failed to analyze redo log group, lsn=%llu, rmid=%u", group->lsn,
                               group->rmid);
                dtc_rcy->failed = OG_TRUE;
                break;
            }
            group = log_fetch_group(ogx, &cursor);
        }

        /*
         * Log while this worker still owns the buffer: once idx is back on the
         * free list the producer may refill it, so batch must not be read after
         * the push.
         */
        OG_LOG_DEBUG_INF("[DTC RCY] log batch with lsn=%llu, lfn=%llu, rst_id=%u, asn=%u, block_id=%u, idx=%u has been"
                         " processed for instance=%u",
                         batch->lsn, (uint64)batch->head.point.lfn, batch->head.point.rst_id, batch->head.point.asn,
                         batch->head.point.block_id, idx, dtc_rcy->curr_node);
        ret = dtc_rcy_atomic_list_push(&g_analyze_paral_mgr.free_list, idx);
        knl_panic_log(ret == OG_TRUE, "[DTC RCY] paral redo log analyze, push used buffer=%u into free list error",
                      idx);
    }

    cm_atomic32_dec(&g_analyze_paral_mgr.running_thread_num);
    dtc_rcy_dec_rcy_set_ref_num();
    OG_LOG_RUN_INF("[DTC RCY] dtc_rcy_analyze_paral_proc finish, rcy_set ref num=%u", dtc_rcy->rcy_set_ref_num);
}
/*
 * Scan every parallel-replay slot and decide whether batch_lsn is not larger than
 * the LSN of any batch that still has outstanding groups.
 *
 * While scanning, *batch_scn is raised to the largest batch_scn seen so far and
 * *has_batch is set to OG_TRUE as soon as a slot with group_num != 0 is found.
 * Returns OG_FALSE at the first in-flight batch whose lsn is smaller than
 * batch_lsn (the scan stops there), otherwise OG_TRUE.
 */
static bool32 is_min_batch_lsn(uint64 batch_lsn, knl_scn_t *batch_scn, bool32 *has_batch)
{
    uint32 slot_cnt = g_instance->kernel.attr.dtc_rcy_paral_buf_list_size;

    for (uint32 slot = 0; slot < slot_cnt; slot++) {
        *batch_scn = MAX(*batch_scn, g_replay_paral_mgr.batch_scn[slot]);

        if (g_replay_paral_mgr.group_num[slot] != 0) {
            log_batch_t *pending = (log_batch_t *)g_replay_paral_mgr.buf_list[slot].aligned_buf;

            *has_batch = OG_TRUE;
            if (batch_lsn > pending->lsn) {
                OG_LOG_DEBUG_INF("batch_lsn %llu is not min, batch->lsn %llu", batch_lsn, pending->lsn);
                return OG_FALSE;
            }
        }
    }
    return OG_TRUE;
}
/*
 * After the batch in replay slot idx has been fully replayed, advance the
 * standby-side progress information.
 *
 * Does nothing on a primary database or during partial recovery. Otherwise it:
 *  - moves lrpl_ctx->dtc_curr_point[node] forward to the batch point when the
 *    batch LSN is newer;
 *  - refreshes lrpl_ctx->lrpl_speed from the batch size and elapsed replay time;
 *  - if is_min_batch_lsn() confirms no in-flight batch has a smaller LSN, raises
 *    the kernel SCN (broadcasting it when enable_boc is set) and passes the batch
 *    point to ckpt_set_trunc_point_slave_role().
 *
 * session: session used for SCN update / broadcast and control-file lookup.
 * idx:     slot index into g_replay_paral_mgr.
 */
void dtc_update_standby_cluster_scn(knl_session_t *session, uint32 idx)
{
    if (DB_IS_PRIMARY(&session->kernel->db) || OGRAC_PART_RECOVERY(session)) {
        return;
    }

    knl_scn_t batch_scn = 0;
    bool32 has_batch = OG_FALSE;
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
    log_batch_t *batch = (log_batch_t *)g_replay_paral_mgr.buf_list[idx].aligned_buf;
    uint32 node_id = g_replay_paral_mgr.node_id[idx];

    lrpl_ctx->dtc_curr_point[node_id] = batch->lsn > lrpl_ctx->dtc_curr_point[node_id].lsn
                                            ? batch->head.point
                                            : lrpl_ctx->dtc_curr_point[node_id];

    date_t rcy_time = cm_now() - g_replay_paral_mgr.batch_rpl_start_time[idx];
    if (rcy_time != 0) {
        lrpl_ctx->lrpl_speed = (double)(batch->space_size) * MICROSECS_PER_SECOND / SIZE_M(1) / ((double)rcy_time);
    }

    if (!is_min_batch_lsn(batch->lsn, &batch_scn, &has_batch)) {
        return;
    }

    /* With batches still in flight use this batch's SCN, otherwise the max over all slots. */
    batch_scn = has_batch ? g_replay_paral_mgr.batch_scn[idx] : batch_scn;
    OG_LOG_DEBUG_INF("update scn, old scn %llu, new scn %llu", session->kernel->scn, batch_scn);
    if (batch_scn > session->kernel->scn) {
        KNL_SET_SCN(&session->kernel->scn, batch_scn);
        if (session->kernel->attr.enable_boc) {
            tx_scn_broadcast(session);
        }
    }

    log_context_t *ogx = &session->kernel->redo_ctx;
    log_point_t rcy_point = dtc_get_ctrl(session, node_id)->rcy_point;
    log_point_t lrp_point = dtc_get_ctrl(session, node_id)->lrp_point;
    /*
     * The two control-file points are labelled by the field they come from
     * (rcy_point / lrp_point), and node_id is printed as the unsigned value it is.
     */
    OG_LOG_DEBUG_INF(
        "[YJJ DEBUG] dtc_update_standby_cluster_scn, node_id: %u, batch->head.point: lfn: %llu, lsn: %llu; "
        "redo_ctx.curr_point: lfn: %llu, lsn: %llu; ctrl.rcy_point: lfn: %llu, lsn: %llu; "
        "ctrl.lrp_point: lfn: %llu, lsn: %llu",
        node_id, (uint64)batch->head.point.lfn, batch->head.point.lsn, (uint64)ogx->curr_point.lfn,
        ogx->curr_point.lsn, (uint64)rcy_point.lfn, rcy_point.lsn, (uint64)lrp_point.lfn, lrp_point.lsn);

    ckpt_set_trunc_point_slave_role(session, &batch->head.point, node_id);
}
/*
 * Subtract val from the outstanding-group counter of replay slot idx.
 * When the counter reaches zero the slot is finished: standby progress is updated
 * and idx is pushed back onto the replay free list (panics if the push fails).
 */
void dtc_rcy_atomic_dec_group_num(knl_session_t *session, uint32 idx, int32 val)
{
    if (cm_atomic32_add(&g_replay_paral_mgr.group_num[idx], -val) != 0) {
        return;
    }

    /* Progress must be recorded while the slot is still owned, i.e. before the push. */
    dtc_update_standby_cluster_scn(session, idx);
    status_t pushed = dtc_rcy_atomic_list_push(&g_replay_paral_mgr.free_list, idx);
    knl_panic_log(pushed == OG_TRUE, "[DTC RCY] push into free list error");
}
/*
 * Dispatch every log group of one batch (already copied into replay slot idx)
 * for replay.
 *
 * session: recovery session; its dtc_session_type is copied to the kernel session.
 * cursor:  log cursor positioned on the batch stored in slot idx.
 * idx:     slot index into g_replay_paral_mgr (buf_list / group_list / counters).
 *
 * Logic groups are replayed inline through rcy_replay_logic_group(); all other
 * groups are counted in group_num[idx] and queued with rcy_add_replay_bucket().
 */
static void dtc_rcy_paral_replay_batch(knl_session_t *session, log_cursor_t *cursor, uint32 idx)
{
knl_instance_t *kernel = session->kernel;
rcy_context_t *rcy = &kernel->rcy_ctx;
log_group_t *group = NULL;
bool32 logic = OG_FALSE;
rcy_paral_group_t *next_paral_group = NULL;
log_context_t *ogx = &session->kernel->redo_ctx;
uint32 group_slot = rcy->curr_group_id;
knl_session_t *redo_session = session->kernel->sessions[SESSION_ID_KERNEL];
redo_session->dtc_session_type = session->dtc_session_type;
/* Parallel-group descriptors for this batch are built in the slot's group buffer. */
rcy->curr_group = (rcy_paral_group_t *)g_replay_paral_mgr.group_list[idx].aligned_buf;
/*
 * The counter starts at DTC_RCY_GROUP_NUM_BASE and that base is only subtracted
 * after the loop, so the slot cannot reach zero while groups are still being
 * dispatched here. NOTE(review): the per-group decrement presumably happens in
 * the replay workers - confirm against the callers of dtc_rcy_atomic_dec_group_num.
 */
g_replay_paral_mgr.group_num[idx] = DTC_RCY_GROUP_NUM_BASE;
g_replay_paral_mgr.batch_scn[idx] = 0;
g_replay_paral_mgr.batch_rpl_start_time[idx] = cm_now();
for (;;) {
group = log_fetch_group(ogx, cursor);
if (group == NULL) {
OG_LOG_DEBUG_INF("paral redo replay, fetch current log group is NULL");
break;
}
/* Stop dispatching once dtc_rcy_set_pitr_end_replay() reports the end for this group LSN. */
if (dtc_rcy_set_pitr_end_replay(session->kernel->db.recover_for_restore, group->lsn)) {
OG_LOG_RUN_INF("[DTC RCY] pcn is invalide, lsn=%llu, rmid=%u", group->lsn, group->rmid);
break;
}
redo_session->curr_lsn = group->lsn;
/* Fills rcy->curr_group for this log group; also reports whether it is a logic group and the next descriptor. */
rcy_add_pages(rcy->curr_group, group, group_slot, rcy, &logic, &next_paral_group);
/* Track the largest group SCN seen in this batch. */
g_replay_paral_mgr.batch_scn[idx] = MAX(g_replay_paral_mgr.batch_scn[idx], rcy->curr_group->group_scn);
group_slot++;
rcy->curr_group_id = group_slot;
cm_atomic_set(&rcy->preload_hwm, (int64)rcy->page_list_count);
if (logic) {
/* Logic groups are replayed right here and are not counted in group_num[idx]. */
rcy->wait_stats_view[LOGIC_GROUP_COUNT]++;
rcy->curr_group->ddl_lsn_pitr = dtc_rcy_get_ddl_lsn_pitr();
rcy_replay_logic_group(session, rcy->curr_group);
} else {
/* Count the group before queuing it so the slot stays busy until it is done. */
cm_atomic32_inc(&g_replay_paral_mgr.group_num[idx]);
rcy->curr_group->group_list_idx = idx;
rcy_add_replay_bucket(rcy->curr_group, rcy);
}
OG_LOG_DEBUG_INF("[DTC RCY] redo replay log group lsn=%llu, rmid=%u, kernel lsn=%llu, "
"id=%u, group_tid=%u, inc_idx=%u, enter_cnt=%u",
group->lsn, group->rmid, session->kernel->lsn, rcy->curr_group->id, rcy->curr_group->tx_id,
rcy->curr_group->group_list_idx, rcy->curr_group->enter_count);
dtc_update_lsn(session, group->lsn);
OG_LOG_DEBUG_INF("[DTC RCY] updated kernel->session->lsn=%llu", session->kernel->lsn);
rcy->curr_group = next_paral_group;
}
/* Drop the base count; if no queued group is outstanding this releases the slot. */
dtc_rcy_atomic_dec_group_num(session, idx, DTC_RCY_GROUP_NUM_BASE);
OG_LOG_DEBUG_INF("[DTC RCY] finish paral redo replay of log batch=%u", idx);
return;
}
/*
 * Close every parallel analyze worker thread slot
 * (PARAL_ANALYZE_THREAD_NUM entries of g_analyze_paral_mgr.thread).
 */
static void dtc_close_analyze_proc(void)
{
    for (uint32 i = 0; i < PARAL_ANALYZE_THREAD_NUM; i++) {
        cm_close_thread(&g_analyze_paral_mgr.thread[i]);
    }
}
/*
 * Analyze redo log batches with PARAL_ANALYZE_THREAD_NUM worker threads.
 *
 * The calling thread is the producer: it allocates one read buffer per list slot,
 * starts the node-log reader thread and the analyze workers, then copies each
 * fetched batch into a free buffer and queues its index on the used list. The
 * workers (dtc_rcy_analyze_paral_proc) consume the used list and give the buffers
 * back through the free list.
 *
 * The producer loop stops when no batch is left, batch->scn exceeds rcy->max_scn,
 * the session is canceled/killed, the recovery task is canceled, or a fetch fails.
 * Before returning, the reader thread is closed, all workers are waited for and
 * every buffer is freed; this also holds on the error paths once those resources
 * exist.
 *
 * Returns OG_SUCCESS, or OG_ERROR on allocation/read/thread/fetch failure, on
 * session cancel/kill, or when the reader thread cannot be closed.
 */
static status_t dtc_rcy_analyze_batches_paral(knl_session_t *session)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_rcy_stat_t *stat = &dtc_rcy->rcy_stat;
    int64 lgwr_buf_size = (int64)LOG_LGWR_BUF_SIZE(session);
    rcy_context_t *rcy = &session->kernel->rcy_ctx;
    log_batch_t *batch = NULL;
    status_t status = OG_SUCCESS;
    errno_t ret;
    uint32 idx;
    uint32 curr_node_idx = 0;

    g_analyze_paral_mgr.killed_flag = OG_FALSE;
    g_analyze_paral_mgr.read_log_end_flag = OG_FALSE;
    g_analyze_paral_mgr.running_thread_num = 0;
    uint32 prarl_buf_list_size = g_instance->kernel.attr.dtc_rcy_paral_buf_list_size;
    OG_LOG_RUN_INF("[DTC RCY] paral redo log analyze start, dtc_rcy->phase=%u, session->id=%u", dtc_rcy->phase,
                   session->id);

    dtc_rcy_atomic_list_init(&g_analyze_paral_mgr.free_list);
    dtc_rcy_atomic_list_init(&g_analyze_paral_mgr.used_list);
    for (uint32 i = 0; i < prarl_buf_list_size; i++) {
        g_analyze_paral_mgr.free_list.array[i] = i;
        if (cm_aligned_malloc(lgwr_buf_size, "dtc rcy read buffer", &g_analyze_paral_mgr.buf_list[i]) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to alloc log read buffer in paral analyze, buffer list id=%u, "
                           "lgwr_buf_size=%llu",
                           i, lgwr_buf_size);
            /* Only the first i buffers were allocated. */
            dtc_rcy_free_list_in_analyze_paral(g_analyze_paral_mgr.buf_list, i);
            return OG_ERROR;
        }
    }
    /* Every slot index starts out on the free list. */
    g_analyze_paral_mgr.free_list.end = prarl_buf_list_size;
    g_analyze_paral_mgr.free_list.writed_end = prarl_buf_list_size;

    SYNC_POINT_GLOBAL_START(OGRAC_RECOVERY_ANAL_READ_LOG_FAIL, &status, OG_ERROR);
    status = dtc_read_all_logs(session);
    SYNC_POINT_GLOBAL_END;
    if (status != OG_SUCCESS) {
        dtc_rcy_free_list_in_analyze_paral(g_analyze_paral_mgr.buf_list, prarl_buf_list_size);
        session->canceled = OG_TRUE;
        OG_LOG_RUN_ERR("[DTC RCY] failed to load first log file in paral analyze, dtc_rcy->failed=%u. "
                       "session->canceled=%u",
                       dtc_rcy->failed, session->canceled);
        return OG_ERROR;
    }

    knl_session_t *ss = NULL;
    if (g_knl_callback.alloc_knl_session(OG_TRUE, (knl_handle_t *)&ss) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] dtc rcy proc init failed as alloc session failed");
        /* No thread is running yet, so the read buffers can be released right away. */
        dtc_rcy_free_list_in_analyze_paral(g_analyze_paral_mgr.buf_list, prarl_buf_list_size);
        return OG_ERROR;
    }
    if (OG_SUCCESS != cm_create_thread(dtc_rcy_read_node_log_proc, 0, ss, &dtc_rcy->read_log_thread)) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to create thread read node log proc");
        dtc_rcy_free_list_in_analyze_paral(g_analyze_paral_mgr.buf_list, prarl_buf_list_size);
        return OG_ERROR;
    }

    if (dtc_rcy_fetch_log_batch(session, &batch, &curr_node_idx) != OG_SUCCESS) {
        /* The reader thread is already running; close it like the in-loop failure path does. */
        if (close_read_log_proc(&dtc_rcy->read_log_thread, ss) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] close read log proc time out");
        }
        dtc_rcy_free_list_in_analyze_paral(g_analyze_paral_mgr.buf_list, prarl_buf_list_size);
        OG_LOG_RUN_ERR("[DTC RCY] failed to extract first log batch in paral analyze, dtc_rcy->failed=%u. "
                       "session->canceled=%u",
                       dtc_rcy->failed, session->canceled);
        return OG_ERROR;
    }

    for (uint32 i = 0; i < PARAL_ANALYZE_THREAD_NUM; i++) {
        status = cm_create_thread(dtc_rcy_analyze_paral_proc, 0, (void *)session, &g_analyze_paral_mgr.thread[i]);
        if (status == OG_SUCCESS) {
            g_analyze_paral_mgr.running_thread_num++;
        } else {
            OG_LOG_RUN_ERR("[DTC RCY] failed to create paral analyze thread=%u", i);
            /* Skip the producer loop and make the workers already started exit. */
            batch = NULL;
            g_analyze_paral_mgr.killed_flag = OG_TRUE;
            break;
        }
    }

    while (batch != NULL) {
        if (session->canceled) {
            OG_LOG_RUN_ERR("[DTC RCY] rcy session is canceled, session->id=%u", session->id);
            OG_THROW_ERROR(ERR_OPERATION_CANCELED);
            status = OG_ERROR;
            g_analyze_paral_mgr.killed_flag = OG_TRUE;
            break;
        }
        if (session->killed) {
            OG_LOG_RUN_ERR("[DTC RCY] rcy session is canceled, session->id=%u", session->id);
            OG_THROW_ERROR(ERR_OPERATION_KILLED);
            status = OG_ERROR;
            g_analyze_paral_mgr.killed_flag = OG_TRUE;
            break;
        }
        if (dtc_rcy->canceled) {
            OG_LOG_RUN_ERR("[DTC RCY] required to cancel this dtc recovery task");
            g_analyze_paral_mgr.killed_flag = OG_TRUE;
            break;
        }
        if (batch->scn > rcy->max_scn) {
            OG_LOG_RUN_INF("[DTC RCY] log batch->scn=%llu is larger than rcy->max_scn=%llu, recovery done", batch->scn,
                           rcy->max_scn);
            break;
        }

        /* Wait for a worker to hand a buffer back when none is free. */
        idx = dtc_rcy_atomic_list_pop(&g_analyze_paral_mgr.free_list);
        if (idx == OG_INVALID_INT32) {
            continue;
        }
        ret = memcpy_sp(g_analyze_paral_mgr.buf_list[idx].aligned_buf, lgwr_buf_size, (char *)batch, batch->space_size);
        knl_securec_check(ret);
        knl_panic_log(dtc_rcy_atomic_list_push(&g_analyze_paral_mgr.used_list, idx),
                      "[DTC RCY] push buffer of idx %u from free list into used list error", idx);
        OG_LOG_DEBUG_INF("log batch [%llu/%llu/%u] push to idx=%u", batch->lsn, (uint64)batch->head.point.lfn,
                         batch->head.point.block_id, idx);

        if (dtc_rcy_fetch_log_batch(session, &batch, &curr_node_idx) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to extract log batch in paral analyze");
            status = OG_ERROR;
            g_analyze_paral_mgr.killed_flag = OG_TRUE;
            break;
        }
    }

    if (close_read_log_proc(&dtc_rcy->read_log_thread, ss) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] close read log proc time out");
        /*
         * Do not return yet: the workers only leave their loop on killed_flag or
         * read_log_end_flag, so tell them to stop and fall through to the common
         * teardown instead of leaving them spinning with the buffers allocated.
         */
        status = OG_ERROR;
        g_analyze_paral_mgr.killed_flag = OG_TRUE;
    }

    /* On a clean finish let the workers drain the used list before they exit. */
    if (g_analyze_paral_mgr.killed_flag == OG_FALSE) {
        g_analyze_paral_mgr.read_log_end_flag = OG_TRUE;
    }
    while (cm_atomic32_get(&g_analyze_paral_mgr.running_thread_num) > 0) {
        cm_sleep(1);
    }
    dtc_close_analyze_proc();
    dtc_rcy_free_list_in_analyze_paral(g_analyze_paral_mgr.buf_list, prarl_buf_list_size);

    session->canceled = dtc_rcy->canceled ? OG_TRUE : OG_FALSE;
    OG_LOG_RUN_INF("[DTC RCY] paral redo log analyze finish, dtc_rcy->phase=%u, session->id=%u, "
                   "need replay redo size total(M)=%llu",
                   dtc_rcy->phase, session->id, stat->last_rcy_log_size / SIZE_M(1));
    OG_LOG_RUN_INF("[DTC RCY] dtc_rcy canceled=%u, session canceled=%u", dtc_rcy->canceled, session->canceled);
    return status;
}
/*
 * Keep fetching until a log batch is available in *batch.
 *
 * While lrpl->is_closing is set and a fetch yields no batch, one more fetch is
 * attempted; if that one is empty too the function returns OG_SUCCESS with
 * *batch still NULL. Returns OG_ERROR when dtc_rcy_fetch_log_batch() fails.
 */
status_t dtc_lrpl_load_log_batch(knl_session_t *session, log_batch_t **batch, uint32 *curr_node_idx)
{
    lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;

    while (*batch == NULL) {
        if (dtc_rcy_fetch_log_batch(session, batch, curr_node_idx) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC LRPL] failed to extract log batch in paral replay");
            return OG_ERROR;
        }

        /* Either not shutting down or a batch arrived: let the loop condition decide. */
        if (!lrpl_ctx->is_closing || *batch != NULL) {
            continue;
        }

        OG_LOG_RUN_INF("[DTC LRPL] lrpl will be closed and cur log batch is null, retry fetch log batch");
        if (dtc_rcy_fetch_log_batch(session, batch, curr_node_idx) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC LRPL] failed to extract log batch in paral replay");
            return OG_ERROR;
        }
        if (*batch == NULL) {
            OG_LOG_RUN_INF("[DTC LRPL] lrpl replay end");
            return OG_SUCCESS;
        }
    }
    return OG_SUCCESS;
}
/*
 * On a non-primary database, re-arm all recovery nodes for another read round.
 *
 * If any node that is not yet recover_done has a non-zero read position in its
 * current read buffer, nothing is changed. Otherwise every node gets
 * recover_done = OG_FALSE and, when cm_dbs_is_enable_dbs() is true,
 * ulog_exist_data = OG_TRUE.
 */
void dtc_standby_reset_recovery_stat(knl_session_t *session)
{
    if (DB_IS_PRIMARY(&session->kernel->db)) {
        return;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_node_t *node = NULL;
    uint32 n;

    /* Pass 1: bail out if some unfinished node is still in the middle of a buffer. */
    for (n = 0; n < ctx->node_count; n++) {
        node = &ctx->rcy_nodes[n];
        if (node->recover_done != OG_TRUE && node->read_pos[node->read_buf_read_index] != 0) {
            OG_LOG_DEBUG_INF("[DTC LRPL] no need reset node recovery_done, node %u read pos %u", n,
                             node->read_pos[node->read_buf_read_index]);
            return;
        }
    }

    /* Pass 2: reset every node. */
    for (n = 0; n < ctx->node_count; n++) {
        node = &ctx->rcy_nodes[n];
        node->recover_done = OG_FALSE;
        if (cm_dbs_is_enable_dbs() == OG_TRUE) {
            node->ulog_exist_data = OG_TRUE;
        }
    }
    OG_LOG_DEBUG_INF("[DTC LRPL] reset node recovery_done info to false");
}
/*
 * Decide whether the parallel replay loop has another batch to work on.
 *
 * Returns OG_TRUE immediately when *batch is already set. On a primary database
 * an empty *batch means stop. On a non-primary database replay stops when the
 * database is not ready or this is not a full recovery; otherwise the function
 * loads the next batch through dtc_lrpl_load_log_batch() (a load failure goes
 * through CM_ABORT_REASONABLE) and reports whether one was obtained.
 */
bool32 dtc_rcy_need_continue(knl_session_t *session, log_batch_t **batch, uint32 *curr_node_idx)
{
    if (*batch != NULL) {
        return OG_TRUE;
    }

    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    if (DB_IS_PRIMARY(&session->kernel->db)) {
        return (*batch != NULL);
    }

    if (DB_NOT_READY(session) || !ctx->full_recovery) {
        return OG_FALSE;
    }
    if (dtc_lrpl_load_log_batch(session, batch, curr_node_idx) != OG_SUCCESS) {
        CM_ABORT_REASONABLE(0, "[DTC RCY] ABORT INFO:lrpl failed to load log batch in paral replay");
        return OG_FALSE;
    }
    return (*batch != NULL);
}
/*
 * When rcy->page_list_count has reached RCY_PAGE_LIST_RELEASE_THRESHOLD, log it
 * and wait for the outstanding replay to complete via rcy_wait_replay_complete().
 * Below the threshold this is a no-op.
 */
static void dtc_release_rcy_page_list(knl_session_t *session)
{
    rcy_context_t *rcy_ctx = &session->kernel->rcy_ctx;

    if (rcy_ctx->page_list_count >= RCY_PAGE_LIST_RELEASE_THRESHOLD) {
        OG_LOG_RUN_INF("[DTC RCY] page_list count is %u, release threshold is %u, need to release",
                       rcy_ctx->page_list_count, RCY_PAGE_LIST_RELEASE_THRESHOLD);
        rcy_wait_replay_complete(session);
    }
}
/*
 * Replay redo log batches in parallel.
 *
 * The calling thread allocates one batch buffer and one parallel-group buffer per
 * list slot, starts the node-log reader thread, and then, for every fetched
 * batch, copies it into a free slot and dispatches its groups with
 * dtc_rcy_paral_replay_batch(). The loop is driven by dtc_rcy_need_continue() and
 * stops on session cancel/kill, on task cancel (primary only), on the SCN end
 * condition, on the LSN end condition (primary only), at the end of restore
 * recovery, or on a fetch failure.
 *
 * Buffers allocated here are released on the error paths that precede the replay
 * loop as well as on the normal path.
 *
 * Returns OG_SUCCESS, or OG_ERROR on allocation/read/thread/fetch failure, on
 * session cancel/kill, or when the reader thread cannot be closed.
 */
static status_t dtc_rcy_replay_batches_paral(knl_session_t *session)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    int64 lgwr_buf_size = (int64)LOG_LGWR_BUF_SIZE(session);
    log_batch_t *batch = NULL;
    log_cursor_t cursor;
    status_t status = OG_SUCCESS;
    errno_t ret;
    rcy_context_t *rcy = &session->kernel->rcy_ctx;
    timeval_t elapsed_begin;
    uint32 idx;
    uint64 used_time;
    uint64 fetch_log_time = 0;
    uint64 replay_batch_time = 0;
    uint32 curr_node_idx = 0;
    uint32 prarl_buf_list_size = g_instance->kernel.attr.dtc_rcy_paral_buf_list_size;

    OG_LOG_RUN_INF("[DTC RCY] start paral redo replay, dtc_rcy->phase=%u, session->kernel->lsn=%llu", dtc_rcy->phase,
                   session->kernel->lsn);

    dtc_rcy_atomic_list_init(&g_replay_paral_mgr.free_list);
    for (uint32 i = 0; i < prarl_buf_list_size; i++) {
        g_replay_paral_mgr.free_list.array[i] = i;
        if (cm_aligned_malloc(lgwr_buf_size, "dtc rcy read buffer", &g_replay_paral_mgr.buf_list[i]) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to alloc log read buffer in paral replay for buf_list id=%u", i);
            dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.buf_list, i);
            return OG_ERROR;
        }
    }
    for (uint32 i = 0; i < prarl_buf_list_size; i++) {
        if (cm_aligned_malloc(lgwr_buf_size, "dtc rcy paral group buffer", &g_replay_paral_mgr.group_list[i]) !=
            OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to alloc paral group buffer in paral replay for group=%u", i);
            dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.group_list, i);
            dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.buf_list, prarl_buf_list_size);
            return OG_ERROR;
        }
    }
    /* Every slot index starts out on the free list. */
    g_replay_paral_mgr.free_list.end = prarl_buf_list_size;
    g_replay_paral_mgr.free_list.writed_end = prarl_buf_list_size;

    ELAPSED_BEGIN(elapsed_begin);
    SYNC_POINT_GLOBAL_START(OGRAC_PARAL_REPLAY_READ_LOG_FAIL, &status, OG_ERROR);
    status = dtc_read_all_logs(session);
    SYNC_POINT_GLOBAL_END;
    if (status != OG_SUCCESS) {
        dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.buf_list, prarl_buf_list_size);
        dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.group_list, prarl_buf_list_size);
        OG_LOG_RUN_ERR("[DTC RCY] failed to read redo log files in paral replay");
        return OG_ERROR;
    }
    ELAPSED_END(elapsed_begin, used_time);
    OG_LOG_RUN_INF("[DTC RCY] read redo logs in paral replay used=%llu", used_time);

    knl_session_t *ss = NULL;
    if (g_knl_callback.alloc_knl_session(OG_TRUE, (knl_handle_t *)&ss) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] dtc rcy proc init failed as alloc session failed");
        /* Nothing has been dispatched yet, so both buffer lists can be released. */
        dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.buf_list, prarl_buf_list_size);
        dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.group_list, prarl_buf_list_size);
        return OG_ERROR;
    }
    if (OG_SUCCESS != cm_create_thread(dtc_rcy_read_node_log_proc, 0, ss, &dtc_rcy->read_log_thread)) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to create thread read node log proc");
        dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.buf_list, prarl_buf_list_size);
        dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.group_list, prarl_buf_list_size);
        return OG_ERROR;
    }

    ELAPSED_BEGIN(elapsed_begin);
    if (dtc_rcy_fetch_log_batch(session, &batch, &curr_node_idx) != OG_SUCCESS) {
        /* The reader thread is already running; close it like the in-loop failure path does. */
        if (close_read_log_proc(&dtc_rcy->read_log_thread, ss) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] close read log proc time out");
        }
        dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.buf_list, prarl_buf_list_size);
        dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.group_list, prarl_buf_list_size);
        OG_LOG_RUN_ERR("[DTC RCY] failed to extract log batch in paral replay");
        return OG_ERROR;
    }
    ELAPSED_END(elapsed_begin, fetch_log_time);

    ELAPSED_BEGIN(rcy->paral_rcy_thread_start_work_time);
    while (dtc_rcy_need_continue(session, &batch, &curr_node_idx)) {
        if (session->canceled) {
            OG_LOG_RUN_ERR("[DTC RCY] session is canceled, session->id=%u", session->id);
            OG_THROW_ERROR(ERR_OPERATION_CANCELED);
            status = OG_ERROR;
            break;
        }
        if (session->killed) {
            OG_LOG_RUN_ERR("[DTC RCY] session is killed, session->id=%u", session->id);
            OG_THROW_ERROR(ERR_OPERATION_KILLED);
            status = OG_ERROR;
            break;
        }
        if (DB_IS_PRIMARY(&session->kernel->db) && dtc_rcy->canceled) {
            OG_LOG_RUN_ERR("[DTC RCY] required to cancel this dtc recovery task");
            break;
        }
        if (dtc_rcy_pitr_replay_end(rcy, batch)) {
            break;
        }
        if (DB_IS_PRIMARY(&session->kernel->db) && dtc_rcy_full_recovery_replay_end(rcy, batch)) {
            break;
        }

        /* No free slot yet: back off briefly and re-evaluate the loop conditions. */
        idx = dtc_rcy_atomic_list_pop(&g_replay_paral_mgr.free_list);
        if (idx == OG_INVALID_INT32) {
            cm_spin_sleep();
            continue;
        }
        dtc_release_rcy_page_list(session);

        /* Replay works on a private copy so the fetch buffer can be reused. */
        ret = memcpy_sp(g_replay_paral_mgr.buf_list[idx].aligned_buf, lgwr_buf_size, (char *)batch, batch->space_size);
        knl_securec_check(ret);
        g_replay_paral_mgr.node_id[idx] = curr_node_idx;
        rcy_init_log_cursor(&cursor, (log_batch_t *)g_replay_paral_mgr.buf_list[idx].aligned_buf);

        ELAPSED_BEGIN(elapsed_begin);
        dtc_rcy_paral_replay_batch(session, &cursor, idx);
        OG_LOG_DEBUG_INF("[DTC RCY] paral replay redo log batch lfn=%llu, lsn=%llu, point [%u-%u/%u] has been"
                         " processed for instance=%u, session lsn=%llu",
                         (uint64)batch->head.point.lfn, batch->lsn, batch->head.point.rst_id, batch->head.point.asn,
                         batch->head.point.block_id, dtc_rcy->curr_node, session->kernel->lsn);
        ELAPSED_END(elapsed_begin, used_time);
        replay_batch_time += used_time;

        ELAPSED_BEGIN(elapsed_begin);
        if (dtc_rcy_check_is_end_restore_recovery()) {
            break;
        }
        if (dtc_rcy_fetch_log_batch(session, &batch, &curr_node_idx) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to extract log batch in paral replay");
            status = OG_ERROR;
            break;
        }
        ELAPSED_END(elapsed_begin, used_time);
        fetch_log_time += used_time;
    }

    /*
     * NOTE(review): on this timeout path the function returns without waiting for
     * the replay workers or freeing the slot buffers; that is left as is because
     * queued groups may still reference those buffers - confirm the intended
     * cleanup with the owner of close_read_log_proc().
     */
    if (close_read_log_proc(&dtc_rcy->read_log_thread, ss) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] close read log proc time out");
        return OG_ERROR;
    }

    dtc_rcy_wait_paral_replay_end(session);
    dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.buf_list, prarl_buf_list_size);
    dtc_rcy_free_list_in_analyze_paral(g_replay_paral_mgr.group_list, prarl_buf_list_size);

    session->canceled = dtc_rcy->canceled ? OG_TRUE : OG_FALSE;
    OG_LOG_RUN_INF("[DTC RCY] dtc_rcy canceled=%u, session canceled=%u", dtc_rcy->canceled, session->canceled);
    OG_LOG_RUN_INF("[DTC RCY] finish paral redo replay, dtc_rcy->phase=%u, session->kernel->lsn=%llu, "
                   "fetch redo log used time=%llu replay_batch_time=%llu",
                   dtc_rcy->phase, session->kernel->lsn, fetch_log_time, replay_batch_time);
    return status;
}
/*
 * Retry reading redo log for nodes whose previous read returned no data.
 *
 * thread:                    reader thread; thread->argument is the session used for reading.
 * last_nod_log_buffer_index: per-node array (indexed by node index); an entry other
 *                            than OG_INVALID_ID32 marks a node whose last read was empty.
 *
 * Returns early, without visiting the remaining nodes, if dtc_read_node_log() fails.
 */
static void try_to_read_no_log_node(thread_t *thread, uint32 *last_nod_log_buffer_index)
{
OG_LOG_DEBUG_INF("[DTC RCY] dtc rcy try to read failed node");
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
knl_session_t *session = (knl_session_t *)thread->argument;
uint32 read_buf_size = g_instance->kernel.attr.rcy_node_read_buf_size;
for (int j = 0; j < dtc_rcy->node_count; ++j) {
dtc_rcy_node_t *node = &dtc_rcy->rcy_nodes[j];
/* Only nodes whose current write buffer recorded a zero-sized read are retried. */
if (node->read_size[node->read_buf_write_index] != 0) {
continue;
}
/* The current write buffer is still flagged ready, so it cannot be refilled yet. */
if (node->read_buf_ready[node->read_buf_write_index]) {
cm_spin_sleep();
OG_LOG_DEBUG_INF("[DTC RCY] read node read buffer is ready "
"node_id = %u read_buf_write_index=%u",
j, node->read_buf_write_index);
continue;
}
/* No empty read was recorded for this node. */
if (last_nod_log_buffer_index[j] == OG_INVALID_ID32) {
continue;
}
OG_LOG_DEBUG_INF("[DTC RCY] read node log proc read last failed node log last_failed_id=%u", j);
uint32 read_size = 0;
/* Mark the size as invalid while the read is in progress; it is overwritten below on success. */
node->read_size[node->read_buf_write_index] = OG_INVALID_ID32;
if (dtc_read_node_log(dtc_rcy, session, j, &read_size) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY] read node lod proc failed to load redo log of last failed node=%u", j);
return;
}
node->read_size[node->read_buf_write_index] = read_size;
if (read_size != 0) {
/* Data arrived: clear the empty-read mark, publish the buffer and move to the next write buffer. */
last_nod_log_buffer_index[j] = OG_INVALID_ID32;
node->read_buf_ready[node->read_buf_write_index] = OG_TRUE;
node->read_buf_write_index = (node->read_buf_write_index + 1) % read_buf_size;
OG_LOG_RUN_INF("[DTC RCY] read node lod proc last node log "
"success read_size = %u node=%u write_index=%u",
read_size, j, node->read_buf_write_index);
}
}
OG_LOG_DEBUG_INF("[DTC RCY] dtc rcy finish try to read failed node");
}
/*
 * Thread body of the node-log reader.
 *
 * Until thread->closed is set, it walks all recovery nodes round-robin and, for
 * every node whose current write buffer is not flagged ready, reads the next
 * chunk of redo log with dtc_read_node_log(). A non-empty read publishes the
 * buffer (read_buf_ready / read_size) and advances read_buf_write_index; an empty
 * read is remembered per node so that try_to_read_no_log_node() can retry it
 * later. A read failure ends the current pass over the nodes.
 *
 * thread->argument is the knl_session_t used for reading; thread->result is set
 * to OG_TRUE when the loop has ended.
 */
void dtc_rcy_read_node_log_proc(thread_t *thread)
{
    uint32 read_buf_size = g_instance->kernel.attr.rcy_node_read_buf_size;
    knl_session_t *session = (knl_session_t *)thread->argument;
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;

    /*
     * The bookkeeping array is indexed by node index (here and in
     * try_to_read_no_log_node()), so it needs one entry per node; sizing it by
     * the read buffer count alone overruns it when there are more nodes than
     * read buffers. It is never made smaller than before and never zero-length.
     * Assumes node_count does not grow while this thread runs - TODO confirm.
     */
    uint32 node_slots = (uint32)dtc_rcy->node_count;
    uint32 slot_count = (node_slots > read_buf_size) ? node_slots : read_buf_size;
    if (slot_count == 0) {
        slot_count = 1;
    }
    uint32 last_nod_log_buffer_index[slot_count];
    for (uint32 i = 0; i < slot_count; ++i) {
        last_nod_log_buffer_index[i] = OG_INVALID_ID32;
    }

    OG_LOG_RUN_INF("[DTC RCY] rcy read node log thread start "
                   "closed = %d result = %d ",
                   thread->closed, thread->result);
    while (!thread->closed) {
        for (uint32 i = 0; i < dtc_rcy->node_count; i++) {
            if (thread->closed) {
                break;
            }
            dtc_rcy_node_t *node = &dtc_rcy->rcy_nodes[i];
            /* The current write buffer is still flagged ready: skip this node for now. */
            if (node->read_buf_ready[node->read_buf_write_index]) {
                OG_LOG_DEBUG_INF("[DTC RCY] read log thread wait for read buf ready node_id =%u", i);
                continue;
            }
            uint32 read_size = 0;
            if (dtc_read_node_log(dtc_rcy, session, i, &read_size) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[DTC RCY] read node log proc failed to "
                               "load redo log of crashed node=%u",
                               node->node_id);
                break;
            }
            if (read_size == 0) {
                /* Nothing read: remember which buffer was empty so it can be retried. */
                node->read_size[node->read_buf_write_index] = read_size;
                last_nod_log_buffer_index[i] = node->read_buf_write_index;
                continue;
            }
            last_nod_log_buffer_index[i] = OG_INVALID_ID32;
            /* Give nodes with an earlier empty read another chance before publishing this buffer. */
            try_to_read_no_log_node(thread, last_nod_log_buffer_index);
            OG_LOG_DEBUG_INF("[DTC RCY] read node log proc finish read node "
                             "log node_id=%u read_buf_write_index=%u",
                             node->node_id, node->read_buf_write_index);
            node->read_buf_ready[node->read_buf_write_index] = OG_TRUE;
            node->read_size[node->read_buf_write_index] = read_size;
            node->read_buf_write_index = (node->read_buf_write_index + 1) % read_buf_size;
        }
    }
    thread->result = OG_TRUE;
    OG_LOG_RUN_INF("[DTC RCY] rcy read node log thread is closed, closed = %d result = %d ", thread->closed,
                   thread->result);
}
/*
 * Switch the DTC recovery context to PHASE_RECOVERY / RECOVERY_REPLAY and rewind
 * the per-node state.
 *
 * The "current" cursor fields are invalidated, every node's rcy_point and
 * rcy_write_point are restored from rcy_point_saved, and all read buffers of
 * every node are reset. When DBS is enabled and the database is recovering for
 * restore, asn/block_id of both points are additionally set to 0 / OG_INFINITE32.
 */
static inline void dtc_rcy_next_phase(knl_session_t *session)
{
    uint32 buf_cnt = g_instance->kernel.attr.rcy_node_read_buf_size;
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;

    /* Global state. */
    ctx->phase = PHASE_RECOVERY;
    ctx->recovery_status = RECOVERY_REPLAY;
    ctx->is_end_restore_recover = OG_FALSE;
    ctx->curr_node_idx = OG_INVALID_ID8;
    ctx->curr_node = OG_INVALID_ID8;
    ctx->curr_blk_size = OG_INVALID_ID16;
    ctx->curr_batch_lsn = OG_INVALID_ID64;

    for (uint32 n = 0; n < ctx->node_count; n++) {
        dtc_rcy_node_t *node = &ctx->rcy_nodes[n];

        /* Start reading again from the saved recovery point. */
        ctx->rcy_log_points[n].rcy_point = ctx->rcy_log_points[n].rcy_point_saved;
        ctx->rcy_log_points[n].rcy_write_point = ctx->rcy_log_points[n].rcy_point_saved;

        node->recover_done = OG_FALSE;
        node->ulog_exist_data = OG_TRUE;
        for (uint32 b = 0; b < buf_cnt; b++) {
            node->read_pos[b] = 0;
            node->write_pos[b] = 0;
            node->read_size[b] = OG_INVALID_ID32;
            node->not_finished[b] = OG_TRUE;
        }
        node->latest_lsn = 0;
        node->latest_rcy_end_lsn = 0;

        if (cm_dbs_is_enable_dbs() && session->kernel->db.recover_for_restore) {
            ctx->rcy_log_points[n].rcy_point.asn = 0;
            ctx->rcy_log_points[n].rcy_point.block_id = OG_INFINITE32;
            ctx->rcy_log_points[n].rcy_write_point.asn = 0;
            ctx->rcy_log_points[n].rcy_write_point.block_id = OG_INFINITE32;
            OG_LOG_RUN_INF("[DTC RCY] dtc_rcy_next_phase dtc_rcy->rcy_write_log_points[i].rcy_point.asn = %u",
                           ctx->rcy_log_points[n].rcy_point.asn);
        }
    }
}
/*
 * Run the replay step of a full recovery, in parallel when dtc_rcy->paral_rcy is
 * set and serially otherwise.
 *
 * On success the elapsed time is stored in stat->last_rcy_replay_elapsed and
 * OG_SUCCESS is returned; on failure OG_ERROR is returned and the statistic is
 * left untouched.
 */
static status_t dtc_rcy_full_recovery_replay(knl_session_t *session, dtc_rcy_stat_t *stat)
{
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    timeval_t replay_begin;
    status_t replay_ret;

    ELAPSED_BEGIN(replay_begin);
    if (ctx->paral_rcy) {
        replay_ret = dtc_rcy_replay_batches_paral(session);
    } else {
        replay_ret = dtc_rcy_process_batches(session);
    }
    if (replay_ret != OG_SUCCESS) {
        return OG_ERROR;
    }
    ELAPSED_END(replay_begin, stat->last_rcy_replay_elapsed);
    return OG_SUCCESS;
}
/*
 * Drive a full DTC recovery.
 *
 * If the context is still in PHASE_ANALYSIS the batches are processed once for
 * analysis and the context is moved to the next phase. The replay step then runs,
 * followed by dtc_recover_check() (whose failure is tolerated only when
 * DB_IS_MAXFIX), the "redo replay done" notification, the DDL log buffer write,
 * a full checkpoint trigger and finally the checkpoint log point update.
 *
 * Returns OG_ERROR on analysis, replay, check or DDL-write failure; otherwise the
 * result of dtc_rcy_update_ckpt_log_point().
 */
static status_t dtc_rcy_full_recovery(knl_session_t *session)
{
    timeval_t analyze_begin;
    knl_session_t *kernel_se = session->kernel->sessions[SESSION_ID_KERNEL];
    dtc_rcy_context_t *ctx = DTC_RCY_CONTEXT;
    dtc_rcy_stat_t *rcy_stat = &ctx->rcy_stat;
    rcy_stat->last_rcy_is_full_recovery = OG_TRUE;
    reform_detail_t *rf_detail = &g_rc_ctx->reform_detail;

    /* Baseline of the kernel session's disk read counters, to report the delta below. */
    uint64 read_time_base = kernel_se->stat->disk_read_time;
    uint64 read_cnt_base = kernel_se->stat->disk_reads;

    if (ctx->phase == PHASE_ANALYSIS) {
        ELAPSED_BEGIN(analyze_begin);
        if (dtc_rcy_process_batches(session) != OG_SUCCESS) {
            return OG_ERROR;
        }
        dtc_rcy_next_phase(session);
        ELAPSED_END(analyze_begin, rcy_stat->last_rcy_analyze_elapsed);
        OG_LOG_RUN_INF("[DTC RCY] finish redo analyze, pcn is equal num=%u", ctx->pcn_is_equal_num);
    }

    RC_STEP_BEGIN(rf_detail->recovery_replay_elapsed);
    ctx->recovery_status = RECOVERY_REPLAY;
    if (dtc_rcy_full_recovery_replay(session, rcy_stat) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] redo replay failed");
        RC_STEP_END(rf_detail->recovery_replay_elapsed, RC_STEP_FAILED);
        return OG_ERROR;
    }
    /* A failed consistency check is fatal unless the database is in MAXFIX mode. */
    if (dtc_recover_check(session) != OG_SUCCESS && !DB_IS_MAXFIX(session)) {
        RC_STEP_END(rf_detail->recovery_replay_elapsed, RC_STEP_FAILED);
        return OG_ERROR;
    }
    RC_STEP_END(rf_detail->recovery_replay_elapsed, RC_STEP_FINISH);
    OG_LOG_RUN_INF("[DTC RCY] finish redo replay, session lsn=%llu", ((knl_session_t *)g_rc_ctx->session)->kernel->lsn);

    /* A failed broadcast is only logged; recovery carries on. */
    if (rc_set_redo_replay_done(g_rc_ctx->session, &(g_rc_ctx->info), OG_TRUE) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to broadcast reform status g_rc_ctx->status=%u", g_rc_ctx->status);
    }
    if (log_ddl_write_buffer(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    uint64 read_time = kernel_se->stat->disk_read_time - read_time_base;
    uint64 read_cnt = kernel_se->stat->disk_reads - read_cnt_base;
    OG_LOG_RUN_INF("[DTC RCY] kernel session read_page_num=%llu, total_time(us)=%llu, ave_time(us)=%llu", read_cnt,
                   read_time, (read_cnt == 0 ? 0 : read_time / read_cnt));
    OG_LOG_RUN_INF("[DTC RCY] last_rcy_replay_elapsed_time=%llu", rcy_stat->last_rcy_replay_elapsed);
    OG_LOG_RUN_INF("[DTC RCY] last_rcy_replay_log_size=%llu", rcy_stat->last_rcy_log_size);

    ckpt_trigger(session, OG_TRUE, CKPT_TRIGGER_FULL);
    return dtc_rcy_update_ckpt_log_point(session);
}
static status_t dtc_rcy_partial_recovery(knl_session_t *session)
{
dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
dtc_rcy_stat_t *stat = &dtc_rcy->rcy_stat;
reform_detail_t *rf_detail = &g_dtc->rf_ctx.reform_detail;
stat->last_rcy_is_full_recovery = OG_FALSE;
knl_session_t *se = session->kernel->sessions[SESSION_ID_KERNEL];
dtc_rcy->recovery_status = RECOVERY_ANALYSIS;
RC_STEP_BEGIN(rf_detail->recovery_set_create_elapsed);
if (dtc_rcy_analyze_batches_paral(session) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY][partial recovery] failed to paral analyze redo logs, dtc_rcy->failed=%u, "
"dtc_rcy->ss->canceled=%u",
dtc_rcy->failed, dtc_rcy->ss->canceled);
RC_STEP_END(rf_detail->recovery_set_create_elapsed, RC_STEP_FAILED);
return OG_ERROR;
}
RC_STEP_END(rf_detail->recovery_set_create_elapsed, RC_STEP_FINISH);
dtc_rcy_set_num_stat();
RC_STEP_BEGIN(rf_detail->recovery_set_revise_elapsed);
if (dtc_send_rcy_set(session) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY][partial recovery] failed to send rcy set to each master");
RC_STEP_END(rf_detail->recovery_set_revise_elapsed, RC_STEP_FAILED);
return OG_ERROR;
}
while (dtc_rcy->phase != PHASE_HANDLE_RCYSET_DONE) {
if (dtc_rcy_check_rcyset_msg(session) != OG_SUCCESS) {
OG_LOG_RUN_ERR("DTC RCY] failed to check rcyset msg");
RC_STEP_END(rf_detail->recovery_set_revise_elapsed, RC_STEP_FAILED);
return OG_ERROR;
}
cm_sleep(10);
if (session->canceled) {
OG_LOG_RUN_ERR("[DTC RCY] rcy session is cancled, session->id=%u", session->id);
RC_STEP_END(rf_detail->recovery_set_revise_elapsed, RC_STEP_FAILED);
OG_THROW_ERROR(ERR_OPERATION_CANCELED);
return OG_ERROR;
}
if (session->killed) {
OG_LOG_RUN_ERR("[DTC RCY] rcy session is cancled, session->id=%u", session->id);
RC_STEP_END(rf_detail->recovery_set_revise_elapsed, RC_STEP_FAILED);
OG_THROW_ERROR(ERR_OPERATION_KILLED);
return OG_ERROR;
}
if (dtc_rcy->canceled) {
session->canceled = OG_TRUE;
OG_LOG_RUN_ERR("[DTC RCY] required to cancel this dtc recovery task, session canceled=%u",
session->canceled);
RC_STEP_END(rf_detail->recovery_set_revise_elapsed, RC_STEP_FAILED);
return OG_ERROR;
}
if (dtc_rcy->failed == OG_TRUE) {
OG_LOG_RUN_ERR("[DTC RCY] check dtc_rcy->failed=%u", dtc_rcy->failed);
RC_STEP_END(rf_detail->recovery_set_revise_elapsed, RC_STEP_FAILED);
return OG_ERROR;
}
}
RC_STEP_END(rf_detail->recovery_set_revise_elapsed, RC_STEP_FINISH);
OG_LOG_RUN_INF("[DTC RCY][partial recovery] wait masters send rcy set results successfully, msg_sent=%u, "
"msg_recv=%u, dtc_rcy->phase=%u",
dtc_rcy->msg_sent, dtc_rcy->msg_recv, dtc_rcy->phase);
dtc_rcy_next_phase(session);
uint64 rcy_disk_read_time = se->stat->disk_read_time;
uint64 rcy_disk_read = se->stat->disk_reads;
uint64 rcy_record_page = 0;
rcy_set_t *rcy_set = &dtc_rcy->rcy_set;
for (uint32 i = 0; i < rcy_set->bucket_num; i++) {
rcy_record_page += rcy_set->buckets[i].count;
}
RC_STEP_BEGIN(rf_detail->recovery_replay_elapsed);
if (dtc_rcy->paral_rcy) {
if (dtc_rcy_replay_batches_paral(session) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY] failed to do redo log batch replay in parallel");
RC_STEP_END(rf_detail->recovery_replay_elapsed, RC_STEP_FAILED);
return OG_ERROR;
}
} else {
if (dtc_rcy_process_batches(session) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY] failed to do redo log batch replay");
RC_STEP_END(rf_detail->recovery_replay_elapsed, RC_STEP_FAILED);
return OG_ERROR;
}
}
RC_STEP_END(rf_detail->recovery_replay_elapsed, RC_STEP_FINISH);
if (dtc_recover_check(session) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY][partial recovery] failed to check dtc recovery rcy point");
return OG_ERROR;
}
OG_LOG_RUN_INF("[DTC RCY] finish redo replay, session lsn=%llu", ((knl_session_t *)g_rc_ctx->session)->kernel->lsn);
if (rc_set_redo_replay_done(g_rc_ctx->session, &(g_rc_ctx->info), OG_FALSE) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[DTC RCY] failed to broadcast reform status g_rc_ctx->status=%u", g_rc_ctx->status);
}
rcy_disk_read_time = se->stat->disk_read_time - rcy_disk_read_time;
rcy_disk_read = se->stat->disk_reads - rcy_disk_read;
OG_LOG_RUN_INF("[DTC RCY] kernel session read page num=%llu, total time(s)=%llu, ave_time(us)=%llu", rcy_disk_read,
rcy_disk_read_time / MICROSECS_PER_SECOND,
(rcy_disk_read == 0 ? 0 : rcy_disk_read_time / rcy_disk_read));
OG_LOG_RUN_INF("[DTC RCY] recovery set create time(us)=%llu, recovery set revise time(s)=%llu. recovery replay "
"time(s)=%llu",
(rf_detail->recovery_set_create_elapsed.cost_time) / MICROSECS_PER_SECOND_LL,
(rf_detail->recovery_set_revise_elapsed.cost_time) / MICROSECS_PER_SECOND_LL,
(rf_detail->recovery_replay_elapsed.cost_time) / MICROSECS_PER_SECOND_LL);
OG_LOG_RUN_INF("[DTC RCY] recovery set record page=%llu, recovery redo log size(M)=%llu", rcy_record_page,
stat->last_rcy_log_size / SIZE_M(1));
ckpt_trigger(session, OG_FALSE, CKPT_TRIGGER_INC);
OG_LOG_RUN_INF("[DTC RCY][partial recovery] trigger inc ckpt");
dtc_rcy_update_ckpt_prcy_info(session);
return OG_SUCCESS;
}
/*
 * Run one recovery task (full or partial, chosen by dtc_rcy->full_recovery),
 * publish its outcome in the recovery context and close the recovery.
 *
 * After the run:
 *   - dtc_rcy->failed is set iff the run returned OG_ERROR;
 *   - recovery_status becomes RECOVERY_FINISH unless the run returned OG_ERROR;
 *   - dtc_rcy->ss->canceled mirrors the failed flag;
 *   - on a non-primary database a successful full recovery marks lrpl_ctx.is_done.
 *
 * Returns the status of the recovery run.
 */
static status_t dtc_rcy_proc(knl_session_t *session)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    status_t ret = ogx->full_recovery ? dtc_rcy_full_recovery(session) : dtc_rcy_partial_recovery(session);

    /* state as left by the recovery routine, before it is rewritten below */
    OG_LOG_RUN_INF("[DTC RCY] dtc_rcy_proc, dtc_rcy->failed=%u, dtc_rcy->ss->canceled=%u, dtc_rcy->recovery_status=%u,"
                   "memory usage in bytes=%lu",
                   ogx->failed, ogx->ss->canceled, ogx->recovery_status, cm_print_memory_usage());

    ogx->failed = (bool32)(ret == OG_ERROR);
    if (ret != OG_ERROR) {
        ogx->recovery_status = RECOVERY_FINISH;
    }
    if (ogx->failed) {
        ogx->ss->canceled = OG_TRUE;
    } else {
        ogx->ss->canceled = OG_FALSE;
    }

    if (ret == OG_SUCCESS && ogx->full_recovery && !DB_IS_PRIMARY(&session->kernel->db)) {
        session->kernel->lrpl_ctx.is_done = OG_TRUE;
    }

    dtc_recovery_close(session);

    /* same line again, now showing the published outcome */
    OG_LOG_RUN_INF("[DTC RCY] dtc_rcy_proc, dtc_rcy->failed=%u, dtc_rcy->ss->canceled=%u, dtc_rcy->recovery_status=%u,"
                   "memory usage in bytes=%lu",
                   ogx->failed, ogx->ss->canceled, ogx->recovery_status, cm_print_memory_usage());
    return ret;
}
/*
 * Thread entry point: runs dtc_rcy_proc on the session passed as the thread
 * argument and logs an error when it does not succeed.
 */
static void dtc_rcy_thread_proc(thread_t *thread)
{
    status_t ret = dtc_rcy_proc((knl_session_t *)thread->argument);
    if (ret == OG_SUCCESS) {
        return;
    }
    OG_LOG_RUN_ERR("[DTC RCY] dtc_rcy_proc failed");
}
/*
 * Pick the initial recovery phase.
 *
 * Only a full recovery that is not a recover-for-restore starts directly in
 * PHASE_RECOVERY; every other combination starts in PHASE_ANALYSIS.
 * The session is only inspected when full_recovery is set.
 */
dtc_rcy_phase_e dtc_rcy_get_recover_phase(knl_session_t *session, bool32 full_recovery)
{
    if (full_recovery && !session->kernel->db.recover_for_restore) {
        return PHASE_RECOVERY;
    }
    return PHASE_ANALYSIS;
}
/*
 * Reset the recovery context for a new task covering `count` nodes.
 *
 * Resets cursors, counters and flags, derives the start phase and the parallel
 * replay setting, allocates the zero-filled rcy_nodes array, clears
 * rcy_create_users and tags the session with the matching dtc_session_type.
 *
 * Panics when count exceeds OG_MAX_INSTANCES. Returns OG_ERROR (with
 * ERR_ALLOC_MEMORY thrown) when the node array cannot be allocated.
 */
static status_t dtc_rcy_init_context(knl_session_t *session, dtc_rcy_context_t *dtc_rcy, uint32 count,
                                     bool32 full_recovery)
{
    size_t nodes_bytes = count * sizeof(dtc_rcy_node_t);
    errno_t err;

    knl_panic(count <= OG_MAX_INSTANCES);

    /* task description */
    dtc_rcy->full_recovery = full_recovery;
    dtc_rcy->node_count = count;
    dtc_rcy->phase = dtc_rcy_get_recover_phase(session, full_recovery);
    dtc_rcy->replay_thread_num = session->kernel->attr.log_replay_processes;
    dtc_rcy->paral_rcy = (dtc_rcy->replay_thread_num > 1);

    /* cursors start out invalid */
    dtc_rcy->curr_node_idx = OG_INVALID_ID8;
    dtc_rcy->curr_node = OG_INVALID_ID8;
    dtc_rcy->curr_blk_size = OG_INVALID_ID16;
    dtc_rcy->curr_batch_lsn = OG_INVALID_ID64;
    dtc_rcy->end_lsn_restore_recovery = OG_INVALID_ID64;

    /* flags and counters */
    dtc_rcy->canceled = OG_FALSE;
    dtc_rcy->failed = OG_FALSE;
    dtc_rcy->is_end_restore_recover = OG_FALSE;
    dtc_rcy->need_analysis_leave_page_cnt = 0;
    dtc_rcy->msg_sent = 0;
    dtc_rcy->msg_recv = 0;
    dtc_rcy->paral_rcy_size = 0;
    dtc_rcy->rcy_set_ref_num = 0;
    dtc_rcy->pcn_is_equal_num = 0;

    dtc_rcy->rcy_nodes = (dtc_rcy_node_t *)malloc(nodes_bytes);
    if (dtc_rcy->rcy_nodes == NULL) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, nodes_bytes, "dtc recovery nodes");
        OG_LOG_RUN_ERR("[DTC RCY] failed to alloc memory for crashed nodes");
        return OG_ERROR;
    }
    err = memset_sp(dtc_rcy->rcy_nodes, nodes_bytes, 0, nodes_bytes);
    knl_securec_check(err);
    err = memset_s(dtc_rcy->rcy_create_users, sizeof(dtc_rcy->rcy_create_users), 0, sizeof(dtc_rcy->rcy_create_users));
    knl_securec_check(err);

    if (!full_recovery) {
        session->dtc_session_type = dtc_rcy->paral_rcy ? DTC_PART_RCY_PARAL : DTC_PART_RCY;
    } else {
        session->dtc_session_type = dtc_rcy->paral_rcy ? DTC_FULL_RCY_PARAL : DTC_FULL_RCY;
    }
    return OG_SUCCESS;
}
/*
 * Record the starting points of node `node_id` in slot `idx` of the recovery
 * statistics: rcy point, lrp point, and the current read point (initialised to
 * the rcy point). `session` and `recover_list` are not used here.
 */
static void dtc_rcy_update_rcy_stat(knl_session_t *session, instance_list_t *recover_list, uint32 idx, uint8 node_id,
                                    dtc_node_ctrl_t *node_ctrl)
{
    dtc_rcy_stat_t *stat = &DTC_RCY_CONTEXT->rcy_stat;

    stat->rcy_log_points[idx].node_id = node_id;
    stat->rcy_log_points[idx].lrp_point = node_ctrl->lrp_point;
    stat->rcy_log_points[idx].rcy_point = node_ctrl->rcy_point;
    stat->rcy_log_points[idx].curr_read_rcy_point = node_ctrl->rcy_point;
}
/*
 * Initialise the recovery state of one node and its reform log point from the
 * node control data.
 *
 * Besides the scalar fields, this allocates the per-slot bookkeeping arrays
 * (read_buf, read_pos, write_pos, read_buf_ready, read_size, not_finished), each
 * with rcy_node_read_buf_size entries, and resets every slot. The read_buf
 * entries themselves are NOT initialised here; only the array that holds them is
 * allocated.
 *
 * Any allocation failure terminates via CM_ABORT.
 */
static void dtc_init_node(dtc_rcy_node_t *rcy_node, reform_rcy_node_t *rcy_log_point, dtc_node_ctrl_t *ctrl,
                          uint8 node_id)
{
    uint32 read_buf_size = g_instance->kernel.attr.rcy_node_read_buf_size;

    rcy_node->node_id = node_id;
    rcy_node->pitr_lfn = ctrl->lrp_point.lfn;
    rcy_node->ddl_lsn_pitr = OG_INVALID_ID64;
    rcy_node->arch_file.handle = OG_INVALID_HANDLE;
    rcy_node->ulog_exist_data = OG_TRUE;
    rcy_node->curr_file_length = 0;
    rcy_node->latest_lsn = 0;
    rcy_node->latest_rcy_end_lsn = 0;

    /* all three log points of the reform entry start at the node's rcy point */
    rcy_log_point->node_id = node_id;
    rcy_log_point->lsn = ctrl->lsn;
    rcy_log_point->rcy_point = ctrl->rcy_point;
    rcy_log_point->rcy_point_saved = ctrl->rcy_point;
    rcy_log_point->rcy_write_point = ctrl->rcy_point;

    rcy_node->read_buf_read_index = 0;
    rcy_node->read_buf_write_index = 0;
    rcy_node->read_buf = (aligned_buf_t *)malloc(read_buf_size * sizeof(aligned_buf_t));
    rcy_node->read_pos = (uint32 *)malloc(read_buf_size * sizeof(uint32));
    rcy_node->write_pos = (uint32 *)malloc(read_buf_size * sizeof(uint32));
    rcy_node->read_buf_ready = (bool32 *)malloc(read_buf_size * sizeof(bool32));
    rcy_node->read_size = (uint32 *)malloc(read_buf_size * sizeof(uint32));
    rcy_node->not_finished = (bool32 *)malloc(read_buf_size * sizeof(bool32));
    if (rcy_node->read_buf == NULL || rcy_node->read_pos == NULL || rcy_node->write_pos == NULL ||
        rcy_node->read_buf_ready == NULL || rcy_node->read_size == NULL || rcy_node->not_finished == NULL) {
        CM_ABORT(0, "[DTC RCY] alloc memory failed");
    }

    /* the index has the same unsigned type as its bound, avoiding a signed/unsigned comparison */
    for (uint32 i = 0; i < read_buf_size; ++i) {
        rcy_node->write_pos[i] = 0;
        rcy_node->read_pos[i] = 0;
        rcy_node->read_buf_ready[i] = OG_FALSE;
        rcy_node->read_size[i] = OG_INVALID_ID32;
        rcy_node->not_finished[i] = OG_TRUE;
    }
}
/*
 * Prepare slot `idx` of the recovery context for node
 * recover_list->inst_id_list[idx].
 *
 * Reads the node's control page, initialises the node state and log point,
 * advances the session SCN/LSN to the node's values, allocates the aligned read
 * buffers, marks every log file handle invalid and records the start points in
 * the recovery statistics.
 *
 * Returns OG_ERROR when the control page cannot be read or a read buffer cannot
 * be allocated. In the latter case the aligned buffers already obtained for
 * THIS node are released here, because the caller only releases the buffers of
 * nodes with a smaller index. The bookkeeping arrays allocated by dtc_init_node
 * are not released by this function.
 */
static status_t dtc_rcy_init_rcynode(knl_session_t *session, instance_list_t *recover_list, uint32 idx)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_node_ctrl_t *ctrl = NULL;
    dtc_rcy_node_t *rcy_node = NULL;
    reform_rcy_node_t *rcy_log_point = NULL;
    uint8 node_id = recover_list->inst_id_list[idx];
    uint32 read_buf_size = g_instance->kernel.attr.rcy_node_read_buf_size;

    if (dtc_read_node_ctrl(session, node_id) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to read ctrl page for crashed node=%u", node_id);
        return OG_ERROR;
    }

    bool32 is_dbstor = cm_dbs_is_enable_dbs();
    ctrl = dtc_get_ctrl(session, node_id);
    rcy_log_point = &dtc_rcy->rcy_log_points[idx];
    rcy_node = &dtc_rcy->rcy_nodes[idx];
    dtc_init_node(rcy_node, rcy_log_point, ctrl, node_id);
    dtc_update_scn(session, ctrl->scn);
    dtc_update_lsn(session, ctrl->lsn);

    /* with dbstor enabled, a recover-for-restore overrides asn/block_id of both log points */
    if (is_dbstor && session->kernel->db.recover_for_restore) {
        rcy_log_point->rcy_point.asn = 0;
        rcy_log_point->rcy_point.block_id = OG_INFINITE32;
        rcy_log_point->rcy_write_point.asn = 0;
        rcy_log_point->rcy_write_point.block_id = OG_INFINITE32;
    }

    /* on a non-primary dbstor database the buffer is at least DBSTOR_LOG_SEGMENT_SIZE */
    int64 lgwr_buf_size = (int64)LOG_LGWR_BUF_SIZE(session);
    int64 size = (is_dbstor && !DB_IS_PRIMARY(&session->kernel->db)) ? MAX(DBSTOR_LOG_SEGMENT_SIZE, lgwr_buf_size)
                                                                     : lgwr_buf_size;
    for (uint32 i = 0; i < read_buf_size; ++i) {
        if (cm_aligned_malloc(size, "dtc rcy read buffer", &rcy_node->read_buf[i]) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to alloc log read buffer for crashed node=%u", node_id);
            /* roll back the buffers of this node that were allocated before the failure */
            for (uint32 k = 0; k < i; ++k) {
                cm_aligned_free(&rcy_node->read_buf[k]);
            }
            return OG_ERROR;
        }
    }

    errno_t ret = memset_sp(rcy_node->handle, sizeof(rcy_node->handle), OG_INVALID_HANDLE, sizeof(rcy_node->handle));
    knl_securec_check(ret);
    dtc_rcy_update_rcy_stat(session, recover_list, idx, node_id, ctrl);
    OG_LOG_RUN_INF("[DTC RCY] Recover instance=%u from point [%u-%u/%u/%llu/%llu/%llu][%u/%u/%llu/%llu]", node_id,
                   ctrl->rcy_point.rst_id, ctrl->rcy_point.asn, ctrl->rcy_point.block_id, (uint64)ctrl->rcy_point.lfn,
                   ctrl->rcy_point.lsn, ctrl->lsn, ctrl->lrp_point.asn, ctrl->lrp_point.block_id,
                   (uint64)ctrl->lrp_point.lfn, ctrl->lrp_point.lsn);
    return OG_SUCCESS;
}
/*
 * Set up an empty recovery set: OG_RCY_SET_BUCKET zero-filled buckets, the
 * first item pool, and a space id set filled with the OG_INVALID_ID32 byte
 * pattern.
 *
 * Returns OG_ERROR when the bucket array or the item pool cannot be allocated;
 * the bucket array is released again if the item pool allocation fails.
 */
static status_t dtc_rcy_init_rcyset(rcy_set_t *rcy_set)
{
    errno_t err;
    uint64 buckets_bytes;

    rcy_set->bucket_num = OG_RCY_SET_BUCKET;
    rcy_set->capacity = OG_RCY_SET_BUCKET * RCY_SET_BUCKET_TIMES;

    buckets_bytes = sizeof(rcy_set_bucket_t) * rcy_set->bucket_num;
    rcy_set->buckets = (rcy_set_bucket_t *)malloc(buckets_bytes);
    if (rcy_set->buckets == NULL) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to alloc dtc recovery rcyset bucket");
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, buckets_bytes, "dtc recovery set bucket");
        return OG_ERROR;
    }
    err = memset_sp(rcy_set->buckets, buckets_bytes, 0, buckets_bytes);
    knl_securec_check(err);

    rcy_set->item_pools = dtc_rcy_alloc_itempool(rcy_set);
    if (rcy_set->item_pools == NULL) {
        /* undo the bucket allocation so nothing is left half-built */
        CM_FREE_PTR(rcy_set->buckets);
        OG_LOG_RUN_ERR("[DTC RCY] failed to alloc dtc recovery rcyset itmepool");
        return OG_ERROR;
    }
    rcy_set->curr_item_pools = rcy_set->item_pools;

    err = memset_sp(rcy_set->space_id_set, sizeof(rcy_set->space_id_set), OG_INVALID_ID32,
                    sizeof(rcy_set->space_id_set));
    knl_securec_check(err);
    rcy_set->space_set_size = 0;
    return OG_SUCCESS;
}
/*
 * Bring up the parallel replay machinery when parallel recovery is requested.
 *
 * With dtc_rcy->paral_rcy unset this only logs and succeeds. Otherwise it
 * initialises the kernel rcy context, calls rcy_init_proc, and then
 * requires rcy->paral_rcy to be set; replay_thread_num is replaced by
 * rcy->capacity on success.
 */
static status_t dtc_rcy_init_replay_proc(knl_session_t *session, dtc_rcy_context_t *dtc_rcy)
{
    if (!dtc_rcy->paral_rcy) {
        OG_LOG_RUN_INF("[DTC RCY] use single thread to replay.");
        return OG_SUCCESS;
    }

    if (rcy_init_context(session) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to init rcy context");
        return OG_ERROR;
    }

    rcy_context_t *replay = &session->kernel->rcy_ctx;
    rcy_init_proc(session);
    replay->curr_group = replay->group_list;

    if (replay->paral_rcy == OG_FALSE) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to create paral replay thread");
        return OG_ERROR;
    }

    OG_LOG_RUN_INF("[DTC RCY] expected number of created threads=%u, actual number of created threads=%u",
                   dtc_rcy->replay_thread_num, replay->capacity);
    dtc_rcy->replay_thread_num = replay->capacity;
    return OG_SUCCESS;
}
/*
 * Release every aligned read buffer of the recovery node at `index`
 * (rcy_node_read_buf_size entries). The read_buf array itself is not freed.
 */
static inline void dtc_free_read_buf(uint32 index)
{
    dtc_rcy_node_t *node = &DTC_RCY_CONTEXT->rcy_nodes[index];
    uint32 read_buf_size = g_instance->kernel.attr.rcy_node_read_buf_size;

    /* unsigned index: matches the type of the bound */
    for (uint32 k = 0; k < read_buf_size; k++) {
        cm_aligned_free(&node->read_buf[k]);
    }
}
static status_t init_paral_mgr()
{
uint32 prarl_buf_list_size = g_instance->kernel.attr.dtc_rcy_paral_buf_list_size;
g_analyze_paral_mgr.free_list.array = (uint32 *)malloc(prarl_buf_list_size * sizeof(uint32));
g_analyze_paral_mgr.used_list.array = (uint32 *)malloc(prarl_buf_list_size * sizeof(uint32));
g_analyze_paral_mgr.buf_list = (aligned_buf_t *)malloc(prarl_buf_list_size * sizeof(aligned_buf_t));
g_replay_paral_mgr.buf_list = (aligned_buf_t *)malloc(prarl_buf_list_size * sizeof(aligned_buf_t));
g_replay_paral_mgr.group_list = (aligned_buf_t *)malloc(prarl_buf_list_size * sizeof(aligned_buf_t));
g_replay_paral_mgr.group_num = (atomic32_t *)malloc(prarl_buf_list_size * sizeof(atomic32_t));
g_replay_paral_mgr.batch_scn = (knl_scn_t *)malloc(prarl_buf_list_size * sizeof(knl_scn_t));
g_replay_paral_mgr.node_id = (uint32 *)malloc(prarl_buf_list_size * sizeof(uint32));
g_replay_paral_mgr.batch_rpl_start_time = (date_t *)malloc(prarl_buf_list_size * sizeof(date_t));
g_replay_paral_mgr.free_list.array = (uint32 *)malloc(prarl_buf_list_size * sizeof(uint32));
if (g_analyze_paral_mgr.free_list.array == NULL || g_analyze_paral_mgr.buf_list == NULL ||
g_replay_paral_mgr.buf_list == NULL || g_replay_paral_mgr.group_list == NULL ||
g_replay_paral_mgr.group_num == NULL || g_replay_paral_mgr.batch_scn == NULL ||
g_replay_paral_mgr.node_id == NULL || g_replay_paral_mgr.batch_rpl_start_time == NULL ||
g_replay_paral_mgr.free_list.array == NULL || g_analyze_paral_mgr.used_list.array == NULL) {
CM_ABORT(0, "[DTC RCY] alloc memory failed");
}
MEMS_RETURN_IFERR(memset_sp(g_analyze_paral_mgr.free_list.array, prarl_buf_list_size * sizeof(uint32), 0,
prarl_buf_list_size * sizeof(uint32)));
MEMS_RETURN_IFERR(memset_sp(g_analyze_paral_mgr.used_list.array, prarl_buf_list_size * sizeof(uint32), 0,
prarl_buf_list_size * sizeof(uint32)));
MEMS_RETURN_IFERR(memset_sp(g_analyze_paral_mgr.buf_list, prarl_buf_list_size * sizeof(aligned_buf_t), 0,
prarl_buf_list_size * sizeof(aligned_buf_t)));
MEMS_RETURN_IFERR(memset_sp(g_replay_paral_mgr.buf_list, prarl_buf_list_size * sizeof(aligned_buf_t), 0,
prarl_buf_list_size * sizeof(aligned_buf_t)));
MEMS_RETURN_IFERR(memset_sp(g_replay_paral_mgr.group_list, prarl_buf_list_size * sizeof(aligned_buf_t), 0,
prarl_buf_list_size * sizeof(aligned_buf_t)));
MEMS_RETURN_IFERR(memset_sp((void *)g_replay_paral_mgr.group_num, prarl_buf_list_size * sizeof(atomic32_t), 0,
prarl_buf_list_size * sizeof(atomic32_t)));
MEMS_RETURN_IFERR(memset_sp(g_replay_paral_mgr.batch_scn, prarl_buf_list_size * sizeof(knl_scn_t), 0,
prarl_buf_list_size * sizeof(knl_scn_t)));
MEMS_RETURN_IFERR(memset_sp(g_replay_paral_mgr.node_id, prarl_buf_list_size * sizeof(uint32), 0,
prarl_buf_list_size * sizeof(uint32)));
MEMS_RETURN_IFERR(memset_sp(g_replay_paral_mgr.batch_rpl_start_time, prarl_buf_list_size * sizeof(date_t), 0,
prarl_buf_list_size * sizeof(date_t)));
MEMS_RETURN_IFERR(memset_sp(g_replay_paral_mgr.free_list.array, prarl_buf_list_size * sizeof(uint32), 0,
prarl_buf_list_size * sizeof(uint32)));
return OG_SUCCESS;
}
/*
 * Build everything a recovery task needs: parallel managers, recovery context,
 * replay workers (when parallel), one recovery node per entry of recover_list,
 * and finally the recovery set.
 *
 * On a failure while initialising the nodes or the recovery set, the read
 * buffers of the nodes set up so far are released, the node array is freed and,
 * for parallel recovery, the replay workers and their buffers are torn down.
 *
 * NOTE(review): free_paral_mgr() is only called when init_paral_mgr() itself
 * fails, not on the later failure paths - confirm whether that is intended.
 * NOTE(review): on a non-primary database the loop index is passed to
 * dtc_get_ctrl()/ckpt_set_trunc_point_slave_role() where
 * recover_list->inst_id_list[i] might be expected - confirm.
 */
static status_t dtc_recovery_init(knl_session_t *session, instance_list_t *recover_list, bool32 full_recovery)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    uint32 node_cnt = recover_list->inst_id_count;
    uint32 ready = 0; /* number of nodes whose initialisation succeeded */

    if (init_paral_mgr() != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to init paral mgr");
        free_paral_mgr();
        return OG_ERROR;
    }

    dtc_rcy_init_last_recovery_stat(recover_list);
    cm_reset_error();

    if (dtc_rcy_init_context(session, dtc_rcy, node_cnt, full_recovery) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to init dtc recovery context");
        return OG_ERROR;
    }

    if (dtc_rcy_init_replay_proc(session, dtc_rcy) != OG_SUCCESS) {
        CM_FREE_PTR(dtc_rcy->rcy_nodes);
        OG_LOG_RUN_ERR("[DTC RCY] failed to init dtc recovery replay proc");
        return OG_ERROR;
    }

    while (ready < node_cnt) {
        if (dtc_rcy_init_rcynode(session, recover_list, ready) != OG_SUCCESS) {
            break;
        }
        if (!DB_IS_PRIMARY(&session->kernel->db)) {
            dtc_node_ctrl_t *ctrl = dtc_get_ctrl(session, ready);
            ckpt_set_trunc_point_slave_role(session, &ctrl->rcy_point, ready);
        }
        ready++;
    }

    bool32 nodes_done = (bool32)(ready == node_cnt);
    if (nodes_done && dtc_rcy_init_rcyset(&dtc_rcy->rcy_set) == OG_SUCCESS) {
        return OG_SUCCESS;
    }

    /* shared rollback: either node `ready` failed, or all nodes are up and the rcy set failed */
    for (uint32 j = 0; j < ready; j++) {
        dtc_free_read_buf(j);
    }
    CM_FREE_PTR(dtc_rcy->rcy_nodes);
    if (dtc_rcy->paral_rcy) {
        rcy_close_proc(session);
        rcy_free_buffer(&session->kernel->rcy_ctx);
    }

    if (nodes_done) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to init recovery set");
    } else {
        OG_LOG_RUN_ERR("[DTC RCY] failed to init rcynode");
    }
    return OG_ERROR;
}
/*
 * Repair partially written pages of every instance in the REFORM_LIST_ABORT
 * list from the double write area, then reload the double-write checkpoint
 * range from this node's control data.
 *
 * Does nothing (apart from logging) when double write is not enabled. The
 * checkpoint is disabled for the duration of the repair.
 */
static void dtc_recovery_from_double_write_area(knl_session_t *session, reform_info_t *reform_info)
{
    ckpt_context_t *ckpt = &session->kernel->ckpt_ctx;

    if (ckpt->double_write != OG_TRUE) {
        OG_LOG_RUN_INF("Double write is disabled(%u), do NOT recovery from double write area.", ckpt->double_write);
        return;
    }

    ckpt_disable(session);

    uint32 pos = 0;
    while (pos < reform_info->reform_list[REFORM_LIST_ABORT].inst_id_count) {
        ckpt_recover_partial_write_node(session, reform_info->reform_list[REFORM_LIST_ABORT].inst_id_list[pos]);
        pos++;
    }

    dtc_node_ctrl_t *my_ctrl = dtc_my_ctrl(session);
    ckpt->dw_ckpt_start = my_ctrl->dw_start;
    ckpt->dw_ckpt_end = my_ctrl->dw_end;

    ckpt_enable(session);
}
/*
 * Recover the nodes in recover_list.
 *
 * Full recovery runs synchronously on the caller's session. Partial recovery
 * allocates a dedicated kernel session and runs dtc_rcy_proc on a new thread,
 * so OG_SUCCESS then only means the thread was started.
 *
 * Before the run, the recovery is initialised, the checkpoint is enabled on a
 * non-primary master, and pages are repaired from the double write area. A
 * cancel request is honoured before and after the double write step: the
 * recovery is closed, marked failed and OG_ERROR is returned. Without that
 * return the function would close the recovery a second time and then run it on
 * a closed context.
 *
 * Returns OG_ERROR on session allocation, initialisation, cancel, or
 * thread-creation failure; otherwise the status of the full recovery run or of
 * the thread creation.
 */
status_t dtc_recover_crashed_nodes(knl_session_t *session, instance_list_t *recover_list, bool32 full_recovery)
{
    dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
    dtc_rcy->ss = session;
    dtc_rcy->recovery_status = RECOVERY_INIT;

    /* partial recovery runs on its own kernel session */
    if (!full_recovery) {
        if (g_knl_callback.alloc_knl_session(OG_TRUE, (knl_handle_t *)&dtc_rcy->ss) != OG_SUCCESS) {
            dtc_rcy->in_progress = OG_FALSE;
            dtc_rcy->failed = OG_TRUE;
            OG_LOG_RUN_ERR("[DTC RCY] failed to alloc knernel session for partial recovery, "
                           "dtc_rcy->recovery_status=%u, dtc_rcy->failed=%u, dtc_rcy->in_progress=%u",
                           dtc_rcy->recovery_status, dtc_rcy->failed, dtc_rcy->in_progress);
            return OG_ERROR;
        }
    }

    status_t status = OG_SUCCESS;
    SYNC_POINT_GLOBAL_START(OGRAC_RECOVERY_INIT_FAIL, &status, OG_ERROR);
    status = dtc_recovery_init(dtc_rcy->ss, recover_list, full_recovery);
    SYNC_POINT_GLOBAL_END;
    if (status != OG_SUCCESS) {
        dtc_rcy->in_progress = OG_FALSE;
        dtc_rcy->failed = OG_TRUE;
        dtc_rcy->ss->dtc_session_type = DTC_TYPE_NONE;
        OG_LOG_RUN_ERR("[DTC RCY] failed to init dtc recovery. dtc_rcy->recovery_status=%u, dtc_rcy->failed=%u, "
                       "dtc_rcy->in_progress=%u",
                       dtc_rcy->recovery_status, dtc_rcy->failed, dtc_rcy->in_progress);
        return OG_ERROR;
    }

    if (!DB_IS_PRIMARY(&session->kernel->db) && rc_is_master()) {
        ckpt_enable(session);
        OG_LOG_RUN_INF("ckpt enabled");
    }

    if (dtc_rcy->canceled) {
        dtc_recovery_close(session);
        dtc_rcy->failed = OG_TRUE;
        OG_LOG_RUN_INF("[DTC RCY] dtc_rcy canceled=%u, dtc_rcy->recovery_status=%u, dtc_rcy->failed=%u, "
                       "dtc_rcy->in_progress=%u",
                       dtc_rcy->canceled, dtc_rcy->recovery_status, dtc_rcy->failed, dtc_rcy->in_progress);
        /* the recovery has been closed: nothing below may run on it */
        return OG_ERROR;
    }

    dtc_recovery_from_double_write_area(g_rc_ctx->session, &g_rc_ctx->info);

    if (dtc_rcy->canceled) {
        dtc_recovery_close(session);
        dtc_rcy->failed = OG_TRUE;
        OG_LOG_RUN_INF("[DTC RCY] dtc_rcy canceled=%u, dtc_rcy->recovery_status=%u, dtc_rcy->failed=%u, "
                       "dtc_rcy->in_progress=%u",
                       dtc_rcy->canceled, dtc_rcy->recovery_status, dtc_rcy->failed, dtc_rcy->in_progress);
        /* the recovery has been closed: nothing below may run on it */
        return OG_ERROR;
    }

    if (full_recovery) {
        status = dtc_rcy_proc(dtc_rcy->ss);
        if (status != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY] failed to do full recovery");
        }
    } else {
        OG_LOG_RUN_INF("[DTC RCY][partial recovery] start paral redo replay, session->kernel->lsn=%llu",
                       session->kernel->lsn);
        status = cm_create_thread(dtc_rcy_thread_proc, 0, dtc_rcy->ss, &DTC_RCY_CONTEXT->thread);
        if (status != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DTC RCY][partial recovery], failed to create rcy_thread_proc");
            dtc_rcy->failed = OG_TRUE;
            dtc_recovery_close(session);
        }
    }
    return status;
}
/*
 * Start a recovery task unless one is already running.
 *
 * The in_progress flag is tested and set under dtc_rcy->lock, so only one
 * caller gets past it. A caller that finds the flag set gets OG_ERROR with
 * ERR_INVALID_OPERATION thrown; otherwise the result of
 * dtc_recover_crashed_nodes is returned.
 */
status_t dtc_start_recovery(knl_session_t *session, instance_list_t *recover_list, bool32 full_recovery)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;
    bool32 busy;

    cm_spin_lock(&ogx->lock, NULL);
    busy = ogx->in_progress;
    if (!busy) {
        ogx->in_progress = OG_TRUE;
    }
    cm_spin_unlock(&ogx->lock);

    if (busy) {
        OG_LOG_RUN_ERR("[DTC RCY] failed to start recovery task because another one is already in progress");
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", another DTC recovery is already in progress");
        return OG_ERROR;
    }
    return dtc_recover_crashed_nodes(session, recover_list, full_recovery);
}
/*
 * Report whether a recovery is running. A recovery flagged as failed is never
 * reported as in progress.
 */
bool32 dtc_recovery_in_progress(void)
{
    return DTC_RCY_CONTEXT->failed ? OG_FALSE : DTC_RCY_CONTEXT->in_progress;
}
/*
 * Report whether the recovery has to be stopped: OG_TRUE when it is flagged as
 * failed or is still in progress, OG_FALSE otherwise.
 */
bool32 dtc_recovery_need_stop(void)
{
    if (!DTC_RCY_CONTEXT->failed && !DTC_RCY_CONTEXT->in_progress) {
        return OG_FALSE;
    }
    return OG_TRUE;
}
/* Return the failed flag of the recovery context. */
bool32 dtc_recovery_failed(void)
{
    bool32 has_failed = DTC_RCY_CONTEXT->failed;
    return has_failed;
}
/*
 * Ask the running recovery to stop and wait for it.
 *
 * Calls CM_ABORT when the last recovery is flagged as failed. Otherwise sets
 * the canceled flag and polls, sleeping DTC_RCY_WAIT_STOP_SLEEP_TIME per round,
 * until the recovery is no longer in progress. If the recovery session turns
 * out canceled or killed while waiting, the wait ends immediately; on the
 * master, for a partial recovery, the recovery session is released first.
 */
void dtc_stop_recovery(void)
{
    dtc_rcy_context_t *ogx = DTC_RCY_CONTEXT;

    OG_LOG_RUN_INF("[DTC RCY] last recovery status=%u, dtc_rcy->failed=%u, dtc_rcy->in_progress=%u, "
                   "dtc_rcy->canceled=%u, dtc_rcy->ss->canceled=%u, dtc_rcy->ss->killed=%u",
                   ogx->recovery_status, ogx->failed, ogx->in_progress, ogx->canceled,
                   ogx->ss->canceled, ogx->ss->killed);
    if (ogx->failed) {
        CM_ABORT(0, "[DTC RCY] DTC RCY failed");
    }

    ogx->canceled = OG_TRUE;
    OG_LOG_RUN_INF("[DTC RCY] stop current running thread, dtc_rcy->in_progress %u", ogx->in_progress);

    for (;;) {
        if (!dtc_recovery_in_progress()) {
            return;
        }
        cm_sleep(DTC_RCY_WAIT_STOP_SLEEP_TIME);
        if (!ogx->ss->canceled && !ogx->ss->killed) {
            continue;
        }
        /* the recovery session has ended (canceled or killed) */
        if (rc_is_master() && !ogx->full_recovery) {
            g_knl_callback.release_knl_session(ogx->ss);
        }
        return;
    }
}
/*
 * Recovery entry point for this node.
 *
 * Positions the log at this node's lrp point and the checkpoint truncation
 * point at its rcy point. On the master it then runs a full recovery over all
 * nodes (ids 0..node_count-1), timed as recovery_elapsed. For a
 * recover-for-restore without dbstor, the log file of every node is reset to
 * the asn following its recovery point. Finally the log and checkpoint state
 * are re-based on this node's rcy point.
 *
 * The instance list handed to the recovery is taken from the session stack. It
 * is popped on every exit path taken after the push, including the early error
 * return, and the pop is tied to the push itself rather than to a second
 * rc_is_master() call.
 *
 * Returns the status of the full recovery (OG_SUCCESS when this node is not the
 * master), or OG_ERROR when resetting a log file fails.
 *
 * NOTE(review): a failed dtc_start_recovery does not stop the function; the
 * remaining steps still run and RC_STEP_FINISH is recorded - confirm intent.
 */
status_t dtc_recover(knl_session_t *session)
{
    dtc_node_ctrl_t *curr_ctrl = dtc_my_ctrl(session);
    log_point_t curr_point = curr_ctrl->rcy_point;
    log_point_t lrp_point = curr_ctrl->lrp_point;
    log_context_t *log = &session->kernel->redo_ctx;
    reform_detail_t *rf_detail = &g_rc_ctx->reform_detail;
    status_t status = OG_SUCCESS;
    bool32 list_pushed = OG_FALSE; /* OG_TRUE once rcy_list lives on the session stack */

    log_reset_point(session, &lrp_point);
    ckpt_set_trunc_point(session, &curr_point);
    session->kernel->redo_ctx.lfn = curr_point.lfn;
    session->kernel->ckpt_ctx.trunc_lsn = (uint64)session->kernel->lsn;

    if (rc_is_master() == OG_TRUE) {
        if (DB_IS_PRIMARY(&session->kernel->db) || DB_NOT_READY(session)) {
            g_rc_ctx->status = REFORM_RECOVERING;
        }
        instance_list_t *rcy_list = (instance_list_t *)cm_push(session->stack, sizeof(instance_list_t));
        list_pushed = OG_TRUE;
        rcy_list->inst_id_count = session->kernel->db.ctrl.core.node_count;
        for (uint8 i = 0; i < rcy_list->inst_id_count; i++) {
            rcy_list->inst_id_list[i] = i;
        }
        RC_STEP_BEGIN(rf_detail->recovery_elapsed);
        status = dtc_start_recovery(session, rcy_list, OG_TRUE);
        RC_STEP_END(rf_detail->recovery_elapsed, RC_STEP_FINISH);
    }

    if (DB_CLUSTER_NO_CMS) {
        g_rc_ctx->status = REFORM_DONE;
    }

    if (!cm_dbs_is_enable_dbs() && session->kernel->db.recover_for_restore) {
        dtc_rcy_context_t *dtc_rcy = DTC_RCY_CONTEXT;
        for (uint32 i = 0; i < session->kernel->db.ctrl.core.node_count; i++) {
            reform_rcy_node_t *rcy_log_point = &dtc_rcy->rcy_log_points[i];
            log_point_t *point = &rcy_log_point->rcy_point;
            OG_LOG_RUN_ERR("[DTC RCY] set first redo asn %u for node %u.", point->asn + 1, i);
            if (dtc_bak_reset_logfile(session, point->asn + 1, OG_INVALID_ID32, i) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[DTC RCY] set first redo asn %u for node %u failed.", point->asn + 1, i);
                /* do not leave rcy_list behind on the session stack */
                if (list_pushed) {
                    cm_pop(session->stack);
                }
                return OG_ERROR;
            }
        }
    }

    log_reset_point(session, &curr_ctrl->rcy_point);
    ckpt_set_trunc_point(session, &curr_ctrl->rcy_point);
    log_reset_file(session, &curr_ctrl->rcy_point);
    DB_SET_LFN(&log->lfn, curr_ctrl->rcy_point.lfn);
    log->buf_lfn[0] = log->lfn + 1;
    log->buf_lfn[1] = log->lfn + 2;

    if (list_pushed) {
        cm_pop(session->stack);
    }
    return status;
}
/*
 * Mark the buffered copy of page_id dirty and queue it for checkpoint.
 *
 * Returns OG_ERROR when the page has no buffer control in its bucket or its
 * lock mode is DRC_LOCK_NULL; OG_SUCCESS otherwise (also when the page was
 * already dirty, in which case nothing is changed). The bucket lock is held
 * while the buffer control is inspected and updated.
 */
status_t dtc_add_dirtypage_for_recovery(knl_session_t *session, page_id_t page_id)
{
    /*
     * Mark the page dirty and put it on the checkpoint queue,
     * otherwise the shared copy page can't be recovered in below scenario:
     * 1) the shared copy page is removed from recovery set
     * 2) after recovery the redo log of crashed node is truncated
     * 3) later the partial recovery node crash afterwards.
     */
    buf_bucket_t *bucket = buf_find_bucket(session, page_id);
    cm_spin_lock(&bucket->lock, &session->stat->spin_stat.stat_bucket);
    buf_ctrl_t *ctrl = buf_find_from_bucket(bucket, page_id);
    drc_buf_res_t *buf_res = drc_get_buf_res_by_pageid(session, page_id);
    if (!ctrl || ctrl->lock_mode == DRC_LOCK_NULL) {
        /* The page is not in memory or not usable, so it can't be skipped,
           as the page on disk may be not the latest one. */
        cm_spin_unlock(&bucket->lock);
        OG_LOG_RUN_WAR("[DTC RCY] can't skip enter page [%u-%u] due to it's not in memory or not usable", page_id.file,
                       page_id.page);
        return OG_ERROR;
    }
    if (!ctrl->is_dirty) {
        ctrl->is_dirty = OG_TRUE;
        ckpt_enque_one_page(session, ctrl);
        /* NOTE(review): buf_res is dereferenced without a NULL check - confirm it cannot be NULL here */
        buf_res->need_flush = OG_TRUE;
    }
    cm_spin_unlock(&bucket->lock);
    return OG_SUCCESS;
}
/*
 * Fill file_set with the redo log files of node `node_id` and open them.
 *
 * For every log slot below the node's log_hwm the control item is looked up and
 * the handle in rcy_node->handle[] is reset to -1. Dropped files are skipped,
 * unused files are initialised through dtc_init_not_used_log_file, and every
 * other file is opened, its head block is read into the node's current read
 * buffer, the head checksum is verified and the head is copied into the file
 * entry.
 *
 * Returns OG_ERROR when a file cannot be opened or read, or when its head
 * checksum does not verify. Handles opened before the failure are left open in
 * rcy_node->handle[].
 */
status_t dtc_init_node_logset_for_backup(knl_session_t *session, uint32 node_id, dtc_rcy_node_t *rcy_node,
                                         logfile_set_t *file_set)
{
    dtc_node_ctrl_t *ctrl = dtc_get_ctrl(session, node_id);
    database_t *db = &session->kernel->db;
    log_file_t *file = NULL;
    /* scratch space for the file head: the read buffer at the current read index */
    char *buf = rcy_node->read_buf[rcy_node->read_buf_read_index].aligned_buf;

    file_set->logfile_hwm = ctrl->log_hwm;
    file_set->log_count = ctrl->log_count;
    for (uint32 i = 0; i < file_set->logfile_hwm; i++) {
        file = &file_set->items[i];
        file->ctrl = (log_file_ctrl_t *)db_get_log_ctrl_item(db->ctrl.pages, i, sizeof(log_file_ctrl_t),
                                                             db->ctrl.log_segment, rcy_node->node_id);
        rcy_node->handle[i] = -1;
        if (LOG_IS_DROPPED(file->ctrl->flg)) {
            continue;
        }
        if (dtc_log_file_not_used(ctrl, i)) {
            dtc_init_not_used_log_file(file, db);
            continue;
        }
        if (cm_open_device(file->ctrl->name, file->ctrl->type, knl_io_flag(session), &rcy_node->handle[i]) !=
            OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open redo log file=%s ", file->ctrl->name);
            return OG_ERROR;
        }
        /* read the head, rounded up to the file's block size */
        if (cm_read_device(file->ctrl->type, rcy_node->handle[i], 0, buf,
                           CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size)) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to read redo log file=%s ", file->ctrl->name);
            return OG_ERROR;
        }
        if (log_verify_head_checksum(session, (log_file_head_t *)buf, file->ctrl->name) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[BACKUP] failed to verify head checksum of log file=%s", file->ctrl->name);
            return OG_ERROR;
        }
        errno_t ret = memcpy_sp(&file->head, sizeof(log_file_head_t), buf, sizeof(log_file_head_t));
        knl_securec_check(ret);
        OG_LOG_RUN_INF("[BACKUP] Init logfile=%s, handle=%d, point=[%u-%u] write_pos=%llu for instance=%u",
                       file->ctrl->name, rcy_node->handle[i], file->head.rst_id, file->head.asn, file->head.write_pos,
                       rcy_node->node_id);
    }
    return OG_SUCCESS;
}