* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* bak_restore.c
*
*
* IDENTIFICATION
* src/kernel/backup/bak_restore.c
*
* -------------------------------------------------------------------------
*/
#include "knl_backup_module.h"
#include "bak_restore.h"
#include "cm_file.h"
#include "bak_paral.h"
#include "knl_abr.h"
#include "knl_context.h"
#include "knl_space_ddl.h"
#include "dtc_dls.h"
#include "dtc_ckpt.h"
#include "dtc_backup.h"
#include "dtc_log.h"
#include "dtc_database.h"
#include "rc_reform.h"
#include "srv_param_common.h"
#include "knl_badblock.h"
#ifdef __cplusplus
extern "C" {
#endif
/* True when the restore request targets everything (RESTORE_ALL) or only the control file. */
#define RST_NEED_RESTORE_CTRLFILE(bak) ((bak)->rst_file.file_type == RESTORE_ALL || \
(bak)->rst_file.file_type == RESTORE_CTRL)
/* True when the restore request targets everything (RESTORE_ALL) or only a data file. */
#define RST_NEED_RESTORE_DATAFILE(bak) ((bak)->rst_file.file_type == RESTORE_ALL || \
(bak)->rst_file.file_type == RESTORE_DATAFILE)
/* True when the restore request targets everything (RESTORE_ALL) or only archive log files. */
#define RST_NEED_RESTORE_ARCHFILE(bak) ((bak)->rst_file.file_type == RESTORE_ALL || \
(bak)->rst_file.file_type == RESTORE_ARCHFILE)
/* NOTE(review): the two constants below are not referenced in this part of the file; */
/* judging by the names only, a data-volume estimate ratio and a wait interval in ms - confirm at the use sites. */
#define ESTIMATE_VALID_DATA_PERCENT 0.5
#define BAK_WAIT_FILE_EXTEND_MS 1000
/* Forward declaration; called from rst_set_logfile_ctrl() before an online log file is opened. */
static void rst_wait_logfiles_created(bak_t *bak);
/*
 * Close the device handle of every entry in the given control file set.
 *
 * ctrlfiles: set whose first `count` items are closed through cm_close_device().
 */
void rst_close_ctrl_file(ctrlfile_set_t *ctrlfiles)
{
    uint32 idx = 0;

    while (idx < ctrlfiles->count) {
        cm_close_device(ctrlfiles->items[idx].type, &ctrlfiles->items[idx].handle);
        idx++;
    }
}
/*
 * Close the handles of this node's log files.
 *
 * Walks the node's log file set up to log_hwm and closes every file whose
 * control flag is not marked as dropped.
 */
void rst_close_log_files(knl_session_t *session)
{
    uint32 slot;

    for (slot = 0; slot < dtc_my_ctrl(session)->log_hwm; slot++) {
        log_file_t *logfile = &MY_LOGFILE_SET(session)->items[slot];

        if (!LOG_IS_DROPPED(logfile->ctrl->flg)) {
            cm_close_device(logfile->ctrl->type, &logfile->handle);
        }
    }
}
/*
 * Write one chunk of restored data to the target described by ctrl.
 *
 * The target is opened with O_SYNC through cm_open_device() (handle kept in
 * ctrl->handle), `size` bytes of `buf` are written at ctrl->offset and, unless
 * the device type is DEV_TYPE_DBSTOR_FILE, flushed with db_fdatasync_file().
 * ctrl->offset advances by `size` only when every step succeeded.
 * This function never closes ctrl->handle.
 *
 * Returns OG_SUCCESS, or OG_ERROR after logging the failing step.
 */
