summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/twophase.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r--src/backend/access/transam/twophase.c105
1 files changed, 90 insertions, 15 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c479c4881b9..d6e4b7980f3 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -144,11 +144,7 @@ int max_prepared_xacts = 0;
*
* typedef struct GlobalTransactionData *GlobalTransaction appears in
* twophase.h
- *
- * Note that the max value of GIDSIZE must fit in the uint16 gidlen,
- * specified in TwoPhaseFileHeader.
*/
-#define GIDSIZE 200
typedef struct GlobalTransactionData
{
@@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval);
+ bool initfileinval,
+ const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels);
+ RelFileNode *rels,
+ const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
@@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid)
/*
* Header for a 2PC state file
*/
-#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */
+#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */
typedef struct TwoPhaseFileHeader
{
@@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader
int32 ninvalmsgs; /* number of cache invalidation messages */
bool initfileinval; /* does relcache init file need invalidation? */
uint16 gidlen; /* length of the GID - GID follows the header */
+ XLogRecPtr origin_lsn; /* lsn of this record at origin node */
+ TimestampTz origin_timestamp; /* time of prepare at origin node */
} TwoPhaseFileHeader;
/*
@@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact)
{
TwoPhaseFileHeader *hdr;
StateFileChunk *record;
+ bool replorigin;
/* Add the end sentinel to the list of 2PC records */
RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
@@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact)
Assert(hdr->magic == TWOPHASE_MAGIC);
hdr->total_len = records.total_len + sizeof(pg_crc32c);
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);
+
+ if (replorigin)
+ {
+ Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr);
+ hdr->origin_lsn = replorigin_session_origin_lsn;
+ hdr->origin_timestamp = replorigin_session_origin_timestamp;
+ }
+ else
+ {
+ hdr->origin_lsn = InvalidXLogRecPtr;
+ hdr->origin_timestamp = 0;
+ }
+
/*
* If the data size exceeds MaxAllocSize, we won't be able to read it in
* ReadTwoPhaseFile. Check for that now, rather than fail in the case
@@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact)
XLogBeginInsert();
for (record = records.head; record != NULL; record = record->next)
XLogRegisterData(record->data, record->len);
+
+ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+ if (replorigin)
+ /* Move LSNs forward for this replication origin */
+ replorigin_session_advance(replorigin_session_origin_lsn,
+ gxact->prepare_end_lsn);
+
XLogFlush(gxact->prepare_end_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings)
return buf;
}
+/*
+ * ParsePrepareRecord
+ */
+void
+ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
+{
+ TwoPhaseFileHeader *hdr;
+ char *bufptr;
+
+ hdr = (TwoPhaseFileHeader *) xlrec;
+ bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
+
+ parsed->origin_lsn = hdr->origin_lsn;
+ parsed->origin_timestamp = hdr->origin_timestamp;
+ parsed->twophase_xid = hdr->xid;
+ parsed->dbId = hdr->database;
+ parsed->nsubxacts = hdr->nsubxacts;
+ parsed->nrels = hdr->ncommitrels;
+ parsed->nabortrels = hdr->nabortrels;
+ parsed->nmsgs = hdr->ninvalmsgs;
+
+ strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
+ bufptr += MAXALIGN(hdr->gidlen);
+
+ parsed->subxacts = (TransactionId *) bufptr;
+ bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
+
+ parsed->xnodes = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode));
+
+ parsed->abortnodes = (RelFileNode *) bufptr;
+ bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode));
+
+ parsed->msgs = (SharedInvalidationMessage *) bufptr;
+ bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
+}
+
+
/*
* Reads 2PC data from xlog. During checkpoint this data will be moved to
@@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
hdr->nsubxacts, children,
hdr->ncommitrels, commitrels,
hdr->ninvalmsgs, invalmsgs,
- hdr->initfileinval);
+ hdr->initfileinval, gid);
else
RecordTransactionAbortPrepared(xid,
hdr->nsubxacts, children,
- hdr->nabortrels, abortrels);
+ hdr->nabortrels, abortrels,
+ gid);
ProcArrayRemove(proc, latestXid);
@@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void)
if (buf == NULL)
continue;
- PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr);
+ PrepareRedoAdd(buf, InvalidXLogRecPtr,
+ InvalidXLogRecPtr, InvalidRepOriginId);
}
}
LWLockRelease(TwoPhaseStateLock);
@@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid,
RelFileNode *rels,
int ninvalmsgs,
SharedInvalidationMessage *invalmsgs,
- bool initfileinval)
+ bool initfileinval,
+ const char *gid)
{
XLogRecPtr recptr;
TimestampTz committs = GetCurrentTimestamp();
@@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
ninvalmsgs, invalmsgs,
initfileinval, false,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
- xid);
+ xid, gid);
if (replorigin)
@@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
int nrels,
- RelFileNode *rels)
+ RelFileNode *rels,
+ const char *gid)
{
XLogRecPtr recptr;
@@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
nchildren, children,
nrels, rels,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
- xid);
+ xid, gid);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
@@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid,
* data, the entry is marked as located on disk.
*/
void
-PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
+PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
+ XLogRecPtr end_lsn, RepOriginId origin_id)
{
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
char *bufptr;
@@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn)
Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
+ if (origin_id != InvalidRepOriginId)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
+ false /* backward */ , false /* WAL */ );
+ }
+
elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
}