/*
 * Copyright (c) 2021 Huawei Technologies Co.,Ltd.
 *
 * openGauss is licensed under Mulan PSL v2.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *
 *          http://license.coscl.org.cn/MulanPSL2
 *
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 * -------------------------------------------------------------------------
 *
 * elc_msg_proc.c
 *    election process
 *
 * IDENTIFICATION
 *    src/election/elc_msg_proc.c
 *
 * -------------------------------------------------------------------------
 */

#include "elc_msg_proc.h"
#include "elc_stream.h"
#include "stg_manager.h"
#include "replication.h"
#include "cm_timer.h"
#include "util_defs.h"
#include "elc_status_check.h"

#ifdef __cplusplus
extern "C" {
#endif

#define WM_NORMAL_2_MINORITY_CANDIDATE_TERM_INC 1000

static status_t elc_set_candidate_term(uint32 stream_id, uint32 node_id, uint64 current_term);

status_t proc_node_voting(uint32 stream_id, uint32 src_node_id)
{
    uint32 voting_weight;
    bool32 is_voter = CM_FALSE;
    CM_RETURN_IFERR(md_is_voter(stream_id, src_node_id, &is_voter));
    if (is_voter) {
        CM_RETURN_IFERR(elc_get_voting_weight(stream_id, src_node_id, &voting_weight));
        CM_RETURN_IFERR(elc_stream_increase_vote_count(stream_id, voting_weight));
    }

    LOG_RUN_INF("[ELC]get vote from stream_id=%u, node_id=%u, term=%llu, vote_count=%u", stream_id, src_node_id,
        elc_stream_get_current_term(stream_id), elc_stream_get_vote_count(stream_id));
    return CM_SUCCESS;
}

status_t proc_node_voting_no(uint32 stream_id, uint32 src_node_id)
{
    uint32 voting_weight;
    bool32 is_voter = CM_FALSE;
    CM_RETURN_IFERR(md_is_voter(stream_id, src_node_id, &is_voter));
    if (is_voter) {
        CM_RETURN_IFERR(elc_get_voting_weight(stream_id, src_node_id, &voting_weight));
        CM_RETURN_IFERR(elc_stream_increase_vote_no_count(stream_id, voting_weight));
    }
    LOG_RUN_INF("[ELC]get vote no from stream_id=%u, node_id=%u, term=%llu, vote_no_count=%u", stream_id, src_node_id,
        elc_stream_get_current_term(stream_id), elc_stream_get_vote_no_count(stream_id));
    return CM_SUCCESS;
}

bool32 is_win(uint32 stream_id)
{
    bool32 is_win;

    if (elc_stream_is_win(stream_id, &is_win) != CM_SUCCESS) {
        return CM_FALSE;
    }

    return is_win;
}

bool32 is_not_win(uint32 stream_id)
{
    bool32 is_not_win = CM_FALSE;
    if (elc_stream_is_not_win(stream_id, &is_not_win) != CM_SUCCESS) {
        return CM_FALSE;
    }
    return is_not_win;
}

status_t elc_vote_req(uint32 stream_id, uint32 vote_flag)
{
    dcf_work_mode_t work_mode = elc_stream_get_work_mode(stream_id);
    uint32 node_id = elc_stream_get_current_node();
    uint64 current_term = elc_stream_get_current_term(stream_id);

    CM_RETURN_IFERR(elc_stream_set_votefor(stream_id, node_id));
    LOG_DEBUG_INF("[ELC]Set votefor as self when elc vote req, stream_id=%u, votefor=%u", stream_id, node_id);

    elc_stream_reset_vote_count(stream_id);
    CM_RETURN_IFERR(proc_node_voting(stream_id, node_id));

    if (is_win(stream_id))  {
        CM_RETURN_IFERR(elc_set_candidate_term(stream_id, node_id, current_term));
        LOG_RUN_INF("[ELC]set self as leader after voting for self, stream_id=%u, work_mode=%d", stream_id, work_mode);
        CM_RETURN_IFERR(elc_stream_set_role(stream_id, DCF_ROLE_LEADER));
        return CM_SUCCESS;
    }

    mec_message_t pack;
    elc_vote_t req_vote;
    req_vote.candidate_id = node_id;
    req_vote.candidate_term = current_term;
    req_vote.last_log = stg_last_log_id(stream_id);
    req_vote.vote_flag = vote_flag;
    req_vote.work_mode = elc_stream_get_work_mode(stream_id);

    CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_VOTE_REQUEST_RPC_REQ, node_id, CM_INVALID_NODE_ID, stream_id));
    if (elc_encode_vote_req(&pack, &req_vote) != CM_SUCCESS) {
        mec_release_pack(&pack);
        LOG_DEBUG_ERR("[ELC]Encode vote request failed");
        return CM_ERROR;
    }

    uint64 inst_bits[INSTS_BIT_SZ] = {0};
    uint64 success_inst[INSTS_BIT_SZ];
    if (elc_stream_vote_node_list(stream_id, inst_bits) != CM_SUCCESS) {
        mec_release_pack(&pack);
        LOG_DEBUG_ERR("[ELC]Prepare vote node list failed");
        return CM_ERROR;
    }
    mec_broadcast(stream_id, inst_bits, &pack, success_inst);
    LOG_RUN_INF("[ELC]elc vote req broadcast, stream_id=%u candidate_id=%u candidate_term=%llu last_log.term=%llu "
        "last_log.index=%llu vote_flag=0x%x work_mode=%d", stream_id, req_vote.candidate_id, req_vote.candidate_term,
        req_vote.last_log.term, req_vote.last_log.index, req_vote.vote_flag, req_vote.work_mode);
    mec_release_pack(&pack);
    return CM_SUCCESS;
}

