/*
* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* spgxlog.cpp
* parse spgist xlog
*
* IDENTIFICATION
* src/gausskernel/storage/access/redo/spgxlog.cpp
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/spgist_private.h"
#include "utils/rel_gs.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "access/xlogproc.h"
#include "storage/standby.h"
#include "utils/memutils.h"
#include "access/multi_redo_api.h"
/* Block ids handed to XLogRecSetBlockDataState when parsing an XLOG_SPGIST_CREATE_INDEX record. */
typedef enum {
    SPG_CREATE_INDEX_META_BLOCK_NUM = 0,
    SPG_CREATE_INDEX_ROOT_BLOCK_NUM = 1,
    SPG_CREATE_INDEX_LEAF_BLOCK_NUM = 2
} XLogSpgCreateIndexBlockEnum;
/* Block ids of an XLOG_SPGIST_ADD_LEAF record: the leaf page, then the (optional) parent page. */
typedef enum {
    SPG_ADD_LEAF_LEAF_BLOCK_NUM = 0,
    SPG_ADD_LEAF_PARENT_BLOCK_NUM = 1
} XLogSpgAddLeafBlockEnum;
/* Block ids of an XLOG_SPGIST_MOVE_LEAFS record: destination, source and parent page. */
typedef enum {
    SPG_MOVE_LEAF_DST_BLOCK_NUM = 0,
    SPG_MOVE_LEAF_SRC_BLOCK_NUM = 1,
    SPG_MOVE_LEAF_PARENT_BLOCK_NUM = 2
} XLogSpgMoveLeafBlockEnum;
/* Block ids of an XLOG_SPGIST_ADD_NODE record: old tuple page, new tuple page, parent page. */
typedef enum {
    SPG_ADD_NODE_OLD_BLOCK_NUM = 0,
    SPG_ADD_NODE_NEW_BLOCK_NUM = 1,
    SPG_ADD_NODE_PARENT_BLOCK_NUM = 2
} XLogSpgAddNodeBlockEnum;
/* Block ids of an XLOG_SPGIST_SPLIT_TUPLE record: the old page and the (optional) new page. */
typedef enum {
    SPG_SPLIT_TUPLE_OLD_BLOCK_NUM = 0,
    SPG_SPLIT_TUPLE_NEW_BLOCK_NUM = 1
} XLogSpgSplitTupleBlockEnum;
/* Block ids of an XLOG_SPGIST_PICKSPLIT record: source, destination, inner and parent page. */
typedef enum {
    SPG_PICK_SPLIT_SRC_BLOCK_NUM = 0,
    SPG_PICK_SPLIT_DST_BLOCK_NUM = 1,
    SPG_PICK_SPLIT_INNER_BLOCK_NUM = 2,
    SPG_PICK_SPLIT_PARENT_BLOCK_NUM = 3
} XLogSpgPickSplitBlockEnum;
/* Block id of the single page referenced by an XLOG_SPGIST_VACUUM_LEAF record. */
typedef enum {
    SPG_PICK_VACUUM_LEAF_BLOCK_NUM = 0
} XLogSpgVacuumLeafBlockEnum;
/* Block id of the single page referenced by an XLOG_SPGIST_VACUUM_ROOT record. */
typedef enum {
    SPG_PICK_VACUUM_ROOT_BLOCK_NUM = 0
} XLogSpgVacuumRootBlockEnum;
/* Block id of the single page referenced by an XLOG_SPGIST_VACUUM_REDIRECT record. */
typedef enum {
    SPG_PICK_VACUUM_REDIRECT_BLOCK_NUM = 0
} XLogSpgVacuumRedirectBlockEnum;
/*
 * Redo of index creation, metapage part.
 *
 * Re-initializes the page held in 'buffer' with SpGistInitMetapage and sets
 * its LSN to buffer->lsn. The block is expected to be SPGIST_METAPAGE_BLKNO.
 */
void spgRedoCreateIndexOperatorMetaPage(RedoBufferInfo *buffer)
{
    Page metaPage;

    Assert(buffer->blockinfo.blkno == SPGIST_METAPAGE_BLKNO);

    metaPage = buffer->pageinfo.page;
    SpGistInitMetapage(metaPage);
    PageSetLSN(metaPage, buffer->lsn);
}
/*
 * Redo of index creation, root-page part.
 *
 * Re-initializes the page at SPGIST_ROOT_BLKNO as an empty SPGIST_LEAF page
 * and sets its LSN to buffer->lsn.
 */
void spgRedoCreateIndexOperatorRootPage(RedoBufferInfo *buffer)
{
    Page rootPage;

    Assert(buffer->blockinfo.blkno == SPGIST_ROOT_BLKNO);
    Assert(buffer->pageinfo.pagesize == BLCKSZ);

    rootPage = buffer->pageinfo.page;
    SpGistInitPage(rootPage, SPGIST_LEAF);
    PageSetLSN(rootPage, buffer->lsn);
}
/*
 * Redo of index creation, nulls-page part.
 *
 * Re-initializes the page at SPGIST_NULL_BLKNO as an empty leaf page flagged
 * SPGIST_NULLS and sets its LSN to buffer->lsn.
 */
void spgRedoCreateIndexOperatorLeafPage(RedoBufferInfo *buffer)
{
    Page nullsPage;

    Assert(buffer->blockinfo.blkno == SPGIST_NULL_BLKNO);
    Assert(buffer->pageinfo.pagesize == BLCKSZ);

    nullsPage = buffer->pageinfo.page;
    SpGistInitPage(nullsPage, SPGIST_LEAF | SPGIST_NULLS);
    PageSetLSN(nullsPage, buffer->lsn);
}
/*
 * Redo the leaf-page part of an add-leaf record.
 *
 * 'recorddata' points at a spgxlogAddLeaf header immediately followed by the
 * leaf tuple image. The page is optionally re-initialized, the tuple is
 * placed at offnumLeaf, and the page LSN is set to bufferinfo->lsn.
 */
