diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 159 |
1 files changed, 140 insertions, 19 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0f1047179cb..25edb5e1412 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1726,25 +1726,109 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId } /* + * Wake up the logical walsender processes with logical failover slots if the + * currently acquired physical slot is specified in standby_slot_names GUC. + */ +void +PhysicalWakeupLogicalWalSnd(void) +{ + Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot)); + + /* + * If we are running in a standby, there is no need to wake up walsenders. + * This is because we do not support syncing slots to cascading standbys, + * so, there are no walsenders waiting for standbys to catch up. + */ + if (RecoveryInProgress()) + return; + + if (SlotExistsInStandbySlotNames(NameStr(MyReplicationSlot->data.name))) + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); +} + +/* + * Returns true if not all standbys have caught up to the flushed position + * (flushed_lsn) when the current acquired slot is a logical failover + * slot and we are streaming; otherwise, returns false. + * + * If returning true, the function sets the appropriate wait event in + * wait_event; otherwise, wait_event is set to 0. + */ +static bool +NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event) +{ + int elevel = got_STOPPING ? ERROR : WARNING; + bool failover_slot; + + failover_slot = (replication_active && MyReplicationSlot->data.failover); + + /* + * Note that after receiving the shutdown signal, an ERROR is reported if + * any slots are dropped, invalidated, or inactive. This measure is taken + * to prevent the walsender from waiting indefinitely. + */ + if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel)) + { + *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION; + return true; + } + + *wait_event = 0; + return false; +} + +/* + * Returns true if we need to wait for WALs to be flushed to disk, or if not + * all standbys have caught up to the flushed position (flushed_lsn) when the + * current acquired slot is a logical failover slot and we are + * streaming; otherwise, returns false. + * + * If returning true, the function sets the appropriate wait event in + * wait_event; otherwise, wait_event is set to 0. + */ +static bool +NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, + uint32 *wait_event) +{ + /* Check if we need to wait for WALs to be flushed to disk */ + if (target_lsn > flushed_lsn) + { + *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL; + return true; + } + + /* Check if the standby slots have caught up to the flushed position */ + return NeedToWaitForStandbys(flushed_lsn, wait_event); +} + +/* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * - * Returns end LSN of flushed WAL. Normally this will be >= loc, but - * if we detect a shutdown request (either from postmaster or client) - * we will return early, so caller must always check. + * If the walsender holds a logical failover slot, we also wait for all the + * specified streaming replication standby servers to confirm receipt of WAL + * up to RecentFlushPtr. It is beneficial to wait here for the confirmation + * up to RecentFlushPtr rather than waiting before transmitting each change + * to logical subscribers, which is already covered by RecentFlushPtr. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we + * detect a shutdown request (either from postmaster or client) we will return + * early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + uint32 wait_event = 0; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; /* * Fast path to avoid acquiring the spinlock in case we already know we - * have enough WAL available. This is particularly interesting if we're - * far behind. + * have enough WAL available and all the standby servers have confirmed + * receipt of WAL up to RecentFlushPtr. This is particularly interesting + * if we're far behind. */ - if (RecentFlushPtr != InvalidXLogRecPtr && - loc <= RecentFlushPtr) + if (!XLogRecPtrIsInvalid(RecentFlushPtr) && + !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event)) return RecentFlushPtr; /* Get a more recent flush pointer. */ @@ -1753,8 +1837,14 @@ WalSndWaitForWal(XLogRecPtr loc) else RecentFlushPtr = GetXLogReplayRecPtr(NULL); + /* + * Within the loop, we wait for the necessary WALs to be flushed to disk + * first, followed by waiting for standbys to catch up if there are enough + * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal. + */ for (;;) { + bool wait_for_standby_at_stop = false; long sleeptime; /* Clear any already-pending wakeups */ @@ -1781,21 +1871,35 @@ WalSndWaitForWal(XLogRecPtr loc) if (got_STOPPING) XLogBackgroundFlush(); - /* Update our idea of the currently flushed position. */ - if (!RecoveryInProgress()) - RecentFlushPtr = GetFlushRecPtr(NULL); - else - RecentFlushPtr = GetXLogReplayRecPtr(NULL); + /* + * To avoid the scenario where standbys need to catch up to a newer + * WAL location in each iteration, we update our idea of the currently + * flushed position only if we are not waiting for standbys to catch + * up. + */ + if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) + { + if (!RecoveryInProgress()) + RecentFlushPtr = GetFlushRecPtr(NULL); + else + RecentFlushPtr = GetXLogReplayRecPtr(NULL); + } /* - * If postmaster asked us to stop, don't wait anymore. + * If postmaster asked us to stop and the standby slots have caught up + * to the flushed position, don't wait anymore. * * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting * down. */ if (got_STOPPING) - break; + { + if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event)) + wait_for_standby_at_stop = true; + else + break; + } /* * We only send regular messages to the client for full decoded @@ -1810,11 +1914,18 @@ WalSndWaitForWal(XLogRecPtr loc) !waiting_for_ping_response) WalSndKeepalive(false, InvalidXLogRecPtr); - /* check whether we're done */ - if (loc <= RecentFlushPtr) + /* + * Exit the loop if already caught up and doesn't need to wait for + * standby slots. + */ + if (!wait_for_standby_at_stop && + !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event)) break; - /* Waiting for new WAL. Since we need to wait, we're now caught up. */ + /* + * Waiting for new WAL or waiting for standbys to catch up. Since we + * need to wait, we're now caught up. + */ WalSndCaughtUp = true; /* @@ -1852,7 +1963,9 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); + Assert(wait_event != 0); + + WalSndWait(wakeEvents, sleeptime, wait_event); } /* reactivate latch so WalSndLoop knows to continue */ @@ -2262,6 +2375,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + PhysicalWakeupLogicalWalSnd(); } /* @@ -3535,6 +3649,7 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } @@ -3604,8 +3719,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * * And, we use separate shared memory CVs for physical and logical * walsenders for selective wake ups, see WalSndWakeup() for more details. + * + * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV + * until awakened by physical walsenders after the walreceiver confirms + * the receipt of the LSN. */ - if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) + if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); |