* Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MetricMemSetProcess.h"
#include <numeric>
namespace dynolog_npu {
namespace ipc_monitor {
namespace metric {
std::string MemSetMetric::seriesToJson()
{
nlohmann::json jsonMsg;
jsonMsg["kind"] = "MemSet";
jsonMsg["deviceId"] = deviceId;
jsonMsg["duration"] = duration;
jsonMsg["timestamp"] = timestamp;
return jsonMsg.dump();
}
void MetricMemSetProcess::ConsumeMsptiData(msptiActivity *record)
{
msptiActivityMemset* memSet = ReinterpretConvert<msptiActivityMemset*>(record);
std::shared_ptr<msptiActivityMemset> tmp;
MakeSharedPtr(tmp);
if (tmp == nullptr || memcpy_s(tmp.get(), sizeof(msptiActivityMemset), memSet, sizeof(msptiActivityMemset)) != EOK) {
LOG(ERROR) << "memcpy_s failed " << IPC_ERROR(ErrCode::MEMORY);
return;
}
{
std::unique_lock<std::mutex> lock(dataMutex);
records.emplace_back(std::move(tmp));
}
}
std::vector<MemSetMetric> MetricMemSetProcess::AggregatedData()
{
std::vector<std::shared_ptr<msptiActivityMemset>> copyRecords;
{
std::unique_lock<std::mutex> lock(dataMutex);
copyRecords = std::move(records);
records.clear();
}
if (copyRecords.empty()) {
return {};
}
std::unordered_map<uint32_t, std::vector<std::shared_ptr<msptiActivityMemset>>> deviceId2MemsetData =
groupby(copyRecords, [](const std::shared_ptr<msptiActivityMemset>& data) -> std::uint32_t {
return data->deviceId;
});
std::vector<MemSetMetric> ans;
auto curTimestamp = getCurrentTimestamp64();
for (auto& pair: deviceId2MemsetData) {
MemSetMetric memSetMetric{};
auto deviceId = pair.first;
auto& memSetDatas = pair.second;
memSetMetric.duration = std::accumulate(memSetDatas.begin(), memSetDatas.end(), 0ULL,
[](uint64_t acc, std::shared_ptr<msptiActivityMemset> memSet) {
return acc + memSet->end - memSet->start;
});
memSetMetric.deviceId = deviceId;
memSetMetric.timestamp = curTimestamp;
ans.emplace_back(memSetMetric);
}
return ans;
}
void MetricMemSetProcess::SendProcessMessage()
{
auto afterAggregated = AggregatedData();
for (auto& metric: afterAggregated) {
SendMessage(metric.seriesToJson());
}
}
void MetricMemSetProcess::Clear()
{
records.clear();
}
}
}
}