diff options
| -rw-r--r-- | src/backend/replication/walsender.c | 50 | 
1 files changed, 33 insertions, 17 deletions
| diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 59822f22b8d..1ce21a2ad98 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -233,6 +233,7 @@ typedef struct  	int			write_head;  	int			read_heads[NUM_SYNC_REP_WAIT_MODE];  	WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; +	WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE];  } LagTracker;  static LagTracker *lag_tracker; @@ -4207,7 +4208,6 @@ WalSndKeepaliveIfNecessary(void)  static void  LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)  { -	bool		buffer_full;  	int			new_write_head;  	int			i; @@ -4229,25 +4229,19 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)  	 * of space.  	 */  	new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE; -	buffer_full = false;  	for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)  	{ +		/* +		 * If the buffer is full, move the slowest reader to a separate +		 * overflow entry and free its space in the buffer so the write head +		 * can advance. +		 */  		if (new_write_head == lag_tracker->read_heads[i]) -			buffer_full = true; -	} - -	/* -	 * If the buffer is full, for now we just rewind by one slot and overwrite -	 * the last sample, as a simple (if somewhat uneven) way to lower the -	 * sampling rate.  There may be better adaptive compaction algorithms. -	 */ -	if (buffer_full) -	{ -		new_write_head = lag_tracker->write_head; -		if (lag_tracker->write_head > 0) -			lag_tracker->write_head--; -		else -			lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1; +		{ +			lag_tracker->overflowed[i] = +				lag_tracker->buffer[lag_tracker->read_heads[i]]; +			lag_tracker->read_heads[i] = -1; +		}  	}  	/* Store a sample at the current write head position. */ @@ -4274,6 +4268,28 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)  {  	TimestampTz time = 0; +	/* +	 * If 'lsn' has not passed the WAL position stored in the overflow entry, +	 * return the elapsed time (in microseconds) since the saved local flush +	 * time. If the flush time is in the future (due to clock drift), return +	 * -1 to treat as no valid sample. +	 * +	 * Otherwise, switch back to using the buffer to control the read head and +	 * compute the elapsed time.  The read head is then reset to point to the +	 * oldest entry in the buffer. +	 */ +	if (lag_tracker->read_heads[head] == -1) +	{ +		if (lag_tracker->overflowed[head].lsn > lsn) +			return (now >= lag_tracker->overflowed[head].time) ? +				now - lag_tracker->overflowed[head].time : -1; + +		time = lag_tracker->overflowed[head].time; +		lag_tracker->last_read[head] = lag_tracker->overflowed[head]; +		lag_tracker->read_heads[head] = +			(lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE; +	} +  	/* Read all unread samples up to this LSN or end of buffer. */  	while (lag_tracker->read_heads[head] != lag_tracker->write_head &&  		   lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn) | 
