/*
* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* repl_log_recv.c
*
*
* IDENTIFICATION
* src/kernel/replication/repl_log_recv.c
*
* -------------------------------------------------------------------------
*/
#include "knl_replication_module.h"
#include "repl_log_recv.h"
#include "cm_file.h"
#include "knl_context.h"
#include "knl_abr.h"
#include "dtc_database.h"
/* NOTE(review): presumably the polling interval while the receiver is suspended; unit and users are not visible here. */
#define SUSPEND_INTERVAL 1000
/* True when every field of the log point (rst_id, asn, offset, lfn) is zero. */
#define EMPTY_LOG_POINT(pt) (((pt).rst_id == 0) && ((pt).asn == 0) && ((pt).offset == 0) && ((pt).lfn == 0))
/* The receiver is treated as running once a session is attached to it. */
#define LRCV_IS_RUNING(lrcv) ((lrcv)->session != NULL)
/* True when the peer reports a non-zero dbid that differs from the local dbid. */
#define LRCV_INVALID_DBID(peer_dbid, local_dbid) ((peer_dbid) != 0 && (peer_dbid) != (local_dbid))
/* Disconnect the pipe held by the log receiver context via cs_disconnect. */
static inline void lrcv_disconnect(lrcv_context_t *lrcv)
{
cs_disconnect(lrcv->pipe);
}
/*
 * Force an unfinished backup-record task to the DONE state.
 *
 * A task that is already DONE is left alone. Otherwise the error number is
 * set to ERR_SEND_RECORD_REQ_FAILED when the task was still waiting to be
 * processed, or to ERR_RECORD_BACKUP_FAILED for any other status, and the
 * task is then marked DONE.
 */
static void lrcv_reset_bak_task(lrcv_context_t *lrcv)
{
    rep_bak_task_t *bak_task = &lrcv->task;

    if (bak_task->status != BAK_TASK_DONE) {
        if (bak_task->status == BAK_TASK_WAIT_PROCESS) {
            bak_task->error_no = ERR_SEND_RECORD_REQ_FAILED;
        } else {
            bak_task->error_no = ERR_RECORD_BACKUP_FAILED;
        }
        bak_task->status = BAK_TASK_DONE;
    }
}
/*
 * Connection-error handling: disconnect the receiver pipe first, then
 * force any unfinished backup-record task into its failed/DONE state.
 */
static void lrcv_set_conn_err(lrcv_context_t *lrcv)
{
lrcv_disconnect(lrcv);
lrcv_reset_bak_task(lrcv);
}
/*
 * Decide whether the log receiver has to suspend.
 *
 * Side effect: when an archive log is reported lost by the lftc client, or
 * log decryption failed during recovery, the receiver status is switched to
 * LRCV_NEED_REPAIR before returning OG_TRUE.
 *
 * Otherwise OG_TRUE is returned only if the receiver is already in
 * LRCV_NEED_REPAIR, the database is a physical standby and the peer is the
 * primary.
 */
static bool32 lrcv_need_suspend(lrcv_context_t *lrcv)
{
    database_t *db = &lrcv->session->kernel->db;

    if (lrcv->session->kernel->lftc_client_ctx.arch_lost ||
        lrcv->session->kernel->rcy_ctx.log_decrypt_failed) {
        lrcv->status = LRCV_NEED_REPAIR;
        return OG_TRUE;
    }

    if (lrcv->status != LRCV_NEED_REPAIR) {
        return OG_FALSE;
    }
    if (!DB_IS_PHYSICAL_STANDBY(db)) {
        return OG_FALSE;
    }
    if (lrcv->peer_role != PEER_PRIMARY) {
        return OG_FALSE;
    }
    return OG_TRUE;
}
/*
 * Sanity-check the message body held in extend_buf against its header type.
 *
 *  - REP_BATCH_REQ:        both log points must be valid, the compression
 *                          algorithm must lie in [COMPRESS_NONE, COMPRESS_LZ4]
 *                          and the log file id must be below logfile_hwm.
 *  - REP_QUERY_STATUS_REQ: the current point must be valid.
 *  - REP_SWITCH_RESP:      the state must lie in
 *                          [REP_STATE_NORMAL, REP_STATE_REJECTED].
 *  - REP_ABR_REQ:          the block size must equal the local page size.
 *  - REP_HEART_BEAT_REQ / REP_RECORD_BACKUPSET_RESP: always accepted.
 *  - any other type:       rejected.
 *
 * Returns OG_TRUE when the message is acceptable, OG_FALSE otherwise.
 */
static bool32 lrcv_rcv_msg_is_valid(lrcv_context_t *lrcv)
{
    char *payload = lrcv->extend_buf.read_buf.aligned_buf;
    log_context_t *redo_ctx = &lrcv->session->kernel->redo_ctx;
    bool32 valid = OG_TRUE;

    switch (lrcv->header.type) {
        case REP_BATCH_REQ: {
            rep_batch_req_t *batch_req = (rep_batch_req_t *)payload;
            if (log_point_is_invalid(&batch_req->log_point) || log_point_is_invalid(&batch_req->curr_point)) {
                valid = OG_FALSE;
            } else if (batch_req->compress_alg < COMPRESS_NONE || batch_req->compress_alg > COMPRESS_LZ4) {
                valid = OG_FALSE;
            } else if (batch_req->log_file_id >= redo_ctx->logfile_hwm) {
                valid = OG_FALSE;
            }
            break;
        }
        case REP_QUERY_STATUS_REQ: {
            rep_query_status_req_t *status_req = (rep_query_status_req_t *)payload;
            if (log_point_is_invalid(&status_req->curr_point)) {
                valid = OG_FALSE;
            }
            break;
        }
        case REP_SWITCH_RESP: {
            rep_switch_resp_t *resp = (rep_switch_resp_t *)payload;
            if (resp->state < REP_STATE_NORMAL || resp->state > REP_STATE_REJECTED) {
                valid = OG_FALSE;
            }
            break;
        }
        case REP_ABR_REQ: {
            rep_abr_req_t *abr = (rep_abr_req_t *)payload;
            if (abr->blk_size != lrcv->session->kernel->attr.page_size) {
                valid = OG_FALSE;
            }
            break;
        }
        case REP_HEART_BEAT_REQ:
        case REP_RECORD_BACKUPSET_RESP:
            /* no body fields to check */
            break;
        default:
            valid = OG_FALSE;
            break;
    }

    return valid;
}
/*
 * Tell whether the log receiver thread should stop.
 *
 * Returns OG_TRUE when the receiver session has been killed, or when the
 * database has reached at least MOUNT status and its role is primary.
 * Both cases are written to the run log.
 */
static bool32 lrcv_need_exit(lrcv_context_t *lrcv)
{
    knl_session_t *session = lrcv->session;
    database_t *db = &session->kernel->db;

    if (session->killed) {
        OG_LOG_RUN_INF("Log receiver thread has been killed");
        return OG_TRUE;
    }

    if (db->status < DB_STATUS_MOUNT || !DB_IS_PRIMARY(db)) {
        return OG_FALSE;
    }

    OG_LOG_RUN_INF("[Log Receiver] database role is primary, database status is %s, thread will exit normally",
                   db_get_status(session));
    return OG_TRUE;
}
/*
 * Write a received batch buffer into the current redo log file.
 *
 * lrcv      - log receiver context
 * log_point - position to write at (block_id selects the file offset and
 *             lfn is copied to the kernel); on success block_id is advanced
 *             to the block following the written data
 * batch     - start of the batch data
 * size      - payload size in bytes; rounded up to the file block size
 *             before writing
 *
 * On success the file head (write_pos, last scn and, if still unset, first
 * scn) and the redo free size are updated. The head is flushed only when
 * the first scn is set here.
 *
 * Returns OG_SUCCESS, or OG_ERROR if the device write fails.
 */
static status_t lrcv_flush_log(lrcv_context_t *lrcv, log_point_t *log_point, void *batch, uint32 size)
{
    log_context_t *redo = &lrcv->session->kernel->redo_ctx;
    log_file_t *target = &redo->files[redo->curr_file];
    log_batch_t *first_batch = (log_batch_t *)batch;
    log_batch_tail_t *first_tail =
        (log_batch_tail_t *)((char *)batch + first_batch->size - sizeof(log_batch_tail_t));
    uint64 write_offset = (uint64)log_point->block_id * target->ctrl->block_size;
    uint32 write_size = CM_CALC_ALIGN(size, target->ctrl->block_size);

    if (cm_write_device(target->ctrl->type, target->handle, write_offset, batch, write_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to write log into %s[%u] with log size %u at point [%u-%u/%llu]",
                       target->ctrl->name, redo->curr_file, write_size,
                       target->head.rst_id, target->head.asn, target->head.write_pos);
        return OG_ERROR;
    }

    OG_LOG_DEBUG_INF("[Log Receiver] Write space size %u into log file[%u] at point [%u-%u/%llu] to %llu "
                     "size %u head [%llx/%llu/%llu] tail [%llx/%llu]",
                     write_size, redo->curr_file, target->head.rst_id, target->head.asn,
                     target->head.write_pos, write_offset + write_size, first_batch->size,
                     first_batch->head.magic_num, (uint64)first_batch->head.point.lfn, first_batch->raft_index,
                     first_tail->magic_num, (uint64)first_tail->point.lfn);

    /* Publish the new lfn and move the file head forward. */
    lrcv->session->kernel->lfn = log_point->lfn;
    target->head.write_pos = write_offset + write_size;
    target->head.last = first_batch->scn;

    if (target->head.first == OG_INVALID_ID64) {
        target->head.first = first_batch->scn;
        log_flush_head(lrcv->session, target);
    }

    log_point->block_id = (uint32)(target->head.write_pos / target->ctrl->block_size);
    redo->free_size -= write_size;

    return OG_SUCCESS;
}
/*
 * Build a REP_HEART_BEAT_RESP message in the receiver's send buffer and
 * write it to the receiver pipe.
 *
 * The response carries the flush point, current replay point, replay lsn,
 * flush scn, current scn and the rst_id/asn of the continuous flush point.
 *
 * Returns the status of cs_write_stream; a failure is written to the run log.
 */
static status_t lrcv_process_heart_beat(knl_session_t *session)
{
    lrcv_context_t *lrcv = &session->kernel->lrcv_ctx;
    char *msg = lrcv->send_buf.read_buf.aligned_buf;
    rep_msg_header_t *header = (rep_msg_header_t *)msg;
    rep_hb_resp_t *resp = (rep_hb_resp_t *)(msg + sizeof(rep_msg_header_t));

    header->size = sizeof(rep_msg_header_t) + sizeof(rep_hb_resp_t);
    header->type = REP_HEART_BEAT_RESP;

    resp->flush_point = lrcv->flush_point;
    resp->rcy_point = lrcv->session->kernel->redo_ctx.curr_replay_point;
    resp->replay_lsn = (uint64)lrcv->session->kernel->lsn;
    resp->flush_scn = lrcv->flush_scn;
    resp->current_scn = DB_CURR_SCN(lrcv->session);
    resp->contflush_point.rst_id = lrcv->contflush_point.rst_id;
    resp->contflush_point.asn = lrcv->contflush_point.asn;

    status_t ret = cs_write_stream(lrcv->pipe, msg, header->size,
                                   (int32)cm_atomic_get(&session->kernel->attr.repl_pkg_size));
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to send heart beat response message to primary");
    }
    return ret;
}
/*
 * Make sure the archived log matching file->head (rst_id/asn) is available
 * locally, creating an lftc client task to fetch it when it is missing.
 *
 * lrcv               - log receiver context
 * file               - log file whose head supplies rst_id and asn
 * need_wait          - OG_TRUE: keep retrying (and keep sending heart beats)
 *                      until the task completes; OG_FALSE: give up after the
 *                      first task that stops without completing
 * arch_name          - output buffer for the archive file name
 * arch_name_buf_size - size of arch_name
 *
 * Returns OG_SUCCESS when the archive is already known, already exists on
 * disk, or the fetch task completes. Returns OG_ERROR when a task cannot be
 * created, the receiver must exit or suspend, a heart beat cannot be sent,
 * or (with need_wait == OG_FALSE) the task stops without completing.
 */
