*
* ubtinsert.cpp
* Item insertion in Lehman and Yao btrees for Postgres.
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/gausskernel/storage/access/ubtree/ubtinsert.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "access/ubtree.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/genam.h"
#include "access/ustore/knl_uverify.h"
#include "catalog/index.h"
#include "miscadmin.h"
#include "storage/buf/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/smgr/smgr.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/inval.h"
#include "utils/snapmgr.h"
#include "pgstat.h"
static TransactionId UBTreeCheckUnique(Relation rel, IndexTuple itup, Relation heapRel, Buffer buf,
OffsetNumber offset, BTScanInsert itup_key, IndexUniqueCheck checkUnique, bool *is_unique, GPIScanDesc gpiScan);
static OffsetNumber UBTreeFindInsertLoc(Relation rel, Buffer *bufptr, OffsetNumber firstlegaloff, BTScanInsert itup_key,
IndexTuple newtup, bool checkingunique, BTStack stack, Relation heapRel);
static OffsetNumber UBTreeFindDeleteLoc(Relation rel, Buffer* bufP, OffsetNumber offset,
BTScanInsert itup_key, IndexTuple itup);
static void UBTreeInsertOnPage(Relation rel, BTScanInsert itup_key, Buffer buf, Buffer cbuf, BTStack stack,
IndexTuple itup, OffsetNumber newitemoff, bool split_only_page);
static Buffer UBTreeSplit(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, OffsetNumber newitemoff,
Size newitemsz, IndexTuple newitem, bool newitemonleft, BTScanInsert itup_key);
static bool UBTreePageAddTuple(Page page, Size itemsize, IndexTuple itup, OffsetNumber itup_off, bool isnew);
static void UBTreeDeleteOnPage(Relation rel, Buffer buf, OffsetNumber offset, bool isRollbackIndex);
static Buffer UBTreeNewRoot(Relation rel, Buffer lbuf, Buffer rbuf);
* UBTreePagePruneOpt() -- Optional prune a index page
*
* This function trying to prune the page if possible, and always do BtPageRepairFragmentation()
* if we deleted any dead tuple.
*
* The passed tryDelete indicate whether we just try to delete the whole page. When this value is
* true, we will do nothing if there is an alive tuple in the page, and delete the whole page if all
* tuples are dead.
*
* IMPORTANT: Make sure we've got the write lock on the buf before call this function.
*
* Note: The page must be a leaf page.
*/
bool UBTreePagePruneOpt(Relation rel, Buffer buf, bool tryDelete, BTStack del_blknos)
{
Page page = BufferGetPage(buf);
UBTPageOpaqueInternal opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
WHITEBOX_TEST_STUB("UBTreePagePruneOpt", WhiteboxDefaultErrorEmit);
* We can't write WAL in recovery mode, so there's no point trying to
* clean the page. The master will likely issue a cleaning WAL record soon
* anyway, so this is no particular loss.
*/
if (RecoveryInProgress()) {
if (tryDelete)
_bt_relbuf(rel, buf);
return false;
}
if (!P_ISLEAF(opaque)) {
if (tryDelete)
_bt_relbuf(rel, buf);
return false;
}
if (tryDelete && P_RIGHTMOST(opaque)) {
_bt_relbuf(rel, buf);
return false;
}
TransactionId oldestXmin = u_sess->utils_cxt.RecentGlobalDataXmin;
if (RelationGetNamespace(rel) == PG_TOAST_NAMESPACE) {
GetOldestXminForUndo(&oldestXmin);
}
Assert(TransactionIdIsValid(oldestXmin));
* Should not prune page when use local snapshot, InitPostgres e.g.
* If use local snapshot RecentXmin might not consider xid cn send later,
* wrong xid status might be judged in TransactionIdIsInProgress,
* and wrong tuple infomask might be set in HeapTupleSatisfiesVacuum.
* So if useLocalSnapshot is ture in postmaster env, we don't prune page.
* Keep prune page can be done in single mode (standlone --single), so just in PostmasterEnvironment.
*/
if ((t_thrd.xact_cxt.useLocalSnapshot && IsPostmasterEnvironment) ||
g_instance.attr.attr_storage.IsRoachStandbyCluster || u_sess->attr.attr_common.upgrade_mode == 1) {
if (tryDelete)
_bt_relbuf(rel, buf);
return false;
}
* Let's see if we really need pruning.
*
* Forget it if page is not hinted to contain something prunable that's
* older than oldestXmin.
*/
if (TransactionIdIsValid(PageGetPruneXid(page)) && !IndexPageIsPrunable(page, oldestXmin)) {
if (tryDelete)
_bt_relbuf(rel, buf);
return false;
}
if (tryDelete) {
UBTreePagePrune(rel, buf, oldestXmin);
Assert(!P_RIGHTMOST(opaque));
if (PageGetMaxOffsetNumber(page) == 1) {
return UBTreePageDel(rel, buf, del_blknos) > 0;
} else {
_bt_relbuf(rel, buf);
return false;
}
}
return UBTreePagePrune(rel, buf, oldestXmin);
}
* UBTreePagePrune() -- Apply page pruning to the page.
*
* If passed tryDelete parameter is false, delete all dead tuples in the page, then repair the fragmentation.
* If passed tryDelete is true, only delete the whole page when all tuples are dead.
*
* Note: we won't delete the high key (if any).
*/
bool UBTreePagePrune(Relation rel, Buffer buf, TransactionId oldestXmin, OidRBTree *invisibleParts)
{
int npreviousDead = 0;
int16 activeTupleCount = 0;
bool has_pruned = false;
Page page = BufferGetPage(buf);
UBTPageOpaqueInternal opaque;
OffsetNumber offnum, maxoff;
IndexPruneState prstate;
WHITEBOX_TEST_STUB("UBTreePagePruneOpt", WhiteboxDefaultErrorEmit);
#ifdef XLOG_BTREE_PRUNE_DEBUG
errno_t rc = memcpy_s(xlrec.pagebak, sizeof(xl_ubtree_prune_page), page, BLCKSZ);
securec_check(rc, "\0", "\0");
#endif
prstate.new_prune_xid = InvalidTransactionId;
prstate.latestRemovedXid = InvalidTransactionId;
prstate.ndead = 0;
opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
for (offnum = P_FIRSTDATAKEY(opaque); offnum <= maxoff; offnum = OffsetNumberNext(offnum)) {
ItemId itemid;
itemid = PageGetItemId(page, offnum);
if (ItemIdIsDead(itemid)) {
prstate.previousdead[npreviousDead] = offnum;
npreviousDead++;
continue;
}
if (RelationIsGlobalIndex(rel) && invisibleParts != NULL) {
AttrNumber partitionOidAttr = IndexRelationGetNumberOfAttributes(rel);
TupleDesc tupdesc = RelationGetDescr(rel);
IndexTuple itup = (IndexTuple)PageGetItem(page, itemid);
bool isnull = false;
Oid partOid = DatumGetUInt32(index_getattr(itup, partitionOidAttr, tupdesc, &isnull));
Assert(!isnull);
if (OidRBTreeMemberOid(invisibleParts, partOid)) {
prstate.nowdead[prstate.ndead] = offnum;
prstate.ndead++;
continue;
}
}
if (UBTreePruneItem(page, offnum, oldestXmin, &prstate,
RelationGetNamespace(rel) == PG_TOAST_NAMESPACE)) {
activeTupleCount++;
}
}
START_CRIT_SECTION();
if (npreviousDead > 0 || prstate.ndead > 0) {
WaitState oldStatus = pgstat_report_waitstatus(STATE_PRUNE_INDEX);
UBTreePagePruneExecute(page, prstate.previousdead, npreviousDead, &prstate, oldestXmin);
UBTreePagePruneExecute(page, prstate.nowdead, prstate.ndead, &prstate, oldestXmin);
UBTreePageRepairFragmentation(rel, BufferGetBlockNumber(buf), page);
has_pruned = true;
* Update the page's pd_prune_xid field to either zero, or the lowest
* XID of any soon-prunable tuple.
*/
PageSetPruneXid(page, prstate.new_prune_xid);
opaque->activeTupleCount = activeTupleCount;
MarkBufferDirty(buf);
if (RelationNeedsWAL(rel)) {
XLogRecPtr recptr;
xl_ubtree_prune_page xlrec;
xlrec.count = npreviousDead + prstate.ndead;
xlrec.new_prune_xid = prstate.new_prune_xid;
xlrec.latestRemovedXid = prstate.latestRemovedXid;
XLogBeginInsert();
XLogRegisterData((char*)&xlrec, SizeOfUBTreePrunePage);
XLogRegisterData((char*)&prstate.previousdead, npreviousDead * sizeof(OffsetNumber));
XLogRegisterData((char*)&prstate.nowdead, prstate.ndead * sizeof(OffsetNumber));
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
recptr = XLogInsert(RM_UBTREE_ID, XLOG_UBTREE_PRUNE_PAGE);
PageSetLSN(page, recptr);
}
pgstat_report_waitstatus(oldStatus);
} else {
* If we didn't prune anything, but have found a new value for the
* pd_prune_xid field, update it and mark the buffer dirty. This is
* treated as a non-WAL-logged hint.
*
* Also clear the "page is full" flag if it is set, since there's no
* point in repeating the prune/defrag process until something else
* happens to the page.
*/
if (PageGetPruneXid(page) != prstate.new_prune_xid || opaque->activeTupleCount != activeTupleCount) {
PageSetPruneXid(page, prstate.new_prune_xid);
opaque->activeTupleCount = activeTupleCount;
MarkBufferDirtyHint(buf, true);
}
}
END_CRIT_SECTION();
UBTreeVerify(rel, page, BufferGetBlockNumber(buf));
return has_pruned;
}
* UBTreePruneItem() -- Check a item's status, and update prune state if found any dead tuple or new_prune_xid.
*
* The return value indicates whether the item is active.
*/
bool UBTreePruneItem(Page page, OffsetNumber offnum, TransactionId oldestXmin, IndexPruneState* prstate,
bool isToast)
{
TransactionId xmin, xmax;
bool isDead = false;
bool xminCommitted = false;
bool xmaxCommitted = false;
isDead = UBTreeItupGetXminXmax(page, offnum, oldestXmin, &xmin, &xmax,
&xminCommitted, &xmaxCommitted, isToast);
* INDEXTUPLE_DEAD: xmin invalid || xmax frozen
* INDEXTUPLE_DELETED: xmax valid && xmax committed
* INDEXTUPLE_ACTIVE: else
*/
if (isDead) {
prstate->nowdead[prstate->ndead] = offnum;
prstate->ndead++;
return false;
}
if (TransactionIdIsValid(xmax) && xmaxCommitted) {
if (!TransactionIdIsValid(prstate->new_prune_xid) || TransactionIdPrecedes(xmax, prstate->new_prune_xid))
prstate->new_prune_xid = xmax;
}
return !TransactionIdIsValid(xmax);
}
* UBTreePagePruneExecute() -- Traverse the nowdead array, set the corresponding tuples as dead.
*/
void UBTreePagePruneExecute(Page page, OffsetNumber *nowdead, int ndead, IndexPruneState *prstate,
TransactionId oldest_xmin)
{
OffsetNumber *offnum = NULL;
int i;
UBTPageOpaqueInternal opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
offnum = nowdead;
for (i = 0; i < ndead; i++) {
OffsetNumber off = *offnum++;
ItemId lp = PageGetItemId(page, off);
if (prstate != NULL && ItemIdHasStorage(lp)) {
IndexTuple itup = (IndexTuple)PageGetItem(page, lp);
UstoreIndexXid uxid = (UstoreIndexXid)UstoreIndexTupleGetXid(itup);
TransactionId xid = ShortTransactionIdToNormal(opaque->xid_base, uxid->xmax);
if (TransactionIdIsNormal(xid)) {
if ((!TransactionIdIsValid(prstate->latestRemovedXid) ||
TransactionIdPrecedes(prstate->latestRemovedXid, xid)) &&
TransactionIdPrecedes(xid, oldest_xmin)) {
prstate->latestRemovedXid = xid;
}
}
}
ItemIdSetDead(lp);
}
}
static int ItemOffCompare(const void* itemIdp1, const void* itemIdp2)
{
return ((itemIdSort)itemIdp2)->itemoff - ((itemIdSort)itemIdp1)->itemoff;
}
* UBTreePageRepairFragmentation() -- Repair the fragmentation in the page.
*
* On the basis of PageRepairFragmentation(Page page), we ignore redirected/unused, compact both
* line pointers and index tuples.
*/
void UBTreePageRepairFragmentation(Relation rel, BlockNumber blkno, Page page)
{
Offset pdLower = ((PageHeader)page)->pd_lower;
Offset pdUpper = ((PageHeader)page)->pd_upper;
Offset pdSpecial = ((PageHeader)page)->pd_special;
WHITEBOX_TEST_STUB("UBTreePageRepairFragmentation", WhiteboxDefaultErrorEmit);
const char *relName = (rel ? RelationGetRelationName(rel) : "Unknown During Redo");
if ((unsigned int)(pdLower) < SizeOfPageHeaderData || pdLower > pdUpper || pdUpper > pdSpecial ||
pdSpecial > BLCKSZ || (unsigned int)(pdSpecial) != MAXALIGN(pdSpecial))
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %d, upper = %d, special = %d. rel \"%s\", blkno %u",
pdLower, pdUpper, pdSpecial, relName, blkno)));
UBTPageOpaqueInternal opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
int nline = PageGetMaxOffsetNumber(page);
int nstorage = 0;
Size totallen = 0;
itemIdSortData itemidbase[MaxIndexTuplesPerPage];
itemIdSort itemidptr = itemidbase;
for (int i = FirstOffsetNumber; i <= nline; i++) {
ItemId lp = PageGetItemId(page, i);
if (ItemIdHasStorage(lp)) {
nstorage++;
itemidptr->offsetindex = nstorage;
itemidptr->itemoff = ItemIdGetOffset(lp);
itemidptr->olditemid = *lp;
if (itemidptr->itemoff < (int)pdUpper || itemidptr->itemoff >= (int)pdSpecial)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item pointer: %d. rel \"%s\", blkno %u",
itemidptr->itemoff, relName, blkno)));
itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
totallen += itemidptr->alignedlen;
itemidptr++;
}
}
if (totallen > (Size)(pdSpecial - pdLower))
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item lengths: total %u, available space %d. rel \"%s\", blkno %u",
(unsigned int)totallen, pdSpecial - pdLower, relName, blkno)));
qsort((char*)itemidbase, nstorage, sizeof(itemIdSortData), ItemOffCompare);
Offset upper = pdSpecial;
itemidptr = itemidbase;
for (int i = 0; i < nstorage; i++, itemidptr++) {
ItemId lp = PageGetItemId(page, itemidptr->offsetindex);
upper -= itemidptr->alignedlen;
errno_t rc = memmove_s((char *)page + upper, itemidptr->alignedlen, (char *)page + itemidptr->itemoff,
itemidptr->alignedlen);
securec_check(rc, "\0", "\0");
*lp = itemidptr->olditemid;
lp->lp_off = upper;
}
opaque->btpo_flags &= ~BTP_HAS_GARBAGE;
((PageHeader)page)->pd_lower = GetPageHeaderSize(page) + nstorage * sizeof(ItemIdData);
((PageHeader)page)->pd_upper = upper;
}
* UBTreeDoInsert() -- Handle insertion of a single index tuple in the tree.
*
* This routine is called by the public interface routines, btbuild
* and btinsert. By here, itup is filled in, including the TID.
*
* If checkUnique is UNIQUE_CHECK_NO or UNIQUE_CHECK_PARTIAL, this
* will allow duplicates. Otherwise (UNIQUE_CHECK_YES or
* UNIQUE_CHECK_EXISTING) it will throw error for a duplicate.
* For UNIQUE_CHECK_EXISTING we merely run the duplicate check, and
* don't actually insert.
*
* The result value is only significant for UNIQUE_CHECK_PARTIAL
* it must be TRUE if the entry is known unique, else FALSE.
* (In the current implementation we'll also return TRUE after a
* successful UNIQUE_CHECK_YES or UNIQUE_CHECK_EXISTING call, but
* that's just a coding artifact.)
*/
bool UBTreeDoInsert(Relation rel, IndexTuple itup, IndexUniqueCheck checkUnique, Relation heapRel)
{
bool is_unique = false;
BTScanInsert itupKey;
BTStack stack = NULL;
Buffer buf;
OffsetNumber offset;
Oid indexHeapRelOid = InvalidOid;
Relation indexHeapRel = NULL;
Partition part = NULL;
Relation partRel = NULL;
bool checkingunique = (checkUnique != UNIQUE_CHECK_NO);
WHITEBOX_TEST_STUB("UBTreeDoInsert-begin", WhiteboxDefaultErrorEmit);
if (rel == NULL || rel->rd_rel == NULL) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("relation or rd_rel is NULL")));
}
itupKey = UBTreeMakeScanKey(rel, itup);
GPIScanDesc gpiScan = NULL;
if (RelationIsGlobalIndex(rel)) {
Assert(RelationIsPartition(heapRel));
GPIScanInit(&gpiScan);
indexHeapRelOid = IndexGetRelation(rel->rd_id, false);
Assert(OidIsValid(indexHeapRelOid));
if (indexHeapRelOid == heapRel->grandparentId) {
indexHeapRel = relation_open(indexHeapRelOid, AccessShareLock);
Assert(RelationIsSubPartitioned(indexHeapRel));
part = partitionOpen(indexHeapRel, heapRel->parentId, AccessShareLock);
partRel = partitionGetRelation(indexHeapRel, part);
gpiScan->parentRelation = partRel;
partitionClose(indexHeapRel, part, AccessShareLock);
} else {
gpiScan->parentRelation = relation_open(heapRel->parentId, AccessShareLock);
}
}
if (checkingunique) {
if (!itupKey->anynullkeys) {
itupKey->scantid = NULL;
} else {
* Scan key for new tuple contains NULL key values. Bypass
* checkingunique steps. They are unnecessary because core code
* considers NULL unequal to every value, including NULL.
*
* This optimization avoids O(N^2) behavior within the
* _bt_findinsertloc() heapkeyspace path when a unique index has a
* large number of "duplicates" with NULL key values.
*/
checkingunique = false;
Assert(checkUnique != UNIQUE_CHECK_EXISTING);
is_unique = true;
}
}
* It's very common to have an index on an auto-incremented or
* monotonically increasing value. In such cases, every insertion happens
* towards the end of the index. We try to optimise that case by caching
* the right-most leaf of the index. If our cached block is still the
* rightmost leaf, has enough free space to accommodate a new entry and
* the insertion key is strictly greater than the first key in this page,
* then we can safely conclude that the new key will be inserted in the
* cached block. So we simply search within the cached block and insert the
* key at the appropriate location. We call it a fastpath.
*
* Testing has revealed, though, that the fastpath can result in increased
* contention on the exclusive-lock on the rightmost leaf page. So we
* conditionally check if the lock is available. If it's not available then
* we simply abandon the fastpath and take the regular path. This makes
* sense because unavailability of the lock also signals that some other
* backend might be concurrently inserting into the page, thus reducing our
* chances to finding an insertion place in this page.
*/
top:
bool fastpath = false;
offset = InvalidOffsetNumber;
if (RelationGetTargetBlock(rel) != InvalidBlockNumber) {
* Conditionally acquire exclusive lock on the buffer before doing any
* checks. If we don't get the lock, we simply follow slowpath. If we
* do get the lock, this ensures that the index state cannot change, as
* far as the rightmost part of the index is concerned.
*/
buf = ReadBuffer(rel, RelationGetTargetBlock(rel));
if (ConditionalLockBuffer(buf)) {
_bt_checkpage(rel, buf);
Page page = BufferGetPage(buf);
UBTPageOpaqueInternal lpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
Size itemsz = MAXALIGN(IndexTupleSize(itup));
* Check if the page is still the rightmost leaf page, has enough
* free space to accommodate the new tuple, no split is in progress
* and the scankey is greater than or equal to the first key on the
* page.
*/
if (P_ISLEAF(lpageop) && P_RIGHTMOST(lpageop) && !P_INCOMPLETE_SPLIT(lpageop) && !P_IGNORE(lpageop) &&
(PageGetFreeSpace(page) > itemsz) && PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
UBTreeCompare(rel, itupKey, page, P_FIRSTDATAKEY(lpageop), InvalidBuffer) > 0) {
fastpath = true;
} else {
_bt_relbuf(rel, buf);
* Something did not workout. Just forget about the cached
* block and follow the normal path. It might be set again if
* the conditions are favourble.
*/
RelationSetTargetBlock(rel, InvalidBlockNumber);
}
} else {
ReleaseBuffer(buf);
* If someone's holding a lock, it's likely to change anyway,
* so don't try again until we get an updated rightmost leaf.
*/
RelationSetTargetBlock(rel, InvalidBlockNumber);
}
}
if (!fastpath) {
stack = UBTreeSearch(rel, itupKey, &buf, BT_WRITE);
}
WHITEBOX_TEST_STUB("UBTreeDoInsert-found", WhiteboxDefaultErrorEmit);
* If we're not allowing duplicates, make sure the key isn't already in
* the index.
*
* NOTE: obviously, _bt_check_unique can only detect keys that are already
* in the index; so it cannot defend against concurrent insertions of the
* same key. We protect against that by means of holding a write lock on
* the first page the value could be on, regardless of the value of its
* implicit heap TID tiebreaker attribute. Any other would-be inserter of
* the same key must acquire a write lock on the same page, so only one
* would-be inserter can be making the check at one time. Furthermore,
* once we are past the check we hold write locks continuously until we
* have performed our insertion, so no later inserter can fail to see our
* insertion. (This requires some care in _bt_findinsertloc.)
*
* If we must wait for another xact, we release the lock while waiting,
* and then must start over completely.
*
* For a partial uniqueness check, we don't wait for the other xact. Just
* let the tuple in and return false for possibly non-unique, or true for
* definitely unique.
*/
if (checkingunique) {
TransactionId xwait;
offset = UBTreeBinarySearch(rel, itupKey, buf, true);
xwait = UBTreeCheckUnique(rel, itup, heapRel, buf, offset, itupKey, checkUnique, &is_unique, gpiScan);
if (unlikely(TransactionIdIsValid(xwait))) {
_bt_relbuf(rel, buf);
buf = InvalidBuffer;
XactLockTableWait(xwait);
if (stack) {
_bt_freestack(stack);
}
goto top;
}
if (itupKey->heapkeyspace) {
itupKey->scantid = &itup->t_tid;
}
}
if (checkUnique != UNIQUE_CHECK_EXISTING) {
* The only conflict predicate locking cares about for indexes is when
* an index tuple insert conflicts with an existing lock. We don't
* know the actual page we're going to insert to yet because scantid
* was not filled in initially, but it's okay to use the "first valid"
* page instead. This reasoning also applies to INCLUDE indexes,
* whose extra attributes are not considered part of the key space.
*/
CheckForSerializableConflictIn(rel, NULL, buf);
offset = UBTreeFindInsertLoc(rel, &buf, offset, itupKey, itup, checkingunique, stack, heapRel);
UBTreeInsertOnPage(rel, itupKey, buf, InvalidBuffer, stack, itup, offset, false);
} else {
_bt_relbuf(rel, buf);
}
if (stack) {
_bt_freestack(stack);
}
pfree(itupKey);
if (gpiScan != NULL) {
if (indexHeapRelOid == heapRel->grandparentId) {
releaseDummyRelation(&partRel);
relation_close(indexHeapRel, AccessShareLock);
} else {
relation_close(gpiScan->parentRelation, AccessShareLock);
}
GPIScanEnd(gpiScan);
}
WHITEBOX_TEST_STUB("UBTreeDoInsert-end", WhiteboxDefaultErrorEmit);
return is_unique;
}
* UBTreeCheckUnique() -- Check for violation of unique index constraint
*
* offset points to the first possible item that could conflict. It can
* also point to end-of-page, which means that the first tuple to check
* is the first tuple on the next page.
*
* Returns InvalidTransactionId if there is no conflict, else an xact ID
* we must wait for to see if it commits a conflicting tuple. If an actual
* conflict is detected, no return --- just ereport().
*
* However, if checkUnique == UNIQUE_CHECK_PARTIAL, we always return
* InvalidTransactionId because we don't want to wait. In this case we
* set *is_unique to false if there is a potential conflict, and the
* core code must redo the uniqueness check later.
*/
static TransactionId UBTreeCheckUnique(Relation rel, IndexTuple itup, Relation heapRel, Buffer buf, OffsetNumber offset,
BTScanInsert itup_key, IndexUniqueCheck checkUnique, bool* is_unique, GPIScanDesc gpiScan)
{
SnapshotData SnapshotDirty;
OffsetNumber maxoff;
Page page;
UBTPageOpaqueInternal opaque;
Buffer nbuf = InvalidBuffer;
bool found = false;
Relation tarRel = heapRel;
WHITEBOX_TEST_STUB("UBTreeCheckUnique", WhiteboxDefaultErrorEmit);
*is_unique = true;
InitDirtySnapshot(SnapshotDirty);
page = BufferGetPage(buf);
opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
* Scan over all equal tuples, looking for live conflicts.
*/
Assert(!itup_key->anynullkeys);
Assert(itup_key->scantid == NULL);
for (;;) {
ItemId curitemid;
IndexTuple curitup;
BlockNumber nblkno;
* make sure the offset points to an actual item before trying to
* examine it...
*/
if (offset <= maxoff) {
curitemid = PageGetItemId(page, offset);
* We can skip items that are marked killed.
*
* Formerly, we applied _bt_isequal() before checking the kill
* flag, so as to fall out of the item loop as soon as possible.
* However, in the presence of heavy update activity an index may
* contain many killed items with the same key; running
* _bt_isequal() on each killed item gets expensive. Furthermore
* it is likely that the non-killed version of each key appears
* first, so that we didn't actually get to exit any sooner
* anyway. So now we just advance over killed items as quickly as
* we can. We only apply _bt_isequal() when we get to a non-killed
* item or the end of the page.
*/
if (!ItemIdIsDead(curitemid)) {
ItemPointerData htid;
* _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
* how we handling NULLs - and so we must not use _bt_compare
* in real comparison, but only for ordering/finding items on
* pages. - vadim 03/24/97
*/
if (!UBTreeIsEqual<UBTPageOpaqueInternal>(rel, page, offset, itup_key->keysz, itup_key->scankeys)) {
break;
}
curitup = (IndexTuple) PageGetItem(page, curitemid);
htid = curitup->t_tid;
Oid curPartOid = InvalidOid;
Datum datum;
bool isNull;
if (RelationIsGlobalIndex(rel)) {
datum = index_getattr(curitup, IndexRelationGetNumberOfAttributes(rel),
RelationGetDescr(rel), &isNull);
curPartOid = DatumGetUInt32(datum);
Assert(isNull == false);
if (curPartOid != gpiScan->currPartOid) {
GPISetCurrPartOid(gpiScan, curPartOid);
if (!GPIGetNextPartRelation(gpiScan, CurrentMemoryContext, AccessShareLock)) {
if (CheckPartitionIsInvisible(gpiScan)) {
ItemIdMarkDead(curitemid);
}
opaque->btpo_flags |= BTP_HAS_GARBAGE;
if (nbuf != InvalidBuffer) {
MarkBufferDirtyHint(nbuf, true);
} else {
MarkBufferDirtyHint(buf, true);
}
goto next;
} else {
tarRel = gpiScan->fakePartRelation;
}
}
}
* If we are doing an index recheck (UNIQUE_CHECK_EXISTING mode), we expect
* to find tuple already in ubtree. Once the index tuple matched, it will be
* marked found. Traverse all tuples, if no matching tuple is found, report
* an error. For GPI, part oid should be the same as heapRel oid.
*/
if (checkUnique == UNIQUE_CHECK_EXISTING && ItemPointerCompare(&htid, &itup->t_tid) == 0
&& !RelationIsGlobalIndex(rel)) {
found = true;
} else if (checkUnique == UNIQUE_CHECK_EXISTING && ItemPointerCompare(&htid, &itup->t_tid) == 0
&& curPartOid == heapRel->rd_id && RelationIsGlobalIndex(rel)) {
found = true;
} else {
TransactionId xmin, xmax;
bool isdead = false;
bool xminCommitted = false;
bool xmaxCommitted = false;
bool xminVisible = false;
bool xmaxVisible = false;
isdead = UBTreeItupGetXminXmax(page, offset, InvalidTransactionId,
&xmin, &xmax, &xminCommitted, &xmaxCommitted,
RelationGetNamespace(rel) == PG_TOAST_NAMESPACE);
if (checkUnique == UNIQUE_CHECK_PARTIAL) {
if (TransactionIdIsValid(xmin) && !TransactionIdIsValid(xmax)) {
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
*is_unique = false;
return InvalidTransactionId;
}
}
xminVisible = xminCommitted || TransactionIdIsCurrentTransactionId(xmin);
xmaxVisible = xmaxCommitted || TransactionIdIsCurrentTransactionId(xmax);
* By here, there are 3 cases:
* 1. conflict, when xmin is valid *and* xmax is not valid.
* 2. not conflict, when xmin is not valid *or* xmax is valid.
* 3. unknown, when xmin or xmax is in-progress.
*
* In case 3, we have to return the xid in order to tell the caller to wait for the xid.
*/
if (TransactionIdIsValid(xmin) && xminVisible && !TransactionIdIsValid(xmax)) {
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
char *key_desc = NULL;
index_deform_tuple(itup, RelationGetDescr(rel), values, isnull);
key_desc = BuildIndexValueDescription(rel, values, isnull);
* violates unique constraint. this means wrong max cu id happens.
*/
Assert(memcmp(NameStr(rel->rd_rel->relname), "pg_cudesc_", strlen("pg_cudesc_")) != 0);
ereport(ERROR, (errcode(ERRCODE_UNIQUE_VIOLATION),
errmsg("duplicate key value violates unique constraint \"%s\"",
RelationGetRelationName(rel)),
key_desc ? errdetail("Key %s already exists.", key_desc) : 0));
}
if (!TransactionIdIsValid(xmin) || (TransactionIdIsValid(xmax) && xmaxVisible))
goto next;
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
return TransactionIdIsValid(xmax) ? xmax : xmin;
}
}
}
next:
* Advance to next tuple to continue checking.
*/
if (offset < maxoff)
offset = OffsetNumberNext(offset);
else {
if (P_RIGHTMOST(opaque))
break;
int highkeycmp = UBTreeCompare(rel, itup_key, page, P_HIKEY, InvalidBuffer);
Assert(highkeycmp <= 0);
if (highkeycmp != 0) {
break;
}
for (;;) {
nblkno = opaque->btpo_next;
nbuf = _bt_relandgetbuf(rel, nbuf, nblkno, BT_READ);
page = BufferGetPage(nbuf);
opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
break;
if (P_RIGHTMOST(opaque))
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("fell off the end of index \"%s\" at blkno %u",
RelationGetRelationName(rel), nblkno)));
}
maxoff = PageGetMaxOffsetNumber(page);
offset = P_FIRSTDATAKEY(opaque);
}
}
* If we are doing a recheck then we should have found the tuple we are
* checking. Otherwise there's something very wrong --- probably, the
* index is on a non-immutable expression.
*/
if (checkUnique == UNIQUE_CHECK_EXISTING && !found)
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to re-find tuple within index \"%s\"", RelationGetRelationName(rel)),
errhint("This may be because of a non-immutable index expression.")));
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
return InvalidTransactionId;
}
* UBTreeFindInsertLoc() -- Finds an insert location for a tuple
*
* On entry, *buf and *offsetptr point to the first legal position
* where the new tuple could be inserted. The caller should hold an
* exclusive lock on *buf. *offsetptr can also be set to
* InvalidOffsetNumber, in which case the function will search for the
* right location within the page if needed.
*
* If 'checkingunique' is true, the buffer on entry is the first page
* that contains duplicates of the new key. If there are duplicates on
* multiple pages, the correct insertion position might be some page to
* the right, rather than the first page. In that case, this function
* moves right to the correct target page.
*
* If the new key is equal to one or more existing keys, we can
* legitimately place it anywhere in the series of equal keys --- in fact,
* if the new key is equal to the page's "high key" we can place it on
* the next page. If it is equal to the high key, and there's not room
* to insert the new tuple on the current page without splitting, then
* we can move right hoping to find more free space and avoid a split.
* (We should not move right indefinitely, however, since that leads to
* O(N^2) insertion behavior in the presence of many equal keys.)
* Once we have chosen the page to put the key on, we'll insert it before
* any existing equal keys because of the way _bt_binsrch() works.
*
* (In a !heapkeyspace index, there can be multiple pages with the same
* high key, where the new tuple could legitimately be placed on. In
* that case, the caller passes the first page containing duplicates,
* just like when checkinunique=true. If that page doesn't have enough
* room for the new tuple, this function moves right, trying to find a
* legal page that does.)
*
* On exit, *buf and *offsetptr point to the chosen insert location. If
* _bt_findinsertloc decides to move right, the lock and pin on the
* original page will be released and the new page returned to the caller
* is exclusively locked instead.
*
* If there is not enough room on the page for the new tuple, we try to
* make room by removing any LP_DEAD tuples. (Try pruning page in UStore's
* multi-version index)
*
* newtup is the new tuple we're inserting, and scankey is an insertion
* type scan key for it.
*/
static OffsetNumber UBTreeFindInsertLoc(Relation rel, Buffer *bufptr, OffsetNumber firstlegaloff, BTScanInsert itup_key,
IndexTuple newtup, bool checkingunique, BTStack stack, Relation heapRel)
{
Buffer buf = *bufptr;
Page page = BufferGetPage(buf);
Size itemsz;
UBTPageOpaqueInternal lpageop;
bool movedright = false;
bool pruned = false;
lpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
itemsz = IndexTupleDSize(*newtup);
itemsz = MAXALIGN(itemsz);
* need to be consistent */
if (unlikely(itemsz > UBTMaxItemSize(page))) {
UBTreeCheckThirdPage<UBTPageOpaqueInternal>(rel, heapRel, itup_key->heapkeyspace, page, newtup);
}
Assert(P_ISLEAF(lpageop) && !P_INCOMPLETE_SPLIT(lpageop));
Assert(!itup_key->heapkeyspace || itup_key->scantid != NULL);
Assert(itup_key->heapkeyspace || itup_key->scantid == NULL);
Assert(itup_key->heapkeyspace);
* If we're inserting into a unique index, we may have to walk right
* through leaf pages to find the one leaf page that we must insert on
* to.
*
* This is needed for checkingunique callers because a scantid was not
* used when we called _bt_search(). scantid can only be set after
* _bt_check_unique() has checked for duplicates. The buffer
* initially stored in insertstate->buf has the page where the first
* duplicate key might be found, which isn't always the page that new
* tuple belongs on. The heap TID attribute for new tuple (scantid)
* could force us to insert on a sibling page, though that should be
* very rare in practice.
*/
if (checkingunique) {
while (!P_RIGHTMOST(lpageop) && UBTreeCompare(rel, itup_key, page, P_HIKEY, InvalidBuffer) > 0) {
Buffer rbuf = InvalidBuffer;
BlockNumber rblkno = lpageop->btpo_next;
for (;;) {
rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
page = BufferGetPage(rbuf);
lpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
* If this page was incompletely split, finish the split now.
* We do this while holding a lock on the left sibling, which
* is not good because finishing the split could be a fairly
* lengthy operation. But this should happen very seldom.
*/
if (P_INCOMPLETE_SPLIT(lpageop)) {
UBTreeFinishSplit(rel, rbuf, stack);
rbuf = InvalidBuffer;
continue;
}
if (!P_IGNORE(lpageop))
break;
if (P_RIGHTMOST(lpageop))
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("fell off the end of index \"%s\" at blkno %u",
RelationGetRelationName(rel), rblkno)));
rblkno = lpageop->btpo_next;
}
_bt_relbuf(rel, buf);
buf = rbuf;
movedright = true;
pruned = false;
}
}
* If the target page is full, see if we can obtain enough space by:
* (a) pruning this page if we have multi-version info.
* (b) erasing LP_DEAD items.
*/
if (PageGetFreeSpace(page) < itemsz) {
if (P_ISLEAF(lpageop)) {
if (UBTreePagePruneOpt(rel, buf, false)) {
pruned = true;
}
}
}
* Now we are on the right page, so find the insert position. If we moved
* right at all, we know we should insert at the start of the page. If we
* didn't move right, we can use the firstlegaloff hint if the caller
* supplied one, unless we pruned the page which might have moved tuples
* around making the hint invalid.
*
* After we take TID as tie-breaker, tuples with the same key value need
* to be sorted by TID. Since firstlegaloff does not consider TID, this
* hint may not be accurate. We need to examine with _bt_compare() again,
* because TID is already included in itup_key now (after checking unique).
*/
*bufptr = buf;
if (movedright) {
* moving right means we may insert in the front of the page, but still
* need to check carefully when itup_key->heapkeyspace.
*/
firstlegaloff = P_FIRSTDATAKEY(lpageop);
}
bool prunedByPrepareXid = IndexPagePrepareForXid(rel, page, GetCurrentTransactionId(), RelationNeedsWAL(rel), buf);
pruned = pruned || prunedByPrepareXid;
OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
if (firstlegaloff != InvalidOffsetNumber && !pruned &&
(firstlegaloff > maxoff || UBTreeCompare(rel, itup_key, page, firstlegaloff, InvalidBuffer) <= 0)) {
return firstlegaloff;
}
return UBTreeBinarySearch(rel, itup_key, buf, true);
}
* UBTreeInsertOnPage() -- Insert a tuple on a particular page in the index.
*
* This recursive procedure does the following things
*
* On entry, we must have the correct buffer in which to do the
* insertion, and the buffer must be pinned and write-locked. On return,
* we will have dropped both the pin and the lock on the buffer.
*
* When inserting to a non-leaf page, 'cbuf' is the left-sibling of the
* page we're inserting the downlink for. This function will clear the
* INCOMPLETE_SPLIT flag on it, and release the buffer.
*
* The locking interactions in this code are critical. You should
* grok Lehman and Yao's paper before making any changes. In addition,
* you need to understand how we disambiguate duplicate keys in this
* implementation, in order to be able to find our location using
* L&Y "move right" operations. Since we may insert duplicate user
* keys, and since these dups may propagate up the tree, we use the
* 'afteritem' parameter to position ourselves correctly for the
* insertion on internal pages.
*/
static void UBTreeInsertOnPage(Relation rel, BTScanInsert itup_key, Buffer buf, Buffer cbuf, BTStack stack,
IndexTuple itup, OffsetNumber newitemoff, bool split_only_page)
{
Page page;
UBTPageOpaqueInternal lpageop;
OffsetNumber firstright = InvalidOffsetNumber;
Size itemsz;
page = BufferGetPage(buf);
lpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
Assert(P_ISLEAF(lpageop) == !BufferIsValid(cbuf));
Assert(!P_ISLEAF(lpageop) ||
UBTreeTupleGetNAtts(itup, rel) ==
IndexRelationGetNumberOfAttributes(rel));
Assert(P_ISLEAF(lpageop) ||
UBTreeTupleGetNAtts(itup, rel) <=
IndexRelationGetNumberOfKeyAttributes(rel));
WHITEBOX_TEST_STUB("UBTreeInsertOnPage", WhiteboxDefaultErrorEmit);
* Every internal page should have exactly one negative infinity item at
* all times. Only _bt_split() and _bt_newroot() should add items that
* become negative infinity items through truncation, since they're the
* only routines that allocate new internal pages.
*/
Assert(P_ISLEAF(lpageop) || newitemoff > P_FIRSTDATAKEY(lpageop));
if (P_INCOMPLETE_SPLIT(lpageop)) {
elog(ERROR, "cannot insert to incompletely split page %u", BufferGetBlockNumber(buf));
}
itemsz = IndexTupleDSize(*itup);
itemsz = MAXALIGN(itemsz);
* Do we need to split the page to fit the item on it?
*
* Note: PageGetFreeSpace() subtracts sizeof(ItemIdData) from its result,
* so this comparison is correct even though we appear to be accounting
* only for the item and not for its line pointer.
*/
if (PageGetFreeSpace(page) < itemsz) {
bool is_root = P_ISROOT(lpageop) > 0 ? true : false;
bool is_only = P_LEFTMOST(lpageop) && P_RIGHTMOST(lpageop);
bool newitemonleft = false;
Buffer rbuf;
if (rel->rd_indexsplit == INDEXSPLIT_NO_INSERTPT) {
firstright = UBTreeFindsplitlocInsertpt(rel, buf, newitemoff, itemsz, &newitemonleft, itup);
} else {
firstright = UBTreeFindsplitloc(rel, buf, newitemoff, itemsz, &newitemonleft);
}
rbuf = UBTreeSplit(rel, buf, cbuf, firstright, newitemoff, itemsz, itup, newitemonleft, itup_key);
PredicateLockPageSplit(rel, BufferGetBlockNumber(buf), BufferGetBlockNumber(rbuf));
* By here,
*
* + our target page has been split;
* + the original tuple has been inserted;
* + we have write locks on both the old (left half)
* and new (right half) buffers, after the split; and
* + we know the key we want to insert into the parent
* (it's the "high key" on the left child page).
*
* We're ready to do the parent insertion. We need to hold onto the
* locks for the child pages until we locate the parent, but we can
* release them before doing the actual insertion (see Lehman and Yao
* for the reasoning).
* ----------
*/
UBTreeInsertParent(rel, buf, rbuf, stack, is_root, is_only);
UBTreeTryRecycleEmptyPage(rel);
} else {
Buffer metabuf = InvalidBuffer;
Page metapg = NULL;
BTMetaPageData *metad = NULL;
OffsetNumber itup_off;
BlockNumber itup_blkno;
itup_off = newitemoff;
itup_blkno = BufferGetBlockNumber(buf);
* If we are doing this insert because we split a page that was the
* only one on its tree level, but was not the root, it may have been
* the "fast root". We need to ensure that the fast root link points
* at or above the current page. We can safely acquire a lock on the
* metapage here --- see comments for _bt_newroot().
*/
if (split_only_page) {
Assert(!P_ISLEAF(lpageop));
metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
if (metad->btm_fastlevel >= lpageop->btpo.level) {
_bt_relbuf(rel, metabuf);
metabuf = InvalidBuffer;
}
}
START_CRIT_SECTION();
if (!UBTreePageAddTuple(page, itemsz, itup, newitemoff, true))
ereport(PANIC, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add new item to block %u in index \"%s\"",
itup_blkno, RelationGetRelationName(rel))));
lpageop->activeTupleCount++;
MarkBufferDirty(buf);
if (BufferIsValid(metabuf)) {
metad->btm_fastroot = itup_blkno;
rel->rd_rootcache = InvalidBuffer;
metad->btm_fastlevel = lpageop->btpo.level;
MarkBufferDirty(metabuf);
}
if (BufferIsValid(cbuf)) {
Page cpage = BufferGetPage(cbuf);
UBTPageOpaqueInternal cpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(cpage);
Assert(P_INCOMPLETE_SPLIT(cpageop));
cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
MarkBufferDirty(cbuf);
}
if (RelationNeedsWAL(rel)) {
xl_btree_insert xlrec;
xl_btree_metadata_old xlmeta;
uint8 xlinfo;
XLogRecPtr recptr;
IndexTupleData trunctuple;
xlrec.offnum = itup_off;
XLogBeginInsert();
XLogRegisterData((char *)&xlrec, SizeOfBtreeInsert);
if (P_ISLEAF(lpageop)) {
xlinfo = XLOG_UBTREE_INSERT_LEAF;
* Cache the block information if we just inserted into the
* rightmost leaf page of the index.
*/
if (P_RIGHTMOST(lpageop)) {
RelationSetTargetBlock(rel, BufferGetBlockNumber(buf));
}
} else {
* Register the left child whose INCOMPLETE_SPLIT flag was
* cleared.
*/
XLogRegisterBuffer(1, cbuf, REGBUF_STANDARD);
xlinfo = XLOG_UBTREE_INSERT_UPPER;
}
if (BufferIsValid(metabuf)) {
xlmeta.root = metad->btm_root;
xlmeta.level = metad->btm_level;
xlmeta.fastroot = metad->btm_fastroot;
xlmeta.fastlevel = metad->btm_fastlevel;
XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
XLogRegisterBufData(2, (char *)&xlmeta, sizeof(xl_btree_metadata_old));
xlinfo = XLOG_UBTREE_INSERT_META;
}
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
if (!P_ISLEAF(lpageop) && newitemoff == P_FIRSTDATAKEY(lpageop)) {
trunctuple = *itup;
trunctuple.t_info = sizeof(IndexTupleData);
XLogRegisterBufData(0, (char*)&trunctuple, sizeof(IndexTupleData));
} else {
ItemId iid = PageGetItemId(page, itup_off);
XLogRegisterBufData(0, (char*)PageGetItem(page, iid), itemsz);
}
recptr = XLogInsert(RM_UBTREE_ID, xlinfo);
if (BufferIsValid(metabuf)) {
PageSetLSN(metapg, recptr);
}
if (BufferIsValid(cbuf)) {
PageSetLSN(BufferGetPage(cbuf), recptr);
}
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
UBTreeVerify(rel, page, BufferGetBlockNumber(buf), itup_off, true);
if (BufferIsValid(metabuf)) {
_bt_relbuf(rel, metabuf);
}
if (BufferIsValid(cbuf)) {
_bt_relbuf(rel, cbuf);
}
_bt_relbuf(rel, buf);
}
}
* UBTreeSplit() -- split a page in the btree.
*
* On entry, buf is the page to split, and is pinned and write-locked.
* firstright is the item index of the first item to be moved to the
* new right page. newitemoff etc. tell us about the new item that
* must be inserted along with the data from the old page.
*
* itup_key is used for suffix truncation on leaf pages (internal
* page callers pass NULL). When splitting a non-leaf page, 'cbuf'
* is the left-sibling of the page we're inserting the downlink for.
* This function will clear the INCOMPLETE_SPLIT flag on it, and
* release the buffer.
*
* Returns the new right sibling of buf, pinned and write-locked.
* The pin and lock on buf are maintained.
*/
static Buffer UBTreeSplit(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstrightoff, OffsetNumber newitemoff,
Size newitemsz, IndexTuple newitem, bool newitemonleft, BTScanInsert itup_key)
{
Buffer rbuf;
Page origpage;
Page leftpage, rightpage;
BlockNumber origpagenumber, rightpagenumber;
UBTPageOpaqueInternal ropaque, lopaque, oopaque;
Buffer sbuf = InvalidBuffer;
Page spage = NULL;
UBTPageOpaqueInternal sopaque = NULL;
Size itemsz;
ItemId itemid;
IndexTuple item;
OffsetNumber leftoff, rightoff, newitemleftoff = InvalidOffsetNumber;
OffsetNumber maxoff;
OffsetNumber i;
bool isroot = false;
bool isleaf = false;
errno_t rc;
IndexTuple firstright, lefthighkey;
Buffer actualInsertBuf = InvalidBuffer;
OffsetNumber actualInsertOff = InvalidOffsetNumber;
WHITEBOX_TEST_STUB("UBTreeSplit", WhiteboxDefaultErrorEmit);
* Acquire a new page to split into.
* NOTE: after the page is absolutely used, call UBTreeRecordUsedPage()
* before we release the Exclusive lock.
*/
UBTRecycleQueueAddress addr;
rbuf = UBTreeGetNewPage(rel, &addr);
* origpage is the original page to be split. leftpage is a temporary
* buffer that receives the left-sibling data, which will be copied back
* into origpage on success. rightpage is the new page that receives the
* right-sibling data. If we fail before reaching the critical section,
* origpage hasn't been modified and leftpage is only workspace. In
* principle we shouldn't need to worry about rightpage either, because it
* hasn't been linked into the btree page structure; but to avoid leaving
* possibly-confusing junk behind, we are careful to rewrite rightpage as
* zeroes before throwing any error.
*/
origpage = BufferGetPage(buf);
leftpage = PageGetTempPage(origpage);
rightpage = BufferGetPage(rbuf);
origpagenumber = BufferGetBlockNumber(buf);
rightpagenumber = BufferGetBlockNumber(rbuf);
UBTreePageInit(leftpage, BufferGetPageSize(buf));
* Copy the original page's LSN into leftpage, which will become the
* updated version of the page. We need this because XLogInsert will
* examine the LSN and possibly dump it in a page image.
*/
PageSetLSN(leftpage, PageGetLSN(origpage));
oopaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(origpage);
lopaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(leftpage);
ropaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(rightpage);
isroot = P_ISROOT(oopaque) > 0 ? true : false;
isleaf = P_ISLEAF(oopaque) > 0 ? true : false;
lopaque->btpo_flags = oopaque->btpo_flags;
lopaque->btpo_flags &= ~(BTP_ROOT | BTP_SPLIT_END | BTP_HAS_GARBAGE);
ropaque->btpo_flags = lopaque->btpo_flags;
lopaque->btpo_flags |= BTP_INCOMPLETE_SPLIT;
lopaque->btpo_prev = oopaque->btpo_prev;
lopaque->btpo_next = rightpagenumber;
ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = oopaque->btpo_next;
lopaque->btpo.level = ropaque->btpo.level = oopaque->btpo.level;
lopaque->btpo_cycleid = _bt_vacuum_cycleid(rel);
ropaque->btpo_cycleid = lopaque->btpo_cycleid;
lopaque->last_delete_xid = oopaque->last_delete_xid;
ropaque->last_delete_xid = oopaque->last_delete_xid;
lopaque->xid_base = oopaque->xid_base;
ropaque->xid_base = oopaque->xid_base;
lopaque->activeTupleCount = 0;
ropaque->activeTupleCount = 0;
* If the page we're splitting is not the rightmost page at its level in
* the tree, then the first entry on the page is the high key for the
* page. We need to copy that to the right half. Otherwise (meaning the
* rightmost page case), all the items on the right half will be user
* data.
*/
rightoff = P_HIKEY;
if (!P_RIGHTMOST(oopaque)) {
itemid = PageGetItemId(origpage, P_HIKEY);
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple)PageGetItem(origpage, itemid);
Assert(UBTreeTupleGetNAtts(item, rel) > 0);
Assert(UBTreeTupleGetNAtts(item, rel) <= IndexRelationGetNumberOfKeyAttributes(rel));
if (PageAddItem(rightpage, (Item)item, itemsz, rightoff, false, false) == InvalidOffsetNumber) {
rc = memset_s(rightpage, BLCKSZ, 0, BufferGetPageSize(rbuf));
securec_check(rc, "", "");
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add hikey to the right sibling while splitting block %u of index \"%s\"",
origpagenumber, RelationGetRelationName(rel))));
}
rightoff = OffsetNumberNext(rightoff);
}
* The "high key" for the new left page will be the first key that's going
* to go into the new right page, or possibly a truncated version if this
* is a leaf page split. This might be either the existing data item at
* position firstright, or the incoming tuple.
*
* The high key for the left page is formed using the first item on the
* right page, which may seem to be contrary to Lehman & Yao's approach of
* using the left page's last item as its new high key when splitting on
* the leaf level. It isn't, though: suffix truncation will leave the
* left page's high key fully equal to the last item on the left page when
* two tuples with equal key values (excluding heap TID) enclose the split
* point. It isn't actually necessary for a new leaf high key to be equal
* to the last item on the left for the L&Y "subtree" invariant to hold.
* It's sufficient to make sure that the new leaf high key is strictly
* less than the first item on the right leaf page, and greater than or
* equal to (not necessarily equal to) the last item on the left leaf
* page.
*
* In other words, when suffix truncation isn't possible, L&Y's exact
* approach to leaf splits is taken. (Actually, even that is slightly
* inaccurate. A tuple with all the keys from firstright but the heap TID
* from lastleft will be used as the new high key, since the last left
* tuple could be physically larger despite being opclass-equal in respect
* of all attributes prior to the heap TID attribute.)
*/
bool itup_extended = false;
if (!newitemonleft && newitemoff == firstrightoff) {
itemsz = newitemsz;
firstright = newitem;
* we have expanded the itup size by 8B in multi-version index to represent the
* space it actually occupies. When the itup is written to the page, the 8B is
* restored.
*
* if we choose newitem as firstright here, it may be copied as HIKEY. So we need
* to remind the 8B for this case.
*/
itup_extended = isleaf;
} else {
itemid = PageGetItemId(origpage, firstrightoff);
firstright = (IndexTuple)PageGetItem(origpage, itemid);
itemsz = IndexTupleSize(firstright);
}
* Truncate unneeded key and non-key attributes of the high key item
* before inserting it on the left page. This can only happen at the leaf
* level, since in general all pivot tuple values originate from leaf
* level high keys. A pivot tuple in a grandparent page must guide a
* search not only to the correct parent page, but also to the correct
* leaf page.
*/
if (isleaf) {
IndexTuple lastleft;
* Determine which tuple will become the last on the left page. This
* is needed to decide how many attributes from the first item on the
* right page must remain in new high key for left page.
*/
if (newitemonleft && newitemoff == firstrightoff) {
lastleft = newitem;
} else {
OffsetNumber lastleftoff;
lastleftoff = OffsetNumberPrev(firstrightoff);
Assert(lastleftoff >= P_FIRSTDATAKEY(oopaque));
itemid = PageGetItemId(origpage, lastleftoff);
lastleft = (IndexTuple) PageGetItem(origpage, itemid);
}
if (itup_key == NULL) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("itup_key for truncate is NULL")));
return InvalidBuffer;
}
lefthighkey = UBTreeTruncate(rel, lastleft, firstright, itup_key, itup_extended);
itemsz = IndexTupleSize(lefthighkey);
itemsz = MAXALIGN(itemsz);
} else {
* Don't perform suffix truncation on a copy of firstright to make
* left page high key for internal page splits. Must use firstright
* as new high key directly.
*
* Each distinct separator key value originates as a leaf level high
* key; all other separator keys/pivot tuples are copied from one
* level down. A separator key in a grandparent page must be
* identical to high key in rightmost parent page of the subtree to
* its left, which must itself be identical to high key in rightmost
* child page of that same subtree (this even applies to separator
* from grandparent's high key). There must always be an unbroken
* "seam" of identical separator keys that guide index scans at every
* level, starting from the grandparent. That's why suffix truncation
* is unsafe here.
*
* Internal page splits will truncate firstright into a "negative
* infinity" data item when it gets inserted on the new right page
* below, though. This happens during the call to _bt_pgaddtup() for
* the new first data item for right page. Do not confuse this
* mechanism with suffix truncation. It is just a convenient way of
* implementing page splits that split the internal page "inside"
* firstright. The lefthighkey separator key cannot appear a second
* time in the right page (only firstright's downlink goes in right
* page).
*/
lefthighkey = firstright;
}
* Add new high key to leftpage
*/
leftoff = P_HIKEY;
Assert(UBTreeTupleGetNAtts(lefthighkey, rel) > 0);
Assert(UBTreeTupleGetNAtts(lefthighkey, rel) <= IndexRelationGetNumberOfKeyAttributes(rel));
Assert(itemsz == MAXALIGN(IndexTupleSize(lefthighkey)));
if (PageAddItem(leftpage, (Item)lefthighkey, itemsz, leftoff, false, false) == InvalidOffsetNumber) {
rc = memset_s(rightpage, BLCKSZ, 0, BufferGetPageSize(rbuf));
securec_check(rc, "", "");
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add hikey to the left sibling while splitting block %u of index \"%s\"",
origpagenumber, RelationGetRelationName(rel))));
}
leftoff = OffsetNumberNext(leftoff);
if (lefthighkey != firstright) {
pfree(lefthighkey);
}
* Now transfer all the data items to the appropriate page.
*
* Note: we *must* insert at least the right page's items in item-number
* order, for the benefit of _bt_restore_page().
*/
maxoff = PageGetMaxOffsetNumber(origpage);
for (i = P_FIRSTDATAKEY(oopaque); i <= maxoff; i = OffsetNumberNext(i)) {
itemid = PageGetItemId(origpage, i);
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple)PageGetItem(origpage, itemid);
if (i == newitemoff) {
Assert(actualInsertBuf == InvalidBuffer);
Assert(actualInsertOff == InvalidOffsetNumber);
if (newitemonleft) {
newitemleftoff = leftoff;
if (!UBTreePageAddTuple(leftpage, newitemsz, newitem, leftoff, true)) {
rc = memset_s(leftpage, BLCKSZ, 0, BufferGetPageSize(rbuf));
securec_check(rc, "", "");
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add new item to the left sibling while splitting block %u of index \"%s\"",
origpagenumber, RelationGetRelationName(rel))));
}
actualInsertBuf = buf;
actualInsertOff = leftoff;
leftoff = OffsetNumberNext(leftoff);
lopaque->activeTupleCount++;
} else {
if (!UBTreePageAddTuple(rightpage, newitemsz, newitem, rightoff, true)) {
rc = memset_s(rightpage, BLCKSZ, 0, BufferGetPageSize(rbuf));
securec_check(rc, "", "");
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add new item to the right sibling while splitting block %u of index \"%s\"",
origpagenumber, RelationGetRelationName(rel))));
}
actualInsertBuf = rbuf;
actualInsertOff = rightoff;
rightoff = OffsetNumberNext(rightoff);
ropaque->activeTupleCount++;
}
}
if (i < firstrightoff) {
if (!UBTreePageAddTuple(leftpage, itemsz, item, leftoff, false)) {
rc = memset_s(rightpage, BLCKSZ, 0, BufferGetPageSize(rbuf));
securec_check(rc, "", "");
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add old item to the left sibling while splitting block %u of index \"%s\"",
origpagenumber, RelationGetRelationName(rel))));
}
leftoff = OffsetNumberNext(leftoff);
if (isleaf && !TransactionIdIsValid(((UstoreIndexXid)UstoreIndexTupleGetXid(item))->xmax))
lopaque->activeTupleCount++;
} else {
if (!UBTreePageAddTuple(rightpage, itemsz, item, rightoff, false)) {
rc = memset_s(rightpage, BLCKSZ, 0, BufferGetPageSize(rbuf));
securec_check(rc, "", "");
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add old item to the right sibling while splitting block %u of index \"%s\"",
origpagenumber, RelationGetRelationName(rel))));
}
rightoff = OffsetNumberNext(rightoff);
if (isleaf && !TransactionIdIsValid(((UstoreIndexXid)UstoreIndexTupleGetXid(item))->xmax))
ropaque->activeTupleCount++;
}
}
if (i <= newitemoff) {
* Can't have newitemonleft here; that would imply we were told to put
* *everything* on the left page, which cannot fit (if it could, we'd
* not be splitting the page).
*/
Assert(!newitemonleft);
if (!UBTreePageAddTuple(rightpage, newitemsz, newitem, rightoff, true)) {
rc = memset_s(rightpage, BLCKSZ, 0, BufferGetPageSize(rbuf));
securec_check(rc, "", "");
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add new item to the right sibling while splitting block %u of index \"%s\"",
origpagenumber, RelationGetRelationName(rel))));
}
Assert(actualInsertBuf == InvalidBuffer);
Assert(actualInsertOff == InvalidOffsetNumber);
actualInsertBuf = rbuf;
actualInsertOff = rightoff;
rightoff = OffsetNumberNext(rightoff);
ropaque->activeTupleCount++;
}
* We have to grab the right sibling (if any) and fix the prev pointer
* there. We are guaranteed that this is deadlock-free since no other
* writer will be holding a lock on that page and trying to move left, and
* all readers release locks on a page before trying to fetch its
* neighbors.
*/
if (!P_RIGHTMOST(oopaque)) {
sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
spage = BufferGetPage(sbuf);
sopaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(spage);
if (sopaque->btpo_prev != origpagenumber) {
rc = memset_s(rightpage, BLCKSZ, 0, BufferGetPageSize(rbuf));
securec_check(rc, "", "");
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("right sibling's left-link doesn't match: block %u links to %u instead of expected %u in "
"index \"%s\"",
oopaque->btpo_next, sopaque->btpo_prev, origpagenumber, RelationGetRelationName(rel))));
}
* Check to see if we can set the SPLIT_END flag in the right-hand
* split page; this can save some I/O for vacuum since it need not
* proceed to the right sibling. We can set the flag if the right
* sibling has a different cycleid: that means it could not be part of
* a group of pages that were all split off from the same ancestor
* page. If you're confused, imagine that page A splits to A B and
* then again, yielding A C B, while vacuum is in progress. Tuples
* originally in A could now be in either B or C, hence vacuum must
* examine both pages. But if D, our right sibling, has a different
* cycleid then it could not contain any tuples that were in A when
* the vacuum started.
*/
if (sopaque->btpo_cycleid != ropaque->btpo_cycleid)
ropaque->btpo_flags |= BTP_SPLIT_END;
}
* Right sibling is locked, new siblings are prepared, but original page
* is not updated yet.
*
* NO EREPORT(ERROR) till right sibling is updated. We can get away with
* not starting the critical section till here because we haven't been
* scribbling on the original page yet; see comments above.
*/
START_CRIT_SECTION();
* By here, the original data page has been split into two new halves, and
* these are correct. The algorithm requires that the left page never
* move during a split, so we copy the new left page back on top of the
* original. Note that this is not a waste of time, since we also require
* (in the page management code) that the center of a page always be
* clean, and the most efficient way to guarantee this is just to compact
* the data by reinserting it into a new left page. (XXX the latter
* comment is probably obsolete; but in any case it's good to not scribble
* on the original page until we enter the critical section.)
*
* We need to do this before writing the WAL record, so that XLogInsert
* can WAL log an image of the page if necessary.
*/
PageRestoreTempPage(leftpage, origpage);
MarkBufferDirty(buf);
MarkBufferDirty(rbuf);
if (!P_RIGHTMOST(ropaque)) {
sopaque->btpo_prev = rightpagenumber;
MarkBufferDirty(sbuf);
}
* Clear INCOMPLETE_SPLIT flag on child if inserting the new item finishes
* a split.
*/
if (!isleaf) {
Page cpage = BufferGetPage(cbuf);
UBTPageOpaqueInternal cpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(cpage);
cpageop->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
MarkBufferDirty(cbuf);
}
if (RelationNeedsWAL(rel)) {
xl_ubtree_split xlrec;
uint8 xlinfo;
XLogRecPtr recptr;
xlrec.level = ropaque->btpo.level;
xlrec.firstright = firstrightoff;
xlrec.newitemoff = newitemoff;
xlrec.opaqueVersion = UBTREE_OPAQUE_VERSION_RCR;
xlrec.lopaque = *(UBTPageOpaqueInternal)PageGetSpecialPointer(origpage);
xlrec.ropaque = *ropaque;
XLogBeginInsert();
XLogRegisterData((char *)&xlrec, SizeOfUBtreeSplit);
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
XLogRegisterBuffer(1, rbuf, REGBUF_WILL_INIT);
if (!P_RIGHTMOST(ropaque)) {
XLogRegisterBuffer(2, sbuf, REGBUF_STANDARD);
}
if (BufferIsValid(cbuf)) {
XLogRegisterBuffer(3, cbuf, REGBUF_STANDARD);
}
* Log the new item, if it was inserted on the left page. (If it was
* put on the right page, we don't need to explicitly WAL log it
* because it's included with all the other items on the right page.)
* Show the new item as belonging to the left page buffer, so that it
* is not stored if XLogInsert decides it needs a full-page image of
* the left page. We store the offset anyway, though, to support
* archive compression of these records.
*/
if (newitemonleft) {
Assert(newitemleftoff != InvalidOffsetNumber);
ItemId iid = PageGetItemId(origpage, newitemleftoff);
IndexTuple itup = (IndexTuple)PageGetItem(origpage, iid);
XLogRegisterBufData(0, (char*)itup, ItemIdGetLength(iid));
}
itemid = PageGetItemId(origpage, P_HIKEY);
item = (IndexTuple)PageGetItem(origpage, itemid);
XLogRegisterBufData(0, (char*)item, MAXALIGN(IndexTupleSize(item)));
* Log the contents of the right page in the format understood by
* _bt_restore_page(). We set lastrdata->buffer to InvalidBuffer,
* because we're going to recreate the whole page anyway, so it should
* never be stored by XLogInsert.
*
* Direct access to page is not good but faster - we should implement
* some new func in page API. Note we only store the tuples
* themselves, knowing that they were inserted in item-number order
* and so the item pointers can be reconstructed.
* See comments for _bt_restore_page().
*/
XLogRegisterBufData(1, (char *)rightpage + ((PageHeader)rightpage)->pd_upper,
((PageHeader)rightpage)->pd_special - ((PageHeader)rightpage)->pd_upper);
if (isroot) {
xlinfo = newitemonleft ? XLOG_UBTREE_SPLIT_L_ROOT : XLOG_UBTREE_SPLIT_R_ROOT;
} else {
xlinfo = newitemonleft ? XLOG_UBTREE_SPLIT_L : XLOG_UBTREE_SPLIT_R;
}
recptr = XLogInsert(RM_UBTREE_ID, xlinfo | BTREE_SPLIT_OPAQUE_FLAG);
PageSetLSN(origpage, recptr);
PageSetLSN(rightpage, recptr);
if (!P_RIGHTMOST(ropaque)) {
PageSetLSN(spage, recptr);
}
if (!isleaf) {
PageSetLSN(BufferGetPage(cbuf), recptr);
}
}
END_CRIT_SECTION();
Page page = BufferGetPage(actualInsertBuf);
UBTreeVerify(rel, page, BufferGetBlockNumber(actualInsertBuf), actualInsertOff, true);
UBTreeRecordUsedPage(rel, addr);
if (!P_RIGHTMOST(ropaque)) {
_bt_relbuf(rel, sbuf);
}
if (!isleaf) {
_bt_relbuf(rel, cbuf);
}
return rbuf;
}
void ReportFailedToDelete(Relation rel, Buffer buf, IndexTuple itup, char* extraInfo)
{
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
index_deform_tuple(itup, RelationGetDescr(rel), values, isnull);
char *key_desc = BuildIndexValueDescription(rel, values, isnull);
Oid partOid = InvalidOid;
if (RelationIsGlobalIndex(rel)) {
bool ignore = false;
AttrNumber partitionOidAttr = IndexRelationGetNumberOfAttributes(rel);
TupleDesc tupdesc = RelationGetDescr(rel);
partOid = DatumGetUInt32(index_getattr(itup, partitionOidAttr, tupdesc, &ignore));
}
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("xid %lu failed to delete index tuple %s in \"%s\" with TID (%u,%u) partOid %u. "
"latest fetched index block is %u. extraInfo: \"%s\"",
GetCurrentTransactionId(),
key_desc ? key_desc : "(Unknown)",
RelationGetRelationName(rel),
ItemPointerGetBlockNumber(&itup->t_tid),
ItemPointerGetOffsetNumber(&itup->t_tid),
partOid, BufferGetBlockNumber(buf),
extraInfo)));
}
* UBTreeDoDelete() -- Handle deletion of a single index tuple in the tree.
*/
bool UBTreeDoDelete(Relation rel, IndexTuple itup, bool isRollbackIndex)
{
bool found = true;
BTScanInsert itupKey;
Buffer buf;
volatile Page page = NULL;
OffsetNumber offset;
itupKey = UBTreeMakeScanKey(rel, itup);
(void)UBTreeSearch(rel, itupKey, &buf, BT_WRITE, false);
* STEP 2: look for the exact itup that needs to be deleted.
* Note: 1. the key, tid and partOid of the to-be-delete tuple must exactly matches.
* 2. we may need to try again because we prune the page when preparing xid-base.
*/
bool retried = false;
while (true) {
offset = UBTreeBinarySearch(rel, itupKey, buf, true);
WHITEBOX_TEST_STUB("UBTreeDoDelete-found", WhiteboxDefaultErrorEmit);
offset = UBTreeFindDeleteLoc(rel, &buf, offset, itupKey, itup);
page = BufferGetPage(buf);
if (offset == InvalidOffsetNumber) {
ReportFailedToDelete(rel, buf, itup, "can't find it");
}
bool pruned = IndexPagePrepareForXid(rel, page, GetCurrentTransactionId(), RelationNeedsWAL(rel), buf);
if (!pruned) {
break;
} else if (!retried) {
retried = true;
} else {
ReportFailedToDelete(rel, buf, itup, "can't fit xid-base");
}
}
UBTreeDeleteOnPage(rel, buf, offset, isRollbackIndex);
pfree(itupKey);
return found;
}
* UBTreeFindDeleteLoc() -- Finds the location of the given tuple
*
* We consider the location is right when we found a tuple whose t_tid equals
* the target, and the xmax is invalid (or aborted).
*
* Currently, we find the first tuple with the same key by _bt_binsrch(),
* but we need to go through all the equal tuples to find the tuple with
* same t_tid. This can be optimized by sort by key and t_tid when inserting
* and comparing.
*
* If we found a tuple whose xmin is aborted, we will mark it as dead.
*/
static OffsetNumber UBTreeFindDeleteLoc(Relation rel, Buffer* bufP, OffsetNumber offset,
BTScanInsert itup_key, IndexTuple itup)
{
OffsetNumber maxoff;
Page page;
UBTPageOpaqueInternal opaque;
TransactionId xmin, xmax;
WHITEBOX_TEST_STUB("UBTreeFindDeleteLoc", WhiteboxDefaultErrorEmit);
page = BufferGetPage(*bufP);
opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
AttrNumber partitionOidAttr = IndexRelationGetNumberOfAttributes(rel);
TupleDesc tupdesc = RelationGetDescr(rel);
Oid partOid = InvalidOid;
bool isnull = false;
if (RelationIsGlobalIndex(rel)) {
partOid = DatumGetUInt32(index_getattr(itup, partitionOidAttr, tupdesc, &isnull));
Assert(!isnull);
}
* Scan over all equal tuples, looking for itup.
*/
for (;;) {
ItemId curitemid;
IndexTuple curitup;
BlockNumber nblkno;
if (offset <= maxoff) {
curitemid = PageGetItemId(page, offset);
if (!ItemIdIsDead(curitemid)) {
* _bt_compare returns 0 for (1,NULL) and (1,NULL) - this's
* how we handling NULLs - and so we must not use _bt_compare
* in real comparison, but only for ordering/finding items on
* pages. - vadim 03/24/97
*/
if (!UBTreeIsEqual<UBTPageOpaqueInternal>(rel, page, offset, itup_key->keysz, itup_key->scankeys))
return InvalidOffsetNumber;
curitup = (IndexTuple)PageGetItem(page, curitemid);
bool isDead = false;
bool xminCommitted = false;
bool xmaxCommitted = false;
isDead = UBTreeItupGetXminXmax(page, offset, InvalidTransactionId,
&xmin, &xmax, &xminCommitted, &xmaxCommitted,
RelationGetNamespace(rel) == PG_TOAST_NAMESPACE);
if (!isDead && ItemPointerEquals(&itup->t_tid, &curitup->t_tid) && !TransactionIdIsValid(xmax)) {
if (RelationIsGlobalIndex(rel)) {
Oid curPartOid = DatumGetUInt32(index_getattr(curitup, partitionOidAttr, tupdesc, &isnull));
Assert(!isnull);
if (partOid == curPartOid)
return offset;
} else {
return offset;
}
}
}
}
* Advance to next tuple to continue checking.
*/
if (offset < maxoff)
offset = OffsetNumberNext(offset);
else {
int highkeycmp;
if (P_RIGHTMOST(opaque))
break;
highkeycmp = UBTreeCompare(rel, itup_key, page, P_HIKEY, InvalidBuffer);
Assert(highkeycmp <= 0);
if (highkeycmp != 0) {
break;
}
for (;;) {
nblkno = opaque->btpo_next;
*bufP = _bt_relandgetbuf(rel, *bufP, nblkno, BT_WRITE);
page = BufferGetPage(*bufP);
opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
if (!P_IGNORE(opaque))
break;
if (P_RIGHTMOST(opaque))
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("fell off the end of index \"%s\" at blkno %u",
RelationGetRelationName(rel), nblkno)));
}
maxoff = PageGetMaxOffsetNumber(page);
offset = P_FIRSTDATAKEY(opaque);
}
}
return InvalidOffsetNumber;
}
* UBTreePageAddTuple() -- add a tuple to a particular page in the index.
*
* This routine adds the tuple to the page as requested. It does
* not affect pin/lock status, but you'd better have a write lock
* and pin on the target buffer! Don't forget to write and release
* the buffer afterwards, either.
*
* The main difference between this routine and a bare PageAddItem call
* is that this code knows that the leftmost index tuple on a non-leaf
* btree page doesn't need to have a key. Therefore, it strips such
* tuples down to just the tuple header. CAUTION: this works ONLY if
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
static bool UBTreePageAddTuple(Page page, Size itemsize, IndexTuple itup, OffsetNumber itup_off, bool isnew)
{
UBTPageOpaqueInternal opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
IndexTupleData trunctuple;
if (!P_ISLEAF(opaque) && itup_off == P_FIRSTDATAKEY(opaque)) {
trunctuple = *itup;
trunctuple.t_info = sizeof(IndexTupleData);
UBTreeTupleSetNAtts(&trunctuple, 0, false);
itup = &trunctuple;
itemsize = sizeof(IndexTupleData);
}
if (!(isnew && P_ISLEAF(opaque))) {
* needed */
if (PageAddItem(page, (Item)itup, itemsize, itup_off, false, false) == InvalidOffsetNumber)
return false;
} else {
Size newsize = itemsize - sizeof(ShortTransactionId) * 2;
IndexTupleSetSize(itup, newsize);
* New inserted tuple need mark xmin, except HIKEY.
* We assume itemsize included the reserved space for xmin/xmax.
*/
bool needXid = (itup_off >= P_FIRSTDATAKEY(opaque));
((PageHeader)page)->pd_upper -= sizeof(ShortTransactionId) * 2;
UstoreIndexXid uxid = (UstoreIndexXid)(((char*)page) + ((PageHeader)page)->pd_upper);
TransactionId xmin = needXid ? GetCurrentTransactionId() : FrozenTransactionId;
uxid->xmin = NormalTransactionIdToShort(opaque->xid_base, xmin);
uxid->xmax = InvalidTransactionId;
if (PageAddItem(page, (Item)itup, newsize, itup_off, false, false) == InvalidOffsetNumber) {
((PageHeader)page)->pd_upper += sizeof(ShortTransactionId) * 2;
return false;
}
ItemId iid = PageGetItemId(page, itup_off);
ItemIdSetNormal(iid, ((PageHeader)page)->pd_upper, itemsize);
}
return true;
}
* UBTreeDeleteOnPage() -- Delete a tuple on a particular page in the index.
*
* If the deletion deleted the last tuple in the page, the page may
* become empty after the last transaction committed. So we record it as
* empty page for further page recycle logic.
*/
static void UBTreeDeleteOnPage(Relation rel, Buffer buf, OffsetNumber offset, bool isRollbackIndex)
{
WHITEBOX_TEST_STUB("UBTreeDeleteOnPage", WhiteboxDefaultErrorEmit);
Page page = BufferGetPage(buf);
UBTPageOpaqueInternal opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
ItemId iid = PageGetItemId(page, offset);
IndexTuple itup = (IndexTuple)PageGetItem(page, iid);
UstoreIndexXid uxid = (UstoreIndexXid)UstoreIndexTupleGetXid(itup);
TransactionId xid = GetCurrentTransactionId();
START_CRIT_SECTION();
uxid->xmax = NormalTransactionIdToShort(opaque->xid_base, xid);
if (isRollbackIndex) {
* Rollback before upsert statement retry.
* We need to mark this tuple as dead to avoid possible deadlock in the future unique check.
*/
ItemIdMarkDead(iid);
}
opaque->activeTupleCount--;
if (TransactionIdPrecedes(opaque->last_delete_xid, xid))
opaque->last_delete_xid = xid;
MarkBufferDirty(buf);
if (RelationNeedsWAL(rel)) {
xl_ubtree_mark_delete xlrec;
XLogRecPtr recptr;
xlrec.xid = xid;
xlrec.offset = offset;
XLogBeginInsert();
XLogRegisterData((char*)&xlrec, SizeOfUBTreeMarkDelete);
XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
recptr = XLogInsert(RM_UBTREE_ID, XLOG_UBTREE_MARK_DELETE);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
UBTreeVerify(rel, page, BufferGetBlockNumber(buf), offset);
bool needRecordEmpty = (opaque->activeTupleCount == 0);
if (needRecordEmpty) {
* This delete operation deleted the last tuple in the page, This page is likely
* to be empty later, so record this empty hint into a queue.
*/
UBTreeRecordEmptyPage(rel, BufferGetBlockNumber(buf), opaque->last_delete_xid);
_bt_relbuf(rel, buf);
UBTreeTryRecycleEmptyPage(rel);
} else {
_bt_relbuf(rel, buf);
}
}
* UBTreeInsertParent() -- Insert downlink into parent after a page split.
*
* On entry, buf and rbuf are the left and right split pages, which we
* still hold write locks on per the L&Y algorithm. We release the
* write locks once we have write lock on the parent page. (Any sooner,
* and it'd be possible for some other process to try to split or delete
* one of these pages, and get confused because it cannot find the downlink.)
*
* stack - stack showing how we got here. May be NULL in cases that don't
* have to be efficient (concurrent ROOT split, WAL recovery)
* is_root - we split the true root
* is_only - we split a page alone on its level (might have been fast root)
*/
void UBTreeInsertParent(Relation rel, Buffer buf, Buffer rbuf, BTStack stack, bool is_root, bool is_only)
{
* Here we have to do something Lehman and Yao don't talk about: deal with
* a root split and construction of a new root. If our stack is empty
* then we have just split a node on what had been the root level when we
* descended the tree. If it was still the root then we perform a
* new-root construction. If it *wasn't* the root anymore, search to find
* the next higher level that someone constructed meanwhile, and find the
* right place to insert as for the normal case.
*
* If we have to search for the parent level, we do so by re-descending
* from the root. This is not super-efficient, but it's rare enough not
* to matter. (This path is also taken when called from WAL recovery ---
* we have no stack in that case.)
*/
if (is_root) {
Buffer rootbuf;
Assert(stack == NULL);
Assert(is_only);
rootbuf = UBTreeNewRoot(rel, buf, rbuf);
_bt_relbuf(rel, rootbuf);
_bt_relbuf(rel, rbuf);
_bt_relbuf(rel, buf);
} else {
BlockNumber bknum = BufferGetBlockNumber(buf);
BlockNumber rbknum = BufferGetBlockNumber(rbuf);
Page page = BufferGetPage(buf);
IndexTuple new_item;
BTStackData fakestack;
IndexTuple ritem;
Buffer pbuf;
if (stack == NULL) {
UBTPageOpaqueInternal lpageop;
if (!t_thrd.xlog_cxt.InRecovery) {
ereport(DEBUG2, (errmsg("concurrent ROOT page split")));
}
lpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
pbuf = UBTreeGetEndPoint(rel, lpageop->btpo.level + 1, false);
stack = &fakestack;
stack->bts_blkno = BufferGetBlockNumber(pbuf);
stack->bts_offset = InvalidOffsetNumber;
stack->bts_btentry = InvalidBlockNumber;
stack->bts_parent = NULL;
_bt_relbuf(rel, pbuf);
}
ritem = (IndexTuple)PageGetItem(page, PageGetItemId(page, P_HIKEY));
* assure that memory is properly allocated, prevent from missing log of insert parent */
START_CRIT_SECTION();
new_item = CopyIndexTuple(ritem);
UBTreeTupleSetDownLink(new_item, rbknum);
END_CRIT_SECTION();
* Find the parent buffer and get the parent page.
*
* Oops - if we were moved right then we need to change stack item! We
* want to find parent pointing to where we are, right ? - vadim
* 05/27/97
*/
stack->bts_btentry = bknum;
pbuf = UBTreeGetStackBuf(rel, stack);
* by _bt_insertonpg()
*/
_bt_relbuf(rel, rbuf);
if (pbuf == InvalidBuffer) {
XLogWaitFlush(t_thrd.xlog_cxt.LogwrtResult->Write);
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to re-find parent key in index \"%s\" for split pages %u/%u",
RelationGetRelationName(rel), bknum, rbknum)));
}
UBTreeInsertOnPage(rel, NULL, pbuf, buf, stack->bts_parent, new_item, stack->bts_offset + 1, is_only);
pfree(new_item);
}
}
* UBTreeNewRoot() -- Create a new root page for the index.
*
* We've just split the old root page and need to create a new one.
* In order to do this, we add a new root page to the file, then lock
* the metadata page and update it. This is guaranteed to be deadlock-
* free, because all readers release their locks on the metadata page
* before trying to lock the root, and all writers lock the root before
* trying to lock the metadata page. We have a write lock on the old
* root page, so we have not introduced any cycles into the waits-for
* graph.
*
* On entry, lbuf (the old root) and rbuf (its new peer) are write-
* locked. On exit, a new root page exists with entries for the
* two new children, metapage is updated and unlocked/unpinned.
* The new root buffer is returned to caller which has to unlock/unpin
* lbuf, rbuf & rootbuf.
*/
static Buffer UBTreeNewRoot(Relation rel, Buffer lbuf, Buffer rbuf)
{
Buffer rootbuf;
Page lpage, rootpage;
BlockNumber lbkno, rbkno;
BlockNumber rootblknum;
UBTPageOpaqueInternal rootopaque;
UBTPageOpaqueInternal lopaque;
ItemId itemid;
IndexTuple item;
IndexTuple left_item;
Size left_item_sz;
IndexTuple right_item;
Size right_item_sz;
Buffer metabuf;
Page metapg;
BTMetaPageData *metad = NULL;
lbkno = BufferGetBlockNumber(lbuf);
rbkno = BufferGetBlockNumber(rbuf);
lpage = BufferGetPage(lbuf);
lopaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(lpage);
* We have split the old root page and need to create a new one.
* Here we can not have any ERROR before the new root is created,
* and the metapage is update.
*
* If we meet the ERROR here, the old root has been split into two
* pages, and the new root is failed to create. In this situation, the B+
* tree is without the root page, and the metadata is wrong.
*
* 1. root 1 splits to 1 and 2, block 1's root flag has changed.
* 2. when we get in this function and haven't updated metapage's
* rootpointer. ERROR here and we release our LWlock on 1 and 2.
* 3. we continue insert into 1 and block 1 splits again. we use outdated
* metapage to find 'trueroot' . deaklock.
*
* Don't worry about incomplete btree pages as a result of PANIC here,
* because we can complete this incomplete action in btree_xlog_cleanup
* after redo our xlog.
*/
START_CRIT_SECTION();
* get a new root page
* NOTE: after the page is absolutely used, call UBTreeRecordUsedPage()
* before we release the Exclusive lock.
*/
UBTRecycleQueueAddress addr;
rootbuf = UBTreeGetNewPage(rel, &addr);
rootpage = BufferGetPage(rootbuf);
rootblknum = BufferGetBlockNumber(rootbuf);
metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
* Create downlink item for left page (old root). Since this will be the
* first item in a non-leaf page, it implicitly has minus-infinity key
* value, so we need not store any actual key in it.
*/
left_item_sz = sizeof(IndexTupleData);
left_item = (IndexTuple)palloc(left_item_sz);
left_item->t_info = (unsigned short)left_item_sz;
UBTreeTupleSetDownLink(left_item, lbkno);
UBTreeTupleSetNAtts(left_item, 0, false);
* Create downlink item for right page. The key for it is obtained from
* the "high key" position in the left page.
*/
itemid = PageGetItemId(lpage, P_HIKEY);
item = (IndexTuple)PageGetItem(lpage, itemid);
right_item = CopyIndexTuple(item);
UBTreeTupleSetDownLink(right_item, rbkno);
right_item_sz = IndexTupleSize(right_item);
rootopaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(rootpage);
rootopaque->btpo_prev = rootopaque->btpo_next = P_NONE;
rootopaque->btpo_flags = BTP_ROOT;
rootopaque->btpo.level = ((UBTPageOpaqueInternal)PageGetSpecialPointer(lpage))->btpo.level + 1;
rootopaque->btpo_cycleid = 0;
metad->btm_root = rootblknum;
metad->btm_level = rootopaque->btpo.level;
metad->btm_fastroot = rootblknum;
metad->btm_fastlevel = rootopaque->btpo.level;
rel->rd_rootcache = InvalidBuffer;
* Insert the left page pointer into the new root page. The root page is
* the rightmost page on its level so there is no "high key" in it; the
* two items will go into positions P_HIKEY and P_FIRSTKEY.
*
* Note: we *must* insert the two items in item-number order, for the
* benefit of _bt_restore_page().
*/
if (PageAddItem(rootpage, (Item)left_item, left_item_sz, P_HIKEY, false, false) == InvalidOffsetNumber)
ereport(PANIC, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add leftkey to new root page while splitting block %u of index \"%s\"",
BufferGetBlockNumber(lbuf), RelationGetRelationName(rel))));
* insert the right page pointer into the new root page.
*/
if (PageAddItem(rootpage, (Item)right_item, right_item_sz, P_FIRSTKEY, false, false) == InvalidOffsetNumber)
ereport(PANIC, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add rightkey to new root page while splitting block %u of index \"%s\"",
BufferGetBlockNumber(lbuf), RelationGetRelationName(rel))));
Assert(P_INCOMPLETE_SPLIT(lopaque));
lopaque->btpo_flags &= ~BTP_INCOMPLETE_SPLIT;
MarkBufferDirty(lbuf);
MarkBufferDirty(rootbuf);
MarkBufferDirty(metabuf);
if (RelationNeedsWAL(rel)) {
xl_btree_newroot xlrec;
XLogRecPtr recptr;
xl_btree_metadata_old md;
xlrec.rootblk = rootblknum;
xlrec.level = metad->btm_level;
Assert(xlrec.level > 0);
XLogBeginInsert();
XLogRegisterData((char *)&xlrec, SizeOfBtreeNewroot);
XLogRegisterBuffer(0, rootbuf, REGBUF_WILL_INIT);
XLogRegisterBuffer(1, lbuf, REGBUF_STANDARD);
XLogRegisterBuffer(2, metabuf, REGBUF_WILL_INIT | REGBUF_STANDARD);
md.root = rootblknum;
md.level = metad->btm_level;
md.fastroot = rootblknum;
md.fastlevel = metad->btm_level;
XLogRegisterBufData(2, (char *)&md, sizeof(xl_btree_metadata_old));
* Direct access to page is not good but faster - we should implement
* some new func in page API.
*/
XLogRegisterBufData(0, (char *)rootpage + ((PageHeader)rootpage)->pd_upper,
((PageHeader)rootpage)->pd_special - ((PageHeader)rootpage)->pd_upper);
recptr = XLogInsert(RM_UBTREE_ID, XLOG_UBTREE_NEWROOT);
PageSetLSN(lpage, recptr);
PageSetLSN(rootpage, recptr);
PageSetLSN(metapg, recptr);
}
END_CRIT_SECTION();
UBTreeVerify(rel, rootpage, rootblknum);
_bt_relbuf(rel, metabuf);
UBTreeRecordUsedPage(rel, addr);
pfree(left_item);
pfree(right_item);
return rootbuf;
}
* UBTreeFinishSplit() -- Finish an incomplete split
*
* A crash or other failure can leave a split incomplete. The insertion
* routines won't allow to insert on a page that is incompletely split.
* Before inserting on such a page, call _bt_finish_split().
*
* On entry, 'lbuf' must be locked in write-mode. On exit, it is unlocked
* and unpinned.
*/
void UBTreeFinishSplit(Relation rel, Buffer lbuf, BTStack stack)
{
Page lpage = BufferGetPage(lbuf);
UBTPageOpaqueInternal lpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(lpage);
Buffer rbuf;
Page rpage;
UBTPageOpaqueInternal rpageop;
bool wasRoot = false;
Assert(P_INCOMPLETE_SPLIT(lpageop));
rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
rpage = BufferGetPage(rbuf);
rpageop = (UBTPageOpaqueInternal)PageGetSpecialPointer(rpage);
if (!stack) {
Buffer metabuf;
Page metapg;
BTMetaPageData *metad;
metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
wasRoot = (metad->btm_root == BufferGetBlockNumber(lbuf));
_bt_relbuf(rel, metabuf);
} else {
wasRoot = false;
}
bool wasOnly = (P_LEFTMOST(lpageop) && P_RIGHTMOST(rpageop));
elog(DEBUG1, "finishing incomplete split of %u/%u", BufferGetBlockNumber(lbuf), BufferGetBlockNumber(rbuf));
UBTreeInsertParent(rel, lbuf, rbuf, stack, wasRoot, wasOnly);
}
* UBTreeGetStackBuf() -- Walk back up the tree one step, and find the item
* we last looked at in the parent.
*
* This is possible because we save the downlink from the parent item,
* which is enough to uniquely identify it. Insertions into the parent
* level could cause the item to move right; deletions could cause it
* to move left, but not left of the page we previously found it in.
*
* Adjusts bts_blkno & bts_offset if changed.
*
* Returns InvalidBuffer if item not found (should not happen).
*/
Buffer UBTreeGetStackBuf(Relation rel, BTStack stack)
{
BlockNumber blkno;
OffsetNumber start;
blkno = stack->bts_blkno;
start = stack->bts_offset;
t_thrd.storage_cxt.is_btree_split = true;
for (;;) {
Buffer buf;
Page page;
UBTPageOpaqueInternal opaque;
buf = _bt_getbuf(rel, blkno, BT_WRITE);
page = BufferGetPage(buf);
opaque = (UBTPageOpaqueInternal)PageGetSpecialPointer(page);
if (P_INCOMPLETE_SPLIT(opaque)) {
UBTreeFinishSplit(rel, buf, stack->bts_parent);
continue;
}
if (!P_IGNORE(opaque)) {
OffsetNumber offnum, minoff, maxoff;
ItemId itemid;
IndexTuple item;
minoff = P_FIRSTDATAKEY(opaque);
maxoff = PageGetMaxOffsetNumber(page);
* start = InvalidOffsetNumber means "search the whole page". We
* need this test anyway due to possibility that page has a high
* key now when it didn't before.
*/
if (start < minoff)
start = minoff;
* Need this check too, to guard against possibility that page
* split since we visited it originally.
*/
if (start > maxoff)
start = OffsetNumberNext(maxoff);
* These loops will check every item on the page --- but in an
* order that's attuned to the probability of where it actually
* is. Scan to the right first, then to the left.
*/
for (offnum = start; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) {
itemid = PageGetItemId(page, offnum);
item = (IndexTuple)PageGetItem(page, itemid);
bool getRightItem = false;
getRightItem = BTreeInnerTupleGetDownLink(item) == stack->bts_btentry;
if (getRightItem) {
stack->bts_blkno = blkno;
stack->bts_offset = offnum;
t_thrd.storage_cxt.is_btree_split = false;
return buf;
}
}
for (offnum = OffsetNumberPrev(start); offnum >= minoff; offnum = OffsetNumberPrev(offnum)) {
itemid = PageGetItemId(page, offnum);
item = (IndexTuple)PageGetItem(page, itemid);
bool getRightItem = false;
getRightItem = BTreeInnerTupleGetDownLink(item) == stack->bts_btentry;
if (getRightItem) {
stack->bts_blkno = blkno;
stack->bts_offset = offnum;
t_thrd.storage_cxt.is_btree_split = false;
return buf;
}
}
}
* The item we're looking for moved right at least one page.
*/
if (P_RIGHTMOST(opaque)) {
_bt_relbuf(rel, buf);
t_thrd.storage_cxt.is_btree_split = false;
return InvalidBuffer;
}
blkno = opaque->btpo_next;
start = InvalidOffsetNumber;
_bt_relbuf(rel, buf);
}
}