* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* datarcvwriter.cpp
* functions for data receive management
*
*
* IDENTIFICATION
* src/gausskernel/storage/replication/datarcvwriter.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/xlogutils.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "catalog/catalog.h"
#include "catalog/pg_tablespace.h"
#include "replication/dataqueue.h"
#include "replication/datareceiver.h"
#include "replication/datasender.h"
#include "replication/walsender.h"
#include "storage/buf/bufmgr.h"
#include "storage/cu.h"
#include "storage/custorage.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/lmgr.h"
#include "storage/smgr/smgr.h"
#include "storage/smgr/relfilenode_hash.h"
#include "postmaster/alarmchecker.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/resowner.h"
#include "instruments/gs_stack.h"
#include "gssignal/gs_signal.h"
/*
 * Size limit of one dummy-standby data file: RELSEG_SIZE * BLCKSZ.
 * DummyStandbyDataRcvWrite() switches to the next file number once the
 * current file offset reaches this limit or the next record would not fit.
 */
#define MAX_DUMMY_DATA_FILE (RELSEG_SIZE * BLCKSZ)
/*
 * Result of CheckDatabaseReady() for the database directory of a relation
 * whose data is about to be written.
 */
typedef enum {
    /* directory is missing and redo has finished: the caller skips the data */
    NONEEXISTDATABASEDIR = 0,
    /* directory exists: the caller may go on and write */
    EXISTDATABASEDIR = 1,
    /* waiting for the directory was abandoned because the thread is exiting */
    DATABASEDIRCREATECANCEL = 2
} DataBaseDirState;
/* Descriptor of the dummy-standby data file currently open for writing; -1 when none is open. */
static int dummy_data_writer_file_fd = -1;
/* Bytes already written to the current dummy-standby data file (record length prefixes included). */
static uint32 dummy_data_writer_file_offset = 0;
/* Set to true for the duration of DummyStandbyDoDataWrite() and back to false when it returns. */
bool dummy_data_writer_use_file = false;
/* Forward declarations of the file-local signal handlers and helpers defined below. */
static void DataRcvWriterSigHupHandler(SIGNAL_ARGS);
static void DataRcvWriterQuickDie(SIGNAL_ARGS);
static void ReqShutdownHandler(SIGNAL_ARGS);
static void SetDataRcvWriterLatch(void);
static void EmptyDataRcvWriterLatch(void);
static void DummyStandbyDoDataWrite(char *buf, uint32 nbytes);
static void DataWriterHashCreate(void);
static bool DataWriterHashSearch(const RelFileNode &node, int attid, ForkNumber forkno, StorageEngine type,
HASHACTION action, Relation &reln, CUStorage *&cuStorage);
static void DataWriterHashRemove(bool flushdata);
static bool DatabaseHashSearch(Oid spcoid, Oid dboid);
static DataBaseDirState CheckDatabaseReady(Oid spcNode, Oid dbNode);
static void DummyStandbyDataRcvWrite(char *buf, uint32 nbytes);
static bool CanWriteBuffer(SMgrRelation smgr, ForkNumber forkNum, const char *path);
/*
 * Called when DataRcvWriterMain is ending (registered with on_shmem_exit).
 */
