/*
* snapmgr.c
* openGauss snapshot manager
*
* We keep track of snapshots in two ways: those "registered" by resowner.c,
* and the "active snapshot" stack. All snapshots in either of them live in
* persistent memory. When a snapshot is no longer in any of these lists
* (tracked by separate refcounts on each snapshot), its memory can be freed.
*
* The same is true for historic snapshots used during logical decoding;
* their lifetime is managed separately (as they live longer than one xact.c
* transaction).
*
* The FirstXactSnapshot, if any, is treated a bit specially: we increment its
* regd_count and count it in RegisteredSnapshots, but this reference is not
* tracked by a resource owner. We used to use the TopTransactionResourceOwner
* to track this snapshot reference, but that introduces logical circularity
* and thus makes it impossible to clean up in a sane fashion. It's better to
* handle this reference as an internally-tracked registration, so that this
* module is entirely lower-level than ResourceOwners.
*
* Likewise, any snapshots that have been exported by pg_export_snapshot
* have regd_count = 1 and are counted in RegisteredSnapshots, but are not
* tracked by any resource owner.
*
* These arrangements let us reset MyPgXact->xmin when there are no snapshots
* referenced by this transaction. (One possible improvement would be to be
* able to advance Xmin when the snapshot with the earliest Xmin is no longer
* referenced. That's a bit harder though, it requires more locking, and
* anyway it should be rather uncommon to keep temporary snapshots referenced
* for too long.)
*
*
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
* Portions Copyright (c) 2021, openGauss Contributors
*
* IDENTIFICATION
* src/backend/utils/time/snapmgr.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include <sys/stat.h>
#include "access/csnlog.h"
#include "access/multi_redo_api.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/multi_redo_api.h"
#include "catalog/pg_type.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "pgxc/execRemote.h"
#include "storage/predicate.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "ddes/dms/ss_common_attr.h"
#include "ddes/dms/ss_transaction.h"
#include "storage/file/fio_device.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
#endif
#include "storage/sinvaladt.h"
/*
 * Statically allocated snapshot whose first member is initialised to
 * SNAPSHOT_MVCC; all remaining members are zero-initialised.
 *
 * NOTE(review): unlike the SnapshotNowData/SnapshotSelfData family declared
 * further down in this file, this object is not THR_LOCAL -- confirm that
 * sharing it between threads is intended.
 */
SnapshotData CatalogSnapshotData = {SNAPSHOT_MVCC};
/*
 * Elements of the active snapshot stack.
 *
 * Each element here accounts for exactly one active_count on SnapshotData.
 *
 * NB: the code assumes that elements in this list are in non-increasing
 * order of as_level; also, the list must be NULL-terminated.
 */
typedef struct ActiveSnapshotElt {
    Snapshot as_snap;                  /* snapshot referenced by this stack entry */
    int as_level;                      /* transaction nest level owning the entry; set at push time */
    struct ActiveSnapshotElt* as_next; /* entry pushed before this one, or NULL at the bottom */
} ActiveSnapshotElt;
/* THR_LOCAL flag toggled by RegisterStreamSnapshots() / ForgetRegisterStreamSnapshots(). */
static THR_LOCAL bool RegisterStreamSnapshot = false;

/* Directory under which XactExportFilePath() places snapshot files. */
#define SNAPSHOT_EXPORT_DIR (g_instance.datadir_cxt.snapshotsDir)
/*
 * Pairs a snapshot with the name of a file.
 *
 * NOTE(review): presumably one entry per exported snapshot, kept in
 * u_sess->utils_cxt.exportedSnapshots -- confirm against the export code.
 */
typedef struct ExportedSnapshot {
    char *snapfile;    /* file name associated with the snapshot */
    Snapshot snapshot; /* the snapshot itself */
} ExportedSnapshot;
/*
 * XactExportFilePath
 *		Format the path of a snapshot export file into 'path'.
 *
 * 'path' must be a char array (not a pointer), because its capacity is taken
 * with sizeof().  The resulting name is SNAPSHOT_EXPORT_DIR, a slash, the
 * 64-bit xid printed as two 8-digit upper-case hex halves, "-", 'num' in
 * decimal and finally 'suffix'.  The snprintf_s result is validated with
 * securec_check_ss.
 *
 * The body is wrapped in do { } while (0) so that the macro expands to a
 * single statement: "XactExportFilePath(...);" is then safe as the body of
 * an unbraced if/else.  Invocations must be terminated by a semicolon.
 */
#define XactExportFilePath(path, xid, num, suffix)           \
    do {                                                     \
        int rc = snprintf_s(path,                            \
            sizeof(path),                                    \
            sizeof(path) - 1,                                \
            "%s/%08X%08X-%d%s",                              \
            SNAPSHOT_EXPORT_DIR,                             \
            (uint32)((xid) >> 32),                           \
            (uint32)(xid),                                   \
            (num),                                           \
            (suffix));                                       \
        securec_check_ss(rc, "", "");                        \
    } while (0)
/*
 * NOTE(review): presumably the buffer size for printing an unsigned long in
 * decimal; no use of it is visible in this part of the file -- confirm.
 */
#define MAX_ULONG_LENGTH 22

/*
 * Statically allocated THR_LOCAL snapshots.  Each has only its first member
 * initialised, to the SNAPSHOT_* constant matching its name.
 */
THR_LOCAL SnapshotData SnapshotNowData = {SNAPSHOT_NOW};
THR_LOCAL SnapshotData SnapshotSelfData = {SNAPSHOT_SELF};
THR_LOCAL SnapshotData SnapshotAnyData = {SNAPSHOT_ANY};
THR_LOCAL SnapshotData SnapshotToastData = {SNAPSHOT_TOAST};
#ifdef ENABLE_MULTIPLE_NODES
THR_LOCAL SnapshotData SnapshotNowNoSyncData = {SNAPSHOT_NOW_NO_SYNC};
#endif

/* Prototypes for functions that are used before their definition in this file. */
static Snapshot CopySnapshot(Snapshot snapshot);
static void FreeSnapshot(Snapshot snapshot);
static void SnapshotResetXmin(void);
static bool TransactionIdIsPreparedAtSnapshot(TransactionId xid, Snapshot snapshot);
* TransactionIdIsPreparedAtSnapshot
* At the time of snapshot, true iff transaction associated
* with the identifier is prepared for two-phase commit
*
* Note: only gxacts marked "valid" are considered; but notice we do not
* check the locking status.
*
*/
static bool TransactionIdIsPreparedAtSnapshot(TransactionId xid, Snapshot snapshot)
{
for (int i = 0; i < snapshot->prepared_count; i++) {
if (xid == snapshot->prepared_array[i]) {
return true;
}
}
return false;
}
/*
 * IsXidVisibleInGtmLiteLocalSnapshot
 *		Visibility check against the snapshot's prepared-transaction list.
 *
 * Because mixed (global and local) snapshots are supported, an xid that
 * would otherwise be judged invisible is still treated as visible when it is
 * in the snapshot's prepared list and turns out to have committed.
 *
 * hint_status     - what the caller already knows about xid
 * xmin_equal_xmax - caller's xmin == xmax special case; forces "not visible"
 * buffer          - handed to SyncWaitXidEnd while waiting
 * sync            - if not NULL, set to true when this call waits for xid
 *
 * Returns true only if xid is in the prepared list and is (or becomes, after
 * waiting) committed.
 */
bool IsXidVisibleInGtmLiteLocalSnapshot(TransactionId xid, Snapshot snapshot,
    TransactionIdStatus hint_status, bool xmin_equal_xmax, Buffer buffer, bool *sync)
{
    /* Aborted transactions and the xmin == xmax case are never visible here. */
    if (xmin_equal_xmax || hint_status == XID_ABORTED) {
        return false;
    }

    /* The prepared list is not consulted while aborting. */
    if (t_thrd.xact_cxt.bInAbortTransaction) {
        return false;
    }
    if (!TransactionIdIsPreparedAtSnapshot(xid, snapshot)) {
        return false;
    }

    /* Prepared and already known to be committed: visible. */
    if (hint_status == XID_COMMITTED) {
        return true;
    }

    /* In maintenance mode we never wait for the transaction. */
    if (u_sess->attr.attr_common.xc_maintenance_mode) {
        return false;
    }

    /* Tell the caller that we are about to wait, then wait for xid to end. */
    if (sync != NULL) {
        *sync = true;
    }
    SyncWaitXidEnd(xid, buffer);

    return (TransactionIdDidCommit(xid) == true);
}
/*
 * RecheckXidFinish
 *		Sanity check before a csn is forced to "aborted".
 *
 * Reports at defence_errlevel() when xid is still in progress; otherwise
 * does nothing.  csn is used only in the message text.
 */
