* Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "jsonl/JsonlDataDumper.h"
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <cstdlib>
#include "utils.h"
namespace dynolog_npu
{
namespace ipc_monitor
{
namespace jsonl
{
namespace
{
/**
 * @brief Read a rotate-log setting from the process environment.
 *
 * @param envName   Name of the environment variable to query.
 * @param paramName Setting name; used only in the informational log line.
 * @return The variable's value when it is set and non-empty, otherwise an empty string.
 */
inline std::string GetRotateLogEnvValue(const char *envName, const std::string &paramName)
{
    const char *envVal = std::getenv(envName);
    // An unset variable and a variable set to "" are both treated as "not provided".
    if (envVal == nullptr || envVal[0] == '\0')
    {
        return "";
    }
    LOG(INFO) << "Using " << paramName << " from environment variable: " << envVal;
    return envVal;
}
/**
 * @brief Pick the textual value of a rotate-log setting.
 *
 * A non-empty explicit parameter wins; otherwise the environment variable is consulted.
 *
 * @param paramValue Value passed in by the caller; empty means "not provided".
 * @param envName    Environment variable used as the fallback source.
 * @param paramName  Setting name; used only in log lines.
 * @return The chosen value, or an empty string when neither source provides one.
 */
inline std::string ResolveRotateLogValue(const std::string &paramValue, const char *envName,
                                         const std::string &paramName)
{
    if (paramValue.empty())
    {
        return GetRotateLogEnvValue(envName, paramName);
    }
    LOG(INFO) << "Using " << paramName << " from parameter: " << paramValue;
    return paramValue;
}
/**
 * @brief Convert a textual setting with the supplied converter and warn on failure.
 *
 * @param value     Output slot handed to the converter.
 * @param valueStr  Text to convert.
 * @param converter Callable invoked as converter(value, valueStr); a truthy result means success.
 * @param funcName  Caller name, used only in the warning message.
 * @param valueName Setting name, used only in the warning message.
 * @return true when the converter reports success, false otherwise.
 */
template <typename T, typename Converter>
bool ParseRotateLogValue(T &value, const std::string &valueStr, Converter converter, const std::string &funcName,
                         const std::string &valueName)
{
    const bool converted = static_cast<bool>(converter(value, valueStr));
    if (!converted)
    {
        LOG(WARNING) << "Jsonl " << funcName << " invalid " << valueName << ": " << valueStr;
    }
    return converted;
}
/**
 * @brief Determine how many lines a JSONL log file may hold before it is rotated.
 *
 * The value comes from the parameter or, failing that, from
 * MSMONITOR_JSONL_ROTATE_LOG_LINES. A parsed value is clamped to [100, 500000];
 * a missing or unparsable value yields the default of 10000.
 *
 * @param jsons_rotate_log_lines Textual line limit supplied by the caller (may be empty).
 * @return The effective line limit.
 */
uint32_t GetRotateLogLines(const std::string &jsons_rotate_log_lines)
{
    constexpr uint32_t DEFAULT_LINES = 10000;
    constexpr uint32_t MAX_LINES = 500000;
    constexpr uint32_t MIN_LINES = 100;

    const std::string rawLines =
        ResolveRotateLogValue(jsons_rotate_log_lines, "MSMONITOR_JSONL_ROTATE_LOG_LINES", "jsons_rotate_log_lines");
    if (rawLines.empty())
    {
        // Nothing configured anywhere: silently use the default.
        return DEFAULT_LINES;
    }

    uint32_t requested = DEFAULT_LINES;
    if (!ParseRotateLogValue(requested, rawLines, Str2Uint32, "GetRotateLogLines", "lines"))
    {
        LOG(WARNING) << "Use default rotate log lines: " << DEFAULT_LINES;
        return DEFAULT_LINES;
    }

    const uint32_t bounded = std::clamp(requested, MIN_LINES, MAX_LINES);
    if (bounded != requested)
    {
        LOG(WARNING) << "Jsonl GetRotateLogLines lines: " << requested << " is out of range [" << MIN_LINES << ", "
                     << MAX_LINES << "],"
                     << " clamped to " << bounded;
    }
    return bounded;
}
/**
 * @brief Determine how many rotated JSONL files are kept.
 *
 * The value comes from the parameter or, failing that, from
 * MSMONITOR_JSONL_ROTATE_LOG_FILES. Accepted values are -1 and anything >= 2;
 * every other input (missing, unparsable, or out of range) yields -1.
 *
 * @param jsons_rotate_log_files Textual file limit supplied by the caller (may be empty).
 * @return The accepted file limit, or -1.
 */
int32_t GetRotateLogMaxFiles(const std::string &jsons_rotate_log_files)
{
    constexpr int32_t DEFAULT_NOT_ROTATE = -1;
    constexpr int32_t MIN_ROTATE_FILES = 2;

    const std::string rawFiles =
        ResolveRotateLogValue(jsons_rotate_log_files, "MSMONITOR_JSONL_ROTATE_LOG_FILES", "jsons_rotate_log_files");
    if (rawFiles.empty())
    {
        return DEFAULT_NOT_ROTATE;
    }

    int32_t requested = DEFAULT_NOT_ROTATE;
    const bool parsedOk = ParseRotateLogValue(requested, rawFiles, Str2Int32, "GetRotateLogMaxFiles", "maxFiles");
    if (parsedOk)
    {
        const bool acceptable = (requested == DEFAULT_NOT_ROTATE) || (requested >= MIN_ROTATE_FILES);
        if (acceptable)
        {
            return requested;
        }
        LOG(WARNING) << "Jsonl GetRotateLogMaxFiles invalid maxFiles: " << requested
                     << ", rotate log maxFiles must be -1 or greater than or equal to " << MIN_ROTATE_FILES;
    }

    // Reached for both an unparsable value and a parsed-but-rejected one.
    LOG(WARNING) << "Use default rotate log maxFiles: " << DEFAULT_NOT_ROTATE;
    return DEFAULT_NOT_ROTATE;
}
}
/**
 * @brief Prepare the dumper: output directory, buffer, dump interval and rotating logger.
 *
 * @param dirPath                 Directory handed to the RotateLogger.
 * @param capacity                Capacity passed to the data buffer's Init().
 * @param maxDumpIntervalMs       Maximum time between dumps, in milliseconds.
 * @param jsons_rotate_log_lines  Textual per-file line limit (may be empty).
 * @param jsons_rotate_log_files  Textual rotated-file limit (may be empty).
 */
void JsonlDataDumper::Init(const std::string &dirPath, size_t capacity, uint32_t maxDumpIntervalMs,
                           const std::string &jsons_rotate_log_lines, const std::string &jsons_rotate_log_files)
{
    dumpDir_ = dirPath;
    dataBuf_.Init(capacity);
    // NOTE(review): init_ becomes true before rotateLogger_ exists; confirm Start()
    // cannot be called concurrently with Init().
    init_.store(true);
    maxDumpIntervalMs_ = std::chrono::milliseconds(maxDumpIntervalMs);
    lastDumpTime_ = std::chrono::steady_clock::now();

    const auto linesPerFile = GetRotateLogLines(jsons_rotate_log_lines);
    const auto keepFiles = GetRotateLogMaxFiles(jsons_rotate_log_files);
    rotateLogger_ = std::make_unique<RotateLogger>(dumpDir_, linesPerFile, keepFiles);

    std::string rotateSuffix;
    if (keepFiles < 0)
    {
        rotateSuffix = " not rotate";
    }
    else
    {
        rotateSuffix = ", maxFiles: " + std::to_string(keepFiles);
    }
    LOG(INFO) << "JsonlDataDumper Init, logLines: " << linesPerFile << rotateSuffix;
}
/**
 * @brief Tear down the dumper: release the buffer, clear the state flags and
 *        un-initialise the rotating logger if one was created.
 */
void JsonlDataDumper::UnInit()
{
    const bool wasInitialized = init_.load();
    if (wasInitialized)
    {
        dataBuf_.UnInit();
        init_.store(false);
        start_.store(false);
    }

    // The logger is handled independently of init_.
    if (rotateLogger_)
    {
        rotateLogger_->UnInit();
    }
}
/**
 * @brief Launch the dumper thread; does nothing unless Init() has already run.
 *
 * Run() leaves its loop as soon as it observes start_ == false. The flag is therefore
 * raised before Thread::Start() is called, so the new thread cannot read a stale false
 * and exit immediately. (Presumably Thread::Start() executes Run() on the new thread;
 * verify against the Thread base class.)
 */
void JsonlDataDumper::Start()
{
    if (!init_.load())
    {
        return;
    }
    // exchange() also acts as an "already started" guard: a repeated Start() neither
    // launches another worker nor clears the flag of the one that is running.
    if (start_.exchange(true))
    {
        return;
    }
    if (Thread::Start() != 0)
    {
        // Launch failed: restore the flag so Record() keeps rejecting data.
        start_.store(false);
    }
}
/**
 * @brief Stop the dumper thread if it is marked as started, then write out whatever
 *        is still buffered.
 */
void JsonlDataDumper::Stop()
{
    const bool running = start_.load();
    if (running)
    {
        // Clear the flag first so Run() can leave its loop, then stop the thread.
        start_.store(false);
        Thread::Stop();
    }
    // Flush runs whether or not the thread was marked as started.
    Flush();
}
void JsonlDataDumper::DumpData()
{
uint32_t batchSize = 0;
while (batchSize < kNotifyInterval)
{
std::unique_ptr<nlohmann::json> json = nullptr;
if (!dataBuf_.Pop(json) || json == nullptr)
{
break;
}
std::string encodeData = json->dump() + "\n";
rotateLogger_->Log(std::move(encodeData));
++batchSize;
}
lastDumpTime_ = std::chrono::steady_clock::now();
}
/**
 * @brief Dumper loop, active while start_ is true.
 *
 * Each iteration does exactly one of:
 *  - dump a batch when more than kNotifyInterval records are buffered;
 *  - dump and flush when maxDumpIntervalMs_ has elapsed since the last dump;
 *  - otherwise sleep for kMaxWaitTimeUs microseconds.
 */
void JsonlDataDumper::Run()
{
    while (start_.load())
    {
        if (dataBuf_.Size() > kNotifyInterval)
        {
            DumpData();
            continue;
        }

        const auto now = std::chrono::steady_clock::now();
        const auto sinceLastDump = std::chrono::duration_cast<std::chrono::milliseconds>(now - lastDumpTime_);
        if (sinceLastDump < maxDumpIntervalMs_)
        {
            usleep(kMaxWaitTimeUs);
            continue;
        }

        // Timeout path: write what is buffered and flush the logger.
        DumpData();
        rotateLogger_->Flush();
        LOG(INFO) << "JsonlDataDumper DumpData for timeout, timeout: " << maxDumpIntervalMs_.count() << "ms";
    }
}
/**
 * @brief Keep dumping batches until the buffer reports a size of zero.
 */
void JsonlDataDumper::Flush()
{
    for (;;)
    {
        if (dataBuf_.Size() == 0)
        {
            return;
        }
        DumpData();
    }
}
/**
 * @brief Queue one JSON record for dumping.
 *
 * The record is dropped when the dumper is not started or when @p data is null.
 *
 * @param data Record to enqueue; ownership moves into the buffer on success.
 */
void JsonlDataDumper::Record(std::unique_ptr<nlohmann::json> data)
{
    const bool accept = start_.load() && data != nullptr;
    if (accept)
    {
        dataBuf_.Push(std::move(data));
    }
}
}
}
}