* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef LOCALBUFFERPOOL_H
#define LOCALBUFFERPOOL_H
#include <deque>
#include "BufferPool.h"
#include "io/AvailabilityHelper.h"
#include "AvailabilityStatus.h"
#include "NetworkBufferPool.h"
namespace omnistream {
class LocalBufferPool : public BufferPool, public std::enable_shared_from_this<LocalBufferPool> {
public:
LocalBufferPool(
std::shared_ptr<NetworkBufferPool> networkBufferPool,
int numberOfSubpartitions,
int maxBuffersPerChannel, int currentPoolSize, int numberOfRequiredSegments, int maxNumberOfSegments, std::shared_ptr<AvailabilityHelper> availabilityHelper);
~LocalBufferPool() {
for (auto segment : availableSegments) {
delete segment;
}
availableSegments.clear();
}
virtual bool isRequestedSizeReached() = 0;
void checkConsistentAvailability();
std::shared_ptr<CompletableFuture> checkAndUpdateAvailability();
virtual bool requestSegmentFromGlobal() = 0;
void requestSegmentFromGlobalWhenAvailable();
void onGlobalPoolAvailable();
int getNumberOfRequiredSegments() const override;
bool addBufferListener(std::shared_ptr<BufferListener> listener) override;
std::shared_ptr<CompletableFuture> GetAvailableFuture() override;
void recycle(Segment *segment) override;
virtual bool hasExcessBuffers() = 0;
virtual void returnSegment(Segment *segment) = 0;
void mayNotifyAvailable(std::shared_ptr<CompletableFuture> toNotify);
bool fireBufferAvailableNotification(std::shared_ptr<BufferListener> listener, Segment *segment);
void recycle(Segment *segment, int channel);
virtual void returnExcessSegments() = 0;
void setNumBuffers(int numBuffers) override;
void cancel() override;
protected:
class SubpartitionBufferRecycler : public BufferRecycler {
public:
SubpartitionBufferRecycler(int channel, std::shared_ptr<LocalBufferPool> bufferPool);
void recycle(Segment *segment) override;
std::string toString() const override
{
return "MemoryBufferRecycler";
};
protected:
int channel_;
std::shared_ptr<LocalBufferPool> bufferPool_;
};
static const int UNKNOWN_CHANNEL = -1;
std::recursive_mutex availableSegmentsLock;
std::deque<std::shared_ptr<BufferListener>> registeredListeners_;
std::deque<Segment*> availableSegments;
std::shared_ptr<NetworkBufferPool> networkBufferPool;
int maxBuffersPerChannel_;
int currentPoolSize_;
int numberOfRequiredSegments_;
int maxNumberOfSegments;
std::shared_ptr<AvailabilityHelper> availabilityHelper_;
std::vector<int> subpartitionBuffersCount_;
bool isDestroyed_ = false;
int unavailableSubpartitionsCount_ = 0;
bool shouldBeAvailable();
const AvailabilityStatus &checkAvailability();
std::atomic<bool> cancelled_{false};
bool requestingNotificationOfGlobalPoolAvailable = false;
};
}
#endif