* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef NETWORKOBJECTBUFFERPOOL_H
#define NETWORKOBJECTBUFFERPOOL_H
#include <memory>
#include <vector>
#include <deque>
#include <set>
#include <condition_variable>
#include "ObjectSegment.h"
#include "io/AvailabilityHelper.h"
#include "BufferPool.h"
#include "BufferPoolFactory.h"
#include "LocalObjectBufferPool.h"
namespace omnistream {
class LocalObjectBufferPool;
}
namespace omnistream {
class NetworkObjectBufferPool : public NetworkBufferPool,
public std::enable_shared_from_this<NetworkObjectBufferPool> {
public:
NetworkObjectBufferPool(int numberOfSegmentsToAllocate, int segmentSize)
:NetworkObjectBufferPool(numberOfSegmentsToAllocate, segmentSize,
std::chrono::milliseconds(INT_MAX)) {}
NetworkObjectBufferPool(int numberOfSegmentsToAllocate, int segmentSize, std::chrono::milliseconds requestSegmentsTimeout);
~NetworkObjectBufferPool() override;
ObjectSegment *requestPooledObjectSegment();
std::vector<ObjectSegment *> requestPooledObjectSegmentsBlocking(int numberOfSegmentsToRequest);
void recyclePooledObjectSegment(ObjectSegment *segment);
std::vector<MemorySegment*> requestUnpooledMemorySegments(int numberOfSegmentsToRequest) override {
THROW_LOGIC_EXCEPTION("error")
}
void recycleUnpooledMemorySegments(const std::vector<MemorySegment*>& segments) override {
THROW_LOGIC_EXCEPTION("error")
}
std::vector<ObjectSegment *> requestUnpooledObjectSegments(int numberOfSegmentsToRequest) override;
void recycleUnpooledObjectSegments(const std::vector<ObjectSegment *>& segments) override;
void destroy();
bool isDestroyed() const;
int getTotalNumberOfObjectSegments() const;
long getTotalMemory() const;
int getNumberOfAvailableObjectSegments();
long getAvailableMemory();
int getNumberOfUsedObjectSegments();
long getUsedMemory();
int getNumberOfRegisteredBufferPools();
int countBuffers();
std::shared_ptr<CompletableFuture> GetAvailableFuture() override;
std::shared_ptr<BufferPool> createBufferPool(int numRequiredBuffers, int maxUsedBuffers) override;
std::shared_ptr<BufferPool> createBufferPool(int numRequiredBuffers, int maxUsedBuffers,
int numSubpartitions, int maxBuffersPerChannel) override;
void destroyBufferPool(std::shared_ptr<BufferPool> objectBufferPool) override;
void destroyAllBufferPools();
std::string toString() const override;
private:
std::vector<ObjectSegment *> internalRequestObjectSegments(int numberOfSegmentsToRequest);
ObjectSegment *internalRequestObjectSegment();
void revertRequiredBuffers(int size);
void internalRecycleObjectSegments(const std::vector<ObjectSegment *>& segments);
std::shared_ptr<BufferPool> internalCreateObjectBufferPool(int numRequiredBuffers, int maxUsedBuffers,
int numSubpartitions, int maxBuffersPerChannel);
void tryRedistributeBuffers(int numberOfSegmentsToRequest);
void redistributeBuffers();
std::string getConfigDescription();
int totalNumberOfObjectSegments;
int objectSegmentSize;
std::deque<ObjectSegment *> availableObjectSegments;
std::recursive_mutex availableObjSegMutex;
bool isDestroyed_ = false;
std::recursive_mutex factoryLock;
std::set<std::shared_ptr<LocalObjectBufferPool>> allBufferPools;
int numTotalRequiredBuffers {};
std::chrono::milliseconds requestSegmentsTimeout {};
std::shared_ptr<AvailabilityHelper> availabilityHelper = std::make_shared<AvailabilityHelper>();
std::condition_variable cv;
};
}
#endif