void spgRedoAddLeafOperatorPage(RedoBufferInfo *bufferinfo, void *recorddata)
{
    spgxlogAddLeaf *rec = (spgxlogAddLeaf *)recorddata;
    SpGistLeafTuple newTuple = (SpGistLeafTuple)((char *)recorddata + sizeof(spgxlogAddLeaf));
    Page leafPage = bufferinfo->pageinfo.page;

    if (rec->newPage) {
        Assert(bufferinfo->pageinfo.pagesize == BLCKSZ);
        SpGistInitPage(leafPage, SPGIST_LEAF | (rec->storesNulls ? SPGIST_NULLS : 0));
    }

    if (rec->offnumLeaf == rec->offnumHeadLeaf) {
        /* The new tuple takes over the head slot: drop the old item, re-add at the same offset. */
        PageIndexTupleDelete(leafPage, rec->offnumLeaf);
        if (PageAddItem(leafPage, (Item)newTuple, newTuple->size, rec->offnumLeaf, false, false) !=
            rec->offnumLeaf) {
            ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                            errmsg("failed to add item of size %u to SPGiST index page", newTuple->size)));
        }
    } else {
        addOrReplaceTuple(leafPage, (Item)newTuple, newTuple->size, rec->offnumLeaf);

        /* When a chain head exists, link it to the tuple just added. */
        if (rec->offnumHeadLeaf != InvalidOffsetNumber) {
            ItemId headId = PageGetItemId(leafPage, rec->offnumHeadLeaf);
            SpGistLeafTuple headTuple = (SpGistLeafTuple)PageGetItem(leafPage, headId);

            Assert(headTuple->nextOffset == newTuple->nextOffset);
            headTuple->nextOffset = rec->offnumLeaf;
        }
    }

    PageSetLSN(leafPage, bufferinfo->lsn);
}
/*
 * Redo the parent-page part of an add-leaf record.
 *
 * Fetches the inner tuple at offnumParent and calls spgUpdateNodeLink for
 * node nodeI with the target (blknoLeaf, offnumLeaf), then sets the page LSN.
 */
void spgRedoAddLeafOperatorParent(RedoBufferInfo *bufferinfo, void *recorddata, BlockNumber blknoLeaf)
{
    spgxlogAddLeaf *rec = (spgxlogAddLeaf *)recorddata;
    Page parentPage = bufferinfo->pageinfo.page;
    ItemId parentId = PageGetItemId(parentPage, rec->offnumParent);
    SpGistInnerTuple parentTuple = (SpGistInnerTuple)PageGetItem(parentPage, parentId);

    spgUpdateNodeLink(parentTuple, rec->nodeI, blknoLeaf, rec->offnumLeaf);
    PageSetLSN(parentPage, bufferinfo->lsn);
}
/*
 * Redo the destination-page part of a move-leafs record.
 *
 * recorddata - the spgxlogMoveLeafs header
 * insertdata - array of target offsets, one per tuple to insert
 * tupledata  - the leaf tuple images, packed back to back (each advanced by
 *              its own 'size' field)
 *
 * Inserts 1 tuple when replaceDead is set, otherwise nMoves + 1 tuples, then
 * sets the page LSN to buffer->lsn.
 */
void spgRedoMoveLeafsOpratorDstPage(RedoBufferInfo *buffer, void *recorddata, void *insertdata, void *tupledata)
{
    spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *)recorddata;
    OffsetNumber *toInsert = (OffsetNumber *)insertdata;
    int nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;

    /*
     * In normal operation we would have all three pages (source, dest, and
     * parent) locked simultaneously; but in WAL replay it should be safe to
     * update them one at a time, as long as we do it in the right order.
     */
    Page page = buffer->pageinfo.page;
    if (xldata->newPage) {
        Assert(buffer->pageinfo.pagesize == BLCKSZ);
        SpGistInitPage(page, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
    }

    char *ptr = (char *)tupledata;
    int i;
    for (i = 0; i < nInsert; i++) {
        SpGistLeafTuple lt = (SpGistLeafTuple)ptr;

        addOrReplaceTuple(page, (Item)lt, lt->size, toInsert[i]);
        ptr += lt->size;
    }

    PageSetLSN(page, buffer->lsn);
}
/*
 * Redo the source-page part of a move-leafs record.
 *
 * Deletes the nMoves tuples listed in 'deletedata'. The first replacement
 * tuple is a placeholder when the fake state says isBuild, otherwise a
 * redirect; it is given the target (blknoDst, last entry of 'insertdata').
 * The remaining replacements are placeholders.
 */
void spgRedoMoveLeafsOpratorSrcPage(RedoBufferInfo *buffer, void *recorddata, void *insertdata, void *deletedata,
                                    BlockNumber blknoDst, int nInsert)
{
    spgxlogMoveLeafs *rec = (spgxlogMoveLeafs *)recorddata;
    OffsetNumber *insertOffsets = (OffsetNumber *)insertdata;
    OffsetNumber *deleteOffsets = (OffsetNumber *)deletedata;
    Page srcPage = buffer->pageinfo.page;
    SpGistState fakeState;
    int firstState;

    fillFakeState(&fakeState, rec->stateSrc);
    if (fakeState.isBuild) {
        firstState = SPGIST_PLACEHOLDER;
    } else {
        firstState = SPGIST_REDIRECT;
    }

    spgPageIndexMultiDelete(&fakeState, srcPage, deleteOffsets, rec->nMoves, firstState, SPGIST_PLACEHOLDER,
                            blknoDst, insertOffsets[nInsert - 1]);
    PageSetLSN(srcPage, buffer->lsn);
}
/*
 * Redo the parent-page part of a move-leafs record.
 *
 * Calls spgUpdateNodeLink on the inner tuple at offnumParent, node nodeI,
 * with the target (blknoDst, last entry of 'insertdata'), then sets the page LSN.
 */
