* Copyright (c) Huawei Technologies Co., Ltd. 2022. All rights reserved.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cms_process_messages_agent.cpp
*
*
* IDENTIFICATION
* src/cm_server/cms_process_messages_agent.cpp
*
* -------------------------------------------------------------------------
*/
#include "cms_conn.h"
#include "cms_cus_res.h"
#include "cms_common_res.h"
#include "cms_ddb_adapter.h"
#include "cms_global_params.h"
#include "cms_process_messages.h"
#include "cm_misc_res.h"
typedef struct ResStatReportInfoSt {
uint32 nodeId;
MaxClusterResStatus isAvail;
} ResStatReportInfo;
typedef struct OneResStatReportInterSt {
uint32 nodeId;
uint32 cmInstId;
uint32 statReportInter;
} OneResStatReportInter;
typedef struct ResStatReportInterSt {
char resName[CM_MAX_RES_NAME];
uint32 instCount;
OneResStatReportInter resReport[CM_MAX_RES_INST_COUNT];
} ResStatReportInter;
static ResStatReportInfo *g_resNodeStat = NULL;
static ResStatReportInter *g_resInstReport = NULL;
void ProcessReportResChangedMsg(bool notifyClient, const OneResStatList *status)
{
CmsReportResStatList sendMsg = {0};
sendMsg.msgType = notifyClient ? (int)MSG_CM_AGENT_RES_STATUS_CHANGED : (int)MSG_CM_AGENT_RES_STATUS_LIST;
errno_t rc = memcpy_s(&sendMsg.resList, sizeof(OneResStatList), status, sizeof(OneResStatList));
securec_check_errno(rc, (void)rc);
(void)BroadcastMsg('S', (char *)(&sendMsg), sizeof(CmsReportResStatList));
}
uint32 ResCheckResultTransToResStat(uint32 recvStat)
{
if (recvStat == CUS_RES_CHECK_STAT_ONLINE) {
return (uint32)CM_RES_STAT_ONLINE;
}
if (recvStat == CUS_RES_CHECK_STAT_OFFLINE) {
return (uint32)CM_RES_STAT_OFFLINE;
}
if (recvStat == CUS_RES_CHECK_STAT_TIMEOUT) {
return (uint32)CM_RES_STAT_UNKNOWN;
}
if (recvStat == CUS_RES_CHECK_STAT_ABNORMAL) {
return (uint32)CM_RES_STAT_ONLINE;
}
return (uint32)CM_RES_STAT_UNKNOWN;
}
void IncreaseResReportInterMain(const char *resName, uint32 increaseInstId)
{
if (g_resInstReport == NULL) {
return;
}
for (uint32 i = 0; i < CusResCount(); ++i) {
if (strcmp(g_resInstReport[i].resName, resName) != 0) {
continue;
}
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
if (g_resInstReport[i].resReport[j].cmInstId == increaseInstId) {
++(g_resInstReport[i].resReport[j].statReportInter);
break;
}
}
break;
}
}
void IncreaseOneResInstReportInter(const char *resName, uint32 instId)
{
IncreaseResReportInterMain(resName, instId);
}
uint32 GetResReportInterMain(const char *resName, uint32 instId)
{
if (g_resInstReport == NULL) {
return 0;
}
for (uint32 i = 0; i < CusResCount(); ++i) {
if (strcmp(g_resInstReport[i].resName, resName) != 0) {
continue;
}
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
if (g_resInstReport[i].resReport[j].cmInstId == instId) {
return g_resInstReport[i].resReport[j].statReportInter;
}
}
break;
}
return 0;
}
uint32 GetOneResInstReportInter(const char *resName, uint32 instId)
{
return GetResReportInterMain(resName, instId);
}
static bool IsOneCusResStatValid(uint32 status)
{
if (status == CUS_RES_CHECK_STAT_ONLINE ||
status == CUS_RES_CHECK_STAT_OFFLINE ||
status == CUS_RES_CHECK_STAT_ABNORMAL ||
status == CUS_RES_CHECK_STAT_TIMEOUT) {
return true;
}
return false;
}
static void CleanResReportInter(const CmResourceStatus *resStat)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
if (strcmp(g_resInstReport[i].resName, resStat->resName) != 0) {
continue;
}
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
if (g_resInstReport[i].resReport[j].cmInstId != resStat->cmInstanceId) {
continue;
}
if (IsOneCusResStatValid(resStat->status)) {
g_resInstReport[i].resReport[j].statReportInter = 0;
}
break;
}
break;
}
}
void CleanAllResStatusReportInter()
{
write_runlog(LOG, "cms will clean res status report.\n");
for (uint32 i = 0; i < CusResCount(); ++i) {
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
g_resInstReport[i].resReport[j].statReportInter = 0;
}
}
}
static bool8 IsNodeCriticalResReportTimeout(uint32 nodeId)
{
for (uint32 i = 0; i < CusResCount(); ++i) {
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
if (g_resInstReport[i].resReport[j].nodeId != nodeId) {
continue;
}
if (g_resInstReport[i].resReport[j].statReportInter >= g_agentNetworkTimeout) {
return CM_TRUE;
}
}
}
return CM_FALSE;
}
static void InitResInstReport()
{
g_resInstReport = (ResStatReportInter*)CmMalloc(sizeof(ResStatReportInter) * CusResCount());
errno_t rc;
for (uint32 i = 0; i < CusResCount(); ++i) {
rc = strcpy_s(g_resInstReport[i].resName, CM_MAX_RES_NAME, g_resStatus[i].status.resName);
securec_check_errno(rc, (void)rc);
g_resInstReport[i].instCount = g_resStatus[i].status.instanceCount;
for (uint32 j = 0; j < g_resInstReport[i].instCount; ++j) {
g_resInstReport[i].resReport[j].nodeId = g_resStatus[i].status.resStat[j].nodeId;
g_resInstReport[i].resReport[j].cmInstId = g_resStatus[i].status.resStat[j].cmInstanceId;
g_resInstReport[i].resReport[j].statReportInter = 0;
}
}
}
static void InitResStatReport()
{
g_resNodeStat = (ResStatReportInfo*)CmMalloc(sizeof(ResStatReportInfo) * GetResNodeCount());
for (uint32 i = 0; i < GetResNodeCount(); ++i) {
g_resNodeStat[i].nodeId = GetResNodeId(i);
g_resNodeStat[i].isAvail = MAX_CLUSTER_STATUS_INIT;
}
}
void InitNodeReportVar()
{
InitResInstReport();
InitResStatReport();
}
static uint32 FindNodeReportResInterByNodeId(uint32 nodeId)
{
for (uint32 i = 0; i < GetResNodeCount(); ++i) {
if (g_resNodeStat[i].nodeId == nodeId) {
return i;
}
}
return GetResNodeCount();
}
static MaxClusterResStatus GetResNodeStatByReport(uint32 stat)
{
if (stat == RES_INST_WORK_STATUS_UNAVAIL) {
return MAX_CLUSTER_STATUS_UNAVAIL;
} else if (stat == RES_INST_WORK_STATUS_AVAIL) {
return MAX_CLUSTER_STATUS_AVAIL;
} else if (stat == RES_INST_WORK_STATUS_UNKNOWN) {
return MAX_CLUSTER_STATUS_UNKNOWN;
} else {
write_runlog(LOG, "recv unknown status %u.\n", stat);
}
return MAX_CLUSTER_STATUS_UNKNOWN;
}
static MaxClusterResStatus IsAllNodeResInstAvail(const OneNodeResourceStatus *nodeStat, MaxClusterResStatus oldStat)
{
for (uint32 i = 0; i < nodeStat->count; ++i) {
MaxClusterResStatus newStat = GetResNodeStatByReport(nodeStat->status[i].workStatus);
if (newStat != oldStat) {
write_runlog(LOG, "recv cus_res inst(%u): new work_status(%d), old work_status(%d).\n",
nodeStat->status[i].cmInstanceId, (int)newStat, (int)oldStat);
}
if (newStat != MAX_CLUSTER_STATUS_AVAIL) {
return newStat;
}
}
return MAX_CLUSTER_STATUS_AVAIL;
}
static inline void WriteGetResNodeStatErrLog(uint32 nodeId)
{
write_runlog(ERROR, "can't find nodeId(%u) in g_resNodeStat.\n", nodeId);
for (uint32 i = 0; i < GetResNodeCount(); ++i) {
write_runlog(ERROR, "g_resNodeStat[%u].nodeId = %u.\n", i, g_resNodeStat[i].nodeId);
}
}
static void RecordResStatReport(const OneNodeResourceStatus *nodeStat)
{
if (g_resNodeStat == NULL || g_resInstReport == NULL) {
write_runlog(ERROR, "g_resNodeStat or g_resInstReport is null.\n");
return;
}
uint32 ind = FindNodeReportResInterByNodeId(nodeStat->node);
if (ind < GetResNodeCount()) {
for (uint32 i = 0; i < nodeStat->count; ++i) {
CleanResReportInter(&nodeStat->status[i]);
}
g_resNodeStat[ind].isAvail = IsAllNodeResInstAvail(nodeStat, g_resNodeStat[ind].isAvail);
} else {
WriteGetResNodeStatErrLog(nodeStat->node);
}
}
static const char* GetClusterResStatStr(MaxClusterResStatus stat)
{
switch (stat) {
case MAX_CLUSTER_STATUS_INIT:
return "init";
case MAX_CLUSTER_STATUS_UNKNOWN:
return "unknown";
case MAX_CLUSTER_STATUS_AVAIL:
return "avail";
case MAX_CLUSTER_STATUS_UNAVAIL:
return "unavail";
case MAX_CLUSTER_STATUS_CEIL:
break;
}
return "";
}
MaxClusterResStatus GetResNodeStat(uint32 nodeId, int logLevel)
{
if (g_resNodeStat == NULL || g_resInstReport == NULL) {
return MAX_CLUSTER_STATUS_UNKNOWN;
}
if (IsNodeCriticalResReportTimeout(nodeId)) {
write_runlog(logLevel, "recv node(%u) agent report res status msg timeout.\n", nodeId);
return MAX_CLUSTER_STATUS_UNAVAIL;
}
uint32 ind = FindNodeReportResInterByNodeId(nodeId);
if (ind < GetResNodeCount()) {
if (g_resNodeStat[ind].isAvail != MAX_CLUSTER_STATUS_AVAIL) {
write_runlog(logLevel, "node(%u) stat (%s).\n", nodeId, GetClusterResStatStr(g_resNodeStat[ind].isAvail));
}
return g_resNodeStat[ind].isAvail;
} else {
WriteGetResNodeStatErrLog(nodeId);
}
return MAX_CLUSTER_STATUS_UNKNOWN;
}
static bool8 IsResInstStatChange(uint32 cmInstId, uint32 recvStat, CmResStatList *oldResStat, uint32 *changeInd)
{
for (uint32 i = 0; i < oldResStat->status.instanceCount; ++i) {
if (cmInstId != oldResStat->status.resStat[i].cmInstanceId) {
continue;
}
if (!IsOneCusResStatValid(recvStat)) {
if (oldResStat->status.resStat[i].status != (uint32)CM_RES_STAT_UNKNOWN) {
write_runlog(LOG, "recv inst(%u) invalid res status(%u), exceed (%u)s, will set it unknown.\n",
cmInstId, recvStat, g_agentNetworkTimeout);
}
return CM_FALSE;
}
uint32 newStat = ResCheckResultTransToResStat(recvStat);
uint32 oldStat = oldResStat->status.resStat[i].status;
if (oldStat != newStat) {
write_runlog(LOG, "inst(%u)'s old status(%u) change to new status(%u).\n", cmInstId, oldStat, newStat);
(*changeInd) = i;
return CM_TRUE;
} else {
return CM_FALSE;
}
}
write_runlog(ERROR, "res(%s)'s inst(%u) not exist.\n", oldResStat->status.resName, cmInstId);
return CM_FALSE;
}
static void ProcessOneResInstStatReport(CmResourceStatus *newStat)
{
newStat->resName[CM_MAX_RES_NAME - 1] = '\0';
uint32 index = 0;
if (GetGlobalResStatusIndex(newStat->resName, index) != CM_SUCCESS) {
write_runlog(ERROR, "%s, unknown the resName(%s).\n", __func__, newStat->resName);
return;
}
uint32 changeInd = 0;
bool8 isChanged = IsResInstStatChange(newStat->cmInstanceId, newStat->status, &g_resStatus[index], &changeInd);
if (isChanged) {
(void)pthread_rwlock_wrlock(&(g_resStatus[index].rwlock));
g_resStatus[index].status.resStat[changeInd].status = ResCheckResultTransToResStat(newStat->status);
++(g_resStatus[index].status.version);
OneResStatList resStat = g_resStatus[index].status;
(void)pthread_rwlock_unlock(&(g_resStatus[index].rwlock));
ProcessReportResChangedMsg(true, &resStat);
PrintCusInfoResList(&resStat, __FUNCTION__);
}
}
void ProcessAgent2CmResStatReportMsg(ReportResStatus *resStatusPtr)
{
RecordResStatReport(&resStatusPtr->nodeStat);
for (uint32 i = 0; i < resStatusPtr->nodeStat.count; ++i) {
ProcessOneResInstStatReport(&resStatusPtr->nodeStat.status[i]);
}
}
void ProcessRequestResStatusListMsg(MsgRecvInfo* recvMsgInfo)
{
CmsReportResStatList sendMsg = {0};
sendMsg.msgType = (int)MSG_CM_AGENT_RES_STATUS_LIST;
for (uint32 i = 0; i < CusResCount(); ++i) {
(void)pthread_rwlock_rdlock(&(g_resStatus[i].rwlock));
errno_t rc = memcpy_s(&sendMsg.resList, sizeof(OneResStatList), &g_resStatus[i].status, sizeof(OneResStatList));
securec_check_errno(rc, (void)rc);
(void)pthread_rwlock_unlock(&(g_resStatus[i].rwlock));
(void)RespondMsg(recvMsgInfo, 'S', (char*)(&sendMsg), sizeof(CmsReportResStatList));
}
}
void ProcessRequestLatestResStatusListMsg(MsgRecvInfo *recvMsgInfo, RequestLatestStatList *recvMsg)
{
CmsReportResStatList sendMsg = {0};
sendMsg.msgType = (int)MSG_CM_AGENT_RES_STATUS_LIST;
errno_t rc;
for (uint32 i = 0; i < CusResCount(); ++i) {
if (g_resStatus[i].status.version == recvMsg->statVersion[i]) {
continue;
}
(void)pthread_rwlock_rdlock(&(g_resStatus[i].rwlock));
rc = memcpy_s(&sendMsg.resList, sizeof(OneResStatList), &g_resStatus[i].status, sizeof(OneResStatList));
securec_check_errno(rc, (void)rc);
(void)pthread_rwlock_unlock(&(g_resStatus[i].rwlock));
(void)RespondMsg(recvMsgInfo, 'S', (char*)(&sendMsg), sizeof(CmsReportResStatList));
}
}
static inline void GetResLockDdbKey(char *key, uint32 keyLen, const char *resName, const char *lockName)
{
int ret = snprintf_s(key, keyLen, keyLen - 1, "/%s/CM/LockOwner/%s/%s", pw->pw_name, resName, lockName);
securec_check_intval(ret, (void)ret);
}
static status_t GetLockOwner(const char *resName, const char *lockName, uint32 &curLockOwner)
{
char key[MAX_PATH_LEN] = {0};
GetResLockDdbKey(key, MAX_PATH_LEN, resName, lockName);
char lockValue[MAX_PATH_LEN] = {0};
DDB_RESULT ddbResult = SUCCESS_GET_VALUE;
status_t st = GetKVFromDDb(key, MAX_PATH_LEN, lockValue, MAX_PATH_LEN, &ddbResult);
if (st != CM_SUCCESS) {
if (ddbResult != CAN_NOT_FIND_THE_KEY) {
write_runlog(ERROR, "[CLIENT] failed to get value of key(%s).\n", key);
return CM_ERROR;
}
curLockOwner = 0;
return CM_SUCCESS;
}
if (is_digit_string(lockValue) != 1) {
write_runlog(ERROR, "[CLIENT] the value(%s) of key(%s) is not digit, delete it.\n", lockValue, key);
if (DelKeyInDdb(key, (uint32)strlen(key)) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] ddb del failed. key=%s.\n", key);
return CM_ERROR;
}
curLockOwner = 0;
return CM_SUCCESS;
}
int owner = CmAtoi(lockValue, 1);
if (!IsResInstIdValid(owner)) {
write_runlog(LOG, "[CLIENT] cur res(%s) (%s)lock owner(%d) is invalid, delete it.\n", resName, lockName, owner);
if (DelKeyInDdb(key, (uint32)strlen(key)) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] ddb del failed. key=%s.\n", key);
return CM_ERROR;
}
curLockOwner = 0;
return CM_SUCCESS;
}
curLockOwner = (uint32)owner;
return CM_SUCCESS;
}
static uint32 GetOfficialId()
{
static uint32 officialId = -1;
for (uint32 i = 0; i < g_node_num; i++) {
for (uint32 j = 0; j < g_node[i].datanodeCount; j++) {
if (g_node[i].datanode[j].datanodeRole == PRIMARY_DN) {
officialId = g_node[i].datanode[j].datanodeId;
write_runlog(LOG, "PRIMARY_DN is %d.\n", officialId);
return officialId;
}
}
}
return officialId;
}
static status_t SetLockOwner4FirstReform()
{
char key[MAX_PATH_LEN] = {0};
char resName[CM_MAX_RES_NAME] = {0};
char lockName[CM_MAX_LOCK_NAME] = {0};
char lockValue[MAX_PATH_LEN] = {0};
char officialId[MAX_PATH_LEN] = {0};
errno_t rc;
rc = strncpy_s(resName, CM_MAX_RES_NAME, "dms_res", strlen("dms_res"));
securec_check_errno(rc, (void)rc);
rc = strncpy_s(lockName, CM_MAX_LOCK_NAME, "dms_reformer_lock", strlen("dms_reformer_lock"));
securec_check_errno(rc, (void)rc);
GetResLockDdbKey(key, MAX_PATH_LEN, resName, lockName);
DDB_RESULT ddbResult = SUCCESS_GET_VALUE;
status_t st = GetKVFromDDb(key, MAX_PATH_LEN, lockValue, MAX_PATH_LEN, &ddbResult);
if (st != CM_SUCCESS) {
if (ddbResult != CAN_NOT_FIND_THE_KEY) {
write_runlog(ERROR, "failed to get value with key(%s) in 1st reform, error info:%d.\n", key, (int)ddbResult);
return CM_ERROR;
}
write_runlog(LOG, "not exit res(%s) status, key:\"%s\" in ddb in 1st reform.\n", resName, key);
}
uint32 tempOfficialId = GetOfficialId();
if (IsResInstIdValid(tempOfficialId)) {
if (IsOneResInstWork(resName, tempOfficialId)) {
snprintf(officialId, MAX_PATH_LEN, "%d", tempOfficialId);
} else {
write_runlog(ERROR, "res(%s) inst(%u) has been get out of cluster, can't do lock.\n",
resName, tempOfficialId);
return CM_ERROR;
}
} else {
write_runlog(ERROR, "can not get the official id.\n");
return CM_ERROR;
}
if (strlen(lockValue) == 0) {
status_t st = SetKV2Ddb(key, MAX_PATH_LEN, officialId, MAX_PATH_LEN, NULL);
if (st != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] failed to set official id for key(%s).\n", key);
return CM_ERROR;
}
write_runlog(LOG, "[CLIENT] official id set for key(%s): %s\n", key, officialId);
}
return CM_SUCCESS;
}
static status_t SetNewLockOwner(const char *resName, const char *lockName, uint32 curLockOwner, uint32 resInstId)
{
char key[MAX_PATH_LEN] = {0};
GetResLockDdbKey(key, MAX_PATH_LEN, resName, lockName);
char value[MAX_PATH_LEN] = {0};
int ret = snprintf_s(value, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%u", resInstId);
securec_check_intval(ret, (void)ret);
DrvSetOption opt = {0};
char preValue[MAX_PATH_LEN] = {0};
if (curLockOwner != 0) {
ret = snprintf_s(preValue, MAX_PATH_LEN, MAX_PATH_LEN - 1, "%u", curLockOwner);
securec_check_intval(ret, (void)ret);
opt.preValue = preValue;
opt.len = (uint32)strlen(preValue);
}
if (SetKV2Ddb(key, MAX_PATH_LEN, value, MAX_PATH_LEN, &opt) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] ddb set failed. key=%s, value=%s.\n", key, value);
return CM_ERROR;
}
return CM_SUCCESS;
}
static status_t DeleteLockKey(const char *resName, const char *lockName)
{
char key[MAX_PATH_LEN];
GetResLockDdbKey(key, MAX_PATH_LEN, resName, lockName);
if (DelKeyInDdb(key, (uint32)strlen(key)) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] ddb del failed. key=%s.\n", key);
return CM_ERROR;
}
return CM_SUCCESS;
}
void ReleaseResLockOwner(const char *resName, uint32 instId)
{
const uint32 kvCount = 10;
DrvKeyValue kvs[kvCount];
errno_t rc = memset_s(kvs, sizeof(DrvKeyValue) * kvCount, 0, sizeof(DrvKeyValue) * kvCount);
securec_check_errno(rc, (void)rc);
char key[MAX_PATH_LEN] = {0};
int ret = snprintf_s(key, MAX_PATH_LEN, MAX_PATH_LEN - 1, "/%s/CM/LockOwner/%s", pw->pw_name, resName);
securec_check_intval(ret, (void)ret);
DDB_RESULT dbResult = SUCCESS_GET_VALUE;
if (GetAllKVFromDDb(key, MAX_PATH_LEN, kvs, kvCount, &dbResult) != CM_SUCCESS) {
if (dbResult != CAN_NOT_FIND_THE_KEY) {
write_runlog(ERROR, "[CLIENT] res(%s) release lock owner failed, get kvs fail.\n", resName);
}
return;
}
PrintKeyValueMsg(key, kvs, kvCount, DEBUG5);
for (uint32 i = 0; i < kvCount; ++i) {
if (kvs[i].key[0] == '\0' || kvs[i].value[0] == '\0') {
break;
}
if ((uint32)CmAtol(kvs[i].value, 0) != instId) {
continue;
}
if (DelKeyInDdb(kvs[i].key, (uint32)strlen(kvs[i].key)) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] release lock failed. key=%s, value=%s.\n", kvs[i].key, kvs[i].value);
} else {
write_runlog(LOG, "[CLIENT] release lock success. key=%s, value=%s.\n", kvs[i].key, kvs[i].value);
}
}
}
static bool RealTimeBuildIsOff(const char *resName, const char* lockName, uint32 cmInstId)
{
if (strcmp(lockName, "dms_reformer_lock") != 0) {
return false;
}
if (g_realtimeBuildStatus == 0) {
write_runlog(LOG, "skip check as realtimebuild of all nodes is off\n");
return false;
}
uint32 index = 0;
if (GetGlobalResStatusIndex(resName, index) != CM_SUCCESS) {
write_runlog(ERROR, "%s, unknown resName(%s).\n", __FUNCTION__, resName);
return false;
}
uint32 nodeId = 0;
bool found = false;
CmResStatList *resStat = &g_resStatus[index];
uint32 realtimeStatusForCmp = 0;
for (uint32 i = 0; i < resStat->status.instanceCount; ++i) {
if (resStat->status.resStat[i].cmInstanceId == cmInstId) {
nodeId = resStat->status.resStat[i].nodeId;
found = true;
}
if (resStat->status.resStat[i].status == 1) {
realtimeStatusForCmp |= (1U << (resStat->status.resStat[i].nodeId - 1));
}
}
if (realtimeStatusForCmp == (1U << (nodeId - 1))) {
write_runlog(LOG, "skip check as I'm the only one live node, curStatus:%u, g_realtimeBuildStatus:%u.\n",
realtimeStatusForCmp, g_realtimeBuildStatus);
return false;
}
write_runlog(LOG, "instid:%u, nodeid:%u, curStatus:%u, g_realtimeBuildStatus:%u.\n", cmInstId, nodeId,
realtimeStatusForCmp, g_realtimeBuildStatus);
if (((g_realtimeBuildStatus & realtimeStatusForCmp) == realtimeStatusForCmp) ||
((g_realtimeBuildStatus & realtimeStatusForCmp) == 0)) {
write_runlog(LOG, "skip check as realtimebuild of all nodes is off or on, g_realtimeBuildStatus:%u.\n",
g_realtimeBuildStatus);
return false;
}
if (!found) {
write_runlog(ERROR, "%s, skip check as can't get nodeId by instanceId(%d).\n", __FUNCTION__, cmInstId);
return false;
}
return !(g_realtimeBuildStatus & (1U << (nodeId - 1)));
}
static ClientError CmResLock(const CmaToCmsResLock *lockMsg)
{
if (SetLockOwner4FirstReform() != CM_SUCCESS) {
write_runlog(LOG, "[%s], can not set lockowner in first reform.\n", __FUNCTION__);
}
if (!IsResInstIdValid((int)lockMsg->cmInstId)) {
write_runlog(ERROR, "[CLIENT] res(%s) (%s)lock new owner (%u) is invalid.\n",
lockMsg->resName, lockMsg->lockName, lockMsg->cmInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return CM_RES_CLIENT_CANNOT_DO;
}
if (!IsOneResInstWork(lockMsg->resName, lockMsg->cmInstId)) {
write_runlog(ERROR, "[CLIENT] res(%s) inst(%u) has been get out of cluster, can't do lock.\n",
lockMsg->resName, lockMsg->cmInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
uint32 curLockOwner;
if (GetLockOwner(lockMsg->resName, lockMsg->lockName, curLockOwner) != CM_SUCCESS) {
write_runlog(LOG, "[CLIENT] get (%s)lock owner failed, res(%s) inst(%u) can't lock.\n",
lockMsg->lockName, lockMsg->resName, lockMsg->cmInstId);
return CM_RES_CLIENT_DDB_ERR;
}
if (RealTimeBuildIsOff(lockMsg->resName, lockMsg->lockName, lockMsg->cmInstId)) {
write_runlog(LOG, "[CLIENT] res(%s) (%s)lock owner is inst(%u), inst(%u) can't lock as the realtime build status is off.\n",
lockMsg->resName, lockMsg->lockName, curLockOwner, lockMsg->cmInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
if (curLockOwner == lockMsg->cmInstId) {
write_runlog(LOG, "[CLIENT] res(%s) (%s)lock owner(%u) is same with lock candidate, can't lock again.\n",
lockMsg->resName, lockMsg->lockName, curLockOwner);
return CM_RES_CLIENT_CANNOT_DO;
}
if (curLockOwner != 0) {
write_runlog(LOG, "[CLIENT] res(%s) (%s)lock owner is inst(%u), inst(%u) can't lock.\n",
lockMsg->resName, lockMsg->lockName, curLockOwner, lockMsg->cmInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
if (SetNewLockOwner(lockMsg->resName, lockMsg->lockName, curLockOwner, lockMsg->cmInstId) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] res(%s) instance(%u) (%s)lock failed.\n",
lockMsg->resName, lockMsg->cmInstId, lockMsg->lockName);
return CM_RES_CLIENT_DDB_ERR;
}
write_runlog(LOG, "[CLIENT] res(%s) instance(%u) (%s)lock success.\n",
lockMsg->resName, lockMsg->cmInstId, lockMsg->lockName);
return CM_RES_CLIENT_SUCCESS;
}
static ClientError CmResUnlock(const CmaToCmsResLock *lockMsg)
{
uint32 curLockOwner = 0;
if (GetLockOwner(lockMsg->resName, lockMsg->lockName, curLockOwner) != CM_SUCCESS) {
write_runlog(LOG, "[CLIENT] get cur lock owner failed, res(%s) lockName(%s) inst(%u) can't unlock.\n",
lockMsg->resName, lockMsg->lockName, lockMsg->cmInstId);
return CM_RES_CLIENT_DDB_ERR;
}
if (curLockOwner == 0) {
write_runlog(LOG, "[CLIENT] cur lock owner is NULL, res(%s) lockName(%s) inst(%u) can't unlock.\n",
lockMsg->resName, lockMsg->lockName, lockMsg->cmInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
if (curLockOwner != lockMsg->cmInstId) {
write_runlog(LOG, "[CLIENT] res(%s) lockName(%s) lock owner is (%u) not inst(%u), can't unlock.\n",
lockMsg->resName, lockMsg->lockName, curLockOwner, lockMsg->cmInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
if (DeleteLockKey(lockMsg->resName, lockMsg->lockName) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] res(%s) inst(%u) unlock failed, because ddb del lock owner failed.\n",
lockMsg->resName, lockMsg->cmInstId);
return CM_RES_CLIENT_DDB_ERR;
}
write_runlog(LOG, "[CLIENT] res(%s) instance(%u) unlock success.\n", lockMsg->resName, lockMsg->cmInstId);
return CM_RES_CLIENT_SUCCESS;
}
static ClientError ResGetLockOwner(const char *resName, const char *lockName, uint32 &lockOwner)
{
if (GetLockOwner(resName, lockName, lockOwner) != CM_SUCCESS) {
write_runlog(LOG, "[CLIENT] get res(%s) (%s)lock owner failed.\n", resName, lockName);
return CM_RES_CLIENT_DDB_ERR;
}
if (lockOwner == 0) {
write_runlog(LOG, "[CLIENT] get res(%s) (%s)lock no owner.\n", resName, lockName);
return CM_RES_CLIENT_NO_LOCK_OWNER;
}
return CM_RES_CLIENT_SUCCESS;
}
static ClientError TransLockOwner(const CmaToCmsResLock *lockMsg)
{
const char *resName = lockMsg->resName;
const char *lockName = lockMsg->lockName;
uint32 resInstId = lockMsg->cmInstId;
uint32 newLockOwner = lockMsg->transInstId;
uint32 curLockOwner = 0;
if (GetLockOwner(resName, lockName, curLockOwner) != CM_SUCCESS) {
write_runlog(LOG, "[CLIENT] get (%s)lock owner failed, res(%s) can't trans lock.\n", lockName, resName);
return CM_RES_CLIENT_DDB_ERR;
}
if (curLockOwner != resInstId) {
write_runlog(LOG, "[CLIENT] res(%s) (%s)lock owner is inst(%u), inst(%u) can't trans lock.\n",
resName, lockName, curLockOwner, resInstId);
return CM_RES_CLIENT_CANNOT_DO;
}
if (!CanProcessResStatus()) {
write_runlog(LOG, "[%s], res status list invalid, can't continue.\n", __FUNCTION__);
return CM_RES_CLIENT_CANNOT_DO;
}
if (!IsOneResInstWork(resName, newLockOwner)) {
write_runlog(LOG, "[CLIENT] res(%s) inst(%u) get out of cluster, can't be lockOwner.\n", resName, newLockOwner);
return CM_RES_CLIENT_CANNOT_DO;
}
if (SetNewLockOwner(resName, lockName, curLockOwner, newLockOwner) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] res(%s) inst(%u) trans to inst(%u) failed, cause ddb failed.\n",
resName, resInstId, newLockOwner);
return CM_RES_CLIENT_DDB_ERR;
}
write_runlog(LOG, "[CLIENT] res(%s) inst(%u) trans to inst(%u) success.\n", resName, resInstId, newLockOwner);
return CM_RES_CLIENT_SUCCESS;
}
void ProcessCmResLock(MsgRecvInfo* recvMsgInfo, CmaToCmsResLock *lockMsg)
{
lockMsg->resName[CM_MAX_RES_NAME - 1] = '\0';
lockMsg->lockName[CM_MAX_LOCK_NAME - 1] = '\0';
CmsReportLockResult ackMsg = {0};
ackMsg.msgType = (int)MSG_CM_RES_LOCK_ACK;
ackMsg.conId = lockMsg->conId;
ackMsg.lockOpt = lockMsg->lockOpt;
ackMsg.lockOwner = 0;
errno_t rc = strcpy_s(ackMsg.lockName, CM_MAX_LOCK_NAME, lockMsg->lockName);
securec_check_errno(rc, (void)rc);
switch (lockMsg->lockOpt) {
case (uint32)CM_RES_LOCK: {
ackMsg.error = (uint32)CmResLock(lockMsg);
break;
}
case (uint32)CM_RES_UNLOCK: {
ackMsg.error = (uint32)CmResUnlock(lockMsg);
break;
}
case (uint32)CM_RES_GET_LOCK_OWNER: {
ackMsg.error = (uint32)ResGetLockOwner(lockMsg->resName, lockMsg->lockName, ackMsg.lockOwner);
break;
}
case (uint32)CM_RES_LOCK_TRANS: {
ackMsg.error = (uint32)TransLockOwner(lockMsg);
break;
}
default: {
write_runlog(ERROR, "[CLIENT] unknown lockOpt(%u).\n", lockMsg->lockOpt);
ackMsg.error = (uint32)CM_RES_CLIENT_CANNOT_DO;
break;
}
}
if (RespondMsg(recvMsgInfo, 'S', (char *)(&ackMsg), sizeof(CmsReportLockResult), DEBUG5) != 0) {
write_runlog(ERROR, "[CLIENT] send lock ack msg failed.\n");
}
}
static inline void CopyResStatusToSendMsg(OneResStatList *sendStat, CmResStatList *saveStat)
{
(void)pthread_rwlock_rdlock(&saveStat->rwlock);
errno_t rc = memcpy_s(sendStat, sizeof(OneResStatList), &saveStat->status, sizeof(OneResStatList));
securec_check_errno(rc, (void)rc);
(void)pthread_rwlock_unlock(&saveStat->rwlock);
}
void ProcessResInstanceStatusMsg(MsgRecvInfo* recvMsgInfo, const CmsToCtlGroupResStatus *queryStatusPtr)
{
CmsToCtlGroupResStatus instStatMsg = {0};
instStatMsg.msgType = (int)MSG_CM_QUERY_INSTANCE_STATUS;
if (queryStatusPtr->msgStep == QUERY_RES_STATUS_STEP) {
for (uint32 i = 0; i < CusResCount(); ++i) {
instStatMsg.msgStep = QUERY_RES_STATUS_STEP_ACK;
CopyResStatusToSendMsg(&instStatMsg.oneResStat, &g_resStatus[i]);
(void)RespondMsg(recvMsgInfo, 'S', (char*)&(instStatMsg), sizeof(instStatMsg), DEBUG5);
}
instStatMsg.msgStep = QUERY_RES_STATUS_STEP_ACK_END;
(void)RespondMsg(recvMsgInfo, 'S', (char*)&(instStatMsg), sizeof(instStatMsg), DEBUG5);
}
}
void ProcessQueryOneResInst(MsgRecvInfo* recvMsgInfo, const QueryOneResInstStat *queryMsg)
{
CmsToCtlOneResInstStat ackMsg = {0};
ackMsg.msgType = (int)MSG_CM_CTL_QUERY_RES_INST_ACK;
uint32 destInstId = queryMsg->instId;
for (uint32 i = 0; i < CusResCount(); ++i) {
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if (g_resStatus[i].status.resStat[j].cmInstanceId != destInstId) {
continue;
}
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
ackMsg.instStat = g_resStatus[i].status.resStat[j];
int instanceType = g_instance_role_group_ptr[i].instanceMember[j].instanceType;
if (instanceType == INSTANCE_TYPE_DATANODE && ackMsg.instStat.status == CM_RES_STAT_ONLINE &&
!g_enableWalRecord) {
const cm_instance_report_status *instStatus = &g_instance_group_report_status_ptr[i].instance_status;
int localStatus = instStatus->data_node_member[j].local_status.db_state;
int dnLocalRole = instStatus->data_node_member[j].local_status.local_role;
if ((dnLocalRole != INSTANCE_ROLE_PRIMARY && dnLocalRole != INSTANCE_ROLE_STANDBY &&
dnLocalRole != INSTANCE_ROLE_MAIN_STANDBY) || localStatus != INSTANCE_HA_STATE_NORMAL) {
ackMsg.instStat.status = CM_RES_STAT_UNKNOWN;
}
}
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
(void)RespondMsg(recvMsgInfo, 'S', (char*)&(ackMsg), sizeof(ackMsg), DEBUG5);
return;
}
}
write_runlog(ERROR, "unknown res instId(%u).\n", destInstId);
}
void ProcessQueryOneNodeStat(MsgRecvInfo* recvMsgInfo, const QueryOneNodeStat *queryMsg)
{
CmsToCtlOneNodeStat ackMsg = {0};
ackMsg.msgType = (int)MSG_CM_CTL_QUERY_NODE_ACK;
uint32 nodeId = queryMsg->nodeId;
for (uint32 i = 0; i < CusResCount(); ++i) {
for (uint32 j = 0; j < g_resStatus[i].status.instanceCount; ++j) {
if (g_resStatus[i].status.resStat[j].nodeId != nodeId) {
continue;
}
(void)pthread_rwlock_rdlock(&g_resStatus[i].rwlock);
ackMsg.isNodeOnline = (g_resStatus[i].status.resStat[j].status == CM_RES_STAT_ONLINE) ? true : false;
(void)pthread_rwlock_unlock(&g_resStatus[i].rwlock);
if (ackMsg.isNodeOnline == false) {
(void)RespondMsg(recvMsgInfo, 'S', (char*)&(ackMsg), sizeof(ackMsg), DEBUG5);
return;
}
}
}
(void)RespondMsg(recvMsgInfo, 'S', (char*)&(ackMsg), sizeof(ackMsg), DEBUG5);
write_runlog(LOG, "node %u status is online.\n", nodeId);
}
static bool IsregIsNotUnknown(ResInstIsreg *isregList, uint32 isregCount)
{
for (uint32 i = 0; i < isregCount; ++i) {
if (!IsRecvIsregStatValid(isregList[i].isreg)) {
write_runlog(ERROR, "recv inst(%u) isreg stat(%d) invalid.\n", isregList[i].cmInstId, isregList[i].isreg);
return false;
}
if (isregList[i].isreg == (int)CM_RES_ISREG_UNKNOWN) {
write_runlog(LOG, "recv inst(%u) isreg(%s).\n", isregList[i].cmInstId, GetIsregStatus(isregList[i].isreg));
return false;
}
}
return true;
}
void ProcessResIsregMsg(MsgRecvInfo *recvMsgInfo, CmaToCmsIsregMsg *isreg)
{
if (isreg->isregCount > CM_MAX_RES_INST_COUNT) {
write_runlog(ERROR, "recv isreg list count(%u) invalid, max(%d).\n", isreg->isregCount, CM_MAX_RES_INST_COUNT);
return;
}
bool needUpdateAgentCheckList = false;
UpdateResIsregStatusList(isreg->nodeId, isreg->isregList, isreg->isregCount, &needUpdateAgentCheckList);
if (needUpdateAgentCheckList) {
write_runlog(LOG, "recv check list is not right.\n");
CmsFlushIsregCheckList sendMsg = {0};
sendMsg.msgType = (int)MSG_CM_AGENT_ISREG_CHECK_LIST_CHANGED;
GetCheckListByNodeId(isreg->nodeId, sendMsg.checkList, &sendMsg.checkCount);
(void)RespondMsg(recvMsgInfo, 'S', (char*)&(sendMsg), sizeof(sendMsg), LOG);
} else {
if (IsregIsNotUnknown(isreg->isregList, isreg->isregCount)) {
CleanReportInter(isreg->nodeId);
}
}
}
void ResetResNodeStat()
{
if (g_resNodeStat == NULL) {
return;
}
for (uint32 i = 0; i < GetResNodeCount(); ++i) {
g_resNodeStat[i].isAvail = MAX_CLUSTER_STATUS_INIT;
}
}