* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* repl_raft.c
*
*
* IDENTIFICATION
* src/kernel/replication/repl_raft.c
*
* -------------------------------------------------------------------------
*/
#include "knl_replication_module.h"
#include "repl_raft.h"
#include "knl_recovery.h"
#include "knl_context.h"
#include "dtc_dls.h"
#include "dtc_database.h"
knl_session_t *g_raft_session = NULL;
void raft_pending_switch_request(knl_session_t *session, switch_ctrl_t *ctrl)
{
cm_spin_lock(&ctrl->lock, NULL);
ctrl->request = SWITCH_REQ_RAFT_PROMOTE_PENDING;
ctrl->state = SWITCH_IDLE;
ctrl->keep_sid = session->id;
ctrl->handling = OG_FALSE;
cm_spin_unlock(&ctrl->lock);
}
static unsigned long long raft_getappliedindex_cb(void)
{
static uint64 prev_lfn = OG_INVALID_ID64;
static uint64 prev_raft_idx = OG_INVALID_ID64;
raft_context_t *ogx = &g_raft_session->kernel->raft_ctx;
OG_LOG_DEBUG_INF("RAFT: get appliedindex raft saved raft flush point lfn=%lld, saved raft flush point index=%lld\n",
(uint64)ogx->saved_raft_flush_point.lfn, ogx->saved_raft_flush_point.raft_index);
knl_panic((prev_lfn == OG_INVALID_ID64 && prev_raft_idx == OG_INVALID_ID64) ||
(prev_lfn <= ogx->saved_raft_flush_point.lfn && prev_raft_idx <= ogx->saved_raft_flush_point.raft_index));
prev_lfn = ogx->saved_raft_flush_point.lfn;
prev_raft_idx = ogx->saved_raft_flush_point.raft_index;
return ogx->saved_raft_flush_point.raft_index;
}
static status_t db_notify_failover_promote_for_raft(knl_session_t *session)
{
switch_ctrl_t *ctrl = &session->kernel->switch_ctrl;
cm_spin_lock(&ctrl->lock, NULL);
if (ctrl->request == SWITCH_REQ_NONE) {
ctrl->request = SWITCH_REQ_RAFT_PROMOTE_PENDING;
ctrl->state = SWITCH_IDLE;
ctrl->keep_sid = 0;
ctrl->handling = OG_FALSE;
}
if (ctrl->request != SWITCH_REQ_RAFT_PROMOTE_PENDING) {
cm_spin_unlock(&ctrl->lock);
OG_LOG_RUN_INF("RAFT: invalid switch request, server is handling another switch request");
OG_THROW_ERROR(ERR_INVALID_SWITCH_REQUEST, "server is handling another switch request");
return OG_ERROR;
}
ctrl->request = SWITCH_REQ_RAFT_PROMOTE;
cm_spin_unlock(&ctrl->lock);
OG_LOG_RUN_INF("[DB] notify server to do failover");
return OG_SUCCESS;
}
static void raft_promote_cb(long long errCode)
{
status_t status;
raft_context_t *raft_ctx = &g_raft_session->kernel->raft_ctx;
OG_LOG_RUN_WAR("RAFT: promote cb start to promote leader: errCode=%lld\n", errCode);
raft_wait_for_log_flush(g_raft_session, raft_ctx->commit_lfn);
knl_panic(raft_ctx->logwr_async_buf_write_pos == raft_ctx->logwr_async_buf_flush_pos);
raft_reset_async_buffer(raft_ctx);
status = db_notify_failover_promote_for_raft(g_raft_session);
if (status != OG_SUCCESS) {
OG_LOG_RUN_WAR("RAFT: promote cb fail to do failover.");
} else {
OG_LOG_RUN_WAR("RAFT: promote cb failover successfully.");
}
}
static void raft_demote_cb(long long errCode)
{
CM_ABORT(0, "[RAFT] ABORT INFO: demote to follower, shutdown DB, and wait for CM to restart it, errCode=%lld\n",
errCode);
}
static status_t raft_batch_verify_checksum(log_batch_t *batch, uint32 len)
{
log_batch_tail_t *tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
if (batch->space_size != len) {
OG_LOG_RUN_ERR("[RAFT] invalid received size for batch ,lfn :%llu", (uint64)batch->head.point.lfn);
return OG_ERROR;
}
if (!rcy_validate_batch(batch, tail)) {
OG_LOG_RUN_ERR("[RAFT] invalid received batch ,lfn :%llu", (uint64)batch->head.point.lfn);
return OG_ERROR;
}
if (rcy_verify_checksum(g_raft_session, batch) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
static void raft_get_lfn_scn_cb(char *buf, uint32 len, uint64 *lfn, uint64 *scn)
{
raft_context_t *raft_ctx = &g_raft_session->kernel->raft_ctx;
*lfn = 0;
*scn = 0;
if (raft_ctx->status == RAFT_STATUS_CLOSING) {
return;
}
if (!DB_IS_PRIMARY(&g_raft_session->kernel->db)) {
return;
}
if (buf == NULL) {
*lfn = raft_ctx->raft_flush_point.lfn;
*scn = raft_ctx->raft_flush_point.scn;
return;
}
log_batch_t *batch = (log_batch_t *)buf;
batch->raft_index = 0;
if (raft_batch_verify_checksum(batch, len) != OG_SUCCESS) {
return;
}
*lfn = batch->head.point.lfn;
*scn = batch->scn;
}
static uint64 raft_get_primary_id(knl_session_t *session)
{
status_t status;
char *start_ptr = NULL;
char *end_ptr = NULL;
char *query_info = NULL;
uint64 leader_node_id = 0;
uint32 leader_id_index = 3;
uint32 pos;
status = knl_raft_query_info(session, "cluster", &query_info);
if (status != OG_SUCCESS) {
return leader_node_id;
}
if (query_info == NULL) {
return leader_node_id;
}
pos = 0;
start_ptr = query_info;
end_ptr = strstr(query_info, ";");
while (pos != leader_id_index && end_ptr != NULL) {
start_ptr = end_ptr + 1;
end_ptr = strstr(start_ptr, ";");
pos++;
}
if (start_ptr == NULL || end_ptr == NULL) {
free(query_info);
return leader_node_id;
}
leader_node_id = strtoull(start_ptr, &end_ptr, 10);
free(query_info);
return leader_node_id;
}
bool32 raft_is_primary_alive(knl_session_t *session)
{
return raft_get_primary_id(session) != 0;
}
static void raft_printfunc(int level, char *raft_log, int len)
{
OG_LOG_RAFT(level, "%s, len=%d", raft_log, len);
}
void raft_log_flush_async_head(raft_context_t *raft_ctx, log_file_t *file)
{
errno_t err;
uint32 block_size = CM_CALC_ALIGN(file->ctrl->block_size, sizeof(log_file_head_t));
knl_panic(block_size <= raft_ctx->logwr_head_buf_size);
err = memset_sp(raft_ctx->logwr_head_buf, raft_ctx->logwr_head_buf_size, 0, block_size);
knl_securec_check(err);
err = memcpy_sp(raft_ctx->logwr_head_buf, raft_ctx->logwr_head_buf_size, &file->head, sizeof(log_file_head_t));
knl_securec_check(err);
if (cm_write_device(file->ctrl->type, file->handle, 0, raft_ctx->logwr_head_buf, block_size) != OG_SUCCESS) {
CM_ABORT(0, "[RAFT] ABORT INFO: flush redo file:%s, offset:%u, size:%lu failed.", file->ctrl->name, 0,
sizeof(log_file_head_t));
}
}
static void raft_log_flush_init(knl_session_t *session, log_batch_t *batch)
{
log_context_t *redo_ctx = &session->kernel->redo_ctx;
raft_context_t *raft_ctx = &session->kernel->raft_ctx;
log_file_t *file = redo_ctx->files + redo_ctx->curr_file;
uint32 next;
knl_panic(batch->head.point.asn == file->head.asn || batch->head.point.asn == file->head.asn + 1);
if (file->head.asn != batch->head.point.asn) {
knl_panic(!DB_IS_PRIMARY(&session->kernel->db));
log_flush_head(session, file);
log_get_next_file(session, &next, OG_TRUE);
while (next == redo_ctx->active_file) {
cm_spin_unlock(&raft_ctx->raft_write_disk_lock);
ckpt_trigger(session, OG_FALSE, CKPT_TRIGGER_INC);
OG_LOG_RUN_WAR("RAFT: log free size not enough in raft log flush init, wait for ckpt.");
cm_sleep(100);
cm_spin_lock(&raft_ctx->raft_write_disk_lock, NULL);
log_get_next_file(session, &next, OG_TRUE);
}
(void)log_switch_file(session);
knl_panic(redo_ctx->files[redo_ctx->curr_file].head.asn == batch->head.point.asn);
redo_ctx->stat.space_requests++;
OG_LOG_DEBUG_INF("RAFT: log flush init switch log file.");
}
file = &redo_ctx->files[redo_ctx->curr_file];
knl_panic((uint64)file->ctrl->size >= batch->space_size + batch->head.point.block_id * file->ctrl->block_size);
}
void log_flush_init_for_raft(knl_session_t *session, uint32 batch_size)
{
log_context_t *ogx = &session->kernel->redo_ctx;
log_file_t *file = ogx->files + ogx->curr_file;
if (log_file_freesize(file) < batch_size) {
raft_context_t *raft_ctx = &session->kernel->raft_ctx;
uint32 asn = file->head.asn;
OG_LOG_DEBUG_INF("RAFT: log flush init for raft, flush buffer before switch logfile: "
"flush pos=%d, raft pos=%d, write pos= %d\n",
raft_ctx->logwr_async_buf_flush_pos, raft_ctx->logwr_async_buf_raft_pos,
raft_ctx->logwr_async_buf_write_pos);
raft_wait_for_log_flush(session, raft_ctx->sent_lfn);
log_flush_head(session, file);
(void)log_switch_file(session);
ogx->stat.space_requests++;
file = ogx->files + ogx->curr_file;
knl_panic(asn + 1 == file->head.asn);
}
file = ogx->files + ogx->curr_file;
knl_panic(log_file_freesize(file) >= batch_size);
}
status_t raft_check_log_size(knl_session_t *session)
{
knl_instance_t *kernel = (knl_instance_t *)session->kernel;
log_context_t *ogx = &kernel->redo_ctx;
log_file_t *file = NULL;
ogx->logfile_hwm = MY_LOGFILE_SET(session)->logfile_hwm;
ogx->files = MY_LOGFILE_SET(session)->items;
int64 size = ogx->files[0].ctrl->size;
for (uint32 i = 0; i < ogx->logfile_hwm; i++) {
file = &ogx->files[i];
if (LOG_IS_DROPPED(file->ctrl->flg)) {
continue;
}
if (file->ctrl->size != size) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
status_t raft_load(knl_session_t *session)
{
knl_instance_t *kernel = (knl_instance_t *)session->kernel;
raft_context_t *raft_ctx = NULL;
if (!DB_IS_RAFT_ENABLED(kernel)) {
return OG_SUCCESS;
}
raft_ctx = &kernel->raft_ctx;
raft_ctx->status = RAFT_STATUS_STARTING;
if (raft_check_log_size(session) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_LOG_SIZE_NOT_MATCH);
return OG_ERROR;
}
if (log_check_blocksize(session) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_LOG_BLOCK_NOT_MATCH);
return OG_ERROR;
}
if (log_check_minsize(session) != OG_SUCCESS) {
OG_THROW_ERROR(ERR_LOG_FILE_SIZE_TOO_SMALL, (int64)LOG_MIN_SIZE(session, kernel));
return OG_ERROR;
}
if (RAFT_IS_RESTORE_PRIMARY(kernel)) {
kernel->db.ctrl.core.raft_flush_point.raft_index = RAFT_DEFAULT_INDEX;
kernel->db.ctrl.core.raft_flush_point.scn = OG_INVALID_ID64;
kernel->db.ctrl.core.raft_flush_point.lfn = dtc_my_ctrl(session)->lrp_point.lfn;
if (db_save_core_ctrl(session) != OG_SUCCESS) {
CM_ABORT(0, "[DB] ABORT INFO: save core control file failed reset lrp raft index.");
}
}
raft_ctx->log_block_size = kernel->redo_ctx.files[0].ctrl->block_size;
raft_ctx->raft_flush_point = kernel->db.ctrl.core.raft_flush_point;
raft_ctx->raft_recv_point = kernel->db.ctrl.core.raft_flush_point;
raft_ctx->saved_raft_flush_point = kernel->db.ctrl.core.raft_flush_point;
raft_ctx->old_role = kernel->db.ctrl.core.db_role;
return OG_SUCCESS;
}
static void raft_sendlog_cb(char *buf, int buf_len, unsigned long long lsn, unsigned long long raft_index,
long long errorCode)
{
log_context_t *redo_ctx = &g_raft_session->kernel->redo_ctx;
raft_context_t *raft_ctx = &g_raft_session->kernel->raft_ctx;
log_batch_t *batch_log = NULL;
log_batch_t *new_batch = NULL;
log_batch_tail_t *tail = NULL;
if (raft_ctx->status == RAFT_STATUS_CLOSING) {
return;
}
batch_log = (log_batch_t *)buf;
tail = (log_batch_tail_t *)((char *)batch_log + batch_log->size - sizeof(log_batch_tail_t));
OG_LOG_DEBUG_INF("RAFT: sendlog cb: lsn=%lld, raft_index=%lld, batch len= %d, batch log size=%d, "
"batch log space size=%d, batch head lfn=%lld,"
"batch tail lfn=%lld, curr_point lfn=%lld, recv_point lfn =%lld, flush_point lfn=%lld\n",
lsn, raft_index, buf_len, batch_log->size, batch_log->space_size,
(uint64)batch_log->head.point.lfn, (uint64)tail->point.lfn, (uint64)redo_ctx->curr_point.lfn,
(uint64)raft_ctx->recv_point.lfn, (uint64)raft_ctx->flush_point.lfn);
knl_panic(!DB_IS_PRIMARY(&g_raft_session->kernel->db) || g_raft_session->kernel->db.status != DB_STATUS_OPEN);
knl_panic(lsn == batch_log->head.point.lfn);
knl_panic(batch_log->head.magic_num == LOG_MAGIC_NUMBER);
knl_panic(tail->magic_num == LOG_MAGIC_NUMBER);
knl_panic(batch_log->head.point.lfn == tail->point.lfn);
knl_panic(raft_ctx->recv_point.lfn >= raft_ctx->flush_point.lfn);
knl_panic(raft_ctx->raft_recv_point.raft_index >= raft_ctx->raft_flush_point.raft_index);
if (batch_log->head.point.lfn <= raft_ctx->recv_point.lfn) {
OG_LOG_RUN_WAR("RAFT: sendlog cb, ignore duplicated batch: lsn=%lld, raft_index=%lld, batch len= %d, "
"batch lfn=%lld, recv point=%lld, flush_point=%lld\n",
lsn, raft_index, buf_len, (uint64)batch_log->head.point.lfn,
(uint64)raft_ctx->recv_point.lfn, (uint64)raft_ctx->flush_point.lfn);
return;
}
if (batch_log->head.point.lfn != raft_ctx->recv_point.lfn + 1) {
CM_ABORT(0, "[RAFT] ABORT INFO: sendlog cb: log lost, lsn=%llu, raft_index=%llu, buf_len= %d, "
"batch lfn=%llu, recv point=%llu, flush_point=%llu\n",
lsn, raft_index, buf_len, (uint64)batch_log->head.point.lfn, (uint64)raft_ctx->recv_point.lfn,
(uint64)raft_ctx->flush_point.lfn);
}
if (raft_index < raft_ctx->raft_recv_point.raft_index) {
CM_ABORT(0, "[RAFT] ABORT INFO: sendlog cb: raft not in order, lsn=%llu, raft_index=%llu, buf_len= %d, "
"batch lfn=%llu, recv point lfn=%llu, recv point raft_index=%llu, "
"flush_point lfn=%llu, flush_point raft_index=%llu\n",
lsn, raft_index, buf_len, (uint64)batch_log->head.point.lfn, (uint64)raft_ctx->recv_point.lfn,
(uint64)raft_ctx->raft_recv_point.raft_index, (uint64)raft_ctx->flush_point.lfn,
(uint64)raft_ctx->raft_flush_point.raft_index);
}
batch_log->raft_index = raft_index;
OG_LOG_DEBUG_INF("RAFT: sendlog cb1: flush pos=%d, raft pos=%d, write pos= %d,"
"batch lfn=%lld, batch raft_index=%lld\n",
raft_ctx->logwr_async_buf_flush_pos, raft_ctx->logwr_async_buf_raft_pos,
raft_ctx->logwr_async_buf_write_pos, (uint64)batch_log->head.point.lfn, batch_log->raft_index);
if (raft_write_to_async_buffer_num(g_raft_session, batch_log, &new_batch) != OG_SUCCESS) {
return;
}
cm_spin_lock(&raft_ctx->raft_lock, NULL);
raft_ctx->logwr_async_buf_raft_pos = raft_ctx->logwr_async_buf_write_pos;
raft_ctx->recv_point.rst_id = batch_log->head.point.rst_id;
raft_ctx->recv_point.asn = batch_log->head.point.asn;
raft_ctx->recv_point.block_id = batch_log->head.point.block_id + batch_log->space_size / raft_ctx->log_block_size;
raft_ctx->recv_point.lfn = new_batch->head.point.lfn;
raft_ctx->raft_recv_point.lfn = new_batch->head.point.lfn;
raft_ctx->raft_recv_point.raft_index = raft_index;
raft_ctx->sent_lfn = new_batch->head.point.lfn;
raft_ctx->commit_lfn = new_batch->head.point.lfn;
OG_LOG_DEBUG_INF("RAFT: sendlog cb3: flush pos=%d, raft pos=%d, write pos= %d, "
"batch lfn=%lld, batch raft_index=%lld\n",
raft_ctx->logwr_async_buf_flush_pos, raft_ctx->logwr_async_buf_raft_pos,
raft_ctx->logwr_async_buf_write_pos, (uint64)new_batch->head.point.lfn, new_batch->raft_index);
cm_spin_unlock(&raft_ctx->raft_lock);
(void)cm_release_cond(&raft_ctx->cond);
}
void raft_writelog_cb(unsigned long long lsn, unsigned long long raft_index, long long errorCode)
{
raft_context_t *raft_ctx = &g_raft_session->kernel->raft_ctx;
log_batch_t *batch_log = NULL;
log_batch_tail_t *tail = NULL;
static uint64 prev_lfn = OG_INVALID_ID64;
static uint64 prev_raft_index = OG_INVALID_ID64;
OG_LOG_DEBUG_INF("RAFT: writelog cb lsn: %llu, raft_index: %llu, errorCode: %llu", lsn, raft_index, errorCode);
knl_panic(DB_IS_PRIMARY(&g_raft_session->kernel->db));
knl_panic(lsn != 0);
knl_panic(lsn != OG_INVALID_ID64);
knl_panic(raft_index != 0);
knl_panic(raft_index != OG_INVALID_ID64);
knl_panic(prev_lfn == OG_INVALID_ID64 || prev_lfn + 1 == lsn);
knl_panic(prev_raft_index == OG_INVALID_ID64 ||
prev_raft_index <= raft_index);
knl_panic(raft_ctx->commit_lfn == 0 || raft_ctx->commit_lfn + 1 == lsn);
OG_LOG_DEBUG_INF("RAFT: writelog_cb1: flush pos=%d, raft pos=%d, write pos= %d\n",
raft_ctx->logwr_async_buf_flush_pos, raft_ctx->logwr_async_buf_raft_pos,
raft_ctx->logwr_async_buf_write_pos);
prev_lfn = lsn;
prev_raft_index = raft_index;
raft_ctx->commit_lfn = lsn;
knl_panic(raft_ctx->sent_lfn >= raft_ctx->commit_lfn);
cm_spin_lock(&raft_ctx->raft_lock, NULL);
batch_log = (log_batch_t *)(raft_ctx->logwr_async_buf +
(uint64)raft_ctx->logwr_async_buf_raft_pos * raft_ctx->logwr_async_buf_slot_size);
tail = (log_batch_tail_t *)((char *)batch_log + batch_log->size - sizeof(log_batch_tail_t));
knl_panic(batch_log->head.point.lfn == lsn);
knl_panic(batch_log->head.point.lfn == tail->point.lfn);
knl_panic(batch_log->head.magic_num == LOG_MAGIC_NUMBER);
knl_panic(tail->magic_num == LOG_MAGIC_NUMBER);
knl_panic(batch_log->raft_index == OG_INVALID_ID64);
batch_log->raft_index = raft_index;
raft_ctx->logwr_async_buf_raft_pos = RAFT_ASYNC_LOG_NEXT(raft_ctx, raft_ctx->logwr_async_buf_raft_pos);
raft_ctx->recv_point.rst_id = batch_log->head.point.rst_id;
raft_ctx->recv_point.asn = batch_log->head.point.asn;
raft_ctx->recv_point.block_id = batch_log->head.point.block_id + batch_log->space_size / raft_ctx->log_block_size;
raft_ctx->recv_point.lfn = batch_log->head.point.lfn;
raft_ctx->raft_recv_point.lfn = batch_log->head.point.lfn;
raft_ctx->raft_recv_point.raft_index = raft_index;
OG_LOG_DEBUG_INF("RAFT: writelog cb2: flush pos=%d, raft pos=%d, write pos= %d, batch lfn=%lld, "
"batch raft_index=%lld, send lfn =%lld, commit lfn=%lld\n",
raft_ctx->logwr_async_buf_flush_pos, raft_ctx->logwr_async_buf_raft_pos,
raft_ctx->logwr_async_buf_write_pos, (uint64)batch_log->head.point.lfn,
batch_log->raft_index, raft_ctx->sent_lfn, raft_ctx->commit_lfn);
cm_spin_unlock(&raft_ctx->raft_lock);
(void)cm_release_cond(&raft_ctx->cond);
}
static void raft_db_register_callback(knl_session_t *session)
{
raft_context_t *ogx = &session->kernel->raft_ctx;
char *writelog_cb_func_name = "WriteLogCallbackIn";
char *sendlog_cb_func_name = "SendLogCBFunc";
char *getappliedindex_func_name = "GetAppliedIndex";
char *promote_cb_func_name = "PromoteCBFunc";
char *demote_cb_func_name = "DemoteCBFunc";
char *print_func_name = "PrintFuncName";
char *get_lfn_scn_name = "GetLfnScn";
(void)raft_lib_register(&ogx->raft_proc, print_func_name, (void *)raft_printfunc);
(void)raft_lib_register(&ogx->raft_proc, writelog_cb_func_name, (void *)raft_writelog_cb);
(void)raft_lib_register(&ogx->raft_proc, sendlog_cb_func_name, (void *)raft_sendlog_cb);
(void)raft_lib_register(&ogx->raft_proc, getappliedindex_func_name, (void *)raft_getappliedindex_cb);
(void)raft_lib_register(&ogx->raft_proc, promote_cb_func_name, (void *)raft_promote_cb);
(void)raft_lib_register(&ogx->raft_proc, demote_cb_func_name, (void *)raft_demote_cb);
(void)raft_lib_register(&ogx->raft_proc, get_lfn_scn_name, (void *)raft_get_lfn_scn_cb);
}
static status_t raft_db_init_module(knl_session_t *session, uint64 *last_committed_lfn)
{
knl_attr_t *attr = &session->kernel->attr;
raft_context_t *raft_ctx = &session->kernel->raft_ctx;
status_t status;
errno_t errcode;
if (attr->raft_kudu_dir[0] == '\0') {
errno_t ret;
ret = snprintf_s(attr->raft_kudu_dir, OG_FILE_NAME_BUFFER_SIZE, OG_FILE_NAME_BUFFER_SIZE - 1, "%s/kudu",
session->kernel->home);
knl_securec_check_ss(ret);
}
cm_init_cond(&raft_ctx->cond);
if (raft_load_lib(&raft_ctx->raft_proc) != OG_SUCCESS) {
OG_LOG_RUN_INF("RAFT: failed to load ibconsistency.so");
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "load ibconsistency.so");
return OG_ERROR;
}
if (raft_lib_initDBEngines(&raft_ctx->raft_proc, attr->raft_log_level) != OG_SUCCESS) {
OG_LOG_RUN_INF("RAFT: failed to init db engine");
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "init db engine");
return OG_ERROR;
}
g_raft_session = session->kernel->sessions[SESSION_ID_LOGWR_ASYNC];
raft_db_register_callback(session);
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_PRIORITY_TYPE", (void *)attr->raft_priority_type);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft priority type");
return OG_ERROR;
}
errcode = strncpy_s(raft_ctx->priority_type, OG_FILE_NAME_BUFFER_SIZE,
attr->raft_priority_type, strlen(attr->raft_priority_type));
if (errcode != EOK) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, (errcode));
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_PRIORITY_LEVEL", (void *)attr->raft_priority_level);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft priority level");
return OG_ERROR;
}
errcode = strncpy_s(raft_ctx->priority_level, OG_FILE_NAME_BUFFER_SIZE,
attr->raft_priority_level, strlen(attr->raft_priority_level));
if (errcode != EOK) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, (errcode));
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_LAYOUT_INFO", (void *)attr->raft_layout_info);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft layout info");
return OG_ERROR;
}
errcode = strncpy_s(raft_ctx->layout_info, OG_FILE_NAME_BUFFER_SIZE,
attr->raft_layout_info, strlen(attr->raft_layout_info));
if (errcode != EOK) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, (errcode));
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_PENDING_CMDS_BUFFER_SIZE",
(void *)attr->raft_pending_cmds_buffer_size);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft pending cmds buffer size");
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_SEND_BUFFER_SIZE",
(void *)attr->raft_send_buffer_size);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft send buffer size");
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_RECEIVE_BUFFER_SIZE",
(void *)attr->raft_receive_buffer_size);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft receive buffer size");
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_RAFT_ENTRY_CACHE_MEMORY_SIZE",
(void *)attr->raft_entry_cache_memory_size);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft entry cache memory size");
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_MAX_SIZE_PER_MSG",
(void *)attr->raft_max_size_per_msg);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft max size per msg");
return OG_ERROR;
}
OG_LOG_RUN_INF("RAFT: set raft tls dir : %s", attr->raft_tls_dir);
if (attr->raft_tls_dir[0] != '\0') {
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_TLS_DIR", (void *)attr->raft_tls_dir);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft TLS dir");
return OG_ERROR;
}
}
OG_LOG_RUN_INF("RAFT: set raft token verify : %s", attr->raft_token_verify);
if (attr->raft_token_verify[0] != '\0') {
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_TOKEN_VERIFY", (void *)attr->raft_token_verify);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft token verify");
return OG_ERROR;
}
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_MEMORY_THRESHOLD",
(void *)attr->raft_mem_threshold);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft mem threshold");
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_ELECTION_TIMEOUT",
(void *)attr->raft_election_timeout);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft election timeout");
return OG_ERROR;
}
char raft_failover_lib_timeout[OG_HOST_NAME_BUFFER_SIZE] = {0};
int iret = sprintf_s(raft_failover_lib_timeout, OG_HOST_NAME_BUFFER_SIZE, "%llu",
(unsigned long long)MILLISECS_PER_SECOND * attr->raft_failover_lib_timeout);
if (iret == -1) {
OG_THROW_ERROR(ERR_SYSTEM_CALL, iret);
return OG_ERROR;
}
status = raft_lib_set_param(&raft_ctx->raft_proc, (char *)"RAFT_FAILOVER_LIB_TIMEOUT",
(void *)raft_failover_lib_timeout);
if (status != OG_SUCCESS) {
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "set raft failover lib timeout");
return OG_ERROR;
}
status = raft_lib_init_consistency(&raft_ctx->raft_proc, attr->raft_node_id, attr->raft_peer_ids,
attr->raft_kudu_dir, attr->raft_local_addr, attr->raft_peer_addrs,
last_committed_lfn, attr->raft_start_mode);
if (status != OG_SUCCESS) {
OG_LOG_RUN_INF("RAFT: failed to init raft module");
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "init raft module");
return OG_ERROR;
}
raft_ctx->status = RAFT_STATUS_INITED;
knl_panic(*last_committed_lfn != OG_INVALID_ID64);
OG_LOG_RUN_WAR("RAFT: Init consistency successfully with node id (%d) peer(%s), kudu(%s), "
"addr(%s), peerAddrs(%s), lfn=%lld.",
attr->raft_node_id, attr->raft_peer_ids, attr->raft_kudu_dir,
attr->raft_local_addr, attr->raft_peer_addrs, *last_committed_lfn);
return OG_SUCCESS;
}
void raft_reset_async_buffer(raft_context_t *raft_ctx)
{
raft_ctx->logwr_async_buf_flush_pos = 0;
raft_ctx->logwr_async_buf_raft_pos = 0;
raft_ctx->logwr_async_buf_write_pos = 0;
}
void raft_wait_for_log_flush(knl_session_t *session, uint64 end_lfn)
{
raft_context_t *raft_ctx = &session->kernel->raft_ctx;
while (raft_ctx->raft_flush_point.lfn < end_lfn) {
cm_sleep(1);
if (session->killed) {
OG_THROW_ERROR(ERR_OPERATION_KILLED);
return;
}
}
}
void raft_wait_for_batch_commit_in_raft(knl_session_t *session, uint64 lfn)
{
raft_context_t *raft_ctx = &session->kernel->raft_ctx;
for (;;) {
if (lfn <= raft_ctx->commit_lfn) {
return;
}
if (session->killed) {
OG_THROW_ERROR(ERR_OPERATION_KILLED);
return;
}
(void)cm_wait_cond(&raft_ctx->cond, 3);
}
}
status_t raft_write_to_async_buffer_num(knl_session_t *session, log_batch_t *batch, log_batch_t **new_batch)
{
raft_context_t *ogx = &session->kernel->raft_ctx;
log_batch_tail_t *tail = (log_batch_tail_t *)((char *)batch + batch->size - sizeof(log_batch_tail_t));
OG_LOG_DEBUG_INF("RAFT: write to async buffer 0: flush pos=%d, raft pos=%d, write pos= %d, "
"batch lfn=%lld, batch index=%lld\n",
ogx->logwr_async_buf_flush_pos, ogx->logwr_async_buf_raft_pos,
ogx->logwr_async_buf_write_pos, (uint64)batch->head.point.lfn, batch->raft_index);
batch->space_size = CM_CALC_ALIGN(batch->size, ogx->log_block_size);
if (DB_IS_PRIMARY(&session->kernel->db)) {
log_calc_batch_checksum(session, batch);
}
if (ogx->status == RAFT_STATUS_CLOSING) {
return OG_ERROR;
}
cm_spin_lock(&ogx->raft_lock, NULL);
knl_panic(batch->head.point.lfn != 0 && batch->space_size != 0 && batch->head.point.lfn == tail->point.lfn &&
batch->head.magic_num == LOG_MAGIC_NUMBER && tail->magic_num == LOG_MAGIC_NUMBER);
knl_panic(ogx->logwr_async_buf_slot_size >= batch->space_size);
uint32 left_size = ogx->logwr_async_buf_size -
(uint64)ogx->logwr_async_buf_write_pos * ogx->logwr_async_buf_slot_size;
uint32 copy_size = left_size > SECUREC_MEM_MAX_LEN ? SECUREC_MEM_MAX_LEN : left_size;
errno_t ret = memcpy_s(ogx->logwr_async_buf +
(uint64)ogx->logwr_async_buf_write_pos * ogx->logwr_async_buf_slot_size, copy_size, batch, batch->space_size);
knl_securec_check(ret);
*new_batch = (log_batch_t *)(ogx->logwr_async_buf +
(uint64)ogx->logwr_async_buf_write_pos * ogx->logwr_async_buf_slot_size);
ogx->logwr_async_buf_write_pos = RAFT_ASYNC_LOG_NEXT(ogx, ogx->logwr_async_buf_write_pos);
cm_spin_unlock(&ogx->raft_lock);
OG_LOG_DEBUG_INF("RAFT: write to async buffer1: flush pos=%d, raft pos=%d, write pos= %d, "
"batch lfn=%lld, batch index=%lld\n",
ogx->logwr_async_buf_flush_pos, ogx->logwr_async_buf_raft_pos, ogx->logwr_async_buf_write_pos,
(uint64)(*new_batch)->head.point.lfn, (*new_batch)->raft_index);
if (ogx->logwr_async_buf_write_pos != ogx->logwr_async_buf_flush_pos) {
return OG_SUCCESS;
}
while (ogx->logwr_async_buf_write_pos == ogx->logwr_async_buf_flush_pos) {
(void)cm_wait_cond(&ogx->cond, 1);
if (ogx->status == RAFT_STATUS_CLOSING) {
return OG_ERROR;
}
}
knl_panic(ogx->logwr_async_buf_write_pos != ogx->logwr_async_buf_raft_pos);
return OG_SUCCESS;
}
void raft_async_log_buf_init(knl_session_t *session)
{
knl_instance_t *kernel = session->kernel;
raft_context_t *raft_ctx = &kernel->raft_ctx;
raft_ctx->logwr_async_buf = kernel->attr.lgwr_async_buf;
raft_ctx->logwr_head_buf = kernel->attr.lgwr_head_buf;
raft_ctx->logwr_async_buf_size = (uint32)kernel->attr.lgwr_async_buf_size;
raft_ctx->logwr_async_buf_num = kernel->attr.raft_log_async_buffer_num;
raft_ctx->logwr_async_buf_slot_size = (uint32)kernel->attr.log_buf_size;
raft_ctx->logwr_head_buf_size = (uint32)kernel->attr.lgwr_head_buf_size;
raft_ctx->sent_lfn = 0;
raft_ctx->commit_lfn = 0;
raft_reset_async_buffer(raft_ctx);
}
status_t raft_flush_log(knl_session_t *session, log_batch_t *batch)
{
int64 writeLogRes;
OG_LOG_DEBUG_INF("RAFT: raft flush log : batch len= %d, batch space len=%d, batch lfn=%lld\n",
batch->size, batch->space_size, (uint64)batch->head.point.lfn);
knl_panic(batch->space_size < OG_MIN_RAFT_PER_MSG_SIZE);
if (session->kernel->raft_ctx.status == RAFT_STATUS_CLOSING) {
return OG_ERROR;
}
writeLogRes = raft_lib_exec_writelog_cmd(&session->kernel->raft_ctx.raft_proc,
batch->head.point.lfn, batch, batch->space_size);
if (writeLogRes != OG_SUCCESS) {
OG_LOG_RUN_INF("RAFT: failed to write raft log");
OG_THROW_ERROR(ERR_WRITE_LOG_FAILED, "raft log");
return OG_ERROR;
}
return OG_SUCCESS;
}
static inline void raft_primary_wait_replay(log_context_t *redo_ctx, uint64 kudu_lfn)
{
for (;;) {
if (redo_ctx->curr_point.lfn >= kudu_lfn) {
break;
}
cm_sleep(5);
}
}
static inline void raft_standby_wait_raftlog(raft_context_t *raft_ctx, uint64 kudu_lfn)
{
for (;;) {
if (raft_ctx->flush_point.lfn >= kudu_lfn) {
break;
}
cm_sleep(5);
}
}
status_t raft_db_start_follower(knl_session_t *session, repl_role_t old_role)
{
status_t status;
knl_instance_t *kernel = (knl_instance_t *)session->kernel;
log_context_t *redo_ctx = &kernel->redo_ctx;
lrcv_context_t *lrcv_ctx = &kernel->lrcv_ctx;
raft_context_t *raft_ctx = &kernel->raft_ctx;
lrpl_context_t *lrpl_ctx = &session->kernel->lrpl_ctx;
uint64 lfn = dtc_my_ctrl(session)->rcy_point.lfn;
uint32 file_id;
if (!DB_IS_RAFT_ENABLED(kernel)) {
return OG_SUCCESS;
}
lrcv_ctx->status = LRCV_READY;
for (;;) {
if (lrpl_ctx->curr_point.asn != OG_INVALID_ASN && log_point_equal(&lrpl_ctx->curr_point, redo_ctx)) {
break;
}
if (session->killed) {
OG_LOG_RUN_INF("RAFT: session killed");
OG_THROW_ERROR(ERR_OPERATION_KILLED);
return OG_ERROR;
}
cm_sleep(100);
}
raft_ctx->flush_point = lrpl_ctx->curr_point;
raft_ctx->recv_point = lrpl_ctx->curr_point;
knl_panic(raft_ctx->raft_flush_point.lfn <= raft_ctx->flush_point.lfn);
raft_ctx->raft_flush_point.lfn = raft_ctx->flush_point.lfn;
knl_panic(raft_ctx->raft_recv_point.lfn <= raft_ctx->recv_point.lfn);
raft_ctx->raft_recv_point.lfn = raft_ctx->recv_point.lfn;
file_id = log_get_id_by_asn(session, (uint32)raft_ctx->flush_point.rst_id, raft_ctx->flush_point.asn, NULL);
knl_panic(redo_ctx->curr_file == file_id);
log_unlatch_file(session, file_id);
status = raft_db_init_module(session, &lfn);
if (status != OG_SUCCESS) {
OG_LOG_RUN_INF("RAFT: init db module failed");
return OG_ERROR;
}
if (old_role == REPL_ROLE_PRIMARY) {
raft_primary_wait_replay(redo_ctx, lfn);
} else {
raft_standby_wait_raftlog(raft_ctx, lfn);
}
OG_LOG_RUN_INF("RAFT: complete redo log from kudu finished to lfn(%lld), last role(%d).", lfn, old_role);
return OG_SUCCESS;
}
status_t raft_db_start_leader(knl_session_t *session)
{
if (!DB_IS_RAFT_INITED(session->kernel)) {
OG_LOG_RUN_INF("RAFT: raft is not enabled, or raft module is not inited");
OG_THROW_ERROR(ERR_RAFT_MODULE_NOT_INITED);
return OG_ERROR;
}
uint32 raft_failover_lib_timeout = session->kernel->attr.raft_failover_lib_timeout;
int64 promoteRes = raft_lib_promote_leader(&session->kernel->raft_ctx.raft_proc, 1,
(unsigned long long)MILLISECS_PER_SECOND * raft_failover_lib_timeout);
if (promoteRes != OG_SUCCESS) {
OG_LOG_RUN_INF("RAFT: failed to promote raft leader");
OG_THROW_ERROR(ERR_RAFT_INIT_FAILED, "promote raft leader");
return OG_ERROR;
}
OG_LOG_RUN_WAR("RAFT: start callback to promote leader in database");
return OG_SUCCESS;
}
status_t knl_raft_add_member(knl_handle_t session, uint64 node_id, char *addr, uint64 timeout, uint64 role)
{
int64 status;
knl_session_t *se = (knl_session_t *)session;
CM_POINTER(session);
if (!DB_IS_RAFT_INITED(se->kernel)) {
OG_LOG_RUN_INF("RAFT: raft is not enabled, or raft module is not inited");
OG_THROW_ERROR(ERR_RAFT_MODULE_NOT_INITED);
return OG_ERROR;
}
OG_LOG_RUN_WAR("RAFT: add member, node id=%lld, addr=%s, role=%lld\n", node_id, addr, role);
status = raft_lib_add_member(&se->kernel->raft_ctx.raft_proc, node_id, addr, timeout, role);
if (status != 0) {
OG_LOG_RUN_INF("RAFT: add member failed");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t knl_raft_del_member(knl_handle_t session, uint64 node_id, uint64 timeout)
{
int64 status;
knl_session_t *se = (knl_session_t *)session;
CM_POINTER(session);
if (!DB_IS_RAFT_INITED(se->kernel)) {
OG_LOG_RUN_INF("RAFT: raft is not enabled, or raft module is not inited");
OG_THROW_ERROR(ERR_RAFT_MODULE_NOT_INITED);
return OG_ERROR;
}
OG_LOG_RUN_WAR("RAFT: del member, node id=%lld\n", node_id);
status = raft_lib_delete_member(&se->kernel->raft_ctx.raft_proc, node_id, timeout);
if (status != 0) {
OG_LOG_RUN_INF("RAFT: delete member failed");
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t knl_raft_query_info(knl_handle_t session, char *type, char **query_info)
{
knl_session_t *se = (knl_session_t *)session;
CM_POINTER(session);
if (!DB_IS_RAFT_INITED(se->kernel)) {
OG_LOG_RUN_INF("RAFT: raft is not enabled, or raft module is not inited");
OG_THROW_ERROR(ERR_RAFT_MODULE_NOT_INITED);
return OG_ERROR;
}
*query_info = raft_lib_query_info(&se->kernel->raft_ctx.raft_proc, type);
return OG_SUCCESS;
}
status_t knl_raft_monitor_info(knl_handle_t session, char **monitor_info)
{
knl_session_t *se = (knl_session_t *)session;
CM_POINTER(session);
if (!DB_IS_RAFT_INITED(se->kernel)) {
OG_LOG_RUN_INF("RAFT: raft is not enabled, or raft module is not inited");
OG_THROW_ERROR(ERR_RAFT_MODULE_NOT_INITED);
return OG_ERROR;
}
*monitor_info = raft_lib_monitor(&se->kernel->raft_ctx.raft_proc);
return OG_SUCCESS;
}
status_t knl_raft_version(knl_handle_t session, char **version)
{
knl_session_t *se = (knl_session_t *)session;
CM_POINTER(session);
if (!DB_IS_RAFT_INITED(se->kernel)) {
OG_LOG_RUN_INF("RAFT: raft is not enabled, or raft module is not inited");
OG_THROW_ERROR(ERR_RAFT_MODULE_NOT_INITED);
return OG_ERROR;
}
*version = raft_lib_get_version(&se->kernel->raft_ctx.raft_proc);
return OG_SUCCESS;
}
status_t knl_raft_set_param(knl_handle_t session, char *param_name, void *value)
{
int status;
knl_session_t *se = (knl_session_t *)session;
CM_POINTER(session);
if (!DB_IS_RAFT_INITED(se->kernel)) {
OG_LOG_RUN_INF("RAFT: raft is not enabled, or raft module is not inited");
OG_THROW_ERROR(ERR_RAFT_MODULE_NOT_INITED);
return OG_ERROR;
}
status = raft_lib_set_param(&se->kernel->raft_ctx.raft_proc, param_name, (void *)value);
if (status != 0) {
OG_LOG_RUN_INF("RAFT: set parameter failed");
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t raft_log_flush_to_disk(knl_session_t *session, log_context_t *ogx, log_batch_t *batch)
{
log_file_t *file = &ogx->files[ogx->curr_file];
batch->space_size = CM_CALC_ALIGN(batch->size, file->ctrl->block_size);
int64 offset = (int64)batch->head.point.block_id * file->head.block_size;
if (cm_write_device(file->ctrl->type, file->handle, offset, batch, batch->space_size) != OG_SUCCESS) {
OG_LOG_RUN_ERR("[LOG] failed to read %s", file->ctrl->name);
cm_close_device(file->ctrl->type, &file->handle);
return OG_ERROR;
}
if (!DB_IS_PRIMARY(&session->kernel->db)) {
knl_panic(file->head.write_pos == (uint64)offset);
file->head.write_pos += batch->space_size;
ogx->free_size -= batch->space_size;
file->head.last = batch->scn;
if (file->head.first == OG_INVALID_ID64) {
file->head.first = batch->scn;
log_flush_head(session, file);
}
}
return OG_SUCCESS;
}
void log_async_proc(thread_t *thread)
{
knl_session_t *session = (knl_session_t *)thread->argument;
log_context_t *redo_ctx = &session->kernel->redo_ctx;
raft_context_t *raft_ctx = &session->kernel->raft_ctx;
log_batch_t *batch_log = NULL;
log_batch_tail_t *tail = NULL;
static uint64 prev_lfn = OG_INVALID_ID64;
static uint64 prev_raft_index = OG_INVALID_ID64;
log_point_t last_flush_point;
knl_panic(DB_IS_RAFT_ENABLED(session->kernel));
KNL_SESSION_SET_CURR_THREADID(session, cm_get_current_thread_id());
while (!thread->closed) {
if (DB_NOT_READY(session) || !DB_IS_RAFT_INITED(session->kernel)) {
cm_sleep(200);
continue;
}
if (raft_ctx->logwr_async_buf_flush_pos == raft_ctx->logwr_async_buf_raft_pos) {
(void)cm_wait_cond(&raft_ctx->cond, 2);
continue;
}
cm_spin_lock(&raft_ctx->raft_lock, NULL);
if (raft_ctx->logwr_async_buf_flush_pos == raft_ctx->logwr_async_buf_raft_pos) {
cm_spin_unlock(&raft_ctx->raft_lock);
continue;
}
batch_log = (log_batch_t *)(raft_ctx->logwr_async_buf +
(uint64)raft_ctx->logwr_async_buf_flush_pos * raft_ctx->logwr_async_buf_slot_size);
knl_panic(batch_log->raft_index != OG_INVALID_ID64 && batch_log->head.magic_num == LOG_MAGIC_NUMBER);
last_flush_point = raft_ctx->flush_point;
OG_LOG_DEBUG_INF("RAFT: log async1: flush pos=%d, raft pos=%d, write pos= %d, "
"batch lfn=%lld, batch index=%lld\n",
raft_ctx->logwr_async_buf_flush_pos, raft_ctx->logwr_async_buf_raft_pos,
raft_ctx->logwr_async_buf_write_pos, (uint64)batch_log->head.point.lfn, batch_log->raft_index);
knl_panic(prev_lfn == OG_INVALID_ID64 || prev_lfn + 1 == batch_log->head.point.lfn);
knl_panic(prev_raft_index == OG_INVALID_ID64 || prev_raft_index <= batch_log->raft_index);
prev_lfn = batch_log->head.point.lfn;
prev_raft_index = batch_log->raft_index;
tail = (log_batch_tail_t *)((char *)batch_log + batch_log->size - sizeof(log_batch_tail_t));
knl_panic(rcy_validate_batch(batch_log, tail));
cm_spin_unlock(&raft_ctx->raft_lock);
cm_spin_lock(&raft_ctx->raft_write_disk_lock, NULL);
raft_log_flush_init(session, batch_log);
if (raft_log_flush_to_disk(session, redo_ctx, (log_batch_t *)batch_log) != OG_SUCCESS) {
cm_spin_unlock(&raft_ctx->raft_write_disk_lock);
KNL_SESSION_CLEAR_THREADID(session);
CM_ABORT(0, "[RAFT] ABORT INFO: raft log async proc, log flush to disk failed.");
}
knl_panic(batch_log->head.point.lfn == (uint64)raft_ctx->flush_point.lfn + 1);
knl_panic(batch_log->raft_index >= raft_ctx->raft_flush_point.raft_index);
raft_ctx->flush_point.block_id = batch_log->head.point.block_id +
batch_log->space_size / raft_ctx->log_block_size;
raft_ctx->flush_point.asn = batch_log->head.point.asn;
raft_ctx->flush_point.lfn = batch_log->head.point.lfn;
raft_ctx->flush_point.rst_id = batch_log->head.point.rst_id;
raft_ctx->raft_flush_point.scn = batch_log->scn;
raft_ctx->raft_flush_point.lfn = batch_log->head.point.lfn;
raft_ctx->raft_flush_point.raft_index = batch_log->raft_index;
session->kernel->lfn = batch_log->head.point.lfn;
knl_panic(log_cmp_point(&last_flush_point, &raft_ctx->flush_point) < 0);
cm_spin_unlock(&raft_ctx->raft_write_disk_lock);
raft_ctx->logwr_async_buf_flush_pos = RAFT_ASYNC_LOG_NEXT(raft_ctx, raft_ctx->logwr_async_buf_flush_pos);
OG_LOG_DEBUG_INF("RAFT: log async2: flush pos=%d, raft pos=%d, write pos= %d, batch lfn=%lld,"
"batch index=%lld, flush_point lfn=%lld, flush_point offset=%u\n",
raft_ctx->logwr_async_buf_flush_pos, raft_ctx->logwr_async_buf_raft_pos,
raft_ctx->logwr_async_buf_write_pos, (uint64)batch_log->head.point.lfn,
batch_log->raft_index, (uint64)raft_ctx->flush_point.lfn, raft_ctx->flush_point.block_id);
}
KNL_SESSION_CLEAR_THREADID(session);
}
void raft_stop_consistency(knl_session_t *session)
{
raft_context_t *raft_ctx = &session->kernel->raft_ctx;
raft_lib_stop_consistency(&raft_ctx->raft_proc);
raft_close_lib(&raft_ctx->raft_proc);
cm_destory_cond(&raft_ctx->cond);
}