void spgRedoMoveLeafsOpratorParentPage(RedoBufferInfo *buffer, void *recorddata, void *insertdata, BlockNumber blknoDst,
                                       int nInsert)
{
    spgxlogMoveLeafs *rec = (spgxlogMoveLeafs *)recorddata;
    OffsetNumber *insertOffsets = (OffsetNumber *)insertdata;
    Page parentPage = buffer->pageinfo.page;
    ItemId parentId = PageGetItemId(parentPage, rec->offnumParent);
    SpGistInnerTuple parentTuple = (SpGistInnerTuple)PageGetItem(parentPage, parentId);

    spgUpdateNodeLink(parentTuple, rec->nodeI, blknoDst, insertOffsets[nInsert - 1]);
    PageSetLSN(parentPage, buffer->lsn);
}
/*
 * Redo an add-node record by replacing the inner tuple in place.
 *
 * tuple       - the new inner tuple image
 * tupleheader - a SpGistInnerTupleData whose 'size' field gives the image length
 *
 * The item at xldata->offnum is deleted and the new image is added at the
 * same offset; failure to land on that offset is reported as index corruption.
 */
void spgRedoAddNodeUpdateSrcPage(RedoBufferInfo *buffer, void *recorddata, void *tuple, void *tupleheader)
{
    spgxlogAddNode *rec = (spgxlogAddNode *)recorddata;
    SpGistInnerTupleData *hdr = (SpGistInnerTupleData *)tupleheader;
    char *tupleImage = (char *)tuple;
    Page srcPage = buffer->pageinfo.page;

    PageIndexTupleDelete(srcPage, rec->offnum);
    if (PageAddItem(srcPage, (Item)tupleImage, hdr->size, rec->offnum, false, false) != rec->offnum) {
        ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                        errmsg("failed to add item of size %u to SPGiST index page", hdr->size)));
    }

    PageSetLSN(srcPage, buffer->lsn);
}
/*
 * Redo the source-page part of an add-node record whose tuple moved to
 * another page.
 *
 * The old tuple at xldata->offnum is replaced by a dead tuple: a placeholder
 * when the fake state says isBuild, otherwise a redirect to
 * (blknoNew, offnumNew). The matching page counter (nPlaceholder or
 * nRedirection) is incremented. If parentBlk is 0, the parent tuple lives on
 * this page and its node link is updated here as well.
 */
void spgRedoAddNodeOperatorSrcPage(RedoBufferInfo *buffer, void *recorddata, BlockNumber blknoNew)
{
    spgxlogAddNode *xldata = (spgxlogAddNode *)recorddata;
    Page page = buffer->pageinfo.page;
    SpGistDeadTuple dt;
    SpGistState state;

    fillFakeState(&state, xldata->stateSrc);
    if (state.isBuild)
        dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER, InvalidBlockNumber, InvalidOffsetNumber);
    else
        dt = spgFormDeadTuple(&state, SPGIST_REDIRECT, blknoNew, xldata->offnumNew);

    PageIndexTupleDelete(page, xldata->offnum);
    if (PageAddItem(page, (Item)dt, dt->size, xldata->offnum, false, false) != xldata->offnum)
        ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                        errmsg("failed to add item of size %u to SPGiST index page", dt->size)));

    if (state.isBuild)
        SpGistPageGetOpaque(page)->nPlaceholder++;
    else
        SpGistPageGetOpaque(page)->nRedirection++;

    /*
     * If parent is in this same page, update it now.
     */
    if (xldata->parentBlk == 0) {
        SpGistInnerTuple parentTuple;

        parentTuple = (SpGistInnerTuple)PageGetItem(page, PageGetItemId(page, xldata->offnumParent));
        spgUpdateNodeLink(parentTuple, xldata->nodeI, blknoNew, xldata->offnumNew);
    }

    PageSetLSN(page, buffer->lsn);
}
/*
 * Redo the destination-page part of an add-node record.
 *
 * tuple       - the new inner tuple image
 * tupleheader - a SpGistInnerTupleData whose 'size' field gives the image length
 * blknoNew    - block number passed on to spgUpdateNodeLink as the link target
 *
 * Optionally re-initializes the page, places the tuple at offnumNew and, if
 * parentBlk is 1 (parent on this page), updates the parent's node link.
 */
