*
* knl_uredo.cpp
* WAL replay logic for uheap.
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/storage/access/ustore/knl_uredo.cpp
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/parallel_recovery/page_redo.h"
#include "access/xlogutils.h"
#include "catalog/pg_tablespace.h"
#include "storage/standby.h"
#include "access/ustore/knl_uredo.h"
#include "access/ustore/knl_upage.h"
#include "access/ustore/knl_undorequest.h"
#include "access/ustore/knl_uverify.h"
#include "access/xlogutils.h"
#include "access/xlogproc.h"
#include "access/ustore/knl_uheap.h"
#include "access/ustore/undo/knl_uundoxlog.h"
#include "access/ustore/knl_whitebox_test.h"
#include "securec.h"
#include "storage/freespace.h"
#include "access/ustore/undo/knl_uundoapi.h"
#include "access/multi_redo_api.h"
using namespace undo;
static const int FREESPACE_FRACTION = 5;
union TupleBuffer {
UHeapDiskTupleData hdr;
char data[MaxPossibleUHeapTupleSize];
};
struct UpdateRedoBuffers {
RedoBufferInfo oldbuffer;
RedoBufferInfo newbuffer;
};
struct UpdateRedoTuples {
UHeapTupleData *oldtup;
UHeapDiskTuple newtup;
};
struct UpdateRedoAffixLens {
uint16 prefixlen;
uint16 suffixlen;
};
static UHeapDiskTuple GetUHeapDiskTupleFromRedoData(char *data, Size *datalen,
TupleBuffer &tbuf)
{
UHeapDiskTuple disktup;
XlUHeapHeader xlhdr;
errno_t rc;
rc = memcpy_s((char *)&xlhdr, SizeOfUHeapHeader, data, SizeOfUHeapHeader);
securec_check(rc, "\0", "\0");
data += SizeOfUHeapHeader;
disktup = &tbuf.hdr;
rc = memset_s((char *)disktup, SizeOfUHeapDiskTupleData, 0, SizeOfUHeapDiskTupleData);
securec_check(rc, "\0", "\0");
rc = memcpy_s((char *)disktup + SizeOfUHeapDiskTupleData, *datalen, data, *datalen);
securec_check(rc, "\0", "\0");
*datalen += SizeOfUHeapDiskTupleData;
UHeapTupleHeaderSetTDSlot(disktup, xlhdr.td_id);
disktup->xid = (ShortTransactionId)FrozenTransactionId;
disktup->flag2 = xlhdr.flag2;
disktup->flag = xlhdr.flag;
disktup->t_hoff = xlhdr.t_hoff;
FastVerifyUTuple(disktup, InvalidBuffer);
return disktup;
}
static UndoRecPtr PrepareAndInsertUndoRecordForInsertRedo(XLogReaderState *record,
const BlockNumber blkno, uint32 *skipSize, const bool tryPrepare)
{
XLogRecPtr lsn = record->EndRecPtr;
TransactionId xid = XLogRecGetXid(record);
undo::XlogUndoMeta undometa;
UndoRecPtr *blkprev;
UndoRecPtr *prevurp;
Oid *partitionOid;
UndoRecPtr invalidUrp = INVALID_UNDO_REC_PTR;
Oid invalidPartitionOid = 0;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
blkprev = (UndoRecPtr *) ((char *)currLogPtr);
currLogPtr += sizeof(UndoRecPtr);
*skipSize += sizeof(UndoRecPtr);
} else {
blkprev = &invalidUrp;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
prevurp = (UndoRecPtr *) ((char *)currLogPtr);
currLogPtr += sizeof(UndoRecPtr);
*skipSize += sizeof(UndoRecPtr);
} else {
prevurp = &invalidUrp;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
partitionOid = (Oid *) ((char *)currLogPtr);
currLogPtr += sizeof(Oid);
*skipSize += sizeof(Oid);
} else {
partitionOid = &invalidPartitionOid;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0) {
currLogPtr += sizeof(TransactionId);
*skipSize += sizeof(TransactionId);
}
XlogUndoMeta *xlundometa = (XlogUndoMeta *)((char *)currLogPtr);
UndoRecPtr urecptr = xlundohdr->urecptr;
CopyUndoMeta(*xlundometa, undometa);
*skipSize += undometa.Size();
if (tryPrepare) {
bool skipInsert = IsSkipInsertUndo(urecptr);
if (skipInsert) {
undometa.SetInfo(XLOG_UNDOMETA_INFO_SKIP);
}
* wrote those information in the xlundohdr because we can grab them from the XLOG record itself.
*/
RelFileNode targetNode = { 0 };
bool res PG_USED_FOR_ASSERTS_ONLY = XLogRecGetBlockTag(record, 0, &targetNode, NULL, NULL);
Assert(res == true);
UndoRecord *undorec = (*t_thrd.ustore_cxt.urecvec)[0];
undorec->SetUrp(urecptr);
urecptr = UHeapPrepareUndoInsert(xlundohdr->relOid, *partitionOid,
targetNode.relNode, targetNode.spcNode, UNDO_PERMANENT,
xid, 0, *blkprev, *prevurp, blkno, xlundohdr, &undometa);
ereport(DEBUG2, (errmodule(MOD_UNDO),
errmsg(UNDOFORMAT("redo:undoptr=%lu, xid %lu, partoid=%u, spcoid=%u."),
urecptr, xid, *partitionOid, targetNode.spcNode)));
Assert(UNDO_PTR_GET_OFFSET(urecptr) == UNDO_PTR_GET_OFFSET(xlundohdr->urecptr));
undorec->SetOffset(xlrec->offnum);
if (!skipInsert) {
InsertPreparedUndo(t_thrd.ustore_cxt.urecvec, lsn);
}
undo::RedoUndoMeta(record, &undometa, xlundohdr->urecptr, t_thrd.ustore_cxt.urecvec->LastRecord(),
t_thrd.ustore_cxt.urecvec->LastRecordSize());
UndoRecordVerify(undorec);
UHeapResetPreparedUndo();
}
return urecptr;
}
static XLogRedoAction GetInsertRedoAction(XLogReaderState *record, RedoBufferInfo *buffer, const uint32 skipSize)
{
XLogRedoAction action;
if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) {
bool hasCSN = XLogRecHasCSN(record);
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
XlUndoHeader *xlundohdr =
(XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert + SizeOfXLOGCSN(hasCSN));
TransactionId *xidBase = (TransactionId *)((char *)xlundohdr + SizeOfXLUndoHeader + skipSize);
uint16 *tdCount = (uint16 *)((char *)xidBase + sizeof(TransactionId));
XLogInitBufferForRedo(record, 0, buffer);
Page page = buffer->pageinfo.page;
if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_TOAST_PAGE) {
UPageInit<UPAGE_TOAST>(page, buffer->pageinfo.pagesize, UHEAP_SPECIAL_SIZE);
} else {
UPageInit<UPAGE_HEAP>(page, buffer->pageinfo.pagesize, UHEAP_SPECIAL_SIZE, *tdCount);
}
UHeapPageHeaderData *uheappage = (UHeapPageHeaderData *)page;
uheappage->pd_xid_base = *xidBase;
uheappage->pd_multi_base = 0;
action = BLK_NEEDS_REDO;
} else {
action = XLogReadBufferForRedo(record, 0, buffer);
}
return action;
}
static void PerformInsertRedoAction(XLogReaderState *record, const Buffer buf, const UndoRecPtr urecptr,
TupleBuffer &tbuf)
{
TransactionId xid = XLogRecGetXid(record);
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
Size datalen;
int tdSlotId;
UHeapBufferPage bufpage;
char *data = XLogRecGetBlockData(record, 0, &datalen);
if (datalen < SizeOfUHeapHeader) {
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("Datalen less than SizeOfUHeapHeader")));
}
Size newlen = datalen - SizeOfUHeapHeader;
Page page = BufferGetPage(buf);
if (UHeapPageGetMaxOffsetNumber(page) + 1 < xlrec->offnum) {
UPagePrintErrorInfo(page, "The max offset number is invalid");
}
* For UHeap, in case of "SELECT INTO" statement, length of data will
* be equal to the UHeap header size, but in heap, it will be always
* greater than heap header size, because in heap, we have one byte
* alignment in case of zero byte data length.
*/
Assert(datalen > SizeOfUHeapHeader && newlen <= MaxPossibleUHeapTupleSize);
UHeapDiskTuple utup = GetUHeapDiskTupleFromRedoData(data, &newlen, tbuf);
bufpage.buffer = buf;
bufpage.page = NULL;
FastVerifyUTuple(utup, InvalidBuffer);
if (UPageAddItem(NULL, &bufpage, (Item)utup, newlen, xlrec->offnum, true) == InvalidOffsetNumber) {
UPagePrintErrorInfo(page, "UPageAddItem failed");
}
UHeapRecordPotentialFreeSpace(buf, -1 * SHORTALIGN(newlen));
tdSlotId = UHeapTupleHeaderGetTDSlot(utup);
UHeapPageSetUndo(buf, tdSlotId, xid, urecptr);
PageSetLSN(page, record->EndRecPtr);
MarkBufferDirty(buf);
}
void UHeapXlogInsert(XLogReaderState *record)
{
RedoBufferInfo buffer = { 0 };
RelFileNode targetNode;
BlockNumber blkno = InvalidBlockNumber;
XLogRedoAction action;
uint32 skipSize = 0;
TupleBuffer tbuf;
bool allReplay = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
bool onlyReplayUndo = allReplay ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();
WHITEBOX_TEST_STUB(UHEAP_XLOG_INSERT_FAILED, WhiteboxDefaultErrorEmit);
XLogRecGetBlockTag(record, 0, &targetNode, NULL, &blkno);
UndoRecPtr urecptr = PrepareAndInsertUndoRecordForInsertRedo(record, blkno, &skipSize,
(allReplay || onlyReplayUndo));
if (allReplay || !onlyReplayUndo) {
#ifdef ENABLE_HTAP
if (HAVE_HTAP_TABLES) {
ItemPointerData ctid;
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
ItemPointerSet(&ctid, blkno, xlrec->offnum);
IMCStoreInsertHook(targetNode.relNode, &ctid, XLogRecGetXid(record));
}
#endif
action = GetInsertRedoAction(record, &buffer, skipSize);
if (action == BLK_NEEDS_REDO) {
PerformInsertRedoAction(record, buffer.buf, urecptr, tbuf);
Page page = BufferGetPage(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, &targetNode, blkno, true);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
}
static UndoRecPtr PrepareAndInsertUndoRecordForDeleteRedo(XLogReaderState *record,
UHeapTupleData *utup, const BlockNumber blkno, const bool tryPrepare, TupleBuffer &tbuf)
{
XLogRecPtr lsn = record->EndRecPtr;
TransactionId xid = XLogRecGetXid(record);
Size recordlen = XLogRecGetDataLen(record);
undo::XlogUndoMeta undometa;
UndoRecPtr *blkprev;
UndoRecPtr *prevurp;
Oid *partitionOid;
bool *hasSubXact;
UndoRecPtr invalidUrp = INVALID_UNDO_REC_PTR;
Oid invalidPartitionOid = 0;
bool defaultHasSubXact = false;
uint32 readSize = 0;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete + SizeOfXLOGCSN(hasCSN));
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_SUB_XACT) != 0) {
hasSubXact = (bool *) ((char *)currLogPtr);
currLogPtr += sizeof(bool);
readSize += sizeof(bool);
} else {
hasSubXact = &defaultHasSubXact;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
blkprev = (UndoRecPtr *) ((char *)currLogPtr);
currLogPtr += sizeof(UndoRecPtr);
readSize += sizeof(UndoRecPtr);
} else {
blkprev = &invalidUrp;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
prevurp = (UndoRecPtr *) ((char *)currLogPtr);
currLogPtr += sizeof(UndoRecPtr);
readSize += sizeof(UndoRecPtr);
} else {
prevurp = &invalidUrp;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
partitionOid = (Oid *) ((char *)currLogPtr);
currLogPtr += sizeof(Oid);
readSize += sizeof(Oid);
} else {
partitionOid = &invalidPartitionOid;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0) {
currLogPtr += sizeof(TransactionId);
readSize += sizeof(TransactionId);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0) {
uint32 toastLen = *(uint32 *)currLogPtr;
currLogPtr += sizeof(toastLen) + toastLen;
readSize += sizeof(uint32) + toastLen;
}
undo::XlogUndoMeta *xlundometa = (undo::XlogUndoMeta *)((char *)currLogPtr);
UndoRecPtr urecptr = xlundohdr->urecptr;
CopyUndoMeta(*xlundometa, undometa);
uint32 undoMetaSize = undometa.Size();
* If the WAL stream contains undo tuple, then replace it with the
* explicitly stored tuple.
*/
Size datalen = recordlen - SizeOfXLUndoHeader - SizeOfUHeapDelete - undoMetaSize - SizeOfUHeapHeader -
readSize - SizeOfXLOGCSN(hasCSN);
char *data = (char *)xlrec + SizeOfUHeapDelete + SizeOfXLUndoHeader + undoMetaSize + readSize +
SizeOfXLOGCSN(hasCSN);
utup->disk_tuple = GetUHeapDiskTupleFromRedoData(data, &datalen, tbuf);
utup->disk_tuple_size = datalen;
if (tryPrepare) {
bool skipInsert = IsSkipInsertUndo(urecptr);
if (skipInsert) {
undometa.SetInfo(XLOG_UNDOMETA_INFO_SKIP);
}
TD oldTD;
oldTD.xactid = xlrec->oldxid;
oldTD.undo_record_ptr = INVALID_UNDO_REC_PTR;
UndoRecord *undorec = (*t_thrd.ustore_cxt.urecvec)[0];
undorec->SetUrp(urecptr);
* wrote those information in the xlundohdr because we can grab them from the XLOG record itself.
*/
RelFileNode targetNode = { 0 };
bool res PG_USED_FOR_ASSERTS_ONLY = XLogRecGetBlockTag(record, 0, &targetNode, NULL, NULL);
Assert(res == true);
urecptr = UHeapPrepareUndoDelete(xlundohdr->relOid, *partitionOid,
targetNode.relNode, targetNode.spcNode, UNDO_PERMANENT,
InvalidBuffer, xlrec->offnum, xid,
*hasSubXact ? TopSubTransactionId : InvalidSubTransactionId, 0,
*blkprev, *prevurp, &oldTD, utup, blkno, xlundohdr, &undometa);
Assert(UNDO_PTR_GET_OFFSET(urecptr) == UNDO_PTR_GET_OFFSET(xlundohdr->urecptr));
undorec->SetOffset(xlrec->offnum);
if (!skipInsert) {
InsertPreparedUndo(t_thrd.ustore_cxt.urecvec, lsn);
}
undo::RedoUndoMeta(record, &undometa, xlundohdr->urecptr, t_thrd.ustore_cxt.urecvec->LastRecord(),
t_thrd.ustore_cxt.urecvec->LastRecordSize());
UndoRecordVerify(undorec);
UHeapResetPreparedUndo();
}
return urecptr;
}
static void PerformDeleteRedoAction(XLogReaderState *record, UHeapTupleData *utup,
RedoBufferInfo *buffer, const UndoRecPtr urecptr)
{
XLogRecPtr lsn = record->EndRecPtr;
XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record);
TransactionId xid = XLogRecGetXid(record);
Size datalen = utup->disk_tuple_size;
Page page = buffer->pageinfo.page;
RowPtr *rp;
if (UHeapPageGetMaxOffsetNumber(page) >= xlrec->offnum) {
rp = UPageGetRowPtr(page, xlrec->offnum);
} else {
UPagePrintErrorInfo(page, "The max offset number is invalid");
}
UHeapRecordPotentialFreeSpace(buffer->buf, SHORTALIGN(datalen));
utup->disk_tuple = (UHeapDiskTuple)UPageGetRowData(page, rp);
utup->disk_tuple_size = RowPtrGetLen(rp);
utup->disk_tuple->xid = (ShortTransactionId)FrozenTransactionId;
UHeapTupleHeaderSetTDSlot(utup->disk_tuple, xlrec->td_id);
utup->disk_tuple->flag = xlrec->flag;
UHeapPageSetUndo(buffer->buf, xlrec->td_id, xid, urecptr);
UPageSetPrunable(page, xid);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer->buf);
}
static void UHeapXlogDelete(XLogReaderState *record)
{
RedoBufferInfo buffer = { 0 };
UHeapTupleData utup;
RelFileNode targetNode;
BlockNumber blkno = InvalidBlockNumber;
ItemPointerData targetTid;
XLogRedoAction action;
TupleBuffer tbuf;
XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete);
bool allReplay = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
bool onlyReplayUndo = allReplay ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();
WHITEBOX_TEST_STUB(UHEAP_XLOG_DELETE_FAILED, WhiteboxDefaultErrorEmit);
XLogRecGetBlockTag(record, 0, &targetNode, NULL, &blkno);
ItemPointerSetBlockNumber(&targetTid, blkno);
ItemPointerSetOffsetNumber(&targetTid, xlrec->offnum);
utup.table_oid = xlundohdr->relOid;
utup.ctid = targetTid;
utup.t_xid_base = InvalidTransactionId;
UndoRecPtr urecptr = PrepareAndInsertUndoRecordForDeleteRedo(record, &utup, blkno,
(allReplay || onlyReplayUndo), tbuf);
if (allReplay || !onlyReplayUndo) {
#ifdef ENABLE_HTAP
if (HAVE_HTAP_TABLES) {
IMCStoreDeleteHook(targetNode.relNode, &targetTid, XLogRecGetXid(record));
}
#endif
action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO) {
PerformDeleteRedoAction(record, &utup, &buffer, urecptr);
Page page = BufferGetPage(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, &targetNode, blkno, true);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
}
static void UHeapXlogFreezeTdSlot(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
XlUHeapFreezeTdSlot *xlrec = (XlUHeapFreezeTdSlot *)XLogRecGetData(record);
int *frozenSlots = (int *)((char *)xlrec + SizeOfUHeapFreezeTDSlot);
RedoBufferInfo buffer = { 0 };
Page page;
XLogRedoAction action;
RelFileNode rnode = ((RelFileNode) {0, 0, 0, -1});
BlockNumber blkno;
int nFrozen = xlrec->nFrozen;
int slotNo = 0;
WHITEBOX_TEST_STUB(UHEAP_XLOG_FREEZE_TD_SLOT_FAILED, WhiteboxDefaultErrorEmit);
(void) XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
if (InHotStandby && TransactionIdIsValid(xlrec->latestFrozenXid)) {
ResolveRecoveryConflictWithSnapshot(xlrec->latestFrozenXid, rnode, record->ReadRecPtr);
}
action = XLogReadBufferForRedo(record, 0, &buffer);
* skip redo processing when action is BLK_NOTFOUND, which indicates
* a truncated or deleted block
*/
if (action == BLK_NOTFOUND)
return;
if (action == BLK_NEEDS_REDO) {
page = buffer.pageinfo.page;
UHeapPageTDData *tdPtr = (UHeapPageTDData *)PageGetTDPointer(page);
TD *transinfo = tdPtr->td_info;
UHeapFreezeOrInvalidateTuples(buffer.buf, nFrozen, frozenSlots, true);
for (int i = 0; i < nFrozen; i++) {
TD *thistrans;
slotNo = frozenSlots[i];
thistrans = &transinfo[slotNo];
Assert(!TransactionIdFollows(thistrans->xactid, xlrec->latestFrozenXid));
thistrans->xactid = InvalidTransactionId;
thistrans->undo_record_ptr = INVALID_UNDO_REC_PTR;
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, &rnode, blkno, true);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
static void UHeapXlogInvalidTdSlot(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
uint16 *nCompletedSlots = (uint16 *)XLogRecGetData(record);
int *completedXactSlots = (int *)((char *)nCompletedSlots + sizeof(uint16));
RedoBufferInfo buffer = { 0 };
Page page;
XLogRedoAction action;
int slotNo = 0;
BlockNumber blkno = InvalidBlockNumber;
WHITEBOX_TEST_STUB(UHEAP_XLOG_INVALID_TD_SLOT_FAILED, WhiteboxDefaultErrorEmit);
action = XLogReadBufferForRedo(record, 0, &buffer);
(void) XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
* skip redo processing when action is BLK_NOTFOUND, which indicates
* a truncated or deleted block
*/
if (action == BLK_NOTFOUND)
return;
if (action == BLK_NEEDS_REDO) {
page = buffer.pageinfo.page;
UHeapPageTDData *tdPtr = (UHeapPageTDData *)PageGetTDPointer(page);
TD *transinfo = tdPtr->td_info;
UHeapFreezeOrInvalidateTuples(buffer.buf, *nCompletedSlots, completedXactSlots, false);
for (int i = 0; i < *nCompletedSlots; i++) {
slotNo = completedXactSlots[i];
transinfo[slotNo].xactid = InvalidTransactionId;
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, NULL, blkno, true);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
static void PerformCleanRedoAction(XLogReaderState *record, RedoBufferInfo *buffer, Size *freespace)
{
XlUHeapClean *xlrec = (XlUHeapClean *)XLogRecGetData(record);
XLogRecPtr lsn = record->EndRecPtr;
Page page = buffer->pageinfo.page;
Size datalen;
UPruneState prstate;
int ndeleted = xlrec->ndeleted;
int ndead = xlrec->ndead;
OffsetNumber *deleted = (OffsetNumber *)XLogRecGetBlockData(record, 0, &datalen);
OffsetNumber *end = (OffsetNumber *)((char *)deleted + datalen);
OffsetNumber *nowdead = deleted + (ndeleted * 2);
OffsetNumber *nowunused = nowdead + ndead;
OffsetNumber *nowfixed;
OffsetNumber *targetOffnum;
OffsetNumber tmpTargetOff;
OffsetNumber *offnum;
Size *spaceRequired;
Size tmpSpcRqd;
int i;
int tmpunused = 0;
int *nunused = &tmpunused;
uint16 tmpfixed = 0;
uint16 *nfixed = &tmpfixed;
char *unused = (char *)xlrec + SizeOfUHeapClean;
if (xlrec->flags & XLZ_CLEAN_CONTAINS_OFFSET) {
targetOffnum = (OffsetNumber *)((char *)xlrec + SizeOfUHeapClean);
spaceRequired = (Size *)((char *)targetOffnum + sizeof(OffsetNumber));
unused = ((char *)spaceRequired + sizeof(Size));
} else {
targetOffnum = &tmpTargetOff;
*targetOffnum = InvalidOffsetNumber;
spaceRequired = &tmpSpcRqd;
*spaceRequired = 0;
}
if (xlrec->flags & XLZ_CLEAN_CONTAINS_TUPLEN) {
nunused = (int *)unused;
nfixed = (uint16 *)((char *)nunused + sizeof(int));
nowfixed = nowunused + *nunused;
Assert(*nfixed > 0);
} else {
*nunused = (end - nowunused);
*nfixed = 0;
nowfixed = nowunused;
}
Assert(*nunused >= 0);
offnum = deleted;
for (i = 0; i < ndeleted; i++) {
prstate.nowdeleted[i] = *offnum++;
}
offnum = nowdead;
for (i = 0; i < ndead; i++) {
prstate.nowdead[i] = *offnum++;
}
offnum = nowunused;
for (i = 0; i < *nunused; i++) {
prstate.nowunused[i] = *offnum++;
}
offnum = nowfixed;
for (i = 0; i < *nfixed; i++) {
prstate.nowfixed[i] = *offnum++;
}
offnum = nowfixed + *nfixed;
for (i = 0; i < *nfixed; i++) {
prstate.fixedlen[i] = *offnum++;
}
prstate.ndeleted = ndeleted;
prstate.ndead = ndead;
prstate.nunused = *nunused;
prstate.nfixed = *nfixed;
UHeapPagePruneExecute(buffer->buf, *targetOffnum, &prstate);
if (xlrec->flags & XLZ_CLEAN_ALLOW_PRUNING) {
bool pruned PG_USED_FOR_ASSERTS_ONLY = false;
UPageRepairFragmentation(NULL, buffer->buf, *targetOffnum, *spaceRequired, &pruned, false);
* Pruning must be successful at redo time, otherwise the page
* contents on master and standby might differ.
*/
Assert(pruned);
}
*freespace = PageGetUHeapFreeSpace(page);
* below */
* Note: we don't worry about updating the page's prunability hints.
* At worst this will cause an extra prune cycle to occur soon.
*/
PageSetLSN(page, lsn);
MarkBufferDirty(buffer->buf);
}
static void UHeapXlogClean(XLogReaderState *record)
{
XlUHeapClean *xlrec = (XlUHeapClean *)XLogRecGetData(record);
RedoBufferInfo buffer = { 0 };
Size freespace = 0;
RelFileNode rnode = ((RelFileNode) {0, 0, 0, -1});
BlockNumber blkno = InvalidBlockNumber;
XLogRedoAction action;
WHITEBOX_TEST_STUB(UHEAP_XLOG_CLEAN_FAILED, WhiteboxDefaultErrorEmit);
XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
* We're about to remove tuples. In Hot Standby mode, ensure that there's
* no queries running for which the removed tuples are still visible.
*
* Not all INPLACEHEAP_CLEAN records remove tuples with xids, so we only want to
* conflict on the records that cause MVCC failures for user queries. If
* latestRemovedXid is invalid, skip conflict processing.
*/
if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid)) {
XLogRecPtr lsn = record->ReadRecPtr;
ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode, lsn);
}
* If we have a full-page image, restore it (using a cleanup lock) and
* we're done.
*/
action = XLogReadBufferForRedo(record, 0, &buffer);
if (action == BLK_NEEDS_REDO) {
PerformCleanRedoAction(record, &buffer, &freespace);
Page page = BufferGetPage(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, &rnode, blkno, true);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
* Update the FSM as well.
*
* XXX: Don't do this if the page was restored from full page image. We
* don't bother to update the FSM in that case, it doesn't need to be
* totally accurate anyway.
*/
if (action == BLK_NEEDS_REDO)
XLogRecordPageWithFreeSpace(rnode, blkno, freespace);
}
static void GetXidbaseAndAffixLensFromUpdateRedo(char *curxlogptr, XLogReaderState *record,
TransactionId **xidBase, uint16 **tdCount, UpdateRedoAffixLens *affixLens)
{
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
if (xlrec->flags & XLZ_NON_INPLACE_UPDATE) {
if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) {
*xidBase = (TransactionId *)curxlogptr;
curxlogptr += sizeof(TransactionId);
*tdCount = (uint16 *)curxlogptr;
curxlogptr += sizeof(uint16);
}
} else {
int *undoXorDeltaSizePtr = (int *)curxlogptr;
int undoXorDeltaSize = *undoXorDeltaSizePtr;
curxlogptr += sizeof(int);
char *xorCurxlogptr = curxlogptr;
curxlogptr += undoXorDeltaSize;
uint8 *tHoffPtr = (uint8 *)xorCurxlogptr;
uint8 tHoff = *tHoffPtr;
xorCurxlogptr += sizeof(uint8) + tHoff - OffsetTdId;
uint8 *flagsPtr = (uint8 *)xorCurxlogptr;
uint8 flags = *flagsPtr;
xorCurxlogptr += sizeof(uint8);
if (flags & UREC_INPLACE_UPDATE_XOR_PREFIX) {
uint16 *prefixlenPtr = (uint16 *)(xorCurxlogptr);
xorCurxlogptr += sizeof(uint16);
affixLens->prefixlen = *prefixlenPtr;
}
if (flags & UREC_INPLACE_UPDATE_XOR_SUFFIX) {
uint16 *suffixlenPtr = (uint16 *)(xorCurxlogptr);
xorCurxlogptr += sizeof(uint16);
affixLens->suffixlen = *suffixlenPtr;
}
}
}
static UndoRecPtr PrepareAndInsertUndoRecordForUpdateRedo(XLogReaderState *record,
UHeapTupleData *oldtup, XlUndoHeader **xlnewundohdr, char **xidBasePos, TupleBuffer &tbuf)
{
XLogRecPtr lsn = record->EndRecPtr;
Size recordlen = XLogRecGetDataLen(record);
TransactionId xid = XLogRecGetXid(record);
undo::XlogUndoMeta undometa;
UndoRecPtr newUrecptr = INVALID_UNDO_REC_PTR;
UndoRecPtr *blkprev;
UndoRecPtr *prevurp;
Oid *partitionOid;
bool *hasSubXact;
UndoRecPtr *newblkprev;
UndoRecPtr *newprevurp;
Oid *newpartitionOid;
UndoRecPtr invalidUrp = INVALID_UNDO_REC_PTR;
Oid invalidPartitionOid = 0;
bool defaultHasSubXact = false;
uint32 readSize = 0;
int undoXorDeltaSize = 0;
bool inplaceUpdate = true;
char *xlogXorDelta = NULL;
bool hasCSN = XLogRecHasCSN(record);
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate + SizeOfXLOGCSN(hasCSN));
UndoRecPtr urecptr = xlundohdr->urecptr;
char *curxlogptr = ((char *)xlundohdr) + SizeOfXLUndoHeader;
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_SUB_XACT) != 0) {
hasSubXact = (bool *) ((char *)curxlogptr);
curxlogptr += sizeof(bool);
readSize += sizeof(bool);
} else {
hasSubXact = &defaultHasSubXact;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
blkprev = (UndoRecPtr *) ((char *)curxlogptr);
curxlogptr += sizeof(UndoRecPtr);
readSize += sizeof(UndoRecPtr);
} else {
blkprev = &invalidUrp;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
prevurp = (UndoRecPtr *) ((char *)curxlogptr);
curxlogptr += sizeof(UndoRecPtr);
readSize += sizeof(UndoRecPtr);
} else {
prevurp = &invalidUrp;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
partitionOid = (Oid *) ((char *)curxlogptr);
curxlogptr += sizeof(Oid);
readSize += sizeof(Oid);
} else {
partitionOid = &invalidPartitionOid;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0) {
curxlogptr += sizeof(TransactionId);
readSize += sizeof(TransactionId);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_TOAST) != 0) {
uint32 toastLen = *(uint32 *)curxlogptr;
curxlogptr += sizeof(toastLen) + toastLen;
readSize += sizeof(toastLen) + toastLen;
}
bool allReplay = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
bool onlyReplayUndo = allReplay ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();
if (xlrec->flags & XLZ_NON_INPLACE_UPDATE) {
*xlnewundohdr = (XlUndoHeader *)curxlogptr;
curxlogptr += SizeOfXLUndoHeader;
inplaceUpdate = false;
if (((*xlnewundohdr)->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
newblkprev = (UndoRecPtr *) ((char *)curxlogptr);
curxlogptr += sizeof(UndoRecPtr);
readSize += sizeof(UndoRecPtr);
} else {
newblkprev = &invalidUrp;
}
if (((*xlnewundohdr)->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
newprevurp = (UndoRecPtr *) ((char *)curxlogptr);
curxlogptr += sizeof(UndoRecPtr);
readSize += sizeof(UndoRecPtr);
} else {
newprevurp = &invalidUrp;
}
if (((*xlnewundohdr)->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
newpartitionOid = (Oid *) ((char *)curxlogptr);
curxlogptr += sizeof(Oid);
readSize += sizeof(Oid);
} else {
newpartitionOid = &invalidPartitionOid;
}
}
undo::XlogUndoMeta *xlundometa = (undo::XlogUndoMeta *)curxlogptr;
CopyUndoMeta(*xlundometa, undometa);
uint32 undoMetaSize = undometa.Size();
curxlogptr += undoMetaSize;
*xidBasePos = curxlogptr;
if (inplaceUpdate) {
int *undoXorDeltaSizePtr = (int *)curxlogptr;
undoXorDeltaSize = *undoXorDeltaSizePtr;
curxlogptr += sizeof(int);
xlogXorDelta = curxlogptr;
} else {
Size initPageXtraInfo = 0;
if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) {
initPageXtraInfo = sizeof(TransactionId) + sizeof(uint16);
curxlogptr += initPageXtraInfo;
}
char *data = (char *)curxlogptr;
Size datalen = recordlen - SizeOfUHeapHeader - SizeOfXLUndoHeader - SizeOfUHeapUpdate -
undoMetaSize - SizeOfXLUndoHeader - initPageXtraInfo - readSize - SizeOfXLOGCSN(hasCSN);
oldtup->disk_tuple = GetUHeapDiskTupleFromRedoData(data, &datalen, tbuf);
oldtup->disk_tuple_size = datalen;
}
BlockNumber oldblk = InvalidBlockNumber;
BlockNumber newblk = InvalidBlockNumber;
RelFileNode rnode;
XLogRecGetBlockTag(record, 0, &rnode, NULL, &newblk);
if (XLogRecGetBlockTag(record, 1, NULL, NULL, &oldblk)) {
Assert(!inplaceUpdate);
} else {
oldblk = newblk;
}
ItemPointerData oldtid, newtid;
ItemPointerSet(&oldtid, oldblk, xlrec->old_offnum);
ItemPointerSet(&newtid, newblk, xlrec->new_offnum);
oldtup->table_oid = xlundohdr->relOid;
oldtup->ctid = oldtid;
oldtup->t_xid_base = InvalidTransactionId;
if (allReplay || onlyReplayUndo) {
bool skipInsert = IsSkipInsertUndo(urecptr);
if (skipInsert) {
undometa.SetInfo(XLOG_UNDOMETA_INFO_SKIP);
}
TD oldTD;
oldTD.xactid = xlrec->oldxid;
oldTD.undo_record_ptr = INVALID_UNDO_REC_PTR;
UndoRecord *oldundorec = (*t_thrd.ustore_cxt.urecvec)[0];
oldundorec->SetUrp(urecptr);
UndoRecord *newundorec = NULL;
if (!inplaceUpdate) {
newundorec = (*t_thrd.ustore_cxt.urecvec)[1];
newundorec->SetUrp((*xlnewundohdr)->urecptr);
}
* wrote those information in the xlundohdr because we can grab them from the XLOG record itself.
* Here we grab the first block tag. If the record has more than one blocks, then their tags
* should be the same.
*/
RelFileNode targetNode = { 0 };
bool res PG_USED_FOR_ASSERTS_ONLY = XLogRecGetBlockTag(record, 0, &targetNode, NULL, NULL);
Assert(res == true);
urecptr = UHeapPrepareUndoUpdate(xlundohdr->relOid, *partitionOid,
targetNode.relNode, targetNode.spcNode,
UNDO_PERMANENT, InvalidBuffer, InvalidBuffer, xlrec->old_offnum, xid,
*hasSubXact ? TopSubTransactionId : InvalidSubTransactionId, 0, *blkprev,
inplaceUpdate ? *blkprev : *newblkprev, *prevurp, &oldTD, oldtup,
inplaceUpdate, &newUrecptr, undoXorDeltaSize, oldblk, newblk, xlundohdr, &undometa);
Assert(UNDO_PTR_GET_OFFSET(urecptr) == UNDO_PTR_GET_OFFSET(xlundohdr->urecptr));
if (!skipInsert) {
if (!inplaceUpdate) {
newundorec->SetOffset(xlrec->new_offnum);
appendBinaryStringInfo(oldundorec->Rawdata(), (char *)&newtid, sizeof(ItemPointerData));
} else {
UndoRecord *undorec = (*t_thrd.ustore_cxt.urecvec)[0];
appendBinaryStringInfo(undorec->Rawdata(), xlogXorDelta, undoXorDeltaSize);
}
if (*hasSubXact) {
SubTransactionId subxid = TopSubTransactionId;
oldundorec->SetUinfo(UNDO_UREC_INFO_CONTAINS_SUBXACT);
appendBinaryStringInfo(oldundorec->Rawdata(), (char *)&subxid, sizeof(SubTransactionId));
}
InsertPreparedUndo(t_thrd.ustore_cxt.urecvec, lsn);
}
undo::RedoUndoMeta(record, &undometa, xlundohdr->urecptr, t_thrd.ustore_cxt.urecvec->LastRecord(),
t_thrd.ustore_cxt.urecvec->LastRecordSize());
URecVector *urecvec = t_thrd.ustore_cxt.urecvec;
UndoRecord *undorec = (*urecvec)[0];
if (inplaceUpdate) {
UndoRecordVerify(undorec);
} else {
UndoRecordVerify(newundorec);
}
UHeapResetPreparedUndo();
}
return urecptr;
}
static void PerformUpdateOldRedoAction(XLogReaderState *record, UHeapTupleData *oldtup,
UpdateRedoBuffers *buffers, const UndoRecPtr urecptr, const bool sameBlock)
{
XLogRecPtr lsn = record->EndRecPtr;
TransactionId xid = XLogRecGetXid(record);
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
Buffer oldbuf = buffers->oldbuffer.buf;
Page oldpage = buffers->oldbuffer.pageinfo.page;
RowPtr *rp = NULL;
if (UHeapPageGetMaxOffsetNumber(oldpage) >= xlrec->old_offnum) {
rp = UPageGetRowPtr(oldpage, xlrec->old_offnum);
} else {
UPagePrintErrorInfo(oldpage, "The max offset number is invalid");
}
oldtup->disk_tuple = (UHeapDiskTuple)UPageGetRowData(oldpage, rp);
oldtup->disk_tuple_size = RowPtrGetLen(rp);
oldtup->disk_tuple->flag = xlrec->old_tuple_flag;
oldtup->disk_tuple->xid = (ShortTransactionId)FrozenTransactionId;
UHeapTupleHeaderSetTDSlot(oldtup->disk_tuple, xlrec->old_tuple_td_id);
if (!sameBlock) {
UHeapPageSetUndo(oldbuf, xlrec->old_tuple_td_id, xid, urecptr);
}
if (xlrec->flags & XLZ_NON_INPLACE_UPDATE) {
UPageSetPrunable(oldpage, xid);
if (buffers->newbuffer.buf != oldbuf) {
UHeapRecordPotentialFreeSpace(oldbuf, SHORTALIGN(oldtup->disk_tuple_size));
}
} else if (xlrec->flags & XLZ_LINK_UPDATE) {
UPageSetPrunable(oldpage, xid);
UHeapRecordPotentialFreeSpace(oldbuf, SHORTALIGN(oldtup->disk_tuple_size));
}
PageSetLSN(oldpage, lsn);
MarkBufferDirty(oldbuf);
}
static XLogRedoAction GetUpdateNewRedoAction(XLogReaderState *record, UpdateRedoBuffers *buffers,
const XLogRedoAction oldaction, const TransactionId *xidBase, const uint16 *tdCount, const bool sameBlock)
{
XLogRedoAction newaction;
if (sameBlock) {
buffers->newbuffer.buf = buffers->oldbuffer.buf;
buffers->newbuffer.pageinfo.page = buffers->oldbuffer.pageinfo.page;
newaction = oldaction;
} else if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) {
XLogInitBufferForRedo(record, 0, &buffers->newbuffer);
Page newpage = buffers->newbuffer.pageinfo.page;
if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_TOAST_PAGE) {
UPageInit<UPAGE_TOAST>(newpage, buffers->newbuffer.pageinfo.pagesize, UHEAP_SPECIAL_SIZE);
} else {
UPageInit<UPAGE_HEAP>(newpage, buffers->newbuffer.pageinfo.pagesize, UHEAP_SPECIAL_SIZE, *tdCount);
}
UHeapPageHeaderData *uheappage = (UHeapPageHeaderData *)newpage;
uheappage->pd_xid_base = *xidBase;
uheappage->pd_multi_base = 0;
newaction = BLK_NEEDS_REDO;
} else {
newaction = XLogReadBufferForRedo(record, 0, &buffers->newbuffer);
}
return newaction;
}
static uint32 GetUHeapDiskTupleFromUpdateNewRedoData(XLogReaderState *record, UpdateRedoTuples *tuples,
UpdateRedoAffixLens *affixLens, TupleBuffer &tbuf, const bool sameBlock)
{
Size datalen;
errno_t rc;
XlUHeapHeader xlhdr;
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
char *recdata = XLogRecGetBlockData(record, 0, &datalen);
char *recdataEnd = recdata + datalen;
if (xlrec->flags & XLZ_NON_INPLACE_UPDATE) {
if (xlrec->flags & XLZ_UPDATE_PREFIX_FROM_OLD) {
Assert(sameBlock);
rc = memcpy_s(&affixLens->prefixlen, sizeof(uint16), recdata, sizeof(uint16));
securec_check(rc, "\0", "\0");
recdata += sizeof(uint16);
}
if (xlrec->flags & XLZ_UPDATE_SUFFIX_FROM_OLD) {
Assert(sameBlock);
rc = memcpy_s(&affixLens->suffixlen, sizeof(uint16), recdata, sizeof(uint16));
securec_check(rc, "\0", "\0");
recdata += sizeof(uint16);
}
}
rc = memcpy_s((char *)&xlhdr, SizeOfUHeapHeader, recdata, SizeOfUHeapHeader);
securec_check(rc, "\0", "\0");
recdata += SizeOfUHeapHeader;
Size tuplen = recdataEnd - recdata;
Assert(tuplen <= MaxPossibleUHeapTupleSize);
tuples->newtup = &tbuf.hdr;
rc = memset_s((char *)tuples->newtup, SizeOfUHeapDiskTupleData, 0, SizeOfUHeapDiskTupleData);
securec_check(rc, "\0", "\0");
* Reconstruct the new tuple using the prefix and/or suffix from the
* old tuple, and the data stored in the WAL record.
*/
char *newp = (char *)tuples->newtup + SizeOfUHeapDiskTupleData;
if (affixLens->prefixlen > 0) {
int len;
len = xlhdr.t_hoff - SizeOfUHeapDiskTupleData;
if (len > 0) {
rc = memcpy_s(newp, tuplen, recdata, len);
securec_check(rc, "\0", "\0");
recdata += len;
newp += len;
}
rc = memcpy_s(newp, affixLens->prefixlen, (char *)tuples->oldtup->disk_tuple +
tuples->oldtup->disk_tuple->t_hoff, affixLens->prefixlen);
securec_check(rc, "\0", "\0");
newp += affixLens->prefixlen;
len = tuplen - (xlhdr.t_hoff - SizeOfUHeapDiskTupleData);
if (len > 0) {
rc = memcpy_s(newp, tuplen, recdata, len);
securec_check(rc, "\0", "\0");
recdata += len;
newp += len;
}
} else {
* copy bitmap [+ padding] [+ oid] + data from record, all in one
* go
*/
rc = memcpy_s(newp, tuplen, recdata, tuplen);
securec_check(rc, "\0", "\0");
recdata += tuplen;
newp += tuplen;
}
Assert(recdata == recdataEnd);
if (affixLens->suffixlen > 0) {
rc = memcpy_s(newp, affixLens->suffixlen,
(char *)tuples->oldtup->disk_tuple + tuples->oldtup->disk_tuple_size - affixLens->suffixlen,
affixLens->suffixlen);
securec_check(rc, "\0", "\0");
}
uint32 newlen = SizeOfUHeapDiskTupleData + tuplen + affixLens->prefixlen + affixLens->suffixlen;
UHeapTupleHeaderSetTDSlot(tuples->newtup, xlhdr.td_id);
tuples->newtup->xid = (ShortTransactionId)FrozenTransactionId;
tuples->newtup->flag2 = xlhdr.flag2;
tuples->newtup->flag = xlhdr.flag;
tuples->newtup->t_hoff = xlhdr.t_hoff;
FastVerifyUTuple(tuples->newtup, InvalidBuffer);
return newlen;
}
static Size PerformUpdateNewRedoAction(XLogReaderState *record, UpdateRedoBuffers *buffers,
UpdateRedoTuples *tuples, const uint32 newlen, XlUndoHeader *xlnewundohdr,
const UndoRecPtr urecptr, const bool sameBlock)
{
XLogRecPtr lsn = record->EndRecPtr;
TransactionId xid = XLogRecGetXid(record);
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
Buffer oldbuf = buffers->oldbuffer.buf;
Buffer newbuf = buffers->newbuffer.buf;
Page oldpage = buffers->oldbuffer.pageinfo.page;
Page newpage = buffers->newbuffer.pageinfo.page;
if (UHeapPageGetMaxOffsetNumber(newpage) + 1 < xlrec->new_offnum) {
UPagePrintErrorInfo(newpage, "The max offset number is invalid");
}
if (xlrec->flags & XLZ_NON_INPLACE_UPDATE) {
UHeapBufferPage bufpage = {newbuf, NULL};
if (UPageAddItem(NULL, &bufpage, (Item)tuples->newtup, newlen, xlrec->new_offnum, true) ==
InvalidOffsetNumber) {
UPagePrintErrorInfo(newpage, "UPageAddItem failed");
}
if (newbuf != oldbuf) {
UHeapRecordPotentialFreeSpace(newbuf, -1 * SHORTALIGN(newlen));
} else {
int delta = newlen - tuples->oldtup->disk_tuple_size;
UHeapRecordPotentialFreeSpace(newbuf, -1 * SHORTALIGN(delta));
}
if (sameBlock) {
UHeapPageSetUndo(newbuf, xlrec->old_tuple_td_id, xid, xlnewundohdr->urecptr);
} else {
UHeapPageSetUndo(newbuf, tuples->newtup->td_id, xid, xlnewundohdr->urecptr);
}
} else if (xlrec->flags & XLZ_LINK_UPDATE) {
RowPtr *rp = UPageGetRowPtr(oldpage, xlrec->old_offnum);
PutLinkUpdateTuple(oldpage, (Item)tuples->newtup, rp, newlen);
UHeapRecordPotentialFreeSpace(newbuf, newlen);
Assert(oldpage == newpage);
UPageSetPrunable(newpage, XLogRecGetXid(record));
UHeapPageSetUndo(newbuf, xlrec->old_tuple_td_id, xid, urecptr);
} else {
* For inplace updates, we copy the entire data portion including
* the tuple header.
*/
RowPtr *rp = UPageGetRowPtr(oldpage, xlrec->old_offnum);
if (newlen >= RowPtrGetLen(rp) || (xlrec->flags & XLZ_UPDATE_PREFIX_FROM_OLD) != 0 ||
(xlrec->flags & XLZ_UPDATE_SUFFIX_FROM_OLD) != 0) {
RowPtrChangeLen(rp, newlen);
}
errno_t rc = memcpy_s((char *)tuples->oldtup->disk_tuple, newlen, (char *)tuples->newtup, newlen);
securec_check(rc, "\0", "\0");
if (newlen < tuples->oldtup->disk_tuple_size) {
Assert(oldpage == newpage);
UPageSetPrunable(newpage, XLogRecGetXid(record));
}
UHeapPageSetUndo(newbuf, xlrec->old_tuple_td_id, xid, urecptr);
}
Size freespace = PageGetUHeapFreeSpace(newpage);
* below */
PageSetLSN(newpage, lsn);
MarkBufferDirty(newbuf);
return freespace;
}
static void UHeapXlogUpdate(XLogReaderState *record)
{
XlUndoHeader *xlnewundohdr = NULL;
UpdateRedoBuffers buffers;
buffers.oldbuffer = { 0 };
buffers.newbuffer = { 0 };
RelFileNode rnode = { 0 };
BlockNumber oldblk = InvalidBlockNumber;
BlockNumber newblk = InvalidBlockNumber;
UHeapTupleData oldtup;
UpdateRedoTuples tuples;
XLogRedoAction oldaction, newaction;
TupleBuffer tbuf;
UpdateRedoAffixLens affixLens = {0, 0};
uint32 newlen = 0;
Size freespace = 0;
bool sameBlock = false;
bool allReplay = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
bool onlyReplayUndo = allReplay ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();
char *xidBasePos = NULL;
TransactionId *xidBase = NULL;
uint16 *tdCount = NULL;
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
bool inplaceUpdate = !(xlrec->flags & XLZ_NON_INPLACE_UPDATE);
WHITEBOX_TEST_STUB(UHEAP_XLOG_UPDATE_FAILED, WhiteboxDefaultErrorEmit);
XLogRecGetBlockTag(record, 0, &rnode, NULL, &newblk);
if (XLogRecGetBlockTag(record, 1, NULL, NULL, &oldblk)) {
Assert(!inplaceUpdate);
} else {
oldblk = newblk;
sameBlock = true;
}
UndoRecPtr urecptr = PrepareAndInsertUndoRecordForUpdateRedo(record, &oldtup, &xlnewundohdr, &xidBasePos, tbuf);
GetXidbaseAndAffixLensFromUpdateRedo(xidBasePos, record, &xidBase, &tdCount, &affixLens);
tuples.oldtup = &oldtup;
if (allReplay || !onlyReplayUndo) {
#ifdef ENABLE_HTAP
if (HAVE_HTAP_TABLES) {
ItemPointerData ctid;
ItemPointerData newCtid;
ItemPointerSet(&ctid, oldblk, xlrec->old_offnum);
ItemPointerSet(&newCtid, newblk, xlrec->new_offnum);
IMCStoreUpdateHook(rnode.relNode, &ctid, &newCtid);
}
#endif
oldaction = XLogReadBufferForRedo(record, sameBlock ? 0 : 1, &buffers.oldbuffer);
newaction = GetUpdateNewRedoAction(record, &buffers, oldaction, xidBase, tdCount, sameBlock);
if (oldaction == BLK_NEEDS_REDO) {
PerformUpdateOldRedoAction(record, &oldtup, &buffers, urecptr, sameBlock);
}
* recover new tuple on data page
* First, construct new tuple from xlog record
*/
if (newaction == BLK_NEEDS_REDO) {
newlen = GetUHeapDiskTupleFromUpdateNewRedoData(record, &tuples, &affixLens, tbuf, sameBlock);
freespace = PerformUpdateNewRedoAction(record, &buffers, &tuples, newlen, xlnewundohdr, urecptr, sameBlock);
Page page = BufferGetPage(buffers.newbuffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, &rnode, BufferGetBlockNumber(buffers.newbuffer.buf), true);
}
if (BufferIsValid(buffers.newbuffer.buf) && buffers.newbuffer.buf != buffers.oldbuffer.buf) {
UnlockReleaseBuffer(buffers.newbuffer.buf);
}
if (BufferIsValid(buffers.oldbuffer.buf)) {
UnlockReleaseBuffer(buffers.oldbuffer.buf);
}
if (newaction == BLK_NEEDS_REDO && !inplaceUpdate && freespace < BLCKSZ / FREESPACE_FRACTION) {
XLogRecordPageWithFreeSpace(rnode, newblk, freespace);
}
}
}
static int GetOffsetRangesForMultiInsert(UHeapFreeOffsetRanges **ufreeOffsetRanges, char* data, UndoRecPtr **urpvec)
{
char *rangesData = data;
int nranges = *(int *)rangesData;
rangesData += sizeof(int);
*urpvec = (UndoRecPtr *)rangesData;
rangesData += nranges * sizeof(UndoRecPtr);
*ufreeOffsetRanges = (UHeapFreeOffsetRanges *)palloc0(sizeof(UHeapFreeOffsetRanges));
Assert(nranges > 0);
errno_t rc = memcpy_s(&(*ufreeOffsetRanges)->startOffset[0], sizeof(OffsetNumber) * nranges, (char *)rangesData,
sizeof(OffsetNumber) * nranges);
securec_check(rc, "\0", "\0");
rangesData += sizeof(OffsetNumber) * nranges;
rc = memcpy_s(&(*ufreeOffsetRanges)->endOffset[0], sizeof(OffsetNumber) * nranges, (char *)rangesData,
sizeof(OffsetNumber) * nranges);
securec_check(rc, "\0", "\0");
rangesData += sizeof(OffsetNumber) * nranges;
return nranges;
}
static UndoRecPtr PrepareAndInsertUndoRecordForMultiInsertRedo(XLogReaderState *record, const BlockNumber blkno,
XlUHeapMultiInsert **xlrec, UHeapFreeOffsetRanges **ufreeOffsetRanges, TransactionId **xidBase,
uint16 **tdCount)
{
URecVector *urecvec = NULL;
UndoRecPtr *urpvec = NULL;
undo::XlogUndoMeta undometa;
UndoRecPtr *blkprev;
UndoRecPtr *prevurp;
Oid *partitionOid;
UndoRecPtr invalidUrp = INVALID_UNDO_REC_PTR;
Oid invalidPartitionOid = 0;
XLogRecPtr lsn = record->EndRecPtr;
TransactionId xid = XLogRecGetXid(record);
bool isinit = (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) != 0;
bool allReplay = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
bool onlyReplayUndo = allReplay ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();
XlUndoHeader *xlundohdr = (XlUndoHeader *)XLogRecGetData(record);
char *curxlogptr = (char *)xlundohdr + SizeOfXLUndoHeader;
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
blkprev = (UndoRecPtr *) ((char *)curxlogptr);
Assert(*blkprev != INVALID_UNDO_REC_PTR);
curxlogptr += sizeof(UndoRecPtr);
} else {
blkprev = &invalidUrp;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
prevurp = (UndoRecPtr *) ((char *)curxlogptr);
curxlogptr += sizeof(UndoRecPtr);
} else {
prevurp = &invalidUrp;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
partitionOid = (Oid *) ((char *)curxlogptr);
curxlogptr += sizeof(Oid);
} else {
partitionOid = &invalidPartitionOid;
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0) {
curxlogptr += sizeof(TransactionId);
}
UndoRecPtr *last_urecptr = (UndoRecPtr *)curxlogptr;
UndoRecPtr urecptr = *last_urecptr;
curxlogptr = (char *)last_urecptr + sizeof(*last_urecptr);
undo::XlogUndoMeta *xlundometa = (undo::XlogUndoMeta *)curxlogptr;
CopyUndoMeta(*xlundometa, undometa);
uint32 undoMetaSize = undometa.Size();
curxlogptr += undoMetaSize;
if (isinit) {
*xidBase = (TransactionId *)curxlogptr;
curxlogptr += sizeof(TransactionId);
*tdCount = (uint16 *)curxlogptr;
curxlogptr += sizeof(uint16);
}
bool hasCSN = XLogRecHasCSN(record);
curxlogptr = curxlogptr + SizeOfXLOGCSN(hasCSN);
(*xlrec) = (XlUHeapMultiInsert *)curxlogptr;
curxlogptr = (char *)*xlrec + SizeOfUHeapMultiInsert;
int nranges = GetOffsetRangesForMultiInsert(ufreeOffsetRanges, curxlogptr, &urpvec);
if (allReplay || onlyReplayUndo) {
bool skipInsert = IsSkipInsertUndo(urecptr);
if (skipInsert) {
undometa.SetInfo(XLOG_UNDOMETA_INFO_SKIP);
}
bool skipUndo = ((*xlrec)->flags & XLZ_INSERT_IS_FROZEN);
* wrote those information in the xlundohdr because we can grab them from the XLOG record itself.
*/
RelFileNode targetNode = { 0 };
bool res PG_USED_FOR_ASSERTS_ONLY = XLogRecGetBlockTag(record, 0, &targetNode, NULL, NULL);
Assert(res == true);
urecptr = UHeapPrepareUndoMultiInsert(xlundohdr->relOid, *partitionOid, targetNode.relNode,
targetNode.spcNode, UNDO_PERMANENT, InvalidBuffer, nranges, xid, InvalidCommandId, *blkprev,
*prevurp, &urecvec, NULL, urpvec, blkno, xlundohdr, &undometa);
* We can skip inserting undo records if the tuples are to be marked as
* frozen.
*/
if (!skipUndo && !skipInsert) {
for (int i = 0; i < nranges; i++) {
MemoryContext old_cxt = MemoryContextSwitchTo((*urecvec)[i]->mem_context());
initStringInfo((*urecvec)[i]->Rawdata());
MemoryContextSwitchTo(old_cxt);
appendBinaryStringInfo((*urecvec)[i]->Rawdata(), (char *)&(*ufreeOffsetRanges)->startOffset[i],
sizeof(OffsetNumber));
appendBinaryStringInfo((*urecvec)[i]->Rawdata(), (char *)&(*ufreeOffsetRanges)->endOffset[i],
sizeof(OffsetNumber));
}
* undo should be inserted at same location as it was during the
* actual insert (DO operation).
*/
Assert(UNDO_PTR_GET_OFFSET((*urecvec)[0]->Urp()) == UNDO_PTR_GET_OFFSET(xlundohdr->urecptr));
InsertPreparedUndo(urecvec, lsn);
}
undo::RedoUndoMeta(record, &undometa, xlundohdr->urecptr, urecvec->LastRecord(), urecvec->LastRecordSize());
UHeapResetPreparedUndo();
DELETE_EX(urecvec);
}
return urecptr;
}
static XLogRedoAction GetMultiInsertRedoAction(XLogReaderState *record, RedoBufferInfo *buffer, TransactionId *xidBase,
uint16 *tdCount)
{
XLogRedoAction action;
bool isinit = (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) != 0;
if (isinit) {
XLogInitBufferForRedo(record, 0, buffer);
Page page = buffer->pageinfo.page;
if (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_TOAST_PAGE) {
UPageInit<UPAGE_TOAST>(page, buffer->pageinfo.pagesize, UHEAP_SPECIAL_SIZE);
} else {
UPageInit<UPAGE_HEAP>(page, buffer->pageinfo.pagesize, UHEAP_SPECIAL_SIZE, *tdCount);
}
UHeapPageHeaderData *uheappage = (UHeapPageHeaderData *)page;
uheappage->pd_xid_base = *xidBase;
uheappage->pd_multi_base = 0;
action = BLK_NEEDS_REDO;
} else {
action = XLogReadBufferForRedo(record, 0, buffer);
}
return action;
}
static UHeapDiskTuple GetUHeapDiskTupleFromMultiInsertRedoData(char **data, int *datalen, TupleBuffer &tbuf)
{
UHeapDiskTuple disktup;
XlMultiInsertUTuple *xlhdr;
errno_t rc;
xlhdr = (XlMultiInsertUTuple *)(*data);
*data = ((char *)xlhdr) + SizeOfMultiInsertUTuple;
*datalen = xlhdr->datalen;
Assert(*datalen <= (int)MaxPossibleUHeapTupleSize);
disktup = &tbuf.hdr;
rc = memset_s((char *)disktup, SizeOfUHeapDiskTupleData, 0, SizeOfUHeapDiskTupleData);
securec_check_c(rc, "\0", "\0");
rc = memcpy_s((char *)disktup + SizeOfUHeapDiskTupleData, *datalen, (char *)*data, *datalen);
securec_check_c(rc, "\0", "\0");
*data += *datalen;
*datalen += SizeOfUHeapDiskTupleData;
UHeapTupleHeaderSetTDSlot(disktup, xlhdr->td_id);
disktup->xid = (ShortTransactionId)FrozenTransactionId;
disktup->flag2 = xlhdr->flag2;
disktup->flag = xlhdr->flag;
disktup->t_hoff = xlhdr->t_hoff;
return disktup;
}
static void PerformMultiInsertRedoAction(XLogReaderState *record, XlUHeapMultiInsert *xlrec,
RedoBufferInfo *buffer, const UndoRecPtr urecptr, UHeapFreeOffsetRanges *ufreeOffsetRanges)
{
TupleBuffer tbuf;
int tdSlot = 0;
int newlen;
Size len;
bool firstTime = true;
int prevTdSlot = -1;
bool skipUndo = (xlrec->flags & XLZ_INSERT_IS_FROZEN);
bool isinit = (XLogRecGetInfo(record) & XLOG_UHEAP_INIT_PAGE) != 0;
TransactionId xid = XLogRecGetXid(record);
Page page = buffer->pageinfo.page;
UHeapBufferPage bufpage;
OffsetNumber offnum = ufreeOffsetRanges->startOffset[0];
char *tupdata = XLogRecGetBlockData(record, 0, &len);
char *endptr = tupdata + len;
for (int i = 0, j = 0; i < xlrec->ntuples; i++, offnum++) {
* If we're reinitializing the page, the tuples are stored in
* order from FirstOffsetNumber. Otherwise there's an array of
* offsets in the WAL record, and the tuples come after that.
*/
if (isinit) {
offnum = FirstOffsetNumber + i;
} else {
* Change the offset range if we've reached the end of current
* range.
*/
if (offnum > ufreeOffsetRanges->endOffset[j]) {
j++;
offnum = ufreeOffsetRanges->startOffset[j];
}
}
if (UHeapPageGetMaxOffsetNumber(page) + 1 < offnum) {
UPagePrintErrorInfo(page, "The max offset number is invalid");
}
UHeapDiskTuple uhtup = GetUHeapDiskTupleFromMultiInsertRedoData(&tupdata, &newlen, tbuf);
FastVerifyUTuple(uhtup, InvalidBuffer);
bufpage.buffer = buffer->buf;
bufpage.page = NULL;
if (UPageAddItem(NULL, &bufpage, (Item)uhtup, newlen, offnum, true) == InvalidOffsetNumber) {
UPagePrintErrorInfo(page, "UPageAddItem failed,");
}
UHeapRecordPotentialFreeSpace(buffer->buf, SHORTALIGN(newlen));
if (!skipUndo) {
tdSlot = UHeapTupleHeaderGetTDSlot(uhtup);
if (firstTime) {
prevTdSlot = tdSlot;
firstTime = false;
} else {
Assert(prevTdSlot == tdSlot);
prevTdSlot = tdSlot;
}
}
}
if (!skipUndo) {
UHeapPageSetUndo(buffer->buf, tdSlot, xid, urecptr);
}
PageSetLSN(page, record->EndRecPtr);
MarkBufferDirty(buffer->buf);
if (tupdata != endptr) {
elog(PANIC, "total tuple length mismatch");
}
}
static void UHeapXlogMultiInsert(XLogReaderState *record)
{
RelFileNode rnode;
BlockNumber blkno = InvalidBlockNumber;
RedoBufferInfo buffer = { 0 };
XlUHeapMultiInsert *xlrec = NULL;
XLogRedoAction action = BLK_NOTFOUND;
UHeapFreeOffsetRanges *ufreeOffsetRanges = NULL;
TransactionId *xidBase = NULL;
uint16 *tdCount = NULL;
bool allReplay = !AmPageRedoWorker() || !SUPPORT_USTORE_UNDO_WORKER;
bool onlyReplayUndo = allReplay ? false : parallel_recovery::DoPageRedoWorkerReplayUndo();
WHITEBOX_TEST_STUB(UHEAP_XLOG_MULTI_INSERT_FAILED, WhiteboxDefaultErrorEmit);
XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
UndoRecPtr urecptr = PrepareAndInsertUndoRecordForMultiInsertRedo(record, blkno, &xlrec,
&ufreeOffsetRanges, &xidBase, &tdCount);
if (allReplay || !onlyReplayUndo) {
action = GetMultiInsertRedoAction(record, &buffer, xidBase, tdCount);
}
if (action == BLK_NEEDS_REDO) {
PerformMultiInsertRedoAction(record, xlrec, &buffer, urecptr, ufreeOffsetRanges);
Page page = BufferGetPage(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, &rnode, blkno, true);
}
pfree(ufreeOffsetRanges);
if (allReplay || !onlyReplayUndo) {
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
}
static void UHeapXlogBaseShift(XLogReaderState *record)
{
RedoBufferInfo buffer = { 0 };
XLogRecPtr lsn = record->EndRecPtr;
BlockNumber blkno = InvalidBlockNumber;
if (XLogReadBufferForRedo(record, HEAP_BASESHIFT_ORIG_BLOCK_NUM, &buffer) == BLK_NEEDS_REDO) {
char *maindata = XLogRecGetData(record);
XlUHeapBaseShift *xlrec = (XlUHeapBaseShift *)maindata;
Page page = buffer.pageinfo.page;
UHeapPageShiftBase(InvalidBuffer, page, xlrec->multi, xlrec->delta);
XLogRecGetBlockTag(record, HEAP_BASESHIFT_ORIG_BLOCK_NUM, NULL, NULL, &blkno);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, NULL, blkno, true);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
static void UHeapXlogExtendTDSlot(XLogReaderState *record)
{
RedoBufferInfo buffer = { 0 };
XLogRecPtr lsn = record->EndRecPtr;
Page page;
XLogRedoAction action = (XLogRedoAction)-1;
errno_t ret = EOK;
XlUHeapExtendTdSlots *xlrec = NULL;
BlockNumber blkno = InvalidBlockNumber;
xlrec = (XlUHeapExtendTdSlots *)XLogRecGetData(record);
action = XLogReadBufferForRedo(record, 0, &buffer);
page = BufferGetPage(buffer.buf);
(void) XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
if (action == BLK_NEEDS_REDO) {
int i;
uint8 extendedSlots;
uint8 prevSlots;
size_t linePtrSize;
char *start;
char *end;
TD *thistrans;
UHeapPageTDData *tdPtr;
UHeapPageHeaderData *phdr = (UHeapPageHeaderData *)page;
prevSlots = xlrec->nPrevSlots;
extendedSlots = xlrec->nExtended;
tdPtr = (UHeapPageTDData *)PageGetTDPointer(page);
Assert(prevSlots == phdr->td_count);
* Move the line pointers ahead in the page to make room for
* added transaction slots.
*/
start = page + SizeOfUHeapPageHeaderData + (prevSlots * sizeof(TD));
end = page + phdr->pd_lower;
linePtrSize = end - start;
ret = memmove_s((char *)start + (extendedSlots * sizeof(TD)), linePtrSize, start, linePtrSize);
securec_check(ret, "\0", "\0");
* Initialize the new transaction slots
*/
for (i = prevSlots; i < prevSlots + extendedSlots; i++) {
thistrans = &tdPtr->td_info[i];
thistrans->xactid = InvalidTransactionId;
thistrans->undo_record_ptr = INVALID_UNDO_REC_PTR;
}
* Re-initialize number of txn slots and begining
* of free space in the header.
*/
phdr->td_count = prevSlots + extendedSlots;
phdr->pd_lower += extendedSlots * sizeof(TD);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, NULL, blkno, true);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
#ifdef ENABLE_MULTIPLE_NODES
const static bool SUPPORT_HOT_STANDBY = false;
#else
const static bool SUPPORT_HOT_STANDBY = true;
#endif
static void UHeapXlogFreeze(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
XlUHeapFreeze *xlrec = (XlUHeapFreeze *)XLogRecGetData(record);
TransactionId cutoffXid = xlrec->cutoff_xid;
RedoBufferInfo buffer = { 0 };
UHeapTupleData utuple;
RelFileNode rnode;
BlockNumber blkno = InvalidBlockNumber;
(void)XLogRecGetBlockTag(record, HEAP_FREEZE_ORIG_BLOCK_NUM, &rnode, NULL, &blkno);
* In Hot Standby mode, ensure that there's no queries running which still
* consider the frozen xids as running.
*/
if (InHotStandby && SUPPORT_HOT_STANDBY) {
ResolveRecoveryConflictWithSnapshot(cutoffXid, rnode, record->ReadRecPtr);
}
if (XLogReadBufferForRedo(record, HEAP_FREEZE_ORIG_BLOCK_NUM, &buffer) == BLK_NEEDS_REDO) {
Size blkdatalen = 0;
char *blkdata = XLogRecGetBlockData(record, HEAP_FREEZE_ORIG_BLOCK_NUM, &blkdatalen);
Page page = buffer.pageinfo.page;
TransactionId cutoffXid = xlrec->cutoff_xid;
OffsetNumber *offsets = (OffsetNumber *)blkdata;
OffsetNumber *offsetsEnd = NULL;
UHeapPageHeader upageHeader = (UHeapPageHeader) page;
uint16 maxPageOffsetCnt = (BLCKSZ- (SizeOfUHeapPageHeaderData + SizeOfUHeapTDData(upageHeader) +
UHEAP_SPECIAL_SIZE)) /sizeof(RowPtr);
if ((blkdatalen / sizeof(OffsetNumber)) > maxPageOffsetCnt) {
elog(PANIC, "Invalid max offset number(%u), max(%u).", (uint32)(blkdatalen / sizeof(OffsetNumber)),
maxPageOffsetCnt);
}
if (blkdatalen > 0) {
offsetsEnd = (OffsetNumber *)((char *)offsets + blkdatalen);
while (offsets < offsetsEnd) {
RowPtr *rowptr = UPageGetRowPtr(page, *offsets);
Assert(RowPtrIsNormal(rowptr));
if (RowPtrIsNormal(rowptr) && rowptr->len >= BLCKSZ) {
elog(PANIC, "Invalid rp(%u) length(%u)", rowptr->offset, rowptr->len);
}
utuple.disk_tuple = (UHeapDiskTuple)UPageGetRowData(page, rowptr);
utuple.disk_tuple_size = RowPtrGetLen(rowptr);
UHeapTupleCopyBaseFromPage(&utuple, page);
TransactionId xid = UHeapTupleGetRawXid(&utuple);
if (TransactionIdIsNormal(xid) && TransactionIdPrecedes(xid, cutoffXid)) {
UHeapTupleSetRawXid((&utuple), FrozenTransactionId);
}
offsets++;
}
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer.buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, &rnode, blkno, true);
}
if (BufferIsValid(buffer.buf)) {
UnlockReleaseBuffer(buffer.buf);
}
}
void UHeapXlogNewPage(XLogReaderState *record)
{
RedoBufferInfo buffer = { 0 };
if (XLogReadBufferForRedo(record, UHEAP_NEWPAGE_ORIG_BLOCK_NUM, &buffer) != BLK_RESTORED) {
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("unexpected result when restoring backup block")));
}
UnlockReleaseBuffer(buffer.buf);
}
void UHeapRedo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
* These operations don't overwrite MVCC data so no conflict processing is
* required. The ones in heap2 rmgr do.
*/
switch (info & XLOG_UHEAP_OPMASK) {
case XLOG_UHEAP_INSERT:
UHeapXlogInsert(record);
break;
case XLOG_UHEAP_DELETE:
UHeapXlogDelete(record);
break;
case XLOG_UHEAP_UPDATE:
UHeapXlogUpdate(record);
break;
case XLOG_UHEAP_FREEZE_TD_SLOT:
UHeapXlogFreezeTdSlot(record);
break;
case XLOG_UHEAP_INVALID_TD_SLOT:
UHeapXlogInvalidTdSlot(record);
break;
case XLOG_UHEAP_CLEAN:
UHeapXlogClean(record);
break;
case XLOG_UHEAP_MULTI_INSERT:
UHeapXlogMultiInsert(record);
break;
case XLOG_UHEAP_NEW_PAGE:
UHeapXlogNewPage(record);
break;
default:
ereport(PANIC, (errmsg("UHeapRedo: unknown op code %u", (uint8)info)));
}
elog(DEBUG2, "UHeapRedo called lsn %016lx", record->EndRecPtr);
}
void UHeap2Redo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
* These operations don't overwrite MVCC data so no conflict processing is
* required. The ones in heap2 rmgr do.
*/
switch (info & XLOG_UHEAP_OPMASK) {
case XLOG_UHEAP2_BASE_SHIFT:
UHeapXlogBaseShift(record);
break;
case XLOG_UHEAP2_FREEZE:
UHeapXlogFreeze(record);
break;
case XLOG_UHEAP2_EXTEND_TD_SLOTS:
UHeapXlogExtendTDSlot(record);
break;
default:
ereport(PANIC, (errmsg("UHeap2Redo: unknown op code %u", (uint8)info)));
}
elog(DEBUG2, "UHeap2Redo called lsn %016lx", record->EndRecPtr);
}
static void UHeapUndoXlogPageRestore(char *curxlogptr, Buffer buffer, Page page)
{
OffsetNumber *xlogMinLPOffset = (OffsetNumber *)curxlogptr;
curxlogptr += sizeof(OffsetNumber);
OffsetNumber *xlogMaxLPOffset = (OffsetNumber *)curxlogptr;
curxlogptr += sizeof(OffsetNumber);
Assert(*xlogMinLPOffset > InvalidOffsetNumber);
Assert(*xlogMaxLPOffset <= MaxOffsetNumber);
if (*xlogMaxLPOffset >= *xlogMinLPOffset) {
size_t lpSize = (*xlogMaxLPOffset - *xlogMinLPOffset + 1) * sizeof(RowPtr);
error_t rc = memcpy_s((char *)UPageGetRowPtr(page, *xlogMinLPOffset), lpSize, curxlogptr, lpSize);
securec_check_c(rc, "\0", "\0");
curxlogptr += lpSize;
Offset *xlogCopyStartOffset = (Offset *)curxlogptr;
curxlogptr += sizeof(Offset);
Offset *xlogCopyEndOffset = (Offset *)curxlogptr;
curxlogptr += sizeof(Offset);
Assert(*xlogCopyStartOffset > (Offset)SizeOfUHeapPageHeaderData);
Assert(*xlogCopyEndOffset <= BLCKSZ);
if (*xlogCopyEndOffset > *xlogCopyStartOffset) {
size_t dataSize = *xlogCopyEndOffset - *xlogCopyStartOffset;
rc = memcpy_s((char *)page + *xlogCopyStartOffset, dataSize, curxlogptr, dataSize);
securec_check_c(rc, "\0", "\0");
curxlogptr += dataSize;
}
TransactionId *pdPruneXid = (TransactionId *)curxlogptr;
curxlogptr += sizeof(TransactionId);
uint16 *pdFlags = (uint16 *)curxlogptr;
curxlogptr += sizeof(uint16);
uint16 *potentialFreespace = (uint16 *)curxlogptr;
curxlogptr += sizeof(uint16);
UHeapPageHeaderData *phdr = (UHeapPageHeaderData *)page;
phdr->pd_flags = *pdFlags;
phdr->pd_prune_xid = *pdPruneXid;
phdr->potential_freespace = *potentialFreespace;
}
int *tdSlotId = (int *)curxlogptr;
curxlogptr += sizeof(int);
TransactionId *xid = (TransactionId *)curxlogptr;
curxlogptr += sizeof(TransactionId);
UndoRecPtr *slotPrevUrp = (UndoRecPtr *)curxlogptr;
curxlogptr += sizeof(UndoRecPtr);
UHeapPageSetUndo(buffer, *tdSlotId, *xid, *slotPrevUrp);
}
* replay of undo page operation
*/
static void UHeapUndoXlogPage(XLogReaderState *record)
{
RedoBufferInfo redoBuffInfo = { 0 };
uint8 *flags = (uint8 *)XLogRecGetData(record);
char *curxlogptr = (char *)((char *)flags + sizeof(uint8));
XLogRedoAction action = XLogReadBufferForRedo(record, 0, &redoBuffInfo);
Buffer buf = redoBuffInfo.buf;
BlockNumber blkno = InvalidBlockNumber;
XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
if (action == BLK_NEEDS_REDO) {
Page page = BufferGetPage(buf);
if (*flags & XLU_INIT_PAGE) {
TransactionId *xidBase = (TransactionId *)curxlogptr;
curxlogptr += sizeof(TransactionId);
uint16 *tdCount = (uint16 *)curxlogptr;
curxlogptr += sizeof(uint16);
if (*flags & XLOG_UHEAP_INIT_TOAST_PAGE) {
UPageInit<UPAGE_TOAST>(page, BufferGetPageSize(buf), UHEAP_SPECIAL_SIZE);
} else {
UPageInit<UPAGE_HEAP>(page, BufferGetPageSize(buf), UHEAP_SPECIAL_SIZE, *tdCount);
}
UHeapPageHeaderData *uheappage = (UHeapPageHeaderData *)page;
uheappage->pd_xid_base = *xidBase;
uheappage->pd_multi_base = 0;
} else {
UHeapUndoXlogPageRestore(curxlogptr, buf, page);
}
PageSetLSN(page, record->EndRecPtr);
MarkBufferDirty(buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, NULL, blkno, true);
}
if (BufferIsValid(buf))
UnlockReleaseBuffer(buf);
}
static void UHeapUndoXlogResetXid(XLogReaderState *record)
{
RedoBufferInfo redoBuffInfo = { 0 };
XLogRecPtr lsn = record->EndRecPtr;
XlUHeapUndoResetSlot *xlrec = (XlUHeapUndoResetSlot *)XLogRecGetData(record);
XLogRedoAction action = XLogReadBufferForRedo(record, 0, &redoBuffInfo);
BlockNumber blkno = InvalidBlockNumber;
(void) XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
Buffer buf = redoBuffInfo.buf;
if (action == BLK_NEEDS_REDO) {
UHeapPageSetUndo(buf, xlrec->td_slot_id, InvalidTransactionId, xlrec->urec_ptr);
PageSetLSN(BufferGetPage(buf), lsn);
MarkBufferDirty(buf);
Page page = BufferGetPage(buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, NULL, blkno, true);
}
if (BufferIsValid(buf))
UnlockReleaseBuffer(buf);
}
static void UHeapUndoXlogAbortSpecinsert(XLogReaderState *record)
{
RedoBufferInfo redoBuffInfo = { 0 };
uint8 *flags = (uint8 *)XLogRecGetData(record);
XLogRecPtr lsn = record->EndRecPtr;
XlUHeapUndoAbortSpecInsert *xlrec = (XlUHeapUndoAbortSpecInsert *)((char *)flags + sizeof(uint8));
XLogRedoAction action = XLogReadBufferForRedo(record, 0, &redoBuffInfo);
Buffer buf = redoBuffInfo.buf;
BlockNumber blkno = InvalidBlockNumber;
(void) XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
if (action == BLK_NEEDS_REDO) {
bool relhasindex = *flags & XLU_ABORT_SPECINSERT_REL_HAS_INDEX;
bool isPageInit = *flags & XLU_ABORT_SPECINSERT_INIT_PAGE;
bool hasValidPrevurp = *flags & XLU_ABORT_SPECINSERT_PREVURP_VALID;
bool hasValidXid = *flags & XLU_ABORT_SPECINSERT_XID_VALID;
TransactionId *xid = NULL, *xidbase = NULL;
uint16 *tdCount = NULL;
UndoRecPtr *prevurp = NULL;
char *curxlogptr = (char *)((char *)xlrec + SizeOfUHeapUndoAbortSpecInsert);
int tdSlot;
if (isPageInit) {
xidbase = (TransactionId *)curxlogptr;
curxlogptr += sizeof(TransactionId);
tdCount = (uint16 *)curxlogptr;
curxlogptr += sizeof(uint16);
}
if (hasValidXid) {
xid = (TransactionId *)curxlogptr;
curxlogptr += sizeof(TransactionId);
}
if (hasValidPrevurp) {
prevurp = (UndoRecPtr *)curxlogptr;
curxlogptr += sizeof(UndoRecPtr);
}
ExecuteUndoForInsertRecovery(buf, xlrec->offset, XLogRecGetXid(record), relhasindex, &tdSlot);
UHeapPageSetUndo(buf, tdSlot, (xid == NULL) ? InvalidTransactionId : *xid,
(prevurp == NULL) ? INVALID_UNDO_REC_PTR : *prevurp);
if (isPageInit) {
Page page = BufferGetPage(buf);
UPageInit<UPAGE_HEAP>(page, BufferGetPageSize(buf), UHEAP_SPECIAL_SIZE, *tdCount);
UHeapPageHeaderData *uheappage = (UHeapPageHeaderData *)page;
uheappage->pd_xid_base = *xidbase;
uheappage->pd_multi_base = 0;
}
PageSetLSN(BufferGetPage(buf), lsn);
MarkBufferDirty(buf);
Page page = BufferGetPage(buf);
UpageVerify((UHeapPageHeader)page, t_thrd.shemem_ptr_cxt.XLogCtl->RedoRecPtr, NULL,
NULL, NULL, blkno, true);
}
if (BufferIsValid(buf))
UnlockReleaseBuffer(buf);
}
void UHeapUndoRedo(XLogReaderState *record)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
switch (info) {
case XLOG_UHEAPUNDO_PAGE:
UHeapUndoXlogPage(record);
break;
case XLOG_UHEAPUNDO_RESET_SLOT:
UHeapUndoXlogResetXid(record);
break;
case XLOG_UHEAPUNDO_ABORT_SPECINSERT:
UHeapUndoXlogAbortSpecinsert(record);
break;
default:
elog(PANIC, "UHeapUndoRedo: unknown op code %u", info);
}
}
static TransactionId UHeapXlogGetCurrentXidInsert(XLogReaderState *record, bool hasCSN)
{
XlUHeapInsert *xlrec = (XlUHeapInsert *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapInsert +
SizeOfXLOGCSN(hasCSN));
bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0);
if (!hasCurrentXid) {
return XLogRecGetXid(record);
}
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
currLogPtr += sizeof(UndoRecPtr);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
currLogPtr += sizeof(UndoRecPtr);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
currLogPtr += sizeof(Oid);
}
return *(TransactionId*)(currLogPtr);
}
static TransactionId UHeapXlogGetCurrentXidDelete(XLogReaderState *record, bool hasCSN)
{
XlUHeapDelete *xlrec = (XlUHeapDelete *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapDelete +
SizeOfXLOGCSN(hasCSN));
bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0);
if (!hasCurrentXid) {
return XLogRecGetXid(record);
}
char *currLogPtr = ((char *)xlundohdr + SizeOfXLUndoHeader);
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_SUB_XACT) != 0) {
currLogPtr += sizeof(bool);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
currLogPtr += sizeof(UndoRecPtr);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
currLogPtr += sizeof(UndoRecPtr);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
currLogPtr += sizeof(Oid);
}
return *(TransactionId*)(currLogPtr);
}
static TransactionId UHeapXlogGetCurrentXidUpdate(XLogReaderState *record, bool hasCSN)
{
XlUHeapUpdate *xlrec = (XlUHeapUpdate *)XLogRecGetData(record);
XlUndoHeader *xlundohdr = (XlUndoHeader *)((char *)xlrec + SizeOfUHeapUpdate +
SizeOfXLOGCSN(hasCSN));
bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0);
if (!hasCurrentXid) {
return XLogRecGetXid(record);
}
char *currLogPtr = (char *)xlundohdr + SizeOfXLUndoHeader;
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_SUB_XACT) != 0) {
currLogPtr += sizeof(bool);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
currLogPtr += sizeof(UndoRecPtr);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
currLogPtr += sizeof(UndoRecPtr);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
currLogPtr += sizeof(Oid);
}
return *(TransactionId*)(currLogPtr);
}
static TransactionId UHeapXlogGetCurrentXidMultiInsert(XLogReaderState *record)
{
XlUndoHeader *xlundohdr = (XlUndoHeader *)XLogRecGetData(record);
bool hasCurrentXid = ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_CURRENT_XID) != 0);
if (!hasCurrentXid) {
return XLogRecGetXid(record);
}
char *curxlogptr = (char *)xlundohdr + SizeOfXLUndoHeader;
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_BLK_PREV) != 0) {
curxlogptr += sizeof(UndoRecPtr);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PREV_URP) != 0) {
curxlogptr += sizeof(UndoRecPtr);
}
if ((xlundohdr->flag & XLOG_UNDO_HEADER_HAS_PARTITION_OID) != 0) {
curxlogptr += sizeof(Oid);
}
return *(TransactionId*)(curxlogptr);
}
TransactionId UHeapXlogGetCurrentXid(XLogReaderState *record, bool hasCSN)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
switch (info & XLOG_UHEAP_OPMASK) {
case XLOG_UHEAP_INSERT:
return UHeapXlogGetCurrentXidInsert(record, hasCSN);
case XLOG_UHEAP_DELETE:
return UHeapXlogGetCurrentXidDelete(record, hasCSN);
case XLOG_UHEAP_UPDATE:
return UHeapXlogGetCurrentXidUpdate(record, hasCSN);
case XLOG_UHEAP_FREEZE_TD_SLOT:
case XLOG_UHEAP_INVALID_TD_SLOT:
case XLOG_UHEAP_CLEAN:
break;
case XLOG_UHEAP_MULTI_INSERT:
return UHeapXlogGetCurrentXidMultiInsert(record);
default:
ereport(PANIC, (errmsg("UHeapRedo: unknown op code %u", (uint8)info)));
}
return XLogRecGetXid(record);
}