/* -------------------------------------------------------------------------
*
* nodeLockRows.cpp
* Routines to handle FOR UPDATE/FOR SHARE row locking
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/gausskernel/runtime/executor/nodeLockRows.cpp
*
* -------------------------------------------------------------------------
*
* INTERFACE ROUTINES
* ExecLockRows - fetch locked rows
* ExecInitLockRows - initialize node and subnodes
* ExecEndLockRows - shut down node and subnodes
* ExecReScanLockRows - rescan the node's subplan
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/xact.h"
#include "access/ustore/knl_uheap.h"
#include "catalog/pg_partition_fn.h"
#include "executor/executor.h"
#include "executor/node/nodeLockRows.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
#endif
#include "storage/buf/bufmgr.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
#include "utils/snapmgr.h"
#include "access/tableam.h"
static TupleTableSlot* ExecLockRows(PlanState* state);

/* ----------------------------------------------------------------
 *		ExecLockRows
 *
 *		Fetch the next tuple from the subplan and lock every source row
 *		referenced by this node's locking row marks (lr_arowMarks).
 *
 *		When a row turns out to have been concurrently updated, the updated
 *		version is obtained through tableam_tuple_lock_updated and the whole
 *		subplan tuple is rechecked with EvalPlanQual.  Rows that could not
 *		be locked without blocking (TM_WouldBlock), rows modified by our own
 *		transaction, deleted rows and rows failing the recheck make us move
 *		on to the next subplan tuple.
 *
 *		return: a tuple or NULL
 * ----------------------------------------------------------------
 */
