* Copyright (c) 2021 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cs_pipe.c
* protocol process
*
* IDENTIFICATION
* src/network/protocol/cs_pipe.c
*
* -------------------------------------------------------------------------
*/
#include "cs_pipe.h"
#include "cm_num.h"
#include "util_profile_stat.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef status_t (*recv_func_t)(void *link, char *buf, uint32 size, int32 *recv_size, uint32 *wait_event);
typedef status_t (*send_func_t)(void *link, const char *buf, uint32 size, int32 *send_size);
typedef status_t (*recv_timed_func_t)(void *link, char *buf, uint32 size, uint32 timeout);
typedef status_t (*send_timed_func_t)(void *link, const char *buf, uint32 size, uint32 timeout);
typedef status_t (*wait_func_t)(void *link, uint32 wait_for, int32 timeout, bool32 *ready);
const text_t g_pipe_type_names[CS_TYPE_CEIL] = {
{ "UNKNOWN", 7 },
{ "TCP", 3 },
{ "SSL", 3 },
};
typedef struct st_vio {
recv_func_t vio_recv;
send_func_t vio_send;
wait_func_t vio_wait;
recv_timed_func_t vio_recv_timed;
send_timed_func_t vio_send_timed;
} vio_t;
static const vio_t g_vio_list[] = {
{ NULL, NULL, NULL, NULL, NULL },
{ (recv_func_t)cs_tcp_recv, (send_func_t)cs_tcp_send, (wait_func_t)cs_tcp_wait,
(recv_timed_func_t)cs_tcp_recv_timed, (send_timed_func_t)cs_tcp_send_timed },
{ (recv_func_t)cs_ssl_recv, (send_func_t)cs_ssl_send, (wait_func_t)cs_ssl_wait,
(recv_timed_func_t)cs_ssl_recv_timed, (send_timed_func_t)cs_ssl_send_timed },
};
Macro definitions for pipe I/O operations
@note
Performance sensitive, the pipe->type should be guaranteed by the caller.
e.g. CS_TYPE_TCP, CS_TYPE_SSL, CS_TYPE_DOMAIN_SOCKET
*/
#define GET_VIO(pipe) \
(&g_vio_list[MIN((pipe)->type, CS_TYPE_CEIL - 1)])
#define VIO_SEND(pipe, buf, size, len) \
GET_VIO(pipe)->vio_send(&(pipe)->link, buf, size, len)
#define VIO_SEND_TIMED(pipe, buf, size, timeout) \
GET_VIO(pipe)->vio_send_timed(&(pipe)->link, buf, size, timeout)
#define VIO_RECV(pipe, buf, size, len, wait_event) \
GET_VIO(pipe)->vio_recv(&(pipe)->link, buf, size, len, wait_event)
#define VIO_RECV_TIMED(pipe, buf, size, timeout) \
GET_VIO(pipe)->vio_recv_timed(&(pipe)->link, buf, size, timeout)
#define VIO_WAIT(pipe, ev, timeout, ready) \
GET_VIO(pipe)->vio_wait(&(pipe)->link, ev, timeout, ready)
static status_t cs_open_tcp_link(const char *host, uint16 port, cs_pipe_t *pipe, link_ready_ack_t *ack,
const char *bind_host)
{
tcp_link_t *link = NULL;
bool32 ready = CM_FALSE;
uint32 proto_code = CM_PROTO_CODE;
uint8 local_endian;
socket_attr_t sock_attr = {
.connect_timeout = pipe->connect_timeout,
.l_onoff = pipe->l_onoff,
.l_linger = pipe->l_linger };
link = &pipe->link.tcp;
CM_RETURN_IFERR(cs_tcp_connect(host, port, link, bind_host, &sock_attr));
LOG_RUN_INF("[MEC]after cs_tcp_connect to host %s port %u.", host, port);
do {
if (cs_tcp_send_timed(link, (char *)&proto_code, sizeof(proto_code), CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) {
LOG_DEBUG_WAR("[MEC]cs_tcp_send_timed fail, proto_code=%u.", proto_code);
break;
}
if (cs_tcp_wait(link, CS_WAIT_FOR_READ, pipe->connect_timeout, &ready) != CM_SUCCESS) {
LOG_DEBUG_WAR("[MEC]cs_tcp_wait fail when cs_open_tcp_link.");
break;
}
if (!ready) {
LOG_DEBUG_WAR("[MEC]connect wait fail, not ready.");
CM_THROW_ERROR(ERR_TCP_TIMEOUT, "connect wait for server response");
break;
}
if (cs_tcp_recv_timed(link, (char *)ack, sizeof(link_ready_ack_t), CM_NETWORK_IO_TIMEOUT) != CM_SUCCESS) {
LOG_DEBUG_WAR("[MEC]cs_tcp_recv_timed fail, read ack timeout.");
break;
}
local_endian = (IS_BIG_ENDIAN ? (uint8)1 : (uint8)0);
if (local_endian != ack->endian) {
ack->flags = cs_reverse_int16(ack->flags);
ack->version = cs_reverse_int32(ack->version);
pipe->options |= CSO_DIFFERENT_ENDIAN;
}
LOG_RUN_INF("[MEC]cs_open_tcp_link success.");
return CM_SUCCESS;
} while (0);
LOG_DEBUG_WAR("[MEC]cs_open_tcp_link fail.");
(void)cs_close_socket(link->sock);
link->sock = CS_INVALID_SOCKET;
link->closed = CM_TRUE;
return CM_ERROR;
}
TCP 192.168.1.10:1622, database_server1:1622
RDMA: RDMA@192.168.1.10:1622
IPC:/home/gsdb
UDS:/home/gsdb */
typedef struct st_server_info {
cs_pipe_type_t type;
char path[CM_FILE_NAME_BUFFER_SIZE];
uint16 port;
} server_info_t;
static status_t cs_parse_url(const char *url, server_info_t *server)
{
text_t text, part1, part2;
cm_str2text((char *)url, &text);
(void)cm_split_rtext(&text, ':', '\0', &part1, &part2);
server->type = CS_TYPE_TCP;
CM_RETURN_IFERR(cm_text2str(&part1, server->path, CM_FILE_NAME_BUFFER_SIZE));
if (!cm_is_short(&part2)) {
CM_THROW_ERROR(ERR_INVALID_VALUE, "URL", url);
return CM_ERROR;
}
if (cm_text2uint16(&part2, &server->port) != CM_SUCCESS) {
return CM_ERROR;
}
return CM_SUCCESS;
}
status_t cs_connect(const char *url, cs_pipe_t *pipe, const char *bind_host)
{
link_ready_ack_t ack;
server_info_t server;
CM_RETURN_IFERR(cs_parse_url(url, &server));
pipe->type = server.type;
if (pipe->type == CS_TYPE_TCP) {
CM_RETURN_IFERR(cs_open_tcp_link(server.path, server.port, pipe, &ack, bind_host));
} else {
CM_THROW_ERROR(ERR_PROTOCOL_NOT_SUPPORT);
return CM_ERROR;
}
pipe->version = ack.version;
return CM_SUCCESS;
}
void cs_disconnect(cs_pipe_t *pipe)
{
if (pipe->type == CS_TYPE_TCP) {
cs_tcp_disconnect(&pipe->link.tcp);
}
if (pipe->type == CS_TYPE_SSL) {
cs_ssl_disconnect(&pipe->link.ssl);
}
}
void cs_shutdown(cs_pipe_t *pipe)
{
switch (pipe->type) {
case CS_TYPE_TCP:
cs_shutdown_socket(pipe->link.tcp.sock);
break;
case CS_TYPE_SSL:
cs_shutdown_socket(pipe->link.ssl.tcp.sock);
break;
default:
break;
}
}
status_t cs_send_fixed_size(cs_pipe_t *pipe, char *buf, int32 size)
{
bool32 ready;
int32 send_size;
int32 remain_size = size;
char *send_buf = buf;
int32 wait_interval = 0;
if (VIO_SEND(pipe, send_buf, remain_size, &send_size) != CM_SUCCESS) {
return CM_ERROR;
}
send_buf += send_size;
remain_size -= send_size;
while (remain_size > 0) {
if (cs_wait(pipe, CS_WAIT_FOR_WRITE, CM_POLL_WAIT, &ready) != CM_SUCCESS) {
return CM_ERROR;
}
if (!ready) {
wait_interval += CM_POLL_WAIT;
if (wait_interval >= pipe->socket_timeout) {
CM_THROW_ERROR(ERR_TCP_TIMEOUT, "send data");
return CM_ERROR;
}
continue;
}
if (VIO_SEND(pipe, send_buf, remain_size, &send_size) != CM_SUCCESS) {
return CM_ERROR;
}
send_buf += send_size;
remain_size -= send_size;
}
return CM_SUCCESS;
}
status_t cs_send_bytes(cs_pipe_t *pipe, const char *buf, uint32 size)
{
return VIO_SEND_TIMED(pipe, buf, size, CM_NETWORK_IO_TIMEOUT);
}
status_t cs_read_bytes(cs_pipe_t *pipe, char *buf, uint32 max_size, int32 *size)
{
uint32 wait_event;
if (cs_wait(pipe, CS_WAIT_FOR_READ, CM_NETWORK_IO_TIMEOUT, NULL) != CM_SUCCESS) {
return CM_ERROR;
}
return VIO_RECV(pipe, buf, max_size, size, &wait_event);
}
status_t cs_read_fixed_size(cs_pipe_t *pipe, char *buf, int32 size)
{
bool32 ready;
int32 read_size;
uint32 wait_event;
int32 remain_size = size;
char *read_buf = buf;
int32 wait_interval = 0;
if (size == 0) {
return CM_SUCCESS;
}
if (VIO_RECV(pipe, read_buf, remain_size, &read_size, &wait_event) != CM_SUCCESS) {
return CM_ERROR;
}
read_buf += read_size;
remain_size -= read_size;
while (remain_size > 0) {
if (cs_wait(pipe, CS_WAIT_FOR_READ, CM_POLL_WAIT, &ready) != CM_SUCCESS) {
return CM_ERROR;
}
if (!ready) {
wait_interval += CM_POLL_WAIT;
if (wait_interval >= pipe->socket_timeout) {
CM_THROW_ERROR(ERR_TCP_TIMEOUT, "recv data");
return CM_ERROR;
}
continue;
}
if (VIO_RECV(pipe, read_buf, remain_size, &read_size, &wait_event) != CM_SUCCESS) {
return CM_ERROR;
}
read_buf += read_size;
remain_size -= read_size;
}
return CM_SUCCESS;
}
status_t cs_wait(cs_pipe_t *pipe, uint32 wait_for, int32 timeout, bool32 *ready)
{
if (pipe->type == CS_TYPE_TCP) {
return cs_tcp_wait(&pipe->link.tcp, wait_for, timeout, ready);
}
if (pipe->type == CS_TYPE_SSL) {
return cs_ssl_wait(&pipe->link.ssl, wait_for, timeout, ready);
}
return CM_ERROR;
}
socket_t cs_get_socket_fd(const cs_pipe_t* pipe)
{
if (pipe->type == CS_TYPE_TCP) {
return pipe->link.tcp.sock;
} else if (pipe->type == CS_TYPE_SSL) {
return pipe->link.ssl.tcp.sock;
} else {
return CS_INVALID_SOCKET;
}
}
status_t cs_ssl_accept(ssl_ctx_t *fd, cs_pipe_t *pipe)
{
ssl_link_t *link = NULL;
link = &pipe->link.ssl;
link->ssl_ctx = fd;
if (cs_ssl_accept_socket(link, pipe->link.tcp.sock, CM_SSL_IO_TIMEOUT) != CM_SUCCESS) {
return CM_ERROR;
}
pipe->type = CS_TYPE_SSL;
return CM_SUCCESS;
}
status_t cs_ssl_connect(ssl_ctx_t *fd, cs_pipe_t *pipe)
{
ssl_link_t *link = NULL;
link = &pipe->link.ssl;
link->ssl_ctx = fd;
if (cs_ssl_connect_socket(link, pipe->link.tcp.sock, CM_SSL_IO_TIMEOUT) != CM_SUCCESS) {
return CM_ERROR;
}
pipe->type = CS_TYPE_SSL;
return CM_SUCCESS;
}
#ifdef __cplusplus
}
#endif