* This file is part of the oGRAC project.
* Copyright (c) 2024 Huawei Technologies Co.,Ltd.
*
* oGRAC is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* cs_pipe.c
*
*
* IDENTIFICATION
* src/protocol/cs_pipe.c
*
* -------------------------------------------------------------------------
*/
#include "cs_pipe.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef status_t (*recv_func_t)(void *link, char *buf, uint32 size, int32 *recv_size, uint32 *wait_event);
typedef status_t (*send_func_t)(void *link, const char *buf, uint32 size, int32 *send_size);
typedef status_t (*recv_timed_func_t)(void *link, char *buf, uint32 size, uint32 timeout);
typedef status_t (*send_timed_func_t)(void *link, const char *buf, uint32 size, uint32 timeout);
typedef status_t (*wait_func_t)(void *link, uint32 wait_for, int32 timeout, bool32 *ready);
const text_t g_pipe_type_names[CS_TYPE_CEIL] = {
{ "UNKNOWN", 7 },
{ "TCP", 3 },
{ "IPC", 3 },
{ "UDS", 3 },
{ "SSL", 3 },
{ "EMBEDDED", 8 },
{ "DIRECT", 6 },
{ "RDMA", 4 },
};
typedef struct st_vio {
recv_func_t vio_recv;
send_func_t vio_send;
wait_func_t vio_wait;
recv_timed_func_t vio_recv_timed;
send_timed_func_t vio_send_timed;
} vio_t;
static const vio_t g_vio_list[] = {
{ NULL, NULL, NULL, NULL, NULL },
{ (recv_func_t)cs_tcp_recv, (send_func_t)cs_tcp_send, (wait_func_t)cs_tcp_wait,
(recv_timed_func_t)cs_tcp_recv_timed, (send_timed_func_t)cs_tcp_send_timed },
{ NULL, NULL, NULL, NULL, NULL },
{ (recv_func_t)cs_uds_recv, (send_func_t)cs_uds_send, (wait_func_t)cs_uds_wait,
(recv_timed_func_t)cs_uds_recv_timed, (send_timed_func_t)cs_uds_send_timed },
{ (recv_func_t)cs_ssl_recv, (send_func_t)cs_ssl_send, (wait_func_t)cs_ssl_wait,
(recv_timed_func_t)cs_ssl_recv_timed, (send_timed_func_t)cs_ssl_send_timed },
{ NULL, NULL, NULL, NULL, NULL },
{ NULL, NULL, NULL, NULL, NULL },
{ (recv_func_t)cs_rdma_recv, (send_func_t)cs_rdma_send, (wait_func_t)cs_rdma_wait,
(recv_timed_func_t)cs_rdma_recv_timed, (send_timed_func_t)cs_rdma_send_timed },
};
Macro definitions for pipe I/O operations
@note
Performance sensitive, the pipe->type should be guaranteed by the caller.
e.g. CS_TYPE_TCP, CS_TYPE_SSL, CS_TYPE_DOMAIN_SOCKET
*/
#define GET_VIO(pipe) \
(&g_vio_list[MIN((pipe)->type, CS_TYPE_CEIL - 1)])
#define VIO_SEND(pipe, buf, size, len) \
GET_VIO(pipe)->vio_send(&(pipe)->link, buf, size, len)
#define VIO_SEND_TIMED(pipe, buf, size, timeout) \
GET_VIO(pipe)->vio_send_timed(&(pipe)->link, buf, size, timeout)
#define VIO_RECV(pipe, buf, size, len, wait_event) \
GET_VIO(pipe)->vio_recv(&(pipe)->link, buf, size, len, wait_event)
#define VIO_RECV_TIMED(pipe, buf, size, timeout) \
GET_VIO(pipe)->vio_recv_timed(&(pipe)->link, buf, size, timeout)
#define VIO_WAIT(pipe, ev, timeout, ready) \
GET_VIO(pipe)->vio_wait(&(pipe)->link, ev, timeout, ready)
static status_t cs_read_packet(cs_pipe_t *pipe, cs_packet_t *pack, bool32 cs_client)
{
int32 remain_size;
int32 head_size = sizeof(cs_packet_head_t);
int32 err_code = 0;
status_t ret_chk;
if (pack->buf == NULL) {
OG_THROW_ERROR(ERR_CLT_OBJECT_IS_NULL, "pack buf is null");
return OG_ERROR;
}
ret_chk = VIO_RECV_TIMED(pipe, pack->buf, head_size, OG_NETWORK_IO_TIMEOUT);
if (ret_chk != OG_SUCCESS) {
err_code = cm_get_error_code();
if (err_code == (int32)ERR_TCP_TIMEOUT) {
OG_THROW_ERROR(ERR_TCP_TIMEOUT,
cs_client ? "read head wait for server response" : "read head wait for client request");
}
return OG_ERROR;
}
if (CS_DIFFERENT_ENDIAN(pack->options)) {
pack->head->size = cs_reverse_int32(pack->head->size);
pack->head->flags = cs_reverse_int16(pack->head->flags);
pack->head->serial_number = cs_reverse_int32(pack->head->serial_number);
}
OG_RETURN_IFERR(cs_try_realloc_packet_buffer(pack, head_size));
remain_size = (int32)pack->head->size - head_size;
if (remain_size <= 0) {
return OG_SUCCESS;
}
ret_chk = VIO_RECV_TIMED(pipe, pack->buf + head_size, remain_size, OG_NETWORK_IO_TIMEOUT);
if (ret_chk != OG_SUCCESS) {
err_code = cm_get_error_code();
if (err_code == (int32)ERR_TCP_TIMEOUT) {
OG_THROW_ERROR(ERR_TCP_TIMEOUT,
cs_client ? "read body wait for server response" : "read body wait for client request");
}
}
return ret_chk;
}
static status_t cs_write_packet(cs_pipe_t *pipe, cs_packet_t *pack)
{
uint32 size = pack->head->size;
if (CS_DIFFERENT_ENDIAN(pack->options)) {
pack->head->size = cs_reverse_int32(pack->head->size);
pack->head->flags = cs_reverse_int16(pack->head->flags);
}
if (VIO_SEND_TIMED(pipe, pack->buf, size, OG_DEFAULT_NULL_VALUE) != OG_SUCCESS) {
return OG_ERROR;
}
return OG_SUCCESS;
}
static status_t cs_open_tcp_link(const char *host, uint16 port, cs_pipe_t *pipe, link_ready_ack_t *ack,
const char *bind_host)
{
tcp_link_t *link = NULL;
bool32 ready = OG_FALSE;
uint32 proto_code = OG_PROTO_CODE;
uint8 local_endian;
socket_attr_t sock_attr = {.connect_timeout = pipe->connect_timeout,
.l_onoff = pipe->l_onoff, .l_linger = pipe->l_linger };
link = &pipe->link.tcp;
OG_RETURN_IFERR(cs_tcp_connect(host, port, link, bind_host, &sock_attr));
do {
OG_BREAK_IF_ERROR(cs_tcp_send_timed(link, (char *)&proto_code, sizeof(proto_code), OG_NETWORK_IO_TIMEOUT));
OG_BREAK_IF_ERROR(cs_tcp_wait(link, CS_WAIT_FOR_READ, pipe->connect_timeout, &ready));
if (!ready) {
OG_THROW_ERROR(ERR_TCP_TIMEOUT, "connect wait for server response");
break;
}
OG_BREAK_IF_ERROR(cs_tcp_recv_timed(link, (char *)ack, sizeof(link_ready_ack_t), OG_NETWORK_IO_TIMEOUT));
local_endian = (IS_BIG_ENDIAN ? (uint8)1 : (uint8)0);
if (local_endian != ack->endian) {
ack->flags = cs_reverse_int16(ack->flags);
}
return OG_SUCCESS;
} while (0);
cs_close_socket(link->sock);
link->sock = CS_INVALID_SOCKET;
link->closed = OG_TRUE;
return OG_ERROR;
}
static status_t cs_open_uds_link(const char *server_path, const char *client_path,
cs_pipe_t *pipe, link_ready_ack_t *ack)
{
uds_link_t *link = NULL;
bool32 ready = OG_FALSE;
uint32 proto_code = OG_PROTO_CODE;
uint8 local_endian;
socket_attr_t sock_attr = {.connect_timeout = pipe->connect_timeout,
.l_onoff = pipe->l_onoff, .l_linger = pipe->l_linger };
link = &pipe->link.uds;
if (cs_create_uds_socket(&link->sock) != OG_SUCCESS) {
return OG_ERROR;
}
if (cs_uds_connect(server_path, client_path, link, &sock_attr) != OG_SUCCESS) {
goto error;
}
if (cs_uds_send_timed(link, (char *)&proto_code, sizeof(proto_code), OG_NETWORK_IO_TIMEOUT) != OG_SUCCESS) {
goto error;
}
if (cs_uds_wait(link, CS_WAIT_FOR_READ, pipe->connect_timeout, &ready) != OG_SUCCESS) {
goto error;
}
if (!ready) {
OG_THROW_ERROR(ERR_TCP_TIMEOUT, "connect wait for server response");
goto error;
}
if (cs_uds_recv_timed(link, (char *)ack, sizeof(link_ready_ack_t), OG_NETWORK_IO_TIMEOUT) != OG_SUCCESS) {
goto error;
}
local_endian = (IS_BIG_ENDIAN ? (uint8)1 : (uint8)0);
if (local_endian != ack->endian) {
ack->flags = cs_reverse_int16(ack->flags);
}
return OG_SUCCESS;
error:
cs_uds_socket_close(&link->sock);
link->closed = OG_TRUE;
return OG_ERROR;
}
static status_t cs_open_rdma_link(const char *host, uint16 port, cs_pipe_t *pipe, link_ready_ack_t *ack)
{
rdma_link_t *link = NULL;
bool32 ready = OG_FALSE;
uint32 proto_code = OG_PROTO_CODE;
uint8 local_endian;
link = &pipe->link.rdma;
if (cs_rdma_connect(host, port, link) != OG_SUCCESS) {
return OG_ERROR;
}
if (cs_rdma_send_timed(link, (char *)&proto_code, sizeof(proto_code), OG_NETWORK_IO_TIMEOUT) != OG_SUCCESS) {
return OG_ERROR;
}
if (cs_rdma_wait(link, CS_WAIT_FOR_READ, pipe->connect_timeout, &ready) != OG_SUCCESS) {
return OG_ERROR;
}
if (!ready) {
OG_THROW_ERROR(ERR_TCP_TIMEOUT, "rdma socket wait for server response");
return OG_ERROR;
}
OG_RETURN_IFERR(cs_rdma_recv_timed(link, (char *)ack, sizeof(link_ready_ack_t), OG_NETWORK_IO_TIMEOUT));
local_endian = (IS_BIG_ENDIAN ? (uint8)1 : (uint8)0);
if (local_endian != ack->endian) {
ack->flags = cs_reverse_int16(ack->flags);
}
return OG_SUCCESS;
}
TCP x.x.x.x:port, database_server1:port
RDMA: RDMA@x.x.x.x:port
IPC:/home/ogdb
UDS:/home/ogdb */
typedef struct st_server_info {
cs_pipe_type_t type;
char path[OG_FILE_NAME_BUFFER_SIZE];
uint16 port;
} server_info_t;
static status_t cs_parse_url(const char *url, server_info_t *server)
{
text_t text;
text_t part1;
text_t part2;
text_t ipc = { "IPC", 3 };
text_t uds = { "UDS", 3 };
cm_str2text((char *)url, &text);
(void)cm_split_rtext(&text, ':', '\0', &part1, &part2);
if (cm_text_equal_ins(&part1, &ipc)) {
server->type = CS_TYPE_IPC;
OG_RETURN_IFERR(cm_text2str(&part2, server->path, OG_FILE_NAME_BUFFER_SIZE));
if (part2.len == 0) {
OG_THROW_ERROR(ERR_CLT_INVALID_ATTR, "URL", url);
return OG_ERROR;
}
} else if (cm_text_equal_ins(&part1, &uds)) {
server->type = CS_TYPE_DOMAIN_SCOKET;
} else if (cm_text_str_contain_equal_ins(&part1, RDMA_HOST_PREFIX, RDMA_HOST_PREFIX_LEN)) {
text_t prefix;
(void)cm_fetch_text(&part1, '@', '\0', &prefix);
server->type = CS_TYPE_RSOCKET;
cm_text2str(&part1, server->path, OG_FILE_NAME_BUFFER_SIZE);
if (!cm_is_short(&part2)) {
OG_THROW_ERROR(ERR_CLT_INVALID_ATTR, "rdma URL", url);
return OG_ERROR;
}
if (cm_text2uint16(&part2, &server->port) != OG_SUCCESS) {
return OG_ERROR;
}
} else {
server->type = CS_TYPE_TCP;
OG_RETURN_IFERR(cm_text2str(&part1, server->path, OG_FILE_NAME_BUFFER_SIZE));
if (!cm_is_short(&part2)) {
OG_THROW_ERROR(ERR_CLT_INVALID_ATTR, "URL", url);
return OG_ERROR;
}
if (cm_text2uint16(&part2, &server->port) != OG_SUCCESS) {
return OG_ERROR;
}
}
return OG_SUCCESS;
}
status_t cs_connect(const char *url, cs_pipe_t *pipe, const char *bind_host,
const char *server_path, const char *client_path)
{
uint8 local_endian;
link_ready_ack_t ack;
server_info_t server;
CM_POINTER2(url, pipe);
OG_RETURN_IFERR(cs_parse_url(url, &server));
pipe->type = server.type;
pipe->node_type = CS_RESERVED;
if (pipe->type == CS_TYPE_TCP) {
OG_RETURN_IFERR(cs_open_tcp_link(server.path, server.port, pipe, &ack, bind_host));
} else if (pipe->type == CS_TYPE_DOMAIN_SCOKET) {
if (CM_IS_EMPTY_STR(server_path)) {
OG_THROW_ERROR(ERR_CLT_UDS_FILE_EMPTY);
return OG_ERROR;
}
OG_RETURN_IFERR(cs_open_uds_link(server_path, client_path, pipe, &ack));
} else if (pipe->type == CS_TYPE_RSOCKET) {
OG_RETURN_IFERR(cs_open_rdma_link(server.path, server.port, pipe, &ack));
} else {
OG_THROW_ERROR(ERR_PROTOCOL_NOT_SUPPORT);
return OG_ERROR;
}
local_endian = (IS_BIG_ENDIAN ? (uint8)1 : (uint8)0);
if (local_endian != ack.endian) {
pipe->options |= CSO_DIFFERENT_ENDIAN;
}
if ((ack.flags & CS_FLAG_CLIENT_SSL) && ack.handshake_version >= CS_VERSION_5) {
pipe->options |= CSO_CLIENT_SSL;
} else {
pipe->options &= ~CSO_CLIENT_SSL;
}
if (ack.flags & CS_FLAG_CN_CONN) {
pipe->node_type = CS_TYPE_CN;
} else if (ack.flags & CS_FLAG_DN_CONN) {
pipe->node_type = CS_TYPE_DN;
}
pipe->version = ack.handshake_version;
return OG_SUCCESS;
}
void cs_disconnect(cs_pipe_t *pipe)
{
CM_POINTER(pipe);
if (pipe->type == CS_TYPE_TCP) {
cs_tcp_disconnect(&pipe->link.tcp);
}
if (pipe->type == CS_TYPE_SSL) {
cs_ssl_disconnect(&pipe->link.ssl);
}
if (pipe->type == CS_TYPE_DOMAIN_SCOKET) {
cs_uds_disconnect(&pipe->link.uds);
}
if (pipe->type == CS_TYPE_RSOCKET) {
cs_rdma_disconnect(&pipe->link.rdma);
}
}
void cs_shutdown(cs_pipe_t *pipe)
{
switch (pipe->type) {
case CS_TYPE_TCP:
cs_shutdown_socket(pipe->link.tcp.sock);
break;
case CS_TYPE_SSL:
cs_shutdown_socket(pipe->link.ssl.tcp.sock);
break;
case CS_TYPE_DOMAIN_SCOKET:
cs_shutdown_socket(pipe->link.uds.sock);
break;
case CS_TYPE_RSOCKET:
cs_shutdown_socket(pipe->link.rdma.sock);
break;
default:
break;
}
}
status_t cs_read(cs_pipe_t *pipe, cs_packet_t *pack, bool32 cs_client)
{
CM_POINTER2(pipe, pack);
pack->options = pipe->options;
return cs_read_packet(pipe, pack, cs_client);
}
status_t cs_read_bytes(cs_pipe_t *pipe, char *buf, uint32 max_size, int32 *size)
{
CM_POINTER(pipe);
uint32 wait_event;
if (cs_wait(pipe, CS_WAIT_FOR_READ, OG_NETWORK_IO_TIMEOUT, NULL) != OG_SUCCESS) {
return OG_ERROR;
}
return VIO_RECV(pipe, buf, max_size, size, &wait_event);
}
status_t cs_read_fixed_size(cs_pipe_t *pipe, char *buf, int32 size)
{
bool32 ready;
int32 read_size;
int32 remain_size = size;
char *read_buf = buf;
if (size == 0) {
return OG_SUCCESS;
}
if (pipe->type == CS_TYPE_SSL) {
if (cs_ssl_read_buffer(&pipe->link.ssl, buf, remain_size, &read_size) == OG_SUCCESS) {
read_buf += read_size;
remain_size -= read_size;
}
}
if (remain_size == 0) {
return OG_SUCCESS;
}
if (cs_read_bytes(pipe, read_buf, remain_size, &read_size) != OG_SUCCESS) {
return OG_ERROR;
}
read_buf += read_size;
remain_size -= read_size;
while (remain_size > 0) {
if (cs_wait(pipe, CS_WAIT_FOR_READ, OG_NETWORK_IO_TIMEOUT, &ready) != OG_SUCCESS) {
return OG_ERROR;
}
if (!ready) {
continue;
}
if (cs_read_bytes(pipe, read_buf, remain_size, &read_size) != OG_SUCCESS) {
return OG_ERROR;
}
if (read_size < 0) {
return OG_ERROR;
}
read_buf += read_size;
remain_size -= read_size;
}
return OG_SUCCESS;
}
status_t cs_send_fixed_size(cs_pipe_t *pipe, char *buf, int32 size, uint32_t dst_id,
check_inst_alive_func check_dst_alive)
{
bool32 ready;
int32 send_size;
int32 remain_size = size;
char *send_buf = buf;
if (VIO_SEND(pipe, send_buf, remain_size, &send_size) != OG_SUCCESS) {
return OG_ERROR;
}
send_buf += send_size;
remain_size -= send_size;
while (remain_size > 0) {
if (cs_wait(pipe, CS_WAIT_FOR_WRITE, OG_NETWORK_WAIT_TIMEOUT, &ready) != OG_SUCCESS) {
return OG_ERROR;
}
if (!ready) {
if (check_dst_alive != NULL && !check_dst_alive(dst_id)) {
return OG_ERROR;
}
continue;
}
if (VIO_SEND(pipe, send_buf, remain_size, &send_size) != OG_SUCCESS) {
return OG_ERROR;
}
send_buf += send_size;
remain_size -= send_size;
}
return OG_SUCCESS;
}
status_t cs_send_bytes(cs_pipe_t *pipe, const char *buf, uint32 size)
{
CM_POINTER2(pipe, buf);
return VIO_SEND_TIMED(pipe, buf, size, OG_NETWORK_IO_TIMEOUT);
}
status_t cs_write(cs_pipe_t *pipe, cs_packet_t *pack)
{
CM_POINTER2(pipe, pack);
pack->options = pipe->options;
return cs_write_packet(pipe, pack);
}
status_t cs_wait(cs_pipe_t *pipe, uint32 wait_for, int32 timeout, bool32 *ready)
{
if (pipe->type == CS_TYPE_TCP) {
return cs_tcp_wait(&pipe->link.tcp, wait_for, timeout, ready);
}
if (pipe->type == CS_TYPE_SSL) {
return cs_ssl_wait(&pipe->link.ssl, wait_for, timeout, ready);
}
if (pipe->type == CS_TYPE_DOMAIN_SCOKET) {
return cs_uds_wait(&pipe->link.uds, wait_for, timeout, ready);
}
if (pipe->type == CS_TYPE_RSOCKET) {
return cs_rdma_wait(&pipe->link.rdma, wait_for, timeout, ready);
}
OG_THROW_ERROR(ERR_CLT_INVALID_VALUE, "pipe type", (uint32)pipe->type);
return OG_ERROR;
}
status_t cs_call(cs_pipe_t *pipe, cs_packet_t *req, cs_packet_t *ack)
{
if (cs_write(pipe, req) != OG_SUCCESS) {
return OG_ERROR;
}
if (cs_wait(pipe, CS_WAIT_FOR_READ, -1, NULL) != OG_SUCCESS) {
return OG_ERROR;
}
return cs_read(pipe, ack, OG_FALSE);
}
status_t cs_call_ex(cs_pipe_t *pipe, cs_packet_t *req, cs_packet_t *ack)
{
bool32 ready = OG_FALSE;
if (cs_write(pipe, req) != OG_SUCCESS) {
return OG_ERROR;
}
if (cs_wait(pipe, CS_WAIT_FOR_READ, pipe->socket_timeout, &ready) != OG_SUCCESS) {
return OG_ERROR;
}
if (!ready) {
OG_THROW_ERROR(ERR_SOCKET_TIMEOUT, pipe->socket_timeout / OG_TIME_THOUSAND_UN);
return OG_ERROR;
}
return cs_read(pipe, ack, OG_TRUE);
}
status_t cs_write_stream(cs_pipe_t *pipe, const char *buf, uint32 size, int32 max_pkg_size)
{
int32 offset = 0;
int32 remain_size = (int32)size;
int32 data_size;
CM_POINTER2(pipe, buf);
if (max_pkg_size == 0) {
return VIO_SEND_TIMED(pipe, buf, size, OG_REPL_SEND_TIMEOUT);
}
while (remain_size > 0) {
data_size = remain_size > max_pkg_size ? max_pkg_size : remain_size;
if (VIO_SEND_TIMED(pipe, buf + offset, data_size, OG_REPL_SEND_TIMEOUT) != OG_SUCCESS) {
return OG_ERROR;
}
offset += data_size;
remain_size -= data_size;
}
return OG_SUCCESS;
}
status_t cs_write_stream_timeout(cs_pipe_t *pipe, const char *buf, uint32 size, int32 max_pkg_size, uint32 timeout)
{
int32 offset = 0;
int32 remain_size = (int32)size;
int32 data_size;
CM_POINTER2(pipe, buf);
if (max_pkg_size == 0) {
return VIO_SEND_TIMED(pipe, buf, size, timeout);
}
while (remain_size > 0) {
data_size = remain_size > max_pkg_size ? max_pkg_size : remain_size;
if (VIO_SEND_TIMED(pipe, buf + offset, data_size, timeout) != OG_SUCCESS) {
return OG_ERROR;
}
offset += data_size;
remain_size -= data_size;
}
return OG_SUCCESS;
}
status_t cs_read_stream(cs_pipe_t *pipe, char *buf, uint32 timeout, uint32 max_size, int32 *size)
{
bool32 ready = OG_FALSE;
uint32 read_size;
uint32 recv_size;
uint32 new_timeout;
uint32 retry_count;
uint32 wait_event = 0;
CM_POINTER2(pipe, buf);
read_size = 0;
recv_size = 0;
retry_count = 0;
new_timeout = timeout;
if (pipe->type == CS_TYPE_SSL) {
if (VIO_RECV(pipe, buf, max_size, (int32 *)&recv_size, &wait_event) != OG_SUCCESS) {
return OG_ERROR;
}
read_size = recv_size;
}
wait_event = (wait_event == 0) ? CS_WAIT_FOR_READ : wait_event;
while (read_size < max_size) {
if (VIO_WAIT(pipe, wait_event, new_timeout, &ready) != OG_SUCCESS) {
return OG_ERROR;
}
if (!ready) {
if (read_size > 0) {
retry_count++;
if (retry_count > OG_MAX_REP_RETRY_COUNT) {
OG_THROW_ERROR (ERR_TCP_TIMEOUT_REMAIN, (int32)(max_size - read_size));
return OG_ERROR;
}
new_timeout = (timeout == 0) ? OG_POLL_WAIT : timeout;
continue;
}
break;
}
if (VIO_RECV(pipe, buf + read_size, max_size - read_size, (int32 *)&recv_size, &wait_event) != OG_SUCCESS) {
return OG_ERROR;
}
read_size += recv_size;
}
(*size) = read_size;
return OG_SUCCESS;
}
status_t cs_ssl_accept(ssl_ctx_t *fd, cs_pipe_t *pipe)
{
ssl_link_t *link = NULL;
link = &pipe->link.ssl;
link->ssl_ctx = fd;
if (cs_ssl_accept_socket(link, pipe->link.tcp.sock, OG_SSL_IO_TIMEOUT) != OG_SUCCESS) {
return OG_ERROR;
}
pipe->type = CS_TYPE_SSL;
return OG_SUCCESS;
}
status_t cs_ssl_connect(ssl_ctx_t *fd, cs_pipe_t *pipe)
{
ssl_link_t *link = NULL;
link = &pipe->link.ssl;
link->ssl_ctx = fd;
if (cs_ssl_connect_socket(link, pipe->link.tcp.sock, OG_SSL_IO_TIMEOUT) != OG_SUCCESS) {
return OG_ERROR;
}
pipe->type = CS_TYPE_SSL;
return OG_SUCCESS;
}
socket_t cs_get_socket_fd(cs_pipe_t* pipe)
{
if (pipe->type == CS_TYPE_TCP) {
return pipe->link.tcp.sock;
} else if (pipe->type == CS_TYPE_SSL) {
return pipe->link.ssl.tcp.sock;
} else if (pipe->type == CS_TYPE_RSOCKET) {
return pipe->link.rdma.sock;
} else {
return CS_INVALID_SOCKET;
}
}
#ifdef __cplusplus
}
#endif