* -------------------------------------------------------------------------
* This file is part of the MindStudio project.
* Copyright (c) 2025 Huawei Technologies Co.,Ltd.
*
* MindStudio is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
#include <algorithm>
#include "pch.h"
#include "PythonUtil.h"
#include "CommunicationMatrixRapidHandler.h"
#include "CommunicationRapidSaxHandler.h"
#include "ConstantDefs.h"
#include "DataBaseManager.h"
#include "ParserStatusManager.h"
#include "DbClusterDataBase.h"
#include "TraceTime.h"
#include "CollectionUtil.h"
#include "CommunicationGroupParser.h"
#include "MetaDataCacheManager.h"
#include "ClusterFileParser.h"
namespace Dic {
namespace Module {
namespace Timeline {
using namespace Dic::Server;
using namespace rapidjson;
using namespace Dic::Module::FullDb;
/**
 * Parses the first communication JSON file of the given list.
 *
 * Only filePathList[0] is used. It is passed through FileUtil::PathPreprocess
 * and then handed to SaxParseJsonFile with handler type 0.
 *
 * @param filePathList candidate communication files; an empty list is logged as an error.
 */
void ClusterFileParser::ParseCommunication(const std::vector<std::string> &filePathList) {
    if (!filePathList.empty()) {
        const std::string normalizedPath = FileUtil::PathPreprocess(filePathList[0].c_str());
        Server::ServerLog::Info("ParseCommunication: " + normalizedPath);
        // Handler type 0 selects the communication SAX handler.
        SaxParseJsonFile(normalizedPath, 0);
        return;
    }
    ServerLog::Error("Communication file list is empty.");
}
/**
 * Parses the first communication matrix JSON file of the given list.
 *
 * Only filePathList[0] is used. It is passed through FileUtil::PathPreprocess
 * and then handed to SaxParseJsonFile with handler type 1.
 *
 * @param filePathList candidate matrix files; an empty list is logged as an error.
 */
void ClusterFileParser::ParseCommunicationMatrix(const std::vector<std::string> &filePathList) {
    if (!filePathList.empty()) {
        const std::string normalizedPath = FileUtil::PathPreprocess(filePathList[0].c_str());
        Server::ServerLog::Info("ParseCommunicationMatrix: " + normalizedPath);
        // Handler type 1 selects the communication matrix handler.
        SaxParseJsonFile(normalizedPath, 1);
        return;
    }
    ServerLog::Error("Communication matrix file list is empty.");
}
void ClusterFileParser::SaxParseJsonFile(const std::string &filePath, int saxHandlerType) {
auto start = std::chrono::high_resolution_clock::now();
auto checkFilePath = FileUtil::CheckPathSecurity(filePath, CHECK_FILE_WRITE);
if (!checkFilePath) {
return;
}
std::shared_ptr<TextClusterDatabase> textDb = std::dynamic_pointer_cast<TextClusterDatabase>(database);
if (textDb == nullptr) {
ServerLog::Error("Fail to parse json file, file type:", saxHandlerType);
return;
}
FILE *fp = fopen(filePath.c_str(), "rb");
if (fp == nullptr) {
return;
}
constexpr size_t bufferLen = 65536;
std::string readBuffer;
readBuffer.reserve(bufferLen);
rapidjson::FileReadStream is(fp, readBuffer.data(), sizeof(bufferLen));
rapidjson::Reader reader;
if (saxHandlerType == 0) {
CommunicationRapidSaxHandler rapidSaxHandler(textDb, uniqueKey);
reader.Parse<kParseNumbersAsStringsFlag>(is, rapidSaxHandler);
} else {
CommunicationMatrixRapidHandler matrixRapidHandler(textDb, uniqueKey);
reader.Parse(is, matrixRapidHandler);
}
auto end = std::chrono::high_resolution_clock::now();
ServerLog::Info("End sax_parse_json_file data into db, file:", filePath, "cost time:", (end - start).count());
fclose(fp);
}
void ClusterFileParser::ParseStepStatisticsFile(const std::vector<std::string> &filePathList) {
const std::string &filePath = FileUtil::PathPreprocess(filePathList[0].c_str());
auto start = std::chrono::high_resolution_clock::now();
if (!ValidateUtil::CheckCsvFile(filePath)) {
return;
}
std::ifstream stepTraceFileCsv = OpenReadFileSafely(filePath);
std::string line;
std::map<std::string, size_t> indexMap;
std::shared_ptr<TextClusterDatabase> textDb = std::dynamic_pointer_cast<TextClusterDatabase>(database);
if (textDb == nullptr) {
ServerLog::Error("Can't get cluster database when parse step statistics file.");
stepTraceFileCsv.close();
return;
}
bool isHeader = true;
std::map<std::string, size_t> dataMap;
while (std::getline(stepTraceFileCsv, line)) {
std::vector<std::string> tokens = StringUtil::StringSplit(line);
if (!tokens.empty() and isHeader) {
std::vector<std::string> difference =
CollectionUtil::CalDifferenceVector(VALID_STEP_STATISTICS_HEADERS, tokens);
if (difference.size() != 0) {
ServerLog::Error("The header of step statistics file is invalid, "
"missing header data as follows: %, filePath: %",
StringUtil::join(difference, ","), filePath);
stepTraceFileCsv.close();
return;
}
for (size_t i = 0; i < tokens.size(); ++i) {
dataMap[tokens[i]] = i;
}
isHeader = false;
continue;
}
if (tokens.size() != dataMap.size()) {
ServerLog::Warn("Row size is not equal to header number.");
continue;
}
StepStatistic statistic = MapToStepStatistic(dataMap, tokens);
textDb->InsertStepStatisticsInfo(statistic);
}
auto end = std::chrono::high_resolution_clock::now();
ServerLog::Info("End parse step statistics file data into db, cost time:", (end - start).count());
stepTraceFileCsv.close();
}
void ClusterFileParser::SaveClusterBaseInfo(const std::string &selectedPath) {
ClusterBaseInfo baseInfo;
baseInfo.filePath = selectedPath;
std::shared_ptr<TextClusterDatabase> textDb = std::dynamic_pointer_cast<TextClusterDatabase>(database);
if (textDb == nullptr) {
ServerLog::Error("Can't get cluster database when sava cluster base info.");
return;
}
std::optional<DistributedArgs> args = MetaDataCacheManager::Instance().GetDistributedArgsInfo();
if (args.has_value()) {
baseInfo.config = args.value().config;
baseInfo.level = PARALLEL_CONFIG_LEVEL_COLLECTED;
} else {
bool result = textDb->GetParallelConfigFromStepTrace(baseInfo.config, baseInfo.level);
if (!result) {
ServerLog::Error("Failed to get initial parallel config from profiler_metadata.json or step trace.");
}
}
textDb->InsertClusterBaseInfo(baseInfo);
ServerLog::Info("End save cluster base info data into db, path: ", selectedPath);
}
/**
 * Locates a communication group file under selectedFilePath and parses it into groupInfos.
 *
 * @param groupInfos output; overwritten with the parse result of the first file found.
 * @return false when no file is found or the parsed result is empty, true otherwise.
 */
bool ClusterFileParser::InitCommunicationGroupInfo(std::vector<CommGroupParallelInfo> &groupInfos) {
    const std::vector<std::string> groupFiles =
        FileUtil::FindFirstFileByRegex(selectedFilePath, std::regex(R"(communication_group.json)"));
    if (groupFiles.empty()) {
        ServerLog::Error("Failed to get communicationGroup files");
        return false;
    }
    groupInfos = Communication::CommunicationGroupParser::ParseCommunicationGroup(groupFiles[0]);
    return !groupInfos.empty();
}
bool ClusterFileParser::ParseClusterFiles() {
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::INIT);
if (!InitClusterDatabase()) {
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
ServerLog::Error("Init cluster database occur an err");
return false;
}
if (!needClearDb) {
bool skipStatus = SkipClusterParse();
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
return skipStatus;
}
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::RUNNING);
bool parseRes = InitBaseInfoAndMatrixData();
if (!parseRes) {
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
}
return parseRes;
}
/**
 * Handles the case where the existing cluster database is reused instead of re-parsed.
 *
 * Queries the timestamp extremes from the database and, when both sentinels
 * (UINT64_MAX for the minimum, 0 for the maximum) were replaced, forwards them to TraceTime.
 *
 * @return false when the held database is not a TextClusterDatabase, true otherwise.
 */
bool ClusterFileParser::SkipClusterParse() {
    std::shared_ptr<TextClusterDatabase> textDb = std::dynamic_pointer_cast<TextClusterDatabase>(database);
    if (textDb == nullptr) {
        ServerLog::Error("Fail to skip cluster parse, can't get cluster database when parse cluster files.");
        return false;
    }
    ServerLog::Info("cluster db file is already exist, skip parse ");
    uint64_t earliest = UINT64_MAX;
    uint64_t latest = 0;
    textDb->QueryExtremumTimestamp(earliest, latest);
    const bool hasValidRange = (earliest != UINT64_MAX) && (latest != 0);
    if (hasValidRange) {
        Timeline::TraceTime::Instance().UpdateTime(earliest, latest);
    }
    return true;
}
bool ClusterFileParser::InitBaseInfoAndMatrixData() {
std::shared_ptr<TextClusterDatabase> textDb = std::dynamic_pointer_cast<TextClusterDatabase>(database);
if (textDb == nullptr) {
ServerLog::Error("Fail to init base info and matrixData, can't get cluster database when parse cluster files.");
return false;
}
std::regex patternCommunicationMatrix(R"(cluster_communication_matrix.json)");
std::vector<std::string> communicationMatrixFileList =
FileUtil::FindFirstFileByRegex(selectedFilePath, patternCommunicationMatrix);
std::regex patternCommunicationTime(R"(cluster_communication.json)");
std::vector<std::string> communicationTimeFileList =
FileUtil::FindFirstFileByRegex(selectedFilePath, patternCommunicationTime);
if ((communicationMatrixFileList.empty() && communicationTimeFileList.empty()) &&
!AttAnalyze(selectedFilePath, ATT_MODEL_MATRIX, AttDataType::TEXT)) {
return false;
}
std::vector<CommGroupParallelInfo> groupInfos;
if (!InitCommunicationGroupInfo(groupInfos) || !textDb->InsertGroupInfos(groupInfos)) {
ServerLog::Warn("Fail to parse communication group file.");
}
std::vector<std::string> communicationMatrixList =
FileUtil::FindFirstFileByRegex(selectedFilePath, patternCommunicationMatrix);
if (!communicationMatrixList.empty()) {
ParseCommunicationMatrix(communicationMatrixList);
}
textDb->SaveLastData();
std::regex patternStepTrace(R"(cluster_step_trace_time.csv)");
std::vector<std::string> stepTraceFileList = FileUtil::FindFirstFileByRegex(selectedFilePath, patternStepTrace);
if (!stepTraceFileList.empty()) {
ParseStepStatisticsFile(stepTraceFileList);
SaveClusterBaseInfo(selectedFilePath);
}
if (!textDb->CreateIndex()) {
ServerLog::Error("Failed to create index on cluster database. path:", selectedFilePath);
return false;
}
return true;
}
/**
 * Copies the first matrix, step and group file to tmp_* files under selectedFilePath.
 *
 * The copies run in the order matrix, step, group and stop at the first failure.
 *
 * @return false when any list is empty or any copy fails, true when all three copies succeed.
 */
bool ClusterFileParser::BackupExistedClusterFiles(const std::vector<std::string> &backUpMatrixList,
    const std::vector<std::string> &backUpGroupList, const std::vector<std::string> &backUpStepList) {
    const bool anyListEmpty = backUpMatrixList.empty() || backUpGroupList.empty() || backUpStepList.empty();
    if (anyListEmpty) {
        return false;
    }
    if (!FileUtil::CopyFileByPath(FileUtil::PathPreprocess(backUpMatrixList[0]),
            FileUtil::SplicePath(selectedFilePath, "tmp_matrix.json"))) {
        return false;
    }
    if (!FileUtil::CopyFileByPath(FileUtil::PathPreprocess(backUpStepList[0]),
            FileUtil::SplicePath(selectedFilePath, "tmp_step.csv"))) {
        return false;
    }
    return FileUtil::CopyFileByPath(
        FileUtil::PathPreprocess(backUpGroupList[0]), FileUtil::SplicePath(selectedFilePath, "tmp_group.json"));
}
/**
 * Copies the tmp_* backup files back over the original matrix, step and group files and
 * removes each temporary file after its copy succeeds.
 *
 * The files are handled in the order matrix, step, group; processing stops at the first
 * failing copy or removal. Each list is indexed at [0] without a check, so callers must
 * pass non-empty lists.
 *
 * @return true only when all three copy-and-remove pairs succeed.
 */
bool ClusterFileParser::RestoreClusterFiles(const std::vector<std::string> &backUpMatrixList,
    const std::vector<std::string> &backUpGroupList, const std::vector<std::string> &backUpStepList) {
    const std::string matrixTmp = FileUtil::SplicePath(selectedFilePath, "tmp_matrix.json");
    const std::string stepTmp = FileUtil::SplicePath(selectedFilePath, "tmp_step.csv");
    const std::string groupTmp = FileUtil::SplicePath(selectedFilePath, "tmp_group.json");

    if (!FileUtil::CopyFileByPath(matrixTmp, FileUtil::PathPreprocess(backUpMatrixList[0].c_str()))) {
        return false;
    }
    if (!FileUtil::RemoveFile(matrixTmp)) {
        return false;
    }
    if (!FileUtil::CopyFileByPath(stepTmp, FileUtil::PathPreprocess(backUpStepList[0].c_str()))) {
        return false;
    }
    if (!FileUtil::RemoveFile(stepTmp)) {
        return false;
    }
    if (!FileUtil::CopyFileByPath(groupTmp, FileUtil::PathPreprocess(backUpGroupList[0].c_str()))) {
        return false;
    }
    return FileUtil::RemoveFile(groupTmp);
}
/**
 * Second parse stage: loads communication data into the database.
 *
 * Returns true immediately when the parser for uniqueKey is already in a final state.
 * When no communication file exists yet, the existing matrix, group and step files are
 * backed up to temporary copies, AttAnalyze is run in ATT_MODEL_TIME mode, and the backups
 * are restored afterwards (only if the backup itself succeeded). The status is set to
 * FINISH before returning from either the analysis-failure path or the normal path.
 *
 * @return false when the analysis fails, otherwise the result of TransCommunicationToDb.
 */
bool ClusterFileParser::ParseClusterStep2Files() {
    if (ParserStatusManager::Instance().IsClusterParserFinalState(uniqueKey)) {
        return true;
    }
    std::vector<std::string> timeFiles =
        FileUtil::FindFirstFileByRegex(selectedFilePath, std::regex(R"(cluster_communication.json)"));
    std::vector<std::string> backUpMatrixList =
        FileUtil::FindFirstFileByRegex(selectedFilePath, std::regex(R"(cluster_communication_matrix.json)"));
    std::vector<std::string> backUpGroupList =
        FileUtil::FindFirstFileByRegex(selectedFilePath, std::regex(R"(communication_group.json)"));
    std::vector<std::string> backUpStepList =
        FileUtil::FindFirstFileByRegex(selectedFilePath, std::regex(R"(cluster_step_trace_time.csv)"));

    bool backedUp = false;
    if (timeFiles.empty()) {
        backedUp = BackupExistedClusterFiles(backUpMatrixList, backUpGroupList, backUpStepList);
        if (!backedUp) {
            ServerLog::Warn("Copy matrix and other files failed.");
        }
        // The communication file has to be generated first; without it there is nothing to load.
        if (!AttAnalyze(selectedFilePath, ATT_MODEL_TIME, AttDataType::TEXT)) {
            ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
            return false;
        }
    }
    if (backedUp && !RestoreClusterFiles(backUpMatrixList, backUpGroupList, backUpStepList)) {
        ServerLog::Warn("Copy and clear matrix temp file failed.");
    }
    const bool loaded = TransCommunicationToDb(selectedFilePath, std::regex(R"(cluster_communication.json)"));
    ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
    return loaded;
}
/**
 * Parses the communication file found under selectedPath into the text cluster database.
 *
 * After parsing (if a file matched), SaveLastData is called and the time index is created.
 * The parse status is written as FINISH_STATUS and the trace time range is refreshed only
 * when the parser for uniqueKey has not reached a final state in the meantime.
 *
 * @param selectedPath         directory searched for the communication file.
 * @param patternCommunication regex used to locate the communication file.
 * @return false when the database is unavailable, the time index cannot be created,
 *         or the parser is already in a final state; true otherwise.
 */
bool ClusterFileParser::TransCommunicationToDb(
    const std::string &selectedPath, const std::regex &patternCommunication) {
    std::shared_ptr<TextClusterDatabase> textDb = std::dynamic_pointer_cast<TextClusterDatabase>(database);
    if (textDb == nullptr) {
        ServerLog::Error("Failed to connect to cluster database.", selectedPath);
        return false;
    }

    std::vector<std::string> matchedFiles = FileUtil::FindFirstFileByRegex(selectedPath, patternCommunication);
    if (!matchedFiles.empty()) {
        ParseCommunication(matchedFiles);
    }
    textDb->SaveLastData();

    const bool timeIndexCreated = textDb->CreateTimeIndex();
    if (!timeIndexCreated) {
        ServerLog::Error("Failed to CreateTimeIndex on cluster database. path:", selectedPath);
        return false;
    }
    if (ParserStatusManager::Instance().IsClusterParserFinalState(uniqueKey)) {
        ServerLog::Warn("Parser Cluster Status Is Terminal");
        return false;
    }

    textDb->UpdateClusterParseStatus(FINISH_STATUS);
    uint64_t earliest = UINT64_MAX;
    uint64_t latest = 0;
    textDb->QueryExtremumTimestamp(earliest, latest);
    // Only publish the range when both sentinels were overwritten by the query.
    const bool hasValidRange = (earliest != UINT64_MAX) && (latest != 0);
    if (hasValidRange) {
        Timeline::TraceTime::Instance().UpdateTime(earliest, latest);
    }
    return true;
}
bool ClusterFileParser::InitClusterDatabase() {
clusterDbPath = selectedFilePath + FILE_SEPARATOR + "cluster.db";
DataBaseManager::Instance().CreateClusterConnectionPool(selectedFilePath, clusterDbPath, DataType::TEXT);
database = DataBaseManager::Instance().GetClusterDatabase(selectedFilePath);
std::shared_ptr<TextClusterDatabase> databaseWrite = std::dynamic_pointer_cast<TextClusterDatabase>(database);
if (databaseWrite == nullptr) {
ServerLog::Error("Can't get text cluster database.");
return false;
}
std::ifstream file = OpenReadFileSafely(clusterDbPath, std::ios::in);
if (!file.good()) {
if (!(databaseWrite->DropAllTable() && databaseWrite->CreateTable() && databaseWrite->SetConfig() &&
databaseWrite->SetDbVersion() && databaseWrite->InitStmt())) {
ServerLog::Error("Failed to update databaseWrite. path:", selectedFilePath);
return false;
}
} else {
auto isChange = databaseWrite->IsDatabaseVersionChange();
std::string status = databaseWrite->QueryParseClusterStatus();
needClearDb = isChange || status.empty() || strcmp(status.c_str(), FINISH_STATUS.c_str()) != 0;
if (needClearDb && !(databaseWrite->DropAllTable() && databaseWrite->CreateTable())) {
ServerLog::Error("Failed to dropAllTable. path:", selectedFilePath, "isChange:", isChange);
return false;
}
if (!(databaseWrite->SetConfig() && databaseWrite->SetDbVersion() && databaseWrite->InitStmt())) {
ServerLog::Error("Failed to init databaseWrite. path:", selectedFilePath);
return false;
}
}
return true;
}
/**
 * Validates a parsed JSON document.
 *
 * @param doc document to check.
 * @return true when the document has no parse error, is an object, and has both a "p2p"
 *         and a "collective" member that are arrays; false (with an error log) otherwise.
 */
bool ClusterFileParser::CheckDocumentValid(const Document &doc) {
    if (doc.HasParseError()) {
        ServerLog::Error("JSON file is invalid.");
        return false;
    }
    const bool hasP2pArray = doc.IsObject() && doc.HasMember("p2p") && doc.FindMember("p2p")->value.IsArray();
    if (hasP2pArray && doc.HasMember("collective") && doc.FindMember("collective")->value.IsArray()) {
        return true;
    }
    ServerLog::Error("JSON file is illegal.");
    return false;
}
/**
 * Runs the cluster analysis Python script on the selected path.
 *
 * The script is invoked through PythonUtil::ExecuteScript with "-d <selectedPath>" and,
 * when mode is not empty, "-m <mode>".
 *
 * @param selectedPath directory to analyse; must pass StringUtil::ValidateCommandFilePathParam.
 * @param mode         analysis mode; omitted from the arguments when empty.
 * @param dataType     not used by this function.
 * @return true when the path is valid and the script returns 0, false otherwise.
 */
bool ClusterFileParser::AttAnalyze(const std::string &selectedPath, const std::string &mode, AttDataType dataType) {
    ServerLog::Info("Start execute cluster analysis");
    const bool pathIsSafe = StringUtil::ValidateCommandFilePathParam(selectedPath);
    if (!pathIsSafe) {
        ServerLog::Warn("validate string select path failed! select path", selectedPath);
        return false;
    }

    const std::string scriptPath =
        std::string("msprof_analyze") + FILE_SEPARATOR + "cluster_analyse" + FILE_SEPARATOR + "cluster_analysis.py";
    std::vector<std::string> arguments{"-d", selectedPath};
    if (!mode.empty()) {
        arguments.emplace_back("-m");
        arguments.emplace_back(mode);
    }

    ServerLog::Info("Start execute command, selected path:", selectedPath, " ,mode: ", mode);
    const bool succeeded = PythonUtil::ExecuteScript(scriptPath, arguments) == 0;
    if (succeeded) {
        ServerLog::Info("Execute cluster analysis succeeded, selected path: ", selectedPath, ",mode: ", mode);
        return true;
    }
    ServerLog::Warn(
        "Execute cluster analysis failed, skip parse cluster file, selected path:", selectedPath, " ,mode: ", mode);
    return false;
}
/**
 * Converts one tokenized CSV row into a StepStatistic.
 *
 * Every field is fetched with GetStrValue, so a column missing from dataMap yields "".
 *  - stepId defaults to "0" when the step column is empty.
 *  - rankId is the index column when the type column is "rank", otherwise "".
 *  - stageId is set only for type "stage" when the index contains a double quote and is
 *    longer than subStrlen; the value is order.substr(1, length - subStrlen).
 *  - prepareTime is -1 when its column is empty.
 *  - dp/pp/tp indexes are filled only when every PARALLEL_STRATEGY_HEADERS entry is in dataMap.
 *
 * @param dataMap header name -> column index.
 * @param tokens  values of the current row.
 * @return the populated statistic.
 */
StepStatistic ClusterFileParser::MapToStepStatistic(
    std::map<std::string, size_t> &dataMap, const std::vector<std::string> &tokens) {
    const auto field = [&](const std::string &key) { return GetStrValue(dataMap, tokens, key); };
    const auto fieldAsDouble = [&](const std::string &key) { return NumberUtil::StringToDouble(field(key)); };
    const auto fieldAsLong = [&](const std::string &key) { return NumberUtil::StringToLong(field(key)); };

    StepStatistic statistic;
    std::string stepId = field(FIELD_STEP);
    std::string flag = field(FIELD_TYPE);
    std::string order = field(FIELD_INDEX);

    statistic.stepId = stepId.empty() ? "0" : stepId;
    const bool isRank = std::strcmp(flag.c_str(), "rank") == 0;
    statistic.rankId = isRank ? order : "";

    const bool isStage = std::strcmp(flag.c_str(), "stage") == 0;
    if (isStage && order.find('\"') != std::string::npos && order.length() > subStrlen) {
        order = order.substr(1, order.length() - subStrlen);
        statistic.stageId = order;
    }

    statistic.computingTime = fieldAsDouble(FIELD_COMPUTING);
    statistic.pureCommunicationTime = fieldAsDouble(FIELD_COMMUNICATION_NOT_OVERLAPPED);
    statistic.overlapCommunicationTime = fieldAsDouble(FIELD_OVERLAPPED);
    statistic.communicationTime = fieldAsDouble(FIELD_COMMUNICATION);
    statistic.freeTime = fieldAsDouble(FIELD_FREE);
    statistic.stageTime = fieldAsDouble(FIELD_STAGE);
    statistic.bubbleTime = fieldAsDouble(FIELD_BUBBLE);
    statistic.pureCommunicationExcludeReceiveTime = fieldAsDouble(FIELD_COMMUNICATION_NOT_OVERLAPPED_AND_RECEIVE);

    std::string prepareTime = field(FIELD_PREPARE_TIME);
    statistic.prepareTime = prepareTime.empty() ? -1 : NumberUtil::StringToDouble(prepareTime);

    // The parallel indexes are only meaningful when the file carries all strategy columns.
    bool hasAllParallelColumns = true;
    for (const auto &header : PARALLEL_STRATEGY_HEADERS) {
        if (dataMap.find(header) == dataMap.end()) {
            hasAllParallelColumns = false;
            break;
        }
    }
    if (hasAllParallelColumns) {
        statistic.dpIndex = fieldAsLong(FIELD_DP_INDEX);
        statistic.ppIndex = fieldAsLong(FIELD_PP_INDEX);
        statistic.tpIndex = fieldAsLong(FIELD_TP_INDEX);
    }
    return statistic;
}
/**
 * Returns the token stored in the column registered for key.
 *
 * @param dataMap header name -> column index.
 * @param tokens  values of the current row.
 * @param key     header name to look up.
 * @return the matching token, or "" when key is not in dataMap or its column index lies
 *         outside tokens.
 */
std::string ClusterFileParser::GetStrValue(
    std::map<std::string, size_t> &dataMap, const std::vector<std::string> &tokens, const std::string &key) {
    const auto it = dataMap.find(key);
    // Duplicate header names can leave dataMap with fewer entries than its largest index,
    // so a row of dataMap.size() tokens may still be too short for this column.
    if (it == dataMap.end() || it->second >= tokens.size()) {
        return "";
    }
    return tokens[it->second];
}
/**
 * Queries the timestamp extremes from the DB cluster database and forwards them to TraceTime
 * when both sentinels (UINT64_MAX for the minimum, 0 for the maximum) were replaced.
 *
 * @param clusterDatabase database to query; dereferenced without a null check.
 */
void UpdateTraceTimeOfDb(std::shared_ptr<FullDb::DbClusterDataBase> clusterDatabase) {
    uint64_t earliest = UINT64_MAX;
    uint64_t latest = 0;
    clusterDatabase->QueryExtremumTimestamp(earliest, latest);
    const bool hasValidRange = (earliest != UINT64_MAX) && (latest != 0);
    if (!hasValidRange) {
        return;
    }
    Timeline::TraceTime::Instance().UpdateTime(earliest, latest);
}
bool ClusterFileParser::ParserClusterOfDb() {
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::INIT);
std::string tempPath(selectedFilePath);
if (!FileUtil::IsFolder(selectedFilePath)) {
tempPath = FileUtil::GetParentPath(selectedFilePath);
}
std::vector<std::string> clusterPath = FileUtil::FindFilesWithFilter(tempPath, std::regex(clusterDBReg));
if (clusterPath.empty()) {
if (!AttAnalyze(tempPath, ATT_MODEL_DEFAULT, AttDataType::DB)) {
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
return false;
}
clusterPath = FileUtil::FindFilesWithFilter(tempPath, std::regex(clusterDBReg));
}
if (clusterPath.empty()) {
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
return false;
}
ServerLog::Info("Cluster Db Path: " + clusterPath[0]);
clusterDbPath = clusterPath[0];
Dic::Module::FullDb::DataBaseManager::Instance().CreateClusterConnectionPool(
selectedFilePath, clusterDbPath, Dic::Module::Timeline::DataType::DB);
database = Dic::Module::FullDb::DataBaseManager::Instance().GetClusterDatabase(selectedFilePath);
std::shared_ptr<FullDb::DbClusterDataBase> clusterDatabase = std::dynamic_pointer_cast<DbClusterDataBase>(database);
if (clusterDatabase == nullptr) {
ServerLog::Error("Failed to get db cluster connection.");
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
return false;
}
if (!clusterDatabase->IsDatabaseVersionChange() && clusterDatabase->HasFinishedParseLastTime()) {
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
UpdateTraceTimeOfDb(clusterDatabase);
return true;
}
ClusterBaseInfo baseInfo;
InitFullDbClusterBaseInfo(clusterDatabase, baseInfo);
if (!clusterDatabase->DropTable() or !clusterDatabase->CreateTable() or !clusterDatabase->SetDataBaseVersion() or
!clusterDatabase->UpdatesClusterParseStatus(NOT_FINISH_STATUS)) {
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
return false;
}
clusterDatabase->InsertClusterBaseInfo(baseInfo);
clusterDatabase->UpdatesClusterParseStatus(FINISH_STATUS);
ServerLog::Info("ParseClusterFiles is success");
ParserStatusManager::Instance().SetClusterParseStatus(uniqueKey, ParserStatus::FINISH);
UpdateTraceTimeOfDb(clusterDatabase);
return true;
}
/**
 * Fills baseInfo.config and baseInfo.level from the DB cluster database.
 *
 * After SetHasClusterBaseInfoTable is called, the values are read with QueryDistributedArgs
 * when HasClusterBaseInfoTable reports true, and with GetParallelConfigFromStepTrace otherwise.
 *
 * @param clusterDatabase database to read from; nothing happens when it is null.
 * @param baseInfo        receives the config and level.
 */
