/*
 * Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* libcomm_interface.cpp
*
* IDENTIFICATION
* src/gausskernel/cbb/communication/libcomm_utils/libcomm_interface.cpp
*
* -------------------------------------------------------------------------
*/
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <libcgroup.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <net/if.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/param.h>
#include <sys/time.h>
#include <unistd.h>
#include "../libcomm_core/mc_tcp.h"
#include "../libcomm_core/mc_poller.h"
#include "../libcomm_utils/libcomm_thread.h"
#include "../libcomm_common.h"
#include "libcomm_lqueue.h"
#include "libcomm_queue.h"
#include "libcomm_lock_free_queue.h"
#include "distributelayer/streamCore.h"
#include "distributelayer/streamProducer.h"
#include "pgxc/poolmgr.h"
#include "libpq/auth.h"
#include "libpq/pqsignal.h"
#include "storage/ipc.h"
#include "utils/ps_status.h"
#include "utils/dynahash.h"
#include "vecexecutor/vectorbatch.h"
#include "vecexecutor/vecnodes.h"
#include "executor/exec/execStream.h"
#include "miscadmin.h"
#include "gssignal/gs_signal.h"
#include "pgxc/pgxc.h"
#include "../lib_hcom4db/libhcom.h"
#ifdef ENABLE_UT
/*
 * Unit-test builds: expand `static` to nothing so file-local definitions get
 * external linkage and can be reached from the test harness.
 */
#define static
#endif
/*
 * Ack-check threshold used by gs_internal_recv(): when the gap between two
 * consecutive receive timestamps (receiver_conn[].last_rcv_time) exceeds this
 * value, a small "ACK" is sent back on the data socket; values <= 0 disable
 * the check. GsInternalSendRemote() drains that ACK on the sender side.
 * Units follow last_rcv_time (presumably milliseconds — TODO confirm).
 */
int g_ackchk_time = 2000;
/* Adapter function table (recv_data / send_data / connect hooks), defined elsewhere. */
extern LibcommAdaptLayer g_libcomm_adapt;
/* Defined in another translation unit. */
extern int gs_check_cmailbox_data(const gsocket* gs_sock_array, int nproducer, int* producer,
    bool close_expected, check_cmailbox_option opt);
/*
 * Report whether libcomm is running over plain TCP, i.e. whether the adapter
 * layer's receive hook is libcomm_tcp_recv.
 */
bool is_tcp_mode()
{
    const bool using_tcp_recv = (libcomm_tcp_recv == g_libcomm_adapt.recv_data);
    return using_tcp_recv;
}
/*
 * Drain a pending ack-check reply from sock without blocking.
 * poll() is called with a zero timeout, so only data that is already queued
 * is consumed (at most 1024 bytes in one recv()); otherwise this is a no-op.
 */
static void recv_ackchk_msg(int sock)
{
    struct pollfd pfd = {0};
    pfd.fd = sock;
    pfd.events = POLLIN;

    if (poll(&pfd, 1, 0) <= 0) {
        return;
    }

    char drain[1024];
    (void)recv(pfd.fd, drain, sizeof(drain), 0);
}
int gs_send_ctrl_msg_without_lock(struct node_sock* ns, FCMSG_T* msg, int node_idx, int role)
{
int ctrl_sock = -1;
int ctrl_sock_id = -1;
int rc = -1;
int send_bytes = 0;
uint64 time_enter, time_now;
time_enter = mc_timers_ms();
ctrl_sock = ns->get_nl(CTRL_TCP_SOCK, &ctrl_sock_id);
if (ctrl_sock >= 0) {
while (1) {
* but we will assure the data will
* be sent out if the network is ok
*/
rc = mc_tcp_write_noblock(ctrl_sock, (char*)msg + send_bytes, sizeof(FCMSG_T) - send_bytes);
if (rc < 0) {
break;
}
if ((role == ROLE_PRODUCER) && (g_instance.comm_cxt.g_s_node_sock[node_idx].ip_changed == true)) {
errno = ECOMMTCPPEERCHANGED;
rc = -1;
shutdown(ctrl_sock, SHUT_RDWR);
break;
}
send_bytes += rc;
if ((uint32)send_bytes >= sizeof(FCMSG_T)) {
break;
}
time_now = mc_timers_ms();
if (((time_now - time_enter) >
(((uint64)(unsigned)g_instance.comm_cxt.counters_cxt.g_comm_send_timeout) * SEC_TO_MICRO_SEC)) &&
(time_now > time_enter)) {
errno = ECOMMTCPSENDTIMEOUT;
rc = -1;
shutdown(ctrl_sock, SHUT_RDWR);
break;
}
}
}
if (rc <= 0) {
LIBCOMM_ELOG(WARNING,
"(SendCtrlMsg)\tFailed to send type[%s] message to node:%s with socket[%d,%d]:%s.",
ctrl_msg_string(msg->type),
ns->remote_nodename,
ctrl_sock,
ctrl_sock_id,
mc_strerror(errno));
}
COMM_DEBUG_CALL(printfcmsg("SendCtrlMsg", msg));
return rc;
}
/*
 * Locking wrapper around gs_send_ctrl_msg_without_lock(): holds ns's lock
 * for the duration of the send. A message addressed to node 0 / stream 0 is
 * not sent at all and 0 is returned.
 */
int gs_send_ctrl_msg(struct node_sock* ns, FCMSG_T* msg, int role)
{
    const bool no_target = (msg->node_idx == 0) && (msg->streamid == 0);
    if (no_target) {
        return 0;
    }

    ns->lock();
    const int sent = gs_send_ctrl_msg_without_lock(ns, msg, msg->node_idx, role);
    ns->unlock();

    return sent;
}
/*
 * Send one FCMSG_T control message over an explicit connection with a
 * blocking write (LibCommClientWriteBlock).
 * Returns the write result; a result <= 0 is logged as a failure.
 * node_idx is not used (the log reports msg->node_idx).
 */
int gs_send_ctrl_msg_by_socket(LibCommConn *conn, FCMSG_T* msg, int node_idx)
{
    (void)node_idx;
    const int written = LibCommClientWriteBlock(conn, (void*)msg, sizeof(FCMSG_T));
    const bool failed = (written <= 0);

    if (failed) {
        LIBCOMM_ELOG(WARNING,
            "(SendCtrlMsg)\tFailed to send type[%s] message to node[%d]:%s with socket[%d]:%s.",
            ctrl_msg_string(msg->type),
            msg->node_idx,
            msg->nodename,
            conn->socket,
            mc_strerror(errno));
    }
    COMM_DEBUG_CALL(printfcmsg("SendCtrlMsg", msg));
    return written;
}
static int getRecvError(const sock_id fdId, int nodeIdx, struct mc_lqueue_item* iovItem)
{
int error = RECV_NEED_RETRY;
struct iovec* iov = iovItem->element.data;
uint16 msgType = *(uint16*)iov->iov_base;
switch (msgType) {
case LIBCOMM_PKG_TYPE_CONNECT:
error = gs_accept_data_conntion(iov, fdId);
break;
case LIBCOMM_PKG_TYPE_DELAY_REQUEST:
case LIBCOMM_PKG_TYPE_DELAY_REPLY:
error = gs_handle_data_delay_message(nodeIdx, iovItem, msgType);
if (0 == error) {
error = RECV_NEED_RETRY;
}
break;
default:
struct libcomm_delay_package* delayMsg = (struct libcomm_delay_package*)iov->iov_base;
COMM_DEBUG_LOG("[DELAY_INFO]recv invalid type[%d] sn=%d\n", msgType, delayMsg->sn);
error = RECV_NET_ERROR;
break;
}
return error;
}
/*
 * Push a ready event for session_info onto its CommEpollFd and wake it up.
 * Only the caller that atomically flips is_idle from true to false delivers
 * the event; any other caller just logs a trace line and returns.
 * A NULL session_info (no registration) is logged and ignored.
 */
