/*
 * Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
 * Portions Copyright (c) 2010-2013, PostgreSQL Global Development Group
 *
 *
 * walrcvwriter.cpp
 *     functions for xlog receive management
 *
 * IDENTIFICATION
 *     src/gausskernel/storage/replication/walrcvwriter.cpp
 *
 * -------------------------------------------------------------------------
 */
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/xlog_internal.h"
#include "access/xlogutils.h"
#include "catalog/pg_tablespace.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "replication/dataqueue.h"
#include "replication/datareceiver.h"
#include "replication/walsender.h"
#include "replication/dcf_replication.h"
#include "replication/ss_disaster_cluster.h"
#include "storage/buf/bufmgr.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/smgr/smgr.h"
#include "storage/file/fio_device.h"
#include "utils/guc.h"
#include "access/xlog.h"
#include "access/multi_redo_api.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "gssignal/gs_signal.h"
#include "gs_bbox.h"
#include "storage/gs_uwal/gs_uwal.h"
#ifdef ENABLE_MULTIPLE_NODES
#include "postmaster/barrier_preparse.h"
#endif
/*
 * These variables are used similarly to openLogFile/SegNo/Off,
 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
 * corresponding to the filename of recvFile.
 */
/* Descriptor of the WAL segment currently open for writing; -1 when none is open. */
static volatile int recvFile = -1;
/* Timeline recorded at the moment recvFile was opened. */
static volatile TimeLineID recvFileTLI = 0;
/* Segment number that recvFile refers to. */
static volatile XLogSegNo recvSegNo = 0;
/* Byte offset within the segment at which the next sequential write would land. */
static volatile uint32 recvOff = 0;
/* Size limit of one dummy-standby data file before rolling over to the next. */
#define MAX_DUMMY_DATA_FILE (RELSEG_SIZE * BLCKSZ)
/* Shorthand for the configured uwal device path. */
#define UWAL_XLOGDIR (g_instance.attr.attr_storage.uwal_devices_path)
/* Descriptor of the dummy-standby data file currently being appended to; -1 when closed. */
static int ws_dummy_data_writer_file_fd = -1;
/* Number of bytes already written to the current dummy-standby data file. */
static uint32 ws_dummy_data_writer_file_offset = 0;
/* Set to true only while WSDataRcvWriteOnDummyStandby() is writing to the data file. */
bool ws_dummy_data_writer_use_file = false;
/* Dummy-standby data file writers. */
static void WSDataWriteOnDummyStandby(const char *buf, uint32 nbytes);
static void WSDataRcvWriteOnDummyStandby(const char *buf, uint32 nbytes);
/* Signal handlers installed by walrcvWriterMain(). */
static void walrcvWriterSigHupHandler(SIGNAL_ARGS);
static void walrcvWriterQuickDie(SIGNAL_ARGS);
static void reqShutdownHandler(SIGNAL_ARGS);
/* uwal-based write path (used when enable_uwal is set). */
int walRcvWriteUwal(WalRcvCtlBlock *walrcb, UwalrcvWriterState *uwalrcv, UwalInfo *info);
int defWalRcvWriteUwal(WalRcvCtlBlock *walrcb);
int uwalRcvStateInit(UwalrcvWriterState *uwalrcv, UwalInfo info);
static void uwalRcvStateAlloc();
static void uwalRcvStateFree();
static void XLogWalRcvWriteFromUwal(WalRcvCtlBlock *walrcb, char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli);
static int RenameUwalFile(XLogRecPtr recptr, TimeLineID tli);
static int RemoveUwalFile(XLogRecPtr recptr, TimeLineID tli);
static int WalRcvUwalTruncate(WalRcvCtlBlock *walrcb, UwalrcvWriterState *uwalrcv, UwalInfo *info);
static void RemoveFromUwal(UwalrcvWriterState *uwalrcv, bool needUpdate);
static void UpdateUwalRcv(UwalrcvWriterState *uwalrcv, UwalInfo *info);
/* Defined outside this file. */
extern void XlogArchUwal(XLogRecPtr archRqstPtr);
/*
 * Publish the thread id of the WAL receive writer in the shared walreceiver
 * state. The store is done while holding the state's spinlock.
 */
void SetWalRcvWriterPID(ThreadId tid)
{
    volatile WalRcvData *rcvShared = t_thrd.walreceiverfuncs_cxt.WalRcv;

    SpinLockAcquire(&rcvShared->mutex);
    rcvShared->writerPid = tid;
    SpinLockRelease(&rcvShared->mutex);
}
static void setWalRcvWriterLatch(void)
{
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
SpinLockAcquire(&walrcv->mutex);
walrcv->walrcvWriterLatch = &t_thrd.proc->procLatch;
walrcv->writerPid = t_thrd.proc_cxt.MyProcPid;
SpinLockRelease(&walrcv->mutex);
}
static void emptyWalRcvWriterLatch(void)
{
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
SpinLockAcquire(&walrcv->mutex);
walrcv->walrcvWriterLatch = NULL;
walrcv->writerPid = 0;
SpinLockRelease(&walrcv->mutex);
}
bool WalRcvWriterInProgress(void)
{
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
SpinLockAcquire(&walrcv->mutex);
if (walrcv->writerPid == 0) {
SpinLockRelease(&walrcv->mutex);
return false;
}
SpinLockRelease(&walrcv->mutex);
return true;
}
/*
 * Return the WAL receiver control block pointer currently stored in the
 * shared walreceiver state (may be NULL). The pointer is read under the
 * walreceiver spinlock.
 */
WalRcvCtlBlock *getCurrentWalRcvCtlBlock(void)
{
    volatile WalRcvData *rcvShared = t_thrd.walreceiverfuncs_cxt.WalRcv;
    WalRcvCtlBlock *ctlBlock;

    SpinLockAcquire(&rcvShared->mutex);
    ctlBlock = rcvShared->walRcvCtlBlock;
    SpinLockRelease(&rcvShared->mutex);

    return ctlBlock;
}
/*
 * Write XLOG data to disk.
 */
/*
 * Write received WAL bytes into the matching WAL segment file(s) and fsync
 * them, then publish the new write/flush position.
 *
 *   walrcb  - receiver control block whose writePtr/flushPtr are advanced
 *   buf     - WAL bytes to write
 *   nbytes  - number of bytes available in buf
 *   recptr  - LSN of the first byte in buf
 *
 * The data may span several segments; the loop writes at most up to the end
 * of the current segment per iteration and switches files as needed. Every
 * I/O failure is reported at PANIC. The callers in this file hold
 * WALWriteLock while calling this function.
 */
