* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include <gtest/gtest.h>
#include <mockcpp/mockcpp.hpp>
#include <mockcpp/ChainingMockHelper.h>
#include <gtest/gtest_pred_impl.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#include "mmpa_api.h"
#ifdef __cplusplus
#if __cplusplus
extern "C" {
#endif
#endif
/* Condition variable and mutex used by thread_func, thread_func_time and cleanup_handler. */
mmCond cond;
mmMutexFC mtxfc;

/* Readiness flags: each is set to 1 by a server thread in this file and polled by its client thread. */
int socketFlag = 0;     /* set by server_socket, polled by client_socket */
INT32 pollFlag = 0;     /* set by poll_server_socket, polled by poll_client_socket */
int pipeFlag = 0;       /* set by poll_server_pipe, polled by poll_client_pipe */
int namedpipeFlag = 0;  /* set by poll_server_namepipe, polled by poll_client_namepipe */

/* Number of payload characters in struct msgtype (its buffer holds BUFFER + 1 bytes). */
#define BUFFER 255
/* Permission bits for the message queue; parenthesized so the macro expands safely inside any expression. */
#define PERM (S_IREAD | S_IWRITE)

/* Text copied into the message sent by msgqueue_server. */
char g_tmpStr[BUFFER+1] = "hello msg queue!";
/* Key passed to mmMsgCreate / mmMsgOpen by the message-queue threads. */
mmKey_t g_key = 0x12121212;
/* Thread-local-storage key used by tlsTestThread1 / tlsTestThread2; it is not created in this part of the file. */
mmThreadKey g_thread_log_key;
/* Message record exchanged between msgqueue_server and msgqueue_client. */
struct msgtype {
/* Message type tag; the server sets it to 1 before sending. */
long mtype;
/* Text payload: BUFFER characters plus one extra byte. */
char buffer[BUFFER+1];
};
/*
 * Directory-entry filter: accepts entries whose name starts with 't'.
 *
 * entry: directory entry to inspect; it is dereferenced without a NULL check.
 * Returns 1 when the first character of d_name is 't', otherwise 0.
 */
int utFilter(const struct dirent *entry)
{
    const char firstChar = entry->d_name[0];

    if (firstChar != 't') {
        return 0;
    }
    return 1;
}
/*
 * Signal handler installed for SIGINT by thread_action.
 *
 * arg: the delivered signal number (unused).
 *
 * The notice is emitted with write() instead of printf(): stdio functions are
 * not async-signal-safe, whereas write() is, so the handler can no longer
 * corrupt or deadlock on stdio state that the interrupted code was using.
 */
void signal_action(int arg)
{
    static const char notice[] = "Receive the shutdown singal\n";

    (void)arg;
    /* Best effort: nothing useful can be done here if the write fails. */
    ssize_t written = write(STDOUT_FILENO, notice, sizeof(notice) - 1);
    (void)written;
}
/*
 * Thread entry: installs signal_action as the SIGINT handler, then sleeps forever
 * in one-second steps. The function never returns.
 *
 * arg: unused.
 */
void *thread_action(void* arg)
{
    printf("install the signal slot\n");
    signal(SIGINT, signal_action);

    /* Idle forever in one-second steps. */
    for (;;) {
        sleep(1);
    }
}
/*
 * Thread entry: stores a per-thread log file name through the TLS API and reads it back.
 *
 * Two set/get round-trips are made: one with a hard-coded local key value and one
 * with the file-level g_thread_log_key. Only the second result is printed.
 *
 * p: unused.
 * Returns NULL.
 */
VOID *tlsTestThread1(VOID *p)
{
    char logName[20];
    char *stored = NULL;
    /* NOTE(review): presumably a key that was never created, to exercise the error path - confirm. */
    mmThreadKey localKey = 121212;

    sprintf_s(logName, sizeof(logName), "tlsTestThread%d.log", 1);

    /* Round-trip through the hard-coded key; the value read back is overwritten below. */
    mmTlsSet(localKey, logName);
    stored = (char*)mmTlsGet(localKey);

    /* Round-trip through the shared key. */
    mmTlsSet(g_thread_log_key, logName);
    stored = (char*)mmTlsGet(g_thread_log_key);
    if (stored != NULL) {
        printf("thread1 result is %s\n", stored);
    }
    return NULL;
}
/*
 * Thread entry: stores "tlsTestThread2.log" under g_thread_log_key through the TLS API,
 * reads it back and prints it when the lookup yields a non-NULL pointer.
 *
 * p: unused.
 * Returns NULL.
 */
VOID *tlsTestThread2(VOID *p)
{
    char logName[20];
    char *stored = NULL;

    sprintf_s(logName, sizeof(logName), "tlsTestThread%d.log", 2);
    mmTlsSet(g_thread_log_key, logName);

    stored = (char*)mmTlsGet(g_thread_log_key);
    if (stored != NULL) {
        printf("thread2 result is %s\n", stored);
    }
    return NULL;
}
/*
 * Thread entry: creates the message queue identified by g_key and sends one message
 * whose text is g_tmpStr.
 *
 * Before the real send, mmMsgSnd is also called once with a NULL buffer and zero length.
 * The queue is closed two seconds after sending.
 *
 * p: unused.
 * Returns NULL.
 */
VOID* msgqueue_server(VOID* p)
{
    struct msgtype outMsg;
    mmMsgid queueId = mmMsgCreate(g_key, PERM|M_MSG_CREAT);

    if (queueId == (mmMsgid)EN_ERROR) {
        fprintf(stderr, "Creat Message Error??%s\a\n", strerror(errno));
        return NULL;
    }

    outMsg.mtype = 1;
    strncpy_s(outMsg.buffer, sizeof(outMsg.buffer), g_tmpStr, BUFFER);

    printf("server:start mmMsgSnd\n");
    /* Call with no buffer first, then send the actual message. */
    mmMsgSnd(queueId, NULL, 0, M_MSG_NOWAIT);
    mmMsgSnd(queueId, &outMsg, sizeof(struct msgtype), M_MSG_NOWAIT);

    /* Keep the queue open for two seconds before closing it. */
    sleep(2);
    mmMsgClose(queueId);
    return NULL;
}
/*
 * Thread entry: opens the message queue identified by g_key, receives one message
 * and prints its text.
 *
 * mmMsgRcv is first called once with a NULL buffer, then with the real message record.
 *
 * p: unused.
 * Returns NULL.
 */
VOID* msgqueue_client(VOID* p)
{
    /*
     * Zero-filled up front: the text is printed below without checking the receive
     * result, so a receive that stores nothing must still leave a terminated
     * (empty) string rather than indeterminate stack bytes.
     */
    struct msgtype msg = {0};
    mmMsgid msgid;

    (void)p;
    if ((msgid = mmMsgOpen(g_key, PERM|M_MSG_CREAT)) == (mmMsgid)EN_ERROR) {
        fprintf(stderr, "Creat Message Error??%s\a\n", strerror(errno));
        return NULL;
    }

    msg.mtype = 0;
    /* Call with no buffer first, then receive into the real record. */
    mmMsgRcv(msgid, NULL, sizeof(struct msgtype), 0);
    /*
     * NOTE(review): if mmMsgRcv forwards this length to msgrcv() as the text size,
     * sizeof(struct msgtype) is sizeof(long) larger than msg.buffer - confirm
     * against the mmpa implementation (the server sends with the same length).
     */
    mmMsgRcv(msgid, &msg, sizeof(struct msgtype), 0);
    printf("client:recv msg:%s\n", msg.buffer);

    mmMsgClose(msgid);
    return NULL;
}
/*
 * Searches ports 50000..55000 for one that does not show up in the netstat output.
 *
 * For each candidate, a "netstat | grep" pipeline is run through system(); a
 * non-zero status is taken to mean the port is not in use. Note that a failure
 * to run the command at all also yields a non-zero status.
 *
 * Returns the first such port number, or -1 when every port in the range matched.
 * (Previously control fell off the end of the function in that case, which is
 * undefined behavior for a non-void function.)
 */
INT32 getIdleSocketid()
{
    for (INT32 port = 50000; port <= 55000; port++) {
        CHAR cmd[50];
        sprintf_s(cmd, sizeof(cmd), "netstat -np| grep -v STREAM | grep %d", port);

        INT32 status = system(cmd);
        if (0 != status) {
            printf("i = %d\n", port);
            return port;
        }
    }
    /* No candidate in the range qualified. */
    return -1;
}
/*
 * Callback handed to mmPoll by the poll server threads: prints what was polled.
 *
 * pPolledData: record to print. Nothing but a notice is printed when the record
 *              is NULL, its buffer is NULL, or bufRes is not positive.
 *
 * Prints the buffer as a string, then bufRes, then bufType when bufType is one of
 * pollTypeIoctl, pollTypeRead or pollTypeRecv.
 */
void pollDataCallback(pmmPollData pPolledData)
{
    if (!(pPolledData != NULL && pPolledData->buf != NULL && pPolledData->bufRes > 0)) {
        printf("pPolledData is NULL\n");
        return;
    }

    printf("pPolledData->buf is %s\n", pPolledData->buf);
    printf("pPolledData->bufRes is %d\n", pPolledData->bufRes);

    /* All three recognized poll types report the same line; any other value prints nothing. */
    if (pPolledData->bufType == pollTypeIoctl ||
        pPolledData->bufType == pollTypeRead ||
        pPolledData->bufType == pollTypeRecv) {
        printf("pPolledData->bufType is %d\n", pPolledData->bufType);
    }
}
/*
 * Thread entry: names the current thread "UTCALLBACK", reads the name back,
 * prints it, then calls mmSleep(100).
 *
 * pstArg: unused.
 * Returns NULL.
 */
VOID* UTtest_callback(VOID* pstArg)
{
    char nameToSet[MMPA_THREADNAME_SIZE] = "UTCALLBACK";
    char nameReadBack[MMPA_THREADNAME_SIZE] = {};

    printf("UTtest_callback run success\n");

    /* The results of both name calls are deliberately ignored. */
    (void)mmSetCurrentThreadName(nameToSet);
    (void)mmGetCurrentThreadName(nameReadBack, MMPA_THREADNAME_SIZE);
    printf("mmGetCurrentThreadName name is %s\n", nameReadBack);

    mmSleep(100);
    return NULL;
}
/*
 * Thread entry: creates the ut_readfifo / ut_writefifo named pipes and polls the
 * first handle for readable data, reporting through pollDataCallback.
 *
 * pipeFlag is set to 1 right after the create call (whether or not it succeeded)
 * so that poll_client_pipe stops waiting.
 *
 * Changes from the previous version:
 *  - the handle array is zero-initialized, so it is never read uninitialized when
 *    creation fails;
 *  - polling now requires BOTH a successful create and a non-zero first handle
 *    (the guard used "||", unlike poll_server_namepipe);
 *  - the completion port is closed before returning, as poll_server_socket does.
 *
 * p: unused.
 * Returns NULL.
 */
VOID* poll_server_pipe(VOID* p)
{
    char *fifo_name[MMPA_PIPE_COUNT] = {"../tests/ut/mmpa/ut_readfifo", "../tests/ut/mmpa/ut_writefifo"};
    const int mode = 0;
    mmPipeHandle pipe[MMPA_PIPE_COUNT] = {0};

    (void)p;
    int res = mmCreateNamedPipe(pipe, fifo_name, mode);
    pipeFlag = 1;
    if (res == -1 || pipe[0] == 0) {
        return NULL;
    }

    mmPollfd fds[10];
    memset_s(fds, sizeof(fds), 0, sizeof(fds));
    fds[0].handle = pipe[0];
    fds[0].pollType = pollTypeRead;

    mmPollData polledData;
    memset_s(&polledData, sizeof(mmPollData), 0, sizeof(mmPollData));
    char buf[1024] = {0};
    polledData.buf = buf;
    polledData.bufLen = 1024;

    mmCompletionHandle handleIOCP = mmCreateCompletionPort();

    /* First pass: after each successful poll the buffer length is zeroed, so the next call runs with bufLen == 0. */
    while (1) {
        INT32 bResult = mmPoll(fds, 1, 5000, handleIOCP, &polledData, pollDataCallback);
        if (bResult < 0) {
            printf("mmPoll return, fd is closed!\n");
            break;
        }
        polledData.bufLen = 0;
    }

    /* Second pass: restore the full buffer length and poll until mmPoll reports a negative result. */
    polledData.bufLen = 1024;
    while (1) {
        INT32 bResult = mmPoll(fds, 1, 100000, handleIOCP, &polledData, pollDataCallback);
        if (bResult < 0) {
            printf("mmPoll return, fd is closed!\n");
            break;
        }
    }

    mmCloseCompletionPort(handleIOCP);
    mmCloseNamedPipe(pipe);
    return NULL;
}
/*
 * Thread entry: waits for poll_server_pipe to raise pipeFlag, opens the
 * ut_readfifo / ut_writefifo named pipes and writes the same text message to the
 * first handle three times, two seconds apart.
 *
 * Changes from the previous version: a failed write no longer returns with the
 * pipe handles still open; every path taken after a successful open now closes
 * the handles and unlinks both FIFO paths. The three copies of the write code
 * are folded into one loop.
 *
 * p: unused.
 * Returns NULL.
 */
VOID* poll_client_pipe(VOID* p)
{
    char *fifo_name[MMPA_PIPE_COUNT] = {"../tests/ut/mmpa/ut_readfifo", "../tests/ut/mmpa/ut_writefifo"};
    const int mode = 0;
    const int writeCount = 3;
    int res = 0;
    char buffer[PIPE_BUF + 1];
    mmPipeHandle pipe[MMPA_PIPE_COUNT] = {0};

    (void)p;
    memset_s(buffer, sizeof(buffer), '\0', sizeof(buffer));

    /* NOTE(review): busy-waits on a plain int written by another thread, without sleeping - kept as in the original. */
    while (true) {
        if (0 != pipeFlag) {
            res = mmOpenNamePipe(pipe, fifo_name, mode);
            break;
        }
    }
    if (res == -1) {
        return NULL;
    }

    snprintf_s(buffer, sizeof(buffer), 64, "the message :hello pipe!");
    for (int i = 0; i < writeCount; i++) {
        if (i > 0) {
            /* Space the messages two seconds apart. */
            sleep(2);
        }
        res = (int)write(pipe[0], buffer, strlen(buffer));
        if (res == -1) {
            fprintf(stderr, "Write error on pipe\n");
            break;
        }
    }

    mmCloseNamedPipe(pipe);
    unlink(fifo_name[0]);
    unlink(fifo_name[1]);
    return NULL;
}
/*
 * Thread entry: removes any existing readpipe / writepipe paths, creates the pipes
 * with mmCreatePipe and polls the first handle for readable data, reporting
 * through pollDataCallback.
 *
 * namedpipeFlag is set to 1 only after a successful create, which lets
 * poll_client_namepipe start opening the pipes.
 *
 * Changes from the previous version: the unused locals (bytes_sent, buffer and an
 * outer bResult shadowed by the loop-local ones) are gone, the always-true
 * "res != -1" test after the success check is dropped, and the completion port is
 * closed before returning, as poll_server_socket does.
 *
 * p: unused.
 * Returns NULL.
 */
VOID* poll_server_namepipe(VOID* p)
{
    char *fifo_name[MMPA_PIPE_COUNT] = {"../tests/ut/mmpa/readpipe", "../tests/ut/mmpa/writepipe"};
    const int mode = 0;
    mmPipeHandle pipe[MMPA_PIPE_COUNT] = {0};

    (void)p;
    /* Start from a clean slate in case an earlier run left the paths behind. */
    unlink(fifo_name[0]);
    unlink(fifo_name[1]);

    int res = mmCreatePipe(pipe, fifo_name, MMPA_PIPE_COUNT, mode);
    if (res != 0) {
        printf("mmCreatePipe failed\n");
        return NULL;
    }
    printf("mmCreatePipe success\n");
    namedpipeFlag = 1;

    if (pipe[0] <= 0) {
        return NULL;
    }

    mmPollfd fds[10];
    memset_s(fds, sizeof(fds), 0, sizeof(fds));
    fds[0].handle = pipe[0];
    fds[0].pollType = pollTypeRead;

    mmPollData polledData;
    memset_s(&polledData, sizeof(mmPollData), 0, sizeof(mmPollData));
    char buf[1024] = {0};
    polledData.buf = buf;
    polledData.bufLen = 1024;

    mmCompletionHandle handleIOCP = mmCreateCompletionPort();

    /* First pass: after each successful poll the buffer length is zeroed, so the next call runs with bufLen == 0. */
    while (1) {
        INT32 bResult = mmPoll(fds, 1, 5000, handleIOCP, &polledData, pollDataCallback);
        if (bResult < 0) {
            printf("mmPoll return, fd is closed!\n");
            break;
        }
        polledData.bufLen = 0;
    }

    /* Second pass: restore the full buffer length and poll until mmPoll reports a negative result. */
    polledData.bufLen = 1024;
    while (1) {
        INT32 bResult = mmPoll(fds, 1, 10000, handleIOCP, &polledData, pollDataCallback);
        if (bResult < 0) {
            printf("mmPoll return, fd is closed!\n");
            break;
        }
    }

    mmCloseCompletionPort(handleIOCP);
    mmClosePipe(pipe, MMPA_PIPE_COUNT);
    return NULL;
}
/*
 * Thread entry: makes up to five attempts, one second apart, to open the
 * readpipe / writepipe pipes once poll_server_namepipe has raised namedpipeFlag,
 * then writes the same text message to the first handle three times, two seconds
 * apart.
 *
 * Changes from the previous version:
 *  - the writes happen only if mmOpenPipe actually returned 0. Before, running out
 *    of attempts with the flag never raised left res at 0, and the code went on to
 *    write to handle value 0;
 *  - a failed write no longer returns with the pipe handles still open; every path
 *    after a successful open closes the handles and unlinks both paths.
 *
 * p: unused.
 * Returns NULL.
 */
VOID* poll_client_namepipe(VOID* p)
{
    char *fifo_name[MMPA_PIPE_COUNT] = {"../tests/ut/mmpa/readpipe", "../tests/ut/mmpa/writepipe"};
    const int mode = 0;
    const int writeCount = 3;
    int attemptsLeft = 5;
    int opened = 0;
    int res = 0;
    char buffer[PIPE_BUF + 1];
    mmPipeHandle pipe[MMPA_PIPE_COUNT] = {0};

    (void)p;
    memset_s(buffer, sizeof(buffer), '\0', sizeof(buffer));

    while (attemptsLeft--) {
        if (0 != namedpipeFlag) {
            res = mmOpenPipe(pipe, fifo_name, MMPA_PIPE_COUNT, mode);
            if (res == 0) {
                printf("mmOpenPipe success\n");
                opened = 1;
                break;
            }
        }
        sleep(1);
    }
    if (!opened) {
        /* Nothing was opened, so there is nothing to write to or clean up. */
        return NULL;
    }

    snprintf_s(buffer, sizeof(buffer), 64, "the message :hello namepipe!");
    for (int i = 0; i < writeCount; i++) {
        if (i > 0) {
            /* Space the messages two seconds apart. */
            sleep(2);
        }
        res = (int)write(pipe[0], buffer, strlen(buffer));
        if (res == -1) {
            fprintf(stderr, "Write error on pipe\n");
            break;
        }
    }

    mmClosePipe(pipe, MMPA_PIPE_COUNT);
    unlink(fifo_name[0]);
    unlink(fifo_name[1]);
    return NULL;
}
/*
 * Thread entry: listens on a TCP port, accepts one connection and polls it for
 * received data, reporting through pollDataCallback, until mmPoll returns a
 * negative result.
 *
 * pollFlag is set to 1 once the socket is listening so that poll_client_socket
 * can connect.
 *
 * Changes from the previous version: the address structure is cleared with zero
 * bytes (it was filled with the character '0', i.e. 0x30), and the unused locals
 * (recvMsg, recvResult and an outer bResult shadowed by the loop-local one) are gone.
 *
 * p: points to an INT32 holding the port number in host byte order.
 * Returns NULL.
 */
VOID* poll_server_socket(VOID* p)
{
    mmSockHandle listenfd;
    mmSockHandle connfd;
    struct sockaddr_in serv_add;
    mmSocklen_t stAddrLen = sizeof(serv_add);

    listenfd = mmSocket(AF_INET, SOCK_STREAM, 0);

    memset_s(&serv_add, sizeof(serv_add), 0, sizeof(serv_add));
    serv_add.sin_family = AF_INET;
    serv_add.sin_addr.s_addr = 0;
    serv_add.sin_port = htons(*(INT32 *)p);

    mmBind(listenfd, (mmSockAddr*)&serv_add, stAddrLen);
    mmListen(listenfd, 5);
    pollFlag = 1;

    /* NOTE(review): retries forever if mmAccept keeps failing - kept as in the original. */
    do {
        connfd = mmAccept(listenfd, (mmSockAddr*)NULL, NULL);
    } while (connfd < 0);

    mmPollfd fds[10];
    memset_s(fds, sizeof(fds), 0, sizeof(fds));
    fds[0].handle = connfd;
    fds[0].pollType = pollTypeRecv;

    mmPollData polledData;
    memset_s(&polledData, sizeof(mmPollData), 0, sizeof(mmPollData));
    char buf[1024] = {0};
    polledData.buf = buf;
    polledData.bufLen = 1024;

    mmCompletionHandle handleIOCP = mmCreateCompletionPort();
    while (true) {
        INT32 bResult = mmPoll(fds, 1, 10000, handleIOCP, &polledData, pollDataCallback);
        if (bResult < 0) {
            printf("mmPoll return, fd is closed!\n");
            break;
        }
    }
    mmCloseCompletionPort(handleIOCP);

    sleep(1);
    mmCloseSocket(connfd);
    mmCloseSocket(listenfd);
    return NULL;
}
/*
 * Thread entry: waits (checking once per mmSleep(1000)) for poll_server_socket to
 * raise pollFlag, connects to 127.0.0.1 on the given port and sends one 50-byte
 * message buffer.
 *
 * Change from the previous version: the address structure is cleared with zero
 * bytes (it was filled with the character '0', i.e. 0x30).
 *
 * p: points to an INT32 holding the port number in host byte order.
 * Returns NULL.
 */
VOID* poll_client_socket(VOID* p)
{
    mmSockHandle sockfd;
    struct sockaddr_in serv_add;
    CHAR sendMsg[50] = {"hello.mmpa!"};
    INT32 sendResult = 0;

    memset_s(&serv_add, sizeof(serv_add), 0, sizeof(serv_add));
    serv_add.sin_family = AF_INET;
    inet_aton("127.0.0.1", (struct in_addr *)&serv_add.sin_addr);
    serv_add.sin_port = htons(*(INT32 *)p);

    sockfd = mmSocket(AF_INET, SOCK_STREAM, 0);

    /* Wait until the server reports that it is listening. */
    while (0 == pollFlag) {
        mmSleep(1000);
    }

    mmConnect(sockfd, (mmSockAddr*)&serv_add, sizeof(serv_add));
    /* The whole 50-byte buffer is sent, trailing zero bytes included. */
    sendResult = mmSocketSend(sockfd, (VOID*)sendMsg, 50, 0);
    printf("The send msg length is %d.\r\n", sendResult);

    mmSleep(1000);
    mmCloseSocket(sockfd);
    return NULL;
}
/*
 * Thread entry: listens on a TCP port, accepts one connection, receives up to
 * 50 bytes from it and prints the received length and text.
 *
 * socketFlag is set to 1 once the socket is listening so that client_socket can
 * connect.
 *
 * Changes from the previous version:
 *  - the address structure is cleared with zero bytes (it was filled with the
 *    character '0', i.e. 0x30);
 *  - the receive buffer has one byte more than the receive length, so the text
 *    printed with "%s" is terminated even when a full 50 bytes arrive.
 *
 * p: points to an INT32 holding the port number in host byte order.
 * Returns NULL.
 */
VOID* server_socket(VOID* p)
{
    mmSockHandle listenfd;
    mmSockHandle connfd;
    struct sockaddr_in serv_add;
    mmSocklen_t stAddrLen = sizeof(serv_add);
    char recvMsg[50 + 1] = {0};
    INT32 recvResult = 0;

    listenfd = mmSocket(AF_INET, SOCK_STREAM, 0);

    memset_s(&serv_add, sizeof(serv_add), 0, sizeof(serv_add));
    serv_add.sin_family = AF_INET;
    serv_add.sin_addr.s_addr = 0;
    serv_add.sin_port = htons(*(INT32 *)p);

    mmBind(listenfd, (mmSockAddr*)&serv_add, stAddrLen);
    mmListen(listenfd, 5);
    socketFlag = 1;

    /* NOTE(review): retries forever if mmAccept keeps failing - kept as in the original. */
    do {
        connfd = mmAccept(listenfd, (mmSockAddr*)NULL, NULL);
    } while (connfd < 0);

    sleep(1);
    /* Receive at most 50 bytes; the extra buffer byte stays zero as the terminator. */
    recvResult = mmSocketRecv(connfd, (VOID*)recvMsg, 50, 0);
    printf("The recv msg length is %d; the msg is %s\r\n", recvResult, recvMsg);
    sleep(1);

    mmCloseSocket(connfd);
    mmCloseSocket(listenfd);
    return NULL;
}
/*
 * Thread entry: waits (checking once per second) for server_socket to raise
 * socketFlag, connects to 127.0.0.1 on the given port and sends one 50-byte
 * message buffer.
 *
 * Change from the previous version: the address structure is cleared with zero
 * bytes (it was filled with the character '0', i.e. 0x30).
 *
 * p: points to an INT32 holding the port number in host byte order.
 * Returns NULL.
 */
VOID* client_socket(VOID* p)
{
    mmSockHandle sockfd;
    struct sockaddr_in serv_add;
    char sendMsg[50] = {"hello.mmpa!"};
    INT32 sendResult = 0;

    memset_s(&serv_add, sizeof(serv_add), 0, sizeof(serv_add));
    sockfd = mmSocket(AF_INET, SOCK_STREAM, 0);

    serv_add.sin_family = AF_INET;
    inet_aton("127.0.0.1", (struct in_addr *)&serv_add.sin_addr);
    serv_add.sin_port = htons(*(INT32 *)p);

    /* Wait until the server reports that it is listening. */
    while (0 == socketFlag) {
        sleep(1);
    }

    mmConnect(sockfd, (mmSockAddr*)&serv_add, sizeof(serv_add));
    /* The whole 50-byte buffer is sent, trailing zero bytes included. */
    sendResult = mmSocketSend(sockfd, (VOID*)sendMsg, 50, 0);
    printf("The send msg length is %d.\r\n", sendResult);

    sleep(1);
    mmCloseSocket(sockfd);
    return NULL;
}
/*
 * Thread cleanup handler registered by thread_func and thread_func_time.
 *
 * Frees the pointer it is given and unlocks mtxfc.
 *
 * arg: pointer passed at registration; handed to free() (both registrations in
 *      this file pass NULL).
 *
 * Fix: the message ended in the two characters "/n" instead of a newline escape.
 */
static void cleanup_handler(void *arg)
{
    printf("Cleanup handler of second thread.\n");
    free(arg);
    mmMutexUnLock(&mtxfc);
}
/*
 * Thread entry: locks mtxfc, waits on cond, then unlocks mtxfc.
 *
 * cleanup_handler is registered (with a NULL argument) for the duration of the
 * wait and is popped without being executed on the normal path.
 *
 * arg: unused.
 * Returns NULL.
 */
VOID* thread_func(VOID *arg)
{
    struct node *cleanupArg = NULL;
    INT32 waitRet = EN_OK;

    pthread_cleanup_push(cleanup_handler, cleanupArg);

    mmMutexLock(&mtxfc);
    /* The wait result is stored but not examined. */
    waitRet = mmCondWait(&cond, &mtxfc);
    mmMutexUnLock(&mtxfc);

    /* 0: remove the handler without running it. */
    pthread_cleanup_pop(0);
    return NULL;
}
/*
 * Thread entry: with mtxfc held, performs two timed waits on cond (timeout
 * argument 999 each) and prints the local time before the first wait and after
 * each wait.
 *
 * cleanup_handler is registered (with a NULL argument) for the duration and is
 * popped without being executed on the normal path.
 *
 * arg: unused.
 * Returns NULL.
 */
VOID* thread_func_time(VOID *arg)
{
    struct node *cleanupArg = NULL;
    INT32 waitRet = EN_OK;

    pthread_cleanup_push(cleanup_handler, cleanupArg);
    mmMutexLock(&mtxfc);

    time_t now = time(NULL);
    printf("before mmCondTimedWait 时间:%s\n", asctime(localtime(&now)));

    /* The wait results are stored but not examined. */
    waitRet = mmCondTimedWait(&cond, &mtxfc, 999);
    now = time(NULL);
    printf("after mmCondTimedWait1 时间:%s\n", asctime(localtime(&now)));

    waitRet = mmCondTimedWait(&cond, &mtxfc, 999);
    now = time(NULL);
    printf("after mmCondTimedWait2 时间:%s\n", asctime(localtime(&now)));

    mmMutexUnLock(&mtxfc);
    /* 0: remove the handler without running it. */
    pthread_cleanup_pop(0);
    return NULL;
}
#ifdef __cplusplus
#if __cplusplus
}
#endif
#endif