/*
* poolmgr.h
*
* Definitions for the Datanode connection pool.
*
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
* Portions Copyright (c) 2010-2012 Postgres-XC Development Group
*
* src/include/pgxc/poolmgr.h
*
* -------------------------------------------------------------------------
*/
#ifndef POOLMGR_H
#define POOLMGR_H
#include <sys/time.h>
#include "pool_comm.h"
#include "pgxcnode.h"
#include "storage/pmsignal.h"
#include "utils/hsearch.h"
#ifdef ENABLE_MULTIPLE_NODES
#include "postgres.h"
#include "knl/knl_variable.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "catalog/pgxc_node.h"
#include "commands/dbcommands.h"
#include "nodes/nodes.h"
#include "threadpool/threadpool.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "pgxc/pgxc.h"
#include "pgxc/nodemgr.h"
#include "postmaster/postmaster.h"
#include "postmaster/autovacuum.h"
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include "gssignal/gs_signal.h"
#include "libcomm/libcomm.h"
#include "tcop/utility.h"
#include "postmaster/postmaster.h"
#include "portability/instr_time.h"
#endif
/* NOTE(review): presumably the longest a pooled connection may stay idle; the unit (seconds?) is not visible here — confirm at the use site. */
#define MAX_IDLE_TIME 60
/* Shorthand for the instance-wide network attribute PoolerStatelessReuse. */
#define ENABLE_STATELESS_REUSE g_instance.attr.attr_network.PoolerStatelessReuse
/* Single-character connection state codes: 'n' for normal, 'e' for error. */
#define CONN_STATE_NORMAL 'n'
#define CONN_STATE_ERROR 'e'
/* Named argument counts. NOTE(review): presumably passed as argCount to GenerateSqlCommand() — confirm. */
#define FOUR_ARGS (4)
#define FIVE_ARGS (5)
#define SIX_ARGS (6)
/*
 * List of flags related to pooler connection clean up when disconnecting
 * a session or releasing handles.
 * When local SET commands (POOL_CMD_LOCAL_SET) are used, the local parameter
 * string is cleaned by the node commit itself.
 * When global SET commands (POOL_CMD_GLOBAL_SET) are used, a "RESET ALL"
 * command is sent down to activated nodes at session end. At the end
 * of a transaction, connections using global SET commands are not sent
 * back to pool.
 * When temporary object commands are used (POOL_CMD_TEMP), a "DISCARD ALL"
 * query is sent down to nodes whose connection is activated at the end of
 * a session.
 * At the end of a transaction, a session using either temporary objects
 * or global session parameters has its connections not sent back to pool.
 *
 * Local parameters are used to change settings within the current
 * transaction block.
 * They are sent to remote nodes involved in the transaction after sending
 * BEGIN TRANSACTION using a special firing protocol.
 * They cannot be sent when connections are obtained: that would leave them
 * without effect, because BEGIN is sent by the backend only after the
 * connections are obtained and the confirmation of obtaining them has been
 * sent back to the backend.
 * SET CONSTRAINT, SET LOCAL commands are in this category.
 *
 * Global parameters are used to change the behavior of the current session.
 * They are sent to the nodes when the connections are obtained.
 * SET GLOBAL, general SET commands are in this category.
 */
