/* -------------------------------------------------------------------------
 *
 * knl_upage.cpp
 *    the page format of inplace update engine.
 *
 * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/gausskernel/storage/access/ustore/knl_upage.cpp
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "storage/buf/bufmgr.h"
#include "utils/rel.h"
#include "access/transam.h"
#include "access/ustore/knl_upage.h"
#include "access/ustore/knl_uvisibility.h"
#include "storage/freespace.h"
#ifdef DEBUG_UHEAP
#include "pgstat.h"
#endif
/* NOTE(review): not referenced in the visible part of this file -- confirm it is still needed. */
#define ISNULL_BITMAP_NUMBER 2
/* Shift distance used to extract the upper half of an LSN when it is printed as two %X fields. */
#define HIGH_BITS_LENGTH_OF_LSN 32
/*
 * Forward declarations of the file-local verification helpers; they are
 * defined near the end of this file and invoked from UpageVerify().
 */
static void UpageVerifyTuple(UHeapPageHeader header, OffsetNumber off, TupleDesc tupDesc, Relation rel,
RelFileNode* rNode, BlockNumber blkno, bool isRedo = false);
static void UpageVerifyAllRowptr(UHeapPageHeader header, RelFileNode* rNode, BlockNumber blkno, bool isRedo = false);
static void UpageVerifyRowptr(RowPtr *rowPtr, Page page, OffsetNumber offnum, RelFileNode* rNode, BlockNumber blkno);
/*
 * UPageInit (generic template)
 *
 * Zero-fills the page and then dispatches on the compile-time page type.
 * Only UPAGE_INDEX is accepted here; any other page type that reaches this
 * generic version trips Assert(0). specialSize and tdSlots are not used by
 * this generic version.
 *
 * page        - page to initialize
 * pageSize    - size of the page; asserted to be BLCKSZ
 * specialSize - unused here
 * tdSlots     - unused here
 */
template<UPageType upagetype> void UPageInit(Page page, Size pageSize, Size specialSize, uint8 tdSlots)
{
    Assert(pageSize == BLCKSZ);

    /* Check the secure-memset result, as the other secure-function calls in this file do. */
    errno_t rc = memset_s((char *)page, pageSize, 0, pageSize);
    securec_check(rc, "\0", "\0");

    /*
     * A switch is used here because the templated version allows us to have
     * compile time bindings for all the page types. e.g. see the
     * instantiation for UPAGE_HEAP below.
     */
    switch (upagetype) {
        case UPAGE_INDEX: {
            break;
        }
        default: {
            Assert(0);
            break;
        }
    }
}
/*
 * UPageInit<UPAGE_HEAP>
 *
 * Initializes a uheap data page: zero-fills it, runs the generic PageInit(),
 * then fills in the uheap-specific header fields.
 *
 * page         - page to initialize
 * page_size    - size of the page; asserted to be BLCKSZ
 * special_size - size of the special area at the end of the page
 * tdSlots      - number of TD slots to reserve right after the page header
 */
template<> void UPageInit<UPAGE_HEAP>(Page page, Size page_size, Size special_size, uint8 tdSlots)
{
    Assert(page_size == BLCKSZ);

    /* Check the secure-memset result, as the other secure-function calls in this file do. */
    errno_t rc = memset_s((char *)page, page_size, 0, page_size);
    securec_check(rc, "\0", "\0");

    /* The header, the TD slots and the special area must leave room on the page. */
    Assert(page_size > (SizeOfUHeapPageHeaderData + special_size + (tdSlots * sizeof(TD))));

    PageInit(page, page_size, special_size, false);

    UHeapPageHeaderData *uheap_page = (UHeapPageHeaderData *)page;
    uheap_page->td_count = tdSlots;
    /* Row pointers start right after the TD slot array; tuple data grows down from the special area. */
    uheap_page->pd_lower = SizeOfUHeapPageHeaderData + tdSlots * sizeof(TD);
    uheap_page->pd_upper = page_size - special_size;
    uheap_page->potential_freespace = uheap_page->pd_upper - uheap_page->pd_lower;
    PageSetPageSizeAndVersion(page, page_size, PG_UHEAP_PAGE_LAYOUT_VERSION);
}
/*
 * UPageInit<UPAGE_TOAST>
 *
 * Same layout as the UPAGE_HEAP initializer, except that the TD slot count is
 * always UHEAP_DEFAULT_TOAST_TD_COUNT; the tdSlots argument is ignored.
 *
 * page         - page to initialize
 * page_size    - size of the page; asserted to be BLCKSZ
 * special_size - size of the special area at the end of the page
 * tdSlots      - ignored for toast pages
 */
template<> void UPageInit<UPAGE_TOAST>(Page page, Size page_size, Size special_size, uint8 tdSlots)
{
    Assert(page_size == BLCKSZ);

    /* Check the secure-memset result, as the other secure-function calls in this file do. */
    errno_t rc = memset_s((char *)page, page_size, 0, page_size);
    securec_check(rc, "\0", "\0");

    /* The header, the TD slots and the special area must leave room on the page. */
    Assert(page_size > (SizeOfUHeapPageHeaderData + special_size + (UHEAP_DEFAULT_TOAST_TD_COUNT * sizeof(TD))));

    PageInit(page, page_size, special_size, false);

    UHeapPageHeaderData *uheap_page = (UHeapPageHeaderData *)page;
    uheap_page->td_count = UHEAP_DEFAULT_TOAST_TD_COUNT;
    /* Row pointers start right after the TD slot array; tuple data grows down from the special area. */
    uheap_page->pd_lower = SizeOfUHeapPageHeaderData + UHEAP_DEFAULT_TOAST_TD_COUNT * sizeof(TD);
    uheap_page->pd_upper = page_size - special_size;
    uheap_page->potential_freespace = uheap_page->pd_upper - uheap_page->pd_lower;
    PageSetPageSizeAndVersion(page, page_size, PG_UHEAP_PAGE_LAYOUT_VERSION);
}
/*
 * CheckOffsetValidity
 *
 * Validates a caller-supplied offset against the row pointers already on the
 * page.
 *
 * Returns false (after a WARNING) only when overwrite is requested and the
 * target row pointer is occupied. Outside recovery "occupied" means used or
 * having storage; during recovery only storage counts. When overwrite is not
 * requested and the offset falls inside the existing row pointer array,
 * *needshuffle is set to true so the caller shifts the later pointers.
 * *needshuffle is left untouched in every other case.
 */
