* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "GroupWindowAggsHandleFunction.h"
#include <type_traits>
#include <utility>
#include "table/data/binary/BinaryRowData.h"
#include "table/runtime/operators/window/TimeWindow.h"
#include "table/utils/TimeWindowUtil.h"
#include "typeutils/BinaryRowDataSerializer.h"
template <typename N>
GroupWindowAggsHandleFunction<N>::GroupWindowAggsHandleFunction(
std::vector<std::unique_ptr<NamespaceAggsBasicFunction<N>>> functions,
std::vector<int32_t> aggValueTypeIds,
std::vector<int32_t> windowPropertyTypeIds,
std::vector<int32_t> outputValueTypeIds,
int32_t accumulatorArity,
std::string shiftTimeZone)
: NamespaceAggsHandleFunction<N>(accumulatorArity),
functions_(std::move(functions)),
aggValueTypeIds_(std::move(aggValueTypeIds)),
windowPropertyTypeIds_(std::move(windowPropertyTypeIds)),
outputValueTypeIds_(std::move(outputValueTypeIds)),
shiftTimeZone_(std::move(shiftTimeZone))
{
if (outputValueTypeIds_.size() != windowPropertyTypeIds_.size() + aggValueTypeIds_.size()) {
THROW_LOGIC_EXCEPTION(
"Output value type ids size must be equal to window property type ids size plus agg "
"value type ids size!");
}
}
template <typename N>
void GroupWindowAggsHandleFunction<N>::open(StateDataViewStore* store)
{
NOT_IMPL_EXCEPTION;
}
template <typename N>
void GroupWindowAggsHandleFunction<N>::setAccumulators(N namespaceVal, RowData* accumulators)
{
currentAcc_ = accumulators;
for (const auto& function : functions_) {
function->setAccumulators(namespaceVal, accumulators);
}
}
template <typename N>
void GroupWindowAggsHandleFunction<N>::accumulate(RowData* inputRow)
{
for (const auto& function : functions_) {
function->accumulate(inputRow);
}
}
template <typename N>
void GroupWindowAggsHandleFunction<N>::retract(RowData* inputRow)
{
for (const auto& function : functions_) {
function->retract(inputRow);
}
}
template <typename N>
void GroupWindowAggsHandleFunction<N>::merge(N namespaceVal, RowData* otherAcc)
{
for (const auto& function : functions_) {
function->merge(namespaceVal, otherAcc);
}
}
template <typename N>
RowData* GroupWindowAggsHandleFunction<N>::createAccumulators()
{
auto* accumulators = BinaryRowData::createBinaryRowDataWithMem(this->accumulatorArity_);
for (int32_t i = 0; i < this->accumulatorArity_; ++i) {
accumulators->setNullAt(i);
}
return accumulators;
}
template <typename N>
RowData* GroupWindowAggsHandleFunction<N>::getAccumulators()
{
for (const auto& function : functions_) {
function->getAccumulators();
}
return currentAcc_;
}
template <typename N>
RowData* GroupWindowAggsHandleFunction<N>::getValue(N namespaceVal)
{
auto* value = BinaryRowData::createBinaryRowDataWithMem(outputValueTypeIds_.size());
for (const auto& function : functions_) {
function->updateAggValue(value);
}
if constexpr (std::is_same_v<N, TimeWindow>) {
const int64_t windowStart = static_cast<TimeWindow>(namespaceVal).getStart();
const int64_t windowEnd = static_cast<TimeWindow>(namespaceVal).getEnd();
value->setLong(
outputValueTypeIds_.size() - 4, convertWindowBoundaryTimestamp(windowStart, windowPropertyTypeIds_[0]));
value->setLong(
outputValueTypeIds_.size() - 3, convertWindowBoundaryTimestamp(windowEnd, windowPropertyTypeIds_[1]));
value->setLong(
outputValueTypeIds_.size() - 2, convertWindowBoundaryTimestamp(windowEnd - 1, windowPropertyTypeIds_[2]));
value->setLong(outputValueTypeIds_.size() - 1, -1);
} else {
NOT_IMPL_EXCEPTION;
}
return value;
}
template <typename N>
void GroupWindowAggsHandleFunction<N>::cleanup(N namespaceVal)
{
}
template <typename N>
void GroupWindowAggsHandleFunction<N>::close()
{
}
template <typename N>
int64_t GroupWindowAggsHandleFunction<N>::convertWindowBoundaryTimestamp(
int64_t utcTimestampMills, int32_t propertyTypeId) const
{
if (propertyTypeId == omniruntime::type::DataTypeId::OMNI_TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
return TimeWindowUtil::toEpochMills(utcTimestampMills, shiftTimeZone_);
}
return utcTimestampMills;
}
template class GroupWindowAggsHandleFunction<int64_t>;
template class GroupWindowAggsHandleFunction<TimeWindow>;