* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include <memory>
#include <set>
#include <vector>
#include <arm_sve.h>
#include "table/runtime/operators/join/AbstractStreamingJoinOperator.h"
#include "table/data/JoinedRowData.h"
#include "table/data/GenericRowData.h"
#include "table/data/util/RowDataUtil.h"
#include "table/typeutils/RowDataSerializer.h"
#include "streaming/api/operators/Triggerable.h"
#include "table/runtime/operators/InternalTimerServiceImpl.h"
#include "core/typeutils/LongSerializer.h"
#include "table/runtime/operators/TableStreamOperator.h"
#include "table/runtime/operators/window/state/WindowListState.h"
#include "table/data/vectorbatch/VectorBatch.h"
#include "table/data/util/VectorBatchUtil.h"
#include "core/api/common/state/ListStateDescriptor.h"
#include "OmniOperatorJIT/core/src/codegen/simple_filter_codegen.h"
#include "OmniOperatorJIT/core/src/vector/unsafe_vector.h"
#include "OmniOperatorJIT/core/src/operator/execution_context.h"
#include "table/runtime/keyselector/KeySelector.h"
#include "table/utils/TimeWindowUtil.h"
using VectorBatchId = uint64_t;
using namespace omnistream;
using FilterFunc = bool (*)(int64_t *, bool *, int32_t *, bool *, int32_t *, int64_t);
/**
 * Event-time window join over VectorBatch input.
 *
 * Rows from each side are buffered in per-key window list state (keyed by the join key, namespaced by the
 * window end). An event-time timer is registered per window, and onEventTime hands the buffered rows to the
 * subclass-provided join().
 */
template <typename KeyType>
class WindowJoinOperator : public TableStreamOperator<KeyType>, public Triggerable<KeyType, int64_t>, public TwoInputStreamOperator {
public:
    WindowJoinOperator(
        const nlohmann::json &config, Output *output, TypeSerializer *leftSerializer, TypeSerializer *rightSerializer);
    ~WindowJoinOperator() override;
    void open() override;
    void close() override;
    void processBatch1(StreamRecord *input) override;
    void processBatch2(StreamRecord *input) override;
    // Row-at-a-time input is a no-op here; data is consumed through processBatch1/processBatch2.
    void processElement1(StreamRecord *element) override {}
    void processElement2(StreamRecord *element) override {}
    void ProcessWatermark1(Watermark* watermark) override
    {
        LOG(">>>>>>>>>>")
        advanceCombinedWatermark(0, watermark->getTimestamp());
    }
    void ProcessWatermark2(Watermark* watermark) override
    {
        LOG(">>>>>>>>>>")
        advanceCombinedWatermark(1, watermark->getTimestamp());
    }
    void onEventTime(TimerHeapInternalTimer<KeyType, int64_t> *timer) override;
    void onProcessingTime(TimerHeapInternalTimer<KeyType, int64_t> *timer) override;
    void initializeState(StreamTaskStateInitializerImpl *initializer, TypeSerializer *keySerializer) override;
    virtual void join(std::vector<VectorBatchId> *leftRecords, std::vector<VectorBatchId> *rightRecords) = 0;
    std::string getTypeName() override { return "WindowJoinOperator"; }
    InternalTimerServiceImpl<KeyType, int64_t> *getInternalTimerService()
    {
        return internalTimerService;
    }
    std::shared_ptr<omnistream::TaskMetricGroup> GetMectrics() override
    {
        LOG("WindowJoinOperator GetMectrics")
        return this->metrics;
    }
protected:
    // Pointers below are assigned in open(); default to nullptr so that destroying an operator that was
    // never opened does not delete/inspect indeterminate pointers.
    TimestampedCollector *collector = nullptr;
    std::vector<omniruntime::type::DataTypeId> leftTypes;
    std::vector<omniruntime::type::DataTypeId> rightTypes;
    std::vector<omniruntime::type::DataTypeId> outputTypes;
    WindowListState<KeyType, int64_t, VectorBatchId> *leftWindowState = nullptr;
    WindowListState<KeyType, int64_t, VectorBatchId> *rightWindowState = nullptr;
    ::FilterFunc generatedFilter = nullptr;
    std::vector<int32_t> leftKeyIndex;
    std::vector<int32_t> rightKeyIndex;
    void buildInner(
        std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch);
    void buildRightNull(std::vector<VectorBatchId> *leftElements, omnistream::VectorBatch *outputBatch);
    void buildLeftNull(std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch);
    void buildSemiAnti(
        std::vector<VectorBatchId> *elements, omnistream::VectorBatch *outputBatch, bool isSemi, std::set<int> *matchedSet);
    bool filter(VectorBatchId leftElement, VectorBatchId rightElement);
    InternalTimerServiceImpl<KeyType, int64_t> *internalTimerService = nullptr;
    bool isNonEquiCondition = false;
    KeySelector<KeyType>* keySelectorLeft = nullptr;
    KeySelector<KeyType>* keySelectorRight = nullptr;
private:
    TypeSerializer *leftSerializer;
    TypeSerializer *rightSerializer;
    int leftWindowEndIndex;
    int rightWindowEndIndex;
    nlohmann::json description;
    std::set<int> colRefsForNonEquiCondition;
    int totalNumOfCols;
    std::vector<void (*)(omniruntime::vec::BaseVector *, int32_t, int32_t, int64_t *, bool *)> filterFuncPtrs;
    std::vector<bool> filterNullKeys;
    std::vector<int64_t> leftMaxTimestamps;
    std::vector<int64_t> rightMaxTimestamps;
    std::string shiftTimeZone = "UTC";
    omnistream::StateType backendType_ = omnistream::StateType::HEAP;

