/*
 * Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
 * You can use this software according to the terms and conditions of the Mulan PSL v2.
 * You may obtain a copy of Mulan PSL v2 at:
 *          http://license.coscl.org.cn/MulanPSL2
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PSL v2 for more details.
 */

#ifndef LOCALMEMORYBUFFERPOOL_H
#define LOCALMEMORYBUFFERPOOL_H

#include "NetworkBuffer.h"
#include "MemoryBufferBuilder.h"
#include "LocalBufferPool.h"
#include "NetworkMemoryBufferPool.h"

namespace omnistream::datastream {
    class LocalMemoryBufferPool : public LocalBufferPool {
    public:
        LocalMemoryBufferPool(std::shared_ptr<NetworkMemoryBufferPool> networkBufferPool,
                              int numberOfRequiredMemorySegments)
            : LocalMemoryBufferPool(networkBufferPool, numberOfRequiredMemorySegments, INT_MAX, 0, INT_MAX) {
        }

        LocalMemoryBufferPool(std::shared_ptr<NetworkMemoryBufferPool> networkBufferPool,
                              int numberOfRequiredMemorySegments, int maxNumberOfMemorySegments)
            : LocalMemoryBufferPool(networkBufferPool, numberOfRequiredMemorySegments, maxNumberOfMemorySegments, 0,
                                    INT_MAX) {
        }

        LocalMemoryBufferPool(std::shared_ptr<NetworkMemoryBufferPool> networkBufferPool,
                              int numberOfRequiredMemorySegments,
                              int maxNumberOfMemorySegments,
                              int numberOfSubpartitions,
                              int maxBuffersPerChannel);

        ~LocalMemoryBufferPool() override = default;

        void postConstruct();

        void lazyDestroy() override;

        void reserveSegments(int numberOfSegmentsToReserve) override;

        bool isDestroyed() override;

        int getMaxNumberOfSegments() const override;

        int getNumberOfAvailableSegments() override;

        int getNumBuffers() override;

        int bestEffortGetNumOfUsedBuffers() const override;

        std::shared_ptr<Buffer> requestBuffer() override;

        BufferBuilder *requestBufferBuilder() override;

        BufferBuilder *requestBufferBuilder(int targetChannel) override;

        BufferBuilder *requestBufferBuilderBlocking() override;

        BufferBuilder *requestBufferBuilderBlocking(int targetChannel) override;

        std::shared_ptr<NetworkBuffer> requestNetworkBuffer();

        MemoryBufferBuilder *requestMemoryBufferBuilder();

        MemoryBufferBuilder *requestMemoryBufferBuilder(int targetChannel);

        MemoryBufferBuilder *requestMemoryBufferBuilderBlocking();

        MemoryBufferBuilder *requestMemoryBufferBuilderBlocking(int targetChannel);

        Segment *requestSegment() override;

        Segment *requestSegment(int targetChannel) override;

        Segment *requestSegmentBlocking() override;

        Segment *requestSegmentBlocking(int targetChannel) override;

        MemorySegment *requestPooledMemorySegment();

        MemorySegment *requestOverdraftMemorySegmentFromGlobal();

        MemorySegment *requestMemorySegment();

        MemorySegment *requestMemorySegment(int targetChannel);

        MemorySegment *requestMemorySegmentBlocking();

        MemorySegment *requestMemorySegmentBlocking(int targetChannel);

        std::shared_ptr<NetworkBuffer> toNetworkBuffer(MemorySegment *memorySegment);

        MemoryBufferBuilder *toMemoryBufferBuilder(MemorySegment *memorySegment, int targetChannel);

        std::string toString() const override;

    private:
        bool requestSegmentFromGlobal() override;

        void returnSegment(Segment *segment) override;

        void returnMemorySegment(MemorySegment *segment);

        void returnExcessSegments() override;

        void returnExcessMemorySegments();

        bool hasExcessBuffers() override;

        bool isRequestedSizeReached() override;

        std::shared_ptr<NetworkMemoryBufferPool> networkMemoryBufferPool;

        std::deque<std::shared_ptr<BufferListener> > registeredListeners_;

        /**
         * Number of all memory segments, which have been requested from the network buffer pool and are
         * somehow referenced through this pool (e.g. wrapped in Buffer instances or as available
         * segments).
         */
        int numberOfRequestedMemorySegments;
        std::vector<std::shared_ptr<BufferRecycler> > subpartitionBufferRecyclers_;
        bool isDestroyed_ = false;
    };

}


#endif // LOCALMEMORYBUFFERPOOL_H