* Copyright (c) Huawei Technologies Co., Ltd. 2012-2018. All rights reserved.
*/
#include <gtest/gtest.h>
#include "core/api/common/serialization/JsonRowDataDeserializationSchema.h"
#include "types/logical/LogicalType.h"
#include "runtime/operators/source/csv/CsvRow.h"
#include "runtime/operators/source/csv/CsvSchema.h"
#include "streaming/runtime/streamrecord/StreamRecord.h"
#include "runtime/operators/source/csv/CsvInputFormat.h"
#include "streaming/api/operators/StreamSource.h"
#include "table/runtime/operators/source/InputSplit.h"
#include "typeutils/BinaryRowDataSerializer.h"
#include "streaming/runtime/io/RecordWriterOutput.h"
#include "test/table/runtime/operators/DummyStreamPartitioner.h"
// Verifies that a CSV field which is NOT wrapped in quotes keeps its embedded
// double-quote characters verbatim: the row `11,["only"]` must split into the
// two values `11` and `["only"]`.
TEST(SourceTestTest, CsvRowKeepsQuotesInUnquotedJsonField)
{
    omnistream::csv::CsvSchema schema({omniruntime::type::OMNI_LONG, omniruntime::type::OMNI_VARCHAR});
    omnistream::csv::CsvRow row("11,[\"only\"]", schema);

    auto&& nodes = row.getNodes();
    // Unsigned literal so the size check does not mix signed and unsigned operands.
    // ASSERT (fatal) guards the indexed accesses below.
    ASSERT_EQ(nodes.size(), 2U);
    EXPECT_EQ(nodes[0]->getValue(), "11");
    EXPECT_EQ(nodes[1]->getValue(), "[\"only\"]");
}
// Verifies that a quoted CSV field is unwrapped and its doubled quotes are
// collapsed: the row `11,"[""only""]"` must split into the two values `11`
// and `["only"]`.
TEST(SourceTestTest, CsvRowParsesQuotedJsonField)
{
    omnistream::csv::CsvSchema schema({omniruntime::type::OMNI_LONG, omniruntime::type::OMNI_VARCHAR});
    omnistream::csv::CsvRow row("11,\"[\"\"only\"\"]\"", schema);

    auto&& nodes = row.getNodes();
    // Unsigned literal so the size check does not mix signed and unsigned operands.
    // ASSERT (fatal) guards the indexed accesses below.
    ASSERT_EQ(nodes.size(), 2U);
    EXPECT_EQ(nodes[0]->getValue(), "11");
    EXPECT_EQ(nodes[1]->getValue(), "[\"only\"]");
}
// Deserializes a single JSON record whose string column is `null` and checks
// that the resulting batch reports the BIGINT column as present (value 1) and
// the string column as null.
TEST(SourceTestTest, JsonRowDataDeserializationSchemaHandlesNullStringField)
{
    using LongColumn = omniruntime::vec::Vector<int64_t>;
    using StringColumn = omniruntime::vec::Vector<omniruntime::vec::LargeStringContainer<std::string_view>>;

    // Two output columns: a BIGINT `id` and an unbounded VARCHAR `json_str`.
    nlohmann::json schemaDescription = {
        {"outputNames", {"id", "json_str"}}, {"outputTypes", {"BIGINT", "VARCHAR(2147483647)"}}};
    JsonRowDataDeserializationSchema deserializer(schemaDescription);

    // One message, passed as a (pointer, length) pair.
    std::string payload = R"({"id":1,"json_str":null})";
    std::vector<const uint8_t*> payloadPointers{reinterpret_cast<const uint8_t*>(payload.data())};
    std::vector<size_t> payloadSizes{payload.size()};

    auto* resultBatch =
        reinterpret_cast<omnistream::VectorBatch*>(deserializer.deserialize(payloadPointers, payloadSizes));
    ASSERT_NE(resultBatch, nullptr);

    auto* idColumn = reinterpret_cast<LongColumn*>(resultBatch->Get(0));
    auto* jsonColumn = reinterpret_cast<StringColumn*>(resultBatch->Get(1));

    EXPECT_FALSE(idColumn->IsNull(0));
    EXPECT_EQ(idColumn->GetValue(0), 1);
    EXPECT_TRUE(jsonColumn->IsNull(0));

    // The test releases the batch returned by deserialize().
    delete resultBatch;
}
// Disabled smoke test: builds a CSV-backed StreamSource from a JSON operator
// description (reading "input/genbid.csv") and runs it. It makes no assertions
// about the produced records; it only exercises setup() and run().
//
// NOTE(review): every object created with `new` below is never released in this
// test. Whether StreamSource / InputFormatSourceFunction / RecordWriter take
// ownership of their arguments is not visible from this file — confirm before
// adding deletes or switching to smart pointers.
TEST(SourceTestTest, DISABLED_initoper)
{
    std::string description = R"DELIM({ "format" : "csv",
"delimiter" : null,
"filePath" : "input/genbid.csv",
"fields" : [{"kind" : "logical", "isNull" : true, "precision" : 3,
"type" : "BIGINT", "timestampKind" : 0, "fieldName" : "auction" },
{"kind" : "logical", "isNull" : true, "precision" : 3,
"type" : "BIGINT", "timestampKind" : 0, "fieldName" : "bidder" },
{"kind" : "logical", "isNull" : true, "precision" : 3,
"type" : "BIGINT", "timestampKind" : 0, "fieldName" : "price" },
{"kind" : "logical", "isNull" : true, "precision" : 3,
"type" : "STRING", "timestampKind" : 0, "fieldName" : "channel" },
{"kind" : "logical", "isNull" : true, "precision" : 3,
"type" : "STRING", "timestampKind" : 0, "fieldName" : "url" },
{"kind" : "logical", "isNull" : true, "precision" : 3,
"type" : "TIMESTAMP(3)", "timestampKind" : 0, "fieldName" : "dateTime" },
{"kind" : "logical", "isNull" : true, "precision" : 3,
"type" : "STRING", "timestampKind" : 0, "fieldName" : "extra" }
],
"selectFields" : [ 0, 1, 2, 3, 4 ],
"csvSelectFieldToProjectFieldMapping" : [ 0, 1, 2, 3, 4 ],
"csvSelectFieldToCsvFieldMapping" : [ 3, 5, 2, 1, 0 ]})DELIM";
    nlohmann::json opDescriptionJSON = nlohmann::json::parse(description);

    // Translate every declared field type into its omni type id to build the CSV schema.
    std::vector<DataTypeId> fields;
    for (auto& field : opDescriptionJSON["fields"]) {
        fields.push_back(LogicalType::flinkTypeToOmniTypeId(field["type"]));
    }
    omnistream::csv::CsvSchema schema(fields);

    std::vector<int> csvSelectFieldToProjectFieldMapping = opDescriptionJSON["csvSelectFieldToProjectFieldMapping"];
    std::vector<int> csvSelectFieldToCsvFieldMapping = opDescriptionJSON["csvSelectFieldToCsvFieldMapping"];

    // The two mappings are consumed pairwise, so they must have the same length.
    ASSERT_EQ(csvSelectFieldToProjectFieldMapping.size(), csvSelectFieldToCsvFieldMapping.size());

    // Compose both mappings into one: oneMap[projectIndex] = csvIndex.
    std::vector<int> oneMap(csvSelectFieldToProjectFieldMapping.size());
    for (size_t i = 0; i < csvSelectFieldToProjectFieldMapping.size(); ++i) {
        const int projectIndex = csvSelectFieldToProjectFieldMapping[i];
        // Reject indices that would write outside oneMap.
        ASSERT_GE(projectIndex, 0);
        ASSERT_LT(static_cast<size_t>(projectIndex), oneMap.size());
        oneMap[projectIndex] = csvSelectFieldToCsvFieldMapping[i];
    }

    auto csvInputFormat = new omnistream::csv::CsvInputFormat<omnistream::VectorBatch>(schema, 1000, oneMap);
    omnistream::InputSplit* inputSplit = new omnistream::InputSplit(opDescriptionJSON["filePath"], 0, 100000);
    auto func = new omnistream::InputFormatSourceFunction<omnistream::VectorBatch>(csvInputFormat, inputSplit);

    // Output side: a record writer over a small raw buffer with a dummy partitioner.
    BinaryRowDataSerializer* binaryRowDataSerializer = new BinaryRowDataSerializer(2);
    const int32_t BUFFER_CAPACITY = 256;
    uint8_t* address = new uint8_t[BUFFER_CAPACITY];
    auto* partitioner = new DummyStreamPartitioner();
    auto* recordWriter = new omnistream::datastream::RecordWriter(address, BUFFER_CAPACITY, partitioner);
    auto* recordWriteOutput = new omnistream::datastream::RecordWriterOutput(binaryRowDataSerializer, recordWriter);

    auto* source = new omnistream::StreamSource(func, recordWriteOutput);
    source->setup();
    source->run();
}