/* -------------------------------------------------------------------------
 *
 * nbtxlog.cpp
 *    WAL replay logic for btrees.
 *
 *
 * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/gausskernel/storage/access/nbtree/nbtxlog.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/nbtree.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "access/xlogproc.h"
#include "storage/procarray.h"
#include "storage/smgr/relfilenode.h"
#include "miscadmin.h"
#include "pgxc/pgxc.h"
#include "access/multi_redo_api.h"
#include "access/parallel_recovery/dispatcher.h"
#ifdef ENABLE_UT
#define static
#endif
* We must keep track of expected insertions due to page splits, and apply
* them manually if they are not seen in the WAL log during replay. This
* makes it safe for page insertion to be a multiple-WAL-action process.
*
* Similarly, deletion of an only child page and deletion of its parent page
* form multiple WAL log entries, and we have to be prepared to follow through
* with the deletion if the log ends between.
*
* The data structure is a simple linked list --- this should be good enough,
* since we don't expect a page split or multi deletion to remain incomplete
* for long. In any case we need to respect the order of operations.
*/
typedef struct bt_incomplete_action {
RelFileNode node;
bool is_split;
bool is_root;
BlockNumber leftblk;
BlockNumber rightblk;
XLogPhyBlock leftpblk;
XLogPhyBlock rightpblk;
uint32 level;
BlockNumber delblk;
XLogPhyBlock delpblk;
} bt_incomplete_action;
/*
 * log_incomplete_split -- remember a split whose downlink has not been seen yet
 *
 * Appends a split entry to t_thrd.xlog_cxt.incomplete_actions. The entry is
 * removed again by forget_matching_split() once a record carrying a downlink
 * to 'rightblk' is replayed.
 *
 * node       relation file containing the split pages
 * leftblk    left page of the split
 * rightblk   right page of the split
 * leftpblk   physical block info for leftblk
 * rightpblk  physical block info for rightblk
 * is_root    is_root flag to record with the entry
 *
 * When parallel recovery is active the entry and the list cell are allocated
 * in g_instance.comm_cxt.predo_cxt.parallelRedoCtx.
 */
static void log_incomplete_split(const RelFileNode *node, BlockNumber leftblk, BlockNumber rightblk,
                                 const XLogPhyBlock &leftpblk, const XLogPhyBlock &rightpblk, bool is_root)
{
    /*
     * Evaluate the predicate once so that the context restore at the end is
     * always paired with the switch done here.
     */
    const bool useParallelCtx = (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull()));
    MemoryContext oldCtx = NULL;
    if (useParallelCtx) {
        oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
    }

    /*
     * Zero the entry: level, delblk and delpblk are not set for splits, but
     * the trace output of forget_matching_split() prints level and delblk.
     */
    bt_incomplete_action *action = (bt_incomplete_action *)palloc0(sizeof(bt_incomplete_action));

    if (log_min_messages <= DEBUG4) {
        ereport(LOG, (errmsg("[BTREE_ACTION_TRACE]log_incomplete_split: spc:%u,db:%u,rel:%u,"
                             "leftblk:%u,rightblk:%u,is_root:%d",
                             node->spcNode, node->dbNode, node->relNode, leftblk, rightblk, is_root)));
    }

    action->node = *node;
    action->is_split = true;
    action->is_root = is_root;
    action->leftblk = leftblk;
    action->rightblk = rightblk;
    action->leftpblk = leftpblk;
    action->rightpblk = rightpblk;
    t_thrd.xlog_cxt.incomplete_actions = lappend(t_thrd.xlog_cxt.incomplete_actions, action);

    if (useParallelCtx) {
        (void)MemoryContextSwitchTo(oldCtx);
    }
}
static void forget_matching_split(const RelFileNode *node, BlockNumber downlink, bool is_root)
{
ListCell *l = NULL;
if (log_min_messages <= DEBUG4) {
ereport(LOG, (errmsg("[BTREE_ACTION_TRACE]forget_matching_split begin: spc:%u,db:%u,rel:%u,"
"downlink:%u, is_root:%d",
node->spcNode, node->dbNode, node->relNode, downlink, is_root)));
}
MemoryContext oldCtx = NULL;
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
}
foreach (l, t_thrd.xlog_cxt.incomplete_actions) {
bt_incomplete_action *action = (bt_incomplete_action *)lfirst(l);
if (RelFileNodeEquals(*node, action->node) && action->is_split && downlink == action->rightblk) {
if (log_min_messages <= DEBUG4) {
ereport(LOG,
(errmsg("[BTREE_ACTION_TRACE]forget_matching_split successfully: input spc:%u,db:%u,rel:%u,"
"downlink:%u, is_root:%d, action spc:%u,db:%u,rel:%u,"
"is_split:%d,is_root:%d,leftblk:%u,rightblk:%u,level:%u,delblk:%u",
node->spcNode, node->dbNode, node->relNode, downlink, is_root, action->node.spcNode,
action->node.dbNode, action->node.relNode, action->is_split, action->is_root,
action->leftblk, action->rightblk, action->level, action->delblk)));
}
if (is_root != action->is_root)
ereport(LOG, (errmsg("forget_matching_split: fishy is_root data (expected %d, got %d)", action->is_root,
is_root)));
t_thrd.xlog_cxt.incomplete_actions = list_delete_ptr(t_thrd.xlog_cxt.incomplete_actions, action);
pfree(action);
break;
}
}
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
(void)MemoryContextSwitchTo(oldCtx);
}
}
/*
 * log_incomplete_deletion -- remember a page deletion that still has to be finished
 *
 * Appends a deletion entry for block 'delblk' of relation 'node' to
 * t_thrd.xlog_cxt.incomplete_actions. The entry is removed again by
 * forget_matching_deletion() when a later record deletes that block.
 *
 * node     relation file containing the page
 * delblk   block whose deletion is pending
 * delpblk  physical block info for delblk
 *
 * When parallel recovery is active the entry and the list cell are allocated
 * in g_instance.comm_cxt.predo_cxt.parallelRedoCtx.
 */
static void log_incomplete_deletion(const RelFileNode *node, BlockNumber delblk, const XLogPhyBlock &delpblk)
{
    /*
     * Evaluate the predicate once so that the context restore at the end is
     * always paired with the switch done here.
     */
    const bool useParallelCtx = (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull()));
    MemoryContext oldCtx = NULL;
    if (useParallelCtx) {
        oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
    }

    /*
     * Zero the entry: the split-only fields (is_root, leftblk, rightblk, ...)
     * and level are not set for deletions, but the trace output of
     * forget_matching_deletion() prints them.
     */
    bt_incomplete_action *action = (bt_incomplete_action *)palloc0(sizeof(bt_incomplete_action));

    if (log_min_messages <= DEBUG4) {
        ereport(LOG, (errmsg("[BTREE_ACTION_TRACE]log_incomplete_deletion: spc:%u,db:%u,rel:%u,"
                             "delblk:%u",
                             node->spcNode, node->dbNode, node->relNode, delblk)));
    }

    action->node = *node;
    action->is_split = false;
    action->delblk = delblk;
    action->delpblk = delpblk;
    t_thrd.xlog_cxt.incomplete_actions = lappend(t_thrd.xlog_cxt.incomplete_actions, action);

    if (useParallelCtx) {
        (void)MemoryContextSwitchTo(oldCtx);
    }
}
static void forget_matching_deletion(const RelFileNode *node, BlockNumber delblk)
{
ListCell *l = NULL;
MemoryContext oldCtx = NULL;
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
}
if (SHOW_DEBUG_MESSAGE()) {
ereport(LOG,
(errmsg("[BTREE_ACTION_TRACE]forget_matching_deletion begin: spc:%u,db:%u,rel:%u,"
"delblk:%u",
node->spcNode,
node->dbNode,
node->relNode,
delblk)));
}
foreach (l, t_thrd.xlog_cxt.incomplete_actions) {
bt_incomplete_action *action = (bt_incomplete_action *)lfirst(l);
if (RelFileNodeEquals(*node, action->node) && !action->is_split && delblk == action->delblk) {
if (SHOW_DEBUG_MESSAGE()) {
ereport(LOG,
(errmsg("[BTREE_ACTION_TRACE]forget_matching_deletion successfully: input spc:%u,db:%u,rel:%u,"
"delblk:%u, action spc:%u,db:%u,rel:%u,"
"is_split:%d,is_root:%d,leftblk:%u,rightblk:%u,level:%u,delblk:%u",
node->spcNode, node->dbNode, node->relNode, delblk, action->node.spcNode,
action->node.dbNode, action->node.relNode, action->is_split, action->is_root,
action->leftblk, action->rightblk, action->level, action->delblk)));
}
t_thrd.xlog_cxt.incomplete_actions = list_delete_ptr(t_thrd.xlog_cxt.incomplete_actions, action);
pfree(action);
break;
}
}
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
(void)MemoryContextSwitchTo(oldCtx);
}
}
/*
 * _bt_restore_meta -- rebuild the metapage stored as block 'block_id' of 'record'
 *
 * Returns without touching the page when SSCheckInitPageXLogSimple() reports
 * BLK_DONE. Otherwise the buffer is initialized for redo, the block data of
 * the record is applied by BtreeRestoreMetaOperatorPage(), and the buffer is
 * marked dirty and released.
 */
