#include <gtest/gtest.h>
#include <iostream>
#include "streaming/api/operators/StreamMap.h"
#include "test_utils/Mocks.h"
using namespace std;
// Constructing a StreamMap from an output and a UDF config must expose that
// same output pointer through GetOutput().
// NOTE(review): presumably requires the mock UDF library to exist at the
// configured /tmp path — confirm against the test fixture setup.
TEST(StreamMapTest, Constructor_ValidPath) {
    using MapOperator = omnistream::datastream::StreamMap<Object, Object*>;

    MockOutput sink;
    nlohmann::json udfConfig = {
        {"udf_so", "/tmp/libMockMapFunction.so"},
        {"udf_obj", "{}"}
    };

    MapOperator mapOperator(&sink, udfConfig);

    EXPECT_EQ(&sink, mapOperator.GetOutput());
}
// Builds a StreamMap with only an output, then asks it to load a UDF from a
// config whose "udf_so" entry is "invalid_path.so"; the load is expected to
// raise std::out_of_range.
TEST(StreamMapTest, Constructor_InvalidPath) {
    using MapOperator = omnistream::datastream::StreamMap<Object, Object*>;

    MockOutput sink;
    nlohmann::json udfConfig = {
        {"udf_so", "invalid_path.so"},
        {"udf_obj", "{}"}
    };

    // The operator is created without a config so that loading happens
    // explicitly inside the assertion below.
    MapOperator mapOperator(&sink);

    EXPECT_THROW(mapOperator.loadUdf(udfConfig), std::out_of_range);
}
/*
Disabled test, kept for reference only.
NOTE(review): the start of this block comment and the TEST header were missing,
which left the statements below at file scope followed by a dangling comment
terminator. The header line here is reconstructed from the processElement call;
confirm the intended test name before re-enabling.
The body uses a StreamMap<MapFunction, int>(output, path) constructor form,
which differs from the json-config form used by the other tests in this file.

TEST(StreamMapTest, ProcessElement) {
MockOutput *output = new MockOutput();
StreamMap<MapFunction, int> *streamMap = new StreamMap<MapFunction, int>(output, "/tmp/libMockMapFunction.so");
MockObject *input = new MockObject(10);
StreamRecord *record = new StreamRecord(input);
streamMap->processElement(record);
// Check that the output is correct
vector<StreamRecord*> collectedRecords = output->getCollectedRecords();
EXPECT_EQ(collectedRecords.size(), 1);
EXPECT_EQ(dynamic_cast<MockObject *>(collectedRecords[0]->getValue())->getValue(), 11);
delete collectedRecords[0]->getValue();
delete collectedRecords[0];
delete input;
delete streamMap;
delete output;
}*/
// A configured StreamMap must report the C-string name "StreamMap".
TEST(StreamMapTest, GetName) {
    using MapOperator = omnistream::datastream::StreamMap<Object, Object*>;

    MockOutput sink;
    nlohmann::json udfConfig = {
        {"udf_so", "/tmp/libMockMapFunction.so"},
        {"udf_obj", "{}"}
    };

    MapOperator mapOperator(&sink, udfConfig);

    EXPECT_STREQ(mapOperator.getName(), "StreamMap");
}
// Calls open() on a configured StreamMap. The test checks only that the call
// completes without throwing; no other effect of open() is verified here.
TEST(StreamMapTest, Open_NotImplemented) {
    MockOutput output;
    nlohmann::json config;
    config["udf_so"] = "/tmp/libMockMapFunction.so";
    config["udf_obj"] = "{}";
    omnistream::datastream::StreamMap<Object, Object*> streamMap(&output, config);

    // Explicit assertion so the test states its expectation instead of
    // relying on the framework's handling of an escaped exception.
    EXPECT_NO_THROW(streamMap.open());
}
// Calls close() on a configured StreamMap. The test checks only that the call
// completes without throwing; no other effect of close() is verified here.
TEST(StreamMapTest, Close_NotImplemented) {
    MockOutput output;
    nlohmann::json config;
    config["udf_so"] = "/tmp/libMockMapFunction.so";
    config["udf_obj"] = "{}";
    omnistream::datastream::StreamMap<Object, Object*> streamMap(&output, config);

    // Explicit assertion so the test states its expectation instead of
    // relying on the framework's handling of an escaped exception.
    EXPECT_NO_THROW(streamMap.close());
}