* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* bak_log_paral.c
*
*
* IDENTIFICATION
* src/kernel/backup/bak_log_paral.c
*
* -------------------------------------------------------------------------
*/
#include "bak_log_paral.h"
#include "dtc_database.h"
#include "knl_backup_module.h"
#ifdef __cplusplus
extern "C" {
#endif
/*
 * Reset the parallel-log bookkeeping stored in bak: the counters and flags used by the
 * parallel log process are cleared and the whole bak->device descriptor is zero-filled.
 */
void bak_set_head_for_paral_log(bak_t *bak)
{
    errno_t err;

    /* counters */
    bak->log_proc_count = 0;
    bak->paral_log_bak_complete = OG_FALSE;
    bak->paral_log_bak_number = 0;
    bak->paral_last_asn = 0;

    /* state flags */
    bak->arch_is_lost = OG_FALSE;
    bak->log_proc_is_ready = OG_FALSE;

    /* wipe the device descriptor */
    err = memset_sp(&bak->device, sizeof(bak_device_t), 0, sizeof(bak_device_t));
    knl_securec_check(err);
}
/*
 * Wait for the archived log [rst_id-asn] to become available and return its control info.
 *
 * While *arch_ctrl is NULL and the backup has not failed, the archive info is looked up
 * with arch_get_archived_log_info(), sleeping BAK_LOG_SLEEP_TIME after each lookup.
 *
 * session            - current kernel session
 * rst_id, asn        - identity of the archived log being looked up
 * arch_ctrl          - in/out; only polled while *arch_ctrl is NULL, receives the lookup result
 * is_paral_log_proc  - when true and the backup progress has reached BACKUP_LOG_STAGE, the
 *                      function returns OG_SUCCESS right after a lookup; *arch_ctrl may then
 *                      still be NULL and the caller must check it
 *
 * Returns OG_ERROR when the backup is marked failed or the archived log is not present on
 * its device, OG_SUCCESS otherwise.
 */
status_t bak_found_archived_log(knl_session_t *session, uint32 rst_id, uint32 asn, arch_ctrl_t **arch_ctrl,
                                bool32 is_paral_log_proc)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;

    OG_LOG_DEBUG_INF("[BACKUP] start wait for archivelog: %u-%u", rst_id, asn);
    while (*arch_ctrl == NULL && !bak->failed) {
        *arch_ctrl = arch_get_archived_log_info(session, rst_id, asn, ARCH_DEFAULT_DEST, session->kernel->id);
        if (is_paral_log_proc && bak->progress.stage == BACKUP_LOG_STAGE) {
            return OG_SUCCESS;
        }
        cm_sleep(BAK_LOG_SLEEP_TIME);
    }

    /*
     * The loop also ends when the backup has failed, in which case *arch_ctrl can still be
     * NULL. Bail out before touching it instead of dereferencing a null pointer.
     */
    if (*arch_ctrl == NULL) {
        return OG_ERROR;
    }

    device_type_t type = arch_get_device_type((*arch_ctrl)->name);
    if (!cm_exist_device(type, (*arch_ctrl)->name)) {
        OG_LOG_RUN_ERR("[BACKUP] failed to get archived log for [%u-%u]", rst_id, asn);
        return OG_ERROR;
    }

    if (bak->failed) {
        return OG_ERROR;
    }

    OG_LOG_RUN_INF("[BACKUP] found archivelog: %u-%u, name: %s", rst_id, asn, (*arch_ctrl)->name);
    return OG_SUCCESS;
}
/*
 * Fill process->assign_ctrl and process->ctrl for the log identified by asn, record the new
 * backup file in bak, and open the log's device.
 *
 * If bak_log_get_id() yields OG_INVALID_ID32 the archived log info is looked up and used;
 * otherwise the online log file at that index of MY_LOGFILE_SET(session) is used.
 *
 * block_size  - out: block size of the selected log
 * compressed  - out: result of arch_is_compressed() for an archived log, OG_FALSE for an online log
 *
 * Returns OG_ERROR when the archived log info cannot be found (ERR_FILE_NOT_EXIST is thrown)
 * or when the device cannot be opened, OG_SUCCESS otherwise.
 */
