/* -------------------------------------------------------------------------
 *
 * ginentrypage.cpp
 *	  routines for handling GIN entry tree pages.
 *
 *
 * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/gausskernel/storage/access/gin/ginentrypage.cpp
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/gin_private.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"
/*
 * Forward declaration: entrySplitPage() is defined further down in this file
 * but is called from entryBeginPlaceToPage(), which appears before it.
 */
static void entrySplitPage(GinBtree btree, Buffer origbuf, GinBtreeStack *stack, GinBtreeEntryInsertData *insertData,
BlockNumber updateblkno, Page *newlpage, Page *newrpage);
* Form a tuple for entry tree.
*
* If the tuple would be too big to be stored, function throws a suitable
* error if errorTooBig is TRUE, or returns NULL if errorTooBig is FALSE.
*
* See src/backend/access/gin/README for a description of the index tuple
* format that is being built here. We build on the assumption that we
* are making a leaf-level key entry containing a posting list of nipd items.
* If the caller is actually trying to make a posting-tree entry, non-leaf
* entry, or pending-list entry, it should pass dataSize = 0 and then overwrite
* the t_tid fields as necessary. In any case, 'data' can be NULL to skip
* filling in the posting list; the caller is responsible for filling it
* afterwards if data = NULL and nipd > 0.
*/
IndexTuple GinFormTuple(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, Pointer data,
Size dataSize, int nipd, bool errorTooBig)
{
Datum datums[2];
bool isnull[2];
IndexTuple itup;
uint32 newsize;
errno_t ret = EOK;
if (ginstate->oneCol) {
datums[0] = key;
isnull[0] = (category != GIN_CAT_NORM_KEY);
} else {
datums[0] = UInt16GetDatum(attnum);
isnull[0] = false;
datums[1] = key;
isnull[1] = (category != GIN_CAT_NORM_KEY);
}
itup = index_form_tuple(ginstate->tupdesc[attnum - 1], datums, isnull);
* Determine and store offset to the posting list, making sure there is
* room for the category byte if needed.
*
* Note: because index_form_tuple MAXALIGNs the tuple size, there may well
* be some wasted pad space. Is it worth recomputing the data length to
* prevent that? That would also allow us to Assert that the real data
* doesn't overlap the GinNullCategory byte, which this code currently
* takes on faith.
*/
newsize = IndexTupleSize(itup);
if (IndexTupleHasNulls(itup)) {
uint32 minsize;
Assert(category != GIN_CAT_NORM_KEY);
minsize = GinCategoryOffset(itup, ginstate) + sizeof(GinNullCategory);
newsize = Max(newsize, minsize);
}
newsize = SHORTALIGN(newsize);
GinSetPostingOffset(itup, newsize);
GinSetNPosting(itup, nipd);
* Add space needed for posting list, if any. Then check that the tuple
* won't be too big to store.
*/
newsize += dataSize;
newsize = MAXALIGN(newsize);
if (newsize > GinMaxItemSize) {
if (errorTooBig)
ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", (Size)newsize,
(Size)GinMaxItemSize, RelationGetRelationName(ginstate->index))));
pfree(itup);
itup = NULL;
return NULL;
}
* Resize tuple if needed
*/
if (newsize != IndexTupleSize(itup)) {
itup = (IndexTuple)repalloc(itup, newsize);
* PostgreSQL 9.3 and earlier did not clear this new space, so we
* might find uninitialized padding when reading tuples from disk.
*/
ret = memset_s((char *)itup + IndexTupleSize(itup), newsize - IndexTupleSize(itup), 0,
newsize - IndexTupleSize(itup));
securec_check(ret, "", "");
itup->t_info &= ~INDEX_SIZE_MASK;
itup->t_info |= newsize;
}
* Copy in the posting list, if provided
*/
if (data) {
char *ptr = GinGetPosting(itup);
ret = memcpy_s(ptr, dataSize, data, dataSize);
securec_check(ret, "", "");
}
* Insert category byte, if needed
*/
if (category != GIN_CAT_NORM_KEY) {
Assert(IndexTupleHasNulls(itup));
GinSetNullCategory(itup, ginstate, category);
}
return itup;
}
* Read item pointers from leaf entry tuple.
* Returns a palloc'd array of ItemPointers. The number of items is returned in nitems.
*/
ItemPointer ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup, int *nitems)
{
Pointer ptr = GinGetPosting(itup);
int nipd = GinGetNPosting(itup);
ItemPointer ipd;
int ndecoded;
errno_t ret = EOK;
if (GinItupIsCompressed(itup)) {
if (nipd > 0) {
ipd = ginPostingListDecode((GinPostingList *)ptr, &ndecoded);
if (nipd != ndecoded)
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded",
nipd, ndecoded)));
} else {
ipd = (ItemPointer)palloc(0);
}
} else {
ipd = (ItemPointer)palloc(sizeof(ItemPointerData) * nipd);
ret = memcpy_s(ipd, sizeof(ItemPointerData) * nipd, ptr, sizeof(ItemPointerData) * nipd);
securec_check(ret, "", "");
}
*nitems = nipd;
return ipd;
}
* Form a non-leaf entry tuple by copying the key data from the given tuple,
* which can be either a leaf or non-leaf entry tuple.
*
* Any posting list in the source tuple is not copied. The specified child
* block number is inserted into t_tid.
*/
static IndexTuple GinFormInteriorTuple(IndexTuple itup, Page page, BlockNumber childblk)
{
IndexTuple nitup;
errno_t ret = EOK;
if (GinPageIsLeaf(page) && !GinIsPostingTree(itup)) {
uint32 origsize = GinGetPostingOffset(itup);
origsize = MAXALIGN(origsize);
nitup = (IndexTuple)palloc(origsize);
ret = memcpy_s(nitup, origsize, itup, origsize);
securec_check(ret, "", "");
nitup->t_info &= ~INDEX_SIZE_MASK;
nitup->t_info |= origsize;
} else {
nitup = (IndexTuple)palloc(IndexTupleSize(itup));
ret = memcpy_s(nitup, IndexTupleSize(itup), itup, IndexTupleSize(itup));
securec_check(ret, "", "");
}
GinSetDownlink(nitup, childblk);
return nitup;
}
* Entry tree is a "static", ie tuple never deletes from it,
* so we don't use right bound, we use rightmost key instead.
*/
static IndexTuple getRightMostTuple(Page page)
{
return (IndexTuple)PageGetItem(page, PageGetItemId(page, PageGetMaxOffsetNumber(page)));
}
/*
 * Decide whether the search must continue on the right sibling.
 *
 * Returns TRUE when the page is not the rightmost one and the search key
 * (btree->entryAttnum / entryKey / entryCategory) compares greater than the
 * page's rightmost tuple; FALSE otherwise.
 */