static bool CheckOffsetValidity(Page page, OffsetNumber offsetNumber, bool *needshuffle, bool overwrite)
{
    UHeapPageHeaderData *uphdr = (UHeapPageHeaderData *)page;
    OffsetNumber limit = OffsetNumberNext(UHeapPageGetMaxOffsetNumber(page));

    /* An offset at or past the end of the array never collides with anything. */
    if (offsetNumber >= limit) {
        return true;
    }

    if (!overwrite) {
        *needshuffle = true;
        return true;
    }

    RowPtr *target = UPageGetRowPtr(uphdr, offsetNumber);
    bool occupied;
    if (t_thrd.xlog_cxt.InRecovery) {
        occupied = RowPtrHasStorage(target);
    } else {
        occupied = RowPtrIsUsed(target) || RowPtrHasStorage(target);
    }

    if (occupied) {
        elog(WARNING, "will not overwrite a used ItemId");
        return false;
    }
    return true;
}
static void FindNextFreeSlot(const UHeapBufferPage *bufpage, Page input_page, OffsetNumber *offsetNumber)
{
RowPtr *itemId = NULL;
Page page = bufpage->page;
UHeapPageHeaderData *uphdr = (UHeapPageHeaderData *)page;
OffsetNumber limit;
limit = OffsetNumberNext(UHeapPageGetMaxOffsetNumber(page));
if (UPageHasFreeLinePointers(uphdr)) {
bool hasPendingXact = false;
* Look for "recyclable" (unused) ItemId. We check for no storage
* as well, just to be paranoid --- unused items should never have
* storage.
*/
for (*offsetNumber = FirstOffsetNumber; *offsetNumber < limit; (*offsetNumber)++) {
itemId = UPageGetRowPtr(uphdr, *offsetNumber);
if (!RowPtrIsUsed(itemId) && !RowPtrHasStorage(itemId)) {
break;
}
}
if (*offsetNumber >= limit && !hasPendingXact) {
UPageClearHasFreeLinePointers(uphdr);
}
} else {
*offsetNumber = limit;
}
}
/*
 * CalculateLowerUpperPointers
 *
 * Computes the new pd_lower / pd_upper for inserting `item` (of `size` bytes)
 * at `offsetNumber`, and performs the insert if it fits: the row pointer is
 * set, later row pointers are shifted when needshuffle is true, the tuple
 * bytes are copied in, and the page header is updated.
 *
 * Returns false when the offset is beyond one past the current maximum
 * (with a WARNING) or when the item does not fit. On a false return the page
 * header is left unmodified; in particular potential_freespace is only
 * adjusted once the insert is known to succeed.
 */
static bool CalculateLowerUpperPointers(Page page, OffsetNumber offsetNumber, Item item, Size size, bool needshuffle)
{
    UHeapPageHeaderData *uphdr = (UHeapPageHeaderData *)page;
    OffsetNumber limit = OffsetNumberNext(UHeapPageGetMaxOffsetNumber(page));

    if (offsetNumber > limit) {
        elog(WARNING, "specified item offset is too large");
        return false;
    }

    /*
     * Compute new lower and upper pointers for page, see if it'll fit.
     *
     * Note: do arithmetic as signed ints, to avoid mistakes if, say, size >
     * pd_upper.
     */
    bool needNewRowPtr = (offsetNumber == limit || needshuffle);
    int lower;
    if (needNewRowPtr) {
        lower = uphdr->pd_lower + sizeof(ItemIdData);
    } else {
        lower = uphdr->pd_lower;
    }

    Size alignedSize = SHORTALIGN(size);
    int upper = (int) uphdr->pd_upper - (int) alignedSize;
    if (lower > upper) {
        return false;
    }

    /* The item fits: only now charge the new row pointer against the free-space hint. */
    if (needNewRowPtr) {
        if (uphdr->potential_freespace >= sizeof(ItemIdData)) {
            uphdr->potential_freespace -= sizeof(ItemIdData);
        } else {
            uphdr->potential_freespace = 0;
        }
    }

    /*
     * OK to insert the item. First, shuffle the existing pointers if needed.
     */
    RowPtr *itemId = UPageGenerateRowPtr(uphdr, offsetNumber);
    if (needshuffle) {
        errno_t rc = memmove_s(itemId + 1, (limit - offsetNumber) * sizeof(ItemIdData),
            itemId, (limit - offsetNumber) * sizeof(ItemIdData));
        securec_check(rc, "\0", "\0");
    }

    SetNormalRowPointer(itemId, upper, size);

    errno_t rc = memcpy_s((char *) page + upper, size, item, size);
    securec_check(rc, "\0", "\0");

    uphdr->pd_lower = (uint16) lower;
    uphdr->pd_upper = (uint16) upper;
    return true;
}
/*
 * UPageAddItem: this function is similar to PageAddItem, but we
 * removed the flag is_heap since UPageAddItem is only called on
 * a UHeap data page.
 */
/*
 * Adds `item` (of `size` bytes) to a uheap page.
 *
 * The page comes from bufpage->buffer when that buffer is valid (and is then
 * cached in bufpage->page); otherwise bufpage->page is used directly. If
 * offsetNumber is valid it is checked against the existing row pointers,
 * otherwise a free slot is searched for.
 *
 * Returns the offset the item was stored at, or InvalidOffsetNumber when the
 * offset is rejected, exceeds the per-page tuple limit (checked only outside
 * recovery), or the item does not fit. PANICs when pd_lower > pd_upper.
 */
OffsetNumber UPageAddItem(Relation rel, UHeapBufferPage *bufpage, Item item, Size size,
    OffsetNumber offsetNumber, bool overwrite)
{
    Page page;
    Page callerPage = NULL;
    bool shuffleNeeded = false;

    if (!BufferIsValid(bufpage->buffer)) {
        Assert(PageIsValid(bufpage->page));
        page = bufpage->page;
        callerPage = bufpage->page;
    } else {
        Assert(!PageIsValid(bufpage->page));
        page = BufferGetPage(bufpage->buffer);
        bufpage->page = page;
    }
    Assert(PageIsValid(page));

    UHeapPageHeaderData *uphdr = (UHeapPageHeaderData *)page;

    /*
     * Be wary about corrupted page pointers
     * Fixme: Add more sanity checks for page
     */
    if (uphdr->pd_lower > uphdr->pd_upper) {
        ereport(PANIC, (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("corrupted page pointers: pd_lower = %u, pd_upper = %u", uphdr->pd_lower,
            uphdr->pd_upper)));
    }

    if (!OffsetNumberIsValid(offsetNumber)) {
        FindNextFreeSlot(bufpage, callerPage, &offsetNumber);
    } else if (!CheckOffsetValidity(page, offsetNumber, &shuffleNeeded, overwrite)) {
        return InvalidOffsetNumber;
    }

    /* The per-page tuple limit is enforced only outside recovery. */
    if (!t_thrd.xlog_cxt.InRecovery && offsetNumber > CalculatedMaxUHeapTuplesPerPage(RelationGetInitTd(rel))) {
        elog(WARNING, "can't put (%d)th item in a Ustore page with max %d items",
            offsetNumber, CalculatedMaxUHeapTuplesPerPage(RelationGetInitTd(rel)));
        return InvalidOffsetNumber;
    }

    bool inserted = CalculateLowerUpperPointers(page, offsetNumber, item, size, shuffleNeeded);
    return inserted ? offsetNumber : InvalidOffsetNumber;
}
/*
 * UHeapGetTuple
 *
 * Copy a raw tuple from a uheap page, forming a UHeapTuple.
 */
/*
 * Builds a UHeapTuple from the row at `offnum` of `buffer`.
 *
 * When freebuf is non-NULL it is reused as the result (its disk_tuple must
 * already point at caller-provided storage); otherwise header and data are
 * allocated together with uheaptup_alloc(). The tuple's table oid, node id,
 * length and ctid are filled in and the on-page bytes are copied into
 * disk_tuple. A row pointer flagged as deleted yields a length of 0.
 */
