/*
* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_ROCKSDBSNAPSHOTSTRATEGYBASE_H
#define OMNISTREAM_ROCKSDBSNAPSHOTSTRATEGYBASE_H
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <jni.h>
#include <rocksdb/db.h>
#include <rocksdb/utilities/checkpoint.h>
#include "runtime/state/UUID.h"
#include "typeutils/TypeSerializer.h"
#include "runtime/state/RocksDbKvStateInfo.h"
#include "runtime/state/LocalRecoveryConfig.h"
#include "runtime/state/metainfo/StateMetaInfoSnapshot.h"
#include "state/SnapshotDirectory.h"
#include "state/SnapshotDirectoryFactory.h"
#include "state/LocalRecoveryDirectoryProviderImpl.h"
#include "state/IncrementalKeyedStateHandle.h"
#include "runtime/state/rocksdb/util/ResourceGuard.h"
#include "fs/CloseableRegistry.h"
#include "runtime/state/PlaceholderStreamStateHandle.h"
#include "runtime/state/KeyGroupRange.h"
#include "runtime/state/CheckpointStreamFactory.h"
#include "runtime/state/SnapshotResult.h"
#include "runtime/checkpoint/CheckpointOptions.h"
#include "state/IncrementalRemoteKeyedStateHandle.h"
#include "state/IncrementalLocalKeyedStateHandle.h"
#include "runtime/state/SnapshotStrategyRunner.h"
#include "runtime/state/SnapshotStrategy.h"
#include "runtime/checkpoint/CheckpointListener.h"
// File-wide shorthand for IncrementalRemoteKeyedStateHandle::HandleAndLocalPath.
// NOTE(review): the alias lives at global scope, so every file that includes
// this header sees it as well.
using HandleAndLocalPath = IncrementalRemoteKeyedStateHandle::HandleAndLocalPath;
// Forward declarations; both classes are fully defined further down in this
// header. PreviousSnapshot is needed early because RocksDBSnapshotStrategyBase
// refers to it before that definition.
class PreviousSnapshot;
class NativeRocksDBSnapshotResources;
/**
 * Shared base class for the RocksDB snapshot strategies.
 *
 * Combines the CheckpointListener interface with
 * SnapshotStrategy<KeyedStateHandle, SnapshotResources>. Concrete strategies
 * have to implement asyncSnapshot() and snapshotMetaData(). Apart from
 * materializeMetaData(), the member functions are only declared here; their
 * definitions are not part of this header.
 */
