* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_ROCKSINCREMENTALSNAPSHOTSTRATEGY_H
#define OMNISTREAM_ROCKSINCREMENTALSNAPSHOTSTRATEGY_H
#include <filesystem>
#include <algorithm>
#include <map>
#include <vector>
#include <mutex>
#include <atomic>
#include <memory>
#include "RocksDBSnapshotStrategyBase.h"
#include "runtime/state/rocksdb/RocksDBStateUploader.h"
#include "fs/CloseableRegistry.h"
#include "state/IncrementalRemoteKeyedStateHandle.h"
#include "checkpoint/CheckpointOptions.h"
#include "runtime/state/SnapshotResult.h"
#include "runtime/state/SnapshotStrategy.h"
#include "runtime/state/rocksdb/util/ResourceGuard.h"
using namespace std::filesystem;
using HandleAndLocalPath = IncrementalRemoteKeyedStateHandle::HandleAndLocalPath;
class RocksIncrementalSnapshotStrategy : public RocksDBSnapshotStrategyBase {
public:
RocksIncrementalSnapshotStrategy(
rocksdb::DB* db,
std::shared_ptr<ResourceGuard> rocksDBResourceGuard,
std::shared_ptr<TypeSerializer> keySerializer,
std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>> *kvStateInformation,
KeyGroupRange keyGroupRange,
int keyGroupPrefixBytes,
std::shared_ptr<LocalRecoveryConfig> localRecoveryConfig,
const std::string& instanceBasePath,
UUID backendUID,
const std::map<long, std::vector<HandleAndLocalPath>>& uploadedStateHandles,
std::shared_ptr<RocksDBStateUploader> rocksDBStateUploader,
long lastCompletedCheckpointId
);
std::shared_ptr<SnapshotResources> syncPrepareResources(long checkpointId)
{
return RocksDBSnapshotStrategyBase::syncPrepareResources(checkpointId);
};
std::shared_ptr<SnapshotResultSupplier<KeyedStateHandle>> asyncSnapshot(
const std::shared_ptr<SnapshotResources>& snapshotResources,
long checkpointId,
long timestamp,
CheckpointStreamFactory* checkpointStreamFactory,
CheckpointOptions* checkpointOptions,
std::string keySerializer = "") override;
void notifyCheckpointComplete(long completedCheckpointId);
void notifyCheckpointAborted(long abortedCheckpointId);
void close();
protected:
std::shared_ptr<PreviousSnapshot> snapshotMetaData(
long checkpointId,
std::vector<std::shared_ptr<StateMetaInfoSnapshot>>& stateMetaInfoSnapshots) override;
class RocksDBIncrementalSnapshotOperation : public RocksDBSnapshotStrategyBase::RocksDBSnapshotOperation {
public:
RocksDBIncrementalSnapshotOperation(
RocksIncrementalSnapshotStrategy* parent,
long checkpointId,
CheckpointStreamFactory* checkpointStreamFactory,
std::shared_ptr<SnapshotDirectory> localBackupDirectory,
std::shared_ptr<PreviousSnapshot> previousSnapshot,
SnapshotType::SharingFilesStrategy sharingFilesStrategy,
std::vector<std::shared_ptr<StateMetaInfoSnapshot>>& stateMetaInfoSnapshots,
CheckpointOptions *checkpointOptions,
std::shared_ptr<TypeSerializer> keySerializer);
std::shared_ptr<SnapshotResult<KeyedStateHandle>> get(std::shared_ptr<omnistream::OmniTaskBridge> bridge) override;
private:
long uploadSnapshotFiles(
std::vector<HandleAndLocalPath>& sstFiles,
std::vector<HandleAndLocalPath>& miscFiles,
std::shared_ptr<omnistream::OmniTaskBridge> bridge);
void createUploadFilePaths(
const std::vector<path>& files,
std::vector<HandleAndLocalPath>& sstFiles,
std::vector<fs::path>& sstFilePaths,
std::vector<fs::path>& miscFilePaths);
RocksIncrementalSnapshotStrategy* parent_;
std::shared_ptr<PreviousSnapshot> previousSnapshot_;
SnapshotType::SharingFilesStrategy sharingFilesStrategy_;
CheckpointOptions *checkpointOptions_;
};
private:
std::mutex uploadedSstFilesMutex_;
std::map<long, std::vector<HandleAndLocalPath>> uploadedSstFiles_;
std::atomic<long> lastCompletedCheckpointId_;
std::shared_ptr<RocksDBStateUploader> stateUploader_;
};
#endif