* Copyright (c) Huawei Technologies Co., Ltd. 2025. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <string>
#include <thread>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <securec.h>
#include <csignal>
#include <unistd.h>
#include "actor/buslog.hpp"
#include "evloop/evloop.hpp"
namespace litebus {
/**
 * Runs the epoll dispatch loop for `evloop` on the calling thread.
 *
 * The loop keeps calling epoll_wait on `evloop->efd` and forwards ready events to
 * `evloop->HandleEvent` until `evloop->stopLoop` becomes non-zero, epoll_wait
 * returns 0, or epoll_wait fails with an errno other than EINTR.
 *
 * @param evloop  Loop to run. It is dereferenced without a null check.
 * @param timeout Passed unchanged to epoll_wait as its timeout argument.
 * @return BUS_OK when the loop ends normally; BUS_ERROR when the event buffer
 *         cannot be allocated or epoll_wait fails with an errno other than EINTR.
 */
int EventLoopRun(EvLoop *evloop, int timeout)
{
    // Posted first: EvLoop::Init waits on this semaphore right after creating the thread.
    (void)sem_post(&evloop->semId);

    const size_t size = sizeof(struct epoll_event) * EPOLL_EVENTS_SIZE;
    struct epoll_event *events = static_cast<struct epoll_event *>(malloc(size));
    if (events == nullptr) {
        BUSLOG_ERROR("malloc events fail");
        return BUS_ERROR;
    }
    (void)memset_s(events, size, 0, size);

    while (!evloop->stopLoop) {
        // Release EventData queued for deletion before waiting again.
        evloop->EventFreeDelEvents();
        BUSLOG_DEBUG("timeout:{},epoll_fd:{}", timeout, evloop->efd);
        const int nevent = epoll_wait(evloop->efd, events, EPOLL_EVENTS_SIZE, timeout);
        // Capture errno immediately: the logging call below may overwrite it
        // before the EINTR check is reached.
        const int waitErrno = errno;
        BUSLOG_DEBUG("nevent:{},epoll_fd:{}", nevent, evloop->efd);

        if (nevent < 0) {
            if (waitErrno == EINTR) {
                // Interrupted by a signal: re-check stopLoop and wait again.
                continue;
            }
            BUSLOG_ERROR("epoll_wait failed, epoll_fd:{},errno:{}", evloop->efd, waitErrno);
            free(events);
            return BUS_ERROR;
        }

        if (nevent > 0) {
            evloop->HandleEvent(events, nevent);
        } else {
            // A zero return is treated as fatal for the loop and requests a stop.
            BUSLOG_ERROR("epoll_wait failed, epoll_fd:{},ret:0,errno:{}", evloop->efd, waitErrno);
            evloop->stopLoop = 1;
        }

        if (evloop->stopLoop) {
            evloop->EventFreeDelEvents();
        }
    }

    // Clear the flag so that a later run starts from a clean state.
    evloop->stopLoop = 0;
    BUSLOG_INFO("event epoll loop run end");
    free(events);
    return BUS_OK;
}
void *EvloopRun(void *arg)
{
if (arg == nullptr) {
BUSLOG_ERROR("arg is null");
} else {
(void)EventLoopRun((EvLoop *)arg, -1);
}
return nullptr;
}
void QueueReadyCallback(int fd, uint32_t events, void *arg)
{
EvLoop *evloop = (EvLoop *)arg;
if (evloop == nullptr) {
BUSLOG_ERROR("evloop is null, fd:{},events:{}", fd, events);
return;
}
uint64_t count;
if (read(evloop->queueEventfd, &count, sizeof(count)) == static_cast<ssize_t>(sizeof(count))) {
std::queue<std::function<void()>> q;
evloop->queueMutex.lock();
evloop->queue.swap(q);
evloop->queueMutex.unlock();
while (!q.empty()) {
q.front()();
q.pop();
}
}
}
/**
 * Closes the queue eventfd and the epoll descriptor if they are open
 * (i.e. not -1) and resets each to -1.
 */
void EvLoop::CleanUp()
{
    auto closeAndReset = [](int &fd) {
        if (fd == -1) {
            return;
        }
        (void)close(fd);
        fd = -1;
    };

    // Same order as before: queue eventfd first, then the epoll descriptor.
    closeAndReset(queueEventfd);
    closeAndReset(efd);
}
/**
 * Appends a function to the loop's pending queue.
 *
 * The queue eventfd is written only when the queue length after insertion is
 * exactly 1 (the queue was empty before this call).
 *
 * @param func Function to enqueue; it is moved into the queue.
 * @return Queue length after insertion, capped at INT32_MAX.
 */
int EvLoop::AddFuncToEvLoop(std::function<void()> &&func)
{
    queueMutex.lock();
    queue.emplace(std::move(func));
    const auto depth = queue.size();
    queueMutex.unlock();

    int result = INT32_MAX;
    if (depth <= INT32_MAX) {
        result = static_cast<int>(depth);
    }
    if (result != 1) {
        return result;
    }

    uint64_t one = 1;
    const ssize_t written = write(queueEventfd, &one, sizeof(one));
    if (written != static_cast<ssize_t>(sizeof(one))) {
        BUSLOG_WARN("fail to write queueEventfd, fd:{},errno:{}", queueEventfd, errno);
    }
    return result;
}
/**
 * Creates the epoll/eventfd resources and starts the loop thread.
 *
 * After the thread is created this call blocks on semId until EventLoopRun
 * posts it. On glibc >= 2.12 it then tries to name the thread; a naming
 * failure is only logged.
 *
 * @param threadName Name for the loop thread; "EventLoopThread" is used when empty.
 * @return true on success; false when EventLoopCreate or pthread_create fails.
 */
bool EvLoop::Init(const std::string &threadName)
{
    if (EventLoopCreate() != BUS_OK) {
        return false;
    }
    (void)sem_init(&semId, 0, 0);
    if (pthread_create(&loopThread, nullptr, EvloopRun, static_cast<void *>(this)) != 0) {
        BUSLOG_ERROR("pthread_create fail");
        // POSIX leaves the thread id undefined when pthread_create fails. Reset it so
        // that Finish() does not try to stop and join a thread that was never started.
        loopThread = 0;
        Finish();
        return false;
    }
    // Wait until the loop thread has entered EventLoopRun.
    (void)sem_wait(&semId);
#if __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 12
    std::string name = threadName;
    if (name.empty()) {
        name = "EventLoopThread";
    }
    const int retval = pthread_setname_np(loopThread, name.c_str());
    if (retval != 0) {
        BUSLOG_INFO("set pthread name fail, name:{},retval:{}", name, retval);
    } else {
        BUSLOG_INFO("set pthread name success, name:{},loopThread:{}", name, loopThread);
    }
#endif
    return true;
}
/**
 * Stops the loop thread (if one is recorded in loopThread), joins it, and then
 * tears down the epoll resources via EventLoopDestroy.
 */
void EvLoop::Finish()
{
    if (loopThread != 0) {
        void *exitValue = nullptr;
        StopEventLoop();
        if (pthread_join(loopThread, &exitValue) != 0) {
            BUSLOG_INFO("pthread_join loopThread fail");
        }
        // Mark the thread as gone so a second Finish() skips the join.
        loopThread = 0;
    }

    EventLoopDestroy();
    BUSLOG_INFO("stop loop succ");
}
// Destructor: shuts the loop down through Finish().
// Any exception thrown by Finish() is caught and discarded here so that it
// does not propagate out of the destructor.
EvLoop::~EvLoop()
{
try {
Finish();
} catch (...) {
// Intentionally empty: failures during teardown are ignored.
}
}
/**
 * Removes the EventData registered for `fd` from `events` and deletes it.
 * A missing entry is only logged at debug level.
 *
 * This function takes no lock itself; AddFdEvent calls it while holding eventsLock.
 *
 * @param fd Descriptor whose registration should be dropped.
 */
void EvLoop::DeleteEvent(int fd)
{
    auto pos = events.find(fd);
    if (pos != events.end()) {
        BUSLOG_DEBUG("erase event, fd:{}", fd);
        // delete on a null pointer is a no-op, so no explicit check is needed.
        delete pos->second;
        (void)events.erase(pos);
        return;
    }
    BUSLOG_DEBUG("not found event, fd:{}", fd);
}
/**
 * Looks up the EventData registered for `fd`.
 *
 * @param fd Descriptor to look up in `events`.
 * @return The stored pointer, or nullptr when `fd` has no entry.
 */
EventData *EvLoop::FindEvent(int fd)
{
    const auto pos = events.find(fd);
    return (pos != events.end()) ? pos->second : nullptr;
}
/**
 * Registers `eventData` in `events` under its fd. Any entry already stored for
 * that fd is deleted first (see DeleteEvent). A null argument is ignored.
 *
 * @param eventData Entry to store; the map keeps the raw pointer.
 */
void EvLoop::AddEvent(EventData *eventData)
{
    if (eventData != nullptr) {
        DeleteEvent(eventData->fd);
        (void)events.emplace(eventData->fd, eventData);
    }
}
/**
 * Creates the epoll instance and the non-blocking queue eventfd, then registers
 * the eventfd with QueueReadyCallback as its handler.
 *
 * On any failure the descriptors opened so far are closed through CleanUp().
 *
 * @return BUS_OK on success, BUS_ERROR otherwise.
 */
int EvLoop::EventLoopCreate(void)
{
    stopLoop = 0;

    efd = epoll_create(EPOLL_SIZE);
    if (efd == -1) {
        BUSLOG_ERROR("epoll_create fail, errno:{}", errno);
        CleanUp();
        return BUS_ERROR;
    }

    queueEventfd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
    if (queueEventfd == -1) {
        BUSLOG_ERROR("eventfd fail, errno:{}", errno);
        CleanUp();
        return BUS_ERROR;
    }

    const uint32_t interest =
        static_cast<uint32_t>(EPOLLIN) | static_cast<uint32_t>(EPOLLHUP) | static_cast<uint32_t>(EPOLLERR);
    if (AddFdEvent(queueEventfd, interest, QueueReadyCallback, static_cast<void *>(this)) == BUS_OK) {
        return BUS_OK;
    }

    BUSLOG_ERROR("add queue event fail, queueEventfd:{}", queueEventfd);
    CleanUp();
    return BUS_ERROR;
}
/**
 * Registers `fd` with the epoll instance and records its handler.
 *
 * A new EventData is allocated, stored in `events` under eventsLock, and passed
 * to epoll as the event's user pointer. If epoll_ctl fails, the map entry for
 * `fd` is removed (and deleted) again.
 *
 * NOTE(review): AddEvent deletes any EventData already stored for `fd` before
 * epoll_ctl runs, so on EEXIST the earlier registration's EventData has already
 * been freed -- confirm that callers never add the same fd twice.
 *
 * @param fd      Descriptor to watch.
 * @param tEvents epoll event mask.
 * @param handler Callback invoked by HandleEvent when the fd is ready.
 * @param data    Opaque pointer handed back to `handler`.
 * @return BUS_OK on success; BUS_ERROR on allocation or epoll_ctl failure.
 */
int EvLoop::AddFdEvent(int fd, uint32_t tEvents, EventHandler handler, void *data)
{
    EventData *evdata = new (std::nothrow) EventData();
    if (evdata == nullptr) {
        BUSLOG_ERROR("malloc eventData fail, fd:{},epollfd:{}", fd, efd);
        return BUS_ERROR;
    }
    evdata->data = data;
    evdata->handler = handler;
    evdata->fd = fd;

    eventsLock.lock();
    AddEvent(evdata);
    eventsLock.unlock();

    struct epoll_event ev;
    (void)memset_s(&ev, sizeof(ev), 0, sizeof(ev));
    ev.events = tEvents;
    ev.data.ptr = evdata;
    BUSLOG_DEBUG("epoll add, fd:{},epollfd:{}", fd, efd);
    if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev) != 0) {
        // Save errno before the rollback: locking, logging and delete in
        // DeleteEvent may overwrite it before the EEXIST check below.
        const int addErrno = errno;
        eventsLock.lock();
        DeleteEvent(fd);
        eventsLock.unlock();
        if (addErrno != EEXIST) {
            BUSLOG_ERROR("epoll add fail, fd:{},epollfd:{},errno:{}", fd, efd, addErrno);
        } else {
            BUSLOG_ERROR("epoll add already exists, fd:{},epollfd:{},errno:{}", fd, efd, addErrno);
        }
        return BUS_ERROR;
    }
    return BUS_OK;
}
int EvLoop::DelFdEvent(int fd)
{
EventData *tev = nullptr;
struct epoll_event ev;
int ret;
eventsLock.lock();
tev = FindEvent(fd);
if (tev == nullptr) {
eventsLock.unlock();
BUSLOG_DEBUG("event search fail, fd:{},epollfd:{}", fd, efd);
return BUS_ERROR;
}
(void)events.erase(tev->fd);
AddDeletedEvents(tev);
eventsLock.unlock();
BUSLOG_DEBUG("epoll ctl delete, fd:{},epollfd:{}", fd, efd);
ev.events = 0;
ev.data.ptr = tev;
ret = epoll_ctl(efd, EPOLL_CTL_DEL, fd, &ev);
if (ret < 0) {
BUSLOG_ERROR("epoll ctl delete fail, fd:{},epollfd:{},errno:{}", fd, efd, errno);
return BUS_ERROR;
}
return BUS_OK;
}
int EvLoop::ModifyFdEvent(int fd, uint32_t tEvents)
{
struct epoll_event ev;
EventData *tev = nullptr;
int ret;
tev = FindEvent(fd);
if (tev == nullptr) {
BUSLOG_ERROR("event lookup fail, fd:{},events:{}", fd, tEvents);
return BUS_ERROR;
}
(void)memset_s(&ev, sizeof(ev), 0, sizeof(ev));
ev.events = tEvents;
ev.data.ptr = tev;
BUSLOG_DEBUG("epoll modify, fd:{},events:{}", fd, tEvents);
ret = epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev);
if (ret != 0) {
BUSLOG_ERROR("epoll modify fail, fd:{},events:{},errno:{}", fd, tEvents, errno);
return BUS_ERROR;
}
return BUS_OK;
}
/**
 * Parks `eventData` on the deleted-events list for its fd so that it can be
 * freed later by EventFreeDelEvents. A pointer that is already on the list is
 * not added twice; a warning is logged instead.
 *
 * @param eventData Entry to park; it is dereferenced without a null check.
 */
