* Copyright (C) 2026-2026. Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "monitor/MonitorProcessManager.h"
#include "monitor/Monitor.h"
namespace dynolog_npu {
namespace ipc_monitor {
namespace monitor {
void MonitorProcessManager::RunPreTask()
{
sessionStartTime_ = getCurrentTimestamp64();
LOG(INFO) << "MonitorProcessManager start";
}
void MonitorProcessManager::RunPostTask()
{
std::lock_guard<std::mutex> lock(dataMutex_);
sessionStartTime_ = 0;
mstxMarkerHostData_.clear();
mstxRangeHostData_.clear();
mstxRangeDeviceData_.clear();
LOG(INFO) << "MonitorProcessManager finish";
}
void MonitorProcessManager::ProcessApiData(msptiActivityApi *record, msptiActivityKind kind)
{
uint64_t endTime = record->end;
if (endTime < sessionStartTime_) {
return;
}
Monitor::GetInstance()->ReportAPIData(
{record->name, record->start, endTime, record->pt.processId, record->pt.threadId, record->correlationId}, kind);
}
void MonitorProcessManager::ProcessCommunicationData(msptiActivityCommunication *record)
{
uint64_t endTime = record->end;
if (endTime < sessionStartTime_) {
return;
}
Monitor::GetInstance()->ReportCommunicationData(
{record->name, record->start, endTime, record->ds.deviceId, record->ds.streamId, record->count,
GetCommunicationDataTypeName(record->dataType), record->commName, record->algType, record->correlationId});
}
void MonitorProcessManager::ProcessKernelData(msptiActivityKernel *record)
{
uint64_t endTime = record->end;
if (endTime < sessionStartTime_) {
return;
}
Monitor::GetInstance()->ReportKernelData(
{record->name, record->start, endTime, record->ds.deviceId, record->ds.streamId, record->correlationId, record->type});
}
void MonitorProcessManager::ProcessMstxData(msptiActivityMarker *record)
{
if (record->timestamp < sessionStartTime_) {
return;
}
std::lock_guard<std::mutex> lock(dataMutex_);
if (record->sourceKind == msptiActivitySourceKind::MSPTI_ACTIVITY_SOURCE_KIND_HOST) {
ProcessMstxHostData(record);
} else if (record->sourceKind == msptiActivitySourceKind::MSPTI_ACTIVITY_SOURCE_KIND_DEVICE) {
ProcessMstxDeviceData(record);
}
}
void MonitorProcessManager::ProcessMstxHostData(msptiActivityMarker *record)
{
uint64_t connectionId = record->id;
uint64_t timestamp = record->timestamp;
std::string message = record->name;
std::string domain = record->domain;
if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS ||
record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS_WITH_DEVICE) {
Monitor::GetInstance()->ReportMarkerData(
{message, "Host", domain, connectionId, timestamp, timestamp, record->objectId.pt.processId, record->objectId.pt.threadId, 0, 0});
if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS_WITH_DEVICE) {
mstxMarkerHostData_.emplace(connectionId, MstxHostData{timestamp, domain, message});
}
} else if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_START ||
record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_START_WITH_DEVICE) {
mstxRangeHostData_.emplace(connectionId, MstxHostData{timestamp, domain, message});
} else if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_END ||
record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_END_WITH_DEVICE) {
auto it = mstxRangeHostData_.find(connectionId);
if (it != mstxRangeHostData_.end()) {
Monitor::GetInstance()->ReportMarkerData(
{it->second.message, "Host", it->second.domain, connectionId, it->second.timestamp, timestamp,
record->objectId.pt.processId, record->objectId.pt.threadId, 0, 0});
if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_END) {
mstxRangeHostData_.erase(it);
}
}
}
}
void MonitorProcessManager::ProcessMstxDeviceData(msptiActivityMarker *record)
{
uint64_t connectionId = record->id;
uint64_t timestamp = record->timestamp;
uint32_t deviceId = static_cast<uint32_t>(record->objectId.ds.deviceId);
if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_INSTANTANEOUS_WITH_DEVICE) {
auto it = mstxMarkerHostData_.find(connectionId);
Monitor::GetInstance()->ReportMarkerData(
{it != mstxMarkerHostData_.end() ? it->second.message : std::string(record->name),
"Device", it != mstxMarkerHostData_.end() ? it->second.domain : std::string(record->domain),
connectionId, timestamp, timestamp, 0, 0, deviceId, record->objectId.ds.streamId});
if (it != mstxMarkerHostData_.end()) {
mstxMarkerHostData_.erase(it);
}
} else if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_START_WITH_DEVICE) {
mstxRangeDeviceData_.emplace(connectionId, MstxDeviceData{timestamp});
} else if (record->flag == msptiActivityFlag::MSPTI_ACTIVITY_FLAG_MARKER_END_WITH_DEVICE) {
auto it = mstxRangeDeviceData_.find(connectionId);
if (it != mstxRangeDeviceData_.end()) {
auto hostIt = mstxRangeHostData_.find(connectionId);
Monitor::GetInstance()->ReportMarkerData(
{hostIt != mstxRangeHostData_.end() ? hostIt->second.message : std::string(record->name),
"Device", hostIt != mstxRangeHostData_.end() ? hostIt->second.domain : std::string(record->domain),
connectionId, it->second.timestamp, timestamp, 0, 0, deviceId, record->objectId.ds.streamId});
mstxRangeDeviceData_.erase(it);
if (hostIt != mstxRangeHostData_.end()) {
mstxRangeHostData_.erase(hostIt);
}
}
}
}
ErrCode MonitorProcessManager::ConsumeMsptiData(msptiActivity *record)
{
if (record == nullptr) {
LOG(ERROR) << "MonitorProcessManager::ConsumeMsptiData record is null";
return ErrCode::VALUE;
}
switch (record->kind) {
case msptiActivityKind::MSPTI_ACTIVITY_KIND_API:
ProcessApiData(ReinterpretConvert<msptiActivityApi*>(record),
msptiActivityKind::MSPTI_ACTIVITY_KIND_API);
break;
case msptiActivityKind::MSPTI_ACTIVITY_KIND_COMMUNICATION:
ProcessCommunicationData(ReinterpretConvert<msptiActivityCommunication*>(record));
break;
case msptiActivityKind::MSPTI_ACTIVITY_KIND_KERNEL:
ProcessKernelData(ReinterpretConvert<msptiActivityKernel*>(record));
break;
case msptiActivityKind::MSPTI_ACTIVITY_KIND_MARKER:
ProcessMstxData(ReinterpretConvert<msptiActivityMarker*>(record));
break;
case msptiActivityKind::MSPTI_ACTIVITY_KIND_RUNTIME_API:
ProcessApiData(ReinterpretConvert<msptiActivityApi*>(record),
msptiActivityKind::MSPTI_ACTIVITY_KIND_RUNTIME_API);
break;
case msptiActivityKind::MSPTI_ACTIVITY_KIND_ACL_API:
ProcessApiData(ReinterpretConvert<msptiActivityApi*>(record),
msptiActivityKind::MSPTI_ACTIVITY_KIND_ACL_API);
break;
case msptiActivityKind::MSPTI_ACTIVITY_KIND_NODE_API:
ProcessApiData(ReinterpretConvert<msptiActivityApi*>(record),
msptiActivityKind::MSPTI_ACTIVITY_KIND_NODE_API);
break;
default:
LOG(WARNING) << record->kind << " is not supported for MonitorProcessManager";
break;
}
return ErrCode::SUC;
}
}
}
}