static status_t lrcv_fetch_archived_log(lrcv_context_t *lrcv, log_file_t *file, bool32 need_wait,
char *arch_name, uint32 arch_name_buf_size)
{
lftc_task_handle_t lftc_handle;
bool32 lftc_done = OG_FALSE;
time_t last_send_time = cm_current_time();
uint32 rst_id = file->head.rst_id;
uint32 asn = file->head.asn;
/* Nothing to do when the archive is already registered. */
if (arch_get_archived_log_name(lrcv->session, rst_id, asn, ARCH_DEFAULT_DEST, arch_name, arch_name_buf_size,
lrcv->session->kernel->id)) {
return OG_SUCCESS;
}
/* Not registered: build the default archive name and check the file itself. */
arch_set_archive_log_name(lrcv->session, rst_id, asn, ARCH_DEFAULT_DEST, arch_name, arch_name_buf_size,
lrcv->session->kernel->id);
OG_LOG_RUN_INF("[Log Receiver] Archive log %s for node %d not found", arch_name, lrcv->session->kernel->id);
if (cm_file_exist(arch_name)) {
return OG_SUCCESS;
}
if (lftc_clt_create_task(lrcv->session, rst_id, asn, arch_name, &lftc_handle) != OG_SUCCESS) {
return OG_ERROR;
}
for (;;) {
if (lrcv_need_exit(lrcv) || lrcv_need_suspend(lrcv)) {
return OG_ERROR;
}
/* While waiting, send a heart beat once REPL_HEART_BEAT_CHECK has elapsed since the last one. */
if (need_wait) {
time_t now = cm_current_time();
if ((now - last_send_time) >= REPL_HEART_BEAT_CHECK) {
if (lrcv_process_heart_beat(lrcv->session) != OG_SUCCESS) {
lrcv_set_conn_err(lrcv);
return OG_ERROR;
}
last_send_time = now;
}
}
/* Task still in progress: poll again after a short sleep. */
if (lftc_clt_task_running(lrcv->session, &lftc_handle, &lftc_done)) {
cm_sleep(100);
continue;
}
if (lftc_done) {
return OG_SUCCESS;
}
/* Task stopped without completing: fail now unless the caller asked to wait. */
if (!need_wait) {
return OG_ERROR;
}
/* Retry: rebuild the archive name and create a fresh fetch task. */
cm_sleep(1000);
arch_set_archive_log_name(lrcv->session, rst_id, asn, ARCH_DEFAULT_DEST, arch_name, arch_name_buf_size,
lrcv->session->kernel->id);
if (lftc_clt_create_task(lrcv->session, rst_id, asn, arch_name, &lftc_handle) != OG_SUCCESS) {
return OG_ERROR;
}
}
/* NOTE(review): not reachable, the loop above only leaves through return. */
return OG_SUCCESS;
}
/*
 * Validate every batch contained in a received buffer.
 *
 * session     - session passed to the checksum verification
 * batch_input - first batch in the buffer; NULL is rejected
 * size        - number of bytes of batch data in the buffer
 * log_point   - its lfn is set to the lfn of each batch as it passes
 *               validation, so it ends up as the lfn of the last valid batch
 *
 * The batch sizes come from the peer, so they are bounds-checked before
 * they are used for pointer arithmetic:
 *  - batch->size must be at least one tail and must fit in the remaining
 *    data, otherwise the tail pointer would point outside the buffer;
 *  - batch->space_size must be non-zero, otherwise the walk would never
 *    advance.
 *
 * Returns OG_SUCCESS when all batches are valid, OG_ERROR otherwise.
 */
static status_t lrcv_verify_checksum_batch(knl_session_t *session, log_batch_t *batch_input, uint32 size,
                                           log_point_t *log_point)
{
    uint32 left_size = size;
    log_batch_t *batch = batch_input;
    log_batch_tail_t *tail = NULL;

    if (batch == NULL) {
        OG_LOG_RUN_ERR("[Log Receiver] batch is NULL!");
        return OG_ERROR;
    }

    while (left_size >= sizeof(log_batch_t)) {
        /* Do not trust the peer-supplied size before locating the tail. */
        if (batch->size < sizeof(log_batch_tail_t) || batch->size > left_size) {
            OG_LOG_RUN_ERR("[Log Receiver] invalid batch size %u received with lfn %llu, size is [%u/%u]",
                           batch->size, (uint64)batch->head.point.lfn, left_size, size);
            return OG_ERROR;
        }
        tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));

        if (!rcy_validate_batch(batch, tail)) {
            OG_LOG_RUN_ERR("[Log Receiver] invalid received batch with lfn %llu, size is [%u/%u]",
                           (uint64)batch->head.point.lfn, left_size, size);
            return OG_ERROR;
        }

        if (rcy_verify_checksum(session, batch) != OG_SUCCESS) {
            return OG_ERROR;
        }

        log_point->lfn = batch->head.point.lfn;

        /* A zero space size would keep the loop on the same batch forever. */
        if (batch->space_size == 0) {
            OG_LOG_RUN_ERR("[Log Receiver] invalid batch space size 0 received with lfn %llu, size is [%u/%u]",
                           (uint64)batch->head.point.lfn, left_size, size);
            return OG_ERROR;
        }

        if (left_size <= batch->space_size) {
            break;
        }

        left_size -= batch->space_size;
        batch = (log_batch_t *)((char *)batch + batch->space_size);
    }

    return OG_SUCCESS;
}
/*
 * Record a pending log switch: mark the receiver as waiting and remember
 * the requested log point (with block_id reset to 0) together with the
 * requested log file id.
 */
static inline void lrcv_set_switch_wait(lrcv_context_t *lrcv, rep_batch_req_t *req)
{
    log_point_t *pending_point = &lrcv->wait_info.wait_point;

    lrcv->wait_info.waiting = OG_TRUE;
    *pending_point = req->log_point;
    pending_point->block_id = 0;
    lrcv->wait_info.file_id = req->log_file_id;
}
/*
 * Carry out the log switch that was recorded in lrcv->wait_info.
 *
 * Under the logfile lock:
 *  1. If the current file is not dropped, invalidate it: asn becomes
 *     OG_INVALID_ASN, write_pos goes back to the first block after the file
 *     head (the released space is added to free_size), status becomes
 *     INACTIVE, and head and log ctrl are persisted.
 *  2. wait_info.file_id becomes both the active and the current file; its
 *     head takes the asn/rst_id of the wait point, write_pos is reset,
 *     status becomes CURRENT, and head and log ctrl are persisted.
 * Afterwards log_first/log_last are saved in the core ctrl and wait_info is
 * cleared.
 *
 * Any failure to save control information aborts the process (CM_ABORT).
 */
static void lrcv_reset_switch_wait(lrcv_context_t *lrcv)
{
log_context_t *log = &lrcv->session->kernel->redo_ctx;
log_file_t *file = &log->files[log->curr_file];
log_point_t *wait_point = &lrcv->wait_info.wait_point;
log_lock_logfile(lrcv->session);
/* Step 1: invalidate the file that is current right now. */
if (!LOG_IS_DROPPED(file->ctrl->flg)) {
uint64 start_pos = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
cm_latch_x(&file->latch, lrcv->session->id, NULL);
file->head.asn = OG_INVALID_ASN;
log->free_size += file->head.write_pos - start_pos;
file->head.write_pos = start_pos;
file->ctrl->status = LOG_FILE_INACTIVE;
file->ctrl->archived = OG_FALSE;
log_flush_head(lrcv->session, file);
if (db_save_log_ctrl(lrcv->session, log->curr_file, lrcv->session->kernel->id) != OG_SUCCESS) {
CM_ABORT(0, "[Log Receiver] ABORT INFO: save control space file failed when reset switch wait");
}
cm_unlatch(&file->latch, NULL);
}
/* Step 2: switch to the file requested by the peer and initialise its head. */
log->active_file = lrcv->wait_info.file_id;
log->curr_file = lrcv->wait_info.file_id;
file = &log->files[log->curr_file];
cm_latch_x(&file->latch, lrcv->session->id, NULL);
file->head.asn = wait_point->asn;
file->head.rst_id = (uint32)wait_point->rst_id;
file->head.write_pos = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
file->ctrl->status = LOG_FILE_CURRENT;
file->ctrl->archived = OG_FALSE;
log_flush_head(lrcv->session, file);
if (db_save_log_ctrl(lrcv->session, log->curr_file, lrcv->session->kernel->id) != OG_SUCCESS) {
CM_ABORT(0, "[Log Receiver] ABORT INFO: save control space file failed when reset switch wait");
}
cm_unlatch(&file->latch, NULL);
log_unlock_logfile(lrcv->session);
/* Persist the new first/last log file ids in the core control data. */
dtc_my_ctrl(lrcv->session)->log_first = log->active_file;
dtc_my_ctrl(lrcv->session)->log_last = log->curr_file;
if (db_save_core_ctrl(lrcv->session) != OG_SUCCESS) {
CM_ABORT(0, "[Log Receiver] ABORT INFO: save control space file failed when reset switch wait");
}
/* The switch is done: clear the wait state. */
lrcv->wait_info.waiting = OG_FALSE;
errno_t err = memset_sp(wait_point, sizeof(log_point_t), 0, sizeof(log_point_t));
knl_securec_check(err);
lrcv->wait_info.file_id = OG_INVALID_FILEID;
}
/*
 * Build a REP_LOG_SWITCH_WAIT_REQ message carrying the current wait point
 * and write it to the receiver pipe.
 *
 * Returns the status of cs_write_stream; a failure is written to the run log.
 */
static status_t lrcv_send_switch_wait(lrcv_context_t *lrcv)
{
    char *msg = lrcv->send_buf.read_buf.aligned_buf;
    rep_msg_header_t *header = (rep_msg_header_t *)msg;
    rep_log_switch_wait_t *body = (rep_log_switch_wait_t *)(msg + sizeof(rep_msg_header_t));

    header->size = sizeof(rep_msg_header_t) + sizeof(rep_log_switch_wait_t);
    header->type = REP_LOG_SWITCH_WAIT_REQ;
    body->wait_point = lrcv->wait_info.wait_point;

    status_t ret = cs_write_stream(lrcv->pipe, msg, header->size,
                                   (int32)cm_atomic_get(&lrcv->session->kernel->attr.repl_pkg_size));
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to send log switch wait message to primary");
    }
    return ret;
}
/*
 * Complete a pending log switch once replay has consumed all earlier logs.
 *
 * Does nothing when no switch is pending or replay has not yet reached the
 * wait point. Otherwise it waits for replay to complete. If the recovery
 * point is still behind the lrp point, it triggers an incremental checkpoint
 * and returns so the caller can try again later. Once the recovery point
 * has caught up, the switch is performed and a log-switch-wait message is
 * sent to the primary.
 *
 * Returns OG_SUCCESS in every case except a failure to send that message.
 */