static void XLogWalRcvWrite(WalRcvCtlBlock *walrcb, char *buf, Size nbytes, XLogRecPtr recptr)
{
    int startoff;
    int byteswritten;
    int nRetCode = 0;
    Size write_bytes = nbytes;
    instr_time startTime;
    instr_time endTime;

    while (nbytes > 0) {
        int segbytes;
        int writeErrno;
        volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;

        if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo)) {
            bool use_existent = false;

            /*
             * fsync() and close current file before we switch to next one. We
             * would otherwise have to reopen this file to fsync it later
             */
            if (recvFile >= 0) {
                char xlogfname[MAXFNAMELEN];

                /*
                 * XLOG segment files will be re-read by recovery in startup
                 * process soon, so we don't advise the OS to release cache
                 * pages associated with the file like XLogFileClose() does.
                 */
                if (close(recvFile) != 0)
                    ereport(PANIC, (errcode_for_file_access(),
                                    errmsg("could not close log file %s: %m",
                                           XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, recvSegNo))));

                /*
                 * Create .done file forcibly to prevent the restored segment from
                 * being archived again later.
                 */
                XLogFileName(xlogfname, MAXFNAMELEN, recvFileTLI, recvSegNo);
                XLogArchiveForceDone(xlogfname);
            }
            recvFile = -1;

            /* Open (creating if necessary) the segment that contains recptr. */
            XLByteToSeg(recptr, recvSegNo);
            use_existent = true;
            recvFile = XLogFileInit(recvSegNo, &use_existent, true);
            recvFileTLI = t_thrd.xlog_cxt.ThisTimeLineID;
            recvOff = 0;
            g_instance.wal_cxt.walRecvWriterStats->currentXlogSegno = recvSegNo;
        }

        /* Calculate the start offset of the received logs and clamp to the segment end. */
        startoff = (int)(recptr % XLogSegSize);
        if (startoff + nbytes > XLogSegSize)
            segbytes = (int)(XLogSegSize - startoff);
        else
            segbytes = nbytes;

        /* Need to seek in the file? */
        if (recvOff != (uint32)startoff) {
            if (lseek(recvFile, (off_t)startoff, SEEK_SET) < 0)
                ereport(PANIC, (errcode_for_file_access(),
                                errmsg("could not seek in log file %s to offset %d: %m",
                                       XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, recvSegNo), startoff)));
            recvOff = startoff;
        }

        /* OK to write the logs */
        errno = 0;
        INSTR_TIME_SET_CURRENT(startTime);
        byteswritten = write(recvFile, buf, segbytes);
        /*
         * Capture errno right away: the timing and statistics code below runs
         * before the failure check and must not be able to disturb the value
         * that ends up in the "%m" of the error message.
         */
        writeErrno = errno;
        INSTR_TIME_SET_CURRENT(endTime);
        if (g_instance.wal_cxt.walRecvWriterStats->isEnableStat) {
            INSTR_TIME_SUBTRACT(endTime, startTime);
            PgStat_Counter elapsedTime = (PgStat_Counter)INSTR_TIME_GET_MICROSEC(endTime);
            SpinLockAcquire(&g_instance.wal_cxt.walRecvWriterStats->mutex);
            /* Re-read the switch under the lock; it may have been turned off meanwhile. */
            volatile bool recheck = g_instance.wal_cxt.walRecvWriterStats->isEnableStat;
            if (recheck) {
                g_instance.wal_cxt.walRecvWriterStats->totalWriteTime += elapsedTime;
                g_instance.wal_cxt.walRecvWriterStats->totalWriteBytes += byteswritten;
                g_instance.wal_cxt.walRecvWriterStats->writeTimes++;
            }
            SpinLockRelease(&g_instance.wal_cxt.walRecvWriterStats->mutex);
        }

        if (byteswritten <= 0) {
            /* if write didn't set errno, assume no disk space */
            errno = (writeErrno != 0) ? writeErrno : ENOSPC;
            ereport(PANIC,
                    (errcode_for_file_access(),
                     errmsg("could not write to log file %s at offset %u, length %lu: %m",
                            XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, recvSegNo), recvOff, INT2ULONG(segbytes))));
        }

        /*
         * When writing the first page of a segment, cross-check the page
         * address in its header against the segment we believe we are in.
         */
        if (recvOff == (uint32)0 && segbytes >= (int)sizeof(XLogPageHeaderData)) {
            if (((XLogPageHeader)buf)->xlp_magic == XLOG_PAGE_MAGIC &&
                (recvSegNo * XLogSegSize) != ((XLogPageHeader)buf)->xlp_pageaddr) {
                ereport(PANIC, (errcode_for_file_access(),
                                errmsg("unexpected page addr %lu of log file %s", ((XLogPageHeader)buf)->xlp_pageaddr,
                                       XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, recvSegNo))));
            }
        }

        /* Update state for write */
        XLByteAdvance(recptr, byteswritten);
        recvOff += byteswritten;
        nbytes -= byteswritten;
        buf += byteswritten;

        /* Flush what was just written and account for the sync time. */
        INSTR_TIME_SET_CURRENT(startTime);
        issue_xlog_fsync(recvFile, recvSegNo);
        INSTR_TIME_SET_CURRENT(endTime);
        if (g_instance.wal_cxt.walRecvWriterStats->isEnableStat) {
            INSTR_TIME_SUBTRACT(endTime, startTime);
            PgStat_Counter elapsedTime = (PgStat_Counter)INSTR_TIME_GET_MICROSEC(endTime);
            SpinLockAcquire(&g_instance.wal_cxt.walRecvWriterStats->mutex);
            volatile bool recheck = g_instance.wal_cxt.walRecvWriterStats->isEnableStat;
            if (recheck) {
                g_instance.wal_cxt.walRecvWriterStats->totalSyncBytes += byteswritten;
                g_instance.wal_cxt.walRecvWriterStats->totalSyncTime += elapsedTime;
                g_instance.wal_cxt.walRecvWriterStats->syncTimes++;
            }
            SpinLockRelease(&g_instance.wal_cxt.walRecvWriterStats->mutex);
        }

        t_thrd.walrcvwriter_cxt.walStreamWrite = recptr;

        /* Update the control block: data up to recptr is both written and flushed. */
        SpinLockAcquire(&walrcb->mutex);
        walrcb->writePtr = walrcb->flushPtr = recptr;
        SpinLockRelease(&walrcb->mutex);

        /* Update shared-memory status */
        SpinLockAcquire(&walrcv->mutex);
        if (XLByteLT(walrcv->receivedUpto, t_thrd.walrcvwriter_cxt.walStreamWrite)) {
            walrcv->latestChunkStart = walrcv->receivedUpto;
            walrcv->receivedUpto = t_thrd.walrcvwriter_cxt.walStreamWrite;
        }
        SpinLockRelease(&walrcv->mutex);

        /* Signal the consumers of newly flushed WAL. */
#ifdef ENABLE_MULTIPLE_NODES
        WakeUpBarrierPreParseBackend();
#endif
        WakeupRecovery();
        if (AllowCascadeReplication())
            WalSndWakeup();

        /* Report XLOG streaming progress in PS display */
        if (u_sess->attr.attr_common.update_process_title) {
            char activitymsg[50];

            nRetCode = snprintf_s(activitymsg, sizeof(activitymsg), sizeof(activitymsg) - 1,
                                  "walrcvwriter streaming %X/%X",
                                  (uint32)(t_thrd.walrcvwriter_cxt.walStreamWrite >> 32),
                                  (uint32)t_thrd.walrcvwriter_cxt.walStreamWrite);
            securec_check_ss(nRetCode, "\0", "\0");
            set_ps_display(activitymsg, false);
        }

        ereport(DEBUG2, (errmsg("write xlog done: start %X/%X %lu bytes", (uint32)(recptr >> 32), (uint32)recptr,
                                write_bytes)));
    }

#ifndef ENABLE_MULTIPLE_NODES
    if (g_instance.attr.attr_storage.dcf_attr.enable_dcf) {
        UpdateRecordIdxState();
    }
#endif
}
void WalRcvXLogClose(void)
{
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
if (recvFile >= 0) {
* XLOG segment files will be re-read by recovery in startup
* process soon, so we don't advise the OS to release cache
* pages associated with the file like XLogFileClose() does.
*/
if (close(recvFile) != 0)
ereport(PANIC,
(errcode_for_file_access(), errmsg("could not close log file %s: %m",
XLogFileNameP(t_thrd.xlog_cxt.ThisTimeLineID, recvSegNo))));
}
recvFile = -1;
LWLockRelease(WALWriteLock);
}
static void WSDataWriteOnDummyStandby(const char *buf, uint32 nbytes)
{
ssize_t write_len = -1;
char path[MAXPGPATH] = {0};
int nRet = 0;
* get the file num to store:
* 1. If MAX_DUMMY_DATA_FILE <= wal_data_writer_file_offset, open
* the next file to store
* 2. If the current file can not store the nbytes, discard the current file space,
* open the next file to store.
*/
if (ws_dummy_data_writer_file_offset >= MAX_DUMMY_DATA_FILE ||
(MAX_DUMMY_DATA_FILE - ws_dummy_data_writer_file_offset < (uint32)(nbytes + sizeof(nbytes)))) {
ereport(DEBUG2, (errmsg("data file num %u", t_thrd.walrcvwriter_cxt.ws_dummy_data_writer_file_num)));
t_thrd.walrcvwriter_cxt.ws_dummy_data_writer_file_num++;
ws_dummy_data_writer_file_offset = 0;
if (ws_dummy_data_writer_file_fd >= 0) {
close(ws_dummy_data_writer_file_fd);
ws_dummy_data_writer_file_fd = -1;
}
}
if (ws_dummy_data_writer_file_fd < 0) {
nRet = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "base/dummy_standby/%u",
t_thrd.walrcvwriter_cxt.ws_dummy_data_writer_file_num);
securec_check_ss_c(nRet, "\0", "\0");
ws_dummy_data_writer_file_fd = open(path, O_RDWR | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (ws_dummy_data_writer_file_fd < 0)
ereport(PANIC, (errcode_for_file_access(),
errmsg("could not create data file \"%s\", dummy_data_writer_file_fd=%d: %m", path,
ws_dummy_data_writer_file_fd)));
}
errno = 0;
write_len = write(ws_dummy_data_writer_file_fd, &nbytes, sizeof(uint32));
if (write_len < (ssize_t)sizeof(uint32)) {
if (errno == 0) {
errno = ENOSPC;
}
ereport(PANIC, (errcode_for_file_access(), errmsg("could not write to data file %s buffer len %u, length %u: %s",
path, nbytes, nbytes, TRANSLATE_ERRNO)));
}
errno = 0;
write_len = write(ws_dummy_data_writer_file_fd, buf, nbytes);
if (write_len < nbytes) {
if (errno == 0)
errno = ENOSPC;
ereport(PANIC, (errcode_for_file_access(), errmsg("could not write to data file %s "
"at offset %u, length %u: %m",
path, (uint32)ws_dummy_data_writer_file_offset, nbytes)));
}
ws_dummy_data_writer_file_offset = ws_dummy_data_writer_file_offset + nbytes + sizeof(nbytes);
ereport(u_sess->attr.attr_storage.HaModuleDebug ? LOG : DEBUG2,
(errmsg("the dummy data write info: writer_file_num %u %u bytes",
t_thrd.walrcvwriter_cxt.ws_dummy_data_writer_file_num, nbytes)));
if (pg_fdatasync(ws_dummy_data_writer_file_fd) != 0)
ereport(PANIC, (errcode_for_file_access(),
errmsg("could not fdatasync data file num %u, fd %d: %m",
t_thrd.walrcvwriter_cxt.ws_dummy_data_writer_file_num, ws_dummy_data_writer_file_fd)));
}
/*
 * Write data to disk of DummyStandby.
 */
