* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* This file is part of the MindStudio project.
*
* MindStudio is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------*/
#include "analysis/csrc/application/database/db_assembler.h"
#include <atomic>
#include <functional>
#include "analysis/csrc/application/credential/id_pool.h"
#include "analysis/csrc/application/database/db_constant.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/api_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/ascend_task_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/ccu_mission_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/communication_info_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/dpu_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/kfc_turn_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/mc2_comm_info_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/memcpy_info_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/msprof_tx_host_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/task_info_data.h"
#include "analysis/csrc/domain/entities/viewer_data/ai_task/include/unified_pmu_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/acc_pmu_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/ddr_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/hbm_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/hccs_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/host_usage_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/llc_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/low_power_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/netdev_stats_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/npu_mem_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/npu_module_mem_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/npu_op_mem_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/pcie_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/qos_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/sio_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/soc_bandwidth_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/sys_io_data.h"
#include "analysis/csrc/domain/entities/viewer_data/system/include/ub_data.h"
#include "analysis/csrc/domain/services/environment/context.h"
#include "analysis/csrc/infrastructure/dfx/error_code.h"
#include "analysis/csrc/infrastructure/dump_tools/json_tool/include/json_writer.h"
#include "analysis/csrc/infrastructure/utils/thread_pool.h"
namespace Analysis
{
namespace Application
{
using namespace Analysis::Domain;
using namespace Analysis::Utils;
using namespace Analysis::Domain::Environment;
using IdPool = Analysis::Application::Credential::IdPool;
namespace
{
// Placeholder chip name stored when a platform version has no CHIP_TABLE entry
// or when no device directory is found (see UpdateNpuData / SaveNpuData).
const std::string UNKNOWN = "UNKNOWN";
// NOTE(review): not referenced in the visible part of this file; presumably the
// expected length of a timestamp string - confirm against the remaining code.
const size_t EXPECT_TIME_LEN = 14;
// Name and indexed columns of the index created on TABLE_NAME_TASK.
const std::string TASK_INDEX_NAME = "TaskIndex";
const std::vector<std::string> TASK_INDEX_COL_NAMES = {"startNs", "globalTaskId"};
// Name and indexed columns of the index created on TABLE_NAME_COMMUNICATION_TASK_INFO.
const std::string COMM_INDEX_NAME = "CommunicationTaskIndex";
const std::vector<std::string> COMM_TASK_INDEX_COLS = {"globalTaskId"};
// Row layout of TABLE_NAME_COMMUNICATION_SCHEDULE_TASK_INFO as filled by SaveComputeTaskInfo:
// (opName id, globalTaskId, taskType id, opType id).
using CommScheduleDataFormat = std::vector<std::tuple<uint64_t, uint64_t, uint64_t, uint64_t>>;
// Row layout of TABLE_NAME_COMPUTE_TASK_INFO as filled by SaveComputeTaskInfo:
// (opName, globalTaskId, blockNum, mixBlockNum, taskType, opType, inputFormats, inputDataTypes,
//  inputShapes, outputFormats, outputDataTypes, outputShapes, hashId, opState, hf32Eligible,
//  gridDim, blockDim).
using ComputeTaskInfoFormat =
std::vector<std::tuple<uint64_t, uint64_t, uint32_t, uint32_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t,
uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t>>;
// Row layout of TABLE_NAME_COMMUNICATION_OP as filled by ConvertOpData:
// (opName, timestamp, end, connectionId, groupName, opId, relay, retry, dataType, algType,
//  count, opType, deviceId, rankSize).
using CommunicationOpDataFormat =
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint32_t, int32_t, int32_t, uint64_t,
uint64_t, uint64_t, uint64_t, uint16_t, uint32_t>>;
// Row layout of TABLE_NAME_COMMUNICATION_TASK_INFO as filled by ConvertTaskData:
// (opName, globalTaskId, taskType, planeId, groupName, notifyId, rdmaType, srcRank, dstRank,
//  transportType, size, dataType, linkType, opId, isMaster, scaled bandwidth).
using CommunicationTaskDataFormat =
std::vector<std::tuple<uint64_t, uint64_t, uint64_t, uint32_t, uint64_t, uint64_t, uint64_t, uint32_t, uint32_t,
uint64_t, uint64_t, uint64_t, uint64_t, uint32_t, uint16_t, double>>;
// Device kind used as the last component of the tuple handed to IdPool::GetId when a
// global task id is requested. Only NPU is used in the visible part of this file.
enum class DevType
{
NPU,
CPU,
DPU,
};
// Scratch record holding the IdPool ids of one compute task while SaveComputeTaskInfo
// builds its output row. Every field is an id obtained from IdPool (GetId for
// globalTaskId, GetUint64Id for the rest), not the original string/value.
struct ComputeTaskInfoData
{
uint64_t opName;
uint64_t globalTaskId;
uint64_t taskType;
uint64_t opType;
// The six shape-related ids below are filled by ProcessShapeInfo.
uint64_t inputFormats;
uint64_t inputDataTypes;
uint64_t inputShapes;
uint64_t outputFormats;
uint64_t outputDataTypes;
uint64_t outputShapes;
uint64_t hashId;
uint64_t opState;
// Id of the task's opFlag value.
uint64_t hf32Eligible;
uint64_t gridDim;
uint64_t blockDim;
};
// Returns a copy of `input` in which every occurrence of CSV_OPERATOR has been replaced
// by SINGLE_OPERATOR. After each replacement the search resumes right behind the inserted
// text, so freshly inserted characters are never rescanned.
std::string ReplaceQuotes(const std::string& input)
{
    std::string converted(input);
    for (std::string::size_type cursor = converted.find(CSV_OPERATOR, 0); cursor != std::string::npos;
         cursor = converted.find(CSV_OPERATOR, cursor + SINGLE_OPERATOR.length()))
    {
        converted.replace(cursor, CSV_OPERATOR.length(), SINGLE_OPERATOR);
    }
    return converted;
}
// Fills the six shape-related ids of `taskInfoData` from `item`.
// Each source string is first passed through ReplaceQuotes and the result is then
// registered with IdPool. The ids are requested in the fixed order
// inputFormats, inputDataTypes, inputShapes, outputFormats, outputDataTypes, outputShapes.
void ProcessShapeInfo(ComputeTaskInfoData& taskInfoData, const TaskInfoData& item)
{
    auto& idPool = IdPool::GetInstance();
    const auto internSanitized = [&idPool](const std::string& raw) -> uint64_t
    {
        const std::string sanitized = ReplaceQuotes(raw);
        return idPool.GetUint64Id(sanitized);
    };

    taskInfoData.inputFormats = internSanitized(item.inputFormats);
    taskInfoData.inputDataTypes = internSanitized(item.inputDataTypes);
    taskInfoData.inputShapes = internSanitized(item.inputShapes);
    taskInfoData.outputFormats = internSanitized(item.outputFormats);
    taskInfoData.outputDataTypes = internSanitized(item.outputDataTypes);
    taskInfoData.outputShapes = internSanitized(item.outputShapes);
}
// Creates index `indexName` over columns `colNames` on table `tableName` through the
// db runner held by `msprofDB`.
// Returns false (after logging) when the runner is null or CreateIndex reports failure,
// true otherwise.
bool CreateTableIndex(const std::string& tableName, const std::string& indexName, const DBInfo& msprofDB,
                      const std::vector<std::string>& colNames)
{
    INFO("Processor CreateTableIndex, table is % , indexName is %.", tableName, indexName);
    const auto& runner = msprofDB.dbRunner;
    if (!runner)
    {
        ERROR("Report db runner is nullptr.");
        return false;
    }
    const bool created = runner->CreateIndex(tableName, indexName, colNames);
    if (created)
    {
        return true;
    }
    ERROR("Create table index failed, table is % , indexName is %.", tableName, indexName);
    return false;
}
// Writes the ApiData records from `dataInventory` into TABLE_NAME_CANN_API.
// Row: (timestamp, end, level, globalTid, connectionId, apiName id), where globalTid is
// Utils::Contact(host pid, threadId).
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveApiData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using ApiRow = std::tuple<uint64_t, uint64_t, uint16_t, uint64_t, uint64_t, uint64_t>;

    // The pid is looked up before the data check, exactly as callers have always observed.
    uint32_t hostPid = Context::GetInstance().GetPidFromInfoJson(HOST_ID, profPath);
    const auto apiRecords = dataInventory.GetPtr<std::vector<ApiData>>();
    if (!apiRecords)
    {
        WARN("Api data not exist.");
        return true;
    }

    std::vector<ApiRow> rows;
    if (!Reserve(rows, apiRecords->size()))
    {
        ERROR("Reserved for api data failed.");
        return false;
    }

    auto& idPool = IdPool::GetInstance();
    for (const auto& api : *apiRecords)
    {
        uint64_t nameId = idPool.GetUint64Id(api.apiName);
        uint64_t globalTid = Utils::Contact(hostPid, api.threadId);
        rows.emplace_back(api.timestamp, api.end, api.level, globalTid, api.connectionId, nameId);
    }
    return SaveData(rows, TABLE_NAME_CANN_API, msprofDB);
}
// Writes the MemcpyInfoData records from `dataInventory` into TABLE_NAME_MEMCPY_INFO.
// Row: (globalTaskId, dataSize, memcpyOperation). The global task id is obtained from
// IdPool for the tuple (deviceId, streamId, taskId, contextId, batchId, DevType::NPU).
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveMemcpyInfoData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using MemcpyRow = std::tuple<uint64_t, uint64_t, uint16_t>;

    const auto memcpyRecords = dataInventory.GetPtr<std::vector<MemcpyInfoData>>();
    if (!memcpyRecords)
    {
        WARN("MemcpyInfo data not exist.");
        return true;
    }

    std::vector<MemcpyRow> rows;
    if (!Reserve(rows, memcpyRecords->size()))
    {
        ERROR("Reserved for memcpyInfo data failed.");
        return false;
    }

