*
* preptlist.cpp
* Routines to preprocess the parse tree target list
*
* For INSERT and UPDATE queries, the targetlist must contain an entry for
* each attribute of the target relation in the correct order. For all query
* types, we may need to add junk tlist entries for Vars used in the RETURNING
* list and row ID information needed for EvalPlanQual checking.
*
* NOTE: the rewriter's rewriteTargetListIU and rewriteTargetListUD
* routines also do preprocessing of the targetlist. The division of labor
* between here and there is a bit arbitrary and historical.
*
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/gausskernel/optimizer/prep/preptlist.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/heapam.h"
#include "access/sysattr.h"
#include "catalog/pg_type.h"
#include "foreign/fdwapi.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/prep.h"
#include "optimizer/streamplan.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "parser/parse_coerce.h"
#include "parser/parse_merge.h"
#include "pgxc/pgxc.h"
#include "rewrite/rewriteHandler.h"
#include "tcop/utility.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
static List* expand_targetlist(List* tlist, int command_type, List* resultRelations, List* range_table);
#ifdef STREAMPLAN
static List* add_distribute_column(List* tlist, Index result_relation, List* range_table);
#endif
* preprocess_targetlist
* Driver for preprocessing the parse tree targetlist.
*
* Returns the new targetlist.
*/
List* preprocess_targetlist(PlannerInfo* root, List* tlist)
{
Query* parse = root->parse;
int result_relation = linitial2_int(parse->resultRelations);
List* range_table = parse->rtable;
CmdType command_type = parse->commandType;
ListCell* lc = NULL;
RangeTblEntry* rte = NULL;
* Sanity check: if there is a result relation, it'd better be a real
* relation not a subquery. Else parser or rewriter messed up.
*/
foreach (lc, parse->resultRelations) {
int resultRelation = lfirst_int(lc);
rte = rt_fetch(resultRelation, range_table);
if (rte->subquery != NULL || rte->relid == InvalidOid)
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_OPTIMIZER_INCONSISTENT_STATE),
(errmsg("subquery cannot be result relation"))));
}
* for heap_form_tuple to work, the targetlist must match the exact order
* of the attributes. We also need to fill in any missing attributes. -ay
* 10/94
*/
if (command_type == CMD_INSERT || command_type == CMD_UPDATE)
tlist = expand_targetlist(tlist, command_type, parse->resultRelations, range_table);
#ifdef STREAMPLAN
else if (IS_STREAM_PLAN && command_type == CMD_DELETE)
tlist = add_distribute_column(tlist, result_relation, range_table);
#endif
if (command_type == CMD_MERGE) {
ListCell* l = NULL;
bool is_qual_expanded = false;
* For MERGE, add any junk column(s) needed to allow the executor to
* identify the rows to be updated or deleted, with different
* handling for partitioned tables.
*/
rewriteTargetListMerge(parse, result_relation, range_table);
* For MERGE command, handle targetlist of each MergeAction separately.
* Give the same treatment to MergeAction->targetList as we would have
* given to a regular INSERT/UPDATE/DELETE.
*/
foreach (l, parse->mergeActionList) {
MergeAction* action = (MergeAction*)lfirst(l);
switch (action->commandType) {
case CMD_INSERT:
case CMD_UPDATE:
action->targetList =
expand_targetlist(action->targetList, action->commandType, parse->resultRelations, range_table);
if (action->pulluped_targetList != NIL)
action->pulluped_targetList = expand_targetlist(
action->pulluped_targetList, action->commandType, parse->resultRelations, range_table);
break;
case CMD_DELETE:
break;
case CMD_NOTHING:
break;
default:
ereport(ERROR, ((errcode(ERRCODE_CASE_NOT_FOUND), errmsg("unknown action in MERGE WHEN clause"))));
}
if (!IS_STREAM_PLAN && action->qual != NULL && !IS_SINGLE_NODE) {
if (!is_qual_expanded) {
parse->targetList = expandQualTL(parse->targetList, parse);
is_qual_expanded = true;
}
action->qual = substitute_var(action->qual, parse->targetList);
}
}
if (!IS_PGXC_DATANODE && !IS_STREAM_PLAN && !IS_SINGLE_NODE) {
parse->targetList = expandActionTL(parse->targetList, parse);
}
}
* Add necessary junk columns for rowmarked rels. These values are needed
* for locking of rels selected FOR UPDATE/SHARE, and to do EvalPlanQual
* rechecking. See comments for PlanRowMark in plannodes.h.
*/
foreach (lc, root->rowMarks) {
PlanRowMark* rc = (PlanRowMark*)lfirst(lc);
Var* var = NULL;
char resname[32];
TargetEntry* tle = NULL;
RangeTblEntry* rte = NULL;
int ret = 0;
if (rc->rti != rc->prti)
continue;
rte = rt_fetch(rc->rti, range_table);
if (rc->markType != ROW_MARK_COPY) {
var = makeVar(rc->rti, SelfItemPointerAttributeNumber, TIDOID, -1, InvalidOid, 0);
ret = snprintf_s(resname, sizeof(resname), sizeof(resname) - 1, "ctid%u", rc->rowmarkId);
securec_check_ss(ret, "", "");
tle = makeTargetEntry((Expr*)var, list_length(tlist) + 1, pstrdup(resname), true);
tlist = lappend(tlist, tle);
bool need_oid = rc->isParent || rte->ispartrel || rte->orientation == REL_COL_ORIENTED;
if (need_oid) {
var = makeVar(rc->rti, TableOidAttributeNumber, OIDOID, -1, InvalidOid, 0);
ret = snprintf_s(resname, sizeof(resname), sizeof(resname) - 1, "tableoid%u", rc->rowmarkId);
securec_check_ss(ret, "", "");
tle = makeTargetEntry((Expr*)var, list_length(tlist) + 1, pstrdup(resname), true);
tlist = lappend(tlist, tle);
}
if (rte->relhasbucket) {
var = makeVar(rc->rti, BucketIdAttributeNumber, INT2OID, -1, InvalidBktId, 0);
ret = snprintf_s(resname, sizeof(resname), sizeof(resname) - 1, "tablebucketid%u", rc->rowmarkId);
securec_check_ss(ret, "", "");
tle = makeTargetEntry((Expr*)var, list_length(tlist) + 1, pstrdup(resname), true);
tlist = lappend(tlist, tle);
}
* Add table's distribute columns to targetlist for FOR UPDATE/SHARE
* in stream plan.
*/
if (IS_STREAM_PLAN && (rc->markType == ROW_MARK_EXCLUSIVE || rc->markType == ROW_MARK_SHARE)) {
tlist = add_distribute_column(tlist, rc->rti, range_table);
}
} else {
bool is_hdfs_ftbl = false;
List* sub_tlist = NIL;
if (rte->relkind == RELKIND_FOREIGN_TABLE || rte->relkind == RELKIND_STREAM) {
if (isObsOrHdfsTableFormTblOid(rte->relid) || IS_OBS_CSV_TXT_FOREIGN_TABLE(rte->relid)) {
is_hdfs_ftbl = true;
}
} else if (rte->rtekind == RTE_SUBQUERY && (IS_STREAM_PLAN || IS_SINGLE_NODE)) {
sub_tlist = rte->subquery->targetList;
}
if (!is_hdfs_ftbl && sub_tlist == NIL) {
var = makeWholeRowVar(rt_fetch(rc->rti, range_table), rc->rti, 0, false);
if (var == NULL) {
ereport(ERROR,(errcode(ERRCODE_UNDEFINED_FILE),
errmsg("could not get the whole row to build a junk var.")));
return NULL;
}
int rcode = snprintf_s(resname, sizeof(resname), sizeof(resname) - 1, "wholerow%u", rc->rowmarkId);
securec_check_ss(rcode, "\0", "\0");
tle = makeTargetEntry((Expr*)var, list_length(tlist) + 1, pstrdup(resname), true);
tlist = lappend(tlist, tle);
} else {
int varattno = 0;
Relation rel = NULL;
int maxattrs;
TupleDesc tupdesc = NULL;
ListCell* lc2 = NULL;
if (is_hdfs_ftbl) {
rel = relation_open(rte->relid, AccessShareLock);
tupdesc = rel->rd_att;
maxattrs = tupdesc->natts;
} else {
maxattrs = list_length(sub_tlist);
lc2 = list_head(sub_tlist);
}
for (varattno = 0; varattno < maxattrs; varattno++) {
Oid vartype = InvalidOid;
int32 vartypmod = -1;
Oid varCollid = InvalidOid;
if (is_hdfs_ftbl) {
Form_pg_attribute attr = &tupdesc->attrs[varattno];
vartype = attr->atttypid;
varCollid = attr->attcollation;
vartypmod = attr->atttypmod;
} else {
if (varattno != 0)
lc2 = lnext(lc2);
TargetEntry* tleLocal = (TargetEntry*)lfirst(lc2);
Expr* expr = tleLocal->expr;
if (tleLocal->resjunk)
continue;
vartype = exprType((Node*)expr);
vartypmod = exprTypmod((Node*)expr);
varCollid = exprCollation((Node*)expr);
}
var = makeVar(rc->rti, varattno + 1, vartype, vartypmod, varCollid, 0);
ret = snprintf_s(resname, sizeof(resname), sizeof(resname) - 1, "wholerow%u", rc->rowmarkId);
securec_check_ss(ret, "\0", "\0");
tle = makeTargetEntry((Expr*)var, list_length(tlist) + 1, pstrdup(resname), true);
tlist = lappend(tlist, tle);
}
rc->markType = ROW_MARK_COPY_DATUM;
rc->numAttrs = maxattrs;
if (is_hdfs_ftbl)
relation_close(rel, AccessShareLock);
}
#ifdef STREAMPLAN
if (IS_STREAM_PLAN && var->vartype == RECORDOID) {
if (rte->relname != NULL) {
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"Type of Record in %s that is not a real table can not be shipped",
rte->relname);
securec_check_ss_c(sprintf_rc, "\0", "\0");
} else {
errno_t sprintf_rc = sprintf_s(u_sess->opt_cxt.not_shipping_info->not_shipping_reason,
NOTPLANSHIPPING_LENGTH,
"Type of Record in non-real table can not be shipped");
securec_check_ss_c(sprintf_rc, "\0", "\0");
}
mark_stream_unsupport();
}
#endif
}
}
* If the query has a RETURNING list, add resjunk entries for any Vars
* used in RETURNING that belong to other relations. We need to do this
* to make these Vars available for the RETURNING calculation. Vars that
* belong to the result rel don't need to be added, because they will be
* made to refer to the actual heap tuple.
*/
if (parse->returningList && list_length(parse->rtable) > 1) {
List* vars = NIL;
ListCell* l = NULL;
vars = pull_var_clause((Node*)parse->returningList, PVC_RECURSE_AGGREGATES, PVC_INCLUDE_PLACEHOLDERS);
foreach (l, vars) {
Var* var = (Var*)lfirst(l);
TargetEntry* tle = NULL;
if (IsA(var, Var) && var->varno == (unsigned int)result_relation)
continue;
if (tlist_member((Node*)var, tlist))
continue;
tle = makeTargetEntry((Expr*)var, list_length(tlist) + 1, NULL, true);
tlist = lappend(tlist, tle);
}
list_free_ext(vars);
}
return tlist;
}
*
* TARGETLIST EXPANSION
*
*****************************************************************************/
* expand_targetlist
* Given a target list as generated by the parser and result relations,
* add targetlist entries for any missing attributes, and ensure the
* non-junk attributes appear in proper field order.
*/
static List* expand_targetlist(List* tlist, int command_type, List* resultRelations, List* range_table)
{
List* new_tlist = NIL;
ListCell* tlist_item = NULL;
Relation rel;
int attrno, numattrs;
ListCell* l = NULL;
tlist_item = list_head(tlist);
* The rewriter should have already ensured that the TLEs are in correct
* order; but we have to insert TLEs for any missing attributes.
*
* Scan the tuple description in the relation's relcache entry to make
* sure we have all the user attributes in the right order. We assume
* that the rewriter already acquired at least AccessShareLock on the
* relation, so we need no lock here.
*/
foreach (l, resultRelations) {
int hash_col_cnt = 0;
bool is_ledger_rel = false;
int result_relation = lfirst_int(l);
rel = heap_open(getrelid(result_relation, range_table), NoLock);
is_ledger_rel = rel->rd_isblockchain;
numattrs = RelationGetNumberOfAttributes(rel);
for (attrno = 1; attrno <= numattrs; attrno++) {
Form_pg_attribute att_tup = &rel->rd_att->attrs[attrno - 1];
TargetEntry* new_tle = NULL;
if (tlist_item != NULL) {
TargetEntry* old_tle = (TargetEntry*)lfirst(tlist_item);
if (!old_tle->resjunk && old_tle->resno == attrno &&
(list_length(resultRelations) == 1 || old_tle->rtindex == (Index)result_relation)) {
new_tle = old_tle;
tlist_item = lnext(tlist_item);
}
}
if (new_tle == NULL) {
* Didn't find a matching tlist entry, so make one.
*
* For INSERT, generate a NULL constant. (We assume the rewriter
* would have inserted any available default value.) Also, if the
* column isn't dropped, apply any domain constraints that might
* exist --- this is to catch domain NOT NULL.
*
* For UPDATE, generate a Var reference to the existing value of
* the attribute, so that it gets copied to the new tuple. But
* generate a NULL for dropped columns (we want to drop any old
* values).
*
* When generating a NULL constant for a dropped column, we label
* it INT4 (any other guaranteed-to-exist datatype would do as
* well). We can't label it with the dropped column's datatype
* since that might not exist anymore. It does not really matter
* what we claim the type is, since NULL is NULL --- its
* representation is datatype-independent. This could perhaps
* confuse code comparing the finished plan to the target
* relation, however.
*/
Oid atttype = att_tup->atttypid;
int32 atttypmod = att_tup->atttypmod;
Oid attcollation = att_tup->attcollation;
Node* new_expr = NULL;
if (is_ledger_rel && strcmp(NameStr(att_tup->attname), "hash") == 0) {
hash_col_cnt = 1;
continue;
}
switch (command_type) {
case CMD_INSERT:
if (!att_tup->attisdropped) {
new_expr = (Node*)makeConst(atttype,
-1,
attcollation,
att_tup->attlen,
(Datum)0,
true,
att_tup->attbyval);
new_expr =
coerce_to_domain(new_expr, InvalidOid, -1, atttype, COERCE_IMPLICIT_CAST,
NULL, NULL, -1, false, false);
} else {
new_expr = (Node*)makeConst(INT4OID,
-1,
InvalidOid,
sizeof(int32),
(Datum)0,
true,
true );
}
break;
case CMD_MERGE:
case CMD_UPDATE:
if (!att_tup->attisdropped) {
new_expr = (Node*)makeVar((Index)result_relation, attrno - hash_col_cnt, atttype,
atttypmod, attcollation, 0);
} else {
new_expr = (Node*)makeConst(INT4OID,
-1,
InvalidOid,
sizeof(int32),
(Datum)0,
true,
true );
}
break;
default:
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_DATA_EXCEPTION),
(errmsg("unrecognized command_type: %d", (int)command_type))));
new_expr = NULL;
break;
}
new_tle = makeTargetEntry((Expr*)new_expr, attrno, pstrdup(NameStr(att_tup->attname)), false);
new_tle->rtindex = (Index)result_relation;
}
new_tlist = lappend(new_tlist, new_tle);
}
heap_close(rel, NoLock);
}
int next_junk_attrno = list_length(new_tlist) + 1;
* The remaining tlist entries should be resjunk; append them all to the
* end of the new tlist, making sure they have resnos higher than the last
* real attribute. (Note: although the rewriter already did such
* renumbering, we have to do it again here in case we are doing an UPDATE
* in a table with dropped columns, or an inheritance child table with
* extra columns.)
*/
while (tlist_item != NULL) {
TargetEntry* old_tle = (TargetEntry*)lfirst(tlist_item);
if (!old_tle->resjunk)
ereport(ERROR,
(errmodule(MOD_OPT),
errcode(ERRCODE_OPTIMIZER_INCONSISTENT_STATE),
(errmsg("targetlist is not sorted correctly"))));
if (old_tle->resno != next_junk_attrno) {
old_tle = flatCopyTargetEntry(old_tle);
old_tle->resno = next_junk_attrno;
}
new_tlist = lappend(new_tlist, old_tle);
next_junk_attrno++;
tlist_item = lnext(tlist_item);
}
return new_tlist;
}
* Locate PlanRowMark for given RT index, or return NULL if none
*
* This probably ought to be elsewhere, but there's no very good place
*/
PlanRowMark* get_plan_rowmark(List* rowmarks, Index rtindex)
{
ListCell* l = NULL;
foreach (l, rowmarks) {
PlanRowMark* rc = (PlanRowMark*)lfirst(l);
if (rc->rti == rtindex)
return rc;
}
return NULL;
}
#ifdef STREAMPLAN
* add_distribute_column
* Given a target list as generated by the parser and a result relation,
* add distribute entries to the target list if the attribute is missing, and
* ensure it appears in the final target list for stream distribution.
*/
static List* add_distribute_column(List* tlist, Index result_relation, List* range_table)
{
Relation rel = heap_open(getrelid(result_relation, range_table), NoLock);
int2vector* disattrno = NULL;
ListCell* lc = NULL;
if (rel->rd_rel->relkind == RELKIND_RELATION)
disattrno = get_baserel_distributekey_no(getrelid(result_relation, range_table));
if (disattrno == NULL) {
heap_close(rel, NoLock);
return tlist;
}
int no = 0;
int len = disattrno->dim1;
bool* isExist = (bool*)palloc0(len * sizeof(bool));
for (int i = 0; i < len; i++) {
no = disattrno->values[i];
foreach (lc, tlist) {
TargetEntry* tle = (TargetEntry*)lfirst(lc);
Expr* node = tle->expr;
if (IsA(node, Var) && ((Var*)node)->varno == result_relation && ((Var*)node)->varattno == no) {
isExist[i] = true;
break;
}
}
}
for (int i = 0; i < len; i++) {
if (isExist[i] == false) {
no = disattrno->values[i];
Form_pg_attribute att_tup = &rel->rd_att->attrs[no - 1];
Node* new_expr =
(Node*)makeVar(result_relation, no, att_tup->atttypid, att_tup->atttypmod, att_tup->attcollation, 0);
TargetEntry* tle =
makeTargetEntry((Expr*)new_expr, list_length(tlist) + 1, pstrdup(NameStr(att_tup->attname)), true);
tlist = lappend(tlist, tle);
}
}
heap_close(rel, NoLock);
return tlist;
}
List* preprocess_upsert_targetlist(List* tlist, int result_relation, List* range_table)
{
return expand_targetlist(tlist, CMD_UPDATE, list_make1_int(result_relation), range_table);
}
#endif