static TupleTableSlot* ExecLockRows(PlanState* state)
{
    LockRowsState* node = castNode(LockRowsState, state);
    TupleTableSlot* slot = NULL;
    EState* estate = NULL;
    PlanState* outer_plan = NULL;
    bool epq_started = false;
    ListCell* lc = NULL;
    /* bucket of the row being handled; only meaningful for hash-bucket relations */
    int2 bucket_id = InvalidBktId;
    Relation target_rel = NULL;
    Partition target_part = NULL;
    Relation bucket_rel = NULL;
    bool orig_early_free = false;
    bool orig_early_deinit = false;

    CHECK_FOR_INTERRUPTS();

    /*
     * get information from the node
     */
    estate = node->ps.state;
    outer_plan = outerPlanState(node);

    /*
     * Get next tuple from subplan, if any.
     */
lnext:
    /*
     * A concurrent lock, update or delete may send us into EvalPlanQual,
     * which needs the subplan's resources again, so the subplan must not
     * free or deinitialize them early while producing this tuple.  Save
     * the flags and restore them once the tuple has been produced.
     */
    orig_early_free = outer_plan->state->es_skip_early_free;
    orig_early_deinit = outer_plan->state->es_skip_early_deinit_consumer;
    outer_plan->state->es_skip_early_free = true;
    outer_plan->state->es_skip_early_deinit_consumer = true;

    u_sess->exec_cxt.isLockRows = true;
    slot = ExecProcNode(outer_plan);
    u_sess->exec_cxt.isLockRows = false;

    outer_plan->state->es_skip_early_free = orig_early_free;
    outer_plan->state->es_skip_early_deinit_consumer = orig_early_deinit;

    if (TupIsNull(slot))
        return NULL;

    /*
     * Attempt to lock the source tuple(s).  (Note we only have locking
     * rowmarks in lr_arowMarks.)
     */
    epq_started = false;
    foreach (lc, node->lr_arowMarks) {
        ExecAuxRowMark* aerm = (ExecAuxRowMark*)lfirst(lc);
        ExecRowMark* erm = aerm->rowmark;
        Datum datum;
        bool isNull = false;
        bool rowMovement = false;
        HeapTupleData tuple;
        union {
            HeapTupleHeaderData hdr;
            char data[MaxHeapTupleSize + sizeof(HeapTupleHeaderData)];
        } tbuf;
        Buffer buffer;
        TM_FailureData tmfd;
        LockTupleMode lock_mode;
        TM_Result test;
        Tuple copy_tuple;

        /*
         * Start every row mark without a target.  Otherwise a relation
         * resolved for a previous row mark (or a previous subplan tuple)
         * would silently be reused when the partition lookup below does not
         * assign one, and we would lock a ctid in the wrong relation.
         */
        target_rel = NULL;
        target_part = NULL;

        /* if child rel, must check whether it produced this row */
        if (erm->rti != erm->prti) {
            Oid table_oid;

            datum = ExecGetJunkAttribute(slot, aerm->toidAttNo, &isNull);
            if (isNull) {
                ereport(ERROR,
                    (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE), errmsg("tableoid is NULL when try to lock current row.")));
            }
            table_oid = DatumGetObjectId(datum);
            if (table_oid != RelationGetRelid(erm->relation)) {
                /* this child is inactive right now; the EPQ pass skips it too */
                ItemPointerSetInvalid(&(erm->curCtid));
                continue;
            }
        }

        if (RELATION_OWN_BUCKET(erm->relation)) {
            Datum bucket_datum;

            bucket_datum = ExecGetJunkAttribute(slot, aerm->tbidAttNo, &isNull);
            if (isNull)
                ereport(ERROR,
                    (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE), errmsg("bucketid is NULL when try to lock current row.")));
            bucket_id = DatumGetObjectId(bucket_datum);
            Assert(bucket_id != InvalidBktId);
        }

        if (PointerIsValid(erm->relation->partMap)) {
            Oid tblid = InvalidOid;
            Datum part_datum;

            /* the tableoid junk column identifies the partition holding the row */
            part_datum = ExecGetJunkAttribute(slot, aerm->toidAttNo, &isNull);
            if (isNull) {
                ereport(ERROR,
                    (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE), errmsg("tableoid is NULL when try to lock current row.")));
            }
            rowMovement = erm->relation->rd_rel->relrowmovement;
            tblid = DatumGetObjectId(part_datum);
            if (tblid != erm->relation->rd_id) {
                searchFakeReationForPartitionOid(estate->esfRelations,
                    estate->es_query_cxt, erm->relation, tblid, INVALID_PARTITION_NO, target_rel,
                    target_part, RowShareLock);
                Assert(tblid == target_rel->rd_id);
            }
        } else {
            target_rel = erm->relation;
        }

        if (target_rel == NULL) {
            ereport(ERROR,
                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("ExecLockRows:target relation cannot be NULL")));
        }

        /* fetch the tuple's ctid */
        datum = ExecGetJunkAttribute(slot, aerm->ctidAttNo, &isNull);
        if (isNull)
            ereport(
                ERROR, (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE), errmsg("ctid is NULL when try to lock current row.")));
        tuple.t_self = *((ItemPointer)DatumGetPointer(datum));
        tuple.t_data = &tbuf.hdr;

        bucket_rel = target_rel;
        if (RELATION_OWN_BUCKET(target_rel)) {
            searchHBucketFakeRelation(estate->esfRelations, estate->es_query_cxt, target_rel, bucket_id, bucket_rel);
        }

        /* translate the row mark strength into a tuple lock mode */
        switch (erm->markType) {
            case ROW_MARK_EXCLUSIVE:
                lock_mode = LockTupleExclusive;
                break;
            case ROW_MARK_NOKEYEXCLUSIVE:
                lock_mode = LockTupleNoKeyExclusive;
                break;
            case ROW_MARK_SHARE:
                lock_mode = LockTupleShared;
                break;
            case ROW_MARK_KEYSHARE:
                lock_mode = LockTupleKeyShare;
                break;
            default:
                elog(ERROR, "unsupported rowmark type");
                lock_mode = LockTupleNoKeyExclusive; /* keep compiler quiet */
                break;
        }

        test = tableam_tuple_lock(bucket_rel, &tuple, &buffer,
                                  estate->es_output_cid, lock_mode, erm->waitPolicy, &tmfd,
#ifdef ENABLE_MULTIPLE_NODES
                                  false, false, false, estate->es_snapshot, NULL, true,
#else
                                  false, true, false, estate->es_snapshot, NULL, true,
#endif
                                  false, InvalidTransactionId,
                                  erm->waitSec);
        ReleaseBuffer(buffer);

        switch (test) {
            case TM_WouldBlock:
                /* the row could not be locked without waiting: try the next tuple */
                goto lnext;

            case TM_SelfCreated:
                ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                    errmsg("attempted to lock invisible tuple")));
                break;

            case TM_SelfUpdated:
            case TM_SelfModified:
                /* got updated by our own transaction: skip this row */
                goto lnext;

            case TM_Ok:
                /* got the lock successfully */
                break;

            case TM_Updated:
                if (IsolationUsesXactSnapshot())
                    ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                        errmsg("could not serialize access due to concurrent update")));
                /*
                 * EvalPlanQual needs to reinitialize the child plan to recheck
                 * the row after a concurrent update, but the left tree of a
                 * Stream node is wrapped in a backend thread, so the child plan
                 * cannot be reinitialized in that case.
                 */
                if (IS_PGXC_DATANODE && u_sess->exec_cxt.under_stream_runtime &&
                    estate->es_plannedstmt->num_streams > 0) {
                    ereport(ERROR, (errcode(ERRCODE_STREAM_CONCURRENT_UPDATE),
                        errmsg("concurrent update under Stream mode is not yet supported")));
                }

                copy_tuple = tableam_tuple_lock_updated(estate->es_output_cid, bucket_rel, lock_mode, &tmfd.ctid,
                    tmfd.xmax, estate->es_snapshot, true);
                if (copy_tuple == NULL) {
                    if (rowMovement) {
                        /*
                         * This may be a row-movement update that deleted the
                         * tuple from its original partition and inserted it
                         * into a new one; report a conflict instead of
                         * silently skipping the row.
                         */
                        ereport(ERROR, (errcode(ERRCODE_TRANSACTION_ROLLBACK),
                            errmsg("partition table update conflict"),
                            errdetail("disable row movement of table can avoid this conflict")));
                    }
                    /* tuple was deleted, so skip it */
                    goto lnext;
                }
                /* remember the actually locked tuple's TID */
                tuple.t_self = ((HeapTuple)copy_tuple)->t_self;

                /*
                 * Need to run a recheck subquery.  Initialize EPQ state if we
                 * didn't do so already.
                 */
                if (!epq_started) {
                    EvalPlanQualBegin(&node->lr_epqstate, estate);
                    epq_started = true;
                }

                /* store the locked tuple for the recheck */
                EvalPlanQualSetTuple(&node->lr_epqstate, erm->rti, copy_tuple);
                break;

            case TM_Deleted:
                if (IsolationUsesXactSnapshot())
                    ereport(ERROR,
                        (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                            errmsg("could not serialize access due to concurrent update")));
                if (rowMovement) {
                    /*
                     * This may be a row-movement update that deleted the tuple
                     * from its original partition and inserted it into a new
                     * one; report a conflict instead of silently skipping it.
                     */
                    ereport(ERROR, (errcode(ERRCODE_TRANSACTION_ROLLBACK),
                        errmsg("partition table update conflict"),
                        errdetail("disable row movement of table can avoid this conflict")));
                }
                /* tuple was deleted, so skip it */
                goto lnext;

            default:
                ereport(ERROR,
                    (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE),
                        errmsg("unrecognized tuple_lock_tuple status: %d when lock a tuple", test)));
        }

        /* remember the locked tuple's TID for WHERE CURRENT OF and the EPQ pass */
        erm->curCtid = tuple.t_self;
    }

    /*
     * If we need to do EvalPlanQual testing, do so.
     */
    if (epq_started) {
        union {
            HeapTupleHeaderData hdr;
            char data[MaxHeapTupleSize + sizeof(HeapTupleHeaderData)];
        } tbuf;

        /*
         * First, fetch a copy of any rows that were successfully locked
         * without any update having occurred.  (We do this in a separate pass
         * so as to avoid overhead in the common case where there are no
         * concurrent updates.)
         */
        foreach (lc, node->lr_arowMarks) {
            ExecAuxRowMark* aerm = (ExecAuxRowMark*)lfirst(lc);
            ExecRowMark* erm = aerm->rowmark;
            HeapTupleData tuple;
            Buffer buffer;
            bool isNull = false;

            /* never reuse the relation resolved for a previous row mark */
            target_rel = NULL;
            target_part = NULL;

            /* ignore child relations that did not produce this row */
            if (!ItemPointerIsValid(&(erm->curCtid))) {
                Assert(erm->rti != erm->prti);
                continue;
            }

            /* was tuple updated and fetched above? */
            if (EvalPlanQualGetTuple(&node->lr_epqstate, erm->rti) != NULL)
                continue;

            if (PointerIsValid(erm->relation->partMap)) {
                Datum partdatum;
                Oid tblid = InvalidOid;

                partdatum = ExecGetJunkAttribute(slot, aerm->toidAttNo, &isNull);
                if (isNull) {
                    ereport(ERROR,
                        (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE),
                            errmsg("tableoid is NULL when try to lock current row.")));
                }
                tblid = DatumGetObjectId(partdatum);
                if (tblid != erm->relation->rd_id) {
                    searchFakeReationForPartitionOid(estate->esfRelations,
                        estate->es_query_cxt,
                        erm->relation,
                        tblid,
                        INVALID_PARTITION_NO,
                        target_rel,
                        target_part,
                        RowShareLock);
                }
            } else {
                target_rel = erm->relation;
            }

            if (target_rel == NULL) {
                ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                    errmsg("ExecLockRows:target relation cannot be NULL for plan qual recheck.")));
            }

            /* okay, fetch the tuple */
            tuple.t_self = erm->curCtid;
            tuple.t_data = &tbuf.hdr;

            /*
             * The row has not been fetched yet (tbuf is uninitialized), so its
             * bucket cannot be derived from the tuple contents.  Take it from
             * the bucket id junk column, exactly as the locking pass did.
             */
            if (RELATION_OWN_BUCKET(erm->relation)) {
                Datum bucket_datum = ExecGetJunkAttribute(slot, aerm->tbidAttNo, &isNull);
                if (isNull)
                    ereport(ERROR,
                        (errcode(ERRCODE_NULL_JUNK_ATTRIBUTE),
                            errmsg("bucketid is NULL when try to lock current row.")));
                bucket_id = DatumGetObjectId(bucket_datum);
                Assert(bucket_id != InvalidBktId);
            }

            bucket_rel = target_rel;
            if (RELATION_OWN_BUCKET(target_rel)) {
                searchHBucketFakeRelation(estate->esfRelations, estate->es_query_cxt, target_rel, bucket_id, bucket_rel);
            }

            if (!tableam_tuple_fetch(bucket_rel, SnapshotAny, &tuple, &buffer, false, NULL))
                ereport(ERROR,
                    (errcode(ERRCODE_FETCH_DATA_FAILED), errmsg("failed to fetch tuple for EvalPlanQual recheck")));

            /* successful, copy and store tuple */
            EvalPlanQualSetTuple(&node->lr_epqstate, erm->rti, tableam_tops_copy_tuple(&tuple));
            ReleaseBuffer(buffer);
        }

        /*
         * Now fetch any non-locked source rows --- the EPQ logic knows how to
         * do that.
         */
        EvalPlanQualSetSlot(&node->lr_epqstate, slot);
        EvalPlanQualFetchRowMarks(&node->lr_epqstate);

        /* the EPQ executor must not free its resources early either */
        orig_early_free = node->lr_epqstate.estate->es_skip_early_free;
        orig_early_deinit = node->lr_epqstate.estate->es_skip_early_deinit_consumer;
        node->lr_epqstate.estate->es_skip_early_free = true;
        node->lr_epqstate.estate->es_skip_early_deinit_consumer = true;

        /*
         * And finally we can re-evaluate the tuple.
         */
        slot = EvalPlanQualNext(&node->lr_epqstate);

        node->lr_epqstate.estate->es_skip_early_free = orig_early_free;
        node->lr_epqstate.estate->es_skip_early_deinit_consumer = orig_early_deinit;

        if (TupIsNull(slot)) {
            /* Updated tuple fails qual, so ignore it and go on */
            goto lnext;
        }
    }

    /* Got all locks, so return the current tuple */
    return slot;
}
* ExecInitLockRows
*
* This initializes the LockRows node state structures and
* the node's subplan.
* ----------------------------------------------------------------
*/
LockRowsState* ExecInitLockRows(LockRows* node, EState* estate, int eflags)
{
LockRowsState* lrstate = makeNode(LockRowsState);
Plan* outer_plan = outerPlan(node);
List* epq_arowmarks = NIL;
ListCell* lc = NULL;
Assert(!(eflags & EXEC_FLAG_MARK));
lrstate->ps.plan = (Plan*)node;
lrstate->ps.state = estate;
lrstate->ps.ExecProcNode = ExecLockRows;
* Miscellaneous initialization
*
* LockRows nodes never call ExecQual or ExecProject.
*/
* Tuple table initialization (XXX not actually used...)
*/
ExecInitResultTupleSlot(estate, &lrstate->ps);
u_sess->exec_cxt.isLockRows = true;
* then initialize outer plan
*/
outerPlanState(lrstate) = ExecInitNode(outer_plan, estate, eflags);
u_sess->exec_cxt.isLockRows = false;
* LockRows nodes do no projections, so initialize projection info for
* this node appropriately
*/
TupleDesc resultDesc = ExecGetResultType(outerPlanState(lrstate));
ExecAssignResultTypeFromTL(&lrstate->ps, resultDesc->td_tam_ops);
lrstate->ps.ps_ProjInfo = NULL;
Assert(lrstate->ps.ps_ResultTupleSlot->tts_tupleDescriptor->td_tam_ops);
* Locate the ExecRowMark(s) that this node is responsible for, and
* construct ExecAuxRowMarks for them. (InitPlan should already have
* built the global list of ExecRowMarks.)
*/
lrstate->lr_arowMarks = NIL;
epq_arowmarks = NIL;
foreach (lc, node->rowMarks) {
PlanRowMark* rc = (PlanRowMark*)lfirst(lc);
ExecRowMark* erm = NULL;
ExecAuxRowMark* aerm = NULL;
Assert(IsA(rc, PlanRowMark));
if (rc->isParent)
continue;
if (!(IS_PGXC_COORDINATOR || u_sess->pgxc_cxt.PGXCNodeId < 0 ||
bms_is_member(u_sess->pgxc_cxt.PGXCNodeId, rc->bms_nodeids))) {
continue;
}
erm = ExecFindRowMark(estate, rc->rti);
aerm = ExecBuildAuxRowMark(erm, outer_plan->targetlist);
* Only locking rowmarks go into our own list. Non-locking marks are
* passed off to the EvalPlanQual machinery. This is because we don't
* want to bother fetching non-locked rows unless we actually have to
* do an EPQ recheck.
*/
if (RowMarkRequiresRowShareLock(erm->markType))
lrstate->lr_arowMarks = lappend(lrstate->lr_arowMarks, aerm);
else
epq_arowmarks = lappend(epq_arowmarks, aerm);
}
EvalPlanQualInit(&lrstate->lr_epqstate, estate, outer_plan, epq_arowmarks, node->epqParam);
return lrstate;
}
* ExecEndLockRows
*
* This shuts down the subplan and frees resources allocated
* to this node.
* ----------------------------------------------------------------
*/
void ExecEndLockRows(LockRowsState* node)
{
EvalPlanQualEnd(&node->lr_epqstate);
ExecEndNode(outerPlanState(node));
}
/* ----------------------------------------------------------------
 *		ExecReScanLockRows
 *
 *		Rescan the subplan of a LockRows node.
 * ----------------------------------------------------------------
 */
void ExecReScanLockRows(LockRowsState* node)
{
    PlanState* child = node->ps.lefttree;

    /*
     * A subplan with changed parameters will be rescanned by its first
     * ExecProcNode call, so there is nothing to do here in that case.
     */
    if (child->chgParam != NULL)
        return;

    ExecReScan(child);
}