bool32 elc_check_last_log(log_id_t* log_a, log_id_t* log_b)
{
    if (log_id_cmp(log_a, log_b) >= 0) {
        return CM_TRUE;
    }
    return CM_FALSE;
}

static status_t elc_judge_vote_postproc(uint32 stream_id, uint32 src_node, const elc_vote_t* req_vote,
    bool32 vote_granted)
{
    dcf_role_t role = elc_stream_get_role(stream_id);
    uint64 current_term = elc_stream_get_current_term(stream_id);
    uint64 candidate_term = req_vote->candidate_term;

    if (vote_granted == CM_TRUE) {
        if (role != DCF_ROLE_LOGGER && role != DCF_ROLE_PASSIVE) {
            CM_RETURN_IFERR(elc_stream_set_role(stream_id, DCF_ROLE_FOLLOWER));
        }
        CM_RETURN_IFERR(elc_stream_set_votefor(stream_id, src_node));
        LOG_DEBUG_INF("[ELC]Set votefor when judge vote postproc, stream_id=%u, votefor=%u", stream_id, src_node);
        CM_RETURN_IFERR(elc_stream_set_term(stream_id, candidate_term));
        CM_RETURN_IFERR(elc_stream_set_timeout(stream_id, cm_clock_now()));
    } else if (current_term < candidate_term) {
        CM_RETURN_IFERR(elc_stream_set_term(stream_id, candidate_term));
        if (role != DCF_ROLE_LOGGER && role != DCF_ROLE_PASSIVE) {
            CM_RETURN_IFERR(elc_stream_set_role(stream_id, DCF_ROLE_FOLLOWER));
        }
        CM_RETURN_IFERR(elc_stream_set_votefor(stream_id, CM_INVALID_NODE_ID));
        LOG_DEBUG_INF("[ELC]Set votefor as invalid nodeid when judge vote postproc,"
            " current_term=%llu, candidate_term=%llu", current_term, candidate_term);
    }
    return CM_SUCCESS;
}

bool32 elc_need_demote_follow(uint32 stream_id, timespec_t now, uint32 elc_timeout_cnt)
{
    uint32 weight;
    uint32 elc_timeout = elc_stream_get_elc_timeout_ms();
    uint32 hb_ack_timeout_num = 0;
    uint32 voter_num = 0;
    dcf_node_role_t node_list[CM_MAX_NODE_COUNT];
    uint32 node_count;
    uint32 local_node_id = md_get_cur_node(stream_id);
    dcf_role_t default_role;
    if (elc_stream_get_work_mode(stream_id) != WM_NORMAL) {
        return CM_FALSE;
    }
    if (md_get_stream_node_roles(stream_id, node_list, &node_count) != CM_SUCCESS ||
        md_get_voter_num(stream_id, &voter_num) != CM_SUCCESS) {
        return CM_FALSE;
    }
    for (uint32 i = 0; i < node_count; i++) {
        uint32 node_id = node_list[i].node_id;
        default_role  = node_list[i].default_role;
        if (node_id == local_node_id || default_role == DCF_ROLE_PASSIVE) {
            continue;
        }
        uint64 hb_ack_time = elc_stream_get_hb_ack_time(stream_id, node_id);
        uint64 interval = (now < hb_ack_time) ? 0 : (uint64)(now - hb_ack_time);
        if (interval / MICROSECS_PER_MILLISEC > elc_timeout * elc_timeout_cnt) {
            LOG_DEBUG_WAR("[ELC]recv heartbeat ack timout from node_id=%u\n", node_id);
            CM_RETURN_IFERR(elc_get_voting_weight(stream_id, node_id, &weight));
            hb_ack_timeout_num += weight;
        }
        if (hb_ack_timeout_num >= ((voter_num + 1) / CM_2X_FIXED)) {
            LOG_DEBUG_INF("[ELC]Leader need demote follow, local_node_id:%u hb_ack_timeout_num:%u voter_num:%u",
                local_node_id, hb_ack_timeout_num, voter_num);
            return CM_TRUE;
        }
    }
    return CM_FALSE;
}

