* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* bak_build.c
*
*
* IDENTIFICATION
* src/kernel/backup/bak_build.c
*
* -------------------------------------------------------------------------
*/
#include "knl_backup_module.h"
#include "bak_build.h"
#include "cm_file.h"
#include "cm_kmc.h"
#include "knl_backup.h"
#include "bak_restore.h"
#include "knl_context.h"
#include "dtc_database.h"
static void bak_set_build_params(bak_t *bak, cs_pipe_t *pipe, cs_packet_t *send_pack, cs_packet_t *recv_pack,
build_param_ctrl_t *ctrl)
{
bak_remote_t *remote = &bak->remote;
bak_attr_t *attr = &bak->record.attr;
char ipstr[CM_MAX_IP_LEN];
bak->record.start_time = (uint64)cm_now();
bak->is_building = OG_TRUE;
bak->record.log_only = OG_FALSE;
bak->record.data_only = OG_FALSE;
bak->record.is_repair = ctrl->is_repair;
bak->record.is_increment = ctrl->is_increment;
attr->backup_type = BACKUP_MODE_FULL;
attr->level = (ctrl->is_increment || ctrl->is_repair) ? OG_TRUE : OG_FALSE;
attr->base_lsn = ctrl->base_lsn;
attr->compress = ctrl->compress;
bak->compress_ctx.compress_level = ctrl->compress_level;
bak->encrypt_info.encrypt_alg = ENCRYPT_NONE;
bak->record.device = DEVICE_DISK;
bak->cumulative = OG_FALSE;
bak->is_first_link = OG_TRUE;
bak->need_retry = OG_FALSE;
bak->need_check = OG_FALSE;
bak->failed = OG_FALSE;
bak->proc_count = ctrl->parallelism;
bak->backup_buf_size = ctrl->buffer_size;
remote->pipe = pipe;
remote->send_pack = send_pack;
remote->recv_pack = recv_pack;
errno_t ret = strncpy_s(bak->peer_host, OG_HOST_NAME_BUFFER_SIZE, cm_inet_ntop(
(struct sockaddr *)&pipe->link.tcp.remote.addr, ipstr, CM_MAX_IP_LEN), OG_HOST_NAME_BUFFER_SIZE - 1);
knl_securec_check(ret);
OG_LOG_RUN_INF("[BUILD] is increment %u, base lsn %llu ", ctrl->is_increment, ctrl->base_lsn);
}
bool32 knl_brain_repair_check(knl_session_t *session)
{
reset_log_t *reset_log = &session->kernel->lrcv_ctx.primary_resetlog;
knl_instance_t *kernel = (knl_instance_t *)session->kernel;
log_point_t *rcy = &dtc_my_ctrl(session)->rcy_point;
if ((kernel->lrcv_ctx.dbid != 0) && (kernel->lrcv_ctx.dbid != kernel->db.ctrl.core.dbid)) {
OG_LOG_RUN_ERR("[REPAIR] primary dbid [%u] is not equal to standby dbid [%u], need repair",
kernel->lrcv_ctx.dbid, kernel->db.ctrl.core.dbid);
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", primary dbid is not equal to standby dbid.");
return OG_FALSE;
}
if (reset_log->last_asn > rcy->asn || rcy->rst_id >= reset_log->rst_id) {
OG_LOG_RUN_ERR("[REPAIR] reset_log and rcy point not matched. reset_log rst_id-asn-lfn: %u-%u-%llu,"
"rcy point rst_id-asn-lfn: %u-%u-%llu.", reset_log->rst_id, reset_log->last_asn, reset_log->last_lfn,
rcy->rst_id, rcy->asn, (uint64)rcy->lfn);
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", asn of reset_log is larger than asn of rcy or rst_id of rcy is larger "
"than rst_id of reset_log.");
return OG_FALSE;
}
uint32 log_count = log_get_count(session);
if (rcy->asn - reset_log->last_asn >= log_count) {
OG_LOG_RUN_ERR("[REPAIR] the count of log between rcy(asn: %u) and reset_log(asn: %u) is larger than "
"log_count %u.", rcy->asn, reset_log->last_asn, log_count);
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", log generated by standby is too large.");
return OG_FALSE;
}
if (reset_log->rst_id - rcy->rst_id > 1) {
OG_LOG_RUN_ERR("[REPAIR] reset_log and rcy point not matched. reset_log rst_id-asn-lfn: %u-%u-%llu,"
"rcy point rst_id-asn-lfn: %u-%u-%llu.", reset_log->rst_id, reset_log->last_asn, reset_log->last_lfn,
rcy->rst_id, rcy->asn, (uint64)rcy->lfn);
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", rst_id of reset_log and rcy is not mapped.");
return OG_FALSE;
}
if (reset_log->last_lfn > rcy->lfn) {
OG_LOG_RUN_ERR("[REPAIR] primary lfn [%llu] is larger than standby rcy lfn [%llu]", reset_log->last_lfn, (uint64)rcy->lfn);
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", primary lfn is larger than standby rcy lfn.");
return OG_FALSE;
}
return OG_TRUE;
}
static status_t build_analyze_mem_init(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
build_analyse_bucket_t *free_list = &bak->build_aly_free_list;
int64 buf_size = BUILD_ALY_MAX_PAGE_SIZE + BUILD_ALY_MAX_ITEM_SIZE + BUILD_ALY_MAX_BUCKET_SIZE;
errno_t ret;
if (cm_aligned_malloc(buf_size, "build log analysis", &bak->build_aly_mem) != OG_SUCCESS) {
return OG_ERROR;
}
bak->build_aly_pages = (page_id_t *)bak->build_aly_mem.aligned_buf;
bak->build_aly_items = (build_analyse_item_t *)(bak->build_aly_mem.aligned_buf + BUILD_ALY_MAX_PAGE_SIZE);
bak->build_aly_buckets = (build_analyse_bucket_t *)(bak->build_aly_mem.aligned_buf + BUILD_ALY_MAX_PAGE_SIZE +
BUILD_ALY_MAX_ITEM_SIZE);
ret = memset_sp(bak->build_aly_mem.aligned_buf, (size_t)buf_size, 0, (size_t)buf_size);
knl_securec_check(ret);
free_list->count = BUILD_ALY_MAX_ITEM;
free_list->first = &bak->build_aly_items[0];
for (uint32 i = 0; i < BUILD_ALY_MAX_ITEM - 1; i++) {
bak->build_aly_items[i].next = &bak->build_aly_items[i + 1];
bak->build_aly_items[i].page_id = &bak->build_aly_pages[i];
}
bak->build_aly_items[BUILD_ALY_MAX_ITEM - 1].page_id = &bak->build_aly_pages[BUILD_ALY_MAX_ITEM - 1];
bak->page_count = 0;
if (cm_aligned_malloc(OG_MAX_BATCH_SIZE, "build analyze log buffer", &bak->log_buf) != OG_SUCCESS) {
cm_aligned_free(&bak->build_aly_mem);
return OG_ERROR;
}
return OG_SUCCESS;
}
static void build_analyze_mem_free(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
cm_aligned_free(&bak->build_aly_mem);
bak->build_aly_pages = NULL;
bak->build_aly_items = NULL;
bak->build_aly_buckets = NULL;
bak->page_count = 0;
cm_aligned_free(&bak->log_buf);
}
build_analyse_item_t *build_analyze_pop_free_item(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
build_analyse_bucket_t *free_list = &bak->build_aly_free_list;
build_analyse_item_t *item = NULL;
if (free_list->first == NULL) {
return NULL;
}
item = free_list->first;
free_list->first = item->next;
free_list->count--;
item->next = NULL;
return item;
}
static void brain_repair_invalidate_page(bak_t *bak, page_id_t *page_id)
{
uint32 file_hash = page_id->file % BUILD_ALY_MAX_FILE;
uint32 page_hash = page_id->page % BUILD_ALY_MAX_BUCKET_PER_FILE;
build_analyse_bucket_t *bucket = &bak->build_aly_buckets[file_hash * BUILD_ALY_MAX_BUCKET_PER_FILE + page_hash];
build_analyse_item_t *item = bucket->first;
while (item != NULL) {
if (IS_SAME_PAGID(*item->page_id, *page_id)) {
item->page_id->file = OG_INVALID_FILEID;
break;
}
item = item->next;
}
}
void brain_repair_filer_page_from_remote(knl_session_t *session, page_head_t *page, uint32 page_count)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
page_head_t *page_head = page;
if (!bak->record.is_repair) {
return;
}
char *page_buff = NULL;
for (uint32 i = 0; i < page_count; i++) {
page_id_t *page_id = AS_PAGID_PTR(page_head->id);
brain_repair_invalidate_page(bak, page_id);
page_buff = (char*)page_head + DEFAULT_PAGE_SIZE(session);
page_head = (page_head_t *)page_buff;
}
return;
}
static status_t build_aly_add_page(knl_session_t *session, page_id_t page_id)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
uint32 file_hash = page_id.file % BUILD_ALY_MAX_FILE;
uint32 page_hash = page_id.page % BUILD_ALY_MAX_BUCKET_PER_FILE;
build_analyse_bucket_t *bucket = &bak->build_aly_buckets[file_hash * BUILD_ALY_MAX_BUCKET_PER_FILE + page_hash];
build_analyse_item_t *item = bucket->first;
while (item != NULL) {
if (IS_SAME_PAGID(*item->page_id, page_id)) {
return OG_SUCCESS;
}
item = item->next;
}
build_analyse_item_t *new_item = build_analyze_pop_free_item(session);
if (new_item == NULL) {
OG_LOG_RUN_ERR("[BUILD] pages changed by standby is too much.");
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", pages changed by standby is too much.");
return OG_ERROR;
}
OG_LOG_DEBUG_INF("[REPAIR] standby find page %u-%u", page_id.file, page_id.page);
new_item->next = bucket->first;
*new_item->page_id = page_id;
bucket->first = new_item;
bucket->count++;
bak->page_count++;
return OG_SUCCESS;
}
static void build_analyze_construct_hash(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
build_analyse_item_t *item = NULL;
build_analyse_bucket_t *bucket = NULL;
uint32 file_hash;
uint32 page_hash;
for (uint32 i = 0; i < bak->page_count; i++) {
item = &bak->build_aly_items[i];
file_hash = item->page_id->file % BUILD_ALY_MAX_FILE;
page_hash = item->page_id->page % BUILD_ALY_MAX_BUCKET_PER_FILE;
bucket = &bak->build_aly_buckets[file_hash * BUILD_ALY_MAX_BUCKET_PER_FILE + page_hash];
item->next = bucket->first;
bucket->first = item;
bucket->count++;
}
}
static status_t build_find_batch(knl_session_t *session, device_type_t type, int32 *handle, log_file_head_t *head,
log_point_t *point, uint32 *data_size, aligned_buf_t *align_buf)
{
uint64 size;
uint64 remain_size;
uint64 buf_size = (uint64)align_buf->buf_size;
bool32 finished = OG_FALSE;
log_batch_t *batch = NULL;
log_batch_tail_t *tail = NULL;
char *buf = align_buf->aligned_buf;
uint64 last_lfn = 0;
int64 file_size = cm_file_size(*handle);
if (file_size == -1) {
OG_LOG_RUN_ERR("[BUILD] failed to get log [%u-%u] size ", point->rst_id, point->asn);
OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
return OG_ERROR;
}
uint64 offset = CM_CALC_ALIGN(sizeof(log_file_head_t), (uint32)head->block_size);
do {
size = (uint64)file_size - offset;
size = size > buf_size ? buf_size : size;
if (size == 0 || finished) {
break;
}
if (cm_read_device(type, *handle, (int64)offset, buf, (int32)size) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[BUILD] Failed to read log [%u-%u] file", point->rst_id, point->asn);
return OG_ERROR;
}
batch = (log_batch_t *)buf;
tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
remain_size = size;
while (remain_size >= sizeof(log_batch_t)) {
if (!rcy_validate_batch(batch, tail) || batch->head.point.rst_id != head->rst_id ||
(last_lfn != 0 && batch->head.point.lfn != last_lfn + 1)) {
finished = OG_TRUE;
OG_LOG_RUN_INF("[BUILD] log [%u-%u] offset %llu invalid batch size %u "
"head [%llu/%u-%u/%llu/%llu] latest_lfn %llu",
head->rst_id, head->asn, offset, batch->size, batch->head.magic_num,
batch->head.point.rst_id, batch->head.point.asn,
(uint64)batch->head.point.lfn, batch->raft_index, last_lfn);
break;
}
last_lfn = batch->head.point.lfn;
if (batch->head.point.lfn == point->lfn) {
finished = OG_TRUE;
if (session->kernel->lrcv_ctx.primary_reset_log_scn == batch->scn ||
session->kernel->lrcv_ctx.primary_reset_log_scn == 0) {
*data_size = batch->size;
} else {
*data_size = 0;
OG_LOG_RUN_ERR("[BUILD] For same lfn [%llu] in resetlog, scn from primary and standby [%llu/%llu] "
"is different, break-brain build is not allowed", (uint64)point->lfn,
session->kernel->lrcv_ctx.primary_reset_log_scn, batch->scn);
}
break;
}
offset += batch->space_size;
remain_size -= batch->space_size;
batch = (log_batch_t *)((char *)batch + batch->space_size);
if (remain_size < batch->space_size) {
break;
}
tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
}
} while (1);
if (*data_size != 0) {
errno_t ret = memmove_s(buf, (uint32)buf_size, batch, batch->size);
knl_securec_check(ret);
}
return OG_SUCCESS;
}
static status_t build_load_from_online(knl_session_t *session, uint32 file_id, log_point_t *point, uint32 *data_size,
aligned_buf_t *align_buf)
{
log_file_head_t head;
int32 handle = OG_INVALID_HANDLE;
log_file_t *file = &session->kernel->redo_ctx.files[file_id];
if (cm_open_device(file->ctrl->name, file->ctrl->type, knl_redo_io_flag(session), &handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[BUILD] failed to open %s", file->ctrl->name);
return OG_ERROR;
}
uint64 size = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
log_file_head_t *tmp_head = (log_file_head_t *)align_buf->aligned_buf;
if (cm_read_device(file->ctrl->type, handle, 0, tmp_head, (int32)size) != OG_SUCCESS) {
cm_close_device(file->ctrl->type, &handle);
OG_LOG_RUN_ERR("[BUILD] failed to read %s ", file->ctrl->name);
return OG_ERROR;
}
head.block_size = tmp_head->block_size;
head.rst_id = tmp_head->rst_id;
head.asn = tmp_head->asn;
if (build_find_batch(session, file->ctrl->type, &handle, &head, point, data_size, align_buf) != OG_SUCCESS) {
cm_close_device(file->ctrl->type, &handle);
return OG_ERROR;
}
cm_close_device(file->ctrl->type, &handle);
return OG_SUCCESS;
}
static status_t build_load_from_arch(knl_session_t *session, log_point_t *point, uint32 *data_size, aligned_buf_t *align_buf)
{
arch_file_t arch_file = { .name = { 0 }, .handle = OG_INVALID_HANDLE };
bool32 is_compress = OG_FALSE;
if (rcy_load_arch(session, (uint32)point->rst_id, point->asn, &arch_file, &is_compress) != OG_SUCCESS) {
cm_close_device(cm_device_type(arch_file.name), &arch_file.handle);
rcy_close_file(session);
return OG_ERROR;
}
if (build_find_batch(session, cm_device_type(arch_file.name), &arch_file.handle, &arch_file.head, point, data_size,
align_buf) != OG_SUCCESS) {
cm_close_device(cm_device_type(arch_file.name), &arch_file.handle);
rcy_close_file(session);
return OG_ERROR;
}
cm_close_device(cm_device_type(arch_file.name), &arch_file.handle);
return OG_SUCCESS;
}
static status_t build_load_batch(knl_session_t *session, log_point_t *point, uint32 *data_size, aligned_buf_t *buf)
{
status_t status = OG_SUCCESS;
knl_instance_t *kernel = session->kernel;
bak_context_t *ogx = &kernel->backup_ctx;
bak_t *bak = &ogx->bak;
log_lock_logfile(session);
uint32 file_id = bak_log_get_id(session, bak->record.data_type, (uint32)point->rst_id, point->asn);
log_unlock_logfile(session);
if (file_id != OG_INVALID_ID32) {
status = build_load_from_online(session, file_id, point, data_size, buf);
log_unlatch_file(session, file_id);
} else {
status = build_load_from_arch(session, point, data_size, buf);
}
return status;
}
static status_t bak_build_prepare(knl_session_t *session)
{
struct st_knl_instance *kernel = session->kernel;
bak_context_t *ogx = &kernel->backup_ctx;
bak_t *bak = &ogx->bak;
bak_remote_t *remote = &bak->remote;
* malloc a memory space to store four parts as follows:
* |--bak->backup_buf--------|--bak->compress_buf------|--remote->send_pack----|--remote->recv_pack----|
* |--BACKUP_BUFFER_SIZE(bak)--|--BACKUP_BUFFER_SIZE(bak)--|--sizeof(cs_packet_t)--|--sizeof(cs_packet_t)--|
* so the total space size is BACKUP_BUFFER_SIZE(bak) * 2 and sizeof(cs_packet_t) * 2
*/
size_t size = BACKUP_BUFFER_SIZE(bak) + BACKUP_BUFFER_SIZE(bak) + sizeof(cs_packet_t) + sizeof(cs_packet_t);
char *buf = (char *)malloc(size);
if (buf == NULL) {
OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint64)size, "backup");
return OG_ERROR;
}
size_t offset = 0;
bak->backup_buf = buf + offset;
offset += BACKUP_BUFFER_SIZE(bak);
bak_reset_process_ctrl(bak, OG_TRUE);
bak->compress_buf = buf + offset;
offset += BACKUP_BUFFER_SIZE(bak);
* between the memory addresses of remote->send_pack and bak->backup_buf are bak->backup_buf and bak->compress_buf,
* so remote->send_pack is offset by 2 * OG_BACKUP_BUFFER_SIZE compared to bak->backup_buf
*/
remote->send_pack = (cs_packet_t *)(buf + offset);
offset += sizeof(cs_packet_t);
* between the memory addresses of remote->recv_pack and bak->backup_buf are bak->backup_buf,bak->compress_buf and
* remote->send_pack, so remote->recv_pack is offset by 2 * OG_BACKUP_BUFFER_SIZE and sizeof(cs_packet_t) compared
* to bak->backup_buf
*/
remote->recv_pack = (cs_packet_t *)(buf + offset);
remote->pipe = &remote->send_pipe;
remote->pipe->link.tcp.sock = CS_INVALID_SOCKET;
bak->record.start_time = (uint64)cm_now();
bak->record.attr.backup_type = BACKUP_MODE_FULL;
bak->is_building = OG_TRUE;
bak->ctrl_data_buf = (char *)malloc(CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE);
if (bak->ctrl_data_buf == NULL) {
OG_THROW_ERROR(ERR_ALLOC_MEMORY, (uint32)(CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE), "build");
return OG_ERROR;
}
if (bak->proc_count > BUILD_SINGLE_THREAD) {
if (cm_aligned_malloc(BACKUP_BUFFER_SIZE(bak), "rst stream buf0", &bak->recv_stream.bufs[0]) != OG_SUCCESS) {
free(bak->ctrl_data_buf);
bak->ctrl_data_buf = NULL;
return OG_ERROR;
}
if (cm_aligned_malloc(BACKUP_BUFFER_SIZE(bak), "rst stream buf1", &bak->recv_stream.bufs[1]) != OG_SUCCESS) {
free(bak->ctrl_data_buf);
bak->ctrl_data_buf = NULL;
cm_aligned_free(&bak->recv_stream.bufs[0]);
return OG_ERROR;
}
bak->recv_stream.buf_size = BACKUP_BUFFER_SIZE(bak);
}
if (rst_start_write_thread(&ogx->process[BAK_COMMON_PROC]) != OG_SUCCESS) {
free(bak->ctrl_data_buf);
bak->ctrl_data_buf = NULL;
cm_aligned_free(&bak->recv_stream.bufs[0]);
cm_aligned_free(&bak->recv_stream.bufs[1]);
return OG_ERROR;
}
return OG_SUCCESS;
}
void bak_build_end(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
bak_error_t *error_info = &bak->error_info;
OG_LOG_RUN_INF("[BUILD] remote backup finished, release resources");
for (int32 i = OG_MAX_BACKUP_PROCESS - 1; i >= 0; i--) {
bak_reset_process(&ogx->process[i]);
}
if (bak->backup_buf != NULL) {
free(bak->backup_buf);
bak->backup_buf = NULL;
bak->depends = NULL;
bak->compress_buf = NULL;
bak->ctrl_backup_buf = NULL;
}
if (bak->ctrl_data_buf != NULL) {
free(bak->ctrl_data_buf);
bak->ctrl_data_buf = NULL;
}
bak_free_compress_context(session, OG_FALSE);
if (bak->proc_count > BUILD_SINGLE_THREAD) {
cm_aligned_free(&bak->recv_stream.bufs[0]);
cm_aligned_free(&bak->recv_stream.bufs[1]);
bak->recv_stream.buf_size = 0;
}
cm_close_device(bak->local.type, &bak->local.handle);
bak->local.handle = OG_INVALID_HANDLE;
if (bak->failed && !bak->need_retry) {
bak_set_fail_error(error_info, "build");
OG_LOG_RUN_INF("[BUILD] build failed, error code :%d, %s", error_info->err_code, error_info->err_msg);
}
build_disconnect(bak);
bak->failed = OG_FALSE;
bak->is_building = OG_FALSE;
bak->record.is_increment = OG_FALSE;
bak->record.is_repair = OG_FALSE;
if (rst_delete_track_file(session, bak, OG_TRUE) != OG_SUCCESS) {
OG_LOG_RUN_WAR("[BUILD] failed to delete bak process file.");
}
bak_unset_running(session, ogx);
}
static status_t bak_build_connect(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
bak_remote_t *remote = &bak->remote;
uint16 port;
char url[OG_HOST_NAME_BUFFER_SIZE + OG_TCP_PORT_MAX_LENGTH + 1] = { 0 };
char server_host[OG_HOST_NAME_BUFFER_SIZE];
char bind_host[OG_HOST_NAME_BUFFER_SIZE];
int32 ret;
if (lrcv_get_primary_server(session, OG_BACKUP_RETRY_COUNT, server_host, OG_HOST_NAME_BUFFER_SIZE,
&port) != OG_SUCCESS) {
return OG_ERROR;
}
ret = snprintf_s(url, sizeof(url), sizeof(url) - 1, "%s:%u", server_host, (uint32)port);
knl_securec_check_ss(ret);
OG_LOG_RUN_INF("[BUILD] remote backup, standby connect to primary %s", url);
remote->pipe->connect_timeout = REPL_CONNECT_TIMEOUT;
remote->pipe->socket_timeout = REPL_SOCKET_TIMEOUT;
arch_get_bind_host(session, server_host, bind_host, OG_HOST_NAME_BUFFER_SIZE);
if (cs_connect(url, remote->pipe, bind_host, NULL, NULL) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[BACKUP] failed to connect %s", url);
return OG_ERROR;
}
if (knl_login(session, remote->pipe, REP_LOGIN_BACKUP, (const char *)bind_host, NULL) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[BACKUP] login failed");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t bak_build_start(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
if (arch_init(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_alloc_compress_context(session, OG_FALSE) != OG_SUCCESS) {
bak->failed = OG_TRUE;
return OG_ERROR;
}
if (bak_build_prepare(session) != OG_SUCCESS) {
bak->failed = OG_TRUE;
return OG_ERROR;
}
for (uint32 i = 0; i < BUILD_MAX_RETRY; i++) {
if (bak_build_connect(session) == OG_SUCCESS) {
return OG_SUCCESS;
}
}
bak->need_retry = OG_TRUE;
return OG_ERROR;
}
static bak_file_type_t rst_calc_file_type(uint32 type)
{
bak_file_type_t file_type;
switch (type) {
case BAK_MSG_TYPE_CTRL:
file_type = BACKUP_CTRL_FILE;
break;
case BAK_MSG_TYPE_DATA:
file_type = BACKUP_DATA_FILE;
break;
case BAK_MSG_TYPE_ARCH:
file_type = BACKUP_ARCH_FILE;
break;
case BAK_MSG_TYPE_LOG:
file_type = BACKUP_LOG_FILE;
break;
default:
file_type = BACKUP_HEAD_FILE;
break;
}
return file_type;
}
static inline bool32 rst_restart_from_file_end(bak_t *bak, bak_start_msg_t *start_msg)
{
return (BAK_IS_FULL_BUILDING(bak) && bak->need_check && bak->file_count != start_msg->curr_file_index);
}
static status_t rst_read_start_msg(knl_session_t *session, bak_start_msg_t *start_msg)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
bool32 read_end = OG_FALSE;
if (rst_agent_read_head(bak, BAK_PKG_FILE_START, &bak->remote.remain_data_size, &read_end) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_recv(bak, (char *)start_msg, sizeof(bak_start_msg_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
OG_LOG_RUN_INF("[RESTORE]: type:%u, file_id:%u, curr_file_index:%u", start_msg->type, start_msg->file_id,
start_msg->curr_file_index);
if (start_msg->type != BAK_MSG_TYPE_HEAD) {
if (rst_restart_from_file_end(bak, start_msg)) {
OG_LOG_RUN_INF("[BUILD] break-point build restart from end of file [%u/%u]", bak->file_count,
start_msg->curr_file_index);
if (bak->file_count != start_msg->curr_file_index - 1) {
OG_LOG_RUN_INF("[BUILD] break-point build restart from unexpected position");
return OG_ERROR;
}
bak->file_count++;
}
bak->need_check = OG_FALSE;
bak->files[bak->file_count].type = rst_calc_file_type(start_msg->type);
bak->files[bak->file_count].id = start_msg->file_id;
bak->curr_file_index = bak->file_count;
bak->file_count++;
}
return OG_SUCCESS;
}
static bak_stage_t rst_calc_stage(uint32 type)
{
bak_stage_t stage;
switch (type) {
case BAK_MSG_TYPE_CTRL:
stage = BACKUP_CTRL_STAGE;
break;
case BAK_MSG_TYPE_DATA:
stage = BACKUP_DATA_STAGE;
break;
case BAK_MSG_TYPE_HEAD:
stage = BACKUP_HEAD_STAGE;
break;
default:
stage = BACKUP_LOG_STAGE;
break;
}
return stage;
}
static status_t rst_build_receive_files(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
bak_start_msg_t start_msg;
bak_stage_t stage = BACKUP_CTRL_STAGE;
bak->head_is_built = OG_FALSE;
while (!bak->failed && stage != BACKUP_HEAD_STAGE) {
if (bak_check_session_status(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_read_start_msg(session, &start_msg) != OG_SUCCESS) {
return OG_ERROR;
}
stage = rst_calc_stage(start_msg.type);
if (stage == BACKUP_HEAD_STAGE) {
bak->remote.remain_data_size = 0;
bak->progress.stage = BACKUP_READ_FINISHED;
break;
}
if (bak->progress.stage != stage) {
bak->progress.stage = stage;
}
bak_file_type_t type = rst_calc_file_type(start_msg.type);
if (type >= BACKUP_DATA_FILE && type <= BACKUP_ARCH_FILE && bak_paral_task_enable(session)) {
if (rst_stream_read_file(session) != OG_SUCCESS) {
return OG_ERROR;
}
} else {
if (rst_read_file(session, bak->curr_file_index) != OG_SUCCESS) {
return OG_ERROR;
}
}
if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
}
return bak->failed ? OG_ERROR : OG_SUCCESS;
}
static status_t rst_build_keyfile(knl_session_t *session, bak_head_t *head, int32 read_size)
{
return OG_SUCCESS;
}
static status_t rst_build_head(knl_session_t *session)
{
bak_context_t *ctx = &session->kernel->backup_ctx;
bak_t *bak = &ctx->bak;
int32 read_size;
bool32 read_end = OG_FALSE;
int32 buf_size = sizeof(bak_head_t);
bak_head_t *head = (bak_head_t *)bak->backup_buf;
int32 expect_size = (int32)sizeof(bak_head_t);
if (rst_read_data(ctx, (void *)head, buf_size, &read_size, &read_end, 0) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_read_check_size(read_size, expect_size, "backupset head") != OG_SUCCESS) {
return OG_ERROR;
}
if (bak->is_building) {
if (rst_build_keyfile(session, head, read_size) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[BUILD] rst build keyfile failed.");
return OG_ERROR;
}
}
if (rst_set_head(session, head, OG_FALSE) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_wait_agent_process(bak) != OG_SUCCESS) {
return OG_ERROR;
}
bak->head_is_built = OG_TRUE;
return OG_SUCCESS;
}
static status_t build_analyze_entry(knl_session_t *session, log_entry_t *log)
{
if (RD_TYPE_IS_ENTER_PAGE(log->type)) {
rd_enter_page_t *redo = (rd_enter_page_t *)log->data;
page_id_t page_id = MAKE_PAGID(redo->file, redo->page);
if (build_aly_add_page(session, page_id) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
static status_t build_analyze_group(knl_session_t *session, log_group_t *group)
{
uint32 offset = sizeof(log_group_t);
log_entry_t *log = NULL;
while (offset < LOG_GROUP_ACTUAL_SIZE(group)) {
log = (log_entry_t *)((char *)group + offset);
if (build_analyze_entry(session, log) != OG_SUCCESS) {
return OG_ERROR;
}
offset += log->size;
}
return OG_SUCCESS;
}
static status_t build_analyze_batch(knl_session_t *session, log_batch_t *batch)
{
knl_session_t *se = session->kernel->sessions[SESSION_ID_KERNEL];
log_context_t *ogx = &session->kernel->redo_ctx;
log_cursor_t cursor;
rcy_init_log_cursor(&cursor, batch);
log_group_t *group = log_fetch_group(ogx, &cursor);
while (group != NULL) {
if (build_analyze_group(se, group) != OG_SUCCESS) {
return OG_ERROR;
}
group = log_fetch_group(ogx, &cursor);
}
return OG_SUCCESS;
}
static inline void build_next_file(log_point_t *point, bool32 *need_more_log)
{
point->asn++;
point->block_id = 0;
*need_more_log = OG_TRUE;
}
static bool32 build_check_batch(log_point_t *point, log_batch_t *batch, bool32 *need_more_log)
{
if (batch->head.magic_num != LOG_MAGIC_NUMBER || !LFN_IS_CONTINUOUS(batch->head.point.lfn, point->lfn)) {
build_next_file(point, need_more_log);
return OG_FALSE;
}
return OG_TRUE;
}
static status_t build_analyze(knl_session_t *session, log_point_t *point, log_batch_t *batch, uint32 block_size,
uint32 data_size_input, bool32 *need_more_log)
{
uint32 data_size = data_size_input;
log_context_t *ogx = &session->kernel->redo_ctx;
log_batch_tail_t *tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
bool32 first_batch = OG_TRUE;
if (data_size == 0) {
build_next_file(point, need_more_log);
return OG_SUCCESS;
}
while (data_size >= sizeof(log_batch_id_t)) {
if (!build_check_batch(point, batch, need_more_log)) {
return OG_SUCCESS;
}
if (data_size < batch->space_size) {
if (first_batch) {
*need_more_log = OG_FALSE;
OG_LOG_RUN_ERR("[BUILD] analyze failed, invalid batch(%u,%u).", point->asn, point->block_id);
return OG_ERROR;
}
*need_more_log = OG_TRUE;
return OG_SUCCESS;
}
if (!rcy_validate_batch(batch, tail)) {
*need_more_log = OG_FALSE;
OG_LOG_RUN_ERR("[BUILD] analyze failed, invalid batch(%u,%u).", point->asn, point->block_id);
return OG_ERROR;
}
first_batch = OG_FALSE;
if (rcy_verify_checksum(session, batch) != OG_SUCCESS) {
*need_more_log = OG_FALSE;
OG_LOG_RUN_ERR("[BUILD] analyze failed");
return OG_ERROR;
}
if (batch->encrypted) {
if (log_decrypt(session, batch, ogx->logwr_cipher_buf, ogx->logwr_cipher_buf_size) != OG_SUCCESS) {
*need_more_log = OG_FALSE;
OG_LOG_RUN_ERR("[BUILD] analyze failed");
return OG_ERROR;
}
}
if (build_analyze_batch(session, batch) != OG_SUCCESS) {
*need_more_log = OG_FALSE;
OG_LOG_RUN_ERR("[BUILD] analyze failed");
return OG_ERROR;
}
data_size -= batch->space_size;
point->lfn = batch->head.point.lfn;
point->block_id += batch->space_size / block_size;
batch = (log_batch_t *)((char *)batch + batch->space_size);
tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
}
*need_more_log = OG_TRUE;
return OG_SUCCESS;
}
static status_t build_analyze_redo(knl_session_t *session, log_point_t *curr_point)
{
aligned_buf_t log_buf = session->kernel->backup_ctx.bak.log_buf;
log_batch_t *batch = NULL;
bool32 need_more_log = OG_FALSE;
uint32 data_size = 0;
uint32 block_size;
while (bak_load_log_batch(session, curr_point, &data_size, &log_buf, &block_size) == OG_SUCCESS) {
batch = (log_batch_t *)log_buf.aligned_buf;
if (build_analyze(session, curr_point, batch, block_size, data_size, &need_more_log) != OG_SUCCESS) {
cm_aligned_free(&log_buf);
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", analyze log failed");
return OG_ERROR;
}
if (!need_more_log) {
break;
}
}
return OG_SUCCESS;
}
static status_t build_repair_prepare(knl_session_t *session, uint64 *base_lsn)
{
reset_log_t resetlog = session->kernel->lrcv_ctx.primary_resetlog;
aligned_buf_t log_buf = session->kernel->backup_ctx.bak.log_buf;
log_point_t target_point;
uint32 data_size = 0;
log_point_t *lrp = &dtc_my_ctrl(session)->lrp_point;
target_point.rst_id = resetlog.rst_id - 1;
target_point.asn = resetlog.last_asn;
target_point.lfn = resetlog.last_lfn;
target_point.block_id = 0;
OG_LOG_RUN_INF("[REPAIR] analysis primary resetlog rstid-asn-lfn: %u-%u-%llu", resetlog.rst_id, resetlog.last_asn,
resetlog.last_lfn);
if (log_load(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (build_load_batch(session, &target_point, &data_size, &log_buf) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", can not find resetlog point in standby.");
return OG_ERROR;
}
if (data_size != 0) {
log_batch_t *batch = (log_batch_t *)log_buf.aligned_buf;
*base_lsn = rcy_fetch_batch_lsn(session, batch);
target_point.block_id = batch->head.point.block_id;
} else {
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", can not find resetlog point in standby.");
return OG_ERROR;
}
target_point.lfn -= 1;
OG_LOG_RUN_INF("[REPAIR] analysis start point rstid-asn-lfn: %u-%u-%llu", target_point.rst_id, target_point.asn,
(uint64)target_point.lfn);
if (build_analyze_redo(session, &target_point) != OG_SUCCESS) {
return OG_ERROR;
}
OG_LOG_RUN_INF("[REPAIR] analysis end point rstid-asn-lfn: %u-%u-%llu", target_point.rst_id, target_point.asn,
(uint64)target_point.lfn);
if (target_point.lfn < lrp->lfn) {
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", analysis end lfn is smaller than lrp lfn.");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t rst_build_get_base_lsn(knl_session_t *session, uint64 *base_lsn)
{
log_point_t start_point = dtc_my_ctrl(session)->rcy_point;
log_batch_t *batch = NULL;
log_batch_tail_t *tail = NULL;
uint32 data_size = 0;
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
database_t *db = &session->kernel->db;
reset_log_t rst_log = db->ctrl.core.resetlogs;
aligned_buf_t log_buf;
uint32 block_size;
if (log_load(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (dtc_my_ctrl(session)->rcy_point.lfn == dtc_my_ctrl(session)->lrp_point.lfn) {
*base_lsn = DB_CURR_LSN(session);
return OG_SUCCESS;
}
if (cm_aligned_malloc(OG_MAX_BATCH_SIZE, "backup log buffer", &log_buf) != OG_SUCCESS) {
return OG_ERROR;
}
for (;;) {
if (bak_load_log_batch(session, &start_point, &data_size, &log_buf, &block_size) != OG_SUCCESS) {
cm_aligned_free(&log_buf);
return OG_ERROR;
}
batch = (log_batch_t *)log_buf.aligned_buf;
if (data_size >= sizeof(log_batch_t) && data_size >= batch->size) {
tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
if (rcy_validate_batch(batch, tail)) {
break;
}
}
start_point.asn++;
start_point.rst_id = bak_get_rst_id(bak, start_point.asn, &(rst_log));
start_point.block_id = 0;
}
*base_lsn = rcy_fetch_batch_lsn(session, batch);
cm_aligned_free(&log_buf);
return OG_SUCCESS;
}
static bool32 rst_logfile_not_changed(knl_instance_t *kernel, log_file_ctrl_t *log_old, log_file_ctrl_t *log_new)
{
char name[OG_FILE_NAME_BUFFER_SIZE];
errno_t ret;
ret = strcpy_sp(name, OG_FILE_NAME_BUFFER_SIZE, log_new->name);
knl_securec_check(ret);
if (LOG_IS_DROPPED(log_old->flg) != LOG_IS_DROPPED(log_new->flg)) {
OG_LOG_RUN_ERR("[BUILD] primary's log flag has changed old: %u, new: %u", log_old->flg, log_new->flg);
return OG_FALSE;
}
if (db_change_storage_path(&kernel->attr.log_file_convert, name, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
return OG_FALSE;
}
if (!cm_str_equal(log_old->name, name)) {
OG_LOG_RUN_ERR("[BUILD] primary's log amount/name has changed old: %s, new: %s", log_old->name, name);
return OG_FALSE;
}
return OG_TRUE;
}
static bool32 rst_datafile_not_changed(knl_instance_t *kernel, datafile_ctrl_t *df_old, datafile_ctrl_t *df_new)
{
char name[OG_FILE_NAME_BUFFER_SIZE];
errno_t ret;
ret = strcpy_sp(name, OG_FILE_NAME_BUFFER_SIZE, df_new->name);
knl_securec_check(ret);
if (df_old->used != df_new->used || (df_old->flag & DATAFILE_FLAG_ONLINE) != (df_new->flag & DATAFILE_FLAG_ONLINE))
{
OG_LOG_RUN_ERR("[BUILD] primary's data flag has changed old_used: %u, new_used: %u, old_flag: %u,"
"new_flag: %u", df_old->used, df_new->used, df_old->flag, df_new->flag);
return OG_FALSE;
}
if (df_old->create_version != df_new->create_version) {
OG_LOG_RUN_ERR("[BUILD] primary's data version has changed old: %u, new: %u", df_old->create_version,
df_new->create_version);
return OG_FALSE;
}
if (db_change_storage_path(&kernel->attr.data_file_convert, name, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
return OG_FALSE;
}
if (!cm_str_equal(df_old->name, name)) {
OG_LOG_RUN_ERR("[BUILD] primary's data amount/name has changed old: %s, new: %s", df_old->name, name);
return OG_FALSE;
}
return OG_TRUE;
}
bool32 rst_db_files_not_changed(knl_session_t *session, ctrl_page_t *new_ctrl)
{
knl_instance_t *kernel = session->kernel;
datafile_ctrl_t *df_old = NULL;
datafile_ctrl_t *df_new = NULL;
log_file_ctrl_t *log_old = NULL;
log_file_ctrl_t *log_new = NULL;
uint32 offset = CTRL_LOG_SEGMENT;
uint32 count;
uint32 i;
for (i = 0; i < OG_MAX_LOG_FILES; i++) {
log_new = (log_file_ctrl_t *)db_get_ctrl_item(new_ctrl, i, sizeof(log_file_ctrl_t), offset);
log_old = MY_LOGFILE_SET(session)->items[i].ctrl;
if (rst_logfile_not_changed(kernel, log_old, log_new) != OG_TRUE) {
return OG_FALSE;
}
}
count = CTRL_MAX_BUF_SIZE / sizeof(log_file_ctrl_t);
offset = offset + (OG_MAX_LOG_FILES - 1) / count + 1;
count = CTRL_MAX_BUF_SIZE / sizeof(space_ctrl_t);
offset = offset + (OG_MAX_SPACES - 1) / count + 1;
for (i = 0; i < OG_MAX_DATA_FILES; i++) {
df_new = (datafile_ctrl_t *)db_get_ctrl_item(new_ctrl, i, sizeof(datafile_ctrl_t), offset);
df_old = kernel->db.datafiles[i].ctrl;
if (rst_datafile_not_changed(kernel, df_old, df_new) != OG_TRUE) {
return OG_FALSE;
}
}
return OG_TRUE;
}
status_t rst_build_update_logfiles(knl_session_t *session, bak_process_t *ogx)
{
log_file_ctrl_t *logfile = NULL;
int32 *handle = NULL;
bool32 is_dbstor = knl_dbs_is_enable_dbs();
for (uint32 i = 0; i < dtc_my_ctrl(session)->log_hwm; i++) {
logfile = MY_LOGFILE_SET(session)->items[i].ctrl;
handle = &MY_LOGFILE_SET(session)->items[i].handle;
if (LOG_IS_DROPPED(logfile->flg)) {
if (strlen(ogx->logfile_name[i]) == 0) {
continue;
}
cm_close_device(ogx->log_type[i], handle);
if (cm_remove_device(ogx->log_type[i], ogx->logfile_name[i]) != OG_SUCCESS) {
return OG_ERROR;
}
continue;
}
if (cm_exist_device(logfile->type, logfile->name) && *handle != OG_INVALID_HANDLE) {
cm_close_device(ogx->log_type[i], handle);
if (cm_open_device(logfile->name, logfile->type, knl_redo_io_flag(session), handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to open %s ", logfile->name);
return OG_ERROR;
}
continue;
}
if (strlen(ogx->logfile_name[i]) > 0) {
cm_close_device(ogx->log_type[i], handle);
if (cm_remove_device(ogx->log_type[i], ogx->logfile_name[i]) != OG_SUCCESS) {
return OG_ERROR;
}
}
if (cm_create_device(logfile->name, logfile->type, knl_redo_io_flag(session), handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] failed to create %s ", logfile->name);
return OG_ERROR;
}
if (is_dbstor) {
break;
}
}
return OG_SUCCESS;
}
static status_t rst_load_ctrlfile(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
text_t ctrlfiles;
database_t *db = &session->kernel->db;
dtc_node_ctrl_t *node = dtc_my_ctrl(session);
char *param = cm_get_config_value(session->kernel->attr.config, "CONTROL_FILES");
cm_str2text(param, &ctrlfiles);
if (ctrlfiles.len == 0) {
OG_LOG_RUN_INF("[BUILD] ctrl file does not exist");
OG_THROW_ERROR(ERR_LOAD_CONTROL_FILE, "ctrl file does not exist");
return OG_ERROR;
}
if (db_load_ctrlspace(session, &ctrlfiles) != OG_SUCCESS) {
return OG_ERROR;
}
node->archived_start = 0;
node->archived_end = 0;
db->ctrl.core.build_completed = OG_FALSE;
db->ctrl.core.db_role = bak->is_building ? REPL_ROLE_PHYSICAL_STANDBY : REPL_ROLE_PRIMARY;
bak->ctrlfile_completed = OG_TRUE;
return OG_SUCCESS;
}
static status_t rst_check_ctrl_file(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
bool32 read_end = OG_FALSE;
int32 read_size;
if (!BAK_IS_FULL_BUILDING(bak) || bak->is_first_link) {
return OG_SUCCESS;
}
OG_LOG_RUN_INF("[BUILD] start read ctrl file");
if (rst_read_data(ogx, bak->ctrl_data_buf, CTRL_MAX_PAGES(session) * OG_DFLT_CTRL_BLOCK_SIZE,
&read_size, &read_end, 0) != OG_SUCCESS) {
return OG_ERROR;
}
OG_LOG_RUN_INF("[BUILD] ctrl file read_size : %u", read_size);
if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_load_ctrlfile(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (!db_sysdata_version_is_equal(session, OG_FALSE)) {
return OG_ERROR;
}
OG_LOG_RUN_INF("[BUILD] start check file amount/name");
if (!rst_db_files_not_changed(session, (ctrl_page_t *)bak->ctrl_data_buf)) {
OG_LOG_RUN_ERR("[BUILD] primary's log/data amount/name has changed");
if (rst_delete_track_file(session, bak, OG_FALSE) != OG_SUCCESS) {
return OG_ERROR;
}
OG_THROW_ERROR(ERR_INVALID_OPERATION, ": primary's log/data amount/name has changed");
return OG_ERROR;
}
OG_LOG_RUN_INF("[BUILD] check file amount/name successfully");
return OG_SUCCESS;
}
static status_t rst_build_restore(knl_session_t *session)
{
if (rst_restore_config_param(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_check_ctrl_file(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_build_receive_files(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_build_head(session) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
static char *bak_get_compress_name(compress_algo_e compress)
{
switch (compress) {
case COMPRESS_NONE:
return "NONE";
case COMPRESS_ZLIB:
return "ZLIB";
case COMPRESS_ZSTD:
return "ZSTD";
case COMPRESS_LZ4:
return "LZ4";
default:
cm_assert(OG_FALSE);
return "NULL";
}
}
bool32 bak_parameter_is_valid(build_progress_t *build_progress)
{
uint32 stage = bak_get_build_stage(&build_progress->stage);
if (stage != BUILD_LOG_STAGE && stage != BUILD_DATA_STAGE) {
return OG_FALSE;
}
return OG_TRUE;
}
static status_t bak_obtain_build_progress(bak_t *bak, bak_process_t *proc, const char *file_name)
{
build_progress_t *build_progress = &bak->progress.build_progress;
int32 handle = OG_INVALID_HANDLE;
if (cm_open_file(file_name, O_BINARY | O_RDWR, &handle) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[BACKUP] failed to open %s ", file_name);
return OG_ERROR;
}
if (cm_read_file(handle, build_progress, sizeof(build_progress_t), NULL) != OG_SUCCESS) {
cm_close_file(handle);
OG_LOG_RUN_ERR("[BACKUP] failed to read %s ", file_name);
return OG_ERROR;
}
cm_close_file(handle);
OG_LOG_RUN_INF("[BUILD] write build progress [%u/%u/%llu/%u/%u/%u/%d]", build_progress->stage,
build_progress->file_id, build_progress->data_offset, build_progress->asn,
build_progress->curr_file_index, build_progress->last_file_index, build_progress->start_time);
if (!bak_parameter_is_valid(build_progress)) {
OG_LOG_RUN_ERR("[BACKUP] parameters are not effective in %s ", file_name);
return OG_ERROR;
}
bak->file_count = build_progress->curr_file_index;
return OG_SUCCESS;
}
static status_t bak_agent_send_build_ctrl(knl_session_t *session, bak_t *bak, bak_process_t *proc)
{
bak_agent_head_t head;
bak_attr_t *attr = &bak->record.attr;
build_param_ctrl_t ctrl;
build_progress_t *build_progress = &bak->progress.build_progress;
char bak_process_file[OG_FILE_NAME_BUFFER_SIZE];
errno_t ret;
int32 primary_start_time;
bool32 read_end = OG_FALSE;
int32 read_size;
head.ver = BAK_AGENT_PROTOCOL;
head.serial_number = bak->remote.serial_number++;
head.cmd = BAK_PKG_START;
head.len = sizeof(bak_agent_head_t) + sizeof(build_param_ctrl_t) + sizeof(build_progress_t);
head.flags = 0;
head.reserved = 0;
ctrl.compress = attr->compress;
ctrl.compress_level = bak->compress_ctx.compress_level;
ctrl.parallelism = bak->proc_count;
ctrl.is_increment = bak->record.is_increment;
ctrl.base_lsn = attr->base_lsn;
ctrl.is_repair = bak->record.is_repair;
ctrl.buffer_size = bak->backup_buf_size;
OG_LOG_DEBUG_INF("[BACKUP] send type %d", head.cmd);
if (bak_agent_send(bak, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_send(bak, (char *)&ctrl, sizeof(build_param_ctrl_t)) != OG_SUCCESS) {
return OG_ERROR;
}
ret = snprintf_s(bak_process_file, OG_FILE_NAME_BUFFER_SIZE, OG_FILE_NAME_BUFFER_SIZE - 1, "%s/bak_process_file",
session->kernel->home);
knl_securec_check_ss(ret);
if (cm_file_exist((const char *)bak_process_file)) {
if (bak_obtain_build_progress(bak, proc, bak_process_file) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", building process file is not effective.");
return OG_ERROR;
}
bak->need_check = OG_TRUE;
bak->is_first_link = OG_FALSE;
} else {
ret = memset_sp(build_progress, sizeof(build_progress_t), 0, sizeof(build_progress_t));
knl_securec_check(ret);
bak->is_first_link = OG_TRUE;
}
if (bak_agent_send(bak, (char *)build_progress, sizeof(build_progress_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_agent_read(bak, (char *)&primary_start_time, sizeof(int32), &read_size, &read_end) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak->is_first_link) {
build_progress->start_time = primary_start_time;
OG_LOG_RUN_INF("[BUILD] first link [primary/standby] start time : [%d/%d]", primary_start_time,
build_progress->start_time);
} else {
if (build_progress->start_time != primary_start_time) {
OG_LOG_RUN_INF("[BUILD] [primary/standby] start time : [%d/%d]", primary_start_time,
build_progress->start_time);
OG_LOG_RUN_INF("[BUILD] primary has restart or timeout, only non-breakpoint building is allowed");
if (rst_delete_track_file(session, bak, OG_FALSE) != OG_SUCCESS) {
return OG_ERROR;
}
OG_THROW_ERROR(ERR_INVALID_OPERATION,
"primary has restart or timeout, only non-breakpoint building is allowed");
return OG_ERROR;
}
OG_LOG_RUN_INF("[BUILD] [primary/standby] start time : [%d/%d]", primary_start_time,
build_progress->start_time);
}
return OG_SUCCESS;
}
static status_t rst_build_prepare_basefiles(knl_session_t *session, bak_process_t *ogx)
{
knl_instance_t *kernel = session->kernel;
datafile_t *df = NULL;
log_file_ctrl_t *log = NULL;
uint32 i;
errno_t ret;
for (i = 0; i < OG_MAX_DATA_FILES; i++) {
ogx->datafile_size[i] = OG_INVALID_INT64;
df = &kernel->db.datafiles[i];
if (!df->ctrl->used || !DATAFILE_IS_ONLINE(df)) {
ogx->datafile_name[i][0] = '\0';
continue;
}
if (cm_open_device(df->ctrl->name, df->ctrl->type, knl_io_flag(session), &ogx->datafiles[i]) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[BUILD] failed to open %s ", df->ctrl->name);
return OG_ERROR;
}
ret = strcpy_sp(ogx->datafile_name[i], OG_FILE_NAME_BUFFER_SIZE, df->ctrl->name);
knl_securec_check(ret);
ogx->file_type[i] = df->ctrl->type;
ogx->datafile_version[i] = df->ctrl->create_version;
ogx->datafile_size[i] = cm_device_size(cm_device_type(ogx->datafile_name[i]), ogx->datafiles[i]);
if (ogx->datafile_size[i] == -1) {
OG_THROW_ERROR(ERR_SEEK_FILE, 0, SEEK_END, errno);
return OG_ERROR;
}
}
for (i = 0; i < OG_MAX_LOG_FILES; i++) {
ogx->logfile_name[i][0] = '\0';
ogx->log_type[i] = 0;
}
for (i = 0; i < MY_LOGFILE_SET(session)->logfile_hwm; i++) {
log = MY_LOGFILE_SET(session)->items[i].ctrl;
if (LOG_IS_DROPPED(log->flg)) {
continue;
}
ret = strcpy_sp(ogx->logfile_name[i], OG_FILE_NAME_BUFFER_SIZE, log->name);
knl_securec_check(ret);
ogx->log_type[i] = log->type;
}
return OG_SUCCESS;
}
static status_t bak_check_keep_alive(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
uint32 build_keep_alive_timeout = session->kernel->attr.build_keep_alive_timeout;
bak_t *bak = &ogx->bak;
if (bak->record.attr.level != 0) {
return OG_SUCCESS;
}
bak->build_retry_time++;
if (bak->build_retry_time == build_keep_alive_timeout) {
bak->need_retry = OG_FALSE;
OG_THROW_ERROR(ERR_INVALID_OPERATION, ", please check run log for information of building timeout.");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t bak_agent_send_diff_pages(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
bak_agent_head_t head;
uint32 page_count = BUILD_ALY_MAX_ITEM - bak->build_aly_free_list.count;
knl_panic(page_count == bak->page_count);
OG_LOG_RUN_INF("[BUILD] start to send page list to primary, page_count: %u", page_count);
head.ver = BAK_AGENT_PROTOCOL;
head.cmd = BAK_PKG_DATA;
head.len = sizeof(bak_agent_head_t) + page_count * sizeof(page_id_t);
head.serial_number = 0;
head.flags = 0;
head.reserved = 0;
if (bak_agent_send(bak, (char *)&head, sizeof(bak_agent_head_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_send(bak, (char *)bak->build_aly_pages, page_count * sizeof(page_id_t)) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_wait_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
OG_LOG_RUN_INF("[BUILD] send page list to primary successfully");
return OG_SUCCESS;
}
inline static void bak_brain_close_file(int32 *handle, uint32 count)
{
for (uint32 i = 0; i < count; i++) {
cm_close_file(handle[i]);
}
}
static status_t bak_brain_reset_page(knl_session_t *session)
{
int32 handles[OG_MAX_DATA_FILES];
errno_t ret = memset_sp(handles, OG_MAX_DATA_FILES * sizeof(int32), -1, OG_MAX_DATA_FILES * sizeof(int32));
knl_securec_check(ret);
build_analyse_item_t *item = NULL;
bak_t *bak = &session->kernel->backup_ctx.bak;
char *buf = session->kernel->attr.xpurpose_buf;
ret = memset_sp(buf, DEFAULT_PAGE_SIZE(session), 0, DEFAULT_PAGE_SIZE(session));
knl_securec_check(ret);
for (uint32 i = 0; i < bak->page_count; i++) {
item = &bak->build_aly_items[i];
if (item->page_id->file == OG_INVALID_FILEID) {
continue;
}
datafile_t *df = DATAFILE_GET(session, item->page_id->file);
if (cm_open_device(df->ctrl->name, df->ctrl->type, knl_io_flag(session), &handles[item->page_id->file]) !=
OG_SUCCESS) {
OG_LOG_RUN_ERR("[REPAIR] failed to open datafile %s", df->ctrl->name);
return OG_ERROR;
}
int64 offset = (int64)item->page_id->page * DEFAULT_PAGE_SIZE(session);
if (cm_write_device(df->ctrl->type, handles[item->page_id->file], offset, buf, DEFAULT_PAGE_SIZE(session)) !=
OG_SUCCESS) {
OG_LOG_RUN_ERR("[REPAIR] failed to write datafile %s", df->ctrl->name);
bak_brain_close_file(handles, OG_MAX_DATA_FILES);
return OG_ERROR;
}
OG_LOG_DEBUG_INF("[REPAIR] brain reset page %u-%u", item->page_id->file, item->page_id->page);
item->page_id->file = OG_INVALID_FILEID;
}
for (uint32 i = 0; i < OG_MAX_DATA_FILES; i++) {
if (handles[i] == OG_INVALID_HANDLE) {
continue;
}
if (db_fdatasync_file(session, handles[i]) != OG_SUCCESS) {
bak_brain_close_file(handles, OG_MAX_DATA_FILES);
OG_LOG_RUN_ERR("[REPAIR] failed to fdatasync datafile");
return OG_ERROR;
}
}
bak_brain_close_file(handles, OG_MAX_DATA_FILES);
return OG_SUCCESS;
}
static status_t bak_build_from_remote(knl_session_t *session)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
bak_t *bak = &ogx->bak;
bak_process_t *proc = &ogx->process[BAK_COMMON_PROC];
if (bak_check_keep_alive(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_build_start(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_send_build_ctrl(session, bak, proc) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak->record.is_repair && bak_agent_send_diff_pages(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak->record.attr.level > 0) {
if (rst_build_prepare_basefiles(session, &ogx->process[BAK_COMMON_PROC]) != OG_SUCCESS) {
return OG_ERROR;
}
}
if (rst_build_restore(session) != OG_SUCCESS) {
if (!bak->need_retry) {
OG_LOG_RUN_INF("[RESTORE] restore failed, no more retry");
(void)bak_agent_send_pkg(bak, BAK_PKG_ACK);
}
return OG_ERROR;
}
if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
rst_wait_write_end(bak);
if (bak->record.is_repair) {
if (bak_brain_reset_page(session) != OG_SUCCESS) {
return OG_ERROR;
}
}
return bak->failed ? OG_ERROR : OG_SUCCESS;
}
static status_t rst_build_clean_arch(knl_session_t *session)
{
uint32 arch_num = 0;
uint32 clean_locator = 0;
arch_ctrl_t *arch_ctrl = NULL;
arch_proc_context_t *proc_ctx = &session->kernel->arch_ctx.arch_proc[0];
dtc_node_ctrl_t *node = dtc_my_ctrl(session);
arch_get_files_num(session, ARCH_DEFAULT_DEST, session->kernel->id, &arch_num);
for (uint32 i = 0; i < arch_num; i++) {
clean_locator = (node->archived_start + i) % OG_MAX_ARCH_NUM;
arch_ctrl = db_get_arch_ctrl(session, clean_locator, session->kernel->id);
if (arch_ctrl->recid == 0) {
continue;
}
if (!cm_exist_device(cm_device_type(arch_ctrl->name), arch_ctrl->name)) {
OG_LOG_RUN_INF("[RESTORE] archive file %s does not exist", arch_ctrl->name);
} else {
if (cm_remove_device(cm_device_type(arch_ctrl->name), arch_ctrl->name) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[RESTORE] Failed to remove archive file %s", arch_ctrl->name);
return OG_ERROR;
}
}
arch_ctrl->recid = 0;
}
proc_ctx->curr_arch_size = 0;
return OG_SUCCESS;
}
static status_t bak_build_prepare_attr(knl_session_t *session, build_param_ctrl_t *ctrl)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
bak_attr_t *attr = &bak->record.attr;
attr->compress = ctrl->compress;
attr->level = (ctrl->is_increment || ctrl->is_repair) ? OG_TRUE : OG_FALSE;
bak->compress_ctx.compress_level = ctrl->compress_level;
bak->record.is_repair = ctrl->is_repair;
bak->record.is_increment = ctrl->is_increment;
bak->proc_count = (ctrl->parallelism == 0) ? BUILD_DEFAULT_PARALLELISM : ctrl->parallelism;
bak->backup_buf_size = ctrl->buffer_size;
if (ctrl->base_lsn != OG_INVALID_LSN) {
attr->base_lsn = ctrl->base_lsn;
return OG_SUCCESS;
}
if (ctrl->is_increment) {
if (rst_build_get_base_lsn(session, &ctrl->base_lsn) != OG_SUCCESS) {
return OG_ERROR;
}
if (rst_build_clean_arch(session) != OG_SUCCESS) {
return OG_ERROR;
}
} else if (ctrl->is_repair) {
if (build_analyze_mem_init(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (build_repair_prepare(session, &ctrl->base_lsn) != OG_SUCCESS) {
build_analyze_mem_free(session);
return OG_ERROR;
}
if (rst_build_clean_arch(session) != OG_SUCCESS) {
build_analyze_mem_free(session);
return OG_ERROR;
}
}
attr->base_lsn = ctrl->base_lsn;
return OG_SUCCESS;
}
status_t bak_build_restore(knl_session_t *session, build_param_ctrl_t *ctrl)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
lrcv_context_t *lrcv_ctx = &session->kernel->lrcv_ctx;
bak_t *bak = &ogx->bak;
OG_LOG_RUN_INF("[BUILD] build start, compress method %s, compress level %u, is increment %u, is repair %u, "
"buffer size %uM", bak_get_compress_name(ctrl->compress), ctrl->compress_level, ctrl->is_increment,
ctrl->is_repair, ctrl->buffer_size / SIZE_M(1));
bak->need_retry = OG_FALSE;
if (lrcv_ctx->thread.closed) {
OG_THROW_ERROR(ERR_LRCV_NOT_READY);
return OG_ERROR;
}
if (bak_set_running(session, ogx) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_BACKUP_IN_PROGRESS, "restore");
return OG_ERROR;
}
if (bak_build_prepare_attr(session, ctrl) != OG_SUCCESS) {
bak_unset_running(session, ogx);
return OG_ERROR;
}
status_t status = bak_build_from_remote(session);
if (status != OG_SUCCESS) {
bak->failed = OG_TRUE;
}
if (!bak->need_retry) {
OG_LOG_RUN_INF("[BUILD] backup remote finished, failed :%d", bak->failed);
}
if (bak->error_info.err_code != ERR_BACKUP_IN_PROGRESS && !bak->need_retry) {
build_analyze_mem_free(session);
}
bak_build_end(session);
return status;
}
static void bak_send_error(bak_t *bak, cs_pipe_t *pipe, int32 max_pkg_size)
{
bak_agent_head_t head;
int32 err_code;
const char *error_msg = NULL;
int32 receive_size;
if (!bak->build_stopped && bak->need_retry) {
return;
}
head.ver = BAK_AGENT_PROTOCOL;
head.cmd = BAK_PKG_ERROR;
if (bak->build_stopped) {
err_code = ERR_BUILD_CANCELLED;
error_msg = "Build has been cancelled";
} else {
cm_get_error(&err_code, &error_msg, NULL);
}
head.len = (uint32)(sizeof(bak_agent_head_t) + sizeof(int32) + strlen(error_msg) + 1);
OG_LOG_RUN_WAR("[BACKUP] build failed, error:%d, %s", err_code, error_msg);
if (cs_write_stream(pipe, (char *)&head, sizeof(bak_agent_head_t), max_pkg_size) != OG_SUCCESS) {
return;
}
if (cs_write_stream(pipe, (char *)&err_code, sizeof(int32), max_pkg_size) != OG_SUCCESS) {
return;
}
if (cs_write_stream(pipe, (char *)error_msg, (uint32)strlen(error_msg) + 1, max_pkg_size) != OG_SUCCESS) {
return;
}
(void)cs_read_stream(pipe, (char *)&head, OG_NETWORK_IO_TIMEOUT, sizeof(bak_agent_head_t), (int32 *)&receive_size);
}
static status_t bak_build_wait_start(knl_session_t *session, cs_pipe_t *pipe, build_param_ctrl_t *ctrl,
build_progress_t *build_progress)
{
bak_agent_head_t head;
int32 receive_size;
if (cs_read_stream(pipe, (char *)&head, OG_NETWORK_IO_TIMEOUT, sizeof(bak_agent_head_t),
(int32 *)&receive_size) != OG_SUCCESS) {
return OG_ERROR;
}
if (receive_size != sizeof(bak_agent_head_t)) {
OG_THROW_ERROR(ERR_TCP_TIMEOUT_REMAIN, (int32)(sizeof(bak_agent_head_t) - receive_size));
return OG_ERROR;
}
if (head.cmd != BAK_PKG_START) {
OG_THROW_ERROR(ERR_NOT_EXPECTED_BACKUP_PACKET, BAK_PKG_START, head.cmd);
return OG_ERROR;
}
if (cs_read_stream(pipe, (char *)ctrl, OG_NETWORK_IO_TIMEOUT, sizeof(build_param_ctrl_t),
(int32 *)&receive_size) != OG_SUCCESS) {
return OG_ERROR;
}
if (receive_size != sizeof(build_param_ctrl_t)) {
OG_THROW_ERROR(ERR_TCP_TIMEOUT_REMAIN, (int32)(sizeof(build_param_ctrl_t) - receive_size));
return OG_ERROR;
}
if (cs_read_stream(pipe, (char *)build_progress, OG_NETWORK_IO_TIMEOUT, sizeof(build_progress_t),
(int32 *)&receive_size) != OG_SUCCESS) {
return OG_ERROR;
}
OG_LOG_RUN_INF("[BUILD] received standby build progress [%u/%u/%llu/%u/%u/%u/%d]", build_progress->stage,
build_progress->file_id, build_progress->data_offset, build_progress->asn, build_progress->curr_file_index,
build_progress->last_file_index, build_progress->start_time);
if (receive_size != sizeof(build_progress_t)) {
OG_THROW_ERROR(ERR_TCP_TIMEOUT_REMAIN, (int32)(sizeof(build_progress_t) - receive_size));
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t bak_set_break_point_params(knl_session_t *session, bak_context_t *ogx, build_progress_t *build_progress)
{
build_progress_t *local_build_progress = &ogx->bak.progress.build_progress;
int32 temp_time_start;
errno_t ret;
switch (build_progress->stage) {
case BACKUP_START:
ogx->bak.is_first_link = OG_TRUE;
ogx->bak.need_check = OG_FALSE;
ret = memset_sp(local_build_progress, sizeof(build_progress_t), 0, sizeof(build_progress_t));
knl_securec_check(ret);
local_build_progress->start_time = (int32)session->kernel->scn;
break;
case BACKUP_DATA_STAGE:
case BACKUP_LOG_STAGE:
case BACKUP_HEAD_STAGE:
ogx->bak.is_first_link = OG_FALSE;
ogx->bak.need_check = OG_TRUE;
ogx->bak.send_buf.buf_size = 0;
temp_time_start = local_build_progress->start_time;
*local_build_progress = *build_progress;
local_build_progress->start_time = temp_time_start;
ogx->bak.file_count = build_progress->curr_file_index;
ogx->bak.curr_file_index = build_progress->curr_file_index;
ogx->bak.progress.stage = build_progress->stage;
break;
default:
OG_LOG_RUN_ERR("[BUILD] primary DB received unexpected build process stage, %u",
ogx->bak.progress.build_progress.stage);
return OG_ERROR;
}
OG_LOG_RUN_INF("[BUILD] received standby build progress [%u/%u/%llu/%u/%u/%u/%d]", build_progress->stage,
build_progress->file_id, build_progress->data_offset, build_progress->asn,
build_progress->curr_file_index, build_progress->last_file_index, build_progress->start_time);
return OG_SUCCESS;
}
static status_t bak_get_page_list(knl_session_t *session)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
uint32 read_size = 0;
bool32 read_end = OG_FALSE;
bak->remote.remain_data_size = 0;
OG_LOG_RUN_INF("[BUILD] start to get page list from standby");
if (rst_agent_read_head(bak, BAK_PKG_DATA, &read_size, &read_end) != OG_SUCCESS) {
return OG_ERROR;
}
if (read_size > BUILD_ALY_MAX_PAGE_SIZE) {
OG_LOG_RUN_ERR("[BUILD] get page list from standby failed, received size: %u, max size: %llu", read_size,
BUILD_ALY_MAX_PAGE_SIZE);
return OG_ERROR;
}
if (bak_agent_recv(bak, (char *)bak->build_aly_pages, (int32)read_size) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_send_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
bak->page_count = read_size / sizeof(page_id_t);
build_analyze_construct_hash(session);
OG_LOG_RUN_INF("[BUILD] get page list from standby successfully, page count: %u", bak->page_count);
return OG_SUCCESS;
}
status_t bak_build_backup(knl_session_t *session, cs_pipe_t *pipe, cs_packet_t *send_pack, cs_packet_t *recv_pack)
{
bak_context_t *ogx = &session->kernel->backup_ctx;
build_param_ctrl_t ctrl;
bak_t *bak = &ogx->bak;
build_progress_t *local_build_progress = &ogx->bak.progress.build_progress;
build_progress_t build_progress;
if (bak_build_wait_start(session, pipe, &ctrl, &build_progress) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_precheck(session) != OG_SUCCESS) {
bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
return OG_ERROR;
}
if (bak_set_build_running(session, ogx, &build_progress) != OG_SUCCESS) {
bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
return OG_ERROR;
}
bak_set_build_params(&ogx->bak, pipe, send_pack, recv_pack, &ctrl);
if (bak_set_break_point_params(session, ogx, &build_progress) != OG_SUCCESS) {
bak_unset_build_running(session, ogx);
bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
return OG_ERROR;
}
if (bak_agent_write(bak, (char *)&local_build_progress->start_time, sizeof(int32)) != OG_SUCCESS) {
bak_unset_build_running(session, ogx);
bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
return OG_ERROR;
}
if (ctrl.is_repair) {
if (build_analyze_mem_init(session) != OG_SUCCESS) {
bak_unset_build_running(session, ogx);
bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
return OG_ERROR;
}
if (bak_get_page_list(session) != OG_SUCCESS) {
bak_unset_build_running(session, ogx);
build_analyze_mem_free(session);
bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
return OG_ERROR;
}
}
status_t status = bak_backup_proc(session);
if (ctrl.is_repair) {
build_analyze_mem_free(session);
}
if (status != OG_SUCCESS) {
bak_send_error(bak, pipe, (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
}
return status;
}
void bak_stream_read_prepare(knl_session_t *session, bak_process_t *process, datafile_t *datafile, uint32 sec_id)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
bak_assignment_t *assign_ctrl = &process->assign_ctrl;
bak_ctrl_t *ctrl = &process->ctrl;
errno_t ret;
ret = strcpy_sp(ctrl->name, OG_FILE_NAME_BUFFER_SIZE, datafile->ctrl->name);
knl_securec_check(ret);
ctrl->type = datafile->ctrl->type;
ctrl->offset = DEFAULT_PAGE_SIZE(session);
assign_ctrl->start = ctrl->offset;
assign_ctrl->end = ctrl->offset;
assign_ctrl->file_id = datafile->ctrl->id;
assign_ctrl->sec_id = 0;
bak_record_new_file(bak, BACKUP_DATA_FILE, assign_ctrl->file_id, sec_id, 0, OG_FALSE, 0, 0);
}
status_t bak_send_stream_data(knl_session_t *session, bak_t *bak, bak_assignment_t *assign_ctrl)
{
bak_process_t *proc = &session->kernel->backup_ctx.process[BAK_COMMON_PROC];
bak_stream_buf_t *stream = &bak->send_stream;
date_t start;
bool32 file_end = OG_FALSE;
uint32 send_size;
uint64 last_read_offset = 0;
uint64 curr_read_offset = 0;
char *send_buf = NULL;
for (;;) {
if (bak_check_session_status(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (!cm_spin_try_lock(&stream->lock)) {
cm_spin_sleep();
continue;
}
file_end = (stream->read_offset == assign_ctrl->file_size);
if (stream->data_size[stream->wid] == 0 && !file_end) {
cm_spin_unlock(&stream->lock);
cm_spin_sleep();
continue;
}
curr_read_offset = stream->read_offset;
stream->wid = !stream->wid;
stream->fid = !stream->fid;
stream->data_size[stream->wid] = 0;
cm_spin_unlock(&stream->lock);
send_size = stream->data_size[stream->fid];
send_buf = stream->bufs[stream->fid].aligned_buf;
if (send_size > 0) {
start = g_timer()->now;
if (bak_agent_write(bak, send_buf, send_size) != OG_SUCCESS) {
return OG_ERROR;
}
proc->stat.write_time += (g_timer()->now - start);
proc->stat.write_size += send_size;
}
bak_update_progress(bak, (curr_read_offset - last_read_offset));
last_read_offset = curr_read_offset;
if (file_end) {
break;
}
}
return OG_SUCCESS;
}
void bak_init_send_stream(bak_t *bak, uint32 start, uint64 filesize, uint32 file_id)
{
bak_stream_buf_t *stream_buf = &bak->send_stream;
bak_read_cursor_t *cursor = &bak->read_cursor;
stream_buf->curr_block_id = 0;
stream_buf->read_offset = start;
stream_buf->bakfile_size = 0;
stream_buf->wid = 0;
stream_buf->fid = 1;
stream_buf->data_size[0] = 0;
stream_buf->data_size[1] = 0;
cursor->block_id = 0;
cursor->file_id = file_id;
cursor->file_size = filesize;
cursor->offset = start;
cursor->read_size = 0;
}
status_t bak_stream_send_end(bak_t *bak, bak_stream_buf_t *stream_buf)
{
bak->backup_size = stream_buf->bakfile_size;
if (bak_agent_send_pkg(bak, BAK_PKG_FILE_END) != OG_SUCCESS) {
return OG_ERROR;
}
if (bak_agent_wait_pkg(bak, BAK_PKG_ACK) != OG_SUCCESS) {
return OG_ERROR;
}
knl_panic(bak->files[bak->curr_file_index].type != BACKUP_HEAD_FILE);
knl_panic(bak->curr_file_index < BAK_MAX_FILE_NUM);
bak->files[bak->curr_file_index].size = bak->backup_size;
bak->files[bak->curr_file_index].sec_start = 0;
bak->files[bak->curr_file_index].sec_end = 0;
bak->curr_file_index++;
return OG_SUCCESS;
}
static void rst_assign_uds_restore_task(knl_session_t *session, bak_process_t *proc)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
uint32 bak_index = bak->file_count - 1;
bak_file_t *bak_file = &bak->files[bak_index];
bak_assignment_t *assign_ctrl = &proc->assign_ctrl;
knl_panic(proc->is_free);
assign_ctrl->task = bak->is_building ? BAK_BUILD_RESTORE_TASK : BAK_STREAM_RESTORE_TASK;
bak_file->sec_start = assign_ctrl->section_start;
bak_file->sec_end = assign_ctrl->section_end;
for (uint32 j = 0; j < OG_MAX_DATA_FILES; j++) {
knl_panic(proc->datafiles[j] == OG_INVALID_HANDLE);
}
CM_MFENCE;
proc->is_free = OG_FALSE;
}
void rst_assign_stream_restore_task(knl_session_t *session, bak_ctrl_t *init_ctrl)
{
bak_context_t *backup_ctx = &session->kernel->backup_ctx;
bak_t *bak = &backup_ctx->bak;
bak_assignment_t *assign_ctrl = NULL;
bak_process_t *proc = NULL;
bak_ctrl_t *ctrl = NULL;
uint32 bak_index = bak->curr_file_index;
for (uint32 id = 0; id < bak->proc_count; id++) {
proc = &backup_ctx->process[id + 1];
knl_panic(proc->is_free);
ctrl = &proc->ctrl;
*ctrl = *init_ctrl;
ctrl->handle = OG_INVALID_HANDLE;
assign_ctrl = &proc->assign_ctrl;
assign_ctrl->sec_id = 0;
assign_ctrl->start = 0;
assign_ctrl->end = 0;
assign_ctrl->section_start = 0;
assign_ctrl->section_end = 0;
assign_ctrl->bak_index = bak_index;
assign_ctrl->bak_file.handle = bak->local.handle;
assign_ctrl->type = bak->files[bak_index].type;
rst_assign_uds_restore_task(session, proc);
}
}
void rst_init_recv_stream(bak_t *bak)
{
rst_stream_buf_t *recv_stream = &bak->recv_stream;
recv_stream->prev_block = 0;
recv_stream->curr_block_offset = 0;
recv_stream->wid = 0;
recv_stream->fid = 1;
recv_stream->usable_size[0] = 0;
recv_stream->usable_size[1] = 0;
recv_stream->recv_size[0] = 0;
recv_stream->recv_size[1] = 0;
recv_stream->curr_file_tail = 0;
recv_stream->is_eof = OG_FALSE;
}
status_t rst_stream_read_prepare(knl_session_t *session, rst_stream_buf_t *recv_stream, log_file_head_t *head,
bool32 *ignore_logfile)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
bak_process_t *common_proc = &session->kernel->backup_ctx.process[BAK_COMMON_PROC];
uint32 bak_index = bak->curr_file_index;
uint32 file_id = bak->files[bak_index].id;
bool32 ignore_data = OG_FALSE;
if (bak->files[bak_index].type == BACKUP_DATA_FILE) {
recv_stream->base_filesize = common_proc->datafile_size[file_id];
} else {
int32 read_size;
bool32 log_end = OG_FALSE;
if (rst_agent_read(bak, (char *)head, sizeof(log_file_head_t), &read_size, &log_end) != OG_SUCCESS) {
return OG_ERROR;
}
if (read_size < sizeof(log_file_head_t)) {
OG_LOG_RUN_ERR("[RESTOR] failed to read log file head, read size %d less than %u", read_size,
(uint32)sizeof(log_file_head_t));
return OG_ERROR;
}
if (log_verify_head_checksum(session, head, "") != OG_SUCCESS) {
return OG_ERROR;
}
uint32 fill_size = CM_CALC_ALIGN(sizeof(log_file_head_t), (uint32)head->block_size) - sizeof(log_file_head_t);
if (rst_agent_read(bak, ((char *)head) + sizeof(log_file_head_t), fill_size, &read_size, &log_end) !=
OG_SUCCESS) {
return OG_ERROR;
}
if (read_size != fill_size) {
OG_LOG_RUN_ERR("[RESTOR] failed to read log file head, read size %d less than %u", read_size, fill_size);
return OG_ERROR;
}
common_proc->ctrl.handle = OG_INVALID_HANDLE;
if (rst_set_logfile_ctrl(session, bak->curr_file_index, head, &common_proc->ctrl, &ignore_data) != OG_SUCCESS) {
return OG_ERROR;
}
if (!ignore_data) {
if (rst_write_data(session, &common_proc->ctrl, (const char *)head, fill_size + sizeof(log_file_head_t)) !=
OG_SUCCESS) {
cm_close_device(common_proc->ctrl.type, &common_proc->ctrl.handle);
return OG_ERROR;
}
}
cm_close_device(common_proc->ctrl.type, &common_proc->ctrl.handle);
bak_update_progress(bak, (uint64)(fill_size + sizeof(log_file_head_t)));
}
*ignore_logfile = ignore_data;
return OG_SUCCESS;
}
* recieve buffer tail may be incomplete block, need set uasble size
*/
static status_t rst_stream_aligned_blocks(rst_stream_buf_t *stream, uint32 recv_size, bool32 ignore_data)
{
knl_panic(recv_size <= stream->buf_size);
if (ignore_data || recv_size == 0) {
stream->usable_size[stream->fid] = 0;
stream->recv_size[stream->fid] = 0;
return OG_SUCCESS;
}
uint32 usable_size = 0;
char *buf = stream->bufs[stream->fid].aligned_buf;
bak_block_head_t *block = (bak_block_head_t *)(buf + usable_size);
while (block->block_size + usable_size <= recv_size) {
if (block->magic_num != LOG_MAGIC_NUMBER) {
OG_LOG_RUN_ERR("[RESTOR] invalid received block with id %u, size is %u", block->block_id,
block->block_size);
return OG_ERROR;
}
usable_size += block->block_size;
if (sizeof(bak_block_head_t) + usable_size > recv_size) {
break;
}
block = (bak_block_head_t *)(buf + usable_size);
}
if (usable_size < sizeof(bak_block_head_t)) {
OG_LOG_RUN_ERR("[RESTOR] invalid received block with id %u, size is %u", block->block_id, block->block_size);
return OG_ERROR;
}
knl_panic(usable_size <= recv_size);
stream->usable_size[stream->fid] = usable_size;
stream->recv_size[stream->fid] = recv_size;
return OG_SUCCESS;
}
static uint32 rst_fix_incomplete_block(rst_stream_buf_t *stream)
{
char *curr_recv_buf = stream->bufs[stream->fid].aligned_buf;
char *prev_buf_tail = stream->bufs[stream->wid].aligned_buf + stream->usable_size[stream->wid];
uint32 tail_size = stream->recv_size[stream->wid] - stream->usable_size[stream->wid];
errno_t ret;
if (tail_size > 0) {
ret = memmove_s(curr_recv_buf, stream->buf_size, prev_buf_tail, tail_size);
knl_securec_check(ret);
}
return tail_size;
}
static status_t rst_stream_read_internal(knl_session_t *session, char *buf, uint32 buf_size, int32 *recv_size,
bool32 *file_end)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
bak_process_t *proc = &session->kernel->backup_ctx.process[BAK_COMMON_PROC];
date_t start = g_timer()->now;
if (rst_agent_read(bak, buf, buf_size, recv_size, file_end) != OG_SUCCESS) {
return OG_ERROR;
}
proc->stat.read_time += (g_timer()->now - start);
proc->stat.read_size += (*recv_size);
bak_update_progress(bak, (uint64)(*recv_size));
return OG_SUCCESS;
}
status_t rst_recv_stream_data(knl_session_t *session, bool32 ignore_data)
{
bak_t *bak = &session->kernel->backup_ctx.bak;
rst_stream_buf_t *stream = &bak->recv_stream;
char *recv_buf = NULL;
bool32 file_end = OG_FALSE;
uint32 last_tail_size;
int32 recv_size = 0;
for (;;) {
if (bak_check_session_status(session) != OG_SUCCESS) {
return OG_ERROR;
}
if (!cm_spin_try_lock(&stream->lock)) {
cm_spin_sleep();
continue;
}
if (stream->usable_size[stream->wid] > stream->curr_block_offset) {
cm_spin_unlock(&stream->lock);
cm_spin_sleep();
continue;
}
knl_panic(stream->usable_size[stream->wid] == stream->curr_block_offset);
stream->wid = !stream->wid;
stream->fid = !stream->fid;
stream->curr_block_offset = 0;
recv_buf = stream->bufs[stream->fid].aligned_buf;
if (file_end) {
knl_panic(stream->recv_size[stream->wid] == stream->usable_size[stream->wid]);
stream->is_eof = OG_TRUE;
cm_spin_unlock(&stream->lock);
break;
}
cm_spin_unlock(&stream->lock);
last_tail_size = rst_fix_incomplete_block(stream);
if (rst_stream_read_internal(session, recv_buf + last_tail_size, stream->buf_size - last_tail_size, &recv_size,
&file_end) != OG_SUCCESS) {
return OG_ERROR;
}
OG_LOG_DEBUG_INF("[BACKUP] read size :%u, last block tail size %u", recv_size, last_tail_size);
recv_size += last_tail_size;
if (rst_stream_aligned_blocks(stream, (uint32)recv_size, ignore_data) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}