/* ---------------------------------------------------------------------------------------
 *
 * reorderbuffer.h
 *        openGauss logical replay/reorder buffer management.
 *
 * Copyright (c) 2012-2014, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *        src/include/replication/reorderbuffer.h
 *
 * ---------------------------------------------------------------------------------------
 */
#ifndef REORDERBUFFER_H
#define REORDERBUFFER_H
#include "lib/ilist.h"
#include "storage/sinval.h"
#include "replication/ddlmessage.h"
#include "utils/hsearch.h"
#include "utils/rel.h"
#include "utils/snapshot.h"
#include "utils/timestamp.h"
#include "access/ustore/knl_utuple.h"
#include "lib/binaryheap.h"
/* Forward declaration: this header only passes LogicalDecodingContext by pointer. */
typedef struct LogicalDecodingContext LogicalDecodingContext;
/*
 * Header of a buffer holding one heap-format tuple.
 *
 * The tuple payload is not a member of this struct; the
 * ReorderBufferTupleBufData() macro locates it at the first MAXALIGN'd
 * address past the end of the header.
 */
typedef struct ReorderBufferTupleBuf {
    /* singly-linked list membership */
    slist_node node;

    /* tuple descriptor for the buffered tuple */
    HeapTupleData tuple;

    /* NOTE(review): presumably the number of bytes reserved for the payload - confirm */
    Size alloc_tuple_size;

    /* NOTE(review): presumably the next buffer on a free list - confirm */
    struct ReorderBufferTupleBuf *freeNext;
} ReorderBufferTupleBuf;
#define ReorderBufferTupleBufData(p) ((HeapTupleHeader)MAXALIGN(((char*)p) + sizeof(ReorderBufferTupleBuf)))
/*
 * Header of a buffer holding one UHeap-format tuple (UHeapTupleData).
 *
 * The tuple payload is not a member of this struct; the
 * ReorderBufferUTupleBufData() macro locates it at the first MAXALIGN'd
 * address past the end of the header.
 */
typedef struct ReorderBufferUTupleBuf {
    /* singly-linked list membership */
    slist_node node;

    /* tuple descriptor for the buffered tuple */
    UHeapTupleData tuple;

    /* NOTE(review): presumably the number of bytes reserved for the payload - confirm */
    Size alloc_tuple_size;

    /* NOTE(review): presumably the next buffer on a free list - confirm */
    struct ReorderBufferUTupleBuf *freeNext;
} ReorderBufferUTupleBuf;
#define ReorderBufferUTupleBufData(p) ((UHeapDiskTuple)MAXALIGN(((char*)p) + sizeof(ReorderBufferUTupleBuf)))
/*
 * Types of the change passed to a 'change' callback.
 *
 * For efficiency and simplicity reasons we want to keep Snapshots, CommandIds
 * and ComboCids in the same list with the user visible INSERT/UPDATE/DELETE
 * changes. Users of the decoding facilities will never see changes with
 * *_INTERNAL_* actions.
 */
/*
 * Discriminator stored in ReorderBufferChange.action.
 * The numeric values are spelled out; they match plain declaration order.
 */
enum ReorderBufferChangeType {
    /* User-visible row changes. */
    REORDER_BUFFER_CHANGE_INSERT = 0,
    REORDER_BUFFER_CHANGE_UPDATE = 1,
    REORDER_BUFFER_CHANGE_DELETE = 2,

    /* Internal bookkeeping entries kept in the same list as the row changes. */
    REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT = 3,
    REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID = 4,
    REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID = 5,

    REORDER_BUFFER_CHANGE_DDL = 6,
    REORDER_BUFFER_CHANGE_TRUNCATE = 7,

    /* NOTE(review): presumably the UHeap-tuple counterparts of INSERT/UPDATE/DELETE - confirm. */
    REORDER_BUFFER_CHANGE_UINSERT = 8,
    REORDER_BUFFER_CHANGE_UUPDATE = 9,
    REORDER_BUFFER_CHANGE_UDELETE = 10
};
/*
 * A single 'change', can be an insert (with one tuple), an update (old, new),
 * or a delete (old).
 *
 * The same struct is also used internally for other purposes but that should
 * never be visible outside reorderbuffer.c.
 */