static bool entryIsMoveRight(GinBtree btree, Page page)
{
    IndexTuple lasttup;
    OffsetNumber lastattnum;
    Datum lastkey;
    GinNullCategory lastcategory;
    int cmp;

    /* There is nothing to the right of the rightmost page. */
    if (GinPageRightMost(page))
        return FALSE;

    lasttup = getRightMostTuple(page);
    lastattnum = gintuple_get_attrnum(btree->ginstate, lasttup);
    lastkey = gintuple_get_key(btree->ginstate, lasttup, &lastcategory);

    cmp = ginCompareAttEntries(btree->ginstate, btree->entryAttnum, btree->entryKey, btree->entryCategory,
                               lastattnum, lastkey, lastcategory);

    return (cmp > 0) ? TRUE : FALSE;
}
* Find correct tuple in non-leaf page. It supposed that
* page correctly chosen and searching value SHOULD be on page
*/
static BlockNumber entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
{
OffsetNumber low, high, maxoff;
IndexTuple itup = NULL;
int result;
Page page = BufferGetPage(stack->buffer);
Assert(!GinPageIsLeaf(page));
Assert(!GinPageIsData(page));
if (btree->fullScan) {
stack->off = FirstOffsetNumber;
stack->predictNumber *= PageGetMaxOffsetNumber(page);
return btree->getLeftMostChild(btree, page);
}
low = FirstOffsetNumber;
maxoff = high = PageGetMaxOffsetNumber(page);
Assert(high >= low);
high++;
while (high > low) {
OffsetNumber mid = low + ((high - low) / 2);
if (mid == maxoff && GinPageRightMost(page)) {
result = -1;
} else {
OffsetNumber attnum;
Datum key;
GinNullCategory category;
itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, mid));
attnum = gintuple_get_attrnum(btree->ginstate, itup);
key = gintuple_get_key(btree->ginstate, itup, &category);
result = ginCompareAttEntries(btree->ginstate, btree->entryAttnum, btree->entryKey, btree->entryCategory,
attnum, key, category);
}
if (result == 0) {
stack->off = mid;
Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
return GinGetDownlink(itup);
} else if (result > 0)
low = mid + 1;
else
high = mid;
}
Assert(high >= FirstOffsetNumber && high <= maxoff);
stack->off = high;
itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, high));
Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
return GinGetDownlink(itup);
}
* Searches correct position for value on leaf page.
* Page should be correctly chosen.
* Returns true if value found on page.
*/
static bool entryLocateLeafEntry(GinBtree btree, GinBtreeStack *stack)
{
Page page = BufferGetPage(stack->buffer);
OffsetNumber low, high;
Assert(GinPageIsLeaf(page));
Assert(!GinPageIsData(page));
if (btree->fullScan) {
stack->off = FirstOffsetNumber;
return TRUE;
}
low = FirstOffsetNumber;
high = PageGetMaxOffsetNumber(page);
if (high < low) {
stack->off = FirstOffsetNumber;
return false;
}
high++;
while (high > low) {
OffsetNumber mid = low + ((high - low) / 2);
IndexTuple itup;
OffsetNumber attnum;
Datum key;
GinNullCategory category;
int result;
itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, mid));
attnum = gintuple_get_attrnum(btree->ginstate, itup);
key = gintuple_get_key(btree->ginstate, itup, &category);
result = ginCompareAttEntries(btree->ginstate, btree->entryAttnum, btree->entryKey, btree->entryCategory,
attnum, key, category);
if (result == 0) {
stack->off = mid;
return true;
} else if (result > 0)
low = mid + 1;
else
high = mid;
}
stack->off = high;
return false;
}
/*
 * Find the offset of the tuple on a non-leaf page whose downlink is blkno.
 *
 * storedOff is a hint: if it is a valid offset on the page, the search starts
 * there and runs to the right first, and only then checks the tuples to the
 * left of the hint.  Without a usable hint the whole page is scanned from the
 * first offset.
 *
 * Returns InvalidOffsetNumber if no tuple on the page points to blkno.
 */
