* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: Test logging performance.
*/
#include "datasystem/common/log/log.h"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cmath>
#include <dirent.h>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <numeric>
#include <string>
#include <thread>
#include <vector>
#include "ut/common.h"
#include "datasystem/common/constants.h"
#include "datasystem/common/flags/flags.h"
#include "datasystem/common/log/trace.h"
#include "datasystem/common/log/spdlog/provider.h"
#include "datasystem/common/log/logging.h"
#include "datasystem/common/log/log_sampler.h"
DS_DECLARE_bool(alsologtostderr);
DS_DECLARE_bool(logtostderr);
DS_DECLARE_bool(log_async);
DS_DECLARE_double(request_sample_rate);
DS_DECLARE_double(access_sample_rate);
DS_DECLARE_double(diagnostic_sample_rate);
DS_DECLARE_string(log_dir);
DS_DECLARE_uint32(max_log_size);
DS_DECLARE_uint32(stderrthreshold);
namespace datasystem {
namespace ut {
constexpr size_t LOG_PREFIX_ESTIMATED_SIZE = 85;
constexpr double MICROSEC_TO_MILLISEC = 1e3;
constexpr double MICROSEC_TO_SEC = 1e6;
constexpr int WARMUP_ITERATIONS = 1000;
constexpr int OUTPUT_PRECISION = 2;
constexpr int PURE_LOG_THREAD_COUNT = 32;
constexpr int PURE_LOG_PER_THREAD_QPS = 125;
constexpr int PURE_LOG_TARGET_QPS = PURE_LOG_THREAD_COUNT * PURE_LOG_PER_THREAD_QPS;
constexpr int PURE_LOG_DURATION_SEC = 10;
constexpr double PURE_LOG_SAMPLE_RATE = 0.5;
* Test log asynchronous mode performance testing
* Sets up common test environment and utility functions
*/
class LogPerformanceTest : public CommonTest {
public:
void SetUp() override
{
CommonTest::SetUp();
LogSampler::Instance().ResetForTest();
}
void TearDown() override
{
Provider::Instance().FlushLogs();
LogSampler::Instance().ResetForTest();
CommonTest::TearDown();
}
void StartLogging()
{
FLAGS_alsologtostderr = false;
FLAGS_logtostderr = false;
FLAGS_stderrthreshold = LogSeverity::FATAL;
FLAGS_max_log_size = 2048;
FLAGS_log_async = true;
Logging::GetInstance()->Start("ds_llt", true, 1);
}
void StartLoggingWithSamplerRate(double requestRate)
{
FLAGS_request_sample_rate = requestRate;
LogSampler::Instance().ResetForTest();
StartLogging();
LogSampleUserConfig cfg;
cfg.requestSampleRate = requestRate;
cfg.requestSampleRateExplicit = true;
LogSampler::Instance().UpdateConfigFromFlags(cfg);
}
void StartLoggingWithoutSampler()
{
LogSampler::Instance().ResetForTest();
StartLogging();
}
void StartLoggingWithSamplerConfig(double requestRate, double accessRate, double diagnosticRate)
{
FLAGS_request_sample_rate = requestRate;
FLAGS_access_sample_rate = accessRate;
FLAGS_diagnostic_sample_rate = diagnosticRate;
LogSampler::Instance().ResetForTest();
StartLogging();
LogSampleUserConfig cfg;
cfg.requestSampleRate = requestRate;
cfg.requestSampleRateExplicit = true;
cfg.accessSampleRate = accessRate;
cfg.accessSampleRateExplicit = true;
cfg.diagnosticSampleRate = diagnosticRate;
cfg.diagnosticSampleRateExplicit = true;
LogSampler::Instance().UpdateConfigFromFlags(cfg);
}
* Generate test log content of specified payload length
* @param payloadLength Length of the actual log message content (excluding prefix)
* @return Generated string with exactly 'payloadLength' characters
*/
std::string GenerateLogPayload(size_t payloadLength)
{
static const std::string base = "abcdefghijklmnopqrstuvwxyz1234567890";
std::string content;
while (content.size() < payloadLength) {
content += base;
}
return content.substr(0, payloadLength);
}
* Calculate total log size including estimated prefix
* @param payloadLength Length of the log message content
* @return Total log size (payload + prefix) in bytes
*/
size_t CalculateTotalLogSize(size_t payloadLength)
{
return payloadLength + LOG_PREFIX_ESTIMATED_SIZE;
}
* High-precision timer utility for measuring microsecond-level performance
*/
class Timer {
public:
void Start()
{
startTime_ = std::chrono::high_resolution_clock::now();
}
* Calculate elapsed time since start() was called
* @return Elapsed time in microseconds
*/
double ElapsedUs()
{
auto endTime = std::chrono::high_resolution_clock::now();
return std::chrono::duration<double, std::micro>(endTime - startTime_).count();
}
private:
std::chrono::high_resolution_clock::time_point startTime_;
};
* Calculate P99 latency (99% of log operations complete faster than this value)
* @param latencies Collection of latency data points
* @return P99 latency value in microseconds
*/
double CalculateP99(std::vector<double> &latencies)
{
if (latencies.empty())
return 0.0;
std::sort(latencies.begin(), latencies.end());
size_t index = static_cast<size_t>(latencies.size() * 0.99);
if (index >= latencies.size())
index = latencies.size() - 1;
return latencies[index];
}
double CalculatePercentile(std::vector<double> latencies, double percentile)
{
if (latencies.empty()) {
return 0.0;
}
std::sort(latencies.begin(), latencies.end());
size_t index = static_cast<size_t>(latencies.size() * percentile);
if (index >= latencies.size()) {
index = latencies.size() - 1;
}
return latencies[index];
}
* Calculate average latency
* @param latencies Collection of latency data points
* @return Average latency in microseconds
*/
double CalculateAvgLatency(std::vector<double> &latencies)
{
if (latencies.empty())
return 0.0;
double sum = std::accumulate(latencies.begin(), latencies.end(), 0.0);
return sum / latencies.size();
}
int CountInfoLogLinesWithMarker(const std::string &marker)
{
DIR *dp = opendir(FLAGS_log_dir.c_str());
if (dp == nullptr) {
return 0;
}
int count = 0;
struct dirent *entry = nullptr;
while ((entry = readdir(dp)) != nullptr) {
std::string filename(entry->d_name);
if (filename.find("ds_llt") == std::string::npos || filename.find(".INFO") == std::string::npos
|| filename.find(".log") == std::string::npos) {
continue;
}
std::ifstream file(FLAGS_log_dir + "/" + filename);
std::string line;
while (std::getline(file, line)) {
if (line.find(marker) != std::string::npos) {
++count;
}
}
}
closedir(dp);
return count;
}
struct PureLogResult {
double rate = 0;
double accessRate = 0;
double diagnosticRate = 0;
bool directLog = false;
bool expensivePayload = false;
bool samplerOnly = false;
std::string samplerMode;
int totalRequests = 0;
double elapsedSec = 0.0;
double achievedQps = 0.0;
double avgUs = 0.0;
double p50Us = 0.0;
double p95Us = 0.0;
double p99Us = 0.0;
double maxUs = 0.0;
int writtenSamples = 0;
int payloadBuilds = 0;
};
std::string BuildExpensivePayload(const std::string &payload, int threadId, int seq,
std::atomic<int> &payloadBuilds)
{
payloadBuilds.fetch_add(1, std::memory_order_relaxed);
std::string message;
message.reserve(payload.size() * 4 + 128);
message.append(" thread=").append(std::to_string(threadId));
message.append(" seq=").append(std::to_string(seq));
message.append(" payload=");
for (int i = 0; i < 4; ++i) {
message.append(payload);
}
return message;
}
void EmitPureLogLine(bool directLog, bool expensivePayload, const std::string &marker, const std::string &payload,
int threadId, int seq, std::atomic<int> &payloadBuilds)
{
if (expensivePayload) {
if (directLog) {
LogMessage(LogSeverity::INFO, __FILE__, __LINE__).Stream()
<< marker << BuildExpensivePayload(payload, threadId, seq, payloadBuilds);
} else {
LOG(INFO) << marker << BuildExpensivePayload(payload, threadId, seq, payloadBuilds);
}
return;
}
if (directLog) {
LogMessage(LogSeverity::INFO, __FILE__, __LINE__).Stream()
<< marker << ", thread=" << threadId << ", seq=" << seq << ", payload=" << payload;
} else {
LOG(INFO) << marker << ", thread=" << threadId << ", seq=" << seq << ", payload=" << payload;
}
}
PureLogResult RunPureLogWorkload(double requestRate, const std::string &marker, bool expensivePayload,
bool directLog, const std::string &samplerMode = "")
{
StartLoggingWithSamplerRate(requestRate);
constexpr int requestsPerThread = PURE_LOG_PER_THREAD_QPS * PURE_LOG_DURATION_SEC;
constexpr int totalRequests = PURE_LOG_THREAD_COUNT * requestsPerThread;
const std::string payload = GenerateLogPayload(64);
std::atomic<int> ready{ 0 };
std::atomic<bool> start{ false };
std::atomic<int> payloadBuilds{ 0 };
std::chrono::steady_clock::time_point scheduledStart;
std::vector<std::vector<double>> threadLatencies(PURE_LOG_THREAD_COUNT);
std::vector<std::thread> threads;
threads.reserve(PURE_LOG_THREAD_COUNT);
for (int threadId = 0; threadId < PURE_LOG_THREAD_COUNT; ++threadId) {
threads.emplace_back([&, threadId]() {
threadLatencies[threadId].reserve(requestsPerThread);
ready.fetch_add(1, std::memory_order_release);
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
const auto period = std::chrono::microseconds(1000000 / PURE_LOG_PER_THREAD_QPS);
for (int i = 0; i < requestsPerThread; ++i) {
std::this_thread::sleep_until(scheduledStart + period * i);
auto begin = std::chrono::high_resolution_clock::now();
{
TraceGuard traceGuard = Trace::Instance().SetRequestTraceUUID();
EmitPureLogLine(directLog, expensivePayload, marker, payload, threadId, i, payloadBuilds);
}
auto end = std::chrono::high_resolution_clock::now();
threadLatencies[threadId].push_back(
std::chrono::duration<double, std::micro>(end - begin).count());
}
Trace::Instance().Invalidate();
});
}
while (ready.load(std::memory_order_acquire) != PURE_LOG_THREAD_COUNT) {
std::this_thread::yield();
}
scheduledStart = std::chrono::steady_clock::now() + std::chrono::milliseconds(200);
start.store(true, std::memory_order_release);
for (auto &thread : threads) {
thread.join();
}
const auto finish = std::chrono::steady_clock::now();
Provider::Instance().FlushLogs();
std::vector<double> allLatencies;
allLatencies.reserve(totalRequests);
for (auto &latencies : threadLatencies) {
allLatencies.insert(allLatencies.end(), latencies.begin(), latencies.end());
}
EXPECT_EQ(allLatencies.size(), static_cast<size_t>(totalRequests));
PureLogResult result;
result.rate = requestRate;
result.directLog = directLog;
result.expensivePayload = expensivePayload;
result.samplerMode = samplerMode.empty()
? (std::fabs(requestRate - 1.0) < 1e-9
? "sampler-through"
: (std::fabs(requestRate) < 1e-9 ? "sampler-reject" : "sampler-half"))
: samplerMode;
result.samplerOnly = false;
result.totalRequests = totalRequests;
result.elapsedSec = std::chrono::duration<double>(finish - scheduledStart).count();
result.achievedQps = totalRequests / result.elapsedSec;
result.p50Us = CalculatePercentile(allLatencies, 0.50);
result.p95Us = CalculatePercentile(allLatencies, 0.95);
result.p99Us = CalculatePercentile(allLatencies, 0.99);
result.avgUs = CalculateAvgLatency(allLatencies);
result.maxUs = *std::max_element(allLatencies.begin(), allLatencies.end());
result.writtenSamples = CountInfoLogLinesWithMarker(marker);
result.payloadBuilds = payloadBuilds.load(std::memory_order_relaxed);
return result;
}
PureLogResult RunPureLogWorkloadMultiRate(double requestRate, double accessRate, double diagnosticRate,
const std::string &marker, bool expensivePayload, bool directLog,
const std::string &samplerMode = "")
{
StartLoggingWithSamplerConfig(requestRate, accessRate, diagnosticRate);
constexpr int requestsPerThread = PURE_LOG_PER_THREAD_QPS * PURE_LOG_DURATION_SEC;
constexpr int totalRequests = PURE_LOG_THREAD_COUNT * requestsPerThread;
const std::string payload = GenerateLogPayload(64);
std::atomic<int> ready{ 0 };
std::atomic<bool> start{ false };
std::atomic<int> payloadBuilds{ 0 };
std::chrono::steady_clock::time_point scheduledStart;
std::vector<std::vector<double>> threadLatencies(PURE_LOG_THREAD_COUNT);
std::vector<std::thread> threads;
threads.reserve(PURE_LOG_THREAD_COUNT);
for (int threadId = 0; threadId < PURE_LOG_THREAD_COUNT; ++threadId) {
threads.emplace_back([&, threadId]() {
threadLatencies[threadId].reserve(requestsPerThread);
ready.fetch_add(1, std::memory_order_release);
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
const auto period = std::chrono::microseconds(1000000 / PURE_LOG_PER_THREAD_QPS);
for (int i = 0; i < requestsPerThread; ++i) {
std::this_thread::sleep_until(scheduledStart + period * i);
auto begin = std::chrono::high_resolution_clock::now();
{
TraceGuard traceGuard = Trace::Instance().SetRequestTraceUUID();
EmitPureLogLine(directLog, expensivePayload, marker, payload, threadId, i, payloadBuilds);
}
auto end = std::chrono::high_resolution_clock::now();
threadLatencies[threadId].push_back(
std::chrono::duration<double, std::micro>(end - begin).count());
}
Trace::Instance().Invalidate();
});
}
while (ready.load(std::memory_order_acquire) != PURE_LOG_THREAD_COUNT) {
std::this_thread::yield();
}
scheduledStart = std::chrono::steady_clock::now() + std::chrono::milliseconds(200);
start.store(true, std::memory_order_release);
for (auto &thread : threads) {
thread.join();
}
const auto finish = std::chrono::steady_clock::now();
Provider::Instance().FlushLogs();
std::vector<double> allLatencies;
allLatencies.reserve(totalRequests);
for (auto &latencies : threadLatencies) {
allLatencies.insert(allLatencies.end(), latencies.begin(), latencies.end());
}
EXPECT_EQ(allLatencies.size(), static_cast<size_t>(totalRequests));
PureLogResult result;
result.rate = requestRate;
result.accessRate = accessRate;
result.diagnosticRate = diagnosticRate;
result.directLog = directLog;
result.expensivePayload = expensivePayload;
result.samplerMode = samplerMode.empty() ? "multi-rate" : samplerMode;
result.samplerOnly = false;
result.totalRequests = totalRequests;
result.elapsedSec = std::chrono::duration<double>(finish - scheduledStart).count();
result.achievedQps = totalRequests / result.elapsedSec;
result.p50Us = CalculatePercentile(allLatencies, 0.50);
result.p95Us = CalculatePercentile(allLatencies, 0.95);
result.p99Us = CalculatePercentile(allLatencies, 0.99);
result.avgUs = CalculateAvgLatency(allLatencies);
result.maxUs = *std::max_element(allLatencies.begin(), allLatencies.end());
result.writtenSamples = CountInfoLogLinesWithMarker(marker);
result.payloadBuilds = payloadBuilds.load(std::memory_order_relaxed);
return result;
}
PureLogResult RunPureLogWorkloadNoSampler(const std::string &marker, bool expensivePayload)
{
StartLoggingWithoutSampler();
constexpr int requestsPerThread = PURE_LOG_PER_THREAD_QPS * PURE_LOG_DURATION_SEC;
constexpr int totalRequests = PURE_LOG_THREAD_COUNT * requestsPerThread;
const std::string payload = GenerateLogPayload(64);
std::atomic<int> ready{ 0 };
std::atomic<bool> start{ false };
std::atomic<int> payloadBuilds{ 0 };
std::chrono::steady_clock::time_point scheduledStart;
std::vector<std::vector<double>> threadLatencies(PURE_LOG_THREAD_COUNT);
std::vector<std::thread> threads;
threads.reserve(PURE_LOG_THREAD_COUNT);
for (int threadId = 0; threadId < PURE_LOG_THREAD_COUNT; ++threadId) {
threads.emplace_back([&, threadId]() {
threadLatencies[threadId].reserve(requestsPerThread);
ready.fetch_add(1, std::memory_order_release);
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
const auto period = std::chrono::microseconds(1000000 / PURE_LOG_PER_THREAD_QPS);
for (int i = 0; i < requestsPerThread; ++i) {
std::this_thread::sleep_until(scheduledStart + period * i);
auto begin = std::chrono::high_resolution_clock::now();
{
TraceGuard traceGuard = Trace::Instance().SetRequestTraceUUID();
EmitPureLogLine(false, expensivePayload, marker, payload, threadId, i, payloadBuilds);
}
auto end = std::chrono::high_resolution_clock::now();
threadLatencies[threadId].push_back(
std::chrono::duration<double, std::micro>(end - begin).count());
}
Trace::Instance().Invalidate();
});
}
while (ready.load(std::memory_order_acquire) != PURE_LOG_THREAD_COUNT) {
std::this_thread::yield();
}
scheduledStart = std::chrono::steady_clock::now() + std::chrono::milliseconds(200);
start.store(true, std::memory_order_release);
for (auto &thread : threads) {
thread.join();
}
const auto finish = std::chrono::steady_clock::now();
Provider::Instance().FlushLogs();
std::vector<double> allLatencies;
allLatencies.reserve(totalRequests);
for (auto &latencies : threadLatencies) {
allLatencies.insert(allLatencies.end(), latencies.begin(), latencies.end());
}
EXPECT_EQ(allLatencies.size(), static_cast<size_t>(totalRequests));
PureLogResult result;
result.rate = 0;
result.accessRate = 0;
result.diagnosticRate = 0;
result.directLog = false;
result.expensivePayload = expensivePayload;
result.samplerMode = "no-sampler";
result.samplerOnly = false;
result.totalRequests = totalRequests;
result.elapsedSec = std::chrono::duration<double>(finish - scheduledStart).count();
result.achievedQps = totalRequests / result.elapsedSec;
result.p50Us = CalculatePercentile(allLatencies, 0.50);
result.p95Us = CalculatePercentile(allLatencies, 0.95);
result.p99Us = CalculatePercentile(allLatencies, 0.99);
result.avgUs = CalculateAvgLatency(allLatencies);
result.maxUs = *std::max_element(allLatencies.begin(), allLatencies.end());
result.writtenSamples = CountInfoLogLinesWithMarker(marker);
result.payloadBuilds = payloadBuilds.load(std::memory_order_relaxed);
return result;
}
void PrintPureLogResult(const std::string &tag, const PureLogResult &result)
{
std::cout << tag << " samplerMode=" << result.samplerMode << " samplerOnly=" << result.samplerOnly
<< " rate=" << result.rate << " accessRate=" << result.accessRate
<< " diagnosticRate=" << result.diagnosticRate << " directLog=" << result.directLog
<< " expensivePayload=" << result.expensivePayload << " threads=" << PURE_LOG_THREAD_COUNT
<< " targetQps=" << PURE_LOG_TARGET_QPS << " durationSec=" << PURE_LOG_DURATION_SEC
<< " totalRequests=" << result.totalRequests << " elapsedSec=" << std::fixed
<< std::setprecision(OUTPUT_PRECISION) << result.elapsedSec << " achievedQps=" << result.achievedQps
<< " avgUs=" << result.avgUs << " p50Us=" << result.p50Us << " p95Us=" << result.p95Us
<< " p99Us=" << result.p99Us << " maxUs=" << result.maxUs
<< " writtenSamples=" << result.writtenSamples << " payloadBuilds=" << result.payloadBuilds
<< std::endl;
}
private:
std::string loggerName_;
};
* Single-threaded throughput test
* Measures performance with different log message sizes (including prefix)
* Reports: throughput (messages/sec), throughput (MB/sec), average latency and P99 latency
*/
TEST_F(LogPerformanceTest, SingleThreadThroughput)
{
std::cout << "\n[Single-thread] Spdlog throughput test\n";
StartLogging();
std::vector<std::pair<size_t, size_t>> testCases = {
{ 32, 1000000 },
{ 512, 200000 },
{ 4096, 50000 }
};
for (auto &[payloadLength, count] : testCases) {
size_t totalLogSize = CalculateTotalLogSize(payloadLength);
std::string logPayload = GenerateLogPayload(payloadLength);
Timer totalTimer;
std::vector<double> latencies;
latencies.reserve(count);
for (int i = 0; i < WARMUP_ITERATIONS; ++i) {
LOG(INFO) << logPayload;
}
Provider::Instance().FlushLogs();
totalTimer.Start();
for (size_t i = 0; i < count; ++i) {
auto start = std::chrono::high_resolution_clock::now();
LOG(INFO) << logPayload;
auto end = std::chrono::high_resolution_clock::now();
double latencyUs = std::chrono::duration<double, std::micro>(end - start).count();
latencies.push_back(latencyUs);
}
Provider::Instance().FlushLogs();
double totalElapsedUs = totalTimer.ElapsedUs();
double totalElapsedSec = totalElapsedUs / MICROSEC_TO_SEC;
double totalBytes = static_cast<double>(totalLogSize * count);
double tps = count / totalElapsedSec;
double mbps = (totalBytes / (MB_TO_BYTES)) / totalElapsedSec;
double avgLatency = CalculateAvgLatency(latencies);
double p99Latency = CalculateP99(latencies);
std::cout << " Payload length: " << payloadLength << "B, Total log size: ~" << totalLogSize << "B\n";
std::cout << " Total messages: " << count << ", Total data: " << std::fixed
<< std::setprecision(OUTPUT_PRECISION) << (totalBytes / (MB_TO_BYTES)) << "MB\n";
std::cout << " Total duration: " << std::fixed << std::setprecision(OUTPUT_PRECISION)
<< (totalElapsedUs / MICROSEC_TO_MILLISEC) << "ms\n";
std::cout << " Throughput: " << std::fixed << std::setprecision(OUTPUT_PRECISION) << tps << " msg/sec ("
<< mbps << " MB/sec)\n";
std::cout << " Latency: Avg = " << std::fixed << std::setprecision(OUTPUT_PRECISION) << avgLatency << "μs, "
<< "P99 = " << p99Latency << "μs\n\n";
}
}
* Multi-threaded throughput test
* Measures performance with different log message sizes (including prefix)
* Reports: throughput (messages/sec), throughput (MB/sec), average latency and P99 latency
*/
TEST_F(LogPerformanceTest, MultiThreadThroughput)
{
std::cout << "\n[Multi-thread] Spdlog throughput test\n";
StartLogging();
std::vector<std::pair<size_t, size_t>> testCases = {
{ 32, 1000000 },
{ 512, 200000 },
{ 4096, 50000 }
};
const size_t threadCount = 10;
std::cout << "[Multi-thread] Using " << threadCount << " concurrent threads\n";
for (auto &[payloadLength, totalCount] : testCases) {
size_t perThreadCount = totalCount / threadCount;
size_t totalLogSize = CalculateTotalLogSize(payloadLength);
std::string logPayload = GenerateLogPayload(payloadLength);
Timer totalTimer;
std::vector<double> allLatencies;
std::mutex latencyMutex;
std::vector<std::thread> threads;
for (size_t t = 0; t < threadCount; ++t) {
threads.emplace_back([&]() {
for (int i = 0; i < WARMUP_ITERATIONS; ++i) {
LOG(INFO) << logPayload;
}
});
}
for (auto &th : threads) {
th.join();
}
threads.clear();
Provider::Instance().FlushLogs();
totalTimer.Start();
for (size_t t = 0; t < threadCount; ++t) {
threads.emplace_back([&, threadId = t]() {
std::vector<double> threadLatencies;
threadLatencies.reserve(perThreadCount);
for (size_t i = 0; i < perThreadCount; ++i) {
auto start = std::chrono::high_resolution_clock::now();
LOG(INFO) << logPayload;
auto end = std::chrono::high_resolution_clock::now();
double latencyUs = std::chrono::duration<double, std::micro>(end - start).count();
threadLatencies.push_back(latencyUs);
}
std::lock_guard<std::mutex> lock(latencyMutex);
allLatencies.insert(allLatencies.end(), threadLatencies.begin(), threadLatencies.end());
});
}
for (auto &th : threads) {
th.join();
}
Provider::Instance().FlushLogs();
double totalElapsedUs = totalTimer.ElapsedUs();
double totalElapsedSec = totalElapsedUs / MICROSEC_TO_SEC;
double totalBytes = static_cast<double>(totalLogSize * totalCount);
double tps = totalCount / totalElapsedSec;
double mbps = (totalBytes / (MB_TO_BYTES)) / totalElapsedSec;
double avgLatency = CalculateAvgLatency(allLatencies);
double p99Latency = CalculateP99(allLatencies);
std::cout << " Payload length: " << payloadLength << "B, Total log size: ~" << totalLogSize << "B\n";
std::cout << " Total messages: " << totalCount << " (per thread: " << perThreadCount << ")\n";
std::cout << " Total data: " << std::fixed << std::setprecision(OUTPUT_PRECISION)
<< (totalBytes / (MB_TO_BYTES)) << "MB, Duration: " << std::fixed
<< std::setprecision(OUTPUT_PRECISION) << (totalElapsedUs / MICROSEC_TO_MILLISEC) << "ms\n";
std::cout << " Throughput: " << std::fixed << std::setprecision(OUTPUT_PRECISION) << tps << " msg/sec ("
<< mbps << " MB/sec)\n";
std::cout << " Latency: Avg = " << std::fixed << std::setprecision(OUTPUT_PRECISION) << avgLatency << "μs, "
<< "P99 = " << p99Latency << "μs\n\n";
}
}
TEST_F(LogPerformanceTest, LEVEL1_RequestLogSampleRateHalfPureLoggingQps4000)
{
const std::string marker = "pure-log-sample-rate-qps4000-rate-" + std::to_string(PURE_LOG_SAMPLE_RATE);
auto result = RunPureLogWorkload(PURE_LOG_SAMPLE_RATE, marker, false, false, "sampler-half");
PrintPureLogResult("PURE_LOG_SAMPLE_RATE_RESULT", result);
EXPECT_GE(result.achievedQps, PURE_LOG_TARGET_QPS * 0.95);
EXPECT_LT(result.p99Us, 50000.0);
}
TEST_F(LogPerformanceTest, LEVEL1_RequestLogSampleRateHalfRejectedExpensivePayloadQps4000)
{
const std::string marker = "pure-log-sample-rate-expensive-rate-qps4000-" + std::to_string(PURE_LOG_SAMPLE_RATE);
auto result = RunPureLogWorkload(PURE_LOG_SAMPLE_RATE, marker, true, false, "sampler-half");
PrintPureLogResult("PURE_LOG_SAMPLE_RATE_EXPENSIVE_RESULT", result);
EXPECT_GE(result.achievedQps, PURE_LOG_TARGET_QPS * 0.95);
EXPECT_LT(result.p99Us, 50000.0);
EXPECT_GT(result.payloadBuilds, 0);
EXPECT_LT(result.payloadBuilds, result.totalRequests);
}
TEST_F(LogPerformanceTest, DISABLED_LEVEL1_SamplerPassThroughMacroOverheadQps4000)
{
auto noSampler = RunPureLogWorkloadNoSampler("pure-log-nosampler-baseline", true);
auto samplerThrough = RunPureLogWorkload(1.0, "pure-log-sampler-through", true, false, "sampler-through");
PrintPureLogResult("PURE_LOG_NOSAMPLER_BASELINE_RESULT", noSampler);
PrintPureLogResult("PURE_LOG_SAMPLER_THROUGH_RESULT", samplerThrough);
EXPECT_GE(noSampler.achievedQps, PURE_LOG_TARGET_QPS * 0.95);
EXPECT_GE(samplerThrough.achievedQps, PURE_LOG_TARGET_QPS * 0.95);
EXPECT_EQ(samplerThrough.payloadBuilds, samplerThrough.totalRequests);
EXPECT_LE(samplerThrough.p99Us, noSampler.p99Us * 1.01);
EXPECT_LE(samplerThrough.avgUs, noSampler.avgUs * 1.01);
}
TEST_F(LogPerformanceTest, LEVEL1_RequestRejectFastPathQps4000)
{
auto result = RunPureLogWorkloadMultiRate(0.0, 1.0, 1.0,
"pure-log-reject-fast-path-qps4000", true, false, "sampler-reject");
PrintPureLogResult("PURE_LOG_REJECT_FAST_PATH_RESULT", result);
EXPECT_GE(result.achievedQps, PURE_LOG_TARGET_QPS * 0.95);
EXPECT_EQ(result.payloadBuilds, 0);
EXPECT_LT(result.p99Us, 50000.0);
}
TEST_F(LogPerformanceTest, LEVEL1_SamplerOnlyDecisionCostQps4000)
{
StartLoggingWithSamplerConfig(0.5, 0.5, 1.0);
constexpr int totalDecisions = PURE_LOG_TARGET_QPS * PURE_LOG_DURATION_SEC;
std::atomic<int> ready{ 0 };
std::atomic<bool> start{ false };
std::vector<std::vector<double>> threadLatencies(PURE_LOG_THREAD_COUNT);
std::vector<std::thread> threads;
threads.reserve(PURE_LOG_THREAD_COUNT);
for (int threadId = 0; threadId < PURE_LOG_THREAD_COUNT; ++threadId) {
threads.emplace_back([&, threadId]() {
threadLatencies[threadId].reserve(totalDecisions / PURE_LOG_THREAD_COUNT);
ready.fetch_add(1, std::memory_order_release);
while (!start.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
TraceGuard traceGuard = Trace::Instance().SetRequestTraceUUID();
for (int i = 0; i < totalDecisions / PURE_LOG_THREAD_COUNT; ++i) {
auto beg = std::chrono::high_resolution_clock::now();
volatile bool result =
LogSampler::Instance().ShouldCreateRuntimeLog(LogSeverity::INFO, false);
(void)result;
auto end = std::chrono::high_resolution_clock::now();
threadLatencies[threadId].push_back(
std::chrono::duration<double, std::micro>(end - beg).count());
}
});
}
while (ready.load(std::memory_order_acquire) < PURE_LOG_THREAD_COUNT) {
std::this_thread::yield();
}
auto globalStart = std::chrono::high_resolution_clock::now();
start.store(true, std::memory_order_release);
for (auto &t : threads) {
t.join();
}
auto globalEnd = std::chrono::high_resolution_clock::now();
double totalSec = std::chrono::duration<double>(globalEnd - globalStart).count();
double achievedQps = totalDecisions / totalSec;
std::vector<double> allLatencies;
for (auto &v : threadLatencies) {
allLatencies.insert(allLatencies.end(), v.begin(), v.end());
}
std::sort(allLatencies.begin(), allLatencies.end());
double p50 = allLatencies[allLatencies.size() / 2];
double p99 = allLatencies[allLatencies.size() * 99 / 100];
std::cout << "SAMPLER_ONLY_QPS=" << std::fixed << std::setprecision(OUTPUT_PRECISION)
<< achievedQps << " p50=" << p50 << " p99=" << p99
<< " samplerMode=sampler-only samplerOnly=true" << std::endl;
EXPECT_GE(achievedQps, PURE_LOG_TARGET_QPS * 0.95);
EXPECT_LT(p99, 10.0);
}
TEST_F(LogPerformanceTest, LEVEL1_NoSamplerBaselineQps4000)
{
auto result = RunPureLogWorkloadNoSampler("pure-log-nosampler-baseline-qps4000", true);
PrintPureLogResult("PURE_LOG_NOSAMPLER_BASELINE_QPS4000_RESULT", result);
EXPECT_GE(result.achievedQps, PURE_LOG_TARGET_QPS * 0.95);
EXPECT_EQ(result.payloadBuilds, result.totalRequests);
EXPECT_LT(result.p99Us, 50000.0);
}
}
}