void EvLoop::AddDeletedEvents(EventData *eventData)
{
    // One lookup; operator[] creates an empty list the first time an fd is seen.
    // The list is scanned in place rather than copied.
    auto &parked = deletedEvents[eventData->fd];
    for (const EventData *queued : parked) {
        if (queued == eventData) {
            BUSLOG_WARN("fd has been deleted before, fd:{},efd:{}", eventData->fd, efd);
            return;
        }
    }
    parked.push_back(eventData);
}
void EvLoop::EventFreeDelEvents()
{
std::map<int, std::list<EventData *>>::iterator fdIter = deletedEvents.begin();
while (fdIter != deletedEvents.end()) {
std::list<EventData *> deleteEventList = fdIter->second;
std::list<EventData *>::iterator eventIter = deleteEventList.begin();
while (eventIter != deleteEventList.end()) {
EventData *deleteEv = *eventIter;
delete deleteEv;
deleteEv = nullptr;
++eventIter;
}
(void)deletedEvents.erase(fdIter++);
}
deletedEvents.clear();
}
int EvLoop::FindDeletedEvent(const EventData *tev)
{
std::map<int, std::list<EventData *>>::iterator fdIter = deletedEvents.find(tev->fd);
if (fdIter == deletedEvents.end()) {
return 0;
}
std::list<EventData *> deleteEventList = fdIter->second;
std::list<EventData *>::iterator eventIter = deleteEventList.begin();
while (eventIter != deleteEventList.end()) {
if (*eventIter == tev) {
return 1;
}
++eventIter;
}
return 0;
}
/**
 * Dispatches a batch of ready epoll events to their handlers.
 *
 * Events with a null user pointer are ignored. Events whose EventData is found
 * on the deleted-events list are skipped with a warning.
 *
 * @param tEvents Array of ready events as filled in by epoll_wait.
 * @param nevent  Number of valid entries in `tEvents`.
 */