static void ShutdownDataRcvWriter(int code, Datum arg)
{
    /* Lock cleanup comes first; neither argument is used. */
    LockErrorCleanup();
    LockReleaseCurrentOwner();

    /* Finally clear our latch pointer and writer id in the shared DataRcv state. */
    EmptyDataRcvWriterLatch();
}
void DataRcvWriterMain(void)
{
sigjmp_buf localSigjmpBuf;
MemoryContext datarcvWriterContext;
t_thrd.xlog_cxt.InRecovery = true;
ereport(LOG, (errmsg("datarcvwriter thread started")));
* Reset some signals that are accepted by postmaster but not here
*/
(void)gspqsignal(SIGHUP, DataRcvWriterSigHupHandler);
(void)gspqsignal(SIGINT, SIG_IGN);
(void)gspqsignal(SIGTERM, ReqShutdownHandler);
(void)gspqsignal(SIGQUIT, DataRcvWriterQuickDie);
(void)gspqsignal(SIGALRM, SIG_IGN);
(void)gspqsignal(SIGPIPE, SIG_IGN);
(void)gspqsignal(SIGUSR1, SIG_IGN);
(void)gspqsignal(SIGUSR2, SIG_IGN);
(void)gspqsignal(SIGURG, print_stack);
* Reset some signals that are accepted by postmaster but not here
*/
(void)gspqsignal(SIGCHLD, SIG_DFL);
(void)gspqsignal(SIGTTIN, SIG_DFL);
(void)gspqsignal(SIGTTOU, SIG_DFL);
(void)gspqsignal(SIGCONT, SIG_DFL);
(void)gspqsignal(SIGWINCH, SIG_DFL);
(void)sigdelset(&t_thrd.libpq_cxt.BlockSig, SIGQUIT);
on_shmem_exit(ShutdownDataRcvWriter, 0);
* Create a resource owner to keep track of our resources (currently only
* buffer pins).
*/
t_thrd.utils_cxt.CurrentResourceOwner = ResourceOwnerCreate(NULL, "DataReceive Writer",
THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));
* Create a memory context that we will do all our work in. We do this so
* that we can reset the context during error recovery and thereby avoid
* possible memory leaks. Formerly this code just ran in
* t_thrd.top_mem_cxt, but resetting that would be a really bad idea.
*/
datarcvWriterContext = AllocSetContextCreate(t_thrd.top_mem_cxt, "DataReceive Writer", ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
(void)MemoryContextSwitchTo(datarcvWriterContext);
if (dummyStandbyMode) {
InitDummyDataNum();
t_thrd.datarcvwriter_cxt.dummy_data_writer_file_num++;
dummy_data_writer_file_offset = 0;
}
* If an exception is encountered, processing resumes here.
*
* See notes in postgres.c about the design of this coding.
*/
int curTryCounter = 0;
int *oldTryCounter = NULL;
if (sigsetjmp(localSigjmpBuf, 1) != 0) {
gstrace_tryblock_exit(true, oldTryCounter);
t_thrd.log_cxt.error_context_stack = NULL;
HOLD_INTERRUPTS();
EmitErrorReport();
AbortAsyncListIO();
AtEOXact_SysDBCache(false);
* These operations are really just a minimal subset of
* AbortTransaction(). We don't have very many resources to worry
* about in pagewriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
AbortBufferIO();
UnlockBuffers();
ResourceOwnerRelease(t_thrd.utils_cxt.CurrentResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, false, true);
AtEOXact_Buffers(false);
LockErrorCleanup();
LockReleaseCurrentOwner();
AtEOXact_Files();
AtEOXact_HashTables(false);
* Now return to normal top-level context and clear ErrorContext for
* next time.
*/
(void)MemoryContextSwitchTo(datarcvWriterContext);
FlushErrorState();
DataWriterHashRemove(false);
MemoryContextResetAndDeleteChildren(datarcvWriterContext);
RESUME_INTERRUPTS();
* Sleep at least 1 second after any error. A write error is likely
* to be repeated, and we don't want to be filling the error logs as
* fast as we can.
*/
pg_usleep(1000000L);
* Close all open files after any error. This is helpful on Windows,
* where holding deleted files open causes various strange errors.
* It's not clear we need it elsewhere, but shouldn't hurt.
*/
smgrcloseall();
}
oldTryCounter = gstrace_tryblock_entry(&curTryCounter);
t_thrd.log_cxt.PG_exception_stack = &localSigjmpBuf;
* Unblock signals (they were blocked when the postmaster forked us)
*/
gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL);
(void)gs_signal_unblock_sigusr2();
* Use the recovery target timeline ID during recovery
*/
if (!RecoveryInProgress())
ereport(FATAL, (errmsg("cannot continue Data streaming, recovery has already ended")));
t_thrd.xlog_cxt.ThisTimeLineID = GetRecoveryTargetTLI();
* register procLatch to datarcv shared memory
*/
SetDataRcvWriterLatch();
* init a hash table to store the rel file node fd.
*/
DataWriterHashCreate();
pgstat_report_appname("Data Receive Writer");
pgstat_report_activity(STATE_IDLE, NULL);
* Loop forever
*/
for (;;) {
int rc;
ResetLatch(&t_thrd.proc->procLatch);
if (t_thrd.datarcvwriter_cxt.gotSIGHUP) {
t_thrd.datarcvwriter_cxt.gotSIGHUP = false;
ProcessConfigFile(PGC_SIGHUP);
}
while (!t_thrd.datarcvwriter_cxt.shutdownRequested && DataRcvWrite() > 0)
;
if (t_thrd.datarcvwriter_cxt.shutdownRequested) {
ereport(LOG, (errmsg("datarcvwriter thread shut down")));
* From here on, elog(ERROR) should end with exit(1), not send
* control back to the sigsetjmp block above
*/
u_sess->attr.attr_common.ExitOnAnyError = true;
proc_exit(0);
}
rc = WaitLatch(&t_thrd.proc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, (long)1000 );
if (rc & WL_POSTMASTER_DEATH) {
ereport(LOG, (errmsg("datarcvwriter thread shut down with exit code 1")));
gs_thread_exit(1);
}
}
}
/*
 * SIGHUP handler: record that a configuration reload was requested and set
 * the thread latch so the main loop notices it. errno is preserved across
 * the handler.
 */
static void DataRcvWriterSigHupHandler(SIGNAL_ARGS)
{
    const int savedErrno = errno;

    t_thrd.datarcvwriter_cxt.gotSIGHUP = true;
    if (t_thrd.proc != NULL) {
        SetLatch(&t_thrd.proc->procLatch);
    }

    errno = savedErrno;
}
/*
 * SIGQUIT handler: leave immediately with exit code 2 without running any
 * registered exit callbacks.
 */
static void DataRcvWriterQuickDie(SIGNAL_ARGS)
{
    /* Switch to the BlockSig mask for the remainder of the handler. */
    gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL);

    /*
     * We DO NOT want to run proc_exit() callbacks -- we're here because
     * shared memory may be corrupted, so we don't want to try to clean up our
     * transaction. Just nail the windows shut and get out of town. Now that
     * there's an atexit callback to prevent third-party code from breaking
     * things by calling exit() directly, we have to reset the callbacks
     * explicitly to make this work as intended.
     */
    on_exit_reset();

    /*
     * Note we do exit(2) not exit(0). This is to force the postmaster into a
     * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
     * backend. This is necessary precisely because we don't clean up our
     * shared memory state. (The "dead man switch" mechanism in pmsignal.c
     * should ensure the postmaster sees this as a crash, too, but no harm in
     * being doubly sure.)
     */
    gs_thread_exit(2);
}
/*
 * SIGTERM handler: flag the shutdown request (and ProcDiePending) and set
 * the thread latch so the main loop wakes up. errno is preserved across
 * the handler.
 */
static void ReqShutdownHandler(SIGNAL_ARGS)
{
    const int savedErrno = errno;

    t_thrd.datarcvwriter_cxt.shutdownRequested = true;
    t_thrd.int_cxt.ProcDiePending = true;

    if (t_thrd.proc != NULL) {
        SetLatch(&t_thrd.proc->procLatch);
    }

    errno = savedErrno;
}
/*
 * Store the writer thread id in the shared DataRcv state, under its spinlock.
 */
void SetDataRcvWriterPID(ThreadId tid)
{
    volatile DataRcvData *shared = t_thrd.datareceiver_cxt.DataRcv;

    SpinLockAcquire(&shared->mutex);
    shared->writerPid = tid;
    SpinLockRelease(&shared->mutex);
}
static void SetDataRcvWriterLatch(void)
{
volatile DataRcvData *datarcv = t_thrd.datareceiver_cxt.DataRcv;
SpinLockAcquire(&datarcv->mutex);
datarcv->datarcvWriterLatch = &t_thrd.proc->procLatch;
datarcv->writerPid = t_thrd.proc_cxt.MyProcPid;
SpinLockRelease(&datarcv->mutex);
}
static void EmptyDataRcvWriterLatch(void)
{
volatile DataRcvData *datarcv = t_thrd.datareceiver_cxt.DataRcv;
SpinLockAcquire(&datarcv->mutex);
datarcv->datarcvWriterLatch = NULL;
datarcv->writerPid = 0;
SpinLockRelease(&datarcv->mutex);
}
bool DataRcvWriterInProgress(void)
{
volatile DataRcvData *datarcv = t_thrd.datareceiver_cxt.DataRcv;
SpinLockAcquire(&datarcv->mutex);
if (datarcv->writerPid == 0) {
SpinLockRelease(&datarcv->mutex);
return false;
}
SpinLockRelease(&datarcv->mutex);
return true;
}
/*
 * Write data to the disk of the DummyStandby.
 */
