/*
* basebackup.cpp
* code for taking a base backup and streaming it to a standby
*
* Portions Copyright (c) 2020 Huawei Technologies Co.,Ltd.
* Portions Copyright (c) 2010-2012, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/gausskernel/storage/replication/basebackup.cpp
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <sys/types.h>
#include <sys/stat.h>
#include "access/xlog_internal.h"
#include "access/cbmparsexlog.h"
#include "access/extreme_rto/standby_read/standby_read_base.h"
#include "catalog/catalog.h"
#include "catalog/pg_type.h"
#include "gs_thread.h"
#include "lib/stringinfo.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "nodes/pg_list.h"
#include "replication/basebackup.h"
#include "replication/dcf_data.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "replication/ss_disaster_cluster.h"
#include "replication/slot.h"
#include "access/xlog.h"
#include "storage/cfs/cfs_converter.h"
#include "storage/cfs/cfs_buffers.h"
#include "storage/smgr/fd.h"
#include "storage/ipc.h"
#include "storage/page_compression.h"
#include "storage/pmsignal.h"
#include "storage/checksum.h"
#include "storage/file/fio_device.h"
#ifdef ENABLE_MOT
#include "storage/mot/mot_fdw.h"
#endif
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
#include "storage/cfs/cfs_tools.h"
#include "postmaster/syslogger.h"
#include "pgxc/pgxc.h"
#include "miscadmin.h"
#include "replication/dcf_replication.h"
typedef struct {
const char *label;
bool progress;
bool fastcheckpoint;
bool nowait;
bool includewal;
bool sendtblspcmapfile;
bool isBuildFromStandby;
bool isCopySecureFiles;
bool isCopyUpgradeFile;
bool isObsmode;
} basebackup_options;
/*
 * Fixed amount added to the running size total for every directory or file
 * entry (see sendTablespace() and the secure-file sender).
 */
#define BUILD_PATH_LEN 2560
/* NOTE(review): not referenced in the visible part of the file - presumably a file-name buffer length */
const int FILE_NAME_MAX_LEN = 1024;
/*
 * Small integer constants 1..7.
 * NOTE(review): their users are not visible here; presumably expected match
 * counts of sscanf-style parsing - confirm against the callers.
 */
const int MATCH_ONE = 1;
const int MATCH_TWO = 2;
const int MATCH_THREE = 3;
const int MATCH_FOUR = 4;
const int MATCH_FIVE = 5;
const int MATCH_SIX = 6;
const int MATCH_SEVEN = 7;
/*
 * Size of each block sent into the tar stream for larger files.
 */
#define TAR_SEND_SIZE (32 * 1024)
/*
 * Raise an ERROR naming the WAL file of segment 'segno'. The file name is
 * built with the current thread's timeline id (t_thrd.xlog_cxt.ThisTimeLineID).
 * Wrapped in do { } while (0) so it can be used like a single statement.
 */
#define EREPORT_WAL_NOT_FOUND(segno) \
do { \
char walErrorName[MAXFNAMELEN]; \
XLogFileName(walErrorName, MAXFNAMELEN, t_thrd.xlog_cxt.ThisTimeLineID, segno); \
ereport(ERROR, (errmsg("could not find WAL file \"%s\"", walErrorName))); \
} while (0)
/*
 * Start position of the xlog copy of the full build that is currently running.
 * perform_base_backup() sets it while holding FullBuildXlogCopyStartPtrLock and
 * resets it to InvalidXLogRecPtr once the backup has finished.
 */
XLogRecPtr XlogCopyStartPtr = InvalidXLogRecPtr;
/*
 * Forward declarations of the helpers defined further down in this file.
 */
/* With MOT enabled, sendDir() takes an extra 'skipmot' flag that defaults to true. */
#ifdef ENABLE_MOT
static int64 sendDir(
const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks, bool skipmot = true);
#else
static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks);
#endif
/* not static: has external linkage */
int64 sendTablespace(const char *path, bool sizeonly);
static void SendTableSpaceForBackup(basebackup_options* opt, List* tablespaces, char* labelfile,
char* tblspc_map_file);
static bool sendFile(char *readfilename, char *tarfilename, struct stat *statbuf, bool missing_ok);
static void sendFileWithContent(const char *filename, const char *content);
static void _tarWriteHeader(const char *filename, const char *linktarget, struct stat *statbuf);
static void send_int8_string(StringInfoData *buf, int64 intval);
static void SendBackupHeader(List *tablespaces);
#ifdef ENABLE_MOT
static void SendMotCheckpointHeader(const char* path);
#endif
static void base_backup_cleanup(int code, Datum arg);
static void SendSecureFileToDisasterCluster(basebackup_options *opt);
static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
static void parse_basebackup_options(List *options, basebackup_options *opt);
static int CompareWalFileNames(const void* a, const void* b);
/* consensusPaxosIdx defaults to 0; it is only put on the wire when DCF is enabled */
static void SendXlogRecPtrResult(XLogRecPtr ptr, unsigned long long consensusPaxosIdx = 0);
static void send_xlog_location();
static void send_xlog_header(const char *linkpath);
static void save_xlogloc(const char *xloglocation);
static XLogRecPtr GetMinArchiveSlotLSN(void);
static XLogRecPtr GetMinLogicalSlotLSN(void);
static XLogRecPtr UpdateStartPtr(XLogRecPtr minLsn, XLogRecPtr curStartPtr);
static bool SendUndoMeta(FILE *fp, struct stat *statbuf);
/* one undo-space meta page plus two undo-space space pages, in bytes */
const int undometaSize = UNDOSPACE_META_PAGE_SIZE + 2 * UNDOSPACE_SPACE_PAGE_SIZE;
/* NOTE(review): presumably the retry limit used by SendUndoMeta() - its user is not visible here */
const int undometaRetryMax = 3;
/*
 * Save the xlog location.
 */
/*
 * Remember the xlog directory when it lives inside the data directory.
 *
 * xloglocation: path of the xlog directory (the caller passes the target of
 *               the pg_xlog symbolic link).
 *
 * When xloglocation is a sub-path of t_thrd.proc_cxt.DataDir, the part that
 * follows "<DataDir>/" is copied into t_thrd.basebackup_cxt.g_xlog_location.
 * In every other case g_xlog_location is left untouched.
 */
static void save_xlogloc(const char *xloglocation)
{
    const char *dataDir = t_thrd.proc_cxt.DataDir;
    size_t dataDirLen = strlen(dataDir);

    if (strncmp(xloglocation, dataDir, dataDirLen) != 0) {
        return;
    }

    /*
     * A bare prefix match is not enough: "/data2/xlog" starts with "/data" but
     * is not below it, and a path equal to DataDir has nothing after the
     * prefix. Require the path separator before skipping over it. Reading
     * index dataDirLen is safe because the first dataDirLen bytes matched.
     */
    if (xloglocation[dataDirLen] != '/') {
        return;
    }

    errno_t rc = strncpy_s(t_thrd.basebackup_cxt.g_xlog_location, MAXPGPATH,
                           xloglocation + dataDirLen + 1, MAXPGPATH - 1);
    securec_check(rc, "", "");
    t_thrd.basebackup_cxt.g_xlog_location[MAXPGPATH - 1] = '\0';
}
/*
 * Send the xlog location header.
 */
static void send_xlog_header(const char *linkpath)
{
StringInfoData buf;
char pg_xlog[] = "data/pg_xlog";
pq_beginmessage(&buf, 'T');
pq_sendint16(&buf, 1);
pq_sendstring(&buf, "xloglink");
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendint32(&buf, TEXTOID);
pq_sendint16(&buf, UINT16_MAX);
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage_noblock(&buf);
pq_beginmessage(&buf, 'D');
pq_sendint16(&buf, 1);
if (linkpath == NULL) {
pq_sendint32(&buf, strlen(pg_xlog));
pq_sendbytes(&buf, pg_xlog, strlen(pg_xlog));
} else {
pq_sendint32(&buf, strlen(linkpath));
pq_sendbytes(&buf, linkpath, strlen(linkpath));
}
pq_endmessage_noblock(&buf);
pq_puttextmessage_noblock('C', "SELECT");
}
/*
 * If the xlog location is a symbolic link, send its target to the standby.
 */
static void send_xlog_location()
{
char fullpath[MAXPGPATH] = {0};
struct stat statbuf;
int rc = 0;
if (ENABLE_DSS) {
rc = snprintf_s(fullpath, sizeof(fullpath), sizeof(fullpath) - 1, SS_XLOGDIR);
} else {
rc = snprintf_s(fullpath, sizeof(fullpath), sizeof(fullpath) - 1, "%s/pg_xlog", t_thrd.proc_cxt.DataDir);
}
securec_check_ss(rc, "", "");
if (lstat(fullpath, &statbuf) != 0) {
ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat control file \"%s\": %m", fullpath)));
}
#ifndef WIN32
if (S_ISLNK(statbuf.st_mode)) {
#else
if (pgwin32_is_junction(fullpath)) {
#endif
#if defined(HAVE_READLINK) || defined(WIN32)
char linkpath[MAXPGPATH] = {0};
int rllen;
rllen = readlink(fullpath, linkpath, sizeof(linkpath));
if (rllen < 0)
ereport(ERROR, (errcode_for_file_access(), errmsg("could not read symbolic link \"%s\": %m", fullpath)));
if (rllen >= (int)sizeof(linkpath))
ereport(ERROR,
(errcode(ERRCODE_NAME_TOO_LONG), errmsg("symbolic link \"%s\" target is too long", fullpath)));
linkpath[MAXPGPATH - 1] = '\0';
save_xlogloc(linkpath);
send_xlog_header(linkpath);
#else
* If the platform does not have symbolic links, it should not be
* possible to have tablespaces - clearly somebody else created
* them. Warn about it and ignore.
*/
ereport(WARNING,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("tablespaces are not supported on this platform")));
continue;
#endif
} else if (S_ISDIR(statbuf.st_mode)) {
statbuf.st_mode = S_IFDIR | S_IRWXU;
send_xlog_header(NULL);
}
}
/*
 * Called when ERROR or FATAL happens in perform_base_backup() after
 * we have started the backup - make sure we end it!
 */
/*
 * Error-cleanup callback registered by perform_base_backup(): aborts the
 * backup that is in progress.
 *
 * code, arg: supplied by the callback mechanism; both are ignored.
 */
static void base_backup_cleanup(int code, Datum arg)
{
    (void)code;
    (void)arg;

    do_pg_abort_backup();
}
#ifndef ENABLE_MULTIPLE_NODES
static void SetPaxosIndex(unsigned long long *consensusPaxosIdx)
{
if (g_instance.attr.attr_storage.dcf_attr.enable_dcf) {
unsigned long long minAppliedIdx = 0;
* Get the DCF min applied index from DCF in case that dcfData.realMinAppliedIdx was stale.
* Save min applied index in dcfData.realMinAppliedIdx in case that switchover happened and
* get min applied index 0 from DCF leader if the leader can't get response from some nodes
* at the beginning. It because these nodes have had exceptions before switchover.
*/
if (dcf_get_cluster_min_applied_idx(1, &minAppliedIdx) == 0) {
minAppliedIdx = (minAppliedIdx > t_thrd.shemem_ptr_cxt.dcfData->realMinAppliedIdx) ?
minAppliedIdx : t_thrd.shemem_ptr_cxt.dcfData->realMinAppliedIdx;
* Let the leader replicates DCF log from min applied index (include min applied index)
*/
*consensusPaxosIdx = minAppliedIdx;
ereport(LOG, (errmsg("Enable DCF and sending paxos index is %llu", *consensusPaxosIdx)));
} else {
ereport(ERROR, (errmsg("Enable DCF and get min applied index from dcf failed!")));
}
}
}
#endif
/*
 * Recursively send every directory and regular file below 'path' into the
 * current tar stream.
 *
 * path: directory to walk. The first BASE_PATH_LEN characters of each full
 *       path are dropped to form the name inside the tar stream, so the path
 *       is expected to start with "./" (the visible caller passes
 *       "./gs_secure_files").
 *
 * Entries that vanish between readdir and lstat are skipped; entries that are
 * neither directories nor regular files are ignored. Raises ERROR when the
 * postmaster died, a shutdown was requested, the server run mode changed, an
 * entry cannot be stat'ed, or a regular file could not be sent.
 */
static void ProcessSecureFilesForDisasterCluster(const char *path)
{
#define BASE_PATH_LEN 2
    struct dirent *de = NULL;
    char pathbuf[MAXPGPATH];
    struct stat statbuf;
    int rc = 0;
    DIR *dir = AllocateDir(path);

    while ((de = ReadDir(dir, path)) != NULL) {
        if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0) {
            continue;
        }

        /* give up early when the backup can no longer complete */
        if (!PostmasterIsAlive()) {
            ereport(ERROR, (errcode_for_file_access(), errmsg("Postmaster exited, aborting active base backup")));
        }
        if (t_thrd.walsender_cxt.walsender_shutdown_requested || t_thrd.walsender_cxt.walsender_ready_to_stop) {
            ereport(ERROR, (errcode_for_file_access(), errmsg("shutdown requested, aborting active base backup")));
        }
        if (t_thrd.postmaster_cxt.HaShmData &&
            (t_thrd.walsender_cxt.server_run_mode != t_thrd.postmaster_cxt.HaShmData->current_mode)) {
            ereport(ERROR, (errcode_for_file_access(), errmsg("server run mode changed, aborting active base backup")));
        }

        rc = snprintf_s(pathbuf, MAXPGPATH, MAXPGPATH - 1, "%s/%s", path, de->d_name);
        securec_check_ss(rc, "", "");

        if (lstat(pathbuf, &statbuf) != 0) {
            if (errno != ENOENT) {
                ereport(ERROR,
                        (errcode_for_file_access(), errmsg("could not stat file or directory \"%s\": %m", pathbuf)));
            }
            /* the entry disappeared in the meantime, which is fine */
            continue;
        }

        if (S_ISDIR(statbuf.st_mode)) {
            /* emit the directory entry itself, then descend into it */
            _tarWriteHeader(pathbuf + BASE_PATH_LEN, NULL, &statbuf);
            ProcessSecureFilesForDisasterCluster(pathbuf);
        } else if (S_ISREG(statbuf.st_mode)) {
            if (!sendFile(pathbuf, pathbuf + BASE_PATH_LEN, &statbuf, true)) {
                ereport(ERROR,
                        (errcode_for_file_access(), errmsg("could not send file \"%s\"", pathbuf)));
            }
        }
    }
    FreeDir(dir);
}
/*
 * Send the secure files (or only the upgrade phase file) to the client.
 *
 * opt: parsed backup options; opt->isCopyUpgradeFile chooses between sending
 *      the single file "./upgrade_phase_info" and the whole
 *      "./gs_secure_files" directory tree.
 *
 * The stream consists of a backup header with one placeholder tablespace
 * entry (no path, size 0), an 'H' message, the file data and a closing 'c'
 * message. Raises ERROR when the upgrade file is missing or cannot be
 * stat'ed, or when a file could not be sent.
 */
