summaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c159
1 files changed, 140 insertions, 19 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 0f1047179cb..25edb5e1412 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1726,25 +1726,109 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
}
/*
+ * Wake up logical walsender processes that hold logical failover slots, if
+ * the currently acquired physical slot is in the standby_slot_names GUC.
+ */
+void
+PhysicalWakeupLogicalWalSnd(void)
+{
+ Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
+
+ /*
+ * If we are running in a standby, there is no need to wake up walsenders.
+ * This is because we do not support syncing slots to cascading standbys,
+ * so there are no walsenders waiting for standbys to catch up.
+ */
+ if (RecoveryInProgress())
+ return;
+
+ if (SlotExistsInStandbySlotNames(NameStr(MyReplicationSlot->data.name)))
+ ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+}
+
+/*
+ * Returns true if not all standbys have caught up to the flushed position
+ * (flushed_lsn) when the currently acquired slot is a logical failover
+ * slot and we are streaming; otherwise, returns false.
+ *
+ * If returning true, the function sets the appropriate wait event in
+ * wait_event; otherwise, wait_event is set to 0.
+ */
+static bool
+NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
+{
+ int elevel = got_STOPPING ? ERROR : WARNING;
+ bool failover_slot;
+
+ failover_slot = (replication_active && MyReplicationSlot->data.failover);
+
+ /*
+ * Note that after receiving the shutdown signal, an ERROR is reported if
+ * any slots are dropped, invalidated, or inactive. This measure is taken
+ * to prevent the walsender from waiting indefinitely.
+ */
+ if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
+ {
+ *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
+ return true;
+ }
+
+ *wait_event = 0;
+ return false;
+}
+
+/*
+ * Returns true if we need to wait for WALs to be flushed to disk, or if not
+ * all standbys have caught up to the flushed position (flushed_lsn) when the
+ * currently acquired slot is a logical failover slot and we are
+ * streaming; otherwise, returns false.
+ *
+ * If returning true, the function sets the appropriate wait event in
+ * wait_event; otherwise, wait_event is set to 0.
+ */
+static bool
+NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn,
+ uint32 *wait_event)
+{
+ /* Check if we need to wait for WALs to be flushed to disk */
+ if (target_lsn > flushed_lsn)
+ {
+ *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
+ return true;
+ }
+
+ /* Check if the standby slots have caught up to the flushed position */
+ return NeedToWaitForStandbys(flushed_lsn, wait_event);
+}
+
+/*
* Wait till WAL < loc is flushed to disk so it can be safely sent to client.
*
- * Returns end LSN of flushed WAL. Normally this will be >= loc, but
- * if we detect a shutdown request (either from postmaster or client)
- * we will return early, so caller must always check.
+ * If the walsender holds a logical failover slot, we also wait for all the
+ * specified streaming replication standby servers to confirm receipt of WAL
+ * up to RecentFlushPtr. It is beneficial to wait here for the confirmation
+ * up to RecentFlushPtr rather than waiting before transmitting each change
+ * to logical subscribers, since each is already covered by RecentFlushPtr.
+ *
+ * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we
+ * detect a shutdown request (either from postmaster or client) we will return
+ * early, so caller must always check.
*/
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
int wakeEvents;
+ uint32 wait_event = 0;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
- * have enough WAL available. This is particularly interesting if we're
- * far behind.
+ * have enough WAL available and all the standby servers have confirmed
+ * receipt of WAL up to RecentFlushPtr. This is particularly interesting
+ * if we're far behind.
*/
- if (RecentFlushPtr != InvalidXLogRecPtr &&
- loc <= RecentFlushPtr)
+ if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
+ !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
return RecentFlushPtr;
/* Get a more recent flush pointer. */
@@ -1753,8 +1837,14 @@ WalSndWaitForWal(XLogRecPtr loc)
else
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ /*
+ * Within the loop, we wait for the necessary WALs to be flushed to disk
+ * first, followed by waiting for standbys to catch up if there are enough
+ * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
+ */
for (;;)
{
+ bool wait_for_standby_at_stop = false;
long sleeptime;
/* Clear any already-pending wakeups */
@@ -1781,21 +1871,35 @@ WalSndWaitForWal(XLogRecPtr loc)
if (got_STOPPING)
XLogBackgroundFlush();
- /* Update our idea of the currently flushed position. */
- if (!RecoveryInProgress())
- RecentFlushPtr = GetFlushRecPtr(NULL);
- else
- RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ /*
+ * To avoid the scenario where standbys need to catch up to a newer
+ * WAL location in each iteration, we update our idea of the currently
+ * flushed position only if we are not waiting for standbys to catch
+ * up.
+ */
+ if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+ {
+ if (!RecoveryInProgress())
+ RecentFlushPtr = GetFlushRecPtr(NULL);
+ else
+ RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+ }
/*
- * If postmaster asked us to stop, don't wait anymore.
+ * If postmaster asked us to stop and the standby slots have caught up
+ * to the flushed position, don't wait anymore.
*
* It's important to do this check after the recomputation of
* RecentFlushPtr, so we can send all remaining data before shutting
* down.
*/
if (got_STOPPING)
- break;
+ {
+ if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
+ wait_for_standby_at_stop = true;
+ else
+ break;
+ }
/*
* We only send regular messages to the client for full decoded
@@ -1810,11 +1914,18 @@ WalSndWaitForWal(XLogRecPtr loc)
!waiting_for_ping_response)
WalSndKeepalive(false, InvalidXLogRecPtr);
- /* check whether we're done */
- if (loc <= RecentFlushPtr)
+ /*
+ * Exit the loop if we have already caught up and don't need to wait for
+ * standby slots.
+ */
+ if (!wait_for_standby_at_stop &&
+ !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
break;
- /* Waiting for new WAL. Since we need to wait, we're now caught up. */
+ /*
+ * Waiting for new WAL or waiting for standbys to catch up. Since we
+ * need to wait, we're now caught up.
+ */
WalSndCaughtUp = true;
/*
@@ -1852,7 +1963,9 @@ WalSndWaitForWal(XLogRecPtr loc)
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
- WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
+ Assert(wait_event != 0);
+
+ WalSndWait(wakeEvents, sleeptime, wait_event);
}
/* reactivate latch so WalSndLoop knows to continue */
@@ -2262,6 +2375,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
{
ReplicationSlotMarkDirty();
ReplicationSlotsComputeRequiredLSN();
+ PhysicalWakeupLogicalWalSnd();
}
/*
@@ -3535,6 +3649,7 @@ WalSndShmemInit(void)
ConditionVariableInit(&WalSndCtl->wal_flush_cv);
ConditionVariableInit(&WalSndCtl->wal_replay_cv);
+ ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv);
}
}
@@ -3604,8 +3719,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
*
* And, we use separate shared memory CVs for physical and logical
* walsenders for selective wake ups, see WalSndWakeup() for more details.
+ *
+ * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
+ * until awakened by physical walsenders after the walreceiver confirms
+ * the receipt of the LSN.
*/
- if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+ if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);