* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cma_msg_queue.cpp
*
* IDENTIFICATION
* src/cm_agent/cma_msg_queue.cpp
*
* -------------------------------------------------------------------------
*/
#include <csignal>
#include "cm_elog.h"
#include "cm_msg_buf_pool.h"
#include "cma_connect.h"
#include "cma_connect_client.h"
#include "cma_msg_queue.h"
/*
 * A pair of message queues: one for traffic on the CMS side and one for
 * traffic on the client side. The file keeps one instance for sending and
 * one for receiving.
 */
struct AgentMsgQueueSt {
    MsgQueue cms;     /* messages exchanged with CMS */
    MsgQueue client;  /* messages exchanged with clients; packages carry a conId */
};
typedef struct AgentMsgQueueSt AgentMsgQueue;
/* Queues filled by the PushMsgTo*SendQue functions; allocated by AllocCmaMsgQueueMemory(). */
AgentMsgQueue *g_sendQueue = NULL;
/* Queues filled by the PushMsgTo*RecvQue functions; allocated by AllocCmaMsgQueueMemory(). */
AgentMsgQueue *g_recvQueue = NULL;
/* Thread that WakeCmaSendThread() signals with SIGUSR1; the value 0 is treated as "not ready". */
pthread_t g_recvSendThreadId = 0;
/* Maximum number of pending packages in a single queue; pushes beyond this are rejected. */
static const uint32 MSG_QUEUE_MAX_COUNT = 512;
void AllocCmaMsgQueueMemory()
{
g_sendQueue = new AgentMsgQueue();
g_recvQueue = new AgentMsgQueue();
}
void FreeMsgQueueMemory()
{
delete g_sendQueue;
g_sendQueue = NULL;
delete g_recvQueue;
g_recvQueue = NULL;
}
static char *GetMsgBufAndFillBuf(const char *msgPtr, uint32 msgLen)
{
char *ptr = (char*)AllocBufFromMsgPool(msgLen);
if (ptr == NULL) {
write_runlog(ERROR, "AllocBufFromMsgPool failed.\n");
return NULL;
}
errno_t rc = memcpy_s(ptr, msgLen, msgPtr, msgLen);
securec_check_errno(rc, (void)rc);
return ptr;
}
/*
 * Send SIGUSR1 to the recv/send thread.
 * Does nothing except log when the thread id is still 0 (thread not
 * registered yet); a pthread_kill failure is logged and otherwise ignored.
 */
static inline void WakeCmaSendThread()
{
    const bool threadReady = (g_recvSendThreadId != 0);
    if (!threadReady) {
        write_runlog(LOG, "recvSendThread not ready, can't wakeup.\n");
        return;
    }

    const int killRet = pthread_kill(g_recvSendThreadId, SIGUSR1);
    if (killRet != 0) {
        write_runlog(ERROR, "send SIGUSR1 sig to recv and send thread fail.\n");
    }
}
/*
 * Append a copy of *msgPkg to msgQue under the queue lock.
 *
 * @return true when the package was queued (the queue's condition variable
 *         is signalled afterwards); false when the queue already holds
 *         MSG_QUEUE_MAX_COUNT entries, in which case nothing is queued and
 *         the caller keeps ownership of msgPkg->msgPtr.
 */
