* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_ROCKSDBHANDLE_H
#define OMNISTREAM_ROCKSDBHANDLE_H
#pragma once
#include <filesystem>
#include <functional>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <system_error>
#include <unordered_map>
#include <vector>
#include "runtime/state/RegisteredStateMetaInfoBase.h"
#include "runtime/state/rocksdb/RocksDbOperationUtils.h"
#include "runtime/state/metainfo/StateMetaInfoSnapshot.h"
#include "runtime/state/RocksDbKvStateInfo.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
namespace fs = std::filesystem;
/**
 * @brief Opens a RocksDB instance and keeps track of its column family handles.
 *
 * The handle can either open the database at its instance path directly, or first
 * populate the instance directory from a restore source directory and then register
 * one state entry per restored column family.
 *
 * This class never closes the database nor deletes any column family handle.
 * NOTE(review): ownership of the returned raw pointers is presumably taken over by
 * the caller (e.g. the state backend) - confirm against the call sites.
 */
class RocksDbHandle {
public:
    /**
     * @param kvStateInformation Registry of known states, keyed by state name. Stored as a
     *        non-owning pointer; it is dereferenced by getOrRegisterStateColumnFamilyHandle().
     * @param instanceRocksDBPath Directory of the RocksDB instance.
     * @param dbOptions Options used when opening the database; dereferenced on open.
     * @param columnFamilyOptionsFactory Produces the column family options for a given name.
     */
    RocksDbHandle(
        std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>>* kvStateInformation,
        const fs::path& instanceRocksDBPath,
        std::shared_ptr<rocksdb::DBOptions> dbOptions,
        std::function<rocksdb::ColumnFamilyOptions(const std::string&)> columnFamilyOptionsFactory)
        : kvStateInformation(kvStateInformation),
          dbPath(instanceRocksDBPath.string()),
          dbOptions(std::move(dbOptions)),
          columnFamilyOptionsFactory(std::move(columnFamilyOptionsFactory))
    {
        // Room for the default column family handle that loadDb() extracts.
        columnFamilyHandles.reserve(1);
    }

    /**
     * @brief Opens the database at the instance path without restoring any files.
     * @throws std::runtime_error if the options are missing or no handle was produced.
     */
    void openDB()
    {
        loadDb();
    }

    /**
     * @brief Restores the instance directory from @p restoreSourcePath, opens the database
     *        and registers the i-th column family handle under the i-th state snapshot.
     *
     * @param columnFamilyDescriptors Descriptors of the state column families to open.
     * @param stateMetaInfoSnapshots  Snapshots matched to the opened handles by index.
     * @param restoreSourcePath       Directory whose regular files are linked/copied into
     *                                the instance directory.
     * @throws std::runtime_error if the directory cannot be prepared, or if fewer handles
     *         than snapshots are available after opening.
     * @throws std::filesystem::filesystem_error if listing or copying files fails.
     */
    void openDB(
        const std::vector<rocksdb::ColumnFamilyDescriptor>& columnFamilyDescriptors,
        const std::vector<StateMetaInfoSnapshot>& stateMetaInfoSnapshots,
        const fs::path& restoreSourcePath)
    {
        this->columnFamilyDescriptors = columnFamilyDescriptors;
        // NOTE(review): resize() fills the vector with null handles before the open call.
        // This is only harmless if RocksDbOperationUtils::openDB overwrites/clears the
        // vector; reserve() may have been intended - confirm.
        this->columnFamilyHandles.resize(columnFamilyDescriptors.size() + 1);
        restoreInstanceDirectoryFromPath(restoreSourcePath);
        loadDb();

        // Indexing past the end of the handle vector would be undefined behaviour,
        // so fail loudly when the snapshots outnumber the opened column families.
        if (stateMetaInfoSnapshots.size() > columnFamilyHandles.size()) {
            throw std::runtime_error(
                "RocksDbHandle: " + std::to_string(stateMetaInfoSnapshots.size()) +
                " state meta info snapshots but only " + std::to_string(columnFamilyHandles.size()) +
                " column family handles for: " + dbPath);
        }
        for (size_t i = 0; i < stateMetaInfoSnapshots.size(); ++i) {
            getOrRegisterStateColumnFamilyHandle(columnFamilyHandles[i], stateMetaInfoSnapshots[i]);
        }
    }