static void DummyStandbyDoDataWrite(char *buf, uint32 nbytes)
{
uint32 sum = 0;
uint32 currentlen = 0;
char *caculatebuf = buf;
dummy_data_writer_use_file = true;
errno_t rc = 0;
while (nbytes > 0) {
rc = memcpy_s((void *)¤tlen, sizeof(uint32), caculatebuf, sizeof(uint32));
securec_check(rc, "", "");
if ((sum + currentlen) > (uint32)g_instance.attr.attr_storage.MaxSendSize * 1024) {
DummyStandbyDataRcvWrite(buf, (uint32)sum);
buf = caculatebuf;
sum = 0;
} else {
caculatebuf += currentlen;
sum += currentlen;
nbytes -= currentlen;
}
}
DummyStandbyDataRcvWrite(buf, (uint32)sum);
dummy_data_writer_use_file = false;
}
/*
 * Write data to disk; the data is acquired by GetFromDataQueue.
 * 1. When enable_mix_replication is off and Data sender/DataRcv are started,
 *    we periodically write old-format ROW_STORE and COLUMN_STORE data to disk.
 * 2. When enable_mix_replication is on, the data comes from WalRcv and is written
 *    to disk by walrcvwriter for the 'd' message. In this case, we have to skip the
 *    new-format data when calling DoDataWrite.
 */