static inline bool PushToAgentMsgQue(const AgentMsgPkg *msgPkg, MsgQueue *msgQue)
{
    bool queued = false;

    (void)pthread_mutex_lock(&msgQue->lock);
    if (msgQue->msg.size() < MSG_QUEUE_MAX_COUNT) {
        msgQue->msg.push(*msgPkg);
        queued = true;
    }
    (void)pthread_mutex_unlock(&msgQue->lock);

    /* Signal only after a successful push, outside the lock. */
    if (queued) {
        (void)pthread_cond_signal(&msgQue->cond);
    }
    return queued;
}
bool PushMsgToCmsSendQue(const char *msgPtr, uint32 msgLen, const char *msgInfo)
{
if (msgPtr != NULL && msgLen >= sizeof(int32) && *(int32*)msgPtr == 0) {
write_runlog(LOG, "%s msgPtr is 0. it may be error.\n", msgInfo);
}
AgentMsgPkg msgPkg = {0};
msgPkg.msgLen = msgLen;
msgPkg.msgPtr = GetMsgBufAndFillBuf(msgPtr, msgLen);
if (msgPkg.msgPtr == NULL) {
return false;
}
write_runlog(DEBUG5, "push [%s] msg to send que:msgLen=%u.\n", msgInfo, msgPkg.msgLen);
if (!PushToAgentMsgQue(&msgPkg, &g_sendQueue->cms)) {
write_runlog(ERROR, "[CLIENT] cms send queue is full, drop msg.\n");
FreeBufFromMsgPool(msgPkg.msgPtr);
return false;
}
WakeCmaSendThread();
return true;
}
void PushMsgToAllClientSendQue(const char *msgPtr, uint32 msgLen)
{
ClientConn *clientConn = GetClientConnect();
for (uint32 i = 0; i < CM_MAX_RES_COUNT; ++i) {
if (!clientConn[i].isClosed) {
write_runlog(LOG, "notify inst(%u), CMA disconnect with CMS.\n", clientConn[i].cmInstanceId);
PushMsgToClientSendQue(msgPtr, msgLen, i);
}
}
}
void PushMsgToClientSendQue(const char *msgPtr, uint32 msgLen, uint32 conId)
{
AgentMsgPkg msgPkg = {0};
msgPkg.msgLen = msgLen;
msgPkg.conId = conId;
msgPkg.msgPtr = GetMsgBufAndFillBuf(msgPtr, msgLen);
CM_RETURN_IF_NULL(msgPkg.msgPtr);
const char *resName = GetClientConnect()[conId].resName;
write_runlog(DEBUG5, "push msg to res(%s) client send que:msgLen=%u.\n", resName, msgPkg.msgLen);
if (!PushToAgentMsgQue(&msgPkg, &g_sendQueue->client)) {
write_runlog(ERROR, "[CLIENT] client send queue is full, drop msg.\n");
FreeBufFromMsgPool(msgPkg.msgPtr);
}
}
void PushMsgToCmsRecvQue(const char *msgPtr, uint32 msgLen)
{
AgentMsgPkg msgPkg = {0};
msgPkg.msgLen = msgLen;
msgPkg.msgPtr = GetMsgBufAndFillBuf(msgPtr, msgLen);
CM_RETURN_IF_NULL(msgPkg.msgPtr);
write_runlog(DEBUG5, "push msg to recv que:msgLen=%u.\n", msgPkg.msgLen);
if (!PushToAgentMsgQue(&msgPkg, &g_recvQueue->cms)) {
write_runlog(ERROR, "[CLIENT] cms recv queue is full, drop msg.\n");
FreeBufFromMsgPool(msgPkg.msgPtr);
}
}
bool PushMsgToClientRecvQue(const char *msgPtr, uint32 msgLen, uint32 conId)
{
AgentMsgPkg msgPkg = {0};
msgPkg.msgLen = msgLen;
msgPkg.conId = conId;
msgPkg.msgPtr = GetMsgBufAndFillBuf(msgPtr, msgLen);
if (msgPkg.msgPtr == NULL) {
return false;
}
const char *resName = GetClientConnect()[conId].resName;
write_runlog(DEBUG5, "push msg to res(%s) client recv que:msgLen=%u.\n", resName, msgPkg.msgLen);
if (!PushToAgentMsgQue(&msgPkg, &g_recvQueue->client)) {
write_runlog(ERROR, "[CLIENT] client recv queue is full, drop msg.\n");
FreeBufFromMsgPool(msgPkg.msgPtr);
return false;
}
return true;
}
/*
 * Empty the CMS queue of the given queue pair, returning every queued
 * message buffer to the pool. The whole drain runs under the queue lock.
 */
void CleanCmsMsgQueueCore(AgentMsgQueue *msgQueue)
{
    MsgQueue &cmsQue = msgQueue->cms;

    (void)pthread_mutex_lock(&cmsQue.lock);
    for (; !cmsQue.msg.empty(); cmsQue.msg.pop()) {
        FreeBufFromMsgPool(cmsQue.msg.front().msgPtr);
    }
    (void)pthread_mutex_unlock(&cmsQue.lock);
}
/*
 * Remove every message belonging to connection conId from the client queue
 * of the given queue pair, returning those buffers to the pool. Messages of
 * other connections are kept in their original order. Runs entirely under
 * the client queue lock.
 */
