/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#pragma once

// Forward declaration of the window processor type that SlicingWindowOperator
// (declared further down in this header) owns and delegates to.
template <typename K, typename W>
class SlicingWindowProcessor;

#include <ctime>
#include <limits>
#include <string>

#include "nlohmann/json.hpp"
#include "streaming/api/watermark/Watermark.h"
#include "streaming/api/operators/TimerHeapInternalTimer.h"
#include "table/data/vectorbatch/VectorBatch.h"
#include "table/runtime/operators/TableStreamOperator.h"
#include "streaming/api/operators/OneInputStreamOperator.h"
#include "streaming/api/operators/Triggerable.h"
#include "core/api/common/state/ListState.h"
#include "streaming/api/operators/TimestampedCollector.h"
#include "table/runtime/operators/InternalTimerService.h"
#include "streaming/api/operators/Output.h"
#include "functions/RuntimeContext.h"
#include "table/runtime/operators/window/processor/AbstractWindowAggProcessor.h"
#include "runtime/state/DefaultOperatorStateBackend.h"
#include "runtime/state/StateInitializationContextImpl.h"
#include "runtime/state/StateSnapshotContextSynchronousImpl.h"
#include "core/api/common/state/ListStateDescriptor.h"


using namespace omniruntime::type;
using json = nlohmann::json;

/**
 * Stream operator that forwards batches, watermarks and timer callbacks to an
 * owned SlicingWindowProcessor.
 *
 * The operator also keeps its current watermark in a union list state named
 * "watermark": the value is written on every snapshot and, when the
 * initialization context reports a restore, the smallest stored value is
 * adopted as the current watermark.
 *
 * @tparam K key type used by the base operator and by the timers
 * @tparam W namespace type carried by the timers handed to this operator
 */
template <typename K, typename W>
class SlicingWindowOperator : public TableStreamOperator<K>,
                              public OneInputStreamOperator,
                              public Triggerable<K, W> {
public:
    /**
     * @param windowProcessor processor this operator takes ownership of
     * @param config          JSON description later handed to the processor in open()
     */
    SlicingWindowOperator(std::unique_ptr<SlicingWindowProcessor<K, W>> windowProcessor, const nlohmann::json config);
    ~SlicingWindowOperator() override = default;

    /** Opens the base operator, registers the "window-timers" service and opens the processor. */
    void open() override;

    /**
     * Copies the operator ID exposed by the OneInputStreamOperator base into the
     * AbstractStreamOperator base and then delegates to the base initialization.
     * Note that the ID written to the log is read before SetOperatorID runs.
     */
    void initializeState(StreamTaskStateInitializerImpl *initializer, TypeSerializer *keySerializer) override
    {
        INFO_RELEASE("SlicingWindowOperator initializeState with initializer, operatorID: "
            << AbstractStreamOperator<K>::GetOperatorID().toString());
        AbstractStreamOperator<K>::SetOperatorID(OneInputStreamOperator::GetOperatorID().toString());
        AbstractStreamOperator<K>::initializeState(initializer, keySerializer);
    }

    /**
     * Registers the "watermark" union list state and, on restore, sets the
     * current watermark to the minimum of all restored entries.
     */
    void initializeState(StateInitializationContextImpl *context) override
    {
        INFO_RELEASE("SlicingWindowOperator initializeState");
        AbstractStreamOperator<K>::initializeState(context);

        std::string stateName = "watermark";
        auto *descriptor = new ListStateDescriptor<int64_t>(stateName, new LongSerializer());
        auto *operatorBackend = static_cast<DefaultOperatorStateBackend*>(context->getOperatorStateBackend());
        auto unionState = operatorBackend->getUnionListState<int64_t>(descriptor);
        this->watermarkState = unionState;

        // Nothing more to do on a fresh start.
        if (!context->isRestored()) {
            return;
        }

        auto *restored = this->watermarkState->get();
        if (restored == nullptr || restored->empty()) {
            return;
        }

        // int64 max doubles as the "nothing smaller seen" marker.
        int64_t lowest = std::numeric_limits<int64_t>::max();
        for (int64_t candidate : *restored) {
            lowest = (candidate < lowest) ? candidate : lowest;
        }
        if (lowest != std::numeric_limits<int64_t>::max()) {
            this->currentWatermark = lowest;
        }
    }

    /**
     * Snapshots the base operator, then replaces the content of the watermark
     * state with the single current watermark (skipped when the state is unset).
     */
    void snapshotState(StateSnapshotContextSynchronousImpl *context) override
    {
        INFO_RELEASE("SlicingWindowOperator snapshotState");
        TableStreamOperator<K>::snapshotState(context);
        if (this->watermarkState == nullptr) {
            return;
        }
        this->watermarkState->clear();
        this->watermarkState->add(this->currentWatermark);
    }

    /** Forwards the completion notification to AbstractStreamOperator. */
    void notifyCheckpointComplete(long checkpointId) override
    {
        AbstractStreamOperator<K>::notifyCheckpointComplete(checkpointId);
    }

    /** Forwards the abort notification to AbstractStreamOperator. */
    void notifyCheckpointAborted(long checkpointId) override
    {
        AbstractStreamOperator<K>::notifyCheckpointAborted(checkpointId);
    }

    /** Argument-less overload; intentionally does nothing. */
    void snapshotState() {}
    /** Closes the window processor. */
    void close();
    /** Hands a vector batch to the window processor. */
    void processBatch(omnistream::VectorBatch *batch);
    /** Unwraps the batch carried by the record and processes it. */
    void processBatch(StreamRecord *record) override;
    /** Advances the processor when the watermark moves forward, then forwards a watermark to the base. */
    void ProcessWatermark(Watermark *mark) override;
    /** Per-element processing is a no-op for this operator. */
    void processElement(StreamRecord *element) override {}
    /** Event-time timer callback; delegates to onTimer(). */
    void onEventTime(TimerHeapInternalTimer<K, W> *timer) override;
    /** Processing-time timer callback; may advance progress before delegating to onTimer(). */
    void onProcessingTime(TimerHeapInternalTimer<K, W> *timer) override;
    /** Asks the window processor to prepare for a checkpoint. */
    void prepareSnapshotPreBarrier(int64_t checkpointId);
    /** Returns the output obtained from the window processor. */
    Output* getOutput();
    /** Watermark status changes are ignored. */
    void processWatermarkStatus(WatermarkStatus *watermarkStatus) override {}
    /** Fires and then clears the window identified by the timer's namespace. */
    void onTimer(TimerHeapInternalTimer<K, W> *timer);

private:
    // Processor that performs the actual window work; owned by this operator.
    std::unique_ptr<SlicingWindowProcessor<K, W>> windowProcessor_ = nullptr;
    // Largest processing-time timestamp seen so far in onProcessingTime().
    int64_t lastTriggeredProcessingTime = std::numeric_limits<int64_t>::min();
    // Union list state holding the watermark; assigned in initializeState(context).
    std::shared_ptr<ListState<int64_t>> watermarkState;
    // Copy of the configuration passed to the constructor.
    nlohmann::json description;
    // Timer service obtained in open().
    InternalTimerServiceImpl<K, int64_t> *internalTimerService = nullptr;
    // Output taken from the window processor in the constructor.
    Output* output;
};

