* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include "KeyGroupRange.h"
#include "TimerConsistencyCheckControl.h"
#include "heap/HeapPriorityQueueDataDigest.h"
#include "heap/HeapPriorityQueueSetFactory.h"
#include "heap/HeapPriorityQueueSnapshotRestoreWrapper.h"
#include "heap/RestoredHeapPriorityQueueSnapshotRestoreWrapper.h"
/**
 * Registry of heap priority queue states, looked up by state name.
 *
 * New queue sets are built through the supplied HeapPriorityQueueSetFactory and
 * stored in the shared name -> wrapper map. When the map holds a
 * RestoredHeapPriorityQueueSnapshotRestoreWrapper for a name, the first typed
 * createOrUpdate() call replaces it with a typed wrapper and drains the pending
 * entries into it.
 */
class HeapPriorityQueuesManager {
public:
    /**
     * @param registeredPQStates      shared map from state name to snapshot/restore wrapper;
     *                                this manager reads and writes it.
     * @param priorityQueueSetFactory factory used to create the queue set of every new state.
     * @param keyGroupRange           raw pointer that is stored and forwarded to new wrappers;
     *                                this class never deletes it. May be nullptr (the digest
     *                                logging then falls back to numberOfKeyGroups).
     * @param numberOfKeyGroups       key-group count; also selects the key-group prefix width.
     */
    HeapPriorityQueuesManager(
        std::shared_ptr<std::unordered_map<std::string, std::shared_ptr<HeapPriorityQueueSnapshotRestoreWrapperBase>>> registeredPQStates,
        std::shared_ptr<HeapPriorityQueueSetFactory> priorityQueueSetFactory,
        KeyGroupRange* keyGroupRange,
        int32_t numberOfKeyGroups)
        // The shared_ptr arguments are sink parameters: move them instead of paying
        // for an extra atomic ref-count increment/decrement pair.
        : registeredPQStates_(std::move(registeredPQStates)),
          priorityQueueSetFactory_(std::move(priorityQueueSetFactory)),
          keyGroupRange_(keyGroupRange),
          numberOfKeyGroups_(numberOfKeyGroups) {}

    /**
     * Convenience overload: forwards to the three-argument createOrUpdate() with
     * allowFutureMetadataUpdates set to false.
     */
    template <typename K, typename T, typename Comparator>
    std::shared_ptr<KeyGroupedInternalPriorityQueue<T>> createOrUpdate(
        std::string stateName,
        TypeSerializer* byteOrderedElementSerializer) {
        return createOrUpdate<K, T, Comparator>(stateName, byteOrderedElementSerializer, false);
    }

    /**
     * Returns the priority queue registered under stateName, creating it if needed.
     *
     * - Name registered with a typed wrapper for <K, T, Comparator>: its queue set is returned.
     * - Name registered with a pending restored wrapper: a typed wrapper is created (which
     *   replaces the map entry), the pending entries are drained into it, an optional
     *   consistency digest is logged, and the new queue set is returned.
     * - Name registered with any other wrapper type: the error is logged and
     *   THROW_LOGIC_EXCEPTION is raised.
     * - Name not registered: a new typed wrapper is created, registered and returned.
     *
     * @param stateName                    name of the priority queue state.
     * @param byteOrderedElementSerializer serializer stored in the state's meta info.
     * @param allowFutureMetadataUpdates   currently not consulted by this implementation.
     * @return the queue set belonging to the (possibly newly created) state.
     */
    template <typename K, typename T, typename Comparator>
    std::shared_ptr<KeyGroupedInternalPriorityQueue<T>> createOrUpdate(
        std::string stateName,
        TypeSerializer* byteOrderedElementSerializer,
        bool allowFutureMetadataUpdates) {
        // Accepted for interface compatibility only; discarding it explicitly keeps
        // -Wunused-parameter quiet in every translation unit that includes this header.
        (void)allowFutureMetadataUpdates;

        auto iter = registeredPQStates_->find(stateName);
        if (iter != registeredPQStates_->end()) {
            auto existingState =
                std::dynamic_pointer_cast<HeapPriorityQueueSnapshotRestoreWrapper<K, T, Comparator>>(iter->second);
            if (existingState != nullptr) {
                return existingState->getHeapPriorityQueueSet();
            }

            // This local shared_ptr keeps the pending wrapper alive after
            // createInternal() overwrites the map entry below.
            auto pendingRestoredState =
                std::dynamic_pointer_cast<RestoredHeapPriorityQueueSnapshotRestoreWrapper>(iter->second);
            if (pendingRestoredState != nullptr) {
                auto metaInfo = std::make_shared<RegisteredPriorityQueueStateBackendMetaInfo>(
                    stateName,
                    byteOrderedElementSerializer);
                auto restoredState = createInternal<K, T, Comparator>(metaInfo);
                // Capture the size before draining so the log reports what was pending.
                const size_t pendingEntryCount = pendingRestoredState->size();
                pendingRestoredState->template drainTo<K, T, Comparator>(restoredState, getKeyGroupPrefixBytes());
                if (TimerConsistencyCheckControl::timerConsistencyCheckEnabled) {
                    logRestoredQueueDigest<K, T, Comparator>(stateName, restoredState);
                }
                INFO_RELEASE("HeapPriorityQueuesManager: drained " << pendingEntryCount
                    << " pending restored entries for priority queue state '" << stateName << "'");
                return restoredState->getHeapPriorityQueueSet();
            }

            INFO_RELEASE(
                "Error: createOrUpdate Priority queue type is not supported for restored HEAP PQ state:" << stateName);
            THROW_LOGIC_EXCEPTION("Priority queue type is not supported for restored HEAP PQ state: " << stateName)
        }

        auto metaInfo =
            std::make_shared<RegisteredPriorityQueueStateBackendMetaInfo>(stateName, byteOrderedElementSerializer);
        return createInternal<K, T, Comparator>(metaInfo)->getHeapPriorityQueueSet();
    }

