#include "dynolog/src/LibkinetoConfigManager.h"
#include <fmt/core.h>
#include <fmt/format.h>
#include <glog/logging.h>
#include <array>
#include <chrono>
#include <cstddef>
#include <ctime>
#include <exception>
#include <fstream>
#include <functional>
#include <iterator>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include "hbt/src/common/System.h"
#ifdef __linux__
#include <sys/prctl.h>
#endif
namespace dynolog {
namespace {
// Verbosity level used for the VLOG tracing in this file.
const int VerboseLevel = 2;
// A registered process that has not polled for this long is garbage
// collected by runGc(); it is also the run loop's wake-up period.
constexpr std::chrono::seconds kKeepAliveTimeSecs(60);
// Base libkineto config file, re-read on every run loop iteration.
constexpr char kConfigFile[] = "/etc/libkineto.conf";

// Sets the calling thread's name (Linux only; a no-op elsewhere).
// PR_SET_NAME accepts at most 16 bytes including the terminating NUL, so
// names longer than 15 characters are truncated.
inline void setThreadName(const std::string& name) {
#ifdef __linux__
  constexpr std::size_t kMaxBuff = 16;
  std::array<char, kMaxBuff> buff{};
  // std::string::copy is (dest, count, pos): copy at most kMaxBuff - 1
  // characters starting at position 0 so the NUL terminator always fits.
  const std::size_t len = name.copy(buff.data(), kMaxBuff - 1, 0);
  buff[len] = '\0';
  ::prctl(PR_SET_NAME, buff.data(), 0, 0, 0);
#else
  (void)name;
#endif
}
} // namespace
// Appends a "REQUEST_TRACE_ID=<trace_id>" line to `config`.
// Result layout: "\n" + config + "\n" + "REQUEST_TRACE_ID=" + trace_id.
static std::string addTraceIdToConfigString(
    const std::string& trace_id,
    const std::string& config) {
  static constexpr char kTraceIdIdentifier[] = "REQUEST_TRACE_ID";
  std::string out;
  // Two newlines + '=' == 3 extra chars; sizeof() already counts one (NUL).
  out.reserve(config.size() + trace_id.size() + sizeof(kTraceIdIdentifier) + 2);
  out += '\n';
  out += config;
  out += '\n';
  out += kTraceIdIdentifier;
  out += '=';
  out += trace_id;
  return out;
}
// Builds a trace id for `pid`: the decimal std::hash of
// "<hostname>:<pid>:<std::time(nullptr)>". Two requests for the same pid on
// the same host within the same std::time() tick therefore get the same id.
static std::string generateTraceId(int32_t pid) {
  const auto seed = fmt::format(
      "{}:{}:{}", facebook::hbt::getHostName(), pid, std::time(nullptr));
  const std::hash<std::string> hasher{};
  return std::to_string(hasher(seed));
}
// Launches the background run loop (start()); the destructor stops and
// joins it.
LibkinetoConfigManager::LibkinetoConfigManager() {
  managerThread_ = new std::thread([this] { start(); });
}
// Stops the run loop started by the constructor and joins its thread.
LibkinetoConfigManager::~LibkinetoConfigManager() {
  {
    // The run loop reads stopFlag_ while holding mutex_; writing it under the
    // same mutex avoids a data race and makes the update visible before the
    // notification below.
    std::lock_guard<std::mutex> guard(mutex_);
    stopFlag_ = true;
  }
  managerCondVar_.notify_one();
  if (managerThread_ != nullptr) {
    if (managerThread_->joinable()) {
      managerThread_->join();
    }
    delete managerThread_;
    managerThread_ = nullptr;
  }
}
std::shared_ptr<LibkinetoConfigManager> LibkinetoConfigManager::getInstance() {
static auto instance = std::make_shared<LibkinetoConfigManager>();
return instance;
}
void LibkinetoConfigManager::start() {
setThreadName("kinetoConfigMgr");
LOG(INFO) << "Starting LibkinetoConfigManager runloop";
while (true) {
refreshBaseConfig();
std::unique_lock<std::mutex> lock(mutex_);
managerCondVar_.wait_for(lock, kKeepAliveTimeSecs);
if (stopFlag_) {
break;
}
runGc();
}
}
// Returns the full contents of `filename`, or "" when it is null or the file
// cannot be opened. Exceptions thrown while reading are logged instead of
// propagated (this runs on the manager thread, where an escaping exception
// would terminate the process).
static std::string readConfigFromConfigFile(const char* filename) {
  if (filename == nullptr) {
    return "";
  }
  std::ifstream file(filename);
  if (!file) {
    return "";
  }
  std::string conf;
  try {
    conf.assign(
        std::istreambuf_iterator<char>(file), std::istreambuf_iterator<char>());
  } catch (const std::exception& e) {
    LOG(ERROR) << "Error in reading libkineto config from config file: "
               << e.what();
  }
  return conf;
}
// Re-reads kConfigFile and replaces baseConfig_ when the file produced a
// non-empty, different config. An empty read keeps the previous value.
void LibkinetoConfigManager::refreshBaseConfig() {
  const std::string latest = readConfigFromConfigFile(kConfigFile);
  if (latest.empty() || latest == baseConfig_) {
    return;
  }
  std::lock_guard<std::mutex> guard(mutex_);
  baseConfig_ = latest;
}
void LibkinetoConfigManager::runGc() {
auto t = std::chrono::system_clock::now();
int job_count = jobs_.size();
for (auto job_it = jobs_.begin(); job_it != jobs_.end();) {
auto& procs = job_it->second;
for (auto proc_it = procs.begin(); proc_it != procs.end();) {
struct LibkinetoProcess& proc = proc_it->second;
if ((t - proc.lastRequestTime) > kKeepAliveTimeSecs) {
LOG(INFO) << fmt::format(
"Stopped tracking process ({}) from job {}",
fmt::join(proc_it->first, ","),
job_it->first);
onProcessCleanup(proc_it->first);
proc_it = procs.erase(proc_it);
} else {
proc_it++;
}
}
if (procs.empty()) {
LOG(INFO) << "Stopped tracking job " << job_it->first;
jobInstancesPerGpu_.erase(job_it->first);
job_it = jobs_.erase(job_it);
} else {
job_it++;
}
}
if (job_count != jobs_.size()) {
LOG(INFO) << "Tracked jobs: " << jobs_.size();
}
}
// Records `pid` as an instance of `jobId` on `gpu` and returns how many
// distinct pids are now registered for that (job, gpu) pair.
int32_t LibkinetoConfigManager::registerLibkinetoContext(
    const std::string& jobId,
    int32_t pid,
    int32_t gpu) {
  std::lock_guard<std::mutex> guard(mutex_);
  auto& pidsOnGpu = jobInstancesPerGpu_[jobId][gpu];
  pidsOnGpu.insert(pid);
  LOG(INFO) << fmt::format("Registered process ({}) for job {}.", pid, jobId);
  return static_cast<int32_t>(pidsOnGpu.size());
}
std::string LibkinetoConfigManager::obtainOnDemandConfig(
const std::string& jobId,
const std::vector<int32_t>& pids,
int32_t configType) {
VLOG(VerboseLevel) << fmt::format(
"obtainOnDemandConfig({}, ({}), {})",
jobId,
fmt::join(pids, ","),
configType);
std::string ret;
std::set<int32_t> pids_set(pids.begin(), pids.end());
std::lock_guard<std::mutex> guard(mutex_);
auto _emplace_result = jobs_[jobId].emplace(pids_set, LibkinetoProcess{});
const auto& it = _emplace_result.first;
bool newProcess = _emplace_result.second;
struct LibkinetoProcess& process = it->second;
if (newProcess) {
process.pid = pids[0];
LOG(INFO) << fmt::format(
"Registered process ({}) for job {}.", fmt::join(pids, ", "), jobId);
onRegisterProcess(pids_set);
}
if ((configType & int(LibkinetoConfigType::EVENTS)) &&
!process.eventProfilerConfig.empty()) {
ret += process.eventProfilerConfig + "\n";
process.eventProfilerConfig.clear();
}
if ((configType & int(LibkinetoConfigType::ACTIVITIES)) &&
!process.activityProfilerConfig.empty()) {
ret += process.activityProfilerConfig + "\n";
process.activityProfilerConfig.clear();
}
process.lastRequestTime = std::chrono::system_clock::now();
return ret;
}
// Installs `config` into one matched process for each profiler kind selected
// by `configType`, recording the outcome in `res`:
//  - the process pid is always appended to res.processesMatched;
//  - a kind is only considered while fewer than `limit` processes have been
//    triggered for it (a negative limit imposes no cap);
//  - a process whose config slot for that kind is still occupied counts as
//    busy instead of being overwritten;
//  - activity configs get a generated REQUEST_TRACE_ID appended, and the id
//    is reported in res.traceIds.
void LibkinetoConfigManager::setOnDemandConfigForProcess(
    GpuProfilerResult& res,
    LibkinetoProcess& process,
    const std::string& config,
    int32_t configType,
    int32_t limit) {
  res.processesMatched.push_back(process.pid);

  const auto belowLimit = [limit](std::size_t triggered) {
    return limit < 0 || triggered < static_cast<std::size_t>(limit);
  };
  const bool eventsRequested =
      (configType & int(LibkinetoConfigType::EVENTS)) != 0;
  const bool activitiesRequested =
      (configType & int(LibkinetoConfigType::ACTIVITIES)) != 0;

  if (eventsRequested && belowLimit(res.eventProfilersTriggered.size())) {
    if (!process.eventProfilerConfig.empty()) {
      res.eventProfilersBusy++;
    } else {
      process.eventProfilerConfig = config;
      res.eventProfilersTriggered.push_back(process.pid);
    }
  }

  if (activitiesRequested &&
      belowLimit(res.activityProfilersTriggered.size())) {
    if (!process.activityProfilerConfig.empty()) {
      res.activityProfilersBusy++;
    } else {
      preCheckOnDemandConfig(process);
      const std::string trace_id = generateTraceId(process.pid);
      std::string tracedConfig = addTraceIdToConfigString(trace_id, config);
      res.activityProfilersTriggered.push_back(process.pid);
      process.activityProfilerConfig = std::move(tracedConfig);
      res.traceIds.push_back(trace_id);
      LOG(INFO) << " PID: " << process.pid << ", Trace Id: " << trace_id;
    }
  }
}
// Applies an on-demand profiling `config` to the registered processes of
// `jobId` selected by `pids`, and returns a summary of what happened.
// An empty `pids`, or exactly {0}, selects every process of the job;
// otherwise a process is selected if any of its pids is in `pids`.
// onSetOnDemandConfig(pids) is called when at least one activity profiler
// was triggered.
GpuProfilerResult LibkinetoConfigManager::setOnDemandConfig(
    const std::string& jobId,
    const std::set<int32_t>& pids,
    const std::string& config,
    int32_t configType,
    int32_t limit) {
  LOG(INFO) << fmt::format(
      "Initiating on-demand GPU profiling for job ID {}, pids [{}]",
      jobId,
      fmt::join(pids, ","));

  GpuProfilerResult res;
  res.activityProfilersBusy = 0;
  res.eventProfilersBusy = 0;

  const bool traceAllPids =
      pids.empty() || (pids.size() == 1 && *pids.begin() == 0);
  // A process with no pids is never selected, even in trace-all mode.
  const auto isSelected = [&pids, traceAllPids](const auto& processPids) {
    for (const auto& pid : processPids) {
      if (traceAllPids || pids.count(pid) != 0) {
        return true;
      }
    }
    return false;
  };

  {
    std::lock_guard<std::mutex> guard(mutex_);
    const auto jobIt = jobs_.find(jobId);
    if (jobIt != jobs_.end()) {
      for (auto& [processPids, process] : jobIt->second) {
        if (isSelected(processPids)) {
          setOnDemandConfigForProcess(
              res, process, config, configType, limit);
        }
      }
      if (!res.activityProfilersTriggered.empty()) {
        onSetOnDemandConfig(pids);
      }
    }
  }

  LOG(INFO) << "On-demand request: " << res.processesMatched.size()
            << " matching processes";
  if (configType & int(LibkinetoConfigType::EVENTS)) {
    LOG(INFO) << "Installed event profiler config for "
              << res.eventProfilersTriggered.size() << " process(es) " << "("
              << res.eventProfilersBusy << " busy)";
  }
  if (configType & int(LibkinetoConfigType::ACTIVITIES)) {
    LOG(INFO) << "Installed activity profiler config for "
              << res.activityProfilersTriggered.size() << " process(es) "
              << "(" << res.activityProfilersBusy << " busy)";
  }
  return res;
}
// Returns how many processes (distinct pid sets) are currently tracked for
// `jobId`, or 0 if the job is unknown. The result is also logged.
int LibkinetoConfigManager::processCount(const std::string& jobId) const {
  std::lock_guard<std::mutex> guard(mutex_);
  const auto jobIt = jobs_.find(jobId);
  const int count =
      (jobIt == jobs_.end()) ? 0 : static_cast<int>(jobIt->second.size());
  LOG(INFO) << "Process count for job ID " << jobId << ": " << count;
  return count;
}
// Stores `status` as the trace status (msgType == kLibkinetoTraceStatus) or
// the monitor status (msgType == kLibkinetoMonitorStatus); any other msgType
// is ignored. The status is kept as a single value, not per job or process:
// `jobId` and `pid` are currently unused.
void LibkinetoConfigManager::updateNpuStatus(
    const std::string& jobId,
    int32_t pid,
    int32_t status,
    const std::string& msgType) {
  (void)jobId;
  (void)pid;
  std::lock_guard<std::mutex> guard(mutex_);
  if (msgType == kLibkinetoTraceStatus) {
    npuTraceStatus_ = status;
    return;
  }
  if (msgType == kLibkinetoMonitorStatus) {
    npuMonitorStatus_ = status;
  }
}
// Returns the NPU trace status most recently stored (updateNpuStatus() is
// the writer visible in this file), read under mutex_.
int32_t LibkinetoConfigManager::getNpuTraceStatus() {
  const std::lock_guard<std::mutex> lock(mutex_);
  return npuTraceStatus_;
}
// Returns the NPU monitor status most recently stored (updateNpuStatus() is
// the writer visible in this file), read under mutex_.
int32_t LibkinetoConfigManager::getNpuMonitorStatus() {
  const std::lock_guard<std::mutex> lock(mutex_);
  return npuMonitorStatus_;
}
}