/*
 * Write one received data record to the dummy-standby data file.
 *
 * ws_dummy_data_writer_use_file is raised for the duration of the write
 * and lowered again afterwards.
 */
static void WSDataRcvWriteOnDummyStandby(const char *buf, uint32 nbytes)
{
    ws_dummy_data_writer_use_file = true;

    WSDataWriteOnDummyStandby(buf, nbytes);

    ws_dummy_data_writer_use_file = false;
}
/*
 * Close the data file descriptor so that the data file can be removed.
 */
/*
 * Close the dummy-standby data file, if one is open, and reset the writer's
 * descriptor and offset so the next write starts from a fresh state.
 *
 * close() is only issued for a valid descriptor; calling it on -1 would
 * merely fail with EBADF and clobber errno.
 */
void CloseWSDataFileOnDummyStandby(void)
{
    if (ws_dummy_data_writer_file_fd >= 0) {
        close(ws_dummy_data_writer_file_fd);
    }
    ws_dummy_data_writer_file_fd = -1;
    ws_dummy_data_writer_file_offset = 0;
}
/*
 * Scan the dummy-standby data directory and initialise the reader/writer
 * file numbers from the file names found there.
 *
 * The directory is created when it does not exist yet. File names are
 * converted with atoi(). After the scan:
 *   - the walsender read file number is the smallest number found
 *     (0 when the directory is empty);
 *   - the writer file number is the largest number found (never below 0).
 *
 * Failure to create or open the directory is reported at ERROR.
 */
void InitWSDataNumOnDummyStandby(void)
{
    const char *dirpath = DUMMY_STANDBY_DATADIR;
    DIR *dir = NULL;
    struct dirent *de = NULL;
    int max_num_file = 0;
    int min_num_file = 0;
    bool seen_entry = false;

    /* open the dir of base/dummy_standby */
    errno = 0;
    dir = AllocateDir(dirpath);
    if (dir == NULL && errno == ENOENT) {
        if (mkdir(dirpath, S_IRWXU) < 0) {
            /* Failure other than not exists */
            ereport(ERROR, (errcode_for_file_access(), errmsg("could not create directory \"%s\": %m", dirpath)));
        }
        dir = AllocateDir(dirpath);
    }
    if (dir == NULL) {
        ereport(ERROR, (errcode_for_file_access(), errmsg("could not open directory \"%s\": %m", dirpath)));
        return;
    }

    /*
     * Single pass over the directory: the first real entry seeds the minimum,
     * after which both extremes are tracked for every entry.
     */
    while ((de = ReadDir(dir, dirpath)) != NULL) {
        int file_num;

        /* Skip special stuff */
        if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
            continue;

        file_num = atoi(de->d_name);
        if (!seen_entry) {
            min_num_file = file_num;
            seen_entry = true;
        }
        max_num_file = Max(file_num, max_num_file);
        min_num_file = Min(file_num, min_num_file);
        ereport(DEBUG5, (errmsg("InitDummyDataNum de->d_name=%s; max_num_path=%d; min_num_file=%d.", de->d_name,
                                max_num_file, min_num_file)));
    }

    t_thrd.walsender_cxt.ws_dummy_data_read_file_num = min_num_file;
    t_thrd.walrcvwriter_cxt.ws_dummy_data_writer_file_num = max_num_file;
    FreeDir(dir);
}
/*
 * Write all the wal data to disk.
 */
int WalDataRcvWrite(void)
{
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
char *writeBuf = NULL;
char *cur_buf = NULL;
uint32 left_nbytes = 0;
uint32 get_queue_nbytes = 0;
uint32 remainbytes = 0;
uint32 cur_data_nbytes = 0;
DataQueuePtr start_pos = { 0, 0 };
DataQueuePtr end_pos = { 0, 0 };
WalRcvCtlBlock *walrcb = getCurrentWalRcvCtlBlock();
errno_t errorno = EOK;
XLogRecPtr recptr = InvalidXLogRecPtr;
char data_flag;
if (!g_instance.attr.attr_storage.enable_mix_replication) {
if (g_instance.attr.attr_storage.enable_uwal) {
return defWalRcvWriteUwal(walrcb);
} else {
return walRcvWrite(walrcb);
}
} else if (!t_thrd.xlog_cxt.InRecovery)
t_thrd.xlog_cxt.InRecovery = true;
START_CRIT_SECTION();
LWLockAcquire(RcvWriteLock, LW_EXCLUSIVE);
SpinLockAcquire(&walrcv->mutex);
start_pos.queueid = walrcv->local_write_pos.queueid;
start_pos.queueoff = walrcv->local_write_pos.queueoff;
SpinLockRelease(&walrcv->mutex);
get_queue_nbytes = GetFromDataQueue(writeBuf, g_instance.attr.attr_storage.DataQueueBufSize * 1024, start_pos,
end_pos, true, t_thrd.dataqueue_cxt.DataWriterQueue);
if (get_queue_nbytes == 0) {
LWLockRelease(RcvWriteLock);
END_CRIT_SECTION();
return 0;
}
cur_buf = writeBuf;
left_nbytes = get_queue_nbytes;
while (left_nbytes > 0) {
errorno = memcpy_s(&cur_data_nbytes, sizeof(uint32), cur_buf, sizeof(uint32));
securec_check(errorno, "\0", "\0");
data_flag = *(cur_buf + sizeof(uint32));
switch (data_flag) {
case 'w':
Assert(cur_data_nbytes > (sizeof(uint32) + 1 + sizeof(XLogRecPtr)));
errorno = memcpy_s(&recptr, sizeof(XLogRecPtr), cur_buf + sizeof(uint32) + 1, sizeof(XLogRecPtr));
securec_check(errorno, "\0", "\0");
Assert(!XLogRecPtrIsInvalid(recptr));
(void)WSWalRcvWrite(walrcb, cur_buf + sizeof(uint32) + 1 + sizeof(XLogRecPtr),
cur_data_nbytes - (sizeof(uint32) + 1 + sizeof(XLogRecPtr)), recptr);
break;
case 'd':
if (u_sess->attr.attr_storage.HaModuleDebug)
WSDataRcvCheck(cur_buf, cur_data_nbytes);
if (dummyStandbyMode)
WSDataRcvWriteOnDummyStandby(cur_buf, cur_data_nbytes);
else {
*
* 1. total_len has already been parsed as cur_data_nbytes
*/
cur_buf += sizeof(uint32);
Assert(cur_buf[0] == 'd');
cur_buf += 1;
cur_buf += sizeof(XLogRecPtr) * 2;
cur_data_nbytes -= WS_DATA_MSG_PREFIX_LEN;
left_nbytes -= WS_DATA_MSG_PREFIX_LEN;
remainbytes = DoDataWrite(cur_buf, cur_data_nbytes);
Assert(remainbytes == 0);
cur_data_nbytes -= remainbytes;
}
smgrcloseall();
break;
default:
Assert(false);
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unexpected wal data type %c", data_flag)));
break;
}
cur_buf += cur_data_nbytes;
left_nbytes -= cur_data_nbytes;
}
Assert(left_nbytes == 0);
SpinLockAcquire(&walrcv->mutex);
walrcv->local_write_pos.queueid = start_pos.queueid;
walrcv->local_write_pos.queueoff = start_pos.queueoff;
DQByteAdvance(walrcv->local_write_pos, get_queue_nbytes);
SpinLockRelease(&walrcv->mutex);
PopFromDataQueue(((WalRcvData *)walrcv)->local_write_pos, t_thrd.dataqueue_cxt.DataWriterQueue);
LWLockRelease(RcvWriteLock);
END_CRIT_SECTION();
return (int)get_queue_nbytes;
}
/*
 * Flush the log to disk.
 *
 * If we're in the midst of dying, it's unwise to do anything that might throw
 * an error, so we skip sending a reply in that case.
 */
/*
 * Write one contiguous run of pending bytes from the WAL receiver ring
 * buffer to disk.
 *
 * The run starts at walWriteOffset and ends either at walFreeOffset or, when
 * the free offset has wrapped around, at the end of the buffer. After the
 * write the buffer offsets and walStart are advanced, wrapping to 0 at the
 * buffer end.
 *
 * Returns the number of bytes written; 0 when walrcb is NULL or nothing is
 * pending.
 */