static status_t lrcv_process_switch_wait(lrcv_context_t *lrcv)
{
    knl_session_t *session = lrcv->session;
    log_point_t *rcy_point = &dtc_my_ctrl(session)->rcy_point;
    log_point_t *lrp_point = &dtc_my_ctrl(session)->lrp_point;
    lrpl_context_t *lrpl = &session->kernel->lrpl_ctx;
    log_point_t *wait_point = &lrcv->wait_info.wait_point;
    log_file_head_t *head = &session->kernel->rcy_ctx.arch_file.head;
    log_context_t *redo = &session->kernel->redo_ctx;
    log_file_head_t *curr = &redo->files[redo->curr_file].head;

    if (!lrcv->wait_info.waiting) {
        return OG_SUCCESS;
    }

    /*
     * The following three situations indicate that all logs have been replayed
     * 1. Replay point is at the end of the last archive log (the previous one of the wait point),
     * 2. Replay point is at the end of current file (replay point is on current file),
     * 3. Replay point is equal to wait point.
     *
     * The block offsets are computed in 64 bits so that they cannot wrap
     * before being compared with write_pos.
     */
    if (!((head->asn == wait_point->asn - 1 &&
           (uint64)lrpl->curr_point.block_id * head->block_size == (uint64)head->write_pos) ||
          (curr->asn == wait_point->asn - 1 && redo->curr_point.block_id > 1 &&
           (uint64)redo->curr_point.block_id * curr->block_size == (uint64)curr->write_pos) ||
          (log_cmp_point(&lrpl->curr_point, wait_point) == 0))) {
        return OG_SUCCESS;
    }

    rcy_wait_replay_complete(session);

    /* The recovery point has not caught up yet: push a checkpoint and retry later. */
    if (rcy_point->lfn < lrp_point->lfn) {
        ckpt_trigger(session, OG_FALSE, CKPT_TRIGGER_INC);
        cm_sleep(10);
        return OG_SUCCESS;
    }

    OG_LOG_RUN_INF("[Log Receiver] asn %u can locate in fileid %u on local node, log switch will succeed",
                   lrcv->wait_info.wait_point.asn, lrcv->wait_info.file_id);

    lrcv_reset_switch_wait(lrcv);

    if (lrcv_send_switch_wait(lrcv) != OG_SUCCESS) {
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Handle one REP_BATCH_REQ message: align the local redo files with the
 * position announced by the peer, verify the received batches and write
 * them to the current log file.
 *
 * Steps:
 *  1. Reject messages whose size leaves no room for batch data.
 *  2. In LRCV_PREPARE state, if the request does not match the current file,
 *     invalidate the current file (fetching its archive first when the
 *     request is on a later file), release the ACTIVE files in between and
 *     make the requested file current. Then move to LRCV_READY.
 *  3. If the request still targets a different file or asn, either record a
 *     pending switch (returning OG_SUCCESS without writing anything) or
 *     switch the log file now, keeping heart beats alive through the callback.
 *  4. Verify the batches, flush them, and remember the new flush point/scn.
 *
 * Returns OG_SUCCESS or OG_ERROR; failures to save control information abort
 * the process (CM_ABORT).
 */
static status_t lrcv_process_batch(lrcv_context_t *lrcv)
{
    log_context_t *log = &lrcv->session->kernel->redo_ctx;
    log_batch_t *batch = (log_batch_t *)(lrcv->recv_buf.read_buf.aligned_buf);
    rep_batch_req_t *req = (rep_batch_req_t *)lrcv->extend_buf.read_buf.aligned_buf;

    if (lrcv->header.size <= sizeof(rep_msg_header_t) + sizeof(rep_batch_req_t)) {
        OG_LOG_RUN_ERR("[Log Receiver] invalid batch head size %u received, which is smaller than %u",
                       lrcv->header.size, (uint32)(sizeof(rep_msg_header_t) + sizeof(rep_batch_req_t)));
        return OG_ERROR;
    }

    uint32 size = lrcv->header.size - sizeof(rep_msg_header_t) - sizeof(rep_batch_req_t);

    OG_LOG_DEBUG_INF("[Log Receiver] Received batch with size %u on log file[%u] at point [%u-%u/%u/%llu], "
                     "current log file is %u",
                     size, req->log_file_id, req->log_point.rst_id, req->log_point.asn,
                     req->log_point.block_id, (uint64)req->log_point.lfn, log->curr_file);

    lrcv->primary_curr_point = req->curr_point;

    if (lrcv->status == LRCV_PREPARE) {
        log_file_t *file = &log->files[log->curr_file];

        /*
         * 1) req log_point > file head scenario: primary switched logfile during disconnection.
         * 2) req log_point = file head scenario: standby has just been built end.
         * 3) req log_point < file head scenario: this is cascaded standby, redo switched with skip,
         *    and peer node has failovered.
         */
        if (LRCV_LOG_POINT_ON_POST_FILE(req->log_point, file->head) ||
            (LRCV_LOG_POINT_ON_CURR_FILE(req->log_point, file->head) && req->log_file_id != log->curr_file) ||
            LRCV_LOG_POINT_ON_PRE_FILE(req->log_point, file->head)) {
            uint64 start_pos;
            uint32 file_id = log->active_file;
            char arch_name[OG_FILE_NAME_BUFFER_SIZE];

            /* The current file is about to be invalidated: make sure its archive exists first. */
            if (LRCV_LOG_POINT_ON_POST_FILE(req->log_point, file->head)) {
                if (lrcv_fetch_archived_log(lrcv, file, OG_TRUE, arch_name, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
                    OG_LOG_RUN_ERR("[Log Receiver] failed to fetch archive log file [%u/%u] from primary",
                                   file->head.rst_id, file->head.asn);
                    return OG_ERROR;
                }
            }

            log_lock_logfile(lrcv->session);

            /* Invalidate the current file and give its used space back. */
            start_pos = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
            cm_latch_x(&file->latch, lrcv->session->id, NULL);
            file->head.asn = OG_INVALID_ASN;
            file->head.rst_id = (uint32)req->log_point.rst_id;
            log->free_size += file->head.write_pos - start_pos;
            file->head.write_pos = start_pos;
            file->ctrl->status = LOG_FILE_INACTIVE;
            file->ctrl->archived = OG_FALSE;
            log_flush_head(lrcv->session, file);
            if (db_save_log_ctrl(lrcv->session, log->curr_file, lrcv->session->kernel->id) != OG_SUCCESS) {
                CM_ABORT(0, "[Log Receiver] ABORT INFO: save control space file failed when switch log file");
            }
            cm_unlatch(&file->latch, NULL);

            OG_LOG_RUN_INF("[Log Receiver] Invalidate current file, active %u current %u",
                           log->active_file, log->curr_file);

            /* Release every ACTIVE file between the active file and the current one. */
            while (file_id != log->curr_file) {
                file = log->files + file_id;
                if (file->ctrl->status == LOG_FILE_ACTIVE) {
                    start_pos = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
                    log->free_size += (uint64)file->ctrl->size - start_pos;
                    file->ctrl->status = LOG_FILE_UNUSED;
                }
                log_get_next_file(lrcv->session, &file_id, OG_FALSE);
            }

            /* Make the file requested by the peer the active and current file. */
            log->active_file = req->log_file_id;
            log->curr_file = req->log_file_id;
            file = &log->files[log->curr_file];
            cm_latch_x(&file->latch, lrcv->session->id, NULL);
            file->head.asn = req->log_point.asn;
            file->head.rst_id = (uint32)req->log_point.rst_id;
            file->head.write_pos = CM_CALC_ALIGN(sizeof(log_file_head_t), file->ctrl->block_size);
            file->ctrl->status = LOG_FILE_CURRENT;
            file->ctrl->archived = OG_FALSE;
            log_flush_head(lrcv->session, file);
            if (db_save_log_ctrl(lrcv->session, log->curr_file, lrcv->session->kernel->id) != OG_SUCCESS) {
                CM_ABORT(0, "[Log Receiver] ABORT INFO: save control space file failed when switch log file");
            }
            cm_unlatch(&file->latch, NULL);
            log_unlock_logfile(lrcv->session);

            OG_LOG_RUN_INF("[Log Receiver] Reset current file asn %u status %d, active %u current %u",
                           file->head.asn, file->ctrl->status, log->active_file, log->curr_file);

            dtc_my_ctrl(lrcv->session)->log_first = log->active_file;
            dtc_my_ctrl(lrcv->session)->log_last = log->curr_file;
            if (db_save_core_ctrl(lrcv->session) != OG_SUCCESS) {
                CM_ABORT(0, "[Log Receiver] ABORT INFO: save control space file failed when switch log file");
            }
        }

        lrcv->reset_asn = req->log_point.asn;
        lrcv->status = LRCV_READY;
    }

    if (req->log_file_id != log->curr_file ||
        req->log_point.asn != log->files[log->curr_file].head.asn) {
        callback_t callback;

        /* The target file cannot be reused yet: remember the switch and write nothing now. */
        if (log_switch_need_wait(lrcv->session, (uint16)req->log_file_id, req->log_point.asn)) {
            lrcv_set_switch_wait(lrcv, req);
            OG_LOG_RUN_INF("[Log Receiver] asn %u does not locate in fileid %u on local node, log switch should wait",
                           req->log_point.asn, req->log_file_id);
            return OG_SUCCESS;
        }

        /* Hand the heart beat sender to the switch routine as its keep-alive callback. */
        callback.keep_hb_entry = lrcv_process_heart_beat;
        callback.keep_hb_param = lrcv->session;

        if (log_switch_logfile(lrcv->session, (uint16)req->log_file_id, req->log_point.asn,
                               &callback) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Receiver] failed to switch log file to %u with asn %u", req->log_file_id,
                           req->log_point.asn);
            return OG_ERROR;
        }
    }

    knl_panic(req->log_file_id == log->curr_file);

    if (lrcv_verify_checksum_batch(lrcv->session, batch, size, &req->log_point) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (lrcv_flush_log(lrcv, &req->log_point, (void *)batch, size) != OG_SUCCESS) {
        return OG_ERROR;
    }

    /* req->log_point has been advanced by the flush: it is the new flush point. */
    lrcv->flush_point = req->log_point;
    lrcv->flush_scn = req->scn;

    return OG_SUCCESS;
}
/*
 * Build a REP_BATCH_RESP message in the receiver's send buffer and write it
 * to the receiver pipe.
 *
 * The response carries the flush point, current replay point, replay lsn,
 * flush scn, current scn and the rst_id/asn of the continuous flush point.
 *
 * Returns the status of cs_write_stream; a failure is written to the run log.
 */
static status_t lrcv_send_batch_ack(lrcv_context_t *lrcv)
{
    char *msg = lrcv->send_buf.read_buf.aligned_buf;
    rep_msg_header_t *header = (rep_msg_header_t *)msg;
    rep_batch_resp_t *ack = (rep_batch_resp_t *)(msg + sizeof(rep_msg_header_t));

    header->size = sizeof(rep_msg_header_t) + sizeof(rep_batch_resp_t);
    header->type = REP_BATCH_RESP;

    ack->flush_point = lrcv->flush_point;
    ack->rcy_point = lrcv->session->kernel->redo_ctx.curr_replay_point;
    ack->replay_lsn = (uint64)lrcv->session->kernel->lsn;
    ack->flush_scn = lrcv->flush_scn;
    ack->current_scn = DB_CURR_SCN(lrcv->session);
    ack->contflush_point.rst_id = lrcv->contflush_point.rst_id;
    ack->contflush_point.asn = lrcv->contflush_point.asn;

    status_t ret = cs_write_stream(lrcv->pipe, msg, header->size,
                                   (int32)cm_atomic_get(&lrcv->session->kernel->attr.repl_pkg_size));
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to send batch response message to primary");
    }
    return ret;
}
/*
 * Put the receiver into LRCV_NEED_REPAIR and then pause the calling thread
 * with cm_sleep(2000).
 * NOTE(review): the sleep unit is presumably milliseconds; confirm against cm_sleep.
 */
static void lrcv_needrepair(lrcv_context_t *lrcv)
{
lrcv->status = LRCV_NEED_REPAIR;
cm_sleep(2000);
}
/*
 * Walk the log files from the active file up to (not including) the current
 * file and wait until each of them can be passed over.
 *
 * Each loop iteration triggers an incremental checkpoint, then looks at the
 * file under file_id:
 *  - INACTIVE or UNUSED: move on to the next file.
 *  - ACTIVE and the recovery point is on this file: if its archived log is
 *    known, read the last lfn of that archive; move on once the recovery
 *    point lfn or the redo lfn has reached it, otherwise sleep and retry.
 *  - ACTIVE and the recovery point is on a later file: recycle the file when
 *    it has a valid asn, otherwise move on.
 *  - ACTIVE and the recovery point is on an earlier file: sleep, then send a
 *    heart beat.
 *  - any other status: short sleep.
 *
 * Returns OG_SUCCESS when file_id reaches the current file. Returns OG_ERROR
 * when the archive offset cannot be read, a heart beat cannot be sent, or
 * the session is killed / the thread is closed.
 */
static status_t lrcv_wait_logfile_no_active(lrcv_context_t *lrcv)
{
log_context_t *log_ctx = &lrcv->session->kernel->redo_ctx;
log_point_t *rcy_point = &dtc_my_ctrl(lrcv->session)->rcy_point;
uint32 file_id = log_ctx->active_file;
log_file_t *file = NULL;
uint64 lfn;
uint64 scn;
OG_LOG_RUN_INF("[Log Receiver] log file active %u current %u", log_ctx->active_file, log_ctx->curr_file);
while (file_id != log_ctx->curr_file) {
ckpt_trigger(lrcv->session, OG_FALSE, CKPT_TRIGGER_INC);
file = log_ctx->files + file_id;
if (file->ctrl->status == LOG_FILE_INACTIVE || file->ctrl->status == LOG_FILE_UNUSED) {
log_get_next_file(lrcv->session, &file_id, OG_FALSE);
} else if (file->ctrl->status == LOG_FILE_ACTIVE) {
if (LRCV_LOG_POINT_ON_CURR_FILE(*rcy_point, file->head)) {
char arch_name[OG_FILE_NAME_BUFFER_SIZE];
/*
 * NOTE(review): when no archived log name is found, this branch does nothing
 * and the loop comes straight back here without sleeping; confirm that this
 * is intended.
 */
if (arch_get_archived_log_name(lrcv->session, file->head.rst_id, file->head.asn, ARCH_DEFAULT_DEST,
arch_name, OG_FILE_NAME_BUFFER_SIZE, lrcv->session->kernel->id)) {
uint64 file_offset = 0;
if (log_get_file_offset(lrcv->session, arch_name, &lrcv->recv_buf.read_buf, &file_offset,
&lfn, &scn) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[Log Receiver] failed to get file offset for archived log %s ", arch_name);
return OG_ERROR;
}
/* lfn is the last lfn found in the archive: the file is done once recovery has reached it. */
if (rcy_point->lfn >= lfn || log_ctx->lfn >= lfn) {
OG_LOG_RUN_INF("[Log Receiver] rcy_point [%u-%u/%u/%llu] or replay lfn %llu has reached "
"the end of log file[%u] [%u-%u/%llu/%llu]",
rcy_point->rst_id, rcy_point->asn, rcy_point->block_id,
(uint64)rcy_point->lfn, log_ctx->lfn, file_id,
file->head.rst_id, file->head.asn, file_offset, lfn);
log_get_next_file(lrcv->session, &file_id, OG_FALSE);
} else {
OG_LOG_DEBUG_INF("[Log Receiver] rcy_point [%u-%u/%u/%llu] has not reached the end of "
"log file[%u] [%u-%u/%llu/%llu], so continue",
rcy_point->rst_id, rcy_point->asn, rcy_point->block_id,
(uint64)rcy_point->lfn, file_id, file->head.rst_id,
file->head.asn, file_offset, lfn);
cm_sleep(100);
}
}
} else if (LRCV_LOG_POINT_ON_POST_FILE(*rcy_point, file->head)) {
/* Recovery is already past this file. */
if (file->head.asn != OG_INVALID_ASN) {
log_recycle_file(lrcv->session, rcy_point);
} else {
log_get_next_file(lrcv->session, &file_id, OG_FALSE);
}
} else {
/* Recovery has not reached this file yet: wait and keep the connection alive. */
cm_sleep(1000);
if (lrcv_process_heart_beat(lrcv->session) != OG_SUCCESS) {
return OG_ERROR;
}
}
} else {
cm_sleep(10);
}
if (lrcv->session->killed || lrcv->thread.closed) {
OG_LOG_RUN_INF("Log receiver thread has been killed");
return OG_ERROR;
}
}
return OG_SUCCESS;
}
/*
 * Prepare the local redo files before receiving logs and work out the flush
 * point to report to the peer.
 *
 * lrcv - log receiver context
 * req  - status query from the peer; req->curr_point is the peer's current
 *        log point
 *
 * After waiting for the files before the current file to be passed over, the
 * flush point is derived from where the lrp point lies relative to the
 * current file:
 *  - on the current file, peer also on it: flush point is the end of the
 *    online log (first connection) or the redo current point (reconnect);
 *  - on the current file, peer on a later file: the archive of the current
 *    file is fetched and the flush point is the end of that archive;
 *  - on the current file, peer on an earlier file: error;
 *  - on an earlier file: flush point starts as the redo current point and is
 *    moved to the end of the current file unless the peer is also on an
 *    earlier file;
 *  - otherwise the current file must have an invalid asn (knl_panic).
 *
 * Returns OG_SUCCESS or OG_ERROR; a failure to save the log control
 * information aborts the process (CM_ABORT).
 */
static status_t lrcv_prepare_log_files(lrcv_context_t *lrcv, rep_query_status_req_t *req)
{
    log_context_t *log_ctx = &lrcv->session->kernel->redo_ctx;
    log_point_t *lrp_point = &dtc_my_ctrl(lrcv->session)->lrp_point;
    log_point_t *rcy_point = &dtc_my_ctrl(lrcv->session)->rcy_point;
    uint64 curr_lfn = 0;
    uint64 scn;
    log_file_t *file = NULL;

    if (lrcv_wait_logfile_no_active(lrcv) != OG_SUCCESS) {
        return OG_ERROR;
    }

    arch_reset_file_id(lrcv->session, ARCH_DEFAULT_DEST);
    lrcv->flush_point.lfn = 0;
    lrcv->contflush_point.rst_id = dtc_my_ctrl(lrcv->session)->rcy_point.rst_id;
    lrcv->contflush_point.asn = dtc_my_ctrl(lrcv->session)->rcy_point.asn;
    file = &log_ctx->files[log_ctx->curr_file];

    OG_LOG_RUN_INF("[Log Receiver] Standby current log file[%u] status %d log point is [%u-%u/%llu/%llu]",
                   log_ctx->curr_file, file->ctrl->status, file->head.rst_id,
                   file->head.asn, file->head.write_pos, curr_lfn);
    OG_LOG_RUN_INF("[Log Receiver] Standby lrp_point is [%u-%u/%u/%llu] rcy_point is [%u-%u/%u/%llu]",
                   lrp_point->rst_id, lrp_point->asn, lrp_point->block_id, (uint64)lrp_point->lfn,
                   rcy_point->rst_id, rcy_point->asn, rcy_point->block_id, (uint64)rcy_point->lfn);
    OG_LOG_RUN_INF("[Log Receiver] Primary current log point is [%u-%u/%u/%llu]",
                   req->curr_point.rst_id, req->curr_point.asn,
                   req->curr_point.block_id, (uint64)req->curr_point.lfn);

    if (LRCV_LOG_POINT_ON_CURR_FILE(*lrp_point, file->head)) {
        if (LRCV_LOG_POINT_ON_CURR_FILE(req->curr_point, file->head)) {
            if (!lrcv->reconnected) {
                /* First connection: rescan the online log to find where it really ends. */
                lrcv->flush_point.rst_id = file->head.rst_id;
                lrcv->flush_point.asn = file->head.asn;
                if (log_get_file_offset(lrcv->session, file->ctrl->name, &lrcv->recv_buf.read_buf,
                                        (uint64 *)&file->head.write_pos, &curr_lfn, &scn) != OG_SUCCESS) {
                    OG_LOG_RUN_ERR("[Log Receiver] failed to get file offset for online log %s ", file->ctrl->name);
                    return OG_ERROR;
                }
                OG_LOG_RUN_INF("[Log Receiver] Standby repaired current log file[%u] status %d log point is [%u-%u/%llu/%llu]",
                               log_ctx->curr_file, file->ctrl->status, file->head.rst_id, file->head.asn,
                               file->head.write_pos, curr_lfn);
                lrcv->flush_point.block_id = (uint32)(file->head.write_pos / file->ctrl->block_size);
                lrcv->flush_point.lfn = curr_lfn;
            } else {
                lrcv->flush_point = log_ctx->curr_point;
            }
            OG_LOG_RUN_INF("[Log Receiver] lrp_point is on current log, and primary/standby log has no gap");
        } else if (LRCV_LOG_POINT_ON_POST_FILE(req->curr_point, file->head)) {
            char arch_name[OG_FILE_NAME_BUFFER_SIZE];
            uint64 write_pos_ori = file->head.write_pos;
            uint64 write_pos_new;

            /* The peer has moved on: complete the current file from its archive. */
            if (lrcv_fetch_archived_log(lrcv, file, OG_FALSE, arch_name, OG_FILE_NAME_BUFFER_SIZE) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[Log Receiver] failed to fetch archive log file [%u/%u] from primary",
                               file->head.rst_id, file->head.asn);
                return OG_ERROR;
            }
            file->ctrl->archived = OG_TRUE;
            if (db_save_log_ctrl(lrcv->session, log_ctx->curr_file, lrcv->session->kernel->id) != OG_SUCCESS) {
                CM_ABORT(0, "[Log Receiver] ABORT INFO: save control redo file failed when prepare logfile");
            }
            lrcv->flush_point.rst_id = file->head.rst_id;
            lrcv->flush_point.asn = file->head.asn;
            if (log_get_file_offset(lrcv->session, arch_name, &lrcv->recv_buf.read_buf,
                                    &write_pos_new, &curr_lfn, &scn) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[Log Receiver] failed to get file offset for archived log %s ", arch_name);
                return OG_ERROR;
            }
            log_set_logfile_writepos(lrcv->session, file, write_pos_new);
            OG_LOG_RUN_INF("[Log Receiver] Standby repaired current log file[%u] status %d log point "
                           "is [%u-%u/%llu/%llu] with old pos %llu",
                           log_ctx->curr_file, file->ctrl->status, file->head.rst_id, file->head.asn,
                           file->head.write_pos, curr_lfn, write_pos_ori);
            if (lrcv->reconnected) {
                log_ctx->free_size -= file->head.write_pos - write_pos_ori;
            }
            lrcv->flush_point.lfn = curr_lfn;
            lrcv->flush_point.block_id = (uint32)(file->head.write_pos / file->ctrl->block_size);
            OG_LOG_RUN_INF("[Log Receiver] lrp_point is on current log, and primary/standby log has gap");
        } else {
            OG_LOG_RUN_ERR("[Log Receiver] current log point [%u-%u/%u/%llu] from primary is "
                           "less than standby current log [%u-%u]",
                           req->curr_point.rst_id, req->curr_point.asn, req->curr_point.block_id,
                           (uint64)req->curr_point.lfn, file->head.rst_id, file->head.asn);
            return OG_ERROR;
        }
    } else if (LRCV_LOG_POINT_ON_PRE_FILE(*lrp_point, file->head)) {
        lrcv->flush_point = log_ctx->curr_point;
        if (!lrcv->reconnected) {
            if (log_get_file_offset(lrcv->session, file->ctrl->name, &lrcv->recv_buf.read_buf,
                                    (uint64 *)&file->head.write_pos, &curr_lfn, &scn) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[Log Receiver] failed to get file offset for logfile[%u] %s, latest lfn %llu",
                               log_ctx->curr_file, file->ctrl->name, curr_lfn);
                return OG_ERROR;
            }
        }

        /*
         * The current point of the primary node is smaller than the current file of the standby node,
         * this issue occurs only when redo skips and the peer node becomes the primary node after failover.
         * In this case, we set flush point with redo current point.
         */
        if (!LRCV_LOG_POINT_ON_PRE_FILE(req->curr_point, file->head)) {
            lrcv->flush_point.rst_id = file->head.rst_id;
            lrcv->flush_point.asn = file->head.asn;
            lrcv->flush_point.block_id = (uint32)(file->head.write_pos / file->ctrl->block_size);
            if (curr_lfn > lrcv->flush_point.lfn) {
                lrcv->flush_point.lfn = curr_lfn;
            }
        }
        OG_LOG_RUN_INF("[Log Receiver] lrp_point is on previous log");
    } else {
        knl_panic(file->head.asn == OG_INVALID_ASN);
    }

    if (!lrcv->reconnected) {
        log_ctx->free_size += log_file_freesize(file);
    }

    OG_LOG_RUN_INF("[Log Receiver] Set flush point to [%u-%u/%u/%llu], log free size is %llu",
                   lrcv->flush_point.rst_id, lrcv->flush_point.asn, lrcv->flush_point.block_id,
                   (uint64)lrcv->flush_point.lfn, log_ctx->free_size);

    return OG_SUCCESS;
}
/*
 * Compare the redo log file layout sent by the peer with the local one.
 *
 * The peer's log file control entries follow the rep_query_status_req_t
 * header directly. The number of files must equal the local logfile_hwm and,
 * for every slot, flag, block size, file id, file size and device type must
 * match.
 *
 * Returns OG_TRUE when everything matches; the first mismatch is written to
 * the run log and OG_FALSE is returned.
 */