    /** @return The opened database, or nullptr if no open call has completed yet. */
    rocksdb::DB* getDb() const
    {
        return db;
    }

    /** @return The default column family handle, or nullptr if the database was not opened. */
    rocksdb::ColumnFamilyHandle* getDefaultColumnFamilyHandle() const
    {
        return defaultColumnFamilyHandle;
    }

    /** @return A copy of the state column family handles (the default handle is excluded). */
    std::vector<rocksdb::ColumnFamilyHandle*> getColumnFamilyHandles() const
    {
        return columnFamilyHandles;
    }

    /** @return A copy of the column family options factory given at construction. */
    std::function<rocksdb::ColumnFamilyOptions(const std::string&)> getColumnFamilyOptionsFactory() const
    {
        return columnFamilyOptionsFactory;
    }

    /** @return The shared database options given at construction. */
    std::shared_ptr<rocksdb::DBOptions> getDbOptions() const
    {
        return dbOptions;
    }

    /**
     * @brief Returns the registered state entry for the snapshot's name, creating and
     *        registering one when none (or a null entry) exists.
     *
     * @param columnFamilyHandle    Handle to wrap in the new entry; when nullptr the entry
     *                              is produced by RocksDbOperationUtils::createStateInfo.
     * @param stateMetaInfoSnapshot Snapshot providing the state name and meta information.
     * @return The existing or newly registered entry.
     * @throws std::runtime_error if the registry pointer given at construction is null.
     */
    std::shared_ptr<RocksDbKvStateInfo> getOrRegisterStateColumnFamilyHandle(
        rocksdb::ColumnFamilyHandle* columnFamilyHandle, const StateMetaInfoSnapshot& stateMetaInfoSnapshot)
    {
        if (kvStateInformation == nullptr) {
            throw std::runtime_error("RocksDbHandle: kvStateInformation must not be null");
        }

        const auto& stateName = stateMetaInfoSnapshot.getName();
        const auto it = kvStateInformation->find(stateName);
        if (it != kvStateInformation->end() && it->second != nullptr) {
            return it->second;
        }

        std::shared_ptr<RegisteredStateMetaInfoBase> stateMetaInfo =
            RegisteredStateMetaInfoBase::fromMetaInfoSnapshot(stateMetaInfoSnapshot);

        std::shared_ptr<RocksDbKvStateInfo> registeredStateMetaInfoEntry;
        if (columnFamilyHandle == nullptr) {
            registeredStateMetaInfoEntry =
                RocksDbOperationUtils::createStateInfo(stateMetaInfo, db, columnFamilyOptionsFactory);
        } else {
            registeredStateMetaInfoEntry =
                std::make_shared<RocksDbKvStateInfo>(columnFamilyHandle, std::move(stateMetaInfo));
        }
        RocksDbOperationUtils::registerKvStateInformation(
            kvStateInformation, stateName, registeredStateMetaInfoEntry);
        return registeredStateMetaInfoEntry;
    }