uint32 DoDataWrite(char *buf, uint32 nbytes)
{
    /*
     * Walks the elements in buf (nbytes in total), writes each row-store
     * page through the buffer manager and each column-store unit through
     * CUStorage, and returns the number of bytes left unprocessed (non-zero
     * only when the loop is abandoned early).
     */
#define InvalidRelFileNode ((RelFileNode){ 0, 0, 0, -1})
    RelFileNode curnode = InvalidRelFileNode;
    RelFileNode prevnode = InvalidRelFileNode;
    int curattid = -1;
    int prevattid = -1;
    DataElementHeaderData datahdr;
    DataQueuePtr lastqueueoffset = { 0, 0 };
    uint32 currentlen = 0;
    int headerlen = sizeof(DataElementHeaderData);
    Relation reln = NULL;
    CUStorage *cuStorage = NULL;
    errno_t errorno = EOK;
#ifdef DATA_DEBUG
    pg_crc32 crc;
#endif

    while (nbytes > 0) {
        /* Only the old format carries a uint32 total-length prefix before the header. */
        if (!g_instance.attr.attr_storage.enable_mix_replication) {
            errorno = memcpy_s(&currentlen, sizeof(uint32), buf, sizeof(uint32));
            securec_check(errorno, "", "");
            buf += sizeof(uint32);
        }

        errorno = memcpy_s((void *)&datahdr, headerlen, buf, headerlen);
        securec_check(errorno, "", "");
        RelFileNodeCopy(curnode, datahdr.rnode, GETBUCKETID(datahdr.attid));
        curattid = (int)GETATTID((uint32)datahdr.attid);
        buf += headerlen;

        /* The length prefix must equal prefix + header + payload. */
        if (!g_instance.attr.attr_storage.enable_mix_replication) {
            Assert(currentlen == (sizeof(uint32) + (uint32)headerlen + datahdr.data_size));
            if (currentlen != (sizeof(uint32) + (uint32)headerlen + datahdr.data_size)) {
                ereport(PANIC, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("corrupt wal data write len %u bytes, "
                                                                               "the expected write data_size %u",
                                                                               currentlen, datahdr.data_size)));
            }
        }

        ereport(DEBUG5, (errmsg("DoDataWrite write page: rnode[%u,%u,%u], blocknum[%u], "
                                "pageoffset[%lu], size[%u], queueoffset[%u/%u], attid[%d]",
                                datahdr.rnode.spcNode, datahdr.rnode.dbNode, datahdr.rnode.relNode,
                                datahdr.blocknum, datahdr.offset, datahdr.data_size,
                                datahdr.queue_offset.queueid, datahdr.queue_offset.queueoff,
                                (int)GETATTID((uint32)datahdr.attid))));

#ifdef DATA_DEBUG
        INIT_CRC32(crc);
        COMP_CRC32(crc, buf, datahdr.data_size);
        FIN_CRC32(crc);

        if (!EQ_CRC32(datahdr.data_crc, crc)) {
            /* offset is printed with %lu, as in the other messages of this function */
            ereport(PANIC, (errmsg("writing incorrect data page checksum at: "
                                   "rnode[%u,%u,%u], blocknum[%u], "
                                   "pageoffset[%lu], size[%u], queueoffset[%u/%u]",
                                   datahdr.rnode.spcNode, datahdr.rnode.dbNode, datahdr.rnode.relNode,
                                   datahdr.blocknum, datahdr.offset, datahdr.data_size,
                                   datahdr.queue_offset.queueid, datahdr.queue_offset.queueoff)));
        }
#endif

        /* Row-store data is not expected at all when mix replication is on. */
        if (g_instance.attr.attr_storage.enable_mix_replication && ROW_STORE == datahdr.type) {
            Assert(false);
            ereport(PANIC, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                            errmsg("The Row Store Heap Log SHOULD NOT BE synchronized in the WAL Streaming. "
                                   "Tracking the data header info: "
                                   "rnode[%u/%u/%u] blocknum[%u] pageoffset[%lu] "
                                   "size[%u] queueoffset[%u/%u].",
                                   datahdr.rnode.spcNode, datahdr.rnode.dbNode, datahdr.rnode.relNode,
                                   datahdr.blocknum, datahdr.offset, datahdr.data_size,
                                   datahdr.queue_offset.queueid, datahdr.queue_offset.queueoff)));
            return nbytes;
        }

        /* The cached relation / CU storage is looked up again only when the target changes. */
        if ((memcmp(&curnode, &prevnode, sizeof(RelFileNode)) != 0) || (curattid != prevattid)) {
            prevnode = curnode;
            prevattid = curattid;

            if (!DataWriterHashSearch(curnode, curattid, MAIN_FORKNUM, datahdr.type, HASH_FIND, reln, cuStorage)) {
                /*
                 * Before we open the smgr, we should insure that the database
                 * directory of current relation is ready based on previous write.
                 * If it's not ready, wait for recovery to create.
                 */
                if ((curnode.spcNode != GLOBALTABLESPACE_OID) && !DatabaseHashSearch(curnode.spcNode, curnode.dbNode)) {
                    DataBaseDirState databaseDirState = CheckDatabaseReady(curnode.spcNode, curnode.dbNode);
                    if (databaseDirState == DATABASEDIRCREATECANCEL) {
                        /*
                         * when enable_mix_replcation is off,
                         * Cancel has been requested, give up the following write.
                         * when enable_mix_replcation is on, ERROR
                         */
                        if (!g_instance.attr.attr_storage.enable_mix_replication) {
                            WakeupDataRecovery();
                        } else {
                            ereport(ERROR, (errmsg("Failed to write the wal data: the database path %s doesn't exist.",
                                                   GetDatabasePath(curnode.dbNode, curnode.spcNode))));
                        }
                        break;
                    } else if (databaseDirState == NONEEXISTDATABASEDIR) {
                        /*
                         * Skip this element. The byte accounting must match
                         * the end of the loop: currentlen is only set in the
                         * old format, so the new format has to subtract
                         * header + payload, otherwise nbytes never shrinks.
                         */
                        buf += datahdr.data_size;
                        if (!g_instance.attr.attr_storage.enable_mix_replication) {
                            nbytes -= currentlen;
                        } else {
                            nbytes -= ((uint32)headerlen + datahdr.data_size);
                        }

                        /*
                         * Nothing was opened for this target, so forget it.
                         * Otherwise the next element of the same relation
                         * would bypass the lookup above and be written
                         * through a NULL or stale cuStorage.
                         */
                        prevnode = InvalidRelFileNode;
                        prevattid = -1;
                        continue;
                    }
                }

                /* Open the target and remember it in the writer hash table. */
                if (ROW_STORE == datahdr.type) {
                    reln = CreateFakeRelcacheEntry(curnode);
                    RelationOpenSmgr(reln);
                    DataWriterHashSearch(curnode, curattid, MAIN_FORKNUM, ROW_STORE, HASH_ENTER, reln, cuStorage);
                } else if (COLUMN_STORE == datahdr.type) {
                    CFileNode cFileNode(curnode);
                    cFileNode.m_forkNum = MAIN_FORKNUM;
                    cFileNode.m_attid = curattid;
                    cuStorage = New(CurrentMemoryContext) CUStorage(cFileNode);
                    DataWriterHashSearch(curnode, curattid, MAIN_FORKNUM, COLUMN_STORE, HASH_ENTER, reln, cuStorage);
                }
            }
        }

        /*
         * Row store write data page by Buffer Manager API
         *
         * Lock relation for extension is nessesary for write received pages.
         * We use this lock in reader, this doesn't comply with the principle
         * of lock relation for extension, then read buffer, then lock buffer,
         * then release relation extension. Cause standby server doesn't vacuum
         * the pages, it's ok.
         */
        if (ROW_STORE == datahdr.type) {
            Buffer buffer;
            Page page;
            SMgrRelation smgr;
            char* path = NULL;

#ifdef ENABLE_MULTIPLE_NODES
            LockRelFileNode(curnode, AccessShareLock);
#endif
            smgr = smgropen(curnode, InvalidBackendId);
            path = relpath(smgr->smgr_rnode, MAIN_FORKNUM);
        retry:
            if (CanWriteBuffer(smgr, MAIN_FORKNUM, path) == true) {
                /*
                 * We use RBM_ZERO_ON_ERROR rather than RBM_ZERO_AND_LOCK here
                 * 'cause if the page has not been loaded into buffer, buffer
                 * manager would just allocate a zero buffer rather than read the
                 * page from smgr. See more in ReadBuffer_common() comments inside.
                 */
                buffer = XLogReadBufferExtended(curnode, MAIN_FORKNUM, datahdr.blocknum, RBM_ZERO_ON_ERROR, NULL);
                Assert(BufferIsValid(buffer));
                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                page = (Page)BufferGetPage(buffer);

                XLogRecPtr local_lsn;
                XLogRecPtr peer_lsn;
                local_lsn = PageGetLSN(page);
                peer_lsn = PageGetLSN((Page)buf);

                /*
                 * In data page replicated scenes, maybe appear an old page to cover
                 * an new page. Explame for two scenes:
                 * 1: when standby is crash, page A will be copyed to dummystandby,
                 *    then dummystandby is crash, stanby is ok, and primary update
                 *    A to A', standby will get A', then primary is crash, stanby
                 *    will failover, it will get page A to cover the page A'.
                 * 2: when session1 is copying page A to table t1; session2 is update
                 *    page A to A'; standby redo the A' first, then write the A, and
                 *    A will cover A'.
                 */
                if (XLByteLT(peer_lsn, local_lsn))
                    ; /* the received page is older than the local one: keep the local page */
                else {
                    errno_t rc = 0;
                    rc = memcpy_s(page, BLCKSZ, buf, datahdr.data_size);
                    securec_check(rc, "", "");
                    MarkBufferDirty(buffer);
                }
                UnlockReleaseBuffer(buffer);
            } else {
                /* The file is present now: check again whether the write is allowed. */
                if (CheckFileExists(path) == FILE_EXIST) {
                    goto retry;
                }
                ereport(WARNING, (errmsg("HA-DoDataWrite: No File Write(file not exists), rnode %u/%u/%u, blockno %u ",
                                         curnode.spcNode, curnode.dbNode, curnode.relNode,
                                         datahdr.blocknum)));
            }
#ifdef ENABLE_MULTIPLE_NODES
            UnlockRelFileNode(curnode, AccessShareLock);
#endif
            pfree_ext(path);

            if (u_sess->attr.attr_storage.HaModuleDebug) {
                ereport(LOG, (errmsg("HA-DoDataWrite: rnode %u/%u/%u, blockno %u ", curnode.spcNode, curnode.dbNode,
                                     curnode.relNode, datahdr.blocknum)));
            }
        } else if (COLUMN_STORE == datahdr.type) {
            Assert(cuStorage->m_cnode.m_attid == GETATTID(datahdr.attid));

            if (u_sess->attr.attr_storage.HaModuleDebug) {
                int align_size = CUAlignUtils::GetCuAlignSizeColumnId(datahdr.attid);
                check_cu_block(buf, datahdr.data_size, align_size);

                ereport(LOG, (errmsg("HA-DoDataWrite: rnode %u/%u/%u, col %u, "
                                     "blockno %lu, cuUnitCount %u",
                                     datahdr.rnode.spcNode, datahdr.rnode.dbNode, datahdr.rnode.relNode,
                                     GETATTID((uint)datahdr.attid), datahdr.offset / align_size,
                                     datahdr.data_size / align_size)));
            }

#ifdef ENABLE_MULTIPLE_NODES
            LockRelFileNode(curnode, AccessShareLock);
#endif
            cuStorage->SaveCU(buf, datahdr.offset, datahdr.data_size, false);
            /* The unlock sits under the same build guard as the lock it pairs with. */
#ifdef ENABLE_MULTIPLE_NODES
            UnlockRelFileNode(curnode, AccessShareLock);
#endif
        }

        /* Step over the payload and account for the element just handled. */
        buf += datahdr.data_size;
        if (!g_instance.attr.attr_storage.enable_mix_replication) {
            nbytes -= currentlen;
            lastqueueoffset = datahdr.queue_offset;

            volatile DataRcvData *datarcv = t_thrd.datareceiver_cxt.DataRcv;
            ereport(DEBUG5, (errmsg("recwriter: receive: %u/%u, local: %u/%u, offset %u/%u",
                                    datarcv->receivePosition.queueid, datarcv->receivePosition.queueoff,
                                    datarcv->localWritePosition.queueid, datarcv->localWritePosition.queueoff,
                                    datahdr.queue_offset.queueid, datahdr.queue_offset.queueoff)));
            WakeupDataRecovery();
        } else
            nbytes -= ((uint32)headerlen + datahdr.data_size);
    }

    /* Flush and close everything opened for this batch. */
    DataWriterHashRemove(true);

    /* Publish the queue offset of the last element written (old format only). */
    if (!g_instance.attr.attr_storage.enable_mix_replication) {
        volatile DataRcvData *datarcv = t_thrd.datareceiver_cxt.DataRcv;
        SpinLockAcquire(&datarcv->mutex);
        datarcv->receivePosition.queueid = lastqueueoffset.queueid;
        datarcv->receivePosition.queueoff = lastqueueoffset.queueoff;
        SpinLockRelease(&datarcv->mutex);
    }

    return nbytes;
}
/*
 * Write data to disk.
 */
