* Copyright (c) 2020 Huawei Technologies Co.,Ltd.
*
* openGauss is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
*
* http://license.coscl.org.cn/MulanPSL2
*
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* -------------------------------------------------------------------------
*
* xact.cpp
* parse xact xlog
*
* IDENTIFICATION
* src/gausskernel/storage/access/redo/xact.cpp
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "knl/knl_variable.h"
#ifdef PGXC
#include "pgxc/pgxc.h"
#endif
#include "access/clog.h"
#include "access/csnlog.h"
#include "access/multixact.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
#include "access/xlogproc.h"
#include "access/multi_redo_api.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "catalog/storage.h"
#include "commands/async.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "commands/sequence.h"
#include "executor/spi.h"
#include "libpq/be-fsstubs.h"
#include "pgxc/groupmgr.h"
#include "replication/datasyncrep.h"
#include "replication/datasender.h"
#include "replication/dataqueue.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
#include "storage/smgr/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/sinvaladt.h"
#include "storage/smgr/smgr.h"
#include "storage/smgr/segment.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/plog.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
#include "access/redo_common.h"
* *************Add for batchredo begin***************
* for these func ,they don't hold the lock and parse the record
* they only operator the page, and update page's LSN
* they are shared by Normal redo、ExtremRTO and DFV
* **************************************************
*/
XLogRecParseState *xact_redo_rmddl_parse_to_block(XLogReaderState *record, XLogRecParseState *recordstatehead,
uint32 *blocknum, ColFileNodeRel *xnodes, int nrels)
{
XLogRecParseState *blockstate = NULL;
bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS);
Assert(nrels > 0);
(*blocknum)++;
XLogParseBufferAllocListStateFunc(record, &blockstate, &recordstatehead);
if (blockstate == NULL) {
return NULL;
}
RelFileNodeForkNum filenode = RelFileNodeForkNumFill(NULL, InvalidBackendId, MAIN_FORKNUM, InvalidBlockNumber);
XLogRecSetBlockCommonState(record, BLOCK_DATA_DDL_TYPE, filenode, blockstate);
XLogRecSetBlockDdlState(&(blockstate->blockparse.extra_rec.blockddlrec), BLOCK_DDL_DROP_RELNODE, (char *)xnodes,
nrels, compress);
return recordstatehead;
}
static XLogRecParseState *xact_xlog_commit_internal_parse(XLogReaderState *record, uint32 *blocknum)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS);
TransactionId xid = XLogRecGetXid(record);
CommitSeqNo commitseql = COMMITSEQNO_INPROGRESS;
TransactionId *sub_xids = NULL;
int nsubxacts;
uint64 csn;
TransactionId max_xid;
XLogRecParseState *recordstatehead = NULL;
*blocknum = 0;
if (info != XLOG_XACT_COMMIT_COMPACT) {
xl_xact_commit *xlrec = NULL;
if (info == XLOG_XACT_COMMIT) {
xlrec = (xl_xact_commit *)XLogRecGetData(record);
} else {
xl_xact_commit_prepared *xlrecpre = (xl_xact_commit_prepared *)XLogRecGetData(record);
xlrec = &(xlrecpre->crec);
xid = xlrecpre->xid;
}
sub_xids = GET_SUB_XACTS(xlrec->xnodes, xlrec->nrels, compress);
nsubxacts = xlrec->nsubxacts;
csn = xlrec->csn;
} else {
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *)XLogRecGetData(record);
sub_xids = xlrec->subxacts;
nsubxacts = xlrec->nsubxacts;
csn = xlrec->csn;
}
max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
recordstatehead = XactXlogClogParseToBlock(record, recordstatehead, blocknum, xid, nsubxacts, sub_xids,
CLOG_XID_STATUS_COMMITTED);
if (recordstatehead == NULL) {
return NULL;
}
if (csn >= COMMITSEQNO_FROZEN) {
commitseql = csn;
} else if (csn == COMMITSEQNO_INPROGRESS) {
commitseql = COMMITSEQNO_FROZEN;
}
recordstatehead = XactXlogCsnlogParseToBlock(record, blocknum, xid, nsubxacts, sub_xids, commitseql,
recordstatehead);
if (recordstatehead == NULL) {
return NULL;
}
recordstatehead = xact_xlog_commit_parse_to_block(record, recordstatehead, blocknum, max_xid, commitseql);
return recordstatehead;
}
static XLogRecParseState *xact_xlog_abort_internal_parse(XLogReaderState *record, uint32 *blocknum)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS);
TransactionId xid = XLogRecGetXid(record);
TransactionId *sub_xids = NULL;
int nsubxacts;
TransactionId max_xid;
XLogRecParseState *recordstatehead = NULL;
*blocknum = 0;
xl_xact_abort *xlrec = NULL;
if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_WITH_XID) {
xlrec = (xl_xact_abort *)XLogRecGetData(record);
} else {
xl_xact_abort_prepared *xlrecpre = (xl_xact_abort_prepared *)XLogRecGetData(record);
xlrec = &(xlrecpre->arec);
xid = xlrecpre->xid;
}
sub_xids = GET_SUB_XACTS(xlrec->xnodes, xlrec->nrels, compress);
nsubxacts = xlrec->nsubxacts;
max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
recordstatehead = XactXlogClogParseToBlock(record, recordstatehead, blocknum, xid, nsubxacts, sub_xids,
CLOG_XID_STATUS_ABORTED);
if (recordstatehead == NULL) {
return NULL;
}
recordstatehead = XactXlogCsnlogParseToBlock(record, blocknum, xid, nsubxacts, sub_xids, COMMITSEQNO_ABORTED,
recordstatehead);
if (recordstatehead == NULL) {
return NULL;
}
recordstatehead = xact_xlog_abort_parse_to_block(record, recordstatehead, blocknum, max_xid, COMMITSEQNO_ABORTED);
return recordstatehead;
}
static XLogRecParseState *xact_xlog_prepare_internal_parse(XLogReaderState *record, uint32 *blocknum)
{
TransactionId xid;
TransactionId *sub_xids = NULL;
int nsubxacts;
TransactionId max_xid;
XLogRecParseState *recordstatehead = NULL;
char *recorddata = XLogRecGetData(record);
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *)recorddata;
*blocknum = 0;
if (TransactionIdIsNormal(hdr->xid)) {
bool compressMagic = hdr->magic == TWOPHASE_MAGIC_COMPRESSION;
int hdrSize = (hdr->magic == TWOPHASE_MAGIC_NEW || compressMagic) ?
sizeof(TwoPhaseFileHeaderNew) : sizeof(TwoPhaseFileHeader);
sub_xids = (TransactionId *)(recorddata + MAXALIGN(hdrSize));
nsubxacts = hdr->nsubxacts;
xid = hdr->xid;
recordstatehead = XactXlogCsnlogParseToBlock(record, blocknum, xid, nsubxacts, sub_xids,
COMMITSEQNO_COMMIT_INPROGRESS, recordstatehead);
if (recordstatehead == NULL) {
return NULL;
}
}
max_xid = XLogRecGetXid(record);
recordstatehead = xact_xlog_prepare_parse_to_block(record, recordstatehead, blocknum, max_xid);
return recordstatehead;
}
XLogRecParseState *xact_redo_parse_to_block(XLogReaderState *record, uint32 *blocknum)
{
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
XLogRecParseState *recordblockstate = NULL;
Assert(!XLogRecHasAnyBlockRefs(record));
if ((info == XLOG_XACT_COMMIT_COMPACT) || (info == XLOG_XACT_COMMIT) || (info == XLOG_XACT_COMMIT_PREPARED)) {
recordblockstate = xact_xlog_commit_internal_parse(record, blocknum);
} else if ((info == XLOG_XACT_ABORT) || (info == XLOG_XACT_ABORT_PREPARED) || (info == XLOG_XACT_ABORT_WITH_XID)) {
recordblockstate = xact_xlog_abort_internal_parse(record, blocknum);
} else if (info == XLOG_XACT_PREPARE) {
recordblockstate = xact_xlog_prepare_internal_parse(record, blocknum);
} else if (info == XLOG_XACT_ASSIGNMENT) {
const uint32 rightShiftSize = 32;
ereport(WARNING, (errmsg("xact_redo_parse_to_block: XLOG_XACT_ASSIGNMENT log(%X/%X) could not be here!!",
(uint32)(record->EndRecPtr >> rightShiftSize), (uint32)(record->EndRecPtr))));
} else
ereport(PANIC, (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
errmsg("xact_redo_parse_to_block: unknown op code %u", info)));
return recordblockstate;
}
* *************Add for batchredo end****************
* for these func ,they don't hold the lock and parse the record
* they only operator the page, and update page's LSN
* they are shared by Normal redo、ExtremRTO and DFV
* **************************************************
*/
* ************Add for batchredo begin****************
* for these func ,they don't hold the lock and parse the record
* they only operator the page, and update page's LSN
* they are shared by Normal redo、ExtremRTO and DFV
* ***************************************************
*/
XLogRecParseState *xact_xlog_commit_parse_to_block(XLogReaderState *record, XLogRecParseState *recordstatehead,
uint32 *blocknum, TransactionId maxxid, CommitSeqNo maxseqnum)
{
XLogRecParseState *blockstate = NULL;
RelFileNode relnode;
XLogRecPtr globalDelayDDLLSN;
XLogRecPtr lsn = record->EndRecPtr;
bool delayddlflag = false;
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS);
uint64 xrecinfo = 0;
TimestampTz xact_time;
uint8 updateminrecovery = false;
ColFileNodeRel *xnodes = NULL;
int nrels = 0;
SharedInvalidationMessage *inval_msgs = NULL;
int invalidmsgnum = 0;
int nlibrary = 0;
char *libfilename = NULL;
relnode.dbNode = InvalidOid;
relnode.spcNode = InvalidOid;
relnode.relNode = InvalidOid;
relnode.bucketNode = InvalidBktId;
relnode.opt = 0;
(*blocknum)++;
XLogParseBufferAllocListStateFunc(record, &blockstate, &recordstatehead);
if (blockstate == NULL) {
return NULL;
}
if (info != XLOG_XACT_COMMIT_COMPACT) {
xl_xact_commit *xlrec = NULL;
TransactionId *subxacts = NULL;
int nsubxacts = 0;
if (info == XLOG_XACT_COMMIT) {
xlrec = (xl_xact_commit *)XLogRecGetData(record);
} else {
xl_xact_commit_prepared *xlrecpre = (xl_xact_commit_prepared *)XLogRecGetData(record);
xlrec = &(xlrecpre->crec);
}
xrecinfo = xlrec->xinfo;
xact_time = xlrec->xact_time;
xnodes = xlrec->xnodes;
nrels = xlrec->nrels;
if (nrels > 0) {
updateminrecovery = true;
globalDelayDDLLSN = GetDDLDelayStartPtr();
if (!XLogRecPtrIsInvalid(globalDelayDDLLSN) && XLByteLT(globalDelayDDLLSN, lsn))
delayddlflag = true;
else
delayddlflag = false;
}
subxacts = GET_SUB_XACTS(xlrec->xnodes, xlrec->nrels, compress);
nsubxacts = xlrec->nsubxacts;
inval_msgs = (SharedInvalidationMessage *)&(subxacts[xlrec->nsubxacts]);
invalidmsgnum = xlrec->nmsgs;
nlibrary = xlrec->nlibrary;
if (nlibrary > 0) {
libfilename = (char *)xnodes + (nrels * SIZE_OF_COLFILENODE(compress)) +
(nsubxacts * sizeof(TransactionId)) + (invalidmsgnum * sizeof(SharedInvalidationMessage));
}
relnode.dbNode = xlrec->dbId;
relnode.spcNode = xlrec->tsId;
} else {
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *)XLogRecGetData(record);
xact_time = xlrec->xact_time;
}
RelFileNodeForkNum filenode =
RelFileNodeForkNumFill(&relnode, InvalidBackendId, InvalidForkNumber, InvalidBlockNumber);
XLogRecSetBlockCommonState(record, BLOCK_DATA_XACTDATA_TYPE, filenode, blockstate);
XLogRecSetXactCommonState(&blockstate->blockparse.extra_rec.blockxact, info, xrecinfo, xact_time);
XLogRecSetXactRecoveryState(&blockstate->blockparse.extra_rec.blockxact, maxxid, maxseqnum, delayddlflag,
updateminrecovery);
XLogRecSetXactDdlState(&blockstate->blockparse.extra_rec.blockxact, nrels, (void *)xnodes, invalidmsgnum,
(void *)inval_msgs, nlibrary, (void *)libfilename);
if (nrels > 0) {
recordstatehead = xact_redo_rmddl_parse_to_block(record, recordstatehead, blocknum, xnodes, nrels);
}
return recordstatehead;
}
XLogRecParseState *xact_xlog_abort_parse_to_block(XLogReaderState *record, XLogRecParseState *recordstatehead,
uint32 *blocknum, TransactionId maxxid, CommitSeqNo maxseqnum)
{
XLogRecParseState *blockstate = NULL;
XLogRecPtr globalDelayDDLLSN;
XLogRecPtr lsn = record->EndRecPtr;
bool delayddlflag = false;
uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
bool compress = (bool)(XLogRecGetInfo(record) & XLR_REL_COMPRESS);
char *libfilename = NULL;
(*blocknum)++;
XLogParseBufferAllocListStateFunc(record, &blockstate, &recordstatehead);
if (blockstate == NULL) {
return NULL;
}
xl_xact_abort *xlrec = NULL;
if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_WITH_XID) {
xlrec = (xl_xact_abort *)XLogRecGetData(record);
} else {
xl_xact_abort_prepared *xlrecpre = (xl_xact_abort_prepared *)XLogRecGetData(record);
xlrec = &(xlrecpre->arec);
}
TimestampTz xact_time = xlrec->xact_time;
ColFileNodeRel *xnodes = xlrec->xnodes;
int nrels = xlrec->nrels;
int nsubxacts = xlrec->nsubxacts;
if (nrels > 0) {
globalDelayDDLLSN = GetDDLDelayStartPtr();
if (!XLogRecPtrIsInvalid(globalDelayDDLLSN) && XLByteLT(globalDelayDDLLSN, lsn))
delayddlflag = true;
else
delayddlflag = false;
}
int nlibrary = xlrec->nlibrary;
if (nlibrary > 0) {
libfilename = (char *)xnodes + (nrels * SIZE_OF_COLFILENODE(compress)) + (nsubxacts * sizeof(TransactionId));
}
if (info == XLOG_XACT_ABORT_WITH_XID) {
libfilename += sizeof(TransactionId);
}
RelFileNodeForkNum filenode = RelFileNodeForkNumFill(NULL, InvalidBackendId, InvalidForkNumber, InvalidBlockNumber);
XLogRecSetBlockCommonState(record, BLOCK_DATA_XACTDATA_TYPE, filenode, blockstate);
XLogRecSetXactCommonState(&blockstate->blockparse.extra_rec.blockxact, info, 0, xact_time);
XLogRecSetXactRecoveryState(&blockstate->blockparse.extra_rec.blockxact, maxxid, maxseqnum, delayddlflag, false);
XLogRecSetXactDdlState(&blockstate->blockparse.extra_rec.blockxact, nrels, (void *)xnodes, 0, NULL, nlibrary,
(void *)libfilename);
if (nrels > 0) {
recordstatehead = xact_redo_rmddl_parse_to_block(record, recordstatehead, blocknum, xnodes, nrels);
}
return recordstatehead;
}
void XLogRecSetPreparState(XLogBlockPrepareParse *blockpreparestate, TransactionId maxxid, char *maindata,
Size maindatalen)
{
blockpreparestate->maxxid = maxxid;
blockpreparestate->maindata = maindata;
blockpreparestate->maindatalen = maindatalen;
}
XLogRecParseState *xact_xlog_prepare_parse_to_block(XLogReaderState *record, XLogRecParseState *recordstatehead,
uint32 *blocknum, TransactionId maxxid)
{
XLogRecParseState *blockstate = NULL;
(*blocknum)++;
XLogParseBufferAllocListStateFunc(record, &blockstate, &recordstatehead);
if (blockstate == NULL) {
return NULL;
}
RelFileNodeForkNum filenode = RelFileNodeForkNumFill(NULL, InvalidBackendId, InvalidForkNumber, InvalidBlockNumber);
XLogRecSetBlockCommonState(record, BLOCK_DATA_PREPARE_TYPE, filenode, blockstate);
char *maindata = XLogRecGetData(record);
Size maindatalen = XLogRecGetDataLen(record);
XLogRecSetPreparState(&blockstate->blockparse.extra_rec.blockprepare, maxxid, maindata, maindatalen);
return recordstatehead;
}
* ************Add for batchredo end****************
* for these func ,they don't hold the lock and parse the record
* they only operator the page, and update page's LSN
* they are shared by Normal redo, ExtremRTO and DFV
* *************************************************
*/