* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* rep_leader.c
* leader process
*
* IDENTIFICATION
* src/replication/rep_leader.c
*
* -------------------------------------------------------------------------
*/
#include "rep_leader.h"
#include "cm_date.h"
#include "cm_thread.h"
#include "metadata.h"
#include "election.h"
#include "rep_msg_pack.h"
#include "rep_common.h"
#include "replication.h"
#include "cm_timer.h"
#include "util_perf_stat.h"
#include "rep_monitor.h"
#include "cm_num.h"
#include "cm_text.h"
#define APPEND_NORMAL_MODE 0
#define APPEND_REMATCH_MODE 1
#define APPEND_INTERVAL 1000
#define FLAG_EXISTS_ACTIVE_NODE 0x1
#define FLAG_EXISTS_LOG 0x2
#define FLAG_CONTROL_FLOW 0x4
typedef struct st_rep_leader_state_t {
volatile uint64 next_index[CM_MAX_NODE_COUNT];
volatile log_id_t match_index[CM_MAX_NODE_COUNT];
volatile uint8 append_mode[CM_MAX_NODE_COUNT];
volatile uint32 pause_time[CM_MAX_NODE_COUNT];
atomic32_t try_rematch[CM_MAX_NODE_COUNT];
uint64 apply_index[CM_MAX_NODE_COUNT];
uint64 pre_app_time[CM_MAX_NODE_COUNT];
uint32 disk_error;
}rep_leader_state_t;
#define NEXT_INDEX (g_leader_state[stream_id].next_index[node_id])
#define MATCH_INDEX (g_leader_state[stream_id].match_index[node_id])
#define APPEND_MODE (g_leader_state[stream_id].append_mode[node_id])
#define PAUSE_TIME (g_leader_state[stream_id].pause_time[node_id])
#define TRY_REMATCH (g_leader_state[stream_id].try_rematch[node_id])
#define APPLY_INDEX (g_leader_state[stream_id].apply_index[node_id])
#define PRE_APPTIME (g_leader_state[stream_id].pre_app_time[node_id])
#define DISK_ERROR (g_leader_state[stream_id].disk_error)
#define DISK_ERROR_THRESHOLD 10
rep_leader_state_t g_leader_state[CM_MAX_STREAM_COUNT];
static cm_thread_cond_t g_appendlog_cond;
static thread_t g_appendlog_thread[REP_MAX_APPEND_THREAS_NUM];
static uint64 g_append_thread_id[CM_MAX_NODE_COUNT];
static uint32 g_append_thread_num;
static uint32 g_cur_node_id;
thread_t g_leader_monitor_thread;
rep_monitor_statistics_t g_leader_monitor_statistics;
#define LOAD_LEVEL (g_leader_monitor_statistics.load_level)
#define ADJUST_STEP (g_leader_monitor_statistics.adjust_step)
#define HIGH_LEVEL_TIMES (g_leader_monitor_statistics.high_level_times)
#define REP_FC_TIME_UNIT 100
#define REP_FC_INIT_VAL 10
#define REP_FC_SAMP_PERIOD 1
#define REP_FC_CTRL_PERIOD 5
#define REP_FC_MAX_VAL 100
#define FC_MIN_MAX 2
#define REP_FC_CTRL_THD 4000
static volatile uint32 g_rep_flow_ctrl_val = REP_FC_INIT_VAL;
static uint32 g_flow_ctrl_type = FC_NONE;
static void rep_appendlog_thread_entry(thread_t *thread);
static status_t rep_appendlog_ack_proc(mec_message_t *pack);
static void rep_follower_accepted_trigger(uint32 stream_id, uint32 node_id, log_id_t log_id);
static void rep_leader_monitor_entry(thread_t *thread);
static void rep_init_thread_id();
status_t rep_leader_reset(uint32 stream_id);
status_t rep_wait_all_logs_applied(uint32 stream_id);
status_t rep_leader_init()
{
uint32 streams[CM_MAX_STREAM_COUNT];
uint32 stream_count;
param_value_t param_value;
g_cur_node_id = md_get_cur_node();
cm_init_cond(&g_appendlog_cond);
register_msg_process(MEC_CMD_APPEND_LOG_RPC_ACK, rep_appendlog_ack_proc, PRIV_LOW);
if (md_get_param(DCF_PARAM_MEC_BATCH_SIZE, ¶m_value) != CM_SUCCESS) {
LOG_RUN_ERR("rep_leader_init: get batchsize failed.");
return CM_ERROR;
}
if (param_value.batch_size == 0) {
g_flow_ctrl_type = FC_COMMIT_DELAY;
}
LOG_RUN_INF("rep_leader_init: flow_ctrl_type=%u.", g_flow_ctrl_type);
if (md_get_param(DCF_REP_APPEND_THREAD_NUM, ¶m_value) != CM_SUCCESS) {
return CM_ERROR;
}
g_append_thread_num = param_value.rep_append_thread_num;
if (g_append_thread_num <= 0 || g_append_thread_num > REP_MAX_APPEND_THREAS_NUM) {
LOG_RUN_ERR("rep_leader_init failed: invalid param value :REP_APPEND_THREAD_NUM = %u.",
g_append_thread_num);
return CM_ERROR;
}
rep_init_thread_id();
CM_RETURN_IFERR(md_get_stream_list(streams, &stream_count));
for (uint32 i = 0; i < stream_count; i++) {
uint32 stream_id = streams[i];
uint32 node_num = 0;
CM_RETURN_IFERR(md_get_stream_nodes_count(stream_id, &node_num));
if (node_num == 1) {
CM_RETURN_IFERR(rep_leader_reset(stream_id));
}
}
CM_RETURN_IFERR(rep_monitor_init());
CM_RETURN_IFERR(cm_create_thread(rep_leader_monitor_entry, 0, NULL, &g_leader_monitor_thread));
for (uint64 i = 0; i < g_append_thread_num; i++) {
CM_RETURN_IFERR(cm_create_thread(rep_appendlog_thread_entry, 0,
(void*)i, &g_appendlog_thread[i]));
}
for (uint32 i = 0; i < stream_count; i++) {
uint32 stream_id = streams[i];
if (I_AM_LEADER(stream_id)) {
CM_RETURN_IFERR(rep_wait_all_logs_applied(stream_id));
rep_set_can_write_flag(stream_id, CM_TRUE);
}
}
LOG_RUN_INF("rep_leader_init finished");
return CM_SUCCESS;
}
void rep_leader_deinit()
{
for (uint64 i = 0; i < g_append_thread_num; i++) {
cm_close_thread(&g_appendlog_thread[i]);
}
cm_close_thread(&g_leader_monitor_thread);
LOG_RUN_INF("rep_leader_deinit finished");
}
status_t rep_wait_node_log_catchup(uint32 stream_id, uint32 node_id)
{
uint64 leader_last_index = rep_get_last_index(stream_id);
uint64 node_last_index = rep_leader_get_match_index(stream_id, node_id).index;
uint64 node_old_last_index = 0;
timespec_t begin = cm_clock_now();
timespec_t last = cm_clock_now();
while (node_last_index != leader_last_index) {
if ((cm_clock_now() - last) > MICROSECS_PER_SECOND * CM_10X_FIXED) {
LOG_RUN_INF("[REP]already wait for %lld seconds,leader_last_index=%llu,node_last_index=%llu",
(cm_clock_now() - begin) / MICROSECS_PER_SECOND, leader_last_index, node_last_index);
last = cm_clock_now();
if (node_last_index == node_old_last_index) {
LOG_RUN_WAR("[REP]wait_node_log_catchup failed, node=%u,leader_last_index=%llu,"
"node_last_index=%llu,node_old_last_index=%llu",
node_id, leader_last_index, node_last_index, node_old_last_index);
return CM_ERROR;
}
node_old_last_index = node_last_index;
}
cm_sleep(CM_SLEEP_1_FIXED);
if (!I_AM_LEADER(stream_id)) {
LOG_RUN_INF("[REP]wait_node_log_catchup:I'm not leader now.");
return CM_ERROR;
}
if (elc_is_notify_thread_closed() == CM_TRUE) {
LOG_RUN_INF("[REP]wait_node_log_catchup:status_notify_thread closed, stop now.");
return CM_ERROR;
}
leader_last_index = rep_get_last_index(stream_id);
node_last_index = rep_leader_get_match_index(stream_id, node_id).index;
}
LOG_DEBUG_INF("[REP]wait_node_log_catchup OK. leader_last_index=%llu, node_last_index=%llu",
leader_last_index, node_last_index);
return CM_SUCCESS;
}
status_t rep_wait_all_logs_applied(uint32 stream_id)
{
uint64 last_index = rep_get_last_index(stream_id);
uint64 applied_index = stg_get_applied_index(stream_id);
timespec_t begin = cm_clock_now();
timespec_t last = cm_clock_now();
while (last_index != applied_index) {
if ((cm_clock_now() - last) > MICROSECS_PER_SECOND) {
LOG_RUN_INF("[REP]already wait for %lld seconds,last_index=%llu,applied_index=%llu",
(cm_clock_now() - begin) / MICROSECS_PER_SECOND, last_index, applied_index);
last = cm_clock_now();
}
cm_sleep(CM_SLEEP_1_FIXED);
if (!I_AM_LEADER(stream_id)) {
LOG_RUN_INF("[REP]wait_all_logs_applied:I'm not leader now.");
return CM_ERROR;
}
if (elc_is_notify_thread_closed() == CM_TRUE) {
LOG_RUN_INF("[REP]wait_all_logs_applied:status_notify_thread closed, stop now.");
return CM_ERROR;
}
last_index = rep_get_last_index(stream_id);
applied_index = stg_get_applied_index(stream_id);
}
LOG_DEBUG_INF("[REP]wait_all_logs_applied OK. last_index=%llu, applied_index=%llu", last_index, applied_index);
return CM_SUCCESS;
}
status_t rep_leader_reset(uint32 stream_id)
{
uint32 nodes[CM_MAX_NODE_COUNT];
uint32 count;
CM_RETURN_IFERR(md_get_stream_nodes(stream_id, nodes, &count));
log_id_t last_log = stg_last_log_id(stream_id);
for (uint32 i = 0; i < count; i++) {
uint32 node_id = nodes[i];
NEXT_INDEX = last_log.index;
if (node_id == g_cur_node_id) {
MATCH_INDEX = stg_last_disk_log_id(stream_id);
} else {
log_id_t* invalid_log_id = get_invalid_log_id();
MATCH_INDEX = *invalid_log_id;
}
APPEND_MODE = APPEND_NORMAL_MODE;
TRY_REMATCH = CM_FALSE;
PRE_APPTIME = 0;
PAUSE_TIME = 0;
LOG_DEBUG_INF("[REP]rep_leader_reset:node_id=%u,next_index=%llu", node_id,
NEXT_INDEX);
}
if (I_AM_LEADER(stream_id)) {
1. try commit previous term's log
2. ensure configurations on all nodes are consistent */
CM_RETURN_IFERR(md_set_status(META_CATCH_UP));
uint32 size;
char *md_buf = (char *)malloc(CM_METADATA_DEF_MAX_LEN);
if (md_buf == NULL) {
LOG_DEBUG_ERR("rep_leader_reset malloc failed");
CM_RETURN_IFERR(md_set_status(META_NORMAL));
return CM_ERROR;
}
if (md_to_string(md_buf, CM_METADATA_DEF_MAX_LEN, &size) != CM_SUCCESS) {
CM_FREE_PTR(md_buf);
CM_RETURN_IFERR(md_set_status(META_NORMAL));
return CM_ERROR;
}
if (rep_write(stream_id, md_buf, size, CFG_LOG_KEY(CM_INVALID_NODE_ID, OP_FLAG_NONE),
ENTRY_TYPE_CONF, NULL) != CM_SUCCESS) {
CM_FREE_PTR(md_buf);
CM_RETURN_IFERR(md_set_status(META_NORMAL));
return CM_ERROR;
}
CM_FREE_PTR(md_buf);
CM_RETURN_IFERR(md_set_status(META_NORMAL));
} else {
LOG_RUN_WAR("rep_leader_reset:I'm not a leader now!");
}
LOG_RUN_INF("rep_leader_reset finished");
return CM_SUCCESS;
}
static inline void rep_init_appendlog_head(uint32 stream_id, rep_apendlog_req_t* appendlog_req,
uint64 pre_log_index, uint64 last_log_index)
{
appendlog_req->head.req_seq = g_timer()->now;
appendlog_req->head.ack_seq = 0;
appendlog_req->head.trace_key = get_trace_key();
appendlog_req->head.msg_ver = REP_MSG_VER;
appendlog_req->leader_node_id = g_cur_node_id;
appendlog_req->leader_term = elc_get_current_term(stream_id);
appendlog_req->leader_commit_log = rep_get_commit_log(stream_id);
appendlog_req->leader_first_log.index = stg_first_index(stream_id);
appendlog_req->leader_first_log.term = stg_get_term(stream_id, appendlog_req->leader_first_log.index);
appendlog_req->pre_log.index = pre_log_index;
appendlog_req->pre_log.term = stg_get_term(stream_id, pre_log_index);
appendlog_req->leader_last_index = last_log_index;
appendlog_req->cluster_min_apply_id = rep_get_cluster_min_apply_idx(stream_id);
appendlog_req->log_count = 0;
}
static uint64 rep_calu_log_count_by_control(dcf_role_t default_role, uint64 log_count)
{
if (default_role != DCF_ROLE_PASSIVE) {
return log_count;
}
LOG_DEBUG_INF("[REP]before control count: %llu", log_count);
if (log_count == 0) {
return log_count;
}
log_count = (uint64)(log_count * ADJUST_STEP);
LOG_DEBUG_INF("[REP]flow control count: %llu, load level: %d, step: %f, high times: %u", log_count, LOAD_LEVEL,
ADJUST_STEP, HIGH_LEVEL_TIMES);
if (log_count == 0) {
return 1;
}
return log_count;
}
static uint64 rep_calu_log_count(uint32 stream_id, uint32 node_id, dcf_role_t default_role, uint64 log_begin,
uint64 log_end)
{
uint64 log_count;
if (log_end == CM_INVALID_INDEX_ID) {
return 0;
}
if (log_end < log_begin) {
return 0;
}
if (log_begin == CM_INVALID_INDEX_ID) {
log_count = log_end;
} else {
log_count = (log_end - log_begin) + 1;
}
if (APPEND_MODE != APPEND_NORMAL_MODE) {
log_count = MIN(log_count, 1);
}
return rep_calu_log_count_by_control(default_role, log_count);
}
#define MEC_AND_REP_HEAD_SIZE (sizeof(mec_message_head_t) + sizeof(rep_apendlog_req_t))
CM_STATIC_ASSERT(MEC_BUFFER_RESV_SIZE >= (PADDING_BUFFER_SIZE + MEC_AND_REP_HEAD_SIZE));
CM_STATIC_ASSERT(MEC_BUFFER_RESV_SIZE < (PADDING_BUFFER_SIZE + MEC_AND_REP_HEAD_SIZE + SIZE_K(1)));
static status_t rep_appendlog_node(uint32 stream_id, uint32 node_id, dcf_role_t default_role, uint64 last_log_index,
bool8* node_exists_log)
{
uint64 old_next_index = (uint64)cm_atomic_get((atomic_t*)&NEXT_INDEX);
uint64 log_begin = old_next_index == CM_INVALID_INDEX_ID ? 1 : old_next_index;
log_begin = MAX(log_begin, stg_first_index(stream_id));
uint64 log_count = rep_calu_log_count(stream_id, node_id, default_role, log_begin, last_log_index);
*node_exists_log = (log_count > 0);
Periodically sending empty logs ensures that lost packets are retransmitted */
mec_message_t pack;
CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_APPEND_LOG_RPC_REQ, g_cur_node_id, node_id, stream_id));
uint64 pre_log_index = log_begin == CM_INVALID_INDEX_ID ? CM_INVALID_INDEX_ID : log_begin - 1;
rep_apendlog_req_t appendlog_req;
rep_init_appendlog_head(stream_id, &appendlog_req, pre_log_index, last_log_index);
CM_RETURN_IFERR_EX(rep_encode_appendlog_head(&pack, &appendlog_req), mec_release_pack(&pack));
uint32 log_count_pos = mec_get_write_pos(&pack) - sizeof(uint64);
uint64 j = 0;
uint32 total_size = 0;
for (uint64 index = log_begin; j < log_count; index++, j++) {
log_entry_t* entry = stg_get_entry(stream_id, index);
if (entry == NULL) {
break;
}
total_size += (sizeof(rep_log_t) + ENTRY_SIZE(entry));
if (total_size > MESSAGE_BUFFER_SIZE && j > 0) {
LOG_DEBUG_INF("[REP]total_size[%u] is enough, send size[%u]. log_count[%llu], j[%llu]",
total_size, (uint32)(total_size - (sizeof(rep_log_t) + ENTRY_SIZE(entry))), log_count, j);
stg_entry_dec_ref(entry);
break;
}
status_t ret = rep_encode_one_log(&pack, log_count_pos, j + 1, entry);
stg_entry_dec_ref(entry);
if (ret != CM_SUCCESS) {
mec_release_pack(&pack);
LOG_DEBUG_ERR("[REP]encode_one_log fail, index=%llu, j=%llu", index, j);
return CM_ERROR;
}
ps_record1(PS_PACK, index);
}
appendlog_req.log_count = j;
CM_RETURN_IFERR_EX(mec_send_data(&pack), mec_release_pack(&pack));
LOG_DEBUG_INF("[REP]rep send succeed: " REP_APPEND_REQ_FMT, REP_APPEND_REQ_VAL(&pack, &appendlog_req, log_begin));
if (APPEND_MODE == APPEND_NORMAL_MODE) {
(void)cm_atomic_cas((atomic_t*)&NEXT_INDEX, old_next_index, log_begin + j);
LOG_DEBUG_INF("[REP]set next_index to %llu,stream_id=%u,node_id=%u", NEXT_INDEX, stream_id, node_id);
}
mec_release_pack(&pack);
return CM_SUCCESS;
}
static bool32 can_append_log(uint32 stream_id, uint64 last_index, uint32 node_id, dcf_role_t default_role)
{
if (default_role == DCF_ROLE_PASSIVE && LOAD_LEVEL == DCF_LOAD_HIGH_LEVEL &&
(g_timer()->now - PRE_APPTIME) < HIGH_LEVEL_SUSPEND_TIME) {
return CM_FALSE;
}
if ((g_timer()->now - PRE_APPTIME) <= PAUSE_TIME) {
return CM_FALSE;
}
if (g_flow_ctrl_type != FC_NONE) {
if ((g_timer()->now - PRE_APPTIME) < (g_rep_flow_ctrl_val * REP_FC_TIME_UNIT)) {
return CM_FALSE;
}
}
if ((APPEND_MODE == APPEND_NORMAL_MODE && last_index >= NEXT_INDEX) ||
(APPEND_MODE == APPEND_REMATCH_MODE && cm_atomic32_cas(&TRY_REMATCH, 1, 0)) ||
(g_timer()->now - PRE_APPTIME) > APPEND_INTERVAL*MICROSECS_PER_MILLISEC) {
return CM_TRUE;
}
return CM_FALSE;
}
static status_t rep_appendlog_stream(uint64 thread_id, uint32 stream_id, uint32* stream_flag)
{
dcf_node_role_t nodes[CM_MAX_NODE_COUNT];
uint32 node_count;
uint64 last_index = stg_last_index(stream_id);
*stream_flag = 0;
CM_RETURN_IFERR(md_get_stream_node_roles(stream_id, nodes, &node_count));
for (uint32 i = 0; i < node_count; i++) {
uint32 node_id = nodes[i].node_id;
dcf_role_t default_role = nodes[i].default_role;
if (node_id == g_cur_node_id) {
continue;
}
if (thread_id != g_append_thread_id[node_id]) {
continue;
}
if (!mec_is_ready(stream_id, node_id, PRIV_LOW)) {
LOG_DEBUG_ERR_EX("[REP]stream_id%u, node_id%u's connection is not ready", stream_id, node_id);
continue;
}
*stream_flag |= FLAG_EXISTS_ACTIVE_NODE;
if (!can_append_log(stream_id, last_index, node_id, default_role)) {
*stream_flag |= FLAG_CONTROL_FLOW;
continue;
}
PRE_APPTIME = g_timer()->now;
bool8 node_exists_log = CM_FALSE;
if (rep_appendlog_node(stream_id, node_id, default_role, last_index, &node_exists_log) != CM_SUCCESS) {
LOG_DEBUG_ERR("[REP]rep_appendlog_to_node failed:stream_id=%u,node_id=%u.", stream_id, node_id);
continue;
}
if (node_exists_log) {
*stream_flag |= FLAG_EXISTS_LOG;
}
}
return CM_SUCCESS;
}
static void rep_appendlog_thread_entry(thread_t *thread)
{
uint32 streams[CM_MAX_STREAM_COUNT];
uint32 stream_count;
uint64 thread_id = (uint64)thread->argument;
uint32 rep_flag = 0;
if (cm_set_thread_name("rep_appendlog") != CM_SUCCESS) {
LOG_DEBUG_ERR("[REP]set apply thread name failed!");
}
if (md_get_stream_list(streams, &stream_count) != CM_SUCCESS) {
LOG_DEBUG_ERR("[REP]md_get_stream_list failed");
return;
}
while (!thread->closed) {
rep_flag = 0;
for (uint32 i = 0; i < stream_count; i++) {
uint32 stream_id = streams[i];
if (!I_AM_LEADER(stream_id)) {
continue;
}
uint32 stream_flag = 0;
if (rep_appendlog_stream(thread_id, stream_id, &stream_flag) != CM_SUCCESS) {
LOG_DEBUG_ERR("[REP]rep_appendlog failed.");
continue;
}
rep_flag |= stream_flag;
}
if (!(rep_flag & FLAG_EXISTS_ACTIVE_NODE)) {
LOG_DEBUG_INF("[REP]not exists active node.");
cm_sleep(CM_SLEEP_1000_FIXED);
continue;
}
if (rep_flag & FLAG_CONTROL_FLOW) {
(void)cm_wait_cond(&g_appendlog_cond, CM_SLEEP_1_FIXED);
continue;
}
if (!(rep_flag & FLAG_EXISTS_LOG)) {
(void)cm_wait_cond(&g_appendlog_cond, CM_SLEEP_500_FIXED);
}
}
}
void rep_flow_ctrl_sampling_and_calc()
{
uint64 commit_count, commit_total, commit_max, avg_delay;
static uint64 total_delay = 0;
static uint64 last_avg_delay = UINT64_MAX;
static uint64 max_delay = 0;
static uint64 min_delay = UINT64_MAX;
int32 delta;
static int32 ctrl = REP_FC_INIT_VAL;
static int32 direction = 1;
static uint64 count = 0;
uint64 cur_delay = 0;
static timespec_t last = 0;
if (cm_clock_now() - last >= REP_FC_SAMP_PERIOD * MICROSECS_PER_SECOND) {
last = cm_clock_now();
ps_get_stat(PS_COMMIT, &commit_count, &commit_total, &commit_max);
if (commit_count != 0) {
cur_delay = commit_total / commit_count;
total_delay += cur_delay;
max_delay = MAX(max_delay, cur_delay);
min_delay = MIN(min_delay, cur_delay);
count++;
if (count % REP_FC_CTRL_PERIOD == 0) {
avg_delay = (total_delay - (max_delay + min_delay)) / (REP_FC_CTRL_PERIOD - FC_MIN_MAX);
delta = MAX(ctrl / CM_10X_FIXED, 1);
if (avg_delay > last_avg_delay) {
direction = 0 - direction;
} else if (avg_delay == last_avg_delay) {
delta = 0;
}
last_avg_delay = avg_delay;
ctrl = (avg_delay < REP_FC_CTRL_THD) ? (ctrl / CM_2X_FIXED) : MAX(ctrl + delta * direction, 1);
ctrl = MAX(ctrl, 0);
ctrl = MIN(ctrl, REP_FC_MAX_VAL);
g_rep_flow_ctrl_val = (uint32)ctrl;
total_delay = 0;
max_delay = 0;
min_delay = UINT64_MAX;
}
}
LOG_PROFILE("commit_cnt=%llu, cur_lat=%llu, flow_ctrl_val=%u", commit_count, cur_delay, g_rep_flow_ctrl_val);
}
}
static void rep_leader_monitor_entry(thread_t *thread)
{
if (cm_set_thread_name("rep_leader_monitor") != CM_SUCCESS) {
LOG_DEBUG_ERR("[REP]set monitor thread name failed!");
}
LOG_RUN_INF("leader monitor thread start.");
while (!thread->closed) {
if (g_flow_ctrl_type == FC_NONE) {
cm_sleep(CM_SLEEP_1000_FIXED);
continue;
}
rep_flow_ctrl_sampling_and_calc();
(void)rep_monitor_statistics(&g_leader_monitor_statistics);
}
LOG_RUN_INF("leader monitor thread end.");
}
static status_t rep_adjust_majority_groups(uint32 majority_groups[CM_MAX_GROUP_COUNT],
uint32 *count, dcf_node_attr_t *sort_index, uint32 voted_cnt)
{
CM_RETURN_IFERR(md_get_majority_groups(majority_groups, count));
if (*count == 0) {
return CM_SUCCESS;
}
uint32 total = *count;
uint32 idx = 0;
for (uint32 i = 0; i < total; i++) {
uint32 group_value = majority_groups[i];
bool32 exists = CM_FALSE;
for (uint32 j = 0 ; j < voted_cnt; j++) {
if (sort_index[j].group == group_value) {
exists = CM_TRUE;
break;
}
}
if (!exists) {
majority_groups[i] = CM_INVALID_GROUP_ID;
(*count)--;
} else {
majority_groups[idx] = majority_groups[i];
idx++;
}
}
return CM_SUCCESS;
}
static void rep_cal_voted_group_count(uint32 majority_groups[CM_MAX_GROUP_COUNT],
uint32 count, uint32 sort_index_group, uint32 *voted_group_count)
{
for (uint32 j = 0; j < count; j++) {
if (majority_groups[j] == sort_index_group && majority_groups[j] != CM_INVALID_GROUP_ID) {
(*voted_group_count)++;
majority_groups[j] = CM_INVALID_GROUP_ID;
break;
}
}
return;
}
static int rep_index_compare(const void *a, const void *b)
{
if (((dcf_node_attr_t *)a)->index == ((dcf_node_attr_t *)b)->index) {
return 0;
} else if (((dcf_node_attr_t *)a)->index > ((dcf_node_attr_t *)b)->index) {
return -1;
} else {
return 1;
}
}
static status_t rep_cal_commit_index(dcf_node_attr_t *sort_index, uint32 voted_cnt, uint32 stream_id, uint32 quorum,
uint64 *commit_idx)
{
uint32 all_votes = 0;
uint32 majority_groups[CM_MAX_GROUP_COUNT];
uint32 majority_groups_count = 0;
uint32 voted_group_count = 0;
bool32 is_group_voted = CM_TRUE;
for (uint32 i = 0; i < CM_MAX_GROUP_COUNT; i++) {
majority_groups[i] = CM_INVALID_GROUP_ID;
}
qsort(sort_index, voted_cnt, sizeof(dcf_node_attr_t), rep_index_compare);
if (rep_adjust_majority_groups(majority_groups, &majority_groups_count, sort_index, voted_cnt) == CM_SUCCESS) {
is_group_voted = majority_groups_count == 0 ? CM_TRUE : CM_FALSE;
}
for (uint32 i = 0; i < voted_cnt; i++) {
all_votes += sort_index[i].weight;
if (!is_group_voted) {
rep_cal_voted_group_count(majority_groups, majority_groups_count, sort_index[i].group, &voted_group_count);
is_group_voted = voted_group_count >= majority_groups_count ? CM_TRUE: CM_FALSE;
}
if (all_votes >= quorum && is_group_voted) {
*commit_idx = sort_index[i].index;
return CM_SUCCESS;
}
}
if (all_votes < quorum) {
LOG_RUN_ERR("[REP]rep_cal_commit_index: all_votes %u is less than quorum %u", all_votes, quorum);
} else if (!is_group_voted) {
LOG_RUN_ERR("[REP]rep_cal_commit_index: not all group configed in majority groups was voted,"
"voted group count:%u, majority groups count:%u", voted_group_count, majority_groups_count);
for (uint32 k = 0; k < majority_groups_count; k++) {
if (majority_groups[k] != CM_INVALID_GROUP_ID) {
LOG_RUN_ERR("[REP]rep_cal_commit_index, group: %u", majority_groups[k]);
}
}
}
return CM_ERROR;
}
static status_t rep_try_commit_log(uint32 stream_id)
{
uint32 node_count, quorum, weight;
uint32 vote_count = 0;
uint32 voted_node = 0;
bool32 is_elc_voter;
uint64 commit_index;
uint32 nodes[CM_MAX_NODE_COUNT];
dcf_node_attr_t sort_index[CM_MAX_NODE_COUNT];
uint64 min_apply_id = CM_INVALID_ID64;
dcf_node_t node_item;
CM_RETURN_IFERR(elc_get_quorum(stream_id, &quorum));
CM_RETURN_IFERR(md_get_stream_nodes(stream_id, nodes, &node_count));
for (uint32 i = 0; i < node_count; i++) {
uint32 node_id = nodes[i];
CM_RETURN_IFERR(md_get_node(node_id, &node_item));
CM_RETURN_IFERR(elc_is_voter(stream_id, node_id, &is_elc_voter));
if (is_elc_voter) {
CM_RETURN_IFERR(elc_node_voting_weight(stream_id, node_id, &weight));
uint64 index = MATCH_INDEX.index;
sort_index[voted_node].index = index;
sort_index[voted_node].weight = weight;
sort_index[voted_node].group = node_item.group;
vote_count += weight;
++voted_node;
LOG_DEBUG_INF("[REP]rep_try_commit_log:node_id=%u,match_index=%llu,weight=%u.", node_id, index, weight);
}
min_apply_id = MIN(min_apply_id, APPLY_INDEX);
}
rep_set_cluster_min_apply_idx(stream_id, min_apply_id);
CM_RETURN_IFERR(rep_cal_commit_index(sort_index, voted_node, stream_id, quorum, &commit_index));
uint64 log_term = stg_get_term(stream_id, commit_index);
uint64 cur_term = elc_get_current_term(stream_id);
LOG_DEBUG_INF("[REP]rep_cal_commit_idx:quorum=%u,try commit_idx=%llu,log_term=%llu,cur_term=%llu.",
quorum, commit_index, log_term, cur_term);
if (log_term == cur_term) {
log_id_t last = rep_get_commit_log(stream_id);
if (last.index != commit_index) {
if (commit_index <= last.index) {
LOG_DEBUG_WAR("[REP]current commit_index(%llu) is not larger than last.index(%llu), work_mode=%d",
commit_index, last.index, elc_get_work_mode(stream_id));
}
rep_set_commit_log(stream_id, log_term, commit_index);
rep_apply_trigger();
LOG_DEBUG_INF("[REP]leader set commit index to (%llu,%llu)", log_term, commit_index);
}
} else {
LOG_DEBUG_INF("[REP]index term is not current term,can't be committed.index=%llu,"
"log_term=%llu,current term = %llu", commit_index, log_term, cur_term);
}
return CM_SUCCESS;
}
status_t rep_leader_acceptlog_proc(uint32 stream_id)
{
LOG_TRACE(rep_get_tracekey(), "accept:rep_leader_acceptlog_proc rep_try_commit_log.");
LOG_DEBUG_INF("rep_leader_acceptlog_proc.");
uint32 node_id = g_cur_node_id;
APPLY_INDEX = stg_get_applied_index(stream_id);
CM_RETURN_IFERR(rep_try_commit_log(stream_id));
return CM_SUCCESS;
}
static void rep_rematch_proc(uint32 stream_id, uint32 node_id, const rep_apendlog_ack_t* ack)
{
APPEND_MODE = APPEND_REMATCH_MODE;
log_id_t next_log = rep_get_pre_term_log(stream_id, ack->pre_log.index);
LOG_DEBUG_INF("[REP] pre_log(%llu,%llu),mismatch_log(%llu,%llu),next_log(%llu,%llu)",
ack->pre_log.term, ack->pre_log.index,
ack->mismatch_log.term, ack->mismatch_log.index,
next_log.term, next_log.index);
if (next_log.index < ack->mismatch_log.index) {
next_log = ack->mismatch_log;
}
if (NEXT_INDEX > next_log.index) {
(void)cm_atomic_set((atomic_t*)&NEXT_INDEX, ack->mismatch_log.index);
LOG_DEBUG_INF("[REP]pre log is mismatch,reset next index to:%llu,stream_id=%u,node_id=%u",
NEXT_INDEX, stream_id, node_id);
} else {
LOG_DEBUG_INF("[REP]pre log is mismatch,next index:%llu,mismatch(%llu,%llu)",
NEXT_INDEX, ack->mismatch_log.term, ack->mismatch_log.index);
}
(void)cm_atomic32_cas(&TRY_REMATCH, 0, 1);
rep_appendlog_trigger(stream_id);
}
static status_t rep_check_appendlog_ack(uint32 stream_id, uint32 node_id, rep_apendlog_ack_t* ack)
{
uint64 cur_term = elc_get_current_term(stream_id);
if (ack->follower_term > cur_term) {
(void)elc_judge_term(stream_id, ack->follower_term);
LOG_DEBUG_INF("[REP]follower's term is greater than mine.[%llu > %llu]", ack->follower_term, cur_term);
return CM_ERROR;
}
if (ack->ret_code == ERR_TERM_IS_EXPIRED) {
(void)elc_judge_term(stream_id, ack->follower_term);
LOG_DEBUG_INF("[REP]follower's term is greater than mine.[%llu,%llu]", ack->follower_term, cur_term);
return CM_ERROR;
} else if (ack->ret_code == ERR_APPEN_LOG_REQ_LOST) {
LOG_DEBUG_INF("[REP]append log may be lost.reset next index from %llu to %llu,node_id=%u.",
NEXT_INDEX, MATCH_INDEX.index + 1, node_id);
NEXT_INDEX = MATCH_INDEX.index + 1;
} else if (ack->ret_code == ERR_TERM_IS_NOT_MATCH) {
rep_rematch_proc(stream_id, node_id, ack);
return CM_ERROR;
} else if (ack->ret_code != 0) {
LOG_DEBUG_INF("[REP]follower process failed.ret_code=%d", ack->ret_code);
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t rep_appendlog_ack_proc(mec_message_t *pack)
{
uint32 stream_id = pack->head->stream_id;
uint32 node_id = pack->head->src_inst;
rep_apendlog_ack_t ack;
if (rep_decode_appendlog_ack(pack, &ack) != CM_SUCCESS) {
LOG_DEBUG_ERR("[REP]rep_decode_appendlog_ack failed.");
return CM_ERROR;
}
LOG_DEBUG_INF("[REP]recv ack." REP_APPEND_ACK_FMT, REP_APPEND_ACK_VAL(pack, &ack));
if (ack.follower_accept_log.index != CM_INVALID_INDEX_ID) {
ps_record1(PS_FOLLOWER_ACCEPT, ack.follower_accept_log.index);
}
CM_RETURN_IFERR(rep_check_appendlog_ack(stream_id, node_id, &ack));
APPLY_INDEX = ack.apply_id;
if (ack.follower_accept_log.index != CM_INVALID_INDEX_ID ||
ack.follower_accept_log.term != CM_INVALID_TERM_ID) {
uint64 my_term = stg_get_term(stream_id, ack.follower_accept_log.index);
if (my_term == ack.follower_accept_log.term) {
if (APPEND_MODE == APPEND_REMATCH_MODE) {
APPEND_MODE = APPEND_NORMAL_MODE;
(void)cm_atomic32_cas(&TRY_REMATCH, 1, 0);
NEXT_INDEX = ack.follower_accept_log.index;
}
rep_follower_accepted_trigger(stream_id, pack->head->src_inst, ack.follower_accept_log);
LOG_DEBUG_INF("[REP]follower process succeed,next_index=%llu,set match_index=(%llu,%llu)",
NEXT_INDEX, MATCH_INDEX.term, MATCH_INDEX.index);
} else {
if (APPEND_MODE == APPEND_REMATCH_MODE) {
APPEND_MODE = APPEND_NORMAL_MODE;
(void)cm_atomic32_cas(&TRY_REMATCH, 1, 0);
NEXT_INDEX = NEXT_INDEX + 1;
}
}
log_id_t last_log = stg_last_log_id(stream_id);
if (last_log.index >= NEXT_INDEX) {
rep_appendlog_trigger(stream_id);
}
}
(void)elc_set_hb_ack_timeout(stream_id, node_id, cm_clock_now());
return CM_SUCCESS;
}
void rep_appendlog_trigger(uint32 stream_id)
{
LOG_DEBUG_INF("rep_appendlog_trigger.");
cm_release_cond(&g_appendlog_cond);
}
static void rep_follower_accepted_trigger(uint32 stream_id, uint32 node_id, log_id_t log_id)
{
LOG_TRACE(log_id.index, "rep_follower_accepted_trigger.");
LOG_TRACE(rep_get_tracekey(), "rep_follower_accepted_trigger.log_id=%llu", log_id.index);
LOG_DEBUG_INF("[REP]rep_follower_accepted_trigger,node_id=%u,log=(%llu,%llu)",
node_id, log_id.term, log_id.index);
MATCH_INDEX = log_id;
rep_set_accept_flag(stream_id);
}
void rep_leader_acceptlog(uint32 stream_id, uint64 term, uint64 index, status_t status)
{
if (status != CM_SUCCESS) {
if (++DISK_ERROR >= DISK_ERROR_THRESHOLD) {
DISK_ERROR = 0;
(void)elc_demote_follower(stream_id);
}
return;
}
LOG_DEBUG_INF("rep_leader_acceptlog.");
LOG_TRACE(index, "rep_leader_acceptlog.");
uint32 node_id = g_cur_node_id;
MATCH_INDEX.term = term;
MATCH_INDEX.index = index;
NEXT_INDEX = index + 1;
DISK_ERROR = 0;
}
log_id_t rep_leader_get_match_index(uint32 stream_id, uint32 node_id)
{
return MATCH_INDEX;
}
uint64 rep_leader_get_next_index(uint32 stream_id, uint32 node_id)
{
return NEXT_INDEX;
}
uint64 rep_leader_get_apply_index(uint32 stream_id, uint32 node_id)
{
return APPLY_INDEX;
}
void rep_set_pause_time(uint32 stream_id, uint32 node_id, uint32 pause_time)
{
PAUSE_TIME = pause_time;
}
uint32 rep_get_pause_time(uint32 stream_id, uint32 node_id)
{
return PAUSE_TIME;
}
static inline void rep_init_thread_id()
{
uint64 node_id;
uint64 node_cnt = 0;
uint32 cur_node_id = md_get_cur_node();
for (node_id = 0; node_id < CM_MAX_NODE_COUNT; node_id++) {
g_append_thread_id[node_id] = node_cnt % g_append_thread_num;
if (node_id != cur_node_id) {
node_cnt++;
}
}
}
static status_t rep_check_group_value_valid(uint32 group_value, bool32 *is_valid)
{
*is_valid = CM_FALSE;
uint32 node_list[CM_MAX_NODE_COUNT];
uint32 node_count;
uint32 stream_list[CM_MAX_STREAM_COUNT];
uint32 stream_count;
CM_RETURN_IFERR(md_get_stream_list(stream_list, &stream_count));
for (uint32 i = 0; i < stream_count; i++) {
uint32 stream_id = stream_list[i];
CM_RETURN_IFERR(md_get_stream_nodes(stream_id, node_list, &node_count));
for (uint32 i = 0; i < node_count; i++) {
uint32 node_id = node_list[i];
dcf_node_t node_item;
bool32 is_voter = CM_FALSE;
CM_RETURN_IFERR(md_get_node(node_id, &node_item));
CM_RETURN_IFERR(md_is_voter(stream_id, node_id, &is_voter));
if (group_value == node_item.group && is_voter) {
*is_valid = CM_TRUE;
return CM_SUCCESS;
}
}
}
return CM_SUCCESS;
}
status_t rep_check_param_majority_groups()
{
uint32 groups[CM_MAX_GROUP_COUNT] = { 0 };
uint32 count = 0;
bool32 is_valid = CM_FALSE;
CM_RETURN_IFERR(md_get_majority_groups(groups, &count));
for (int i = 0; i < count; i++) {
CM_RETURN_IFERR(rep_check_group_value_valid(groups[i], &is_valid));
if (!is_valid) {
LOG_RUN_WAR("[REP] group %u in majority_groups is not valid, will ignored,"
"pls reset param MAJORITY_GROUPS.", groups[i]);
return CM_ERROR;
}
}
return CM_SUCCESS;
}