#include <gtest/gtest.h>
#include "datagen/nexmark/generator/NexmarkGenerator.h"
#include "datagen/nexmark/NexmarkSourceFunction.h"
#include "core/typeinfo/TypeInfoFactory.h"
#include "streaming/api/operators/StreamingRuntimeContext.h"
#include "runtime/taskmanager/OmniRuntimeEnvironment.h"
/**
 * Minimal SourceContext stub for tests.
 *
 * It remembers the most recently collected element in reUseRecord and hands
 * back the lock pointer it was constructed with. Every other callback is a
 * no-op.
 */
class DummySourceContext : public SourceContext {
public:
    /**
     * @param lock pointer returned unchanged by getCheckpointLock(); it is only
     *             stored here, never dereferenced or freed by this class.
     */
    explicit DummySourceContext(Object* lock) : lock(lock)
    {
    }

    /** Stores the pointer to the latest element; earlier values are overwritten. */
    void collect(void* element) override
    {
        reUseRecord = element;
    }

    /** Forwards to collect(); the timestamp argument is ignored. */
    void collectWithTimestamp(void* element, int64_t timestamp) override
    {
        collect(element);
    }

    /** Intentionally does nothing. */
    void emitWatermark(Watermark* mark) override
    {
    }

    /** Intentionally does nothing. */
    void markAsTemporarilyIdle() override
    {
    }

    /** @return the lock pointer supplied at construction. */
    Object* getCheckpointLock() override
    {
        return lock;
    }

    /** Intentionally does nothing. */
    void close() override
    {
    }

    // Last element passed to collect(). Starts as nullptr so a caller can tell
    // "nothing was collected" apart from a real record instead of reading an
    // indeterminate pointer.
    void* reUseRecord = nullptr;
    // Lock handed out by getCheckpointLock().
    Object* lock;
};
/**
 * Runs NexmarkSourceFunction once against a stub source context and checks
 * that the last collected VectorBatch has batchSize rows.
 */
TEST(SourceTest, NexmarkDataGeneratorTest)
{
    int batchSize = 100;
    // NOTE(review): allocated with raw new and never deleted in this test;
    // presumably the source function takes ownership - confirm, otherwise this leaks.
    BatchEventDeserializer* eventDeserializer = new BatchEventDeserializer(batchSize);
    auto typeInfo = TypeInfoFactory::createTypeInfo("String");
    NexmarkConfiguration nexmarkConfig;

    /*
     * GeneratorConfig arguments, in order (taken from the note that was here):
     *   const NexmarkConfiguration &configuration,
     *   int64_t baseTime,
     *   int64_t firstEventId,
     *   int64_t maxEventsOrZero,
     *   int64_t firstEventNumber
     */
    // Matches 2025-02-22 00:00:00 UTC when read as epoch milliseconds.
    constexpr int64_t baseTimeMs = 1740182400000;
    GeneratorConfig config{nexmarkConfig, baseTimeMs, 0, 100, 0};
    NexmarkSourceFunction<omnistream::VectorBatch> srcFunc{config, eventDeserializer, typeInfo};

    // NOTE(review): runtimeEnv and runtimeCtx are also raw allocations that are
    // never released here; ownership is not visible from this file - confirm.
    auto runtimeEnv = new omnistream::RuntimeEnvironmentV2();
    auto runtimeCtx = new StreamingRuntimeContext<int>();
    runtimeCtx->setEnvironment(runtimeEnv);
    srcFunc.setRuntimeContext(runtimeCtx);

    Configuration runConfig;
    srcFunc.open(runConfig);

    thread_local Object lock;
    DummySourceContext ctx(&lock);
    // Start from a known state so the null check below is meaningful even if
    // run() never calls collect().
    ctx.reUseRecord = nullptr;
    srcFunc.run(&ctx);

    // reUseRecord is a void*, so static_cast is the appropriate conversion.
    auto output = static_cast<omnistream::VectorBatch*>(ctx.reUseRecord);
    // Fail cleanly instead of dereferencing a null pointer when nothing was emitted.
    ASSERT_NE(output, nullptr);
    EXPECT_EQ(output->GetRowCount(), batchSize);
}