static bool32 elc_need_judge_vote(uint32 stream_id, uint32 src_node, const elc_vote_t* req_vote)
{
    uint64 current_term = elc_stream_get_current_term(stream_id);
    uint32 votefor = elc_stream_get_votefor(stream_id);
    timespec_t now = cm_clock_now();

    if (ELC_FORCE_VOTE(req_vote->vote_flag)) {
        return CM_TRUE;
    }

    if (votefor == CM_INVALID_NODE_ID || votefor == src_node) {
        return CM_TRUE;
    }

    if (current_term == req_vote->candidate_term && elc_stream_get_role(stream_id) == DCF_ROLE_LEADER &&
        !elc_need_demote_follow(stream_id, now, CM_1X_FIXED)) {
        LOG_RUN_INF("[ELC] leader no need to judge vote from src_node:%u, current_term=%llu", src_node, current_term);
        return CM_FALSE;
    }

    uint64 candidate_term = req_vote->candidate_term;
    if (ELC_PRE_VOTE(req_vote->vote_flag)) {
        candidate_term = candidate_term + 1;
    }

    if ((current_term == candidate_term || current_term == req_vote->candidate_term) &&
        elc_stream_get_role(stream_id) != DCF_ROLE_PRE_CANDIDATE) {
        timespec_t last_hb_time = elc_stream_get_timeout(stream_id);
        uint64 interval_time = ((uint64)(cm_clock_now() - last_hb_time)) / MICROSECS_PER_MILLISEC;
        if (interval_time <  elc_stream_get_elc_timeout_ms()) {
            LOG_RUN_INF("[ELC]not timeout yet, votefor=%u current_term=%llu candidate_term=%llu req_vote_flag=%u",
                elc_stream_get_votefor(stream_id), current_term, candidate_term, req_vote->vote_flag);
            return CM_FALSE;
        }
    }

    return CM_TRUE;
}

status_t elc_judge_vote(uint32 stream_id, uint32 src_node, elc_vote_t* req_vote, bool32* vote_granted)
{
    dcf_role_t role = elc_stream_get_role(stream_id);
    uint64 current_term = elc_stream_get_current_term(stream_id);
    dcf_work_mode_t work_mode = elc_stream_get_work_mode(stream_id);
    if (work_mode != req_vote->work_mode || role == DCF_ROLE_PASSIVE) {
        LOG_DEBUG_INF("[ELC]Not granted to node:%u, since judge as invalid vote, candidate_id=%u work_mode=%d, "
            "local node work_mode=%d role=%d", src_node, req_vote->candidate_id, req_vote->work_mode, work_mode, role);
        return CM_SUCCESS;
    }

    dcf_node_t node_info;
    CM_RETURN_IFERR(md_get_stream_node_ext(stream_id, src_node, &node_info));
    if (node_info.default_role == DCF_ROLE_PASSIVE) {
        LOG_DEBUG_INF("[ELC]Not granted to node:%u, since vote from passive role", src_node);
        return CM_SUCCESS;
    }

    uint64 candidate_term = req_vote->candidate_term;
    if (ELC_PRE_VOTE(req_vote->vote_flag)) {
        candidate_term = candidate_term + 1;
    }

    if (!elc_need_judge_vote(stream_id, src_node, req_vote)) {
        return CM_SUCCESS;
    }

    if (current_term > candidate_term) {
        *vote_granted = CM_FALSE;
        LOG_DEBUG_INF("[ELC]Not granted to node:%u, current_term(%llu) > candidate_term(%llu)", src_node, current_term,
            candidate_term);
        return CM_SUCCESS;
    }

    log_id_t last_log = stg_last_log_id(stream_id);
    if (elc_check_last_log(&req_vote->last_log, &last_log)) {
        *vote_granted = CM_TRUE;
    }
    LOG_DEBUG_INF("[ELC]node:%u,req_last_log term=%llu index=%llu, local_last_log.term=%llu,index=%llu,granted=%u",
        src_node, req_vote->last_log.term, req_vote->last_log.index, last_log.term, last_log.index, *vote_granted);

    if (ELC_PRE_VOTE(req_vote->vote_flag)) {
        LOG_DEBUG_INF("[ELC]elc judge vote return since it's pre vote(vote_flag=0x%x)", req_vote->vote_flag);
        return CM_SUCCESS;
    }

    CM_RETURN_IFERR(elc_judge_vote_postproc(stream_id, src_node, req_vote, *vote_granted));

    return CM_SUCCESS;
}