static void _bt_restore_meta(XLogReaderState *record, uint8 block_id)
{
    RedoBufferInfo metaBuffer;

    if (SSCheckInitPageXLogSimple(record, block_id, &metaBuffer) == BLK_DONE) {
        return;
    }

    XLogInitBufferForRedo(record, block_id, &metaBuffer);

    Size payloadLen;
    char *payload = XLogRecGetBlockData(record, block_id, &payloadLen);
    BtreeRestoreMetaOperatorPage(&metaBuffer, (void *)payload, payloadLen);

    MarkBufferDirty(metaBuffer.buf);
    UnlockReleaseBuffer(metaBuffer.buf);
}
* _bt_clear_incomplete_split -- clear INCOMPLETE_SPLIT flag on a page
*
* This is a common subroutine of the redo functions of all the WAL record
* types that can insert a downlink: insert, split, and newroot.
*/
static void _bt_clear_incomplete_split(XLogReaderState *record, uint8 block_id)
{
RedoBufferInfo buffer;
if (XLogReadBufferForRedo(record, block_id, &buffer) == BLK_NEEDS_REDO) {
BtreeXlogClearIncompleteSplit(&buffer);
MarkBufferDirty(buffer.buf);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
static void btree_xlog_insert(bool isleaf, bool ismeta, XLogReaderState *record, bool issplitupgrade, bool is_posting)
{
xl_btree_insert *xlrec = (xl_btree_insert *)XLogRecGetData(record);
RelFileNode rnode;
RedoBufferInfo buffer;
char *datapos = NULL;
BlockNumber downlink = 0;
* Insertion to an internal page finishes an incomplete split at the child
* level. Clear the incomplete-split flag in the child. Note: during
* normal operation, the child and parent pages are locked at the same
* time, so that clearing the flag and inserting the downlink appear
* atomic to other backends. We don't bother with that during replay,
* because readers don't care about the incomplete-split flag and there
* cannot be updates happening.
*/
if (!issplitupgrade) {
XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
if (!isleaf) {
datapos = (char *)xlrec + SizeOfBtreeInsert;
errno_t rc = memcpy_s(&downlink, sizeof(BlockNumber), datapos, sizeof(BlockNumber));
securec_check(rc, "\0", "\0");
}
} else {
if (!isleaf) {
_bt_clear_incomplete_split(record, BTREE_INSERT_CHILD_BLOCK_NUM);
}
}
if (XLogReadBufferForRedo(record, BTREE_INSERT_ORIG_BLOCK_NUM, &buffer) == BLK_NEEDS_REDO) {
Size datalen;
datapos = XLogRecGetBlockData(record, BTREE_INSERT_ORIG_BLOCK_NUM, &datalen);
if (!is_posting) {
BtreeXlogInsertOperatorPage(&buffer, (void *)xlrec, (void *)datapos, datalen);
} else {
Assert(isleaf);
btree_xlog_insert_posting_operator_page(&buffer, (void *)xlrec, (void *)datapos, datalen);
}
MarkBufferDirty(buffer.buf);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
* Note: in normal operation, we'd update the metapage while still holding
* lock on the page we inserted into. But during replay it's not
* necessary to hold that lock, since no other index updates can be
* happening concurrently, and readers will cope fine with following an
* obsolete link from the metapage.
*/
if (!issplitupgrade) {
if (ismeta) {
_bt_restore_meta(record, 1);
}
if (!isleaf) {
forget_matching_split(&rnode, downlink, false);
}
} else {
if (ismeta) {
_bt_restore_meta(record, BTREE_INSERT_META_BLOCK_NUM);
}
}
}
static void btree_xlog_split_update(bool onleft, bool isroot, bool is_dedup, XLogReaderState *record)
{
Size datalen;
char *datapos = NULL;
RelFileNode rnode;
BlockNumber leftsib;
BlockNumber rightsib;
BlockNumber rnext;
XLogRecGetBlockTag(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &rnode, NULL, &leftsib);
XLogRecGetBlockTag(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, NULL, NULL, &rightsib);
if (!XLogRecGetBlockTag(record, BTREE_SPLIT_RIGHTNEXT_BLOCK_NUM, NULL, NULL, &rnext)) {
rnext = P_NONE;
}
xl_btree_split_posting *xlrec = (xl_btree_split_posting *)XLogRecGetData(record);
bool isleaf = (xlrec->level == 0);
if (!isleaf) {
_bt_clear_incomplete_split(record, BTREE_SPLIT_CHILD_BLOCK_NUM);
}
RedoBufferInfo rbuf;
XLogRedoAction action = SSCheckInitPageXLog(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, &rbuf);
if (action == BLK_NEEDS_REDO) {
XLogInitBufferForRedo(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, &rbuf);
datapos = XLogRecGetBlockData(record, BTREE_SPLIT_RIGHT_BLOCK_NUM, &datalen);
BtreeXlogSplitOperatorRightpage(&rbuf, (void *)xlrec, leftsib, rnext, (void *)datapos, datalen);
MarkBufferDirty(rbuf.buf);
}
RedoBufferInfo lbuf;
if (XLogReadBufferForRedo(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &lbuf) == BLK_NEEDS_REDO) {
datapos = XLogRecGetBlockData(record, BTREE_SPLIT_LEFT_BLOCK_NUM, &datalen);
BtreeXlogSplitOperatorLeftpage(&lbuf, (void *)xlrec, rightsib, onleft, is_dedup, (void *)datapos, datalen);
MarkBufferDirty(lbuf.buf);
}
if (BufferIsValid(lbuf.buf)) {
UnlockReleaseBuffer(lbuf.buf);
}
UnlockReleaseBuffer(rbuf.buf);
if (rnext != P_NONE) {
RedoBufferInfo buffer;
if (XLogReadBufferForRedo(record, BTREE_SPLIT_RIGHTNEXT_BLOCK_NUM, &buffer) == BLK_NEEDS_REDO) {
BtreeXlogSplitOperatorNextpage(&buffer, rightsib);
MarkBufferDirty(buffer.buf);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
}
static void btree_xlog_split(bool onleft, bool isroot, XLogReaderState *record, bool issplitupgrade, bool is_dedup_upgrade)
{
if (issplitupgrade) {
btree_xlog_split_update(onleft, isroot, is_dedup_upgrade, record);
return;
}
XLogRecPtr lsn = record->EndRecPtr;
xl_btree_split_posting *xlrec = (xl_btree_split_posting *)XLogRecGetData(record);
bool isleaf = (xlrec->level == 0);
RedoBufferInfo lbuf;
RedoBufferInfo rbuf;
Page rpage;
BTPageOpaqueInternal ropaque;
char *datapos = NULL;
Size datalen;
Item left_hikey = NULL;
Size left_hikeysz = 0;
RelFileNode rnode;
XLogPhyBlock leftpblk;
XLogPhyBlock rightpblk;
BlockNumber leftsib;
BlockNumber rightsib;
BlockNumber rnext;
XLogRecGetBlockTag(record, 0, &rnode, NULL, &leftsib, &leftpblk);
XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib, &rightpblk);
if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext)) {
rnext = P_NONE;
}
if (!isleaf) {
BlockNumber downlink;
datapos = (char *)xlrec + SizeOfBtreeSplit;
downlink = BlockIdGetBlockNumber((BlockId)datapos);
forget_matching_split(&rnode, downlink, false);
}
XLogRedoAction action = SSCheckInitPageXLog(record, 1, &rbuf);
if (action == BLK_NEEDS_REDO) {
XLogInitBufferForRedo(record, 1, &rbuf);
datapos = XLogRecGetBlockData(record, 1, &datalen);
rpage = rbuf.pageinfo.page;
_bt_pageinit(rpage, rbuf.pageinfo.pagesize);
ropaque = (BTPageOpaqueInternal)PageGetSpecialPointer(rpage);
ropaque->btpo_prev = leftsib;
ropaque->btpo_next = rnext;
ropaque->btpo.level = xlrec->level;
ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
ropaque->btpo_cycleid = 0;
_bt_restore_page(rpage, datapos, (int)datalen);
* On leaf level, the high key of the left page is equal to the first key
* on the right page.
*/
if (isleaf) {
ItemId hiItemId = PageGetItemId(rpage, P_FIRSTDATAKEY(ropaque));
left_hikey = PageGetItem(rpage, hiItemId);
left_hikeysz = ItemIdGetLength(hiItemId);
}
PageSetLSN(rpage, lsn);
MarkBufferDirty(rbuf.buf);
}
* Now reconstruct left (original) sibling page
*/
if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO) {
* To retain the same physical order of the tuples that they had, we
* initialize a temporary empty page for the left page and add all the
* items to that in item number order. This mirrors how _bt_split()
* works. It's not strictly required to retain the same physical
* order, as long as the items are in the correct item number order,
* but it helps debugging. See also _bt_restore_page(), which does
* the same for the right page.
*/
Page lpage = lbuf.pageinfo.page;
BTPageOpaqueInternal lopaque = (BTPageOpaqueInternal)PageGetSpecialPointer(lpage);
OffsetNumber off;
Item newitem = NULL;
Size newitemsz = 0;
Page newlpage;
OffsetNumber leftoff;
IndexTuple new_posting = NULL;
OffsetNumber replace_posting_off = InvalidOffsetNumber;
datapos = XLogRecGetBlockData(record, 0, &datalen);
if (!isleaf) {
left_hikey = (Item)datapos;
left_hikeysz = MAXALIGN(IndexTupleSize(left_hikey));
datapos += left_hikeysz;
datalen -= left_hikeysz;
}
if (onleft || (is_dedup_upgrade && xlrec->posting_off != 0)) {
newitem = (Item)datapos;
newitemsz = MAXALIGN(IndexTupleSize(newitem));
datapos += newitemsz;
datalen -= newitemsz;
if (is_dedup_upgrade && xlrec->posting_off != 0) {
replace_posting_off = OffsetNumberPrev(xlrec->newitemoff);
newitem = (Item)CopyIndexTuple((IndexTuple)newitem);
ItemId itemid = PageGetItemId(lpage, replace_posting_off);
IndexTuple orig_posting = (IndexTuple)PageGetItem(lpage, itemid);
new_posting = btree_dedup_swap_posting((IndexTuple)newitem, orig_posting, xlrec->posting_off);
}
}
Assert(datalen == 0);
START_CRIT_SECTION();
newlpage = PageGetTempPageCopySpecial(lpage);
END_CRIT_SECTION();
leftoff = P_HIKEY;
if (PageAddItem(newlpage, left_hikey, left_hikeysz, P_HIKEY, false, false) == InvalidOffsetNumber)
ereport(PANIC, (errmsg("failed to add high key to left page after split")));
leftoff = OffsetNumberNext(leftoff);
for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstright; off++) {
ItemId itemid;
Size itemsz;
Item item;
if (off == replace_posting_off)
{
Assert(onleft || xlrec->firstright == xlrec->newitemoff);
if (PageAddItem(newlpage, (Item)new_posting, MAXALIGN(IndexTupleSize(new_posting)), leftoff, false,
false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new posting list item to left page after split");
leftoff = OffsetNumberNext(leftoff);
continue;
} else if (onleft && off == xlrec->newitemoff) {
if (PageAddItem(newlpage, newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber)
ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("failed to add new item to left page after split")));
leftoff = OffsetNumberNext(leftoff);
}
itemid = PageGetItemId(lpage, off);
itemsz = ItemIdGetLength(itemid);
item = PageGetItem(lpage, itemid);
if (PageAddItem(newlpage, item, itemsz, leftoff, false, false) == InvalidOffsetNumber)
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add old item to left page after split")));
leftoff = OffsetNumberNext(leftoff);
}
if (onleft && off == xlrec->newitemoff) {
if (PageAddItem(newlpage, newitem, newitemsz, leftoff, false, false) == InvalidOffsetNumber)
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED), errmsg("failed to add new item to left page after split")));
leftoff = OffsetNumberNext(leftoff);
}
PageRestoreTempPage(newlpage, lpage);
lopaque = (BTPageOpaqueInternal)PageGetSpecialPointer(lpage);
lopaque->btpo_flags = isleaf ? BTP_LEAF : 0;
lopaque->btpo_next = rightsib;
lopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuf.buf);
}
if (BufferIsValid(lbuf.buf)) {
UnlockReleaseBuffer(lbuf.buf);
}
UnlockReleaseBuffer(rbuf.buf);
* Fix left-link of the page to the right of the new right sibling.
*
* Note: in normal operation, we do this while still holding lock on the
* two split pages. However, that's not necessary for correctness in WAL
* replay, because no other index update can be in progress, and readers
* will cope properly when following an obsolete left-link.
*/
if (rnext != P_NONE) {
RedoBufferInfo buffer;
if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) {
Page page = buffer.pageinfo.page;
BTPageOpaqueInternal pageop = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
pageop->btpo_prev = rightsib;
PageSetLSN(page, lsn);
MarkBufferDirty(buffer.buf);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
log_incomplete_split(&rnode, leftsib, rightsib, leftpblk, rightpblk, isroot);
}
/*
 * btree_xlog_dedup -- replay a deduplication pass over block 0 of 'record'
 *
 * The page is rebuilt into a temporary copy: the high key is carried over
 * when the page is not rightmost, then the data items are walked in offset
 * order and merged into posting lists according to the intervals stored in
 * the block data. The rebuilt page replaces the original and is stamped with
 * the record's end LSN.
 */
