* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* libcomm_shakehands.cpp
*
* IDENTIFICATION
* src/gausskernel/cbb/communication/libcomm_utils/libcomm_shakehands.cpp
*
* -------------------------------------------------------------------------
*/
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <libcgroup.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <net/if.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/param.h>
#include <sys/time.h>
#include <unistd.h>
#include "../libcomm_core/mc_tcp.h"
#include "../libcomm_core/mc_poller.h"
#include "../libcomm_utils/libcomm_thread.h"
#include "../libcomm_common.h"
#include "libcomm_lqueue.h"
#include "libcomm_queue.h"
#include "libcomm_lock_free_queue.h"
#include "distributelayer/streamCore.h"
#include "distributelayer/streamProducer.h"
#include "pgxc/poolmgr.h"
#include "libpq/auth.h"
#include "libpq/pqsignal.h"
#include "storage/ipc.h"
#include "utils/ps_status.h"
#include "utils/dynahash.h"
#include "vecexecutor/vectorbatch.h"
#include "vecexecutor/vecnodes.h"
#include "executor/exec/execStream.h"
#include "miscadmin.h"
#include "gssignal/gs_signal.h"
#include "pgxc/pgxc.h"
#ifdef ENABLE_UT
#define static
#endif
#ifdef USE_SPQ
extern void gs_s_build_reply_conntion(libcommaddrinfo* addr_info, int remote_version);
#endif
* function name : gs_r_build_reply_connection
* description : as the connection between cn & dn is duplex.
* dn need to build an inverse connection to cn.
* this func will:
* 1. build an inverse physical conn if needed.
* 2. build an inverse logic conn
* 3. initial pmailbox
* arguments : fcmsgr: provides gs_sock and remote port
* return value : -1: error
* : 0:Succeed
*/
#ifdef USE_SPQ
int gs_r_build_reply_connection(BackConnInfo* fcmsgr, int local_version, uint16 *sid)
#else
static int gs_r_build_reply_connection(FCMSG_T* fcmsgr, int local_version)
#endif
{
errno_t ss_rc;
uint32 cpylen;
uint16 node_idx = fcmsgr->node_idx;
#ifdef USE_SPQ
int streamid = gs_get_stream_id(node_idx);
*sid = streamid;
#else
int streamid = fcmsgr->streamid;
#endif
Assert(streamid >= 0);
uint16 remote_version = fcmsgr->version;
char remote_host[HOST_ADDRSTRLEN] = {0x0};
char remote_nodename[NAMEDATALEN] = {0x0};
cpylen = comm_get_cpylen(g_instance.comm_cxt.g_r_node_sock[node_idx].remote_host, HOST_ADDRSTRLEN);
ss_rc = memset_s(remote_host, HOST_ADDRSTRLEN, 0x0, HOST_ADDRSTRLEN);
securec_check(ss_rc, "\0", "\0");
ss_rc =
strncpy_s(remote_host, HOST_ADDRSTRLEN, g_instance.comm_cxt.g_r_node_sock[node_idx].remote_host, cpylen + 1);
securec_check(ss_rc, "\0", "\0");
remote_host[cpylen] = '\0';
cpylen = comm_get_cpylen(g_instance.comm_cxt.g_r_node_sock[node_idx].remote_nodename, NAMEDATALEN);
ss_rc = memset_s(remote_nodename, NAMEDATALEN, 0x0, NAMEDATALEN);
securec_check(ss_rc, "\0", "\0");
ss_rc = strncpy_s(
remote_nodename, NAMEDATALEN, g_instance.comm_cxt.g_r_node_sock[node_idx].remote_nodename, cpylen + 1);
securec_check(ss_rc, "\0", "\0");
remote_nodename[cpylen] = '\0';
int remote_ctrl_port = (int)(fcmsgr->streamcap >> 32);
int remote_data_port = (int)(fcmsgr->streamcap);
libcommaddrinfo libcomm_addrinfo;
libcomm_addrinfo.host = remote_host;
libcomm_addrinfo.ctrl_port = remote_ctrl_port;
libcomm_addrinfo.listen_port = remote_data_port;
cpylen = comm_get_cpylen(remote_nodename, NAMEDATALEN);
ss_rc = memset_s(libcomm_addrinfo.nodename, NAMEDATALEN, 0x0, NAMEDATALEN);
securec_check(ss_rc, "\0", "\0");
ss_rc = strncpy_s(libcomm_addrinfo.nodename, NAMEDATALEN, remote_nodename, cpylen + 1);
securec_check(ss_rc, "\0", "\0");
libcomm_addrinfo.nodename[cpylen] = '\0';
#ifdef USE_SPQ
libcomm_addrinfo.gs_sock.idx = fcmsgr->node_idx;
libcomm_addrinfo.gs_sock.sid = streamid;
libcomm_addrinfo.gs_sock.ver = local_version;
libcomm_addrinfo.streamKey = fcmsgr->stream_key;
#endif
COMM_DEBUG_LOG("(r|build reply conn)\tBuild TCP connect for node[%d]:%s.",
node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
if (unlikely(gs_s_check_connection(&libcomm_addrinfo, node_idx, true, CTRL_CHANNEL) == false)) {
LIBCOMM_ELOG(WARNING,
"(r|build reply conn)\tFailed to connect to host:port[%s:%d], node name[%s].",
libcomm_addrinfo.host,
libcomm_addrinfo.ctrl_port,
remote_nodename);
return -1;
}
* Check and build logic connection whit the remote point (if need)
*/
COMM_DEBUG_LOG("(r|build reply conn)\tBuild data connection for node[%d]:%s.",
node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
if (unlikely(gs_s_check_connection(&libcomm_addrinfo, node_idx, true, DATA_CHANNEL) == false)) {
LIBCOMM_ELOG(WARNING,
"(r|build reply conn)\tFailed to build data connection "
"to %s:%d for node[%d]:%s, detail:%s.",
libcomm_addrinfo.host,
libcomm_addrinfo.listen_port,
node_idx,
libcomm_addrinfo.nodename,
mc_strerror(errno));
errno = ECOMMTCPCONNFAIL;
return -1;
}
COMM_DEBUG_LOG("(r|build reply conn)\tBuild data logical connection for node[%d]:%s.",
node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
#ifdef USE_SPQ
char* node_name = g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename;
if (0 == strcmp(g_instance.comm_cxt.localinfo_cxt.g_self_nodename, node_name)) {
QCConnKey key = {
.query_id = fcmsgr->stream_key.queryId,
.plan_node_id = fcmsgr->stream_key.planNodeId,
.node_id = node_idx,
.type = SPQ_QE_CONNECTION,
};
bool found = false;
QCConnEntry* entry;
entry = (QCConnEntry*)hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_FIND, &found);
if (found) {
entry->backward = {
.idx = node_idx,
.sid = (uint16)streamid,
.ver = remote_version,
.type = GSOCK_CONSUMER,
};
gs_s_build_reply_conntion(&libcomm_addrinfo, local_version);
} else {
LIBCOMM_ELOG(WARNING, "(s|connect)\tFailed to build local connection to node[%d]:%s, detail:%s.", node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx), mc_strerror(errno));
errno = ECOMMTCPTCPDISCONNECT;
gs_close_gsocket(fcmsgr->backward);
return -1;
}
} else {
struct FCMSG_T fcmsgs = {0x0};
fcmsgs.type = CTRL_BACKWARD_REGIST;
fcmsgs.extra_info = 0;
fcmsgs.node_idx = node_idx;
fcmsgs.streamid = streamid;
fcmsgs.version = remote_version;
fcmsgs.stream_key = libcomm_addrinfo.streamKey;
fcmsgs.query_id = fcmsgr->query_id;
cpylen = comm_get_cpylen(g_instance.comm_cxt.localinfo_cxt.g_self_nodename, NAMEDATALEN);
ss_rc = memset_s(fcmsgs.nodename, NAMEDATALEN, 0x0, NAMEDATALEN);
securec_check(ss_rc, "\0", "\0");
ss_rc = strncpy_s(fcmsgs.nodename, NAMEDATALEN, g_instance.comm_cxt.localinfo_cxt.g_self_nodename, cpylen + 1);
securec_check(ss_rc, "\0", "\0");
fcmsgs.nodename[cpylen] = '\0';
fcmsgs.streamcap = fcmsgr->streamcap;
int rc = gs_send_ctrl_msg(&g_instance.comm_cxt.g_s_node_sock[node_idx], &fcmsgs, ROLE_PRODUCER);
if (rc <= 0) {
LIBCOMM_ELOG(WARNING, "(s|connect)\tFailed to send ready msg to node[%d]:%s, detail:%s.", node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx), mc_strerror(errno));
errno = ECOMMTCPTCPDISCONNECT;
gs_close_gsocket(fcmsgr->backward);
return -1;
}
char ack;
do {
rc = gs_recv(fcmsgr->backward, &ack, sizeof(char));
} while (rc == 0 || errno == ECOMMTCPNODATA);
if (rc < 0 || ack != 'o') {
gs_close_gsocket(fcmsgr->backward);
return -1;
}
}
#endif
struct p_mailbox* pmailbox = &P_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
if (pmailbox->state != MAIL_CLOSED) {
MAILBOX_ELOG(pmailbox,
WARNING,
"(r|build reply conn)\tFailed to get mailbox for node[%d,%d]:%s.",
node_idx,
streamid,
REMOTE_NAME(g_instance.comm_cxt.g_s_node_sock, node_idx));
gs_s_close_logic_connection(pmailbox, ECOMMTCPREMOETECLOSE, NULL);
}
pmailbox->local_version = local_version;
pmailbox->remote_version = remote_version;
pmailbox->ctrl_tcp_sock = g_instance.comm_cxt.g_s_node_sock[node_idx].ctrl_tcp_sock;
pmailbox->state = MAIL_RUN;
pmailbox->bufCAP = DEFULTMSGLEN;
pmailbox->stream_key = fcmsgr->stream_key;
pmailbox->query_id = fcmsgr->query_id;
pmailbox->local_thread_id = 0;
pmailbox->peer_thread_id = 0;
pmailbox->close_reason = 0;
COMM_STAT_CALL(pmailbox, pmailbox->statistic->start_time = (uint32)mc_timers_ms());
if (g_instance.comm_cxt.commutil_cxt.g_stat_mode && (NULL == pmailbox->statistic)) {
LIBCOMM_MALLOC(pmailbox->statistic, sizeof(struct pmailbox_statistic), pmailbox_statistic);
if (NULL == pmailbox->statistic) {
errno = ECOMMTCPRELEASEMEM;
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return -1;
}
}
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return 0;
}
* function name : gs_r_close_logic_connection
* description : consumer close logic connetion and reset mailbox,
* if producer call this, we only set state is CTRL_TO_CLOSE,
* then really close when consumer call this.
* notice : we must get mailbox lock before
* arguments : _in_ cmailbox: libcomm conntion info.
* _in_ close_reason: close reason.
*/
void gs_r_close_logic_connection(struct c_mailbox* cmailbox, int close_reason, FCMSG_T* msg)
{
errno_t ss_rc;
uint32 cpylen;
if (cmailbox->state == MAIL_CLOSED) {
return;
}
if (ENABLE_THREAD_POOL_DN_LOGICCONN) {
NotifyListener(cmailbox, true, __FUNCTION__);
}
if (IS_NOTIFY_REMOTE(close_reason) && msg) {
msg->type = CTRL_CLOSED;
msg->node_idx = cmailbox->idx;
msg->streamid = cmailbox->streamid;
msg->streamcap = 0;
msg->version = cmailbox->remote_version;
msg->query_id = cmailbox->query_id;
ss_rc = memset_s(msg->nodename, NAMEDATALEN, 0x0, NAMEDATALEN);
securec_check(ss_rc, "\0", "\0");
ss_rc = sprintf_s(msg->nodename, NAMEDATALEN, "%d_%s",
cmailbox->shift, g_instance.comm_cxt.localinfo_cxt.g_self_nodename);
securec_check_ss_c(ss_rc, "\0", "\0");
}
gs_poll_signal(cmailbox->semaphore);
gs_r_reset_cmailbox(cmailbox, close_reason);
return;
}
uint64 gs_get_recv_ready_time()
{
return g_instance.comm_cxt.localinfo_cxt.g_r_first_recv_time;
}
* function name : gs_receivers_flow_handle_ready_request
* description : handle ready request when received MAIL_READY.
* arguments :
* _in_ fcmsgr: the message that receivers_flow thread received.
* _in_ t_fd_id: the socket that receivers_flow thread used.
* producer is building a logic connection,
* we get stream index from the message,
* and initialize consumer mailbox,
* then reply MAIL_READY message to producer.
*/
void gs_receivers_flow_handle_ready_request(FCMSG_T* fcmsgr)
{
uint16 streamid = fcmsgr->streamid;
uint16 node_idx = fcmsgr->node_idx;
int remote_verion = fcmsgr->version;
int ctrl_socket = g_instance.comm_cxt.g_r_node_sock[node_idx].ctrl_tcp_sock;
struct FCMSG_T fcmsgs = {0x0};
struct c_mailbox* cmailbox = NULL;
int rc = -1;
uint64 time_now;
uint64 time_callback_start = 0;
uint64 time_callback_end = 0;
uint64 deal_time = 0;
errno_t ss_rc;
uint32 cpylen;
gsocket gs_sock = {0};
uint16 local_version;
long add_quota = DEFULTMSGLEN;
StreamConnInfo connInfo = {0};
COMM_TIMER_INIT();
#ifdef LIBCOMM_FAULT_INJECTION_ENABLE
if (is_comm_fault_injection(LIBCOMM_FI_CONSUMER_REJECT)) {
LIBCOMM_ELOG(WARNING, "(r|flow ctrl)\t[FAULT INJECTION]Consumer can not accept.");
errno = ECOMMTCPAPPCLOSE;
goto accept_failed;
}
#endif
Assert(node_idx >= 0);
cmailbox = &C_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&cmailbox->sinfo_lock);
if (cmailbox->state != MAIL_CLOSED) {
MAILBOX_ELOG(cmailbox,
WARNING,
"(r|flow ctrl)\tFailed to get mailbox for node[%d]:%s.",
node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, node_idx));
gs_r_close_logic_connection(cmailbox, ECOMMTCPREMOETECLOSE, NULL);
}
local_version = cmailbox->local_version + 1;
if (local_version >= MAX_MAILBOX_VERSION) {
local_version = 0;
}
char nodename[NAMEDATALEN];
int shift;
ss_rc = sscanf_s(fcmsgr->nodename, "%d_%s", &shift, &nodename, (unsigned)NAMEDATALEN);
securec_check_for_sscanf_s(ss_rc, 2, "\0", "\0");
ss_rc = sprintf_s(nodename, NAMEDATALEN, "%d_%s", shift, g_instance.comm_cxt.localinfo_cxt.g_self_nodename);
securec_check_ss_c(ss_rc, "\0", "\0");
cmailbox->local_version = local_version;
cmailbox->remote_version = remote_verion;
cmailbox->ctrl_tcp_sock = ctrl_socket;
cmailbox->state = MAIL_RUN;
cmailbox->bufCAP += add_quota;
cmailbox->stream_key = fcmsgr->stream_key;
cmailbox->query_id = fcmsgr->query_id;
cmailbox->local_thread_id = 0;
cmailbox->peer_thread_id = 0;
cmailbox->close_reason = 0;
cmailbox->shift = shift;
if (g_instance.comm_cxt.commutil_cxt.g_stat_mode && (cmailbox->statistic == NULL)) {
LIBCOMM_MALLOC(cmailbox->statistic, sizeof(struct cmailbox_statistic), cmailbox_statistic);
if (cmailbox->statistic == NULL) {
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&cmailbox->sinfo_lock);
errno = ECOMMTCPRELEASEMEM;
goto accept_failed;
}
}
time_now = COMM_STAT_TIME();
COMM_STAT_CALL(cmailbox, cmailbox->statistic->start_time = (uint32)time_now);
COMM_DEBUG_LOG("(r|flow ctrl)\tNode[%d] stream[%d], node name[%s] is in state[%s].",
node_idx,
streamid,
g_instance.comm_cxt.g_r_node_sock[node_idx].remote_nodename,
stream_stat_string(cmailbox->state));
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&cmailbox->sinfo_lock);
deal_time = ABS_SUB((long long int)time(NULL), (long long int)gs_get_recv_ready_time());
if ((long long int)mc_tcp_get_connect_timeout() < (long long int)deal_time) {
LIBCOMM_ELOG(WARNING, "(r|flow ctrl)\tIt takes %lus to process the ready message, query id:%lu.",
deal_time, u_sess->debug_query_id);
}
#ifndef USE_SPQ
if (fcmsgr->type == CTRL_CONN_DUAL) {
u_sess->pgxc_cxt.NumDataNodes = (int)(fcmsgr->extra_info);
if (gs_r_build_reply_connection(fcmsgr, local_version) != 0) {
LIBCOMM_PTHREAD_MUTEX_LOCK(&cmailbox->sinfo_lock);
gs_r_close_logic_connection(cmailbox, ECOMMTCPTCPDISCONNECT, NULL);
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&cmailbox->sinfo_lock);
goto accept_failed;
}
}
#endif
gs_sock.idx = node_idx;
gs_sock.sid = streamid;
gs_sock.ver = local_version;
if (fcmsgr->type == CTRL_CONN_DUAL) {
#ifdef USE_SPQ
bool found = false;
QCConnKey key = {
.query_id = fcmsgr->stream_key.queryId,
.plan_node_id = fcmsgr->stream_key.planNodeId,
.node_id = 0,
.type = SPQ_QC_CONNECTION,
};
gs_sock.type = GSOCK_CONSUMER;
pthread_rwlock_wrlock(&g_instance.spq_cxt.adp_connects_lock);
QCConnEntry* entry = (QCConnEntry*)hash_search(g_instance.spq_cxt.adp_connects, (void*)&key, HASH_ENTER, &found);
if (!found) {
entry->backward = gs_sock;
entry->forward.idx = 0;
entry->streamcap = fcmsgr->streamcap;
}
pthread_rwlock_unlock(&g_instance.spq_cxt.adp_connects_lock);
ereport(LOG, (errmsg("Receive dual connect request from qc, queryid: %lu, nodeid: %u",
fcmsgr->stream_key.queryId, fcmsgr->stream_key.planNodeId)));
#else
gs_sock.type = GSOCK_DAUL_CHANNEL;
rc = gs_send_msg_by_unix_domain((void*)&gs_sock, sizeof(gs_sock));
if (rc <= 0) {
LIBCOMM_ELOG(WARNING,
"(r|flow ctrl)\t fail to notify main thread from node[%d]:%s with socket[%d].",
node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, node_idx),
ctrl_socket);
gs_close_gsocket(&gs_sock);
goto accept_failed;
}
#endif
}
fcmsgs.type = CTRL_CONN_ACCEPT;
fcmsgs.node_idx = node_idx;
fcmsgs.streamid = streamid;
fcmsgs.streamcap = add_quota;
fcmsgs.version = remote_verion;
fcmsgs.query_id = fcmsgr->query_id;
fcmsgs.extra_info = local_version;
cpylen = comm_get_cpylen(nodename, NAMEDATALEN);
ss_rc = memset_s(fcmsgs.nodename, NAMEDATALEN, 0x0, NAMEDATALEN);
securec_check(ss_rc, "\0", "\0");
ss_rc = strncpy_s(fcmsgs.nodename, NAMEDATALEN, nodename, cpylen + 1);
securec_check(ss_rc, "\0", "\0");
fcmsgs.nodename[cpylen] = '\0';
rc = gs_send_ctrl_msg(&g_instance.comm_cxt.g_r_node_sock[node_idx], &fcmsgs, ROLE_CONSUMER);
if (rc <= 0) {
errno = ECOMMTCPTCPDISCONNECT;
LIBCOMM_ELOG(WARNING,
"(r|flow ctrl)\tFailed to send ready msg to node[%d]:%s with socket[%d]:%s.",
node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, node_idx),
ctrl_socket,
mc_strerror(errno));
return;
}
COMM_DEBUG_LOG("(r|flow ctrl)\tSuccess to send CTRL_CONN_ACCEPT msg to node[%d]:%s with socket[%d].",
node_idx, REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, node_idx), ctrl_socket);
deal_time = ABS_SUB((long long int)time(NULL), (long long int)gs_get_recv_ready_time());
if ((uint64)(unsigned)mc_tcp_get_connect_timeout() < deal_time) {
LIBCOMM_ELOG(WARNING, "(r|flow ctrl)\tIt takes %lus to process the ready message, query id:%lu.",
deal_time, u_sess->debug_query_id);
}
if (fcmsgr->type == CTRL_CONN_DUAL) {
return;
}
#ifdef LIBCOMM_SPEED_TEST_ENABLE
else if (fcmsgr->stream_key.queryId == LIBCOMM_PERFORMANCE_PLAN_ID) {
;
}
#endif
else {
Assert(g_instance.comm_cxt.gs_wakeup_consumer != NULL);
gs_sock.type = GSOCK_CONSUMER;
ss_rc = memset_s(&connInfo, sizeof(StreamConnInfo), 0x0, sizeof(StreamConnInfo));
securec_check(ss_rc, "\0", "\0");
connInfo.port.libcomm_layer.gsock = gs_sock;
cpylen = comm_get_cpylen(fcmsgr->nodename, NAMEDATALEN);
ss_rc = memset_s(connInfo.nodeName, NAMEDATALEN, 0x0, NAMEDATALEN);
securec_check(ss_rc, "\0", "\0");
ss_rc = strncpy_s(connInfo.nodeName, NAMEDATALEN, fcmsgr->nodename, cpylen + 1);
securec_check(ss_rc, "\0", "\0");
connInfo.nodeName[cpylen] = '\0';
time_callback_start = time(NULL);
bool wakeup_if = (*g_instance.comm_cxt.gs_wakeup_consumer)(fcmsgr->stream_key, connInfo);
time_callback_end = time(NULL);
deal_time = ABS_SUB(time_callback_end, time_callback_start);
if ((uint64)(unsigned)mc_tcp_get_connect_timeout() < deal_time) {
LIBCOMM_ELOG(WARNING,
"(r|flow ctrl)\tWake up consumer timeout for node[%d]:%s, it takes %lus, \
because the lock wait timeout, query id:%lu :%s.",
node_idx, REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, node_idx),
deal_time, u_sess->debug_query_id, mc_strerror(errno));
}
if (!wakeup_if) {
COMM_DEBUG_LOG("(r|flow ctrl)\tFailed to wake up consumer "
"for node[%d]:%s, query maybe already quit:%s.",
node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, node_idx),
mc_strerror(errno));
(void)gs_r_close_stream(&gs_sock);
return;
}
}
COMM_TIMER_LOG("(r|flow ctrl)\tWake up consumer, node[%d, %d]:%s.",
node_idx,
streamid,
REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, node_idx));
return;
accept_failed:
fcmsgs.type = CTRL_CONN_REJECT;
fcmsgs.node_idx = node_idx;
fcmsgs.streamid = streamid;
fcmsgs.streamcap = 0;
fcmsgs.version = remote_verion;
fcmsgs.query_id = fcmsgr->query_id;
ss_rc = memset_s(fcmsgs.nodename, NAMEDATALEN, 0x0, NAMEDATALEN);
securec_check(ss_rc, "\0", "\0");
ss_rc = sprintf_s(fcmsgs.nodename, NAMEDATALEN, "%d_%s", shift, g_instance.comm_cxt.localinfo_cxt.g_self_nodename);
securec_check_ss_c(ss_rc, "\0", "\0");
rc = gs_send_ctrl_msg(&g_instance.comm_cxt.g_r_node_sock[node_idx], &fcmsgs, ROLE_CONSUMER);
if (rc <= 0) {
LIBCOMM_ELOG(WARNING,
"(r|flow ctrl)\tFailed to send init msg to node[%d]:%s with socket[%d]:%s.",
node_idx,
REMOTE_NAME(g_instance.comm_cxt.g_r_node_sock, node_idx),
ctrl_socket,
mc_strerror(errno));
}
return;
}
* function name : gs_receivers_flow_handle_close_request
* description : handle close request when received MAIL_CLOSED.
* arguments : _in_ fcmsgr: the message that receivers_flow thread received.
* Producer closed the stream, which indicate error happened when sender is sending data
* if it is a Closed message, we should close local mailbox and tell the thread of executor to quit
*/
void gs_receivers_flow_handle_close_request(FCMSG_T* fcmsgr)
{
int streamid = fcmsgr->streamid;
int node_idx = fcmsgr->node_idx;
struct c_mailbox* cmailbox = NULL;
cmailbox = &(C_MAILBOX(node_idx, streamid));
LIBCOMM_PTHREAD_MUTEX_LOCK(&(cmailbox->sinfo_lock));
if (false == gs_check_mailbox(cmailbox->local_version, fcmsgr->version)) {
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&(cmailbox->sinfo_lock));
return;
}
COMM_DEBUG_LOG("(r|flow ctrl)\tStream[%d] is closed "
"by remote node[%d]:%s, query[%lu].",
streamid,
node_idx,
g_instance.comm_cxt.g_r_node_sock[node_idx].remote_nodename,
fcmsgr->query_id);
gs_r_close_logic_connection(cmailbox, ECOMMTCPREMOETECLOSE, NULL);
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&(cmailbox->sinfo_lock));
return;
}
* function name : gs_receivers_flow_handle_assert_fail_request
* description : handle assert fail when receivers_flow thread received CTRL_ASSERT_FAIL.
* arguments : _in_ fcmsgr: the message that receivers_flow thread received.
* return value : void
* if the sender assert fail and core, we will get a CTRL_ASSERT_FAIL message,
* we should make the consumer core as well
*/
void gs_receivers_flow_handle_assert_fail_request(FCMSG_T* fcmsgr)
{
int sidx = fcmsgr->streamid;
int nidx = fcmsgr->node_idx;
struct c_mailbox* cmailbox = NULL;
cmailbox = &(C_MAILBOX(nidx, sidx));
LIBCOMM_ELOG(WARNING,
"(r|flow ctrl)\tNode[%d] stream[%d] assert fail, node name[%s] with state[%d] has bufCAP[%lu] and "
"buff_q->u_size[%lu].",
nidx,
sidx,
g_instance.comm_cxt.g_r_node_sock[nidx].remote_nodename,
cmailbox->state,
cmailbox->bufCAP,
cmailbox->buff_q->u_size);
MAILBOX_ELOG(cmailbox, WARNING, "(r|flow ctrl)\tMailbox Info which assert fail.");
Assert(0 != 0);
}
* function name : gs_senders_flow_handle_tid_request
* description : save peer thread id when received CTRL_PEER_TID.
* arguments : _in_ fcmsgr: the message that senders_flow thread received.
* return value : void
*/
void gs_senders_flow_handle_tid_request(FCMSG_T* fcmsgr)
{
int streamid = fcmsgr->streamid;
int node_idx = fcmsgr->node_idx;
int version = fcmsgr->version;
struct p_mailbox* pmailbox = NULL;
pmailbox = &P_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
if (gs_check_mailbox(pmailbox->local_version, version) == false) {
MAILBOX_ELOG(pmailbox, WARNING, "(s|flow ctrl)\tStream has already closed.");
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return;
}
pmailbox->peer_thread_id = fcmsgr->extra_info;
pmailbox->bufCAP += fcmsgr->streamcap;
if (pmailbox->state == MAIL_HOLD) {
gs_poll_signal(pmailbox->semaphore);
}
COMM_DEBUG_LOG("(s|flow ctrl)\tWake up mailbox[%d][%d], node[%s], bufCAP[%lu] add[%ld] type[%s].",
node_idx,
streamid,
g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
pmailbox->bufCAP,
fcmsgr->streamcap,
stream_stat_string(pmailbox->state));
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
}
* function name : gs_senders_flow_handle_init_request
* description : handle init request when received CTRL_INIT.
* arguments : _in_ fcmsgr: the message that senders_flow thread received.
* return value : void
* if the receiver failed to get an usable stream index, we will get a CTRL_INIT message,
* we should tell the executor thread to exit and report the error.
*/
void gs_senders_flow_handle_init_request(FCMSG_T* fcmsgr)
{
int streamid = fcmsgr->streamid;
int node_idx = fcmsgr->node_idx;
struct p_mailbox* pmailbox = NULL;
pmailbox = &P_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
if (gs_check_mailbox(pmailbox->local_version, fcmsgr->version) == false) {
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return;
}
gs_s_close_logic_connection(pmailbox, ECOMMTCPREJECTSTREAM, NULL);
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
}
* function name : gs_senders_flow_handle_ready_request
* description : handle ready request when received MAIL_READY.
* arguments : _in_ fcmsgr: the message that senders_flow thread received.
* return value : void
* if the receiver succeed to get an usable stream index, we will get a MAIL_READY message,
* we should change the state of pmailbox and tell the executor thread to continue
*/
void gs_senders_flow_handle_ready_request(FCMSG_T* fcmsgr)
{
int streamid = fcmsgr->streamid;
int node_idx = fcmsgr->node_idx;
struct p_mailbox* pmailbox = NULL;
uint64 time_now = 0;
pmailbox = &P_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
if (gs_check_mailbox(pmailbox->local_version, fcmsgr->version) == false) {
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return;
}
if (pmailbox->state != MAIL_READY) {
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return;
}
pmailbox->state = MAIL_RUN;
pmailbox->bufCAP += fcmsgr->streamcap;
pmailbox->remote_version = (uint16)(fcmsgr->extra_info);
COMM_DEBUG_LOG("(s|flow ctrl)\tnode[%d] stream[%d] is in state[%s], node name[%s], bufCAP[%lu].",
node_idx,
streamid,
stream_stat_string(pmailbox->state),
g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
pmailbox->bufCAP);
gs_poll_signal(pmailbox->semaphore);
if (pmailbox->statistic != NULL) {
time_now = COMM_STAT_TIME();
pmailbox->statistic->connect_time = ABS_SUB(time_now, pmailbox->statistic->connect_time);
}
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
}
* function name : gs_senders_flow_handle_resume_request
* description : handle resume request when received MAIL_RUN.
* arguments : _in_ fcmsgr: the message that senders_flow thread received.
* return value : void
* if the receiver have enough buffer to continue receive, we will get a MAIL_RUN message,
* we should change the state of pmailbox and tell the executor thread to continue
*/
void gs_senders_flow_handle_resume_request(FCMSG_T* fcmsgr)
{
int streamid = fcmsgr->streamid;
int node_idx = fcmsgr->node_idx;
struct p_mailbox* pmailbox = NULL;
pmailbox = &P_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
if (gs_check_mailbox(pmailbox->local_version, fcmsgr->version) ==
false) {
if (pmailbox->state != MAIL_CLOSED) {
MAILBOX_ELOG(pmailbox, WARNING, "(s|flow ctrl)\tStream not work.");
}
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return;
}
pmailbox->bufCAP += fcmsgr->streamcap;
COMM_DEBUG_LOG("(s|flow ctrl)\tWake up node[%d] stream[%d], node name[%s], bufCAP[%lu] add[%ld].",
node_idx,
streamid,
g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
pmailbox->bufCAP,
fcmsgr->streamcap);
if (pmailbox->state == MAIL_HOLD) {
gs_poll_signal(pmailbox->semaphore);
}
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
}
* function name : gs_senders_flow_handle_close_request
* description : handle close request when senders_flow thread received MAIL_CLOSED.
* arguments : _in_ fcmsgr: the message that senders_flow thread received.
* return value : void
* if the receiver quit, we will get a MAIL_CLOSED message,
* we should close the pmailbox and tell the executor thread to continue
*/
void gs_senders_flow_handle_close_request(FCMSG_T* fcmsgr)
{
int streamid = fcmsgr->streamid;
int node_idx = fcmsgr->node_idx;
struct p_mailbox* pmailbox = NULL;
pmailbox = &P_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
COMM_DEBUG_LOG("(s|flow ctrl)\tStream[%d] is closed "
"by remote node[%d]:%s, query[%lu].",
streamid,
node_idx,
g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
fcmsgr->query_id);
if (gs_check_mailbox(pmailbox->local_version, fcmsgr->version) == false) {
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return;
}
gs_s_close_logic_connection(pmailbox, ECOMMTCPREMOETECLOSE, NULL);
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
}
* function name : gs_senders_flow_handle_assert_fail_request
* description : handle assert fail when senders_flow thread received CTRL_ASSERT_FAIL.
* arguments : _in_ fcmsgr: the message that senders_flow thread received.
* return value : void
* if the receiver assert fail and core, we will get a CTRL_ASSERT_FAIL message,
* we should make the producer core as well
*/
void gs_senders_flow_handle_assert_fail_request(FCMSG_T* fcmsgr)
{
int sidx = fcmsgr->streamid;
int nidx = fcmsgr->node_idx;
struct p_mailbox* pmailbox = NULL;
pmailbox = &(P_MAILBOX(nidx, sidx));
LIBCOMM_ELOG(WARNING,
"(s|flow ctrl)\tNode[%d] stream[%d] assert fail, node name[%s] with state[%d] has bufCAP[%lu].",
nidx,
sidx,
g_instance.comm_cxt.g_s_node_sock[nidx].remote_nodename,
pmailbox->state,
pmailbox->bufCAP);
MAILBOX_ELOG(pmailbox, WARNING, "(s|flow ctrl)\tMailbox Info which assert fail.");
Assert(0 != 0);
}
extern ThreadId getThreadIdForLibcomm(int logictid);
void gs_senders_flow_handle_stop_query_request(FCMSG_T* fcmsgr)
{
int streamid = fcmsgr->streamid;
int node_idx = fcmsgr->node_idx;
struct p_mailbox* pmailbox = NULL;
pmailbox = &P_MAILBOX(node_idx, streamid);
LIBCOMM_PTHREAD_MUTEX_LOCK(&pmailbox->sinfo_lock);
COMM_DEBUG_LOG("(s|flow ctrl)\tQuery[%lu] is stop "
"by remote node[%d]:%s with stream[%d].",
fcmsgr->query_id,
node_idx,
g_instance.comm_cxt.g_s_node_sock[node_idx].remote_nodename,
streamid);
if (gs_check_mailbox(pmailbox->local_version, fcmsgr->version) == false) {
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
return;
}
int logictid = (int)ntohl(fcmsgr->extra_info);
ThreadId backendTID = getThreadIdForLibcomm(logictid);
if (0 != backendTID) {
StreamNodeGroup::stopAllThreadInNodeGroup(backendTID, fcmsgr->query_id);
}
LIBCOMM_PTHREAD_MUTEX_UNLOCK(&pmailbox->sinfo_lock);
}