* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* libcomm.h
*
*
* IDENTIFICATION
* src/include/libcomm/libcomm.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef _GS_LIBCOMM_H_
#define _GS_LIBCOMM_H_
#include <stdio.h>
#include <netinet/in.h>
#include "sys/epoll.h"
#ifndef WIN32
#include <pthread.h>
#else
#include "pthread-win32.h"
#endif
#include <stdlib.h>
#include "c.h"
#include "cipher.h"
#include "utils/palloc.h"
#ifdef USE_SSL
#include "openssl/err.h"
#include "openssl/ssl.h"
#include "openssl/rand.h"
#include "openssl/ossl_typ.h"
#include "openssl/obj_mac.h"
#include "openssl/dh.h"
#include "openssl/bn.h"
#include "openssl/x509.h"
#include "openssl/x509_vfy.h"
#include "openssl/opensslconf.h"
#include "openssl/crypto.h"
#include "openssl/bio.h"
#endif
#define ECOMMTCPARGSINVAL 1001
#define ECOMMTCPMEMALLOC 1002
#define ECOMMTCPCVINIT 1003
#define ECOMMTCPCVDESTROY 1004
#define ECOMMTCPLOCKINIT 1005
#define ECOMMTCPLOCKDESTROY 1006
#define ECOMMTCPNODEIDFD 1007
#define ECOMMTCPNODEIDXTCPFD 1008
#define ECOMMTCPSETSTREAMIDX 1009
#define ECOMMTCPBUFFQSIZE 1010
#define ECOMMTCPQUTOASZIE 1011
#define ECOMMTCPSEMINIT 1012
#define ECOMMTCPSEMPOST 1013
#define ECOMMTCPSEMWAIT 1014
#define ECOMMTCPEPOLLINIT 1015
#define ECOMMTCPEPOLLHNDL 1016
#define ECOMMTCPSCTPADRINIT 1017
#define ECOMMTCPSCTPLISTEN 1018
#define ECOMMTCPCMAILBOXINIT 1019
#define ECOMMTCPNODEIDXSCTPPORT 1020
#define ECOMMTCPSTREAMIDX 1021
#define ECOMMTCPTCPFD 1022
#define ECOMMTCPEPOLLEVNT 1023
#define ECOMMTCPCTRLMSG 1024
#define ECOMMTCPCTRLMSGWR 1025
#define ECOMMTCPCTRLMSGRD 1026
#define ECOMMTCPSTREAMIDXINVAL 1027
#define ECOMMTCPINVALNODEID 1028
#define ECOMMTCPCTRLMSGSIZE 1029
#define ECOMMTCPCVSIGNAL 1030
#define ECOMMTCPEPOLLLST 1031
#define ECOMMTCPCTRLCONN 1032
#define ECOMMTCPSND 1033
#define ECOMMTCPEPOLLCLOSE 1034
#define ECOMMTCPEPOLLTIMEOUT 1035
#define ECOMMTCPHASHENTRYDEL 1036
#define ECOMMTCPTHREADSTOP 1037
#define ECOMMTCPMAILBOXCLOSE 1038
#define ECOMMTCPSTREAMSTATE 1039
#define ECOMMTCPSCTPFDINVAL 1040
#define ECOMMTCPNODATA 1041
#define ECOMMTCPTHREADSTART 1042
#define ECOMMTCPHASHENTRYADD 1043
#define ECOMMTCPBUILDSCTPASSOC 1044
#define ECOMMTCPWRONGSTREAMKEY 1045
#define ECOMMTCPRELEASEMEM 1046
#define ECOMMTCPTCPDISCONNECT 1047
#define ECOMMTCPDISCONNECT 1048
#define ECOMMTCPREMOETECLOSE 1049
#define ECOMMTCPAPPCLOSE 1050
#define ECOMMTCPLOCALCLOSEPOLL 1051
#define ECOMMTCPPEERCLOSEPOLL 1052
#define ECOMMTCPCONNFAIL 1054
#define ECOMMTCPSTREAMCONNFAIL 1055
#define ECOMMTCPREJECTSTREAM 1056
#define ECOMMTCPCONNTIMEOUT 1057
#define ECOMMTCPWAITQUOTAFAIL 1058
#define ECOMMTCPWAITPOLLERROR 1059
#define ECOMMTCPPEERCHANGED 1060
#define ECOMMTCPGSSAUTHFAIL 1061
#define ECOMMTCPSENDTIMEOUT 1062
#define ECOMMTCPNOTINTERNALIP 1063
#define HOST_ADDRSTRLEN INET6_ADDRSTRLEN
#define HOST_LEN_OF_HTAB 64
#define NAMEDATALEN 64
#define MSG_TIME_LEN 30
#define MAX_DN_NODE_NUM 8192
#define MAX_CN_NODE_NUM 1024
#define MAX_CN_DN_NODE_NUM (MAX_DN_NODE_NUM + MAX_CN_NODE_NUM)
#define MIN_CN_DN_NODE_NUM (1 + 1)
#define DOUBLE_NAMEDATALEN 128
#define SEC_TO_MICRO_SEC 1000
typedef enum {
LIBCOMM_NONE,
LIBCOMM_SEND_CTRL,
LIBCOMM_RECV_CTRL,
LIBCOMM_RECV_LOOP,
LIBCOMM_AUX
} LibcommThreadTypeDef;
typedef enum
{
SEND_SOME = 0,
SECURE_READ,
SECURE_WRITE,
READ_DATA,
READ_DATA_FROM_LOGIC
} CommMsgOper;
typedef enum
{
POSTMASTER = 0,
GS_SEND_flow,
GS_RECV_FLOW,
GS_RECV_LOOP,
} CommThreadUsed;
typedef struct CommStreamKey {
uint64 queryId;
uint32 planNodeId;
uint32 producerSmpId;
uint32 consumerSmpId;
} TcpStreamKey;
typedef struct {
uint16 idx;
uint16 sid;
uint16 ver;
uint16 type;
} gsocket;
struct StreamConnInfo;
typedef struct {
char remote_node[NAMEDATALEN];
char remote_host[HOST_ADDRSTRLEN];
int idx;
int stream_id;
const char* stream_state;
int tcp_sock;
uint64 query_id;
TcpStreamKey stream_key;
long quota_size;
unsigned long buff_usize;
long bytes;
long time;
long speed;
unsigned long local_thread_id;
unsigned long peer_thread_id;
} CommRecvStreamStatus;
typedef struct {
char remote_node[NAMEDATALEN];
char remote_host[HOST_ADDRSTRLEN];
int idx;
int stream_id;
const char* stream_state;
int tcp_sock;
int packet_count;
int quota_count;
uint64 query_id;
TcpStreamKey stream_key;
long bytes;
long time;
long speed;
long quota_size;
long wait_quota;
long send_overhead;
unsigned long local_thread_id;
unsigned long peer_thread_id;
} CommSendStreamStatus;
typedef struct {
long recv_speed;
long send_speed;
int recv_count_speed;
int send_count_speed;
long buffer;
long mem_libcomm;
long mem_libpq;
int postmaster;
int gs_sender_flow;
int gs_receiver_flow;
int gs_receiver_loop;
int stream_conn_num;
} CommStat;
typedef struct {
char remote_node[NAMEDATALEN];
char remote_host[HOST_ADDRSTRLEN];
int idx;
int stream_num;
uint32 min_delay;
uint32 dev_delay;
uint32 max_delay;
} CommDelayInfo;
#ifdef USE_SSL
typedef struct SSL_INFO {
SSL* ssl;
X509* peer;
char* peer_cn;
unsigned long count;
int sock;
} SSL_INFO;
typedef struct libcommsslinfo {
SSL_INFO node;
struct libcommsslinfo* next;
} libcomm_sslinfo;
#endif
typedef struct {
int socket;
char *libcommhost;
char *sslcert;
char *sslcrl;
char *sslkey;
char *sslrootcert;
char *remote_nodename;
bool sigpipe_so;
#ifdef USE_SSL
SSL *ssl;
X509 *peer;
#endif
char* sslmode;
bool sigpipe_flag;
unsigned char cipher_passwd[CIPHER_LEN + 1];
} LibCommConn;
typedef struct libcommaddrinfo {
char* host;
char nodename[NAMEDATALEN];
int ctrl_port;
int listen_port;
int status;
int nodeIdx;
CommStreamKey streamKey;
unsigned int qid;
bool parallel_send_mode;
int addr_list_size;
libcommaddrinfo* addr_list_next;
gsocket gs_sock;
char selfnodename[NAMEDATALEN];
int shift;
} libcomm_addrinfo;
typedef struct MessageIpcLog
{
char type;
int msg_cursor;
int msg_len;
int len_cursor;
uint32 len_cache;
char last_msg_type;
int last_msg_len;
int last_msg_count;
char last_msg_time[MSG_TIME_LEN];
}MessageIpcLog;
typedef struct MessageCommLog
{
MessageIpcLog recv_ipc_log;
MessageIpcLog send_ipc_log;
}MessageCommLog;
typedef enum {
GSOCK_INVALID,
GSOCK_PRODUCER,
GSOCK_CONSUMER,
GSOCK_DAUL_CHANNEL,
} GSOCK_TYPE;
extern gsocket gs_invalid_gsock;
#define GS_INVALID_GSOCK gs_invalid_gsock
void mc_elog(int elevel, const char* fmt, ...) __attribute__((format(printf, 2, 3)));
#define LIBCOMM_DEBUG_LOG(format, ...) \
do { \
; \
} while (0)
typedef enum { ROLE_PRODUCER, ROLE_CONSUMER, ROLE_MAX_TYPE } SctpNodeRole;
typedef enum { CONNSTATEFAIL, CONNSTATECONNECTING, CONNSTATESUCCEED } ConnectionState;
typedef enum { DATA_CHANNEL, CTRL_CHANNEL } ChannelType;
typedef bool (*wakeup_hook_type)(TcpStreamKey key, StreamConnInfo connInfo);
extern int gs_set_basic_info(const char* local_host,
const char* local_node_name,
int node_num,
char* sock_path);
extern int gs_connect(libcommaddrinfo** sctp_addrinfo,
int addr_num,
int timeout
);
extern void gs_connect_regist_callback(wakeup_hook_type wakeup_callback);
extern int gs_send(gsocket* gs_sock,
char* message,
int m_len,
int time_out,
bool block_mode
);
extern int gs_broadcast_send(struct libcommaddrinfo* sctp_addrinfo, char* message, int m_len, int time_out);
extern int gs_recv(
gsocket* gs_sock,
void* buff,
int buff_size
);
extern int gs_wait_poll(gsocket* gs_sock_array,
int nproducer,
int* producer,
int timeout,
bool close_expected
);
extern void gs_comm_ipc_print(MessageIpcLog *ipc_log, char *remotenode, gsocket *gs_sock, CommMsgOper msg_oper);
extern MessageCommLog* gs_comm_ipc_performance(MessageCommLog *msgLog,
void *ptr,
int n,
char *remotenode,
gsocket *gs_sock,
CommMsgOper logType);
extern int gs_r_close_stream(int sctp_idx,
int sctp_sid,
int version
);
extern int gs_r_close_stream(gsocket* gsock);
extern int gs_s_close_stream(int sctp_idx,
int sctp_sid,
int version
);
extern void gs_close_gsocket(gsocket* gsock);
extern void gs_poll_close();
extern int gs_s_close_stream(gsocket* gsock);
extern int gs_poll(int time_out);
extern int gs_poll_create();
extern int gs_close_all_stream_by_debug_id(uint64 query_id);
extern bool gs_stop_query(gsocket* gsock, uint32 remote_pid);
extern void gs_shutdown_comm();
extern void gs_r_cancel();
extern void gs_set_working_version_num(int num);
extern void gs_log_comm_status();
extern int gs_check_SLESSP2_version();
extern int gs_get_stream_num();
const char* gs_comm_strerror();
extern bool get_next_recv_stream_status(CommRecvStreamStatus* stream_status);
extern bool get_next_send_stream_status(CommSendStreamStatus* stream_status);
extern bool gs_get_comm_stat(CommStat* comm_stat);
extern bool get_next_comm_delay_info(CommDelayInfo* delay_info);
extern void gs_set_debug_mode(bool mod);
extern void gs_set_stat_mode(bool mod);
extern void gs_set_timer_mode(bool mod);
extern void gs_set_no_delay(bool mod);
extern void gs_set_ackchk_time(int mod);
extern void gs_set_libcomm_used_rate(int rate);
extern void init_libcomm_cpu_rate();
extern void gs_set_working_version_num(int num);
extern long gs_get_comm_used_memory(void);
extern long gs_get_comm_peak_memory(void);
extern Size gs_get_comm_context_memory(void);
extern int gs_release_comm_memory();
extern int gs_recv_msg_by_unix_domain(int fd, gsocket* gs_sock);
extern bool gs_test_libcomm_conn(gsocket* gs_sock);
extern void gs_clean_cmailbox(gsocket gs_sock);
extern bool gs_check_mailbox(uint16 version1, uint16 version2);
extern void gs_r_reset_cmailbox(struct c_mailbox* cmailbox, int close_reason);
extern void gs_s_reset_pmailbox(struct p_mailbox* pmailbox, int close_reason);
extern bool gs_mailbox_build(int idx);
extern void gs_mailbox_destory(int idx);
extern void gs_change_capacity(int newval);
extern int gs_get_cur_node();
extern void commSenderFlowMain();
extern void commReceiverFlowMain();
extern void commAuxiliaryMain();
extern void commPoolCleanerMain();
extern void commReceiverMain(void* tid_callback);
extern void gs_init_adapt_layer();
extern void gs_senders_struct_set();
extern void init_comm_buffer_size();
extern void gs_set_local_host(const char* host);
extern void gs_broadcast_poll();
extern void gs_set_kerberos_keyfile();
extern void gs_receivers_struct_init(int ctrl_port, int data_port);
extern void gs_set_comm_session();
extern int gs_get_node_idx(char* node_name, bool fromhcom = false);
extern int gs_memory_pool_queue_initial_success(uint32 index);
extern struct mc_lqueue_item* gs_memory_pool_queue_pop(char* iov);
extern bool gs_memory_pool_queue_push(char* item);
extern ThreadId startCommSenderFlow(void);
extern ThreadId startCommReceiverFlow();
extern ThreadId startCommAuxiliary();
extern ThreadId startCommReceiver(int* tid);
extern void startCommReceiverWorker(ThreadId* threadid);
extern void gs_init_hash_table();
extern int mc_tcp_connect_nonblock(const char* host, int port);
extern void CommResourceInit();
extern int CommEpollCreate(int size);
extern int CommEpollCtl(int epfd, int op, int fd, struct epoll_event *event);
extern int CommEpollWait(int epfd, struct epoll_event *event, int maxevents, int timeout);
extern int CommEpollClose(int epfd);
extern void InitCommLogicResource();
extern void ProcessCommLogicTearDown();
extern int gs_get_nodeshift(char* node_name);
class CommEpollFd;
struct HTAB;
typedef struct LogicFd {
int idx;
int streamid;
} LogicFd;
typedef struct CommEpFdInfo {
int epfd;
CommEpollFd *comm_epfd;
} CommEpFdInfo;
typedef struct CommFd {
int fd;
gsocket logic_fd;
} CommFd;
struct knl_session_context;
class CommEpollFd;
typedef struct SessionInfo {
LogicFd logic_fd;
CommFd commfd;
int wakeup_cnt;
int handle_wakeup_cnt;
volatile bool err_occurs;
CommEpollFd *comm_epfd_ptr;
knl_session_context *session_ptr;
int is_idle;
} SessionInfo;
#ifndef NO_COMPILE_CLASS_FDCOLLECTION
class FdCollection : public BaseObject {
public:
FdCollection();
~FdCollection();
void Init();
void DeInit();
int AddEpfd(int epfd, int size);
int DelEpfd(int epfd);
void CleanEpfd();
inline CommEpollFd* GetCommEpollFd(int epfd);
int AddLogicFd(CommEpollFd *comm_epfd, void *session_ptr);
int DelLogicFd(const LogicFd *logic_fd);
void CleanLogicFd();
SessionInfo* GetSessionInfo(const LogicFd *logic_fd);
private:
inline bool IsEpfdValid(int fd);
HTAB *m_epfd_htab;
pthread_mutex_t m_epfd_htab_lock;
HTAB *m_logicfd_htab;
pthread_mutex_t m_logicfd_htab_lock;
int m_logicfd_nums;
};
#endif
extern void WakeupSession(SessionInfo *session_info, bool err_occurs, const char* caller_name);
* LIBCOMM_CHECK is defined when make commcheck
*/
#ifdef LIBCOMM_CHECK
#define LIBCOMM_FAULT_INJECTION_ENABLE
#define LIBCOMM_SPEED_TEST_ENABLE
#else
#undef LIBCOMM_FAULT_INJECTION_ENABLE
#undef LIBCOMM_SPEED_TEST_ENABLE
#endif
* Libcomm Speed Test Framework
* Before start: 1, Must run a stream query to get DN connections.
* 2, add new guc params to $GAUSSHOME/bin/cluster_guc.conf
* Start: Use gs_guc reload to set test thread num.
* For example: gs_guc reload -Z datanode -N all -I all -c "comm_test_thread_num=1"
* Stop: Use gs_guc reload to set test thread num=0.
*/
#ifdef LIBCOMM_SPEED_TEST_ENABLE
extern void gs_set_test_thread_num(int newval);
extern void gs_set_test_msg_len(int newval);
extern void gs_set_test_send_sleep(int newval);
extern void gs_set_test_send_once(int newval);
extern void gs_set_test_recv_sleep(int newval);
extern void gs_set_test_recv_once(int newval);
#endif
* Before start: 1, enable LIBCOMM_FAULT_INJECTION_ENABLE.
* 2, add new guc params to $GAUSSHOME/bin/cluster_guc.conf
* Start: Use gs_guc reload to set FI num.
* For instance: gs_guc reload -Z datanode -N all -I all -c "comm_fault_injection=6"
* Stop: Use gs_guc reload to set comm_fault_injection=0.
*/
#ifdef LIBCOMM_FAULT_INJECTION_ENABLE
typedef enum {
LIBCOMM_FI_NONE,
LIBCOMM_FI_R_TCP_DISCONNECT,
LIBCOMM_FI_R_SCTP_DISCONNECT,
LIBCOMM_FI_S_TCP_DISCONNECT,
LIBCOMM_FI_S_SCTP_DISCONNECT,
LIBCOMM_FI_RELEASE_MEMORY,
LIBCOMM_FI_FAILOVER,
LIBCOMM_FI_NO_STREAMID,
LIBCOMM_FI_CONSUMER_REJECT,
LIBCOMM_FI_R_APP_CLOSE,
LIBCOMM_FI_S_APP_CLOSE,
LIBCOMM_FI_CANCEL_SIGNAL,
LIBCOMM_FI_CLOSE_BY_VIEW,
LIBCOMM_FI_GSS_TCP_FAILED,
LIBCOMM_FI_GSS_SCTP_FAILED,
LIBCOMM_FI_R_PACKAGE_SPLIT,
LIBCOMM_FI_MALLOC_FAILED,
LIBCOMM_FI_MC_TCP_READ_FAILED,
LIBCOMM_FI_MC_TCP_READ_BLOCK_FAILED,
LIBCOMM_FI_MC_TCP_READ_NONBLOCK_FAILED,
LIBCOMM_FI_MC_TCP_WRITE_FAILED,
LIBCOMM_FI_MC_TCP_WRITE_NONBLOCK_FAILED,
LIBCOMM_FI_MC_TCP_ACCEPT_FAILED,
LIBCOMM_FI_MC_TCP_LISTEN_FAILED,
LIBCOMM_FI_MC_TCP_CONNECT_FAILED,
LIBCOMM_FI_FD_SOCKETVERSION_FAILED,
LIBCOMM_FI_SOCKID_NODEIDX_FAILED,
LIBCOMM_FI_POLLER_ADD_FD_FAILED,
LIBCOMM_FI_NO_NODEIDX,
LIBCOMM_FI_CREATE_POLL_FAILED,
LIBCOMM_FI_DYNAMIC_CAPACITY_FAILED,
LIBCOMM_FI_MAX
} LibcommFaultInjection;
extern void gs_set_fault_injection(int newval);
extern void set_comm_fault_injection(int type);
extern bool is_comm_fault_injection(LibcommFaultInjection type);
#endif
#ifdef USE_SPQ
constexpr uint16 SPQ_QE_CONNECTION = 0;
constexpr uint16 SPQ_QC_CONNECTION = 1;
struct QCConnKey {
uint64 query_id;
uint32 plan_node_id;
uint16 node_id;
uint16 type;
};
struct QCConnEntry {
QCConnKey key;
uint64 streamcap;
gsocket forward;
gsocket backward;
int scannedPageNum;
int internal_node_id;
};
struct BackConnInfo {
uint16 node_idx;
uint16 version;
uint64 streamcap;
uint64 query_id;
CommStreamKey stream_key;
gsocket *backward;
};
extern int gs_r_build_reply_connection(BackConnInfo* fcmsgr, int local_version, uint16 *sid);
#endif
#endif