diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/walreceiver.c | 9 | ||||
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 13 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 14 |
3 files changed, 27 insertions, 9 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 3826e82c052..32a1575ab07 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -218,7 +218,7 @@ WalReceiverMain(void) /* Fetch information required to start streaming */ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); - startpoint = walrcv->receivedUpto; + startpoint = walrcv->receiveStart; SpinLockRelease(&walrcv->mutex); /* Arrange to clean up at walreceiver exit */ @@ -558,8 +558,11 @@ XLogWalRcvFlush(bool dying) /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); - walrcv->latestChunkStart = walrcv->receivedUpto; - walrcv->receivedUpto = LogstreamResult.Flush; + if (XLByteLT(walrcv->receivedUpto, LogstreamResult.Flush)) + { + walrcv->latestChunkStart = walrcv->receivedUpto; + walrcv->receivedUpto = LogstreamResult.Flush; + } SpinLockRelease(&walrcv->mutex); /* Signal the startup process that new WAL has arrived */ diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 04c90049437..48ab503d893 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -199,8 +199,17 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) walrcv->walRcvState = WALRCV_STARTING; walrcv->startTime = now; - walrcv->receivedUpto = recptr; - walrcv->latestChunkStart = recptr; + /* + * If this is the first startup of walreceiver, we initialize + * receivedUpto and latestChunkStart to receiveStart. + */ + if (walrcv->receiveStart.xlogid == 0 && + walrcv->receiveStart.xrecoff == 0) + { + walrcv->receivedUpto = recptr; + walrcv->latestChunkStart = recptr; + } + walrcv->receiveStart = recptr; SpinLockRelease(&walrcv->mutex); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 9137b861c7d..775232b6e6f 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -52,11 +52,17 @@ typedef struct pg_time_t startTime; /* + * receiveStart is the first byte position that will be received. + * When startup process starts the walreceiver, it sets receiveStart + * to the point where it wants the streaming to begin. + */ + XLogRecPtr receiveStart; + + /* * receivedUpto-1 is the last byte position that has already been - * received. When startup process starts the walreceiver, it sets - * receivedUpto to the point where it wants the streaming to begin. After - * that, walreceiver updates this whenever it flushes the received WAL to - * disk. + * received. At the first startup of walreceiver, receivedUpto is + * set to receiveStart. After that, walreceiver updates this whenever + * it flushes the received WAL to disk. */ XLogRecPtr receivedUpto; |