summaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/README2
-rw-r--r--src/backend/replication/walreceiver.c15
-rw-r--r--src/backend/replication/walreceiverfuncs.c24
-rw-r--r--src/backend/replication/walsender.c2
4 files changed, 30 insertions, 13 deletions
diff --git a/src/backend/replication/README b/src/backend/replication/README
index 0cbb9906135..8ccdd86e74b 100644
--- a/src/backend/replication/README
+++ b/src/backend/replication/README
@@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in
WalRcvData->receiveStart.
As walreceiver receives WAL from the master server, and writes and flushes
-it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals
+it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals
the startup process to know how far WAL replay can advance.
Walreceiver sends information about replication progress to the master server
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index aee67c61aa6..d69fb90132d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -12,7 +12,7 @@
* in the primary server), and then keeps receiving XLOG records and
* writing them to the disk as long as the connection is alive. As XLOG
* records are received and flushed to disk, it updates the
- * WalRcv->receivedUpto variable in shared memory, to inform the startup
+ * WalRcv->flushedUpto variable in shared memory, to inform the startup
* process of how far it can proceed with XLOG replay.
*
* A WAL receiver cannot directly load GUC parameters used when establishing
@@ -261,6 +261,8 @@ WalReceiverMain(void)
SpinLockRelease(&walrcv->mutex);
+ pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
+
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
@@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
LogstreamResult.Write = recptr;
}
+
+ /* Update shared-memory status */
+ pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
}
/*
@@ -1005,10 +1010,10 @@ XLogWalRcvFlush(bool dying)
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
- if (walrcv->receivedUpto < LogstreamResult.Flush)
+ if (walrcv->flushedUpto < LogstreamResult.Flush)
{
- walrcv->latestChunkStart = walrcv->receivedUpto;
- walrcv->receivedUpto = LogstreamResult.Flush;
+ walrcv->latestChunkStart = walrcv->flushedUpto;
+ walrcv->flushedUpto = LogstreamResult.Flush;
walrcv->receivedTLI = ThisTimeLineID;
}
SpinLockRelease(&walrcv->mutex);
@@ -1361,7 +1366,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
state = WalRcv->walRcvState;
receive_start_lsn = WalRcv->receiveStart;
receive_start_tli = WalRcv->receiveStartTLI;
- received_lsn = WalRcv->receivedUpto;
+ received_lsn = WalRcv->flushedUpto;
received_tli = WalRcv->receivedTLI;
last_send_time = WalRcv->lastMsgSendTime;
last_receipt_time = WalRcv->lastMsgReceiptTime;
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 21d18236076..4afad83539c 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
/*
* If this is the first startup of walreceiver (on this timeline),
- * initialize receivedUpto and latestChunkStart to the starting point.
+ * initialize flushedUpto and latestChunkStart to the starting point.
*/
if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
{
- walrcv->receivedUpto = recptr;
+ walrcv->flushedUpto = recptr;
walrcv->receivedTLI = tli;
walrcv->latestChunkStart = recptr;
}
@@ -304,7 +304,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
}
/*
- * Returns the last+1 byte position that walreceiver has written.
+ * Returns the last+1 byte position that walreceiver has flushed.
*
* Optionally, returns the previous chunk start, that is the first byte
* written in the most recent walreceiver flush cycle. Callers not
@@ -312,13 +312,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
* receiveTLI.
*/
XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
+GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
{
WalRcvData *walrcv = WalRcv;
XLogRecPtr recptr;
SpinLockAcquire(&walrcv->mutex);
- recptr = walrcv->receivedUpto;
+ recptr = walrcv->flushedUpto;
if (latestChunkStart)
*latestChunkStart = walrcv->latestChunkStart;
if (receiveTLI)
@@ -329,6 +329,18 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
}
/*
+ * Returns the last+1 byte position that walreceiver has written.
+ * This returns a recently written value without taking a lock.
+ */
+XLogRecPtr
+GetWalRcvWriteRecPtr(void)
+{
+ WalRcvData *walrcv = WalRcv;
+
+ return pg_atomic_read_u64(&walrcv->writtenUpto);
+}
+
+/*
* Returns the replication apply delay in ms or -1
* if the apply delay info is not available
*/
@@ -345,7 +357,7 @@ GetReplicationApplyDelay(void)
TimestampTz chunkReplayStartTime;
SpinLockAcquire(&walrcv->mutex);
- receivePtr = walrcv->receivedUpto;
+ receivePtr = walrcv->flushedUpto;
SpinLockRelease(&walrcv->mutex);
replayPtr = GetXLogReplayRecPtr(NULL);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 06e8b790360..122d884f3e4 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void)
* has streamed, but hasn't been replayed yet.
*/
- receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
+ receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
ThisTimeLineID = replayTLI;