void RecheckXidFinish(TransactionId xid, CommitSeqNo csn)
{
    if (!TransactionIdIsInProgress(xid)) {
        return;
    }

    ereport(defence_errlevel(), (errmsg("transaction id %lu is still running, "
                                        "cannot set csn %lu to abort.", xid, csn)));
}
* XidVisibleInSnapshot
* Is the given XID visible according to the snapshot?
*
* On return, *hintstatus is set to indicate if the transaction had committed,
* or aborted, whether or not it's not visible to us.
*/
bool XidVisibleInSnapshot(TransactionId xid, Snapshot snapshot, TransactionIdStatus* hintstatus, Buffer buffer, bool* sync)
{
volatile CommitSeqNo csn;
bool looped = false;
TransactionId parentXid = InvalidTransactionId;
*hintstatus = XID_INPROGRESS;
#ifdef XIDVIS_DEBUG
ereport(DEBUG1,
(errmsg("XidVisibleInSnapshot xid %ld cur_xid %ld snapshot csn %lu xmax %ld",
xid,
GetCurrentTransactionIdIfAny(),
snapshot->snapshotcsn,
snapshot->xmax)));
#endif
loop:
if (ENABLE_DMS) {
if (SSCanFetchLocalSnapshotTxnRelatedInfo()) {
csn = TransactionIdGetCommitSeqNo(xid, false, true, false, snapshot);
} else {
csn = SSTransactionIdGetCommitSeqNo(xid, false, true, false, snapshot, sync);
}
} else {
csn = TransactionIdGetCommitSeqNo(xid, false, true, false, snapshot);
}
#ifdef XIDVIS_DEBUG
ereport(DEBUG1,
(errmsg("XidVisibleInSnapshot xid %ld cur_xid %ld csn %ld snapshot"
"csn %ld xmax %ld",
xid,
GetCurrentTransactionIdIfAny(),
csn,
snapshot->snapshotcsn,
snapshot->xmax)));
#endif
if (COMMITSEQNO_IS_COMMITTED(csn)) {
*hintstatus = XID_COMMITTED;
if (csn < snapshot->snapshotcsn)
return true;
else
return false;
} else if (COMMITSEQNO_IS_COMMITTING(csn)) {
if (SS_STANDBY_MODE) {
ereport(FATAL, (errmsg("SS xid %lu's csn %lu is still COMMITTING after Master txn waited.", xid, csn)));
}
if (looped) {
if (snapshot->takenDuringRecovery) {
return false;
}
ereport(DEBUG1, (errmsg("transaction id %lu's csn %ld is changed to ABORT after lockwait.", xid, csn)));
RecheckXidFinish(xid, csn);
CSNLogSetCommitSeqNo(xid, 0, NULL, COMMITSEQNO_ABORTED);
SetLatestFetchState(xid, COMMITSEQNO_ABORTED);
*hintstatus = XID_ABORTED;
return false;
} else {
if (!COMMITSEQNO_IS_SUBTRANS(csn)) {
CommitSeqNo latestCSN = GET_COMMITSEQNO(csn);
if (latestCSN >= snapshot->snapshotcsn) {
ereport(DEBUG1,
(errmsg(
"snapshotcsn %lu lower than csn %lu stored in csn log, don't need to sync wait, trx id %lu",
snapshot->snapshotcsn,
csn,
xid)));
return false;
}
} else {
parentXid = (TransactionId)GET_PARENTXID(csn);
}
if (u_sess->attr.attr_common.xc_maintenance_mode || t_thrd.xact_cxt.bInAbortTransaction) {
return false;
}
if (sync != NULL) {
*sync = true;
}
if (TransactionIdIsValid(parentXid))
SyncWaitXidEnd(parentXid, buffer, snapshot);
else
SyncWaitXidEnd(xid, buffer, snapshot);
looped = true;
parentXid = InvalidTransactionId;
goto loop;
}
} else {
if (csn == COMMITSEQNO_ABORTED)
*hintstatus = XID_ABORTED;
return false;
}
}
/*
 * UHeapXidVisibleInSnapshot
 *		Visibility check that first consults TransactionIdOlderThanAllUndo.
 *
 * An xid for which TransactionIdOlderThanAllUndo() is true is checked with
 * CommittedXidVisibleInSnapshot; any other xid goes through the regular
 * XidVisibleInSnapshot.
 *
 * NOTE(review): on the first path *hintstatus and *sync are left untouched,
 * because CommittedXidVisibleInSnapshot takes neither -- confirm that callers
 * initialise them.
 */
bool UHeapXidVisibleInSnapshot(TransactionId xid, Snapshot snapshot,
    TransactionIdStatus *hintstatus, Buffer buffer, bool *sync)
{
    return TransactionIdOlderThanAllUndo(xid)
        ? CommittedXidVisibleInSnapshot(xid, snapshot, buffer)
        : XidVisibleInSnapshot(xid, snapshot, hintstatus, buffer, sync);
}
/*
 * XidVisibleInDecodeSnapshot
 *		Non-waiting variant of XidVisibleInSnapshot.
 *
 * Fetches the CSN of xid once and never waits for a transaction to finish.
 * Returns true iff the CSN is committed and below snapshot->snapshotcsn.
 * *hintstatus is set to XID_COMMITTED, XID_ABORTED, or left at
 * XID_INPROGRESS.  buffer is not used.
 */
bool XidVisibleInDecodeSnapshot(TransactionId xid, Snapshot snapshot, TransactionIdStatus* hintstatus, Buffer buffer)
{
    volatile CommitSeqNo csn;

    *hintstatus = XID_INPROGRESS;
    csn = TransactionIdGetCommitSeqNo(xid, false, true, false, snapshot);

    if (!COMMITSEQNO_IS_COMMITTED(csn)) {
        /* Not committed: invisible, but record an abort if that is what we saw. */
        if (csn == COMMITSEQNO_ABORTED) {
            *hintstatus = XID_ABORTED;
        }
        return false;
    }

    *hintstatus = XID_COMMITTED;
    return csn < snapshot->snapshotcsn;
}
* CommittedXidVisibleInSnapshot
* Is the given XID visible according to the snapshot?
*
* This is the same as XidVisibleInSnapshot, but the caller knows that the
* given XID committed. The only question is whether it's visible to our
* snapshot or not.
*/
bool CommittedXidVisibleInSnapshot(TransactionId xid, Snapshot snapshot, Buffer buffer)
{
CommitSeqNo csn;
bool looped = false;
TransactionId parentXid = InvalidTransactionId;
if (!GTM_LITE_MODE || snapshot->gtm_snapshot_type == GTM_SNAPSHOT_TYPE_LOCAL) {
* Make a quick range check to eliminate most XIDs without looking at the
* CSN log.
*/
if (TransactionIdPrecedes(xid, snapshot->xmin))
return true;
}
loop:
if (ENABLE_DMS) {
if (SSCanFetchLocalSnapshotTxnRelatedInfo()) {
csn = TransactionIdGetCommitSeqNo(xid, true, true, false, snapshot);
} else {
csn = SSTransactionIdGetCommitSeqNo(xid, true, true, false, snapshot, NULL);
}
} else {
csn = TransactionIdGetCommitSeqNo(xid, true, true, false, snapshot);
}
if (COMMITSEQNO_IS_COMMITTING(csn)) {
if (SS_STANDBY_MODE) {
ereport(FATAL, (errmsg("SS xid %lu's csn %lu is still COMMITTING after Master txn waited.", xid, csn)));
}
if (looped) {
ereport(WARNING,
(errmsg("committed transaction id %lu's csn %lu"
"is changed to frozen after lockwait.",
xid,
csn)));
CSNLogSetCommitSeqNo(xid, 0, NULL, COMMITSEQNO_FROZEN);
SetLatestFetchState(xid, COMMITSEQNO_FROZEN);
return true;
} else {
if (!COMMITSEQNO_IS_SUBTRANS(csn)) {
CommitSeqNo latestCSN = GET_COMMITSEQNO(csn);
if (latestCSN >= snapshot->snapshotcsn) {
ereport(DEBUG1,
(errmsg("snapshotcsn %lu lower than csn %lu"
" stored in csn log, don't need to sync wait, trx id %lu",
snapshot->snapshotcsn,
csn,
xid)));
return false;
}
} else {
parentXid = (TransactionId)GET_PARENTXID(csn);
}
if (u_sess->attr.attr_common.xc_maintenance_mode || t_thrd.xact_cxt.bInAbortTransaction) {
return false;
}
if (TransactionIdIsValid(parentXid))
SyncWaitXidEnd(parentXid, buffer);
else
SyncWaitXidEnd(xid, buffer);
looped = true;
parentXid = InvalidTransactionId;
goto loop;
}
} else if (!COMMITSEQNO_IS_COMMITTED(csn)) {
ereport(WARNING,
(errmsg("transaction/csn %lu/%lu was hinted as "
"committed, but was not marked as committed in "
"the transaction log",
xid,
csn)));
* We have contradicting evidence on whether the transaction committed or
* not. Let's assume that it did. That seems better than erroring out.
*/
return true;
}
if (csn < snapshot->snapshotcsn)
return true;
else
return false;
}
/*
 * CommittedXidVisibleInDecodeSnapshot
 *		Non-waiting variant of CommittedXidVisibleInSnapshot.
 *
 * The CSN of xid is fetched once:
 *	- still committing:  not visible (no waiting is done);
 *	- committed:         visible iff csn < snapshot->snapshotcsn;
 *	- anything else:     a WARNING is raised and the xid is assumed visible,
 *	  since the caller's hint said it committed.
 *
 * buffer is not used.
 */
