summaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c55
1 files changed, 47 insertions, 8 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index c1dd8b54328..3735a2e3dfd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -667,7 +667,8 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
XLogRecPtr pagePtr,
TimeLineID newTLI);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
-static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
+static void KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinLSN,
+ XLogSegNo *logSegNo);
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli,
@@ -6891,6 +6892,7 @@ CreateCheckPoint(int flags)
VirtualTransactionId *vxids;
int nvxids;
int oldXLogAllowed = 0;
+ XLogRecPtr slotsMinReqLSN;
/*
* An end-of-recovery checkpoint is really a shutdown checkpoint, just
@@ -7120,6 +7122,15 @@ CreateCheckPoint(int flags)
END_CRIT_SECTION();
/*
+ * Get the current minimum LSN to be used later in the WAL segment
+ * cleanup. We may clean up only WAL segments, which are not needed
+ * according to synchronized LSNs of replication slots. The slot's LSN
+ * might be advanced concurrently, so we call this before
+ * CheckPointReplicationSlots() synchronizes replication slots.
+ */
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+
+ /*
* In some cases there are groups of actions that must all occur on one
* side or the other of a checkpoint record. Before flushing the
* checkpoint record we must explicitly wait for any backend currently
@@ -7307,17 +7318,25 @@ CreateCheckPoint(int flags)
* prevent the disk holding the xlog from growing full.
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
- KeepLogSeg(recptr, &_logSegNo);
+ KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo);
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
_logSegNo, InvalidOid,
InvalidTransactionId))
{
/*
+ * Recalculate the current minimum LSN to be used in the WAL segment
+ * cleanup. Then, we must synchronize the replication slots again in
+ * order to make this LSN safe to use.
+ */
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+ CheckPointReplicationSlots(shutdown);
+
+ /*
* Some slots have been invalidated; recalculate the old-segment
* horizon, starting again from RedoRecPtr.
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
- KeepLogSeg(recptr, &_logSegNo);
+ KeepLogSeg(recptr, slotsMinReqLSN, &_logSegNo);
}
_logSegNo--;
RemoveOldXlogFiles(_logSegNo, RedoRecPtr, recptr,
@@ -7590,6 +7609,7 @@ CreateRestartPoint(int flags)
XLogRecPtr endptr;
XLogSegNo _logSegNo;
TimestampTz xtime;
+ XLogRecPtr slotsMinReqLSN;
/* Concurrent checkpoint/restartpoint cannot happen */
Assert(!IsUnderPostmaster || MyBackendType == B_CHECKPOINTER);
@@ -7672,6 +7692,15 @@ CreateRestartPoint(int flags)
MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
+ /*
+ * Get the current minimum LSN to be used later in the WAL segment
+ * cleanup. We may clean up only WAL segments, which are not needed
+ * according to synchronized LSNs of replication slots. The slot's LSN
+ * might be advanced concurrently, so we call this before
+ * CheckPointReplicationSlots() synchronizes replication slots.
+ */
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+
if (log_checkpoints)
LogCheckpointStart(flags, true);
@@ -7760,17 +7789,25 @@ CreateRestartPoint(int flags)
receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
- KeepLogSeg(endptr, &_logSegNo);
+ KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo);
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
_logSegNo, InvalidOid,
InvalidTransactionId))
{
/*
+ * Recalculate the current minimum LSN to be used in the WAL segment
+ * cleanup. Then, we must synchronize the replication slots again in
+ * order to make this LSN safe to use.
+ */
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
+ CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
+
+ /*
* Some slots have been invalidated; recalculate the old-segment
* horizon, starting again from RedoRecPtr.
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
- KeepLogSeg(endptr, &_logSegNo);
+ KeepLogSeg(endptr, slotsMinReqLSN, &_logSegNo);
}
_logSegNo--;
@@ -7865,6 +7902,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
XLogSegNo oldestSegMaxWalSize; /* oldest segid kept by max_wal_size */
XLogSegNo oldestSlotSeg; /* oldest segid kept by slot */
uint64 keepSegs;
+ XLogRecPtr slotsMinReqLSN;
/*
* slot does not reserve WAL. Either deactivated, or has never been active
@@ -7878,8 +7916,9 @@ GetWALAvailability(XLogRecPtr targetLSN)
* oldestSlotSeg to the current segment.
*/
currpos = GetXLogWriteRecPtr();
+ slotsMinReqLSN = XLogGetReplicationSlotMinimumLSN();
XLByteToSeg(currpos, oldestSlotSeg, wal_segment_size);
- KeepLogSeg(currpos, &oldestSlotSeg);
+ KeepLogSeg(currpos, slotsMinReqLSN, &oldestSlotSeg);
/*
* Find the oldest extant segment file. We get 1 until checkpoint removes
@@ -7940,7 +7979,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
* invalidation is optionally done here, instead.
*/
static void
-KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
+KeepLogSeg(XLogRecPtr recptr, XLogRecPtr slotsMinReqLSN, XLogSegNo *logSegNo)
{
XLogSegNo currSegNo;
XLogSegNo segno;
@@ -7953,7 +7992,7 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
* Calculate how many segments are kept by slots first, adjusting for
* max_slot_wal_keep_size.
*/
- keep = XLogGetReplicationSlotMinimumLSN();
+ keep = slotsMinReqLSN;
if (keep != InvalidXLogRecPtr && keep < recptr)
{
XLByteToSeg(keep, segno, wal_segment_size);