UHeapTuple UHeapGetTuple(Relation relation, Buffer buffer, OffsetNumber offnum, UHeapTuple freebuf)
{
#ifdef DEBUG_UHEAP
    UHEAPSTAT_COUNT_TUPLE_VISITS();
#endif

    Page dp = BufferGetPage(buffer);
    RowPtr *rp = UPageGetRowPtr(dp, offnum);

    Assert(offnum >= FirstOffsetNumber && offnum <= UHeapPageGetMaxOffsetNumber(dp));
    Assert(RowPtrIsNormal(rp));

    Size tupleLen = RowPtrIsDeleted(rp) ? 0 : RowPtrGetLen(rp);

    UHeapTuple tuple;
    if (!freebuf) {
        /* Header and data live in one allocation; the data follows the header. */
        tuple = (UHeapTuple)uheaptup_alloc(UHeapTupleDataSize + tupleLen);
        tuple->disk_tuple = (UHeapDiskTuple)((char *)tuple + UHeapTupleDataSize);
    } else {
        Assert(freebuf->disk_tuple != NULL);
        tuple = freebuf;

        /*
         * The caller is using a pre allocated buffer, eg: TidScan.
         * Make sure to initialize the correct tuple type.
         */
        tuple->tupTableType = UHEAP_TUPLE;
    }

    tuple->table_oid = RelationGetRelid(relation);
    tuple->xc_node_id = u_sess->pgxc_cxt.PGXCNodeIdentifier;
    tuple->disk_tuple_size = tupleLen;
    ItemPointerSet(&tuple->ctid, BufferGetBlockNumber(buffer), offnum);

    UHeapDiskTuple item = (UHeapDiskTuple)UPageGetRowData(dp, rp);
    errno_t rc = memcpy_s(tuple->disk_tuple, tupleLen, item, tupleLen);
    securec_check(rc, "", "");

    return tuple;
}
/*
 * UHeapGetTuplePartial
 *
 * Copy part of a raw tuple from a uheap page, forming a UHeapTuple that excludes some columns.
 * Columns that aren't copied are marked as NULL.
 */
/*
 * Builds a UHeapTuple from the row at `offnum` of `buffer`, copying only part
 * of the columns when lastVar > 0.
 *
 * relation - relation the tuple belongs to (descriptor and size limit source)
 * buffer   - buffer holding the page
 * offnum   - row pointer offset of the tuple
 * lastVar  - when > 0, selects the partial-copy path and is forwarded to the
 *            copy helpers; when <= 0 the whole tuple is copied
 * boolArr  - forwarded to the copy helpers
 *
 * For a partial copy the result is allocated with extra room for a null
 * bitmap (doubled when NAttrsReserveSpace() holds) plus UHEAP_MAX_ATTR_PAD.
 * If that padded length would exceed MaxUHeapTupleSize(relation), the
 * function falls back to a full copy using the tuple's real on-page length,
 * so the fallback neither reads past the source tuple nor records an
 * inflated disk_tuple_size.
 */
UHeapTuple UHeapGetTuplePartial(Relation relation, Buffer buffer, OffsetNumber offnum, AttrNumber lastVar,
    bool *boolArr)
{
    Page dp = BufferGetPage(buffer);
    RowPtr *rp = UPageGetRowPtr(dp, offnum);
    bool enableReverseBitmap = false;

#ifdef DEBUG_UHEAP
    UHEAPSTAT_COUNT_TUPLE_VISITS();
#endif

    Assert(offnum >= FirstOffsetNumber && offnum <= UHeapPageGetMaxOffsetNumber(dp));
    Assert(RowPtrIsNormal(rp));

    UHeapDiskTuple item = (UHeapDiskTuple)UPageGetRowData(dp, rp);
    Size tupleLen = RowPtrIsDeleted(rp) ? 0 : RowPtrGetLen(rp);
    TupleDesc rowDesc = RelationGetDescr(relation);
#ifdef USE_ASSERT_CHECKING
    int natts = rowDesc->natts;
#endif
    int diskTupleNAttrs = UHeapTupleHeaderGetNatts(item);

    /* Size of the null bitmap area the partial copy will carry. */
    enableReverseBitmap = NAttrsReserveSpace(diskTupleNAttrs);
    uint8 nullsLen = BITMAPLEN(diskTupleNAttrs);
    if (enableReverseBitmap) {
        nullsLen += BITMAPLEN(diskTupleNAttrs);
    }

    if (lastVar > 0) {
        Size paddedLen = tupleLen + (nullsLen + UHEAP_MAX_ATTR_PAD);
        if (paddedLen > MaxUHeapTupleSize(relation)) {
            /* Too large for a partial copy: copy the whole tuple at its real length. */
            lastVar = -1;
        } else {
            tupleLen = paddedLen;
        }
    }

    UHeapTuple tuple = (UHeapTuple)uheaptup_alloc(UHeapTupleDataSize + tupleLen);
    tuple->disk_tuple = (UHeapDiskTuple)((char *)tuple + UHeapTupleDataSize);
    tuple->table_oid = RelationGetRelid(relation);
    tuple->xc_node_id = u_sess->pgxc_cxt.PGXCNodeIdentifier;
    tuple->disk_tuple_size = tupleLen;
    ItemPointerSet(&tuple->ctid, BufferGetBlockNumber(buffer), offnum);

    if (lastVar <= 0) {
        errno_t rc = memcpy_s(tuple->disk_tuple, tupleLen, item, tupleLen);
        securec_check(rc, "", "");
    } else {
        Assert(lastVar <= natts);

        /* Copy the fixed header, then force a zeroed null bitmap into the result. */
        errno_t rc = memcpy_s(tuple->disk_tuple, SizeOfUHeapDiskTupleTillThoff, item, SizeOfUHeapDiskTupleTillThoff);
        securec_check(rc, "", "");

        UHeapDiskTupSetHasNulls(tuple->disk_tuple);
        tuple->disk_tuple->t_hoff = SizeOfUHeapDiskTupleData + nullsLen;
        rc = memset_s(tuple->disk_tuple->data, nullsLen, 0, nullsLen);
        securec_check(rc, "", "");

        if (UHeapDiskTupNoNulls(item)) {
            UHeapCopyDiskTupleNoNull(rowDesc, boolArr, tuple, lastVar, item);
        } else {
            UHeapCopyDiskTupleWithNulls(rowDesc, boolArr, tuple, lastVar, item);
        }
    }

    FastVerifyUTuple(tuple->disk_tuple, buffer);
    Assert(UHeapTupleHeaderGetNatts(tuple->disk_tuple) == diskTupleNAttrs);
    return tuple;
}
/*
 * UPageGetFreeSpace
 *
 * Returns the gap between pd_upper and pd_lower reduced by the size of one
 * row pointer, or 0 when the gap is smaller than a row pointer. The
 * subtraction is done in signed arithmetic.
 */
static Size UPageGetFreeSpace(Page page)
{
    UHeapPageHeaderData *uphdr = (UHeapPageHeaderData *)page;
    int gap = (int)uphdr->pd_upper - (int)uphdr->pd_lower;

    if (gap >= (int)sizeof(RowPtr)) {
        return (Size)(gap - (int)sizeof(RowPtr));
    }
    return 0;
}
/*
 * PageGetUHeapFreeSpace
 *      Returns the size of the free (allocatable) space on a uheap page,
 *      reduced by the space needed for a new line pointer.
 *
 * This is the same as PageGetHeapFreeSpace except for the max tuples that can
 * be accommodated on a page or the way unused items are dealt with.
 */