void spgRedoAddNodeOperatorDestPage(RedoBufferInfo *buffer, void *recorddata, void *tuple, void *tupleheader,
                                    BlockNumber blknoNew)
{
    spgxlogAddNode *xldata = (spgxlogAddNode *)recorddata;
    Page page = buffer->pageinfo.page;
    char *innerTuple = (char *)tuple;
    SpGistInnerTupleData *innerTupleHdr = (SpGistInnerTupleData *)tupleheader;

    /*
     * In normal operation we would have all three pages (source, dest,
     * and parent) locked simultaneously; but in WAL replay it should be
     * safe to update them one at a time, as long as we do it in the right
     * order. We must insert the new tuple before replacing the old tuple
     * with the redirect tuple.
     */
    if (xldata->newPage) {
        Assert(buffer->pageinfo.pagesize == BLCKSZ);
        SpGistInitPage(page, 0);
    }

    addOrReplaceTuple(page, (Item)innerTuple, innerTupleHdr->size, xldata->offnumNew);

    /*
     * If parent is in this same page, update it now.
     */
    if (xldata->parentBlk == 1) {
        SpGistInnerTuple parentTuple;

        parentTuple = (SpGistInnerTuple)PageGetItem(page, PageGetItemId(page, xldata->offnumParent));
        spgUpdateNodeLink(parentTuple, xldata->nodeI, blknoNew, xldata->offnumNew);
    }

    PageSetLSN(page, buffer->lsn);
}
/*
 * Redo the parent-page part of an add-node record (parent on its own page).
 *
 * Calls spgUpdateNodeLink on the inner tuple at offnumParent, node nodeI,
 * with the target (blknoNew, offnumNew), then sets the page LSN.
 */
void spgRedoAddNodeOperatorParentPage(RedoBufferInfo *buffer, void *recorddata, BlockNumber blknoNew)
{
    spgxlogAddNode *rec = (spgxlogAddNode *)recorddata;
    Page parentPage = buffer->pageinfo.page;
    ItemId parentId = PageGetItemId(parentPage, rec->offnumParent);
    SpGistInnerTuple parentInner = (SpGistInnerTuple)PageGetItem(parentPage, parentId);

    spgUpdateNodeLink(parentInner, rec->nodeI, blknoNew, rec->offnumNew);
    PageSetLSN(parentPage, buffer->lsn);
}
/*
 * Redo the new-page part of a split-tuple record (postfix tuple on a
 * different page than the prefix tuple).
 *
 * tuple - the postfix inner tuple image; its own 'size' field gives the length
 *
 * Optionally re-initializes the page, places the postfix tuple at
 * offnumPostfix and sets the page LSN.
 *
 * The page is taken from buffer->pageinfo.page only, like every other redo
 * operator in this file, so that the optional SpGistInitPage and the tuple
 * insertion are guaranteed to act on the same page image.
 */
void spgRedoSplitTupleOperatorDestPage(RedoBufferInfo *buffer, void *recorddata, void *tuple)
{
    spgxlogSplitTuple *xldata = (spgxlogSplitTuple *)recorddata;
    Page page = buffer->pageinfo.page;
    SpGistInnerTuple postfixTuple = (SpGistInnerTuple)tuple;

    if (xldata->newPage) {
        Assert(buffer->pageinfo.pagesize == BLCKSZ);
        SpGistInitPage(page, 0);
    }

    addOrReplaceTuple(page, (Item)postfixTuple, postfixTuple->size, xldata->offnumPostfix);

    PageSetLSN(page, buffer->lsn);
}
/*
 * Redo the old-page part of a split-tuple record.
 *
 * pretuple  - the prefix inner tuple image, which replaces the item at offnumPrefix
 * posttuple - the postfix inner tuple image, added here only when postfixBlkSame is set
 *
 * The page is taken from buffer->pageinfo.page only, consistent with the
 * other redo operators in this file.
 */
void spgRedoSplitTupleOperatorSrcPage(RedoBufferInfo *buffer, void *recorddata, void *pretuple, void *posttuple)
{
    spgxlogSplitTuple *xldata = (spgxlogSplitTuple *)recorddata;
    Page page = buffer->pageinfo.page;
    SpGistInnerTuple prefixTuple = (SpGistInnerTuple)pretuple;
    SpGistInnerTuple postfixTuple = (SpGistInnerTuple)posttuple;

    /* Swap the old tuple for the prefix tuple at the same offset. */
    PageIndexTupleDelete(page, xldata->offnumPrefix);
    if (PageAddItem(page, (Item)prefixTuple, prefixTuple->size, xldata->offnumPrefix, false, false) !=
        xldata->offnumPrefix)
        ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
                        errmsg("failed to add item of size %u to SPGiST index page", prefixTuple->size)));

    if (xldata->postfixBlkSame)
        addOrReplaceTuple(page, (Item)postfixTuple, postfixTuple->size, xldata->offnumPostfix);

    PageSetLSN(page, buffer->lsn);
}
/*
 * Re-insert the leaf tuples of a pick-split record that belong on one page.
 *
 * destflag   - true to restore the tuples selected for the destination page,
 *              false for those that stay on the source page
 * pageselect - one byte per tuple; non-zero means "destination page"
 * insertdata - target offset of each tuple
 * tuple      - the leaf tuple images, packed back to back
 *
 * Does not set the page LSN.
 */
void spgRedoPickSplitRestoreLeafTuples(RedoBufferInfo *buffer, void *recorddata, bool destflag, void *pageselect,
                                       void *insertdata, void *tuple)
{
    spgxlogPickSplit *rec = (spgxlogPickSplit *)recorddata;
    uint8 *selector = (uint8 *)pageselect;
    OffsetNumber *offsets = (OffsetNumber *)insertdata;
    Page targetPage = buffer->pageinfo.page;
    char *cursor = (char *)tuple;

    for (int idx = 0; idx < rec->nInsert; idx++) {
        SpGistLeafTuple leaf = (SpGistLeafTuple)cursor;
        bool goesToDest = (selector[idx] != 0);

        /* Step over this image whether or not it belongs on this page. */
        cursor += leaf->size;

        if (goesToDest == destflag) {
            addOrReplaceTuple(targetPage, (Item)leaf, leaf->size, offsets[idx]);
        }
    }
}
/*
 * Redo the source-page part of a pick-split record.
 *
 * deleteoffset - offsets of the tuples to delete when the page is not re-initialized
 * blknoInner   - block number of the new inner tuple (redirect target)
 * pageselect / insertdata / tuple - forwarded to spgRedoPickSplitRestoreLeafTuples
 *
 * With initSrc set the page is re-initialized as a leaf page; otherwise the
 * listed tuples are replaced by dead tuples. Afterwards the leaf tuples that
 * stay on the source page are restored and the page LSN is set.
 */
