/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */
#pragma once

#include <filesystem>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "executiongraph/TaskInformationPOD.h"
#include "runtime/state/RocksDbKvStateInfo.h"
#include "runtime/state/RegisteredStateMetaInfoBase.h"
#include "runtime/state/RocksIteratorWrapper.h"

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"


// Shorthand for std::filesystem. Nothing in the visible part of this header refers to `fs`.
// NOTE(review): because the alias is declared at header scope it is visible in every file
// that includes this header — check for dependents before moving or removing it.
namespace fs = std::filesystem;

class RocksDbOperationUtils {
public:
    static rocksdb::DB* openDB(
            const std::string& path,
            const std::vector<rocksdb::ColumnFamilyDescriptor>& stateColumnFamilyDescriptors,
            std::vector<rocksdb::ColumnFamilyHandle*>& stateColumnFamilyHandles,
            const rocksdb::ColumnFamilyOptions& columnFamilyOptions,
            const rocksdb::DBOptions& dbOptions)
    {
        std::vector<rocksdb::ColumnFamilyDescriptor> columnFamilyDescriptors;
        columnFamilyDescriptors.emplace_back(rocksdb::kDefaultColumnFamilyName, columnFamilyOptions);
        columnFamilyDescriptors.insert(
            columnFamilyDescriptors.end(),
            stateColumnFamilyDescriptors.begin(),
            stateColumnFamilyDescriptors.end());
        rocksdb::DB* db;
        rocksdb::Status status = rocksdb::DB::Open(dbOptions,
                                                   path, columnFamilyDescriptors, &stateColumnFamilyHandles, &db);
        if (!status.ok()) {
            for (auto* handle : stateColumnFamilyHandles) {
                delete handle;
            }
            stateColumnFamilyHandles.clear();
            throw std::runtime_error("rocksdb open error");
        }

        if (1 + stateColumnFamilyDescriptors.size() != stateColumnFamilyHandles.size()) {
            delete db;
            for (auto* handle : stateColumnFamilyHandles) {
                delete handle;
            }
            stateColumnFamilyHandles.clear();

            throw std::runtime_error("Not all requested column family handles have been created");
        }

        return db;
    }

    static std::unique_ptr<RocksIteratorWrapper> getRocksIterator(
            rocksdb::DB* db,
            rocksdb::ColumnFamilyHandle* columnFamilyHandle,
            const rocksdb::ReadOptions& readOptions) {
        auto itPtr = db->NewIterator(readOptions, columnFamilyHandle);
        std::unique_ptr<rocksdb::Iterator> itUniqueptr(itPtr);
        return std::make_unique<RocksIteratorWrapper>(std::move(itUniqueptr));
    }

    static void registerKvStateInformation(
            std::unordered_map<std::string, std::shared_ptr<RocksDbKvStateInfo>> *kvStateInformation,
            const std::string& columnFamilyName,
            std::shared_ptr<RocksDbKvStateInfo> registeredColumn)
    {
        kvStateInformation->emplace(columnFamilyName, registeredColumn);
    }

    static std::shared_ptr<RocksDbKvStateInfo> createStateInfo(
            std::shared_ptr<RegisteredStateMetaInfoBase> metaInfoBase,
            rocksdb::DB* db,
            std::function<rocksdb::ColumnFamilyOptions(const std::string&)> columnFamilyOptionsFactory)
    {
        rocksdb::ColumnFamilyDescriptor columnFamilyDescriptor =
                createColumnFamilyDescriptor(
                    metaInfoBase,
                    columnFamilyOptionsFactory);
        rocksdb::ColumnFamilyHandle* handle = createColumnFamily(columnFamilyDescriptor, db);
        return std::make_shared<RocksDbKvStateInfo>(handle, metaInfoBase);
    }

    static rocksdb::ColumnFamilyDescriptor createColumnFamilyDescriptor(
            std::shared_ptr<RegisteredStateMetaInfoBase> metaInfoBase,
            std::function<rocksdb::ColumnFamilyOptions(const std::string&)> columnFamilyOptionsFactory)
    {
        auto options = createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase->getName());
        const std::string& nameStr = metaInfoBase->getName();
        return rocksdb::ColumnFamilyDescriptor(nameStr, options);
    }

    static rocksdb::ColumnFamilyOptions createColumnFamilyOptions(
            std::function<rocksdb::ColumnFamilyOptions(const std::string&)> columnFamilyOptionsFactory,
            const std::string& stateName)
    {
        auto columnFamilyOptions = columnFamilyOptionsFactory(stateName);
        return columnFamilyOptions;
    }

    static rocksdb::ColumnFamilyHandle* createColumnFamily(
            rocksdb::ColumnFamilyDescriptor& columnDescriptor,
            rocksdb::DB* db)
    {
        rocksdb::ColumnFamilyHandle* handle = nullptr;
        rocksdb::Status status = db->CreateColumnFamily(columnDescriptor.options, columnDescriptor.name, &handle);
        if (!status.ok()) {
            throw std::runtime_error("Error creating ColumnFamilyHandle.");
        }
        return handle;
    }
};