* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_STREAMCALC_H
#define FLINK_TNEL_STREAMCALC_H
#include <functional>
#include <iostream>
#include <nlohmann/json.hpp>
#include <variant>
#include <vector>
#include "OmniOperatorJIT/core/src/vector/vector_batch.h"
#include "OmniOperatorJIT/core/src/expression/expressions.h"
#include "OmniOperatorJIT/core/src/expression/expr_printer.h"
#include "OmniOperatorJIT/core/src/expression/jsonparser/jsonparser.h"
#include "Output.h"
#include "AbstractUdfStreamOperator.h"
#include "OneInputStreamOperator.h"
#include "TimestampedCollector.h"
#include "table/data/GenericRowData.h"
#include "table/data/TimestampData.h"
#include "table/data/binary/BinaryRowData.h"
#include "table/data/binary/BinaryStringData.h"
using namespace omniruntime::expressions;
using ProjectFunc = int32_t (*)(const int64_t *, const uint8_t *, int32_t *, int64_t *, uint8_t *, int32_t *, int64_t);
class StreamCalc : public OneInputStreamOperator, public AbstractStreamOperator<RowData*> {
public:
explicit StreamCalc(const nlohmann::json& description, Output* output);
~StreamCalc() override;
void processElement(StreamRecord* record) override;
void open() override;
void close() override;
StreamRecord* getRecord()
{
return reUsableRecord_;
}
const char *getName() override;
void initializeState(StreamTaskStateInitializerImpl *initializer, TypeSerializer *keySerializer) override
{
LOG("StreamCalc initializeState()")
}
std::string getTypeName() override
{
std::string typeName = "StreamCalc";
typeName.append(__PRETTY_FUNCTION__) ;
return typeName ;
}
void processWatermarkStatus(WatermarkStatus *watermarkStatus) override
{
output->emitWatermarkStatus(watermarkStatus);
}
private:
void parseDescription(const nlohmann::json& description);
using ProjFuncType = void (*) (RowData*, int, RowData*, int);
int extractPrecision(std::basic_string<char> &basicString);
void GetMsgFromJson();
void ParseProjExprs(JSONParser *parser);
std::vector<ProjFuncType> projFuncs_;
std::vector<int32_t> outputTypeIds_;
std::vector<int32_t> outputLengths_;
std::vector<int32_t> inputLengths_;
std::vector<int> outputIndexes_;
int outputSize_;
int inputSize_;
nlohmann::json description_;
bool isSimpleProjection_ = false;
bool hasFilter = false;
TimestampedCollector* timestampedCollector_;
BinaryRowData* reUsableBinaryRow_;
StreamRecord* reUsableRecord_;
const static int SEG_SIZE = 2048;
std::vector<Expr *> projExprs;
Expr* filterCondition = nullptr;
ProjectFunc projector;
};
#endif