void spgRedoPickSplitOperatorSrcPage(RedoBufferInfo *srcBuffer, void *recorddata, void *deleteoffset,
                                     BlockNumber blknoInner, void *pageselect, void *insertdata, void *tuple)
{
    char *ptr = (char *)recorddata;
    spgxlogPickSplit *xldata = (spgxlogPickSplit *)ptr;
    Page srcPage = srcBuffer->pageinfo.page;
    OffsetNumber *toDelete = (OffsetNumber *)deleteoffset;
    SpGistState state;

    if (xldata->initSrc) {
        Assert(srcBuffer->pageinfo.pagesize == BLCKSZ);
        SpGistInitPage(srcPage, SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
    } else {
        fillFakeState(&state, xldata->stateSrc);

        /*
         * We have it a bit easier here than in doPickSplit(),
         * because we know the inner tuple's location already, so
         * we can inject the correct redirection tuple now.
         */
        if (!state.isBuild)
            spgPageIndexMultiDelete(&state, srcPage, toDelete, xldata->nDelete, SPGIST_REDIRECT, SPGIST_PLACEHOLDER,
                                    blknoInner, xldata->offnumInner);
        else
            spgPageIndexMultiDelete(&state, srcPage, toDelete, xldata->nDelete, SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
                                    InvalidBlockNumber, InvalidOffsetNumber);
    }

    spgRedoPickSplitRestoreLeafTuples(srcBuffer, recorddata, false, pageselect, insertdata, tuple);

    PageSetLSN(srcPage, srcBuffer->lsn);
}
/*
 * Redo the destination-page part of a pick-split record.
 *
 * Re-initializes the page as a leaf page when initDest is set, restores the
 * leaf tuples selected for the destination page and sets the page LSN.
 */
void spgRedoPickSplitOperatorDestPage(RedoBufferInfo *destBuffer, void *recorddata, void *pageselect, void *insertdata,
                                      void *tuple)
{
    spgxlogPickSplit *rec = (spgxlogPickSplit *)recorddata;
    Page dstPage = destBuffer->pageinfo.page;

    if (rec->initDest) {
        Assert(destBuffer->pageinfo.pagesize == BLCKSZ);
        SpGistInitPage(dstPage, SPGIST_LEAF | (rec->storesNulls ? SPGIST_NULLS : 0));
    }

    spgRedoPickSplitRestoreLeafTuples(destBuffer, recorddata, true, pageselect, insertdata, tuple);

    PageSetLSN(dstPage, destBuffer->lsn);
}
/*
 * Redo the inner-page part of a pick-split record.
 *
 * tuple       - the new inner tuple image
 * tupleheader - a SpGistInnerTupleData whose 'size' field gives the image length
 * blknoInner  - block number passed to spgUpdateNodeLink as the link target
 *
 * Optionally re-initializes the page as a non-leaf page, places the inner
 * tuple at offnumInner and, when innerIsParent is set, also updates the
 * parent tuple's node link on this same page.
 */
void spgRedoPickSplitOperatorInnerPage(RedoBufferInfo *innerBuffer, void *recorddata, void *tuple, void *tupleheader,
                                       BlockNumber blknoInner)
{
    spgxlogPickSplit *rec = (spgxlogPickSplit *)recorddata;
    SpGistInnerTupleData *hdr = (SpGistInnerTupleData *)tupleheader;
    char *tupleImage = (char *)tuple;
    Page innerPage = innerBuffer->pageinfo.page;

    if (rec->initInner) {
        Assert(innerBuffer->pageinfo.pagesize == BLCKSZ);
        SpGistInitPage(innerPage, (rec->storesNulls ? SPGIST_NULLS : 0));
    }

    addOrReplaceTuple(innerPage, (Item)tupleImage, hdr->size, rec->offnumInner);

    if (rec->innerIsParent) {
        ItemId parentId = PageGetItemId(innerPage, rec->offnumParent);
        SpGistInnerTuple parentTuple = (SpGistInnerTuple)PageGetItem(innerPage, parentId);

        spgUpdateNodeLink(parentTuple, rec->nodeI, blknoInner, rec->offnumInner);
    }

    PageSetLSN(innerPage, innerBuffer->lsn);
}
/*
 * Redo the parent-page part of a pick-split record.
 *
 * Calls spgUpdateNodeLink on the inner tuple at offnumParent, node nodeI,
 * with the target (blknoInner, offnumInner), then sets the page LSN.
 */
void spgRedoPickSplitOperatorParentPage(RedoBufferInfo *parentBuffer, void *recorddata, BlockNumber blknoInner)
{
    spgxlogPickSplit *rec = (spgxlogPickSplit *)recorddata;
    Page parentPage = parentBuffer->pageinfo.page;
    ItemId parentId = PageGetItemId(parentPage, rec->offnumParent);
    SpGistInnerTuple parentTuple = (SpGistInnerTuple)PageGetItem(parentPage, parentId);

    spgUpdateNodeLink(parentTuple, rec->nodeI, blknoInner, rec->offnumInner);
    PageSetLSN(parentPage, parentBuffer->lsn);
}
/*
 * Redo a vacuum-leaf record on its leaf page.
 *
 * 'recorddata' is a spgxlogVacuumLeaf header followed by six OffsetNumber
 * arrays, in this order: toDead[nDead], toPlaceholder[nPlaceholder],
 * moveSrc[nMove], moveDest[nMove], chainSrc[nChain], chainDest[nChain].
 */
void spgRedoVacuumLeafOperatorPage(RedoBufferInfo *buffer, void *recorddata)
{
    spgxlogVacuumLeaf *rec = (spgxlogVacuumLeaf *)recorddata;
    Page leafPage = buffer->pageinfo.page;
    SpGistState fakeState;

    /* Locate each array; they follow the header back to back. */
    OffsetNumber *toDead = (OffsetNumber *)((char *)recorddata + sizeof(spgxlogVacuumLeaf));
    OffsetNumber *toPlaceholder = toDead + rec->nDead;
    OffsetNumber *moveSrc = toPlaceholder + rec->nPlaceholder;
    OffsetNumber *moveDest = moveSrc + rec->nMove;
    OffsetNumber *chainSrc = moveDest + rec->nMove;
    OffsetNumber *chainDest = chainSrc + rec->nChain;

    fillFakeState(&fakeState, rec->stateSrc);

    /* Step 1: turn the listed tuples into DEAD tuples, then into placeholders. */
    spgPageIndexMultiDelete(&fakeState, leafPage, toDead, rec->nDead, SPGIST_DEAD, SPGIST_DEAD, InvalidBlockNumber,
                            InvalidOffsetNumber);
    spgPageIndexMultiDelete(&fakeState, leafPage, toPlaceholder, rec->nPlaceholder, SPGIST_PLACEHOLDER,
                            SPGIST_PLACEHOLDER, InvalidBlockNumber, InvalidOffsetNumber);

    /* Step 2: exchange the line pointers of each (moveSrc, moveDest) pair. */
    for (int m = 0; m < rec->nMove; m++) {
        ItemId srcId = PageGetItemId(leafPage, moveSrc[m]);
        ItemId dstId = PageGetItemId(leafPage, moveDest[m]);
        ItemIdData saved = *srcId;

        *srcId = *dstId;
        *dstId = saved;
    }

    /* Step 3: the moveSrc slots are now replaced by placeholders. */
    spgPageIndexMultiDelete(&fakeState, leafPage, moveSrc, rec->nMove, SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
                            InvalidBlockNumber, InvalidOffsetNumber);

    /* Step 4: rewrite the chain links of the listed live tuples. */
    for (int c = 0; c < rec->nChain; c++) {
        ItemId linkId = PageGetItemId(leafPage, chainSrc[c]);
        SpGistLeafTuple liveTuple = (SpGistLeafTuple)PageGetItem(leafPage, linkId);

        Assert(liveTuple->tupstate == SPGIST_LIVE);
        liveTuple->nextOffset = chainDest[c];
    }

    PageSetLSN(leafPage, buffer->lsn);
}
/*
 * Redo a vacuum-root record.
 *
 * 'recorddata' is a spgxlogVacuumRoot header followed by nDelete offsets;
 * those items are removed with PageIndexMultiDelete and the page LSN is set.
 */
void spgRedoVacuumRootOperatorPage(RedoBufferInfo *buffer, void *recorddata)
{
    spgxlogVacuumRoot *rec = (spgxlogVacuumRoot *)recorddata;
    OffsetNumber *victims = (OffsetNumber *)((char *)recorddata + sizeof(spgxlogVacuumRoot));
    Page rootPage = buffer->pageinfo.page;

    PageIndexMultiDelete(rootPage, victims, rec->nDelete);
    PageSetLSN(rootPage, buffer->lsn);
}
/*
 * Redo a vacuum-redirect record.
 *
 * First converts the nToPlaceholder redirect tuples listed in
 * xldata->offsets into placeholders and adjusts the page counters. Then, if
 * firstPlaceholder is valid, deletes every item from firstPlaceholder up to
 * the page's maximum offset.
 */
void spgRedoVacuumRedirectOperatorPage(RedoBufferInfo *buffer, void *recorddata)
{
    spgxlogVacuumRedirect *rec = (spgxlogVacuumRedirect *)recorddata;
    Page targetPage = buffer->pageinfo.page;
    SpGistPageOpaque pageOpaque = SpGistPageGetOpaque(targetPage);
    OffsetNumber *redirects = rec->offsets;

    for (int r = 0; r < rec->nToPlaceholder; r++) {
        ItemId redirectId = PageGetItemId(targetPage, redirects[r]);
        SpGistDeadTuple dead = (SpGistDeadTuple)PageGetItem(targetPage, redirectId);

        Assert(dead->tupstate == SPGIST_REDIRECT);
        dead->tupstate = SPGIST_PLACEHOLDER;
        ItemPointerSetInvalid(&dead->pointer);
    }

    Assert(pageOpaque->nRedirection >= rec->nToPlaceholder);
    pageOpaque->nRedirection -= rec->nToPlaceholder;
    pageOpaque->nPlaceholder += rec->nToPlaceholder;

    if (rec->firstPlaceholder != InvalidOffsetNumber) {
        int maxOffset = PageGetMaxOffsetNumber(targetPage);
        int nTrailing = maxOffset - rec->firstPlaceholder + 1;
        OffsetNumber *trailing = (OffsetNumber *)palloc(sizeof(OffsetNumber) * maxOffset);

        /* Build the ascending list firstPlaceholder .. maxOffset. */
        for (int off = rec->firstPlaceholder; off <= maxOffset; off++) {
            trailing[off - rec->firstPlaceholder] = off;
        }

        Assert(pageOpaque->nPlaceholder >= nTrailing);
        pageOpaque->nPlaceholder -= nTrailing;

        PageIndexMultiDelete(targetPage, trailing, nTrailing);
        pfree(trailing);
    }

    PageSetLSN(targetPage, buffer->lsn);
}
/*
 * Build the per-block parse states for a create-index record.
 *
 * Allocates one state each for the meta, root and leaf block ids, with the
 * meta-block state as list head. Sets *blocknum to 3 and returns the head.
 */
static XLogRecParseState *SpgXlogCreateIndexParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *head = NULL;
    XLogRecParseState *member = NULL;

    /* Meta block: becomes the list head. */
    XLogParseBufferAllocListFunc(record, &head, NULL);
    XLogRecSetBlockDataState(record, SPG_CREATE_INDEX_META_BLOCK_NUM, head);

    /* Root block. */
    XLogParseBufferAllocListFunc(record, &member, head);
    XLogRecSetBlockDataState(record, SPG_CREATE_INDEX_ROOT_BLOCK_NUM, member);

    /* Leaf block. */
    XLogParseBufferAllocListFunc(record, &member, head);
    XLogRecSetBlockDataState(record, SPG_CREATE_INDEX_LEAF_BLOCK_NUM, member);

    *blocknum = 3;
    return head;
}
/*
 * Build the per-block parse states for an add-leaf record.
 *
 * The leaf block (SPG_ADD_LEAF_LEAF_BLOCK_NUM) always gets a state and is the
 * list head. A state for the parent block is added only when the record's
 * offnumParent is valid. *blocknum receives the number of states created.
 *
 * Note: the leaf block is identified with the add-leaf enum constant rather
 * than SPG_CREATE_INDEX_META_BLOCK_NUM; both are 0, so behavior is unchanged.
 */