void WakeupSession(SessionInfo *session_info, bool err_occurs, const char* caller_name)
{
    if (session_info == NULL) {
        LIBCOMM_ELOG(LOG, "Trace: CommEpollFd NotifyListener(%s), no register", caller_name);
        return;
    }

    const bool was_idle = gs_compare_and_swap_32(&session_info->is_idle, (int)true, (int)false);
    if (!was_idle) {
        LIBCOMM_ELOG(LOG, "Trace: CommEpollFd NotifyListener(%s), no event "
            "detail: idx[%d], streamid[%d], is_idle[false]",
            caller_name,
            session_info->commfd.logic_fd.idx,
            session_info->commfd.logic_fd.sid);
        return;
    }

    CommEpollFd *epfd_owner = session_info->comm_epfd_ptr;
    epfd_owner->PushReadyEvent(session_info, err_occurs);
    LIBCOMM_ELOG(LOG, "Trace: CommEpollFd DoWakeup(%s), normal event detail:epfd[%d], idx[%d], streamid[%d]",
        caller_name,
        epfd_owner->m_epfd,
        session_info->commfd.logic_fd.idx,
        session_info->commfd.logic_fd.sid);
    epfd_owner->DoWakeup();
}
void NotifyListener(struct c_mailbox* cmailbox, bool err_occurs, const char* caller_name)
{
Assert(cmailbox);
SessionInfo *session_info = NULL;
if (cmailbox->session_info_ptr) {
session_info = cmailbox->session_info_ptr;
} else {
LogicFd logic_fd = {cmailbox->idx, cmailbox->streamid};
session_info = g_instance.comm_logic_cxt.comm_fd_collection->GetSessionInfo(&logic_fd);
cmailbox->session_info_ptr = session_info;
}
WakeupSession(session_info, err_occurs, caller_name);
}
int gs_internal_recv(const sock_id fd_id, int node_idx, int flags)
{
int recvsk = fd_id.fd;
int idx = node_idx;
int error = 0;
struct c_mailbox* cmailbox = NULL;
COMM_TIMER_INIT();
for (;;) {
LibcommRecvInfo recv_info = {0};
COMM_TIMER_LOG("(r|inner recv)\tInternal receive start.");
int64 curr_rcv_time = 0;
int64 last_rcv_time = 0;
if (idx >= 0) {
last_rcv_time = g_instance.comm_cxt.g_receivers->receiver_conn[idx].last_rcv_time;
}
recv_info.socket = recvsk;
recv_info.node_idx = idx;
if (idx >= 0) {
LIBCOMM_PTHREAD_RWLOCK_WRLOCK(&g_instance.comm_cxt.g_receivers->receiver_conn[idx].rwlock);
}
error = g_libcomm_adapt.recv_data(&recv_info);
if (idx >= 0) {
LIBCOMM_PTHREAD_RWLOCK_UNLOCK(&g_instance.comm_cxt.g_receivers->receiver_conn[idx].rwlock);
}
COMM_TIMER_LOG("(r|inner recv)\tInternal receive something.");
if (unlikely((error == RECV_NEED_RETRY))) {
COMM_DEBUG_LOG("(r|inner recv)\tReceiver from node[%d] socket[%d]:%s.", idx, recvsk, strerror(errno));
return RECV_NEED_RETRY;
}
if (unlikely(error < 0)) {
LIBCOMM_ELOG(WARNING,
"(r|inner recv)\tFailed to receive sock[%d], "
"return %d, from node[%d]:%s, %s.",
recvsk,
error,
idx,
REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, idx),
strerror(errno));
return error;
}
int streamid = recv_info.streamid;
int version = recv_info.version;
struct mc_lqueue_item* iov_item = recv_info.iov_item;
struct iovec* iov = iov_item->element.data;
* We recv a small message and the last message is very early,
* we want to send a ack message in order to avoid tcp ack package missing.
* The message should be less than MTU, so we use 1024 byte.
*/
if (is_tcp_mode() && streamid > 0 && g_ackchk_time > 0 && iov->iov_len < 1024 && idx >= 0) {
curr_rcv_time = g_instance.comm_cxt.g_receivers->receiver_conn[idx].last_rcv_time;
if (last_rcv_time > 0 && curr_rcv_time - last_rcv_time > g_ackchk_time) {
#ifdef USE_SSL
if (g_instance.attr.attr_network.comm_enable_SSL) {
SSL* ssl = LibCommClientGetSSLForSocket(recvsk);
if (ssl != NULL) {
LIBCOMM_ELOG(LOG, "gs_internal_recv start\n");
LibCommClientSSLWrite(ssl, "ACK", strlen("ACK"));
} else {
return RECV_NEED_RETRY;
}
} else
#endif
{
send(recvsk, "ACK", strlen("ACK"), 0);
}
}
}
if (streamid == 0) {
error = getRecvError(fd_id, node_idx, iov_item);
if (error == RECV_NEED_RETRY) {
return RECV_NEED_RETRY;
}
libcomm_free_iov_item(&iov_item, IOV_DATA_SIZE);
return error;
} else if (streamid > 0 && idx >= 0) {
cmailbox = &C_MAILBOX(idx, streamid);
if (gs_push_cmailbox_buffer(cmailbox, iov_item, version) < 0) {
libcomm_free_iov_item(&iov_item, IOV_DATA_SIZE);
}
if (ENABLE_THREAD_POOL_DN_LOGICCONN) {
NotifyListener(cmailbox, false, __FUNCTION__);
}
COMM_TIMER_LOG("(r|inner recv)\tInternal receive finish for [%d,%d].", idx, streamid);
return RECV_NEED_RETRY;
} else {
COMM_DEBUG_LOG("(r|inner recv)\tWrong stream id %d:%d from Node %d, sock[%d], error[%d]:%s.",
streamid,
version,
idx,
recvsk,
error,
strerror(errno));
libcomm_free_iov_item(&iov_item, IOV_DATA_SIZE);
return RECV_NEED_RETRY;
}
}
}
/*
 * gs_recv
 *     Consumer-side receive for the logical stream gs_sock = [idx, sid, ver].
 *     Takes at most one queued message from the stream's c_mailbox buffer
 *     queue and copies it into buff. It does not wait: an empty queue is
 *     reported as ECOMMTCPNODATA.
 *
 * Returns the number of bytes copied (the length of the dequeued message), or
 * -1 with errno set:
 *   ECOMMTCPARGSINVAL      - NULL gs_sock/buff, buff_size <= 0, or idx/sid out of range;
 *   mailbox close_reason   - the mailbox version no longer matches gs_sock->ver;
 *   ECOMMTCPNODATA         - the buffer queue is empty (or yielded an empty item).
 *
 * buff_size must be at least the length of the next message; this is only
 * checked with LIBCOMM_ASSERT before the copy.
 */
int gs_recv(
    gsocket* gs_sock,
    void* buff,
    int buff_size)
{
    if ((gs_sock == NULL) || (buff == NULL) || (buff_size <= 0)) {
        LIBCOMM_ELOG(WARNING,
            "(r|recv)\tInvalid argument: "
            "%s%sbuff size:%d.",
            gs_sock == NULL ? "gs_sock is NULL, " : "",
            buff == NULL ? "buff is NULL, " : "",
            buff_size);
        errno = ECOMMTCPARGSINVAL;
        return -1;
    }
    uint64 time_enter = COMM_STAT_TIME();
    uint64 time_now = time_enter;
    int idx = gs_sock->idx;
    int streamid = gs_sock->sid;
    int version = gs_sock->ver;
    struct FCMSG_T fcmsgs = {0x0};
    /* stream id 0 is not a data stream, so only (0, g_max_stream_num) is accepted */
    if ((idx < 0) || (idx >= g_instance.comm_cxt.counters_cxt.g_cur_node_num) || (streamid <= 0) ||
        (streamid >= g_instance.comm_cxt.counters_cxt.g_max_stream_num)) {
        LIBCOMM_ELOG(WARNING, "(r|recv)\tInvalid argument: node idx:%d, stream id:%d.", idx, streamid);
        errno = ECOMMTCPARGSINVAL;
        return -1;
    }
    AutoContextSwitch commContext(g_instance.comm_cxt.comm_global_mem_cxt);
    int ret = -1;
    struct c_mailbox* cmailbox = NULL;
    struct iovec* iov = NULL;
    struct mc_lqueue_item* q_item = NULL;
    bool is_notify_quota = false;
    /*
     * Disable immediate interrupts while touching the mailbox; the saved value
     * is handed to LIBCOMM_INTERFACE_END at return_result (presumably restores it).
     */
    bool TempImmediateInterruptOK = t_thrd.int_cxt.ImmediateInterruptOK;
    t_thrd.int_cxt.ImmediateInterruptOK = false;
    errno_t ss_rc = 0;
    errno = 0;
    PGSTAT_INIT_TIME_RECORD();
    PGSTAT_START_TIME_RECORD();
    COMM_TIMER_INIT();
    COMM_TIMER_LOG("(r|recv)\tReceive start for node[%d,%d]:%s.",
        idx,
        streamid,
        REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, idx));
    cmailbox = &(C_MAILBOX(idx, streamid));
    LIBCOMM_PTHREAD_MUTEX_LOCK(&cmailbox->sinfo_lock);
    /* A version mismatch means this gsocket refers to an already closed stream. */
    if (false == gs_check_mailbox(cmailbox->local_version, version)) {
        MAILBOX_ELOG(
            cmailbox, WARNING, "(r|recv)\tStream has already closed, detail:%s.", mc_strerror(cmailbox->close_reason));
        LIBCOMM_PTHREAD_MUTEX_UNLOCK(&cmailbox->sinfo_lock);
        errno = cmailbox->close_reason;
        ret = -1;
        goto return_result;
    }
    if (cmailbox->buff_q->is_empty == 1) {
        LIBCOMM_PTHREAD_MUTEX_UNLOCK(&cmailbox->sinfo_lock);
        errno = ECOMMTCPNODATA;
        ret = -1;
        goto return_result;
    }
    /* Dequeue exactly one message; ownership of q_item moves to this function. */
    q_item = mc_lqueue_remove(cmailbox->buff_q, q_item);
    if (q_item != NULL) {
        iov = q_item->element.data;
    } else {
        iov = NULL;
    }
    COMM_TIMER_LOG("(r|recv)\tReceived data for node[%d,%d]:%s.",
        idx,
        streamid,
        REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, idx));
    if (iov == NULL || iov->iov_len == 0) {
        libcomm_free_iov_item(&q_item, IOV_DATA_SIZE);
        LIBCOMM_PTHREAD_MUTEX_UNLOCK(&cmailbox->sinfo_lock);
        errno = ECOMMTCPNODATA;
        ret = -1;
        goto return_result;
    }
    /*
     * With flow control on, gs_r_quota_notify decides (under the mailbox lock)
     * whether a quota message is due and fills fcmsgs; the message itself is
     * sent after the lock is released.
     */
    if (g_instance.comm_cxt.quota_cxt.g_having_quota) {
        is_notify_quota = gs_r_quota_notify(cmailbox, &fcmsgs);
        COMM_TIMER_LOG("(r|recv)\tSend quota to node[%d,%d]:%s.",
            idx,
            streamid,
            REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, idx));
    }
    COMM_DEBUG_LOG("(r|recv)\tNode[%d]:%s stream[%d] recv %zu msg:%c. "
        "bufCAP[%lu] and buff_q->u_size[%lu].",
        idx,
        g_instance.comm_cxt.g_r_node_sock[idx].remote_nodename,
        streamid,
        iov->iov_len,
        ((char*)iov->iov_base)[0],
        cmailbox->bufCAP,
        cmailbox->buff_q->u_size);
    time_now = COMM_STAT_TIME();
    COMM_STAT_CALL(cmailbox, cmailbox->statistic->gs_recv_time += ABS_SUB(time_now, time_enter));
    LIBCOMM_PTHREAD_MUTEX_UNLOCK(&cmailbox->sinfo_lock);
    if (is_notify_quota && (gs_send_ctrl_msg(&g_instance.comm_cxt.g_r_node_sock[idx], &fcmsgs, ROLE_CONSUMER) <= 0)) {
        MAILBOX_ELOG(cmailbox, WARNING, "(r|quota notify)\tFailed to send quota:%s.", mc_strerror(errno));
    }
    /* The caller's buffer must hold the whole message; only asserted here. */
    LIBCOMM_ASSERT((bool)(buff_size >= (int)iov->iov_len), idx, streamid, ROLE_CONSUMER);
    ret = (int)iov->iov_len;
    ss_rc = memcpy_s(buff, buff_size, iov->iov_base, iov->iov_len);
    securec_check(ss_rc, "\0", "\0");
    libcomm_free_iov_item(&q_item, IOV_DATA_SIZE);
    COMM_TIMER_LOG("(r|recv)\tReceive finish for node[%d,%d]:%s.",
        idx,
        streamid,
        REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, idx));
    /* Common exit: every path after argument validation leaves through here. */