static void btree_xlog_dedup(XLogReaderState *record)
{
    xl_btree_dedup *xlrec = (xl_btree_dedup *)XLogRecGetData(record);
    XLogRecPtr lsn = record->EndRecPtr;
    RedoBufferInfo redobuf;

    if (XLogReadBufferForRedo(record, 0, &redobuf) == BLK_NEEDS_REDO) {
        /* The block data is the array of intervals produced by the original pass. */
        BTDedupInterval loggedIntervals = (BTDedupInterval)XLogRecGetBlockData(record, 0, NULL);
        Page origPage = redobuf.pageinfo.page;
        BTPageOpaqueInternal origOpaque = (BTPageOpaqueInternal)PageGetSpecialPointer(origPage);

        /* Fresh deduplication state with nothing pending. */
        BTDedupState dedup = (BTDedupState)palloc(sizeof(BTDedupStateData));
        dedup->deduplicate = true;
        dedup->num_max_items = 0;
        dedup->max_posting_size = BTREE_MAX_ITEM_SIZE(origPage);
        dedup->base = NULL;
        dedup->base_off = InvalidOffsetNumber;
        dedup->base_tuple_size = 0;
        dedup->heap_tids = (ItemPointer)palloc(dedup->max_posting_size);
        dedup->num_heap_tids = 0;
        dedup->num_items = 0;
        dedup->size_freed = 0;
        dedup->num_intervals = 0;

        OffsetNumber firstOff = P_FIRSTDATAKEY(origOpaque);
        OffsetNumber lastOff = PageGetMaxOffsetNumber(origPage);
        Page rebuilt = PageGetTempPageCopySpecial(origPage);

        /* A page that is not rightmost keeps its high key unchanged. */
        if (!P_RIGHTMOST(origOpaque)) {
            ItemId hikeyId = PageGetItemId(origPage, P_HIKEY);
            Size hikeySize = ItemIdGetLength(hikeyId);
            IndexTuple hikey = (IndexTuple)PageGetItem(origPage, hikeyId);

            if (PageAddItem(rebuilt, (Item)hikey, hikeySize, P_HIKEY, false, false) == InvalidOffsetNumber) {
                elog(ERROR, "deduplication failed to add highkey");
            }
        }

        for (OffsetNumber cur = firstOff; cur <= lastOff; cur = OffsetNumberNext(cur)) {
            ItemId curId = PageGetItemId(origPage, cur);
            IndexTuple curTuple = (IndexTuple)PageGetItem(origPage, curId);

            /* The first data item always starts a pending posting list. */
            if (cur == firstOff) {
                btree_dedup_begin(dedup, curTuple, cur);
                continue;
            }

            /* Does the logged interval for the pending list still have room? */
            bool extendsPending = dedup->num_intervals < xlrec->num_intervals &&
                                  dedup->base_off == loggedIntervals[dedup->num_intervals].base_off &&
                                  dedup->num_items < loggedIntervals[dedup->num_intervals].num_items;
            if (extendsPending) {
                if (!btree_dedup_merge(dedup, curTuple)) {
                    elog(ERROR, "deduplication failed to add heap tid to pending posting list");
                }
            } else {
                /* Flush the pending list and start a new one at this item. */
                btree_dedup_end(rebuilt, dedup);
                btree_dedup_begin(dedup, curTuple, cur);
            }
        }

        /* Flush whatever is still pending after the last item. */
        btree_dedup_end(rebuilt, dedup);
        Assert(dedup->num_intervals == xlrec->num_intervals);
        Assert(memcmp(dedup->intervals, loggedIntervals, dedup->num_intervals * sizeof(BTDedupIntervalData)) == 0);

        /* Drop the garbage hint on the rebuilt page if the original had it. */
        if (P_HAS_GARBAGE(origOpaque)) {
            BTPageOpaqueInternal rebuiltOpaque = (BTPageOpaqueInternal)PageGetSpecialPointer(rebuilt);
            rebuiltOpaque->btpo_flags &= ~BTP_HAS_GARBAGE;
        }

        PageRestoreTempPage(rebuilt, origPage);
        PageSetLSN(origPage, lsn);
        MarkBufferDirty(redobuf.buf);

        pfree(dedup->heap_tids);
        pfree(dedup);
    }

    if (BufferIsValid(redobuf.buf)) {
        UnlockReleaseBuffer(redobuf.buf);
    }
}
/*
 * btree_xlog_vacuum -- replay a btree vacuum record
 *
 * record            WAL record being replayed
 * is_dedup_upgrade  the block data also describes posting-list updates
 *
 * With is_dedup_upgrade the block data is read as: num_deleted offsets to
 * delete, then num_updated offsets of posting tuples to rewrite, then one
 * xl_btree_update header per updated tuple, each followed by its
 * num_deleted_tids uint16 entries. Posting tuples are rewritten first; only
 * the leading delete-offset array is then handed to
 * BtreeXlogVacuumOperatorPage(). Without is_dedup_upgrade the whole block
 * data is handed over.
 */