status_t rst_write_data(knl_session_t *session, bak_ctrl_t *ctrl, const char *buf, int32 size)
{
    status_t result = OG_ERROR;

    if (cm_open_device(ctrl->name, ctrl->type, O_SYNC, &ctrl->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to open %s", ctrl->name);
    } else if (cm_write_device(ctrl->type, ctrl->handle, ctrl->offset, buf, size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to write %s", ctrl->name);
    } else if (ctrl->type != DEV_TYPE_DBSTOR_FILE && db_fdatasync_file(session, ctrl->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to fdatasync logfile %s", ctrl->name);
    } else {
        ctrl->offset += size;
        result = OG_SUCCESS;
    }

    return result;
}
/*
 * Create the device behind one control file and leave it open in ctrlfile->handle.
 *
 * Without dbstor the device is simply created with the session's IO flags.
 * With dbstor the control file geometry (blocks, block_size) is filled in first,
 * the device is built at its full size and then opened.
 *
 * Returns OG_SUCCESS, or OG_ERROR after logging the failing step.
 */
static status_t rst_create_ctrl_device(knl_session_t *session, ctrlfile_t *ctrlfile)
{
    knl_instance_t *kernel = session->kernel;
    uint32 build_flags;
    int64 total_size;

    if (!knl_dbs_is_enable_dbs()) {
        if (cm_create_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to create %s ", ctrlfile->name);
            return OG_ERROR;
        }
        return OG_SUCCESS;
    }

    if (kernel->attr.clustered) {
        ctrlfile->blocks = CTRL_MAX_PAGES_CLUSTERED;
    } else {
        ctrlfile->blocks = CTRL_MAX_PAGES_NONCLUSTERED;
    }
    ctrlfile->block_size = OG_DFLT_CTRL_BLOCK_SIZE;

    /* DEV_TYPE_PGPOOL devices are built with an all-ones flag word instead of the regular IO flags. */
    if (ctrlfile->type == DEV_TYPE_PGPOOL) {
        build_flags = 0xFFFFFFFF;
    } else {
        build_flags = knl_io_flag(session);
    }

    total_size = (int64)ctrlfile->blocks * ctrlfile->block_size;
    if (cm_build_device(ctrlfile->name, ctrlfile->type, kernel->attr.xpurpose_buf, OG_XPURPOSE_BUFFER_SIZE,
                        total_size, build_flags, OG_FALSE, &ctrlfile->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to build %s ", ctrlfile->name);
        return OG_ERROR;
    }

    if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session), &ctrlfile->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to open %s", ctrlfile->name);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Write the in-memory control pages to every control file of the database.
 *
 * After db_store_core() and a checksum pass over the control pages, each control
 * file is created (when the device does not exist yet) or opened, overwritten from
 * offset 0 with CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE bytes, synced and
 * closed again.
 *
 * Returns OG_SUCCESS when every control file was written and synced; OG_ERROR on the
 * first failure. Control files handled before the failing one stay rewritten.
 */
static status_t rst_build_ctrl_file(knl_session_t *session)
{
    knl_instance_t *kernel = session->kernel;
    ctrlfile_set_t *ctrlfiles = &kernel->db.ctrlfiles;
    ctrlfile_t *ctrlfile = NULL;
    uint32 size = CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE;

    db_store_core(&kernel->db);
    knl_panic(ctrlfiles->count <= OG_MAX_CTRL_FILES);
    bak_calc_ctrlfile_checksum(session, (char *)kernel->db.ctrl.pages, CTRL_MAX_PAGES(session));

    for (uint32 i = 0; i < ctrlfiles->count; i++) {
        ctrlfile = &ctrlfiles->items[i];

        if (!cm_exist_device(ctrlfile->type, ctrlfile->name)) {
            /* Compare against OG_SUCCESS explicitly, like every other status check in this file. */
            if (rst_create_ctrl_device(session, ctrlfile) != OG_SUCCESS) {
                return OG_ERROR;
            }
        } else if (cm_open_device(ctrlfile->name, ctrlfile->type, knl_io_flag(session),
                                  &ctrlfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to open %s", ctrlfile->name);
            return OG_ERROR;
        }

        if (cm_write_device(ctrlfile->type, ctrlfile->handle, 0, (void *)(&kernel->db.ctrl.pages[0]),
                            size) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to write %s", ctrlfile->name);
            cm_close_device(ctrlfile->type, &ctrlfile->handle);
            return OG_ERROR;
        }

        if (db_fdatasync_file(session, ctrlfile->handle) != OG_SUCCESS) {
            /* The file being synced here is a control file, so the message says so. */
            OG_LOG_RUN_ERR("[RESTORE] failed to fdatasync ctrlfile %s", ctrlfile->name);
            cm_close_device(ctrlfile->type, &ctrlfile->handle);
            return OG_ERROR;
        }

        cm_close_device(ctrlfile->type, &ctrlfile->handle);
    }

    return OG_SUCCESS;
}
/*
 * For a tablespace restore, take the files of every other non-default, in-use
 * tablespace offline. Does nothing for any other kind of restore.
 */
static void rst_offline_unused_space(knl_session_t *session)
{
    knl_instance_t *kernel = session->kernel;
    database_t *db = &kernel->db;
    bak_t *bak = &kernel->backup_ctx.bak;
    uint32 id;

    if (!BAK_IS_TABLESPCE_RESTORE(bak)) {
        return;
    }

    for (id = 0; id < OG_MAX_SPACES; id++) {
        space_t *space = &db->spaces[id];
        space_ctrl_t *ctrl = space->ctrl;

        /* Only spaces that are not default, are in use and are not the restore target get offlined. */
        if (!SPACE_IS_DEFAULT(space) && ctrl->used && !cm_str_equal(bak->spc_name, ctrl->name)) {
            spc_offline_space_files(session, ctrl->files, ctrl->file_hwm);
        }
    }
}
/*
 * For a tablespace restore, hand "<kernel home>/data" to db_convert_temp_path().
 * Does nothing for any other kind of restore.
 */
static void rst_convert_temp_path(knl_session_t *session)
{
    knl_instance_t *kernel = session->kernel;
    bak_t *bak = &kernel->backup_ctx.bak;

    if (BAK_IS_TABLESPCE_RESTORE(bak)) {
        char data_dir[OG_FILE_NAME_BUFFER_SIZE];
        errno_t rc = snprintf_s(data_dir, OG_FILE_NAME_BUFFER_SIZE, OG_FILE_NAME_BUFFER_SIZE - 1, "%s/data",
                                kernel->home);
        knl_securec_check_ss(rc);
        db_convert_temp_path(session, data_dir);
    }
}
/*
 * Rebuild the control-file derived items after the control pages were restored.
 *
 * Steps, in order: generate the ctrl items, update the control file names (in the
 * ctrl and in the config), create the control files for a level-0 backup, update
 * the storage file names, then apply the tablespace-restore specific fixups.
 *
 * Returns OG_ERROR as soon as one step fails, OG_SUCCESS otherwise.
 */
static status_t rst_restore_ctrlfile_items(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;

    if (db_generate_ctrlitems(session) != OG_SUCCESS ||
        db_update_ctrl_filename(session) != OG_SUCCESS ||
        db_update_config_ctrl_name(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak->record.attr.level == 0 && db_create_ctrl_file(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (db_update_storage_filename(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    rst_convert_temp_path(session);
    rst_offline_unused_space(session);
    return OG_SUCCESS;
}
/*
 * Reconcile an already opened datafile slot with the datafile described by the
 * restored control file.
 *
 * ogx: restore process context holding the per-slot handle, name, device type and version.
 * df:  datafile ctrl taken from the restored control file.
 * i:   datafile slot index.
 *
 * Nothing is done when the slot has no open handle or its recorded name already
 * equals df->name. Otherwise:
 *   - same create version: the file was renamed, so the old device is renamed to df->name;
 *   - different create version: the old file is stale, so it is closed and removed and
 *     the slot handle is reset to OG_INVALID_HANDLE.
 *
 * Returns the status of the rename/remove, or OG_SUCCESS when nothing had to be done.
 */
status_t rst_process_existed_datafile(bak_process_t *ogx, datafile_ctrl_t *df, uint32 i)
{
    status_t status;

    if (ogx->datafiles[i] == OG_INVALID_HANDLE || cm_str_equal(ogx->datafile_name[i], df->name)) {
        return OG_SUCCESS;
    }

    if (ogx->datafile_version[i] == df->create_version) {
        OG_LOG_RUN_WAR("[RESTORE] datafile %s has been renamed to %s, id %u, create version %u",
                       ogx->datafile_name[i], df->name, i, df->create_version);
        return cm_rename_device(df->type, ogx->datafile_name[i], df->name);
    }

    OG_LOG_RUN_WAR("[RESTORE] datafile %s has been deleted, recreated as %s, id %u, version: old %u, new %u",
                   ogx->datafile_name[i], df->name, i, ogx->datafile_version[i], df->create_version);
    /*
     * The open handle belongs to the OLD file, so it is closed with the device type
     * recorded for this slot - the same type used for the removal below - rather than
     * with the type of the recreated file.
     */
    cm_close_device(ogx->file_type[i], &ogx->datafiles[i]);
    status = cm_remove_device(ogx->file_type[i], ogx->datafile_name[i]);
    ogx->datafiles[i] = OG_INVALID_HANDLE;
    return status;
}
/*
 * Create the device behind one datafile and store its open handle in
 * ogx->datafiles[file_index].
 *
 * Without dbstor the device is created with the session's IO flags; with dbstor it
 * is built at df->ctrl->size and then opened.
 *
 * Returns OG_SUCCESS, or OG_ERROR after logging the failing step.
 */
static status_t rst_create_datafile_device(knl_session_t *session, datafile_t *df, bak_process_t *ogx,
                                           uint32 file_index)
{
    knl_instance_t *kernel = session->kernel;

    if (!knl_dbs_is_enable_dbs()) {
        if (cm_create_device(df->ctrl->name, df->ctrl->type, knl_io_flag(session),
                             &ogx->datafiles[file_index]) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to create %s ", df->ctrl->name);
            return OG_ERROR;
        }
        return OG_SUCCESS;
    }

    if (cm_build_device(df->ctrl->name, df->ctrl->type, kernel->attr.xpurpose_buf, OG_XPURPOSE_BUFFER_SIZE,
                        df->ctrl->size, knl_io_flag(session), OG_FALSE,
                        &ogx->datafiles[file_index]) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to build %s ", df->ctrl->name);
        return OG_ERROR;
    }

    if (cm_open_device(df->ctrl->name, df->ctrl->type, knl_io_flag(session),
                       &ogx->datafiles[file_index]) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to open %s ", df->ctrl->name);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Bring the on-disk datafiles in line with the restored control file.
 *
 * Skipped entirely for a full build that is not on its first link. For every
 * datafile slot (only the requested file id when a single datafile is restored):
 *   - unused or offline datafile: a file previously recorded for the slot is closed
 *     and removed, and the slot is cleared;
 *   - otherwise the slot is reconciled (rename / stale file removal), then the file
 *     is created when the slot has no handle, or extended to df->ctrl->size when it has;
 *   - finally the slot's recorded name, device type, create version and current
 *     device size are refreshed.
 * bak->extended[slot] records whether the file was extended (1) or freshly created (0).
 *
 * Returns OG_SUCCESS, or OG_ERROR on the first failure.
 */
status_t rst_create_datafiles(knl_session_t *session, bak_process_t *ogx)
{
    knl_instance_t *kernel = session->kernel;
    bak_context_t *bak_ctx = &kernel->backup_ctx;
    bak_t *bak = &bak_ctx->bak;
    uint32 id;

    if (BAK_IS_FULL_BUILDING(bak) && !bak->is_first_link) {
        return OG_SUCCESS;
    }

    for (id = 0; id < OG_MAX_DATA_FILES; id++) {
        datafile_t *file = &kernel->db.datafiles[id];
        errno_t rc;

        ogx->datafile_size[id] = OG_INVALID_INT64;
        if (bak->rst_file.file_type == RESTORE_DATAFILE && bak->rst_file.file_id != file->ctrl->id) {
            continue;
        }
        OG_LOG_DEBUG_INF("[BACKUP] %u datafile1 %s, datatfile2 %s", id, file->ctrl->name, ogx->datafile_name[id]);

        if (!file->ctrl->used || !DATAFILE_IS_ONLINE(file)) {
            /* Slot is not live in the restored ctrl: drop whatever file was recorded for it. */
            if (strlen(ogx->datafile_name[id]) != 0) {
                cm_close_device(ogx->file_type[id], &ogx->datafiles[id]);
                if (cm_remove_device(ogx->file_type[id], ogx->datafile_name[id]) != OG_SUCCESS) {
                    return OG_ERROR;
                }
                ogx->datafile_name[id][0] = '\0';
                ogx->datafiles[id] = OG_INVALID_HANDLE;
            }
            continue;
        }

        if (rst_process_existed_datafile(ogx, file->ctrl, id) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (ogx->datafiles[id] != OG_INVALID_HANDLE) {
            if (rst_extend_database_file(session, bak_ctx, file->ctrl->name, file->ctrl->type,
                                         file->ctrl->size) != OG_SUCCESS) {
                return OG_ERROR;
            }
            bak->extended[id] = 1;
            OG_LOG_RUN_INF("[RESTORE] extend file %s, file size: %lld", file->ctrl->name, file->ctrl->size);
        } else {
            if (rst_create_datafile_device(session, file, ogx, id) != OG_SUCCESS) {
                return OG_ERROR;
            }
            bak->extended[id] = 0;
            OG_LOG_DEBUG_INF("[RESTORE] build file %s size %llu block size %u ", file->ctrl->name,
                             (uint64)(file->ctrl->size), file->ctrl->block_size);
        }

        rc = strcpy_sp(ogx->datafile_name[id], OG_FILE_NAME_BUFFER_SIZE, file->ctrl->name);
        knl_securec_check(rc);
        ogx->file_type[id] = file->ctrl->type;
        ogx->datafile_version[id] = file->ctrl->create_version;

        ogx->datafile_size[id] = cm_device_size(cm_device_type(ogx->datafile_name[id]), ogx->datafiles[id]);
        if (ogx->datafile_size[id] == -1) {
            OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
            return OG_ERROR;
        }
    }

    OG_LOG_RUN_INF("[RESTORE] create all datafiles finished");
    return OG_SUCCESS;
}
/*
 * Create the online log files of this node.
 *
 * Dropped log files are skipped. During a full build that is not on its first link,
 * an already existing log file is removed before it is created again. Each file is
 * closed right after creation. With dbstor enabled only the first non-dropped log
 * file is created.
 *
 * Returns OG_SUCCESS, or OG_ERROR when a removal or creation fails.
 */
status_t rst_create_logfiles(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    logfile_set_t *redo_set = MY_LOGFILE_SET(session);
    bool32 is_dbstor = knl_dbs_is_enable_dbs();
    int32 handle = OG_INVALID_HANDLE;
    uint32 slot = 0;

    while (slot < redo_set->logfile_hwm) {
        log_file_ctrl_t *redo = redo_set->items[slot].ctrl;

        slot++;
        if (LOG_IS_DROPPED(redo->flg)) {
            continue;
        }

        if (BAK_IS_FULL_BUILDING(bak) && !bak->is_first_link && cm_file_exist(redo->name)) {
            OG_LOG_RUN_INF("[BUILD] delete logfile %s ", redo->name);
            if (cm_remove_device(redo->type, redo->name) != OG_SUCCESS) {
                return OG_ERROR;
            }
            OG_LOG_RUN_INF("[BUILD] create logfile %s ", redo->name);
        }

        if (cm_create_device(redo->name, redo->type, knl_redo_io_flag(session), &handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to create %s ", redo->name);
            return OG_ERROR;
        }
        OG_LOG_RUN_INF("[RESTORE] restore build file, src_file:%s, file size :%lld", redo->name, redo->size);
        cm_close_device(redo->type, &handle);

        if (is_dbstor) {
            break;
        }
    }

    return OG_SUCCESS;
}
/*
 * Extend a database file to `size` through rst_extend_file(), using the aligned
 * backup buffer of the common backup process as the working buffer.
 *
 * Returns OG_SUCCESS, or OG_ERROR when rst_extend_file() does not succeed.
 */
status_t rst_extend_database_file(knl_session_t *session, bak_context_t *ogx, const char *name, device_type_t type,
                                  int64 size)
{
    bak_process_t *common_proc = &ogx->process[BAK_COMMON_PROC];

    if (rst_extend_file(session, name, type, size, common_proc->backup_buf.aligned_buf,
                        BACKUP_BUFFER_SIZE(&session->kernel->backup_ctx.bak)) == OG_SUCCESS) {
        return OG_SUCCESS;
    }
    return OG_ERROR;
}
/*
 * Point `ctrl` at the file that will receive the log data of backup file
 * `curr_file_index` and open/create that file.
 *
 * Online log file (BACKUP_LOG_FILE): waits until the log files were created, then
 * opens the matching online log file.
 * Archive log file: derives the archive file name from the log head (with LSN range
 * when BAK_IS_DBSOTR), lets arch_process_existed_archfile() decide about an already
 * existing file, and creates the file unless its data is to be ignored.
 *
 * ignore_data: out, OG_TRUE when the caller must discard the log data instead of writing it.
 * Returns OG_SUCCESS or OG_ERROR.
 */
status_t rst_set_logfile_ctrl(knl_session_t *session, uint32 curr_file_index, log_file_head_t *head,
                              bak_ctrl_t *ctrl, bool32 *ignore_data)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_file_t *bak_file = &bak->files[curr_file_index];
    logfile_set_t *redo_set = MY_LOGFILE_SET(session);

    *ignore_data = OG_FALSE;
    ctrl->offset = 0;

    if (bak_file->type == BACKUP_LOG_FILE) {
        log_file_ctrl_t *redo = NULL;
        errno_t rc;

        rst_wait_logfiles_created(bak);
        redo = redo_set->items[bak_file->id].ctrl;
        ctrl->type = redo->type;
        rc = strcpy_sp(ctrl->name, OG_FILE_NAME_BUFFER_SIZE, redo->name);
        knl_securec_check(rc);
        if (cm_open_device(redo->name, redo->type, knl_redo_io_flag(session), &ctrl->handle) != OG_SUCCESS) {
            return OG_ERROR;
        }
        return OG_SUCCESS;
    }

    /* For an archive log the backup file id must be the asn stored in the log head. */
    knl_panic(bak_file->id == head->asn);

    if (BAK_IS_DBSOTR(bak)) {
        arch_file_name_info_t file_name_info = {head->rst_id, head->asn, bak_file->inst_id,
                                                OG_FILE_NAME_BUFFER_SIZE, head->first_lsn, head->last_lsn,
                                                ctrl->name};
        arch_set_archive_log_name_with_lsn(session, ARCH_DEFAULT_DEST, &file_name_info);
    } else {
        arch_set_archive_log_name(session, head->rst_id, head->asn, ARCH_DEFAULT_DEST, ctrl->name,
                                  OG_FILE_NAME_BUFFER_SIZE, session->kernel->id);
    }
    ctrl->type = arch_get_device_type(ctrl->name);
    OG_LOG_DEBUG_INF("[RESTORE] bak_set_logfile_ctrl get archive log %s", ctrl->name);

    if (cm_exist_device(ctrl->type, ctrl->name)) {
        OG_LOG_RUN_INF("[RESTORE] Archive log %s exists", ctrl->name);
        if (arch_process_existed_archfile(session, ctrl->name, *head, ignore_data) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    if (*ignore_data) {
        return OG_SUCCESS;
    }

    if (cm_create_device(ctrl->name, ctrl->type, O_BINARY | O_SYNC | O_RDWR | O_EXCL, &ctrl->handle) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to create %s", ctrl->name);
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[RESTORE] Create %s", ctrl->name);
    return OG_SUCCESS;
}
/*
 * Adjust this node's control information after the log files were restored.
 *
 * Resets the log files through log_reset_logfile(), sets both double-write
 * positions to the start of this node's district, copies scn and lrp_point from
 * the backup record's ctrlinfo and saves the node control. A failed save aborts
 * the process.
 *
 * Returns OG_ERROR when log_reset_logfile() fails, OG_SUCCESS otherwise.
 */
static status_t rst_amend_ctrlinfo(knl_session_t *session, uint32 last_asn, uint32 log_first)
{
    bak_ctrlinfo_t *ctrlinfo = &session->kernel->backup_ctx.bak.record.ctrlinfo;

    if (log_reset_logfile(session, last_asn, log_first) != OG_SUCCESS) {
        return OG_ERROR;
    }

    dtc_my_ctrl(session)->dw_end = DW_DISTRICT_BEGIN(session->kernel->id);
    dtc_my_ctrl(session)->dw_start = dtc_my_ctrl(session)->dw_end;
    dtc_my_ctrl(session)->scn = ctrlinfo->scn;
    dtc_my_ctrl(session)->lrp_point = ctrlinfo->lrp_point;
    session->kernel->scn = dtc_my_ctrl(session)->scn;

    if (dtc_save_ctrl(session, session->kernel->id) != OG_SUCCESS) {
        CM_ABORT(0, "[RESTORE] ABORT INFO: save node control file failed when restore log files");
    }
    return OG_SUCCESS;
}
/*
 * Poll until the process context reports a non-zero read_size or the backup failed.
 *
 * While waiting, reaching the BACKUP_READ_FINISHED stage with nothing to read means
 * the backup set is incomplete and is reported as ERR_INVALID_BACKUPSET.
 *
 * Returns OG_ERROR when the backup failed or is incomplete, OG_SUCCESS otherwise.
 */
static status_t rst_wait_read_data(bak_t *bak, bak_process_t *ogx)
{
    for (;;) {
        if (bak->failed || ogx->read_size != 0) {
            break;
        }
        if (bak->progress.stage == BACKUP_READ_FINISHED) {
            OG_THROW_ERROR(ERR_INVALID_BACKUPSET, "incomplete backup");
            return OG_ERROR;
        }
        cm_sleep(1);
    }

    if (bak->failed) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Persist the current build progress into "<kernel home>/bak_process_file".
 *
 * Only active during a full build; otherwise returns OG_SUCCESS immediately.
 * allow_not_exist: when OG_TRUE a missing track file is created, when OG_FALSE a
 * missing track file is an error.
 * The file is synced only when attr.enable_fdatasync is set. On success
 * bak->build_retry_time is reset to 0.
 *
 * Returns OG_SUCCESS, or OG_ERROR after logging the failing step.
 */
static status_t rst_update_track_file(knl_session_t *session, bak_t *bak, bool32 allow_not_exist)
{
    build_progress_t *progress = &bak->progress.build_progress;
    char track_path[OG_FILE_NAME_BUFFER_SIZE];
    int32 fd = INVALID_FILE_HANDLE;
    errno_t rc;

    if (!BAK_IS_FULL_BUILDING(bak)) {
        return OG_SUCCESS;
    }

    rc = snprintf_s(track_path, OG_FILE_NAME_BUFFER_SIZE, OG_FILE_NAME_BUFFER_SIZE - 1, "%s/bak_process_file",
                    session->kernel->home);
    knl_securec_check_ss(rc);

    if (cm_file_exist((const char *)track_path)) {
        if (cm_open_file(track_path, O_BINARY | O_RDWR, &fd) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[BUILD] failed to open %s ", track_path);
            return OG_ERROR;
        }
    } else if (!allow_not_exist) {
        OG_LOG_RUN_ERR("[BUILD] bak track file : %s does not exist", track_path);
        return OG_ERROR;
    } else if (cm_create_file(track_path, O_BINARY | O_RDWR, &fd) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[BUILD] failed to create track file %s", track_path);
        return OG_ERROR;
    }

    if (cm_write_file(fd, (void *)progress, sizeof(build_progress_t)) != OG_SUCCESS) {
        cm_close_file(fd);
        OG_LOG_RUN_ERR("[BUILD] failed to write %s", track_path);
        return OG_ERROR;
    }

    if (session->kernel->attr.enable_fdatasync && cm_fdatasync_file(fd) != OG_SUCCESS) {
        cm_close_file(fd);
        OG_LOG_RUN_ERR("[BUILD] failed to fdatasync datafile %s", track_path);
        return OG_ERROR;
    }

    bak->build_retry_time = 0;
    cm_close_file(fd);
    return OG_SUCCESS;
}
/*
 * Remove "<kernel home>/bak_process_file".
 *
 * Does nothing (OG_SUCCESS) unless this is a full build or the recorded error code
 * is ERR_BUILD_CANCELLED.
 * allow_not_exist: when OG_TRUE a missing track file is not an error.
 *
 * Returns OG_SUCCESS, or OG_ERROR when the file is unexpectedly missing or cannot
 * be removed.
 */
status_t rst_delete_track_file(knl_session_t *session, bak_t *bak, bool32 allow_not_exist)
{
    char track_path[OG_FILE_NAME_BUFFER_SIZE];
    errno_t rc;

    if (!BAK_IS_FULL_BUILDING(bak) && bak->error_info.err_code != ERR_BUILD_CANCELLED) {
        return OG_SUCCESS;
    }

    rc = snprintf_s(track_path, OG_FILE_NAME_BUFFER_SIZE, OG_FILE_NAME_BUFFER_SIZE - 1, "%s/bak_process_file",
                    session->kernel->home);
    knl_securec_check_ss(rc);

    if (cm_file_exist((const char *)track_path)) {
        if (cm_remove_file(track_path) == OG_SUCCESS) {
            return OG_SUCCESS;
        }
        OG_LOG_RUN_ERR("[BUILD] failed to delete track file %s", track_path);
        return OG_ERROR;
    }

    if (allow_not_exist) {
        return OG_SUCCESS;
    }
    OG_LOG_RUN_ERR("[BUILD] failed to find track file %s", track_path);
    return OG_ERROR;
}
/*
 * Write one log file (header plus body) from the common backup process buffer to
 * the target already set up in write_ctx->ctrl.
 *
 * head:        log file header; head->write_pos is taken as the total number of bytes
 *              of the file, head->block_size as its block size.
 * ignore_data: when true nothing is written, but the data is still consumed from the
 *              buffer so the stream stays in step.
 *
 * Returns OG_SUCCESS, or OG_ERROR when waiting for data, writing, or updating the
 * build track file fails, or when a negative read_size is observed.
 */
static status_t rst_restore_logfile(knl_session_t *session, log_file_head_t *head, bool32 ignore_data)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_process_t *write_ctx = &ogx->process[BAK_COMMON_PROC];
bak_t *bak = &ogx->bak;
uint64 data_size = head->write_pos;
/* Size of the header region as computed by CM_CALC_ALIGN from the block size and the head struct size. */
uint32 read_size = CM_CALC_ALIGN((uint32)head->block_size, sizeof(log_file_head_t));
int32 left_offset;
errno_t ret;
build_progress_t *build_progress = &bak->progress.build_progress;
/* The header region is written straight from `head`, not from the backup buffer. */
if (!ignore_data) {
if (rst_write_data(session, &write_ctx->ctrl, (const char *)head, read_size) != OG_SUCCESS) {
return OG_ERROR;
}
}
/* The header region counts towards write_pos; what remains is the body still to be consumed. */
knl_panic(data_size >= read_size);
data_size -= read_size;
while (data_size > 0) {
/* Block until read_size becomes non-zero (NOTE(review): presumably set by the reader side - confirm). */
if (rst_wait_read_data(bak, write_ctx) != OG_SUCCESS) {
return OG_ERROR;
}
/* A negative read_size is treated as a failure signal. */
if (write_ctx->read_size < 0) {
return OG_ERROR;
}
/* Only whole blocks are written: split the available bytes into a block-aligned part and a tail. */
write_ctx->left_size = write_ctx->read_size % head->block_size;
left_offset = write_ctx->read_size - write_ctx->left_size;
write_ctx->read_size -= write_ctx->left_size;
/* Never consume more than what is left of this log file. */
read_size = ((uint64)write_ctx->read_size > data_size) ? (uint32)data_size : (uint32)write_ctx->read_size;
if (!ignore_data) {
/* write_size is the offset of the first unconsumed byte inside the aligned buffer. */
if (rst_write_data(session, &write_ctx->ctrl, write_ctx->backup_buf.aligned_buf + write_ctx->write_size,
read_size) != OG_SUCCESS) {
return OG_ERROR;
}
}
/* Move the partial-block tail to the start of the buffer (memmove_s, so overlapping ranges are fine). */
if (write_ctx->left_size > 0) {
ret = memmove_s(write_ctx->backup_buf.aligned_buf, BACKUP_BUFFER_SIZE(bak),
write_ctx->backup_buf.aligned_buf + left_offset, write_ctx->left_size);
knl_securec_check(ret);
}
/* During a full build, record which log (asn) and backup file are being processed. */
if (BAK_IS_FULL_BUILDING(bak)) {
build_progress->asn = head->asn;
build_progress->stage = BACKUP_LOG_STAGE;
build_progress->curr_file_index = bak->curr_file_index;
}
/* Account for the bytes just consumed. */
write_ctx->read_size -= (int32)read_size;
write_ctx->write_size += (int32)read_size;
data_size -= read_size;
}
/* The track file must already exist at this point (allow_not_exist is OG_FALSE). */
if (rst_update_track_file(session, bak, OG_FALSE) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
/*
 * Copy exactly buf_size bytes from the process context's backup buffer into buf,
 * waiting for more data whenever the buffer runs empty.
 *
 * bak:       backup state, polled for failure / end of input while waiting.
 * write_ctx: process context; read_size is the number of unconsumed bytes and
 *            write_size the offset of the first unconsumed byte in aligned_buf.
 *            Both are updated as data is consumed.
 * buf:       destination, at least buf_size bytes.
 *
 * Returns OG_SUCCESS once buf_size bytes were copied; OG_ERROR when waiting for data
 * fails or when a negative read_size is observed.
 */
static status_t rst_write_to_buf(bak_t *bak, bak_process_t *write_ctx, char *buf, uint32 buf_size)
{
    int32 remain_size = buf_size;
    int32 offset = 0;
    int32 avail_size;
    int32 read_size;
    errno_t ret;

    while (remain_size > 0) {
        if (rst_wait_read_data(bak, write_ctx) != OG_SUCCESS) {
            return OG_ERROR;
        }

        /*
         * rst_wait_read_data() only guarantees read_size != 0. A negative value is a
         * failure signal (rst_restore_logfile() treats it the same way) and must not be
         * used as a copy length.
         */
        avail_size = write_ctx->read_size;
        if (avail_size < 0) {
            return OG_ERROR;
        }

        read_size = (avail_size > remain_size) ? remain_size : avail_size;
        ret = memcpy_sp(buf + offset, remain_size, write_ctx->backup_buf.aligned_buf + write_ctx->write_size,
                        read_size);
        knl_securec_check(ret);

        write_ctx->read_size -= read_size;
        write_ctx->write_size += read_size;
        offset += read_size;
        remain_size -= read_size;
    }

    OG_LOG_DEBUG_INF("[RESTORE] restore to buf size:%u", buf_size);
    return OG_SUCCESS;
}
/*
 * Read the header of the next log file from the backup stream into buf, verify it
 * and set up write_ctx->ctrl for the file that will receive the log.
 *
 * First sizeof(log_file_head_t) bytes are read and checksum-verified, then the rest
 * of the header region (up to the CM_CALC_ALIGN'ed size) is read behind them. The
 * target is chosen by the clustered or the single-node variant of set_logfile_ctrl.
 *
 * ignore_data: out, set by the set_logfile_ctrl call.
 * Returns OG_SUCCESS or OG_ERROR.
 */
static status_t rst_prepare_logfile(knl_session_t *session, char *buf, uint32 curr_file_index,
                                    bak_process_t *write_ctx, bool32 *ignore_data)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    log_file_head_t *head = (log_file_head_t *)buf;
    uint32 fill_size;
    status_t status;

    if (rst_write_to_buf(bak, write_ctx, (char *)head, sizeof(log_file_head_t)) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (log_verify_head_checksum(session, head, bak->local.name) != OG_SUCCESS) {
        return OG_ERROR;
    }

    fill_size = CM_CALC_ALIGN((uint32)head->block_size, sizeof(log_file_head_t)) - sizeof(log_file_head_t);
    if (rst_write_to_buf(bak, write_ctx, buf + sizeof(log_file_head_t), fill_size) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (session->kernel->attr.clustered) {
        status = dtc_bak_set_logfile_ctrl(session, curr_file_index, head, &write_ctx->ctrl, ignore_data);
    } else {
        status = rst_set_logfile_ctrl(session, curr_file_index, head, &write_ctx->ctrl, ignore_data);
    }
    return (status != OG_SUCCESS) ? OG_ERROR : OG_SUCCESS;
}
/*
 * Restore one log file of the backup set: prepare the target, stream the data into
 * it, close it, and - for an archive file during a RESTORE_ALL - record the archive
 * info (clustered or single-node variant).
 *
 * head: buffer that receives the log file header.
 * Returns OG_SUCCESS or OG_ERROR. The target handle is closed whether or not the
 * data was restored successfully.
 */
static status_t rst_restore_arch_log(knl_session_t *session, log_file_head_t *head, uint32 curr_file_index)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_process_t *write_ctx = &ogx->process[BAK_COMMON_PROC];
    bak_t *bak = &ogx->bak;
    bool32 ignore_data = OG_FALSE;
    status_t status;

    if (rst_prepare_logfile(session, (char *)head, curr_file_index, write_ctx, &ignore_data) != OG_SUCCESS) {
        return OG_ERROR;
    }

    status = rst_restore_logfile(session, head, ignore_data);
    cm_close_device(write_ctx->ctrl.type, &write_ctx->ctrl.handle);
    if (status != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak->files[curr_file_index].type != BACKUP_ARCH_FILE || bak->rst_file.file_type != RESTORE_ALL) {
        return OG_SUCCESS;
    }

    if (session->kernel->attr.clustered) {
        status = dtc_rst_arch_try_record_archinfo(session, ARCH_DEFAULT_DEST, write_ctx->ctrl.name, head,
                                                  bak->files[curr_file_index].inst_id);
    } else {
        status = arch_try_record_archinfo(session, ARCH_DEFAULT_DEST, write_ctx->ctrl.name, head);
    }
    return (status != OG_SUCCESS) ? OG_ERROR : OG_SUCCESS;
}
/*
 * Restore the log files of the backup set.
 *
 * Skipped for a full build that is already past the log stage. For RESTORE_ALL the
 * online log files are first updated (incremental build) or created (clustered or
 * single-node variant). bak->logfiles_created is then set, and log files are
 * restored one by one from the common process buffer until reading has finished or
 * the backup failed. When parallel tasks are enabled this loop only waits.
 *
 * last_log_index: out, index of the last backup file (parallel) or of the last file
 *                 handled here.
 * Returns OG_SUCCESS, or OG_ERROR on failure or when bak->failed is set.
 */
static status_t rst_restore_logfiles(knl_session_t *session, uint32 *last_log_index)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_process_t *write_ctx = &ogx->process[BAK_COMMON_PROC];
    bak_t *bak = &ogx->bak;
    log_file_head_t *head = (log_file_head_t *)session->kernel->redo_ctx.logwr_buf;
    uint32 file_index = bak->curr_file_index;
    status_t status;

    if (BAK_IS_FULL_BUILDING(bak) && bak_get_build_stage(&bak->progress.build_progress.stage) > BUILD_LOG_STAGE) {
        return OG_SUCCESS;
    }

    if (bak->rst_file.file_type == RESTORE_ALL) {
        if (bak->record.attr.level > 0 && bak->is_building) {
            status = rst_build_update_logfiles(session, write_ctx);
        } else if (session->kernel->attr.clustered) {
            status = dtc_rst_create_logfiles(session);
        } else {
            status = rst_create_logfiles(session);
        }
        if (status != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    bak->logfiles_created = OG_TRUE;
    OG_LOG_RUN_INF("[RESTORE] create logfiles completed");

    while (!bak->failed && bak->progress.stage != BACKUP_READ_FINISHED) {
        if (write_ctx->read_size != 0 && !bak_paral_task_enable(session)) {
            file_index = bak->curr_file_index;
            if (rst_restore_arch_log(session, head, file_index) != OG_SUCCESS) {
                return OG_ERROR;
            }
        } else {
            cm_sleep(1);
        }
    }

    *last_log_index = bak_paral_task_enable(session) ? (bak->file_count - 1) : file_index;

    if (bak->failed) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Load the control pages from the backup stream into kernel->db.ctrl.pages.
 *
 * Normal case: the pages are read directly into the ctrl page array.
 * Repair case (bak->record.is_repair): the pages are read into bak->ctrl_data_buf
 * first and only copied over after rst_db_files_not_changed() accepted them;
 * otherwise ERR_INVALID_OPERATION is raised.
 *
 * Returns OG_SUCCESS or OG_ERROR.
 */
static status_t rst_write_to_ctrl_buf(knl_session_t *session)
{
    knl_instance_t *kernel = session->kernel;
    bak_process_t *write_ctx = &kernel->backup_ctx.process[BAK_COMMON_PROC];
    bak_t *bak = &kernel->backup_ctx.bak;
    uint32 size = CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE;
    errno_t rc;

    if (!bak->record.is_repair) {
        if (rst_write_to_buf(bak, write_ctx, (char *)(&kernel->db.ctrl.pages[0]), size) != OG_SUCCESS) {
            return OG_ERROR;
        }
        return OG_SUCCESS;
    }

    if (rst_write_to_buf(bak, write_ctx, bak->ctrl_data_buf, size) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (!rst_db_files_not_changed(session, (ctrl_page_t *)bak->ctrl_data_buf)) {
        OG_LOG_RUN_ERR("[BUILD] primary's log/data amount/name has changed");
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ": primary's log/data amount/name has changed");
        return OG_ERROR;
    }
    OG_LOG_RUN_INF("[BUILD] check file amount/name successfully");

    rc = memcpy_sp((char *)(&kernel->db.ctrl.pages[0]), size, bak->ctrl_data_buf, size);
    knl_securec_check(rc);
    return OG_SUCCESS;
}
/*
 * Restore the control files and create the datafiles they describe.
 *
 * Skipped for a full build that is not on its first link. Waits for the
 * BACKUP_CTRL_STAGE, loads and verifies the control pages, rebuilds the ctrl items,
 * checks the system data version, resets the archive start/end positions, marks the
 * build as not completed, sets the database role (physical standby while building,
 * primary otherwise), writes the control files and finally creates the datafiles.
 * Only a datafile creation failure sets bak->failed here.
 *
 * Returns OG_SUCCESS or OG_ERROR.
 */
static status_t rst_restore_ctrlfiles(knl_session_t *session)
{
    knl_instance_t *kernel = session->kernel;
    bak_process_t *write_ctx = &kernel->backup_ctx.process[BAK_COMMON_PROC];
    bak_t *bak = &kernel->backup_ctx.bak;

    if (BAK_IS_FULL_BUILDING(bak) && !bak->is_first_link) {
        return OG_SUCCESS;
    }

    for (;;) {
        if (bak->progress.stage == BACKUP_CTRL_STAGE) {
            break;
        }
        if (bak->failed) {
            return OG_ERROR;
        }
        cm_sleep(1);
    }

    if (rst_write_to_ctrl_buf(session) != OG_SUCCESS) {
        return OG_ERROR;
    }
    db_load_core(&kernel->db);

    if (rst_verify_ctrlfile_checksum(session, bak->local.name) != OG_SUCCESS ||
        rst_restore_ctrlfile_items(session) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (!db_sysdata_version_is_equal(session, OG_FALSE)) {
        return OG_ERROR;
    }

    arch_set_arch_start(session, 0, kernel->id);
    arch_set_arch_end(session, 0, kernel->id);
    if (kernel->attr.clustered) {
        dtc_rst_arch_set_arch_start_and_end(session);
    }

    kernel->db.ctrl.core.build_completed = OG_FALSE;
    if (bak->is_building) {
        kernel->db.ctrl.core.db_role = REPL_ROLE_PHYSICAL_STANDBY;
    } else {
        kernel->db.ctrl.core.db_role = REPL_ROLE_PRIMARY;
    }

    if (rst_build_ctrl_file(session) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (rst_create_datafiles(session, write_ctx) != OG_SUCCESS) {
        bak->failed = OG_TRUE;
        return OG_ERROR;
    }

    bak->ctrlfile_completed = OG_TRUE;
    kernel->db.ctrl.core.bak_dbid = kernel->db.ctrl.core.dbid;
    kernel->db.ctrl.core.ddl_pitr_lsn = bak->ddl_pitr_lsn;
    return OG_SUCCESS;
}
/*
 * Return the byte offset of the first page in buf whose head has a non-zero
 * size_units, stepping page_size bytes at a time. When every inspected page is
 * empty, the returned offset is the first step position that is >= size.
 */
static uint32 rst_skip_empty_pages(char *buf, uint32 size, uint32 page_size)
{
    uint32 pos;

    for (pos = 0; pos < size; pos += page_size) {
        page_head_t *head = (page_head_t *)(buf + pos);
        if (head->size_units != 0) {
            break;
        }
    }
    return pos;
}
/*
 * Count how many pages at the start of buf form a run of consecutive page numbers
 * within the same file. The first page always counts, so the result is at least 1.
 */
static uint32 rst_calc_ordered_pages(char *buf, uint32 size, uint32 page_size)
{
    page_head_t *head = (page_head_t *)buf;
    page_id_t *id = AS_PAGID_PTR(head->id);
    uint16 run_file = id->file;
    uint32 expected_page = id->page + 1;
    uint32 run_len = 1;
    uint32 pos;

    for (pos = page_size; pos < size; pos += page_size) {
        head = (page_head_t *)(buf + pos);
        id = AS_PAGID_PTR(head->id);
        if (id->file != run_file || id->page != expected_page) {
            break;
        }
        expected_page = id->page + 1;
        run_len++;
    }
    return run_len;
}
/*
 * Fill the byte range [start, end) of a device with the contents of buf, writing at
 * most buf_size bytes per call.
 *
 * start: first offset to write; OG_INVALID_ID64 means "start at the current device size".
 * Returns OG_SUCCESS, or OG_ERROR when the device size cannot be obtained or a write fails.
 */
status_t rst_fill_file_gap(device_type_t type, int32 handle, int64 start, int64 end, const char *buf, uint32 buf_size)
{
    int64 pos = start;

    if ((uint64)pos == OG_INVALID_ID64) {
        pos = cm_device_size(type, handle);
        if (pos == -1) {
            OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
            return OG_ERROR;
        }
    }
    knl_panic(pos >= 0);

    while (pos < end) {
        uint32 chunk = (end - pos > buf_size) ? buf_size : (uint32)(end - pos);

        if (cm_write_device(type, handle, pos, buf, chunk) != OG_SUCCESS) {
            return OG_ERROR;
        }
        pos += chunk;
    }
    return OG_SUCCESS;
}
#ifndef WIN32
/*
 * Load the page group that starts at page `first` from the datafile and hand back
 * an uncompressed image of it.
 *
 * session/df/handle:  datafile to read from (df supplies the device type and name).
 * first:              page id of the first page of the group.
 * table_compress_ctx: supplies read_buf and unzip_buf, each one page group in size;
 *                     both are zeroed on entry.
 * group:              out, points at read_buf (group beyond end of file, empty first
 *                     page, or not compressed) or at unzip_buf (decompressed group).
 *
 * Returns OG_SUCCESS, or OG_ERROR when the file size cannot be obtained, the read
 * fails, or the group cannot be decompressed / reconstructed.
 */
static status_t rst_get_group_for_one_page(knl_session_t *session, datafile_t *df, int32 handle, page_id_t first,
                                           bak_table_compress_ctx_t *table_compress_ctx, char **group)
{
    char *read_buf = table_compress_ctx->read_buf.aligned_buf;
    char *unzip_buf = table_compress_ctx->unzip_buf.aligned_buf;
    uint32 group_bytes = DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT;
    int64 offset = (int64)first.page * DEFAULT_PAGE_SIZE(session);
    int64 file_size;
    errno_t ret;

    ret = memset_sp(read_buf, group_bytes, 0, group_bytes);
    knl_securec_check(ret);
    ret = memset_sp(unzip_buf, group_bytes, 0, group_bytes);
    knl_securec_check(ret);

    /*
     * Query the size once and reject the -1 error return. Without this check a failed
     * size query satisfies "offset >= size" and a zeroed group is returned as if the
     * page were simply past the end of the file.
     */
    file_size = cm_device_size(df->ctrl->type, handle);
    if (file_size == -1) {
        OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
        return OG_ERROR;
    }

    /* Group lies entirely beyond the current end of file: report it as all zeroes. */
    if (offset >= file_size) {
        *group = read_buf;
        return OG_SUCCESS;
    }

    knl_panic_log(offset + group_bytes <= file_size,
                  "offset: %lld is right with file size: %lld", offset, file_size);

    if (cm_read_device(df->ctrl->type, handle, offset, read_buf, group_bytes) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[BUFFER] failed to read datafile %s, offset %lld, size %u", df->ctrl->name, offset,
                       group_bytes);
        return OG_ERROR;
    }

    /* An empty or uncompressed first page means the group can be used as read. */
    if (((page_head_t *)read_buf)->size_units == 0 || !((page_head_t *)read_buf)->compressed) {
        *group = read_buf;
    } else {
        uint32 group_size;

        if (buf_decompress_group(session, unzip_buf, read_buf, &group_size) != OG_SUCCESS) {
            return OG_ERROR;
        }
        knl_panic_log(group_size == group_bytes, "group size %u is not correct", group_size);
        if (bak_construct_decompress_group(session, unzip_buf) != OG_SUCCESS) {
            return OG_ERROR;
        }
        *group = unzip_buf;
    }
    return OG_SUCCESS;
}
/*
 * Copy target_page, plus the pages directly following it in memory that belong to
 * the same compress group, into the group image at their position within the group.
 *
 * first:        page id of the first page of the group.
 * target_page:  first page to copy; following pages are read from the memory behind it.
 * remain_input: how many following pages may be inspected.
 * group:        pointer to the group image that is updated in place.
 *
 * Returns the number of pages copied (at least 1).
 */
static uint32 rst_update_group(knl_session_t *session, page_id_t first, page_head_t *target_page, uint32 remain_input,
                               char **group)
{
    uint32 slot_in_group = AS_PAGID_PTR(target_page->id)->page - first.page;
    uint32 run_len = 1;
    uint32 left;

    knl_panic_log(AS_PAGID_PTR(target_page->id)->page >= first.page, "page %u-%u pos should larger than %u-%u",
                  AS_PAGID_PTR(target_page->id)->file, AS_PAGID_PTR(target_page->id)->page, first.file, first.page);

    for (left = remain_input; left > 0; left--) {
        page_head_t *candidate = (page_head_t *)((char *)target_page + run_len * DEFAULT_PAGE_SIZE(session));
        page_id_t candidate_first;

        knl_panic_log(AS_PAGID_PTR(candidate->id)->page > first.page, "page %u-%u pos should larger than %u-%u",
                      AS_PAGID_PTR(candidate->id)->file, AS_PAGID_PTR(candidate->id)->page, first.file, first.page);

        /* Stop at the first page that belongs to a different compress group. */
        candidate_first = bak_first_compress_group_id(session, *AS_PAGID_PTR(candidate->id));
        if (!IS_SAME_PAGID(candidate_first, first)) {
            break;
        }
        run_len++;
    }

    errno_t ret = memcpy_sp(*group + slot_in_group * DEFAULT_PAGE_SIZE(session),
                            run_len * DEFAULT_PAGE_SIZE(session),
                            (char *)target_page, run_len * DEFAULT_PAGE_SIZE(session));
    knl_securec_check(ret);
    OG_LOG_DEBUG_INF("[RESTORE] update pages start pos %u, num: %u", slot_in_group, run_len);
    return run_len;
}
/*
 * Punch hole_num consecutive page-sized holes into the file, starting at
 * first_hole_offset. hole_num must be greater than zero.
 *
 * Returns OG_SUCCESS, or OG_ERROR as soon as one punch fails.
 */
static status_t rst_punch_hole_in_group(knl_session_t *session, datafile_t *df, int32 handle, int64 first_hole_offset,
                                        uint32 hole_num)
{
    uint32 done = 0;

    knl_panic_log(hole_num > 0, "hole number %u should larger than zero", hole_num);

    while (done < hole_num) {
        if (cm_file_punch_hole(handle, first_hole_offset + done * DEFAULT_PAGE_SIZE(session),
                               DEFAULT_PAGE_SIZE(session)) != OG_SUCCESS) {
            return OG_ERROR;
        }
        done++;
    }
    return OG_SUCCESS;
}
/*
 * Write a whole page group image to the datafile as it is, at the file offset given
 * by the page number stored in the group's first page.
 *
 * Returns OG_SUCCESS, or OG_ERROR after logging a failed write.
 */
static status_t rst_write_group_directly(knl_session_t *session, datafile_t *df, int32 handle, char *group)
{
    page_head_t *first_page = (page_head_t *)group;
    page_id_t first_id = *AS_PAGID_PTR(first_page->id);
    int64 file_offset = (int64)(first_id.page) * DEFAULT_PAGE_SIZE(session);

    if (cm_write_device(df->ctrl->type, handle, file_offset, group,
                        DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT) == OG_SUCCESS) {
        OG_LOG_DEBUG_INF("[RESTORE] write de-compressed group start:%u-%u,type:%u", first_id.file, first_id.page,
                         (uint32)first_page->type);
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
    return OG_ERROR;
}
/*
 * Turn the unused tail of a page group into a hole.
 *
 * `page` is the last page of the group that was written and `slot` the number of
 * pages written so far. The PAGE_GROUP_COUNT - slot pages following `page` are
 * first filled through rst_fill_file_gap() with ogx->fill_buf and then punched
 * page by page.
 */
static status_t rst_write_hole(knl_session_t *session, bak_process_t *ogx, datafile_t *df, int32 handle,
                               page_head_t *page, uint32 slot)
{
    page_id_t last_id = *AS_PAGID_PTR(page->id);
    uint32 hole_num = PAGE_GROUP_COUNT - slot;
    /* the hole begins right behind the last written page */
    int64 hole_begin = (int64)last_id.page * DEFAULT_PAGE_SIZE(session) + DEFAULT_PAGE_SIZE(session);
    int64 hole_end = hole_begin + hole_num * DEFAULT_PAGE_SIZE(session);

    status_t status = rst_fill_file_gap(df->ctrl->type, handle, hole_begin, hole_end, ogx->fill_buf,
                                        BACKUP_BUFFER_SIZE(&session->kernel->backup_ctx.bak));
    if (status != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
        return OG_ERROR;
    }

    status = rst_punch_hole_in_group(session, df, handle, hole_begin, hole_num);
    if (status != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to punch datafile %s", df->ctrl->name);
        return OG_ERROR;
    }

    OG_LOG_DEBUG_INF("[RESTORE] punch holes start page_id %u, total page num: %u", last_id.page + 1, hole_num);
    return OG_SUCCESS;
}
/*
 * Compress one page group with ZSTD and write it back to the datafile.
 *
 * The compressed stream is cut into chunks of at most COMPRESS_PAGE_VALID_SIZE bytes.
 * Chunk N is copied into the last COMPRESS_PAGE_VALID_SIZE bytes of page N of `group`,
 * the page gets its compress head filled in, is checksummed and written at the offset
 * given by its own page id. Pages of the group that receive no chunk are turned into
 * a hole by rst_write_hole().
 *
 * If the compressed stream does not fit into the payload area of the whole group
 * (COMPRESS_PAGE_VALID_SIZE * PAGE_GROUP_COUNT bytes), the group is written
 * uncompressed through rst_write_group_directly().
 *
 * Returns OG_SUCCESS, or OG_ERROR when a device write / hole punch fails.
 */
static status_t rst_write_group(knl_session_t *session, bak_process_t *ogx, datafile_t *df, int32 handle,
                                bak_table_compress_ctx_t *compress_ctx, char *group)
{
    char *zip_buf = compress_ctx->zip_buf.aligned_buf;
    uint32 zsize = COMPRESS_PAGE_VALID_SIZE(session);
    /* only zsize bytes of every page carry compressed payload */
    uint32 group_capacity = zsize * PAGE_GROUP_COUNT;
    uint32 compressed_size;
    uint32 remaining_size;
    uint32 actual_size;
    errno_t ret;

    compressed_size = ZSTD_compress((char *)zip_buf, DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT, group,
                                    DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT, ZSTD_DEFAULT_COMPRESS_LEVEL);
    /*
     * NOTE(review): ZSTD_compress returns size_t while compressed_size is 32 bit; confirm whether
     * an error code is still recognised by ZSTD_isError after narrowing. A narrowed error value
     * is in any case larger than group_capacity and ends up in the uncompressed path below.
     */
    if (ZSTD_isError(compressed_size)) {
        knl_panic(0);
    }

    /*
     * Guard against the real capacity of the group, not against its raw size: a stream larger
     * than group_capacity would make the chunk loop below step past the last page of `group`.
     */
    if (compressed_size > group_capacity) {
        if (rst_write_group_directly(session, df, handle, group) != OG_SUCCESS) {
            return OG_ERROR;
        }
        return OG_SUCCESS;
    }

    remaining_size = compressed_size;
    knl_panic_log(compressed_size <= group_capacity, "size %u is not correct", compressed_size);

    uint32 slot = 0;
    page_head_t *page = NULL;
    uint32 offset_in_zip = 0;
    do {
        if (remaining_size > zsize) {
            actual_size = zsize;
        } else {
            actual_size = remaining_size;
        }

        page = (page_head_t *)(group + slot * DEFAULT_PAGE_SIZE(session));
        /* the chunk overwrites the tail of the page, the page head in front of it is kept */
        ret = memcpy_sp((char *)page + DEFAULT_PAGE_SIZE(session) - zsize, actual_size,
                        zip_buf + offset_in_zip, actual_size);
        knl_securec_check(ret);

        COMPRESS_PAGE_HEAD(page)->compressed_size = compressed_size;
        COMPRESS_PAGE_HEAD(page)->compress_algo = COMPRESS_ZSTD;
        COMPRESS_PAGE_HEAD(page)->group_cnt = GROUP_COUNT_8;
        COMPRESS_PAGE_HEAD(page)->unused = 0;
        page->compressed = 1;
        dbwr_compress_checksum(session, page);

        if (cm_write_device(df->ctrl->type, handle, (int64)(AS_PAGID_PTR(page->id)->page) * DEFAULT_PAGE_SIZE(session),
                            page, DEFAULT_PAGE_SIZE(session)) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
            return OG_ERROR;
        }

        page_id_t page_id = *AS_PAGID_PTR(page->id);
        OG_LOG_DEBUG_INF("[RESTORE] write compress page:%u-%u,type:%u", page_id.file, page_id.page, (uint32)page->type);

        remaining_size -= actual_size;
        offset_in_zip += actual_size;
        slot++;
    } while (remaining_size != 0);

    /* every page of the group is in use, nothing left to punch */
    if (slot == PAGE_GROUP_COUNT) {
        return OG_SUCCESS;
    }

    if (rst_write_hole(session, ogx, df, handle, page, slot) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Apply a run of pages to a table-compressed datafile.
 *
 * Pages whose page number is below DF_MAP_HWM_START are written one by one as they are.
 * Every other page is merged into its compress group: the group is fetched with
 * rst_get_group_for_one_page(), rst_update_group() copies the page (and the following
 * pages it assigns to the same group) into it, and the group is written back with
 * rst_write_group().
 */
static status_t rst_incr_write_compress_pages(knl_session_t *session, bak_process_t *ogx, page_head_t *page,
                                              rst_assist_t rst_assist)
{
    bak_table_compress_ctx_t *compress_ctx = &ogx->table_compress_ctx;
    datafile_t *df = rst_assist.datafile;
    uint32 total = rst_assist.page_count;
    int32 handle = ogx->datafiles[rst_assist.page_id.file];

    for (uint32 pos = 0; pos < total;) {
        page_head_t *cur = (page_head_t *)((char *)page + pos * DEFAULT_PAGE_SIZE(session));
        page_id_t cur_id = *AS_PAGID_PTR(cur->id);

        if (cur_id.page < DF_MAP_HWM_START) {
            status_t status = cm_write_device(df->ctrl->type, handle,
                                              (int64)(cur_id.page) * DEFAULT_PAGE_SIZE(session),
                                              cur, DEFAULT_PAGE_SIZE(session));
            if (status != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
                return OG_ERROR;
            }
            OG_LOG_DEBUG_INF("[RESTORE] write less 128's page:%u-%u,type:%u",
                             cur_id.file, cur_id.page, (uint32)cur->type);
            pos++;
            continue;
        }

        page_id_t first = bak_first_compress_group_id(session, *AS_PAGID_PTR(cur->id));
        knl_panic_log(cur->size_units != 0, "zero page not allowed");

        char *group = NULL;
        if (rst_get_group_for_one_page(session, df, handle, first, compress_ctx, &group) != OG_SUCCESS) {
            return OG_ERROR;
        }

        /* total - pos - 1: pages left behind the current one */
        uint32 merged = rst_update_group(session, first, cur, total - pos - 1, &group);
        knl_panic_log(merged >= 1 && merged <= PAGE_GROUP_COUNT, "page number %u is not correct",
                      merged);
        pos += merged;

        if (rst_write_group(session, ogx, df, handle, compress_ctx, group) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}
/*
 * Apply a run of pages to a punched datafile, one page at a time.
 * A page of type PAGE_TYPE_PUNCH_PAGE is not written: its range is filled through
 * rst_fill_file_gap() and then punched. Any other page is written at the offset
 * given by its page id.
 */
static status_t rst_incr_write_punch_datafile(knl_session_t *session, bak_process_t *ogx, page_head_t *page,
                                              rst_assist_t rst_assist)
{
    datafile_t *df = rst_assist.datafile;
    uint32 total = rst_assist.page_count;
    int32 handle = ogx->datafiles[rst_assist.page_id.file];

    for (uint32 idx = 0; idx < total; idx++) {
        page_head_t *cur = (page_head_t *)((char *)page + idx * DEFAULT_PAGE_SIZE(session));
        page_id_t cur_id = *AS_PAGID_PTR(cur->id);
        int64 file_pos = (int64)cur_id.page * DEFAULT_PAGE_SIZE(session);

        if (cur->type != PAGE_TYPE_PUNCH_PAGE) {
            if (cm_write_device(df->ctrl->type, handle, file_pos, cur,
                                DEFAULT_PAGE_SIZE(session)) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
                return OG_ERROR;
            }
            continue;
        }

        if (rst_fill_file_gap(df->ctrl->type, handle, file_pos, file_pos + DEFAULT_PAGE_SIZE(session),
                              ogx->fill_buf, BACKUP_BUFFER_SIZE(&session->kernel->backup_ctx.bak)) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
            return OG_ERROR;
        }
        if (cm_file_punch_hole(handle, file_pos, DEFAULT_PAGE_SIZE(session)) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to punch datafile %s", df->ctrl->name);
            return OG_ERROR;
        }
        OG_LOG_DEBUG_INF("[RESTORE] punch datafile %s page_id %u-%u",
                         df->ctrl->name, cur_id.file, cur_id.page);
    }

    return OG_SUCCESS;
}
/*
 * Write a run of consecutive pages to the datafile, choosing the write path:
 *  - backup level 1 on a compressed datafile -> rst_incr_write_compress_pages()
 *    (a datafile that is both compressed and punched is rejected);
 *  - backup level 1 on a punched datafile    -> rst_incr_write_punch_datafile();
 *  - everything else                         -> one plain write at rst_assist.file_offset.
 */
static status_t rst_save_continuous_pages(knl_session_t *session, bak_process_t *ogx, page_head_t *page,
                                          rst_assist_t rst_assist)
{
    datafile_t *df = rst_assist.datafile;
    bak_attr_t *attr = &session->kernel->backup_ctx.bak.record.attr;
    bool32 is_level_one = (attr->level == 1);

    if (is_level_one && DATAFILE_IS_COMPRESS(df)) {
        if (DATAFILE_IS_PUNCHED(df)) {
            OG_LOG_RUN_ERR("[RESTORE] compress datafile %s is not expected to be a punched datafile", df->ctrl->name);
            return OG_ERROR;
        }
        if (rst_incr_write_compress_pages(session, ogx, page, rst_assist) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to write compress datafile %s", df->ctrl->name);
            return OG_ERROR;
        }
        OG_LOG_DEBUG_INF("[RESTORE] inc rst, compress datafile. start page:%u-%u,type:%u,count:%u write successfully",
                         rst_assist.page_id.file, rst_assist.page_id.page, (uint32)page->type, rst_assist.page_count);
        return OG_SUCCESS;
    }

    if (is_level_one && DATAFILE_IS_PUNCHED(df)) {
        if (rst_incr_write_punch_datafile(session, ogx, page, rst_assist) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] failed to write punched datafile %s", df->ctrl->name);
            return OG_ERROR;
        }
        OG_LOG_DEBUG_INF("[RESTORE] inc rst, punched datafile. start page:%u-%u,type:%u,count:%u write successfully",
                         rst_assist.page_id.file, rst_assist.page_id.page, (uint32)page->type, rst_assist.page_count);
        return OG_SUCCESS;
    }

    /* plain path: the whole run goes out in a single write */
    int32 handle = ogx->datafiles[rst_assist.page_id.file];
    status_t status = cm_write_device(df->ctrl->type, handle, rst_assist.file_offset, page,
                                      DEFAULT_PAGE_SIZE(session) * rst_assist.page_count);
    if (status != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
#endif
/*
 * Write one run of consecutive pages (described by rst_assist) into its datafile.
 *
 * Steps, in order: open the datafile, fill the gap in front of the run (unless the
 * file is handled by the punch path), write the pages, fdatasync, then update the
 * write statistics, the process fill offset and - during a full build - the track file.
 *
 * Returns OG_SUCCESS, or OG_ERROR when opening, filling, writing, syncing or the
 * track file update fails.
 */
static status_t rst_save_pages(knl_session_t *session, bak_process_t *ogx, page_head_t *page, rst_assist_t rst_assist)
{
const char *fill_buf = ogx->fill_buf;
page_id_t page_id = rst_assist.page_id;
uint32 page_count = rst_assist.page_count;
datafile_t *df = rst_assist.datafile;
int32 handle = INVALID_FILE_HANDLE;
int64 offset = rst_assist.file_offset;
uint64 fill_start = OG_INVALID_ID64;
bak_context_t *bak_ctx = &session->kernel->backup_ctx;
bak_stat_t *stat = &bak_ctx->bak.stat;
bak_t *bak = &session->kernel->backup_ctx.bak;
build_progress_t *build_progress = &bak->progress.build_progress;
/* with parallel tasks and a bounded section, gap filling starts at the process fill offset;
 * otherwise fill_start stays OG_INVALID_ID64 */
if (bak_paral_task_enable(session) && ogx->assign_ctrl.section_end != 0) {
fill_start = ogx->assign_ctrl.fill_offset;
}
/* page 0 of file 0 of a non-compressed datafile is never written here */
if (page_id.file == 0 && page_id.page == 0 && !DATAFILE_IS_COMPRESS(df)) {
knl_panic_log(page_count == 1, "the page_count is abnormal, panic info: page %u-%u type %u, page_count %u",
page_id.file, page_id.page, page->type, page_count);
return OG_SUCCESS;
}
brain_repair_filer_page_from_remote(session, page, page_count);
if (spc_open_datafile(session, df, &ogx->datafiles[page_id.file]) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to open datafile %s", df->ctrl->name);
return OG_ERROR;
}
/* remember the device type next to the handle of this file */
ogx->file_type[page_id.file] = df->ctrl->type;
handle = ogx->datafiles[page_id.file];
/* fill the range between fill_start and the first page of the run; skipped for files that need punching */
if (!BAK_FILE_NEED_PUNCH(df) && rst_fill_file_gap(df->ctrl->type, handle, (int64)fill_start,
(int64)page_id.page * DEFAULT_PAGE_SIZE(session), fill_buf, BACKUP_BUFFER_SIZE(bak)) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
return OG_ERROR;
}
#ifdef WIN32
/* WIN32 build: always a single plain write of the whole run */
if (cm_write_device(df->ctrl->type, handle, offset, page, DEFAULT_PAGE_SIZE(session) * page_count) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
return OG_ERROR;
}
#else
/* other builds: rst_save_continuous_pages picks the compress / punch / plain write path */
if (rst_save_continuous_pages(session, ogx, page, rst_assist) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
return OG_ERROR;
}
#endif
if (db_fdatasync_file(session, handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to fdatasync datafile %s", df->ctrl->name);
return OG_ERROR;
}
(void)cm_atomic_inc(&stat->writes);
/* fill_offset only moves forward: advance it to the end of the run just written */
if (ogx->assign_ctrl.fill_offset < (uint64)offset + DEFAULT_PAGE_SIZE(session) * page_count) {
ogx->assign_ctrl.fill_offset = (uint64)offset + DEFAULT_PAGE_SIZE(session) * page_count;
}
/* full build: record how far this file has been written and update the track file */
if (BAK_IS_FULL_BUILDING(bak)) {
build_progress->stage = bak->progress.stage;
build_progress->file_id = df->ctrl->id;
build_progress->data_offset = (uint64)offset + DEFAULT_PAGE_SIZE(session) * page_count;
build_progress->curr_file_index = bak->curr_file_index;
if (rst_update_track_file(session, bak, OG_FALSE) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to update track file");
return OG_ERROR;
}
}
/* sanity check: the written range must lie inside [section_start, section_end] of this process */
if (bak_paral_task_enable(session) && ogx->assign_ctrl.section_end != 0) {
knl_panic_log((uint64)offset + DEFAULT_PAGE_SIZE(session) * page_count <= ogx->assign_ctrl.section_end,
"panic info: page %u-%u type %u", page_id.file, page_id.page, page->type);
knl_panic_log((uint64)offset >= ogx->assign_ctrl.section_start, "panic info: page %u-%u type %u",
page_id.file, page_id.page, page->type);
}
return OG_SUCCESS;
}
/*
 * Compute the datafile offset at which a run of empty pages found at `data_offset`
 * of the current read buffer starts.
 *
 *  - UDS device:                 curr_offset + data_offset
 *  - building, single thread:    current size of the datafile device
 *  - building, otherwise:        curr_offset + data_offset
 *  - restore: curr_offset (uncompressed_offset for a compressed backup set)
 *             + data_offset, plus one page when section_start is 0 and plus
 *             section_start otherwise.
 */
static uint64 rst_get_punch_start(knl_session_t *session, bak_process_t *ogx, uint32 file_id, uint32 data_offset)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;

    if (BAK_IS_UDS_DEVICE(bak)) {
        return ogx->curr_offset + data_offset;
    }

    if (bak->is_building) {
        if (bak->proc_count == BUILD_SINGLE_THREAD) {
            return cm_device_size(cm_device_type(ogx->datafile_name[file_id]), ogx->datafiles[file_id]);
        }
        return ogx->curr_offset + data_offset;
    }

    bool32 whole_file = (ogx->assign_ctrl.section_start == 0);

    if (bak->record.attr.compress == COMPRESS_NONE) {
        if (whole_file) {
            return ogx->curr_offset + data_offset + DEFAULT_PAGE_SIZE(session);
        }
        return ogx->curr_offset + data_offset + ogx->assign_ctrl.section_start;
    }

    if (whole_file) {
        return ogx->uncompressed_offset + data_offset + DEFAULT_PAGE_SIZE(session);
    }
    return ogx->uncompressed_offset + data_offset + ogx->assign_ctrl.section_start;
}
/*
 * Punch a run of empty pages into the datafile that is currently being restored.
 *
 * data_offset:        position of the run inside the current read buffer.
 * zero_pages_offsets: length of the run in bytes.
 *
 * Nothing is done when the run is empty or the datafile does not need punching.
 * Otherwise the range is first filled through rst_fill_file_gap() and then punched.
 * Returns OG_ERROR when the start offset is not page aligned or when opening,
 * filling or punching the datafile fails.
 */
static status_t rst_try_punch_empty_pages(knl_session_t *session, bak_process_t *ogx,
                                          uint32 data_offset, uint32 zero_pages_offsets)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    const char *fill_buf = ogx->fill_buf;
    /* parallel tasks take the file from the assignment of this process, otherwise the current file is used */
    uint32 file_id = bak_paral_task_enable(session) ?
        bak->files[ogx->assign_ctrl.bak_index].id : bak->files[bak->curr_file_index].id;
    datafile_t *df = DATAFILE_GET(session, file_id);

    if (zero_pages_offsets == 0 || !BAK_FILE_NEED_PUNCH(df)) {
        return OG_SUCCESS;
    }

    uint64 punch_start = rst_get_punch_start(session, ogx, file_id, data_offset);
    uint64 punch_end = punch_start + zero_pages_offsets;
    if (punch_start % DEFAULT_PAGE_SIZE(session) != 0) {
        OG_LOG_RUN_ERR("[RESTORE] datafile %s punch offset %llu is not an integral multiple of page size",
                       df->ctrl->name, punch_start);
        return OG_ERROR;
    }

    /* divide the 64-bit offset first and narrow afterwards, so offsets beyond 4 GiB log the right page id */
    OG_LOG_DEBUG_INF("[RESTORE] punch datafile %s start page id %u-%u, total page num: %u",
                     df->ctrl->name, file_id, (uint32)(punch_start / DEFAULT_PAGE_SIZE(session)),
                     zero_pages_offsets / DEFAULT_PAGE_SIZE(session));

    if (spc_open_datafile(session, df, &ogx->datafiles[file_id]) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to open datafile %s", df->ctrl->name);
        return OG_ERROR;
    }
    ogx->file_type[file_id] = df->ctrl->type;

    if (rst_fill_file_gap(df->ctrl->type, ogx->datafiles[file_id], (int64)punch_start,
                          (int64)punch_end, fill_buf, BACKUP_BUFFER_SIZE(bak)) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to write datafile %s", df->ctrl->name);
        return OG_ERROR;
    }

    if (cm_file_punch_hole(ogx->datafiles[file_id], punch_start, punch_end - punch_start) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[RESTORE] failed to punch datafile %s start page_id %u-%u, total page num: %u",
                       df->ctrl->name, file_id, (uint32)(punch_start / DEFAULT_PAGE_SIZE(session)),
                       zero_pages_offsets / DEFAULT_PAGE_SIZE(session));
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Full-build variant of waiting for the data stage.
 *
 * *skip_write is set to OG_TRUE when the recorded build stage is already past
 * BUILD_DATA_STAGE, or when the backup progress moves past BACKUP_DATA_STAGE while
 * waiting. Otherwise, once the wait is over, the build progress is initialised for
 * the data stage and the track file is updated; a failed update marks the backup
 * as failed.
 */
static void rst_build_wait_write_datafile(knl_session_t *session, bak_t *bak, bool32 *skip_write)
{
    build_progress_t *build = &bak->progress.build_progress;

    if (bak_get_build_stage(&build->stage) > BUILD_DATA_STAGE) {
        *skip_write = OG_TRUE;
        return;
    }

    while (!bak->failed && bak->progress.stage != BACKUP_DATA_STAGE) {
        if (bak->progress.stage >= BACKUP_DATA_STAGE) {
            /* the data stage is already over */
            *skip_write = OG_TRUE;
            return;
        }
        cm_sleep(1);
    }

    build->stage = bak->progress.stage;
    build->curr_file_index = bak->curr_file_index;
    build->data_offset = DEFAULT_PAGE_SIZE(session);

    if (rst_update_track_file(session, bak, bak->is_first_link) != OG_SUCCESS) {
        bak->failed = OG_TRUE;
    }
}
/* Poll until the backup progress reaches BACKUP_DATA_STAGE or the backup is marked as failed. */
static void rst_wait_write_datafile(bak_t *bak)
{
    for (;;) {
        if (bak->failed || bak->progress.stage == BACKUP_DATA_STAGE) {
            return;
        }
        cm_sleep(1);
    }
}
/*
 * Restore the pages held in `buf` (ogx->read_size bytes) into their datafiles.
 *
 * The buffer is checksum-verified first. It is then consumed run by run: for a
 * level 0 backup leading empty pages are skipped (and punched where required);
 * the following run of ordered pages is handed to rst_save_pages() once the target
 * file is flagged as extended in bak->extended[].
 *
 * Returns OG_ERROR when verification, punching or saving fails, or when the backup
 * has been marked as failed; OG_SUCCESS otherwise.
 */
status_t rst_restore_datafile(knl_session_t *session, bak_t *bak, bak_process_t *ogx, char *buf, const char *filename)
{
    bak_attr_t *attr = &bak->record.attr;
    rst_assist_t rst_assist;
    uint32 data_offset = 0;

    rst_assist.page_count = 1;

    if (rst_verify_datafile_checksum(session, ogx, buf, (uint32)ogx->read_size / DEFAULT_PAGE_SIZE(session),
                                     filename) != OG_SUCCESS) {
        return OG_ERROR;
    }

    while (!bak->failed && data_offset < (uint32)ogx->read_size) {
        if (attr->level == 0) {
            uint32 zero_bytes = rst_skip_empty_pages(buf + data_offset, (uint32)ogx->read_size - data_offset,
                                                     DEFAULT_PAGE_SIZE(session));
            if (rst_try_punch_empty_pages(session, ogx, data_offset, zero_bytes) != OG_SUCCESS) {
                return OG_ERROR;
            }
            data_offset += zero_bytes;
            if (data_offset >= (uint32)ogx->read_size) {
                /* the rest of the buffer was empty */
                break;
            }
        }

        char *cursor = buf + data_offset;
        rst_assist.page_id = *(page_id_t *)cursor;
        rst_assist.datafile = DATAFILE_GET(session, rst_assist.page_id.file);
        rst_assist.file_offset = (int64)rst_assist.page_id.page * DEFAULT_PAGE_SIZE(session);
        rst_assist.page_count = rst_calc_ordered_pages(cursor, (uint32)ogx->read_size - data_offset,
                                                       DEFAULT_PAGE_SIZE(session));

        /* wait until the target file is marked as extended */
        while (bak->extended[rst_assist.page_id.file] == 0) {
            OG_LOG_RUN_WAR("[RESTORE] page_file %u has not been extend.", rst_assist.page_id.file);
            cm_sleep(BAK_WAIT_FILE_EXTEND_MS);
        }

        if (rst_save_pages(session, ogx, (page_head_t *)cursor, rst_assist) != OG_SUCCESS) {
            return OG_ERROR;
        }
        data_offset += DEFAULT_PAGE_SIZE(session) * rst_assist.page_count;
    }

    if (bak->failed) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Writer loop of the data stage.
 *
 * After waiting for the data stage (or learning that it can be skipped), the loop
 * consumes write_ctx->backup_buf whenever read_size is non-zero and parallel tasks
 * are not enabled: the page-aligned part is restored, a trailing partial page is
 * moved to the front of the buffer (its length is kept in left_size), and
 * read_size is reset to 0.
 */
static status_t rst_restore_datafiles(knl_session_t *session, bak_context_t *ogx, bak_process_t *write_ctx)
{
    bak_t *bak = &ogx->bak;
    bool32 skip_write = OG_FALSE;

    if (BAK_IS_FULL_BUILDING(bak)) {
        rst_build_wait_write_datafile(session, bak, &skip_write);
    } else {
        rst_wait_write_datafile(bak);
    }
    if (skip_write) {
        return OG_SUCCESS;
    }

    while (!bak->failed && bak->progress.stage == BACKUP_DATA_STAGE) {
        if (write_ctx->read_size == 0 || bak_paral_task_enable(session)) {
            cm_sleep(1);
            continue;
        }

        /* split the buffer into whole pages and a trailing partial page */
        write_ctx->left_size = write_ctx->read_size % (int32)DEFAULT_PAGE_SIZE(session);
        write_ctx->read_size -= write_ctx->left_size;
        int32 tail_pos = write_ctx->read_size;

        if (rst_restore_datafile(session, bak, write_ctx, write_ctx->backup_buf.aligned_buf,
                                 bak->local.name) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (write_ctx->left_size > 0) {
            errno_t ret = memmove_s(write_ctx->backup_buf.aligned_buf, BACKUP_BUFFER_SIZE(bak),
                                    write_ctx->backup_buf.aligned_buf + tail_pos, write_ctx->left_size);
            knl_securec_check(ret);
        }
        write_ctx->read_size = 0;
    }

    if (bak->failed) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Try to register archive logs starting with the asn following *last_arvhied_asn.
 * On success *last_arvhied_asn is set to the asn reported back by
 * arch_try_regist_archive() minus one.
 */
static status_t rst_regist_archive(knl_session_t *session, uint32 *last_arvhied_asn)
{
    uint32 next_asn = *last_arvhied_asn + 1;
    uint32 rst_id = session->kernel->db.ctrl.core.resetlogs.rst_id;

    status_t status = arch_try_regist_archive(session, rst_id, &next_asn);
    if (status != OG_SUCCESS) {
        return OG_ERROR;
    }

    *last_arvhied_asn = next_asn - 1;
    return OG_SUCCESS;
}
/*
 * Register a chain of archive logs, starting with the one described by
 * (rst_id, *asn, inst_id, start_lsn, end_lsn).
 *
 * The first file name is built from the given lsn range; every further one is
 * looked up with arch_find_archive_log_name() using start_lsn = previous end_lsn + 1.
 * The walk ends with OG_SUCCESS when a lookup fails or a file does not exist, and
 * with OG_ERROR when a registration fails. *asn is updated after every registered file.
 */
status_t arch_try_regist_archive_by_dbstor(knl_session_t *session, uint32 *asn, uint32 rst_id,
                                           uint64 start_lsn, uint64 end_lsn, uint32 inst_id)
{
    char file_name[OG_FILE_NAME_BUFFER_SIZE] = { 0 };
    arch_file_name_info_t file_name_info = {rst_id, *asn, inst_id, OG_FILE_NAME_BUFFER_SIZE,
                                            start_lsn, end_lsn, file_name};

    arch_set_archive_log_name_with_lsn(session, ARCH_DEFAULT_DEST, &file_name_info);

    for (;;) {
        /* the next lookup continues right behind the current file */
        file_name_info.start_lsn = file_name_info.end_lsn + 1;

        if (!cm_exist_device(arch_get_device_type(file_name), file_name)) {
            return OG_SUCCESS;
        }
        if (arch_regist_archive(session, file_name) != OG_SUCCESS) {
            return OG_ERROR;
        }
        *asn = file_name_info.asn;

        if (arch_find_archive_log_name(session, &file_name_info) != OG_SUCCESS) {
            return OG_SUCCESS;
        }
    }
}
/*
 * Register archive logs for a dbstor backup set.
 *
 * Only the last file of the backup set is looked at: when it is an archive file,
 * registration starts from its rst_id / lsn range / instance id. Nothing is done
 * when the backup set is empty or its last file is not an archive file.
 *
 * Returns OG_ERROR when the registration fails, OG_SUCCESS otherwise.
 */
static status_t rst_regist_archive_by_dbstor(knl_session_t *session, uint32 *last_archived_asn)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    int32 file_index = bak->file_count - 1;

    if (file_index < 0) {
        return OG_SUCCESS;
    }
    if (bak->files[file_index].type != BACKUP_ARCH_FILE) {
        return OG_SUCCESS;
    }

    /* fail only when the registration did not succeed (the check used to be inverted) */
    if (arch_try_regist_archive_by_dbstor(session, last_archived_asn, bak->files[file_index].rst_id,
                                          bak->files[file_index].start_lsn, bak->files[file_index].end_lsn,
                                          bak->files[file_index].inst_id) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * While building, wait until the backup set head has been built.
 * Returns immediately with OG_SUCCESS when no build is running. Returns OG_ERROR
 * when the session status check fails or the backup is marked as failed.
 */
static status_t rst_build_wait_head(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;

    if (!bak->is_building) {
        return OG_SUCCESS;
    }

    for (;;) {
        if (bak->failed || bak->head_is_built) {
            break;
        }
        if (bak_check_session_status(session) != OG_SUCCESS) {
            return OG_ERROR;
        }
        cm_sleep(1);
    }

    if (bak->failed) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Derive the last archived asn and the first log file id from backup file `file_index`.
 *
 * Archive file: *asn is the lrp point asn; *file_id is only written (with
 *               bak->log_first_slot) when raft is enabled.
 * Other file:   *asn is the lrp point asn minus one and *file_id the id of the file.
 */
static void get_asn_and_file_id(knl_session_t *session, uint32 file_index, uint32* asn, uint32* file_id)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_ctrlinfo_t *ctrlinfo = &bak->record.ctrlinfo;

    if (bak->files[file_index].type != BACKUP_ARCH_FILE) {
        *asn = ctrlinfo->lrp_point.asn - 1;
        *file_id = bak->files[file_index].id;
        return;
    }

    *asn = ctrlinfo->lrp_point.asn;
    if (DB_IS_RAFT_ENABLED(session->kernel)) {
        *file_id = bak->log_first_slot;
        OG_LOG_RUN_INF("[RESTORE] raft log fisrt is %u", *file_id);
    }
}
/*
 * Add the sizes of the files taking part in the restore to the backup progress.
 *
 * Every used, online datafile is counted; when a single datafile is restored
 * (RESTORE_DATAFILE) only that file is counted and log files are left out.
 * Otherwise every log file that is not dropped is counted as well.
 */
void rst_update_process_data_size(knl_session_t *session, bak_context_t *ogx)
{
    bak_t *bak = &ogx->bak;
    knl_instance_t *kernel = session->kernel;
    logfile_set_t *logfile_set = MY_LOGFILE_SET(session);
    uint32 idx;

    for (idx = 0; idx < OG_MAX_DATA_FILES; idx++) {
        datafile_t *df = &kernel->db.datafiles[idx];

        if (df->ctrl->used && DATAFILE_IS_ONLINE(df)) {
            bool32 other_file = (bak->rst_file.file_type == RESTORE_DATAFILE &&
                                 bak->rst_file.file_id != df->ctrl->id);
            if (!other_file) {
                bak_update_progress(bak, (uint64)df->ctrl->size);
            }
        }
    }

    if (bak->rst_file.file_type == RESTORE_DATAFILE) {
        return;
    }

    for (idx = 0; idx < logfile_set->logfile_hwm; idx++) {
        log_file_ctrl_t *log_ctrl = logfile_set->items[idx].ctrl;

        if (!LOG_IS_DROPPED(log_ctrl->flg)) {
            bak_update_progress(bak, (uint64)log_ctrl->size);
        }
    }
}
/*
 * Final bookkeeping after all files have been written.
 *
 * Sets the build stage progress, and - unless only a single datafile is restored -
 * determines the last archived asn, tries to register further archive logs (not
 * while building) and amends the control info with the next asn and the first log
 * file id.
 *
 * last_log_index: index of the backup file used to derive the asn / log file id
 *                 for non-dbstor backup sets.
 */
static status_t rst_amend_files(knl_session_t *session, uint32 last_log_index)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    uint32 bak_last_asn = 0;
    uint32 log_first = OG_INVALID_ID32;
    /*
     * Start from 0, the same baseline the dbstor restore path uses below: for a dbstor
     * backup set that is being built none of the branches assigns this variable before
     * it is passed to rst_amend_ctrlinfo(), which used to read an indeterminate value.
     */
    uint32 last_archived_asn = 0;
    uint64 data_size;

    data_size = db_get_datafiles_size(session) + db_get_logfiles_size(session);
    bak_set_progress(session, BACKUP_BUILD_STAGE, data_size);
    rst_update_process_data_size(session, ogx);

    if (ogx->bak.rst_file.file_type == RESTORE_DATAFILE) {
        return OG_SUCCESS;
    }

    bool32 is_dbstor = BAK_IS_DBSOTR(bak);
    if (!is_dbstor) {
        get_asn_and_file_id(session, last_log_index, &last_archived_asn, &log_first);
    }

    if (!bak->is_building) {
        if (is_dbstor) {
            last_archived_asn = 0;
            bak_last_asn = last_archived_asn;
            if (rst_regist_archive_by_dbstor(session, &last_archived_asn) != OG_SUCCESS) {
                return OG_ERROR;
            }
        } else {
            bak_last_asn = last_archived_asn;
            if (rst_regist_archive(session, &last_archived_asn) != OG_SUCCESS) {
                return OG_ERROR;
            }
        }

        /* additional archive logs were registered: the first log file id no longer applies */
        if (last_archived_asn != bak_last_asn) {
            log_first = OG_INVALID_ID32;
        }
    }

    if (rst_amend_ctrlinfo(session, last_archived_asn + 1, log_first) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Return the path of the backup set that is currently being processed: the
 * file_dest of entry (depend_num - curr_id - 1) of bak->depends while that index
 * is valid, and bak->record.path otherwise.
 */
char *rst_fetch_filename(bak_t *bak)
{
    int32 pos = bak->depend_num - bak->curr_id;

    if (bak->depend_num > 0 && pos > 0) {
        bak_dependence_t *depend = bak->depends + pos - 1;
        OG_LOG_DEBUG_INF("[BACKUP]depend_num2 %u, file dest %s", pos, depend->file_dest);
        return depend->file_dest;
    }

    OG_LOG_DEBUG_INF("[BACKUP]depend_num3 %u, record path %s", pos, bak->record.path);
    return bak->record.path;
}
/*
 * Drive the write side of a restore: control files, datafiles and finally log files,
 * each only when selected by bak->rst_file.file_type.
 *
 * Failures are not returned; they are recorded in bak->error_info and bak->failed is set.
 * last_log_index is passed on to rst_restore_logfiles().
 */
static void rst_write_all_files(knl_session_t *session, bak_t *bak, bak_process_t *write_ctx, uint32 *last_log_index)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
/* full build: wait until the progress has reached at least the ctrl stage */
while (BAK_IS_FULL_BUILDING(bak) && bak->progress.stage < BACKUP_CTRL_STAGE) {
if (bak->failed) {
return;
}
cm_sleep(1);
}
/* repeat the ctrl/data round until the progress reaches the log stage or the backup fails */
while (!bak->failed) {
if (RST_NEED_RESTORE_CTRLFILE(bak)) {
if (rst_restore_ctrlfiles(session) != OG_SUCCESS) {
bak_set_error(&bak->error_info);
bak_set_error_msg(&bak->error_info, "restore ctrl files failed");
bak->failed = OG_TRUE;
break;
}
}
if (RST_NEED_RESTORE_DATAFILE(bak)) {
if (rst_create_datafiles(session, write_ctx) != OG_SUCCESS) {
bak_set_error(&bak->error_info);
bak_set_error_msg(&bak->error_info, "restore create datafiles failed");
bak->failed = OG_TRUE;
break;
}
OG_LOG_RUN_INF("[RESTORE] create datafiles finished");
if (rst_restore_datafiles(session, ogx, write_ctx) != OG_SUCCESS) {
bak_set_error(&bak->error_info);
bak_set_error_msg(&bak->error_info, "restore datafiles failed");
bak->failed = OG_TRUE;
break;
}
OG_LOG_RUN_INF("[RESTORE] restore datafiles finished");
}
/* stay here for as long as the progress remains in the data stage */
while (!bak->failed && bak->progress.stage == BACKUP_DATA_STAGE) {
cm_sleep(1);
}
if (bak->progress.stage >= BACKUP_LOG_STAGE) {
break;
}
}
if (!bak->failed && RST_NEED_RESTORE_ARCHFILE(bak)) {
if (rst_restore_logfiles(session, last_log_index) != OG_SUCCESS) {
bak_set_error(&bak->error_info);
bak_set_error_msg(&bak->error_info, "restore log files failed");
bak->failed = OG_TRUE;
}
/* NOTE(review): this message is logged even when rst_restore_logfiles failed - confirm that is intended */
OG_LOG_RUN_INF("[RESTORE] restore logfiles finished, current backupset %s", rst_fetch_filename(bak));
}
}
/*
 * Thread entry of the restore writer.
 *
 * thread->argument is the bak_process_t of the writer. The thread writes all files,
 * then (unless only archive files are restored or the restore failed) waits for the
 * backup set head, deletes the track file and amends the restored files. Finally it
 * publishes BACKUP_WRITE_FINISHED and closes the datafile handles it holds.
 * Errors are reported through bak->error_info / bak->failed.
 */
static void rst_write_proc(thread_t *thread)
{
bak_process_t *write_ctx = (bak_process_t *)thread->argument;
knl_session_t *session = write_ctx->session;
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
build_progress_t *build_progress = &bak->progress.build_progress;
uint32 i;
uint32 last_log_index = OG_INVALID_ID32;
KNL_SESSION_SET_CURR_THREADID(session, cm_get_current_thread_id());
rst_write_all_files(session, bak, write_ctx, &last_log_index);
if (!bak->failed && bak->rst_file.file_type != RESTORE_ARCHFILE) {
/* full build that is not on its first link: take the log index from the recorded build progress */
if (BAK_IS_FULL_BUILDING(bak) && !bak->is_first_link) {
last_log_index = build_progress->curr_file_index;
}
/* NOTE(review): the steps below are not skipped when an earlier step of this block fails - confirm intended */
if (rst_build_wait_head(session) != OG_SUCCESS) {
bak_set_error(&bak->error_info);
bak_set_error_msg(&bak->error_info, "restore wait backupset head failed");
bak->failed = OG_TRUE;
}
OG_LOG_RUN_INF("[RESTORE] all files have heen written");
if (rst_delete_track_file(session, bak, OG_FALSE) != OG_SUCCESS) {
bak_set_error(&bak->error_info);
bak_set_error_msg(&bak->error_info, "restore delete track file failed");
bak->failed = OG_TRUE;
}
/* clustered kernels amend through dtc_rst_amend_files using the last backup file index */
if (session->kernel->attr.clustered) {
if (dtc_rst_amend_files(session, bak->file_count - 1) != OG_SUCCESS) {
bak_set_error(&bak->error_info);
bak_set_error_msg(&bak->error_info, "restore amend files failed");
bak->failed = OG_TRUE;
}
} else {
if (rst_amend_files(session, last_log_index) != OG_SUCCESS) {
bak_set_error(&bak->error_info);
bak_set_error_msg(&bak->error_info, "restore amend files failed");
bak->failed = OG_TRUE;
}
}
}
/* the finished stage is set on both the success and the failure path */
bak->progress.stage = BACKUP_WRITE_FINISHED;
/* close every datafile handle held by this writer and clear the recorded names */
for (i = 0; i < OG_MAX_DATA_FILES; i++) {
cm_close_device(write_ctx->file_type[i], &write_ctx->datafiles[i]);
write_ctx->datafile_name[i][0] = '\0';
}
/* control files are closed here only when the first control file is on a raw device */
if (cm_device_type(session->kernel->db.ctrlfiles.items[0].name) == DEV_TYPE_RAW) {
rst_close_ctrl_file(&session->kernel->db.ctrlfiles);
}
/* NOTE(review): called on the success path as well; presumably harmless when no error is pending - confirm */
bak_set_error(&bak->error_info);
KNL_SESSION_CLEAR_THREADID(session);
}
/*
 * Initialise the backup context from the parameters of a restore request.
 *
 * Copies path, policy, device, tablespace name, buffer size and repair type, picks
 * the data type from knl_dbs_is_enable_dbs(), defaults the parallelism to
 * BAK_DEFAULT_PARALLELISM when the request passes 0 and, when encryption is
 * requested, copies the password and then hands the caller's copy to
 * bak_replace_password().
 *
 * Returns the error of srv_get_param_bool32() when reading
 * RESTOR_ARCH_PREFER_BAK_SET fails, OG_SUCCESS otherwise.
 */
static status_t rst_set_params(bak_t *bak, knl_restore_t *param)
{
    errno_t ret;

    bak->is_building = OG_FALSE;
    bak->is_first_link = OG_TRUE;
    bak->need_check = OG_FALSE;

    /* the address-of operator on `param` was mis-encoded in these three calls; restored to &param */
    (void)cm_text2str(&param->path, bak->record.path, OG_FILE_NAME_BUFFER_SIZE);
    (void)cm_text2str(&param->policy, bak->record.policy, OG_BACKUP_PARAM_SIZE);
    bak->record.device = param->device;
    bak->record.data_type = knl_dbs_is_enable_dbs() ? DATA_TYPE_DBSTOR : DATA_TYPE_FILE;
    bak->proc_count = param->parallelism == 0 ? BAK_DEFAULT_PARALLELISM : param->parallelism;
    bak->encrypt_info.encrypt_alg = param->crypt_info.encrypt_alg;
    (void)cm_text2str(&param->spc_name, bak->spc_name, OG_NAME_BUFFER_SIZE);

    if (bak->encrypt_info.encrypt_alg != ENCRYPT_NONE) {
        ret = strncpy_sp(bak->password, OG_PASSWORD_BUFFER_SIZE, param->crypt_info.password,
                         strlen(param->crypt_info.password));
        knl_securec_check(ret);
        bak_replace_password(param->crypt_info.password);
    }

    bak->backup_buf_size = param->buffer_size;
    bak->repair_type = param->repair_type;
    OG_RETURN_IFERR(srv_get_param_bool32("RESTOR_ARCH_PREFER_BAK_SET", &bak->prefer_bak_set));
    return OG_SUCCESS;
}
/*
 * Allocate the buffers shared by a restore.
 *
 * One aligned block of three backup buffers is split into backup_buf, the
 * dependence array (bak->depends) and compress_buf, in that order. For a UDS
 * device the UDS link is initialised. Two further buffers are allocated for the
 * receive stream.
 *
 * Buffers allocated before a failing step are not released in this function.
 */
status_t rst_alloc_resource(knl_session_t *session, bak_t *bak)
{
    if (cm_aligned_malloc(BACKUP_BUFFER_SIZE(bak) * 3, "bak buffer", &bak->align_buf) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)BACKUP_BUFFER_SIZE(bak) * 3, "backup");
        return OG_ERROR;
    }

    /* layout of the block: [backup_buf][depends][compress_buf] */
    bak->backup_buf = bak->align_buf.aligned_buf;
    bak->depends = (bak_dependence_t *)(bak->backup_buf + BACKUP_BUFFER_SIZE(bak));
    bak->compress_buf = bak->backup_buf + 2 * BACKUP_BUFFER_SIZE(bak);

    if (BAK_IS_UDS_DEVICE(bak)) {
        char uds_path[OG_FILE_NAME_BUFFER_SIZE];
        int32 ret = sprintf_s(uds_path, OG_FILE_NAME_BUFFER_SIZE, BAK_SUN_PATH_FORMAT,
                              session->kernel->home, session->kernel->instance_name);
        knl_securec_check_ss(ret);

        if (bak_init_uds(&bak->remote.uds_link, uds_path) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    status_t status = cm_aligned_malloc(BACKUP_BUFFER_SIZE(bak), "rst stream buf0", &bak->recv_stream.bufs[0]);
    if (status != OG_SUCCESS) {
        return OG_ERROR;
    }
    status = cm_aligned_malloc(BACKUP_BUFFER_SIZE(bak), "rst stream buf1", &bak->recv_stream.bufs[1]);
    if (status != OG_SUCCESS) {
        return OG_ERROR;
    }

    bak->recv_stream.buf_size = BACKUP_BUFFER_SIZE(bak);
    return OG_SUCCESS;
}
/*
 * Allocate the buffers owned by the common restore process: a double-sized
 * backup buffer whose second half becomes the zero-filled fill_buf, the
 * encrypt buffer, and the three page-group buffers of the table compress
 * context. ERR_ALLOC_MEMORY is thrown and OG_ERROR returned on the first
 * allocation that fails.
 */
static status_t rst_alloc_write_resource(bak_process_t *common_proc)
{
    knl_session_t *session = common_proc->session;
    bak_t *bak = &session->kernel->backup_ctx.bak;
    int64 group_bytes = (int64)DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT;
    errno_t err;

    if (cm_aligned_malloc((int64)BACKUP_BUFFER_SIZE(bak) * 2, "backup process",
                          &common_proc->backup_buf) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)BACKUP_BUFFER_SIZE(bak) * 2, "backup process");
        return OG_ERROR;
    }

    /* the upper half of the allocation is the fill buffer; it starts out zeroed */
    common_proc->fill_buf = common_proc->backup_buf.aligned_buf + BACKUP_BUFFER_SIZE(bak);
    err = memset_sp(common_proc->fill_buf, BACKUP_BUFFER_SIZE(bak), 0, BACKUP_BUFFER_SIZE(bak));
    knl_securec_check(err);

    if (cm_aligned_malloc((int64)BACKUP_BUFFER_SIZE(bak), "backup process",
                          &common_proc->encrypt_ctx.encrypt_buf) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)BACKUP_BUFFER_SIZE(bak), "backup process");
        return OG_ERROR;
    }

    if (cm_aligned_malloc(group_bytes, "backup process", &common_proc->table_compress_ctx.read_buf) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)group_bytes, "backup process");
        return OG_ERROR;
    }
    if (cm_aligned_malloc(group_bytes, "backup process", &common_proc->table_compress_ctx.unzip_buf) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)group_bytes, "backup process");
        return OG_ERROR;
    }
    if (cm_aligned_malloc(group_bytes, "backup process", &common_proc->table_compress_ctx.zip_buf) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)group_bytes, "backup process");
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Prepare parallel restore process number i: reset its counters and flags,
 * share the common process's fill buffer, allocate its private buffers and
 * start its write thread and its task thread.
 *
 * Returns OG_ERROR on the first allocation or thread creation that fails;
 * nothing acquired earlier is released here.
 */
status_t rst_init_paral_proc_resource(bak_process_t *common_proc, bak_process_t *proc, bak_context_t *ogx, uint32 i)
{
    knl_session_t *session = common_proc->session;
    int64 buf_bytes = (int64)BACKUP_BUFFER_SIZE(&ogx->bak);
    int64 group_bytes = (int64)DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT;

    proc->proc_id = i;
    proc->read_size = 0;
    proc->write_size = 0;
    proc->left_size = 0;
    proc->is_free = OG_FALSE;
    proc->read_failed = OG_FALSE;
    proc->write_failed = OG_FALSE;
    proc->read_execute = OG_FALSE;
    /* the fill buffer is not private: every process points at the common one */
    proc->fill_buf = common_proc->fill_buf;

    if (cm_aligned_malloc(buf_bytes, "restore process", &proc->backup_buf) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (bak_init_rw_buf(proc, buf_bytes, "RESTORE") != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)BACKUP_BUFFER_SIZE(&ogx->bak), "restore process");
        return OG_ERROR;
    }
    if (cm_aligned_malloc(buf_bytes, "restore process", &proc->compress_ctx.compress_buf) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (cm_aligned_malloc(buf_bytes, "restore process", &proc->encrypt_ctx.encrypt_buf) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (cm_aligned_malloc(group_bytes, "restore process", &proc->table_compress_ctx.read_buf) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (cm_aligned_malloc(group_bytes, "restore process", &proc->table_compress_ctx.unzip_buf) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (cm_aligned_malloc(group_bytes, "restore process", &proc->table_compress_ctx.zip_buf) != OG_SUCCESS) {
        return OG_ERROR;
    }

    /* the write thread is created before the task thread */
    if (cm_create_thread(rst_paral_task_write_proc, 0, proc, &proc->write_thread) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (cm_create_thread(bak_paral_task_proc, 0, proc, &proc->thread) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Bring up the restore writers: reset and equip the common process, start its
 * write thread and, when parallel tasks are enabled, initialize processes
 * 1..proc_count and wait for them via bak_wait_paral_proc.
 */
status_t rst_start_write_thread(bak_process_t *common_proc)
{
    knl_session_t *session = common_proc->session;
    bak_context_t *backup_ctx = &session->kernel->backup_ctx;
    uint32 proc_count = backup_ctx->bak.proc_count;
    uint32 id;

    common_proc->proc_id = BAK_COMMON_PROC;
    common_proc->read_size = 0;
    common_proc->write_size = 0;
    common_proc->left_size = 0;

    if (rst_alloc_write_resource(common_proc) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (cm_create_thread(rst_write_proc, 0, common_proc, &common_proc->thread) != OG_SUCCESS) {
        return OG_ERROR;
    }

    /* without parallel tasks the common process is the only writer */
    if (!bak_paral_task_enable(session)) {
        return OG_SUCCESS;
    }

    /* slot 0 is skipped: the parallel processes occupy slots 1..proc_count */
    for (id = 1; id <= proc_count; id++) {
        if (rst_init_paral_proc_resource(common_proc, &backup_ctx->process[id], backup_ctx, id) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }
    bak_wait_paral_proc(session, OG_FALSE);
    return OG_SUCCESS;
}
/*
 * Prepare a restore: take over the statement parameters, initialize archiving
 * unless only a datafile is restored, reset the process control and stats,
 * allocate buffers, initialize bad block handling and start the writers.
 */
status_t rst_prepare(knl_session_t *session, knl_restore_t *param)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;

    if (rst_set_params(bak, param) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (param->file_type == RESTORE_ARCHFILE) {
        bak->rst_file.file_type = RESTORE_ARCHFILE;
    }

    /* arch_init is skipped for a pure datafile restore */
    if (bak->rst_file.file_type != RESTORE_DATAFILE && arch_init(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    bak_reset_process_ctrl(bak, OG_TRUE);
    bak_reset_stats_and_alloc_sess(session);

    if (rst_alloc_resource(session, bak) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (badblock_init(session) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (rst_start_write_thread(&ogx->process[BAK_COMMON_PROC]) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Read up to buf_size bytes of backup data into buf.
 *
 * While building, or on a UDS device, the data comes from rst_agent_read and
 * file_offset is not used. Otherwise it is read from the local backup file at
 * file_offset; *end is then set when fewer bytes than requested arrived.
 * During block repair the local read returns right away, leaving *end, the
 * read counter and the progress untouched.
 */
status_t rst_read_data(bak_context_t *ogx, void *buf, int32 buf_size, int32 *read_size, bool32 *end, uint64 file_offset)
{
    bak_t *bak = &ogx->bak;
    bool32 from_agent = (bak->is_building || BAK_IS_UDS_DEVICE(bak));

    if (from_agent) {
        if (rst_agent_read(bak, (char *)buf, (uint32)buf_size, read_size, end) != OG_SUCCESS) {
            return OG_ERROR;
        }
    } else {
        if (cm_read_device_nocheck(bak->local.type, bak->local.handle, file_offset, buf, buf_size,
                                   read_size) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (ogx->block_repairing) {
            return OG_SUCCESS;
        }
        (void)cm_atomic_inc(&bak->stat.reads);
        /* a short read marks the end of the local file */
        *end = (*read_size < buf_size);
    }

    bak_update_progress(bak, (uint64)*read_size);
    OG_LOG_DEBUG_INF("[RESTORE] read size :%u file name:%s", *read_size, bak->local.name);
    return OG_SUCCESS;
}
/*
 * Verify that a read delivered exactly expect_size bytes. On a mismatch the
 * incomplete read is logged against file_name, ERR_READ_DEVICE_INCOMPLETE is
 * thrown and OG_ERROR is returned.
 */
status_t rst_read_check_size(int32 read_size, int32 expect_size, const char *file_name)
{
    if (read_size == expect_size) {
        return OG_SUCCESS;
    }

    OG_LOG_RUN_ERR("[RESTORE] read incomplete data from %s", file_name);
    OG_THROW_ERROR(ERR_READ_DEVICE_INCOMPLETE, read_size, expect_size);
    return OG_ERROR;
}
/*
 * Open the source of one backup file for reading by the common process.
 *
 * While building only the remote remainder counter is reset; on a UDS device
 * the agent is told which file to start; otherwise the local backup file name
 * is generated and the file opened. For every file except the head file the
 * decompression and decryption contexts are initialized when the backup set
 * uses them.
 */
static status_t rst_read_start(knl_session_t *session, bak_file_type_t file_type, uint32 file_index,
                               uint32 file_id, uint32 sec_id)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    bak_process_t *common_proc = &ogx->process[BAK_COMMON_PROC];
    bool32 is_payload = (file_type != BACKUP_HEAD_FILE);
    char *path = rst_fetch_filename(bak);

    common_proc->read_size = 0;
    common_proc->write_size = 0;

    if (bak->is_building) {
        bak->remote.remain_data_size = 0;
    } else if (BAK_IS_UDS_DEVICE(bak)) {
        uint32 read_type = bak_get_package_type(file_type);
        if (bak_agent_file_start(bak, path, read_type, file_id) != OG_SUCCESS) {
            return OG_ERROR;
        }
        bak->remote.remain_data_size = 0;
    } else {
        bak_generate_bak_file(session, path, file_type, file_index, file_id, sec_id, bak->local.name);
        bak->local.type = cm_device_type(bak->local.name);
        if (cm_open_device(bak->local.name, bak->local.type, O_BINARY | O_SYNC | O_RDWR,
                           &bak->local.handle) != OG_SUCCESS) {
            return OG_ERROR;
        }
        OG_LOG_RUN_INF("[RESTORE] file name:%s, type=%d, handle=%d",
                       bak->local.name, bak->local.type, bak->local.handle);
    }

    /* the head file gets neither a compression nor an encryption context */
    if (is_payload && bak->record.attr.compress != COMPRESS_NONE) {
        if (knl_compress_init(bak->record.attr.compress, &bak->compress_ctx, OG_FALSE) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }
    if (is_payload && bak->encrypt_info.encrypt_alg != ENCRYPT_NONE) {
        if (bak_encrypt_init(bak, &common_proc->encrypt_ctx, &bak->files[file_index], OG_FALSE) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}
/*
 * Finish reading one backup file.
 *
 * While building only the decompression context is ended. Otherwise the
 * transport is wrapped up first (ACK to the agent on a UDS device, or the
 * local file is closed), then decompression and decryption are ended. The
 * head file has neither context.
 */
static status_t rst_read_end(knl_session_t *session, bak_t *bak, bak_file_type_t file_type, bak_file_t *file)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bool32 is_payload = (file_type != BACKUP_HEAD_FILE);

    if (!bak->is_building) {
        if (BAK_IS_UDS_DEVICE(bak)) {
            if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
                return OG_ERROR;
            }
        } else {
            cm_close_device(bak->local.type, &bak->local.handle);
            bak->local.handle = OG_INVALID_HANDLE;
        }
    }

    if (is_payload && bak->record.attr.compress != COMPRESS_NONE) {
        knl_compress_end(bak->record.attr.compress, &bak->compress_ctx, OG_FALSE);
    }

    /* the building path stops here: it does not end decryption */
    if (bak->is_building) {
        return OG_SUCCESS;
    }

    if (is_payload && bak->encrypt_info.encrypt_alg != ENCRYPT_NONE) {
        if (bak_decrypt_end(bak, &ogx->process[BAK_COMMON_PROC].encrypt_ctx, file, OG_FALSE) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}
/*
 * Check the encryption settings of the statement against the backup set head.
 * The algorithms must match. For an encrypted backup set the supplied password
 * is checked against head->sys_pwd and the key is derived into bak->key from
 * the password and the salt stored in the head.
 */
static status_t rst_check_encrypt_version(bak_t *bak, bak_head_t *head)
{
    text_t given_pwd;
    text_t stored_pwd;

    if (bak->encrypt_info.encrypt_alg != head->encrypt_info.encrypt_alg) {
        OG_THROW_ERROR(ERR_CRYPTION_ERROR,
                       "the SQL statement does not match the encryption attribute of the backup set");
        return OG_ERROR;
    }

    /* nothing to authenticate or derive for an unencrypted backup set */
    if (head->encrypt_info.encrypt_alg == ENCRYPT_NONE) {
        return OG_SUCCESS;
    }

    cm_str2text(bak->password, &given_pwd);
    cm_str2text(head->sys_pwd, &stored_pwd);
    if (cm_check_password(&given_pwd, &stored_pwd) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_CRYPTION_ERROR, "Authentication failure");
        return OG_ERROR;
    }

    if (cm_encrypt_KDF2((uchar *)bak->password, (uint32)strlen(bak->password),
                        (uchar *)head->encrypt_info.salt, OG_KDF2SALTSIZE, OG_KDF2DEFITERATION,
                        (uchar *)bak->key, OG_AES256KEYSIZE) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Copy the attributes, recovery points and positions of a backup set head into
 * the restore context after checking its encryption settings.
 *
 * bak->file_count is taken from the head unless a build is running. When
 * set_config is true the CONTROL_FILES parameter is altered to the value
 * stored in the head.
 */
status_t rst_set_head(knl_session_t *session, bak_head_t *head, bool32 set_config)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_attr_t *attr = &bak->record.attr;
    bak_ctrlinfo_t *ctrlinfo = &bak->record.ctrlinfo;
    config_t *config = session->kernel->attr.config;
    errno_t err;

    attr->backup_type = head->attr.backup_type;
    attr->level = head->attr.level;
    attr->compress = head->attr.compress;
    err = strcpy_sp(attr->tag, OG_NAME_BUFFER_SIZE, head->attr.tag);
    knl_securec_check(err);

    if (rst_check_encrypt_version(bak, head) != OG_SUCCESS) {
        return OG_ERROR;
    }

    /* a build keeps its own file count */
    if (!bak->is_building) {
        bak->file_count = head->file_count;
    }

    ctrlinfo->rcy_point = head->ctrlinfo.rcy_point;
    ctrlinfo->lrp_point = head->ctrlinfo.lrp_point;
    ctrlinfo->scn = head->ctrlinfo.scn;
    ctrlinfo->lsn = head->ctrlinfo.lsn;
    if (session->kernel->attr.clustered) {
        /* in a cluster the per-instance recovery points are carried over as well */
        for (uint32 inst = 0; inst < OG_MAX_INSTANCES; inst++) {
            ctrlinfo->dtc_rcy_point[inst] = head->ctrlinfo.dtc_rcy_point[inst];
            ctrlinfo->dtc_lrp_point[inst] = head->ctrlinfo.dtc_lrp_point[inst];
        }
    }

    bak->log_first_slot = head->log_fisrt_slot;
    bak->curr_file_index = 0;
    bak->ddl_pitr_lsn = head->ddl_pitr_lsn;
    OG_LOG_RUN_INF("[RESTORE] get ddl pitr lsn %llu from backupset head", head->ddl_pitr_lsn);

    if (set_config) {
        if (cm_alter_config(config, "CONTROL_FILES", head->control_files, CONFIG_SCOPE_BOTH,
                            OG_TRUE) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }
    return OG_SUCCESS;
}
/*
 * Read the CONTROL_FILES value from the backup source and apply it to the
 * configuration.
 *
 * Nothing is done for a full build that is not on its first link, nor when the
 * record is flagged as a repair. Exactly OG_MAX_CONFIG_LINE_SIZE bytes are
 * read into bak->backup_buf; a shorter read is an error.
 *
 * Returns OG_SUCCESS, or OG_ERROR if reading or altering the config fails.
 */
status_t rst_restore_config_param(knl_session_t *session)
{
    config_t *config = session->kernel->attr.config;
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    bool32 read_end = OG_FALSE;
    int32 read_size;

    if ((BAK_IS_FULL_BUILDING(bak) && !bak->is_first_link) || bak->record.is_repair) {
        return OG_SUCCESS;
    }

    if (rst_read_data(ogx, bak->backup_buf, OG_MAX_CONFIG_LINE_SIZE, &read_size, &read_end, 0) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (rst_read_check_size(read_size, (int32)OG_MAX_CONFIG_LINE_SIZE, "config") != OG_SUCCESS) {
        return OG_ERROR;
    }

    /*
     * The bytes come straight from the backup source and are consumed as a C
     * string below, so make sure the string ends inside the bytes just read.
     */
    bak->backup_buf[OG_MAX_CONFIG_LINE_SIZE - 1] = '\0';

    if (cm_alter_config(config, "CONTROL_FILES", bak->backup_buf, CONFIG_SCOPE_BOTH, OG_TRUE) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Wait for the agent's BAK_PKG_FILE_END package. Only applies while building
 * or on a UDS device; otherwise the call succeeds immediately.
 */
status_t rst_wait_agent_process(bak_t *bak)
{
    bool32 via_agent = (bak->is_building || BAK_IS_UDS_DEVICE(bak));

    if (!via_agent) {
        return OG_SUCCESS;
    }

    if (bak_agent_wait_pkg(bak, BAK_PKG_FILE_END) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Read the file item array announced by the backup set head into buf.
 * An empty array is fine; more than BAK_MAX_FILE_NUM items marks the backup
 * set as invalid. The array is read from the aligned offset that follows the
 * fixed head, and the read must deliver the full aligned size.
 */
static status_t rst_restore_fileinfo(bak_context_t *ogx, bak_head_t *head, char *buf)
{
    int32 items_size = bak_get_align_size(ogx->bak.local.type, head->file_count * sizeof(bak_file_t),
                                          CM_CTSTORE_ALIGN_SIZE);
    uint32 items_offset = bak_get_align_size(ogx->bak.local.type, sizeof(bak_head_t), CM_CTSTORE_ALIGN_SIZE);
    int32 got_size;
    bool32 at_end = OG_FALSE;

    if (head->file_count == 0) {
        return OG_SUCCESS;
    }
    if (head->file_count > BAK_MAX_FILE_NUM) {
        OG_THROW_ERROR(ERR_INVALID_BACKUPSET, "invalid number of backupset file count");
        return OG_ERROR;
    }

    if (rst_read_data(ogx, buf, items_size, &got_size, &at_end, items_offset) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return rst_read_check_size(got_size, items_size, "backupset file items");
}
/*
 * Read the dependent backup set array announced by the head into buf.
 * An empty array is fine; more than BAK_MAX_INCR_NUM entries marks the backup
 * set as invalid. The array is read from the aligned offset that follows the
 * fixed head and the file items, and the read must deliver the full aligned size.
 */
static status_t rst_restore_dependents(bak_context_t *ogx, bak_head_t *head, char *buf)
{
    int32 deps_size = bak_get_align_size(ogx->bak.local.type, head->depend_num * sizeof(bak_dependence_t),
                                         CM_CTSTORE_ALIGN_SIZE);
    uint32 deps_offset = bak_get_align_size(ogx->bak.local.type,
                                            sizeof(bak_head_t) + head->file_count * sizeof(bak_file_t),
                                            CM_CTSTORE_ALIGN_SIZE);
    int32 got_size;
    bool32 at_end = OG_FALSE;

    if (head->depend_num == 0) {
        return OG_SUCCESS;
    }
    if (head->depend_num > BAK_MAX_INCR_NUM) {
        OG_THROW_ERROR(ERR_INVALID_BACKUPSET, "invalid number of backupset depentdent count");
        return OG_ERROR;
    }

    OG_LOG_DEBUG_INF("[BACKUP] read catalog");
    if (rst_read_data(ogx, buf, deps_size, &got_size, &at_end, deps_offset) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return rst_read_check_size(got_size, deps_size, "backupset dependent items");
}
/*
 * Apply a completely read backup set head to the restore context.
 *
 * The compress and encrypt contexts are freed, the head is applied through
 * rst_set_head (CONTROL_FILES is only altered when no block repair is
 * running), and both contexts are allocated again. The file items that follow
 * the fixed head in memory are copied to bak->files and, if fetch_catalog is
 * set, the dependent backup sets are copied to bak->depends.
 *
 * version_head_size is the size of the fixed head for this backup set version.
 * Returns OG_SUCCESS, or OG_ERROR if applying the head or an allocation fails.
 */
static status_t rst_set_backupset_info(knl_session_t *session, bak_t *bak, bak_head_t *head, bool32 fetch_catalog,
                                       uint32 version_head_size)
{
    errno_t ret;
    uint32 offset;
    bool32 set_config = !session->kernel->backup_ctx.block_repairing;

    bak_free_compress_context(session, OG_FALSE);
    bak_free_encrypt_context(session);
    if (rst_set_head(session, head, set_config) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (bak_alloc_compress_context(session, OG_FALSE) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (bak_alloc_encrypt_context(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    offset = bak_get_align_size(bak->local.type, version_head_size, CM_CTSTORE_ALIGN_SIZE);
    if (head->file_count > 0) {
        ret = memcpy_sp(bak->files, BAK_MAX_FILE_NUM * sizeof(bak_file_t), (char *)head + offset,
                        head->file_count * sizeof(bak_file_t));
        knl_securec_check(ret);
        /*
         * Step over exactly the items copied above. rst_set_head leaves
         * bak->file_count untouched while building, so only head->file_count
         * describes what is laid out behind the head.
         */
        offset += bak_get_align_size(bak->local.type, head->file_count * sizeof(bak_file_t), CM_CTSTORE_ALIGN_SIZE);
    }

    if (!fetch_catalog) {
        return OG_SUCCESS;
    }

    if (head->depend_num > 0) {
        ret = memcpy_sp(bak->depends, BACKUP_BUFFER_SIZE(bak), (char *)head + offset,
                        sizeof(bak_dependence_t) * head->depend_num);
        knl_securec_check(ret);
    }
    bak->depend_num = head->depend_num;
    return OG_SUCCESS;
}
/*
 * Inspect the version stamp of a backup set head.
 *
 * A version that differs from the current one is only logged as a warning; a
 * major version below BAK_VERSION_MIN_WITH_ENCRYPTION is rejected. The oldest
 * accepted version (minor 0) uses the old head layout and is flagged in
 * bak->is_noparal_version. *left_size receives the number of head bytes that
 * follow the version stamp.
 */
static status_t rst_bakcup_version_check(bak_t *bak, bak_head_t *head, uint32 *left_size)
{
    bak_version_t ver = head->version;
    bool32 is_current = (ver.major_ver == BAK_VERSION_MAJOR && ver.min_ver == BAK_VERSION_MIN &&
                         ver.magic == BAK_VERSION_MAGIC);
    bool32 old_layout;

    if (!is_current) {
        OG_LOG_RUN_WAR("[RESTORE] backupset version mismatch, expected %u-%u-%u, input is %u-%u-%u",
                       BAK_VERSION_MAJOR, BAK_VERSION_MIN, BAK_VERSION_MAGIC,
                       ver.major_ver, ver.min_ver, ver.magic);
    }

    if (ver.major_ver < BAK_VERSION_MIN_WITH_ENCRYPTION) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", backupset version is too old");
        return OG_ERROR;
    }

    old_layout = (ver.major_ver == BAK_VERSION_MIN_WITH_ENCRYPTION && ver.min_ver == 0);
    if (!old_layout) {
        bak->is_noparal_version = OG_FALSE;
        *left_size = sizeof(bak_head_t) - sizeof(bak_version_t);
    } else {
        bak->is_noparal_version = OG_TRUE;
        *left_size = sizeof(bak_old_version_head_t) - sizeof(bak_version_t);
    }
    return OG_SUCCESS;
}
/*
 * Return the length of the main version prefix of db_version: the number of
 * characters in front of the third '.', or the whole string length when it
 * holds fewer than three dots.
 */
uint32 rst_get_db_main_version_len(char *db_version)
{
#define MAIN_VERSION_SEG_NUM 3
    uint32_t dots = 0;
    uint32_t pos;

    for (pos = 0; db_version[pos] != '\0'; pos++) {
        /* stop on the dot that closes the last main segment; it is not counted */
        if (db_version[pos] == '.' && ++dots == MAIN_VERSION_SEG_NUM) {
            break;
        }
    }
    return pos;
#undef MAIN_VERSION_SEG_NUM
}
/*
 * Compare the main version of the backup set with the main version of the
 * database. The check is skipped when attr->restore_check_version is off.
 * A backup set flagged as is_noparal_version always fails; otherwise the main
 * version prefixes must have the same length and the same characters.
 */
status_t rst_db_version_check(knl_session_t *session, bak_t *bak, bak_head_t *head)
{
    knl_attr_t *attr = &session->kernel->attr;
    uint32 bak_main_len;
    uint32 db_main_len;

    if (!attr->restore_check_version) {
        return OG_SUCCESS;
    }
    if (bak->is_noparal_version) {
        return OG_ERROR;
    }

    bak_main_len = rst_get_db_main_version_len(head->db_version);
    db_main_len = rst_get_db_main_version_len(attr->db_version);
    if (bak_main_len != db_main_len || strncmp(head->db_version, attr->db_version, bak_main_len) != 0) {
        OG_LOG_RUN_ERR("[RESTORE] backupset version %s mismatch with db version %s",
                       head->db_version, attr->db_version);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * For example, the incremental backupsets are: incr_0 (level 0), incr_1 (level 1), incr_2 (level 1).
 * Block repair only looks up pages in an incremental backupset, so no check is needed.
 * Restoring a database first opens the incr_2 head to find the dependent backupsets,
 * and then restores in the order incr_0, incr_1, incr_2.
 * Therefore, the tag of the previously opened backupset equals the base_tag of the next opened backupset.
 */
/*
 * For a level 1 backup set, verify that its base_tag matches the tag already
 * recorded in bak->record.attr.tag. The comparison is case-insensitive, and it
 * is skipped during block repair or while no tag is recorded yet.
 */
static status_t rst_incremental_backset_tag_check(knl_session_t *session, bak_t *bak, bak_head_t *head)
{
    if (session->kernel->backup_ctx.block_repairing) {
        return OG_SUCCESS;
    }

    if (head->attr.level != 1 || CM_IS_EMPTY_STR(bak->record.attr.tag)) {
        return OG_SUCCESS;
    }

    if (cm_str_equal_ins(bak->record.attr.tag, head->attr.base_tag)) {
        return OG_SUCCESS;
    }

    OG_THROW_ERROR_EX(ERR_INVALID_OPERATION,
                      ", backupset with tag %s expected base backupset tag to be %s but found %s.",
                      head->attr.tag, head->attr.base_tag, bak->record.attr.tag);
    return OG_ERROR;
}
/*
 * Check that the opened backup set can serve the requested restore: its type
 * must fit the restore target, its database version must match, an
 * incremental set must continue the expected chain, and on a UDS device the
 * restore buffer must not be smaller than the buffer used for the backup.
 *
 * Returns OG_SUCCESS, or OG_ERROR with an error thrown for the first rule
 * that is violated.
 */
static status_t rst_check_backupset(knl_session_t *session, bak_t *bak, bak_head_t *head)
{
    if (head->attr.backup_type != BACKUP_MODE_FULL && BAK_IS_TABLESPCE_RESTORE(bak)) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION,
                       ", can not restore tablespace from backupset which is not full backupset");
        return OG_ERROR;
    }

    /*
     * There are four backupset types: full database backupset, incremental database backupset,
     * tablespace backupset, archivelog backupset.
     * tablespace backupset must be used for datafile repair.
     * archivelog backupset must be used for archivelog restore.
     * incremental database backupset must be used for database restore.
     */
    if (head->attr.backup_type == BACKUP_MODE_FULL && bak->rst_file.file_type == RESTORE_ARCHFILE) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", because it's a full database backupset");
        return OG_ERROR;
    }
    if (BAK_MODE_IS_INCREMENTAL(head->attr.backup_type) && bak->rst_file.file_type != RESTORE_ALL) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", because it's a incremental backupset");
        return OG_ERROR;
    }
    if (head->attr.backup_type == BACKUP_MODE_TABLESPACE && bak->rst_file.file_type != RESTORE_DATAFILE) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", because it's a tablespace backupset");
        return OG_ERROR;
    }
    if (head->attr.backup_type == BACKUP_MODE_ARCHIVELOG && bak->rst_file.file_type != RESTORE_ARCHFILE) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", because it's a archivelog backupset");
        return OG_ERROR;
    }

    if (rst_db_version_check(session, bak, head) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION, ", database version and backupset version do not match.");
        return OG_ERROR;
    }

    if (rst_incremental_backset_tag_check(session, bak, head) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (BAK_IS_UDS_DEVICE(bak) && (head->max_buffer_size > bak->backup_buf_size)) {
        OG_THROW_ERROR(ERR_INVALID_OPERATION,
                       ", buffer size for restore(%u) cannot be smaller than buffer size for backup(%u).",
                       bak->backup_buf_size, head->max_buffer_size);
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Read and validate the head of the currently opened backup set, then load it
 * into the restore context.
 *
 * The head is assembled in bak->backup_buf in this order: version stamp, the
 * rest of the fixed head (its size depends on the version), the file item
 * array and the dependent backup set array. The checksum is verified once on
 * the fixed head and once more after the two arrays were appended.
 *
 * fetch_catalog is forwarded to rst_set_backupset_info.
 * Returns OG_SUCCESS, or OG_ERROR as soon as a read or a check fails.
 */
status_t rst_restore_backupset_head(knl_session_t *session, bool32 fetch_catalog)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
bak_head_t *head = (bak_head_t *)bak->backup_buf;
bool32 read_end = OG_FALSE;
int32 read_size;
uint32 head_size = 0;
uint32 left_size;
/* step 1: only the version stamp, read from offset 0 */
if (rst_read_data(ogx, bak->backup_buf, sizeof(bak_version_t), &read_size, &read_end, head_size) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_read_check_size(read_size, (int32)sizeof(bak_version_t), "backupset version") != OG_SUCCESS) {
return OG_ERROR;
}
/* the version decides how many more bytes the fixed head has (left_size) */
if (rst_bakcup_version_check(bak, head, &left_size) != OG_SUCCESS) {
return OG_ERROR;
}
/* step 2: the remainder of the fixed head, appended right behind the version stamp */
head_size = sizeof(bak_version_t);
if (rst_read_data(ogx, bak->backup_buf + head_size, left_size, &read_size, &read_end, head_size) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_read_check_size(read_size, (int32)left_size, "backupset head") != OG_SUCCESS) {
return OG_ERROR;
}
head_size += left_size;
/* first checksum pass covers the fixed head only */
if (bak_head_verify_checksum(session, head, (uint32)head_size, OG_FALSE) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_check_backupset(session, bak, head) != OG_SUCCESS) {
return OG_ERROR;
}
/* step 3: file items, then dependents, each appended at the running head_size */
if (rst_restore_fileinfo(ogx, head, bak->backup_buf + head_size) != OG_SUCCESS) {
return OG_ERROR;
}
head_size += head->file_count * sizeof(bak_file_t);
if (rst_restore_dependents(ogx, head, bak->backup_buf + head_size) != OG_SUCCESS) {
return OG_ERROR;
}
head_size += head->depend_num * sizeof(bak_dependence_t);
/* second checksum pass covers the fixed head plus both arrays */
if (bak_head_verify_checksum(session, head, head_size, OG_TRUE) != OG_SUCCESS) {
return OG_ERROR;
}
/* hand over the fixed head size of this version so the arrays can be located again */
if (rst_set_backupset_info(session, bak, head, fetch_catalog, left_size + sizeof(bak_version_t)) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
/*
 * Restore the backup set head file: set the progress for the head stage, open
 * the head file, load the head, wait for the agent where one is involved and
 * close the read again. Stops with OG_ERROR at the first step that fails.
 */
static status_t rst_read_head_file(knl_session_t *session, bool32 fetch_catalog)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    uint64 head_bytes = sizeof(bak_head_t) + BAK_MAX_FILE_NUM * sizeof(bak_file_t);

    bak_set_progress(session, BACKUP_HEAD_STAGE, head_bytes);

    /* the steps run strictly in this order; || stops at the first failure */
    if (rst_read_start(session, BACKUP_HEAD_FILE, 0, 0, 0) != OG_SUCCESS ||
        rst_restore_backupset_head(session, fetch_catalog) != OG_SUCCESS ||
        rst_wait_agent_process(bak) != OG_SUCCESS ||
        rst_read_end(session, bak, BACKUP_HEAD_FILE, NULL) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Poll until the process's read_size has dropped to 0 or the restore has
 * failed. Returns OG_ERROR if bak->failed is set when the wait ends.
 */
static status_t rst_wait_write(bak_t *bak, bak_process_t *ogx)
{
    for (;;) {
        if (bak->failed || ogx->read_size == 0) {
            break;
        }
        cm_sleep(1);
    }

    if (bak->failed) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Hand buf_size bytes of restored data over to a restore process, one chunk
 * at a time, through the process's backup buffer.
 *
 * Before every chunk the function waits (rst_wait_write) until read_size of
 * the process is 0. The chunk is copied behind the left_size bytes already in
 * the buffer and published by setting read_size.
 * NOTE(review): the consumer is presumably the process's write thread, which
 * would reset read_size once it has taken the data -- confirm.
 *
 * file_offset is the file position that corresponds to buf[0].
 * Returns OG_ERROR if the restore has failed, otherwise OG_SUCCESS.
 */
static status_t rst_write_to_write_buf(bak_t *bak, bak_process_t *ogx, const char *buf, int32 buf_size,
uint64 file_offset)
{
errno_t ret;
int32 offset = 0;
int32 remain_size = buf_size;
int32 data_size;
while (!bak->failed && remain_size > 0) {
if (rst_wait_write(bak, ogx) != OG_SUCCESS) {
return OG_ERROR;
}
/* a chunk is whatever still fits behind the left_size bytes, capped by what remains */
data_size = remain_size > ((int32)BACKUP_BUFFER_SIZE(bak) - ogx->left_size) ?
((int32)BACKUP_BUFFER_SIZE(bak) - ogx->left_size) : remain_size;
ret = memcpy_sp(ogx->backup_buf.aligned_buf + ogx->left_size,
(uint32)(BACKUP_BUFFER_SIZE(bak) - ogx->left_size), buf + offset, (uint32)data_size);
knl_securec_check(ret);
ogx->curr_offset = file_offset + (uint64)offset;
ogx->write_size = 0;
/* the fence orders the buffer and offset updates before read_size is set */
CM_MFENCE;
ogx->read_size = ogx->left_size + data_size;
offset += data_size;
remain_size -= data_size;
}
return bak->failed ? OG_ERROR : OG_SUCCESS;
}
/*
 * Decompress buf_size bytes from buf and pass the output on to the restore
 * process. The input is set once; decompression then runs in rounds through
 * bak->compress_buf until the compress context reports that it has finished.
 * last_package is forwarded to knl_decompress.
 */
static status_t rst_decompress_to_write_buf(bak_t *bak, bak_process_t *ogx, char *buf, uint32 buf_size,
                                            bool32 last_package, uint64 file_offset)
{
    knl_compress_set_input(bak->record.attr.compress, &bak->compress_ctx, buf, buf_size);

    do {
        if (knl_decompress(bak->record.attr.compress, &bak->compress_ctx, last_package, bak->compress_buf,
                           BACKUP_BUFFER_SIZE(bak)) != OG_SUCCESS) {
            return OG_ERROR;
        }
        /* every round forwards the bytes it produced (write_len) */
        if (rst_write_to_write_buf(bak, ogx, bak->compress_buf, (int32)bak->compress_ctx.write_len,
                                   file_offset) != OG_SUCCESS) {
            return OG_ERROR;
        }
    } while (!bak->compress_ctx.finished);

    return OG_SUCCESS;
}
/*
 * Read the data of the current backup file (bak->curr_file_index) piece by
 * piece and feed it to the common restore process, decrypting and/or
 * decompressing each piece when the backup set requires it.
 *
 * The loop ends when the bytes read add up to the file size recorded in the
 * file item, when the restore has failed, or when nothing is left to request.
 * Unless a build is running, the total read must equal that recorded size.
 *
 * Returns OG_SUCCESS, or OG_ERROR on any failure. Most failure paths also set
 * bak->failed; a failed session check or decryption returns without doing so.
 */
static status_t rst_read_file_data(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
bool32 read_end = OG_FALSE;
int32 read_size = 0;
uint32 buf_content_size;
uint64 file_size = bak->files[bak->curr_file_index].size;
uint64 file_offset = 0;
bool32 compressed = (bak->record.attr.compress != COMPRESS_NONE);
bool32 encrypted = (bak->encrypt_info.encrypt_alg != ENCRYPT_NONE);
bak_process_t *proc = &ogx->process[BAK_COMMON_PROC];
char *use_buf = bak->backup_buf;
/* buf_size is derived from last_left_size as it is on entry, before the reset below */
int32 buf_size = (int32)(BACKUP_BUFFER_SIZE(bak) - bak->compress_ctx.last_left_size);
int32 request_size;
bak->compress_ctx.last_left_size = 0;
while (!bak->failed && !read_end) {
if (bak_check_session_status(session) != OG_SUCCESS) {
return OG_ERROR;
}
/* never request more than what is left of the file */
request_size = (file_size - file_offset <= buf_size) ? (file_size - file_offset) : buf_size;
if (request_size == 0) {
break;
}
if (rst_read_data(ogx, bak->backup_buf + bak->compress_ctx.last_left_size,
request_size, &read_size, &read_end, file_offset) != OG_SUCCESS) {
bak->failed = OG_TRUE;
break;
}
/* the end flag reported by the read is replaced: the recorded file size decides */
read_end = (file_size == file_offset + read_size);
if (encrypted) {
if (rst_decrypt_data(proc, bak->backup_buf + bak->compress_ctx.last_left_size, read_size,
bak->compress_ctx.last_left_size) != OG_SUCCESS) {
return OG_ERROR;
}
/* from here on the data is taken from the process's encrypt buffer */
use_buf = proc->encrypt_ctx.encrypt_buf.aligned_buf;
}
if (compressed) {
buf_content_size = (uint32)read_size + bak->compress_ctx.last_left_size;
if (rst_decompress_to_write_buf(bak, proc, use_buf, buf_content_size, read_end,
file_offset) != OG_SUCCESS) {
bak->failed = OG_TRUE;
break;
}
} else {
if (rst_write_to_write_buf(bak, proc, use_buf, read_size, file_offset) != OG_SUCCESS) {
bak->failed = OG_TRUE;
return OG_ERROR;
}
}
file_offset += read_size;
}
/* NOTE(review): this check is also reached after a break on failure above -- confirm that is intended */
if (!bak->is_building && file_size != file_offset) {
OG_LOG_RUN_ERR("[RESTORE] unexpected read size %llu, expected file size is %llu, file name %s", file_offset,
file_size, bak->local.name);
OG_THROW_ERROR(ERR_FILE_SIZE_MISMATCH, (int64)file_offset, file_size);
bak->failed = OG_TRUE;
return OG_ERROR;
}
return bak->failed ? OG_ERROR : OG_SUCCESS;
}
/*
 * Restore one backup file through the common process: open it, read all of
 * its data, wait until the common process has taken the last piece, and close
 * the read. Stops with OG_ERROR at the first step that fails.
 */
status_t rst_read_file(knl_session_t *session, uint32 file_index)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    bak_t *bak = &ogx->bak;
    bak_file_t *file = &bak->files[file_index];

    if (rst_read_start(session, file->type, file_index, file->id, file->sec_id) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_read_file_data(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_wait_write(bak, &ogx->process[BAK_COMMON_PROC]) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_read_end(session, bak, file->type, file) != OG_SUCCESS) {
        return OG_ERROR;
    }
    return OG_SUCCESS;
}
/*
 * Decide whether a backup file is left out of the restore.
 *
 * For a datafile restore everything except the requested datafile is skipped,
 * and finding that datafile is recorded in rst_file.exist_repair_file.
 * Otherwise only data files can be skipped: during a tablespace restore, when
 * the datafile is not online.
 */
static bool32 rst_skip_file(knl_session_t *session, bak_file_t *file)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bool32 is_data_file = (file->type == BACKUP_DATA_FILE);
    datafile_t *df = NULL;

    if (bak->rst_file.file_type == RESTORE_DATAFILE) {
        if (is_data_file && bak->rst_file.file_id == file->id) {
            bak->rst_file.exist_repair_file = OG_TRUE;
            return OG_FALSE;
        }
        return OG_TRUE;
    }

    if (!is_data_file) {
        return OG_FALSE;
    }

    df = DATAFILE_GET(session, file->id);
    return (bool32)((BAK_IS_TABLESPCE_RESTORE(bak) && !DATAFILE_IS_ONLINE(df)));
}
/*
 * Look in the first archive destination for archive files that already cover
 * the given backup archive file.
 *
 * Only names that match the archive pattern, convert for the file's instance
 * and reset id, and carry the same dbid are considered. On dbstor, the
 * archive's end_lsn and its start_lsn + 1 must each fall into the lsn range of
 * some candidate (not necessarily the same one); otherwise a candidate with
 * the same asn as file->id is enough.
 *
 * *found_duplicate is set to OG_TRUE on a hit and left untouched otherwise.
 * Returns OG_ERROR if listing the directory or checking a dbid fails.
 */
status_t rst_find_duplicative_archfile(knl_session_t *session, bak_file_t *file, bool32 *found_duplicate)
{
    char *arch_path = session->kernel->attr.arch_attr[0].local_path;
    bool32 is_dbstor = BAK_IS_DBSOTR(&session->kernel->backup_ctx.bak);
    device_type_t type = arch_get_device_type(arch_path);
    local_arch_file_info_t file_info;
    bool32 start_covered = OG_FALSE;
    bool32 end_covered = OG_FALSE;
    bool32 dbid_equal = OG_FALSE;
    void *file_list = NULL;
    uint32 file_num = 0;
    char *file_name = NULL;
    status_t result = OG_SUCCESS;

    if (cm_malloc_file_list(type, &file_list, arch_path, &file_num) != OG_SUCCESS) {
        return OG_ERROR;
    }
    if (cm_query_device(type, arch_path, file_list, &file_num) != OG_SUCCESS) {
        cm_free_file_list(&file_list);
        return OG_ERROR;
    }

    for (uint32 idx = 0; idx < file_num; idx++) {
        file_name = cm_get_name_from_file_list(type, file_list, idx);
        if (file_name == NULL) {
            result = OG_ERROR;
            break;
        }

        if (cm_match_arch_pattern(file_name) == OG_FALSE) {
            continue;
        }
        if (bak_convert_archfile_name(file_name, &file_info, file->inst_id, file->rst_id, is_dbstor) == OG_FALSE) {
            continue;
        }

        if (bak_check_archfile_dbid(session, arch_path, file_name, &dbid_equal) != OG_SUCCESS) {
            result = OG_ERROR;
            break;
        }
        if (dbid_equal != OG_TRUE) {
            continue;
        }

        if (!is_dbstor) {
            if (file_info.local_asn == file->id) {
                *found_duplicate = OG_TRUE;
                break;
            }
            continue;
        }

        /* dbstor: both ends of the backup archive must be covered; the flags persist across candidates */
        if (file_info.local_end_lsn >= file->end_lsn && file_info.local_start_lsn < file->end_lsn) {
            end_covered = OG_TRUE;
        }
        if (file_info.local_end_lsn >= file->start_lsn + 1 && file_info.local_start_lsn < file->start_lsn + 1) {
            start_covered = OG_TRUE;
        }
        if (end_covered && start_covered) {
            *found_duplicate = OG_TRUE;
            break;
        }
    }

    /* single release point for the file list, whatever ended the scan */
    cm_free_file_list(&file_list);
    return result;
}
/*
 * Decide whether an archive file of the backup set is skipped because the
 * archive directory already holds it.
 *
 * Only archive files are considered, and their skipped flag is cleared first.
 * Nothing is skipped when the backup set is preferred (prefer_bak_set), and
 * the directory is searched only when next_file is not an archive file of the
 * same instance. A failed search counts as "do not skip".
 */
bool32 rst_skip_for_duplicative_archfile(knl_session_t *session, bak_file_t *cur_file, bak_file_t *next_file)
{
    bool32 found_duplicate = OG_FALSE;

    if (cur_file->type != BACKUP_ARCH_FILE) {
        return OG_FALSE;
    }

    cur_file->skipped = OG_FALSE;
    if (session->kernel->backup_ctx.bak.prefer_bak_set == OG_TRUE) {
        return OG_FALSE;
    }

    /* an archive file followed by another one of the same instance is never skipped */
    if (next_file->type == BACKUP_ARCH_FILE && next_file->inst_id == cur_file->inst_id) {
        return OG_FALSE;
    }

    if (rst_find_duplicative_archfile(session, cur_file, &found_duplicate) != OG_SUCCESS) {
        return OG_FALSE;
    }
    if (!found_duplicate) {
        return OG_FALSE;
    }

    cur_file->skipped = OG_TRUE;
    OG_LOG_RUN_INF("[RESTORE] restore logfile %u from arch dir", cur_file->id);
    return OG_TRUE;
}
/*
 * Restore the current backup file (bak->curr_file_index) in stream mode.
 *
 * For a full restore the control file must be complete first. On a UDS device
 * the agent is told which file to start and receives an ACK once the data has
 * been taken. The data is received through bak->recv_stream and handed to a
 * restore task set up on the common process's ctrl. After the transfer the
 * build progress is updated for log and archive files of a full build, and an
 * archive file of a full restore is recorded via arch_try_record_archinfo.
 *
 * Returns OG_SUCCESS, or OG_ERROR at the first step that fails.
 */
status_t rst_stream_read_file(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
rst_stream_buf_t *recv_stream = &bak->recv_stream;
bak_process_t *common_proc = &session->kernel->backup_ctx.process[BAK_COMMON_PROC];
/* the start of the common process's backup buffer is viewed as a log file head */
log_file_head_t *head = (log_file_head_t *)common_proc->backup_buf.aligned_buf;
uint32 bak_index = bak->curr_file_index;
uint32 file_id = bak->files[bak_index].id;
bool32 ignore_logfile = OG_FALSE;
char *path = rst_fetch_filename(bak);
bak_file_type_t type = bak->files[bak->curr_file_index].type;
if (bak->rst_file.file_type == RESTORE_ALL) {
if (rst_wait_ctrlfile_ready(bak) != OG_SUCCESS) {
return OG_ERROR;
}
}
if (BAK_IS_UDS_DEVICE(bak)) {
if (bak_agent_file_start(bak, path, bak_get_package_type(type), file_id) != OG_SUCCESS) {
return OG_ERROR;
}
}
bak->remote.remain_data_size = 0;
rst_init_recv_stream(bak);
/* the prepare step may set ignore_logfile, which is passed on to the receive step */
if (rst_stream_read_prepare(session, recv_stream, head, &ignore_logfile) != OG_SUCCESS) {
return OG_ERROR;
}
rst_assign_stream_restore_task(session, &common_proc->ctrl);
if (rst_recv_stream_data(session, ignore_logfile) != OG_SUCCESS) {
return OG_ERROR;
}
/* wait on the parallel processes before acknowledging the file to the agent */
bak_wait_paral_proc(session, OG_FALSE);
if (BAK_IS_UDS_DEVICE(bak)) {
if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
}
if (BAK_IS_FULL_BUILDING(bak) && (bak->files[bak->curr_file_index].type == BACKUP_LOG_FILE ||
bak->files[bak->curr_file_index].type == BACKUP_ARCH_FILE)) {
/* remember how far the build has come: asn of this log, stage and file index */
build_progress_t *build_progress = &bak->progress.build_progress;
build_progress->asn = head->asn;
build_progress->stage = BACKUP_LOG_STAGE;
build_progress->curr_file_index = bak->curr_file_index;
}
if (bak->files[bak->curr_file_index].type == BACKUP_ARCH_FILE && bak->rst_file.file_type == RESTORE_ALL) {
if (arch_try_record_archinfo(session, ARCH_DEFAULT_DEST, common_proc->ctrl.name, head) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
/*
 * Read backup files starting at bak->curr_file_index until a file whose type value is greater than
 * `type` is met, the file list is exhausted, or the restore is marked as failed.
 *
 * For every file that is not skipped:
 *   - when `type` lies in [BACKUP_DATA_FILE, BACKUP_ARCH_FILE] and parallel tasks are enabled, the
 *     file is either read through the stream path (UDS device) or assigned to a free process;
 *   - otherwise it is read directly with rst_read_file().
 * bak->curr_file_index is advanced after each file, skipped or not.
 *
 * Returns OG_ERROR if the session status check or any read/assign step fails, or if bak->failed is
 * set once the parallel processes have been waited for; OG_SUCCESS otherwise.
 */
static status_t rst_read_files(knl_session_t *session, bak_file_type_t type)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bak_process_t *proc = NULL;

    while (!bak->failed && bak->curr_file_index < bak->file_count) {
        if (bak_check_session_status(session) != OG_SUCCESS) {
            return OG_ERROR;
        }

        /* stop at the first file whose type value is beyond the requested one */
        if (bak->files[bak->curr_file_index].type > type) {
            break;
        }

        /*
         * NOTE(review): the address of element curr_file_index + 1 is handed to the callee even for
         * the last file; presumably the callee checks the range before dereferencing - confirm.
         */
        if (rst_skip_file(session, &bak->files[bak->curr_file_index]) ||
            rst_skip_for_duplicative_archfile(session, &bak->files[bak->curr_file_index],
                                              &bak->files[bak->curr_file_index + 1])) {
            bak->curr_file_index++;
            continue;
        }

        if (type >= BACKUP_DATA_FILE && type <= BACKUP_ARCH_FILE && bak_paral_task_enable(session)) {
            if (BAK_IS_UDS_DEVICE(bak)) {
                /* compare against OG_SUCCESS explicitly, like every other status check in this file */
                if (rst_stream_read_file(session) != OG_SUCCESS) {
                    return OG_ERROR;
                }
            } else {
                if (bak_get_free_proc(session, &proc, OG_FALSE) != OG_SUCCESS) {
                    return OG_ERROR;
                }
                if (bak_assign_restore_task(session, proc) != OG_SUCCESS) {
                    return OG_ERROR;
                }
            }
        } else {
            if (rst_read_file(session, bak->curr_file_index) != OG_SUCCESS) {
                return OG_ERROR;
            }
        }

        bak->curr_file_index++;
    }

    bak_wait_paral_proc(session, OG_FALSE);
    return bak->failed ? OG_ERROR : OG_SUCCESS;
}
/*
 * Poll until bak->ctrlfile_completed becomes true, sleeping via cm_sleep(10) between checks.
 * Returns OG_SUCCESS once the flag is set, or OG_ERROR if bak->failed is observed first.
 */
status_t rst_wait_ctrlfile_ready(bak_t *bak)
{
    for (;;) {
        if (bak->ctrlfile_completed) {
            return OG_SUCCESS;
        }
        if (bak->failed) {
            return OG_ERROR;
        }
        cm_sleep(10);
    }
}
/*
 * Run the ctrl file stage: clear the ctrlfile_completed flag, publish the BACKUP_CTRL_STAGE progress,
 * read the files up to BACKUP_CTRL_FILE and wait until the ctrl file is reported as completed.
 * Returns OG_SUCCESS, or OG_ERROR if reading or waiting fails.
 */
static status_t rst_read_ctrl_file(knl_session_t *session, bak_t *bak)
{
    uint32 ctrl_bytes = CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE;

    bak->ctrlfile_completed = OG_FALSE;
    bak_set_progress(session, BACKUP_CTRL_STAGE, ctrl_bytes);

    if (rst_read_files(session, BACKUP_CTRL_FILE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    /* the wait helper only ever yields OG_SUCCESS or OG_ERROR, so forward its result */
    return rst_wait_ctrlfile_ready(bak);
}
/*
 * Sum the sizes of the backup files from bak->curr_file_index onwards, stopping at the first file
 * whose type value is greater than `type`. Files for which rst_skip_file() returns true are left
 * out. The sum starts at 1, so the result is never 0.
 */
static uint64 rst_calc_bakfiles_size(knl_session_t *session, bak_t *bak, bak_file_type_t type)
{
    uint64 total = 1;

    for (uint32 idx = bak->curr_file_index; idx < bak->file_count; idx++) {
        if (bak->files[idx].type > type) {
            break;
        }
        if (rst_skip_file(session, &bak->files[idx])) {
            continue;
        }
        total += bak->files[idx].size;
    }

    return total;
}
/*
 * Run the datafile stage: publish the BACKUP_DATA_STAGE progress and read the files up to
 * BACKUP_DATA_FILE.
 *
 * The progress size is an estimate (datafile size times ESTIMATE_VALID_DATA_PERCENT) while building,
 * otherwise the size computed from the backup file list.
 * For a RESTORE_DATAFILE request, an error is thrown when no repair file was found; otherwise the
 * recovery point of the backup record is copied into bak->rst_file.
 */
static status_t rst_read_datafile(knl_session_t *session, bak_t *bak)
{
    uint64 stage_size = bak->is_building
                            ? (uint64)(db_get_datafiles_size(session) * ESTIMATE_VALID_DATA_PERCENT)
                            : rst_calc_bakfiles_size(session, bak, BACKUP_DATA_FILE);

    bak_set_progress(session, BACKUP_DATA_STAGE, stage_size);

    if (rst_read_files(session, BACKUP_DATA_FILE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak->rst_file.file_type != RESTORE_DATAFILE) {
        return OG_SUCCESS;
    }

    if (!bak->rst_file.exist_repair_file) {
        OG_THROW_ERROR(ERR_FILE_NOT_EXIST, "in backupset the", "specifical");
        return OG_ERROR;
    }

    bak->rst_file.rcy_point = bak->record.ctrlinfo.rcy_point;
    return OG_SUCCESS;
}
/*
 * Poll, sleeping via cm_sleep(1) between checks, until bak->logfiles_created is set or the restore
 * is marked as failed.
 */
static void rst_wait_logfiles_created(bak_t *bak)
{
    while (!bak->logfiles_created && !bak->failed) {
        cm_sleep(1);
    }
}
/*
 * Run the log/arch stage: clear the logfiles_created flag, publish the BACKUP_LOG_STAGE progress,
 * wait for the log files to be created, reset curr_arch_id, read the files up to BACKUP_ARCH_FILE
 * and finally send BAK_PKG_SET_END to the backup agent.
 * Returns OG_SUCCESS, or OG_ERROR if reading or the agent command fails.
 */
static status_t rst_read_archfile(knl_session_t *session, bak_t *bak)
{
    uint64 stage_size;

    if (!bak->is_building) {
        stage_size = rst_calc_bakfiles_size(session, bak, BACKUP_ARCH_FILE);
    } else {
        stage_size = db_get_logfiles_size(session);
    }

    bak->logfiles_created = OG_FALSE;
    bak_set_progress(session, BACKUP_LOG_STAGE, stage_size);
    rst_wait_logfiles_created(bak);

    bak->curr_arch_id = 0;
    if (rst_read_files(session, BACKUP_ARCH_FILE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak_agent_command(bak, BAK_PKG_SET_END) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Restore the ctrl file and datafiles of one backup set.
 *
 * While building, only the config parameters are restored up front. Otherwise BAK_PKG_SET_START is
 * sent to the agent and the head file is read; when `fetch_catalog` is set and the backup set has
 * dependencies (bak->depend_num > 0), the head file is read a second time without fetching the
 * catalog.
 * The ctrl file and datafile stages then run according to the requested restore file type.
 */
static status_t rst_restore_one_bakset(knl_session_t *session, bool32 fetch_catalog)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;

    if (!bak->is_building) {
        if (bak_agent_command(bak, BAK_PKG_SET_START) != OG_SUCCESS) {
            return OG_ERROR;
        }
        if (rst_read_head_file(session, fetch_catalog) != OG_SUCCESS) {
            return OG_ERROR;
        }
        /* depend_num is checked only after the first head file read */
        if (fetch_catalog && bak->depend_num > 0 && rst_read_head_file(session, OG_FALSE) != OG_SUCCESS) {
            return OG_ERROR;
        }
    } else if (rst_restore_config_param(session) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (RST_NEED_RESTORE_CTRLFILE(bak) && rst_read_ctrl_file(session, bak) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (RST_NEED_RESTORE_DATAFILE(bak) && rst_read_datafile(session, bak) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Restore the remaining backup sets one by one until bak->curr_id reaches bak->depend_num or the
 * restore is marked as failed.
 *
 * Before each set: curr_id is advanced, the ctrl files are closed when everything is being restored,
 * and BAK_PKG_SET_END is sent to the agent.
 * Returns OG_ERROR if the session status check, the agent command or a set restore fails.
 */
static status_t rst_restore_all_incremental_backset(knl_session_t *session, bak_t *bak)
{
    while (!bak->failed) {
        if (bak_check_session_status(session) != OG_SUCCESS) {
            return OG_ERROR;
        }

        /* every dependent set has been handled */
        if (bak->curr_id == bak->depend_num) {
            return OG_SUCCESS;
        }
        bak->curr_id++;

        if (bak->rst_file.file_type == RESTORE_ALL) {
            rst_close_ctrl_file(&session->kernel->db.ctrlfiles);
        }

        if (bak_agent_command(bak, BAK_PKG_SET_END) != OG_SUCCESS ||
            rst_restore_one_bakset(session, OG_FALSE) != OG_SUCCESS) {
            return OG_ERROR;
        }
    }

    return OG_SUCCESS;
}
/*
 * Drive the read side of a restore: send BAK_PKG_START, restore the first backup set, restore the
 * dependent sets when there are any, then run the arch stage if it is requested and nothing has
 * failed so far. The progress stage is set to BACKUP_READ_FINISHED afterwards.
 *
 * When the restore has not failed, BAK_PKG_END is sent and the UDS link is disconnected on UDS
 * devices. Note that OG_SUCCESS is returned even when bak->failed is set at that point; only a
 * failing step of this function itself yields OG_ERROR.
 */
static status_t rst_restore(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;

    if (bak_agent_command(bak, BAK_PKG_START) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (rst_restore_one_bakset(session, OG_TRUE) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (bak->depend_num > 0 && rst_restore_all_incremental_backset(session, bak) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (!bak->failed && RST_NEED_RESTORE_ARCHFILE(bak) && rst_read_archfile(session, bak) != OG_SUCCESS) {
        return OG_ERROR;
    }

    bak->progress.stage = BACKUP_READ_FINISHED;

    if (bak->failed) {
        return OG_SUCCESS;
    }

    if (bak_agent_command(bak, BAK_PKG_END) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (BAK_IS_UDS_DEVICE(bak)) {
        cs_uds_disconnect(&bak->remote.uds_link);
    }

    return OG_SUCCESS;
}
/*
 * Poll, sleeping via cm_sleep(1) between checks, until the restore is marked as failed or its
 * progress stage becomes BACKUP_WRITE_FINISHED.
 */
void rst_wait_write_end(bak_t *ctrl)
{
    for (;;) {
        if (ctrl->failed || ctrl->progress.stage == BACKUP_WRITE_FINISHED) {
            return;
        }
        cm_sleep(1);
    }
}
/*
 * Run a whole restore: execute the read side, wait for the write side to finish and, unless only a
 * datafile is being restored, call bak_end().
 *
 * A failure of the read side is recorded in bak->error_info and marks the restore as failed.
 * Returns OG_ERROR if the read side failed or if bak->failed is observed after the wait or after
 * bak_end(); OG_SUCCESS otherwise.
 */
status_t rst_proc(knl_session_t *session)
{
    bak_t *bak = &session->kernel->backup_ctx.bak;
    bool32 error_seen = OG_FALSE;

    if (rst_restore(session) != OG_SUCCESS) {
        bak_set_error(&bak->error_info);
        bak_set_error_msg(&bak->error_info, "restore read process failed");
        bak->failed = OG_TRUE;
        error_seen = OG_TRUE;
    }

    rst_wait_write_end(bak);
    if (bak->failed) {
        error_seen = OG_TRUE;
    }

    if (bak->rst_file.file_type != RESTORE_DATAFILE) {
        bak_end(session, OG_TRUE);
    }

    /* the failed flag is sampled once more after bak_end() */
    if (bak->failed) {
        error_seen = OG_TRUE;
    }

    return error_seen ? OG_ERROR : OG_SUCCESS;
}
/*
 * Thread entry that runs rst_proc() on the session passed as the thread argument.
 * The current thread id is attached to the session for the duration of the call, the status of
 * rst_proc() is discarded, and bak_set_error() is called on the backup error info afterwards.
 */
static void rst_backend_proc(thread_t *thread)
{
    knl_session_t *bg_session = (knl_session_t *)thread->argument;

    KNL_SESSION_SET_CURR_THREADID(bg_session, cm_get_current_thread_id());
    (void)rst_proc(bg_session);
    KNL_SESSION_CLEAR_THREADID(bg_session);

    bak_set_error(&bg_session->kernel->backup_ctx.bak.error_info);
}
/*
 * Validate the backup set path of a restore request.
 *
 * The path is rejected when
 *   - its length exceeds OG_MAX_BACKUP_PATH_LEN (ERR_EXCEED_MAX_BACKUP_PATH_LEN), or
 *   - it contains a special character, or the device is DEVICE_DISK and the directory does not
 *     exist (ERR_INVALID_DIR).
 *
 * Returns OG_SUCCESS for an acceptable path, OG_ERROR (with the error thrown) otherwise.
 */
status_t rst_check_backupset_path(knl_restore_t *param)
{
    char path[OG_FILE_NAME_BUFFER_SIZE];

    if (param->path.len > OG_MAX_BACKUP_PATH_LEN) {
        OG_THROW_ERROR(ERR_EXCEED_MAX_BACKUP_PATH_LEN, T2S(&param->path), OG_MAX_BACKUP_PATH_LEN);
        return OG_ERROR;
    }

    /* work on a NUL-terminated copy for the string based checks below */
    (void)cm_text2str(&param->path, path, OG_FILE_NAME_BUFFER_SIZE);

    if (cm_check_exist_special_char(path, (uint32)strlen(path)) ||
        (param->device == DEVICE_DISK && !cm_dir_exist(path))) {
        OG_THROW_ERROR(ERR_INVALID_DIR, path);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Entry point of a restore request.
 *
 * The backup context is first marked as running; if that fails, ERR_BACKUP_IN_PROGRESS is thrown.
 * A RESTORE_BLOCK_RECOVER request is delegated to abr_restore_block_recover() and the running mark is
 * cleared again before returning its status.
 * For every other request the restore is prepared and then either
 *   - started in a new thread running rst_backend_proc on the SESSION_ID_BRU session when
 *     param->disconnect is set (OG_SUCCESS is returned once the thread has been created), or
 *   - executed synchronously through rst_proc().
 * bak_end() is called when preparation or thread creation fails.
 */
status_t rst_restore_database(knl_session_t *session, knl_restore_t *param)
{
    bak_context_t *ogx = &session->kernel->backup_ctx;
    /* only used for the log line below; 0 is reported as the default parallelism */
    uint32 paral_count = param->parallelism == 0 ? BAK_DEFAULT_PARALLELISM : param->parallelism;
    status_t status;

    if (bak_set_running(session, ogx) != OG_SUCCESS) {
        OG_THROW_ERROR(ERR_BACKUP_IN_PROGRESS, "restore");
        return OG_ERROR;
    }

    if (param->type == RESTORE_BLOCK_RECOVER) {
        ogx->block_repairing = OG_TRUE;
        status = abr_restore_block_recover(session, param);
        bak_unset_running(session, ogx);
        ogx->block_repairing = OG_FALSE;
        return status;
    }

    ogx->bak.arch_keep_compressed = session->kernel->attr.restore_keep_arch_compressed;
    OG_LOG_RUN_INF("[RESTORE] restore start, device:%d, policy:%s, paral count %u, path:%s, arch_compressed %u, "
                   "buffer size:%uM", param->device, T2S(&param->policy), paral_count, T2S_EX(&param->path),
                   ogx->bak.arch_keep_compressed, param->buffer_size / SIZE_M(1));

    if (rst_prepare(session, param) != OG_SUCCESS) {
        bak_end(session, OG_TRUE);
        return OG_ERROR;
    }

    if (param->disconnect) {
        if (cm_create_thread(rst_backend_proc, 0, session->kernel->sessions[SESSION_ID_BRU],
                             &ogx->bak.restore_thread) != OG_SUCCESS) {
            bak_end(session, OG_TRUE);
            return OG_ERROR;
        }
        return OG_SUCCESS;
    }

    return rst_proc(session);
}
/*
 * Fill the log file set of node `node_id` from that node's ctrl information and open every log file
 * that is not flagged as dropped.
 *
 * Nothing is done when `node_id` equals session->kernel->id.
 * Returns OG_ERROR as soon as one log file cannot be opened; OG_SUCCESS otherwise.
 */
static status_t dtc_init_logset_for_pitr(knl_session_t *session, uint8 node_id)
{
    database_t *db = &session->kernel->db;
    logfile_set_t *node_logset = LOGFILE_SET(session, node_id);

    if (session->kernel->id == node_id) {
        return OG_SUCCESS;
    }

    node_logset->logfile_hwm = dtc_get_ctrl(session, node_id)->log_hwm;
    node_logset->log_count = dtc_get_ctrl(session, node_id)->log_count;

    for (uint32 slot = 0; slot < node_logset->logfile_hwm; slot++) {
        log_file_t *logfile = &node_logset->items[slot];

        logfile->ctrl = (log_file_ctrl_t *)db_get_log_ctrl_item(db->ctrl.pages, slot, sizeof(log_file_ctrl_t),
                                                                db->ctrl.log_segment, node_id);
        if (LOG_IS_DROPPED(logfile->ctrl->flg)) {
            continue;
        }

        logfile->handle = OG_INVALID_HANDLE;
        if (cm_open_device(logfile->ctrl->name, logfile->ctrl->type, knl_redo_io_flag(session),
                           &logfile->handle) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[DB] failed to open %s ", logfile->ctrl->name);
            return OG_ERROR;
        }
    }

    OG_LOG_RUN_INF("[DB] init logset for node %u finish.", node_id);
    return OG_SUCCESS;
}
/*
 * Prepare the redo logs of every node (0 .. node_count - 1) for point-in-time recovery.
 *
 * Per node: start from the asn of the node's recovery point, register archives through
 * dtc_rst_regist_archive(), initialise the node's log file set, call arch_try_arch_redo_by_nodeid()
 * to obtain max_asn, then reset the node's log files to MAX(archive_asn, max_asn) + 1 and call
 * log_reset_inactive_head().
 *
 * Returns OG_ERROR as soon as one of the steps fails for a node; OG_SUCCESS otherwise.
 */
status_t dtc_log_prepare_for_pitr(knl_session_t *se)
{
    knl_instance_t *kernel = se->kernel;
    uint32 rst_id = kernel->db.ctrl.core.resetlogs.rst_id;

    for (uint32 i = 0; i < kernel->db.ctrl.core.node_count; i++) {
        uint32 archive_asn = dtc_get_ctrl(se, i)->rcy_point.asn;
        OG_LOG_RUN_INF("[RESTORE] archive_asn %llu.", (uint64)archive_asn);

        if (dtc_rst_regist_archive(se, &archive_asn, rst_id, i) != OG_SUCCESS) {
            return OG_ERROR;
        }
        OG_LOG_RUN_INF("[RESTORE] after register, archive_asn %llu.", (uint64)archive_asn);

        if (dtc_init_logset_for_pitr(se, i) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[RESTORE] init logset for node %u failed.", i);
            return OG_ERROR;
        }

        /*
         * Start from 0 so that MAX() below never reads an indeterminate value, should the callee
         * succeed without storing a result.
         */
        uint32 max_asn = 0;
        if (arch_try_arch_redo_by_nodeid(se, &max_asn, i) != OG_SUCCESS) {
            return OG_ERROR;
        }
        OG_LOG_RUN_INF("[RESTORE] find redo max_asn %llu.", (uint64)max_asn);

        /* continue right after the highest asn seen so far */
        archive_asn = MAX(archive_asn, max_asn);
        archive_asn += 1;
        OG_LOG_RUN_INF("[RESTORE] set redo asn %llu.", (uint64)archive_asn);

        if (dtc_bak_reset_logfile(se, archive_asn, OG_INVALID_ID32, i) != OG_SUCCESS) {
            return OG_ERROR;
        }
        log_reset_inactive_head(se);
    }

    return OG_SUCCESS;
}
/*
 * Register archives for point-in-time recovery through arch_try_regist_archive_by_dbstor(), starting
 * at the asn following the last archived log and passing that log's start/end lsn.
 * Returns OG_SUCCESS, or OG_ERROR if the registration fails.
 */
status_t log_prepare_for_pitr_dbstor(knl_session_t *se)
{
    arch_ctrl_t *last_arch = arch_get_last_log(se);
    uint32 reset_id = se->kernel->db.ctrl.core.resetlogs.rst_id;
    uint32 next_asn = last_arch->asn + 1;

    status_t ret = arch_try_regist_archive_by_dbstor(se, &next_asn, reset_id, last_arch->start_lsn,
                                                     last_arch->end_lsn, 0);
    if (ret != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Pick the point-in-time-recovery log preparation routine matching the deployment:
 *   clustered + dbstor     -> dtc_rst_amend_all_arch_file_dbstor()
 *   clustered, no dbstor   -> dtc_log_prepare_for_pitr()
 *   standalone + dbstor    -> log_prepare_for_pitr_dbstor()
 *   standalone, no dbstor  -> log_prepare_for_pitr()
 * and return its status.
 */
status_t dtc_log_prepare_pitr(knl_session_t *se)
{
    bool32 use_dbstor = knl_dbs_is_enable_dbs();

    if (!se->kernel->attr.clustered) {
        return use_dbstor ? log_prepare_for_pitr_dbstor(se) : log_prepare_for_pitr(se);
    }

    return use_dbstor ? dtc_rst_amend_all_arch_file_dbstor(se) : dtc_log_prepare_for_pitr(se);
}
#ifdef __cplusplus
}
#endif