* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_TIMESTAMPINSERTERSINKOPERATOR_H
#define FLINK_TNEL_TIMESTAMPINSERTERSINKOPERATOR_H
#include "streaming/api/operators/AbstractUdfStreamOperator.h"
#include "streaming/api/operators/OneInputStreamOperator.h"
#include "DiscardingSink.h"
#include "streaming/runtime/streamrecord/StreamRecord.h"
/**
 * One-input stream operator that wraps a DiscardingSink as its user function.
 *
 * The operator reads the "rowtimeFieldIndex" entry of its JSON configuration
 * and keeps it in rowTimeIndex. The record-handling logic (open, processBatch,
 * processElement, getName, getTypeName) is only declared here; it is defined
 * out of line.
 */
class TimeStampInserterSinkOperator : public AbstractUdfStreamOperator<SinkFunction<StreamRecord*>, int>,
                                      public OneInputStreamOperator {
public:
    /**
     * @param description JSON handed to the DiscardingSink constructor and also
     *                    copied into this operator's private `description` member.
     * @param output      Output handed to the AbstractUdfStreamOperator base.
     * @param desc        JSON object that must contain "rowtimeFieldIndex".
     *                    Taken by const reference so the document is not
     *                    deep-copied just to read a single integer.
     * @throws nlohmann::json::out_of_range if "rowtimeFieldIndex" is absent.
     * @throws nlohmann::json::type_error   if `desc` is not an object or the
     *                                      value is not convertible to int.
     */
    TimeStampInserterSinkOperator(const nlohmann::json& description, Output* output, const nlohmann::json& desc)
        : AbstractUdfStreamOperator(new DiscardingSink(description), output),
          // at() never mutates `desc` and reports a missing key by name,
          // unlike non-const operator[], which would silently insert a null.
          rowTimeIndex(desc.at("rowtimeFieldIndex").get<int>()),
          description(description)
    {
    }

    ~TimeStampInserterSinkOperator() override = default;

    void open() override;

    const char* getName() override;

    /** State initialisation is not implemented; the call is only logged. */
    void initializeState(StreamTaskStateInitializerImpl* initializer, TypeSerializer* keySerializer) override
    {
        INFO_RELEASE("TimeStampInserterSinkOperator::initializeState not impl");
    }

    /** Checkpoint-complete notification is not implemented; the id is only logged. */
    void notifyCheckpointComplete(long checkpointId) override
    {
        INFO_RELEASE(
            "TimeStampInserterSinkOperator::notifyCheckpointComplete not impl checkpointId : " << checkpointId);
    }

    /** Checkpoint-aborted notification is not implemented; the id is only logged. */
    void notifyCheckpointAborted(long checkpointId) override
    {
        INFO_RELEASE("TimeStampInserterSinkOperator::notifyCheckpointAborted not impl checkpointId : " << checkpointId);
    }

    void processBatch(StreamRecord* record) override;

    void processElement(StreamRecord* record) override;

    /** Delegates watermark handling to AbstractStreamOperator<int>. */
    void ProcessWatermark(Watermark* watermark) override
    {
        AbstractStreamOperator<int>::ProcessWatermark(watermark);
    }

    std::string getTypeName() override;

    /** Passes the watermark status straight through to the operator's output. */
    void processWatermarkStatus(WatermarkStatus* watermarkStatus) override
    {
        output->emitWatermarkStatus(watermarkStatus);
    }

    // Value of "rowtimeFieldIndex" from the constructor's `desc` argument.
    // NOTE(review): presumably the column index of the row-time field — confirm in the .cpp.
    int rowTimeIndex;

private:
    // Copy of the `description` JSON passed to the constructor.
    nlohmann::json description;
};
#endif