* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include <memory>
#include <string>
#include <utility>
#include <nlohmann/json.hpp>
#include "StreamOperator.h"
#include "AbstractStreamOperator.h"
#include "StreamOperatorStateHandler.h"
#include "Output.h"
#include "StreamingRuntimeContext.h"
#include "StreamTaskStateInitializerImpl.h"
#include "ChainingStrategy.h"
#include "Input.h"
#include "table/runtime/operators/InternalTimerServiceImpl.h"
#include "table/runtime/operators/InternalTimeServiceManager.h"
#include "KeyContext.h"
#include "streaming/runtime/tasks/ProcessingTimeService.h"
#include "core/api/common/eventtime/IndexedCombinedWatermarkStatus.h"
#include "table/typeutils/RowDataSerializer.h"
#include "runtime/metrics/groups/TaskMetricGroup.h"
#include "streaming/runtime/tasks/omni/OmniStreamTask.h"
#include "runtime/state/StateInitializationContextImpl.h"
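* Common base for stream operators: holds the output, runtime context, state handler,
* timer service manager and combined watermark status used by concrete operators.
* K: the key type used for keyed state and timers, such as Object*.
*/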
template <typename K>
class AbstractStreamOperator : public StreamOperator,
                               public KeyContext<K>,
                               public StreamOperatorStateHandler<K>::CheckpointedStreamOperator {
public:
    // Stores the operator description JSON in `desc`.
    void setDescription(nlohmann::json description)
    {
        desc = std::move(description);
    }

    // All pointer members start as nullptr via their in-class initializers.
    AbstractStreamOperator() = default;

    explicit AbstractStreamOperator(Output* output) : output(output) {}

    // Deletes the state handler, runtime context and combined watermark status owned by this operator.
    // The output and processing time service are NOT deleted here.
    // NOTE(review): this class owns raw pointers but keeps the implicit copy constructor/assignment;
    // copying an operator after setup()/initializeState() would double-delete them.
    ~AbstractStreamOperator() override
    {
        LOG("AbstractStreamOperator::~AbstractStreamOperator()");
        delete stateHandler;
        delete runtimeContext;
        delete combinedWatermark;
    }

    // Allocates a new StreamingRuntimeContext and a combined watermark status tracking two inputs
    // (indices 0 and 1, fed by ProcessWatermark1 / ProcessWatermark2).
    // NOTE(review): calling this more than once leaks the previously allocated objects.
    void setup()
    {
        LOG("AbstractStreamOperator::setup()" << "new StreamingRuntimeContext<K>");
        this->runtimeContext = new StreamingRuntimeContext<K>(nullptr, nullptr);
        constexpr int inputsCount = 2;
        combinedWatermark = new omnistream::IndexedCombinedWatermarkStatus(inputsCount);
    }

    void SetOpName(std::string operatorName)
    {
        this->opName = std::move(operatorName);
    }

    std::string GetOpName()
    {
        return this->opName;
    }

    // Runs setup() and, when a task is supplied, adopts its environment's task metric group.
    void setup(std::shared_ptr<omnistream::OmniStreamTask> task)
    {
        this->setup();
        if (task != nullptr) {
            this->metrics = task->env()->taskMetricGroup();
        }
    }

    // Returns the metric group set by setup(task); empty if no task was given.
    std::shared_ptr<omnistream::TaskMetricGroup> GetMectrics() override
    {
        LOG("AbstractStreamOperator GetMectrics");
        return this->metrics;
    }

    void setOutput(Output* outputPtr)
    {
        this->output = outputPtr;
    }

    Output* GetOutput()
    {
        return output;
    }

    // Forwards to the state handler; requires initializeState() to have created it.
    void setCurrentKey(K key) override
    {
        stateHandler->setCurrentKey(key);
    }

    // Forwards to the state handler; requires initializeState() to have created it.
    K getCurrentKey() override
    {
        return stateHandler->getCurrentKey();
    }

    void open() override {}

    // Disposes the state handler if one exists; the handler object itself is deleted by the destructor.
    void close() override
    {
        if (stateHandler != nullptr) {
            stateHandler->dispose();
        }
    }

    // Returns a newly allocated BinaryRowDataSerializer(1) on every call; the caller owns the result.
    TypeSerializer* GetOperatorKeySerializer()
    {
        return new BinaryRowDataSerializer(1);
    }

    // No-op in the base class.
    void initializeState(StateInitializationContextImpl* context) override
    {
        static_cast<void>(context);
    }

    // Builds the operator state context from the initializer, creates the state handler, wires the
    // keyed state store and environment into the runtime context (when setup() created one), captures
    // the internal time service manager and finally initializes operator state.
    void initializeState(StreamTaskStateInitializerImpl* initializer, TypeSerializer* keySerializer) override
    {
        LOG("abstractStreamOperator::initializeState");
        auto operatorID = this->GetOperatorID();
        StreamOperatorStateContextImpl<K>* context =
            initializer->streamOperatorStateContext<K>(keySerializer, this, processingTimeService, &operatorID);
        stateHandler = new StreamOperatorStateHandler<K>(context);
        auto stateStore = stateHandler->getKeyedStateStore();
        if (runtimeContext != nullptr) {
            runtimeContext->setKeyedStateStore(stateStore);
            runtimeContext->setEnvironment(initializer->getEnvironment());
        }
        timeServiceManager = context->getInternalTimeServiceManager();
        stateHandler->initializeOperatorState(this);
    }

    StreamingRuntimeContext<K>* getRuntimeContext() const
    {
        return runtimeContext;
    }

    // Returns nullptr when initializeState() has not created a state handler yet.
    AbstractKeyedStateBackend<K>* getKeyedStateBackend() const
    {
        if (stateHandler == nullptr) {
            return nullptr;
        }
        return stateHandler->getKeyedStateBackend();
    }

    // Returns nullptr when initializeState() has not created a state handler yet.
    OperatorStateBackend* getOperatorStateBackend()
    {
        if (stateHandler == nullptr) {
            return nullptr;
        }
        return stateHandler->getOperatorStateBackend();
    }

    std::string getTypeName() override
    {
        std::string typeName = "AbstractStreamOperator";
        typeName.append(__PRETTY_FUNCTION__);
        return typeName;
    }

    // Obtains a timer service from the time service manager.
    // Throws (THROW_LOGIC_EXCEPTION) if initializeState() has not run or the operator has no keyed backend.
    template <typename N>
    InternalTimerServiceImpl<K, N>* getInternalTimerService(
        std::string name, TypeSerializer* namespaceSerializer, Triggerable<K, N>* triggerable)
    {
        if (timeServiceManager == nullptr) {
            THROW_LOGIC_EXCEPTION("The timer service has not been initialized");
        }
        AbstractKeyedStateBackend<K>* keyedStateBackend = getKeyedStateBackend();
        if (keyedStateBackend == nullptr) {
            THROW_LOGIC_EXCEPTION("Timers can only be used on keyed operators");
        }
        return timeServiceManager->template getInternalTimerService<N>(
            std::move(name), keyedStateBackend->getKeySerializer(), namespaceSerializer, triggerable);
    }

    // Advances timers (if a time service manager exists) and forwards the watermark downstream.
    virtual void ProcessWatermark(Watermark* mark)
    {
        if (timeServiceManager != nullptr) {
            timeServiceManager->advanceWatermark(mark);
        }
        output->emitWatermark(mark);
    }

    // Watermark from the first input (index 0); requires setup().
    void ProcessWatermark1(Watermark* mark)
    {
        ProcessWatermark(mark, 0);
    }

    // Watermark from the second input (index 1); requires setup().
    void ProcessWatermark2(Watermark* mark)
    {
        ProcessWatermark(mark, 1);
    }

    void processWatermarkStatus(WatermarkStatus* watermarkStatus)
    {
        output->emitWatermarkStatus(watermarkStatus);
    }

    void setProcessingTimeService(ProcessingTimeService* service)
    {
        processingTimeService = service;
    }

    ProcessingTimeService* getProcessingTimeService()
    {
        return processingTimeService;
    }

    // Delegates the snapshot to the state handler, passing this operator, its time service manager and name.
    OperatorSnapshotFutures* SnapshotState(
        long checkpointId,
        long timestamp,
        CheckpointOptions* checkpointOptions,
        CheckpointStreamFactory* storageLocation,
        const std::shared_ptr<OmniTaskBridge>& bridge) override
    {
        return stateHandler->SnapshotState(
            this,
            timeServiceManager,
            GetOpName(),
            checkpointId,
            timestamp,
            checkpointOptions,
            storageLocation,
            false,
            bridge);
    }

    void notifyCheckpointComplete(long checkpointId) override
    {
        stateHandler->notifyCheckpointComplete(checkpointId);
    }

    void notifyCheckpointAborted(long checkpointId) override
    {
        stateHandler->notifyCheckpointAborted(checkpointId);
    }

protected:
    StreamOperatorStateHandler<K>* stateHandler = nullptr;  // owned; created in initializeState()
    Output* output = nullptr;                                // not owned
    StreamingRuntimeContext<K>* runtimeContext = nullptr;   // owned; created in setup()
    ChainingStrategy chainingStrategy;
    nlohmann::json desc;
    InternalTimeServiceManager<K>* timeServiceManager = nullptr;  // obtained from the state context
    std::shared_ptr<omnistream::TaskMetricGroup> metrics;
    std::string opName;
    bool isStream = false;
    omnistream::IndexedCombinedWatermarkStatus* combinedWatermark = nullptr;  // owned; created in setup()

private:
    ProcessingTimeService* processingTimeService = nullptr;

    // Records the watermark for input `index`; when the combined (cross-input) watermark changes,
    // processes the combined value through the virtual single-argument ProcessWatermark.
    void ProcessWatermark(Watermark* mark, int index)
    {
        LOG(">>>>>>>>>>");
        if (combinedWatermark == nullptr) {
            // setup() allocates the combined status; without it the update below would dereference null.
            THROW_LOGIC_EXCEPTION("The combined watermark status has not been initialized");
        }
        if (combinedWatermark->UpdateWatermark(index, mark->getTimestamp())) {
            Watermark watermark(combinedWatermark->GetCombinedWatermark());
            this->ProcessWatermark(&watermark);
        }
    }
};