static OffsetNumber entryFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
{
    OffsetNumber lastoff = PageGetMaxOffsetNumber(page);
    OffsetNumber leftLimit = lastoff;
    OffsetNumber pos;
    IndexTuple candidate;

    Assert(!GinPageIsLeaf(page));
    Assert(!GinPageIsData(page));

    if (storedOff >= FirstOffsetNumber && storedOff <= lastoff) {
        /*
         * Try the remembered position, then everything to its right.  We hope
         * the wanted pointer has only moved rightwards, which is true if
         * there wasn't a deletion.
         */
        for (pos = storedOff; pos <= lastoff; pos++) {
            candidate = (IndexTuple)PageGetItem(page, PageGetItemId(page, pos));
            if (GinGetDownlink(candidate) == blkno)
                return pos;
        }

        /* Only the part left of the hint remains to be searched. */
        leftLimit = storedOff - 1;
    }

    for (pos = FirstOffsetNumber; pos <= leftLimit; pos++) {
        candidate = (IndexTuple)PageGetItem(page, PageGetItemId(page, pos));
        if (GinGetDownlink(candidate) == blkno)
            return pos;
    }

    return InvalidOffsetNumber;
}
/*
 * Return the child block referenced by the first tuple of a non-leaf
 * entry page.  The page must contain at least one tuple.
 */
static BlockNumber entryGetLeftMostPage(GinBtree btree, Page page)
{
    IndexTuple firsttup;

    Assert(!GinPageIsLeaf(page));
    Assert(!GinPageIsData(page));
    Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);

    firsttup = (IndexTuple)PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));

    return GinGetDownlink(firsttup);
}
/*
 * Check whether insertData->entry fits on the page held in buf.
 *
 * When the insertion replaces an existing tuple (insertData->isDelete), the
 * space occupied by the tuple at offset 'off', including its line pointer,
 * is counted as available too.
 */