    /** Returns the shared name -> wrapper map this manager operates on. */
    std::shared_ptr<std::unordered_map<std::string, std::shared_ptr<HeapPriorityQueueSnapshotRestoreWrapperBase>>> getRegisteredPQStates() {
        return registeredPQStates_;
    }

private:
    /**
     * Width of the key-group prefix passed to drainTo() and the snapshot iterator:
     * 2 when more than 128 key groups are configured, otherwise 1.
     * NOTE(review): presumably the unit is bytes, as the name suggests - confirm
     * against the consumers of this value.
     */
    int getKeyGroupPrefixBytes() const {
        return numberOfKeyGroups_ > 128 ? 2 : 1;
    }

    /**
     * Creates a queue set through the factory, wraps it in a typed snapshot/restore
     * wrapper and stores that wrapper in the map under the meta info's name,
     * overwriting any previous entry for that name.
     *
     * @param metaInfo meta info supplying the state name and element serializer.
     * @return the newly registered wrapper.
     */
    template <typename K, typename T, typename Comparator>
    std::shared_ptr<HeapPriorityQueueSnapshotRestoreWrapper<K, T, Comparator>> createInternal(
        std::shared_ptr<RegisteredPriorityQueueStateBackendMetaInfo> metaInfo) {
        const std::string& stateName = metaInfo->getName();
        std::shared_ptr<KeyGroupedInternalPriorityQueue<T>> priorityQueue =
            priorityQueueSetFactory_->create<K, T, Comparator>(stateName, metaInfo->getElementSerializer());

        using WrapperType = HeapPriorityQueueSnapshotRestoreWrapper<K, T, Comparator>;
        std::shared_ptr<WrapperType> wrapper = std::make_shared<WrapperType>(
            std::static_pointer_cast<HeapPriorityQueueSet<K, T, Comparator>>(priorityQueue),
            metaInfo,
            keyGroupRange_,
            numberOfKeyGroups_);
        (*registeredPQStates_)[stateName] = wrapper;
        return wrapper;
    }

    /**
     * Builds a "restore" digest summary over every entry of the wrapper's snapshot
     * iterator and logs it. Only called when the timer consistency check is enabled.
     *
     * The iterator is closed on the normal path and also when iteration throws;
     * in the latter case the original exception is rethrown.
     *
     * @param stateName name recorded in the summary.
     * @param wrapper   wrapper whose contents are summarised; nullptr yields a summary
     *                  with no entries added.
     */
    template <typename K, typename T, typename Comparator>
    void logRestoredQueueDigest(
        const std::string &stateName,
        const std::shared_ptr<HeapPriorityQueueSnapshotRestoreWrapper<K, T, Comparator>> &wrapper) {
        auto summary = HeapPriorityQueueDataDigest::createSummary(
            "restore",
            -1,
            stateName,
            keyGroupRange_ == nullptr ? numberOfKeyGroups_ : keyGroupRange_->getNumberOfKeyGroups());

        if (wrapper != nullptr) {
            // Loop-invariant: compute the prefix width once.
            const int keyGroupPrefixBytes = getKeyGroupPrefixBytes();
            auto iterator = wrapper->createSnapshotIterator(-1, keyGroupPrefixBytes);
            if (iterator != nullptr) {
                try {
                    while (iterator->isValid()) {
                        HeapPriorityQueueDataDigest::addSerializedEntry(
                            summary,
                            iterator->key(),
                            iterator->value(),
                            keyGroupPrefixBytes);
                        iterator->next();
                    }
                } catch (...) {
                    // Release the iterator before propagating. A secondary failure from
                    // close() is dropped so it cannot replace the original exception.
                    try {
                        iterator->close();
                    } catch (...) {
                    }
                    throw;
                }
                iterator->close();
            }
        }
        HeapPriorityQueueDataDigest::logSummary(summary);
    }

    /** Shared map from state name to its snapshot/restore wrapper. */
    std::shared_ptr<std::unordered_map<std::string, std::shared_ptr<HeapPriorityQueueSnapshotRestoreWrapperBase>>> registeredPQStates_;
    /** Factory used to create the queue set of each new state. */
    std::shared_ptr<HeapPriorityQueueSetFactory> priorityQueueSetFactory_;
    /** Stored as a raw pointer and never deleted here; may be nullptr. */
    KeyGroupRange* keyGroupRange_;
    /** Key-group count handed to new wrappers and used to pick the prefix width. */
    int32_t numberOfKeyGroups_;
};