#include <gtest/gtest.h>
#include <vector>
#include <climits>
#include "runtime/watermark/StatusWatermarkValve.h"
#include "streaming/api/watermark/Watermark.h"
#include "runtime/watermark/WatermarkStatus.h"
class MockStatusWatermarkValveOutput {
public:
std::vector<int64_t> emittedWatermarks;
std::vector<WatermarkStatus*> emittedWatermarkStatuses;
void emitWatermark(Watermark* watermark)
{
if (watermark != nullptr) {
emittedWatermarks.push_back(watermark->getTimestamp());
}
}
void emitWatermarkStatus(WatermarkStatus* status)
{
if (status != nullptr) {
emittedWatermarkStatuses.push_back(status);
}
}
int64_t getLastWatermark()
{
if (emittedWatermarks.empty()) {
return INT64_MIN;
}
return emittedWatermarks.back();
}
WatermarkStatus* getLastWatermarkStatus()
{
if (emittedWatermarkStatuses.empty()) {
return nullptr;
}
return emittedWatermarkStatuses.back();
}
};
TEST(StatusWatermarkValveTest, ConstructorWithInvalidChannels)
{
EXPECT_THROW(StatusWatermarkValve<MockStatusWatermarkValveOutput> valve(0), std::logic_error);
EXPECT_THROW(StatusWatermarkValve<MockStatusWatermarkValveOutput> valve(-1), std::logic_error);
}
TEST(StatusWatermarkValveTest, InputWatermarkWithInvalidChannelIndex)
{
StatusWatermarkValve<MockStatusWatermarkValveOutput> valve(2);
MockStatusWatermarkValveOutput output;
Watermark wm(100);
EXPECT_THROW(valve.inputWatermark(&wm, -1, &output), std::logic_error);
EXPECT_THROW(valve.inputWatermark(&wm, 2, &output), std::logic_error);
}
TEST(StatusWatermarkValveTest, InputWatermarkToSingleChannel)
{
StatusWatermarkValve<MockStatusWatermarkValveOutput> valve(1);
MockStatusWatermarkValveOutput output;
Watermark wm1(100);
valve.inputWatermark(&wm1, 0, &output);
EXPECT_EQ(output.getLastWatermark(), 100);
Watermark wm2(200);
valve.inputWatermark(&wm2, 0, &output);
EXPECT_EQ(output.getLastWatermark(), 200);
}
TEST(StatusWatermarkValveTest, InputWatermarkToMultipleChannels)
{
StatusWatermarkValve<MockStatusWatermarkValveOutput> valve(2);
MockStatusWatermarkValveOutput output;
Watermark wm1(100);
valve.inputWatermark(&wm1, 0, &output);
EXPECT_EQ(output.getLastWatermark(), INT64_MIN);
Watermark wm2(150);
valve.inputWatermark(&wm2, 1, &output);
EXPECT_EQ(output.getLastWatermark(), 100);
Watermark wm3(200);
valve.inputWatermark(&wm3, 1, &output);
EXPECT_EQ(output.getLastWatermark(), 100);
Watermark wm4(150);
valve.inputWatermark(&wm4, 0, &output);
EXPECT_EQ(output.getLastWatermark(), 150);
Watermark wm5(180);
valve.inputWatermark(&wm5, 0, &output);
EXPECT_EQ(output.getLastWatermark(), 180);
}