status_t elc_promote_req(uint32 stream_id, uint32 node_id)
{
    uint32 src_node_id = elc_stream_get_current_node();
    mec_message_t pack;
    elc_hb_t req_vote;
    req_vote.term = elc_stream_get_current_term(stream_id);
    req_vote.send_time = cm_clock_now();
    CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_PROMOTE_LEADER_RPC_REQ, src_node_id, node_id, stream_id));
    if (elc_encode_hb_req(&pack, &req_vote) != CM_SUCCESS) {
        mec_release_pack(&pack);
        LOG_DEBUG_ERR("[ELC]encode failed, when send promote message");
        return CM_ERROR;
    }
    status_t status = mec_send_data(&pack);
    mec_release_pack(&pack);
    return status;
}

status_t elc_promote_proc(mec_message_t *pack)
{
    uint32 stream_id = pack->head->stream_id;
    uint32 src_node_id = pack->head->src_inst;
    LOG_DEBUG_INF("[ELC]Receive promote message from stream_id=%u, node_id=%u", stream_id, src_node_id);

    elc_hb_t ack_vote;
    CM_RETURN_IFERR(elc_decode_hb_req(pack, &ack_vote));

    elc_stream_lock_x(stream_id);
    uint64 current_term = elc_stream_get_current_term(stream_id);
    if (ack_vote.term < current_term) {
        LOG_DEBUG_INF("[ELC]term has changed, ignore this message, stream_id=%u, src_node_id=%u",
            stream_id, src_node_id);
        elc_stream_unlock(stream_id);
        return CM_SUCCESS;
    }
    dcf_role_t role = elc_stream_get_role(stream_id);
    if (role == DCF_ROLE_PASSIVE || role == DCF_ROLE_LOGGER) {
        LOG_DEBUG_INF("[ELC]role(%d) can't be elected, ignore this message, stream_id=%u, src_node_id=%u",
            role, stream_id, src_node_id);
        elc_stream_unlock(stream_id);
        return CM_SUCCESS;
    }
    CM_RETURN_IFERR_EX(elc_stream_set_timeout(stream_id, cm_clock_now()), elc_stream_unlock(stream_id));
    CM_RETURN_IFERR_EX(elc_stream_set_role(stream_id, DCF_ROLE_CANDIDATE), elc_stream_unlock(stream_id));
    CM_RETURN_IFERR_EX(elc_stream_set_term(stream_id, current_term + 1), elc_stream_unlock(stream_id));
    uint32 vote_flag = VOTE_FLAG_FORCE_VOTE;
    status_t ret = elc_vote_req(stream_id, vote_flag);
    elc_stream_unlock(stream_id);
    return ret;
}

status_t elc_vote_proc(mec_message_t *pack)
{
    elc_vote_t req_vote;
    uint32 stream_id = pack->head->stream_id;
    uint32 node_id = elc_stream_get_current_node();

    LOG_DEBUG_INF("[ELC]Receive voting request from node_id=%u, stream_id=%u, current_node=%u",
        pack->head->src_inst, stream_id, node_id);

    CM_RETURN_IFERR(elc_decode_vote_req(pack, &req_vote));

    elc_stream_lock_x(stream_id);

    bool32 vote_granted = CM_FALSE;
    CM_RETURN_IFERR_EX(elc_judge_vote(stream_id, pack->head->src_inst, &req_vote, &vote_granted),
        elc_stream_unlock(stream_id));

    mec_message_t ack_pack;
    CM_RETURN_IFERR_EX(mec_alloc_pack(&ack_pack, MEC_CMD_VOTE_REQUEST_RPC_ACK, node_id,
        pack->head->src_inst, stream_id), elc_stream_unlock(stream_id));
    elc_vote_ack_t ack_vote;
    ack_vote.term = elc_stream_get_current_term(stream_id);
    ack_vote.vote_granted = vote_granted;
    ack_vote.work_mode = elc_stream_get_work_mode(stream_id);
    ack_vote.vote_flag = req_vote.vote_flag;

    elc_stream_unlock(stream_id);

    if (elc_encode_vote_ack(&ack_pack, &ack_vote) != CM_SUCCESS) {
        mec_release_pack(&ack_pack);
        return CM_ERROR;
    }

    LOG_RUN_INF("[ELC]Send response to node_id=%u, stream_id=%u, current_node=%u, current_term=%llu, vote_granted=%u, "
        "work_mode=%d", pack->head->src_inst, stream_id, node_id, ack_vote.term, vote_granted, ack_vote.work_mode);

    status_t status = mec_send_data(&ack_pack);
    mec_release_pack(&ack_pack);
    return status;
}

