* Copyright (c) 2022 Huawei Technologies Co.,Ltd.
*
* DSS is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* dss_lsnr.c
*
*
* IDENTIFICATION
* src/service/dss_lsnr.c
*
* -------------------------------------------------------------------------
*/
#include "cm_file.h"
#include "dss_log.h"
#include "dss_param.h"
#include "dss_lsnr.h"
#define DSS_EMERG_ACCEPT_BATCH 256
static void cs_close_uds_socks(uds_lsnr_t *lsnr)
{
uint32 loop;
for (loop = 0; loop < lsnr->sock_count; ++loop) {
if (lsnr->socks[loop] != CS_INVALID_SOCKET) {
cs_uds_socket_close(&lsnr->socks[loop]);
}
}
lsnr->sock_count = 0;
}
static bool32 cs_uds_create_link(socket_t sock_ready, cs_pipe_t *pipe)
{
uds_link_t *link = &pipe->link.uds;
link->local.salen = (socklen_t)sizeof(link->local.addr);
(void)cs_uds_getsockname(sock_ready, &link->local);
link->remote.salen = (socklen_t)sizeof(link->remote.addr);
link->sock = (socket_t)accept(sock_ready, SOCKADDR(&link->remote), &link->remote.salen);
if (link->sock == CS_INVALID_SOCKET) {
int32 err = cm_get_os_error();
if (err != EAGAIN && err != EWOULDBLOCK) {
LOG_RUN_ERR("Failed to accept connection request, OS error:%d", err);
}
return CM_FALSE;
}
cs_set_io_mode(link->sock, CM_FALSE, CM_FALSE);
cs_set_buffer_size(link->sock, CM_TCP_DEFAULT_BUFFER_SIZE, CM_TCP_DEFAULT_BUFFER_SIZE);
cs_set_keep_alive(link->sock, CM_TCP_KEEP_IDLE, CM_TCP_KEEP_INTERVAL, CM_TCP_KEEP_COUNT);
cs_set_linger(link->sock, 1, 1);
link->closed = CM_FALSE;
LOG_DEBUG_INF("[DSS_CONNECT] accept success, listen_sock=%d, conn_sock=%d", (int)sock_ready, (int)link->sock);
return CM_TRUE;
}
static status_t cs_handle_accepted_link(uds_lsnr_t *lsnr, socket_t sock_ready, cs_pipe_t *pipe, bool32 is_emerg)
{
status_t status;
if (lsnr->status != LSNR_STATUS_RUNNING && lsnr->status != LSNR_STATUS_PAUSING) {
LOG_RUN_ERR("cs_try_uds_accept error :%u\n", lsnr->status);
cs_uds_disconnect(&pipe->link.uds);
return CM_ERROR;
}
status = lsnr->action(is_emerg, lsnr, pipe);
if (status != CM_SUCCESS) {
LOG_DEBUG_ERR("[DSS_CONNECT] listener action failed, listen_sock=%d, conn_sock=%d, status=%d",
(int)sock_ready, (int)pipe->link.uds.sock, status);
cs_uds_disconnect(&pipe->link.uds);
return CM_ERROR;
}
return CM_SUCCESS;
}
static void cs_try_accept_on_socket(uds_lsnr_t *lsnr, socket_t sock_ready, cs_pipe_t *pipe)
{
bool32 is_emerg = (sock_ready == lsnr->socks[0]);
cs_pipe_t batch[DSS_EMERG_ACCEPT_BATCH];
uint32 batch_count = 0;
uint32 i;
if (!is_emerg) {
if (cs_uds_create_link(sock_ready, pipe)) {
(void)cs_handle_accepted_link(lsnr, sock_ready, pipe, CM_FALSE);
}
return;
}
while (batch_count < DSS_EMERG_ACCEPT_BATCH) {
if (!cs_uds_create_link(sock_ready, pipe)) {
break;
}
if (lsnr->status != LSNR_STATUS_RUNNING && lsnr->status != LSNR_STATUS_PAUSING) {
cs_uds_disconnect(&pipe->link.uds);
break;
}
batch[batch_count++] = *pipe;
}
for (i = 0; i < batch_count; i++) {
if (cs_handle_accepted_link(lsnr, sock_ready, &batch[i], CM_TRUE) != CM_SUCCESS) {
continue;
}
}
}
static void cs_try_uds_accept(uds_lsnr_t *lsnr, cs_pipe_t *pipe)
{
socket_t sock_ready;
int32 ret;
int32 loop = 0;
struct epoll_event evnts[CM_MAX_LSNR_HOST_COUNT];
ret = epoll_wait(lsnr->epoll_fd, evnts, (int)lsnr->sock_count, (int)CM_POLL_WAIT);
if (ret == 0) {
return;
}
if (ret < 0) {
if (cm_get_os_error() != EINTR) {
LOG_RUN_ERR("Failed to wait for connection request, OS error:%d", cm_get_os_error());
}
return;
}
for (loop = 0; loop < ret; loop++) {
sock_ready = evnts[loop].data.fd;
cs_try_accept_on_socket(lsnr, sock_ready, pipe);
}
}
static void cs_uds_lsnr_proc(thread_t *thread)
{
cs_pipe_t pipe;
uds_lsnr_t *lsnr = NULL;
CM_ASSERT(thread != NULL);
lsnr = (uds_lsnr_t *)thread->argument;
(void)memset_s(&pipe, sizeof(cs_pipe_t), 0, sizeof(cs_pipe_t));
pipe.type = CS_TYPE_DOMAIN_SCOKET;
cm_set_thread_name("uds-lsnr");
LOG_RUN_INF("uds-lsnr thread started");
while (!thread->closed) {
if (lsnr->status == LSNR_STATUS_RUNNING) {
cs_try_uds_accept(lsnr, &pipe);
} else if (lsnr->status == LSNR_STATUS_PAUSING) {
lsnr->status = LSNR_STATUS_PAUSED;
}
}
LOG_RUN_INF("uds-lsnr thread closed");
}
static status_t cs_uds_lsnr_init_epoll_fd(uds_lsnr_t *lsnr)
{
struct epoll_event ev;
uint32 loop;
lsnr->epoll_fd = epoll_create1(0);
if (lsnr->epoll_fd == -1) {
DSS_THROW_ERROR(ERR_SOCKET_LISTEN, "create epoll fd for listener", cm_get_os_error());
return CM_ERROR;
}
for (loop = 0; loop < lsnr->sock_count; loop++) {
ev.events = EPOLLIN;
ev.data.fd = (int)lsnr->socks[loop];
if (epoll_ctl(lsnr->epoll_fd, EPOLL_CTL_ADD, ev.data.fd, &ev) != 0) {
cm_close_file(lsnr->epoll_fd);
DSS_THROW_ERROR(ERR_SOCKET_LISTEN, "add socket for listening to epool fd", cm_get_os_error());
return CM_ERROR;
}
}
return CM_SUCCESS;
}
static status_t cs_create_uds_socks(uds_lsnr_t *lsnr)
{
char(*name)[DSS_MAX_PATH_BUFFER_SIZE] = lsnr->names;
lsnr->sock_count = 0;
for (uint32 loop = 0; loop < CM_MAX_LSNR_HOST_COUNT; loop++) {
if (name[loop][0] == '\0') {
continue;
}
if (cs_uds_create_listener(lsnr->names[loop], &lsnr->socks[loop], (uint16)lsnr->permissions) != CM_SUCCESS) {
cs_close_uds_socks(lsnr);
return CM_ERROR;
}
if (lsnr->sock_count == 0) {
(void)cs_set_io_mode(lsnr->socks[loop], CM_TRUE, CM_FALSE);
}
(void)cm_atomic_inc(&lsnr->sock_count);
}
return CM_SUCCESS;
}
void cs_pause_uds_lsnr(uds_lsnr_t *lsnr)
{
if (lsnr->thread.id == 0) {
return;
}
lsnr->status = LSNR_STATUS_PAUSING;
while (lsnr->status != LSNR_STATUS_PAUSED && !lsnr->thread.closed) {
cm_sleep(1);
}
}
void cs_stop_uds_lsnr(uds_lsnr_t *lsnr)
{
cm_close_thread(&lsnr->thread);
cs_close_uds_socks(lsnr);
(void)epoll_close(lsnr->epoll_fd);
}
status_t cs_start_uds_lsnr(uds_lsnr_t *lsnr, uds_connect_action_t action)
{
CM_ASSERT(lsnr != NULL);
status_t status;
lsnr->status = LSNR_STATUS_STOPPED;
lsnr->action = action;
for (uint32 loop = 0; loop < CM_MAX_LSNR_HOST_COUNT; loop++) {
lsnr->socks[loop] = CS_INVALID_SOCKET;
}
status = cs_create_uds_socks(lsnr);
if (status != CM_SUCCESS) {
LOG_RUN_ERR("create domain socket failed. error code is %d.", cm_get_os_error());
return CM_ERROR;
}
if (cs_uds_lsnr_init_epoll_fd(lsnr) != CM_SUCCESS) {
cs_close_uds_socks(lsnr);
LOG_RUN_ERR("failed to init epoll fd");
return CM_ERROR;
}
lsnr->status = LSNR_STATUS_RUNNING;
if (cm_create_thread(cs_uds_lsnr_proc, 0, lsnr, &lsnr->thread) != CM_SUCCESS) {
cs_close_uds_socks(lsnr);
(void)epoll_close(lsnr->epoll_fd);
lsnr->status = LSNR_STATUS_STOPPED;
LOG_RUN_ERR("failed to create accept thread");
return CM_ERROR;
}
return CM_SUCCESS;
}