/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT.
 * See the Mulan PSL v2 for more details.
 */

#include "PanedWindowProcessFunction.h"

#include <memory>

#include "table/runtime/operators/window/TimeWindow.h"

template <typename K, typename W>
std::vector<W> PanedWindowProcessFunction<K, W>::assignActualWindows(RowData* inputRow, int64_t timestamp)
{
    std::vector<W> elementWindows = windowAssigner->assignWindows(inputRow, timestamp);
    std::vector<W> actualWindows;
    actualWindows.reserve(elementWindows.size());
    for (const auto& window : elementWindows) {
        if (!this->isWindowLate(window)) {
            actualWindows.push_back(window);
        }
    }
    return actualWindows;
}

template <typename K, typename W>
std::vector<W> PanedWindowProcessFunction<K, W>::assignStateNamespace(RowData* inputRow, int64_t timestamp)
{
    W pane = windowAssigner->assignPane(inputRow, timestamp);
    if (isPaneLate(pane)) {
        return {};
    }
    return {pane};
}

template <typename K, typename W>
void PanedWindowProcessFunction<K, W>::prepareAggregateAccumulatorForEmit(const W& window)
{
    std::vector<W> panes = windowAssigner->splitIntoPanes(window);
    RowData* acc = this->windowAggregator->createAccumulators();
    this->windowAggregator->setAccumulators(window, acc);
    for (const auto& pane : panes) {
        RowData* paneAcc = this->ctx->getWindowAccumulators(pane);
        if (paneAcc != nullptr) {
            this->windowAggregator->merge(pane, paneAcc);
        }
    }
}

template <typename K, typename W>
void PanedWindowProcessFunction<K, W>::cleanWindowIfNeeded(const W& window, int64_t currentTime)
{
    if (this->isCleanupTime(window, currentTime)) {
        std::vector<W> panes = windowAssigner->splitIntoPanes(window);
        for (const auto& pane : panes) {
            W lastWindow = windowAssigner->getLastWindow(pane);
            if (window == lastWindow) {
                this->ctx->clearWindowState(pane);
            }
        }
        this->ctx->clearTrigger(window);
        this->ctx->clearPreviousState(window);
    }
}

template <typename K, typename W>
bool PanedWindowProcessFunction<K, W>::isPaneLate(const W& pane)
{
    return windowAssigner->isEventTime() && this->isWindowLate(windowAssigner->getLastWindow(pane));
}

template class PanedWindowProcessFunction<std::shared_ptr<RowData>, TimeWindow>;