int DataRcvWrite(void)
{
volatile DataRcvData *datarcv = t_thrd.datareceiver_cxt.DataRcv;
char *writeBuf = NULL;
uint32 nbytes = 0;
uint32 remainbytes = 0;
DataQueuePtr curstartpos;
DataQueuePtr curendpos;
START_CRIT_SECTION();
LWLockAcquire(RcvWriteLock, LW_EXCLUSIVE);
SpinLockAcquire(&datarcv->mutex);
curstartpos.queueid = datarcv->localWritePosition.queueid;
curstartpos.queueoff = datarcv->localWritePosition.queueoff;
SpinLockRelease(&datarcv->mutex);
nbytes = GetFromDataQueue(writeBuf, g_instance.attr.attr_storage.DataQueueBufSize * 1024, curstartpos, curendpos,
true, t_thrd.dataqueue_cxt.DataWriterQueue);
if (nbytes == 0) {
LWLockRelease(RcvWriteLock);
END_CRIT_SECTION();
return 0;
}
if (dummyStandbyMode)
DummyStandbyDoDataWrite(writeBuf, nbytes);
else
remainbytes = DoDataWrite(writeBuf, nbytes);
smgrcloseall();
nbytes -= remainbytes;
SpinLockAcquire(&datarcv->mutex);
datarcv->localWritePosition.queueid = curstartpos.queueid;
datarcv->localWritePosition.queueoff = curstartpos.queueoff;
DQByteAdvance(datarcv->localWritePosition, nbytes);
SpinLockRelease(&datarcv->mutex);
PopFromDataQueue(((DataRcvData *)datarcv)->localWritePosition, t_thrd.dataqueue_cxt.DataWriterQueue);
LWLockRelease(RcvWriteLock);
END_CRIT_SECTION();
return nbytes;
}
static void DataWriterHashCreate(void)
{
if (t_thrd.datarcvwriter_cxt.data_writer_rel_tab == NULL) {
HASHCTL ctl;
errno_t rc = 0;
rc = memset_s(&ctl, sizeof(ctl), 0, sizeof(ctl));
securec_check(rc, "", "");
ctl.keysize = sizeof(data_writer_rel_key);
ctl.entrysize = sizeof(data_writer_rel);
ctl.hash = DataWriterRelKeyHash;
ctl.match = DataWriterRelKeyMatch;
t_thrd.datarcvwriter_cxt.data_writer_rel_tab = hash_create("data writer rel table", 100, &ctl,
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE);
} else
return;
}
static bool DataWriterHashSearch(const RelFileNode &node, int attid, ForkNumber forkno, StorageEngine type,
HASHACTION action, Relation &reln, CUStorage *&cuStorage)
{
data_writer_rel_key key;
data_writer_rel *hentry = NULL;
bool found = false;
errno_t rc = 0;
rc = memset_s(&key, sizeof(data_writer_rel_key), 0, sizeof(data_writer_rel_key));
securec_check(rc, "", "");
key.node = node;
key.forkno = forkno;
key.attid = attid;
key.type = type;
if (t_thrd.datarcvwriter_cxt.data_writer_rel_tab == NULL) {
DataWriterHashCreate();
}
hentry = (data_writer_rel *)hash_search(t_thrd.datarcvwriter_cxt.data_writer_rel_tab, (void *)&key, action, &found);
if (action == HASH_FIND) {
if (found) {
reln = hentry->reln;
cuStorage = hentry->cuStorage;
}
}
if (action == HASH_ENTER) {
Assert(!found);
if (!found) {
hentry->reln = reln;
hentry->cuStorage = cuStorage;
}
}
return found;
}
/*
 * Report whether the data writer hash table already holds an entry that
 * belongs to the given tablespace / database pair.
 */