return_result:
    LIBCOMM_INTERFACE_END(false, TempImmediateInterruptOK);
    END_NET_STREAM_RECV_INFO(ret);
    return ret;
}
* function name : gs_receivers_flow_handle_tid_request
* description : save peer thread id when received CTRL_PEER_TID message.
* arguments : _in_ fcmsgr: the message that receivers_flow thread received.
* return value : void
*/
void gs_receivers_flow_handle_tid_request(FCMSG_T* fcmsgr)
{
int streamid = fcmsgr->streamid;
int node_idx = fcmsgr->node_idx;
int version = fcmsgr->version;
struct c_mailbox* cmailbox = NULL;
cmailbox = &(C_MAILBOX(node_idx, streamid));
LIBCOMM_PTHREAD_MUTEX_LOCK(&(cmailbox->sinfo_lock));
if (true == gs_check_mailbox(cmailbox->local_version, version)) {
cmailbox->peer_thread_id = fcmsgr->extra_info;
}
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&(cmailbox->sinfo_lock));
}
/*
 * gs_internal_connect
 *     Build (or reuse) the control and data channels to the node described by
 *     libcomm_addrinfo, allocate a logical stream on that node, initialize its
 *     producer mailbox and send the connect control message
 *     (CTRL_CONN_DUAL on coordinators, CTRL_CONN_REQUEST otherwise).
 *
 * Side effects on libcomm_addrinfo: nodename is rewritten in place to
 * "<shift>_<name>", and selfnodename and shift are filled in. A "localhost"
 * host is replaced by "127.0.0.1".
 *
 * Returns the stream id and fills libcomm_addrinfo->gs_sock on success.
 * Returns -1 with errno set on failure; gs_sock is left untouched.
 */
static int gs_internal_connect(libcommaddrinfo* libcomm_addrinfo)
{
    /*
     * Step 1: Validate arguments and initialize local variables
     */
    if (libcomm_addrinfo == NULL) {
        LIBCOMM_ELOG(WARNING, "(s|connect)\tInvalid argument: libcomm addr info is NULL");
        errno = ECOMMTCPARGSINVAL;
        return -1;
    }
    if (unlikely((libcomm_addrinfo->host == NULL) || (libcomm_addrinfo->ctrl_port <= 0) ||
        (libcomm_addrinfo->listen_port <= 0))) {
        LIBCOMM_ELOG(WARNING,
            "(s|connect)\tInvalid argument: %s"
            "tcp port:%d, libcomm port:%d.",
            libcomm_addrinfo->host == NULL ? "host is NULL, " : "",
            libcomm_addrinfo->ctrl_port,
            libcomm_addrinfo->listen_port);
        errno = ECOMMTCPARGSINVAL;
        return -1;
    }
    int rc = 0;
    int node_idx = -1;
    int streamid = -1;
    errno_t ss_rc;
    uint32 cpylen;
    int to_ctrl_tcp_port = libcomm_addrinfo->ctrl_port;
    int node_shift = gs_get_nodeshift(libcomm_addrinfo->nodename);
    char namebuf[NAMEDATALEN];
    /* Prefix both node names with the node shift: "<shift>_<name>". */
    ss_rc = memcpy_s(namebuf, NAMEDATALEN, libcomm_addrinfo->nodename, NAMEDATALEN);
    securec_check(ss_rc, "\0", "\0");
    rc = sprintf_s(libcomm_addrinfo->nodename, NAMEDATALEN, "%d_%s", node_shift, namebuf);
    securec_check_ss_c(rc, "\0", "\0");
    rc = sprintf_s(libcomm_addrinfo->selfnodename, NAMEDATALEN, "%d_%s",
        node_shift, g_instance.comm_cxt.localinfo_cxt.g_self_nodename);
    securec_check_ss_c(rc, "\0", "\0");
    libcomm_addrinfo->shift = node_shift;
    uint64 time_enter = COMM_STAT_TIME();
    uint64 time_now = time_enter;

    /*
     * Step 2: Modify localhost to IP
     */
    if (IS_LOCAL_HOST(libcomm_addrinfo->host)) {
        libcomm_addrinfo->host = "127.0.0.1";
    }

    /*
     * Step 3: Get or set node index by remote nodename
     */
    COMM_TIMER_INIT();
    node_idx = gs_get_node_idx(libcomm_addrinfo->nodename);
    if (unlikely(node_idx < 0)) {
        LIBCOMM_ELOG(WARNING,
            "(s|send connect)\tFailed to get node index for %s: %s.",
            libcomm_addrinfo->nodename,
            mc_strerror(errno));
        return -1;
    }
    COMM_TIMER_LOG("(s|send connect)\tConnect start for node[%d]:%s.",
        node_idx,
        REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));

    /*
     * Step 4: Make sure the control channel to the node exists
     */
    COMM_TIMER_LOG("(s|send connect)\tBuild ctrl channel connect for node[%d]:%s.",
        node_idx,
        REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
    if (unlikely(gs_s_check_connection(libcomm_addrinfo, node_idx, false, CTRL_CHANNEL) == false)) {
        LIBCOMM_ELOG(WARNING,
            "(s|connect)\tFailed to connect to host:port[%s:%d], node name[%s].",
            libcomm_addrinfo->host,
            to_ctrl_tcp_port,
            libcomm_addrinfo->nodename);
        return -1;
    }

    /*
     * Step 5: Make sure the data channel to the node exists
     */
    COMM_TIMER_LOG("(s|send connect)\tBuild data connection for node[%d]:%s.",
        node_idx,
        REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
    if (unlikely(gs_s_check_connection(libcomm_addrinfo, node_idx, false, DATA_CHANNEL) == false)) {
        LIBCOMM_ELOG(WARNING,
            "(s|connect)\tFailed to build data connection "
            "to %s:%d for node[%d]:%s, detail:%s.",
            libcomm_addrinfo->host,
            to_ctrl_tcp_port,
            node_idx,
            libcomm_addrinfo->nodename,
            mc_strerror(errno));
        errno = ECOMMTCPCONNFAIL;
        return -1;
    }
    COMM_TIMER_LOG("(s|send connect)\tBuild data logical connection for node[%d]:%s.",
        node_idx,
        REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));

    /*
     * Step 6: Get stream id, initialize pmailbox and save information keys -> stream index in
     * hash table (g_s_htab_nodeid_skey_to_stream)
     */
    streamid = gs_get_stream_id(node_idx);
    if (streamid < 0) {
        LIBCOMM_ELOG(WARNING,
            "(s|connect)\tFailed to obtain logic stream idx for connect %s:%d, node:%s.",
            libcomm_addrinfo->host,
            to_ctrl_tcp_port,
            libcomm_addrinfo->nodename);
        return -1;
    }
    struct p_mailbox* pmailbox = &P_MAILBOX(node_idx, streamid);
    LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
    Assert(pmailbox->state == MAIL_CLOSED);
    /* Bump the mailbox version (wrapping at MAX_MAILBOX_VERSION) to invalidate stale gsockets. */
    uint16 version = pmailbox->local_version + 1;
    if (version >= MAX_MAILBOX_VERSION) {
        version = 0;
    }
    pmailbox->local_version = version;
    pmailbox->ctrl_tcp_sock = g_instance.comm_cxt.g_s_node_sock[node_idx].get_nl(CTRL_TCP_SOCK, NULL);
    pmailbox->state = MAIL_READY;
    pmailbox->bufCAP = 0;
    pmailbox->stream_key = libcomm_addrinfo->streamKey;
    pmailbox->query_id = IS_SPQ_COORDINATOR ? libcomm_addrinfo->streamKey.queryId : DEBUG_QUERY_ID;
    pmailbox->local_thread_id = 0;
    pmailbox->peer_thread_id = 0;
    pmailbox->close_reason = 0;
    pmailbox->shift = node_shift;
    if (g_instance.comm_cxt.commutil_cxt.g_stat_mode && (pmailbox->statistic == NULL)) {
        LIBCOMM_MALLOC(pmailbox->statistic, sizeof(struct pmailbox_statistic), pmailbox_statistic);
        if (NULL == pmailbox->statistic) {
            /*
             * The mailbox was just switched to MAIL_READY for a freshly obtained
             * stream id, but gs_sock is not filled on this path, so callers have
             * no gsocket to close it with. Close the logical connection here,
             * under sinfo_lock, as gs_close_timeout_connections() does for
             * MAIL_READY mailboxes.
             */
            gs_s_close_logic_connection(pmailbox, ECOMMTCPRELEASEMEM, NULL);
            errno = ECOMMTCPRELEASEMEM;
            LIBCOMM_PTHREAD_MUTEX_UNLOCK(&(pmailbox->sinfo_lock));
            return -1;
        }
    }
    if (pmailbox->statistic != NULL) {
        time_now = COMM_STAT_TIME();
        pmailbox->statistic->start_time = time_now;
        pmailbox->statistic->connect_time = time_enter;
    }

    /*
     * Step 7: Send connection request to consumer with stream id
     * Send ready control message
     */
    COMM_TIMER_LOG("(s|send connect)\tSend ready message to node[%d, %d]:%s.",
        node_idx,
        streamid,
        REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
    struct FCMSG_T fcmsgs = {0x0};
    fcmsgs.type = (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) ? CTRL_CONN_DUAL : CTRL_CONN_REQUEST;
    fcmsgs.extra_info = (IS_PGXC_COORDINATOR && (u_sess != NULL)) ?
        (unsigned long)(unsigned)u_sess->pgxc_cxt.NumDataNodes : 0;
    fcmsgs.node_idx = node_idx;
    fcmsgs.streamid = streamid;
    fcmsgs.version = version;
    fcmsgs.stream_key = libcomm_addrinfo->streamKey;
    fcmsgs.query_id = IS_SPQ_COORDINATOR ? libcomm_addrinfo->streamKey.queryId : DEBUG_QUERY_ID;
    cpylen = comm_get_cpylen(libcomm_addrinfo->selfnodename, NAMEDATALEN);
    ss_rc = memset_s(fcmsgs.nodename, NAMEDATALEN, 0x0, NAMEDATALEN);
    securec_check(ss_rc, "\0", "\0");
    ss_rc = strncpy_s(fcmsgs.nodename, NAMEDATALEN, libcomm_addrinfo->selfnodename, cpylen + 1);
    securec_check(ss_rc, "\0", "\0");
    fcmsgs.nodename[cpylen] = '\0';
    /* streamcap packs the local control port (high 32 bits) and sctp port (low 32 bits). */
    fcmsgs.streamcap = ((unsigned long)(unsigned)g_instance.attr.attr_network.comm_control_port << 32) +
        (long)g_instance.attr.attr_network.comm_sctp_port;
#ifdef USE_SPQ
    if (fcmsgs.type == CTRL_CONN_DUAL) {
        bool found = false;
        QCConnKey key = {
            .query_id = libcomm_addrinfo->streamKey.queryId,
            .plan_node_id = libcomm_addrinfo->streamKey.planNodeId,
            .node_id = (uint16)node_idx,
            .type = SPQ_QE_CONNECTION,
        };
        pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock);
        QCConnEntry* entry = (QCConnEntry*)hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_ENTER, &found);
        if (!found) {
            entry->forward = {
                .idx = (uint16)node_idx,
                .sid = (uint16)streamid,
                .ver = version,
                .type = GSOCK_PRODUCER,
            };
            entry->backward.idx = 0;
        }
        pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock);
    }
