* Portions Copyright (c) 2021 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 2021, openGauss Contributors
*
* shared_storage_walreceiver.cpp
*
* Description: This file contains the shared_storage-specific parts of walreceiver. It's
* loaded as a dynamic module to avoid linking the main server binary with
* obs.
*
* IDENTIFICATION
* src/gausskernel/storage/replication/shared_storage_walreceiver.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <sys/time.h>
#include "replication/archive_walreceiver.h"
#include "libpq/libpq-int.h"
#include "access/xlog.h"
#include "access/xlog_internal.h"
#include "nodes/pg_list.h"
#include "access/obs/obs_am.h"
#include "utils/timestamp.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "replication/shared_storage_walreceiver.h"
#include "replication/ss_disaster_cluster.h"
#include "replication/slot.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/spin.h"
#include "utils/guc.h"
#include "pgxc/pgxc.h"
/*
 * shared_storage_receive() counts its calls made while the walreceiver is not
 * stopping; only on every READ_MSG_RATIO-th such call does it consider
 * re-establishing the libpq stream connection.
 */
static const int READ_MSG_RATIO = 5;
/*
 * Validate the xlog pages held in buff one XLOG_BLCKSZ page at a time.
 *
 * xlogreader    - reader state handed to SimpleValidatePage
 * targetPagePtr - LSN of the first page in buff
 * buff          - buffer holding the pages to validate
 * size          - in: number of bytes the caller wants validated;
 *                 out: lowered to the validated byte count when fewer bytes
 *                 than requested passed validation (otherwise left unchanged)
 *
 * At least one page is always examined. Scanning stops at the first page for
 * which SimpleValidatePage reports fewer than XLOG_BLCKSZ valid bytes.
 *
 * Returns true when at least one byte was validated.
 */
bool SimpleCheckBlockheader(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, char *buff, int* size)
{
    const int wantedBytes = *size;
    int verifiedBytes = 0;
    XLogRecPtr pagePtr = targetPagePtr;
    char *pageData = buff;

    for (;;) {
        Size pageValid = SimpleValidatePage(xlogreader, pagePtr, pageData);
        if (pageValid < XLOG_BLCKSZ) {
            /* partially valid page: count what is usable and stop here */
            verifiedBytes += pageValid;
            break;
        }

        verifiedBytes += XLOG_BLCKSZ;
        if (verifiedBytes >= wantedBytes) {
            break;
        }

        pagePtr += XLOG_BLCKSZ;
        pageData += XLOG_BLCKSZ;
    }

    if (verifiedBytes < wantedBytes) {
        *size = verifiedBytes;
    }
    return verifiedBytes > 0;
}
/*
 * Read xlog from shared storage starting at a page boundary and validate the
 * page headers of what was read.
 *
 * xlogreader    - reader state used for page validation
 * readEnd       - upper LSN bound of the data to return (callers pass the
 *                 shared storage insert head)
 * readPageStart - page-aligned LSN to start reading from
 * localBuff     - destination buffer
 * readLen       - out: number of usable bytes in localBuff
 *
 * The read never crosses the end of the ShareStorageBufSize-aligned window
 * containing readPageStart, and the requested length is rounded up to the
 * shared storage block size. A non-positive read result is reported as FATAL.
 *
 * Returns the result of SimpleCheckBlockheader on the data read.
 */