static bool32 lrcv_log_ctrl_check(lrcv_context_t *lrcv, rep_query_status_req_t *req)
{
    log_context_t *log_ctx = &lrcv->session->kernel->redo_ctx;

    if (req->log_num != log_ctx->logfile_hwm) {
        OG_LOG_RUN_ERR("[Log Receiver] Redo hwm [%u/%u] is no equal for primary/standby, need repair",
                       req->log_num, log_ctx->logfile_hwm);
        return OG_FALSE;
    }

    log_file_ctrl_t *peer_ctrls = (log_file_ctrl_t *)((char *)req + sizeof(rep_query_status_req_t));

    for (uint32 slot = 0; slot < log_ctx->logfile_hwm; slot++) {
        log_file_t *local = &log_ctx->files[slot];
        log_file_ctrl_t *peer = &peer_ctrls[slot];

        if (local->ctrl->flg != peer->flg) {
            OG_LOG_RUN_ERR("[Log Receiver] %s flag %u is different from %s flag %u (0-normal, 1-dropped, 2-alarmed), "
                           "need repair", (strlen(peer->name) != 0) ? peer->name : "redo", peer->flg,
                           (strlen(local->ctrl->name) != 0) ? local->ctrl->name : "redo", local->ctrl->flg);
            return OG_FALSE;
        }

        if (local->ctrl->block_size != peer->block_size) {
            OG_LOG_RUN_ERR("[Log Receiver] %s block size %u is different from %s block size %u, need repair",
                           peer->name, peer->block_size, local->ctrl->name, local->ctrl->block_size);
            return OG_FALSE;
        }

        if (local->ctrl->file_id != peer->file_id) {
            OG_LOG_RUN_ERR("[Log Receiver] %s fileid %d is different from %s fileid %d, need repair",
                           peer->name, peer->file_id, local->ctrl->name, local->ctrl->file_id);
            return OG_FALSE;
        }

        if (local->ctrl->size != peer->size) {
            OG_LOG_RUN_ERR("[Log Receiver] %s file size %lld is different from %s file size %lld, need repair",
                           peer->name, peer->size, local->ctrl->name, local->ctrl->size);
            return OG_FALSE;
        }

        if (local->ctrl->type != peer->type) {
            OG_LOG_RUN_ERR("[Log Receiver] %s type %u is different from %s type %u (1-file, 2-raw, 3-cfs), "
                           "need repair", peer->name, peer->type, local->ctrl->name, local->ctrl->type);
            return OG_FALSE;
        }
    }

    return OG_TRUE;
}
/*
 * Raise the reset id of log files that are newer than the reset log.
 *
 * Every log file whose asn is greater than rst_log->last_asn while its
 * rst_id is still lower than rst_log->rst_id gets rst_log->rst_id written
 * into its head, and the head is flushed.
 */
static void lrcv_repair_logfile_rstid(knl_session_t *session, reset_log_t *rst_log)
{
    log_context_t *redo = &session->kernel->redo_ctx;

    for (uint32 id = 0; id < redo->logfile_hwm; id++) {
        log_file_t *curr = &redo->files[id];

        /* Skip files that are not past the reset log or already carry its reset id. */
        if (curr->head.asn <= rst_log->last_asn || curr->head.rst_id >= rst_log->rst_id) {
            continue;
        }

        OG_LOG_RUN_INF("[Log Receiver] logfile %s asn %u larger than resetlog asn %u, "
                       "but rstid %u less than resetlog rstid %u, revise file rstid with %u",
                       curr->ctrl->name, curr->head.asn, rst_log->last_asn, curr->head.rst_id,
                       rst_log->rst_id, rst_log->rst_id);
        curr->head.rst_id = rst_log->rst_id;
        log_flush_head(session, curr);
    }
}
/*
 * Compare the resetlog carried by the query-status request with the local resetlog, the
 * current redo point and the recovery point. Any incompatibility marks the receiver as
 * needing repair (lrcv_needrepair) and returns OG_ERROR; otherwise OG_SUCCESS is returned.
 */
