* Copyright (c) Huawei Technologies Co., Ltd. 2026. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Description: URMA async event handler for Jetty/JFC error events.
*/
#include "datasystem/common/rdma/urma_async_event_handler.h"
#include <sys/epoll.h>
#include <unistd.h>
#include <cerrno>
#include "datasystem/common/inject/inject_point.h"
#include "datasystem/common/log/log.h"
#include "datasystem/common/log/trace.h"
#include "datasystem/common/rdma/urma_dlopen_util.h"
#include "datasystem/common/util/format.h"
#include "datasystem/common/util/raii.h"
#include "datasystem/common/util/status_helper.h"
namespace datasystem {
namespace {
constexpr uint32_t K_URMA_ERROR_LOG_EVERY_N = 100;
constexpr int K_URMA_ASYNC_EVENT_EPOLL_TIMEOUT_MS = 100;
}
void UrmaAsyncEventHandler::Init(UrmaResource *resource)
{
urmaResource_ = resource;
}
void UrmaAsyncEventHandler::Start(const std::atomic<bool> &stopFlag)
{
serverAsyncEventThread_ = std::make_unique<std::thread>(&UrmaAsyncEventHandler::Run, this, std::ref(stopFlag));
}
void UrmaAsyncEventHandler::Stop()
{
if (serverAsyncEventThread_ && serverAsyncEventThread_->joinable()) {
LOG(INFO) << "Waiting for Async Event thread to exit";
serverAsyncEventThread_->join();
serverAsyncEventThread_.reset();
}
}
Status UrmaAsyncEventHandler::GetAsyncEvent(urma_async_event_t &event)
{
event = {};
CHECK_FAIL_RETURN_STATUS(urmaResource_ != nullptr, K_RUNTIME_ERROR, "URMA resource is null");
INJECT_POINT("UrmaManager.InjectAsyncEvent", [this, &event](int eventType) {
switch (eventType) {
case URMA_EVENT_JETTY_ERR: {
std::shared_ptr<UrmaJetty> jetty;
RETURN_IF_NOT_OK(urmaResource_->GetAnyValidJetty(jetty));
event.urma_ctx = urmaResource_->GetContext();
event.event_type = URMA_EVENT_JETTY_ERR;
event.element.jetty = jetty->Raw();
return Status::OK();
}
case URMA_EVENT_JFC_ERR:
event.urma_ctx = urmaResource_->GetContext();
event.event_type = URMA_EVENT_JFC_ERR;
event.element.jfc = urmaResource_->GetJfc();
CHECK_FAIL_RETURN_STATUS(event.element.jfc != nullptr, K_RUNTIME_ERROR,
"URMA JFC is null for injected async event");
return Status::OK();
default:
return Status(K_INVALID, FormatString("Unsupported injected async event type: %d", eventType));
}
});
urma_status_t rc = ds_urma_get_async_event(urmaResource_->GetContext(), &event);
if (rc != URMA_SUCCESS) {
return Status(K_URMA_ERROR, FormatString("urma_get_async_event failed: %d", rc));
}
return Status::OK();
}
Status UrmaAsyncEventHandler::Run(const std::atomic<bool> &stopFlag)
{
LOG(INFO) << "[URMA_AE] Async event thread started";
CHECK_FAIL_RETURN_STATUS(urmaResource_ != nullptr, K_RUNTIME_ERROR, "URMA resource is null");
auto *context = urmaResource_->GetContext();
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(context != nullptr, K_RUNTIME_ERROR, "[URMA_AE] URMA context is null");
int asyncFd = context->async_fd;
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(asyncFd > 0, K_RUNTIME_ERROR,
FormatString("[URMA_AE] Invalid URMA async fd: %d", asyncFd));
int epollFd = epoll_create(1);
CHECK_FAIL_RETURN_STATUS_PRINT_ERROR(epollFd > 0, K_RUNTIME_ERROR,
FormatString("[URMA_AE] epoll_create failed: %d", errno));
Raii closeEpollFd([epollFd]() { RETRY_ON_EINTR(close(epollFd)); });
epoll_event asyncEvent{};
asyncEvent.events = EPOLLIN;
asyncEvent.data.fd = asyncFd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, asyncFd, &asyncEvent) != 0) {
RETURN_STATUS_LOG_ERROR(K_RUNTIME_ERROR,
FormatString("[URMA_AE] epoll_ctl add async fd %d failed: %d", asyncFd, errno));
}
while (!stopFlag.load()) {
epoll_event epollEvent;
int nevent = epoll_wait(epollFd, &epollEvent, 1, K_URMA_ASYNC_EVENT_EPOLL_TIMEOUT_MS);
INJECT_POINT_NO_RETURN("UrmaManager.InjectAsyncEvent", [&nevent, &epollEvent](int) {
nevent = 1;
epollEvent.events = EPOLLIN;
});
if (nevent == 0 || (nevent < 0 && errno == EINTR) || stopFlag) {
continue;
}
if (nevent < 0) {
LOG_FIRST_AND_EVERY_N(ERROR, K_URMA_ERROR_LOG_EVERY_N) << "[URMA_AE] epoll_wait failed: " << errno;
continue;
}
if ((epollEvent.events & (EPOLLERR | EPOLLHUP | EPOLLRDBAND)) != 0) {
LOG_FIRST_AND_EVERY_N(ERROR, K_URMA_ERROR_LOG_EVERY_N)
<< "[URMA_AE] async fd epoll event invalid: " << epollEvent.events;
continue;
}
urma_async_event_t event;
auto traceGuard = Trace::Instance().SetTraceUUID();
auto rc = GetAsyncEvent(event);
if (rc.IsError()) {
LOG_FIRST_AND_EVERY_N(ERROR, K_URMA_ERROR_LOG_EVERY_N)
<< "[URMA_AE] GetAsyncEvent failed: " << rc.ToString();
continue;
}
LOG(WARNING) << "[URMA_AE] Received async event, type=" << static_cast<int>(event.event_type);
auto handleRc = HandleUrmaAsyncEvent(event);
if (handleRc.IsError()) {
LOG(ERROR) << "[URMA_AE] HandleUrmaAsyncEvent failed: " << handleRc.ToString();
}
ds_urma_ack_async_event(&event);
}
LOG(INFO) << "[URMA_AE] Async event thread exiting";
return Status::OK();
}
Status UrmaAsyncEventHandler::HandleUrmaAsyncEvent(const urma_async_event_t &event)
{
switch (event.event_type) {
case URMA_EVENT_JETTY_ERR:
return HandleJettyErrAsyncEvent(event.element.jetty);
case URMA_EVENT_JFC_ERR:
return HandleJfcErrAsyncEvent(event.element.jfc);
default:
LOG(WARNING) << "[URMA_AE] Unhandled async event type=" << static_cast<int>(event.event_type);
return Status::OK();
}
}
Status UrmaAsyncEventHandler::HandleJettyErrAsyncEvent(urma_jetty_t *rawJetty)
{
CHECK_FAIL_RETURN_STATUS(urmaResource_ != nullptr, K_RUNTIME_ERROR, "URMA resource is null");
if (rawJetty == nullptr) {
LOG(ERROR) << "[URMA_AE_JETTY_ERR] rawJetty is null in async event";
return Status::OK();
}
const uint32_t jettyId = rawJetty->jetty_id.id;
LOG(WARNING) << "[URMA_AE_JETTY_ERR] jettyId=" << jettyId;
std::shared_ptr<UrmaJetty> failedJetty;
auto lookupRc = urmaResource_->GetJettyById(jettyId, failedJetty);
if (lookupRc.IsError() || failedJetty == nullptr) {
LOG(WARNING) << "[URMA_AE_JETTY_ERR] Jetty " << jettyId
<< " not found in registry, may have already been cleaned up";
return Status::OK();
}
auto connection = failedJetty->GetConnection().lock();
if (connection == nullptr) {
LOG(WARNING) << "[URMA_AE_JETTY_ERR] Jetty " << jettyId
<< " has no bound connection, cannot trigger recovery";
return Status::OK();
}
LOG(WARNING) << "[URMA_AE_JETTY_ERR] Triggering ReCreateJetty for jettyId=" << jettyId;
auto recreateRc = connection->ReCreateJetty(*urmaResource_, failedJetty);
if (recreateRc.IsError()) {
LOG(ERROR) << "[URMA_AE_JETTY_ERR] ReCreateJetty failed for jettyId=" << jettyId << ": "
<< recreateRc.ToString();
} else {
LOG(INFO) << "[URMA_AE_JETTY_ERR] ReCreateJetty succeeded for jettyId=" << jettyId;
}
INJECT_POINT("UrmaManager.HandleJettyErrAsyncEvent");
return Status::OK();
}
Status UrmaAsyncEventHandler::HandleJfcErrAsyncEvent(urma_jfc_t *rawJfc)
{
INJECT_POINT("UrmaManager.HandleJfcErrAsyncEvent");
if (rawJfc == nullptr) {
LOG(ERROR) << "[URMA_AE_JFC_ERR] rawJfc is null in async event";
return Status::OK();
}
LOG(WARNING) << "[URMA_AE_JFC_ERR] jfc_id=" << rawJfc->jfc_id.id << ", no recovery action taken at this stage";
return Status::OK();
}
}