* -------------------------------------------------------------------------
* This file is part of the MindStudio project.
* Copyright (c) 2026 Huawei Technologies Co.,Ltd.
*
* MindStudio is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
#include <algorithm>
#include <memory>
#include "ServerLog.h"
#include "DataBaseManager.h"
#include "ACLGraphDebugJsonFileParser.h"
namespace Dic::Module::Timeline {
bool ACLGraphDebugJsonFileParser::PostParse(std::shared_ptr<TextTraceDatabase> db, const std::string &rankId) {
std::vector<SliceDto> slices;
if (!db->QuerySliceDtoList(slices)) {
Server::ServerLog::Warn("[ACLGraph] No slices found for post-processing");
return false;
}
Server::ServerLog::Info("[ACLGraph] slice length: ", slices.size());
auto groups = GroupSlicesByTrackId(std::move(slices));
AdjustSyncPairs(groups);
std::vector<Trace::Flow> flows;
GenerateSyncFlows(groups, flows);
if (const auto compacted = MergeAndSortSlices(std::move(groups)); !compacted.empty()) {
if (db->ReplaceAllSlices(compacted)) {
Server::ServerLog::Info("[ACLGraph] Slice replacement succeeded: ", compacted.size(), " records");
if (db->InsertFlowList(flows)) {
Server::ServerLog::Info("[ACLGraph] Sync pairs aligned + ", flows.size(), " flows inserted");
} else {
Server::ServerLog::Warn("[ACLGraph] Flow insertion failed (non-fatal)");
}
} else {
Server::ServerLog::Error("[ACLGraph] Slice replacement failed!");
return false;
}
Server::ServerLog::Info("[ACLGraph] PostParse completed: ", compacted.size(), " slices compacted");
}
if (DataBaseManager::Instance().GetDeviceIdFromRankId(rankId).empty()) {
UpdateRankIdDeviceIdMapByProcessData(db, rankId);
}
return true;
}
std::unordered_map<uint64_t, std::vector<SliceDto>> ACLGraphDebugJsonFileParser::GroupSlicesByTrackId(
std::vector<SliceDto> slices) {
std::unordered_map<uint64_t, std::vector<SliceDto>> groups;
for (auto &slice : slices) {
groups[slice.trackId].emplace_back(std::move(slice));
}
Server::ServerLog::Debug("Grouped into ", groups.size(), " tracks");
return groups;
}
static void BuildWaitDependencyGraph(const std::unordered_map<uint64_t, std::vector<SliceDto>> &groups,
const std::unordered_map<std::string, Key> &recMap, std::vector<WaitInfo> &waitInfos,
std::unordered_map<Key, std::unordered_set<Key, PairHash>, PairHash> &deps,
std::unordered_map<Key, int, PairHash> &indeg) {
for (auto &[tid, slices] : groups) {
for (size_t i = 0; i < slices.size(); ++i) {
auto &s = slices[i];
if (s.name.rfind("EVENT_WAIT_", 0) == 0) {
if (auto it = recMap.find(s.name.substr(11)); it != recMap.end()) {
waitInfos.push_back({tid, i, it->second.first, it->second.second});
}
}
}
}
if (waitInfos.empty()) {
return;
}
for (auto &curr : waitInfos) {
Key currKey = {curr.waitTrack, curr.waitIdx};
for (size_t i = 0; i < curr.waitIdx; ++i) {
auto &e = groups.at(curr.waitTrack)[i];
if (e.name.rfind("EVENT_WAIT_", 0) == 0 && recMap.count(e.name.substr(11))) {
Key prev = {curr.waitTrack, i};
if (deps[prev].insert(currKey).second) {
indeg[currKey]++;
}
}
}
for (size_t i = 0; i < curr.recIdx; ++i) {
auto &e = groups.at(curr.recTrack)[i];
if (e.name.rfind("EVENT_WAIT_", 0) == 0 && recMap.count(e.name.substr(11))) {
Key prev = {curr.recTrack, i};
if (deps[prev].insert(currKey).second) {
indeg[currKey]++;
}
}
}
if (!indeg.count(currKey)) {
indeg[currKey] = 0;
}
}
}
static std::vector<Key> TopologicalSort(const std::vector<WaitInfo> &waitInfos,
const std::unordered_map<Key, std::unordered_set<Key, PairHash>, PairHash> &deps,
std::unordered_map<Key, int, PairHash> indeg)
{
std::queue<Key> q;
for (auto &wi : waitInfos) {
if (indeg[{wi.waitTrack, wi.waitIdx}] == 0) {
q.push({wi.waitTrack, wi.waitIdx});
}
}
std::vector<Key> order;
while (!q.empty()) {
auto u = q.front();
q.pop();
order.push_back(u);
if (auto it = deps.find(u); it != deps.end()) {
for (auto &v : it->second) {
if (--indeg[v] == 0) {
q.push(v);
}
}
}
}
if (order.size() != waitInfos.size()) {
Server::ServerLog::Error("Topological sort failed: graph contains cycle or input inconsistency. "
"Processed " +
std::to_string(order.size()) + " of " + std::to_string(waitInfos.size()) + " nodes.");
}
return order;
}
static void AdjustWaitsInOrder(std::unordered_map<uint64_t, std::vector<SliceDto>> &groups,
const std::vector<WaitInfo> &waitInfos, const std::vector<Key> &order,
const std::unordered_map<std::string, Key> &recMap) {
const uint64_t PADDING = 0u;
std::unordered_map<Key, const WaitInfo *, PairHash> idx;
for (auto &wi : waitInfos) {
idx[{wi.waitTrack, wi.waitIdx}] = &wi;
}
for (auto &k : order) {
if (!idx.count(k)) {
continue;
}
const auto &wi = *idx[k];
auto &wait = groups[wi.waitTrack][wi.waitIdx];
const auto &rec = groups[wi.recTrack][wi.recIdx];
const int64_t delta = static_cast<int64_t>(rec.timestamp + rec.duration + PADDING) -
static_cast<int64_t>(wait.timestamp + wait.duration);
if (delta < 0) {
Server::ServerLog::Warn("Skip WAIT sync (id=", wait.id, "): wait stop time > record stop time + PADDING");
continue;
}
wait.duration += delta;
auto &track = groups[wi.waitTrack];
for (size_t j = wi.waitIdx + 1; j < track.size(); ++j) {
track[j].timestamp += delta;
}
}
}
// Aligns every EVENT_WAIT_<id> slice with its EVENT_RECORD_<id> slice across tracks, in place in `groups`.
//   1. Indexes record slices by event id.
//   2. Builds the wait dependency graph (BuildWaitDependencyGraph).
//   3. Orders it (TopologicalSort).
//   4. Applies the adjustments (AdjustWaitsInOrder).
// Returns early when there are no records or no matched waits.
void ACLGraphDebugJsonFileParser::AdjustSyncPairs(std::unordered_map<uint64_t, std::vector<SliceDto>> &groups) {
    static constexpr char kRecordPrefix[] = "EVENT_RECORD_";
    constexpr size_t kRecordPrefixLen = sizeof(kRecordPrefix) - 1;

    // Event id -> (track id, slice index) of its EVENT_RECORD_ slice.
    // If an id is recorded more than once, the occurrence visited last (in `groups` iteration order) wins.
    std::unordered_map<std::string, Key> recMap;
    for (const auto &[tid, slices] : groups) {
        for (size_t i = 0; i < slices.size(); ++i) {
            if (slices[i].name.rfind(kRecordPrefix, 0) == 0) {
                recMap[slices[i].name.substr(kRecordPrefixLen)] = {tid, i};
            }
        }
    }
    if (recMap.empty()) {
        return;
    }

    std::vector<WaitInfo> waitInfos;
    std::unordered_map<Key, std::unordered_set<Key, PairHash>, PairHash> deps;
    std::unordered_map<Key, int, PairHash> indeg;
    BuildWaitDependencyGraph(groups, recMap, waitInfos, deps, indeg);
    if (waitInfos.empty()) {
        return;
    }
    // TopologicalSort takes the in-degree map by value and `indeg` is not used afterwards, so move it
    // instead of copying the whole hash map.
    const auto order = TopologicalSort(waitInfos, deps, std::move(indeg));
    AdjustWaitsInOrder(groups, waitInfos, order, recMap);
}
// Flattens the per-track groups into one vector ordered by (trackId, timestamp). Slices are moved out of the
// by-value `groups`.
// std::stable_sort is used so that slices sharing both trackId and timestamp keep their original within-track
// order. With std::sort their relative order would be unspecified.
std::vector<SliceDto> ACLGraphDebugJsonFileParser::MergeAndSortSlices(
    std::unordered_map<uint64_t, std::vector<SliceDto>> groups) {
    size_t totalCount = 0;
    for (const auto &[_, group] : groups) {
        totalCount += group.size();
    }
    std::vector<SliceDto> result;
    result.reserve(totalCount);
    for (auto &[_, group] : groups) {
        result.insert(result.end(), std::make_move_iterator(group.begin()), std::make_move_iterator(group.end()));
    }
    // All slices with equal trackId come from the same group, so stability preserves that group's order.
    std::stable_sort(result.begin(), result.end(), [](const SliceDto &a, const SliceDto &b) {
        return (a.trackId != b.trackId) ? (a.trackId < b.trackId) : (a.timestamp < b.timestamp);
    });
    return result;
}
void ACLGraphDebugJsonFileParser::GenerateSyncFlows(
const std::unordered_map<uint64_t, std::vector<SliceDto>> &groups, std::vector<Trace::Flow> &flows) {
struct RecordInfo {
uint64_t start;
uint64_t end;
uint64_t track;
size_t idx;
};
std::unordered_map<std::string, RecordInfo> records;
for (auto &[tid, slices] : groups) {
for (size_t i = 0; i < slices.size(); ++i) {
if (auto &s = slices[i]; s.name.rfind("EVENT_RECORD_", 0) == 0) {
records[s.name.substr(13)] = {s.timestamp, s.timestamp + s.duration, tid, i};
}
}
}
if (records.empty()) {
return;
}
const std::string CAT = "record_wait";
size_t pair_idx = 0;
for (auto &[tid, slices] : groups) {
for (const auto &s : slices) {
if (s.name.rfind("EVENT_WAIT_", 0) == 0) {
auto id = s.name.substr(11);
if (auto it = records.find(id); it != records.end()) {
const auto &rec = it->second;
std::string flow_id = "syncflow_" + id + "_" + std::to_string(pair_idx++);
Trace::Flow start;
start.trackId = rec.track;
start.ts = static_cast<int64_t>(rec.start);
start.flowId = flow_id;
start.name = "SyncStart_" + id;
start.type = Protocol::LINE_START;
const auto &rec_slice = groups.at(rec.track)[rec.idx];
start.tid = rec_slice.tid;
start.pid = rec_slice.pid;
start.cat = rec_slice.cat.empty() ? CAT : std::optional(rec_slice.cat);
flows.push_back(std::move(start));
Trace::Flow end;
end.trackId = tid;
end.ts = static_cast<int64_t>(s.timestamp);
end.flowId = flow_id;
end.name = "SyncEnd_" + id;
end.type = Protocol::LINE_END;
end.tid = s.tid;
end.pid = s.pid;
end.cat = s.cat.empty() ? CAT : std::optional(s.cat);
flows.push_back(std::move(end));
}
}
}
}
Server::ServerLog::Debug("Generated ", flows.size() / 2, " sync flow pairs (", flows.size(), " events)");
}
}