int walRcvWrite(WalRcvCtlBlock *walrcb)
{
    const int64 ringSize = g_instance.attr.attr_storage.WalReceiverBufSize * 1024;
    int64 freeOff;
    int64 writeOff;
    char *ringBuf = NULL;
    XLogRecPtr lsn;
    int chunk = 0;

    if (walrcb == NULL) {
        return 0;
    }

    START_CRIT_SECTION();
    LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);

    /* Snapshot the buffer state; bail out early when there is nothing to write. */
    SpinLockAcquire(&walrcb->mutex);
    if (walrcb->walFreeOffset == walrcb->walWriteOffset) {
        SpinLockRelease(&walrcb->mutex);
        LWLockRelease(WALWriteLock);
        END_CRIT_SECTION();
        return 0;
    }
    freeOff = walrcb->walFreeOffset;
    writeOff = walrcb->walWriteOffset;
    ringBuf = walrcb->walReceiverBuffer;
    lsn = walrcb->walStart;
    SpinLockRelease(&walrcb->mutex);

    /* A wrapped free offset means the pending run extends to the end of the buffer. */
    if (freeOff < writeOff) {
        chunk = (int)(ringSize - writeOff);
    } else {
        chunk = (int)(freeOff - writeOff);
    }

    XLogWalRcvWrite(walrcb, ringBuf + writeOff, chunk, lsn);
    XLByteAdvance(lsn, chunk);

    ereport(DEBUG5,
            (errmsg("walRcvWrite: write len:%d, at %u,%X", chunk, (uint32)(lsn >> 32), (uint32)lsn)));

    /* Publish the new offsets, wrapping them back to the start of the buffer if needed. */
    SpinLockAcquire(&walrcb->mutex);
    walrcb->walWriteOffset += chunk;
    walrcb->walStart = lsn;
    if (walrcb->walWriteOffset == ringSize) {
        walrcb->walWriteOffset = 0;
        if (walrcb->walFreeOffset == ringSize) {
            walrcb->walFreeOffset = 0;
        }
    }
    freeOff = walrcb->walFreeOffset;
    writeOff = walrcb->walWriteOffset;
    SpinLockRelease(&walrcb->mutex);

    ereport(DEBUG5, (errmsg("walRcvWrite: nbytes(%d),walfreeoffset(%ld),walwriteoffset(%ld),startptr(%u:%u)", chunk,
                            freeOff, writeOff, (uint32)(lsn >> 32), (uint32)lsn)));

    LWLockRelease(WALWriteLock);
    END_CRIT_SECTION();

    g_instance.comm_cxt.predo_cxt.redoPf.local_max_lsn = lsn;
    return chunk;
}
/*
 * The main difference between walRcvWrite() and WSWalRcvWrite() is that
 * for walRcvWrite() the wal source is from the walrcb and for WSWalRcvWrite()
 * the wal source is from the wal write queue.
 */
/*
 * Write a caller-supplied WAL buffer to disk.
 *
 *   walrcb    - receiver control block; when NULL a NOTICE is emitted and
 *               nothing is written
 *   buf       - WAL bytes
 *   nbytes    - number of bytes in buf
 *   start_ptr - LSN of the first byte in buf
 *
 * The write runs inside a critical section with WALWriteLock held
 * exclusively. Returns nbytes (converted to int), or 0 when walrcb is NULL.
 */
int WSWalRcvWrite(WalRcvCtlBlock *walrcb, char *buf, Size nbytes, XLogRecPtr start_ptr)
{
    if (walrcb != NULL) {
        START_CRIT_SECTION();
        LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);

        XLogWalRcvWrite(walrcb, buf, nbytes, start_ptr);

        LWLockRelease(WALWriteLock);
        END_CRIT_SECTION();
        return nbytes;
    }

    ereport(NOTICE, (errmsg("have nothing of the wal receiver wal block info.")));
    return 0;
}
/*
 * Called when walrcvWriterMain is ending.
 */
/*
 * Exit callback registered by walrcvWriterMain(): withdraws the writer's
 * latch and pid from the shared walreceiver state. Both callback arguments
 * are unused.
 */
static void ShutdownWalRcvWriter(int code, Datum arg)
{
    (void)code;
    (void)arg;

    emptyWalRcvWriterLatch();
}
/*
 * SIGUSR1 handler: forwards to the latch machinery while keeping the
 * interrupted code's errno intact.
 */
static void WalRcvWriterProcSigUsr1Handler(SIGNAL_ARGS)
{
    const int errnoOnEntry = errno;

    latch_sigusr1_handler();

    errno = errnoOnEntry;
}
void walrcvWriterMain(void)
{
sigjmp_buf localSigjmpBuf;
sigset_t oldSigMask;
MemoryContext walrcvWriterContext;
ereport(LOG, (errmsg("walrcvwriter thread started")));
* Reset some signals that are accepted by postmaster but not here
*/
(void)gspqsignal(SIGHUP, walrcvWriterSigHupHandler);
(void)gspqsignal(SIGINT, SIG_IGN);
(void)gspqsignal(SIGTERM, reqShutdownHandler);
(void)gspqsignal(SIGQUIT, walrcvWriterQuickDie);
(void)gspqsignal(SIGALRM, SIG_IGN);
(void)gspqsignal(SIGPIPE, SIG_IGN);
(void)gspqsignal(SIGUSR1, WalRcvWriterProcSigUsr1Handler);
(void)gspqsignal(SIGUSR2, SIG_IGN);
(void)gspqsignal(SIGURG, print_stack);
* Reset some signals that are accepted by postmaster but not here
*/
(void)gspqsignal(SIGCHLD, SIG_DFL);
(void)gspqsignal(SIGTTIN, SIG_DFL);
(void)gspqsignal(SIGTTOU, SIG_DFL);
(void)gspqsignal(SIGCONT, SIG_DFL);
(void)gspqsignal(SIGWINCH, SIG_DFL);
(void)sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT);
on_shmem_exit(ShutdownWalRcvWriter, 0);
* Create a resource owner to keep track of our resources (currently only
* buffer pins).
*/
t_thrd.utils_cxt.CurrentResourceOwner = ResourceOwnerCreate(NULL, "WalReceive Writer",
THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));
* Create a memory context that we will do all our work in. We do this so
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks. Formerly this code just ran in
* t_thrd.top_mem_cxt, but resetting that would be a really bad idea.
*/
walrcvWriterContext = AllocSetContextCreate(t_thrd.top_mem_cxt, "WalReceive Writer", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
(void)MemoryContextSwitchTo(walrcvWriterContext);
if (dummyStandbyMode) {
InitWSDataNumOnDummyStandby();
t_thrd.walrcvwriter_cxt.ws_dummy_data_writer_file_num++;
ws_dummy_data_writer_file_offset = 0;
}
* If an exception is encountered, processing resumes here.
*
* See notes in postgres.c about the design of this coding.
*/
int curTryCounter = 0;
int *oldTryCounter = nullptr;
if (sigsetjmp(localSigjmpBuf, 1) != 0) {
gstrace_tryblock_exit(true, oldTryCounter);
pthread_sigmask(SIG_SETMASK, &oldSigMask, NULL);
t_thrd.log_cxt.error_context_stack = NULL;
HOLD_INTERRUPTS();
EmitErrorReport();
AbortAsyncListIO();
AtEOXact_SysDBCache(false);
* These operations are really just a minimal subset of
* AbortTransaction(). We don't have very many resources to worry
* about in pagewriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
AbortBufferIO();
UnlockBuffers();
ResourceOwnerRelease(t_thrd.utils_cxt.CurrentResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, true);
AtEOXact_Buffers(false);
AtEOXact_Files();
AtEOXact_HashTables(false);
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
(void)MemoryContextSwitchTo(walrcvWriterContext);
FlushErrorState();
MemoryContextResetAndDeleteChildren(walrcvWriterContext);
RESUME_INTERRUPTS();
* Sleep at least 1 second after any error. A write error is likely
* to be repeated, and we don't want to be filling the error logs as
* fast as we can.
*/
pg_usleep(1000000L);
* Close all open files after any error. This is helpful on Windows,
* where holding deleted files open causes various strange errors.
* It's not clear we need it elsewhere, but shouldn't hurt.
*/
smgrcloseall();
}
oldTryCounter = gstrace_tryblock_entry(&curTryCounter);
t_thrd.log_cxt.PG_exception_stack = &localSigjmpBuf;
* Unblock signals (they were blocked when the postmaster forked us)
*/
gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL);
(void)gs_signal_unblock_sigusr2();
* Use the recovery target timeline ID during recovery
*/
if (g_instance.attr.attr_storage.enable_uwal) {
if (t_thrd.postmaster_cxt.HaShmData->current_mode != NORMAL_MODE &&
t_thrd.postmaster_cxt.HaShmData->current_mode != PRIMARY_MODE) {
if (!RecoveryInProgress())
ereport(FATAL, (errmsg("cannot continue WAL streaming, recovery has already ended")));
}
} else {
if (!RecoveryInProgress())
ereport(FATAL, (errmsg("cannot continue WAL streaming, recovery has already ended")));
}
t_thrd.xlog_cxt.ThisTimeLineID = GetRecoveryTargetTLI();
* register procLatch to walrcv shared memory
*/
setWalRcvWriterLatch();
t_thrd.walrcvwriter_cxt.walStreamWrite = GetXLogReplayRecPtr(NULL);
pgstat_report_appname("Wal Receive Writer");
pgstat_report_activity(STATE_IDLE, NULL);
if (g_instance.attr.attr_storage.enable_uwal) {
uwalRcvStateAlloc();
}
* Loop forever
*/
for (;;) {
int rc;
ResetLatch(&t_thrd.proc->procLatch);
if (t_thrd.walrcvwriter_cxt.gotSIGHUP) {
t_thrd.walrcvwriter_cxt.gotSIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
while (!t_thrd.walrcvwriter_cxt.shutdownRequested && WalDataRcvWrite() > 0)
;
if (t_thrd.walrcvwriter_cxt.shutdownRequested) {
ereport(LOG, (errmsg("walrcvwriter thread shut down")));
* From here on, elog(ERROR) should end with exit(1), not send
* control back to the sigsetjmp block above
*/
u_sess->attr.attr_common.ExitOnAnyError = true;
if (g_instance.attr.attr_storage.enable_uwal) {
GsUwalRcvFlush();
while (WalDataRcvWrite() > 0) {
};
uwalRcvStateFree();
}
proc_exit(0);
}
rc = WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, (long)1000 );
if (rc & WL_POSTMASTER_DEATH) {
ereport(LOG, (errmsg("walrcvwriter thread shut down with code 1")));
gs_thread_exit(1);
}
}
}
/*
 * SIGHUP handler: flag a pending configuration reload and wake the main
 * loop through the process latch (when this thread has a proc entry).
 * errno of the interrupted code is preserved.
 */