    auto& idPool = IdPool::GetInstance();
    for (const auto& record : *memcpyRecords)
    {
        const auto& id = record.taskId;
        uint64_t globalTaskId = idPool.GetId(std::make_tuple(id.deviceId, id.streamId, id.taskId, id.contextId,
                                                             id.batchId, static_cast<uint32_t>(DevType::NPU)));
        rows.emplace_back(globalTaskId, record.dataSize, record.memcpyOperation);
    }
    return SaveData(rows, TABLE_NAME_MEMCPY_INFO, msprofDB);
}
// Appends one CommunicationOpDataFormat row per element of `opData` to `processedOpData`.
// String-like fields are replaced by their IdPool ids; the ids are requested in the order
// groupName, opName, opKey, algType, opType for every element.
template <typename T>
void ConvertOpData(CommunicationOpDataFormat& processedOpData, const std::vector<T>& opData)
{
    auto& idPool = IdPool::GetInstance();
    for (const auto& op : opData)
    {
        uint64_t groupNameId = idPool.GetUint64Id(op.groupName);
        uint64_t opNameId = idPool.GetUint64Id(op.opName);
        uint32_t opKeyId = idPool.GetUint32Id(op.opKey);
        uint64_t algTypeId = idPool.GetUint64Id(op.algType);
        uint64_t opTypeId = idPool.GetUint64Id(op.opType);
        processedOpData.emplace_back(opNameId, op.timestamp, op.end, op.connectionId, groupNameId, opKeyId,
                                     op.relay, op.retry, op.dataType, algTypeId, op.count, opTypeId, op.deviceId,
                                     op.rankSize);
    }
}
// Writes communication op records into TABLE_NAME_COMMUNICATION_OP.
// Both CommunicationOpData and KfcOpData are read from `dataInventory`; whichever is
// present is converted (CommunicationOpData first) and all rows are saved together.
// Returns true when neither data set exists; false when reserving memory or saving fails.
bool SaveCommOpData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const auto hcclOps = dataInventory.GetPtr<std::vector<CommunicationOpData>>();
    const auto kfcOps = dataInventory.GetPtr<std::vector<KfcOpData>>();
    if (!hcclOps && !kfcOps)
    {
        WARN("Communication op data not exist.");
        return true;
    }

    size_t rowCount = 0;
    if (hcclOps)
    {
        rowCount += hcclOps->size();
    }
    if (kfcOps)
    {
        rowCount += kfcOps->size();
    }

    CommunicationOpDataFormat rows;
    if (!Reserve(rows, rowCount))
    {
        ERROR("Reserved for communication op data failed.");
        return false;
    }

