diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 55 |
1 files changed, 47 insertions, 8 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 818284759d9..dc11af7cbcd 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -646,7 +646,8 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, XLogRecPtr missingContrecPtr, TimeLineID newTLI); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); -static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); +static void KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinLSN, + XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, @@ -6333,6 +6334,7 @@ CreateCheckPoint(int flags) VirtualTransactionId *vxids; int nvxids; int oldXLogAllowed = 0; + XLogRecPtr slotsMinReqLSN; /* * An end-of-recovery checkpoint is really a shutdown checkpoint, just @@ -6535,6 +6537,15 @@ CreateCheckPoint(int flags) END_CRIT_SECTION(); /* + * Get the current minimum LSN to be used later in the WAL segment + * cleanup. We may clean up only WAL segments, which are not needed + * according to synchronized LSNs of replication slots. The slot's LSN + * might be advanced concurrently, so we call this before + * CheckPointReplicationSlots() synchronizes replication slots. + */ + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + + /* * In some cases there are groups of actions that must all occur on one * side or the other of a checkpoint record. Before flushing the * checkpoint record we must explicitly wait for any backend currently @@ -6699,15 +6710,23 @@ CreateCheckPoint(int flags) * prevent the disk holding the xlog from growing full. */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); - KeepLogSeg(recptr, &_logSegNo); + KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo); if (InvalidateObsoleteReplicationSlots(_logSegNo)) { /* + * Recalculate the current minimum LSN to be used in the WAL segment + * cleanup. Then, we must synchronize the replication slots again in + * order to make this LSN safe to use. + */ + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + CheckPointReplicationSlots(); + + /* * Some slots have been invalidated; recalculate the old-segment * horizon, starting again from RedoRecPtr. */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); - KeepLogSeg(recptr, &_logSegNo); + KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo); } _logSegNo--; RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr, @@ -6979,6 +6998,7 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; XLogSegNo _logSegNo; TimestampTz xtime; + XLogRecPtr slotsMinReqLSN; /* Concurrent checkpoint/restartpoint cannot happen */ Assert(!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER); @@ -7061,6 +7081,15 @@ CreateRestartPoint(int flags) MemSet(&CheckpointStats, 0, sizeof(CheckpointStats)); CheckpointStats.ckpt_start_t = GetCurrentTimestamp(); + /* + * Get the current minimum LSN to be used later in the WAL segment + * cleanup. We may clean up only WAL segments, which are not needed + * according to synchronized LSNs of replication slots. The slot's LSN + * might be advanced concurrently, so we call this before + * CheckPointReplicationSlots() synchronizes replication slots. + */ + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + if (log_checkpoints) LogCheckpointStart(flags, true); @@ -7143,15 +7172,23 @@ CreateRestartPoint(int flags) receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; - KeepLogSeg(endptr, &_logSegNo); + KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo); if (InvalidateObsoleteReplicationSlots(_logSegNo)) { /* + * Recalculate the current minimum LSN to be used in the WAL segment + * cleanup. Then, we must synchronize the replication slots again in + * order to make this LSN safe to use. + */ + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + CheckPointReplicationSlots(); + + /* * Some slots have been invalidated; recalculate the old-segment * horizon, starting again from RedoRecPtr. */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); - KeepLogSeg(endptr, &_logSegNo); + KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo); } _logSegNo--; @@ -7245,6 +7282,7 @@ GetWALAvailability(XLogRecPtr targetLSN) XLogSegNo oldestSegMaxWalSize; /* oldest segid kept by max_wal_size */ XLogSegNo oldestSlotSeg; /* oldest segid kept by slot */ uint64 keepSegs; + XLogRecPtr slotsMinReqLSN; /* * slot does not reserve WAL. Either deactivated, or has never been active @@ -7258,8 +7296,9 @@ GetWALAvailability(XLogRecPtr targetLSN) * oldestSlotSeg to the current segment. */ currpos = GetXLogWriteRecPtr(); + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); XLByteToSeg(currpos, oldestSlotSeg, wal_segment_size); - KeepLogSeg(currpos, &oldestSlotSeg); + KeepLogSeg(currpos, slotsMinReqLSN, &oldestSlotSeg); /* * Find the oldest extant segment file. We get 1 until checkpoint removes @@ -7320,7 +7359,7 @@ GetWALAvailability(XLogRecPtr targetLSN) * invalidation is optionally done here, instead. */ static void -KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) +KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinReqLSN, XLogSegNo *logSegNo) { XLogSegNo currSegNo; XLogSegNo segno; @@ -7333,7 +7372,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) * Calculate how many segments are kept by slots first, adjusting for * max_slot_wal_keep_size. */ - keep = XLogGetReplicationSlotMinimumLSN(); + keep = slotsMinReqLSN; if (keep != InvalidXLogRecPtr && keep < recptr) { XLByteToSeg(keep, segno, wal_segment_size); |