bool SharedStorageXlogReadCheck(XLogReaderState *xlogreader, XLogRecPtr readEnd, XLogRecPtr readPageStart,
                                char *localBuff, int *readLen)
{
    Assert((readPageStart % XLOG_BLCKSZ) == 0);

    /* end of the ShareStorageBufSize-aligned window that contains readPageStart */
    XLogRecPtr windowEnd = readPageStart - readPageStart % ShareStorageBufSize + ShareStorageBufSize;

    /* stop at readEnd (rounded up to a full page) when it falls inside the window */
    XLogRecPtr copyEnd = windowEnd;
    if (windowEnd > readEnd) {
        copyEnd = TYPEALIGN(XLOG_BLCKSZ, readEnd);
    }
    int wantLen = static_cast<int>(copyEnd - readPageStart);

    int gotBytes = ReadXlogFromShareStorage(readPageStart, localBuff,
                                            (int)TYPEALIGN(g_instance.xlog_cxt.shareStorageopCtl.blkSize, wantLen));
    if (gotBytes <= 0) {
        *readLen = 0;
        ereport(FATAL,
                (errcode(ERRCODE_INVALID_STATUS),
                 errmsg("read zero length of xlog from shared storage startlsn : %lx, readlen :%d, inserthead :%lx",
                        readPageStart, wantLen, readEnd)));
        return false;
    }

    /* never hand out more than what lies between readPageStart and readEnd */
    XLogRecPtr availBytes = readEnd - readPageStart;
    if ((XLogRecPtr)gotBytes > availBytes) {
        *readLen = static_cast<int>(availBytes);
    } else {
        *readLen = gotBytes;
    }

    return SimpleCheckBlockheader(xlogreader, readPageStart, localBuff, readLen);
}
/*
 * Fetch the next chunk of xlog from shared storage and present it to the
 * caller as a 'w' (WAL data) message.
 *
 * timeout    - upper bound compared against ComputeTimeStamp(start_time) for
 *              the polling loop (presumably milliseconds - confirm against
 *              ComputeTimeStamp)
 * type       - out: set to 'w' on success
 * buffer     - out: points at the WalDataMessageHeader placed directly in
 *              front of the xlog data inside the thread's read buffer
 * len        - out: sizeof(WalDataMessageHeader) plus the xlog data length
 * isStopping - when true, a term / xlog-file-size change makes the function
 *              return false instead of raising ERROR
 *
 * Returns true when a message was produced, false when no new data is
 * available (or reading is currently disabled).
 */
