DdengxuyueMisc bugfixes
3d79c591创建于 2021年3月6日历史提交
/* -------------------------------------------------------------------------
 *
 * ginentrypage.cpp
 *	  routines for handling GIN entry tree pages.
 *
 *
 * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/gausskernel/storage/access/gin/ginentrypage.cpp
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"

#include "access/gin_private.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "utils/rel.h"
#include "utils/rel_gs.h"

static void entrySplitPage(GinBtree btree, Buffer origbuf, GinBtreeStack *stack, GinBtreeEntryInsertData *insertData,
                           BlockNumber updateblkno, Page *newlpage, Page *newrpage);

/*
 * Form a tuple for entry tree.
 *
 * If the tuple would be too big to be stored, function throws a suitable
 * error if errorTooBig is TRUE, or returns NULL if errorTooBig is FALSE.
 *
 * See src/backend/access/gin/README for a description of the index tuple
 * format that is being built here.  We build on the assumption that we
 * are making a leaf-level key entry containing a posting list of nipd items.
 * If the caller is actually trying to make a posting-tree entry, non-leaf
 * entry, or pending-list entry, it should pass dataSize = 0 and then overwrite
 * the t_tid fields as necessary.  In any case, 'data' can be NULL to skip
 * filling in the posting list; the caller is responsible for filling it
 * afterwards if data = NULL and nipd > 0.
 */
IndexTuple GinFormTuple(GinState *ginstate, OffsetNumber attnum, Datum key, GinNullCategory category, Pointer data,
                        Size dataSize, int nipd, bool errorTooBig)
{
    Datum datums[2];
    bool isnull[2];
    IndexTuple itup;
    uint32 newsize;
    errno_t ret = EOK;

    /* Build the basic tuple: optional column number, plus key datum */
    if (ginstate->oneCol) {
        datums[0] = key;
        isnull[0] = (category != GIN_CAT_NORM_KEY);
    } else {
        datums[0] = UInt16GetDatum(attnum);
        isnull[0] = false;
        datums[1] = key;
        isnull[1] = (category != GIN_CAT_NORM_KEY);
    }

    itup = index_form_tuple(ginstate->tupdesc[attnum - 1], datums, isnull);

    /*
     * Determine and store offset to the posting list, making sure there is
     * room for the category byte if needed.
     *
     * Note: because index_form_tuple MAXALIGNs the tuple size, there may well
     * be some wasted pad space.  Is it worth recomputing the data length to
     * prevent that?  That would also allow us to Assert that the real data
     * doesn't overlap the GinNullCategory byte, which this code currently
     * takes on faith.
     */
    newsize = IndexTupleSize(itup);

    if (IndexTupleHasNulls(itup)) {
        uint32 minsize;

        Assert(category != GIN_CAT_NORM_KEY);
        minsize = GinCategoryOffset(itup, ginstate) + sizeof(GinNullCategory);
        newsize = Max(newsize, minsize);
    }

    newsize = SHORTALIGN(newsize);

    GinSetPostingOffset(itup, newsize);
    GinSetNPosting(itup, nipd);

    /*
     * Add space needed for posting list, if any.  Then check that the tuple
     * won't be too big to store.
     */
    newsize += dataSize;

    newsize = MAXALIGN(newsize);
    if (newsize > GinMaxItemSize) {
        if (errorTooBig)
            ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                            errmsg("index row size %zu exceeds maximum %zu for index \"%s\"", (Size)newsize,
                                   (Size)GinMaxItemSize, RelationGetRelationName(ginstate->index))));
        pfree(itup);
        itup = NULL;
        return NULL;
    }

    /*
     * Resize tuple if needed
     */
    if (newsize != IndexTupleSize(itup)) {
        itup = (IndexTuple)repalloc(itup, newsize);

        /*
         * PostgreSQL 9.3 and earlier did not clear this new space, so we
         * might find uninitialized padding when reading tuples from disk.
         */
        ret = memset_s((char *)itup + IndexTupleSize(itup), newsize - IndexTupleSize(itup), 0,
                       newsize - IndexTupleSize(itup));
        securec_check(ret, "", "");
        /* set new size in tuple header */
        itup->t_info &= ~INDEX_SIZE_MASK;
        itup->t_info |= newsize;
    }

    /*
     * Copy in the posting list, if provided
     */
    if (data) {
        char *ptr = GinGetPosting(itup);

        ret = memcpy_s(ptr, dataSize, data, dataSize);
        securec_check(ret, "", "");
    }

    /*
     * Insert category byte, if needed
     */
    if (category != GIN_CAT_NORM_KEY) {
        Assert(IndexTupleHasNulls(itup));
        GinSetNullCategory(itup, ginstate, category);
    }
    return itup;
}