void ClusterFileParser::InitFullDbClusterBaseInfo(
    std::shared_ptr<FullDb::DbClusterDataBase> &clusterDatabase, ClusterBaseInfo &baseInfo) {
    if (clusterDatabase == nullptr) {
        return;
    }
    clusterDatabase->SetHasClusterBaseInfoTable();
    if (!clusterDatabase->HasClusterBaseInfoTable()) {
        clusterDatabase->GetParallelConfigFromStepTrace(baseInfo.config, baseInfo.level);
        return;
    }
    clusterDatabase->QueryDistributedArgs(baseInfo.config, baseInfo.level);
}
/** Returns a copy of the cluster database path currently held by this parser. */
std::string ClusterFileParser::GetClusterDbPath() {
    return clusterDbPath;
}
/**
 * Creates a parser for the given path.
 *
 * @param filePath  stored as selectedFilePath.
 * @param database  cluster database handle stored by the parser.
 * @param uniqueKey key used when reporting status to ParserStatusManager.
 */
ClusterFileParser::ClusterFileParser(
    const std::string &filePath, std::shared_ptr<VirtualClusterDatabase> database, const std::string &uniqueKey)
    : selectedFilePath(filePath),
      uniqueKey(uniqueKey),
      database(database) {
}
/**
 * Decides whether a path belongs to cluster data.
 *
 * @param filePath path to inspect.
 * @return true when filePath contains CLUSTER_ANALYSIS_OUTPUT, or when one of the folders
 *         reported by FileUtil::FindFolders equals CLUSTER_ANALYSIS_OUTPUT; false when
 *         FindFolders fails or no folder matches.
 */
bool ClusterFileParser::CheckIsCluster(const std::string &filePath) {
    if (filePath.find(CLUSTER_ANALYSIS_OUTPUT) != std::string::npos) {
        ServerLog::Info("this folder is cluster_analysis_output, Check Cluster is true");
        return true;
    }
    std::vector<std::string> folders;
    std::vector<std::string> files;
    if (!FileUtil::FindFolders(filePath, folders, files)) {
        ServerLog::Info("FindFolders is empty, Check Cluster is false");
        return false;
    }
    for (const std::string &folder : folders) {
        if (folder == CLUSTER_ANALYSIS_OUTPUT) {
            return true;
        }
    }
    return false;
}
}
}
}