summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogreader.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r--src/backend/access/transam/xlogreader.c39
1 files changed, 38 insertions, 1 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index e370fa984cd..456cdab461e 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -220,6 +220,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
total_len;
uint32 targetRecOff;
uint32 pageHeaderSize;
+ bool assembled;
bool gotheader;
int readOff;
@@ -235,6 +236,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
state->errormsg_buf[0] = '\0';
ResetDecoder(state);
+ state->abortedRecPtr = InvalidXLogRecPtr;
+ state->missingContrecPtr = InvalidXLogRecPtr;
if (RecPtr == InvalidXLogRecPtr)
{
@@ -263,7 +266,9 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
randAccess = true;
}
+restart:
state->currRecPtr = RecPtr;
+ assembled = false;
targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
targetRecOff = RecPtr % XLOG_BLCKSZ;
@@ -373,6 +378,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
char *buffer;
uint32 gotlen;
+ assembled = true;
/* Copy the first fragment of the record from the first page. */
memcpy(state->readRecordBuf,
state->readBuf + RecPtr % XLOG_BLCKSZ, len);
@@ -394,8 +400,25 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
Assert(SizeOfXLogShortPHD <= readOff);
- /* Check that the continuation on next page looks valid */
pageHeader = (XLogPageHeader) state->readBuf;
+
+ /*
+ * If we were expecting a continuation record and got an
+ * "overwrite contrecord" flag, that means the continuation record
+ * was overwritten with a different record. Restart the read by
+ * assuming the address to read is the location where we found
+ * this flag; but keep track of the LSN of the record we were
+ * reading, for later verification.
+ */
+ if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
+ {
+ state->overwrittenRecPtr = state->currRecPtr;
+ ResetDecoder(state);
+ RecPtr = targetPagePtr;
+ goto restart;
+ }
+
+ /* Check that the continuation on next page looks valid */
if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
report_invalid_record(state,
@@ -497,6 +520,20 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
return NULL;
err:
+ if (assembled)
+ {
+ /*
+ * We get here when a record that spans multiple pages needs to be
+ * assembled, but something went wrong -- perhaps a contrecord piece
+ * was lost. If caller is WAL replay, it will know where the aborted
+ * record was and where to direct followup WAL to be written, marking
+ * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
+ * in turn signal downstream WAL consumers that the broken WAL record
+ * is to be ignored.
+ */
+ state->abortedRecPtr = RecPtr;
+ state->missingContrecPtr = targetPagePtr;
+ }
/*
* Invalidate the read state. We might read from a different source after