static void SendSecureFileToDisasterCluster(basebackup_options *opt)
{
    char pathbuf[MAXPGPATH];
    struct stat statbuf;
    StringInfoData buf;
    tablespaceinfo *ti = (tablespaceinfo *)palloc0(sizeof(tablespaceinfo));
    List *tablespaces = NIL;
    int rc = 0;

    /* a single placeholder entry: palloc0 left the path NULL, the size is 0 */
    ti->size = 0;
    tablespaces = (List *)lappend(tablespaces, ti);
    SendBackupHeader(tablespaces);

    pq_beginmessage(&buf, 'H');
    pq_sendbyte(&buf, 0);
    pq_sendint16(&buf, 0);
    pq_endmessage_noblock(&buf);

    if (opt->isCopyUpgradeFile) {
        rc = snprintf_s(pathbuf, MAXPGPATH, MAXPGPATH - 1, "./upgrade_phase_info");
        securec_check_ss(rc, "", "");
        if (lstat(pathbuf, &statbuf) != 0) {
            if (errno != ENOENT) {
                ereport(ERROR,
                        (errcode_for_file_access(), errmsg("could not stat file or directory \"%s\": %m", pathbuf)));
            } else {
                ereport(ERROR,
                        (errcode_for_file_access(), errmsg("upgrade file \"%s\" does not exist.", pathbuf)));
            }
        }
        /* the name inside the tar stream drops the leading "./" */
        if (!sendFile(pathbuf, pathbuf + BASE_PATH_LEN, &statbuf, true)) {
            ereport(ERROR,
                    (errcode_for_file_access(), errmsg("could not send file \"%s\"", pathbuf)));
        }
    } else {
        ProcessSecureFilesForDisasterCluster("./gs_secure_files");
    }

    pq_putemptymessage_noblock('c');
    pfree(ti);
}
/*
 * Move the xlog copy start position back to 'minLsn' when that is possible.
 *
 * minLsn:      candidate position; InvalidXLogRecPtr means "no candidate".
 * curStartPtr: start position chosen so far.
 *
 * Returns minLsn when it is valid, lies before curStartPtr and
 * XlogFileIsExisted() reports its xlog file under the data directory;
 * otherwise returns curStartPtr unchanged.
 */
static XLogRecPtr UpdateStartPtr(XLogRecPtr minLsn, XLogRecPtr curStartPtr)
{
    /* nothing to gain from an invalid candidate or one that is not earlier */
    if (XLByteEQ(minLsn, InvalidXLogRecPtr) || !(minLsn < curStartPtr)) {
        return curStartPtr;
    }

    if (XlogFileIsExisted(t_thrd.proc_cxt.DataDir, minLsn, DEFAULT_TIMELINE_ID) == true) {
        return minLsn;
    }

    return curStartPtr;
}
/*
 * Actually do a base backup for the specified tablespaces.
 *
 * This is split out mainly to avoid complaints about "variable might be
 * clobbered by longjmp" from stupider versions of gcc.
 */
/*
 * Run one base backup: start it, stream the tablespaces, stop it and, when
 * requested, append the WAL files that are needed to restore it.
 *
 * opt:       parsed backup options.
 * tblspcdir: open tablespace directory, handed on to the start-backup routine.
 *
 * The start position is sent to the client before the data and the end
 * position (plus the paxos index when DCF is enabled) after it. While the
 * backup runs, XlogCopyStartPtr publishes the xlog copy start position.
 */
static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
{
    XLogRecPtr startptr;
    XLogRecPtr endptr;
    XLogRecPtr disasterSlotRestartPtr = InvalidXLogRecPtr;
    char *labelfile = NULL;
    char* tblspc_map_file = NULL;
    List* tablespaces = NIL;
    XLogSegNo startSegNo;

    if (opt->isBuildFromStandby) {
        startptr = StandbyDoStartBackup(opt->label, &labelfile, &tblspc_map_file, &tablespaces,
                                        tblspcdir, opt->progress);
    } else {
        startptr =
            do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile, tblspcdir, &tblspc_map_file, &tablespaces,
                               opt->progress, opt->sendtblspcmapfile);
    }

    if (opt->isObsmode) {
        t_thrd.walsender_cxt.is_obsmode = true;
    }

    ReplicationSlotsComputeRequiredXmin(false);
    ReplicationSlotsComputeRequiredLSN(NULL);

    /*
     * If force recycle has been triggered, archive slot min lsn may be the smallest one, but its xlog is gone.
     * In result, we fail to use minlsn to update startptr. But we need keep some needed xlogs which are smaller
     * than startptr but bigger than archive slot min lsn. So calculate the specific restart lsn one by one.
     */
    XLogRecPtr minlsn = XLogGetReplicationSlotMinimumLSNByOther();
    startptr = UpdateStartPtr(minlsn, startptr);
    disasterSlotRestartPtr = GetMinArchiveSlotLSN();
    startptr = UpdateStartPtr(disasterSlotRestartPtr, startptr);
    XLogRecPtr logicalMinLsn = GetMinLogicalSlotLSN();
    startptr = UpdateStartPtr(logicalMinLsn, startptr);

    /* publish the start position of the xlog copy */
    LWLockAcquire(FullBuildXlogCopyStartPtrLock, LW_EXCLUSIVE);
    XlogCopyStartPtr = startptr;
    LWLockRelease(FullBuildXlogCopyStartPtrLock);
    ereport(INFO,
        (errmsg("The starting position of the xlog copy of the full build is: %X/%X. The slot minimum LSN is: %X/%X."
        " The disaster slot minimum LSN is: %X/%X. The logical slot minimum LSN is: %X/%X.",
        (uint32)(startptr >> 32), (uint32)startptr, (uint32)(minlsn >> 32), (uint32)minlsn,
        (uint32)(disasterSlotRestartPtr >> 32), (uint32)disasterSlotRestartPtr,
        (uint32)(logicalMinLsn >> 32), (uint32)logicalMinLsn)));

#ifdef ENABLE_MULTIPLE_NODES
    cbm_rotate_file(startptr);