bool CommittedXidVisibleInDecodeSnapshot(TransactionId xid, Snapshot snapshot, Buffer buffer)
{
    CommitSeqNo csn = TransactionIdGetCommitSeqNo(xid, true, true, false, snapshot);

    if (COMMITSEQNO_IS_COMMITTING(csn)) {
        return false;
    }

    if (COMMITSEQNO_IS_COMMITTED(csn)) {
        return csn < snapshot->snapshotcsn;
    }

    /*
     * Hint and transaction log disagree.  Trust the hint rather than
     * erroring out.
     */
    ereport(WARNING,
        (errmsg("transaction/csn %lu/%lu was hinted as "
                "committed, but was not marked as committed in "
                "the transaction log",
            xid, csn)));
    return true;
}
/*
 * GetTransactionSnapshot
 *		Get the appropriate snapshot for a new query in a transaction.
 *
 * Note that the return value may point at static storage that will be modified
 * by future calls and by CommandCounterIncrement(). Callers should call
 * RegisterSnapshot or PushActiveSnapshot on the returned snap if it is to be
 * used very long.
 *
 * force_local_snapshot is forwarded to GetSnapshotData() on the paths that
 * take a fresh snapshot outside transaction-snapshot mode; the first-call
 * path in transaction-snapshot mode does not use it.
 */
Snapshot GetTransactionSnapshot(bool force_local_snapshot)
{
    /*
     * Return historic snapshot if doing logical decoding. We'll never
     * need a non-historic transaction snapshot in this (sub-)transaction, so
     * there's no need to be careful to set one up for later calls to
     * GetTransactionSnapshot function.
     */
    if (HistoricSnapshotActive()) {
        Assert(!u_sess->utils_cxt.FirstSnapshotSet);
        return u_sess->utils_cxt.HistoricSnapshot;
    }

    /* First call in this transaction? */
    if (!u_sess->utils_cxt.FirstSnapshotSet) {
        Assert(u_sess->utils_cxt.RegisteredSnapshots == 0);
        Assert(u_sess->utils_cxt.FirstXactSnapshot == NULL);

        /*
         * In transaction-snapshot mode, the first snapshot must live until
         * end of xact regardless of what the caller does with it, so we must
         * make a copy of it rather than returning CurrentSnapshotData
         * directly. Furthermore, if we're running in serializable mode,
         * predicate.c needs to wrap the snapshot fetch in its own processing.
         */
        Assert(!(u_sess->utils_cxt.CurrentSnapshot != NULL && u_sess->utils_cxt.CurrentSnapshot->user_data != NULL));
        if (IsolationUsesXactSnapshot()) {
            if (IsolationIsSerializable()) {
                u_sess->utils_cxt.CurrentSnapshot =
                    GetSerializableTransactionSnapshot(u_sess->utils_cxt.CurrentSnapshotData);
            } else {
                u_sess->utils_cxt.CurrentSnapshot =
                    GetSnapshotData(u_sess->utils_cxt.CurrentSnapshotData, false);
            }

            Assert(
                !(u_sess->utils_cxt.CurrentSnapshot != NULL && u_sess->utils_cxt.CurrentSnapshot->user_data != NULL));
            /* Make a saved copy and account for it as an internally-tracked registration. */
            u_sess->utils_cxt.CurrentSnapshot = CopySnapshot(u_sess->utils_cxt.CurrentSnapshot);
            u_sess->utils_cxt.FirstXactSnapshot = u_sess->utils_cxt.CurrentSnapshot;
            u_sess->utils_cxt.FirstXactSnapshot->regd_count++;
            u_sess->utils_cxt.RegisteredSnapshots++;
        } else {
            u_sess->utils_cxt.CurrentSnapshot =
                GetSnapshotData(u_sess->utils_cxt.CurrentSnapshotData, force_local_snapshot);
        }

        u_sess->utils_cxt.FirstSnapshotSet = true;
        return u_sess->utils_cxt.CurrentSnapshot;
    }

    /*
     * Later calls: in transaction-snapshot mode, or when IS_EXRTO_STANDBY_READ
     * holds, the snapshot taken by the first call is reused.
     */
    if (IsolationUsesXactSnapshot()|| IS_EXRTO_STANDBY_READ) {
#ifdef PGXC
        /*
         * Consider this test case taken from portals.sql
         *
         * CREATE TABLE cursor (a int, b int) distribute by replication;
         * INSERT INTO cursor VALUES (10);
         * BEGIN;
         * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
         * DECLARE c1 NO SCROLL CURSOR FOR SELECT * FROM cursor FOR UPDATE;
         * INSERT INTO cursor VALUES (2);
         * FETCH ALL FROM c1;
         * would result in
         * ERROR: attempted to lock invisible tuple
         * because FETCH would be sent as a select to the remote nodes
         * with command id 0, whereas the command id would be 2
         * in the current snapshot.
         * (1 sent by Coordinator due to declare cursor &
         * 2 because of the insert inside the transaction)
         * The command id should therefore be updated in the
         * current snapshot.
         */
        if (IsConnFromCoord())
            SnapshotSetCommandId(GetCurrentCommandId(false));
#endif
        if (IS_EXRTO_STANDBY_READ) {
            /* Re-publish the reused snapshot's xmin and read LSN in this thread's proc entries. */
            t_thrd.pgxact->xmin = u_sess->utils_cxt.CurrentSnapshot->xmin;
            t_thrd.proc->exrto_min = u_sess->utils_cxt.CurrentSnapshot->read_lsn;
            t_thrd.proc->exrto_read_lsn = t_thrd.proc->exrto_min;
            t_thrd.proc->exrto_gen_snap_time = GetCurrentTimestamp();
        }
        return u_sess->utils_cxt.CurrentSnapshot;
    }

    /* Otherwise take a fresh snapshot into the static CurrentSnapshotData. */
    Assert(!(u_sess->utils_cxt.CurrentSnapshot != NULL && u_sess->utils_cxt.CurrentSnapshot->user_data != NULL));
    u_sess->utils_cxt.CurrentSnapshot =
        GetSnapshotData(u_sess->utils_cxt.CurrentSnapshotData, force_local_snapshot);

    return u_sess->utils_cxt.CurrentSnapshot;
}
void StreamTxnContextSetSnapShot(void* snapshotPtr)
{
u_sess->utils_cxt.FirstSnapshotSet = true;
Snapshot snapshot = (Snapshot)snapshotPtr;
u_sess->utils_cxt.CurrentSnapshot = u_sess->utils_cxt.CurrentSnapshotData;
u_sess->utils_cxt.CurrentSnapshot->xmin = snapshot->xmin;
u_sess->utils_cxt.CurrentSnapshot->xmax = snapshot->xmax;
u_sess->utils_cxt.CurrentSnapshot->timeline = snapshot->timeline;
u_sess->utils_cxt.CurrentSnapshot->snapshotcsn = snapshot->snapshotcsn;
u_sess->utils_cxt.CurrentSnapshot->read_lsn = snapshot->read_lsn;
u_sess->utils_cxt.CurrentSnapshot->curcid = snapshot->curcid;
* This is a new snapshot, so set both refcounts are zero, and mark it as
* not copied in persistent memory.
*/
u_sess->utils_cxt.CurrentSnapshot->active_count = 0;
u_sess->utils_cxt.CurrentSnapshot->regd_count = 0;
u_sess->utils_cxt.CurrentSnapshot->copied = false;
}
/*
 * StreamTxnContextSetMyPgXactXmin
 *		Overwrite this thread's advertised xmin (t_thrd.pgxact->xmin).
 *
 * No validation of the new value is performed here.
 */
void StreamTxnContextSetMyPgXactXmin(TransactionId xmin)
{
    t_thrd.pgxact->xmin = xmin;
}
* GetLatestSnapshot
* Get a snapshot that is up-to-date as of the current instant,
* even if we are executing in transaction-snapshot mode.
*/
Snapshot GetLatestSnapshot(void)
{
* So far there are no cases requiring support for GetLatestSnapshot()
* during logical decoding, but it wouldn't be hard to add if
* required.
*/
Assert(!HistoricSnapshotActive());
if (!u_sess->utils_cxt.FirstSnapshotSet)
return GetTransactionSnapshot();
Assert(!(u_sess->utils_cxt.SecondarySnapshot != NULL && u_sess->utils_cxt.SecondarySnapshot->user_data != NULL));
u_sess->utils_cxt.SecondarySnapshot =
GetSnapshotData(u_sess->utils_cxt.SecondarySnapshotData, false);
return u_sess->utils_cxt.SecondarySnapshot;
}
/*
 * get_standby_snapshot
 *		Return the session's current snapshot, taking one first if needed.
 *
 * When a snapshot has already been set in this transaction it is returned
 * as is; otherwise GetTransactionSnapshot() is called.
 */