/**
 * Bundle of the resources a window processor works with: memory manager,
 * memory budget, timer service, keyed state backend, output collector and
 * runtime context.
 *
 * Every member carries a default initializer so that a default-constructed
 * context holds null pointers and a zero size instead of indeterminate values.
 *
 * @tparam W type parameter forwarded to InternalTimerService
 */
template <typename W>
class WindowProcessorContext {
public:
    omniruntime::mem::MemoryManager *getMemoryManager();
    int64_t getMemorySize();
    /** Returns the stored keyed state backend pointer (may be null). */
    AbstractKeyedStateBackend<omnistream::VectorBatch> *getKeyedStateBackend();
    InternalTimerService<W> *getTimerService();
    void setBackend();

private:
    // No operatorOwner member is kept here: it is not used.
    omniruntime::mem::MemoryManager *memoryManager = nullptr;
    int64_t memorySize = 0;
    InternalTimerService<W> *timerService = nullptr;
    AbstractKeyedStateBackend<omnistream::VectorBatch> *keyedStateBackend = nullptr;
    Output *collector = nullptr;
    RuntimeContext *runtimeContext = nullptr;
};

/**
 * Builds the operator around an already constructed window processor.
 *
 * The base operator is initialised with the processor's output, ownership of
 * the processor is taken over, and the configuration is stored as the
 * operator description.
 *
 * The in-class declaration spells the second parameter as
 * `const nlohmann::json config`. Top-level const on a by-value parameter is not
 * part of the function type, so this definition matches that declaration while
 * being allowed to move the argument into `description` instead of deep-copying
 * the JSON document a second time.
 *
 * @param windowProcessor processor to own; must not be null because it is
 *                        dereferenced while initialising the base class
 * @param config          JSON configuration kept as `description`
 */
template <typename K, typename W>
SlicingWindowOperator<K, W>::SlicingWindowOperator(std::unique_ptr<SlicingWindowProcessor<K, W>> windowProcessor,
        nlohmann::json config)
        :
        // Evaluated before the move below, while the parameter still owns the processor.
        TableStreamOperator<K>(windowProcessor->getOutput()),
        windowProcessor_(std::move(windowProcessor)),
        description(std::move(config)) {
    this->output = windowProcessor_->getOutput();
}