static status_t elc_set_candidate_term(uint32 stream_id, uint32 node_id, uint64 current_term)
{
    dcf_work_mode_t work_mode = elc_stream_get_work_mode(stream_id);
    if (work_mode == WM_NORMAL) {
        return elc_stream_set_term(stream_id, current_term + 1);
    } else if (work_mode == WM_MINORITY) {
        dcf_work_mode_t prev_work_mode = elc_stream_get_vote_node_work_mode(stream_id, node_id);
        if (prev_work_mode == WM_NORMAL) {
            CM_RETURN_IFERR(elc_stream_set_vote_node_work_mode(stream_id, node_id, WM_MINORITY));
            return elc_stream_set_term(stream_id, current_term + WM_NORMAL_2_MINORITY_CANDIDATE_TERM_INC);
        } else {
            return elc_stream_set_term(stream_id, current_term + 1);
        }
    }
    return CM_SUCCESS;
}

status_t vote_grant_proc(uint32 stream_id, uint32 node_id, uint32 src_node, dcf_role_t role, uint64 current_term,
    const elc_vote_ack_t *ack_vote)
{
    if (role == DCF_ROLE_PRE_CANDIDATE) {
        if (ack_vote->work_mode != elc_stream_get_work_mode(stream_id)) {
            return CM_SUCCESS;
        }
        if (ack_vote->term > current_term) {
            return CM_SUCCESS;
        }
        CM_RETURN_IFERR(proc_node_voting(stream_id, src_node));
        if (!is_win(stream_id)) {
            return CM_SUCCESS;
        }
        LOG_RUN_INF("[ELC]pre-voting succeeded, stream_id=%u, node_id=%u, current_term=%llu",
            stream_id, node_id, current_term);
        CM_RETURN_IFERR(elc_stream_set_timeout(stream_id, cm_clock_now()));
        CM_RETURN_IFERR(elc_stream_set_role(stream_id, DCF_ROLE_CANDIDATE));
        CM_RETURN_IFERR(elc_set_candidate_term(stream_id, node_id, current_term));
        uint32 vote_flag = VOTE_FLAG_INIT;
        CM_RETURN_IFERR(elc_vote_req(stream_id, vote_flag));
    } else if (role == DCF_ROLE_CANDIDATE) {
        if (ack_vote->work_mode != elc_stream_get_work_mode(stream_id)) {
            return CM_SUCCESS;
        }
        if (ack_vote->term != current_term) {
            LOG_RUN_WAR("[ELC]term inconsistency, ignore this message, stream_id=%u, node_id=%u,"
                "current_term=%llu, peer_node=%u, peer_term=%llu",
                stream_id, node_id, current_term, src_node, ack_vote->term);
            return CM_SUCCESS;
        }
        CM_RETURN_IFERR(proc_node_voting(stream_id, src_node));
        if (!is_win(stream_id)) {
            return CM_SUCCESS;
        }
        LOG_RUN_INF("[ELC]election is successful, stream_id=%u, node_id=%u, current_term=%llu",
            stream_id, node_id, current_term);
        if (ELC_FORCE_VOTE(ack_vote->vote_flag)) {
            LOG_RUN_INF("[ELC]set force_vote_flag, stream_id=%u, src_node=%u", stream_id, src_node);
            elc_stream_set_force_vote_flag(stream_id, CM_TRUE);
        }
        CM_RETURN_IFERR(elc_stream_set_role(stream_id, DCF_ROLE_LEADER));
        timespec_t now = cm_clock_now();
        for (uint32 j = 0; j < CM_MAX_NODE_COUNT; j++) {
            CM_RETURN_IFERR(elc_stream_set_hb_ack_time(stream_id, j, now));
        }
        CM_RETURN_IFERR(elc_stream_set_old_leader(stream_id, node_id));
        CM_RETURN_IFERR(elc_send_status_info(stream_id));
    }
    return CM_SUCCESS;
}

