summaryrefslogtreecommitdiff
path: root/src/backend/access/transam
diff options
context:
space:
mode:
authorAlexander Korotkov <akorotkov@postgresql.org>2025-11-03 13:31:13 +0200
committerAlexander Korotkov <akorotkov@postgresql.org>2025-11-05 11:44:13 +0200
commit3b4e53a075ea5671b075f8fd873241179f8e64af (patch)
treeeaa18a033d0ee4dfc8d2447346a4a7fbfac79919 /src/backend/access/transam
parent8af3ae0d4b36f4cbd6c72b12357ba928d02b3ebd (diff)
Add infrastructure for efficient LSN waiting
Implement a new facility that allows processes to wait for WAL to reach specific LSNs, both on primary (waiting for flush) and standby (waiting for replay) servers. The implementation uses shared memory with per-backend information organized into pairing heaps, allowing O(1) access to the minimum waited LSN. This enables fast-path checks: after replaying or flushing WAL, the startup process or WAL writer can quickly determine if any waiters need to be awakened. Key components: - New xlogwait.c/h module with WaitForLSNReplay() and WaitForLSNFlush() - Separate pairing heaps for replay and flush waiters - WaitLSN lightweight lock for coordinating shared state - Wait events WAIT_FOR_WAL_REPLAY and WAIT_FOR_WAL_FLUSH for monitoring This infrastructure can be used by features that need to wait for WAL operations to complete. Discussion: https://www.postgresql.org/message-id/flat/CAPpHfdsjtZLVzxjGT8rJHCYbM0D5dwkO+BBjcirozJ6nYbOW8Q@mail.gmail.com Discussion: https://www.postgresql.org/message-id/flat/CABPTF7UNft368x-RgOXkfj475OwEbp%2BVVO-wEXz7StgjD_%3D6sw%40mail.gmail.com Author: Kartyshov Ivan <i.kartyshov@postgrespro.ru> Author: Alexander Korotkov <aekorotkov@gmail.com> Author: Xuneng Zhou <xunengzhou@gmail.com> Reviewed-by: Michael Paquier <michael@paquier.xyz> Reviewed-by: Peter Eisentraut <peter.eisentraut@enterprisedb.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Alexander Lakhin <exclusion@gmail.com> Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Reviewed-by: Euler Taveira <euler@eulerto.com> Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi> Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Reviewed-by: Xuneng Zhou <xunengzhou@gmail.com>
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r--src/backend/access/transam/Makefile3
-rw-r--r--src/backend/access/transam/meson.build1
-rw-r--r--src/backend/access/transam/xlogwait.c409
3 files changed, 412 insertions, 1 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 661c55a9db7..a32f473e0a2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -36,7 +36,8 @@ OBJS = \
xlogreader.o \
xlogrecovery.o \
xlogstats.o \
- xlogutils.o
+ xlogutils.o \
+ xlogwait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index e8ae9b13c8e..74a62ab3eab 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -24,6 +24,7 @@ backend_sources += files(
'xlogrecovery.c',
'xlogstats.c',
'xlogutils.c',
+ 'xlogwait.c',
)
# used by frontend programs to build a frontend xlogreader
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
new file mode 100644
index 00000000000..e04567cfd67
--- /dev/null
+++ b/src/backend/access/transam/xlogwait.c
@@ -0,0 +1,409 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.c
+ * Implements waiting for WAL operations to reach specific LSNs.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogwait.c
+ *
+ * NOTES
+ * This file implements waiting for WAL operations to reach specific LSNs
+ * on both physical standby and primary servers. The core idea is simple:
+ * every process that wants to wait publishes the LSN it needs to the
+ * shared memory, and the appropriate process (startup on standby, or
+ * WAL writer/backend on primary) wakes it once that LSN has been reached.
+ *
+ * The shared memory used by this module comprises a procInfos
+ * per-backend array with the information of the awaited LSN for each
+ * of the backend processes. The elements of that array are organized
+ * into a pairing heap waitersHeap, which allows for very fast finding
+ * of the least awaited LSN.
+ *
+ * In addition, the least-awaited LSN is cached as minWaitedLSN. The
+ * waiter process publishes information about itself to the shared
+ * memory and waits on the latch until it is woken up by the appropriate
+ * process, standby is promoted, or the postmaster dies. Then, it cleans
+ * information about itself in the shared memory.
+ *
+ * On standby servers: After replaying a WAL record, the startup process
+ * first performs a fast path check minWaitedLSN > replayLSN. If this
+ * check is negative, it checks waitersHeap and wakes up the backend
+ * whose awaited LSNs are reached.
+ *
+ * On primary servers: After flushing WAL, the WAL writer or backend
+ * process performs a similar check against the flush LSN and wakes up
+ * waiters whose target flush LSNs have been reached.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+
+static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
+ void *arg);
+
+struct WaitLSNState *waitLSNState = NULL;
+
+/* Report the amount of shared memory space needed for WaitLSNState. */
+Size
+WaitLSNShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitLSNState, procInfos);
+ size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
+ return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory. */
+void
+WaitLSNShmemInit(void)
+{
+ bool found;
+
+ waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+ WaitLSNShmemSize(),
+ &found);
+ if (!found)
+ {
+ int i;
+
+ /* Initialize heaps and tracking */
+ for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
+ {
+ pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
+ pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, (void *) (uintptr_t) i);
+ }
+
+ /* Initialize process info array */
+ memset(&waitLSNState->procInfos, 0,
+ (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
+ }
+}
+
+/*
+ * Comparison function for LSN waiters heaps. Waiting processes are ordered by
+ * LSN, so that the waiter with smallest LSN is at the top.
+ */
+static int
+waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+ int i = (uintptr_t) arg;
+ const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], a);
+ const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], b);
+
+ if (aproc->waitLSN < bproc->waitLSN)
+ return 1;
+ else if (aproc->waitLSN > bproc->waitLSN)
+ return -1;
+ else
+ return 0;
+}
+
+/*
+ * Update minimum waited LSN for the specified LSN type
+ */
+static void
+updateMinWaitedLSN(WaitLSNType lsnType)
+{
+ XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
+ {
+ pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
+ WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
+
+ minWaitedLSN = procInfo->waitLSN;
+ }
+ pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
+}
+
+/*
+ * Add current process to appropriate waiters heap based on LSN type
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
+{
+ WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ procInfo->procno = MyProcNumber;
+ procInfo->waitLSN = lsn;
+
+ Assert(!procInfo->inHeap[i]);
+ pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
+ procInfo->inHeap[i] = true;
+ updateMinWaitedLSN(lsnType);
+
+ LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Remove current process from appropriate waiters heap based on LSN type
+ */
+static void
+deleteLSNWaiter(WaitLSNType lsnType)
+{
+ WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ if (procInfo->inHeap[i])
+ {
+ pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
+ procInfo->inHeap[i] = false;
+ updateMinWaitedLSN(lsnType);
+ }
+
+ LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
+ * on the stack. It should be enough to take single iteration for most cases.
+ */
+#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
+
+/*
+ * Remove waiters whose LSN has been reached from the heap and set their
+ * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
+ * and set latches for all waiters.
+ *
+ * This function first accumulates waiters to wake up into an array, then
+ * wakes them up without holding a WaitLSNLock. The array size is static and
+ * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
+ * to wake up all the waiters at once in the vast majority of cases. However,
+ * if there are more waiters, this function will loop to process them in
+ * multiple chunks.
+ */
+static void
+wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
+{
+ ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
+ int numWakeUpProcs;
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ do
+ {
+ numWakeUpProcs = 0;
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ /*
+ * Iterate the waiters heap until we find LSN not yet reached. Record
+ * process numbers to wake up, but send wakeups after releasing lock.
+ */
+ while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
+ {
+ pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
+ WaitLSNProcInfo *procInfo;
+
+ /* Get procInfo using appropriate heap node */
+ procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
+
+ if (!XLogRecPtrIsInvalid(currentLSN) && procInfo->waitLSN > currentLSN)
+ break;
+
+ Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
+ wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
+ (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
+
+ /* Update appropriate flag */
+ procInfo->inHeap[i] = false;
+
+ if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
+ break;
+ }
+
+ updateMinWaitedLSN(lsnType);
+ LWLockRelease(WaitLSNLock);
+
+ /*
+ * Set latches for processes whose waited LSNs have been reached.
+ * Since SetLatch() is a time-consuming operation, we do this outside
+ * of WaitLSNLock. This is safe because procLatch is never freed, so
+ * at worst we may set a latch for the wrong process or for no process
+ * at all, which is harmless.
+ */
+ for (i = 0; i < numWakeUpProcs; i++)
+ SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
+
+ } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
+}
+
+/*
+ * Wake up processes waiting for LSN to reach currentLSN
+ */
+void
+WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
+{
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ /* Fast path check */
+ if (pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
+ return;
+
+ wakeupWaiters(lsnType, currentLSN);
+}
+
+/*
+ * Clean up LSN waiters for exiting process
+ */
+void
+WaitLSNCleanup(void)
+{
+ if (waitLSNState)
+ {
+ int i;
+
+ /*
+ * We do a fast-path check of the heap flags without the lock. These
+ * flags are set to true only by the process itself. So, it's only
+ * possible to get a false positive. But that will be eliminated by a
+ * recheck inside deleteLSNWaiter().
+ */
+
+ for (i = 0; i < (int) WAIT_LSN_TYPE_COUNT; i++)
+ {
+ if (waitLSNState->procInfos[MyProcNumber].inHeap[i])
+ deleteLSNWaiter((WaitLSNType) i);
+ }
+ }
+}
+
+/*
+ * Wait using MyLatch till the given LSN is reached, the replica gets
+ * promoted, or the postmaster dies.
+ *
+ * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
+ * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
+ * or replica got promoted before the target LSN reached.
+ */
+WaitLSNResult
+WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
+{
+ XLogRecPtr currentLSN;
+ TimestampTz endtime = 0;
+ int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+ /* Shouldn't be called when shmem isn't initialized */
+ Assert(waitLSNState);
+
+ /* Should have a valid proc number */
+ Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
+
+ if (timeout > 0)
+ {
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
+ wake_events |= WL_TIMEOUT;
+ }
+
+ /*
+ * Add our process to the waiters heap. It might happen that target LSN
+ * gets reached before we do. The check at the beginning of the loop
+ * below prevents the race condition.
+ */
+ addLSNWaiter(targetLSN, lsnType);
+
+ for (;;)
+ {
+ int rc;
+ long delay_ms = -1;
+
+ if (lsnType == WAIT_LSN_TYPE_REPLAY)
+ currentLSN = GetXLogReplayRecPtr(NULL);
+ else
+ currentLSN = GetFlushRecPtr(NULL);
+
+ /* Check that recovery is still in-progress */
+ if (!RecoveryInProgress())
+ {
+ /*
+ * Recovery was ended, but check if target LSN was already
+ * reached.
+ */
+ deleteLSNWaiter(lsnType);
+
+ if (PromoteIsTriggered() && targetLSN <= currentLSN)
+ return WAIT_LSN_RESULT_SUCCESS;
+ return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
+ }
+ else
+ {
+ /* Check if the waited LSN has been reached */
+ if (targetLSN <= currentLSN)
+ break;
+ }
+
+ if (timeout > 0)
+ {
+ delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
+ if (delay_ms <= 0)
+ break;
+ }
+
+ CHECK_FOR_INTERRUPTS();
+
+ rc = WaitLatch(MyLatch, wake_events, delay_ms,
+ (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (rc & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit"),
+ errcontext("while waiting for LSN"));
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+
+ /*
+ * Delete our process from the shared memory heap. We might already be
+ * deleted by the startup process. The 'inHeap' flags prevents us from
+ * the double deletion.
+ */
+ deleteLSNWaiter(lsnType);
+
+ /*
+ * If we didn't reach the target LSN, we must be exited by timeout.
+ */
+ if (targetLSN > currentLSN)
+ return WAIT_LSN_RESULT_TIMEOUT;
+
+ return WAIT_LSN_RESULT_SUCCESS;
+}