static status_t lrcv_check_resetid(lrcv_context_t *lrcv, rep_query_status_req_t *req)
{
    knl_session_t *sess = lrcv->session;
    reset_log_t *peer_rst = &req->rst_log;
    reset_log_t *local_rst = &sess->kernel->db.ctrl.core.resetlogs;
    log_point_t *curr = &sess->kernel->redo_ctx.curr_point;
    log_point_t *rcy = &dtc_my_ctrl(sess)->rcy_point;

    /* Same non-zero reset id but a different last lfn: the two nodes diverged. */
    if (peer_rst->rst_id == local_rst->rst_id && local_rst->rst_id != 0 &&
        peer_rst->last_lfn != local_rst->last_lfn) {
        lrcv_needrepair(lrcv);
        OG_LOG_RUN_ERR("[Log Receiver] primary has same resetid [%u] with standby, "
                       "but last lfn is not equal [%llu/%llu], "
                       "they are different sources, need repair",
                       peer_rst->rst_id, peer_rst->last_lfn, local_rst->last_lfn);
        return OG_ERROR;
    }

    /* The peer reset id is too far ahead of the current redo point. */
    if (peer_rst->rst_id > curr->rst_id + OG_MAX_RESETLOG_DISTANCE) {
        lrcv_needrepair(lrcv);
        OG_LOG_RUN_ERR("[Log Receiver] Resetlog distance is larger than [%u], need repair, "
                       "rst_id in message/curr_point is %u/%u",
                       OG_MAX_RESETLOG_DISTANCE, peer_rst->rst_id, curr->rst_id);
        return OG_ERROR;
    }

    /*
     * The peer reset id is too far ahead of the recovery point: trigger a full checkpoint
     * and evaluate the same distance once more before giving up.
     */
    if (peer_rst->rst_id > rcy->rst_id + OG_MAX_RESETLOG_DISTANCE) {
        ckpt_trigger(sess, OG_TRUE, CKPT_TRIGGER_FULL);
        if (peer_rst->rst_id > rcy->rst_id + OG_MAX_RESETLOG_DISTANCE) {
            lrcv_needrepair(lrcv);
            OG_LOG_RUN_ERR("[Log Receiver] Resetlog distance is larger than [%u], need repair, "
                           "rst_id in message/rcy_point is %u/%u",
                           OG_MAX_RESETLOG_DISTANCE, peer_rst->rst_id, rcy->rst_id);
            return OG_ERROR;
        }
    }

    /* The standby must never be ahead of the primary's reset id. */
    if (peer_rst->rst_id < curr->rst_id || peer_rst->rst_id < local_rst->rst_id) {
        lrcv_needrepair(lrcv);
        OG_LOG_RUN_ERR("[Log Receiver] Standby current point resetid [%u] or resetlogs resetid [%u] is faster "
                       "than primary resetlogs [%u], need repair",
                       curr->rst_id, local_rst->rst_id, peer_rst->rst_id);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/* Zero the primary host field stored in the kernel's log receiver context. */
void lrcv_reset_primary_host(knl_session_t *session)
{
    lrcv_context_t *ctx = &session->kernel->lrcv_ctx;
    errno_t rc;

    rc = memset_sp(&ctx->primary_host, sizeof(ctx->primary_host), 0, sizeof(ctx->primary_host));
    knl_securec_check(rc);
}
/*
 * Raise arch_dest_state_changed in the archive context and wait, polling every millisecond,
 * until the flag is cleared again or the receiver session is killed. The destination lock is
 * held for the whole wait and released exactly once before returning.
 */
static void lrcv_trigger_start_lsnd(lrcv_context_t *lrcv)
{
    knl_session_t *sess = lrcv->session;
    arch_context_t *arch = &sess->kernel->arch_ctx;

    cm_spin_lock(&arch->dest_lock, NULL);
    arch->arch_dest_state_changed = OG_TRUE;

    for (;;) {
        if (!arch->arch_dest_state_changed) {
            break;
        }
        if (sess->killed) {
            break;
        }
        cm_sleep(1);
    }

    cm_spin_unlock(&arch->dest_lock);
}
/*
 * Validate a query-status request from the peer against the local database state and, if
 * everything is compatible, adopt the peer's resetlog.
 *
 * Checks performed, in order: repair notification from the peer, database id, reset id
 * (lrcv_check_resetid), current point, resetlog last lfn versus the lrp/current point, and
 * log file control information (lrcv_log_ctrl_check). Any failure marks the receiver as
 * needing repair and returns OG_ERROR.
 *
 * On success the local resetlog (and reset_log_scn when valid) is overwritten with the peer's
 * values, the core control file is saved (abort on failure), log senders are asked to
 * reconnect when the reset id or host changed, and stale log file reset ids are repaired.
 */
static status_t lrcv_resetlog_check(lrcv_context_t *lrcv, rep_query_status_req_t *req)
{
    log_point_t *point = &lrcv->session->kernel->redo_ctx.curr_point;
    database_t *db = &lrcv->session->kernel->db;
    log_point_t *rcy_point = &dtc_my_ctrl(lrcv->session)->rcy_point;
    log_point_t *lrp_point = &dtc_my_ctrl(lrcv->session)->lrp_point;
    reset_log_t *resetlog = &db->ctrl.core.resetlogs;
    bool32 resetid_changed = OG_FALSE;

    if (req->version >= ST_VERSION_1 && req->notify_repair) {
        lrcv_needrepair(lrcv);
        OG_LOG_RUN_ERR("[Log Receiver] primary detects invalid batches based on the sending point returned by "
                       "standby, need repair");
        return OG_ERROR;
    }

    /* A restored database has not adopted a dbid yet, so the dbid comparison is skipped for it. */
    if (!db->ctrl.core.is_restored && LRCV_INVALID_DBID(req->dbid, db->ctrl.core.dbid)) {
        lrcv_needrepair(lrcv);
        OG_LOG_RUN_ERR("[Log Receiver] primary dbid [%u] is not equal to standby dbid [%u], need repair",
                       req->dbid, db->ctrl.core.dbid);
        return OG_ERROR;
    }

    OG_LOG_RUN_INF("[Log Receiver] resetlog_check primary resetlog is [%u/%u/%llu], standby resetlog is [%u/%u/%llu], "
                   "primary current point is [%u/%u/%llu], standby current point is [%u/%u/%llu], "
                   "standby rcy point is [%u/%u/%llu], lrp point is [%u/%u/%llu], "
                   "standby flush point is [%u/%u/%llu]",
                   req->rst_log.rst_id, req->rst_log.last_asn, req->rst_log.last_lfn,
                   resetlog->rst_id, resetlog->last_asn, resetlog->last_lfn,
                   req->curr_point.rst_id, req->curr_point.asn, (uint64)req->curr_point.lfn,
                   point->rst_id, point->asn, (uint64)point->lfn,
                   rcy_point->rst_id, rcy_point->asn, (uint64)rcy_point->lfn,
                   lrp_point->rst_id, lrp_point->asn, (uint64)lrp_point->lfn,
                   lrcv->flush_point.rst_id, lrcv->flush_point.asn, (uint64)lrcv->flush_point.lfn);

    if (lrcv_check_resetid(lrcv, req) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (req->curr_point.lfn < point->lfn || LOG_POINT_FILE_LT(req->curr_point, *point)) {
        lrcv_needrepair(lrcv);
        OG_LOG_RUN_ERR("[Log Receiver] Standby current point [%u/%u/%llu] is faster "
                       "than primary current point [%u/%u/%llu], need repair", point->rst_id, point->asn,
                       (uint64)point->lfn, req->curr_point.rst_id, req->curr_point.asn, (uint64)req->curr_point.lfn);
        return OG_ERROR;
    }

    if (req->rst_log.rst_id != point->rst_id &&
        ((req->rst_log.last_lfn < lrp_point->lfn - 1) || (req->rst_log.last_lfn < point->lfn))) {
        lrcv_needrepair(lrcv);
        /* The first pair describes the lrp point, so its reset id is taken from lrp_point. */
        OG_LOG_RUN_ERR("[Log Receiver] Standby log lrp point [%u/%llu] current redo point [%u/%llu] is faster "
                       "than primary [%u/%llu], need repair",
                       lrp_point->rst_id, (uint64)lrp_point->lfn - 1, point->rst_id,
                       (uint64)point->lfn, req->rst_log.rst_id, req->rst_log.last_lfn);
        return OG_ERROR;
    }

    if (!lrcv_log_ctrl_check(lrcv, req)) {
        lrcv_needrepair(lrcv);
        return OG_ERROR;
    }

    /*
     * Consider the disaster recovery scenario as follows: node A is primary, node B & C is standby,
     * and node D is cascaded standby.
     *      A ----------> C
     *      |             |
     *      B             D
     *
     * If node A is down abnormally, and node B is promoted to primary by failover, then node B will
     * connect to node C.
     *      A      ---> C
     *           /      |
     *      B ----      D
     *
     * resetlog in node C will be different from D. So node C should reconnect to node D to transfer
     * new resetlog to node D.
     */
    if (req->rst_log.rst_id > resetlog->rst_id) {
        resetid_changed = OG_TRUE;
    }

    *resetlog = req->rst_log;
    if (!OG_INVALID_SCN(req->reset_log_scn)) {
        db->ctrl.core.reset_log_scn = req->reset_log_scn;
    }

    if (db_save_core_ctrl(lrcv->session) != OG_SUCCESS) {
        CM_ABORT(0, "[Log Receiver] ABORT INFO: Save core control file failed when database reset logs, need repair");
    }

    if (resetid_changed || lrcv->host_changed) {
        lsnd_mark_reconnect(lrcv->session, resetid_changed, lrcv->host_changed);
    }

    lrcv_repair_logfile_rstid(lrcv->session, &req->rst_log);

    OG_LOG_RUN_INF("[Log Receiver] Resetlog check passed. Current reset log is [%u/%u/%llu], "
                   "current log point in redo context is [%u-%u/%u/%llu]",
                   req->rst_log.rst_id, req->rst_log.last_asn, req->rst_log.last_lfn,
                   point->rst_id, point->asn, point->block_id, (uint64)point->lfn);
    return OG_SUCCESS;
}
/*
 * Align the local database role with the kind of peer the receiver is connected to:
 * a physical standby fed by a standby becomes a cascaded physical standby, and a cascaded
 * physical standby fed by a primary becomes a physical standby (which also triggers
 * lrcv_trigger_start_lsnd). Nothing is done while a role-specific build is in progress.
 * When the database is open the core control file is saved; a failed save aborts.
 */
static void lrcv_try_change_db_role(lrcv_context_t *lrcv)
{
    knl_session_t *sess = lrcv->session;
    database_t *db = &sess->kernel->db;

    if (lrcv->role_spec_building) {
        return;
    }

    if ((lrcv->peer_role == PEER_STANDBY) && DB_IS_PHYSICAL_STANDBY(db)) {
        db->ctrl.core.db_role = REPL_ROLE_CASCADED_PHYSICAL_STANDBY;
        OG_LOG_RUN_INF("[Log Receiver] Changes database role from physical standby to cascaded physical standby");
    }

    if ((lrcv->peer_role == PEER_PRIMARY) && DB_IS_CASCADED_PHYSICAL_STANDBY(db)) {
        db->ctrl.core.db_role = REPL_ROLE_PHYSICAL_STANDBY;
        lrcv_trigger_start_lsnd(lrcv);
        OG_LOG_RUN_INF("[Log Receiver] Changes database role from cascaded physical standby to physical standby");
    }

    if (db->status != DB_STATUS_OPEN) {
        return;
    }

    if (db_save_core_ctrl(sess) != OG_SUCCESS) {
        CM_ABORT(0, "[Log Receiver] ABORT INFO: Save core control file failed when changes database role");
    }
}
/*
 * For a database flagged as restored, adopt the dbid recorded in the receiver context and
 * clear the restored flag through db_set_ctrl_restored. Otherwise do nothing.
 */
static void lrcv_try_save_dbid(lrcv_context_t *lrcv)
{
    database_t *db = &lrcv->session->kernel->db;

    if (!db->ctrl.core.is_restored) {
        return;
    }

    db->ctrl.core.dbid = lrcv->dbid;
    db_set_ctrl_restored(lrcv->session, OG_FALSE);
}
/*
 * Handle a REP_QUERY_STATUS_REQ message: record the peer information carried by the request,
 * decide whether this node is ready to receive logs, and send a REP_QUERY_STATUS_RESP built
 * in the send buffer.
 *
 * Returns OG_ERROR without sending a response when the resetlog check fails; otherwise returns
 * the status of the stream write.
 */
static status_t lrcv_process_query_status(lrcv_context_t *lrcv, rep_query_status_req_t *req)
{
    knl_session_t *sess = lrcv->session;
    database_t *db = &sess->kernel->db;
    char *out = lrcv->send_buf.read_buf.aligned_buf;
    rep_msg_header_t *hdr = (rep_msg_header_t *)out;
    rep_query_status_resp_t *resp = (rep_query_status_resp_t *)(out + sizeof(rep_msg_header_t));
    bool32 is_building_cascaded = OG_FALSE;
    status_t ret;

    /* Remember what the peer told us about itself. */
    lrcv->peer_repl_port = req->repl_port;
    lrcv->peer_role = req->is_standby ? PEER_STANDBY : PEER_PRIMARY;
    if (DB_IS_CASCADED_PHYSICAL_STANDBY(db) && lrcv->role_spec_building) {
        is_building_cascaded = OG_TRUE;
    }
    lrcv->dbid = req->dbid;
    lrcv->primary_curr_point = req->curr_point;
    lrcv->primary_resetlog = req->rst_log;
    lrcv->primary_reset_log_scn = req->reset_log_scn;

    if (db->status != DB_STATUS_OPEN || (is_building_cascaded && lrcv->peer_role == PEER_PRIMARY)) {
        resp->is_ready = OG_FALSE;
    } else if (lrcv_resetlog_check(lrcv, req) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] Failed to check reset log");
        return OG_ERROR;
    } else if (lrcv_prepare_log_files(lrcv, req) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] Failed to prepare log files");
        resp->is_ready = OG_FALSE;
    } else {
        resp->is_ready = OG_TRUE;
        resp->flush_point = lrcv->flush_point;
        resp->rcy_point = dtc_my_ctrl(sess)->rcy_point;
        resp->replay_lsn = (uint64)sess->kernel->lsn;
        lrcv->status = LRCV_PREPARE;
        lrcv->reconnected = OG_TRUE;
        lrcv_try_save_dbid(lrcv);
        lrcv_try_change_db_role(lrcv);
    }

    resp->is_building_cascaded = is_building_cascaded;
    resp->is_building = lrcv->is_building;
    hdr->size = sizeof(rep_msg_header_t) + sizeof(rep_query_status_resp_t);
    hdr->type = REP_QUERY_STATUS_RESP;

    ret = cs_write_stream(lrcv->pipe, out, hdr->size,
                          (int32)cm_atomic_get(&sess->kernel->attr.repl_pkg_size));
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to send query status response to primary");
    }
    return ret;
}
/*
 * Allocate the buffers used by the log receiver, unless the receive buffer already exists:
 *   - receive buffer:    log writer buffer size + 4K
 *   - zstd decompression context
 *   - compressed buffer: LZ4_compressBound(log writer buffer size) + 4K
 *   - send buffer:       64K
 *   - extend buffer:     1M
 *
 * Returns OG_SUCCESS when all resources are available. On any failure everything acquired by
 * this call, including the zstd decompression context, is released and OG_ERROR is returned.
 */