static XLogRecParseState *SpgXlogAddLeafParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *recordstatehead = NULL;

    XLogParseBufferAllocListFunc(record, &recordstatehead, NULL);
    XLogRecSetBlockDataState(record, SPG_ADD_LEAF_LEAF_BLOCK_NUM, recordstatehead);
    *blocknum = 1;

    char *ptr = XLogRecGetData(record);
    spgxlogAddLeaf *xldata = (spgxlogAddLeaf *)ptr;
    if (xldata->offnumParent != InvalidOffsetNumber) {
        XLogRecParseState *blockstate = NULL;

        XLogParseBufferAllocListFunc(record, &blockstate, recordstatehead);
        XLogRecSetBlockDataState(record, SPG_ADD_LEAF_PARENT_BLOCK_NUM, blockstate);
        ++(*blocknum);
    }

    return recordstatehead;
}
/*
 * Build the per-block parse states for a move-leafs record.
 *
 * Allocates one state each for the destination, source and parent block ids,
 * with the destination state as list head. Sets *blocknum to 3.
 */
static XLogRecParseState *SpgXlogMoveLeafsParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *head = NULL;
    XLogRecParseState *member = NULL;

    /* Destination block: becomes the list head. */
    XLogParseBufferAllocListFunc(record, &head, NULL);
    XLogRecSetBlockDataState(record, SPG_MOVE_LEAF_DST_BLOCK_NUM, head);

    /* Source block. */
    XLogParseBufferAllocListFunc(record, &member, head);
    XLogRecSetBlockDataState(record, SPG_MOVE_LEAF_SRC_BLOCK_NUM, member);

    /* Parent block. */
    XLogParseBufferAllocListFunc(record, &member, head);
    XLogRecSetBlockDataState(record, SPG_MOVE_LEAF_PARENT_BLOCK_NUM, member);

    *blocknum = 3;
    return head;
}
/*
 * Build the per-block parse states for an add-node record.
 *
 * The old block always gets a state (list head). If the record carries block
 * reference 1, a state for the new block is added, and, when the record's
 * parentBlk equals 2, one for the parent block as well. *blocknum receives
 * the number of states created.
 */
