*
* execRemote.h
*
* Functions to execute commands on multiple Datanodes
*
*
* Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
* Portions Copyright (c) 2010-2012 Postgres-XC Development Group
*
* src/include/pgxc/execRemote.h
*
* -------------------------------------------------------------------------
*/
#ifndef EXECREMOTE_H
#define EXECREMOTE_H
#include "locator.h"
#include "nodes/nodes.h"
#include "pgxcnode.h"
#include "access/tupdesc.h"
#include "executor/tuptable.h"
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "optimizer/pgxcplan.h"
#include "pgxc/remoteCombiner.h"
#include "tcop/dest.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/lsyscache.h"
#include "utils/snapshot.h"
#include "utils/rowstore.h"
#define LPROXY_CONTINUE 0
#define LPROXY_FINISH 1
#define LPROXY_ERROR 2
#define RESPONSE_EOF EOF
#define RESPONSE_COMPLETE 0
#define RESPONSE_SUSPENDED 1
#define RESPONSE_TUPDESC 2
#define RESPONSE_DATAROW 3
#define RESPONSE_COPY 4
#define RESPONSE_BARRIER_OK 5
#define RESPONSE_PLANID_OK 6
#define RESPONSE_ANALYZE_ROWCNT 7
#define RESPONSE_SEQUENCE_OK 8
#define RESPONSE_MAXCSN_RECEIVED 9
#define REMOTE_CHECKMSG_LEN 8
#define STREAM_CHECKMSG_LEN 20
#define RESPONSE_RECURSIVE_SYNC_R 98
#define RESPONSE_RECURSIVE_SYNC_F 99
#ifdef WIN32
#define SOCK_ERRNO (WSAGetLastError())
#define SOCK_STRERROR winsock_strerror
#define SOCK_ERRNO_SET(e) WSASetLastError(e)
#else
#define SOCK_ERRNO errno
#define SOCK_STRERROR pqStrerror
#define SOCK_ERRNO_SET(e) (errno = (e))
#endif
#define ERROR_CHECK_TIMEOUT 3
#define ALARM_RETRY_COUNT 2
typedef struct CombineTag {
CmdType cmdType;
char data[COMPLETION_TAG_BUFSIZE];
} CombineTag;
typedef void (*xact_callback)(bool isCommit, const void* args);
typedef void (*strategy_func)(ParallelFunctionState*);
typedef enum RemoteXactNodeStatus {
RXACT_NODE_NONE,
RXACT_NODE_PREPARE_SENT,
RXACT_NODE_PREPARE_FAILED,
RXACT_NODE_PREPARED,
RXACT_NODE_COMMIT_SENT,
RXACT_NODE_COMMIT_FAILED,
RXACT_NODE_COMMITTED,
RXACT_NODE_ABORT_SENT,
RXACT_NODE_ABORT_FAILED,
RXACT_NODE_ABORTED
} RemoteXactNodeStatus;
typedef enum RemoteXactStatus {
RXACT_NONE,
RXACT_PREPARE_FAILED,
RXACT_PREPARED,
RXACT_COMMIT_FAILED,
RXACT_PART_COMMITTED,
RXACT_COMMITTED,
RXACT_ABORT_FAILED,
RXACT_PART_ABORTED,
RXACT_ABORTED
} RemoteXactStatus;
typedef struct RemoteXactState {
RemoteXactStatus status;
* Information about all the nodes involved in the transaction. We track
* the number of writers and readers. The first numWriteRemoteNodes entries
* in the remoteNodeHandles and remoteNodeStatus correspond to the writer
* connections and rest correspond to the reader connections.
*/
int numWriteRemoteNodes;
int numReadRemoteNodes;
int maxRemoteNodes;
PGXCNodeHandle** remoteNodeHandles;
RemoteXactNodeStatus* remoteNodeStatus;
bool preparedLocalNode;
bool need_primary_dn_commit;
char prepareGID[256];
} RemoteXactState;
#ifdef PGXC
typedef struct abort_callback_type {
xact_callback function;
void* fparams;
} abort_callback_type;
#endif
#ifdef USE_SPQ
typedef struct SpqAdpScanPagesReq {
int plan_node_id;
int direction;
uint32 nblocks;
int64_t cur_scan_iter_no;
} SpqAdpScanPagesReq;
typedef struct SpqAdpScanPagesRes {
int32 success;
BlockNumber page_start;
BlockNumber page_end;
} SpqAdpScanPagesRes;
void disconnect_qc_conn();
#endif
static inline char* GetIndexNameForStat(Oid indid, char* relname)
{
char* indname = get_rel_name(indid);
if (indname == NULL) {
ereport(LOG,
(errmsg("Analyze can not get index name by index id %u on table %s and will skip this index.",
indid, relname)));
}
return indname;
}
extern PGXCNodeHandle** DataNodeCopyBegin(const char* query, List* nodelist, Snapshot snapshot);
extern int DataNodeCopyIn(const char* data_row, int len, const char* eol, ExecNodes* exec_nodes,
PGXCNodeHandle** copy_connections, bool is_binary = false);
extern uint64 DataNodeCopyOut(ExecNodes* exec_nodes, PGXCNodeHandle** copy_connections, TupleDesc tupleDesc,
FILE* copy_file, Tuplestorestate* store, RemoteCopyType remoteCopyType);
extern void DataNodeCopyFinish(PGXCNodeHandle** copy_connections, int n_copy_connections, int primary_dn_index,
CombineType combine_type, Relation rel);
extern bool DataNodeCopyEnd(PGXCNodeHandle* handle, bool is_error);
extern int DataNodeCopyInBinaryForAll(const char* msg_buf, int len, PGXCNodeHandle** copy_connections);
extern int ExecCountSlotsRemoteQuery(RemoteQuery* node);
extern RemoteQueryState* ExecInitRemoteQuery(RemoteQuery* node, EState* estate, int eflags, bool row_plan = true);
extern void ExecEndRemoteQuery(RemoteQueryState* step, bool pre_end = false);
extern void FreeParallelFunctionState(ParallelFunctionState* state);
extern void StrategyFuncSum(ParallelFunctionState* state);
extern ParallelFunctionState* RemoteFunctionResultHandler(char* sql_statement, ExecNodes* exec_nodes,
strategy_func function, bool read_only = true, RemoteQueryExecType exec_type = EXEC_ON_DATANODES,
bool non_check_count = false, bool need_tran_block = false, bool need_transform_anyarray = false,
bool active_nodes_only = false);
extern void ExecRemoteUtility(RemoteQuery* node);
extern void ExecRemoteUtility_ParallelDDLMode(RemoteQuery* node, const char* FirstExecNode);
extern void ExecRemoteUtilityParallelBarrier(const RemoteQuery* node, const char* firstExecNode);
extern RemoteQueryState* CreateResponseCombinerForBarrier(int nodeCount, CombineType combineType);
extern void CloseCombinerForBarrier(RemoteQueryState* combiner);
extern HeapTuple* ExecRemoteUtilityWithResults(
VacuumStmt* stmt, RemoteQuery* node, ANALYZE_RQTYPE arq_type, AnalyzeMode eAnalyzeMode = ANALYZENORMAL);
extern HeapTuple* RecvRemoteSampleMessage(
VacuumStmt* stmt, RemoteQuery* node, ANALYZE_RQTYPE arq_type, AnalyzeMode eAnalyzeMode = ANALYZENORMAL);
extern int handle_response(PGXCNodeHandle* conn, RemoteQueryState* combiner, bool isdummy = false);
extern bool is_data_node_ready(PGXCNodeHandle* conn);
extern void HandleCmdComplete(CmdType commandType, CombineTag* combine, const char* msg_body, size_t len);
extern bool FetchTuple(
RemoteQueryState* combiner, TupleTableSlot* slot, ParallelFunctionState* parallelfunctionstate = NULL);
extern bool FetchTupleSimple(RemoteQueryState* combiner, TupleTableSlot* slot);
template <bool BatchFormat, bool ForParallelFunction>
extern bool FetchTupleByMultiChannel(
RemoteQueryState* combiner, TupleTableSlot* slot, ParallelFunctionState* parallelfunctionstate = NULL);
extern bool FetchBatch(RemoteQueryState* combiner, VectorBatch* batch);
extern void BufferConnection(PGXCNodeHandle* conn, bool cachedata = true);
extern void ExecRemoteQueryReScan(RemoteQueryState* node, ExprContext* exprCtxt);
extern void SetDataRowForExtParams(ParamListInfo params, RemoteQueryState* rq_state);
extern void ExecCloseRemoteStatement(const char* stmt_name, List* nodelist);
extern void ExecSetTempObjectIncluded(void);
extern bool ExecIsTempObjectIncluded(void);
extern TupleTableSlot* ExecProcNodeDMLInXC(EState* estate, TupleTableSlot* sourceDataSlot, TupleTableSlot* newDataSlot);
extern void pgxc_all_success_nodes(ExecNodes** d_nodes, ExecNodes** c_nodes, char** failednodes_msg);
extern int PackConnections(RemoteQueryState* node);
extern void AtEOXact_DBCleanup(bool isCommit);
extern void set_dbcleanup_callback(xact_callback function, const void* paraminfo, int paraminfo_size);
extern void do_query(RemoteQueryState* node, bool vectorized = false);
extern bool do_query_for_planrouter(RemoteQueryState* node, bool vectorized = false);
extern void do_query_for_scangather(RemoteQueryState* node, bool vectorized = false);
extern void do_query_for_first_tuple(RemoteQueryState* node, bool vectorized, int regular_conn_count,
PGXCNodeHandle** connections, PGXCNodeHandle* primaryconnection, List* dummy_connections);
extern void free_RemoteXactState(void);
extern char* repairObjectName(const char* relname);
extern char* repairTempNamespaceName(char* schemaname);
extern void pgxc_node_report_error(RemoteQueryState* combiner, int elevel = 0);
extern void setSocketError(const char*, const char*);
extern char* getSocketError(int* errcode);
extern int getStreamSocketError(const char* str);
extern int FetchStatistics4WLM(const char* sql, void* info, Size size, strategy_func func);
extern void FetchGlobalStatistics(VacuumStmt* stmt, Oid relid, RangeVar* parentRel, bool isReplication = false);
extern bool IsInheritor(Oid relid);
extern Tuplesortstate* tuplesort_begin_merge(TupleDesc tupDesc, int nkeys, AttrNumber* attNums, Oid* sortOperators,
Oid* sortCollations, const bool* nullsFirstFlags, void* combiner, int workMem);
extern void pgxc_node_remote_savepoint(
const char* cmdString, RemoteQueryExecType exec_type, bool bNeedXid, bool bNeedBegin,
GlobalTransactionId transactionId = InvalidTransactionId);
extern bool pgxc_start_command_on_connection(PGXCNodeHandle* connection, RemoteQueryState* remotestate,
Snapshot snapshot, const char* compressPlan = NULL, int cLen = 0);
extern PGXCNodeAllHandles* connect_compute_pool(int srvtype);
extern char* generate_begin_command(void);
extern PGXCNodeAllHandles* get_exec_connections(
RemoteQueryState* planstate, ExecNodes* exec_nodes, RemoteQueryExecType exec_type);
extern void send_local_csn_min_to_ccn();
extern void csnminsync_get_global_csn_min(int conn_count, PGXCNodeHandle** connections);
extern void SendPGXCNodeCommitCsn(uint64 commit_csn);
extern void NotifyDNSetCSN2CommitInProgress();
extern void AssembleDataRow(StreamState* node);
extern bool isInLargeUpgrade();
extern uint64 get_datasize(Plan* plan, int srvtype, int* filenum);
extern void report_table_skewness_alarm(AlarmType alarmType, const char* tableName);
extern bool InternalDoQueryForPlanrouter(RemoteQueryState* node, bool vectorized);
extern List* TryGetNeededDNNum(uint64 dnneeded);
extern List* GetDnlistForHdfs(int fnum);
extern void MakeNewSpiltmap(Plan* plan, SplitMap* map);
extern List* ReassignSplitmap(Plan* plan, int dn_num);
extern int ComputeNodeBegin(int conn_count, PGXCNodeHandle** connections, GlobalTransactionId gxid);
extern void sendQuery(const char* sql, const PGXCNodeAllHandles* pgxc_handles,
int conn_count, bool isCoordinator,
RemoteQueryState* remotestate, const Snapshot snapshot);
StringInfo* SendExplainToDNs(ExplainState*, RemoteQuery*, int*, const char*);
bool CheckPrepared(RemoteQuery* rq, Oid nodeoid);
void FindExecNodesInPBE(RemoteQueryState* planstate, ExecNodes* exec_nodes, RemoteQueryExecType exec_type);
extern PGXCNodeHandle* GetRegisteredTransactionNodes(bool write);
extern bool check_errmsg_for_receive_buffer(RemoteQueryState* combiner, int tapenum,
bool* has_checked, int* has_err_idx);
#ifdef ENABLE_UT
#include "workload/cpwlm.h"
extern THR_LOCAL List* XactWriteNodes;
extern THR_LOCAL List* XactReadNodes;
extern PGXCNodeAllHandles* connect_compute_pool_for_HDFS();
extern PGXCNodeAllHandles* make_cp_conn(ComputePoolConfig** configs, int cnum, int srvtype, const char* dbname);
extern List* get_dnlist_for_hdfs(int fnum);
extern void ReloadTransactionNodes(void);
extern void PgFdwRemoteReply(StringInfo msg);
#endif
#endif