/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#ifndef OMNIFLINK_SINKWRITEROPERATOR_H
#define OMNIFLINK_SINKWRITEROPERATOR_H

#include <vector>
#include <memory>
#include <optional>
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <limits>
#include "core/include/common.h"
#include "connector/kafka/sink/KafkaSink.h"
#include "connector/kafka/sink/KafkaWriter.h"
#include "connector/kafka/sink/KafkaCommittable.h"
#include "connector/kafka/sink/KafkaCommittableSerializer.h"
#include "streaming/runtime/streamrecord/StreamRecord.h"
#include "streaming/api/operators/OneInputStreamOperator.h"
#include "streaming/api/operators/AbstractStreamOperator.h"
#include "streaming/api/operators/sink/KafkaSinkWriterStateHandler.h"
#include "streaming/api/operators/sink/InitContextImpl.h"

class SinkWriterOperator : public OneInputStreamOperator, public AbstractStreamOperator<void*> {
public:
    static ListStateDescriptor<std::vector<uint8_t>> STREAMING_COMMITTER_RAW_STATES_DESC;

    SinkWriterOperator(KafkaSink* kafkaSink, const nlohmann::json& config);

    ~SinkWriterOperator()
    {
        close();
    }

    void initializeState(StateInitializationContextImpl* context) override;

    void snapshotState(StateSnapshotContextSynchronousImpl* context) override;

    void open() override;

    void close() override;

    RowData* getOutputEntireRow(omnistream::VectorBatch* batch, int rowId);

    void processBatch(StreamRecord* record) override;

    void processElement(StreamRecord* record) override;

    void EndInput();

    void initializeState(StreamTaskStateInitializerImpl* initializer, TypeSerializer* keySerializer) override;

    void notifyCheckpointComplete(long checkpointId) override;

    void notifyCheckpointAborted(long checkpointId) override;

    void ProcessWatermark(Watermark* watermark) override;

    void processWatermarkStatus(WatermarkStatus* watermarkStatus) override;

    bool canBeStreamOperator() override
    {
        return isDataStream;
    }

    std::string getTypeName() override;

    KafkaSink* getKafkaSink()
    {
        return kafkaSink;
    }

private:
    template <typename K>
    InitContextImpl<K>* createInitContext(std::optional<uint64_t> restoredCheckpointId);
    template <typename CommT>
    void emitCommittables(std::int64_t checkpointId);
    template <typename CommT>
    void emit(
        int indexOfThisSubtask,
        int numberOfParallelSubtasks,
        std::int64_t checkpointId,
        const std::vector<CommT>& committables);
    KafkaCommittableSerializer* committableSerializer{};
    bool emitDownstream{};
    std::int64_t currentWatermark{};
    KafkaSink* kafkaSink = nullptr;
    KafkaWriter* sinkWriter = nullptr;
    bool endOfInput = false;
    nlohmann::json description;
    std::vector<std::string> inputTypes;

    ProcessingTimeServiceImpl* processingTimeService = nullptr;
    KafkaSinkWriterStateHandler* writerStateHandler = nullptr;
    std::vector<KafkaCommittable> legacyCommittables;
    bool isDataStream;
    bool closed_ = false;
    int32_t subtaskIndex;
};
#endif // OMNIFLINK_SINKWRITEROPERATOR_H