/*
 * Read item pointers from leaf entry tuple.
 * Returns a palloc'd array of ItemPointers. The number of items is returned in nitems.
 */
ItemPointer ginReadTuple(GinState *ginstate, OffsetNumber attnum, IndexTuple itup, int *nitems)
{
    Pointer ptr = GinGetPosting(itup);
    int nipd = GinGetNPosting(itup);
    ItemPointer ipd;
    int ndecoded;
    errno_t ret = EOK;

    if (GinItupIsCompressed(itup)) {
        if (nipd > 0) {
            ipd = ginPostingListDecode((GinPostingList *)ptr, &ndecoded);
            if (nipd != ndecoded)
                ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                                errmsg("number of items mismatch in GIN entry tuple, %d in tuple header, %d decoded",
                                       nipd, ndecoded)));
        } else {
            ipd = (ItemPointer)palloc(0);
        }
    } else {
        ipd = (ItemPointer)palloc(sizeof(ItemPointerData) * nipd);
        ret = memcpy_s(ipd, sizeof(ItemPointerData) * nipd, ptr, sizeof(ItemPointerData) * nipd);
        securec_check(ret, "", "");
    }
    *nitems = nipd;
    return ipd;
}

/*
 * Form a non-leaf entry tuple by copying the key data from the given tuple,
 * which can be either a leaf or non-leaf entry tuple.
 *
 * Any posting list in the source tuple is not copied.  The specified child
 * block number is inserted into t_tid.
 */
static IndexTuple GinFormInteriorTuple(IndexTuple itup, Page page, BlockNumber childblk)
{
    IndexTuple nitup;
    errno_t ret = EOK;

    if (GinPageIsLeaf(page) && !GinIsPostingTree(itup)) {
        /* Tuple contains a posting list, just copy stuff before that */
        uint32 origsize = GinGetPostingOffset(itup);

        origsize = MAXALIGN(origsize);
        nitup = (IndexTuple)palloc(origsize);
        ret = memcpy_s(nitup, origsize, itup, origsize);
        securec_check(ret, "", "");
        /* ... be sure to fix the size header field ... */
        nitup->t_info &= ~INDEX_SIZE_MASK;
        nitup->t_info |= origsize;
    } else {
        /* Copy the tuple as-is */
        nitup = (IndexTuple)palloc(IndexTupleSize(itup));
        ret = memcpy_s(nitup, IndexTupleSize(itup), itup, IndexTupleSize(itup));
        securec_check(ret, "", "");
    }

    /* Now insert the correct downlink */
    GinSetDownlink(nitup, childblk);

    return nitup;
}

/*
 * Entry tree is a "static", ie tuple never deletes from it,
 * so we don't use right bound, we use rightmost key instead.
 */
static IndexTuple getRightMostTuple(Page page)
{
    return (IndexTuple)PageGetItem(page, PageGetItemId(page, PageGetMaxOffsetNumber(page)));
}

static bool entryIsMoveRight(GinBtree btree, Page page)
{
    IndexTuple itup;
    OffsetNumber attnum;
    Datum key;
    GinNullCategory category;

    if (GinPageRightMost(page))
        return FALSE;

    itup = getRightMostTuple(page);
    attnum = gintuple_get_attrnum(btree->ginstate, itup);
    key = gintuple_get_key(btree->ginstate, itup, &category);
    if (ginCompareAttEntries(btree->ginstate, btree->entryAttnum, btree->entryKey, btree->entryCategory, attnum, key,
                             category) > 0)
        return TRUE;

    return FALSE;
}

/*
 * Find correct tuple in non-leaf page. It supposed that
 * page correctly chosen and searching value SHOULD be on page
 */
