* -------------------------------------------------------------------------
* This file is part of the MindStudio project.
* Copyright (c) 2025 Huawei Technologies Co.,Ltd.
*
* MindStudio is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*/
#include "pch.h"
#include "TraceDatabaseDef.h"
#include "DomainObject.h"
#include "DominQuery.h"
#include "SliceCacheManager.h"
#include "TextRepository.h"
#include "FlowAnalyzer.h"
using namespace Dic::Server;
namespace Dic::Module::Timeline {
FlowAnalyzer::FlowAnalyzer() {
if (repository == nullptr) {
repository = std::make_unique<TextRepository>();
}
}
void FlowAnalyzer::SetRepository(std::unique_ptr<FlowRepoInterface> repositoryDependency) {
repository = std::move(repositoryDependency);
}
std::vector<FlowPoint> FlowAnalyzer::ComputeAllFlowPointBySliceId(FlowQuery &flowQuery, const std::string &sliceId) {
std::unordered_set<std::string> onSliceFlowPointSet = ComputeOnSliceFlowPointBySliceId(flowQuery, sliceId);
for (const auto &item : onSliceFlowPointSet) {
flowQuery.flowIds.emplace_back(item);
}
std::vector<FlowPoint> allPoints;
repository->QueryFlowPointByFlowId(flowQuery, allPoints);
const uint64_t targetTime = flowQuery.minTimestamp + flowQuery.startTime;
const uint64_t targetTrackId = flowQuery.trackId;
FlowPoint targetPoint;
targetPoint.timestamp = targetTime;
targetPoint.trackId = targetTrackId;
auto it = std::lower_bound(allPoints.begin(), allPoints.end(), targetPoint, FlowPoint::CompareTimestampASC);
if (it == allPoints.end()) {
return {};
}
std::vector<FlowPoint> res;
res.emplace_back(*it);
for (auto i = it - 1; i >= allPoints.begin(); --i) {
res.emplace_back(*i);
if (i->timestamp != targetTime && i->trackId == targetTrackId) {
break;
}
}
for (auto i = it + 1; i < allPoints.end(); ++i) {
res.emplace_back(*i);
if (i->timestamp != targetTime && i->trackId == targetTrackId) {
break;
}
}
return res;
}
std::unordered_set<std::string> FlowAnalyzer::ComputeOnSliceFlowPointBySliceId(
const FlowQuery &flowQuery, const std::string &sliceId) {
std::unordered_set<std::string> res;
std::vector<FlowPoint> flowPointVec;
repository->QueryFlowPointByTimeRange(flowQuery, flowPointVec);
ServerLog::Info("flowPointVec is: ", flowPointVec.size());
auto &instance = SliceCacheManager::Instance();
SliceQuery sliceQuery;
sliceQuery.startTime = flowQuery.startTime;
sliceQuery.endTime = flowQuery.endTime;
std::vector<SliceDomain> sliceVec =
instance.GetSliceDomainVec(std::to_string(flowQuery.trackId), flowQuery.fileId, sliceQuery);
if (std::empty(sliceVec)) {
return res;
}
for (const auto &item : flowPointVec) {
auto it = ComputeSliceByFlowPoint(item, sliceVec);
if (it != sliceVec.end() && std::to_string(it->id) == sliceId) {
res.emplace(item.flowId);
}
}
return res;
}
std::vector<SliceDomain>::const_iterator FlowAnalyzer::ComputeSliceByFlowPoint(
const FlowPoint &flowPoint, const std::vector<SliceDomain> &sliceVec) const {
SliceDomain slice;
slice.timestamp = flowPoint.timestamp;
slice.id = 0;
auto it = sliceVec.begin();
if (flowPoint.type == Protocol::LINE_START) {
it = std::lower_bound(sliceVec.begin(), sliceVec.end(), slice, SliceDomain::CompareTimestampASC);
if (it != sliceVec.end() && it->timestamp == flowPoint.timestamp) {
return it;
}
while (it != sliceVec.end() && it > sliceVec.begin()) {
it--;
if (it->timestamp <= flowPoint.timestamp && it->endTime >= flowPoint.timestamp) {
break;
}
}
return it;
}
if (flowPoint.type == Protocol::LINE_END || flowPoint.type == Protocol::LINE_END_OPTIONAL) {
it = std::lower_bound(sliceVec.begin(), sliceVec.end(), slice, SliceDomain::CompareTimestampASC);
if (it != sliceVec.end()) {
return it;
}
}
return it;
}
void FlowAnalyzer::ComputeCategoryAndFlowMap(const std::vector<FlowDetailDto> &flowDetailVec,
std::map<std::string, std::vector<Protocol::UnitSingleFlow>> &flowMap, uint64_t minTimestamp) {
const static int FLOW_COUNT = 2;
if (flowDetailVec.size() != FLOW_COUNT) {
return;
}
Protocol::UnitSingleFlow unitSingleFlow;
unitSingleFlow.title = flowDetailVec[0].name;
unitSingleFlow.cat = flowDetailVec[0].cat;
unitSingleFlow.id = flowDetailVec[0].flowId;
FlowDetailDto from(flowDetailVec[0]);
FlowDetailDto to(flowDetailVec[1]);
if (from.type != to.type && to.type == Protocol::LINE_START) {
from = flowDetailVec[1];
to = flowDetailVec[0];
}
unitSingleFlow.from.id = from.id;
unitSingleFlow.from.pid = from.pid;
unitSingleFlow.from.tid = from.tid;
if (from.timestamp < minTimestamp || to.timestamp < minTimestamp) {
return;
}
unitSingleFlow.from.timestamp = from.timestamp - minTimestamp;
unitSingleFlow.from.depth = from.depth;
unitSingleFlow.to.id = to.id;
unitSingleFlow.to.pid = to.pid;
unitSingleFlow.to.tid = to.tid;
unitSingleFlow.to.timestamp = to.timestamp - minTimestamp;
unitSingleFlow.to.depth = to.depth;
flowMap[unitSingleFlow.cat].emplace_back(unitSingleFlow);
}
void FlowAnalyzer::SortByTrackIdASC(std::vector<FlowPoint> &flowCategoryEventsDtoVec) {
std::sort(flowCategoryEventsDtoVec.begin(), flowCategoryEventsDtoVec.end(), CompareTrackIdASC);
}
void FlowAnalyzer::SortByFlowIdAndTimestampASC(std::vector<FlowPoint> &flowCategoryEventsDtoVec) {
std::sort(flowCategoryEventsDtoVec.begin(), flowCategoryEventsDtoVec.end(), CompareFlowIdAndTimestampASC);
}
* 计算屏幕上需要呈现的连线点
* @param flowEventsVec 数据集
* @param startTime 屏幕开始时间
* @param endTime 屏幕结束时间
* @param flowIdResult 计算结果
*/
void FlowAnalyzer::ComputeScreenFlowPoint(const std::vector<FlowPoint> &flowEventsVec, uint64_t startTime,
uint64_t endTime, std::vector<FlowPoint> &flowIdResult) {
FlowPointSampleStruct flowPointSampleStruct;
GroupSampleFlowPoint(flowEventsVec, startTime, endTime, flowPointSampleStruct);
uint64_t unitTime = (endTime - startTime) / 1000;
for (const auto &item : flowPointSampleStruct.startPointResultSet) {
OfferFlowPointPair(flowEventsVec, flowIdResult, flowPointSampleStruct, item, unitTime);
}
for (const auto &item : flowPointSampleStruct.endPointResultSet) {
OfferFlowPointPair(flowEventsVec, flowIdResult, flowPointSampleStruct, item, unitTime);
}
}
* 根据连线点组装连线
* 连线存在四种类型:1 - S-F; 2 - S-T; 3 - S-T-T-...T-F; 4 - S-F-F-...-F
* S - Protocol::LINE_START
* T - Protocol::LINE_END_OPTIONAL
* F - Protocol::LINE_END
* @param flowEventsVec 连线点
* @param category 连线类别
* @param flowDetailList 结果集
*/
void FlowAnalyzer::ComputeUintFlows(const std::vector<FlowPoint> &flowEventsVec, const std::string &category,
std::vector<std::unique_ptr<Protocol::UnitSingleFlow>> &flowDetailList) {
std::string curFlowId;
Protocol::FlowLocation location;
Protocol::FlowLocation onePointer;
Protocol::FlowLocation twoPointer;
for (const auto &flow : flowEventsVec) {
std::string type = flow.type;
std::string flowId = flow.flowId;
if (type == Protocol::LINE_START || flowId != curFlowId) {
location = ComputeLocation(location, flow, type);
onePointer = location;
} else if ((type == Protocol::LINE_END_OPTIONAL || type == Protocol::LINE_END) && flowId == curFlowId) {
auto flowEvent = std::make_unique<Protocol::UnitSingleFlow>();
flowEvent->cat = category;
Protocol::FlowLocation fromPointer;
if (onePointer.type != Protocol::LINE_END) {
fromPointer = onePointer;
twoPointer = onePointer;
} else {
fromPointer = twoPointer;
}
onePointer.pid = flow.pid;
onePointer.tid = flow.tid;
onePointer.depth = flow.depth;
onePointer.type = flow.type;
onePointer.timestamp = flow.timestamp;
onePointer.rankId = flow.rankId;
onePointer.id = std::to_string(flow.id);
onePointer.duration = flow.duration;
flowEvent->from.pid = fromPointer.pid;
flowEvent->from.tid = fromPointer.tid;
flowEvent->from.depth = fromPointer.depth;
flowEvent->from.timestamp = fromPointer.timestamp;
flowEvent->from.rankId = fromPointer.rankId;
flowEvent->from.id = fromPointer.id;
flowEvent->from.duration = fromPointer.duration;
flowEvent->to.pid = flow.pid;
flowEvent->to.tid = flow.tid;
flowEvent->to.depth = flow.depth;
flowEvent->to.timestamp = flow.timestamp;
flowEvent->to.rankId = flow.rankId;
flowEvent->to.id = std::to_string(flow.id);
flowEvent->to.duration = flow.duration;
flowDetailList.emplace_back(std::move(flowEvent));
}
curFlowId = flowId;
}
}
Protocol::FlowLocation &FlowAnalyzer::ComputeLocation(
Protocol::FlowLocation &location, const FlowPoint &flow, const std::string &type) {
location.pid = flow.pid;
location.tid = flow.tid;
location.depth = flow.depth;
location.timestamp = flow.timestamp;
location.type = type;
location.rankId = flow.rankId;
location.id = std::to_string(flow.id);
location.duration = flow.duration;
return location;
}
void FlowAnalyzer::OfferFlowPointPair(const std::vector<FlowPoint> &flowEventsVec, std::vector<FlowPoint> &flowIdResult,
FlowPointSampleStruct &flowPointSampleStruct, const std::string &flowId, uint64_t unitTime) const {
if (flowPointSampleStruct.resultFlowIdSet.count(flowId) > 0) {
return;
}
if (flowPointSampleStruct.endPointMap.count(flowId) == 0) {
return;
}
if (flowPointSampleStruct.startPointMap.count(flowId) == 0) {
return;
}
std::vector<FlowPoint> tempPoint;
for (const auto &item : flowPointSampleStruct.endPointMap[flowId]) {
tempPoint.emplace_back(flowEventsVec[item]);
}
std::sort(tempPoint.begin(), tempPoint.end());
uint64_t curTime = 0;
uint64_t curTrackId = 0;
for (const auto &item : tempPoint) {
if (item.trackId != curTrackId) {
curTime = 0;
curTrackId = item.trackId;
}
if (item.timestamp >= curTime + unitTime || curTime == 0) {
flowIdResult.emplace_back(item);
curTime = item.timestamp;
}
}
for (const auto &item : flowPointSampleStruct.startPointMap[flowId]) {
flowIdResult.emplace_back(flowEventsVec[item]);
}
flowPointSampleStruct.resultFlowIdSet.emplace(flowId);
}
void FlowAnalyzer::GroupSampleFlowPoint(const std::vector<FlowPoint> &flowEventsVec, uint64_t startTime,
uint64_t endTime, FlowPointSampleStruct &flowPointSampleStruct) {
uint64_t curTrackId = std::numeric_limits<uint64_t>::max();
uint64_t index = 0;
uint64_t uintTime = (endTime - startTime) / 500;
for (const auto &item : flowEventsVec) {
if (curTrackId != item.trackId) {
curTrackId = item.trackId;
flowPointSampleStruct.curBeginLimitTime = uintTime;
flowPointSampleStruct.curBeginStartTime = 0;
flowPointSampleStruct.curEndLimitTime = uintTime;
flowPointSampleStruct.curEndStartTime = 0;
}
if (item.timestamp <= endTime && item.type == Protocol::LINE_START) {
flowPointSampleStruct.startPointMap[item.flowId].push_back(index);
}
if (item.timestamp >= startTime &&
(item.type == Protocol::LINE_END || item.type == Protocol::LINE_END_OPTIONAL)) {
flowPointSampleStruct.endPointMap[item.flowId].push_back(index);
}
index++;
if (uintTime == 0 && item.timestamp < startTime) {
continue;
}
if (uintTime == 0 && item.timestamp > endTime) {
continue;
}
ComputePointOnScreen(flowPointSampleStruct, uintTime, item);
}
}
void FlowAnalyzer::ComputePointOnScreen(
FlowPointSampleStruct &flowPointSampleStruct, uint64_t uintTime, const FlowPoint &flowPoint) {
if (uintTime == 0 && flowPoint.type == Protocol::LINE_START) {
flowPointSampleStruct.startPointResultSet.emplace(flowPoint.flowId);
return;
}
if (uintTime == 0 && (flowPoint.type == Protocol::LINE_END || flowPoint.type == Protocol::LINE_END_OPTIONAL)) {
flowPointSampleStruct.endPointResultSet.emplace(flowPoint.flowId);
return;
}
if (flowPoint.type == Protocol::LINE_START) {
while (flowPoint.timestamp >= flowPointSampleStruct.curBeginLimitTime) {
flowPointSampleStruct.curBeginLimitTime += uintTime;
flowPointSampleStruct.curBeginStartTime += uintTime;
}
if (flowPoint.timestamp >= flowPointSampleStruct.curBeginStartTime) {
flowPointSampleStruct.startPointResultSet.emplace(flowPoint.flowId);
flowPointSampleStruct.curBeginLimitTime += uintTime;
flowPointSampleStruct.curBeginStartTime += uintTime;
}
}
if ((flowPoint.type == Protocol::LINE_END || flowPoint.type == Protocol::LINE_END_OPTIONAL)) {
while (flowPoint.timestamp >= flowPointSampleStruct.curEndLimitTime) {
flowPointSampleStruct.curEndStartTime += uintTime;
flowPointSampleStruct.curEndLimitTime += uintTime;
}
if (flowPoint.timestamp >= flowPointSampleStruct.curEndStartTime) {
flowPointSampleStruct.endPointResultSet.emplace(flowPoint.flowId);
flowPointSampleStruct.curEndStartTime += uintTime;
flowPointSampleStruct.curEndLimitTime += uintTime;
}
}
}
bool FlowAnalyzer::CompareTrackIdASC(const FlowPoint &first, const FlowPoint &second) {
if (first.trackId < second.trackId) {
return true;
}
if (first.trackId == second.trackId && first.id < second.id) {
return true;
}
return false;
}
bool FlowAnalyzer::CompareFlowIdAndTimestampASC(const FlowPoint &first, const FlowPoint &second) {
if (first.flowId < second.flowId) {
return true;
}
if (first.flowId > second.flowId) {
return false;
}
if (second.type != Protocol::LINE_START && first.type == Protocol::LINE_START) {
return true;
}
if (first.type != Protocol::LINE_START && second.type == Protocol::LINE_START) {
return false;
}
if (first.type != Protocol::LINE_START && second.type != Protocol::LINE_START &&
first.timestamp < second.timestamp) {
return true;
}
if (first.type != Protocol::LINE_START && second.type != Protocol::LINE_START &&
first.timestamp > second.timestamp) {
return false;
}
if (first.type != Protocol::LINE_START && second.type != Protocol::LINE_START && first.id < second.id) {
return true;
}
if (first.type == Protocol::LINE_START && second.type == Protocol::LINE_START && first.id < second.id) {
return true;
}
return false;
}
}