#endif

    /* never start in a segment that has already been removed */
    XLByteToSeg(startptr, startSegNo);
    XLogSegNo lastRemovedSegno = XLogGetLastRemovedSegno();
    if (startSegNo <= lastRemovedSegno) {
        startptr = (lastRemovedSegno + 1) * XLogSegSize;
    }
    SendXlogRecPtrResult(startptr);

    PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum)0);
    SendTableSpaceForBackup(opt, tablespaces, labelfile, tblspc_map_file);
    PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum)0);

    if (opt->isBuildFromStandby) {
        endptr = StandbyDoStopBackup(labelfile);
    } else {
        endptr = do_pg_stop_backup(labelfile, !opt->nowait);
    }

    if (opt->includewal) {
        /*
         * We've left the last tar file "open", so we can now append the
         * required WAL files to it.
         */
        char pathbuf[MAXPGPATH];
        XLogSegNo segno;
        XLogSegNo startsegno;
        XLogSegNo endsegno;
        struct stat statbuf;
        List* historyFileList = NIL;
        List* walFileList = NIL;
        char** walFiles;
        int nWalFiles;
        char firstoff[MAXFNAMELEN];
        char lastoff[MAXFNAMELEN];
        struct dirent* de;
        int i;
        ListCell* lc;
        TimeLineID tli;

        /*
         * I'd rather not worry about timelines here, so scan pg_xlog and
         * include all WAL files in the range between 'startptr' and 'endptr',
         * regardless of the timeline the file is stamped with. If there are
         * some spurious WAL files belonging to timelines that don't belong in
         * this server's history, they will be included too. Normally there
         * shouldn't be such files, but if there are, there's little harm in
         * including them.
         */
        XLByteToSeg(startptr, startsegno);
        XLogFileName(firstoff, MAXFNAMELEN, t_thrd.xlog_cxt.ThisTimeLineID, startsegno);
        XLByteToPrevSeg(endptr, endsegno);
        XLogFileName(lastoff, MAXFNAMELEN, t_thrd.xlog_cxt.ThisTimeLineID, endsegno);

        DIR* dir = AllocateDir("pg_xlog");
        if (!dir) {
            ereport(ERROR, (errmsg("could not open directory \"%s\": %m", "pg_xlog")));
        }
        while ((de = ReadDir(dir, "pg_xlog")) != NULL) {
            /* WAL segment names are 24 hex digits; compare without the 8-digit timeline part */
            if (strlen(de->d_name) == 24 && strspn(de->d_name, "0123456789ABCDEF") == 24 &&
                strcmp(de->d_name + 8, firstoff + 8) >= 0 && strcmp(de->d_name + 8, lastoff + 8) <= 0) {
                walFileList = lappend(walFileList, pstrdup(de->d_name));
            } else if (strlen(de->d_name) == 8 + strlen(".history") && strspn(de->d_name, "0123456789ABCDEF") == 8 &&
                       strcmp(de->d_name + 8, ".history") == 0) {
                historyFileList = lappend(historyFileList, pstrdup(de->d_name));
            }
        }
        FreeDir(dir);

        /*
         * Before we go any further, check that none of the WAL segments we
         * need were removed.
         */
        CheckXLogRemoved(startsegno, t_thrd.xlog_cxt.ThisTimeLineID);

        /*
         * Put the WAL filenames into an array, and sort. We send the files in
         * order from oldest to newest, to reduce the chance that a file is
         * recycled before we get a chance to send it over.
         */
        nWalFiles = list_length(walFileList);

        /*
         * There must be at least one xlog file in the pg_xlog directory,
         * since we are doing backup-including-xlog.
         */
        if (nWalFiles < 1) {
            ereport(ERROR, (errmsg("could not find any WAL files")));
        }

        walFiles = (char**)palloc0(nWalFiles * sizeof(char*));
        i = 0;
        foreach (lc, walFileList) {
            walFiles[i++] = (char*)lfirst(lc);
        }
        qsort(walFiles, nWalFiles, sizeof(char*), CompareWalFileNames);

        /*
         * Sanity check: the first and last segment should cover startptr and
         * endptr, with no gaps in between.
         */
        XLogFromFileName(walFiles[0], &tli, &segno);
        if (segno != startsegno) {
            EREPORT_WAL_NOT_FOUND(startsegno);
        }
        for (i = 0; i < nWalFiles; i++) {
            XLogSegNo currsegno = segno;
            XLogSegNo nextsegno = segno + 1;

            XLogFromFileName(walFiles[i], &tli, &segno);
            /* the same segment may appear more than once (different timelines) */
            if (!(nextsegno == segno || currsegno == segno)) {
                EREPORT_WAL_NOT_FOUND(nextsegno);
            }
        }
        if (segno != endsegno) {
            EREPORT_WAL_NOT_FOUND(endsegno);
        }

        /* Ok, we have everything we need. Send the WAL files. */
        for (i = 0; i < nWalFiles; i++) {
            FILE* fp;
            char buf[TAR_SEND_SIZE];
            size_t cnt;
            pgoff_t len = 0;

            int rt = snprintf_s(pathbuf, MAXPGPATH,MAXPGPATH -1, XLOGDIR "/%s", walFiles[i]);
            securec_check_ss_c(rt, "\0", "\0");
            XLogFromFileName(walFiles[i], &tli, &segno);

            fp = AllocateFile(pathbuf, "rb");
            if (fp == NULL) {
                int save_errno = errno;

                /*
                 * Most likely reason for this is that the file was already
                 * removed by a checkpoint, so check for that to get a better
                 * error message.
                 */
                CheckXLogRemoved(segno, tli);

                errno = save_errno;
                ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", pathbuf)));
            }

            if (fstat(fileno(fp), &statbuf) != 0) {
                ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", pathbuf)));
            }
            if (statbuf.st_size != (off_t)XLogSegSize) {
                CheckXLogRemoved(segno, tli);
                ereport(ERROR, (errcode_for_file_access(), errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
            }

            _tarWriteHeader(pathbuf, NULL, &statbuf);

            while ((cnt = fread(buf, 1, Min((uint32)sizeof(buf), XLogSegSize - len), fp)) > 0) {
                CheckXLogRemoved(segno, tli);
                if (pq_putmessage('d', buf, cnt)) {
                    ereport(ERROR, (errmsg("base backup could not send data, aborting backup")));
                }

                len += cnt;
                if (len == (off_t)XLogSegSize)
                    break;
            }

            if (len != (off_t)XLogSegSize) {
                CheckXLogRemoved(segno, tli);
                ereport(ERROR, (errcode_for_file_access(), errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
            }

            FreeFile(fp);

            /*
             * Mark file as archived, otherwise files can get archived again
             * after promotion of a new node. This is in line with
             * walreceiver.c always doing a XLogArchiveForceDone() after a
             * complete segment.
             */
            StatusFilePath(pathbuf, MAXPGPATH, walFiles[i], ".done");
            sendFileWithContent(pathbuf, "");
        }

        /*
         * Send timeline history files too. Only the latest timeline history
         * file is required for recovery, and even that only if there happens
         * to be a timeline switch in the first WAL segment that contains the
         * checkpoint record, or if we're taking a base backup from a standby
         * server and the target timeline changes while the backup is taken.
         * But they are small and highly useful for debugging purposes, so
         * better include them all, always.
         */
        foreach (lc, historyFileList) {
            char* fname = (char*)lfirst(lc);

            /*
             * The history files were found while scanning the xlog directory,
             * so they have to be addressed relative to it, exactly like the
             * WAL segments above - not relative to the filesystem root.
             */
            int rt = snprintf_s(pathbuf, MAXPGPATH, MAXPGPATH-1, XLOGDIR "/%s", fname);
            securec_check_ss_c(rt, "\0", "\0");

            if (lstat(pathbuf, &statbuf) != 0)
                ereport(ERROR, (errcode_for_file_access(), errmsg("could not stat file \"%s\": %m", pathbuf)));

            sendFile(pathbuf, pathbuf, &statbuf, false);

            /* unconditionally mark file as archived */
            StatusFilePath(pathbuf, MAXPGPATH, fname, ".done");
            sendFileWithContent(pathbuf, "");
        }

        /* Send CopyDone message for the last tar file */
        pq_putemptymessage('c');
    }

    unsigned long long consensusPaxosIdx = 0;
#ifndef ENABLE_MULTIPLE_NODES
    SetPaxosIndex(&consensusPaxosIdx);
#endif
    SendXlogRecPtrResult(endptr, consensusPaxosIdx);

    /* the xlog copy is over: withdraw the published start position */
    LWLockAcquire(FullBuildXlogCopyStartPtrLock, LW_EXCLUSIVE);
    XlogCopyStartPtr = InvalidXLogRecPtr;
    LWLockRelease(FullBuildXlogCopyStartPtrLock);
}
/*
 * qsort comparison function, to compare log/seg portion of WAL segment
 * filenames, ignoring the timeline portion.
 */
/*
 * Order two WAL file names by everything after their first 8 characters.
 *
 * a, b: pointers to the array elements being compared, i.e. pointers to
 *       'char *' file names (qsort calling convention).
 *
 * Returns the strcmp() result of the two names with the leading 8 characters
 * (the timeline part) skipped.
 */
static int CompareWalFileNames(const void* a, const void* b)
{
    const char *lhs = *(const char *const *)a;
    const char *rhs = *(const char *const *)b;

    return strcmp(lhs + 8, rhs + 8);
}
#ifdef ENABLE_MOT
/*
 * Called when ERROR or FATAL happens in PerformMotCheckpointFetch() after
 * we have started the operation - make sure we end it!
 */
/*
 * Error-cleanup callback registered by PerformMotCheckpointFetch(): releases
 * the MOT checkpoint fetch lock.
 *
 * code, arg: supplied by the callback mechanism; both are ignored.
 */
static void mot_checkpoint_fetch_cleanup(int code, Datum arg)
{
    (void)code;
    (void)arg;

    MOTCheckpointFetchUnlock();
}
/*
 * Sends the current checkpoint to the client.
 */
/*
 * Stream the MOT checkpoint, if one exists, to the client: a header naming the
 * checkpoint directory, the control file (under the name "mot.ctrl") and the
 * contents of the checkpoint directory. Nothing is sent when
 * MOTCheckpointExists() returns false.
 *
 * The MOT checkpoint fetch lock is held for the whole operation; it is
 * released at the end, and by mot_checkpoint_fetch_cleanup() on error.
 */
void PerformMotCheckpointFetch()
{
char fullChkptDir[MAXPGPATH] = {0};
char ctrlFilePath[MAXPGPATH] = {0};
size_t basePathLen = 0;
MOTCheckpointFetchLock();
/* from here on the lock must be released on every exit path, including errors */
PG_ENSURE_ERROR_CLEANUP(mot_checkpoint_fetch_cleanup, (Datum)0);
{
/* NOTE(review): presumably fills ctrlFilePath, fullChkptDir and basePathLen on success - confirm in the MOT sources */
if (MOTCheckpointExists(ctrlFilePath, MAXPGPATH, fullChkptDir, MAXPGPATH, basePathLen)) {
SendMotCheckpointHeader(fullChkptDir);
StringInfoData buf;
/* 'H' message carrying one zero byte and a zero int16, sent before the file data */
pq_beginmessage(&buf, 'H');
pq_sendbyte(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage_noblock(&buf);
struct stat statbuf;
if (lstat(ctrlFilePath, &statbuf) != 0) {
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not stat mot ctrl file \"%s\": %m", ctrlFilePath)));
}
/* the control file goes first, followed by the checkpoint directory */
sendFile(ctrlFilePath, "mot.ctrl", &statbuf, false);
sendDir(fullChkptDir, (int)basePathLen, false, NIL, false);
/* 'c' message closes the stream */
pq_putemptymessage_noblock('c');
}
}
PG_END_ENSURE_ERROR_CLEANUP(mot_checkpoint_fetch_cleanup, (Datum)0);
MOTCheckpointFetchUnlock();
}
#endif
/*
 * Parse the base backup options passed down by the parser.
 */
static void parse_basebackup_options(List *options, basebackup_options *opt)
{
ListCell *lopt = NULL;
bool o_label = false;
bool o_progress = false;
bool o_fast = false;
bool o_nowait = false;
bool o_wal = false;
bool o_buildstandby = false;
bool o_copysecurefiles = false;
bool o_iscopyupgradefile = false;
bool o_tablespace_map = false;
bool o_isobsmode = false;
errno_t rc = memset_s(opt, sizeof(*opt), 0, sizeof(*opt));
securec_check(rc, "", "");
foreach (lopt, options) {
DefElem *defel = (DefElem *)lfirst(lopt);
if (strcmp(defel->defname, "label") == 0) {
if (o_label)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
opt->label = strVal(defel->arg);
o_label = true;
} else if (strcmp(defel->defname, "progress") == 0) {
if (o_progress)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
opt->progress = true;
o_progress = true;
} else if (strcmp(defel->defname, "fast") == 0) {
if (o_fast)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
opt->fastcheckpoint = true;
o_fast = true;
} else if (strcmp(defel->defname, "nowait") == 0) {
if (o_nowait)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
opt->nowait = true;
o_nowait = true;
} else if (strcmp(defel->defname, "wal") == 0) {
if (o_wal)
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
opt->includewal = true;
o_wal = true;
} else if (strcmp(defel->defname, "buildstandby") == 0) {
if (o_buildstandby) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
}
opt->isBuildFromStandby = true;
o_buildstandby = true;
} else if (strcmp(defel->defname, "tablespace_map") == 0) {
if (o_tablespace_map) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
}
opt->sendtblspcmapfile = true;
o_tablespace_map = true;
} else if (strcmp(defel->defname, "obsmode") == 0) {
if (o_isobsmode) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
}
opt->isObsmode = true;
o_isobsmode = true;
} else if (strcmp(defel->defname, "copysecurefile") == 0) {
if (o_copysecurefiles) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
}
opt->isCopySecureFiles = true;
o_copysecurefiles = true;
} else if (strcmp(defel->defname, "needupgradefile") == 0) {
if (o_iscopyupgradefile) {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("duplicate option \"%s\"", defel->defname)));
}
opt->isCopyUpgradeFile = true;
o_iscopyupgradefile = true;
} else {
ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("option \"%s\" not recognized", defel->defname)));
}
}
if (opt->label == NULL)
opt->label = "base backup";
}
/*
 * SendBaseBackup() - send a complete base backup.
 *
 * The function will put the system into backup mode like pg_start_backup()
 * does, so that the backup is consistent even though we read directly from
 * the filesystem, bypassing the buffer cache.
 */
/*
 * Entry point of the base backup command.
 *
 * cmd: parsed command; cmd->options is the option list handed to
 *      parse_basebackup_options().
 *
 * All work runs in a dedicated memory context that is deleted before
 * returning. With the "copysecurefile" option only the secure files are sent.
 * Otherwise the tablespace directory is opened (below the DSS data volume
 * group when DSS is enabled), the xlog location is announced and the base
 * backup is performed. Raises ERROR when the tablespace directory cannot be
 * opened.
 */
void SendBaseBackup(BaseBackupCmd *cmd)
{
    DIR *dir = NULL;
    char tblspcPath[MAXPGPATH] = {0};
    MemoryContext backup_context;
    MemoryContext old_context;
    basebackup_options opt;

    parse_basebackup_options(cmd->options, &opt);

    backup_context = AllocSetContextCreate(CurrentMemoryContext, "Streaming base backup context",
                                           ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE,
                                           ALLOCSET_DEFAULT_MAXSIZE);
    old_context = MemoryContextSwitchTo(backup_context);

    WalSndSetState(WALSNDSTATE_BACKUP);

    if (opt.isCopySecureFiles) {
        SendSecureFileToDisasterCluster(&opt);
        MemoryContextSwitchTo(old_context);
        MemoryContextDelete(backup_context);
        return;
    }

    if (u_sess->attr.attr_common.update_process_title) {
        char activitymsg[50];
        /*
         * The label comes from the client and may be arbitrarily long, while
         * the title buffer is small. Bound the label with a precision so an
         * over-long label is shortened in the title instead of making the
         * formatting call fail. sizeof of the literal counts the fixed text
         * plus the terminator.
         */
        int maxLabelLen = (int)(sizeof(activitymsg) - sizeof("sending backup \"\""));
        int rc = snprintf_s(activitymsg, sizeof(activitymsg), sizeof(activitymsg) - 1, "sending backup \"%.*s\"",
                            maxLabelLen, opt.label);
        securec_check_ss(rc, "", "");
        set_ps_display(activitymsg, false);
    }

    /* build the path first so that the error below can name the real directory */
    if (ENABLE_DSS) {
        char *dssdir = g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name;
        int rc = snprintf_s(tblspcPath, MAXPGPATH, MAXPGPATH - 1, "%s/pg_tblspc", dssdir);
        securec_check_ss(rc, "", "");
    } else {
        int rc = snprintf_s(tblspcPath, MAXPGPATH, MAXPGPATH - 1, "pg_tblspc");
        securec_check_ss(rc, "", "");
    }

    dir = AllocateDir(tblspcPath);
    if (dir == NULL) {
        ereport(ERROR, (errcode_for_file_access(), errmsg("could not open directory \"%s\": %m", tblspcPath)));
    }

    send_xlog_location();
    perform_base_backup(&opt, dir);

    FreeDir(dir);

    MemoryContextSwitchTo(old_context);
    MemoryContextDelete(backup_context);
}
/*
 * Append a 64-bit integer to a message as a length-prefixed decimal string.
 *
 * buf:    message under construction.
 * intval: value to append; formatted with INT64_FORMAT.
 */
static void send_int8_string(StringInfoData *buf, int64 intval)
{
    char digits[32];
    int rc = snprintf_s(digits, sizeof(digits), sizeof(digits) - 1, INT64_FORMAT, intval);
    securec_check_ss(rc, "", "");

    size_t ndigits = strlen(digits);
    pq_sendint32(buf, ndigits);
    pq_sendbytes(buf, digits, ndigits);
}
static void SendBackupHeader(List *tablespaces)
{
StringInfoData buf;
ListCell *lc = NULL;
pq_beginmessage(&buf, 'T');
pq_sendint16(&buf, 4);
pq_sendstring(&buf, "spcoid");
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendint32(&buf, OIDOID);
pq_sendint16(&buf, 4);
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendstring(&buf, "spclocation");
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendint32(&buf, TEXTOID);
pq_sendint16(&buf, UINT16_MAX);
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendstring(&buf, "size");
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendint32(&buf, INT8OID);
pq_sendint16(&buf, 8);
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendstring(&buf, "relative");
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendint32(&buf, INT8OID);
pq_sendint16(&buf, 8);
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage_noblock(&buf);
foreach (lc, tablespaces) {
tablespaceinfo *ti = (tablespaceinfo *)lfirst(lc);
pq_beginmessage(&buf, 'D');
pq_sendint16(&buf, 4);
if (ti->path == NULL) {
pq_sendint32(&buf, UINT32_MAX);
pq_sendint32(&buf, UINT32_MAX);
} else {
pq_sendint32(&buf, strlen(ti->oid));
pq_sendbytes(&buf, ti->oid, strlen(ti->oid));
char *path = ti->path;
if (ti->relativePath) {
path = ti->relativePath;
Assert(strlen(path) != strlen(ti->path));
}
pq_sendint32(&buf, strlen(path));
pq_sendbytes(&buf, path, strlen(path));
}
if (ti->size >= 0)
send_int8_string(&buf, ti->size / 1024);
else
pq_sendint32(&buf, UINT32_MAX);
if (ti->relativePath)
send_int8_string(&buf, 1);
else
pq_sendint32(&buf, UINT32_MAX);
pq_endmessage_noblock(&buf);
}
pq_puttextmessage_noblock('C', "SELECT");
}
#ifdef ENABLE_MOT
static void SendMotCheckpointHeader(const char* path)
{
StringInfoData buf;
pq_beginmessage(&buf, 'T');
pq_sendint16(&buf, 1);
pq_sendstring(&buf, "chkptloc");
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_sendint32(&buf, TEXTOID);
pq_sendint16(&buf, UINT16_MAX);
pq_sendint32(&buf, 0);
pq_sendint16(&buf, 0);
pq_endmessage_noblock(&buf);
if (path != NULL) {
pq_beginmessage(&buf, 'D');
pq_sendint16(&buf, 1);
pq_sendint32(&buf, strlen(path));
pq_sendbytes(&buf, path, strlen(path));
pq_endmessage_noblock(&buf);
}
pq_puttextmessage_noblock('C', "SELECT");
}
#endif
/*
 * Send a single result set containing just a single
 * XLogRecPtr record (in text format).
 */