status_t elc_vote_ack_proc_inside(uint32 stream_id, uint32 src_node, elc_vote_ack_t* ack_vote)
{
    uint32 node_id = elc_stream_get_current_node();
    dcf_role_t role = elc_stream_get_role(stream_id);
    uint64 current_term = elc_stream_get_current_term(stream_id);
    int32 work_mode = elc_stream_get_work_mode(stream_id);

    LOG_RUN_INF("[ELC]receive ack from node_id=%u, stream_id=%u, current_node=%u, current_term=%llu, role=%d, "
        "work_mode=%d peer_term=%llu, vote_granted=%u work_mode=%d", src_node, stream_id, node_id, current_term, role,
        work_mode, ack_vote->term, ack_vote->vote_granted, ack_vote->work_mode);

    if (role != DCF_ROLE_PRE_CANDIDATE && role != DCF_ROLE_CANDIDATE) {
        LOG_DEBUG_INF("[ELC]role changed already, ignore ack");
        return CM_SUCCESS;
    }

    if (ack_vote->vote_granted) {
        CM_RETURN_IFERR(vote_grant_proc(stream_id, node_id, src_node, role, current_term, ack_vote));
    } else { // no vote been obtained
        LOG_RUN_INF("[ELC]no vote been obtained, stream_id=%u, node_id=%u, current_term=%llu, "
            "peer_node=%u, peer_term=%llu",
            stream_id, node_id, current_term, src_node, ack_vote->term);

        if (work_mode != ack_vote->work_mode) {
            return CM_SUCCESS;
        }

        if (ack_vote->term > current_term) {
            CM_RETURN_IFERR(elc_stream_set_term(stream_id, ack_vote->term));
            if (role != DCF_ROLE_LOGGER && role != DCF_ROLE_PASSIVE) {
                CM_RETURN_IFERR(elc_stream_set_role(stream_id, DCF_ROLE_FOLLOWER));
            }
            CM_RETURN_IFERR(elc_stream_set_votefor(stream_id, CM_INVALID_NODE_ID));
            LOG_DEBUG_INF("[ELC]Set votefor as invalid nodeid when vote_ack_proc,"
                " current_term=%llu, ack_vote term=%llu", current_term, ack_vote->term);
        }

        CM_RETURN_IFERR(proc_node_voting_no(stream_id, src_node));
        if (is_not_win(stream_id)) {
            timespec_t date = cm_clock_now() - elc_stream_get_elc_timeout_ms() * MICROSECS_PER_MILLISEC;
            (void)elc_stream_set_timeout(stream_id, date);
            LOG_DEBUG_INF("[ELC]Election is defeated, set last hb time:%lld", date);
        }
    }

    return CM_SUCCESS;
}
status_t elc_vote_ack_proc(mec_message_t *pack)
{
    LOG_DEBUG_INF("[ELC]begin elc_vote_ack_proc");
    uint32 stream_id = pack->head->stream_id;
    elc_vote_ack_t ack_vote;
    CM_RETURN_IFERR(elc_decode_vote_ack(pack, &ack_vote));

    elc_stream_lock_x(stream_id);
    status_t ret = elc_vote_ack_proc_inside(stream_id, pack->head->src_inst, &ack_vote);
    elc_stream_unlock(stream_id);
    LOG_DEBUG_INF("[ELC]end elc_vote_ack_proc");
    return ret;
}

static status_t elc_hb_proc(mec_message_t *pack, elc_hb_t *hb_req, const rcv_node_info_t *rcv_info)
{
    uint32 stream_id = pack->head->stream_id;
    uint32 src_node_id = pack->head->src_inst;
    LOG_DEBUG_INF("[ELC]Receive heartbeat from stream_id=%u, node_id=%u", stream_id, src_node_id);
    stat_record(HB_RECV_COUNT, 1);

    elc_stream_lock_x(stream_id);
    elc_stream_set_leader_group(stream_id, rcv_info->group);
    if (elc_stream_get_leader_start_time(stream_id) == 0) {
        elc_stream_set_leader_start_time(stream_id, cm_clock_now());
    }
    dcf_work_mode_t work_mode = elc_stream_get_work_mode(stream_id);
    if (work_mode == WM_MINORITY && hb_req->work_mode == WM_NORMAL) {
        LOG_DEBUG_INF("[ELC]Ignore heartbeat from node:%u as mismatched work mode, stream_id=%u",
            src_node_id, stream_id);
        elc_stream_unlock(stream_id);
        return CM_SUCCESS;
    }
    status_t ret = elc_stream_refresh_hb_time(stream_id, hb_req->term, hb_req->work_mode, src_node_id);
    elc_stream_unlock(stream_id);
    return ret;
}

