* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#ifndef OMNISTREAM_RESOURCEGUARD_H
#define OMNISTREAM_RESOURCEGUARD_H
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <stdexcept>
/**
 * Static-only interface around a per-thread "interrupted" flag.
 *
 * This header carries declarations only; the member functions and the storage
 * for the flag are defined in another translation unit. Because the flag is
 * declared thread_local, every thread has its own independent instance of it.
 */
class ThreadInterrupt {
    // Backing flag; one instance per thread (thread_local storage duration).
    static thread_local std::atomic<bool> interrupted_;

public:
    // Presumably resets the flag to "not interrupted" - definition not visible here.
    static void clear();

    // Presumably reports the current state of the flag - definition not visible here.
    static bool isInterrupted();

    // Presumably raises the flag - definition not visible here.
    static void interrupt();
};
/**
 * Lease-counting guard for a shared resource.
 *
 * A user obtains a Lease through acquireResource() and releases it by calling
 * Lease::close() or destroying the Lease. Closing the guard rejects any further
 * acquisition and blocks until the number of outstanding leases has dropped to
 * zero.
 *
 * All changes to the lease count and the closed flag made by this class happen
 * while holding the internal mutex. The guard cannot be copied or moved. Each
 * Lease stores a reference to the guard that issued it, so the guard must
 * outlive every lease it has handed out.
 */
class ResourceGuard {
public:
    ResourceGuard() = default;
    ResourceGuard(const ResourceGuard&) = delete;
    ResourceGuard& operator=(const ResourceGuard&) = delete;

    /** Closes the guard; blocks until every outstanding lease has been released. */
    ~ResourceGuard() { close(); }

    /**
     * Handle representing one acquired use of the guarded resource.
     *
     * Only ResourceGuard can create a fresh Lease. A Lease can be move-constructed
     * (ownership of the release duty moves to the new object) but neither copied
     * nor assigned.
     */
    class Lease {
    public:
        Lease(const Lease&) = delete;
        Lease& operator=(const Lease&) = delete;

        /**
         * Takes over the release duty from `other`.
         *
         * The source is marked closed and its previous state is read in a single
         * atomic exchange, so a concurrent other.close() cannot lead to the same
         * lease being released twice.
         */
        Lease(Lease&& other) noexcept
            : parent(other.parent),
              closed(other.closed.exchange(true))
        {
        }

        Lease& operator=(Lease&& other) = delete;

        /** Releases the lease if that has not happened yet. */
        ~Lease() { close(); }

        /** Releases the lease back to the guard; later calls are no-ops. */
        void close()
        {
            // exchange() guarantees that exactly one caller performs the release.
            if (!closed.exchange(true)) {
                parent.releaseResource();
            }
        }

        friend class ResourceGuard;

    private:
        ResourceGuard& parent;      // guard that issued this lease
        std::atomic<bool> closed;   // true once released or moved-from

        explicit Lease(ResourceGuard& guard) : parent(guard), closed(false) {}
    };

    /**
     * Acquires a new lease on the resource.
     *
     * @return a heap-allocated Lease; the caller owns the pointer and must delete
     *         it (deleting it releases the lease).
     * @throws std::runtime_error (a std::exception) if the guard is already closed.
     * @throws std::bad_alloc if the Lease cannot be allocated; the lease count is
     *         left untouched in that case.
     */
    Lease* acquireResource()
    {
        std::lock_guard<std::mutex> lk(mtx);
        if (closed) {
            throw std::runtime_error("ResourceGuard is already closed");
        }
        // Allocate before counting: if `new` throws, no phantom lease is recorded
        // that would make a later close() wait forever.
        Lease* lease = new Lease(*this);
        ++leaseCount;
        return lease;
    }

    /**
     * Gives one lease back. Called by Lease::close().
     * Wakes up waiting closers when the last lease of a closed guard is returned.
     */
    void releaseResource()
    {
        std::lock_guard<std::mutex> lk(mtx);
        if (--leaseCount == 0 && closed) {
            cv.notify_all();
        }
    }

    /**
     * Marks the guard closed and waits until no lease is outstanding.
     *
     * NOTE(review): despite the name, this wait never consults ThreadInterrupt,
     * so as written it cannot be cut short - confirm whether that is intended.
     */
    void closeInterruptibly()
    {
        std::unique_lock<std::mutex> lk(mtx);
        closed = true;
        cv.wait(lk, [this]() { return leaseCount <= 0; });
    }

    /**
     * Marks the guard closed and waits until no lease is outstanding. An
     * interrupt flag found set on the calling thread is cleared and the wait
     * continues.
     *
     * NOTE(review): the interrupt flag is cleared rather than re-raised once the
     * wait is over, so the caller cannot observe an interrupt that arrived while
     * closing - confirm this is intended.
     */
    void closeUninterruptibly()
    {
        std::unique_lock<std::mutex> lk(mtx);
        closed = true;
        while (leaseCount > 0) {
            // Drop a pending interrupt so the predicate below can block again.
            if (ThreadInterrupt::isInterrupted()) {
                ThreadInterrupt::clear();
            }
            cv.wait(lk, [this]() {
                return leaseCount == 0 || ThreadInterrupt::isInterrupted();
            });
        }
        if (ThreadInterrupt::isInterrupted()) {
            ThreadInterrupt::clear();
        }
    }

    /** Closes the guard; equivalent to closeUninterruptibly(). */
    void close()
    {
        closeUninterruptibly();
    }

    /** @return true once one of the close functions has been called. */
    bool isClosed() const
    {
        return closed;
    }

    /** @return the number of leases currently outstanding. */
    int getLeaseCount() const
    {
        return leaseCount;
    }

private:
    std::atomic<int> leaseCount{0};   // leases handed out and not yet released
    std::atomic<bool> closed{false};  // set by close*(); blocks new acquisitions
    std::mutex mtx;                   // serialises count/flag updates and cv waits
    std::condition_variable cv;       // signalled when the last lease is returned
};
#endif