* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OPERATORCHAIN_V2_H
#define OPERATORCHAIN_V2_H
#include <executiongraph/StreamConfigPOD.h>
#include <executiongraph/StreamEdgePOD.h>
#include <executiongraph/operatorchain/OperatorChainPOD.h>
#include <streaming/runtime/io/RecordWriterOutputV2.h>
#include <streaming/runtime/io/RecordWriterOutput.h>
#include <streaming/api/operators/StreamOperatorFactory.h>
#include <streaming/api/operators/StreamOperator.h>
#include "StreamOperatorWrapper.h"
#include <typeinfo/TypeInformation.h>
#include "runtime/io/network/api/writer/V2/RecordWriterDelegateV2.h"
#include "runtime/io/network/api/writer/RecordWriterDelegate.h"
#include "CopyingBroadcastingOutputCollector.h"
#include "taskmanager/OmniRuntimeEnvironment.h"
#include "mailbox/StreamTaskActionExecutor.h"
#include "runtime/operators/TableOperatorConstants.h"
#include "runtime/operators/coordination/OperatorEventHandler.h"
#include "streaming/api/operators/OperatorSnapshotFutures.h"
#include "runtime/checkpoint/CheckpointMetaData.h"
#include "runtime/checkpoint/CheckpointOptions.h"
#include "runtime/checkpoint/channel/ChannelStateWriter.h"
#include "runtime/state/CheckpointStreamFactory.h"
#include "runtime/event/AbstractEvent.h"
#include "runtime/jobgraph/OperatorID.h"
#include "core/utils/function/Supplier.h"
#include "runtime/operators/coordination/AcknowledgeCheckpointEvent.h"
#include "runtime/operators/coordination/OperatorEventDispatcher.h"
namespace omnistream {
class OmniStreamTask;
class OperatorChainV2 {
public:
OperatorChainV2(std::weak_ptr<OmniStreamTask> containingTask,
std::shared_ptr<RecordWriterDelegateV2> recordWriterDelegate);
OperatorChainV2(std::shared_ptr<RuntimeEnvironmentV2> env,
std::shared_ptr<omnistream::datastream::RecordWriterDelegate> recordWriterDelegate)
{
TaskInformationPOD taskConfiguration = env->taskConfiguration();
auto configuration = taskConfiguration.getStreamConfigPOD();
auto outputsInOrder = configuration.getOutEdgesInOrder();
auto chainedConfigs = taskConfiguration.getChainedConfigMap();
std::unordered_map<int, datastream::RecordWriterOutput *> recordWriterOutputs;
createDataStreamChainOutputs(outputsInOrder, chainedConfigs, recordWriterDelegate, recordWriterOutputs);
std::vector<StreamOperatorWrapper *> allOperatorWrappers;
this->mainOperatorOutput = createDataStreamOutputCollector(configuration, chainedConfigs, recordWriterOutputs, allOperatorWrappers);
auto opDesc = configuration.getOperatorDescription();
auto chainedOperator = StreamOperatorFactory::createOperatorAndCollector(opDesc, mainOperatorOutput, nullptr);
auto operatorWrapper = new StreamOperatorWrapper(chainedOperator, false);
this->mainOperatorWrapper = operatorWrapper;
allOperatorWrappers.emplace_back(operatorWrapper);
if (allOperatorWrappers.size() != 0) {
this->tailOperatorWrapper = allOperatorWrappers[0];
} else {
this->tailOperatorWrapper = mainOperatorWrapper;
}
linkOperatorWrappers(allOperatorWrappers);
operatorDependenciesDeal();
operatorEventDispatcher = new OperatorEventDispatcherImpl(
env->getOperatorCoordinatorEventGateway());
}
OperatorChainV2(std::shared_ptr<RuntimeEnvironmentV2> env, std::shared_ptr<RecordWriterDelegateV2> recordWriterDelegate)
{
TaskInformationPOD taskConfiguration = env->taskConfiguration();
auto configuration = taskConfiguration.getStreamConfigPOD();
auto outputsInOrder = configuration.getOutEdgesInOrder();
auto chainedConfigs = taskConfiguration.getChainedConfigMap();
std::unordered_map<int, RecordWriterOutputV2*> recordWriterOutputs;
createChainOutputs(outputsInOrder, chainedConfigs, recordWriterDelegate, recordWriterOutputs);
std::vector<StreamOperatorWrapper *> allOperatorWrappers;
this->mainOperatorOutput = createOutputCollector(nullptr, configuration,
chainedConfigs, recordWriterOutputs, allOperatorWrappers);
auto opDesc = configuration.getOperatorDescription();
auto chainedOperator = StreamOperatorFactory::createOperatorAndCollector(opDesc, mainOperatorOutput, nullptr);
registerHandler(opDesc, chainedOperator);
auto operatorWrapper = new StreamOperatorWrapper(chainedOperator, false);
this->mainOperatorWrapper = operatorWrapper;
allOperatorWrappers.emplace_back(operatorWrapper);
if (!allOperatorWrappers.empty()) {
this->tailOperatorWrapper = allOperatorWrappers[0];
} else {
this->tailOperatorWrapper = mainOperatorWrapper;
}
linkOperatorWrappers(allOperatorWrappers);
}
~OperatorChainV2()
{
delete mainOperatorWrapper;
mainOperatorWrapper = nullptr;
tailOperatorWrapper = nullptr;
}
void finishOperators(StreamTaskActionExecutor *actionExecutor);
StreamOperator *createMainOperatorAndCollector(OperatorChainPOD &opChainConfig,
RecordWriterOutputV2 *chainOutput);
void initializeStateAndOpenOperators(StreamTaskStateInitializerImpl *initializer, const TaskInformationPOD& taskConfiguration_);
[[nodiscard]] StreamOperator *getMainOperator() const
{
return mainOperatorWrapper->getStreamOperator();
}
[[nodiscard]] WatermarkGaugeExposingOutput *GetMainOperatorOutput() const
{
return mainOperatorOutput;
}
bool IsClosed() const
{
return isClosed_;
}
virtual void CloseAllOperators()
{
isClosed_ = true;
}
void AlignedBarrierTimeout(long checkpointId)
{
}
void broadcastEvent(std::shared_ptr<AbstractEvent> event, bool isPriorityEvent)
{
LOG_DEBUG("broadcastEvent")
for (RecordWriterOutputV2* streamOutput : streamOutputs) {
streamOutput->broadcastEvent(event, isPriorityEvent);
}
};
void broadcastEvent(std::shared_ptr<AbstractEvent> event)
{
broadcastEvent(event, false);
};
const std::vector<RecordWriterOutputV2*>& getStreamOutputs() const
{
return streamOutputs;
}
void DispatchOperatorEvent(const std::string& operatorIdString, const std::string& eventString);
void PrepareSnapshotPreBarrier(long checkpointId);
void NotifyCheckpointComplete(long checkpointId);
void NotifyCheckpointAborted(long checkpointId);
void NotifyCheckpointSubsumed(long checkpointId);
void SnapshotState(std::unordered_map<OperatorID, OperatorSnapshotFutures *> *operatorSnapshotsInProgress,
CheckpointMetaData &checkpointMetaData, CheckpointOptions *checkpointOptions, std::shared_ptr<Supplier<bool>> isRunning,
std::shared_ptr<ChannelStateWriter::ChannelStateWriteResult> channelStateWriteResult, CheckpointStreamFactory* storage,
const std::shared_ptr<OmniTaskBridge>& bridge);
bool IsTaskDeployedAsFinished()
{
return false;
}
protected:
bool isClosed_ = false;
WatermarkGaugeExposingOutput *mainOperatorOutput;
std::unordered_map<std::string, OperatorEventHandler *> handlers;
StreamOperatorWrapper *mainOperatorWrapper;
std::vector<RecordWriterOutputV2*> streamOutputs;
StreamOperatorWrapper *tailOperatorWrapper;
OperatorEventDispatcherImpl* operatorEventDispatcher;
void SnapshotChannelStates(StreamOperator* op, std::shared_ptr<ChannelStateWriter::ChannelStateWriteResult> channelStateWriteResult,
OperatorSnapshotFutures* snapshotInProgress);
void SendAcknowledgeCheckpointEvent(long checkpointId);
ReadIterator getAllOperators(bool reverse) {
return reverse
? ReadIterator(tailOperatorWrapper, true)
: ReadIterator(mainOperatorWrapper, false);
}
void operatorDependenciesDeal();
private:
void createChainOutputs(std::vector<StreamEdgePOD>& outputsInOrder,
std::unordered_map<int, StreamConfigPOD>& chainedConfigs,
std::shared_ptr<RecordWriterDelegateV2> recordWriterDelegate,
std::unordered_map<int, RecordWriterOutputV2*>& recordWriterOutputs);
RecordWriterOutputV2* createStreamOutput(RecordWriterV2 *recordWriter, TypeInformation &typeInformation, const NonChainedOutputPOD &streamOutput);
void createDataStreamChainOutputs(std::vector<StreamEdgePOD>& outputsInOrder, std::unordered_map<int, StreamConfigPOD>& chainedConfigs,
std::shared_ptr<datastream::RecordWriterDelegate> recordWriterDelegate, std::unordered_map<int, datastream::RecordWriterOutput*>& recordWriterOutputs);
datastream::RecordWriterOutput* createDataStreamStreamOutput(datastream::RecordWriter* recordWriter, TypeInformation& typeInformation);
TypeInformation* getChainOutputType(OperatorPOD operatorPod);
TypeInformation* getDataStreamStateKeyType(OperatorPOD operatorPod);
TypeInformation* getDataStreamChainOutputType(OperatorPOD operatorPod);
static WatermarkGaugeExposingOutput* wrapOperatorIntoOutput(StreamOperator* op, omnistream::OperatorPOD &opConfig);
WatermarkGaugeExposingOutput *createOperatorChain(
const std::shared_ptr<OmniStreamTask> &streamTask,
StreamConfigPOD *operatorConfig,
std::unordered_map<int, StreamConfigPOD> &chainedConfigs,
std::unordered_map<int, RecordWriterOutputV2 *> &recordWriterOutputs,
std::vector<StreamOperatorWrapper *> &allOperatorWrappers);
WatermarkGaugeExposingOutput *createOutputCollector(
const std::shared_ptr<OmniStreamTask> &streamTask, StreamConfigPOD &operatorConfig,
std::unordered_map<int, StreamConfigPOD> &chainedConfigs,
std::unordered_map<int, RecordWriterOutputV2 *> &recordWriterOutputs,
std::vector<StreamOperatorWrapper *> &allOperatorWrappers);
static void linkOperatorWrappers(std::vector<StreamOperatorWrapper *> &allOperatorWrappers);
WatermarkGaugeExposingOutput *createDataStreamOutputCollector(StreamConfigPOD &operatorConfig,
std::unordered_map<int, StreamConfigPOD> &chainedConfigs,
std::unordered_map<int, datastream::RecordWriterOutput *> &recordWriterOutputs,
std::vector<StreamOperatorWrapper *> &allOperatorWrappers);
WatermarkGaugeExposingOutput *createDataStreamOperatorChain(StreamConfigPOD &operatorConfig,
std::unordered_map<int, StreamConfigPOD> &chainedConfigs,
std::unordered_map<int, datastream::RecordWriterOutput *> &recordWriterOutputs,
std::vector<StreamOperatorWrapper *> &allOperatorWrappers);
void registerHandler(OperatorPOD &opDesc, StreamOperator *streamOperator)
{
auto id = opDesc.getId();
if (id == OPERATOR_NAME_SOURCE_OPERATOR) {
if (auto *handler = dynamic_cast<OperatorEventHandler *>(streamOperator)) {
handlers.emplace(opDesc.getOperatorId(), handler);
} else {
LOG("Unsupported source operator of csv");
}
}
}
OperatorSnapshotFutures *BuildOperatorSnapshotFutures(
CheckpointMetaData checkpointMetaData,
CheckpointOptions *checkpointOptions,
StreamOperator *op,
std::shared_ptr<Supplier<bool>> isRunning,
std::shared_ptr<ChannelStateWriter::ChannelStateWriteResult> channelStateWriteResult,
CheckpointStreamFactory *storage,
const std::shared_ptr<OmniTaskBridge>& bridge);
OperatorSnapshotFutures *CheckpointStreamOperator(
StreamOperator *op,
CheckpointMetaData checkpointMetaData,
CheckpointOptions *checkpointOptions,
CheckpointStreamFactory *storageLocation,
std::shared_ptr<Supplier<bool>> isRunning,
const std::shared_ptr<OmniTaskBridge>& bridge);
};
};
#endif