* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* CM is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cms_msg_que.cpp
*
*
* IDENTIFICATION
* src/cm_server/cms_msg_que.cpp
*
* -------------------------------------------------------------------------
*/
#include "cm_c.h"
#include "cm_util.h"
#include "cm_misc_base.h"
#include "cm_msg_buf_pool.h"
#include "cms_msg_que.h"
static wakeSenderFuncType wakeSenderFunc = NULL;
static CanProcThisMsgFunType CanProcThisMsgFun = NULL;
void InitMsgQue(PriMsgQues &que)
{
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
CMFairMutexInit(que.ques[i].fairLock);
}
(void)pthread_mutex_init(&que.msgLock, NULL);
InitPthreadCondMonotonic(&que.msgCond);
}
void setWakeSenderFunc(wakeSenderFuncType func)
{
wakeSenderFunc = func;
}
void SetCanProcThisMsgFun(CanProcThisMsgFunType func)
{
CanProcThisMsgFun = func;
}
size_t getMsgCount(PriMsgQues *priQue)
{
size_t count = 0;
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
MsgQuePtr que = &priQue->ques[i].que;
count += que->size();
}
return count;
}
bool existMsg(const PriMsgQues *priQue)
{
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
ConstMsgQuePtr que = &priQue->ques[i].que;
if (!que->empty()) {
return true;
}
}
return false;
}
bool pushRecvMsg(PriMsgQues *priQue, MsgRecvInfo *msg, MsgSourceType src)
{
Assert(src >= 0 && src < MSG_SRC_COUNT);
(void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_WRITE);
if (priQue->ques[src].que.size() >= MAX_MSG_IN_QUE) {
CMFairMutexUnLock(priQue->ques[src].fairLock);
return false;
}
msg->connID.t2 = GetMonotonicTimeMs();
priQue->ques[src].que.push_back((const char *)msg);
CMFairMutexUnLock(priQue->ques[src].fairLock);
(void)pthread_cond_broadcast(&priQue->msgCond);
return true;
}
static const MsgRecvInfo *getRecvMsgInner(PriMsgQues *priQue, MsgSourceType src, void *threadInfo)
{
Assert(src >= 0 && src < MSG_SRC_COUNT);
MsgRecvInfo *msg = NULL;
uint64 t3 = GetMonotonicTimeMs();
if (!existMsg(priQue)) {
return NULL;
}
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
MsgQuePtr que = &priQue->ques[src].que;
(void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_READ);
MsgQueType::iterator it = que->begin();
for (; it != que->end(); ++it) {
if (CanProcThisMsgFun == NULL || CanProcThisMsgFun(threadInfo, *it)) {
msg = (MsgRecvInfo *)*it;
(void)que->erase(it);
msg->connID.t3 = t3;
msg->connID.t4 = GetMonotonicTimeMs();
break;
}
}
CMFairMutexUnLock(priQue->ques[src].fairLock);
if (msg != NULL) {
break;
}
src = (src == MsgSrcAgent) ? MsgSrcCtl : MsgSrcAgent;
}
return msg;
}
const MsgRecvInfo *getRecvMsg(PriMsgQues *priQue, MsgSourceType src, uint32 waitTime, void *threadInfo)
{
struct timespec tv;
if (priQue == NULL) {
return NULL;
}
const MsgRecvInfo* msg = getRecvMsgInner(priQue, src, threadInfo);
if (msg == NULL && waitTime > 0) {
(void)clock_gettime(CLOCK_MONOTONIC, &tv);
tv.tv_sec = tv.tv_sec + (long long)waitTime;
(void)pthread_mutex_lock(&priQue->msgLock);
(void)pthread_cond_timedwait(&priQue->msgCond, &priQue->msgLock, &tv);
(void)pthread_mutex_unlock(&priQue->msgLock);
}
return msg;
}
void pushSendMsg(PriMsgQues *priQue, MsgSendInfo *msg, MsgSourceType src)
{
Assert(src >= 0 && src < MSG_SRC_COUNT);
ConnID connID = msg->connID;
(void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_WRITE);
priQue->ques[src].que.push_back((const char*)msg);
msg->connID.t6 = GetMonotonicTimeMs();
CMFairMutexUnLock(priQue->ques[src].fairLock);
if (wakeSenderFunc != NULL) {
wakeSenderFunc(connID);
}
}
const MsgSendInfo *getSendMsg(PriMsgQues *priQue, MsgSourceType src)
{
const MsgSendInfo *msg = NULL;
if (!existMsg(priQue)) {
return NULL;
}
uint64 now = GetMonotonicTimeMs();
for (int i = 0; i < (int)MSG_SRC_COUNT; i++) {
MsgQuePtr que = &priQue->ques[src].que;
(void)CMFairMutexLock(priQue->ques[src].fairLock, CMFairMutexType::CM_MUTEX_READ);
MsgQueType::iterator it = que->begin();
for (; it != que->end(); ++it) {
MsgSendInfo *sendMsg = (MsgSendInfo *)(*it);
if (sendMsg->procTime == 0 || sendMsg->procTime <= now) {
msg = sendMsg;
(void)que->erase(it);
sendMsg->connID.t7 = now;
sendMsg->connID.t8 = GetMonotonicTimeMs();
break;
}
}
CMFairMutexUnLock(priQue->ques[src].fairLock);
if (msg != NULL) {
break;
}
src = (src == MsgSrcAgent) ? MsgSrcCtl : MsgSrcAgent;
}
return msg;
}
bool existSendMsg(const PriMsgQues *priQue)
{
return existMsg(priQue);
}