* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dcf_interface.c
* DCF API
*
* IDENTIFICATION
* src/dcf_interface.c
*
* -------------------------------------------------------------------------
*/
#include "interface/dcf_interface.h"
#include "cm_defs.h"
#include "util_error.h"
#include "cm_types.h"
#include "cm_log.h"
#include "cm_memory.h"
#include "cm_text.h"
#include "cm_num.h"
#include "mec.h"
#include "metadata.h"
#include "md_change.h"
#include "replication.h"
#include "rep_common.h"
#include "elc_stream.h"
#include "elc_status_check.h"
#include "util_perf_stat.h"
#include "cm_ip.h"
#include "cJSON.h"
#include "util_profile_stat.h"
#include "stream.h"
#include "cb_func.h"
#include "mec_reactor.h"
#include "mec_instance.h"
#include "md_param.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum en_block_ack {
NO_ACK = 0,
SUCCESS_ACK,
ERROR_ACK,
} block_ack_t;
typedef struct st_block_info {
uint32 block_time_ms;
block_ack_t ack;
uint64 leader_last_index;
thread_t thread;
cm_event_t event;
} block_info_t;
typedef struct st_dcf_status {
node_status_t status;
block_info_t block;
atomic32_t writing_cnt;
latch_t latch;
} dcf_status_t;
typedef struct st_dcf_exception_report {
cm_event_t exception_event;
thread_t exception_thread;
uint32 stream_id;
dcf_exception_t exception;
bool32 exception_init_flag;
} dcf_exception_report_t;
static dcf_status_t g_node_status[CM_MAX_STREAM_COUNT] = {0};
static latch_t g_dcf_latch = {0};
static bool32 g_dcf_inited = CM_FALSE;
static usr_cb_msg_proc_t g_cb_send_msg_notify = NULL;
static usr_cb_exception_notify_t g_cb_exception_notify = NULL;
static dcf_exception_report_t g_dcf_exception;
static bool32 g_node_inited = CM_FALSE;
#define MAX_PARALLEL_MAX_NUM 256
typedef struct st_dcf_parallel_msg {
spinlock_t lock;
uint32 cursor;
uint8 req_id_status[MAX_PARALLEL_MAX_NUM];
atomic_t ack_result[MAX_PARALLEL_MAX_NUM];
cm_event_t event[MAX_PARALLEL_MAX_NUM];
} dcf_parallel_msg_t;
static dcf_parallel_msg_t g_universal_write_info = {0};
static dcf_parallel_msg_t g_consensus_hc_info = {0};
status_t set_node_status(uint32 stream_id, node_status_t status, uint32 block_time_ms)
{
dcf_status_t *node_status = &g_node_status[stream_id];
cm_latch_x(&node_status->latch, 0, NULL);
if (status == NODE_BLOCKED) {
if (node_status->status == NODE_BLOCKED) {
cm_unlatch(&node_status->latch, NULL);
LOG_DEBUG_WAR("already blocked, can't set again.");
return CM_ERROR;
}
node_status->block.block_time_ms = block_time_ms;
}
node_status->status = status;
cm_unlatch(&node_status->latch, NULL);
return CM_SUCCESS;
}
static inline node_status_t get_node_status(uint32 stream_id)
{
dcf_status_t *node_status = &g_node_status[stream_id];
cm_latch_s(&node_status->latch, 0, CM_FALSE, NULL);
node_status_t status = node_status->status;
cm_unlatch(&node_status->latch, NULL);
return status;
}
void clear_node_block_status(uint32 stream_id)
{
if (get_node_status(stream_id) == NODE_BLOCKED) {
(void)set_node_status(stream_id, NODE_NORMAL, 0);
}
}
static inline uint32 get_block_time(uint32 stream_id)
{
dcf_status_t *node_status = &g_node_status[stream_id];
cm_latch_s(&node_status->latch, 0, CM_FALSE, NULL);
uint32 block_time = node_status->block.block_time_ms;
cm_unlatch(&node_status->latch, NULL);
return block_time;
}
static inline void set_block_ack(uint32 stream_id, block_ack_t ack, uint64 last_index)
{
dcf_status_t *node_status = &g_node_status[stream_id];
cm_latch_x(&node_status->latch, 0, NULL);
node_status->block.leader_last_index = last_index;
node_status->block.ack = ack;
cm_unlatch(&node_status->latch, NULL);
}
static inline block_ack_t get_block_ack(uint32 stream_id)
{
dcf_status_t *node_status = &g_node_status[stream_id];
cm_latch_s(&node_status->latch, 0, CM_FALSE, NULL);
block_ack_t ack = node_status->block.ack;
cm_unlatch(&node_status->latch, NULL);
return ack;
}
static inline uint64 get_block_last_index(uint32 stream_id)
{
dcf_status_t *node_status = &g_node_status[stream_id];
cm_latch_s(&node_status->latch, 0, CM_FALSE, NULL);
uint64 last_index = node_status->block.leader_last_index;
cm_unlatch(&node_status->latch, NULL);
return last_index;
}
status_t block_node_req(uint32 stream_id, uint32 node_id, uint32 block_time_ms)
{
mec_message_t pack;
uint32 src_node = md_get_cur_node();
CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_BLOCK_NODE_RPC_REQ, src_node, node_id, stream_id));
if (mec_put_int32(&pack, block_time_ms) != CM_SUCCESS) {
mec_release_pack(&pack);
LOG_DEBUG_ERR("block node req, encode fail.");
return CM_ERROR;
}
LOG_DEBUG_INF("send blockreq: stream=%u,src=%u,dst=%u,block_time=%u.", stream_id, src_node, node_id, block_time_ms);
status_t ret = mec_send_data(&pack);
mec_release_pack(&pack);
return ret;
}
status_t block_node_ack(uint32 stream_id, unsigned int node_id, block_ack_t ack)
{
if (ack == SUCCESS_ACK) {
timespec_t begin = cm_clock_now();
do {
cm_sleep(CM_SLEEP_10_FIXED);
if ((cm_clock_now() - begin) > MICROSECS_PER_SECOND) {
LOG_RUN_ERR("wait dcf write timeout, writing_cnt=%u.", g_node_status[stream_id].writing_cnt);
break;
}
} while (g_node_status[stream_id].writing_cnt != 0);
}
mec_message_t pack;
uint32 src_node = md_get_cur_node();
CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_BLOCK_NODE_RPC_ACK, src_node, node_id, stream_id));
uint64 last_index = stg_last_log_id(stream_id).index;
if (mec_put_int32(&pack, ack) != CM_SUCCESS || mec_put_int64(&pack, last_index) != CM_SUCCESS) {
mec_release_pack(&pack);
LOG_DEBUG_ERR("block node ack, encode fail.");
return CM_ERROR;
}
LOG_DEBUG_INF("send blockack: stream=%u,src=%u,dst=%u,ack=%d,last_index=%llu.",
stream_id, src_node, node_id, ack, last_index);
status_t ret = mec_send_data(&pack);
mec_release_pack(&pack);
return ret;
}
status_t block_node_req_proc(mec_message_t *pack)
{
uint32 stream_id = pack->head->stream_id;
uint32 src_node_id = pack->head->src_inst;
LOG_DEBUG_INF("recv blockreq: stream_id=%u, node_id=%u", stream_id, src_node_id);
uint32 block_time_ms;
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&block_time_ms));
block_ack_t ack = SUCCESS_ACK;
if (elc_get_node_role(stream_id) != DCF_ROLE_LEADER
|| set_node_status(stream_id, NODE_BLOCKED, block_time_ms) != CM_SUCCESS) {
ack = ERROR_ACK;
}
CM_RETURN_IFERR(block_node_ack(stream_id, src_node_id, ack));
if (ack == ERROR_ACK) {
return CM_SUCCESS;
}
LOG_DEBUG_INF("set node blocked, block_time_ms=%u.", block_time_ms);
cm_event_notify(&g_node_status[stream_id].block.event);
return CM_SUCCESS;
}
status_t block_node_ack_proc(mec_message_t *pack)
{
uint32 stream_id = pack->head->stream_id;
uint32 ack;
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&ack));
uint64 last_index;
CM_RETURN_IFERR(mec_get_int64(pack, (int64*)&last_index));
LOG_DEBUG_INF("recv blockack: stream_id=%u, ack=%u, last_index=%llu.", stream_id, ack, last_index);
ack = (ack == SUCCESS_ACK) ? SUCCESS_ACK : ERROR_ACK;
set_block_ack(stream_id, ack, last_index);
return CM_SUCCESS;
}
static void block_thread_entry(thread_t *thread)
{
uint32 stream_id = (uint32)(uint64)thread->argument;
LOG_DEBUG_INF("block_thread_entry: stream_id=%u.", stream_id);
cm_event_t *block_event = &g_node_status[stream_id].block.event;
(void)cm_set_thread_name("block_node");
while (!thread->closed) {
(void)cm_event_timedwait(block_event, CM_SLEEP_500_FIXED);
if (get_node_status(stream_id) == NODE_BLOCKED) {
timespec_t begin = cm_clock_now();
uint32 block_time_ms = get_block_time(stream_id);
while (((uint64)(cm_clock_now() - begin)) / MICROSECS_PER_MILLISEC < block_time_ms) {
cm_sleep(CM_SLEEP_1_FIXED);
}
(void)set_node_status(stream_id, NODE_NORMAL, 0);
}
}
}
static void exception_thread_entry(thread_t *thread)
{
dcf_exception_report_t *cur_exception = (dcf_exception_report_t *)thread->argument;
(void)cm_set_thread_name("exception reporting thread");
while (!thread->closed) {
(void)cm_event_timedwait(&(cur_exception->exception_event), CM_SLEEP_1000_FIXED);
if (cur_exception->exception != DCF_RUNNING_NORMAL) {
if (g_cb_exception_notify != NULL) {
int ret = g_cb_exception_notify(cur_exception->stream_id, (uint32) (cur_exception->exception));
LOG_DEBUG_INF("exception report callback: g_cb_exception_notify, retcode=%d", ret);
}
}
}
}
static status_t init_exception_report()
{
LOG_RUN_INF("init exception report");
g_dcf_exception.exception_init_flag = CM_FALSE;
g_dcf_exception.stream_id = CM_INVALID_STREAM_ID;
g_dcf_exception.exception = DCF_RUNNING_NORMAL;
CM_RETURN_IFERR(cm_event_init(&(g_dcf_exception.exception_event)));
CM_RETURN_IFERR(cm_create_thread(exception_thread_entry, 0, (void *)&g_dcf_exception,
&(g_dcf_exception.exception_thread)));
g_dcf_exception.exception_init_flag = CM_TRUE;
return CM_SUCCESS;
}
static status_t deinit_exception_report()
{
LOG_RUN_INF("deinit exception report");
if (g_dcf_exception.exception_init_flag) {
cm_close_thread(&(g_dcf_exception.exception_thread));
cm_event_destory(&(g_dcf_exception.exception_event));
}
g_dcf_exception.exception_init_flag = CM_FALSE;
return CM_SUCCESS;
}
status_t common_msg_proc(mec_message_t *pack)
{
int ret = CM_SUCCESS;
uint32 stream_id = pack->head->stream_id;
uint32 src_node_id = pack->head->src_inst;
char *msg = NULL;
uint32 msg_size = 0;
CM_RETURN_IFERR(mec_get_bin(pack, &msg_size, (void **)&msg));
if (g_cb_send_msg_notify != NULL) {
ret = g_cb_send_msg_notify(stream_id, src_node_id, msg, msg_size);
LOG_DEBUG_INF("Callback: dn_send_msg_notify, retcode=%d", ret);
}
return ret;
}
status_t change_member_req_proc(mec_message_t *pack);
status_t universal_write_req_proc(mec_message_t *pack);
status_t universal_write_ack_proc(mec_message_t *pack);
status_t dcf_get_commit_index_req_proc(mec_message_t *pack);
status_t dcf_get_commit_index_ack_proc(mec_message_t *pack);
status_t init_node_status()
{
uint32 streams[CM_MAX_STREAM_COUNT];
uint32 stream_count;
if (g_node_inited) {
LOG_RUN_INF("init_node_status already sucessful");
return CM_SUCCESS;
}
for (uint32 i = 0; i < MAX_PARALLEL_MAX_NUM; i++) {
CM_RETURN_IFERR(cm_event_init(&g_universal_write_info.event[i]));
CM_RETURN_IFERR(cm_event_init(&g_consensus_hc_info.event[i]));
}
if (md_get_stream_list(streams, &stream_count) != CM_SUCCESS) {
LOG_DEBUG_ERR("md_get_stream_list failed");
return CM_ERROR;
}
for (uint32 i = 0; i < stream_count; i++) {
uint32 stream_id = streams[i];
dcf_status_t *node_status = &g_node_status[stream_id];
cm_latch_init(&node_status->latch);
node_status->status = NODE_NORMAL;
node_status->block.ack = NO_ACK;
CM_RETURN_IFERR(cm_event_init(&node_status->block.event));
CM_RETURN_IFERR(cm_create_thread(block_thread_entry, 0, (void *)(uint64)stream_id, &node_status->block.thread));
}
register_msg_process(MEC_CMD_BLOCK_NODE_RPC_REQ, block_node_req_proc, PRIV_HIGH);
register_msg_process(MEC_CMD_BLOCK_NODE_RPC_ACK, block_node_ack_proc, PRIV_HIGH);
register_msg_process(MEC_CMD_SEND_COMMON_MSG, common_msg_proc, PRIV_LOW);
register_msg_process(MEC_CMD_CHANGE_MEMBER_RPC_REQ, change_member_req_proc, PRIV_HIGH);
register_msg_process(MEC_CMD_UNIVERSAL_WRITE_REQ, universal_write_req_proc, PRIV_LOW);
register_msg_process(MEC_CMD_UNIVERSAL_WRITE_ACK, universal_write_ack_proc, PRIV_LOW);
register_msg_process(MEC_CMD_GET_COMMIT_INDEX_REQ, dcf_get_commit_index_req_proc, PRIV_LOW);
register_msg_process(MEC_CMD_GET_COMMIT_INDEX_ACK, dcf_get_commit_index_ack_proc, PRIV_LOW);
g_node_inited = CM_TRUE;
return CM_SUCCESS;
}
static inline void deinit_node_status()
{
uint32 streams[CM_MAX_STREAM_COUNT];
uint32 stream_count;
if (md_get_stream_list(streams, &stream_count) != CM_SUCCESS) {
LOG_DEBUG_ERR("md_get_stream_list failed");
return;
}
if (!g_node_inited) {
return;
}
for (uint32 i = 0; i < stream_count; i++) {
uint32 stream_id = streams[i];
dcf_status_t *node_status = &g_node_status[stream_id];
if (set_node_status(stream_id, NODE_UNINIT, 0) != CM_SUCCESS) {
LOG_DEBUG_ERR("set node status to NODE_UNINIT failed in deinit");
}
cm_close_thread(&node_status->block.thread);
cm_event_destory(&node_status->block.event);
}
for (uint32 i = 0; i < MAX_PARALLEL_MAX_NUM; i++) {
cm_event_destory(&g_universal_write_info.event[i]);
cm_event_destory(&g_consensus_hc_info.event[i]);
}
MEMS_RETVOID_IFERR(memset_sp(g_node_status, sizeof(dcf_status_t) * CM_MAX_STREAM_COUNT, 0,
sizeof(dcf_status_t) * CM_MAX_STREAM_COUNT));
g_node_inited = CM_FALSE;
}
static inline bool32 check_if_node_inited(uint32 stream_id)
{
if (stream_id >= CM_MAX_STREAM_COUNT) {
LOG_DEBUG_ERR("stream_id=%u invalid", stream_id);
return CM_FALSE;
}
node_status_t status = get_node_status(stream_id);
LOG_DEBUG_INF("stream_id=%u node_status=%d", stream_id, status);
return (status == NODE_UNINIT) ? CM_FALSE : CM_TRUE;
}
static inline void wait_if_node_blocked(uint32 stream_id)
{
node_status_t status = get_node_status(stream_id);
if (status == NODE_BLOCKED) {
timespec_t begin = cm_clock_now();
while (status == NODE_BLOCKED) {
if ((cm_clock_now() - begin) > MICROSECS_PER_SECOND) {
LOG_DEBUG_WAR("node is blocked now, waiting...");
begin = cm_clock_now();
}
cm_sleep(1);
status = get_node_status(stream_id);
}
}
}
static usr_cb_log_output_t g_log_output_cb = NULL;
status_t init_logger_param(log_param_t *log_param)
{
param_value_t param_value;
CM_RETURN_IFERR(md_get_param(DCF_PARAM_LOG_PATH, ¶m_value));
if (strlen(param_value.log_path) == 0) {
CM_RETURN_IFERR(md_get_param(DCF_PARAM_DATA_PATH, ¶m_value));
PRTS_RETURN_IFERR(snprintf_s(log_param->log_home, CM_MAX_LOG_HOME_LEN, CM_MAX_LOG_HOME_LEN - 1,
"%s", param_value.data_path));
} else {
PRTS_RETURN_IFERR(snprintf_s(log_param->log_home, CM_MAX_LOG_HOME_LEN, CM_MAX_LOG_HOME_LEN - 1,
"%s", param_value.log_path));
}
CM_RETURN_IFERR(md_get_param(DCF_PARAM_INSTANCE_NAME, ¶m_value));
PRTS_RETURN_IFERR(snprintf_s(log_param->instance_name, CM_MAX_NAME_LEN, CM_MAX_NAME_LEN - 1,
"%s", param_value.instance_name));
CM_RETURN_IFERR(md_get_param(DCF_PARAM_LOG_FILENAME_FORMAT, ¶m_value));
log_param->log_filename_format = param_value.value_log_filename_format;
CM_RETURN_IFERR(md_get_param(DCF_PARAM_LOG_LEVEL, ¶m_value));
log_param->log_level = param_value.value_loglevel;
CM_RETURN_IFERR(md_get_param(DCF_PARAM_LOG_BACKUP_FILE_COUNT, ¶m_value));
log_param->log_backup_file_count = param_value.value_log_backup_count;
CM_RETURN_IFERR(md_get_param(DCF_PARAM_MAX_LOG_FILE_SIZE, ¶m_value));
log_param->max_log_file_size = param_value.value_max_log_file_size * SIZE_M(1);
CM_RETURN_IFERR(md_get_param(DCF_PARAM_LOG_FILE_PERMISSION, ¶m_value));
cm_log_set_file_permissions((uint16)param_value.value_log_file_permission);
CM_RETURN_IFERR(md_get_param(DCF_PARAM_LOG_PATH_PERMISSION, ¶m_value));
cm_log_set_path_permissions((uint16)param_value.value_log_path_permission);
CM_RETURN_IFERR(md_get_param(DCF_PARAM_LOG_SUPPRESS_ENABLE, ¶m_value));
log_param->log_suppress_enable = param_value.log_suppress_enable;
return CM_SUCCESS;
}
int64 cb_get_value_impl(stat_item_id_t item_id)
{
int64 value = 0;
switch (item_id) {
case SEND_QUEUE_COUNT:
value = mec_get_send_que_count(PRIV_LOW);
break;
case RECV_QUEUE_COUNT:
value = mec_get_recv_que_count(PRIV_LOW);
break;
case SEND_QUEUE_COUNT_HIGH:
value = mec_get_send_que_count(PRIV_HIGH);
break;
case RECV_QUEUE_COUNT_HIGH:
value = mec_get_recv_que_count(PRIV_HIGH);
break;
case STG_MEM_USED:
value = stg_get_total_mem_used();
break;
case MEC_SEND_MEM:
value = mec_get_send_mem_capacity(PRIV_LOW);
break;
case MEC_RECV_MEM:
value = mec_get_recv_mem_capacity(PRIV_LOW);
break;
case MEC_BUDDY_MEM:
value = get_mem_pool()->used_size;
break;
default:
break;
}
return value;
}
status_t register_stat_items()
{
CM_RETURN_IFERR(cm_reg_stat_item(DCF_WRITE_CNT, "DCFWriteCount", UNIT_DEFAULT, STAT_INDICATOR_ACC, NULL));
CM_RETURN_IFERR(cm_reg_stat_item(DCF_WRITE_SIZE, "DCFWriteSize", UNIT_MB, STAT_INDICATOR_ACC, NULL));
CM_RETURN_IFERR(cm_reg_stat_item(DISK_WRITE, "DiskWrite", UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_reg_stat_item(SEND_PACK_SIZE, "SendPackSize", UNIT_MB, STAT_INDICATOR_ACC, NULL));
CM_RETURN_IFERR(cm_reg_stat_item(SEND_DELAY, "SendDelay", UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_reg_stat_item(SEND_WAIT, "SendWait", UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(cm_reg_stat_item(RECV_DELAY, "RecvDelay", UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(
cm_reg_stat_item(SEND_QUEUE_COUNT, "SendQueueLowCount", UNIT_DEFAULT, STAT_INDICATOR_ACC, cb_get_value_impl));
CM_RETURN_IFERR(
cm_reg_stat_item(RECV_QUEUE_COUNT, "RecvQueueLowCount", UNIT_DEFAULT, STAT_INDICATOR_ACC, cb_get_value_impl));
CM_RETURN_IFERR(cm_reg_stat_item(SEND_QUEUE_COUNT_HIGH, "SendQueueHighCount",
UNIT_DEFAULT, STAT_INDICATOR_ACC, cb_get_value_impl));
CM_RETURN_IFERR(cm_reg_stat_item(RECV_QUEUE_COUNT_HIGH, "RecvQueueHighCount",
UNIT_DEFAULT, STAT_INDICATOR_ACC, cb_get_value_impl));
CM_RETURN_IFERR(
cm_reg_stat_item(STG_MEM_USED, "StgMemUsed", UNIT_MB, STAT_INDICATOR_ACC, cb_get_value_impl));
CM_RETURN_IFERR(
cm_reg_stat_item(MEC_SEND_MEM, "MecSendMem", UNIT_MB, STAT_INDICATOR_ACC, cb_get_value_impl));
CM_RETURN_IFERR(
cm_reg_stat_item(MEC_RECV_MEM, "MecRecvMem", UNIT_MB, STAT_INDICATOR_ACC, cb_get_value_impl));
CM_RETURN_IFERR(cm_reg_stat_item(HB_SEND_COUNT, "HBSendCount", UNIT_DEFAULT, STAT_INDICATOR_ACC, NULL));
CM_RETURN_IFERR(cm_reg_stat_item(HB_RECV_COUNT, "HBRecvCount", UNIT_DEFAULT, STAT_INDICATOR_ACC, NULL));
CM_RETURN_IFERR(cm_reg_stat_item(HB_RTT, "HBRTT", UNIT_MS, STAT_INDICATOR_AVG, NULL));
CM_RETURN_IFERR(
cm_reg_stat_item(MEC_BUDDY_MEM, "MecBuddyMem", UNIT_MB, STAT_INDICATOR_ACC, cb_get_value_impl));
return CM_SUCCESS;
}
status_t init_logger()
{
char file_name[CM_FULL_PATH_BUFFER_SIZE] = { '\0' };
log_param_t *log_param = cm_log_param_instance();
log_param->log_instance_startup = CM_FALSE;
CM_RETURN_IFERR(cm_set_log_module_name("DCF", sizeof("DCF") - 1));
CM_RETURN_IFERR(init_logger_param(log_param));
log_param->log_write = (usr_cb_log_output_t)g_log_output_cb;
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/run/%s",
log_param->log_home, "dcf.rlog"));
CM_RETURN_IFERR(cm_log_init(LOG_RUN, file_name));
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/debug/%s",
log_param->log_home, "dcf.dlog"));
CM_RETURN_IFERR(cm_log_init(LOG_DEBUG, file_name));
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/oper/%s",
log_param->log_home, "oper.log"));
CM_RETURN_IFERR(cm_log_init(LOG_OPER, file_name));
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/alarm/%s",
log_param->log_home, "alarm.log"));
CM_RETURN_IFERR(cm_log_init(LOG_ALARM, file_name));
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/mec/%s",
log_param->log_home, "mec.log"));
CM_RETURN_IFERR(cm_log_init(LOG_MEC, file_name));
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/trace/%s",
log_param->log_home, "trace.log"));
CM_RETURN_IFERR(cm_log_init(LOG_TRACE, file_name));
PRTS_RETURN_IFERR(snprintf_s(file_name, CM_FULL_PATH_BUFFER_SIZE, CM_FULL_PATH_BUFFER_SIZE - 1, "%s/profile/%s",
log_param->log_home, "dcf.plog"));
CM_RETURN_IFERR(cm_log_init(LOG_PROFILE, file_name));
log_param->log_instance_startup = CM_TRUE;
LOG_RUN_INF("[DCF]Logger init succeed");
return CM_SUCCESS;
}
static inline status_t profile_stat_init()
{
uint32 ret = cm_profile_stat_init();
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("DCF init profile statistics failed, %s, retcode=%d, error code=%d, error info=%s",
"config init failed",
ret, cm_get_error_code(), cm_get_errormsg(cm_get_error_code()));
cm_profile_stat_uninit();
return CM_ERROR;
}
CM_RETURN_IFERR(register_stat_items());
return CM_SUCCESS;
}
int dcf_set_param(const char* param_name, const char* param_value)
{
CM_CHECK_NULL_PTR(param_name);
cm_reset_error();
init_dcf_errno_desc();
dcf_param_t param_type;
param_value_t out_value;
if (cm_str_equal(param_name, "SSL_PWD_PLAINTEXT")) {
LOG_OPER("dcf set param, param_name=%s param_value=%s", param_name, "***");
} else {
LOG_OPER("dcf set param, param_name=%s param_value=%s", param_name, param_value);
}
CM_RETURN_IFERR(md_verify_param(param_name, param_value, ¶m_type, &out_value));
return md_set_param(param_type, &out_value);
}
int dcf_get_param(const char *param_name, char *param_value, unsigned int size)
{
CM_CHECK_NULL_PTR(param_name);
cm_reset_error();
init_dcf_errno_desc();
CM_RETURN_IFERR(md_get_param_by_name(param_name, param_value, size));
return CM_SUCCESS;
}
int dcf_register_after_writer(usr_cb_after_writer_t cb_func)
{
return rep_register_after_writer(ENTRY_TYPE_LOG, cb_func);
}
int dcf_register_consensus_notify(usr_cb_consensus_notify_t cb_func)
{
return rep_register_consensus_notify(ENTRY_TYPE_LOG, cb_func);
}
int dcf_register_status_notify(usr_cb_status_notify_t cb_func)
{
return elc_register_notify(cb_func);
}
int dcf_register_log_output(usr_cb_log_output_t cb_func)
{
g_log_output_cb = cb_func;
return CM_SUCCESS;
}
int dcf_register_exception_report(usr_cb_exception_notify_t cb_func)
{
g_cb_exception_notify = cb_func;
return CM_SUCCESS;
}
int dcf_register_decrypt_pwd(usr_cb_decrypt_pwd_t cb_func)
{
return mec_register_decrypt_pwd(cb_func);
}
int dcf_register_election_notify(usr_cb_election_notify_t cb_func)
{
return elc_register_election_notify(cb_func);
}
int dcf_register_msg_proc(usr_cb_msg_proc_t cb_func)
{
g_cb_send_msg_notify = cb_func;
return CM_SUCCESS;
}
int dcf_register_thread_memctx_init(usr_cb_thread_memctx_init_t cb_func)
{
return cb_register_thread_memctx_init(cb_func);
}
static inline status_t register_conf_type_cb()
{
CM_RETURN_IFERR(rep_register_after_writer(ENTRY_TYPE_CONF, md_after_write_cb));
CM_RETURN_IFERR(rep_register_consensus_notify(ENTRY_TYPE_CONF, md_consensus_notify_cb));
CM_RETURN_IFERR(stg_register_cb(ENTRY_TYPE_CONF, md_save));
return CM_SUCCESS;
}
static status_t init_tool_threads()
{
int ret = init_node_status();
if (ret != CM_SUCCESS) {
deinit_node_status();
return CM_ERROR;
}
ret = profile_stat_init();
if (ret != CM_SUCCESS) {
return CM_ERROR;
}
ret = init_exception_report();
if (ret != CM_SUCCESS) {
deinit_exception_report();
return CM_ERROR;
}
LOG_RUN_INF("[DCF]Tool init succeed");
return CM_SUCCESS;
}
static status_t init_main_threads(unsigned int node_id, const char *cfg_str)
{
CM_RETURN_IFERR(cm_start_timer(g_timer()));
CM_RETURN_IFERR_EX(init_logger(), CM_THROW_ERROR(ERR_INIT_LOGGER));
LOG_OPER("dcf start, node_id=%u cfg_str=%s", node_id, cfg_str);
status_t ret = md_init(node_id, cfg_str);
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("metadata init failed, error code=%d, error info=%s",
cm_get_error_code(), cm_get_errormsg(cm_get_error_code()));
return CM_ERROR;
}
ret = register_conf_type_cb();
if (ret != CM_SUCCESS) {
return CM_ERROR;
}
ret = mec_init();
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("network init failed, error code=%d, error info=%s",
cm_get_error_code(), cm_get_errormsg(cm_get_error_code()));
return CM_ERROR;
}
ret = stg_init();
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("storage init failed, error code=%d, error info=%s",
cm_get_error_code(), cm_get_errormsg(cm_get_error_code()));
return CM_ERROR;
}
ret = elc_init();
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("election init failed, error code=%d, error info=%s",
cm_get_error_code(), cm_get_errormsg(cm_get_error_code()));
return CM_ERROR;
}
ret = rep_init();
if (ret != CM_SUCCESS) {
LOG_RUN_ERR("replication init failed, error code=%d, error info=%s",
cm_get_error_code(), cm_get_errormsg(cm_get_error_code()));
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t add_manual_notify_item()
{
if (elc_stream_get_run_mode() != ELECTION_MANUAL) {
return CM_SUCCESS;
}
uint32 node_id = md_get_cur_node();
uint32 stream_list[CM_MAX_STREAM_COUNT];
uint32 stream_count;
if (md_get_stream_list(stream_list, &stream_count) != CM_SUCCESS) {
LOG_DEBUG_ERR("md get stream list fail when add manual notify item");
return CM_ERROR;
}
for (uint32 i = 0; i < stream_count; i++) {
uint32 stream_id = stream_list[i];
if (elc_get_node_role(stream_id) == DCF_ROLE_LEADER) {
CM_RETURN_IFERR(elc_stream_set_role(stream_id, DCF_ROLE_LEADER));
CM_RETURN_IFERR(elc_stream_set_votefor(stream_id, node_id));
LOG_DEBUG_INF("Set votefor when add manual notify item, stream_id=%u, votefor=%u", stream_id, node_id);
add_notify_item(stream_id, node_id, node_id, DCF_ROLE_FOLLOWER, DCF_ROLE_LEADER);
LOG_RUN_INF("dcf start as manual mode, add notify item for leader, stream_id=%u, node_id=%u",
stream_id, node_id);
}
}
return CM_SUCCESS;
}
static void clear_resource()
{
LOG_RUN_INF("begin to clear_resource");
cm_profile_stat_uninit();
elc_deinit();
rep_stop();
stg_deinit();
mec_deinit();
deinit_node_status();
if (deinit_exception_report() != CM_SUCCESS) {
LOG_DEBUG_ERR("deinit_exception_report failed");
}
md_uninit();
cm_close_timer(g_timer());
LOG_RUN_INF("clear_resource succeed");
}
int dcf_start(unsigned int node_id, const char *cfg_str)
{
CM_CHECK_NULL_PTR(cfg_str);
cm_latch_x(&g_dcf_latch, 0, NULL);
if (g_dcf_inited) {
cm_unlatch(&g_dcf_latch, NULL);
return CM_SUCCESS;
}
init_dcf_errno_desc();
cm_reset_error();
do {
status_t ret = init_main_threads(node_id, cfg_str);
if (ret != CM_SUCCESS) {
break;
}
ret = init_tool_threads();
if (ret != CM_SUCCESS) {
break;
}
ret = add_manual_notify_item();
if (ret != CM_SUCCESS) {
break;
}
(void)rep_check_param_majority_groups();
LOG_RUN_INF("dcf start succeed.");
g_dcf_inited = CM_TRUE;
cm_unlatch(&g_dcf_latch, NULL);
return CM_SUCCESS;
} while (0);
clear_resource();
cm_unlatch(&g_dcf_latch, NULL);
LOG_RUN_INF("dcf start failed");
return CM_ERROR;
}
int dcf_write(unsigned int stream_id, const char *buffer, unsigned int length,
unsigned long long key, unsigned long long *index)
{
unsigned long long index_in;
uint64 now = g_timer()->now;
cm_reset_error();
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (rep_get_can_write_flag(stream_id) != CM_TRUE) {
LOG_DEBUG_ERR("[DCF]can_write_flag false,can't write now, key=%llu.", key);
return CM_ERROR;
}
mem_pool_t* buddy_pool = get_mem_pool();
if (length > (buddy_pool->max_size >> CM_2X_FIXED)) {
LOG_DEBUG_ERR("[DCF]write_size[%u] is too big, don't support,key=%llu.", length, key);
return CM_ERROR;
}
wait_if_node_blocked(stream_id);
(void)cm_atomic32_inc(&g_node_status[stream_id].writing_cnt);
status_t ret = rep_write(stream_id, buffer, length, key, ENTRY_TYPE_LOG, &index_in);
(void)cm_atomic32_dec(&g_node_status[stream_id].writing_cnt);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]rep_write failed, error code=%d key=%llu", ret, key);
return CM_ERROR;
}
ps_record(PS_WRITE, index_in, now);
if (index != NULL) {
*index = index_in;
LOG_DEBUG_INF("[DCF]write succeed, stream_id=%u buflength=%u key=%llu index=%llu",
stream_id, length, key, *index);
} else {
LOG_DEBUG_INF("[DCF]write succeed, stream_id=%u buflength=%u key=%llu", stream_id, length, key);
}
return CM_SUCCESS;
}
int dcf_set_trace_key(unsigned long long trace_key)
{
if (trace_key == (uint64)-1) {
LOG_DEBUG_ERR("tracekey %llu is invalid", trace_key);
return CM_ERROR;
}
set_trace_key(trace_key);
return CM_SUCCESS;
}
int dcf_read(unsigned int stream_id, unsigned long long index, char* buffer, unsigned int length)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
log_entry_t *entry = stg_get_entry(stream_id, index);
if (entry == NULL) {
LOG_DEBUG_ERR("[DCF] index %llu not found", index);
return CM_ERROR;
}
if (length < ENTRY_SIZE(entry)) {
LOG_DEBUG_ERR("[DCF] buffer is too small, buffer[%u] - log[%u]", length, ENTRY_SIZE(entry));
stg_entry_dec_ref(entry);
return CM_ERROR;
}
errno_t ret = memcpy_s(buffer, length, ENTRY_BUF(entry), ENTRY_SIZE(entry));
stg_entry_dec_ref(entry);
return ret == EOK ? ENTRY_SIZE(entry) : CM_ERROR;
}
int dcf_truncate(unsigned int stream_id, unsigned long long first_index_kept)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
LOG_DEBUG_INF("dcf truncate, stream_id=%u first_index_kept=%llu", stream_id, first_index_kept);
return stg_truncate_prefix(stream_id, first_index_kept);
}
int dcf_stop()
{
LOG_OPER("dcf stop");
cm_latch_x(&g_dcf_latch, 0, NULL);
if (!g_dcf_inited) {
cm_unlatch(&g_dcf_latch, NULL);
return CM_SUCCESS;
}
cm_reset_error();
clear_resource();
g_dcf_inited = CM_FALSE;
cm_unlatch(&g_dcf_latch, NULL);
LOG_RUN_INF("dcf stop succeed");
return CM_SUCCESS;
}
static status_t append_local_node(unsigned int stream_id, cJSON *obj)
{
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "local_node_id", md_get_cur_node()));
dcf_role_t role = elc_get_node_role(stream_id);
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddStringToObject(obj, "role", md_get_rolename_by_type(role)));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "term", elc_get_current_term(stream_id)));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "run_mode", elc_stream_get_run_mode()));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "work_mode", elc_get_work_mode(stream_id)));
param_value_t elc_timeout;
param_value_t auto_elc_pri_en;
param_value_t hb_interval;
param_value_t elc_switch_thd;
CM_RETURN_IFERR(md_get_param(DCF_PARAM_ELECTION_TIMEOUT, &elc_timeout));
CM_RETURN_IFERR(md_get_param(DCF_PARAM_AUTO_ELC_PRIORITY_EN, &auto_elc_pri_en));
CM_RETURN_IFERR(md_get_param(DCF_PARAM_HEARTBEAT_INTERVAL, &hb_interval));
CM_RETURN_IFERR(md_get_param(DCF_PARAM_ELECTION_SWITCH_THRESHOLD, &elc_switch_thd));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "hb_interval", hb_interval.value_hb_interval));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "elc_timeout", elc_timeout.value_elc_timeout));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "auto_elc_pri_en",
auto_elc_pri_en.value_auto_elc_priority_en));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "elc_switch_thd",
elc_switch_thd.value_elc_switch_thd));
elc_stream_lock_s(stream_id);
uint32 my_group = elc_stream_get_my_group(stream_id);
uint64 my_prio = elc_stream_get_priority(stream_id);
uint32 leader_group = elc_stream_get_leader_group(stream_id);
elc_stream_unlock(stream_id);
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "group", my_group));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "priority", my_prio));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "leader_group", leader_group));
bool32 is_in_major = elc_is_in_majority(stream_id);
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "is_in_major", is_in_major));
uint64 index = stg_get_applied_index(stream_id);
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "applied_index", index));
index = rep_get_commit_index(stream_id);
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "commit_index", index));
index = stg_first_index(stream_id);
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "first_index", index));
index = stg_last_index(stream_id);
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "last_index", index));
if (elc_get_node_role(stream_id) == DCF_ROLE_LEADER) {
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "cluster_min_apply_idx",
rep_get_cluster_min_apply_idx(stream_id)));
}
return CM_SUCCESS;
}
static bool32 is_node_leader(uint32 stream_id, uint32 node_id)
{
if (node_id == CM_INVALID_NODE_ID) {
return CM_FALSE;
}
uint32 votefor = elc_get_votefor(stream_id);
uint32 cur_node = md_get_cur_node();
param_value_t value;
dcf_work_mode_t work_mode = elc_get_work_mode(stream_id);
uint32 old_leader = elc_get_old_leader(stream_id);
bool32 cond = (node_id == votefor);
if (md_get_param(DCF_PARAM_RUN_MODE, &value) != CM_SUCCESS) {
return CM_FALSE;
}
if (cur_node == node_id) {
return (cond && (elc_get_node_role(stream_id) == DCF_ROLE_LEADER));
}
if (value.value_mode == ELECTION_AUTO && work_mode == WM_NORMAL) {
cond = cond && (node_id == old_leader) && mec_is_ready(stream_id, node_id, PRIV_HIGH);
}
return cond;
}
static status_t append_leader_node(unsigned int stream_id, cJSON *obj)
{
uint32 node_id = elc_get_votefor(stream_id);
if (!is_node_leader(stream_id, node_id)) {
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNullToObject(obj, "leader_id"));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNullToObject(obj, "leader_ip"));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNullToObject(obj, "leader_port"));
return CM_SUCCESS;
}
dcf_node_t node_item;
CM_RETURN_IFERR(md_get_node(node_id, &node_item));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "leader_id", node_item.node_id));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddStringToObject(obj, "leader_ip", node_item.ip));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "leader_port", node_item.port));
return CM_SUCCESS;
}
static status_t append_stream_node_detail(unsigned int stream_id, cJSON *obj)
{
uint32 node_list[CM_MAX_NODE_COUNT];
uint32 node_count;
cJSON *node_array = cJSON_CreateArray();
CM_RETURN_IFERR(md_get_stream_nodes(stream_id, node_list, &node_count));
for (uint32 i = 0; i < node_count; i++) {
uint32 node_id = node_list[i];
dcf_node_t node_item;
CM_RETURN_IFERR(md_get_node(node_id, &node_item));
cJSON *node = cJSON_CreateObject();
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(node, "node_id", node_item.node_id));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddStringToObject(node, "ip", node_item.ip));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(node, "port", node_item.port));
if (is_node_leader(stream_id, node_id)) {
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddStringToObject(node, "role",
md_get_rolename_by_type(DCF_ROLE_LEADER)));
} else {
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddStringToObject(node, "role",
md_get_rolename_by_type((node_item.default_role == DCF_ROLE_LEADER)
? DCF_ROLE_FOLLOWER : node_item.default_role)));
}
if (elc_get_node_role(stream_id) == DCF_ROLE_LEADER) {
log_id_t match_idx = rep_leader_get_match_index(stream_id, node_id);
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(node, "next_index",
rep_leader_get_next_index(stream_id, node_id)));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(node, "match_index", match_idx.index));
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(node, "apply_index",
rep_leader_get_apply_index(stream_id, node_id)));
}
if (cJSON_AddItemToArray(node_array, node) == CM_FALSE) {
LOG_DEBUG_ERR("cJSON AddItemToArray fail when append_stream_node_detail");
return CM_ERROR;
}
}
if (cJSON_AddItemToObject(obj, "nodes", node_array) == CM_FALSE) {
LOG_DEBUG_ERR("cJSON AddItemToObject fail when append_stream_node_detail");
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t append_stream_info(unsigned int stream_id, cJSON *obj)
{
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "stream_id", stream_id));
CM_RETURN_IFERR(append_local_node(stream_id, obj));
CM_RETURN_IFERR(append_leader_node(stream_id, obj));
CM_RETURN_IFERR(append_stream_node_detail(stream_id, obj));
return CM_SUCCESS;
}
static status_t append_cluster_info(cJSON *obj)
{
uint32 current_node_id = md_get_cur_node();
CM_CHECK_CJSON_OPER_ERR_AND_RETURN(cJSON_AddNumberToObject(obj, "local_node_id", current_node_id));
uint32 stream_list[CM_MAX_STREAM_COUNT];
uint32 stream_count;
CM_RETURN_IFERR(md_get_streams_by_node(current_node_id, stream_list, &stream_count));
cJSON *stream_array = cJSON_CreateArray();
for (uint32 i = 0; i < stream_count; i++) {
uint32 stream_id = stream_list[i];
cJSON *stream_obj = cJSON_CreateObject();
if (append_stream_info(stream_id, stream_obj) != CM_SUCCESS) {
cJSON_Delete(stream_obj);
cJSON_Delete(stream_array);
return CM_ERROR;
}
if (cJSON_AddItemToArray(stream_array, stream_obj) == CM_FALSE) {
LOG_DEBUG_ERR("cJSON AddItemToArray fail when append_cluster_info");
cJSON_Delete(stream_obj);
cJSON_Delete(stream_array);
return CM_ERROR;
}
}
if (cJSON_AddItemToObject(obj, "stream_list", stream_array) == CM_FALSE) {
LOG_DEBUG_ERR("cJSON AddItemToObject fail when append_cluster_info");
cJSON_Delete(stream_array);
return CM_ERROR;
}
return CM_SUCCESS;
}
int dcf_query_cluster_info(char* buffer, unsigned int length)
{
cm_reset_error();
if (buffer == NULL || length == 0) {
LOG_DEBUG_ERR("buffer=%p or length=%u invalid", buffer, length);
CM_THROW_ERROR(ERR_NULL_PTR);
return 0;
}
int error_no = memset_s(buffer, length, 0, length);
if (error_no != EOK) {
LOG_DEBUG_ERR("dcf_query_cluster_info, memset_s return failed, errno=%d", error_no);
CM_THROW_ERROR(ERR_SYSTEM_CALL, error_no);
return 0;
}
cJSON *obj = cJSON_CreateObject();
status_t ret = append_cluster_info(obj);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(ERR_QUERY_DCF_INFO_ERR, "append_cluster_info failed", ret);
LOG_DEBUG_ERR("dcf_query_cluster_info, %s, error code=%d", "append_cluster_info failed", ret);
cJSON_Delete(obj);
return 0;
}
error_no = cJSON_PrintPreallocated(obj, buffer, length, 0);
if (!error_no) {
CM_THROW_ERROR(ERR_QUERY_DCF_INFO_ERR, "cJSON_PrintPreallocated failed", error_no);
LOG_DEBUG_ERR("dcf_query_cluster_info, cJSON_PrintPreallocated failed");
cJSON_Delete(obj);
return 0;
}
cJSON_Delete(obj);
return (strlen(buffer) + 1);
}
int dcf_query_stream_info(unsigned int stream_id, char* buffer, unsigned int length)
{
cm_reset_error();
if (!check_if_node_inited(stream_id)) {
CM_THROW_ERROR(ERR_QUERY_DCF_INFO_ERR, "check_if_node_inited failed", -1);
return 0;
}
if (buffer == NULL || length == 0) {
LOG_DEBUG_ERR("buffer=%p or length=%u invalid", buffer, length);
CM_THROW_ERROR(ERR_NULL_PTR);
return 0;
}
int error_no = memset_s(buffer, length, 0, length);
if (error_no != EOK) {
LOG_DEBUG_ERR("dcf_query_stream_info, memset_s return failed, errno=%d", error_no);
CM_THROW_ERROR(ERR_SYSTEM_CALL, error_no);
return 0;
}
cJSON *obj = cJSON_CreateObject();
status_t ret = append_stream_info(stream_id, obj);
if (ret != CM_SUCCESS) {
CM_THROW_ERROR(ERR_QUERY_DCF_INFO_ERR, "dcf_query_stream_info failed", ret);
LOG_DEBUG_ERR("dcf_query_stream_info, %s, error code=%d", "append_stream_info failed", ret);
cJSON_Delete(obj);
return 0;
}
error_no = cJSON_PrintPreallocated(obj, buffer, length, 0);
if (!error_no) {
CM_THROW_ERROR(ERR_QUERY_DCF_INFO_ERR, "dcf_query_stream_info failed", error_no);
LOG_DEBUG_ERR("dcf_query_stream_info, cJSON_PrintPreallocated failed");
cJSON_Delete(obj);
return 0;
}
cJSON_Delete(obj);
return (strlen(buffer) + 1);
}
int dcf_query_leader_info(unsigned int stream_id, char *ip, unsigned int ip_len, unsigned int *port,
unsigned int *node_id)
{
cm_reset_error();
if (!check_if_node_inited(stream_id)) {
CM_THROW_ERROR(ERR_QUERY_DCF_INFO_ERR, "check_if_node_inited failed", CM_ERROR);
return CM_ERROR;
}
if (ip == NULL || ip_len == 0 || port == NULL || node_id == NULL) {
CM_THROW_ERROR(ERR_NULL_PTR);
LOG_DEBUG_ERR("ip=%p or ip_len=%u or port=%p or node_id=%p invalid", ip, ip_len, port, node_id);
return CM_ERROR;
}
uint32 votefor = elc_get_votefor(stream_id);
if (votefor == CM_INVALID_NODE_ID) {
ip[0] = '\0';
*port = 0;
} else {
dcf_node_t node_item;
CM_RETURN_IFERR(md_get_node(votefor, &node_item));
MEMS_RETURN_IFERR(strncpy_sp(ip, ip_len, node_item.ip, strlen(node_item.ip)));
*port = node_item.port;
*node_id = node_item.node_id;
}
return CM_SUCCESS;
}
int dcf_query_statistics_info(char *buffer, unsigned int length)
{
if (buffer == NULL || length == 0) {
return CM_ERROR;
}
cm_reset_error();
MEMS_RETURN_IFERR(memset_s(buffer, length, 0, length));
cJSON *obj = cJSON_CreateObject();
status_t ret = util_append_statistics_info(obj);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("DCF query statistics info failed, %s, error code=%d", "append cluster info failed", ret);
cJSON_Delete(obj);
return 0;
}
if (!cJSON_PrintPreallocated(obj, buffer, length, 0)) {
cJSON_Delete(obj);
return 0;
}
cJSON_Delete(obj);
return (strlen(buffer) + 1);
}
int dcf_get_errorno()
{
return cm_get_error_code();
}
const char* dcf_get_error(int code)
{
return cm_get_errormsg(code);
}
status_t add_member_request(unsigned int stream_id, unsigned int node_id, const char* ip, unsigned int port,
unsigned int role)
{
dcf_node_t node;
node.node_id = node_id;
MEMS_RETURN_IFERR(strncpy_s(node.ip, CM_MAX_IP_LEN, ip, strlen(ip) + 1));
node.port = port;
node.default_role = role;
node.voting_weight = CM_ELC_NORS_WEIGHT;
node.group = CM_DEFAULT_GROUP_ID;
node.priority = CM_DEFAULT_ELC_PRIORITY;
CM_RETURN_IFERR(md_add_stream_member(stream_id, &node));
uint32 size;
char *md_buf = (char *)malloc(CM_METADATA_DEF_MAX_LEN);
if (md_buf == NULL) {
LOG_DEBUG_ERR("add_member_request malloc failed");
return CM_ERROR;
}
CM_RETURN_IFERR_EX(md_to_string(md_buf, CM_METADATA_DEF_MAX_LEN, &size), CM_FREE_PTR(md_buf));
CM_RETURN_IFERR_EX(rep_write(stream_id, md_buf, size, CFG_LOG_KEY(CM_NODE_ID_ALL, OP_FLAG_ADD),
ENTRY_TYPE_CONF, NULL), CM_FREE_PTR(md_buf));
CM_FREE_PTR(md_buf);
return CM_SUCCESS;
}
#define SLEEP_TIME_PER 100
#define MIN_SLEEP_TIME 3000
status_t wait_process(unsigned int wait_timeout_ms)
{
uint32 wait_time = (wait_timeout_ms < MIN_SLEEP_TIME) ? MIN_SLEEP_TIME : wait_timeout_ms;
timespec_t begin = cm_clock_now();
do {
cm_sleep(SLEEP_TIME_PER);
if (md_get_status() == META_NORMAL) {
return CM_SUCCESS;
}
} while (((uint64)(cm_clock_now() - begin)) / MICROSECS_PER_MILLISEC < wait_time);
if (md_get_status() == META_NORMAL) {
return CM_SUCCESS;
}
LOG_DEBUG_ERR("timeout, wait_time=%u ms", wait_time);
return CM_TIMEDOUT;
}
int dcf_add_member(unsigned int stream_id, unsigned int node_id, const char *ip, unsigned int port,
dcf_role_t role, unsigned int wait_timeout_ms)
{
LOG_OPER("dcf add member, stream_id=%u node_id=%u ip=%s port=%u role=%d wait_timeout_ms=%u",
stream_id, node_id, ip, port, role, wait_timeout_ms);
cm_reset_error();
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (node_id == CM_INVALID_NODE_ID || node_id >= CM_MAX_NODE_COUNT) {
LOG_DEBUG_ERR("node_id=%u invalid", node_id);
return CM_ERROR;
}
if (ip == NULL) {
LOG_DEBUG_ERR("ip is null");
return CM_ERROR;
}
if (cm_check_ip_valid(ip) != CM_TRUE) {
LOG_DEBUG_ERR("ip=%s invalid", ip);
return CM_ERROR;
}
if (elc_get_node_role(stream_id) != DCF_ROLE_LEADER) {
LOG_DEBUG_ERR("current node is not leader.");
return CM_ERROR;
}
CM_RETURN_IFERR(md_set_status(META_CATCH_UP));
if (add_member_request(stream_id, node_id, ip, port, role) != CM_SUCCESS) {
CM_RETURN_IFERR(md_set_status(META_NORMAL));
return CM_ERROR;
}
return wait_process(wait_timeout_ms);
}
status_t remove_member_request(unsigned int stream_id, unsigned int node_id)
{
CM_RETURN_IFERR(md_remove_stream_member(stream_id, node_id));
uint32 size;
char *md_buf = (char *)malloc(CM_METADATA_DEF_MAX_LEN);
if (md_buf == NULL) {
LOG_DEBUG_ERR("remove_member_request malloc failed");
return CM_ERROR;
}
CM_RETURN_IFERR_EX(md_to_string(md_buf, CM_METADATA_DEF_MAX_LEN, &size), CM_FREE_PTR(md_buf));
CM_RETURN_IFERR_EX(rep_write(stream_id, md_buf, size, CFG_LOG_KEY(CM_NODE_ID_ALL, OP_FLAG_REMOVE),
ENTRY_TYPE_CONF, NULL), CM_FREE_PTR(md_buf));
CM_FREE_PTR(md_buf);
return CM_SUCCESS;
}
int dcf_remove_member(unsigned int stream_id, unsigned int node_id, unsigned int wait_timeout_ms)
{
LOG_OPER("dcf remove member, stream_id=%u node_id=%u wait_timeout_ms=%u",
stream_id, node_id, wait_timeout_ms);
cm_reset_error();
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (elc_get_node_role(stream_id) != DCF_ROLE_LEADER) {
LOG_DEBUG_ERR("current node is not leader.");
return CM_ERROR;
}
CM_RETURN_IFERR(md_set_status(META_CATCH_UP));
if (remove_member_request(stream_id, node_id) != CM_SUCCESS) {
CM_RETURN_IFERR(md_set_status(META_NORMAL));
LOG_DEBUG_ERR("remove_member fail, stream_id=%u, node_id=%u", stream_id, node_id);
return CM_ERROR;
}
return wait_process(wait_timeout_ms);
}
status_t change_member_request(uint32 stream_id, uint32 node_id, dcf_change_member_t *change_info)
{
CM_RETURN_IFERR(md_change_stream_member(stream_id, node_id, change_info));
uint32 size;
char *md_buf = (char *)malloc(CM_METADATA_DEF_MAX_LEN);
if (md_buf == NULL) {
LOG_DEBUG_ERR("change_member_role_request malloc failed");
return CM_ERROR;
}
CM_RETURN_IFERR_EX(md_to_string(md_buf, CM_METADATA_DEF_MAX_LEN, &size), CM_FREE_PTR(md_buf));
CM_RETURN_IFERR_EX(rep_write(stream_id, md_buf, size, CFG_LOG_KEY(node_id, change_info->op_type),
ENTRY_TYPE_CONF, NULL), CM_FREE_PTR(md_buf));
CM_FREE_PTR(md_buf);
return CM_SUCCESS;
}
status_t decode_change_member_req(mec_message_t *pack, dcf_change_member_t *change_info)
{
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&change_info->op_type));
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&change_info->new_role));
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&change_info->new_group));
CM_RETURN_IFERR(mec_get_int64(pack, (int64*)&change_info->new_priority));
return CM_SUCCESS;
}
status_t encode_change_member_req(mec_message_t *pack, const dcf_change_member_t *change_info)
{
CM_RETURN_IFERR(mec_put_int32(pack, change_info->op_type));
CM_RETURN_IFERR(mec_put_int32(pack, change_info->new_role));
CM_RETURN_IFERR(mec_put_int32(pack, change_info->new_group));
CM_RETURN_IFERR(mec_put_int64(pack, change_info->new_priority));
return CM_SUCCESS;
}
status_t change_member_req(uint32 stream_id, uint32 leader_id, dcf_change_member_t *change_info)
{
mec_message_t pack;
uint32 src_node = md_get_cur_node();
if (mec_alloc_pack(&pack, MEC_CMD_CHANGE_MEMBER_RPC_REQ, src_node, leader_id, stream_id) != CM_SUCCESS) {
LOG_DEBUG_ERR("change_member_req:mec_alloc_pack failed.stream_id=%u,leader_id=%u,src_node=%u",
stream_id, leader_id, src_node);
return CM_ERROR;
}
if (encode_change_member_req(&pack, change_info) != CM_SUCCESS) {
mec_release_pack(&pack);
LOG_DEBUG_ERR("change_member_req, encode fail.");
return CM_ERROR;
}
LOG_DEBUG_INF("send change_member_req: stream=%u,src=%u,leader_id=%u,op_type=%u.",
stream_id, src_node, leader_id, change_info->op_type);
status_t ret = mec_send_data(&pack);
mec_release_pack(&pack);
return ret;
}
status_t leader_change_member_nowait(uint32 stream_id, uint32 node_id, dcf_change_member_t *change_info)
{
if (NEED_CHANGE_ROLE(change_info->op_type) && change_info->new_role == DCF_ROLE_PASSIVE) {
uint32 voter_num = 0;
if (md_get_voter_num(stream_id, &voter_num) != CM_SUCCESS) {
LOG_DEBUG_ERR("get voter_num failed.");
return CM_ERROR;
}
if (voter_num <= CM_LEAST_VOTER) {
LOG_DEBUG_ERR("voter_num=%u is not enough, can't change node=%u to passive.", voter_num, node_id);
return CM_ERROR;
}
}
CM_RETURN_IFERR(md_set_status(META_CATCH_UP));
if (change_member_request(stream_id, node_id, change_info) != CM_SUCCESS) {
LOG_DEBUG_ERR("change node[%u]'s member failed.", node_id);
CM_RETURN_IFERR(md_set_status(META_NORMAL));
return CM_ERROR;
}
LOG_DEBUG_INF("change_member end, node_id=%u, op_type=%u.", node_id, change_info->op_type);
return CM_SUCCESS;
}
status_t change_member_req_proc(mec_message_t *pack)
{
uint32 stream_id = pack->head->stream_id;
uint32 src_node_id = pack->head->src_inst;
LOG_DEBUG_INF("recv change_member_req: stream_id=%u, src_node=%u", stream_id, src_node_id);
if (!I_AM_LEADER(stream_id)) {
LOG_DEBUG_ERR("I'm not leader now, can't change node[%u]'s member.", src_node_id);
return CM_ERROR;
}
dcf_change_member_t change_info;
CM_RETURN_IFERR(decode_change_member_req(pack, &change_info));
if (NEED_CHANGE_ROLE(change_info.op_type) && src_node_id == md_get_cur_node()) {
LOG_DEBUG_ERR("src_node[%u] is leader now, can't change role.", src_node_id);
return CM_ERROR;
}
return leader_change_member_nowait(stream_id, src_node_id, &change_info);
}
status_t leader_change_member_process(uint32 stream_id, uint32 node_id, dcf_change_member_t *change_info,
unsigned int wait_timeout_ms)
{
CM_RETURN_IFERR(leader_change_member_nowait(stream_id, node_id, change_info));
return wait_process(wait_timeout_ms);
}
status_t nonleader_change_member_process(uint32 stream_id, uint32 leader, uint32 node_id,
dcf_change_member_t *change_info, unsigned int wait_timeout_ms)
{
if (md_get_cur_node() != node_id) {
LOG_DEBUG_ERR("nonleader can only change self's member, node_id=%u.", node_id);
return CM_ERROR;
}
CM_RETURN_IFERR(md_set_status(META_CATCH_UP));
CM_RETURN_IFERR_EX(change_member_req(stream_id, leader, change_info), md_set_status(META_NORMAL));
timespec_t begin = cm_clock_now();
uint32 op_type = change_info->op_type;
while (((uint64)(cm_clock_now() - begin)) / MICROSECS_PER_MILLISEC < wait_timeout_ms) {
cm_sleep(CM_SLEEP_10_FIXED);
if ((NEED_CHANGE_ROLE(op_type) && elc_get_node_role(stream_id) != change_info->new_role) ||
(NEED_CHANGE_GROUP(op_type) && elc_get_my_group(stream_id) != change_info->new_group) ||
(NEED_CHANGE_PRIORITY(op_type) && elc_get_my_priority(stream_id) != change_info->new_priority)) {
continue;
}
LOG_DEBUG_INF("change self's member success, op_type=%u", op_type);
return CM_SUCCESS;
}
LOG_DEBUG_ERR("change self's member timeout, wait_time=%u ms", wait_timeout_ms);
CM_RETURN_IFERR(md_set_status(META_NORMAL));
return CM_TIMEDOUT;
}
int dcf_change_member(const char *change_str, unsigned int wait_timeout_ms)
{
uint32 stream_id = CM_INVALID_STREAM_ID;
uint32 node_id = CM_INVALID_NODE_ID;
dcf_change_member_t change_info = {0};
cm_reset_error();
CM_CHECK_NULL_PTR(change_str);
LOG_OPER("dcf change member, change_str=%s wait_timeout_ms=%u", change_str, wait_timeout_ms);
CM_RETURN_IFERR(parse_change_member_str(change_str, &stream_id, &node_id, &change_info));
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (NEED_CHANGE_ROLE(change_info.op_type) &&
(change_info.new_role != DCF_ROLE_FOLLOWER && change_info.new_role != DCF_ROLE_PASSIVE)) {
LOG_DEBUG_ERR("change member's role to (%u) is not support.", change_info.new_role);
return CM_ERROR;
}
dcf_node_t node_info;
CM_RETURN_IFERR(md_get_stream_node_ext(stream_id, node_id, &node_info));
if (NEED_CHANGE_ROLE(change_info.op_type) && node_info.default_role == DCF_ROLE_LOGGER) {
LOG_DEBUG_ERR("change LOGGER's role is not support.");
return CM_ERROR;
}
uint32 leader = elc_get_votefor(stream_id);
if (leader == CM_INVALID_NODE_ID) {
LOG_DEBUG_ERR("leader=%d invalid, can't change member now.", CM_INVALID_NODE_ID);
return CM_ERROR;
}
if (NEED_CHANGE_ROLE(change_info.op_type) && node_id == leader) {
LOG_DEBUG_ERR("node_id (%u) is leader, can't change role.", node_id);
return CM_ERROR;
}
if (I_AM_LEADER(stream_id)) {
LOG_DEBUG_INF("I'm leader, change node[%u]'s member now.", node_id);
return leader_change_member_process(stream_id, node_id, &change_info, wait_timeout_ms);
} else {
LOG_DEBUG_INF("I'm not leader, change node[%u]'s member now.", node_id);
return nonleader_change_member_process(stream_id, leader, node_id, &change_info, wait_timeout_ms);
}
}
#define CHANGE_ROLE_BUFFER_SIZE (uint32)256
int dcf_change_member_role(unsigned int stream_id, unsigned int node_id, dcf_role_t new_role,
unsigned int wait_timeout_ms)
{
char change_str[CHANGE_ROLE_BUFFER_SIZE] = {0};
PRTS_RETURN_IFERR(snprintf_s(change_str, CHANGE_ROLE_BUFFER_SIZE, CHANGE_ROLE_BUFFER_SIZE - 1,
"[{\"stream_id\":%u,\"node_id\":%u,\"role\":\"%s\"}]", stream_id, node_id, md_get_rolename_by_type(new_role)));
return dcf_change_member(change_str, wait_timeout_ms);
}
int dcf_set_applied_index(unsigned int stream_id, unsigned long long index)
{
cm_reset_error();
LOG_OPER("dcf set applied index, stream_id=%u index=%llu", stream_id, index);
if (stream_id == CM_INVALID_STREAM_ID || stream_id >= CM_MAX_STREAM_COUNT) {
return CM_ERROR;
}
return stg_set_applied_index(stream_id, index);
}
int dcf_get_cluster_min_applied_idx(unsigned int stream_id, unsigned long long* index)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (index == NULL) {
LOG_DEBUG_ERR("index is null");
return CM_ERROR;
}
*index = rep_get_cluster_min_apply_idx(stream_id);
return CM_SUCCESS;
}
int dcf_get_leader_last_index(unsigned int stream_id, unsigned long long* index)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (index == NULL) {
LOG_DEBUG_ERR("index is null");
return CM_ERROR;
}
*index = rep_get_leader_last_index(stream_id);
if (*index == CM_INVALID_INDEX_ID) {
return CM_ERROR;
}
return CM_SUCCESS;
}
int dcf_get_last_index(unsigned int stream_id, unsigned long long* index)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (index == NULL) {
LOG_DEBUG_ERR("index is null");
return CM_ERROR;
}
*index = rep_get_last_index(stream_id);
if (*index == CM_INVALID_INDEX_ID) {
return CM_ERROR;
}
return CM_SUCCESS;
}
int dcf_get_node_last_disk_index(unsigned int stream_id, unsigned int node_id, unsigned long long* index)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (index == NULL) {
LOG_DEBUG_ERR("index is null");
return CM_ERROR;
}
CM_RETURN_IFERR(md_check_stream_node_exist(stream_id, node_id));
if (elc_get_node_role(stream_id) != DCF_ROLE_LEADER) {
LOG_DEBUG_ERR("current node is not leader.");
return CM_ERROR;
}
*index = rep_leader_get_match_index(stream_id, node_id).index;
return CM_SUCCESS;
}
#define CHECK_IF_ROLE_CHANGE(stream_id, old_role) \
do { \
dcf_role_t _new_role_ = elc_get_node_role(stream_id); \
if (SECUREC_UNLIKELY(_new_role_ != (old_role))) { \
LOG_DEBUG_ERR("node role changed, old_role=%d, new_role=%d.", old_role, _new_role_); \
return CM_ERROR; \
} \
} while (0)
bool32 log_catch_up(uint32 stream_id, uint32 node_id)
{
uint64 my_idx;
uint64 cmp_idx;
my_idx = stg_last_log_id(stream_id).index;
if (I_AM_LEADER(stream_id)) {
cmp_idx = rep_leader_get_match_index(stream_id, node_id).index;
} else {
cmp_idx = rep_follower_get_leader_last_idx(stream_id);
cmp_idx = MAX(get_block_last_index(stream_id), cmp_idx);
}
return ((my_idx == cmp_idx) && (my_idx != 0));
}
int dcf_promote_leader(unsigned int stream_id, unsigned int node_id, unsigned int wait_timeout_ms)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
LOG_OPER("dcf promote leader, stream_id=%u node_id=%u wait_timeout_ms=%u",
stream_id, node_id, wait_timeout_ms);
dcf_node_t node_info;
CM_RETURN_IFERR(md_get_stream_node_ext(stream_id, node_id, &node_info));
dcf_role_t role = node_info.default_role;
if (role == DCF_ROLE_PASSIVE || role == DCF_ROLE_LOGGER) {
LOG_DEBUG_ERR("promote role (%d) to leader is not support.", role);
return CM_ERROR;
}
uint32 leader = elc_get_votefor(stream_id);
if (leader == CM_INVALID_NODE_ID) {
LOG_DEBUG_ERR("leader=%d invalid, can't promote now.", CM_INVALID_NODE_ID);
return CM_ERROR;
}
if (node_id == leader) {
LOG_DEBUG_INF("node_id (%u) is already leader.", node_id);
return CM_SUCCESS;
}
dcf_role_t original_role = elc_get_node_role(stream_id);
if (original_role != DCF_ROLE_LEADER && md_get_cur_node() != node_id) {
LOG_DEBUG_ERR("follower can only promote self now, node_id=%u.", node_id);
return CM_ERROR;
}
if (wait_timeout_ms == 0) {
LOG_DEBUG_INF("wait_timeout_ms is 0, promote directly.");
return elc_promote_leader(stream_id, node_id);
}
set_block_ack(stream_id, NO_ACK, CM_INVALID_INDEX_ID);
CM_RETURN_IFERR(block_node_req(stream_id, leader, wait_timeout_ms));
timespec_t begin = cm_clock_now();
block_ack_t ack = get_block_ack(stream_id);
while (((uint64)(cm_clock_now() - begin)) / MICROSECS_PER_MILLISEC < wait_timeout_ms) {
if (ack == ERROR_ACK) {
LOG_DEBUG_ERR("recv error ack, ack=%d.", ack);
return CM_ERROR;
} else if (ack == SUCCESS_ACK) {
if (log_catch_up(stream_id, node_id)) {
LOG_DEBUG_INF("catch_up leader, promote now.");
return elc_promote_leader(stream_id, node_id);
}
}
CHECK_IF_ROLE_CHANGE(stream_id, original_role);
cm_sleep(CM_SLEEP_1_FIXED);
ack = get_block_ack(stream_id);
}
LOG_DEBUG_ERR("not yet catchup or block_ack timeout, ack=%d.", ack);
return CM_ERROR;
}
int dcf_timeout_notify(unsigned int stream_id)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
timespec_t now = cm_clock_now();
uint32 elc_times = elc_stream_get_elc_timeout_ms() * MICROSECS_PER_MILLISEC;
if (stream_id != 0) {
return elc_stream_set_timeout(stream_id, now - elc_times);
} else {
uint32 stream_list[CM_MAX_STREAM_COUNT];
uint32 stream_count;
CM_RETURN_IFERR(md_get_stream_list(stream_list, &stream_count));
for (uint32 i = 0; i < stream_count; i++) {
(void)elc_stream_set_timeout(stream_list[i], now - elc_times);
}
return CM_SUCCESS;
}
}
int dcf_node_is_healthy(unsigned int stream_id, dcf_role_t* node_role, unsigned int* is_healthy)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
return elc_node_is_healthy(stream_id, node_role, is_healthy);
}
int dcf_set_work_mode(unsigned int stream_id, dcf_work_mode_t work_mode, unsigned int vote_num)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
LOG_OPER("dcf set work mode, stream_id=%u work_mode=%d vote_num=%u", stream_id, work_mode, vote_num);
return elc_set_work_mode(stream_id, work_mode, vote_num);
}
int dcf_check_if_all_logs_applied(unsigned int stream_id, unsigned int *all_applied)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (all_applied == NULL) {
LOG_DEBUG_ERR("all_applied pointer is null");
return CM_ERROR;
}
CM_RETURN_IFERR(md_check_stream_node_exist(stream_id, md_get_cur_node()));
*all_applied = rep_get_can_write_flag(stream_id);
LOG_DEBUG_INF("all_applied_flag=%u", *all_applied);
return CM_SUCCESS;
}
Internal Invocation
*/
void dcf_set_exception(unsigned int stream_id, dcf_exception_t exception)
{
g_dcf_exception.stream_id = stream_id;
g_dcf_exception.exception = exception;
}
int dcf_send_msg(unsigned int stream_id, unsigned int dest_node_id, const char* msg, unsigned int msg_size)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (dest_node_id == CM_INVALID_NODE_ID || dest_node_id >= CM_MAX_NODE_COUNT) {
LOG_DEBUG_ERR("The msg parameter from dest_node_id is invalid.");
return CM_ERROR;
}
if (msg == NULL || msg_size == 0) {
LOG_DEBUG_ERR("The msg parameter from dcf_send_msg is invalid.");
return CM_ERROR;
}
if (msg_size > SIZE_K(512)) {
LOG_DEBUG_ERR("The size of msg exceed 512K.");
return CM_ERROR;
}
uint32 src_node_id = md_get_cur_node();
mec_message_t pack;
CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_SEND_COMMON_MSG, src_node_id, dest_node_id, stream_id));
if (mec_put_bin(&pack, msg_size, msg) != CM_SUCCESS) {
LOG_DEBUG_ERR("Put msg into pack failed.");
mec_release_pack(&pack);
return CM_ERROR;
}
status_t status = mec_send_data(&pack);
mec_release_pack(&pack);
return status;
}
status_t get_stream_node_list(uint32 stream_id, uint64* inst_bits)
{
uint32 node_list[CM_MAX_NODE_COUNT];
uint32 node_count;
CM_RETURN_IFERR(md_get_stream_nodes(stream_id, node_list, &node_count));
uint32 current_node_id = md_get_cur_node();
for (uint32 i = 0; i < node_count; i++) {
uint32 id = node_list[i];
if (id != current_node_id) {
MEC_SET_BRD_INST(inst_bits, id);
}
}
return CM_SUCCESS;
}
int dcf_broadcast_msg(unsigned int stream_id, const char* msg, unsigned int msg_size)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (msg == NULL || msg_size == 0) {
LOG_DEBUG_ERR("The msg parameter from dcf_send_msg is invalid.");
return CM_ERROR;
}
if (msg_size > SIZE_K(512)) {
LOG_DEBUG_ERR("The size of msg exceed 512K.");
return CM_ERROR;
}
uint64 inst_bits[INSTS_BIT_SZ] = {0};
uint64 success_inst[INSTS_BIT_SZ];
CM_RETURN_IFERR(get_stream_node_list(stream_id, inst_bits));
uint32 src_node_id = md_get_cur_node();
mec_message_t pack;
CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_SEND_COMMON_MSG, src_node_id, CM_INVALID_NODE_ID, stream_id));
if (mec_put_bin(&pack, msg_size, msg) != CM_SUCCESS) {
LOG_DEBUG_ERR("Put msg into pack failed.");
mec_release_pack(&pack);
return CM_ERROR;
}
mec_broadcast(stream_id, inst_bits, &pack, success_inst);
LOG_DEBUG_INF("Send msg broadcast, local node_id=%d, stream_id=%u", src_node_id, stream_id);
mec_release_pack(&pack);
return CM_SUCCESS;
}
Suspend replication log for this node
*/
int dcf_pause_rep(unsigned int stream_id, unsigned int node_id, unsigned int time_us)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (node_id == CM_INVALID_NODE_ID || node_id >= CM_MAX_NODE_COUNT) {
LOG_DEBUG_ERR("The msg parameter from node_id is invalid.");
return CM_ERROR;
}
LOG_OPER("dcf set pausing time for replication, stream_id=%u node_id=%d time_us=%u", stream_id, node_id, time_us);
if (time_us != DCF_MAX_PAUSE_TIME && time_us > DCF_MAX_NORMAL_USE_PAUSE_TIME) {
LOG_DEBUG_ERR("time_us %u is greater than 1000000.", time_us);
return CM_ERROR;
}
if (!I_AM_LEADER(stream_id)) {
return CM_ERROR;
}
rep_set_pause_time(stream_id, node_id, time_us);
return CM_SUCCESS;
}
int dcf_demote_follower(unsigned int stream_id)
{
cm_reset_error();
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
LOG_OPER("dcf demote follower, stream_id=%u", stream_id);
return elc_demote_follower(stream_id);
}
#define UNIVERSAL_WAIT_TIME_OUT 3000
static uint32 alloc_req_id(dcf_parallel_msg_t* parallel_msg)
{
uint32 req_id = MAX_PARALLEL_MAX_NUM;
cm_spin_lock(¶llel_msg->lock, NULL);
for (int i = 0; i < MAX_PARALLEL_MAX_NUM; i++) {
uint32 cur_id = parallel_msg->cursor;
parallel_msg->cursor = (cur_id + 1) % MAX_PARALLEL_MAX_NUM;
if (parallel_msg->req_id_status[cur_id] == 0) {
parallel_msg->req_id_status[cur_id] = 1;
req_id = cur_id;
break;
}
}
cm_spin_unlock(¶llel_msg->lock);
return req_id;
}
static inline void free_req_id(dcf_parallel_msg_t* parallel_msg, uint32 req_id)
{
cm_spin_lock(¶llel_msg->lock, NULL);
parallel_msg->req_id_status[req_id] = 0;
cm_spin_unlock(¶llel_msg->lock);
}
static inline uint64 get_commit_index_result(uint32 req_id)
{
return (uint64)cm_atomic_get(&g_consensus_hc_info.ack_result[req_id]);
}
static inline void set_commit_index_result(uint32 req_id, uint64 result)
{
(void)cm_atomic_set(&g_consensus_hc_info.ack_result[req_id], result);
}
status_t send_fetching_commit_index_remote_req(unsigned int stream_id, uint32 is_consensus, uint32 req_id)
{
uint32 leader_node = elc_get_votefor(stream_id);
if (leader_node == CM_INVALID_NODE_ID) {
LOG_DEBUG_ERR("[DCF]leader=%d invalid, can't send consensus msg req now.", CM_INVALID_NODE_ID);
return CM_ERROR;
}
uint32 src_node = md_get_cur_node();
mec_message_t pack;
CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_GET_COMMIT_INDEX_REQ, src_node, leader_node, stream_id));
if (mec_put_int32(&pack, req_id) != CM_SUCCESS || mec_put_int32(&pack, is_consensus) != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]consensus health-check encode failed, src_node=%u, leader=%u.", src_node, leader_node);
mec_release_pack(&pack);
return CM_ERROR;
}
status_t ret = mec_send_data(&pack);
mec_release_pack(&pack);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]send consensus health-check req failed, src_node=%u, leader=%u.", src_node, leader_node);
}
return ret;
}
static inline uint64 get_universal_write_index(uint32 req_id)
{
return (uint64)cm_atomic_get(&g_universal_write_info.ack_result[req_id]);
}
static inline void set_universal_write_index(uint32 req_id, uint64 index)
{
(void)cm_atomic_set(&g_universal_write_info.ack_result[req_id], index);
}
status_t send_universal_write_req(unsigned int stream_id, const char *buffer, unsigned int length,
unsigned long long key, uint32 req_id)
{
uint32 leader = elc_get_votefor(stream_id);
if (leader == CM_INVALID_NODE_ID) {
LOG_DEBUG_ERR("[DCF]leader=%d invalid, can't send universal write req now.", CM_INVALID_NODE_ID);
return CM_ERROR;
}
uint32 src_node = md_get_cur_node();
mec_message_t pack;
CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_UNIVERSAL_WRITE_REQ, src_node, leader, stream_id));
if (mec_put_int32(&pack, req_id) != CM_SUCCESS
|| mec_put_int64(&pack, key) != CM_SUCCESS
|| mec_put_bin(&pack, length, buffer) != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]universal write encode failed, src_node=%u, leader=%u.", src_node, leader);
mec_release_pack(&pack);
return CM_ERROR;
}
status_t ret = mec_send_data(&pack);
mec_release_pack(&pack);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]send universal write req failed, src_node=%u, leader=%u.", src_node, leader);
}
return ret;
}
int dcf_universal_write(unsigned int stream_id, const char *buffer, unsigned int length,
unsigned long long key, unsigned long long *index)
{
cm_reset_error();
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (buffer == NULL || length == 0) {
LOG_DEBUG_ERR("[DCF]buffer(%p) or length(%d) error.", buffer, length);
return CM_ERROR;
}
if (I_AM_LEADER(stream_id)) {
LOG_DEBUG_INF("[DCF]I am leader, write directly.");
return dcf_write(stream_id, buffer, length, key, index);
}
uint32 req_id = alloc_req_id(&g_universal_write_info);
if (req_id >= MAX_PARALLEL_MAX_NUM) {
LOG_DEBUG_ERR("[DCF]req_id=%d invalid, try later.", req_id);
return CM_ERROR;
}
uint64 last_write_index = 0;
set_universal_write_index(req_id, last_write_index);
CM_RETURN_IFERR_EX(send_universal_write_req(stream_id, buffer, length, key, req_id),
free_req_id(&g_universal_write_info, req_id));
timespec_t begin = cm_clock_now();
uint64 ack_index;
while (((uint64)(cm_clock_now() - begin)) / MICROSECS_PER_MILLISEC < UNIVERSAL_WAIT_TIME_OUT) {
ack_index = get_universal_write_index(req_id);
if (ack_index > last_write_index) {
LOG_DEBUG_INF("[DCF]universal write succeed, stream_id=%u buflength=%u key=%llu index=%llu req_id=%u",
stream_id, length, key, ack_index, req_id);
free_req_id(&g_universal_write_info, req_id);
if (index != NULL) {
*index = ack_index;
}
return CM_SUCCESS;
}
(void)cm_event_timedwait(&g_universal_write_info.event[req_id], CM_SLEEP_1_FIXED);
}
LOG_DEBUG_ERR("[DCF]universal write timeout %u ms, stream_id=%u buflength=%u key=%llu index=%llu req_id=%u",
UNIVERSAL_WAIT_TIME_OUT, stream_id, length, key, ack_index, req_id);
free_req_id(&g_universal_write_info, req_id);
return CM_TIMEDOUT;
}
status_t universal_write_req_proc(mec_message_t *pack)
{
uint32 stream_id = pack->head->stream_id;
uint32 src_node = pack->head->src_inst;
LOG_DEBUG_INF("[DCF]recv universal_write_req: stream_id=%u, src_node=%u", stream_id, src_node);
uint32 req_id;
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&req_id));
uint64 key;
CM_RETURN_IFERR(mec_get_int64(pack, (int64*)&key));
char *buf = NULL;
uint32 len = 0;
CM_RETURN_IFERR(mec_get_bin(pack, &len, (void **)&buf));
uint64 write_index;
if (dcf_write(stream_id, buf, len, key, &write_index) != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]recv universal_write_req: dcf write failed.");
return CM_ERROR;
}
mec_message_t ack_pack;
CM_RETURN_IFERR(mec_alloc_pack(&ack_pack, MEC_CMD_UNIVERSAL_WRITE_ACK, md_get_cur_node(), src_node, stream_id));
if (mec_put_int32(&ack_pack, req_id) != CM_SUCCESS
|| mec_put_int64(&ack_pack, write_index) != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]recv universal_write_req: encode failed,write_index=%llu.", write_index);
mec_release_pack(&ack_pack);
return CM_ERROR;
}
status_t ret = mec_send_data(&ack_pack);
mec_release_pack(&ack_pack);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]recv universal_write_req: send data failed.");
}
return ret;
}
status_t universal_write_ack_proc(mec_message_t *pack)
{
uint32 stream_id = pack->head->stream_id;
uint32 req_id;
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&req_id));
uint64 ack_write_index;
CM_RETURN_IFERR(mec_get_int64(pack, (int64*)&ack_write_index));
LOG_DEBUG_INF("[DCF]recv universal_write_ack: stream_id=%u, req_id=%u, writeindex=%llu.",
stream_id, req_id, ack_write_index);
if (req_id < MAX_PARALLEL_MAX_NUM) {
set_universal_write_index(req_id, ack_write_index);
cm_event_notify(&g_universal_write_info.event[req_id]);
} else {
LOG_DEBUG_ERR("[DCF]recv universal_write_ack: req_id=%u error, writeindex=%llu.", req_id, ack_write_index);
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t dcf_get_commit_index_local(unsigned int stream_id, unsigned int is_consensus,
unsigned long long* commit_index)
{
dcf_role_t node_role = DCF_ROLE_UNKNOWN;
unsigned int is_healthy = CM_FALSE;
if (is_consensus) {
if (dcf_node_is_healthy(stream_id, &node_role, &is_healthy) != CM_SUCCESS || !is_healthy) {
LOG_DEBUG_ERR("[DCF]Health check failed or the node is not healthy for geting commit index.");
return CM_ERROR;
}
}
*commit_index = rep_get_data_commit_index(stream_id);
LOG_DEBUG_INF("[DCF] dcf get local data commit index:%llu", *commit_index);
return CM_SUCCESS;
}
status_t dcf_get_commit_index_req_proc(mec_message_t *pack)
{
uint32 src_node = pack->head->src_inst;
uint32 stream_id = pack->head->stream_id;
uint64 commit_index = 0;
LOG_DEBUG_INF("[DCF]Recv dcf_health_check_req: stream_id=%u, src_node=%u", stream_id, src_node);
uint32 req_id;
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&req_id));
uint32 is_consensus = 0;
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&is_consensus));
if (I_AM_LEADER(stream_id)) {
if (dcf_get_commit_index_local(stream_id, is_consensus, &commit_index) != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]Recv get_commit_index_req: check health failed, so send commit_index(0) to remote.");
}
} else {
LOG_DEBUG_ERR("[DCF]Recv get_commit_index_req: it's not leader now.");
}
mec_message_t ack_pack;
CM_RETURN_IFERR(mec_alloc_pack(&ack_pack, MEC_CMD_GET_COMMIT_INDEX_ACK, md_get_cur_node(), src_node, stream_id));
if (mec_put_int32(&ack_pack, req_id) != CM_SUCCESS
|| mec_put_int64(&ack_pack, commit_index) != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]Recv get_commit_index_req: encode failed, commit_index=%llu.", commit_index);
mec_release_pack(&ack_pack);
return CM_ERROR;
}
status_t ret = mec_send_data(&ack_pack);
mec_release_pack(&ack_pack);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]Recv get_commit_index_req: send data failed.");
}
return ret;
}
status_t dcf_get_commit_index_ack_proc(mec_message_t *pack)
{
uint32 stream_id = pack->head->stream_id;
uint32 req_id;
CM_RETURN_IFERR(mec_get_int32(pack, (int32*)&req_id));
uint64 commit_index;
CM_RETURN_IFERR(mec_get_int64(pack, (int64*)&commit_index));
LOG_DEBUG_INF("[DCF]Recv get_commit_index_ack: stream_id=%u, req_id=%u, commit_index=%llu.",
stream_id, req_id, commit_index);
if (req_id < MAX_PARALLEL_MAX_NUM) {
set_commit_index_result(req_id, commit_index);
cm_event_notify(&g_consensus_hc_info.event[req_id]);
} else {
LOG_DEBUG_ERR("[DCF]recv get_commit_index_ack: req_id=%u error, commit_index=%llu.", req_id, commit_index);
return CM_ERROR;
}
return CM_SUCCESS;
}
static int dcf_get_commit_index_remote(unsigned int stream_id, unsigned int is_consensus,
unsigned long long* commit_index)
{
uint32 req_id = alloc_req_id(&g_consensus_hc_info);
if (req_id >= MAX_PARALLEL_MAX_NUM) {
LOG_DEBUG_ERR("[DCF]Check health using req_id=%d invalid, try later.", req_id);
return CM_ERROR;
}
int64 index_default = -1;
set_commit_index_result(req_id, index_default);
CM_RETURN_IFERR_EX(send_fetching_commit_index_remote_req(stream_id, is_consensus, req_id),
free_req_id(&g_consensus_hc_info, req_id));
timespec_t begin = cm_clock_now();
uint64 result;
while (((uint64)(cm_clock_now() - begin)) / MICROSECS_PER_MILLISEC < UNIVERSAL_WAIT_TIME_OUT) {
result = get_commit_index_result(req_id);
if ((int64)result != index_default) {
free_req_id(&g_consensus_hc_info, req_id);
LOG_DEBUG_INF("[DCF]leader check health succeed, stream_id=%u result=%llu req_id=%u",
stream_id, result, req_id);
*commit_index = result;
return CM_SUCCESS;
}
(void)cm_event_timedwait(&g_consensus_hc_info.event[req_id], CM_SLEEP_1_FIXED);
}
LOG_DEBUG_ERR("[DCF]Waiting health-check timeout %u ms, stream_id=%u req_id=%u",
UNIVERSAL_WAIT_TIME_OUT, stream_id, req_id);
free_req_id(&g_consensus_hc_info, req_id);
return CM_TIMEDOUT;
}
int dcf_get_data_commit_index(unsigned int stream_id, dcf_commit_index_type_t index_type, unsigned long long* index)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (index == NULL) {
LOG_DEBUG_ERR("[DCF]The parameter index is null for geting commit index.");
return CM_ERROR;
}
unsigned int is_consensus = CM_FALSE;
if (index_type == DCF_LOCAL_COMMIT_INDEX) {
return dcf_get_commit_index_local(stream_id, CM_FALSE, index);
} else if (index_type == DCF_LEADER_COMMIT_INDEX) {
is_consensus = CM_FALSE;
} else if (index_type == DCF_CONSENSUS_COMMIT_INDEX) {
is_consensus = CM_TRUE;
} else {
LOG_DEBUG_ERR("[DCF]Getting commit index for index_type=%d is over range.", index_type);
return CM_ERROR;
}
if (I_AM_LEADER(stream_id)) {
LOG_DEBUG_INF("[DCF]I am leader, and get commit index directly.");
return dcf_get_commit_index_local(stream_id, is_consensus, index);
} else {
int ret = dcf_get_commit_index_remote(stream_id, is_consensus, index);
if (ret != CM_SUCCESS) {
LOG_DEBUG_ERR("[DCF]It's failed or timeout from leader for geting commit index.");
return ret;
}
}
return CM_SUCCESS;
}
int dcf_get_current_term_and_role(unsigned int stream_id, unsigned long long* term, dcf_role_t* role)
{
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
if (term == NULL || role == NULL) {
LOG_DEBUG_ERR("[DCF]term(%p) or role(%p) error.", term, role);
return CM_ERROR;
}
return elc_get_current_term_and_role(stream_id, term, role);
}
int dcf_set_election_priority(unsigned int stream_id, unsigned long long priority)
{
static date_t last[CM_MAX_STREAM_COUNT] = {0};
if (stream_id >= CM_MAX_STREAM_COUNT) {
LOG_DEBUG_ERR("[DCF]stream_id=%u invalid", stream_id);
return CM_ERROR;
}
date_t now = g_timer()->now;
if (now - last[stream_id] < MICROSECS_PER_SECOND) {
LOG_DEBUG_INF("[DCF]interval too small.stream_id=%u priority=%llu can't set this time.", stream_id, priority);
return CM_SUCCESS;
}
last[stream_id] = now;
if (!elc_stream_get_auto_elc_pri_en()) {
LOG_DEBUG_INF("[DCF]auto_priority disabled.stream_id=%u priority=%llu can't set.", stream_id, priority);
return CM_SUCCESS;
}
CM_RETURN_IF_FALSE(check_if_node_inited(stream_id));
elc_set_my_priority(stream_id, priority);
return CM_SUCCESS;
}
#ifdef WIN32
const char *dcf_get_version()
{
return "NONE";
}
#else
extern const char *dcf_get_version();
#endif
void dcf_set_timer(void *timer)
{
cm_set_timer((gs_timer_t *)timer);
}
int dcf_is_diff_endian(unsigned int stream_id, unsigned int dest_node_id)
{
cm_reset_error();
return (int)mec_is_diff_endian(stream_id, dest_node_id);
}
int dcf_get_conn_status(unsigned int stream_id, unsigned int dest_node_id)
{
uint32 count;
uint32 node_list[CM_MAX_NODE_COUNT];
uint32 node_id, dest_node;
cm_reset_error();
if (md_get_cur_node() == dest_node_id) {
return CM_SUCCESS;
}
CM_RETURN_IFERR(md_get_stream_nodes(stream_id, node_list, &count));
dest_node = dest_node_id;
for (uint32 i = 0; i < count; i++) {
node_id = node_list[i];
if (node_id == dest_node) {
status_t ret = mec_check_one_connect(node_id) ? CM_SUCCESS : CM_ERROR;
LOG_DEBUG_INF("check connect to dest node: %u, ret:%d", dest_node, ret);
return ret;
}
}
return CM_ERROR;
}
#ifdef __cplusplus
}
#endif