* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cma_process_messages_client.cpp
*
*
* IDENTIFICATION
* src/cm_agent/cma_process_messages_client.cpp
*
* -------------------------------------------------------------------------
*/
#include "cm/cm_elog.h"
#include "cma_connect.h"
#include "cma_connect_client.h"
#include "cma_common.h"
#include "cma_global_params.h"
#include "cma_instance_check.h"
#include "cma_status_check.h"
#include "cma_instance_management_res.h"
#include "cma_process_messages_client.h"
void NotifyClientConnectClose()
{
AgentToClientNotify cmaMsg = {{0}};
cmaMsg.head.msgType = (uint32)MSG_AGENT_CLIENT_NOTIFY_CONN_CLOSE;
cmaMsg.notify.isCmaConnClose = CM_TRUE;
PushMsgToAllClientSendQue((char*)&cmaMsg, sizeof(cmaMsg));
}
/*
 * Queue a bare heartbeat acknowledgement (header only, no payload)
 * for the client identified by conId.
 */
static void SendHeartbeatAckToClient(uint32 conId)
{
    MsgHead ackHead = {0};

    ackHead.msgType = MSG_AGENT_CLIENT_HEARTBEAT_ACK;

    PushMsgToClientSendQue((char*)&ackHead, sizeof(ackHead), conId);
}
/*
 * Copy the current status list of one resource into a message and queue it
 * for the client identified by conId.
 *
 * statList       - status entry to snapshot; its rwlock is held for reading
 *                  while the copy is made.
 * conId          - connection id passed through to PushMsgToClientSendQue.
 * isNotifyChange - true selects MSG_AGENT_CLIENT_RES_STATUS_CHANGE as the
 *                  message type, false selects MSG_AGENT_CLIENT_RES_STATUS_LIST.
 */
static void SendStatusListToClient(CmResStatList &statList, uint32 conId, bool isNotifyChange)
{
    AgentToClientResList outMsg;
    errno_t rc = memset_s(&outMsg, sizeof(AgentToClientResList), 0, sizeof(AgentToClientResList));
    securec_check_errno(rc, (void)rc);

    outMsg.head.msgType = isNotifyChange ? (uint32)MSG_AGENT_CLIENT_RES_STATUS_CHANGE
                                         : (uint32)MSG_AGENT_CLIENT_RES_STATUS_LIST;

    // Snapshot under the read lock; everything after works on the private copy.
    (void)pthread_rwlock_rdlock(&(statList.rwlock));
    rc = memcpy_s(&outMsg.resStatusList, sizeof(OneResStatList), &statList.status, sizeof(OneResStatList));
    securec_check_errno(rc, (void)rc);
    (void)pthread_rwlock_unlock(&(statList.rwlock));

    PrintCusInfoResList(&outMsg.resStatusList, __FUNCTION__);
    PushMsgToClientSendQue((char*)&outMsg, sizeof(AgentToClientResList), conId);
}
/*
 * Queue a lock acknowledgement carrying CM_RES_CLIENT_CANNOT_DO as the
 * error code for the client identified by conId.
 */
