/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#ifndef NETWORKMEMORYBUFFERPOOL_H
#define NETWORKMEMORYBUFFERPOOL_H
#include <memory>
#include <vector>
#include <deque>
#include <set>
#include <condition_variable>

#include <climits>
#include <memory/MemorySegment.h>
#include "runtime/buffer/ObjectSegment.h"

#include "io/AvailabilityHelper.h"
#include "BufferPool.h"
#include "NetworkBufferPool.h"

namespace omnistream::datastream {
    class LocalMemoryBufferPool;
}


namespace omnistream::datastream {

using namespace omnistream;

class NetworkMemoryBufferPool : public NetworkBufferPool,
        public std::enable_shared_from_this<NetworkMemoryBufferPool> {
public:
    NetworkMemoryBufferPool(int numberOfSegmentsToAllocate, int segmentSize)
        :NetworkMemoryBufferPool(numberOfSegmentsToAllocate, segmentSize,
                                 std::chrono::milliseconds(INT_MAX)) {}
    NetworkMemoryBufferPool(int numberOfSegmentsToAllocate, int segmentSize, std::chrono::milliseconds requestSegmentsTimeout);
    ~NetworkMemoryBufferPool() override {
        for (auto segment : availableMemorySegments) {
            delete segment;
        }
        availableMemorySegments.clear();
    }

    MemorySegment *requestPooledMemorySegment();
    std::vector<MemorySegment *> requestPooledMemorySegmentsBlocking(int numberOfSegmentsToRequest);
    void recyclePooledMemorySegment(MemorySegment *segment);

    std::vector<MemorySegment *> requestUnpooledMemorySegments(int numberOfSegmentsToRequest) override;
    void recycleUnpooledMemorySegments(const std::vector<MemorySegment *>& segments) override;
    std::vector<ObjectSegment *> requestUnpooledObjectSegments(int numberOfSegmentsToRequest) override {
        THROW_LOGIC_EXCEPTION("error")
    }

    void recycleUnpooledObjectSegments(const std::vector<ObjectSegment *> &segments) override {
        THROW_LOGIC_EXCEPTION("error")
    }
    void destroy();
    bool isDestroyed() const;
    int getTotalNumberOfMemorySegments() const;
    long getTotalMemory() const;
    int getNumberOfAvailableMemorySegments();
    long getAvailableMemory();
    int getNumberOfUsedMemorySegments();
    long getUsedMemory();
    int getNumberOfRegisteredBufferPools();
    int countBuffers();
    std::shared_ptr<CompletableFuture> GetAvailableFuture() override;
    std::shared_ptr<BufferPool> createBufferPool(int numRequiredBuffers, int maxUsedBuffers) override;
    std::shared_ptr<BufferPool> createBufferPool(int numRequiredBuffers, int maxUsedBuffers,
                                                 int numSubpartitions, int maxBuffersPerChannel) override;
    void destroyBufferPool(std::shared_ptr<BufferPool> objectBufferPool) override;
    void destroyAllBufferPools();

    std::string toString() const override;

private:
    std::vector<MemorySegment *> internalRequestMemorySegments(int numberOfSegmentsToRequest);
    MemorySegment *internalRequestMemorySegment();
    void revertRequiredBuffers(int size);
    void internalRecycleMemorySegments(const std::vector<MemorySegment *>& segments);
    std::shared_ptr<LocalMemoryBufferPool> internalCreateMemoryBufferPool(int numRequiredBuffers, int maxUsedBuffers,
                                                                          int numSubpartitions,
                                                                          int maxBuffersPerChannel);
    void tryRedistributeBuffers(int numberOfSegmentsToRequest);
    void redistributeBuffers();
    std::string getConfigDescription();

    std::shared_ptr<AvailabilityHelper>  availabilityHelper;
    int totalNumberOfMemorySegments;
    std::deque<MemorySegment*> availableMemorySegments;
    std::recursive_mutex availableMemorySegmentMutex;
    bool isDestroyed_ = false;
    std::recursive_mutex factoryLock;
    std::set<std::shared_ptr<LocalMemoryBufferPool>> allMemoryBufferPools;
    std::set<std::shared_ptr<LocalMemoryBufferPool>> resizableBufferPools;
    int numTotalRequiredBuffers {};
    std::chrono::milliseconds requestSegmentsTimeout {};
    std::condition_variable_any cv;
    int segmentSize;
};
}

#endif // NETWORKMEMORYBUFFERPOOL_H