From 1eb6d6527aae264b3e0b9c95aa70bb7a594ad1cf Mon Sep 17 00:00:00 2001 From: Simon Riggs Date: Wed, 28 Mar 2018 17:42:50 +0100 Subject: Store 2PC GID in commit/abort WAL recs for logical decoding Store GID of 2PC in commit/abort WAL records when wal_level = logical. This allows logical decoding to send the SAME gid to subscribers across restarts of logical replication. Track relica origin replay progress for 2PC. (Edited from patch 0003 in the logical decoding 2PC series.) Authors: Nikhil Sontakke, Stas Kelvich Reviewed-by: Simon Riggs, Andres Freund --- src/backend/access/transam/xact.c | 78 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 73 insertions(+), 5 deletions(-) (limited to 'src/backend/access/transam/xact.c') diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index cfc62011b50..b88d4ccf746 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1226,7 +1226,7 @@ RecordTransactionCommit(void) nmsgs, invalMessages, RelcacheInitFileInval, forceSyncCommit, MyXactFlags, - InvalidTransactionId /* plain commit */ ); + InvalidTransactionId, NULL /* plain commit */ ); if (replorigin) /* Move LSNs forward for this replication origin */ @@ -1578,7 +1578,8 @@ RecordTransactionAbort(bool isSubXact) XactLogAbortRecord(xact_time, nchildren, children, nrels, rels, - MyXactFlags, InvalidTransactionId); + MyXactFlags, InvalidTransactionId, + NULL); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -5234,7 +5235,8 @@ XactLogCommitRecord(TimestampTz commit_time, int nrels, RelFileNode *rels, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, bool forceSync, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_commit xlrec; xl_xact_xinfo xl_xinfo; @@ -5246,6 +5248,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5308,6 +5311,13 @@ XactLogCommitRecord(TimestampTz commit_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } } /* dump transaction origin information */ @@ -5358,7 +5368,16 @@ XactLogCommitRecord(TimestampTz commit_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + { + static const char zeroes[MAXIMUM_ALIGNOF] = { 0 }; + XLogRegisterData((char*) twophase_gid, gidlen); + if (MAXALIGN(gidlen) != gidlen) + XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen); + } + } if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); @@ -5379,15 +5398,19 @@ XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, - int xactflags, TransactionId twophase_xid) + int xactflags, TransactionId twophase_xid, + const char *twophase_gid) { xl_xact_abort xlrec; xl_xact_xinfo xl_xinfo; xl_xact_subxacts xl_subxacts; xl_xact_relfilenodes xl_relfilenodes; xl_xact_twophase xl_twophase; + xl_xact_dbinfo xl_dbinfo; + xl_xact_origin xl_origin; uint8 info; + int gidlen = 0; Assert(CritSectionCount > 0); @@ -5423,6 +5446,31 @@ XactLogAbortRecord(TimestampTz abort_time, { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; xl_twophase.xid = twophase_xid; + Assert(twophase_gid != NULL); + + if (XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; + gidlen = strlen(twophase_gid) + 1; /* include '\0' */ + } + } + + if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; + xl_dbinfo.dbId = MyDatabaseId; + xl_dbinfo.tsId = MyDatabaseTableSpace; + } + + /* dump transaction origin information only for abort prepared */ + if ( (replorigin_session_origin != InvalidRepOriginId) && + TransactionIdIsValid(twophase_xid) && + XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; + + xl_origin.origin_lsn = replorigin_session_origin_lsn; + xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } if (xl_xinfo.xinfo != 0) @@ -5437,6 +5485,10 @@ XactLogAbortRecord(TimestampTz abort_time, if (xl_xinfo.xinfo != 0) XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) { XLogRegisterData((char *) (&xl_subxacts), @@ -5454,7 +5506,22 @@ XactLogAbortRecord(TimestampTz abort_time, } if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID) + { + static const char zeroes[MAXIMUM_ALIGNOF] = { 0 }; + XLogRegisterData((char*) twophase_gid, gidlen); + if (MAXALIGN(gidlen) != gidlen) + XLogRegisterData((char*) zeroes, MAXALIGN(gidlen) - gidlen); + } + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) + XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + + if (TransactionIdIsValid(twophase_xid)) + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); return XLogInsert(RM_XACT_ID, info); } @@ -5777,7 +5844,8 @@ xact_redo(XLogReaderState *record) LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE); PrepareRedoAdd(XLogRecGetData(record), record->ReadRecPtr, - record->EndRecPtr); + record->EndRecPtr, + XLogRecGetOrigin(record)); LWLockRelease(TwoPhaseStateLock); } else if (info == XLOG_XACT_ASSIGNMENT) -- cgit v1.2.3