* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* redo_bufpage.cpp
* common function about page
*
* IDENTIFICATION
* src/gausskernel/storage/access/redo/redo_bufpage.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "storage/buf/bufpage.h"
#include "storage/buf/bufmgr.h"
#include "access/itup.h"
#include "access/htup.h"
static bool PageCheckAndFreeDict(Page page);
static int ItemOffCompare(const void *itemIdp1, const void *itemIdp2);
* PageInit
* Initializes the contents of a page.
*/
void PageInit(Page page, Size pageSize, Size specialSize, bool isHeap)
{
PageHeader p = (PageHeader)page;
specialSize = MAXALIGN(specialSize);
Assert(pageSize == BLCKSZ);
Assert(pageSize > specialSize + SizeOfPageHeaderData);
errno_t ret = memset_s(p, pageSize, 0, pageSize);
securec_check(ret, "", "");
p->pd_upper = pageSize - specialSize;
p->pd_special = pageSize - specialSize;
if (isHeap) {
p->pd_lower = SizeOfHeapPageHeaderData;
PageSetPageSizeAndVersion(page, pageSize, PG_HEAP_PAGE_LAYOUT_VERSION);
} else {
p->pd_lower = SizeOfPageHeaderData;
PageSetPageSizeAndVersion(page, pageSize, PG_COMM_PAGE_LAYOUT_VERSION);
}
}
void SegPageInit(Page page, Size pageSize)
{
PageHeader p = (PageHeader)page;
Assert(pageSize == BLCKSZ);
errno_t ret = memset_s(p, pageSize, 0, pageSize);
securec_check(ret, "", "");
p->pd_upper = pageSize;
p->pd_special = pageSize;
p->pd_lower = SizeOfPageHeaderData;
PageSetPageSizeAndVersion(page, pageSize, PG_SEGMENT_PAGE_LAYOUT_VERSION);
}
void PageReinitWithDict(Page page, Size dictSize)
{
HeapPageHeader pd;
Assert(PageIsEmpty(page));
PageSetCompressed(page);
pd = (HeapPageHeader)page;
pd->pd_upper = PageGetPageSize(page) - MAXALIGN(dictSize);
pd->pd_special = pd->pd_upper;
}
static int ItemOffCompare(const void *itemIdp1, const void *itemIdp2)
{
return ((itemIdSort)itemIdp2)->itemoff - ((itemIdSort)itemIdp1)->itemoff;
}
* PageIndexMultiDelete
*
* This routine handles the case of deleting multiple tuples from an
* index page at once. It is considerably faster than a loop around
* PageIndexTupleDelete ... however, the caller *must* supply the array
* of item numbers to be deleted in item number order!
*/
void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
{
PageHeader phdr = (PageHeader)page;
Offset pdLower = phdr->pd_lower;
Offset pdUpper = phdr->pd_upper;
Offset pdSpecial = phdr->pd_special;
itemIdSortData itemidbase[MaxIndexTuplesPerPage];
itemIdSort itemidptr = NULL;
ItemId lp;
int nline, nused;
int i;
Size totallen;
Offset upper;
Size size;
unsigned offset;
int nextitm;
OffsetNumber offnum;
errno_t rc = EOK;
Assert(nitems < MaxIndexTuplesPerPage);
* If there aren't very many items to delete, then retail
* PageIndexTupleDelete is the best way. Delete the items in reverse
* order so we don't have to think about adjusting item numbers for
* previous deletions.
*
* Description: tune the magic number here
*/
if (nitems <= PAGE_INDEX_CAN_TUPLE_DELETE) {
while (--nitems >= 0)
PageIndexTupleDelete(page, itemnos[nitems]);
return;
}
* As with PageRepairFragmentation, paranoia seems justified.
*/
Assert(!PageIsCompressed(page));
if ((unsigned int)(pdLower) < GetPageHeaderSize(page) || pdLower > pdUpper || pdUpper > pdSpecial ||
pdSpecial > BLCKSZ || (unsigned int)(pdSpecial) != MAXALIGN(pdSpecial))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %d, upper = %d, special = %d", pdLower, pdUpper, pdSpecial)));
* Scan the item pointer array and build a list of just the ones we are
* going to keep. Notice we do not modify the page yet, since we are
* still validity-checking.
*/
nline = PageGetMaxOffsetNumber(page);
itemidptr = itemidbase;
totallen = 0;
nused = 0;
nextitm = 0;
for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum)) {
lp = PageGetItemId(page, offnum);
Assert(ItemIdHasStorage(lp));
size = ItemIdGetLength(lp);
offset = ItemIdGetOffset(lp);
if (offset < (unsigned int)(pdUpper) || (offset + size) > (unsigned int)(pdSpecial) ||
offset != MAXALIGN(offset))
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item pointer: offset = %u, size = %u", offset, (unsigned int)size)));
if (nextitm < nitems && offnum == itemnos[nextitm]) {
nextitm++;
} else {
itemidptr->offsetindex = nused;
itemidptr->itemoff = offset;
itemidptr->olditemid = *lp;
itemidptr->alignedlen = MAXALIGN(size);
totallen += itemidptr->alignedlen;
itemidptr++;
nused++;
}
}
if (nextitm != nitems)
ereport(ERROR, (errcode(ERRCODE_INVALID_ROW_COUNT_IN_RESULT_OFFSET_CLAUSE),
errmsg("incorrect index offsets supplie, nitems %d, next_item %d", nitems, nextitm)));
if (totallen > (Size)(pdSpecial - pdLower))
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("corrupted item lengths: total %u, available space %d",
(unsigned int)totallen, pdSpecial - pdLower)));
qsort((char *)itemidbase, nused, sizeof(itemIdSortData), ItemOffCompare);
upper = pdSpecial;
for (i = 0, itemidptr = itemidbase; i < nused; i++, itemidptr++) {
lp = PageGetItemId(page, itemidptr->offsetindex + 1);
upper -= itemidptr->alignedlen;
rc = memmove_s((char *)page + upper, itemidptr->alignedlen, (char *)page + itemidptr->itemoff,
itemidptr->alignedlen);
securec_check(rc, "", "");
*lp = itemidptr->olditemid;
lp->lp_off = upper;
}
phdr->pd_lower = SizeOfPageHeaderData + nused * sizeof(ItemIdData);
phdr->pd_upper = upper;
}
* PageIndexTupleDelete
*
* This routine does the work of removing a tuple from an index page.
*
* Unlike heap pages, we compact out the line pointer for the removed tuple.
*/
void PageIndexTupleDelete(Page page, OffsetNumber offnum)
{
PageHeader phdr = (PageHeader)page;
char *addr = NULL;
ItemId tup;
Size size;
unsigned offset;
int nbytes;
int offidx;
int nline;
errno_t rc = EOK;
* As with PageRepairFragmentation, paranoia seems justified.
*/
Assert(!PageIsCompressed(page));
if (phdr->pd_lower < SizeOfPageHeaderData || phdr->pd_lower > phdr->pd_upper || phdr->pd_upper > phdr->pd_special ||
phdr->pd_special > BLCKSZ)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u", phdr->pd_lower,
phdr->pd_upper, phdr->pd_special)));
nline = PageGetMaxOffsetNumber(page);
if ((int)offnum <= 0 || (int)offnum > nline)
ereport(ERROR, (errcode(ERRCODE_INVALID_ROW_COUNT_IN_RESULT_OFFSET_CLAUSE),
errmsg("invalid index offnum: %u", offnum)));
offidx = offnum - 1;
tup = PageGetItemId(page, offnum);
Assert(ItemIdHasStorage(tup));
size = ItemIdGetLength(tup);
offset = ItemIdGetOffset(tup);
if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special || offset != (unsigned int)(MAXALIGN(offset)) ||
size != (unsigned int)(MAXALIGN(size)))
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item pointer: offset = %u, size = %u", offset, (unsigned int)size)));
* First, we want to get rid of the pd_linp entry for the index tuple. We
* copy all subsequent linp's back one slot in the array. We don't use
* PageGetItemId, because we are manipulating the _array_, not individual
* linp's.
*/
nbytes = phdr->pd_lower - ((char *)&phdr->pd_linp[offidx + 1] - (char *)phdr);
if (nbytes > 0) {
rc = memmove_s((char *)&(phdr->pd_linp[offidx]), nbytes, (char *)&(phdr->pd_linp[offidx + 1]), nbytes);
securec_check(rc, "", "");
}
* Now move everything between the old upper bound (beginning of tuple
* space) and the beginning of the deleted tuple forward, so that space in
* the middle of the page is left free. If we've just deleted the tuple
* at the beginning of tuple space, then there's no need to do the copy
* (and bcopy on some architectures SEGV's if asked to move zero bytes).
*/
addr = (char *)page + phdr->pd_upper;
if (offset > phdr->pd_upper) {
rc = memmove_s(addr + size, (int)(offset - phdr->pd_upper), addr, (int)(offset - phdr->pd_upper));
securec_check(rc, "", "");
}
phdr->pd_upper += size;
phdr->pd_lower -= sizeof(ItemIdData);
* Finally, we need to adjust the linp entries that remain.
*
* Anything that used to be before the deleted tuple's data was moved
* forward by the size of the deleted tuple.
*/
if (!PageIsEmpty(page)) {
int i;
nline--;
for (i = 1; i <= nline; i++) {
ItemId ii = PageGetItemId(phdr, i);
Assert(ItemIdHasStorage(ii));
if (ItemIdGetOffset(ii) <= offset)
ii->lp_off += size;
}
}
}
bool page_index_tuple_overwrite(Page page, OffsetNumber offnum, Item new_item, Size new_size)
{
PageHeader page_header = (PageHeader)page;
errno_t rc;
Offset pd_lower = page_header->pd_lower;
Offset pd_upper = page_header->pd_upper;
Offset pd_special = page_header->pd_special;
if ((unsigned int)pd_lower < SizeOfPageHeaderData || pd_lower > pd_upper || pd_upper > pd_special ||
pd_special > BLCKSZ || (unsigned int)pd_special != MAXALIGN(pd_special))
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u", pd_lower, pd_upper,
pd_special)));
int count = PageGetMaxOffsetNumber(page);
if ((int)offnum <= 0 || (int)offnum > count)
elog(ERROR, "invalid index offnum: %u", offnum);
ItemId item_id = PageGetItemId(page, offnum);
Assert(ItemIdHasStorage(item_id));
Size old_size = ItemIdGetLength(item_id);
unsigned offset = ItemIdGetOffset(item_id);
if (offset < (unsigned int)pd_upper || (offset + old_size) > (unsigned int)pd_special || offset != (unsigned int)MAXALIGN(offset))
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted line pointer: offset = %u, size = %u", offset, (unsigned int)old_size)));
Size aligned_old_size = MAXALIGN(old_size);
Size aligned_new_size = MAXALIGN(new_size);
if (aligned_new_size > (Size)(aligned_old_size + (pd_upper - pd_lower)))
return false;
int size_diff = (int)(aligned_old_size - aligned_new_size);
if (size_diff != 0) {
char *addr = (char *)page + pd_upper;
size_t tuples_size = offset - pd_upper;
if (tuples_size) {
rc = memmove_s(addr + size_diff, tuples_size, addr, tuples_size);
securec_check(rc, "", "");
}
page_header->pd_upper += size_diff;
for (int i = FirstOffsetNumber; i <= count; i++) {
ItemId ii = PageGetItemId(page_header, i);
if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
ii->lp_off += size_diff;
}
}
item_id->lp_off = offset + size_diff;
item_id->lp_len = new_size;
rc = memcpy_s(PageGetItem(page, item_id), new_size, new_item, new_size);
securec_check(rc, "", "");
return true;
}
* PageRestoreTempPage
* Copy temporary page back to permanent page after special processing
* and release the temporary page.
*/
void PageRestoreTempPage(Page tempPage, Page oldPage)
{
Size pageSize;
errno_t rc = EOK;
pageSize = PageGetPageSize(tempPage);
rc = memcpy_s((char *)oldPage, pageSize, (char *)tempPage, pageSize);
securec_check(rc, "", "");
pfree(tempPage);
}
void DumpPageInfo(Page page, XLogRecPtr newLsn)
{
PageHeader phdr = (PageHeader)page;
OffsetNumber nline = PageGetMaxOffsetNumber(page);
ereport(DEBUG4, (errmodule(MOD_REDO), errcode(ERRCODE_LOG),
errmsg("DumpPageInfo: lsn:%X/%X, pd_checksum:%u, flags:%u, lower:%u, upper:%u, special:%u"
"pagesize_version:%u, prune_xid:%u, nline:%u, newLsn:%X/%X",
phdr->pd_lsn.xlogid, phdr->pd_lsn.xrecoff, phdr->pd_checksum, phdr->pd_flags,
phdr->pd_lower, phdr->pd_upper, phdr->pd_special, phdr->pd_pagesize_version,
phdr->pd_prune_xid, nline, (uint32)(newLsn >> 32), (uint32)newLsn)));
}
* PageGetHeapFreeSpace
* Returns the size of the free (allocatable) space on a page,
* reduced by the space needed for a new line pointer.
*
* The difference between this and PageGetFreeSpace is that this will return
* zero if there are already MaxHeapTuplesPerPage line pointers in the page
* and none are free. We use this to enforce that no more than
* MaxHeapTuplesPerPage line pointers are created on a heap page. (Although
* no more tuples than that could fit anyway, in the presence of redirected
* or dead line pointers it'd be possible to have too many line pointers.
* To avoid breaking code that assumes MaxHeapTuplesPerPage is a hard limit
* on the number of line pointers, we make this extra check.)
*/
Size PageGetHeapFreeSpace(Page page)
{
Size space;
space = PageGetFreeSpace(page);
if (space > 0) {
OffsetNumber offnum, nline;
* Are there already MaxHeapTuplesPerPage line pointers in the page?
*/
nline = PageGetMaxOffsetNumber(page);
if (nline >= MaxHeapTuplesPerPage) {
if (PageHasFreeLinePointers((PageHeader)page)) {
* Since this is just a hint, we must confirm that there is
* indeed a free line pointer
*/
for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum)) {
ItemId lp = PageGetItemId(page, offnum);
if (!ItemIdIsUsed(lp))
break;
}
if (offnum > nline) {
* The hint is wrong, but we can't clear it here since we
* don't have the ability to mark the page dirty.
*/
space = 0;
}
} else {
* Although the hint might be wrong, PageAddItem will believe
* it anyway, so we must believe it too.
*/
space = 0;
}
}
}
return space;
}
* PageGetFreeSpace
* Returns the size of the free (allocatable) space on a page,
* reduced by the space needed for a new line pointer.
*
* Note: this should usually only be used on index pages. Use
* PageGetHeapFreeSpace on heap pages.
*/
Size PageGetFreeSpace(Page page)
{
int space;
* Use signed arithmetic here so that we behave sensibly if pd_lower >
* pd_upper.
*/
space = (int)((PageHeader)page)->pd_upper - (int)((PageHeader)page)->pd_lower;
if (space < (int)sizeof(ItemIdData))
return 0;
space -= sizeof(ItemIdData);
return (Size)space;
}
* PageRepairFragmentation
*
* Frees fragmented space on a page.
* It doesn't remove unused line pointers! Please don't change this.
*
* This routine is usable for heap pages only, but see PageIndexMultiDelete.
*
* As a side effect, the page's PD_HAS_FREE_LINES hint bit is updated.
*/
void PageRepairFragmentation(Page page)
{
Offset pdLower = ((PageHeader)page)->pd_lower;
Offset pdUpper = ((PageHeader)page)->pd_upper;
Offset pdSpecial = ((PageHeader)page)->pd_special;
ItemId lp;
int nline, nstorage, nunused;
int i;
Size totallen;
Offset upper;
errno_t rc = EOK;
* It's worth the trouble to be more paranoid here than in most places,
* because we are about to reshuffle data in (what is usually) a shared
* disk buffer. If we aren't careful then corrupted pointers, lengths,
* etc could cause us to clobber adjacent disk buffers, spreading the data
* loss further. So, check everything.
*/
if ((unsigned int)(pdLower) < SizeOfPageHeaderData || pdLower > pdUpper || pdUpper > pdSpecial ||
pdSpecial > BLCKSZ || (unsigned int)(pdSpecial) != MAXALIGN(pdSpecial))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %d, upper = %d, special = %d", pdLower, pdUpper, pdSpecial)));
nline = PageGetMaxOffsetNumber(page);
nunused = nstorage = 0;
for (i = FirstOffsetNumber; i <= nline; i++) {
lp = PageGetItemId(page, i);
if (ItemIdIsUsed(lp)) {
if (ItemIdHasStorage(lp))
nstorage++;
} else {
ItemIdSetUnused(lp);
nunused++;
}
}
if (nstorage == 0) {
((PageHeader)page)->pd_upper = pdSpecial;
} else {
itemIdSortData itemidbase[MaxHeapTuplesPerPage];
itemIdSort itemidptr = itemidbase;
totallen = 0;
for (i = 0; i < nline; i++) {
lp = PageGetItemId(page, i + 1);
if (ItemIdHasStorage(lp)) {
itemidptr->offsetindex = i;
itemidptr->itemoff = ItemIdGetOffset(lp);
if (itemidptr->itemoff < (int)pdUpper || itemidptr->itemoff >= (int)pdSpecial)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item pointer: %d", itemidptr->itemoff)));
itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
totallen += itemidptr->alignedlen;
itemidptr++;
}
}
if (totallen > (Size)(pdSpecial - pdLower))
ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED), errmsg("corrupted item lengths: total %u, available space %d",
(unsigned int)totallen, pdSpecial - pdLower)));
qsort((char *)itemidbase, nstorage, sizeof(itemIdSortData), ItemOffCompare);
upper = pdSpecial;
for (i = 0, itemidptr = itemidbase; i < nstorage; i++, itemidptr++) {
lp = PageGetItemId(page, itemidptr->offsetindex + 1);
upper -= itemidptr->alignedlen;
rc = memmove_s((char *)page + upper, itemidptr->alignedlen, (char *)page + itemidptr->itemoff,
itemidptr->alignedlen);
securec_check(rc, "", "");
lp->lp_off = upper;
}
((PageHeader)page)->pd_upper = upper;
if (SS_PRIMARY_MODE) {
Buffer buf = BlockGetBuffer((char *)page);
dms_buf_ctrl_t *buf_ctrl = &t_thrd.storage_cxt.dmsBufCtl[buf - 1];
buf_ctrl->need_check_pincount = true;
}
}
if (nunused > 0)
PageSetHasFreeLinePointers(page);
else
PageClearHasFreeLinePointers(page);
}
static bool PageCheckAndFreeDict(Page page)
{
PageHeader phdr = (PageHeader)page;
HeapTupleHeader htup;
ItemId lp;
int nline, i;
Offset pdUpper;
Size pageSize;
Size tupsSize;
Size dictsize;
uint16 headersize;
errno_t rc = EOK;
headersize = GetPageHeaderSize(page);
if (phdr->pd_lower < headersize || phdr->pd_lower > phdr->pd_upper || phdr->pd_upper > phdr->pd_special ||
phdr->pd_special > BLCKSZ) {
ereport(PANIC, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u", phdr->pd_lower,
phdr->pd_upper, phdr->pd_special)));
}
Assert(PageIsCompressed(page) && PageIsAllVisible(page));
nline = PageGetMaxOffsetNumber(page);
for (i = FirstOffsetNumber; i <= nline; i++) {
lp = PageGetItemId(page, i);
if (ItemIdIsUsed(lp) && ItemIdHasStorage(lp)) {
htup = (HeapTupleHeader)PageGetItem(page, lp);
if (HEAP_TUPLE_IS_COMPRESSED(htup)) {
return false;
}
}
}
pageSize = PageGetPageSize(page);
Assert(pageSize == BLCKSZ);
dictsize = (Size)PageGetSpecialSize(page);
pdUpper = phdr->pd_upper + dictsize;
tupsSize = pageSize - dictsize - phdr->pd_upper;
if (tupsSize > 0) {
rc = memmove_s((char *)page + pdUpper, tupsSize, (char *)page + phdr->pd_upper, tupsSize);
securec_check(rc, "", "");
}
for (i = FirstOffsetNumber; i <= nline; i++) {
lp = PageGetItemId(phdr, i);
if (!ItemIdIsUsed(lp) || !ItemIdHasStorage(lp))
continue;
Assert(ItemIdIsNormal(lp));
ItemIdSetNormal(lp, (ItemIdGetOffset(lp) + dictsize), ItemIdGetLength(lp));
}
phdr->pd_upper = pdUpper;
phdr->pd_special = pageSize;
PageClearCompressed(page);
return true;
}
bool PageFreeDict(Page page)
{
Assert(PageIsCompressed(page));
if (PageIsEmpty(page)) {
PageHeader pd;
pd = (PageHeader)page;
pd->pd_upper = pd->pd_special;
PageClearCompressed(page);
return true;
}
* are no compressed tuples. */
if (PageIsAllVisible(page)) {
return PageCheckAndFreeDict(page);
}
return false;
}
* PageGetTempPageCopySpecial
* Get a temporary page in local memory for special processing.
* The page is PageInit'd with the same special-space size as the
* given page, and the special space is copied from the given page.
*/
Page PageGetTempPageCopySpecial(Page page)
{
Size pageSize;
Page temp;
errno_t rc = EOK;
pageSize = PageGetPageSize(page);
temp = (Page)palloc(pageSize);
Assert(!PageIsCompressed(page));
PageInit(temp, pageSize, PageGetSpecialSize(page));
rc = memcpy_s(PageGetSpecialPointer(temp), PageGetSpecialSize(page), PageGetSpecialPointer(page),
PageGetSpecialSize(page));
securec_check(rc, "", "");
return temp;
}
* PageAddItem
*
* Add an item to a page. Return value is offset at which it was
* inserted, or InvalidOffsetNumber if there's not room to insert.
*
* If overwrite is true, we just store the item at the specified
* offsetNumber (which must be either a currently-unused item pointer,
* or one past the last existing item). Otherwise,
* if offsetNumber is valid and <= current max offset in the page,
* insert item into the array at that position by shuffling ItemId's
* down to make room.
* If offsetNumber is not valid, then assign one by finding the first
* one that is both unused and deallocated.
*
* If is_heap is true, we enforce that there can't be more than
* MaxHeapTuplesPerPage line pointers on the page.
*
* !!! EREPORT(ERROR) IS DISALLOWED HERE !!!
*/
OffsetNumber PageAddItem(Page page, Item item, Size size, OffsetNumber offsetNumber, bool overwrite, bool is_heap)
{
PageHeader phdr = (PageHeader)page;
Size alignedSize;
int lower;
int upper;
ItemId itemId;
OffsetNumber limit;
bool needshuffle = false;
uint16 headersize;
errno_t rc = EOK;
* Be wary about corrupted page pointers
*/
headersize = GetPageHeaderSize(page);
if (phdr->pd_lower < headersize || phdr->pd_lower > phdr->pd_upper || phdr->pd_upper > phdr->pd_special ||
phdr->pd_special > BLCKSZ)
ereport(PANIC, (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u", phdr->pd_lower,
phdr->pd_upper, phdr->pd_special)));
* Select offsetNumber to place the new item at
*/
limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
if (OffsetNumberIsValid(offsetNumber)) {
if (overwrite) {
if (offsetNumber < limit) {
itemId = PageGetItemId(phdr, offsetNumber);
if (ItemIdIsUsed(itemId) || ItemIdHasStorage(itemId)) {
ereport(WARNING, (errmsg("will not overwrite a used ItemId")));
return InvalidOffsetNumber;
}
}
} else {
if (offsetNumber < limit)
needshuffle = true;
}
} else {
if (PageHasFreeLinePointers(phdr)) {
* Look for "recyclable" (unused) ItemId. We check for no storage
* as well, just to be paranoid --- unused items should never have
* storage.
*/
for (offsetNumber = 1; offsetNumber < limit; offsetNumber++) {
itemId = PageGetItemId(phdr, offsetNumber);
if (!ItemIdIsUsed(itemId) && !ItemIdHasStorage(itemId))
break;
}
if (offsetNumber >= limit) {
PageClearHasFreeLinePointers(phdr);
}
} else {
offsetNumber = limit;
}
}
if (offsetNumber > limit) {
ereport(WARNING, (errmsg("specified item offset is too large")));
return InvalidOffsetNumber;
}
if (is_heap && offsetNumber > MaxHeapTuplesPerPage) {
ereport(WARNING, (errmsg("can't put more than MaxHeapTuplesPerPage items in a heap page")));
return InvalidOffsetNumber;
}
* Compute new lower and upper pointers for page, see if it'll fit.
*
* Note: do arithmetic as signed ints, to avoid mistakes if, say,
* alignedSize > pd_upper.
*/
if (offsetNumber == limit || needshuffle)
lower = phdr->pd_lower + sizeof(ItemIdData);
else
lower = phdr->pd_lower;
alignedSize = MAXALIGN(size);
upper = (int)phdr->pd_upper - (int)alignedSize;
if (lower > upper)
return InvalidOffsetNumber;
* OK to insert the item. First, shuffle the existing pointers if needed.
*/
itemId = PageGetItemId(phdr, offsetNumber);
if (needshuffle) {
rc = memmove_s(itemId + 1, (limit - offsetNumber) * sizeof(ItemIdData), itemId,
(limit - offsetNumber) * sizeof(ItemIdData));
securec_check(rc, "", "");
}
ItemIdSetNormal(itemId, upper, size);
rc = memcpy_s((char *)page + upper, alignedSize, item, size);
securec_check(rc, "", "");
phdr->pd_lower = (LocationIndex)lower;
phdr->pd_upper = (LocationIndex)upper;
return offsetNumber;
}