* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "MergingWindowProcessFunction.h"

#include <stdexcept>
#include <string>
/**
 * Opens the function: delegates to the base class, then obtains the "session-window-mapping"
 * map state (window -> window) from the context and builds the MergingWindowSet over it.
 *
 * @param ctx the context; must actually be a WindowOperator<K, W>::WindowContext
 * @throws std::runtime_error if ctx is not a WindowOperator<K, W>::WindowContext
 */
template<typename K, typename W>
void MergingWindowProcessFunction<K, W>::open(Context<K, W> *ctx) {
    InternalWindowProcessFunction<K, W>::open(ctx);
    // getPartitionedState() is only available on the operator's WindowContext; check the cast
    // instead of dereferencing a possible nullptr.
    auto* windowContext = dynamic_cast<typename WindowOperator<K, W>::WindowContext*>(ctx);
    if (windowContext == nullptr) {
        throw std::runtime_error(
            "MergingWindowProcessFunction::open requires a WindowOperator::WindowContext");
    }
    // NOTE(review): ownership of the descriptor after getPartitionedState() is not visible here;
    // if the state backend does not take ownership, this allocation leaks — confirm.
    auto* mapStateDescriptor = new MapStateDescriptor<W, W>("session-window-mapping", windowSerializer, windowSerializer);
    MapState<W, W>* mapState = windowContext->getPartitionedState(mapStateDescriptor);
    mergingWindows = std::make_unique<MergingWindowSet<K, W>>(windowAssigner, mapState);
}
/**
 * Assigns the incoming row to its windows and merges them into the current key's merging-window
 * set. Returns the state windows (state namespaces) of every resulting actual window that is not
 * late.
 *
 * Side effect: reuseActualWindows is cleared and refilled with the non-late actual windows.
 * Late actual windows are retired from the merging-window set instead.
 *
 * @param inputRow  the row being processed
 * @param timestamp the timestamp passed to the window assigner
 * @return one state window per non-late actual window, in the same order as reuseActualWindows
 * @throws std::runtime_error if a merge yields a window that is already late (event time) or
 *         whose end is not after the current processing time (processing time)
 */
template<typename K, typename W>
std::vector<W> MergingWindowProcessFunction<K, W>::assignStateNamespace(RowData* inputRow, int64_t timestamp) {
    std::vector<W> elementWindows = windowAssigner->assignWindows(inputRow, timestamp);
    mergingWindows->initializeCache(this->ctx->currentKey());
    reuseActualWindows.clear();
    // Merge callback handed to MergingWindowSet::addWindow.
    auto MergingFunction = [this](W &mergeResult, std::unordered_set<W> &mergedWindows, W &stateWindowResult,
                                  std::vector<W> &stateWindowsToBeMerged) {
        // A merged window must never already be expired; otherwise its state would be merged and then
        // immediately dropped.
        if (windowAssigner->isEventTime()) {
            if (this->isWindowLate(mergeResult)) {
                throw std::runtime_error(
                    "The end timestamp of an event-time window cannot become earlier than the current watermark "
                    "by merging.");
            }
        } else {
            const int64_t currentProcessingTime = this->ctx->currentProcessingTime();
            if (mergeResult.maxTimestamp() <= currentProcessingTime) {
                throw std::runtime_error(
                    "The end timestamp of a processing-time window cannot become earlier than the current processing time "
                    "by merging. Current processing time: " + std::to_string(currentProcessingTime));
            }
        }
        this->ctx->onMerge(mergeResult, stateWindowsToBeMerged);
        // Drop the triggers and cleanup timers registered for the windows taking part in the merge.
        for (const auto& m: mergedWindows) {
            this->ctx->clearTrigger(m);
            this->ctx->deleteCleanupTimer(m);
        }
        // Fold the accumulators of the absorbed state windows into stateWindowResult, then clear their state.
        if (!stateWindowsToBeMerged.empty()) {
            RowData *targetAcc = this->ctx->getWindowAccumulators(stateWindowResult);
            if (targetAcc == nullptr) {
                targetAcc = this->windowAggregator->createAccumulators();
            }
            this->windowAggregator->setAccumulators(stateWindowResult, targetAcc);
            for (const auto& w: stateWindowsToBeMerged) {
                RowData *acc = this->ctx->getWindowAccumulators(w);
                if (acc != nullptr) {
                    this->windowAggregator->merge(w, acc);
                }
                this->ctx->clearWindowState(w);
                this->ctx->clearPreviousState(w);
            }
            targetAcc = this->windowAggregator->getAccumulators();
            this->ctx->setWindowAccumulators(stateWindowResult, targetAcc);
        }
    };
    for (const auto &window: elementWindows) {
        // NOTE(review): addWindow presumably returns the merged window when a merge happened and the
        // window itself otherwise — semantics live in MergingWindowSet.
        W actualWindow = mergingWindows->addWindow(window, MergingFunction);
        if (this->isWindowLate(actualWindow)) {
            mergingWindows->retireWindow(actualWindow);
        } else {
            reuseActualWindows.push_back(actualWindow);
        }
    }
    std::vector<W> affectedWindows;
    affectedWindows.reserve(reuseActualWindows.size());
    for (const auto &actual: reuseActualWindows) {
        affectedWindows.push_back(mergingWindows->getStateWindow(actual));
    }
    return affectedWindows;
}
/**
 * Returns the actual (possibly merged) windows currently held in reuseActualWindows, which
 * assignStateNamespace() fills. The row and timestamp are not consulted.
 *
 * @param inputRow  unused
 * @param timestamp unused
 * @return a copy of reuseActualWindows
 */
