* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cms_process_messages_clt.cpp
*
*
* IDENTIFICATION
* src/cm_server/cms_process_messages_clt.cpp
*
* -------------------------------------------------------------------------
*/
#include "cms_ddb.h"
#include "hotpatch/hotpatch.h"
#include "cms_common.h"
#include "cm_msg.h"
#include "cms_process_messages.h"
#include "cms_arbitrate_datanode.h"
#include "cms_arbitrate_datanode_pms.h"
#include "cms_az.h"
#include "cms_alarm.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include "cm_ip.h"
#include "cm_msg_version_convert.h"
const int KV_POS = 2;
#if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS)))
const int SYNC_FINISH_PERCENT = 99;
static int NeedDoGsGuc(uint32 groupIndex, int memberIndex);
static bool CompareMemberWithExpectSyncList(uint32 groupIdx, int memIdx);
static bool IsExpectSyncListInstanceNormal(uint32 groupIndex, uint32 instanceId, int *primaryCount);
#endif
static bool SwitchoverStatusCheck(uint32 group_index, int member_index);
void process_to_query_instance_status_msg(MsgRecvInfo* recvMsgInfo, const cm_query_instance_status *query_status_ptr);
int FinishRedoCheck(void)
{
int finish_redo_group = 0;
for (uint32 i = 0; i < g_dynamic_header->relationCount; i++) {
(void)pthread_rwlock_rdlock(&(g_instance_group_report_status_ptr[i].lk_lock));
if (g_instance_role_group_ptr[i].count > 0 &&
g_instance_role_group_ptr[i].instanceMember[0].instanceType == INSTANCE_TYPE_DATANODE &&
g_instance_group_report_status_ptr[i].instance_status.finish_redo) {
finish_redo_group++;
}
(void)pthread_rwlock_unlock(&(g_instance_group_report_status_ptr[i].lk_lock));
}
return finish_redo_group;
}
static bool CmsDoCltDdbOper(CltSendDdbOper *ddbOper, char *value, uint32 valueLen, DDB_RESULT *ddbResult, status_t *st)
{
switch (ddbOper->dbOper) {
case DDB_GET_OPER:
*st = GetKVFromDDb(ddbOper->key, ddbOper->keyLen, value, valueLen, ddbResult);
break;
case DDB_SET_OPER:
*st = SetKV2Ddb(ddbOper->key, ddbOper->keyLen, ddbOper->value, ddbOper->valueLen, NULL);
break;
case DDB_DEL_OPER:
*st = DelKeyInDdb(ddbOper->key, ddbOper->keyLen);
break;
default:
write_runlog(ERROR, "unkown ddbOper(%d).\n", ddbOper->dbOper);
return false;
}
return true;
}
static void GetSendCltDdbMsg(
CmSendDdbOperRes *sendDdbRes, const CltSendDdbOper *ddbOper, const char *value, status_t st)
{
sendDdbRes->msgType = (int32)MSG_CM_CLIENT_DDB_OPER_ACK;
sendDdbRes->dbOper = ddbOper->dbOper;
sendDdbRes->node = ddbOper->node;
errno_t rc = strcpy_s(sendDdbRes->threadName, THREAD_NAME_LEN, ddbOper->threadName);
securec_check_errno(rc, (void)rc);
sendDdbRes->keyLen = ddbOper->keyLen;
rc = strcpy_s(sendDdbRes->key, MAX_PATH_LEN, ddbOper->key);
securec_check_errno(rc, (void)rc);
if (ddbOper->dbOper == DDB_GET_OPER) {
sendDdbRes->valueLen = (uint32)strlen(value);
rc = strcpy_s(sendDdbRes->value, MAX_PATH_LEN, value);
securec_check_errno(rc, (void)rc);
} else {
sendDdbRes->valueLen = ddbOper->valueLen;
rc = strcpy_s(sendDdbRes->value, MAX_PATH_LEN, ddbOper->value);
securec_check_errno(rc, (void)rc);
}
if (st != CM_SUCCESS) {
sendDdbRes->exeStatus = false;
} else {
sendDdbRes->exeStatus = true;
}
}
void ProcessCltSendOper(MsgRecvInfo* recvMsgInfo, CltSendDdbOper *ddbOper)
{
if (ddbOper == NULL || ddbOper->dbOper == DDB_INIT_OPER) {
write_runlog(ERROR, "ddbOper is NULL, or dbOper is DDB_INIT_OPER.\n");
return;
}
ddbOper->key[MAX_PATH_LEN - 1] = '\0';
ddbOper->value[MAX_PATH_LEN - 1] = '\0';
ddbOper->reserved[RESERVE_LEN - 1] = '\0';
ddbOper->threadName[THREAD_NAME_LEN - 1] = '\0';
status_t st = CM_SUCCESS;
DDB_RESULT ddbResult = SUCCESS_GET_VALUE;
char value[MAX_PATH_LEN] = {0};
bool res = CmsDoCltDdbOper(ddbOper, value, MAX_PATH_LEN, &ddbResult, &st);
if (!res) {
return;
}
CmSendDdbOperRes sendDdbRes = {0};
GetSendCltDdbMsg(&sendDdbRes, ddbOper, value, st);
write_runlog(LOG,
"send ddbRes to client(%u), threadName is %s, oper is %d, key(%s), value(%s), exeStatus is %d.\n",
ddbOper->node,
ddbOper->threadName,
ddbOper->dbOper,
ddbOper->key,
sendDdbRes.value,
sendDdbRes.exeStatus);
(void)RespondMsg(recvMsgInfo, 'S', (const char *)(&sendDdbRes), sizeof(CmSendDdbOperRes));
}
#if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS)))
void FindInstanceInSyncList(uint32 groupIdx, int memIdx, bool *hasFound)
{
DatanodeSyncList *exceptSyncList = &(g_instance_group_report_status_ptr[groupIdx].instance_status.exceptSyncList);
uint32 instanceId = g_instance_role_group_ptr[groupIdx].instanceMember[memIdx].instanceId;
for (int i = 0; i < exceptSyncList->count; ++i) {
if (exceptSyncList->dnSyncList[i] == instanceId) {
*hasFound = true;
break;
}
}
}
static bool CompareMemberWithExpectSyncList(uint32 groupIdx, int memIdx)
{
DatanodeSyncList *expectSyncList = &(g_instance_group_report_status_ptr[groupIdx].instance_status.exceptSyncList);
cm_instance_datanode_report_status *dnReport =
&(g_instance_group_report_status_ptr[groupIdx].instance_status.data_node_member[memIdx]);
DatanodeSyncList *memberSyncList = &(dnReport->dnSyncList);
if (log_min_messages <= DEBUG1) {
char expectSyncListStr[MAX_PATH_LEN] = {0};
char memberSyncListStr[MAX_PATH_LEN] = {0};
GetSyncListString(expectSyncList, expectSyncListStr, sizeof(expectSyncListStr));
GetSyncListString(memberSyncList, memberSyncListStr, sizeof(memberSyncListStr));
write_runlog(DEBUG1,
"instanceId(%u), expectSyncList is [%s], memberSyncList is [%s], dbstate is %d, "
"syncDone is %d.\n", g_instance_role_group_ptr[groupIdx].instanceMember[memIdx].instanceId,
expectSyncListStr, memberSyncListStr, dnReport->local_status.db_state, dnReport->syncDone);
}
if (expectSyncList->count != memberSyncList->count) {
return false;
}
for (int i = 0; i < expectSyncList->count; ++i) {
if (expectSyncList->dnSyncList[i] != memberSyncList->dnSyncList[i]) {
return false;
}
}
return true;
}
static int NeedDoGsGuc(uint32 groupIndex, int memberIndex)
{
if (g_isEnableUpdateSyncList == CANNOT_START_SYNCLIST_THREADS) {
return SEND_AZ_SYNC_LIST;
}
if (g_isEnableUpdateSyncList == SYNCLIST_THREADS_IN_MAINTENANCE) {
return CAN_NOT_SEND_SYNC_lIST;
}
bool hasFound = false;
int primaryCount = 0;
DatanodeSyncList exceptSyncList = g_instance_group_report_status_ptr[groupIndex].instance_status.exceptSyncList;
cm_instance_datanode_report_status *dnReport =
&(g_instance_group_report_status_ptr[groupIndex].instance_status.data_node_member[memberIndex]);
FindInstanceInSyncList(groupIndex, memberIndex, &hasFound);
if (!hasFound) {
return NOT_NEED_TO_SEND_SYNC_LIST;
}
if (dnReport->local_status.local_role == INSTANCE_ROLE_PRIMARY && !CompareCurWithExceptSyncList(groupIndex)) {
write_runlog(DEBUG1, "instd %u: dn primary cur is different from expect, need to send sync list.\n",
GetInstanceIdInGroup(groupIndex, memberIndex));
return NEED_TO_SEND_SYNC_LIST;
}
if (CompareMemberWithExpectSyncList(groupIndex, memberIndex)) {
return NOT_NEED_TO_SEND_SYNC_LIST;
}
for (int i = 0; i < exceptSyncList.count; ++i) {
if (!IsExpectSyncListInstanceNormal(groupIndex, exceptSyncList.dnSyncList[i], &primaryCount)) {
return CAN_NOT_SEND_SYNC_lIST;
}
}
if (primaryCount != 1) {
return CAN_NOT_SEND_SYNC_lIST;
}
return NEED_TO_SEND_SYNC_LIST;
}
bool isDnGroupAvailable(uint32 groupId, int dnId)
{
synchronous_standby_mode currentMode =
g_instance_group_report_status_ptr[groupId].instance_status.data_node_member[dnId].sync_standby_mode;
if (currentMode <= FirstAz2) {
return true;
}
const int ONE_HUNDRED = 100;
const int percent_of_auto_gsguc = 50;
int availableNum = 0;
for (int j = 0; j < g_instance_role_group_ptr[groupId].count; j++) {
if (g_instance_group_report_status_ptr[groupId].instance_status.data_node_member[j].local_status.db_state ==
INSTANCE_HA_STATE_NORMAL) {
availableNum++;
}
}
return (availableNum * ONE_HUNDRED / g_instance_role_group_ptr[groupId].count) > percent_of_auto_gsguc;
}
static bool IsExpectSyncListInstanceNormal(uint32 groupIndex, uint32 instanceId, int *primaryCount)
{
for (int i = 0; i < g_instance_role_group_ptr[groupIndex].count; ++i) {
if (instanceId != g_instance_role_group_ptr[groupIndex].instanceMember[i].instanceId) {
continue;
}
if (g_instance_group_report_status_ptr[groupIndex].instance_status.data_node_member[i].local_status.db_state !=
INSTANCE_HA_STATE_NORMAL ||
g_instance_group_report_status_ptr[groupIndex].instance_status.data_node_member[i].local_status.term ==
InvalidTerm) {
return false;
}
if (g_instance_group_report_status_ptr[groupIndex]
.instance_status.data_node_member[i].local_status.local_role == INSTANCE_ROLE_PRIMARY) {
++(*primaryCount);
}
break;
}
return true;
}
#endif
status_t GetSwitchOverMsg(uint32 i, int j, bool doSwitchoverFast, cm_to_agent_switchover *switchoverMsg)
{
uint32 term = InvalidTerm;
cm_instance_report_status *instStatus = &g_instance_group_report_status_ptr[i].instance_status;
const cm_instance_role_status *instInfo = &g_instance_role_group_ptr[i].instanceMember[j];
if (doSwitchoverFast) {
switchoverMsg->msg_type = MSG_CM_AGENT_SWITCHOVER_FAST;
} else {
switchoverMsg->msg_type = MSG_CM_AGENT_SWITCHOVER;
}
switchoverMsg->node = instInfo->node;
switchoverMsg->instanceId = instInfo->instanceId;
switchoverMsg->instance_type = instInfo->instanceType;
switchoverMsg->wait_seconds = instStatus->command_member[j].time_out;
EnCheckSynclist echeck = CheckInstInSyncList(i, j, "[Send switchover]");
if (echeck != SYNCLIST_IS_FINISTH && (g_clusterType != V3SingleInstCluster)) {
write_runlog(LOG, "instId(%u) may be doing modify sync(%d), cannot do switchover.\n",
GetInstanceIdInGroup(i, j), (int32)echeck);
CleanCommand(i, j);
return CM_ERROR;
}
cm_instance_command_status *cmd = &(instStatus->command_member[j]);
if (cmd->cmdPur == INSTANCE_ROLE_STANDBY && cmd->cmdSour == INSTANCE_ROLE_CASCADE_STANDBY) {
switchoverMsg->term = InvalidTerm;
return CM_SUCCESS;
}
if ((g_clusterType != V3SingleInstCluster) && ((term = ReadTermFromDdb(i)) == InvalidTerm)) {
write_runlog(ERROR, "Term on Ddb has not been set yet, which should not happen.\n");
return CM_ERROR;
}
switchoverMsg->term = term;
return CM_SUCCESS;
}
void ProcessSwitchOverMsg(MsgRecvInfo* recvMsgInfo, uint32 i, int j, const cm_to_agent_switchover *switchoverMsg)
{
cm_instance_report_status *instStatus = &g_instance_group_report_status_ptr[i].instance_status;
const cm_instance_role_status *instInfo = &g_instance_role_group_ptr[i].instanceMember[j];
instStatus->command_member[j].command_send_status = INSTANCE_COMMAND_SEND_STATUS_SENDING;
instStatus->command_member[j].command_send_times = 0;
uint32 syncInstid = GetAvaiSyncDdbInstId();
if (syncInstid != 0) {
write_runlog(ERROR,
"primary dn(%u) most_available_sync is on, can not do switchover.\n", syncInstid);
return;
}
if (instStatus->command_member[j].delaySwitchoverTime > 0) {
write_runlog(LOG, "instId(%u) delayTime is %d, cannot do switchover.\n", instInfo->instanceId,
instStatus->command_member[j].delaySwitchoverTime);
return;
}
if (instStatus->command_member[j].command_status == INSTANCE_COMMAND_WAIT_EXEC) {
instStatus->command_member[j].command_send_num = 0;
}
instStatus->command_member[j].command_send_num++;
write_runlog(LOG,
"send switchover to instance(%u) for [%d/%d] times.\n",
instInfo->instanceId,
GetSendTimes(i, j, false),
GetSendTimes(i, j, true));
if (g_multi_az_cluster && (instStatus->command_member[j].command_send_num > 1)) {
return;
}
if (!IsArchiveMaxSendTimes(i, j)) {
return;
}
WriteKeyEventLog(KEY_EVENT_SWITCHOVER, g_instance_role_group_ptr[i].instanceMember[j].instanceId,
"send switchover message, node=%u, instance=%u",
g_instance_role_group_ptr[i].instanceMember[j].node,
g_instance_role_group_ptr[i].instanceMember[j].instanceId);
if (RespondMsg(recvMsgInfo, 'S', (const char *)(switchoverMsg), sizeof(cm_to_agent_switchover)) == 0) {
instStatus->command_member[j].command_status = INSTANCE_COMMAND_WAIT_EXEC_ACK;
} else {
instStatus->command_member[j].command_send_status = INSTANCE_COMMAND_SEND_STATUS_FAIL;
}
return;
}
void ProcessBuildMsg(MsgRecvInfo* recvMsgInfo, uint32 i, int j)
{
cm_instance_report_status *instStatus = &g_instance_group_report_status_ptr[i].instance_status;
const cm_instance_role_status *instInfo = &g_instance_role_group_ptr[i].instanceMember[j];
cm_to_agent_build buildMsg;
buildMsg.msg_type = MSG_CM_AGENT_BUILD;
buildMsg.full_build = instStatus->command_member[j].full_build;
buildMsg.node = instInfo->node;
buildMsg.instanceId = instInfo->instanceId;
buildMsg.instance_type = instInfo->instanceType;
buildMsg.wait_seconds = instStatus->command_member[j].time_out;
buildMsg.term = instStatus->term;
buildMsg.parallel = instStatus->command_member[j].parallel;
buildMsg.role = instInfo->role;
if (buildMsg.instance_type == INSTANCE_TYPE_DATANODE) {
if (g_clusterType == V3SingleInstCluster) {
buildMsg.primaryNodeId = GetPrimaryDnIndex();
} else {
uint32 primaryTerm = find_primary_term(i);
if (primaryTerm > InvalidTerm || IsBoolCmParamTrue(g_enableDcf)) {
buildMsg.term = primaryTerm;
} else {
write_runlog(ERROR,
"line %d: No legal primary for building instance %u.",
__LINE__,
instInfo->instanceId);
return;
}
}
}
instStatus->command_member[j].command_send_status = INSTANCE_COMMAND_SEND_STATUS_SENDING;
instStatus->command_member[j].command_send_times = 0;
if (RespondMsg(recvMsgInfo, 'S', (char *)(&buildMsg), sizeof(buildMsg)) == 0) {
instStatus->command_member[j].command_status = INSTANCE_COMMAND_WAIT_EXEC_ACK;
} else {
instStatus->command_member[j].command_send_status = INSTANCE_COMMAND_SEND_STATUS_FAIL;
}
return;
}
void SaveNotifyMsg(uint32 i, int j, cm_to_agent_notify_cn *p_cm_to_agent_notify_cn)
{
int count = 0;
cm_instance_report_status *instStatus = &g_instance_group_report_status_ptr[i].instance_status;
cm_notify_msg_status *notify_msg = &instStatus->coordinatemember.notify_msg;
for (uint32 k = 0; k < g_datanode_instance_count; k++) {
if (notify_msg->notify_status != NULL && notify_msg->notify_status[k] &&
notify_msg->datanode_instance != NULL) {
p_cm_to_agent_notify_cn->datanodeId[count++] = notify_msg->datanode_instance[k];
}
}
if (notify_msg->gtmIdBroadCast != 0) {
p_cm_to_agent_notify_cn->datanodeId[count] = notify_msg->gtmIdBroadCast;
}
p_cm_to_agent_notify_cn->coordinatorId = 0;
instStatus->command_member[j].command_send_status = INSTANCE_COMMAND_SEND_STATUS_SENDING;
instStatus->command_member[j].command_send_times = 0;
return;
}
int GetNotifyCnCount(cm_instance_report_status *instStatus, const cm_instance_role_status *instInfo)
{
int count = 0;
int gtmCount = 0;
cm_notify_msg_status *notify_msg;
* Scan the notify status array, allocate the datanode array memory due to the
* actual scan result, put all the needed datanode instanceId into notify msg.
*/
notify_msg = &instStatus->coordinatemember.notify_msg;
for (uint32 k = 0; k < g_datanode_instance_count; k++) {
if (notify_msg->notify_status != NULL && notify_msg->notify_status[k]) {
Assert(notify_msg->datanode_instance[k] != 0);
count++;
}
}
if (notify_msg->gtmIdBroadCast != 0) {
gtmCount++;
}
write_runlog(LOG,
"coordinator %u notify map datanode count %d, gtm count %d, notifyCount %d\n",
instInfo->instanceId,
count,
gtmCount,
instStatus->command_member[0].notifyCnCount);
count += gtmCount;
return count;
}
status_t ProcessNotifyMsg(MsgRecvInfo* recvMsgInfo, uint32 i, int j, int count)
{
cm_instance_report_status *instStatus = &g_instance_group_report_status_ptr[i].instance_status;
const cm_instance_role_status *instInfo = &g_instance_role_group_ptr[i].instanceMember[j];
size_t msg_len = offsetof(cm_to_agent_notify_cn, datanodeId) + (uint32)(count) * sizeof(uint32);
if (msg_len == 0) {
return CM_ERROR;
}
cm_to_agent_notify_cn *p_cm_to_agent_notify_cn = (cm_to_agent_notify_cn *)malloc(msg_len);
if (p_cm_to_agent_notify_cn == NULL) {
write_runlog(ERROR, "out of memory! requested size: %lu.\n", msg_len);
return CM_ERROR;
}
p_cm_to_agent_notify_cn->msg_type = MSG_CM_AGENT_NOTIFY_CN;
p_cm_to_agent_notify_cn->node = instInfo->node;
p_cm_to_agent_notify_cn->instanceId = instInfo->instanceId;
p_cm_to_agent_notify_cn->datanodeCount = count;
p_cm_to_agent_notify_cn->notifyCount = instStatus->command_member[0].notifyCnCount;
SaveNotifyMsg(i, j, p_cm_to_agent_notify_cn);
if (RespondMsg(recvMsgInfo, 'S', (char *)p_cm_to_agent_notify_cn, msg_len) == 0) {
instStatus->command_member[j].command_status = INSTANCE_COMMAND_WAIT_EXEC_ACK;
} else {
instStatus->command_member[j].command_send_status = INSTANCE_COMMAND_SEND_STATUS_FAIL;
}
FREE_AND_RESET(p_cm_to_agent_notify_cn);
return CM_SUCCESS;
}
#if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS)))
status_t CheckDnSendRecv(uint32 i, int j, int needDoGsGuc, const cm_instance_role_status *dnRole)
{
const long printInterval = 60;
cm_instance_datanode_report_status *dnReport =
&(g_instance_group_report_status_ptr[i].instance_status.data_node_member[j]);
int senderPercent = dnReport->sender_status[0].sync_percent;
int receiverPercent = dnReport->receive_status.sync_percent;
if ((senderPercent < SYNC_FINISH_PERCENT && receiverPercent < SYNC_FINISH_PERCENT &&
dnReport->local_status.local_role != INSTANCE_ROLE_UNKNOWN) || needDoGsGuc == CAN_NOT_SEND_SYNC_lIST) {
int32 logLevel = LOG;
cmTime_t checkEnd;
(void)clock_gettime(CLOCK_MONOTONIC, &checkEnd);
bool res = ((needDoGsGuc == CAN_NOT_SEND_SYNC_lIST) || (dnRole->role == INSTANCE_ROLE_DUMMY_STANDBY) ||
(needDoGsGuc == NOT_NEED_TO_SEND_SYNC_LIST) ||
(checkEnd.tv_sec - dnReport->printBegin.tv_sec < printInterval));
if (res) {
logLevel = DEBUG1;
} else {
(void)clock_gettime(CLOCK_MONOTONIC, &(dnReport->printBegin));
}
write_runlog(logLevel,
"instance %u, sender percent %d, receive percent %d, needDoGsGuc is %d, will do nothing.\n",
dnRole->instanceId, senderPercent, receiverPercent, needDoGsGuc);
return CM_ERROR;
}
return CM_SUCCESS;
}
void SendSyncListMsg(MsgRecvInfo* recvMsgInfo, uint32 nodeId, uint32 i, const cm_instance_role_status *dnRole,
const DatanodeSyncList *exceptSyncList)
{
CmToAgentGsGucSyncList cmToAgentSyncListContent = {0};
cmToAgentSyncListContent.msgType = (int)MSG_CM_AGENT_DN_SYNC_LIST;
cmToAgentSyncListContent.node = nodeId;
cmToAgentSyncListContent.instanceId = dnRole->instanceId;
cmToAgentSyncListContent.instanceNum = g_instance_role_group_ptr[i].count;
cmToAgentSyncListContent.groupIndex = i;
error_t rc = memcpy_s(
&(cmToAgentSyncListContent.dnSyncList), sizeof(DatanodeSyncList), exceptSyncList, sizeof(DatanodeSyncList));
securec_check_errno(rc, (void)rc);
char dnSyncListStr[MAX_PATH_LEN] = {0};
GetSyncListString(&(cmToAgentSyncListContent.dnSyncList), dnSyncListStr, sizeof(dnSyncListStr));
WriteKeyEventLog(KEY_EVENT_RELOAD_GS_GUC, dnRole->instanceId,
"send reload gs guc msg, instanceId is %u, msgtype is %d, SyncList is [%s]",
dnRole->instanceId, cmToAgentSyncListContent.msgType, dnSyncListStr);
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&cmToAgentSyncListContent), sizeof(CmToAgentGsGucSyncList));
return;
}
void PutGsGucContent(MsgRecvInfo* recvMsgInfo, synchronous_standby_mode standbyMode, uint32 nodeId,
const cm_instance_role_status *dnRole)
{
cm_to_agent_gs_guc cm_to_agent_gs_guc_content;
cm_to_agent_gs_guc_content.msg_type = MSG_CM_AGENT_GS_GUC;
cm_to_agent_gs_guc_content.type = standbyMode;
cm_to_agent_gs_guc_content.node = nodeId;
cm_to_agent_gs_guc_content.instanceId = dnRole->instanceId;
WriteKeyEventLog(KEY_EVENT_RELOAD_GS_GUC, dnRole->instanceId,
"send reload gs guc msg, instanceId is %u, type is %d", dnRole->instanceId,
(int)cm_to_agent_gs_guc_content.type);
(void)RespondMsg(recvMsgInfo,
'S', (char *)(&cm_to_agent_gs_guc_content), sizeof(cm_to_agent_gs_guc_content));
return;
}
#endif
void CmToAgentMsg(MsgRecvInfo* recvMsgInfo, int msgType)
{
uint32 i;
int j;
status_t ret = CM_SUCCESS;
bool doSwitchoverFast = false;
if (recvMsgInfo->connID.remoteType != CM_AGENT) {
return;
}
for (i = 0; i < g_dynamic_header->relationCount; i++) {
cm_instance_report_status *instStatus = &g_instance_group_report_status_ptr[i].instance_status;
for (j = 0; j < g_instance_role_group_ptr[i].count; j++) {
const cm_instance_role_status *instInfo = &g_instance_role_group_ptr[i].instanceMember[j];
if ((instInfo->node == recvMsgInfo->connID.agentNodeId) &&
((instStatus->command_member[j].command_status == INSTANCE_COMMAND_WAIT_EXEC) ||
!SwitchoverStatusCheck(i, j))) {
(void)pthread_rwlock_wrlock(&(g_instance_group_report_status_ptr[i].lk_lock));
switch (instStatus->command_member[j].pengding_command) {
case MSG_CM_AGENT_SWITCHOVER:
cm_to_agent_switchover switchoverMsg;
doSwitchoverFast = instStatus->command_member[j].msgProcFlag & MPF_DO_SWITCHOVER;
ret = GetSwitchOverMsg(i, j, doSwitchoverFast, &switchoverMsg);
if (ret != CM_SUCCESS) {
break;
}
ProcessSwitchOverMsg(recvMsgInfo, i, j, &switchoverMsg);
break;
case MSG_CM_AGENT_BUILD:
ProcessBuildMsg(recvMsgInfo, i, j);
break;
case MSG_CM_AGENT_NOTIFY_CN:
if (msgType == MSG_AGENT_CM_COORDINATE_INSTANCE_STATUS) {
int count = GetNotifyCnCount(instStatus, instInfo);
ret = ProcessNotifyMsg(recvMsgInfo, i, j, count);
if (ret != CM_SUCCESS) {
(void)pthread_rwlock_unlock(&(g_instance_group_report_status_ptr[i].lk_lock));
return;
}
break;
} else {
*/
break;
}
default:
write_runlog(LOG, "unknown command is %d \n", instStatus->command_member[j].pengding_command);
(void)pthread_rwlock_unlock(&(g_instance_group_report_status_ptr[i].lk_lock));
return;
}
(void)pthread_rwlock_unlock(&(g_instance_group_report_status_ptr[i].lk_lock));
}
}
#if ((defined(ENABLE_MULTIPLE_NODES)) || (defined(ENABLE_PRIVATEGAUSS)))
if (msgType == MSG_AGENT_CM_DATA_INSTANCE_REPORT_STATUS &&
enable_az_auto_switchover != 0) {
if (IsBoolCmParamTrue(g_enableDcf)) {
write_runlog(DEBUG1, "working in dcf mode, return.\n");
continue;
}
for (j = 0; j < g_instance_role_group_ptr[i].count; j++) {
cm_instance_role_status *dnRole = &(g_instance_role_group_ptr[i].instanceMember[j]);
uint32 nodeId = dnRole->node;
if (nodeId != recvMsgInfo->connID.agentNodeId) {
continue;
}
if (g_instance_role_group_ptr[i].instanceMember[0].instanceType != INSTANCE_TYPE_DATANODE) {
break;
}
if (!IsDnSyncListVaild(i, NULL)) {
break;
}
synchronous_standby_mode standbyMode = instStatus->data_node_member[j].sync_standby_mode;
int needDoGsGuc = NeedDoGsGuc(i, j);
write_runlog(DEBUG1, "instanceId is %u, needDoGsGuc is %d.\n", dnRole->instanceId, needDoGsGuc);
if ((standbyMode == AnyFirstNo && needDoGsGuc == SEND_AZ_SYNC_LIST) ||
(needDoGsGuc != NEED_TO_SEND_SYNC_LIST && needDoGsGuc != SEND_AZ_SYNC_LIST)) {
continue;
}
ret = CheckDnSendRecv(i, j, needDoGsGuc, dnRole);
if (ret != CM_SUCCESS) {
break;
}
DatanodeSyncList exceptSyncList = instStatus->exceptSyncList;
if (needDoGsGuc == NEED_TO_SEND_SYNC_LIST) {
if (instStatus->data_node_member[j].send_gs_guc_time < CM_GS_GUC_SEND_INTERVAL) {
continue;
}
if (instStatus->data_node_member[j].local_status.local_role == INSTANCE_ROLE_UNKNOWN) {
continue;
}
instStatus->data_node_member[j].send_gs_guc_time = 0;
SendSyncListMsg(recvMsgInfo, nodeId, i, dnRole, &exceptSyncList);
continue;
}
if (standbyMode != AnyFirstNo && (nodeId == recvMsgInfo->connID.agentNodeId)) {
bool dnGroupAvailable = isDnGroupAvailable(i, j);
if (!dnGroupAvailable) {
write_runlog(LOG,
"instance %u: dnAvailable: %d, will do nothing.\n",
dnRole->instanceId,
dnGroupAvailable);
break;
}
if (instStatus->data_node_member[j].send_gs_guc_time < CM_GS_GUC_SEND_INTERVAL) {
continue;
}
instStatus->data_node_member[j].send_gs_guc_time = 0;
PutGsGucContent(recvMsgInfo, standbyMode, nodeId, dnRole);
}
}
}
#endif
}
}
static bool SwitchoverStatusCheck(uint32 group_index, int member_index)
{
if (g_instance_group_report_status_ptr[group_index]
.instance_status.command_member[member_index].command_send_times < 0) {
g_instance_group_report_status_ptr[group_index].instance_status.command_member[member_index]
.command_send_times = 0;
}
if (g_instance_group_report_status_ptr[group_index].instance_status.command_member[member_index].pengding_command !=
MSG_CM_AGENT_SWITCHOVER) {
return true;
}
if (g_instance_group_report_status_ptr[group_index]
.instance_status.command_member[member_index]
.command_send_times >= SWITCHOVER_SEND_CHECK_RATE &&
g_instance_group_report_status_ptr[group_index]
.instance_status.data_node_member[member_index]
.local_status.db_state != INSTANCE_HA_STATE_WAITING &&
g_instance_group_report_status_ptr[group_index]
.instance_status.data_node_member[member_index]
.local_status.db_state != INSTANCE_HA_STATE_PROMOTING &&
IsArchiveMaxSendTimes(group_index, member_index) &&
g_instance_role_group_ptr[group_index].instanceMember[member_index].instanceType == INSTANCE_TYPE_DATANODE) {
g_instance_group_report_status_ptr[group_index]
.instance_status.command_member[member_index]
.command_send_times = 0;
return false;
}
return true;
}
uint32 find_nodeindex_by_nodeid(uint32 nodeId, int instanceType)
{
uint32 index = 0;
bool findIndex = false;
for (uint32 i = 0; i < g_node_num; i++) {
if (instanceType == PROCESS_ETCD) {
if (g_node[i].etcd && g_node[i].node == nodeId) {
findIndex = true;
break;
}
if (g_node[i].etcd) {
++index;
}
} else if (instanceType == PROCESS_CMSERVER) {
if (g_node[i].cmServerLevel == 1 && g_node[i].node == nodeId) {
findIndex = true;
break;
}
if (g_node[i].cmServerLevel == 1) {
++index;
}
} else {
write_runlog(ERROR, "unexpected replicat number %u.\n", index);
continue;
}
}
if (index > CM_PRIMARY_STANDBY_NUM || !findIndex) {
write_runlog(ERROR, "unexpected replicat number %u.\n", index);
}
return index;
}
void process_to_query_instance_status_msg(
MsgRecvInfo* recvMsgInfo, const cm_query_instance_status *query_status_ptr)
{
cm_query_instance_status cm_query_instance_status_content;
if (query_status_ptr->msg_step == QUERY_STATUS_CMSERVER_STEP) {
cm_query_instance_status_content.msg_type = MSG_CM_QUERY_INSTANCE_STATUS;
cm_query_instance_status_content.nodeId = query_status_ptr->nodeId;
cm_query_instance_status_content.instanceType = query_status_ptr->instanceType;
cm_query_instance_status_content.msg_step = QUERY_STATUS_CMSERVER_STEP;
if (cm_query_instance_status_content.instanceType == PROCESS_ETCD) {
uint32 index = find_nodeindex_by_nodeid(cm_query_instance_status_content.nodeId, PROCESS_ETCD);
cm_query_instance_status_content.status = g_instance_status_for_etcd[index];
} else if (cm_query_instance_status_content.instanceType == PROCESS_CMSERVER) {
uint32 index = find_nodeindex_by_nodeid(cm_query_instance_status_content.nodeId, PROCESS_CMSERVER);
cm_query_instance_status_content.status = g_instance_status_for_cm_server[index];
cm_query_instance_status_content.pending = g_instance_status_for_cm_server_pending[index];
} else {
write_runlog(ERROR, "unknown instance type %u for query instance status.\n",
cm_query_instance_status_content.instanceType);
}
(void)RespondMsg(recvMsgInfo,
'S',
(char *)&(cm_query_instance_status_content),
sizeof(cm_query_instance_status_content),
DEBUG5);
} else if (query_status_ptr->msg_step == QUERY_STATUS_CMAGENT_STEP) {
(void)pthread_rwlock_wrlock(&instance_status_rwlock);
if (query_status_ptr->instanceType == PROCESS_ETCD) {
uint32 index = find_nodeindex_by_nodeid(query_status_ptr->nodeId, PROCESS_ETCD);
if (query_status_ptr->status == CM_ETCD_LEADER) {
for (uint32 i = 0; i < g_etcd_num; i++) {
if (i != index && g_instance_status_for_etcd[i] == CM_ETCD_LEADER) {
g_instance_status_for_etcd[i] = CM_ETCD_FOLLOWER;
g_instance_status_for_etcd[index] = CM_ETCD_LEADER;
}
}
}
g_instance_status_for_etcd[index] = query_status_ptr->status;
g_instance_status_for_etcd_timeout[index] = cmserver_and_etcd_instance_status_for_timeout;
} else if (query_status_ptr->instanceType == PROCESS_CMSERVER) {
uint32 index = find_nodeindex_by_nodeid(query_status_ptr->nodeId, PROCESS_CMSERVER);
if (g_HA_status->local_role == CM_SERVER_PRIMARY && g_currentNode->node == query_status_ptr->nodeId) {
g_instance_status_for_cm_server[index] = CM_SERVER_PRIMARY;
} else {
g_instance_status_for_cm_server[index] = query_status_ptr->status;
}
g_instance_status_for_cm_server_pending[index] = query_status_ptr->pending;
g_instance_status_for_cm_server_timeout[index] = cmserver_and_etcd_instance_status_for_timeout;
} else {
write_runlog(
ERROR, "unknown instance type %u for query instance status.\n", query_status_ptr->instanceType);
}
(void)pthread_rwlock_unlock(&instance_status_rwlock);
}
}
void HotpatchInfoStateToString(unsigned int state, char *state_string, size_t length_state_string)
{
int ret;
const char *string_state[] = {
"UNKNOWN",
"UNLOAD",
"DEACTIVE",
"ACTIVE",
};
switch (state) {
case HP_STATE_ACTIVED:
case HP_STATE_DEACTIVE:
case HP_STATE_UNLOAD:
ret = snprintf_s(state_string, length_state_string, length_state_string - 1, "%s", string_state[state]);
securec_check_intval(ret, (void)ret);
break;
default:
ret = snprintf_s(state_string, length_state_string, length_state_string - 1, "%s", string_state[0]);
securec_check_intval(ret, (void)ret);
break;
}
return;
}
int HotpatchGetPatchInfo(const char *file_path, int *patch_number, PATCH_INFO_T **patch_info_all)
{
int max_patch_number;
char real_path[PATH_MAX] = {0};
PATCH_INFO_HEADER_T hp_header = {0};
if (file_path == NULL) {
return HP_ERROR_FILE_PATH_ERROR;
}
if (realpath(file_path, real_path) == NULL) {
return HP_ERROR_FILE_PATH_ERROR;
}
FILE *fp = fopen(real_path, "r");
if (fp == NULL) {
return HP_ERROR_FILE_OPEN_ERROR;
}
size_t ret = fread(&hp_header, sizeof(PATCH_INFO_HEADER_T), 1, fp);
if (ret != 1) {
(void)fclose(fp);
fp = NULL;
return HP_ERROR_FILE_READ_ERROR;
}
max_patch_number = hp_header.max_patch_number;
if ((max_patch_number <= 0) || (max_patch_number > g_max_number_patch)) {
(void)fclose(fp);
fp = NULL;
return HP_ERROR_PATCH_NUMBER_ERROR;
}
*patch_info_all = (PATCH_INFO_T *)malloc((size_t)max_patch_number * sizeof(PATCH_INFO_T));
if (*patch_info_all == NULL) {
(void)fclose(fp);
fp = NULL;
return HP_ERROR_SYSTEM_ERROR;
}
ret = fread(*patch_info_all, sizeof(PATCH_INFO_T), (size_t)max_patch_number, fp);
if ((int)ret != max_patch_number) {
free(*patch_info_all);
*patch_info_all = NULL;
(void)fclose(fp);
fp = NULL;
return HP_ERROR_PATCH_INFO_ERROR;
}
*patch_number = max_patch_number;
(void)fclose(fp);
fp = NULL;
return HP_OK;
}
#if defined (ENABLE_MULTIPLE_NODES) || defined (ENABLE_PRIVATEGAUSS)
void HotpatchReturnError(MsgRecvInfo* recvMsgInfo, int err_code)
{
char return_string[MAX_LENGTH_RETURN_STRING] = {0};
patch_err_info_handler(err_code, return_string, MAX_LENGTH_RETURN_STRING);
(void)RespondMsg(recvMsgInfo, 'S', return_string, strlen(return_string), DEBUG5);
}
void SendHotPatchMsg(MsgRecvInfo* recvMsgInfo, int patch_number, const PATCH_INFO_T *patch_info_all)
{
int ret;
char *patch_name = NULL;
char patch_state_string[g_length_statstr] = {0};
int str_start = 0;
int list_strlen;
list_strlen = patch_number * g_max_length_line;
char *return_string_list = (char *)malloc(list_strlen);
if (return_string_list == NULL) {
HotpatchReturnError(recvMsgInfo, (int32)HP_ERROR_SYSTEM_ERROR);
return;
}
for (int i = 0; i < patch_number; i++) {
HotpatchInfoStateToString(patch_info_all[i].patch_state, patch_state_string,
sizeof(patch_state_string));
patch_name = strip_path_from_pathname(patch_info_all[i].patch_name);
if (patch_name == NULL) {
ret = snprintf_s(return_string_list + str_start,
list_strlen - str_start,
(list_strlen - str_start) - 1,
"PATCH: UNKNOW STATE: %s\n",
patch_state_string);
} else {
ret = snprintf_s(return_string_list + str_start,
list_strlen - str_start,
(list_strlen - str_start) - 1,
"PATCH: %s STATE: %s\n",
patch_name,
patch_state_string);
}
securec_check_intval(ret, (void)ret);
str_start = str_start + ret;
}
ret = strcat_s(return_string_list, list_strlen, "[PATCH-SUCCESS] : LIST PATCH\n");
securec_check_errno(ret, (void)ret);
(void)RespondMsg(recvMsgInfo, 'S', return_string_list, strlen(return_string_list), DEBUG5);
free(return_string_list);
return;
}
void HotpatchReturnList(MsgRecvInfo* recvMsgInfo)
{
int ret;
int patch_number;
char patch_dir[MAX_PATH_LEN] = {0};
PATCH_INFO_T *patch_info_all = NULL;
if (cm_server_dataDir == NULL) {
HotpatchReturnError(recvMsgInfo, (int32)HP_ERROR_CM_DATADIR_NULL);
return;
}
ret = snprintf_s(patch_dir, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%s/hotpatch/patch.info", cm_server_dataDir);
securec_check_intval(ret, (void)ret);
ret = HotpatchGetPatchInfo(patch_dir, &patch_number, &patch_info_all);
if (ret != HP_OK) {
HotpatchReturnError(recvMsgInfo, ret);
return;
}
SendHotPatchMsg(recvMsgInfo, patch_number, patch_info_all);
free(patch_info_all);
patch_info_all = NULL;
return;
}
#endif
void ProcessHotpatchMessage(MsgRecvInfo* recvMsgInfo, cm_hotpatch_msg *hotpatch_msg)
{
#if defined (ENABLE_MULTIPLE_NODES) || defined (ENABLE_PRIVATEGAUSS)
int ret;
bool is_list = false;
char return_string[MAX_LENGTH_RETURN_STRING] = {0};
hotpatch_msg->command[MAX_LENGTH_HP_CMD - 1] = '\0';
if (strncmp(hotpatch_msg->command, "list", g_length_list) == 0) {
is_list = true;
ret = exec_hotpatch_command(hotpatch_msg->path, "check", return_string, MAX_LENGTH_RETURN_STRING);
} else {
ret = exec_hotpatch_command(hotpatch_msg->path, hotpatch_msg->command, return_string, MAX_LENGTH_RETURN_STRING);
}
if (ret != HP_OK) {
write_runlog(LOG, "hotpatch exec error. ret is %d\n", ret);
}
if (is_list && (strncmp(return_string, "[PATCH-SUCCESS]", g_length_okstr) == 0)) {
HotpatchReturnList(recvMsgInfo);
} else if (is_list && (strstr(return_string, "No patch loaded now") != NULL)) {
ret = strcpy_s(return_string, sizeof(return_string), "[PATCH-SUCCESS] LIST PATCH , NO PATCH LOAD!\n");
securec_check_errno(ret, (void)ret);
(void)RespondMsg(recvMsgInfo, 'S', return_string, strlen(return_string), DEBUG5);
} else {
(void)RespondMsg(recvMsgInfo, 'S', return_string, strlen(return_string), DEBUG5);
}
#endif
return;
}
void ProcessStopArbitrationMessage(void)
{
ctl_stop_cluster_server_halt_arbitration_timeout = (uint32)ctl_stop_cluster_server_halt_arbitration_timeout_init;
write_runlog(WARNING,
"Received stop arbitration from cm_ctl, meaning that cm_ctl is about to running "
"a full stop-cluster. stopping arbitration for %u\n",
ctl_stop_cluster_server_halt_arbitration_timeout);
}
void process_finish_redo_check_message(MsgRecvInfo* recvMsgInfo)
{
cm_to_ctl_finish_redo_check_ack msg_finish_redo_check_ack;
msg_finish_redo_check_ack.msg_type = MSG_CM_CTL_FINISH_REDO_CHECK_ACK;
msg_finish_redo_check_ack.finish_redo_count = FinishRedoCheck();
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&msg_finish_redo_check_ack), sizeof(msg_finish_redo_check_ack));
}
void process_finish_redo_message(MsgRecvInfo* recvMsgInfo)
{
cm_msg_type msgFinishRedoAck;
if (backup_open != CLUSTER_PRIMARY) {
msgFinishRedoAck.msg_type = MSG_CM_CTL_BACKUP_OPEN;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgFinishRedoAck), sizeof(cm_msg_type));
return;
}
do_finish_redo = true;
if (undocumentedVersion == 0 || undocumentedVersion >= 92214) {
char status_key[MAX_PATH_LEN] = {0};
char value[MAX_PATH_LEN] = {0};
errno_t rc = memset_s(value, sizeof(value), '2', sizeof(value) - 1);
securec_check_errno(rc, (void)rc);
rc = snprintf_s(status_key, MAX_PATH_LEN, MAX_PATH_LEN - 1, "/%s/finish_redo_status", pw->pw_name);
securec_check_intval(rc, (void)rc);
for (uint32 group_index = 0; group_index < g_dynamic_header->relationCount; group_index++) {
if (g_instance_role_group_ptr[group_index].instanceMember[0].instanceType == INSTANCE_TYPE_DATANODE) {
(void)pthread_rwlock_wrlock(&(g_instance_group_report_status_ptr[group_index].lk_lock));
g_instance_group_report_status_ptr[group_index].instance_status.finish_redo = true;
(void)pthread_rwlock_unlock(&(g_instance_group_report_status_ptr[group_index].lk_lock));
uint32 dn_index = g_instance_role_group_ptr[group_index].instanceMember[0].instanceId - 6001;
value[dn_index] = '1';
ReportForceFinishRedoAlarm(group_index, 0, false);
}
}
(void)pthread_rwlock_wrlock(&g_finish_redo_rwlock);
status_t st = SetKV2Ddb(status_key, MAX_PATH_LEN, value, MAX_PATH_LEN, NULL);
if (st != CM_SUCCESS) {
write_runlog(
ERROR, "%d: Ddb set finish_redos flag failed. key = %s,value = %s.\n", __LINE__, status_key, value);
(void)pthread_rwlock_unlock(&g_finish_redo_rwlock);
return;
}
(void)pthread_rwlock_unlock(&g_finish_redo_rwlock);
write_runlog(LOG, "%d: Ddb set finish_redos flag success. key = %s,value = %s.\n", __LINE__, status_key, value);
} else {
char statusValue[MAX_PATH_LEN] = {0};
errno_t rc = strcpy_s(statusValue, MAX_PATH_LEN, "true");
securec_check_errno(rc, (void)rc);
for (uint32 i = 0; i < g_dynamic_header->relationCount; i++) {
(void)pthread_rwlock_wrlock(&(g_instance_group_report_status_ptr[i].lk_lock));
if (g_instance_role_group_ptr[i].count > 0 &&
g_instance_role_group_ptr[i].instanceMember[0].instanceType == INSTANCE_TYPE_DATANODE) {
g_instance_group_report_status_ptr[i].instance_status.finish_redo = true;
}
(void)pthread_rwlock_unlock(&(g_instance_group_report_status_ptr[i].lk_lock));
ReportForceFinishRedoAlarm(i, 0, false);
char status_key[MAX_PATH_LEN] = {0};
rc = snprintf_s(status_key, MAX_PATH_LEN, MAX_PATH_LEN - 1, "/%s/finish_redo/%u", pw->pw_name, i);
securec_check_intval(rc, (void)rc);
(void)pthread_rwlock_wrlock(&g_finish_redo_rwlock);
status_t st = SetKV2Ddb(status_key, MAX_PATH_LEN, statusValue, (uint32)MAX_PATH_LEN, NULL);
if (st != CM_SUCCESS) {
write_runlog(
ERROR, "%d: Ddb set finish redo flag failed, key = %s, value = true.\n", __LINE__, status_key);
(void)pthread_rwlock_unlock(&g_finish_redo_rwlock);
continue;
}
(void)pthread_rwlock_unlock(&g_finish_redo_rwlock);
write_runlog(LOG, "line %d: Finish redo flag has been set to ddb in group %u.\n", __LINE__, i);
}
}
write_runlog(LOG, "Finish redo flag has been set to true.\n");
msgFinishRedoAck.msg_type = MSG_CM_CTL_FINISH_REDO_ACK;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&msgFinishRedoAck), sizeof(msgFinishRedoAck));
}
void process_finish_switchover_message(MsgRecvInfo* recvMsgInfo)
{
CleanSwitchoverCommand();
}
void FlushCmToAgentMsg(MsgRecvInfo* recvMsgInfo, int msgType)
{
CmToAgentMsg(recvMsgInfo, msgType);
}
status_t GetAgentDataReportMsg(CM_StringInfo inBuffer, agent_to_cm_datanode_status_report *agentToCmDatanodeStatusPtr)
{
if (undocumentedVersion != 0 && undocumentedVersion < SUPPORT_IPV6_VERSION) {
const agent_to_cm_datanode_status_report_ipv4 *agent_to_cm_datanode_status_ptr_ipv4 =
(const agent_to_cm_datanode_status_report_ipv4 *)CmGetmsgbytes(inBuffer,
sizeof(agent_to_cm_datanode_status_report_ipv4));
if (agent_to_cm_datanode_status_ptr_ipv4 == NULL) {
write_runlog(ERROR, "MSG_AGENT_CM_DATA_INSTANCE_REPORT_STATUS is null. inBuffer->qtype: %d \n",
inBuffer->qtype);
return CM_ERROR;
}
AgentToCmDatanodeStatusReportV1ToV2(agent_to_cm_datanode_status_ptr_ipv4, agentToCmDatanodeStatusPtr);
} else {
const agent_to_cm_datanode_status_report *agent_to_cm_datanode_status_ptr =
(const agent_to_cm_datanode_status_report *)CmGetmsgbytes(inBuffer,
sizeof(agent_to_cm_datanode_status_report));
if (agent_to_cm_datanode_status_ptr == NULL) {
write_runlog(ERROR,
"MSG_AGENT_CM_DATA_INSTANCE_REPORT_STATUS is null. inBuffer->qtype: %d \n", inBuffer->qtype);
return CM_ERROR;
}
errno_t rc = memcpy_s(agentToCmDatanodeStatusPtr,
sizeof(agent_to_cm_datanode_status_report),
agent_to_cm_datanode_status_ptr,
sizeof(agent_to_cm_datanode_status_report));
securec_check_errno(rc, (void)rc);
}
return CM_SUCCESS;
}
void SetAgentDataReportMsg(MsgRecvInfo* recvMsgInfo, CM_StringInfo inBuffer)
{
agent_to_cm_datanode_status_report agent_to_cm_datanode_status_ptr = {0};
if (GetAgentDataReportMsg(inBuffer, &agent_to_cm_datanode_status_ptr) != CM_SUCCESS) {
return;
}
if (agent_to_cm_datanode_status_ptr.node != recvMsgInfo->connID.agentNodeId) {
write_runlog(ERROR,
"Reject datanode status report from invalid agent connection: "
"connNode=%u, reportNode=%u, instanceId=%u, remoteType=%d.\n",
recvMsgInfo->connID.agentNodeId,
agent_to_cm_datanode_status_ptr.node,
agent_to_cm_datanode_status_ptr.instanceId,
recvMsgInfo->connID.remoteType);
return;
}
agent_to_cm_datanode_status_ptr.local_status.disconn_host[CM_IP_LENGTH - 1] = '\0';
agent_to_cm_datanode_status_ptr.local_status.local_host[CM_IP_LENGTH - 1] = '\0';
if (!g_inReload) {
if (agent_to_cm_datanode_status_ptr.instanceType != INSTANCE_TYPE_DATANODE) {
write_runlog(ERROR,
"Instance type %d not equal INSTANCE_TYPE_DATANODE(%d) ! Maybe the msg_type send by remote "
"cm_agent not match with local cm_server."
"Check whether the version of cm_agent and cm_server on node %u is the same as localhost. "
"msg_type=%d, node=%u, instanceId=%u, instanceType=%d, connectStatus=%d, processStatus=%d \n",
agent_to_cm_datanode_status_ptr.instanceType,
INSTANCE_TYPE_DATANODE, agent_to_cm_datanode_status_ptr.node,
agent_to_cm_datanode_status_ptr.msg_type, agent_to_cm_datanode_status_ptr.node,
agent_to_cm_datanode_status_ptr.instanceId, agent_to_cm_datanode_status_ptr.instanceType,
agent_to_cm_datanode_status_ptr.connectStatus, agent_to_cm_datanode_status_ptr.processStatus);
return;
}
g_loopState.execStatus[0] = 0;
struct timeval checkBegin = {0, 0};
struct timeval checkEnd = {0, 0};
(void)gettimeofday(&checkBegin, NULL);
if (g_single_node_cluster) {
datanode_instance_arbitrate_single(recvMsgInfo, &agent_to_cm_datanode_status_ptr);
} else if (g_multi_az_cluster) {
DatanodeInstanceArbitrate(recvMsgInfo, &agent_to_cm_datanode_status_ptr);
} else {
datanode_instance_arbitrate_for_psd(recvMsgInfo, &agent_to_cm_datanode_status_ptr);
}
(void)gettimeofday(&checkEnd, NULL);
if ((checkEnd.tv_sec - checkBegin.tv_sec) >= 2) {
write_runlog(LOG,
"it take %llu seconds for DN arbitrate.\n",
(unsigned long long)GetTimeMinus(checkEnd, checkBegin));
}
g_loopState.execStatus[0] = 1;
}
return;
}
static void DnBarrierReportRespToCm(cm_to_agent_barrier_info *barrierRespMsg)
{
errno_t rc = memset_s(barrierRespMsg, sizeof(cm_to_agent_barrier_info), 0, sizeof(cm_to_agent_barrier_info));
securec_check_errno(rc, (void)rc);
if (pw == NULL || pw->pw_name == NULL) {
write_runlog(ERROR, "failed to get user name to make etcd key\n");
return;
}
if (g_queryBarrier[0] != '\0') {
rc = memcpy_s(barrierRespMsg->queryBarrier, BARRIERLEN - 1, g_queryBarrier, BARRIERLEN - 1);
securec_check_errno(rc, (void)rc);
}
rc = memcpy_s(barrierRespMsg->targetBarrier, BARRIERLEN - 1, g_targetBarrier, BARRIERLEN - 1);
securec_check_errno(rc, (void)rc);
}
static void SetDatanodeBarrierInfo(const AgentToCmBarrierStatusReport* barrierInfo)
{
uint32 node = barrierInfo->node;
uint32 instanceId = barrierInfo->instanceId;
uint32 groupIdx = 0;
int memIdx = 0;
int ret;
int rc;
ret = find_node_in_dynamic_configure(node, instanceId, &groupIdx, &memIdx);
if (ret != 0) {
write_runlog(LOG, "can't find the instance(node =%u instanceid =%u)\n", node, instanceId);
return;
}
(void)pthread_rwlock_wrlock(&(g_instance_group_report_status_ptr[groupIdx].lk_lock));
cm_instance_datanode_report_status *localRep =
&(g_instance_group_report_status_ptr[groupIdx].instance_status.data_node_member[memIdx]);
rc = memcpy_s(localRep->barrierID, BARRIERLEN, barrierInfo->barrierID, BARRIERLEN);
securec_check_errno(rc, (void)rc);
rc = memcpy_s(localRep->query_barrierId, BARRIERLEN, barrierInfo->query_barrierId, BARRIERLEN);
securec_check_errno(rc, (void)rc);
localRep->is_barrier_exist = barrierInfo->is_barrier_exist;
(void)pthread_rwlock_unlock(&(g_instance_group_report_status_ptr[groupIdx].lk_lock));
}
void ProcessDnBarrierinfo(MsgRecvInfo* recvMsgInfo, CM_StringInfo inBuffer)
{
AgentToCmBarrierStatusReport *barrierInfo =
(AgentToCmBarrierStatusReport *)CmGetmsgbytes(inBuffer, sizeof(AgentToCmBarrierStatusReport));
if (barrierInfo == NULL) {
write_runlog(ERROR, "MSG_AGENT_CM_DATA_INSTANCE_BARRIER is null. inBuffer->qtype: %d \n", inBuffer->qtype);
return;
}
barrierInfo->global_achive_barrierId[BARRIERLEN - 1] = '\0';
barrierInfo->global_barrierId[BARRIERLEN - 1] = '\0';
barrierInfo->query_barrierId[BARRIERLEN - 1] = '\0';
barrierInfo->barrierID[BARRIERLEN - 1] = '\0';
SetDatanodeBarrierInfo(barrierInfo);
cm_to_agent_barrier_info barrierRespMsg;
DnBarrierReportRespToCm(&barrierRespMsg);
barrierRespMsg.msg_type = (int)MSG_CM_AGENT_DATANODE_INSTANCE_BARRIER;
barrierRespMsg.instanceId = barrierInfo->instanceId;
barrierRespMsg.node = barrierInfo->node;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&barrierRespMsg), sizeof(cm_to_agent_barrier_info), DEBUG5);
return;
}
static status_t CheckDdbType(DDB_TYPE toDdbType)
{
if (g_dbType == DB_UNKOWN) {
write_runlog(LOG, "[switch] ddb type is unknown.\n");
return CM_ERROR;
}
bool isEtcdToDcc = (g_dbType == DB_ETCD) && (toDdbType == DB_DCC);
bool isDccToEtcd = (g_dbType == DB_DCC) && (toDdbType == DB_ETCD);
if (isEtcdToDcc || isDccToEtcd) {
write_runlog(LOG, "[switch] can do switch ddb to %d.\n", (int)toDdbType);
return CM_SUCCESS;
}
write_runlog(ERROR, "[switch] current ddb is %s, can't switch to %d.\n", GetDdbToString(g_dbType), (int)toDdbType);
return CM_ERROR;
}
static status_t CreateMaintainPath(const char *maintainFile)
{
std::string ddbStr = to_string(static_cast<int>(g_dbType));
FILE *fp = fopen(maintainFile, "w+");
if (fp == NULL) {
write_runlog(ERROR, "[switch] can't open file \"%s\", errno(%d).\n", maintainFile, errno);
return CM_ERROR;
}
(void)chmod(maintainFile, S_IRUSR | S_IWUSR);
if (fwrite(ddbStr.c_str(), ddbStr.size(), 1, fp) != 1) {
write_runlog(ERROR, "[switch] could not write file \"%s\", errno(%d)\n", maintainFile, errno);
(void)fclose(fp);
return CM_ERROR;
}
(void)fclose(fp);
return CM_SUCCESS;
}
static status_t SendMaintainFileToAllCms(const char *maintainFile)
{
int ret;
char cmd[CM_PATH_LENGTH] = {0};
for (uint32 i = 0; i < g_cm_server_num; ++i) {
if (g_node[g_nodeIndexForCmServer[i]].node == g_currentNode->node) {
continue;
}
if (GetIpVersion(g_node[g_nodeIndexForCmServer[i]].sshChannel[0]) == AF_INET6) {
ret = snprintf_s(cmd, CM_PATH_LENGTH, CM_PATH_LENGTH - 1, "scp %s %s@[%s]:%s", maintainFile, pw->pw_name,
g_node[g_nodeIndexForCmServer[i]].sshChannel[0], maintainFile);
securec_check_intval(ret, (void)ret);
} else {
ret = snprintf_s(cmd, CM_PATH_LENGTH, CM_PATH_LENGTH - 1, "scp %s %s@%s:%s", maintainFile, pw->pw_name,
g_node[g_nodeIndexForCmServer[i]].sshChannel[0], maintainFile);
securec_check_intval(ret, (void)ret);
}
ret = system(cmd);
if (ret != -1 && WEXITSTATUS(ret) == 0) {
write_runlog(LOG, "[switch] exec cmd(%s) success\n", cmd);
continue;
} else {
write_runlog(ERROR, "[switch] exec cmd(%s) failed, ret = %d, errno = %d.\n", cmd, WEXITSTATUS(ret), errno);
return CM_ERROR;
}
}
return CM_SUCCESS;
}
static status_t CreateMaintainFileInAllCms()
{
char maintainFile[CM_PATH_LENGTH] = {0};
if (GetMaintainPath(maintainFile, CM_PATH_LENGTH) != CM_SUCCESS) {
write_runlog(ERROR, "[switch] get maintain file path fail.\n");
return CM_ERROR;
}
if (CreateMaintainPath(maintainFile) != CM_SUCCESS) {
write_runlog(ERROR, "[switch] can't into maintain mode.\n");
return CM_ERROR;
}
if (SendMaintainFileToAllCms(maintainFile) != CM_SUCCESS) {
write_runlog(ERROR, "[switch] send maintain file fail.\n");
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t CreateCommitFlagFile()
{
FILE *fp;
errno_t rc;
char gausshomePath[CM_PATH_LENGTH] = {0};
char commitFlagFile[CM_PATH_LENGTH] = {0};
if (GetHomePath(gausshomePath, sizeof(gausshomePath)) != EOK) {
write_runlog(ERROR, "get GAUSSHOME env fail, errno(%d).\n", errno);
return CM_ERROR;
}
rc = snprintf_s(commitFlagFile, CM_PATH_LENGTH, CM_PATH_LENGTH - 1, "%s/bin/switch_commit_flag", gausshomePath);
securec_check_intval(rc, (void)rc);
canonicalize_path(commitFlagFile);
if ((fp = fopen(commitFlagFile, "w+")) == NULL) {
write_runlog(ERROR, "[switch] can't open file \"%s\", errno(%d).\n", commitFlagFile, errno);
return CM_ERROR;
}
(void)chmod(commitFlagFile, S_IRUSR | S_IWUSR);
(void)fclose(fp);
return CM_SUCCESS;
}
static status_t DeleteCommitFlag()
{
int ret;
char gausshome[CM_PATH_LENGTH] = {0};
char commitFlag[CM_PATH_LENGTH] = {0};
if (GetHomePath(gausshome, sizeof(gausshome)) != 0) {
write_runlog(ERROR, "get GAUSSHOME env fail, errno(%d).\n", errno);
return CM_ERROR;
}
ret = snprintf_s(commitFlag, CM_PATH_LENGTH, CM_PATH_LENGTH - 1, "%s/bin/switch_commit_flag", gausshome);
securec_check_intval(ret, (void)ret);
(void)unlink(commitFlag);
return CM_SUCCESS;
}
static status_t SetCmsConfParam(const DDB_TYPE ddb)
{
int ret;
char setCmd[CM_PATH_LENGTH] = {0};
if (ddb == DB_UNKOWN) {
write_runlog(ERROR, "[switch] set ddb is unknown.\n");
return CM_ERROR;
}
if (CreateCommitFlagFile() != CM_SUCCESS) {
write_runlog(ERROR, "[switch] create commit flag file fail.\n");
return CM_ERROR;
}
ret = snprintf_s(
setCmd,
CM_PATH_LENGTH,
CM_PATH_LENGTH - 1,
"nohup cm_ctl set --param --server -k \"ddb_type\"=\"%d\"",
(int)(ddb));
securec_check_intval(ret, (void)ret);
if (system(setCmd) != 0) {
write_runlog(ERROR, "[switch] exec set cmd(%s) fail.\n", setCmd);
return CM_ERROR;
}
if (DeleteCommitFlag() != CM_SUCCESS) {
write_runlog(ERROR, "[switch] delete commit file fail.\n");
return CM_ERROR;
}
write_runlog(LOG, "[switch] exec set cmd(%s) success.\n", setCmd);
return CM_SUCCESS;
}
static void ProcessEnterMaintainMode(const CtlToCmsSwitch *switchMsg, CmsToCtlSwitchAck *ackMsg)
{
errno_t rc;
DDB_TYPE toDdbType = GetStringToDdb(switchMsg->ddbType);
if (toDdbType == DB_ETCD && g_etcd_num == 0) {
ackMsg->isSuccess = false;
rc = strcpy_s(ackMsg->errMsg, CM_PATH_LENGTH,
"can't switch to ETCD, cause no etcd is configured in the cluster.");
securec_check_errno(rc, (void)rc);
return;
}
if (CheckDdbType(toDdbType) != CM_SUCCESS) {
ackMsg->isSuccess = false;
rc = snprintf_s(ackMsg->errMsg, CM_PATH_LENGTH, CM_PATH_LENGTH - 1,
"current ddb is %s, can't switch to %s, you can switch to DCC or ETCD.",
GetDdbToString(g_dbType), switchMsg->ddbType);
securec_check_intval(rc, (void)rc);
return;
}
if (CreateMaintainFileInAllCms() != CM_SUCCESS) {
ackMsg->isSuccess = false;
rc = strcpy_s(ackMsg->errMsg, CM_PATH_LENGTH, "create maintain files fail.");
securec_check_errno(rc, (void)rc);
return;
}
ackMsg->isSuccess = true;
return;
}
static status_t SwitchSaveAllKVS()
{
char kvFile[MAX_PATH_LEN] = {0};
DDB_RESULT dbResult = SUCCESS_GET_VALUE;
DrvSaveOption option = {0};
if (GetDdbKVFilePath(kvFile, MAX_PATH_LEN) != CM_SUCCESS) {
write_runlog(ERROR, "[switch] can't get kv file path.\n");
return CM_ERROR;
}
option.kvFile = kvFile;
if (SaveAllKVFromDDb(&dbResult, &option) != CM_SUCCESS) {
write_runlog(ERROR, "[switch] can't save all KV, error msg is %d.\n", (int)dbResult);
return CM_ERROR;
}
return CM_SUCCESS;
}
static void ProcessSaveAllKVS(const char *ddbType, CmsToCtlSwitchAck *ackMsg)
{
errno_t rc;
if (SwitchSaveAllKVS() != CM_SUCCESS) {
ackMsg->isSuccess = false;
rc = strcpy_s(ackMsg->errMsg, CM_PATH_LENGTH, "save all kvs from ddb fail.");
securec_check_errno(rc, (void)rc);
return;
}
if (SetCmsConfParam(GetStringToDdb(ddbType)) != CM_SUCCESS) {
ackMsg->isSuccess = false;
rc = strcpy_s(ackMsg->errMsg, CM_PATH_LENGTH, "set cm_server.conf ddb_type param fail.");
securec_check_errno(rc, (void)rc);
return;
}
ackMsg->isSuccess = true;
return;
}
static status_t GetMaintainFromOtherCms(const char *kvFile)
{
int ret;
int index = -1;
char cmd[CM_PATH_LENGTH] = {0};
for (uint32 i = 0; i < g_cm_server_num; ++i) {
if (g_node[i].node == g_currentNode->node) {
continue;
}
ret = snprintf_s(cmd,
CM_PATH_LENGTH,
CM_PATH_LENGTH - 1,
"pssh %s -H %s \" stat %s \" > /dev/null 2>&1",
PSSH_TIMEOUT_OPTION,
g_node[i].sshChannel[0],
kvFile);
securec_check_intval(ret, (void)ret);
ret = system(cmd);
if (ret != -1 && WEXITSTATUS(ret) == 0) {
write_runlog(LOG, "[switch] node(%u) exist maintain, exec cmd(%s).\n", i, cmd);
index = (int)i;
break;
}
write_runlog(LOG, "[switch] node(%u) exist no maintain, exec cmd(%s).\n", i, cmd);
}
if (index == -1) {
write_runlog(ERROR, "no cms has maintain file.\n");
return CM_ERROR;
}
ret = memset_s(cmd, sizeof(cmd), 0, sizeof(cmd));
securec_check_errno(ret, (void)ret);
if (GetIpVersion(g_node[index].sshChannel[0]) == AF_INET6) {
ret = snprintf_s(cmd, CM_PATH_LENGTH, CM_PATH_LENGTH - 1, "scp %s@[%s]:%s %s > /dev/null 2>&1",
pw->pw_name, g_node[index].sshChannel[0], kvFile, kvFile);
securec_check_intval(ret, (void)ret);
} else {
ret = snprintf_s(cmd, CM_PATH_LENGTH, CM_PATH_LENGTH - 1, "scp %s@%s:%s %s > /dev/null 2>&1",
pw->pw_name, g_node[index].sshChannel[0], kvFile, kvFile);
securec_check_intval(ret, (void)ret);
}
ret = system(cmd);
if (ret != -1 && WEXITSTATUS(ret) == 0) {
write_runlog(LOG, "[switch] exec cmd(%s) success.\n", cmd);
return CM_SUCCESS;
}
write_runlog(ERROR, "[switch] exec cmd(%s) fail, ret = %d, errno = %d.\n", cmd, WEXITSTATUS(ret), errno);
return CM_ERROR;
}
static status_t GetKVFile(char *kvFile, uint32 len)
{
struct stat buf = {0};
if (GetDdbKVFilePath(kvFile, len) != CM_SUCCESS) {
return CM_ERROR;
}
if (stat(kvFile, &buf) != 0) {
if (GetMaintainFromOtherCms(kvFile) != CM_SUCCESS) {
write_runlog(ERROR, "[switch] no kv file, can't do switch ddb.\n");
return CM_ERROR;
}
}
return CM_SUCCESS;
}
static inline void ModifierLine(char *line)
{
char *find = strchr(line, '\n');
if (find != NULL) {
*find = '\0';
}
return;
}
static void ProcessSwitchDdb(CmsToCtlSwitchAck *ackMsg)
{
errno_t rc;
DrvSetOption opt = {0};
char kvFile[CM_PATH_LENGTH] = {0};
if (GetKVFile(kvFile, CM_PATH_LENGTH) != CM_SUCCESS) {
ackMsg->isSuccess = false;
rc = strcpy_s(ackMsg->errMsg, CM_PATH_LENGTH, "no kv file exist.");
securec_check_errno(rc, (void)rc);
return;
}
char **lines = CmReadfile(kvFile);
if (lines == NULL) {
write_runlog(ERROR, "read kv file failed, errno(%d).\n", errno);
rc = strcpy_s(ackMsg->errMsg, CM_PATH_LENGTH, "read kv file failed.");
securec_check_errno(rc, (void)rc);
return;
}
int i = 0;
opt.maintainCanSet = true;
opt.isSetBinary = false;
while (lines[i] != NULL && lines[i + 1] != NULL) {
ModifierLine(lines[i]);
ModifierLine(lines[i + 1]);
if (SetKV2Ddb(lines[i], DDB_KEY_LEN, lines[i + 1], DDB_VALUE_LEN, &opt) != CM_SUCCESS) {
ackMsg->isSuccess = false;
rc = strcpy_s(ackMsg->errMsg, CM_PATH_LENGTH, "put all kv to new ddb fail.");
securec_check_errno(rc, (void)rc);
freefile(lines);
return;
}
i += KV_POS;
}
ackMsg->isSuccess = true;
freefile(lines);
return;
}
void ProcessCtlToCmsSwitchMsg(MsgRecvInfo* recvMsgInfo, CtlToCmsSwitch *switchMsg)
{
CmsToCtlSwitchAck ackMsg;
switchMsg->ddbType[CM_PATH_LENGTH - 1] = '\0';
switch (switchMsg->step) {
case SWITCH_DDB_ENTER_MAINTAIN:
ProcessEnterMaintainMode(switchMsg, &ackMsg);
break;
case SWITCH_DDB_SAVE_ALL_KVS:
ProcessSaveAllKVS(switchMsg->ddbType, &ackMsg);
break;
case SWITCH_DDB:
ProcessSwitchDdb(&ackMsg);
break;
case UNKNOWN_STEP:
default:
break;
}
ackMsg.msgType = (int)MSG_CMS_CTL_SWITCH_ACK;
(void)RespondMsg(recvMsgInfo, 'S', (char *)(&ackMsg), sizeof(ackMsg));
return;
}