    if (hcclOps)
    {
        ConvertOpData<CommunicationOpData>(rows, *hcclOps);
    }
    if (kfcOps)
    {
        ConvertOpData<KfcOpData>(rows, *kfcOps);
    }
    return SaveData(rows, TABLE_NAME_COMMUNICATION_OP, msprofDB);
}
// Appends one CommunicationTaskDataFormat row per element of `taskData` to `processedTaskData`.
// - String-like fields are replaced by IdPool ids, requested in the order
//   groupName, opName, taskType, global task id, opKey.
// - notifyId is parsed from its string form; when the string is not a number or the
//   conversion does not return ANALYSIS_OK, UINT64_MAX is stored instead.
// - bandwidth is multiplied by 1000 three times before being stored.
template <typename T>
void ConvertTaskData(CommunicationTaskDataFormat& processedTaskData, const std::vector<T>& taskData)
{
    auto& idPool = IdPool::GetInstance();
    for (const auto& task : taskData)
    {
        uint64_t groupNameId = idPool.GetUint64Id(task.groupName);
        uint64_t opNameId = idPool.GetUint64Id(task.opName);
        uint64_t taskTypeId = idPool.GetUint64Id(task.taskType);
        uint64_t globalTaskId = idPool.GetId(std::make_tuple(task.deviceId, task.streamId, task.taskId,
                                                             task.contextId, task.batchId,
                                                             static_cast<uint32_t>(DevType::NPU)));
        uint32_t opKeyId = idPool.GetUint32Id(task.opKey);

        uint64_t notifyId = UINT64_MAX;
        const bool parsed = IsNumber(task.notifyId) && StrToU64(notifyId, task.notifyId) == ANALYSIS_OK;
        if (!parsed)
        {
            // A failed conversion may have touched the output argument; reset it.
            notifyId = UINT64_MAX;
        }

        const auto scaledBandwidth = task.bandwidth * 1000 * 1000 * 1000;
        processedTaskData.emplace_back(opNameId, globalTaskId, taskTypeId, task.planeId, groupNameId, notifyId,
                                       task.rdmaType, task.srcRank, task.dstRank, task.transportType, task.size,
                                       task.dataType, task.linkType, opKeyId, task.isMaster, scaledBandwidth);
    }
}
// Writes communication task records into TABLE_NAME_COMMUNICATION_TASK_INFO and then
// creates the COMM_INDEX_NAME index on that table.
// Both CommunicationTaskData and KfcTaskData are read from `dataInventory`; whichever is
// present is converted (CommunicationTaskData first) and all rows are saved together.
// Returns true when neither data set exists; false when reserving memory, saving or index
// creation fails. The index is only attempted after a successful save.
bool SaveCommTaskData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const auto hcclTasks = dataInventory.GetPtr<std::vector<CommunicationTaskData>>();
    const auto kfcTasks = dataInventory.GetPtr<std::vector<KfcTaskData>>();
    if (!hcclTasks && !kfcTasks)
    {
        WARN("Communication task data not exist.");
        return true;
    }

    size_t rowCount = 0;
    if (hcclTasks)
    {
        rowCount += hcclTasks->size();
    }
    if (kfcTasks)
    {
        rowCount += kfcTasks->size();
    }

    CommunicationTaskDataFormat rows;
    if (!Reserve(rows, rowCount))
    {
        ERROR("Reserved for communication task failed.");
        return false;
    }

    if (hcclTasks)
    {
        ConvertTaskData<CommunicationTaskData>(rows, *hcclTasks);
    }
    if (kfcTasks)
    {
        ConvertTaskData<KfcTaskData>(rows, *kfcTasks);
    }

    if (!SaveData(rows, TABLE_NAME_COMMUNICATION_TASK_INFO, msprofDB))
    {
        return false;
    }
    return CreateTableIndex(TABLE_NAME_COMMUNICATION_TASK_INFO, COMM_INDEX_NAME, msprofDB, COMM_TASK_INDEX_COLS);
}
// Saves communication op data and then communication task data.
// Both steps are always executed; the result is true only when both succeeded.
bool SaveCommunicationData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const bool opSaved = SaveCommOpData(dataInventory, msprofDB, profPath);
    const bool taskSaved = SaveCommTaskData(dataInventory, msprofDB, profPath);
    return opSaved && taskSaved;
}
// Writes the AccPmuData records from `dataInventory` into TABLE_NAME_ACC_PMU.
// Row: (accId, readBwLevel, writeBwLevel, readOstLevel, writeOstLevel, timestamp, deviceId).
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveAccPmuData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using AccPmuRow = std::tuple<uint16_t, uint32_t, uint32_t, uint32_t, uint32_t, uint64_t, uint16_t>;

    const auto pmuRecords = dataInventory.GetPtr<std::vector<AccPmuData>>();
    if (!pmuRecords)
    {
        WARN("Acc pmu data not exist.");
        return true;
    }

    std::vector<AccPmuRow> rows;
    if (!Reserve(rows, pmuRecords->size()))
    {
        ERROR("Reserved for acc pmu data failed.");
        return false;
    }

    for (const auto& sample : *pmuRecords)
    {
        rows.emplace_back(sample.accId, sample.readBwLevel, sample.writeBwLevel, sample.readOstLevel,
                          sample.writeOstLevel, sample.timestamp, sample.deviceId);
    }
    return SaveData(rows, TABLE_NAME_ACC_PMU, msprofDB);
}
// Writes AI core frequency samples (taken from the LowPowerData records) into
// TABLE_NAME_AICORE_FREQ. Row: (deviceId, timestamp, freq, dieId).
// Only samples whose dieId is -1, 0 or 1 are stored; all others are skipped.
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveAicoreFreqData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using FreqRow = std::tuple<uint16_t, uint64_t, double, int32_t>;

    const auto lowPowerRecords = dataInventory.GetPtr<std::vector<LowPowerData>>();
    if (!lowPowerRecords)
    {
        WARN("Aicore freq data not exist.");
        return true;
    }

    std::vector<FreqRow> rows;
    if (!Reserve(rows, lowPowerRecords->size()))
    {
        ERROR("Reserved for aicore freq data failed.");
        return false;
    }

    const static std::set<int32_t> acceptedDieIds{-1, 0, 1};
    for (const auto& sample : *lowPowerRecords)
    {
        if (acceptedDieIds.count(sample.dieId) == 0)
        {
            continue;
        }
        rows.emplace_back(sample.deviceId, sample.timestamp, sample.freq, sample.dieId);
    }
    return SaveData(rows, TABLE_NAME_AICORE_FREQ, msprofDB);
}
// Writes the DDRData records from `dataInventory` into TABLE_NAME_DDR.
// Row: (deviceId, timestamp, fluxRead, fluxWrite); both flux values are cast to uint64_t.
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveDDRData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using DdrRow = std::tuple<uint16_t, uint64_t, uint64_t, uint64_t>;

    const auto ddrRecords = dataInventory.GetPtr<std::vector<DDRData>>();
    if (!ddrRecords)
    {
        WARN("DDR data not exist.");
        return true;
    }

    std::vector<DdrRow> rows;
    if (!Reserve(rows, ddrRecords->size()))
    {
        ERROR("Reserved for DDR data failed.");
        return false;
    }

    for (const auto& sample : *ddrRecords)
    {
        const auto readFlux = static_cast<uint64_t>(sample.fluxRead);
        const auto writeFlux = static_cast<uint64_t>(sample.fluxWrite);
        rows.emplace_back(sample.deviceId, sample.timestamp, readFlux, writeFlux);
    }
    return SaveData(rows, TABLE_NAME_DDR, msprofDB);
}
bool SaveSingleEnumData(const std::string& tableName, DBInfo& msprofDB)
{
using EnumDataFormat = std::vector<std::tuple<uint16_t, std::string>>;
auto table = ENUM_TABLE.find(tableName);
EnumDataFormat enumData;
if (!Utils::Reserve(enumData, table->second.size()))
{
ERROR("Reserve for % data failed.", tableName);
return false;
}
for (const auto& record : table->second)
{
enumData.emplace_back(record.second, record.first);
}
return SaveData(enumData, tableName, msprofDB);
}
// Saves every enum table listed in ENUM_TABLE.
// All tables are attempted even after a failure; the result is true only when every
// single table was saved successfully.
bool SaveEnumData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    INFO("EnumProcessor Process.");
    bool allSaved = true;
    for (const auto& entry : ENUM_TABLE)
    {
        if (!SaveSingleEnumData(entry.first, msprofDB))
        {
            allSaved = false;
        }
    }
    return allSaved;
}
// Writes the HbmData records from `dataInventory` into TABLE_NAME_HBM.
// Row: (deviceId, timestamp, bandwidth, hbmId, eventType id). The bandwidth is multiplied
// by BYTE_SIZE twice and truncated to uint64_t before being stored.
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveHbmData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using HbmRow = std::tuple<uint16_t, uint64_t, uint64_t, uint8_t, uint64_t>;

    const auto hbmRecords = dataInventory.GetPtr<std::vector<HbmData>>();
    if (!hbmRecords)
    {
        WARN("HBM data not exist.");
        return true;
    }

    std::vector<HbmRow> rows;
    if (!Reserve(rows, hbmRecords->size()))
    {
        ERROR("Reserved for HBM data failed.");
        return false;
    }

    auto& idPool = IdPool::GetInstance();
    for (const auto& sample : *hbmRecords)
    {
        uint64_t eventTypeId = idPool.GetUint64Id(sample.eventType);
        const auto scaledBandwidth = static_cast<uint64_t>(sample.bandWidth * BYTE_SIZE * BYTE_SIZE);
        rows.emplace_back(sample.deviceId, sample.timestamp, scaledBandwidth, sample.hbmId, eventTypeId);
    }
    return SaveData(rows, TABLE_NAME_HBM, msprofDB);
}
// Writes the HccsData records from `dataInventory` into TABLE_NAME_HCCS.
// Row: (deviceId, timestamp, txThroughput, rxThroughput). Each throughput is multiplied
// by BYTE_SIZE twice and truncated to uint64_t before being stored.
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveHccsData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using HccsRow = std::tuple<uint16_t, uint64_t, uint64_t, uint64_t>;

    const auto hccsRecords = dataInventory.GetPtr<std::vector<HccsData>>();
    if (!hccsRecords)
    {
        WARN("HCCS data not exist.");
        return true;
    }

    std::vector<HccsRow> rows;
    if (!Reserve(rows, hccsRecords->size()))
    {
        ERROR("Reserved for HCCS data failed.");
        return false;
    }

    for (const auto& sample : *hccsRecords)
    {
        const auto scaledTx = static_cast<uint64_t>(sample.txThroughput * BYTE_SIZE * BYTE_SIZE);
        const auto scaledRx = static_cast<uint64_t>(sample.rxThroughput * BYTE_SIZE * BYTE_SIZE);
        rows.emplace_back(sample.deviceId, sample.timestamp, scaledTx, scaledRx);
    }
    return SaveData(rows, TABLE_NAME_HCCS, msprofDB);
}
// Writes the NetDevStatsEventData records from `dataInventory` into TABLE_NAME_NETDEV_STATS.
// Each row carries deviceId and timestamp followed by the MAC, RoCE and NIC counters /
// bandwidth values in the order listed in the emplace_back call below.
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveNetDevStatsData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using NetDevStatsRow =
        std::tuple<uint16_t, uint64_t, uint64_t, uint64_t, uint64_t, double, uint64_t, double, uint64_t, uint64_t,
                   uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, uint64_t,
                   double>;

    const auto statsRecords = dataInventory.GetPtr<std::vector<NetDevStatsEventData>>();
    if (!statsRecords)
    {
        WARN("NetDevStats data not exist.");
        return true;
    }

    std::vector<NetDevStatsRow> rows;
    if (!Reserve(rows, statsRecords->size()))
    {
        ERROR("Reserved for NetDevStats data failed.");
        return false;
    }

    for (const auto& stat : *statsRecords)
    {
        rows.emplace_back(stat.deviceId, stat.timestamp,
                          // MAC counters
                          stat.macTxPfcPkt, stat.macRxPfcPkt, stat.macTxByte, stat.macTxBandwidth, stat.macRxByte,
                          stat.macRxBandwidth, stat.macTxBadByte, stat.macRxBadByte,
                          // RoCE counters
                          stat.roceTxPkt, stat.roceRxPkt, stat.roceTxErrPkt, stat.roceRxErrPkt, stat.roceTxCnpPkt,
                          stat.roceRxCnpPkt, stat.roceNewPktRty,
                          // NIC counters
                          stat.nicTxByte, stat.nicTxBandwidth, stat.nicRxByte, stat.nicRxBandwidth);
    }
    return SaveData(rows, TABLE_NAME_NETDEV_STATS, msprofDB);
}
// Writes a single (hostUid, hostName) row into TABLE_NAME_HOST_INFO.
// Both values are queried from Context for HOST_ID under `profPath`.
// Returns false when `profPath` is empty or saving fails.
bool SaveHostInfoData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    if (profPath.empty())
    {
        ERROR("Prof path is empty.");
        return false;
    }
    using HostInfoDataFormat = std::vector<std::tuple<std::string, std::string>>;
    HostInfoDataFormat hostInfoData;
    std::string hostUid = Context::GetInstance().GetHostUid(HOST_ID, profPath);
    std::string hostName = Context::GetInstance().GetHostName(HOST_ID, profPath);
    hostInfoData.emplace_back(hostUid, hostName);
    // NOTE(review): a former `hostInfoData.empty()` check at this point could never be true
    // (one row has just been appended) and was removed as dead code. If host info should be
    // skipped when the host directory is missing, that must be detected from
    // hostUid/hostName instead - confirm the intended behaviour.
    return SaveData(hostInfoData, TABLE_NAME_HOST_INFO, msprofDB);
}
// Writes the LLcData records from `dataInventory` into TABLE_NAME_LLC.
// Row: (deviceId, llcID, timestamp, hitRate, throughput, mode id). The throughput is
// multiplied by BYTE_SIZE twice and truncated to uint64_t before being stored.
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveLlcData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using LlcRow = std::tuple<uint16_t, uint32_t, uint64_t, double, uint64_t, uint64_t>;

    const auto llcRecords = dataInventory.GetPtr<std::vector<LLcData>>();
    if (!llcRecords)
    {
        WARN("LLC data not exist.");
        return true;
    }

    std::vector<LlcRow> rows;
    if (!Reserve(rows, llcRecords->size()))
    {
        ERROR("Reserved for LLC data failed.");
        return false;
    }

    auto& idPool = IdPool::GetInstance();
    for (const auto& sample : *llcRecords)
    {
        const auto scaledThroughput = static_cast<uint64_t>(sample.throughput * BYTE_SIZE * BYTE_SIZE);
        uint64_t modeId = idPool.GetUint64Id(sample.mode);
        rows.emplace_back(sample.deviceId, sample.llcID, sample.timestamp, sample.hitRate, scaledThroughput,
                          modeId);
    }
    return SaveData(rows, TABLE_NAME_LLC, msprofDB);
}
// Copies every (first, second) pair of META_DATA into TABLE_NAME_META_DATA.
// Returns false when reserving memory or saving fails.
bool SaveMetaData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using MetaRow = std::tuple<std::string, std::string>;

    std::vector<MetaRow> rows;
    if (!Utils::Reserve(rows, META_DATA.size()))
    {
        ERROR("Reserve for meta data failed.");
        return false;
    }

    for (const auto& entry : META_DATA)
    {
        rows.emplace_back(entry.first, entry.second);
    }
    return SaveData(rows, TABLE_NAME_META_DATA, msprofDB);
}
// Writes the MsprofTxHostData records from `dataInventory` into TABLE_NAME_MSTX.
// Row: (timestamp, end, eventType, UINT32_MAX, category, message id, globalTid, globalTid,
// domain id, connectionId). globalTid is Utils::Contact(host pid, tid) and is written to
// two consecutive columns.
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveMsprofTxData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using MstxRow = std::tuple<uint64_t, uint64_t, uint16_t, uint32_t, uint32_t, uint64_t, uint64_t, uint64_t,
                               uint64_t, uint64_t>;

    const auto txRecords = dataInventory.GetPtr<std::vector<MsprofTxHostData>>();
    if (!txRecords)
    {
        WARN("MsprofTx data not exist.");
        return true;
    }

    std::vector<MstxRow> rows;
    if (!Reserve(rows, txRecords->size()))
    {
        ERROR("Reserved for MsprofTx data failed.");
        return false;
    }

    uint32_t hostPid = Context::GetInstance().GetPidFromInfoJson(HOST_ID, profPath);
    auto& idPool = IdPool::GetInstance();
    for (const auto& mark : *txRecords)
    {
        uint64_t messageId = idPool.GetUint64Id(mark.message);
        uint64_t domainId = idPool.GetUint64Id(mark.domain);
        uint64_t globalTid = Utils::Contact(hostPid, mark.tid);
        rows.emplace_back(mark.timestamp, mark.end, mark.eventType, UINT32_MAX, mark.category, messageId,
                          globalTid, globalTid, domainId, mark.connectionId);
    }
    return SaveData(rows, TABLE_NAME_MSTX, msprofDB);
}
// Appends the NPU info row and the rank/device mapping row for one device directory.
// - The device id is derived from `deviceDir`; the chip name is looked up in CHIP_TABLE by
//   the platform version reported by Context. An unknown version is logged and stored as
//   UNKNOWN.
// - npuInfoData receives (deviceId, chipName); rankDeviceMapData receives (-1, deviceId).
void UpdateNpuData(const std::string& profPath, const std::string& deviceDir,
                   std::vector<std::tuple<int16_t, std::string>>& npuInfoData,
                   std::vector<std::tuple<int16_t, int16_t>>& rankDeviceMapData)
{
    uint16_t deviceId = Utils::GetDeviceIdByDevicePath(deviceDir);
    uint16_t chip = Context::GetInstance().GetPlatformVersion(deviceId, profPath);

    std::string chipName = UNKNOWN;
    const auto chipEntry = CHIP_TABLE.find(chip);
    if (chipEntry != CHIP_TABLE.end())
    {
        chipName = chipEntry->second;
    }
    else
    {
        ERROR("Unknown chip type key: % in %", chip, deviceDir);
    }

    const auto signedDeviceId = static_cast<int16_t>(deviceId);
    npuInfoData.emplace_back(signedDeviceId, chipName);
    rankDeviceMapData.emplace_back(-1, signedDeviceId);
}
// Writes TABLE_NAME_NPU_INFO and TABLE_NAME_RANK_DEVICE_MAP.
// One row per device directory found under `profPath` (prefix DEVICE_PREFIX) is produced
// via UpdateNpuData. When no device directory exists, a default row is written instead:
// (-1, UNKNOWN) for NPU info and (-1, -1) for the rank/device map.
// Returns false when `profPath` is empty or a save fails; the rank/device map is only
// written after the NPU info table was saved successfully.
bool SaveNpuData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    if (profPath.empty())
    {
        ERROR("Prof path is empty.");
        return false;
    }

    using NpuInfoRows = std::vector<std::tuple<int16_t, std::string>>;
    using RankDeviceRows = std::vector<std::tuple<int16_t, int16_t>>;

    auto deviceDirs = Utils::File::GetFilesWithPrefix(profPath, DEVICE_PREFIX);
    NpuInfoRows npuInfoRows;
    RankDeviceRows rankDeviceRows;

    if (deviceDirs.empty())
    {
        WARN("No device info in %, will save default data.", profPath);
        npuInfoRows.emplace_back(-1, UNKNOWN);
        rankDeviceRows.emplace_back(-1, -1);
    }
    else
    {
        for (const auto& dir : deviceDirs)
        {
            UpdateNpuData(profPath, dir, npuInfoRows, rankDeviceRows);
        }
    }

    if (!SaveData(npuInfoRows, TABLE_NAME_NPU_INFO, msprofDB))
    {
        return false;
    }
    return SaveData(rankDeviceRows, TABLE_NAME_RANK_DEVICE_MAP, msprofDB);
}
// Writes the NpuMemData records from `dataInventory` into TABLE_NAME_NPU_MEM.
// Row: (type, ddr, hbm, timestamp, deviceId).
// The event string is converted to an integer: 0 is stored as the IdPool id of "app",
// 1 as the IdPool id of "device", any other value is stored unchanged. A failed
// conversion is logged and processing continues with whatever value the variable holds
// (it is initialised to UINT64_MAX).
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveNpuMemData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using NpuMemRow = std::tuple<uint64_t, uint64_t, uint64_t, uint64_t, uint16_t>;

    const auto memRecords = dataInventory.GetPtr<std::vector<NpuMemData>>();
    if (!memRecords)
    {
        WARN("NpuMem data not exist.");
        return true;
    }

    std::vector<NpuMemRow> rows;
    if (!Reserve(rows, memRecords->size()))
    {
        ERROR("Reserved for NpuMem data failed.");
        return false;
    }

    const std::string appLabel = "app";
    const std::string deviceLabel = "device";
    const uint64_t appEventCode = 0;
    const uint64_t deviceEventCode = 1;
    auto& idPool = IdPool::GetInstance();
    uint64_t appLabelId = idPool.GetUint64Id(appLabel);
    uint64_t deviceLabelId = idPool.GetUint64Id(deviceLabel);

    for (const auto& sample : *memRecords)
    {
        uint64_t eventCode = UINT64_MAX;
        if (StrToU64(eventCode, sample.event) == ANALYSIS_ERROR)
        {
            ERROR("Converting string(event: %) to integer failed, deviceId is: %.", sample.event, sample.deviceId);
        }
        const uint64_t storedType = (eventCode == appEventCode)
                                        ? appLabelId
                                        : ((eventCode == deviceEventCode) ? deviceLabelId : eventCode);
        rows.emplace_back(storedType, sample.ddr, sample.hbm, sample.timestamp, sample.deviceId);
    }
    return SaveData(rows, TABLE_NAME_NPU_MEM, msprofDB);
}
// Writes the NpuOpMemData records from `dataInventory` into TABLE_NAME_NPU_OP_MEM.
// Row: (operatorName id, addr, type, size, timestamp, globalTid, totalAllocateMemory,
// totalReserveMemory, id of the string "GE", deviceId). globalTid is
// Utils::Contact(host pid, threadId).
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveNpuOpMemData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using NpuOpMemRow = std::tuple<uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t,
                                   uint64_t, uint16_t>;

    const auto opMemRecords = dataInventory.GetPtr<std::vector<NpuOpMemData>>();
    if (!opMemRecords)
    {
        WARN("NpuOpMem data not exist.");
        return true;
    }

    std::vector<NpuOpMemRow> rows;
    if (!Reserve(rows, opMemRecords->size()))
    {
        ERROR("Reserved for NpuOpMem data failed.");
        return false;
    }

    auto& idPool = IdPool::GetInstance();
    // The "GE" id is requested once, before any operator name is registered.
    uint64_t geLabelId = idPool.GetUint64Id("GE");
    uint32_t hostPid = Context::GetInstance().GetPidFromInfoJson(HOST_ID, profPath);
    for (const auto& record : *opMemRecords)
    {
        uint64_t operatorNameId = idPool.GetUint64Id(record.operatorName);
        uint64_t globalTid = Utils::Contact(hostPid, record.threadId);
        rows.emplace_back(operatorNameId, record.addr, record.type, record.size, record.timestamp, globalTid,
                          record.totalAllocateMemory, record.totalReserveMemory, geLabelId, record.deviceId);
    }
    return SaveData(rows, TABLE_NAME_NPU_OP_MEM, msprofDB);
}
// Writes the NpuModuleMemData records from `dataInventory` into TABLE_NAME_NPU_MODULE_MEM.
// Row: (moduleId, timestamp, totalReserved, deviceId).
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveNpuModuleMemData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using ModuleMemRow = std::tuple<uint32_t, uint64_t, uint64_t, uint16_t>;

    const auto moduleRecords = dataInventory.GetPtr<std::vector<NpuModuleMemData>>();
    if (!moduleRecords)
    {
        WARN("NpuModuleMem data not exist.");
        return true;
    }

    std::vector<ModuleMemRow> rows;
    if (!Reserve(rows, moduleRecords->size()))
    {
        ERROR("Reserved for NpuModuleMem data failed.");
        return false;
    }

    for (const auto& sample : *moduleRecords)
    {
        rows.emplace_back(sample.moduleId, sample.timestamp, sample.totalReserved, sample.deviceId);
    }
    return SaveData(rows, TABLE_NAME_NPU_MODULE_MEM, msprofDB);
}
// Writes the PCIeData records from `dataInventory` into TABLE_NAME_PCIE.
// Each row holds deviceId and timestamp followed by (min, max, avg) triples for
// txPost, txNonpost, txCpl, txNonpostLatency, rxPost, rxNonpost and rxCpl, in that order.
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SavePCIeData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using PcieRow = std::tuple<uint16_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t,
                               uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t,
                               uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t>;

    const auto pcieRecords = dataInventory.GetPtr<std::vector<PCIeData>>();
    if (!pcieRecords)
    {
        WARN("PCIe data not exist.");
        return true;
    }

    std::vector<PcieRow> rows;
    if (!Reserve(rows, pcieRecords->size()))
    {
        ERROR("Reserved for PCIe data failed.");
        return false;
    }

    for (const auto& sample : *pcieRecords)
    {
        rows.emplace_back(sample.deviceId, sample.timestamp,
                          // transmit side
                          sample.txPost.min, sample.txPost.max, sample.txPost.avg,
                          sample.txNonpost.min, sample.txNonpost.max, sample.txNonpost.avg,
                          sample.txCpl.min, sample.txCpl.max, sample.txCpl.avg,
                          sample.txNonpostLatency.min, sample.txNonpostLatency.max, sample.txNonpostLatency.avg,
                          // receive side
                          sample.rxPost.min, sample.rxPost.max, sample.rxPost.avg,
                          sample.rxNonpost.min, sample.rxNonpost.max, sample.rxNonpost.avg,
                          sample.rxCpl.min, sample.rxCpl.max, sample.rxCpl.avg);
    }
    return SaveData(rows, TABLE_NAME_PCIE, msprofDB);
}
// Writes a single (startTimeNs, endTimeNs) row into TABLE_NAME_SESSION_TIME_INFO.
// The values come from the profiling time record that Context provides for `profPath`.
// Returns false when the time record cannot be obtained or saving fails.
bool SaveSessionTimeInfoData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using SessionTimeRow = std::tuple<uint64_t, uint64_t>;

    Utils::ProfTimeRecord timeRecord;
    const bool recordLoaded = Context::GetInstance().GetProfTimeRecordInfo(timeRecord, profPath);
    if (!recordLoaded)
    {
        ERROR("GetProfTimeRecordInfo failed, profPath is %.", profPath);
        return false;
    }

    std::vector<SessionTimeRow> rows;
    rows.emplace_back(timeRecord.startTimeNs, timeRecord.endTimeNs);
    return SaveData(rows, TABLE_NAME_SESSION_TIME_INFO, msprofDB);
}
// Writes the SocBandwidthData records from `dataInventory` into TABLE_NAME_SOC.
// Row: (l2BufferBwLevel, mataBwLevel, timestamp, deviceId).
// Returns true when there is nothing to save; false when reserving memory or saving fails.
bool SaveSocData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using SocRow = std::tuple<uint32_t, uint32_t, uint64_t, uint16_t>;

    const auto socRecords = dataInventory.GetPtr<std::vector<SocBandwidthData>>();
    if (!socRecords)
    {
        WARN("Soc data not exist.");
        return true;
    }

    std::vector<SocRow> rows;
    if (!Reserve(rows, socRecords->size()))
    {
        ERROR("Reserved for Soc data failed.");
        return false;
    }

    for (const auto& sample : *socRecords)
    {
        rows.emplace_back(sample.l2BufferBwLevel, sample.mataBwLevel, sample.timestamp, sample.deviceId);
    }
    return SaveData(rows, TABLE_NAME_SOC, msprofDB);
}
// Splits the TaskInfoData records from `dataInventory` into two tables:
// - TABLE_NAME_COMMUNICATION_SCHEDULE_TASK_INFO receives tasks whose streamId matches the
//   aiCpuKfcStreamId of any MC2CommInfoData record, or whose opName ends with AICPU_KERNEL.
//   Row: (opName id, globalTaskId, taskType id, opType id).
// - TABLE_NAME_COMPUTE_TASK_INFO receives every other task with its full shape/attribute ids.
// A table is only written when it has at least one row.
// Returns true when no TaskInfoData exists; false when reserving memory or any save fails.
bool SaveComputeTaskInfo(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    auto computeTaskInfo = dataInventory.GetPtr<std::vector<TaskInfoData>>();
    auto kfcStream = dataInventory.GetPtr<std::vector<MC2CommInfoData>>();
    if (computeTaskInfo == nullptr)
    {
        WARN("ComputeTaskInfo data not exist.");
        return true;
    }
    ComputeTaskInfoFormat res;
    CommScheduleDataFormat scheduleData;
    if (!Reserve(res, computeTaskInfo->size()))
    {
        ERROR("Reserved for ComputeTaskInfo data failed.");
        return false;
    }

    // True when the task's stream is one of the KFC streams. The task is captured by
    // reference: capturing it by value would deep-copy the whole record (including all of
    // its strings) once per task just to compare a stream id.
    const auto runsOnKfcStream = [&kfcStream](const TaskInfoData& task)
    {
        return kfcStream &&
               std::find_if(kfcStream->begin(), kfcStream->end(), [&task](const MC2CommInfoData& mc)
                            { return mc.aiCpuKfcStreamId == task.streamId; }) != kfcStream->end();
    };

    ComputeTaskInfoData taskInfoData{};
    for (const auto& item : *computeTaskInfo)
    {
        // Ids common to both output tables.
        taskInfoData.opName = IdPool::GetInstance().GetUint64Id(item.opName);
        taskInfoData.globalTaskId =
            IdPool::GetInstance().GetId(std::make_tuple(item.deviceId, item.streamId, item.taskId, item.contextId,
                                                        item.batchId, static_cast<uint32_t>(DevType::NPU)));
        taskInfoData.taskType = IdPool::GetInstance().GetUint64Id(item.taskType);
        taskInfoData.opType = IdPool::GetInstance().GetUint64Id(item.opType);

        if (runsOnKfcStream(item) || Utils::EndsWith(item.opName, AICPU_KERNEL))
        {
            scheduleData.emplace_back(taskInfoData.opName, taskInfoData.globalTaskId, taskInfoData.taskType,
                                      taskInfoData.opType);
            continue;
        }

        // Remaining ids are only needed for compute tasks.
        ProcessShapeInfo(taskInfoData, item);
        taskInfoData.hashId = IdPool::GetInstance().GetUint64Id(item.hashId);
        taskInfoData.opState = IdPool::GetInstance().GetUint64Id(item.opState);
        taskInfoData.hf32Eligible = IdPool::GetInstance().GetUint64Id(item.opFlag);
        taskInfoData.gridDim = IdPool::GetInstance().GetUint64Id(item.gridDim);
        taskInfoData.blockDim = IdPool::GetInstance().GetUint64Id(item.blockDim);
        res.emplace_back(taskInfoData.opName, taskInfoData.globalTaskId, item.blockNum, item.mixBlockNum,
                         taskInfoData.taskType, taskInfoData.opType, taskInfoData.inputFormats,
                         taskInfoData.inputDataTypes, taskInfoData.inputShapes, taskInfoData.outputFormats,
                         taskInfoData.outputDataTypes, taskInfoData.outputShapes, taskInfoData.hashId,
                         taskInfoData.opState, taskInfoData.hf32Eligible, taskInfoData.gridDim,
                         taskInfoData.blockDim);
    }

    bool flag = true;
    if (!res.empty())
    {
        flag = SaveData(res, TABLE_NAME_COMPUTE_TASK_INFO, msprofDB);
    }
    if (!scheduleData.empty())
    {
        // Always attempt the second table, but keep an earlier failure.
        flag = SaveData(scheduleData, TABLE_NAME_COMMUNICATION_SCHEDULE_TASK_INFO, msprofDB) && flag;
    }
    return flag;
}
// Writes device task records into TABLE_NAME_TASK and then creates the TASK_INDEX_NAME
// index on that table.
// Rows come from two sources, in this order:
// - AscendTaskData: (timestamp, end, deviceId, connectionId, globalTaskId, host pid,
//   taskType id, contextId, streamId, taskId, modelId).
// - MsprofTxDeviceData: same layout, with end = timestamp + duration and UINT32_MAX in the
//   contextId column. Its global task id is requested with UINT32_MAX and connectionId in
//   the 4th and 5th tuple positions.
// Returns true when neither source exists; false when reserving memory, saving or index
// creation fails. The index is only attempted after a successful save.
bool SaveAscendTaskData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using TaskRow = std::tuple<uint64_t, uint64_t, uint32_t, int64_t, uint64_t, uint32_t, uint32_t, uint32_t,
                               int32_t, uint32_t, uint32_t>;

    const auto ascendTasks = dataInventory.GetPtr<std::vector<AscendTaskData>>();
    const auto deviceTxTasks = dataInventory.GetPtr<std::vector<MsprofTxDeviceData>>();
    if (!ascendTasks && !deviceTxTasks)
    {
        WARN("AscendTaskData data and device tx data not exist.");
        return true;
    }

    size_t rowCount = 0;
    if (ascendTasks)
    {
        rowCount += ascendTasks->size();
    }
    if (deviceTxTasks)
    {
        rowCount += deviceTxTasks->size();
    }

    std::vector<TaskRow> rows;
    if (!Reserve(rows, rowCount))
    {
        ERROR("Reserved for AscendTaskData data failed.");
        return false;
    }

    uint32_t globalPid = Context::GetInstance().GetPidFromInfoJson(HOST_ID, profPath);
    auto& idPool = IdPool::GetInstance();

    if (ascendTasks)
    {
        for (const auto& task : *ascendTasks)
        {
            uint64_t globalTaskId =
                idPool.GetId(std::make_tuple(task.deviceId, task.streamId, task.taskId, task.contextId,
                                             task.batchId, static_cast<uint32_t>(DevType::NPU)));
            uint64_t taskTypeId = idPool.GetUint64Id(task.taskType);
            rows.emplace_back(task.timestamp, task.end, task.deviceId, task.connectionId, globalTaskId, globalPid,
                              taskTypeId, task.contextId, task.streamId, task.taskId, task.modelId);
        }
    }

    if (deviceTxTasks)
    {
        for (const auto& tx : *deviceTxTasks)
        {
            uint64_t globalTaskId =
                idPool.GetId(std::make_tuple(tx.deviceId, tx.streamId, tx.taskId, UINT32_MAX, tx.connectionId,
                                             static_cast<uint32_t>(DevType::NPU)));
            uint64_t taskTypeId = idPool.GetUint64Id(tx.taskType);
            rows.emplace_back(tx.timestamp, tx.timestamp + static_cast<uint64_t>(tx.duration), tx.deviceId,
                              tx.connectionId, globalTaskId, globalPid, taskTypeId, UINT32_MAX, tx.streamId,
                              tx.taskId, tx.modelId);
        }
    }

    if (!SaveData(rows, TABLE_NAME_TASK, msprofDB))
    {
        return false;
    }
    return CreateTableIndex(TABLE_NAME_TASK, TASK_INDEX_NAME, msprofDB, TASK_INDEX_COL_NAMES);
}
// Dumps every string/id pair registered in IdPool into the STRING_IDS table as (id, string) rows.
// Returns true when the pool holds no strings; false when reserving memory or saving fails.
bool SaveStringIdsData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using StringIdRow = std::tuple<uint64_t, std::string>;
    const std::unordered_map<std::string, uint64_t> stringToId = IdPool::GetInstance().GetAllUint64Ids();
    if (stringToId.empty())
    {
        WARN("No StringIds data.");
        return true;
    }

    std::vector<StringIdRow> rows;
    if (!Utils::Reserve(rows, stringToId.size()))
    {
        ERROR("Reserve for stringIds data failed.");
        return false;
    }
    // The pool is keyed by string; the table stores the id first.
    for (auto entry = stringToId.begin(); entry != stringToId.end(); ++entry)
    {
        rows.emplace_back(entry->second, entry->first);
    }
    return SaveData(rows, TABLE_NAME_STRING_IDS, msprofDB);
}
// Saves system IO records (shared layout for the NIC and RoCE tables) into `tableName`.
// T is the inventory element type; only the last element's `sysIOOriginalData` is written.
// Returns true when there is nothing to save, false when reserving memory or saving fails.
template <typename T>
bool SaveSysIOData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& tableName)
{
    using SysIODataFormat =
        std::vector<std::tuple<uint16_t, uint64_t, uint64_t, double, double, uint32_t, uint32_t, uint32_t, uint32_t,
                               double, double, uint32_t, uint32_t, uint32_t, uint32_t, uint16_t>>;
    auto sysIOMemData = dataInventory.GetPtr<std::vector<T>>();
    // An empty container must be rejected here: calling back() on it is undefined behavior.
    if (sysIOMemData == nullptr || sysIOMemData->empty())
    {
        WARN("SysIO % data not exist.", tableName);
        return true;
    }
    SysIODataFormat res;
    // Bind by reference: the inventory pointer keeps the data alive, so no copy is needed.
    const auto& sysIOOriginalData = sysIOMemData->back().sysIOOriginalData;
    if (sysIOOriginalData.empty())
    {
        WARN("SysIO % data not exist.", tableName);
        return true;
    }
    if (!Reserve(res, sysIOOriginalData.size()))
    {
        ERROR("Reserved for SysIO % data failed.", tableName);
        return false;
    }
    for (const auto& item : sysIOOriginalData)
    {
        res.emplace_back(item.deviceId, item.timestamp, item.bandwidth, item.rxPacketRate, item.rxByteRate,
                         item.rxPackets, item.rxBytes, item.rxErrors, item.rxDropped, item.txPacketRate,
                         item.txByteRate, item.txPackets, item.txBytes, item.txErrors, item.txDropped, item.funcId);
    }
    return SaveData(res, tableName, msprofDB);
}
// Saves NIC records into the NIC table through the shared system IO writer.
bool SaveNicData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const bool saved = SaveSysIOData<NicOriginalData>(dataInventory, msprofDB, TABLE_NAME_NIC);
    return saved;
}
// Saves RoCE records into the ROCE table through the shared system IO writer.
bool SaveRoCEData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const bool saved = SaveSysIOData<RoceOriginalData>(dataInventory, msprofDB, TABLE_NAME_ROCE);
    return saved;
}
// Saves per-task PMU values into the TASK_PMU_INFO table as (globalTaskId, headerStringId, value).
// Returns true when no UnifiedTaskPmu data exists; false when reserving memory or saving fails.
bool SaveTaskPmuData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using TaskPmuRow = std::tuple<uint64_t, uint64_t, double>;
    std::vector<TaskPmuRow> rows;
    const auto taskPmuRecords = dataInventory.GetPtr<std::vector<UnifiedTaskPmu>>();
    if (taskPmuRecords == nullptr)
    {
        WARN("unifiedTaskPmuData data not exist.");
        return true;
    }
    if (!Reserve(rows, taskPmuRecords->size()))
    {
        ERROR("Reserved for unifiedTaskPmuData data failed.");
        return false;
    }
    for (const auto& pmu : *taskPmuRecords)
    {
        const auto taskKey = std::make_tuple(static_cast<uint16_t>(pmu.deviceId), pmu.streamId, pmu.taskId,
                                             pmu.subtaskId, pmu.batchId, static_cast<uint32_t>(DevType::NPU));
        const uint64_t globalTaskId = IdPool::GetInstance().GetId(taskKey);
        rows.emplace_back(globalTaskId, IdPool::GetInstance().GetUint64Id(pmu.header), pmu.value);
    }
    return SaveData(rows, TABLE_NAME_TASK_PMU_INFO, msprofDB);
}
// Saves sampled PMU timeline records into the SAMPLE_PMU_TIMELINE table.
// Returns true when no UnifiedSampleTimelinePmu data exists; false when reserving memory or saving fails.
bool SaveSamplePmuTimelineData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using TimelineRow = std::tuple<uint16_t, uint64_t, uint64_t, double, double, uint16_t, uint64_t>;
    std::vector<TimelineRow> rows;
    const auto timelineRecords = dataInventory.GetPtr<std::vector<UnifiedSampleTimelinePmu>>();
    if (timelineRecords == nullptr)
    {
        WARN("UnifiedSampleTimelinePmu data not exist.");
        return true;
    }
    if (!Reserve(rows, timelineRecords->size()))
    {
        ERROR("Reserved for UnifiedSampleTimelinePmu data failed.");
        return false;
    }
    for (const auto& sample : *timelineRecords)
    {
        rows.emplace_back(sample.deviceId, sample.timestamp, sample.totalCycle, sample.usage, sample.freq,
                          sample.coreId, sample.coreType);
    }
    return SaveData(rows, TABLE_NAME_SAMPLE_PMU_TIMELINE, msprofDB);
}
// Saves sampled PMU summary records into the SAMPLE_PMU_SUMMARY table; the metric name is stored
// as its IdPool string id.
// Returns true when no UnifiedSampleSummaryPmu data exists; false when reserving memory or saving fails.
bool SaveSamplePmuSummaryData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using SummaryRow = std::tuple<uint16_t, uint64_t, double, uint16_t, uint64_t>;
    std::vector<SummaryRow> rows;
    const auto summaryRecords = dataInventory.GetPtr<std::vector<UnifiedSampleSummaryPmu>>();
    if (summaryRecords == nullptr)
    {
        WARN("UnifiedSampleSummaryPmu data not exist.");
        return true;
    }
    if (!Reserve(rows, summaryRecords->size()))
    {
        ERROR("Reserved for UnifiedSampleSummaryPmu data failed.");
        return false;
    }
    for (const auto& summary : *summaryRecords)
    {
        const auto metricId = IdPool::GetInstance().GetUint64Id(summary.metric);
        rows.emplace_back(summary.deviceId, metricId, summary.value, summary.coreId, summary.coreType);
    }
    return SaveData(rows, TABLE_NAME_SAMPLE_PMU_SUMMARY, msprofDB);
}
// Saves host CPU usage samples into the CPU_USAGE table as (timestamp, cpuId, usage).
// Records whose cpuNo is "Avg" or cannot be parsed as an unsigned integer are skipped.
// Returns true when no CpuUsageData exists; false when reserving memory or saving fails.
bool SaveCpuUsageData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const auto cpuRecords = dataInventory.GetPtr<std::vector<CpuUsageData>>();
    if (cpuRecords == nullptr)
    {
        WARN("Cpu usage data not exist.");
        return true;
    }
    std::vector<std::tuple<uint64_t, uint64_t, double>> rows;
    if (!Reserve(rows, cpuRecords->size()))
    {
        ERROR("Reserved for cpu usage data failed.");
        return false;
    }

    // First pass: parse each distinct cpuNo once and remember which ones are unusable.
    const std::string avgStr = "Avg";
    std::unordered_map<std::string, uint64_t> parsedCpuIds;
    std::set<std::string> rejectedCpuNos{avgStr};
    for (const auto& record : *cpuRecords)
    {
        const bool alreadyClassified =
            rejectedCpuNos.count(record.cpuNo) != 0 || parsedCpuIds.count(record.cpuNo) != 0;
        if (alreadyClassified)
        {
            continue;
        }
        uint64_t cpuId = 0;
        if (StrToU64(cpuId, record.cpuNo) != ANALYSIS_OK)
        {
            rejectedCpuNos.insert(record.cpuNo);
            continue;
        }
        parsedCpuIds[record.cpuNo] = cpuId;
    }

    // Second pass: emit a row for every record whose cpuNo parsed successfully.
    for (const auto& record : *cpuRecords)
    {
        if (rejectedCpuNos.count(record.cpuNo) == 0)
        {
            rows.emplace_back(record.timestamp, parsedCpuIds[record.cpuNo], record.usage);
        }
    }
    return SaveData(rows, TABLE_NAME_CPU_USAGE, msprofDB);
}
// Saves host memory usage samples into the HOST_MEM_USAGE table as (timestamp, usage).
// Returns true when no MemUsageData exists; false when reserving memory or saving fails.
bool SaveHostMemUsageData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using MemUsageRow = std::tuple<uint64_t, double>;
    const auto memRecords = dataInventory.GetPtr<std::vector<MemUsageData>>();
    if (memRecords == nullptr)
    {
        WARN("Host mem usage data not exist.");
        return true;
    }
    std::vector<MemUsageRow> rows;
    if (!Reserve(rows, memRecords->size()))
    {
        ERROR("Reserved for mem usage data failed.");
        return false;
    }
    for (const auto& sample : *memRecords)
    {
        rows.emplace_back(sample.timestamp, sample.usage);
    }
    return SaveData(rows, TABLE_NAME_HOST_MEM_USAGE, msprofDB);
}
// Saves host disk usage samples into the HOST_DISK_USAGE table as
// (timestamp, readRate * BYTE_SIZE, writeRate * BYTE_SIZE, usage).
// Returns true when no DiskUsageData exists; false when reserving memory or saving fails.
bool SaveHostDiskUsageData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using DiskUsageRow = std::tuple<uint64_t, double, double, double>;
    const auto diskRecords = dataInventory.GetPtr<std::vector<DiskUsageData>>();
    if (diskRecords == nullptr)
    {
        WARN("Host disk usage data not exist.");
        return true;
    }
    std::vector<DiskUsageRow> rows;
    if (!Reserve(rows, diskRecords->size()))
    {
        ERROR("Reserved for disk usage data failed.");
        return false;
    }
    for (const auto& sample : *diskRecords)
    {
        const auto scaledReadRate = sample.readRate * BYTE_SIZE;
        const auto scaledWriteRate = sample.writeRate * BYTE_SIZE;
        rows.emplace_back(sample.timestamp, scaledReadRate, scaledWriteRate, sample.usage);
    }
    return SaveData(rows, TABLE_NAME_HOST_DISK_USAGE, msprofDB);
}
// Saves host network usage samples into the HOST_NETWORK_USAGE table as
// (timestamp, usage, speed * BYTE_SIZE).
// Returns true when no NetWorkUsageData exists; false when reserving memory or saving fails.
bool SaveHostNetworkUsageData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using NetworkUsageRow = std::tuple<uint64_t, double, double>;
    const auto networkRecords = dataInventory.GetPtr<std::vector<NetWorkUsageData>>();
    if (networkRecords == nullptr)
    {
        WARN("Host network usage data not exist.");
        return true;
    }
    std::vector<NetworkUsageRow> rows;
    if (!Reserve(rows, networkRecords->size()))
    {
        ERROR("Reserved for network usage data failed.");
        return false;
    }
    for (const auto& sample : *networkRecords)
    {
        const auto scaledSpeed = sample.speed * BYTE_SIZE;
        rows.emplace_back(sample.timestamp, sample.usage, scaledSpeed);
    }
    return SaveData(rows, TABLE_NAME_HOST_NETWORK_USAGE, msprofDB);
}
// Saves OS runtime API calls into the OSRT_API table as
// (nameStringId, Contact(pid, tid), timestamp, endTime).
// Returns true when no OSRuntimeApiData exists; false when reserving memory or saving fails.
bool SaveOSRuntimeApiData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    using RuntimeApiRow = std::tuple<uint64_t, uint64_t, uint64_t, uint64_t>;
    const auto apiRecords = dataInventory.GetPtr<std::vector<OSRuntimeApiData>>();
    if (apiRecords == nullptr)
    {
        WARN("OS runtime api data not exist.");
        return true;
    }
    std::vector<RuntimeApiRow> rows;
    if (!Reserve(rows, apiRecords->size()))
    {
        ERROR("Reserved for runtime api data failed.");
        return false;
    }
    for (const auto& call : *apiRecords)
    {
        const auto nameId = IdPool::GetInstance().GetUint64Id(call.name);
        rows.emplace_back(nameId, Contact(call.pid, call.tid), call.timestamp, call.endTime);
    }
    return SaveData(rows, TABLE_NAME_OSRT_API, msprofDB);
}
// Saves QoS bandwidth samples into the QOS table as
// (deviceId, eventStringId, bandwidth * BYTE_SIZE * BYTE_SIZE, timestamp).
// For each device directory under profPath the configured QoS event names are registered in IdPool
// with a "QoS " prefix; the i-th event is paired with the i-th bandwidth field (bw1..bw10) of a record.
// Records for devices without an event list are skipped.
// Returns true when no QosData exists; false when reserving memory or saving fails.
bool SaveQosData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const std::string QOS = "QoS ";
    auto qosData = dataInventory.GetPtr<std::vector<QosData>>();
    if (qosData == nullptr)
    {
        WARN("QOS data not exist.");
        return true;
    }
    std::vector<std::tuple<uint64_t, uint64_t, uint64_t, uint64_t>> res;
    if (!Reserve(res, qosData->size()))
    {
        ERROR("Reserved for QOS data failed.");
        return false;
    }
    std::unordered_map<uint16_t, std::vector<uint64_t>> qosEventsMap;
    auto deviceList = File::GetFilesWithPrefix(profPath, DEVICE_PREFIX);
    for (const auto& devicePath : deviceList)
    {
        auto deviceId = GetDeviceIdByDevicePath(devicePath);
        auto qosEvents = Context::GetInstance().GetQosEvents(deviceId, profPath);
        // Fill the map entry in place instead of building a temporary vector and copying it.
        auto& qosEventsIds = qosEventsMap[deviceId];
        qosEventsIds.clear();
        for (const auto& event : qosEvents)
        {
            qosEventsIds.push_back(IdPool::GetInstance().GetUint64Id(QOS + event));
        }
    }
    for (const auto& data : *qosData)
    {
        auto it = qosEventsMap.find(data.deviceId);
        if (it == qosEventsMap.end())
        {
            continue;
        }
        const std::vector<uint32_t> bandwidth{data.bw1, data.bw2, data.bw3, data.bw4, data.bw5,
                                              data.bw6, data.bw7, data.bw8, data.bw9, data.bw10};
        const auto& eventIds = it->second;
        // A record has a fixed number of bandwidth fields; never index past them even if the
        // device reports more events.
        const size_t pairCount = eventIds.size() < bandwidth.size() ? eventIds.size() : bandwidth.size();
        for (size_t i = 0; i < pairCount; i++)
        {
            // Widen before scaling so the multiplication cannot wrap in 32-bit arithmetic.
            res.emplace_back(data.deviceId, eventIds[i], static_cast<uint64_t>(bandwidth[i]) * BYTE_SIZE * BYTE_SIZE,
                             data.timestamp);
        }
    }
    return SaveData(res, TABLE_NAME_QOS, msprofDB);
}
// Saves DPU task records into the DPU_TASK table. Each row carries the global task id, the op name
// string id, and the string id of a JSON argument blob describing the task.
// HCCL records get the communication fields (including a derived bandwidth); other records only
// get the task type.
// Returns true when DPU data is missing or empty; false when reserving memory or saving fails.
bool SaveDPUData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const auto dpuRecords = dataInventory.GetPtr<std::vector<DPUData>>();
    if (dpuRecords == nullptr || dpuRecords->empty())
    {
        WARN("Can't get dpuData from dataInventory");
        return true;
    }
    using DpuTaskRow =
        std::tuple<uint16_t, uint32_t, uint64_t, uint64_t, uint64_t, uint16_t, uint32_t, uint64_t, uint64_t>;
    std::vector<DpuTaskRow> rows;
    if (!Reserve(rows, dpuRecords->size()))
    {
        ERROR("Reserved for DPU task data failed.");
        return false;
    }
    for (const auto& data : *dpuRecords)
    {
        const uint64_t globalTaskId = IdPool::GetInstance().GetId(std::make_tuple(
            data.dpuDeviceId, data.streamId, data.taskId, UINT32_MAX, UINT32_MAX, static_cast<uint32_t>(DevType::DPU)));
        const uint64_t opNameId = IdPool::GetInstance().GetUint64Id(data.opName);

        Infra::JsonWriter jsonWriter;
        jsonWriter.StartArray();
        jsonWriter["Thread Id"] << data.threadId;
        jsonWriter["Physic Stream Id"] << data.streamId;
        jsonWriter["Task Id"] << data.taskId;
        if (data.isHccl)
        {
            constexpr double MICRO_SECOND = 1000000.0;
            const double dur = static_cast<double>(data.endTime - data.timestamp) / 1000.0;
            // Bandwidth stays 0 unless the duration is positive and the size is within INT64 range.
            const bool bandwidthComputable = dur > 0 && data.dataSize <= INT64_MAX;
            const double bandwidth = bandwidthComputable ? data.dataSize / (dur / MICRO_SECOND) : 0;
            jsonWriter["OP Type"] << data.opType;
            jsonWriter["AI CPU Device Id"] << data.npuDeviceId;
            jsonWriter["AI CPU Task Id"] << data.aicpuTaskId;
            jsonWriter["Group Name"] << (data.groupName + "(" + data.groupNameId + ")");
            jsonWriter["Plane Id"] << data.planeId;
            jsonWriter["Notify Id"] << data.notifyId;
            jsonWriter["Duration Estimated(us)"] << data.durationEstimated;
            jsonWriter["Rank Size"] << data.rankSize;
            jsonWriter["Src Rank"] << data.localRank;
            jsonWriter["Dst Rank"] << data.remoteRank;
            jsonWriter["Transport Type"] << data.transportType;
            jsonWriter["Size(Byte)"] << data.dataSize;
            jsonWriter["Bandwidth(B/s)"] << bandwidth;
            jsonWriter["Data Type"] << data.dataType;
            jsonWriter["Link Type"] << data.linkType;
            jsonWriter["Rdma Type"] << data.rdmaType;
        }
        else
        {
            jsonWriter["Task Type"] << data.taskType;
        }
        jsonWriter.EndArray();

        // The serialized arguments are stored as a string id, like every other string column.
        const uint64_t argsId = IdPool::GetInstance().GetUint64Id(jsonWriter.GetString());
        rows.emplace_back(data.dpuDeviceId, data.threadId, data.timestamp, data.endTime, globalTaskId, data.streamId,
                          data.taskId, opNameId, argsId);
    }
    return SaveData(rows, TABLE_NAME_DPU_TASK, msprofDB);
}
// Saves SIO bandwidth samples into the SIO table. Every bandwidth field is multiplied by
// BYTE_SIZE * BYTE_SIZE before being stored; the name is stored as its IdPool string id.
// Returns true when SIO data is missing or empty; false when reserving memory or saving fails.
bool SaveSIOData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const auto sioRecords = dataInventory.GetPtr<std::vector<SioData>>();
    if (sioRecords == nullptr || sioRecords->empty())
    {
        WARN("SIO data not exist.");
        return true;
    }
    using SioRow =
        std::tuple<uint16_t, uint64_t, uint64_t, double, double, double, double, double, double, double, double>;
    std::vector<SioRow> rows;
    if (!Reserve(rows, sioRecords->size()))
    {
        ERROR("Reserved for SIO data failed.");
        return false;
    }
    for (const auto& sample : *sioRecords)
    {
        const uint64_t nameId = IdPool::GetInstance().GetUint64Id(sample.name);
        // Receive-side channels.
        const auto reqRx = sample.reqRxBandwidth * BYTE_SIZE * BYTE_SIZE;
        const auto rspRx = sample.rspRxBandwidth * BYTE_SIZE * BYTE_SIZE;
        const auto snpRx = sample.snpRxBandwidth * BYTE_SIZE * BYTE_SIZE;
        const auto datRx = sample.datRxBandwidth * BYTE_SIZE * BYTE_SIZE;
        // Transmit-side channels.
        const auto reqTx = sample.reqTxBandwidth * BYTE_SIZE * BYTE_SIZE;
        const auto rspTx = sample.rspTxBandwidth * BYTE_SIZE * BYTE_SIZE;
        const auto snpTx = sample.snpTxBandwidth * BYTE_SIZE * BYTE_SIZE;
        const auto datTx = sample.datTxBandwidth * BYTE_SIZE * BYTE_SIZE;
        rows.emplace_back(sample.deviceId, nameId, sample.timestamp, reqRx, rspRx, snpRx, datRx, reqTx, rspTx, snpTx,
                          datTx);
    }
    return SaveData(rows, TABLE_NAME_SIO, msprofDB);
}
// Saves UB samples into the UB table as (deviceId, portId, timestamp, udmaRxBind, udmaTxBind).
// Returns true when UB data is missing or empty; false when reserving memory or saving fails.
bool SaveUBData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const auto ubRecords = dataInventory.GetPtr<std::vector<UbData>>();
    if (ubRecords == nullptr || ubRecords->empty())
    {
        WARN("UB data not exist.");
        return true;
    }
    using UbRow = std::tuple<uint16_t, uint16_t, uint64_t, uint64_t, uint64_t>;
    std::vector<UbRow> rows;
    if (!Reserve(rows, ubRecords->size()))
    {
        ERROR("Reserved for UB data failed.");
        return false;
    }
    for (auto sample = ubRecords->begin(); sample != ubRecords->end(); ++sample)
    {
        rows.emplace_back(sample->deviceId, sample->portId, sample->timestamp, sample->udmaRxBind,
                          sample->udmaTxBind);
    }
    return SaveData(rows, TABLE_NAME_UB, msprofDB);
}
// Saves CCU mission timeline records into the CCU table as
// (deviceId, globalTaskId, timeTypeStringId, start, end, argsStringId), where end is
// timestamp + duration and args is a JSON blob built from the optional fields present on the record.
// Returns true when CCU data is missing or empty; false when reserving memory or saving fails.
bool SaveCCUData(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)
{
    const auto ccuRecords = dataInventory.GetPtr<std::vector<CCUMissionTimelineData>>();
    if (ccuRecords == nullptr || ccuRecords->empty())
    {
        WARN("CCU mission data not exist.");
        return true;
    }
    using CcuRow = std::tuple<uint16_t, uint64_t, uint64_t, uint64_t, uint64_t, uint64_t>;
    std::vector<CcuRow> rows;
    if (!Reserve(rows, ccuRecords->size()))
    {
        ERROR("Reserved for CCU data failed.");
        return false;
    }
    for (const auto& data : *ccuRecords)
    {
        const uint64_t globalTaskId = IdPool::GetInstance().GetId(std::make_tuple(
            data.deviceId, data.streamId, data.taskId, UINT32_MAX, UINT32_MAX, static_cast<uint32_t>(DevType::NPU)));
        const uint64_t nameId = IdPool::GetInstance().GetUint64Id(data.timeType);
        const uint64_t beginNs = data.timestamp;
        const uint64_t finishNs = data.timestamp + static_cast<uint64_t>(data.duration);

        Infra::JsonWriter jsonWriter;
        jsonWriter.StartArray();
        jsonWriter["Physic Stream Id"] << data.streamId;
        jsonWriter["Task Id"] << data.taskId;
        // Wait entries label the instruction differently and also carry the notifying rank.
        const bool isWaitEntry = (data.timeType == CCU_TIME_TYPE_WAIT);
        if (isWaitEntry)
        {
            jsonWriter["Notify Instruction ID"] << data.instructionId;
            jsonWriter["Notify Rank ID"] << data.notifyRankId;
        }
        else
        {
            jsonWriter["Instruction ID"] << data.instructionId;
        }
        // Optional fields are emitted only when the record flags them as present.
        if (data.hasDieId)
        {
            jsonWriter["Die Id"] << data.dieId;
        }
        if (data.hasDataSize)
        {
            jsonWriter["Data Size"] << data.dataSize;
        }
        if (data.hasBandwidth)
        {
            jsonWriter["Bandwidth (B/s)"] << data.bandwidth * BYTE_SIZE * BYTE_SIZE;
        }
        if (data.hasReduceInfo)
        {
            jsonWriter["Reduce Op Type"] << data.reduceOpType;
            jsonWriter["Input Data Type"] << data.inputDataType;
            jsonWriter["Output Data Type"] << data.outputDataType;
        }
        if (data.hasMask)
        {
            jsonWriter["Mask"] << data.mask;
        }
        if (data.hasDelayChannel)
        {
            jsonWriter["Maximum Delay Channel"] << data.maxDelayChannel;
            jsonWriter["Maximum Channel Delay"] << data.maxChannelDelay;
        }
        jsonWriter.EndArray();

        const uint64_t argsId = IdPool::GetInstance().GetUint64Id(jsonWriter.GetString());
        rows.emplace_back(data.deviceId, globalTaskId, nameId, beginNs, finishNs, argsId);
    }
    return SaveData(rows, TABLE_NAME_CCU, msprofDB);
}
// Common signature of the Save*Data functions: takes the data inventory, the target msprof DB
// and the profiling path, and returns whether saving succeeded.
using SaveDataFunc = std::function<bool(DataInventory& dataInventory, DBInfo& msprofDB, const std::string& profPath)>;
// Dispatch table from processor name to the function that saves that processor's data.
// DBAssembler::Run submits one thread-pool task per entry. SaveStringIdsData is not listed here:
// Run calls it separately after all pool tasks have finished.
const std::unordered_map<std::string, SaveDataFunc> DATA_SAVER = {
{Viewer::Database::PROCESSOR_NAME_API, SaveApiData},
{Viewer::Database::PROCESSOR_NAME_COMMUNICATION, SaveCommunicationData},
{Viewer::Database::PROCESSOR_NAME_ACC_PMU, SaveAccPmuData},
{Viewer::Database::PROCESSOR_NAME_AICORE_FREQ, SaveAicoreFreqData},
{Viewer::Database::PROCESSOR_NAME_DDR, SaveDDRData},
{Viewer::Database::PROCESSOR_NAME_ENUM, SaveEnumData},
{Viewer::Database::PROCESSOR_NAME_HBM, SaveHbmData},
{Viewer::Database::PROCESSOR_NAME_HOST_INFO, SaveHostInfoData},
{Viewer::Database::PROCESSOR_NAME_HCCS, SaveHccsData},
{Viewer::Database::PROCESSOR_NAME_NETDEV_STATS, SaveNetDevStatsData},
{Viewer::Database::PROCESSOR_NAME_LLC, SaveLlcData},
{Viewer::Database::PROCESSOR_NAME_META_DATA, SaveMetaData},
{Viewer::Database::PROCESSOR_NAME_MSTX, SaveMsprofTxData},
{Viewer::Database::PROCESSOR_NAME_NPU_INFO, SaveNpuData},
{Viewer::Database::PROCESSOR_NAME_NPU_MEM, SaveNpuMemData},
{Viewer::Database::PROCESSOR_NAME_NPU_OP_MEM, SaveNpuOpMemData},
{Viewer::Database::PROCESSOR_NAME_NPU_MODULE_MEM, SaveNpuModuleMemData},
{Viewer::Database::PROCESSOR_NAME_PCIE, SavePCIeData},
{Viewer::Database::PROCESSOR_NAME_SESSION_TIME_INFO, SaveSessionTimeInfoData},
{Viewer::Database::PROCESSOR_NAME_SOC, SaveSocData},
{Viewer::Database::PROCESSOR_NAME_NIC, SaveNicData},
{Viewer::Database::PROCESSOR_NAME_ROCE, SaveRoCEData},
{Viewer::Database::PROCESSOR_NAME_DPU, SaveDPUData},
{Viewer::Database::PROCESSOR_NAME_SIO, SaveSIOData},
{Viewer::Database::PROCESSOR_NAME_UB, SaveUBData},
{Viewer::Database::PROCESSOR_NAME_TASK, SaveAscendTaskData},
{Viewer::Database::PROCESSOR_NAME_COMPUTE_TASK_INFO, SaveComputeTaskInfo},
{Viewer::Database::PROCESSOR_NAME_MEMCPY_INFO, SaveMemcpyInfoData},
{Viewer::Database::PROCESSOR_NAME_TASK_PMU_INFO, SaveTaskPmuData},
{Viewer::Database::PROCESSOR_NAME_SAMPLE_PMU_TIMELINE, SaveSamplePmuTimelineData},
{Viewer::Database::PROCESSOR_NAME_SAMPLE_PMU_SUMMARY, SaveSamplePmuSummaryData},
{Viewer::Database::PROCESSOR_NAME_CPU_USAGE, SaveCpuUsageData},
{Viewer::Database::PROCESSOR_NAME_MEM_USAGE, SaveHostMemUsageData},
{Viewer::Database::PROCESSOR_NAME_DISK_USAGE, SaveHostDiskUsageData},
{Viewer::Database::PROCESSOR_NAME_NETWORK_USAGE, SaveHostNetworkUsageData},
{Viewer::Database::PROCESSOR_NAME_OSRT_API, SaveOSRuntimeApiData},
{Viewer::Database::PROCESSOR_NAME_QOS, SaveQosData},
{Viewer::Database::PROCESSOR_NAME_CCU_MISSION, SaveCCUData},
};
bool CheckMsprofDb(const std::string& outputPath)
{
std::vector<std::string> files = File::GetOriginData(outputPath, {DB_NAME_MSPROF_DB}, {".json", ".csv"});
if (files.empty())
{
return false;
}
std::string timestampMax;
std::string latestFile;
for (const auto& file : files)
{
auto dbName = Split(file, "/").back();
size_t start = dbName.find(DB_NAME_MSPROF_DB) + DB_NAME_MSPROF_DB.length() + 1;
size_t end = dbName.find(".db");
if (start == std::string::npos || end == std::string::npos) continue;
std::string timestampStr = dbName.substr(start, end - start);
if (!IsNumber(timestampStr) || timestampStr.size() != EXPECT_TIME_LEN)
{
ERROR("Invalid msprof db name %.", dbName);
continue;
}
if (timestampStr > timestampMax)
{
timestampMax = timestampStr;
latestFile = file;
}
}
DBInfo msprofDB(latestFile, TABLE_NAME_STRING_IDS);
if (!msprofDB.ConstructDBRunner(latestFile))
{
ERROR("Construct for msprof db runner failed.");
return false;
}
if (!Utils::FileReader::Check(latestFile, MAX_DB_BYTES))
{
ERROR("Check % failed.", latestFile);
return false;
}
if (msprofDB.dbRunner->CheckTableExists(msprofDB.tableName))
{
INFO("Find completed msprof db, %.", latestFile);
return true;
}
INFO("The % database is incomplete and will be deleted.", latestFile);
PRINT_INFO("The % database is incomplete and will be deleted.", latestFile);
if (!Utils::File::DeleteFile(latestFile))
{
ERROR("Failed to delete file, %.", latestFile);
}
return false;
}
// Builds the path of a new msprof db inside outputDir; the file name is
// DB_NAME_MSPROF_DB + "_" + formatted local time + ".db".
std::string GetDBPath(const std::string& outputDir)
{
    const std::string dbFileName = DB_NAME_MSPROF_DB + "_" + Analysis::Utils::GetFormatLocalTime() + ".db";
    return Utils::File::PathJoin({outputDir, dbFileName});
}
// Set of processor names returned by DBAssembler::GetProcessList().
const std::set<std::string> DB_DATA_PROCESS_LIST{
PROCESSOR_NAME_API, PROCESSOR_NAME_COMMUNICATION, PROCESSOR_NAME_COMPUTE_TASK_INFO,
PROCESSOR_NAME_KFC_TASK, PROCESSOR_NAME_KFC_COMM, PROCESSOR_NAME_DEVICE_TX,
PROCESSOR_NAME_MSTX, PROCESSOR_NAME_STEP_TRACE, PROCESSOR_NAME_TASK,
PROCESSOR_NAME_ACC_PMU, PROCESSOR_NAME_AICORE_FREQ, PROCESSOR_NAME_LOW_POWER,
PROCESSOR_NAME_DDR, PROCESSOR_NAME_HBM, PROCESSOR_NAME_HCCS,
PROCESSOR_NAME_NETDEV_STATS, PROCESSOR_NAME_CPU_USAGE, PROCESSOR_NAME_MEM_USAGE,
PROCESSOR_NAME_DISK_USAGE, PROCESSOR_NAME_NETWORK_USAGE, PROCESSOR_NAME_OSRT_API,
PROCESSOR_NAME_LLC, PROCESSOR_NAME_NPU_MEM, PROCESSOR_NAME_PCIE,
PROCESSOR_NAME_DPU, PROCESSOR_NAME_SIO, PROCESSOR_NAME_UB,
PROCESSOR_NAME_SOC, PROCESSOR_NAME_NIC, PROCESSOR_NAME_ROCE,
PROCESSOR_NAME_QOS, PROCESSOR_NAME_CCU_MISSION, PROCESSOR_MC2_COMM_INFO,
PROCESSOR_NAME_MEMCPY_INFO, PROCESSOR_NAME_NPU_OP_MEM, PROCESSOR_NAME_NPU_MODULE_MEM,
PROCESSOR_NAME_UNIFIED_PMU,
};
}
// Creates the assembler for one profiling directory: stores the paths, allocates the MsprofDB
// database description and prepares a DB runner for a new timestamped db file under outputPath.
DBAssembler::DBAssembler(const std::string& profPath, const std::string& outputPath)
    : profPath_(profPath), outputPath_(outputPath)
{
    MAKE_SHARED0_NO_OPERATION(msprofDB_.database, MsprofDB);
    auto msprofDBPath = GetDBPath(outputPath);
    // A constructor cannot return the failure, so at least make it visible in the log
    // instead of dropping the result silently.
    if (!msprofDB_.ConstructDBRunner(msprofDBPath))
    {
        ERROR("Construct for msprof db runner failed, path: %.", msprofDBPath);
    }
}
// Exports all data in dataInventory to the msprof db.
// If a completed db already exists in the output path, nothing is exported and true is returned.
// Otherwise every saver in DATA_SAVER runs as a thread-pool task, and the string-id table is saved
// after all of them have finished.
// Returns false if any saver, or the string-id save, failed.
bool DBAssembler::Run(DataInventory& dataInventory)
{
    INFO("Start exporting db!");
    PRINT_INFO("Start exporting the db!");
    if (CheckMsprofDb(outputPath_))
    {
        PRINT_INFO("Find completed msprof db. End exporting db output_file.");
        return true;
    }
    std::atomic<bool> retFlag(true);
    const uint16_t processorsLimit = 10;
    Analysis::Utils::ThreadPool pool(processorsLimit);
    pool.Start();
    for (const auto& saveFunc : DATA_SAVER)
    {
        pool.AddTask(
            [saveFunc, &retFlag, &dataInventory, this]()
            {
                INFO("Begin to save % data.", saveFunc.first);
                if (!saveFunc.second(dataInventory, msprofDB_, profPath_))
                {
                    ERROR("Save % data failed.", saveFunc.first);
                    // Only ever store false. `retFlag = flag && retFlag` is a separate load and
                    // store, so a concurrent failure could be overwritten with true.
                    retFlag.store(false);
                }
            });
    }
    pool.WaitAllTasks();
    pool.Stop();
    // Saved after the pool has finished, once every saver has had the chance to register its strings.
    if (!SaveStringIdsData(dataInventory, msprofDB_, profPath_))
    {
        retFlag.store(false);
    }
    PRINT_INFO("End exporting db output_file. The file is stored in the PROF file.");
    return retFlag.load();
}
// Returns the set of processor names listed in DB_DATA_PROCESS_LIST.
const std::set<std::string>& DBAssembler::GetProcessList()
{
    const std::set<std::string>& processList = DB_DATA_PROCESS_LIST;
    return processList;
}
}
}