static BlockNumber entryLocateEntry(GinBtree btree, GinBtreeStack *stack)
{
    OffsetNumber low, high, maxoff;
    IndexTuple itup = NULL;
    int result;
    Page page = BufferGetPage(stack->buffer);

    Assert(!GinPageIsLeaf(page));
    Assert(!GinPageIsData(page));

    if (btree->fullScan) {
        stack->off = FirstOffsetNumber;
        stack->predictNumber *= PageGetMaxOffsetNumber(page);
        return btree->getLeftMostChild(btree, page);
    }

    low = FirstOffsetNumber;
    maxoff = high = PageGetMaxOffsetNumber(page);
    Assert(high >= low);

    high++;

    while (high > low) {
        OffsetNumber mid = low + ((high - low) / 2);

        if (mid == maxoff && GinPageRightMost(page)) {
            /* Right infinity */
            result = -1;
        } else {
            OffsetNumber attnum;
            Datum key;
            GinNullCategory category;

            itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, mid));
            attnum = gintuple_get_attrnum(btree->ginstate, itup);
            key = gintuple_get_key(btree->ginstate, itup, &category);
            result = ginCompareAttEntries(btree->ginstate, btree->entryAttnum, btree->entryKey, btree->entryCategory,
                                          attnum, key, category);
        }

        if (result == 0) {
            stack->off = mid;
            Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
            return GinGetDownlink(itup);
        } else if (result > 0)
            low = mid + 1;
        else
            high = mid;
    }

    Assert(high >= FirstOffsetNumber && high <= maxoff);

    stack->off = high;
    itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, high));
    Assert(GinGetDownlink(itup) != GIN_ROOT_BLKNO);
    return GinGetDownlink(itup);
}

/*
 * Searches correct position for value on leaf page.
 * Page should be correctly chosen.
 * Returns true if value found on page.
 */
static bool entryLocateLeafEntry(GinBtree btree, GinBtreeStack *stack)
{
    Page page = BufferGetPage(stack->buffer);
    OffsetNumber low, high;

    Assert(GinPageIsLeaf(page));
    Assert(!GinPageIsData(page));

    if (btree->fullScan) {
        stack->off = FirstOffsetNumber;
        return TRUE;
    }

    low = FirstOffsetNumber;
    high = PageGetMaxOffsetNumber(page);
    if (high < low) {
        stack->off = FirstOffsetNumber;
        return false;
    }

    high++;

    while (high > low) {
        OffsetNumber mid = low + ((high - low) / 2);
        IndexTuple itup;
        OffsetNumber attnum;
        Datum key;
        GinNullCategory category;
        int result;

        itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, mid));
        attnum = gintuple_get_attrnum(btree->ginstate, itup);
        key = gintuple_get_key(btree->ginstate, itup, &category);
        result = ginCompareAttEntries(btree->ginstate, btree->entryAttnum, btree->entryKey, btree->entryCategory,
                                      attnum, key, category);
        if (result == 0) {
            stack->off = mid;
            return true;
        } else if (result > 0)
            low = mid + 1;
        else
            high = mid;
    }

    stack->off = high;
    return false;
}

static OffsetNumber entryFindChildPtr(GinBtree btree, Page page, BlockNumber blkno, OffsetNumber storedOff)
{
    OffsetNumber i = 0;
    OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
    IndexTuple itup;

    Assert(!GinPageIsLeaf(page));
    Assert(!GinPageIsData(page));

    /* if page isn't changed, we returns storedOff */
    if (storedOff >= FirstOffsetNumber && storedOff <= maxoff) {
        itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, storedOff));
        if (GinGetDownlink(itup) == blkno)
            return storedOff;

        /*
         * we hope, that needed pointer goes to right. It's true if there
         * wasn't a deletion
         */
        for (i = storedOff + 1; i <= maxoff; i++) {
            itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, i));
            if (GinGetDownlink(itup) == blkno)
                return i;
        }
        maxoff = storedOff - 1;
    }

    /* last chance */
    for (i = FirstOffsetNumber; i <= maxoff; i++) {
        itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, i));
        if (GinGetDownlink(itup) == blkno)
            return i;
    }

    return InvalidOffsetNumber;
}

static BlockNumber entryGetLeftMostPage(GinBtree btree, Page page)
{
    IndexTuple itup;

    Assert(!GinPageIsLeaf(page));
    Assert(!GinPageIsData(page));
    Assert(PageGetMaxOffsetNumber(page) >= FirstOffsetNumber);

    itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, FirstOffsetNumber));
    return GinGetDownlink(itup);
}

static bool entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off, GinBtreeEntryInsertData *insertData)
{
    Size releasedsz = 0;
    Size addedsz;
    Page page = BufferGetPage(buf);

    Assert(insertData->entry);
    Assert(!GinPageIsData(page));

    if (insertData->isDelete) {
        IndexTuple itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, off));

        releasedsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
    }

    addedsz = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);
    if (PageGetFreeSpace(page) + releasedsz >= addedsz)
        return true;

    return false;
}

