* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "MergingWindowSet.h"
#include "data/Row.h"
#include "runtime/operators/window/TimeWindow.h"
template <typename K, typename W>
void MergingWindowSet<K, W>::initializeCache(const K& key)
{
auto cache = cachedSortedWindows.get(key);
if (!cache) {
sortedWindows = new std::set<W>;
auto iter = mapping->iteratorV2();
while (iter->hasNext()) {
auto& entry = iter->next();
if (entry.getKey().has_value()) {
sortedWindows->emplace(entry.getKey().value());
} else {
THROW_RUNTIME_ERROR("Iterator return an entry with empty key.");
}
}
cachedSortedWindows.put(key, sortedWindows);
} else {
sortedWindows = cache.value();
}
}
template <typename K, typename W>
W MergingWindowSet<K, W>::getStateWindow(const W& window)
{
const auto& optionalRes = mapping->get(window);
if (optionalRes.has_value()) {
return optionalRes.value();
}
THROW_RUNTIME_ERROR(
"Cannot find the state of window, startTime:" + std::to_string(window.getStart()) +
", endTime: " + std::to_string(window.getEnd()));
}
template <typename K, typename W>
void MergingWindowSet<K, W>::retireWindow(const W& window)
{
mapping->remove(window);
auto it = sortedWindows->find(window);
if (it == sortedWindows->end()) {
THROW_RUNTIME_ERROR("Window is not in in-flight window set.");
}
sortedWindows->erase(it);
}
template <typename K, typename W>
W MergingWindowSet<K, W>::addWindow(const W& newWindow, const MergeFunction& mergeFunction)
{
reuseCollector_.clear();
windowAssigner->mergeWindows(newWindow, sortedWindows, reuseCollector_);
W resultWindow = newWindow;
bool isNewWindowMerged = false;
for (const auto& c : reuseCollector_) {
W mergeResult = c.first;
auto* mergedWindows = c.second;
if (mergedWindows->erase(newWindow)) {
isNewWindowMerged = true;
resultWindow = mergeResult;
}
if (mergedWindows->empty()) {
continue;
}
W mergedStateNamespace = getStateWindow(*mergedWindows->begin());
std::vector<W> mergedStateWindows;
for (const auto& mergedWindow : *mergedWindows) {
const std::optional<W>& optionalRes = mapping->get(mergedWindow);
if (optionalRes.has_value()) {
mapping->remove(mergedWindow);
sortedWindows->erase(mergedWindow);
if (!(optionalRes.value() == mergedStateNamespace)) {
mergedStateWindows.push_back(optionalRes.value());
}
}
}
mapping->put(mergeResult, mergedStateNamespace);
sortedWindows->insert(mergeResult);
if (!(mergedWindows->find(mergeResult) != mergedWindows->end() && mergedWindows->size() == 1)) {
mergeFunction(mergeResult, *mergedWindows, mergedStateNamespace, mergedStateWindows);
}
}
if (reuseCollector_.empty() || (resultWindow == newWindow && !isNewWindowMerged)) {
mapping->put(resultWindow, resultWindow);
sortedWindows->insert(resultWindow);
}
return resultWindow;
}
template class MergingWindowSet<std::shared_ptr<RowData>, TimeWindow>;