summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walreceiver.c9
-rw-r--r--src/backend/replication/walreceiverfuncs.c13
-rw-r--r--src/include/replication/walreceiver.h14
3 files changed, 27 insertions, 9 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 3826e82c052..32a1575ab07 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -218,7 +218,7 @@ WalReceiverMain(void)
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
- startpoint = walrcv->receivedUpto;
+ startpoint = walrcv->receiveStart;
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
@@ -558,8 +558,11 @@ XLogWalRcvFlush(bool dying)
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
- walrcv->latestChunkStart = walrcv->receivedUpto;
- walrcv->receivedUpto = LogstreamResult.Flush;
+ if (XLByteLT(walrcv->receivedUpto, LogstreamResult.Flush))
+ {
+ walrcv->latestChunkStart = walrcv->receivedUpto;
+ walrcv->receivedUpto = LogstreamResult.Flush;
+ }
SpinLockRelease(&walrcv->mutex);
/* Signal the startup process that new WAL has arrived */
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 04c90049437..48ab503d893 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -199,8 +199,17 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
walrcv->walRcvState = WALRCV_STARTING;
walrcv->startTime = now;
- walrcv->receivedUpto = recptr;
- walrcv->latestChunkStart = recptr;
+ /*
+ * If this is the first startup of walreceiver, we initialize
+ * receivedUpto and latestChunkStart to receiveStart.
+ */
+ if (walrcv->receiveStart.xlogid == 0 &&
+ walrcv->receiveStart.xrecoff == 0)
+ {
+ walrcv->receivedUpto = recptr;
+ walrcv->latestChunkStart = recptr;
+ }
+ walrcv->receiveStart = recptr;
SpinLockRelease(&walrcv->mutex);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 9137b861c7d..775232b6e6f 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -52,11 +52,17 @@ typedef struct
pg_time_t startTime;
/*
+ * receiveStart is the first byte position that will be received.
+ * When startup process starts the walreceiver, it sets receiveStart
+ * to the point where it wants the streaming to begin.
+ */
+ XLogRecPtr receiveStart;
+
+ /*
* receivedUpto-1 is the last byte position that has already been
- * received. When startup process starts the walreceiver, it sets
- * receivedUpto to the point where it wants the streaming to begin. After
- * that, walreceiver updates this whenever it flushes the received WAL to
- * disk.
+ * received. At the first startup of walreceiver, receivedUpto is
+ * set to receiveStart. After that, walreceiver updates this whenever
+ * it flushes the received WAL to disk.
*/
XLogRecPtr receivedUpto;