/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#pragma once
#include <set>
#include <vector>
#include "table/runtime/operators/join/AbstractStreamingJoinOperator.h"
#include "table/data/JoinedRowData.h"
#include "table/data/GenericRowData.h"
#include "table/data/util/RowDataUtil.h"
#include "table/typeutils/RowDataSerializer.h"
#include "streaming/api/operators/Triggerable.h"
#include "table/runtime/operators/InternalTimerServiceImpl.h"
#include "core/typeutils/LongSerializer.h"
#include "table/runtime/operators/TableStreamOperator.h"
#include "table/runtime/operators/window/state/WindowListState.h"
#include "table/data/vectorbatch/VectorBatch.h"
#include "table/data/util/VectorBatchUtil.h"
#include "core/api/common/state/ListStateDescriptor.h"
#include "OmniOperatorJIT/core/src/codegen/simple_filter_codegen.h"
#include "OmniOperatorJIT/core/src/vector/unsafe_vector.h"
#include "OmniOperatorJIT/core/src/operator/execution_context.h"
#include "table/runtime/keyselector/KeySelector.h"
#include "table/utils/TimeWindowUtil.h"

#include <arm_sve.h>

// Packed identifier of one buffered input row. The SVE decode in insertLeft splits it so that the
// low 32 bits are the row index and the high 32 bits are the batch id (VectorBatchUtil::getComboId packs it).
using VectorBatchId = uint64_t;

// NOTE(review): a using-directive at header scope leaks every omnistream name into all includers.
using namespace omnistream;
// Signature of the JIT-generated non-equi join predicate returned by generateJoinCondition().
// NOTE(review): the meaning of each parameter is defined by the codegen — confirm there.
using FilterFunc = bool (*)(int64_t *, bool *, int32_t *, bool *, int32_t *, int64_t);
template <typename KeyType>
class WindowJoinOperator : public TableStreamOperator<KeyType>, public Triggerable<KeyType, int64_t>, public TwoInputStreamOperator {
public:
    WindowJoinOperator(
        const nlohmann::json &config, Output *output, TypeSerializer *leftSerializer, TypeSerializer *rightSerializer);
    ~WindowJoinOperator() override;
    void open() override;
    void close() override;
    void processBatch1(StreamRecord *input) override;
    void processBatch2(StreamRecord *input) override;
    // Row-at-a-time input is unused: this operator consumes data through processBatch1/processBatch2.
    void processElement1(StreamRecord *element) override {}
    void processElement2(StreamRecord *element) override {}

    // Watermark from the first (left) input; tracked as input index 0 of the combined watermark.
    void ProcessWatermark1(Watermark* watermark) override
    {
        AdvanceCombinedWatermark(0, watermark);
    }
    // Watermark from the second (right) input; tracked as input index 1 of the combined watermark.
    void ProcessWatermark2(Watermark* watermark) override
    {
        AdvanceCombinedWatermark(1, watermark);
    }
    void onEventTime(TimerHeapInternalTimer<KeyType, int64_t> *timer) override;
    void onProcessingTime(TimerHeapInternalTimer<KeyType, int64_t> *timer) override;
    void initializeState(StreamTaskStateInitializerImpl *initializer, TypeSerializer *keySerializer) override;
    // Emits the join result for one key/window. Called from onEventTime with the buffered record ids of
    // each side; either pointer may be nullptr when that side buffered nothing for the window.
    virtual void join(std::vector<VectorBatchId> *leftRecords, std::vector<VectorBatchId> *rightRecords) = 0;
    std::string getTypeName() override { return "WindowJoinOperator"; }
    InternalTimerServiceImpl<KeyType, int64_t> *getInternalTimerService()
    {
        return internalTimerService;
    }
    // NOTE: the "Mectrics" spelling comes from the overridden base-class interface and must be kept.
    std::shared_ptr<omnistream::TaskMetricGroup> GetMectrics() override
    {
        LOG("WindowJoinOperator GetMectrics")
        return this->metrics;
    }
protected:
    // Output collector created in open(). It is null until then, so the destructor's delete is safe
    // even if open() never ran.
    TimestampedCollector *collector = nullptr;
    // Omni type ids of the left/right input columns; outputTypes is left followed by right.
    std::vector<omniruntime::type::DataTypeId> leftTypes;
    std::vector<omniruntime::type::DataTypeId> rightTypes;
    std::vector<omniruntime::type::DataTypeId> outputTypes;
    // Per-side buffered rows: namespace = window end timestamp, value = packed VectorBatchId.
    WindowListState<KeyType, int64_t, VectorBatchId> *leftWindowState = nullptr;
    WindowListState<KeyType, int64_t, VectorBatchId> *rightWindowState = nullptr;
    ::FilterFunc generatedFilter = nullptr; // Use global scope resolution to avoid ambiguity
    // Join-key column indices within each input.
    std::vector<int32_t> leftKeyIndex;
    std::vector<int32_t> rightKeyIndex;
    void buildInner(
        std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch);
    void buildRightNull(std::vector<VectorBatchId> *leftElements, omnistream::VectorBatch *outputBatch);
    void buildLeftNull(std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch);
    void buildSemiAnti(
        std::vector<VectorBatchId> *elements, omnistream::VectorBatch *outputBatch, bool isSemi, std::set<int> *matchedSet);
    bool filter(VectorBatchId leftElement, VectorBatchId rightElement);
    InternalTimerServiceImpl<KeyType, int64_t> *internalTimerService = nullptr;
    // True when the config carries a non-null "nonEquiCondition".
    bool isNonEquiCondition = false;
    // Built in open(), freed in the destructor.
    KeySelector<KeyType>* keySelectorLeft = nullptr;
    KeySelector<KeyType>* keySelectorRight = nullptr;

private:
    TypeSerializer *leftSerializer = nullptr;
    TypeSerializer *rightSerializer = nullptr;
    // Column index of the window-end timestamp within each input.
    int leftWindowEndIndex = 0;
    int rightWindowEndIndex = 0;
    nlohmann::json description;
    // Output column indices referenced by the non-equi condition.
    std::set<int> colRefsForNonEquiCondition;
    int totalNumOfCols = 0;
    // Vector, rowId, colId, valAddressPtr, isNullPtr
    std::vector<void (*)(omniruntime::vec::BaseVector *, int32_t, int32_t, int64_t *, bool *)> filterFuncPtrs;
    std::vector<bool> filterNullKeys;
    // One entry per buffered batch: its max window end, or INT64_MAX once the batch has been released.
    std::vector<int64_t> leftMaxTimestamps;
    std::vector<int64_t> rightMaxTimestamps;
    std::string shiftTimeZone = "UTC";
    omnistream::StateType backendType_ = omnistream::StateType::HEAP;

    template <typename TYPE>
    void insertLeft(int colIdx, std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements,
        omnistream::VectorBatch *outputBatch, bool isInner);
    template <typename TYPE>
    void insertRight(int colIdx, std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements,
        omnistream::VectorBatch *outputBatch, bool isInner);
    void insertLeftVarchar(int colIdx, std::vector<VectorBatchId> *leftElements,
        std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner);
    void insertRightVarchar(int colIdx, std::vector<VectorBatchId> *leftElements,
        std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner);

    void processBatch(omnistream::VectorBatch *batch, int windowEndIndex,
        WindowListState<KeyType, int64_t, VectorBatchId> *recordState, bool isLeftSide);
    ::FilterFunc generateJoinCondition();
    void getAllColRefs(nlohmann::json &config);
    void BuildInnerLeft(std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements,
        omnistream::VectorBatch *outputBatch);
    void BuildInnerRight(std::vector<VectorBatchId> *leftElements, std::vector<VectorBatchId> *rightElements,
        omnistream::VectorBatch *outputBatch);

    // Shared body of ProcessWatermark1/2. Records the watermark for the given input. If the combined
    // (minimum across inputs) watermark advanced, drives the timer service with it and forwards it downstream.
    // NOTE(review): ownership of the heap-allocated Watermark objects presumably passes to the callees — confirm.
    void AdvanceCombinedWatermark(int inputIndex, Watermark *watermark)
    {
        LOG(">>>>>>>>>>")
        if (this->combinedWatermark->UpdateWatermark(inputIndex, watermark->getTimestamp())) {
            if (this->timeServiceManager != nullptr) {
                this->timeServiceManager->advanceWatermark(new Watermark(this->combinedWatermark->GetCombinedWatermark()));
            }
            this->output->emitWatermark(new Watermark(this->combinedWatermark->GetCombinedWatermark()));
        }
    }
};

template <typename KeyType>
WindowJoinOperator<KeyType>::WindowJoinOperator(
    const nlohmann::json &config, Output *output, TypeSerializer *leftSerializer, TypeSerializer *rightSerializer)
    : TableStreamOperator<KeyType>(new TimestampedCollector(output)),
      isNonEquiCondition(config.contains("nonEquiCondition") && !config.at("nonEquiCondition").is_null()),
      leftSerializer(leftSerializer), rightSerializer(rightSerializer),
      // at() on a const json throws json::out_of_range for a missing key. The const operator[] used
      // previously has undefined behaviour in that case.
      leftWindowEndIndex(config.at("leftWindowEndIndex").get<int>()),
      rightWindowEndIndex(config.at("rightWindowEndIndex").get<int>()),
      description(config),
      colRefsForNonEquiCondition(), totalNumOfCols(), filterFuncPtrs()
{
    // Builds the operator from its JSON plan description.
    // Required keys: leftInputTypes, rightInputTypes, leftJoinKey, rightJoinKey, leftWindowEndIndex,
    // rightWindowEndIndex. Optional keys: nonEquiCondition, shiftTimeZone (defaults to "UTC").
    const auto leftTypeStr = config.at("leftInputTypes").get<std::vector<std::string>>();
    const auto rightTypeStr = config.at("rightInputTypes").get<std::vector<std::string>>();
    rightKeyIndex = description.at("rightJoinKey").get<std::vector<int32_t>>();
    leftKeyIndex = description.at("leftJoinKey").get<std::vector<int32_t>>();

    leftTypes.reserve(leftTypeStr.size());
    for (const auto& typeName : leftTypeStr) {
        leftTypes.push_back(LogicalType::flinkTypeToOmniTypeId(typeName));
    }
    rightTypes.reserve(rightTypeStr.size());
    for (const auto& typeName : rightTypeStr) {
        rightTypes.push_back(LogicalType::flinkTypeToOmniTypeId(typeName));
    }

    if (config.contains("shiftTimeZone")) {
        shiftTimeZone = config.at("shiftTimeZone").get<std::string>();
    }

    // The output row layout is all left columns followed by all right columns.
    outputTypes.reserve(leftTypes.size() + rightTypes.size());
    outputTypes.insert(outputTypes.end(), this->leftTypes.begin(), this->leftTypes.end());
    outputTypes.insert(outputTypes.end(), this->rightTypes.begin(), this->rightTypes.end());
}

template <typename KeyType>
WindowJoinOperator<KeyType>::~WindowJoinOperator()
{
    // Free the key selectors created in open() and clear the now-dangling pointers.
    delete keySelectorLeft;
    delete keySelectorRight;
    keySelectorLeft = nullptr;
    keySelectorRight = nullptr;
    // Free the output collector created in open(). Deleting nullptr (e.g. after close()) is a no-op.
    delete collector;
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::open()
{
    totalNumOfCols = leftTypes.size() + rightTypes.size();

    // Output collector; timestamps are erased from emitted records.
    collector = new TimestampedCollector(this->output);
    collector->eraseTimestamp();
    internalTimerService =
        AbstractStreamOperator<KeyType>::getInternalTimerService("window-timers", new LongSerializer(), this);

    // Per-side keyed list state: namespace = window end, element = packed VectorBatchId.
    std::string leftName = "leftWindowState";
    std::string rightName = "rightWindowState";
    auto leftDescriptor = new ListStateDescriptor<VectorBatchId>(leftName, new LongSerializer());
    auto rightDescriptor = new ListStateDescriptor<VectorBatchId>(rightName, new LongSerializer());

    using S = InternalListState<KeyType, int64_t, VectorBatchId>;
    auto keyedStateBackend = this->stateHandler->getKeyedStateBackend();
    if (dynamic_cast<HeapKeyedStateBackend<KeyType>*>(keyedStateBackend)) {
        backendType_ = omnistream::StateType::HEAP;
    } else if (dynamic_cast<RocksdbKeyedStateBackend<KeyType>*>(keyedStateBackend)) {
        backendType_ = omnistream::StateType::ROCKSDB;
    } else {
        THROW_LOGIC_EXCEPTION("Unsupported keyed state backend");
    }
    leftWindowState = new WindowListState<KeyType, int64_t, VectorBatchId>(
        keyedStateBackend->template getOrCreateKeyedState<int64_t, S, std::vector<VectorBatchId>*>(
            new LongSerializer(), leftDescriptor));
    rightWindowState = new WindowListState<KeyType, int64_t, VectorBatchId>(
        keyedStateBackend->template getOrCreateKeyedState<int64_t, S, std::vector<VectorBatchId>*>(
            new LongSerializer(), rightDescriptor));

    // Collect the type of every join-key column on each side. at() turns a key index outside the
    // input schema into std::out_of_range instead of an out-of-bounds read.
    std::vector<int> leftKeyTypes;
    std::vector<int> rightKeyTypes;
    leftKeyTypes.reserve(this->leftKeyIndex.size());
    rightKeyTypes.reserve(this->rightKeyIndex.size());
    for (auto kIndex : this->leftKeyIndex) {
        leftKeyTypes.push_back(this->leftTypes.at(kIndex));
    }
    for (auto kIndex : this->rightKeyIndex) {
        rightKeyTypes.push_back(this->rightTypes.at(kIndex));
    }
    // make sure the key types are the same
    if (leftKeyTypes != rightKeyTypes) {
        throw std::runtime_error("Left key types do not match right key types");
    }
    this->keySelectorLeft = new KeySelector<KeyType>(leftKeyTypes, this->leftKeyIndex);
    this->keySelectorRight = new KeySelector<KeyType>(rightKeyTypes, this->rightKeyIndex);

    generatedFilter = generateJoinCondition();
    getAllColRefs(description["nonEquiCondition"]);

    // One value extractor per output column. Only columns referenced by the non-equi condition get one;
    // all others get nullptr.
    filterFuncPtrs.reserve(totalNumOfCols);
    for (int i = 0; i < totalNumOfCols; i++) {
        if (colRefsForNonEquiCondition.find(i) == colRefsForNonEquiCondition.end()) {
            filterFuncPtrs.push_back(nullptr);
            continue;
        }
        const bool isLeftColumn = static_cast<size_t>(i) < leftTypes.size();
        switch (isLeftColumn ? leftTypes[i] : rightTypes[i - leftTypes.size()]) {
            case omniruntime::type::DataTypeId::OMNI_INT:
                filterFuncPtrs.push_back(getValueAddress<int32_t>);
                break;
            // Timestamp columns are int64-backed (see BuildInnerLeft), so they are read like longs.
            case omniruntime::type::DataTypeId::OMNI_LONG:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                filterFuncPtrs.push_back(getValueAddress<int64_t>);
                break;
            case omniruntime::type::DataTypeId::OMNI_DOUBLE:
                filterFuncPtrs.push_back(getValueAddress<double>);
                break;
            case omniruntime::type::DataTypeId::OMNI_BOOLEAN:
                filterFuncPtrs.push_back(getValueAddress<bool>);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
        }
    }
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::close()
{
    AbstractStreamOperator<KeyType>::close();
    // Free the collector allocated in open() before dropping the pointer. Nulling it alone would leak
    // it, because the destructor would then only see nullptr.
    delete collector;
    collector = nullptr;
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::processBatch1(StreamRecord* input)
{
    // Left input. The StreamRecord wrapper is owned here and released on return.
    std::unique_ptr<StreamRecord> ownedRecord(input);
    auto *leftBatch = reinterpret_cast<omnistream::VectorBatch*>(ownedRecord->getValue());
    processBatch(leftBatch, leftWindowEndIndex, leftWindowState, true);
    // The heap backend presumably keeps this very batch pointer inside the window state, so the batch
    // is only freed here for other backends. NOTE(review): confirm in WindowListState::addVectorBatch.
    const bool stateKeepsBatch = (backendType_ == omnistream::StateType::HEAP);
    if (!stateKeepsBatch) {
        delete leftBatch;
    }
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::processBatch2(StreamRecord* input)
{
    // Right input. The StreamRecord wrapper is owned here and released on return.
    std::unique_ptr<StreamRecord> ownedRecord(input);
    auto *rightBatch = reinterpret_cast<omnistream::VectorBatch*>(ownedRecord->getValue());
    processBatch(rightBatch, rightWindowEndIndex, rightWindowState, false);
    // The heap backend presumably keeps this very batch pointer inside the window state, so the batch
    // is only freed here for other backends. NOTE(review): confirm in WindowListState::addVectorBatch.
    const bool stateKeepsBatch = (backendType_ == omnistream::StateType::HEAP);
    if (!stateKeepsBatch) {
        delete rightBatch;
    }
}

template <typename KeyType>
inline void WindowJoinOperator<KeyType>::initializeState(StreamTaskStateInitializerImpl *initializer, TypeSerializer *keySerializer)
{
    // Copy the two-input operator's ID onto the keyed base, then run the shared state initialization.
    const auto operatorId = TwoInputStreamOperator::GetOperatorID().toString();
    INFO_RELEASE("WindowJoinOperator initializeState with initializer, operatorID: " << operatorId);
    AbstractStreamOperator<KeyType>::SetOperatorID(operatorId);
    AbstractStreamOperator<KeyType>::initializeState(initializer, keySerializer);
}


template <typename KeyType>
void WindowJoinOperator<KeyType>::onEventTime(TimerHeapInternalTimer<KeyType, int64_t> *timer)
{
    // Fires one window for one key: joins both sides' buffered rows, clears the window state, then frees
    // whole buffered batches whose latest window end lies before this window.
    int64_t window = timer->getNamespace();

    this->setCurrentKey(timer->getKey());

    // With the heap (in-memory) state backend the returned vectors belong to the state and must not be
    // deleted here. Other backends return a fresh vector that is freed below.
    std::vector<VectorBatchId> *leftRecords = leftWindowState->get(window);
    std::vector<VectorBatchId> *rightRecords = rightWindowState->get(window);
    // %zu is the printf conversion for size_t; %d with a size_t argument is a format mismatch.
    if (leftRecords != nullptr) {
        LOG_PRINTF("onEventTime from left %zu", leftRecords->size());
    } else {
        LOG_PRINTF("onEventTime from left 0");
    }
    if (rightRecords != nullptr) {
        LOG_PRINTF("onEventTime from right %zu", rightRecords->size());
    } else {
        LOG_PRINTF("onEventTime from right 0");
    }
    join(leftRecords, rightRecords);
    if (leftRecords != nullptr) {
        leftWindowState->clear(window);
        if (backendType_ != omnistream::StateType::HEAP) { delete leftRecords; }
    }
    if (rightRecords != nullptr) {
        rightWindowState->clear(window);
        if (backendType_ != omnistream::StateType::HEAP) { delete rightRecords; }
    }

    // Release every buffered batch whose max window end is earlier than the window just fired.
    // Released slots are overwritten with INT64_MAX so they are never selected again. INT64_MIN entries
    // are skipped. NOTE(review): INT64_MIN presumably marks a batch without a valid max timestamp — confirm.
    auto releaseExpiredBatches = [window](std::vector<int64_t> &maxTimestamps,
                                          WindowListState<KeyType, int64_t, VectorBatchId> *state) {
        std::vector<size_t> indicesToDelete;
        for (size_t i = 0; i < maxTimestamps.size(); ++i) {
            if (window > maxTimestamps[i] && maxTimestamps[i] != INT64_MIN) {
                indicesToDelete.push_back(i);
                maxTimestamps[i] = INT64_MAX;
            }
        }
        if (!indicesToDelete.empty()) {
            state->clearVectors(indicesToDelete);
        }
    };
    releaseExpiredBatches(leftMaxTimestamps, leftWindowState);
    releaseExpiredBatches(rightMaxTimestamps, rightWindowState);
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::onProcessingTime(TimerHeapInternalTimer<KeyType, int64_t> * /* timer */)
{
    // Processing-time timers are unsupported: processBatch only ever registers event-time timers,
    // so reaching this is a logic error.
    THROW_LOGIC_EXCEPTION("Window Join only support event-time now")
}


/*
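buildInner() example (inner equi-join without a non-equi condition).
The join key is the first column; every left row is paired with every right row: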

left:
1, 2
1, 3
1, 4

right:
1, 0
1, 5

result:
(1, 2)(1, 0)
(1, 2)(1, 5)
(1, 3)(1, 0)
(1, 3)(1, 5)
(1, 4)(1, 0)
(1, 4)(1, 5)
*/
template <typename KeyType> void WindowJoinOperator<KeyType>::BuildInnerLeft(std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch)
{
    // Fill output columns [0, leftTypes.size()) from the left side in inner-join mode. Each column is
    // dispatched on its Omni type to the matching insertLeft instantiation.
    using omniruntime::type::DataTypeId;
    const size_t leftColumnCount = leftTypes.size();
    for (size_t pos = 0; pos < leftColumnCount; ++pos) {
        const int outCol = static_cast<int>(pos);
        switch (leftTypes[pos]) {
            case DataTypeId::OMNI_SHORT:
                insertLeft<int16_t>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_INT:
                insertLeft<int32_t>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            // Timestamps share the int64 storage of OMNI_LONG.
            case DataTypeId::OMNI_LONG:
            case DataTypeId::OMNI_TIMESTAMP:
            case DataTypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case DataTypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                insertLeft<int64_t>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_DOUBLE:
                insertLeft<double>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_BOOLEAN:
                insertLeft<bool>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_DECIMAL128:
                insertLeft<omniruntime::type::Decimal128>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_CHAR:
            case DataTypeId::OMNI_VARCHAR:
                insertLeftVarchar(outCol, leftElements, rightElements, outputBatch, true);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
                break;
        }
    }
}
template <typename KeyType> void WindowJoinOperator<KeyType>::BuildInnerRight(std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch)
{
    // Fill output columns [leftTypes.size(), leftTypes.size() + rightTypes.size()) from the right side in
    // inner-join mode. Each column is dispatched on its Omni type to the matching insertRight instantiation.
    using omniruntime::type::DataTypeId;
    const size_t firstRightCol = leftTypes.size();
    const size_t rightColumnCount = rightTypes.size();
    for (size_t pos = 0; pos < rightColumnCount; ++pos) {
        const int outCol = static_cast<int>(firstRightCol + pos);
        switch (rightTypes[pos]) {
            case DataTypeId::OMNI_SHORT:
                insertRight<int16_t>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_INT:
                insertRight<int32_t>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            // Timestamps share the int64 storage of OMNI_LONG.
            case DataTypeId::OMNI_LONG:
            case DataTypeId::OMNI_TIMESTAMP:
            case DataTypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case DataTypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                insertRight<int64_t>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_DOUBLE:
                insertRight<double>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_BOOLEAN:
                insertRight<bool>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_DECIMAL128:
                insertRight<omniruntime::type::Decimal128>(outCol, leftElements, rightElements, outputBatch, true);
                break;
            case DataTypeId::OMNI_CHAR:
            case DataTypeId::OMNI_VARCHAR:
                insertRightVarchar(outCol, leftElements, rightElements, outputBatch, true);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
                break;
        }
    }
}
template <typename KeyType> void WindowJoinOperator<KeyType>::buildInner(std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch)
{
    // Populate the output batch column-wise: every left-side column first, then every right-side column.
    this->BuildInnerLeft(leftElements, rightElements, outputBatch);
    this->BuildInnerRight(leftElements, rightElements, outputBatch);
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::buildRightNull(std::vector<VectorBatchId> *leftElements, omnistream::VectorBatch *outputBatch)
{
    // Null-padded output for unmatched left rows. Each left element is copied once into the left-side
    // columns, and every row of the right-side columns is set to null.
    // The type switch mirrors BuildInnerLeft: timestamp types are int64-backed, and OMNI_VARCHAR is handled
    // like OMNI_CHAR. Previously those types were rejected with "Type not recognized".
    int colIdx = 0;
    for (auto dataType : leftTypes) {
        switch (dataType) {
            case omniruntime::type::DataTypeId::OMNI_SHORT:
                insertLeft<int16_t>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_INT:
                insertLeft<int32_t>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_LONG:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                insertLeft<int64_t>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_DOUBLE:
                insertLeft<double>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_BOOLEAN:
                insertLeft<bool>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_DECIMAL128:
                insertLeft<omniruntime::type::Decimal128>(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_CHAR:
            case omniruntime::type::DataTypeId::OMNI_VARCHAR:
                insertLeftVarchar(colIdx, leftElements, nullptr, outputBatch, false);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
                break;
        }
        colIdx++;
    }

    // Right side has no match: mark every row of every right-side output column null.
    const int firstRightCol = static_cast<int>(leftTypes.size());
    const int totalCols = firstRightCol + static_cast<int>(rightTypes.size());
    for (int i = firstRightCol; i < totalCols; i++) {
        auto column = outputBatch->Get(i);
        for (int j = 0; j < column->GetSize(); j++) {
            column->SetNull(j);
        }
    }
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::buildLeftNull(std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch)
{
    // Null-padded output for unmatched right rows. Each right element is copied once into the right-side
    // columns, and every row of the left-side columns is set to null.
    // The type switch mirrors BuildInnerRight: timestamp types are int64-backed, and OMNI_VARCHAR is handled
    // like OMNI_CHAR. Previously those types were rejected with "Type not recognized".
    int colIdx = static_cast<int>(leftTypes.size());
    for (auto dataType : rightTypes) {
        switch (dataType) {
            case omniruntime::type::DataTypeId::OMNI_SHORT:
                insertRight<int16_t>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_INT:
                insertRight<int32_t>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_LONG:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP_WITHOUT_TIME_ZONE:
            case omniruntime::type::DataTypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                insertRight<int64_t>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_DOUBLE:
                insertRight<double>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_BOOLEAN:
                insertRight<bool>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_DECIMAL128:
                insertRight<omniruntime::type::Decimal128>(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            case omniruntime::type::DataTypeId::OMNI_CHAR:
            case omniruntime::type::DataTypeId::OMNI_VARCHAR:
                insertRightVarchar(colIdx, nullptr, rightElements, outputBatch, false);
                break;
            default:
                THROW_LOGIC_EXCEPTION("Type not recognized")
                break;
        }
        colIdx++;
    }

    // Left side has no match: mark every row of every left-side output column null.
    const int leftColumnCount = static_cast<int>(leftTypes.size());
    for (int i = 0; i < leftColumnCount; i++) {
        auto column = outputBatch->Get(i);
        for (int j = 0; j < column->GetSize(); j++) {
            column->SetNull(j);
        }
    }
}

template <typename KeyType>
inline void WindowJoinOperator<KeyType>::buildSemiAnti(std::vector<VectorBatchId> * /* elements */,
    omnistream::VectorBatch * /* outputBatch */, bool /* isSemi */, std::set<int> * /* matchedSet */)
{
    // Intentionally empty: this function writes nothing to the output batch.
    // NOTE(review): semi/anti-join output does not appear to be implemented here — confirm whether any
    // caller relies on this being a no-op.
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::processBatch(omnistream::VectorBatch *batch, int windowEndIndex,
    WindowListState<KeyType, int64_t, VectorBatchId> *recordState, bool isLeftSide)
{
    // Buffers one input batch for one side.
    // The batch's max window end is recorded so onEventTime() can free the whole batch once a later window
    // fires. Each row is then appended, as a packed (batchId, rowId) id, to the list state of its key and
    // window, and an event-time timer is registered for that window.
    // The caller-supplied windowEndIndex is used for both the max-timestamp scan and the per-row read,
    // so the two can never disagree.
    auto maxTimeStamp = batch->setMaxTimestamp(windowEndIndex);
    if (isLeftSide) {
        leftMaxTimestamps.push_back(maxTimeStamp);
    } else {
        rightMaxTimestamps.push_back(maxTimeStamp);
    }
    int batchID = recordState->getCurrentBatchId();
    recordState->addVectorBatch(batch);
    KeySelector<KeyType>* keySelector = isLeftSide ? this->keySelectorLeft : this->keySelectorRight;
    // The window-end column is loop-invariant; resolve it once.
    auto *windowEndColumn = reinterpret_cast<omniruntime::vec::Vector<int64_t> *>(batch->Get(windowEndIndex));

    for (int i = 0; i < batch->GetRowCount(); i++) {
        auto key = keySelector->getKey(batch, i);
        this->setCurrentKey(key);

        int64_t windowEndTime = windowEndColumn->GetValue(i);
        // Rows whose window has already fired are late and are not buffered.
        if (TimeWindowUtil::isWindowFired(windowEndTime, internalTimerService->currentWatermark(), shiftTimeZone)) {
            continue;
        }
        recordState->add(windowEndTime, VectorBatchUtil::getComboId(batchID, i));
        internalTimerService->registerEventTimeTimer(
            windowEndTime, TimeWindowUtil::toEpochMillsForTimer(windowEndTime - 1, shiftTimeZone));
    }
}

template <typename KeyType>
template <typename TYPE>
inline void WindowJoinOperator<KeyType>::insertLeft(int colIdx, std::vector<VectorBatchId> *leftElements,
        std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner) {
    // Writes left-side column colIdx of the output batch from the buffered left elements.
    int num = (*leftElements).size();
    // Decoded ids, one per left element. std::vector frees them even if a state lookup or SetValue throws.
    std::vector<uint32_t> batchIDdst(num);
    std::vector<uint32_t> rowIDdst(num);
    // SVE decode: each iteration loads two vectors of 64-bit packed ids and unzips their 32-bit halves.
    // The even u32 lanes (low halves on little-endian) are row ids; the odd lanes (high halves) are batch ids.
    int processNum = svcntw();
    int half = svcntd();
    for (int i = 0; i < num; i+=processNum) {
        svbool_t pg = svwhilelt_b64(i, num);
        svbool_t pg2 = svwhilelt_b64(i + half, num);
        svbool_t pg3 = svwhilelt_b32(i, num);
        svuint64_t comboID = svld1(pg, (*leftElements).data() + i);
        svuint64_t comboID2 = svld1(pg2, (*leftElements).data() + i + half);
        svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svst1_u32(pg3, rowIDdst.data() + i, rowID);
        svst1_u32(pg3, batchIDdst.data() + i, batchID);
    }
    auto col = reinterpret_cast<omniruntime::vec::Vector<TYPE> *>(outputBatch->Get(colIdx));
    // With a non-equi condition, or in null-padding (outer) mode, each left element yields exactly one
    // output row. For a plain inner equi-join the output is the cross product, so each left value is
    // repeated once per right element. rightElements is only dereferenced in that last case; buildRightNull
    // passes nullptr.
    const bool onePerElement = isNonEquiCondition || !isInner;
    const int repeat = onePerElement ? 1 : static_cast<int>(rightElements->size());
    int rowIdx = 0;
    for (int j = 0; j < num; j++) {
        int batchId = batchIDdst[j];
        int rowId = rowIDdst[j];
        auto batch = leftWindowState->getVectorBatch(batchId);
        auto value = batch->template GetValueAt<TYPE>(colIdx, rowId);
        for (int r = 0; r < repeat; r++) {
            col->SetValue(rowIdx + r, value);
        }
        rowIdx += repeat;
        // Non-heap backends return a freshly materialized batch that must be freed here.
        if (backendType_ != omnistream::StateType::HEAP) {
            delete batch;
        }
    }
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::insertLeftVarchar(int colIdx, std::vector<VectorBatchId> *leftElements,
        std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner) {
    // Varchar/char counterpart of insertLeft: writes left-side string column colIdx of the output batch.
    using varcharVecType = omniruntime::vec::Vector<omniruntime::vec::LargeStringContainer<std::string_view>>;
    auto col = reinterpret_cast<varcharVecType *>(outputBatch->Get(colIdx));
    if (isNonEquiCondition || !isInner) {
        // One output row per left element.
        int rowIdx = 0;
        for (auto element : *leftElements) {
            int batchId = VectorBatchUtil::getBatchId(element);
            int rowId = VectorBatchUtil::getRowId(element);
            auto batch = leftWindowState->getVectorBatch(batchId);
            auto value = reinterpret_cast<varcharVecType *>(batch->Get(colIdx))->GetValue(rowId);
            col->SetValue(rowIdx, value);
            rowIdx++;
            // Non-heap backends return a freshly materialized batch that must be freed here.
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
    } else {
        // Inner equi-join: cross product, so each left value is repeated once per right element.
        int rowIdx = 0;
        int num = (*leftElements).size();
        // Decoded ids; std::vector frees them even if a state lookup or SetValue throws.
        std::vector<uint32_t> batchIDdst(num);
        std::vector<uint32_t> rowIDdst(num);

        // SVE decode of the packed ids: even u32 lanes are row ids, odd u32 lanes are batch ids.
        int processNum = svcntw();
        int half = svcntd();
        for (int i = 0; i < num; i+=processNum) {
            svbool_t pg = svwhilelt_b64(i, num);
            svbool_t pg2 = svwhilelt_b64(i + half, num);
            svbool_t pg3 = svwhilelt_b32(i, num);
            svuint64_t comboID = svld1(pg, (*leftElements).data() + i);
            svuint64_t comboID2 = svld1(pg2, (*leftElements).data() + i + half);

            svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
            svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));

            svst1_u32(pg3, rowIDdst.data() + i, rowID);
            svst1_u32(pg3, batchIDdst.data() + i, batchID);
        }
        for (int j = 0; j < num; j++) {
            int batchId = batchIDdst[j];
            int rowId = rowIDdst[j];
            auto batch = leftWindowState->getVectorBatch(batchId);
            auto value = reinterpret_cast<varcharVecType *>(batch->Get(colIdx))->GetValue(rowId);
            for (size_t i = 0; i < rightElements->size(); i++) {
                col->SetValue(i + rowIdx, value);
            }
            rowIdx += rightElements->size();
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
    }
}

template <typename KeyType>
template <typename TYPE>
inline void WindowJoinOperator<KeyType>::insertRight(int colIdx, std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner)
{
    // Fills output column colIdx from right-side column (colIdx - leftTypes.size()).
    const int num = static_cast<int>(rightElements->size());

    // Decode every packed VectorBatchId with SVE. The even 32-bit lanes (the low halves on
    // little-endian AArch64) become row ids, and the odd lanes (the high halves) become batch ids.
    // std::vector owns the scratch buffers, so they are released even if a callee throws.
    std::vector<uint32_t> batchIDdst(num);
    std::vector<uint32_t> rowIDdst(num);
    const int processNum = svcntw();  // 32-bit lanes per SVE vector
    const int half = svcntd();        // 64-bit lanes per SVE vector
    for (int i = 0; i < num; i += processNum) {
        svbool_t pg = svwhilelt_b64(i, num);
        svbool_t pg2 = svwhilelt_b64(i + half, num);
        svbool_t pg3 = svwhilelt_b32(i, num);
        svuint64_t comboID = svld1(pg, rightElements->data() + i);
        svuint64_t comboID2 = svld1(pg2, rightElements->data() + i + half);
        svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svst1_u32(pg3, rowIDdst.data() + i, rowID);
        svst1_u32(pg3, batchIDdst.data() + i, batchID);
    }

    auto col = reinterpret_cast<omniruntime::vec::Vector<TYPE> *>(outputBatch->Get(colIdx));
    if (isNonEquiCondition || !isInner) {
        // Output row k receives the value of the k-th right element.
        for (int k = 0; k < num; k++) {
            int batchId = batchIDdst[k];
            int rowId = rowIDdst[k];
            auto batch = rightWindowState->getVectorBatch(batchId);
            auto value = batch->template GetValueAt<TYPE>(colIdx - leftTypes.size(), rowId);
            col->SetValue(k, value);
            // Batches from non-HEAP backends are owned by the caller and must be freed here.
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
        return;
    }

    // Inner equi-join: the output is the left x right cross product laid out left-major.
    // Left element j occupies rows [j * rightCount, (j + 1) * rightCount), which is the layout the
    // left-column inserters write. So right element i belongs at row j * rightCount + i for every j.
    // Striding by the left count instead drops and duplicates rows when the sides differ in size,
    // and writes past the end of the column when there are more left than right elements.
    const size_t leftCount = leftElements->size();
    const size_t rightCount = rightElements->size();
    for (size_t i = 0; i < rightCount; i++) {
        int batchId = batchIDdst[i];
        int rowId = rowIDdst[i];
        auto batch = rightWindowState->getVectorBatch(batchId);
        auto value = batch->template GetValueAt<TYPE>(colIdx - leftTypes.size(), rowId);
        for (size_t j = 0; j < leftCount; j++) {
            col->SetValue(static_cast<int>(j * rightCount + i), value);
        }
        // Batches from non-HEAP backends are owned by the caller and must be freed here.
        if (backendType_ != omnistream::StateType::HEAP) {
            delete batch;
        }
    }
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::insertRightVarchar(int colIdx, std::vector<VectorBatchId> *leftElements,
    std::vector<VectorBatchId> *rightElements, omnistream::VectorBatch *outputBatch, bool isInner)
{
    // Varchar counterpart of insertRight. Fills output column colIdx from right-side column
    // (colIdx - leftTypes.size()).
    using varcharVecType = omniruntime::vec::Vector<omniruntime::vec::LargeStringContainer<std::string_view>>;
    auto col = reinterpret_cast<varcharVecType *>(outputBatch->Get(colIdx));
    if (isNonEquiCondition || !isInner) {
        // Output row k receives the value of the k-th right element.
        int rowIdx = 0;
        for (auto element : *rightElements) {
            int batchId = VectorBatchUtil::getBatchId(element);
            int rowId = VectorBatchUtil::getRowId(element);
            auto batch = rightWindowState->getVectorBatch(batchId);
            auto value = reinterpret_cast<varcharVecType *>(batch->Get(colIdx - leftTypes.size()))->GetValue(rowId);
            col->SetValue(rowIdx, value);
            rowIdx++;
            // Batches from non-HEAP backends are owned by the caller and must be freed here.
            if (backendType_ != omnistream::StateType::HEAP) {
                delete batch;
            }
        }
        return;
    }

    // Decode every packed VectorBatchId with SVE. The even 32-bit lanes (the low halves on
    // little-endian AArch64) become row ids, and the odd lanes (the high halves) become batch ids.
    // std::vector owns the scratch buffers, so they are released even if a callee throws.
    const int num = static_cast<int>(rightElements->size());
    std::vector<uint32_t> batchIDdst(num);
    std::vector<uint32_t> rowIDdst(num);
    const int processNum = svcntw();  // 32-bit lanes per SVE vector
    const int half = svcntd();        // 64-bit lanes per SVE vector
    for (int i = 0; i < num; i += processNum) {
        svbool_t pg = svwhilelt_b64(i, num);
        svbool_t pg2 = svwhilelt_b64(i + half, num);
        svbool_t pg3 = svwhilelt_b32(i, num);
        svuint64_t comboID = svld1(pg, rightElements->data() + i);
        svuint64_t comboID2 = svld1(pg2, rightElements->data() + i + half);
        svuint32_t rowID = svuzp1(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svuint32_t batchID = svuzp2(svreinterpret_u32(comboID), svreinterpret_u32(comboID2));
        svst1_u32(pg3, rowIDdst.data() + i, rowID);
        svst1_u32(pg3, batchIDdst.data() + i, batchID);
    }

    // Inner equi-join: the output is the left x right cross product laid out left-major.
    // Left element j occupies rows [j * rightCount, (j + 1) * rightCount), which is the layout the
    // left-column inserters write. So right element i belongs at row j * rightCount + i for every j.
    // Striding by the left count instead drops and duplicates rows when the sides differ in size,
    // and writes past the end of the column when there are more left than right elements.
    const size_t leftCount = leftElements->size();
    const size_t rightCount = rightElements->size();
    for (size_t i = 0; i < rightCount; i++) {
        int batchId = batchIDdst[i];
        int rowId = rowIDdst[i];
        auto batch = rightWindowState->getVectorBatch(batchId);
        auto value = reinterpret_cast<varcharVecType *>(batch->Get(colIdx - leftTypes.size()))->GetValue(rowId);
        for (size_t j = 0; j < leftCount; j++) {
            col->SetValue(static_cast<int>(j * rightCount + i), value);
        }
        // Batches from non-HEAP backends are owned by the caller and must be freed here.
        if (backendType_ != omnistream::StateType::HEAP) {
            delete batch;
        }
    }
}
template <typename KeyType>
::FilterFunc WindowJoinOperator<KeyType>::generateJoinCondition()
{
    // JIT-compiles the "nonEquiCondition" expression from the operator description into a
    // FilterFunc. Returns nullptr when the join has no non-equi condition.
    if (!isNonEquiCondition) {
        return nullptr;
    }
    auto filter = description["nonEquiCondition"];
    Expr *jExpr = JSONParser::ParseJSON(filter);
    // NOTE(review): jExpr and filterCodegen are intentionally not freed here. The codegen object
    // presumably owns the compiled code behind the returned pointer; confirm before adding cleanup.
    SimpleFilterCodeGen *filterCodegen = new SimpleFilterCodeGen("nonEquiCondition", *jExpr, nullptr);
    int64_t fAddr = filterCodegen->GetFunction();
    // GetFunction() returns the entry address as an integer. Convert that value explicitly rather
    // than reading the int64_t object through a function-pointer lvalue (a strict-aliasing violation).
    return reinterpret_cast<::FilterFunc>(static_cast<uintptr_t>(fAddr));
}

template <typename KeyType>
void WindowJoinOperator<KeyType>::getAllColRefs(nlohmann::json &config)
{
    // Recursively walks an expression tree (through its "left"/"right" children) and records the
    // "colVal" of every FIELD_REFERENCE node in colRefsForNonEquiCondition.
    //
    // Keys are probed with find()/contains() instead of operator[]. On a non-const json,
    // operator[] inserts a null member for a missing key, which would silently mutate the
    // caller's plan. On non-object nodes it throws.
    auto exprType = config.find("exprType");
    if (exprType != config.end() && *exprType == "FIELD_REFERENCE") {
        colRefsForNonEquiCondition.emplace(config["colVal"]);
    }

    if (config.contains("right")) {
        getAllColRefs(config["right"]);
    }

    if (config.contains("left")) {
        getAllColRefs(config["left"]);
    }
}

template <typename KeyType>
bool WindowJoinOperator<KeyType>::filter(VectorBatchId leftElement, VectorBatchId rightElement)
{
    // Evaluates the JIT-generated non-equi predicate on one (left, right) candidate pair.
    // Returns true unconditionally when the join has no non-equi condition.
    if (!isNonEquiCondition) {
        return true;
    }

    const auto leftRowId = VectorBatchUtil::getRowId(leftElement);
    const auto leftBatchId = VectorBatchUtil::getBatchId(leftElement);
    const auto rightRowId = VectorBatchUtil::getRowId(rightElement);
    const auto rightBatchId = VectorBatchUtil::getBatchId(rightElement);

    // Batches from non-HEAP backends are owned by this function. They must stay alive until
    // generatedFilter has run, because the extracted values may point into batch memory.
    // unique_ptr ownership frees them on return and also if a callee throws.
    const bool ownsBatches = backendType_ != omnistream::StateType::HEAP;
    std::vector<std::unique_ptr<omnistream::VectorBatch>> ownedBatches;
    ownedBatches.reserve(2);
    // Each side's batch is fetched at most once, and only if one of its columns is referenced.
    omnistream::VectorBatch *leftBatch = nullptr;
    omnistream::VectorBatch *rightBatch = nullptr;

    // Scratch buffers filled per referenced column and handed to the generated filter.
    std::vector<int64_t> vals(totalNumOfCols);
    std::unique_ptr<bool[]> nulls(new bool[totalNumOfCols]());
    bool resultBool = false;

    const int leftColCount = static_cast<int>(leftTypes.size());
    for (auto col : colRefsForNonEquiCondition) {
        // Columns [0, leftColCount) belong to the left input; the rest to the right input.
        const bool isLeft = col < leftColCount;
        omnistream::VectorBatch *&batch = isLeft ? leftBatch : rightBatch;
        if (batch == nullptr) {
            if (isLeft) {
                batch = leftWindowState->getVectorBatch(leftBatchId);
            } else {
                batch = rightWindowState->getVectorBatch(rightBatchId);
            }
            if (ownsBatches) {
                ownedBatches.emplace_back(batch);
            }
        }
        omniruntime::vec::BaseVector *vector = isLeft ? batch->Get(col) : batch->Get(col - leftTypes.size());
        filterFuncPtrs[col](vector, isLeft ? leftRowId : rightRowId, col, vals.data(), nulls.get());
    }

    omniruntime::op::ExecutionContext context;
    return generatedFilter(vals.data(), nulls.get(), nullptr, &resultBool, nullptr,
        reinterpret_cast<int64_t>(&context));
}