* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This program is free software, you can redistribute it and/or modify it under the terms and conditions of
* CANN Open Software License Agreement Version 2.0 (the "License").
* Please refer to the License for details. You may not use this file except in compliance with the License.
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED,
* INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE.
* See LICENSE in the root of the software repository for the full text of the License.
*/
#include "gtest/gtest.h"
#include <gmock/gmock.h>
#include <mockcpp/mockcpp.hpp>
#include "hccl_stub.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <runtime/rt.h>
#include <iostream>
#include <fstream>
#include <nlohmann/json.hpp>
#define private public
#define protected public
#include "hcom_executor.h"
#include "hcom_executor_internel.h"
#undef private
#undef protected
#include "llt_hccl_stub_ge.h"
#include "common/ge_common/ge_types.h"
#include "v80_rank_table.h"
#include "llt_hccl_stub_hccl.h"
#include "dlhccl_function.h"
#include "adapter_dlhcclfunc.h"
using namespace std;
using namespace hccl;
class HcomExecutorTest : public testing::Test {
protected:
static void SetUpTestCase() {
std::cout << "HcomExecutorTest SetUP" << std::endl;
}
static void TearDownTestCase() {
std::cout << "HcomExecutorTest TearDown" << std::endl;
}
virtual void SetUp() {
std::cout << "A Test SetUP" << std::endl;
}
virtual void TearDown() {
std::cout << "A Test TearDown" << std::endl;
}
};
static HcclResult g_excutorStatus;
void setExecutorStatus(HcclResult status) {
HCCL_INFO("this is setExecutorStatus fuction");
g_excutorStatus = status;
}
void getExecutorStatus(HcclResult &status) {
status = g_excutorStatus;
}
static std::vector<HcclResult> g_mutiexcutorStatus;
void setMutiExecutorStatus(HcclResult status) {
HCCL_INFO("this is setExecutorStatus fuction");
g_mutiexcutorStatus.push_back(status);
}
void getMutiExecutorStatus(HcclResult &status, int count) {
if (g_mutiexcutorStatus.size() != count) {
status = HCCL_E_INTERNAL;
return;
}
for (int i = 0; i < g_mutiexcutorStatus.size(); i++) {
if (g_mutiexcutorStatus[i] != HCCL_SUCCESS) {
status = HCCL_E_INTERNAL;
return;
}
}
status = HCCL_SUCCESS;
g_mutiexcutorStatus.clear();
return;
}
ge::graphStatus FakeGetOption1(ge::GEThreadLocalContext *that, const std::string &optionExec,
std::string &dumpDebugValue) {
nlohmann::json group_list = {{{"group_name", "aa"}, {"group_rank_list", {0, 1}}}};
if (optionExec == ge::OPTION_EXEC_HCOM_GROUPLIST) {
dumpDebugValue = group_list.dump();
}
return ge::GRAPH_SUCCESS;
}
ge::graphStatus FakeGetOptionGrouplist(ge::GEThreadLocalContext *that, const std::string &optionExec,
std::string &dumpDebugValue) {
nlohmann::json rank_table = {{"status", "completed"},
{"deploy_mode", "lab"},
{"group_count", "1"},
{"chip_info", "910"},
{"board_id", "0x0000"},
{"para_plane_nic_location", "device"},
{"para_plane_nic_num", "1"},
{"para_plane_nic_name", {"eth0"}},
{"group_list",
{{
{"group_name", ""},
{"device_num", "1"},
{"server_num", "1"},
{"instance_count", "1"},
{"instance_list",
{{{"rank_id", "0"},
{"server_id", "172.17.1.120"},
{"devices", {{{"device_id", "0"}, {"device_ip", "192.168.1.120"}}}}}}},
}}}};
nlohmann::json group_list = {{{"group_name", "bbb"}, {"group_rank_list", {0, 1}}}};
if (optionExec == ge::OPTION_EXEC_RANK_TABLE) {
dumpDebugValue = rank_table.dump();
} else if (optionExec == ge::OPTION_EXEC_HCOM_GROUPLIST) {
dumpDebugValue = group_list.dump();
} else if (optionExec == ge::OPTION_EXEC_RANK_ID) {
dumpDebugValue.push_back('1');
} else if (optionExec == ge::OPTION_EXEC_ROLE_TABLE_ADDR) {
return !ge::GRAPH_SUCCESS;
} else if (optionExec == ge::OPTION_EXEC_RANK_TABLE_ADDR) {
dumpDebugValue = std::to_string(ge::PtrToValue(rank_table.dump().data()));
return ge::GRAPH_SUCCESS;
} else {
dumpDebugValue.push_back('1');
}
return ge::GRAPH_SUCCESS;
}
ge::graphStatus FakeGetOptionError1(ge::GEThreadLocalContext *that, const std::string &optionExec,
std::string &dumpDebugValue) {
if (optionExec == ge::OPTION_EXEC_HCOM_GROUPLIST) {
dumpDebugValue = R"({"rank_map":[{"logic_rank_id":1,"model_rank_id":0},{"logic_rank_id":1,"model_rank_id":0]})";
}
return ge::GRAPH_SUCCESS;
}
#define HCCL_COM_DATA_SIZE 1024
TEST_F(HcomExecutorTest, ut_executor_initgroup) {
set_board_id(0x0000);
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_initgroup.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
char *rank_table_file = "./ut_executor_initgroup.json";
char *rank_ID = "0";
MOCKER_CPP(&ge::GEThreadLocalContext::GetOption).stubs().will(invoke(FakeGetOption1));
ret = hccl::HcomExecutor::GetInstance().InitGroup();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
remove(file_name_t);
GlobalMockObject::verify();
}
TEST_F(HcomExecutorTest, ut_executor_initgroup1) {
set_board_id(0x0000);
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_initgroup1.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
MOCKER_CPP(&ge::GEThreadLocalContext::GetOption).stubs().will(invoke(FakeGetOptionGrouplist));
u32 rankId = 0;
MOCKER(HcomGetRankId).stubs().with(mockcpp::any(), outBound(&rankId)).will(returnValue(HCCL_SUCCESS));
MOCKER(HcomCreateGroup).stubs().will(returnValue(HCCL_SUCCESS));
ret = hccl::HcomExecutor::GetInstance().InitGroup();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
remove(file_name_t);
GlobalMockObject::verify();
}
TEST_F(HcomExecutorTest, ut_executor_initgroup1_error) {
set_board_id(0x0000);
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_initgroup1_error.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
MOCKER_CPP(&ge::GEThreadLocalContext::GetOption).stubs().will(invoke(FakeGetOptionError1));
u32 rankId = 0;
MOCKER(HcomGetRankId).stubs().with(mockcpp::any(), outBound(&rankId)).will(returnValue(HCCL_SUCCESS));
MOCKER(HcomCreateGroup).stubs().will(returnValue(HCCL_SUCCESS));
ret = hccl::HcomExecutor::GetInstance().InitGroup();
EXPECT_EQ(ret, HCCL_E_PARA);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
remove(file_name_t);
GlobalMockObject::verify();
}
#if 1
TEST_F(HcomExecutorTest, ut_executor_broadcast) {
set_board_id(0x0000);
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_broadcast.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
rtStream_t stream;
s8 *sendbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
MOCKER_CPP(&ge::GEThreadLocalContext::GetOption).stubs().will(invoke(FakeGetOptionGrouplist));
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamCreate(&stream, 5);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(sendbuf, count * sizeof(s8), 0, count * sizeof(s8));
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
ret = HcomBroadcast("testbcast", sendbuf, count, HCCL_DATA_TYPE_INT8, 0, NULL, stream);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamSynchronize(stream);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
for (int j = 0; j < count; j++) {
if (sendbuf[j] != 2) {
HCCL_ERROR("\n rank:%d sendbuf[%d]:%f", rank, j, sendbuf[j]);
errors++;
break;
}
}
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
(void)HcomSetWorkflowMode(HcclWorkflowMode::HCCL_WORKFLOW_MODE_OP_BASE);
ret = HcclBroadcast(sendbuf, count, HCCL_DATA_TYPE_INT8, 0, hcclComm, stream);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamSynchronize(stream);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
for (int j = 0; j < count; j++) {
if (sendbuf[j] != 2) {
HCCL_ERROR("\n rank:%d sendbuf[%d]:%f", rank, j, sendbuf[j]);
errors++;
break;
}
}
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
sal_free(sendbuf);
rt_ret = rtStreamDestroy(stream);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
EXPECT_EQ(errors, 0);
}
TEST_F(HcomExecutorTest, ut_executor_allreduce) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_allreduce.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
rtStream_t stream;
s8 *sendbuf;
s8 *recvbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
MOCKER_CPP(&ge::GEThreadLocalContext::GetOption).stubs().will(invoke(FakeGetOptionGrouplist));
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamCreate(&stream, 5);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(sendbuf, count * sizeof(s8), 0, count * sizeof(s8));
recvbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(recvbuf, count * sizeof(s8), 0, count * sizeof(s8));
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
ret = HcomAllReduce("testallreduce", sendbuf, recvbuf, count, HCCL_DATA_TYPE_INT8, HCCL_REDUCE_SUM, NULL, stream);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamSynchronize(stream);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
for (int j = 0; j < count; j++) {
if (recvbuf[j] != 2) {
errors++;
break;
}
}
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
(void)HcomSetWorkflowMode(HcclWorkflowMode::HCCL_WORKFLOW_MODE_OP_BASE);
ret = HcclAllReduce(sendbuf, recvbuf, count, HCCL_DATA_TYPE_INT8, HCCL_REDUCE_SUM, hcclComm, stream);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamSynchronize(stream);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
for (int j = 0; j < count; j++) {
if (recvbuf[j] != 2) {
errors++;
break;
}
}
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
sal_free(sendbuf);
sal_free(recvbuf);
rt_ret = rtStreamDestroy(stream);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
}
#endif
#if 1
TEST_F(HcomExecutorTest, ut_executor_equeue_allreduce_mix) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_allreduce_mix.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
rtStream_t stream;
s8 *sendbuf;
s8 *recvbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamCreate(&stream, 5);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(sendbuf, count * sizeof(s8), 0, count * sizeof(s8));
recvbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(recvbuf, count * sizeof(s8), 0, count * sizeof(s8));
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
ret = HcomAllReduce("testallreduce", sendbuf, recvbuf, count, HCCL_DATA_TYPE_INT8, HCCL_REDUCE_SUM, NULL, stream);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamSynchronize(stream);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
for (int j = 0; j < count; j++) {
if (recvbuf[j] != 2) {
errors++;
break;
}
}
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
setExecutorStatus(HCCL_E_RESERVED);
HCCL_INFO("executor start");
HcomOperation opInfo;
opInfo.hcclType = HCCL_TYPE_ALLREDUCE;
opInfo.inputPtr = sendbuf;
opInfo.outputPtr = recvbuf;
opInfo.dataType = HCCL_DATA_TYPE_INT8;
opInfo.count = count;
opInfo.opType = HCCL_REDUCE_SUM;
ret = HcomExecEnqueueOperation(opInfo, setExecutorStatus);
if (ret != HCCL_SUCCESS) {
HcomExecFinalize();
}
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclResult excutorStatus = HCCL_E_RESERVED;
const std::chrono::seconds TIMEOUT(1);
const auto start = std::chrono::steady_clock::now();
while (excutorStatus != HCCL_SUCCESS) {
getExecutorStatus(excutorStatus);
std::this_thread::sleep_for(std::chrono::milliseconds(LLT_SOCKET_SLEEP_MILLISECONDS));
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start);
if (elapsed > TIMEOUT) {
HcomExecFinalize();
HCCL_ERROR("Wait timeout for getExecutor status timeout[%lld]", TIMEOUT);
return;
}
}
EXPECT_EQ(excutorStatus, HCCL_SUCCESS);
for (int j = 0; j < count; j++) {
if (recvbuf[j] != 2) {
errors++;
break;
}
}
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
sal_free(sendbuf);
sal_free(recvbuf);
rt_ret = rtStreamDestroy(stream);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
EXPECT_EQ(errors, 0);
}
#endif
#if 1
TEST_F(HcomExecutorTest, ut_executor_equeue_allreduce) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_allreduce.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
rtStream_t stream;
s8 *sendbuf;
s8 *recvbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamCreate(&stream, 5);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(sendbuf, count * sizeof(s8), 0, count * sizeof(s8));
recvbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(recvbuf, count * sizeof(s8), 0, count * sizeof(s8));
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
setExecutorStatus(HCCL_E_RESERVED);
HCCL_INFO("executor start");
HcomOperation opInfo;
opInfo.hcclType = HCCL_TYPE_ALLREDUCE;
opInfo.inputPtr = sendbuf;
opInfo.outputPtr = recvbuf;
opInfo.dataType = HCCL_DATA_TYPE_INT8;
opInfo.count = count;
opInfo.opType = HCCL_REDUCE_SUM;
ret = HcomExecEnqueueOperation(opInfo, setExecutorStatus);
if (ret != HCCL_SUCCESS) {
HCCL_ERROR("HcomExecEnqueueOperation error");
HcomExecFinalize();
}
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclResult excutorStatus = HCCL_E_RESERVED;
const std::chrono::seconds TIMEOUT(1);
const auto start = std::chrono::steady_clock::now();
while (excutorStatus != HCCL_SUCCESS) {
getExecutorStatus(excutorStatus);
std::this_thread::sleep_for(std::chrono::milliseconds(LLT_SOCKET_SLEEP_MILLISECONDS));
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start);
if (elapsed > TIMEOUT) {
HCCL_ERROR("Wait timeout for getExecutor status timeout[%lld]", TIMEOUT);
HcomExecFinalize();
return;
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
HCCL_INFO("HcomExcutor excutorStatus[%d]", excutorStatus);
EXPECT_EQ(excutorStatus, HCCL_SUCCESS);
for (int j = 0; j < count; j++) {
if (recvbuf[j] != 2) {
errors++;
break;
}
}
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
sal_free(sendbuf);
sal_free(recvbuf);
rt_ret = rtStreamDestroy(stream);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
EXPECT_EQ(errors, 0);
}
#endif
#if 1
TEST_F(HcomExecutorTest, ut_executor_equeue_reduce_scatter) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_reduce_scatter.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
rtStream_t stream;
s8 *sendbuf;
s8 *recvbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamCreate(&stream, 5);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(sendbuf, count * sizeof(s8), 0, count * sizeof(s8));
recvbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(recvbuf, count * sizeof(s8), 0, count * sizeof(s8));
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
setExecutorStatus(HCCL_E_RESERVED);
HCCL_INFO("executor start");
HcomOperation opInfo;
opInfo.hcclType = HCCL_TYPE_REDUCESCATTER;
opInfo.inputPtr = sendbuf;
opInfo.outputPtr = recvbuf;
opInfo.dataType = HCCL_DATA_TYPE_INT8;
opInfo.count = count;
opInfo.opType = HCCL_REDUCE_SUM;
ret = HcomExecEnqueueOperation(opInfo, setExecutorStatus);
if (ret != HCCL_SUCCESS) {
HCCL_ERROR("HcomExecEnqueueOperation error");
HcomExecFinalize();
}
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclResult excutorStatus = HCCL_E_RESERVED;
const std::chrono::seconds TIMEOUT(1);
const auto start = std::chrono::steady_clock::now();
while (excutorStatus != HCCL_SUCCESS) {
getExecutorStatus(excutorStatus);
std::this_thread::sleep_for(std::chrono::milliseconds(LLT_SOCKET_SLEEP_MILLISECONDS));
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start);
if (elapsed > TIMEOUT) {
HCCL_ERROR("Wait timeout for getExecutor status timeout[%lld]", TIMEOUT);
HcomExecFinalize();
return;
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
HCCL_INFO("HcomExcutor excutorStatus[%d]", excutorStatus);
EXPECT_EQ(excutorStatus, HCCL_SUCCESS);
for (int j = 0; j < count; j++) {
if (recvbuf[j] != 2) {
errors++;
break;
}
}
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
sal_free(sendbuf);
sal_free(recvbuf);
rt_ret = rtStreamDestroy(stream);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
EXPECT_EQ(errors, 0);
}
#endif
#if 1
TEST_F(HcomExecutorTest, ut_executor_equeue_broardcast) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_broardcast.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
s8 *sendbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8) + 512);
sal_memset(sendbuf, count * sizeof(s8) + 512, 0, count * sizeof(s8) + 512);
for (int j = 0; j < count; j++) {
sendbuf[j + 128] = 2;
}
HCCL_INFO("executor start");
HcomOperation opInfo;
opInfo.hcclType = HCCL_TYPE_BROADCAST;
opInfo.inputPtr = sendbuf + 128;
opInfo.dataType = HCCL_DATA_TYPE_INT8;
opInfo.count = count;
opInfo.opType = HCCL_REDUCE_SUM;
opInfo.root = 0;
for (int i = 0; i < 5; i++) {
ret = HcomExecEnqueueOperation(opInfo, setMutiExecutorStatus);
if (ret != HCCL_SUCCESS) {
HCCL_ERROR("HcomExecEnqueueOperation error");
HcomExecFinalize();
}
EXPECT_EQ(ret, HCCL_SUCCESS);
}
HcclResult excutorStatus = HCCL_E_RESERVED;
const std::chrono::seconds TIMEOUT(1);
const auto start = std::chrono::steady_clock::now();
while (excutorStatus != HCCL_SUCCESS) {
getMutiExecutorStatus(excutorStatus, 5);
std::this_thread::sleep_for(std::chrono::milliseconds(LLT_SOCKET_SLEEP_MILLISECONDS));
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start);
if (elapsed > TIMEOUT) {
HCCL_ERROR("Wait timeout for getExecutor status timeout[%lld]", TIMEOUT);
HcomExecFinalize();
return;
}
}
HCCL_INFO("HcomExcutor excutorStatus[%d]", excutorStatus);
EXPECT_EQ(excutorStatus, HCCL_SUCCESS);
for (int j = 0; j < count; j++) {
if (sendbuf[j + 128] != 2) {
HCCL_ERROR("\n rank:%d sendbuf[%d]:%f", rank, j + 128, sendbuf[j + 128]);
errors++;
break;
}
}
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
sal_free(sendbuf);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
EXPECT_EQ(errors, 0);
}
#endif
#if 1
TEST_F(HcomExecutorTest, ut_executor_equeue_allgather) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_allgather.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
s8 *sendbuf;
s8 *recvbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8) + 512);
sal_memset(sendbuf, count * sizeof(s8) + 512, 0, count * sizeof(s8) + 512);
recvbuf = (s8 *)sal_malloc(count * sizeof(s8) + 512);
sal_memset(recvbuf, count * sizeof(s8) + 512, 0, count * sizeof(s8) + 512);
for (int j = 0; j < count; j++) {
sendbuf[j + 128] = 2;
}
setExecutorStatus(HCCL_E_RESERVED);
HCCL_INFO("executor start");
HcomOperation opInfo;
opInfo.hcclType = HCCL_TYPE_ALLGATHER;
opInfo.inputPtr = sendbuf + 128;
opInfo.outputPtr = recvbuf + 128;
opInfo.dataType = HCCL_DATA_TYPE_INT8;
opInfo.count = count;
opInfo.opType = HCCL_REDUCE_SUM;
ret = HcomExecEnqueueOperation(opInfo, setExecutorStatus);
if (ret != HCCL_SUCCESS) {
HCCL_ERROR("HcomExecEnqueueOperation error");
HcomExecFinalize();
}
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclResult excutorStatus = HCCL_E_RESERVED;
const std::chrono::seconds TIMEOUT(1);
const auto start = std::chrono::steady_clock::now();
while (excutorStatus != HCCL_SUCCESS) {
getExecutorStatus(excutorStatus);
std::this_thread::sleep_for(std::chrono::milliseconds(LLT_SOCKET_SLEEP_MILLISECONDS));
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start);
if (elapsed > TIMEOUT) {
HCCL_ERROR("Wait timeout for getExecutor status timeout[%lld]", TIMEOUT);
HcomExecFinalize();
return;
}
}
HCCL_INFO("HcomExcutor excutorStatus[%d]", excutorStatus);
EXPECT_EQ(excutorStatus, HCCL_SUCCESS);
for (int j = 0; j < count; j++) {
if (recvbuf[j + 128] != 2) {
errors++;
break;
}
}
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
sal_free(sendbuf);
sal_free(recvbuf);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
EXPECT_EQ(errors, 0);
}
#endif
#if 1
TEST_F(HcomExecutorTest, ut_executor_equeue_allgather_mutiInit) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_allgather_mutiInit.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
s8 *sendbuf;
s8 *recvbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8) + 512);
sal_memset(sendbuf, count * sizeof(s8) + 512, 0, count * sizeof(s8) + 512);
recvbuf = (s8 *)sal_malloc(count * sizeof(s8) + 512);
sal_memset(recvbuf, count * sizeof(s8) + 512, 0, count * sizeof(s8) + 512);
for (int j = 0; j < count; j++) {
sendbuf[j + 128] = 2;
}
setExecutorStatus(HCCL_E_RESERVED);
HCCL_INFO("executor start");
HcomOperation opInfo;
opInfo.hcclType = HCCL_TYPE_ALLGATHER;
opInfo.inputPtr = sendbuf + 128;
opInfo.outputPtr = recvbuf + 128;
opInfo.dataType = HCCL_DATA_TYPE_INT8;
opInfo.count = count;
opInfo.opType = HCCL_REDUCE_SUM;
ret = HcomExecEnqueueOperation(opInfo, setExecutorStatus);
if (ret != HCCL_SUCCESS) {
HCCL_ERROR("HcomExecEnqueueOperation error");
HcomExecFinalize();
}
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclResult excutorStatus = HCCL_E_RESERVED;
const std::chrono::seconds TIMEOUT(1);
const auto start = std::chrono::steady_clock::now();
while (excutorStatus != HCCL_SUCCESS) {
getExecutorStatus(excutorStatus);
std::this_thread::sleep_for(std::chrono::milliseconds(LLT_SOCKET_SLEEP_MILLISECONDS));
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start);
if (elapsed > TIMEOUT) {
HCCL_ERROR("Wait timeout for getExecutor status timeout[%lld]", TIMEOUT);
HcomExecFinalize();
return;
}
}
HCCL_INFO("HcomExcutor excutorStatus[%d]", excutorStatus);
EXPECT_EQ(excutorStatus, HCCL_SUCCESS);
for (int j = 0; j < count; j++) {
if (recvbuf[j + 128] != 2) {
errors++;
break;
}
}
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
sal_free(sendbuf);
sal_free(recvbuf);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
EXPECT_EQ(errors, 0);
}
#endif
#if 1
TEST_F(HcomExecutorTest, ut_executor_equeue_mutiInit) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_mutiInit.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
EXPECT_EQ(errors, 0);
}
#endif
TEST_F(HcomExecutorTest, ut_executor_equeue_alltoallv) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_alltoallv.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
char *identify = "0";
s32 rankSize = 1;
s32 rank = atoi(identify);
u64 count = 2;
HCCL_INFO("alltoall_int32 : identify[%d], count[%llu]", rank, count);
u32 devLogicId = 0xFFFF;
HcclResult ret = hrtSetDevice(devLogicId);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
const int COUNT_PER_RANK = count;
u64 memSize = COUNT_PER_RANK * rankSize * sizeof(s32);
HostMem hostSendMem = HostMem::alloc(memSize);
memset_s(hostSendMem.ptr(), memSize, 0, COUNT_PER_RANK * rankSize);
for (u32 i = 0; i < COUNT_PER_RANK * rankSize; i++) {
*((s32 *)hostSendMem.ptr() + i) = rank + 1;
}
vector<u64> sendCounts(rankSize, COUNT_PER_RANK);
vector<u64> recvCounts(rankSize, COUNT_PER_RANK);
vector<u64> sdispls(rankSize, 0);
vector<u64> rdispls(rankSize, 0);
for (int i = 0; i < rankSize; i++) {
sdispls[i] = COUNT_PER_RANK * i;
rdispls[i] = COUNT_PER_RANK * i;
HCCL_INFO("num[%d] displs[%d]", i, COUNT_PER_RANK * i);
}
DeviceMem devSendCounts = DeviceMem::alloc(rankSize * sizeof(u64));
ret = hrtMemSyncCopy(devSendCounts.ptr(), rankSize * sizeof(u64), sendCounts.data(), rankSize * sizeof(u64),
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem devSdispls = DeviceMem::alloc(rankSize * sizeof(u64));
ret = hrtMemSyncCopy(devSdispls.ptr(), rankSize * sizeof(u64), sdispls.data(), rankSize * sizeof(u64),
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem devRecvCounts = DeviceMem::alloc(rankSize * sizeof(u64));
ret = hrtMemSyncCopy(devRecvCounts.ptr(), rankSize * sizeof(u64), recvCounts.data(), rankSize * sizeof(u64),
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem devRdispls = DeviceMem::alloc(rankSize * sizeof(u64));
ret = hrtMemSyncCopy(devRdispls.ptr(), rankSize * sizeof(u64), rdispls.data(), rankSize * sizeof(u64),
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem sendMem = DeviceMem::alloc(memSize);
ret = hrtMemSyncCopy(sendMem.ptr(), memSize, hostSendMem.ptr(), memSize,
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem recvMem = DeviceMem::alloc(memSize);
HcomAllToAllVParams opInfo;
opInfo.group = nullptr;
opInfo.sendbuf = sendMem.ptr();
opInfo.sendcounts = devSendCounts.ptr();
opInfo.sdispls = devSdispls.ptr();
opInfo.sendtype = HCCL_DATA_TYPE_INT32;
opInfo.recvbuf = recvMem.ptr();
opInfo.recvcounts = devRecvCounts.ptr();
opInfo.rdispls = devRdispls.ptr();
opInfo.recvtype = HCCL_DATA_TYPE_INT32;
setExecutorStatus(HCCL_E_RESERVED);
ret = HcomExecEnqueueAllToAllV(opInfo, setExecutorStatus);
EXPECT_EQ(ret, HCCL_SUCCESS);
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
(void)aclrtResetDevice(0);
}
TEST_F(HcomExecutorTest, ut_executor_equeue_alltoallvc) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_alltoallvc.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
char *identify = "0";
s32 rankSize = 1;
s32 rank = atoi(identify);
u64 count = 2;
HCCL_INFO("alltoallvc_int32 : identify[%d], count[%llu]", rank, count);
u32 devLogicId = 0xFFFF;
HcclResult ret = hrtSetDevice(devLogicId);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
const int COUNT_PER_RANK = count;
u64 memSize = COUNT_PER_RANK * rankSize * sizeof(s32);
HostMem hostSendMem = HostMem::alloc(memSize);
memset_s(hostSendMem.ptr(), memSize, 0, COUNT_PER_RANK * rankSize);
for (u32 i = 0; i < COUNT_PER_RANK * rankSize; i++) {
*((s32 *)hostSendMem.ptr() + i) = rank + 1;
}
vector<u64> sendCounts(rankSize, COUNT_PER_RANK);
vector<u64> recvCounts(rankSize, COUNT_PER_RANK);
vector<u64> sendCountMatrix(rankSize * rankSize, COUNT_PER_RANK);
DeviceMem devSendCountMatrix = DeviceMem::alloc(rankSize * rankSize * sizeof(u64));
ret = hrtMemSyncCopy(devSendCountMatrix.ptr(), rankSize * rankSize * sizeof(u64), sendCountMatrix.data(),
rankSize * rankSize * sizeof(u64), HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem sendMem = DeviceMem::alloc(memSize);
ret = hrtMemSyncCopy(sendMem.ptr(), memSize, hostSendMem.ptr(), memSize,
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem recvMem = DeviceMem::alloc(memSize);
HcomAllToAllVCParams opInfo;
opInfo.group = nullptr;
opInfo.sendbuf = sendMem.ptr();
opInfo.sendcountmatrix = devSendCountMatrix.ptr();
opInfo.sendtype = HCCL_DATA_TYPE_INT32;
opInfo.recvbuf = recvMem.ptr();
opInfo.recvtype = HCCL_DATA_TYPE_INT32;
setExecutorStatus(HCCL_E_RESERVED);
ret = HcomExecEnqueueAllToAllVC(opInfo, setExecutorStatus);
EXPECT_EQ(ret, HCCL_SUCCESS);
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
(void)aclrtResetDevice(0);
}
TEST_F(HcomExecutorTest, ut_executor_equeue_gather_alltoallv) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_equeue_gather_alltoallv.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
char *identify = "0";
s32 rankSize = 1;
s32 rank = atoi(identify);
u64 count = 200;
HCCL_INFO("alltoall_int32 : identify[%d], count[%llu]", rank, count);
u32 devLogicId = 0xFFFF;
HcclResult ret = hrtSetDevice(devLogicId);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
u32 countsNum = 200;
vector<u64> addrInfo;
vector<u64> addrInfoCountPerRank(rankSize, countsNum);
const int COUNT_PER_RANK = count;
u64 memSize = COUNT_PER_RANK * rankSize * sizeof(s32) * countsNum;
HostMem hostSendMem = HostMem::alloc(memSize);
memset_s(hostSendMem.ptr(), memSize, 0, COUNT_PER_RANK * rankSize);
for (u32 i = 0; i < COUNT_PER_RANK * rankSize * countsNum; i++) {
*((s32 *)hostSendMem.ptr() + i) = rank + 1;
if (i % COUNT_PER_RANK == 0) {
addrInfo.push_back((uintptr_t)((s32 *)hostSendMem.ptr() + i));
addrInfo.push_back(COUNT_PER_RANK * sizeof(s32));
}
}
DeviceMem devAddrInfo = DeviceMem::alloc(addrInfo.size() * sizeof(u64));
ret = hrtMemSyncCopy(devAddrInfo.ptr(), addrInfo.size() * sizeof(u64), addrInfo.data(), addrInfo.size() * sizeof(u64),
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem devCountPerRank = DeviceMem::alloc(addrInfoCountPerRank.size() * sizeof(u64));
ret = hrtMemSyncCopy(devCountPerRank.ptr(), addrInfoCountPerRank.size() * sizeof(u64), addrInfoCountPerRank.data(),
addrInfoCountPerRank.size() * sizeof(u64), HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
vector<u64> recvCounts(rankSize, COUNT_PER_RANK);
vector<u64> rdispls(rankSize, 0);
for (int i = 0; i < rankSize; i++) {
rdispls[i] = COUNT_PER_RANK * i;
HCCL_INFO("num[%d] displs[%d]", i, COUNT_PER_RANK * i);
}
DeviceMem devRecvCounts = DeviceMem::alloc(rankSize * sizeof(u64));
ret = hrtMemSyncCopy(devRecvCounts.ptr(), rankSize * sizeof(u64), recvCounts.data(), rankSize * sizeof(u64),
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem devRdispls = DeviceMem::alloc(rankSize * sizeof(u64));
ret = hrtMemSyncCopy(devRdispls.ptr(), rankSize * sizeof(u64), rdispls.data(), rankSize * sizeof(u64),
HcclRtMemcpyKind::HCCL_RT_MEMCPY_KIND_HOST_TO_DEVICE);
EXPECT_EQ(ret, HCCL_SUCCESS);
EXPECT_EQ(ret, HCCL_SUCCESS);
DeviceMem recvMem = DeviceMem::alloc(memSize);
DeviceMem gatherMem = DeviceMem::alloc(memSize);
HCCL_ERROR("memsize[%llu]", memSize);
HcomGatherAllToAllVParams opInfo;
opInfo.addrInfo = devAddrInfo.ptr();
opInfo.addrInfoCountPerRank = devCountPerRank.ptr();
opInfo.recvbuf = recvMem.ptr();
opInfo.recvcounts = devRecvCounts.ptr();
opInfo.rdispls = devRdispls.ptr();
opInfo.recvtype = HCCL_DATA_TYPE_INT32;
opInfo.gatheredbuf = gatherMem.ptr();
constexpr char HCCL_WORLD_GROUP[] = "hccl_world_group";
opInfo.group = HCCL_WORLD_GROUP;
opInfo.addrLength = -1;
setExecutorStatus(HCCL_E_RESERVED);
ret = HcomExecEnqueueGatherAllToAllV(opInfo, setExecutorStatus);
EXPECT_EQ(ret, HCCL_E_NOT_SUPPORT);
HcclResult exeRet = HCCL_E_RESERVED;
const std::chrono::seconds TIMEOUT(1);
const auto start = std::chrono::steady_clock::now();
while (exeRet != HCCL_SUCCESS) {
getExecutorStatus(exeRet);
std::this_thread::sleep_for(std::chrono::milliseconds(LLT_SOCKET_SLEEP_MILLISECONDS));
const auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::steady_clock::now() - start);
if (elapsed > TIMEOUT) {
HCCL_ERROR("Wait timeout for getExecutor status timeout[%lld]", TIMEOUT);
HcomExecFinalize();
HcomDestroy();
return;
}
}
HCCL_INFO("HcomExecFinalize start");
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomDestroy();
EXPECT_EQ(ret, HCCL_SUCCESS);
setExecutorStatus(HCCL_E_RESERVED);
remove(file_name_t);
(void)aclrtResetDevice(0);
}
#if 1
TEST_F(HcomExecutorTest, ut_executor_reduce) {
nlohmann::json rank_table = rank_table_910_1server_1rank;
char file_name_t[] = "./ut_executor_reduce.json";
std::ofstream outfile(file_name_t, std::ios::out | std::ios::trunc | std::ios::binary);
if (outfile.is_open()) {
outfile << std::setw(1) << rank_table << std::endl;
HCCL_INFO("open %s success", file_name_t);
} else {
HCCL_ERROR("open %s failed", file_name_t);
}
outfile.close();
int ret = HCCL_SUCCESS;
rtError_t rt_ret = RT_ERROR_NONE;
rtStream_t stream;
s8 *sendbuf;
s8 *recvbuf;
s32 rank = 0;
s32 errors = 0;
s32 count = HCCL_COM_DATA_SIZE;
ret = hrtSetDevice(0);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = HcomExecInitialize();
EXPECT_EQ(ret, HCCL_SUCCESS);
HcclComm hcclComm;
ret = HcomGetCommHandleByGroup(NULL, &hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
ret = hccl::HcomExecutor::GetInstance().InitHcclComm(nullptr, hcclComm);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamCreate(&stream, 5);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
sendbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(sendbuf, count * sizeof(s8), 0, count * sizeof(s8));
recvbuf = (s8 *)sal_malloc(count * sizeof(s8));
sal_memset(recvbuf, count * sizeof(s8), 0, count * sizeof(s8));
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
ret = HcomReduce("testreduce", sendbuf, recvbuf, count, HCCL_DATA_TYPE_INT8, HCCL_REDUCE_SUM, 0, NULL, stream);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamSynchronize(stream);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
for (int j = 0; j < count; j++) {
if (recvbuf[j] != 2) {
HCCL_ERROR("\n rank:%d recvbuf[%d]:%f", rank, j, recvbuf[j]);
errors++;
break;
}
}
for (int j = 0; j < count; j++) {
sendbuf[j] = 2;
}
(void)HcomSetWorkflowMode(HcclWorkflowMode::HCCL_WORKFLOW_MODE_OP_BASE);
ret = HcclReduce(sendbuf, recvbuf, count, HCCL_DATA_TYPE_INT8, HCCL_REDUCE_SUM, 0, hcclComm, stream);
EXPECT_EQ(ret, HCCL_SUCCESS);
rt_ret = rtStreamSynchronize(stream);
EXPECT_EQ(rt_ret, RT_ERROR_NONE);
for (int j = 0; j < count; j++) {
if (recvbuf[j] != 2) {
HCCL_ERROR("\n rank:%d recvbuf[%d]:%f", rank, j, recvbuf[j]);
errors++;
break;
}
}
ret = HcomExecFinalize();
EXPECT_EQ(ret, HCCL_SUCCESS);
}
#endif
TEST_F(HcomExecutorTest, ut_executor_runGather) {
MOCKER_CPP(&HcomExecutor::GatherMemCopyThread).stubs().with().will(returnValue(HCCL_SUCCESS));
MOCKER(hrtMemSyncCopy).stubs().with(mockcpp::any()).will(returnValue(HCCL_SUCCESS));
std::vector<u64> addrInfo = {10, 20, 30, 40, 50, 60};
std::vector<u64> addrInfoCountPerRank = {1, 2};
u32 rankSize = 2;
u64 sendCounts[rankSize];
u64 sdispls[rankSize];
void *sendDevBuf = nullptr;
s32 addrLength = -1;
HcclResult result = hccl::HcomExecutor::GetInstance().RunGather(addrInfo, addrInfoCountPerRank, rankSize, sendCounts,
sdispls, sendDevBuf, addrLength);
EXPECT_EQ(result, HCCL_SUCCESS);
}
TEST_F(HcomExecutorTest, Ut_HcomExec) {
struct HcomRemoteOperation opInfo;
auto callback = [](HcclResult status) {};
HcclResult result = HcomExecEnqueueRemoteOperation(opInfo, callback);
EXPECT_EQ(result, HCCL_E_NOT_SUPPORT);
std::string remoteAccessType = "testType";
std::vector<HcomRemoteAccessAddrInfo> addrInfos;
result = HcomExecEnqueueRemoteAccess(remoteAccessType, addrInfos, callback);
EXPECT_EQ(result, HCCL_E_NOT_SUPPORT);
HcomOperation_t opInfo1;
opInfo1.group = "testGroup";
opInfo1.inputPtr = nullptr;
opInfo1.count = 10;
opInfo1.dataType = HCCL_DATA_TYPE_INT32;
opInfo1.root = 0;
MOCKER_CPP(HcceBroadcast).stubs().will(returnValue(HCCL_SUCCESS));
result = hccl::HcomExecutor::GetInstance().ExecuteBroadcast(opInfo1);
EXPECT_EQ(result, HCCL_SUCCESS);
MOCKER_CPP(HcceAllReduce).stubs().will(returnValue(HCCL_SUCCESS));
result = hccl::HcomExecutor::GetInstance().ExecuteAllreduce(opInfo1);
EXPECT_EQ(result, HCCL_SUCCESS);
MOCKER_CPP(HcceAllGather).stubs().will(returnValue(HCCL_SUCCESS));
result = hccl::HcomExecutor::GetInstance().ExecuteAllGather(opInfo1);
EXPECT_EQ(result, HCCL_SUCCESS);
MOCKER_CPP(HcceReduceScatter).stubs().will(returnValue(HCCL_SUCCESS));
result = hccl::HcomExecutor::GetInstance().ExecuteReduceScatter(opInfo1);
EXPECT_EQ(result, HCCL_SUCCESS);
}
TEST_F(HcomExecutorTest, Ut_ExecuteAlltoAll) {
void *baseAddr = malloc(100);
u64 offset = 0;
std::vector<u64> addrInfo = {10, 20, 30, 40};
u64 beginIndex = 0;
u64 count = 2;
u64 tmpMemSize = 0;
hccl::HcomExecutor::GetInstance().GatherMemCopyThread(baseAddr, offset, addrInfo, beginIndex, count, tmpMemSize);
free(baseAddr);
HcomAllToAllVParams opInfo;
opInfo.group = nullptr;
opInfo.sendbuf = (void *)0x1000;
opInfo.sendcounts = (u64 *)0x2000;
opInfo.sdispls = (u64 *)0x3000;
opInfo.sendtype = HCCL_DATA_TYPE_INT8;
opInfo.recvbuf = (void *)0x4000;
opInfo.recvcounts = (u64 *)0x5000;
opInfo.rdispls = (u64 *)0x6000;
opInfo.recvtype = HCCL_DATA_TYPE_INT8;
MOCKER_CPP(HcceAlltoAll).stubs().will(returnValue(HCCL_SUCCESS));
MOCKER_CPP(HcceAlltoAllV).stubs().will(returnValue(HCCL_SUCCESS));
HcclResult result = hccl::HcomExecutor::GetInstance().ExecuteAlltoAll(opInfo, false);
EXPECT_EQ(result, HCCL_SUCCESS);
result = hccl::HcomExecutor::GetInstance().ExecuteAlltoAll(opInfo, true);
EXPECT_EQ(result, HCCL_SUCCESS);
HcomAllToAllVCParams opInfo1;
opInfo1.group = nullptr;
opInfo1.sendbuf = (void *)0x1000;
opInfo1.sendtype = HCCL_DATA_TYPE_INT8;
opInfo1.recvbuf = (void *)0x2000;
opInfo1.recvtype = HCCL_DATA_TYPE_INT8;
opInfo1.sendcountmatrix = (void *)0x3000;
MOCKER_CPP(HcceAlltoAllVC).stubs().will(returnValue(HCCL_SUCCESS));
result = hccl::HcomExecutor::GetInstance().ExecuteAlltoAllVC(opInfo1);
EXPECT_EQ(result, HCCL_SUCCESS);
}