/*
 * Reports the allocatable space of a uheap page, net of one new row pointer.
 *
 * Once the page already holds the maximum number of row pointers for its TD
 * slot count, space is reported only if an unused row pointer really exists;
 * otherwise 0 is returned.
 */
Size PageGetUHeapFreeSpace(Page page)
{
    Size space = UPageGetFreeSpace(page);
    if (space == 0) {
        return 0;
    }

    OffsetNumber nline = UHeapPageGetMaxOffsetNumber(page);
    if (nline < CalculatedMaxUHeapTuplesPerPage(UPageGetTDSlotCount(page))) {
        /* Still room for another row pointer. */
        return space;
    }

    if (!UPageHasFreeLinePointers(page)) {
        /*
         * Although the hint might be wrong, PageAddItem will believe
         * it anyway, so we must believe it too.
         */
        return 0;
    }

    /*
     * Since this is just a hint, we must confirm that there is
     * indeed a free line pointer
     */
    for (OffsetNumber offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum)) {
        RowPtr *lp = UPageGetRowPtr(page, offnum);

        /*
         * The unused items that have pending xact information
         * can't be reused.
         */
        if (!RowPtrIsUsed(lp)) {
            return space;
        }
    }

    /*
     * The hint is wrong, but we can't clear it here since we
     * don't have the ability to mark the page dirty.
     */
    return 0;
}
/*
 * PageGetExactUHeapFreeSpace
 *      Returns the size of the free (allocatable) space on a page,
 *      without any consideration for adding/removing line pointers.
 */
/*
 * Returns pd_upper - pd_lower for the page, clamped to 0.
 */
Size PageGetExactUHeapFreeSpace(Page page)
{
    UHeapPageHeaderData *uphdr = (UHeapPageHeaderData *)page;

    /*
     * Use signed arithmetic here so that we behave sensibly if pd_lower >
     * pd_upper.
     */
    int freeBytes = (int)uphdr->pd_upper - (int)uphdr->pd_lower;

    return (freeBytes < 0) ? 0 : (Size)freeBytes;
}
/*
 * LocateUsableItemIds
 *
 * Scans the existing row pointers of `page` for reusable ones and assigns
 * them, in order, to the tuples in `tuples`. Consecutive reusable offsets are
 * merged into contiguous [startOffset, endOffset] ranges of a freshly
 * palloc0'd UHeapFreeOffsetRanges, which is returned.
 *
 * The scan stops when all ntuples tuples are placed or when the next tuple no
 * longer fits in the page's exact free space; saveFreeSpace is added to the
 * space requirement for every tuple except the first one considered.
 *
 * *nthispage (in/out) - incremented for each tuple that was assigned a slot
 * *usedSpace (in/out) - incremented by the disk size of each such tuple
 */
static UHeapFreeOffsetRanges *
LocateUsableItemIds(Page page, UHeapTuple *tuples, int ntuples, Size saveFreeSpace, int *nthispage, Size *usedSpace)
{
    UHeapFreeOffsetRanges *ranges = (UHeapFreeOffsetRanges *) palloc0(sizeof(UHeapFreeOffsetRanges));
    ranges->nranges = 0;

    Size availSpace = PageGetExactUHeapFreeSpace(page);
    OffsetNumber limit = OffsetNumberNext(UHeapPageGetMaxOffsetNumber(page));
    bool rangeOpen = false;
    bool firstCandidate = true;

    /*
     * Look for "recyclable" (unused) ItemId. We check for no storage as
     * well, just to be paranoid --- unused items should never have
     * storage.
     */
    for (OffsetNumber off = 1; off < limit; off++) {
        RowPtr *rp = UPageGetRowPtr(page, off);

        if (*nthispage >= ntuples) {
            break;
        }

        if (RowPtrIsUsed(rp) || RowPtrHasStorage(rp)) {
            /* An occupied slot ends the current contiguous range. */
            rangeOpen = false;
            continue;
        }

        UHeapTuple candidate = tuples[*nthispage];
        Size neededSpace;
        if (firstCandidate) {
            neededSpace = *usedSpace + candidate->disk_tuple_size;
            firstCandidate = false;
        } else {
            neededSpace = *usedSpace + candidate->disk_tuple_size + saveFreeSpace;
        }

        if (availSpace < neededSpace) {
            break;
        }

        (*usedSpace) += candidate->disk_tuple_size;
        (*nthispage)++;

        if (!rangeOpen) {
            ranges->nranges++;
            ranges->startOffset[ranges->nranges - 1] = off;
            rangeOpen = true;
        }
        ranges->endOffset[ranges->nranges - 1] = off;
    }

    return ranges;
}
/*
 * UHeapGetUsableOffsetRanges
 *
 * Given a page and a set of tuples, it calculates how many tuples can fit in
 * the page and the contiguous ranges of free offsets that can be used/reused
 * in the same page to store those tuples.
 */
/*
 * Computes the offset ranges in which `tuples` can be stored on the page of
 * `buffer`.
 *
 * Reusable row pointers are collected first (only when the page advertises
 * free line pointers). If tuples remain and the page may still grow its row
 * pointer array, one more range starting at one past the current maximum
 * offset is appended. Raises ERROR when not even the first tuple fits, and
 * PANIC when no range could be produced at all.
 */
