summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/twophase.c
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 17:56:26 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 18:46:41 +0200
commit2c03216d831160bedd72d45f712601b6f7d03f1c (patch)
treeab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/transam/twophase.c
parent8dc626defec23016dd5988208d8704b858b9d21d (diff)
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and block(s) in a standardized format. That makes it easier to write tools that need that information, like pg_rewind, prefetching the blocks to speed up recovery, etc. There's a whole new API for building WAL records, replacing the XLogRecData chains used previously. The new API consists of XLogRegister* functions, which are called for each buffer and chunk of data that is added to the record. The new API also gives more control over when a full-page image is written, by passing flags to the XLogRegisterBuffer function. This also simplifies the XLogReadBufferForRedo() calls. The function can dig the relation and block number from the WAL record, so they no longer need to be passed as arguments. For the convenience of redo routines, XLogReader now disects each WAL record after reading it, copying the main data part and the per-block data into MAXALIGNed buffers. The data chunks are not aligned within the WAL record, but the redo routines can assume that the pointers returned by XLogRecGet* functions are. Redo routines are now passed the XLogReaderState, which contains the record in the already-disected format, instead of the plain XLogRecord. The new record format also makes the fixed size XLogRecord header smaller, by removing the xl_len field. The length of the "main data" portion is now stored at the end of the WAL record, and there's a separate header after XLogRecord for it. The alignment padding at the end of XLogRecord is also removed. This compansates for the fact that the new format would otherwise be more bulky than the old format. Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera, Fujii Masao.
Diffstat (limited to 'src/backend/access/transam/twophase.c')
-rw-r--r--src/backend/access/transam/twophase.c105
1 files changed, 44 insertions, 61 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index d23c292edcd..40de84e934e 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -889,14 +889,21 @@ typedef struct TwoPhaseRecordOnDisk
/*
* During prepare, the state file is assembled in memory before writing it
- * to WAL and the actual state file. We use a chain of XLogRecData blocks
- * so that we will be able to pass the state file contents directly to
- * XLogInsert.
+ * to WAL and the actual state file. We use a chain of StateFileChunk blocks
+ * for that.
*/
+typedef struct StateFileChunk
+{
+ char *data;
+ uint32 len;
+ struct StateFileChunk *next;
+} StateFileChunk;
+
static struct xllist
{
- XLogRecData *head; /* first data block in the chain */
- XLogRecData *tail; /* last block in chain */
+ StateFileChunk *head; /* first data block in the chain */
+ StateFileChunk *tail; /* last block in chain */
+ uint32 num_chunks;
uint32 bytes_free; /* free bytes left in tail block */
uint32 total_len; /* total data bytes in chain */
} records;
@@ -917,11 +924,11 @@ save_state_data(const void *data, uint32 len)
if (padlen > records.bytes_free)
{
- records.tail->next = palloc0(sizeof(XLogRecData));
+ records.tail->next = palloc0(sizeof(StateFileChunk));
records.tail = records.tail->next;
- records.tail->buffer = InvalidBuffer;
records.tail->len = 0;
records.tail->next = NULL;
+ records.num_chunks++;
records.bytes_free = Max(padlen, 512);
records.tail->data = palloc(records.bytes_free);
@@ -951,8 +958,7 @@ StartPrepare(GlobalTransaction gxact)
SharedInvalidationMessage *invalmsgs;
/* Initialize linked list */
- records.head = palloc0(sizeof(XLogRecData));
- records.head->buffer = InvalidBuffer;
+ records.head = palloc0(sizeof(StateFileChunk));
records.head->len = 0;
records.head->next = NULL;
@@ -960,6 +966,7 @@ StartPrepare(GlobalTransaction gxact)
records.head->data = palloc(records.bytes_free);
records.tail = records.head;
+ records.num_chunks = 1;
records.total_len = 0;
@@ -1019,7 +1026,7 @@ EndPrepare(GlobalTransaction gxact)
TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
char path[MAXPGPATH];
- XLogRecData *record;
+ StateFileChunk *record;
pg_crc32 statefile_crc;
pg_crc32 bogus_crc;
int fd;
@@ -1117,12 +1124,16 @@ EndPrepare(GlobalTransaction gxact)
* We save the PREPARE record's location in the gxact for later use by
* CheckPointTwoPhase.
*/
+ XLogEnsureRecordSpace(0, records.num_chunks);
+
START_CRIT_SECTION();
MyPgXact->delayChkpt = true;
- gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
- records.head);
+ XLogBeginInsert();
+ for (record = records.head; record != NULL; record = record->next)
+ XLogRegisterData(record->data, record->len);
+ gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
XLogFlush(gxact->prepare_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1180,6 +1191,7 @@ EndPrepare(GlobalTransaction gxact)
SyncRepWaitForLSN(gxact->prepare_lsn);
records.tail = records.head = NULL;
+ records.num_chunks = 0;
}
/*
@@ -2071,8 +2083,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
- XLogRecData rdata[4];
- int lastrdata = 0;
xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
@@ -2094,39 +2104,24 @@ RecordTransactionCommitPrepared(TransactionId xid,
xlrec.crec.nsubxacts = nchildren;
xlrec.crec.nmsgs = ninvalmsgs;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactCommitPrepared;
- rdata[0].buffer = InvalidBuffer;
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
+
/* dump rels to delete */
if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
+ XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
- }
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
+
/* dump cache invalidation messages */
if (ninvalmsgs > 0)
- {
- rdata[lastrdata].next = &(rdata[3]);
- rdata[3].data = (char *) invalmsgs;
- rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
- rdata[3].buffer = InvalidBuffer;
- lastrdata = 3;
- }
- rdata[lastrdata].next = NULL;
+ XLogRegisterData((char *) invalmsgs,
+ ninvalmsgs * sizeof(SharedInvalidationMessage));
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
/*
* We don't currently try to sleep before flush here ... nor is there any
@@ -2169,8 +2164,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nrels,
RelFileNode *rels)
{
- XLogRecData rdata[3];
- int lastrdata = 0;
xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
@@ -2189,30 +2182,20 @@ RecordTransactionAbortPrepared(TransactionId xid,
xlrec.arec.xact_time = GetCurrentTimestamp();
xlrec.arec.nrels = nrels;
xlrec.arec.nsubxacts = nchildren;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactAbortPrepared;
- rdata[0].buffer = InvalidBuffer;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
+
/* dump rels to delete */
if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
+ XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
- }
- rdata[lastrdata].next = NULL;
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);