/* Kind of session command the pooler has to track for connection clean up. */
typedef enum {
POOL_CMD_TEMP, /* temporary-object command */
POOL_CMD_LOCAL_SET, /* local SET command (e.g. SET LOCAL, SET CONSTRAINT) */
POOL_CMD_GLOBAL_SET /* global SET command (e.g. SET GLOBAL, general SET) */
} PoolCommandType;
/* Type of node a pooled connection targets. */
typedef enum {
POOL_NODE_CN, /* coordinator node */
POOL_NODE_DN /* datanode */
} PoolNodeType;
typedef enum {
POOL_NODE_NONE = 0,
POOL_NODE_PRIMARY,
POOL_NODE_STANDBY
} PoolNodeMode;
/* Connection pool of one node. NOTE(review): hedged field notes below are inferred from names — confirm against poolmgr implementation. */
typedef struct {
Oid nodeoid; /* Oid of the node */
char* connstr; /* connection string */
char* connstr1; /* second connection string; presumably for the standby/slave instance — confirm */
int freeSize; /* presumably the number of free slots — confirm */
int size; /* presumably the total number of slots — confirm */
bool valid; /* validity flag of this pool */
pthread_mutex_t lock; /* mutex; presumably guards the fields of this pool — confirm */
PGXCNodePoolSlot** slot; /* presumably an array of slot pointers — confirm */
HTAB* useridSlotsHash; /* hash table; presumably slots looked up by user id — confirm */
} PGXCNodePool;
/* Parameter bundle for pooler clean up. NOTE(review): hedged field notes are inferred from names — confirm against the code that fills this struct. */
typedef struct PoolerCleanParams {
DatabasePool **dbPool; /* pointer to a DatabasePool pointer */
List *nodeList; /* list of nodes */
Oid *nodeOids; /* node Oids; presumably totalNodes entries — confirm */
int totalCleanNums; /* presumably the overall number of connections to clean — confirm */
int needCleanNumsPerNode; /* presumably the number to clean on each node — confirm */
int totalNodes; /* presumably the number of nodes — confirm */
struct timeval *currVal; /* pointer to a struct timeval; presumably the current time — confirm */
} PoolerCleanParams;
/* General working information passed around pooler routines. NOTE(review): hedged field notes are inferred from names — confirm. */
typedef struct {
ThreadId pid; /* thread id */
int num_invalid_node; /* presumably the number of entries in invalid_nodes — confirm */
int num_connections; /* presumably the number of entries in connections/conn_oids — confirm */
int result_offset; /* offset into a result; semantics not visible here */
int* invalid_nodes; /* pointer to int; presumably an array of invalid node indexes — confirm */
Oid* conn_oids; /* node Oids of the connections */
PGXCNodePoolSlot** connections; /* connection slots */
DatabasePool* dbpool; /* database pool in use */
char* session_params; /* session parameter string */
char* temp_namespace; /* temporary namespace name */
PoolNodeType node_type; /* coordinator or datanode */
PoolNodeMode* node_mode; /* pointer to node mode(s) */
PoolConnDef* conndef; /* connection definition */
PoolConnDef* conndef_for_validate; /* connection definition used for validation */
bool is_retry; /* retry flag */
PoolAgent* agent; /* owning pool agent */
} PoolGeneralInfo;
/*
 * The number of entries in connSlots is recorded in connSlotNums,
 * so the two values are bound together to keep them consistent.
 */
/* A group of connection slots kept together with its slot count. */
typedef struct PoolSlotGroup {
PGXCNodePoolSlot** connSlots; /* the connection slots */
Oid* connOids; /* node Oids; presumably parallel to connSlots — confirm */
uint32 connSlotNums; /* number of connection slots */
uint32 offset; /* offset; semantics not visible in this header */
} PoolSlotGroup;
/*
 * Invalid backend entry.
 * Collects info from the pooler and pgxc_node. An invalid backend
 * may hold several invalid connections to different nodes.
 */
/*
 * Describes one invalid backend. A single backend may hold invalid
 * connections to several nodes, so node names are kept as a list.
 * NOTE(review): hedged field notes are inferred from names — confirm.
 */
typedef struct {
    ThreadId tid;                /* thread id of the backend */
    char** node_name;            /* node names; presumably num_nodes entries — confirm */
    bool is_thread_pool_session; /* whether the backend is a thread pool session */
    /*
     * used to send signal.
     * if IS_THREAD_POOL_WORKER, set to session_ctr_index;
     * else is MyProcPid
     */
    ThreadId send_sig_pid;
    int num_nodes;               /* presumably the number of entries in node_name — confirm */
    int total_len;               /* total length; what is measured is not visible here */
    bool is_connecting;          /* whether the backend is currently connecting */
} InvalidBackendEntry;
/*
 * PoolConnectionInfo entry for the pg_pooler_status view
 */
