* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include <cstdint>
#include <fstream>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>
#include <nlohmann/json.hpp>
#include "table/data/RowData.h"
#include "table/data/JoinedRowData.h"
#include "core/api/common/state/ValueState.h"
#include "streaming/api/functions/KeyedProcessFunction.h"
#include "streaming/api/operators/StreamingRuntimeContext.h"
#include "streaming/api/operators/TimestampedCollector.h"
#include "table/runtime/keyselector/KeySelector.h"
/**
 * Keyed process function that deduplicates rows on a row-time column, operating on
 * omnistream::VectorBatch input/output (batch-at-a-time via processBatch).
 *
 * Most of the logic is defined out of line; the descriptions of those members below are
 * limited to what their declarations show.
 *
 * Ownership: this object owns groupByKeySelector (allocated in the constructor, released in
 * the destructor). Copying is disabled so that pointer is never deleted twice.
 */
class RowTimeDeduplicateFunction
    : public KeyedProcessFunction<RowData *, omnistream::VectorBatch *, omnistream::VectorBatch *> {
public:
    /**
     * Builds the function from its JSON operator description.
     *
     * @param config JSON object that must contain: "generateUpdateBefore" (bool),
     *        "generateInsert" (bool), "rowtimeIndex" (int), "keepLastRow" (bool),
     *        "grouping" (array of int32 column indexes) and "inputTypes" (array of strings).
     * @throws nlohmann::json::out_of_range if a required key is missing.
     * @throws nlohmann::json::type_error if a value cannot be converted to the expected type.
     */
    explicit RowTimeDeduplicateFunction(const nlohmann::json &config)
    {
        // .at() throws on a missing key; operator[] on a const json is undefined behaviour then.
        generateUpdateBefore_ = config.at("generateUpdateBefore").get<bool>();
        generateInsert_ = config.at("generateInsert").get<bool>();
        rowtimeIndex_ = config.at("rowtimeIndex").get<int>();
        keepLastRow_ = config.at("keepLastRow").get<bool>();
        keyIndex = config.at("grouping").get<std::vector<int32_t>>();
        inputTypes = config.at("inputTypes").get<std::vector<std::string>>();
        // Reuse the already-parsed type names instead of converting the JSON array again.
        keyedTypes = getKeyedTypes(keyIndex, inputTypes);
        // Assigned in the body (not the init list) because keyedTypes is declared after
        // groupByKeySelector and must be computed first.
        groupByKeySelector = new KeySelector<RowData*>(keyedTypes, keyIndex);
    }

    /** Releases the owned key selector. */
    ~RowTimeDeduplicateFunction()
    {
        delete groupByKeySelector;
        groupByKeySelector = nullptr;
    }

    // Owns a raw pointer: a member-wise copy would double-delete groupByKeySelector.
    RowTimeDeduplicateFunction(const RowTimeDeduplicateFunction &) = delete;
    RowTimeDeduplicateFunction &operator=(const RowTimeDeduplicateFunction &) = delete;

public:
    /** Batch-level processing entry point (defined out of line). */
    void processBatch(omnistream::VectorBatch *inputVB, Context &ctx, TimestampedCollector &out) override;
    /** Prepares the output batch `out` for `rowCount` rows based on `inputVB` (defined out of line). */
    void initOutputVector(omnistream::VectorBatch *out, omnistream::VectorBatch *inputVB, int rowCount);
    /** Operator open hook (defined out of line); presumably sets up recordStateVB — TODO confirm. */
    void open(const Configuration &) override;
    /**
     * Maps the key column indexes to their type ids (defined out of line).
     * Parameters are taken by value to keep the signature identical to the out-of-line definition.
     */
    static std::vector<std::int32_t> getKeyedTypes(const std::vector<int32_t> keyedIndex,
        const std::vector<std::string> inputTypes);
    /** Returns a key -> row-id map for records of `inputVB` that need updating (defined out of line). */
    std::unordered_map<RowData *, int32_t> GetNeededUpdateRecord(
        omnistream::VectorBatch *inputVB);
    /** Compares a previously stored row with a current row (defined out of line). */
    bool CompareRecord(int preRowId, int currentRowId, omnistream::VectorBatch* previousVB,
        omnistream::VectorBatch* currentVB);
    /** Processes the records to update and returns the resulting batch (defined out of line). */
    omnistream::VectorBatch* ProcessUpdateRecord(omnistream::VectorBatch *inputVB, Context &ctx);
    /** Looks up a stored batch by id (defined out of line). */
    omnistream::VectorBatch* GetVectorBatchById(int32_t batchId);
    /** Copies the row identified by `comboID` into `outputVB` at `rowIndex` (defined out of line). */
    void CopyTargetVectorBatchToOut(omnistream::VectorBatch *outputVB, long comboID, int rowIndex);
    /** Writes the given update records to the state backend (defined out of line). */
    void UpdateStateBackend(std::vector<std::tuple<long, long, RowData*>> &updateRecords, Context& ctx);
    /** Frees batches collected in delVb (defined out of line). */
    void freeDelBatch();

    /**
     * Returns the number of vector batches currently held by recordStateVB.
     * NOTE(review): recordStateVB starts as nullptr; this must not be called before it is
     * assigned (presumably in open()) — TODO confirm.
     */
    int getCurrentBatchId()
    {
        return recordStateVB->getVectorBatchesSize();
    }

    /** Row-at-a-time entry point is not supported by this operator; use processBatch. */
    void processElement(omnistream::VectorBatch *input, Context* ctx, TimestampedCollector* out) override
    {
        NOT_IMPL_EXCEPTION
    }

    /** Not used by this operator; always returns nullptr. */
    JoinedRowData *getResultRow() override
    {
        return nullptr;
    }

    /** Not used by this operator; always returns nullptr. */
    ValueState<RowData *> *getValueState() override
    {
        return nullptr;
    }

private:
    // Not assigned by the constructor in this header.
    nlohmann::json description;
    // Type names of the input columns, from config "inputTypes".
    std::vector<std::string> inputTypes;
    bool generateUpdateBefore_ = false;
    bool generateInsert_ = false;
    // Column index of the row-time field, from config "rowtimeIndex".
    int rowtimeIndex_ = 0;
    bool keepLastRow_ = false;
    // Grouping (key) column indexes, from config "grouping".
    std::vector<int32_t> keyIndex;
    ValueState<int64_t>* recordStateVB = nullptr;
    // Owned; created in the constructor, deleted in the destructor.
    KeySelector<RowData*> *groupByKeySelector = nullptr;
    // Type ids of the key columns, computed by getKeyedTypes.
    std::vector<int32_t> keyedTypes;
    std::set<omnistream::VectorBatch *> delVb;
    std::unordered_map<int32_t, omnistream::VectorBatch *> vectorBatchCacheMap;
    omnistream::StateType backendType_ = omnistream::StateType::HEAP;
};