#endif
    LIBCOMM_PTHREAD_MUTEX_UNLOCK(&(pmailbox->sinfo_lock));
    rc = gs_send_ctrl_msg(&g_instance.comm_cxt.g_s_node_sock[node_idx], &fcmsgs, ROLE_PRODUCER);
    if (rc <= 0) {
        LIBCOMM_ELOG(WARNING,
            "(s|connect)\tFailed to send ready msg to node[%d]:%s, detail:%s.",
            node_idx,
            REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx),
            mc_strerror(errno));
        errno = ECOMMTCPTCPDISCONNECT;
#ifdef USE_SPQ
        if (fcmsgs.type == CTRL_CONN_DUAL) {
            QCConnKey key = {
                .query_id = libcomm_addrinfo->streamKey.queryId,
                .plan_node_id = libcomm_addrinfo->streamKey.planNodeId,
                .node_id = (uint16)node_idx,
                .type = SPQ_QE_CONNECTION,
            };
            bool found = false;
            pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock);
            hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_REMOVE, &found);
            pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock);
        }
#endif
        return -1;
    }

    /*
     * Step 8: set the node index and stream index for caller, and return the stream index
     */
    libcomm_addrinfo->gs_sock.idx = node_idx;
    libcomm_addrinfo->gs_sock.sid = streamid;
    libcomm_addrinfo->gs_sock.ver = version;
    libcomm_addrinfo->gs_sock.type = (IS_PGXC_COORDINATOR) ? GSOCK_DAUL_CHANNEL : GSOCK_PRODUCER;
    COMM_DEBUG_LOG("(s|send connect)\tConnect finish for node[%d]:%s.",
        node_idx,
        REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
    return streamid;
}
/*
 * Close every logical connection in libcomm_addrinfo whose producer mailbox
 * still has the version recorded in its gs_sock and is still MAIL_READY.
 * Per the warning text, that means no ready response arrived before the
 * timeout. Each such mailbox is closed with ECOMMTCPEPOLLTIMEOUT while its
 * sinfo_lock is held.
 *
 * node_idx / streamid are kept for interface compatibility only. The warning
 * reports the node and stream of each connection actually being closed;
 * previously it repeated the caller-supplied pair for every entry.
 */
void gs_close_timeout_connections(libcommaddrinfo** libcomm_addrinfo, int addr_num,
    int node_idx, int streamid)
{
    (void)node_idx;
    (void)streamid;

    for (int i = 0; i < addr_num; i++) {
        int sock_idx = libcomm_addrinfo[i]->gs_sock.idx;
        int sock_sid = libcomm_addrinfo[i]->gs_sock.sid;
        struct p_mailbox* pmailbox = &P_MAILBOX(sock_idx, sock_sid);

        LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
        if ((gs_check_mailbox(pmailbox->local_version, libcomm_addrinfo[i]->gs_sock.ver)) &&
            (pmailbox->state == MAIL_READY)) {
            LIBCOMM_ELOG(WARNING,
                "(s|parallel connect)\t wait ready response timeout node[%d] stream[%d], node "
                "name[%s].",
                sock_idx,
                sock_sid,
                g_instance.comm_cxt.g_s_node_sock[sock_idx].remote_nodename);
            gs_s_close_logic_connection(pmailbox, ECOMMTCPEPOLLTIMEOUT, NULL);
        }
        LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
    }
}
/*
 * Check that socket fd is connected to exactly addr->ip:addr->port.
 * The peer address from getpeername() is rendered with inet_ntop() and
 * compared textually with addr->ip. IPv6 peers are decoded as sockaddr_in6;
 * every other family is decoded as IPv4, as before. The peer was previously
 * read into a sockaddr_in only, which misparsed IPv6 peers and always
 * reported them as invalid.
 * Returns true on a match, false (after logging a WARNING) otherwise.
 */
