* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include "CompletableFuture.h"
namespace omnistream {
CompletableFuture::CompletableFuture() : state(FutureState::NOT_STARTED), done(false) {}
CompletableFuture::CompletableFuture(bool flag, FutureState futureState) : state(futureState), done(flag) {}
CompletableFuture::~CompletableFuture()
{
if (worker.joinable()) {
worker.detach();
}
}
CompletableFuture::CompletableFuture(CompletableFuture&& other) noexcept
: state(other.state), done(other.done.load()), exception(std::move(other.exception))
{
if (other.worker.joinable()) {
other.worker.detach();
}
}
CompletableFuture& CompletableFuture::operator=(CompletableFuture&& other) noexcept
{
if (this != &other) {
if (worker.joinable()) {
worker.join();
}
state = other.state;
done.store(other.done.load());
exception = std::move(other.exception);
if (other.worker.joinable()) {
other.worker.detach();
}
}
return *this;
}
void CompletableFuture::executeTask(std::shared_ptr<CompletableFuture> future, std::shared_ptr<Runnable> task)
{
try {
{
std::lock_guard<std::mutex> lock(future->mtx);
future->state = FutureState::RUNNING;
}
task->run();
{
std::lock_guard<std::mutex> lock(future->mtx);
future->state = FutureState::COMPLETED;
}
} catch (...) {
std::lock_guard<std::mutex> lock(future->mtx);
future->exception = std::current_exception();
future->state = FutureState::FAILED;
}
future->done.store(true);
future->cv.notify_all();
future->self.reset();
}
void CompletableFuture::runAs(std::shared_ptr<Runnable> task)
{
if (isDone()) {
throw std::runtime_error("Future already completed");
}
{
std::lock_guard<std::mutex> lock(mtx);
if (state != FutureState::NOT_STARTED) {
throw std::runtime_error("Future already started");
}
}
self = shared_from_this();
worker = std::thread(executeTask, self, task);
}
std::shared_ptr<CompletableFuture> CompletableFuture::runAsync(std::shared_ptr<Runnable> task)
{
std::shared_ptr<CompletableFuture> future = std::make_shared<CompletableFuture>();
future->runAs(task);
return future;
}
std::shared_ptr<CompletableFuture> CompletableFuture::thenRun(std::shared_ptr<Runnable> task)
{
class ChainedTask : public Runnable {
public:
ChainedTask(std::shared_ptr<CompletableFuture> parentFuture, std::shared_ptr<Runnable> nextTask)
: parent(parentFuture), next(std::move(nextTask)) {}
void run() override
{
parent->get();
next->run();
}
private:
std::shared_ptr<CompletableFuture> parent;
std::shared_ptr<Runnable> next;
};
auto chainedTask = std::make_shared<ChainedTask>(shared_from_this(), task);
if (isDone()) {
chainedTask->run();
return shared_from_this();
} else {
return runAsync(chainedTask);
}
}
FutureState CompletableFuture::getState()
{
std::lock_guard<std::mutex> lock(mtx);
return state;
}
void CompletableFuture::get()
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this] { return done.load(); });
if (exception) {
std::rethrow_exception(exception);
}
}
bool CompletableFuture::get(long timeoutMillis)
{
std::unique_lock<std::mutex> lock(mtx);
bool result = cv.wait_for(lock, std::chrono::milliseconds(timeoutMillis), [this] { return done.load(); });
if (result && exception) {
std::rethrow_exception(exception);
}
return result;
}
std::shared_ptr<CompletableFuture> CompletableFuture::allOf(const std::vector<std::shared_ptr<CompletableFuture>>& futures)
{
class AllOfTask : public Runnable {
public:
explicit AllOfTask(const std::vector<std::shared_ptr<CompletableFuture>>& futures)
: allFutures(futures) {}
void run() override
{
for (const auto& future : allFutures) {
future->get();
}
}
private:
std::vector<std::shared_ptr<CompletableFuture>> allFutures;
};
auto allOfTask = std::make_shared<AllOfTask>(futures);
return runAsync(allOfTask);
}
std::shared_ptr<CompletableFuture> CompletableFuture::anyOf(const std::vector<std::shared_ptr<CompletableFuture>>& futures)
{
if (futures.empty()) {
auto result = std::make_shared<CompletableFuture>();
result->complete();
return result;
}
class AnyOfTask : public Runnable {
public:
explicit AnyOfTask(const std::vector<std::shared_ptr<CompletableFuture>>& futures)
: anyFutures(futures) {}
void run() override
{
std::atomic<bool> anyDone(false);
std::vector<std::thread> checkers;
for (const auto& future : anyFutures) {
checkers.emplace_back([future, &anyDone]() {
try {
future->get();
} catch (...) {
}
anyDone.store(true);
});
}
while (!anyDone.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
for (auto& thread : checkers) {
if (thread.joinable()) {
thread.detach();
}
}
}
private:
std::vector<std::shared_ptr<CompletableFuture>> anyFutures;
};
auto anyOfTask = std::make_shared<AnyOfTask>(futures);
return runAsync(anyOfTask);
}
bool CompletableFuture::cancel()
{
std::lock_guard<std::mutex> lock(mtx);
if (done.load() || state == FutureState::RUNNING) {
return false;
}
state = FutureState::CANCELLED;
done.store(true);
cv.notify_all();
return true;
}
bool CompletableFuture::isDone() const
{
return done.load();
}
bool CompletableFuture::isCancelled()
{
std::lock_guard<std::mutex> lock(mtx);
return state == FutureState::CANCELLED;
}
void CompletableFuture::setCompleted()
{
std::lock_guard<std::mutex> lock(mtx);
state = FutureState::COMPLETED;
}
void CompletableFuture::complete()
{
std::lock_guard<std::mutex> lock(mtx);
state = FutureState::COMPLETED;
done.store(true);
cv.notify_all();
}
std::string CompletableFuture::toString() const
{
switch (state) {
case FutureState::CANCELLED:
return "Cancelled";
case FutureState::NOT_STARTED:
return "NotStarted";
case FutureState::RUNNING:
return "Running";
case FutureState::COMPLETED:
return "Completed";
case FutureState::FAILED:
return "Failed";
}
return {};
}
}