status_t lrcv_buf_alloc(knl_session_t *session, lrcv_context_t *lrcv)
{
    uint32 log_size = (uint32)LOG_LGWR_BUF_SIZE(session);
    uint32 buf_size;

    if (lrcv->recv_buf.read_buf.alloc_buf != NULL) {
        return OG_SUCCESS;
    }

    buf_size = log_size + SIZE_K(4);
    if (cm_aligned_malloc((int64)buf_size, "lrcv recv buffer", &lrcv->recv_buf.read_buf) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to alloc recv buffer with size %u", buf_size);
        return OG_ERROR;
    }
    lrcv->recv_buf.illusion_count = 0;
    lrcv->recv_buf.read_pos = 0;
    lrcv->recv_buf.write_pos = 0;

    /* ZSTD_createDCtx returns NULL on allocation failure; using it later would be invalid. */
    lrcv->d_ctx.zstd_dctx = ZSTD_createDCtx();
    if (lrcv->d_ctx.zstd_dctx == NULL) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to create zstd decompression context");
        goto free_recv;
    }

    lrcv->d_ctx.buf_size = (uint32)LZ4_compressBound((int32)log_size) + SIZE_K(4);
    lrcv->d_ctx.data_size = 0;
    if (cm_aligned_malloc((int64)lrcv->d_ctx.buf_size, "lrcv compressed buffer",
                          &lrcv->d_ctx.compressed_buf) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to alloc compressed buffer with size %u", lrcv->d_ctx.buf_size);
        goto free_dctx;
    }

    buf_size = SIZE_K(64);
    if (cm_aligned_malloc((int64)buf_size, "lrcv send buffer", &lrcv->send_buf.read_buf) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to alloc send buffer with size %u", buf_size);
        goto free_compressed;
    }
    lrcv->send_buf.illusion_count = 0;
    lrcv->send_buf.read_pos = 0;
    lrcv->send_buf.write_pos = 0;

    buf_size = SIZE_M(1);
    if (cm_aligned_malloc((int64)buf_size, "lrcv extend buffer", &lrcv->extend_buf.read_buf) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to alloc extend buffer with size %u", buf_size);
        goto free_send;
    }
    lrcv->extend_buf.illusion_count = 0;
    lrcv->extend_buf.read_pos = 0;
    lrcv->extend_buf.write_pos = 0;
    return OG_SUCCESS;

    /* Unwind in reverse order of acquisition. */
free_send:
    cm_aligned_free(&lrcv->send_buf.read_buf);
free_compressed:
    cm_aligned_free(&lrcv->d_ctx.compressed_buf);
free_dctx:
    (void)ZSTD_freeDCtx(lrcv->d_ctx.zstd_dctx);
    lrcv->d_ctx.zstd_dctx = NULL;
free_recv:
    cm_aligned_free(&lrcv->recv_buf.read_buf);
    return OG_ERROR;
}
/*
 * Handle a REP_SWITCH_RESP message under the receiver lock.
 *   - Not waiting for a demote: the message is ignored.
 *   - Response is not a promote approval: the receiver state becomes REP_STATE_REJECTED.
 *   - Otherwise: a promote request is posted to the switch controller (under its own lock)
 *     and the receiver state becomes REP_STATE_STANDBY_PROMOTING.
 * Always returns OG_SUCCESS.
 */
static status_t lrcv_process_switch_response(lrcv_context_t *lrcv, rep_switch_resp_t *resp)
{
    switch_ctrl_t *sw_ctrl = &lrcv->session->kernel->switch_ctrl;

    cm_spin_lock(&lrcv->lock, NULL);

    if (lrcv->state != REP_STATE_WAITING_DEMOTE) {
        cm_spin_unlock(&lrcv->lock);
        OG_LOG_RUN_INF("[Log Receiver] ignore switchover response message from primary");
    } else if (resp->state != REP_STATE_PROMOTE_APPROVE) {
        lrcv->state = REP_STATE_REJECTED;
        cm_spin_unlock(&lrcv->lock);
        OG_LOG_RUN_INF("[Log Receiver] switchover request is rejected by primary");
    } else {
        OG_LOG_RUN_INF("[Log Receiver] received switchover response message from primary");

        cm_spin_lock(&sw_ctrl->lock, NULL);
        sw_ctrl->request = SWITCH_REQ_PROMOTE;
        cm_spin_unlock(&sw_ctrl->lock);

        lrcv->state = REP_STATE_STANDBY_PROMOTING;
        cm_spin_unlock(&lrcv->lock);
    }

    return OG_SUCCESS;
}
/*
 * Handle a REP_ABR_REQ message: read the requested page and send it back in a REP_ABR_RESP.
 *
 * The response (header, rep_abr_resp_t, then the page image) is built in the send buffer.
 * Returns OG_ERROR when the requested block size differs from the configured page size, when
 * waiting for parallel recovery fails, or when the page cannot be read; otherwise returns the
 * status of the stream write.
 */
static status_t lrcv_process_abr_req(lrcv_context_t *lrcv, rep_abr_req_t *abr_req)
{
    knl_session_t *sess = lrcv->session;
    char *out = lrcv->send_buf.read_buf.aligned_buf;
    rep_msg_header_t *hdr = (rep_msg_header_t *)out;
    rep_abr_resp_t *resp = (rep_abr_resp_t *)(out + sizeof(rep_msg_header_t));
    uint32 data_off = sizeof(rep_msg_header_t) + sizeof(rep_abr_resp_t);
    page_id_t target;
    status_t ret;
    errno_t rc;

    target.file = abr_req->file;
    target.page = abr_req->page;

    /* The response header and identification fields are filled in before validation. */
    hdr->size = data_off + abr_req->blk_size;
    hdr->type = REP_ABR_RESP;
    resp->lsnd_id = abr_req->lsnd_id;
    resp->file = abr_req->file;
    resp->page = abr_req->page;

    if (abr_req->blk_size != sess->kernel->attr.page_size) {
        OG_LOG_RUN_ERR("[Log Receiver] receives invalid ABR message, request page size is %u, default page size is %u",
                       abr_req->blk_size, sess->kernel->attr.page_size);
        return OG_ERROR;
    }

    if (abr_wait_paral_rcy_compelte(sess) != OG_SUCCESS) {
        return OG_ERROR;
    }

    if (buf_read_page(sess, target, LATCH_MODE_S, ENTER_PAGE_NORMAL) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] ABR standby failed to read page [%u-%u]", target.file, target.page);
        return OG_ERROR;
    }

    /* Copy the page image right behind the response struct, then leave the page. */
    rc = memcpy_sp(out + data_off, (uint32)lrcv->send_buf.read_buf.buf_size - data_off, sess->curr_page,
                   abr_req->blk_size);
    knl_securec_check(rc);
    buf_leave_page(sess, OG_FALSE);

    ret = cs_write_stream(lrcv->pipe, out, hdr->size,
                          (int32)cm_atomic_get(&sess->kernel->attr.repl_pkg_size));
    OG_LOG_RUN_INF("[Log Receiver] send ABR response to primary for file %u page %u with lsnd id %u status %d",
                   abr_req->file, abr_req->page, abr_req->lsnd_id, ret);
    return ret;
}
/*
 * Dispatch the message described by lrcv->header to its handler. Payloads of non-batch
 * messages are read from the extend buffer. Returns OG_ERROR when a handler fails or the
 * message type is unknown, otherwise OG_SUCCESS.
 */