/*
 * Send a one-row result set carrying an xlog position.
 *
 * ptr:               position to send, formatted as "%X/%X" (high and low
 *                    32 bits).
 * consensusPaxosIdx: sent as a second column, "consensusPaxosIndex", but only
 *                    when DCF is enabled; ignored otherwise.
 */
static void SendXlogRecPtrResult(XLogRecPtr ptr, unsigned long long consensusPaxosIdx)
{
    StringInfoData buf;
    char lsnText[MAXFNAMELEN];

    int nRet = snprintf_s(lsnText, MAXFNAMELEN, MAXFNAMELEN - 1, "%X/%X", (uint32)(ptr >> 32), (uint32)ptr);
    securec_check_ss(nRet, "", "");

    /* the DCF variant has one extra column */
    const bool withPaxosIdx = g_instance.attr.attr_storage.dcf_attr.enable_dcf;
    const int columnCount = withPaxosIdx ? 2 : 1;

    /* row description */
    pq_beginmessage(&buf, 'T');
    pq_sendint16(&buf, columnCount);

    pq_sendstring(&buf, "recptr");
    pq_sendint32(&buf, 0);
    pq_sendint16(&buf, 0);
    pq_sendint32(&buf, TEXTOID);
    pq_sendint16(&buf, UINT16_MAX);
    pq_sendint32(&buf, 0);
    pq_sendint16(&buf, 0);

    if (withPaxosIdx) {
        pq_sendstring(&buf, "consensusPaxosIndex");
        pq_sendint32(&buf, 0);
        pq_sendint16(&buf, 0);
        pq_sendint32(&buf, INT8OID);
        pq_sendint16(&buf, 8);
        pq_sendint32(&buf, 0);
        pq_sendint16(&buf, 0);
    }
    pq_endmessage_noblock(&buf);

    /* data row */
    pq_beginmessage(&buf, 'D');
    pq_sendint16(&buf, columnCount);
    pq_sendint32(&buf, strlen(lsnText));
    pq_sendbytes(&buf, lsnText, strlen(lsnText));
    if (withPaxosIdx) {
        send_int8_string(&buf, consensusPaxosIdx);
    }
    pq_endmessage_noblock(&buf);

    pq_puttextmessage_noblock('C', "SELECT");
}
/*
 * Inject a file with the given name and content into the output tar stream.
 */
/*
 * Write a tar member whose body is the given string rather than a file on
 * disk.
 *
 * filename: member name inside the tar stream.
 * content:  NUL-terminated member body; its strlen() is the member size.
 *
 * The body is followed by zero bytes up to the next multiple of 512.
 */
static void sendFileWithContent(const char *filename, const char *content)
{
    struct stat fakeStat;
    int contentLen = strlen(content);
    int padLen;

    /*
     * Construct a stat struct for the file we're injecting in the tar; only
     * owner, modification time, mode and size are filled in.
     */
#ifdef WIN32
    fakeStat.st_uid = 0;
    fakeStat.st_gid = 0;
#else
    fakeStat.st_uid = geteuid();
    fakeStat.st_gid = getegid();
#endif
    fakeStat.st_mtime = time(NULL);
    fakeStat.st_mode = S_IRUSR | S_IWUSR;
    fakeStat.st_size = contentLen;

    _tarWriteHeader(filename, NULL, &fakeStat);
    (void)pq_putmessage_noblock('d', content, contentLen);

    /* pad up to the next 512-byte boundary */
    padLen = ((contentLen + 511) & ~511) - contentLen;
    if (padLen <= 0) {
        return;
    }

    char zeros[512];
    errno_t rc = memset_s(zeros, sizeof(zeros), 0, padLen);
    securec_check(rc, "", "");
    (void)pq_putmessage_noblock('d', zeros, padLen);
}
/*
 * Include the tablespace directory pointed to by 'path' in the output tar
 * stream. If 'sizeonly' is true, we just calculate a total length and return
 * it, without actually sending anything.
 *
 * Only used to send auxiliary tablespaces, not GAUSSDATA.
 */
/*
 * Send (or only measure) the version directory of one tablespace.
 *
 * path:     tablespace location.
 * sizeonly: when true nothing is sent, the size is only added up.
 *
 * The version directory is TABLESPACE_VERSION_DIRECTORY when DSS is enabled
 * and "<TABLESPACE_VERSION_DIRECTORY>_<PGXCNodeName>" otherwise.
 *
 * Returns 0 when that directory does not exist, otherwise BUILD_PATH_LEN plus
 * whatever sendDir() returns for it. Raises ERROR when the directory cannot be
 * stat'ed for a reason other than ENOENT.
 */
int64 sendTablespace(const char *path, bool sizeonly)
{
    char versionDir[MAXPGPATH] = {0};
    char versionPath[MAXPGPATH] = {0};
    struct stat st;
    int rc;

    /*
     * 'path' points to the tablespace location, but we only want to include
     * the version directory in it that belongs to us.
     */
    rc = ENABLE_DSS ? snprintf_s(versionDir, sizeof(versionDir), sizeof(versionDir) - 1, "%s",
                                 TABLESPACE_VERSION_DIRECTORY)
                    : snprintf_s(versionDir, sizeof(versionDir), sizeof(versionDir) - 1, "%s_%s",
                                 TABLESPACE_VERSION_DIRECTORY, g_instance.attr.attr_common.PGXCNodeName);
    securec_check_ss(rc, "", "");

    rc = snprintf_s(versionPath, sizeof(versionPath), sizeof(versionPath) - 1, "%s/%s", path, versionDir);
    securec_check_ss(rc, "", "");

    /*
     * Store a directory entry in the tar file so we get the permissions
     * right.
     */
    if (lstat(versionPath, &st) != 0) {
        if (errno != ENOENT)
            ereport(ERROR,
                    (errcode_for_file_access(), errmsg("could not stat file or directory \"%s\": %m", versionPath)));

        /* a missing version directory contributes nothing */
        return 0;
    }

    if (!sizeonly) {
        /* with DSS the full path is used as the member name, otherwise only the directory name */
        _tarWriteHeader(ENABLE_DSS ? versionPath : versionDir, NULL, &st);
    }

    /* account for the directory entry itself, then for everything below it */
    int64 total = BUILD_PATH_LEN;
    total += sendDir(versionPath, strlen(path), sizeonly, NIL, true);

    return total;
}
/*
 * Report whether str1 begins with str2.
 *
 * Returns 1 when str2 is a prefix of str1, 0 when it is not, and -1 when the
 * question cannot be asked: either pointer is NULL, either string is empty,
 * or str1 is shorter than str2.
 */
int IsBeginWith(const char *str1, char *str2)
{
    if (str1 == NULL || str2 == NULL) {
        return -1;
    }

    int wholeLen = strlen(str1);
    int prefixLen = strlen(str2);
    if (wholeLen == 0 || prefixLen == 0 || wholeLen < prefixLen) {
        return -1;
    }

    return (strncmp(str1, str2, prefixLen) == 0) ? 1 : 0;
}
bool IsSkipDir(const char * dirName)
{
if (strcmp(dirName, ".") == 0 || strcmp(dirName, "..") == 0)
return true;
if (strncmp(dirName, t_thrd.basebackup_cxt.g_xlog_location, strlen(dirName)) == 0)
return true;
if (strcmp(dirName, u_sess->attr.attr_common.Log_directory) == 0)
return true;
if (strncmp(dirName, PG_TEMP_FILE_PREFIX, strlen(PG_TEMP_FILE_PREFIX)) == 0)
return true;
if (strncmp(dirName, EXRTO_FILE_DIR, strlen(EXRTO_FILE_DIR)) == 0) {
return true;
}
* If there's a backup_label file, it belongs to a backup started by
* the user with pg_start_backup(). It is *not* correct for this
* backup, our backup_label is injected into the tar separately.
*/
if (strcmp(dirName, BACKUP_LABEL_FILE) == 0)
return true;
if (strcmp(dirName, DISABLE_CONN_FILE) == 0)
return true;
if (ENABLE_DSS) {
if (strcmp(dirName, "pg_doublewrite") == 0) {
return true;
}
if (strcmp(dirName, ".recycle") == 0) {
return true;
}
if (strcmp(dirName, "shared_postgresql.conf") == 0) {
return true;
}
if (strcmp(dirName, "shared_pg_hba.conf") == 0) {
return true;
}
}
return false;
}
/*
 * Decide from a path (as built by sendDir, e.g. "./global/pg_control")
 * whether the entry must be left out of the base backup. Returns true when
 * the entry should be skipped.
 */
bool IsSkipPath(const char * pathName)
{
    /*
     * Everything whose path starts with this prefix is skipped; that covers
     * "./global/pg_dw", "./global/pg_dw_single" and "./global/pg_dw.build",
     * so they need no separate comparison.
     */
    static const char *const dwPrefix = "./global/pg_dw";

    if (strcmp(pathName, "./global/pg_control") == 0)
        return true;

    if (strncmp(pathName, dwPrefix, strlen(dwPrefix)) == 0)
        return true;

    if (strcmp(pathName, "./global/config_exec_params") == 0)
        return true;

    if (strcmp(pathName, "./gaussdb.state") == 0 || strcmp(pathName, "./gs_build.pid") == 0)
        return true;

    if (strcmp(pathName, "./disc_readonly_test") == 0)
        return true;

    /* These two match anywhere in the path, not only at its start. */
    if (strstr(pathName, "./pg_rewind_bak") != NULL || strstr(pathName, "./pg_rewind_filemap") != NULL)
        return true;

    /*
     * 1 major version upgrade mode, following path contains
     *   old database backup, need to skip
     * 2 pg_location is for relative tablespace, we need skip it
     */
    if (strcmp(pathName, "./full_upgrade_bak") == 0 || strcmp(pathName, "./pg_location") == 0)
        return true;

    if (strcmp(pathName, "./delay_xlog_recycle") == 0 || strcmp(pathName, "./delay_ddl_recycle") == 0)
        return true;

    /* Replication slots are left out when the walsender runs in OBS mode. */
    if (t_thrd.walsender_cxt.is_obsmode == true && strcmp(pathName, "./pg_replslot") == 0)
        return true;

    /* With DSS the control file lives under the data volume group. */
    if (ENABLE_DSS) {
        char full_path[MAXPGPATH];
        int rc = snprintf_s(full_path, sizeof(full_path), sizeof(full_path) - 1, "%s/pg_control",
            g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name);
        securec_check_ss(rc, "\0", "\0");

        if (strcmp(pathName, full_path) == 0) {
            return true;
        }
    }

    return false;
}
static bool IsDCFPath(const char *pathname)
{
char fullpath[MAXPGPATH] = {0};
errno_t rc = EOK;
if ((strcmp(pathname, "./paxosindex") == 0) || (strcmp(pathname, "./paxosindex.backup") == 0))
return true;
if (strlen(pathname) <= 2) {
return false;
}
rc = snprintf_s(fullpath, MAXPGPATH, MAXPGPATH - 1, "%s/%s/", t_thrd.proc_cxt.DataDir, pathname + 2);
securec_check_ss(rc, "", "");
int fullPathLen = strlen(fullpath);
char comparedPath[MAXPGPATH] = {0};
rc = snprintf_s(comparedPath, MAXPGPATH, MAXPGPATH - 1, "%s/",
g_instance.attr.attr_storage.dcf_attr.dcf_log_path);
securec_check_ss(rc, "", "");
if (strncmp(fullpath, comparedPath, fullPathLen) == 0) {
return true;
}
rc = snprintf_s(comparedPath, MAXPGPATH, MAXPGPATH - 1, "%s/",
g_instance.attr.attr_storage.dcf_attr.dcf_data_path);
securec_check_ss(rc, "", "");
if (strncmp(fullpath, comparedPath, fullPathLen) == 0) {
return true;
}
return false;
}
/*
 * Add one file to a running size estimate: the file's st_size rounded up to
 * a multiple of 512 bytes, plus BUILD_PATH_LEN. 'size' is evaluated twice,
 * so pass a plain lvalue without side effects.
 */
