* txid.c
*
* Export internal transaction IDs to user level.
*
* Note that only top-level transaction IDs are ever converted to TXID.
* This is important because TXIDs frequently persist beyond the global
* xmin horizon, or may even be shipped to other machines, so we cannot
* rely on being able to correlate subtransaction IDs with their parents
* via functions such as SubTransGetTopmostTransaction().
*
*
* Copyright (c) 2003-2012, PostgreSQL Global Development Group
* 64-bit txids: Marko Kreen, Skype Technologies
*
* src/backend/utils/adt/txid.c
*
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#include "access/gtm.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "libpq/pqformat.h"
#include "storage/procarray.h"
#include "storage/standby.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#define MAX_TXID UINT64CONST(0x7FFFFFFFFFFFFFFF)
* If defined, use bsearch() function for searching for txids in snapshots
* that have more than the specified number of values.
*/
#define USE_BSEARCH_IF_NXIP_GREATER 30
* Snapshot containing 8byte txids.
*/
typedef struct {
* 4-byte length hdr, should not be touched directly.
*
* Explicit embedding is ok as we want always correct alignment anyway.
*/
int32 __varsz;
uint32 nxip;
TransactionId xmin;
TransactionId xmax;
TransactionId csn;
TransactionId xip[1];
} TxidSnapshot;
#define TXID_SNAPSHOT_SIZE(nxip) (offsetof(TxidSnapshot, xip) + sizeof(TransactionId) * (nxip))
#define TXID_SNAPSHOT_MAX_NXIP ((MaxAllocSize - offsetof(TxidSnapshot, xip)) / sizeof(TransactionId))
* txid comparator for qsort/bsearch
*/
static int cmp_txid(const void* aa, const void* bb)
{
TransactionId a = *(const TransactionId*)aa;
TransactionId b = *(const TransactionId*)bb;
if (a < b)
return -1;
if (a > b)
return 1;
return 0;
}
* check txid visibility.
*/
static bool is_visible_txid(TransactionId value, const TxidSnapshot* snap)
{
if (value < snap->xmin)
return true;
else if (value >= snap->xmax)
return false;
#ifdef USE_BSEARCH_IF_NXIP_GREATER
else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER) {
void* res = NULL;
res = bsearch(&value, snap->xip, snap->nxip, sizeof(TransactionId), cmp_txid);
return (res) ? false : true;
}
#endif
else {
uint32 i;
for (i = 0; i < snap->nxip; i++) {
if (value == snap->xip[i])
return false;
}
return true;
}
}
* helper functions to use StringInfo for TxidSnapshot creation.
*/
static StringInfo buf_init(TransactionId xmin, TransactionId xmax)
{
TxidSnapshot snap;
StringInfo buf;
snap.xmin = xmin;
snap.xmax = xmax;
snap.nxip = 0;
buf = makeStringInfo();
appendBinaryStringInfo(buf, (char*)&snap, TXID_SNAPSHOT_SIZE(0));
return buf;
}
static void buf_add_txid(StringInfo buf, TransactionId xid)
{
TxidSnapshot* snap = (TxidSnapshot*)buf->data;
snap->nxip++;
appendBinaryStringInfo(buf, (char*)&xid, sizeof(xid));
}
static TxidSnapshot* buf_finalize(StringInfo buf)
{
TxidSnapshot* snap = (TxidSnapshot*)buf->data;
SET_VARSIZE(snap, buf->len);
buf->data = NULL;
pfree_ext(buf);
return snap;
}
* simple number parser.
*
* We return 0 on error, which is invalid value for txid.
*/
static TransactionId str2txid(const char* s, const char** endp)
{
TransactionId val = 0;
TransactionId cutoff = MAX_TXID / 10;
TransactionId cutlim = MAX_TXID % 10;
for (; *s; s++) {
unsigned d;
if (*s < '0' || *s > '9')
break;
d = *s - '0';
* check for overflow
*/
if (val > cutoff || (val == cutoff && d > cutlim)) {
val = 0;
break;
}
val = val * 10 + d;
}
if (endp != NULL)
*endp = s;
return val;
}
* parse snapshot from cstring
*/
static TxidSnapshot* parse_snapshot(const char* str)
{
TransactionId xmin;
TransactionId xmax;
TransactionId last_val = 0, val;
const char* str_start = str;
const char* endp = NULL;
StringInfo buf;
xmin = str2txid(str, &endp);
if (*endp != ':')
goto bad_format;
str = endp + 1;
xmax = str2txid(str, &endp);
if (*endp != ':')
goto bad_format;
str = endp + 1;
if (xmin == 0 || xmax == 0 || xmin > xmax)
goto bad_format;
buf = buf_init(xmin, xmax);
while (*str != '\0') {
val = str2txid(str, &endp);
str = endp;
if (val < xmin || val >= xmax || val <= last_val)
goto bad_format;
buf_add_txid(buf, val);
last_val = val;
if (*str == ',')
str++;
else if (*str != '\0')
goto bad_format;
}
return buf_finalize(buf);
bad_format:
ereport(
ERROR, (errcode(ERRCODE_UNEXPECTED_NULL_VALUE), errmsg("invalid input for txid_snapshot: \"%s\"", str_start)));
return NULL;
}
* Public functions.
*
* txid_current() and txid_current_snapshot() are the only ones that
* communicate with core xid machinery. All the others work on data
* returned by them.
*/
* txid_current() returns int8
*
* Return the current toplevel transaction ID
*/
Datum txid_current(PG_FUNCTION_ARGS)
{
PG_RETURN_INT64(GetTopTransactionId());
}
Datum gs_txid_oldestxmin(PG_FUNCTION_ARGS)
{
PG_RETURN_INT64(GetGlobalOldestXmin());
}
Datum pgxc_snapshot_status(PG_FUNCTION_ARGS)
{
#ifndef ENABLE_MULTIPLE_NODES
FuncCallContext* funcctx = NULL;
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("unsupported view in single node mode.")));
SRF_RETURN_DONE(funcctx);
#else
FuncCallContext* funcctx = NULL;
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported function or view in %s mode.", GTM_LITE_MODE ? "GTM-Lite" : "GTM-Free")));
SRF_RETURN_DONE(funcctx);
#endif
}
* txid_current_snapshot() returns txid_snapshot
*
* Return current snapshot in TXID format
*
* Note that only top-transaction XIDs are included in the snapshot.
*/
Datum txid_current_snapshot(PG_FUNCTION_ARGS)
{
TxidSnapshot* snap = NULL;
uint32 size;
Snapshot cur;
cur = GetActiveSnapshot();
if (cur == NULL)
ereport(ERROR, (errcode(ERRCODE_UNEXPECTED_NULL_VALUE), errmsg("no active snapshot set")));
size = TXID_SNAPSHOT_SIZE(0);
snap = (TxidSnapshot*)palloc(size);
SET_VARSIZE(snap, size);
snap->xmin = cur->xmin;
snap->xmax = cur->xmax;
snap->csn = cur->snapshotcsn;
snap->nxip = 0;
PG_RETURN_POINTER(snap);
}
* txid_snapshot_in(cstring) returns txid_snapshot
*
* input function for type txid_snapshot
*/
Datum txid_snapshot_in(PG_FUNCTION_ARGS)
{
char* str = PG_GETARG_CSTRING(0);
TxidSnapshot* snap = NULL;
snap = parse_snapshot(str);
PG_RETURN_POINTER(snap);
}
* txid_snapshot_out(txid_snapshot) returns cstring
*
* output function for type txid_snapshot
*/
Datum txid_snapshot_out(PG_FUNCTION_ARGS)
{
TxidSnapshot* snap = (TxidSnapshot*)PG_GETARG_VARLENA_P(0);
StringInfoData str;
uint32 i;
initStringInfo(&str);
appendStringInfo(&str, XID_FMT ":", snap->xmin);
appendStringInfo(&str, XID_FMT ":", snap->xmax);
for (i = 0; i < snap->nxip; i++) {
if (i > 0)
appendStringInfoChar(&str, ',');
appendStringInfo(&str, XID_FMT, snap->xip[i]);
}
PG_FREE_IF_COPY(snap, 0);
PG_RETURN_CSTRING(str.data);
}
* txid_snapshot_recv(internal) returns txid_snapshot
*
* binary input function for type txid_snapshot
*
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip
*/
Datum txid_snapshot_recv(PG_FUNCTION_ARGS)
{
StringInfo buf = (StringInfo)PG_GETARG_POINTER(0);
TxidSnapshot* snap = NULL;
TransactionId last = 0;
int nxip;
int i;
TransactionId xmin, xmax, csn;
nxip = pq_getmsgint(buf, 4);
if (nxip < 0 || nxip > (int)TXID_SNAPSHOT_MAX_NXIP)
goto bad_format;
xmin = pq_getmsgint64(buf);
xmax = pq_getmsgint64(buf);
csn = pq_getmsgint64(buf);
if (xmin == 0 || xmax == 0 || xmin > xmax || xmax > MAX_TXID)
goto bad_format;
snap = (TxidSnapshot*)palloc(TXID_SNAPSHOT_SIZE(nxip));
snap->xmin = xmin;
snap->xmax = xmax;
snap->csn = csn;
snap->nxip = nxip;
SET_VARSIZE(snap, TXID_SNAPSHOT_SIZE(nxip));
for (i = 0; i < nxip; i++) {
TransactionId cur = pq_getmsgint64(buf);
if (cur <= last || cur < xmin || cur >= xmax)
goto bad_format;
snap->xip[i] = cur;
last = cur;
}
PG_RETURN_POINTER(snap);
bad_format:
ereport(ERROR, (errcode(ERRCODE_UNEXPECTED_NULL_VALUE), errmsg("invalid snapshot data")));
return (Datum)NULL;
}
* txid_snapshot_send(txid_snapshot) returns bytea
*
* binary output function for type txid_snapshot
*
* format: int4 nxip, int8 xmin, int8 xmax, int8 xip
*/
Datum txid_snapshot_send(PG_FUNCTION_ARGS)
{
TxidSnapshot* snap = (TxidSnapshot*)PG_GETARG_VARLENA_P(0);
StringInfoData buf;
uint32 i;
pq_begintypsend(&buf);
pq_sendint32(&buf, snap->nxip);
pq_sendint64(&buf, snap->xmin);
pq_sendint64(&buf, snap->xmax);
pq_sendint64(&buf, snap->csn);
for (i = 0; i < snap->nxip; i++)
pq_sendint64(&buf, snap->xip[i]);
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
* txid_visible_in_snapshot(int8, txid_snapshot) returns bool
*
* is txid visible in snapshot ?
*/
Datum txid_visible_in_snapshot(PG_FUNCTION_ARGS)
{
TransactionId value = PG_GETARG_INT64(0);
TxidSnapshot* snap = (TxidSnapshot*)PG_GETARG_VARLENA_P(1);
PG_RETURN_BOOL(is_visible_txid(value, snap));
}
* txid_snapshot_xmin(txid_snapshot) returns int8
*
* return snapshot's xmin
*/
Datum txid_snapshot_xmin(PG_FUNCTION_ARGS)
{
TxidSnapshot* snap = (TxidSnapshot*)PG_GETARG_VARLENA_P(0);
PG_RETURN_INT64(snap->xmin);
}
* txid_snapshot_xmax(txid_snapshot) returns int8
*
* return snapshot's xmax
*/
Datum txid_snapshot_xmax(PG_FUNCTION_ARGS)
{
TxidSnapshot* snap = (TxidSnapshot*)PG_GETARG_VARLENA_P(0);
PG_RETURN_INT64(snap->xmax);
}
* txid_snapshot_xip(txid_snapshot) returns setof int8
*
* return in-progress TXIDs in snapshot.
*/
Datum txid_snapshot_xip(PG_FUNCTION_ARGS)
{
FuncCallContext* fctx = NULL;
TxidSnapshot* snap = NULL;
TransactionId value;
errno_t rc = EOK;
if (SRF_IS_FIRSTCALL()) {
TxidSnapshot* arg = (TxidSnapshot*)PG_GETARG_VARLENA_P(0);
fctx = SRF_FIRSTCALL_INIT();
snap = (TxidSnapshot*)MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
if (VARSIZE(arg) > 0) {
rc = memcpy_s(snap, VARSIZE(arg), arg, VARSIZE(arg));
securec_check(rc, "\0", "\0");
}
fctx->user_fctx = snap;
}
fctx = SRF_PERCALL_SETUP();
snap = (TxidSnapshot*)fctx->user_fctx;
if (fctx->call_cntr < snap->nxip) {
value = snap->xip[fctx->call_cntr];
SRF_RETURN_NEXT(fctx, Int64GetDatum(value));
} else {
SRF_RETURN_DONE(fctx);
}
}