static status_t elc_check_md_match_proc(uint32 stream_id, uint32 hb_ack_chksum, timespec_t now, bool32 *need_md_rep)
{
    timespec_t last_md_rep_time = elc_stream_get_last_md_rep_time(stream_id);
    uint32 chksum = md_get_checksum();

    LOG_DEBUG_INF("[ELC]Check metadata match proc, local chksum=%u recved hb_ack_chksum:%u", chksum, hb_ack_chksum);

    if (chksum == hb_ack_chksum) {
        return CM_SUCCESS;
    }

    uint64 interval_time = ((uint64)(now - last_md_rep_time)) / MICROSECS_PER_MILLISEC;
    if (interval_time >= CM_MD_MISMATCH_REP_INTERVAL && elc_stream_get_role(stream_id) == DCF_ROLE_LEADER) {
        LOG_DEBUG_INF("[ELC]Check metadata as mismatched, leader need to rep metadata");
        *need_md_rep = CM_TRUE;
    }

    return CM_SUCCESS;
}

status_t elc_encode_status_info(mec_message_t* pack, uint32 stream_id, int64 send_time)
{
    rcv_node_info_t req_status;
    req_status.role = elc_stream_get_role(stream_id);
    req_status.group = elc_stream_get_my_group(stream_id);
    req_status.priority = elc_stream_get_priority(stream_id);
    req_status.is_in_majority = elc_is_in_majority(stream_id);
    req_status.is_future_hb = elc_stream_is_future_hb(stream_id);
    CM_RETURN_IFERR(elc_encode_status_check_req(pack, &req_status));

    elc_hb_t req_hb;
    req_hb.term = elc_stream_get_current_term(stream_id);
    req_hb.work_mode = elc_stream_get_work_mode(stream_id);
    req_hb.md_chksum = md_get_checksum();
    req_hb.send_time = send_time;
    CM_RETURN_IFERR(elc_encode_hb_req(pack, &req_hb));
    return CM_SUCCESS;
}

status_t elc_send_status_info(uint32 stream_id)
{
    uint32 cur_node_id = md_get_cur_node();
    mec_message_t pack;
    CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_STATUS_CHECK_RPC_REQ, cur_node_id, CM_INVALID_NODE_ID, stream_id));

    if (elc_encode_status_info(&pack, stream_id, cm_clock_now()) != CM_SUCCESS) {
        mec_release_pack(&pack);
        LOG_DEBUG_ERR("[ELC]send_status_info encode failed, stream_id=%u,node_id=%u", stream_id, cur_node_id);
        return CM_ERROR;
    }

    uint64 inst_bits[INSTS_BIT_SZ] = {0};
    uint64 success_inst[INSTS_BIT_SZ];
    if (elc_stream_vote_node_list(stream_id, inst_bits) != CM_SUCCESS) {
        mec_release_pack(&pack);
        LOG_DEBUG_ERR("[ELC]status_check prepare node list failed, stream_id=%u,node_id=%u", stream_id, cur_node_id);
        return CM_ERROR;
    }
    mec_broadcast(stream_id, inst_bits, &pack, success_inst);
    if (elc_stream_get_role(stream_id) == DCF_ROLE_LEADER) {
        stat_record(HB_SEND_COUNT, 1);
    }
    LOG_DEBUG_INF("[ELC]elc status info send end, stream_id=%u,node_id=%u", stream_id, cur_node_id);
    mec_release_pack(&pack);
    return CM_SUCCESS;
}

