*
* streampath_single.cpp
* functions for stream path generation in local node.
*
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/gausskernel/optimizer/path/streampath_single.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <math.h>
#include "bulkload/foreignroutine.h"
#include "catalog/pg_statistic.h"
#include "commands/copy.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/print.h"
#include "nodes/relation.h"
#include "optimizer/clauses.h"
#include "optimizer/cost.h"
#include "optimizer/nodegroups.h"
#include "optimizer/optimizerdebug.h"
#include "optimizer/planmain.h"
#include "optimizer/planner.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planner.h"
#include "optimizer/pruning.h"
#include "optimizer/randomplan.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/streampath.h"
#include "optimizer/tlist.h"
#include "parser/parse_hint.h"
#include "parser/parsetree.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/selfuncs.h"
#ifdef PGXC
#include "commands/tablecmds.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/streamplan.h"
#include "pgxc/pgxc.h"
#endif
* @Description: constructor for JoinPathGen.
*
* @param[IN] root: the plannerInfo for this join.
* @param[IN] joinrel: the join relation.
* @param[IN] jointype: join type.
* @param[IN] save_jointype: save join type.
* @param[IN] sjinfo: extra info about the join for selectivity estimation.
* @param[IN] semifactors: contains valid data if jointype is SEMI or ANTI.
* @param[IN] joinclauses: the clauses nodes to do join match.
* @param[IN] restrictlist: all RestrictInfo nodes to apply at the join.
* @param[IN] inner_path: the inner subpath for join.
* @param[IN] outer_path: the outer subpath for join.
* @param[IN] required_outer: the set of required outer rels.
*/
JoinPathGen::JoinPathGen(PlannerInfo* root, RelOptInfo* joinrel, JoinType jointype, JoinType save_jointype,
JoinPathExtraData* extra, List* joinclauses, List* restrictinfo,
Path* outer_path, Path* inner_path, Relids required_outer)
: JoinPathGenBase(root, joinrel, jointype, save_jointype, extra, joinclauses,
restrictinfo, outer_path, inner_path, required_outer)
{}
* @Description: decontructor function for join path generation.
*/
JoinPathGen::~JoinPathGen()
{}
* @Description: decide whether we need redistribute inner or outer side
* base on subpath distribute keys, join clauses and target
* node group.
*
* @return void
*/
void JoinPathGen::initDistribution()
{
* Wheather inner or outer need to be redistributed base on their distribute key and join clauses
* TRUE means need to be redistributed,
* FALSE means do not need to be redistributed
*/
m_redistributeInner = is_distribute_need_on_joinclauses(
m_root, m_innerPath->distribute_keys, m_joinClauses, m_innerRel, m_outerRel, &m_rrinfoInner);
* Wheather inner or outer need to be redistributed base on their distribute key and join clauses
* TRUE means need to be redistributed,
* FALSE means do not need to be redistributed
*/
m_redistributeOuter = is_distribute_need_on_joinclauses(
m_root, m_outerPath->distribute_keys, m_joinClauses, m_outerRel, m_innerRel, &m_rrinfoOuter);
initRangeListDistribution();
}
* @Description: choose suitable parallel stream(like local stream)
* for parallel plan.
*
* @return void
*/
bool JoinPathGen::addJoinParallelInfo()
{
StreamType inner_stream = m_streamInfoPair->inner_info.type;
StreamType outer_stream = m_streamInfoPair->outer_info.type;
bool is_subplan_parallel = (m_innerPath->dop > 1) || (m_outerPath->dop > 1);
bool inner_can_local_distribute = true;
bool outer_can_local_distribute = true;
bool inner_need_local_distribute = true;
bool outer_need_local_distribute = true;
if (!is_subplan_parallel && m_dop <= 1) {
return true;
}
parallelLocalRedistribute(&inner_can_local_distribute,
&outer_can_local_distribute,
&inner_need_local_distribute,
&outer_need_local_distribute);
if (is_subplan_parallel && m_dop <= 1) {
setStreamParallelInfo(false);
setStreamParallelInfo(true);
} else {
if (inner_stream == STREAM_BROADCAST || outer_stream == STREAM_BROADCAST) {
setStreamParallelInfo(false);
setStreamParallelInfo(true);
} else if (inner_stream == STREAM_REDISTRIBUTE && outer_stream == STREAM_NONE) {
if (outer_can_local_distribute) {
if (outer_need_local_distribute)
setStreamParallelInfo(true, LOCAL_DISTRIBUTE);
else
setStreamParallelInfo(true);
setStreamParallelInfo(false);
} else {
return false;
}
} else if (inner_stream == STREAM_NONE && outer_stream == STREAM_REDISTRIBUTE) {
if (inner_can_local_distribute) {
if (inner_need_local_distribute)
setStreamParallelInfo(false, LOCAL_DISTRIBUTE);
else
setStreamParallelInfo(false);
setStreamParallelInfo(true);
} else {
return false;
}
} else if (inner_stream == STREAM_REDISTRIBUTE && outer_stream == STREAM_REDISTRIBUTE) {
setStreamParallelInfo(false);
setStreamParallelInfo(true);
} else if (inner_stream == STREAM_NONE && outer_stream == STREAM_NONE) {
StreamInfoPair* streamInfoPair = m_streamInfoPair;
if (can_broadcast_inner(m_jointype, m_saveJointype, false, NIL, NIL)) {
newStreamInfoPair(streamInfoPair);
setStreamParallelInfo(false, LOCAL_BROADCAST);
setStreamParallelInfo(true);
m_streamInfoList = lappend(m_streamInfoList, (void*)m_streamInfoPair);
}
if (can_broadcast_outer(m_jointype, m_saveJointype, false, NIL, NIL)) {
newStreamInfoPair(streamInfoPair);
setStreamParallelInfo(false);
setStreamParallelInfo(true, LOCAL_BROADCAST);
m_streamInfoList = lappend(m_streamInfoList, m_streamInfoPair);
}
if (inner_can_local_distribute && outer_can_local_distribute) {
newStreamInfoPair(streamInfoPair);
if (inner_need_local_distribute)
setStreamParallelInfo(false, LOCAL_DISTRIBUTE);
else
setStreamParallelInfo(false);
if (outer_need_local_distribute)
setStreamParallelInfo(true, LOCAL_DISTRIBUTE);
else
setStreamParallelInfo(true);
m_streamInfoList = lappend(m_streamInfoList, (void*)m_streamInfoPair);
}
pfree_ext(streamInfoPair);
return false;
}
}
return true;
}
* @Description: the most important part of join path generation is create
* stream path for join base on join clauses, subpath distribution,
* target nodegroup info, data skew info and smp info. We should
* consider all of these situations one by one to create a join path.
* Only the stream info is set at this stage, the stream path will
* be created later.
*
* @return void
*/
void JoinPathGen::addJoinStreamInfo()
{
if (!isParallelEnable() && m_dop > 1)
return;
initDistribution();
if (m_dop > 1) {
addStreamMppInfo();
} else {
setStreamBaseInfo(STREAM_NONE, STREAM_NONE, NIL, NIL);
}
addStreamParallelInfo();
}
* @Description: set parallel info include consumer/producer dop and
* parallel stream type.
*
* @param[IN] stream_outer: is outer side of join.
* @param[IN] sstype: smp stream type.
* @return void
*/
void JoinPathGen::setStreamParallelInfo(bool stream_outer, SmpStreamType sstype)
{
StreamInfo* sinfo = stream_outer ? &m_streamInfoPair->outer_info : &m_streamInfoPair->inner_info;
Path* subpath = stream_outer ? m_outerPath : m_innerPath;
StreamType stype = sinfo->type;
ParallelDesc* pdesc = &sinfo->smpDesc;
pdesc->distriType = PARALLEL_NONE;
pdesc->consumerDop = m_dop;
pdesc->producerDop = SET_DOP(subpath->dop);
switch (stype) {
case STREAM_BROADCAST: {
pdesc->distriType = LOCAL_BROADCAST;
break;
}
case STREAM_REDISTRIBUTE: {
pdesc->distriType = LOCAL_DISTRIBUTE;
break;
}
case STREAM_NONE: {
if (sstype == PARALLEL_NONE) {
if (pdesc->consumerDop != pdesc->producerDop) {
pdesc->distriType = LOCAL_ROUNDROBIN;
sinfo->type = STREAM_REDISTRIBUTE;
}
} else {
pdesc->distriType = sstype;
sinfo->type = STREAM_REDISTRIBUTE;
}
sinfo->stream_keys = stream_outer ? m_distributeKeysOuter : m_distributeKeysInner;
break;
}
default:
break;
}
}
* @Description: set distribute keys for join path.
*
* @param[IN] joinpath: join path.
* @param[IN] desired_key: desired key that try to meet.
* @param[IN] exact_match: if there's a desired key, whether we should do exact match.
* @return bool: if the path is valid.
*/
const bool JoinPathGen::setJoinDistributeKeys(JoinPath* joinpath, List* desired_key, bool exact_match)
{
Path* inner_path = joinpath->innerjoinpath;
Path* outer_path = joinpath->outerjoinpath;
bool is_roundrobin_inner = (inner_path->locator_type == LOCATOR_TYPE_RROBIN);
bool is_roundrobin_outer = (outer_path->locator_type == LOCATOR_TYPE_RROBIN);
bool is_replicate_inner = is_replicated_path(inner_path);
bool is_replicate_outer = is_replicated_path(outer_path);
bool is_path_valid = true;
if (is_roundrobin_inner || is_roundrobin_outer) {
joinpath->path.distribute_keys = NIL;
} else if (is_replicate_outer && is_replicate_inner) {
joinpath->path.distribute_keys = NIL;
} else if (is_replicate_outer || is_replicate_inner) {
if (is_replicate_outer) {
joinpath->path.distribute_keys = inner_path->distribute_keys;
joinpath->path.rangelistOid = inner_path->rangelistOid;
} else {
joinpath->path.distribute_keys = outer_path->distribute_keys;
joinpath->path.rangelistOid = outer_path->rangelistOid;
}
} else if (joinpath->path.dop > 1) {
if (m_jointype != JOIN_FULL) {
joinpath->path.distribute_keys = locate_distribute_key(
m_jointype, outer_path->distribute_keys, inner_path->distribute_keys, desired_key, exact_match);
if (!joinpath->path.distribute_keys)
is_path_valid = false;
}
}
return is_path_valid;
}