static void walrcvWriterSigHupHandler(SIGNAL_ARGS)
{
    const int errnoOnEntry = errno;

    t_thrd.walrcvwriter_cxt.gotSIGHUP = true;
    if (t_thrd.proc != NULL) {
        SetLatch(&t_thrd.proc->procLatch);
    }

    errno = errnoOnEntry;
}
/*
 * SIGQUIT handler: leave immediately without running any exit callbacks.
 */
static void walrcvWriterQuickDie(SIGNAL_ARGS)
{
    /* Block all further signals while getting out. */
    gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL);

    /*
     * We DO NOT want to run proc_exit() callbacks -- we're here because
     * shared memory may be corrupted, so we don't want to try to clean up our
     * transaction. Just nail the windows shut and get out of town. Now that
     * there's an atexit callback to prevent third-party code from breaking
     * things by calling exit() directly, we have to reset the callbacks
     * explicitly to make this work as intended.
     */
    on_exit_reset();

    /*
     * Note we do exit(2) not exit(0). This is to force the postmaster into a
     * system reset cycle if someone sends a manual SIGQUIT to a random
     * backend. This is necessary precisely because we don't clean up our
     * shared memory state. (The "dead man switch" mechanism in pmsignal.c
     * should ensure the postmaster sees this as a crash, too, but no harm in
     * being doubly sure.)
     */
    gs_thread_exit(2);
}
/*
 * SIGTERM handler: record the shutdown request and wake the main loop
 * through the process latch (when this thread has a proc entry).
 * errno of the interrupted code is preserved.
 */
static void reqShutdownHandler(SIGNAL_ARGS)
{
    const int errnoOnEntry = errno;

    t_thrd.walrcvwriter_cxt.shutdownRequested = true;
    if (t_thrd.proc != NULL) {
        SetLatch(&t_thrd.proc->procLatch);
    }

    errno = errnoOnEntry;
}
/*
 * Read newly available bytes from uwal into the staging buffer of the uwal
 * receive state and, once the buffer is full or a segment boundary has been
 * reached, write the staged bytes out via XLogWalRcvWriteFromUwal().
 *
 *   walrcb  - receiver control block, passed through to the write routine
 *   uwalrcv - uwal receive state (positions, flags and staging buffer)
 *   info    - uwal descriptor; info->id identifies the uwal to read from
 *
 * Returns 0 when there was nothing to do or the data was only staged,
 * -1 when GsUwalRead() failed, the number of bytes read in this call after
 * a write-out, or (in the segment-skip case) the number of skipped segments
 * times XLogSegSize. The whole body runs inside a critical section.
 */
int walRcvWriteUwal(WalRcvCtlBlock *walrcb, UwalrcvWriterState *uwalrcv, UwalInfo *info)
{
char *walrecvbuf = NULL;
XLogRecPtr startPtr = InvalidXLogRecPtr;
XLogRecPtr readPtr = InvalidXLogRecPtr;
XLogRecPtr writePtr = InvalidXLogRecPtr;
int64 recBufferSize = g_instance.attr.attr_storage.WalReceiverBufSize * 1024;
int nbytes = 0;
int64 readLen = 0;
int64 uwalReadOffset;
int64 uwalFreeOffset;
TimeLineID tli;
bool writeNoWait = false;
START_CRIT_SECTION();
/* writePtr is guarded by its own spinlock; take a snapshot of it first. */
SpinLockAcquire(&uwalrcv->writeMutex);
writePtr = uwalrcv->writePtr;
SpinLockRelease(&uwalrcv->writeMutex);
SpinLockAcquire(&uwalrcv->mutex);
/* Everything up to writePtr has already been read: nothing to do. */
if (uwalrcv->readPtr == writePtr) {
SpinLockRelease(&uwalrcv->mutex);
END_CRIT_SECTION();
return 0;
}
walrecvbuf = uwalrcv->uwalReceiverBuffer;
startPtr = uwalrcv->startPtr;
readPtr = uwalrcv->readPtr;
tli = uwalrcv->startTimeLine;
SpinLockRelease(&uwalrcv->mutex);
readLen = writePtr - readPtr;
if (readLen <= 0) {
ereport(LOG, (errmsg("walRcvWriteUwal no need to write")));
END_CRIT_SECTION();
return 0;
}
/*
 * uwalReadOffset: bytes already staged in the buffer (from startPtr up to readPtr).
 * uwalFreeOffset: room left in the buffer.
 * nbytes: bytes to read now, capped by MaxReadUwalBytes, the free room and the available data.
 */
uwalReadOffset = readPtr - startPtr;
uwalFreeOffset = recBufferSize - uwalReadOffset;
nbytes = (int)Min((int64)MaxReadUwalBytes, Min(uwalFreeOffset, readLen));
/*
 * startPtr lies in a different segment than the uwal's startWriteOffset.
 * If it is also in an earlier segment than writePtr, jump startPtr/flushPtr
 * to the beginning of writePtr's segment and report the skipped distance;
 * otherwise do nothing.
 */
if (startPtr / XLogSegSize != t_thrd.xlog_cxt.uwalInfo.info.startWriteOffset / XLogSegSize) {
if (startPtr / XLogSegSize < writePtr / XLogSegSize) {
uint64_t segNum = (writePtr / XLogSegSize) - (startPtr / XLogSegSize);
startPtr = (writePtr / XLogSegSize) * XLogSegSize;
SpinLockAcquire(&uwalrcv->mutex);
uwalrcv->startPtr = startPtr;
uwalrcv->flushPtr = startPtr;
uwalrcv->writeNoWait = false;
SpinLockRelease(&uwalrcv->mutex);
END_CRIT_SECTION();
/* NOTE(review): uint64 product is narrowed to the int return type - confirm this cannot overflow. */
return segNum * XLogSegSize;
}
END_CRIT_SECTION();
return 0;
}
/* Never read across a segment boundary; when clamped, force a write-out below. */
if (startPtr % XLogSegSize + uwalReadOffset + nbytes > XLogSegSize) {
nbytes = (int)(XLogSegSize - uwalReadOffset - startPtr % XLogSegSize);
writeNoWait = true;
}
int ret = 0;
if (nbytes > 0) {
/* Append the new bytes behind what is already staged in the buffer. */
ret = GsUwalRead(&info->id, readPtr, walrecvbuf + uwalReadOffset, nbytes);
if (0 != ret) {
ereport(
LOG,
(errmsg("walRcvWriteUwal uwal_read failed ret: %d,readPtr: %lu,startPtr: %lu,startWriteOffset: %lu",
ret, readPtr, startPtr, t_thrd.xlog_cxt.uwalInfo.info.startWriteOffset)));
END_CRIT_SECTION();
return -1;
}
XLByteAdvance(readPtr, nbytes);
uwalReadOffset += nbytes;
}
/* Publish the new read position and pick up a write-out request set in the shared state. */
SpinLockAcquire(&uwalrcv->mutex);
uwalrcv->readPtr = readPtr;
writeNoWait = writeNoWait || uwalrcv->writeNoWait;
SpinLockRelease(&uwalrcv->mutex);
/* Keep staging until the buffer is full, unless an immediate write-out was requested. */
if (uwalReadOffset < recBufferSize && writeNoWait == false) {
END_CRIT_SECTION();
return 0;
}
/* Write out everything staged so far, starting at startPtr, then advance startPtr past it. */
XLogWalRcvWriteFromUwal(walrcb, walrecvbuf, uwalReadOffset, startPtr, tli);
XLByteAdvance(startPtr, uwalReadOffset);
SpinLockAcquire(&uwalrcv->mutex);
uwalrcv->startPtr = startPtr;
uwalrcv->flushPtr = startPtr;
uwalrcv->writeNoWait = false;
SpinLockRelease(&uwalrcv->mutex);
ereport(DEBUG5,
(errmsg("walRcvWrite: nbytes(%d),startptr(%u:%u)", nbytes, (uint32)(startPtr >> 32), (uint32)startPtr)));
END_CRIT_SECTION();
g_instance.comm_cxt.predo_cxt.redoPf.local_max_lsn = startPtr;
return nbytes;
}
/*
 * Driver of the uwal write path; called from WalDataRcvWrite() when uwal is
 * enabled and mix replication is off.
 *
 * Steps:
 *  1. On a pending shutdown request, only move the remaining uwal data away
 *     (if any) and return 0.
 *  2. If no uwal is known yet (dataSize == 0), query for one. A failing
 *     query terminates the thread; when no uwal is found, return 0.
 *  3. In standby mode, while needXlogCatchup is set, keep writing from the
 *     ordinary receiver buffer until walStart has reached the uwal's
 *     truncate offset.
 *  4. Read/write from uwal via walRcvWriteUwal(), then truncate and remove
 *     consumed uwal data depending on the node mode and the fullSync flag.
 *
 * Returns the result of the write step that was executed (see the
 * individual return statements).
 */
