#include <gtest/gtest.h>
#include <iostream>
#include <nlohmann/json.hpp>
#include "OmniOperatorJIT/core/test/util/test_util.h"
#include "test/core/operators/OutputTest.h"
#include "streaming/api/operators/StreamOperatorFactory.h"
#include "table/runtime/operators/TableOperatorConstants.h"
#include "runtime/executiongraph/operatorchain/OperatorPOD.h"
#include "streaming/api/operators/sink/OutputConversionOperator.h"
using json = nlohmann::json;
// JSON operator description used by the test in this file: it is parsed with
// nlohmann::json and also passed verbatim to the OperatorPOD constructor.
// It carries an "originDescription" string plus "inputTypes"/"outputTypes"
// arrays listing four column types each (VARCHAR, BIGINT, BIGINT, VARCHAR).
// NOTE(review): "TableToDataSteam" looks like a typo for "TableToDataStream";
// it is runtime text, so it is left untouched here — confirm against whatever
// produces/consumes this description before changing it.
std::string outputconversionDescription = R"DELIM({"originDescription":"TableToDataSteam(type=ROW<`key` STRING, `countRightId` BIGINT NOT NULL, `sumRightId` BIGINT, `value` STRING> NOT NULL, rowtime=false)",
"inputTypes":["VARCHAR(2147483647)","BIGINT","BIGINT","VARCHAR(2147483647)"],
"outputTypes":["VARCHAR(2147483647)","BIGINT","BIGINT","VARCHAR(2147483647)"]})DELIM";
/**
 * Builds the input fixture for the output-conversion test: a five-row
 * VectorBatch with four columns appended in the order
 * key (varchar), countRightId (int64), sumRightId (int64), value (varchar),
 * with every row flagged as RowKind::INSERT.
 *
 * @return raw pointer to the batch allocated with `new`; this function does
 *         not release it.
 */
omnistream::VectorBatch* newVectorBatchOutputConversion() {
    constexpr int kRowCount = 5;

    // Column payloads; each holds exactly kRowCount entries.
    std::vector<std::string> keyColumn = {
        "2_0_7y0tKfD", "11_0_9PxSTa", "7_0_UiVsMjV", "13_1_R6Qnyd", "5_0_YgXQRyu"};
    std::vector<int64_t> countColumn = {3, 4, 5, 6, 7};
    std::vector<int64_t> sumColumn = {1009, 1007, 1003, 2002, 2004};
    std::vector<std::string> valueColumn = {"1009", "1007", "1003", "2002", "2004"};

    auto* batch = new omnistream::VectorBatch(kRowCount);

    // Append order defines the column order of the batch.
    batch->Append(omniruntime::TestUtil::CreateVarcharVector(keyColumn.data(), kRowCount));
    batch->Append(omniruntime::TestUtil::CreateVector<int64_t>(kRowCount, countColumn.data()));
    batch->Append(omniruntime::TestUtil::CreateVector<int64_t>(kRowCount, sumColumn.data()));
    batch->Append(omniruntime::TestUtil::CreateVarcharVector(valueColumn.data(), kRowCount));

    for (int row = 0; row < kRowCount; ++row) {
        batch->setRowKind(row, RowKind::INSERT);
    }
    return batch;
}
/**
 * Smoke test for OutputConversionOperator.
 *
 * Creates the operator through StreamOperatorFactory from
 * outputconversionDescription, pushes one StreamRecord wrapping the fixture
 * batch through open()/processElement(), then prints every row collected by
 * the ExternalOutputTest sink. Fields are printed according to the
 * "outputTypes" array of the description.
 *
 * The test only asserts preconditions (operator was created with the expected
 * type, row arity fits the declared output types); it does not verify the
 * produced values. It carries the DISABLED_ prefix, so GoogleTest skips it
 * unless disabled tests are explicitly requested.
 */
TEST(OutputConversionOperatorTest, DISABLED_ProcessTest) {
    json parsedJson = json::parse(outputconversionDescription);
    // The declared output types do not change per row, so decode them once.
    const std::vector<std::string> outputTypes =
        parsedJson["outputTypes"].get<std::vector<std::string>>();

    omnistream::OperatorPOD outputconversionPOD("nexmark_source", std::string(OPERATOR_NAME_OUTPUT_CONVERSION),
                                                outputconversionDescription, {}, {});
    // NOTE(review): output, the operator, streamRecord and vBatch are never
    // deleted in this test; who owns them after the calls below is not visible
    // here — confirm before adding cleanup.
    auto* output = new ExternalOutputTest();
    auto* outputConversionOperator = dynamic_cast<OutputConversionOperator*>(
        omnistream::StreamOperatorFactory::createOperatorAndCollector(outputconversionPOD, output, nullptr));
    // dynamic_cast yields nullptr on a type mismatch (or a null factory result);
    // fail the test instead of dereferencing it.
    ASSERT_NE(outputConversionOperator, nullptr)
        << "factory did not return an OutputConversionOperator";

    omnistream::VectorBatch* vBatch = newVectorBatchOutputConversion();
    auto* streamRecord = new StreamRecord(vBatch);
    outputConversionOperator->open();
    outputConversionOperator->processElement(streamRecord);

    std::cout << "=========== print result ==========" << std::endl;
    auto resultVec = output->getAll();
    for (const auto& resultRow : resultVec) {
        std::cout << to_string(resultRow->getKind()) << std::endl;
        const int arity = resultRow->getArity();
        // Guard the outputTypes[i] lookups below against a row that is wider
        // than the declared output schema.
        ASSERT_LE(static_cast<std::size_t>(arity), outputTypes.size())
            << "row arity exceeds the number of declared output types";
        for (int i = 0; i < arity; i++) {
            const std::string& type = outputTypes[i];
            if (type == "BIGINT" || type.find("TIMESTAMP") != std::string::npos) {
                std::cout << std::any_cast<long>(resultRow->getField(i)) << "| ";
            } else if (type == "INTEGER") {
                std::cout << std::any_cast<int>(resultRow->getField(i)) << "| ";
            } else if (type == "DOUBLE") {
                // NOTE(review): DOUBLE is read as long; any_cast throws
                // std::bad_any_cast if the field actually holds a double —
                // confirm what the operator stores.
                std::cout << std::any_cast<long>(resultRow->getField(i)) << "| ";
            } else if (type == "BOOLEAN") {
                // NOTE(review): BOOLEAN is read as int; confirm the stored type.
                std::cout << std::any_cast<int>(resultRow->getField(i)) << "| ";
            } else if (type == "STRING" || type == "VARCHAR" || type == "VARCHAR(2147483647)") {
                auto str_result = std::any_cast<std::string>(resultRow->getField(i));
                std::cout << str_result << "| ";
            } else {
                std::cout << "Unknown type." << std::endl;
            }
        }
        std::cout << std::endl;
    }
}