static status_t elc_status_info_ack(uint32 stream_id, uint32 dst_node, int64 req_hb_send_time)
{
    mec_message_t pack;
    uint32 src_node = elc_stream_get_current_node();
    CM_RETURN_IFERR(mec_alloc_pack(&pack, MEC_CMD_STATUS_CHECK_RPC_ACK, src_node, dst_node, stream_id));

    elc_stream_lock_s(stream_id);
    status_t ret = elc_encode_status_info(&pack, stream_id, req_hb_send_time);
    elc_stream_unlock(stream_id);
    if (ret != CM_SUCCESS) {
        mec_release_pack(&pack);
        LOG_DEBUG_ERR("[ELC]status_info_ack encode failed, stream_id=%u,node_id=%u", stream_id, src_node);
        return CM_ERROR;
    }

    ret = mec_send_data(&pack);
    LOG_DEBUG_INF("[ELC]status_info_ack end, stream_id=%u,node_id=%u", stream_id, src_node);
    mec_release_pack(&pack);
    return ret;
}

status_t elc_status_check_req_proc(mec_message_t *pack)
{
    uint32 stream_id = pack->head->stream_id;
    uint32 src_node = pack->head->src_inst;
    LOG_DEBUG_INF("[ELC]recv status_check_req: stream_id=%u, src_node=%u", stream_id, src_node);

    rcv_node_info_t rcv_info;
    CM_RETURN_IFERR(elc_decode_status_check_req(pack, &rcv_info));
    rcv_info.last_recv_time = cm_clock_now();
    elc_save_status_check_info(stream_id, src_node, &rcv_info);

    if (rcv_info.role == DCF_ROLE_LEADER) {
        elc_hb_t hb_req;
        CM_RETURN_IFERR(elc_decode_hb_req(pack, &hb_req));
        CM_RETURN_IFERR(elc_hb_proc(pack, &hb_req, &rcv_info));

        LOG_DEBUG_INF("[ELC]send status_info ack, stream_id=%u", stream_id);
        CM_RETURN_IFERR(elc_status_info_ack(stream_id, src_node, hb_req.send_time));
    }

    LOG_DEBUG_INF("[ELC]recv status_check_req end: stream_id=%u, src_node=%u", stream_id, src_node);
    return CM_SUCCESS;
}

status_t elc_status_check_ack_proc(mec_message_t *pack)
{
    uint32 stream_id = pack->head->stream_id;
    uint32 src_node_id = pack->head->src_inst;

    rcv_node_info_t rcv_info;
    CM_RETURN_IFERR(elc_decode_status_check_req(pack, &rcv_info));

    elc_hb_t ack_hb;
    CM_RETURN_IFERR(elc_decode_hb_req(pack, &ack_hb));
    stat_record(HB_RTT, (uint64)(cm_clock_now() - ack_hb.send_time));
    LOG_DEBUG_INF("[ELC]Receive heartbeat ack from stream_id=%u, node_id=%u, ack_hb's term:%llu work_mode:%u "
        "md_chksum:%u", stream_id, src_node_id, ack_hb.term, ack_hb.work_mode, ack_hb.md_chksum);

    elc_stream_lock_x(stream_id);
    status_t ret = elc_stream_set_vote_node_work_mode(stream_id, src_node_id, ack_hb.work_mode);
    if (ret != CM_SUCCESS) {
        elc_stream_unlock(stream_id);
        return ret;
    }

    bool32 need_md_rep = CM_FALSE;
    timespec_t now = cm_clock_now();
    ret = elc_stream_refresh_hb_ack_time(stream_id, ack_hb.term, src_node_id);
    if (ret != CM_SUCCESS) {
        elc_stream_unlock(stream_id);
        return ret;
    }
    ret = elc_check_md_match_proc(stream_id, ack_hb.md_chksum, now, &need_md_rep);
    elc_stream_unlock(stream_id);
    if (need_md_rep == CM_TRUE) {
        uint32 size;
        CM_RETURN_IFERR(md_set_status(META_CATCH_UP));
        if (md_to_string(md_get_buffer(), CM_METADATA_DEF_MAX_LEN, &size) != CM_SUCCESS) {
            CM_RETURN_IFERR(md_set_status(META_NORMAL));
            return CM_ERROR;
        }
        LOG_DEBUG_INF("[ELC]Check metadata as mismatched, leader prepare to rep metadata:%s", md_get_buffer());
        if (rep_write(stream_id, md_get_buffer(), size, CFG_LOG_KEY(src_node_id, OP_FLAG_ALL),
            ENTRY_TYPE_CONF, NULL) != CM_SUCCESS) {
            CM_RETURN_IFERR(md_set_status(META_NORMAL));
            return CM_ERROR;
        }
        CM_RETURN_IFERR(md_set_status(META_NORMAL));
        CM_RETURN_IFERR(elc_stream_set_last_md_rep_time(stream_id, now));
    }
    return ret;
}

#ifdef __cplusplus
}
#endif