void CleanClientMsgQueueCore(AgentMsgQueue *msgQueue, uint32 conId)
{
    MsgQueue &clientQue = msgQueue->client;

    (void)pthread_mutex_lock(&clientQue.lock);

    /* Drain the queue front to back, keeping only packages of other connections. */
    queue<AgentMsgPkg> survivors;
    for (; !clientQue.msg.empty(); clientQue.msg.pop()) {
        AgentMsgPkg &head = clientQue.msg.front();
        if (head.conId == conId) {
            FreeBufFromMsgPool(head.msgPtr);
        } else {
            survivors.push(head);
        }
    }
    swap(clientQue.msg, survivors);

    (void)pthread_mutex_unlock(&clientQue.lock);
}
void CleanCmsMsgQueue()
{
CleanCmsMsgQueueCore(g_sendQueue);
CleanCmsMsgQueueCore(g_recvQueue);
}
/*
 * Drop all pending client messages that belong to connection conId, first
 * from the send side and then from the receive side. conId defaults to 0.
 */
void CleanClientMsgQueue(uint32 conId = 0)
{
    AgentMsgQueue *sendSide = g_sendQueue;
    CleanClientMsgQueueCore(sendSide, conId);

    AgentMsgQueue *recvSide = g_recvQueue;
    CleanClientMsgQueueCore(recvSide, conId);
}
/*
 * Empty a client queue regardless of connection id, returning every queued
 * message buffer to the pool. The whole drain runs under the queue lock.
 */
static void CleanAllClientMsgQueueCore(MsgQueue *clientQueue)
{
    (void)pthread_mutex_lock(&clientQueue->lock);
    for (;;) {
        if (clientQueue->msg.empty()) {
            break;
        }
        FreeBufFromMsgPool(clientQueue->msg.front().msgPtr);
        clientQueue->msg.pop();
    }
    (void)pthread_mutex_unlock(&clientQueue->lock);
}
void CleanAllClientRecvMsgQueue()
{
CleanAllClientMsgQueueCore(&g_recvQueue->client);
}
void CleanAllClientSendMsgQueue()
{
CleanAllClientMsgQueueCore(&g_sendQueue->client);
}
/* Accessor for the CMS queue of the global send pair. g_sendQueue must already be allocated. */
MsgQueue &GetCmsSendQueue()
{
    AgentMsgQueue &sendSide = *g_sendQueue;
    return sendSide.cms;
}
/* Accessor for the CMS queue of the global receive pair. g_recvQueue must already be allocated. */
MsgQueue &GetCmsRecvQueue()
{
    AgentMsgQueue &recvSide = *g_recvQueue;
    return recvSide.cms;
}
/* Accessor for the client queue of the global send pair. g_sendQueue must already be allocated. */
MsgQueue &GetClientSendQueue()
{
    AgentMsgQueue &sendSide = *g_sendQueue;
    return sendSide.client;
}
/* Accessor for the client queue of the global receive pair. g_recvQueue must already be allocated. */
MsgQueue &GetClientRecvQueue()
{
    AgentMsgQueue &recvSide = *g_recvQueue;
    return recvSide.client;
}
/*
 * Return a writable reference to the recv/send thread id, so the caller can
 * both read it and store the id that WakeCmaSendThread() will signal.
 */
pthread_t &GetSendRecvThreadId()
{
    pthread_t &threadIdRef = g_recvSendThreadId;
    return threadIdRef;
}
bool IsCmsSendQueueEmpty()
{
return g_sendQueue->cms.msg.empty();
}
void AllQueueInit()
{
(void)pthread_mutex_init(&g_sendQueue->cms.lock, NULL);
(void)pthread_cond_init(&g_sendQueue->cms.cond, NULL);
(void)pthread_mutex_init(&g_sendQueue->client.lock, NULL);
(void)pthread_cond_init(&g_sendQueue->client.cond, NULL);
(void)pthread_mutex_init(&g_recvQueue->cms.lock, NULL);
(void)pthread_cond_init(&g_recvQueue->cms.cond, NULL);
(void)pthread_mutex_init(&g_recvQueue->client.lock, NULL);
(void)pthread_cond_init(&g_recvQueue->client.cond, NULL);
}