#include "FtracePlugin.h"
#include "../../../include/common/constant.h"
extern "C" pid_t g_hooked_pid;
/**
 * Creates an idle ftrace plugin.
 *
 * No collection is active and the stop flag is raised. The plugin directory
 * under the sys-trace root is created if missing. The merged output path is
 * precomputed as:
 *   <root><pluginName_>/<id>_<hooked pid>_rank_<local rank>.log
 */
FtracePlugin::FtracePlugin() {
    pluginName_ = PluginNameType::FTRACE_PLUGIN.data();
    active_.store(false);
    stop_flag_.store(true);

    std::string plugin_dir =
        std::string(get_sys_trace_root_dir()) + pluginName_;
    systrace::util::fs_utils::CreateDirectoryIfNotExists(plugin_dir);

    std::string log_path = plugin_dir;
    log_path += "/";
    log_path += get_id();
    log_path += "_";
    log_path += std::to_string(g_hooked_pid);
    log_path += "_rank_";
    log_path += std::to_string(get_local_rank());
    log_path += ".log";
    output_ = log_path;
}
/**
 * Shuts down any collection that is still running.
 *
 * stop() returns immediately when the stop flag is already raised (idle
 * plugin) or on non-zero local ranks.
 */
FtracePlugin::~FtracePlugin() {
    this->stop();
}
bool FtracePlugin::start(const json ¶ms, int duration) {
if (systrace::util::config::GlobalConfig::Instance().local_rank != 0)
return true;
bool expected = false;
if (!active_.compare_exchange_strong(expected, true)) {
LOG_MODULE(INFO, pluginName_) << "already starting";
return true;
}
try {
task_config_ = parse_config_from_json(params);
ftrace_path_ = PluginUtils::ensure_ftrace_path();
std::vector<int> target_cpus;
PluginUtils::parse_cpu_list(task_config_.cpu_list, target_cpus);
if (!init_ftrace())
throw std::runtime_error("Ftrace init fail");
output_dir_ =
std::string(get_sys_trace_root_dir()) + pluginName_ + "/tmp";
mkdir(output_dir_.c_str(), 0755);
stop_flag_.store(false);
collect_duration_ = duration;
for (int cpu_id : target_cpus) {
CpuReader reader;
reader.cpu_id = cpu_id;
std::string raw_path = ftrace_path_ + "per_cpu/cpu" +
std::to_string(cpu_id) + "/trace_pipe";
reader.raw_fd = open(raw_path.c_str(), O_RDONLY | O_NONBLOCK);
std::string out_path =
output_dir_ + "/cpu" + std::to_string(cpu_id) + ".raw";
reader.out_fd =
open(out_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
if (pipe(reader.pipe_fds) == -1)
throw std::runtime_error("Pipe fail");
fcntl(reader.pipe_fds[0], F_SETPIPE_SZ, K_SPLICE_SIZE);
fcntl(reader.pipe_fds[1], F_SETPIPE_SZ, K_SPLICE_SIZE);
if (reader.raw_fd < 0 || reader.out_fd < 0)
throw std::runtime_error("Open per-cpu FD fail");
cpu_readers_.push_back(std::move(reader));
}
for (auto &reader : cpu_readers_) {
reader.worker = std::thread(&FtracePlugin::per_cpu_splice_loop,
this, std::ref(reader));
}
if (duration > 0) {
systrace::utils::TimerManager::getInstance().startTimer(
get_id(), duration, [this]() { this->stop(); });
}
LOG_MODULE(INFO, pluginName_)
<< "Parallel Ftrace started. Output file: " << output_;
return true;
} catch (const std::exception &e) {
LOG_MODULE(ERROR, pluginName_) << "Start failed: " << e.what();
reset_plugin_state();
return false;
}
}
void FtracePlugin::stop() {
if (get_local_rank() != 0)
return;
if (stop_flag_.exchange(true))
return;
if (active_.load()) {
try {
PluginUtils::write_file(ftrace_path_ + "tracing_on", "0");
systrace::utils::TimerManager::getInstance().stopTimer(get_id());
for (auto &reader : cpu_readers_) {
if (reader.worker.joinable())
reader.worker.join();
}
cleanup_resources();
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
LOG_MODULE(DEBUG, pluginName_)
<< "Parallel ftrace stop. All CPU buffers flushed.";
reset_ftrace();
LOG_MODULE(DEBUG, pluginName_)
<< "All collection threads joined. Starting data aggregation "
"and processing...";
process_raw_data();
active_.store(false);
output_dir_.clear();
cpu_readers_.clear();
LOG_MODULE(INFO, pluginName_) << "Parallel ftrace stop.";
} catch (const std::exception &e) {
LOG_MODULE(ERROR, pluginName_) << "stop failed: " << e.what();
return;
}
}
}
namespace {
// Moves exactly `len` bytes that already sit in a pipe into `out_fd`.
// splice() may transfer fewer bytes than requested or be interrupted by a
// signal, so keep going until everything is written. Any leftover would
// otherwise stay in the pipe, fill it, and make the source splice report
// EAGAIN forever. Returns false on a hard error, or on a zero-length
// transfer that would otherwise spin.
bool splice_exactly(int pipe_read_fd, int out_fd, size_t len) {
    while (len > 0) {
        const ssize_t n =
            splice(pipe_read_fd, nullptr, out_fd, nullptr, len, SPLICE_F_MOVE);
        if (n > 0) {
            len -= static_cast<size_t>(n);
        } else if (n < 0 && errno == EINTR) {
            continue;
        } else {
            return false;
        }
    }
    return true;
}
} // namespace

/**
 * Worker body: copies one CPU's trace_pipe into its ".raw" file via an
 * intermediate pipe, using zero-copy splice().
 *
 * Runs on its own std::thread, one per CpuReader, started by start().
 * - Data is drained as long as it keeps arriving.
 * - When trace_pipe is empty (EAGAIN), the worker exits if a stop was
 *   requested (stop_flag_) or collect_duration_ seconds have elapsed.
 *   Otherwise it sleeps 10 ms and polls again.
 * - Any other error, EOF, or a failed write to the output file also ends the
 *   loop. Interrupted calls (EINTR) are retried.
 * The output file is fsync'ed before returning.
 */
void FtracePlugin::per_cpu_splice_loop(CpuReader &reader) {
    const auto start_time = std::chrono::steady_clock::now();
    while (true) {
        bool should_exit = stop_flag_.load();
        if (collect_duration_ > 0) {
            const auto elapsed =
                std::chrono::duration_cast<std::chrono::seconds>(
                    std::chrono::steady_clock::now() - start_time)
                    .count();
            if (elapsed >= collect_duration_)
                should_exit = true;
        }

        const ssize_t moved =
            splice(reader.raw_fd, nullptr, reader.pipe_fds[1], nullptr,
                   K_SPLICE_SIZE, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
        if (moved > 0) {
            if (!splice_exactly(reader.pipe_fds[0], reader.out_fd,
                                static_cast<size_t>(moved)))
                break; // output file is no longer writable
            continue;  // keep draining while data flows
        }
        if (moved < 0 && errno == EINTR)
            continue;
        if (moved < 0 && (errno == EAGAIN || errno == EWOULDBLOCK) &&
            !should_exit) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            continue;
        }
        // Drained after a stop request, EOF, or a hard error.
        break;
    }
    fsync(reader.out_fd);
}
void FtracePlugin::process_raw_data() {
std::remove(output_.c_str());
std::ofstream outfile(output_,
std::ios::out | std::ios::trunc | std::ios::binary);
for (const auto &reader : cpu_readers_) {
std::string part_path =
output_dir_ + "/cpu" + std::to_string(reader.cpu_id) + ".raw";
std::ifstream infile(part_path);
if (infile.is_open()) {
outfile << infile.rdbuf();
infile.close();
std::remove(part_path.c_str());
}
}
outfile.close();
}
/**
 * Returns this process's local rank as recorded in the global configuration.
 * Only rank 0 performs ftrace collection (see start()/stop()).
 */
int FtracePlugin::get_local_rank() {
    const auto &global_config =
        systrace::util::config::GlobalConfig::Instance();
    return global_config.local_rank;
}
bool FtracePlugin::init_ftrace() {
reset_ftrace();
std::vector<int> cpus;
PluginUtils::parse_cpu_list(task_config_.cpu_list, cpus);
PluginUtils::write_file(ftrace_path_ + "tracing_cpumask",
PluginUtils::generate_cpu_mask(cpus));
PluginUtils::write_file(ftrace_path_ + "buffer_size_kb",
task_config_.core_config.buffer_size_kb);
for (const auto &ev : task_config_.core_config.enable_events) {
PluginUtils::write_file(ftrace_path_ + "events/" + ev + "/enable", "1");
}
if (!task_config_.core_config.trace_functions.empty()) {
PluginUtils::write_file(ftrace_path_ + "current_tracer",
task_config_.core_config.function_tracer);
PluginUtils::write_file(ftrace_path_ + "set_ftrace_filter",
task_config_.core_config.trace_functions);
PluginUtils::write_file(
ftrace_path_ + "options/func_stack_trace",
task_config_.core_config.func_stack_trace ? "1" : "0");
}
if (!task_config_.core_config.ftrace_pid.empty()) {
PluginUtils::write_file(ftrace_path_ + "set_ftrace_pid",
task_config_.core_config.ftrace_pid);
} else {
PluginUtils::write_file(ftrace_path_ + "set_ftrace_pid", "");
}
if (!task_config_.core_config.event_pid.empty()) {
PluginUtils::write_file(ftrace_path_ + "set_event_pid",
task_config_.core_config.event_pid);
} else {
PluginUtils::write_file(ftrace_path_ + "set_event_pid", "");
}
PluginUtils::write_file(ftrace_path_ + "trace_clock", "boot");
PluginUtils::write_file(ftrace_path_ + "options/stacktrace",
task_config_.core_config.stack_trace ? "1" : "0");
PluginUtils::write_file(ftrace_path_ + "tracing_on", "1");
return true;
}
/**
 * Best-effort restore of the tracefs instance at ftrace_path_.
 *
 * Does nothing while the path is still unknown. Otherwise, in order:
 * - Switches tracing off, disables all events, and clears the function
 *   filter, the graph-function list and the ftrace pid filter. Any failure of
 *   these writes is ignored so the remaining knobs are still reset.
 * - Sets the current tracer back to "nop". A std::exception from that last
 *   write is logged as a warning.
 */
void FtracePlugin::reset_ftrace() {
    if (ftrace_path_.empty()) {
        return;
    }
    struct Knob {
        const char *node;
        const char *value;
    };
    static const Knob kBestEffortResets[] = {
        {"tracing_on", "0"},
        {"events/enable", "0"},
        {"set_ftrace_filter", ""},
        {"set_graph_function", ""},
        {"set_ftrace_pid", ""},
    };
    for (const Knob &knob : kBestEffortResets) {
        try {
            PluginUtils::write_file(ftrace_path_ + knob.node, knob.value);
        } catch (...) {
            // Deliberately ignored: keep resetting the remaining knobs.
        }
    }
    try {
        PluginUtils::write_file(ftrace_path_ + "current_tracer", "nop");
    } catch (const std::exception &e) {
        LOG_MODULE(WARN, pluginName_) << "Final Tracer reset still busy. This "
                                         "usually means an unclosed FD exists. "
                                      << e.what();
    }
}
/**
 * Builds a TraceTaskConfig from the JSON task parameters.
 *
 * Recognised keys:
 *   cpu_list            string, required
 *   events              array of strings, or a string split by
 *                       PluginUtils::split_string; empty entries are dropped
 *   func, set_ftrace_pid, set_event_pid
 *                       string; commas are converted to spaces
 *   buffer_size_kb, function_tracer
 *                       string
 *   event_stack_trace, func_stack_trace
 *                       parsed with PluginUtils::safe_parse_boolean
 *
 * @throws std::runtime_error on a missing or mistyped field. The message
 *         keeps the underlying cause (e.g. "miss cpu_list") so callers that
 *         log e.what() can tell what was wrong.
 */
TraceTaskConfig
FtracePlugin::parse_config_from_json(const nlohmann::json &params) {
    TraceTaskConfig config;
    // Reads `key`, which must hold a JSON string; otherwise throws `type_error`.
    const auto string_field = [&params](const char *key,
                                        const char *type_error) {
        const auto &value = params.at(key);
        if (!value.is_string())
            throw std::invalid_argument(type_error);
        return value.get<std::string>();
    };
    // Comma-separated input becomes the space-separated form later written
    // to tracefs by init_ftrace().
    const auto commas_to_spaces = [](std::string text) {
        std::replace(text.begin(), text.end(), ',', ' ');
        return text;
    };
    try {
        if (!params.contains("cpu_list"))
            throw std::invalid_argument("miss cpu_list");
        config.cpu_list = params.at("cpu_list").get<std::string>();

        auto &core = config.core_config;
        if (params.contains("events")) {
            const auto &events = params.at("events");
            if (events.is_array()) {
                core.enable_events = events.get<std::vector<std::string>>();
            } else if (events.is_string()) {
                core.enable_events =
                    PluginUtils::split_string(events.get<std::string>());
            } else {
                throw std::invalid_argument("events type error");
            }
            core.enable_events.erase(
                std::remove_if(core.enable_events.begin(),
                               core.enable_events.end(),
                               [](const std::string &s) { return s.empty(); }),
                core.enable_events.end());
        }
        if (params.contains("func")) {
            core.trace_functions =
                commas_to_spaces(string_field("func", "func type error"));
        }
        if (params.contains("set_ftrace_pid")) {
            core.ftrace_pid = commas_to_spaces(
                string_field("set_ftrace_pid", "trace_pid type error"));
        }
        if (params.contains("set_event_pid")) {
            core.event_pid = commas_to_spaces(
                string_field("set_event_pid", "event_pid type error"));
        }
        if (params.contains("buffer_size_kb")) {
            core.buffer_size_kb =
                string_field("buffer_size_kb", "buffer_size_kb type error");
        }
        if (params.contains("event_stack_trace")) {
            core.stack_trace =
                PluginUtils::safe_parse_boolean(params.at("event_stack_trace"));
        }
        if (params.contains("func_stack_trace")) {
            core.func_stack_trace =
                PluginUtils::safe_parse_boolean(params.at("func_stack_trace"));
        }
        if (params.contains("function_tracer")) {
            core.function_tracer =
                string_field("function_tracer", "function_tracer type error");
        }
    } catch (const std::exception &e) {
        throw std::runtime_error(
            std::string("parse_config_from_json error: ") + e.what());
    } catch (...) {
        throw std::runtime_error("parse_config_from_json error");
    }
    return config;
}
/**
 * Closes every file descriptor held by the per-CPU readers and marks each one
 * as -1, so calling this again is harmless.
 *
 * Worker threads are not joined here. The reader list itself is left intact.
 */
void FtracePlugin::cleanup_resources() {
    // Closes a single descriptor if it is open and marks it closed.
    const auto release = [](auto &fd) {
        if (fd < 0)
            return;
        close(fd);
        fd = -1;
    };
    for (auto &r : cpu_readers_) {
        release(r.raw_fd);
        release(r.out_fd);
        // pipe() creates both ends together, so the read end decides whether
        // the pair is open.
        if (r.pipe_fds[0] >= 0) {
            close(r.pipe_fds[0]);
            close(r.pipe_fds[1]);
            r.pipe_fds[0] = -1;
            r.pipe_fds[1] = -1;
        }
    }
}
void FtracePlugin::reset_plugin_state() {
active_.store(false);
stop_flag_.store(true);
cleanup_resources();
}