static void btree_xlog_vacuum(XLogReaderState *record, bool is_dedup_upgrade)
{
    xl_btree_vacuum_posting *xlrec = (xl_btree_vacuum_posting *)XLogRecGetData(record);
    RedoBufferInfo redobuf;

    /*
     * Like in btvacuumpage(), we need to take a cleanup lock on every leaf
     * page. See nbtree/README for details.
     */
    if (XLogReadBufferForRedoExtended(record, BTREE_VACUUM_ORIG_BLOCK_NUM, RBM_NORMAL, true, &redobuf) ==
        BLK_NEEDS_REDO) {
        char *ptr = NULL;
        Size len = 0;

        ptr = XLogRecGetBlockData(record, BTREE_VACUUM_ORIG_BLOCK_NUM, &len);

        if (is_dedup_upgrade) {
            /* update_len only feeds the length cross-check in the Assert below. */
            uint16 update_len PG_USED_FOR_ASSERTS_ONLY = 0;
            uint16 delete_len = (xlrec->num_deleted * sizeof(OffsetNumber));

            if (xlrec->num_updated > 0) {
                Page page = redobuf.pageinfo.page;
                /* The updated-offset array follows the deleted-offset array ... */
                OffsetNumber *updated_offsets = (OffsetNumber *)(ptr + xlrec->num_deleted * sizeof(OffsetNumber));
                /* ... and the variable-length update descriptors follow that. */
                xl_btree_update *updates =
                    (xl_btree_update *)((char *)updated_offsets + xlrec->num_updated * sizeof(OffsetNumber));

                update_len += (xlrec->num_updated * sizeof(OffsetNumber));

                for (int i = 0; i < xlrec->num_updated; i++) {
                    ItemId itemid = PageGetItemId(page, updated_offsets[i]);
                    IndexTuple orig_tuple = (IndexTuple)PageGetItem(page, itemid);
                    Size deleted_tids_size = updates->num_deleted_tids * sizeof(uint16);

                    /* Build a temporary vacuum-posting descriptor for this tuple. */
                    BTVacuumPosting vac_posting = (BTVacuumPosting)palloc(offsetof(BTVacuumPostingData, delete_tids) +
                                                                          deleted_tids_size);
                    vac_posting->updated_offset = updated_offsets[i];
                    vac_posting->itup = orig_tuple;
                    vac_posting->num_deleted_tids = updates->num_deleted_tids;
                    errno_t rc = memcpy_s(vac_posting->delete_tids,
                                          deleted_tids_size, (char *) updates + SizeOfBtreeUpdate, deleted_tids_size);
                    securec_check(rc, "", "");

                    /* vac_posting->itup is freed below, after it has been written to the page. */
                    btree_dedup_update_posting(vac_posting);

                    Size itemsz = MAXALIGN(IndexTupleSize(vac_posting->itup));
                    if (!page_index_tuple_overwrite(page, updated_offsets[i], (Item)vac_posting->itup, itemsz))
                        elog(PANIC, "failed to update partially dead item");

                    pfree(vac_posting->itup);
                    pfree(vac_posting);

                    /* Step over this descriptor and its TID array. */
                    update_len += (SizeOfBtreeUpdate + deleted_tids_size);
                    updates = (xl_btree_update *)((char *)updates + SizeOfBtreeUpdate + deleted_tids_size);
                }
            }

            Assert(len == (update_len + delete_len));
            BtreeXlogVacuumOperatorPage(&redobuf, (void *)xlrec, (void *)ptr, delete_len);
        } else {
            BtreeXlogVacuumOperatorPage(&redobuf, (void *)xlrec, (void *)ptr, len);
        }

        MarkBufferDirty(redobuf.buf);
    }
    if (BufferIsValid(redobuf.buf))
        UnlockReleaseBuffer(redobuf.buf);
}
/*
 * btree_xlog_delete -- replay a btree delete record
 *
 * Applies the record's main data to block BTREE_DELETE_ORIG_BLOCK_NUM through
 * BtreeXlogDeleteOperatorPage() when the page needs redo, then releases the
 * buffer if it is valid.
 */