static bool IsValidLibcommAddr(int node_idx, int fd, ip_key* addr)
{
    if (fd < 0) {
        LIBCOMM_ELOG(WARNING, "(s|connect)\tInvalid libcomm fd[%d] node_id[%d]", fd, node_idx);
        return false;
    }
    /* sockaddr_storage is large enough for any address family. */
    struct sockaddr_storage sa;
    (void)memset(&sa, 0, sizeof(sa));
    socklen_t len = sizeof(sa);
    const int max_ip_addr_len = 64;
    if (getpeername(fd, (struct sockaddr *)&sa, &len) < 0) {
        LIBCOMM_ELOG(WARNING, "(s|connect)\tFailed to check libcomm addr node[%d]%s, fd[%d]",
            node_idx, g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename, fd);
        return false;
    }
    char peer_host[max_ip_addr_len] = {0};
    int peer_port = -1;
    if (sa.ss_family == AF_INET6) {
        const struct sockaddr_in6* ip6 = (const struct sockaddr_in6*)&sa;
        (void)inet_ntop(AF_INET6, &ip6->sin6_addr, peer_host, max_ip_addr_len);
        peer_port = ntohs(ip6->sin6_port);
    } else {
        const struct sockaddr_in* ip4 = (const struct sockaddr_in*)&sa;
        (void)inet_ntop(AF_INET, &ip4->sin_addr, peer_host, max_ip_addr_len);
        peer_port = ntohs(ip4->sin_port);
    }
    if (strcmp(peer_host, addr->ip) == 0 && peer_port == addr->port) {
        COMM_DEBUG_LOG("(s|connect)\tConnection is valid:ip-port[%s:%d], get peer ip-port[%s:%d]",
            addr->ip, addr->port, peer_host, peer_port);
        return true;
    }
    LIBCOMM_ELOG(WARNING,
        "(s|connect)\tInvalid libcomm addr node[%d]%s, fd[%d], p-port[%s:%d], get peer ip-port[%s:%d]",
        node_idx, g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
        fd, addr->ip, addr->port, peer_host, peer_port);
    return false;
}
* function name : gs_s_check_connection
* description : sender check that is receiver changed.
* if the destination ip is changed,
* which means the primary and the standby are reverted,
* then we close tcp connection and rebuild later.
* arguments :
* _in_ libcomm_addrinfo: remote infomation.
* _in_ node_idx: remote node index.
* return value :
* false: failed.
* true : succeed.
*/
bool gs_s_check_connection(libcommaddrinfo* libcomm_addrinfo, int node_idx, bool is_reply, int type)
{
ip_key addr;
errno_t ss_rc;
uint32 cpylen;
struct sock_id fd_id = {-1, -1};
cpylen = comm_get_cpylen(libcomm_addrinfo->host, HOST_LEN_OF_HTAB);
ss_rc = memset_s(addr.ip, HOST_LEN_OF_HTAB, 0x0, HOST_LEN_OF_HTAB);
securec_check(ss_rc, "\0", "\0");
ss_rc = strncpy_s(addr.ip, HOST_LEN_OF_HTAB, libcomm_addrinfo->host, cpylen + 1);
securec_check(ss_rc, "\0", "\0");
addr.ip[cpylen] = '\0';
addr.shift = libcomm_addrinfo->shift;
if (type == CTRL_CHANNEL) {
addr.port = libcomm_addrinfo->ctrl_port;
} else {
addr.port = libcomm_addrinfo->listen_port;
}
retry:
int rc = gs_s_get_connection_state(addr, node_idx, type);
if (likely(rc == CONNSTATESUCCEED)) {
if (type == DATA_CHANNEL && !get_dll_status()) {
LIBCOMM_PTHREAD_RWLOCK_WRLOCK(&g_instance.comm_cxt.g_senders->sender_conn[node_idx].rwlock);
if (IsValidLibcommAddr(node_idx,
g_instance.attr.attr_network.comm_data_channel_conn[node_idx - 1]->socket, &addr) == false ||
LibCommClientCheckSocket(g_instance.attr.attr_network.comm_data_channel_conn[node_idx - 1]) != 0) {
fd_id.fd = g_instance.comm_cxt.g_senders->sender_conn[node_idx].socket;
fd_id.id = g_instance.comm_cxt.g_senders->sender_conn[node_idx].socket_id;
LIBCOMM_ELOG(WARNING,
"(s|connect)\tFailed to check libcomm socket "
"node[%d]:%s, errno[%d]:%s, close socket[%d,%d].",
node_idx,
g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
errno,
mc_strerror(errno),
fd_id.fd,
fd_id.id);
gs_s_close_bad_data_socket(&fd_id, ECOMMTCPDISCONNECT, node_idx);
LIBCOMM_PTHREAD_RWLOCK_UNLOCK(&g_instance.comm_cxt.g_senders->sender_conn[node_idx].rwlock);
goto retry;
}
LIBCOMM_PTHREAD_RWLOCK_UNLOCK(&g_instance.comm_cxt.g_senders->sender_conn[node_idx].rwlock);
}
COMM_DEBUG_LOG("(s|connect)\tAlready has connection:port[%s:%d], node name[%s]].",
addr.ip,
addr.port,
libcomm_addrinfo->nodename);
return true;
} else if (rc == CONNSTATECONNECTING) {
if (type == CTRL_CHANNEL) {
rc = gs_s_build_tcp_ctrl_connection(libcomm_addrinfo, node_idx, is_reply);
} else {
rc = g_libcomm_adapt.connect(libcomm_addrinfo, node_idx);
}
if (rc < 0) {
gs_update_connection_state(addr, CONNSTATEFAIL, true, node_idx);
return false;
}
return true;
} else {
COMM_DEBUG_LOG("(s|connect)\tFailed checking connection state:port[%s:%d], node name[%s] error[%s].",
libcomm_addrinfo->host,
libcomm_addrinfo->ctrl_port,
libcomm_addrinfo->nodename,
mc_strerror(errno));
return false;
}
}
/*
 * Failure path of gs_connect(): log the failed node, close the gsocket of
 * every entry in the batch (not only the failing one), end the libcomm
 * interface section and return error_index + 1.
 */
int gs_clean_connection(libcommaddrinfo** libcomm_addrinfo, int addr_num, int error_index,
    int re, bool TempImmediateInterruptOK)
{
    LIBCOMM_ELOG(WARNING,
        "(s|parallel connect)\tFailed to connect node:%s, detail:%s.",
        libcomm_addrinfo[error_index]->nodename,
        mc_strerror(errno));

    for (int k = 0; k < addr_num; ++k) {
        gs_close_gsocket(&libcomm_addrinfo[k]->gs_sock);
    }

    const bool timed_out = (re == ETIMEDOUT);
    LIBCOMM_INTERFACE_END(timed_out, TempImmediateInterruptOK);
    return error_index + 1;
}
* function name : gs_connect
* description : build connects one or multiple address.
* arguments : _in_ libcomm_addrinfo: the address info list.
* _in_ addr_num: the number of address info list.
* _in_ error_index: error index of address info list wher connect failed.
* return value : 0: succeed
* : -1: all connection failed
: other value: failed connection index
* call gs_internal_connect, cause we want to receive a MAIL_READY message,
* which means the logic connection has been build successfully.
*/
int gs_connect(libcommaddrinfo** libcomm_addrinfo, int addr_num, int timeout)
{
int re = -1;
int i;
int ret;
if (timeout == -1) {
timeout = CONNECT_TIMEOUT;
}
libcommaddrinfo* addr_info = NULL;
int error_index = -1;
if (libcomm_addrinfo == NULL || addr_num == 0) {
LIBCOMM_ELOG(WARNING,
"(s|connect)\tInvalid argument: %saddress number is %d.",
libcomm_addrinfo == NULL ? "libcomm addr info is NULL, " : "",
addr_num);
errno = ECOMMTCPARGSINVAL;
return -1;
}
AutoContextSwitch commContext(g_instance.comm_cxt.comm_global_mem_cxt);
bool TempImmediateInterruptOK = t_thrd.int_cxt.ImmediateInterruptOK;
t_thrd.int_cxt.ImmediateInterruptOK = false;
errno = 0;
if (gs_poll_create() != 0) {
LIBCOMM_ELOG(WARNING, "(s|parallel connect)\tFailed to malloc for create poll!");
LIBCOMM_INTERFACE_END(false, TempImmediateInterruptOK);
return -1;
}
#ifdef USE_SSL
if (g_instance.attr.attr_network.comm_enable_SSL) {
ret = LibCommClientSecureInit();
if (ret != 0) {
LIBCOMM_ELOG(ERROR, "(s|connect|SSL)\tSet libcomm ssl context initialize failed!\n");
return -1;
}
}
#endif
for (i = 0; i < addr_num; i++) {
addr_info = libcomm_addrinfo[i];
addr_info->gs_sock = GS_INVALID_GSOCK;
pgstat_report_waitstatus_comm(STATE_STREAM_WAIT_CONNECT_NODES,
addr_info->nodeIdx,
addr_num - i,
-1,
global_node_definition ? global_node_definition->num_nodes : -1);
re = gs_internal_connect(addr_info);
if (re < 0) {
if (IS_PGXC_COORDINATOR || IS_SPQ_COORDINATOR) {
continue;
} else {
error_index = i;
return gs_clean_connection(libcomm_addrinfo, addr_num, error_index,
re, TempImmediateInterruptOK);
}
}
}
ret = gs_check_all_mailbox(libcomm_addrinfo, addr_num, re, TempImmediateInterruptOK, timeout);
if (ret > 0) {
return ret;
}
LIBCOMM_INTERFACE_END(false, TempImmediateInterruptOK);
return 0;
}
/*
 * Wait until pmailbox has at least options->need_send_len bytes of send quota
 * (bufCAP), then switch it to MAIL_RUN.
 *
 * Locking: must be called with pmailbox->sinfo_lock held (it is unlocked
 * here without being locked first). On return 0 the lock is still held; on
 * every return of -1 the lock has already been released.
 *
 * Return 0: quota available, state set to MAIL_RUN.
 * Return -1, with wr->ret set to one of:
 *   BROADCAST_WAIT_QUOTA - options->block_mode is FALSE and quota is short.
 *   0                    - either gs_poll() returned 0 but quota is still
 *                          short, or the accumulated ETIMEDOUT waits reached
 *                          options->time_out (errno = ECOMMTCPEPOLLTIMEOUT).
 *   -1                   - either the mailbox version changed while waiting
 *                          (stream closed, errno = close_reason), or gs_poll()
 *                          returned an error. In the error case the logic
 *                          connection is closed, wr->notify_remote = true and
 *                          errno = ECOMMTCPWAITQUOTAFAIL.
 */
