*
* streamplan_single.cpp
* functions related to stream plan.
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/gausskernel/optimizer/plan/streamplan_single.cpp
*
* -------------------------------------------------------------------------
*/
#include <math.h>
#include <pthread.h>
#include "access/heapam.h"
#include "access/transam.h"
#include "access/hash.h"
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_inherits_fn.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "catalog/pgxc_class.h"
#include "catalog/pgxc_node.h"
#include "commands/explain.h"
#include "commands/tablecmds.h"
#include "foreign/fdwapi.h"
#include "foreign/foreign.h"
#include "funcapi.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/print.h"
#include "optimizer/cost.h"
#include "optimizer/clauses.h"
#include "optimizer/dataskew.h"
#include "optimizer/nodegroups.h"
#include "optimizer/pathnode.h"
#include "optimizer/pgxcship.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/streamplan.h"
#include "optimizer/tlist.h"
#include "parser/parse_collate.h"
#include "parser/parse_coerce.h"
#include "parser/parse_clause.h"
#include "parser/parse_merge.h"
#include "parser/parse_node.h"
#include "parser/parse_oper.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
#include "pgxc/groupmgr.h"
#include "pgxc/locator.h"
#include "pgxc/nodemgr.h"
#include "pgxc/pgxc.h"
#include "pgxc/pgxcnode.h"
#include "pgxc/poolmgr.h"
#include "pgxc/poolutils.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "rewrite/rewriteHandler.h"
#include "nodes/pg_list.h"
#include "securec.h"
#include "lz4.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "utils/syscache.h"
void set_default_stream()
{
if (IsInitdb) {
u_sess->opt_cxt.is_stream = false;
u_sess->opt_cxt.is_stream_support = false;
} else {
* And u_sess->stream_cxt.global_obj != NULL means outer query already init
* stream object, do not use smp in inner query.
*/
u_sess->opt_cxt.is_stream = (u_sess->opt_cxt.query_dop > 1 &&
u_sess->opt_cxt.smp_enabled &&
u_sess->stream_cxt.global_obj == NULL);
u_sess->opt_cxt.is_stream_support = u_sess->opt_cxt.is_stream;
}
#ifdef USE_SPQ
if (t_thrd.spq_ctx.spq_role != ROLE_UTILITY) {
u_sess->opt_cxt.is_stream_support = true;
}
if (t_thrd.spq_ctx.spq_role == ROLE_QUERY_COORDINTOR) {
u_sess->opt_cxt.is_stream = u_sess->attr.attr_sql.enable_stream_operator;
}
#endif
}
int2vector* get_baserel_distributekey_no(Oid relid)
{
if (!check_stream_support()) {
return NULL;
}
AttrNumber attnum = 1;
while (true) {
HeapTuple tp;
Form_pg_attribute att_tup;
tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(relid), Int16GetDatum(attnum));
if (!HeapTupleIsValid(tp)) {
attnum = 0;
ReleaseSysCache(tp);
break;
}
att_tup = (Form_pg_attribute)GETSTRUCT(tp);
if (!att_tup->attisdropped) {
ReleaseSysCache(tp);
break;
}
++attnum;
ReleaseSysCache(tp);
}
if (attnum == 0)
return NULL;
int2 col[1] = { attnum };
int2vector* attnumVec = buildint2vector(col, 1);
return attnumVec;
}
List* build_baserel_distributekey(RangeTblEntry* rte, int relindex)
{
if (IS_PGXC_DATANODE || !IS_PGXC_COORDINATOR || !rte->relid || get_rel_relkind(rte->relid) != RELKIND_RELATION)
return NIL;
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
return NIL;
}
Plan* make_simple_RemoteQuery(Plan* lefttree, PlannerInfo* root, bool is_subplan, ExecNodes* target_exec_nodes)
{
if (NULL == root->glob)
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
(errmsg("Could not find globle planner info when make simple remote query."))));
if (root->glob->insideRecursion)
return lefttree;
if (is_execute_on_coordinator(lefttree) || is_execute_on_allnodes(lefttree))
return lefttree;
if (lefttree->dop > 1) {
lefttree = create_local_gather(lefttree);
}
return lefttree;
}
void add_remote_subplan(PlannerInfo* root, RemoteQuery* result_node)
{
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
}
ExecNodes* get_plan_max_ExecNodes(Plan* lefttree, List* subplans)
{
ExecNodes* final_exec_nodes = NULL;
final_exec_nodes = makeNode(ExecNodes);
final_exec_nodes->nodeList = NIL;
final_exec_nodes->baselocatortype = LOCATOR_TYPE_REPLICATED;
final_exec_nodes->accesstype = RELATION_ACCESS_READ;
final_exec_nodes->primarynodelist = NIL;
final_exec_nodes->en_expr = NULL;
final_exec_nodes->nodeList = lefttree->exec_nodes->nodeList;
Distribution* distribution = ng_convert_to_distribution(final_exec_nodes->nodeList);
ng_set_distribution(&final_exec_nodes->distribution, distribution);
return final_exec_nodes;
}
bool is_replicated_plan(Plan* plan)
{
return false;
}
bool is_hashed_plan(Plan* plan)
{
if (IsA(plan, Stream)) {
if (is_broadcast_stream((Stream*)plan) || is_gather_stream((Stream*)plan))
return false;
else if (is_redistribute_stream((Stream*)plan))
return true;
} else if (plan->exec_nodes != NULL)
return IsLocatorDistributedByHash(plan->exec_nodes->baselocatortype);
return false;
}
bool is_rangelist_plan(Plan* plan)
{
return false;
}
ExecNodes* stream_merge_exec_nodes(Plan* lefttree, Plan* righttree, bool push_nodelist)
{
return lefttree->exec_nodes;
}
#ifdef USE_SPQ
char* SpqCompressSerializedPlan(const char* plan_string, int* cLen)
{
char* compressedPlan = NULL;
int oLen = strlen(plan_string) + 1;
compressedPlan = (char*)palloc0(LZ4_COMPRESSBOUND(oLen));
*cLen = LZ4_compress_default(plan_string, compressedPlan, oLen, LZ4_compressBound(oLen));
validate_LZ4_compress_result(*cLen, MOD_OPT, "compress serialized plan");
return compressedPlan;
}
char* SpqDecompressSerializedPlan(const char* comp_plan_string, int cLen, int oLen)
{
char* serializedPlan = (char*)palloc0(oLen);
int returnLen = LZ4_decompress_safe(comp_plan_string, serializedPlan, cLen, oLen);
if (returnLen < 0) {
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_DATA_CORRUPTED),
errmsg("LZ4 decompressing serialize plan failed, decompressing result %d", returnLen)));
}
if (returnLen != oLen) {
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_OPTIMIZER_INCONSISTENT_STATE),
errmsg("LZ4 decompressing serialize plan failed, returnLen not equal with oLen.")));
}
if (strlen(serializedPlan) + 1 != (uint32)oLen) {
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_OPTIMIZER_INCONSISTENT_STATE),
errmsg("LZ4 decompressing serialize plan failed, length of serializedPlan not euqal with oLen.")));
}
return serializedPlan;
}
static bool IsModifyTable(PlannedStmt* planned_stmt, Plan* node)
{
* fix plan on enable_force_vector_engine = on
* ->Vector Streaming (type: GATHER)
* ->Vector Adapter
* -> ModifyTable
*/
if (IsA(node, RowToVec) || IsA(node, VecToRow))
node = node->lefttree;
if (IsA(node, ModifyTable) || IsA(node, VecModifyTable) ||
(CMD_SELECT != planned_stmt->commandType && IsModifyTableForDfsTable(node)))
return true;
return false;
}
* @Description: get all referenced subplans from current plan.
*
* @in result_plan: current plan
* @in/out context: the context the current plan reference all subplan info
*/
void set_node_ref_subplan_walker(Plan* result_plan, set_node_ref_subplan_context* context)
{
if (NULL == result_plan)
return;
ListCell* lc = NULL;
List* subplan_list = check_subplan_list(result_plan);
foreach (lc, subplan_list) {
Node* pnode = (Node*)lfirst(lc);
SubPlan* subplan = NULL;
Plan* plan = NULL;
if (IsA(pnode, SubPlan)) {
subplan = (SubPlan*)lfirst(lc);
subplan_list = list_concat(subplan_list, check_subplan_expr(subplan->testexpr));
} else {
AssertEreport(IsA(pnode, Param), MOD_OPT, "The current node is not a param node");
Param* param = (Param*)pnode;
ListCell* lc2 = NULL;
foreach (lc2, context->org_initPlan) {
subplan = (SubPlan*)lfirst(lc2);
if (list_member_int(subplan->setParam, param->paramid))
break;
}
if (subplan == NULL || lc2 == NULL)
continue;
}
plan = (Plan*)list_nth(context->org_subplans, subplan->plan_id - 1);
set_node_ref_subplan_walker(plan, context);
context->subplan_plan_ids = lappend_int(context->subplan_plan_ids, subplan->plan_id);
}
switch (nodeTag(result_plan)) {
case T_Append:
case T_VecAppend: {
Append* append = (Append*)result_plan;
ListCell* lc3 = NULL;
foreach (lc3, append->appendplans) {
Plan* plan = (Plan*)lfirst(lc3);
set_node_ref_subplan_walker(plan, context);
}
} break;
case T_ModifyTable:
case T_VecModifyTable: {
ModifyTable* mt = (ModifyTable*)result_plan;
ListCell* lc4 = NULL;
foreach (lc4, mt->plans) {
Plan* plan = (Plan*)lfirst(lc4);
set_node_ref_subplan_walker(plan, context);
}
} break;
case T_SubqueryScan:
case T_VecSubqueryScan: {
SubqueryScan* ss = (SubqueryScan*)result_plan;
if (ss->subplan)
set_node_ref_subplan_walker(ss->subplan, context);
} break;
case T_MergeAppend: {
MergeAppend* ma = (MergeAppend*)result_plan;
ListCell* lc5 = NULL;
foreach (lc5, ma->mergeplans) {
Plan* plan = (Plan*)lfirst(lc5);
set_node_ref_subplan_walker(plan, context);
}
} break;
case T_BitmapAnd:
case T_CStoreIndexAnd: {
BitmapAnd* ba = (BitmapAnd*)result_plan;
ListCell* lc6 = NULL;
foreach (lc6, ba->bitmapplans) {
Plan* plan = (Plan*)lfirst(lc6);
set_node_ref_subplan_walker(plan, context);
}
} break;
case T_BitmapOr:
case T_CStoreIndexOr: {
BitmapOr* bo = (BitmapOr*)result_plan;
ListCell* lc7 = NULL;
foreach (lc7, bo->bitmapplans) {
Plan* plan = (Plan*)lfirst(lc7);
set_node_ref_subplan_walker(plan, context);
}
} break;
case T_ExtensiblePlan: {
ListCell* lc8 = NULL;
foreach(lc8, ((ExtensiblePlan*)result_plan)->extensible_plans) {
set_node_ref_subplan_walker((Plan*)lfirst(lc8), context);
}
} break;
#ifdef USE_SPQ
case T_Sequence: {
Sequence* sequence = (Sequence*)result_plan;
ListCell* lc9 = NULL;
foreach(lc9, sequence->subplans) {
Plan* plan = (Plan*)lfirst(lc9);
set_node_ref_subplan_walker(plan, context);
}
} break;
#endif
default: {
if (result_plan->lefttree)
set_node_ref_subplan_walker(result_plan->lefttree, context);
if (result_plan->righttree)
set_node_ref_subplan_walker(result_plan->righttree, context);
} break;
}
return;
}
static void set_node_ref_subplan(Plan* plan, PlannedStmt* planned_stmt, PlannedStmt* ship_planned_stmt)
{
List *ret_subplans = NIL, *ret_initPlans = NIL;
ListCell *lc1 = NULL, *lc2 = NULL;
set_node_ref_subplan_context context;
context.org_subplans = planned_stmt->subplans;
context.org_initPlan = planned_stmt->initPlan;
context.subplan_plan_ids = NIL;
set_node_ref_subplan_walker(plan, &context);
foreach (lc2, planned_stmt->subplans) {
if (NIL == context.subplan_plan_ids)
ret_subplans = lappend(ret_subplans, NULL);
else {
Plan* tmp_plan = NULL;
foreach (lc1, context.subplan_plan_ids) {
int planid = lfirst_int(lc1);
tmp_plan = (Plan*)list_nth(planned_stmt->subplans, planid - 1);
if (((Plan*)lfirst(lc2))->plan_node_id == tmp_plan->plan_node_id)
break;
}
if (NULL == lc1)
ret_subplans = lappend(ret_subplans, NULL);
else
ret_subplans = lappend(ret_subplans, tmp_plan);
}
}
foreach (lc2, planned_stmt->initPlan) {
if (NIL == context.subplan_plan_ids)
ret_initPlans = lappend(ret_initPlans, NULL);
else {
SubPlan* subplan = (SubPlan*)lfirst(lc2);
if (list_member_int(context.subplan_plan_ids, subplan->plan_id))
ret_initPlans = lappend(ret_initPlans, subplan);
else
ret_initPlans = lappend(ret_initPlans, NULL);
}
}
ship_planned_stmt->subplans = ret_subplans;
ship_planned_stmt->initPlan = ret_initPlans;
}
* Serialized the plan tree to string
*/
void SpqSerializePlan(Plan* node, PlannedStmt* planned_stmt, StringInfoData* str,
RemoteQuery* step, bool push_subplan, uint64 queryId)
{
PlannedStmt* ShipPlannedStmt = NULL;
ShipPlannedStmt = makeNode(PlannedStmt);
if (planned_stmt->commandType != CMD_SELECT && IsModifyTable(planned_stmt, node)) {
ShipPlannedStmt->commandType = planned_stmt->commandType;
ShipPlannedStmt->hasReturning = planned_stmt->hasReturning;
} else {
ShipPlannedStmt->commandType = CMD_SELECT;
ShipPlannedStmt->hasReturning = false;
}
ShipPlannedStmt->queryId = queryId;
ShipPlannedStmt->spq_session_id = planned_stmt->spq_session_id;
ShipPlannedStmt->current_id = planned_stmt->current_id;
ShipPlannedStmt->hasModifyingCTE = planned_stmt->hasModifyingCTE;
ShipPlannedStmt->canSetTag = planned_stmt->canSetTag;
ShipPlannedStmt->transientPlan = planned_stmt->transientPlan;
ShipPlannedStmt->dependsOnRole = planned_stmt->dependsOnRole;
ShipPlannedStmt->planTree = node;
ShipPlannedStmt->rtable = planned_stmt->rtable;
ShipPlannedStmt->dataDestRelIndex = planned_stmt->dataDestRelIndex;
ShipPlannedStmt->MaxBloomFilterNum = planned_stmt->MaxBloomFilterNum;
ShipPlannedStmt->query_mem[0] = planned_stmt->query_mem[0];
ShipPlannedStmt->assigned_query_mem[0] = planned_stmt->assigned_query_mem[0];
ShipPlannedStmt->assigned_query_mem[1] = planned_stmt->assigned_query_mem[1];
* Currently, when delete/update operator applied Dfs table, the append
* plan node will be pushed down, so set ShipPlannedStmt->resultRelations is
* planned_stmt->resultRelations.
*/
if (IsModifyTable(planned_stmt, node))
ShipPlannedStmt->resultRelations = planned_stmt->resultRelations;
else
ShipPlannedStmt->resultRelations = NIL;
ShipPlannedStmt->utilityStmt = planned_stmt->utilityStmt;
*/
if (push_subplan) {
bool with_recursive = ContainRecursiveUnionSubplan(planned_stmt);
if (planned_stmt->subplans && !with_recursive) {
set_node_ref_subplan(node, planned_stmt, ShipPlannedStmt);
} else {
ShipPlannedStmt->subplans = planned_stmt->subplans;
ShipPlannedStmt->initPlan = planned_stmt->initPlan;
}
}
ShipPlannedStmt->subplan_ids = planned_stmt->subplan_ids;
ShipPlannedStmt->rewindPlanIDs = planned_stmt->rewindPlanIDs;
ShipPlannedStmt->rowMarks = planned_stmt->rowMarks;
ShipPlannedStmt->relationOids = planned_stmt->relationOids;
ShipPlannedStmt->invalItems = planned_stmt->invalItems;
ShipPlannedStmt->nParamExec = planned_stmt->nParamExec;
ShipPlannedStmt->num_streams = step->num_stream;
ShipPlannedStmt->gather_count = step->num_gather;
ShipPlannedStmt->num_nodes = step->nodeCount;
ShipPlannedStmt->nodesDefinition = planned_stmt->nodesDefinition;
* For un-stream plan, we can not finalize node id and parent node id for result plan.
*/
if (IS_STREAM_PLAN || IS_PGXC_DATANODE || planned_stmt->in_compute_pool) {
ShipPlannedStmt->instrument_option = planned_stmt->instrument_option;
} else
ShipPlannedStmt->instrument_option = 0;
ShipPlannedStmt->num_plannodes = planned_stmt->num_plannodes;
ShipPlannedStmt->query_string = planned_stmt->query_string;
ShipPlannedStmt->in_compute_pool = planned_stmt->in_compute_pool;
ShipPlannedStmt->has_obsrel = planned_stmt->has_obsrel;
ShipPlannedStmt->num_bucketmaps = planned_stmt->num_bucketmaps;
ShipPlannedStmt->query_dop = planned_stmt->query_dop;
appendStringInfoChar(str, FLAG_SERIALIZED_PLAN);
for (int i = 0; i < ShipPlannedStmt->num_bucketmaps; i++) {
ShipPlannedStmt->bucketMap[i] = planned_stmt->bucketMap[i];
ShipPlannedStmt->bucketCnt[i] = planned_stmt->bucketCnt[i];
}
ShipPlannedStmt->ng_num = planned_stmt->ng_num;
ShipPlannedStmt->ng_queryMem = planned_stmt->ng_queryMem;
#ifdef USE_SPQ
ShipPlannedStmt->enable_adaptive_scan = planned_stmt->enable_adaptive_scan;
ShipPlannedStmt->write_node_index = planned_stmt->write_node_index;
#endif
appendStringInfoString(str, nodeToString(ShipPlannedStmt));
}
#endif
char* CompressSerializedPlan(const char* plan_string, int* cLen)
{
#ifdef USE_SPQ
return SpqCompressSerializedPlan(plan_string, cLen);
#endif
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
return NULL;
}
char* DecompressSerializedPlan(const char* comp_plan_string, int cLen, int oLen)
{
#ifdef USE_SPQ
return SpqDecompressSerializedPlan(comp_plan_string, cLen, oLen);
#endif
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
return NULL;
}
void SerializePlan(Plan* node, PlannedStmt* planned_stmt, StringInfoData* str, int num_stream, int num_gather)
{
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
}
Plan* mark_distribute_dml(
PlannerInfo* root, Plan** sourceplan, ModifyTable* mt_plan, List** resultRelations, List* mergeActionList)
{
Plan* subplan = *sourceplan;
if (is_single_baseresult_plan(subplan)) {
inherit_plan_locator_info((Plan*)mt_plan, *sourceplan);
}
return (Plan*)mt_plan;
}
* Just execute the plan as a replicateion
*/
static void mark_distribute_setop_allnodes(Plan* plan)
{
plan->distributed_keys = NIL;
plan->exec_type = EXEC_ON_ALL_NODES;
Distribution* distribution = ng_get_default_computing_group_distribution();
plan->exec_nodes = ng_convert_to_exec_nodes(distribution, LOCATOR_TYPE_REPLICATED, RELATION_ACCESS_READ);
}
* We should get distribute key for union all for two case:
* 1. All subplan are hash and there have distkey;
* 2. Some subplan are hash which have distkey and some are replication.
*/
static List* get_distkey_for_unionall(List** subPlanKeyArray, int subPlanNum)
{
List* common_diskey = NIL;
for (int i = 0; i < subPlanNum; i++) {
if (subPlanKeyArray[i] != NIL) {
if (common_diskey == NIL)
common_diskey = subPlanKeyArray[i];
else if (!equal(common_diskey, subPlanKeyArray[i]))
return NIL;
}
}
return common_diskey;
}
bool judge_redistribute_setop_support(PlannerInfo* root, List* subplanlist, Bitmapset* redistributePlanSet)
{
Index subPlanIndex = 0;
ListCell* lc = NULL;
if (bms_is_empty(redistributePlanSet))
return true;
foreach (lc, subplanlist) {
if (bms_is_member(subPlanIndex, redistributePlanSet)) {
Plan* subPlan = (Plan*)lfirst(lc);
if (NULL != make_distkey_for_append(root, subPlan))
return true;
else
return false;
}
subPlanIndex++;
}
return true;
}
static bool redistributeInfo(PlannerInfo* root, List* subPlans, Plan* plan, List** redistributeKey,
Bitmapset** redistributePlanSet, Distribution** redistributeDistribution, bool isunionall, bool canDiskeyChange)
{
ListCell* cell = NULL;
Plan* subPlan = NULL;
int subPlanNum = list_length(subPlans);
Cost* subPlanCostArray = NULL;
List** subPlanKeyArray = NULL;
List* subPlanKeyIndex = NULL;
int subPlanIndex = 0;
List* redistributeKeyIndex = NULL;
Bitmapset* redistributePlanSetCopy = NULL;
bool result = true;
bool norediskeyplan = false, redistributedplan = false;
* This is the case that no columns needed by append,
* but with lower level of colstore table.
*/
if (plan->targetlist == NIL && !isunionall)
return false;
subPlanCostArray = (Cost*)palloc0(sizeof(Cost) * subPlanNum);
subPlanKeyArray = (List**)palloc0(sizeof(List*) * subPlanNum);
subPlanIndex = 0;
Distribution* target_distribution = ng_get_best_setop_distribution(subPlans, isunionall, root->is_correlated);
* Find redistribute key for each subplan.
*
* We have three kinds of subplan here.
* (1) replicate plan.
* (2) redistributed plan.
* (3) non-replicate plan that has no distribute key possible
*
* Note, we can't have all replicate plan in subplans here, since it has been handled earlier.
* Note, there may be all replicate plan in subplans, if their exec nodes have no overlap.
*
* For union all, we need replicate plan to be redistributed, and find a common redistribute
* key as possible (not a must). For non-union all, we need to find a common redistribute
* key for all the plans.
*/
foreach (cell, subPlans) {
subPlan = (Plan*)lfirst(cell);
Distribution* current_distribution = ng_get_dest_distribution(subPlan);
if (!ng_is_single_node_group_distribution(current_distribution)) {
subPlanKeyIndex = distributeKeyIndex(root, subPlan->distributed_keys, subPlan->targetlist);
if (NULL == subPlanKeyIndex) {
if (!isunionall) {
redistributePlanSetCopy = bms_add_member(redistributePlanSetCopy, subPlanIndex);
}
if (canDiskeyChange) {
ExecNodes* en = IsA(subPlan, Stream) ? ((Stream*)subPlan)->consumer_nodes : subPlan->exec_nodes;
bool partial_single_node =
(list_length(en->nodeList) == 1 && bms_num_members(target_distribution->bms_data_nodeids) > 1);
* Whether a stream has been added for the plan only executes on one datanode
* of multi-datanode group, we should redistribute it beforehand. Or it'll lead
* duplicate results when pushing down exec_nodes to other datanodes, and we
* can't prevent exec_nodes pushing down since executor needs all the consumer
* to be same in one thread, or it'll hang.
*/
if (partial_single_node) {
ListCell* lc = NULL;
List* distkeys = NIL;
* Since there's no stats info in append rel, we can only roughly
* choose the distribute key to do redistribute
*/
foreach (lc, subPlan->targetlist) {
TargetEntry* tle = (TargetEntry*)lfirst(lc);
if (IsTypeDistributable(exprType((Node*)tle->expr))) {
distkeys = list_make1(tle->expr);
break;
}
}
if (distkeys != NIL) {
subPlan = make_stream_plan(root, subPlan, distkeys, 1.0, target_distribution);
subPlanKeyArray[subPlanIndex] =
distributeKeyIndex(root, subPlan->distributed_keys, subPlan->targetlist);
} else {
* No suitable distribute key, and we don't support distribute on
* roundrobin, so make a const to distribute on it. NOTE. We know
* it's not a good idea, but no way in the moment. Can improve later
*/
Const* con = makeConst(INT4OID, -1, InvalidOid, -2, (Datum)0, true, false);
distkeys = list_make1(con);
subPlan = make_stream_plan(root, subPlan, distkeys, 1.0, target_distribution);
}
redistributedplan = true;
lfirst(cell) = subPlan;
} else
norediskeyplan = true;
}
} else {
subPlanKeyArray[subPlanIndex] = subPlanKeyIndex;
redistributedplan = true;
}
}
if (!isunionall) {
unsigned int producer_num_datanodes = ng_get_dest_num_data_nodes(subPlan);
unsigned int consumer_num_datanodes = bms_num_members(target_distribution->bms_data_nodeids);
if (consumer_num_datanodes == 0) {
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_DIVISION_BY_ZERO),
(errmsg("consumer_num_datanodes should not be zero"))));
}
subPlanCostArray[subPlanIndex] =
ng_calculate_setop_branch_stream_cost(subPlan, producer_num_datanodes, consumer_num_datanodes);
}
subPlanIndex++;
}
if (isunionall) {
if (!norediskeyplan) {
redistributeKeyIndex = get_distkey_for_unionall(subPlanKeyArray, subPlanNum);
} else if (!redistributedplan && !judge_redistribute_setop_support(root, subPlans, redistributePlanSetCopy)) {
result = false;
}
} else {
* Find no distribute key for subPlan original, we should generate distribute key from max
* cost plan as the distribute key.
*/
if (!redistributedplan) {
redistributeKeyIndex = get_max_cost_distkey_for_nulldistkey(root, subPlans, subPlanNum, subPlanCostArray);
if (redistributeKeyIndex == NIL)
result = false;
} else {
* There has distribute key for subPlan original,
* use subPlanKeyArray as the distribute key.
*/
redistributeKeyIndex = get_max_cost_distkey_for_hasdistkey(
root, subPlans, subPlanNum, subPlanKeyArray, subPlanCostArray, &redistributePlanSetCopy);
}
}
pfree_ext(subPlanCostArray);
pfree_ext(subPlanKeyArray);
*redistributePlanSet = redistributePlanSetCopy;
*redistributeKey = redistributeKeyIndex;
*redistributeDistribution = target_distribution;
return result;
}
static ExecNodes* append_merge_exec_nodes(List* subplans, bool is_distributed)
{
ListCell* lc = NULL;
ExecNodes* merged_en = (ExecNodes*)makeNode(ExecNodes);
Distribution* merged_distribution = NULL;
foreach (lc, subplans) {
Plan* subplan = (Plan*)lfirst(lc);
ExecNodes* en = subplan->exec_nodes;
if (IsA(subplan, Stream))
en = ((Stream*)subplan)->consumer_nodes;
merged_en->nodeList = list_merge_int(merged_en->nodeList, en->nodeList);
* There are two callers of this function
* (1) mark_distribute_setop : the en->distribution may not the same
* (2) mark_distribute_setop_distribution : the en->distribution should be the same
* So, we could not do this assert : ng_is_same_group(&merged_en->distribution, &en->distribution)
*/
if (is_distributed &&
(merged_distribution != NULL && !ng_is_same_group(merged_distribution, &en->distribution))) {
elog(ERROR, "The distribution of merged and exec node are not the same\n"
"merged distribution is %s\n"
"supblan distribution is %s",
dist_to_str(merged_distribution),
dist_to_str(&en->distribution));
}
Distribution* new_merged_distribution = ng_get_union_distribution(merged_distribution, &en->distribution);
DestroyDistribution(merged_distribution);
merged_distribution = new_merged_distribution;
}
ng_set_distribution(&merged_en->distribution, merged_distribution);
foreach (lc, subplans) {
Plan* subplan = (Plan*)lfirst(lc);
ExecNodes* en = subplan->exec_nodes;
* If the subplan contains stream, we should pushdown the merged exec_nodes.
* The reason why we do this is because different exec_nodes between top plan node and
* stream consumer_nodes may cause hang up.
*
* And we must pushdown merged exec_nodes when subplan->dop > 1. because when add local
* stream, we need the plan node on both sides of the local stream node have the same exec nodes.
* pushdown merged exec_nodes can guarantee this.
*/
if (!contain_special_plan_node(subplan, T_Stream, CPLN_NO_IGNORE_MATERIAL) && subplan->dop == 1) {
continue;
}
if (IsA(subplan, Stream))
en = ((Stream*)subplan)->consumer_nodes;
if (list_length(merged_en->nodeList) > list_length(en->nodeList)) {
* Use a deep copy of 'merged_en', in case the subplan's
* baselocatortype was changed by the assignment of
* merged_en->baselocatortype.
*/
ExecNodes* temp_execnodes = (ExecNodes*)copyObject(merged_en);
temp_execnodes->baselocatortype = en->baselocatortype;
pushdown_execnodes(subplan, temp_execnodes, true);
}
}
merged_en->baselocatortype = LOCATOR_TYPE_HASH;
return merged_en;
}
static void mark_distribute_setop_distribution(PlannerInfo* root, Node* node, Plan* plan, List* subPlans,
Bitmapset* redistributePlanSet, List* redistributeKey, Distribution* redistributeDistribution)
{
ListCell* cell = NULL;
List* newSubPlans = NIL;
Plan* subPlan = NULL;
Index subPlanIndex = 0;
TargetEntry* teEntry = NULL;
MergeAppend* mergeAppend = NULL;
Append* append = NULL;
ListCell* attnumCell = NULL;
AttrNumber attnum;
RecursiveUnion* recursiveUnionPlan = NULL;
Distribution *newDistribution = redistributeDistribution;
if (IsA(node, MergeAppend)) {
mergeAppend = (MergeAppend*)node;
} else if (IsA(node, Append)) {
AssertEreport(IsA(node, Append), MOD_OPT, "The node is NOT a Append");
append = (Append*)node;
} else if (IsA(node, RecursiveUnion)) {
recursiveUnionPlan = (RecursiveUnion*)node;
}
if (!bms_is_empty(redistributePlanSet)) {
foreach (cell, subPlans) {
subPlan = (Plan*)lfirst(cell);
List* distribute_keys = NIL;
List* subplandistkey = redistributeKey;
* There are four cases enter the else branch below:
* 1. All subplan are hash and there are no distkey;
* 2. All subplan are hash which some have distkey and some have no distkey;
* 3. Some subplan are hash which involve two cases above-mentioned and some are replication.
* We will choose distkey for replication of union all.
*/
if (subplandistkey == NIL)
subplandistkey = make_distkey_for_append(root, subPlan);
* Add distribute node
*/
foreach (attnumCell, subplandistkey) {
attnum = lfirst_int(attnumCell);
if ((attnum - 1) >= list_length(subPlan->targetlist)) {
elog(ERROR, "attnum overflow the length of subplan targetlist");
}
teEntry = (TargetEntry*)list_nth(subPlan->targetlist, attnum - 1);
distribute_keys = lappend(distribute_keys, teEntry->expr);
}
if (bms_is_member(subPlanIndex, redistributePlanSet)) {
* If both sub plan are replicate plan and we could not get distribute keys for them,
* we need to broadcast both of them to a single datanode from redistributeDistribution
*/
bool noDistribute_keys = (distribute_keys == NIL &&
bms_num_members(redistributeDistribution->bms_data_nodeids) > 1);
if (noDistribute_keys) {
newDistribution = ng_get_random_single_dn_distribution(redistributeDistribution);
}
Plan* newplan = subPlan;
if (root->is_correlated && SUBQUERY_PREDPUSH(root))
elog(ERROR, "Can not add stream operator on to parameterize plan.");
bool partial_single_node = bms_num_members (newDistribution->bms_data_nodeids) == 1 &&
bms_num_members(redistributeDistribution->bms_data_nodeids) > 1 &&
distribute_keys == NIL;
if (partial_single_node) {
* If a stream plan only executes on one datanode of multi-datanode group,
* we should redistribute it. Or it'll lead duplicate results when pushing
* down exec_nodes to other datanodes
*/
const int typeMod = -1;
const int typeLen = -2;
Const *con = makeConst(INT4OID, typeMod, InvalidOid, typeLen, (Datum)0, true, false);
newplan = make_stream_plan(root, subPlan, list_make1(con), 0, redistributeDistribution);
} else {
newplan = make_stream_plan(root, subPlan, distribute_keys, 0, newDistribution);
* stream->consumer_nodes->distribution
* to make sure all the subplans of append has the same distributeion.
*/
if (newDistribution != redistributeDistribution) {
ng_copy_distribution(&((Stream *)newplan)->consumer_nodes->distribution,
redistributeDistribution);
}
}
if (IsA(newplan, Stream)) {
Stream *streamNode = (Stream *)newplan;
streamNode->is_sorted = IsA(node, MergeAppend) ? true : false;
}
if (PointerIsValid(mergeAppend)) {
newSubPlans = lappend(newSubPlans,
make_sort(root,
newplan,
mergeAppend->numCols,
mergeAppend->sortColIdx,
mergeAppend->sortOperators,
mergeAppend->collations,
mergeAppend->nullsFirst,
-1));
} else {
newSubPlans = lappend(newSubPlans, newplan);
}
} else {
newSubPlans = lappend(newSubPlans, subPlan);
}
subPlanIndex++;
}
if (PointerIsValid(mergeAppend)) {
mergeAppend->mergeplans = newSubPlans;
} else if (PointerIsValid(recursiveUnionPlan)) {
const int invalidLen = 2;
AssertEreport(list_length(newSubPlans) == invalidLen, MOD_OPT, "Invalid subplan length");
Plan* recursive_base_plan = (Plan*)recursiveUnionPlan;
recursive_base_plan->lefttree = (Plan*)linitial(newSubPlans);
recursive_base_plan->righttree = (Plan*)lsecond(newSubPlans);
} else {
AssertEreport(PointerIsValid(append), MOD_OPT, "The append is NULL");
append->appendplans = newSubPlans;
}
subPlans = newSubPlans;
}
foreach (attnumCell, redistributeKey) {
attnum = lfirst_int(attnumCell);
if (list_length(plan->targetlist) < attnum) {
elog(ERROR, "target list is too short");
}
teEntry = (TargetEntry*)list_nth(plan->targetlist, attnum - 1);
plan->distributed_keys = lappend(plan->distributed_keys, teEntry->expr);
}
plan->exec_type = EXEC_ON_DATANODES;
plan->exec_nodes = append_merge_exec_nodes(subPlans, true);
}
void mark_distribute_setop(PlannerInfo* root, Node* node, bool isunionall, bool canDiskeyChange)
{
List* subPlans = NIL;
ListCell* planCell = NULL;
Plan* plan = NULL;
Bitmapset* execAllNodesPlanSet = NULL;
Index subPlanIndex = 0;
bool execOnCoords = false;
if (IsA(node, Append)) {
Append* appendPlan = (Append*)node;
subPlans = appendPlan->appendplans;
plan = &(appendPlan->plan);
} else if (IsA(node, RecursiveUnion)) {
RecursiveUnion* recursive_union_plan = (RecursiveUnion*)node;
plan = &recursive_union_plan->plan;
subPlans = lappend(subPlans, plan->lefttree);
subPlans = lappend(subPlans, plan->righttree);
} else {
MergeAppend* mergeAppendPlan = NULL;
AssertEreport(IsA(node, MergeAppend), MOD_OPT, "The node is NOT a MergeAppend");
mergeAppendPlan = (MergeAppend*)node;
subPlans = mergeAppendPlan->mergeplans;
plan = &(mergeAppendPlan->plan);
}
AssertEreport(PointerIsValid(subPlans), MOD_OPT, "The subPlan is NULL");
AssertEreport(list_length(subPlans) >= 1, MOD_OPT, "The list length of subplan is 0");
foreach (planCell, subPlans) {
Plan* subPlan = (Plan*)lfirst(planCell);
if (is_execute_on_coordinator(subPlan)) {
execOnCoords = true;
break;
}
}
if (execOnCoords) {
if (SUBQUERY_IS_PARAM(root) && root->is_correlated) {
elog(ERROR, "Can not add stream operator on to parameterize plan.");
}
} else {
subPlanIndex = 0;
foreach (planCell, subPlans) {
Plan* subPlan = (Plan*)lfirst(planCell);
if (root->is_correlated && !(SUBQUERY_PREDPUSH(root))) {
if (contain_special_plan_node(subPlan, T_Stream)) {
subPlan = materialize_finished_plan(subPlan, true, root->glob->vectorized);
}
lfirst(planCell) = subPlan;
}
if (is_execute_on_allnodes(subPlan)) {
execAllNodesPlanSet = bms_add_member(execAllNodesPlanSet, subPlanIndex);
subPlanIndex++;
continue;
}
if (subPlan->exec_nodes->nodeList == NIL) {
elog(DEBUG1, "[mark_distribute_setop] empty node list");
}
AssertEreport(PointerIsValid(subPlan->exec_nodes), MOD_OPT, "The subPlan's exec_nodes is NULL");
subPlanIndex++;
}
if (bms_num_members(execAllNodesPlanSet) == list_length(subPlans)) {
mark_distribute_setop_allnodes(plan);
} else {
* Just redistribute subplans if there is no less than one subplan
* that is distributed
*/
Bitmapset* redistributePlanSet = NULL;
List* redistributeKey = NIL;
Distribution* redistributeDistribution = NULL;
bool result = false;
* get redistribute information
* return true if succeed; else return fail
*/
result = redistributeInfo(root,
subPlans,
plan,
&redistributeKey,
&redistributePlanSet,
&redistributeDistribution,
isunionall,
canDiskeyChange);
if (result) {
mark_distribute_setop_distribution(
root, node, plan, subPlans, redistributePlanSet, redistributeKey, redistributeDistribution);
} else {
* Add remote query node on top of each subplan if fail
* to get redistribute information
*/
mark_distribute_setop_remotequery(root, node, plan, subPlans);
}
}
if (PointerIsValid(execAllNodesPlanSet)) {
pfree_ext(execAllNodesPlanSet);
}
}
}
void mark_stream_unsupport()
{
u_sess->opt_cxt.is_stream_support = false;
ereport(ERROR, (errmodule(MOD_OPT), (errcode(ERRCODE_STREAM_NOT_SUPPORTED), errmsg("mark_stream_unsupport."))));
}
void materialize_remote_query(Plan* result_plan, bool* materialized, bool sort_to_store)
{
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
}
void SerializePlan(
Plan* node, PlannedStmt* planned_stmt, StringInfoData* str, int num_stream, int num_gather, bool push_subplan)
{
DISTRIBUTED_FEATURE_NOT_SUPPORTED();
}