static void btree_xlog_delete(XLogReaderState *record)
{
    RedoBufferInfo buffer;

    /*
     * If we have any conflict processing to do, it must happen before we
     * update the page.
     *
     * Btree delete records can conflict with standby queries. You might
     * think that vacuum records would conflict as well, but we've handled
     * that already. XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
     * cleaned by the vacuum of the heap and so we can resolve any conflicts
     * just once when that arrives. After that we know that no conflicts
     * exist from individual btree vacuum records on that index.
     *
     * XXX: In MPPDB, we don't support hot_standby query on standby.
     */
    if (XLogReadBufferForRedo(record, BTREE_DELETE_ORIG_BLOCK_NUM, &buffer) == BLK_NEEDS_REDO) {
        BtreeXlogDeleteOperatorPage(&buffer, (void *)XLogRecGetData(record), XLogRecGetDataLen(record));
        MarkBufferDirty(buffer.buf);
    }
    if (BufferIsValid(buffer.buf)) {
        UnlockReleaseBuffer(buffer.buf);
    }
}
/*
 * btree_xlog_delete_page -- replay a btree page deletion record
 *
 * info    record type; XLOG_BTREE_MARK_PAGE_HALFDEAD and
 *         XLOG_BTREE_UNLINK_PAGE_META get extra handling
 * record  WAL record being replayed
 *
 * Block references as used here: 0 is the target page, 1 the left sibling,
 * 2 the right sibling, 3 the parent and 4 the metapage.
 */
static void btree_xlog_delete_page(uint8 info, XLogReaderState *record)
{
    XLogRecPtr lsn = record->EndRecPtr;
    xl_btree_delete_page *xlrec = (xl_btree_delete_page *)XLogRecGetData(record);
    RelFileNode rnode;
    BlockNumber parent;
    BlockNumber target;
    BlockNumber leftsib;
    BlockNumber rightsib;
    XLogPhyBlock pblk;
    RedoBufferInfo buffer;
    Page page;
    BTPageOpaqueInternal pageop;

    leftsib = xlrec->leftblk;
    rightsib = xlrec->rightblk;

    XLogRecGetBlockTag(record, 0, &rnode, NULL, &target);
    XLogRecGetBlockTag(record, 3, NULL, NULL, &parent, &pblk);

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them. In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     *
     * parent page
     */
    if (XLogReadBufferForRedo(record, 3, &buffer) == BLK_NEEDS_REDO) {
        OffsetNumber poffset, maxoff;

        page = buffer.pageinfo.page;
        pageop = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
        poffset = xlrec->poffset;
        maxoff = PageGetMaxOffsetNumber(page);
        if (poffset >= maxoff) {
            /* The downlink is the last item: remove it and mark the parent half-dead. */
            Assert(info == XLOG_BTREE_MARK_PAGE_HALFDEAD);
            Assert(poffset == P_FIRSTDATAKEY(pageop));
            PageIndexTupleDelete(page, poffset);
            pageop->btpo_flags |= BTP_HALF_DEAD;
        } else {
            ItemId itemid;
            IndexTuple itup;
            OffsetNumber nextoffset;

            /* Redirect the target's downlink to the right sibling and drop the next item. */
            Assert(info != XLOG_BTREE_MARK_PAGE_HALFDEAD);
            itemid = PageGetItemId(page, poffset);
            itup = (IndexTuple)PageGetItem(page, itemid);
            ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY);
            nextoffset = OffsetNumberNext(poffset);
            PageIndexTupleDelete(page, nextoffset);
        }

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer.buf);
    }
    if (BufferIsValid(buffer.buf))
        UnlockReleaseBuffer(buffer.buf);

    /* Fix left-link of right sibling */
    if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) {
        page = buffer.pageinfo.page;
        pageop = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
        pageop->btpo_prev = leftsib;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer.buf);
    }
    if (BufferIsValid(buffer.buf))
        UnlockReleaseBuffer(buffer.buf);

    /* Fix right-link of left sibling, if any */
    if (leftsib != P_NONE) {
        if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO) {
            page = buffer.pageinfo.page;
            pageop = (BTPageOpaqueInternal)PageGetSpecialPointer(page);
            pageop->btpo_next = rightsib;

            PageSetLSN(page, lsn);
            MarkBufferDirty(buffer.buf);
        }
        if (BufferIsValid(buffer.buf))
            UnlockReleaseBuffer(buffer.buf);
    }

    /* Rewrite the target page from scratch as a deleted page */
    XLogRedoAction action = SSCheckInitPageXLog(record, 0, &buffer);
    if (action == BLK_NEEDS_REDO) {
        XLogInitBufferForRedo(record, 0, &buffer);
        page = buffer.pageinfo.page;

        _bt_pageinit(page, buffer.pageinfo.pagesize);
        pageop = (BTPageOpaqueInternal)PageGetSpecialPointer(page);

        pageop->btpo_prev = leftsib;
        pageop->btpo_next = rightsib;
        pageop->btpo_flags = BTP_DELETED;
        pageop->btpo_cycleid = 0;
        ((BTPageOpaque)pageop)->xact = xlrec->btpo_xact;

        PageSetLSN(page, lsn);
        MarkBufferDirty(buffer.buf);
    }
    UnlockReleaseBuffer(buffer.buf);

    /* Update metapage if needed */
    if (info == XLOG_BTREE_UNLINK_PAGE_META)
        _bt_restore_meta(record, 4);

    /* Forget any pending deletion of the target page */
    forget_matching_deletion(&rnode, target);

    /* If the parent became half-dead, remember it for deletion */
    if (info == XLOG_BTREE_MARK_PAGE_HALFDEAD) {
        log_incomplete_deletion(&rnode, parent, pblk);
    }
}
/*
 * btree_xlog_mark_page_halfdead -- replay marking a leaf page half-dead
 *
 * info    record type; not used by this function
 * record  WAL record being replayed
 *
 * The parent page (BTREE_HALF_DEAD_PARENT_PAGE_NUM) is updated first, then
 * the leaf page (BTREE_HALF_DEAD_LEAF_PAGE_NUM) is re-initialized unless
 * SSCheckInitPageXLogSimple() reports BLK_DONE for it.
 */