static XLogRecParseState *SpgXlogAddNodeParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *head = NULL;
    XLogRecParseState *member = NULL;
    spgxlogAddNode *rec = NULL;

    XLogParseBufferAllocListFunc(record, &head, NULL);
    XLogRecSetBlockDataState(record, SPG_ADD_NODE_OLD_BLOCK_NUM, head);
    *blocknum = 1;

    /* Without block reference 1 only the old block is involved. */
    if (!XLogRecHasBlockRef(record, 1)) {
        return head;
    }

    XLogParseBufferAllocListFunc(record, &member, head);
    XLogRecSetBlockDataState(record, SPG_ADD_NODE_NEW_BLOCK_NUM, member);
    *blocknum += 1;

    rec = (spgxlogAddNode *)XLogRecGetData(record);
    if (rec->parentBlk == 2) {
        XLogParseBufferAllocListFunc(record, &member, head);
        XLogRecSetBlockDataState(record, SPG_ADD_NODE_PARENT_BLOCK_NUM, member);
        *blocknum += 1;
    }

    return head;
}
/*
 * Build the per-block parse states for a split-tuple record.
 *
 * The old block always gets a state (list head). A state for the new block is
 * added only when the record says the postfix tuple is on a different block
 * (postfixBlkSame is false). *blocknum receives the number of states created.
 */
