/*
* Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Description: Unit tests for the LogSampler core component.
*/
#include "datasystem/common/log/log_sampler.h"
#include "datasystem/common/log/access_recorder.h"
#include <atomic>
#include <cmath>
#include <cstdint>
#include <thread>
#include <vector>
#include "ut/common.h"
#include "datasystem/common/log/trace.h"
#include "datasystem/protos/share_memory.pb.h"
namespace datasystem {
namespace ut {
/**
 * @brief Test fixture for LogSampler.
 *
 * Every test works on the object returned by LogSampler::Instance(), so that
 * shared sampler is reset both before and after each test to keep tests
 * independent of one another and of execution order.
 */
class LogSamplerTest : public CommonTest {
protected:
    void SetUp() override
    {
        CommonTest::SetUp();
        ResetSampler();
    }

    void TearDown() override
    {
        ResetSampler();
        CommonTest::TearDown();
    }

private:
    // Returns the shared sampler to its test-reset state.
    static void ResetSampler()
    {
        LogSampler::Instance().ResetForTest();
    }
};
/**
 * Only the request rate is set explicitly (0.2). The access and diagnostic
 * rates are left implicit and are expected to be derived from it: 600000 ppm
 * and 800000 ppm respectively, with sampling enabled.
 */
TEST_F(LogSamplerTest, RequestOnlyDerivation)
{
    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();

    LogSampleUserConfig userCfg;
    userCfg.requestSampleRate = 0.2;
    userCfg.requestSampleRateExplicit = true;
    userCfg.accessSampleRateExplicit = false;
    userCfg.diagnosticSampleRateExplicit = false;
    ASSERT_TRUE(sampler.UpdateConfigFromFlags(userCfg));

    const auto *snapshot = sampler.GetSnapshotForTest();
    ASSERT_NE(snapshot, nullptr);
    const auto &derived = snapshot->config;
    EXPECT_EQ(derived.requestRate.ppm, 200000);
    EXPECT_EQ(derived.accessRate.ppm, 600000);
    EXPECT_EQ(derived.diagnosticRate.ppm, 800000);
    EXPECT_TRUE(derived.enabled);
}
/**
 * Request (0.2) and access (0.3) rates are both explicit; only the diagnostic
 * rate is implicit. Both explicit values must be installed verbatim, and the
 * implicit diagnostic rate is expected at kSamplePpmBase.
 */
TEST_F(LogSamplerTest, ExplicitOverrideDerivation)
{
    LogSampler &s = LogSampler::Instance();
    s.ResetForTest();
    LogSampleUserConfig cfg;
    cfg.requestSampleRate = 0.2;
    cfg.requestSampleRateExplicit = true;
    cfg.accessSampleRate = 0.3;
    cfg.accessSampleRateExplicit = true;
    cfg.diagnosticSampleRateExplicit = false;
    ASSERT_TRUE(s.UpdateConfigFromFlags(cfg));
    auto *snap = s.GetSnapshotForTest();
    ASSERT_NE(snap, nullptr);
    // The explicit request rate must survive unchanged alongside the explicit access override.
    EXPECT_EQ(snap->config.requestRate.ppm, 200000);
    EXPECT_EQ(snap->config.accessRate.ppm, 300000);
    EXPECT_EQ(snap->config.diagnosticRate.ppm, kSamplePpmBase);
    EXPECT_TRUE(snap->config.enabled);
}
/**
 * With every rate explicitly at 1.0 the fast-path flag is expected to report
 * the sampler as disabled. After a reset, lowering just the request rate to
 * 0.5 must make it report enabled.
 */
TEST_F(LogSamplerTest, EnabledNormalization)
{
    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();

    LogSampleUserConfig userCfg;
    userCfg.requestSampleRateExplicit = true;
    userCfg.accessSampleRateExplicit = true;
    userCfg.diagnosticSampleRateExplicit = true;
    userCfg.requestSampleRate = 1.0;
    userCfg.accessSampleRate = 1.0;
    userCfg.diagnosticSampleRate = 1.0;
    ASSERT_TRUE(sampler.UpdateConfigFromFlags(userCfg));
    EXPECT_FALSE(sampler.IsSamplerEnabledFast());

    // Same config, except the request rate is now below 1.0.
    sampler.ResetForTest();
    userCfg.requestSampleRate = 0.5;
    ASSERT_TRUE(sampler.UpdateConfigFromFlags(userCfg));
    EXPECT_TRUE(sampler.IsSamplerEnabledFast());
}
/**
 * UpdateConfigFromFlags must reject a negative, NaN, or greater-than-one
 * request rate. A rejected update must also leave the previously installed
 * configuration untouched.
 */
TEST_F(LogSamplerTest, InvalidConfigRejected)
{
    LogSampler &s = LogSampler::Instance();
    s.ResetForTest();

    LogSampleUserConfig validCfg;
    validCfg.requestSampleRate = 0.5;
    validCfg.requestSampleRateExplicit = true;
    ASSERT_TRUE(s.UpdateConfigFromFlags(validCfg));
    auto *snap1 = s.GetSnapshotForTest();
    ASSERT_NE(snap1, nullptr);
    EXPECT_EQ(snap1->config.requestRate.ppm, 500000);

    // Each invalid value is applied on top of the valid config above, without a
    // reset in between. That way every case also proves the rejection did not
    // disturb the installed configuration.
    const double invalidRates[] = { -0.1, std::nan(""), 1.5 };
    for (double rate : invalidRates) {
        SCOPED_TRACE(::testing::Message() << "requestSampleRate=" << rate);
        LogSampleUserConfig invalidCfg;
        invalidCfg.requestSampleRate = rate;
        invalidCfg.requestSampleRateExplicit = true;
        EXPECT_FALSE(s.UpdateConfigFromFlags(invalidCfg));
        auto *snap = s.GetSnapshotForTest();
        ASSERT_NE(snap, nullptr);
        EXPECT_EQ(snap->config.requestRate.ppm, 500000);
    }
}
/**
 * With a request trace active, the first IsCurrentRequestSampledIn() call
 * records its decision on the trace (readable via GetRequestSampleDecision).
 * A repeated call for the same request must return, and leave recorded, that
 * same decision.
 */
TEST_F(LogSamplerTest, HashThresholdDecision)
{
    LogSampler &s = LogSampler::Instance();
    s.ResetForTest();
    s.SetSaltForTest(0x123456789ABCDEF0ULL);
    LogSampleUserConfig cfg;
    cfg.requestSampleRate = 0.5;
    cfg.requestSampleRateExplicit = true;
    ASSERT_TRUE(s.UpdateConfigFromFlags(cfg));

    TraceGuard guard = Trace::Instance().SetRequestTraceUUID();
    ASSERT_TRUE(Trace::Instance().IsRequestLogTrace());

    SampleRate halfRate;
    halfRate.ppm = 500000;
    halfRate.threshold = BuildThreshold(500000);

    bool firstResult = s.IsCurrentRequestSampledIn(halfRate);
    bool admitted1 = false;
    EXPECT_TRUE(Trace::Instance().GetRequestSampleDecision(admitted1));
    EXPECT_EQ(admitted1, firstResult);

    // The decision must be sticky: same return value and same recorded decision.
    bool secondResult = s.IsCurrentRequestSampledIn(halfRate);
    EXPECT_EQ(secondResult, firstResult);
    bool admitted2 = false;
    EXPECT_TRUE(Trace::Instance().GetRequestSampleDecision(admitted2));
    EXPECT_EQ(admitted2, firstResult);
}
/**
 * Across 100000 distinct trace hashes (multiples of 1000) and a fixed salt,
 * a 50% diagnostic rate must admit between 49% and 51% of events.
 */
TEST_F(LogSamplerTest, RandomDistribution)
{
    constexpr int kNumKeys = 100000;
    constexpr auto kHalfPpm = 500000;

    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();
    sampler.SetSaltForTest(42);

    SampleRate halfRate;
    halfRate.ppm = kHalfPpm;
    halfRate.threshold = BuildThreshold(kHalfPpm);

    int admitted = 0;
    for (int key = 0; key < kNumKeys; ++key) {
        // Widen before multiplying so the hash cannot overflow int if kNumKeys grows.
        uint64_t traceHash = static_cast<uint64_t>(key) * 1000U;
        admitted += sampler.ShouldSampleEvent(traceHash, LogSampleKind::DIAGNOSTIC, halfRate) ? 1 : 0;
    }
    const double ratio = static_cast<double>(admitted) / kNumKeys;
    EXPECT_NEAR(ratio, 0.5, 0.01);
}
/**
 * 1000 diagnostic events all carry the same trace hash (salt 0, 50% rate).
 * The admitted fraction must still be close to one half (within +/-0.05), and
 * strictly between 0.1 and 0.9.
 */
TEST_F(LogSamplerTest, SameTraceSequentialEvents)
{
    constexpr int kNumCalls = 1000;

    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();
    sampler.SetSaltForTest(0);

    SampleRate halfRate;
    halfRate.ppm = 500000;
    halfRate.threshold = BuildThreshold(500000);

    uint64_t sharedTraceHash = 12345;
    int admitted = 0;
    int remaining = kNumCalls;
    while (remaining-- > 0) {
        if (sampler.ShouldSampleEvent(sharedTraceHash, LogSampleKind::DIAGNOSTIC, halfRate)) {
            ++admitted;
        }
    }

    const double ratio = static_cast<double>(admitted) / kNumCalls;
    EXPECT_NEAR(ratio, 0.5, 0.05);
    EXPECT_GT(ratio, 0.1);
    EXPECT_LT(ratio, 0.9);
}
/**
 * Even with the request, access, and diagnostic rates all explicitly set to
 * zero, a FATAL runtime log must still be created.
 */
TEST_F(LogSamplerTest, FatalAlwaysPass)
{
    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();

    LogSampleUserConfig zeroCfg;
    zeroCfg.requestSampleRateExplicit = true;
    zeroCfg.accessSampleRateExplicit = true;
    zeroCfg.diagnosticSampleRateExplicit = true;
    zeroCfg.requestSampleRate = 0.0;
    zeroCfg.accessSampleRate = 0.0;
    zeroCfg.diagnosticSampleRate = 0.0;
    ASSERT_TRUE(sampler.UpdateConfigFromFlags(zeroCfg));

    EXPECT_TRUE(sampler.ShouldCreateRuntimeLog(LogSeverity::FATAL, false));
}
/**
 * In the freshly reset state the fast-path flag reports the sampler as
 * disabled, and INFO and ERROR runtime logs are both created.
 */
TEST_F(LogSamplerTest, DisabledFastPath)
{
    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();

    EXPECT_FALSE(sampler.IsSamplerEnabledFast());
    for (auto severity : { LogSeverity::INFO, LogSeverity::ERROR }) {
        EXPECT_TRUE(sampler.ShouldCreateRuntimeLog(severity, false));
    }
}
/**
 * Across 100000 distinct trace hashes (multiples of 1000) and a fixed salt,
 * a 10% diagnostic rate must admit between 9% and 11% of events.
 */
TEST_F(LogSamplerTest, RandomDistribution_0_1)
{
    constexpr int kNumKeys = 100000;
    constexpr auto kTenthPpm = 100000;

    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();
    sampler.SetSaltForTest(42);

    SampleRate tenthRate;
    tenthRate.ppm = kTenthPpm;
    tenthRate.threshold = BuildThreshold(kTenthPpm);

    int admitted = 0;
    for (int key = 0; key < kNumKeys; ++key) {
        // Widen before multiplying so the hash cannot overflow int if kNumKeys grows.
        uint64_t traceHash = static_cast<uint64_t>(key) * 1000U;
        if (sampler.ShouldSampleEvent(traceHash, LogSampleKind::DIAGNOSTIC, tenthRate)) {
            ++admitted;
        }
    }
    EXPECT_NEAR(static_cast<double>(admitted) / kNumKeys, 0.1, 0.01);
}
/**
 * Across 100000 distinct trace hashes (multiples of 1000) and a fixed salt,
 * a 90% diagnostic rate must admit between 89% and 91% of events.
 */
TEST_F(LogSamplerTest, RandomDistribution_0_9)
{
    constexpr int kNumKeys = 100000;
    constexpr auto kNineTenthsPpm = 900000;

    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();
    sampler.SetSaltForTest(42);

    SampleRate highRate;
    highRate.ppm = kNineTenthsPpm;
    highRate.threshold = BuildThreshold(kNineTenthsPpm);

    int admitted = 0;
    for (int key = 0; key < kNumKeys; ++key) {
        // Widen before multiplying so the hash cannot overflow int if kNumKeys grows.
        uint64_t traceHash = static_cast<uint64_t>(key) * 1000U;
        admitted += sampler.ShouldSampleEvent(traceHash, LogSampleKind::DIAGNOSTIC, highRate) ? 1 : 0;
    }
    const double ratio = static_cast<double>(admitted) / kNumKeys;
    EXPECT_NEAR(ratio, 0.9, 0.01);
}
/**
 * Keys are spread round-robin over 20 buckets (hash = key * 997, 50% rate,
 * fixed salt). Every bucket must see an admitted fraction within 0.05 of one
 * half, and both halves of the bucket range must admit at least one event.
 */
TEST_F(LogSamplerTest, RandomBucketDistribution)
{
    constexpr int kNumBuckets = 20;
    constexpr int kNumKeys = 100000;

    LogSampler &sampler = LogSampler::Instance();
    sampler.ResetForTest();
    sampler.SetSaltForTest(42);

    SampleRate halfRate;
    halfRate.ppm = 500000;
    halfRate.threshold = BuildThreshold(500000);

    std::vector<int> hitsPerBucket(kNumBuckets, 0);
    std::vector<int> keysPerBucket(kNumBuckets, 0);
    for (int key = 0; key < kNumKeys; ++key) {
        // Widen before multiplying so the hash cannot overflow int if kNumKeys grows.
        uint64_t traceHash = static_cast<uint64_t>(key) * 997U;
        const int bucket = key % kNumBuckets;
        ++keysPerBucket[bucket];
        if (sampler.ShouldSampleEvent(traceHash, LogSampleKind::DIAGNOSTIC, halfRate)) {
            ++hitsPerBucket[bucket];
        }
    }

    int lowerHalfHits = 0;
    int upperHalfHits = 0;
    for (int bucket = 0; bucket < kNumBuckets; ++bucket) {
        ASSERT_GT(keysPerBucket[bucket], 0);
        const double ratio = static_cast<double>(hitsPerBucket[bucket]) / keysPerBucket[bucket];
        EXPECT_NEAR(ratio, 0.5, 0.05);
        int &halfHits = (bucket < kNumBuckets / 2) ? lowerHalfHits : upperHalfHits;
        halfHits += hitsPerBucket[bucket];
    }
    EXPECT_GT(lowerHalfHits, 0);
    EXPECT_GT(upperHalfHits, 0);
}
/**
 * Stress test: 4 writer threads alternate flag-based and proto-based config
 * updates while 8 reader threads call the hot-path query functions.
 *
 * The race itself is checked only by the absence of crashes/hangs (and of
 * reports when run under a race detector). Afterwards the test verifies that
 * every writer ran all its iterations. It also checks that the sampler still
 * accepts a fresh proto config and exposes it through the snapshot.
 */
TEST_F(LogSamplerTest, ConcurrentConfigUpdateAndHotPathRead)
{
    LogSampler &s = LogSampler::Instance();
    s.ResetForTest();
    s.Init();
    constexpr int kConfigThreads = 4;
    constexpr int kHotPathThreads = 8;
    constexpr int kConfigUpdatesPerThread = 30;
    constexpr int kHotPathIterations = 5000;
    constexpr uint32_t kFinalPpm = 300000;
    std::atomic<bool> configDone{ false };
    std::atomic<int> configUpdates{ 0 };

    // Writers use thread-dependent rates so they keep replacing each other's config.
    auto configWorker = [&](int threadIdx) {
        for (int i = 0; i < kConfigUpdatesPerThread; ++i) {
            if (i % 2 == 0) {
                LogSampleUserConfig cfg;
                double rate = 0.1 + 0.1 * ((threadIdx + i) % 10);
                cfg.requestSampleRate = rate;
                cfg.requestSampleRateExplicit = true;
                cfg.accessSampleRateExplicit = false;
                cfg.diagnosticSampleRateExplicit = false;
                // Individual outcomes are irrelevant here; only concurrent access is under test.
                (void)s.UpdateConfigFromFlags(cfg);
            } else {
                LogSampleConfigPb proto;
                proto.set_enabled(true);
                uint32_t ppm = 100000 + 100000 * ((threadIdx + i) % 9);
                proto.set_request_sample_ppm(ppm);
                proto.set_access_sample_ppm(kSamplePpmBase);
                proto.set_diagnostic_sample_ppm(ppm);
                (void)s.UpdateConfigFromProto(proto);
            }
            configUpdates.fetch_add(1, std::memory_order_relaxed);
        }
    };

    // Readers run a fixed number of iterations, then keep reading until all writers finish.
    auto hotPathReader = [&]() {
        for (int i = 0; i < kHotPathIterations; ++i) {
            (void)s.IsSamplerEnabledFast();
            (void)s.ShouldCreateRuntimeLog(LogSeverity::INFO, false);
            (void)s.ShouldCreateRuntimeLog(LogSeverity::WARNING, true);
            (void)s.IsCurrentRequestSampledIn();
            (void)s.ShouldRecordAccess(AccessRecorderKey::DS_KV_CLIENT_SET);
            (void)s.ShouldRecordAccessType(AccessKeyType::CLIENT);
            (void)s.ShouldRecordAccessType(AccessKeyType::ACCESS);
            (void)s.ShouldRecordAccessType(AccessKeyType::REQUEST_OUT);
        }
        while (!configDone.load(std::memory_order_acquire)) {
            (void)s.IsSamplerEnabledFast();
            (void)s.ShouldCreateRuntimeLog(LogSeverity::INFO, false);
            (void)s.IsCurrentRequestSampledIn();
            (void)s.ShouldRecordAccessType(AccessKeyType::CLIENT);
        }
    };

    std::vector<std::thread> configThreads;
    std::vector<std::thread> hotPathThreads;
    configThreads.reserve(kConfigThreads);
    hotPathThreads.reserve(kHotPathThreads);
    for (int t = 0; t < kConfigThreads; ++t) {
        configThreads.emplace_back(configWorker, t);
    }
    for (int t = 0; t < kHotPathThreads; ++t) {
        hotPathThreads.emplace_back(hotPathReader);
    }
    for (auto &t : configThreads) {
        if (t.joinable()) {
            t.join();
        }
    }
    configDone.store(true, std::memory_order_release);
    for (auto &t : hotPathThreads) {
        if (t.joinable()) {
            t.join();
        }
    }

    // Every writer must have completed all of its iterations.
    EXPECT_EQ(configUpdates.load(), kConfigThreads * kConfigUpdatesPerThread);

    // After the race the sampler must still accept a new config and expose it.
    LogSampleConfigPb finalProto;
    finalProto.set_enabled(true);
    finalProto.set_request_sample_ppm(kFinalPpm);
    finalProto.set_access_sample_ppm(kSamplePpmBase);
    finalProto.set_diagnostic_sample_ppm(kFinalPpm);
    (void)s.UpdateConfigFromProto(finalProto);
    auto *snap = s.GetSnapshotForTest();
    ASSERT_NE(snap, nullptr);
    EXPECT_EQ(snap->config.requestRate.ppm, kFinalPpm);
}
}
}