* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef FLINK_TNEL_OPERATOREVENT_H
#define FLINK_TNEL_OPERATOREVENT_H
#include "io/SimpleVersionedSerializer.h"
class OperatorEvent {
public:
virtual ~OperatorEvent() = default;
virtual std::string toString() = 0;
};
class WatermarkAlignmentEvent : public OperatorEvent {
public:
WatermarkAlignmentEvent(long maxWatermark) : maxWatermark(maxWatermark) {}
long getMaxWatermark()
{
return maxWatermark;
}
std::string toString() override
{
nlohmann::json j;
j["maxWatermark"] = maxWatermark;
return j.dump();
}
private:
long maxWatermark;
};
template <typename E>
class AddSplitEvent : public OperatorEvent {
public:
AddSplitEvent(int serializerVersion, std::vector<std::vector<uint8_t>> splitsVec)
: serializerVersion(serializerVersion), splitsVec(std::move(splitsVec)) {}
AddSplitEvent(std::vector<E> splits, SimpleVersionedSerializer<E>* splitSerializer)
{
splitsVec.reserve(splits.size());
serializerVersion = splitSerializer->getVersion();
for (const auto& split : splits) {
splitsVec.push_back(splitSerializer->serialize(split));
}
}
std::vector<E*> splits(SimpleVersionedSerializer<E>* splitSerializer) const
{
std::vector<E*> result;
result.reserve(splitsVec.size());
for (auto serializedSplit : splitsVec) {
result.push_back(splitSerializer->deserialize(serializerVersion, serializedSplit));
}
return result;
}
std::string toString() override
{
nlohmann::json j;
j["event"] = "AddSplitEvent";
j["serializerVersion"] = serializerVersion;
return j.dump();
}
private:
int serializerVersion;
std::vector<std::vector<uint8_t>> splitsVec;
};
class SourceEvent {
public:
virtual ~SourceEvent() = default;
};
class NoMoreSplitsEvent : public OperatorEvent {
std::string toString() override
{
nlohmann::json j;
j["event"] = "NoMoreSplitsEvent";
return j.dump();
}
};
#endif