UHeapFreeOffsetRanges *UHeapGetUsableOffsetRanges(Buffer buffer, UHeapTuple *tuples, int ntuples, Size saveFreeSpace)
{
    int nthispage = 0;
    Size usedSpace = 0;

    Page page = BufferGetPage(buffer);
    OffsetNumber limit = OffsetNumberNext(UHeapPageGetMaxOffsetNumber(page));
    Size availSpace = PageGetExactUHeapFreeSpace(page);
    UHeapPageHeaderData *uphdr = (UHeapPageHeaderData *)page;

    UHeapFreeOffsetRanges *ranges = NULL;
    if (!UPageHasFreeLinePointers(uphdr)) {
        ranges = (UHeapFreeOffsetRanges *) palloc0(sizeof(UHeapFreeOffsetRanges));
        ranges->nranges = 0;
    } else {
        ranges = LocateUsableItemIds(page, tuples, ntuples, saveFreeSpace, &nthispage, &usedSpace);
    }

    /* saveFreeSpace is not charged to the very first tuple placed on the page. */
    bool nothingPlacedYet = (ranges->nranges == 0);

    /*
     * Now, there are no free line pointers. Check whether we can insert
     * another tuple in the page, then we'll insert another range starting
     * from limit to max required offset number. We can decide the actual end
     * offset for this range while inserting tuples in the buffer.
     */
    if ((limit <= CalculatedMaxUHeapTuplesPerPage(UPageGetTDSlotCount(page))) && (nthispage < ntuples)) {
        UHeapTuple nextTuple = tuples[nthispage];
        Size neededSpace;

        if (nothingPlacedYet) {
            neededSpace = usedSpace + sizeof(ItemIdData) + nextTuple->disk_tuple_size;
        } else {
            neededSpace = usedSpace + sizeof(ItemIdData) + nextTuple->disk_tuple_size + saveFreeSpace;
        }

        if (availSpace >= neededSpace) {
            int requiredTuples = ntuples - nthispage;

            /*
             * Choose minimum among MaxOffsetNumber and the maximum offsets
             * required for tuples.
             */
            OffsetNumber maxRequiredOffset = Min(MaxOffsetNumber, (limit + requiredTuples));

            ranges->nranges++;
            ranges->startOffset[ranges->nranges - 1] = limit;
            ranges->endOffset[ranges->nranges - 1] = maxRequiredOffset;
        } else if (ranges->nranges == 0) {
            ereport(ERROR,
                (errcode(ERRCODE_DATA_CORRUPTED), errmsg("tuple is too big, please check the table fillfactor: "
                "needed_size = %lu, free_space = %lu, save_free_space = %lu.",
                (unsigned long)neededSpace,
                (unsigned long)availSpace,
                (unsigned long)saveFreeSpace)));
        }
    }

    if (ranges->nranges == 0) {
        ereport(PANIC, (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("please check free space: limit = %u, num_this_page = %d, free_space = %lu.", limit, nthispage,
            (unsigned long)availSpace)));
    }

    return ranges;
}
/*
 * UHeapRecordPotentialFreeSpace
 *
 * Adjusts the page's potential_freespace hint by `delta` bytes (which may be
 * negative) and clamps the result to [0, maxWritableSpc], where
 * maxWritableSpc is the area between the end of the TD slot array and
 * pd_special.
 *
 * The sum is computed in signed 64-bit arithmetic: adding a negative int to
 * an unsigned Size would wrap around when |delta| exceeds the current value,
 * and the upper clamp would then turn the hint into the maximum instead of 0.
 */
void UHeapRecordPotentialFreeSpace(Buffer buffer, int delta)
{
    UHeapPageHeaderData *page = (UHeapPageHeaderData *)BufferGetPage(buffer);

    Assert(page->potential_freespace >= 0);

    Size maxWritableSpc =
        page->pd_special - (SizeOfUHeapPageHeaderData + page->td_count * sizeof(TD));

    int64 adjusted = (int64)page->potential_freespace + (int64)delta;

    Size newPotentialFreespace;
    if (adjusted <= 0) {
        newPotentialFreespace = 0;
    } else {
        newPotentialFreespace = Min((Size)adjusted, maxWritableSpc);
    }

    page->potential_freespace = newPotentialFreespace;
}
/*
 * VerifyPageHeader
 *
 * Returns false for a NULL page. Otherwise checks that
 * pd_lower <= pd_upper <= pd_special and pd_special == BLCKSZ, raising PANIC
 * when the check fails, and returns true.
 */
bool VerifyPageHeader(Page page)
{
    if (page == NULL) {
        return false;
    }

    PageHeader phdr = (PageHeader)page;
    const uint16 pdLower = phdr->pd_lower;
    const uint16 pdUpper = phdr->pd_upper;
    const uint16 pdSpecial = phdr->pd_special;

    bool layoutOk = (pdLower <= pdUpper) && (pdUpper <= pdSpecial) && (pdSpecial == BLCKSZ);
    if (!layoutOk) {
        ereport(PANIC, (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("page header invalid: lower = %u, upper = %u, special = %u.",
            pdLower, pdUpper, pdSpecial)));
    }

    return true;
}
static int RpCompare(const void *rp1, const void *rp2)
{
return ((RpSort)rp1)->start - ((RpSort)rp2)->start;
}
/*
 * FastVerifyUTuple
 *
 * Cheap sanity check of a disk tuple's TD slot. Does nothing when the
 * session's ustore_verify_level is below USTORE_VERIFY_DEFAULT. The slot must
 * lie in [0, tdCount], where tdCount is the page's td_count when a buffer is
 * supplied and UHEAP_MAX_TD otherwise; a violation raises PANIC. The buffer,
 * when supplied, also provides the relNode and block number for the message.
 */
void FastVerifyUTuple(UHeapDiskTuple diskTup, Buffer buffer)
{
    if (u_sess->attr.attr_storage.ustore_verify_level < (int) USTORE_VERIFY_DEFAULT) {
        return;
    }

    /* Defaults used when no buffer is available. */
    int tdCount = UHEAP_MAX_TD;
    BlockNumber blockno = InvalidBlockNumber;
    Oid relId = InvalidOid;

    int tdSlot = UHeapTupleHeaderGetTDSlot(diskTup);
    uint16 reserved = diskTup->reserved;

    if (!BufferIsInvalid(buffer)) {
        BufferDesc *bufdesc = GetBufferDescriptor(buffer - 1);
        UHeapPageHeaderData *phdr = (UHeapPageHeaderData *)BufferGetPage(buffer);

        tdCount = phdr->td_count;
        relId = bufdesc->tag.rnode.relNode;
        blockno = BufferGetBlockNumber(buffer);
    }

    bool slotInRange = (tdSlot >= 0) && (tdSlot <= tdCount);
    if (!slotInRange) {
        ereport(PANIC, (errmodule(MOD_USTORE), errmsg(
            "verify utuple invalid! "
            "LogInfo: tdSlot %d, tdcount %u, reserved %u. "
            "TransInfo: oid %u, blockno %u.",
            tdSlot, tdCount, reserved, relId, blockno)));
    }
}
/*
 * getModule
 *
 * Maps the redo flag to the verification module id: USTORE_VERIFY_MOD_REDO
 * when isRedo is true, USTORE_VERIFY_MOD_UPAGE otherwise.
 */
static int getModule(bool isRedo)
{
    if (isRedo) {
        return USTORE_VERIFY_MOD_REDO;
    }
    return USTORE_VERIFY_MOD_UPAGE;
}
/*
 * UpageVerify
 *
 * Entry point of uheap page verification. `mask` (restricted to
 * USTORE_VERIFY_UPAGE_MASK) selects which checks run:
 *   - USTORE_VERIFY_UPAGE_HEADER: page header check
 *   - USTORE_VERIFY_UPAGE_ROW / _TUPLE: row pointer / tuple check for `num`,
 *     only when num is a valid offset
 *   - USTORE_VERIFY_UPAGE_ROWS: all row pointers (past the COMPLETE level gate)
 *   - USTORE_VERIFY_UPAGE_TUPLE with num == InvalidOffsetNumber: every tuple on
 *     the page (past the COMPLETE level gate)
 *
 * Outside redo, rNode is taken from rel (or NULL when rel is NULL). A NULL
 * rNode is replaced by an all-invalid placeholder so the helpers can print it.
 *
 * BYPASS_VERIFY and CHECK_VERIFY_LEVEL are macros defined elsewhere;
 * presumably they return early when verification is disabled or the
 * configured level is too low -- confirm against their definitions.
 */
