* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* bak_common.c
*
*
* IDENTIFICATION
* src/kernel/backup/bak_common.c
*
* -------------------------------------------------------------------------
*/
#include "knl_backup_module.h"
#include "bak_common.h"
#include "bak_restore.h"
#include "cm_log.h"
#include "cm_file.h"
#include "cm_list.h"
#include "cs_protocol.h"
#include "knl_context.h"
#include "knl_backup.h"
#include "dtc_dls.h"
#include "dtc_ckpt.h"
#include "dtc_backup.h"
#include "dtc_database.h"
#include "rc_reform.h"
#include "knl_badblock.h"
#ifdef __cplusplus
extern "C" {
#endif
uint32 bak_get_build_stage(bak_stage_t *stage)
{
switch (*stage) {
case BACKUP_START:
return BUILD_START;
case BACKUP_CTRL_STAGE:
return BUILD_CTRL_STAGE;
case BACKUP_HEAD_STAGE:
return BUILD_HEAD_STAGE;
case BACKUP_DATA_STAGE:
return BUILD_DATA_STAGE;
case BACKUP_LOG_STAGE:
return BUILD_LOG_STAGE;
case BACKUP_PARAM_STAGE:
return BUILD_PARAM_STAGE;
case BACKUP_READ_FINISHED:
case BACKUP_END:
return BUILD_SYNC_FINISHED;
default:
return OG_INVALID_ID32;
}
}
void bak_replace_password(char *password)
{
size_t len = strlen(password);
if (len != 0) {
errno_t ret = memset_s(password, len, '*', len);
knl_securec_check(ret);
}
}
bool32 bak_paral_task_enable(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
if (bak->restore && bak->is_noparal_version) {
return OG_FALSE;
} else if (bak->is_building && bak->proc_count == BUILD_SINGLE_THREAD) {
return OG_FALSE;
} else {
return OG_TRUE;
}
}
bool32 bak_log_paral_enable(bak_t *bak)
{
database_t *db = &bak->kernel->db;
if (!DB_IS_PRIMARY(db) || db->status != DB_STATUS_OPEN || bak->record.log_only || bak->record.data_only ||
bak->is_building || bak->restore || BAK_IS_UDS_DEVICE(bak) || DB_IS_RAFT_ENABLED(bak->kernel) ||
!bak->backup_log_prealloc) {
return OG_FALSE;
}
return OG_TRUE;
}
static bool32 bak_ignore_rstlog(knl_session_t *session)
{
database_t *db = &session->kernel->db;
core_ctrl_t *core = &db->ctrl.core;
reset_log_t *reset_log = &core->resetlogs;
uint64 rst_id = session->kernel->arch_ctx.arch_proc[0].last_archived_log.rst_id;
return (reset_log->rst_id == 1 && reset_log->rst_id < rst_id);
}
bool32 bak_need_wait_arch(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
if (bak_ignore_rstlog(session) && !BAK_IS_DBSOTR(bak)) {
return OG_FALSE;
}
if (!DB_IS_PRIMARY(&(session->kernel->db)) || !DB_IS_OPEN(session)) {
OG_LOG_RUN_INF("[BACKUP] Do not need waitting archived log because of database role or database status");
return OG_FALSE;
}
OG_LOG_DEBUG_INF("[BACKUP] need waitting archived log");
return OG_TRUE;
}
status_t bak_check_session_status(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
if (bak->failed) {
return OG_ERROR;
}
if (session->canceled) {
OG_THROW_ERROR(ERR_OPERATION_CANCELED);
return OG_ERROR;
}
if (session->killed) {
OG_THROW_ERROR(ERR_OPERATION_KILLED);
return OG_ERROR;
}
return OG_SUCCESS;
}
static void bak_set_proc_boundary(bak_t *bak, uint32 *start_id, uint32 *end_id, bool32 is_paral_log_proc)
{
if (!bak_log_paral_enable(bak)) {
*start_id = 1;
*end_id = bak->proc_count;
return;
}
if (is_paral_log_proc) {
*start_id = bak->proc_count - 1;
*end_id = bak->proc_count;
return;
}
*start_id = BAK_PARAL_DATA_START_POS;
*end_id = bak->proc_count - (BAK_PARAL_LOG_PROC_NUM - 1);
}
* condition 1: total proc num euqals parallelism add 3 more thread num;
* condition 2: total proc num equals parallelism
* | 0 | 1 | ... | parallelism | +1 | +2 | +3 |
* | common proc | log common proc | data paral proc | log paral proc |
* | common proc | data/log paral proc |
*/
status_t bak_get_free_proc(knl_session_t *session, bak_process_t **proc, bool32 is_paral_log_proc)
{
bak_context_t *bak_ctx = &session->kernel->backup_ctx;
bak_process_t *bg_process = bak_ctx->process;
bak_process_t *process = NULL;
bak_t *bak = &bak_ctx->bak;
uint32 start_id = 0;
uint32 end_id = 0;
*proc = NULL;
if (!bak_paral_task_enable(session)) {
OG_LOG_RUN_ERR("[%s] parallel backup/restore does not enable", bak->restore ? "RESTORE" : "BACKUP");
return OG_ERROR;
}
bak_set_proc_boundary(bak, &start_id, &end_id, is_paral_log_proc);
uint32 curr_id = start_id;
while (!bak->failed) {
if (bg_process[curr_id].is_free) {
process = &bg_process[curr_id];
break;
}
cm_sleep(100);
curr_id++;
curr_id = (curr_id > end_id) ? start_id : curr_id;
}
if (process == NULL) {
OG_LOG_RUN_ERR("[%s] process is NULL", bak->restore ? "RESTORE" : "BACKUP");
return OG_ERROR;
}
*proc = process;
return OG_SUCCESS;
}
void bak_wait_paral_proc(knl_session_t *session, bool32 is_paral_log_proc)
{
bak_context_t *bak_ctx = &session->kernel->backup_ctx;
bak_process_t *bg_process = bak_ctx->process;
bak_t *bak = &bak_ctx->bak;
uint32 start_id = 0;
uint32 end_id = 0;
if (!bak_paral_task_enable(session)) {
return;
}
bak_set_proc_boundary(bak, &start_id, &end_id, is_paral_log_proc);
for (uint32 id = start_id; id <= end_id; id++) {
while (!bak->failed && !bg_process[id].is_free) {
cm_sleep(100);
}
}
OG_LOG_DEBUG_INF("[%s] wait parallel backup/restore bak_task completed", bak->restore ? "RESTORE" : "BACKUP");
}
* return section count and section size of datafile when parallel backup, if filesize exceed section_threshold,
* we divede this datafile as mutilple files, but section count will not exceed proc_count.
* for example, section_threshold = 128M, proc_count = 4
* 1. filesize = 300M, will generate 3 files, sec_size = 128M, [0, 128M], [128M, 256M], [256M, 300M]
* 2. filesize = 1200M, will generate 4 files, sec_size = 300M, [0, 300M], [300M, 600M], [600M, 900M], [900M, 1200M]
* 3. filesize = 100M, do not section, sec_size = filesize = 100M, still backup as one file
*/
uint32 bak_datafile_section_count(knl_session_t *session, uint64 file_size_input, uint32 hwm_start, uint64 *sec_size,
bool32 *diveded)
{
uint64 file_size = file_size_input;
bak_t *bak = &session->kernel->backup_ctx.bak;
uint64 sec_min_size = bak->section_threshold;
uint32 proc_count = bak_log_paral_enable(bak) ? (bak->proc_count - BAK_PARAL_LOG_PROC_NUM) : bak->proc_count;
uint32 sec_num;
uint64 page_count;
uint64 sec_page_count;
if (proc_count == 1 || file_size <= (sec_min_size + hwm_start * DEFAULT_PAGE_SIZE(session))) {
*sec_size = file_size;
sec_num = 1;
*diveded = OG_FALSE;
return sec_num;
}
file_size -= hwm_start * DEFAULT_PAGE_SIZE(session);
page_count = file_size / DEFAULT_PAGE_SIZE(session);
if (file_size >= proc_count * sec_min_size) {
sec_page_count = (page_count - 1) / proc_count + 1;
*sec_size = (CM_ALIGN_ANY(sec_page_count, PAGE_GROUP_COUNT)) * DEFAULT_PAGE_SIZE(session);
sec_num = proc_count;
} else {
*sec_size = sec_min_size;
sec_page_count = sec_min_size / DEFAULT_PAGE_SIZE(session);
sec_num = page_count % sec_page_count == 0 ? (uint32)(page_count / sec_page_count)
: (uint32)(page_count / sec_page_count) + 1;
}
*diveded = OG_TRUE;
return sec_num;
}
status_t bak_encrypt_rand_iv(bak_file_t *file)
{
unsigned char iv[BAK_DEFAULT_GCM_IV_LENGTH];
errno_t ret;
if (cm_rand(iv, BAK_DEFAULT_GCM_IV_LENGTH) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to acquire random iv");
return OG_ERROR;
}
ret = memcpy_sp(file->gcm_iv, BAK_DEFAULT_GCM_IV_LENGTH, iv, BAK_DEFAULT_GCM_IV_LENGTH);
knl_securec_check(ret);
return OG_SUCCESS;
}
status_t bak_encrypt_init(bak_t *bak, bak_encrypt_ctx_t *encrypt_ctx, bak_file_t *file, bool32 is_encrypt)
{
unsigned char iv[BAK_DEFAULT_GCM_IV_LENGTH];
const unsigned char *key = (const unsigned char *)bak->key;
errno_t ret;
int32 res;
res = EVP_CIPHER_CTX_init(encrypt_ctx->ogx);
if (res == 0) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to init evp cipher ogx");
return OG_ERROR;
}
if (is_encrypt) {
if (cm_rand(iv, BAK_DEFAULT_GCM_IV_LENGTH) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to acquire random iv");
return OG_ERROR;
}
res = EVP_EncryptInit_ex(encrypt_ctx->ogx, EVP_aes_256_gcm(), NULL, key, (const unsigned char *)iv);
ret = memcpy_sp(file->gcm_iv, BAK_DEFAULT_GCM_IV_LENGTH, iv, BAK_DEFAULT_GCM_IV_LENGTH);
knl_securec_check(ret);
} else {
res = EVP_DecryptInit_ex(encrypt_ctx->ogx, EVP_aes_256_gcm(), NULL, key, (const unsigned char *)file->gcm_iv);
}
if (res == 0) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to init cryption ogx");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_encrypt_end(bak_t *bak, bak_encrypt_ctx_t *encrypt_ctx)
{
int32 out_len;
int32 res;
res = EVP_EncryptFinal_ex(encrypt_ctx->ogx, (unsigned char *)encrypt_ctx->encrypt_buf.aligned_buf, &out_len);
if (res == 0) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to finalize the encryption");
return OG_ERROR;
}
res = EVP_CIPHER_CTX_ctrl(encrypt_ctx->ogx, EVP_CTRL_AEAD_GET_TAG, EVP_GCM_TLS_TAG_LEN,
encrypt_ctx->encrypt_buf.aligned_buf);
if (res == 0) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to get the encryption tag");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_decrypt_end(bak_t *bak, bak_encrypt_ctx_t *encrypt_ctx, bak_file_t *file, bool32 ignore_logfile)
{
int32 res;
int32 outlen;
if (ignore_logfile) {
return OG_SUCCESS;
}
res = EVP_CIPHER_CTX_ctrl(encrypt_ctx->ogx, EVP_CTRL_AEAD_SET_TAG, EVP_GCM_TLS_TAG_LEN, (void *)file->gcm_tag);
if (res == 0) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to set tag");
return OG_ERROR;
}
res = EVP_DecryptFinal_ex(encrypt_ctx->ogx, (unsigned char *)encrypt_ctx->encrypt_buf.aligned_buf, &outlen);
if (res == 0) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to verify the tag, the data may be changed");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t bak_encrypt_alloc(bak_t *bak, bak_encrypt_ctx_t *encrypt_ctx)
{
encrypt_ctx->ogx = EVP_CIPHER_CTX_new();
if (encrypt_ctx->ogx == NULL) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to alloc the cryption ogx");
return OG_ERROR;
}
return OG_SUCCESS;
}
static void bak_encrypt_free(bak_t *bak, bak_encrypt_ctx_t *encrypt_ctx)
{
EVP_CIPHER_CTX_free(encrypt_ctx->ogx);
encrypt_ctx->ogx = NULL;
}
status_t bak_alloc_encrypt_context(knl_session_t *session)
{
bak_context_t *backup_ctx = &session->kernel->backup_ctx;
bak_t *bak = &backup_ctx->bak;
bak_process_t *proc = NULL;
uint32 proc_count = bak->proc_count;
if (bak->encrypt_info.encrypt_alg == ENCRYPT_NONE) {
return OG_SUCCESS;
}
proc = &backup_ctx->process[BAK_COMMON_PROC];
if (bak_encrypt_alloc(bak, &proc->encrypt_ctx) != OG_SUCCESS) {
return OG_ERROR;
}
for (uint32 i = 1; i <= proc_count; i++) {
proc = &backup_ctx->process[i];
if (bak_encrypt_alloc(bak, &proc->encrypt_ctx) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
void bak_free_encrypt_context(knl_session_t *session)
{
bak_context_t *backup_ctx = &session->kernel->backup_ctx;
bak_t *bak = &backup_ctx->bak;
bak_process_t *proc = NULL;
uint32 proc_count = bak->proc_count;
if (bak->encrypt_info.encrypt_alg == ENCRYPT_NONE) {
return;
}
proc = &backup_ctx->process[BAK_COMMON_PROC];
bak_encrypt_free(bak, &proc->encrypt_ctx);
for (uint32 i = 1; i <= proc_count; i++) {
proc = &backup_ctx->process[i];
bak_encrypt_free(bak, &proc->encrypt_ctx);
}
}
status_t rst_decrypt_data(bak_process_t *proc, const char *buf, int32 size, uint32 left_size)
{
int32 outlen = 0;
int32 res;
res = EVP_DecryptUpdate(proc->encrypt_ctx.ogx,
(unsigned char *)proc->encrypt_ctx.encrypt_buf.aligned_buf + left_size, &outlen,
(const unsigned char *)buf, size);
if (res == 0) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to decrypt the data");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_encrypt_data(bak_process_t *proc, const char *buf, int32 size)
{
int32 outlen = 0;
int32 res;
res = EVP_EncryptUpdate(proc->encrypt_ctx.ogx, (unsigned char *)proc->encrypt_ctx.encrypt_buf.aligned_buf, &outlen,
(const unsigned char *)buf, size);
if (res == 0) {
OG_THROW_ERROR(ERR_CRYPTION_ERROR, "failed to encrypt the data");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_alloc_compress_context(knl_session_t *session, bool32 is_compress)
{
bak_context_t *backup_ctx = &session->kernel->backup_ctx;
bak_t *bak = &backup_ctx->bak;
bak_process_t *proc = NULL;
uint32 proc_count = bak->proc_count;
if (bak->record.attr.compress == COMPRESS_NONE) {
return OG_SUCCESS;
}
if (knl_compress_alloc(bak->record.attr.compress, &bak->compress_ctx, is_compress) != OG_SUCCESS) {
return OG_ERROR;
}
for (uint32 i = 1; i <= proc_count; i++) {
proc = &backup_ctx->process[i];
if (knl_compress_alloc(bak->record.attr.compress, &proc->compress_ctx, is_compress) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
void bak_free_compress_context(knl_session_t *session, bool32 is_compress)
{
bak_context_t *backup_ctx = &session->kernel->backup_ctx;
bak_t *bak = &backup_ctx->bak;
bak_process_t *proc = NULL;
uint32 proc_count = bak->proc_count;
if (bak->record.attr.compress == COMPRESS_NONE) {
return;
}
knl_compress_free(bak->record.attr.compress, &bak->compress_ctx, is_compress);
for (uint32 i = 1; i <= proc_count; i++) {
proc = &backup_ctx->process[i];
knl_compress_free(bak->record.attr.compress, &proc->compress_ctx, is_compress);
}
}
status_t bak_write_lz4_compress_head(bak_t *bak, bak_process_t *proc, bak_local_t *bak_file)
{
LZ4F_preferences_t ref = LZ4F_INIT_PREFERENCES;
char *lz4_write_buf = NULL;
size_t res;
if (bak->record.attr.compress != COMPRESS_LZ4) {
return OG_SUCCESS;
}
ref.compressionLevel = bak->compress_ctx.compress_level;
res = LZ4F_compressBegin(proc->compress_ctx.lz4f_cstream, proc->compress_ctx.compress_buf.aligned_buf,
(uint32)COMPRESS_BUFFER_SIZE(bak), &ref);
if (LZ4F_isError(res)) {
OG_THROW_ERROR(ERR_COMPRESS_ERROR, "lz4f", res, LZ4F_getErrorName(res));
return OG_ERROR;
}
lz4_write_buf = proc->compress_ctx.compress_buf.aligned_buf;
if (bak->encrypt_info.encrypt_alg != ENCRYPT_NONE) {
if (bak_encrypt_data(proc, proc->compress_ctx.compress_buf.aligned_buf, (int32)res) != OG_SUCCESS) {
return OG_ERROR;
}
lz4_write_buf = proc->encrypt_ctx.encrypt_buf.aligned_buf;
}
if (bak_local_write(bak_file, lz4_write_buf, (int32)res, bak, bak_file->size) != OG_SUCCESS) {
return OG_ERROR;
}
bak_file->size += res;
return OG_SUCCESS;
}
static status_t bak_get_last_recid(knl_session_t *session, uint64 *record_id)
{
knl_cursor_t *cursor = NULL;
CM_SAVE_STACK(session->stack);
knl_set_session_scn(session, OG_INVALID_ID64);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_BACKUP_SET_ID, 0);
cursor->index_dsc = OG_TRUE;
knl_init_index_scan(cursor, OG_FALSE);
knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, 0);
knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, 0);
if (OG_SUCCESS != knl_fetch(session, cursor)) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
*record_id = 0;
} else {
cm_decode_row((char *)cursor->row, cursor->offsets, cursor->lens, NULL);
*record_id = *(uint64 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_RECID);
}
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
static status_t bak_get_record(knl_session_t *session, char *tag, bak_record_t *record)
{
text_t value;
CM_SAVE_STACK(session->stack);
knl_set_session_scn(session, OG_INVALID_ID64);
knl_cursor_t *cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_BACKUP_SET_ID, IX_SYS_BACKUPSET_002_ID);
knl_init_index_scan(cursor, OG_TRUE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_STRING, (void *)tag,
(uint16)strlen(tag), IX_COL_SYS_BACKUPSET_002_TAG);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
OG_THROW_ERROR_EX(ERR_INVALID_OPERATION, ", tag %s does not exist in sys_backup_sets", tag);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
cm_decode_row((char *)cursor->row, cursor->offsets, cursor->lens, NULL);
record->attr.backup_type = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_TYPE);
record->device = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_DEVICE_TYPE);
value.str = CURSOR_COLUMN_DATA(cursor, BAK_COL_DIR);
value.len = CURSOR_COLUMN_SIZE(cursor, BAK_COL_DIR);
(void)cm_text2str(&value, record->path, OG_FILE_NAME_BUFFER_SIZE);
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
static status_t bak_delete_record(knl_session_t *session, char *tag)
{
status_t status;
CM_SAVE_STACK(session->stack);
knl_cursor_t *cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_DELETE, SYS_BACKUP_SET_ID, IX_SYS_BACKUPSET_002_ID);
knl_init_index_scan(cursor, OG_TRUE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_STRING, (void *)tag,
(uint16)strlen(tag), IX_COL_SYS_BACKUPSET_002_TAG);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
OG_THROW_ERROR_EX(ERR_INVALID_OPERATION, ", tag %s does not exist in sys_backup_sets", tag);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
status = knl_internal_delete(session, cursor);
CM_RESTORE_STACK(session->stack);
if (status == OG_SUCCESS) {
knl_commit(session);
}
return status;
}
static status_t bak_save_record(knl_session_t *session, bak_record_t *record, uint64 recid)
{
status_t status;
row_assist_t ra;
knl_cursor_t *cursor = NULL;
uint32 max_size;
bak_t *bak = &session->kernel->backup_ctx.bak;
uint32 max_buffer_size;
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_INSERT, SYS_BACKUP_SET_ID, OG_INVALID_ID32);
max_size = session->kernel->attr.max_row_size;
row_init(&ra, (char *)cursor->row, max_size, BAK_COL_RCY_LSN + 1);
(void)row_put_int64(&ra, recid);
(void)row_put_int32(&ra, record->attr.backup_type);
(void)row_put_int32(&ra, record->data_only ? BACKUP_DATA_STAGE : BACKUP_LOG_STAGE);
(void)row_put_int32(&ra, record->status);
(void)row_put_int32(&ra, record->attr.level);
(void)row_put_str(&ra, record->attr.tag);
(void)row_put_int64(&ra, record->ctrlinfo.scn);
(void)row_put_int64(&ra, record->ctrlinfo.lsn);
(void)row_put_int32(&ra, record->device);
(void)row_put_str(&ra, record->attr.base_tag);
(void)row_put_str(&ra, record->path);
(void)row_put_int32(&ra, session->kernel->db.ctrl.core.resetlogs.rst_id);
(void)row_put_str(&ra, record->policy);
(void)row_put_int32(&ra, record->ctrlinfo.rcy_point.asn);
(void)row_put_int64(&ra, record->ctrlinfo.rcy_point.block_id);
(void)row_put_int64(&ra, record->ctrlinfo.rcy_point.lfn);
(void)row_put_int32(&ra, record->ctrlinfo.lrp_point.asn);
(void)row_put_int64(&ra, record->ctrlinfo.lrp_point.block_id);
(void)row_put_int64(&ra, record->ctrlinfo.lrp_point.lfn);
(void)row_put_timestamp(&ra, record->start_time);
(void)row_put_timestamp(&ra, record->data_only ? (uint64)cm_now() : record->completion_time);
if (record->attr.base_buffer_size >= bak->backup_buf_size) {
max_buffer_size = record->attr.base_buffer_size;
} else {
max_buffer_size = bak->backup_buf_size;
}
(void)row_put_int32(&ra, max_buffer_size);
(void)row_put_str(&ra, session->kernel->attr.db_version);
uint64 min_lsn = dtc_get_min_lsn_lrp_point(record);
(void)row_put_int64(&ra, min_lsn);
status = knl_internal_insert(session, cursor);
if (status == OG_SUCCESS) {
knl_commit(session);
}
CM_RESTORE_STACK(session->stack);
return status;
}
static status_t bak_update_record(knl_session_t *session, bak_record_t *record)
{
knl_cursor_t *cursor = NULL;
row_assist_t ra;
uint16 size;
status_t status;
knl_set_session_scn(session, OG_INVALID_ID64);
CM_SAVE_STACK(session->stack);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_UPDATE, SYS_BACKUP_SET_ID, 1);
knl_init_index_scan(cursor, OG_TRUE);
knl_set_scan_key(INDEX_DESC(cursor->index), &cursor->scan_range.l_key, OG_TYPE_STRING, (void *)record->attr.tag,
(uint16)strlen(record->attr.tag), 0);
if (knl_fetch(session, cursor) != OG_SUCCESS) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
knl_panic_log(!cursor->eof, "data is not found, panic info: page %u-%u type %u table %s index %s",
cursor->rowid.file, cursor->rowid.page, ((page_head_t *)cursor->page_buf)->type,
((table_t *)cursor->table)->desc.name, ((index_t *)cursor->index)->desc.name);
row_init(&ra, cursor->update_info.data, HEAP_MAX_ROW_SIZE(session), 4);
(void)row_put_int32(&ra, record->data_only ? BACKUP_DATA_STAGE : BACKUP_LOG_STAGE);
(void)row_put_int32(&ra, record->status);
(void)row_put_int64(&ra, record->finish_scn);
(void)row_put_int64(&ra, record->completion_time);
cursor->update_info.count = 4;
cursor->update_info.columns[0] = BAK_COL_STAGE;
cursor->update_info.columns[1] = BAK_COL_STATUS;
cursor->update_info.columns[2] = BAK_COL_SCN;
cursor->update_info.columns[3] = BAK_COL_COMPLETION_TIME;
cm_decode_row(cursor->update_info.data, cursor->update_info.offsets, cursor->update_info.lens, &size);
status = knl_internal_update(session, cursor);
CM_RESTORE_STACK(session->stack);
if (status == OG_SUCCESS) {
knl_commit(session);
}
return status;
}
status_t bak_record_backup_set(knl_session_t *session, bak_record_t *record)
{
uint64 recid;
if (DB_IS_READONLY(session)) {
OG_LOG_RUN_WAR("[BACKUP] do not record backup set information in read-only mode");
return OG_SUCCESS;
}
if (!record->log_only || record->attr.backup_type == BACKUP_MODE_ARCHIVELOG) {
if (bak_get_last_recid(session, &recid) != OG_SUCCESS) {
return OG_ERROR;
}
recid++;
if (bak_save_record(session, record, recid) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_BACKUP_RECORD_FAILED);
return OG_ERROR;
}
} else {
if (bak_update_record(session, record) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_BACKUP_RECORD_FAILED);
return OG_ERROR;
}
}
return OG_SUCCESS;
}
void bak_set_progress(knl_session_t *session, bak_stage_t stage, uint64 data_size)
{
bak_context_t *backup_ctx = &session->kernel->backup_ctx;
bak_t *bak = &backup_ctx->bak;
bak_progress_t *progress = &bak->progress;
if (BAK_IS_FULL_BUILDING(bak) && !bak->is_first_link &&
bak_get_build_stage(&progress->stage) >= bak_get_build_stage(&stage)) {
OG_LOG_RUN_INF("[BUILD] reset progress stage to [%u] for break-point building", (uint32)progress->stage);
progress->stage = progress->build_progress.stage;
return;
}
cm_spin_lock(&progress->lock, NULL);
progress->processed_size = 0;
progress->stage = stage;
progress->data_size = data_size;
progress->base_rate += progress->weight;
if (bak->restore) {
if (bak->curr_id > 0 && (stage == BACKUP_HEAD_STAGE || stage == BACKUP_CTRL_STAGE)) {
progress->weight = 0;
} else if (stage == BACKUP_DATA_STAGE || stage == BACKUP_BUILD_STAGE) {
progress->weight = BAK_DATE_WEIGHT / (bak->depend_num + 2);
} else {
progress->weight = backup_ctx->stage_weight[stage];
}
} else {
progress->weight = backup_ctx->stage_weight[stage];
}
cm_spin_unlock(&progress->lock);
OG_LOG_DEBUG_INF("[BACKUP] data size %llu file name %s stage %u", data_size, bak->local.name, stage);
}
void bak_update_progress(bak_t *bak, uint64 size)
{
bak_progress_t *progress = &bak->progress;
cm_spin_lock(&progress->update_lock, NULL);
progress->processed_size += size;
cm_spin_unlock(&progress->update_lock);
}
void bak_set_progress_end(bak_t *bak)
{
bak_progress_t *progress = &bak->progress;
cm_spin_lock(&progress->lock, NULL);
progress->processed_size = 0;
progress->data_size = 0;
progress->stage = BACKUP_END;
progress->weight = 0;
if (!bak->restore && bak->record.data_only) {
progress->base_rate = BAK_CTRL_WEIGHT + BAK_DATE_WEIGHT;
} else {
progress->base_rate = 100;
}
cm_spin_unlock(&progress->lock);
}
void bak_reset_progress(bak_progress_t *progress)
{
progress->processed_size = 0;
progress->data_size = 0;
progress->stage = BACKUP_START;
progress->base_rate = 0;
progress->weight = 0;
}
void bak_reset_error(bak_error_t *error)
{
error->err_code = OG_SUCCESS;
error->err_msg[0] = '\0';
}
status_t bak_init_uds(uds_link_t *link, const char *sun_path)
{
if (cs_create_uds_socket(&link->sock) != OG_SUCCESS) {
return OG_ERROR;
}
socket_attr_t sock_attr = { .connect_timeout = OG_CONNECT_TIMEOUT, .l_onoff = 1, .l_linger = 1 };
if (cs_uds_connect(sun_path, NULL, link, &sock_attr) != OG_SUCCESS) {
(void)cs_close_socket(link->sock);
return OG_ERROR;
}
return OG_SUCCESS;
}
static inline bool32 bak_need_set_retry(bak_t *bak)
{
bak_stage_t *stage = &bak->progress.build_progress.stage;
return (BAK_IS_FULL_BUILDING(bak) && bak_get_build_stage(stage) <= BUILD_HEAD_STAGE);
}
status_t bak_agent_send(bak_t *bak, const char *buf, int32 size)
{
if (bak->is_building) {
if (cs_write_stream_timeout(bak->remote.pipe, buf, (uint32)size,
(int32)cm_atomic_get(&bak->kernel->attr.repl_pkg_size),
OG_BUILD_SEND_TIMEOUT) != OG_SUCCESS) {
bak->need_retry = bak_need_set_retry(bak) ? OG_TRUE : OG_FALSE;
OG_LOG_RUN_INF("[BACKUP] send failed, need_retry : %u", bak->need_retry);
return OG_ERROR;
}
return OG_SUCCESS;
} else {
return cs_uds_send_timed(&bak->remote.uds_link, buf, size,
bak->kernel->attr.nbu_backup_timeout * MILLISECS_PER_SECOND);
}
}
status_t bak_agent_recv(bak_t *bak, char *buf, int32 size)
{
if (bak->is_building) {
int32 recv_size;
if (cs_read_stream(bak->remote.pipe, buf, OG_DEFAULT_NULL_VALUE, (uint32)size, &recv_size) != OG_SUCCESS) {
bak->need_retry = bak_need_set_retry(bak) ? OG_TRUE : OG_FALSE;
OG_LOG_RUN_INF("[BACKUP] receive failed, need_retry : %u", bak->need_retry);
return OG_ERROR;
}
if (recv_size != size) {
bak->need_retry = bak_need_set_retry(bak) ? OG_TRUE : OG_FALSE;
OG_LOG_RUN_INF("[BACKUP] invalid recv_size %u received, expected size is %u, need_retry: %u",
(uint32)recv_size, (uint32)size, bak->need_retry);
return OG_ERROR;
}
return OG_SUCCESS;
} else {
return cs_uds_recv_timed(&bak->remote.uds_link, buf, size, OG_DEFAULT_NULL_VALUE);
}
}
status_t bak_agent_wait_pkg(bak_t *bak, bak_package_type_t ack)
{
bak_agent_head_t head;
if (bak_agent_recv(bak, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (head.cmd != ack) {
OG_THROW_ERROR(ERR_NOT_EXPECTED_BACKUP_PACKET, ack, head.cmd);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_agent_file_start(bak_t *bak, const char *path, uint32 type, uint32 file_id)
{
bak_agent_head_t head;
bak_start_msg_t start_msg;
errno_t ret;
head.ver = BAK_AGENT_PROTOCOL;
head.cmd = BAK_PKG_FILE_START;
head.len = sizeof(bak_agent_head_t) + sizeof(bak_start_msg_t);
head.serial_number = 0;
head.flags = 0;
head.reserved = 0;
bak->remote.serial_number = 0;
start_msg.type = type;
start_msg.file_id = file_id;
start_msg.frag_id = 0;
start_msg.curr_file_index = bak->curr_file_index;
ret = strcpy_sp(start_msg.policy, OG_BACKUP_PARAM_SIZE, bak->record.policy);
knl_securec_check(ret);
ret = strcpy_sp(start_msg.path, OG_FILE_NAME_BUFFER_SIZE, path);
knl_securec_check(ret);
OG_LOG_DEBUG_INF("[BACKUP] send start agent, type:%d, len:%u, msg type:%d, policy:%s, path:%s", type, head.len,
start_msg.type, start_msg.policy, start_msg.path);
if (bak_agent_send(bak, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_send(bak, (char *)&start_msg, sizeof(start_msg)) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_wait_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_agent_send_pkg(bak_t *bak, bak_package_type_t end_type)
{
bak_agent_head_t head;
head.ver = BAK_AGENT_PROTOCOL;
head.serial_number = bak->remote.serial_number++;
head.cmd = end_type;
head.len = sizeof(bak_agent_head_t);
head.flags = 0;
head.reserved = 0;
OG_LOG_DEBUG_INF("[BACKUP] send type %d", end_type);
if (bak_agent_send(bak, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_agent_write(bak_t *process, const char *buf, int32 size)
{
bak_agent_head_t head;
int32 offset = 0;
int32 remain_size = size;
int32 data_size;
head.ver = BAK_AGENT_PROTOCOL;
head.cmd = BAK_PKG_DATA;
head.flags = 0;
head.reserved = 0;
while (!process->failed && remain_size > 0) {
data_size = remain_size > (int32)BACKUP_BUFFER_SIZE(process) ? (int32)BACKUP_BUFFER_SIZE(process) : remain_size;
head.len = (uint32)data_size + sizeof(bak_agent_head_t);
head.serial_number = process->remote.serial_number++;
if (bak_agent_send(process, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_send(process, buf + offset, data_size) != OG_SUCCESS) {
return OG_ERROR;
}
offset += data_size;
remain_size -= data_size;
}
return process->failed ? OG_ERROR : OG_SUCCESS;
}
status_t rst_agent_read_head(bak_t *process, bak_package_type_t expected_type, uint32 *data_size, bool32 *read_end)
{
bak_agent_head_t head;
if (bak_agent_recv(process, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (head.cmd == BAK_PKG_FILE_END) {
*read_end = OG_TRUE;
return OG_SUCCESS;
}
if (head.cmd == BAK_PKG_ERROR && head.len > (sizeof(bak_agent_head_t) + sizeof(int32))) {
if (bak_agent_recv(process, (char *)&process->error_info.err_code, sizeof(int32)) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_recv(process, process->error_info.err_msg, head.len - sizeof(bak_agent_head_t) - sizeof(int32)) !=
OG_SUCCESS) {
return OG_ERROR;
}
OG_THROW_ERROR(ERR_BACKUP_RESTORE, "build", process->error_info.err_msg);
return OG_ERROR;
}
if (head.cmd != expected_type) {
OG_THROW_ERROR(ERR_NOT_EXPECTED_BACKUP_PACKET, expected_type, head.cmd);
return OG_ERROR;
}
if (head.len <= sizeof(bak_agent_head_t)) {
OG_THROW_ERROR(ERR_INVALID_BACKUP_PACKET, head.len);
return OG_ERROR;
}
*data_size = head.len - sizeof(bak_agent_head_t);
return OG_SUCCESS;
}
status_t rst_agent_read(bak_t *bak, char *buf, uint32 buf_size, int32 *read_size, bool32 *read_end)
{
uint32 remain_size;
uint32 offset;
uint32 size;
*read_end = OG_FALSE;
remain_size = buf_size;
offset = 0;
while (remain_size > 0 && !bak->failed) {
if (bak->remote.remain_data_size == 0) {
if (rst_agent_read_head(bak, BAK_PKG_DATA, &bak->remote.remain_data_size, read_end) != OG_SUCCESS) {
return OG_ERROR;
}
}
if (*read_end) {
break;
}
knl_panic(bak->remote.remain_data_size > 0);
size = remain_size > bak->remote.remain_data_size ? bak->remote.remain_data_size : remain_size;
if (bak_agent_recv(bak, buf + offset, size) != OG_SUCCESS) {
return OG_ERROR;
}
remain_size -= size;
offset += size;
bak->remote.remain_data_size -= size;
}
*read_size = (int32)offset;
return bak->failed ? OG_ERROR : OG_SUCCESS;
}
status_t bak_set_running(knl_session_t *session, bak_context_t *ogx)
{
status_t status = OG_ERROR;
if (!BAK_NOT_WORK(ogx)) {
return OG_ERROR;
}
if (DB_ATTR_CLUSTER(session)) {
if (dls_spin_try_lock(session, &ogx->lock) == OG_FALSE) {
return OG_ERROR;
}
if (BAK_NOT_WORK(ogx)) {
ogx->bak_condition = RUNNING;
status = OG_SUCCESS;
}
dls_spin_unlock(session, &ogx->lock);
} else {
cm_spin_lock(&ogx->lock.lock, NULL);
if (BAK_NOT_WORK(ogx)) {
ogx->bak_condition = RUNNING;
status = OG_SUCCESS;
}
cm_spin_unlock(&ogx->lock.lock);
}
return status;
}
void bak_check_node_status(knl_session_t *session, bool32 *running)
{
cluster_view_t view;
for (uint32 i = 0; i < g_dtc->profile.node_count; i++) {
if (i == g_dtc->profile.inst_id) {
continue;
}
rc_get_cluster_view(&view, OG_FALSE);
if (!rc_bitmap64_exist(&view.bitmap, i)) {
OG_LOG_RUN_WAR("[BACKUP] inst id (%u) is not alive, alive bitmap: %llu", i, view.bitmap);
continue;
}
if (dtc_bak_running(session, i, running) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[BACKUP] fail to get backup status from node %u.", i);
continue;
}
if (*running != OG_FALSE) {
OG_LOG_RUN_ERR("[BACKUP] backup process is running in node %u, do not clean archived logfiles.", i);
break;
}
}
cm_reset_error();
return;
}
status_t bak_set_process_running(knl_session_t *session)
{
status_t status = OG_ERROR;
bool32 running = OG_FALSE;
bak_context_t *ogx = &session->kernel->backup_ctx;
if (!BAK_NOT_WORK(ogx)) {
return OG_ERROR;
}
cm_spin_lock(&ogx->lock.lock, NULL);
if (BAK_NOT_WORK(ogx)) {
bak_check_node_status(session, &running);
if (running == OG_FALSE) {
ogx->bak_condition = RUNNING;
status = OG_SUCCESS;
}
}
cm_spin_unlock(&ogx->lock.lock);
OG_LOG_RUN_INF("[BACKUP] set backup running result status %u", status);
return status;
}
void bak_unset_process_running(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
cm_spin_lock(&ogx->lock.lock, NULL);
ogx->bak_condition = NOT_RUNNING;
cm_spin_unlock(&ogx->lock.lock);
}
status_t bak_set_build_running(knl_session_t *session, bak_context_t *ogx, build_progress_t *build_progress)
{
bak_t *bak = &ogx->bak;
build_progress_t *local_build_progress = &bak->progress.build_progress;
status_t status = OG_ERROR;
if (build_progress->start_time == 0) {
if (!BAK_NOT_WORK(ogx)) {
OG_THROW_ERROR(ERR_BACKUP_IN_PROGRESS, "backup");
return OG_ERROR;
}
if (dls_spin_try_lock(session, &ogx->lock) == OG_FALSE) {
return OG_ERROR;
}
if (BAK_NOT_WORK(ogx)) {
ogx->bak_condition = RUNNING;
status = OG_SUCCESS;
}
dls_spin_unlock(session, &ogx->lock);
return status;
}
while (BAK_IS_RUNNING(ogx)) {
if (bak_check_session_status(session) != OG_SUCCESS) {
return OG_ERROR;
}
cm_sleep(1);
}
if (build_progress->start_time != local_build_progress->start_time) {
OG_LOG_RUN_INF("[BUILD] standby build start time [%d] is not equal primary build start time [%d]",
build_progress->start_time, local_build_progress->start_time);
OG_THROW_ERROR(ERR_INVALID_OPERATION, " : The break-point between primary and stanby is not equal");
return OG_ERROR;
}
if (!BAK_IS_KEEP_ALIVE(ogx)) {
OG_LOG_RUN_INF("[BUILD] timeout for break-point building");
OG_THROW_ERROR(ERR_BACKUP_TIMEOUT, ":Timeout for break-point building");
return OG_ERROR;
}
dls_spin_lock(session, &ogx->lock, NULL);
if (BAK_IS_KEEP_ALIVE(ogx)) {
if (!bak_parameter_is_valid(build_progress)) {
OG_LOG_RUN_INF("[BUILD] Break-point parameters from standby database are not effective.");
OG_THROW_ERROR(ERR_INVALID_OPERATION, " : Break-point parameters from standby database are not effective");
}
ogx->bak_condition = RUNNING;
status = OG_SUCCESS;
}
dls_spin_unlock(session, &ogx->lock);
return status;
}
void bak_unset_running(knl_session_t *session, bak_context_t *ogx)
{
OG_LOG_RUN_INF("[BACKUP] RETRY : %u", ogx->bak.need_retry);
if (session->kernel->attr.clustered) {
ogx->bak_condition = NOT_RUNNING;
dls_spin_unlock(session, &ogx->lock);
} else {
cm_spin_lock(&ogx->lock.lock, NULL);
ogx->bak_condition = NOT_RUNNING;
cm_spin_unlock(&ogx->lock.lock);
}
}
void bak_unset_build_running(knl_session_t *session, bak_context_t *ogx)
{
bak_progress_t *progress = &ogx->bak.progress;
dls_spin_lock(session, &ogx->lock, NULL);
if (ogx->bak.need_retry) {
ogx->bak.need_retry = OG_FALSE;
progress->stage = BACKUP_END;
ogx->keep_live_start_time = cm_current_time();
ogx->bak_condition = KEEP_ALIVE;
OG_LOG_RUN_INF("[BUILD] progress stage : %u", progress->stage);
OG_LOG_RUN_INF("[BUILD] set keep alive condition");
} else {
ogx->bak.need_retry = OG_FALSE;
progress->stage = BACKUP_END;
ogx->bak_condition = NOT_RUNNING;
}
dls_spin_unlock(session, &ogx->lock);
}
void bak_set_error(bak_error_t *error_info)
{
int32 err_code;
const char *error_msg = NULL;
size_t msg_len;
errno_t ret;
cm_get_error(&err_code, &error_msg, NULL);
if (err_code != 0 && error_info->err_code == 0) {
cm_spin_lock(&error_info->err_lock, NULL);
if (error_info->err_code == 0) {
error_info->err_code = err_code;
msg_len = strlen(error_msg) + 1;
ret = memcpy_sp(error_info->err_msg, OG_MESSAGE_BUFFER_SIZE, error_msg, msg_len);
knl_securec_check(ret);
}
cm_spin_unlock(&error_info->err_lock);
}
}
void bak_set_error_msg(bak_error_t *error_info, const char *msg)
{
errno_t ret;
if (msg == NULL || msg[0] == '\0' || strlen(error_info->err_msg) != 0) {
return;
}
cm_spin_lock(&error_info->err_lock, NULL);
if (strlen(error_info->err_msg) == 0) {
ret = strcpy_s(error_info->err_msg, OG_MESSAGE_BUFFER_SIZE, msg);
knl_securec_check(ret);
}
cm_spin_unlock(&error_info->err_lock);
}
void bak_set_fail_error(bak_error_t *error_info, const char *str)
{
int32 err_code;
const char *error_msg = NULL;
cm_get_error(&err_code, &error_msg, NULL);
if (err_code != 0) {
if (error_info->err_code == 0) {
bak_set_error(error_info);
}
}
if (strlen(error_info->err_msg) == 0) {
bak_set_error_msg(error_info, "process failed without detailed error");
}
OG_THROW_ERROR(ERR_BACKUP_RESTORE, str, error_info->err_msg);
if (error_info->err_code == 0) {
bak_set_error(error_info);
}
}
status_t bak_agent_command(bak_t *bak, bak_package_type_t type)
{
if (!BAK_IS_UDS_DEVICE(bak)) {
return OG_SUCCESS;
}
if (bak_agent_send_pkg(bak, type) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_wait_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
void bak_calc_head_checksum(bak_head_t *head, uint32 size)
{
uint16 cks_head;
uint16 cks_file;
uint32 tmp_cks;
head->attr.head_checksum = OG_INVALID_CHECKSUM;
head->attr.file_checksum = OG_INVALID_CHECKSUM;
tmp_cks = cm_get_checksum(head, sizeof(bak_head_t));
cks_head = REDUCE_CKS2UINT16(tmp_cks);
tmp_cks = cm_get_checksum(head, size);
cks_file = REDUCE_CKS2UINT16(tmp_cks);
head->attr.head_checksum = cks_head;
head->attr.file_checksum = cks_file;
}
void bak_calc_ctrlfile_checksum(knl_session_t *session, char *ctrl_buf, uint32 count)
{
ctrl_page_t *pages = (ctrl_page_t *)ctrl_buf;
bool32 cks_off = DB_IS_CHECKSUM_OFF(session);
uint32 i;
for (i = 0; i < count; i++) {
pages[i].tail.checksum = OG_INVALID_CHECKSUM;
if (cks_off) {
continue;
}
page_calc_checksum((page_head_t *)&pages[i], OG_DFLT_CTRL_BLOCK_SIZE);
}
}
status_t rst_verify_ctrlfile_checksum(knl_session_t *session, const char *name)
{
knl_instance_t *kernel = (knl_instance_t *)session->kernel;
uint32 cks_level = kernel->attr.db_block_checksum;
ctrl_page_t *pages = kernel->db.ctrl.pages;
uint32 i;
if (DB_IS_CHECKSUM_OFF(session)) {
return OG_SUCCESS;
}
for (i = 0; i < CTRL_MAX_PAGES(session); i++) {
if (pages[i].tail.checksum == OG_INVALID_CHECKSUM) {
continue;
}
if (!page_verify_checksum((page_head_t *)&pages[i], OG_DFLT_CTRL_BLOCK_SIZE)) {
OG_LOG_RUN_ERR("[RESTORE] the %d's ctrl page corrupted. "
"block size %u, ctrl file name %s, checksum level %s",
i, OG_DFLT_CTRL_BLOCK_SIZE, name, knl_checksum_level(cks_level));
OG_THROW_ERROR(ERR_CHECKSUM_FAILED, name);
return OG_ERROR;
}
}
return OG_SUCCESS;
}
status_t bak_verify_datafile_checksum(knl_session_t *session, bak_process_t *ogx, uint64 offset, const char *name,
bak_buf_data_t *data_buf)
{
uint32 cks_level = session->kernel->attr.db_block_checksum;
uint32 size = (uint32)data_buf->data_size;
uint64 page_offset = 0;
page_head_t *page = NULL;
bool32 skip_badblock = session->kernel->backup_ctx.bak.skip_badblock;
bool32 page_is_valid;
uint32 retry_times = 0;
if (DB_IS_CHECKSUM_OFF(session)) {
return OG_SUCCESS;
}
for (uint32 i = 0; i * DEFAULT_PAGE_SIZE(session) < size; i++) {
page_offset = offset / DEFAULT_PAGE_SIZE(session) + i;
page = (page_head_t *)(data_buf->data_addr + i * DEFAULT_PAGE_SIZE(session));
if (PAGE_CHECKSUM(page, DEFAULT_PAGE_SIZE(session)) == OG_INVALID_CHECKSUM) {
continue;
}
do {
page_is_valid = page_verify_checksum(page, DEFAULT_PAGE_SIZE(session));
if (!skip_badblock || page_is_valid) {
break;
}
retry_times++;
} while (retry_times < BAK_CHECKSUM_RETRY_TIMES);
if (page_is_valid) {
continue;
}
OG_LOG_RUN_ERR("[BACKUP] page corrupted(file %u, page %u). datafile page offset %llu, datafile name %s,"
"checksum level is %u, page size %u, cks %u, read_size %u, checksum level %s",
AS_PAGID_PTR(page->id)->file, AS_PAGID_PTR(page->id)->page, page_offset, name, cks_level,
PAGE_SIZE(*page), PAGE_CHECKSUM(page, DEFAULT_PAGE_SIZE(session)), size,
knl_checksum_level(cks_level));
if (!skip_badblock) {
OG_THROW_ERROR(ERR_CHECKSUM_FAILED_WITH_PAGE, AS_PAGID_PTR(page->id)->file, AS_PAGID_PTR(page->id)->page,
name);
return OG_ERROR;
}
if (badblock_write_page(session, page) != OG_SUCCESS) {
return OG_ERROR;
}
OG_LOG_RUN_WAR("[BACKUP] page corrupted(file %u, page %u), skipped.", AS_PAGID_PTR(page->id)->file,
AS_PAGID_PTR(page->id)->page);
}
return OG_SUCCESS;
}
void check_page_structure(page_head_t *pre_page, page_head_t *page, bool32 pre_page_id_damage,
bool32 *page_struct_damage, bool32 *page_id_damage)
{
*page_struct_damage = OG_FALSE;
*page_id_damage = OG_FALSE;
uint16 page_id_file = AS_PAGID_PTR(page->id)->file;
uint32 page_id_page = AS_PAGID_PTR(page->id)->page;
if (pre_page != NULL) {
uint16 pre_page_id_file = AS_PAGID_PTR(pre_page->id)->file;
uint32 pre_page_id_page = AS_PAGID_PTR(pre_page->id)->page;
if (pre_page_id_damage) {
if ((pre_page_id_file == page_id_file) && (pre_page_id_page + 1 == page_id_page)) {
*page_struct_damage = OG_TRUE;
*page_id_damage = OG_TRUE;
OG_LOG_RUN_ERR("[RESTORE] page_struct damaged, pre_page[file %u, page %u] damaged, "
"cur_page[file %u, page %u].",
pre_page_id_file, pre_page_id_page, page_id_file, page_id_page);
}
} else {
if ((pre_page_id_file != page_id_file) || (pre_page_id_page + 1 != page_id_page)) {
*page_struct_damage = OG_TRUE;
*page_id_damage = OG_TRUE;
OG_LOG_RUN_ERR("[RESTORE] page_struct damaged, pre_page[file %u, page %u] correct, "
"cur_page[file %u, page %u].",
pre_page_id_file, pre_page_id_page, page_id_file, page_id_page);
}
}
}
if ((page->type > PAGE_TYPE_COUNT) && (page->type != PAGE_TYPE_END)) {
*page_struct_damage = OG_TRUE;
OG_LOG_RUN_ERR("[RESTORE] page_struct damaged, type damage, cur_page[file %u, page %u], page_type %u.",
page_id_file, page_id_page, page->type);
}
if (page->pcn != PAGE_TAIL(page)->pcn) {
*page_struct_damage = OG_TRUE;
OG_LOG_RUN_ERR("[RESTORE] page_struct damaged, pcn not equal, cur_page[file %u, page %u], "
"page_head pcn %u, page_tail pcn %u",
page_id_file, page_id_page, (uint32)page->pcn, (uint32)PAGE_TAIL(page)->pcn);
}
}
static status_t rst_handle_badblock(knl_session_t *session, page_head_t *pre_page, page_head_t *page,
bool32 *pre_page_id_damage, const char *name)
{
bool32 page_struct_damage = OG_FALSE;
bool32 page_id_damage = OG_FALSE;
uint8 repair_type = session->kernel->backup_ctx.bak.repair_type;
if (repair_type == RESTORE_REPAIR_TYPE_NULL) {
OG_THROW_ERROR(ERR_CHECKSUM_FAILED_WITH_PAGE, AS_PAGID_PTR(page->id)->file, AS_PAGID_PTR(page->id)->page, name);
return OG_ERROR;
}
check_page_structure(pre_page, page, *pre_page_id_damage, &page_struct_damage, &page_id_damage);
if ((repair_type == RESTORE_REPAIR_DISCARD_BADBLOCK) || page_struct_damage ||
(AS_PAGID_PTR(page->id)->file <= 10)) {
page->hard_damage = 1;
}
page_calc_checksum(page, DEFAULT_PAGE_SIZE(session));
if (badblock_write_page_tmp(session, (void *)page, page_id_damage) != OG_SUCCESS) {
return OG_ERROR;
}
OG_LOG_RUN_WAR("[RESTORE] page repair(file %u, page %u), skip_badblock %u", AS_PAGID_PTR(page->id)->file,
AS_PAGID_PTR(page->id)->page, (uint8)page->hard_damage);
*pre_page_id_damage = page_id_damage;
return OG_SUCCESS;
}
status_t rst_verify_datafile_checksum(knl_session_t *session, bak_process_t *ogx, char *buf, uint32 page_count,
const char *name)
{
uint32 cks_level = session->kernel->attr.db_block_checksum;
bak_attr_t *attr = &session->kernel->backup_ctx.bak.record.attr;
uint64 page_offset;
page_head_t *page = NULL;
page_head_t *pre_page = NULL;
bool32 page_is_valid;
uint32 retry_times = 0;
bool32 pre_page_id_damage = OG_FALSE;
if (DB_IS_CHECKSUM_OFF(session)) {
return OG_SUCCESS;
}
for (uint32 i = 0; i < page_count; i++) {
pre_page = page;
page = (page_head_t *)(buf + i * DEFAULT_PAGE_SIZE(session));
if (PAGE_CHECKSUM(page, DEFAULT_PAGE_SIZE(session)) == OG_INVALID_CHECKSUM) {
continue;
}
page_id_t *page_id = (page_id_t *)page;
datafile_t *df = DATAFILE_GET(session, page_id->file);
if (DATAFILE_IS_COMPRESS(df) && attr->level == 0 && AS_PAGID_PTR(page->id)->page >= DF_MAP_HWM_START) {
continue;
}
do {
page_is_valid = page_verify_checksum(page, DEFAULT_PAGE_SIZE(session));
if (page_is_valid || session->kernel->backup_ctx.bak.repair_type == RESTORE_REPAIR_TYPE_NULL) {
break;
}
retry_times++;
} while (retry_times < BAK_CHECKSUM_RETRY_TIMES);
if (page_is_valid) {
pre_page_id_damage = OG_FALSE;
continue;
}
page_offset = ogx->curr_offset / DEFAULT_PAGE_SIZE(session) + i;
OG_LOG_RUN_ERR("[RESTORE] page corrupted(file %u, page %u). size %u cks %u, "
"page offset %llu, file name %s, checksum level %s",
AS_PAGID_PTR(page->id)->file, AS_PAGID_PTR(page->id)->page, PAGE_SIZE(*page),
PAGE_CHECKSUM(page, DEFAULT_PAGE_SIZE(session)), page_offset, name,
knl_checksum_level(cks_level));
if (rst_handle_badblock(session, pre_page, page, &pre_page_id_damage, name) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
static status_t rst_truncate_file(knl_session_t *session, const char *name, device_type_t type, int64 size)
{
int64 file_size;
int32 handle = OG_INVALID_HANDLE;
if (cm_open_device(name, type, knl_io_flag(session), &handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to open %s", name);
return OG_ERROR;
}
file_size = cm_device_size(cm_device_type(name), handle);
if (file_size == -1) {
cm_close_device(type, &handle);
OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
return OG_ERROR;
}
if (size < file_size) {
OG_LOG_RUN_INF("[RESTORE] truncate file from %lld to %lld, name %s", file_size, size, name);
if (cm_truncate_device(type, handle, size) != OG_SUCCESS) {
cm_close_device(type, &handle);
OG_LOG_RUN_ERR("[RESTORE] failed to truncate %s", name);
return OG_ERROR;
}
if (db_fsync(session, type, handle) != OG_SUCCESS) {
cm_close_device(type, &handle);
OG_LOG_RUN_ERR("[RESTORE] failed to fsync file %s", name);
return OG_ERROR;
}
}
cm_close_device(type, &handle);
return OG_SUCCESS;
}
status_t rst_truncate_datafile(knl_session_t *session)
{
datafile_t *df = NULL;
space_t *space = NULL;
for (uint32 i = 0; i < OG_MAX_DATA_FILES; i++) {
df = DATAFILE_GET(session, i);
if (!DATAFILE_IS_ONLINE(df) || !df->ctrl->used || DF_FILENO_IS_INVAILD(df)) {
continue;
}
space = SPACE_GET(session, df->space_id);
if (!SPACE_IS_ONLINE(space) || !space->ctrl->used) {
continue;
}
if (rst_truncate_file(session, df->ctrl->name, df->ctrl->type, df->ctrl->size) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
status_t rst_extend_file(knl_session_t *session, const char *name, device_type_t type, int64 size, char *buf,
uint32 buf_size)
{
int64 file_size;
int32 handle = OG_INVALID_HANDLE;
if (cm_open_device(name, type, knl_io_flag(session), &handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to open %s", name);
return OG_ERROR;
}
knl_panic(cm_device_type(name) == type);
if (type == DEV_TYPE_ULOG) {
return OG_SUCCESS;
}
file_size = cm_device_size(type, handle);
if (file_size == -1) {
cm_close_device(type, &handle);
OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
return OG_ERROR;
}
if (size > file_size) {
OG_LOG_RUN_INF("[RESTORE] extend file from %lld to %lld, name %s, handle %u", file_size, size, name, handle);
if (cm_dbs_is_enable_dbs() == OG_TRUE) {
if (cm_extend_device(type, handle, buf, buf_size, size - file_size,
session->kernel->attr.build_datafile_prealloc) != OG_SUCCESS) {
cm_close_device(type, &handle);
OG_LOG_RUN_ERR("[RESTORE] failed to extend %s", name);
return OG_ERROR;
}
} else {
if (cm_try_prealloc_extend_device(type, handle, buf, buf_size, size - file_size,
session->kernel->attr.build_datafile_prealloc) != OG_SUCCESS) {
cm_close_device(type, &handle);
OG_LOG_RUN_ERR("[RESTORE] failed to extend %s", name);
return OG_ERROR;
}
if (db_fsync(session, type, handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to fsync file %s", name);
cm_close_device(type, &handle);
return OG_ERROR;
}
}
}
cm_close_device(type, &handle);
return OG_SUCCESS;
}
uint32 bak_get_package_type(bak_file_type_t type)
{
switch (type) {
case BACKUP_CTRL_FILE:
return BAK_MSG_TYPE_CTRL;
case BACKUP_DATA_FILE:
return BAK_MSG_TYPE_DATA;
case BACKUP_LOG_FILE:
return BAK_MSG_TYPE_LOG;
case BACKUP_ARCH_FILE:
return BAK_MSG_TYPE_ARCH;
default:
return BAK_MSG_TYPE_HEAD;
}
}
status_t bak_head_verify_checksum(knl_session_t *session, bak_head_t *head, uint32 size, bool32 is_check_file)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
uint32 cks_level = session->kernel->attr.db_block_checksum;
uint32 tmp_cks;
uint16 org_cks_head;
uint16 org_cks_file;
uint16 new_cks;
if (DB_IS_CHECKSUM_OFF(session)) {
return OG_SUCCESS;
}
org_cks_head = head->attr.head_checksum;
org_cks_file = head->attr.file_checksum;
head->attr.head_checksum = OG_INVALID_CHECKSUM;
head->attr.file_checksum = OG_INVALID_CHECKSUM;
tmp_cks = cm_get_checksum(head, size);
new_cks = REDUCE_CKS2UINT16(tmp_cks);
if (is_check_file && org_cks_file != new_cks) {
OG_LOG_RUN_ERR("[BACKUP] backupset file checksum. file %s org_cks %u new_cks %u, "
"check type %u, checksum level %s",
ogx->bak.local.name, org_cks_file, new_cks, (uint32)is_check_file,
knl_checksum_level(cks_level));
OG_THROW_ERROR(ERR_CHECKSUM_FAILED, ogx->bak.local.name);
return OG_ERROR;
}
if (!is_check_file && org_cks_head != new_cks) {
OG_LOG_RUN_ERR("[BACKUP] backupset file checksum. file %s org_cks %u new_cks %u, "
"check type %u,, checksum level %s",
ogx->bak.local.name, org_cks_head, new_cks, (uint32)is_check_file,
knl_checksum_level(cks_level));
OG_THROW_ERROR(ERR_CHECKSUM_FAILED, ogx->bak.local.name);
return OG_ERROR;
}
head->attr.head_checksum = org_cks_head;
head->attr.file_checksum = org_cks_file;
return OG_SUCCESS;
}
static void bak_modify_proc_count(bak_t *bak)
{
if (!bak_log_paral_enable(bak)) {
return;
}
bak->proc_count += BAK_PARAL_LOG_PROC_NUM;
bak->log_proc_count = BAK_PARAL_LOG_PROC_NUM;
OG_LOG_RUN_INF("[BACKUP] Set params: modified data process number: %u, log process number: %u", bak->proc_count,
bak->log_proc_count);
}
void bak_reset_process_ctrl(bak_t *bak, bool32 restore)
{
if (BAK_IS_FULL_BUILDING(bak) && !bak->is_first_link) {
OG_LOG_RUN_INF("[BUILD] ignore reset progress for break-point building");
} else {
bak_reset_progress(&bak->progress);
}
bak_reset_error(&bak->error_info);
bak->depend_num = 0;
bak->curr_id = 0;
bak->restore = restore;
bak->record.status = BACKUP_PROCESSING;
bak->remote.remain_data_size = 0;
bak->ctrlfile_completed = OG_FALSE;
bak->need_retry = OG_FALSE;
bak_modify_proc_count(bak);
}
static void bak_reset_ctrl(bak_ctrl_t *ctrl)
{
cm_close_device(ctrl->type, &ctrl->handle);
ctrl->handle = (int32)OG_INVALID_ID32;
ctrl->name[0] = '\0';
ctrl->offset = 0;
}
void bak_reset_stats_and_alloc_sess(knl_session_t *session)
{
bak_process_t *procs = session->kernel->backup_ctx.process;
errno_t ret;
for (uint32 i = 0; i < OG_MAX_BACKUP_PROCESS; i++) {
ret = memset_sp(&procs[i].stat, sizeof(bak_process_stat_t), 0, sizeof(bak_process_stat_t));
knl_securec_check(ret);
if (g_knl_callback.alloc_knl_session(OG_FALSE, (knl_handle_t *)&procs[i].session) != OG_SUCCESS) {
CM_ASSERT(0);
return;
}
}
}
void bak_reset_process(bak_process_t *ogx)
{
cm_close_thread(&ogx->thread);
cm_close_thread(&ogx->write_thread);
cm_aligned_free(&ogx->backup_buf);
cm_aligned_free(&ogx->compress_ctx.compress_buf);
cm_aligned_free(&ogx->encrypt_ctx.encrypt_buf);
cm_aligned_free(&ogx->table_compress_ctx.read_buf);
cm_aligned_free(&ogx->table_compress_ctx.unzip_buf);
cm_aligned_free(&ogx->table_compress_ctx.zip_buf);
cm_aligned_free(&ogx->backup_rw_buf.aligned_buf);
g_knl_callback.release_knl_session((knl_handle_t *)ogx->session);
ogx->read_size = 0;
ogx->write_size = 0;
bak_reset_ctrl(&ogx->ctrl);
}
static void bak_process_init(bak_context_t *ogx, knl_session_t *session)
{
bak_process_t *process = NULL;
uint32 i;
uint32 j;
for (i = 0; i < OG_MAX_BACKUP_PROCESS; i++) {
process = &ogx->process[i];
process->session = NULL;
process->ctrl.handle = OG_INVALID_HANDLE;
for (j = 0; j < OG_MAX_DATA_FILES; j++) {
process->datafiles[j] = OG_INVALID_HANDLE;
process->datafile_name[j][0] = '\0';
}
bak_reset_fileinfo(&process->assign_ctrl);
}
}
static void bak_set_stage_weight(bak_context_t *ogx)
{
ogx->stage_weight[BACKUP_PARAM_STAGE] = BAK_PARAM_WEIGHT;
ogx->stage_weight[BACKUP_HEAD_STAGE] = BAK_HEAD_WEIGHT;
ogx->stage_weight[BACKUP_CTRL_STAGE] = BAK_CTRL_WEIGHT;
ogx->stage_weight[BACKUP_DATA_STAGE] = BAK_DATE_WEIGHT;
ogx->stage_weight[BACKUP_LOG_STAGE] = BAK_LOG_WEIGHT;
}
static void bak_stats_init(bak_context_t *ogx)
{
ogx->bak.stat.reads = 0;
ogx->bak.stat.writes = 0;
}
void bak_init(knl_session_t *session)
{
knl_instance_t *kernel = session->kernel;
bak_context_t *ogx = &kernel->backup_ctx;
bak_t *bak = &ogx->bak;
bak_ctrlinfo_t *ctrlinfo = &bak->record.ctrlinfo;
errno_t ret;
knl_panic(sizeof(bak_head_t) == BAK_HEAD_STRUCT_SIZE);
ret = memset_sp(ogx, sizeof(bak_context_t), 0, sizeof(bak_context_t));
knl_securec_check(ret);
bak->kernel = kernel;
bak->local.handle = OG_INVALID_HANDLE;
bak->log_local.handle = OG_INVALID_HANDLE;
ctrlinfo->rcy_point.asn = OG_INVALID_ID32;
ctrlinfo->lrp_point.asn = OG_INVALID_ID32;
ctrlinfo->scn = OG_INVALID_ID64;
bak->logfiles_created = OG_FALSE;
dls_init_spinlock(&(ogx->lock), DR_TYPE_DATABASE, DR_ID_DATABASE_BAKUP, 0);
bak_stats_init(ogx);
bak_process_init(ogx, kernel->sessions[SESSION_ID_BRU]);
bak_set_stage_weight(ogx);
}
static void bak_generate_datafile_name(knl_session_t *session, const char *path, uint32 index, uint32 file_id,
uint32 sec_id, char *file_name)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
uint32 space_id;
int32 ret;
if (bak->restore) {
ret = snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s/data_%s_%u_%u.bak", path,
bak->files[index].spc_name, file_id, sec_id);
} else {
if (!g_cluster_no_cms) {
space_id = DATAFILE_GET(session, file_id)->space_id;
ret = strcpy_sp(bak->files[index].spc_name, OG_NAME_BUFFER_SIZE, SPACE_GET(session, space_id)->ctrl->name);
knl_securec_check_ss(ret);
}
ret = snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s/data_%s_%u_%u.bak", path,
bak->files[index].spc_name, file_id, sec_id);
}
knl_securec_check_ss(ret);
}
static int32 bak_generate_bak_log_file(bak_t *bak, const char *path, uint32 index, uint32 file_id, char *file_name)
{
int32 ret;
if (BAK_IS_DBSOTR(bak)) {
ret = snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s/log_%u_%u_%llx_%llx.bak", path,
bak->files[index].inst_id, file_id, bak->files[index].start_lsn, bak->files[index].end_lsn);
} else {
ret = snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s/log_%u_%u_0.bak", path,
bak->files[index].inst_id, file_id);
}
return ret;
}
static int32 bak_generate_bak_arch_file(bak_t *bak, const char *path, uint32 index, uint32 file_id, char *file_name)
{
int32 ret;
if (BAK_IS_DBSOTR(bak)) {
ret = snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s/arch_%u_%u_%llx_%llx.bak", path,
bak->files[index].inst_id, file_id, bak->files[index].start_lsn, bak->files[index].end_lsn);
} else {
ret = snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s/arch_%u_%u_0.bak", path,
bak->files[index].inst_id, file_id);
}
return ret;
}
void bak_generate_bak_file(knl_session_t *session, const char *path, bak_file_type_t type, uint32 index, uint32 file_id,
uint32 sec_id, char *file_name)
{
int32 ret;
knl_instance_t *kernel = session->kernel;
bak_context_t *ogx = &kernel->backup_ctx;
bak_t *bak = &ogx->bak;
switch (type) {
case BACKUP_CTRL_FILE:
ret = snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s/ctrl_%d_%d.bak", path, 0,
0);
knl_securec_check_ss(ret);
break;
case BACKUP_DATA_FILE:
bak_generate_datafile_name(session, path, index, file_id, sec_id, file_name);
break;
case BACKUP_LOG_FILE:
ret = bak_generate_bak_log_file(bak, path, index, file_id, file_name);
knl_securec_check_ss(ret);
break;
case BACKUP_ARCH_FILE:
ret = bak_generate_bak_arch_file(bak, path, index, file_id, file_name);
knl_securec_check_ss(ret);
break;
case BACKUP_HEAD_FILE:
ret = snprintf_s(file_name, OG_FILE_NAME_BUFFER_SIZE, OG_MAX_FILE_NAME_LEN, "%s/backupset", path);
knl_securec_check_ss(ret);
break;
default:
break;
}
}
status_t bak_get_last_rcy_point(knl_session_t *session, log_point_t *point)
{
knl_cursor_t *cursor = NULL;
bak_stage_t stage;
CM_SAVE_STACK(session->stack);
knl_set_session_scn(session, OG_INVALID_ID64);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_BACKUP_SET_ID, 0);
cursor->index_dsc = OG_TRUE;
knl_init_index_scan(cursor, OG_FALSE);
knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, 0);
knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, 0);
for (;;) {
if (OG_SUCCESS != knl_fetch(session, cursor)) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
point->rst_id = 0;
point->asn = 0;
point->lsn = 0;
break;
} else {
cm_decode_row((char *)cursor->row, cursor->offsets, cursor->lens, NULL);
point->rst_id = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_RESETLOGS);
point->asn = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_RCY_ASN);
point->lsn = *(uint64 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_RCY_LSN);
stage = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_STAGE);
}
if (stage == BACKUP_LOG_STAGE) {
break;
}
}
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
void build_disconnect(bak_t *bak)
{
bak_remote_t *remote = &bak->remote;
knl_disconnect(&remote->send_pipe);
remote->send_pipe.link.tcp.sock = CS_INVALID_SOCKET;
remote->pipe = NULL;
remote->send_pack = NULL;
remote->recv_pack = NULL;
}
static void bak_free_stream_buffer(bak_t *bak)
{
cm_aligned_free(&bak->send_stream.bufs[0]);
cm_aligned_free(&bak->send_stream.bufs[1]);
bak->send_stream.buf_size = 0;
cm_aligned_free(&bak->recv_stream.bufs[0]);
cm_aligned_free(&bak->recv_stream.bufs[1]);
bak->recv_stream.buf_size = 0;
}
static void bak_free_backup_buf(bak_t *bak)
{
if (bak->backup_buf != NULL) {
cm_aligned_free(&bak->align_buf);
bak->backup_buf = NULL;
bak->depends = NULL;
bak->compress_buf = NULL;
bak->ctrl_backup_buf = NULL;
}
}
void bak_reset_params(knl_session_t *session, bool32 restore)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
if (restore) {
bak_unset_build_running(session, ogx);
} else {
bak->progress.stage = BACKUP_END;
}
errno_t ret = memset_sp(bak->exclude_spcs, sizeof(bool32) * OG_MAX_SPACES, 0, sizeof(bool32) * OG_MAX_SPACES);
knl_securec_check(ret);
ret = memset_sp(bak->include_spcs, sizeof(bool32) * OG_MAX_SPACES, 0, sizeof(bool32) * OG_MAX_SPACES);
knl_securec_check(ret);
bak->record.status = bak->failed ? BACKUP_FAILED : BACKUP_SUCCESS;
bak->failed = OG_FALSE;
bak->is_building = OG_FALSE;
bak->depends = NULL;
bak->need_check = OG_FALSE;
bak->record.is_increment = OG_FALSE;
bak->record.is_repair = OG_FALSE;
ret = memset_sp(&bak->rst_file, sizeof(rst_file_info_t), 0, sizeof(rst_file_info_t));
knl_securec_check(ret);
ret = memset_sp(&bak->target_info, sizeof(knl_backup_targetinfo_t), 0, sizeof(knl_backup_targetinfo_t));
knl_securec_check(ret);
if (!bak->record.data_only) {
ret = memset_sp(bak->record.attr.tag, OG_NAME_BUFFER_SIZE, 0, OG_NAME_BUFFER_SIZE);
knl_securec_check(ret);
}
}
void bak_end(knl_session_t *session, bool32 restore)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
ctrlfile_set_t *ctrlfiles = &session->kernel->db.ctrlfiles;
bak_t *bak = &ogx->bak;
bak_error_t *error_info = &bak->error_info;
if (bak->encrypt_info.encrypt_alg != ENCRYPT_NONE) {
bak_replace_password(bak->password);
}
for (int32 i = OG_MAX_BACKUP_PROCESS - 1; i >= 0; i--) {
bak_reset_process(&ogx->process[i]);
}
if (badblock_end(session) != OG_SUCCESS) {
bak->failed = OG_TRUE;
}
if (bak->failed && !bak->need_retry) {
bak_set_fail_error(error_info, restore ? "restore" : "backup");
OG_LOG_RUN_ERR("[%s] %s failed", restore ? "RESTORE" : "BACKUP", restore ? "restore" : "backup");
} else {
bak_set_progress_end(bak);
OG_LOG_RUN_INF("[%s] %s success", restore ? "RESTORE" : "BACKUP", restore ? "restore" : "backup");
}
bak_free_backup_buf(bak);
bak_free_stream_buffer(bak);
bak_free_compress_context(session, !restore);
bak_free_encrypt_context(session);
if (BAK_IS_UDS_DEVICE(bak)) {
if (bak->failed && bak->remote.uds_link.sock != CS_INVALID_SOCKET) {
cs_uds_disconnect(&bak->remote.uds_link);
}
}
cm_close_device(bak->local.type, &bak->local.handle);
bak->local.handle = OG_INVALID_HANDLE;
bak->local.name[0] = '\0';
cm_close_device(bak->log_local.type, &bak->log_local.handle);
bak->log_local.handle = OG_INVALID_HANDLE;
bak->log_local.name[0] = '\0';
if (restore && bak->rst_file.file_type == RESTORE_ALL &&
(cm_device_type(session->kernel->db.ctrlfiles.items[0].name) != DEV_TYPE_RAW)) {
rst_close_ctrl_file(ctrlfiles);
rst_close_log_files(session);
}
bak_reset_params(session, restore);
}
status_t bak_validate_backupset(knl_session_t *session, knl_validate_t *param)
{
return OG_SUCCESS;
}
void bak_get_error(knl_session_t *session, int32 *code, const char **message)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
*code = ogx->bak.error_info.err_code;
*message = ogx->bak.error_info.err_msg;
}
static status_t bak_generate_default_backupset_name(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
int32 ret;
ret = sprintf_s(bak->record.path, OG_MAX_BACKUP_PATH_LEN, DEFAULT_BAKCUPFILE_FORMAT, session->kernel->home,
bak->record.start_time);
if (ret == -1) {
OG_THROW_ERROR(ERR_EXCEED_MAX_BACKUP_PATH_LEN, "default backup path", OG_MAX_BACKUP_PATH_LEN);
return OG_ERROR;
}
return OG_SUCCESS;
}
static void bak_record_base_info(knl_cursor_t *cursor, bak_t *bak)
{
bak_dependence_t *info = bak->depends + bak->depend_num;
text_t value;
info->device = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_DEVICE_TYPE);
value.str = CURSOR_COLUMN_DATA(cursor, BAK_COL_POLICY);
value.len = CURSOR_COLUMN_SIZE(cursor, BAK_COL_POLICY);
(void)cm_text2str(&value, info->policy, OG_NAME_BUFFER_SIZE);
value.str = CURSOR_COLUMN_DATA(cursor, BAK_COL_DIR);
value.len = CURSOR_COLUMN_SIZE(cursor, BAK_COL_DIR);
(void)cm_text2str(&value, info->file_dest, OG_FILE_NAME_BUFFER_SIZE);
bak->depend_num++;
OG_LOG_RUN_INF("[BACKUP]depend_num1 %u, file dest %s", bak->depend_num, info->file_dest);
}
static status_t bak_check_db_version(knl_session_t *session, knl_cursor_t *cursor)
{
text_t value;
char db_version[OG_DB_NAME_LEN];
value.str = CURSOR_COLUMN_DATA(cursor, BAK_COL_DB_VERSION);
value.len = CURSOR_COLUMN_SIZE(cursor, BAK_COL_DB_VERSION);
(void)cm_text2str(&value, db_version, OG_DB_NAME_LEN);
if (strncmp(session->kernel->attr.db_version, db_version, OG_MIN_VERSION_NUM_LEN) != 0) {
return OG_ERROR;
}
return OG_SUCCESS;
}
bool32 bak_filter_incr(knl_session_t *session, knl_cursor_t *cursor, backup_device_t device, uint32 rst_value,
bool32 cumulative)
{
uint32 backup_type = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_TYPE);
uint32 reset_logs = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_RESETLOGS);
backup_device_t device_type = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_DEVICE_TYPE);
bak_stage_t stage = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_STAGE);
if (!BAK_MODE_IS_INCREMENTAL(backup_type) || (reset_logs != rst_value) || (device_type != device) ||
(stage != BACKUP_LOG_STAGE)) {
return OG_FALSE;
}
uint32 level = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_LEVEL);
if (cumulative && level != 0) {
return OG_FALSE;
}
if (bak_check_db_version(session, cursor) != OG_SUCCESS) {
return OG_FALSE;
}
return OG_TRUE;
}
status_t bak_select_incr_info(knl_session_t *session, bak_t *bak)
{
knl_cursor_t *cursor = NULL;
uint32 level;
bool32 save_lastest_incr = OG_TRUE;
text_t value;
bak_attr_t *attr = &bak->record.attr;
CM_SAVE_STACK(session->stack);
knl_set_session_scn(session, OG_INVALID_ID64);
cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_BACKUP_SET_ID, 0);
cursor->index_dsc = OG_TRUE;
knl_init_index_scan(cursor, OG_FALSE);
knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, 0);
knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, 0);
for (;;) {
if (OG_SUCCESS != knl_fetch(session, cursor)) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
break;
}
cm_decode_row((char *)cursor->row, cursor->offsets, cursor->lens, NULL);
if (!bak_filter_incr(session, cursor, bak->record.device, session->kernel->db.ctrl.core.resetlogs.rst_id,
bak->cumulative)) {
continue;
}
if (save_lastest_incr) {
value.str = CURSOR_COLUMN_DATA(cursor, BAK_COL_TAG);
value.len = CURSOR_COLUMN_SIZE(cursor, BAK_COL_TAG);
(void)cm_text2str(&value, attr->base_tag, OG_NAME_BUFFER_SIZE);
attr->base_lsn = *(uint64 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_LSN);
attr->base_buffer_size = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_MAX_BUFFER_SIZE);
save_lastest_incr = OG_FALSE;
}
bak_record_base_info(cursor, bak);
level = *(uint32 *)CURSOR_COLUMN_DATA(cursor, BAK_COL_LEVEL);
if (level == 0) {
break;
}
if (bak->depend_num >= BAK_MAX_INCR_NUM) {
OG_THROW_ERROR(ERR_EXCEED_MAX_INCR_BACKUP);
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
}
OG_LOG_RUN_INF("[BACKUP] last backup base lsn is %llu", attr->base_lsn);
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
status_t bak_set_incr_info(knl_session_t *session, bak_t *bak)
{
bak_attr_t *attr = &bak->record.attr;
bak->depend_num = 0;
if (attr->level == 0) {
attr->base_lsn = 0;
attr->base_tag[0] = '\0';
attr->base_buffer_size = OG_MIN_BACKUP_BUF_SIZE;
return OG_SUCCESS;
}
if (bak->is_building) {
return OG_SUCCESS;
}
if (bak_select_incr_info(session, bak) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak->depend_num == 0) {
OG_THROW_ERROR(ERR_NO_VALID_BASE_BACKUPSET);
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_set_data_path(knl_session_t *session, bak_t *bak, text_t *format)
{
if (format->len > 0) {
if (format->len > OG_MAX_BACKUP_PATH_LEN) {
OG_THROW_ERROR(ERR_EXCEED_MAX_BACKUP_PATH_LEN, T2S(format), OG_MAX_BACKUP_PATH_LEN);
return OG_ERROR;
}
if (cm_text2str(format, bak->record.path, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
return OG_ERROR;
}
if (cm_check_exist_special_char(bak->record.path, (uint32)strlen(bak->record.path))) {
OG_THROW_ERROR(ERR_INVALID_DIR, bak->record.path);
return OG_ERROR;
}
} else {
if (bak_generate_default_backupset_name(session) != OG_SUCCESS) {
return OG_ERROR;
}
}
if (bak->record.device == DEVICE_DISK) {
if (cm_create_device_dir_ex(cm_device_type(bak->record.path), bak->record.path) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
status_t bak_set_exclude_space(knl_session_t *session, bak_t *bak, galist_t *exclude_spcs)
{
text_t *spc_name = NULL;
space_t *space = NULL;
uint32 spc_id;
errno_t ret;
ret = memset_sp(bak->exclude_spcs, sizeof(bool32) * OG_MAX_SPACES, 0, sizeof(bool32) * OG_MAX_SPACES);
knl_securec_check(ret);
for (uint32 i = 0; i < exclude_spcs->count; i++) {
spc_name = (text_t *)cm_galist_get(exclude_spcs, i);
if (spc_get_space_id(session, spc_name, OG_TRUE, &spc_id) != OG_SUCCESS) {
return OG_ERROR;
}
space = SPACE_GET(session, spc_id);
if (SPACE_IS_DEFAULT(space)) {
OG_THROW_ERROR(ERR_EXCLUDE_SPACES, T2S(spc_name));
return OG_ERROR;
}
bak->exclude_spcs[spc_id] = OG_TRUE;
}
return OG_SUCCESS;
}
status_t bak_set_include_space(knl_session_t *session, bak_t *bak, galist_t *include_spcs)
{
text_t *spc_name = NULL;
uint32 spc_id;
errno_t ret;
ret = memset_sp(bak->include_spcs, sizeof(bool32) * OG_MAX_SPACES, 0, sizeof(bool32) * OG_MAX_SPACES);
knl_securec_check(ret);
for (uint32 i = 0; i < include_spcs->count; i++) {
spc_name = (text_t *)cm_galist_get(include_spcs, i);
if (spc_get_space_id(session, spc_name, OG_TRUE, &spc_id) != OG_SUCCESS) {
return OG_ERROR;
}
bak->include_spcs[spc_id] = OG_TRUE;
}
return OG_SUCCESS;
}
static status_t bak_check_backupset_to_delete(knl_session_t *session, knl_alterdb_backupset_t *def,
bak_record_t *record)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
if (def->force_delete) {
return OG_SUCCESS;
}
if (BAK_IS_UDS_DEVICE(bak)) {
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", backupset device type is not disk, please use delete force");
return OG_ERROR;
}
if (!cm_dir_exist(record->path)) {
OG_THROW_ERROR_EX(ERR_INVALID_OPERATION, ", %s does not exist", record->path);
return OG_ERROR;
}
if (cm_access_file(record->path, R_OK | W_OK | X_OK) != OG_SUCCESS) {
OG_THROW_ERROR_EX(ERR_INVALID_OPERATION, ", %s is not an readable or writable or executable folder",
record->path);
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t bak_check_exist_dependent_backupset(knl_session_t *session, const char *tag)
{
text_t base_tag;
CM_SAVE_STACK(session->stack);
knl_set_session_scn(session, OG_INVALID_ID64);
knl_cursor_t *cursor = knl_push_cursor(session);
knl_open_sys_cursor(session, cursor, CURSOR_ACTION_SELECT, SYS_BACKUP_SET_ID, IX_SYS_BACKUPSET_001_ID);
knl_init_index_scan(cursor, OG_FALSE);
knl_set_key_flag(&cursor->scan_range.l_key, SCAN_KEY_LEFT_INFINITE, IX_COL_SYS_BACKUPSET_001_RECID);
knl_set_key_flag(&cursor->scan_range.r_key, SCAN_KEY_RIGHT_INFINITE, IX_COL_SYS_BACKUPSET_001_RECID);
for (;;) {
if (OG_SUCCESS != knl_fetch(session, cursor)) {
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
if (cursor->eof) {
break;
}
cm_decode_row((char *)cursor->row, cursor->offsets, cursor->lens, NULL);
base_tag.str = CURSOR_COLUMN_DATA(cursor, BAK_COL_BASE_TAG);
base_tag.len = CURSOR_COLUMN_SIZE(cursor, BAK_COL_BASE_TAG);
if (cm_text_str_equal_ins(&base_tag, tag)) {
OG_THROW_ERROR_EX(ERR_INVALID_OPERATION, ", exists backupset depends on the backupset");
CM_RESTORE_STACK(session->stack);
return OG_ERROR;
}
}
CM_RESTORE_STACK(session->stack);
return OG_SUCCESS;
}
static status_t bak_delete_backset_precheck(knl_session_t *session, knl_alterdb_backupset_t *def, bak_record_t *record)
{
if (bak_get_record(session, def->tag, record) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_check_backupset_to_delete(session, def, record) != OG_SUCCESS) {
return OG_ERROR;
}
if (BAK_MODE_IS_INCREMENTAL(record->attr.backup_type) &&
bak_check_exist_dependent_backupset(session, def->tag) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t bak_delete_backup_set(knl_session_t *session, knl_alterdb_backupset_t *def)
{
status_t status = OG_SUCCESS;
bak_record_t record;
bak_context_t *ogx = &session->kernel->backup_ctx;
if (bak_set_running(session, ogx) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_BACKUP_IN_PROGRESS, "backup or delete backupset");
return OG_ERROR;
}
if (bak_delete_backset_precheck(session, def, &record) != OG_SUCCESS) {
bak_unset_running(session, ogx);
return OG_ERROR;
}
if (bak_delete_record(session, def->tag) != OG_SUCCESS) {
bak_unset_running(session, ogx);
OG_LOG_RUN_ERR("[BACKUP] Failed to delete backupset record of %s", record.path);
return OG_ERROR;
}
#ifndef WIN32
if (record.device == DEVICE_DISK && cm_remove_dir(record.path) != OG_SUCCESS) {
if (def->force_delete) {
OG_LOG_RUN_INF("[BACKUP] Delete backupset %s error is ignored ", record.path);
status = OG_SUCCESS;
} else {
OG_THROW_ERROR(ERR_REMOVE_DIR, record.path);
status = OG_ERROR;
}
}
#endif
bak_unset_running(session, ogx);
return status;
}
bool32 bak_datafile_contains_dw(knl_session_t *session, bak_assignment_t *assign_ctrl)
{
if (assign_ctrl->type != BACKUP_DATA_FILE) {
return OG_FALSE;
}
uint32 dw_file_id = knl_get_dbwrite_file_id(session);
datafile_t *df = DATAFILE_GET(session, assign_ctrl->file_id);
return (bool32)(DATAFILE_CONTAINS_DW(df, dw_file_id));
}
* Backup pages before file_hwm_start firstly, to ensure data pages are backed up in group of 8 pages.
* Temporary tablespace datafile only backup space head page.
* Datafile containing dw first backup space head page, and then if is bitmap managed,
* backup map pages before file_hwm_start.
*/
uint64 bak_set_datafile_read_size(knl_session_t *session, uint64 offset, bool32 contains_dw, uint64 file_size,
uint32 hwm_start)
{
uint64 read_size;
if (offset == DEFAULT_PAGE_SIZE(session)) {
if (contains_dw || file_size == SPACE_HEAD_END * DEFAULT_PAGE_SIZE(session)) {
read_size = DEFAULT_PAGE_SIZE(session);
} else {
knl_panic(file_size >= offset);
read_size = hwm_start > 1 ? (hwm_start - 1) * DEFAULT_PAGE_SIZE(session) : file_size - offset;
head page */
}
} else {
if (contains_dw && offset == DW_SPC_HWM_START * DEFAULT_PAGE_SIZE(session) && hwm_start > DW_SPC_HWM_START) {
read_size = (hwm_start - DW_SPC_HWM_START) * DEFAULT_PAGE_SIZE(session);
} else {
knl_panic(file_size >= offset);
read_size = file_size - offset;
}
}
return read_size;
}
#ifndef WIN32
static bool32 bak_is_above_hwm_start(knl_session_t *session, uint32 size)
{
if (size == DEFAULT_PAGE_SIZE(session) || size == (DF_MAP_HWM_START - 1) * DEFAULT_PAGE_SIZE(session)) {
return OG_FALSE;
}
return OG_TRUE;
}
status_t bak_construct_decompress_group(knl_session_t *session, char *first_page)
{
page_head_t *page = NULL;
for (uint32 i = 0; i < PAGE_GROUP_COUNT; i++) {
page = (page_head_t *)(first_page + i * DEFAULT_PAGE_SIZE(session));
if (page->size_units == 0) {
continue;
}
page->compressed = 0;
if (!buf_check_load_page(session, page, *AS_PAGID_PTR(page->id), OG_TRUE)) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
bool32 bak_need_decompress(knl_session_t *session, bak_process_t *bak_proc)
{
bak_context_t *bak_ctx = &session->kernel->backup_ctx;
bak_attr_t *attr = &bak_ctx->bak.record.attr;
uint32 file_id = bak_proc->assign_ctrl.file_id;
datafile_t *df = DATAFILE_GET(session, file_id);
if (attr->level == 0 && DB_IS_CHECKSUM_OFF(session)) {
return OG_FALSE;
}
if (!DATAFILE_IS_COMPRESS(df)) {
return OG_FALSE;
}
if (!bak_is_above_hwm_start(session, bak_proc->read_size)) {
return OG_FALSE;
}
return OG_TRUE;
}
status_t bak_decompress_and_verify_datafile(knl_session_t *session, bak_process_t *bak_proc, bak_buf_data_t *data_buf)
{
bak_context_t *bak_ctx = &session->kernel->backup_ctx;
bak_attr_t *attr = &bak_ctx->bak.record.attr;
uint32 level = attr->level;
uint32 total_size = (uint32)data_buf->data_size;
page_head_t *first_page = NULL;
pcb_assist_t src_pcb_assist;
uint32 group_size;
errno_t ret;
knl_panic_log(total_size % (DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT) == 0,
"buf size %u is not in multiples of 8k", total_size);
if (pcb_get_buf(session, &src_pcb_assist) != OG_SUCCESS) {
return OG_ERROR;
}
for (uint32 i = 0; i * DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT < total_size; i++) {
first_page = (page_head_t *)(data_buf->data_addr + i * DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT);
if (!first_page->compressed) {
continue;
}
if (buf_decompress_group(session, src_pcb_assist.aligned_buf, (const char *)first_page, &group_size) !=
OG_SUCCESS) {
pcb_release_buf(session, &src_pcb_assist);
return OG_ERROR;
}
knl_panic_log(group_size == DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT, "group size %u is not corrent",
group_size);
if (bak_construct_decompress_group(session, src_pcb_assist.aligned_buf) != OG_SUCCESS) {
pcb_release_buf(session, &src_pcb_assist);
return OG_ERROR;
}
* Full backup will back up compressed pages, while decompression is only for verification.
* Incremental backup will back up decompressed single pages.
*/
if (level == 1) {
ret = memcpy_sp(first_page, DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT, src_pcb_assist.aligned_buf,
DEFAULT_PAGE_SIZE(session) * PAGE_GROUP_COUNT);
knl_securec_check(ret);
}
}
pcb_release_buf(session, &src_pcb_assist);
return OG_SUCCESS;
}
page_id_t bak_first_compress_group_id(knl_session_t *session, page_id_t page_id)
{
page_id_t first;
knl_panic_log(page_id.page >= DF_MAP_HWM_START, "page %u-%u before space first extent %u-%u", page_id.file,
page_id.page, page_id.file, DF_MAP_HWM_START);
first.page = page_id.page - ((page_id.page - DF_MAP_HWM_START) % PAGE_GROUP_COUNT);
first.file = page_id.file;
first.aligned = 0;
return first;
}
#endif
* Primary backup and some DDL operations are mutually exclusive,
* and these DDL are also not allowed during standby backup.
* If the standby replays these DDL redo during backup, the backup needs to be set to fail.
*/
static bool32 backup_allow_logic_redo_type(logic_op_t type)
{
switch (type) {
case RD_ADD_LOGFILE:
case RD_DROP_LOGFILE:
case RD_SPC_RENAME_SPACE:
return OG_FALSE;
default:
return OG_TRUE;
}
return OG_TRUE;
}
static bool32 backup_allow_redo_type(log_entry_t *log)
{
logic_op_t *logic_type = (logic_op_t *)log->data;
switch (log->type) {
case RD_SPC_CREATE_SPACE:
case RD_SPC_REMOVE_SPACE:
case RD_SPC_CREATE_DATAFILE:
case RD_SPC_REMOVE_DATAFILE:
case RD_SPC_TRUNCATE_DATAFILE:
return OG_FALSE;
case RD_LOGIC_OPERATION:
return backup_allow_logic_redo_type(*logic_type);
default:
return OG_TRUE;
}
return OG_TRUE;
}
void backup_safe_entry(knl_session_t *session, log_entry_t *log, bool32 *need_unblock_backup)
{
if (!backup_allow_redo_type(log)) {
CM_ABORT(0, "[RCY] ABORT INFO: recover failed when redo %s is not allowed during standby backup",
rcy_redo_name(log));
}
*need_unblock_backup = OG_FALSE;
}
void backup_unsafe_entry(knl_session_t *session, log_entry_t *log, bool32 *need_unblock_backup)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
dls_spin_lock(session, &ogx->lock, NULL);
if (BAK_IS_RUNNING(ogx)) {
ogx->bak.failed = OG_TRUE;
OG_LOG_RUN_ERR("[BACKUP] backup stopped due to replay of unsupported redo %s during standby backup",
rcy_redo_name(log));
}
ogx->bak.rcy_stop_backup = OG_TRUE;
errno_t ret = strcpy_sp(ogx->bak.unsafe_redo, OG_NAME_BUFFER_SIZE, rcy_redo_name(log));
knl_securec_check(ret);
dls_spin_unlock(session, &ogx->lock);
*need_unblock_backup = OG_TRUE;
}
uint32 bak_log_get_id(knl_session_t *session, backup_data_type_t backup_type, uint32 rst_id, uint32 asn)
{
if (backup_type == DATA_TYPE_DBSTOR) {
return OG_INVALID_ID32;
} else {
return log_get_id_by_asn(session, rst_id, asn, NULL);
}
}
uint32 bak_get_rst_id(bak_t *bak, uint32 asn, reset_log_t *rst_log)
{
if (BAK_IS_DBSOTR(bak)) {
return rst_log->rst_id;
} else {
OG_LOG_DEBUG_INF("[BACKUP] last_asn %u, rst_id %u, asn %u.", rst_log->last_asn, rst_log->rst_id, asn);
return asn <= rst_log->last_asn ? (rst_log->rst_id - 1) : rst_log->rst_id;
}
}
void bak_set_data_type(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
bak->record.data_type = knl_dbs_is_enable_dbs() ? DATA_TYPE_DBSTOR : DATA_TYPE_FILE;
}
status_t bak_init_rw_buf(bak_process_t *proc, uint32 buf_size, const char *task)
{
bak_rw_buf_t *rw_buf = &proc->backup_rw_buf;
OG_LOG_RUN_INF("[%s] init aligned buf for read and write process paral.", task);
error_t err = memset_sp(rw_buf, sizeof(bak_rw_buf_t), 0, sizeof(bak_rw_buf_t));
knl_securec_check(err);
if (cm_aligned_malloc(buf_size, "bak rw buf", &rw_buf->aligned_buf) != OG_SUCCESS) {
return OG_ERROR;
}
rw_buf->buf_data[0].data_addr = proc->backup_buf.aligned_buf;
rw_buf->buf_data[1].data_addr = rw_buf->aligned_buf.aligned_buf;
return OG_SUCCESS;
}
status_t bak_get_read_buf(bak_rw_buf_t *rw_buf, bak_buf_data_t **read_buf)
{
if (rw_buf->buf_stat[rw_buf->read_index] == OG_FALSE) {
*read_buf = &rw_buf->buf_data[rw_buf->read_index];
return OG_SUCCESS;
}
return OG_ERROR;
}
void bak_set_read_done(bak_rw_buf_t *rw_buf)
{
rw_buf->buf_stat[rw_buf->read_index] = OG_TRUE;
rw_buf->read_index = !rw_buf->read_index;
}
status_t bak_get_write_buf(bak_rw_buf_t *rw_buf, bak_buf_data_t **write_buf)
{
if (rw_buf->buf_stat[rw_buf->write_index] == OG_TRUE) {
*write_buf = &rw_buf->buf_data[rw_buf->write_index];
return OG_SUCCESS;
}
return OG_ERROR;
}
void bak_set_write_done(bak_rw_buf_t *rw_buf)
{
rw_buf->buf_stat[rw_buf->write_index] = OG_FALSE;
rw_buf->write_index = !rw_buf->write_index;
}
void bak_wait_write_finish(bak_rw_buf_t *rw_buf, bak_process_t *proc)
{
for (uint32 i = 0; i <= 1 && !proc->write_failed && !proc->read_failed;) {
if (rw_buf->buf_stat[i] == OG_TRUE) {
cm_sleep(BAK_WAIT_WRITE_FINISH_TIME);
continue;
}
i++;
}
}
#ifdef __cplusplus
}
#endif