* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_STREAMCALCBATCH_H
#define FLINK_TNEL_STREAMCALCBATCH_H
#include <functional>
#include <iostream>
#include <nlohmann/json.hpp>
#include <variant>
#include <vector>
#include <unordered_map>
#include "Output.h"
#include "AbstractUdfStreamOperator.h"
#include "OneInputStreamOperator.h"
#include "TimestampedCollector.h"
#include "table/data/TimestampData.h"
#include "table/data/vectorbatch/temp_batch_function.h"
#include "table/data/vectorbatch/VectorBatch.h"
#include "table/data/util/VectorBatchUtil.h"
#include "OmniOperatorJIT/core/src/type/data_types.h"
#include "OmniOperatorJIT/core/src/expression/expressions.h"
#include "OmniOperatorJIT/core/src/expression/jsonparser/jsonparser.h"
#include "OmniOperatorJIT/core/src/codegen/simple_filter_codegen.h"
#include "OmniOperatorJIT/core/src/operator/execution_context.h"
#include "OmniOperatorJIT/core/src/codegen/expr_evaluator.h"
#include "OmniOperatorJIT/core/src/operator/config/operator_config.h"
#include "OmniOperatorJIT/core/src/memory/aligned_buffer.h"
#include "OmniOperatorJIT/core/src/operator/execution_context.h"
class StreamCalcBatch : public OneInputStreamOperator, public AbstractStreamOperator<int> {
public:
explicit StreamCalcBatch(const nlohmann::json& description, Output* output);
~StreamCalcBatch() override;
void processBatch(class StreamRecord* input) override;
void processElement(StreamRecord* record) override
{
NOT_IMPL_EXCEPTION
};
void open() override;
void close() override;
const char *getName() override;
void initializeState(StreamTaskStateInitializerImpl *initializer, TypeSerializer *keySerializer) override
{
LOG("StreamCalc initializeState()")
}
void ProcessWatermark(Watermark *watermark) override
{
output->emitWatermark(watermark);
}
void processWatermarkStatus(WatermarkStatus *watermarkStatus) override
{
output->emitWatermarkStatus(watermarkStatus);
}
std::string getTypeName() override
{
std::string typeName = "StreamCalc";
typeName.append(__PRETTY_FUNCTION__) ;
return typeName ;
}
std::shared_ptr<omnistream::TaskMetricGroup> GetMectrics() override
{
return AbstractStreamOperator::GetMectrics();
}
auto GetOutputBatch() const
{
return outputBatch;
}
private:
void parseDescription(nlohmann::json& description);
void collectUnsupportedExpr(nlohmann::json& description, int32_t& nextIndex);
inline void collectUnsupportedExprImpl(nlohmann::json& field, int32_t& nextIndex);
void manuallyAddNewVectors(omnistream::VectorBatch* vb) const;
std::vector<nlohmann::json> replacedDescriptions_;
std::vector<nlohmann::json> newRefDescriptions_;
std::vector<int32_t> outputTypeIds_;
omniruntime::type::DataTypes inputTypes_;
omnistream::VectorBatch* outputBatch;
std::vector<int32_t> outputIndexes_;
nlohmann::json description_;
bool isSimpleProjection_ = false;
bool hasFilter = false;
TimestampedCollector* timestampedCollector_;
omniruntime::mem::AlignedBuffer<int32_t> selectedRowsBuffer;
std::vector<omniruntime::expressions::Expr *> projExprs;
omniruntime::expressions::Expr* filterCondition = nullptr;
omniruntime::codegen::ExpressionEvaluator* exprEvaluator;
std::unique_ptr<omniruntime::op::ExecutionContext> executionContext;
};
#endif