bool shared_storage_xlog_read(int timeout, unsigned char *type, char **buffer, int *len, bool isStopping)
{
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
TimestampTz start_time = GetCurrentTimestamp();
XLogRecPtr start_lsn = 0;
uint32 disable_connection_node =
pg_atomic_read_u32(&g_instance.comm_cxt.localinfo_cxt.need_disable_connection_node);
uint32 shared_storage_recover_done =
pg_atomic_read_u32(&t_thrd.walreceiverfuncs_cxt.WalRcv->rcvDoneFromShareStorage);
/* Nothing to do once connections are disabled and receiving has already been flagged as done. */
if (disable_connection_node && shared_storage_recover_done) {
return false;
}
/*
 * Pick the start LSN: the first access after connecting uses
 * walrcv->receiveStart (and clears the first-access flag); every later call
 * continues from walRcvCtlBlock->receivePtr. Each value is read under its
 * own spinlock.
 */
SpinLockAcquire(&walrcv->mutex);
if (walrcv->isFirstTimeAccessStorage) {
start_lsn = walrcv->receiveStart;
walrcv->isFirstTimeAccessStorage = false;
SpinLockRelease(&walrcv->mutex);
} else {
SpinLockRelease(&walrcv->mutex);
SpinLockAcquire(&t_thrd.walreceiver_cxt.walRcvCtlBlock->mutex);
start_lsn = t_thrd.walreceiver_cxt.walRcvCtlBlock->receivePtr;
SpinLockRelease(&t_thrd.walreceiver_cxt.walRcvCtlBlock->mutex);
}
ShareStorageXLogCtl *ctlInfo = g_instance.xlog_cxt.shareStorageXLogCtl;
ReadShareStorageCtlInfo(ctlInfo);
/* No data beyond start_lsn on shared storage yet. */
if (XLByteLE(ctlInfo->insertHead, start_lsn)) {
return false;
}
int read_len = 0;
char *local_buff = t_thrd.libwalreceiver_cxt.shared_storage_read_buf;
/* Reads start on a page boundary; page_off is start_lsn's offset within that page. */
XLogRecPtr page_off = start_lsn % XLOG_BLCKSZ;
XLogRecPtr page_lsn = start_lsn - page_off;
XLogReaderState *xlogreader = t_thrd.libwalreceiver_cxt.xlogreader;
/* Poll shared storage until a validated read succeeds or the timeout elapses. */
do {
read_len = 0;
/* The page we need lies before insertTail: record a WAL-segment rebuild reason and raise ERROR. */
if (XLByteLT(page_lsn, ctlInfo->insertTail)) {
ha_set_rebuild_connerror(WALSEGMENT_REBUILD, REPL_INFO_ERROR);
ereport(ERROR,
(errcode(ERRCODE_INVALID_STATUS), errmsg("xlog in shared storage is overwriten by primary node")));
}
/* Re-read the control info on every iteration before checking term and file size. */
ReadShareStorageCtlInfo(ctlInfo);
uint32 lastTerm = pg_atomic_read_u32(&t_thrd.walreceiverfuncs_cxt.WalRcv->shareStorageTerm);
/*
 * A larger term on shared storage, or (in standby-cluster standby mode) an
 * xlog file size differing from the local GUC, flags termChanged and aborts
 * the read: silently when stopping, with ERROR otherwise.
 */
if (ctlInfo->term > lastTerm ||
(ctlInfo->xlogFileSize != (uint64)g_instance.attr.attr_storage.xlog_file_size &&
IS_SHARED_STORAGE_STANDBY_CLUSTER_STANDBY_MODE)) {
t_thrd.walreceiver_cxt.termChanged = true;
if (isStopping)
return false;
else {
ereport(ERROR, (errcode(ERRCODE_INVALID_STATUS),
errmsg("the term(%u:%u) or xlog file size(%lu:%lu) on shared storage is changed", ctlInfo->term,
lastTerm, ctlInfo->xlogFileSize, (uint64)g_instance.attr.attr_storage.xlog_file_size)));
}
}
if (SharedStorageXlogReadCheck(xlogreader, ctlInfo->insertHead, page_lsn, local_buff, &read_len)) {
break;
}
/* Re-read the flag (this declaration shadows the outer one); skip the sleep when connections are disabled. */
uint32 disable_connection_node =
pg_atomic_read_u32(&g_instance.comm_cxt.localinfo_cxt.need_disable_connection_node);
if (!disable_connection_node) {
/* pg_usleep takes microseconds, so this waits 1000 us before polling again */
const long sleeptime = 1000;
pg_usleep(sleeptime);
}
} while (ComputeTimeStamp(start_time) < timeout);
/*
 * Nothing usable beyond start_lsn was read. When connections are disabled,
 * remember that receiving from shared storage is done so the early exit at
 * the top of this function triggers on later calls.
 */
if (read_len <= 0 || read_len <= static_cast<int>(page_off)) {
uint32 disable_connection_node =
pg_atomic_read_u32(&g_instance.comm_cxt.localinfo_cxt.need_disable_connection_node);
if (disable_connection_node)
pg_atomic_write_u32(&t_thrd.walreceiverfuncs_cxt.WalRcv->rcvDoneFromShareStorage, true);
return false;
}
pg_atomic_write_u32(&t_thrd.walreceiverfuncs_cxt.WalRcv->rcvDoneFromShareStorage, false);
/* Only the bytes from start_lsn onward are new; the first page_off bytes were received earlier. */
int dataLen = read_len - page_off;
/*
 * Build the message header in place, directly in front of the first new byte.
 * shared_storage_connect() allocates the buffer with sizeof(WalDataMessageHeader)
 * bytes of headroom before shared_storage_read_buf, which is what makes the
 * page_off == 0 case fit.
 */
WalDataMessageHeader *msgHdr = (WalDataMessageHeader *)(local_buff + page_off - sizeof(WalDataMessageHeader));
msgHdr->dataStart = start_lsn;
msgHdr->walEnd = InvalidXLogRecPtr;
msgHdr->sendTime = GetCurrentTimestamp();
msgHdr->catchup = false;
*len = sizeof(WalDataMessageHeader) + dataLen;
*type = 'w';
*buffer = (char *)msgHdr;
const uint32 shiftSize = 32;
ereport(u_sess->attr.attr_storage.HaModuleDebug ? LOG : DEBUG2,
(errmsg("[shared_storage_xlog_read]get xlog startlsn %08X/%08X, len %X\n", (uint32)(start_lsn >> shiftSize),
(uint32)start_lsn, (uint32)dataLen)));
return true;
}
void shared_storage_xlog_check_consistency()
{
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
XLogRecPtr localRec;
pg_crc32 localRecCrc = 0;
uint32 recordLen;
SpinLockAcquire(&walrcv->mutex);
localRecCrc = walrcv->latestRecordCrc;
localRec = walrcv->latestValidRecord;
recordLen = walrcv->latestRecordLen;
SpinLockRelease(&walrcv->mutex);
ShareStorageXLogCtl *sharedStorageCtl = g_instance.xlog_cxt.shareStorageXLogCtl;
ReadShareStorageCtlInfo(sharedStorageCtl);
if (sharedStorageCtl->systemIdentifier != GetSystemIdentifier()) {
ereport(WARNING, (errcode(ERRCODE_INVALID_STATUS),
errmsg("database system version is different between dorado and local"),
errdetail("The dorado's system version is %lu, the local's system version is %lu.",
sharedStorageCtl->systemIdentifier, GetSystemIdentifier())));
ShareStorageSetBuildErrorAndExit(SYSTEMID_REBUILD, false);
return;
}
if (XLByteLT(localRec, sharedStorageCtl->insertTail)) {
ereport(WARNING, (errcode(ERRCODE_INVALID_STATUS),
errmsg("standby's local request lsn[%lx], shared storage insert Tail[%lx]", localRec,
sharedStorageCtl->insertTail)));
ShareStorageSetBuildErrorAndExit(WALSEGMENT_REBUILD, false);
return;
}
XLogReaderState *xlogReader = XLogReaderAllocate(&SharedStorageXLogPageRead, NULL,
g_instance.xlog_cxt.shareStorageopCtl.blkSize);
if (xlogReader == NULL) {
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"),
errdetail("shared_storage_xlog_check_consistency failed allocate an XLog reader")));
}
char *errormsg = NULL;
if (XLByteLT(localRec + recordLen, sharedStorageCtl->insertHead)) {
XLogRecord *record = XLogReadRecord(xlogReader, localRec, &errormsg, false);
if (record == NULL) {
ereport(WARNING, (errcode(ERRCODE_INVALID_STATUS),
errmsg("standby's local request lsn[%lx] is not valid record", localRec)));
ShareStorageSetBuildErrorAndExit(WALSEGMENT_REBUILD, false);
if (xlogReader != NULL) {
XLogReaderFree(xlogReader);
xlogReader = NULL;
}
return;
}
if (localRecCrc != record->xl_crc || recordLen != record->xl_tot_len) {
ereport(WARNING, (errcode(ERRCODE_INVALID_STATUS),
errmsg("standby's local request lsn[%lx] 's crc mismatched with shared storage"
"crc(local, share):[%u,%u].",
localRec, localRecCrc, record->xl_crc)));
ShareStorageSetBuildErrorAndExit(WALSEGMENT_REBUILD, false);
if (xlogReader != NULL) {
XLogReaderFree(xlogReader);
xlogReader = NULL;
}
return;
}
t_thrd.walreceiver_cxt.checkConsistencyOK = true;
} else {
ereport(WARNING, (errcode(ERRCODE_INVALID_STATUS),
errmsg("standby's local request lsn[%lx], shared storage inserthead[%lx], recordlen[%x]",
localRec, sharedStorageCtl->insertHead, recordLen)));
uint32 shiftSize = 32;
XLogRecPtr shareStorageLatestRecordStart = InvalidXLogRecPtr;
int shareStorageLatestRecordLen;
pg_crc32 shareStorageLatestRecordCrc;
FindLastRecordCheckInfoOnShareStorage(&shareStorageLatestRecordStart, &shareStorageLatestRecordCrc,
&shareStorageLatestRecordLen);
if (shareStorageLatestRecordStart == InvalidXLogRecPtr ||
XLByteLT(sharedStorageCtl->insertHead, shareStorageLatestRecordStart + shareStorageLatestRecordLen)) {
ShareStorageSetBuildErrorAndExit(NONE_REBUILD);
if (xlogReader != NULL) {
XLogReaderFree(xlogReader);
xlogReader = NULL;
}
return;
}
bool crcValid = false;
pg_crc32 localCheckCrc = GetXlogRecordCrc(shareStorageLatestRecordStart, crcValid, XLogPageRead, 0);
if (shareStorageLatestRecordCrc != localCheckCrc) {
ereport(WARNING, (errmsg("shared storage request lsn[%X/%X]'s crc mismatched (share, local):[%u,%u].",
static_cast<uint32>(shareStorageLatestRecordStart >> shiftSize),
static_cast<uint32>(shareStorageLatestRecordStart), shareStorageLatestRecordCrc, localCheckCrc)));
ShareStorageSetBuildErrorAndExit(WALSEGMENT_REBUILD, false);
if (xlogReader != NULL) {
XLogReaderFree(xlogReader);
xlogReader = NULL;
}
return;
}
ShareStorageSetBuildErrorAndExit(NONE_REBUILD);
}
pg_atomic_write_u32(&t_thrd.walreceiverfuncs_cxt.WalRcv->shareStorageTerm, sharedStorageCtl->term);
if (xlogReader != NULL) {
XLogReaderFree(xlogReader);
xlogReader = NULL;
}
return;
}
/*
 * Try to establish the libpq streaming connection described by param,
 * converting a connection error into a false return instead of propagating it.
 *
 * param      - connection parameters (conninfo, startpoint, slotname,
 *              channel_identifier)
 * printError - when true, the caught error message is logged at LOG level
 *
 * Failure handling: the time of the first failed attempt is remembered in a
 * function-static variable. When a later attempt fails and
 * ComputeTimeStamp(first_fail_time) exceeds maxWaitTime, FATAL is raised
 * with ERRCODE_CONNECTION_TIMED_OUT. A successful connect clears the
 * remembered time.
 *
 * After a successful connect, the xlog file size recorded in the shared
 * storage control info must match the local xlog_file_size setting;
 * a mismatch raises FATAL.
 *
 * Returns true when the connection was established.
 */