int defWalRcvWriteUwal(WalRcvCtlBlock *walrcb)
{
    UwalrcvWriterState *uwalrcv = GsGetCurrentUwalRcvState();
    int ret = 0;
    bool needXlogCatchup = false;
    bool fullSync = true;
    XLogRecPtr startPtr = InvalidXLogRecPtr;

    if (t_thrd.walrcvwriter_cxt.shutdownRequested) {
        if (t_thrd.xlog_cxt.uwalInfo.info.dataSize != 0) {
            ereport(WARNING, (errmsg("walrcvwriter in shutdown requested, move the rest uwal file")));
            RemoveFromUwal(uwalrcv, true);
        }
        return 0;
    }

    /* No uwal known yet: look one up and initialise the receive state from it. */
    if (t_thrd.xlog_cxt.uwalInfo.info.dataSize == 0) {
        ret = GsUwalQueryByUser(t_thrd.xlog_cxt.ThisTimeLineID, true);
        if (0 != ret) {
            ereport(LOG, (errmsg("walrcvwriter thread shut down with UwalQueryByUser failed retCode: %d", ret)));
            gs_thread_exit(1);
        }
        if (t_thrd.xlog_cxt.uwalInfo.info.dataSize > 0) {
            ereport(LOG, (errmsg("walrcvwriter find uwal datasize : %lu", t_thrd.xlog_cxt.uwalInfo.info.dataSize)));
            uwalRcvStateInit(uwalrcv, t_thrd.xlog_cxt.uwalInfo);
            RemoveFromUwal(uwalrcv, false);
        } else {
            ereport(LOG, (errmsg("walrcvwriter no uwal detected")));
            return 0;
        }
    }

    if (t_thrd.postmaster_cxt.HaShmData->current_mode == STANDBY_MODE) {
        SpinLockAcquire(&uwalrcv->mutex);
        needXlogCatchup = uwalrcv->needXlogCatchup;
        SpinLockRelease(&uwalrcv->mutex);

        if (needXlogCatchup) {
            if (walrcb == NULL)
                return 0;

            /* Still catching up: drain the ordinary receiver buffer first. */
            ret = walRcvWrite(walrcb);
            if (ret != 0) {
                return ret;
            }

            ret = GsUwalQuery(&t_thrd.xlog_cxt.uwalInfo.id, &t_thrd.xlog_cxt.uwalInfo.info);
            if (ret != 0) {
                ereport(WARNING, (errmsg("GsUwalQuery return failed")));
                return ret;
            }

            SpinLockAcquire(&walrcb->mutex);
            startPtr = walrcb->walStart;
            SpinLockRelease(&walrcb->mutex);

            /* Not yet at the uwal's truncate offset: stay in catch-up mode. */
            if (startPtr < t_thrd.xlog_cxt.uwalInfo.info.truncateOffset) {
                return ret;
            }

            SpinLockAcquire(&uwalrcv->mutex);
            uwalrcv->needXlogCatchup = false;
            SpinLockRelease(&uwalrcv->mutex);
        }
    }

    ret = walRcvWriteUwal(walrcb, uwalrcv, &t_thrd.xlog_cxt.uwalInfo);

    if (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE) {
        WalRcvUwalTruncate(walrcb, uwalrcv, &t_thrd.xlog_cxt.uwalInfo);
        RemoveFromUwal(uwalrcv, false);
    } else {
        SpinLockAcquire(&uwalrcv->mutex);
        fullSync = uwalrcv->fullSync;
        SpinLockRelease(&uwalrcv->mutex);

        /* Truncation is only performed while fullSync is set. */
        if (fullSync) {
            WalRcvUwalTruncate(walrcb, uwalrcv, &t_thrd.xlog_cxt.uwalInfo);
        }
        RemoveFromUwal(uwalrcv, !fullSync);
    }
    return ret;
}
/*
 * Seed a uwal receive-writer state from a queried UwalInfo.
 *
 * Under uwalrcv->mutex the start timeline is copied from the info, and the
 * start, flush, truncate, read and rename pointers are all set to the uwal
 * truncate offset; needQuery is cleared and needXlogCatchup is raised.
 * truncateTimeStamp is then set to the current time in seconds (written
 * without holding a lock), and writePtr is set to the uwal write offset
 * under uwalrcv->writeMutex.
 *
 * Always returns 0.
 */
int uwalRcvStateInit(UwalrcvWriterState *uwalrcv, UwalInfo info)
{
    const XLogRecPtr truncateOffset = info.info.truncateOffset;
    struct timeval now;

    /* Every mutex-protected position starts at the uwal truncate offset. */
    SpinLockAcquire(&uwalrcv->mutex);
    uwalrcv->startTimeLine = info.info.startTimeLine;
    uwalrcv->startPtr = truncateOffset;
    uwalrcv->flushPtr = truncateOffset;
    uwalrcv->truncatePtr = truncateOffset;
    uwalrcv->readPtr = truncateOffset;
    uwalrcv->renamePtr = truncateOffset;
    uwalrcv->needQuery = false;
    uwalrcv->needXlogCatchup = true;
    SpinLockRelease(&uwalrcv->mutex);

    /* Remember when the state was (re)initialized, in whole seconds. */
    gettimeofday(&now, NULL);
    uwalrcv->truncateTimeStamp = now.tv_sec;

    /* The write position has its own lock. */
    SpinLockAcquire(&uwalrcv->writeMutex);
    uwalrcv->writePtr = info.info.writeOffset;
    SpinLockRelease(&uwalrcv->writeMutex);

    ereport(LOG, (errmsg("uwalRcvStateInit: truncate %lu, write %lu",
                         info.info.truncateOffset, info.info.writeOffset)));
    return 0;
}
static void uwalRcvStateAlloc()
{
char *buf = NULL;
int64 recBufferSize = g_instance.attr.attr_storage.WalReceiverBufSize * 1024;
size_t len = offsetof(UwalrcvWriterState, uwalReceiverBuffer) + recBufferSize;
errno_t rc = 0;
buf = (char *)palloc(len);
if (buf == NULL) {
ereport(FATAL, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory")));
}
rc = memset_s(buf, sizeof(UwalrcvWriterState), 0, sizeof(UwalrcvWriterState));
securec_check_c(rc, "\0", "\0");
UwalrcvWriterState *state = (UwalrcvWriterState *)buf;
SpinLockInit(&state->mutex);
SpinLockInit(&state->writeMutex);
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
SpinLockAcquire(&walrcv->uwalMutex);
walrcv->uwalRcvState = state;
SpinLockRelease(&walrcv->uwalMutex);
}
/*
 * Release the uwal receive-writer state published in the shared WalRcv data.
 *
 * The pointer is detached and cleared while holding uwalMutex; the memory is
 * released only after the lock has been dropped, so pfree() is never called
 * with a spinlock held. Does nothing when no state is published.
 */
static void uwalRcvStateFree()
{
    volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
    void *detached = NULL;

    /* Keep the spinlock section down to a pointer swap. */
    SpinLockAcquire(&walrcv->uwalMutex);
    detached = (void *)walrcv->uwalRcvState;
    walrcv->uwalRcvState = NULL;
    SpinLockRelease(&walrcv->uwalMutex);

    if (detached != NULL) {
        pfree(detached);
    }
}
/*
 * Write nbytes of XLOG data from buf, starting at WAL position recptr, into
 * the on-disk segment file(s).
 *
 * The currently open segment is tracked in the file-level recvFile,
 * recvSegNo, recvOff and recvFileTLI variables. When recptr is outside the
 * open segment, that file is closed, XLogArchiveForceDone() is called for
 * it, and the segment containing recptr is opened via XLogFileInit(). Data
 * crossing a segment boundary is split so each write() stays inside one
 * segment; issue_xlog_fsync() is called after every write.
 *
 * walrcb is not referenced by this function. tli is recorded as recvFileTLI
 * for a newly opened segment and is used in the seek/write/page-address
 * error messages.
 *
 * Close, seek and write failures, and an unexpected page address at the
 * start of a segment, are reported at PANIC level.
 */
static void XLogWalRcvWriteFromUwal(WalRcvCtlBlock *walrcb, char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
{
    int startoff;
    int byteswritten;
    int nRetCode = 0;
    Size write_bytes = nbytes;

    while (nbytes > 0) {
        int segbytes;

        if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo)) {
            bool use_existent = false;

            /*
             * fsync() and close current file before we switch to next one. We
             * would otherwise have to reopen this file to fsync it later
             */
            if (recvFile >= 0) {
                char xlogfname[MAXFNAMELEN];

                /*
                 * XLOG segment files will be re-read by recovery in startup
                 * process soon, so we don't advise the OS to release cache
                 * pages associated with the file like XLogFileClose() does.
                 *
                 * The file being closed was opened under recvFileTLI, which
                 * need not equal tli, so report it under recvFileTLI.
                 */
                if (close(recvFile) != 0)
                    ereport(PANIC, (errcode_for_file_access(),
                                    errmsg("could not close log file %s: %m",
                                           XLogFileNameP(recvFileTLI, recvSegNo))));

                /*
                 * Create .done file forcibly to prevent the restored segment from
                 * being archived again later.
                 */
                XLogFileName(xlogfname, MAXFNAMELEN, recvFileTLI, recvSegNo);
                XLogArchiveForceDone(xlogfname);
            }
            recvFile = -1;

            /* Open (or create) the segment that contains recptr. */
            XLByteToSeg(recptr, recvSegNo);
            use_existent = true;
            recvFile = XLogFileInit(recvSegNo, &use_existent, true);
            recvFileTLI = tli;
            recvOff = 0;
        }

        /* Calculate the start offset of the received logs and clamp to the segment end. */
        startoff = (int)(recptr % XLogSegSize);
        if (startoff + nbytes > XLogSegSize)
            segbytes = (int)(XLogSegSize - startoff);
        else
            segbytes = nbytes;

        /* Need to seek in the file? */
        if (recvOff != (uint32)startoff) {
            if (lseek(recvFile, (off_t)startoff, SEEK_SET) < 0)
                ereport(PANIC, (errcode_for_file_access(), errmsg("could not seek in log file %s to offset %d: %m",
                                                                  XLogFileNameP(tli, recvSegNo), startoff)));
            recvOff = startoff;
        }

        /* OK to write the logs */
        errno = 0;
        byteswritten = write(recvFile, buf, segbytes);
        if (byteswritten <= 0) {
            /* if write didn't set errno, assume no disk space */
            if (errno == 0)
                errno = ENOSPC;
            ereport(PANIC,
                    (errcode_for_file_access(), errmsg("could not write to log file %s at offset %u, length %lu: %m",
                                                       XLogFileNameP(tli, recvSegNo), recvOff, INT2ULONG(segbytes))));
        }

        /*
         * recvOff has not been advanced yet, so zero here means this write
         * started at the beginning of the segment: verify that a valid page
         * header carries the address of this segment.
         */
        if (recvOff == (uint32)0 && segbytes >= (int)sizeof(XLogPageHeaderData)) {
            if (((XLogPageHeader)buf)->xlp_magic == XLOG_PAGE_MAGIC &&
                (recvSegNo * XLogSegSize) != ((XLogPageHeader)buf)->xlp_pageaddr) {
                ereport(PANIC, (errcode_for_file_access(),
                                errmsg("unexpected page addr %lu of log file %s", ((XLogPageHeader)buf)->xlp_pageaddr,
                                       XLogFileNameP(tli, recvSegNo))));
            }
        }

        /* Update state for write */
        XLByteAdvance(recptr, byteswritten);
        recvOff += byteswritten;
        nbytes -= byteswritten;
        buf += byteswritten;

        issue_xlog_fsync(recvFile, recvSegNo);

        /* Report XLOG streaming progress in PS display */
        if (u_sess->attr.attr_common.update_process_title) {
            char activitymsg[50];

            nRetCode = snprintf_s(activitymsg, sizeof(activitymsg), sizeof(activitymsg) - 1,
                                  "walrcvwriter streaming %X/%X", (uint32)(recptr >> 32), (uint32)recptr);
            securec_check_ss(nRetCode, "\0", "\0");
            set_ps_display(activitymsg, false);
        }

        /* recptr has already been advanced past the bytes just written. */
        ereport(DEBUG2, (errmsg("write xlog done: start %X/%X %lu bytes", (uint32)(recptr >> 32), (uint32)recptr,
                                write_bytes)));
    }
}
/*
 * Move the segment file containing recptr on timeline tli from UWAL_XLOGDIR
 * to SS_XLOGDIR, keeping the same file name.
 *
 * As a side effect the file-level recvSegNo and recvFileTLI variables are
 * overwritten with the segment number and timeline of the moved file.
 * NOTE(review): those variables also describe the segment kept open by
 * XLogWalRcvWriteFromUwal; confirm that overwriting them here is intended.
 *
 * Returns 0 on success, or -1 after a LOG message if rename() fails.
 */