static void btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record)
{
    xl_btree_mark_page_halfdead *xlrec = (xl_btree_mark_page_halfdead *)XLogRecGetData(record);
    RedoBufferInfo pbuffer;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them. In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */
    if (XLogReadBufferForRedo(record, BTREE_HALF_DEAD_PARENT_PAGE_NUM, &pbuffer) == BLK_NEEDS_REDO) {
        BtreeXlogHalfdeadPageOperatorParentpage(&pbuffer, xlrec);
        MarkBufferDirty(pbuffer.buf);
    }
    if (BufferIsValid(pbuffer.buf)) {
        UnlockReleaseBuffer(pbuffer.buf);
    }

    /* Rewrite the leaf page */
    RedoBufferInfo lbuffer;
    if (SSCheckInitPageXLogSimple(record, BTREE_HALF_DEAD_LEAF_PAGE_NUM, &lbuffer) == BLK_DONE) {
        return;
    }
    XLogInitBufferForRedo(record, BTREE_HALF_DEAD_LEAF_PAGE_NUM, &lbuffer);
    BtreeXlogHalfdeadPageOperatorLeafpage(&lbuffer, xlrec);
    MarkBufferDirty(lbuffer.buf);
    UnlockReleaseBuffer(lbuffer.buf);
}
/*
 * btree_xlog_unlink_page -- replay unlinking a page from its siblings
 *
 * info    record type; XLOG_BTREE_UNLINK_PAGE_META additionally restores the
 *         metapage
 * record  WAL record being replayed
 *
 * Pages are processed in this order: right sibling, left sibling (when
 * xlrec->leftsib is not P_NONE), the unlinked page itself, the child page
 * (when the record references one) and finally the metapage.
 */
static void btree_xlog_unlink_page(uint8 info, XLogReaderState *record)
{
    xl_btree_unlink_page *xlrec = (xl_btree_unlink_page *)XLogRecGetData(record);
    BlockNumber leftsib = xlrec->leftsib;

    /*
     * In normal operation, we would lock all the pages this WAL record
     * touches before changing any of them. In WAL replay, it should be okay
     * to lock just one page at a time, since no concurrent index updates can
     * be happening, and readers should not care whether they arrive at the
     * target page or not (since it's surely empty).
     */
    RedoBufferInfo rbuffer;
    if (XLogReadBufferForRedo(record, BTREE_UNLINK_PAGE_RIGHT_NUM, &rbuffer) == BLK_NEEDS_REDO) {
        BtreeXlogUnlinkPageOperatorRightpage(&rbuffer, xlrec);
        MarkBufferDirty(rbuffer.buf);
    }
    if (BufferIsValid(rbuffer.buf)) {
        UnlockReleaseBuffer(rbuffer.buf);
    }

    /* Fix the left sibling, if there is one */
    if (leftsib != P_NONE) {
        RedoBufferInfo lbuffer;
        if (XLogReadBufferForRedo(record, BTREE_UNLINK_PAGE_LEFT_NUM, &lbuffer) == BLK_NEEDS_REDO) {
            BtreeXlogUnlinkPageOperatorLeftpage(&lbuffer, xlrec);
            MarkBufferDirty(lbuffer.buf);
        }
        if (BufferIsValid(lbuffer.buf)) {
            UnlockReleaseBuffer(lbuffer.buf);
        }
    }

    /* Rewrite the unlinked page itself */
    RedoBufferInfo buffer;
    XLogRedoAction action = SSCheckInitPageXLog(record, BTREE_UNLINK_PAGE_CUR_PAGE_NUM, &buffer);
    if (action == BLK_NEEDS_REDO) {
        XLogInitBufferForRedo(record, BTREE_UNLINK_PAGE_CUR_PAGE_NUM, &buffer);
        BtreeXlogUnlinkPageOperatorCurpage(&buffer, xlrec);
        MarkBufferDirty(buffer.buf);
    }
    UnlockReleaseBuffer(buffer.buf);

    /*
     * If we deleted a parent of the targeted leaf page, instead of the leaf
     * itself, update the leaf to point to the next remaining child in the
     * branch.
     */
    if (XLogRecHasBlockRef(record, BTREE_UNLINK_PAGE_CHILD_NUM)) {
        /*
         * There is no real data on the page, so we just re-create it from
         * scratch using the information from the WAL record.
         */
        RedoBufferInfo cbuffer;
        XLogRedoAction childAction = SSCheckInitPageXLog(record, BTREE_UNLINK_PAGE_CHILD_NUM, &cbuffer);
        if (childAction == BLK_NEEDS_REDO) {
            XLogInitBufferForRedo(record, BTREE_UNLINK_PAGE_CHILD_NUM, &cbuffer);
            BtreeXlogUnlinkPageOperatorChildpage(&cbuffer, xlrec);
            MarkBufferDirty(cbuffer.buf);
        }
        UnlockReleaseBuffer(cbuffer.buf);
    }

    /* Update metapage if needed */
    if (info == XLOG_BTREE_UNLINK_PAGE_META) {
        _bt_restore_meta(record, BTREE_UNLINK_PAGE_META_NUM);
    }
}
/*
 * btree_xlog_newroot_update -- replay a new-root record that uses the
 * BTREE_NEWROOT_*_BLOCK_NUM block references
 *
 * Rebuilds the root page, clears the incomplete-split flag on the left child
 * when the new root is above the leaf level, and restores the metapage.
 */
static void btree_xlog_newroot_update(XLogReaderState *record)
{
    xl_btree_newroot *xlrec = (xl_btree_newroot *)XLogRecGetData(record);

    /* Filled in by the root-page operator; not used further in this function. */
    BlockNumber downlink = 0;

    /* New root page. */
    RedoBufferInfo rootBuf;
    XLogRedoAction rootAction = SSCheckInitPageXLog(record, BTREE_NEWROOT_ORIG_BLOCK_NUM, &rootBuf);
    if (rootAction == BLK_NEEDS_REDO) {
        Size rootDataLen;
        char *rootData = NULL;

        XLogInitBufferForRedo(record, BTREE_NEWROOT_ORIG_BLOCK_NUM, &rootBuf);
        rootData = XLogRecGetBlockData(record, BTREE_NEWROOT_ORIG_BLOCK_NUM, &rootDataLen);
        BtreeXlogNewrootOperatorPage(&rootBuf, (void *)xlrec, (void *)rootData, rootDataLen, &downlink);
        MarkBufferDirty(rootBuf.buf);
    }
    UnlockReleaseBuffer(rootBuf.buf);

    /*
     * Left child: only read when the root is not a leaf. The buffer is
     * preset to invalid so the release below is skipped when it is not read.
     */
    RedoBufferInfo leftBuf;
    leftBuf.buf = InvalidBuffer;
    if (xlrec->level > 0) {
        if (XLogReadBufferForRedo(record, BTREE_NEWROOT_LEFT_BLOCK_NUM, &leftBuf) == BLK_NEEDS_REDO) {
            BtreeXlogClearIncompleteSplit(&leftBuf);
            MarkBufferDirty(leftBuf.buf);
        }
    }
    if (BufferIsValid(leftBuf.buf)) {
        UnlockReleaseBuffer(leftBuf.buf);
    }

    _bt_restore_meta(record, BTREE_NEWROOT_META_BLOCK_NUM);
}
/*
 * btree_xlog_newroot -- replay creation of a new btree root page
 *
 * record          WAL record being replayed
 * issplitupgrade  when true the record is handed to
 *                 btree_xlog_newroot_update() and nothing else is done here
 *
 * For the layout handled here, block 0 is the new root and block 1 the
 * metapage. For a root above the leaf level, the pending split whose right
 * block is the root's first-key downlink is forgotten.
 */
