* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_STREAMOPERATORFACTORY_H
#define FLINK_TNEL_STREAMOPERATORFACTORY_H
#include <executiongraph/operatorchain/OperatorPOD.h>
#include <executiongraph/operatorchain/OperatorPOD.h>
#include "OneInputStreamOperator.h"
#include "../../../core/graph/OperatorConfig.h"
#include "streaming/runtime/io/RecordWriterOutput.h"
#include "StreamOperator.h"
namespace omnistream {
// Forward declaration: within this header OmniStreamTask is only named as the
// template argument of std::shared_ptr in function declarations, so the full
// class definition is not required here.
class OmniStreamTask;
/**
 * Collection of static factory functions that each return a StreamOperator
 * pointer for a given operator description and downstream output.
 *
 * Only declarations live in this header; the definitions are provided
 * elsewhere. The signatures do not express who owns the returned raw pointer
 * or the chainOutput pointer.
 * NOTE(review): confirm ownership against the implementation and its callers.
 */
class StreamOperatorFactory {
public:
    /** Overload taking an omnistream::OperatorConfig and no task handle. */
    static StreamOperator* createOperatorAndCollector(
        omnistream::OperatorConfig& opConfig,
        WatermarkGaugeExposingOutput* chainOutput);

    /** Overload taking an omnistream::OperatorPOD together with a shared task handle. */
    static StreamOperator* createOperatorAndCollector(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

private:
    // Per-operator helpers. Presumably each one builds the operator kind named
    // in its identifier and is reached from createOperatorAndCollector --
    // verify in the implementation file.

    // --- Helpers taking (description, chain output, task) ---

    static StreamOperator* CreateStreamCalcOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateStreamJoinOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateLocalWindowAggOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateGlobalWindowAggOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateGroupWindowAggOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateWatermarkAssignerOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateKeyedProcessOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateKeyedCoProcessOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateSinkOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateSourceOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateStreamExpandOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateTimestampInserterOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateProcessOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateConstraintEnforcerOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateWindowInnerJoinOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateStreamingFileWriterOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreatePartitionCommitterOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateInputConversionOperator(
        omnistream::OperatorPOD& opDesc,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateFilterOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    static StreamOperator* CreateReduceOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task);

    // --- Helpers taking only (description, chain output) ---

    static StreamOperator* CreateMapOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput);

    static StreamOperator* CreateBatchFilterOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput);

    static StreamOperator* CreateFlatMapOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput);

    // --- Helpers that additionally receive a ProcessingTimeService pointer ---

    static StreamOperator* CreateSinkWriterOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task,
        ProcessingTimeService* processingTimeService);

    static StreamOperator* CreateCommitOp(
        omnistream::OperatorPOD& opConfig,
        WatermarkGaugeExposingOutput* chainOutput,
        std::shared_ptr<omnistream::OmniStreamTask> task,
        ProcessingTimeService* processingTimeService);
};
}
#endif