void UpageVerify(UHeapPageHeader header, XLogRecPtr lastRedo, TupleDesc tupDesc, Relation rel,
    RelFileNode* rNode, BlockNumber blkno, bool isRedo, uint8 mask, OffsetNumber num)
{
    /*
     * Placeholder used when no relfilenode is known. It is declared at
     * function scope because rNode may point at it for the rest of the
     * function; a block-scoped object would be dead before it is used.
     */
    RelFileNode invalidRelFileNode = {InvalidOid, InvalidOid, InvalidOid};

    BYPASS_VERIFY(getModule(isRedo), rel);

    if (!isRedo) {
        rNode = rel ? &rel->rd_node : NULL;
    }

    if (rNode == NULL) {
        rNode = &invalidRelFileNode;
    }

    CHECK_VERIFY_LEVEL(USTORE_VERIFY_FAST);

    uint8 curMask = mask & USTORE_VERIFY_UPAGE_MASK;
    if ((curMask & USTORE_VERIFY_UPAGE_HEADER) > 0) {
        UpageVerifyHeader(header, lastRedo, rNode, blkno, isRedo);
    }

    /* Checks targeted at one specific row pointer / tuple. */
    if (num != InvalidOffsetNumber) {
        if ((curMask & USTORE_VERIFY_UPAGE_ROW) > 0) {
            RowPtr *rowptr = UPageGetRowPtr(header, num);
            UpageVerifyRowptr(rowptr, (Page)header, num, rNode, blkno);
        }
        if ((curMask & USTORE_VERIFY_UPAGE_TUPLE) > 0) {
            UpageVerifyTuple(header, num, tupDesc, rel, rNode, blkno, isRedo);
        }
    }

    CHECK_VERIFY_LEVEL(USTORE_VERIFY_COMPLETE);

    /* Whole-page checks. */
    if ((curMask & USTORE_VERIFY_UPAGE_ROWS) > 0) {
        UpageVerifyAllRowptr(header, rNode, blkno, isRedo);
    }

    if ((curMask & USTORE_VERIFY_UPAGE_TUPLE) > 0) {
        if (num == InvalidOffsetNumber) {
            for (OffsetNumber offNum= FirstOffsetNumber; offNum <= UHeapPageGetMaxOffsetNumber((char *)header); offNum++) {
                UpageVerifyTuple(header, offNum, tupDesc, rel, rNode, blkno, isRedo);
            }
        }
    }
}
/*
 * UpageVerifyHeader
 *
 * Verifies the uheap page header; every violation is reported through
 * ereport(defence_errlevel(), ...) with ERRCODE_DATA_CORRUPTED:
 *   - the page LSN must not be older than lastRedo (skipped when lastRedo is
 *     InvalidXLogRecPtr)
 *   - pd_lower / pd_upper / pd_special / potential_freespace must be mutually
 *     consistent and pd_special must equal BLCKSZ
 *   - td_count must be in (0, UHEAP_MAX_TD]
 *   - pd_prune_xid must not follow the next xid to be assigned
 *
 * isRedo is not referenced by this function.
 */
void UpageVerifyHeader(UHeapPageHeader header, XLogRecPtr lastRedo, RelFileNode* rNode, BlockNumber blkno, bool isRedo)
{
    if (lastRedo != InvalidXLogRecPtr) {
        XLogRecPtr pageLsn = PageGetLSN(header);
        if (pageLsn < lastRedo) {
            ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
                errmsg("[UPAGE_VERIFY|HEADER] Current lsn(%X/%X) in page is smaller than last checkpoint(%X/%X)), "
                "rnode[%u,%u,%u], block %u.", (uint32)(pageLsn >> HIGH_BITS_LENGTH_OF_LSN),
                (uint32)pageLsn, (uint32)(lastRedo >> HIGH_BITS_LENGTH_OF_LSN), (uint32)lastRedo,
                rNode->spcNode, rNode->dbNode, rNode->relNode, blkno)));
        }
    }

    /* Row pointers may not start inside the header or the TD slot array. */
    bool boundsBroken = header->pd_lower < (SizeOfUHeapPageHeaderData + SizeOfUHeapTDData(header)) ||
        header->pd_lower > header->pd_upper || header->pd_upper > header->pd_special ||
        header->potential_freespace > BLCKSZ || header->pd_special != BLCKSZ;
    if (unlikely(boundsBroken)) {
        ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("[UPAGE_VERIFY|HEADER] lower = %u, upper = %u, special = %u, potential = %u,"
            " rnode[%u,%u,%u], block %u.", header->pd_lower, header->pd_upper, header->pd_special,
            header->potential_freespace, rNode->spcNode, rNode->dbNode, rNode->relNode, blkno)));
    }

    bool tdCountOk = (header->td_count > 0) && (header->td_count <= UHEAP_MAX_TD);
    if (!tdCountOk) {
        ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("[UPAGE_VERIFY|HEADER] tdcount invalid: tdcount = %u, rnode[%u,%u,%u], block %u.",
            header->td_count, rNode->spcNode, rNode->dbNode, rNode->relNode, blkno)));
    }

    if (TransactionIdFollows(header->pd_prune_xid, t_thrd.xact_cxt.ShmemVariableCache->nextXid)) {
        ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("[UPAGE_VERIFY|HEADER] prune_xid invalid: prune_xid = %lu, nextxid = %lu."
            " rnode[%u,%u,%u], block %u.", header->pd_prune_xid, t_thrd.xact_cxt.ShmemVariableCache->nextXid,
            rNode->spcNode, rNode->dbNode, rNode->relNode, blkno)));
    }
}
/*
 * UpageVerifyTuple
 *
 * Sanity-checks the tuple referenced by row pointer `offnum` on the page.
 * Row pointers for which RowPtrIsNormal() does not hold are skipped silently.
 * Every violation is reported via ereport(defence_errlevel(), ...) with
 * ERRCODE_DATA_CORRUPTED.
 *
 * rel may be NULL; the computed tuple size is then taken as 0.
 * isRedo is not referenced in this function body.
 */
static void UpageVerifyTuple(UHeapPageHeader header, OffsetNumber offnum, TupleDesc tupDesc, Relation rel,
RelFileNode* rNode, BlockNumber blkno, bool isRedo)
{
RowPtr *rp = NULL;
UHeapDiskTuple diskTuple = NULL;
int tdSlot = InvalidTDSlotId;
bool hasInvalidXact = false;
TransactionId tupXid = InvalidTransactionId;
/* Starts out all-invalid; only filled from the TD array for non-frozen slots. */
UHeapTupleTransInfo td_info = {InvalidTDSlotId, InvalidTransactionId, InvalidCommandId, INVALID_UNDO_REC_PTR};
rp = UPageGetRowPtr(header, offnum);
if (RowPtrIsNormal(rp)) {
diskTuple = (UHeapDiskTuple)UPageGetRowData(header, rp);
tdSlot = UHeapTupleHeaderGetTDSlot(diskTuple);
hasInvalidXact = UHeapTupleHasInvalidXact(diskTuple->flag);
tupXid = UDiskTupleGetModifiedXid(diskTuple, (Page)header);
td_info.td_slot = tdSlot;
/* Frozen tuples reference no TD slot, so the TD lookups below are skipped for them. */
if ((tdSlot != UHEAPTUP_SLOT_FROZEN)) {
/* The slot number is 1-based and must index into the page's TD array. */
if (tdSlot < 1 || tdSlot > header->td_count) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|TUPLE]tdSlot out of bounds, tdSlot = %d, td_count = %d, "
"rnode[%u,%u,%u], block %u, offnum %u.", tdSlot, header->td_count,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
return;
}
UHeapPageTDData *tdPtr = (UHeapPageTDData *)PageGetTDPointer(header);
TD *this_trans = &tdPtr->td_info[tdSlot - 1];
td_info.td_slot = tdSlot;
td_info.cid = InvalidCommandId;
td_info.urec_add = this_trans->undo_record_ptr;
td_info.xid = this_trans->xactid;
TransactionId xid = this_trans->xactid;
/* The TD's xid must not be ahead of the next xid to be assigned. */
if (TransactionIdFollows(xid, t_thrd.xact_cxt.ShmemVariableCache->nextXid)) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|TUPLE]tdxid invalid: tdSlot = %d, tdxid = %lu, nextxid = %lu, "
"rnode[%u,%u,%u], block %u, offnum %u.", tdSlot, xid, t_thrd.xact_cxt.ShmemVariableCache->nextXid,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
}
/* A valid, non-committed TD xid must not precede the global frozen xid. */
if (TransactionIdIsValid(xid) && !UHeapTransactionIdDidCommit(xid) &&
TransactionIdPrecedes(xid, g_instance.undo_cxt.globalFrozenXid)) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|TUPLE]tdxid %lu in tdslot(%d) is smaller than global frozen xid %lu, "
"rnode[%u,%u,%u], block %u, offnum %u.", xid, tdSlot, g_instance.undo_cxt.globalFrozenXid,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
}
}
/*
 * When the tuple's slot is not flagged as having an invalid xact and the TD
 * has a valid undo pointer, the TD xid must be valid and, if the tuple xid
 * is valid too, equal to it.
 */
