* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_INTERNALTIMERSERVICESERIALIZATIONPROXY_H
#define OMNISTREAM_INTERNALTIMERSERVICESERIALIZATIONPROXY_H
#pragma once
#include <cstdint>
#include <array>
#include "runtime/state/KeyedStateCheckpointOutputStream.h"
#include "runtime/state/restore/RawKeyedStateInputStreamProxy.h"
template <typename K>
class InternalTimeServiceManager;
* C++ counterpart of Flink's InternalTimerServiceSerializationProxy.
*
* The proxy writes one key-group of all registered timer services into the raw
* keyed state stream. Version is intentionally aligned with Flink 1.16.x.
*/
template <typename K>
class InternalTimerServiceSerializationProxy {
public:
static constexpr int32_t VERSION = 2;
inline static constexpr std::array<uint8_t, 4> VERSIONED_IDENTIFIER = {0xF1, 0xCD, 0x85, 0x9F};
InternalTimerServiceSerializationProxy(InternalTimeServiceManager<K>* timerServicesManager, int32_t keyGroupIdx)
: timerServicesManager_(timerServicesManager),
keyGroupIdx_(keyGroupIdx)
{
}
void write(KeyedStateCheckpointOutputStream* out)
{
out->writeBytes(VERSIONED_IDENTIFIER.data(), VERSIONED_IDENTIFIER.size());
out->writeInt(VERSION);
timerServicesManager_->writeTimersForKeyGroup(out, keyGroupIdx_);
}
void read(RawKeyedStateInputStreamProxy* in)
{
for (uint8_t expected : VERSIONED_IDENTIFIER) {
uint8_t actual = in->readByte();
if (actual != expected) {
INFO_RELEASE(
"Error: read Invalid Flink timer service serialization header. expected byte="
<< static_cast<int>(expected) << ", actual byte=" << static_cast<int>(actual));
THROW_LOGIC_EXCEPTION(
"Invalid Flink timer service serialization header. expected byte="
<< static_cast<int>(expected) << ", actual byte=" << static_cast<int>(actual));
}
}
int32_t version = in->readInt();
if (version != VERSION) {
INFO_RELEASE("Error: read Unsupported timer service serialization version: " << version);
THROW_LOGIC_EXCEPTION("Unsupported timer service serialization version: " << version);
}
timerServicesManager_->readTimersForKeyGroup(in, keyGroupIdx_, version);
}
private:
InternalTimeServiceManager<K>* timerServicesManager_ = nullptr;
int32_t keyGroupIdx_ = -1;
};
#endif