static void SendLockFailAckToClient(uint32 conId)
{
    AgentToClientResLockResult failAck;
    errno_t rc = memset_s(&failAck, sizeof(AgentToClientResLockResult), 0, sizeof(AgentToClientResLockResult));
    securec_check_errno(rc, (void)rc);

    failAck.result.error = (uint32)CM_RES_CLIENT_CANNOT_DO;
    failAck.head.conId = conId;
    failAck.head.msgType = MSG_CM_RES_LOCK_ACK;

    PushMsgToClientSendQue((char*)&failAck, sizeof(AgentToClientResLockResult), conId);
}
static void ProcessClientHeartbeat(const ClientHbMsg &hbMsg)
{
uint32 index = 0;
ClientConn *clientCon = GetClientConnect();
if (GetGlobalResStatusIndex(clientCon[hbMsg.head.conId].resName, index) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] ProcessClientHeartbeat, unknown the resName(%s) of client.\n",
clientCon[hbMsg.head.conId].resName);
return;
}
(void)pthread_rwlock_rdlock(&(g_resStatus[index].rwlock));
bool isResStatChanged = (hbMsg.version != g_resStatus[index].status.version);
(void)pthread_rwlock_unlock(&(g_resStatus[index].rwlock));
if (isResStatChanged) {
SendStatusListToClient(g_resStatus[index], hbMsg.head.conId, false);
} else {
SendHeartbeatAckToClient(hbMsg.head.conId);
}
}
static void ProcessInitMsg(const ClientInitMsg &initData)
{
AgentToClientInitResult sendMsg;
errno_t rc = memset_s(&sendMsg, sizeof(AgentToClientInitResult), 0, sizeof(AgentToClientInitResult));
securec_check_errno(rc, (void)rc);
sendMsg.head.msgType = (uint32)MSG_AGENT_CLIENT_INIT_ACK;
sendMsg.head.conId = initData.head.conId;
sendMsg.result.isSuccess = false;
ClientConn *clientCon = GetClientConnect();
for (const CmResConfList &resInfo : g_resConf) {
if ((strcmp(initData.resInfo.resName, resInfo.resName) == 0) && (g_currentNode->node == resInfo.nodeId) &&
initData.resInfo.resInstanceId == resInfo.resInstanceId) {
clientCon[initData.head.conId].cmInstanceId = resInfo.cmInstanceId;
clientCon[initData.head.conId].resInstanceId = resInfo.resInstanceId;
rc = strcpy_s(clientCon[initData.head.conId].resName, CM_MAX_RES_NAME, initData.resInfo.resName);
securec_check_errno(rc, (void)rc);
sendMsg.result.isSuccess = true;
break;
}
}
if (sendMsg.result.isSuccess) {
write_runlog(LOG, "[CLIENT] res(%s) init success.\n", initData.resInfo.resName);
} else {
write_runlog(LOG, "[CLIENT] res(%s) init failed, init cfg: nodeId(%u), resInstId(%u).\n",
initData.resInfo.resName, g_currentNode->node, initData.resInfo.resInstanceId);
}
PushMsgToClientSendQue((char*)&sendMsg, sizeof(AgentToClientInitResult), initData.head.conId);
}
/*
 * Fill the agent-to-CMS lock request from a client lock message.
 *
 * The lock option, connection id and lock name come from lockMsg; the
 * cm instance id and resource name come from the connection slot indexed
 * by the connection id just stored in sendMsg.
 */
static void GetResLockSendMsg(CmaToCmsResLock *sendMsg, const ClientCmLockMsg *lockMsg)
{
    sendMsg->msgType = (int)MSG_CM_RES_LOCK;
    sendMsg->lockOpt = lockMsg->info.lockOpt;
    sendMsg->conId = lockMsg->head.conId;

    // conId must already be set: it selects the connection slot read below.
    const ClientConn &sender = GetClientConnect()[sendMsg->conId];
    sendMsg->cmInstId = sender.cmInstanceId;

    errno_t rc = strcpy_s(sendMsg->resName, CM_MAX_RES_NAME, sender.resName);
    securec_check_errno(rc, (void)rc);

    rc = strcpy_s(sendMsg->lockName, CM_MAX_LOCK_NAME, lockMsg->info.lockName);
    securec_check_errno(rc, (void)rc);
}
/*
 * Translate a resource instance id into the matching cm instance id.
 *
 * resName   - resource whose entry in g_resStatus is searched.
 * resInstId - resource instance id to look for.
 *
 * Returns the cmInstanceId of the first status entry whose resInstanceId
 * equals resInstId, or 0 when the resource is unknown or no entry matches.
 *
 * The scan is done under the entry's read lock, like the other readers in
 * this file, so that it never observes a status list that is being
 * overwritten by UpdateResStatusList.
 */
