/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#ifndef FLINK_TNEL_COUNTDISTINCTFUNCTION_H
#define FLINK_TNEL_COUNTDISTINCTFUNCTION_H

#include <cstddef>
#include <cstdint>
#include <tuple>
#include <vector>
#include "../AggsHandleFunction.h"
#include "../table/runtime/dataview/StateDataViewStore.h"
#include "../runtime/state/VoidNamespace.h"

using namespace omniruntime::type;

class CountDistinctFunction : public AggsHandleFunction {
public:
    CountDistinctFunction(int aggIdx, std::string inputType, int accIndex = -1, int valueIndex = -1,
                          int aggFuncIndex = -1, int filterIndex = -1)
        : valueIsNull(true), aggIdx(aggIdx), accIndex(accIndex), valueIndex(valueIndex), aggFuncIndex(aggFuncIndex), filterIndex(filterIndex)
    {
        hasFilter = filterIndex != -1;
        typeId = LogicalType::flinkTypeToOmniTypeId(inputType);
    }

    void setWindowSize(int windowSize) override {};
    bool equaliser(BinaryRowData *r1, BinaryRowData *r2) override;
    void open(StateDataViewStore *store);
    void accumulate(RowData *accInput) override;
    void accumulate(omnistream::VectorBatch *input, const std::vector<int> &indices) override;
    void retract(RowData *retractInput) override;
    void retract(omnistream::VectorBatch* input, const std::vector<int>& indices) override;
    void merge(RowData *otherAcc) override;
    void setAccumulators(RowData *acc) override;
    void resetAccumulators() override;
    void getAccumulators(BinaryRowData *accumulators) override;
    void createAccumulators(BinaryRowData *accumulators) override;
    void getValue(BinaryRowData *aggValue) override;
    void cleanup() override;
    void close() override;
    void setCurrentGroupKey(RowData* key) override;
    void accumulateInRocksDB(omnistream::VectorBatch *input, const std::vector<int> &indices);
    void updateInnerState();
    void bindAccValueIndex(int accStartIndex, int valueStartIndex) override
    {
        accIndex = accStartIndex;
        valueIndex = valueStartIndex;
    }
    int accumulatorSlots() const override { return 1; }
    bool hasAggOutput() const override { return valueIndex >= 0; }
    ~CountDistinctFunction() override;


private:
    using PendingDistinctUpdates = std::vector<std::tuple<RowData*, long, long>>;

    long aggCount;
    bool valueIsNull;
    int aggIdx;
    int accIndex;
    int valueIndex;
    bool hasFilter;
    int aggFuncIndex;
    int filterIndex;
    omniruntime::type::DataTypeId typeId;
    StateDataViewStore *store;
    KeyedStateMapViewWithKeysNullable<VoidNamespace, long, long> *distinctMapView;
    RowData * currentGroupKey;
    PendingDistinctUpdates pendingDistinctUpdates;
};


#endif // FLINK_TNEL_COUNTDISTINCTFUNCTION_H