typedef struct ReorderBufferChange {
XLogRecPtr lsn;
enum ReorderBufferChangeType action;
struct ReorderBufferTXN *txn;
RepOriginId origin_id;
* Context data for the change, which part of the union is valid depends
* on action/action_internal.
*/
union {
struct {
RelFileNode relnode;
bool clear_toast_afterwards;
ReorderBufferTupleBuf* oldtuple;
ReorderBufferTupleBuf* newtuple;
CommitSeqNo snapshotcsn;
} tp;
* Truncate data for REORDER_BUFFER_CHANGE_TRUNCATE representing
* one set of relations to be truncated.
*/
struct {
Size nrelids;
bool cascade;
bool restart_seqs;
Oid *relids;
} truncate;
struct {
RelFileNode relnode;
bool clear_toast_afterwards;
ReorderBufferUTupleBuf* oldtuple;
ReorderBufferUTupleBuf* newtuple;
CommitSeqNo snapshotcsn;
} utp;
struct {
char *prefix;
Size message_size;
char *message;
Oid relid;
DeparsedCommandType cmdtype;
} ddl;
Snapshot snapshot;
* New command id for existing snapshot in a catalog changing tx. Set
* when action == *_INTERNAL_COMMAND_ID.
*/
CommandId command_id;
* New cid mapping for catalog changing transaction, set when action
* == *_INTERNAL_TUPLECID.
*/
struct {
RelFileNode node;
ItemPointerData tid;
CommandId cmin;
CommandId cmax;
CommandId combocid;
} tuplecid;
} data;
* While in use this is how a change is linked into a transactions,
* otherwise it's the preallocated list.
*/
dlist_node node;
} ReorderBufferChange;
typedef struct ReorderBufferTXN {
* The transactions transaction id, can be a toplevel or sub xid.
*/
TransactionId xid;
bool has_catalog_changes;
bool is_known_as_subxact;
TransactionId toplevel_xid;
* LSN of the first data carrying, WAL record with knowledge about this
* xid. This is allowed to *not* be first record adorned with this xid, if
* the previous records aren't relevant for logical decoding.
*/
XLogRecPtr first_lsn;
* LSN of the record that lead to this xact to be committed or
* aborted. This can be a
* * plain commit record
* * plain commit record, of a parent transaction
* * prepared transaction commit
* * plain abort record
* * prepared transaction abort
* * error during decoding
* ----
*/
XLogRecPtr final_lsn;
* LSN pointing to the end of the commit record + 1.
*/
XLogRecPtr end_lsn;
* LSN of the last lsn at which snapshot information reside, so we can
* restart decoding from there and fully recover this transaction from
* WAL.
*/
XLogRecPtr restart_decoding_lsn;
RepOriginId origin_id;
XLogRecPtr origin_lsn;
CommitSeqNo csn;
* Commit time, only known when we read the actual commit record.
*/
TimestampTz commit_time;
* Base snapshot or NULL.
* The base snapshot is used to decode all changes until either this
* transaction modifies the catalog, or another catalog-modifying
* transaction commits.
*/
Snapshot base_snapshot;
XLogRecPtr base_snapshot_lsn;
dlist_node base_snapshot_node;
* How many ReorderBufferChange's do we have in this txn.
*
* Changes in subtransactions are *not* included but tracked separately.
*/
uint64 nentries;
* How many of the above entries are stored in memory in contrast to being
* spilled to disk.
*/
uint64 nentries_mem;
* Has this transaction been spilled to disk? It's not always possible to
* deduce that fact by comparing nentries with nentries_mem, because
* e.g. subtransactions of a large transaction might get serialized
* together with the parent - if they're restored to memory they'd have
* nentries_mem == nentries.
*/
bool serialized;
* List of ReorderBufferChange structs, including new Snapshots and new
* CommandIds
*/
dlist_head changes;
* List of (relation, ctid) => (cmin, cmax) mappings for catalog tuples.
* Those are always assigned to the toplevel transaction. (Keep track of
* #entries to create a hash of the right size)
*/
dlist_head tuplecids;
uint64 ntuplecids;
* On-demand built hash for looking up the above values.
*/
HTAB* tuplecid_hash;
* Hash containing (potentially partial) toast entries. NULL if no toast
* tuples have been found for the current change.
*/
HTAB* toast_hash;
* non-hierarchical list of subtransactions that are *not* aborted. Only
* used in toplevel transactions.
*/
dlist_head subtxns;
uint32 nsubtxns;
* Stored cache invalidations. This is not a linked list because we get
* all the invalidations at once.
*/
uint32 ninvalidations;
SharedInvalidationMessage* invalidations;
* Position in one of three lists:
* * list of subtransactions if we are *known* to be subxact
* * list of toplevel xacts (can be a as-yet unknown subxact)
* * list of preallocated ReorderBufferTXNs
* ---
*/
dlist_node node;
Size size;
* Private data pointer of the output plugin.
*/
void *output_plugin_private;
} ReorderBufferTXN;
/* Declared up front so the callback signatures can take ReorderBuffer pointers; defined further down in this header. */
typedef struct ReorderBuffer ReorderBuffer;

/* Signature of ReorderBuffer.apply_change. */
typedef void (*ReorderBufferApplyChangeCB)(ReorderBuffer *rb,
                                           ReorderBufferTXN *txn,
                                           Relation relation,
                                           ReorderBufferChange *change);

/* Signature of ReorderBuffer.apply_truncate; 'relations' holds 'nrelations' entries. */
typedef void (*ReorderBufferApplyTruncateCB)(ReorderBuffer *rb,
                                             ReorderBufferTXN *txn,
                                             int nrelations,
                                             Relation relations[],
                                             ReorderBufferChange *change);

/* Signature of ReorderBuffer.begin. */
typedef void (*ReorderBufferBeginCB)(ReorderBuffer *rb, ReorderBufferTXN *txn);

/* Signature of ReorderBuffer.commit. */
typedef void (*ReorderBufferCommitCB)(ReorderBuffer *rb, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);

/* Signature of ReorderBuffer.ddl. */
typedef void (*ReorderBufferDDLMessageCB)(ReorderBuffer *rb,
                                          ReorderBufferTXN *txn,
                                          XLogRecPtr message_lsn,
                                          const char *prefix,
                                          Oid relid,
                                          DeparsedCommandType cmdtype,
                                          Size sz,
                                          const char *message);

/* Signature of ReorderBuffer.abort. */
typedef void (*ReorderBufferAbortCB)(ReorderBuffer *rb, ReorderBufferTXN *txn);

/* Signature of ReorderBuffer.prepare. */
typedef void (*ReorderBufferPrepareCB)(ReorderBuffer *rb, ReorderBufferTXN *txn);
struct ReorderBuffer {
* xid => ReorderBufferTXN lookup table
*/
HTAB* by_txn;
* Transactions that could be a toplevel xact, ordered by LSN of the first
* record bearing that xid..
*/
dlist_head toplevel_by_lsn;
* Transactions and subtransactions that have a base snapshot, ordered by
* LSN of the record which caused us to first obtain the base snapshot.
* This is not the same as toplevel_by_lsn, because we only set the base
* snapshot on the first logical-decoding-relevant record (eg. heap
* writes), whereas the initial LSN could be set by other operations.
*/
dlist_head txns_by_base_snapshot_lsn;
* one-entry sized cache for by_txn. Very frequently the same txn gets
* looked up over and over again.
*/
TransactionId by_txn_last_xid;
ReorderBufferTXN* by_txn_last_txn;
* Callacks to be called when a transactions commits.
*/
ReorderBufferBeginCB begin;
ReorderBufferApplyChangeCB apply_change;
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferAbortCB abort;
ReorderBufferPrepareCB prepare;
ReorderBufferDDLMessageCB ddl;
* Pointer that will be passed untouched to the callbacks.
*/
void* private_data;
* Saved output plugin option
*/
bool output_rewrites;
* Private memory context.
*/
MemoryContext context;
* Data structure slab cache.
*
* We allocate/deallocate some structures very frequently, to avoid bigger
* overhead we cache some unused ones here.
*
* The maximum number of cached entries is controlled by const variables
* ontop of reorderbuffer.c
*/
dlist_head cached_transactions;
Size nr_cached_transactions;
dlist_head cached_changes;
Size nr_cached_changes;
slist_head cached_tuplebufs;
Size nr_cached_tuplebufs;
TransactionId lastRunningXactOldestXmin;
XLogRecPtr current_restart_decoding_lsn;
char* outbuf;
Size outbufsize;
Size size;
};
/*
 * Pairs a transaction id with its ReorderBufferTXN.
 * NOTE(review): presumably the entry type of the ReorderBuffer.by_txn hash
 * ("xid => ReorderBufferTXN lookup table") - confirm in reorderbuffer.c.
 */
typedef struct ReorderBufferTXNByIdEnt {
TransactionId xid;
ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
/*
 * (relfilenode, tuple id) pair; embedded as the 'key' member of
 * ReorderBufferTupleCidEnt.
 */
typedef struct ReorderBufferTupleCidKey {
RelFileNode relnode;
ItemPointerData tid;
} ReorderBufferTupleCidKey;
/*
 * Associates a ReorderBufferTupleCidKey with cmin/cmax/combocid command ids.
 * NOTE(review): presumably the entry type of ReorderBufferTXN.tuplecid_hash
 * - confirm in reorderbuffer.c.
 */
typedef struct ReorderBufferTupleCidEnt {
ReorderBufferTupleCidKey key;
CommandId cmin;
CommandId cmax;
CommandId combocid;
} ReorderBufferTupleCidEnt;
/*
 * Element type of ReorderBufferIterTXNState.entries[]: an LSN together with
 * a change and the transaction it is associated with.
 * NOTE(review): fd/segno presumably track the file and segment being read
 * back for this transaction - confirm in reorderbuffer.c.
 */
typedef struct ReorderBufferIterTXNEntry {
XLogRecPtr lsn;
ReorderBufferChange *change;
ReorderBufferTXN *txn;
int fd;
XLogSegNo segno;
} ReorderBufferIterTXNEntry;
/*
 * Iteration state: a binary heap plus a variable-length trailing array of
 * ReorderBufferIterTXNEntry.
 * NOTE(review): nr_txns is presumably the number of valid elements in
 * entries[] - confirm in reorderbuffer.c.
 */
typedef struct ReorderBufferIterTXNState {
binaryheap *heap;
Size nr_txns;
dlist_head old_change;
/* flexible array member; must stay last */
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;
/*
 * Bookkeeping for the toast chunks collected for one chunk_id.
 * NOTE(review): presumably the entry type of ReorderBufferTXN.toast_hash
 * - confirm in reorderbuffer.c.
 */
typedef struct ReorderBufferToastEnt {
    Oid chunk_id;

    /* chunk sequence number of the last chunk we have seen */
    int32 last_chunk_seq;

    Size num_chunks;
    Size size;

    /* list of the chunks */
    dlist_head chunks;

    /* reconstructed varlena, now pointed to in main tup */
    struct varlena *reconstructed;
} ReorderBufferToastEnt;
/*
 * A ReorderBufferChange prefixed by a size field.
 * NOTE(review): presumably the layout used when changes are spilled to disk,
 * with 'size' covering the whole record - confirm in reorderbuffer.c.
 */
typedef struct ReorderBufferDiskChange {
Size size;
ReorderBufferChange change;
} ReorderBufferDiskChange;
ReorderBuffer* ReorderBufferAllocate(void);
void ReorderBufferFree(ReorderBuffer*);
ReorderBufferTupleBuf* ReorderBufferGetTupleBuf(ReorderBuffer*, Size tuple_len);
void ReorderBufferReturnTupleBuf(ReorderBuffer*, ReorderBufferTupleBuf* tuple);
ReorderBufferChange* ReorderBufferGetChange(ReorderBuffer*);
void ReorderBufferReturnChange(ReorderBuffer*, ReorderBufferChange*);
ReorderBufferUTupleBuf *ReorderBufferGetUTupleBuf(ReorderBuffer*, Size tuple_len);
void ReorderBufferQueueChange(LogicalDecodingContext*, TransactionId, XLogRecPtr lsn, ReorderBufferChange*);
void ReorderBufferRemoveChangeForUpsert(LogicalDecodingContext *ctx, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferCommit(ReorderBuffer*, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
RepOriginId origin_id, XLogRecPtr origin_lsn, CommitSeqNo csn, TimestampTz commit_time);
void ReorderBufferAssignChild(ReorderBuffer*, TransactionId, TransactionId, XLogRecPtr commit_lsn);
void ReorderBufferCommitChild(ReorderBuffer*, TransactionId, TransactionId, XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
void ReorderBufferAbort(ReorderBuffer*, TransactionId, XLogRecPtr lsn);
void ReorderBufferAbortOld(ReorderBuffer*, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferForget(ReorderBuffer*, TransactionId, XLogRecPtr lsn);
void ReorderBufferSetBaseSnapshot(ReorderBuffer*, TransactionId, XLogRecPtr lsn, struct SnapshotData* snap);
void ReorderBufferAddSnapshot(LogicalDecodingContext*, TransactionId, XLogRecPtr lsn, struct SnapshotData* snap);
void ReorderBufferAddNewCommandId(LogicalDecodingContext*, TransactionId, XLogRecPtr lsn, CommandId cid);
void ReorderBufferAddNewTupleCids(ReorderBuffer*, TransactionId, XLogRecPtr lsn, const RelFileNode& node,
const ItemPointerData& pt, CommandId cmin, CommandId cmax, CommandId combocid);
void ReorderBufferAddInvalidations(
ReorderBuffer*, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage* msgs);
TransactionId ReorderBufferGetOldestXmin(ReorderBuffer* rb);
void ReorderBufferProcessXid(ReorderBuffer*, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer*, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer*, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer*, TransactionId xid);
ReorderBufferTXN* ReorderBufferGetOldestTXN(ReorderBuffer*);
void ReorderBufferSetRestartPoint(ReorderBuffer*, XLogRecPtr ptr);
void StartupReorderBuffer(void);
void ReorderBufferClear(const char *slotname);
extern void ReorderBufferQueueDDLMessage(LogicalDecodingContext *ctx, TransactionId xid, XLogRecPtr lsn,
const char*prefix, Size message_size, const char *message, Oid relid, DeparsedCommandType cmdtype);
#endif