* Copyright (c) Huawei Technologies Co., Ltd. 2021-2022. All rights reserved.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* ---------------------------------------------------------------------------------------
*
* ss_reform_common.cpp
* common methods for crash recovery, switchover and failover.
*
*
* IDENTIFICATION
* src/gausskernel/ddes/adapter/ss_init.cpp
*
* ---------------------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xlog.h"
#include "access/multi_redo_api.h"
#include "postmaster/postmaster.h"
#include "storage/smgr/fd.h"
#include "storage/dss/fio_dss.h"
#include "ddes/dms/ss_dms.h"
#include "ddes/dms/ss_common_attr.h"
#include "ddes/dms/ss_dms_bufmgr.h"
#include "ddes/dms/ss_reform_common.h"
#include "storage/file/fio_device.h"
#include "storage/smgr/segment_internal.h"
#include "replication/walreceiver.h"
#include "replication/walsender_private.h"
#include "replication/ss_disaster_cluster.h"
* Add xlog reader private structure for page read.
*/
typedef struct XLogPageReadPrivate {
int emode;
bool fetching_ckpt;
bool randAccess;
} XLogPageReadPrivate;
int SSReadXlogInternal(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, XLogRecPtr targetRecPtr, char *buf,
int readLen, int readFile)
{
uint32 preReadOff;
Assert(readLen > 0);
Assert(readLen <= XLogPreReadSize);
do {
if ((XLByteInPreReadBuf(targetPagePtr, xlogreader->preReadStartPtr))) {
preReadOff = targetPagePtr % XLogPreReadSize;
int err = memcpy_s(buf, readLen, xlogreader->preReadBuf + preReadOff, readLen);
securec_check(err, "\0", "\0");
break;
} else {
* That preReadStartPtr is InvalidXlogPreReadStartPtr has three kinds of occasions.
*/
if (xlogreader->preReadStartPtr == InvalidXlogPreReadStartPtr && SS_DISASTER_CLUSTER) {
ereport(LOG, (errmsg("[SS] In ss disaster cluster mode, preReadStartPtr is 0.")));
}
uint32 targetPageOff = targetPagePtr % XLogSegSize;
preReadOff = targetPageOff - targetPageOff % XLogPreReadSize;
ssize_t actualBytes = pread(readFile, xlogreader->preReadBuf, XLogPreReadSize, preReadOff);
if (actualBytes != XLogPreReadSize) {
return false;
}
xlogreader->preReadStartPtr = targetPagePtr + preReadOff - targetPageOff;
}
} while (true);
return readLen;
}
XLogReaderState *SSXLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data, Size alignedSize)
{
XLogReaderState *state = XLogReaderAllocate(pagereadfunc, private_data, alignedSize);
if (state != NULL) {
state->preReadStartPtr = InvalidXlogPreReadStartPtr;
if (AmCBMReaderProcess()) {
if (t_thrd.cbm_cxt.preReadBuff == NULL) {
t_thrd.cbm_cxt.preReadBuff = (char *)palloc_extended(XLogPreReadSize + alignedSize,
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
}
state->preReadBufOrigin = t_thrd.cbm_cxt.preReadBuff;
} else {
state->preReadBufOrigin = (char *)palloc_extended(XLogPreReadSize + alignedSize,
MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
}
if (state->preReadBufOrigin == NULL) {
pfree(state->errormsg_buf);
state->errormsg_buf = NULL;
pfree(state->readBufOrigin);
state->readBufOrigin = NULL;
state->readBuf = NULL;
pfree(state->readRecordBuf);
state->readRecordBuf = NULL;
pfree(state);
state = NULL;
return NULL;
}
if (alignedSize == 0) {
state->preReadBuf = state->preReadBufOrigin;
} else {
state->preReadBuf = (char *)TYPEALIGN(alignedSize, state->preReadBufOrigin);
}
state->xlogFlushPtrForPerRead = InvalidXLogRecPtr;
}
return state;
}
void SSUpdateReformerCtrl()
{
Assert(SS_PRIMARY_MODE || SS_DISASTER_CLUSTER);
int fd = -1;
int len;
errno_t err = EOK;
char *fname[2];
len = sizeof(ss_reformer_ctrl_t);
int write_size = (int)BUFFERALIGN(len);
char buffer[write_size] __attribute__((__aligned__(ALIGNOF_BUFFER))) = { 0 };
err = memcpy_s(&buffer, write_size, &g_instance.dms_cxt.SSReformerControl, len);
securec_check(err, "\0", "\0");
INIT_CRC32C(((ss_reformer_ctrl_t *)buffer)->crc);
COMP_CRC32C(((ss_reformer_ctrl_t *)buffer)->crc, (char *)buffer, offsetof(ss_reformer_ctrl_t, crc));
FIN_CRC32C(((ss_reformer_ctrl_t *)buffer)->crc);
fname[0] = XLOG_CONTROL_FILE_BAK;
fname[1] = XLOG_CONTROL_FILE;
for (int i = 0; i < BAK_CTRL_FILE_NUM; i++) {
if (i == 0) {
fd = BasicOpenFile(fname[i], O_CREAT | O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
} else {
fd = BasicOpenFile(fname[i], O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
}
if (fd < 0) {
ereport(FATAL, (errcode_for_file_access(), errmsg("[SS] could not open control file \"%s\": %s",
fname[i], TRANSLATE_ERRNO)));
}
SSWriteInstanceControlFile(fd, buffer, REFORM_CTRL_PAGE, write_size);
if (close(fd)) {
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not close control file: %s",
TRANSLATE_ERRNO)));
}
}
}
void SSReadReformerCtrl()
{
pg_crc32c crc;
errno_t rc = EOK;
int fd = -1;
char *fname = XLOG_CONTROL_FILE;
bool retry = false;
loop:
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
fd = BasicOpenFile(fname, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
LWLockRelease(ControlFileLock);
ereport(FATAL, (errcode_for_file_access(), errmsg("[SS] could not open control file \"%s\": %m", fname)));
}
off_t seekpos = (off_t)BLCKSZ * REFORM_CTRL_PAGE;
int len = sizeof(ss_reformer_ctrl_t);
int read_size = (int)BUFFERALIGN(len);
char buffer[read_size] __attribute__((__aligned__(ALIGNOF_BUFFER)));
if (pread(fd, buffer, read_size, seekpos) != read_size) {
LWLockRelease(ControlFileLock);
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not read from control file: %m")));
}
rc = memcpy_s(&g_instance.dms_cxt.SSReformerControl, len, buffer, len);
securec_check(rc, "", "");
if (close(fd) < 0) {
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not close control file: %m")));
}
INIT_CRC32C(crc);
COMP_CRC32C(crc, (char *)&g_instance.dms_cxt.SSReformerControl, offsetof(ss_reformer_ctrl_t, crc));
FIN_CRC32C(crc);
if (!EQ_CRC32C(crc, g_instance.dms_cxt.SSReformerControl.crc)) {
if (!retry) {
ereport(WARNING, (errmsg("[SS] control file \"%s\" contains incorrect checksum, try backup file", fname)));
fname = XLOG_CONTROL_FILE_BAK;
retry = true;
LWLockRelease(ControlFileLock);
goto loop;
} else {
LWLockRelease(ControlFileLock);
ereport(FATAL, (errmsg("[SS] incorrect checksum in control file")));
}
}
g_instance.dms_cxt.SSRecoveryInfo.cluster_ondemand_status = g_instance.dms_cxt.SSReformerControl.clusterStatus;
LWLockRelease(ControlFileLock);
}
void SSClearSegCache()
{
(void)LWLockAcquire(ShmemIndexLock, LW_EXCLUSIVE);
HeapMemResetHash(t_thrd.storage_cxt.SegSpcCache, "[SS] Shared Seg Spc hash by request");
LWLockRelease(ShmemIndexLock);
}
void SSStandbySetLibpqswConninfo()
{
if (strlen(g_instance.dms_cxt.dmsInstAddr[g_instance.dms_cxt.SSReformerControl.primaryInstId]) == 0) {
ereport(WARNING, (errmsg("[SS] Failed to get ip of primary node!")));
return;
}
int replIdx = -1;
ReplConnInfo *replconninfo = NULL;
for (int i = 0; i < MAX_REPLNODE_NUM; ++i) {
replconninfo = t_thrd.postmaster_cxt.ReplConnArray[i];
if (replconninfo == NULL) {
continue;
}
if (strcmp(replconninfo->remotehost,
g_instance.dms_cxt.dmsInstAddr[g_instance.dms_cxt.SSReformerControl.primaryInstId]) == 0) {
replIdx = i;
break;
}
}
if (replIdx == -1) {
ereport(WARNING, (errmsg("[SS] Failed to get replconninfo of primary node, check the replconninfo config!")));
return;
}
replconninfo = t_thrd.postmaster_cxt.ReplConnArray[replIdx];
errno_t rc = EOK;
rc = snprintf_s(g_instance.dms_cxt.conninfo, MAXCONNINFO, MAXCONNINFO - 1,
"host=%s port=%d localhost=%s localport=%d", replconninfo->remotehost, replconninfo->remoteport,
replconninfo->localhost, replconninfo->localport);
securec_check_ss(rc, "\0", "\0");
g_instance.dms_cxt.conninfo[MAXCONNINFO - 1] = '\0';
return;
}
static void SSReadClusterRunMode()
{
int fd = -1;
char *fname = NULL;
int read_size = 0;
int len = sizeof(ss_reformer_ctrl_t);
fname = XLOG_CONTROL_FILE;
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
fd = BasicOpenFile(fname, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0) {
LWLockRelease(ControlFileLock);
ereport(FATAL, (errcode_for_file_access(), errmsg("[SS] could not open control file \"%s\": %m", fname)));
}
off_t seekpos = (off_t)BLCKSZ * REFORM_CTRL_PAGE;
read_size = (int)BUFFERALIGN(len);
char buffer[read_size] __attribute__((__aligned__(ALIGNOF_BUFFER)));
if (pread(fd, buffer, read_size, seekpos) != read_size) {
(void)close(fd);
LWLockRelease(ControlFileLock);
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not read from control file: %m")));
}
g_instance.dms_cxt.SSReformerControl.clusterRunMode = ((ss_reformer_ctrl_t*)buffer)->clusterRunMode;
if (close(fd) < 0) {
LWLockRelease(ControlFileLock);
ereport(PANIC, (errcode_for_file_access(), errmsg("[SS] could not close control file: %m")));
}
LWLockRelease(ControlFileLock);
}
void SSDisasterRefreshMode()
{
LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
SSUpdateReformerCtrl();
LWLockRelease(ControlFileLock);
ereport(LOG, (errmsg("[SS] SSDisasterRefreshMode change control file cluster run mode to: %d",
g_instance.dms_cxt.SSReformerControl.clusterRunMode)));
}
void SSDisasterUpdateHAmode()
{
SSReadClusterRunMode();
if (SS_REFORM_REFORMER) {
if (SS_DISASTER_PRIMARY_CLUSTER) {
t_thrd.postmaster_cxt.HaShmData->current_mode = PRIMARY_MODE;
} else if (SS_DISASTER_STANDBY_CLUSTER) {
t_thrd.postmaster_cxt.HaShmData->current_mode = STANDBY_MODE;
}
} else {
t_thrd.postmaster_cxt.HaShmData->current_mode = NORMAL_MODE;
}
ereport(LOG, (errmsg("[SS] SSDisasterUpdateHAmode change Ha current mode to: %d",
t_thrd.postmaster_cxt.HaShmData->current_mode)));
}
bool SSPerformingStandbyScenario()
{
if (SS_IN_REFORM) {
if (g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_NORMAL_OPENGAUSS &&
((uint64)(0x1 << SS_PRIMARY_ID) & g_instance.dms_cxt.SSReformInfo.bitmap_reconnect) == 0) {
return true;
} else if (g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_FULL_CLEAN) {
return true;
}
}
return false;
}
void SSGrantDSSWritePermission(void)
{
* Whether dss_set_server_status_wrapper leads to dead loop. There exists occasion that database maybe block
* here but fail and success log have no print. So it is suspected greatly that dss_set_server_status_wrapper
* cause database stuck.
*/
ereport(LOG, (errmodule(MOD_DMS), (errmsg("[SS reform] set dss server status as primary start."))));
while (dss_set_server_status_wrapper() != GS_SUCCESS) {
pg_usleep(REFORM_WAIT_LONG);
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS reform] Failed to set DSS as primary, vgname: \"%s\", socketpath: \"%s\"",
g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name,
g_instance.attr.attr_storage.dss_attr.ss_dss_conn_path),
errhint("Check vgname and socketpath and restart later.")));
}
ereport(LOG, (errmodule(MOD_DMS), (errmsg("[SS reform] set dss server status as primary: success"))));
}
bool SSPrimaryRestartScenario()
{
if (SS_IN_REFORM) {
if (g_instance.dms_cxt.SSReformInfo.reform_type == DMS_REFORM_TYPE_FOR_NORMAL_OPENGAUSS &&
((uint64)(0x1 << SS_PRIMARY_ID) & g_instance.dms_cxt.SSReformInfo.bitmap_reconnect) != 0) {
return true;
}
}
return false;
}
* PRIMARY_CLUSTER and single cluster
* 1)standby scenario
* 2)primary restart -- no need exit
* 3)switchover
* 4)failover -- need exit
* STANDBY_CLUSTER
* all scenario -- need exit
*/
bool SSBackendNeedExitScenario()
{
if (!SS_IN_REFORM) {
return false;
}
if (SS_DISASTER_STANDBY_CLUSTER) {
return true;
}
if (g_instance.attr.attr_sql.enableRemoteExcute) {
ereport(LOG, (errmsg("[SS] remote execute is enabled, all backends need exit to ensure"
"complete transaction!")));
return true;
}
if (SSPerformingStandbyScenario() || SSPrimaryRestartScenario()) {
return false;
}
return true;
}
void SSProcessForceExit()
{
int log_level = WARNING;
#ifdef USE_ASSERT_CHECKING
log_level = PANIC;
#endif
ereport(log_level, (errmodule(MOD_DMS),
errmsg("[SS reform] db exit directly by force now")));
_exit(0);
}
void SSWaitStartupExit(bool send_signal)
{
if (g_instance.pid_cxt.StartupPID == 0) {
return;
}
if (send_signal) {
if ((SS_STANDBY_FAILOVER && !g_instance.dms_cxt.SSRecoveryInfo.restart_failover_flag) ||
SS_STANDBY_PROMOTING) {
g_instance.dms_cxt.SSRecoveryInfo.startup_need_exit_normally = true;
pg_memory_barrier();
}
SendPostmasterSignal(PMSIGNAL_DMS_TERM_STARTUP);
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] send terminate startup thread signal to PM.")));
}
if (SS_IN_REFORM && dms_reform_failed()) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS reform] reform failed.")));
}
#ifdef USE_ASSERT_CHECKING
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] wait for the startup thread to exit")));
#else
long rto_limit = SS_RTO_LIMIT;
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform] wait startup thread exit until RTO limit time:%ld sec",
rto_limit / (1000 * 1000))));
long wait_time = 0;
#endif
while (true) {
if (g_instance.pid_cxt.StartupPID == 0) {
break;
}
if (g_instance.dms_cxt.SSRecoveryInfo.recovery_trapped_in_page_request) {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS reform] recovery_trapped_in_page_request: Thread pageredo or startup are trapped"
"in page request during recovery phase, db exit directly by force now.")));
_exit(0);
}
#ifndef USE_ASSERT_CHECKING
if (wait_time > rto_limit) {
SSProcessForceExit();
}
wait_time += REFORM_WAIT_TIME;
#endif
pg_usleep(REFORM_WAIT_TIME);
}
}
void SSHandleStartupWhenReformStart(dms_reform_start_context_t *rs_cxt)
{
if (g_instance.pid_cxt.StartupPID == 0) {
return;
}
if (ENABLE_ONDEMAND_RECOVERY) {
if (rs_cxt->reform_type == DMS_REFORM_TYPE_FOR_SWITCHOVER_OPENGAUSS &&
rs_cxt->role != DMS_ROLE_REFORMER && ENABLE_ONDEMAND_REALTIME_BUILD) {
g_instance.dms_cxt.SSRecoveryInfo.realtime_build_in_reform = true;
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform][On-demand] reform start phase: stop ondemand realtime build before switchover.")));
SSWaitStartupExit(true);
return;
* Ondemand recovery feature need to make sure startup-process alive in follow scenarios:
* 1. Standby node does not need to stop startup process if ondemand realtime build is enable before normal reform.
* 2. Primary node does not need to stop startup process in ondemand-redo pharse, because startup process is performing recovery.
*/
} else if (SS_IN_ONDEMAND_RECOVERY || !SS_ONDEMAND_REALTIME_BUILD_DISABLED) {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform][On-demand] reform start phase: ondemand realtime build is enable, no need wait startup thread exit.")));
return;
}
}
if (SS_DISASTER_MAIN_STANDBY_NODE) {
ereport(LOG, (errmodule(MOD_DMS),
errmsg("[SS reform][Dual cluster] reform start phase: standby cluster main node "
"no need wait startup thread exit.")));
return;
}
ss_reform_info_t *reform_info = &g_instance.dms_cxt.SSReformInfo;
if (reform_info->reform_ver == reform_info->reform_ver_startup_wait &&
reform_info->reform_ver_startup_wait != 0) {
SSWaitStartupExit(false);
} else {
ereport(WARNING, (errmodule(MOD_DMS),
errmsg("[SS reform] reform start phase, last round reform version:%ld, startup wait version:%ld",
reform_info->reform_ver, reform_info->reform_ver_startup_wait)));
SSProcessForceExit();
}
}
char* SSGetLogHeaderTypeStr()
{
if (!SS_IN_REFORM) {
return "[SS]";
}
switch (g_instance.dms_cxt.SSReformInfo.reform_type)
{
case (DMS_REFORM_TYPE_FOR_NORMAL_OPENGAUSS):
return "[SS reform]";
break;
case (DMS_REFORM_TYPE_FOR_FAILOVER_OPENGAUSS):
return "[SS reform][SS failover]";
break;
case (DMS_REFORM_TYPE_FOR_SWITCHOVER_OPENGAUSS):
return "[SS reform][SS swithover]";
break;
default:
return "[SS]";
}
}
void ProcessNoCleanBackendsScenario() {
* PM_WAIT_REFORM indicates that this node don't do actually thing during reform phase.
* what only will be done currently is waiting this round of reform to finish.
* 1. primary restart, standby need set PM_WAIT_REFORM
* 2. standby restart, primary need set PM_WAIT_REFORM
* 3. SS_PERFORMING_SWITCHOVER, primary and standby all return. Because primary need PM_RUN to
* do ProcessDemoteRequest.
*/
if (SS_PERFORMING_SWITCHOVER) {
return;
}
if (((uint64)(0x1 << SS_MY_INST_ID) & g_instance.dms_cxt.SSReformInfo.bitmap_reconnect) == 0 &&
!g_instance.dms_cxt.SSRecoveryInfo.startup_reform &&
g_instance.dms_cxt.SSReformInfo.reform_type != DMS_REFORM_TYPE_FOR_FULL_CLEAN) {
pmState = PM_WAIT_REFORM;
ereport(LOG, (errmodule(MOD_DMS), errmsg("[SS reform] no clean backends, pmState=%d, SSClusterState=%d, "
"demotion=%d-%d, rec=%d", pmState, g_instance.dms_cxt.SSClusterState, g_instance.demotion,
t_thrd.walsender_cxt.WalSndCtl->demotion, t_thrd.xlog_cxt.InRecovery)));
}
}