static int RenameUwalFile(XLogRecPtr recptr, TimeLineID tli)
{
    char segName[MAXFNAMELEN];
    char srcPath[MAXPGPATH];
    char dstPath[MAXPGPATH];
    errno_t rc;

    /* Derive the segment file name from the WAL position and timeline. */
    XLByteToSeg(recptr, recvSegNo);
    recvFileTLI = tli;
    XLogFileName(segName, MAXFNAMELEN, recvFileTLI, recvSegNo);

    rc = snprintf_s(srcPath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", UWAL_XLOGDIR, segName);
    securec_check_ss(rc, "", "");
    rc = snprintf_s(dstPath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", SS_XLOGDIR, segName);
    securec_check_ss(rc, "", "");

    if (rename(srcPath, dstPath) == 0) {
        return 0;
    }

    ereport(LOG, (errcode_for_file_access(), errmsg("could not rename old transaction log file \"%s\": %m", srcPath)));
    return -1;
}
/*
 * Delete the segment file containing recptr on timeline tli from
 * UWAL_XLOGDIR.
 *
 * As a side effect the file-level recvSegNo and recvFileTLI variables are
 * overwritten with the segment number and timeline of the deleted file.
 * NOTE(review): those variables also describe the segment kept open by
 * XLogWalRcvWriteFromUwal; confirm that overwriting them here is intended.
 *
 * Returns 0 on success, or -1 after a LOG message if remove() fails.
 */
static int RemoveUwalFile(XLogRecPtr recptr, TimeLineID tli)
{
    char path[MAXPGPATH];
    char filename[MAXFNAMELEN];

    /* Derive the segment file name from the WAL position and timeline. */
    XLByteToSeg(recptr, recvSegNo);
    recvFileTLI = tli;
    XLogFileName(filename, MAXFNAMELEN, recvFileTLI, recvSegNo);

    errno_t errorno = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", UWAL_XLOGDIR, filename);
    securec_check_ss(errorno, "", "");

    if (remove(path) != 0) {
        /* This is a removal, not a rename: say so in the log. */
        ereport(LOG, (errcode_for_file_access(), errmsg("could not remove old transaction log file \"%s\": %m", path)));
        return -1;
    }
    return 0;
}
/*
 * Ask uwal to truncate up to the receiver's flush position, when that is
 * currently worthwhile.
 *
 * Steps:
 *  1. Snapshot flushPtr, truncatePtr, needQuery and expectTruncate under
 *     uwalrcv->mutex. Nothing is done if flushPtr lies in the same segment
 *     as info->info.startWriteOffset.
 *  2. While a query is pending (needQuery), poll GsUwalQuery() at most
 *     twice, sleeping 100 ms after an unsuccessful poll, until the reported
 *     truncate offset equals expectTruncate.
 *  3. Give up (return 0) if the query is still pending, if flushPtr and the
 *     known truncate offset share a segment, if fewer than
 *     uwal_truncate_interval seconds passed since truncateTimeStamp, or if
 *     the state is not in fullSync.
 *  4. Inside a critical section call GsUwalTruncate(), refresh
 *     truncateTimeStamp, sleep 10 ms and re-query; then store the new
 *     truncate offset, set expectTruncate to the requested position and set
 *     needQuery when uwal has not reached it yet.
 *
 * walrcb is not referenced by this function.
 *
 * Returns -1 if uwalrcv is NULL or GsUwalTruncate() fails, the non-zero
 * GsUwalQuery() code if a query fails, and 0 otherwise.
 */
static int WalRcvUwalTruncate(WalRcvCtlBlock *walrcb, UwalrcvWriterState *uwalrcv, UwalInfo *info)
{
    XLogRecPtr targetPtr = InvalidXLogRecPtr;
    XLogRecPtr knownTruncate = InvalidXLogRecPtr;
    XLogRecPtr expected = InvalidXLogRecPtr;
    bool queryPending = false;
    bool inFullSync = false;
    struct timeval now;
    int rc;

    if (uwalrcv == NULL) {
        return -1;
    }

    /* Snapshot the shared positions. */
    SpinLockAcquire(&uwalrcv->mutex);
    targetPtr = uwalrcv->flushPtr;
    knownTruncate = uwalrcv->truncatePtr;
    queryPending = uwalrcv->needQuery;
    expected = uwalrcv->expectTruncate;
    SpinLockRelease(&uwalrcv->mutex);

    /* The segment uwal started writing in is never truncated from here. */
    if (targetPtr / XLogSegSize == info->info.startWriteOffset / XLogSegSize) {
        return 0;
    }

    /* Poll for the outcome of an earlier truncate request: two tries at most. */
    for (int attempt = 0; queryPending && attempt < 2; attempt++) {
        rc = GsUwalQuery(&info->id, &(info->info));
        if (rc != 0) {
            ereport(WARNING, (errmsg("GsUwalQuery return failed")));
            return rc;
        }

        if (knownTruncate != info->info.truncateOffset) {
            SpinLockAcquire(&uwalrcv->mutex);
            uwalrcv->truncatePtr = info->info.truncateOffset;
            SpinLockRelease(&uwalrcv->mutex);
            knownTruncate = info->info.truncateOffset;

            if (info->info.truncateOffset == expected) {
                /* The earlier request has completed. */
                SpinLockAcquire(&uwalrcv->mutex);
                uwalrcv->needQuery = false;
                SpinLockRelease(&uwalrcv->mutex);
                queryPending = false;
                break;
            }
        }
        pg_usleep(100000);
    }

    gettimeofday(&now, NULL);
    uint64_t elapsed = now.tv_sec - uwalrcv->truncateTimeStamp;

    /* Not worth (or not yet allowed) to issue a new truncate. */
    if (queryPending || (targetPtr / XLogSegSize == knownTruncate / XLogSegSize) ||
        elapsed < g_instance.attr.attr_storage.uwal_truncate_interval) {
        return 0;
    }

    SpinLockAcquire(&uwalrcv->mutex);
    inFullSync = uwalrcv->fullSync;
    SpinLockRelease(&uwalrcv->mutex);
    if (!inFullSync) {
        ereport(LOG, (errmsg("WalRcvUwalTruncate truncate not in fullSync")));
        return 0;
    }

    START_CRIT_SECTION();
    rc = GsUwalTruncate(&(info->id), targetPtr);

    /* The attempt time is recorded whether or not the truncate succeeded. */
    gettimeofday(&now, NULL);
    uwalrcv->truncateTimeStamp = now.tv_sec;

    if (rc != 0) {
        ereport(LOG, (errmsg("WalRcvUwalTruncate failed retCode: %d", rc)));
        END_CRIT_SECTION();
        return -1;
    }

    pg_usleep(10000);
    rc = GsUwalQuery(&info->id, &(info->info));
    if (rc != 0) {
        ereport(WARNING, (errmsg("GsUwalQuery return failed")));
        END_CRIT_SECTION();
        return rc;
    }
    END_CRIT_SECTION();

    /* Publish the result; keep polling later if uwal has not caught up yet. */
    SpinLockAcquire(&uwalrcv->mutex);
    uwalrcv->truncatePtr = info->info.truncateOffset;
    uwalrcv->expectTruncate = targetPtr;
    uwalrcv->needQuery = (info->info.truncateOffset != targetPtr);
    SpinLockRelease(&uwalrcv->mutex);

    return 0;
}
/*
 * Move the uwal segment files that lie before the truncate position out of
 * the uwal directory.
 *
 * When needUpdate is true the cached truncate position is refreshed first
 * through UpdateUwalRcv(). The function then walks segment by segment from
 * uwalrcv->renamePtr up to, but not including, the segment that contains
 * uwalrcv->truncatePtr. Each segment file is moved with RenameUwalFile(),
 * except the segment containing uwalInfo.info.startWriteOffset, which is
 * deleted with RemoveUwalFile(). In NORMAL_MODE or PRIMARY_MODE
 * XlogArchUwal() is called for every processed segment.
 *
 * If a rename/remove fails the walk stops, but the progress made so far is
 * still stored in uwalrcv->renamePtr. Without that, the next call would
 * start again at a segment that has already been moved, fail on it, and
 * never advance.
 */
static void RemoveFromUwal(UwalrcvWriterState *uwalrcv, bool needUpdate)
{
    if (needUpdate) {
        UpdateUwalRcv(uwalrcv, &t_thrd.xlog_cxt.uwalInfo);
    }

    XLogRecPtr truncatePtr = InvalidXLogRecPtr;
    XLogRecPtr renamePtr = InvalidXLogRecPtr;
    TimeLineID tli;

    SpinLockAcquire(&uwalrcv->mutex);
    truncatePtr = uwalrcv->truncatePtr;
    tli = uwalrcv->startTimeLine;
    renamePtr = uwalrcv->renamePtr;
    SpinLockRelease(&uwalrcv->mutex);

    while (truncatePtr / XLogSegSize != renamePtr / XLogSegSize) {
        if (renamePtr / XLogSegSize != t_thrd.xlog_cxt.uwalInfo.info.startWriteOffset / XLogSegSize) {
            if (RenameUwalFile(renamePtr, tli) < 0) {
                ereport(LOG, (errmsg("RenameUwalFile failed")));
                break;
            }
        } else {
            /* The segment uwal started writing in is dropped instead of moved. */
            if (RemoveUwalFile(renamePtr, tli) < 0) {
                ereport(LOG, (errmsg("RemoveUwalFile failed")));
                break;
            }
        }

        if (t_thrd.postmaster_cxt.HaShmData->current_mode == NORMAL_MODE ||
            t_thrd.postmaster_cxt.HaShmData->current_mode == PRIMARY_MODE) {
            XlogArchUwal(renamePtr);
        }

        /* Advance to the first byte of the next segment. */
        renamePtr = (renamePtr / XLogSegSize + 1) * XLogSegSize;
    }

    /* Record how far we got, also after a partial run. */
    SpinLockAcquire(&uwalrcv->mutex);
    uwalrcv->renamePtr = renamePtr;
    SpinLockRelease(&uwalrcv->mutex);
}
/*
 * Refresh uwalrcv->truncatePtr from uwal.
 *
 * Reads the cached truncate position under uwalrcv->mutex, queries uwal via
 * GsUwalQuery() (which fills info->info), and stores the reported truncate
 * offset back under the mutex when it differs from the cached value. A
 * failed query is reported as a WARNING and leaves the cached value alone.
 */
static void UpdateUwalRcv(UwalrcvWriterState *uwalrcv, UwalInfo *info)
{
    XLogRecPtr cachedTruncate = InvalidXLogRecPtr;

    SpinLockAcquire(&uwalrcv->mutex);
    cachedTruncate = uwalrcv->truncatePtr;
    SpinLockRelease(&uwalrcv->mutex);

    if (GsUwalQuery(&info->id, &(info->info)) != 0) {
        ereport(WARNING, (errmsg("GsUwalQuery return failed")));
        return;
    }

    /* Nothing to publish when uwal reports the value we already have. */
    if (cachedTruncate == info->info.truncateOffset) {
        return;
    }

    SpinLockAcquire(&uwalrcv->mutex);
    uwalrcv->truncatePtr = info->info.truncateOffset;
    SpinLockRelease(&uwalrcv->mutex);
}
/*
 * Scan UWAL_XLOGDIR and move matching segment files into SS_XLOGDIR.
 *
 * A directory entry is moved only if
 *   - its name is exactly 24 characters, all from "0123456789ABCDEF",
 *   - the timeline parsed from the name equals tli, and
 *   - its segment number lies strictly between the segment of startPtr and
 *     the segment of truncatePtr.
 * Each rename outcome is written to the log; a failed rename does not stop
 * the scan. If the directory cannot be opened nothing is done.
 *
 * Always returns 0.
 */
static int MoveNotManagedUwalFile(XLogRecPtr truncatePtr, XLogRecPtr startPtr, TimeLineID tli)
{
    uint32_t xlogFileNameLen = 24;
    struct dirent *entry = nullptr;

    DIR *dir = opendir(UWAL_XLOGDIR);
    if (dir == nullptr) {
        return 0;
    }

    while ((entry = readdir(dir)) != nullptr) {
        char *name = entry->d_name;

        /* Only names that look like XLOG segment files are considered. */
        if (strlen(name) != xlogFileNameLen || strspn(name, "0123456789ABCDEF") != xlogFileNameLen) {
            continue;
        }

        TimeLineID fileTli;
        XLogSegNo fileSegNo;
        XLogFromFileName(name, &fileTli, &fileSegNo);
        if (fileTli != tli) {
            continue;
        }

        XLogRecPtr filePtr;
        XLogSegNoOffsetToRecPtr(fileSegNo, 0, filePtr);

        /* Keep only segments strictly inside (start segment, truncate segment). */
        bool beforeTruncate = filePtr / XLogSegSize < truncatePtr / XLogSegSize;
        bool afterStart = filePtr / XLogSegSize > startPtr / XLogSegSize;
        if (!beforeTruncate || !afterStart) {
            continue;
        }

        char srcPath[MAXPGPATH];
        char dstPath[MAXPGPATH];
        errno_t rc = snprintf_s(srcPath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", UWAL_XLOGDIR, name);
        securec_check_ss(rc, "", "");
        rc = snprintf_s(dstPath, MAXPGPATH, MAXPGPATH - 1, "%s/%s", SS_XLOGDIR, name);
        securec_check_ss(rc, "", "");

        if (rename(srcPath, dstPath) == 0) {
            ereport(LOG, (errmsg("MoveNotManagedUwalFile: moved uwal file %s", name)));
        } else {
            ereport(LOG, (errcode_for_file_access(),
                          errmsg("Failed to rename old transaction log file \"%s\". The file may have been moved", srcPath)));
        }
    }

    (void)closedir(dir);
    return 0;
}
void MoveUwalFile(void)
{
UwalVector *vec = GsUwalGetInitInfo();
if (vec == nullptr) {
return;
}
for (uint32_t i = 0; i < vec->cnt; i++) {
XLogRecPtr truncatePtr = vec->uwals[i].info.truncateOffset;
XLogRecPtr startPtr = vec->uwals[i].info.startWriteOffset;
TimeLineID timeLine = vec->uwals[i].info.startTimeLine;
if (MoveNotManagedUwalFile(truncatePtr, startPtr, timeLine) != 0) {
break;
}
}
pfree(vec->uwals);
vec->uwals = nullptr;
pfree(vec);
vec = nullptr;
return;
}