/**
 * Opens the operator.
 *
 * Opens the base operator, resets the last triggered processing time, obtains
 * the "window-timers" timer service using the processor's window serializer,
 * opens the window processor and finally seeds it with the current watermark.
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::open()
{
    TableStreamOperator<K>::open();
    this->lastTriggeredProcessingTime = std::numeric_limits<int64_t>::min();

    auto *ctx = AbstractStreamOperator<K>::getRuntimeContext();
    auto keyedBackend = this->stateHandler->getKeyedStateBackend();

    // This operator registers itself as the callback target of the timer service.
    TypeSerializer *serializerForWindows = windowProcessor_->createWindowSerializer();
    this->internalTimerService = AbstractStreamOperator<K>::template getInternalTimerService<int64_t>(
        "window-timers", serializerForWindows, this);

    windowProcessor_->open(keyedBackend, this->description, ctx, this->internalTimerService);
    windowProcessor_->initializeWatermark(this->currentWatermark);
}

/**
 * Closes the window processor.
 * NOTE(review): no base-class close() is invoked here — confirm that is intended.
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::close()
{
    this->windowProcessor_->close();
}

/**
 * Processes the vector batch carried by a stream record.
 *
 * Both the record and the batch it carries are wrapped in unique_ptr, so they
 * are deleted when this function returns (batch first, then record).
 *
 * @param input record whose value is reinterpreted as an omnistream::VectorBatch
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::processBatch(StreamRecord* input)
{
    std::unique_ptr<StreamRecord> ownedRecord(input);
    std::unique_ptr<omnistream::VectorBatch> ownedBatch(
        reinterpret_cast<omnistream::VectorBatch*>(ownedRecord->getValue()));
    this->processBatch(ownedBatch.get());
}

/**
 * Hands the batch to the window processor unchanged.
 *
 * @param batch batch to process; this overload does not delete it
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::processBatch(omnistream::VectorBatch *batch)
{
    this->windowProcessor_->processBatch(batch);
}

/**
 * Handles an incoming watermark.
 *
 * A watermark newer than the current one advances the window processor and is
 * then forwarded to the base operator. Otherwise a freshly allocated watermark
 * carrying the current watermark value is forwarded instead.
 *
 * @param mark incoming watermark
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::ProcessWatermark(Watermark *mark)
{
    const bool movesForward = mark->getTimestamp() > this->currentWatermark;
    if (!movesForward) {
        // NOTE(review): ownership of this allocation (and of `mark`, which is not
        // forwarded on this path) is not visible here — confirm neither leaks.
        TableStreamOperator<K>::ProcessWatermark(new Watermark(this->currentWatermark));
        return;
    }

    windowProcessor_->advanceProgress(mark->getTimestamp());
    TableStreamOperator<K>::ProcessWatermark(mark);
}

/**
 * Event-time timer callback: fires and clears the timer's window via onTimer().
 *
 * @param timer timer that fired
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::onEventTime(TimerHeapInternalTimer<K, W> *timer)
{
    this->onTimer(timer);
}

/**
 * Processing-time timer callback.
 *
 * When the timer's timestamp is greater than the last triggered processing
 * time, that timestamp is recorded and the window processor's progress is
 * advanced to it. The timer is then handled by onTimer() in every case.
 *
 * @param timer timer that fired
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::onProcessingTime(TimerHeapInternalTimer<K, W> *timer)
{
    const bool isNewer = timer->getTimestamp() > this->lastTriggeredProcessingTime;
    if (isNewer) {
        this->lastTriggeredProcessingTime = timer->getTimestamp();
        windowProcessor_->advanceProgress(timer->getTimestamp());
    }

    this->onTimer(timer);
}

/**
 * Common timer handling: makes the timer's key the current key of the state
 * handler, then fires and clears the window given by the timer's namespace.
 *
 * @param timer timer that fired
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::onTimer(TimerHeapInternalTimer<K, W> *timer)
{
    // The key must be set before the processor touches keyed state for the window.
    K timerKey = timer->getKey();
    this->stateHandler->setCurrentKey(timerKey);

    W firedWindow = timer->getNamespace();
    this->windowProcessor_->fireWindow(firedWindow);
    this->windowProcessor_->clearWindow(firedWindow);
}

/**
 * Called before the checkpoint barrier is emitted: logs the checkpoint ID and
 * asks the window processor to prepare for the checkpoint.
 *
 * @param checkpointId ID of the checkpoint about to be taken (used for logging only)
 */
template <typename K, typename W>
void SlicingWindowOperator<K, W>::prepareSnapshotPreBarrier(int64_t checkpointId) {
    // Terminated with ';' like every other INFO_RELEASE call in this file, so the
    // statement does not depend on how the macro happens to expand.
    INFO_RELEASE("SlicingWindowOperator prepareSnapshotPreBarrier:" << checkpointId);
    windowProcessor_->prepareCheckpoint();
}

/**
 * @return the keyed state backend pointer stored in this context
 */
template <typename W>
AbstractKeyedStateBackend<omnistream::VectorBatch> *WindowProcessorContext<W>::getKeyedStateBackend()
{
    return this->keyedStateBackend;
}

/**
 * @return the output currently reported by the window processor
 */
template <typename K, typename W>
Output *SlicingWindowOperator<K, W>::getOutput()
{
    Output *processorOutput = this->windowProcessor_->getOutput();
    return processorOutput;
}