void EvLoop::HandleEvent(const struct epoll_event *tEvents, int nevent)
{
    for (int idx = 0; idx < nevent; ++idx) {
        const struct epoll_event &fired = tEvents[idx];
        auto *entry = static_cast<EventData *>(fired.data.ptr);
        if (entry == nullptr) {
            continue;
        }
        if (FindDeletedEvent(entry) != 0) {
            BUSLOG_WARN("fd has been deleted from epoll, fd:{},efd:{}", entry->fd, efd);
            continue;
        }
        entry->handler(entry->fd, fired.events, entry->data);
    }
}
/**
 * Requests the loop to stop: sets stopLoop to 1 and writes to the queue eventfd
 * so that a blocked epoll_wait returns. Does nothing when stopLoop is already 1.
 */
void EvLoop::StopEventLoop()
{
    if (stopLoop != 1) {
        stopLoop = 1;
        uint64_t one = 1;
        const ssize_t written = write(queueEventfd, &one, sizeof(one));
        if (written != static_cast<ssize_t>(sizeof(one))) {
            BUSLOG_WARN("fail to write queueEventfd, fd:{},errno:{}", queueEventfd, errno);
        }
    }
}
/**
 * Tears down the loop's descriptors and frees parked EventData.
 *
 * When efd is positive, the queue eventfd (if positive) is unregistered and
 * closed, then the epoll descriptor is closed; both are reset to -1.
 */
void EvLoop::EventLoopDestroy()
{
    if (efd > 0) {
        if (queueEventfd > 0) {
            (void)DelFdEvent(queueEventfd);
            (void)close(queueEventfd);
            queueEventfd = -1;
        }
        (void)close(efd);
        efd = -1;
    }
    // Free after the teardown: DelFdEvent above parks the queue's EventData on
    // the deleted-events list, and freeing before that call would leak it.
    EventFreeDelEvents();
}
}