diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 55 |
1 files changed, 47 insertions, 8 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c1dd8b54328..3735a2e3dfd 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -667,7 +667,8 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, XLogRecPtr pagePtr, TimeLineID newTLI); static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags); -static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo); +static void KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinLSN, + XLogSegNo *logSegNo); static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void); static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, @@ -6891,6 +6892,7 @@ CreateCheckPoint(int flags) VirtualTransactionId *vxids; int nvxids; int oldXLogAllowed = 0; + XLogRecPtr slotsMinReqLSN; /* * An end-of-recovery checkpoint is really a shutdown checkpoint, just @@ -7120,6 +7122,15 @@ CreateCheckPoint(int flags) END_CRIT_SECTION(); /* + * Get the current minimum LSN to be used later in the WAL segment + * cleanup. We may clean up only WAL segments, which are not needed + * according to synchronized LSNs of replication slots. The slot's LSN + * might be advanced concurrently, so we call this before + * CheckPointReplicationSlots() synchronizes replication slots. + */ + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + + /* * In some cases there are groups of actions that must all occur on one * side or the other of a checkpoint record. Before flushing the * checkpoint record we must explicitly wait for any backend currently @@ -7307,17 +7318,25 @@ CreateCheckPoint(int flags) * prevent the disk holding the xlog from growing full. */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); - KeepLogSeg(recptr, &_logSegNo); + KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo); if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, _logSegNo, InvalidOid, InvalidTransactionId)) { /* + * Recalculate the current minimum LSN to be used in the WAL segment + * cleanup. Then, we must synchronize the replication slots again in + * order to make this LSN safe to use. + */ + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + CheckPointReplicationSlots(shutdown); + + /* * Some slots have been invalidated; recalculate the old-segment * horizon, starting again from RedoRecPtr. */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); - KeepLogSeg(recptr, &_logSegNo); + KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo); } _logSegNo--; RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr, @@ -7590,6 +7609,7 @@ CreateRestartPoint(int flags) XLogRecPtr endptr; XLogSegNo _logSegNo; TimestampTz xtime; + XLogRecPtr slotsMinReqLSN; /* Concurrent checkpoint/restartpoint cannot happen */ Assert(!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER); @@ -7672,6 +7692,15 @@ CreateRestartPoint(int flags) MemSet(&CheckpointStats, 0, sizeof(CheckpointStats)); CheckpointStats.ckpt_start_t = GetCurrentTimestamp(); + /* + * Get the current minimum LSN to be used later in the WAL segment + * cleanup. We may clean up only WAL segments, which are not needed + * according to synchronized LSNs of replication slots. The slot's LSN + * might be advanced concurrently, so we call this before + * CheckPointReplicationSlots() synchronizes replication slots. + */ + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + if (log_checkpoints) LogCheckpointStart(flags, true); @@ -7760,17 +7789,25 @@ CreateRestartPoint(int flags) receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; - KeepLogSeg(endptr, &_logSegNo); + KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo); if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, _logSegNo, InvalidOid, InvalidTransactionId)) { /* + * Recalculate the current minimum LSN to be used in the WAL segment + * cleanup. Then, we must synchronize the replication slots again in + * order to make this LSN safe to use. + */ + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); + CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN); + + /* * Some slots have been invalidated; recalculate the old-segment * horizon, starting again from RedoRecPtr. */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); - KeepLogSeg(endptr, &_logSegNo); + KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo); } _logSegNo--; @@ -7865,6 +7902,7 @@ GetWALAvailability(XLogRecPtr targetLSN) XLogSegNo oldestSegMaxWalSize; /* oldest segid kept by max_wal_size */ XLogSegNo oldestSlotSeg; /* oldest segid kept by slot */ uint64 keepSegs; + XLogRecPtr slotsMinReqLSN; /* * slot does not reserve WAL. Either deactivated, or has never been active @@ -7878,8 +7916,9 @@ GetWALAvailability(XLogRecPtr targetLSN) * oldestSlotSeg to the current segment. */ currpos = GetXLogWriteRecPtr(); + slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN(); XLByteToSeg(currpos, oldestSlotSeg, wal_segment_size); - KeepLogSeg(currpos, &oldestSlotSeg); + KeepLogSeg(currpos, slotsMinReqLSN, &oldestSlotSeg); /* * Find the oldest extant segment file. We get 1 until checkpoint removes @@ -7940,7 +7979,7 @@ GetWALAvailability(XLogRecPtr targetLSN) * invalidation is optionally done here, instead. */ static void -KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) +KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinReqLSN, XLogSegNo *logSegNo) { XLogSegNo currSegNo; XLogSegNo segno; @@ -7953,7 +7992,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) * Calculate how many segments are kept by slots first, adjusting for * max_slot_wal_keep_size. */ - keep = XLogGetReplicationSlotMinimumLSN(); + keep = slotsMinReqLSN; if (keep != InvalidXLogRecPtr && keep < recptr) { XLByteToSeg(keep, segno, wal_segment_size); |