/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_HEAPSNAPSHOTSTRATEGY_H
#define OMNISTREAM_HEAPSNAPSHOTSTRATEGY_H
#include <memory>
#include <string>
#include <utility>
#include "runtime/state/SnapshotStrategy.h"
#include "runtime/state/KeyedStateHandle.h"
#include "runtime/state/FullSnapshotResources.h"
#include "runtime/state/FullSnapshotAsyncWriter.h"
#include "runtime/checkpoint/SavepointType.h"
#include "runtime/checkpoint/CheckpointType.h"
#include "runtime/state/heap/HeapSnapshotResourceFactory.h"
#include "common.h"
/**
* Checkpoint snapshot strategy for the Heap state backend.
*
* Uses the same FullSnapshotAsyncWriter as the RocksDB full/savepoint strategies,
* but gets its data from HeapFullSnapshotResources instead of RocksDB snapshots.
*
* The strategy is long-lived and owns the sync/async split of the heap snapshot
* pipeline: syncPrepareResources() freezes a point-in-time resource view through
* HeapSnapshotResourceFactory, and asyncSnapshot() wraps it in a FullSnapshotAsyncWriter.
*
* Why SavepointType (canonical) is used here even for regular checkpoints:
* FullSnapshotAsyncWriter always emits the canonical savepoint binary format
* (END_OF_KEY_GROUP_MASK / FIRST_BIT_IN_BYTE_MASK framing). Flink's
* HeapKeyedStateBackendBuilder routes restore via HeapSavepointRestoreOperation
* (-> FullSnapshotRestoreOperation, which understands that format) only when
* the handle implements SavepointKeyedStateHandle. Tagging the produced handle
* with a SavepointType makes FullSnapshotAsyncWriter wrap the result in a
* KeyGroupsSavepointStateHandle, so a Flink-native restore from an OmniStream
* heap checkpoint dispatches to the right reader instead of HeapRestoreOperation
* (which expects the heap-specific per-key-group [int keyGroupId][short stateId]...
* framing and would otherwise throw "Unexpected key-group in restore").
*/
template <typename K>
class HeapSnapshotStrategy : public SnapshotStrategy<KeyedStateHandle, FullSnapshotResources> {
public:
    /**
     * @param snapshotResourceFactory factory that syncPrepareResources() asks for the
     *        per-checkpoint resource view; the strategy keeps a shared reference to it.
     */
    explicit HeapSnapshotStrategy(const std::shared_ptr<HeapSnapshotResourceFactory<K>>& snapshotResourceFactory)
        : snapshotResourceFactory_(snapshotResourceFactory)
    {
    }

    /**
     * Synchronous phase of the snapshot: delegates to the factory to build the
     * resources for this checkpoint.
     *
     * @param checkpointId id of the checkpoint being taken; forwarded to the factory.
     * @return the resources returned by HeapSnapshotResourceFactory::createSnapshotResources.
     */
    std::shared_ptr<FullSnapshotResources> syncPrepareResources(long checkpointId) override
    {
        return snapshotResourceFactory_->createSnapshotResources(checkpointId);
    }

    /**
     * Asynchronous phase of the snapshot: turns the prepared resources into a result supplier.
     *
     * If the resources carry no state meta-info snapshots, this logs and returns a supplier
     * whose get() yields SnapshotResult<KeyedStateHandle>::Empty(). Otherwise it returns a
     * FullSnapshotAsyncWriter tagged with a canonical savepoint type.
     *
     * @param snapshotResources resources from syncPrepareResources(); dereferenced without
     *        a null check, so callers must pass a non-null pointer.
     * @param checkpointId id of the checkpoint being written.
     * @param timestamp unused by this strategy.
     * @param streamFactory unused by this strategy.
     * @param checkpointOptions forwarded to FullSnapshotAsyncWriter.
     * @param keySerializer forwarded (moved) to FullSnapshotAsyncWriter.
     * @return a supplier that produces the keyed state snapshot result.
     */
    std::shared_ptr<SnapshotResultSupplier<KeyedStateHandle>> asyncSnapshot(
        const std::shared_ptr<FullSnapshotResources>& snapshotResources,
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory* streamFactory,
        CheckpointOptions* checkpointOptions,
        std::string keySerializer = "") override
    {
        // Required by the SnapshotStrategy interface but not needed by this strategy.
        static_cast<void>(timestamp);
        static_cast<void>(streamFactory);

        if (snapshotResources->getMetaInfoSnapshots().empty()) {
            INFO_RELEASE("Error:HeapSnapshotStrategy: no states to snapshot, returning empty");
            // Supplier used when no state is registered: always yields an empty result.
            struct EmptySnapshotResultSupplier final : public SnapshotResultSupplier<KeyedStateHandle> {
                std::shared_ptr<SnapshotResult<KeyedStateHandle>> get(
                    std::shared_ptr<omnistream::OmniTaskBridge> /* bridge */) override
                {
                    return SnapshotResult<KeyedStateHandle>::Empty();
                }
            };
            return std::make_shared<EmptySnapshotResultSupplier>();
        }

        // A canonical savepoint type is passed even for regular checkpoints so that the
        // produced handle is routed to the savepoint-format restore path (rationale in the
        // comment above this class). keySerializer is a by-value sink, so move it instead
        // of copying it a second time.
        return std::make_shared<FullSnapshotAsyncWriter>(
            SavepointType::savepoint(SavepointFormatType::CANONICAL),
            checkpointOptions,
            checkpointId,
            snapshotResources,
            std::move(keySerializer));
    }

private:
    // Factory that builds the point-in-time resource view for each checkpoint.
    std::shared_ptr<HeapSnapshotResourceFactory<K>> snapshotResourceFactory_;
};
#endif