/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#pragma once

#include "RocksDBSharedResources.h"
#include <cstdint>

class RocksDBMemoryControllerUtils {
public:
    RocksDBMemoryControllerUtils() = delete;
    ~RocksDBMemoryControllerUtils() = delete;
    RocksDBMemoryControllerUtils(const RocksDBMemoryControllerUtils&) = delete;
    RocksDBMemoryControllerUtils& operator=(const RocksDBMemoryControllerUtils&) = delete;

    static std::shared_ptr<RocksDBSharedResources> allocateRocksDBSharedResources(const omnistream::TaskInformationPOD& taskInfo) {
        if (taskInfo.getCacheAddr() != 0 && taskInfo.getWriteBufferManagerAddr() != 0) {
            INFO_RELEASE("CacheAddr and WriteBufferManagerAddr are not 0, RocksDBSharedResources is created in JAVA.");
            std::shared_ptr<rocksdb::Cache> cachePtr(
                    reinterpret_cast<rocksdb::Cache*>(taskInfo.getCacheAddr()),
                    [](rocksdb::Cache*) {});
            std::shared_ptr<rocksdb::WriteBufferManager> writeBufferManagerPtr(
                    reinterpret_cast<rocksdb::WriteBufferManager*>(taskInfo.getWriteBufferManagerAddr()),
                    [](rocksdb::WriteBufferManager*) {});
            auto writeBufferManagerCapacity = RocksDBMemoryControllerUtils::calculateWriteBufferManagerCapacity(
                    taskInfo.getStateBackendManagedMemorySize(),
                    taskInfo.getRocksDBMemoryConfiguration().getWriteBufferRatio());

            return std::make_shared<RocksDBSharedResources>(
                    cachePtr,
                    writeBufferManagerPtr,
                    writeBufferManagerCapacity,
                    taskInfo.getRocksDBMemoryConfiguration().isUsingPartitionedIndexFilters());
        } else {
            auto stateBackendManagedMemorySize = taskInfo.getStateBackendManagedMemorySize();
            auto writeBufferRatio = taskInfo.getRocksDBMemoryConfiguration().getWriteBufferRatio();
            auto highPriorityPoolRatio = taskInfo.getRocksDBMemoryConfiguration().getHighPriorityPoolRatio();
            auto usingPartitionedIndexFilters = taskInfo.getRocksDBMemoryConfiguration().isUsingPartitionedIndexFilters();

            return allocateRocksDBSharedResources(
                    stateBackendManagedMemorySize,
                    writeBufferRatio,
                    highPriorityPoolRatio,
                    usingPartitionedIndexFilters);
        }

    }

    static std::shared_ptr<RocksDBSharedResources> allocateRocksDBSharedResources(uint64_t totalMemorySize, double writeBufferRatio,
            double highPriorityPoolRatio, bool usingPartitionedIndexFilters) {
        uint64_t calculateCacheCapacity = calculateActualCacheCapacity(totalMemorySize, writeBufferRatio);
        auto cache = createCache(calculateCacheCapacity, highPriorityPoolRatio);
        uint64_t writeBufferManagerCapacity = calculateWriteBufferManagerCapacity(totalMemorySize, writeBufferRatio);
        auto writeBufferManager = createWriteBufferManager(writeBufferManagerCapacity, cache);
        INFO_RELEASE("Create RocksDBSharedResources in C++. totalMemorySize: " << totalMemorySize <<
                ", writeBufferRatio: " << writeBufferRatio <<
                ", highPriorityPoolRatio: " << highPriorityPoolRatio <<
                ", usingPartitionedIndexFilters: " << usingPartitionedIndexFilters <<
                ", calculateCacheCapacity: " << calculateCacheCapacity <<
                ", writeBufferManagerCapacity: " << writeBufferManagerCapacity);
        return std::make_shared<RocksDBSharedResources>(
                cache,
                writeBufferManager,
                writeBufferManagerCapacity,
                usingPartitionedIndexFilters
        );
    }

    static uint64_t calculateActualCacheCapacity(uint64_t totalMemorySize, double writeBufferRatio) {
        return static_cast<int64_t>((3 - writeBufferRatio) * totalMemorySize / 3);
    }

    static int64_t calculateWriteBufferManagerCapacity(int64_t totalMemorySize, double writeBufferRatio) {
        return static_cast<int64_t>(2 * totalMemorySize * writeBufferRatio / 3);
    }

    static std::shared_ptr<rocksdb::Cache> createCache(int64_t cacheCapacity, double highPriorityPoolRatio) {
        return rocksdb::NewLRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
    }

    static std::shared_ptr<rocksdb::WriteBufferManager> createWriteBufferManager(
            int64_t writeBufferManagerCapacity,
            const std::shared_ptr<rocksdb::Cache> cache) {
        return std::make_shared<rocksdb::WriteBufferManager>(writeBufferManagerCapacity, cache);
    }

    static int64_t calculateRocksDBDefaultArenaBlockSize(int64_t writeBufferSize) {
        int64_t arenaBlockSize = writeBufferSize / 8;

        constexpr int64_t align = 4 * 1024;
        return ((arenaBlockSize + align - 1) / align) * align;
    }

    static int64_t calculateRocksDBMutableLimit(int64_t bufferSize) {
        return bufferSize * 7 / 8;
    }

    static bool validateArenaBlockSize(int64_t arenaBlockSize, int64_t mutableLimit) {
        return arenaBlockSize <= mutableLimit;
    }
};