/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* remoteCombiner.h
*
* IDENTIFICATION
* src/include/pgxc/remoteCombiner.h
*
* -------------------------------------------------------------------------
*/
#ifndef REMOTE_COMBINER_H
#define REMOTE_COMBINER_H
#include "locator.h"
#include "nodes/nodes.h"
#include "pgxcnode.h"
#include "access/tupdesc.h"
#include "executor/tuptable.h"
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "optimizer/pgxcplan.h"
#include "tcop/dest.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/lsyscache.h"
#include "utils/snapshot.h"
#include "utils/rowstore.h"
/*
 * RequestType
 *
 * Kinds of request a RemoteQueryState can be tagged with (stored in its
 * request_type member). Enumerators are numbered consecutively from zero;
 * the values are spelled out so the numbering is visible at a glance.
 */
typedef enum {
    REQUEST_TYPE_NOT_DEFINED = 0, /* no request type has been set */
    REQUEST_TYPE_COMMAND = 1,
    REQUEST_TYPE_QUERY = 2,
    REQUEST_TYPE_COPY_IN = 3,
    REQUEST_TYPE_COPY_OUT = 4,
    REQUEST_TYPE_COMMITING = 5
} RequestType;
/*
 * RemoteCopyType
 *
 * Type of requests associated to a remote COPY OUT.
 * Enumerators are numbered consecutively starting at zero.
 * NOTE(review): the destination each value selects is inferred from its
 * name only (stdout / file / tuplestore) -- confirm against the users of
 * RemoteQueryState.remoteCopyType.
 */
typedef enum {
    REMOTE_COPY_NONE,      /* zero value */
    REMOTE_COPY_STDOUT,
    REMOTE_COPY_FILE,
    REMOTE_COPY_TUPLESTORE
} RemoteCopyType;
/*
 * NodeIdxInfo
 *
 * Pairs a node Oid with an integer index. RemoteQueryState holds a pointer
 * to this type in its nodeidxinfo member.
 * NOTE(review): which array or list nodeidx refers to is not visible in
 * this header -- confirm at the point of use.
 */
typedef struct NodeIdxInfo {
/* Oid of the node */
Oid nodeoid;
/* integer index recorded for that node */
int nodeidx;
} NodeIdxInfo;
/*
 * PBERunStatus
 *
 * Three-state status stored in RemoteQueryState.pbe_run_status.
 * NOTE(review): "PBE" presumably stands for Parse/Bind/Execute -- confirm.
 */
typedef enum {
    PBE_NONE = 0,           /* zero value */
    PBE_ON_ONE_NODE = 1,
    PBE_ON_MULTI_NODES = 2
} PBERunStatus;
/*
 * ParallelFunctionState
 *
 * State bundle carrying a SQL statement, the nodes to run it on, and the
 * places where results are collected. A pointer to it is passed to
 * fetchTupleFun callbacks and kept in
 * RemoteQueryState.parallel_function_state.
 *
 * NOTE(review): per-field descriptions below are inferred from field names
 * and types unless stated otherwise -- confirm against the implementation.
 */
typedef struct ParallelFunctionState {
    char* sql_statement;       /* statement text */
    ExecNodes* exec_nodes;     /* nodes associated with the statement */
    TupleDesc tupdesc;         /* tuple descriptor */
    Tuplestorestate* tupstore; /* tuple store */
    int64 result;              /* 64-bit integer result */
    void* resultset;           /* untyped result pointer; concrete type not visible here */
    MemoryContext slotmxct;    /* memory context */
    bool read_only;
    bool need_tran_block;
    /*
     * Remote nodes use anyarray_out to transform anyarray to string, so
     * transforming the string back to anyarray on the local node is required.
     * NOTE(review): the first line of the original comment is missing in
     * this copy; presumably this flag requests that local conversion.
     */
    bool need_transform_anyarray;
    /*
     * NOTE(review): only the tail of the original comment survives in this
     * copy ("... nodes connections."). Presumably limits execution to
     * active node connections -- confirm.
     */
    bool active_nodes_only;
} ParallelFunctionState;
/*
 * fetchTupleFun
 *
 * Pointer-to-function type: takes a RemoteQueryState, a TupleTableSlot and a
 * ParallelFunctionState, and returns bool. RemoteQueryState stores one such
 * pointer in its fetchTuple member.
 * NOTE(review): the meaning of the bool result is not visible in this
 * header -- confirm against the implementations.
 */
