* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_HEAPPRIORITYQUEUESINGLESTATEITERATOR_H
#define OMNISTREAM_HEAPPRIORITYQUEUESINGLESTATEITERATOR_H
#include <algorithm>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <vector>
#include "basictypes/Object.h"
#include "common.h"
#include "core/memory/DataOutputSerializer.h"
#include "core/typeutils/TypeSerializer.h"
#include "runtime/state/KeyGroupRangeAssignment.h"
#include "runtime/state/rocksdb/iterator/SingleStateIterator.h"
#include "runtime/state/heap/HeapPriorityQueueSet.h"
#include "core/utils/type_traits_ext.h"
* Sync-phase materialized iterator for Heap priority queue state.
*
* Flink's heap backend writes PRIORITY_QUEUE state through the same full snapshot
* key/value stream as normal keyed state. Each PQ element is encoded as:
* key = [key-group-prefix] + [serialized priority queue element]
* value = []
*
* The live queue is copied and serialized during construction, so async snapshot
* writing does not touch mutable timer/PQ data structures.
*/
template <typename K, typename T, typename Comparator>
class HeapPriorityQueueSingleStateIterator : public SingleStateIterator {
public:
static_assert(is_shared_ptr_v<T>, "T should be shared ptr.");
HeapPriorityQueueSingleStateIterator(
HeapPriorityQueueSet<K, T, Comparator> *priorityQueue,
TypeSerializer *elementSerializer,
int kvStateId,
int keyGroupPrefixBytes,
int totalNumberOfKeyGroups)
: elementSerializer_(elementSerializer),
kvStateId_(kvStateId),
keyGroupPrefixBytes_(keyGroupPrefixBytes),
totalNumberOfKeyGroups_(totalNumberOfKeyGroups)
{
collectAndSerializeEntries(priorityQueue);
currentIndex_ = 0;
valid_ = !entries_.empty();
refreshKeyGroup();
INFO_RELEASE("HeapPriorityQueueSingleStateIterator: kvStateId=" << kvStateId_
<< ", entryCount=" << entries_.size()
<< ", valid=" << valid_);
}
void next() override
{
if (valid_) {
currentIndex_++;
valid_ = currentIndex_ < entries_.size();
refreshKeyGroup();
}
}
bool isValid() const override
{
return valid_;
}
ByteView key() const override
{
const auto &key = entries_[currentIndex_].serializedKey;
return ByteView::fromBuffer(key.data(), key.size());
}
ByteView value() const override
{
return {};
}
int keyGroup() const override
{
return currentKeyGroup_;
}
int getKvStateId() const override
{
return kvStateId_;
}
size_t getEntryCount() const override
{
return entries_.size();
}
void close() override
{
entries_.clear();
valid_ = false;
}
private:
using ElementType = typename T::element_type;
struct SerializedEntry {
std::vector<int8_t> serializedKey;
};
TypeSerializer *elementSerializer_;
int kvStateId_;
int keyGroupPrefixBytes_;
int totalNumberOfKeyGroups_;
std::vector<SerializedEntry> entries_;
size_t currentIndex_ = 0;
int currentKeyGroup_ = -1;
bool valid_ = false;
void refreshKeyGroup()
{
currentKeyGroup_ = -1;
if (!valid_ || currentIndex_ >= entries_.size()) {
return;
}
const auto &key = entries_[currentIndex_].serializedKey;
if (key.size() < static_cast<size_t>(keyGroupPrefixBytes_)) {
return;
}
int result = 0;
for (int i = 0; i < keyGroupPrefixBytes_; ++i) {
result <<= 8;
result |= static_cast<int>(static_cast<uint8_t>(key[i]));
}
currentKeyGroup_ = result;
}
void collectAndSerializeEntries(HeapPriorityQueueSet<K, T, Comparator> *priorityQueue)
{
if (priorityQueue == nullptr) {
return;
}
std::vector<T> snapshot = priorityQueue->toArray();
entries_.reserve(snapshot.size());
int entryIndex = 0;
for (const auto &element : snapshot) {
if (!element) {
continue;
}
const auto &key = element->getKey();
int keyGroup = KeyGroupRangeAssignment<K>::assignToKeyGroup(key, totalNumberOfKeyGroups_);
SerializedEntry entry;
try {
entry.serializedKey = serializeKey(keyGroup, element);
} catch (const std::exception &e) {
INFO_RELEASE("Error: HeapPriorityQueueSingleStateIterator: serialize EXCEPTION at keyGroup="
<< keyGroup << ", entryIndex=" << entryIndex << ", error=" << e.what());
throw;
}
entries_.push_back(std::move(entry));
entryIndex++;
}
std::sort(entries_.begin(), entries_.end(),
[this](const SerializedEntry &a, const SerializedEntry &b) -> bool {
const int len = std::min({keyGroupPrefixBytes_,
static_cast<int>(a.serializedKey.size()),
static_cast<int>(b.serializedKey.size())});
for (int i = 0; i < len; i++) {
auto av = static_cast<uint8_t>(a.serializedKey[i]);
auto bv = static_cast<uint8_t>(b.serializedKey[i]);
if (av != bv) {
return av < bv;
}
}
return false;
});
}
std::vector<int8_t> serializeKey(int keyGroup, const T &element)
{
DataOutputSerializer outputSerializer;
OutputBufferStatus outputBufferStatus;
outputSerializer.setBackendBuffer(&outputBufferStatus);
if (keyGroupPrefixBytes_ == 1) {
outputSerializer.writeByte(static_cast<uint32_t>(keyGroup));
} else {
outputSerializer.writeByte(static_cast<uint32_t>((keyGroup >> 8) & 0xFF));
outputSerializer.writeByte(static_cast<uint32_t>(keyGroup & 0xFF));
}
serializeElement(element, outputSerializer);
auto *data = outputSerializer.getData();
size_t len = outputSerializer.length();
std::vector<int8_t> result(len);
for (size_t i = 0; i < len; i++) {
result[i] = static_cast<int8_t>(data[i]);
}
return result;
}
void serializeElement(const T &element, DataOutputSerializer &outputSerializer)
{
if (elementSerializer_ == nullptr) {
INFO_RELEASE("Error: serializeElement Priority queue element serializer is null");
THROW_LOGIC_EXCEPTION("Priority queue element serializer is null")
}
if constexpr (std::is_base_of_v<Object, ElementType>) {
elementSerializer_->serialize(static_cast<Object *>(element.get()), outputSerializer);
} else {
elementSerializer_->serialize(static_cast<void *>(element.get()), outputSerializer);
}
}
};
#endif