Snapshot get_standby_snapshot()
{
    if (!u_sess->utils_cxt.FirstSnapshotSet) {
        return GetTransactionSnapshot();
    }

    Assert(u_sess->utils_cxt.CurrentSnapshot == NULL || u_sess->utils_cxt.CurrentSnapshot->user_data == NULL);
    return u_sess->utils_cxt.CurrentSnapshot;
}
* GetCatalogSnapshot
* Get a snapshot that is sufficiently up-to-date for scan of the
* system catalog with the specified OID.
*/
Snapshot GetCatalogSnapshot()
{
* Return historic snapshot if we're doing logical decoding, but
* return a non-historic, snapshot if we temporarily are doing up2date
* lookups.
*/
if (HistoricSnapshotActive())
return u_sess->utils_cxt.HistoricSnapshot;
if (IS_EXRTO_RECOVERY_IN_PROGRESS && t_thrd.role != TRACK_STMT_WORKER && !dummyStandbyMode) {
return get_standby_snapshot();
}
return SnapshotNow;
}
* get_toast_snapshot
* Get a snapshot that is sufficiently up-to-date for scan of the
* toast with the specified OID.
*/
Snapshot get_toast_snapshot()
{
if (IS_EXRTO_RECOVERY_IN_PROGRESS && t_thrd.role != TRACK_STMT_WORKER && !dummyStandbyMode) {
return get_standby_snapshot();
}
return SnapshotToast;
}
* SnapshotSetCommandId
* Propagate CommandCounterIncrement into the static snapshots, if set
*/
void SnapshotSetCommandId(CommandId curcid)
{
if (!u_sess->utils_cxt.FirstSnapshotSet)
return;
if (u_sess->utils_cxt.CurrentSnapshot)
u_sess->utils_cxt.CurrentSnapshot->curcid = curcid;
if (u_sess->utils_cxt.SecondarySnapshot)
u_sess->utils_cxt.SecondarySnapshot->curcid = curcid;
if (SS_PRIMARY_MODE || SS_OFFICIAL_PRIMARY) {
t_thrd.pgxact->cid = curcid;
}
}
/*
 * SetTransactionSnapshot
 *		Set the transaction's snapshot from an imported MVCC snapshot.
 *
 * Note that this is very closely tied to GetTransactionSnapshot --- it
 * must take care of all the same considerations as the first-snapshot case
 * in GetTransactionSnapshot.
 *
 * sourcesnap supplies xmin, xmax, snapshotcsn, timeline and
 * takenDuringRecovery.  sourcevxid and sourcepid are only forwarded to
 * SetSerializableTransactionSnapshot on the serializable path.
 */
void SetTransactionSnapshot(Snapshot sourcesnap, VirtualTransactionId *sourcevxid, ThreadId sourcepid)
{
    /* Must be the very first snapshot of this transaction. */
    Assert(!u_sess->utils_cxt.FirstSnapshotSet);
    Assert(u_sess->utils_cxt.RegisteredSnapshots == 0);
    Assert(u_sess->utils_cxt.FirstXactSnapshot == NULL);
    Assert(!(u_sess->utils_cxt.CurrentSnapshot != NULL && u_sess->utils_cxt.CurrentSnapshot->user_data != NULL));

    /*
     * Even though we are not going to use the snapshot it computes, we must
     * call GetSnapshotData, for two reasons: (1) to be sure that
     * CurrentSnapshotData's XID arrays have been allocated, and (2) to update
     * RecentXmin and RecentGlobalXmin. (We could alternatively include those
     * two variables in exported snapshot files, but it seems better to have
     * snapshot importers compute reasonably up-to-date values for them.)
     */
    u_sess->utils_cxt.CurrentSnapshot = GetSnapshotData(u_sess->utils_cxt.CurrentSnapshotData, false);

    /*
     * Now copy appropriate fields from the source snapshot.
     */
    u_sess->utils_cxt.CurrentSnapshot->xmin = sourcesnap->xmin;
    u_sess->utils_cxt.CurrentSnapshot->xmax = sourcesnap->xmax;
    u_sess->utils_cxt.CurrentSnapshot->snapshotcsn = sourcesnap->snapshotcsn;
    u_sess->utils_cxt.CurrentSnapshot->timeline = sourcesnap->timeline;
    u_sess->utils_cxt.CurrentSnapshot->takenDuringRecovery = sourcesnap->takenDuringRecovery;

    /*
     * Now we have to fix what GetSnapshotData did with MyPgXact->xmin and
     * TransactionXmin. There is a race condition: to make sure we are not
     * causing the global xmin to go backwards, we have to test that the
     * source transaction is still running, and that has to be done
     * atomically. So let procarray.c do it.
     *
     * Note: in serializable mode, predicate.c will do this a second time. It
     * doesn't seem worth contorting the logic here to avoid two calls,
     * especially since it's not clear that predicate.c *must* do this.
     *
     * NOTE(review): no statement in this function performs the step
     * described above (no procarray.c call appears here) -- confirm whether
     * it was dropped intentionally or is handled by the caller.
     */

    /*
     * In transaction-snapshot mode, the first snapshot must live until end of
     * xact, so we must make a copy of it. Furthermore, if we're running in
     * serializable mode, predicate.c needs to do its own processing.
     */
    if (IsolationUsesXactSnapshot()) {
        if (IsolationIsSerializable())
            SetSerializableTransactionSnapshot(u_sess->utils_cxt.CurrentSnapshot, sourcevxid, sourcepid);

        Assert(!(u_sess->utils_cxt.CurrentSnapshot != NULL && u_sess->utils_cxt.CurrentSnapshot->user_data != NULL));
        /* Make a saved copy and account for it as an internally-tracked registration. */
        u_sess->utils_cxt.CurrentSnapshot = CopySnapshot(u_sess->utils_cxt.CurrentSnapshot);
        u_sess->utils_cxt.FirstXactSnapshot = u_sess->utils_cxt.CurrentSnapshot;
        u_sess->utils_cxt.FirstXactSnapshot->regd_count++;
        u_sess->utils_cxt.RegisteredSnapshots++;
    }

    u_sess->utils_cxt.FirstSnapshotSet = true;
}
* CopySnapshot
* Copy the given snapshot.
*
* The copy is palloc'd in u_sess->top_transaction_mem_cxt and has initial refcounts set
* to 0. The returned snapshot has the copied flag set.
*/
Snapshot CopySnapshot(Snapshot snapshot)
{
Snapshot newsnap;
int rc = 0;
Assert(snapshot != InvalidSnapshot);
newsnap = (Snapshot)MemoryContextAlloc(u_sess->top_transaction_mem_cxt, sizeof(SnapshotData));
rc = memcpy_s(newsnap, sizeof(SnapshotData), snapshot, sizeof(SnapshotData));
securec_check(rc, "", "");
newsnap->regd_count = 0;
newsnap->active_count = 0;
newsnap->copied = true;
newsnap->user_data = NULL;
if (GTM_LITE_MODE && snapshot->prepared_array) {
Assert(snapshot->prepared_array_capacity > 0);
size_t arraySize = sizeof(TransactionId) * snapshot->prepared_array_capacity;
newsnap->prepared_array =
(TransactionId *)MemoryContextAlloc(u_sess->top_transaction_mem_cxt, arraySize);
rc = memcpy_s(newsnap->prepared_array, arraySize, snapshot->prepared_array, arraySize);
securec_check(rc, "", "");
}
return newsnap;
}
* CopySnapshot
* Copy the given snapshot.
*
* The copy is palloc'd in u_sess->top_transaction_mem_cxt and has initial refcounts set
* to 0. The returned snapshot has the copied flag set.
*/
Snapshot CopySnapshotByCurrentMcxt(Snapshot snapshot)
{
Snapshot newsnap;
errno_t rc;
Assert(snapshot != InvalidSnapshot);
newsnap = (Snapshot)palloc(sizeof(SnapshotData));
rc = memcpy_s(newsnap, sizeof(SnapshotData), snapshot, sizeof(SnapshotData));
securec_check(rc, "\0", "\0");
newsnap->regd_count = 0;
newsnap->active_count = 0;
newsnap->copied = true;
newsnap->user_data = NULL;
if (GTM_LITE_MODE && snapshot->prepared_array) {
int arraySize = sizeof(TransactionId) * snapshot->prepared_array_capacity;
newsnap->prepared_array = (TransactionId *)palloc(arraySize);
rc = memcpy_s(newsnap->prepared_array, arraySize, snapshot->prepared_array, arraySize);
securec_check(rc, "", "");
}
return newsnap;
}
* FreeSnapshot
* Free the memory associated with a snapshot.
*/
static void FreeSnapshot(Snapshot snapshot)
{
Assert(snapshot->regd_count == 0);
Assert(snapshot->active_count == 0);
Assert(snapshot->copied);
Assert(!(snapshot != NULL && snapshot->user_data != NULL));
if (GTM_LITE_MODE && snapshot->prepared_array) {
pfree_ext(snapshot->prepared_array);
}
pfree_ext(snapshot);
}
* FreeSnapshot
* Free the memory associated with snapshot members exclude snapshot struct.
*/
void FreeSnapshotDeepForce(Snapshot snap)
{
if (snap->xip) {
pfree_ext(snap->xip);
}
if (snap->subxip) {
pfree_ext(snap->subxip);
}
if (snap->prepared_array) {
pfree_ext(snap->prepared_array);
}
if (snap->user_data) {
pfree_ext(snap->user_data);
}
pfree_ext(snap);
}
* PushActiveSnapshot
* Set the given snapshot as the current active snapshot
*
* If the passed snapshot is a statically-allocated one, or it is possibly
* subject to a future command counter update, create a new long-lived copy
* with active refcount=1. Otherwise, only increment the refcount.
*/
void PushActiveSnapshot(Snapshot snap)
{
ActiveSnapshotElt* newactive = NULL;
Assert(snap != InvalidSnapshot);
newactive = (ActiveSnapshotElt*)MemoryContextAlloc(u_sess->top_transaction_mem_cxt, sizeof(ActiveSnapshotElt));
* Checking SecondarySnapshot is probably useless here, but it seems
* better to be sure.
*/
if (snap == u_sess->utils_cxt.CurrentSnapshot || snap == u_sess->utils_cxt.SecondarySnapshot || !snap->copied ||
snap == u_sess->pgxc_cxt.gc_fdw_snapshot)
newactive->as_snap = CopySnapshot(snap);
else
newactive->as_snap = snap;
newactive->as_next = u_sess->utils_cxt.ActiveSnapshot;
newactive->as_level = GetCurrentTransactionNestLevel();
newactive->as_snap->active_count++;
u_sess->utils_cxt.ActiveSnapshot = newactive;
}
* PushCopiedSnapshot
* As above, except forcibly copy the presented snapshot.
*
* This should be used when the ActiveSnapshot has to be modifiable, for
* example if the caller intends to call UpdateActiveSnapshotCommandId.
* The new snapshot will be released when popped from the stack.
*/
void PushCopiedSnapshot(Snapshot snapshot)
{
PushActiveSnapshot(CopySnapshot(snapshot));
}
* UpdateActiveSnapshotCommandId
*
* Update the current CID of the active snapshot. This can only be applied
* to a snapshot that is not referenced elsewhere.
*/
void UpdateActiveSnapshotCommandId(void)
{
Assert(u_sess->utils_cxt.ActiveSnapshot != NULL);
Assert(u_sess->utils_cxt.ActiveSnapshot->as_snap->active_count == 1);
Assert(u_sess->utils_cxt.ActiveSnapshot->as_snap->regd_count == 0);
u_sess->utils_cxt.ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false);
}
* PopActiveSnapshot
*
* Remove the topmost snapshot from the active snapshot stack, decrementing the
* reference count, and free it if this was the last reference.
*/
void PopActiveSnapshot(void)
{
* In multi commit/rollback within stored procedure, the AciveSnapshot already poped.
* Therefore, no need to pop the active snapshot. Otherwise it will cause seg falut.
*/
if(!u_sess->utils_cxt.ActiveSnapshot) {
return;
}
ActiveSnapshotElt* newstack = NULL;
newstack = u_sess->utils_cxt.ActiveSnapshot->as_next;
Assert(u_sess->utils_cxt.ActiveSnapshot->as_snap->active_count > 0);
u_sess->utils_cxt.ActiveSnapshot->as_snap->active_count--;
if (u_sess->utils_cxt.ActiveSnapshot->as_snap->active_count == 0 &&
u_sess->utils_cxt.ActiveSnapshot->as_snap->regd_count == 0)
FreeSnapshot(u_sess->utils_cxt.ActiveSnapshot->as_snap);
pfree_ext(u_sess->utils_cxt.ActiveSnapshot);
u_sess->utils_cxt.ActiveSnapshot = newstack;
SnapshotResetXmin();
}
* GetActiveSnapshot
* Return the topmost snapshot in the Active stack.
*/
Snapshot GetActiveSnapshot(void)
{
#ifdef PGXC
* Check if topmost snapshot is null or not,
* if it is, a new one will be taken from GTM.
*/
if (!u_sess->utils_cxt.ActiveSnapshot && IS_PGXC_COORDINATOR && !IsConnFromCoord())
return NULL;
#endif
if (u_sess->utils_cxt.ActiveSnapshot == NULL) {
ereport(ERROR,
(errmodule(MOD_TRANS_SNAPSHOT),
errcode(ERRCODE_INVALID_STATUS),
errmsg("snapshot is not active")));
}
return u_sess->utils_cxt.ActiveSnapshot->as_snap;
}
* ActiveSnapshotSet
* Return whether there is at least one snapshot in the Active stack
*/
bool ActiveSnapshotSet(void)
{
return u_sess->utils_cxt.ActiveSnapshot != NULL;
}
* RegisterSnapshot
* Register a snapshot as being in use by the current resource owner
*
* If InvalidSnapshot is passed, it is not registered.
*/
Snapshot RegisterSnapshot(Snapshot snapshot)
{
if (snapshot == InvalidSnapshot)
return InvalidSnapshot;
return RegisterSnapshotOnOwner(snapshot, t_thrd.utils_cxt.CurrentResourceOwner);
}
* RegisterSnapshotOnOwner
* As above, but use the specified resource owner
*/
Snapshot RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
{
Snapshot snap;
if (snapshot == InvalidSnapshot)
return InvalidSnapshot;
snap = snapshot->copied ? snapshot : CopySnapshot(snapshot);
ResourceOwnerEnlargeSnapshots(owner);
snap->regd_count++;
ResourceOwnerRememberSnapshot(owner, snap);
u_sess->utils_cxt.RegisteredSnapshots++;
return snap;
}
* UnregisterSnapshot
*
* Decrement the reference count of a snapshot, remove the corresponding
* reference from CurrentResourceOwner, and free the snapshot if no more
* references remain.
*/
void UnregisterSnapshot(Snapshot snapshot)
{
if (snapshot == NULL)
return;
UnregisterSnapshotFromOwner(snapshot, t_thrd.utils_cxt.CurrentResourceOwner);
}
* UnregisterSnapshotFromOwner
* As above, but use the specified resource owner
*/
void UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
{
if (snapshot == NULL)
return;
if (ResourceOwnerForgetSnapshot(owner, snapshot, true)) {
Assert(u_sess->utils_cxt.RegisteredSnapshots > 0);
Assert(snapshot->regd_count > 0);
u_sess->utils_cxt.RegisteredSnapshots--;
if (--snapshot->regd_count == 0 && snapshot->active_count == 0) {
FreeSnapshot(snapshot);
SnapshotResetXmin();
}
}
}
/*
 * RegisterStreamSnapshots
 *		Raise the file-local RegisterStreamSnapshot flag.
 */