static bool entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off, GinBtreeEntryInsertData *insertData)
{
    Page page = BufferGetPage(buf);
    Size reclaimable = 0;
    Size required;

    Assert(insertData->entry);
    Assert(!GinPageIsData(page));

    if (insertData->isDelete) {
        IndexTuple oldtup = (IndexTuple)PageGetItem(page, PageGetItemId(page, off));

        reclaimable = MAXALIGN(IndexTupleSize(oldtup)) + sizeof(ItemIdData);
    }

    /* A new tuple costs its aligned size plus one line pointer. */
    required = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);

    return (PageGetFreeSpace(page) + reclaimable >= required) ? true : false;
}
* Delete tuple on leaf page if tuples existed and we
* should update it, update old child blkno to new right page
* if child split occurred
*/
static void entryPreparePage(GinBtree btree, Page page, OffsetNumber off, GinBtreeEntryInsertData *insertData,
BlockNumber updateblkno)
{
Assert(insertData->entry);
Assert(!GinPageIsData(page));
if (insertData->isDelete) {
Assert(GinPageIsLeaf(page));
PageIndexTupleDelete(page, off);
}
if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber) {
IndexTuple itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, off));
GinSetDownlink(itup, updateblkno);
}
}
* Prepare to insert data on an entry page.
*
* If it will fit, return GPTP_INSERT after doing whatever setup is needed
* before we enter the insertion critical section. *ptp_workspace can be
* set to pass information along to the execPlaceToPage function.
*
* If it won't fit, perform a page split and return two temporary page
* images into *newlpage and *newrpage, with result GPTP_SPLIT.
*
* In neither case should the given page buffer be modified here.
*
* NOTE: on insertion to an internal node, in addition to inserting the given
* item, the downlink of the existing item at stack->off will be updated to
* point to updateblkno.
*/
static GinPlaceToPageRC entryBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, void *insertPayload,
BlockNumber updateblkno, void **ptp_workspace, Page *newlpage,
Page *newrpage)
{
GinBtreeEntryInsertData *insertData = (GinBtreeEntryInsertData *)insertPayload;
OffsetNumber off = stack->off;
if (!entryIsEnoughSpace(btree, buf, off, insertData)) {
entrySplitPage(btree, buf, stack, insertData, updateblkno, newlpage, newrpage);
return GPTP_SPLIT;
}
return GPTP_INSERT;
}
* Perform data insertion after beginPlaceToPage has decided it will fit.
*
* This is invoked within a critical section, and XLOG record creation (if
* needed) is already started. The target buffer is registered in slot 0.
*/
static void entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, void *insertPayload,
BlockNumber updateblkno, void *ptp_workspace)
{
GinBtreeEntryInsertData *insertData = (GinBtreeEntryInsertData *)insertPayload;
Page page = BufferGetPage(buf);
OffsetNumber off = stack->off;
OffsetNumber placed;
entryPreparePage(btree, page, off, insertData, updateblkno);
placed = PageAddItem(page, (Item)insertData->entry, IndexTupleSize(insertData->entry), off, false, false);
if (placed != off) {
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add item to index page in \"%s\"", RelationGetRelationName(btree->index))));
}
if (RelationNeedsWAL(btree->index)) {
t_thrd.index_cxt.ptr_entry->isDelete = insertData->isDelete;
t_thrd.index_cxt.ptr_entry->offset = off;
XLogRegisterBufData(0, (char *)t_thrd.index_cxt.ptr_entry, offsetof(ginxlogInsertEntry, tuple));
XLogRegisterBufData(0, (char *)insertData->entry, IndexTupleSize(insertData->entry));
}
}
* Split entry page and insert new data.
*
* Returns new temp pages to *newlpage and *newrpage.
* The original buffer is left untouched.
*/
static void entrySplitPage(GinBtree btree, Buffer origbuf, GinBtreeStack *stack, GinBtreeEntryInsertData *insertData,
BlockNumber updateblkno, Page *newlpage, Page *newrpage)
{
OffsetNumber off = stack->off;
OffsetNumber i = 0;
OffsetNumber maxoff = 0;
OffsetNumber separator = InvalidOffsetNumber;
Size totalsize = 0;
Size lsize = 0;
Size size = 0;
char *ptr = NULL;
IndexTuple itup;
Page page;
Page lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
Page rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
Size pageSize = PageGetPageSize(lpage);
char tupstore[2 * BLCKSZ];
errno_t ret = EOK;
entryPreparePage(btree, lpage, off, insertData, updateblkno);
* First, append all the existing tuples and the new tuple we're inserting
* one after another in a temporary workspace.
*/
maxoff = PageGetMaxOffsetNumber(lpage);
ptr = tupstore;
for (i = FirstOffsetNumber; i <= maxoff; i++) {
if (i == off) {
size = MAXALIGN(IndexTupleSize(insertData->entry));
ret = memcpy_s(ptr, size, insertData->entry, size);
securec_check(ret, "", "");
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
itup = (IndexTuple)PageGetItem(lpage, PageGetItemId(lpage, i));
size = MAXALIGN(IndexTupleSize(itup));
ret = memcpy_s(ptr, size, itup, size);
securec_check(ret, "", "");
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
if (off == maxoff + 1) {
size = MAXALIGN(IndexTupleSize(insertData->entry));
ret = memcpy_s(ptr, size, insertData->entry, size);
securec_check(ret, "", "");
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
* Initialize the left and right pages, and copy all the tuples back to
* them.
*/
GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
ptr = tupstore;
maxoff++;
lsize = 0;
page = lpage;
for (i = FirstOffsetNumber; i <= maxoff; i++) {
itup = (IndexTuple)ptr;
* Decide where to split. We try to equalize the pages' total data
* size, not number of tuples.
*/
if (lsize > totalsize / 2) {
if (separator == InvalidOffsetNumber)
separator = i - 1;
page = rpage;
} else {
lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
}
if (PageAddItem(page, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) ==
InvalidOffsetNumber)
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index))));
ptr += MAXALIGN(IndexTupleSize(itup));
}
*newlpage = lpage;
*newrpage = rpage;
}
* Construct insertion payload for inserting the downlink for given buffer.
*/
static void *entryPrepareDownlink(GinBtree btree, Buffer lbuf)
{
GinBtreeEntryInsertData *insertData = NULL;
Page lpage = BufferGetPage(lbuf);
BlockNumber lblkno = BufferGetBlockNumber(lbuf);
IndexTuple itup;
itup = getRightMostTuple(lpage);
insertData = (GinBtreeEntryInsertData *)palloc(sizeof(GinBtreeEntryInsertData));
insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
insertData->isDelete = false;
return insertData;
}
* Fills new root by rightest values from child.
* Also called from ginxlog, should not use btree
*/
void ginEntryFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
{
IndexTuple itup;
itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
if (PageAddItem(root, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add item to index root page")));
pfree(itup);
itup = NULL;
itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
if (PageAddItem(root, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add item to index root page")));
pfree(itup);
itup = NULL;
}
* Set up GinBtree for entry page access
*
* Note: during WAL recovery, there may be no valid data in ginstate
* other than a faked-up Relation pointer; the key datum is bogus too.
*/
void ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum, Datum key, GinNullCategory category, GinState *ginstate)
{
errno_t rc = EOK;
rc = memset_s(btree, sizeof(GinBtreeData), 0, sizeof(GinBtreeData));
securec_check(rc, "\0", "\0");
btree->index = ginstate->index;
btree->rootBlkno = GIN_ROOT_BLKNO;
btree->ginstate = ginstate;
btree->findChildPage = entryLocateEntry;
btree->getLeftMostChild = entryGetLeftMostPage;
btree->isMoveRight = entryIsMoveRight;
btree->findItem = entryLocateLeafEntry;
btree->findChildPtr = entryFindChildPtr;
btree->beginPlaceToPage = entryBeginPlaceToPage;
btree->execPlaceToPage = entryExecPlaceToPage;
btree->fillRoot = ginEntryFillRoot;
btree->prepareDownlink = entryPrepareDownlink;
btree->isData = FALSE;
btree->fullScan = FALSE;
btree->isBuild = FALSE;
btree->entryAttnum = attnum;
btree->entryKey = key;
btree->entryCategory = category;
}