/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 * http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
#ifndef FLINK_TNEL_STREAMOPERATOR_H
#define FLINK_TNEL_STREAMOPERATOR_H
#include "StreamTaskStateInitializerImpl.h"
#include "Output.h"
#include "runtime/metrics/groups/TaskMetricGroup.h"
#include "runtime/state/CheckpointStreamFactory.h"
#include "OperatorSnapshotFutures.h"
#include "runtime/jobgraph/OperatorID.h"
#include "runtime/checkpoint/CheckpointOptions.h"
#include "runtime/checkpoint/TaskStateSnapshotDeserializer.h"
#include "runtime/checkpoint/CheckpointListener.h"
#include "io/network/api/StopMode.h"
class StreamOperator : public CheckpointListener {
public:
StreamOperator()
{
operatorId = OperatorID();
}
virtual void open() { };
virtual void close() { };
virtual void stop(StopMode mode) {}
virtual void setKeyContextElement(StreamRecord* record){};
virtual void setKeyContextElement1(StreamRecord* record){};
virtual void setKeyContextElement2(StreamRecord* record){};
virtual void initializeState(StreamTaskStateInitializerImpl *initializer, TypeSerializer *keySerializer) =0;
virtual bool isSetKeyContextElement()
{
return false;
}
virtual bool isSetKeyContextElement1()
{
return false;
}
virtual bool isSetKeyContextElement2()
{
return false;
}
virtual bool canBeStreamOperator()
{
return false;
}
virtual void finish() { };
virtual std::string getTypeName() = 0;
virtual ~StreamOperator()
{
LOG("StreamOperator::~StreamOperator()");
}
virtual std::shared_ptr<omnistream::TaskMetricGroup> GetMectrics()
{
LOG("Stream Operator GetMectrics")
return nullptr;
};
virtual void PrepareSnapshotPreBarrier(long checkpointId)
{
LOG("prepareSnapshotPreBarrier " << checkpointId);
};
* Called to draw a state snapshot from the operator.
*
* @return a runnable future to the state handle that points to the snapshotted state. For
* synchronous implementations, the runnable might already be finished.
* @throws Exception exception that happened during snapshotting.
*/
virtual OperatorSnapshotFutures *SnapshotState(long checkpointId, long timestamp,
CheckpointOptions *checkpointOptions, CheckpointStreamFactory* storageLocation,
const std::shared_ptr<OmniTaskBridge>& bridge)
{
LOG("checkpointId " << checkpointId << " timestamp " << timestamp);
return new OperatorSnapshotFutures();
}
virtual void notifyCheckpointComplete(long checkpointId)
{
LOG("notifyCheckpointComplete checkpointId " << checkpointId)
}
virtual void notifyCheckpointAborted(long checkpointId)
{
LOG("notifyCheckpointAborted checkpointId " << checkpointId)
}
virtual void notifyCheckpointSubsumed(long checkpointId)
{
LOG("notifyCheckpointSubsumed checkpointId " << checkpointId)
}
virtual void SetOperatorID(const std::string& operatorId_)
{
this->operatorId = TaskStateSnapshotDeserializer::HexStringToOperatorId<OperatorID>(operatorId_);
}
virtual OperatorID GetOperatorID()
{
return operatorId;
};
protected:
OperatorID operatorId;
};
#endif