static uint32 ResInstIdToCmInstId(const char *resName, uint32 resInstId)
{
    uint32 index = 0;
    if (GetGlobalResStatusIndex(resName, index) != CM_SUCCESS) {
        write_runlog(ERROR, "[CLIENT] ResInstIdToCmInstId, unknown the res(%s) of client.\n", resName);
        return 0;
    }

    uint32 cmInstId = 0;
    (void)pthread_rwlock_rdlock(&(g_resStatus[index].rwlock));
    for (uint32 i = 0; i < g_resStatus[index].status.instanceCount; ++i) {
        if (g_resStatus[index].status.resStat[i].resInstanceId == resInstId) {
            cmInstId = g_resStatus[index].status.resStat[i].cmInstanceId;
            break;
        }
    }
    (void)pthread_rwlock_unlock(&(g_resStatus[index].rwlock));

    return cmInstId;
}
/*
 * Forward a client lock request to CMS.
 *
 * For a CM_RES_LOCK_TRANS request the target resource instance id is first
 * converted to a cm instance id; if the converted id fails IsResInstIdValid
 * the request is not forwarded and a failure ack is sent to the client
 * instead.
 */
static void ProcessCmResLock(ClientCmLockMsg *lockMsg)
{
    CmaToCmsResLock lockReq = {0};
    GetResLockSendMsg(&lockReq, lockMsg);

    const bool isTransfer = (lockMsg->info.lockOpt == (uint32)CM_RES_LOCK_TRANS);
    if (isTransfer) {
        lockReq.transInstId = ResInstIdToCmInstId(lockReq.resName, lockMsg->info.transInstId);
    }

    if (isTransfer && !IsResInstIdValid((int)lockReq.transInstId)) {
        write_runlog(ERROR, "[CLIENT] res instId(%u) is invalid, ack client!\n", lockReq.transInstId);
        SendLockFailAckToClient(lockMsg->head.conId);
        return;
    }

    PushMsgToCmsSendQue((char*)&lockReq, sizeof(CmaToCmsResLock), "res lock");
}
static void ProcessClientMsg(char *recvMsg)
{
const MsgHead *head = (MsgHead *)recvMsg;
switch (head->msgType) {
case MSG_CLIENT_AGENT_INIT_DATA: {
ClientInitMsg *initMsg = (ClientInitMsg *)recvMsg;
ProcessInitMsg(*initMsg);
break;
}
case MSG_CLIENT_AGENT_HEARTBEAT: {
ClientHbMsg *hbMsg = (ClientHbMsg *)recvMsg;
ProcessClientHeartbeat(*hbMsg);
break;
}
case MSG_CM_RES_LOCK: {
ClientCmLockMsg *lockMsg = (ClientCmLockMsg *)recvMsg;
ProcessCmResLock(lockMsg);
break;
}
default:
write_runlog(LOG, "[CLIENT] agent get unknown msg from client\n");
break;
}
}
void* ProcessMessageMain(void * const arg)
{
thread_name = "ProcessClientMsg";
write_runlog(LOG, "process client recv msg thread begin.\n");
for (;;) {
if (g_shutdownRequest || g_exitFlag) {
cm_sleep(SHUTDOWN_SLEEP_TIME);
continue;
}
MsgQueue &recvQueue = GetClientRecvQueue();
(void)pthread_mutex_lock(&recvQueue.lock);
while (recvQueue.msg.empty()) {
(void)pthread_cond_wait(&recvQueue.cond, &recvQueue.lock);
}
char *msg = recvQueue.msg.front().msgPtr;
recvQueue.msg.pop();
(void)pthread_mutex_unlock(&recvQueue.lock);
ProcessClientMsg(msg);
FreeBufFromMsgPool(msg);
}
return NULL;
}
/*
 * Overwrite the stored status list of one resource with newStat,
 * holding the entry's write lock for the duration of the copy.
 */