template<typename K, typename W>
std::vector<W> MergingWindowProcessFunction<K, W>::assignActualWindows(RowData *inputRow, int64_t timestamp) {
    static_cast<void>(inputRow);
    static_cast<void>(timestamp);
    std::vector<W> actualWindows(reuseActualWindows);
    return actualWindows;
}
/**
 * Loads the accumulators of the state window that backs `window` into the window aggregator.
 * If no accumulators are stored for that state window yet, freshly created ones are used.
 *
 * @param window the actual window whose state window is looked up in the merging-window set
 */
template<typename K, typename W>
void MergingWindowProcessFunction<K, W>::prepareAggregateAccumulatorForEmit(const W& window) {
    W backingStateWindow = mergingWindows->getStateWindow(window);
    RowData *accumulators = this->ctx->getWindowAccumulators(backingStateWindow);
    if (!accumulators) {
        accumulators = this->windowAggregator->createAccumulators();
    }
    this->windowAggregator->setAccumulators(backingStateWindow, accumulators);
}
/**
 * Cleans up `window` if isCleanupTime() reports that its cleanup time has been reached. Cleanup
 * clears the window's trigger and its state window's state, then retires the window from the
 * merging-window set. Does nothing otherwise.
 *
 * @param window      the actual window to check
 * @param currentTime the time passed to isCleanupTime()
 */
template<typename K, typename W>
void MergingWindowProcessFunction<K, W>::cleanWindowIfNeeded(const W& window, int64_t currentTime) {
    if (!this->isCleanupTime(window, currentTime)) {
        return;
    }
    this->ctx->clearTrigger(window);
    W backingStateWindow = mergingWindows->getStateWindow(window);
    this->ctx->clearWindowState(backingStateWindow);
    // Load the current key's cache before retiring, as assignStateNamespace() does before touching the set.
    mergingWindows->initializeCache(this->ctx->currentKey());
    mergingWindows->retireWindow(window);
}
// Explicit instantiation: the member templates above are defined in this .cpp file, so only the
// <shared_ptr<RowData>, TimeWindow> combination is compiled here. Other K/W combinations would
// need their own instantiation line.
template class MergingWindowProcessFunction<shared_ptr<RowData>, TimeWindow>;