static int GsWaitPmailboxReady(
    struct p_mailbox* pmailbox, SendOptions* options, TimeProfile* time, WaitResult* wr)
{
    int node_idx = pmailbox->idx;
    int streamid = pmailbox->streamid;
    int total_wait_time = 0;
    int single_timeout = SINGLE_WAITQUOTA;
    for (;;) {
        if (pmailbox->bufCAP >= options->need_send_len) {
            pmailbox->state = MAIL_RUN;
            return 0;
        }
        /* Not enough quota: park the mailbox and register this thread's semaphore. */
        pmailbox->state = MAIL_HOLD;
        pmailbox->semaphore = t_thrd.comm_cxt.libcomm_semaphore;
        COMM_DEBUG_LOG("(s|send)\tNode[%d] stream[%d], node name[%s] in MAIL_HOLD, cap[%lu].",
            node_idx,
            streamid,
            g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
            pmailbox->bufCAP);
        /*
         * Non-blocking callers give up immediately.
         * NOTE(review): pmailbox->semaphore is left set on this path and on
         * the version-mismatch path below — confirm that is intended.
         */
        if (options->block_mode == FALSE) {
            wr->ret = BROADCAST_WAIT_QUOTA;
            LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
            return -1;
        }
        /* Drop the lock while sleeping in gs_poll() for at most single_timeout. */
        LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
        pgstat_report_waitstatus_phase(PHASE_WAIT_QUOTA);
        StreamTimeWaitQuotaStart(t_thrd.pgxc_cxt.GlobalNetInstr);
        time->start = mc_timers_ms();
        wr->ret = gs_poll(single_timeout);
        time->end = mc_timers_ms();
        StreamTimeWaitQuotaEnd(t_thrd.pgxc_cxt.GlobalNetInstr);
        LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
        /* The stream may have been closed (version bumped) while we slept. */
        if (gs_check_mailbox(pmailbox->local_version, options->local_version) == false) {
            COMM_DEBUG_LOG("(s|send)\tStream has already closed by remote:%s, detail:%s.",
                REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx),
                mc_strerror(pmailbox->close_reason));
            errno = pmailbox->close_reason;
            LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
            wr->ret = -1;
            return -1;
        }
        pmailbox->semaphore = NULL;
        if (pmailbox->bufCAP >= options->need_send_len) {
            pmailbox->state = MAIL_RUN;
            return 0;
        }
        /* Woken (gs_poll returned 0) but still short of quota: let the caller decide. */
        if (wr->ret == 0) {
            wr->ret = 0;
            LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
            return -1;
        }
        /* Only ETIMEDOUT wake-ups count towards options->time_out. */
        if (wr->ret == ETIMEDOUT) {
            total_wait_time += single_timeout;
            if (total_wait_time >= options->time_out) {
                errno = ECOMMTCPEPOLLTIMEOUT;
                LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
                wr->ret = 0;
                return -1;
            }
        }
        /* Any other gs_poll result is an error: close the logic connection. */
        if (wr->ret != 0 && wr->ret != ETIMEDOUT) {
            MAILBOX_ELOG(pmailbox, WARNING, "(s|send)\tStream waked up by error[%d]:%s.", wr->ret, mc_strerror(errno));
            gs_s_close_logic_connection(pmailbox, ECOMMTCPWAITQUOTAFAIL, options->fcmsgs);
            wr->notify_remote = true;
            errno = ECOMMTCPWAITQUOTAFAIL;
            LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
            wr->ret = -1;
            return -1;
        }
    }
    /* Unreachable: the loop above only exits through return statements. */
    return -1;
}
/*
 * Push sendInfo to the peer through g_libcomm_adapt.send_data.
 * A result of 0 (nothing written yet) is retried after a 100us sleep, with
 * no retry limit. A negative result is logged and turned into errno =
 * ECOMMTCPSND and -1. In TCP mode with g_ackchk_time > 0, any pending
 * ack-check reply is drained afterwards.
 * Returns the positive send_data result on success, -1 on failure.
 */
static int GsInternalSendRemote(LibcommSendInfo *sendInfo, struct p_mailbox* pmailbox, int nodeIdx)
{
    int sent;

    while ((sent = g_libcomm_adapt.send_data(sendInfo)) == 0) {
        (void)usleep(100);
    }

    if (sent < 0) {
        MAILBOX_ELOG(pmailbox,
            WARNING,
            "(s|send)\tFailed to send data message on socket[%d], error[%d]:%s.",
            g_instance.comm_cxt.g_senders->sender_conn[nodeIdx].socket,
            errno,
            mc_strerror(errno));
        errno = ECOMMTCPSND;
        return -1;
    }

    if (is_tcp_mode() && g_ackchk_time > 0) {
        recv_ackchk_msg(sendInfo->socket);
    }
    return sent;
}
static void GsUpdatePmailboxStatics(struct p_mailbox* pmailbox, int sentSize, TimeProfile* waitQuota, TimeProfile* sendProfile, TimeProfile* timeProfile)
{
if (g_instance.comm_cxt.quota_cxt.g_having_quota) {
pmailbox->bufCAP -= sentSize;
}
if (pmailbox->state != MAIL_RUN) {
LIBCOMM_ELOG(WARNING, "(s|send)\tsend state is wrong[%d].", pmailbox->state);
}
if (pmailbox->statistic != NULL) {
if (pmailbox->statistic->first_send_time == 0) {
pmailbox->statistic->first_send_time = sendProfile->start;
pmailbox->statistic->producer_elapsed_time += (uint32)ABS_SUB(timeProfile->start, pmailbox->statistic->start_time);
} else {
pmailbox->statistic->producer_elapsed_time +=
(uint32)ABS_SUB(timeProfile->start, t_thrd.comm_cxt.g_producer_process_duration);
}
pmailbox->statistic->last_send_time = sendProfile->start;
pmailbox->statistic->wait_quota_overhead += (uint32)ABS_SUB(waitQuota->end, waitQuota->start);
pmailbox->statistic->os_send_overhead += ABS_SUB(sendProfile->end, sendProfile->start);
timeProfile->end = COMM_STAT_TIME();
pmailbox->statistic->total_send_time += ABS_SUB(timeProfile->end, timeProfile->start);
pmailbox->statistic->send_bytes += (uint64)(unsigned)sentSize;
pmailbox->statistic->call_send_count++;
}
}
/*
 * Validate the arguments of gs_send. All of the following must hold:
 * - gsSock and message are non-NULL;
 * - mLen is positive;
 * - the node index lies in [0, g_cur_node_num);
 * - the stream id lies in (0, g_max_stream_num).
 *
 * Returns 0 when the arguments are valid. Otherwise it logs a warning, sets errno to
 * ECOMMTCPARGSINVAL and returns -1.
 */