static void btree_xlog_newroot(XLogReaderState *record, bool issplitupgrade)
{
    if (issplitupgrade) {
        btree_xlog_newroot_update(record);
        return;
    }

    xl_btree_newroot *xlrec = (xl_btree_newroot *)XLogRecGetData(record);
    XLogRecPtr lsn = record->EndRecPtr;
    BlockNumber downlink = 0;
    RelFileNode rnode;
    RedoBufferInfo rootBuf;

    XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);

    XLogRedoAction rootAction = SSCheckInitPageXLog(record, 0, &rootBuf);
    if (rootAction == BLK_NEEDS_REDO) {
        XLogInitBufferForRedo(record, 0, &rootBuf);

        Page rootPage = rootBuf.pageinfo.page;
        _bt_pageinit(rootPage, rootBuf.pageinfo.pagesize);

        /* A level-0 root is both root and leaf. */
        BTPageOpaqueInternal rootOpaque = (BTPageOpaqueInternal)PageGetSpecialPointer(rootPage);
        rootOpaque->btpo_flags = BTP_ROOT;
        rootOpaque->btpo_next = P_NONE;
        rootOpaque->btpo_prev = P_NONE;
        rootOpaque->btpo.level = xlrec->level;
        if (xlrec->level == 0) {
            rootOpaque->btpo_flags |= BTP_LEAF;
        }
        rootOpaque->btpo_cycleid = 0;

        if (xlrec->level > 0) {
            /* Restore the logged items, then pick up the downlink stored in the first key. */
            Size itemsLen;
            char *items = XLogRecGetBlockData(record, 0, &itemsLen);
            _bt_restore_page(rootPage, items, itemsLen);

            IndexTuple firstKey = (IndexTuple)PageGetItem(rootPage, PageGetItemId(rootPage, P_FIRSTKEY));
            downlink = ItemPointerGetBlockNumber(&(firstKey->t_tid));
            Assert(ItemPointerGetOffsetNumber(&(firstKey->t_tid)) == P_HIKEY);
        }

        PageSetLSN(rootPage, lsn);
        MarkBufferDirty(rootBuf.buf);
    }
    UnlockReleaseBuffer(rootBuf.buf);

    /* A non-leaf root completes the split that produced its children. */
    if (xlrec->level > 0) {
        forget_matching_split(&rnode, downlink, true);
    }

    _bt_restore_meta(record, 1);
}
bool IsBtreeVacuum(const XLogReaderState *record)
{
uint8 info = (XLogRecGetInfo(record) & (~XLR_INFO_MASK));
if (XLogRecGetRmid(record) == RM_BTREE_ID) {
if ((info == XLOG_BTREE_REUSE_PAGE) || (info == XLOG_BTREE_VACUUM) || (info == XLOG_BTREE_DELETE) ||
(info == XLOG_BTREE_UNLINK_PAGE) || (info == XLOG_BTREE_UNLINK_PAGE_META) ||
(info == XLOG_BTREE_MARK_PAGE_HALFDEAD)) {
return true;
}
}
return false;
}
/*
 * Replay an XLOG_BTREE_REUSE_PAGE record.
 *
 * No page is modified.  The only possible action is resolving a recovery
 * conflict with running snapshots.
 */
static void btree_xlog_reuse_page(XLogReaderState *record)
{
    xl_btree_reuse_page *reuseRec = (xl_btree_reuse_page *)XLogRecGetData(record);
    RelFileNode targetNode;

    /*
     * Btree reuse_page records exist to provide a conflict point when we
     * reuse pages in the index via the FSM. That's all they do though.
     *
     * latestRemovedXid was the page's btpo.xact. The btpo.xact <
     * RecentGlobalXmin test in _bt_page_recyclable() conceptually mirrors the
     * pgxact->xmin > limitXmin test in GetConflictingVirtualXIDs().
     * Consequently, one XID value achieves the same exclusion effect on
     * master and standby.
     */
    RelFileNodeCopy(targetNode, reuseRec->node, XLogRecGetBucketId(record));

    /* Conflicts are only resolved in hot standby, and not under IS_EXRTO_READ. */
    const bool resolveConflict = InHotStandby && g_supportHotStandby && !IS_EXRTO_READ;
    if (!resolveConflict) {
        return;
    }

    XLogRecPtr recordStart = record->ReadRecPtr;
    ResolveRecoveryConflictWithSnapshot(reuseRec->latestRemovedXid, targetNode, recordStart);
}
/*
 * Redo entry point for btree WAL records.
 *
 * The opcode is the record's info byte with the XLR_INFO_MASK bits cleared.
 * Three flag bits are read from the same info byte and select the record
 * layout handed to the individual replay routines:
 *   BTREE_SPLIT_UPGRADE_FLAG   - insert / split / newroot records
 *   BTREE_DELETE_UPGRADE_FLAG  - unlink-page / mark-half-dead records
 *   BTREE_DEDUPLICATION_FLAG   - split / vacuum records
 *
 * An unrecognised opcode is reported at PANIC level.
 */
void btree_redo(XLogReaderState *record)
{
    const uint8 opcode = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    const bool splitUpgraded = (XLogRecGetInfo(record) & BTREE_SPLIT_UPGRADE_FLAG) != 0;
    const bool deleteUpgraded = (XLogRecGetInfo(record) & BTREE_DELETE_UPGRADE_FLAG) != 0;
    const bool dedupUpgraded = (XLogRecGetInfo(record) & BTREE_DEDUPLICATION_FLAG) != 0;

    switch (opcode) {
        /* Inserts: (isleaf, ismeta, record, upgrade, posting). */
        case XLOG_BTREE_INSERT_LEAF:
            btree_xlog_insert(true, false, record, splitUpgraded, false);
            break;
        case XLOG_BTREE_INSERT_UPPER:
            btree_xlog_insert(false, false, record, splitUpgraded, false);
            break;
        case XLOG_BTREE_INSERT_META:
            btree_xlog_insert(false, true, record, splitUpgraded, false);
            break;
        case XLOG_BTREE_INSERT_POST:
            btree_xlog_insert(true, false, record, splitUpgraded, true);
            break;

        /* Splits: (onleft, isroot, record, upgrade, dedup). */
        case XLOG_BTREE_SPLIT_L:
            btree_xlog_split(true, false, record, splitUpgraded, dedupUpgraded);
            break;
        case XLOG_BTREE_SPLIT_R:
            btree_xlog_split(false, false, record, splitUpgraded, dedupUpgraded);
            break;
        case XLOG_BTREE_SPLIT_L_ROOT:
            btree_xlog_split(true, true, record, splitUpgraded, dedupUpgraded);
            break;
        case XLOG_BTREE_SPLIT_R_ROOT:
            btree_xlog_split(false, true, record, splitUpgraded, dedupUpgraded);
            break;

        /* Tuple removal. */
        case XLOG_BTREE_VACUUM:
            btree_xlog_vacuum(record, dedupUpgraded);
            break;
        case XLOG_BTREE_DELETE:
            btree_xlog_delete(record);
            break;

        /* Page deletion: records without the delete-upgrade flag use btree_xlog_delete_page(). */
        case XLOG_BTREE_UNLINK_PAGE:
        case XLOG_BTREE_UNLINK_PAGE_META:
            if (deleteUpgraded) {
                btree_xlog_unlink_page(opcode, record);
            } else {
                btree_xlog_delete_page(opcode, record);
            }
            break;
        case XLOG_BTREE_MARK_PAGE_HALFDEAD:
            if (deleteUpgraded) {
                btree_xlog_mark_page_halfdead(opcode, record);
            } else {
                btree_xlog_delete_page(opcode, record);
            }
            break;

        case XLOG_BTREE_NEWROOT:
            btree_xlog_newroot(record, splitUpgraded);
            break;
        case XLOG_BTREE_REUSE_PAGE:
            btree_xlog_reuse_page(record);
            break;
        case XLOG_BTREE_DEDUP:
            btree_xlog_dedup(record);
            break;

        default:
            ereport(PANIC, (errmsg("btree_redo: unknown op code %hhu", opcode)));
            break;
    }
}
/*
 * Start btree replay with an empty incomplete-action list.
 *
 * Only the list pointer in t_thrd.xlog_cxt is reset to NIL; nothing is freed
 * here.
 */