static bool DatabaseHashSearch(Oid spcoid, Oid dboid)
{
    HASH_SEQ_STATUS scan;
    data_writer_rel *entry = NULL;

    if (t_thrd.datarcvwriter_cxt.data_writer_rel_tab == NULL) {
        return false;
    }

    hash_seq_init(&scan, t_thrd.datarcvwriter_cxt.data_writer_rel_tab);
    for (;;) {
        entry = (data_writer_rel *)hash_seq_search(&scan);
        if (entry == NULL) {
            break;
        }

        if (entry->key.node.spcNode == spcoid && entry->key.node.dbNode == dboid) {
            /* The scan is abandoned before its end, so terminate it explicitly. */
            hash_seq_term(&scan);
            return true;
        }
    }

    return false;
}
/*
 * Remove the relations in the current data writer hash table, flushing data if requested.
 * We use the hash table to cache the relation info we are handling, to reduce the
 * overhead of IO.
 * Call the function once, after we have handled all the data in the data writer queue.
 */
static void DataWriterHashRemove(bool flushdata)
{
#define ERRORDATA_FLUSH_NUM 5
HASH_SEQ_STATUS status;
data_writer_rel *hentry = NULL;
Relation reln;
CUStorage *cuStorage = NULL;
if (t_thrd.datarcvwriter_cxt.data_writer_rel_tab == NULL)
return;
* After flush data failed five times, maybe the storage has damage,
* In order to prevent operation hang up, the process will PANIC.
*/
if (t_thrd.datarcvwriter_cxt.dataRcvWriterFlushPageErrorCount++ >= ERRORDATA_FLUSH_NUM)
ereport(PANIC, (errmsg_internal("ERRORDATA_FLUSH_NUM exceeded")));
hash_seq_init(&status, t_thrd.datarcvwriter_cxt.data_writer_rel_tab);
while ((hentry = (data_writer_rel *)hash_seq_search(&status)) != NULL) {
if (hentry->key.type == ROW_STORE && !g_instance.attr.attr_storage.enable_mix_replication) {
reln = hentry->reln;
if (flushdata) {
char* path = relpath(reln->rd_smgr->smgr_rnode, MAIN_FORKNUM);
#ifdef ENABLE_MULTIPLE_NODES
LockRelFileNode(reln->rd_node, AccessExclusiveLock);
#endif
if (smgrexists(reln->rd_smgr, MAIN_FORKNUM) && CheckFileExists(path) == FILE_EXIST) {
FlushRelationBuffers(reln);
smgrimmedsync(reln->rd_smgr, MAIN_FORKNUM);
} else {
ereport(WARNING, (errmsg("HA-DataWriterHashRemove: No File SYNC, rnode %u/%u/%u dose not exists",
reln->rd_node.spcNode, reln->rd_node.dbNode, reln->rd_node.relNode)));
}
#ifdef ENABLE_MULTIPLE_NODES
UnlockRelFileNode(reln->rd_node, AccessExclusiveLock);
#endif
pfree_ext(path);
}
RelationCloseSmgr(reln);
FreeFakeRelcacheEntry(reln);
} else if (hentry->key.type == COLUMN_STORE) {
cuStorage = hentry->cuStorage;
if (cuStorage != NULL) {
if (flushdata) {
#ifdef ENABLE_MULTIPLE_NODES
LockRelFileNode(hentry->key.node, AccessShareLock);
#endif
* unlink/truncate the cu file begin at 0, so we just check
* whether the xxx_Cx.0 is exist.
*/
if (cuStorage->IsDataFileExist(0)) {
cuStorage->FlushDataFile();
}
#ifdef ENABLE_MULTIPLE_NODES
UnlockRelFileNode(hentry->key.node, AccessShareLock);
#endif
if (u_sess->attr.attr_storage.HaModuleDebug)
ereport(LOG,
(errmsg("HA-DataWriterHashRemove: rnode %u/%u/%u, col %d", hentry->key.node.spcNode,
hentry->key.node.dbNode, hentry->key.node.relNode, hentry->key.attid)));
}
DELETE_EX(cuStorage);
}
}
hash_search(t_thrd.datarcvwriter_cxt.data_writer_rel_tab, (void *)&hentry->key, HASH_REMOVE, NULL);
}
if (flushdata)
t_thrd.datarcvwriter_cxt.dataRcvWriterFlushPageErrorCount = 0;
}
/*
 * We only handle paths in pg_tblspc/. If the spcNode we got from the tablespace
 * path does not exist, wait until the tablespace has been redone. If the spcNode
 * is not a link, throw an error here.
 */