static inline void UpdateResStatusList(CmResStatList *resStat, const OneResStatList *newStat)
{
    pthread_rwlock_t *statLock = &(resStat->rwlock);

    (void)pthread_rwlock_wrlock(statLock);
    errno_t rc = memcpy_s(&resStat->status, sizeof(OneResStatList), newStat, sizeof(OneResStatList));
    securec_check_errno(rc, (void)rc);
    (void)pthread_rwlock_unlock(statLock);
}
void ProcessResStatusList(const CmsReportResStatList *msg)
{
if (msg->resList.instanceCount > CM_MAX_RES_INST_COUNT) {
write_runlog(ERROR, "cms send to cma, custom resource instance count (%u) is unavail, range[0, %d].\n",
msg->resList.instanceCount, CM_MAX_RES_INST_COUNT);
return;
}
uint32 index = 0;
if (GetGlobalResStatusIndex(msg->resList.resName, index) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] ProcessResStatusList, unknown the res(%s) of client.\n", msg->resList.resName);
return;
}
UpdateResStatusList(&g_resStatus[index], &msg->resList);
PrintCusInfoResList(&msg->resList, __FUNCTION__);
}
void ProcessResStatusChanged(const CmsReportResStatList *msg)
{
ProcessResStatusList(msg);
uint32 index = 0;
if (GetGlobalResStatusIndex(msg->resList.resName, index) != CM_SUCCESS) {
write_runlog(ERROR, "[CLIENT] ProcessResStatusChanged, unknown the res(%s) of client.\n", msg->resList.resName);
return;
}
ClientConn *clientCon = GetClientConnect();
for (uint32 i = 0; i < CM_MAX_RES_COUNT; ++i) {
if (clientCon[i].isClosed || strcmp(clientCon[i].resName, msg->resList.resName) != 0) {
continue;
}
SendStatusListToClient(g_resStatus[index], i, true);
}
}
/*
 * Relay a lock result received from CMS to the originating client.
 *
 * The connection id, error code and lock name are copied from recvMsg.
 * For a successful CM_RES_GET_LOCK_OWNER reply the lock owner reported by
 * CMS (a cm instance id) is translated to the resource instance id using
 * the stored status list of the client's resource:
 *   - if the client's resource name is unknown, the error is logged and
 *     no reply is sent;
 *   - if no status entry carries that cm instance id, the reply is sent
 *     with lockOwner 0 and error CM_RES_CLIENT_CANNOT_DO.
 *
 * The status list is scanned under its read lock, like the other readers
 * in this file, so the scan cannot race with UpdateResStatusList.
 */