if (!hasInvalidXact && IS_VALID_UNDO_REC_PTR(td_info.urec_add) &&
(!TransactionIdIsValid(td_info.xid) || (TransactionIdIsValid(tupXid) &&
!TransactionIdEquals(td_info.xid, tupXid)))) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|TUPLE] tup xid inconsistency with td: tupxid = %lu, tdxid = %lu, urp %lu, "
"rnode[%u,%u,%u], block %u, offnum %u.", tupXid, td_info.xid, td_info.urec_add,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
return;
}
/* NOTE(review): macro defined elsewhere; presumably returns unless the COMPLETE verify level is enabled. */
CHECK_VERIFY_LEVEL(USTORE_VERIFY_COMPLETE)
/* The computed tuple size must fit the row pointer length; reserved must be 0 or 0xFF. */
int tupSize = (rel == NULL) ? 0 : CalTupSize(rel, diskTuple, tupDesc);
if (tupSize > (int)RowPtrGetLen(rp) || (diskTuple->reserved != 0 &&
diskTuple->reserved != 0xFF)) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|TUPLE]corrupted tuple: tupsize = %d, rpsize = %u, "
"rnode[%u,%u,%u], block %u, offnum %u.",
tupSize, RowPtrGetLen(rp), rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
return;
}
/* The remaining checks need a valid tuple xid. */
if (!TransactionIdIsValid(tupXid)) {
return;
}
if (hasInvalidXact) {
/* Outside recovery, a tuple flagged with an invalid xact must carry a committed xid. */
if (!UHeapTransactionIdDidCommit(tupXid) && !t_thrd.xlog_cxt.InRecovery) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|TUPLE] tup xid not commit, tupxid = %lu, "
"rnode[%u,%u,%u], block %u, offnum %u.", tupXid,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
return;
}
/* With the invalid-xact flag set, the TD xid is expected to differ from the tuple xid. */
if (TransactionIdEquals(td_info.xid, tupXid)) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|TUPLE] td reused but xid equal td: tupxid = %lu, tdxid = %lu, "
"rnode[%u,%u,%u], block %u, offnum %u.", tupXid, td_info.xid,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
return;
}
}
}
}
/*
 * UpageVerifyRowptr
 *
 * Verifies one row pointer and the transaction information of the tuple it
 * points at. Every violation is reported via
 * ereport(defence_errlevel(), ...) with ERRCODE_DATA_CORRUPTED:
 *   - the row pointer must be normal
 *   - the tuple's TD slot must be within [1, td_count]
 *   - for a lock-only tuple the locker must be the top transaction id;
 *     otherwise the TD must have a valid undo pointer, the tuple must not be
 *     flagged with an invalid xact, and the TD xid must equal the locker, the
 *     top transaction id and the tuple's modifying xid
 *   - past the COMPLETE level gate, the tuple's storage must not overlap the
 *     storage of any other normal row pointer on the page
 *
 * The row data is only read as a tuple after the row pointer is known to be
 * normal, so an abnormal pointer is never dereferenced.
 */
