* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cms_socket.c
*
*
* IDENTIFICATION
* src/cms/interface/cms_socket.c
*
* -------------------------------------------------------------------------
*/
#include "cms_log_module.h"
#include "cm_defs.h"
#include "cm_file.h"
#include "cms_comm.h"
#include "cm_ip.h"
#include "cs_packet.h"
#include "cs_tcp.h"
#include "cms_client.h"
#include "cs_uds.h"
#include "securec.h"
#include "cms_socket.h"
status_t cms_socket_init(void)
{
#ifdef WIN32
WORD wVersionRequested;
WSADATA wsaData;
wVersionRequested = MAKEWORD(1, 1);
if (WSAStartup(wVersionRequested, &wsaData) != 0) {
return OG_ERROR;
}
#endif
return OG_SUCCESS;
}
int32 cms_socket_error(void)
{
#ifdef WIN32
return WSAGetLastError();
#else
return errno;
#endif
}
status_t cms_socket_open(socket_t* sock_out)
{
socket_t sock = -1;
while (OG_TRUE) {
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock == CS_INVALID_SOCKET) {
OG_LOG_RUN_ERR("create socket failed, errno %d[%s]", errno, strerror(errno));
return OG_ERROR;
}
#ifndef _WIN32
if (sock != STDIN_FILENO &&
sock != STDOUT_FILENO &&
sock != STDERR_FILENO) {
break;
}
#else
break;
#endif
}
int32 size = SIZE_M(64);
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*)&size, sizeof(size));
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char*)&size, sizeof(size));
cms_socket_setopt_blocking(sock, OG_FALSE);
cms_socket_setopt_close_exec(sock);
*sock_out = sock;
return OG_SUCCESS;
}
void cms_socket_close(socket_t sock)
{
cs_close_socket(sock);
}
status_t cms_socket_setopt_blocking(socket_t sockfd, bool32 flag)
{
#ifdef WIN32
unsigned long block = flag ? 0 : 1;
return ioctlsocket(sockfd, FIONBIO, &block);
#else
int32 nDelayFlag = fcntl(sockfd, F_GETFL, 0);
if (nDelayFlag == -1) {
OG_LOG_RUN_ERR("socket getfl error, sock %d, errno %d[%s]", sockfd, errno, strerror(errno));
return OG_ERROR;
}
nDelayFlag = flag ? nDelayFlag & (~O_NONBLOCK) : nDelayFlag | O_NONBLOCK;
return fcntl(sockfd, F_SETFL, nDelayFlag);
#endif
}
status_t cms_socket_setopt_close_exec(socket_t sockfd)
{
#ifndef WIN32
int32 flags = fcntl(sockfd, F_GETFD);
if (flags == -1) {
OG_LOG_RUN_ERR("socket getfd error, sock %d, errno %d[%s]", sockfd, errno, strerror(errno));
return OG_ERROR;
}
flags |= FD_CLOEXEC;
if (fcntl(sockfd, F_SETFD, flags) == -1) {
return OG_ERROR;
}
#endif
return OG_SUCCESS;
}
status_t cms_socket_setopt_reuse(socket_t sockfd, bool32 flag)
{
int32 option = flag ? 1 : 0;
return setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*)&option, sizeof(option));
}
status_t cms_socket_wait(socket_t sock, uint32 wait_for, int32 timeout, bool32* ready)
{
struct pollfd fd;
int32 ret;
int32 tv;
if (ready != NULL) {
*ready = OG_FALSE;
}
tv = (timeout < 0 ? -1 : timeout);
fd.fd = sock;
fd.revents = 0;
if (wait_for == CS_WAIT_FOR_WRITE) {
fd.events = POLLOUT;
} else {
fd.events = POLLIN;
}
ret = cs_tcp_poll(&fd, 1, tv);
if (ret >= 0) {
if (ready != NULL) {
*ready = (ret > 0);
}
return OG_SUCCESS;
}
if (errno != EINTR) {
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cms_uds_build_addr(cms_sockaddr_un_t* addr, const char* pszName, int32* len)
{
errno_t ret = memset_s(addr, sizeof(cms_sockaddr_un_t), 0, sizeof(cms_sockaddr_un_t));
MEMS_RETURN_IFERR(ret);
addr->sun_family = AF_UNIX;
ret = strncpy_s(addr->sun_path, sizeof(addr->sun_path), pszName, strlen(pszName));
MEMS_RETURN_IFERR(ret);
*len = sizeof_addr_un(*addr);
return OG_SUCCESS;
}
status_t cms_uds_create_listener(const char* pszName, socket_t* sock_out)
{
int32 len = 0;
struct sockaddr_un un;
status_t ret = OG_SUCCESS;
socket_t sock = CMS_IO_INVALID_SOCKET;
sock = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock < 0) {
OG_LOG_RUN_ERR("socket failed, sock %d, errno %d[%s]", sock, errno, strerror(errno));
return OG_ERROR;
}
status_t status = cms_uds_build_addr(&un, pszName, &len);
if (status == OG_ERROR) {
OG_LOG_RUN_ERR("build uds address failed, sock %d, errno %d[%s]", sock, errno, strerror(errno));
cms_socket_close(sock);
return OG_ERROR;
}
unlink(un.sun_path);
ret = bind(sock, (struct sockaddr*)&un, (socklen_t)len);
if (ret < 0) {
OG_LOG_RUN_ERR("bind failed, ret %d, sun path %s, sock %d, errno %d[%s]", ret, un.sun_path,
sock, errno, strerror(errno));
cms_socket_close(sock);
return OG_ERROR;
}
(void)chmod(pszName, S_IRUSR | S_IWUSR);
ret = listen(sock, CMS_UDS_LISTEN_BACKLOG);
if (ret < 0) {
OG_LOG_RUN_ERR("listen failed, ret %d, sock %d, errno %d[%s]", ret, sock, errno, strerror(errno));
cms_socket_close(sock);
return OG_ERROR;
}
*sock_out = sock;
return OG_SUCCESS;
}
status_t cms_uds_connect(const char* pszName, socket_t* sock_out)
{
int32 len;
struct sockaddr_un un;
status_t ret;
socket_t sock;
sock = socket(AF_UNIX, SOCK_STREAM, 0);
if (sock < 0) {
OG_LOG_RUN_ERR("socket failed, sock %d, uds path %s, errno %d[%s]", sock, pszName, errno, strerror(errno));
return OG_ERROR;
}
errno_t err = memset_s(&un, sizeof(un), 0, sizeof(un));
MEMS_RETURN_IFERR(err);
un.sun_family = AF_UNIX;
err = strcpy_sp(un.sun_path, sizeof(un.sun_path), pszName);
MEMS_RETURN_IFERR(err);
len = offsetof(struct sockaddr_un, sun_path) + strlen(un.sun_path);
ret = connect(sock, (struct sockaddr*)&un, (size_t)len);
if (ret < 0) {
OG_LOG_RUN_ERR("connect failed, ret %d, uds path %s, errno %d[%s]", ret, un.sun_path, errno, strerror(errno));
(void)cms_socket_close(sock);
return OG_ERROR;
}
*sock_out = sock;
return OG_SUCCESS;
}
status_t cms_socket_accept(socket_t sockfd, int32 timeout_ms, socket_t* sock)
{
if (timeout_ms >= 0) {
cms_socket_wait(sockfd, CS_WAIT_FOR_READ, timeout_ms, NULL);
}
*sock = accept(sockfd, 0, 0);
if (*sock == CS_INVALID_SOCKET) {
OG_LOG_RUN_ERR("accept failed, errno %d[%s].", errno, strerror(errno));
return OG_ERROR;
}
return OG_SUCCESS;
}
status_t cms_socket_send_bytes(socket_t sockfd, const char* data, int32* dlen, int32 timeout_ms)
{
int32 cc = 0;
int32 counter = 0;
int32 snd_bytes = *dlen;
if (sockfd < 0) {
*dlen = counter;
return OG_ERROR;
}
while (counter < snd_bytes) {
cms_socket_wait(sockfd, CS_WAIT_FOR_WRITE, timeout_ms, NULL);
cc = send(sockfd, data + counter, (size_t)(snd_bytes - counter), 0);
if (cc > 0) {
counter += cc;
continue;
}
if (0 == cc) {
*dlen = counter;
OG_LOG_RUN_ERR("send failed, errno %d[%s].", errno, strerror(errno));
return OG_ERROR;
}
if (errno != EINTR) {
*dlen = counter;
OG_LOG_RUN_ERR("send failed, ret %d, errno %d[%s].", cc, errno, strerror(errno));
return OG_ERROR;
}
}
*dlen = counter;
return OG_SUCCESS;
}
status_t cms_socket_recv_bytes(socket_t sockfd, char* buf, int32* buf_len, int32 timeout_ms, bool32 is_retry_conn)
{
char* pdata = buf;
int32 cc = 0;
int32 counter = 0;
int32 rcv_bytes = *buf_len;
status_t ret = OG_SUCCESS;
if (sockfd < 0) {
OG_LOG_RUN_ERR("recv failed, sockfd is invalid, sockfd %d.", sockfd);
*buf_len = counter;
return OG_ERROR;
}
if (is_retry_conn) {
struct timeval timeout_recv = {CMS_LINUX_RECV_TMOUNT_SEC_RETRY, CMS_LINUX_RECV_TMOUNT_MS};
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&timeout_recv, sizeof(struct timeval)) < 0) {
OG_LOG_RUN_ERR("setsockopt failed");
return OG_ERROR;
}
}
while (counter < rcv_bytes) {
ret = cms_socket_wait(sockfd, CS_WAIT_FOR_READ, timeout_ms, NULL);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("socket wait failed, ret = %d", ret);
return ret;
}
cc = recv(sockfd, pdata + counter, (size_t)(rcv_bytes - counter), 0);
if (cc > 0) {
counter += cc;
continue;
}
if (0 == cc || errno == ECONNRESET) {
*buf_len = counter;
OG_LOG_RUN_INF("connection is closed by peer, sock %d.", sockfd);
return OG_ERROR_CONN_CLOSED;
}
if (errno != EINTR) {
*buf_len = counter;
OG_LOG_RUN_ERR("recv failed, ret %d, sock %d, errno %d[%s].", cc, sockfd, errno, strerror(errno));
return OG_ERROR;
}
}
*buf_len = counter;
return OG_SUCCESS;
}
status_t cms_socket_recv_header(socket_t sockfd, char* buf, int32 size, int32 timeout_ms, bool32 is_retry_conn)
{
status_t ret = OG_SUCCESS;
int32 header_len = sizeof(cms_packet_head_t);
if (size < header_len) {
OG_LOG_RUN_ERR("message buffer is not enough.");
return OG_ERROR;
}
ret = cms_socket_recv_bytes(sockfd, buf, &header_len, timeout_ms, is_retry_conn);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("recv msg failed, ret %d, len %d.", ret, header_len);
return ret;
}
return OG_SUCCESS;
}
status_t cms_socket_recv_body(socket_t sockfd, char* buf, int32 size, int32 timeout_ms)
{
status_t ret = OG_SUCCESS;
int32 body_len = size;
ret = cms_socket_recv_bytes(sockfd, buf, &body_len, timeout_ms, OG_FALSE);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("recv msg failed, ret %d, len %d", ret, body_len);
return ret;
}
return OG_SUCCESS;
}
status_t cms_socket_recv(socket_t sockfd, cms_packet_head_t* msg, int32 size, int32 timeout_ms, bool32 is_retry_conn)
{
status_t ret = OG_SUCCESS;
int32 body_len = 0;
ret = cms_socket_recv_header(sockfd, (char*)msg, size, timeout_ms, is_retry_conn);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("recv msg header failed, ret %d", ret);
return ret;
}
body_len = msg->msg_size - sizeof(cms_packet_head_t);
if (body_len > size - sizeof(cms_packet_head_t)) {
OG_LOG_RUN_ERR("message buffer is not enough, msg type %u, msg req %llu, src msg req %llu, msg size %u, "
"body len %d", (int32)msg->msg_type, msg->msg_seq, msg->src_msg_seq, msg->msg_size, body_len);
return OG_ERROR;
}
ret = cms_socket_recv_body(sockfd, (char*)msg + sizeof(cms_packet_head_t), body_len, timeout_ms);
if (ret != OG_SUCCESS) {
OG_LOG_RUN_ERR("recv msg body failed, ret %d, msg type %u, msg req %llu, src msg req %llu, msg size %u, "
"body len %d, sock %d", ret, (int32)msg->msg_type, msg->msg_seq, msg->src_msg_seq, msg->msg_size,
body_len, sockfd);
return ret;
}
if (is_retry_conn) {
struct timeval recv_timeout = { CMS_LINUX_RECV_TMOUNT_SEC, CMS_LINUX_RECV_TMOUNT_MS };
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (char *)&recv_timeout, sizeof(struct timeval)) < 0) {
OG_LOG_RUN_ERR("setsockopt failed");
return OG_ERROR;
}
}
CMS_LOG_MSG(OG_LOG_DEBUG_INF, "receive msg succeed", msg);
return OG_SUCCESS;
}
status_t cms_socket_send(socket_t sockfd, cms_packet_head_t* msg, int32 timeout_ms)
{
int32 size = msg->msg_size;
OG_RETURN_IFERR(cms_socket_send_bytes(sockfd, (char*)msg, &size, timeout_ms));
CMS_LOG_MSG(OG_LOG_DEBUG_INF, "send msg succeed", msg);
return OG_SUCCESS;
}