 src/backend/replication/walsender.c | 50 +++++++++++++++++++++++++++++++++-----------------
 1 file changed, 33 insertions(+), 17 deletions(-)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 59822f22b8d..1ce21a2ad98 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -233,6 +233,7 @@ typedef struct
 	int			write_head;
 	int			read_heads[NUM_SYNC_REP_WAIT_MODE];
 	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
+	WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE];
 } LagTracker;
 
 static LagTracker *lag_tracker;
@@ -4207,7 +4208,6 @@ WalSndKeepaliveIfNecessary(void)
 static void
 LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 {
-	bool		buffer_full;
 	int			new_write_head;
 	int			i;
 
@@ -4229,25 +4229,19 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
 	 * of space.
 	 */
 	new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
-	buffer_full = false;
 	for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
 	{
+		/*
+		 * If the buffer is full, move the slowest reader to a separate
+		 * overflow entry and free its space in the buffer so the write head
+		 * can advance.
+		 */
 		if (new_write_head == lag_tracker->read_heads[i])
-			buffer_full = true;
-	}
-
-	/*
-	 * If the buffer is full, for now we just rewind by one slot and overwrite
-	 * the last sample, as a simple (if somewhat uneven) way to lower the
-	 * sampling rate.  There may be better adaptive compaction algorithms.
-	 */
-	if (buffer_full)
-	{
-		new_write_head = lag_tracker->write_head;
-		if (lag_tracker->write_head > 0)
-			lag_tracker->write_head--;
-		else
-			lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
+		{
+			lag_tracker->overflowed[i] =
+				lag_tracker->buffer[lag_tracker->read_heads[i]];
+			lag_tracker->read_heads[i] = -1;
+		}
 	}
 
 	/* Store a sample at the current write head position. */
@@ -4274,6 +4268,28 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 {
 	TimestampTz time = 0;
 
+	/*
+	 * If 'lsn' has not passed the WAL position stored in the overflow entry,
+	 * return the elapsed time (in microseconds) since the saved local flush
+	 * time.  If the flush time is in the future (due to clock drift), return
+	 * -1 to treat as no valid sample.
+	 *
+	 * Otherwise, switch back to using the buffer to control the read head
+	 * and compute the elapsed time.  The read head is then reset to point to
+	 * the oldest entry in the buffer.
+	 */
+	if (lag_tracker->read_heads[head] == -1)
+	{
+		if (lag_tracker->overflowed[head].lsn > lsn)
+			return (now >= lag_tracker->overflowed[head].time) ?
+				now - lag_tracker->overflowed[head].time : -1;
+
+		time = lag_tracker->overflowed[head].time;
+		lag_tracker->last_read[head] = lag_tracker->overflowed[head];
+		lag_tracker->read_heads[head] =
+			(lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
+	}
+
 	/* Read all unread samples up to this LSN or end of buffer. */
 	while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
 		   lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
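
To make the change concrete, here is a minimal, self-contained C sketch of the same technique outside PostgreSQL: a ring buffer of (lsn, time) samples with one read head per reader, where a reader about to be overrun is parked in a per-reader overflow slot instead of forcing the writer to rewind. All names here (Sample, Tracker, BUF_SIZE, tracker_write, tracker_read) are invented for illustration and are not part of the patch; the elapsed-time and clock-drift handling of the real LagTrackerRead is simplified to returning raw sample timestamps.

#include <stdint.h>
#include <stdio.h>

#define BUF_SIZE	8		/* deliberately tiny, to force overflow */
#define N_READERS	2

typedef struct
{
	uint64_t	lsn;		/* stand-in for XLogRecPtr */
	int64_t		time;		/* stand-in for TimestampTz */
} Sample;

typedef struct
{
	Sample		buffer[BUF_SIZE];
	int			write_head;
	int			read_heads[N_READERS];	/* -1 means "parked in overflow" */
	Sample		overflowed[N_READERS];
} Tracker;

static void
tracker_write(Tracker *t, uint64_t lsn, int64_t time)
{
	int			new_write_head = (t->write_head + 1) % BUF_SIZE;

	for (int i = 0; i < N_READERS; i++)
	{
		/*
		 * The writer is about to overrun reader i: park that reader's
		 * oldest unread sample in its overflow slot and release the buffer
		 * slot, so the writer never rewinds or stalls.
		 */
		if (new_write_head == t->read_heads[i])
		{
			t->overflowed[i] = t->buffer[t->read_heads[i]];
			t->read_heads[i] = -1;
		}
	}

	t->buffer[t->write_head].lsn = lsn;
	t->buffer[t->write_head].time = time;
	t->write_head = new_write_head;
}

/*
 * Return the time of the newest sample whose lsn is <= the given lsn,
 * or -1 if no sample qualifies.
 */
static int64_t
tracker_read(Tracker *t, int reader, uint64_t lsn)
{
	int64_t		time = -1;

	if (t->read_heads[reader] == -1)
	{
		/*
		 * Reader was overrun.  If it has not yet reached the parked
		 * sample's lsn, that sample is the only estimate available (the
		 * real patch reports the elapsed time since it).
		 */
		if (t->overflowed[reader].lsn > lsn)
			return t->overflowed[reader].time;

		/*
		 * Consume the parked sample and rejoin the buffer just past the
		 * write head, i.e. at the oldest entry the patch treats as live.
		 */
		time = t->overflowed[reader].time;
		t->read_heads[reader] = (t->write_head + 1) % BUF_SIZE;
	}

	/* Consume all unread samples up to the given lsn. */
	while (t->read_heads[reader] != t->write_head &&
		   t->buffer[t->read_heads[reader]].lsn <= lsn)
	{
		time = t->buffer[t->read_heads[reader]].time;
		t->read_heads[reader] = (t->read_heads[reader] + 1) % BUF_SIZE;
	}
	return time;
}

int
main(void)
{
	Tracker		t = {0};

	/*
	 * Write far more samples than the buffer holds; since neither reader
	 * consumes any, both get parked instead of blocking the writer.
	 */
	for (uint64_t lsn = 1; lsn <= 20; lsn++)
		tracker_write(&t, lsn, (int64_t) lsn * 1000);

	printf("reader 0, lsn 5:  time %lld\n", (long long) tracker_read(&t, 0, 5));
	printf("reader 1, lsn 20: time %lld\n", (long long) tracker_read(&t, 1, 20));
	return 0;
}

The payoff over the pre-patch behavior is that the writer's sampling rate stays uniform: instead of rewinding one slot and overwriting the newest sample whenever any reader lags, only the lagging reader pays, by having its view temporarily coarsened to a single parked sample until it catches up and rejoins the buffer.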