/* One row of pooler connection information (pg_pooler_status). NOTE(review): hedged field notes are inferred from names — confirm. */
typedef struct {
char* database; /* database name */
char* user_name; /* user name */
ThreadId tid; /* thread id */
char* pgoptions; /* pgoptions string */
bool in_use; /* whether the connection is in use */
Oid nodeOid; /* Oid of the node */
char *session_params; /* session parameter string */
int fdsock; /* presumably the socket file descriptor — confirm */
ThreadId remote_pid; /* thread id on the remote side */
uint32 used_count; /* presumably how many times the connection was used — confirm */
int idx; /* index; what it indexes is not visible here */
int streamid; /* stream id */
} PoolConnectionInfo;
/*
 * Records global connection status for the function 'comm_check_connection_status'.
 * The instance of this struct in g_instance.pooler_cxt is accessed through
 * the g_GlobalConnStatus macro.
 */
/* Container of ConnectionStatus entries plus a mutex. */
typedef struct GlobalConnStatus {
struct ConnectionStatus **connEntries; /* ConnectionStatus entry pointers (struct tag used because the typedef appears later) */
int totalEntriesCount; /* presumably the number of entries in connEntries — confirm */
pthread_mutex_t connectionStatusLock; /* mutex; presumably guards the two fields above — confirm */
} GlobalConnStatus;
/*
 * ConnectionStatus entry for the pg_conn_status view
 */
/* Connection status toward one remote node. */
typedef struct ConnectionStatus {
Oid remote_nodeoid; /* Oid of the remote node */
char *remote_name; /* name of the remote node */
char *remote_host; /* host of the remote node */
int remote_port; /* port of the remote node */
bool is_connected; /* whether the connection is established */
bool no_error_occur; /* true while no error has occurred */
int sock; /* presumably the socket descriptor — confirm */
} ConnectionStatus;
/* Address and connect-time information of one node. */
typedef struct PGXCNodeConnectionInfo {
Oid nodeoid; /* Oid of the node */
char* host; /* host */
int port; /* port */
int64 cn_connect_time; /* connect time on the coordinator side; unit not visible here — confirm */
int64 dn_connect_time; /* connect time on the datanode side; unit not visible here — confirm */
} PGXCNodeConnectionInfo;
/* Aggregate pooler statistics; this is the parameter type of pooler_get_connection_statinfo(). */
typedef struct {
int count; /* presumably the number of connections — confirm */
int free_count; /* presumably the number of free connections — confirm */
int64 memory_size; /* memory size; unit not visible here */
int64 free_memory_size; /* memory size of the free part; unit not visible here */
int dbpool_count; /* presumably the number of database pools — confirm */
int nodepool_count; /* presumably the number of node pools — confirm */
} PoolConnStat;
/* Shard information of a datanode together with its primary node. NOTE(review): relationship between the fields is inferred from names — confirm. */
typedef struct DatanodeShardInfo
{
Oid s_nodeoid; /* Oid of the node */
NameData s_data_shardname; /* shard name */
Oid s_primary_nodeoid; /* Oid of the primary node */
NameData s_primary_nodename; /* name of the primary node */
} DatanodeShardInfo;
/* Pooler process identity: a setter-style call and the matching boolean query. NOTE(review): inferred from names — confirm. */
extern void PGXCPoolerProcessIam(void);
extern bool IsPGXCPoolerProcess(void);
/* Pool manager set-up and tear-down; both return an int status whose convention is not visible in this header. */
extern int PoolManagerInit(void);
extern int PoolManagerDestroy(void);
/*
 * Get handle to pool manager. This function should be called just before
 * forking off a new session. It creates PoolHandle, PoolAgent and a pipe between
 * them. PoolAgent is stored within Postmaster's memory context and Session
 * closes it later. PoolHandle is returned and should be stored in a local
 * variable. After forking off it can be stored in global memory, so it will
 * only be accessible by the process running the session.
 */