    /**
     * @brief Creates the instance directory and fills it with the regular files of @p source.
     *
     * Files ending in ".sst" are hard linked; if linking fails, and for every other regular
     * file, the file is copied (overwriting an existing target). Non-regular entries are skipped.
     *
     * @param source Directory to restore from.
     * @throws std::runtime_error if the instance directory could not be newly created. Note
     *         that an already existing directory is also reported as a failure, because
     *         create_directories() returns false in that case.
     * @throws std::filesystem::filesystem_error if listing @p source or copying a file fails.
     */
    void restoreInstanceDirectoryFromPath(const fs::path& source)
    {
        const fs::path instanceRocksDBDirectory(dbPath);
        std::error_code ec;
        const bool created = fs::create_directories(instanceRocksDBDirectory, ec);
        if (ec || !created) {
            std::string errMsg = "Could not create RocksDB data directory: " + instanceRocksDBDirectory.string();
            std::cerr << "ERROR: " << errMsg << std::endl;
            throw std::runtime_error(errMsg);
        }

        // Take the listing up front so that the link/copy phase works on a fixed set of entries.
        std::vector<fs::directory_entry> entries;
        try {
            for (const auto& entry : fs::directory_iterator(source)) {
                entries.push_back(entry);
            }
        } catch (const fs::filesystem_error& ex) {
            std::string errMsg = "Could not list directory: " + source.string() + ", error: " + ex.what();
            std::cerr << "ERROR: " + errMsg << std::endl;
            throw;
        }

        for (const auto& entry : entries) {
            if (!entry.is_regular_file()) {
                continue;
            }
            const fs::path& file = entry.path();
            const std::string fileName = file.filename().string();
            const fs::path targetFile = instanceRocksDBDirectory / fileName;

            if (endsWithSst(fileName)) {
                // A failed hard link is expected in some setups (e.g. across file systems),
                // so use the non-throwing overload and fall back to a real copy below.
                std::error_code linkEc;
                fs::create_hard_link(file, targetFile, linkEc);
                if (!linkEc) {
                    continue;
                }
                std::string logMessage = "Could not hard link sst file " + fileName;
                std::cout << "INFO: " << logMessage << ", error: " << linkEc.message() << std::endl;
            }

            // True copy for all other files and for sst files that could not be hard linked.
            try {
                fs::copy_file(file, targetFile, fs::copy_options::overwrite_existing);
            } catch (const fs::filesystem_error& ex) {
                std::string errMsg = "Failed to copy file from " + file.string() + " to " + targetFile.string() +
                    ", error: " + ex.what();
                std::cerr << "ERROR: " << errMsg << std::endl;
                throw;
            }
        }
    }

private:
    // Non-owning registry of state entries keyed by state name.
    std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>>* kvStateInformation;
    // Instance directory of the database, as a string.
    const std::string dbPath;
    std::shared_ptr<rocksdb::DBOptions> dbOptions;
    const std::function<rocksdb::ColumnFamilyOptions(const std::string&)> columnFamilyOptionsFactory;
    // After loadDb(): the state column family handles, without the default handle.
    std::vector<rocksdb::ColumnFamilyHandle*> columnFamilyHandles;
    std::vector<rocksdb::ColumnFamilyDescriptor> columnFamilyDescriptors;
    // Null until an open call has completed.
    rocksdb::DB* db = nullptr;
    rocksdb::ColumnFamilyHandle* defaultColumnFamilyHandle = nullptr;

    static constexpr const char* SST_FILE_SUFFIX = ".sst";
    static constexpr std::string::size_type SST_SUFFIX_LENGTH = 4;

    /**
     * @brief Opens the database and splits the default column family handle (first entry
     *        of the handle vector) off from the state column family handles.
     * @throws std::runtime_error if the options are null or no handle was produced.
     */
    void loadDb()
    {
        if (!dbOptions) {
            throw std::runtime_error("RocksDbHandle: dbOptions must not be null");
        }
        rocksdb::ColumnFamilyOptions columnFamilyOptions =
            RocksDbOperationUtils::createColumnFamilyOptions(columnFamilyOptionsFactory, "default");
        db = RocksDbOperationUtils::openDB(
            dbPath, columnFamilyDescriptors, columnFamilyHandles, columnFamilyOptions, *dbOptions);

        // front()/erase(begin()) on an empty vector is undefined behaviour.
        if (columnFamilyHandles.empty()) {
            throw std::runtime_error("RocksDbHandle: no column family handles after opening: " + dbPath);
        }
        defaultColumnFamilyHandle = columnFamilyHandles.front();
        columnFamilyHandles.erase(columnFamilyHandles.begin());
    }

    /** @return true if @p str ends with the ".sst" suffix (case-sensitive). */
    static bool endsWithSst(const std::string& str)
    {
        if (str.length() < SST_SUFFIX_LENGTH) {
            return false;
        }
        return str.compare(str.length() - SST_SUFFIX_LENGTH, SST_SUFFIX_LENGTH, SST_FILE_SUFFIX) == 0;
    }
};
#endif