static int GsSendCheckParams(gsocket* gsSock, char* message, int mLen)
{
    if (gsSock == NULL) {
        LIBCOMM_ELOG(WARNING, "(s|send)\tInvalid argument: gsSock is NULL");
        errno = ECOMMTCPARGSINVAL;
        return -1;
    }

    const int nodeIdx = gsSock->idx;
    const int streamId = gsSock->sid;
    const bool argsValid = (message != NULL) && (mLen > 0) && (nodeIdx >= 0) &&
                           (nodeIdx < g_instance.comm_cxt.counters_cxt.g_cur_node_num) && (streamId > 0) &&
                           (streamId < g_instance.comm_cxt.counters_cxt.g_max_stream_num);
    if (argsValid) {
        return 0;
    }

    LIBCOMM_ELOG(WARNING,
        "(s|send)\tInvalid argument: %slen=%d, node idx=%d, stream id=%d.",
        (message == NULL) ? "message is NULL, " : "",
        mLen,
        nodeIdx,
        streamId);
    errno = ECOMMTCPARGSINVAL;
    return -1;
}
* @Description: Communication library external interface for send.
* @IN gs_sock: all information libcomm needed.
* @IN message: data.
* @IN m_len: data len.
* @Return:-1: -1: send failed.
* m_len: send succsessed.
* @See also:
* send message to the destination datanode through logic channel
* 1. check if stream info (quota and state), sending messages whenever there is enough quota
* 2. if no quota, wait on gs_poll
* 3. decrease quota after successful send, if quota is used up, set stream in HOLD state and wait on gs_poll
* 4. return error or sent message size
*/
int gs_send(gsocket* gs_sock, char* message, int m_len, int time_out, bool block_mode)
{
if (GsSendCheckParams(gs_sock, message, m_len) < 0) {
return -1;
}
PGSTAT_INIT_TIME_RECORD();
PGSTAT_START_TIME_RECORD();
int node_idx = gs_sock->idx;
int streamid = gs_sock->sid;
int local_version = gs_sock->ver;
int remote_version = -1;
AutoContextSwitch commContext(g_instance.comm_cxt.comm_global_mem_cxt);
bool TempImmediateInterruptOK = t_thrd.int_cxt.ImmediateInterruptOK;
t_thrd.int_cxt.ImmediateInterruptOK = false;
errno = 0;
COMM_TIMER_INIT();
int ret = 0;
int res = 0;
int sent_size = 0;
bool send_msg = false;
errno_t ss_rc;
TimeProfile wait_quota = {0};
TimeProfile send_proile = {0};
TimeProfile time_profile = {0};
time_profile.start = COMM_STAT_TIME();
time_profile.end = time_profile.start;
unsigned long need_send_len = 0;
struct FCMSG_T fcmsgs = {0x0};
struct sock_id fd_id = {0, 0};
bool notify_remote = false;
struct p_mailbox* pmailbox = NULL;
WaitResult wr = {
.ret = 0,
.notify_remote = notify_remote
};
SendOptions options = {0};
if (time_out == -1) {
time_out = WAITQUOTA_TIMEOUT;
}
char* node_name = NULL;
WaitStatePhase oldPhase = pgstat_report_waitstatus_phase(PHASE_NONE, true);
if (gs_poll_create() != 0) {
LIBCOMM_ELOG(WARNING, "(s|send)\tFailed to malloc for create poll!");
return -1;
}
pmailbox = &P_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
if (gs_check_mailbox(pmailbox->local_version, local_version) == false) {
COMM_DEBUG_LOG("(s|send)\tStream already closed, remote:%s, detail:%s.",
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx),
mc_strerror(pmailbox->close_reason));
errno = pmailbox->close_reason;
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
ret = -1;
goto return_result;
}
pmailbox->local_thread_id = t_thrd.comm_cxt.MyPid;
if ((unsigned long)(unsigned)m_len > DEFULTMSGLEN) {
need_send_len = DEFULTMSGLEN;
} else {
need_send_len = (unsigned long)(unsigned)m_len;
}
fd_id.fd = g_instance.comm_cxt.g_senders->sender_conn[node_idx].socket;
fd_id.id = g_instance.comm_cxt.g_senders->sender_conn[node_idx].socket_id;
COMM_TIMER_LOG("(s|send)\tWait quota start for node[%d,%d]:%s.",
node_idx,
streamid,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
options.need_send_len = need_send_len;
options.block_mode = block_mode;
options.time_out = time_out;
options.fcmsgs = &fcmsgs;
options.local_version = local_version;
res = GsWaitPmailboxReady(pmailbox, &options, &wait_quota, &wr);
COMM_TIMER_LOG("(s|send)\tWait quota end for node[%d,%d]:%s.",
node_idx,
streamid,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
ret = wr.ret;
notify_remote = wr.notify_remote;
if (res < 0) {
goto return_result;
}
send_proile.start = COMM_STAT_TIME();
remote_version = pmailbox->remote_version;
send_msg = gs_s_form_start_ctrl_msg(pmailbox, &fcmsgs);
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
if (send_msg && (gs_send_ctrl_msg(&g_instance.comm_cxt.g_s_node_sock[node_idx], &fcmsgs, ROLE_PRODUCER) <= 0)) {
errno = ECOMMTCPTCPDISCONNECT;
ret = -1;
goto return_result;
}
StreamTimeOSSendStart(t_thrd.pgxc_cxt.GlobalNetInstr);
COMM_TIMER_LOG("(s|send)\tSend message start for node[%d,%d]:%s.",
node_idx,
streamid,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
node_name = g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename;
char remote_nodename[NAMEDATALEN];
int shift;
ss_rc = sscanf_s(node_name, "%d_%s", &shift, &remote_nodename, (unsigned)NAMEDATALEN);
securec_check_for_sscanf_s(ss_rc, 2, "\0", "\0");
if (0 == strcmp(remote_nodename, g_instance.comm_cxt.localinfo_cxt.g_self_nodename)) {
LIBCOMM_PTHREAD_RWLOCK_RDLOCK(&g_instance.comm_cxt.g_senders->sender_conn[node_idx].rwlock);
ret = gs_push_local_buffer(streamid, message, need_send_len, remote_version, node_name);
LIBCOMM_PTHREAD_RWLOCK_UNLOCK(&g_instance.comm_cxt.g_senders->sender_conn[node_idx].rwlock);
if (ret < 0) {
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
gs_s_close_logic_connection(pmailbox, errno, &fcmsgs);
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
if (IS_NOTIFY_REMOTE(errno)) {
notify_remote = true;
}
MAILBOX_ELOG(
pmailbox, WARNING, "(s|send)\tFailed to push local cmailbox, error[%d]:%s.", errno, mc_strerror(errno));
ret = -1;
goto return_result;
}
} else {
LibcommSendInfo send_info;
send_info.socket = fd_id.fd;
send_info.socket_id = fd_id.id;
send_info.node_idx = node_idx;
send_info.streamid = streamid;
send_info.version = remote_version;
send_info.msg = message;
send_info.msg_len = need_send_len;
ret = GsInternalSendRemote(&send_info, pmailbox, node_idx);
if (ret < 0) {
goto return_result;
}
}
sent_size = ret;
send_proile.end = COMM_STAT_TIME();
StreamTimeOSSendEnd(t_thrd.pgxc_cxt.GlobalNetInstr);
COMM_TIMER_LOG("(s|send)\tSend message end for node[%d,%d]:%s.",
node_idx,
streamid,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
if (gs_check_mailbox(pmailbox->local_version, local_version) == false) {
errno = pmailbox->close_reason;
COMM_DEBUG_LOG("(s|send)\tStream has already closed by remote:%s, detail:%s.",
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx),
mc_strerror(errno));
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
ret = -1;
goto return_result;
}
LIBCOMM_ASSERT((bool)(pmailbox->bufCAP >= (unsigned long)(unsigned)sent_size), node_idx, streamid, ROLE_PRODUCER);
GsUpdatePmailboxStatics(pmailbox, sent_size, &wait_quota, &send_proile, &time_profile);
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
COMM_DEBUG_LOG("(s|send)\tSend to node[%d]:%s on stream[%d] with msg:%c, m_len[%d] @ bufCAP[%lu].",
node_idx,
g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
streamid,
message[0],
sent_size,
pmailbox->bufCAP);
COMM_TIMER_LOG("(s|send)\tSend finish for node[%d,%d]:%s.",
node_idx,
streamid,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
return_result:
if (notify_remote) {
(void)gs_send_ctrl_msg(&g_instance.comm_cxt.g_s_node_sock[node_idx], &fcmsgs, ROLE_PRODUCER);
}
LIBCOMM_INTERFACE_END(false, false);
t_thrd.int_cxt.ImmediateInterruptOK = TempImmediateInterruptOK;
t_thrd.comm_cxt.g_producer_process_duration = time_profile.end;
pgstat_report_waitstatus_phase(oldPhase);
END_NET_STREAM_SEND_INFO(ret);
return ret;
}
/*
 * Validate the arguments of gs_broadcast_send. The address list head and message must be
 * non-NULL and m_len must be positive.
 *
 * Returns 0 when valid. Otherwise it logs a warning naming the offending arguments, sets errno to
 * ECOMMTCPARGSINVAL and returns -1.
 */
int gs_broadcast_check_paras(struct libcommaddrinfo* libcomm_addr_head, char* message, int m_len)
{
    const bool headMissing = (libcomm_addr_head == NULL);
    const bool messageMissing = (message == NULL);

    if (!headMissing && !messageMissing && m_len > 0) {
        return 0;
    }

    LIBCOMM_ELOG(WARNING,
        "(s|send)\tInvalid argument: %s%s, len=%d.",
        headMissing ? "addr list is NULL, " : "",
        messageMissing ? "message is NULL, " : "",
        m_len);
    errno = ECOMMTCPARGSINVAL;
    return -1;
}
/*
 * Undo pending quota waits in a broadcast address list.
 *
 * Every entry still in BROADCAST_WAIT_QUOTA is reset to BROADCAST_NEED_SEND. If the entry's
 * mailbox still carries the entry's socket version, the mailbox semaphore is cleared under
 * sinfo_lock.
 *
 * At most addr_list_size entries are visited. The walk also stops early at a NULL next pointer,
 * and a NULL head is a no-op, because this function is exported and may be called outside
 * gs_broadcast_send.
 */
void gs_clean_addrinfo(struct libcommaddrinfo* libcomm_addr_head)
{
    if (libcomm_addr_head == NULL) {
        return;
    }

    struct libcommaddrinfo* addr_info = libcomm_addr_head;
    const int addr_num = libcomm_addr_head->addr_list_size;
    for (int i = 0; i < addr_num && addr_info != NULL; i++) {
        if (addr_info->status == BROADCAST_WAIT_QUOTA) {
            addr_info->status = BROADCAST_NEED_SEND;
            struct p_mailbox* wait_quota_pmailbox = &P_MAILBOX(addr_info->gs_sock.idx, addr_info->gs_sock.sid);
            LIBCOMM_PTHREAD_MUTEX_LOCK(&wait_quota_pmailbox->sinfo_lock);
            /* Only touch the mailbox if it still belongs to this stream (version unchanged). */
            if (true == gs_check_mailbox(wait_quota_pmailbox->local_version, addr_info->gs_sock.ver)) {
                wait_quota_pmailbox->semaphore = NULL;
            }
            LIBCOMM_PTHREAD_MUTEX_UNLOCK(&wait_quota_pmailbox->sinfo_lock);
        }
        addr_info = addr_info->addr_list_next;
    }
}
* @Description: send the message to all connection in address info list.
* @IN libcomm_addr_head: the head of address info list.
* @IN message: data.
* @IN m_len: data len.
* @Return: -1: broadcast send failed.
* m_len: broadcast send succsessed.
* @See also:
*/
int gs_broadcast_send(struct libcommaddrinfo* libcomm_addr_head, char* message, int m_len, int time_out)
{
int ret = gs_broadcast_check_paras(libcomm_addr_head, message, m_len);
if (ret != 0) {
return ret;
}
int i = 0;
int send_re = -1;
int wait_quota = 0;
if (time_out == -1) {
time_out = WAITQUOTA_TIMEOUT;
}
struct libcommaddrinfo* addr_info = libcomm_addr_head;
int addr_num = libcomm_addr_head->addr_list_size;
int error_conn_count = 0;
struct libcommaddrinfo* wait_addr_info = NULL;
struct p_mailbox* wait_quota_pmailbox = NULL;
if (addr_num == 0) {
LIBCOMM_ELOG(WARNING, "(s|broad case)\tNo one need send in addr list.");
errno = ECOMMTCPARGSINVAL;
return -1;
}
AutoContextSwitch commContext(g_instance.comm_cxt.comm_global_mem_cxt);
COMM_TIMER_INIT();
COMM_DEBUG_LOG("(s|broad case)\tStart to send to %d datanodes, msg_len=%d.", addr_num, m_len);
COMM_TIMER_LOG("(s|broad case)\tBroad cast send start.");
if (gs_poll_create() != 0) {
LIBCOMM_ELOG(WARNING, "(r|wait poll)\tFailed to malloc for create poll!");
return -1;
}
wait_quota = 0;
error_conn_count = 0;
addr_info = libcomm_addr_head;
for (i = 0; i < addr_num; i++) {
if (addr_info->status == BROADCAST_NEED_SEND) {
send_re = gs_send(&addr_info->gs_sock, message, m_len, time_out, FALSE);
if (send_re > 0) {
addr_info->status = BROADCAST_SEND_FINISH;
} else if (send_re == BROADCAST_WAIT_QUOTA) {
addr_info->status = BROADCAST_WAIT_QUOTA;
wait_quota_pmailbox = &P_MAILBOX(addr_info->gs_sock.idx, addr_info->gs_sock.sid);
wait_quota = 1;
wait_addr_info = addr_info;
COMM_DEBUG_LOG("(s|broad case)\tNeed wait quota for %s.", addr_info->nodename);
} else if (send_re == 0) {
LIBCOMM_ELOG(WARNING, "(s|broad case)\trecv EINTR.");
goto clean_return;
} else if (send_re < 0) {
(void)gs_s_close_stream(&addr_info->gs_sock);
addr_info->gs_sock = GS_INVALID_GSOCK;
addr_info->listen_port = 0;
addr_info->status = BROADCAST_CONNECT_CLOSED;
error_conn_count++;
COMM_DEBUG_LOG("(s|broad case)\tFail to send to %s.", addr_info->nodename);
}
} else if (addr_info->status == BROADCAST_CONNECT_CLOSED) {
error_conn_count++;
}
addr_info = addr_info->addr_list_next;
}
if (error_conn_count == addr_num) {
COMM_DEBUG_LOG("(s|broad case)\tFail to send to all connection.");
COMM_TIMER_LOG("(s|broad case)\tStart broad cast error.");
libcomm_addr_head->addr_list_size = 0;
return -1;
}
if (wait_quota == 1) {
StreamTimeWaitQuotaStart(t_thrd.pgxc_cxt.GlobalNetInstr);
COMM_DEBUG_LOG("(s|broad case)\tWait quota start.");
if (wait_addr_info != NULL) {
pgstat_report_waitstatus_comm(STATE_WAIT_FLUSH_DATA,
wait_addr_info->nodeIdx,
-1,
u_sess->stream_cxt.producer_obj->getParentPlanNodeId(),
global_node_definition ? global_node_definition->num_nodes : -1);
}
COMM_TIMER_LOG("(s|broad case)\tWait quota start.");
uint64 wait_quota_start = mc_timers_ms();
(void)gs_poll(time_out);
uint64 wait_quota_end = mc_timers_ms();
COMM_TIMER_LOG("(s|broad case)\tWait quota end.");
LIBCOMM_PTHREAD_MUTEX_LOCK(&wait_quota_pmailbox->sinfo_lock);
COMM_STAT_CALL(wait_quota_pmailbox,
wait_quota_pmailbox->statistic->wait_quota_overhead += ABS_SUB(wait_quota_end, wait_quota_start));
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&wait_quota_pmailbox->sinfo_lock);
StreamTimeWaitQuotaEnd(t_thrd.pgxc_cxt.GlobalNetInstr);
goto clean_return;
}
addr_info = libcomm_addr_head;
while (addr_info != NULL) {
if (addr_info->status == BROADCAST_SEND_FINISH) {
addr_info->status = BROADCAST_NEED_SEND;
}
addr_info = addr_info->addr_list_next;
}
COMM_TIMER_LOG("(s|broad case)\tBroad cast send end.");
return m_len;
clean_return:
gs_clean_addrinfo(libcomm_addr_head);
return 0;
}
/*
 * Wait until gs_check_cmailbox_data reports something for the given producer mailboxes.
 *
 * @IN gs_sock_array / nproducer / producer / close_expected: passed through to
 *     gs_check_cmailbox_data (and to gs_check_all_producers_mailbox on exit).
 * @IN timeout: poll timeout used when > 0. Otherwise -1 is passed to gs_poll. If an interrupt
 *     or die request is pending and timeout <= 0, 60 is used instead.
 * @Return:
 *   the non-zero value returned by gs_check_cmailbox_data;
 *   0 when gs_poll returned 0 while an interrupt or die request was pending;
 *   -1 on invalid arguments, poll creation failure, poll timeout (errno = ECOMMTCPEPOLLTIMEOUT)
 *      or other poll errors (errno = ECOMMTCPWAITPOLLERROR).
 */
int gs_wait_poll(gsocket* gs_sock_array,
    int nproducer,
    int* producer,
    int timeout,
    bool close_expected)
{
    if ((gs_sock_array == NULL) || (producer == NULL) || (nproducer <= 0)) {
        LIBCOMM_ELOG(WARNING,
            "(r|wait poll)\tInvalid argument: %s%s"
            "nproducer=%d.",
            gs_sock_array == NULL ? "gs_sock_array is NULL, " : "",
            producer == NULL ? "producer is NULL, " : "",
            nproducer);
        errno = ECOMMTCPARGSINVAL;
        return -1;
    }
    AutoContextSwitch commContext(g_instance.comm_cxt.comm_global_mem_cxt);
    if (gs_poll_create() != 0) {
        LIBCOMM_ELOG(WARNING, "(r|wait poll)\tPoll create failed! Detail:%s.", mc_strerror(errno));
        return -1;
    }
    int ret = 0;
    const int interruptTimeOut = 60;
    struct libcomm_time_record time_record = {0};
    time_record.time_enter = COMM_STAT_TIME();
    time_record.time_now = time_record.time_enter;
    bool TempImmediateInterruptOK = t_thrd.int_cxt.ImmediateInterruptOK;
    t_thrd.int_cxt.ImmediateInterruptOK = false;
    errno = 0;
    check_cmailbox_option opt = {&time_record, 1, 0};
    COMM_TIMER_INIT();
    time_record.t_begin = t_begin;
    COMM_TIMER_LOG("(r|wait poll)\tStart timer log.");
    for (;;) {
        time_record.wait_lock_start = COMM_STAT_TIME();
        ret = gs_check_cmailbox_data(gs_sock_array, nproducer, producer, close_expected, opt);
        if (ret != 0) {
            goto return_result;
        }
        opt.first_cycle = 0;
        int poll_re = 0;
        int time_out = -1;
        if (timeout > 0) {
            time_out = timeout;
        }
        if (InterruptPending || t_thrd.int_cxt.ProcDiePending) {
            time_out = (timeout > 0) ? timeout : interruptTimeOut;
            LIBCOMM_ELOG(WARNING, "(r|wait poll)\t The gs_wait_poll receive Cancel Interrupt.");
        }
        time_record.wait_data_start = COMM_STAT_TIME();
        COMM_TIMER_LOG("(r|wait poll)\tWait poll start.");
        poll_re = gs_poll(time_out);
        COMM_TIMER_LOG("(r|wait poll)\tWait poll end.");
        time_record.wait_data_end = COMM_STAT_TIME();
        if (poll_re == 0) {
            /*
             * gs_poll returned 0. With an interrupt or die request pending, return 0 so the caller
             * can handle it (and call this function again if needed); otherwise check the
             * mailboxes again. The former "else if (ProcDiePending)" branch could never run,
             * because the first condition already covers ProcDiePending, so it was removed.
             */
            if (InterruptPending || t_thrd.int_cxt.ProcDiePending) {
                ret = 0;
                goto return_result;
            }
            continue;
        }
        if (-1 == poll_re) {
            /* Poll error: let gs_check_cmailbox_data inspect the mailboxes with the error flag set. */
            opt.poll_error_flag = 1;
            continue;
        }
        if (poll_re == ETIMEDOUT) {
            LIBCOMM_ELOG(WARNING, "(r|wait poll)\tFailed to wait for notify, timeout:%ds.", time_out);
            errno = ECOMMTCPEPOLLTIMEOUT;
        } else {
            LIBCOMM_ELOG(WARNING, "(r|wait poll)\tFailed to wait poll, detail: %s.", mc_strerror(errno));
            errno = ECOMMTCPWAITPOLLERROR;
        }
        ret = -1;
        goto return_result;
    }
return_result:
    gs_check_all_producers_mailbox(gs_sock_array, nproducer, producer);
    LIBCOMM_INTERFACE_END((ret == 0), TempImmediateInterruptOK);
    t_thrd.comm_cxt.g_consumer_process_duration = COMM_STAT_TIME();
    return ret;
}