/* Returns a PoolHandle pointer. NOTE(review): failure behavior (NULL vs. error report) is not visible in this header — confirm. */
extern PoolHandle* GetPoolManagerHandle(void);
/*
 * Called from Postmaster (Coordinator) after fork. Closes one end of the pipe and
 * frees the memory occupied by the PoolHandle.
 */
/* handle: the PoolHandle to close. NOTE(review): presumably must not be used after this call — confirm. */
extern void PoolManagerCloseHandle(PoolHandle* handle);
/*
 * Gracefully close the connection to the PoolManager
 */
/* Takes no arguments and returns nothing. */
extern void PoolManagerDisconnect(void);
/* Returns a char* string. NOTE(review): presumably the session's option string; ownership of the buffer is not visible here — confirm. */
extern char* session_options(void);
/*
 * Called from the Session process after fork(). Associates the handle with the
 * session for subsequent calls. Associates the session with the specified
 * database and initializes the respective connection pool.
 */
/* handle: pool handle for this session; database / user_name / pgoptions: read-only strings describing the session. */
extern void PoolManagerConnect(PoolHandle* handle, const char* database, const char* user_name, const char* pgoptions);
/*
 * Reconnect to the pool manager.
 * This simply does a disconnection followed by a reconnection.
 */
/* Takes no arguments and returns nothing. */
extern void PoolManagerReconnect(void);
/*
 * Save a SET command in the pooler.
 * The command is run on the agent's existing connections
 * and stored in the pooler agent to be replayed when new connections
 * are requested.
 */