/*
 * Delete tuple on leaf page if tuples existed and we
 * should update it, update old child blkno to new right page
 * if child split occurred
 */
static void entryPreparePage(GinBtree btree, Page page, OffsetNumber off, GinBtreeEntryInsertData *insertData,
                             BlockNumber updateblkno)
{
    Assert(insertData->entry);
    Assert(!GinPageIsData(page));

    if (insertData->isDelete) {
        Assert(GinPageIsLeaf(page));
        PageIndexTupleDelete(page, off);
    }

    if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber) {
        IndexTuple itup = (IndexTuple)PageGetItem(page, PageGetItemId(page, off));

        GinSetDownlink(itup, updateblkno);
    }
}

/*
 * Prepare to insert data on an entry page.
 *
 * If it will fit, return GPTP_INSERT after doing whatever setup is needed
 * before we enter the insertion critical section.  *ptp_workspace can be
 * set to pass information along to the execPlaceToPage function.
 *
 * If it won't fit, perform a page split and return two temporary page
 * images into *newlpage and *newrpage, with result GPTP_SPLIT.
 *
 * In neither case should the given page buffer be modified here.
 *
 * NOTE: on insertion to an internal node, in addition to inserting the given
 * item, the downlink of the existing item at stack->off will be updated to
 * point to updateblkno.
 */
static GinPlaceToPageRC entryBeginPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, void *insertPayload,
                                              BlockNumber updateblkno, void **ptp_workspace, Page *newlpage,
                                              Page *newrpage)
{
    GinBtreeEntryInsertData *insertData = (GinBtreeEntryInsertData *)insertPayload;
    OffsetNumber off = stack->off;

    /* If it doesn't fit, deal with split case */
    if (!entryIsEnoughSpace(btree, buf, off, insertData)) {
        entrySplitPage(btree, buf, stack, insertData, updateblkno, newlpage, newrpage);
        return GPTP_SPLIT;
    }

    /* Else, we're ready to proceed with insertion */
    return GPTP_INSERT;
}

/*
 * Perform data insertion after beginPlaceToPage has decided it will fit.
 *
 * This is invoked within a critical section, and XLOG record creation (if
 * needed) is already started.	The target buffer is registered in slot 0.
 */
static void entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack, void *insertPayload,
                                 BlockNumber updateblkno, void *ptp_workspace)
{
    GinBtreeEntryInsertData *insertData = (GinBtreeEntryInsertData *)insertPayload;
    Page page = BufferGetPage(buf);
    OffsetNumber off = stack->off;
    OffsetNumber placed;

    entryPreparePage(btree, page, off, insertData, updateblkno);

    placed = PageAddItem(page, (Item)insertData->entry, IndexTupleSize(insertData->entry), off, false, false);
    if (placed != off) {
        ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                        errmsg("failed to add item to index page in \"%s\"", RelationGetRelationName(btree->index))));
    }

    if (RelationNeedsWAL(btree->index)) {
        t_thrd.index_cxt.ptr_entry->isDelete = insertData->isDelete;
        t_thrd.index_cxt.ptr_entry->offset = off;

        XLogRegisterBufData(0, (char *)t_thrd.index_cxt.ptr_entry, offsetof(ginxlogInsertEntry, tuple));
        XLogRegisterBufData(0, (char *)insertData->entry, IndexTupleSize(insertData->entry));
    }
}

/*
 * Split entry page and insert new data.
 *
 * Returns new temp pages to *newlpage and *newrpage.
 * The original buffer is left untouched.
 */