static DataBaseDirState CheckDatabaseReady(Oid spcNode, Oid dbNode)
{
    /*
     * Returns EXISTDATABASEDIR once the database directory can be stat'ed,
     * NONEEXISTDATABASEDIR when it is missing and redo is already done, and
     * DATABASEDIRCREATECANCEL when the wait is abandoned because the thread
     * is exiting. Otherwise it retries every five seconds.
     * The path from GetDatabasePath() is freed on every return path.
     */
    char *dbpath = NULL;
    struct stat st;
    int nRet = 0;
    DataBaseDirState dbDirState = NONEEXISTDATABASEDIR;

    dbpath = GetDatabasePath(dbNode, spcNode);
    pg_memory_barrier();

    Assert(OidIsValid(spcNode));

retry:
    if (stat(dbpath, &st) < 0) {
        pg_memory_barrier();

        /* Redo has finished, so nobody will create the directory any more. */
        if (t_thrd.xlog_cxt.RedoDone) {
            /* release the path here as well; this return used to leak it */
            pfree(dbpath);
            dbpath = NULL;
            dbDirState = NONEEXISTDATABASEDIR;
            ereport(LOG, (errmsg("skip the dropped database directory")));
            return dbDirState;
        }

        if (errno == ENOENT) {
#if defined(HAVE_READLINK) || defined(WIN32)
            char tbpath[64] = {0};
            char linkpath[MAXPGPATH] = {0};
            int rllen = 0;

            /* The global and default tablespaces have no link under pg_tblspc/. */
            if (spcNode == GLOBALTABLESPACE_OID || spcNode == DEFAULTTABLESPACE_OID)
                goto invalid_handle;

            nRet = snprintf_s(tbpath, sizeof(tbpath), 63, "pg_tblspc/%u", spcNode);
            securec_check_ss(nRet, "", "");

            rllen = readlink(tbpath, linkpath, sizeof(linkpath));
            bool b_einval = rllen < 0 && (errno == EINVAL || errno == ENOTDIR);
            bool b_enoent = rllen < 0 && errno == ENOENT;
            if (b_einval) {
                /* The entry exists but is not a symbolic link. */
                pfree(dbpath);
                dbpath = NULL;
                ereport(PANIC, (errmsg("could not read symbolic link \"%s\": %m", tbpath)));
            } else if (b_enoent) {
                /* The tablespace link is not there yet: wait for it. */
                ereport(DEBUG3, (errmsg("sleep a while waiting for tablespace \"%s\" ready", tbpath)));
                goto invalid_handle;
            } else if (rllen < 0) {
                pfree(dbpath);
                dbpath = NULL;
                ereport(ERROR, (errcode_for_file_access(), errmsg("invalid tablespace link %s: %s", tbpath, TRANSLATE_ERRNO)));
            } else if (rllen >= (int)sizeof(linkpath)) {
                pfree(dbpath);
                dbpath = NULL;
                ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                errmsg("symbolic link \"%s\" target is too long", tbpath)));
            } else {
                /* The link is readable: its target must be a directory, then keep waiting. */
                linkpath[rllen] = '\0';
                if (stat(linkpath, &st) < 0 || !S_ISDIR(st.st_mode))
                    ereport(PANIC, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                    errmsg("invalid tablespace directory %s: %s", tbpath, TRANSLATE_ERRNO)));
                goto invalid_handle;
            }
#else
            /*
             * If the platform does not have symbolic links, it should not be
             * possible to have tablespaces - clearly somebody else created
             * them. Warn about it and ignore.
             */
            pfree(dbpath);
            dbpath = NULL;
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("tablespaces are not supported on this platform")));
#endif
        } else {
            pfree(dbpath);
            dbpath = NULL;
            ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                            errmsg("\"database %u/%u\" invalid directory : %m", spcNode, dbNode)));
        }
    } else {
        pfree(dbpath);
        dbpath = NULL;
        if (!S_ISDIR(st.st_mode))
            ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                            errmsg("\"database %u/%u\" exists but is not a directory", spcNode, dbNode)));
        dbDirState = EXISTDATABASEDIR;
        return dbDirState;
    }

