* @Copyright: Copyright (c) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved.
* @Description: lookup join operator test implementations
*/
#include <vector>
#include <chrono>
#include <thread>
#include <random>
#include <ctime>
#include "gtest/gtest.h"
#include "operator/join/hash_builder.h"
#include "operator/join/lookup_join.h"
#include "operator/join/lookup_outer_join.h"
#include "vector/vector_helper.h"
#include "util/config_util.h"
#include "util/test_util.h"
using namespace omniruntime::op;
using namespace omniruntime::vec;
using namespace omniruntime::expressions;
using namespace TestUtil;
using std::map;
using std::string;
using std::vector;
namespace JoinTest {
const int32_t DISTINCT_VALUE_COUNT = 4;
const int32_t REPEAT_COUNT = 200;
const int32_t COLUMN_COUNT_4 = 4;
const int32_t VEC_BATCH_COUNT_10 = 10;
const int32_t VEC_BATCH_COUNT_1 = 1;
const int32_t BUILD_POSITION_COUNT = 1000;
const int32_t PROBE_POSITION_COUNT = 1000;
const int32_t TIME_TO_SLEEP = 100;
void DeleteJoinOperatorFactory(HashBuilderOperatorFactory *hashBuilderOperatorFactory,
LookupJoinOperatorFactory *lookupJoinOperatorFactory,
LookupOuterJoinOperatorFactory *lookupOuterJoinOperatorFactory)
{
delete hashBuilderOperatorFactory;
delete lookupJoinOperatorFactory;
delete lookupOuterJoinOperatorFactory;
}
void DeleteJoinOperatorFactory(HashBuilderOperatorFactory *hashBuilderOperatorFactory,
LookupJoinOperatorFactory *lookupJoinOperatorFactory)
{
DeleteJoinOperatorFactory(hashBuilderOperatorFactory, lookupJoinOperatorFactory, nullptr);
}
VectorBatch *ConstructSimpleBuildData()
{
const int32_t dataSize = 10;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[dataSize] = {1, 2, 1, 2, 3, 4, 5, 6, 7, 1};
int64_t buildData1[dataSize] = {79, 79, 70, 70, 70, 70, 70, 70, 70, 70};
return CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
}
VectorBatch **ConstructSimpleBuildData2()
{
const int32_t dataSize1 = 6;
const int32_t dataSize2 = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData00[dataSize1] = {1, 1, 3, 6, 7, 1};
int64_t buildData01[dataSize1] = {79, 70, 70, 70, 70, 70};
VectorBatch *vecBatch0 = CreateVectorBatch(buildTypes, dataSize1, buildData00, buildData01);
int64_t buildData10[dataSize2] = {2, 2, 4, 5};
int64_t buildData11[dataSize2] = {79, 70, 70, 70};
VectorBatch *vecBatch1 = CreateVectorBatch(buildTypes, dataSize2, buildData10, buildData11);
auto **vectorBatches = new VectorBatch *[2];
vectorBatches[0] = vecBatch0;
vectorBatches[1] = vecBatch1;
return vectorBatches;
}
VectorBatch *ConstructSimpleProbeData()
{
const int32_t dataSize = 10;
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4, 5, 6, 1, 1, 2, 3};
int64_t probeData1[] = {78, 78, 78, 78, 78, 78, 78, 82, 82, 65};
return CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
}
VectorBatch *ConstructSimpleExpectedData()
{
const uint32_t originalDataSize = 10;
const uint32_t expectedDataSize = 18;
int64_t expectedData0[originalDataSize] = {78, 78, 78, 78, 78, 78, 78, 82, 82, 65};
int64_t expectedData1[expectedDataSize] = {79, 70, 70, 79, 70, 70, 70, 70, 70, 79, 70, 70, 79, 70, 70, 79, 70, 70};
auto inputType = IntType();
auto expectedVec0 = CreateVector<int64_t>(originalDataSize, expectedData0);
int32_t ids[expectedDataSize] = {0, 0, 0, 1, 1, 2, 3, 4, 5, 6, 6, 6, 7, 7, 7, 8, 8, 9};
auto expectedDictVec0 =
VectorHelper::CreateDictionary(ids, expectedDataSize, reinterpret_cast<Vector<int64_t> *>(expectedVec0));
auto expectedVec1 = CreateVector<int64_t>(expectedDataSize, expectedData1);
delete expectedVec0;
auto *vectorBatch = new VectorBatch(expectedDataSize);
vectorBatch->Append(expectedDictVec0);
vectorBatch->Append(expectedVec1);
return vectorBatch;
}
void BuildVectorValues(Vector<int64_t> *vector)
{
int32_t idx = 0;
for (int32_t j = 0; j < DISTINCT_VALUE_COUNT; j++) {
for (int32_t k = 0; k < REPEAT_COUNT; k++) {
vector->SetValue(idx++, j);
}
}
}
void BuildTestData(VectorBatch **vecBatches, int32_t vecBatchCount, int32_t columnCount)
{
int32_t positionCount = DISTINCT_VALUE_COUNT * REPEAT_COUNT;
for (int32_t i = 0; i < vecBatchCount; i++) {
auto *vecBatch = new VectorBatch(positionCount);
for (int32_t colIdx = 0; colIdx < columnCount; colIdx++) {
auto vector = std::make_unique<Vector<int64_t>>(positionCount);
BuildVectorValues(vector.get());
vecBatch->Append(vector.release());
}
vecBatches[i] = vecBatch;
}
}
HashBuilderOperatorFactory *CreateSimpleBuildFactory(int32_t operatorCount, JoinType joinType)
{
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(joinType, buildTypes,
buildJoinCols, joinColsCount, operatorCount);
return hashBuilderFactory;
}
LookupJoinOperatorFactory *CreateSimpleProbeFactory(const HashBuilderOperatorFactory *hashBuilderFactory)
{
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int32_t probeOutputCols[1] = {1};
int32_t probeOutputColsCount = 1;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType() }));
int32_t buildOutputCols[1] = {1};
int32_t buildOutputColsCount = 1;
auto hashBuilderFactoryAddr = reinterpret_cast<int64_t>(hashBuilderFactory);
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
return lookupJoinFactory;
}
TEST(NativeOmniJoinTest, TestComparePerf)
{
int32_t buildVecBatchCount = 10;
auto **builderVecBatchesWithoutJit = new VectorBatch *[buildVecBatchCount];
BuildTestData(builderVecBatchesWithoutJit, buildVecBatchCount, COLUMN_COUNT_4);
auto hashBuilderFactoryWithoutJit = CreateSimpleBuildFactory(1, OMNI_JOIN_TYPE_INNER);
auto hashBuilderOperatorWithoutJit =
dynamic_cast<HashBuilderOperator *>(hashBuilderFactoryWithoutJit->CreateOperator());
Timer timer;
timer.SetStart();
for (int32_t pageIndex = 0; pageIndex < buildVecBatchCount; ++pageIndex) {
auto errNo = hashBuilderOperatorWithoutJit->AddInput(builderVecBatchesWithoutJit[pageIndex]);
EXPECT_EQ(errNo, OMNI_STATUS_NORMAL);
}
VectorBatch *hashBuilderOutputWithoutJit = nullptr;
hashBuilderOperatorWithoutJit->GetOutput(&hashBuilderOutputWithoutJit);
timer.CalculateElapse();
double wallElapsed = timer.GetWallElapse();
double cpuElapsed = timer.GetCpuElapse();
std::cout << "HashBuilder without OmniJit, wall " << wallElapsed << " cpu " << cpuElapsed << std::endl;
int32_t probeVecBatchCount = 1;
auto **probeVecBatchesWithoutJit = new VectorBatch *[probeVecBatchCount];
BuildTestData(probeVecBatchesWithoutJit, probeVecBatchCount, COLUMN_COUNT_4);
auto lookupJoinFactoryWithoutJit = CreateSimpleProbeFactory(hashBuilderFactoryWithoutJit);
auto lookupJoinOperatorWithoutJit =
dynamic_cast<LookupJoinOperator *>(lookupJoinFactoryWithoutJit->CreateOperator());
timer.Reset();
auto errNo = lookupJoinOperatorWithoutJit->AddInput(probeVecBatchesWithoutJit[0]);
EXPECT_EQ(errNo, OMNI_STATUS_NORMAL);
std::vector<VectorBatch *> lookupJoinOutputWithoutJit;
while (lookupJoinOperatorWithoutJit->GetStatus() != OMNI_STATUS_FINISHED) {
VectorBatch *result = nullptr;
lookupJoinOperatorWithoutJit->GetOutput(&result);
lookupJoinOutputWithoutJit.push_back(result);
}
timer.CalculateElapse();
wallElapsed = timer.GetWallElapse();
cpuElapsed = timer.GetCpuElapse();
std::cout << "LookupJoin without OmniJit, wall " << wallElapsed << " cpu " << cpuElapsed << std::endl;
auto **builderVecBatchesWithJit = new VectorBatch *[buildVecBatchCount];
BuildTestData(builderVecBatchesWithJit, buildVecBatchCount, COLUMN_COUNT_4);
auto hashBuilderFactoryWithJit = CreateSimpleBuildFactory(1, OMNI_JOIN_TYPE_INNER);
auto hashBuilderOperatorWithJit = dynamic_cast<HashBuilderOperator *>(hashBuilderFactoryWithJit->CreateOperator());
timer.Reset();
for (int32_t pageIndex = 0; pageIndex < buildVecBatchCount; ++pageIndex) {
errNo = hashBuilderOperatorWithJit->AddInput(builderVecBatchesWithJit[pageIndex]);
EXPECT_EQ(errNo, OMNI_STATUS_NORMAL);
}
VectorBatch *hashBuilderOutputWithJit = nullptr;
hashBuilderOperatorWithJit->GetOutput(&hashBuilderOutputWithJit);
timer.CalculateElapse();
wallElapsed = timer.GetWallElapse();
cpuElapsed = timer.GetCpuElapse();
std::cout << "HashBuilder with OmniJit, wall " << wallElapsed << " cpu " << cpuElapsed << std::endl;
auto **probeVecBatchesWithJit = new VectorBatch *[probeVecBatchCount];
BuildTestData(probeVecBatchesWithJit, probeVecBatchCount, COLUMN_COUNT_4);
auto lookupJoinFactoryWithJit = CreateSimpleProbeFactory(hashBuilderFactoryWithJit);
auto lookupJoinOperatorWithJit = dynamic_cast<LookupJoinOperator *>(lookupJoinFactoryWithJit->CreateOperator());
timer.Reset();
errNo = lookupJoinOperatorWithJit->AddInput(probeVecBatchesWithJit[0]);
EXPECT_EQ(errNo, OMNI_STATUS_NORMAL);
std::vector<VectorBatch *> lookupJoinOutputWithJit;
while (lookupJoinOperatorWithJit->GetStatus() != OMNI_STATUS_FINISHED) {
VectorBatch *result = nullptr;
lookupJoinOperatorWithJit->GetOutput(&result);
lookupJoinOutputWithJit.push_back(result);
}
timer.CalculateElapse();
wallElapsed = timer.GetWallElapse();
cpuElapsed = timer.GetCpuElapse();
std::cout << "LookupJoin with OmniJit, wall " << wallElapsed << " cpu " << cpuElapsed << std::endl;
EXPECT_EQ(hashBuilderOutputWithoutJit, nullptr);
EXPECT_EQ(hashBuilderOutputWithJit, nullptr);
EXPECT_EQ(lookupJoinOutputWithoutJit.size(), lookupJoinOutputWithJit.size());
for (uint32_t i = 0; i < lookupJoinOutputWithoutJit.size(); ++i) {
EXPECT_TRUE(VecBatchMatchIgnoreOrder(lookupJoinOutputWithoutJit[i], lookupJoinOutputWithJit[i]));
}
VectorHelper::FreeVecBatches(lookupJoinOutputWithoutJit);
VectorHelper::FreeVecBatches(lookupJoinOutputWithJit);
delete[] builderVecBatchesWithoutJit;
delete[] probeVecBatchesWithoutJit;
delete[] builderVecBatchesWithJit;
delete[] probeVecBatchesWithJit;
omniruntime::op::Operator::DeleteOperator(lookupJoinOperatorWithoutJit);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperatorWithJit);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperatorWithoutJit);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperatorWithJit);
delete lookupJoinFactoryWithoutJit;
delete lookupJoinFactoryWithJit;
delete hashBuilderFactoryWithoutJit;
delete hashBuilderFactoryWithJit;
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinWithOneBuildOp)
{
VectorBatch *vecBatch = ConstructSimpleBuildData();
HashBuilderOperatorFactory *hashBuilderFactory = CreateSimpleBuildFactory(1, OMNI_JOIN_TYPE_INNER);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(vecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
VectorBatch *probeVecBatch = ConstructSimpleProbeData();
LookupJoinOperatorFactory *lookupJoinFactory = CreateSimpleProbeFactory(hashBuilderFactory);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
VectorBatch *expectVecBatch = ConstructSimpleExpectedData();
EXPECT_EQ(outputVecBatch->GetRowCount(), expectVecBatch->GetRowCount());
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, DISABLED_TestInnerEqualityJoinWithTwoBuildOp)
{
VectorBatch **vectorBatches = ConstructSimpleBuildData2();
HashBuilderOperatorFactory *hashBuilderFactory = CreateSimpleBuildFactory(2, OMNI_JOIN_TYPE_INNER);
auto *hashBuilderOperator0 = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator0->AddInput(vectorBatches[0]);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator0->GetOutput(&hashBuildOutput);
auto *hashBuilderOperator1 = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator1->AddInput(vectorBatches[1]);
hashBuilderOperator1->GetOutput(&hashBuildOutput);
VectorBatch *probeVecBatch = ConstructSimpleProbeData();
LookupJoinOperatorFactory *lookupJoinFactory = CreateSimpleProbeFactory(hashBuilderFactory);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
VectorBatch *expectVecBatch = ConstructSimpleExpectedData();
EXPECT_EQ(outputVecBatch->GetRowCount(), expectVecBatch->GetRowCount());
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
delete[] vectorBatches;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator0);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator1);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
struct HashJoinThreadArgs {
bool isOriginal;
int64_t hashBuilderFactoryAddr;
VectorBatch **buildVecBatches;
int32_t buildVecBatchCount;
int32_t partitionIndex;
int64_t lookupJoinFactoryAddr;
VectorBatch **probeVecBatches;
int32_t probeVecBatchCount;
std::mutex lock;
std::vector<HashBuilderOperator *> buildOperators;
};
VectorBatch **ConstructHashBuilderTestData(int32_t tableCount, int32_t columnCount)
{
int32_t numbers[] = {1, 2, 3, 4, 6, 7, 8, 9, 12, 13, 75, 27, 28, 38, 36, 32, 20, 50, 37};
int32_t numberCount = 19;
auto **vectorBatches = new VectorBatch *[tableCount];
int32_t positionCount = BUILD_POSITION_COUNT;
for (int32_t vecBatchIdx = 0; vecBatchIdx < tableCount; vecBatchIdx++) {
auto *vecBatch = new VectorBatch(columnCount);
for (int32_t vecIdx = 0; vecIdx < columnCount; vecIdx++) {
std::unique_ptr<Vector<int64_t>> vector = std::make_unique<Vector<int64_t>>(positionCount);
for (int32_t position = 0; position < positionCount; position++) {
int64_t value = numbers[position % numberCount];
vector->SetValue(position, value);
}
vecBatch->Append(vector.release());
}
vectorBatches[vecBatchIdx] = vecBatch;
}
return vectorBatches;
}
HashBuilderOperatorFactory *PrepareHashBuilder(int32_t operatorCount, bool isOriginal)
{
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
int32_t buildHashCols[] = {2, 3};
int32_t buildHashColsCount = 2;
HashBuilderOperatorFactory *hashBuilderOperatorFactory =
HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER, buildTypes, buildHashCols,
buildHashColsCount, operatorCount);
return hashBuilderOperatorFactory;
}
LookupJoinOperatorFactory *PrepareLookupJoin(const HashBuilderOperatorFactory *hashBuilderOperatorFactory,
bool isOriginal)
{
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
int32_t probeOutputCols[] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[] = {2, 3};
int32_t probeHashColsCount = 2;
int32_t buildOutputCols[] = {0, 1};
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
auto hashBuilderFactoryAddr = reinterpret_cast<int64_t>(hashBuilderOperatorFactory);
auto lookupJoinOperatorFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols,
buildOutputTypes.GetSize(), buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
return lookupJoinOperatorFactory;
}
void SetHashJoinThreadArgs(struct HashJoinThreadArgs *hashJoinThreadArgs, bool isOriginal,
int64_t hashBuilderFactoryAddr, VectorBatch **buildVecBatches, int32_t buildVecBatchCount,
int64_t lookupJoinFactoryAddr, VectorBatch **probeVecBatches, int32_t probeVecBatchCount)
{
hashJoinThreadArgs->isOriginal = isOriginal;
hashJoinThreadArgs->hashBuilderFactoryAddr = hashBuilderFactoryAddr;
hashJoinThreadArgs->buildVecBatches = buildVecBatches;
hashJoinThreadArgs->buildVecBatchCount = buildVecBatchCount;
hashJoinThreadArgs->lookupJoinFactoryAddr = lookupJoinFactoryAddr;
hashJoinThreadArgs->probeVecBatches = probeVecBatches;
hashJoinThreadArgs->probeVecBatchCount = probeVecBatchCount;
hashJoinThreadArgs->partitionIndex = -1;
}
void TestHashBuilder(struct HashJoinThreadArgs *hashJoinThreadArgs)
{
auto *hashBuilderOperatorFactory =
reinterpret_cast<HashBuilderOperatorFactory *>(hashJoinThreadArgs->hashBuilderFactoryAddr);
HashBuilderOperator *hashBuilderOperator = nullptr;
if (hashJoinThreadArgs->isOriginal) {
hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderOperatorFactory->CreateOperator());
} else {
hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderOperatorFactory));
}
if (hashJoinThreadArgs->partitionIndex != -1) {
auto input = DuplicateVectorBatch(hashJoinThreadArgs->buildVecBatches[hashJoinThreadArgs->partitionIndex]);
hashBuilderOperator->AddInput(input);
VectorBatch *buildOutputTable = nullptr;
hashBuilderOperator->GetOutput(&buildOutputTable);
hashJoinThreadArgs->lock.lock();
hashJoinThreadArgs->buildOperators.push_back(hashBuilderOperator);
hashJoinThreadArgs->lock.unlock();
} else {
for (int i = 0; i < hashJoinThreadArgs->buildVecBatchCount; ++i) {
hashBuilderOperator->AddInput(DuplicateVectorBatch(hashJoinThreadArgs->buildVecBatches[i]));
}
VectorBatch *buildOutputTable = nullptr;
hashBuilderOperator->GetOutput(&buildOutputTable);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
}
}
void TestHashBuilderMultiThread(VectorBatch **buildVecBatches)
{
HashBuilderOperatorFactory *hashBuilderOperatorFactory = PrepareHashBuilder(1, true);
struct HashJoinThreadArgs hashBuilderThreadArgs;
SetHashJoinThreadArgs(&hashBuilderThreadArgs, true, reinterpret_cast<int64_t>(hashBuilderOperatorFactory),
buildVecBatches, 1, 0, nullptr, 0);
TestHashBuilder(&hashBuilderThreadArgs);
DeleteJoinOperatorFactory(hashBuilderOperatorFactory, nullptr);
}
void TestLookupJoin(struct HashJoinThreadArgs *hashJoinThreadArgs)
{
auto *lookupJoinOperatorFactory =
reinterpret_cast<LookupJoinOperatorFactory *>(hashJoinThreadArgs->lookupJoinFactoryAddr);
LookupJoinOperator *lookupJoinOperator = nullptr;
if (hashJoinThreadArgs->isOriginal) {
lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinOperatorFactory->CreateOperator());
} else {
lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinOperatorFactory));
}
const int32_t maxLoopCount = 1000;
for (int loop = 0; loop < maxLoopCount; loop++) {
for (int i = 0; i < hashJoinThreadArgs->probeVecBatchCount; ++i) {
lookupJoinOperator->AddInput(DuplicateVectorBatch(hashJoinThreadArgs->probeVecBatches[i]));
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
if (outputVecBatch != nullptr) {
VectorHelper::FreeVecBatch(outputVecBatch);
}
}
}
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
}
void TestHashJoin(VectorBatch **buildVecBatches, int index, VectorBatch **probeVecBatches)
{
HashBuilderOperatorFactory *hashBuilderOperatorFactory = PrepareHashBuilder(1, true);
struct HashJoinThreadArgs hashBuilderThreadArgs;
SetHashJoinThreadArgs(&hashBuilderThreadArgs, true, reinterpret_cast<int64_t>(hashBuilderOperatorFactory),
buildVecBatches, 1, 0, nullptr, 0);
LookupJoinOperatorFactory *lookupJoinOperatorFactory = PrepareLookupJoin(hashBuilderOperatorFactory, true);
struct HashJoinThreadArgs lookupJoinThreadArgs;
SetHashJoinThreadArgs(&lookupJoinThreadArgs, true, 0, nullptr, 0,
reinterpret_cast<int64_t>(lookupJoinOperatorFactory), probeVecBatches, VEC_BATCH_COUNT_1);
hashBuilderThreadArgs.partitionIndex = index;
TestHashBuilder(&hashBuilderThreadArgs);
TestLookupJoin(&lookupJoinThreadArgs);
for (uint32_t j = 0; j < hashBuilderThreadArgs.buildOperators.size(); j++) {
omniruntime::op::Operator::DeleteOperator(hashBuilderThreadArgs.buildOperators[j]);
}
DeleteJoinOperatorFactory(hashBuilderOperatorFactory, lookupJoinOperatorFactory);
}
void TestHashJoinMultiThreads(VectorBatch **buildVecBatches, int threadNum, VectorBatch **probeVecBatches,
double &wallElapsed, double &cpuElapsed)
{
std::vector<std::thread> vecOfThreads;
Timer timer;
timer.SetStart();
for (int32_t i = 0; i < threadNum; ++i) {
std::thread th(TestHashJoin, buildVecBatches, i, probeVecBatches);
vecOfThreads.push_back(std::move(th));
}
for (auto &th : vecOfThreads) {
if (th.joinable()) {
th.join();
}
}
timer.CalculateElapse();
wallElapsed = timer.GetWallElapse();
cpuElapsed = timer.GetCpuElapse();
}
TEST(NativeOmniJoinTest, TestInnerEqualityBuildOriginalMultiThreads)
{
VectorBatch **buildVecBatches = ConstructHashBuilderTestData(VEC_BATCH_COUNT_10, COLUMN_COUNT_4);
std::cout << "finish build hash builder data" << std::endl;
const auto processorCount = std::thread::hardware_concurrency();
std::cout << "core number: " << processorCount << std::endl;
uint32_t threadNums[] = {1, 8, 16};
for (uint32_t i = 0; i < sizeof(threadNums) / sizeof(uint32_t); ++i) {
auto t = threadNums[i] < processorCount ? processorCount / threadNums[i] : 1;
uint32_t threadNum = threadNums[i];
std::vector<std::thread> vecOfThreads;
Timer timer;
timer.SetStart();
for (uint32_t j = 0; j < threadNum; ++j) {
std::thread th(TestHashBuilderMultiThread, buildVecBatches);
vecOfThreads.push_back(std::move(th));
}
for (auto &th : vecOfThreads) {
if (th.joinable()) {
th.join();
}
}
timer.CalculateElapse();
double wallElapsed = timer.GetWallElapse();
double cpuElapsed = timer.GetCpuElapse();
std::cout << "testHashBuilderOriginalMultiThreads " << threadNum << " wallElapsed time: " << wallElapsed <<
"s" << std::endl;
std::cout << "testHashBuilderOriginalMultiThreads " << threadNum << " cpuElapsed time: " <<
cpuElapsed / processorCount * t << "s" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(TIME_TO_SLEEP));
}
FreeVecBatches(buildVecBatches, VEC_BATCH_COUNT_10);
}
TEST(NativeOmniJoinTest, TestInnerEqualityBuildJITMultiThreads)
{
VectorBatch **buildVecBatches = ConstructHashBuilderTestData(VEC_BATCH_COUNT_10, COLUMN_COUNT_4);
std::cout << "finish build hash builder data" << std::endl;
const auto processorCount = std::thread::hardware_concurrency();
std::cout << "core number: " << processorCount << std::endl;
uint32_t threadNums[] = {1, 8, 16};
for (uint32_t i = 0; i < sizeof(threadNums) / sizeof(uint32_t); ++i) {
auto t = threadNums[i] < processorCount ? processorCount / threadNums[i] : 1;
uint32_t threadNum = threadNums[i];
std::vector<std::thread> vecOfThreads;
Timer timer;
timer.SetStart();
for (uint32_t j = 0; j < threadNum; ++j) {
std::thread th(TestHashBuilderMultiThread, buildVecBatches);
vecOfThreads.push_back(std::move(th));
}
for (auto &th : vecOfThreads) {
if (th.joinable()) {
th.join();
}
}
timer.CalculateElapse();
double wallElapsed = timer.GetWallElapse();
double cpuElapsed = timer.GetCpuElapse();
std::cout << "testHashBuilderJITMultiThreads " << threadNum << " wallElapsed time: " << wallElapsed << "s" <<
std::endl;
std::cout << "testHashBuilderJITMultiThreads " << threadNum << " cpuElapsed time: " <<
cpuElapsed / processorCount * t << "s" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(TIME_TO_SLEEP));
}
FreeVecBatches(buildVecBatches, VEC_BATCH_COUNT_10);
}
VectorBatch **ConstructBuilderTestData(int32_t *numbers, int32_t numberCount)
{
auto **vectorBatches = new VectorBatch *[numberCount];
for (int32_t vecBatchIdx = 0; vecBatchIdx < numberCount; vecBatchIdx++) {
auto *vectorBatch = new VectorBatch(1);
for (int32_t colIdx = 0; colIdx < COLUMN_COUNT_4; colIdx++) {
std::unique_ptr<Vector<int64_t>> column = std::make_unique<Vector<int64_t>>(1);
column->SetValue(0, numbers[vecBatchIdx]);
vectorBatch->Append(column.release());
}
vectorBatches[vecBatchIdx] = vectorBatch;
}
return vectorBatches;
}
VectorBatch **ConstructProbeTestData(const int32_t *numbers, int32_t numberCount)
{
if (numberCount <= 0) {
return nullptr;
}
int32_t positionCount = PROBE_POSITION_COUNT;
auto **vecBatches = new VectorBatch *[VEC_BATCH_COUNT_1];
for (int32_t vecBatchIdx = 0; vecBatchIdx < VEC_BATCH_COUNT_1; vecBatchIdx++) {
auto *vecBatch = new VectorBatch(COLUMN_COUNT_4);
for (int32_t colIdx = 0; colIdx < COLUMN_COUNT_4; colIdx++) {
std::unique_ptr<Vector<int64_t>> vector = std::make_unique<Vector<int64_t>>(positionCount);
for (int32_t posIdx = 0; posIdx < positionCount; posIdx++) {
int64_t value = numbers[(posIdx % numberCount)];
vector->SetValue(posIdx, value);
}
vecBatch->Append(vector.release());
}
vecBatches[vecBatchIdx] = vecBatch;
}
return vecBatches;
}
void TestHashBuilderMultiThreads(struct HashJoinThreadArgs *hashBuilderThreadArgs, int32_t threadNum)
{
std::vector<std::thread> vecOfThreads;
for (int32_t i = 0; i < threadNum; ++i) {
hashBuilderThreadArgs->partitionIndex = i;
std::thread th(TestHashBuilder, hashBuilderThreadArgs);
vecOfThreads.push_back(std::move(th));
}
for (auto &th : vecOfThreads) {
if (th.joinable()) {
th.join();
}
}
}
LookupJoinOperatorFactory *TestLookupJoinMultiThreads(VectorBatch ***probeVecBatches, int32_t threadNum, int32_t index,
bool isOriginal, const HashBuilderOperatorFactory *hashBuilderOperatorFactory, double &wallElapsed,
double &cpuElapsed)
{
LookupJoinOperatorFactory *lookupJoinOperatorFactory = PrepareLookupJoin(hashBuilderOperatorFactory, isOriginal);
struct HashJoinThreadArgs lookupJoinThreadArgs;
SetHashJoinThreadArgs(&lookupJoinThreadArgs, isOriginal, 0, nullptr, 0,
reinterpret_cast<int64_t>(lookupJoinOperatorFactory), probeVecBatches[index], VEC_BATCH_COUNT_1);
std::vector<std::thread> vecOfThreads;
Timer timer;
timer.SetStart();
for (int32_t i = 0; i < threadNum; ++i) {
std::thread t(TestLookupJoin, &lookupJoinThreadArgs);
vecOfThreads.push_back(std::move(t));
}
for (auto &th : vecOfThreads) {
if (th.joinable()) {
th.join();
}
}
timer.CalculateElapse();
wallElapsed = timer.GetWallElapse();
cpuElapsed = timer.GetCpuElapse();
return lookupJoinOperatorFactory;
}
TEST(NativeOmniJoinTest, TestInnerEqualityProbeOriginalMultiThreads)
{
int32_t numbers[3][16] = {
{6},
{6, 13, 7, 4, 1, 9, 3, 2},
{8, 13, 75, 12, 27, 28, 3, 2, 6, 38, 36, 32, 1, 20, 50, 37}
};
uint32_t threadNums[] = {1, 8, 16};
uint32_t groupCount = sizeof(threadNums) / sizeof(uint32_t);
auto ***buildVecBatches = new VectorBatch **[groupCount];
for (uint32_t i = 0; i < groupCount; i++) {
buildVecBatches[i] = ConstructBuilderTestData(numbers[i], threadNums[i]);
}
auto ***probeVecBatches = new VectorBatch **[groupCount];
for (uint32_t i = 0; i < groupCount; i++) {
probeVecBatches[i] = ConstructProbeTestData(numbers[i], threadNums[i]);
}
const auto processorCount = std::thread::hardware_concurrency();
std::cout << "core number: " << processorCount << std::endl;
for (uint32_t i = 0; i < groupCount; ++i) {
auto t = threadNums[i] < processorCount ? processorCount / threadNums[i] : 1;
uint32_t threadNum = threadNums[i];
double wallElapsed = 0;
double cpuElapsed = 0;
TestHashJoinMultiThreads(buildVecBatches[i], threadNum, probeVecBatches[i], wallElapsed, cpuElapsed);
std::cout << "testLookupJoinOriginalMultiThreads " << threadNum << " wallElapsed time: " << wallElapsed <<
"s" << std::endl;
std::cout << "testLookupJoinOriginalMultiThreads " << threadNum << " cpuElapsed time: " <<
cpuElapsed / processorCount * t << "s" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(TIME_TO_SLEEP));
}
for (uint32_t i = 0; i < groupCount; i++) {
FreeVecBatches(buildVecBatches[i], threadNums[i]);
FreeVecBatches(probeVecBatches[i], VEC_BATCH_COUNT_1);
}
delete[] buildVecBatches;
delete[] probeVecBatches;
}
TEST(NativeOmniJoinTest, TestInnerEqualityProbeJITMultiThreads)
{
int32_t numbers[3][16] = {
{6},
{6, 13, 7, 4, 1, 9, 3, 2},
{8, 13, 75, 12, 27, 28, 3, 2, 6, 38, 36, 32, 1, 20, 50, 37}
};
uint32_t threadNums[] = {1, 8, 16};
uint32_t groupCount = sizeof(threadNums) / sizeof(uint32_t);
auto ***buildVecBatches = new VectorBatch **[groupCount];
for (uint32_t i = 0; i < groupCount; i++) {
buildVecBatches[i] = ConstructBuilderTestData(numbers[i], threadNums[i]);
}
auto ***probeVecBatches = new VectorBatch **[groupCount];
for (uint32_t i = 0; i < groupCount; i++) {
probeVecBatches[i] = ConstructProbeTestData(numbers[i], threadNums[i]);
}
const auto processorCount = std::thread::hardware_concurrency();
std::cout << "core number: " << processorCount << std::endl;
for (uint32_t i = 0; i < groupCount; ++i) {
auto t = threadNums[i] < processorCount ? processorCount / threadNums[i] : 1;
uint32_t threadNum = threadNums[i];
double wallElapsed = 0;
double cpuElapsed = 0;
TestHashJoinMultiThreads(buildVecBatches[i], threadNum, probeVecBatches[i], wallElapsed, cpuElapsed);
std::cout << "testLookupJoinOriginalMultiThreads " << threadNum << " wallElapsed time: " << wallElapsed <<
"s" << std::endl;
std::cout << "testLookupJoinOriginalMultiThreads " << threadNum << " cpuElapsed time: " <<
cpuElapsed / processorCount * t << "s" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(TIME_TO_SLEEP));
}
for (uint32_t i = 0; i < groupCount; i++) {
FreeVecBatches(buildVecBatches[i], threadNums[i]);
FreeVecBatches(probeVecBatches[i], VEC_BATCH_COUNT_1);
}
delete[] buildVecBatches;
delete[] probeVecBatches;
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoin)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4};
int64_t buildData1[] = {111, 11, 333, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4};
int64_t probeData1[] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedDatas[4][expectedDataSize] = {
{1, 2, 3, 4},
{11, 22, 33, 44},
{2, 0, 4, 0},
{11, 0, 33, 0}};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinAndBuildLeft)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4};
int64_t buildData1[] = {111, 11, 333, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT, OMNI_BUILD_LEFT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4};
int64_t probeData1[] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
BaseVector **pVector = outputVecBatch->GetVectors();
std::rotate(pVector, pVector + 2, pVector + outputVecBatch->GetVectorCount());
const int32_t expectedDataSize = 2;
DataTypes expectedTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
int64_t expectedDatas[4][expectedDataSize] = {
{1, 3},
{11, 33},
{2, 4},
{11, 33}};
AssertVecBatchEquals(outputVecBatch, probeTypes.GetSize() + buildOutputColsCount, expectedDataSize,
expectedDatas[0], expectedDatas[1]);
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedData0[expectedDatasSize] = {0, 0};
int64_t expectedData1[expectedDatasSize] = {0, 0};
int64_t expectedData2[expectedDatasSize] = {3, 1};
int64_t expectedData3[expectedDatasSize] = {333, 111};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedData0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedData1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedData2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedData3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinAndBuildLeftWithArray)
{
const int32_t dataSize = 5;
DataTypes buildTypes(std::vector<DataTypePtr>({LongType(), LongType()}));
int64_t buildData0[] = {1, 2, 3, 4, 5};
int64_t buildData1[] = {10, 11, 14, 15, 12};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT, OMNI_BUILD_LEFT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({LongType(), LongType()}));
int64_t probeData0[] = {1, 2, 3, 4, 5};
int64_t probeData1[] = {11, 22, 33, 12, 10};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({LongType(), LongType()}));
auto hashBuilderFactoryAddr = (int64_t) hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount,
probeHashCols,
probeHashColsCount,
buildOutputCols,
buildOutputColsCount,
buildOutputTypes,
hashBuilderFactoryAddr, nullptr, false,
nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory =
LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols,
probeOutputColsCount,
buildOutputCols,
buildOutputTypes,
hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 3;
DataTypes expectedTypes(std::vector<DataTypePtr>({LongType(), LongType(), LongType(), LongType()}));
int64_t expectedDatas[4][expectedDataSize] = {
{1, 4, 5},
{11, 12, 10},
{2, 5, 1},
{11, 12, 10}};
auto *expectedVecbatch = CreateVectorBatch(expectedTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
BaseVector **pVector = outputVecBatch->GetVectors();
std::rotate(pVector, pVector + 2, pVector + outputVecBatch->GetVectorCount());
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedData0[expectedDatasSize] = {0, 0};
int64_t expectedData1[expectedDatasSize] = {0, 0};
int64_t expectedData2[expectedDatasSize] = {3, 4};
int64_t expectedData3[expectedDatasSize] = {14, 15};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedData0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedData1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedData2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedData3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinChar)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
std::string buildData1[dataSize] = {"aaa", "11", "ccc", "33"};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
std::string probeData1[dataSize] = {"11", "22", "33", "44"};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
std::string expectedData1[expectedDataSize] = {"11", "22", "33", "44"};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
std::string expectedData3[expectedDataSize] = {"11", "", "33", ""};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3), LongType(), VarcharType(2) }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinDate32)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), Date32Type(DAY) }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
int32_t buildData1[dataSize] = {123, 11, 321, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), Date32Type(DAY) }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
int32_t probeData1[dataSize] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), Date32Type(DAY) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
int32_t expectedData1[expectedDataSize] = {11, 22, 33, 44};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
int32_t expectedData3[expectedDataSize] = {11, -1, 33, -1};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), IntType(), LongType(), IntType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinShort)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), ShortType() }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
int16_t buildData1[dataSize] = {123, 11, 321, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), ShortType() }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
int16_t probeData1[dataSize] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), ShortType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
int16_t expectedData1[expectedDataSize] = {11, 22, 33, 44};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
int16_t expectedData3[expectedDataSize] = {11, -1, 33, -1};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), ShortType(), LongType(), ShortType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinDecimal64)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), Decimal64Type(3, 0) }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
int64_t buildData1[dataSize] = {123, 11, 321, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), Decimal64Type(2, 0) }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
int64_t probeData1[dataSize] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), Decimal64Type(3, 0) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
int64_t expectedData1[expectedDataSize] = {11, 22, 33, 44};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
int64_t expectedData3[expectedDataSize] = {11, -1, 33, -1};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinDecimal128)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), Decimal128Type(3, 0) }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
Decimal128 buildData1[dataSize] = {Decimal128(123, 0), Decimal128(11, 0), Decimal128(321, 0), Decimal128(33, 0)};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), Decimal128Type(2, 0) }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
Decimal128 probeData1[dataSize] = {Decimal128(11, 0), Decimal128(22, 0), Decimal128(33, 0), Decimal128(44, 0)};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), Decimal128Type(3, 0) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
Decimal128 expectedData1[expectedDataSize] = {Decimal128(11, 0), Decimal128(22, 0), Decimal128(33, 0),
Decimal128(44, 0)};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
Decimal128 expectedData3[expectedDataSize] = {Decimal128(11, 0), Decimal128(0, 0), Decimal128(33, 0),
Decimal128(0, 0)};
DataTypes expectedDataTypes(
std::vector<DataTypePtr>({ LongType(), Decimal128Type(), LongType(), Decimal128Type() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinDictionary)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4};
int64_t buildData1[] = {111, 11, 333, 33};
auto *buildVecBatch = new VectorBatch(dataSize);
buildVecBatch->Append(CreateVector<int64_t>(dataSize, buildData0));
const DataTypePtr &dataType = buildTypes.GetType(1);
int32_t ids[] = {0, 1, 2, 3};
buildVecBatch->Append(CreateDictionaryVector(*dataType, dataSize, ids, dataSize, buildData1));
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_INNER, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4};
int64_t probeData1[] = {11, 22, 33, 44};
auto *probeVecBatch = new VectorBatch(dataSize);
probeVecBatch->Append(CreateVector<int64_t>(dataSize, probeData0));
const DataTypePtr &probeDataType = probeTypes.GetType(1);
probeVecBatch->Append(CreateDictionaryVector(*probeDataType, dataSize, ids, dataSize, probeData1));
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 2;
int64_t expectedDatas[4][expectedDataSize] = {
{1, 3},
{11, 33},
{2, 4},
{11, 33}};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinHasOutputNulls)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
int64_t buildData0[dataSize] = {1, 0, 3, 0};
std::string buildData1[dataSize] = {"aaa", "11", "ccc", "33"};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(0)->SetNull(1);
buildVecBatch->Get(0)->SetNull(3);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_INNER, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
int64_t probeData0[dataSize] = {0, 2, 0, 4};
std::string probeData1[dataSize] = {"11", "22", "33", "44"};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
probeVecBatch->Get(0)->SetNull(0);
probeVecBatch->Get(0)->SetNull(2);
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
LookupJoinOperatorFactory *lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols,
buildOutputColsCount, buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 2;
int64_t expectedData2[expectedDataSize] = {0, 0};
std::string expectedData3[expectedDataSize] = {"11", "33"};
auto expectedVector2 = CreateVector<int64_t>(expectedDataSize, expectedData2);
expectedVector2->SetNull(0);
expectedVector2->SetNull(1);
auto expectedVector3 = CreateVarcharVector(expectedData3, expectedDataSize);
int32_t ids[2] = {0, 2};
auto *expectedVecBatch = new VectorBatch(expectedDataSize);
DataTypePtr dataType = LongType();
expectedVecBatch->Append(CreateDictionary<OMNI_LONG>(expectedProbeVec0, ids, expectedDataSize));
expectedVecBatch->Append(CreateDictionary<OMNI_VARCHAR>(expectedProbeVec1, ids, expectedDataSize));
expectedVecBatch->Append(expectedVector2);
expectedVecBatch->Append(expectedVector3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecBatch));
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinHasOutputNullsChar)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), CharType(3) }));
int64_t buildData0[dataSize] = {1, 0, 3, 0};
std::string buildData1[dataSize] = {"aaa", "11", "ccc", "33"};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(0)->SetNull(1);
buildVecBatch->Get(0)->SetNull(3);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_INNER, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), CharType(2) }));
int64_t probeData0[dataSize] = {0, 2, 0, 4};
std::string probeData1[dataSize] = {"11", "22", "33", "44"};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
probeVecBatch->Get(0)->SetNull(0);
probeVecBatch->Get(0)->SetNull(2);
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), CharType(3) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 2;
int64_t expectedData2[expectedDataSize] = {0, 0};
std::string expectedData3[expectedDataSize] = {"11", "33"};
auto expectedVector2 = CreateVector<int64_t>(expectedDataSize, expectedData2);
expectedVector2->SetNull(0);
expectedVector2->SetNull(1);
auto expectedVector3 = CreateVarcharVector(expectedData3, expectedDataSize);
auto expectedVecBatch = new VectorBatch(expectedDataSize);
int32_t ids[2] = {0, 2};
expectedVecBatch->Append(CreateDictionary<OMNI_LONG>(expectedProbeVec0, ids, expectedDataSize));
expectedVecBatch->Append(CreateDictionary<OMNI_CHAR>(expectedProbeVec1, ids, expectedDataSize));
expectedVecBatch->Append(expectedVector2);
expectedVecBatch->Append(expectedVector3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecBatch));
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinWithIntFilter)
{
const int32_t dataSize = 10;
DataTypes buildTypes(std::vector<DataTypePtr>({ IntType(), IntType() }));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16};
int32_t buildData1[dataSize] = {35709, 31904, 35709, 31904, 35709, 31904, 35709, 31904, 35709, 31904};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
auto *notEqualLeft = new FieldExpr(1, IntType());
auto *notEqualRight = new FieldExpr(3, IntType());
auto *notEqualExpr =
new BinaryExpr(omniruntime::expressions::Operator::NEQ, notEqualLeft, notEqualRight, BooleanType());
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ IntType(), IntType() }));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8, 7};
int32_t probeData1[dataSize] = {35709, 35709, 31904, 12477, 31904, 38721, 90419, 35709, 88371, 35709};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
probeVecBatch->Get(1)->SetNull(i);
} else {
static_cast<Vector<int32_t> *>(probeVecBatch->Get(1))->SetValue(i, probeData1[i]);
}
}
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ IntType(), IntType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, notEqualExpr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 3;
int32_t ids[expectDataSize] = {0, 1, 7};
int32_t expectData2[expectDataSize] = {20, 16, 19};
int32_t expectData3[expectDataSize] = {31904, 31904, 31904};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec0, ids, expectDataSize));
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec1, ids, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData3));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({ notEqualExpr });
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinWithIntFilterWithOddDataSize)
{
const int32_t dataSize = 11;
DataTypes buildTypes(std::vector<DataTypePtr>({IntType(), IntType()}));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16, 15};
int32_t buildData1[dataSize] = {35709, 31904, 35709, 31904, 35709, 31904, 35709, 31904, 35709, 31904, 31904};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
auto *notEqualLeft = new FieldExpr(1, IntType());
auto *notEqualRight = new FieldExpr(3, IntType());
auto *notEqualExpr =
new BinaryExpr(omniruntime::expressions::Operator::NEQ, notEqualLeft, notEqualRight, BooleanType());
auto hashBuilderFactory =
HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
buildTypes, buildJoinCols,
joinColsCount,
operatorCount);
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({IntType(), IntType()}));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8, 7, 11};
int32_t probeData1[dataSize] = {35709, 35709, 31904, 12477, 31904, 38721, 90419, 35709, 88371, 35709, 35709};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
probeVecBatch->Get(1)->SetNull(i);
} else {
static_cast<Vector<int32_t> *>(probeVecBatch->Get(1))->SetValue(i, probeData1[i]);
}
}
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({IntType(), IntType()}));
auto hashBuilderFactoryAddr = (int64_t) hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount,
probeHashCols,
probeHashColsCount,
buildOutputCols,
buildOutputColsCount,
buildOutputTypes,
hashBuilderFactoryAddr,
notEqualExpr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 3;
int32_t ids[expectDataSize] = {0, 1, 7};
int32_t expectData2[expectDataSize] = {20, 16, 19};
int32_t expectData3[expectDataSize] = {31904, 31904, 31904};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec0, ids, expectDataSize));
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec1, ids, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData3));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({notEqualExpr});
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinWithIntFilterWithoutArray)
{
const int32_t dataSize = 11;
DataTypes buildTypes(std::vector<DataTypePtr>({IntType(), IntType()}));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16, 99999};
int32_t buildData1[dataSize] = {35709, 31904, 35709, 31904, 35709, 31904, 35709, 31904, 35709, 31904, 31904};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
auto *notEqualLeft = new FieldExpr(1, IntType());
auto *notEqualRight = new FieldExpr(3, IntType());
auto *notEqualExpr =
new BinaryExpr(omniruntime::expressions::Operator::NEQ, notEqualLeft, notEqualRight, BooleanType());
auto hashBuilderFactory =
HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
buildTypes, buildJoinCols,
joinColsCount,
operatorCount);
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({IntType(), IntType()}));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8, 7, 11};
int32_t probeData1[dataSize] = {35709, 35709, 31904, 12477, 31904, 38721, 90419, 35709, 88371, 35709, 35709};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
probeVecBatch->Get(1)->SetNull(i);
} else {
static_cast<Vector<int32_t> *>(probeVecBatch->Get(1))->SetValue(i, probeData1[i]);
}
}
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({IntType(), IntType()}));
auto hashBuilderFactoryAddr = (int64_t) hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount,
probeHashCols,
probeHashColsCount,
buildOutputCols,
buildOutputColsCount,
buildOutputTypes,
hashBuilderFactoryAddr,
notEqualExpr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 3;
int32_t ids[expectDataSize] = {0, 1, 7};
int32_t expectData2[expectDataSize] = {20, 16, 19};
int32_t expectData3[expectDataSize] = {31904, 31904, 31904};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec0, ids, expectDataSize));
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec1, ids, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData3));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({notEqualExpr});
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
omniruntime::expressions::Expr *CreateJoinFilterExprWithChar()
{
std::string funcStr = "substr";
DataTypePtr retType = VarcharType();
auto leftSubstrColumn = new FieldExpr(1, VarcharType());
auto leftSubstrIndex = new LiteralExpr(1, IntType());
auto leftSubstrLen = new LiteralExpr(5, IntType());
std::vector<Expr *> leftSubstrArgs;
leftSubstrArgs.push_back(leftSubstrColumn);
leftSubstrArgs.push_back(leftSubstrIndex);
leftSubstrArgs.push_back(leftSubstrLen);
auto leftSubstrExpr = GetFuncExpr(funcStr, leftSubstrArgs, VarcharType());
auto rightSubstrColumn = new FieldExpr(3, VarcharType());
auto rightSubstrIndex = new LiteralExpr(1, IntType());
auto rightSubstrLen = new LiteralExpr(5, IntType());
std::vector<Expr *> rightSubstrArgs;
rightSubstrArgs.push_back(rightSubstrColumn);
rightSubstrArgs.push_back(rightSubstrIndex);
rightSubstrArgs.push_back(rightSubstrLen);
auto rightSubstrExpr = GetFuncExpr(funcStr, rightSubstrArgs, VarcharType());
auto *notEqualExpr =
new BinaryExpr(omniruntime::expressions::Operator::NEQ, leftSubstrExpr, rightSubstrExpr, BooleanType());
return notEqualExpr;
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinWithCharFilter)
{
const int32_t dataSize = 10;
DataTypes buildTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16};
std::string buildData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "31904", "35709", "31904",
"35709", "31904"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
string filterExpression = "$operator$NOT_EQUAL:4(substr:15(#1, 1:1, 5:1), substr:15(#3, 1:1, 5:1))";
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
buildTypes, buildJoinCols, joinColsCount, operatorCount);
omniruntime::expressions::Expr *joinFilter = CreateJoinFilterExprWithChar();
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8, 7};
std::string probeData1[dataSize] = {"35709", "35709", "31904", "12477", "31904", "38721", "90419", "35709",
"88371", "35709"};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
using VarcharVector = Vector<LargeStringContainer<std::string_view>>;
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
static_cast<VarcharVector *>(probeVecBatch->Get(1))->SetNull(i);
} else {
std::string_view value(probeData1[i].data(), 5);
static_cast<VarcharVector *>(probeVecBatch->Get(1))->SetValue(i, value);
}
}
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, joinFilter, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 3;
int32_t expectData2[expectDataSize] = {20, 16, 19};
std::string expectData3[expectDataSize] = {"31904", "31904", "31904"};
int32_t ids[expectDataSize] = {0, 1, 7};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec0, ids, expectDataSize));
expectVecBatch->Append(CreateDictionary<OMNI_VARCHAR>(expectedProbeVec1, ids, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVarcharVector(expectData3, expectDataSize));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({ joinFilter });
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinWithCharFilterWithOddDataSize)
{
const int32_t dataSize = 9;
DataTypes buildTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 16};
std::string buildData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "31904", "35709", "31904",
"31904"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
string filterExpression = "$operator$NOT_EQUAL:4(substr:15(#1, 1:1, 5:1), substr:15(#3, 1:1, 5:1))";
auto hashBuilderFactory =
HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
buildTypes, buildJoinCols,
joinColsCount,
operatorCount);
omniruntime::expressions::Expr *joinFilter = CreateJoinFilterExprWithChar();
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8};
std::string probeData1[dataSize] = {"35709", "35709", "31904", "12477", "31904", "38721", "90419", "35709",
"88371"};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
using VarcharVector = Vector<LargeStringContainer<std::string_view>>;
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
static_cast<VarcharVector *>(probeVecBatch->Get(1))->SetNull(i);
} else {
std::string_view value(probeData1[i].data(), 5);
static_cast<VarcharVector *>(probeVecBatch->Get(1))->SetValue(i, value);
}
}
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
auto hashBuilderFactoryAddr = (int64_t) hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount,
probeHashCols,
probeHashColsCount,
buildOutputCols,
buildOutputColsCount,
buildOutputTypes,
hashBuilderFactoryAddr,
joinFilter, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 3;
int32_t expectData2[expectDataSize] = {20, 16, 19};
std::string expectData3[expectDataSize] = {"31904", "31904", "31904"};
int32_t ids[expectDataSize] = {0, 1, 7};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec0, ids, expectDataSize));
expectVecBatch->Append(CreateDictionary<OMNI_VARCHAR>(expectedProbeVec1, ids, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVarcharVector(expectData3, expectDataSize));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({joinFilter});
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinWithCharFilter2)
{
const int32_t dataSize = 10;
DataTypes buildTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t buildData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8, 7};
std::string buildData1[dataSize] = {"35709", "35709", "31904", "12477", "31904", "38721", "90419", "35709",
"88371", "35709"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
buildVecBatch->Get(1)->SetNull(i);
} else {
std::string_view value(buildData1[i].data(), 5);
static_cast<Vector<LargeStringContainer<std::string_view>> *>(buildVecBatch->Get(1))->SetValue(i, value);
}
}
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
string filterExpression = "$operator$NOT_EQUAL:4(substr:15(#1, 1:1, 5:1), substr:15(#3, 1:1, 5:1))";
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
buildTypes, buildJoinCols, joinColsCount, operatorCount);
omniruntime::expressions::Expr *joinFilter = CreateJoinFilterExprWithChar();
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t probeData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16};
std::string probeData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "31904", "35709", "31904",
"35709", "31904"};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, joinFilter, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 3;
int32_t expectData2[expectDataSize] = {19, 20, 16};
std::string expectData3[expectDataSize] = {"35709", "35709", "35709"};
int32_t ids[expectDataSize] = {3, 5, 9};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec0, ids, expectDataSize));
expectVecBatch->Append(CreateDictionary<OMNI_VARCHAR>(expectedProbeVec1, ids, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVarcharVector(expectData3, expectDataSize));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({ joinFilter });
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinWithCharFilter2WithOddDataSize)
{
const int32_t dataSize = 9;
DataTypes buildTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t buildData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8};
std::string buildData1[dataSize] = {"35709", "35709", "31904", "12477", "31904", "38721", "90419", "35709",
"88371"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
buildVecBatch->Get(1)->SetNull(i);
} else {
std::string_view value(buildData1[i].data(), 5);
static_cast<Vector<LargeStringContainer<std::string_view>> *>(buildVecBatch->Get(1))->SetValue(i, value);
}
}
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
string filterExpression = "$operator$NOT_EQUAL:4(substr:15(#1, 1:1, 5:1), substr:15(#3, 1:1, 5:1))";
auto hashBuilderFactory =
HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
buildTypes, buildJoinCols, joinColsCount,
operatorCount);
omniruntime::expressions::Expr *joinFilter = CreateJoinFilterExprWithChar();
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t probeData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20};
std::string probeData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "31904", "35709", "31904",
"35709"};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
auto expectedProbeVec0 = SliceVector(probeVecBatch->Get(0), 0, dataSize);
auto expectedProbeVec1 = SliceVector(probeVecBatch->Get(1), 0, dataSize);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols,
probeHashColsCount, buildOutputCols,
buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr,
joinFilter, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 2;
int32_t expectData2[expectDataSize] = {19, 20};
std::string expectData3[expectDataSize] = {"35709", "35709"};
int32_t ids[expectDataSize] = {3, 5};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateDictionary<OMNI_INT>(expectedProbeVec0, ids, expectDataSize));
expectVecBatch->Append(CreateDictionary<OMNI_VARCHAR>(expectedProbeVec1, ids, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVarcharVector(expectData3, expectDataSize));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({joinFilter});
delete expectedProbeVec0;
delete expectedProbeVec1;
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinWithCharFilter)
{
const int32_t dataSize = 10;
DataTypes buildTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16};
std::string buildData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "31904", "35709", "31904",
"35709", "31904"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
string filterExpression = "$operator$NOT_EQUAL:4(substr:15(#1, 1:1, 5:1), substr:15(#3, 1:1, 5:1))";
auto hashBuilderFactory =
HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_LEFT,
buildTypes, buildJoinCols,
joinColsCount,
operatorCount);
omniruntime::expressions::Expr *joinFilter = CreateJoinFilterExprWithChar();
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8, 7};
std::string probeData1[dataSize] = {"35709", "35709", "31904", "12477", "31904", "38721", "90419", "35709",
"88371", "35709"};
auto probeVec0 = CreateVector<int32_t>(dataSize, probeData0);
auto probeVec1 = new Vector<LargeStringContainer<std::string_view>>(dataSize);
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
probeVec1->SetNull(i);
} else {
std::string_view value(probeData1[i].data(), probeData1[i].length());
probeVec1->SetValue(i, value);
}
}
auto probeVecBatch = new VectorBatch(dataSize);
probeVecBatch->Append(probeVec0);
probeVecBatch->Append(probeVec1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
auto hashBuilderFactoryAddr = (int64_t) hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount,
probeHashCols,
probeHashColsCount,
buildOutputCols,
buildOutputColsCount,
buildOutputTypes,
hashBuilderFactoryAddr,
joinFilter, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 10;
int32_t expectData0[expectDataSize] = {20, 16, 13, 20, 19, 7, 4, 4, 22, 8};
std::string expectData1[expectDataSize] = {"35709", "35709", "31904", "", "35709", "", "12477", "38721",
"90419", "88371"};
auto expectedProbeVec0 = CreateVector<int32_t>(dataSize, expectData0);
auto expectedProbeVec1 = new Vector<LargeStringContainer<std::string_view>>(dataSize);
for (int32_t i = 0; i < expectDataSize; i++) {
if (i == 3 || i == 5) {
expectedProbeVec1->SetNull(i);
} else {
std::string_view value(expectData1[i].data(), expectData1[i].length());
expectedProbeVec1->SetValue(i, value);
}
}
int32_t expectData2[expectDataSize] = {20, 16, -1, -1, 19, -1, -1, -1, -1, -1};
std::string expectData3[expectDataSize] = {"31904", "31904", "", "", "31904", "", "", "", "", ""};
auto expectVec2 = new Vector<int32_t>(expectDataSize);
auto expectVec3 = new Vector<LargeStringContainer<std::string_view>>(expectDataSize);
for (int32_t i = 0; i < expectDataSize; i++) {
if (i == 0 || i == 1 || i == 4) {
expectVec2->SetValue(i, expectData2[i]);
std::string_view value(expectData3[i].data(), expectData3[i].length());
expectVec3->SetValue(i, value);
} else {
expectVec2->SetNull(i);
expectVec3->SetNull(i);
}
}
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(expectedProbeVec0);
expectVecBatch->Append(expectedProbeVec1);
expectVecBatch->Append(expectVec2);
expectVecBatch->Append(expectVec3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({joinFilter});
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftEqualityJoinWithCharFilterWithOddDataSize)
{
const int32_t dataSize = 11;
DataTypes buildTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16, 16};
std::string buildData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "31904", "35709", "31904",
"35709", "31904", "31904"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
string filterExpression = "$operator$NOT_EQUAL:4(substr:15(#1, 1:1, 5:1), substr:15(#3, 1:1, 5:1))";
auto hashBuilderFactory =
HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_LEFT,
buildTypes, buildJoinCols,
joinColsCount,
operatorCount);
omniruntime::expressions::Expr *joinFilter = CreateJoinFilterExprWithChar();
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 8, 7, 7};
std::string probeData1[dataSize] = {"35709", "35709", "31904", "12477", "31904", "38721", "90419", "35709",
"88371", "35709", "35709"};
auto probeVec0 = CreateVector<int32_t>(dataSize, probeData0);
auto probeVec1 = new Vector<LargeStringContainer<std::string_view>>(dataSize);
for (int32_t i = 0; i < dataSize; i++) {
if (i % 5 == 4) {
probeVec1->SetNull(i);
} else {
std::string_view value(probeData1[i].data(), probeData1[i].length());
probeVec1->SetValue(i, value);
}
}
auto probeVecBatch = new VectorBatch(dataSize);
probeVecBatch->Append(probeVec0);
probeVecBatch->Append(probeVec1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({IntType(), VarcharType(5)}));
auto hashBuilderFactoryAddr = (int64_t) hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount,
probeHashCols,
probeHashColsCount,
buildOutputCols,
buildOutputColsCount,
buildOutputTypes,
hashBuilderFactoryAddr,
joinFilter, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 12;
int32_t expectedData0[expectDataSize] = {20, 16, 16, 13, 20, 19, 7, 4, 4, 22, 8, 7};
std::string expectedData1[expectDataSize] = {"35709", "35709", "35709", "31904", "", "35709", "", "12477", "38721",
"90419",
"88371", "35709"};
int32_t expectData2[expectDataSize] = {20, 16, 16, -1, -1, 19, -1, -1, -1, -1, -1, -1};
std::string expectData3[expectDataSize] = {"31904", "31904", "31904", "", "", "31904", "", "", "", "", "", ""};
auto expectedProbeVec0 = CreateVector<int32_t>(expectDataSize, expectedData0);
auto expectedProbeVec1 = new Vector<LargeStringContainer<std::string_view>>(expectDataSize);
for (int32_t i = 0; i < expectDataSize; i++) {
if (i == 4 || i == 6) {
expectedProbeVec1->SetNull(i);
} else {
std::string_view value(expectedData1[i].data(), expectedData1[i].length());
expectedProbeVec1->SetValue(i, value);
}
}
auto expectVec2 = new Vector<int32_t>(expectDataSize);
auto expectVec3 = new Vector<LargeStringContainer<std::string_view>>(expectDataSize);
for (int32_t i = 0; i < expectDataSize; i++) {
if (i == 0 || i == 1 || i == 2 || i == 5) {
expectVec2->SetValue(i, expectData2[i]);
std::string_view value(expectData3[i].data(), expectData3[i].length());
expectVec3->SetValue(i, value);
} else {
expectVec2->SetNull(i);
expectVec3->SetNull(i);
}
}
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(expectedProbeVec0);
expectVecBatch->Append(expectedProbeVec1);
expectVecBatch->Append(expectVec2);
expectVecBatch->Append(expectVec3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({ joinFilter });
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
VectorBatch *CreateBuildInputForAllTypes(DataTypes &buildTypes, void **buildDatas, int32_t dataSize, bool isDictionary)
{
int32_t buildTypesSize = buildTypes.GetSize();
auto *buildTypeIds = const_cast<int32_t *>(buildTypes.GetIds());
BaseVector *buildVectors[buildTypesSize];
for (int32_t i = 0; i < buildTypesSize; i++) {
buildVectors[i] = VectorHelper::CreateVector(OMNI_FLAT, buildTypeIds[i], dataSize);
SetValue(buildVectors[i], 0, buildDatas[i]);
}
for (int32_t i = 1; i < dataSize; i++) {
for (int32_t j = 0; j < buildTypesSize; j++) {
if (i == j + 1) {
buildVectors[j]->SetNull(i);
} else {
SetValue(buildVectors[j], i, buildDatas[j]);
}
}
}
if (isDictionary) {
int32_t ids[dataSize];
for (int32_t i = 0; i < dataSize; i++) {
ids[i] = i;
}
for (int32_t i = 0; i < buildTypesSize; i++) {
auto buildVector = buildVectors[i];
buildVectors[i] = DYNAMIC_TYPE_DISPATCH(CreateDictionary, buildTypeIds[i], buildVector, ids, dataSize);
delete buildVector;
}
}
auto buildVecBatch = new VectorBatch(dataSize);
for (int32_t i = 0; i < buildTypesSize; i++) {
buildVecBatch->Append(buildVectors[i]);
}
return buildVecBatch;
}
VectorBatch *CreateProbeInputForAllTypes(DataTypes &probeTypes, void **probeDatas, int32_t dataSize, bool isDictionary)
{
int32_t probeTypesSize = probeTypes.GetSize();
auto *probeTypeIds = const_cast<int32_t *>(probeTypes.GetIds());
BaseVector *probeVectors[probeTypesSize];
for (int32_t i = 0; i < probeTypesSize; i++) {
probeVectors[i] = VectorHelper::CreateVector(OMNI_FLAT, probeTypeIds[i], dataSize);
}
for (int32_t i = 0; i < dataSize - 1; i++) {
for (int32_t j = 0; j < probeTypesSize; j++) {
if (i == j) {
probeVectors[j]->SetNull(i);
} else {
SetValue(probeVectors[j], i, probeDatas[j]);
}
}
}
for (int32_t j = 0; j < probeTypesSize; j++) {
SetValue(probeVectors[j], probeTypesSize, probeDatas[j]);
}
if (isDictionary) {
int32_t ids[dataSize];
for (int32_t i = 0; i < dataSize; i++) {
ids[i] = i;
}
for (int32_t i = 0; i < probeTypesSize; i++) {
auto probeVector = probeVectors[i];
probeVectors[i] = DYNAMIC_TYPE_DISPATCH(CreateDictionary, probeTypeIds[i], probeVector, ids, dataSize);
delete probeVector;
}
}
auto probeVecBatch = new VectorBatch(dataSize);
for (int32_t j = 0; j < probeTypesSize; j++) {
probeVecBatch->Append(probeVectors[j]);
}
return probeVecBatch;
}
VectorBatch *CreateExpectVecBatchForAllTypes(DataTypes &probeTypes, VectorBatch *probeVecBatch,
VectorBatch *buildVecBatch)
{
int32_t probeVecCount = probeVecBatch->GetVectorCount();
int32_t buildVecCount = buildVecBatch->GetVectorCount();
const int32_t expectDataSize = 1;
int32_t index = 0;
auto expectVecBatch = new VectorBatch(expectDataSize);
for (int32_t i = 0; i < probeVecCount; i++) {
BaseVector *col = probeVecBatch->Get(i);
expectVecBatch->Append(SliceVector(col, probeVecCount, 1));
index++;
}
for (int32_t i = 0; i < buildVecCount; i++) {
auto buildVector = buildVecBatch->Get(i);
expectVecBatch->Append(SliceVector(buildVector, 0, 1));
index++;
}
return expectVecBatch;
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinOnAllTypesWithNulls)
{
int32_t intValue = 20;
int64_t longValue = 20;
bool boolValue = true;
double doubleValue = 20.0;
Decimal128 decimal128(20, 0);
std::string stringValue("20");
int16_t shortValue = 20;
const int32_t dataSize = 11;
void *joinDatas[dataSize] = {&intValue, &longValue, &boolValue, &doubleValue, &intValue, &longValue, &decimal128,
&stringValue, &stringValue, &shortValue};
DataTypes joinTypes(std::vector<DataTypePtr>({ IntType(), LongType(), BooleanType(), DoubleType(), Date32Type(DAY),
Decimal64Type(2, 0), Decimal128Type(2, 0), VarcharType(2), CharType(2), ShortType() }));
int32_t joinTypesSize = joinTypes.GetSize();
int32_t joinColumns[joinTypesSize];
for (int32_t i = 0; i < joinTypesSize; i++) {
joinColumns[i] = i;
}
auto buildVecBatch = CreateBuildInputForAllTypes(joinTypes, joinDatas, dataSize, false);
auto probeVecBatch = CreateProbeInputForAllTypes(joinTypes, joinDatas, dataSize, false);
auto expectVecBatch = CreateExpectVecBatchForAllTypes(joinTypes, probeVecBatch, buildVecBatch);
int32_t operatorCount = 1;
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
joinTypes, joinColumns, joinTypesSize, operatorCount);
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
int32_t buildOutputColsCount = joinTypes.GetSize();
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(joinTypes, joinColumns, joinTypesSize, joinColumns,
joinTypesSize, joinColumns, buildOutputColsCount, joinTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinOnDictionaryWithNulls)
{
int32_t intValue = 20;
int64_t longValue = 20;
bool boolValue = true;
double doubleValue = 20.0;
Decimal128 decimal128(20, 0);
std::string stringValue("20");
int16_t shortValue = 20;
const int32_t dataSize = 11;
void *joinDatas[dataSize] = {&intValue, &longValue, &boolValue, &doubleValue, &intValue, &longValue, &decimal128,
&stringValue, &stringValue, &shortValue};
DataTypes joinTypes(std::vector<DataTypePtr>({ IntType(), LongType(), BooleanType(), DoubleType(), Date32Type(DAY),
Decimal64Type(2, 0), Decimal128Type(2, 0), VarcharType(2), CharType(2), ShortType() }));
int32_t joinTypesSize = joinTypes.GetSize();
int32_t joinColumns[joinTypesSize];
for (int32_t i = 0; i < joinTypesSize; i++) {
joinColumns[i] = i;
}
auto buildVecBatch = CreateBuildInputForAllTypes(joinTypes, joinDatas, dataSize, true);
auto probeVecBatch = CreateProbeInputForAllTypes(joinTypes, joinDatas, dataSize, true);
auto expectVecBatch = CreateExpectVecBatchForAllTypes(joinTypes, probeVecBatch, buildVecBatch);
int32_t operatorCount = 1;
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_INNER,
joinTypes, joinColumns, joinTypesSize, operatorCount);
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
int32_t buildOutputColsCount = joinTypes.GetSize();
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(joinTypes, joinColumns, joinTypesSize, joinColumns,
joinTypesSize, joinColumns, buildOutputColsCount, joinTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
LookupOuterJoinOperatorFactory *CreateSimpleLookupOuterJoinFactory(const HashBuilderOperatorFactory *hashBuilderFactory)
{
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int32_t probeOutputCols[1] = {1};
int32_t probeOutputColsCount = 1;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType() }));
int32_t buildOutputCols[1] = {1};
auto hashBuilderFactoryAddr = reinterpret_cast<int64_t>(hashBuilderFactory);
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
return lookupOuterJoinFactory;
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinWithOneBuildOp)
{
VectorBatch *vecBatch = ConstructSimpleBuildData();
HashBuilderOperatorFactory *hashBuilderFactory = CreateSimpleBuildFactory(1, OMNI_JOIN_TYPE_FULL);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(vecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
VectorBatch *probeVecBatch = ConstructSimpleProbeData();
LookupJoinOperatorFactory *lookupJoinFactory = CreateSimpleProbeFactory(hashBuilderFactory);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = CreateSimpleLookupOuterJoinFactory(hashBuilderFactory);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
VectorBatch *expectVecBatch = ConstructSimpleExpectedData();
DataTypes expectTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
EXPECT_EQ(outputVecBatch->GetRowCount(), expectVecBatch->GetRowCount());
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
int64_t expectedData0[1] = {0};
int64_t expectedData1[1] = {70};
auto expectedVec0 = CreateVector(1, expectedData0);
auto expectedVec1 = CreateVector(1, expectedData1);
auto vectorBatch = new VectorBatch(1);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Get(0)->SetNull(0);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, DISABLED_TestFullEqualityJoinWithTwoBuildOp)
{
VectorBatch **vectorBatches = ConstructSimpleBuildData2();
HashBuilderOperatorFactory *hashBuilderFactory = CreateSimpleBuildFactory(2, OMNI_JOIN_TYPE_FULL);
auto *hashBuilderOperator0 = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator0->AddInput(vectorBatches[0]);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator0->GetOutput(&hashBuildOutput);
auto *hashBuilderOperator1 = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator1->AddInput(vectorBatches[1]);
hashBuilderOperator1->GetOutput(&hashBuildOutput);
VectorBatch *probeVecBatch = ConstructSimpleProbeData();
LookupJoinOperatorFactory *lookupJoinFactory = CreateSimpleProbeFactory(hashBuilderFactory);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = CreateSimpleLookupOuterJoinFactory(hashBuilderFactory);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
VectorBatch *expectVecBatch = ConstructSimpleExpectedData();
DataTypes expectTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
EXPECT_EQ(outputVecBatch->GetRowCount(), expectVecBatch->GetRowCount());
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
int64_t expectedData0[1] = {0};
int64_t expectedData1[1] = {70};
auto expectedVec0 = CreateVector(1, expectedData0);
auto expectedVec1 = CreateVector(1, expectedData1);
auto vectorBatch = new VectorBatch(1);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Get(0)->SetNull(0);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
delete[] vectorBatches;
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator0);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator1);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinWithArray)
{
const int32_t dataSize = 5;
DataTypes buildTypes(std::vector<DataTypePtr>({LongType(), LongType()}));
int64_t buildData0[] = {1, 2, 3, 4, 5};
int64_t buildData1[] = {1, 2, 3, 4, 5};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({LongType(), LongType()}));
int64_t probeData0[] = {1, 2, 3, 4, 7};
int64_t probeData1[] = {1, 5, 7, 9, 11};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({LongType(), LongType()}));
auto hashBuilderFactoryAddr = (int64_t) hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount,
probeHashCols,
probeHashColsCount,
buildOutputCols,
buildOutputColsCount,
buildOutputTypes,
hashBuilderFactoryAddr, nullptr, false,
nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory =
LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols,
probeOutputColsCount,
buildOutputCols,
buildOutputTypes,
hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 5;
DataTypes expectedTypes(std::vector<DataTypePtr>({LongType(), LongType(), LongType(), LongType()}));
int64_t expectedDatas[4][expectedDataSize] = {
{1, 2, 3, 4, 7},
{1, 5, 7, 9, 11},
{1, 5, -1, -1, -1},
{1, 5, -1, -1, -1}};
auto *expectedVecbatch = CreateVectorBatch(expectedTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
expectedVecbatch->Get(2)->SetNull(2);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(2)->SetNull(4);
expectedVecbatch->Get(3)->SetNull(2);
expectedVecbatch->Get(3)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(4);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 3;
int64_t expectedData0[expectedDatasSize] = {0, 0, 0};
int64_t expectedData1[expectedDatasSize] = {0, 0, 0};
int64_t expectedData2[expectedDatasSize] = {2, 3, 4};
int64_t expectedData3[expectedDatasSize] = {2, 3, 4};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedData0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedData1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedData2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedData3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(0)->SetNull(2);
vectorBatch->Get(1)->SetNull(2);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoin)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4};
int64_t buildData1[] = {111, 11, 333, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4};
int64_t probeData1[] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
int64_t expectedDatas[4][expectedDataSize] = {
{1, 2, 3, 4},
{11, 22, 33, 44},
{2, 0, 4, 0},
{11, 0, 33, 0}};
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedData0[expectedDatasSize] = {0, 0};
int64_t expectedData1[expectedDatasSize] = {0, 0};
int64_t expectedData2[expectedDatasSize] = {3, 1};
int64_t expectedData3[expectedDatasSize] = {333, 111};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedData0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedData1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedData2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedData3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestOneKeyFullEqualityJoinWithBuildNull)
{
const int32_t dataSize = 6;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4, 5, 6};
int64_t buildData1[] = {111, 11, 333, 33, 11, 11};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(1)->SetNull(0);
buildVecBatch->Get(1)->SetNull(2);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
const int32_t probeDataSize = 4;
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4};
int64_t probeData1[] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 6;
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
int64_t expectedDatas[4][expectedDataSize] = {
{1, 1, 1, 2, 3, 4},
{11, 11, 11, 22, 33, 44},
{2, 5, 6, 0, 4, 0},
{11, 11, 11, 0, 33, 0}};
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(2)->SetNull(5);
expectedVecbatch->Get(3)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(5);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedData0[expectedDatasSize] = {0, 0};
int64_t expectedData1[expectedDatasSize] = {0, 0};
int64_t expectedData2[expectedDatasSize] = {1, 3};
int64_t expectedData3[expectedDatasSize] = {111, 333};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedData0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedData1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedData2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedData3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
vectorBatch->Get(3)->SetNull(0);
vectorBatch->Get(3)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestFixedKeysFullEqualityJoinWithBuildNull)
{
const int32_t dataSize = 6;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 1, 3, 3, 5, 6};
int64_t buildData1[] = {111, 11, 333, 33, 11, 11};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(1)->SetNull(0);
buildVecBatch->Get(1)->SetNull(2);
int32_t buildJoinCols[2] = {0, 1};
int32_t joinColsCount = 2;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
const int32_t probeDataSize = 4;
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4};
int64_t probeData1[] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[2] = {0, 1};
int32_t probeHashColsCount = 2;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
int64_t expectedDatas[4][expectedDataSize] = {
{1, 2, 3, 4},
{11, 22, 33, 44},
{1, 0, 3, 0},
{11, 0, 33, 0}};
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 4;
int64_t expectedData0[expectedDatasSize] = {0, 0, 0, 0};
int64_t expectedData1[expectedDatasSize] = {0, 0, 0, 0};
int64_t expectedData2[expectedDatasSize] = {1, 3, 5, 6};
int64_t expectedData3[expectedDatasSize] = {111, 333, 11, 11};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedData0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedData1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedData2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedData3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(0)->SetNull(2);
vectorBatch->Get(0)->SetNull(3);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
vectorBatch->Get(1)->SetNull(2);
vectorBatch->Get(1)->SetNull(3);
vectorBatch->Get(3)->SetNull(0);
vectorBatch->Get(3)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestVariableKeysFullEqualityJoinWithBuildNull)
{
const int32_t dataSize = 8;
DataTypes buildTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t buildData0[dataSize] = {20, 14, 7, 19, 1, 20, 20, 13};
std::string buildData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "35709", "35709", "31904"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(0)->SetNull(1);
buildVecBatch->Get(0)->SetNull(2);
int32_t buildJoinCols[2] = {0, 1};
int32_t joinColsCount = 2;
int32_t operatorCount = 1;
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_FULL,
buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
const int32_t probeDataSize = 6;
int32_t probeData0[probeDataSize] = {20, 16, 13, 4, 20, 4};
std::string probeData1[probeDataSize] = {"35709", "35709", "31904", "12477", "31904", "38721"};
auto probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[2] = {0, 1};
int32_t probeHashColsCount = 2;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 8;
int32_t expectData0[expectDataSize] = {20, 20, 20, 16, 13, 4, 20, 4};
std::string expectData1[expectDataSize] = {"35709", "35709", "35709", "35709", "31904", "12477", "31904", "38721"};
int32_t expectData2[expectDataSize] = {20, 20, 20, 0, 13, 0, 0, 0};
std::string expectData3[expectDataSize] = {"35709", "35709", "35709", "", "31904", "", "", ""};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData0));
expectVecBatch->Append(CreateVarcharVector(expectData1, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVarcharVector(expectData3, expectDataSize));
expectVecBatch->Get(2)->SetNull(3);
expectVecBatch->Get(2)->SetNull(5);
expectVecBatch->Get(2)->SetNull(6);
expectVecBatch->Get(2)->SetNull(7);
expectVecBatch->Get(3)->SetNull(3);
expectVecBatch->Get(3)->SetNull(5);
expectVecBatch->Get(3)->SetNull(6);
expectVecBatch->Get(3)->SetNull(7);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 4;
int32_t expectedData0[expectedDatasSize] = {0, 0, 0, 0};
std::string expectedData1[expectedDatasSize] = {"", "", "", ""};
int32_t expectedData2[expectedDatasSize] = {0, 0, 1, 19};
std::string expectedData3[expectedDatasSize] = {"31904", "35709", "35709", "31904"};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedData0);
auto expectedVec1 = CreateVarcharVector(expectedData1, expectedDatasSize);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedData2);
auto expectedVec3 = CreateVarcharVector(expectedData3, expectedDatasSize);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(0)->SetNull(2);
vectorBatch->Get(0)->SetNull(3);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
vectorBatch->Get(1)->SetNull(2);
vectorBatch->Get(1)->SetNull(3);
vectorBatch->Get(2)->SetNull(0);
vectorBatch->Get(2)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(vectorBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinChar)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
std::string buildData1[dataSize] = {"aaa", "11", "ccc", "33"};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
std::string probeData1[dataSize] = {"11", "22", "33", "44"};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupOuterJoinOperatorFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinOperatorFactory->CreateOperator();
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
std::string expectedData1[expectedDataSize] = {"11", "22", "33", "44"};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
std::string expectedData3[expectedDataSize] = {"11", "", "33", ""};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2), LongType(), VarcharType(3) }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedDatas0[expectedDatasSize] = {0, 0};
std::string expectedDatas1[expectedDatasSize] = {"", ""};
int64_t expectedDatas2[expectedDatasSize] = {3, 1};
std::string expectedDatas3[expectedDatasSize] = {"ccc", "aaa"};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedDatas0);
auto expectedVec1 = CreateVarcharVector(expectedDatas1, expectedDatasSize);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedDatas2);
auto expectedVec3 = CreateVarcharVector(expectedDatas3, expectedDatasSize);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinOperatorFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinDate32)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), Date32Type(DAY) }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
int32_t buildData1[dataSize] = {123, 11, 321, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), Date32Type(DAY) }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
int32_t probeData1[dataSize] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), Date32Type(DAY) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupOuterJoinOperatorFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinOperatorFactory->CreateOperator();
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
int32_t expectedData1[expectedDataSize] = {11, 22, 33, 44};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
int32_t expectedData3[expectedDataSize] = {11, -1, 33, -1};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), Date32Type(DAY), LongType(), Date32Type(DAY) }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedDatas0[expectedDatasSize] = {0, 0};
int32_t expectedDatas1[expectedDatasSize] = {0, 0};
int64_t expectedDatas2[expectedDatasSize] = {3, 1};
int32_t expectedDatas3[expectedDatasSize] = {321, 123};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedDatas0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedDatas1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedDatas2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedDatas3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
VectorHelper::FreeVecBatch(appendOutput);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinOperatorFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinDecimal64)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), Decimal64Type(3, 0) }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
int64_t buildData1[dataSize] = {123, 11, 321, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), Decimal64Type(2, 0) }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
int64_t probeData1[dataSize] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), Decimal64Type(3, 0) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupOuterJoinOperatorFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinOperatorFactory->CreateOperator();
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
int64_t expectedData1[expectedDataSize] = {11, 22, 33, 44};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
int64_t expectedData3[expectedDataSize] = {11, -1, 33, -1};
DataTypes expectedDataTypes(
std::vector<DataTypePtr>({ LongType(), Decimal64Type(2, 0), LongType(), Decimal64Type(3, 0) }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedDatas0[expectedDatasSize] = {0, 0};
int64_t expectedDatas1[expectedDatasSize] = {0, 0};
int64_t expectedDatas2[expectedDatasSize] = {3, 1};
int64_t expectedDatas3[expectedDatasSize] = {321, 123};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedDatas0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedDatas1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedDatas2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedDatas3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinOperatorFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinDecimal128)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), Decimal128Type(3, 0) }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
Decimal128 buildData1[dataSize] = {Decimal128(123, 0), Decimal128(11, 0), Decimal128(321, 0), Decimal128(33, 0)};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), Decimal128Type(2, 0) }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
Decimal128 probeData1[dataSize] = {Decimal128(11, 0), Decimal128(22, 0), Decimal128(33, 0), Decimal128(44, 0)};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), Decimal128Type(3, 0) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupOuterJoinOperatorFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinOperatorFactory->CreateOperator();
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
Decimal128 expectedData1[expectedDataSize] = {Decimal128(11, 0), Decimal128(22, 0), Decimal128(33, 0),
Decimal128(44, 0)};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
Decimal128 expectedData3[expectedDataSize] = {Decimal128(11, 0), Decimal128(0, 0), Decimal128(33, 0),
Decimal128(0, 0)};
DataTypes expectedDataTypes(
std::vector<DataTypePtr>({ LongType(), Decimal128Type(2, 0), LongType(), Decimal128Type(3, 0) }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedDatas0[expectedDatasSize] = {0, 0};
Decimal128 expectedDatas1[expectedDatasSize] = {0, 0};
int64_t expectedDatas2[expectedDatasSize] = {1, 3};
Decimal128 expectedDatas3[expectedDatasSize] = {Decimal128(123, 0), Decimal128(321, 0)};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedDatas0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedDatas1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedDatas2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedDatas3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinOperatorFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinShort)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), ShortType() }));
int64_t buildData0[dataSize] = {1, 2, 3, 4};
int16_t buildData1[dataSize] = {123, 11, 321, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), ShortType() }));
int64_t probeData0[dataSize] = {1, 2, 3, 4};
int16_t probeData1[dataSize] = {11, 22, 33, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), ShortType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinOperatorFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinOperatorFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedData0[expectedDataSize] = {1, 2, 3, 4};
int16_t expectedData1[expectedDataSize] = {11, 22, 33, 44};
int64_t expectedData2[expectedDataSize] = {2, 0, 4, 0};
int16_t expectedData3[expectedDataSize] = {11, -1, 33, -1};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), ShortType(), LongType(), ShortType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedData0, expectedData1,
expectedData2, expectedData3);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedDatas0[expectedDatasSize] = {0, 0};
int16_t expectedDatas1[expectedDatasSize] = {0, 0};
int64_t expectedDatas2[expectedDatasSize] = {3, 1};
int16_t expectedDatas3[expectedDatasSize] = {321, 123};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedDatas0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedDatas1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedDatas2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedDatas3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(appendOutput);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinOperatorFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinDictionary)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4};
int64_t buildData1[] = {111, 11, 333, 33};
auto *buildVecBatch = new VectorBatch(dataSize);
buildVecBatch->Append(CreateVector(dataSize, buildData0));
const DataTypePtr &dataType = buildTypes.GetType(1);
int32_t ids[] = {0, 1, 2, 3};
buildVecBatch->Append(CreateDictionaryVector(*dataType, dataSize, ids, dataSize, buildData1));
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4};
int64_t probeData1[] = {11, 22, 33, 44};
auto *probeVecBatch = new VectorBatch(dataSize);
probeVecBatch->Append(CreateVector(dataSize, probeData0));
const DataTypePtr &probeDataType = probeTypes.GetType(1);
probeVecBatch->Append(CreateDictionaryVector(*probeDataType, dataSize, ids, dataSize, probeData1));
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedDatas[4][expectedDataSize] = {
{1, 2, 3, 4},
{11, 22, 33, 44},
{2, 0, 4, 0},
{11, 0, 33, 0}};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), LongType(), LongType(), LongType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
expectedVecbatch->Get(2)->SetNull(1);
expectedVecbatch->Get(2)->SetNull(3);
expectedVecbatch->Get(3)->SetNull(1);
expectedVecbatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedDatas0[expectedDatasSize] = {0, 0};
int64_t expectedDatas1[expectedDatasSize] = {0, 0};
int64_t expectedDatas2[expectedDatasSize] = {3, 1};
int64_t expectedDatas3[expectedDatasSize] = {333, 111};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedDatas0);
auto expectedVec1 = CreateVector(expectedDatasSize, expectedDatas1);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedDatas2);
auto expectedVec3 = CreateVector(expectedDatasSize, expectedDatas3);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
VectorHelper::FreeVecBatch(appendOutput);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinHasOutputNulls)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
int64_t buildData0[dataSize] = {1, 0, 3, 0};
std::string buildData1[dataSize] = {"aaa", "11", "ccc", "33"};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(0)->SetNull(1);
buildVecBatch->Get(0)->SetNull(3);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
int64_t probeData0[dataSize] = {0, 2, 0, 4};
std::string probeData1[dataSize] = {"11", "22", "33", "44"};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
probeVecBatch->Get(0)->SetNull(0);
probeVecBatch->Get(0)->SetNull(2);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
LookupJoinOperatorFactory *lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols,
buildOutputColsCount, buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int32_t ids[4] = {0, 1, 2, 3};
DataTypes expectedTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2), LongType(), VarcharType(3) }));
int64_t expectedData0[expectedDataSize] = {0, 2, 0, 4};
std::string expectedData1[expectedDataSize] = {"11", "22", "33", "44"};
int64_t expectedData2[expectedDataSize] = {0, 0, 0, 0};
std::string expectedData3[expectedDataSize] = {"11", "a", "33", "a"};
auto expectedVector2 = CreateVector(expectedDataSize, expectedData2);
expectedVector2->SetNull(0);
expectedVector2->SetNull(1);
expectedVector2->SetNull(2);
expectedVector2->SetNull(3);
auto expectedVector3 = CreateVarcharVector(expectedData3, expectedDataSize);
auto expectedVecBatch = new VectorBatch(expectedDataSize);
expectedVecBatch->Append(
CreateDictionaryVector(*expectedTypes.GetType(0), expectedDataSize, ids, expectedDataSize, expectedData0));
expectedVecBatch->Append(
CreateDictionaryVector(*expectedTypes.GetType(1), expectedDataSize, ids, expectedDataSize, expectedData1));
expectedVecBatch->Append(expectedVector2);
expectedVecBatch->Append(expectedVector3);
expectedVecBatch->Get(0)->SetNull(0);
expectedVecBatch->Get(0)->SetNull(2);
expectedVecBatch->Get(3)->SetNull(1);
expectedVecBatch->Get(3)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecBatch));
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t expectedDatasSize = 2;
int64_t expectedDatas0[expectedDatasSize] = {0, 0};
std::string expectedDatas1[expectedDatasSize] = {"", ""};
int64_t expectedDatas2[expectedDatasSize] = {3, 1};
std::string expectedDatas3[expectedDatasSize] = {"ccc", "aaa"};
auto expectedVec0 = CreateVector(expectedDatasSize, expectedDatas0);
auto expectedVec1 = CreateVarcharVector(expectedDatas1, expectedDatasSize);
auto expectedVec2 = CreateVector(expectedDatasSize, expectedDatas2);
auto expectedVec3 = CreateVarcharVector(expectedDatas3, expectedDatasSize);
auto vectorBatch = new VectorBatch(expectedDatasSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftSemiEqualityJoin)
{
const int32_t dataSize = 8;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
int64_t buildData0[dataSize] = {1, 0, 3, 0, 1, 0, 2, 2};
std::string buildData1[dataSize] = {"aaa", "11", "ccc", "33", "aaa", "11", "ccc", "33"};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(0)->SetNull(1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT_SEMI, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
int64_t probeData0[dataSize] = {0, 2, 0, 1, 0, 2, 0, 4};
std::string probeData1[dataSize] = {"11", "22", "33", "44", "11", "22", "33", "44"};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
probeVecBatch->Get(0)->SetNull(2);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[0] = {};
int32_t buildOutputColsCount = 0;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
LookupJoinOperatorFactory *lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols,
buildOutputColsCount, buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 6;
int64_t expectedData0[expectedDataSize] = {0, 2, 1, 0, 2, 0};
std::string expectedData1[expectedDataSize] = {"11", "22", "44", "11", "22", "33"};
auto expectedVector0 = CreateVector<int64_t>(expectedDataSize, expectedData0);
auto expectedVector1 = CreateVarcharVector(expectedData1, expectedDataSize);
auto expectedVecBatch = new VectorBatch(expectedDataSize);
expectedVecBatch->Append(expectedVector0);
expectedVecBatch->Append(expectedVector1);
DataTypes expectedTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftSemiEqualityJoinWithOddDataSize)
{
const int32_t dataSize = 7;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
int64_t buildData0[dataSize] = {1, 0, 3, 0, 1, 0, 2};
std::string buildData1[dataSize] = {"aaa", "11", "ccc", "33", "aaa", "11", "ccc"};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(0)->SetNull(1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT_SEMI, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
int64_t probeData0[dataSize] = {0, 2, 0, 1, 0, 2, 0};
std::string probeData1[dataSize] = {"11", "22", "33", "44", "11", "22", "33"};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
probeVecBatch->Get(0)->SetNull(2);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[0] = {};
int32_t buildOutputColsCount = 0;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
LookupJoinOperatorFactory *lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols,
buildOutputColsCount, buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 6;
int64_t expectedData0[expectedDataSize] = {0, 2, 1, 0, 2, 0};
std::string expectedData1[expectedDataSize] = {"11", "22", "44", "11", "22", "33"};
auto expectedVector0 = CreateVector<int64_t>(expectedDataSize, expectedData0);
auto expectedVector1 = CreateVarcharVector(expectedData1, expectedDataSize);
auto expectedVecBatch = new VectorBatch(expectedDataSize);
expectedVecBatch->Append(expectedVector0);
expectedVecBatch->Append(expectedVector1);
DataTypes expectedTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftSemiEqualityJoinWithCharFilter)
{
const int32_t dataSize = 10;
DataTypes buildTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16};
std::string buildData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "31904", "35709", "31904",
"35709", "31904"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
string filterExpression = "$operator$NOT_EQUAL:4(substr:15(#1, 1:1, 5:1), substr:15(#3, 1:1, 5:1))";
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_LEFT_SEMI,
buildTypes, buildJoinCols, joinColsCount, operatorCount);
omniruntime::expressions::Expr *joinFilter = CreateJoinFilterExprWithChar();
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 20, 7};
std::string probeData1[dataSize] = {"35709", "35709", "31904", "12477", "31904", "38721", "90419", "35709",
"88371", "35709"};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
static_cast<Vector<std::string_view> *>(probeVecBatch->Get(1))->SetNull(dataSize - 1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[0] = {};
int32_t buildOutputColsCount = 0;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, joinFilter, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 5;
int32_t expectData0[expectDataSize] = {20, 16, 20, 19, 20};
std::string expectData1[expectDataSize] = {"35709", "35709", "31904", "35709", "88371"};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData0));
expectVecBatch->Append(CreateVarcharVector(expectData1, expectDataSize));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({ joinFilter });
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestRightEqualityJoin)
{
const int32_t dataSize = 8;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
int64_t buildData0[dataSize] = {1, 0, 3, 0, 1, 0, 2, 2};
std::string buildData1[dataSize] = {"aaa", "11", "ccc", "33", "aa", "11", "ccc", "33"};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_RIGHT, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2) }));
const int32_t probeDataSize = 4;
int64_t probeData0[probeDataSize] = {0, 2, 0, 1};
std::string probeData1[probeDataSize] = {"11", "22", "33", "44"};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
probeVecBatch->Get(0)->SetNull(2);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(3) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
LookupJoinOperatorFactory *lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols,
buildOutputColsCount, buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto *lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 8;
int64_t expectedData0[expectedDataSize] = {0, 0, 0, 2, 2, 1, 1, 0};
std::string expectedData1[expectedDataSize] = {"11", "11", "11", "22", "22", "44", "44", "33"};
int64_t expectedData2[expectedDataSize] = {0, 0, 0, 2, 2, 1, 1, 0};
std::string expectedData3[expectedDataSize] = {"11", "33", "11", "ccc", "33", "aaa", "aa", ""};
auto expectedVector0 = CreateVector<int64_t>(expectedDataSize, expectedData0);
expectedVector0->SetNull(7);
auto expectedVector1 = CreateVarcharVector(expectedData1, expectedDataSize);
auto expectedVector2 = CreateVector<int64_t>(expectedDataSize, expectedData2);
expectedVector2->SetNull(7);
auto expectedVector3 = CreateVarcharVector(expectedData3, expectedDataSize);
expectedVector3->SetNull(7);
auto expectedVecBatch = new VectorBatch(expectedDataSize);
expectedVecBatch->Append(expectedVector0);
expectedVecBatch->Append(expectedVector1);
expectedVecBatch->Append(expectedVector2);
expectedVecBatch->Append(expectedVector3);
DataTypes expectedTypes(std::vector<DataTypePtr>({ LongType(), VarcharType(2), LongType(), VarcharType(3) }));
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestRightEqualityJoinWithCharFilter)
{
const int32_t dataSize = 10;
DataTypes buildTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t buildData0[dataSize] = {19, 14, 7, 19, 1, 20, 10, 13, 20, 16};
std::string buildData1[dataSize] = {"35709", "31904", "35709", "31904", "35709", "31904", "35709", "31904",
"35709", "31904"};
auto buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
string filterExpression = "$operator$NOT_EQUAL:4(substr:15(#1, 1:1, 5:1), substr:15(#3, 1:1, 5:1))";
auto hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(OMNI_JOIN_TYPE_RIGHT,
buildTypes, buildJoinCols, joinColsCount, operatorCount);
omniruntime::expressions::Expr *joinFilter = CreateJoinFilterExprWithChar();
auto hashBuilderOperator = static_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
int32_t probeData0[dataSize] = {20, 16, 13, 4, 20, 4, 22, 19, 20, 7};
std::string probeData1[dataSize] = {"35709", "35709", "31904", "12477", "31904", "38721", "90419", "35709",
"88371", "35709"};
auto probeVecBatch = CreateVectorBatch(probeTypes, dataSize, probeData0, probeData1);
static_cast<Vector<std::string_view> *>(probeVecBatch->Get(1))->SetNull(dataSize - 1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ IntType(), VarcharType(5) }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, joinFilter, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectDataSize = 11;
int32_t expectData0[expectDataSize] = {20, 16, 13, 20, 19, 20, 20, 7, 4, 4, 22};
std::string expectData1[expectDataSize] = {"35709", "35709", "31904", "31904", "35709",
"88371", "88371", "", "12477", "38721", "90419"};
int32_t expectData2[expectDataSize] = {20, 16, 0, 20, 19, 20, 20, 0, 0, 0, 0};
std::string expectData3[expectDataSize] = {"31904", "31904", "", "35709", "31904", "31904", "35709", "", "", "",
""};
auto expectVecBatch = new VectorBatch(expectDataSize);
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData0));
expectVecBatch->Append(CreateVarcharVector(expectData1, expectDataSize));
expectVecBatch->Append(CreateVector<int32_t>(expectDataSize, expectData2));
expectVecBatch->Append(CreateVarcharVector(expectData3, expectDataSize));
static_cast<Vector<std::string_view> *>(expectVecBatch->Get(1))->SetNull(7);
auto nullIndex = {2, 7, 8, 9, 10};
for (auto idx : nullIndex) {
expectVecBatch->Get(2)->SetNull(idx);
static_cast<Vector<std::string_view> *>(expectVecBatch->Get(3))->SetNull(idx);
}
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectVecBatch));
Expr::DeleteExprs({ joinFilter });
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftAntiEqualityJoin)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4};
int64_t buildData1[] = {111, 11, 333, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT_ANTI, buildTypes, buildJoinCols, joinColsCount, operatorCount);
HashBuilderOperator *hashBuilderOperator =
dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
int32_t probeDataSize = 6;
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4, 3, 6};
int64_t probeData1[] = {11, 22, 33, 44, 22, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[0] = {};
int32_t buildOutputColsCount = 0;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({}));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int64_t expectedDatas[2][expectedDataSize] = {{2, 4, 3, 6}, {22, 44, 22, 44}};
AssertVecBatchEquals(outputVecBatch, probeTypes.GetSize(), expectedDataSize, expectedDatas[0], expectedDatas[1]);
VectorHelper::FreeVecBatch(outputVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftAntiEqualityJoinWithOddProbeDataSize)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4};
int64_t buildData1[] = {111, 11, 333, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT_ANTI, buildTypes, buildJoinCols, joinColsCount, operatorCount);
HashBuilderOperator *hashBuilderOperator =
dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
int32_t probeDataSize = 7;
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4, 3, 6, 7};
int64_t probeData1[] = {11, 22, 33, 44, 22, 44, 77};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[0] = {};
int32_t buildOutputColsCount = 0;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({}));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols,
probeHashColsCount, buildOutputCols,
buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr,
nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 5;
int64_t expectedDatas[2][expectedDataSize] = {{2, 4, 3, 6, 7}, {22, 44, 22, 44, 77}};
AssertVecBatchEquals(outputVecBatch, probeTypes.GetSize(), expectedDataSize, expectedDatas[0], expectedDatas[1]);
VectorHelper::FreeVecBatch(outputVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestInnerEqualityJoinOfArrayTable)
{
const int32_t dataSize = 6;
DataTypes buildTypes(std::vector<DataTypePtr>({ ShortType(), ShortType() }));
int16_t buildData0[] = {1, 2, 3, 4, 1, 2};
int16_t buildData1[] = {111, 11, 333, 33, 20, 16};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_INNER, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ ShortType(), ShortType() }));
int16_t probeData0[] = {1, 2, 3, 4};
int16_t probeData1[] = {11, 22, 33, 44};
const int32_t probeDataSize = 4;
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ ShortType(), ShortType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 6;
int16_t expectedDatas[4][expectedDataSize] = {
{1, 1, 2, 2, 3, 4},
{11, 11, 22, 22, 33, 44},
{1, 1, 2, 2, 3, 4},
{111, 20, 11, 16, 333, 33}};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ ShortType(), ShortType(), ShortType(), ShortType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestLeftAntiEqualityJoinWithArrayProbe)
{
const int32_t dataSize = 7;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4, 6, 7, 8};
int64_t buildData1[] = {2, 11, 7, 3, 4, 5, 5, 6};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_LEFT_ANTI, buildTypes, buildJoinCols, joinColsCount, operatorCount);
HashBuilderOperator *hashBuilderOperator =
dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
int32_t probeDataSize = 7;
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4, 3, 6, 7};
int64_t probeData1[] = {28, 2, 3, 10, 0, 15, 8};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[0] = {};
int32_t buildOutputColsCount = 0;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({}));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols,
probeHashColsCount, buildOutputCols,
buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr,
nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 5;
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t expectedDatas[2][expectedDataSize] = {{1, 4, 3, 6, 7}, {28, 10, 0, 15, 8}};
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1]);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinOfArrayTable)
{
const int32_t dataSize = 6;
DataTypes buildTypes(std::vector<DataTypePtr>({ ShortType(), ShortType() }));
int16_t buildData0[] = {1, 2, 3, 4, 1, 2};
int16_t buildData1[] = {111, 11, 333, 33, 20, 16};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(0)->SetNull(4);
buildVecBatch->Get(0)->SetNull(5);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ ShortType(), ShortType() }));
int16_t probeData0[] = {1, 2, 3, 4};
int16_t probeData1[] = {11, 22, 33, 44};
const int32_t probeDataSize = 4;
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ ShortType(), ShortType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
int16_t expectedDatas[4][expectedDataSize] = {
{1, 2, 3, 4},
{11, 22, 33, 44},
{1, 2, 3, 4},
{111, 11, 333, 33}};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ ShortType(), ShortType(), ShortType(), ShortType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t remainSlotSize = 2;
int16_t expectedData0[remainSlotSize] = {0, 0};
int16_t expectedData1[remainSlotSize] = {0, 0};
int16_t expectedData2[remainSlotSize] = {0, 0};
int16_t expectedData3[remainSlotSize] = {20, 16};
auto expectedVec0 = CreateVector(remainSlotSize, expectedData0);
auto expectedVec1 = CreateVector(remainSlotSize, expectedData1);
auto expectedVec2 = CreateVector(remainSlotSize, expectedData2);
auto expectedVec3 = CreateVector(remainSlotSize, expectedData3);
auto vectorBatch = new VectorBatch(remainSlotSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
vectorBatch->Get(2)->SetNull(0);
vectorBatch->Get(2)->SetNull(1);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestFullEqualityJoinOfArrayTableWithOddDataSize)
{
const int32_t dataSize = 7;
DataTypes buildTypes(std::vector<DataTypePtr>({ShortType(), ShortType()}));
int16_t buildData0[] = {1, 2, 3, 4, 1, 2, 8};
int16_t buildData1[] = {111, 11, 333, 33, 20, 16, 88};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
buildVecBatch->Get(0)->SetNull(4);
buildVecBatch->Get(0)->SetNull(5);
int32_t buildJoinCols[1] = {0};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(hashBuilderFactory->CreateOperator());
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
DataTypes probeTypes(std::vector<DataTypePtr>({ShortType(), ShortType()}));
int16_t probeData0[] = {1, 2, 3, 4, 5, 6, 7};
int16_t probeData1[] = {11, 22, 33, 44, 55, 66, 77};
const int32_t probeDataSize = 7;
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[2] = {0, 1};
int32_t probeOutputColsCount = 2;
int32_t probeHashCols[1] = {0};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[2] = {0, 1};
int32_t buildOutputColsCount = 2;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ShortType(), ShortType()}));
auto hashBuilderFactoryAddr = (int64_t) hashBuilderFactory;
auto lookupJoinFactory =
LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount,
probeHashCols,
probeHashColsCount,
buildOutputCols,
buildOutputColsCount,
buildOutputTypes,
hashBuilderFactoryAddr, nullptr, false,
nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(lookupJoinFactory->CreateOperator());
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 7;
int16_t expectedDatas[4][expectedDataSize] = {
{1, 2, 3, 4, 5, 6, 7},
{11, 22, 33, 44, 55, 66, 77},
{1, 2, 3, 4, -1, -1, -1},
{111, 11, 333, 33, -1, -1, -1}};
DataTypes expectedDataTypes(std::vector<DataTypePtr>({ ShortType(), ShortType(), ShortType(), ShortType() }));
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas[0], expectedDatas[1],
expectedDatas[2], expectedDatas[3]);
expectedVecbatch->Get(2)->SetNull(4);
expectedVecbatch->Get(2)->SetNull(5);
expectedVecbatch->Get(2)->SetNull(6);
expectedVecbatch->Get(3)->SetNull(4);
expectedVecbatch->Get(3)->SetNull(5);
expectedVecbatch->Get(3)->SetNull(6);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
auto lookupOuterJoinFactory =
LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols,
probeOutputColsCount,
buildOutputCols,
buildOutputTypes,
hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
VectorBatch *appendOutput;
lookupOuterJoinOperator->GetOutput(&appendOutput);
const int32_t remainSlotSize = 3;
int16_t expectedData0[remainSlotSize] = {0, 0, 0};
int16_t expectedData1[remainSlotSize] = {0, 0, 0};
int16_t expectedData2[remainSlotSize] = {8, 0, 0};
int16_t expectedData3[remainSlotSize] = {88, 20, 16};
auto expectedVec0 = CreateVector(remainSlotSize, expectedData0);
auto expectedVec1 = CreateVector(remainSlotSize, expectedData1);
auto expectedVec2 = CreateVector(remainSlotSize, expectedData2);
auto expectedVec3 = CreateVector(remainSlotSize, expectedData3);
auto vectorBatch = new VectorBatch(remainSlotSize);
vectorBatch->Append(expectedVec0);
vectorBatch->Append(expectedVec1);
vectorBatch->Append(expectedVec2);
vectorBatch->Append(expectedVec3);
vectorBatch->Get(0)->SetNull(0);
vectorBatch->Get(0)->SetNull(1);
vectorBatch->Get(0)->SetNull(2);
vectorBatch->Get(1)->SetNull(0);
vectorBatch->Get(1)->SetNull(1);
vectorBatch->Get(1)->SetNull(2);
vectorBatch->Get(2)->SetNull(1);
vectorBatch->Get(2)->SetNull(2);
EXPECT_TRUE(VecBatchMatch(appendOutput, vectorBatch));
VectorHelper::FreeVecBatch(outputVecBatch);
VectorHelper::FreeVecBatch(appendOutput);
VectorHelper::FreeVecBatch(vectorBatch);
VectorHelper::FreeVecBatch(expectedVecbatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestExistenceEqualityJoin)
{
const int32_t dataSize = 4;
DataTypes buildTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t buildData0[] = {1, 2, 3, 4};
int64_t buildData1[] = {111, 11, 333, 33};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1);
int32_t buildJoinCols[1] = {1};
int32_t joinColsCount = 1;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_EXISTENCE, buildTypes, buildJoinCols, joinColsCount, operatorCount);
HashBuilderOperator *hashBuilderOperator =
dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
int32_t probeDataSize = 6;
DataTypes probeTypes(std::vector<DataTypePtr>({ LongType(), LongType() }));
int64_t probeData0[] = {1, 2, 3, 4, 3, 6};
int64_t probeData1[] = {11, 22, 33, 44, 22, 44};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1);
int32_t probeOutputCols[0] = {};
int32_t probeOutputColsCount = 0;
int32_t probeHashCols[1] = {1};
int32_t probeHashColsCount = 1;
int32_t buildOutputCols[0] = {};
int32_t buildOutputColsCount = 0;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({}));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(
probeTypes, probeOutputCols, probeOutputColsCount, probeHashCols, probeHashColsCount,
buildOutputCols, buildOutputColsCount, buildOutputTypes, hashBuilderFactoryAddr,
nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 6;
bool expectedDatas[1][expectedDataSize] = {{true, false, true, false, false, false}};
AssertVecBatchEquals(outputVecBatch, 1, expectedDataSize, expectedDatas[0]);
VectorHelper::FreeVecBatch(outputVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory);
}
TEST(NativeOmniJoinTest, TestMultiKeysFullEqualityJoinInteger)
{
const int32_t dataSize = 6;
DataTypes buildTypes(std::vector<DataTypePtr>({ ShortType(), IntType(), LongType() }));
int16_t buildData0[] = {1, 2, 3, 4, 5, 6};
int32_t buildData1[] = {11, 22, 33, 44, 55, 66};
int64_t buildData2[] = {111, 222, 333, 444, 555, 666};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1, buildData2);
int32_t buildJoinCols[3] = {0, 1, 2};
int32_t joinColsCount = 3;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
const int32_t probeDataSize = 4;
DataTypes probeTypes(std::vector<DataTypePtr>({ ShortType(), IntType(), LongType() }));
int16_t probeData0[] = {1, 3, 5, 7};
int32_t probeData1[] = {11, 33, 55, 77};
int64_t probeData2[] = {111, 333, 555, 777};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1, probeData2);
int32_t probeOutputCols[3] = {0, 1, 2};
int32_t probeOutputColsCount = 3;
int32_t probeHashCols[3] = {0, 1, 2};
int32_t probeHashColsCount = 3;
int32_t buildOutputCols[3] = {0, 1, 2};
int32_t buildOutputColsCount = 3;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ ShortType(), IntType(), LongType() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
DataTypes expectedDataTypes(std::vector<DataTypePtr>(
{ ShortType(), IntType(), LongType(), ShortType(), IntType(), LongType() }));
int16_t expectedDatas0[expectedDataSize] = {1, 3, 5, 7};
int32_t expectedDatas1[expectedDataSize] = {11, 33, 55, 77};
int64_t expectedDatas2[expectedDataSize] = {111, 333, 555, 777};
int16_t expectedDatas3[expectedDataSize] = {1, 3, 5, 0};
int32_t expectedDatas4[expectedDataSize] = {11, 33, 55, 0};
int64_t expectedDatas5[expectedDataSize] = {111, 333, 555, 0};
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas0, expectedDatas1,
expectedDatas2, expectedDatas3, expectedDatas4, expectedDatas5);
expectedVecbatch->Get(3)->SetNull(3);
expectedVecbatch->Get(4)->SetNull(3);
expectedVecbatch->Get(5)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(expectedVecbatch);
VectorHelper::FreeVecBatch(outputVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
TEST(NativeOmniJoinTest, TestMultiKeysFullEqualityJoinDecimal64)
{
const int32_t dataSize = 6;
DataTypes buildTypes(std::vector<DataTypePtr>({ ShortType(), IntType(), Decimal64Type() }));
int16_t buildData0[] = {1, 2, 3, 4, 5, 6};
int32_t buildData1[] = {11, 22, 33, 44, 55, 66};
double buildData2[] = {1.11, 2.22, 3.33, 4.44, 5.55, 6.66};
VectorBatch *buildVecBatch = CreateVectorBatch(buildTypes, dataSize, buildData0, buildData1, buildData2);
int32_t buildJoinCols[3] = {0, 1, 2};
int32_t joinColsCount = 3;
int32_t operatorCount = 1;
HashBuilderOperatorFactory *hashBuilderFactory = HashBuilderOperatorFactory::CreateHashBuilderOperatorFactory(
OMNI_JOIN_TYPE_FULL, buildTypes, buildJoinCols, joinColsCount, operatorCount);
auto *hashBuilderOperator = dynamic_cast<HashBuilderOperator *>(CreateTestOperator(hashBuilderFactory));
hashBuilderOperator->AddInput(buildVecBatch);
VectorBatch *hashBuildOutput = nullptr;
hashBuilderOperator->GetOutput(&hashBuildOutput);
const int32_t probeDataSize = 4;
DataTypes probeTypes(std::vector<DataTypePtr>({ ShortType(), IntType(), Decimal64Type() }));
int16_t probeData0[] = {1, 3, 5, 7};
int32_t probeData1[] = {11, 33, 55, 77};
double probeData2[] = {1.11, 3.33, 5.55, 7.77};
VectorBatch *probeVecBatch = CreateVectorBatch(probeTypes, probeDataSize, probeData0, probeData1, probeData2);
int32_t probeOutputCols[3] = {0, 1, 2};
int32_t probeOutputColsCount = 3;
int32_t probeHashCols[3] = {0, 1, 2};
int32_t probeHashColsCount = 3;
int32_t buildOutputCols[3] = {0, 1, 2};
int32_t buildOutputColsCount = 3;
DataTypes buildOutputTypes(std::vector<DataTypePtr>({ ShortType(), IntType(), Decimal64Type() }));
auto hashBuilderFactoryAddr = (int64_t)hashBuilderFactory;
auto lookupJoinFactory = LookupJoinOperatorFactory::CreateLookupJoinOperatorFactory(probeTypes, probeOutputCols,
probeOutputColsCount, probeHashCols, probeHashColsCount, buildOutputCols, buildOutputColsCount,
buildOutputTypes, hashBuilderFactoryAddr, nullptr, false, nullptr);
auto lookupJoinOperator = dynamic_cast<LookupJoinOperator *>(CreateTestOperator(lookupJoinFactory));
auto lookupOuterJoinFactory = LookupOuterJoinOperatorFactory::CreateLookupOuterJoinOperatorFactory(probeTypes,
probeOutputCols, probeOutputColsCount, buildOutputCols, buildOutputTypes, hashBuilderFactoryAddr);
auto lookupOuterJoinOperator = lookupOuterJoinFactory->CreateOperator();
lookupJoinOperator->AddInput(probeVecBatch);
VectorBatch *outputVecBatch = nullptr;
lookupJoinOperator->GetOutput(&outputVecBatch);
const int32_t expectedDataSize = 4;
DataTypes expectedDataTypes(std::vector<DataTypePtr>(
{ ShortType(), IntType(), Decimal64Type(), ShortType(), IntType(), Decimal64Type() }));
int16_t expectedDatas0[expectedDataSize] = {1, 3, 5, 7};
int32_t expectedDatas1[expectedDataSize] = {11, 33, 55, 77};
double expectedDatas2[expectedDataSize] = {1.11, 3.33, 5.55, 7.77};
int16_t expectedDatas3[expectedDataSize] = {1, 3, 5, 0};
int32_t expectedDatas4[expectedDataSize] = {11, 33, 55, 0};
double expectedDatas5[expectedDataSize] = {1.11, 3.33, 5.55, 0};
auto *expectedVecbatch = CreateVectorBatch(expectedDataTypes, expectedDataSize, expectedDatas0, expectedDatas1,
expectedDatas2, expectedDatas3, expectedDatas4, expectedDatas5);
expectedVecbatch->Get(3)->SetNull(3);
expectedVecbatch->Get(4)->SetNull(3);
expectedVecbatch->Get(5)->SetNull(3);
EXPECT_TRUE(VecBatchMatchIgnoreOrder(outputVecBatch, expectedVecbatch));
VectorHelper::FreeVecBatch(expectedVecbatch);
VectorHelper::FreeVecBatch(outputVecBatch);
omniruntime::op::Operator::DeleteOperator(hashBuilderOperator);
omniruntime::op::Operator::DeleteOperator(lookupJoinOperator);
omniruntime::op::Operator::DeleteOperator(lookupOuterJoinOperator);
DeleteJoinOperatorFactory(hashBuilderFactory, lookupJoinFactory, lookupOuterJoinFactory);
}
}