diff options
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r-- | src/backend/access/transam/twophase.c | 105 |
1 files changed, 90 insertions, 15 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index c479c4881b9..d6e4b7980f3 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -144,11 +144,7 @@ int max_prepared_xacts = 0; * * typedef struct GlobalTransactionData *GlobalTransaction appears in * twophase.h - * - * Note that the max value of GIDSIZE must fit in the uint16 gidlen, - * specified in TwoPhaseFileHeader. */ -#define GIDSIZE 200 typedef struct GlobalTransactionData { @@ -211,12 +207,14 @@ static void RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval); + bool initfileinval, + const char *gid); static void RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels); + RelFileNode *rels, + const char *gid); static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); static void RemoveGXact(GlobalTransaction gxact); @@ -898,7 +896,7 @@ TwoPhaseGetDummyProc(TransactionId xid) /* * Header for a 2PC state file */ -#define TWOPHASE_MAGIC 0x57F94533 /* format identifier */ +#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */ typedef struct TwoPhaseFileHeader { @@ -914,6 +912,8 @@ typedef struct TwoPhaseFileHeader int32 ninvalmsgs; /* number of cache invalidation messages */ bool initfileinval; /* does relcache init file need invalidation? */ uint16 gidlen; /* length of the GID - GID follows the header */ + XLogRecPtr origin_lsn; /* lsn of this record at origin node */ + TimestampTz origin_timestamp; /* time of prepare at origin node */ } TwoPhaseFileHeader; /* @@ -1065,6 +1065,7 @@ EndPrepare(GlobalTransaction gxact) { TwoPhaseFileHeader *hdr; StateFileChunk *record; + bool replorigin; /* Add the end sentinel to the list of 2PC records */ RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0, @@ -1075,6 +1076,21 @@ EndPrepare(GlobalTransaction gxact) Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32c); + replorigin = (replorigin_session_origin != InvalidRepOriginId && + replorigin_session_origin != DoNotReplicateId); + + if (replorigin) + { + Assert(replorigin_session_origin_lsn != InvalidXLogRecPtr); + hdr->origin_lsn = replorigin_session_origin_lsn; + hdr->origin_timestamp = replorigin_session_origin_timestamp; + } + else + { + hdr->origin_lsn = InvalidXLogRecPtr; + hdr->origin_timestamp = 0; + } + /* * If the data size exceeds MaxAllocSize, we won't be able to read it in * ReadTwoPhaseFile. Check for that now, rather than fail in the case @@ -1107,7 +1123,16 @@ EndPrepare(GlobalTransaction gxact) XLogBeginInsert(); for (record = records.head; record != NULL; record = record->next) XLogRegisterData(record->data, record->len); + + XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); + gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); + + if (replorigin) + /* Move LSNs forward for this replication origin */ + replorigin_session_advance(replorigin_session_origin_lsn, + gxact->prepare_end_lsn); + XLogFlush(gxact->prepare_end_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -1283,6 +1308,44 @@ ReadTwoPhaseFile(TransactionId xid, bool give_warnings) return buf; } +/* + * ParsePrepareRecord + */ +void +ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed) +{ + TwoPhaseFileHeader *hdr; + char *bufptr; + + hdr = (TwoPhaseFileHeader *) xlrec; + bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader)); + + parsed->origin_lsn = hdr->origin_lsn; + parsed->origin_timestamp = hdr->origin_timestamp; + parsed->twophase_xid = hdr->xid; + parsed->dbId = hdr->database; + parsed->nsubxacts = hdr->nsubxacts; + parsed->nrels = hdr->ncommitrels; + parsed->nabortrels = hdr->nabortrels; + parsed->nmsgs = hdr->ninvalmsgs; + + strncpy(parsed->twophase_gid, bufptr, hdr->gidlen); + bufptr += MAXALIGN(hdr->gidlen); + + parsed->subxacts = (TransactionId *) bufptr; + bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); + + parsed->xnodes = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); + + parsed->abortnodes = (RelFileNode *) bufptr; + bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + + parsed->msgs = (SharedInvalidationMessage *) bufptr; + bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); +} + + /* * Reads 2PC data from xlog. During checkpoint this data will be moved to @@ -1435,11 +1498,12 @@ FinishPreparedTransaction(const char *gid, bool isCommit) hdr->nsubxacts, children, hdr->ncommitrels, commitrels, hdr->ninvalmsgs, invalmsgs, - hdr->initfileinval); + hdr->initfileinval, gid); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, - hdr->nabortrels, abortrels); + hdr->nabortrels, abortrels, + gid); ProcArrayRemove(proc, latestXid); @@ -1752,7 +1816,8 @@ restoreTwoPhaseData(void) if (buf == NULL) continue; - PrepareRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); + PrepareRedoAdd(buf, InvalidXLogRecPtr, + InvalidXLogRecPtr, InvalidRepOriginId); } } LWLockRelease(TwoPhaseStateLock); @@ -2165,7 +2230,8 @@ RecordTransactionCommitPrepared(TransactionId xid, RelFileNode *rels, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, - bool initfileinval) + bool initfileinval, + const char *gid) { XLogRecPtr recptr; TimestampTz committs = GetCurrentTimestamp(); @@ -2193,7 +2259,7 @@ RecordTransactionCommitPrepared(TransactionId xid, ninvalmsgs, invalmsgs, initfileinval, false, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); if (replorigin) @@ -2255,7 +2321,8 @@ RecordTransactionAbortPrepared(TransactionId xid, int nchildren, TransactionId *children, int nrels, - RelFileNode *rels) + RelFileNode *rels, + const char *gid) { XLogRecPtr recptr; @@ -2278,7 +2345,7 @@ RecordTransactionAbortPrepared(TransactionId xid, nchildren, children, nrels, rels, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, - xid); + xid, gid); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); @@ -2309,7 +2376,8 @@ RecordTransactionAbortPrepared(TransactionId xid, * data, the entry is marked as located on disk. */ void -PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, + XLogRecPtr end_lsn, RepOriginId origin_id) { TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf; char *bufptr; @@ -2358,6 +2426,13 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts); TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact; + if (origin_id != InvalidRepOriginId) + { + /* recover apply progress */ + replorigin_advance(origin_id, hdr->origin_lsn, end_lsn, + false /* backward */ , false /* WAL */ ); + } + elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid); } |