class RocksDBSnapshotStrategyBase
    : public CheckpointListener,
      public SnapshotStrategy<KeyedStateHandle, SnapshotResources> {
public:
    /**
     * Declared here, defined outside this header.
     *
     * NOTE(review): each parameter has a same-named `..._` data member below
     * and is presumably stored there; confirm against the definition.
     * `db` and `kvStateInformation` are raw pointers and this header does
     * not show who owns them.
     */
    RocksDBSnapshotStrategyBase(
        std::string description,
        rocksdb::DB* db,
        std::shared_ptr<ResourceGuard> rocksDBResourceGuard,
        std::shared_ptr<TypeSerializer> keySerializer,
        std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>> *kvStateInformation,
        KeyGroupRange keyGroupRange,
        int keyGroupPrefixBytes,
        std::shared_ptr<LocalRecoveryConfig> localRecoveryConfig,
        std::string instanceBasePath,
        UUID backendUID
    );

    /** Returns a description string (presumably description_ - confirm in the definition). */
    std::string getDescription() const;

    /**
     * Produces the SnapshotResources for the given checkpoint.
     *
     * @param checkpointId Identifier of the checkpoint being prepared.
     */
    std::shared_ptr<SnapshotResources> syncPrepareResources(long checkpointId);

    /**
     * Cleanup hook for a snapshot that did not complete.
     *
     * @param localBackupDirectory Directory belonging to the incomplete snapshot.
     */
    void cleanupIncompleteSnapshot(
        std::shared_ptr<SnapshotDirectory> localBackupDirectory
    );

    /**
     * Pure virtual: builds the supplier that yields the snapshot result.
     *
     * `keySerializer` defaults to an empty string. Default arguments are
     * selected from the static type used at the call site, so overriding
     * declarations should repeat the same default to avoid surprises.
     */
    virtual std::shared_ptr<SnapshotResultSupplier<KeyedStateHandle>> asyncSnapshot(
        const std::shared_ptr<SnapshotResources>& snapshotResources,
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory* checkpointStreamFactory,
        CheckpointOptions* checkpointOptions,
        std::string keySerializer = "") = 0;

protected:
    /**
     * Pure virtual: yields the PreviousSnapshot to use for `checkpointId`.
     *
     * @param stateMetaInfoSnapshots Passed by non-const reference.
     *        NOTE(review): presumably filled in by the implementation - confirm.
     */
    virtual std::shared_ptr<PreviousSnapshot> snapshotMetaData(
        long checkpointId,
        std::vector<std::shared_ptr<StateMetaInfoSnapshot>>& stateMetaInfoSnapshots
    ) = 0;

    /**
     * Delegates meta data materialization to the task bridge.
     *
     * Forwards the checkpoint id, the meta info snapshots, this strategy's
     * localRecoveryConfig_, the checkpoint options and the key serializer
     * string to OmniTaskBridge::CallMaterializeMetaData and returns its result.
     *
     * @param bridge Must not be null; it is dereferenced without a check.
     */
    std::shared_ptr<SnapshotResult<StreamStateHandle>> materializeMetaData(
        std::vector<std::shared_ptr<StateMetaInfoSnapshot>>& stateMetaInfoSnapshots,
        long checkpointId,
        CheckpointOptions *checkpointOptions,
        std::shared_ptr<omnistream::OmniTaskBridge> bridge,
        std::string keySerializer = "")
    {
        return bridge->CallMaterializeMetaData(
            checkpointId,
            stateMetaInfoSnapshots,
            localRecoveryConfig_,
            checkpointOptions,
            keySerializer);
    }

    /** Declared only; returns the SnapshotDirectory to use for `checkpointId`. */
    std::shared_ptr<SnapshotDirectory> prepareLocalSnapshotDirectory(long checkpointId);

    /** Declared only; takes the native DB checkpoint targeting `outputDirectory`. */
    void takeDBNativeCheckpoint(std::shared_ptr<SnapshotDirectory> outputDirectory);

    // --- State shared with subclasses -------------------------------------
    // The declaration order below is also the initialization order; keep it
    // in sync with the constructor definition.
    std::string description_;
    rocksdb::DB* db_;  // raw pointer; ownership is not visible in this header
    std::shared_ptr<ResourceGuard> rocksDBResourceGuard_;
    // Raw pointer to a map of RocksDbKvStateInfo keyed by string; ownership
    // is not visible in this header.
    std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>> *kvStateInformation_;
    KeyGroupRange keyGroupRange_;
    int keyGroupPrefixBytes_;
    std::shared_ptr<LocalRecoveryConfig> localRecoveryConfig_;
    std::string instanceBasePath_;
    std::string localDirectoryName_;  // no matching constructor parameter
    UUID backendUID_;
    std::shared_ptr<TypeSerializer> keySerializer_;

    // Declared while PreviousSnapshot is still an incomplete type, which is
    // permitted for a static data member declaration.
    // NOTE(review): PreviousSnapshot declares a shared_ptr constant with the
    // same name; confirm that this one is still required.
    static const PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT;

    /**
     * Base for the snapshot operations handed out by the strategies.
     *
     * Implements the SnapshotResultSupplier<KeyedStateHandle> interface and
     * carries the per-checkpoint inputs as protected members.
     */
    class RocksDBSnapshotOperation : public SnapshotResultSupplier<KeyedStateHandle> {
    public:
        /** Declared here, defined outside this header. */
        RocksDBSnapshotOperation(
            long checkpointId,
            CheckpointStreamFactory* checkpointStreamFactory,
            std::shared_ptr<SnapshotDirectory> localBackupDirectory,
            std::vector<std::shared_ptr<StateMetaInfoSnapshot>> stateMetaInfoSnapshots,
            std::shared_ptr<TypeSerializer> keySerializer);

        virtual ~RocksDBSnapshotOperation() = default;

        /** Overrides SnapshotResultSupplier::get; defined outside this header. */
        std::shared_ptr<SnapshotResult<KeyedStateHandle>> get(
            std::shared_ptr<omnistream::OmniTaskBridge> bridge) override;

    protected:
        /**
         * Declared only; returns a KeyedStateHandle for the local snapshot.
         *
         * @param parent                 Strategy the operation belongs to (raw pointer).
         * @param localStreamStateHandle Stream state handle for the local snapshot.
         * @param sharedState            Handle/local-path entries passed by value.
         */
        std::shared_ptr<KeyedStateHandle> getLocalSnapshot(
            RocksDBSnapshotStrategyBase* parent,
            std::shared_ptr<StreamStateHandle> localStreamStateHandle,
            std::vector<HandleAndLocalPath> sharedState);

        // Member order differs from the constructor parameter order; members
        // are initialized in the order declared here.
        long checkpointId;
        CheckpointStreamFactory* checkpointStreamFactory;
        std::vector<std::shared_ptr<StateMetaInfoSnapshot>> stateMetaInfoSnapshots;
        std::shared_ptr<SnapshotDirectory> localBackupDirectory;
        std::shared_ptr<CloseableRegistry> tmpResourcesRegistry;  // no matching constructor parameter
        std::shared_ptr<TypeSerializer> keySerializer;
    };
};
/**
 * Lookup from string keys to StreamStateHandle pointers for an earlier snapshot.
 *
 * NOTE(review): judging by the names, the map is keyed by SST file name and
 * populated from the constructor argument; the definitions are not in this
 * header, so confirm there.
 */