bool try_connect_libpq(LibpqrcvConnectParam *param, bool printError)
{
    bool libpgConnected = false;
    MemoryContext current_ctx = CurrentMemoryContext;
    static TimestampTz first_fail_time = 0;
    bool timeout = false;

    PG_TRY();
    {
        libpgConnected = libpqrcv_connect(param->conninfo, &param->startpoint, param->slotname,
                                          param->channel_identifier);
    }
    PG_CATCH();
    {
        /* return to the caller's memory context before copying the error data */
        (void)MemoryContextSwitchTo(current_ctx);
        ErrorData *edata = CopyErrorData();
        if (printError) {
            ereport(LOG, (errmsg("Failed to call try_connect_libpq: %s", edata->message)));
        }
        FlushErrorState();
        FreeErrorData(edata);

        libpgConnected = false;
        libpqrcv_disconnect();
        t_thrd.libwalreceiver_cxt.streamConn = 0;

        if (first_fail_time != 0) {
            const int maxWaitTime = 5000;
            if (ComputeTimeStamp(first_fail_time) > maxWaitTime) {
                timeout = true;
            }
        } else {
            /* first failure of this streak: start the clock */
            first_fail_time = GetCurrentTimestamp();
        }
    }
    PG_END_TRY();

    ereport(LOG, (errmsg("try_connect_libpq try to connect to %s, status %u, timeout %u.", param->conninfo,
                         libpgConnected, timeout)));

    if (timeout) {
        first_fail_time = 0;
        ereport(FATAL,
                (errcode(ERRCODE_CONNECTION_TIMED_OUT), errmsg("try_connect_libpq timeout restart walreceiver")));
    }

    if (libpgConnected) {
        first_fail_time = 0;
        ShareStorageXLogCtl *ctlInfo = g_instance.xlog_cxt.shareStorageXLogCtl;
        ReadShareStorageCtlInfo(ctlInfo);
        if ((uint64)g_instance.attr.attr_storage.xlog_file_size != ctlInfo->xlogFileSize) {
            ereport(FATAL, (errmsg("try_connect_libpq xlog file size not matched,guc:%ld, ctl info:%lu",
                                   g_instance.attr.attr_storage.xlog_file_size, ctlInfo->xlogFileSize)));
        }
    }
    return libpgConnected;
}
bool shared_storage_connect(char *conninfo, XLogRecPtr *startpoint, char *slotname, int channel_identifier)
{
bool libpgConnected = true;
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
walrcv->peer_role = PRIMARY_MODE;
walrcv->peer_state = NORMAL_STATE;
walrcv->isFirstTimeAccessStorage = true;
if (SS_DORADO_CLUSTER) {
libpgConnected = libpqrcv_connect(conninfo, startpoint, slotname, channel_identifier);
return libpgConnected;
}
if (IS_SHARED_STORAGE_MODE) {
uint32 totalLen = sizeof(WalDataMessageHeader) + ShareStorageBufSize +
g_instance.xlog_cxt.shareStorageopCtl.blkSize;
t_thrd.libwalreceiver_cxt.shared_storage_buf = (char *)palloc0(totalLen);
t_thrd.libwalreceiver_cxt.shared_storage_read_buf =
(char *)TYPEALIGN(g_instance.xlog_cxt.shareStorageopCtl.blkSize,
t_thrd.libwalreceiver_cxt.shared_storage_buf + sizeof(WalDataMessageHeader));
t_thrd.libwalreceiver_cxt.connect_param.conninfo = conninfo;
t_thrd.libwalreceiver_cxt.connect_param.startpoint = *startpoint;
t_thrd.libwalreceiver_cxt.connect_param.slotname = slotname;
t_thrd.libwalreceiver_cxt.connect_param.channel_identifier = channel_identifier;
XLogReaderState *xlogreader = XLogReaderAllocate(SharedStorageXLogPageRead, 0,
g_instance.xlog_cxt.shareStorageopCtl.blkSize);
if (xlogreader == NULL) {
ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"),
errdetail("Failed while allocating an XLog reading processor")));
}
xlogreader->system_identifier = GetSystemIdentifier();
t_thrd.libwalreceiver_cxt.xlogreader = xlogreader;
shared_storage_xlog_check_consistency();
libpgConnected = try_connect_libpq(&t_thrd.libwalreceiver_cxt.connect_param, true);
}
return libpgConnected;
}
bool shared_storage_receive(int timeout, unsigned char *type, char **buffer, int *len)
{
const uint32 timeRatio = 2;
static uint64 receiveNum = 0;
static TimestampTz prev_time = 0;
bool hasReadData = false;
volatile WalRcvData *walrcv = t_thrd.walreceiverfuncs_cxt.WalRcv;
SpinLockAcquire(&walrcv->mutex);
bool isStopping = (walrcv->walRcvState == WALRCV_STOPPING);
SpinLockRelease(&walrcv->mutex);
if (!isStopping) {
if (t_thrd.libwalreceiver_cxt.streamConn) {
hasReadData = libpqrcv_receive(0, type, buffer, len);
if (hasReadData) {
return true;
}
}
if ((receiveNum++) % READ_MSG_RATIO == 0) {
uint32 disable_connection_node =
pg_atomic_read_u32(&g_instance.comm_cxt.localinfo_cxt.need_disable_connection_node);
if ((!t_thrd.libwalreceiver_cxt.streamConn) && (!disable_connection_node)) {
const int waitTime = 5000;
if (prev_time == 0 || ComputeTimeStamp(prev_time) > waitTime) {
try_connect_libpq(&t_thrd.libwalreceiver_cxt.connect_param, true);
prev_time = GetCurrentTimestamp();
}
}
}
}
* When ss cluster replication enabled, no xlog will receive, so return false directly.
* Xlog will replicated by Dorado synchronous replication.
*/
if (SS_DORADO_CLUSTER) {
return false;
}
return shared_storage_xlog_read(timeout / timeRatio, type, buffer, len, isStopping);
}
/*
 * Forward nbytes of buffer over the libpq stream connection.
 *
 * Nothing is sent unless the node is in shared storage standby mode or
 * SS_DORADO_CLUSTER is set, and a stream connection currently exists.
 */
void shared_storage_send(const char *buffer, int nbytes)
{
    if (!(IS_SHARED_STORAGE_STANBY_MODE || SS_DORADO_CLUSTER)) {
        return;
    }
    if (!t_thrd.libwalreceiver_cxt.streamConn) {
        return;
    }
    libpqrcv_send(buffer, nbytes);
}
void shared_storage_disconnect(void)
{
libpqrcv_disconnect();
if (t_thrd.libwalreceiver_cxt.shared_storage_buf != NULL) {
pfree(t_thrd.libwalreceiver_cxt.shared_storage_buf);
t_thrd.libwalreceiver_cxt.shared_storage_buf = NULL;
t_thrd.libwalreceiver_cxt.shared_storage_read_buf = NULL;
}
if (t_thrd.libwalreceiver_cxt.xlogreader) {
XLogReaderFree(t_thrd.libwalreceiver_cxt.xlogreader);
t_thrd.libwalreceiver_cxt.xlogreader = 0;
}
}