*
* pgxcship.cpp
* Routines to evaluate expression shippability to remote nodes
*
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 2010-2012, Postgres-XC Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2021, openGauss Contributors
*
*
* IDENTIFICATION
* src/gausskernel/optimizer/util/pgxcship.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/transam.h"
#include "catalog/pg_class.h"
#include "catalog/pg_inherits_fn.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h"
#ifdef PGXC
#include "catalog/pg_aggregate.h"
#include "catalog/pg_trigger.h"
#endif
#include "catalog/pg_type.h"
#include "catalog/pgxc_node.h"
#include "commands/proclang.h"
#include "commands/trigger.h"
#include "foreign/foreign.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/relation.h"
#include "optimizer/clauses.h"
#include "optimizer/nodegroups.h"
#include "optimizer/pathnode.h"
#include "optimizer/pgxcplan.h"
#include "optimizer/pgxcship.h"
#include "optimizer/planmain.h"
#include "optimizer/prep.h"
#include "optimizer/tlist.h"
#include "parser/analyze.h"
#include "parser/parsetree.h"
#include "parser/parse_coerce.h"
#include "parser/parse_type.h"
#include "pgxc/pgxcnode.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/hotkey.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#ifdef STREAMPLAN
#include "optimizer/streamplan.h"
#include "pgxc/pgxc.h"
#endif
typedef bool (*CheckColocateHook)(Query* query, RangeTblEntry* rte, void* plpgsql_func);
* Shippability_context
* This context structure is used by the Fast Query Shipping walker, to gather
* information during analysing query for Fast Query Shipping.
*/
typedef struct {
bool sc_for_expr;
* of the Query or a function RangeTableEntry
* , otherwise, we are checking
* shippability of a stand-alone expression.
*/
bool sc_for_trigger;
* Query inside a trigger body.
*/
Bitmapset* sc_shippability;
* query.
*/
Query* sc_query;
int sc_query_level;
int sc_max_varlevelsup;
* variable reference in the query. If this
* value is greater than 0, the query is not
* shippable, if shipped alone.
*/
ExecNodes* sc_exec_nodes;
ExecNodes* sc_subquery_en;
* for individual subqueries. This gets
* ultimately merged with sc_exec_nodes.
*/
bool sc_groupby_has_distcol;
bool sc_light_proxy;
void* sc_plpgsql_func;
CheckColocateHook sc_plpgsql_check_colocate_hook;
bool sc_contain_column_store;
bool sc_use_star_upper_level;
bool sc_disallow_volatile_func_shippability;
bool sc_inselect;
bool sc_security_barrier;
} Shippability_context;
* ShippabilityStat
* List of reasons why a query/expression is not shippable to remote nodes.
*/
typedef enum {
SS_UNSHIPPABLE_EXPR = 0,
SS_NEED_SINGLENODE,
* there is only a single node involved.
* Athought aggregates too fit in this class, we
* have a separate status to report aggregates,
* see below.
*/
SS_NEEDS_COORD,
SS_NO_NODES,
* the query
*/
SS_UNSUPPORTED_EXPR,
* by FQS, but such expressions might be
* supported by FQS in future
*/
SS_HAS_AGG_EXPR,
SS_UNSHIPPABLE_TYPE,
SS_UNSHIPPABLE_TRIGGER,
SS_UNSHIPPABLE_FUNCTION,
SS_NEED_NO_CSTORE,
SS_UNSHIPPABLE_UDF,
} ShippabilityStat;
static bool pgxc_test_shippability_reason(Shippability_context* context, ShippabilityStat reason);
static void pgxc_set_shippability_reason(Shippability_context* context, ShippabilityStat reason);
static void pgxc_reset_shippability_reason(Shippability_context* context, ShippabilityStat reason);
static bool pgxc_shippability_walker(Node* node, Shippability_context* sc_context);
static void pgxc_set_exprtype_shippability(Oid exprtype, Shippability_context* sc_context);
static void pgxc_set_insert_shippability(Query* query, Shippability_context* sc_context);
static ExecNodes* pgxc_FQS_get_relation_nodes(RangeTblEntry* rte, Index varno, Query* query);
static ExecNodes* pgxc_FQS_find_datanodes(Shippability_context* sc_context);
static bool pgxc_query_needs_coord(Query* query);
static bool pgxc_query_contains_only_pg_catalog(List* rtable);
static bool pgxc_distinct_has_distcol(Query* query, ExecNodes* exec_nodes);
static ExecNodes* pgxc_FQS_find_datanodes_recurse(Node* node, Shippability_context* sc_context);
static ExecNodes* pgxc_FQS_datanodes_for_rtr(Index varno, Shippability_context* sc_context);
static void pgxc_replace_dist_vars_subquery(Query* query, ExecNodes* exec_nodes, Index varno);
static bool pgxc_is_trigger_shippable(int trig_idx, Relation rel = NULL);
static bool check_has_sameAlias(List* targetList);
static bool pgxc_check_shippability_in_trigger(Query* query, Shippability_context* sc_context);
static void check_insert_subquery_shippability(Query* query, Query *subquery, Shippability_context* sc_context);
static bool PgxcIsDistInfoSame(ExecNodes *innerEn, ExecNodes *outerEn);
static void make_params_transparent(Query* query, int query_level);
static bool has_consistent_execnodes(ExecNodes* exec_nodes_rte, ExecNodes* exec_nodes_qry, int* dcol_cnt);
static Expr* find_distcol_expr(void* query, Index varno, AttrNumber attrNum, Node* quals);
* A set of functions for get_var_from_node() argument
* reject: return false all time (please set to default)
* pass: return true all time (in some condition)
* restricted: switch between set of oids and return true if found.
*/
Var* get_var_from_node(Node* node, bool (*func)(Oid) = func_oid_check_reject);
bool func_oid_check_reject(Oid oid)
{
return false;
}
bool func_oid_check_restricted(Oid oid)
{
switch (oid) {
case INT1TOINT2OID:
case INT1TOINT4OID:
case INT1TOINT8OID:
case INT2TOINT4OID:
case INT4TOINT2OID:
case INT2TOINT8OID:
case INT8TOINT2OID:
case INT4TOINT8OID:
case INT8TOINT4OID:
case FLOAT4TOFLOAT8OID:
case FLOAT8TOFLOAT4OID:
return true;
break;
default:
return false;
}
return false;
}
bool func_oid_check_pass(Oid oid)
{
return true;
}
* Set the given reason in Shippability_context indicating why the query can not be
* shipped directly to remote nodes.
*/
static void pgxc_set_shippability_reason(Shippability_context* context, ShippabilityStat reason)
{
context->sc_shippability = bms_add_member(context->sc_shippability, reason);
}
* pgxc_reset_shippability_reason
* Reset reason why the query cannot be shipped to remote nodes
*/
static void pgxc_reset_shippability_reason(Shippability_context* context, ShippabilityStat reason)
{
context->sc_shippability = bms_del_member(context->sc_shippability, reason);
return;
}
* See if a given reason is why the query can not be shipped directly
* to the remote nodes.
*/
static bool pgxc_test_shippability_reason(Shippability_context* context, ShippabilityStat reason)
{
return bms_is_member(reason, context->sc_shippability);
}
* pgxc_set_exprtype_shippability
* Set the expression type shippability. For now composite types
* derived from view definitions are not shippable.
*/
static void pgxc_set_exprtype_shippability(Oid exprtype, Shippability_context* sc_context)
{
char typerelkind = '\0';
Oid relid;
relid = typeidTypeRelid(exprtype);
if (relid != InvalidOid)
typerelkind = get_rel_relkind(relid);
if (RELKIND_IS_SEQUENCE(typerelkind) ||
typerelkind == RELKIND_VIEW ||
typerelkind == RELKIND_CONTQUERY)
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_TYPE);
}
* pgxc_FQS_datanodes_for_rtr
* For a given RangeTblRef find the datanodes where corresponding data is
* located.
*/
static ExecNodes* pgxc_FQS_datanodes_for_rtr(Index varno, Shippability_context* sc_context)
{
Query* query = sc_context->sc_query;
RangeTblEntry* rte = rt_fetch(varno, query->rtable);
if (rte->security_barrier) {
sc_context->sc_security_barrier = true;
}
switch (rte->rtekind) {
case RTE_RELATION: {
bool flag = rte->relkind != RELKIND_RELATION && rte->relkind != RELKIND_FOREIGN_TABLE &&
rte->relkind != RELKIND_MATVIEW && rte->relkind != RELKIND_STREAM;
if (flag)
return NULL;
* In case of inheritance, child tables can have completely different
* Datanode distribution than parent. To handle inheritance we need
* to merge the Datanodes of the children table as well. The inheritance
* is resolved during planning, so we may not have the RTEs of the
* children here. Also, the exact method of merging Datanodes of the
* children is not known yet. So, when inheritance is requested, query
* can not be shipped.
* See prologue of has_subclass, we might miss on the optimization
* because has_subclass can return true even if there aren't any
* subclasses, but it's ok.
*/
if (rte->inh && has_subclass(rte->relid))
return NULL;
return pgxc_FQS_get_relation_nodes(rte, varno, query);
} break;
case RTE_SUBQUERY: {
ExecNodes* exec_nodes = NULL;
bool contain_column_store = sc_context->sc_contain_column_store;
bool use_star_targets = false;
* Subquery in RangeTbleEntry is not scanned while scanning the
* parent query, since we don't scan the parent query's rtable.
*/
exec_nodes = (sc_context->sc_inselect) ? sc_context->sc_exec_nodes :
pgxc_is_query_shippable(rte->subquery,
sc_context->sc_query_level + 1,
sc_context->sc_light_proxy,
&contain_column_store,
&use_star_targets);
if (contain_column_store)
sc_context->sc_contain_column_store = contain_column_store;
if (use_star_targets)
query->use_star_targets = use_star_targets;
* If the query result is distributed by HASH or MODULO, we need to
* map the Vars on which its distributed to the columns in the
* result.
*/
if (exec_nodes && IsExecNodesDistributedByValue(exec_nodes))
pgxc_replace_dist_vars_subquery(rte->subquery, exec_nodes, varno);
return exec_nodes;
} break;
case RTE_VALUES: {
if (query->commandType != CMD_INSERT)
return NULL;
ExecNodes* exec_nodes = NULL;
* In case of Values, we only optimize the INSERT case when the
* result relation is located on a single node
*/
exec_nodes = pgxc_FQS_datanodes_for_rtr((Index)linitial_int(query->resultRelations), sc_context);
if (exec_nodes != NULL && exec_nodes->nodeList != NIL &&
list_length(exec_nodes->nodeList) == 1)
return exec_nodes;
else
return NULL;
} break;
case RTE_JOIN:
case RTE_CTE:
case RTE_FUNCTION:
default:
return NULL;
}
}
* match_equivclause_recurse
* Iterating relations list recursively. Returns a list of matched Vars.
* Pushing the remaining and the list of matched Vars to the next iteration.
* Keep iterating until no new matching Vars found in the list.
* @param Var/RelabelType, Const/Param pair and iterating list
* @return rewritten list
*/
static List* match_equivclause_recurse(Node* var, Node* con, List* quals_v)
{
ListCell* qcell = NULL;
List* ret_quals = NIL;
List* tmp_quals = NIL;
Node* tmp_var = NULL;
* The input list quals_v contains all Var-Var relations
* (or RelabelType variant depends on the input node *var type)
* For each relation, matching the input var with both args of the relation.
* If matched, then pair the other half with the input con and
* make up all implied relations base on that.
* To visualize this:
* var: A con: 2 from relation 'A = 2'
* In this iteration, we update the quals_v:
* A = B and C = A and B = C
* With this:
* 2 = B and C = 2 and B = C (and B = C)
* Then push the rest to the next iteration.
*/
foreach(qcell, quals_v) {
Expr* qual_expr = (Expr*)lfirst(qcell);
OpExpr* op = (OpExpr*)qual_expr;
Node* lvar = (Node*)linitial(op->args);
Node* rvar = (Node*)lsecond(op->args);
Node* tvar = NULL;
if (IsA(lvar, RelabelType)) {
lvar = (Node*)((RelabelType*)lvar)->arg;
}
if (IsA(rvar, RelabelType)) {
rvar = (Node*)((RelabelType*)rvar)->arg;
}
if ((nodeTag(lvar) != nodeTag(var)) && (nodeTag(rvar) != nodeTag(var))) {
tmp_quals = lappend(tmp_quals, op);
continue;
}
if (equal(lvar, var)) {
tvar = rvar;
} else if (equal(rvar, var)) {
tvar = lvar;
} else {
tmp_quals = lappend(tmp_quals, op);
}
if (tvar != NULL) {
OpExpr* ret_op = (OpExpr*)copyObject(op);
ret_op->args = list_make2((Node*)copyObject(tvar), (Node*)copyObject(con));
ret_quals = lappend(ret_quals, ret_op);
if (tmp_var == NULL) {
tmp_var = tvar;
continue;
}
if (!equal(tmp_var, tvar)) {
OpExpr* tmp_op = (OpExpr*)copyObject(op);
tmp_op->args = list_make2(tmp_var, tvar);
tmp_quals = lappend(tmp_quals, tmp_op);
tmp_var = tvar;
}
}
}
if (tmp_var == NULL) {
return list_concat(ret_quals, (List*)copyObject(tmp_quals));
}
ret_quals = list_concat(ret_quals, match_equivclause_recurse(tmp_var, con, tmp_quals));
list_free_ext(tmp_quals);
return ret_quals;
}
* match_equivclause
* Matching the Vars and its target Const by traversing the Var-Const relations list.
* @param list of con-con relation and list of var-var relation
* @return combined rewritten list
*/
static List* match_equivclause(List* quals_c, List* quals_v)
{
ListCell* qcell = NULL;
List* ret_quals = NIL;
Node* var = NULL;
Node* con = NULL;
* The input list quals_c contains all Var-Const relations
* (or Var-Param or both, depends on the query tree quals)
* Carefully seperate the Const with Var and pass them to the recurse
* as parameters. Note that the query can have multiple set of relations,
* so we need to make sure nothing get removed by accident in this process.
*/
foreach(qcell, quals_c) {
Expr* qual_expr = (Expr*)lfirst(qcell);
OpExpr* op = (OpExpr*)qual_expr;
Node* arg1 = (Node*)linitial(op->args);
Node* arg2 = (Node*)lsecond(op->args);
if (IsA(arg1, RelabelType)) {
arg1 = (Node*)((RelabelType*)arg1)->arg;
}
if (IsA(arg2, RelabelType)) {
arg2 = (Node*)((RelabelType*)arg2)->arg;
}
if ((IsA(arg1, Const)) || (IsA(arg1, Param))) {
var = arg2;
con = arg1;
} else if ((IsA(arg2, Const)) || (IsA(arg2, Param))) {
var = arg1;
con = arg2;
} else {
return NIL;
}
* Update lists if more than one Const is given.
* We only need those relations which has not been rewritten by recurse function.
* Therefore, we do a intersection on Var list before get into the next Const.
* Then filter out those extra relations in return quals until the last iteration.
*/
if (ret_quals != NULL) {
quals_v = list_intersection(ret_quals, quals_v);
ret_quals = list_difference(ret_quals, quals_v);
}
if (IsA(var, Var) || IsA(var, RelabelType)) {
ret_quals = list_concat_unique(ret_quals, match_equivclause_recurse(var, con, quals_v));
}
}
if (ret_quals != NIL) {
ret_quals = list_concat_unique(ret_quals, (List*)copyObject(quals_c));
}
return ret_quals;
}
static bool IsVarConstCond(const Node* arg1, const Node* arg2)
{
if ((IsA(arg1, Const) || IsA(arg1, Param)) && IsA(arg2, Var)) {
return true;
}
if ((IsA(arg2, Const) || IsA(arg2, Param)) && IsA(arg1, Var)) {
return true;
}
return false;
}
* rewrite_equivclause
* Find matching Vars and complete the clauses with equivalence classes.
* Rewrite the jointree quals to make it more straightforward.
* Node that we do not want to handle any kind of type casts here since they
* are not part of the equivalent class anymore.
* @param quals from FromExpr of the query
*/
static Node* rewrite_equivclause(Node *quals)
{
ListCell* qcell = NULL;
List* quals_all = NIL;
List* ret_quals = NIL;
List* tmp_quals_c = NIL;
List* tmp_quals_v = NIL;
if (!IsA(quals, List)) {
quals_all = make_ands_implicit((Expr*)quals);
} else {
quals_all = (List*)quals;
}
* Shuffle through all nodes. Seperate Var-Const and Var-Var relations
* into two seperate lists. Filter out unwanted expressions in quals.
* Append the unsupported/unwanted expression to the return quals.
*/
foreach(qcell, quals_all) {
Expr* qual_expr = (Expr*)lfirst(qcell);
OpExpr* op = (OpExpr*)qual_expr;
if (!IsA(qual_expr, OpExpr) || !isEqualExpr((Node*)op)) {
ret_quals = lappend(ret_quals, (Node*)copyObject(op));
continue;
}
Node* arg1 = (Node*)linitial(op->args);
Node* arg2 = (Node*)lsecond(op->args);
if (IsA(arg1, RelabelType)) {
arg1 = (Node*)((RelabelType*)arg1)->arg;
}
if (IsA(arg2, RelabelType)) {
arg2 = (Node*)((RelabelType*)arg2)->arg;
}
if (nodeTag(arg1) == nodeTag(arg2)) {
if (nodeTag(arg1) == T_Const) {
ret_quals = lappend(ret_quals, (Node*)copyObject(op));
continue;
}
tmp_quals_v = lappend(tmp_quals_v, op);
} else if (IsVarConstCond(arg1, arg2)) {
tmp_quals_c = lappend(tmp_quals_c, op);
} else {
ret_quals = lappend(ret_quals, (Node*)copyObject(op));
}
}
if (tmp_quals_c == NIL || tmp_quals_v == NIL) {
list_free_ext(tmp_quals_c);
list_free_ext(tmp_quals_v);
list_free_ext(quals_all);
list_free_deep(ret_quals);
return quals;
}
ret_quals = list_concat(ret_quals, match_equivclause(tmp_quals_c, tmp_quals_v));
if (ret_quals == NULL) {
ret_quals = quals_all;
} else {
list_free_ext(quals_all);
}
Node* result = (Node*)make_andclause(ret_quals);
list_free_ext(tmp_quals_c);
list_free_ext(tmp_quals_v);
return result;
}
* pgxc_FQS_find_datanodes_recurse
* Recursively find whether the sub-tree of From Expr rooted under given node is
* pushable and if yes where.
*/
static ExecNodes* pgxc_FQS_find_datanodes_recurse(Node* node, Shippability_context* sc_context)
{
Query* query = sc_context->sc_query;
if (node == NULL)
return NULL;
switch (nodeTag(node)) {
case T_FromExpr: {
FromExpr* from_expr = (FromExpr*)node;
ListCell* lcell = NULL;
bool first = false;
ExecNodes* result_en = NULL;
* For INSERT commands, we won't have any entries in the from list.
* Get the datanodes using the resultRelation index.
*/
if (query->commandType != CMD_SELECT && !from_expr->fromlist)
return pgxc_FQS_datanodes_for_rtr(linitial_int(query->resultRelations), sc_context);
* All the entries in the From list are considered to be INNER
* joined with the quals as the JOIN condition. Get the datanodes
* for the first entry in the From list. For every subsequent entry
* determine whether the join between the relation in that entry and
* the cumulative JOIN of previous entries can be pushed down to the
* datanodes and the corresponding set of datanodes where the join
* can be pushed down.
*/
first = true;
result_en = NULL;
foreach (lcell, from_expr->fromlist) {
Node* fromlist_entry = (Node*)lfirst(lcell);
ExecNodes* tmp_en = NULL;
ExecNodes* en = pgxc_FQS_find_datanodes_recurse(fromlist_entry, sc_context);
* If any entry in fromlist is not shippable, jointree is not
* shippable
*/
if (en == NULL) {
FreeExecNodes(&result_en);
return NULL;
}
if (!first && query->commandType != CMD_SELECT) {
FreeExecNodes(&result_en);
return NULL;
}
if (first) {
first = false;
result_en = en;
continue;
}
tmp_en = result_en;
* Check whether the JOIN is pushable to the datanodes and
* find the datanodes where the JOIN can be pushed to. In FQS
* the query is shippable if only all the expressions are
* shippable. Hence assume that the targetlists of the joining
* relations are shippable.
*/
result_en = pgxc_is_join_shippable(result_en, en, false, false, JOIN_INNER, from_expr->quals);
FreeExecNodes(&tmp_en);
}
return result_en;
} break;
case T_RangeTblRef: {
RangeTblRef* rtr = (RangeTblRef*)node;
return pgxc_FQS_datanodes_for_rtr(rtr->rtindex, sc_context);
} break;
case T_JoinExpr: {
JoinExpr* join_expr = (JoinExpr*)node;
ExecNodes* len = NULL;
ExecNodes* ren = NULL;
ExecNodes* result_en = NULL;
if (query->commandType != CMD_SELECT)
return NULL;
len = pgxc_FQS_find_datanodes_recurse(join_expr->larg, sc_context);
ren = pgxc_FQS_find_datanodes_recurse(join_expr->rarg, sc_context);
if (len == NULL || ren == NULL) {
FreeExecNodes(&len);
FreeExecNodes(&ren);
return NULL;
}
* Check whether the JOIN is pushable or not, and find the datanodes
* where the JOIN can be pushed to. In FQS the query is shippable if
* only all the expressions are shippable. Hence assume that the
* targetlists of the joining relations are shippable.
*/
result_en = pgxc_is_join_shippable(ren, len, false, false, join_expr->jointype, join_expr->quals);
FreeExecNodes(&len);
FreeExecNodes(&ren);
return result_en;
} break;
default:
return NULL;
break;
}
return NULL;
}
* pgxc_FQS_find_datanodes
* Find the list of nodes where to ship query.
*/
static ExecNodes* pgxc_FQS_find_datanodes(Shippability_context* sc_context)
{
ExecNodes* exec_nodes = NULL;
Query* query = sc_context->sc_query;
* For SELECT, the datanodes required to execute the query is obtained from
* the join tree of the query
*/
exec_nodes = pgxc_FQS_find_datanodes_recurse((Node*)query->jointree, sc_context);
if (exec_nodes != NULL && exec_nodes->nodeList != NIL) {
* If this is the highest level query in the query tree and
* relations involved in the query are such that ultimate JOIN is
* replicated JOIN, choose only one of them.
* If we do this for lower level queries in query tree, we might loose
* chance because common nodes are left out.
*/
if (IsExecNodesReplicated(exec_nodes) &&
((exec_nodes->accesstype == RELATION_ACCESS_READ_FOR_UPDATE && exec_nodes->nodeList->length > 1) ||
exec_nodes->accesstype == RELATION_ACCESS_READ) &&
sc_context->sc_query_level == 0 && !sc_context->sc_inselect) {
List* tmp_list = exec_nodes->nodeList;
exec_nodes->nodeList = GetPreferredReplicationNode(exec_nodes->nodeList);
list_free_ext(tmp_list);
}
return exec_nodes;
} else if (exec_nodes != NULL && exec_nodes->en_expr != NIL) {
* If we found the expression which can decide which can be used to decide
* where to ship the query, use that
*/
return exec_nodes;
}
return NULL;
}
static void free_distcol_info(Datum* distcol_value, bool* distcol_isnull, Oid* distcol_type, List* idx_dist)
{
pfree_ext(distcol_value);
pfree_ext(distcol_isnull);
pfree_ext(distcol_type);
list_free_ext(idx_dist);
}
static void free_and_reset_exec_nodes_info(ExecNodes* nodes, ExecNodes* rel_exec_nodes, RelationLocInfo* rel_loc_info)
{
list_free_deep(rel_exec_nodes->en_expr);
list_free_ext(rel_exec_nodes->primarynodelist);
list_free_ext(rel_exec_nodes->nodeList);
rel_exec_nodes->en_expr = NIL;
rel_exec_nodes->primarynodelist = nodes->primarynodelist;
rel_exec_nodes->nodeList = nodes->nodeList;
rel_exec_nodes->en_relid = rel_loc_info->relid;
pfree_ext(nodes);
}
* pgxc_FQS_find_insert_multiple_values_nodes
* Identifies the insert multi-values statement and determines
* whether the query can be executed by a single DN. If yes,
* the DN information is returned.
*/
static ExecNodes* pgxc_FQS_find_insert_multiple_values_nodes(Query* query, RangeTblEntry* values_rte,
RelationLocInfo* rel_loc_info, ExecNodes* rel_exec_nodes, List* shardingColNames, bool pseudoTsDistcol = false)
{
ListCell* cell = NULL;
foreach (cell, rel_exec_nodes->en_expr)
{
Expr* distcol_expr = (Expr*)lfirst(cell);
if (!IsA(distcol_expr, Var)) {
continue;
}
RangeTblEntry* rte_distcol_expr = rt_fetch(((Var*)distcol_expr)->varno, query->rtable);
if (rte_distcol_expr->rtekind != RTE_VALUES) {
return NULL;
}
}
int len;
if (pseudoTsDistcol) {
len = list_length(shardingColNames);
} else {
len = list_length(rel_loc_info->partAttrNum);
}
Datum* distcol_value = (Datum*)palloc(len * sizeof(Datum));
bool* distcol_isnull = (bool*)palloc(len * sizeof(bool));
Oid* distcol_type = (Oid*)palloc(len * sizeof(Oid));
List* idx_dist = NIL;
int i;
ListCell* cell1 = NULL;
ListCell* cell2 = NULL;
bool distcol_is_sharding = true;
ExecNodes* nodes = NULL;
foreach (cell, values_rte->values_lists) {
List* row_list = (List*)lfirst(cell);
i = 0;
list_free_ext(idx_dist);
forboth(cell1, rel_exec_nodes->en_expr, cell2, shardingColNames)
{
Expr* distcol_expr = (Expr*)lfirst(cell1);
Node* discol_node = NULL;
if (IsA(distcol_expr, Var)) {
Var* distcol_var = (Var*)distcol_expr;
discol_node = (Node*)list_nth(row_list, distcol_var->varattno - 1);
} else {
discol_node = (Node*)distcol_expr;
}
char* colName = (char*)lfirst(cell2);
ELOG_FIELD_NAME_START(colName);
distcol_expr = (Expr*)eval_const_expressions_params(NULL, (Node*)discol_node, query->boundParamsQ);
ELOG_FIELD_NAME_END;
if (distcol_expr != NULL && IsA(distcol_expr, Const)) {
Const* const_expr = (Const*)distcol_expr;
distcol_value[i] = const_expr->constvalue;
distcol_isnull[i] = const_expr->constisnull;
distcol_type[i] = const_expr->consttype;
idx_dist = lappend_int(idx_dist, i);
i++;
} else {
distcol_is_sharding = false;
break;
}
}
if (!distcol_is_sharding) {
break;
}
ExecNodes* tmp_nodes = GetRelationNodes(
rel_loc_info, distcol_value, distcol_isnull, distcol_type, idx_dist, rel_exec_nodes->accesstype);
if (nodes == NULL) {
nodes = tmp_nodes;
} else {
* when it's list/range distributed table,
* inserted values may match none of DNs and nodeList maybe null.
* so we must check nodeList first.
*/
if (nodes->nodeList == NULL || tmp_nodes->nodeList == NULL ||
linitial_int(nodes->nodeList) != linitial_int(tmp_nodes->nodeList)) {
distcol_is_sharding = false;
break;
}
}
}
free_distcol_info(distcol_value, distcol_isnull, distcol_type, idx_dist);
if (distcol_is_sharding && nodes != NULL && nodes->nodeList != NIL) {
free_and_reset_exec_nodes_info(nodes, rel_exec_nodes, rel_loc_info);
list_free_ext(shardingColNames);
return rel_exec_nodes;
} else {
return NULL;
}
}
* pgxc_FQS_find_insert_single_values_nodes
* Determines whether the query can be executed by a single DN. If yes,
* the DN information is returned.
*/
static ExecNodes* pgxc_FQS_find_insert_single_values_nodes(
Query* query, RelationLocInfo* rel_loc_info, ExecNodes* rel_exec_nodes, List* shardingColNames,
bool pseudoTsDistcol = false)
{
int len;
if (pseudoTsDistcol) {
len = list_length(shardingColNames);
} else {
len = list_length(rel_loc_info->partAttrNum);
}
Datum* distcol_value = (Datum*)palloc(len * sizeof(Datum));
bool* distcol_isnull = (bool*)palloc(len * sizeof(bool));
Oid* distcol_type = (Oid*)palloc(len * sizeof(Oid));
List* idx_dist = NIL;
int i = 0;
ListCell* cell1 = NULL;
ListCell* cell2 = NULL;
forboth(cell1, rel_exec_nodes->en_expr, cell2, shardingColNames)
{
Expr* distcol_expr = (Expr*)lfirst(cell1);
char* colName = (char*)lfirst(cell2);
ELOG_FIELD_NAME_START(colName);
distcol_expr = (Expr*)eval_const_expressions_params(NULL, (Node*)distcol_expr, query->boundParamsQ);
ELOG_FIELD_NAME_END;
if (distcol_expr != NULL && IsA(distcol_expr, Const)) {
Const* const_expr = (Const*)distcol_expr;
distcol_value[i] = const_expr->constvalue;
distcol_isnull[i] = const_expr->constisnull;
distcol_type[i] = const_expr->consttype;
idx_dist = lappend_int(idx_dist, i);
i++;
} else
break;
}
if (cell1 == NULL) {
ExecNodes* nodes = GetRelationNodes(
rel_loc_info, distcol_value, distcol_isnull, distcol_type, idx_dist, rel_exec_nodes->accesstype);
free_distcol_info(distcol_value, distcol_isnull, distcol_type, idx_dist);
if (nodes != NULL && nodes->nodeList != NIL) {
free_and_reset_exec_nodes_info(nodes, rel_exec_nodes, rel_loc_info);
list_free_ext(shardingColNames);
return rel_exec_nodes;
} else {
return NULL;
}
} else {
free_distcol_info(distcol_value, distcol_isnull, distcol_type, idx_dist);
return NULL;
}
}
* ts_get_taglist
* Return a list of all tag columns in the timeserise table.
* This function applies only to timeseries relation which use the pseudo distribution column.
* So the hidden column is checked in the function.
*/
static List* ts_get_taglist(Oid relid)
{
Relation rel = relation_open(relid, AccessShareLock);
TupleDesc tup_desc = RelationGetDescr(rel);
FormData_pg_attribute* attr = tup_desc->attrs;
List* tag_name_list = NIL;
bool valid_ts_table = false;
char* str = NULL;
for (int i = 0; i < tup_desc->natts; i++) {
if (attr[i].attkvtype == ATT_KV_TAG && IsTypeDistributable(attr[i].atttypid)) {
str = pstrdup(NameStr(attr[i].attname));
if (str != NULL) {
tag_name_list = lappend(tag_name_list, makeString(str));
} else {
ereport(ERROR, (errmodule(MOD_TIMESERIES),
errmsg("Timeseries relation %u cloud not find tag column %d", relid, i)));
}
}
if (attr[i].attkvtype == ATT_KV_HIDE) {
valid_ts_table = true;
}
}
if (!valid_ts_table) {
relation_close(rel, AccessShareLock);
list_free_ext(tag_name_list);
return NULL;
}
Assert(tag_name_list != NULL);
relation_close(rel, AccessShareLock);
return tag_name_list;
}
static RangeTblEntry* get_insert_multiple_values_rte(Query* query)
{
ListCell* cell = NULL;
RangeTblEntry* rte_tmp = NULL;
RangeTblEntry* values_rte = NULL;
foreach (cell, query->rtable) {
rte_tmp = (RangeTblEntry*)lfirst(cell);
if (rte_tmp->rtekind == RTE_VALUES) {
if (values_rte != NULL) {
ereport(ERROR,
(errcode(ERRCODE_RESTRICT_VIOLATION),
errmsg("too many values RTEs in INSERT")));
}
values_rte = rte_tmp;
}
}
return values_rte;
}
* pgxc_FQS_get_relation_nodes
* Return ExecNodes structure so as to decide which node the query should
* execute on. If it is possible to set the node list directly, set it.
* Otherwise set the appropriate distribution column expression or relid in
* ExecNodes structure.
*/
static ExecNodes* pgxc_FQS_get_relation_nodes(RangeTblEntry* rte, Index varno, Query* query)
{
CmdType command_type = query->commandType;
bool for_update = query->rowMarks ? true : false;
ExecNodes* rel_exec_nodes = NULL;
RelationAccessType rel_access = RELATION_ACCESS_READ;
RelationLocInfo* rel_loc_info = NULL;
AssertEreport(rte == rt_fetch(varno, (query->rtable)), MOD_OPT, "could not find range table entry.");
switch (command_type) {
case CMD_SELECT:
if (for_update)
rel_access = RELATION_ACCESS_READ_FOR_UPDATE;
else
rel_access = RELATION_ACCESS_READ;
break;
case CMD_UPDATE:
case CMD_DELETE:
case CMD_MERGE:
rel_access = RELATION_ACCESS_UPDATE;
break;
case CMD_INSERT:
rel_access = RELATION_ACCESS_INSERT;
break;
default:
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE),
errmsg("Unrecognised command type %d", command_type)));
break;
}
rel_loc_info = GetRelationLocInfo(rte->relid);
if (rel_loc_info == NULL)
return NULL;
if (list_length(rel_loc_info->nodeList) == 1) {
rel_exec_nodes = GetRelationNodes(rel_loc_info, NULL, NULL, NULL, NULL, rel_access);
return rel_exec_nodes;
}
rel_exec_nodes = GetRelationNodesByQuals(
(void*)query, rte->relid, varno, query->jointree->quals, rel_access, query->boundParamsQ, true);
if (list_length(query->rtable) != 1 && rel_exec_nodes == NULL)
rel_exec_nodes = GetRelationNodes(rel_loc_info, NULL, NULL, NULL, NULL, rel_access);
if (rel_exec_nodes == NULL)
return NULL;
if (IsExecNodesDistributedByValue(rel_exec_nodes)) {
List* dist_var = pgxc_get_dist_var(varno, rte, query->targetList);
rel_exec_nodes->en_dist_vars = list_make1(dist_var);
}
if (rel_access == RELATION_ACCESS_INSERT && IsRelationDistributedByValue(rel_loc_info)) {
ListCell* lc = NULL;
TargetEntry* tle = NULL;
* If the INSERT is happening on a table distributed by value of a
* column, find out the
* expression for distribution column in the targetlist, and stick in
* in ExecNodes, and clear the nodelist. Execution will find
* out where to insert the row.
*/
List* distributeCol = GetRelationDistribColumn(rel_loc_info);
ListCell* cell = NULL;
List* shardingColNames = NIL;
bool pseudoTsDistcol = false;
* replace the distribute list to the tag column list.
*/
if (rte->orientation == REL_TIMESERIES_ORIENTED && list_length(distributeCol) == 1 &&
strcmp(TS_PSEUDO_DIST_COLUMN, strVal(lfirst(list_head(distributeCol)))) == 0) {
ereport(DEBUG1, (errmodule(MOD_TIMESERIES),
errmsg("In FQS path replace hidetag distribution.")));
if (ts_get_taglist(rte->relid) != NULL) {
distributeCol = ts_get_taglist(rte->relid);
pseudoTsDistcol = true;
}
}
foreach (cell, distributeCol) {
foreach (lc, query->targetList) {
tle = (TargetEntry*)lfirst(lc);
if (tle->resjunk)
continue;
if (strcmp(tle->resname, strVal(lfirst(cell))) == 0) {
tle = (TargetEntry*)eval_const_expressions(NULL, (Node*)tle);
rel_exec_nodes->en_expr = lappend(rel_exec_nodes->en_expr, tle->expr);
shardingColNames = lappend(shardingColNames, tle->resname);
break;
}
}
if (lc == NULL) {
list_free_deep(rel_exec_nodes->en_expr);
list_free_ext(shardingColNames);
return NULL;
}
}
if (query->boundParamsQ || !u_sess->pcache_cxt.query_has_params) {
RangeTblEntry* values_rte = get_insert_multiple_values_rte(query);
if (pseudoTsDistcol) {
if (values_rte != NULL) {
ExecNodes* nodes = pgxc_FQS_find_insert_multiple_values_nodes(
query, values_rte, rel_loc_info, rel_exec_nodes, shardingColNames, true);
if (nodes != NULL) {
return nodes;
}
} else {
ExecNodes* nodes = pgxc_FQS_find_insert_single_values_nodes(
query, rel_loc_info, rel_exec_nodes, shardingColNames, true);
if (nodes != NULL) {
return nodes;
}
}
} else {
if (values_rte != NULL) {
ExecNodes* nodes = pgxc_FQS_find_insert_multiple_values_nodes(
query, values_rte, rel_loc_info, rel_exec_nodes, shardingColNames);
if (nodes != NULL) {
return nodes;
}
} else {
ExecNodes* nodes = pgxc_FQS_find_insert_single_values_nodes(
query, rel_loc_info, rel_exec_nodes, shardingColNames);
if (nodes != NULL) {
return nodes;
}
}
}
}
list_free_ext(distributeCol);
list_free_ext(rel_exec_nodes->primarynodelist);
rel_exec_nodes->primarynodelist = NULL;
list_free_ext(rel_exec_nodes->nodeList);
rel_exec_nodes->nodeList = NULL;
rel_exec_nodes->en_relid = rel_loc_info->relid;
list_free_ext(shardingColNames);
}
return rel_exec_nodes;
}
static bool isIncludeAlldistcol(List* dist_vars, List* queryClause, List* targetList)
{
ListCell* l_cell = NULL;
ListCell* lcell = NULL;
ListCell* cell = NULL;
List* dis_list = NIL;
Var* disVar = NULL;
foreach (l_cell, dist_vars) {
dis_list = (List*)lfirst(l_cell);
foreach (cell, dis_list) {
disVar = (Var*)lfirst(cell);
foreach (lcell, queryClause) {
SortGroupClause* sgc = (SortGroupClause*)lfirst(lcell);
Node* sgc_expr = NULL;
if (!IsA(sgc, SortGroupClause))
continue;
sgc_expr = get_sortgroupclause_expr(sgc, targetList);
if (unlikely(sgc_expr == NULL)) {
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_UNEXPECTED_NULL_VALUE),
errmsg("sgc_expr should not be NULL")));
}
if (IsA(sgc_expr, Var) && equal(sgc_expr, disVar)) {
break;
}
}
if (NULL == lcell) {
break;
}
}
if (NULL == cell) {
return true;
}
}
return false;
}
bool pgxc_query_has_distcolgrouping(Query* query, ExecNodes* exec_nodes)
{
if (exec_nodes == NULL || exec_nodes->en_dist_vars == NIL || !query->groupClause)
return false;
return isIncludeAlldistcol(exec_nodes->en_dist_vars, query->groupClause, query->targetList);
}
static bool pgxc_distinct_has_distcol(Query* query, ExecNodes* exec_nodes)
{
if (exec_nodes == NULL || exec_nodes->en_dist_vars == NIL || !query->distinctClause)
return false;
return isIncludeAlldistcol(exec_nodes->en_dist_vars, query->distinctClause, query->targetList);
}
* pgxc_shippability_walker
* walks the query/expression tree routed at the node passed in, gathering
* information which will help decide whether the query to which this node
* belongs is shippable to the Datanodes.
*
* The function should try to walk the entire tree analysing each subquery for
* shippability. If a subquery is shippable but not the whole query, we would be
* able to create a RemoteQuery node for that subquery, shipping it to the
* Datanode.
*
* Return value of this function is governed by the same rules as
* expression_tree_walker(), see prologue of that function for details.
*/
static bool pgxc_shippability_walker(Node* node, Shippability_context* sc_context)
{
if (node == NULL)
return false;
* kind of node and find out under what conditions query with this node can
* be shippable. For each node, update the context (add fields if
* necessary) so that decision whether to FQS the query or not can be made.
* Every node which has a result is checked to see if the result type of that
* expression is shippable.
*/
switch (nodeTag(node)) {
case T_Const:
case T_UserVar:
pgxc_set_exprtype_shippability(exprType(node), sc_context);
break;
* For placeholder nodes the shippability of the node, depends upon the
* expression which they refer to. It will be checked separately, when
* that expression is encountered.
*/
case T_CaseTestExpr:
pgxc_set_exprtype_shippability(exprType(node), sc_context);
break;
* record_in() function throws error, thus requesting a result in the
* form of anonymous record from datanode gets into error. Hence, if the
* top expression of a target entry is ROW(), it's not shippable.
*/
case T_TargetEntry: {
TargetEntry* tle = (TargetEntry*)node;
if (tle->expr) {
char typtype = get_typtype(exprType((Node*)tle->expr));
if (!typtype || typtype == TYPTYPE_PSEUDO) {
if (sc_context->sc_contain_column_store)
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
else {
pgxc_set_shippability_reason(sc_context, SS_NEED_NO_CSTORE);
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
}
}
}
} break;
case T_SortGroupClause:
if (sc_context->sc_for_expr)
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
break;
case T_CoerceViaIO: {
CoerceViaIO* cvio = (CoerceViaIO*)node;
Oid input_type = exprType((Node*)cvio->arg);
Oid output_type = cvio->resulttype;
CoercionContext cc;
cc = ((cvio->coerceformat == COERCE_IMPLICIT_CAST) ? COERCION_IMPLICIT : COERCION_EXPLICIT);
* Internally we use IO coercion for types which do not have casting
* defined for them e.g. cstring::date. If such casts are sent to
* the datanode, those won't be accepted. Hence such casts are
* unshippable. Since it will be shown as an explicit cast.
*
* If single node with no column table, it can be shipped
* because cn and dn should do the same casting.
*/
if (IsA(cvio->arg, Const))
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
else if (!can_coerce_type(1, &input_type, &output_type, cc)) {
if (sc_context->sc_contain_column_store)
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
else {
pgxc_set_shippability_reason(sc_context, SS_NEED_NO_CSTORE);
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
}
}
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
* Nodes, which are shippable if the tree rooted under these nodes is
* shippable
*/
case T_CoerceToDomainValue:
* Mostly, CoerceToDomainValue node appears in DDLs,
* do we handle DDLs here?
*/
case T_FieldSelect:
case T_NamedArgExpr:
case T_RelabelType:
case T_BoolExpr:
* We might need to take into account the kind of boolean
* operator we have in the quals and see if the corresponding
* function is immutable.
*/
case T_ArrayCoerceExpr:
case T_ConvertRowtypeExpr:
case T_CaseExpr:
case T_ArrayExpr:
case T_RowExpr:
case T_CollateExpr:
case T_CoalesceExpr:
case T_XmlExpr:
case T_NullTest:
case T_BooleanTest:
case T_CoerceToDomain:
case T_HashFilter:
pgxc_set_exprtype_shippability(exprType(node), sc_context);
break;
case T_List:
case T_RangeTblRef:
case T_UpsertExpr:
break;
case T_ArrayRef:
* When multiple values of of an array are updated at once
* FQS planner cannot yet handle SQL representation correctly.
* So disable FQS in this case and let standard planner manage it.
*/
case T_FieldStore:
* openGauss deparsing logic does not handle the FieldStore
* for more than one fields (see processIndirection()). So, let's
* handle it through standard planner, where whole row will be
* constructed.
*/
case T_SetToDefault:
* we should actually check whether the default value to
* be substituted is shippable to the Datanode.
*/
if (sc_context->sc_contain_column_store)
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
else {
pgxc_set_shippability_reason(sc_context, SS_NEED_NO_CSTORE);
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
}
pgxc_set_exprtype_shippability(exprType(node), sc_context);
break;
case T_Var: {
Var* var = (Var*)node;
* if a subquery references an upper level variable, that query is
* not shippable, if shipped alone.
*/
if (var->varlevelsup > (unsigned int)sc_context->sc_max_varlevelsup)
sc_context->sc_max_varlevelsup = var->varlevelsup;
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
case T_Param: {
Param* param = (Param*)node;
if (param->paramkind != PARAM_EXTERN)
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
case T_CurrentOfExpr: {
* Ideally we should not see CurrentOf expression here, it
* should have been replaced by the CTID = ? expression. But
* still, no harm in shipping it as is.
*/
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
case T_Aggref: {
Aggref* aggref = (Aggref*)node;
* An aggregate is completely shippable to the Datanode, if the
* whole group resides on that Datanode. This will be clear when
* we see the GROUP BY clause.
* agglevelsup is minimum of variable's varlevelsup, so we will
* set the sc_max_varlevelsup when we reach the appropriate
* VARs in the tree.
*/
pgxc_set_shippability_reason(sc_context, SS_HAS_AGG_EXPR);
* If a stand-alone expression to be shipped, is an
* 1. aggregate with ORDER BY, DISTINCT directives, it needs all
* the qualifying rows
* 2. aggregate without collection function
* 3. aggregate with polymorphic transition type, the
* the transition type needs to be resolved to correctly interpret
* the transition results from Datanodes.
* Hence, such an expression can not be shipped to the datanodes.
*/
if (aggref->aggorder || aggref->aggdistinct || aggref->agglevelsup || !aggref->agghas_collectfn ||
IsPolymorphicType(aggref->aggtrantype)) {
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
* we will append finalised agg func to the pushed down SQL query.
* But for array_agg/string_agg, the finalised func are not supported
* calling as SQL functions.because they are internal functions.
* So not push down them.
*/
if (!pgxc_is_internal_agg_final_func(aggref->aggfnoid))
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
}
* If the aggregate is on anyarray,
* and coordinates will execute on the aggregate output,
* such as doing ORBER BY or encoding mismatches between client and server,
* such expression can not be shipped to the datanodes.
* This is because in parse_agg() function,
* datanode will replace agg->aggtype with agg->aggtrantype.
* After fix that, remember to remove this.
* For example:
* postgres=# show server_encoding;
* server_encoding
* -----------------
* UTF8
* (1 row)
* postgres=# set client_encoding=SQL_ASCII;
* SET
* postgres=# select max(f1) from arraggtest;
* ERROR: cache lookup failed for type 2779086336
* postgres=# explain verbose select max(f1) from arraggtest;
* QUERY PLAN
* --------------------------------------------------------------
* Data Node Scan (cost=0.00..0.00 rows=0 width=0)
* Output: (max(arraggtest.f1))
* Node/s: datanode2
* Remote query: SELECT max(f1) AS max FROM public.arraggtest
* (4 rows)
* postgres=# select max(f1) from arraggtest;
* ERROR: cache lookup failed for type 2779086336
*/
if (aggref->aggtrantype == ANYARRAYOID) {
if (!sc_context->sc_light_proxy ||
(sc_context->sc_query != NULL && sc_context->sc_query->sortClause != NIL))
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
}
if (sc_context->sc_light_proxy) {
HeapTuple aggTuple;
Form_pg_aggregate aggform;
aggTuple = SearchSysCache(AGGFNOID, ObjectIdGetDatum(aggref->aggfnoid), 0, 0, 0);
if (!HeapTupleIsValid(aggTuple))
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_CACHE_LOOKUP_FAILED),
errmsg("cache lookup failed for aggregate %u", aggref->aggfnoid)));
aggform = (Form_pg_aggregate)GETSTRUCT(aggTuple);
if (OidIsValid(aggform->aggfinalfn))
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
ReleaseSysCache(aggTuple);
}
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
case T_FuncExpr: {
FuncExpr* funcexpr = (FuncExpr*)node;
* NOTICE: it's too restrictive not to ship non-immutable
* functions to the Datanode. We need a better way to see what
* can be shipped to the Datanode and what can not be.
*/
shipping_context context;
errno_t rc = memset_s(&context, sizeof(context), 0, sizeof(context));
securec_check(rc, "\0", "\0");
context.is_randomfunc_shippable = u_sess->opt_cxt.is_randomfunc_shippable && IS_STREAM_PLAN;
context.is_ecfunc_shippable = IS_STREAM_PLAN;
context.disallow_volatile_func_shippable = sc_context->sc_disallow_volatile_func_shippability;
if (!pgxc_is_func_shippable(funcexpr->funcid, &context)) {
if (funcexpr->funcid == NEXTVALFUNCOID)
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
else
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_UDF);
} else if (funcexpr->funcid == TESTSKEWNESSRETURNTYPE) {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
}
if (pgxc_is_shippable_func_contain_any(funcexpr->funcid)) {
return expression_tree_walker(
(Node*)funcexpr->args, (bool (*)())pgxc_shippability_walker, (void*)sc_context);
}
* If this is a stand alone expression and the function returns a
* set of rows, we need to handle it along with the final result of
* other expressions. So, it can not be shippable.
*/
if (funcexpr->funcretset && sc_context->sc_for_expr)
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_FUNCTION);
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
case T_OpExpr:
case T_DistinctExpr:
case T_NullIfExpr:
{
* All of these three are structurally equivalent to OpExpr, so
* cast the node to OpExpr and check if the operator function is
* immutable. See NOTICE item for FuncExpr.
*/
OpExpr* op_expr = (OpExpr*)node;
Oid opfuncid = OidIsValid(op_expr->opfuncid) ? op_expr->opfuncid : get_opcode(op_expr->opno);
if (!OidIsValid(opfuncid) || !pgxc_is_func_shippable(opfuncid))
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
case T_ScalarArrayOpExpr: {
* Check if the operator function is shippable to the Datanode
* NOTICE: see immutability note for FuncExpr above
*/
ScalarArrayOpExpr* sao_expr = (ScalarArrayOpExpr*)node;
Oid opfuncid = OidIsValid(sao_expr->opfuncid) ? sao_expr->opfuncid : get_opcode(sao_expr->opno);
if (!OidIsValid(opfuncid) || !pgxc_is_func_shippable(opfuncid))
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
} break;
case T_RowCompareExpr:
case T_MinMaxExpr: {
* Should we be checking the comparision operator
* functions as well, as we did for OpExpr OR that check is
* unnecessary. Operator functions are always shippable?
* Otherwise this node should be treated similar to other
* "shell" nodes.
*/
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
case T_Query: {
Query* query = (Query*)node;
bool hasTrigger = false;
if (NULL == sc_context->sc_query) {
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_OPTIMIZER_INCONSISTENT_STATE),
errmsg("Invalid query for shippable check.")));
}
if (query->returningList)
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
if (sc_context->sc_for_expr) {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
break;
}
* We are checking shippability of whole query, go ahead. The query
* in the context should be same as the query being checked
*/
AssertEreport(query == sc_context->sc_query, MOD_OPT, "fqs query is unexpected.");
if (query->commandType == CMD_UTILITY && IsA(query->utilityStmt, CreateTableAsStmt))
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
if (!sc_context->sc_light_proxy && query->commandType == CMD_SELECT && sc_context->sc_query_level > 0 &&
check_has_sameAlias(query->targetList)) {
sc_context->sc_use_star_upper_level = true;
}
* If it is a PBE-style SELECT query with multiple tables,
* we only ship it if single node for now.
*/
if (query->commandType == CMD_SELECT && list_length(query->rtable) > 1 &&
u_sess->pcache_cxt.query_has_params) {
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
}
if (query->hasRecursive)
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
* If the query needs Coordinator for evaluation or the query can be
* completed on Coordinator itself, we don't ship it to the Datanode
*/
if (pgxc_query_needs_coord(query))
pgxc_set_shippability_reason(sc_context, SS_NEEDS_COORD);
* whether it can be completely evaluated on the Datanode just like SELECT
* queries. But we need to be careful while finding out the Datanodes to
* execute the query on, esp. for the result relations. If one happens to
* remove/change this restriction, make sure you change
* pgxc_FQS_get_relation_nodes appropriately.
* For now UPDATE and DELETE DMLs with single rtable entry are candidates for FQS
* INSERT DMLs will be checked later within pgxc_set_insert_shippability.
*/
if (query->commandType != CMD_SELECT && query->commandType != CMD_INSERT &&
list_length(query->rtable) > 1) {
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
}
* group by operator. Each group by maybe include all distribute also possible not
* include all distribute therefore to ap function we need send plan to datanodes.
*
* However, we can ship the query if single node.
*/
if (query->groupingSets) {
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
}
* In following conditions query is shippable when there is only one
* Datanode involved
* 1. the query has aggregagtes without grouping by distribution
* column
* 2. the query has window functions
* 3. the query has ORDER BY clause
* 4. the query has Distinct clause without distribution column in
* distinct clause
* 5. the query has limit and offset clause
*/
if (query->hasWindowFuncs || query->sortClause || query->limitOffset || query->limitCount)
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
if (sc_context->sc_for_trigger && (CMD_UPDATE == query->commandType || CMD_DELETE == query->commandType)) {
if (!pgxc_check_shippability_in_trigger(query, sc_context))
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
}
* If the distribution column in insert query is a set, the query won't be shippable
*/
bool temp_random_shippabe = u_sess->opt_cxt.is_randomfunc_shippable;
if (CMD_INSERT == query->commandType) {
if (sc_context->sc_for_trigger) {
if (!pgxc_check_shippability_in_trigger(query, sc_context))
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
} else {
pgxc_set_insert_shippability(query, sc_context);
}
}
if (query->commandType != CMD_SELECT) {
RangeTblEntry* rte = (RangeTblEntry*)list_nth(query->rtable, linitial_int(query->resultRelations) - 1);
RelationLocInfo* locator = GetRelationLocInfo(rte->relid);
if (NULL != locator && IsLocatorReplicated(locator->locatorType)) {
sc_context->sc_disallow_volatile_func_shippability = true;
}
}
if (sc_context->sc_light_proxy) {
ListCell* lc = NULL;
foreach (lc, query->rtable) {
RangeTblEntry* rte = (RangeTblEntry*)lfirst(lc);
if (rte->rtekind == RTE_RELATION && (rte->relkind == RELKIND_VIEW ||
rte->relkind == RELKIND_CONTQUERY)) {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
break;
}
}
}
* walk the entire query tree to analyse the query. We will walk the
* range table, when examining the FROM clause. No need to do it
* here. However, for light proxy, we will find view that doesn't
* in FROM clause, so traverse it here
*/
if (query_tree_walker(query, (bool (*)())pgxc_shippability_walker, sc_context, QTW_IGNORE_RANGE_TABLE)) {
u_sess->opt_cxt.is_randomfunc_shippable = temp_random_shippabe;
return true;
}
u_sess->opt_cxt.is_randomfunc_shippable = temp_random_shippabe;
* NOTICE:
* There is a subquery in this query, which references Vars in the upper
* query. For now stop shipping such queries. We should get rid of this
* condition.
*
* For single node, we can ship the query.
*/
if (sc_context->sc_max_varlevelsup != 0)
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
* Check shippability of triggers on this query. Don't consider
* TRUNCATE triggers; it's a utility statement and triggers are
* handled explicitly in ExecuteTruncate()
*/
if (query->commandType == CMD_UPDATE || query->commandType == CMD_INSERT ||
query->commandType == CMD_DELETE) {
RangeTblEntry* rte = (RangeTblEntry*)list_nth(query->rtable, linitial_int(query->resultRelations) - 1);
if (!pgxc_check_triggers_shippability(rte->relid, query->commandType, &hasTrigger)) {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_TRIGGER);
} else {
if (u_sess->attr.attr_sql.enable_trigger_shipping && hasTrigger && !sc_context->sc_for_trigger)
query->isRowTriggerShippable = true;
else
query->isRowTriggerShippable = false;
}
}
* If the query contains equivalent classes, rewrite the quals to boost performance.
* e.g.
* Rewrite [... WHERE A = B and B = 3] into [... WHERE A = 3 and B = 3]
* The original quals are recovered afterwards. All changes on quals will be discarded.
*/
bool has_rewritten_quals = false;
Node* copied_quals = NULL;
Node* original_quals = NULL;
if (query->jointree != NULL && IsA((Node*)query->jointree, FromExpr)) {
FromExpr* from_expr = (FromExpr*)query->jointree;
if (and_clause(from_expr->quals)) {
copied_quals = (Node*)copyObject(from_expr->quals);
original_quals = from_expr->quals;
query->jointree->quals = copied_quals;
from_expr->quals = rewrite_equivclause(from_expr->quals);
has_rewritten_quals = true;
}
}
* Walk the join tree of the query and find the
* Datanodes needed for evaluating this query
*/
sc_context->sc_exec_nodes = pgxc_FQS_find_datanodes(sc_context);
if (has_rewritten_quals) {
query->jointree->quals = original_quals;
pfree_ext(copied_quals);
}
* Presence of aggregates or having clause, implies grouping. In
* such cases, the query won't be shippable unless 1. there is only
* a single node involved 2. GROUP BY clause has distribution column
* in it. In the later case aggregates for a given group are entirely
* computable on a single datanode, because all the rows
* participating in particular group reside on that datanode.
* The distribution column can be of any relation
* participating in the query. All the rows of that relation with
* the same value of distribution column reside on same node.
*/
if ((query->hasAggs || query->havingQual || query->groupClause) && sc_context->sc_exec_nodes &&
!pgxc_query_has_distcolgrouping(query, sc_context->sc_exec_nodes))
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
* If distribution column of any relation is present in the distinct
* clause, values for that column across nodes will differ, thus two
* nodes won't be able to produce same result row. Hence in such
* case, we can execute the queries on many nodes managing to have
* distinct result.
*/
if (query->distinctClause && sc_context->sc_exec_nodes &&
!pgxc_distinct_has_distcol(query, sc_context->sc_exec_nodes))
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
} break;
case T_FromExpr: {
if (sc_context->sc_for_expr)
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
* We will examine the jointree of query separately to determine the
* set of datanodes where to execute the query.
* If this is an INSERT query with quals, resulting from say
* conditional rule, we can not handle those in FQS, since there is
* not SQL representation for such quals.
*/
if (sc_context->sc_query != NULL && sc_context->sc_query->commandType == CMD_INSERT &&
((FromExpr*)node)->quals)
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
} break;
case T_WindowFunc: {
WindowFunc* winf = (WindowFunc*)node;
* A window function can be evaluated on a Datanode if there is
* only one Datanode involved.
*/
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
* row_number() is not shippable for replication tables, since
* the data may not have the same order on all datanodes.
*/
if (winf->winfnoid == ROWNUMBERFUNCOID) {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
}
* A window function is not shippable as part of a stand-alone
* expression. If the window function is non-immutable, it can not
* be shipped to the datanodes.
*/
if (sc_context->sc_for_expr || !pgxc_is_func_shippable(winf->winfnoid)) {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
}
pgxc_set_exprtype_shippability(exprType(node), sc_context);
} break;
case T_WindowClause: {
* A window function can be evaluated on a Datanode if there is
* only one Datanode involved.
*/
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
* A window function is not shippable as part of a stand-alone
* expression
*/
if (sc_context->sc_for_expr)
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
} break;
case T_JoinExpr:
if (sc_context->sc_for_expr)
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
* The shippability of join will be deduced while
* examining the jointree of the query. Nothing to do here
*/
break;
case T_SubLink: {
* We need to walk the tree in sublink to check for its
* shippability. We need to call pgxc_is_query_shippable() on Query
* instead of this function so that every subquery gets a different
* context for itself. We should avoid the default expression walker
* getting called on the subquery. At the same time we don't want to
* miss any other member (current or future) of this structure, from
* being scanned. So, copy the SubLink structure with subselect
* being NULL and call expression_tree_walker on the copied
* structure.
*/
SubLink sublink = *(SubLink*)node;
ExecNodes* sublink_en = NULL;
* Walk the query and find the nodes where the query should be
* executed and node distribution. Merge this with the existing
* node list obtained for other subqueries. If merging fails, we
* can not ship the whole query.
*/
if (IsA(sublink.subselect, Query)) {
bool use_star_targets = false;
bool contain_column_store = sc_context->sc_contain_column_store;
sublink_en = pgxc_is_query_shippable((Query*)(sublink.subselect),
sc_context->sc_query_level + 1,
sc_context->sc_light_proxy,
&contain_column_store,
&use_star_targets);
if (contain_column_store)
sc_context->sc_contain_column_store = contain_column_store;
if (use_star_targets && sc_context->sc_query) {
sc_context->sc_query->use_star_targets = use_star_targets;
}
} else
sublink_en = NULL;
* to evaluate on, don't bother to merge again.
*/
if (!pgxc_test_shippability_reason(sc_context, SS_NO_NODES)) {
* If this is the first time we are finding out the nodes for
* SubLink, we don't have anything to merge, just assign.
*/
if (sc_context->sc_subquery_en == NULL)
sc_context->sc_subquery_en = sublink_en;
* Merge the accumulated SubLink ExecNodes and the
* ExecNodes for this subquery.
*/
else if (sublink_en != NULL) {
sc_context->sc_subquery_en = pgxc_merge_exec_nodes(sublink_en, sc_context->sc_subquery_en);
} else
sc_context->sc_subquery_en = NULL;
* If we didn't find a cumulative ExecNodes, set shippability
* reason, so that we don't bother merging future sublinks.
*/
if (sc_context->sc_subquery_en == NULL)
pgxc_set_shippability_reason(sc_context, SS_NO_NODES);
} else
AssertEreport(!sc_context->sc_subquery_en, MOD_OPT, "unexpected exec nodes in shippability_context.");
pgxc_set_exprtype_shippability(exprType(node), sc_context);
sublink.subselect = NULL;
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
return expression_tree_walker((Node*)&sublink, (bool (*)())pgxc_shippability_walker, sc_context);
} break;
case T_GroupingFunc:
case T_PlaceHolderVar: {
if (sc_context->sc_contain_column_store)
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
else {
pgxc_set_shippability_reason(sc_context, SS_NEED_NO_CSTORE);
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
}
} break;
case T_MergeAction: {
* the original MERGE are not supported for shippability.
*/
if (sc_context->sc_query != NULL && sc_context->sc_query->commandType != CMD_INSERT) {
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
return false;
}
} break;
case T_GroupingId:
case T_SubPlan:
case T_AlternativeSubPlan:
case T_CommonTableExpr:
case T_SetOperationStmt:
case T_AppendRelInfo:
case T_PlaceHolderInfo: {
pgxc_set_shippability_reason(sc_context, SS_UNSUPPORTED_EXPR);
* These expressions are not supported for shippability entirely, so
* there is no need to walk trees underneath those. If we do so, we
* might walk the trees with wrong context there.
*/
return false;
} break;
default:
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE),
errmsg("unrecognized node type: %d", (int)nodeTag(node))));
break;
}
return expression_tree_walker(node, (bool (*)())pgxc_shippability_walker, (void*)sc_context);
}
* Returns whether or not the rtable (and its subqueries)
* contain foreign table entries.
*/
bool pgxc_query_contains_foreign_table(List* rtable)
{
ListCell* item = NULL;
foreach (item, rtable) {
RangeTblEntry* rte = (RangeTblEntry*)lfirst(item);
if (rte->relkind == RELKIND_FOREIGN_TABLE || rte->relkind == RELKIND_STREAM) {
return true;
} else if (rte->rtekind == RTE_SUBQUERY && pgxc_query_contains_foreign_table(rte->subquery->rtable))
return true;
}
return false;
}
* pgxc_query_needs_coord
* Check if the query needs Coordinator for evaluation or it can be completely
* evaluated on Coordinator. Return true if so, otherwise return false.
*/
static bool pgxc_query_needs_coord(Query* query)
{
* If the query is an EXEC DIRECT on the same Coordinator where it's fired,
* it should not be shipped
*/
if (query->is_local)
return true;
* If the query involves just the catalog tables, and is not an EXEC DIRECT
* statement, it can be evaluated completely on the Coordinator. No need to
* involve Datanodes.
*/
if (NIL != query->rtable && pgxc_query_contains_only_pg_catalog(query->rtable))
return true;
if (NIL != query->rtable && pgxc_query_contains_foreign_table(query->rtable))
return true;
return false;
}
* Returns whether or not the rtable (and its subqueries)
* only contain pg_catalog entries.
*/
static bool pgxc_query_contains_only_pg_catalog(List* rtable)
{
ListCell* item = NULL;
foreach (item, rtable) {
RangeTblEntry* rte = (RangeTblEntry*)lfirst(item);
if (rte->rtekind == RTE_RELATION) {
if (!is_sys_table(rte->relid))
return false;
} else if (rte->rtekind == RTE_SUBQUERY && !pgxc_query_contains_only_pg_catalog(rte->subquery->rtable))
return false;
}
return true;
}
* contains_subquery_walker
* Sublink walker, returns true iff found.
*/
static bool contains_subquery_walker(Node* node, void* context)
{
if (node == NULL) {
return false;
}
contain_subquery_context* ctxt = (contain_subquery_context*)context;
if (IsA(node, Query)) {
Query* qry = (Query*)node;
ctxt->subquery_exprs = lappend(ctxt->subquery_exprs, qry);
bool result = query_tree_walker((Query*)node, (bool (*)())contains_subquery_walker, context, 0);
return result;
}
return expression_tree_walker(node, (bool (*)())contains_subquery_walker, context);
}
* contains_specified_sublink
* Walke through node, find all sublinks.
* We don't have any specifications now, add them to the context if needed.
*/
List* contains_subquery(Node* node, contain_subquery_context* context)
{
(void)query_or_expression_tree_walker(node, (bool (*)())contains_subquery_walker, (void*)context, 0);
return context->subquery_exprs;
}
contain_subquery_context init_contain_subquery_context()
{
contain_subquery_context context;
context.subquery_exprs = NIL;
return context;
}
* make_params_transparent
* Make the boundParams transparent to all subqueries, if exists.
* @param query and current query level
* @return nothing.
*/
static void make_params_transparent(Query* query, int query_level)
{
ListCell* lc = NULL;
List* subquery_list = NULL;
if (query_level != 0) {
return;
}
contain_subquery_context context = init_contain_subquery_context();
subquery_list = contains_subquery((Node*)query, &context);
foreach (lc, subquery_list) {
Query* tmp_subquery = (Query*)lfirst(lc);
tmp_subquery->boundParamsQ = query->boundParamsQ;
}
}
* pgxc_is_query_shippable
* This function calls the query walker to analyse the query to gather
* information like Constraints under which the query can be shippable, nodes
* on which the query is going to be executed etc.
* Based on the information gathered, it decides whether the query can be
* executed on Datanodes directly without involving Coordinator.
* If the query is shippable this routine also returns the nodes where the query
* should be shipped. If the query is not shippable, it returns NULL.
*/
ExecNodes* pgxc_is_query_shippable(
Query* query, int query_level, bool light_proxy, bool* contain_column_store, bool* use_star_up)
{
Shippability_context sc_context;
ExecNodes* exec_nodes = NULL;
bool canShip = true;
Bitmapset* shippability = NULL;
if (pgxc_query_contains_foreign_table(query->rtable)) {
return NULL;
}
if (query->hasRowSecurity) {
return NULL;
}
errno_t errorno = EOK;
errorno = memset_s(&sc_context, sizeof(sc_context), '\0', sizeof(sc_context));
securec_check(errorno, "", "");
sc_context.sc_inselect = false;
sc_context.sc_query = query;
sc_context.sc_query_level = query_level;
sc_context.sc_for_expr = false;
sc_context.sc_light_proxy = light_proxy;
sc_context.sc_contain_column_store = (contain_column_store != NULL && *contain_column_store) ?
true : contains_column_tables(query->rtable);
sc_context.sc_use_star_upper_level = false;
sc_context.sc_security_barrier = false;
if (query->boundParamsQ != NULL) {
make_params_transparent(query, query_level);
}
* We might have already decided not to ship the query to the Datanodes, but
* still walk it anyway to find out if there are any subqueries which can be
* shipped.
*/
pgxc_shippability_walker((Node*)query, &sc_context);
if (sc_context.sc_security_barrier) {
return NULL;
}
#ifdef STREAMPLAN
* For nextval function, we disable FQS, but should allow stream plan for bulkload
* So we differentiate the unship reason for nextval from other function
*/
if (pgxc_test_shippability_reason(&sc_context, SS_UNSHIPPABLE_FUNCTION)) {
set_stream_off();
}
#endif
exec_nodes = sc_context.sc_exec_nodes;
* The shippability context contains two ExecNodes, one for the subLinks
* involved in the Query and other for the relation involved in FromClause.
* They are computed at different times while scanning the query. Merge both
* of them in pgxc_merge_exec_nodes. If query doesn't have SubLinks, we
* don't need to consider corresponding ExecNodes.
*/
if (query->hasSubLinks && exec_nodes) {
if (sc_context.sc_subquery_en != NULL) {
exec_nodes = pgxc_merge_exec_nodes(exec_nodes, sc_context.sc_subquery_en);
} else {
FreeExecNodes(&exec_nodes);
}
}
* Look at the information gathered by the walker in Shippability_context and that
* in the Query structure to decide whether we should ship this query
* directly to the Datanode or not
*/
* If the planner was not able to find the Datanodes to the execute the
* query, the query is not completely shippable. So, return NULL
*/
if (exec_nodes == NULL) {
return NULL;
}
* The original can be saved away */
shippability = bms_copy(sc_context.sc_shippability);
* If the query has an expression which renders the shippability to single
* node, and query needs to be shipped to more than one node, it can not be
* shipped.
*/
if (bms_is_member(SS_NEED_SINGLENODE, shippability)) {
* if nodeList has no nodes, it ExecNodes will have en_expr to know
* the nodes where to execute. Given the values of en_expr, it is true
* the query is executed on some single node.
* If query has result replicated across nodes, it's as good as having a
* single node.
* If query had returning list, it is only shippable with (real) single node.
*/
canShip = (list_length(exec_nodes->nodeList) > 1 &&
(!IsExecNodesReplicated(exec_nodes) || query->returningList != NIL)) ?
false : canShip;
shippability = bms_del_member(shippability, SS_NEED_SINGLENODE);
}
* If HAS_AGG_EXPR is set but NEED_SINGLENODE is not set, it means the
* aggregates are entirely shippable, so don't worry about it.
*/
shippability = bms_del_member(shippability, SS_HAS_AGG_EXPR);
if (!sc_context.sc_contain_column_store) {
shippability = bms_del_member(shippability, SS_NEED_NO_CSTORE);
}
if (!bms_is_empty(shippability)) {
canShip = false;
}
if (!canShip && exec_nodes != NULL) {
FreeExecNodes(&exec_nodes);
}
AssertEreport(!canShip || exec_nodes, MOD_OPT, "the query must be shippable and exec node is null.");
bms_free_ext(shippability);
shippability = NULL;
if (contain_column_store != NULL) {
*contain_column_store = sc_context.sc_contain_column_store;
}
if (use_star_up != NULL) {
*use_star_up = sc_context.sc_use_star_upper_level;
}
return exec_nodes;
}
* pgxc_is_expr_shippable
* Check whether the given expression can be shipped to datanodes.
*
* Note on has_aggs
* The aggregate expressions are not shippable if they can not be completely
* evaluated on a single datanode. But this function does not have enough
* context to determine the set of datanodes where the expression will be
* evaluated. Hence, the caller of this function can handle aggregate
* expressions, it passes a non-NULL value for has_aggs. This function returns
* whether the expression has any aggregates or not through this argument. If a
* caller passes NULL value for has_aggs, this function assumes that the caller
* can not handle the aggregates and deems the expression has unshippable.
*/
bool pgxc_is_expr_shippable(Expr* node, bool* has_aggs)
{
Shippability_context sc_context;
errno_t errorno = EOK;
errorno = memset_s(&sc_context, sizeof(sc_context), '\0', sizeof(sc_context));
securec_check(errorno, "", "");
sc_context.sc_query = NULL;
sc_context.sc_query_level = 0;
sc_context.sc_for_expr = true;
pgxc_shippability_walker((Node*)node, &sc_context);
* If caller is interested in knowing, whether the expression has aggregates
* let the caller know about it. The caller is capable of handling such
* expressions. Otherwise assume such an expression as not shippable.
*/
if (has_aggs != NULL) {
*has_aggs = pgxc_test_shippability_reason(&sc_context, SS_HAS_AGG_EXPR);
} else if (pgxc_test_shippability_reason(&sc_context, SS_HAS_AGG_EXPR)) {
return false;
}
pgxc_reset_shippability_reason(&sc_context, SS_HAS_AGG_EXPR);
if (!bms_is_empty(sc_context.sc_shippability)) {
return false;
}
return true;
}
bool pgxc_is_funcRTE_shippable(Expr* node)
{
Shippability_context sc_context;
errno_t rc;
AssertEreport(NULL != node, MOD_OPT, "Expr must not be null.");
rc = memset_s(&sc_context, sizeof(sc_context), 0, sizeof(sc_context));
securec_check(rc, "", "");
sc_context.sc_query = NULL;
sc_context.sc_query_level = 0;
sc_context.sc_for_expr = false;
pgxc_shippability_walker((Node*)node, &sc_context);
if (bms_is_empty(sc_context.sc_shippability))
return true;
return false;
}
* Internal functions.They can not be called as SQL functions.
* string_agg_finalfn bytea_string_agg_finalfn array_agg_finalfn
*/
bool pgxc_is_internal_agg_final_func(Oid funcid)
{
bool is_internal_func = true;
switch (funcid) {
case ARRAYAGGFUNCOID:
case STRINGAGGFUNCOID:
case BYTEASTRINGAGGFUNCOID:
case LISTAGGFUNCOID:
case LISTAGGNOARG2FUNCOID:
case INT2LISTAGGFUNCOID:
case INT2LISTAGGNOARG2FUNCOID:
case INT4LISTAGGFUNCOID:
case INT4LISTAGGNOARG2FUNCOID:
case INT8LISTAGGFUNCOID:
case INT8LISTAGGNOARG2FUNCOID:
case FLOAT4LISTAGGFUNCOID:
case FLOAT4LISTAGGNOARG2FUNCOID:
case FLOAT8LISTAGGFUNCOID:
case FLOAT8LISTAGGNOARG2FUNCOID:
case NUMERICLISTAGGFUNCOID:
case NUMERICLISTAGGNOARG2FUNCOID:
case DATELISTAGGFUNCOID:
case DATELISTAGGNOARG2FUNCOID:
case TIMESTAMPLISTAGGFUNCOID:
case TIMESTAMPLISTAGGNOARG2FUNCOID:
case TIMESTAMPTZLISTAGGFUNCOID:
case TIMESTAMPTZLISTAGGNOARG2FUNCOID:
case INTERVALLISTAGGFUNCOID:
case INTERVALLISTAGGNOARG2FUNCOID:
case JSONAGGFUNCOID:
case JSONOBJECTAGGFUNCOID:
case GROUPCONCATFUNCOID:
is_internal_func = false;
break;
default:
break;
}
return is_internal_func;
}
bool is_avg_func(Oid funcid)
{
switch (funcid) {
case INT8AVGFUNCOID:
case INT4AVGFUNCOID:
case INT2AVGFUNCOID:
case INT1AVGFUNCOID:
case NUMERICAVGFUNCOID:
case FLOAT4AVGFUNCOID:
case FLOAT8AVGFUNCOID:
case INTERVALAVGFUNCOID:
case INTERVALAGGAVGFUNCOID:
return true;
default:
break;
}
return false;
}
* Determine if the function arguments include ANY type. These function arguments
* will be expanded to check if contains expression that can not be shipped.
*/
static Oid shippable_func_contain_ANY[] = {
CONCATFUNCOID, CONCATWSFUNCOID, PGTYPEOFFUNCOID, TEXTANYCATFUNCOID, ANYTEXTCATFUNCOID, ANYTOTEXTFORMATFUNCOID
};
bool pgxc_is_shippable_func_contain_any(Oid funcid)
{
for (uint i = 0; i < lengthof(shippable_func_contain_ANY); i++) {
if (funcid == shippable_func_contain_ANY[i]) {
return true;
}
}
return false;
}
* pgxc_is_func_shippable
* Determine if a function is shippable
*/
bool pgxc_is_func_shippable(Oid funcid, shipping_context* context)
{
bool proshippable = false;
char provolatile = func_volatile(funcid);
proshippable = get_func_proshippable(funcid);
* For the time being a function is thought as shippable
* 1. it is immutable.
* 2. it is in shippable_stablefunc
* 3. some special volatile func.
*/
if (PROVOLATILE_IMMUTABLE == provolatile) {
return true;
} else if (PROVOLATILE_STABLE == provolatile) {
* For online expansion, we have to force some function shipple to DN e.g. get
* start/end tupleid.
*/
if (redis_func_shippable(funcid)) {
return true;
}
#ifdef ENABLE_MULTIPLE_NODES
if (vector_search_func_shippable(funcid)) {
return true;
}
#endif
if (IS_PGXC_COORDINATOR && proshippable) {
return true;
}
} else if (PROVOLATILE_VOLATILE == provolatile && context != NULL && !context->disallow_volatile_func_shippable) {
switch (funcid) {
case GSENCRYPTAES128FUNCOID:
case RANDOMFUNCOID:
return context->is_randomfunc_shippable;
break;
case NEXTVALFUNCOID:
if (t_thrd.proc->workingVersionNum < LARGE_SEQUENCE_VERSION_NUM) {
return false;
}
return context->is_nextval_shippable;
break;
case ECEXTENSIONFUNCOID:
case ECHADOOPFUNCOID:
return context->is_ecfunc_shippable;
break;
case PGSYSTIMESTAMPFUNCOID:
case TIMEZONETZFUNCOID:
return true;
break;
default:
break;
}
if (IS_PGXC_COORDINATOR && proshippable) {
return true;
}
}
return false;
}
* pgxc_find_dist_equijoin_qual
* Check equijoin conditions on given relations
*/
List* pgxc_find_dist_equijoin_qual(List* dist_vars1, List* dist_vars2, Node* quals)
{
List* lquals = NIL;
List* dist_1 = NIL;
List* dist_2 = NIL;
ListCell* qcell = NULL;
ListCell* cell_1 = NULL;
ListCell* cell_2 = NULL;
bool isFineAllDisKey = true;
List* qual_expr_list = NIL;
if (quals == NULL)
return NIL;
if (!IsA(quals, List))
lquals = make_ands_implicit((Expr*)quals);
else
lquals = (List*)quals;
foreach (cell_1, dist_vars1) {
dist_1 = (List*)lfirst(cell_1);
foreach (cell_2, dist_vars2) {
isFineAllDisKey = true;
dist_2 = (List*)lfirst(cell_2);
if (list_length(dist_1) != list_length(dist_2)) {
continue;
}
ListCell *cell1 = NULL, *cell2 = NULL;
forboth(cell1, dist_1, cell2, dist_2)
{
Var* lDistVar = (Var*)lfirst(cell1);
Var* rDistVar = (Var*)lfirst(cell2);
foreach (qcell, lquals) {
Expr* qual_expr = (Expr*)lfirst(qcell);
OpExpr* op = NULL;
Var* lvar = NULL;
Var* rvar = NULL;
if (!IsA(qual_expr, OpExpr))
continue;
op = (OpExpr*)qual_expr;
if (list_length(op->args) != 2)
continue;
* Check if both operands are Vars or RelabelType,if are RelabelType,we check the args.
* otherwise check next expression
*/
lvar = locate_distribute_var((Expr*)linitial(op->args));
rvar = locate_distribute_var((Expr*)lsecond(op->args));
if (NULL == lvar || NULL == rvar)
continue;
* If the data types of both the columns are not same, continue. Hash
* and Modulo of a the same bytes will be same if the data types are
* same. So, only when the data types of the columns are same, we can
* ship a distributed JOIN to the Datanodes
*/
if (exprType((Node*)lvar) != exprType((Node*)rvar))
continue;
if (!((equal(lDistVar, lvar) && equal(rDistVar, rvar)) ||
((equal(lDistVar, rvar) && equal(rDistVar, lvar)))))
continue;
* If the operator is not an assignment operator, check next
* constraint. An operator is an assignment operator if it's
* mergejoinable or hashjoinable. Beware that not every assignment
* operator is mergejoinable or hashjoinable, so we might leave some
* oportunity. But then we have to rely on the opname which may not
* be something we know to be equality operator as well.
*/
if (!op_mergejoinable(op->opno, exprType((Node*)lvar)) &&
!op_hashjoinable(op->opno, exprType((Node*)lvar)))
continue;
qual_expr_list = lappend(qual_expr_list, qual_expr);
break;
}
if (qcell == NULL) {
list_free_ext(qual_expr_list);
qual_expr_list = NULL;
isFineAllDisKey = false;
break;
}
}
if (isFineAllDisKey) {
return qual_expr_list;
}
}
}
return NULL;
}
* pgxc_merge_exec_nodes
* The routine combines the two exec_nodes passed such that the resultant
* exec_node corresponds to the JOIN of respective relations.
* If both exec_nodes can not be merged, it returns NULL.
*/
ExecNodes* pgxc_merge_exec_nodes(ExecNodes* en1, ExecNodes* en2, int jointype)
{
ExecNodes* merged_en = (ExecNodes*)makeNode(ExecNodes);
ExecNodes* tmp_en = NULL;
if (en1 == NULL) {
tmp_en = (ExecNodes*)copyObject(en2);
return tmp_en;
}
if (en2 == NULL) {
tmp_en = (ExecNodes*)copyObject(en1);
return tmp_en;
}
* Push join down when both en_expr are same, like nodelist.
* For now, we can only deal with one value in en_expr.
* Todo: when there are more values in en_expr, we need to compare the two lists
* to make sure they will go to the same datanode/s.
*/
bool single_expr_en1 =
(en1->en_expr != NIL && IsExecNodesColumnDistributed(en1) && en1->accesstype == RELATION_ACCESS_READ);
bool single_expr_en2 =
(en2->en_expr != NIL && IsExecNodesColumnDistributed(en2) && en2->accesstype == RELATION_ACCESS_READ);
bool single_node_en1 = (IsExecNodesColumnDistributed(en1) && list_length(en1->nodeList) == 1);
bool single_node_en2 = (IsExecNodesColumnDistributed(en2) && list_length(en2->nodeList) == 1);
bool has_list_range_node = IsExecNodesDistributedBySlice(en1) || IsExecNodesDistributedBySlice(en2);
* For hash, we only check the node; for list/range, we should make sure they are from same distribution too. */
if (!has_list_range_node || (has_list_range_node && PgxcIsDistInfoSame(en1, en2))) {
if (single_expr_en1 && single_expr_en2) {
if (ng_is_same_group(&en1->distribution, &en2->distribution)) {
tmp_en = (ExecNodes*)copyObject(en1);
if (list_difference(en1->en_expr, en2->en_expr)) {
tmp_en->dynamic_en_expr = list_append_unique(tmp_en->dynamic_en_expr, linitial(en1->en_expr));
tmp_en->dynamic_en_expr = list_append_unique(tmp_en->dynamic_en_expr, linitial(en2->en_expr));
}
} else
tmp_en = NULL;
return tmp_en;
} else if (jointype >= 0 &&
((RHS_join(jointype) && single_expr_en1 && !single_node_en2) ||
(LHS_join(jointype) && !single_node_en1 && single_expr_en2)) &&
ng_is_same_group(&en1->distribution, &en2->distribution)) {
tmp_en = single_expr_en1 ? (ExecNodes*)copyObject(en1) : (ExecNodes*)copyObject(en2);
return tmp_en;
}
}
if (en1->primarynodelist || en2->primarynodelist || en1->en_expr || en2->en_expr || OidIsValid(en1->en_relid) ||
OidIsValid(en2->en_relid) || en2->accesstype != RELATION_ACCESS_READ)
return NULL;
if (IsExecNodesReplicated(en1) && IsExecNodesReplicated(en2)) {
* Replicated/replicated join case
* Check that replicated relation is not disjoint
* with initial relation which is also replicated.
* If there is a common portion of the node list between
* the two relations, other rtables have to be checked on
* this restricted list.
*/
if (en1->accesstype != RELATION_ACCESS_READ) {
FreeExecNodes(&merged_en);
return NULL;
}
merged_en->nodeList = list_intersection_int(en1->nodeList, en2->nodeList);
Oid group_oid = InvalidOid;
if (!list_difference_int(en1->nodeList, en2->nodeList) && !list_difference_int(en2->nodeList, en1->nodeList)) {
group_oid =
(InvalidOid != en1->distribution.group_oid) ? en1->distribution.group_oid : en2->distribution.group_oid;
}
Distribution* distribution = ng_convert_to_distribution(merged_en->nodeList);
ng_set_distribution(&merged_en->distribution, distribution);
merged_en->distribution.group_oid = group_oid;
merged_en->baselocatortype = LOCATOR_TYPE_REPLICATED;
if (!merged_en->nodeList)
FreeExecNodes(&merged_en);
return merged_en;
}
if (IsExecNodesReplicated(en1) && IsExecNodesColumnDistributed(en2)) {
List* diff_nodelist = NULL;
if (en1->accesstype != RELATION_ACCESS_READ) {
FreeExecNodes(&merged_en);
return NULL;
}
* Replicated/distributed join case.
* Node list of distributed table has to be included
* in node list of replicated table.
*/
diff_nodelist = list_difference_int(en2->nodeList, en1->nodeList);
* If the difference list is not empty, this means that node list of
* distributed table is not completely mapped by node list of replicated
* table, so go through standard planner.
*/
if (diff_nodelist != NULL)
FreeExecNodes(&merged_en);
else {
merged_en->nodeList = list_copy(en2->nodeList);
ng_copy_distribution(&merged_en->distribution, &en2->distribution);
merged_en->baselocatortype = en2->baselocatortype;
merged_en->en_dist_vars = en2->en_dist_vars;
if (IsLocatorDistributedBySlice(en2->baselocatortype)) {
merged_en->rangelistOid = en2->rangelistOid;
}
}
return merged_en;
}
if (IsExecNodesColumnDistributed(en1) && IsExecNodesReplicated(en2)) {
List* diff_nodelist = NULL;
* Distributed/replicated join case.
* Node list of distributed table has to be included
* in node list of replicated table.
*/
diff_nodelist = list_difference_int(en1->nodeList, en2->nodeList);
* If the difference list is not empty, this means that node list of
* distributed table is not completely mapped by node list of replicated
* table, so go through standard planner.
*/
if (diff_nodelist != NULL)
FreeExecNodes(&merged_en);
else {
merged_en->nodeList = list_copy(en1->nodeList);
ng_copy_distribution(&merged_en->distribution, &en1->distribution);
merged_en->baselocatortype = en1->baselocatortype;
merged_en->en_dist_vars = en1->en_dist_vars;
if (IsLocatorDistributedBySlice(en1->baselocatortype)) {
merged_en->rangelistOid = en1->rangelistOid;
}
}
return merged_en;
}
if (IsExecNodesColumnDistributed(en1) && IsExecNodesColumnDistributed(en2)) {
* Distributed/distributed case
* If the caller has suggested that this is an equi-join between two
* distributed results, check that they have the same nodes in the distribution
* node list. The caller is expected to fully decide whether to merge
* the nodes or not.
*/
bool merged_ok = false;
if (IsLocatorDistributedBySlice(en1->baselocatortype) || IsLocatorDistributedBySlice(en2->baselocatortype)) {
if (!PgxcIsDistInfoSame(en1, en2)) {
FreeExecNodes(&merged_en);
return merged_en;
}
merged_en->rangelistOid = en1->rangelistOid;
}
if (!list_difference_int(en1->nodeList, en2->nodeList) && !list_difference_int(en2->nodeList, en1->nodeList) &&
ng_is_same_group(&en1->distribution, &en2->distribution)) {
merged_en->nodeList = list_copy(en1->nodeList);
merged_ok = true;
} else if (jointype >= 0 && ng_is_same_group(&en1->distribution, &en2->distribution) &&
((RHS_join(jointype) && single_node_en1 && !single_node_en2) ||
(LHS_join(jointype) && !single_node_en1 && single_node_en2))) {
*
* (1) bmsql_district distribute by (d_w_id)
* (2) bmsql_order_line distribute by (ol_w_id)
* (3) and we have a join condition on distribute key: ol_w_id = d_w_id
* (4) adn we have a where clause d_w_id = 1
*
* the query looks like this:
*
* SELECT ol_i_id
* FROM bmsql_district
* JOIN bmsql_order_line ON ol_w_id = d_w_id
* WHERE d_w_id = 1;
*
* the join can be shipped to all datanodes using condition (1)(2)(3)
* while condition (4) can force the join to ONE dn
*
*/
merged_en->nodeList = single_node_en1 ? list_copy(en1->nodeList) : list_copy(en2->nodeList);
merged_ok = true;
}
if (merged_ok) {
Distribution* distribution =
(InvalidOid != en1->distribution.group_oid) ? &en1->distribution : &en2->distribution;
ng_copy_distribution(&merged_en->distribution, distribution);
if (en1->baselocatortype == en2->baselocatortype) {
merged_en->baselocatortype = en1->baselocatortype;
merged_en->en_dist_vars = list_concat(list_copy(en1->en_dist_vars), list_copy(en2->en_dist_vars));
} else
merged_en->baselocatortype = LOCATOR_TYPE_DISTRIBUTED;
} else {
FreeExecNodes(&merged_en);
}
return merged_en;
}
ereport(ERROR,
(errmodule(MOD_OPT),
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("openGauss does not support this distribution type yet"),
errdetail("The feature is not currently supported"))));
return NULL;
}
* pgxc_check_index_shippability
* Check shippability of index described by given conditions. This generic
* function can be called even if the index is not yet defined.
*/
bool pgxc_check_index_shippability(
RelationLocInfo* relLocInfo, bool is_primary, bool is_unique, bool is_exclusion, List* indexAttrs, List* indexExprs)
{
bool result = true;
ListCell* lc = NULL;
* Leave if no locator information, in this case shippability has no
* meaning.
*/
if (relLocInfo == NULL)
return result;
* Scan the expressions used in index and check the shippability of each
* of them. If only one is not-shippable, the index is considered as non
* shippable. It is important to check the shippability of the expressions
* before refining scan on the index columns and distribution type of
* parent relation.
*/
foreach (lc, indexExprs) {
if (!pgxc_is_expr_shippable((Expr*)lfirst(lc), NULL)) {
result = false;
goto finish;
}
}
* Check if relation is distributed on a single node, in this case
* the constraint can be shipped in all the cases.
*/
if (list_length(relLocInfo->nodeList) == 1)
return result;
* Check the case of EXCLUSION index.
* EXCLUSION constraints are shippable only for replicated relations as
* such constraints need that one tuple is checked on all the others, and
* if this tuple is correctly excluded of the others, the constraint is
* verified.
*/
if (is_exclusion) {
if (!IsRelationReplicated(relLocInfo)) {
result = false;
goto finish;
}
}
* Check the case of PRIMARY KEY INDEX and UNIQUE index.
* Those constraints are shippable if the parent relation is replicated
* or if the column
*/
if (is_unique || is_primary) {
* Perform different checks depending on distribution type of parent
* relation.
*/
switch (relLocInfo->locatorType) {
case LOCATOR_TYPE_REPLICATED:
result = true;
break;
case LOCATOR_TYPE_RROBIN: {
Relation rel = RelationIdGetRelation(relLocInfo->relid);
if (!RelationIsValid(rel)) {
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("could not open relation with OID %u", relLocInfo->relid)));
}
* Index on roundrobin parent table cannot be safely shipped
* because of the random behavior of data balancing.
* But, the hdfs foreign table support informational constraint.
*/
result = false;
if (RELKIND_FOREIGN_TABLE == rel->rd_rel->relkind || RELKIND_STREAM == rel->rd_rel->relkind) {
ForeignTable* ftbl = NULL;
ForeignServer* fsvr = NULL;
Oid relationId = RelationGetRelid(rel);
ftbl = GetForeignTable(relationId);
if (NULL == ftbl)
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
(errmsg("Failed to get foreign table."))));
fsvr = GetForeignServer(ftbl->serverid);
if (NULL == fsvr)
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
(errmsg("Failed to get foreign server."))));
if (isObsOrHdfsTableFormSrvName(fsvr->servername) ||
IS_OBS_CSV_TXT_FOREIGN_TABLE(relLocInfo->relid)) {
result = true;
}
}
RelationClose(rel);
break;
}
case LOCATOR_TYPE_RANGE:
case LOCATOR_TYPE_LIST:
case LOCATOR_TYPE_HASH:
case LOCATOR_TYPE_MODULO: {
* Unique indexes on Hash and Modulo tables are shippable if the
* index expression contains all the distribution expressions of
* its parent relation.
*
* Here is a short example with concatenate that cannot be
* shipped:
* CREATE TABLE aa (a text, b text) DISTRIBUTE BY HASH(a);
* CREATE UNIQUE INDEX aap ON aa((a || b));
* INSERT INTO aa VALUES ('a', 'abb');
* INSERT INTO aa VALUES ('aab', b); -- no error ??!
* The output uniqueness is not guaranteed as both INSERT will
* go to different nodes. For such simple reasons unique
* indexes on distributed tables are not shippable.
* Shippability is not even ensured if all the expressions
* used as Var are only distributed columns as the hash output of
* their value combination does not ensure that query will
* be directed to the correct remote node. Uniqueness is not even
* protected if the index expression contains only the distribution
* column like for that with a cluster of 2 Datanodes:
* CREATE TABLE aa (a int) DISTRIBUTE BY HASH(a);
* CREATE UNIQUE INDEX aap ON (abs(a));
* INSERT INTO aa (2); -- to Datanode 1
* INSERT INTO aa (-2); -- to Datanode 2, breaks uniqueness
*
* For the time being distribution key can only be
* defined on a single column, so this will need to be changed
* onde a relation distribution will be able to be defined based
* on an expression of multiple columns.
*/
if (indexExprs != NIL) {
result = false;
break;
}
if (indexAttrs == NIL)
break;
* Check that distribution column is included in the list of
* index columns.
*/
ListCell* cell = NULL;
AttrNumber attnum;
foreach (cell, relLocInfo->partAttrNum) {
attnum = lfirst_int(cell);
if (!list_member_int(indexAttrs, attnum)) {
* Distribution column is not in index column list
* So index can be enforced remotely.
*/
result = false;
break;
}
}
* by being here we are now sure that the index can be enforced
* remotely as the distribution column is included in index.
*/
break;
}
case LOCATOR_TYPE_NONE:
case LOCATOR_TYPE_DISTRIBUTED:
case LOCATOR_TYPE_CUSTOM:
default:
Assert(0);
}
}
finish:
return result;
}
* pgxc_check_fk_shippabilily
* Check the shippability of a parent and a child relation based on the
* distribution of each and the columns that are used to reference to
* parent and child relation. This can be used for inheritance or foreign
* key shippability evaluation.
*/
bool pgxc_check_fk_shippability(
RelationLocInfo* parentLocInfo, RelationLocInfo* childLocInfo, List* parentRefs, List* childRefs)
{
bool result = true;
AssertEreport(list_length(parentRefs) == list_length(childRefs),
MOD_OPT,
"shippability of parent and child relation check failed.");
* If either child or parent have no relation data, shippability makes
* no sense.
*/
if (parentLocInfo == NULL || childLocInfo == NULL)
return result;
if (IsLocatorInfoEqual(parentLocInfo, childLocInfo))
return result;
switch (parentLocInfo->locatorType) {
case LOCATOR_TYPE_REPLICATED:
* If the parent relation is replicated, the child relation can
* always refer to it on all the nodes.
*/
result = true;
break;
case LOCATOR_TYPE_RROBIN:
* If the parent relation is based on roundrobin, the child
* relation cannot be enforced on remote nodes before of the
* random behavior of data balancing.
*/
result = false;
break;
case LOCATOR_TYPE_RANGE:
case LOCATOR_TYPE_LIST:
case LOCATOR_TYPE_HASH:
case LOCATOR_TYPE_MODULO: {
* If parent table is distributed, the child table can reference
* to its parent safely if the following conditions are satisfied:
* - parent and child are both hash-based, or both modulo-based
* - parent reference columns contain the distribution column
* of the parent relation
* - child reference columns contain the distribution column
* of the child relation
* - both child and parent map the same nodes for data location
*/
if (IsRelationReplicated(childLocInfo)) {
result = false;
break;
}
* Parent and child need to have the same distribution type:
* hash or modulo.
*/
if (parentLocInfo->locatorType != childLocInfo->locatorType) {
result = false;
break;
}
* Parent and child need to have their data located exactly
* on the same list of nodes.
*/
if (list_difference_int(childLocInfo->nodeList, parentLocInfo->nodeList) ||
list_difference_int(parentLocInfo->nodeList, childLocInfo->nodeList)) {
result = false;
break;
}
* Check that child and parents are referenced using their
* distribution column.
*/
if (list_length(childLocInfo->partAttrNum) != list_length(parentLocInfo->partAttrNum)) {
result = false;
break;
}
ListCell* cell1 = NULL;
ListCell* cell2 = NULL;
AttrNumber attnum1, attnum2;
forboth(cell1, childLocInfo->partAttrNum, cell2, parentLocInfo->partAttrNum)
{
attnum1 = lfirst_int(cell1);
attnum2 = lfirst_int(cell2);
if (!list_member_int(childRefs, attnum1) || !list_member_int(parentRefs, attnum2)) {
result = false;
break;
}
int i1 = 0, i2 = 0;
ListCell* lc = NULL;
foreach (lc, childRefs) {
if (lfirst_int(lc) == attnum1)
break;
i1++;
}
foreach (lc, parentRefs) {
if (lfirst_int(lc) == attnum2)
break;
i2++;
}
if (i1 != i2) {
result = false;
break;
}
}
if (IsLocatorDistributedBySlice(parentLocInfo->locatorType)) {
* if parent and child slice definition and slice-dn map are the same,
* then parent-child constraint can be shipped.
*/
if (!IsSliceInfoEqualByOid(parentLocInfo->relid, childLocInfo->relid)) {
result = false;
}
}
break;
}
case LOCATOR_TYPE_NONE:
case LOCATOR_TYPE_DISTRIBUTED:
case LOCATOR_TYPE_CUSTOM:
default:
Assert(0);
}
return result;
}
* pgxc_replace_dist_vars_subquery
* The function looks up the members of ExecNodes::en_dist_var in the
* query->targetList. If found, they are re-stamped with the given varno and
* resno of the TargetEntry found and added to the new distribution var list
* being created. This function is useful to re-stamp the distribution columns
* of a subquery.
*/
static void pgxc_replace_dist_vars_subquery(Query* query, ExecNodes* exec_nodes, Index varno)
{
ListCell* l_cell = NULL;
ListCell* lcell = NULL;
List* new_dist_vars = NIL;
List* dis_list = NIL;
List* tmp_l = NIL;
foreach (l_cell, exec_nodes->en_dist_vars) {
dis_list = (List*)lfirst(l_cell);
foreach (lcell, dis_list) {
Var* var = (Var*)lfirst(lcell);
TargetEntry* tle = NULL;
AssertEreport(IsA(var, Var), MOD_OPT, "targetlist type wrong.");
tle = tlist_member((Node*)var, query->targetList);
if (tle != NULL) {
Var* new_dist_var = makeVar(varno,
tle->resno,
exprType((Node*)tle->expr),
exprTypmod((Node*)tle->expr),
exprCollation((Node*)tle->expr),
0);
new_dist_vars = lappend(new_dist_vars, new_dist_var);
} else {
break;
}
}
if (lcell == NULL) {
if (new_dist_vars != NIL) {
tmp_l = lappend(tmp_l, new_dist_vars);
}
} else {
list_free_deep(new_dist_vars);
}
new_dist_vars = NIL;
}
list_free_deep(exec_nodes->en_dist_vars);
exec_nodes->en_dist_vars = tmp_l;
}
* pgxc_get_dist_var
* Given the varno, corresponding range table entry and targetlist, get the Var
* node for distribution column, present in the targetlist as the root of
* expression if there is one; otherwise return one created.
*
* If it's a replicated table or table local to the coordinator, or
* any relation other than distributed table it returns NULL.
*
* varno: is the index of the range table entry in the query range table, to be
* set as Var::varno
* rte: range table entry corresponding to varno. There is no way to verify
* where the correspondence is true.
* tlist: target list or just the list of expression where to find the Var
* corresponding to the distribution column
*/
List* pgxc_get_dist_var(Index varno, RangeTblEntry* rte, List* tlist)
{
RelationLocInfo* rel_loc_info = GetRelationLocInfo(rte->relid);
ListCell* lcell = NULL;
Var* dist_var = NULL;
Oid dist_var_type;
int32 dist_var_typmod;
Oid dist_var_collid;
List* varList = NULL;
if (rel_loc_info == NULL || !IsRelationDistributedByValue(rel_loc_info))
return NULL;
ListCell* cell = NULL;
foreach (cell, rel_loc_info->partAttrNum) {
AttrNumber attnum = lfirst_int(cell);
bool isMatch = false;
foreach (lcell, tlist) {
TargetEntry* tle = (TargetEntry*)lfirst(lcell);
Var* var = NULL;
if (tle && IsA(tle, TargetEntry))
var = (Var*)tle->expr;
else
var = (Var*)tle;
if (var && IsA(var, Var) && (var->varno == varno) && (var->varattno == attnum)) {
isMatch = true;
varList = lappend(varList, (Var*)copyObject(var));
break;
}
}
if (isMatch == false) {
* Bare distribution column is not found in the targetlist, craft a Var for
* it.
*/
get_rte_attribute_type(rte, attnum, &dist_var_type, &dist_var_typmod, &dist_var_collid);
dist_var = makeVar(varno, attnum, dist_var_type, dist_var_typmod, dist_var_collid, 0);
varList = lappend(varList, dist_var);
}
isMatch = false;
}
return varList;
}
static bool PgxcIsDistInfoSame(ExecNodes *innerEn, ExecNodes *outerEn)
{
if (innerEn->baselocatortype != outerEn->baselocatortype) {
return false;
}
return IsSliceInfoEqualByOid(innerEn->rangelistOid, outerEn->rangelistOid);
}
* pgxc_is_join_shippable
* The shippability of JOIN is decided in following steps
* 1. Are the JOIN conditions shippable?
* For INNER JOIN it's possible to apply some of the conditions at the
* Datanodes and others at coordinator. But for other JOINs, JOIN conditions
* decide which tuples on the OUTER side are appended with NULL columns from
* INNER side, we need all the join conditions to be shippable for the join to
* be shippable.
* 2. Do the JOIN conditions have quals that will make it shippable?
* When both sides of JOIN are replicated, irrespective of the quals the JOIN
* is shippable.
* INNER joins between replicated and distributed relation are shippable
* irrespective of the quals. OUTER join between replicated and distributed
* relation is shippable if distributed relation is the outer relation.
* All joins between hash/modulo distributed relations are shippable if they
* have equi-join on the distributed column, such that distribution columns
* have same datatype and same distribution strategy.
* 3. Are datanodes where the joining relations exist, compatible?
* Joins between replicated relations are shippable if both relations share a
* datanode. Joins between distributed relations are shippable if both
* relations are distributed on same set of Datanodes. Join between replicated
* and distributed relations is shippable is replicated relation is replicated
* on all nodes where distributed relation is distributed.
* 4. Are targetlists of both sides shippable?
* For OUTER Joins if there is at least one unshippable entry in the targetlist
* of the relation which contributes NULL columns in the join result, the join
* is not shippable. In such cases, the unshippable expression is projected at
* the coordinator, thus causing a non-NULL value to appear instead of NULL
* value in the result.
*
* The first step is to be applied by the caller of this function.
*/
ExecNodes* pgxc_is_join_shippable(ExecNodes* inner_en, ExecNodes* outer_en, bool inner_unshippable_tlist,
bool outer_unshippable_tlist, JoinType jointype, Node* join_quals)
{
bool merge_nodes = false;
List* dist_vars = NIL;
* If either of inner_en or outer_en is NULL, return NULL. We can't ship the
* join when either of the sides do not have datanodes to ship to.
*/
if (outer_en == NULL || inner_en == NULL)
return NULL;
* We only handle reduction of INNER, LEFT [OUTER], RIGHT [OUTER] and
* FULL [OUTER] joins.
*/
if (jointype != JOIN_INNER && jointype != JOIN_LEFT && jointype != JOIN_RIGHT && jointype != JOIN_FULL)
return NULL;
* For left outer join, if the inner relation (for which null columns are
* added if there is a row unmatched from outer join), has unshippable
* targetlist entry, we can not ship the join. This is because, the unshippable
* targetlist entry needs to be calculated before it can be added to the
* JOIN result, either as NULL or non-NULL.
* Similarly for FULL OUTER Join, none of the sides should have unshippable
* targetlist expression.
*/
if (jointype == JOIN_LEFT && inner_unshippable_tlist)
return NULL;
if (jointype == JOIN_RIGHT && outer_unshippable_tlist)
return NULL;
if (jointype == JOIN_FULL && (inner_unshippable_tlist || outer_unshippable_tlist))
return NULL;
if ((IsExecNodesReplicated(inner_en) && IsExecNodesReplicated(outer_en)) ||
(list_length(inner_en->nodeList) == 1 && list_length(outer_en->nodeList) == 1))
merge_nodes = true;
else if (inner_en->en_expr != NIL && outer_en->en_expr != NIL)
merge_nodes = true;
else if (IsExecNodesColumnDistributed(inner_en) && IsExecNodesColumnDistributed(outer_en)) {
* If two sides are distributed in the same manner by a value, with an
* equi-join on the distribution column and that condition
* is shippable, ship the join if node lists from both sides can be
* merged.
*/
if (inner_en->baselocatortype == outer_en->baselocatortype && IsExecNodesDistributedByValue(inner_en) &&
jointype != JOIN_FULL) {
List* equi_join_expr =
pgxc_find_dist_equijoin_qual(inner_en->en_dist_vars, outer_en->en_dist_vars, join_quals);
ListCell* cell = NULL;
if (equi_join_expr != NULL) {
foreach (cell, equi_join_expr) {
Expr* join_expr = (Expr*)lfirst(cell);
if (!pgxc_is_expr_shippable(join_expr, NULL)) {
break;
}
}
if (cell == NULL) {
merge_nodes = true;
if (JOIN_LEFT == jointype) {
dist_vars = list_copy((List*)linitial(outer_en->en_dist_vars));
} else if (JOIN_RIGHT == jointype) {
dist_vars = list_copy((List*)linitial(inner_en->en_dist_vars));
}
}
}
}
} else if (IsExecNodesColumnDistributed(outer_en) && IsExecNodesReplicated(inner_en) &&
(jointype == JOIN_INNER || jointype == JOIN_LEFT)) {
* If outer side is distributed and inner side is replicated, we can ship
* LEFT OUTER and INNER join.
*/
merge_nodes = true;
} else if (IsExecNodesReplicated(outer_en) && IsExecNodesColumnDistributed(inner_en) &&
(jointype == JOIN_INNER || jointype == JOIN_RIGHT)) {
* If outer side is replicated and inner side is distributed, we can ship
* only for INNER join.
*/
merge_nodes = true;
}
* If the ExecNodes of inner and outer nodes can be merged, the JOIN is
* shippable
*/
if (merge_nodes) {
ExecNodes* merged_en = pgxc_merge_exec_nodes(inner_en, outer_en, (int)jointype);
if (dist_vars != NIL && merged_en != NULL) {
list_free_ext(merged_en->en_dist_vars);
merged_en->en_dist_vars = list_make1(dist_vars);
}
return merged_en;
} else {
return NULL;
}
}
* pgxc_check_triggers_shippability:
* Return true if none of the triggers prevents the query from being FQSed.
*/
bool pgxc_check_triggers_shippability(Oid relid, int commandType, bool* hasTrigger)
{
int16 trigevent = pgxc_get_trigevent((CmdType)commandType);
Relation rel = relation_open(relid, AccessShareLock);
bool found_nonshippable = false;
* If we don't find a non-shippable row trigger, then the statement is
* shippable as far as triggers are concerned. For FQSed query, statement
* triggers are separately invoked on coordinator.
*/
found_nonshippable = pgxc_find_nonshippable_row_trig(rel, trigevent, 0, true, hasTrigger);
relation_close(rel, AccessShareLock);
return !found_nonshippable;
}
* @Description : find IUD statement trigger on the relation.
* @in relid : the relation oid need be find.
* @in commandType : the command type on the relation.
* @return : true if there is a statement trigger.
*/
bool pgxc_find_statement_trigger(Oid relid, int commandType)
{
bool found_statement_trigger = false;
Relation rel = NULL;
int16 trigevent = 0;
if (commandType != CMD_UPDATE && commandType != CMD_INSERT && commandType != CMD_DELETE)
return false;
if (!OidIsValid(relid))
return false;
trigevent = pgxc_get_trigevent((CmdType)commandType);
rel = relation_open(relid, AccessShareLock);
if (!(rel->trigdesc != NULL && pgxc_has_trigger_for_event(trigevent, rel->trigdesc))) {
relation_close(rel, AccessShareLock);
return false;
}
for (int i = 0; i < rel->trigdesc->numtriggers; i++) {
uint16 tgtype = rel->trigdesc->triggers[i].tgtype;
if (!TRIGGER_FOR_ROW(tgtype)) {
found_statement_trigger = true;
break;
}
}
relation_close(rel, AccessShareLock);
return found_statement_trigger;
}
* Search for a non-shippable ROW trigger of a particular type.
*
* If ignore_timing is true, just the event_type is used to find a match, so
* once the event matches, the search returns true regardless of whether it is a
* before or after row trigger.
*
* If ignore_timing is false, return true if we find one or more non-shippable
* row triggers that match the exact combination of event and timing.
*
* We have to do this way because the bitmask used for timing does
* not have unique bit positions for different values. For e.g. for AFTER timing
* type, the bit position 0x2 has value 0, and for BEFORE type the same
* bit position has value 1, so it is impossible to use these bits to suggest
* ignoring the timing. (ROW and STATEMENT values also share the same 0x1 bit
* but we only want ROW triggers so it does not matter here). Hence an extra
* flag ignore_timing to indicate that we want to ignore the timing
* and only consider event type. The caller may just pass 0 for timing.
* NOTE: To indicate that timing is to be ignored, we can device our own
* "invalid" timing value in which all of the timing bits are set to 1
* (i.e. the exact TRIGGER_TYPE_TIMING_MASK value), but that will make the
* function calls unreadable.
*/
bool pgxc_find_nonshippable_row_trig(
Relation rel, int16 tgtype_event, int16 tgtype_timing, bool ignore_timing, bool* hasTrigger)
{
int i;
* This function is used for finding matching row triggers only; should not
* be called for TRUNCATE command.
*/
AssertEreport(!TRIGGER_FOR_TRUNCATE((int32)tgtype_event), MOD_OPT, "should not be called for TRUNCATE command.");
if (hasTrigger != NULL)
*hasTrigger = true;
if (rel->trigdesc == NULL) {
if (hasTrigger != NULL)
*hasTrigger = false;
return false;
}
* Quick check by just scanning the trigger descriptor, before
* actually peeking into each of the individual triggers.
*/
if (!pgxc_has_trigger_for_event(tgtype_event, rel->trigdesc)) {
if (hasTrigger != NULL)
*hasTrigger = false;
return false;
}
for (i = 0; i < rel->trigdesc->numtriggers; i++) {
uint16 tgtype = rel->trigdesc->triggers[i].tgtype;
if (!TRIGGER_FOR_ROW(tgtype))
continue;
* If we are asked to find triggers of *any* level or timing, just match
* the event type to determine whether we should ignore this trigger.
*/
if (ignore_timing) {
if ((TRIGGER_FOR_INSERT((int32)tgtype_event) && !TRIGGER_FOR_INSERT(tgtype)) ||
(TRIGGER_FOR_UPDATE((int32)tgtype_event) && !TRIGGER_FOR_UPDATE(tgtype)) ||
(TRIGGER_FOR_DELETE((int32)tgtype_event) && !TRIGGER_FOR_DELETE(tgtype)))
continue;
} else {
* Otherwise, do an exact match with the given combination of event
* and timing.
*/
if (!TRIGGER_TYPE_MATCHES(tgtype, (int32)TRIGGER_TYPE_ROW, (int32)tgtype_timing, (int32)tgtype_event))
continue;
}
* We now know that we cannot ignore this trigger, so check its
* shippability.
*/
if (!pgxc_is_trigger_shippable(i, rel))
return true;
}
return false;
}
#define REL_GET_ITH_TRIG(rel, i) (rel->trigdesc->triggers[i])
static inline bool trigger_need_compile(int trig_idx, Relation rel)
{
return (IS_PGXC_COORDINATOR && u_sess->attr.attr_sql.enable_trigger_shipping && rel != NULL &&
rel->rd_locator_info != NULL && !IsLocatorReplicated(rel->rd_locator_info->locatorType) &&
get_func_lang(REL_GET_ITH_TRIG(rel, trig_idx).tgfoid) == get_language_oid("plpgsql", true) &&
TRIGGER_FOR_ROW(REL_GET_ITH_TRIG(rel, trig_idx).tgtype));
}
* pgxc_is_trigger_shippable:
* Check if trigger is shippable to a remote node. This function would be
* called both on coordinator as well as datanode. We want this function
* to be workable on datanode because we want to skip non-shippable triggers
* on datanode.
*/
static bool pgxc_is_trigger_shippable(int trig_idx, Relation rel)
{
* Notice: We should consider if trigger is internal or not, since internal trigger
* must run on DN.
*/
if (!u_sess->attr.attr_sql.enable_trigger_shipping) {
if (!REL_GET_ITH_TRIG(rel, trig_idx).tgisinternal)
return false;
else
return true;
}
bool res = true;
if (REL_GET_ITH_TRIG(rel, trig_idx).tgenabled == TRIGGER_DISABLED)
return true;
* If trigger is based on a constraint or is internal, enforce its launch
* whatever the node type where we are for the time being.
* We need to remove this condition once constraints are better
* implemented within openGauss as a constraint can be locally
* evaluated on remote nodes depending on the distribution type of the table
* on which it is defined or on its parent/child distribution types.
*/
if (REL_GET_ITH_TRIG(rel, trig_idx).tgisinternal)
return true;
* INSTEAD OF triggers can only be defined on views, which are defined
* only on Coordinators, so they cannot be shipped.
*/
if (TRIGGER_FOR_INSTEAD(REL_GET_ITH_TRIG(rel, trig_idx).tgtype))
return false;
if (trigger_need_compile(trig_idx, rel)) {
FunctionCallInfoData fake_fcinfo;
FmgrInfo flinfo;
TriggerData trigdata;
PLpgSQL_function* function = NULL;
errno_t rc = EOK;
rc = memset_s(&fake_fcinfo, sizeof(fake_fcinfo), 0, sizeof(fake_fcinfo));
securec_check(rc, "\0", "\0");
rc = memset_s(&flinfo, sizeof(flinfo), 0, sizeof(flinfo));
securec_check(rc, "\0", "\0");
fake_fcinfo.flinfo = &flinfo;
flinfo.fn_oid = REL_GET_ITH_TRIG(rel, trig_idx).tgfoid;
flinfo.fn_mcxt = CurrentMemoryContext;
rc = memset_s(&trigdata, sizeof(trigdata), 0, sizeof(trigdata));
securec_check(rc, "\0", "\0");
trigdata.type = T_TriggerData;
fake_fcinfo.context = (Node*)&trigdata;
function = plpgsql_compile_nohashkey(&fake_fcinfo);
* We need table attribute info to set NEW and OLD rec in the downstream,
* and also need pre_parse_trig flag to indicate we are doing pre-parsing
* for trigger body
*/
function->tg_relation = rel;
function->pre_parse_trig = true;
res = plpgsql_is_trigger_shippable(function);
function->pre_parse_trig = false;
function->tg_relation = NULL;
plpgsql_free_function_memory(function);
} else {
shipping_context context;
errno_t rc = memset_s(&context, sizeof(context), 0, sizeof(context));
securec_check(rc, "\0", "\0");
context.is_randomfunc_shippable = RANDOM_SHIPPABLE;
context.is_ecfunc_shippable = IS_STREAM_PLAN;
if (!pgxc_is_func_shippable(REL_GET_ITH_TRIG(rel, trig_idx).tgfoid, &context))
res = false;
}
return res;
}
* get_var_from_node
* Get a single Var reference base on node type
* FuncExpr must have allowed (non hash breaking) type cast funcid to work.
* @param The input node from expressions
* A function that defines the Oid range of FuncExpr, default is reject(false)
* @return The Var related to the input node expression
*/
Var* get_var_from_node(Node* node, bool (*func)(Oid))
{
Var* var = NULL;
if (node == NULL)
return NULL;
switch (nodeTag(node)) {
case T_Var: {
var = (Var*)node;
break;
}
case T_FuncExpr: {
FuncExpr* fvar = (FuncExpr*)node;
if (func(fvar->funcid) && fvar->args) {
var = get_var_from_node((Node*)linitial(fvar->args), func);
}
break;
}
case T_RelabelType: {
RelabelType* rtvar = (RelabelType*)node;
if (rtvar->arg == NULL)
return NULL;
var = get_var_from_node((Node*)rtvar->arg);
break;
}
default:
return NULL;
}
return var;
}
* has_shippable_insert_rte:
* Check rte feature for insert select.
* @param rte pending for evaluate
* @return true if ready to ship as part of insertion
*/
static bool has_shippable_insert_rte(RangeTblEntry* rte)
{
if (rte == NULL)
return false;
if (list_length(rte->partitionOidList) > 0)
return false;
if (rte->sublink_pull_up || rte->subquery_pull_up)
return false;
if (rte->isbucket || rte->relhasbucket || rte->lateral || rte->isexcluded)
return false;
if (rte->correlated_with_recursive_cte)
return false;
return true;
}
* get_rte_orientation_recurse:
* Recursively find the source RTE and return its orientation.
* @param the query, the var from upper-level query's targetEntry,
* the address of AttrNumber whos value get updated on recurse.
* @return the orientation of the rte relation.
*/
static RelOrientation get_rte_orientation_recurse(Query* query, Var* var, AttrNumber* attrno)
{
check_stack_depth();
TargetEntry* tar = get_tle_by_resno(query->targetList, var->varattno);
Var* tvar = NULL;
if (tar) {
tvar = get_var_from_node((Node*)tar->expr, func_oid_check_restricted);
}
if (tvar == NULL || attrno == NULL) {
return REL_ORIENT_UNKNOWN;
}
*attrno = tvar->varattno;
RangeTblEntry* rte = rt_fetch(tvar->varno, query->rtable);
if (rte->rtekind == RTE_RELATION) {
return rte->orientation;
} else if (rte->rtekind == RTE_SUBQUERY) {
return get_rte_orientation_recurse(rte->subquery, tvar, attrno);
} else {
return REL_ORIENT_UNKNOWN;
}
}
Expr* get_RelabelType_expr(Expr* expr)
{
if (IsA(expr, RelabelType)) {
return ((RelabelType*)expr)->arg;
} else if (IsA(expr, FuncExpr) && list_length(((FuncExpr*)expr)->args) == 1 &&
IsFunctionShippable(((FuncExpr*)expr)->funcid)) {
return (Expr*)linitial(((FuncExpr*)expr)->args);
} else {
}
return expr;
}
bool equal_var(Expr* var, Index varno, AttrNumber attrNum)
{
if (!IsA(var, Var)) {
return false;
}
if (((Var*)var)->varno == varno && ((Var*)var)->varattno == attrNum) {
return true;
}
return false;
}
* find_distcol_expr
* This is the slightly twisted version of pgxc_find_distcol_expr
* It returns the whole expression instead of the lower level expression.
*/
static Expr* find_distcol_expr(void* query_arg, Index varno, AttrNumber attrNum, Node* quals)
{
List* lquals = NULL;
ListCell* qual_cell = NULL;
Query* query = (Query*)query_arg;
if (quals == NULL)
return NULL;
if (!IsA(quals, List))
lquals = make_ands_implicit((Expr*)quals);
else
lquals = (List*)quals;
foreach (qual_cell, lquals) {
Expr* qual_expr = (Expr*)lfirst(qual_cell);
OpExpr* op = NULL;
Expr* lexpr = NULL;
Expr* rexpr = NULL;
Var* var_expr = NULL;
Expr* distcol_expr = NULL;
if (IsA(qual_expr, NullTest)) {
NullTest* nt = (NullTest*)qual_expr;
if (nt->nulltesttype == IS_NULL && IsA(nt->arg, Var)) {
var_expr = (Var*)(nt->arg);
if (!equal_var((Expr*)var_expr, varno, attrNum))
continue;
distcol_expr = (Expr*)makeNullConst(var_expr->vartype, var_expr->vartypmod, var_expr->varcollid);
return distcol_expr;
}
continue;
}
if (!IsA(qual_expr, OpExpr))
continue;
op = (OpExpr*)qual_expr;
if (list_length(op->args) != 2)
continue;
lexpr = (Expr*)linitial(op->args);
rexpr = (Expr*)lsecond(op->args);
lexpr = get_RelabelType_expr(lexpr);
rexpr = get_RelabelType_expr(rexpr);
if (equal_var(lexpr, varno, attrNum)) {
var_expr = (Var*)lexpr;
distcol_expr = rexpr;
} else if (equal_var(rexpr, varno, attrNum)) {
var_expr = (Var*)rexpr;
distcol_expr = lexpr;
} else {
continue;
}
Var baserel_var = *var_expr;
if (var_expr->varlevelsup == 0) {
(void)get_real_rte_varno_attno(query, &(baserel_var.varno), &(baserel_var.varattno));
var_expr = &baserel_var;
}
if (!equal_var((Expr*)var_expr, varno, attrNum))
continue;
if (!op_mergejoinable(op->opno, exprType((Node*)lexpr)) && !op_hashjoinable(op->opno, exprType((Node*)lexpr)))
continue;
return distcol_expr;
}
return NULL;
}
Expr* get_coerced_expr(Query* query, RelationLocInfo* loc_info, AttrNumber attnum, Expr* distcol_expr)
{
Oid reloid = loc_info->relid;
Oid disttype = get_atttype(reloid, attnum);
int32 disttypmod = get_atttypmod(reloid, attnum);
if (distcol_expr != NULL) {
Oid exprtype = exprType((Node*)distcol_expr);
if (disttype == NUMERICOID || disttype == BPCHAROID || disttype == VARCHAROID) {
if (can_coerce_type(1, &exprtype, &disttype, COERCION_ASSIGNMENT)) {
distcol_expr = (Expr*)coerce_type(NULL,
(Node*)distcol_expr,
exprtype,
disttype,
disttypmod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST,
NULL,
NULL,
-1);
} else {
distcol_expr = NULL;
}
} else {
distcol_expr = (Expr*)coerce_to_target_type(NULL,
(Node*)distcol_expr,
exprtype,
disttype,
disttypmod,
COERCION_ASSIGNMENT,
COERCE_IMPLICIT_CAST,
NULL,
NULL,
-1);
}
} else {
return NULL;
}
return distcol_expr;
}
* find_distcol_expr_recurse:
* This function can traverse all levels of the subquery and
* calls the pgxc_find_distcol_expr() on each level.
* @param the query, its jointree and the var from targetEntry, single node flag
* @return the expr of given var
*/
static Expr* find_distcol_expr_recurse(Query* query, FromExpr* from_expr, Var* var, bool* no_quals, bool single_node)
{
check_stack_depth();
Expr* expr = NULL;
Var* tvar = NULL;
RangeTblEntry* rte = rt_fetch(var->varno, query->rtable);
if (from_expr == NULL) {
return NULL;
}
if (no_quals) {
*no_quals = (from_expr->quals == NULL);
}
expr = find_distcol_expr(query, var->varno, var->varattno, from_expr->quals);
if (rte->rtekind != RTE_SUBQUERY || (expr && !IsA(expr, Var))) {
return expr;
}
Query* subquery = rte->subquery;
TargetEntry* tar = get_tle_by_resno(subquery->targetList, var->varattno);
if (tar == NULL) {
return NULL;
}
tvar = get_var_from_node((Node*)tar->expr, func_oid_check_restricted);
if (tvar == NULL) {
return single_node ? tar->expr : NULL;
}
return find_distcol_expr_recurse(subquery, subquery->jointree, tvar, no_quals, single_node);
}
* has_shippable_col_for_insert:
* Find the source RTE and check if the column is ready to ship.
* @param the SELECT query, SELECT target entries, target RTE from INSERT
* @return insert select flags: unshippable or shippable distribution column/normal column
*/
static int has_shippable_col_for_insert(Query* query, TargetEntry* tar, RangeTblEntry* rte)
{
Var* tvar = NULL;
tvar = get_var_from_node((Node*)tar->expr, func_oid_check_restricted);
if (tvar == NULL)
return INSEL_UNSHIPPABLE_COL;
RangeTblEntry* rte_qry = rt_fetch(tvar->varno, query->rtable);
if (!has_shippable_insert_rte(rte_qry)) {
return INSEL_UNSHIPPABLE_COL;
}
if (tar->resorigtbl == 0) {
return INSEL_UNSHIPPABLE_COL;
}
Oid relid = tar->resorigtbl;
AttrNumber attrno = tvar->varattno;
RelOrientation orien = REL_ORIENT_UNKNOWN;
if (rte_qry->rtekind == RTE_SUBQUERY) {
orien = get_rte_orientation_recurse(rte_qry->subquery, tvar, &attrno);
} else {
orien = rte_qry->orientation;
}
if (rte->orientation != orien || orien == REL_ORIENT_UNKNOWN) {
return INSEL_UNSHIPPABLE_COL;
}
if (attrno == 0) {
return INSEL_UNSHIPPABLE_COL;
}
* Check if it is a distribution column
* if not, return shippable normal column flag
* if true, check if this column is defined in quals.
*/
if (!IsDistribColumn(relid, attrno)) {
return INSEL_SHIPPABLE_NCOL;
}
bool no_quals = true;
if (find_distcol_expr_recurse(query, query->jointree, tvar, &no_quals, false) || no_quals) {
return INSEL_SHIPPABLE_DCOL;
}
return INSEL_UNSHIPPABLE_COL;
}
* has_consistent_execnodes:
* Check exec nodes consistency for INSERT SELECT queries.
* @param exec nodes of target RTE and subqury from INSERT SELECT; dist column count (update with function);
* single_node flag: single DN execution ignores some of the limitations.
* @return true if consistent, else false.
*/
static bool has_consistent_execnodes(ExecNodes* exec_nodes_rte, ExecNodes* exec_nodes_qry, int* dcol_cnt)
{
ListCell* lc = NULL;
List* dist_vars = NIL;
Distribution distr_r;
Distribution distr_q;
if (exec_nodes_rte == NULL || exec_nodes_qry == NULL)
return false;
distr_r = exec_nodes_rte->distribution;
distr_q = exec_nodes_qry->distribution;
if (distr_r.group_oid != distr_q.group_oid) {
return false;
}
if (IsExecNodesReplicated(exec_nodes_rte) && IsExecNodesReplicated(exec_nodes_qry)) {
return true;
}
if (exec_nodes_qry->accesstype != RELATION_ACCESS_READ) {
return false;
}
* Agree on numbers
* Shows that all RTEs have the same number of distribution columns.
* It does not care about RTEs being subqueries or anything else.
*/
int dcnt = 0;
foreach(lc, exec_nodes_rte->en_dist_vars)
{
dist_vars = (List*)lfirst(lc);
dcnt = list_length(dist_vars);
}
if (dcnt <= 0 || list_length(exec_nodes_rte->en_dist_vars) > 1)
return false;
if (dcol_cnt) {
*dcol_cnt = dcnt;
}
foreach(lc, exec_nodes_qry->en_dist_vars)
{
dist_vars = (List*)lfirst(lc);
int cnt = list_length(dist_vars);
if (cnt != dcnt) {
return false;
}
}
return true;
}
* is_insert_subquery_deparsable
* Check if this INSERT statement is able to parse back with get_var_from_node().
* Node: We want to make sure it is able to parse back with get_var_from_node()
* so that the query can ship to datanodes with correct column order.
* @param query and subquery of INSERT statement
* @return true if all target entries are able to parse back with get_var_from_node()
*/
static bool is_insert_subquery_deparsable(Query* query, Query* subquery)
{
ListCell* lc = NULL;
* Before the targetList is modified by anywhere other than rewriteTargetListIU(),
* we can assure that the difference between query's and subquery's targetList is
* default values. Then, we need to make sure all default values can be identified
* by our deparser.
*
* The statement is unshippable(and not valid) if negative number of default values is given.
*/
int default_value_cnt = list_length(query->targetList) - list_length(subquery->targetList);
if (unlikely(default_value_cnt < 0)) {
return false;
}
foreach(lc, query->targetList) {
TargetEntry* tar = (TargetEntry*)lfirst(lc);
* If we can get a Var from the targetEntry's expr, we can deparse it.
*/
Var* var = get_var_from_node((Node*)tar->expr, func_oid_check_pass);
if (var && (var->varattno <= list_length(subquery->targetList))) {
continue;
}
* If we can't get a Var, it MUST be a default value.
* We make this assumption base on the fact that all default value
* should end up being a Const expression.
* However, if we found more Const expressions than possible number
* of default values, it means that we are unable to deparse all
* non-default columns (since we need a Var to do that and one of them
* doesn't have one).
* (e.g. INSERT INTO t1(a, b) SELECT a, $1 FROM t2;
* `b` here is referencing a Param, not a Var,
* and if `t1` has a column `c` with default value '1',
* we will end up here)
*/
if (default_value_cnt == 0) {
return false;
}
* We only handle the expressions that can be evaluated.
*/
if (eval_const_expressions(NULL, (Node*)tar->expr)) {
default_value_cnt--;
} else {
return false;
}
}
if (default_value_cnt != 0) {
return false;
}
return true;
}
* get_relative_dist_pos
* Get the relative position of a distribution column in ExecNodes.
* varno is not required if the RTE is target table.
* @param exec nodes of RTE, table indices(varno, varattno), is it a target table of a INSERT query?
* @return position of the distribution column relative to other distribution columns in RTE
*/
int get_relative_dist_pos(ExecNodes* exec_nodes, Index varno, AttrNumber varattno, bool is_target_table)
{
ListCell* lc = NULL;
ListCell* dlc = NULL;
List* dist_vars = NIL;
foreach(lc, exec_nodes->en_dist_vars) {
dist_vars = (List*)lfirst(lc);
int pos = 0;
foreach(dlc, dist_vars)
{
Var* tvar = (Var*)lfirst(dlc);
pos++;
if ((is_target_table || varno == tvar->varno) && varattno == tvar->varattno) {
return pos;
}
}
}
return 0;
}
* get_nodeList_from_dexprs
* Get the execute datanode list from distribution expressions.
* NOTE: This is a proprietary method, do not use this to get exec_nodes.
* @param query, subquery, token list and access type
* @return datenode list
*/
List* get_nodeList_from_dexprs(
Query* query, Query* subquery, List* dexprList, List* idx_dist, List* distcols)
{
int num_of_distcols = list_length(dexprList);
if (list_length(idx_dist) != num_of_distcols) {
list_free_ext(idx_dist);
return NULL;
}
Datum* distcol_value = (Datum*)palloc(num_of_distcols * sizeof(Datum));
bool* distcol_isnull = (bool*)palloc(num_of_distcols * sizeof(bool));
Oid* distcol_type = (Oid*)palloc(num_of_distcols * sizeof(Oid));
int i = 0;
RelationLocInfo* loc_info =
GetRelationLocInfo((rt_fetch(linitial_int(query->resultRelations), query->rtable))->relid);
ListCell* lc = NULL;
ListCell* lc2 = NULL;
forboth(lc, dexprList, lc2, distcols) {
Node* node = (Node*)lfirst(lc);
int distAttno = lfirst_int(lc2);
Const* const_expr = NULL;
* We need to find the expression associated with the given Var.
* And then try to transform the expression into a equivalent Const.
*/
if (IsA(node, Var)) {
Var* var_expr = (Var*)node;
Expr* expr = find_distcol_expr_recurse(subquery, subquery->jointree, var_expr, NULL, true);
expr = get_coerced_expr(query, loc_info, distAttno, expr);
node = eval_const_expressions_params(NULL, (Node*)expr, query->boundParamsQ);
}
if (node == NULL || !IsA(node, Const)) {
free_distcol_info(distcol_value, distcol_isnull, distcol_type, idx_dist);
return NIL;
}
* If it is a Const/can be transformed into Const,
* we simply put it in the eval space.
*/
const_expr = (Const*)node;
distcol_value[i] = const_expr->constvalue;
distcol_isnull[i] = const_expr->constisnull;
distcol_type[i] = const_expr->consttype;
i++;
}
ExecNodes* exec_nodes = GetRelationNodes(
loc_info, distcol_value, distcol_isnull, distcol_type, idx_dist, RELATION_ACCESS_READ);
free_distcol_info(distcol_value, distcol_isnull, distcol_type, idx_dist);
return exec_nodes->nodeList;
}
* get_one_dexpr
* Roughly get one distribute expression on targetlists.
* @param query, subquery, target entry of the INSERT query
* @return distribute expression
*/
Expr* get_one_dexpr(Query* query, Query* subquery, TargetEntry* tar_rte)
{
Expr* dexpr = NULL;
dexpr = (Expr*)get_var_from_node((Node*)tar_rte->expr, func_oid_check_restricted);
if (dexpr == NULL) {
dexpr = (Expr*)eval_const_expressions(NULL, (Node*)tar_rte->expr);
if (dexpr == NULL) {
return NULL;
}
return dexpr;
}
TargetEntry* tar_qry = get_tle_by_resno(subquery->targetList, ((Var*)dexpr)->varattno);
if (tar_qry == NULL) {
return NULL;
}
dexpr = (Expr*)get_var_from_node((Node*)tar_qry->expr, func_oid_check_restricted);
if (dexpr == NULL) {
dexpr = (Expr*)eval_const_expressions_params(NULL, (Node*)tar_qry->expr, query->boundParamsQ);
if (dexpr == NULL) {
return NULL;
}
}
return dexpr;
}
* check_insert_subquery_on_singlenode
* If subquery can be executed on a single node. All we need to do is to make sure
* the INSERT can be executed on the same node.
* @param query, subquery, exec_nodes of query and subquery
* @return true if node list matches, else false
*/
bool check_insert_subquery_on_singlenode(
Query* query, Query* subquery, ExecNodes* en_rte, ExecNodes* en_qry, Shippability_context* sc_context)
{
List* dexprList = NIL;
List* posList = NIL;
List* refList = NIL;
Expr* dexpr = NULL;
Oid relid_rte = (rt_fetch(linitial_int(query->resultRelations), query->rtable))->relid;
ListCell* lc = NULL;
foreach(lc, query->targetList) {
TargetEntry* tar_rte = (TargetEntry*)lfirst(lc);
if (!IsDistribColumn(relid_rte, tar_rte->resno)) {
continue;
}
posList = lappend_int(posList, get_relative_dist_pos(en_rte, 0, tar_rte->resno, true) - 1);
dexpr = get_one_dexpr(query, subquery, tar_rte);
if (dexpr == NULL) {
list_free_ext(dexprList);
list_free_ext(posList);
list_free_ext(refList);
return false;
}
dexprList = lappend(dexprList, dexpr);
if (IsA(dexpr, Var)) {
refList = lappend_int(refList, tar_rte->resno);
} else {
refList = lappend_int(refList, 0);
}
}
List* nodeList = get_nodeList_from_dexprs(query, subquery, dexprList, posList, refList);
list_free_ext(dexprList);
list_free_ext(refList);
if (nodeList && list_is_subset_int(nodeList, en_qry->nodeList) && sc_context->sc_exec_nodes) {
sc_context->sc_exec_nodes->nodeList = nodeList;
return true;
}
return false;
}
bool check_replicated_junktlist(Query* subquery)
{
ListCell *lc = NULL;
foreach (lc, subquery->targetList) {
TargetEntry *en = (TargetEntry *)lfirst(lc);
if (!IsA(en->expr, Var)) {
continue;
}
Var *v = (Var *)en->expr;
if (v->varattno < 0) {
return true;
}
}
return false;
}
* check_insert_subquery_shippability:
* Check if an INSERT SELECT query is shippable.
* Need to make sure the distribution of the subquery is matching with the distribution of the target table.
* @param INSERT query, SELECT subquery, Shippability_context
* @return void, but sc_context->inselect got updated
*/
static void check_insert_subquery_shippability(Query* query, Query* subquery, Shippability_context* sc_context)
{
ExecNodes* exec_nodes_rte = NULL;
ExecNodes* exec_nodes_qry = NULL;
int dcnt_all = 0;
int dcnt_pst = 0;
bool contain_column_store = sc_context->sc_contain_column_store;
bool use_star_targets = false;
if (query->rowMarks || subquery->rowMarks) {
return;
}
RangeTblEntry *rte = rt_fetch(linitial_int(query->resultRelations), query->rtable);
if (!has_shippable_insert_rte(rte) || !is_insert_subquery_deparsable(query, subquery)) {
return;
}
if (rte->rtekind != RTE_RELATION || rte->relid == 0) {
return;
}
Oid relid_rte = rte->relid;
exec_nodes_rte = pgxc_FQS_get_relation_nodes(rte, (Index)linitial_int(query->resultRelations), query);
exec_nodes_qry = pgxc_is_query_shippable(subquery,
sc_context->sc_query_level + 1,
sc_context->sc_light_proxy,
&contain_column_store,
&use_star_targets);
if (contain_column_store)
sc_context->sc_contain_column_store = contain_column_store;
if (use_star_targets)
query->use_star_targets = use_star_targets;
if (unlikely(sc_context->sc_exec_nodes != NULL)) {
return;
}
sc_context->sc_exec_nodes = exec_nodes_qry;
if (!has_consistent_execnodes(exec_nodes_rte, exec_nodes_qry, &dcnt_all)) {
return;
}
* If we know it is going to be executed on a single node, we can evaluate the shippability
* base on the node being executed on, instead of the consistency of distribution.
*/
if (((IsExecNodesReplicated(exec_nodes_qry) && exec_nodes_rte->baselocatortype == LOCATOR_TYPE_HASH) ||
ExecOnSingleNode(exec_nodes_qry->nodeList)) &&
check_insert_subquery_on_singlenode(query, subquery, exec_nodes_rte, exec_nodes_qry, sc_context)) {
sc_context->sc_inselect = true;
return;
}
if ((exec_nodes_rte->baselocatortype != exec_nodes_qry->baselocatortype)) {
return;
}
if (IsExecNodesReplicated(exec_nodes_rte) &&
IsExecNodesReplicated(exec_nodes_qry) &&
check_replicated_junktlist(subquery)) {
return;
}
bool isDistBySlice = IsLocatorDistributedBySlice(exec_nodes_rte->baselocatortype);
if (exec_nodes_rte->baselocatortype != LOCATOR_TYPE_HASH && !isDistBySlice) {
sc_context->sc_inselect = IsExecNodesReplicated(exec_nodes_rte);
return;
}
if (isDistBySlice && !PgxcIsDistInfoSame(exec_nodes_rte, exec_nodes_qry)) {
return;
}
* To see if a INSERT SELECT query is shippable, we need to focus on the distribution
* of both the target table and the output of SELECT subquery. And it is necessary to
* make sure the distribution of the table and targetlist of subquery has the same pattern.
* e.g.
* [table t1(a, b, c) hash(a, b)] and [table t2(b, c, d) hash(b, c)] has the same pattern
* but
* [table t1(a, b, c) hash(a, b)] and [table t2(b, c, d) hash(c, b)] does not.
* Note that we are not interested in what happened to those non distribution columns,
* but we do apply the same limitations on them for obvious reasons.
*/
ListCell* lc = NULL;
foreach (lc, query->targetList) {
bool is_dcol_qry = false;
Var* tvar_rte = NULL;
Var* tvar_qry = NULL;
Index src_varno = 0;
TargetEntry* tar_rte = (TargetEntry*)lfirst(lc);
if (!IsDistribColumn(relid_rte, tar_rte->resno)) {
continue;
}
tvar_rte = get_var_from_node((Node*)tar_rte->expr, func_oid_check_restricted);
if (tvar_rte == NULL) {
return;
}
TargetEntry* tar_qry = get_tle_by_resno(subquery->targetList, tvar_rte->varattno);
if (tar_qry == NULL) {
return;
}
tvar_qry = get_var_from_node((Node*)tar_qry->expr, func_oid_check_restricted);
if (tvar_qry == NULL) {
return;
}
int ship_flag = has_shippable_col_for_insert(subquery, tar_qry, rte);
if (ship_flag == INSEL_SHIPPABLE_DCOL) {
is_dcol_qry = true;
} else {
return;
}
Index new_src_varno = tvar_qry->varno;
dcnt_pst++;
if (src_varno == 0) {
src_varno = new_src_varno;
} else if (src_varno != new_src_varno) {
return;
} else {
}
int pos_r = get_relative_dist_pos(exec_nodes_rte, linitial_int(query->resultRelations), tar_rte->resno, true);
int pos_q = get_relative_dist_pos(exec_nodes_qry, tvar_qry->varno, tvar_qry->varattno, false);
if (pos_r != pos_q) {
return;
}
if (pos_r == 0) {
return;
}
}
if (dcnt_pst == dcnt_all && dcnt_pst != 0) {
sc_context->sc_inselect = true;
}
return;
}
static bool pgxc_check_insert_rte_shippability(Query* query, bool sc_light_proxy, Shippability_context* sc_context)
{
RangeTblEntry* rte = NULL;
RangeTblEntry* values_rte = NULL;
RangeTblEntry* excluded_rte = NULL;
RangeTblEntry* target_rte = NULL;
ListCell* cell = NULL;
List* rtables = query->rtable;
foreach(cell, rtables) {
rte = (RangeTblEntry*)lfirst(cell);
if (rte->rtekind == RTE_VALUES) {
if (values_rte != NULL) {
return false;
}
values_rte = rte;
} else if (rte->rtekind == RTE_RELATION) {
if (rte->isexcluded) {
if (excluded_rte != NULL) {
return false;
}
excluded_rte = rte;
} else {
if (target_rte != NULL) {
return false;
}
target_rte = rte;
}
} else if (rte->rtekind == RTE_SUBQUERY) {
if (!sc_context->sc_inselect) {
check_insert_subquery_shippability(query, rte->subquery, sc_context);
}
return sc_context->sc_inselect;
} else {
return false;
}
}
return true;
}
* check whether expr of targestList can be shipped for distribution column
*/
static void pgxc_set_insert_shippability(Query* query, Shippability_context* sc_context)
{
RangeTblEntry* rte = NULL;
ListCell* cell = NULL;
ListCell* attCell = NULL;
Assert(query->commandType == CMD_INSERT);
rte = (RangeTblEntry*)list_nth(query->rtable, linitial_int(query->resultRelations) - 1);
if (list_length(query->rtable) == 1 &&
(RELKIND_RELATION != rte->relkind || rte->partAttrNum == NIL)) {
return;
}
if (pgxc_check_insert_rte_shippability(query, sc_context->sc_light_proxy, sc_context)) {
if (!sc_context->sc_inselect) {
pgxc_set_shippability_reason(sc_context, SS_NEED_SINGLENODE);
} else {
return;
}
} else {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
return;
}
AttrNumber attnum;
bool shippable_qual = false;
foreach (attCell, rte->partAttrNum) {
attnum = lfirst_int(attCell);
foreach (cell, query->targetList) {
TargetEntry* tle = (TargetEntry*)lfirst(cell);
if (tle->resno == attnum) {
bool is_randomfunc_shippable_state = u_sess->opt_cxt.is_randomfunc_shippable;
u_sess->opt_cxt.is_randomfunc_shippable = false;
shippable_qual = pgxc_is_expr_shippable(tle->expr, NULL);
u_sess->opt_cxt.is_randomfunc_shippable = is_randomfunc_shippable_state;
if (!shippable_qual) {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
return;
}
if (contain_volatile_functions((Node*)tle->expr)) {
pgxc_set_shippability_reason(sc_context, SS_UNSHIPPABLE_EXPR);
return;
}
break;
}
}
}
}
* pgxc_check_dynamic_param:
* If there's dynamicExpr that need to dynamically determine if the query can
* be pushed to one datanode, use the value of params to judge it. If the value
* of params are different, it can't be pushed down, and return false.
*/
bool pgxc_check_dynamic_param(List* dynamicExpr, ParamListInfo params)
{
if (dynamicExpr == NIL)
return true;
if (params == NULL)
return false;
ListCell* lc = NULL;
Node* node = (Node*)linitial(dynamicExpr);
Datum value = 0;
bool isnull = false;
Oid nodeType = InvalidOid;
if (IsA(node, Param)) {
Param* par = (Param*)node;
Assert(par->paramkind == PARAM_EXTERN);
value = params->params[par->paramid - 1].value;
isnull = params->params[par->paramid - 1].isnull;
nodeType = par->paramtype;
} else
return false;
Type typ = typeidType(nodeType);
if (!OidIsValid(nodeType) || !typeByVal(typ)) {
ReleaseSysCache(typ);
return false;
}
ReleaseSysCache(typ);
foreach (lc, dynamicExpr) {
Datum value_local = 0;
Datum isnull_local = false;
if (lc == list_head(dynamicExpr))
continue;
node = (Node*)lfirst(lc);
if (IsA(node, Param)) {
Param* par = (Param*)node;
Assert(par->paramkind == PARAM_EXTERN);
value_local = params->params[par->paramid - 1].value;
isnull_local = params->params[par->paramid - 1].isnull;
} else
return false;
if (isnull != isnull_local)
break;
else if (!isnull && !datumIsEqual(value, value_local, true, -1))
break;
}
if (lc != NULL)
return false;
return true;
}
* @Description : check the shippable of query which is in trigger function.
* @in expr : the expr in trigger body.
* @return : true if the query in trigger is shippable.
*/
bool pgxc_is_query_shippable_in_trigger(PLpgSQL_expr* expr)
{
List* parsetree_list = NIL;
ListCell* parsetree_item = NULL;
MemoryContext oldcontext;
MemoryContext trigship_context;
bool can_ship = true;
oldcontext = MemoryContextSwitchTo(t_thrd.mem_cxt.msg_mem_cxt);
parsetree_list = pg_parse_query(expr->query);
MemoryContextSwitchTo(oldcontext);
trigship_context = AllocSetContextCreate(t_thrd.mem_cxt.msg_mem_cxt,
"TrigshipContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
foreach (parsetree_item, parsetree_list) {
List* querytree_list = NIL;
ListCell* querytree_item = NULL;
Node* parsetree = (Node*)lfirst(parsetree_item);
bool snapshot_set = false;
if (analyze_requires_snapshot(parsetree)) {
PushActiveSnapshot(GetTransactionSnapshot());
snapshot_set = true;
}
oldcontext = MemoryContextSwitchTo(trigship_context);
* Generate the querytrees throuth querys in trigger body and the main
* processing point is the get column info with no cur_estate, so we
* filled the necessary info like copy_plpgsql_datum do.
*/
querytree_list =
pg_analyze_and_rewrite_params(parsetree, expr->query, (ParserSetupHook)plpgsql_parser_setup, (void*)expr);
foreach (querytree_item, querytree_list) {
Query* query = castNode(Query, lfirst(querytree_item));
if (query->commandType == CMD_UTILITY) {
can_ship = false;
break;
} else {
Shippability_context sc_context;
ExecNodes* exec_nodes = NULL;
errno_t rc = EOK;
rc = memset_s(&sc_context, sizeof(sc_context), '\0', sizeof(sc_context));
securec_check(rc, "\0", "\0");
sc_context.sc_query = query;
sc_context.sc_query_level = 0;
sc_context.sc_for_expr = false;
sc_context.sc_for_trigger = true;
sc_context.sc_plpgsql_func = (void*)expr->func;
sc_context.sc_plpgsql_check_colocate_hook = plpgsql_check_colocate;
* We also use this walker to check the shippable of query in trigger as same
* as we do to the query outside the trigger, but there are some special logic
* only for the inner statement of the trigger.
*/
pgxc_shippability_walker((Node*)query, &sc_context);
exec_nodes = sc_context.sc_exec_nodes;
* Exec_nodes is empty means not shippable.
* SELECT command is unshippable as it can not be checked colocate with IUD.
*/
if (exec_nodes == NULL || query->commandType == CMD_SELECT) {
bms_free_ext(sc_context.sc_shippability);
can_ship = false;
break;
}
if (!bms_is_empty(sc_context.sc_shippability)) {
FreeExecNodes(&exec_nodes);
bms_free_ext(sc_context.sc_shippability);
can_ship = false;
break;
}
FreeExecNodes(&exec_nodes);
}
}
MemoryContextSwitchTo(oldcontext);
MemoryContextDelete(trigship_context);
if (snapshot_set)
PopActiveSnapshot();
if (!can_ship)
return false;
}
return can_ship;
}
* check_has_sameAlias
*
* @Description : called by pgxc_shippability_walker for check
* if the subquery/sublink's targets has same alias names.
*
* @in targetList : the targetList of subquery/sublink.
* @return : true if has same alias names.
*/
static bool check_has_sameAlias(List* targetList)
{
ListCell* cell = NULL;
foreach (cell, targetList) {
TargetEntry* target_1 = (TargetEntry*)lfirst(cell);
ListCell* cell2 = cell;
while ((cell2 = lnext(cell2)) != NULL) {
TargetEntry* targest_2 = (TargetEntry*)lfirst(cell2);
if (target_1->resname && targest_2->resname &&
strncmp(target_1->resname, targest_2->resname, strlen(targest_2->resname) + 1) == 0) {
return true;
}
}
}
return false;
}
* @Description : called by pgxc_shippability_walker for check the shippable of query in trigger
* @in expr : the expr in trigger body.
* @return : true if the query in trigger is shippable.
*/
static bool pgxc_check_shippability_in_trigger(Query* query, Shippability_context* sc_context)
{
RangeTblEntry* rte = NULL;
rte = (RangeTblEntry*)list_nth(query->rtable, linitial_int(query->resultRelations) - 1);
if (rte->relkind != RELKIND_RELATION || rte->partAttrNum == NIL)
return true;
if (sc_context->sc_plpgsql_check_colocate_hook != NULL) {
return (*sc_context->sc_plpgsql_check_colocate_hook)(query, rte, sc_context->sc_plpgsql_func);
}
return false;
}