void btree_xlog_startup(void)
{
t_thrd.xlog_cxt.incomplete_actions = NIL;
}
/*
 * Complete a page split whose parent insertion was not found in the WAL.
 *
 * action - pending split; its node, leftblk/rightblk and leftpblk/rightpblk
 *          fields identify the two halves
 *
 * Both halves are read and exclusively locked, then _bt_insert_parent() is
 * invoked through a fake relcache entry.  A missing half is reported at PANIC
 * level.  Neither buffer is released in this function.
 * NOTE(review): presumably _bt_insert_parent() releases both buffers - confirm.
 */
void btree_xlog_finish_incomplete_split(bt_incomplete_action *action)
{
    /* Left half. */
    Buffer leftBuf =
        XLogReadBufferExtended(action->node, MAIN_FORKNUM, action->leftblk, RBM_NORMAL, &action->leftpblk);
    if (!BufferIsValid(leftBuf)) {
        ereport(PANIC, (errmsg("btree_xlog_cleanup: left block unfound")));
    }
    LockBuffer(leftBuf, BUFFER_LOCK_EXCLUSIVE);
    Page leftPage = (Page)BufferGetPage(leftBuf);
    BTPageOpaqueInternal leftOpaque = (BTPageOpaqueInternal)PageGetSpecialPointer(leftPage);

    /* Right half. */
    Buffer rightBuf =
        XLogReadBufferExtended(action->node, MAIN_FORKNUM, action->rightblk, RBM_NORMAL, &action->rightpblk);
    if (!BufferIsValid(rightBuf)) {
        ereport(PANIC, (errmsg("btree_xlog_cleanup: right block unfound")));
    }
    LockBuffer(rightBuf, BUFFER_LOCK_EXCLUSIVE);
    Page rightPage = (Page)BufferGetPage(rightBuf);
    BTPageOpaqueInternal rightOpaque = (BTPageOpaqueInternal)PageGetSpecialPointer(rightPage);

    /* The two halves are the only pages on their level when nothing lies beyond either side. */
    bool onlyPagesOnLevel = P_LEFTMOST(leftOpaque) && P_RIGHTMOST(rightOpaque);

    Relation fakeRel = CreateFakeRelcacheEntry(action->node);
    _bt_insert_parent(fakeRel, leftBuf, rightBuf, NULL, action->is_root, onlyPagesOnLevel);
    FreeFakeRelcacheEntry(fakeRel);
}
void btree_xlog_finish_incomplete_deletion(const bt_incomplete_action *action)
{
Buffer buf;
buf = XLogReadBufferExtended(action->node, MAIN_FORKNUM, action->delblk, RBM_NORMAL, &action->delpblk);
if (BufferIsValid(buf)) {
Relation reln;
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
reln = CreateFakeRelcacheEntry(action->node);
if (_bt_pagedel(reln, buf, NULL) == 0) {
ereport(PANIC, (errmsg("btree_xlog_cleanup: _bt_pagedel failed")));
}
FreeFakeRelcacheEntry(reln);
}
}
void *btree_get_incomplete_actions()
{
List *incompleteActions = t_thrd.xlog_cxt.incomplete_actions;
t_thrd.xlog_cxt.incomplete_actions = NIL;
return incompleteActions;
}
void btree_xlog_cleanup(void)
{
ListCell *l = NULL;
TimestampTz start_time = GetCurrentTimestamp();
int64 duration = 0;
MemoryContext oldCtx = NULL;
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
oldCtx = MemoryContextSwitchTo(g_instance.comm_cxt.predo_cxt.parallelRedoCtx);
}
if (log_min_messages <= DEBUG4) {
ereport(LOG, (errmsg("[BTREE_ACTION_TRACE]btree_xlog_cleanup")));
}
foreach (l, t_thrd.xlog_cxt.incomplete_actions) {
bt_incomplete_action *action = (bt_incomplete_action *)lfirst(l);
t_thrd.xlog_cxt.imcompleteActionCnt++;
ereport(WARNING, (errmsg("[BTREE_ACTION_TRACE]btree_xlog_cleanup: action spc:%u,db:%u,rel:%u,"
"is_split:%d,is_root:%d,leftblk:%u,rightblk:%u,level:%u,delblk:%u,happen:%u,enable:%u",
action->node.spcNode, action->node.dbNode, action->node.relNode, action->is_split,
action->is_root, action->leftblk, action->rightblk, action->level, action->delblk,
t_thrd.xlog_cxt.forceFinishHappened,
g_instance.attr.attr_storage.enable_update_max_page_flush_lsn)));
if (FORCE_FINISH_ENABLED) {
continue;
}
if (action->is_split && action->is_root) {
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
MemoryContext ctx = MemoryContextSwitchTo(oldCtx);
btree_xlog_finish_incomplete_split(action);
(void)MemoryContextSwitchTo(ctx);
} else {
btree_xlog_finish_incomplete_split(action);
}
}
}
foreach (l, t_thrd.xlog_cxt.incomplete_actions) {
bt_incomplete_action *action = (bt_incomplete_action *)lfirst(l);
ereport(WARNING, (errmsg("[BTREE_ACTION_TRACE]btree_xlog_cleanup2: action spc:%u,db:%u,rel:%u,"
"is_split:%u,is_root:%u,leftblk:%u,rightblk:%u,level:%u,delblk:%u,happen:%u,enable:%u",
action->node.spcNode, action->node.dbNode, action->node.relNode, action->is_split,
action->is_root, action->leftblk, action->rightblk, action->level, action->delblk,
t_thrd.xlog_cxt.forceFinishHappened,
g_instance.attr.attr_storage.enable_update_max_page_flush_lsn)));
if (FORCE_FINISH_ENABLED) {
continue;
}
if (action->is_split) {
if (!action->is_root) {
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
MemoryContext ctx = MemoryContextSwitchTo(oldCtx);
btree_xlog_finish_incomplete_split(action);
(void)MemoryContextSwitchTo(ctx);
} else {
btree_xlog_finish_incomplete_split(action);
}
}
} else {
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
MemoryContext ctx = MemoryContextSwitchTo(oldCtx);
btree_xlog_finish_incomplete_deletion(action);
(void)MemoryContextSwitchTo(ctx);
} else {
btree_xlog_finish_incomplete_deletion(action);
}
}
}
t_thrd.xlog_cxt.incomplete_actions = NIL;
if (get_real_recovery_parallelism() > 1 && (!parallel_recovery::DispatchPtrIsNull())) {
(void)MemoryContextSwitchTo(oldCtx);
}
duration = GetCurrentTimestamp() - start_time;
ereport(LOG, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
errmsg("btree_xlog_cleanup is over, it takes time:%ld microseconds", duration)));
}
bool btree_safe_restartpoint(void)
{
if (t_thrd.xlog_cxt.incomplete_actions)
return false;
return true;
}
/*
 * Run btree cleanup for the current incomplete-action list.
 *
 * When get_real_recovery_parallelism() is above 1 and DispatchPtrIsNull()
 * reports false, the list is first passed through
 * parallel_recovery::CheckImcompleteAction() and replaced by its result.
 * btree_xlog_cleanup() is then called in every case.
 */
void btree_clear_imcompleteAction()
{
if ((get_real_recovery_parallelism() > 1) && (!parallel_recovery::DispatchPtrIsNull())) {
/*
 * NOTE(review): SwitchToDispatcherContext()/EndDispatcherContext() look
 * like a paired enter/leave of the dispatcher's memory context; confirm
 * against their definitions before reordering anything between them.
 */
SwitchToDispatcherContext();
t_thrd.xlog_cxt.incomplete_actions =
parallel_recovery::CheckImcompleteAction(t_thrd.xlog_cxt.incomplete_actions);
EndDispatcherContext();
}
/* Runs regardless of the recovery mode. */
btree_xlog_cleanup();
}