#define SEND_DIR_ADD_SIZE(size, statbuf) ((size) = (size) + (((statbuf).st_size + 511) & ~511) + BUILD_PATH_LEN)
remote backup client. */
class TpcFileStreamSender {
public:
@param[in] fileName the file name.
@param[in] basepathLen the basepath length of the file.
@param[in/out] stat the file's stat information. */
TpcFileStreamSender(const char *fileName, int basepathLen, struct stat& stat)
: m_fileName(fileName),
m_basepathLen(basepathLen),
m_statBuf(stat) {}
@return true if we succeed to open the file. otherwise return false. */
bool OpenFileForRead();
void SendHeader()
{
const char *tarFileName = m_fileName + m_basepathLen + 1;
_tarWriteHeader(tarFileName, NULL, static_cast<struct stat *>(&m_statBuf));
}
void SendStreams();
~TpcFileStreamSender()
{
if (m_streamExtent != nullptr) {
pfree(m_streamExtent);
m_streamExtent = nullptr;
}
if (m_rawExtent != nullptr) {
pfree(m_rawExtent);
m_rawExtent = nullptr;
}
if (m_file != nullptr) {
FreeFile(m_file);
}
}
private:
@param[in] ptr the buffer pointer
@param[in] ptrLen the buffer length. */
void SendStream(char *ptr, size_t ptrLen)
{
size_t sentLen = 0;
while (sentLen < ptrLen) {
auto leftLen = ptrLen - sentLen;
auto toSend = leftLen > TAR_SEND_SIZE ? TAR_SEND_SIZE : leftLen;
if (pq_putmessage_noblock('d', ptr + sentLen, toSend)) {
ereport(ERROR, (errcode_for_file_access(),
errmsg("failed to send compressed stream for backup")));
}
sentLen += toSend;
}
}
@param[in] blockNoValidate the start block no needs to validated
@return true if we succeed to read the extent. otherwise return false */
bool ReadCurrentExtent(uint16 blockNoValidate);
@return true if we succeed to Reorganize the new compressed extent. */
bool StreamCurrentExtent();
compression extent.
@param[in] no the relative n-th block in compression extent
@return The global logical block number */
BlockNumber LogicalBlockFromExtentBlock(BlockNumber no) const
{
return (CFS_LOGIC_BLOCKS_PER_FILE * m_segNo +
m_currentExtent * CFS_LOGIC_BLOCKS_PER_EXTENT +
no);
}
@return The raw PCA object. */
CfsExtentHeader *GetRawPca() const
{
Assert(m_rawExtent != nullptr);
auto pcaOffset = gExtentSizeInBytes - BLCKSZ;
return reinterpret_cast<CfsExtentHeader *>(m_rawExtent + pcaOffset);
}
@return The streamed PCA object. */
CfsExtentHeader *GetStreamPca() const
{
Assert(m_streamExtent != nullptr);
auto pcaOffset = gExtentSizeInBytes - BLCKSZ;
return reinterpret_cast<CfsExtentHeader *>(m_streamExtent + pcaOffset);
}
@param[in] ptr the buffer start pointer for the compact compressed pages
added.
@param[in] nthBlock the relative block number in the compressed pages extent.
@param[in/out] chunkPos curret allocated chunk_pos in new stream extent content.
@return the compressed page size in stream compress extent. return -1 in any failure
cases. */
int AddCompactCompressedPage(char *ptr, uint16 nthBlock, uint32 &chunkPos);
@param[in] blockNoValidate the relative block on in the extent
@return true if all blocks chunk adresses are validate from blockNoValidate. otherwise
return false. */
bool ValidateRawPCABlocksAddrFrom(uint16 blockNoValidate);
const char *m_fileName{nullptr};
int m_basepathLen{-1};
struct stat& m_statBuf;
FILE *m_file{nullptr};
int m_segNo{0};
size_t m_nExtents{0};
char *m_rawExtent{nullptr};
char *m_streamExtent{nullptr};
size_t m_currentExtent{0};
size_t m_retriedRead{0};
static constexpr size_t gExtentSizeInBytes = CFS_EXTENT_SIZE * BLCKSZ;
static const size_t gSNMaxFileReadTimes = 60;
static const size_t gSRetrySleepIntervalUs = 1000000;
};
* send file or compressed file
* @param sizeOnly send or not
* @param pathbuf path
* @param basepathlen subfix of path
* @param statbuf path stat
*/
static int64 SendRealFile(bool sizeOnly, char* pathbuf, int basepathlen, struct stat* statbuf)
{
int64 size = 0;
if (!sizeOnly && g_instance.attr.attr_storage.enableIncrementalCheckpoint &&
IsCompressedFile(pathbuf, strlen(pathbuf))) {
TpcFileStreamSender sender(pathbuf, basepathlen, *statbuf);
if (!sender.OpenFileForRead()) {
return size;
}
sender.SendHeader();
sender.SendStreams();
SEND_DIR_ADD_SIZE(size, *statbuf);
} else {
bool sent = false;
if (!sizeOnly) {
if (ENABLE_DSS && is_dss_file(pathbuf)) {
sent = sendFile(pathbuf, pathbuf, statbuf, true);
} else {
sent = sendFile(pathbuf, pathbuf + basepathlen + 1, statbuf, true);
}
}
if (sent || sizeOnly) {
SEND_DIR_ADD_SIZE(size, (*statbuf));
}
}
return size;
}
* Include all files from the given directory in the output tar stream. If
* 'sizeonly' is true, we just calculate a total length and return it, without
* actually sending anything.
*
* Omit any directory in the tablespaces list, to avoid backing up
* tablespaces twice when they were created inside PGDATA.
*/
#ifdef ENABLE_MOT
static int64 sendDir(
const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks, bool skipmot)
#else
static int64 sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces, bool sendtblspclinks)
#endif
{
struct dirent *de = NULL;
char pathbuf[MAXPGPATH];
struct stat statbuf;
int64 size = 0;
int rc = 0;
char *dssdir = g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name;
DIR *dir = AllocateDir(path);
while ((de = ReadDir(dir, path)) != NULL) {
if (IsSkipDir(de->d_name)) {
continue;
}
* Check if the postmaster has signaled us to exit, and abort with an
* error in that case. The error handler further up will call
* do_pg_abort_backup() for us.
*/
if (!PostmasterIsAlive()) {
ereport(ERROR, (errcode_for_file_access(), errmsg("Postmaster exited, aborting active base backup")));
}
if (t_thrd.walsender_cxt.walsender_shutdown_requested || t_thrd.walsender_cxt.walsender_ready_to_stop)
ereport(ERROR, (errcode_for_file_access(), errmsg("shutdown requested, aborting active base backup")));
if (t_thrd.postmaster_cxt.HaShmData &&
(t_thrd.walsender_cxt.server_run_mode != t_thrd.postmaster_cxt.HaShmData->current_mode))
ereport(ERROR, (errcode_for_file_access(), errmsg("server run mode changed, aborting active base backup")));
rc = snprintf_s(pathbuf, MAXPGPATH, MAXPGPATH - 1, "%s/%s", path, de->d_name);
securec_check_ss(rc, "", "");
if (g_instance.attr.attr_storage.dcf_attr.enable_dcf && IsDCFPath(pathbuf)) {
continue;
}
if (strcmp(pathbuf, "./postmaster.pid") == 0 || strcmp(pathbuf, "./postmaster.opts") == 0 ||
strcmp(pathbuf, "./gs_gazelle.conf") == 0)
continue;
if (strcmp(u_sess->attr.attr_common.application_name, "gs_basebackup") != 0) {
if (strcmp(pathbuf, "./pg_ctl.lock") == 0 || strcmp(pathbuf, "./postgresql.conf.lock") == 0 ||
strcmp(pathbuf, "./postgresql.conf.bak") == 0 || strcmp(pathbuf, "./postgresql.conf") == 0 ||
strcmp(de->d_name, "postgresql.conf.guc.bak") == 0 ||
strcmp(pathbuf, "./postgresql.conf.bak.old") == 0) {
continue;
}
if (strcmp(de->d_name, g_instance.attr.attr_security.ssl_cert_file) == 0 ||
strcmp(de->d_name, g_instance.attr.attr_security.ssl_key_file) == 0 ||
strcmp(de->d_name, g_instance.attr.attr_security.ssl_ca_file) == 0 ||
strcmp(de->d_name, g_instance.attr.attr_security.ssl_crl_file) == 0 ||
strcmp(de->d_name, ssl_cipher_file) == 0 || strcmp(de->d_name, ssl_rand_file) == 0
#ifdef USE_TASSL
|| strcmp(de->d_name, g_instance.attr.attr_security.ssl_enc_cert_file) == 0 ||
strcmp(de->d_name, g_instance.attr.attr_security.ssl_enc_key_file) == 0 ||
strcmp(de->d_name, ssl_enc_cipher_file) == 0 || strcmp(de->d_name, ssl_enc_rand_file) == 0
#endif
)
{
continue;
}
if (strcmp(pathbuf, "./client.crt") == 0 || strcmp(pathbuf, "./client.key") == 0) {
continue;
}
}
if (IS_PGXC_COORDINATOR && strcmp(pathbuf, "./pg_hba.conf") == 0)
continue;
if (IS_PGXC_COORDINATOR && strcmp(pathbuf, "./cn_drop_backup") == 0)
continue;
if (IsSkipPath(pathbuf)) {
continue;
}
#ifdef ENABLE_MOT
if (skipmot && (strcmp(pathbuf, "./mot.ctrl") == 0 || strncmp(pathbuf, "./chkpt_", strlen("./chkpt_")) == 0)) {
continue;
}
#endif
if (lstat(pathbuf, &statbuf) != 0) {
if (errno != ENOENT)
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not stat file or directory \"%s\": %m", pathbuf)));
continue;
}
* Skip pg_errorinfo, not useful to copy. But include
* it as an empty directory anyway, so we get permissions right.
*/
if (strcmp(de->d_name, "pg_errorinfo") == 0) {
if (!sizeonly)
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
size += BUILD_PATH_LEN;
continue;
}
* Skip physical slot file, not useful to copy, only include logical slot file.
*/
if (strcmp(path, "./pg_replslot") == 0) {
bool isphysicalslot = false;
bool isArchiveSlot = false;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (int i = 0; i < g_instance.attr.attr_storage.max_replication_slots; i++) {
ReplicationSlot *s = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[i];
if (s->in_use && s->data.database == InvalidOid && strcmp(de->d_name, NameStr(s->data.name)) == 0 &&
GET_SLOT_PERSISTENCY(s->data) != RS_BACKUP && GET_SLOT_EXTRA_DATA_LENGTH(s->data) == 0) {
isphysicalslot = true;
break;
}
if (s->in_use && s->data.database == InvalidOid && strcmp(de->d_name, NameStr(s->data.name)) == 0 &&
GET_SLOT_PERSISTENCY(s->data) != RS_BACKUP && s->extra_content != NULL) {
isArchiveSlot = true;
break;
}
}
LWLockRelease(ReplicationSlotControlLock);
if (isphysicalslot || (isArchiveSlot && AM_WAL_HADR_DNCN_SENDER))
continue;
}
* We can skip pg_xlog, the WAL segments need to be fetched from the
* WAL archive anyway. But include it as an empty directory anyway, so
* we get permissions right.
*/
if (strcmp(pathbuf, "./pg_xlog") == 0 ||
(ENABLE_DSS && strncmp(pathbuf, dssdir, strlen(dssdir)) == 0 &&
strstr(pathbuf + strlen(dssdir), "/pg_xlog") != NULL) ||
(ENABLE_DSS && strcmp(pathbuf, dssdir) == 0 &&
strstr(pathbuf + strlen(dssdir), "/pg_replication") != NULL)) {
if (!sizeonly) {
#ifndef WIN32
if (S_ISLNK(statbuf.st_mode)) {
#else
if (pgwin32_is_junction(pathbuf)) {
#endif
#if defined(HAVE_READLINK) || defined(WIN32)
char linkpath[MAXPGPATH] = {0};
int rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
if (rllen < 0)
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not read symbolic link \"%s\": %m", pathbuf)));
if (rllen >= (int)sizeof(linkpath))
ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG),
errmsg("symbolic link \"%s\" target is too long", pathbuf)));
linkpath[MAXPGPATH - 1] = '\0';
if (!sizeonly){
if (ENABLE_DSS && is_dss_file(pathbuf)) {
_tarWriteHeader(pathbuf, linkpath, &statbuf);
} else {
_tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf);
}
}
#else
* If the platform does not have symbolic links, it should not be
* possible to have tablespaces - clearly somebody else created
* them. Warn about it and ignore.
*/
ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("tablespaces are not supported on this platform")));
continue;
#endif
} else if (S_ISDIR(statbuf.st_mode)) {
statbuf.st_mode = S_IFDIR | S_IRWXU;
if (ENABLE_DSS && is_dss_file(pathbuf)) {
_tarWriteHeader(pathbuf, NULL, &statbuf);
} else {
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
}
}
}
size += BUILD_PATH_LEN;
if (!sizeonly) {
#ifndef WIN32
if (S_ISLNK(statbuf.st_mode)) {
#else
if (pgwin32_is_junction(pathbuf)) {
#endif
#if defined(HAVE_READLINK) || defined(WIN32)
char linkpath[MAXPGPATH] = {0};
int rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
if (rllen < 0)
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not read symbolic link \"%s\": %m", pathbuf)));
if (rllen >= (int)sizeof(linkpath))
ereport(ERROR, (errcode(ERRCODE_NAME_TOO_LONG),
errmsg("symbolic link \"%s\" target is too long", pathbuf)));
linkpath[MAXPGPATH - 1] = '\0';
if (ENABLE_DSS && is_dss_file(pathbuf)) {
_tarWriteHeader(pathbuf, linkpath, &statbuf);
} else {
_tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf);
}
#else
* If the platform does not have symbolic links, it should not be
* possible to have tablespaces - clearly somebody else created
* them. Warn about it and ignore.
*/
ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("tablespaces are not supported on this platform")));
continue;
#endif
} else if (S_ISDIR(statbuf.st_mode)) {
* Also send archive_status directory (by hackishly reusing
* statbuf from above ...).
*/
statbuf.st_mode = S_IFDIR | S_IRWXU;
if (ENABLE_DSS && is_dss_file(pathbuf)) {
_tarWriteHeader(pathbuf, NULL, &statbuf);
} else {
_tarWriteHeader("pg_xlog/archive_status", NULL, &statbuf);
}
}
}
size += BUILD_PATH_LEN;
continue;
}
if ((strcmp(path, "./pg_tblspc") == 0 ||
(ENABLE_DSS && strncmp(pathbuf, dssdir, strlen(dssdir)) == 0 &&
strstr(pathbuf + strlen(dssdir), "/pg_tblspc") != NULL)) &&
#ifndef WIN32
S_ISLNK(statbuf.st_mode)
#else
pgwin32_is_junction(pathbuf)
#endif
) {
#if defined(HAVE_READLINK) || defined(WIN32)
char linkpath[MAXPGPATH];
int rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
if (rllen < 0)
ereport(ERROR, (errcode_for_file_access(), errmsg("could not read symbolic link \"%s\": %m", pathbuf)));
if (rllen >= (int)sizeof(linkpath))
ereport(ERROR,
(errcode(ERRCODE_NAME_TOO_LONG), errmsg("symbolic link \"%s\" target is too long", pathbuf)));
linkpath[rllen] = '\0';
if (!sizeonly || ENABLE_DSS){
if (is_dss_file(pathbuf)) {
_tarWriteHeader(pathbuf, linkpath, &statbuf);
} else {
_tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf);
}
}
size += BUILD_PATH_LEN;
#else
* If the platform does not have symbolic links, it should not be
* possible to have tablespaces - clearly somebody else created
* them. Warn about it and ignore.
*/
ereport(WARNING,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("tablespaces are not supported on this platform")));
continue;
#endif
} else if (S_ISDIR(statbuf.st_mode)) {
bool skip_this_dir = false;
ListCell *lc = NULL;
* Store a directory entry in the tar file so we can get the
* permissions right.
*/
if (!sizeonly){
if (ENABLE_DSS && is_dss_file(pathbuf)) {
_tarWriteHeader(pathbuf, NULL, &statbuf);
} else {
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
}
}
size += BUILD_PATH_LEN;
* Call ourselves recursively for a directory, unless it happens
* to be a separate tablespace located within PGDATA.
*/
foreach (lc, tablespaces) {
tablespaceinfo *ti = (tablespaceinfo *)lfirst(lc);
* ti->rpath is the tablespace relative path within PGDATA, or
* NULL if the tablespace has been properly located somewhere
* else.
*
* Skip past the leading "./" in pathbuf when comparing.
*/
if (ti->relativePath && strcmp(ti->relativePath, pathbuf + 2) == 0) {
skip_this_dir = true;
break;
}
}
* skip sending directories inside pg_tblspc, if not required.
*/
if (!sendtblspclinks &&
((ENABLE_DSS && strncmp(pathbuf, dssdir, strlen(dssdir)) == 0 &&
strstr(pathbuf + strlen(dssdir), "/pg_tblspc") != NULL) ||
strcmp(pathbuf, "./pg_tblspc") == 0)) {
skip_this_dir = true;
}
if (!skip_this_dir)
size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
} else if (S_ISREG(statbuf.st_mode)) {
size += SendRealFile(sizeonly, pathbuf, basepathlen, &statbuf);
} else
ereport(WARNING, (errmsg("skipping special file \"%s\"", pathbuf)));
}
FreeDir(dir);
return size;
}
* Functions for handling tar file format
*
* Copied from pg_dump, but modified to work with libpq for sending
*
* Utility routine to print possibly larger than 32 bit integers in a
* portable fashion. Filled with zeros.
*/
static void print_val(char *s, uint64 val, unsigned int base, size_t len)
{
int i;
for (i = len; i > 0; i--) {
int digit = val % base;
s[i - 1] = '0' + digit;
val = val / base;
}
}
/*
 * Map the fork part of a relation file name (the text after the '_', for
 * example "fsm" or "vm.2") to its fork number, and extract the segment
 * number that may follow it.
 *
 * forkName  text to inspect
 * segNo     out: segment number parsed from a ".<n>" suffix, or 0 when there
 *           is no such suffix. Only written when a fork name was recognized.
 *
 * Returns InvalidForkNumber for a NULL or empty forkName, the fork number of
 * the first entry of forkNames[] contained in forkName, or MAX_FORKNUM + 1
 * when none is contained.
 */
