summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogreader.c
diff options
context:
space:
mode:
author: Thomas Munro <tmunro@postgresql.org> 2021-05-10 16:00:53 +1200
committer: Thomas Munro <tmunro@postgresql.org> 2021-05-10 16:06:09 +1200
commit: c2dc19342e05e081dc13b296787baa38352681ef (patch)
tree: 10ba15831ecc5e9795912cac612c871ffad63a82 /src/backend/access/transam/xlogreader.c
parent: 63db0ac3f9e6bae313da67f640c95c0045b7f0ee (diff)
Revert recovery prefetching feature.
This set of commits has some bugs with known fixes, but at this late stage in the release cycle it seems best to revert and resubmit next time, along with some new automated test coverage for this whole area.

Commits reverted:

dc88460c: Doc: Review for "Optionally prefetch referenced data in recovery."
1d257577: Optionally prefetch referenced data in recovery.
f003d9f8: Add circular WAL decoding buffer.
323cbe7c: Remove read_page callback from XLogReader.

Remove the new GUC group WAL_RECOVERY recently added by a55a9847, as the corresponding section of config.sgml is now reverted.

Discussion: https://postgr.es/m/CAOuzzgrn7iKnFRsB4MHp3UisEQAGgZMbk_ViTN4HV4-Ksq8zCg%40mail.gmail.com
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r-- src/backend/access/transam/xlogreader.c 1545
1 files changed, 445 insertions, 1100 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 4277e92d7c9..42738eb940c 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -36,14 +36,11 @@
static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
pg_attribute_printf(2, 3);
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
-static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
- int reqLen, bool header_inclusive);
-size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
-static XLogReadRecordResult XLogDecodeOneRecord(XLogReaderState *state,
- bool allow_oversized);
+static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
+ int reqLen);
static void XLogReaderInvalReadState(XLogReaderState *state);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
- XLogRecPtr PrevRecPtr, XLogRecord *record);
+ XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
XLogRecPtr recptr);
static void ResetDecoder(XLogReaderState *state);
@@ -53,8 +50,6 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000
-#define DEFAULT_DECODE_BUFFER_SIZE 0x10000
-
/*
* Construct a string in state->errormsg_buf explaining what's wrong with
* the current record being read.
@@ -69,8 +64,6 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
va_start(args, fmt);
vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
va_end(args);
-
- state->errormsg_deferred = true;
}
/*
@@ -80,7 +73,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
*/
XLogReaderState *
XLogReaderAllocate(int wal_segment_size, const char *waldir,
- WALSegmentCleanupCB cleanup_cb)
+ XLogReaderRoutine *routine, void *private_data)
{
XLogReaderState *state;
@@ -91,7 +84,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
return NULL;
/* initialize caller-provided support functions */
- state->cleanup_cb = cleanup_cb;
+ state->routine = *routine;
+
+ state->max_block_id = -1;
/*
* Permanently allocate readBuf. We do it this way, rather than just
@@ -112,7 +107,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
waldir);
- /* ReadRecPtr, EndRecPtr, reqLen and readLen initialized to zeroes above */
+ /* system_identifier initialized to zeroes above */
+ state->private_data = private_data;
+ /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
MCXT_ALLOC_NO_OOM);
if (!state->errormsg_buf)
@@ -141,11 +138,18 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
void
XLogReaderFree(XLogReaderState *state)
{
- if (state->seg.ws_file >= 0)
- state->cleanup_cb(state);
+ int block_id;
- if (state->decode_buffer && state->free_decode_buffer)
- pfree(state->decode_buffer);
+ if (state->seg.ws_file != -1)
+ state->routine.segment_close(state);
+
+ for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
+ {
+ if (state->blocks[block_id].data)
+ pfree(state->blocks[block_id].data);
+ }
+ if (state->main_data)
+ pfree(state->main_data);
pfree(state->errormsg_buf);
if (state->readRecordBuf)
@@ -155,22 +159,6 @@ XLogReaderFree(XLogReaderState *state)
}
/*
- * Set the size of the decoding buffer. A pointer to a caller supplied memory
- * region may also be passed in, in which case non-oversized records will be
- * decoded there.
- */
-void
-XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
-{
- Assert(state->decode_buffer == NULL);
-
- state->decode_buffer = buffer;
- state->decode_buffer_size = size;
- state->decode_buffer_head = buffer;
- state->decode_buffer_tail = buffer;
-}
-
-/*
* Allocate readRecordBuf to fit a record of at least the given length.
* Returns true if successful, false if out of memory.
*
@@ -257,799 +245,290 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
/* Begin at the passed-in record pointer. */
state->EndRecPtr = RecPtr;
- state->NextRecPtr = RecPtr;
state->ReadRecPtr = InvalidXLogRecPtr;
- state->DecodeRecPtr = InvalidXLogRecPtr;
- state->readRecordState = XLREAD_NEXT_RECORD;
}
/*
- * See if we can release the last record that was returned by
- * XLogReadRecord(), to free up space.
+ * Attempt to read an XLOG record.
+ *
+ * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
+ * to XLogReadRecord().
+ *
+ * If the page_read callback fails to read the requested data, NULL is
+ * returned. The callback is expected to have reported the error; errormsg
+ * is set to NULL.
+ *
+ * If the reading fails for some other reason, NULL is also returned, and
+ * *errormsg is set to a string with details of the failure.
+ *
+ * The returned pointer (or *errormsg) points to an internal buffer that's
+ * valid until the next call to XLogReadRecord.
*/
-static void
-XLogReleasePreviousRecord(XLogReaderState *state)
+XLogRecord *
+XLogReadRecord(XLogReaderState *state, char **errormsg)
{
- DecodedXLogRecord *record;
+ XLogRecPtr RecPtr;
+ XLogRecord *record;
+ XLogRecPtr targetPagePtr;
+ bool randAccess;
+ uint32 len,
+ total_len;
+ uint32 targetRecOff;
+ uint32 pageHeaderSize;
+ bool gotheader;
+ int readOff;
/*
- * Remove it from the decoded record queue. It must be the oldest
- * item decoded, decode_queue_tail.
+ * randAccess indicates whether to verify the previous-record pointer of
+ * the record we're reading. We only do this if we're reading
+ * sequentially, which is what we initially assume.
*/
- record = state->record;
- Assert(record == state->decode_queue_tail);
- state->record = NULL;
- state->decode_queue_tail = record->next;
+ randAccess = false;
- /* It might also be the newest item decoded, decode_queue_head. */
- if (state->decode_queue_head == record)
- state->decode_queue_head = NULL;
+ /* reset error state */
+ *errormsg = NULL;
+ state->errormsg_buf[0] = '\0';
- /* Release the space. */
- if (unlikely(record->oversized))
- {
- /* It's not in the the decode buffer, so free it to release space. */
- pfree(record);
- }
- else
+ ResetDecoder(state);
+
+ RecPtr = state->EndRecPtr;
+
+ if (state->ReadRecPtr != InvalidXLogRecPtr)
{
- /* It must be the tail record in the decode buffer. */
- Assert(state->decode_buffer_tail == (char *) record);
+ /* read the record after the one we just read */
/*
- * We need to update tail to point to the next record that is in the
- * decode buffer, if any, being careful to skip oversized ones
- * (they're not in the decode buffer).
+ * EndRecPtr is pointing to end+1 of the previous WAL record. If
+ * we're at a page boundary, no more records can fit on the current
+ * page. We must skip over the page header, but we can't do that until
+ * we've read in the page, since the header size is variable.
*/
- record = record->next;
- while (unlikely(record && record->oversized))
- record = record->next;
-
- if (record)
- {
- /* Adjust tail to release space up to the next record. */
- state->decode_buffer_tail = (char *) record;
- }
- else if (state->decoding && !state->decoding->oversized)
- {
- /*
- * We're releasing the last fully decoded record in
- * XLogReadRecord(), but some time earlier we partially decoded a
- * record in XLogReadAhead() and were unable to complete the job.
- * We'll set the buffer head and tail to point to the record we
- * started working on, so that we can continue (perhaps from a
- * different source).
- */
- state->decode_buffer_tail = (char *) state->decoding;
- state->decode_buffer_head = (char *) state->decoding;
- }
- else
- {
- /*
- * Otherwise we might as well just reset head and tail to the
- * start of the buffer space, because we're empty. This means
- * we'll keep overwriting the same piece of memory if we're not
- * doing any prefetching.
- */
- state->decode_buffer_tail = state->decode_buffer;
- state->decode_buffer_head = state->decode_buffer;
- }
}
-}
-
-/*
- * Similar to XLogNextRecord(), but this traditional interface is for code
- * that just wants the header, not the decoded record. Callers can access the
- * decoded record through the XLogRecGetXXX() macros.
- */
-XLogReadRecordResult
-XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
-{
- XLogReadRecordResult result;
- DecodedXLogRecord *decoded;
-
- /* Consume the next decoded record. */
- result = XLogNextRecord(state, &decoded, errormsg);
- if (result == XLREAD_SUCCESS)
+ else
{
/*
- * The traditional interface just returns the header, not the decoded
- * record. The caller will access the decoded record through the
- * XLogRecGetXXX() macros.
+ * Caller supplied a position to start at.
+ *
+ * In this case, EndRecPtr should already be pointing to a valid
+ * record starting position.
*/
- *record = &decoded->header;
+ Assert(XRecOffIsValid(RecPtr));
+ randAccess = true;
}
- else
- *record = NULL;
- return result;
-}
-/*
- * Consume the next record. XLogBeginRead() or XLogFindNextRecord() must be
- * called before the first call to XLogNextRecord().
- *
- * This function may return XLREAD_NEED_DATA several times before returning a
- * result record. The caller shall read in some new data then call this
- * function again with the same parameters.
- *
- * When a record is successfully read, returns XLREAD_SUCCESS with result
- * record being stored in *record. Otherwise *record is set to NULL.
- *
- * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the
- * current record. In that case, state->readPagePtr and state->reqLen inform
- * the desired position and minimum length of data needed. The caller shall
- * read in the requested data and set state->readBuf to point to a buffer
- * containing it. The caller must also set state->seg->ws_tli and
- * state->readLen to indicate the timeline that it was read from, and the
- * length of data that is now available (which must be >= given reqLen),
- * respectively.
- *
- * Returns XLREAD_FULL if allow_oversized is true, and no space is available.
- * This is intended for readahead.
- *
- * If invalid data is encountered, returns XLREAD_FAIL with *record being set
- * to NULL. *errormsg is set to a string with details of the failure. The
- * returned pointer (or *errormsg) points to an internal buffer that's valid
- * until the next call to XLogReadRecord.
- *
- */
-XLogReadRecordResult
-XLogNextRecord(XLogReaderState *state,
- DecodedXLogRecord **record,
- char **errormsg)
-{
- /* Release the space occupied by the last record we returned. */
- if (state->record)
- XLogReleasePreviousRecord(state);
+ state->currRecPtr = RecPtr;
- for (;;)
- {
- XLogReadRecordResult result;
-
- /* We can now return the oldest item in the queue, if there is one. */
- if (state->decode_queue_tail)
- {
- /*
- * Record this as the most recent record returned, so that we'll
- * release it next time. This also exposes it to the
- * XLogRecXXX(decoder) macros, which pass in the decoder rather
- * than the record for historical reasons.
- */
- state->record = state->decode_queue_tail;
-
- /*
- * It should be immediately after the last the record returned by
- * XLogReadRecord(), or at the position set by XLogBeginRead() if
- * XLogReadRecord() hasn't been called yet. It may be after a
- * page header, though.
- */
- Assert(state->record->lsn == state->EndRecPtr ||
- (state->EndRecPtr % XLOG_BLCKSZ == 0 &&
- (state->record->lsn == state->EndRecPtr + SizeOfXLogShortPHD ||
- state->record->lsn == state->EndRecPtr + SizeOfXLogLongPHD)));
-
- /*
- * Set ReadRecPtr and EndRecPtr to correspond to that
- * record.
- *
- * Calling code could access these through the returned decoded
- * record, but for now we'll update them directly here, for the
- * benefit of all the existing code that accesses these variables
- * directly.
- */
- state->ReadRecPtr = state->record->lsn;
- state->EndRecPtr = state->record->next_lsn;
+ targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
+ targetRecOff = RecPtr % XLOG_BLCKSZ;
- *errormsg = NULL;
- *record = state->record;
-
- return XLREAD_SUCCESS;
- }
- else if (state->errormsg_deferred)
- {
- /*
- * If we've run out of records, but we have a deferred error, now
- * is the time to report it.
- */
- state->errormsg_deferred = false;
- if (state->errormsg_buf[0] != '\0')
- *errormsg = state->errormsg_buf;
- else
- *errormsg = NULL;
- *record = NULL;
- state->EndRecPtr = state->DecodeRecPtr;
-
- return XLREAD_FAIL;
- }
+ /*
+ * Read the page containing the record into state->readBuf. Request enough
+ * byte to cover the whole record header, or at least the part of it that
+ * fits on the same page.
+ */
+ readOff = ReadPageInternal(state, targetPagePtr,
+ Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
+ if (readOff < 0)
+ goto err;
- /* We need to get a decoded record into our queue first. */
- result = XLogDecodeOneRecord(state, true /* allow_oversized */ );
- switch(result)
- {
- case XLREAD_NEED_DATA:
- *errormsg = NULL;
- *record = NULL;
- return result;
- case XLREAD_SUCCESS:
- Assert(state->decode_queue_tail != NULL);
- break;
- case XLREAD_FULL:
- /* Not expected because we passed allow_oversized = true */
- Assert(false);
- break;
- case XLREAD_FAIL:
- /*
- * If that produced neither a queued record nor a queued error,
- * then we're at the end (for example, archive recovery with no
- * more files available).
- */
- Assert(state->decode_queue_tail == NULL);
- if (!state->errormsg_deferred)
- {
- state->EndRecPtr = state->DecodeRecPtr;
- *errormsg = NULL;
- *record = NULL;
- return result;
- }
- break;
- }
+ /*
+ * ReadPageInternal always returns at least the page header, so we can
+ * examine it now.
+ */
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ if (targetRecOff == 0)
+ {
+ /*
+ * At page start, so skip over page header.
+ */
+ RecPtr += pageHeaderSize;
+ targetRecOff = pageHeaderSize;
}
-
- /* unreachable */
- return XLREAD_FAIL;
-}
-
-/*
- * Try to decode the next available record. The next record will also be
- * returned to XLogRecordRead().
- *
- * In addition to the values that XLogReadRecord() can return, XLogReadAhead()
- * can also return XLREAD_FULL to indicate that further readahead is not
- * possible yet due to lack of space.
- */
-XLogReadRecordResult
-XLogReadAhead(XLogReaderState *state, DecodedXLogRecord **record, char **errormsg)
-{
- XLogReadRecordResult result;
-
- /* We stop trying after encountering an error. */
- if (unlikely(state->errormsg_deferred))
+ else if (targetRecOff < pageHeaderSize)
{
- /* We only report the error message the first time, see below. */
- *errormsg = NULL;
- return XLREAD_FAIL;
+ report_invalid_record(state, "invalid record offset at %X/%X",
+ LSN_FORMAT_ARGS(RecPtr));
+ goto err;
}
- /*
- * Try to decode one more record, if we have space. Pass allow_oversized
- * = false, so that this call returns fast if the decode buffer is full.
- */
- result = XLogDecodeOneRecord(state, false);
- switch (result)
+ if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
+ targetRecOff == pageHeaderSize)
{
- case XLREAD_SUCCESS:
- /* New record at head of decode record queue. */
- Assert(state->decode_queue_head != NULL);
- *record = state->decode_queue_head;
- return result;
- case XLREAD_FULL:
- /* No space in circular decode buffer. */
- return result;
- case XLREAD_NEED_DATA:
- /* The caller needs to insert more data. */
- return result;
- case XLREAD_FAIL:
- /* Report the error. XLogReadRecord() will also report it. */
- Assert(state->errormsg_deferred);
- if (state->errormsg_buf[0] != '\0')
- *errormsg = state->errormsg_buf;
- return result;
+ report_invalid_record(state, "contrecord is requested by %X/%X",
+ LSN_FORMAT_ARGS(RecPtr));
+ goto err;
}
- /* Unreachable. */
- return XLREAD_FAIL;
-}
+ /* ReadPageInternal has verified the page header */
+ Assert(pageHeaderSize <= readOff);
-/*
- * Allocate space for a decoded record. The only member of the returned
- * object that is initialized is the 'oversized' flag, indicating that the
- * decoded record wouldn't fit in the decode buffer and must eventually be
- * freed explicitly.
- *
- * Return NULL if there is no space in the decode buffer and allow_oversized
- * is false, or if memory allocation fails for an oversized buffer.
- */
-static DecodedXLogRecord *
-XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
-{
- size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
- DecodedXLogRecord *decoded = NULL;
+ /*
+ * Read the record length.
+ *
+ * NB: Even though we use an XLogRecord pointer here, the whole record
+ * header might not fit on this page. xl_tot_len is the first field of the
+ * struct, so it must be on this page (the records are MAXALIGNed), but we
+ * cannot access any other fields until we've verified that we got the
+ * whole header.
+ */
+ record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
+ total_len = record->xl_tot_len;
- /* Allocate a circular decode buffer if we don't have one already. */
- if (unlikely(state->decode_buffer == NULL))
- {
- if (state->decode_buffer_size == 0)
- state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
- state->decode_buffer = palloc(state->decode_buffer_size);
- state->decode_buffer_head = state->decode_buffer;
- state->decode_buffer_tail = state->decode_buffer;
- state->free_decode_buffer = true;
- }
- if (state->decode_buffer_head >= state->decode_buffer_tail)
+ /*
+ * If the whole record header is on this page, validate it immediately.
+ * Otherwise do just a basic sanity check on xl_tot_len, and validate the
+ * rest of the header after reading it from the next page. The xl_tot_len
+ * check is necessary here to ensure that we enter the "Need to reassemble
+ * record" code path below; otherwise we might fail to apply
+ * ValidXLogRecordHeader at all.
+ */
+ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
{
- /* Empty, or head is to the right of tail. */
- if (state->decode_buffer_head + required_space <=
- state->decode_buffer + state->decode_buffer_size)
- {
- /* There is space between head and end. */
- decoded = (DecodedXLogRecord *) state->decode_buffer_head;
- decoded->oversized = false;
- return decoded;
- }
- else if (state->decode_buffer + required_space <
- state->decode_buffer_tail)
- {
- /* There is space between start and tail. */
- decoded = (DecodedXLogRecord *) state->decode_buffer;
- decoded->oversized = false;
- return decoded;
- }
+ if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
+ randAccess))
+ goto err;
+ gotheader = true;
}
else
{
- /* Head is to the left of tail. */
- if (state->decode_buffer_head + required_space <
- state->decode_buffer_tail)
+ /* XXX: more validation should be done here */
+ if (total_len < SizeOfXLogRecord)
{
- /* There is space between head and tail. */
- decoded = (DecodedXLogRecord *) state->decode_buffer_head;
- decoded->oversized = false;
- return decoded;
+ report_invalid_record(state,
+ "invalid record length at %X/%X: wanted %u, got %u",
+ LSN_FORMAT_ARGS(RecPtr),
+ (uint32) SizeOfXLogRecord, total_len);
+ goto err;
}
+ gotheader = false;
}
- /* Not enough space in the decode buffer. Are we allowed to allocate? */
- if (allow_oversized)
+ len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
+ if (total_len > len)
{
- decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
- if (decoded == NULL)
- return NULL;
- decoded->oversized = true;
- return decoded;
- }
+ /* Need to reassemble record */
+ char *contdata;
+ XLogPageHeader pageHeader;
+ char *buffer;
+ uint32 gotlen;
- return decoded;
-}
-
-/*
- * Try to read and decode the next record and add it to the head of the
- * decoded record queue. If 'allow_oversized' is false, then XLREAD_FULL can
- * be returned to indicate the decoding buffer is full. XLogBeginRead() or
- * XLogFindNextRecord() must be called before the first call to
- * XLogReadRecord().
- *
- * This function runs a state machine consisting of the following states.
- *
- * XLREAD_NEXT_RECORD:
- * The initial state. If called with a valid XLogRecPtr, try to read a
- * record at that position. If invalid RecPtr is given try to read a record
- * just after the last one read. The next state is XLREAD_TOT_LEN.
- *
- * XLREAD_TOT_LEN:
- * Examining record header. Ends after reading record length.
- * recordRemainLen and recordGotLen are initialized. The next state is
- * XLREAD_FIRST_FRAGMENT.
- *
- * XLREAD_FIRST_FRAGMENT:
- * Reading the first fragment. Goes to XLREAD_NEXT_RECORD if that's all or
- * XLREAD_CONTINUATION if we need more data.
-
- * XLREAD_CONTINUATION:
- * Reading continuation of record. If the whole record is now decoded, goes
- * to XLREAD_NEXT_RECORD. During this state, recordRemainLen indicates how
- * much is left.
- *
- * If invalid data is found in any state, the state machine stays at the
- * current state. This behavior allows us to continue reading a record
- * after switching to a different source, during streaming replication.
- */
-static XLogReadRecordResult
-XLogDecodeOneRecord(XLogReaderState *state, bool allow_oversized)
-{
- XLogRecord *record;
- char *errormsg; /* not used */
- XLogRecord *prec;
-
- /* reset error state */
- state->errormsg_buf[0] = '\0';
- record = NULL;
+ /*
+ * Enlarge readRecordBuf as needed.
+ */
+ if (total_len > state->readRecordBufSize &&
+ !allocate_recordbuf(state, total_len))
+ {
+ /* We treat this as a "bogus data" condition */
+ report_invalid_record(state, "record length %u at %X/%X too long",
+ total_len, LSN_FORMAT_ARGS(RecPtr));
+ goto err;
+ }
- switch (state->readRecordState)
- {
- case XLREAD_NEXT_RECORD:
- Assert(!state->decoding);
+ /* Copy the first fragment of the record from the first page. */
+ memcpy(state->readRecordBuf,
+ state->readBuf + RecPtr % XLOG_BLCKSZ, len);
+ buffer = state->readRecordBuf + len;
+ gotlen = len;
- if (state->DecodeRecPtr != InvalidXLogRecPtr)
- {
- /* read the record after the one we just read */
+ do
+ {
+ /* Calculate pointer to beginning of next page */
+ targetPagePtr += XLOG_BLCKSZ;
- /*
- * NextRecPtr is pointing to end+1 of the previous WAL record.
- * If we're at a page boundary, no more records can fit on the
- * current page. We must skip over the page header, but we
- * can't do that until we've read in the page, since the
- * header size is variable.
- */
- state->PrevRecPtr = state->DecodeRecPtr;
- state->DecodeRecPtr = state->NextRecPtr;
- }
- else
- {
- /*
- * Caller supplied a position to start at.
- *
- * In this case, EndRecPtr should already be pointing to a
- * valid record starting position.
- */
- Assert(XRecOffIsValid(state->NextRecPtr));
- state->DecodeRecPtr = state->NextRecPtr;
+ /* Wait for the next page to become available */
+ readOff = ReadPageInternal(state, targetPagePtr,
+ Min(total_len - gotlen + SizeOfXLogShortPHD,
+ XLOG_BLCKSZ));
- /*
- * We cannot verify the previous-record pointer when we're
- * seeking to a particular record. Reset PrevRecPtr so that we
- * won't try doing that.
- */
- state->PrevRecPtr = InvalidXLogRecPtr;
- }
+ if (readOff < 0)
+ goto err;
- state->record_verified = false;
- state->readRecordState = XLREAD_TOT_LEN;
- /* fall through */
+ Assert(SizeOfXLogShortPHD <= readOff);
- case XLREAD_TOT_LEN:
+ /* Check that the continuation on next page looks valid */
+ pageHeader = (XLogPageHeader) state->readBuf;
+ if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
- uint32 total_len;
- uint32 pageHeaderSize;
- XLogRecPtr targetPagePtr;
- uint32 targetRecOff;
- XLogPageHeader pageHeader;
-
- Assert(!state->decoding);
-
- targetPagePtr =
- state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ);
- targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ;
-
- /*
- * Check if we have enough data. For the first record in the
- * page, the requesting length doesn't contain page header.
- */
- if (XLogNeedData(state, targetPagePtr,
- Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ),
- targetRecOff != 0))
- return XLREAD_NEED_DATA;
-
- /* error out if caller supplied bogus page */
- if (!state->page_verified)
- goto err;
-
- /* examine page header now. */
- pageHeaderSize =
- XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- if (targetRecOff == 0)
- {
- /* At page start, so skip over page header. */
- state->DecodeRecPtr += pageHeaderSize;
- targetRecOff = pageHeaderSize;
- }
- else if (targetRecOff < pageHeaderSize)
- {
- report_invalid_record(state, "invalid record offset at %X/%X",
- LSN_FORMAT_ARGS(state->DecodeRecPtr));
- goto err;
- }
-
- pageHeader = (XLogPageHeader) state->readBuf;
- if ((pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
- targetRecOff == pageHeaderSize)
- {
- report_invalid_record(state, "contrecord is requested by %X/%X",
- LSN_FORMAT_ARGS(state->DecodeRecPtr));
- goto err;
- }
-
- /* XLogNeedData has verified the page header */
- Assert(pageHeaderSize <= state->readLen);
-
- /*
- * Read the record length.
- *
- * NB: Even though we use an XLogRecord pointer here, the
- * whole record header might not fit on this page. xl_tot_len
- * is the first field of the struct, so it must be on this
- * page (the records are MAXALIGNed), but we cannot access any
- * other fields until we've verified that we got the whole
- * header.
- */
- prec = (XLogRecord *) (state->readBuf +
- state->DecodeRecPtr % XLOG_BLCKSZ);
- total_len = prec->xl_tot_len;
-
- /* Find space to decode this record. */
- Assert(state->decoding == NULL);
- state->decoding = XLogReadRecordAlloc(state, total_len,
- allow_oversized);
- if (state->decoding == NULL)
- {
- /*
- * We couldn't get space. If allow_oversized was true,
- * then palloc() must have failed. Otherwise, report that
- * our decoding buffer is full. This means that weare
- * trying to read too far ahead.
- */
- if (allow_oversized)
- goto err;
- return XLREAD_FULL;
- }
-
- /*
- * If the whole record header is on this page, validate it
- * immediately. Otherwise do just a basic sanity check on
- * xl_tot_len, and validate the rest of the header after
- * reading it from the next page. The xl_tot_len check is
- * necessary here to ensure that we enter the
- * XLREAD_CONTINUATION state below; otherwise we might fail to
- * apply ValidXLogRecordHeader at all.
- */
- if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
- {
- if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
- state->PrevRecPtr, prec))
- goto err;
-
- state->record_verified = true;
- }
- else
- {
- /* XXX: more validation should be done here */
- if (total_len < SizeOfXLogRecord)
- {
- report_invalid_record(state,
- "invalid record length at %X/%X: wanted %u, got %u",
- LSN_FORMAT_ARGS(state->DecodeRecPtr),
- (uint32) SizeOfXLogRecord, total_len);
- goto err;
- }
- }
-
- /*
- * Wait for the rest of the record, or the part of the record
- * that fit on the first page if crossed a page boundary, to
- * become available.
- */
- state->recordGotLen = 0;
- state->recordRemainLen = total_len;
- state->readRecordState = XLREAD_FIRST_FRAGMENT;
+ report_invalid_record(state,
+ "there is no contrecord flag at %X/%X",
+ LSN_FORMAT_ARGS(RecPtr));
+ goto err;
}
- /* fall through */
- case XLREAD_FIRST_FRAGMENT:
+ /*
+ * Cross-check that xlp_rem_len agrees with how much of the record
+ * we expect there to be left.
+ */
+ if (pageHeader->xlp_rem_len == 0 ||
+ total_len != (pageHeader->xlp_rem_len + gotlen))
{
- uint32 total_len = state->recordRemainLen;
- uint32 request_len;
- uint32 record_len;
- XLogRecPtr targetPagePtr;
- uint32 targetRecOff;
-
- Assert(state->decoding);
-
- /*
- * Wait for the rest of the record on the first page to become
- * available
- */
- targetPagePtr =
- state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ);
- targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ;
-
- request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ);
- record_len = request_len - targetRecOff;
-
- /* ReadRecPtr contains page header */
- Assert(targetRecOff != 0);
- if (XLogNeedData(state, targetPagePtr, request_len, true))
- return XLREAD_NEED_DATA;
-
- /* error out if caller supplied bogus page */
- if (!state->page_verified)
- goto err;
-
- prec = (XLogRecord *) (state->readBuf + targetRecOff);
-
- /* validate record header if not yet */
- if (!state->record_verified && record_len >= SizeOfXLogRecord)
- {
- if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
- state->PrevRecPtr, prec))
- goto err;
-
- state->record_verified = true;
- }
-
+ report_invalid_record(state,
+ "invalid contrecord length %u (expected %lld) at %X/%X",
+ pageHeader->xlp_rem_len,
+ ((long long) total_len) - gotlen,
+ LSN_FORMAT_ARGS(RecPtr));
+ goto err;
+ }
- if (total_len == record_len)
- {
- /* Record does not cross a page boundary */
- Assert(state->record_verified);
+ /* Append the continuation from this page to the buffer */
+ pageHeaderSize = XLogPageHeaderSize(pageHeader);
- if (!ValidXLogRecord(state, prec, state->DecodeRecPtr))
- goto err;
+ if (readOff < pageHeaderSize)
+ readOff = ReadPageInternal(state, targetPagePtr,
+ pageHeaderSize);
- state->record_verified = true; /* to be tidy */
+ Assert(pageHeaderSize <= readOff);
- /* We already checked the header earlier */
- state->NextRecPtr = state->DecodeRecPtr + MAXALIGN(record_len);
+ contdata = (char *) state->readBuf + pageHeaderSize;
+ len = XLOG_BLCKSZ - pageHeaderSize;
+ if (pageHeader->xlp_rem_len < len)
+ len = pageHeader->xlp_rem_len;
- record = prec;
- state->readRecordState = XLREAD_NEXT_RECORD;
- break;
- }
+ if (readOff < pageHeaderSize + len)
+ readOff = ReadPageInternal(state, targetPagePtr,
+ pageHeaderSize + len);
- /*
- * The record continues on the next page. Need to reassemble
- * record
- */
- Assert(total_len > record_len);
+ memcpy(buffer, (char *) contdata, len);
+ buffer += len;
+ gotlen += len;
- /* Enlarge readRecordBuf as needed. */
- if (total_len > state->readRecordBufSize &&
- !allocate_recordbuf(state, total_len))
- {
- /* We treat this as a "bogus data" condition */
- report_invalid_record(state,
- "record length %u at %X/%X too long",
- total_len,
- LSN_FORMAT_ARGS(state->DecodeRecPtr));
+ /* If we just reassembled the record header, validate it. */
+ if (!gotheader)
+ {
+ record = (XLogRecord *) state->readRecordBuf;
+ if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr,
+ record, randAccess))
goto err;
- }
-
- /* Copy the first fragment of the record from the first page. */
- memcpy(state->readRecordBuf, state->readBuf + targetRecOff,
- record_len);
- state->recordGotLen += record_len;
- state->recordRemainLen -= record_len;
-
- /* Calculate pointer to beginning of next page */
- state->recordContRecPtr = state->DecodeRecPtr + record_len;
- Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
-
- state->readRecordState = XLREAD_CONTINUATION;
+ gotheader = true;
}
- /* fall through */
+ } while (gotlen < total_len);
- case XLREAD_CONTINUATION:
- {
- XLogPageHeader pageHeader = NULL;
- uint32 pageHeaderSize;
- XLogRecPtr targetPagePtr = InvalidXLogRecPtr;
+ Assert(gotheader);
- /*
- * we enter this state only if we haven't read the whole
- * record.
- */
- Assert(state->decoding);
- Assert(state->recordRemainLen > 0);
-
- while (state->recordRemainLen > 0)
- {
- char *contdata;
- uint32 request_len PG_USED_FOR_ASSERTS_ONLY;
- uint32 record_len;
-
- /* Wait for the next page to become available */
- targetPagePtr = state->recordContRecPtr;
-
- /* this request contains page header */
- Assert(targetPagePtr != 0);
- if (XLogNeedData(state, targetPagePtr,
- Min(state->recordRemainLen, XLOG_BLCKSZ),
- false))
- return XLREAD_NEED_DATA;
-
- if (!state->page_verified)
- goto err_continue;
-
- Assert(SizeOfXLogShortPHD <= state->readLen);
-
- /* Check that the continuation on next page looks valid */
- pageHeader = (XLogPageHeader) state->readBuf;
- if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
- {
- report_invalid_record(
- state,
- "there is no contrecord flag at %X/%X reading %X/%X",
- LSN_FORMAT_ARGS(state->recordContRecPtr),
- LSN_FORMAT_ARGS(state->DecodeRecPtr));
- goto err;
- }
-
- /*
- * Cross-check that xlp_rem_len agrees with how much of
- * the record we expect there to be left.
- */
- if (pageHeader->xlp_rem_len == 0 ||
- pageHeader->xlp_rem_len != state->recordRemainLen)
- {
- report_invalid_record(
- state,
- "invalid contrecord length %u at %X/%X reading %X/%X, expected %u",
- pageHeader->xlp_rem_len,
- LSN_FORMAT_ARGS(state->recordContRecPtr),
- LSN_FORMAT_ARGS(state->DecodeRecPtr),
- state->recordRemainLen);
- goto err;
- }
-
- /* Append the continuation from this page to the buffer */
- pageHeaderSize = XLogPageHeaderSize(pageHeader);
-
- /*
- * XLogNeedData should have ensured that the whole page
- * header was read
- */
- Assert(pageHeaderSize <= state->readLen);
-
- contdata = (char *) state->readBuf + pageHeaderSize;
- record_len = XLOG_BLCKSZ - pageHeaderSize;
- if (pageHeader->xlp_rem_len < record_len)
- record_len = pageHeader->xlp_rem_len;
-
- request_len = record_len + pageHeaderSize;
-
- /*
- * XLogNeedData should have ensured all needed data was
- * read
- */
- Assert(request_len <= state->readLen);
-
- memcpy(state->readRecordBuf + state->recordGotLen,
- (char *) contdata, record_len);
- state->recordGotLen += record_len;
- state->recordRemainLen -= record_len;
-
- /* If we just reassembled the record header, validate it. */
- if (!state->record_verified)
- {
- Assert(state->recordGotLen >= SizeOfXLogRecord);
- if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
- state->PrevRecPtr,
- (XLogRecord *) state->readRecordBuf))
- goto err;
-
- state->record_verified = true;
- }
-
- /*
- * Calculate pointer to beginning of next page, and
- * continue
- */
- state->recordContRecPtr += XLOG_BLCKSZ;
- }
+ record = (XLogRecord *) state->readRecordBuf;
+ if (!ValidXLogRecord(state, record, RecPtr))
+ goto err;
- /* targetPagePtr is pointing the last-read page here */
- prec = (XLogRecord *) state->readRecordBuf;
- if (!ValidXLogRecord(state, prec, state->DecodeRecPtr))
- goto err;
+ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ state->ReadRecPtr = RecPtr;
+ state->EndRecPtr = targetPagePtr + pageHeaderSize
+ + MAXALIGN(pageHeader->xlp_rem_len);
+ }
+ else
+ {
+ /* Wait for the record data to become available */
+ readOff = ReadPageInternal(state, targetPagePtr,
+ Min(targetRecOff + total_len, XLOG_BLCKSZ));
+ if (readOff < 0)
+ goto err;
- pageHeaderSize =
- XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- state->NextRecPtr = targetPagePtr + pageHeaderSize
- + MAXALIGN(pageHeader->xlp_rem_len);
+ /* Record does not cross a page boundary */
+ if (!ValidXLogRecord(state, record, RecPtr))
+ goto err;
- record = prec;
- state->readRecordState = XLREAD_NEXT_RECORD;
+ state->EndRecPtr = RecPtr + MAXALIGN(total_len);
- break;
- }
+ state->ReadRecPtr = RecPtr;
}
/*
@@ -1059,195 +538,133 @@ XLogDecodeOneRecord(XLogReaderState *state, bool allow_oversized)
(record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{
/* Pretend it extends to end of segment */
- state->NextRecPtr += state->segcxt.ws_segsize - 1;
- state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
+ state->EndRecPtr += state->segcxt.ws_segsize - 1;
+ state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
}
- Assert(!record || state->readLen >= 0);
- if (DecodeXLogRecord(state, state->decoding, record, state->DecodeRecPtr, &errormsg))
- {
- /* Record the location of the next record. */
- state->decoding->next_lsn = state->NextRecPtr;
-
- /*
- * If it's in the decode buffer (not an "oversized" record allocated
- * with palloc()), mark the decode buffer space as occupied.
- */
- if (!state->decoding->oversized)
- {
- /* The new decode buffer head must be MAXALIGNed. */
- Assert(state->decoding->size == MAXALIGN(state->decoding->size));
- if ((char *) state->decoding == state->decode_buffer)
- state->decode_buffer_head = state->decode_buffer +
- state->decoding->size;
- else
- state->decode_buffer_head += state->decoding->size;
- }
-
- /* Insert it into the queue of decoded records. */
- Assert(state->decode_queue_head != state->decoding);
- if (state->decode_queue_head)
- state->decode_queue_head->next = state->decoding;
- state->decode_queue_head = state->decoding;
- if (!state->decode_queue_tail)
- state->decode_queue_tail = state->decoding;
- state->decoding = NULL;
-
- return XLREAD_SUCCESS;
- }
+ if (DecodeXLogRecord(state, record, errormsg))
+ return record;
+ else
+ return NULL;
err:
- if (state->decoding && state->decoding->oversized)
- pfree(state->decoding);
- state->decoding = NULL;
-err_continue:
/*
- * Invalidate the read page. We might read from a different source after
+ * Invalidate the read state. We might read from a different source after
* failure.
*/
XLogReaderInvalReadState(state);
- /*
- * If an error was written to errmsg_buf, it'll be returned to the caller
- * of XLogReadRecord() after all successfully decoded records from the
- * read queue.
- */
+ if (state->errormsg_buf[0] != '\0')
+ *errormsg = state->errormsg_buf;
- return XLREAD_FAIL;
+ return NULL;
}
/*
- * Checks that an xlog page loaded in state->readBuf is including at least
- * [pageptr, reqLen] and the page is valid. header_inclusive indicates that
- * reqLen is calculated including page header length.
- *
- * Returns false if the buffer already contains the requested data, or found
- * error. state->page_verified is set to true for the former and false for the
- * latter.
+ * Read a single xlog page including at least [pageptr, reqLen] of valid data
+ * via the page_read() callback.
*
- * Otherwise returns true and requests data loaded onto state->readBuf by
- * state->readPagePtr and state->readLen. The caller shall call this function
- * again after filling the buffer at least with that portion of data and set
- * state->readLen to the length of actually loaded data.
+ * Returns -1 if the required page cannot be read for some reason; errormsg_buf
+ * is set in that case (unless the error occurs in the page_read callback).
*
- * If header_inclusive is false, corrects reqLen internally by adding the
- * actual page header length and may request caller for new data.
+ * We fetch the page from a reader-local cache if we know we have the required
+ * data and if there hasn't been any error since caching the data.
*/
-static bool
-XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
- bool header_inclusive)
+static int
+ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
+ int readLen;
uint32 targetPageOff;
XLogSegNo targetSegNo;
- uint32 addLen = 0;
+ XLogPageHeader hdr;
- /* Some data is loaded, but page header is not verified yet. */
- if (!state->page_verified &&
- !XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0)
- {
- uint32 pageHeaderSize;
-
- /* just loaded new data so needs to verify page header */
-
- /* The caller must have loaded at least page header */
- Assert(state->readLen >= SizeOfXLogShortPHD);
+ Assert((pageptr % XLOG_BLCKSZ) == 0);
- /*
- * We have enough data to check the header length. Recheck the loaded
- * length against the actual header length.
- */
- pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
+ targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
- /* Request more data if we don't have the full header. */
- if (state->readLen < pageHeaderSize)
- {
- state->reqLen = pageHeaderSize;
- return true;
- }
+ /* check whether we have all the requested data already */
+ if (targetSegNo == state->seg.ws_segno &&
+ targetPageOff == state->segoff && reqLen <= state->readLen)
+ return state->readLen;
- /* Now that we know we have the full header, validate it. */
- if (!XLogReaderValidatePageHeader(state, state->readPagePtr,
- (char *) state->readBuf))
- {
- /* That's bad. Force reading the page again. */
- XLogReaderInvalReadState(state);
+ /*
+ * Data is not in our buffer.
+ *
+ * Every time we actually read the segment, even if we looked at parts of
+ * it before, we need to do verification as the page_read callback might
+ * now be rereading data from a different source.
+ *
+ * Whenever switching to a new WAL segment, we read the first page of the
+ * file and validate its header, even if that's not where the target
+ * record is. This is so that we can check the additional identification
+ * info that is present in the first page's "long" header.
+ */
+ if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+ {
+ XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
- return false;
- }
+ readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
+ state->currRecPtr,
+ state->readBuf);
+ if (readLen < 0)
+ goto err;
- state->page_verified = true;
+ /* we can be sure to have enough WAL available, we scrolled back */
+ Assert(readLen == XLOG_BLCKSZ);
- XLByteToSeg(state->readPagePtr, state->seg.ws_segno,
- state->segcxt.ws_segsize);
+ if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
+ state->readBuf))
+ goto err;
}
/*
- * The loaded page may not be the one caller is supposing to read when we
- * are verifying the first page of new segment. In that case, skip further
- * verification and immediately load the target page.
+ * First, read the requested data length, but at least a short page header
+ * so that we can validate it.
*/
- if (state->page_verified && pageptr == state->readPagePtr)
- {
- /*
- * calculate additional length for page header keeping the total
- * length within the block size.
- */
- if (!header_inclusive)
- {
- uint32 pageHeaderSize =
- XLogPageHeaderSize((XLogPageHeader) state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+ state->currRecPtr,
+ state->readBuf);
+ if (readLen < 0)
+ goto err;
- addLen = pageHeaderSize;
- if (reqLen + pageHeaderSize <= XLOG_BLCKSZ)
- addLen = pageHeaderSize;
- else
- addLen = XLOG_BLCKSZ - reqLen;
- }
+ Assert(readLen <= XLOG_BLCKSZ);
- /* Return if we already have it. */
- if (reqLen + addLen <= state->readLen)
- return false;
- }
+ /* Do we have enough data to check the header length? */
+ if (readLen <= SizeOfXLogShortPHD)
+ goto err;
- /* Data is not in our buffer, request the caller for it. */
- XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
- targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
- Assert((pageptr % XLOG_BLCKSZ) == 0);
+ Assert(readLen >= reqLen);
- /*
- * Every time we request to load new data of a page to the caller, even if
- * we looked at a part of it before, we need to do verification on the
- * next invocation as the caller might now be rereading data from a
- * different source.
- */
- state->page_verified = false;
+ hdr = (XLogPageHeader) state->readBuf;
- /*
- * Whenever switching to a new WAL segment, we read the first page of the
- * file and validate its header, even if that's not where the target
- * record is. This is so that we can check the additional identification
- * info that is present in the first page's "long" header. Don't do this
- * if the caller requested the first page in the segment.
- */
- if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
+ /* still not enough */
+ if (readLen < XLogPageHeaderSize(hdr))
{
- /*
- * Then we'll see that the targetSegNo now matches the ws_segno, and
- * will not come back here, but will request the actual target page.
- */
- state->readPagePtr = pageptr - targetPageOff;
- state->reqLen = XLOG_BLCKSZ;
- return true;
+ readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
+ state->currRecPtr,
+ state->readBuf);
+ if (readLen < 0)
+ goto err;
}
/*
- * Request the caller to load the page. We need at least a short page
- * header so that we can validate it.
+ * Now that we know we have the full header, validate it.
*/
- state->readPagePtr = pageptr;
- state->reqLen = Max(reqLen + addLen, SizeOfXLogShortPHD);
- return true;
+ if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
+ goto err;
+
+ /* update read state information */
+ state->seg.ws_segno = targetSegNo;
+ state->segoff = targetPageOff;
+ state->readLen = readLen;
+
+ return readLen;
+
+err:
+ XLogReaderInvalReadState(state);
+ return -1;
}
/*
@@ -1256,7 +673,9 @@ XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen,
static void
XLogReaderInvalReadState(XLogReaderState *state)
{
- state->readPagePtr = InvalidXLogRecPtr;
+ state->seg.ws_segno = 0;
+ state->segoff = 0;
+ state->readLen = 0;
}
/*
@@ -1264,12 +683,11 @@ XLogReaderInvalReadState(XLogReaderState *state)
*
* This is just a convenience subroutine to avoid duplicated code in
* XLogReadRecord. It's not intended for use from anywhere else.
- *
- * If PrevRecPtr is valid, the xl_prev is is cross-checked with it.
*/
static bool
ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
- XLogRecPtr PrevRecPtr, XLogRecord *record)
+ XLogRecPtr PrevRecPtr, XLogRecord *record,
+ bool randAccess)
{
if (record->xl_tot_len < SizeOfXLogRecord)
{
@@ -1286,7 +704,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
return false;
}
- if (PrevRecPtr == InvalidXLogRecPtr)
+ if (randAccess)
{
/*
* We can't exactly verify the prev-link, but surely it should be less
@@ -1504,22 +922,6 @@ XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
* here.
*/
-XLogFindNextRecordState *
-InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr)
-{
- XLogFindNextRecordState *state = (XLogFindNextRecordState *)
- palloc_extended(sizeof(XLogFindNextRecordState),
- MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
- if (!state)
- return NULL;
-
- state->reader_state = reader_state;
- state->targetRecPtr = start_ptr;
- state->currRecPtr = start_ptr;
-
- return state;
-}
-
/*
* Find the first record with an lsn >= RecPtr.
*
@@ -1531,25 +933,27 @@ InitXLogFindNextRecord(XLogReaderState *reader_state, XLogRecPtr start_ptr)
* This positions the reader, like XLogBeginRead(), so that the next call to
* XLogReadRecord() will read the next valid record.
*/
-bool
-XLogFindNextRecord(XLogFindNextRecordState *state)
+XLogRecPtr
+XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
{
+ XLogRecPtr tmpRecPtr;
+ XLogRecPtr found = InvalidXLogRecPtr;
XLogPageHeader header;
- XLogRecord *record;
- XLogReadRecordResult result;
char *errormsg;
- Assert(!XLogRecPtrIsInvalid(state->currRecPtr));
+ Assert(!XLogRecPtrIsInvalid(RecPtr));
/*
* skip over potential continuation data, keeping in mind that it may span
* multiple pages
*/
+ tmpRecPtr = RecPtr;
while (true)
{
XLogRecPtr targetPagePtr;
int targetRecOff;
uint32 pageHeaderSize;
+ int readLen;
/*
* Compute targetRecOff. It should typically be equal or greater than
@@ -1557,27 +961,27 @@ XLogFindNextRecord(XLogFindNextRecordState *state)
* that, except when caller has explicitly specified the offset that
* falls somewhere there or when we are skipping multi-page
* continuation record. It doesn't matter though because
- * XLogNeedData() is prepared to handle that and will read at least
- * short page-header worth of data
+ * ReadPageInternal() is prepared to handle that and will read at
+ * least short page-header worth of data
*/
- targetRecOff = state->currRecPtr % XLOG_BLCKSZ;
+ targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
/* scroll back to page boundary */
- targetPagePtr = state->currRecPtr - targetRecOff;
+ targetPagePtr = tmpRecPtr - targetRecOff;
- if (XLogNeedData(state->reader_state, targetPagePtr, targetRecOff,
- targetRecOff != 0))
- return true;
-
- if (!state->reader_state->page_verified)
+ /* Read the page containing the record */
+ readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
+ if (readLen < 0)
goto err;
- header = (XLogPageHeader) state->reader_state->readBuf;
+ header = (XLogPageHeader) state->readBuf;
pageHeaderSize = XLogPageHeaderSize(header);
- /* we should have read the page header */
- Assert(state->reader_state->readLen >= pageHeaderSize);
+ /* make sure we have enough data for the page header */
+ readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
+ if (readLen < 0)
+ goto err;
/* skip over potential continuation data */
if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
@@ -1592,21 +996,21 @@ XLogFindNextRecord(XLogFindNextRecordState *state)
* Note that record headers are MAXALIGN'ed
*/
if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
- state->currRecPtr = targetPagePtr + XLOG_BLCKSZ;
+ tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
else
{
/*
* The previous continuation record ends in this page. Set
- * state->currRecPtr to point to the first valid record
+ * tmpRecPtr to point to the first valid record
*/
- state->currRecPtr = targetPagePtr + pageHeaderSize
+ tmpRecPtr = targetPagePtr + pageHeaderSize
+ MAXALIGN(header->xlp_rem_len);
break;
}
}
else
{
- state->currRecPtr = targetPagePtr + pageHeaderSize;
+ tmpRecPtr = targetPagePtr + pageHeaderSize;
break;
}
}
@@ -1616,36 +1020,31 @@ XLogFindNextRecord(XLogFindNextRecordState *state)
* because either we're at the first record after the beginning of a page
* or we just jumped over the remaining data of a continuation.
*/
- XLogBeginRead(state->reader_state, state->currRecPtr);
- while ((result = XLogReadRecord(state->reader_state, &record, &errormsg)) !=
- XLREAD_FAIL)
+ XLogBeginRead(state, tmpRecPtr);
+ while (XLogReadRecord(state, &errormsg) != NULL)
{
- if (result == XLREAD_NEED_DATA)
- return true;
-
/* past the record we've found, break out */
- if (state->targetRecPtr <= state->reader_state->ReadRecPtr)
+ if (RecPtr <= state->ReadRecPtr)
{
/* Rewind the reader to the beginning of the last record. */
- state->currRecPtr = state->reader_state->ReadRecPtr;
- XLogBeginRead(state->reader_state, state->currRecPtr);
- return false;
+ found = state->ReadRecPtr;
+ XLogBeginRead(state, found);
+ return found;
}
}
err:
- XLogReaderInvalReadState(state->reader_state);
+ XLogReaderInvalReadState(state);
- state->currRecPtr = InvalidXLogRecPtr;;
- return false;
+ return InvalidXLogRecPtr;
}
#endif /* FRONTEND */
/*
- * Helper function to ease writing of routines that read raw WAL data.
- * If this function is used, caller must supply a segment_open callback and
- * segment_close callback as that is used here.
+ * Helper function to ease writing of XLogRoutine->page_read callbacks.
+ * If this function is used, caller must supply a segment_open callback in
+ * 'state', as that is used here.
*
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
@@ -1658,7 +1057,6 @@ err:
*/
bool
WALRead(XLogReaderState *state,
- WALSegmentOpenCB segopenfn, WALSegmentCloseCB segclosefn,
char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALReadError *errinfo)
{
@@ -1690,10 +1088,10 @@ WALRead(XLogReaderState *state,
XLogSegNo nextSegNo;
if (state->seg.ws_file >= 0)
- segclosefn(state);
+ state->routine.segment_close(state);
XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
- segopenfn(state, nextSegNo, &tli);
+ state->routine.segment_open(state, nextSegNo, &tli);
/* This shouldn't happen -- indicates a bug in segment_open */
Assert(state->seg.ws_file >= 0);
@@ -1745,84 +1143,34 @@ WALRead(XLogReaderState *state,
* ----------------------------------------
*/
-/*
- * Private function to reset the state, forgetting all decoded records, if we
- * are asked to move to a new read position.
- */
+/* private function to reset the state between records */
static void
ResetDecoder(XLogReaderState *state)
{
- DecodedXLogRecord *r;
+ int block_id;
- /* Reset the decoded record queue, freeing any oversized records. */
- while ((r = state->decode_queue_tail))
- {
- state->decode_queue_tail = r->next;
- if (r->oversized)
- pfree(r);
- }
- state->decode_queue_head = NULL;
- state->decode_queue_tail = NULL;
- state->record = NULL;
- state->decoding = NULL;
-
- /* Reset the decode buffer to empty. */
- state->decode_buffer_head = state->decode_buffer;
- state->decode_buffer_tail = state->decode_buffer;
+ state->decoded_record = NULL;
- /* Clear error state. */
- state->errormsg_buf[0] = '\0';
- state->errormsg_deferred = false;
-}
+ state->main_data_len = 0;
-/*
- * Compute the maximum possible amount of padding that could be required to
- * decode a record, given xl_tot_len from the record's header. This is the
- * amount of output buffer space that we need to decode a record, though we
- * might not finish up using it all.
- *
- * This computation is pessimistic and assumes the maximum possible number of
- * blocks, due to lack of better information.
- */
-size_t
-DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
-{
- size_t size = 0;
-
- /* Account for the fixed size part of the decoded record struct. */
- size += offsetof(DecodedXLogRecord, blocks[0]);
- /* Account for the flexible blocks array of maximum possible size. */
- size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
- /* Account for all the raw main and block data. */
- size += xl_tot_len;
- /* We might insert padding before main_data. */
- size += (MAXIMUM_ALIGNOF - 1);
- /* We might insert padding before each block's data. */
- size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
- /* We might insert padding at the end. */
- size += (MAXIMUM_ALIGNOF - 1);
-
- return size;
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ state->blocks[block_id].in_use = false;
+ state->blocks[block_id].has_image = false;
+ state->blocks[block_id].has_data = false;
+ state->blocks[block_id].apply_image = false;
+ }
+ state->max_block_id = -1;
}
/*
- * Decode a record. "decoded" must point to a MAXALIGNed memory area that has
- * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On
- * success, decoded->size contains the actual space occupied by the decoded
- * record, which may turn out to be less.
- *
- * Only decoded->oversized member must be initialized already, and will not be
- * modified. Other members will be initialized as required.
+ * Decode the previously read record.
*
* On error, a human-readable error message is returned in *errormsg, and
* the return value is false.
*/
bool
-DecodeXLogRecord(XLogReaderState *state,
- DecodedXLogRecord *decoded,
- XLogRecord *record,
- XLogRecPtr lsn,
- char **errormsg)
+DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
{
/*
* read next _size bytes from record buffer, but check for overrun first.
@@ -1837,20 +1185,17 @@ DecodeXLogRecord(XLogReaderState *state,
} while(0)
char *ptr;
- char *out;
uint32 remaining;
uint32 datatotal;
RelFileNode *rnode = NULL;
uint8 block_id;
- decoded->header = *record;
- decoded->lsn = lsn;
- decoded->next = NULL;
- decoded->record_origin = InvalidRepOriginId;
- decoded->toplevel_xid = InvalidTransactionId;
- decoded->main_data = NULL;
- decoded->main_data_len = 0;
- decoded->max_block_id = -1;
+ ResetDecoder(state);
+
+ state->decoded_record = record;
+ state->record_origin = InvalidRepOriginId;
+ state->toplevel_xid = InvalidTransactionId;
+
ptr = (char *) record;
ptr += SizeOfXLogRecord;
remaining = record->xl_tot_len - SizeOfXLogRecord;
@@ -1868,7 +1213,7 @@ DecodeXLogRecord(XLogReaderState *state,
COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
- decoded->main_data_len = main_data_len;
+ state->main_data_len = main_data_len;
datatotal += main_data_len;
break; /* by convention, the main data fragment is
* always last */
@@ -1879,18 +1224,18 @@ DecodeXLogRecord(XLogReaderState *state,
uint32 main_data_len;
COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
- decoded->main_data_len = main_data_len;
+ state->main_data_len = main_data_len;
datatotal += main_data_len;
break; /* by convention, the main data fragment is
* always last */
}
else if (block_id == XLR_BLOCK_ID_ORIGIN)
{
- COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
+ COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
}
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
{
- COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
+ COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
}
else if (block_id <= XLR_MAX_BLOCK_ID)
{
@@ -1898,11 +1243,7 @@ DecodeXLogRecord(XLogReaderState *state,
DecodedBkpBlock *blk;
uint8 fork_flags;
- /* mark any intervening block IDs as not in use */
- for (int i = decoded->max_block_id + 1; i < block_id; ++i)
- decoded->blocks[i].in_use = false;
-
- if (block_id <= decoded->max_block_id)
+ if (block_id <= state->max_block_id)
{
report_invalid_record(state,
"out-of-order block_id %u at %X/%X",
@@ -1910,9 +1251,9 @@ DecodeXLogRecord(XLogReaderState *state,
LSN_FORMAT_ARGS(state->ReadRecPtr));
goto err;
}
- decoded->max_block_id = block_id;
+ state->max_block_id = block_id;
- blk = &decoded->blocks[block_id];
+ blk = &state->blocks[block_id];
blk->in_use = true;
blk->apply_image = false;
@@ -1922,8 +1263,6 @@ DecodeXLogRecord(XLogReaderState *state,
blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
- blk->recent_buffer = InvalidBuffer;
-
COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
if (blk->has_data && blk->data_len == 0)
@@ -2058,18 +1397,17 @@ DecodeXLogRecord(XLogReaderState *state,
/*
* Ok, we've parsed the fragment headers, and verified that the total
* length of the payload in the fragments is equal to the amount of data
- * left. Copy the data of each fragment to contiguous space after the
- * blocks array, inserting alignment padding before the data fragments so
- * they can be cast to struct pointers by REDO routines.
+ * left. Copy the data of each fragment to a separate buffer.
+ *
+ * We could just set up pointers into readRecordBuf, but we want to align
+ * the data for the convenience of the callers. Backup images are not
+ * copied, however; they don't need alignment.
*/
- out = ((char *) decoded) +
- offsetof(DecodedXLogRecord, blocks) +
- sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
/* block data first */
- for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
{
- DecodedBkpBlock *blk = &decoded->blocks[block_id];
+ DecodedBkpBlock *blk = &state->blocks[block_id];
if (!blk->in_use)
continue;
@@ -2078,36 +1416,57 @@ DecodeXLogRecord(XLogReaderState *state,
if (blk->has_image)
{
- /* no need to align image */
- blk->bkp_image = out;
- memcpy(out, ptr, blk->bimg_len);
+ blk->bkp_image = ptr;
ptr += blk->bimg_len;
- out += blk->bimg_len;
}
if (blk->has_data)
{
- out = (char *) MAXALIGN(out);
- blk->data = out;
+ if (!blk->data || blk->data_len > blk->data_bufsz)
+ {
+ if (blk->data)
+ pfree(blk->data);
+
+ /*
+ * Force the initial request to be BLCKSZ so that we don't
+ * waste time with lots of trips through this stanza as a
+ * result of WAL compression.
+ */
+ blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
+ blk->data = palloc(blk->data_bufsz);
+ }
memcpy(blk->data, ptr, blk->data_len);
ptr += blk->data_len;
- out += blk->data_len;
}
}
/* and finally, the main data */
- if (decoded->main_data_len > 0)
+ if (state->main_data_len > 0)
{
- out = (char *) MAXALIGN(out);
- decoded->main_data = out;
- memcpy(decoded->main_data, ptr, decoded->main_data_len);
- ptr += decoded->main_data_len;
- out += decoded->main_data_len;
- }
+ if (!state->main_data || state->main_data_len > state->main_data_bufsz)
+ {
+ if (state->main_data)
+ pfree(state->main_data);
- /* Report the actual size we used. */
- decoded->size = MAXALIGN(out - (char *) decoded);
- Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >=
- decoded->size);
+ /*
+ * main_data_bufsz must be MAXALIGN'ed. In many xlog record
+ * types, we omit trailing struct padding on-disk to save a few
+ * bytes; but compilers may generate accesses to the xlog struct
+ * that assume that padding bytes are present. If the palloc
+ * request is not large enough to include such padding bytes then
+ * we'll get valgrind complaints due to otherwise-harmless fetches
+ * of the padding bytes.
+ *
+ * In addition, force the initial request to be reasonably large
+ * so that we don't waste time with lots of trips through this
+ * stanza. BLCKSZ / 2 seems like a good compromise choice.
+ */
+ state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
+ BLCKSZ / 2));
+ state->main_data = palloc(state->main_data_bufsz);
+ }
+ memcpy(state->main_data, ptr, state->main_data_len);
+ ptr += state->main_data_len;
+ }
return true;
@@ -2132,30 +1491,18 @@ bool
XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
{
- return XLogRecGetRecentBuffer(record, block_id, rnode, forknum, blknum,
- NULL);
-}
-
-bool
-XLogRecGetRecentBuffer(XLogReaderState *record, uint8 block_id,
- RelFileNode *rnode, ForkNumber *forknum,
- BlockNumber *blknum, Buffer *recent_buffer)
-{
DecodedBkpBlock *bkpb;
- if (block_id > record->record->max_block_id ||
- !record->record->blocks[block_id].in_use)
+ if (!record->blocks[block_id].in_use)
return false;
- bkpb = &record->record->blocks[block_id];
+ bkpb = &record->blocks[block_id];
if (rnode)
*rnode = bkpb->rnode;
if (forknum)
*forknum = bkpb->forknum;
if (blknum)
*blknum = bkpb->blkno;
- if (recent_buffer)
- *recent_buffer = bkpb->recent_buffer;
return true;
}
@@ -2169,11 +1516,10 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
{
DecodedBkpBlock *bkpb;
- if (block_id > record->record->max_block_id ||
- !record->record->blocks[block_id].in_use)
+ if (!record->blocks[block_id].in_use)
return NULL;
- bkpb = &record->record->blocks[block_id];
+ bkpb = &record->blocks[block_id];
if (!bkpb->has_data)
{
@@ -2201,13 +1547,12 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
char *ptr;
PGAlignedBlock tmp;
- if (block_id > record->record->max_block_id ||
- !record->record->blocks[block_id].in_use)
+ if (!record->blocks[block_id].in_use)
return false;
- if (!record->record->blocks[block_id].has_image)
+ if (!record->blocks[block_id].has_image)
return false;
- bkpb = &record->record->blocks[block_id];
+ bkpb = &record->blocks[block_id];
ptr = bkpb->bkp_image;
if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)