/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#include "WindowAggsHandleFunction.h"

#include <utility>

#include "common.h"
#include "table/data/binary/BinaryRowData.h"

WindowAggsHandleFunction::WindowAggsHandleFunction(
    std::vector<std::unique_ptr<NamespaceAggsBasicFunction<int64_t>>> functions,
    std::vector<int32_t> aggValueTypeIds,
    std::vector<int32_t> outputValueTypeIds,
    SliceAssigner* sliceAssigner,
    int32_t accumulatorArity)
    : NamespaceAggsHandleFunction<int64_t>(accumulatorArity),
      aggValueTypeIds_(std::move(aggValueTypeIds)),
      outputValueTypeIds_(std::move(outputValueTypeIds)),
      sliceAssigner_(sliceAssigner),
      functions_(std::move(functions))
{
    if (outputValueTypeIds_.size() != aggValueTypeIds_.size() + 2) {
        THROW_LOGIC_EXCEPTION("Output value type ids must contain aggregate values and two window fields.");
    }
}

void WindowAggsHandleFunction::open(StateDataViewStore* store)
{
}

void WindowAggsHandleFunction::accumulate(RowData* accInput)
{
    for (auto& func : functions_) {
        func->accumulate(accInput);
    }
}

void WindowAggsHandleFunction::retract(RowData* retractInput)
{
    for (auto& func : functions_) {
        func->retract(retractInput);
    }
}

void WindowAggsHandleFunction::merge(int64_t ns, RowData* otherAcc)
{
    for (auto& func : functions_) {
        func->merge(ns, otherAcc);
    }
}

void WindowAggsHandleFunction::setAccumulators(int64_t ns, RowData* acc)
{
    currentAcc_ = acc;
    for (auto& func : functions_) {
        func->setAccumulators(ns, acc);
    }
}

RowData* WindowAggsHandleFunction::getAccumulators()
{
    for (const auto& function : functions_) {
        function->getAccumulators();
    }
    return currentAcc_;
}

RowData* WindowAggsHandleFunction::createAccumulators()
{
    auto* newAcc = BinaryRowData::createBinaryRowDataWithMem(accumulatorArity_);
    for (int32_t i = 0; i < accumulatorArity_; ++i) {
        newAcc->setNullAt(i);
    }
    return newAcc;
}

RowData* WindowAggsHandleFunction::getValue(int64_t ns)
{
    auto* res = BinaryRowData::createBinaryRowDataWithMem(this->outputValueTypeIds_.size());
    for (auto& func : functions_) {
        func->updateAggValue(res);
    }
    res->setLong(this->outputValueTypeIds_.size() - 2, sliceAssigner_->getWindowStart(ns));
    res->setLong(this->outputValueTypeIds_.size() - 1, ns);
    return res;
}

void WindowAggsHandleFunction::cleanup(int64_t ns)
{
}

void WindowAggsHandleFunction::close()
{
}