status_t bak_set_log_ctrl(knl_session_t *session, bak_process_t *process, uint32 asn, uint32 *block_size,
                          bool32 *compressed)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    database_t *db = &session->kernel->db;
    bak_assignment_t *assign = &process->assign_ctrl;
    bak_ctrl_t *dev_ctrl = &process->ctrl;
    uint32 rst_id = bak_get_rst_id(bak, asn, &(db->ctrl.core.resetlogs));
    logfile_set_t *logfile_set = MY_LOGFILE_SET(session);
    errno_t err;

    OG_LOG_RUN_INF("[BACKUP] Try to get archived log for [%u-%u]", rst_id, asn);

    assign->log_asn = asn;
    assign->file_id = bak_log_get_id(session, bak->record.data_type, rst_id, asn);
    assign->file_size = 0;

    if (assign->file_id != OG_INVALID_ID32) {
        /* the log is still an online log file */
        log_file_t *online = &logfile_set->items[assign->file_id];

        err = strcpy_sp(dev_ctrl->name, OG_FILE_NAME_BUFFER_SIZE, online->ctrl->name);
        knl_securec_check(err);
        dev_ctrl->type = online->ctrl->type;
        *block_size = online->ctrl->block_size;
        *compressed = OG_FALSE;
        OG_LOG_DEBUG_INF("[BACKUP] Get online log %s for [%u-%u] write pos %llu",
                         dev_ctrl->name, rst_id, asn, online->head.write_pos);

        if (assign->file_id != dtc_my_ctrl(session)->log_last) {
            bak_record_new_file(bak, BACKUP_ARCH_FILE, asn, 0, rst_id, OG_FALSE, 0, 0);
        } else {
            /* this is the log_last file: record it as a log file and back up to its write position */
            bak_record_new_file(bak, BACKUP_LOG_FILE, assign->file_id, 0, rst_id, OG_FALSE, 0, 0);
            assign->file_size = online->head.write_pos;
        }
    } else {
        /* no online file matches: fall back to the archived log */
        arch_ctrl_t *archived = arch_get_archived_log_info(session, rst_id, asn, ARCH_DEFAULT_DEST,
                                                           session->kernel->id);
        if (archived == NULL) {
            OG_LOG_RUN_ERR("[BACKUP] failed to get archived log for [%u-%u]", rst_id, asn);
            OG_THROW_ERROR(ERR_FILE_NOT_EXIST, "archive log", "for backup");
            return OG_ERROR;
        }

        err = strcpy_sp(dev_ctrl->name, OG_FILE_NAME_BUFFER_SIZE, archived->name);
        knl_securec_check(err);
        dev_ctrl->type = arch_get_device_type(archived->name);
        OG_LOG_DEBUG_INF("[BACKUP] Get archived log %s for [%u-%u]", dev_ctrl->name, rst_id, asn);
        *block_size = (uint32)archived->block_size;
        *compressed = arch_is_compressed(archived);
        bak_record_new_file(bak, BACKUP_ARCH_FILE, asn, 0, rst_id, OG_FALSE, archived->start_lsn,
                            archived->end_lsn);
    }

    if (cm_open_device(dev_ctrl->name, dev_ctrl->type, knl_arch_io_flag(session, *compressed),
                       &dev_ctrl->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[BACKUP] failed to open %s", dev_ctrl->name);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Wait for the archived log with the given asn, then fill process->assign_ctrl and
 * process->ctrl for it, record the new backup file in bak, and open the archive's device.
 *
 * If bak_found_archived_log() succeeds without delivering an archive entry, bak->arch_is_lost
 * is set and OG_SUCCESS is returned without opening anything; knl_panic_log asserts that this
 * only happens when is_paral_log_proc is true.
 *
 * block_size  - out: block size of the archived log
 * compressed  - out: result of arch_is_compressed() for the archived log
 *
 * Returns OG_ERROR when waiting for the archive fails or the device cannot be opened.
 */
status_t bak_set_archived_log_ctrl(knl_session_t *session, bak_process_t *process, uint32 asn, uint32 *block_size,
                                   bool32 *compressed, bool32 is_paral_log_proc)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    database_t *db = &session->kernel->db;
    bak_assignment_t *assign = &process->assign_ctrl;
    bak_ctrl_t *dev_ctrl = &process->ctrl;
    arch_ctrl_t *archived = NULL;
    reset_log_t rst_log = db->ctrl.core.resetlogs;
    uint32 rst_id = bak_get_rst_id(bak, asn, &(rst_log));
    errno_t err;

    if (bak_found_archived_log(session, rst_id, asn, &archived, is_paral_log_proc) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (archived == NULL) {
        bak->arch_is_lost = OG_TRUE;
        knl_panic_log(is_paral_log_proc,
                      "[BACKUP] only in paral log proc, arch ctrl could get nothing while paral log proc is exiting");
        return OG_SUCCESS;
    }

    /* describe the task: an archived log has no online file id */
    assign->file_id = OG_INVALID_ID32;
    assign->log_asn = asn;
    assign->file_size = 0;

    err = strcpy_sp(dev_ctrl->name, OG_FILE_NAME_BUFFER_SIZE, archived->name);
    knl_securec_check(err);

    *block_size = (uint32)archived->block_size;
    *compressed = arch_is_compressed(archived);
    bak_record_new_file(bak, BACKUP_ARCH_FILE, asn, 0, rst_id, is_paral_log_proc, archived->start_lsn,
                        archived->end_lsn);

    dev_ctrl->type = arch_get_device_type(dev_ctrl->name);
    if (cm_open_device(dev_ctrl->name, dev_ctrl->type, knl_arch_io_flag(session, *compressed),
                       &dev_ctrl->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[BACKUP] failed to open %s", dev_ctrl->name);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Return the slot number to use for the next log file.
 *
 * For the parallel log process the slot is device.sec_number + paral_log_bak_number + 1,
 * and knl_panic_log asserts that it is larger than device.sec_number. For any other caller
 * the slot is bak->file_count.
 */
uint32 bak_get_log_slot(bak_t *bak, bool32 is_paral_log_proc)
{
    if (!is_paral_log_proc) {
        return bak->file_count;
    }

    uint32 log_slot = bak->device.sec_number + bak->paral_log_bak_number + 1;
    knl_panic_log(log_slot > bak->device.sec_number,
                  "[BACKUP] log's slot number %u should larger than df's sec number %u", log_slot,
                  bak->device.sec_number);
    return log_slot;
}
/*
 * When parallel log backup is enabled, store the datafile's size, hwm start, id, name and
 * section count in bak->device.datafile[file_id] and add the section count to
 * bak->device.sec_number. Does nothing when parallel log backup is disabled.
 */
static void bak_try_update_device_info(knl_session_t *session, datafile_t *datafile, uint32 file_id, uint64 file_size,
                                       uint32 file_hwm_start)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    uint64 section_size = 0;
    bool32 divided = OG_FALSE;

    if (bak_log_paral_enable(bak)) {
        knl_panic_log(bak->section_threshold > 0, "[BACKUP] section_threshold [%llu] is not correct",
                      bak->section_threshold);

        bak->device.datafile[file_id].file_size = file_size;
        bak->device.datafile[file_id].hwm_start = file_hwm_start;
        bak->device.datafile[file_id].id = datafile->ctrl->id;
        bak->device.datafile[file_id].sec_num =
            bak_datafile_section_count(session, file_size, file_hwm_start, &section_size, &divided);

        errno_t err = strcpy_sp(bak->device.datafile[file_id].name, OG_FILE_NAME_BUFFER_SIZE,
                                datafile->ctrl->name);
        knl_securec_check(err);

        /* accumulate the total number of datafile sections */
        bak->device.sec_number += bak->device.datafile[file_id].sec_num;
    }
}
/*
 * Walk all datafiles via db_get_next_datafile() and count those selected by the backup target:
 * with TARGET_ALL, files whose space is in exclude_spcs are skipped; with TARGET_TABLESPACE,
 * files whose space is not in include_spcs are skipped.
 *
 * bak->device.sec_number is reset to 0 first. When update_device is true, device info is
 * refreshed for every counted datafile and bak->log_proc_is_ready is set on success.
 *
 * Returns OG_ERROR (marking the backup failed and throwing ERR_INVALID_OPERATION) when no
 * datafile is selected, OG_SUCCESS otherwise.
 */
status_t bak_check_datafiles_num(knl_session_t *session, bool32 update_device)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    datafile_t *datafile = NULL;
    uint32 file_id = 0;
    uint32 selected_count = 0;
    uint64 file_size = 0;
    uint32 file_hwm_start = 0;

    bak->device.sec_number = 0;

    while ((datafile = db_get_next_datafile(session, &file_id, &file_size, &file_hwm_start)) != NULL) {
        bool32 excluded = (bak->target_info.target == TARGET_ALL && bak->exclude_spcs[datafile->space_id]);
        bool32 not_included =
            (bak->target_info.target == TARGET_TABLESPACE && !bak->include_spcs[datafile->space_id]);

        if (!excluded && !not_included) {
            if (update_device) {
                bak_try_update_device_info(session, datafile, file_id, file_size, file_hwm_start);
            }
            selected_count++;
        }

        /* continue the scan right after the datafile just visited */
        file_id = datafile->ctrl->id + 1;
    }

    if (selected_count == 0) {
        bak->failed = OG_TRUE;
        OG_LOG_RUN_ERR("[BACKUP] valid datafiles number is 0");
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", can not backup when valid datafiles number is 0");
        return OG_ERROR;
    }

    if (update_device) {
        bak->log_proc_is_ready = OG_TRUE;
    }
    return OG_SUCCESS;
}
/*
 * When parallel log backup is enabled, verify that the datafile about to be backed up still
 * matches what is stored in bak->device.datafile[assign_ctrl->file_id]: same id, a size that
 * is not smaller than the stored one, same hwm_start and same name.
 *
 * Returns OG_ERROR (after logging the mismatch) on the first difference found, OG_SUCCESS
 * otherwise or when parallel log backup is disabled.
 */
status_t bak_check_bak_device(bak_t *bak, datafile_t *datafile, bak_assignment_t *assign_ctrl)
{
    uint32 idx;

    if (!bak_log_paral_enable(bak)) {
        return OG_SUCCESS;
    }

    idx = assign_ctrl->file_id;

    if (datafile->ctrl->id != bak->device.datafile[idx].id) {
        OG_LOG_RUN_ERR("[BACKUP] datafile id has been changed from %u to %u",
                       bak->device.datafile[idx].id, datafile->ctrl->id);
        return OG_ERROR;
    }

    if (assign_ctrl->file_size < bak->device.datafile[idx].file_size) {
        OG_LOG_RUN_ERR("[BACKUP] datafile size has been shrink from %llu to %llu",
                       bak->device.datafile[idx].file_size, assign_ctrl->file_size);
        return OG_ERROR;
    }

    if (assign_ctrl->file_hwm_start != bak->device.datafile[idx].hwm_start) {
        OG_LOG_RUN_ERR("[BACKUP] datafile hwm_start has been changed from %u to %u",
                       bak->device.datafile[idx].hwm_start, assign_ctrl->file_hwm_start);
        return OG_ERROR;
    }

    if (strcmp(bak->device.datafile[idx].name, datafile->ctrl->name) != 0) {
        OG_LOG_RUN_ERR("[BACKUP] datafile name has been changed from %s to %s",
                       bak->device.datafile[idx].name, datafile->ctrl->name);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * When parallel log backup is enabled, overwrite assign_ctrl->file_size with the size stored
 * in bak->device.datafile[assign_ctrl->file_id], logging the old and new values.
 */
void bak_try_reset_file_size(bak_t *bak, bak_assignment_t *assign_ctrl)
{
    if (!bak_log_paral_enable(bak)) {
        return;
    }

    uint32 idx = assign_ctrl->file_id;

    OG_LOG_RUN_INF("[BACKUP] file [id: %u] size been reset from %llu to %llu", assign_ctrl->file_id,
                   assign_ctrl->file_size, bak->device.datafile[idx].file_size);
    assign_ctrl->file_size = bak->device.datafile[idx].file_size;
}
/*
 * When parallel log backup is enabled, switch the backup progress to BACKUP_LOG_STAGE under
 * the progress lock and then poll, sleeping BAK_LOG_SLEEP_TIME between checks, until either
 * bak->paral_log_bak_complete is set or the backup is marked failed.
 */
void bak_try_wait_paral_log_proc(bak_t *bak)
{
    bak_progress_t *prog = &bak->progress;

    if (!bak_log_paral_enable(bak)) {
        OG_LOG_RUN_INF("[BACKUP] do not need wait paral log proc");
        return;
    }

    cm_spin_lock(&prog->lock, NULL);
    prog->stage = BACKUP_LOG_STAGE;
    cm_spin_unlock(&prog->lock);

    while (!bak->failed && !bak->paral_log_bak_complete) {
        cm_sleep(BAK_LOG_SLEEP_TIME);
    }

    OG_LOG_RUN_INF("[BACKUP] successful to found paral log proc exit");
}
/*
 * Merge the result of the parallel log process into the main backup state.
 *
 * Nothing is changed when parallel log backup is disabled or when bak->paral_last_asn is 0.
 * Otherwise bak->paral_last_asn must lie within [*start_asn, last_asn]; *start_asn is moved
 * to paral_last_asn + 1 and bak->file_count grows by bak->paral_log_bak_number.
 *
 * start_asn - in/out: first asn still to be backed up
 *
 * Returns OG_ERROR when paral_last_asn is outside the expected range, OG_SUCCESS otherwise.
 */
status_t bak_try_merge_bak_info(bak_t *bak, uint32 last_asn, uint32 *start_asn)
{
    if (!bak_log_paral_enable(bak)) {
        OG_LOG_RUN_INF("[BACKUP] do not need merge bak info");
        return OG_SUCCESS;
    }

    if (bak->paral_last_asn == 0) {
        OG_LOG_RUN_INF("[BACKUP] paral log proc doesn't obtain any archive log before exit");
        return OG_SUCCESS;
    }

    knl_panic_log(*start_asn <= last_asn, "start asn [%u] should be not more than last asn [%u]", *start_asn,
                  last_asn);

    bool32 in_range = (bak->paral_last_asn >= *start_asn && bak->paral_last_asn <= last_asn);
    if (!in_range) {
        OG_LOG_RUN_ERR("[BACKUP] paral last asn: %u is invalid, start asn: %u, last asn: %u",
                       bak->paral_last_asn, *start_asn, last_asn);
        return OG_ERROR;
    }

    OG_LOG_RUN_INF("[BACKUP] update start asn from %u to %u", *start_asn, bak->paral_last_asn + 1);
    *start_asn = bak->paral_last_asn + 1;

    uint32 old_count = bak->file_count;
    bak->file_count += bak->paral_log_bak_number;
    OG_LOG_RUN_INF("[BACKUP] update file_count from %u to %u", old_count, bak->file_count);

    return OG_SUCCESS;
}
/*
 * Report whether the asn of the current redo log file equals last_asn.
 * Always returns OG_FALSE when bak_need_wait_arch() is false for the session.
 */
bool32 bak_equal_last_asn(knl_session_t *session, uint32 last_asn)
{
    log_context_t *redo = &session->kernel->redo_ctx;

    if (bak_need_wait_arch(session)) {
        OG_LOG_RUN_INF("[BACKUP] curr file's asn: %u, last asn: %u in backing up operation",
                       redo->files[redo->curr_file].head.asn, last_asn);
        return (redo->files[redo->curr_file].head.asn == last_asn);
    }

    return OG_FALSE;
}
/*
 * Thread entry of the parallel log backup process.
 *
 * Starting at the asn of the backup's recovery point (ctrlinfo->rcy_point.asn), the loop
 * obtains a backup process with bak_get_free_proc(), points it at the archived log of the
 * current asn, assigns it a backup task, and then moves on to the next asn.
 *
 * The loop ends when:
 *   - the thread is closed or the backup is marked failed;
 *   - the backup progress reaches BACKUP_LOG_STAGE;
 *   - bak_set_archived_log_ctrl() reports the archive as lost (bak->arch_is_lost);
 *   - any step fails, in which case bak->failed is set here.
 *
 * thread->argument is expected to be the bak_process_t of this log process.
 *
 * NOTE(review): when the loop ends only because thread->closed is set, neither
 * paral_log_bak_complete nor failed is set here, while bak_try_wait_paral_log_proc() polls
 * exactly those two flags - confirm the closing path cannot leave that waiter spinning.
 */
void bak_log_read_proc(thread_t *thread)
{
bak_process_t *process = (bak_process_t *)thread->argument;
knl_session_t *session = process->session;
bak_t *bak = &session->kernel->backup_ctx.bak;
bak_ctrlinfo_t *ctrlinfo = &bak->record.ctrlinfo;
/* first archived log to back up: the asn of the recovery point */
uint32 curr_asn = (uint32)ctrlinfo->rcy_point.asn;
bak_process_t *proc = NULL;
uint32 block_size = 0;
bool32 arch_compressed = OG_FALSE;
OG_LOG_RUN_INF("[BACKUP] parallel process %u start to backup archive log", process->proc_id);
while (!thread->closed && !bak->failed) {
/* do nothing until log_proc_is_ready is set (bak_check_datafiles_num sets it when update_device is true) */
if (!bak->log_proc_is_ready) {
cm_sleep(BAK_LOG_SLEEP_TIME);
continue;
}
/* the backup has entered the log stage: wait via bak_wait_paral_proc(), then signal completion */
if (bak->progress.stage == BACKUP_LOG_STAGE) {
bak_wait_paral_proc(session, OG_TRUE);
bak->paral_log_bak_complete = OG_TRUE;
break;
}
if (bak_get_free_proc(session, &proc, OG_TRUE) != OG_SUCCESS) {
bak->failed = OG_TRUE;
break;
}
/* waits for the archived log of curr_asn and opens its device in proc->ctrl */
if (bak_set_archived_log_ctrl(session, proc, curr_asn, &block_size, &arch_compressed, OG_TRUE) != OG_SUCCESS) {
bak->failed = OG_TRUE;
break;
}
/* no archive entry was obtained; this is asserted to happen only once BACKUP_LOG_STAGE is reached */
if (bak->arch_is_lost) {
knl_panic_log(bak->progress.stage == BACKUP_LOG_STAGE, "only in BACKUP LOG STAGE, get arch log can failed");
bak_wait_paral_proc(session, OG_TRUE);
bak->paral_log_bak_complete = OG_TRUE;
break;
}
proc->assign_ctrl.log_block_size = block_size;
if (bak_assign_backup_task(session, proc, 0, OG_TRUE) != OG_SUCCESS) {
/* the task was not assigned, so close the device opened by bak_set_archived_log_ctrl() */
cm_close_device(proc->ctrl.type, &proc->ctrl.handle);
bak->failed = OG_TRUE;
break;
}
curr_asn++;
}
/* on failure, also record the error through bak_set_error() */
if (bak->failed) {
OG_LOG_RUN_ERR("[BACKUP] failed: paral log process %u stop", process->proc_id);
bak_set_error(&bak->error_info);
} else {
OG_LOG_RUN_INF("[BACKUP] success: paral log process %u stop", process->proc_id);
}
}
#ifdef __cplusplus
}
#endif