#include "streaming/api/operators/AbstractStreamOperator.h"
#include "streaming/api/operators/StreamTaskStateInitializerImpl.h"
#include "runtime/state/VoidNamespace.h"
#include "runtime/state/heap/StateTable.h"
#include "core/typeutils/LongSerializer.h"
#include "OutputTest.h"
#include "runtime/taskmanager/OmniRuntimeEnvironment.h"
#include "core/api/common/TaskInfoImpl.h"
#include "runtime/state/VoidNamespace.h"
#include "table/data/binary/BinaryRowData.h"
#include "runtime/state/TaskStateManager.h"
#include <gtest/gtest.h>
TEST(AbstractStreamOperatorTest, InitTest)
{
AbstractStreamOperator<int>* op = new AbstractStreamOperator<int>();
op->setup();
auto env2 = new omnistream::RuntimeEnvironmentV2();
auto taskInfo = new TaskInformationPOD();
taskInfo->setStateBackend("HashMapStateBackend");
{
auto configPOD = taskInfo->getStreamConfigPOD();
auto operatorDesc = configPOD.getOperatorDescription();
operatorDesc.setOperatorId("deadbeefdeadbeefdeadbeefdeadbeef");
configPOD.setOperatorDescription(operatorDesc);
taskInfo->setStreamConfigPOD(configPOD);
}
env2->setTaskConfiguration(*taskInfo);
env2->SetTaskStateManager(std::make_shared<omnistream::TaskStateManager>());
ASSERT_NO_THROW((op->setOutput(new OutputTest())));
ASSERT_NO_THROW((op->open()));
ASSERT_NO_THROW((op->initializeState(new StreamTaskStateInitializerImpl(env2), new IntSerializer())));
ASSERT_NO_THROW((op->setCurrentKey(1)));
}
TEST(AbstractStreamOperatorTest, setAndGetCurrentKey)
{
AbstractStreamOperator<int>* op = new AbstractStreamOperator<int>();
op->setup();
op->open();
auto env2 = new omnistream::RuntimeEnvironmentV2();
auto taskInfo = new TaskInformationPOD();
taskInfo->setStateBackend("HashMapStateBackend");
{
auto configPOD = taskInfo->getStreamConfigPOD();
auto operatorDesc = configPOD.getOperatorDescription();
operatorDesc.setOperatorId("deadbeefdeadbeefdeadbeefdeadbeef");
configPOD.setOperatorDescription(operatorDesc);
taskInfo->setStreamConfigPOD(configPOD);
}
env2->setTaskConfiguration(*taskInfo);
env2->SetTaskStateManager(std::make_shared<omnistream::TaskStateManager>());
op->initializeState(new StreamTaskStateInitializerImpl(env2), new IntSerializer());
BinaryRowData* row = BinaryRowData::createBinaryRowDataWithMem(2);
row->setInt(0, 2L);
row->setInt(1, 4L);
op->setCurrentKey(*row->getInt(0));
int key = op->getCurrentKey();
ASSERT_EQ(2L, key);
}
TEST(AbstractStreamOperatorTest, compositeKeys)
{
AbstractStreamOperator<RowData*>* op = new AbstractStreamOperator<RowData*>();
op->setup();
op->open();
auto env2 = new omnistream::RuntimeEnvironmentV2();
auto taskInfo = new TaskInformationPOD();
taskInfo->setStateBackend("HashMapStateBackend");
{
auto configPOD = taskInfo->getStreamConfigPOD();
auto operatorDesc = configPOD.getOperatorDescription();
operatorDesc.setOperatorId("deadbeefdeadbeefdeadbeefdeadbeef");
configPOD.setOperatorDescription(operatorDesc);
taskInfo->setStreamConfigPOD(configPOD);
}
env2->setTaskConfiguration(*taskInfo);
env2->SetTaskStateManager(std::make_shared<omnistream::TaskStateManager>());
op->initializeState(new StreamTaskStateInitializerImpl(env2), new IntSerializer());
BinaryRowData* row = BinaryRowData::createBinaryRowDataWithMem(2);
row->setInt(0, 8);
row->setInt(1, 12);
op->setCurrentKey(dynamic_cast<RowData*>(row));
BinaryRowData* key = dynamic_cast<BinaryRowData*>(op->getCurrentKey());
ASSERT_EQ(8, *key->getInt(0));
ASSERT_EQ(12, *key->getInt(1));
}