* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "LocalBufferPool.h"

#include <utility>
namespace omnistream {
/**
 * Builds a local buffer pool backed by the given global network buffer pool.
 *
 * @param networkBufferPool        global pool; also the source of the availability future
 *                                 used by requestSegmentFromGlobalWhenAvailable().
 * @param numberOfSubpartitions    number of per-subpartition counters created in
 *                                 subpartitionBuffersCount_, each starting at 0.
 * @param maxBuffersPerChannel     per-subpartition count that recycle() compares against.
 * @param currentPoolSize          initial value of currentPoolSize_.
 * @param numberOfRequiredSegments lower bound enforced by setNumBuffers().
 * @param maxNumberOfSegments      upper bound applied by setNumBuffers().
 * @param availabilityHelper       helper that tracks and publishes this pool's availability.
 */
LocalBufferPool::LocalBufferPool(
    std::shared_ptr<NetworkBufferPool> networkBufferPool,
    int numberOfSubpartitions, int maxBuffersPerChannel, int currentPoolSize,
    int numberOfRequiredSegments, int maxNumberOfSegments,
    std::shared_ptr<AvailabilityHelper> availabilityHelper)
    // The shared_ptr arguments are sink parameters taken by value: move them into the
    // members instead of copying, which avoids an extra atomic refcount inc/dec each.
    : networkBufferPool(std::move(networkBufferPool)),
      maxBuffersPerChannel_(maxBuffersPerChannel),
      currentPoolSize_(currentPoolSize),
      numberOfRequiredSegments_(numberOfRequiredSegments),
      maxNumberOfSegments(maxNumberOfSegments),
      availabilityHelper_(std::move(availabilityHelper)),
      subpartitionBuffersCount_(numberOfSubpartitions, 0)
{
}
void LocalBufferPool::checkConsistentAvailability()
{
LOG("checkConsistentAvailability in lock")
bool shouldBeAvailableValue = shouldBeAvailable();
if (availabilityHelper_->isApproximatelyAvailable() != shouldBeAvailableValue) {
throw std::runtime_error("Inconsistent availability: expected " + std::to_string(shouldBeAvailableValue));
}
}
/**
 * Works out the pool's current availability status.
 *
 * - If segments are already queued locally, availability follows shouldBeAvailable()
 *   and no global-pool notification is requested.
 * - If isRequestedSizeReached() holds, the pool is reported unavailable without
 *   requesting a notification.
 * - Otherwise one segment is requested from the global pool; when that fails, a
 *   notification is needed unless one is already being requested.
 *
 * NOTE(review): the return type is a reference, so this relies on
 * AvailabilityStatus::from() and UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION()
 * returning references to long-lived objects - confirm against AvailabilityStatus.
 *
 * @return the computed availability status.
 */
const AvailabilityStatus &LocalBufferPool::checkAvailability()
{
    if (!availableSegments.empty()) {
        return AvailabilityStatus::from(shouldBeAvailable(), false);
    }

    if (isRequestedSizeReached()) {
        return AvailabilityStatus::UNAVAILABLE_NEED_NOT_REQUESTING_NOTIFICATION();
    }

    // Short-circuit keeps the original order: the flag is only consulted when the
    // request to the global pool did not succeed.
    const bool needNotification =
        !requestSegmentFromGlobal() && !requestingNotificationOfGlobalPoolAvailable;
    return AvailabilityStatus::from(shouldBeAvailable(), needNotification);
}
std::shared_ptr<CompletableFuture> LocalBufferPool::checkAndUpdateAvailability()
{
std::shared_ptr<CompletableFuture> toNotify = nullptr;
const AvailabilityStatus &availabilityStatus = checkAvailability();
if (availabilityStatus.isAvailable()) {
toNotify = availabilityHelper_->getUnavailableToResetAvailable();
} else {
availabilityHelper_->resetUnavailable();
}
if (availabilityStatus.isNeedRequestingNotificationOfGlobalPoolAvailable()) {
requestSegmentFromGlobalWhenAvailable();
}
checkConsistentAvailability();
return toNotify;
}
bool LocalBufferPool::shouldBeAvailable()
{
LOG("shouldBeAvailable in lock")
return !availableSegments.empty() && unavailableSubpartitionsCount_ == 0;
}
/**
 * Recycles a segment that is not attributed to any subpartition.
 *
 * @param segment segment being handed back to the pool.
 */
void LocalBufferPool::recycle(Segment *segment)
{
    // UNKNOWN_CHANNEL makes the two-argument overload skip per-subpartition accounting.
    this->recycle(segment, UNKNOWN_CHANNEL);
}
/**
 * Hands a segment back to this pool, optionally on behalf of a subpartition.
 *
 * While holding availableSegmentsLock the segment is either
 *  - passed to returnSegment() when the pool is destroyed or has excess buffers,
 *  - appended to availableSegments when no listener is registered, or
 *  - offered to the first registered listener.
 * The loop repeats, taking the next listener, for as long as
 * fireBufferAvailableNotification() returns false.
 *
 * @param segment segment being recycled.
 * @param channel index into subpartitionBuffersCount_, or UNKNOWN_CHANNEL to skip the
 *                per-subpartition accounting.
 * @throws std::runtime_error from checkConsistentAvailability() on inconsistent state.
 */
void LocalBufferPool::recycle(Segment *segment, int channel)
{
std::shared_ptr<BufferListener> listener = nullptr;
std::shared_ptr<CompletableFuture> toNotify = nullptr;
do {
// The guard lives only for the loop body, so the lock is released before the
// listener callback in the loop condition runs.
std::lock_guard<std::recursive_mutex> lock(availableSegmentsLock);
if (channel != UNKNOWN_CHANNEL) {
// Post-decrement: the comparison sees the count before this recycle.
// NOTE(review): channel is not range-checked before indexing.
// NOTE(review): this accounting sits inside the loop, so it runs again on every
// retry after a listener declines - confirm that is intended.
if (subpartitionBuffersCount_[channel]-- == maxBuffersPerChannel_) {
unavailableSubpartitionsCount_--;
}
}
if (isDestroyed_ || hasExcessBuffers()) {
returnSegment(segment);
return;
} else {
if (registeredListeners_.empty()) {
availableSegments.push_back(segment);
if (!availabilityHelper_->isApproximatelyAvailable() && shouldBeAvailable()) {
toNotify = availabilityHelper_->getUnavailableToResetAvailable();
}
// Leaves the loop without running the consistency check below.
break;
} else {
listener = registeredListeners_.front();
registeredListeners_.pop_front();
}
}
checkConsistentAvailability();
}
while (!fireBufferAvailableNotification(listener, segment));
// Completed outside the lock.
mayNotifyAvailable(toNotify);
}
/**
 * @return the number of required segments this pool was constructed with; it is the
 *         lower bound accepted by setNumBuffers().
 */
int LocalBufferPool::getNumberOfRequiredSegments() const
{
    const int required = numberOfRequiredSegments_;
    return required;
}
/**
 * Resizes the pool.
 *
 * The new size is numBuffers capped at maxNumberOfSegments. Excess segments are
 * returned, and unless the pool is destroyed its availability is re-evaluated. Any
 * resulting future is completed after the lock has been released.
 *
 * @param numBuffers requested pool size; must be at least numberOfRequiredSegments_.
 * @throws std::invalid_argument when numBuffers is below numberOfRequiredSegments_.
 */
void LocalBufferPool::setNumBuffers(int numBuffers)
{
    std::shared_ptr<CompletableFuture> pending;
    {
        std::lock_guard<std::recursive_mutex> guard(availableSegmentsLock);

        if (numberOfRequiredSegments_ > numBuffers) {
            std::string message = "Buffer pool needs at least ";
            message += std::to_string(numberOfRequiredSegments_);
            message += " buffers, but tried to set to ";
            message += std::to_string(numBuffers);
            throw std::invalid_argument(message);
        }

        currentPoolSize_ = std::min(numBuffers, maxNumberOfSegments);
        returnExcessSegments();

        if (isDestroyed_) {
            return;
        }
        pending = checkAndUpdateAvailability();
    }
    mayNotifyAvailable(pending);
}
/**
 * Registers a listener to be offered the next recycled segment.
 *
 * @param listener listener to enqueue.
 * @return true when the listener was queued; false when segments are currently
 *         available or the pool is destroyed, in which case nothing is registered.
 */
bool LocalBufferPool::addBufferListener(std::shared_ptr<BufferListener> listener)
{
    std::lock_guard<std::recursive_mutex> guard(availableSegmentsLock);

    const bool canRegister = availableSegments.empty() && !isDestroyed_;
    if (canRegister) {
        registeredListeners_.push_back(listener);
    }
    return canRegister;
}
/**
 * Callback run when the global pool's availability future fires.
 *
 * Clears the "notification requested" flag. Unless the pool is destroyed or already
 * reported as available, it then re-evaluates availability. Any resulting future is
 * completed after the lock has been released.
 */
void LocalBufferPool::onGlobalPoolAvailable()
{
    std::shared_ptr<CompletableFuture> pending = nullptr;
    {
        std::lock_guard<std::recursive_mutex> guard(availableSegmentsLock);
        requestingNotificationOfGlobalPoolAvailable = false;

        const bool nothingToDo = isDestroyed_ || availabilityHelper_->isApproximatelyAvailable();
        if (nothingToDo) {
            return;
        }
        pending = checkAndUpdateAvailability();
    }
    mayNotifyAvailable(pending);
}
/**
 * @return the availability future exposed by availabilityHelper_.
 */
std::shared_ptr<CompletableFuture> LocalBufferPool::GetAvailableFuture()
{
    std::shared_ptr<CompletableFuture> future = availabilityHelper_->GetAvailableFuture();
    return future;
}
void LocalBufferPool::requestSegmentFromGlobalWhenAvailable()
{
if (requestingNotificationOfGlobalPoolAvailable) {
throw std::runtime_error("local buffer pool is already in the state of requesting memory segment from global when it is available.");
}
requestingNotificationOfGlobalPoolAvailable = true;
class InnerRunnable : public Runnable {
public:
InnerRunnable(LocalBufferPool* self) {
this->self = self;
}
~InnerRunnable() = default;
void run() override {
self->onGlobalPoolAvailable();
}
private:
LocalBufferPool* self;
};
auto runnable = std::make_shared<InnerRunnable>(this);
networkBufferPool->GetAvailableFuture()->thenRun(runnable);
}
void LocalBufferPool::mayNotifyAvailable(std::shared_ptr<CompletableFuture> toNotify)
{
if (toNotify) {
toNotify->complete();
}
}
/**
 * Notifies a listener that a buffer has become available.
 *
 * NOTE(review): segment is not used here and the listener is called with nullptr
 * rather than a buffer wrapping the segment - confirm this is intentional.
 *
 * @param listener listener to notify; must not be null.
 * @param segment  recycled segment (currently unused).
 * @return whatever the listener's notifyBufferAvailable() returns; recycle() keeps
 *         trying further listeners while this is false.
 */
bool LocalBufferPool::fireBufferAvailableNotification(std::shared_ptr<BufferListener> listener,
                                                      Segment *segment)
{
    const bool accepted = listener->notifyBufferAvailable(nullptr);
    return accepted;
}
/**
 * Marks the pool as cancelled and completes the helper's pending availability future.
 *
 * The cancelled_ flag is set before the lock is taken. The future is fetched under
 * availableSegmentsLock and completed after the lock has been released.
 */
void LocalBufferPool::cancel()
{
    cancelled_ = true;

    std::shared_ptr<CompletableFuture> pending;
    {
        std::lock_guard<std::recursive_mutex> guard(availableSegmentsLock);
        pending = availabilityHelper_->getUnavailableToResetAvailable();
    }
    mayNotifyAvailable(pending);
}
/**
 * Creates a recycler that returns segments to bufferPool on behalf of one subpartition.
 *
 * @param channel    subpartition index forwarded to LocalBufferPool::recycle().
 * @param bufferPool pool that receives the recycled segments.
 */
LocalBufferPool::SubpartitionBufferRecycler::SubpartitionBufferRecycler(int channel,
    std::shared_ptr<LocalBufferPool> bufferPool)
    // bufferPool is a by-value sink parameter: move it to avoid an extra refcount inc/dec.
    : channel_(channel), bufferPool_(std::move(bufferPool))
{
}
/**
 * Returns the segment to the owning pool, tagged with this recycler's channel.
 *
 * @param segment segment being recycled.
 */
void LocalBufferPool::SubpartitionBufferRecycler::recycle(Segment *segment)
{
    LocalBufferPool &pool = *bufferPool_;
    pool.recycle(segment, channel_);
}
}