typedef bool (*fetchTupleFun)(
RemoteQueryState* combiner, TupleTableSlot* slot, ParallelFunctionState* parallelfunctionstate);
typedef struct RemoteQueryState {
ScanState ss;
int node_count;
PGXCNodeHandle** connections;
int conn_count;
int current_conn;
CombineType combine_type;
int command_complete_count;
RequestType request_type;
TupleDesc tuple_desc;
int description_count;
int copy_in_count;
int copy_out_count;
int errorCode;
char* errorMessage;
char* errorDetail;
char* errorContext;
char* hint;
char* query;
int cursorpos;
bool is_fatal_error;
bool query_Done;
RemoteDataRowData currentRow;
RowStoreManager row_store;
should be cleaned for reuse by other RemoteQuery*/
CommitSeqNo maxCSN;
bool hadrMainStandby;
* To handle special case - if this RemoteQuery is feeding sorted data to
* Sort plan and if the connection fetching data from the Datanode
* is buffered. If EOF is reached on a connection it should be removed from
* the array, but we need to know node number of the connection to find
* messages in the buffer. So we store nodenum to that array if reach EOF
* when buffering
*/
int* tapenodes;
RemoteCopyType remoteCopyType;
FILE* copy_file;
uint64 processed;
char* cursor;
char* update_cursor;
int cursor_count;
PGXCNodeHandle** cursor_connections;
char* paramval_data;
int paramval_len;
Oid* rqs_param_types;
int rqs_num_params;
int eflags;
bool eof_underlying;
Tuplestorestate* tuplestorestate;
CommandId rqs_cmd_id;
uint64 rqs_processed;
uint64 rqs_cur_processed;
void* tuplesortstate;
void* batchsortstate;
fetchTupleFun fetchTuple;
bool need_more_data;
RemoteErrorData remoteErrData;
bool need_error_check;
int64 analyze_totalrowcnt[ANALYZEDELTA];
int64 analyze_memsize[ANALYZEDELTA];
int valid_command_complete_count;
RemoteQueryType position;
bool* switch_connection;
bool refresh_handles;
NodeIdxInfo* nodeidxinfo;
PBERunStatus pbe_run_status;
bool resource_error;
char* previousNodeName;
char* serializedPlan;
ParallelFunctionState* parallel_function_state;
bool has_stream_for_loop;
#ifdef USE_SPQ
uint64 queryId;
PGXCNodeHandle** spq_connections_info;
pg_conn **nodeCons;
spq_qc_ctx* qc_ctx;
#endif
} RemoteQueryState;
/*
 * Response-combiner API.
 *
 * Only the prototypes are visible in this header; the notes below restate
 * the signatures. NOTE(review): exact semantics (ownership, what the bool
 * results mean, how the "ForBarrier" variants differ) must be confirmed
 * against the implementation.
 */

/* Takes a node count and a combine type; returns a RemoteQueryState pointer. */
extern RemoteQueryState* CreateResponseCombiner(int node_count, CombineType combine_type);
/* Barrier variant with the same parameter and return types. */
extern RemoteQueryState* CreateResponseCombinerForBarrier(int nodeCount, CombineType combineType);
/* Takes a combiner; returns bool. */
extern bool validate_combiner(RemoteQueryState* combiner);
/* Takes a combiner; returns nothing. */
extern void CloseCombiner(RemoteQueryState* combiner);
/* Barrier variant of CloseCombiner; same signature. */
extern void CloseCombinerForBarrier(RemoteQueryState* combiner);
/* Takes a combiner; returns bool. Presumably validates and then closes it -- confirm. */
extern bool ValidateAndCloseCombiner(RemoteQueryState* combiner);
#endif