* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

#include "RocksDBCachingPriorityQueueSet.h"
#include "state/PriorityQueueSetFactory.h"
#include "state/RocksDbKvStateInfo.h"
#include "state/RocksDBWriteBatchWrapper.h"
#include "runtime/state/RocksDbKvStateInfo.h"
#include "state/heap/KeyGroupPartitionedPriorityQueue.h"
class RocksDBPriorityQueueSetFactory : public PriorityQueueSetFactory {
public:
RocksDBPriorityQueueSetFactory(
KeyGroupRange* keyGroupRange,
int32_t keyGroupPrefixBytes,
int32_t numberOfKeyGroups,
std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>>* kvStateInformation,
rocksdb::DB* db,
std::shared_ptr<rocksdb::ReadOptions> readOptions,
std::shared_ptr<RocksDBWriteBatchWrapper> writeBatchWrapper,
std::function<rocksdb::ColumnFamilyOptions(const std::string&)> columnFamilyOptionsFactory,
int64_t writeBufferManagerCapacity)
: keyGroupRange_(keyGroupRange),
keyGroupPrefixBytes_(keyGroupPrefixBytes),
numberOfKeyGroups_(numberOfKeyGroups),
kvStateInformation_(kvStateInformation),
db_(db),
readOptions_(readOptions),
writeBatchWrapper_(writeBatchWrapper),
columnFamilyOptionsFactory_(columnFamilyOptionsFactory),
writeBufferManagerCapacity_(writeBufferManagerCapacity)
{
sharedElementOutView_ = std::make_shared<DataOutputSerializer>(128);
sharedElementInView_ = std::make_shared<DataInputDeserializer>();
}
template <typename K, typename T, typename Comparator>
std::shared_ptr<KeyGroupedInternalPriorityQueue<T>> create(
std::string stateName, TypeSerializer* byteOrderedElementSerializer)
{
return this->create<K, T, Comparator>(stateName, byteOrderedElementSerializer, false);
}
template <typename K, typename T, typename Comparator>
std::shared_ptr<KeyGroupedInternalPriorityQueue<T>> create(
std::string stateName, TypeSerializer* byteOrderedElementSerializer, bool allowFutureMetadataUpdates)
{
auto stateCFHandle =
tryRegisterPriorityQueueMetaInfo(stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
auto columnFamilyHandle = stateCFHandle->columnFamilyHandle_;
using Factory = typename KeyGroupPartitionedPriorityQueue<K, T, Comparator>::PartitionQueueSetFactory;
Factory factory = [this, columnFamilyHandle, byteOrderedElementSerializer](
int32_t keyGroupId, int32_t totalKeyGroups) {
return std::make_shared<RocksDBCachingPriorityQueueSet<K, T, Comparator>>(
keyGroupId,
keyGroupPrefixBytes_,
db_,
readOptions_,
columnFamilyHandle,
byteOrderedElementSerializer,
sharedElementOutView_,
sharedElementInView_,
writeBatchWrapper_,
DEFAULT_CACHES_SIZE);
};
return std::make_shared<KeyGroupPartitionedPriorityQueue<K, T, Comparator>>(
factory, keyGroupRange_, numberOfKeyGroups_);
}
private:
std::shared_ptr<RocksDbKvStateInfo> tryRegisterPriorityQueueMetaInfo(
const std::string& stateName, TypeSerializer* byteOrderedElementSerializer, bool allowFutureMetadataUpdates)
{
std::shared_ptr<RocksDbKvStateInfo> stateInfo;
auto iter = kvStateInformation_->find(stateName);
if (iter == kvStateInformation_->end()) {
auto metaInfo =
std::make_shared<RegisteredPriorityQueueStateBackendMetaInfo>(stateName, byteOrderedElementSerializer);
metaInfo = allowFutureMetadataUpdates ? metaInfo->withSerializerUpgradesAllowed() : metaInfo;
stateInfo = RocksDbOperationUtils::createStateInfo(metaInfo, db_, columnFamilyOptionsFactory_);
RocksDbOperationUtils::registerKvStateInformation(kvStateInformation_, stateName, stateInfo);
} else {
stateInfo = iter->second;
auto castedMetaInfo =
std::dynamic_pointer_cast<RegisteredPriorityQueueStateBackendMetaInfo>(stateInfo->metaInfo_);
if (castedMetaInfo == nullptr) {
INFO_RELEASE(
"Error >>> tryRegisterPriorityQueueMetaInfo Restored priority queue state meta info type "
"mismatch. stateName="
<< stateName);
THROW_RUNTIME_ERROR("Restored priority queue state meta info type mismatch. stateName=" + stateName);
}
auto previousElementSerializer = castedMetaInfo->getPreviousElementSerializer();
if (previousElementSerializer != byteOrderedElementSerializer) {
auto compatibilityResult = castedMetaInfo->updateElementSerializer(byteOrderedElementSerializer);
if (compatibilityResult.isIncompatible()) {
INFO_RELEASE(
"Error >>> tryRegisterPriorityQueueMetaInfo The new priority queue serializer must "
"not be incompatible. stateName="
<< stateName << ", compatibility=" << compatibilityResult.toString());
THROW_RUNTIME_ERROR(
"The new priority queue serializer must not be incompatible. stateName=" + stateName +
", compatibility=" + compatibilityResult.toString());
}
auto metaInfo = std::make_shared<RegisteredPriorityQueueStateBackendMetaInfo>(
stateName, byteOrderedElementSerializer);
metaInfo = allowFutureMetadataUpdates ? metaInfo->withSerializerUpgradesAllowed() : metaInfo;
stateInfo = std::make_shared<RocksDbKvStateInfo>(stateInfo->columnFamilyHandle_, metaInfo);
(*kvStateInformation_)[stateName] = stateInfo;
INFO_RELEASE(
"RocksDBPriorityQueueSetFactory: updated restored priority queue serializer"
<< ", stateName=" << stateName << ", compatibility=" << compatibilityResult.toString());
}
}
return stateInfo;
}
static constexpr int32_t DEFAULT_CACHES_SIZE = 128;
KeyGroupRange* keyGroupRange_;
int32_t keyGroupPrefixBytes_;
int32_t numberOfKeyGroups_;
std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>>* kvStateInformation_;
rocksdb::DB* db_;
std::shared_ptr<rocksdb::ReadOptions> readOptions_;
std::shared_ptr<RocksDBWriteBatchWrapper> writeBatchWrapper_;
std::function<rocksdb::ColumnFamilyOptions(const std::string&)> columnFamilyOptionsFactory_;
int64_t writeBufferManagerCapacity_;
std::shared_ptr<DataOutputSerializer> sharedElementOutView_;
std::shared_ptr<DataInputDeserializer> sharedElementInView_;
};