void ProcessResLockAckFromCms(const CmsReportLockResult *recvMsg)
{
    AgentToClientResLockResult sendMsg;
    errno_t rc = memset_s(&sendMsg, sizeof(AgentToClientResLockResult), 0, sizeof(AgentToClientResLockResult));
    securec_check_errno(rc, (void)rc);
    sendMsg.head.msgType = MSG_CM_RES_LOCK_ACK;
    sendMsg.head.conId = recvMsg->conId;
    sendMsg.result.error = recvMsg->error;
    rc = strcpy_s(sendMsg.result.lockName, CM_MAX_LOCK_NAME, recvMsg->lockName);
    securec_check_errno(rc, (void)rc);

    ClientConn *clientCon = GetClientConnect();
    if (recvMsg->lockOpt == (uint32)CM_RES_GET_LOCK_OWNER && recvMsg->error == 0) {
        uint32 index = 0;
        if (GetGlobalResStatusIndex(clientCon[sendMsg.head.conId].resName, index) != CM_SUCCESS) {
            write_runlog(ERROR, "[CLIENT] ProcessResLockAckFromCms, unknown the res(%s) of client.\n",
                clientCon[sendMsg.head.conId].resName);
            return;
        }

        bool getFlag = false;
        // Hold the read lock only for the scan; logging and sending happen after release.
        (void)pthread_rwlock_rdlock(&(g_resStatus[index].rwlock));
        for (uint32 i = 0; i < g_resStatus[index].status.instanceCount; ++i) {
            if (g_resStatus[index].status.resStat[i].cmInstanceId == recvMsg->lockOwner) {
                sendMsg.result.lockOwner = g_resStatus[index].status.resStat[i].resInstanceId;
                getFlag = true;
                break;
            }
        }
        (void)pthread_rwlock_unlock(&(g_resStatus[index].rwlock));

        if (!getFlag) {
            sendMsg.result.lockOwner = 0;
            sendMsg.result.error = (uint32)CM_RES_CLIENT_CANNOT_DO;
            write_runlog(ERROR, "[CLIENT] unknown cmInstId %u.\n", recvMsg->lockOwner);
        }
    }

    PushMsgToClientSendQue((char*)&sendMsg, sizeof(AgentToClientResLockResult), recvMsg->conId);
}
static void ProcessUnregResInst(const CmsNotifyAgentRegMsg *recvMsg)
{
CmResConfList *local = CmaGetResConfByResName(recvMsg->resName);
if (local == NULL) {
write_runlog(ERROR, "%s, get local res list failed.\n", __FUNCTION__);
return;
}
ResIsregStatus isreg = IsregOneResInst(local, recvMsg->resInstId);
if (isreg == CM_RES_ISREG_UNREG) {
write_runlog(LOG, "local res inst[%s:%u] has been unreg.\n", recvMsg->resName, recvMsg->resInstId);
} else if ((isreg == CM_RES_ISREG_REG) || (isreg == CM_RES_ISREG_PENDING) ||
(recvMsg->resStat == CM_RES_ISREG_REG) || (recvMsg->resStat == CM_RES_ISREG_PENDING)) {
(void)UnregOneResInst(local, recvMsg->resInstId);
} else if (isreg == CM_RES_ISREG_NOT_SUPPORT) {
write_runlog(LOG, "res inst[%s:%u] don't support reg, not need unreg.\n", recvMsg->resName, recvMsg->resInstId);
} else {
write_runlog(ERROR, "res inst[%s:%u] isreg:%s, can't do unreg.\n", recvMsg->resName, recvMsg->resInstId,
GetIsregStatus((int)isreg));
}
}
static void ProcessRegResInst(const CmsNotifyAgentRegMsg *recvMsg)
{
if (g_currentNode->node != recvMsg->nodeId) {
return;
}
CmResConfList *local = CmaGetResConfByResName(recvMsg->resName);
if (local == NULL) {
write_runlog(ERROR, "%s, get local res list failed.\n", __FUNCTION__);
return;
}
ResIsregStatus isreg = IsregOneResInst(local, recvMsg->resInstId);
if (isreg == CM_RES_ISREG_PENDING || recvMsg->resStat == CM_RES_ISREG_PENDING) {
write_runlog(LOG, "res inst[%s:%u] is pending, local check isreg:%s, cms check isreg:%s.\n", recvMsg->resName, recvMsg->resInstId,
GetIsregStatus((int)isreg), GetIsregStatus((int)recvMsg->resStat));
(void)RegOneResInst(local, recvMsg->resInstId, CM_TRUE);
} else if (isreg == CM_RES_ISREG_REG) {
write_runlog(LOG, "local res inst[%s:%u] has been reg.\n", recvMsg->resName, recvMsg->resInstId);
} else if ((isreg == CM_RES_ISREG_UNREG) || (isreg == CM_RES_ISREG_UNKNOWN)) {
write_runlog(LOG, "This res is abnormaly, before reg res inst, need clean res inst first.\n");
if ((CheckOneResInst(local) == CUS_RES_CHECK_STAT_OFFLINE) || (CleanOneResInst(local) == CM_SUCCESS)) {
(void)RegOneResInst(local, recvMsg->resInstId, CM_TRUE);
}
} else if (isreg == CM_RES_ISREG_NOT_SUPPORT) {
write_runlog(LOG, "res inst[%s:%u] don't support reg, not need reg.\n", recvMsg->resName, recvMsg->resInstId);
} else {
write_runlog(ERROR, "res inst[%s:%u] isreg:%s, can't do reg.\n", recvMsg->resName, recvMsg->resInstId,
GetIsregStatus((int)isreg));
}
}
/*
 * Dispatch a registration notification from CMS by its resMode:
 * 0 runs the unregister path, 1 runs the register path, anything
 * else is logged as an error.
 */
void ProcessResRegFromCms(const CmsNotifyAgentRegMsg *recvMsg)
{
    if (recvMsg->resMode == 0) {
        ProcessUnregResInst(recvMsg);
    } else if (recvMsg->resMode == 1) {
        ProcessRegResInst(recvMsg);
    } else {
        write_runlog(ERROR, "ProcessResRegFromCms, unknown res mode.\n");
    }
}
void ProcessIsregCheckListChanged(const CmsFlushIsregCheckList *recvMsg)
{
write_runlog(LOG, "node(%u) report isreg list is wrong, need update.\n", g_currentNode->node);
UpdateIsregCheckList(recvMsg->checkList, recvMsg->checkCount);
}