invalid_handle:
    /* Give up waiting when this thread is exiting, unless it serves a dummy standby. */
    if ((t_thrd.int_cxt.ProcDiePending || t_thrd.proc_cxt.proc_exit_inprogress) &&
        !t_thrd.datarcvwriter_cxt.AmDataReceiverForDummyStandby) {
        ereport(WARNING, (errcode(ERRCODE_ADMIN_SHUTDOWN), errmsg("canceling the wait for database directory \"%s\" "
                                                                   "being created in recovery",
                                                                   dbpath)));
        pfree(dbpath);
        dbpath = NULL;
        dbDirState = DATABASEDIRCREATECANCEL;
        return dbDirState;
    }

    ereport(LOG, (errmsg("waiting for database directory \"%s\" being created in recovery", dbpath)));

    /* Try again in five seconds. */
    pg_usleep(5000000L);
    goto retry;
}
static void DummyStandbyDataRcvWrite(char *buf, uint32 nbytes)
{
ssize_t byteswritten;
char path[MAXPGPATH] = {0};
volatile DataRcvData *datarcv = t_thrd.datareceiver_cxt.DataRcv;
DataElementHeaderData dataehdr;
uint32 currentlen = 0;
int headerlen = sizeof(DataElementHeaderData);
uint32 caculatebytes = nbytes;
char *caculatebuf = buf;
int nRet = 0;
* get the file num to store:
* 1. If MAX_DUMMY_DATA_FILE <= dummy_data_writer_file_offset, open
* the next file to store
* 2. If the current file can not store the nbytes, discard the current file space,
* open the next file to store.
*/
if (dummy_data_writer_file_offset >= MAX_DUMMY_DATA_FILE ||
(MAX_DUMMY_DATA_FILE - dummy_data_writer_file_offset < (uint32)(nbytes + sizeof(nbytes)))) {
ereport(DEBUG2, (errmsg("data file num %u", t_thrd.datarcvwriter_cxt.dummy_data_writer_file_num)));
t_thrd.datarcvwriter_cxt.dummy_data_writer_file_num++;
dummy_data_writer_file_offset = 0;
if (dummy_data_writer_file_fd >= 0) {
close(dummy_data_writer_file_fd);
dummy_data_writer_file_fd = -1;
}
}
if (dummy_data_writer_file_fd < 0) {
nRet = snprintf_s(path, sizeof(path), MAXPGPATH - 1, "base/dummy_standby/%u",
t_thrd.datarcvwriter_cxt.dummy_data_writer_file_num);
securec_check_ss_c(nRet, "", "");
dummy_data_writer_file_fd = open(path, O_RDWR | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (dummy_data_writer_file_fd < 0)
ereport(PANIC, (errcode_for_file_access(),
errmsg("could not create data file \"%s\", dummy_data_writer_file_fd=%d: %m", path,
dummy_data_writer_file_fd)));
}
while (caculatebytes > 0) {
errno_t rc = 0;
rc = memcpy_s((void *)¤tlen, sizeof(uint32), caculatebuf, sizeof(uint32));
securec_check(rc, "", "");
caculatebuf += sizeof(uint32);
rc = memcpy_s((void *)&dataehdr, headerlen, caculatebuf, headerlen);
securec_check(rc, "", "");
caculatebuf += headerlen;
caculatebuf += dataehdr.data_size;
caculatebytes -= currentlen;
}
errno = 0;
byteswritten = write(dummy_data_writer_file_fd, &nbytes, sizeof(uint32));
if (byteswritten < (ssize_t)sizeof(uint32)) {
if (errno == 0) {
errno = ENOSPC;
}
ereport(PANIC, (errcode_for_file_access(),
errmsg("could not write to data file %s buffer len %u, length %u: %s", path, nbytes, nbytes, TRANSLATE_ERRNO)));
}
errno = 0;
byteswritten = write(dummy_data_writer_file_fd, buf, nbytes);
if (byteswritten < nbytes) {
if (errno == 0)
errno = ENOSPC;
ereport(PANIC, (errcode_for_file_access(), errmsg("could not write to data file %s "
"at offset %u, length %u: %m",
path, (uint32)dummy_data_writer_file_offset, nbytes)));
}
dummy_data_writer_file_offset = dummy_data_writer_file_offset + nbytes + sizeof(nbytes);
if (u_sess->attr.attr_storage.HaModuleDebug) {
ereport(LOG,
(errmsg("HA-DummyStandbyDataRcvWrite: fileno %u, nbytes %u, queueoffset %u/%u,"
" dummy_data_writer_file_offset %u, dummy_data_writer_file_fd %d",
t_thrd.datarcvwriter_cxt.dummy_data_writer_file_num, nbytes, dataehdr.queue_offset.queueid,
dataehdr.queue_offset.queueoff, dummy_data_writer_file_offset, dummy_data_writer_file_fd)));
}
if (pg_fdatasync(dummy_data_writer_file_fd) != 0)
ereport(PANIC, (errcode_for_file_access(),
errmsg("could not fdatasync data file num %u, fd %d: %m",
t_thrd.datarcvwriter_cxt.dummy_data_writer_file_num, dummy_data_writer_file_fd)));
SpinLockAcquire(&datarcv->mutex);
datarcv->receivePosition.queueid = dataehdr.queue_offset.queueid;
datarcv->receivePosition.queueoff = dataehdr.queue_offset.queueoff;
SpinLockRelease(&datarcv->mutex);
}
/*
 * Close the data file descriptor so that the data file can be removed.
 */
extern void CloseDataFile(void)
{
    /* Only close a descriptor that is actually open; -1 means no file is held. */
    if (dummy_data_writer_file_fd >= 0) {
        (void)close(dummy_data_writer_file_fd);
        dummy_data_writer_file_fd = -1;
    }

    /* The next write starts at the beginning of whichever file gets opened. */
    dummy_data_writer_file_offset = 0;
}
/*
 * Scan the dummy standby data directory (creating it when missing) and
 * derive the file numbers from the entry names: the largest becomes the
 * writer's current file number and the smallest the sender's read file
 * number. Both are 0 when the directory holds no entries.
 */
void InitDummyDataNum(void)
{
    DIR *dir = NULL;
    struct dirent *de = NULL;
    int max_num_file = 0;
    int min_num_file = 0;
    bool seen_file = false;
    const char *dirpath = DUMMY_STANDBY_DATADIR;

    /* open the dir of base/dummy_standby, creating it on first use */
    errno = 0;
    dir = AllocateDir(dirpath);
    if (dir == NULL && errno == ENOENT) {
        if (mkdir(dirpath, S_IRWXU) < 0) {
            /* Failure other than not exists */
            ereport(ERROR, (errcode_for_file_access(), errmsg("could not create directory \"%s\": %m", dirpath)));
        }
        dir = AllocateDir(dirpath);
    }
    if (dir == NULL) {
        ereport(ERROR, (errcode_for_file_access(), errmsg("could not open directory \"%s\": %m", dirpath)));
        return;
    }

    /*
     * One pass is enough: the first real entry seeds the minimum, every
     * entry then updates both bounds. Each name is parsed only once.
     */
    while ((de = ReadDir(dir, dirpath)) != NULL) {
        /* Skip special stuff */
        if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
            continue;

        int file_num = atoi(de->d_name);

        if (!seen_file) {
            min_num_file = file_num;
            seen_file = true;
        } else {
            min_num_file = Min(file_num, min_num_file);
        }
        max_num_file = Max(file_num, max_num_file);

        ereport(DEBUG5, (errmsg("InitDummyDataNum de->d_name=%s;   max_num_path=%d; min_num_file=%d.", de->d_name,
                                max_num_file, min_num_file)));
    }

    t_thrd.datarcvwriter_cxt.dummy_data_writer_file_num = max_num_file;
    t_thrd.datasender_cxt.dummy_data_read_file_num = min_num_file;
    FreeDir(dir);
}
/*
 * Decide whether a received page for the relation file at path may be
 * written into the shared buffer.
 *
 * Returns true when the file exists, or when it does not exist and this
 * smgr holds no descriptor for the fork and FdRefcntIsZero() reports true.
 */
static bool CanWriteBuffer(SMgrRelation smgr, ForkNumber forkNum, const char *path)
{
    if (CheckFileExists(path) == FILE_EXIST) {
        return true;
    }

    /*
     * In the case that the physical file has been unlinked while its fd is
     * still held by another thread, the datawriter thread must not write
     * data to the shared buffer: doing so would dirty the shared buffer and
     * the pendingOpsTable hash table, and could cause a coredump when a
     * checkpoint runs into the unlinked file.
     */
    return (smgr->md_fd[forkNum] == NULL) && (FdRefcntIsZero(smgr, forkNum));
}