static void entrySplitPage(GinBtree btree, Buffer origbuf, GinBtreeStack *stack, GinBtreeEntryInsertData *insertData,
                           BlockNumber updateblkno, Page *newlpage, Page *newrpage)
{
    OffsetNumber off = stack->off;
    OffsetNumber i = 0;
    OffsetNumber maxoff = 0;
    OffsetNumber separator = InvalidOffsetNumber;
    Size totalsize = 0;
    Size lsize = 0;
    Size size = 0;
    char *ptr = NULL;
    IndexTuple itup;
    Page page;
    Page lpage = PageGetTempPageCopy(BufferGetPage(origbuf));
    Page rpage = PageGetTempPageCopy(BufferGetPage(origbuf));
    Size pageSize = PageGetPageSize(lpage);
    char tupstore[2 * BLCKSZ];
    errno_t ret = EOK;

    entryPreparePage(btree, lpage, off, insertData, updateblkno);

    /*
     * First, append all the existing tuples and the new tuple we're inserting
     * one after another in a temporary workspace.
     */
    maxoff = PageGetMaxOffsetNumber(lpage);
    ptr = tupstore;
    for (i = FirstOffsetNumber; i <= maxoff; i++) {
        if (i == off) {
            size = MAXALIGN(IndexTupleSize(insertData->entry));
            ret = memcpy_s(ptr, size, insertData->entry, size);
            securec_check(ret, "", "");
            ptr += size;
            totalsize += size + sizeof(ItemIdData);
        }

        itup = (IndexTuple)PageGetItem(lpage, PageGetItemId(lpage, i));
        size = MAXALIGN(IndexTupleSize(itup));
        ret = memcpy_s(ptr, size, itup, size);
        securec_check(ret, "", "");
        ptr += size;
        totalsize += size + sizeof(ItemIdData);
    }

    if (off == maxoff + 1) {
        size = MAXALIGN(IndexTupleSize(insertData->entry));
        ret = memcpy_s(ptr, size, insertData->entry, size);
        securec_check(ret, "", "");
        ptr += size;
        totalsize += size + sizeof(ItemIdData);
    }

    /*
     * Initialize the left and right pages, and copy all the tuples back to
     * them.
     */
    GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
    GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);

    ptr = tupstore;
    maxoff++;
    lsize = 0;

    page = lpage;
    for (i = FirstOffsetNumber; i <= maxoff; i++) {
        itup = (IndexTuple)ptr;

        /*
         * Decide where to split.  We try to equalize the pages' total data
         * size, not number of tuples.
         */
        if (lsize > totalsize / 2) {
            if (separator == InvalidOffsetNumber)
                separator = i - 1;
            page = rpage;
        } else {
            lsize += MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
        }

        if (PageAddItem(page, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) ==
            InvalidOffsetNumber)
            ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add item to index page in \"%s\"",
                                                                     RelationGetRelationName(btree->index))));
        ptr += MAXALIGN(IndexTupleSize(itup));
    }

    /* return temp pages to caller */
    *newlpage = lpage;
    *newrpage = rpage;
}

/*
 * Construct insertion payload for inserting the downlink for given buffer.
 */
static void *entryPrepareDownlink(GinBtree btree, Buffer lbuf)
{
    GinBtreeEntryInsertData *insertData = NULL;
    Page lpage = BufferGetPage(lbuf);
    BlockNumber lblkno = BufferGetBlockNumber(lbuf);
    IndexTuple itup;

    itup = getRightMostTuple(lpage);

    insertData = (GinBtreeEntryInsertData *)palloc(sizeof(GinBtreeEntryInsertData));
    insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
    insertData->isDelete = false;

    return insertData;
}

/*
 * Fills new root by rightest values from child.
 * Also called from ginxlog, should not use btree
 */
void ginEntryFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
{
    IndexTuple itup;

    itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
    if (PageAddItem(root, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
        ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add item to index root page")));
    pfree(itup);
    itup = NULL;
    itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
    if (PageAddItem(root, (Item)itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
        ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add item to index root page")));
    pfree(itup);
    itup = NULL;
}

/*
 * Set up GinBtree for entry page access
 *
 * Note: during WAL recovery, there may be no valid data in ginstate
 * other than a faked-up Relation pointer; the key datum is bogus too.
 */
void ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum, Datum key, GinNullCategory category, GinState *ginstate)
{
    errno_t rc = EOK;

    rc = memset_s(btree, sizeof(GinBtreeData), 0, sizeof(GinBtreeData));
    securec_check(rc, "\0", "\0");

    btree->index = ginstate->index;
    btree->rootBlkno = GIN_ROOT_BLKNO;
    btree->ginstate = ginstate;

    btree->findChildPage = entryLocateEntry;
    btree->getLeftMostChild = entryGetLeftMostPage;
    btree->isMoveRight = entryIsMoveRight;
    btree->findItem = entryLocateLeafEntry;
    btree->findChildPtr = entryFindChildPtr;
    btree->beginPlaceToPage = entryBeginPlaceToPage;
    btree->execPlaceToPage = entryExecPlaceToPage;
    btree->fillRoot = ginEntryFillRoot;
    btree->prepareDownlink = entryPrepareDownlink;

    btree->isData = FALSE;
    btree->fullScan = FALSE;
    btree->isBuild = FALSE;

    btree->entryAttnum = attnum;
    btree->entryKey = key;
    btree->entryCategory = category;
}