    /**
     * Shared body of ProcessWatermark1/2: records `timestamp` for input `inputIndex` (0 = left, 1 = right)
     * and, if the combined watermark advanced, forwards it to the timer service (when present) and downstream.
     */
    void advanceCombinedWatermark(int inputIndex, int64_t timestamp)
    {
        if (!this->combinedWatermark->UpdateWatermark(inputIndex, timestamp)) {
            return;
        }
        if (this->timeServiceManager != nullptr) {
            this->timeServiceManager->advanceWatermark(new Watermark(this->combinedWatermark->GetCombinedWatermark()));
        }
        this->output->emitWatermark(new Watermark(this->combinedWatermark->GetCombinedWatermark()));
    }

    template <typename TYPE>
    void insertLeft(int colIdx, std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements,
        omnistream::VectorBatch *outputBatch, bool isInner);
    template <typename TYPE>
    void insertRight(int colIdx, std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements,
        omnistream::VectorBatch *outputBatch, bool isInner);
    void insertLeftVarchar(int colIdx, std::vector<VectorBatchId> *leftElements,
        std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner);
    void insertRightVarchar(int colIdx, std::vector<VectorBatchId> *leftElements,
        std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner);
    void processBatch(omnistream::VectorBatch *batch, int windowEndIndex,
        WindowListState<KeyType, int64_t, VectorBatchId> *recordState, bool isLeftSide);
    ::FilterFunc generateJoinCondition();
    void getAllColRefs(nlohmann::json &config);
    void BuildInnerLeft(std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements,
        omnistream::VectorBatch *outputBatch);
    void BuildInnerRight(std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements,
        omnistream::VectorBatch *outputBatch);
};
template <typename KeyType>
WindowJoinOperator<KeyType>::WindowJoinOperator(
    const nlohmann::json &config, Output *output, TypeSerializer *leftSerializer, TypeSerializer *rightSerializer)
    : TableStreamOperator<KeyType>(new TimestampedCollector(output)),
      isNonEquiCondition(config.contains("nonEquiCondition") && !config["nonEquiCondition"].is_null()),
      leftSerializer(leftSerializer), rightSerializer(rightSerializer),
      leftWindowEndIndex(config["leftWindowEndIndex"]), rightWindowEndIndex(config["rightWindowEndIndex"]),
      description(config),
      colRefsForNonEquiCondition(), totalNumOfCols(), filterFuncPtrs()
{
    // Read the Flink type names of both inputs and the join-key column positions from the plan.
    const auto leftTypeNames = config["leftInputTypes"].get<std::vector<std::string>>();
    const auto rightTypeNames = config["rightInputTypes"].get<std::vector<std::string>>();
    rightKeyIndex = description["rightJoinKey"].get<std::vector<int32_t>>();
    leftKeyIndex = description["leftJoinKey"].get<std::vector<int32_t>>();

    // Translate each Flink type name into the Omni runtime type id, appending to `target`.
    const auto appendOmniTypes = [](const std::vector<std::string> &names,
                                    std::vector<omniruntime::type::DataTypeId> &target) {
        for (const std::string &name : names) {
            target.push_back(LogicalType::flinkTypeToOmniTypeId(name));
        }
    };
    appendOmniTypes(leftTypeNames, leftTypes);
    appendOmniTypes(rightTypeNames, rightTypes);

    // Optional time zone used when deciding window firing / timer timestamps; defaults to "UTC".
    if (config.contains("shiftTimeZone")) {
        shiftTimeZone = config["shiftTimeZone"].get<std::string>();
    }

    // Output schema: all left columns followed by all right columns.
    outputTypes.reserve(leftTypes.size() + rightTypes.size());
    outputTypes.insert(outputTypes.end(), leftTypes.begin(), leftTypes.end());
    outputTypes.insert(outputTypes.end(), rightTypes.begin(), rightTypes.end());
}
template <typename KeyType>
WindowJoinOperator<KeyType>::~WindowJoinOperator()
{
    // Free the collector and the two key selectors created in open().
    delete collector;
    const auto releaseSelector = [](KeySelector<KeyType> *&selector) {
        delete selector;
        selector = nullptr;
    };
    releaseSelector(keySelectorLeft);
    releaseSelector(keySelectorRight);
}
/**
 * Sets up the runtime pieces of the operator: output collector, timer service, the two window list
 * states, join-key selectors, the JIT-compiled non-equi filter (if any), and per-column value readers
 * used by filter().
 *
 * Throws when the keyed state backend is neither heap nor RocksDB, when left/right key types differ,
 * or when a column referenced by the non-equi condition has an unsupported type.
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::open()
{
    totalNumOfCols = leftTypes.size() + rightTypes.size();
    collector = new TimestampedCollector(this->output);
    collector->eraseTimestamp();
    internalTimerService =
        AbstractStreamOperator<KeyType>::getInternalTimerService("window-timers", new LongSerializer(), this);

    // Per-side list state of row combo ids; the namespace is the window end timestamp.
    std::string leftName = "leftWindowState";
    std::string rightName = "rightWindowState";
    auto leftDescriptor = new ListStateDescriptor<VectorBatchId>(leftName, new LongSerializer());
    auto rightDescriptor = new ListStateDescriptor<VectorBatchId>(rightName, new LongSerializer());
    using S = InternalListState<KeyType, int64_t, VectorBatchId>;
    auto keyedStateBackend = this->stateHandler->getKeyedStateBackend();
    // Batches are freed after use only for non-heap backends, so remember which backend is active.
    if (dynamic_cast<HeapKeyedStateBackend<KeyType>*>(keyedStateBackend)) {
        backendType_ = omnistream::StateType::HEAP;
    } else if (dynamic_cast<RocksdbKeyedStateBackend<KeyType>*>(keyedStateBackend)) {
        backendType_ = omnistream::StateType::ROCKSDB;
    } else {
        THROW_LOGIC_EXCEPTION("Unsupported keyed state backend");
    }
    leftWindowState = new WindowListState<KeyType, int64_t, VectorBatchId>(
        keyedStateBackend->template getOrCreateKeyedState<int64_t, S, std::vector<VectorBatchId>*>(
            new LongSerializer(), leftDescriptor));
    rightWindowState = new WindowListState<KeyType, int64_t, VectorBatchId>(
        keyedStateBackend->template getOrCreateKeyedState<int64_t, S, std::vector<VectorBatchId>*>(
            new LongSerializer(), rightDescriptor));

    // Both sides must produce keys of the same types, since rows are matched under one keyed state.
    std::vector<int> leftKeyTypes;
    std::vector<int> rightKeyTypes;
    for (auto kIndex : this->leftKeyIndex) {
        leftKeyTypes.push_back(this->leftTypes[kIndex]);
    }
    for (auto kIndex : this->rightKeyIndex) {
        rightKeyTypes.push_back(this->rightTypes[kIndex]);
    }
    if (leftKeyTypes != rightKeyTypes) {
        throw std::runtime_error("Left key types do not match right key types");
    }
    this->keySelectorLeft = new KeySelector<KeyType>(leftKeyTypes, this->leftKeyIndex);
    this->keySelectorRight = new KeySelector<KeyType>(rightKeyTypes, this->rightKeyIndex);

    generatedFilter = generateJoinCondition();
    // Only walk the condition when one exists: indexing a missing/null "nonEquiCondition" through the
    // non-const operator[] would insert into (and then mutate) `description`.
    if (isNonEquiCondition) {
        getAllColRefs(description["nonEquiCondition"]);
    }

    // One reader per output column; nullptr for columns the condition never references.
    filterFuncPtrs.reserve(filterFuncPtrs.size() + totalNumOfCols);
    for (int i = 0; i < totalNumOfCols; i++) {
        if (colRefsForNonEquiCondition.find(i) == colRefsForNonEquiCondition.end()) {
            filterFuncPtrs.push_back(nullptr);
            continue;
        }
        bool leftSideState = static_cast<size_t>(i) < leftTypes.size();
        switch (leftSideState ? leftTypes[i] : rightTypes[i - leftTypes.size()]) {
            case omniruntime::type::DataTypeId::OMNI_INT:
                filterFuncPtrs.push_back(getValueAddress<int32_t>);
                break;
            case omniruntime::type::DataTypeId::OMNI_LONG:
                filterFuncPtrs.push_back(getValueAddress<int64_t>);
                break;
            case omniruntime::type::DataTypeId::OMNI_DOUBLE:
                filterFuncPtrs.push_back(getValueAddress<double>);
                break;
            case omniruntime::type::DataTypeId::OMNI_BOOLEAN:
                filterFuncPtrs.push_back(getValueAddress<bool>);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
        }
    }
}
/**
 * Closes the base operator, then releases the collector allocated in open().
 *
 * The collector is owned by this operator (the destructor also deletes it). The previous code only
 * dropped the pointer, which leaked the object. Nulling it afterwards keeps the destructor's delete safe.
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::close()
{
    AbstractStreamOperator<KeyType>::close();
    delete collector;
    collector = nullptr;
}
template <typename KeyType>
void WindowJoinOperator<KeyType>::processBatch1(StreamRecord* input)
{
    // Take ownership of the incoming record; it is released when this call returns.
    std::unique_ptr<StreamRecord> owned(input);
    auto *leftBatch = reinterpret_cast<omnistream::VectorBatch*>(owned->getValue());
    processBatch(leftBatch, leftWindowEndIndex, leftWindowState, true);
    // Only the heap backend keeps this batch pointer; for any other backend it is freed here.
    if (backendType_ == omnistream::StateType::HEAP) {
        return;
    }
    delete leftBatch;
}
template <typename KeyType>
void WindowJoinOperator<KeyType>::processBatch2(StreamRecord* input)
{
    // Take ownership of the incoming record; it is released when this call returns.
    std::unique_ptr<StreamRecord> owned(input);
    auto *rightBatch = reinterpret_cast<omnistream::VectorBatch*>(owned->getValue());
    processBatch(rightBatch, rightWindowEndIndex, rightWindowState, false);
    // Only the heap backend keeps this batch pointer; for any other backend it is freed here.
    if (backendType_ == omnistream::StateType::HEAP) {
        return;
    }
    delete rightBatch;
}
template <typename KeyType>
inline void WindowJoinOperator<KeyType>::initializeState(StreamTaskStateInitializerImpl *initializer, TypeSerializer *keySerializer)
{
    // Copy the two-input operator id onto the keyed base before delegating state initialization.
    auto operatorId = TwoInputStreamOperator::GetOperatorID().toString();
    INFO_RELEASE("WindowJoinOperator initializeState with initializer, operatorID: " << operatorId);
    AbstractStreamOperator<KeyType>::SetOperatorID(std::move(operatorId));
    AbstractStreamOperator<KeyType>::initializeState(initializer, keySerializer);
}
/**
 * Fires a window for the timer's key: joins the buffered left/right rows of that window, clears the
 * window from both states, then drops whole stored batches whose maximum window end is before this
 * window (their max timestamp entry is set to INT64_MAX so they are not dropped twice).
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::onEventTime(TimerHeapInternalTimer<KeyType, int64_t> *timer)
{
    using ListState = WindowListState<KeyType, int64_t, VectorBatchId>;
    const int64_t window = timer->getNamespace();
    this->setCurrentKey(timer->getKey());
    std::vector<VectorBatchId> *leftRecords = leftWindowState->get(window);
    std::vector<VectorBatchId> *rightRecords = rightWindowState->get(window);
    // %zu matches size_t; the previous %d was a printf format/argument mismatch.
    LOG_PRINTF("onEventTime from left %zu", leftRecords != nullptr ? leftRecords->size() : static_cast<size_t>(0));
    LOG_PRINTF("onEventTime from right %zu", rightRecords != nullptr ? rightRecords->size() : static_cast<size_t>(0));

    join(leftRecords, rightRecords);

    // Clear the fired window; for non-heap backends the returned vector is a copy owned by us.
    const auto releaseWindow = [this, window](ListState *state, std::vector<VectorBatchId> *records) {
        if (records == nullptr) {
            return;
        }
        state->clear(window);
        if (backendType_ != omnistream::StateType::HEAP) {
            delete records;
        }
    };
    releaseWindow(leftWindowState, leftRecords);
    releaseWindow(rightWindowState, rightRecords);

    // Batches whose latest window end precedes this window can no longer be referenced by pending windows.
    const auto purgeExpiredBatches = [window](std::vector<int64_t> &maxTimestamps, ListState *state) {
        std::vector<size_t> indicesToDelete;
        for (size_t i = 0; i < maxTimestamps.size(); ++i) {
            if (window > maxTimestamps[i] && maxTimestamps[i] != INT64_MIN) {
                indicesToDelete.push_back(i);
                maxTimestamps[i] = INT64_MAX;  // mark as purged
            }
        }
        if (!indicesToDelete.empty()) {
            state->clearVectors(indicesToDelete);
        }
    };
    purgeExpiredBatches(leftMaxTimestamps, leftWindowState);
    purgeExpiredBatches(rightMaxTimestamps, rightWindowState);
}
template <typename KeyType>
void WindowJoinOperator<KeyType>::onProcessingTime(TimerHeapInternalTimer<KeyType, int64_t> *timer)
{
    // processBatch registers only event-time timers; a processing-time callback is treated as a logic error.
    static_cast<void>(timer);
    THROW_LOGIC_EXCEPTION("Window Join only support event-time now")
}
/*
BuildInner() example (the join key is the first column):
left:
1, 2
1, 3
1, 4
right:
1, 0
1, 5
result (each left row is paired with every right row, in left-major order):
(1, 2)(1, 0)
(1, 2)(1, 5)
(1, 3)(1, 0)
(1, 3)(1, 5)
(1, 4)(1, 0)
(1, 4)(1, 5)
*/
template <typename KeyType> void WindowJoinOperator<KeyType>::BuildInnerLeft(std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch)
{
    // Left columns occupy output positions [0, leftTypes.size()); dispatch each on its Omni type.
    using TypeId = omniruntime::type::DataTypeId;
    const int leftColumnCount = static_cast<int>(leftTypes.size());
    for (int colIdx = 0; colIdx < leftColumnCount; ++colIdx) {
        switch (leftTypes[colIdx]) {
            case TypeId::OMNI_SHORT:
                insertLeft<int16_t>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_INT:
                insertLeft<int32_t>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            // Timestamps share the 64-bit integer representation.
            case TypeId::OMNI_LONG:
            case TypeId::OMNI_TIMESTAMP:
            case TypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case TypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                insertLeft<int64_t>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_DOUBLE:
                insertLeft<double>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_BOOLEAN:
                insertLeft<bool>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_DECIMAL128:
                insertLeft<omniruntime::type::Decimal128>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_CHAR:
            case TypeId::OMNI_VARCHAR:
                insertLeftVarchar(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
                break;
        }
    }
}
template <typename KeyType> void WindowJoinOperator<KeyType>::BuildInnerRight(std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch)
{
    // Right columns come after all left columns in the output batch; dispatch each on its Omni type.
    using TypeId = omniruntime::type::DataTypeId;
    const size_t firstRightColumn = leftTypes.size();
    for (size_t offset = 0; offset < rightTypes.size(); ++offset) {
        const int colIdx = static_cast<int>(firstRightColumn + offset);
        switch (rightTypes[offset]) {
            case TypeId::OMNI_SHORT:
                insertRight<int16_t>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_INT:
                insertRight<int32_t>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            // Timestamps share the 64-bit integer representation.
            case TypeId::OMNI_LONG:
            case TypeId::OMNI_TIMESTAMP:
            case TypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case TypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                insertRight<int64_t>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_DOUBLE:
                insertRight<double>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_BOOLEAN:
                insertRight<bool>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_DECIMAL128:
                insertRight<omniruntime::type::Decimal128>(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            case TypeId::OMNI_CHAR:
            case TypeId::OMNI_VARCHAR:
                insertRightVarchar(colIdx, leftElements, rightElements, outputBatch, true);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
                break;
        }
    }
}
template <typename KeyType> void WindowJoinOperator<KeyType>::buildInner(std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch)
{
    // Populate the left-side output columns, then the right-side ones. Without a non-equi condition the
    // rows form the left x right cross product; otherwise the element lists are copied row by row
    // (presumably already paired by the caller).
    this->BuildInnerLeft(leftElements, rightElements, outputBatch);
    this->BuildInnerRight(leftElements, rightElements, outputBatch);
}
/**
 * Builds outer-join rows for left elements that have no right match: left columns are copied from the
 * stored batches and every right column is set to null for all output rows.
 *
 * The type dispatch now matches BuildInnerLeft. Timestamp columns use the int64_t path and VARCHAR uses
 * the varchar path; previously both fell through to "Type not recognized".
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::buildRightNull(std::vector<VectorBatchId> *leftElements, omnistream::VectorBatch *outputBatch)
{
    using TypeId = omniruntime::type::DataTypeId;
    int colIdx = 0;
    for (auto dataType : leftTypes) {
        switch (dataType) {
            case TypeId::OMNI_SHORT:
                insertLeft<int16_t>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case TypeId::OMNI_INT:
                insertLeft<int32_t>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case TypeId::OMNI_LONG:
            case TypeId::OMNI_TIMESTAMP:
            case TypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case TypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                insertLeft<int64_t>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case TypeId::OMNI_DOUBLE:
                insertLeft<double>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case TypeId::OMNI_BOOLEAN:
                insertLeft<bool>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case TypeId::OMNI_DECIMAL128:
                insertLeft<omniruntime::type::Decimal128>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case TypeId::OMNI_CHAR:
            case TypeId::OMNI_VARCHAR:
                insertLeftVarchar(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
                break;
        }
        colIdx++;
    }
    // The right side has no matching row: mark every row of every right column null.
    const int firstRightCol = static_cast<int>(leftTypes.size());
    const int totalCols = static_cast<int>(leftTypes.size() + rightTypes.size());
    for (int i = firstRightCol; i < totalCols; i++) {
        auto *vec = outputBatch->Get(i);
        for (int j = 0; j < vec->GetSize(); j++) {
            vec->SetNull(j);
        }
    }
}
/**
 * Builds outer-join rows for right elements that have no left match: right columns are copied from the
 * stored batches and every left column is set to null for all output rows.
 *
 * The type dispatch now matches BuildInnerRight. Timestamp columns use the int64_t path and VARCHAR uses
 * the varchar path; previously both fell through to "Type not recognized".
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::buildLeftNull(std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch)
{
    using TypeId = omniruntime::type::DataTypeId;
    int colIdx = static_cast<int>(leftTypes.size());
    for (auto dataType : rightTypes) {
        switch (dataType) {
            case TypeId::OMNI_SHORT:
                insertRight<int16_t>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case TypeId::OMNI_INT:
                insertRight<int32_t>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case TypeId::OMNI_LONG:
            case TypeId::OMNI_TIMESTAMP:
            case TypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case TypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                insertRight<int64_t>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case TypeId::OMNI_DOUBLE:
                insertRight<double>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case TypeId::OMNI_BOOLEAN:
                insertRight<bool>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case TypeId::OMNI_DECIMAL128:
                insertRight<omniruntime::type::Decimal128>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case TypeId::OMNI_CHAR:
            case TypeId::OMNI_VARCHAR:
                insertRightVarchar(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
                break;
        }
        colIdx++;
    }
    // The left side has no matching row: mark every row of every left column null.
    const int leftCols = static_cast<int>(leftTypes.size());
    for (int i = 0; i < leftCols; i++) {
        auto *vec = outputBatch->Get(i);
        for (int j = 0; j < vec->GetSize(); j++) {
            vec->SetNull(j);
        }
    }
}
template <typename KeyType>
inline void WindowJoinOperator<KeyType>::buildSemiAnti(
    std::vector<VectorBatchId> *elements, omnistream::VectorBatch *outputBatch, bool isSemi, std::set<int> *matchedSet)
{
    // NOTE(review): not implemented. This is a no-op that leaves outputBatch untouched; confirm no caller
    // relies on it producing semi/anti-join rows.
    static_cast<void>(elements);
    static_cast<void>(outputBatch);
    static_cast<void>(isSemi);
    static_cast<void>(matchedSet);
}
/**
 * Buffers one input batch into the given side's window state.
 *
 * Records the batch's maximum window end (used later by onEventTime to purge whole batches), stores the
 * batch, then for each row: sets the current key, skips rows whose window has already fired according to
 * the current watermark, appends the row's combo id under its window end, and registers the window timer.
 *
 * @param batch          input rows for one side
 * @param windowEndIndex column holding each row's window end timestamp
 * @param recordState    that side's window list state
 * @param isLeftSide     true for the left input
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::processBatch(omnistream::VectorBatch *batch, int windowEndIndex,
    WindowListState<KeyType, int64_t, VectorBatchId> *recordState, bool isLeftSide)
{
    // Callers pass left/rightWindowEndIndex as windowEndIndex, so use the parameter instead of re-deriving it.
    auto maxTimeStamp = batch->setMaxTimestamp(windowEndIndex);
    (isLeftSide ? leftMaxTimestamps : rightMaxTimestamps).push_back(maxTimeStamp);

    int batchID = recordState->getCurrentBatchId();
    recordState->addVectorBatch(batch);

    KeySelector<KeyType> *keySelector = isLeftSide ? this->keySelectorLeft : this->keySelectorRight;
    auto *windowEnds = reinterpret_cast<omniruntime::vec::Vector<int64_t> *>(batch->Get(windowEndIndex));
    // Nothing in the loop advances the watermark, so read it once.
    const auto currentWatermark = internalTimerService->currentWatermark();
    const int rowCount = batch->GetRowCount();
    for (int i = 0; i < rowCount; i++) {
        auto key = keySelector->getKey(batch, i);
        this->setCurrentKey(key);
        const int64_t windowEndTime = windowEnds->GetValue(i);
        if (TimeWindowUtil::isWindowFired(windowEndTime, currentWatermark, shiftTimeZone)) {
            continue;
        }
        recordState->add(windowEndTime, VectorBatchUtil::getComboId(batchID, i));
        internalTimerService->registerEventTimeTimer(
            windowEndTime, TimeWindowUtil::toEpochMillsForTimer(windowEndTime - 1, shiftTimeZone));
    }
}
/**
 * Copies one fixed-width left column (output position colIdx) into outputBatch.
 *
 * Combo ids are split into batch id (high 32 bits) and row id (low 32 bits) with SVE: uzp1/uzp2 over the
 * 32-bit halves. Then either:
 *  - non-equi or outer join: left row j goes to output row j;
 *  - equi inner join: left row j is repeated into output rows [j * |right|, (j + 1) * |right|).
 * For non-heap backends each fetched batch is a temporary copy and is deleted after use.
 * NOTE(review): source null flags are not propagated to the output column; confirm this is intended.
 */
template <typename KeyType>
template <typename TYPE>
inline void WindowJoinOperator<KeyType>::insertLeft(int colIdx, std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner) {
    const int num = static_cast<int>(leftElements->size());
    // RAII buffers: the former new[]/delete[] pair leaked if a batch lookup threw.
    std::vector<uint32_t> batchIDdst(num);
    std::vector<uint32_t> rowIDdst(num);
    const int processNum = svcntw();
    const int half = svcntd();
    for (int i = 0; i < num; i += processNum) {
        svbool_t pg = svwhilelt_b64(i, num);
        svbool_t pg2 = svwhilelt_b64(i + half, num);
        svbool_t pg3 = svwhilelt_b32(i, num);
        svuint64_t comboID = svld1(pg, leftElements->data() + i);
        svuint64_t comboID2 = svld1(pg2, leftElements->data() + i + half);
        svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svst1_u32(pg3, rowIDdst.data() + i, rowID);
        svst1_u32(pg3, batchIDdst.data() + i, batchID);
    }
    auto col = reinterpret_cast<omniruntime::vec::Vector<TYPE> *>(outputBatch->Get(colIdx));
    if (isNonEquiCondition || !isInner) {
        for (int j = 0; j < num; j++) {
            auto batch = leftWindowState->getVectorBatch(batchIDdst[j]);
            auto value = batch->template GetValueAt<TYPE>(colIdx, rowIDdst[j]);
            col->SetValue(j, value);
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
    } else {
        const size_t rightCount = rightElements->size();
        int rowIdx = 0;
        for (int j = 0; j < num; j++) {
            auto batch = leftWindowState->getVectorBatch(batchIDdst[j]);
            auto value = batch->template GetValueAt<TYPE>(colIdx, rowIDdst[j]);
            for (size_t i = 0; i < rightCount; i++) {
                col->SetValue(i + rowIdx, value);
            }
            rowIdx += rightCount;
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
    }
}
/**
 * Varchar counterpart of insertLeft: copies left column colIdx into outputBatch.
 *
 * Non-equi or outer join: left row j goes to output row j. Equi inner join: left row j is repeated into
 * output rows [j * |right|, (j + 1) * |right|), with combo ids decoded via SVE. For non-heap backends each
 * fetched batch is deleted after its value has been written.
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::insertLeftVarchar(int colIdx, std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner) {
    using varcharVecType = omniruntime::vec::Vector<omniruntime::vec::LargeStringContainer<std::string_view>>;
    auto col = reinterpret_cast<varcharVecType *>(outputBatch->Get(colIdx));
    if (isNonEquiCondition || !isInner) {
        int rowIdx = 0;
        for (auto element : *leftElements) {
            int batchId = VectorBatchUtil::getBatchId(element);
            int rowId = VectorBatchUtil::getRowId(element);
            auto batch = leftWindowState->getVectorBatch(batchId);
            auto value = reinterpret_cast<varcharVecType *>(batch->Get(colIdx))->GetValue(rowId);
            col->SetValue(rowIdx, value);
            rowIdx++;
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
        return;
    }

    const int num = static_cast<int>(leftElements->size());
    // RAII buffers: the former new[]/delete[] pair leaked if a batch lookup threw.
    std::vector<uint32_t> batchIDdst(num);
    std::vector<uint32_t> rowIDdst(num);
    const int processNum = svcntw();
    const int half = svcntd();
    for (int i = 0; i < num; i += processNum) {
        svbool_t pg = svwhilelt_b64(i, num);
        svbool_t pg2 = svwhilelt_b64(i + half, num);
        svbool_t pg3 = svwhilelt_b32(i, num);
        svuint64_t comboID = svld1(pg, leftElements->data() + i);
        svuint64_t comboID2 = svld1(pg2, leftElements->data() + i + half);
        svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svst1_u32(pg3, rowIDdst.data() + i, rowID);
        svst1_u32(pg3, batchIDdst.data() + i, batchID);
    }
    const size_t rightCount = rightElements->size();
    int rowIdx = 0;
    for (int j = 0; j < num; j++) {
        auto batch = leftWindowState->getVectorBatch(batchIDdst[j]);
        auto value = reinterpret_cast<varcharVecType *>(batch->Get(colIdx))->GetValue(rowIDdst[j]);
        for (size_t i = 0; i < rightCount; i++) {
            col->SetValue(i + rowIdx, value);
        }
        rowIdx += rightCount;
        if (backendType_ != omnistream::StateType::HEAP) {
            delete batch;
        }
    }
}
/**
 * Copies one fixed-width right column (output position colIdx, i.e. right column colIdx - |leftTypes|)
 * into outputBatch.
 *
 * Non-equi or outer join: right row i goes to output row i. Equi inner join: the output is the left-major
 * cross product written by insertLeft, where left row j owns rows [j * |right|, (j + 1) * |right|). So right
 * row i goes to output rows j * |right| + i. The previous stride used |left|, which mis-paired rows and wrote
 * past the end of the batch whenever |left| > |right|.
 * NOTE(review): source null flags are not propagated to the output column; confirm this is intended.
 */
template <typename KeyType>
template <typename TYPE>
inline void WindowJoinOperator<KeyType>::insertRight(int colIdx, std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner)
{
    const int num = static_cast<int>(rightElements->size());
    // RAII buffers: the former new[]/delete[] pair leaked if a batch lookup threw.
    std::vector<uint32_t> batchIDdst(num);
    std::vector<uint32_t> rowIDdst(num);
    const int processNum = svcntw();
    const int half = svcntd();
    for (int i = 0; i < num; i += processNum) {
        svbool_t pg = svwhilelt_b64(i, num);
        svbool_t pg2 = svwhilelt_b64(i + half, num);
        svbool_t pg3 = svwhilelt_b32(i, num);
        svuint64_t comboID = svld1(pg, rightElements->data() + i);
        svuint64_t comboID2 = svld1(pg2, rightElements->data() + i + half);
        svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svst1_u32(pg3, rowIDdst.data() + i, rowID);
        svst1_u32(pg3, batchIDdst.data() + i, batchID);
    }
    auto col = reinterpret_cast<omniruntime::vec::Vector<TYPE> *>(outputBatch->Get(colIdx));
    const int sourceCol = colIdx - static_cast<int>(leftTypes.size());
    if (isNonEquiCondition || !isInner) {
        for (int i = 0; i < num; i++) {
            auto batch = rightWindowState->getVectorBatch(batchIDdst[i]);
            auto value = batch->template GetValueAt<TYPE>(sourceCol, rowIDdst[i]);
            col->SetValue(i, value);
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
    } else {
        const size_t rightCount = rightElements->size();
        const size_t leftCount = leftElements->size();
        for (size_t i = 0; i < rightCount; i++) {
            auto batch = rightWindowState->getVectorBatch(batchIDdst[i]);
            auto value = batch->template GetValueAt<TYPE>(sourceCol, rowIDdst[i]);
            for (size_t j = 0; j < leftCount; j++) {
                // Same left-major layout as insertLeft: row = leftIndex * |right| + rightIndex.
                col->SetValue(static_cast<int>(j * rightCount + i), value);
            }
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
    }
}
/**
 * Varchar counterpart of insertRight: copies right column colIdx - |leftTypes| into output position colIdx.
 *
 * Non-equi or outer join: right row i goes to output row i. Equi inner join: right row i goes to output
 * rows j * |right| + i for every left index j, matching insertLeftVarchar's left-major layout. The previous
 * stride used |left|, which mis-paired rows and overran the batch when |left| > |right|.
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::insertRightVarchar(int colIdx, std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner)
{
    using varcharVecType = omniruntime::vec::Vector<omniruntime::vec::LargeStringContainer<std::string_view>>;
    auto col = reinterpret_cast<varcharVecType *>(outputBatch->Get(colIdx));
    const int sourceCol = colIdx - static_cast<int>(leftTypes.size());
    if (isNonEquiCondition || !isInner) {
        int rowIdx = 0;
        for (auto element : *rightElements) {
            int batchId = VectorBatchUtil::getBatchId(element);
            int rowId = VectorBatchUtil::getRowId(element);
            auto batch = rightWindowState->getVectorBatch(batchId);
            auto value = reinterpret_cast<varcharVecType *>(batch->Get(sourceCol))->GetValue(rowId);
            col->SetValue(rowIdx, value);
            rowIdx++;
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
        return;
    }

    const int num = static_cast<int>(rightElements->size());
    // RAII buffers: the former new[]/delete[] pair leaked if a batch lookup threw.
    std::vector<uint32_t> batchIDdst(num);
    std::vector<uint32_t> rowIDdst(num);
    const int processNum = svcntw();
    const int half = svcntd();
    for (int i = 0; i < num; i += processNum) {
        svbool_t pg = svwhilelt_b64(i, num);
        svbool_t pg2 = svwhilelt_b64(i + half, num);
        svbool_t pg3 = svwhilelt_b32(i, num);
        svuint64_t comboID = svld1(pg, rightElements->data() + i);
        svuint64_t comboID2 = svld1(pg2, rightElements->data() + i + half);
        svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svst1_u32(pg3, rowIDdst.data() + i, rowID);
        svst1_u32(pg3, batchIDdst.data() + i, batchID);
    }
    const size_t rightCount = rightElements->size();
    const size_t leftCount = leftElements->size();
    for (size_t i = 0; i < rightCount; i++) {
        auto batch = rightWindowState->getVectorBatch(batchIDdst[i]);
        auto value = reinterpret_cast<varcharVecType *>(batch->Get(sourceCol))->GetValue(rowIDdst[i]);
        for (size_t j = 0; j < leftCount; j++) {
            // Same left-major layout as insertLeftVarchar: row = leftIndex * |right| + rightIndex.
            col->SetValue(static_cast<int>(j * rightCount + i), value);
        }
        if (backendType_ != omnistream::StateType::HEAP) {
            delete batch;
        }
    }
}
/**
 * JIT-compiles the "nonEquiCondition" expression from the operator config into a FilterFunc.
 *
 * @return the compiled function, or nullptr when the join has no non-equi condition.
 */
template <typename KeyType>
::FilterFunc WindowJoinOperator<KeyType>::generateJoinCondition()
{
    if (!isNonEquiCondition) {
        return nullptr;
    }
    auto &condition = description["nonEquiCondition"];
    Expr *jExpr = JSONParser::ParseJSON(condition);
    // NOTE(review): jExpr and filterCodegen are never freed. The codegen object presumably has to outlive
    // the generated function; confirm before adding cleanup.
    SimpleFilterCodeGen *filterCodegen = new SimpleFilterCodeGen("nonEquiCondition", *jExpr, nullptr);
    const int64_t functionAddress = filterCodegen->GetFunction();
    // Convert the address value directly. Reading the int64_t object through a FilterFunc lvalue (the
    // previous void* pun) violated strict aliasing.
    return reinterpret_cast<::FilterFunc>(functionAddress);
}
/**
 * Collects the "colVal" of every FIELD_REFERENCE node in an expression tree into colRefsForNonEquiCondition.
 *
 * Every nested object/array is visited, not only "left"/"right". Field references inside other operand
 * slots (e.g. function arguments or unary operands) are therefore found as well; without that they get no
 * value reader in open(). The json is only read: find() is used instead of the mutating operator[].
 */
template <typename KeyType>
void WindowJoinOperator<KeyType>::getAllColRefs(nlohmann::json &config)
{
    if (config.is_object()) {
        auto exprType = config.find("exprType");
        if (exprType != config.end() && *exprType == "FIELD_REFERENCE") {
            colRefsForNonEquiCondition.emplace(config.at("colVal").get<int>());
        }
    }
    if (!config.is_structured()) {
        return;
    }
    // Iterating an object yields its values; iterating an array yields its elements.
    for (auto &child : config) {
        getAllColRefs(child);
    }
}
/**
 * Evaluates the non-equi join condition for one (left row, right row) pair.
 *
 * Each referenced column's value is read into a per-column slot via filterFuncPtrs, then the JIT-compiled
 * condition is called. Returns true when there is no non-equi condition.
 *
 * Each side's batch is fetched at most once per call; previously it was fetched once per referenced
 * column, which deserializes repeatedly on non-heap backends. For non-heap backends the fetched batches are
 * owned here and are released (via unique_ptr) after the condition has been evaluated. Buffers are RAII,
 * so nothing leaks if a reader throws.
 */
template <typename KeyType>
bool WindowJoinOperator<KeyType>::filter(VectorBatchId leftElement, VectorBatchId rightElement)
{
    if (!isNonEquiCondition) {
        return true;
    }
    auto leftRowId = VectorBatchUtil::getRowId(leftElement);
    auto leftBatchId = VectorBatchUtil::getBatchId(leftElement);
    auto rightRowId = VectorBatchUtil::getRowId(rightElement);
    auto rightBatchId = VectorBatchUtil::getBatchId(rightElement);

    std::vector<int64_t> vals(totalNumOfCols);
    auto nulls = std::make_unique<bool[]>(totalNumOfCols);
    bool resultBool = false;

    const bool ownsBatches = backendType_ != omnistream::StateType::HEAP;
    omnistream::VectorBatch *leftBatch = nullptr;
    omnistream::VectorBatch *rightBatch = nullptr;
    // Declared after nothing that reads them, so they are destroyed only once the function has returned its value.
    std::unique_ptr<omnistream::VectorBatch> leftOwner;
    std::unique_ptr<omnistream::VectorBatch> rightOwner;
    const int leftColCount = static_cast<int>(leftTypes.size());

    for (auto col : colRefsForNonEquiCondition) {
        if (col < leftColCount) {
            if (leftBatch == nullptr) {
                leftBatch = leftWindowState->getVectorBatch(leftBatchId);
                if (ownsBatches) {
                    leftOwner.reset(leftBatch);
                }
            }
            filterFuncPtrs[col](leftBatch->Get(col), leftRowId, col, vals.data(), nulls.get());
        } else {
            if (rightBatch == nullptr) {
                rightBatch = rightWindowState->getVectorBatch(rightBatchId);
                if (ownsBatches) {
                    rightOwner.reset(rightBatch);
                }
            }
            filterFuncPtrs[col](rightBatch->Get(col - leftColCount), rightRowId, col, vals.data(), nulls.get());
        }
    }
    omniruntime::op::ExecutionContext context;
    return generatedFilter(vals.data(), nulls.get(), nullptr, &resultBool, nullptr, reinterpret_cast<int64_t>(&context));
}