/* command_type selects the kind of command; name is optional and defaults to NULL. */
extern int PoolManagerSetCommand(PoolCommandType command_type, const char* set_command, const char* name = NULL);
/* Both calls take a datanode list and a coordinator list. */
extern bool PoolManagerConnectionStatus(List* datanodelist, List* coordlist);
extern PoolConnDef* PoolManagerGetConnections(List* datanodelist, List* coordlist);
/* is_missing is optional and defaults to false. */
extern bool PoolManagerCleanConnection(
List* datanodelist, List* coordlist, const char* dbname, const char* username, bool is_missing = false);
extern bool PoolManagerCheckConnectionInfo(void);
extern void PoolManagerReloadConnectionInfo(void);
/* proc_pids and isthreadpool are pointer-to-pointer parameters; NOTE(review): presumably output arrays sized by the int return value — confirm. */
extern int PoolManagerAbortTransactions(
const char* dbname, const char* username, ThreadId** proc_pids, bool** isthreadpool);
/* status_array holds array_size bytes. NOTE(review): presumably CONN_STATE_* codes, one per connection — confirm. */
extern void PoolManagerReleaseConnections(const char* status_array, int array_size, bool has_error);
/* count is passed by reference; NOTE(review): presumably receives the number of returned entries — confirm. */
extern InvalidBackendEntry* PoolManagerValidateConnection(bool clear, const char* co_node_name, uint32& count);
/* dn_list / co_list are read-only int arrays with dn_count / co_count entries. */
extern int PoolManagerCancelQuery(int dn_count, const int* dn_list, int co_count, const int* co_list);
extern int PoolManagerStopQuery(int dn_count, const int* dn_list);
extern bool IsPoolHandle(void);
extern bool test_conn(PGXCNodePoolSlot* slot, Oid nodeid);
extern PoolAgent* get_poolagent(void);
/* ptrSlot is a pointer to the slot pointer. NOTE(review): presumably so the callee can clear the caller's pointer — confirm. */
extern void release_connection(PoolAgent* agent, PGXCNodePoolSlot** ptrSlot, Oid node, bool force_destroy);
/* epfd: NOTE(review): presumably an epoll descriptor (this header pulls in <sys/epoll.h> under ENABLE_MULTIPLE_NODES); unit of timeout is not visible — confirm. */
extern void wait_connection_ready(int finish_cnt, int total_cnt, int epfd, int timeout);
/* result is passed by reference. */
extern int fill_node_define(Oid *cn_oids, Oid *dn_oids, int cn_num, int dn_num,
NodeDefinition **nodeDef_list, Oid &result);
extern ConnectionStatus *fill_conn_entry(int total_cnt, NodeDefinition **nodeDef_list, int epfd, int *finish_cnt);
extern int PoolManagerSendLocalCommand(int dn_count, const int* dn_list, int co_count, const int* co_list);
extern void PoolManagerInitPoolerAgent();
/* Each takes a pointer to an entry pointer and returns int; NOTE(review): presumably the entry count — confirm. */
extern int get_pool_connection_info(PoolConnectionInfo** connectionEntry);
extern int check_connection_status(ConnectionStatus **connectionEntry);
extern void set_pooler_ping(bool mod);
/* conn_stat: pointer to the PoolConnStat handed to the callee. */
extern void pooler_get_connection_statinfo(PoolConnStat* conn_stat);
/* Register/delete a session parameter identified by name. */
extern int register_pooler_session_param(const char* name, const char* queryString, PoolCommandType command_type);
extern int delete_pooler_session_params(const char* name);
extern void agent_reset_session(PoolAgent* agent, bool need_reset);
/* Oid-to-node-name lookup and its cache life cycle. NOTE(review): ownership of the returned string is not visible here. */
extern const char* GetNodeNameByOid(Oid nodeOid);
extern void InitOidNodeNameMappingCache();
extern void CleanOidNodeNameMappingCache();
/* addrArray has connNum entries; returns int*. NOTE(review): meaning and ownership of the result are not visible here — confirm. */
extern int* StreamConnectNodes(libcommaddrinfo** addrArray, int connNum);
#ifdef ENABLE_MULTIPLE_NODES
extern void free_agent(PoolAgent* agent);
void destroy_slots(List* slots);
extern int get_connection(DatabasePool* dbPool, Oid nodeid, PGXCNodePoolSlot** slot, PoolNodeType node_type);
extern void decrease_node_size(DatabasePool* dbpool, Oid nodeid);
extern PGXCNodePool* acquire_nodepool(DatabasePool* dbPool, Oid nodeid, PoolNodeType node_type);
extern int node_info_check(PoolAgent* agent);
extern int node_info_check_pooler_stateless(PoolAgent* agent);
extern void agent_init(PoolAgent* agent, const char* database, const char* user_name, const char* pgoptions);
extern void agent_destroy(PoolAgent* agent);
extern PoolAgent* agent_create(void);
extern int agent_session_command(PoolAgent* agent, const char* set_command,
PoolCommandType command_type, bool append_sess_param);
extern int agent_set_command(PoolAgent* agent, const char* set_command,
PoolCommandType command_type, bool append_sess_param);
extern int agent_temp_command(PoolAgent* agent, const char* namespace_name);
extern DatabasePool* create_database_pool(const char* database, const char* user_name, const char* pgoptions);
extern void reload_database_pools(PoolAgent* agent);
extern DatabasePool* find_database_pool(const char* database, const char* user_name, const char* pgoptions);
extern int send_local_commands(PoolAgent* agent, int dn_count, const int* dn_list, int co_count, const int* co_list);
extern int cancel_query_on_connections(
PoolAgent* agent, int dn_count, const int* dn_list, int co_count, const int* co_list);
extern int stop_query_on_connections(PoolAgent* agent, int dn_count, const int* dn_list);
extern void agent_release_connections(PoolAgent* agent, bool force_destroy, const char* status_array = NULL);
void release_connection(PoolAgent* agent, PGXCNodePoolSlot** slot, Oid node, bool force_destroy);
extern void destroy_slot(PGXCNodePoolSlot* slot);
extern List* destroy_node_pool(PGXCNodePool* node_pool, List* slots);
extern int clean_connection(List* nodelist, const char* database, const char* user_name);
extern ThreadId* abort_pids(int* count, ThreadId pid, const char* database, const char* user_name, bool** isthreadpool);
extern char* build_node_conn_str(Oid nodeid, DatabasePool* dbPool, bool isNeedSlave);
extern void agent_acquire_connections_parallel(
PoolAgent* agent, PoolConnDef* result, List* datanodelist, List* coordlist);
extern void agent_acquire_connections_start(PoolAgent* agent, List* datanodelist, List* coordlist);
extern void agent_acquire_connections_end(PoolAgent* agent);
extern bool is_current_node_during_connecting(
PoolAgent* agent, const NodeRelationInfo *needClearNodeArray, int waitClearCnt, char node_type);
extern void reset_params_htab(HTAB* htab, bool);
extern int PGXCNodeSendSetParallel(PoolAgent* agent, const char* set_command);
extern char* MakePoolerSessionParams(PoolCommandType commandType);
extern int get_nodeinfo_from_matric(
char nodeType, int needCreateArrayLen, NodeRelationInfo *needCreateNodeArray);
extern void pooler_sleep_interval_time(bool *poolValidateCancel);
extern HTAB* create_user_slothash(const char* tabname, long nelem);
extern void agent_verify_node_size(PGXCNodePool* nodePool);
extern PGXCNodePoolSlot* alloc_slot_mem(DatabasePool* dbpool);
extern char* GenerateSqlCommand(int argCount, ...);
extern void reload_user_name_pgoptions(PoolGeneralInfo* info, PoolAgent* agent, PGXCNodePoolSlot* slot);
extern bool release_slot_to_nodepool(PGXCNodePool* nodePool, bool force_destroy, PGXCNodePoolSlot* slot);
extern void free_pool_conn(PoolConnDef* conndef_for_validate);
extern void FillNodeConnectionStatus(ConnectionStatus *connsEntry, int entryCnt);
extern void RecreateGlobalConnEntries();
extern void ResetPoolerConnectionStatus();
extern void FlushPoolerConnectionStatus(Oid nodeOid);
extern MemoryContext PoolerMemoryContext;
/*
 * Allocations of core objects: Datanode connections, upper level structures,
 * connection strings, etc.
 */
