* Copyright (c) 2025 Huawei Technologies Co., Ltd.
* openUBMC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
*/
#include <boost/asio.hpp>
#include <mc/log.h>
#include <mc/runtime.h>
#include <mc/runtime/runtime_context.h>
#include <mc/runtime/runtime_strand.h>
namespace mc::runtime {
runtime_strand::data_t::~data_t() {
destory_queue(&waiting_queue);
destory_queue(&ready_queue);
}
void runtime_strand::data_t::destory_queue(task_queue* q) {
while (!q->empty()) {
task_operation_base* op = q->front();
q->pop_front();
op->destroy();
}
}
runtime_strand::invoker::invoker(mc::shared_ptr<data_t> data)
: m_data(data) {
}
void runtime_strand::invoker::operator()() {
while (true) {
if (run_ready_handlers()) {
return;
}
std::lock_guard lock(m_data->mutex);
if (!m_data->waiting_queue.empty()) {
m_data->ready_queue.splice_back(m_data->waiting_queue);
} else {
m_data->locked = false;
break;
}
}
}
bool runtime_strand::invoker::run_ready_handlers() {
m_data->running_thread.store(std::this_thread::get_id(), std::memory_order_release);
while (true) {
task_operation_base* op = nullptr;
{
std::lock_guard lock(m_data->mutex);
if (m_data->shutdown || m_data->ready_queue.empty()) {
break;
}
op = m_data->ready_queue.front();
auto* current_shard = thread_pool::get_current_shard();
thread_pool* current_pool = current_shard ? ¤t_shard->pool : nullptr;
thread_pool* target_pool = op->target_pool;
if (target_pool && current_pool != target_pool) {
m_data->running_thread.store(std::thread::id{}, std::memory_order_release);
schedule_next(target_pool);
return true;
}
m_data->ready_queue.pop_front();
}
if (op) {
try {
op->execute();
} catch (std::exception& e) {
elog("Exception in runtime_strand: {}", e.what());
} catch (...) {
elog("Unknown exception in runtime_strand");
}
op->destroy();
}
}
m_data->running_thread.store(std::thread::id{}, std::memory_order_release);
return false;
}
void runtime_strand::invoker::schedule_next(thread_pool* target_pool) {
if (target_pool) {
target_pool->get_executor().post(invoker(m_data), std::allocator<void>{});
}
}
runtime_strand::runtime_strand() : m_data(mc::make_shared<data_t>()) {
;
}
runtime_strand::~runtime_strand() {
if (!m_data) {
return;
}
std::lock_guard lock(m_data->mutex);
m_data->shutdown = true;
}
bool runtime_strand::running_in_this_thread() const noexcept {
if (!m_data) {
return false;
}
return m_data->running_thread.load(std::memory_order_acquire) == std::this_thread::get_id();
}
bool runtime_strand::operator==(const runtime_strand& other) const noexcept {
return m_data == other.m_data;
}
bool runtime_strand::operator!=(const runtime_strand& other) const noexcept {
return !(*this == other);
}
execution_context& runtime_strand::context() const {
if (m_bound_pool) {
return *m_bound_pool;
}
return get_runtime_context().io();
}
thread_pool::executor_type runtime_strand::get_default_executor() {
return get_io_executor();
}
bool runtime_strand::enqueue_op(task_operation_base* op) const {
std::lock_guard lock(m_data->mutex);
if (m_data->shutdown) {
op->destroy();
return false;
}
bool was_unlocked = !m_data->locked;
if (was_unlocked) {
m_data->locked = true;
m_data->ready_queue.push_back(*op);
} else {
m_data->waiting_queue.push_back(*op);
}
return was_unlocked;
}
}