#include <gtest/gtest.h>
#include "streaming/api/operators/StreamGroupedReduceOperator.h"
#include "test_utils/Mocks.h"
#include "basictypes/Tuple2.h"
#include "basictypes/Long.h"
#include "runtime/taskmanager/OmniRuntimeEnvironment.h"
#include "runtime/state/TaskStateManager.h"
#include <memory>
#include <string>
using namespace std;
// Builds an operator with only an output, then loads the UDF from a JSON
// config that names the reduce and key-selector shared objects. The test
// passes when loadUdf() completes without throwing.
TEST(StreamGroupedReduceOperatorTest, Constructor_ValidPaths) {
    MockOutput output;
    const std::string soPath = "/tmp/libMockReduceFunction.so";

    nlohmann::json config;
    config["udf_so"] = soPath;  // single source of truth for the UDF path
    config["key_so"][0] = "libMockKeyedBy.so";
    config["hash_path"] = "/tmp/";
    config["udf_obj"] = "{}";

    omnistream::datastream::StreamGroupedReduceOperator<Object> reduceOp(&output);
    EXPECT_NO_THROW(reduceOp.loadUdf(config));
}
// Points "udf_so" at "invalid_reduce.so" and expects loadUdf() to report the
// failure by throwing std::out_of_range.
// NOTE(review): presumably the library cannot be resolved and the lookup of
// the reduce function fails; the exact failure path is not visible here.
TEST(StreamGroupedReduceOperatorTest, Constructor_ReduceFunctionNull) {
    MockOutput output;
    const std::string soPath = "invalid_reduce.so";
    const std::string keyBySoName = "libMockKeyedBy.so";

    nlohmann::json config;
    config["udf_so"] = soPath;
    config["key_so"][0] = keyBySoName;
    config["hash_path"] = "/tmp/";
    config["udf_obj"] = "{}";

    omnistream::datastream::StreamGroupedReduceOperator<Object> reduceOp(&output);
    EXPECT_THROW(reduceOp.loadUdf(config), std::out_of_range);
}
// Pushes a single (String "key1", Long 1) tuple through a set-up, state-
// initialised and opened operator, and expects exactly one emitted record
// whose f1 field is a Long holding 1.
TEST(StreamGroupedReduceOperatorTest, ProcessElement_NewKey) {
    MockOutput output;
    const std::string soPath = "/tmp/libMockReduceFunction.so";

    nlohmann::json config;
    config["udf_so"] = soPath;
    config["key_so"][0] = "libMockKeyedBy.so";
    config["hash_path"] = "/tmp/";
    config["udf_obj"] = "{}";
    omnistream::datastream::StreamGroupedReduceOperator<Object> reduceOp(&output, config);

    // NOTE(review): env2, taskInfo, initializer and the serializer are
    // heap-allocated and never released in this test. Whether the operator or
    // the initializer takes ownership is not visible from this file, so the
    // allocations are left as they were — confirm before adding cleanup.
    auto env2 = new omnistream::RuntimeEnvironmentV2();
    auto taskInfo = new TaskInformationPOD();
    taskInfo->setStateBackend("HashMapStateBackend");
    {
        // The getters are used as copy-out/copy-in: modify local copies and
        // write them back so the operator id ends up on taskInfo.
        auto configPOD = taskInfo->getStreamConfigPOD();
        auto operatorDesc = configPOD.getOperatorDescription();
        operatorDesc.setOperatorId("deadbeefdeadbeefdeadbeefdeadbeef");
        configPOD.setOperatorDescription(operatorDesc);
        taskInfo->setStreamConfigPOD(configPOD);
    }
    env2->SetTaskStateManager(std::make_shared<omnistream::TaskStateManager>());
    env2->setTaskConfiguration(*taskInfo);
    StreamTaskStateInitializerImpl *initializer = new StreamTaskStateInitializerImpl(env2);

    reduceOp.setup();
    reduceOp.initializeState(initializer, new StringSerializer());
    reduceOp.open();

    Object* key = new String("key1");
    Object* value = new Long(1);
    Tuple2* tuple = new Tuple2(key, value);
    StreamRecord *record = new StreamRecord(tuple);
    reduceOp.setKeyContextElement(record);
    reduceOp.processElement(record);

    // Take ownership of every emitted record up front so they are deleted
    // even when one of the fatal assertions below returns early.
    const std::vector<StreamRecord*> collectedRecords = output.getCollectedRecords();
    std::vector<std::unique_ptr<StreamRecord>> emitted;
    emitted.reserve(collectedRecords.size());
    for (StreamRecord* collected : collectedRecords) {
        emitted.emplace_back(collected);
    }

    // Fatal: indexing emitted[0] below is only valid if a record exists.
    ASSERT_EQ(emitted.size(), 1u);
    auto* emittedTuple = reinterpret_cast<Tuple2*>(emitted[0]->getValue());
    ASSERT_NE(emittedTuple, nullptr);
    // dynamic_cast yields nullptr when f1 is not a Long; check before use.
    auto* emittedValue = dynamic_cast<Long*>(emittedTuple->f1);
    ASSERT_NE(emittedValue, nullptr);
    EXPECT_EQ(emittedValue->getValue(), 1);
}
// Pushes two tuples that share the key "key1" (values Long 1 and Long 2)
// through the operator and expects two emitted records, the second of which
// carries Long 3 in f1.
// NOTE(review): 3 is consistent with the mock reduce function summing the
// values; the reducer itself lives in the loaded .so — confirm.
TEST(StreamGroupedReduceOperatorTest, ProcessElement_ExistingKey) {
    MockOutput output;
    const std::string soPath = "/tmp/libMockReduceFunction.so";

    nlohmann::json config;
    config["udf_so"] = soPath;
    config["key_so"][0] = "libMockKeyedBy.so";
    config["hash_path"] = "/tmp/";
    config["udf_obj"] = "{}";
    omnistream::datastream::StreamGroupedReduceOperator<Object> reduceOp(&output, config);

    // NOTE(review): env2, taskInfo, initializer and the serializer are
    // heap-allocated and never released in this test. Whether the operator or
    // the initializer takes ownership is not visible from this file, so the
    // allocations are left as they were — confirm before adding cleanup.
    auto env2 = new omnistream::RuntimeEnvironmentV2();
    auto taskInfo = new TaskInformationPOD();
    taskInfo->setStateBackend("HashMapStateBackend");
    {
        // The getters are used as copy-out/copy-in: modify local copies and
        // write them back so the operator id ends up on taskInfo.
        auto configPOD = taskInfo->getStreamConfigPOD();
        auto operatorDesc = configPOD.getOperatorDescription();
        operatorDesc.setOperatorId("deadbeefdeadbeefdeadbeefdeadbeef");
        configPOD.setOperatorDescription(operatorDesc);
        taskInfo->setStreamConfigPOD(configPOD);
    }
    env2->SetTaskStateManager(std::make_shared<omnistream::TaskStateManager>());
    env2->setTaskConfiguration(*taskInfo);
    StreamTaskStateInitializerImpl *initializer =
        new StreamTaskStateInitializerImpl(env2);

    reduceOp.setup();
    reduceOp.initializeState(initializer, new StringSerializer());
    reduceOp.open();

    Object* key1 = new String("key1");
    Object* value1 = new Long(1);
    Tuple2* tuple1 = new Tuple2(key1, value1);
    StreamRecord *record1 = new StreamRecord(tuple1);

    Object* key2 = new String("key1");
    Object* value2 = new Long(2);
    Tuple2* tuple2 = new Tuple2(key2, value2);
    StreamRecord *record2 = new StreamRecord(tuple2);

    reduceOp.setKeyContextElement(record1);
    reduceOp.processElement(record1);
    reduceOp.setKeyContextElement(record2);
    reduceOp.processElement(record2);

    // Take ownership of every emitted record up front so they are deleted
    // even when one of the fatal assertions below returns early.
    const std::vector<StreamRecord*> collectedRecords = output.getCollectedRecords();
    std::vector<std::unique_ptr<StreamRecord>> emitted;
    emitted.reserve(collectedRecords.size());
    for (StreamRecord* collected : collectedRecords) {
        emitted.emplace_back(collected);
    }

    // Fatal: indexing emitted[1] below is only valid if two records exist.
    ASSERT_EQ(emitted.size(), 2u);
    auto* reducedTuple = reinterpret_cast<Tuple2*>(emitted[1]->getValue());
    ASSERT_NE(reducedTuple, nullptr);
    // dynamic_cast yields nullptr when f1 is not a Long; check before use.
    auto* reducedValue = dynamic_cast<Long*>(reducedTuple->f1);
    ASSERT_NE(reducedValue, nullptr);
    EXPECT_EQ(reducedValue->getValue(), 3);
}
// Constructs the operator from a full config and checks that getName()
// returns the C string "StreamGroupedReduceOperator".
TEST(StreamGroupedReduceOperatorTest, GetName) {
    MockOutput output;
    const std::string soPath = "/tmp/libMockReduceFunction.so";
    const std::string keyBySoName = "libMockKeyedBy.so";

    nlohmann::json config;
    config["udf_so"] = soPath;
    config["key_so"][0] = keyBySoName;
    config["hash_path"] = "/tmp/";
    config["udf_obj"] = "{}";

    omnistream::datastream::StreamGroupedReduceOperator<Object> reduceOp(&output, config);
    EXPECT_STREQ(reduceOp.getName(), "StreamGroupedReduceOperator");
}
// Runs the setup -> initializeState -> open sequence with a LongSerializer
// and expects open() to complete without throwing.
// NOTE(review): the test name suggests open() was once unimplemented; the
// body only ever required it to succeed, so the name looks stale — confirm.
TEST(StreamGroupedReduceOperatorTest, Open_NotImplemented) {
    MockOutput output;
    const std::string soPath = "/tmp/libMockReduceFunction.so";
    const std::string keyBySoName = "libMockKeyedBy.so";

    nlohmann::json config;
    config["udf_so"] = soPath;
    config["key_so"][0] = keyBySoName;
    config["hash_path"] = "/tmp/";
    config["udf_obj"] = "{}";
    omnistream::datastream::StreamGroupedReduceOperator<Object> reduceOp(&output, config);

    // NOTE(review): env2, taskInfo, initializer and the serializer are
    // heap-allocated and never released in this test. Whether the operator or
    // the initializer takes ownership is not visible from this file, so the
    // allocations are left as they were — confirm before adding cleanup.
    auto env2 = new omnistream::RuntimeEnvironmentV2();
    auto taskInfo = new TaskInformationPOD();
    taskInfo->setStateBackend("HashMapStateBackend");
    {
        // The getters are used as copy-out/copy-in: modify local copies and
        // write them back so the operator id ends up on taskInfo.
        auto configPOD = taskInfo->getStreamConfigPOD();
        auto operatorDesc = configPOD.getOperatorDescription();
        operatorDesc.setOperatorId("deadbeefdeadbeefdeadbeefdeadbeef");
        configPOD.setOperatorDescription(operatorDesc);
        taskInfo->setStreamConfigPOD(configPOD);
    }
    env2->SetTaskStateManager(std::make_shared<omnistream::TaskStateManager>());
    env2->setTaskConfiguration(*taskInfo);
    StreamTaskStateInitializerImpl *initializer = new StreamTaskStateInitializerImpl(env2);

    reduceOp.setup();
    reduceOp.initializeState(initializer, new LongSerializer());
    // Make the expectation explicit; without it the test body contains no
    // assertion and relies on the framework failing on an escaped exception.
    EXPECT_NO_THROW(reduceOp.open());
}