diff options
author | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2020-05-08 15:30:34 -0400 |
---|---|---|
committer | Alvaro Herrera <alvherre@alvh.no-ip.org> | 2020-05-08 15:40:11 -0400 |
commit | b060dbe0001a1d6bf26cd294710f3cb203868d46 (patch) | |
tree | 6e9e980aa63ec1ec3655b93c92b9b5caa6689d38 /src/backend/access/transam/xlogreader.c | |
parent | 871696ba20e0251e86041576373809d1c7ca161d (diff) |
Rework XLogReader callback system
Code review for 0dc8ead46363, prompted by a bug closed by 91c40548d5f7.
XLogReader's system for opening and closing segments had gotten too
complicated, with callbacks being passed at both the XLogReaderAllocate
level (read_page) as well as at the WALRead level (segment_open). This
was confusing and hard to follow, so restructure things so that these
callbacks are passed together at XLogReaderAllocate time, and add
another callback to the set (segment_close) to make it a coherent whole.
Also, ensure XLogReaderState is an argument to all the callbacks, so
that they can grab at the ->private data if necessary.
Document the whole arrangement more clearly.
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Discussion: https://postgr.es/m/20200422175754.GA19858@alvherre.pgsql
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 51 |
1 files changed, 29 insertions, 22 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 79ff976474c..7cee8b92c90 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) */ XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, - XLogPageReadCB pagereadfunc, void *private_data) + XLogReaderRoutine *routine, void *private_data) { XLogReaderState *state; @@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, if (!state) return NULL; + /* initialize caller-provided support functions */ + state->routine = *routine; + state->max_block_id = -1; /* @@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, waldir); - state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ @@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state) int block_id; if (state->seg.ws_file != -1) - close(state->seg.ws_file); + state->routine.segment_close(state); for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) { @@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * XLogBeginRead() or XLogFindNextRecord() must be called before the first call * to XLogReadRecord(). * - * If the read_page callback fails to read the requested data, NULL is + * If the page_read callback fails to read the requested data, NULL is * returned. The callback is expected to have reported the error; errormsg * is set to NULL. * @@ -559,10 +561,10 @@ err: /* * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the read_page() callback. + * via the page_read() callback. * * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the read_page callback). + * is set in that case (unless the error occurs in the page_read callback). * * We fetch the page from a reader-local cache if we know we have the required * data and if there hasn't been any error since caching the data. @@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * Data is not in our buffer. * * Every time we actually read the segment, even if we looked at parts of - * it before, we need to do verification as the read_page callback might + * it before, we need to do verification as the page_read callback might * now be rereading data from a different source. * * Whenever switching to a new WAL segment, we read the first page of the @@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) { XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; - readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; @@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) * First, read the requested data length, but at least a short page header * so that we can validate it. */ - readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; @@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) /* still not enough */ if (readLen < XLogPageHeaderSize(hdr)) { - readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); + readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), + state->currRecPtr, + state->readBuf); if (readLen < 0) goto err; } @@ -1041,11 +1043,14 @@ err: #endif /* FRONTEND */ /* + * Helper function to ease writing of XLogRoutine->page_read callbacks. + * If this function is used, caller must supply an open_segment callback in + * 'state', as that is used here. + * * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL * fetched from timeline 'tli'. * - * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback - * to open the next segment, if necessary. + * 'seg/segcxt' identify the last segment used. * * Returns true if succeeded, false if an error occurs, in which case * 'errinfo' receives error details. @@ -1054,9 +1059,10 @@ err: * WAL buffers when possible. */ bool -WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, +WALRead(XLogReaderState *state, + char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALOpenSegment *seg, WALSegmentContext *segcxt, - WALSegmentOpen openSegment, WALReadError *errinfo) + WALReadError *errinfo) { char *p; XLogRecPtr recptr; @@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, XLogSegNo nextSegNo; if (seg->ws_file >= 0) - close(seg->ws_file); + state->routine.segment_close(state); XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize); - seg->ws_file = openSegment(nextSegNo, segcxt, &tli); + seg->ws_file = state->routine.segment_open(state, nextSegNo, + segcxt, &tli); /* Update the current segment info. */ seg->ws_tli = tli; |