*
* slru.cpp
* Simple LRU buffering for transaction status logfiles
*
* We use a simple least-recently-used scheme to manage a pool of page
* buffers. Under ordinary circumstances we expect that write
* traffic will occur mostly to the latest page (and to the just-prior
* page, soon after a page transition). Read traffic will probably touch
* a larger span of pages, but in any case a fairly small number of page
* buffers should be sufficient. So, we just search the buffers using plain
* linear search; there's no need for a hashtable or anything fancy.
* The management algorithm is straight LRU except that we will never swap
* out the latest page (since we know it's going to be hit again eventually).
*
* We use a control LWLock to protect the shared data structures, plus
* per-buffer LWLocks that synchronize I/O for each buffer. The control lock
* must be held to examine or modify any shared state. A process that is
* reading in or writing out a page buffer does not hold the control lock,
* only the per-buffer lock for the buffer it is working on.
*
* "Holding the control lock" means exclusive lock in all cases except for
* SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
* the implications of that.
*
* When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
* before releasing the control lock. The per-buffer lock is released after
* completing the I/O, re-acquiring the control lock, and updating the shared
* state. (Deadlock is not possible here, because we never try to initiate
* I/O when someone else is already doing I/O on the same buffer.)
* To wait for I/O to complete, release the control lock, acquire the
* per-buffer lock in shared mode, immediately release the per-buffer lock,
* reacquire the control lock, and then recheck state (since arbitrary things
* could have happened while we didn't have the lock).
*
* As with the regular buffer manager, it is possible for another process
* to re-dirty a page that is currently being written out. This is handled
* by re-setting the page's page_dirty flag.
*
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/gausskernel/storage/access/transam/slru.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include "access/slru.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/csnlog.h"
#include "commands/copy.h"
#include "storage/smgr/fd.h"
#include "storage/shmem.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "utils/builtins.h"
#include "storage/file/fio_device.h"
* During SimpleLruFlush(), we will usually not need to write/fsync more
* than one or two physical files, but we may need to write several pages
* per file. We can consolidate the I/O requests by leaving files open
* until control returns to SimpleLruFlush(). This data structure remembers
* which files are open.
*/
#define MAX_FLUSH_BUFFERS 16
typedef struct SlruFlushData {
int num_files;
int fd[MAX_FLUSH_BUFFERS];
int64 segno[MAX_FLUSH_BUFFERS];
} SlruFlushData;
typedef struct SlruFlushData *SlruFlush;
static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata);
static bool SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno, SlruFlush fdata);
static void SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid);
static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno);
template<bool enableBank>
static inline void SlruRecentlyUsed(SlruShared shared, int slotno);
static inline int execSimpleLruReadPageReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
{
SlruShared shared = ctl->shared;
bool enableBank = shared->enable_banks;
int slotno;
LWLock *bankLock = enableBank ? SimpleLruGetBankLock(ctl, pageno) : ctl->shared->control_lock;
int bankno = enableBank ? SimpleLruGetBankno(ctl, pageno) : 0;
int bankstart = enableBank ? bankno * SLRU_BANK_SIZE : 0;
int bankend = enableBank ? bankstart + SLRU_BANK_SIZE : shared->num_slots;
void (*SlruRecentlyUsedFunc)(SlruShared shared, int slotno);
SlruRecentlyUsedFunc = enableBank ? SlruRecentlyUsed<true> : SlruRecentlyUsed<false>;
Assert(LWLockHeldByMe(bankLock));
* See if page is already in a buffer.
* If enableBank, we only need to search the slots of target bank.
* If not, we need to search all slots.
*/
for (slotno = bankstart; slotno < bankend; slotno++) {
if (shared->page_number[slotno] == pageno && shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS) {
SlruRecentlyUsedFunc(shared, slotno);
return slotno;
}
}
LWLockRelease(bankLock);
(void)LWLockAcquire(bankLock, LW_EXCLUSIVE);
return SimpleLruReadPage(ctl, pageno, true, xid);
}
Size SimpleLruShmemSize(int nslots, int nlsns, bool enableBank)
{
int nbanks = nslots / SLRU_BANK_SIZE;
Size sz;
Assert(!enableBank || nslots < SLRU_MAX_ALLOWED_BUFFERS);
Assert(!enableBank || nslots % SLRU_BANK_SIZE == 0);
sz = MAXALIGN(sizeof(SlruSharedData));
sz += MAXALIGN(nslots * sizeof(char *));
sz += MAXALIGN(nslots * sizeof(SlruPageStatus));
sz += MAXALIGN(nslots * sizeof(bool));
sz += MAXALIGN(nslots * sizeof(int64));
sz += MAXALIGN(nslots * sizeof(int));
sz += MAXALIGN(nslots * sizeof(LWLock *));
if (enableBank) {
sz += MAXALIGN(nbanks * sizeof(LWLock *));
sz += MAXALIGN(nbanks * sizeof(int));
}
if (nlsns > 0)
sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
return BUFFERALIGN(sz) + BLCKSZ * nslots;
}
void SimpleLruInit(SlruCtl ctl, const char *name, int bufferTrancheId, int bankTrancheId,
int nslots, int nlsns, LWLock *ctllock, const char *subdir, int index,
bool enableBank, int partitionNum)
{
SlruShared shared;
bool found = false;
errno_t rc = EOK;
int nbanks = enableBank ? nslots / SLRU_BANK_SIZE : 0;
Assert(!enableBank || nslots < SLRU_MAX_ALLOWED_BUFFERS);
shared = (SlruShared)ShmemInitStruct(name, SimpleLruShmemSize(nslots, nlsns, enableBank), &found);
if (!IsUnderPostmaster) {
char *ptr = NULL;
Size offset;
int slotno;
Assert(!found);
rc = memset_s(shared, sizeof(SlruSharedData), 0, sizeof(SlruSharedData));
securec_check(rc, "\0", "\0");
shared->enable_banks = enableBank;
shared->control_lock = enableBank ? NULL : ctllock;
shared->num_slots = nslots;
shared->lsn_groups_per_page = nlsns;
shared->cur_lru_count = 0;
shared->force_check_first_xid = false;
ptr = (char *)shared;
offset = MAXALIGN(sizeof(SlruSharedData));
shared->page_buffer = (char **)(ptr + offset);
offset += MAXALIGN(nslots * sizeof(char *));
shared->page_status = (SlruPageStatus *)(ptr + offset);
offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
shared->page_dirty = (bool *)(ptr + offset);
offset += MAXALIGN(nslots * sizeof(bool));
shared->page_number = (int64 *)(ptr + offset);
offset += MAXALIGN(nslots * sizeof(int64));
shared->page_lru_count = (int *)(ptr + offset);
offset += MAXALIGN(nslots * sizeof(int));
shared->buffer_locks = (LWLock **)(ptr + offset);
offset += MAXALIGN(nslots * sizeof(LWLock *));
if (enableBank) {
shared->bank_locks = (LWLock **) (ptr + offset);
offset += MAXALIGN(nbanks * sizeof(LWLock *));
shared->bank_cur_lru_count = (int *) (ptr + offset);
offset += MAXALIGN(nbanks * sizeof(int));
} else {
shared->bank_locks = NULL;
shared->bank_cur_lru_count = NULL;
}
if (nlsns > 0) {
shared->group_lsn = (XLogRecPtr *)(ptr + offset);
offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
}
ptr += BUFFERALIGN(offset);
for (slotno = 0; slotno < nslots; slotno++) {
shared->page_buffer[slotno] = ptr;
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
shared->page_dirty[slotno] = false;
shared->page_lru_count[slotno] = 0;
shared->buffer_locks[slotno] = LWLockAssign(bufferTrancheId);
ptr += BLCKSZ;
}
if (enableBank) {
for (int bankno = 0; bankno < nbanks; bankno++) {
shared->bank_locks[bankno] = LWLockAssign(bankTrancheId);
shared->bank_cur_lru_count[bankno] = 0;
}
}
pg_atomic_init_u32(&shared->clogGroupFirst, INVALID_PGPROCNO);
Assert((Size)(ptr - (char *) shared) <= SimpleLruShmemSize(nslots, nlsns, enableBank));
} else {
Assert(found);
Assert(shared->num_slots == nslots);
}
* Initialize the unshared control struct, including directory path. We
* assume caller set PagePrecedes.
*/
ctl->shared = shared;
ctl->do_fsync = true;
ctl->bank_mask = (nslots / SLRU_BANK_SIZE) - 1;
ctl->slru_partition_num = partitionNum;
rc = strncpy_s(ctl->dir, sizeof(ctl->dir), subdir, strlen(subdir));
securec_check(rc, "\0", "\0");
}
* Initialize (or reinitialize) a page to zeroes.
*
* The page is not actually written, just set up in shared memory.
* The slot number of the new page is returned.
*
* Control lock must be held at entry, and will be held at exit.
*/
int SimpleLruZeroPage(SlruCtl ctl, int64 pageno, bool *pBZeroPage)
{
SlruShared shared = ctl->shared;
bool enableBank = shared->enable_banks;
LWLock *lock = enableBank ? SimpleLruGetBankLock(ctl, pageno) : shared->control_lock;
errno_t rc = EOK;
int slotno;
void (*SlruRecentlyUsedFunc)(SlruShared shared, int slotno);
SlruRecentlyUsedFunc = enableBank ? SlruRecentlyUsed<true> : SlruRecentlyUsed<false>;
Assert(LWLockHeldByMeInMode(lock, LW_EXCLUSIVE));
for (;;) {
slotno = SlruSelectLRUPage(ctl, pageno);
if (shared->page_number[slotno] == pageno && shared->page_status[slotno] != SLRU_PAGE_EMPTY) {
* If page is being read in, we must wait for I/O complete and recheck.
* If page is being written or valid, don't need to zero this page.
*/
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS) {
SimpleLruWaitIO(ctl, slotno);
continue;
} else {
if (pBZeroPage != NULL) {
*pBZeroPage = false;
}
SlruRecentlyUsedFunc(shared, slotno);
return slotno;
}
}
* case 1: a free slot, or
* case 2: an existing slot from victim page, or
* case 3: an existing slot with the same pageno
*/
if (!(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
(shared->page_status[slotno] == SLRU_PAGE_VALID && !shared->page_dirty[slotno])))
ereport(PANIC,
(errmodule(MOD_SLRU), errcode(ERRCODE_DATA_EXCEPTION), errmsg("slru zero page under %s", ctl->dir),
errdetail("page(%ld) in slot(%d) status(%d) | number(%ld) | dirty(%d) is wrong.", pageno, slotno,
shared->page_status[slotno], shared->page_number[slotno], shared->page_dirty[slotno]),
errhint("Try it again.")));
shared->page_number[slotno] = pageno;
shared->page_status[slotno] = SLRU_PAGE_VALID;
shared->page_dirty[slotno] = true;
SlruRecentlyUsedFunc(shared, slotno);
rc = memset_s(shared->page_buffer[slotno], BLCKSZ, 0, BLCKSZ);
securec_check(rc, "\0", "\0");
SimpleLruZeroLSNs(ctl, slotno);
shared->latest_page_number = pageno;
return slotno;
}
}
* Zero all the LSNs we store for this slru page.
*
* This should be called each time we create a new page, and each time we read
* in a page from disk into an existing buffer. (Such an old page cannot
* have any interesting LSNs, since we'd have flushed them before writing
* the page in the first place.)
*
* This assumes that InvalidXLogRecPtr is bitwise-all-0.
*/
static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
{
errno_t rc = EOK;
SlruShared shared = ctl->shared;
if (shared->lsn_groups_per_page > 0) {
rc = memset_s(&shared->group_lsn[slotno * shared->lsn_groups_per_page],
shared->lsn_groups_per_page * sizeof(XLogRecPtr), 0,
shared->lsn_groups_per_page * sizeof(XLogRecPtr));
securec_check(rc, "\0", "\0");
}
}
* Wait for any active I/O on a page slot to finish.
*
* Important points:
* 1. This does not guarantee that new I/O hasn't been started before we return, though.
* 2. In fact the slot might not even contain the same page anymore.
*
* Control lock must be held at entry, and will be held at exit.
*/
void SimpleLruWaitIO(SlruCtl ctl, int slotno)
{
volatile SlruShared shared = ctl->shared;
bool enableBank = shared->enable_banks;
int bankno = SlotGetBankNumber(slotno);
LWLock *lock = enableBank ? shared->bank_locks[bankno] : shared->control_lock;
Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
LWLockRelease(lock);
(void)LWLockAcquire(shared->buffer_locks[slotno], LW_SHARED);
LWLockRelease(shared->buffer_locks[slotno]);
LWLockAcquire(lock, LW_EXCLUSIVE);
* If the slot is still in an io-in-progress state, then either someone
* already started a new I/O on the slot, or a previous I/O failed and
* neglected to reset the page state. That shouldn't happen, really, but
* it seems worth a few extra cycles to check and recover from it. We can
* cheaply test for failure by seeing if the buffer lock is still held (we
* assume that transaction abort would release the lock).
*/
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS) {
if (LWLockConditionalAcquire(shared->buffer_locks[slotno], LW_SHARED)) {
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
else {
shared->page_status[slotno] = SLRU_PAGE_VALID;
shared->page_dirty[slotno] = true;
}
LWLockRelease(shared->buffer_locks[slotno]);
}
}
}
* Find a page in a shared buffer, reading it in if necessary.
* The page number must correspond to an already-initialized page.
*
* If write_ok is true then it is OK to return a page that is in
* WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
* that modification of the page is safe. If write_ok is false then we
* will not return the page until it is not undergoing active I/O.
*
* The passed-in xid is used only for error reporting, and may be
* InvalidTransactionId if no specific xid is associated with the action.
*
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
*
* The correct bank lock must be held at entry, and will be held at exit.
*/
int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, TransactionId xid)
{
SlruShared shared = ctl->shared;
bool enableBank = shared->enable_banks;
LWLock* banklock = enableBank ? SimpleLruGetBankLock(ctl, pageno) : shared->control_lock;
void (*SlruRecentlyUsedFunc)(SlruShared shared, int slotno);
SlruRecentlyUsedFunc = enableBank ? SlruRecentlyUsed<true> : SlruRecentlyUsed<false>;
Assert(LWLockHeldByMeInMode(banklock, LW_EXCLUSIVE));
for (;;) {
int slotno;
bool ok = false;
slotno = SlruSelectLRUPage(ctl, pageno);
if (shared->page_number[slotno] == pageno && shared->page_status[slotno] != SLRU_PAGE_EMPTY) {
* If page is still being read in, we must wait for I/O. Likewise
* if the page is being written and the caller said that's not OK.
*/
if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
(shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS && !write_ok)) {
SimpleLruWaitIO(ctl, slotno);
continue;
}
SlruRecentlyUsedFunc(shared, slotno);
return slotno;
}
if (!(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
(shared->page_status[slotno] == SLRU_PAGE_VALID && !shared->page_dirty[slotno])))
ereport(PANIC,
(errmodule(MOD_SLRU), errcode(ERRCODE_DATA_EXCEPTION), errmsg("slru read page under %s", ctl->dir),
errdetail("page(%ld) in slot(%d) status(%d) | dirty(%d) is wrong, xid %lu", pageno, slotno,
shared->page_status[slotno], shared->page_dirty[slotno], xid)));
shared->page_number[slotno] = pageno;
shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
shared->page_dirty[slotno] = false;
(void)LWLockAcquire(shared->buffer_locks[slotno], LW_EXCLUSIVE);
if (!ENABLE_DSS) {
LWLockRelease(banklock);
}
ok = SlruPhysicalReadPage(ctl, pageno, slotno);
SimpleLruZeroLSNs(ctl, slotno);
if (!ENABLE_DSS) {
LWLockAcquire(banklock, LW_EXCLUSIVE);
}
if (!(shared->page_number[slotno] == pageno && shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
!shared->page_dirty[slotno]))
ereport(PANIC,
(errmodule(MOD_SLRU), errcode(ERRCODE_DATA_EXCEPTION), errmsg("slru read page under %s", ctl->dir),
errdetail("page(%ld) in slot(%d) status(%d) | number(%ld) | dirty(%d) is wrong, xid %lu", pageno,
slotno, shared->page_status[slotno], shared->page_number[slotno],
shared->page_dirty[slotno], xid)));
shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;
LWLockRelease(shared->buffer_locks[slotno]);
if (!ok) {
LWLockRelease(banklock);
SlruReportIOError(ctl, pageno, xid);
}
SlruRecentlyUsedFunc(shared, slotno);
return slotno;
}
}
* Find a page in a shared buffer, reading it in if necessary.
* The page number must correspond to an already-initialized page.
* The caller must intend only read-only access to the page.
*
* The passed-in xid is used only for error reporting, and may be
* InvalidTransactionId if no specific xid is associated with the action.
*
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
*
* Control lock must NOT be held at entry, but will be held at exit.
* It is unspecified whether the lock will be shared or exclusive.
*/
int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
{
bool enableBank = ctl->shared->enable_banks;
LWLock* banklock = enableBank? SimpleLruGetBankLock(ctl, pageno) : ctl->shared->control_lock;
LWLockAcquire(banklock, LW_SHARED);
return execSimpleLruReadPageReadOnly(ctl, pageno, xid);
}
* @Description: Same as SimpleLruReadPage_ReadOnly, but the shared lock must be held by the caller
* and will be held at exit.
* @in ctl - the slru control data
* @in pageno - the page number to read
* @in xid - the transaction to be search
* @return - return the buffer slot for the input transaction
*/
int SimpleLruReadPage_ReadOnly_Locked(SlruCtl ctl, int64 pageno, TransactionId xid)
{
bool enableBank = ctl->shared->enable_banks;
LWLock *banklock = enableBank ? SimpleLruGetBankLock(ctl, pageno) : ctl->shared->control_lock;
Assert(LWLockHeldByMe(banklock));
return execSimpleLruReadPageReadOnly(ctl, pageno, xid);
}
* Write a page from a shared buffer, if necessary.
* Does nothing if the specified slot is not dirty.
*
* NOTE: only one write attempt is made here. Hence, it is possible that
* the page is still dirty at exit (if someone else re-dirtied it during
* the write). However, we *do* attempt a fresh write even if the page
* is already being written; this is for checkpoints.
*
* Control lock must be held at entry, and will be held at exit.
*/
static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata)
{
SlruShared shared = ctl->shared;
bool enableBank = shared->enable_banks;
int64 pageno = shared->page_number[slotno];
int bankno = SlotGetBankNumber(slotno);
LWLock *banklock = enableBank ? SimpleLruGetBankLock(ctl, pageno) : ctl->shared->control_lock;
bool ok = false;
Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
Assert(LWLockHeldByMeInMode(banklock, LW_EXCLUSIVE));
while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS && shared->page_number[slotno] == pageno) {
SimpleLruWaitIO(ctl, slotno);
}
* Do nothing if page is not dirty, or if buffer no longer contains the
* same page we were called for.
*/
if (!shared->page_dirty[slotno] || shared->page_status[slotno] != SLRU_PAGE_VALID ||
shared->page_number[slotno] != pageno)
return;
* Mark the slot write-busy, and clear the dirtybit. After this point, a
* transaction status update on this page will mark it dirty again.
*/
shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
shared->page_dirty[slotno] = false;
(void)LWLockAcquire(shared->buffer_locks[slotno], LW_EXCLUSIVE);
if (!ENABLE_DSS) {
LWLockRelease(banklock);
}
ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
if (!ok && (fdata != NULL)) {
int i;
for (i = 0; i < fdata->num_files; i++)
(void)close(fdata->fd[i]);
}
if (!ENABLE_DSS) {
(void)LWLockAcquire(banklock, LW_EXCLUSIVE);
}
if (!(shared->page_number[slotno] == pageno && shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS))
ereport(PANIC,
(errmodule(MOD_SLRU), errcode(ERRCODE_DATA_EXCEPTION), errmsg("slru write page under %s", ctl->dir),
errdetail("page(%ld) in slot(%d) status(%d) | number(%ld) | dirty(%d) is wrong", pageno, slotno,
shared->page_status[slotno], shared->page_number[slotno], shared->page_dirty[slotno])));
if (!ok)
shared->page_dirty[slotno] = true;
shared->page_status[slotno] = SLRU_PAGE_VALID;
LWLockRelease(shared->buffer_locks[slotno]);
if (!ok) {
LWLockRelease(banklock);
SlruReportIOError(ctl, pageno, InvalidTransactionId);
}
}
* Wrapper of SlruInternalWritePage, for external callers.
* fdata is always passed a NULL here.
*/
void SimpleLruWritePage(SlruCtl ctl, int slotno)
{
Assert(ctl->shared->page_status[slotno] != SLRU_PAGE_EMPTY);
SlruInternalWritePage(ctl, slotno, NULL);
}
static bool SSPreAllocSegment(int fd, SlruFlush fdata)
{
struct stat s;
if (fstat(fd, &s) < 0) {
t_thrd.xact_cxt.slru_errcause = SLRU_OPEN_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
if (fdata == NULL) {
(void)close(fd);
}
return false;
}
int64 trunc_size = (int64)(SLRU_PAGES_PER_SEGMENT * BLCKSZ);
if (s.st_size < trunc_size) {
pgstat_report_waitevent(WAIT_EVENT_SLRU_WRITE);
errno = 0;
if (fallocate(fd, 0, s.st_size, trunc_size) != 0) {
pgstat_report_waitevent(WAIT_EVENT_END);
if (errno == 0) {
errno = ENOSPC;
}
t_thrd.xact_cxt.slru_errcause = SLRU_WRITE_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
if (fdata == NULL) {
(void)close(fd);
}
return false;
}
pgstat_report_waitevent(WAIT_EVENT_END);
}
return true;
}
* Physical read of a (previously existing) page into a buffer slot
*
* On failure, we cannot just ereport(ERROR) since caller has put state in
* shared memory that must be undone. So, we return FALSE and save enough
* info in static variables to let SlruReportIOError make the report.
*
* For now, assume it's not worth keeping a file pointer open across
* read/write operations. We could cache one virtual file pointer ...
*/
static bool SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno)
{
SlruShared shared = ctl->shared;
int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
int fd;
int rc = 0;
rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%04X%08X", (ctl)->dir, (uint32)((uint64)(segno) >> 32),
(uint32)((segno) & (int64)0xFFFFFFFF));
securec_check_ss(rc, "", "");
* In a crash-and-restart situation, it's possible for us to receive
* commands to set the commit status of transactions whose bits are in
* already-truncated segments of the commit log (see notes in
* SlruPhysicalWritePage). Hence, if we are t_thrd.xlog_cxt.InRecovery, allow the case
* where the file doesn't exist, and return zeroes instead.
*/
fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
if (errno != ENOENT || !t_thrd.xlog_cxt.InRecovery) {
t_thrd.xact_cxt.slru_errcause = SLRU_OPEN_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
return false;
}
ereport(LOG, (errmodule(MOD_SLRU), errmsg("file \"%s\" doesn't exist, zero page %ld in buffer", path, pageno)));
rc = memset_s(shared->page_buffer[slotno], BLCKSZ, 0, BLCKSZ);
securec_check(rc, "\0", "\0");
return true;
}
if (ENABLE_DSS) {
struct stat s;
if (fstat(fd, &s) < 0) {
t_thrd.xact_cxt.slru_errcause = SLRU_OPEN_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
(void)close(fd);
return false;
}
if (s.st_size <= offset) {
int64 trunc_size = (int64)(SLRU_PAGES_PER_SEGMENT * BLCKSZ);
if (s.st_size < trunc_size) {
pgstat_report_waitevent(WAIT_EVENT_SLRU_WRITE);
errno = 0;
if (fallocate(fd, 0, s.st_size, trunc_size) != 0) {
pgstat_report_waitevent(WAIT_EVENT_END);
if (errno == 0) {
errno = ENOSPC;
}
t_thrd.xact_cxt.slru_errcause = SLRU_WRITE_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
(void)close(fd);
return false;
}
pgstat_report_waitevent(WAIT_EVENT_END);
}
}
} else {
if (lseek(fd, (off_t)offset, SEEK_SET) < 0) {
t_thrd.xact_cxt.slru_errcause = SLRU_SEEK_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
(void)close(fd);
return false;
}
}
errno = 0;
pgstat_report_waitevent(WAIT_EVENT_SLRU_READ);
if (pread(fd, shared->page_buffer[slotno], BLCKSZ, (off_t)offset) != BLCKSZ) {
pgstat_report_waitevent(WAIT_EVENT_END);
if (!t_thrd.xlog_cxt.InRecovery) {
t_thrd.xact_cxt.slru_errcause = SLRU_READ_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
(void)close(fd);
return false;
}
ereport(LOG, (errmodule(MOD_SLRU), errmsg("file \"%s\" read error, zero page %ld in buffer", path, pageno)));
rc = memset_s(shared->page_buffer[slotno], BLCKSZ, 0, BLCKSZ);
securec_check(rc, "\0", "\0");
(void)close(fd);
return true;
}
pgstat_report_waitevent(WAIT_EVENT_END);
if (close(fd)) {
t_thrd.xact_cxt.slru_errcause = SLRU_CLOSE_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
return false;
}
return true;
}
* Physical write of a page from a buffer slot
*
* On failure, we cannot just ereport(ERROR) since caller has put state in
* shared memory that must be undone. So, we return FALSE and save enough
* info in static variables to let SlruReportIOError make the report.
*
* For now, assume it's not worth keeping a file pointer open across
* independent read/write operations. We do batch operations during
* SimpleLruFlush, though.
*
* fdata is NULL for a standalone write, pointer to open-file info during
* SimpleLruFlush.
*/
static bool SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno, SlruFlush fdata)
{
SlruShared shared = ctl->shared;
int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
int fd = -1;
int rc = 0;
* Honor the write-WAL-before-data rule, if appropriate, so that we do not
* write out data before associated WAL records. This is the same action
* performed during FlushBuffer() in the main buffer manager.
*/
if (shared->group_lsn != NULL) {
* We must determine the largest async-commit LSN for the page. This
* is a bit tedious, but since this entire function is a slow path
* anyway, it seems better to do this here than to maintain a per-page
* LSN variable (which'd need an extra comparison in the
* transaction-commit path).
*/
XLogRecPtr max_lsn;
int lsnindex, lsnoff;
lsnindex = slotno * shared->lsn_groups_per_page;
max_lsn = shared->group_lsn[lsnindex++];
for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++) {
XLogRecPtr this_lsn = shared->group_lsn[lsnindex++];
if (XLByteLT(max_lsn, this_lsn))
max_lsn = this_lsn;
}
if (!XLogRecPtrIsInvalid(max_lsn)) {
* As noted above, ereport ERROR is not acceptable here, so if
* XLogFlush were to fail, we must PANIC. This isn't much of a
* restriction because XLogFlush is just about all critical
* section anyway, but let's make sure.
*/
START_CRIT_SECTION();
XLogWaitFlush(max_lsn);
END_CRIT_SECTION();
}
}
* During a Flush, we may already have the desired file open.
*/
if (fdata != NULL) {
int i;
for (i = 0; i < fdata->num_files; i++) {
if (fdata->segno[i] == segno) {
fd = fdata->fd[i];
break;
}
}
}
if (fd < 0) {
* If the file doesn't already exist, we should create it. It is
* possible for this to need to happen when writing a page that's not
* first in its segment; we assume the OS can cope with that. (Note:
* it might seem that it'd be okay to create files only when
* SimpleLruZeroPage is called for the first page of a segment.
* However, if after a crash and restart the REDO logic elects to
* replay the log from a checkpoint before the latest one, then it's
* possible that we will get commands to set transaction status of
* transactions that have already been truncated from the commit log.
* Easiest way to deal with that is to accept references to
* nonexistent files here and in SlruPhysicalReadPage.)
*
* Note: it is possible for more than one backend to be executing this
* code simultaneously for different pages of the same file. Hence,
* don't use O_EXCL or O_TRUNC or anything like that.
*/
rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%04X%08X", (ctl)->dir, (uint32)((uint64)(segno) >> 32),
(uint32)((segno) & (int64)0xFFFFFFFF));
securec_check_ss(rc, "", "");
fd = BasicOpenFile(path, O_RDWR | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
t_thrd.xact_cxt.slru_errcause = SLRU_OPEN_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
return false;
}
if (fdata != NULL) {
if (fdata->num_files < MAX_FLUSH_BUFFERS) {
fdata->fd[fdata->num_files] = fd;
fdata->segno[fdata->num_files] = segno;
fdata->num_files++;
} else {
* In the unlikely event that we exceed MAX_FLUSH_BUFFERS,
* fall back to treating it as a standalone write.
*/
fdata = NULL;
}
}
}
if (lseek(fd, (off_t)offset, SEEK_SET) < 0) {
bool failed = true;
if (ENABLE_DSS && errno == ERR_DSS_FILE_SEEK) {
if (!SSPreAllocSegment(fd, fdata)) {
return false;
}
if (lseek(fd, (off_t)offset, SEEK_SET) >= 0) {
failed = false;
}
}
if (failed) {
t_thrd.xact_cxt.slru_errcause = SLRU_SEEK_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
if (fdata == NULL) {
(void)close(fd);
}
return false;
}
}
if (SS_STANDBY_PROMOTING) {
return true;
}
if (SS_STANDBY_MODE && strlen(shared->page_buffer[slotno])) {
force_backtrace_messages = true;
ereport(PANIC, (errmodule(MOD_DMS), errmsg("DMS standby can't write to disk")));
}
errno = 0;
pgstat_report_waitevent(WAIT_EVENT_SLRU_WRITE);
if (write(fd, shared->page_buffer[slotno], BLCKSZ) != BLCKSZ) {
pgstat_report_waitevent(WAIT_EVENT_END);
if (errno == 0)
errno = ENOSPC;
t_thrd.xact_cxt.slru_errcause = SLRU_WRITE_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
if (fdata == NULL)
(void)close(fd);
return false;
}
pgstat_report_waitevent(WAIT_EVENT_END);
* If not part of Flush, need to fsync now. We assume this happens
* infrequently enough that it's not a performance issue.
*/
if (fdata == NULL) {
pgstat_report_waitevent(WAIT_EVENT_SLRU_SYNC);
if (ctl->do_fsync && pg_fsync(fd)) {
pgstat_report_waitevent(WAIT_EVENT_END);
t_thrd.xact_cxt.slru_errcause = SLRU_FSYNC_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
(void)close(fd);
return false;
}
pgstat_report_waitevent(WAIT_EVENT_END);
if (close(fd)) {
t_thrd.xact_cxt.slru_errcause = SLRU_CLOSE_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
return false;
}
}
return true;
}
* Issue the error message after failure of SlruPhysicalReadPage or
* SlruPhysicalWritePage. Call this after cleaning up shared-memory state.
*/
static void SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
{
int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
int offset = rpageno * BLCKSZ;
char path[MAXPGPATH];
int rc = 0;
rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%04X%08X", (ctl)->dir, (uint32)((uint64)(segno) >> 32),
(uint32)((segno) & (int64)0xFFFFFFFF));
securec_check_ss(rc, "", "");
errno = t_thrd.xact_cxt.slru_errno;
switch (t_thrd.xact_cxt.slru_errcause) {
case SLRU_OPEN_FAILED:
ereport(ERROR, (errmodule(MOD_SLRU), errcode_for_file_access(),
errmsg("could not access status of transaction %lu , nextXid is %lu, pageno %ld, "
"t_thrd.pgxact->xmin %lu",
xid, t_thrd.xact_cxt.ShmemVariableCache->nextXid, pageno, t_thrd.pgxact->xmin),
errdetail("Could not open file \"%s\": %m.", path)));
break;
case SLRU_SEEK_FAILED:
ereport(ERROR, (errmodule(MOD_SLRU), errcode_for_file_access(),
errmsg("could not access status of transaction %lu, nextXid is %lu", xid,
t_thrd.xact_cxt.ShmemVariableCache->nextXid),
errdetail("Could not seek in file \"%s\" to offset %d: %m.", path, offset)));
break;
case SLRU_READ_FAILED:
ereport(ERROR, (errmodule(MOD_SLRU), errcode_for_file_access(),
errmsg("could not access status of transaction %lu, nextXid is %lu", xid,
t_thrd.xact_cxt.ShmemVariableCache->nextXid),
errdetail("Could not read from file \"%s\" at offset %d: %m.", path, offset)));
break;
case SLRU_WRITE_FAILED:
ereport(ERROR, (errmodule(MOD_SLRU), errcode_for_file_access(),
errmsg("could not access status of transaction %lu", xid),
errdetail("Could not write to file \"%s\" at offset %d: %m.", path, offset)));
break;
case SLRU_FSYNC_FAILED:
ereport(data_sync_elevel(ERROR), (errmodule(MOD_SLRU), errcode_for_file_access(),
errmsg("could not access status of transaction %lu", xid),
errdetail("Could not fsync file \"%s\": %m.", path)));
break;
case SLRU_CLOSE_FAILED:
ereport(ERROR, (errmodule(MOD_SLRU), errcode_for_file_access(),
errmsg("could not access status of transaction %lu", xid),
errdetail("Could not close file \"%s\": %m.", path)));
break;
default:
ereport(ERROR, (errmodule(MOD_SLRU), errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("unrecognized SimpleLru error cause: %d", (int)t_thrd.xact_cxt.slru_errcause)));
break;
}
}
* Macro to mark a buffer slot "most recently used". Note multiple evaluation
* of arguments!
*
* The reason for the if-test is that there are often many consecutive
* accesses to the same page (particularly the latest page). By suppressing
* useless increments of cur_lru_count, we reduce the probability that old
* pages' counts will "wrap around" and make them appear recently used.
*
* We allow this code to be executed concurrently by multiple processes within
* SimpleLruReadPage_ReadOnly(). As long as int reads and writes are atomic,
* this should not cause any completely-bogus values to enter the computation.
* However, it is possible for either cur_lru_count or individual
* page_lru_count entries to be "reset" to lower values than they should have,
* in case a process is delayed while it executes this macro. With care in
* SlruSelectLRUPage(), this does little harm, and in any case the absolute
* worst possible consequence is a nonoptimal choice of page to evict. The
* gain from allowing concurrent reads of SLRU pages seems worth it.
*/
template<bool enableBank>
static void SlruRecentlyUsed(SlruShared shared, int slotno)
{
if (enableBank) {
int bankno = SlotGetBankNumber(slotno);
int new_lru_count = shared->bank_cur_lru_count[bankno];
Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
if (new_lru_count != shared->page_lru_count[slotno]) {
shared->bank_cur_lru_count[bankno] = ++new_lru_count;
shared->page_lru_count[slotno] = new_lru_count;
}
} else {
int new_lru_count = (shared)->cur_lru_count;
if (new_lru_count != (shared)->page_lru_count[slotno]) {
(shared)->cur_lru_count = ++new_lru_count;
(shared)->page_lru_count[slotno] = new_lru_count;
}
}
}
* Select the slot to re-use when we need a free slot.
*
* The target page number is passed because we need to consider the
* possibility that some other process reads in the target page while
* we are doing I/O to free a slot. Hence, check or recheck to see if
* any slot already holds the target page, and return that slot if so.
* Thus, the returned slot is *either* a slot already holding the pageno
* (could be any state except EMPTY), *or* a freeable slot (state EMPTY
* or CLEAN).
*
* Control lock must be held at entry, and will be held at exit.
*/
static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
{
SlruShared shared = ctl->shared;
bool enableBank = shared->enable_banks;
for (;;) {
int slotno;
int cur_count;
int best_valid_slot = 0;
int best_valid_delta = -1;
int64 best_valid_page_number = 0;
int best_invalid_slot = 0;
int best_invalid_delta = -1;
int64 best_invalid_page_number = 0;
int bankno = SimpleLruGetBankno(ctl, pageno);
int bankstart = enableBank ? bankno * SLRU_BANK_SIZE : 0;
int bankend = enableBank ? bankstart + SLRU_BANK_SIZE : shared->num_slots;
if (enableBank) {
Assert(LWLockHeldByMe(SimpleLruGetBankLock(ctl, pageno)));
} else {
Assert(LWLockHeldByMe(shared->control_lock));
}
for (slotno = 0; slotno < shared->num_slots; slotno++) {
if (shared->page_number[slotno] == pageno && shared->page_status[slotno] != SLRU_PAGE_EMPTY) {
Assert(0 <= slotno && slotno < shared->num_slots);
return slotno;
}
}
* If we find any EMPTY slot, just select that one. Else choose a
* victim page to replace. We normally take the least recently used
* valid page, but we will never take the slot containing
* latest_page_number, even if it appears least recently used. We
* will select a slot that is already I/O busy only if there is no
* other choice: a read-busy slot will not be least recently used once
* the read finishes, and waiting for an I/O on a write-busy slot is
* inferior to just picking some other slot. Testing shows the slot
* we pick instead will often be clean, allowing us to begin a read at
* once.
*
* Normally the page_lru_count values will all be different and so
* there will be a well-defined LRU page. But since we allow
* concurrent execution of SlruRecentlyUsed() within
* SimpleLruReadPage_ReadOnly(), it is possible that multiple pages
* acquire the same lru_count values. In that case we break ties by
* choosing the furthest-back page.
*
* Notice that this next line forcibly advances cur_lru_count to a
* value that is certainly beyond any value that will be in the
* page_lru_count array after the loop finishes. This ensures that
* the next execution of SlruRecentlyUsed will mark the page newly
* used, even if it's for a page that has the current counter value.
* That gets us back on the path to having good data when there are
* multiple pages with the same lru_count.
*/
if (enableBank) {
cur_count = (shared->bank_cur_lru_count[bankno])++;
} else {
cur_count = (shared->cur_lru_count)++;
}
for (slotno = bankstart; slotno < bankend; slotno++) {
int this_delta;
int64 this_page_number;
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) {
return slotno;
}
this_delta = cur_count - shared->page_lru_count[slotno];
if (this_delta < 0) {
* Clean up in case shared updates have caused cur_count
* increments to get "lost". We back off the page counts,
* rather than trying to increase cur_count, to avoid any
* question of infinite loops or failure in the presence of
* wrapped-around counts.
*/
shared->page_lru_count[slotno] = cur_count;
this_delta = 0;
}
this_page_number = shared->page_number[slotno];
if (this_page_number == shared->latest_page_number) {
continue;
}
if (shared->page_status[slotno] == SLRU_PAGE_VALID) {
if (this_delta > best_valid_delta ||
(this_delta == best_valid_delta && this_page_number < best_valid_page_number)) {
best_valid_slot = slotno;
best_valid_delta = this_delta;
best_valid_page_number = this_page_number;
}
} else {
if (this_delta > best_invalid_delta ||
(this_delta == best_invalid_delta && this_page_number < best_valid_page_number)) {
best_invalid_slot = slotno;
best_invalid_delta = this_delta;
best_invalid_page_number = this_page_number;
}
}
}
* If all pages (except possibly the latest one) are I/O busy, we'll
* have to wait for an I/O to complete and then retry. In that
* unhappy case, we choose to wait for the I/O on the least recently
* used slot, on the assumption that it was likely initiated first of
* all the I/Os in progress and may therefore finish first.
*/
if (best_valid_delta < 0) {
SimpleLruWaitIO(ctl, best_invalid_slot);
continue;
}
* If the selected page is clean, we're set.
*/
if (!shared->page_dirty[best_valid_slot]) {
return best_valid_slot;
}
* Write the page.
*/
SlruInternalWritePage(ctl, best_valid_slot, NULL);
* Now loop back and try again. This is the easiest way of dealing
* with corner cases such as the victim page being re-dirtied while we
* wrote it.
*/
}
}
* Flush dirty pages to disk during checkpoint or database shutdown
*/
int SimpleLruFlush(SlruCtl ctl, bool checkpoint)
{
SlruShared shared = ctl->shared;
bool enableBank = shared->enable_banks;
SlruFlushData fdata = { 0, {0}, {0}};
int slotno;
int64 pageno = 0;
int i;
int prevbank = SlotGetBankNumber(0);
LWLock *lock;
bool ok = false;
* Find and write dirty pages
*/
fdata.num_files = 0;
lock = enableBank ? shared->bank_locks[prevbank] : shared->control_lock;
(void)LWLockAcquire(lock, LW_EXCLUSIVE);
for (slotno = 0; slotno < shared->num_slots; slotno++) {
if (enableBank) {
int curbank = SlotGetBankNumber(slotno);
* If the current bank lock is not same as the previous bank lock then
* release the previous lock and acquire the new lock.
*/
if (curbank != prevbank) {
LWLockRelease(shared->bank_locks[prevbank]);
LWLockAcquire(shared->bank_locks[curbank], LW_EXCLUSIVE);
prevbank = curbank;
}
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
continue;
}
SlruInternalWritePage(ctl, slotno, &fdata);
* When called during a checkpoint, we cannot assert that the slot is
* clean now, since another process might have re-dirtied it already.
* That's okay.
*/
Assert(checkpoint || shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
(shared->page_status[slotno] == SLRU_PAGE_VALID && !shared->page_dirty[slotno]));
}
lock = enableBank ? shared->bank_locks[prevbank] : shared->control_lock;
LWLockRelease(lock);
* Now fsync and close any files that were open
*/
ok = true;
for (i = 0; i < fdata.num_files; i++) {
pgstat_report_waitevent(WAIT_EVENT_SLRU_FLUSH_SYNC);
if (ctl->do_fsync && pg_fsync(fdata.fd[i])) {
t_thrd.xact_cxt.slru_errcause = SLRU_FSYNC_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
ok = false;
}
pgstat_report_waitevent(WAIT_EVENT_END);
if (close(fdata.fd[i])) {
t_thrd.xact_cxt.slru_errcause = SLRU_CLOSE_FAILED;
t_thrd.xact_cxt.slru_errno = errno;
pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
ok = false;
}
}
if (!ok)
SlruReportIOError(ctl, pageno, InvalidTransactionId);
return fdata.num_files;
}
* Remove all segments before the one holding the passed page number
*/
void SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage, int partitionNum)
{
if (SS_STANDBY_MODE) {
ereport(WARNING, (errmodule(MOD_DMS), errmsg("DMS standby can't truncate slru page")));
return;
}
int prevbank;
SlruShared shared = NULL;
int64 slotno;
bool isCsnLogCtl = false;
bool isPart = (partitionNum > NUM_SLRU_DEFAULT_PARTITION);
bool enableBank = ctl->shared->enable_banks;
isCsnLogCtl = strcmp(ctl->dir, CSNLOGDIR) == 0;
* The cutoff point is the start of the segment containing cutoffPage.
*/
cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
* Scan shared memory and remove any pages preceding the cutoff page, to
* ensure we won't rewrite them later. (Since this is normally called in
* or just after a checkpoint, any dirty pages should have been flushed
* already ... we're just being extra careful here.)
*/
for (int i = 0; i < partitionNum; i++) {
shared = (ctl + i)->shared;
if (enableBank) {
prevbank = SlotGetBankNumber(0);
LWLockAcquire(shared->bank_locks[prevbank], LW_EXCLUSIVE);
} else {
(void)LWLockAcquire(shared->control_lock, LW_EXCLUSIVE);
}
restart:;
* While we are holding the lock, make an important safety check: the
* planned cutoff point must be <= the current endpoint page. Otherwise we
* have already wrapped around, and proceeding with the truncation would
* risk removing the current segment. If we use partitioned slru ctl, no need
* to check the lateset_page_number.
*/
if (shared->latest_page_number < cutoffPage && !isPart) {
LWLock *lock = enableBank ? shared->bank_locks[prevbank] : shared->control_lock;
LWLockRelease(lock);
ereport(LOG, (errmodule(MOD_SLRU),
errmsg("could not truncate directory \"%s\": apparent wraparound", ctl->dir)));
return;
}
for (slotno = 0; slotno < shared->num_slots; slotno++) {
if (enableBank) {
int curbank = SlotGetBankNumber(slotno);
if (curbank != prevbank) {
LWLockRelease(shared->bank_locks[prevbank]);
LWLockAcquire(shared->bank_locks[curbank], LW_EXCLUSIVE);
prevbank = curbank;
}
}
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
continue;
if (shared->page_number[slotno] >= cutoffPage)
continue;
* If page is clean, just change state to EMPTY (expected case).
*/
if (shared->page_status[slotno] == SLRU_PAGE_VALID && !shared->page_dirty[slotno]) {
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
continue;
}
* Hmm, we have (or may have) I/O operations acting on the page, so
* we've got to wait for them to finish and then start again. This is
* the same logic as in SlruSelectLRUPage. (XXX if page is dirty,
* wouldn't it be OK to just discard it without writing it? For now,
* keep the logic the same as it was.) Csnlog just discard it without writing it.
*/
if (shared->page_status[slotno] == SLRU_PAGE_VALID) {
if (isCsnLogCtl) {
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
continue;
}
SlruInternalWritePage((ctl + i), slotno, NULL);
} else {
SimpleLruWaitIO((ctl + i), slotno);
}
goto restart;
}
LWLock *lock = enableBank ? shared->bank_locks[prevbank] : shared->control_lock;
LWLockRelease(lock);
}
ereport(LOG, (errmodule(MOD_SLRU), errmsg("remove old segments(<%ld) under %s", cutoffPage, ctl->dir)));
(void)SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
}
* SlruScanDirectory callback
* This callback reports true if there's any segment prior to the one
* containing the page passed as "data".
*/
bool SlruScanDirCbReportPresence(SlruCtl ctl, const char *filename, int64 segpage, const void *data)
{
int64 cutoffPage = *(int64 *)data;
cutoffPage -= cutoffPage % SLRU_PAGES_PER_SEGMENT;
if (segpage < cutoffPage)
return true;
return false;
}
* SlruScanDirectory callback.
* This callback deletes segments prior to the one passed in as "data".
*/
bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, const char *filename, int64 segpage, const void *data)
{
char path[MAXPGPATH];
int64 cutoffPage = *(int64 *)data;
int rc = 0;
if (segpage < cutoffPage) {
rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", ctl->dir, filename);
securec_check_ss(rc, "\0", "\0");
ereport(DEBUG2, (errmodule(MOD_SLRU), errmsg("removing file \"%s\"", path)));
(void)unlink(path);
}
return false;
}
* SlruScanDirectory callback.
* This callback deletes all segments.
*/
bool SlruScanDirCbDeleteAll(SlruCtl ctl, const char *filename, int64 segpage, const void *data)
{
char path[MAXPGPATH];
int rc = 0;
rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", ctl->dir, filename);
securec_check_ss(rc, "\0", "\0");
ereport(DEBUG2, (errmodule(MOD_SLRU), errmsg("removing file \"%s\"", path)));
(void)unlink(path);
return false;
}
* Scan the SimpleLRU directory and apply a callback to each file found in it.
*
* If the callback returns true, the scan is stopped. The last return value
* from the callback is returned.
*
* Note that the ordering in which the directory is scanned is not guaranteed.
*
* Note that no locking is applied.
*/
bool SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, const void *data)
{
bool retval = false;
DIR *cldir = NULL;
struct dirent *clde = NULL;
int64 segno;
int64 segpage;
cldir = AllocateDir(ctl->dir);
while ((clde = ReadDir(cldir, ctl->dir)) != NULL) {
if (strlen(clde->d_name) == 12 && strspn(clde->d_name, "0123456789ABCDEF") == 12) {
segno = (int64)pg_strtouint64(clde->d_name, NULL, 16);
segpage = segno * SLRU_PAGES_PER_SEGMENT;
ereport(DEBUG2, (errmodule(MOD_SLRU),
errmsg("SlruScanDirectory invoking callback on %s/%s", ctl->dir, clde->d_name)));
retval = callback(ctl, clde->d_name, segpage, data);
if (retval)
break;
}
}
(void)FreeDir(cldir);
return retval;
}
void SimpleLruSetPageEmpty(SlruCtl ctl, const char *name, int trancheId, int nslots, int nlsns,
const char *subdir, int index)
{
bool enableBank = ctl->shared->enable_banks;
bool found = false;
int slotno;
SlruShared shared = (SlruShared)ShmemInitStruct(name, SimpleLruShmemSize(nslots, nlsns, enableBank), &found);
for (slotno = 0; slotno < nslots; slotno++) {
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
}
}
#ifdef ENABLE_UT
void ut_SetErrCause(int errcause)
{
t_thrd.xact_cxt.slru_errcause = (SlruErrorCause)errcause;
}
void ut_SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
{
SlruReportIOError(ctl, pageno, xid);
}
#endif
void SetSlruBufferDefaultNum(void)
{
for (int i = 0; i < SLRU_BUFFER_KIND; i++) {
g_instance.attr.attr_storage.num_slru_buffers[i] = SLRU_BUFFER_INFO[i].defaultBufferNum;
}
}
static int FindSlruBufferIndex(const char* name)
{
int i;
for (i = 0; i < LWLOCK_PART_KIND; i++) {
if (pg_strcasecmp(name, SLRU_BUFFER_INFO[i].name) == 0) {
return i;
}
}
return -1;
}
static bool CheckAndSetSlruBufferNum(const char* input)
{
const int pairNum = 2;
char* strs = TrimStr(input);
if (strs == NULL || strs[0] == '\0') {
return false;
}
const char* delim = "=";
List *res = NULL;
char* nextToken = NULL;
char* token = strtok_s(strs, delim, &nextToken);
while (token != NULL) {
res = lappend(res, TrimStr(token));
token = strtok_s(NULL, delim, &nextToken);
}
pfree(strs);
if (res->length != pairNum) {
list_free_deep(res);
return false;
}
int index = FindSlruBufferIndex((char*)linitial(res));
if (index != -1) {
if (!StrToInt32((char*)lsecond(res), &g_instance.attr.attr_storage.num_slru_buffers[index])) {
ereport(FATAL, (errcode(ERRCODE_OPERATE_INVALID_PARAM),
errmsg("num_slru_buffers attr has invalid slru buffer num:%s.", (char*)lsecond(res))));
}
list_free_deep(res);
return true;
} else {
ereport(FATAL, (errcode(ERRCODE_OPERATE_INVALID_PARAM),
errmsg("num_slru_buffers attr has invalid slru buffer name: %s.", (char*)linitial(res))));
return false;
}
}
void CheckAndSetSlruBufferInfo(const List* res)
{
ListCell* cell = NULL;
foreach (cell, res) {
char* input = (char*)lfirst(cell);
if (!CheckAndSetSlruBufferNum(input)) {
ereport(FATAL, (errcode(ERRCODE_OPERATE_INVALID_PARAM),
errmsg("num_slru_buffer attr has invalid input syntax.")));
}
}
}
void CheckSlruBufferNumRange(void)
{
int i;
for (i = 0; i < SLRU_BUFFER_KIND; i++) {
if (g_instance.attr.attr_storage.num_slru_buffers[i] < SLRU_BUFFER_INFO[i].minBufferNum ||
g_instance.attr.attr_storage.num_slru_buffers[i] > SLRU_BUFFER_INFO[i].maxBufferNum) {
ereport(FATAL, (errcode(ERRCODE_OPERATE_INVALID_PARAM),
errmsg("Invalid attribute for internal slru buffers."),
errdetail("Current %s slru buffer num %d is out of range [%d, %d].",
SLRU_BUFFER_INFO[i].name, g_instance.attr.attr_storage.num_slru_buffers[i],
SLRU_BUFFER_INFO[i].minBufferNum, SLRU_BUFFER_INFO[i].maxBufferNum)));
}
if (g_instance.attr.attr_storage.num_slru_buffers[i] % SLRU_BANK_SIZE != 0) {
ereport(FATAL, (errcode(ERRCODE_OPERATE_INVALID_PARAM),
errmsg("Invalid attribute for internal slru buffers."),
errdetail("Current %s slru buffer num %d must be a multiple of %d.",
SLRU_BUFFER_INFO[i].name, g_instance.attr.attr_storage.num_slru_buffers[i],
SLRU_BANK_SIZE)));
}
}
}