* -------------------------------------------------------------------------
* This file is part of the MindStudio project.
* Copyright (c) 2025 Huawei Technologies Co.,Ltd.
*
* MindStudio is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
#include "pch.h"
#include "DataBaseManager.h"
#include "EventParser.h"
#include "ParserStatusManager.h"
#include "TraceTime.h"
#include "CacheManager.h"
#include "ProjectParserFactory.h"
#include "TraceFileSimulationParser.h"
namespace Dic {
namespace Module {
namespace Timeline {
using namespace Dic::Server;
TraceFileSimulationParser &TraceFileSimulationParser::Instance() {
static TraceFileSimulationParser instance;
return instance;
}
TraceFileSimulationParser::TraceFileSimulationParser() {
threadPool = std::make_unique<ThreadPool>(TraceFileSimulationParser::maxThreadNum);
}
TraceFileSimulationParser::~TraceFileSimulationParser() { threadPool->ShutDown(); }
bool TraceFileSimulationParser::Parse(const std::vector<std::string> &filePathArr, const std::string &rankId,
const std::string &selectedFolder, const std::string &fileId) {
ServerLog::Info("start parse. file id:", rankId);
ParserStatusManager::Instance().SetParserStatus(rankId, ParserStatus::INIT);
threadPool->AddTask(PreParseTask, TraceIdManager::GetTraceId(), filePathArr, rankId, fileId);
return true;
}
void TraceFileSimulationParser::PreParseTask(
const std::vector<std::string> &filePathArr, const std::string &rankId, const std::string &fileId) {
ParserStatusManager::Instance().WaitStartParse();
if (!InitParser(filePathArr, rankId, fileId)) {
auto msg = "Failed to open db. Please delete dbFile and try again or see logs.";
ParseEndCallBack(rankId, fileId, false, msg);
}
}
bool TraceFileSimulationParser::InitParser(
const std::vector<std::string> &filePathArr, const std::string &rankId, const std::string &fileId) {
if (!ParserStatusManager::Instance().SetRunningStatus(rankId)) {
ServerLog::Info("Pre task skip this file.");
return false;
}
auto db = DataBaseManager::Instance().GetTraceDatabaseByFileId(fileId);
if (db == nullptr) {
ServerLog::Error("Failed to get connection. fileId: ", fileId);
return false;
}
std::shared_ptr<TextTraceDatabase> database =
std::dynamic_pointer_cast<TextTraceDatabase, VirtualTraceDatabase>(db);
if (database == nullptr) {
ServerLog::Error("Failed to convert virtual trace database to json trace database in event parser.");
return false;
}
if (!(database->DropTable() && database->CreateTable())) {
ServerLog::Error("Failed to open trace database. rankId:", rankId);
return false;
}
auto &instance = TraceFileSimulationParser::Instance();
auto start = std::chrono::high_resolution_clock::now();
std::shared_ptr<std::vector<std::future<void>>> futures = std::make_shared<std::vector<std::future<void>>>();
for (const auto &filePath : filePathArr) {
ServerLog::Info("Start parse. file id:", rankId, ". path:", filePath);
auto splitFile = TraceFileSimulationParser::SplitFile(filePath);
instance.fileProgressMap[rankId] = std::make_unique<FileProgress>(0, FileUtil::GetFileSize(filePath.c_str()));
if (splitFile.empty()) {
ServerLog::Error("Failed to split file. filePath: %", filePath);
ParseEndCallBack(rankId, fileId, false, "Failed to split file: " + filePath);
continue;
}
for (const auto &pos : splitFile) {
auto future =
instance.threadPool->AddTask(ParseTask, TraceIdManager::GetTraceId(), filePath, rankId, fileId, pos);
futures->emplace_back(std::move(future));
}
}
instance.threadPool->AddTask(
EndParseTask, TraceIdManager::GetTraceId(), rankId, filePathArr, futures, start, fileId);
return true;
}
void TraceFileSimulationParser::ParseTask(const std::string &filePath, const std::string &rankId,
const std::string &fileId, std::pair<int64_t, int64_t> pos) {
if (ParserStatusManager::Instance().GetParserStatus(rankId) != ParserStatus::RUNNING) {
ServerLog::Info("Parse task skip this file. ID:", rankId);
return;
}
auto db = DataBaseManager::Instance().GetTraceDatabaseByFileId(fileId);
if (db == nullptr) {
ServerLog::Warn("Failed to get connection when parse simulation json,ID: ", rankId);
return;
}
std::shared_ptr<TextTraceDatabase> databasePtr =
std::dynamic_pointer_cast<TextTraceDatabase, VirtualTraceDatabase>(db);
if (databasePtr == nullptr) {
ServerLog::Warn("Failed to get text connection when parse simulation json,ID: ", rankId);
return;
}
EventParser eventParser(filePath, rankId, databasePtr);
eventParser.SetSimulationStatus(true);
if (!eventParser.Parse(pos.first, pos.second)) {
if (ParserStatusManager::Instance().SetTerminateStatus(rankId) == ParserStatus::RUNNING) {
ParseEndCallBack(rankId, fileId, false, eventParser.GetError());
}
}
auto &instance = TraceFileSimulationParser::Instance();
std::unique_ptr<FileProgress> &curFileProgress = instance.fileProgressMap[rankId];
curFileProgress->AddToParsedSize(pos.second - pos.first);
instance.parseProgressCallback(rankId, curFileProgress->GetParsedSize(), curFileProgress->GetTotalSize(),
curFileProgress->GetProgressPercentage());
}
void TraceFileSimulationParser::EndParseTask(const std::string &rankId, const std::vector<std::string> &filePathArr,
std::shared_ptr<std::vector<std::future<void>>> futures,
std::chrono::time_point<std::chrono::high_resolution_clock> start, const std::string &fileId) {
if (ParserStatusManager::Instance().GetParserStatus(rankId) != ParserStatus::RUNNING) {
ParserStatusManager::Instance().SetFinishStatus(rankId);
ServerLog::Info("End parse task skip this file. ID:", rankId);
return;
}
ServerLog::Info("Wait parse completed. ID:", rankId);
for (const auto &future : *futures) {
future.wait();
}
auto end = std::chrono::high_resolution_clock::now();
ServerLog::Info("Parse completed. ID:", rankId,
" Cost time(ms): ", std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count());
auto database = std::dynamic_pointer_cast<TextTraceDatabase, VirtualTraceDatabase>(
DataBaseManager::Instance().GetTraceDatabaseByRankId(rankId));
if (database == nullptr) {
ServerLog::Error("Failed to get connection. fileId:", rankId);
ParserStatusManager::Instance().SetFinishStatus(rankId);
return;
}
database->CreateIndex();
CacheManager::Instance().ClearCacheByRankId(rankId);
ServerLog::Info("Update depth completed. ID:", rankId);
ParseEndCallBack(rankId, fileId, true, "");
database->UpdateValueIntoStatusInfoTable(CONNECTION_UNIT, FINISH_STATUS);
ProjectParserBase::SendUnitFinishNotify(fileId, true, CONNECTION_UNIT);
}
void TraceFileSimulationParser::ParseEndCallBack(
const std::string &rankId, const std::string &fileId, bool result, const std::string &message) {
auto oldStatus = ParserStatusManager::Instance().GetParserStatus(rankId);
ParserStatusManager::Instance().SetFinishStatus(rankId);
auto &instance = TraceFileSimulationParser::Instance();
if (instance.parseEndCallback != nullptr && oldStatus != ParserStatus::TERMINATE) {
ServerLog::Info("TraceFileSimulationParser send Message");
instance.parseEndCallback(rankId, fileId, result, message);
}
}
int64_t TraceFileSimulationParser::GetTrackId(
const std::string &fileId, const std::string &pid, const std::string &tid) {
std::unique_lock<std::mutex> lock(trackMutex);
auto item = std::make_pair(pid, tid);
if (trackIdMap[fileId].count(item) > 0) {
return trackIdMap[fileId].at(item);
}
if (trackId == INT64_MAX) {
trackId = 0;
}
trackIdMap[fileId].emplace(item, ++trackId);
return trackId;
}
void TraceFileSimulationParser::Reset() {
ServerLog::Info("Reset. wait task completed.");
ParserStatusManager::Instance().SetAllTerminateStatus();
threadPool->Reset();
ServerLog::Info("Task completed.");
auto connList = DataBaseManager::Instance().GetAllTraceDatabase();
for (auto &conn : connList) {
conn->Stop();
}
trackIdMap.clear();
trackId = 0;
DataBaseManager::Instance().Clear();
TraceTime::Instance().Reset();
FileParser::Reset();
ParserStatusManager::Instance().ClearAllParserStatus();
CacheManager::Instance().ClearAll();
ServerLog::Info("End Reset trace Parser");
}
void TraceFileSimulationParser::DeleteParseFiles(const std::vector<std::string> &fileIds) {
for (const auto &fileId : fileIds) {
auto status = ParserStatusManager::Instance().SetTerminateStatus(fileId);
ServerLog::Info("Before delete file. id:", fileId, ", status:", static_cast<int>(status));
}
ParserStatusManager::Instance().WaitAllFinished(fileIds);
for (const auto &fileId : fileIds) {
auto oldStatus = ParserStatusManager::Instance().GetParserStatus(fileId);
ServerLog::Info("Clear Cache. id:", fileId, ", status:", static_cast<int>(oldStatus));
CacheManager::Instance().ClearCacheByRankId(fileId);
}
}
}
}
}