void RegisterStreamSnapshots()
{
    RegisterStreamSnapshot = true;
}
/*
 * ForgetRegisterStreamSnapshots
 *		Clear the file-local RegisterStreamSnapshot flag without touching
 *		the advertised xmin.
 */
void ForgetRegisterStreamSnapshots()
{
    RegisterStreamSnapshot = false;
}
void UnRegisterStreamSnapshots()
{
ForgetRegisterStreamSnapshots();
SnapshotResetXmin();
}
* SnapshotResetXmin
*
* If there are no more snapshots, we can reset our PGXACT->xmin to InvalidXid.
* Note we can do this without locking because we assume that storing an Xid
* is atomic.
*/
static void SnapshotResetXmin(void)
{
if (u_sess->utils_cxt.RegisteredSnapshots == 0 && u_sess->utils_cxt.ActiveSnapshot == NULL) {
t_thrd.pgxact->xmin = InvalidTransactionId;
t_thrd.proc->snapXmax = InvalidTransactionId;
t_thrd.proc->snapCSN = InvalidCommitSeqNo;
t_thrd.pgxact->csn_min = InvalidCommitSeqNo;
t_thrd.pgxact->csn_dr = InvalidCommitSeqNo;
t_thrd.proc->exrto_read_lsn = 0;
t_thrd.proc->exrto_gen_snap_time = 0;
}
}
* AtSubCommit_Snapshot
*/
void AtSubCommit_Snapshot(int level)
{
ActiveSnapshotElt* active = NULL;
* Relabel the active snapshots set in this subtransaction as though they
* are owned by the parent subxact.
*/
for (active = u_sess->utils_cxt.ActiveSnapshot; active != NULL; active = active->as_next) {
if (active->as_level < level)
break;
active->as_level = level - 1;
}
}
* AtSubAbort_Snapshot
* Clean up snapshots after a subtransaction abort
*/
void AtSubAbort_Snapshot(int level)
{
while (u_sess->utils_cxt.ActiveSnapshot && u_sess->utils_cxt.ActiveSnapshot->as_level >= level) {
ActiveSnapshotElt* next = NULL;
next = u_sess->utils_cxt.ActiveSnapshot->as_next;
* Decrement the snapshot's active count. If it's still registered or
* marked as active by an outer subtransaction, we can't free it yet.
*/
Assert(u_sess->utils_cxt.ActiveSnapshot->as_snap->active_count >= 1);
u_sess->utils_cxt.ActiveSnapshot->as_snap->active_count -= 1;
if (u_sess->utils_cxt.ActiveSnapshot->as_snap->active_count == 0 &&
u_sess->utils_cxt.ActiveSnapshot->as_snap->regd_count == 0)
FreeSnapshot(u_sess->utils_cxt.ActiveSnapshot->as_snap);
pfree_ext(u_sess->utils_cxt.ActiveSnapshot);
u_sess->utils_cxt.ActiveSnapshot = next;
}
SnapshotResetXmin();
}
/*
 * free_snapshot_prepared_array
 *		In GTM-lite mode, release the prepared_array of the session's current
 *		and secondary snapshot data when its capacity exceeds the default
 *		reported by get_snapshot_defualt_prepared_num(), and zero the
 *		associated capacity/count bookkeeping.
 *
 * Does nothing outside GTM-lite mode.
 */