class PreviousSnapshot {
public:
    /** Declared here, defined outside this header. */
    explicit PreviousSnapshot(std::vector<HandleAndLocalPath> confirmedSstFiles);

    /**
     * Looks up the stream state handle recorded for `filename`.
     *
     * NOTE(review): behaviour for an unknown file name is not visible here.
     */
    std::shared_ptr<StreamStateHandle> getUploaded(const std::string& filename);

    /** Shared constant instance; its definition is not part of this header. */
    static const std::shared_ptr<PreviousSnapshot> EMPTY_PREVIOUS_SNAPSHOT;

private:
    std::map<std::string, std::shared_ptr<StreamStateHandle>> confirmedSstFiles_;
};
/**
 * SnapshotResources implementation bundling what a native RocksDB snapshot
 * carries: the snapshot directory, the previous snapshot and the state meta
 * info snapshots. All data members are public.
 */
class NativeRocksDBSnapshotResources : public SnapshotResources {
public:
    /** Declared here, defined outside this header. */
    NativeRocksDBSnapshotResources(
        std::shared_ptr<SnapshotDirectory> snapshotDirectory,
        std::shared_ptr<PreviousSnapshot> previousSnapshot,
        std::vector<std::shared_ptr<StateMetaInfoSnapshot>> stateMetaInfoSnapshots);

    /** Declared only; defined outside this header. */
    void release();

    /** Intentionally empty. */
    void cleanup() {}

    std::shared_ptr<SnapshotDirectory> snapshotDirectory;
    std::shared_ptr<PreviousSnapshot> previousSnapshot;
    std::vector<std::shared_ptr<StateMetaInfoSnapshot>> stateMetaInfoSnapshots;
    // The constructor has no parameter for this member, so any value has to
    // be assigned separately.
    std::shared_ptr<TypeSerializer> keySerializer;
};
/**
 * Supplier that always hands back the empty snapshot result.
 */
class SnapshotResultSupplierEmpty : public SnapshotResultSupplier<KeyedStateHandle> {
public:
    /**
     * Returns SnapshotResult<KeyedStateHandle>::Empty().
     *
     * @param bridge Not used by this implementation.
     */
    std::shared_ptr<SnapshotResult<KeyedStateHandle>> get(
        std::shared_ptr<omnistream::OmniTaskBridge> bridge) override
    {
        return SnapshotResult<KeyedStateHandle>::Empty();
    }
};
/**
 * DirectoryKeyedStateHandle that additionally keeps the
 * IncrementalLocalKeyedStateHandle it was created from.
 */
class BridgeKeyedStateHandle : public DirectoryKeyedStateHandle {
public:
    /**
     * Builds the base from the wrapped handle's directory state handle and
     * key-group range, then stores the wrapped handle itself.
     *
     * @param handle Handle to wrap. Must not be null: it is dereferenced
     *               while the base class is being initialized.
     */
    explicit BridgeKeyedStateHandle(std::shared_ptr<IncrementalLocalKeyedStateHandle> handle)
        : DirectoryKeyedStateHandle(handle->getDirectoryStateHandle(), handle->GetKeyGroupRange()),
          // Base sub-objects are initialized before data members, so the
          // parameter is still intact for the calls above. Moving it here
          // avoids an extra atomic reference-count increment/decrement.
          handle(std::move(handle))
    {
    }

    /** The wrapped handle; public, as in the original interface. */
    std::shared_ptr<IncrementalLocalKeyedStateHandle> handle;
};
#endif