* Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved.
*
* ubs-optimizer is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "data_receiver.h"
#include <filesystem>
#include <fstream>
#include <iostream>
#include <utility>
#include "log/ebpf_logger_macros.h"
namespace fs = std::filesystem;
DataReceiver::DataReceiver(std::shared_ptr<MutexContext> ctx, size_t bufferSize, int flushIntervalSec,
std::string outputPath)
: context(std::move(ctx)),
maxBufferSize(bufferSize),
flushInterval(flushIntervalSec),
outputPath(std::move(outputPath)),
stopFlag(false)
{
flushThread = std::thread(&DataReceiver::flushTimer, this);
}
DataReceiver::~DataReceiver()
{
{
std::unique_lock<std::mutex> lock(mtx);
stopFlag = true;
condVar.notify_all();
}
if (flushThread.joinable()) {
flushThread.join();
}
writeToDisk();
}
void DataReceiver::save(const std::string &dataJson)
{
std::unique_lock<std::mutex> lock(mtx);
jsonBuffer.push_back(dataJson);
if (jsonBuffer.size() >= maxBufferSize) {
writeToDisk();
}
}
void DataReceiver::flushTimer()
{
std::unique_lock<std::mutex> lock(mtx);
while (!stopFlag) {
if (condVar.wait_for(lock, std::chrono::seconds(flushInterval), [this]() { return stopFlag; })) {
break;
}
writeToDisk();
}
}
void DataReceiver::writeToDisk()
{
if (!context) {
EBPF_LOG_ERROR("Error: pointer is null!");
return;
}
std::lock_guard<std::mutex> lock(context->fileMutex);
if (jsonBuffer.empty()) {
return;
}
fs::path pathObj(outputPath);
fs::path parentDir = pathObj.parent_path();
if (!parentDir.empty() && !fs::exists(parentDir)) {
fs::path curDir;
for (const auto &part : parentDir) {
curDir /= part;
if (!fs::exists(curDir)) {
fs::create_directories(curDir);
fs::permissions(curDir, fs::perms::owner_all);
}
}
}
bool fileExits = fs::exists(pathObj);
std::ofstream ofs(outputPath, std::ios::app);
if (!ofs.is_open()) {
EBPF_LOG_ERROR("Failed to open file: " + outputPath);
return;
}
if (fileExits ^ fs::exists(pathObj)) {
fs::permissions(outputPath, fs::perms::owner_read | fs::perms::owner_write);
}
for (const auto &jsonLine : jsonBuffer) {
EBPF_LOG_DEBUG("Flushing " + jsonLine);
ofs << jsonLine << "\n";
}
jsonBuffer.clear();
}