*
* logical.h
* openGauss logical decoding coordination
*
* Copyright (c) 2012-2014, PostgreSQL Global Development Group
*
*
* IDENTIFICATION
* src/include/replication/logical.h
*
* ---------------------------------------------------------------------------------------
*/
#ifndef LOGICAL_H
#define LOGICAL_H
#include "replication/slot.h"
#include "access/xlog.h"
#include "access/xlogreader.h"
#include "replication/output_plugin.h"
#include "postgres.h"
#include "knl/knl_variable.h"
#include "nodes/pg_list.h"
#include "storage/proc.h"
#include "access/parallel_recovery/redo_item.h"
#include "nodes/parsenodes_common.h"
#include "nodes/replnodes.h"
#include "access/ustore/knl_utuple.h"
#include "replication/logical_queue.h"
#include "replication/parallel_reorderbuffer.h"
#define LOGICAL_DECODE_EXTRA_FLOAT_DIGITS 3
#define MAX_PARALLEL_DECODE_NUM 20
#define MAX_REPLICATION_SLOT_NUM 100
#define TOAST_FLAG ((uint32)1 << 31)
#define TOAST_MASK (((uint32)1 << 31) - 1)
typedef void (*LogicalOutputPluginWriterWrite)(
struct LogicalDecodingContext* lr, XLogRecPtr Ptr, TransactionId xid, bool last_write);
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
typedef struct logicalLog logicalLog;
typedef struct LogicalDecodingContext {
MemoryContext context;
XLogReaderState* reader;
ReplicationSlot* slot;
struct ReorderBuffer* reorder;
struct SnapBuild* snapshot_builder;
* Marks the logical decoding context as fast forward decoding one.
* Such a context does not have plugin loaded so most of the the following
* properties are unused.
*/
bool fast_forward;
OutputPluginCallbacks callbacks;
OutputPluginOptions options;
* User specified options
*/
List* output_plugin_options;
* User-Provided callback for writing/streaming out data.
*/
LogicalOutputPluginWriterPrepareWrite prepare_write;
LogicalOutputPluginWriterWrite do_write;
* Output buffer.
*/
StringInfo out;
* Private data pointer of the output plugin.
*/
void* output_plugin_private;
* Private data pointer for the data writer.
*/
void* output_writer_private;
* State for writing output.
*/
bool accept_writes;
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
bool random_mode;
} LogicalDecodingContext;
typedef struct chosenTable {
char *schema;
char *table;
} chosenTable;
typedef void(*ParallelDecodeChangeCB)(Relation relation, ParallelReorderBufferChange* change, logicalLog *logChange,
ParallelLogicalDecodingContext* ctx, int slotId);
typedef struct {
bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
int max_txn_in_memory;
int max_reorderbuffer_in_memory;
char decode_style;
int parallel_decode_num;
int sending_batch;
ParallelDecodeChangeCB decode_change;
List *tableWhiteList;
int parallel_queue_size;
bool include_originid;
} ParallelDecodeOption;
typedef struct {
MemoryContext context;
ParallelDecodeOption pOptions;
} ParallelDecodingData;
typedef struct {
MemoryContext context;
bool include_xids;
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
bool only_local;
int max_txn_in_memory;
int max_reorderbuffer_in_memory;
List *tableWhiteList;
bool include_originid;
} PluginTestDecodingData;
typedef struct ParallelLogicalDecodingContext {
MemoryContext context;
XLogReaderState* reader;
ReplicationSlot* slot;
ParallelReorderBuffer* reorder;
* Marks the logical decoding context as fast forward decoding one.
* Such a context does not have plugin loaded so most of the the following
* properties are unused.
*/
bool fast_forward;
ParallelOutputPluginCallbacks callbacks;
OutputPluginOptions options;
* User specified options
*/
List* output_plugin_options;
* Output buffer.
*/
StringInfo out;
* Private data pointer of the output plugin.
*/
void* output_plugin_private;
* Private data pointer for the data writer.
*/
void* output_writer_private;
* State for writing output.
*/
bool accept_writes;
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
bool random_mode;
bool isParallel;
* Buffer for updating write_location
*/
StringInfo writeLocationBuffer;
} ParallelLogicalDecodingContext;
typedef struct ParallelDecodeWorker {
int id;
gs_thread_t tid;
int slotId;
LogicalQueue* changeQueue;
LogicalQueue* LogicalLogQueue;
logicalLog* freeGetLogicalLogHead;
MemoryContext oldCtx;
char dbUser[NAMEDATALEN];
char dbName[NAMEDATALEN];
char slotname[NAMEDATALEN];
char decodeStyle;
} ParallelDecodeWorker;
typedef struct ParallelDecodeReaderWorker {
uint32 id;
ThreadId tid;
int slotId;
char dbUser[NAMEDATALEN];
char dbName[NAMEDATALEN];
char slotname[NAMEDATALEN];
StartReplicationCmd *cmd;
XLogRecPtr current_lsn;
XLogRecPtr restart_lsn;
XLogRecPtr candidate_oldest_xmin_lsn;
XLogRecPtr candidate_oldest_xmin;
XLogRecPtr flushLSN;
LogicalQueue* queue;
MemoryContext oldCtx;
ParallelDecodingData data;
slock_t rwlock;
} ParallelDecodeReaderWorker;
typedef struct LogicalDispatcher {
MemoryContext oldCtx;
int decodeWorkerId;
ParallelDecodeWorker** decodeWorkers;
ParallelDecodeOption pOptions;
int totalWorkerCount;
ParallelDecodeReaderWorker* readWorker;
ParallelReorderBufferChange* freeChangeHead;
ParallelReorderBufferChange* freeGetChangeHead;
ReorderBufferTupleBuf* freeTupleHead;
ReorderBufferTupleBuf* freeGetTupleHead;
logicalLog* freeLogicalLogHead;
char slotName[NAMEDATALEN];
int32 pendingCount;
int32 pendingMax;
int exitCode;
uint64 totalCostTime;
uint64 txnCostTime;
uint64 pprCostTime;
uint64 sentPtr;
uint32 curChangeNum;
uint32 curTupleNum;
uint32 curLogNum;
uint64 num;
uint32* chosedWorkerIds;
uint32 chosedWorkerCount;
uint32 readyWorkerCnt;
int id;
TimestampTz decodeTime;
bool remainPatch;
bool checkpointNeedFullSync;
bool active;
bool firstLoop;
bool abnormal;
XLogRecPtr startpoint;
int64 workingTxnCnt;
int64 workingTxnMemory;
struct ReplicationSlot* MyReplicationSlot;
} LogicalDispatcher;
#define QUEUE_RESULT_LEN 512
typedef struct ParallelStatusData {
char slotName[NAMEDATALEN];
int parallelDecodeNum;
char readQueueLen[QUEUE_RESULT_LEN];
char decodeQueueLen[QUEUE_RESULT_LEN];
char readerLsn[MAXFNAMELEN];
int64 workingTxnCnt;
int64 workingTxnMemory;
} ParallelStatusData;
typedef struct DecodeOptionsDefault {
int parallel_decode_num;
int parallel_queue_size;
int max_txn_in_memory;
int max_reorderbuffer_in_memory;
} DecodeOptionsDefault;
extern LogicalDispatcher g_Logicaldispatcher[];
extern bool firstCreateDispatcher;
extern bool QuoteCheckOut(char* newval);
extern void CheckLogicalDecodingRequirements(Oid databaseId);
extern void ParallelReorderBufferQueueChange(ParallelReorderBuffer *rb, logicalLog *change, int slotId);
extern void ParallelReorderBufferForget(ParallelReorderBuffer *rb, int slotId, ParallelReorderBufferTXN *txn);
extern void ParallelReorderBufferCommit(ParallelReorderBuffer *rb, logicalLog *change, int slotId,
ParallelReorderBufferTXN *txn);
extern LogicalDecodingContext* CreateInitDecodingContext(const char* plugin, List* output_plugin_options,
bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
extern LogicalDecodingContext* CreateInitDecodingContextInternal(char* plugin, List* output_plugin_options,
XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write, bool set_xmin_horizon, ReplicationSlot* slot);
extern LogicalDecodingContext* CreateDecodingContext(XLogRecPtr start_lsn, List* output_plugin_options,
bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
extern LogicalDecodingContext* CreateDecodingContextForArea(XLogRecPtr start_lsn, const char* plugin,List* output_plugin_options,
bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
extern ParallelLogicalDecodingContext *ParallelCreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options,
bool fast_forward, XLogPageReadCB read_page, int slotId);
extern void DecodingContextFindStartpoint(LogicalDecodingContext* ctx);
extern bool DecodingContextReady(LogicalDecodingContext* ctx);
extern void FreeDecodingContext(LogicalDecodingContext* ctx);
extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext* ctx, RepOriginId origin_id);
extern void CloseLogicalAdvanceConnect();
extern void NotifyPrimaryAdvance(XLogRecPtr restart, XLogRecPtr flush);
extern void NotifyPrimaryCatalogXmin(TransactionId catalogXmin);
extern void ParallelDecodeWorkerMain(void* point);
extern void LogicalReadWorkerMain(void* point);
extern void ParseProcessRecord(ParallelLogicalDecodingContext *ctx, XLogReaderState *record, ParallelDecodeReaderWorker
*worker);
extern void StartLogicalLogWorkers(char* dbUser, char* dbName, char* slotname, List *options, int parallelDecodeNum);
extern void CheckBooleanOption(DefElem *elem, bool *booleanOption, bool defaultValue);
extern void CheckIntOption(DefElem *elem, int *intOption, int defaultValue, int minVal, int maxVal);
extern int ParseParallelDecodeNumOnly(List *options);
extern bool CheckWhiteList(const List *whiteList, const char *schema, const char *table);
extern bool ParseStringToWhiteList(char *tableString, List **tableWhiteList);
extern void ParseWhiteList(List **whiteList, DefElem* elem);
extern void ParseDecodingOptionPlugin(ListCell* option, PluginTestDecodingData* data, OutputPluginOptions* opt);
extern ParallelStatusData *GetParallelDecodeStatus(uint32 *num);
extern void PrintLiteral(StringInfo s, Oid typid, char* outputstr);
extern void FreeLogicalLog(ParallelReorderBuffer *rb, logicalLog *logChange, int slotId, bool nocache);
extern bool LogicalDecodeParseOptionsDefault(const char* defaultStr, void **options);
extern DecodeOptionsDefault* LogicalDecodeGetOptionsDefault();
template <typename T> void LogicalDecodeReportLostChanges(const T *iterstate);
extern void tuple_to_stringinfo(Relation relation, StringInfo s, TupleDesc tupdesc, HeapTuple tuple, bool isOld,
bool printOid = false);
#endif