static ForkNumber forkname_to_number(char *forkName, int *segNo)
{
    if (forkName == NULL || *forkName == '\0') {
        return InvalidForkNumber;
    }

    for (int iforkNum = 0; iforkNum <= MAX_FORKNUM; iforkNum++) {
        ForkNumber forkNum = (ForkNumber)iforkNum;
        if (strstr(forkName, forkNames[forkNum]) == NULL) {
            continue;
        }

        /*
         * The segment suffix is looked for right after the first
         * strlen(fork name) characters of forkName.
         */
        int nmatch = sscanf_s(forkName + strlen(forkNames[forkNum]), ".%d", segNo);

        /*
         * Anything other than one successful conversion means there is no
         * usable segment suffix. That includes the end-of-input result
         * returned when nothing at all follows the fork name, which is not 0.
         */
        if (nmatch != 1) {
            *segNo = 0;
        }
        return forkNum;
    }

    return MAX_FORKNUM + 1;
}
/*
 * Decide whether 'filename' (a bare file name without directory part) looks
 * like a row-store relation data file, and report its segment number.
 *
 * Accepted are "<digits>" and "<digits>.<digits>" (main fork), and names of
 * the form "<prefix>_<fork>" whose fork is recognized as FSM or visibility
 * map by forkname_to_number.
 *
 * filename  name to inspect; modified in place by strtok_r (the first '_'
 *           after the leading token is overwritten with a NUL)
 * segNo     out: segment number; 0 when the name carries none or is rejected
 *           by the digit check
 *
 * Returns true when the name is accepted.
 */
static bool check_data_filename(char *filename, int *segNo)
{
    /* Reject anything that still carries a directory component. */
    if (basename(filename) != filename) {
        return false;
    }

    char *tmptoken = NULL;
    (void)strtok_r(filename, "_", &tmptoken);

    if (tmptoken[0] != '\0') {
        /* Something follows the first '_': only FSM and VM forks qualify. */
        ForkNumber forkNum = forkname_to_number(tmptoken, segNo);
        return (forkNum == FSM_FORKNUM || forkNum == VISIBILITYMAP_FORKNUM);
    }

    /*
     * No fork suffix. The name must consist of digits with at most one '.',
     * and must end in a digit.
     */
    size_t nameLen = strlen(filename);
    if (nameLen == 0 || !isdigit((unsigned char)filename[nameLen - 1])) {
        *segNo = 0;
        return false;
    }

    uint dot_count = 0;
    for (size_t pos = nameLen; pos > 0; pos--) {
        char ch = filename[pos - 1];
        if (ch == '.') {
            dot_count++;
        }
        if ((!isdigit((unsigned char)ch) && ch != '.') || dot_count > 1) {
            *segNo = 0;
            return false;
        }
    }

    unsigned int relNode;
    int nmatch = sscanf_s(filename, "%u.%d", &relNode, segNo);
    if (nmatch == 1) {
        /* "<relNode>" without a segment suffix. */
        *segNo = 0;
        return true;
    }
    return (nmatch == 2);
}
/*
 * Classify a path that starts with "undo/" and extract its segment number.
 *
 * fname  path beginning at "undo/"
 * segNo  out: segment number, written only when one of the two patterns
 *        matches completely
 *
 * Returns UNDO_RECORD for "undo/permanent/<hex>.<hex>", UNDO_META for
 * "undo/permanent/<hex>.meta.<hex>", and UNDO_INVALID otherwise.
 */
UndoFileType CheckUndoPath(const char* fname, int* segNo)
{
    unsigned int relNode = 0;

    /*
     * The 'z' length modifier makes the conversion store a size_t, so it
     * must be given a size_t to write to; the value is narrowed to the
     * caller's int afterwards.
     */
    size_t parsedSegNo = 0;

    if (sscanf_s(fname, "undo/permanent/%05X.%07zX", &relNode, &parsedSegNo) == MATCH_TWO) {
        *segNo = (int)parsedSegNo;
        return UNDO_RECORD;
    }

    if (sscanf_s(fname, "undo/permanent/%05X.meta.%07zX", &relNode, &parsedSegNo) == MATCH_TWO) {
        *segNo = (int)parsedSegNo;
        return UNDO_META;
    }

    return UNDO_INVALID;
}
bool is_row_data_file(const char *path, int *segNo, UndoFileType *undoFileType)
{
struct stat path_st;
if (stat(path, &path_st) < 0) {
if (errno != ENOENT) {
ereport(ERROR, (errmsg("Cannot stat file %s when judge it's a rowdatafile for roach. OS error: %s", path,
strerror(errno))));
} else {
ereport(INFO, (errmsg("File %s not exists when judge it's a rowdatafile for roach. OS error: %s", path,
strerror(errno))));
}
return false;
} else if (S_ISDIR(path_st.st_mode)) {
return false;
}
char tablePath[MAXPGPATH] = {0};
if (IsCompressedFile(path, strlen(path))) {
auto rc = memcpy_s(tablePath, MAXPGPATH, path, strlen(path) - strlen(COMPRESS_STR));
securec_check_c(rc, "", "");
path = tablePath;
}
char buf[FILE_NAME_MAX_LEN];
unsigned int dbNode;
unsigned int spcNode;
int nmatch;
char *fname = NULL;
size_t pathLen = strlen(path);
if (pathLen >= 4) {
const char* suffix = path + pathLen - 4;
if (strncmp(suffix, "_pca", 4) == 0 || strncmp(suffix, "_pcd", 4) == 0) {
return false;
}
}
if ((fname = strstr((char *)path, "pg_tblspc/")) != NULL) {
nmatch = sscanf_s(fname, "pg_tblspc/%u/%*[^/]/%u/%s", &spcNode, &dbNode, buf, sizeof(buf));
if (nmatch == 3) {
return check_data_filename(buf, segNo);
}
}
if ((fname = strstr((char *)path, "global/")) != NULL) {
nmatch = sscanf_s(fname, "global/%s", buf, sizeof(buf));
if (nmatch == 1) {
return check_data_filename(buf, segNo);
}
}
if ((fname = strstr((char *)path, "base/")) != NULL) {
nmatch = sscanf_s(fname, "base/%u/%s", &dbNode, buf, sizeof(buf));
if (nmatch == 2) {
return check_data_filename(buf, segNo);
}
}
if ((fname = strstr((char *)path, "PG_9.2_201611171")) != NULL) {
nmatch = sscanf_s(fname, "PG_9.2_201611171_%*[^/]/%u/%s", &dbNode, buf, sizeof(buf));
if (nmatch == 2) {
return check_data_filename(buf, segNo);
}
}
if ((fname = strstr((char*)path, "undo/")) != NULL) {
*undoFileType = CheckUndoPath(fname, segNo);
return (*undoFileType != UNDO_INVALID);
}
return false;
}
/*
 * Send the backup header followed by one tar stream per tablespace.
 *
 * An entry for the main data directory (path == NULL) is appended to
 * 'tablespaces' first, so it is handled last in the loop. For that entry the
 * backup label, the optional tablespace map, the data directory itself, the
 * control file and the timeline history files are sent.
 *
 * opt              backup options (progress, includewal, sendtblspcmapfile)
 * tablespaces      list of tablespaceinfo for the auxiliary tablespaces
 * labelfile        content of the backup_label file to inject
 * tblspc_map_file  content of the tablespace map file, or NULL
 */