/* Declared here, defined elsewhere; memory context for the pooler's core objects. */
extern MemoryContext PoolerCoreContext;
/*
 * Memory to store Agents
 */
/* Memory context used for pool agents. */
extern MemoryContext PoolerAgentContext;
/* Pooler-wide state, defined elsewhere. NOTE(review): hedged notes are inferred from names — confirm. */
extern DatabasePool* databasePools; /* presumably the head of the database pools — confirm */
extern int agentCount; /* presumably the number of entries in poolAgents — confirm */
extern PoolAgent** poolAgents; /* pool agent pointers */
extern pthread_mutex_t g_poolAgentsLock; /* mutex; presumably guards poolAgents and agentCount — confirm */
extern int MaxAgentCount; /* presumably the upper bound for agentCount — confirm */
/* Shorthand for the global connection status kept in g_instance.pooler_cxt. */
#define g_GlobalConnStatus (g_instance.pooler_cxt.globalConnStatus)
/* Protocol identifier for TCP. */
#define PROTO_TCP 1
/* Casts a handle to PoolAgent*; the argument is parenthesized so any expression is safe to pass. */
#define get_agent(handle) ((PoolAgent*)(handle))
#endif
#ifdef ENABLE_UT
/* Declarations exposed only for unit-test builds (ENABLE_UT). */
extern THR_LOCAL PoolHandle* poolHandle; /* thread-local pool handle */
extern void free_user_name_pgoptions(PGXCNodePoolSlot* slot);
extern int agent_wait_send_connection_params_pooler_reuse(NODE_CONNECTION* conn);
#endif
#endif