static void UpageVerifyRowptr(RowPtr *rowPtr, Page page, OffsetNumber offnum, RelFileNode* rNode, BlockNumber blkno)
{
    UHeapPageHeader phdr = (UHeapPageHeader)page;
    int nline = UHeapPageGetMaxOffsetNumber(page);
    uint32 offset = RowPtrGetOffset(rowPtr);
    uint32 len = SHORTALIGN(RowPtrGetLen(rowPtr));

    if (!RowPtrIsNormal(rowPtr)) {
        ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("[UPAGE_VERIFY|ROWPTR] Rowptr is abnormal (flags:%d, offset %d, len %d), "
            "rnode[%u,%u,%u], block %u, offnum %u.", rowPtr->flags, offset, len, rNode->spcNode,
            rNode->dbNode, rNode->relNode, blkno, offnum)));
        return;
    }

    /* The row pointer is normal: its offset now designates tuple storage. */
    UHeapDiskTuple diskTuple = (UHeapDiskTuple)UPageGetRowData(page, rowPtr);
    int tdSlot = UHeapTupleHeaderGetTDSlot(diskTuple);
    bool hasInvalidXact = UHeapTupleHasInvalidXact(diskTuple->flag);
    TransactionId tupXid = UDiskTupleGetModifiedXid(diskTuple, page);
    TransactionId locker = UHeapDiskTupleGetRawXid(diskTuple, page);
    TransactionId topXid = GetTopTransactionId();
    UHeapPageTDData *tdPtr = (UHeapPageTDData *)PageGetTDPointer(page);

    /* The slot number is 1-based and must index into the page's TD array. */
    if (tdSlot < 1 || tdSlot > phdr->td_count) {
        ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("[UPAGE_VERIFY|ROWPTR] Invalid tdSlot %d, td count of page is %d, "
            "rnode[%u,%u,%u], block %u, offnum %u.",
            tdSlot, phdr->td_count, rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
        return;
    }

    TD *thistrans = &tdPtr->td_info[tdSlot - 1];
    UndoRecPtr tdUrp = thistrans->undo_record_ptr;
    TransactionId tdXid = thistrans->xactid;

    if (UHEAP_XID_IS_LOCK(diskTuple->flag)) {
        if (!TransactionIdEquals(locker, topXid)) {
            ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
                errmsg("[UPAGE_VERIFY|ROWPTR] locker invalid: locker %lu, topxid %lu, "
                "rnode[%u,%u,%u], block %u, offnum %u.",
                locker, topXid, rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
            return;
        }
    } else if (!IS_VALID_UNDO_REC_PTR(tdUrp) || hasInvalidXact || !TransactionIdEquals(tdXid, locker) ||
        !TransactionIdEquals(tdXid, topXid) || !TransactionIdEquals(tdXid, tupXid)) {
        ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
            errmsg("[UPAGE_VERIFY|ROWPTR] Td xid invalid: tdSlot %d, tdxid %lu, topxid %lu, "
            "tupxid %lu, isInvalidSlot %d, rnode[%u,%u,%u], block %u, offnum %u.",
            tdSlot, tdXid, topXid, tupXid, hasInvalidXact,
            rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum)));
        return;
    }

    /* NOTE(review): macro defined elsewhere; presumably returns unless the COMPLETE verify level is enabled. */
    CHECK_VERIFY_LEVEL(USTORE_VERIFY_COMPLETE)

    /* Compare the target's storage with every other normal row pointer's storage. */
    for (OffsetNumber i = FirstOffsetNumber; i <= nline; i++) {
        if (i == offnum) {
            continue;
        }
        RowPtr *rp = UPageGetRowPtr(page, i);
        if (RowPtrIsNormal(rp)) {
            uint32 tupOffset = RowPtrGetOffset(rp);
            uint32 tupLen = SHORTALIGN(RowPtrGetLen(rp));
            if (tupOffset < offset) {
                /* The other tuple starts first: it must end before the target starts. */
                if (tupOffset + tupLen > offset) {
                    ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
                        errmsg("[UPAGE_VERIFY|ROWPTR] Rowptr data is abnormal, flags %d, offset %u,"
                        " len %d, alignTupLen %u, targetRpOffset %u, "
                        "rnode[%u,%u,%u], block %u, offnum %u, offnum2 %u.",
                        rp->flags, tupOffset, RowPtrGetLen(rp), tupLen, offset,
                        rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum, i)));
                }
            } else if (offset + len > tupOffset) {
                /* The target starts first (or at the same place) and runs into the other tuple. */
                ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
                    errmsg("[UPAGE_VERIFY|ROWPTR] Rowptr data is abnormal, flags %d, offset %u,"
                    " len %d, alignTupLen %u, targetRpOffset %u, targetRpLen %u, "
                    "rnode[%u,%u,%u], block %u, offnum %u, offnum2 %u.",
                    rp->flags, tupOffset, RowPtrGetLen(rp), tupLen, offset, len,
                    rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, offnum, i)));
            }
        }
    }
}
/*
 * UpageVerifyAllRowptr
 *
 * Walks every row pointer of the page. Normal row pointers must have their
 * (short-aligned) storage inside [pd_upper, pd_special]; deleted row pointers
 * must reference a non-frozen, in-range TD slot whose xid does not follow the
 * next xid to be assigned. Past the COMPLETE level gate, the storage extents
 * of all normal row pointers are sorted by start offset and checked for
 * overlap between neighbours.
 *
 * Verification stops at the first violation, which is reported via
 * ereport(defence_errlevel(), ...) with ERRCODE_DATA_CORRUPTED.
 * isRedo is not referenced in this function body.
 */
static void UpageVerifyAllRowptr(UHeapPageHeader header, RelFileNode* rNode, BlockNumber blkno, bool isRedo)
{
int nline = UHeapPageGetMaxOffsetNumber((char *)header);
int tdSlot = 0;
int nstorage = 0;
OffsetNumber i;
/*
 * Extents of the normal row pointers, collected for the overlap check.
 * NOTE(review): the array holds MaxPossibleUHeapTuplesPerPage entries while
 * the loop below is bounded by nline -- confirm that a corrupted page cannot
 * yield more normal row pointers than the array can hold.
 */
RpSortData rowptrs[MaxPossibleUHeapTuplesPerPage];
RpSort sortPtr = rowptrs;
RowPtr *rp = NULL;
for (i = FirstOffsetNumber; i <= nline; i++) {
rp = UPageGetRowPtr(header, i);
if (RowPtrIsNormal(rp)) {
/* Record [start, end) of the tuple storage, end being short-aligned. */
sortPtr->start = (int)RowPtrGetOffset(rp);
sortPtr->end = sortPtr->start + (int)SHORTALIGN(RowPtrGetLen(rp));
sortPtr->offset = i;
/* Tuple storage must lie between pd_upper and pd_special. */
if (sortPtr->start < header->pd_upper || sortPtr->end > header->pd_special) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|ALLROWPTR] rpstart = %u, rplen = %u, pdlower = %u, pdupper = %u, "
"rnode[%u,%u,%u], block %u, offnum %u.",
RowPtrGetOffset(rp), RowPtrGetLen(rp), header->pd_lower, header->pd_upper,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, i)));
return;
}
sortPtr++;
} else if (RowPtrIsDeleted(rp)) {
/* A deleted row pointer carries a TD slot, which is validated here. */
tdSlot = RowPtrGetTDSlot(rp);
if (tdSlot == UHEAPTUP_SLOT_FROZEN) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|ALLROWPTR] tdslot frozen, tdSlot = %d, "
"rnode[%u,%u,%u], block %u, offnum %u.", tdSlot,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, i)));
return;
}
if (tdSlot < 1 || tdSlot > header->td_count) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|ALLROWPTR] tdSlot out of bounds, tdSlot = %d, "
"td_count = %d, rnode[%u,%u,%u], block %u, offnum %u.", tdSlot, header->td_count,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, i)));
return;
}
UHeapPageTDData *tdPtr = (UHeapPageTDData *)PageGetTDPointer(header);
TD * this_trans = &tdPtr->td_info[tdSlot - 1];
/* The TD's xid must not be ahead of the next xid to be assigned. */
if (TransactionIdFollows(this_trans->xactid, t_thrd.xact_cxt.ShmemVariableCache->nextXid)) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|ALLROWPTR] tdxid invalid: tdSlot %d, tdxid = %lu, "
"nextxid = %lu, rnode[%u,%u,%u], block %u, offnum %u.", tdSlot, this_trans->xactid,
t_thrd.xact_cxt.ShmemVariableCache->nextXid,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno, i)));
return;
}
}
}
/* Number of extents collected above. */
nstorage = sortPtr - rowptrs;
/* NOTE(review): macro defined elsewhere; presumably returns unless the COMPLETE verify level is enabled. */
CHECK_VERIFY_LEVEL(USTORE_VERIFY_COMPLETE)
/* Fewer than two extents cannot overlap. */
if (nstorage <= 1) {
return;
}
/* Sort by start offset so that only neighbouring extents need comparing. */
qsort((char *)rowptrs, nstorage, sizeof(RpSortData), RpCompare);
for (i = 0; i < nstorage - 1; i++) {
RpSort temp_ptr1 = &rowptrs[i];
RpSort temp_ptr2 = &rowptrs[i + 1];
/* An extent must end no later than the next one starts. */
if (temp_ptr1->end > temp_ptr2->start) {
ereport(defence_errlevel(), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("[UPAGE_VERIFY|ALLROWPTR]corrupted line pointer: rp1offnum %u, rp1start = %u, rp1end = %u, "
"rp2offnum = %u, rp2start = %u, rp2end = %u, rnode[%u,%u,%u], block %u.",
temp_ptr1->offset, temp_ptr1->start, temp_ptr1->end,
temp_ptr2->offset, temp_ptr2->start, temp_ptr2->end,
rNode->spcNode, rNode->dbNode, rNode->relNode, blkno)));
return;
}
}
}