static status_t lrcv_process_message(lrcv_context_t *lrcv)
{
    char *payload = lrcv->extend_buf.read_buf.aligned_buf;

    switch (lrcv->header.type) {
        case REP_BATCH_REQ:
            /* Batches are dropped while the receiver is in the waiting state. */
            if (SECUREC_UNLIKELY(lrcv->wait_info.waiting)) {
                return OG_SUCCESS;
            }
            if (lrcv_process_batch(lrcv) != OG_SUCCESS) {
                return OG_ERROR;
            }
            /* Processing the batch may have switched the receiver into the waiting state. */
            if (SECUREC_UNLIKELY(lrcv->wait_info.waiting)) {
                if (lrcv_send_switch_wait(lrcv) != OG_SUCCESS) {
                    return OG_ERROR;
                }
                return OG_SUCCESS;
            }
            if (lrcv_send_batch_ack(lrcv) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return OG_SUCCESS;

        case REP_QUERY_STATUS_REQ:
            if (lrcv_process_query_status(lrcv, (rep_query_status_req_t *)payload) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return OG_SUCCESS;

        case REP_HEART_BEAT_REQ:
            if (lrcv_process_heart_beat(lrcv->session) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return OG_SUCCESS;

        case REP_SWITCH_RESP:
            if (lrcv_process_switch_response(lrcv, (rep_switch_resp_t *)payload) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return OG_SUCCESS;

        case REP_ABR_REQ:
            if (lrcv_process_abr_req(lrcv, (rep_abr_req_t *)payload) != OG_SUCCESS) {
                return OG_ERROR;
            }
            return OG_SUCCESS;

        case REP_RECORD_BACKUPSET_RESP:
            /* The result fields are written before the task status is set to done. */
            lrcv->task.failed = *(bool32 *)payload;
            if (lrcv->task.failed) {
                lrcv->task.error_no = ERR_RECORD_BACKUP_FAILED;
            }
            lrcv->task.status = BAK_TASK_DONE;
            return OG_SUCCESS;

        default:
            OG_LOG_RUN_ERR("[Log Receiver] invalid replication message type %u", lrcv->header.type);
            return OG_ERROR;
    }
}
/*
 * Decompress a zstd-compressed batch from buf (size bytes) into the receive buffer.
 *
 * When the receive buffer is not yet at its maximum size (OG_MAX_BATCH_SIZE + 4K), the frame's
 * declared content size is inspected first and the buffer is grown to the maximum if the
 * content would not fit; failure to grow aborts the process.
 *
 * On success *data_size receives the number of decompressed bytes and OG_SUCCESS is returned.
 * On failure OG_ERROR is returned and *data_size is left untouched.
 */
static status_t lrcv_zstd_decompress(lrcv_context_t *lrcv, const char *buf, uint32 size, uint32 *data_size)
{
    size_t result;

    if (lrcv->recv_buf.read_buf.buf_size != OG_MAX_BATCH_SIZE + SIZE_K(4)) {
        uint64 ori_batches_size = ZSTD_getFrameContentSize(buf, size);
        if (ori_batches_size == ZSTD_CONTENTSIZE_ERROR || ori_batches_size == ZSTD_CONTENTSIZE_UNKNOWN) {
            OG_LOG_RUN_ERR("[Log Receiver] failed to decompress(zstd) log batch message");
            return OG_ERROR;
        }

        if (ori_batches_size > lrcv->recv_buf.read_buf.buf_size) {
            if (cm_aligned_realloc((int64)(OG_MAX_BATCH_SIZE + SIZE_K(4)), "lrcv recv buffer",
                                   &lrcv->recv_buf.read_buf) != OG_SUCCESS) {
                CM_ABORT(0, "ABORT INFO: malloc lrcv compressed buffer fail.");
            }
        }
    }

    /*
     * zstd reports errors as size_t values near SIZE_MAX. The result must be tested at full
     * width: narrowing it to 32 bits first turns an error code into an ordinary-looking size
     * that ZSTD_isError no longer recognises on 64-bit platforms.
     */
    result = ZSTD_decompressDCtx(lrcv->d_ctx.zstd_dctx, lrcv->recv_buf.read_buf.aligned_buf,
                                 (size_t)lrcv->recv_buf.read_buf.buf_size, buf, size);
    if (ZSTD_isError(result)) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to decompress(zstd) log batch message");
        return OG_ERROR;
    }

    *data_size = (uint32)result;
    return OG_SUCCESS;
}
/*
 * Decompress an LZ4-compressed batch from buf (size bytes) into the receive buffer.
 *
 * If the first attempt fails and the receive buffer is not yet at its maximum size
 * (OG_MAX_BATCH_SIZE + 4K), the buffer is grown to the maximum (abort on allocation failure)
 * and decompression is retried once. A non-positive result is treated as failure on both
 * attempts.
 *
 * On success *data_size receives the number of decompressed bytes and OG_SUCCESS is returned;
 * otherwise OG_ERROR is returned and *data_size is left untouched.
 */
static status_t lrcv_lz4_decompress(lrcv_context_t *lrcv, const char *buf, uint32 size, uint32 *data_size)
{
    int result = LZ4_decompress_safe(buf, lrcv->recv_buf.read_buf.aligned_buf, (int32)size,
                                     (int32)lrcv->recv_buf.read_buf.buf_size);

    /* The failure may be caused by a too-small destination: grow it once and retry. */
    if (result <= 0 && lrcv->recv_buf.read_buf.buf_size != OG_MAX_BATCH_SIZE + SIZE_K(4)) {
        if (cm_aligned_realloc((int64)(OG_MAX_BATCH_SIZE + SIZE_K(4)), "lrcv recv buffer",
                               &lrcv->recv_buf.read_buf) != OG_SUCCESS) {
            CM_ABORT(0, "ABORT INFO: malloc lrcv compressed buffer fail.");
        }
        result = LZ4_decompress_safe(buf, lrcv->recv_buf.read_buf.aligned_buf, (int32)size,
                                     (int32)lrcv->recv_buf.read_buf.buf_size);
    }

    /* Same failure criterion for the first attempt and the retry. */
    if (result <= 0) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to decompress(lz4) log batch message");
        return OG_ERROR;
    }

    *data_size = (uint32)result;
    return OG_SUCCESS;
}
/*
 * Receive the compressed body of a batch message (remain_size bytes) into the compressed
 * buffer and decompress it into the receive buffer with the algorithm named in req.
 *
 * On success lrcv->header.size is rewritten to the size of the message as if it had been sent
 * uncompressed (message header + batch request + decompressed batches).
 *
 * Returns OG_ERROR when the announced size does not fit the compressed buffer, when the read
 * fails or is short, when the algorithm is not supported, or when decompression fails.
 */
static status_t lrcv_compress_receive(lrcv_context_t *lrcv, rep_batch_req_t *req, uint32 remain_size, int32 *recv_size)
{
    uint32 batches_size = 0;

    /* remain_size comes from the peer's message header; never read past the buffer. */
    if (remain_size > lrcv->d_ctx.compressed_buf.buf_size) {
        OG_LOG_RUN_ERR("[Log Receiver] the remain data size %u exceeds the compressed buffer size %u",
                       remain_size, (uint32)lrcv->d_ctx.compressed_buf.buf_size);
        return OG_ERROR;
    }

    if (cs_read_stream(lrcv->pipe, lrcv->d_ctx.compressed_buf.aligned_buf, REPL_RECV_TIMEOUT, remain_size,
                       recv_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to receive log batch message from primary with size %u", remain_size);
        return OG_ERROR;
    }

    if ((uint32)*recv_size != remain_size) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to receive log batch message from primary with size %u, %u received",
                       remain_size, *recv_size);
        return OG_ERROR;
    }

    switch (req->compress_alg) {
        case COMPRESS_ZSTD:
            if (lrcv_zstd_decompress(lrcv, lrcv->d_ctx.compressed_buf.aligned_buf, (uint32)*recv_size,
                                     &batches_size) != OG_SUCCESS) {
                return OG_ERROR;
            }
            break;
        case COMPRESS_LZ4:
            if (lrcv_lz4_decompress(lrcv, lrcv->d_ctx.compressed_buf.aligned_buf, (uint32)*recv_size,
                                    &batches_size) != OG_SUCCESS) {
                return OG_ERROR;
            }
            break;
        default:
            /* Nothing was decompressed, so the receive buffer holds no data for this message. */
            OG_LOG_RUN_ERR("[Log Receiver] unsupported compress algorithm.");
            return OG_ERROR;
    }

    lrcv->header.size = sizeof(rep_msg_header_t) + sizeof(rep_batch_req_t) + batches_size;
    return OG_SUCCESS;
}
/*
 * Receive one complete replication message from the peer.
 *
 * The message header is read into lrcv->header with the given timeout. When nothing arrives,
 * *recv_size is 0 and OG_SUCCESS is returned. Otherwise the rest of the message is read:
 *   - REP_BATCH_REQ: the rep_batch_req_t goes to the extend buffer; the batch body goes to the
 *     receive buffer directly (uncompressed) or through the compressed buffer plus
 *     decompression (lrcv_compress_receive). Buffers are grown on demand; failure to grow
 *     aborts the process.
 *   - any other type: the payload, if any, goes to the extend buffer.
 *
 * On success *recv_size is set to lrcv->header.size and the message has passed
 * lrcv_rcv_msg_is_valid. Any read error, short read, inconsistent size or invalid message
 * yields OG_ERROR.
 */
static status_t lrcv_receive(lrcv_context_t *lrcv, uint32 timeout, int32 *recv_size)
{
    rep_batch_req_t *rep_batch_req = NULL;
    uint32 remain_size;
    uint32 new_compress_buf_size = 0;
    uint32 new_buf_size;

    if (cs_read_stream(lrcv->pipe, (char *)&lrcv->header, timeout, sizeof(rep_msg_header_t), recv_size) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to receive message from primary");
        return OG_ERROR;
    }

    if (*recv_size == 0) {
        return OG_SUCCESS;
    }

    if (lrcv->header.size < (uint32)*recv_size) {
        OG_LOG_RUN_ERR("[Log Receiver] invalid head size %u received, which is smaller than %u", lrcv->header.size,
                       (uint32)*recv_size);
        return OG_ERROR;
    }

    remain_size = lrcv->header.size - (uint32)*recv_size;

    if (lrcv->header.type == REP_BATCH_REQ) {
        if (cs_read_stream(lrcv->pipe, lrcv->extend_buf.read_buf.aligned_buf, REPL_RECV_TIMEOUT,
                           sizeof(rep_batch_req_t), recv_size) != OG_SUCCESS) {
            OG_LOG_RUN_ERR("[Log Receiver] failed to receive rep_batch_req_t message from primary with size %u",
                           (uint32)sizeof(rep_batch_req_t));
            return OG_ERROR;
        }

        if (*recv_size != sizeof(rep_batch_req_t)) {
            OG_LOG_RUN_ERR("[Log Receiver] failed to receive rep_batch_req_t message from "
                           "primary with size %u, %u received",
                           (uint32)sizeof(rep_batch_req_t), *recv_size);
            return OG_ERROR;
        }

        rep_batch_req = (rep_batch_req_t *)lrcv->extend_buf.read_buf.aligned_buf;

        /* A batch message must carry data beyond the rep_batch_req_t itself. */
        if (remain_size <= (uint32)*recv_size) {
            OG_LOG_RUN_ERR("[Log Receiver] invalid head size %u received, which is smaller than %u", lrcv->header.size,
                           lrcv->header.size - remain_size + (uint32)*recv_size);
            return OG_ERROR;
        }

        remain_size -= (uint32)*recv_size;

        if (rep_batch_req->compress_alg == COMPRESS_NONE) {
            if (lrcv->header.size > lrcv->recv_buf.read_buf.buf_size) {
                new_buf_size = OG_MAX_BATCH_SIZE + SIZE_K(8);
                if (cm_aligned_realloc((int64)new_buf_size, "lrcv recv buffer",
                                       &lrcv->recv_buf.read_buf) != OG_SUCCESS) {
                    CM_ABORT(0, "[Log Receiver] failed to alloc recv buffer with size %u", new_buf_size);
                }
            }

            if (remain_size > lrcv->recv_buf.read_buf.buf_size) {
                /* Report the size of the buffer that was actually compared against. */
                OG_LOG_RUN_ERR("[Log Receiver] the remain data size %u exceeds the receive buffer size %u",
                               remain_size, (uint32)lrcv->recv_buf.read_buf.buf_size);
                return OG_ERROR;
            }

            if (cs_read_stream(lrcv->pipe, lrcv->recv_buf.read_buf.aligned_buf, REPL_RECV_TIMEOUT, remain_size,
                               recv_size) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[Log Receiver] failed to receive log batch message from primary with size %u",
                               remain_size);
                return OG_ERROR;
            }

            if ((uint32)*recv_size != remain_size) {
                OG_LOG_RUN_ERR("[Log Receiver] failed to receive log batch message from primary "
                               "with size %u, %u received",
                               remain_size, *recv_size);
                return OG_ERROR;
            }
        } else {
            /* Grow the compressed buffer to the algorithm's worst-case bound when needed. */
            if (lrcv->header.size > lrcv->d_ctx.compressed_buf.buf_size) {
                if (rep_batch_req->compress_alg == COMPRESS_ZSTD) {
                    new_compress_buf_size = (uint32)ZSTD_compressBound(OG_MAX_BATCH_SIZE) + SIZE_K(4);
                } else if (rep_batch_req->compress_alg == COMPRESS_LZ4) {
                    new_compress_buf_size = (uint32)LZ4_compressBound((int32)OG_MAX_BATCH_SIZE) + SIZE_K(4);
                } else {
                    OG_LOG_RUN_ERR("[Log Receiver] unsupported compress algorithm.");
                    return OG_ERROR;
                }

                if (cm_aligned_realloc((int64)new_compress_buf_size, "lrcv compressed buffer",
                                       &lrcv->d_ctx.compressed_buf) != OG_SUCCESS) {
                    CM_ABORT(0, "ABORT INFO: malloc lrcv compress buffer fail.");
                }
                lrcv->d_ctx.compressed_buf.buf_size = new_compress_buf_size;
            }

            if (lrcv_compress_receive(lrcv, rep_batch_req, remain_size, recv_size) != OG_SUCCESS) {
                return OG_ERROR;
            }
        }
    } else {
        if (remain_size > 0) {
            if (remain_size > lrcv->extend_buf.read_buf.buf_size) {
                OG_LOG_RUN_ERR("[Log Receiver] the remain data size %u exceeds the receive buffer size %u",
                               remain_size, (uint32)lrcv->extend_buf.read_buf.buf_size);
                return OG_ERROR;
            }

            if (cs_read_stream(lrcv->pipe, lrcv->extend_buf.read_buf.aligned_buf, REPL_RECV_TIMEOUT, remain_size,
                               recv_size) != OG_SUCCESS) {
                OG_LOG_RUN_ERR("[Log Receiver] failed to receive message %u from primary with size %u",
                               lrcv->header.type, remain_size);
                return OG_ERROR;
            }

            if ((uint32)*recv_size != remain_size) {
                OG_LOG_RUN_ERR("[Log Receiver] failed to receive message %u from primary with size %u, %u received",
                               lrcv->header.type, remain_size, *recv_size);
                return OG_ERROR;
            }
        }
    }

    /* Report the whole message size to the caller. */
    (*recv_size) = lrcv->header.size;

    if (!lrcv_rcv_msg_is_valid(lrcv)) {
        OG_LOG_RUN_ERR("[Log Receiver] invalid message %u received", lrcv->header.type);
        return OG_ERROR;
    }

    return OG_SUCCESS;
}
/*
 * Send a header-only REP_SWITCH_REQ message to the peer. On success the receiver state becomes
 * REP_STATE_WAITING_DEMOTE; on a write failure OG_ERROR is returned and the state is unchanged.
 */
static status_t lrcv_send_switch_request(lrcv_context_t *lrcv)
{
    char *out = lrcv->send_buf.read_buf.aligned_buf;
    rep_msg_header_t *hdr = (rep_msg_header_t *)out;
    status_t ret;

    hdr->size = sizeof(rep_msg_header_t);
    hdr->type = REP_SWITCH_REQ;

    ret = cs_write_stream(lrcv->pipe, out, hdr->size,
                          (int32)cm_atomic_get(&lrcv->session->kernel->attr.repl_pkg_size));
    if (ret != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to send switchover request message to primary");
        return OG_ERROR;
    }

    lrcv->state = REP_STATE_WAITING_DEMOTE;
    OG_LOG_RUN_INF("[Log Receiver] send switchover request to primary");
    return OG_SUCCESS;
}
/*
 * Send a REP_RECORD_BACKUPSET_REQ message to the peer: first the message header, then the
 * backup record taken from the kernel's backup context. Returns OG_ERROR if either write
 * fails, otherwise OG_SUCCESS.
 */
