summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/walsender.c50
1 file changed, 33 insertions, 17 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 59822f22b8d..1ce21a2ad98 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -233,6 +233,7 @@ typedef struct
int write_head;
int read_heads[NUM_SYNC_REP_WAIT_MODE];
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
+ WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE];
} LagTracker;
static LagTracker *lag_tracker;
@@ -4207,7 +4208,6 @@ WalSndKeepaliveIfNecessary(void)
static void
LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
{
- bool buffer_full;
int new_write_head;
int i;
@@ -4229,25 +4229,19 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
* of space.
*/
new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
- buffer_full = false;
for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
{
+ /*
+ * If the buffer is full, move the slowest reader to a separate
+ * overflow entry and free its space in the buffer so the write head
+ * can advance.
+ */
if (new_write_head == lag_tracker->read_heads[i])
- buffer_full = true;
- }
-
- /*
- * If the buffer is full, for now we just rewind by one slot and overwrite
- * the last sample, as a simple (if somewhat uneven) way to lower the
- * sampling rate. There may be better adaptive compaction algorithms.
- */
- if (buffer_full)
- {
- new_write_head = lag_tracker->write_head;
- if (lag_tracker->write_head > 0)
- lag_tracker->write_head--;
- else
- lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
+ {
+ lag_tracker->overflowed[i] =
+ lag_tracker->buffer[lag_tracker->read_heads[i]];
+ lag_tracker->read_heads[i] = -1;
+ }
}
/* Store a sample at the current write head position. */
@@ -4274,6 +4268,28 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
{
TimestampTz time = 0;
+ /*
+ * If 'lsn' has not passed the WAL position stored in the overflow entry,
+ * return the elapsed time (in microseconds) since the saved local flush
+ * time. If the flush time is in the future (due to clock drift), return
+ * -1 to treat as no valid sample.
+ *
+ * Otherwise, switch back to using the buffer to control the read head and
+ * compute the elapsed time. The read head is then reset to point to the
+ * oldest entry in the buffer.
+ */
+ if (lag_tracker->read_heads[head] == -1)
+ {
+ if (lag_tracker->overflowed[head].lsn > lsn)
+ return (now >= lag_tracker->overflowed[head].time) ?
+ now - lag_tracker->overflowed[head].time : -1;
+
+ time = lag_tracker->overflowed[head].time;
+ lag_tracker->last_read[head] = lag_tracker->overflowed[head];
+ lag_tracker->read_heads[head] =
+ (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
+ }
+
/* Read all unread samples up to this LSN or end of buffer. */
while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)