static XLogRecParseState *SpgXlogSplitTupleParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *head = NULL;
    spgxlogSplitTuple *rec = NULL;

    XLogParseBufferAllocListFunc(record, &head, NULL);
    XLogRecSetBlockDataState(record, SPG_SPLIT_TUPLE_OLD_BLOCK_NUM, head);
    *blocknum = 1;

    rec = (spgxlogSplitTuple *)XLogRecGetData(record);
    if (rec->postfixBlkSame) {
        return head;
    }

    XLogRecParseState *member = NULL;
    XLogParseBufferAllocListFunc(record, &member, head);
    XLogRecSetBlockDataState(record, SPG_SPLIT_TUPLE_NEW_BLOCK_NUM, member);
    *blocknum += 1;

    return head;
}
/*
 * Build the per-block parse states for a pick-split record.
 *
 * The source block always gets a state (list head) and the inner block always
 * gets one too. The destination and parent blocks get a state only when the
 * record actually carries that block reference. The parent reference can be
 * absent: spgRedoPickSplitOperatorInnerPage updates the parent link on the
 * inner page itself when innerIsParent is set. *blocknum receives the number
 * of states created.
 */
static XLogRecParseState *SpgXlogPickSplitParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *recordstatehead = NULL;
    XLogRecParseState *blockstate = NULL;

    XLogParseBufferAllocListFunc(record, &recordstatehead, NULL);
    XLogRecSetBlockDataState(record, SPG_PICK_SPLIT_SRC_BLOCK_NUM, recordstatehead);
    *blocknum = 1;

    if (XLogRecHasBlockRef(record, SPG_PICK_SPLIT_DST_BLOCK_NUM)) {
        XLogParseBufferAllocListFunc(record, &blockstate, recordstatehead);
        XLogRecSetBlockDataState(record, SPG_PICK_SPLIT_DST_BLOCK_NUM, blockstate);
        ++(*blocknum);
    }

    XLogParseBufferAllocListFunc(record, &blockstate, recordstatehead);
    XLogRecSetBlockDataState(record, SPG_PICK_SPLIT_INNER_BLOCK_NUM, blockstate);
    ++(*blocknum);

    /* Only set up the parent block when the record references it. */
    if (XLogRecHasBlockRef(record, SPG_PICK_SPLIT_PARENT_BLOCK_NUM)) {
        XLogParseBufferAllocListFunc(record, &blockstate, recordstatehead);
        XLogRecSetBlockDataState(record, SPG_PICK_SPLIT_PARENT_BLOCK_NUM, blockstate);
        ++(*blocknum);
    }

    return recordstatehead;
}
/*
 * Build the single parse state of a vacuum-leaf record; *blocknum is set to 1.
 */
static XLogRecParseState *SpgXlogVacuumLeafParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *head = NULL;

    *blocknum = 1;
    XLogParseBufferAllocListFunc(record, &head, NULL);
    XLogRecSetBlockDataState(record, SPG_PICK_VACUUM_LEAF_BLOCK_NUM, head);

    return head;
}
/*
 * Build the single parse state of a vacuum-root record; *blocknum is set to 1.
 */
static XLogRecParseState *SpgXlogVacuumRootParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *head = NULL;

    *blocknum = 1;
    XLogParseBufferAllocListFunc(record, &head, NULL);
    XLogRecSetBlockDataState(record, SPG_PICK_VACUUM_ROOT_BLOCK_NUM, head);

    return head;
}
/*
 * Build the single parse state of a vacuum-redirect record; *blocknum is set to 1.
 */
static XLogRecParseState *SpgXlogVacuumRedirectParseBlock(XLogReaderState *record, uint32 *blocknum)
{
    XLogRecParseState *head = NULL;

    *blocknum = 1;
    XLogParseBufferAllocListFunc(record, &head, NULL);
    XLogRecSetBlockDataState(record, SPG_PICK_VACUUM_REDIRECT_BLOCK_NUM, head);

    return head;
}
/*
 * Split an SP-GiST WAL record into per-block parse states.
 *
 * record   - the decoded WAL record
 * blocknum - output: number of block states created (0 until a handler sets it)
 *
 * Dispatches on the record's info bits (with XLR_INFO_MASK cleared) to the
 * matching Spg*ParseBlock helper and returns the head of the list that helper
 * built. An unknown op code is reported at PANIC level.
 */
XLogRecParseState *SpgRedoParseToBlock(XLogReaderState *record, uint32 *blocknum)
{
    *blocknum = 0;
    uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    XLogRecParseState *recordblockstate = NULL;

    switch (info) {
        case XLOG_SPGIST_CREATE_INDEX:
            recordblockstate = SpgXlogCreateIndexParseBlock(record, blocknum);
            break;
        case XLOG_SPGIST_ADD_LEAF:
            recordblockstate = SpgXlogAddLeafParseBlock(record, blocknum);
            break;
        case XLOG_SPGIST_MOVE_LEAFS:
            recordblockstate = SpgXlogMoveLeafsParseBlock(record, blocknum);
            break;
        case XLOG_SPGIST_ADD_NODE:
            recordblockstate = SpgXlogAddNodeParseBlock(record, blocknum);
            break;
        case XLOG_SPGIST_SPLIT_TUPLE:
            recordblockstate = SpgXlogSplitTupleParseBlock(record, blocknum);
            break;
        case XLOG_SPGIST_PICKSPLIT:
            recordblockstate = SpgXlogPickSplitParseBlock(record, blocknum);
            break;
        case XLOG_SPGIST_VACUUM_LEAF:
            recordblockstate = SpgXlogVacuumLeafParseBlock(record, blocknum);
            break;
        case XLOG_SPGIST_VACUUM_ROOT:
            recordblockstate = SpgXlogVacuumRootParseBlock(record, blocknum);
            break;
        case XLOG_SPGIST_VACUUM_REDIRECT:
            recordblockstate = SpgXlogVacuumRedirectParseBlock(record, blocknum);
            break;
        default:
            ereport(PANIC, (errmsg("spg_redo: unknown op code %u", info)));
            break;
    }

    /* Hand the list built by the handler back to the caller (was: NULL). */
    return recordblockstate;
}