static void free_snapshot_prepared_array(void)
{
    if (GTM_LITE_MODE) {
        const int default_capacity = get_snapshot_defualt_prepared_num();

        /* Current snapshot data. */
        if (u_sess->utils_cxt.CurrentSnapshotData->prepared_array_capacity > default_capacity) {
            pfree_ext(u_sess->utils_cxt.CurrentSnapshotData->prepared_array);
            u_sess->utils_cxt.CurrentSnapshotData->prepared_array_capacity = 0;
            u_sess->utils_cxt.CurrentSnapshotData->prepared_count = 0;
        }

        /* Secondary snapshot data. */
        if (u_sess->utils_cxt.SecondarySnapshotData->prepared_array_capacity > default_capacity) {
            pfree_ext(u_sess->utils_cxt.SecondarySnapshotData->prepared_array);
            u_sess->utils_cxt.SecondarySnapshotData->prepared_array_capacity = 0;
            u_sess->utils_cxt.SecondarySnapshotData->prepared_count = 0;
        }
    }
}
* AtEOXact_Snapshot
* Snapshot manager's cleanup function for end of transaction
*/
void AtEOXact_Snapshot(bool isCommit)
{
* In transaction-snapshot mode we must release our privately-managed
* reference to the transaction snapshot. We must decrement
* RegisteredSnapshots to keep the check below happy. But we don't bother
* to do FreeSnapshot, for two reasons: the memory will go away with
* u_sess->top_transaction_mem_cxt anyway, and if someone has left the snapshot
* stacked as active, we don't want the code below to be chasing through a
* dangling pointer.
*/
if (u_sess->utils_cxt.FirstXactSnapshot != NULL) {
Assert(u_sess->utils_cxt.FirstXactSnapshot->regd_count > 0);
Assert(u_sess->utils_cxt.RegisteredSnapshots > 0);
u_sess->utils_cxt.RegisteredSnapshots--;
}
u_sess->utils_cxt.FirstXactSnapshot = NULL;
* If we exported any snapshots, clean them up.
*/
if (u_sess->utils_cxt.exportedSnapshots != NIL) {
ListCell *lc;
* Get rid of the files. Unlink failure is only a WARNING because (1)
* it's too late to abort the transaction, and (2) leaving a leaked
* file around has little real consequence anyway.
*
* We also also need to remove the snapshots from RegisteredSnapshots
* to prevent a warning below.
*
* As with the FirstXactSnapshot, we don't need to free resources of
* the snapshot iself as it will go away with the memory context.
*/
foreach(lc, u_sess->utils_cxt.exportedSnapshots) {
ExportedSnapshot *esnap = (ExportedSnapshot *)lfirst(lc);
if (unlink(esnap->snapfile))
ereport(WARNING, (errmsg("could not unlink file \"%s\": %m", esnap->snapfile)));
}
* Note: you might be thinking "why do we have the exportedSnapshots
* list at all? All we need is a counter!". You're right, but we do
* it this way in case we ever feel like improving xmin management.
*/
Assert(u_sess->utils_cxt.RegisteredSnapshots >= list_length(u_sess->utils_cxt.exportedSnapshots));
u_sess->utils_cxt.RegisteredSnapshots -= list_length(u_sess->utils_cxt.exportedSnapshots);
u_sess->utils_cxt.exportedSnapshots = NIL;
}
if (isCommit) {
ActiveSnapshotElt* active = NULL;
if (u_sess->utils_cxt.RegisteredSnapshots != 0)
ereport(WARNING,
(errmsg(
"%d registered snapshots seem to remain after cleanup", u_sess->utils_cxt.RegisteredSnapshots)));
for (active = u_sess->utils_cxt.ActiveSnapshot; active != NULL; active = active->as_next)
ereport(WARNING, (errmsg("snapshot still active")));
}
free_snapshot_prepared_array();
* And reset our state. We don't need to free the memory explicitly --
* it'll go away with u_sess->top_transaction_mem_cxt.
*/
u_sess->utils_cxt.ActiveSnapshot = NULL;
u_sess->utils_cxt.RegisteredSnapshots = 0;
Assert(!(u_sess->utils_cxt.CurrentSnapshot != NULL && u_sess->utils_cxt.CurrentSnapshot->user_data != NULL));
Assert(!(u_sess->utils_cxt.SecondarySnapshot != NULL && u_sess->utils_cxt.SecondarySnapshot->user_data != NULL));
u_sess->utils_cxt.CurrentSnapshot = NULL;
u_sess->utils_cxt.SecondarySnapshot = NULL;
u_sess->utils_cxt.FirstSnapshotSet = false;
SnapshotResetXmin();
t_thrd.proc->exrto_min = InvalidXLogRecPtr;
if (IS_EXRTO_STANDBY_READ && t_thrd.proc->exrto_reload_cache) {
t_thrd.proc->exrto_reload_cache = false;
reset_invalidation_cache();
}
}
* ExportSnapshot
* Export the snapshot to a file so that other backends can import it.
* Returns the token (the file name) that can be used to import this
* snapshot.
*/
char* ExportSnapshot(Snapshot snapshot, CommitSeqNo *snapshotCsn)
{
TransactionId topXid;
TransactionId* children = NULL;
ExportedSnapshot *esnap;
int nchildren;
int addTopXid;
StringInfoData buf;
FILE* f = NULL;
int i;
MemoryContext oldcxt;
char path[MAXPGPATH];
char pathtmp[MAXPGPATH];
int rc;
* It's tempting to call RequireTransactionChain here, since it's not very
* useful to export a snapshot that will disappear immediately afterwards.
* However, we haven't got enough information to do that, since we don't
* know if we're at top level or not. For example, we could be inside a
* plpgsql function that is going to fire off other transactions via
* dblink. Rather than disallow perfectly legitimate usages, don't make a
* check.
*
* Also note that we don't make any restriction on the transaction's
* isolation level; however, importers must check the level if they are
* serializable.
*/
* Get our transaction ID if there is one, to include in the snapshot.
*/
topXid = GetTopTransactionIdIfAny();
* We cannot export a snapshot from a subtransaction because there's no
* easy way for importers to verify that the same subtransaction is still
* running.
*/
if (IsSubTransaction())
ereport(
ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), errmsg("cannot export a snapshot from a subtransaction")));
* We do however allow previous committed subtransactions to exist.
* Importers of the snapshot must see them as still running, so get their
* XIDs to add them to the snapshot.
*/
nchildren = xactGetCommittedChildren(&children);
* Generate file path for the snapshot. We start numbering of snapshots
* inside the transaction from 1.
*/
rc = snprintf_s(path, sizeof(path), sizeof(path) - 1, "%s/%08X-%08X-%d", SNAPSHOT_EXPORT_DIR,
t_thrd.proc->backendId, t_thrd.proc->lxid, list_length(u_sess->utils_cxt.exportedSnapshots) + 1);
securec_check_ss(rc, "", "");
* Copy the snapshot into u_sess->top_transaction_mem_cxt, add it to the
* exportedSnapshots list, and mark it pseudo-registered. We do this to
* ensure that the snapshot's xmin is honored for the rest of the
* transaction. (Right now, because SnapshotResetXmin is so stupid, this
* is overkill; but later we might make that routine smarter.)
*/
snapshot = CopySnapshot(snapshot);
oldcxt = MemoryContextSwitchTo(u_sess->top_transaction_mem_cxt);
esnap = (ExportedSnapshot *) palloc(sizeof(ExportedSnapshot));
esnap->snapfile = pstrdup(path);
esnap->snapshot = snapshot;
u_sess->utils_cxt.exportedSnapshots = lappend(u_sess->utils_cxt.exportedSnapshots, esnap);
(void)MemoryContextSwitchTo(oldcxt);
snapshot->regd_count++;
u_sess->utils_cxt.RegisteredSnapshots++;
* Fill buf with a text serialization of the snapshot, plus identification
* data about this transaction. The format expected by ImportSnapshot is
* pretty rigid: each line must be fieldname:value.
*/
initStringInfo(&buf);
appendStringInfo(&buf, "vxid: %d/" XID_FMT "\n", t_thrd.proc->backendId, t_thrd.proc->lxid);
appendStringInfo(&buf, "pid:%lu\n", t_thrd.proc_cxt.MyProcPid);
appendStringInfo(&buf, "dbid:%u\n", u_sess->proc_cxt.MyDatabaseId);
appendStringInfo(&buf, "iso:%d\n", u_sess->utils_cxt.XactIsoLevel);
appendStringInfo(&buf, "ro:%d\n", u_sess->attr.attr_common.XactReadOnly);
appendStringInfo(&buf, "xmin:" XID_FMT "\n", snapshot->xmin);
appendStringInfo(&buf, "xmax:" XID_FMT "\n", snapshot->xmax);
appendStringInfo(&buf, "timeline:%u\n", snapshot->timeline);
* We must include our own top transaction ID in the top-xid data, since
* by definition we will still be running when the importing transaction
* adopts the snapshot, but GetSnapshotData never includes our own XID in
* the snapshot. (There must, therefore, be enough room to add it.)
*
* However, it could be that our topXid is after the xmax, in which case
* we shouldn't include it because xip[] members are expected to be before
* xmax. (We need not make the same check for subxip[] members, see
* snapshot.h.)
*/
addTopXid = (TransactionIdIsValid(topXid) && TransactionIdPrecedes(topXid, snapshot->xmax)) ? 1 : 0;
if (addTopXid)
appendStringInfo(&buf, "xip:" XID_FMT "\n", topXid);
appendStringInfo(&buf, "snapshotcsn:" XID_FMT "\n", snapshot->snapshotcsn);
if (snapshotCsn) {
*snapshotCsn = snapshot->snapshotcsn;
}
* Similarly, we add our subcommitted child XIDs to the subxid data. Here,
* we have to cope with possible overflow.
*/
if (nchildren > GetMaxSnapshotSubxidCount())
appendStringInfoString(&buf, "sof:1\n");
else {
appendStringInfoString(&buf, "sof:0\n");
for (i = 0; i < nchildren; i++)
appendStringInfo(&buf, "sxp:" XID_FMT "\n", children[i]);
}
appendStringInfo(&buf, "rec:%u\n", snapshot->takenDuringRecovery);
* Now write the text representation into a file. We first write to a
* ".tmp" filename, and rename to final filename if no error. This
* ensures that no other backend can read an incomplete file
* (ImportSnapshot won't allow it because of its valid-characters check).
*/
rc = snprintf_s(pathtmp, sizeof(pathtmp), sizeof(pathtmp) - 1, "%s.tmp", path);
securec_check_ss(rc, "", "");
if (!(f = AllocateFile(pathtmp, PG_BINARY_W)))
ereport(ERROR, (errcode_for_file_access(), errmsg("could not create file \"%s\": %m", pathtmp)));
if (fwrite(buf.data, buf.len, 1, f) != 1)
ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to file \"%s\": %m", pathtmp)));
if (FreeFile(f))
ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to file \"%s\": %m", pathtmp)));
* Now that we have written everything into a .tmp file, rename the file
* to remove the .tmp suffix.
*/
if (rename(pathtmp, path) < 0)
ereport(
ERROR, (errcode_for_file_access(), errmsg("could not rename file \"%s\" to \"%s\": %m", pathtmp, path)));
* The basename of the file is what we return from pg_export_snapshot().
* It's already in path in a textual format and we know that the path
* starts with SNAPSHOT_EXPORT_DIR. Skip over the prefix and the slash
* and pstrdup it so as not to return the address of a local variable.
*/
return pstrdup(path + strlen(SNAPSHOT_EXPORT_DIR) + 1);
}
* pg_export_snapshot
* SQL-callable wrapper for ExportSnapshot.
*/
Datum pg_export_snapshot(PG_FUNCTION_ARGS)
{
char* snapshotName = NULL;
snapshotName = ExportSnapshot(GetActiveSnapshot(), NULL);
PG_RETURN_TEXT_P(cstring_to_text(snapshotName));
}
* pg_export_snapshot_and_csn
* SQL-callable wrapper for ExportSnapshot.
* Return snapshot name and csn.
*/
Datum pg_export_snapshot_and_csn(PG_FUNCTION_ARGS)
{
char *snapshotName = NULL;
Datum values[2];
bool nulls[2] = {0};
TupleDesc tupdesc = NULL;
HeapTuple tuple = NULL;
Datum result;
CommitSeqNo snapshotcsn = InvalidCommitSeqNo;
char strCSN[MAX_ULONG_LENGTH] = {0};
errno_t errorno = EOK;
* Construct a tuple descriptor for the result row. This must match this
* function's pg_proc entry!
*/
tupdesc = CreateTemplateTupleDesc(2, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "snapshot_name", TEXTOID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 2, "CSN", TEXTOID, -1, 0);
tupdesc = BlessTupleDesc(tupdesc);
snapshotName = ExportSnapshot(GetActiveSnapshot(), &snapshotcsn);
errorno = snprintf_s(strCSN, MAX_ULONG_LENGTH, MAX_ULONG_LENGTH - 1, "%X", snapshotcsn);
securec_check_ss(errorno, "", "");
values[0] = CStringGetTextDatum(snapshotName);
values[1] = CStringGetTextDatum(strCSN);
nulls[0] = false;
nulls[1] = false;
tuple = heap_form_tuple(tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
PG_RETURN_DATUM(result);
}
* Parsing subroutines for ImportSnapshot: parse a line with the given
* prefix followed by a value, and advance *s to the next line. The
* filename is provided for use in error messages.
*/
static int parseIntFromText(const char* prefix, char** s, const char* filename)
{
char* ptr = *s;
int prefixlen = strlen(prefix);
int val;
if (strncmp(ptr, prefix, prefixlen) != 0)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", filename)));
ptr += prefixlen;
if (sscanf_s(ptr, "%d", &val) != 1)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", filename)));
ptr = strchr(ptr, '\n');
if (ptr == NULL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", filename)));
*s = ptr + 1;
return val;
}
/*
 * parseXidFromText
 *		Parse one "prefix<value>\n" line whose value is scanned with XID_FMT
 *		into a TransactionId, and advance *s past the newline.
 *
 * A missing prefix, an unparsable value or a missing trailing newline all
 * raise the same ERROR, which mentions filename.
 */
static TransactionId parseXidFromText(const char* prefix, char** s, const char* filename)
{
    char* cursor = *s;
    char* eol = NULL;
    int prefixlen = strlen(prefix);
    TransactionId val;

    /* Each step runs only if the previous one succeeded. */
    bool ok = (strncmp(cursor, prefix, prefixlen) == 0);
    if (ok) {
        cursor += prefixlen;
        ok = (sscanf_s(cursor, XID_FMT, &val) == 1);
    }
    if (ok) {
        eol = strchr(cursor, '\n');
        ok = (eol != NULL);
    }

    if (!ok) {
        ereport(ERROR,
            (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", filename)));
    }

    /* Continue with the character following the newline. */
    *s = eol + 1;
    return val;
}
/*
 * parseTimelineFromText
 *		Parse one "prefix<value>\n" line whose value is scanned with "%u"
 *		into a GTM_Timeline, and advance *s past the newline.
 *
 * A missing prefix, an unparsable value or a missing trailing newline all
 * raise the same ERROR, which mentions filename.
 */
static GTM_Timeline parseTimelineFromText(const char* prefix, char** s, const char* filename)
{
    char* cursor = *s;
    char* eol = NULL;
    int prefixlen = strlen(prefix);
    GTM_Timeline val;

    /* Each step runs only if the previous one succeeded. */
    bool ok = (strncmp(cursor, prefix, prefixlen) == 0);
    if (ok) {
        cursor += prefixlen;
        ok = (sscanf_s(cursor, "%u", &val) == 1);
    }
    if (ok) {
        eol = strchr(cursor, '\n');
        ok = (eol != NULL);
    }

    if (!ok) {
        ereport(ERROR,
            (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", filename)));
    }

    /* Continue with the character following the newline. */
    *s = eol + 1;
    return val;
}
/*
 * parseVxidFromText
 *		Parse one "prefix<backendId>/<localXid>\n" line into *vxid and advance
 *		*s past the newline.
 *
 * prefix:   literal text the line must start with.
 * s:        in/out cursor into the file contents.
 * filename: used in error messages only.
 * vxid:     receives the parsed backend id and local transaction id.
 *
 * A missing prefix, an unparsable value or a missing trailing newline raise
 * ERROR.
 */
static void parseVxidFromText(const char *prefix, char **s, const char *filename, VirtualTransactionId *vxid)
{
    char *ptr = *s;
    int prefixlen = strlen(prefix);
    int backendId = 0;
    TransactionId localXid = 0;

    if (strncmp(ptr, prefix, prefixlen) != 0)
        ereport(ERROR,
            (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", filename)));
    ptr += prefixlen;

    /*
     * ExportSnapshot writes the local transaction id with XID_FMT, so it has
     * to be read back with the same conversion: "%u" would store only an
     * unsigned int's worth of the value.  Scanning into correctly typed
     * temporaries (XID_FMT pairs with TransactionId, "%d" with int) and then
     * assigning keeps the conversion well defined whatever the width of the
     * struct fields.
     */
    if (sscanf_s(ptr, "%d/" XID_FMT, &backendId, &localXid) != 2)
        ereport(ERROR,
            (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", filename)));
    vxid->backendId = backendId;
    vxid->localTransactionId = localXid;

    ptr = strchr(ptr, '\n');
    if (!ptr)
        ereport(ERROR,
            (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", filename)));
    *s = ptr + 1;
}
* ImportSnapshot
* Import a previously exported snapshot. The argument should be a
* filename in SNAPSHOT_EXPORT_DIR. Load the snapshot from that file.
* This is called by "SET TRANSACTION SNAPSHOT 'foo'".
*/
void ImportSnapshot(const char* idstr)
{
char path[MAXPGPATH];
FILE* f = NULL;
struct stat stat_buf;
char* filebuf = NULL;
VirtualTransactionId src_vxid;
ThreadId src_pid;
Oid src_dbid;
int src_isolevel;
bool src_readonly = false;
SnapshotData snapshot;
* Must be at top level of a fresh transaction. Note in particular that
* we check we haven't acquired an XID --- if we have, it's conceivable
* that the snapshot would show it as not running, making for very screwy
* behavior.
*/
if (u_sess->utils_cxt.FirstSnapshotSet || GetTopTransactionIdIfAny() != InvalidTransactionId || IsSubTransaction())
ereport(ERROR,
(errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("SET TRANSACTION SNAPSHOT must be called before any query")));
* If we are in read committed mode then the next query would execute with
* a new snapshot thus making this function call quite useless.
*/
if (!IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a snapshot-importing transaction must have isolation level SERIALIZABLE or REPEATABLE READ")));
* Verify the identifier: only 0-9, A-F and hyphens are allowed. We do
* this mainly to prevent reading arbitrary files.
*/
if (strspn(idstr, "0123456789ABCDEF-") != strlen(idstr))
ereport(
ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid snapshot identifier: \"%s\"", idstr)));
int rc = snprintf_s(path, MAXPGPATH, MAXPGPATH - 1, "%s/%s", SNAPSHOT_EXPORT_DIR, idstr);
securec_check_ss(rc, "", "");
f = AllocateFile(path, PG_BINARY_R);
if (f == NULL)
ereport(
ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid snapshot identifier: \"%s\"", idstr)));
if (fstat(fileno(f), &stat_buf))
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("could not stat file \"%s\": %m", path)));
filebuf = (char*)palloc(stat_buf.st_size + 1);
if (fread(filebuf, stat_buf.st_size, 1, f) != 1)
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("could not read file \"%s\": %m", path)));
filebuf[stat_buf.st_size] = '\0';
(void)FreeFile(f);
* Construct a snapshot struct by parsing the file content.
*/
rc = memset_s(&snapshot, sizeof(SnapshotData), 0, sizeof(snapshot));
securec_check(rc, "", "");
parseVxidFromText("vxid:", &filebuf, path, &src_vxid);
src_pid = parseIntFromText("pid:", &filebuf, path);
src_dbid = parseXidFromText("dbid:", &filebuf, path);
src_isolevel = parseIntFromText("iso:", &filebuf, path);
src_readonly = parseIntFromText("ro:", &filebuf, path);
snapshot.xmin = parseXidFromText("xmin:", &filebuf, path);
snapshot.xmax = parseXidFromText("xmax:", &filebuf, path);
snapshot.timeline = parseTimelineFromText("timeline:", &filebuf, path);
snapshot.snapshotcsn = parseXidFromText("snapshotcsn:", &filebuf, path);
parseIntFromText("sof:", &filebuf, path);
snapshot.takenDuringRecovery = parseIntFromText("rec:", &filebuf, path);
* Do some additional sanity checking, just to protect ourselves. We
* don't trouble to check the array elements, just the most critical
* fields.
*/
if (!VirtualTransactionIdIsValid(src_vxid) || !OidIsValid(src_dbid) || !TransactionIdIsNormal(snapshot.xmin) ||
!TransactionIdIsNormal(snapshot.xmax))
ereport(ERROR,
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION), errmsg("invalid snapshot data in file \"%s\"", path)));
* If we're serializable, the source transaction must be too, otherwise
* predicate.c has problems (SxactGlobalXmin could go backwards). Also, a
* non-read-only transaction can't adopt a snapshot from a read-only
* transaction, as predicate.c handles the cases very differently.
*/
if (IsolationIsSerializable()) {
if (src_isolevel != XACT_SERIALIZABLE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a serializable transaction cannot import a snapshot from a non-serializable transaction")));
if (src_readonly && !u_sess->attr.attr_common.XactReadOnly)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("a non-read-only serializable transaction cannot import a snapshot from a read-only "
"transaction")));
}
* We cannot import a snapshot that was taken in a different database,
* because vacuum calculates OldestXmin on a per-database basis; so the
* source transaction's xmin doesn't protect us from data loss. This
* restriction could be removed if the source transaction were to mark its
* xmin as being globally applicable. But that would require some
* additional syntax, since that has to be known when the snapshot is
* initially taken. (See pgsql-hackers discussion of 2011-10-21.)
*/
if (src_dbid != u_sess->proc_cxt.MyDatabaseId)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot import a snapshot from a different database")));
SetTransactionSnapshot(&snapshot, &src_vxid, src_pid);
}
* XactHasExportedSnapshots
* Test whether current transaction has exported any snapshots.
*/
bool XactHasExportedSnapshots(void)
{
return (u_sess->utils_cxt.exportedSnapshots != NIL);
}
* DeleteAllExportedSnapshotFiles
* Clean up any files that have been left behind by a crashed backend
* that had exported snapshots before it died.
*
* This should be called during database startup or crash recovery.
*/
void DeleteAllExportedSnapshotFiles(void)
{
char buf[MAXPGPATH];
DIR* s_dir = NULL;
struct dirent* s_de = NULL;
int rc = 0;
if (!(s_dir = AllocateDir(SNAPSHOT_EXPORT_DIR))) {
* We really should have that directory in a sane cluster setup. But
* then again if we don't, it's not fatal enough to make it FATAL.
* Since we're running in the postmaster, LOG is our best bet.
*/
ereport(LOG, (errmsg("could not open directory \"%s\": %m", SNAPSHOT_EXPORT_DIR)));
return;
}
while ((s_de = ReadDir(s_dir, SNAPSHOT_EXPORT_DIR)) != NULL) {
if (strcmp(s_de->d_name, ".") == 0 || strcmp(s_de->d_name, "..") == 0)
continue;
rc = snprintf_s(buf, MAXPGPATH, MAXPGPATH - 1, "%s/%s", SNAPSHOT_EXPORT_DIR, s_de->d_name);
securec_check_ss(rc, "", "");
if (unlink(buf))
ereport(LOG, (errmsg("could not unlink file \"%s\": %m", buf)));
}
FreeDir(s_dir);
}
void StreamTxnContextSaveSnapmgr(void* stc)
{
STCSaveElem(((StreamTxnContext*)stc)->RecentGlobalXmin, u_sess->utils_cxt.RecentGlobalXmin);
STCSaveElem(((StreamTxnContext*)stc)->TransactionXmin, u_sess->utils_cxt.TransactionXmin);
STCSaveElem(((StreamTxnContext*)stc)->RecentXmin, u_sess->utils_cxt.RecentXmin);
}
void StreamTxnContextRestoreSnapmgr(const void* stc)
{
STCRestoreElem(((StreamTxnContext*)stc)->RecentGlobalXmin, u_sess->utils_cxt.RecentGlobalXmin);
STCRestoreElem(((StreamTxnContext*)stc)->TransactionXmin, u_sess->utils_cxt.TransactionXmin);
STCRestoreElem(((StreamTxnContext*)stc)->RecentXmin, u_sess->utils_cxt.RecentXmin);
}
/*
 * ThereAreNoPriorRegisteredSnapshots
 *		Report whether the session has at most one registered snapshot.
 */
bool ThereAreNoPriorRegisteredSnapshots(void)
{
    return u_sess->utils_cxt.RegisteredSnapshots <= 1;
}
/*
 * PgFdwCopySnapshot
 *		Externally visible wrapper that returns CopySnapshot(snapshot).
 */
Snapshot PgFdwCopySnapshot(Snapshot snapshot)
{
return CopySnapshot(snapshot);
}
* Setup a snapshot that replaces normal catalog snapshots that allows catalog
* access to behave just like it did at a certain point in the past.
*
* Needed for logical decoding.
*/
void SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB* tuplecids)
{
Assert(historic_snapshot != NULL);
u_sess->utils_cxt.HistoricSnapshot = historic_snapshot;
u_sess->utils_cxt.tuplecid_data = tuplecids;
}
* Make catalog snapshots behave normally again.
*/
void TeardownHistoricSnapshot(bool is_error)
{
u_sess->utils_cxt.HistoricSnapshot = NULL;
u_sess->utils_cxt.tuplecid_data = NULL;
}
/*
 * HistoricSnapshotActive
 *		Return true while a historic snapshot is installed in the session,
 *		i.e. u_sess->utils_cxt.HistoricSnapshot is not NULL.
 */
bool HistoricSnapshotActive(void)
{
return u_sess->utils_cxt.HistoricSnapshot != NULL;
}
/*
 * HistoricSnapshotGetTupleCids
 *		Return the tuplecid hash table stored in the session.
 *
 * Asserts that a historic snapshot is currently active.
 */
HTAB* HistoricSnapshotGetTupleCids(void)
{
Assert(HistoricSnapshotActive());
return u_sess->utils_cxt.tuplecid_data;
}