static status_t lrcv_send_record_backupset(lrcv_context_t *lrcv)
{
    bak_context_t *bak_ctx = &lrcv->session->kernel->backup_ctx;
    bak_record_t *record = &bak_ctx->bak.record;
    rep_msg_header_t rep_msg_header;

    rep_msg_header.size = sizeof(rep_msg_header_t) + sizeof(bak_record_t);
    rep_msg_header.type = REP_RECORD_BACKUPSET_REQ;

    if (cs_write_stream(lrcv->pipe, (char *)&rep_msg_header, sizeof(rep_msg_header_t),
                        (int32)cm_atomic_get(&lrcv->session->kernel->attr.repl_pkg_size)) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to send record backupset request message to primary");
        return OG_ERROR;
    }

    /* The record travels over the same pipe as the header, i.e. to the primary. */
    if (cs_write_stream(lrcv->pipe, (char *)record, sizeof(bak_record_t),
                        (int32)cm_atomic_get(&lrcv->session->kernel->attr.repl_pkg_size)) != OG_SUCCESS) {
        OG_LOG_RUN_ERR("[Log Receiver] failed to send backupset record data to primary");
        return OG_ERROR;
    }

    OG_LOG_RUN_INF("[Log Receiver] send record backupset request to primary");
    return OG_SUCCESS;
}
/*
 * If a backup task is waiting to be processed, send the record-backupset request to the peer.
 * On success the task moves to BAK_TASK_WAIT_RESPONSE; on failure it is finished with
 * ERR_SEND_RECORD_REQ_FAILED.
 */
static void lrcv_process_backup_req(lrcv_context_t *lrcv)
{
    rep_bak_task_t *task = &lrcv->task;

    if (task->status != BAK_TASK_WAIT_PROCESS) {
        return;
    }

    if (lrcv_send_record_backupset(lrcv) != OG_SUCCESS) {
        /*
         * The waiter polls task->status and reads failed/error_no once it sees BAK_TASK_DONE,
         * so the result fields are filled in before the status is published.
         */
        task->error_no = ERR_SEND_RECORD_REQ_FAILED;
        task->failed = OG_TRUE;
        task->status = BAK_TASK_DONE;
    } else {
        task->status = BAK_TASK_WAIT_RESPONSE;
    }
}
/*
 * Process locally raised requests. A pending demote request is answered by sending a
 * switchover request and nothing else is done in that round; otherwise the pending backup
 * request and the switch-wait handling are processed.
 */
static status_t lrcv_process_req(lrcv_context_t *lrcv)
{
    if (lrcv->state == REP_STATE_DEMOTE_REQUEST) {
        return lrcv_send_switch_request(lrcv);
    }

    lrcv_process_backup_req(lrcv);

    return (lrcv_process_switch_wait(lrcv) != OG_SUCCESS) ? OG_ERROR : OG_SUCCESS;
}
/*
 * Process the message currently held in the receiver, then keep receiving (without waiting)
 * and processing further messages until none is pending. Local requests are processed after
 * every message. Returns OG_ERROR as soon as any step fails.
 */
static status_t lrcv_process_msg_loop(lrcv_context_t *lrcv)
{
    int32 got;

    for (;;) {
        if (lrcv_process_message(lrcv) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (lrcv_process_req(lrcv) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (lrcv_receive(lrcv, 0, &got) != OG_SUCCESS) {
            return OG_ERROR;
        }

        if (got <= 0) {
            break;
        }
    }

    return OG_SUCCESS;
}
/*
 * When a message was received and it is a REP_QUERY_STATUS_REQ, record the peer's replication
 * port from the request held in the extend buffer.
 */
static inline void lrcv_try_set_repl_port(lrcv_context_t *lrcv, int32 recv_size)
{
    rep_query_status_req_t *query = NULL;

    if (recv_size == 0 || lrcv->header.type != REP_QUERY_STATUS_REQ) {
        return;
    }

    query = (rep_query_status_req_t *)lrcv->extend_buf.read_buf.aligned_buf;
    lrcv->peer_repl_port = query->repl_port;
}
/*
 * Main loop of the log receiver thread. Each round it checks for exit, processes local
 * requests, and receives one message with a timeout of LRCV_RECV_INTERVAL.
 *   - While the receiver must be suspended, only the peer replication port is recorded and the
 *     thread sleeps for SUSPEND_INTERVAL.
 *   - A received message is handed to lrcv_process_msg_loop.
 *   - When nothing arrives in LRCV_PREPARE/LRCV_READY state for longer than lrcv->timeout
 *     seconds, the peer is assumed to be down.
 * Every exit path runs lrcv_set_conn_err. Returns OG_SUCCESS only when the thread was closed.
 */
status_t lrcv_proc(lrcv_context_t *lrcv)
{
    thread_t *self = &lrcv->thread;
    int32 got;
    uint32 idle_rounds = 0;

    cm_set_thread_name("log_receiver");
    OG_LOG_RUN_INF("log receiver thread started");

    while (!self->closed) {
        /* Evaluated left to right; the first failing step ends the thread. */
        if (lrcv_need_exit(lrcv) || lrcv_process_req(lrcv) != OG_SUCCESS ||
            lrcv_receive(lrcv, LRCV_RECV_INTERVAL, &got) != OG_SUCCESS) {
            lrcv_set_conn_err(lrcv);
            return OG_ERROR;
        }

        if (lrcv_need_suspend(lrcv)) {
            lrcv_try_set_repl_port(lrcv, got);
            cm_sleep(SUSPEND_INTERVAL);
            continue;
        }

        if (got > 0) {
            idle_rounds = 0;
            if (lrcv_process_msg_loop(lrcv) != OG_SUCCESS) {
                lrcv_set_conn_err(lrcv);
                return OG_ERROR;
            }
            continue;
        }

        /* Idle rounds are only counted once the receiver is prepared or ready. */
        if (lrcv->status != LRCV_PREPARE && lrcv->status != LRCV_READY) {
            continue;
        }

        if (idle_rounds++ > lrcv->timeout * MILLISECS_PER_SECOND / LRCV_RECV_INTERVAL) {
            OG_LOG_RUN_INF("[Log Receiver] lrcv has not received message more than %us, primary is down probably",
                           lrcv->timeout);
            lrcv_set_conn_err(lrcv);
            return OG_ERROR;
        }
    }

    lrcv_set_conn_err(lrcv);
    OG_LOG_RUN_INF("log receiver thread closed");
    return OG_SUCCESS;
}
/*
 * Ask the log receiver thread to stop: clear the recorded primary host, raise the thread's
 * closed flag, and poll every 10ms while the receiver still has a session and the closed flag
 * is still set.
 */
void lrcv_close(knl_session_t *session)
{
    knl_instance_t *inst = (knl_instance_t *)session->kernel;
    lrcv_context_t *ctx = &inst->lrcv_ctx;

    lrcv_reset_primary_host(session);
    ctx->thread.closed = OG_TRUE;

    for (;;) {
        if (!LRCV_IS_RUNING(ctx) || !ctx->thread.closed) {
            break;
        }
        cm_sleep(10);
    }
}
/*
 * Wait until the log receiver has a pipe and then copy the recorded primary host into host.
 *
 * retry_count_input is the number of 5ms waits allowed before giving up; 0 fails immediately
 * when no pipe exists, and a negative value is never decremented. Waiting is also abandoned
 * when a failover is triggered or the log replay thread is closed.
 */
static status_t lrcv_get_primary_host(knl_session_t *session, int32 retry_count_input, char *host, uint32 host_buf_size)
{
    int32 tries_left = retry_count_input;
    lrcv_context_t *ctx = NULL;
    errno_t rc;

    for (;;) {
        if (session->kernel->lrcv_ctx.pipe != NULL) {
            break;
        }

        if (tries_left == 0) {
            OG_LOG_RUN_ERR("[Log Receiver] abort wait connection ready during to exceeded max retries");
            OG_THROW_ERROR(ERR_PRI_NOT_CONNECT, "primary info");
            return OG_ERROR;
        }

        if (knl_failover_triggered(session->kernel)) {
            OG_LOG_RUN_ERR("[Log Receiver] abort wait connection ready during to force promote");
            return OG_ERROR;
        }

        if (session->kernel->lrpl_ctx.thread.closed) {
            OG_LOG_RUN_ERR("[Log Receiver] abort wait connection ready during to lrpl closed");
            OG_THROW_ERROR(ERR_OPERATION_KILLED);
            return OG_ERROR;
        }

        cm_sleep(5);

        if (tries_left > 0) {
            tries_left--;
        }
    }

    ctx = &session->kernel->lrcv_ctx;
    rc = strncpy_s(host, host_buf_size, ctx->primary_host, OG_HOST_NAME_BUFFER_SIZE - 1);
    knl_securec_check(rc);
    return OG_SUCCESS;
}
/*
 * Fetch the primary host and, when port is not NULL, the peer replication port recorded by the
 * log receiver. A recorded port of 0 is reported as ERR_INVALID_REPL_PORT (with *port set to 0).
 */
status_t lrcv_get_primary_server(knl_session_t *session, int32 retry_count,
                                 char *host, uint32 host_buf_size, uint16 *port)
{
    lrcv_context_t *ctx = &session->kernel->lrcv_ctx;

    if (lrcv_get_primary_host(session, retry_count, host, host_buf_size) != OG_SUCCESS) {
        return OG_ERROR;
    }

    /* The caller is only interested in the host. */
    if (port == NULL) {
        return OG_SUCCESS;
    }

    *port = ctx->peer_repl_port;
    if (*port != 0) {
        return OG_SUCCESS;
    }

    OG_THROW_ERROR(ERR_INVALID_REPL_PORT);
    OG_LOG_RUN_ERR("[Log Receiver] peer repl port is 0, it may be that the REPL_PORT parameter "
                   "of the peer node is not set, or it has been disconnected from the peer node");
    return OG_ERROR;
}
/*
 * Tell whether a switchover is possible: the current primary server (host and replication
 * port, fetched without waiting) must match an enabled archive destination, searched from
 * index 1, whose role validity is not VALID_FOR_STANDBY_ROLE.
 */
bool32 lrcv_switchover_enabled(knl_session_t *session)
{
    knl_attr_t *attr = &session->kernel->attr;
    char pri_host[OG_HOST_NAME_BUFFER_SIZE];
    uint16 pri_port;
    uint32 idx;

    if (lrcv_get_primary_server(session, 0, pri_host, OG_HOST_NAME_BUFFER_SIZE, &pri_port) != OG_SUCCESS) {
        return OG_FALSE;
    }

    for (idx = 1; idx < OG_MAX_ARCH_DEST; idx++) {
        arch_attr_t *dest = &attr->arch_attr[idx];

        if (strcmp(pri_host, dest->service.host) != 0 || pri_port != dest->service.port) {
            continue;
        }

        if (dest->enable && dest->role_valid != VALID_FOR_STANDBY_ROLE) {
            return OG_TRUE;
        }
    }

    return OG_FALSE;
}
/*
 * Hand a backup task to the log receiver. The previous task must be done (asserted). When the
 * receiver is not running, the task is marked failed with ERR_SEND_RECORD_REQ_FAILED and its
 * status is left unchanged; otherwise it is queued as BAK_TASK_WAIT_PROCESS.
 */
void lrcv_trigger_backup_task(knl_session_t *session)
{
    lrcv_context_t *ctx = &session->kernel->lrcv_ctx;
    rep_bak_task_t *bak_task = &ctx->task;

    CM_ASSERT(bak_task->status == BAK_TASK_DONE);

    if (!LRCV_IS_RUNING(ctx)) {
        bak_task->failed = OG_TRUE;
        bak_task->error_no = ERR_SEND_RECORD_REQ_FAILED;
        return;
    }

    /* The failed flag is cleared before the task is made visible as pending. */
    bak_task->failed = OG_FALSE;
    bak_task->status = BAK_TASK_WAIT_PROCESS;
}
/*
 * Poll every 100ms until the receiver's backup task is done. Returns OG_ERROR (with the
 * matching error thrown) when the session is canceled or killed while waiting, or when the
 * finished task is flagged as failed; otherwise OG_SUCCESS.
 */
status_t lrcv_wait_task_process(knl_session_t *session)
{
    rep_bak_task_t *bak_task = &session->kernel->lrcv_ctx.task;

    for (;;) {
        if (bak_task->status == BAK_TASK_DONE) {
            break;
        }

        if (session->canceled) {
            OG_THROW_ERROR(ERR_OPERATION_CANCELED);
            return OG_ERROR;
        }

        if (session->killed) {
            OG_THROW_ERROR(ERR_OPERATION_KILLED);
            return OG_ERROR;
        }

        cm_sleep(100);
    }

    if (!bak_task->failed) {
        return OG_SUCCESS;
    }

    OG_THROW_ERROR(bak_task->error_no);
    return OG_ERROR;
}
/*
 * Poll every 100ms until the receiver status reaches at least LRCV_PREPARE. Returns early when
 * the receiver has no session or its session has been killed.
 */
void lrcv_wait_status_prepared(knl_session_t *session)
{
    lrcv_context_t *ctx = &session->kernel->lrcv_ctx;

    for (;;) {
        if (ctx->status >= LRCV_PREPARE) {
            return;
        }

        if (ctx->session == NULL || ctx->session->killed) {
            return;
        }

        cm_sleep(100);
    }
}
/*
 * Reset the receiver status to LRCV_DISCONNECTED and clear the arch_lost and
 * log_decrypt_failed flags, in that order.
 */
void lrcv_clear_needrepair_for_failover(knl_session_t *session)
{
    lrcv_context_t *ctx = &session->kernel->lrcv_ctx;

    ctx->status = LRCV_DISCONNECTED;
    session->kernel->lftc_client_ctx.arch_lost = OG_FALSE;
    session->kernel->rcy_ctx.log_decrypt_failed = OG_FALSE;
}