static void SendTableSpaceForBackup(basebackup_options* opt, List* tablespaces, char* labelfile, char* tblspc_map_file)
{
    ListCell *lc = NULL;
    int64 totalSize = 0;
    char *dssdir = g_instance.attr.attr_storage.dss_attr.ss_dss_data_vg_name;

    /* Size-only pass over the data directory (and the DSS directory). */
    if (ENABLE_DSS) {
        totalSize = sendDir(".", 1, true, tablespaces, true) + sendDir(dssdir, 1, true, tablespaces, true);
    } else {
        totalSize = sendDir(".", 1, true, tablespaces, true);
    }

    /* Add a node for the base directory at the end; its path stays NULL. */
    tablespaceinfo *mainDir = (tablespaceinfo *)palloc0(sizeof(tablespaceinfo));
    mainDir->size = opt->progress ? totalSize : -1;
    tablespaces = (List *)lappend(tablespaces, mainDir);

    /* Send tablespace header */
    SendBackupHeader(tablespaces);

    /* Send off our tablespaces one by one */
    foreach (lc, tablespaces) {
        tablespaceinfo *iterti = (tablespaceinfo *)lfirst(lc);
        const bool isMainDataDir = (iterti->path == NULL);
        StringInfoData buf;

        /* Send CopyOutResponse message */
        pq_beginmessage(&buf, 'H');
        pq_sendbyte(&buf, 0);  /* overall format */
        pq_sendint16(&buf, 0); /* natts */
        pq_endmessage_noblock(&buf);

        if (!isMainDataDir) {
            /*
             * if the tblspc created in datadir , the files under tblspc do not send,
             * and send them as normal under datadir,
             * so we just send these tblspcs only once.
             */
            sendTablespace(iterti->path, false);
        } else {
            /* In the main tar, include the backup_label first. */
            sendFileWithContent(BACKUP_LABEL_FILE, labelfile);

            /* Then the tablespace map, if any, and the data directory. */
            if (tblspc_map_file && opt->sendtblspcmapfile) {
                sendFileWithContent(TABLESPACE_MAP, tblspc_map_file);
                sendDir(".", 1, false, tablespaces, false);
            } else {
                sendDir(".", 1, false, tablespaces, true);
            }

            if (ENABLE_DSS) {
                sendDir(dssdir, 1, false, tablespaces, true);
            }

            /* In the main tar, include pg_control last. */
            struct stat statbuf;
            TimeLineID primay_tli = 0;
            char path[MAXPGPATH] = {0};

            if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0) {
                /* Reset the xlog copy start pointer before giving up. */
                LWLockAcquire(FullBuildXlogCopyStartPtrLock, LW_EXCLUSIVE);
                XlogCopyStartPtr = InvalidXLogRecPtr;
                LWLockRelease(FullBuildXlogCopyStartPtrLock);
                ereport(ERROR, (errcode_for_file_access(),
                                errmsg("could not stat control file \"%s\": %m", XLOG_CONTROL_FILE)));
            }

            sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false);

            /* Send every timeline history file that exists, newest first. */
            for (primay_tli = t_thrd.xlog_cxt.ThisTimeLineID; primay_tli > 1; primay_tli--) {
                TLHistoryFilePath(path, MAXPGPATH, primay_tli);
                if (lstat(path, &statbuf) == 0) {
                    sendFile(path, path, &statbuf, false);
                }
            }
        }

        /*
         * If we're including WAL, and this is the main data directory we
         * don't terminate the tar stream here. Instead, we will append
         * the xlog files below and terminate it then. This is safe since
         * the main data directory is always sent *last*.
         */
        if (opt->includewal && isMainDataDir) {
            Assert(lnext(lc) == NULL);
        } else {
            pq_putemptymessage_noblock('c'); /* CopyDone */
        }
    }
}
* init buf_block if not yet; repalloc PqSendBuffer if necessary
*/
static void SendFilePreInit(void)
{
if (t_thrd.basebackup_cxt.buf_block == NULL) {
MemoryContext oldcxt = MemoryContextSwitchTo(THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));
t_thrd.basebackup_cxt.buf_block = (char *)palloc0(TAR_SEND_SIZE);
MemoryContextSwitchTo(oldcxt);
}
* repalloc to `MaxBuildAllocSize' in one time, to avoid many small step repalloc in `pq_putmessage_noblock'
* and low performance.
*/
if (INT2SIZET(t_thrd.libpq_cxt.PqSendBufferSize) < MaxBuildAllocSize) {
t_thrd.libpq_cxt.PqSendBuffer = (char *)repalloc(t_thrd.libpq_cxt.PqSendBuffer, MaxBuildAllocSize);
t_thrd.libpq_cxt.PqSendBufferSize = MaxBuildAllocSize;
}
}
* check file
* @param readFileName
* @param statbuf
* @param supress error if missingOk is false when file is not found
* @return return null if file.size > MAX_TAR_MEMBER_FILELEN or file cant found
*/
static FILE *SizeCheckAndAllocate(char *readFileName, const struct stat &statbuf, bool missingOk)
{
* Some compilers will throw a warning knowing this test can never be true
* because pgoff_t can't exceed the compared maximum on their platform.
*/
if (statbuf.st_size > MAX_TAR_MEMBER_FILELEN) {
ereport(WARNING, (errcode(ERRCODE_NAME_TOO_LONG),
errmsg("archive member \"%s\" too large for tar format", readFileName)));
return NULL;
}
FILE *fp = AllocateFile(readFileName, "rb");
if (fp == NULL) {
if (errno == ENOENT && missingOk)
return NULL;
ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", readFileName)));
}
return fp;
}
@return true if we succeed to open the file. otherwise return false. */
bool TpcFileStreamSender::OpenFileForRead()
{
if (INT2SIZET(t_thrd.libpq_cxt.PqSendBufferSize) < MaxBuildAllocSize) {
t_thrd.libpq_cxt.PqSendBuffer = (char *)repalloc(t_thrd.libpq_cxt.PqSendBuffer, MaxBuildAllocSize);
t_thrd.libpq_cxt.PqSendBufferSize = MaxBuildAllocSize;
}
m_file = SizeCheckAndAllocate(const_cast<char *>(m_fileName), m_statBuf, true);
if (m_file == NULL) {
return false;
}
struct stat fileStat;
if (fstat(fileno(m_file), &fileStat) != 0) {
if (errno != ENOENT) {
ereport(ERROR, (errcode_for_file_access(),
errmsg("[BACKUP] could not stat file or directory \"%s\": ",
m_fileName)));
return false;
}
}
UndoFileType undoFileType = UNDO_INVALID;
if (!is_row_data_file(m_fileName, &m_segNo, &undoFileType)) {
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("[BACKUP] %s is not a relation file.", m_fileName)));
return false;
}
m_nExtents = fileStat.st_size / gExtentSizeInBytes;
auto currentMemory = MemoryContextSwitchTo(THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));
m_rawExtent = static_cast<char *>(palloc(gExtentSizeInBytes));
m_streamExtent = static_cast<char *>(palloc(gExtentSizeInBytes));
MemoryContextSwitchTo(currentMemory);
return true;
}
@param[in] ptr the buffer pointer
@param[in] ptrLen the buffer length. */
void TpcFileStreamSender::SendStreams()
{
for (m_currentExtent = 0; m_currentExtent < m_nExtents; m_currentExtent++) {
if (!StreamCurrentExtent()) {
return;
}
#ifdef USE_ASSERT_CHECKING
auto new_pca = GetStreamPca();
auto ptr = m_streamExtent;
for (uint16 i = 0; i < new_pca->nblocks; i++) {
auto blockAddr = GetExtentAddress(new_pca, i);
auto compressedPageSize = blockAddr->nchunks * new_pca->chunk_size;
Assert(blockAddr->nchunks == blockAddr->allocated_chunks);
if (compressedPageSize != BLCKSZ) {
Assert(CompressedChecksum(ptr));
} else {
auto actualChecksum = pg_checksum_page(ptr, LogicalBlockFromExtentBlock(i));
Assert(actualChecksum == (PageHeader(ptr))->pd_checksum);
}
ptr += compressedPageSize;
}
#endif
SendStream(m_streamExtent, gExtentSizeInBytes);
}
size_t totalSent = m_nExtents * gExtentSizeInBytes;
size_t padSize = ((totalSent + 511) & (~511)) - totalSent;
if (padSize > 0) {
Assert(padSize < 512);
securec_check(memset_s(m_streamExtent, padSize, 0, padSize), "\0", "\0");
SendStream(m_streamExtent, padSize);
}
}
@param[in] blockNoValidate the relative block on in the extent
@return true if all blocks chunk adresses are validated as passed from blockNoValidate.
Otherwise return false. */
bool TpcFileStreamSender::ValidateRawPCABlocksAddrFrom(uint16 blockNoValidate) {
auto pca = GetRawPca();
auto nBlocks = pca->nblocks;
if (nBlocks > CFS_LOGIC_BLOCKS_PER_EXTENT || blockNoValidate > nBlocks ||
pca->chunk_size > CHUNK_SIZE_LIST[0]) {
ereport(LOG, (errmsg("[BACKUP] found invalid PCA format during check block chunks address "
"from block %u in file %s, current extent: %lu, total blocks in extent: "
"%u, PCA chunk size: %u", blockNoValidate, m_fileName,
m_currentExtent, nBlocks, pca->chunk_size)));
return false;
}
for (uint16 blkNo = blockNoValidate; blkNo < nBlocks; ++blkNo) {
auto block_addr = GetExtentAddress(pca, blkNo);
if (block_addr->checksum != AddrChecksum32(block_addr, block_addr->allocated_chunks) ||
block_addr->allocated_chunks > (BLCKSZ / pca->chunk_size)) {
ereport(LOG, (errmsg("[BACKUP] failed to validate block %u chunk address in file %s, "
"current extent: %lu, allocated chunks: %u", blkNo,
m_fileName, m_currentExtent, block_addr->allocated_chunks)));
return false;
}
}
return true;
}
@param[in] blockNoValidate the start block no needs to validated
@return true if we succeed to read the extent. otherwise return false */
bool TpcFileStreamSender::ReadCurrentExtent(uint16 blockNoValidate)
{
Assert(blockNoValidate < CFS_LOGIC_BLOCKS_PER_EXTENT);
for (; m_retriedRead < gSNMaxFileReadTimes; m_retriedRead++) {
auto rc = memset_s(m_rawExtent, gExtentSizeInBytes, 0xFF, gExtentSizeInBytes);
securec_check(rc, "\0", "\0");
if (rc != EOK) {
break;
}
if (m_retriedRead != 0) {
pg_usleep(gSRetrySleepIntervalUs);
ereport(LOG, (errmsg("[BACKUP] retry to read extent: %lu in file %s, retry times: %lu",
m_currentExtent, m_fileName, m_retriedRead)));
}
if (fseeko(m_file, m_currentExtent * gExtentSizeInBytes, SEEK_SET) != 0) {
ereport(ERROR, (errcode_for_file_access(),
errmsg("[BACKUP] failed to seek file: %s on position: %lu",
m_fileName, m_currentExtent * gExtentSizeInBytes)));
return false;
}
if (fread(m_rawExtent, 1, gExtentSizeInBytes, m_file) != gExtentSizeInBytes &&
ferror(m_file)) {
ereport(ERROR, (errcode_for_file_access(),
errmsg("[BACKUP] failed to read file: %s on postion: %lu",
m_fileName, m_currentExtent * gExtentSizeInBytes)));
return false;
}
if (!ValidateRawPCABlocksAddrFrom(blockNoValidate)) {
continue;
}
return true;
}
ereport(ERROR, (errcode_for_file_access(),
errmsg("[BACKUP] failed to read the valid %luth extent in file %s "
"because of the corrupted PCA page, total read times: %lu",
m_currentExtent, m_fileName, m_retriedRead)));
return false;
}
@param[in] ptr the buffer start pointer for the compact compressed pages
added.
@param[in] nthBlock the relative block number in the compressed pages extent.
@param[in/out] chunkPos curret allocated chunk_pos in new stream extent content.
@return the compressed page size in stream compress extent. return -1 in any failure
cases. */
int TpcFileStreamSender::AddCompactCompressedPage(char *ptr, uint16 nthBlock, uint32 &chunkPos)
{
do {
auto pca = GetRawPca();
auto copiedPtr = ptr;
CfsExtentAddress *block_addr = GetExtentAddress(pca, nthBlock);
for (uint8 i = 0; i < block_addr->nchunks; i++) {
auto start_index = i;
auto chunk_start_pos = OffsetOfPageCompressChunk(pca->chunk_size, block_addr->chunknos[i]);
while (i < block_addr->nchunks - 1 &&
block_addr->chunknos[i + 1] == block_addr->chunknos[i] + 1) {
i++;
}
auto sizeToCopy = (i - start_index + 1) * pca->chunk_size;
if (memcpy_s(copiedPtr, sizeToCopy, m_rawExtent + chunk_start_pos, sizeToCopy) != EOK) {
return -1;
}
copiedPtr += sizeToCopy;
}
if (unlikely(block_addr->nchunks) == 0) {
return 0;
}
auto compressedPageSize = block_addr->nchunks * pca->chunk_size;
if (compressedPageSize == BLCKSZ) {
auto logicalBlkNo = LogicalBlockFromExtentBlock(nthBlock);
auto actualChecksum = pg_checksum_page(ptr, logicalBlkNo);
if (actualChecksum != (PageHeader(ptr))->pd_checksum) {
ereport(LOG, (errmsg("[BACKUP] found corrupted fake compressed page: %u for backup "
"in %luth extent of file %s, actual checksum: %u, epected checksum: %u",
logicalBlkNo, m_currentExtent, m_fileName, actualChecksum,
(PageHeader(ptr))->pd_checksum)));
continue;
}
} else {
if (compressedPageSize != 0 && !CompressedChecksum(ptr)) {
ereport(LOG, (errmsg("[BACKUP] found corrupted compressed page: %u for backup in "
"%luth extent of file %s", LogicalBlockFromExtentBlock(nthBlock),
m_currentExtent, m_fileName)));
continue;
}
}
CfsExtentAddress *newBlockAddr = GetExtentAddress(GetStreamPca(), nthBlock);
for (uint8 i = 0; i < block_addr->nchunks; i++) {
newBlockAddr->chunknos[i] = chunkPos++;
}
newBlockAddr->nchunks = block_addr->nchunks;
newBlockAddr->allocated_chunks = block_addr->nchunks;
newBlockAddr->checksum = AddrChecksum32(newBlockAddr, newBlockAddr->nchunks);
return copiedPtr - ptr;
} while ((++m_retriedRead) < gSNMaxFileReadTimes && ReadCurrentExtent(nthBlock));
return -1;
}
@return true if we succeed to Reorganize the new compressed extent. */
bool TpcFileStreamSender::StreamCurrentExtent()
{
uint32 newChunkPos = 1;
m_retriedRead = 0;
if (!ReadCurrentExtent(0)) {
ereport(ERROR, (errcode_for_file_access(),
errmsg("[BACKUP] failed to read extent %lu of file %s for the first time, "
"total read times: %lu", m_currentExtent, m_fileName, m_retriedRead)));
return false;
}
auto pca = GetRawPca();
auto nBlocks = pca->nblocks;
auto rc = memset_s(m_streamExtent, gExtentSizeInBytes, 0, gExtentSizeInBytes);
securec_check(rc, "\0", "\0");
if (rc != EOK) {
ereport(ERROR, (errcode_for_file_access(), errmsg("[BACKUP] faile to memset")));
return false;
}
char *ptr = m_streamExtent;
CfsExtentHeader *newPca = GetStreamPca();
newPca->algorithm = pca->algorithm;
newPca->chunk_size = pca->chunk_size;
for (uint16 nthBlock = 0; nthBlock < nBlocks; nthBlock++) {
auto size = AddCompactCompressedPage(ptr, nthBlock, newChunkPos);
if (size < 0) {
ereport(ERROR, (errcode_for_file_access(),
errmsg("[BACKUP] failed to stream the valid %uth block in extent %lu of "
"file %s, total read times: %lu", nthBlock, m_currentExtent,
m_fileName, m_retriedRead)));
return false;
}
ptr += size;
}
Assert(static_cast<size_t>(ptr - m_streamExtent) < gExtentSizeInBytes);
newPca->nblocks = nBlocks;
newPca->allocated_chunks = newChunkPos - 1;
return true;
}
* Given the member, write the TAR header & send the file.
*
* If 'missing_ok' is true, will not throw an error if the file is not found.
*
* Returns true if the file was successfully sent, false if 'missing_ok',
* and the file did not exist.
*/
static bool sendFile(char *readfilename, char *tarfilename, struct stat *statbuf, bool missing_ok)
{
FILE *fp = NULL;
size_t cnt;
pgoff_t len = 0;
size_t pad;
errno_t rc = 0;
int check_loc = 0;
BlockNumber blkno = 0;
uint16 checksum = 0;
bool isNeedCheck = false;
int segNo = 0;
const int MAX_RETRY_LIMITA = 60;
int retryCnt = 0;
UndoFileType undoFileType = UNDO_INVALID;
SendFilePreInit();
fp = SizeCheckAndAllocate(readfilename, *statbuf, missing_ok);
if (fp == NULL) {
return false;
}
isNeedCheck = is_row_data_file(readfilename, &segNo, &undoFileType);
ereport(DEBUG1, (errmsg("sendFile, filename is %s, isNeedCheck is %d", readfilename, isNeedCheck)));
if (isNeedCheck) {
statbuf->st_size = statbuf->st_size - (statbuf->st_size % BLCKSZ);
}
_tarWriteHeader(tarfilename, NULL, statbuf);
char *fname = NULL;
if ((fname = strstr(readfilename, UNDO_META_FILE)) != NULL) {
return SendUndoMeta(fp, statbuf);
}
* need to send to main standby in standby cluster, so we must seek a postion accoring to primary id.
* Then content of primary id will be read.
*/
if (ENABLE_DSS && strcmp(tarfilename, XLOG_CONTROL_FILE) == 0) {
int read_size = BUFFERALIGN(sizeof(ControlFileData));
statbuf->st_size = read_size;
}
while ((cnt = fread(t_thrd.basebackup_cxt.buf_block, 1, Min(TAR_SEND_SIZE, statbuf->st_size - len), fp)) > 0) {
if (t_thrd.walsender_cxt.walsender_ready_to_stop)
ereport(ERROR, (errcode_for_file_access(), errmsg("base backup receive stop message, aborting backup")));
recheck:
if (cnt != (size_t)Min(TAR_SEND_SIZE, statbuf->st_size - len)) {
if (ferror(fp)) {
ereport(ERROR, (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", readfilename)));
}
}
if (g_instance.attr.attr_storage.enableIncrementalCheckpoint && isNeedCheck && !SS_DISASTER_CLUSTER) {
uint32 segSize;
GET_SEG_SIZE(undoFileType, segSize);
if (len % BLCKSZ != 0 || cnt % BLCKSZ != 0) {
ereport(
ERROR,
(errcode_for_file_access(),
errmsg("base backup file length cannot be divisibed by 8k: file %s, len %ld, cnt %ld, aborting "
"backup",
readfilename, len, cnt)));
}
for (check_loc = 0; (unsigned int)(check_loc) < cnt; check_loc += BLCKSZ) {
blkno = len / BLCKSZ + check_loc / BLCKSZ + (segNo * segSize);
PageHeader phdr = PageHeader(t_thrd.basebackup_cxt.buf_block + check_loc);
if (!CheckPageZeroCases(phdr)) {
continue;
}
checksum = pg_checksum_page(t_thrd.basebackup_cxt.buf_block + check_loc, blkno);
if (phdr->pd_checksum != checksum) {
if (fseeko(fp, (off_t)len, SEEK_SET) != 0) {
ereport(ERROR,
(errcode_for_file_access(), errmsg("could not seek in file \"%s\": %m", readfilename)));
}
cnt = fread(t_thrd.basebackup_cxt.buf_block, 1, Min(TAR_SEND_SIZE, statbuf->st_size - len), fp);
if (cnt > 0 && retryCnt < MAX_RETRY_LIMITA) {
retryCnt++;
pg_usleep(1000000);
goto recheck;
} else if (cnt > 0 && retryCnt == MAX_RETRY_LIMITA) {
ereport(
ERROR,
(errcode_for_file_access(),
errmsg("base backup cheksum failed in file \"%s\" block %u (computed: %d, recorded: %d), "
"aborting backup",
readfilename, blkno, checksum, phdr->pd_checksum)));
} else {
retryCnt = 0;
break;
}
}
retryCnt = 0;
}
}
if (pq_putmessage_noblock('d', t_thrd.basebackup_cxt.buf_block, cnt))
ereport(ERROR, (errcode_for_file_access(), errmsg("base backup could not send data, aborting backup")));
len += cnt;
if (len >= statbuf->st_size) {
* Reached end of file. The file could be longer, if it was
* extended while we were sending it, but for a base backup we can
* ignore such extended data. It will be restored from WAL.
*/
break;
}
}
if (len < statbuf->st_size) {
rc = memset_s(t_thrd.basebackup_cxt.buf_block, TAR_SEND_SIZE, 0, TAR_SEND_SIZE);
securec_check(rc, "", "");
while (len < statbuf->st_size) {
cnt = Min(TAR_SEND_SIZE, statbuf->st_size - len);
(void)pq_putmessage_noblock('d', t_thrd.basebackup_cxt.buf_block, cnt);
len += cnt;
}
}
pad = ((len + 511) & ~511) - len;
if (pad > 0) {
rc = memset_s(t_thrd.basebackup_cxt.buf_block, pad, 0, pad);
securec_check(rc, "", "");
(void)pq_putmessage_noblock('d', t_thrd.basebackup_cxt.buf_block, pad);
}
(void)FreeFile(fp);
return true;
}
static void _tarWriteHeader(const char *filename, const char *linktarget, struct stat *statbuf)
{
char h[BUILD_PATH_LEN];
errno_t rc = EOK;
int nRet = 0;
* Note: most of the fields in a tar header are not supposed to be
* null-terminated. We use sprintf, which will write a null after the
* required bytes; that null goes into the first byte of the next field.
* This is okay as long as we fill the fields in order.
*/
rc = memset_s(h, sizeof(h), 0, sizeof(h));
securec_check(rc, "", "");
nRet = sprintf_s(&h[0], BUILD_PATH_LEN, "%.1023s", filename);
securec_check_ss(nRet, "", "");
if (linktarget != NULL || S_ISDIR(statbuf->st_mode)) {
* We only support symbolic links to directories, and this is
* indicated in the tar format by adding a slash at the end of the
* name, the same as for regular directories.
*/
h[strlen(filename)] = '/';
h[strlen(filename) + 1] = '\0';
}
nRet = sprintf_s(&h[1024], BUILD_PATH_LEN - 1024, "%07o ", statbuf->st_mode);
securec_check_ss(nRet, "", "");
if (linktarget != NULL || S_ISDIR(statbuf->st_mode))
print_val(&h[1048], 0, 8, 11);
else
print_val(&h[1048], statbuf->st_size, 8, 11);
if (linktarget != NULL) {
if (0 == strncmp(linktarget, t_thrd.proc_cxt.DataDir, strlen(t_thrd.proc_cxt.DataDir))) {
nRet = sprintf_s(&h[1080], BUILD_PATH_LEN - 1080, "3");
securec_check_ss(nRet, "", "");
nRet = sprintf_s(&h[1081], BUILD_PATH_LEN - 1081, "%.1023s",
linktarget + strlen(t_thrd.proc_cxt.DataDir) + 1);
securec_check_ss(nRet, "", "");
} else {
nRet = sprintf_s(&h[1080], BUILD_PATH_LEN - 1080, "2");
securec_check_ss(nRet, "", "");
nRet = sprintf_s(&h[1081], BUILD_PATH_LEN - 1081, "%.1023s", linktarget);
securec_check_ss(nRet, "", "");
}
} else if (S_ISDIR(statbuf->st_mode)) {
nRet = sprintf_s(&h[1080], BUILD_PATH_LEN - 1080, "5");
securec_check_ss(nRet, "", "");
} else {
nRet = sprintf_s(&h[1080], BUILD_PATH_LEN - 1080, "0");
securec_check_ss(nRet, "", "");
}
(void)pq_putmessage_noblock('d', h, BUILD_PATH_LEN);
}
/*
 * Return the smallest valid restart_lsn among the replication slots that are
 * in use, have an archive_config, and whose archive_config is not flagged
 * is_recovery. Returns InvalidXLogRecPtr when no slot qualifies.
 */
static XLogRecPtr GetMinArchiveSlotLSN(void)
{
    XLogRecPtr lowest = InvalidXLogRecPtr;

    for (int idx = 0; idx < g_instance.attr.attr_storage.max_replication_slots; idx++) {
        ReplicationSlot *slot = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[idx];
        XLogRecPtr candidate = InvalidXLogRecPtr;

        /* read the slot's fields under its spinlock; compare after releasing it */
        SpinLockAcquire(&slot->mutex);
        if (slot->in_use == true && slot->archive_config != NULL && slot->archive_config->is_recovery == false) {
            candidate = slot->data.restart_lsn;
        }
        SpinLockRelease(&slot->mutex);

        if (XLByteEQ(candidate, InvalidXLogRecPtr)) {
            continue;
        }
        if (XLByteEQ(lowest, InvalidXLogRecPtr) || XLByteLT(candidate, lowest)) {
            lowest = candidate;
        }
    }

    return lowest;
}
/*
 * Return the smallest valid restart_lsn among the replication slots that are
 * in use and bound to a database (data.database != InvalidOid). Returns
 * InvalidXLogRecPtr when no slot qualifies.
 */
static XLogRecPtr GetMinLogicalSlotLSN(void)
{
    XLogRecPtr lowest = InvalidXLogRecPtr;

    for (int idx = 0; idx < g_instance.attr.attr_storage.max_replication_slots; idx++) {
        ReplicationSlot *slot = &t_thrd.slot_cxt.ReplicationSlotCtl->replication_slots[idx];
        XLogRecPtr candidate = InvalidXLogRecPtr;

        /* read the slot's fields under its spinlock; compare after releasing it */
        SpinLockAcquire(&slot->mutex);
        if (slot->in_use == true && slot->data.database != InvalidOid) {
            candidate = slot->data.restart_lsn;
        }
        SpinLockRelease(&slot->mutex);

        if (XLByteEQ(candidate, InvalidXLogRecPtr)) {
            continue;
        }
        if (XLByteEQ(lowest, InvalidXLogRecPtr) || XLByteLT(candidate, lowest)) {
            lowest = candidate;
        }
    }

    return lowest;
}
/*
 * Externally visible forwarder to save_xlogloc(): passes xloglocation through
 * unchanged.
 * NOTE(review): the "ut_" prefix suggests this exists so unit tests can reach
 * save_xlogloc -- confirm against the callers.
 */
void ut_save_xlogloc(const char *xloglocation)
{
save_xlogloc(xloglocation);
}
static bool SendUndoMeta(FILE *fp, struct stat *statbuf)
{
Assert(fp != NULL);
Assert(statbuf != NULL);
if (statbuf->st_size != undometaSize) {
(void)FreeFile(fp);
ereport(ERROR, (errmsg("Undometa size[%ld] error", statbuf->st_size)));
}
pgoff_t len = 0;
MemoryContext oldcxt = MemoryContextSwitchTo(THREAD_GET_MEM_CXT_GROUP(MEMORY_CONTEXT_STORAGE));
char *undoMeta = (char *)palloc0(statbuf->st_size);
MemoryContextSwitchTo(oldcxt);
int retry = 0;
size_t cnt = 0;
errno_t rc = 0;
size_t pad;
fseek(fp, 0, SEEK_SET);
while ((cnt = fread(undoMeta, 1, statbuf->st_size, fp)) > 0) {
if (t_thrd.walsender_cxt.walsender_ready_to_stop) {
pfree(undoMeta);
ereport(ERROR, (errcode_for_file_access(), errmsg("base backup receive stop message, aborting backup")));
}
if (cnt != (size_t)statbuf->st_size) {
if (ferror(fp)) {
pfree(undoMeta);
ereport(ERROR, (errcode_for_file_access(), errmsg("could not read file undometa file")));
}
}
if (CheckUndoMetaBuf(undoMeta)) {
ereport(LOG, (errmsg("checkUndoMeta Success")));
break;
}
retry++;
fseek(fp, 0, SEEK_SET);
if (retry > undometaRetryMax) {
pfree(undoMeta);
(void)FreeFile(fp);
ereport(ERROR, (errmsg("Read undo meta error")));
}
}
while (len < statbuf->st_size) {
if (t_thrd.walsender_cxt.walsender_ready_to_stop) {
ereport(ERROR, (errcode_for_file_access(), errmsg("base backup receive stop message, aborting backup")));
}
cnt = Min(TAR_SEND_SIZE, statbuf->st_size - len);
if (pq_putmessage_noblock('d', undoMeta + len, cnt)) {
ereport(ERROR, (errcode_for_file_access(), errmsg("base backup could not send data, aborting backup")));
}
len += cnt;
if (len >= statbuf->st_size) {
* Reached end of file. The file could be longer, if it was
* extended while we were sending it, but for a base backup we can
* ignore such extended data. It will be restored from WAL.
*/
break;
}
}
if (len < statbuf->st_size) {
rc = memset_s(t_thrd.basebackup_cxt.buf_block, TAR_SEND_SIZE, 0, TAR_SEND_SIZE);
securec_check(rc, "", "");
while (len < statbuf->st_size) {
cnt = Min(TAR_SEND_SIZE, statbuf->st_size - len);
(void)pq_putmessage_noblock('d', t_thrd.basebackup_cxt.buf_block, cnt);
len += cnt;
}
}
pad = ((len + 511) & ~511) - len;
if (pad > 0) {
rc = memset_s(t_thrd.basebackup_cxt.buf_block, pad, 0, pad);
securec_check(rc, "", "");
(void)pq_putmessage_noblock('d', t_thrd.basebackup_cxt.buf_block, pad);
}
pfree(undoMeta);
(void)FreeFile(fp);
return true;
}