summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/Makefile3
-rw-r--r--src/backend/access/transam/meson.build1
-rw-r--r--src/backend/access/transam/xlogwait.c409
-rw-r--r--src/backend/storage/ipc/ipci.c3
-rw-r--r--src/backend/utils/activity/wait_event_names.txt3
-rw-r--r--src/include/access/xlogwait.h98
-rw-r--r--src/include/storage/lwlocklist.h1
-rw-r--r--src/tools/pgindent/typedefs.list4
8 files changed, 521 insertions, 1 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 661c55a9db7..a32f473e0a2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -36,7 +36,8 @@ OBJS = \
xlogreader.o \
xlogrecovery.o \
xlogstats.o \
- xlogutils.o
+ xlogutils.o \
+ xlogwait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index e8ae9b13c8e..74a62ab3eab 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -24,6 +24,7 @@ backend_sources += files(
'xlogrecovery.c',
'xlogstats.c',
'xlogutils.c',
+ 'xlogwait.c',
)
# used by frontend programs to build a frontend xlogreader
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
new file mode 100644
index 00000000000..e04567cfd67
--- /dev/null
+++ b/src/backend/access/transam/xlogwait.c
@@ -0,0 +1,409 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.c
+ * Implements waiting for WAL operations to reach specific LSNs.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogwait.c
+ *
+ * NOTES
+ * This file implements waiting for WAL operations to reach specific LSNs
+ * on both physical standby and primary servers. The core idea is simple:
+ * every process that wants to wait publishes the LSN it needs to the
+ * shared memory, and the appropriate process (startup on standby, or
+ * WAL writer/backend on primary) wakes it once that LSN has been reached.
+ *
+ * The shared memory used by this module comprises a procInfos
+ * per-backend array with the information of the awaited LSN for each
+ * of the backend processes. The elements of that array are organized
+ * into a pairing heap waitersHeap, which allows for very fast finding
+ * of the least awaited LSN.
+ *
+ * In addition, the least-awaited LSN is cached as minWaitedLSN. The
+ * waiter process publishes information about itself to the shared
+ * memory and waits on the latch until it is woken up by the appropriate
+ * process, standby is promoted, or the postmaster dies. Then, it cleans
+ * information about itself in the shared memory.
+ *
+ * On standby servers: After replaying a WAL record, the startup process
+ * first performs a fast path check minWaitedLSN > replayLSN. If this
+ * check is negative, it checks waitersHeap and wakes up the backend
+ * whose awaited LSNs are reached.
+ *
+ * On primary servers: After flushing WAL, the WAL writer or backend
+ * process performs a similar check against the flush LSN and wakes up
+ * waiters whose target flush LSNs have been reached.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+
+static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
+ void *arg);
+
+struct WaitLSNState *waitLSNState = NULL;
+
+/* Report the amount of shared memory space needed for WaitLSNState. */
+Size
+WaitLSNShmemSize(void)
+{
+ Size size;
+
+ size = offsetof(WaitLSNState, procInfos);
+ size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
+ return size;
+}
+
+/* Initialize the WaitLSNState in the shared memory. */
+void
+WaitLSNShmemInit(void)
+{
+ bool found;
+
+ waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+ WaitLSNShmemSize(),
+ &found);
+ if (!found)
+ {
+ int i;
+
+ /* Initialize heaps and tracking */
+ for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
+ {
+ pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
+ pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, (void *) (uintptr_t) i);
+ }
+
+ /* Initialize process info array */
+ memset(&waitLSNState->procInfos, 0,
+ (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
+ }
+}
+
+/*
+ * Comparison function for LSN waiters heaps. Waiting processes are ordered by
+ * LSN, so that the waiter with smallest LSN is at the top.
+ */
+static int
+waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+ int i = (uintptr_t) arg;
+ const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], a);
+ const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], b);
+
+ if (aproc->waitLSN < bproc->waitLSN)
+ return 1;
+ else if (aproc->waitLSN > bproc->waitLSN)
+ return -1;
+ else
+ return 0;
+}
+
+/*
+ * Update minimum waited LSN for the specified LSN type
+ */
+static void
+updateMinWaitedLSN(WaitLSNType lsnType)
+{
+ XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
+ {
+ pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
+ WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
+
+ minWaitedLSN = procInfo->waitLSN;
+ }
+ pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
+}
+
+/*
+ * Add current process to appropriate waiters heap based on LSN type
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
+{
+ WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ procInfo->procno = MyProcNumber;
+ procInfo->waitLSN = lsn;
+
+ Assert(!procInfo->inHeap[i]);
+ pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
+ procInfo->inHeap[i] = true;
+ updateMinWaitedLSN(lsnType);
+
+ LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Remove current process from appropriate waiters heap based on LSN type
+ */
+static void
+deleteLSNWaiter(WaitLSNType lsnType)
+{
+ WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ if (procInfo->inHeap[i])
+ {
+ pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
+ procInfo->inHeap[i] = false;
+ updateMinWaitedLSN(lsnType);
+ }
+
+ LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
+ * on the stack. It should be enough to take single iteration for most cases.
+ */
+#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
+
+/*
+ * Remove waiters whose LSN has been reached from the heap and set their
+ * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
+ * and set latches for all waiters.
+ *
+ * This function first accumulates waiters to wake up into an array, then
+ * wakes them up without holding a WaitLSNLock. The array size is static and
+ * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
+ * to wake up all the waiters at once in the vast majority of cases. However,
+ * if there are more waiters, this function will loop to process them in
+ * multiple chunks.
+ */
+static void
+wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
+{
+ ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
+ int numWakeUpProcs;
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ do
+ {
+ numWakeUpProcs = 0;
+ LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+ /*
+ * Iterate the waiters heap until we find LSN not yet reached. Record
+ * process numbers to wake up, but send wakeups after releasing lock.
+ */
+ while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
+ {
+ pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
+ WaitLSNProcInfo *procInfo;
+
+ /* Get procInfo using appropriate heap node */
+ procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
+
+ if (!XLogRecPtrIsInvalid(currentLSN) && procInfo->waitLSN > currentLSN)
+ break;
+
+ Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
+ wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
+ (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
+
+ /* Update appropriate flag */
+ procInfo->inHeap[i] = false;
+
+ if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
+ break;
+ }
+
+ updateMinWaitedLSN(lsnType);
+ LWLockRelease(WaitLSNLock);
+
+ /*
+ * Set latches for processes whose waited LSNs have been reached.
+ * Since SetLatch() is a time-consuming operation, we do this outside
+ * of WaitLSNLock. This is safe because procLatch is never freed, so
+ * at worst we may set a latch for the wrong process or for no process
+ * at all, which is harmless.
+ */
+ for (i = 0; i < numWakeUpProcs; i++)
+ SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
+
+ } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
+}
+
+/*
+ * Wake up processes waiting for LSN to reach currentLSN
+ */
+void
+WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
+{
+ int i = (int) lsnType;
+
+ Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+ /* Fast path check */
+ if (pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
+ return;
+
+ wakeupWaiters(lsnType, currentLSN);
+}
+
+/*
+ * Clean up LSN waiters for exiting process
+ */
+void
+WaitLSNCleanup(void)
+{
+ if (waitLSNState)
+ {
+ int i;
+
+ /*
+ * We do a fast-path check of the heap flags without the lock. These
+ * flags are set to true only by the process itself. So, it's only
+ * possible to get a false positive. But that will be eliminated by a
+ * recheck inside deleteLSNWaiter().
+ */
+
+ for (i = 0; i < (int) WAIT_LSN_TYPE_COUNT; i++)
+ {
+ if (waitLSNState->procInfos[MyProcNumber].inHeap[i])
+ deleteLSNWaiter((WaitLSNType) i);
+ }
+ }
+}
+
+/*
+ * Wait using MyLatch till the given LSN is reached, the replica gets
+ * promoted, or the postmaster dies.
+ *
+ * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
+ * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
+ * or replica got promoted before the target LSN reached.
+ */
+WaitLSNResult
+WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
+{
+ XLogRecPtr currentLSN;
+ TimestampTz endtime = 0;
+ int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+ /* Shouldn't be called when shmem isn't initialized */
+ Assert(waitLSNState);
+
+ /* Should have a valid proc number */
+ Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
+
+ if (timeout > 0)
+ {
+ endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
+ wake_events |= WL_TIMEOUT;
+ }
+
+ /*
+ * Add our process to the waiters heap. It might happen that target LSN
+ * gets reached before we do. The check at the beginning of the loop
+ * below prevents the race condition.
+ */
+ addLSNWaiter(targetLSN, lsnType);
+
+ for (;;)
+ {
+ int rc;
+ long delay_ms = -1;
+
+ if (lsnType == WAIT_LSN_TYPE_REPLAY)
+ currentLSN = GetXLogReplayRecPtr(NULL);
+ else
+ currentLSN = GetFlushRecPtr(NULL);
+
+ /* Check that recovery is still in-progress */
+ if (!RecoveryInProgress())
+ {
+ /*
+ * Recovery was ended, but check if target LSN was already
+ * reached.
+ */
+ deleteLSNWaiter(lsnType);
+
+ if (PromoteIsTriggered() && targetLSN <= currentLSN)
+ return WAIT_LSN_RESULT_SUCCESS;
+ return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
+ }
+ else
+ {
+ /* Check if the waited LSN has been reached */
+ if (targetLSN <= currentLSN)
+ break;
+ }
+
+ if (timeout > 0)
+ {
+ delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
+ if (delay_ms <= 0)
+ break;
+ }
+
+ CHECK_FOR_INTERRUPTS();
+
+ rc = WaitLatch(MyLatch, wake_events, delay_ms,
+ (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (rc & WL_POSTMASTER_DEATH)
+ ereport(FATAL,
+ errcode(ERRCODE_ADMIN_SHUTDOWN),
+ errmsg("terminating connection due to unexpected postmaster exit"),
+ errcontext("while waiting for LSN"));
+
+ if (rc & WL_LATCH_SET)
+ ResetLatch(MyLatch);
+ }
+
+ /*
+ * Delete our process from the shared memory heap. We might already be
+ * deleted by the startup process. The 'inHeap' flags prevents us from
+ * the double deletion.
+ */
+ deleteLSNWaiter(lsnType);
+
+ /*
+ * If we didn't reach the target LSN, we must be exited by timeout.
+ */
+ if (targetLSN > currentLSN)
+ return WAIT_LSN_RESULT_TIMEOUT;
+
+ return WAIT_LSN_RESULT_SUCCESS;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2fa045e6b0f..10ffce8d174 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -24,6 +24,7 @@
#include "access/twophase.h"
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -150,6 +151,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
size = add_size(size, AioShmemSize());
+ size = add_size(size, WaitLSNShmemSize());
/* include additional requested shmem from preload libraries */
size = add_size(size, total_addin_request);
@@ -343,6 +345,7 @@ CreateOrAttachShmemStructs(void)
WaitEventCustomShmemInit();
InjectionPointShmemInit();
AioShmemInit();
+ WaitLSNShmemInit();
}
/*
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..c1ac71ff7f2 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,6 +89,8 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem
LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server."
SSL_OPEN_SERVER "Waiting for SSL while attempting connection."
WAIT_FOR_STANDBY_CONFIRMATION "Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_FLUSH "Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_REPLAY "Waiting for WAL replay to reach a target LSN on a standby."
WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process."
WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process."
@@ -355,6 +357,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry."
InjectionPoint "Waiting to read or update information related to injection points."
SerialControl "Waiting to read or update shared <filename>pg_serial</filename> state."
AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue."
+WaitLSN "Waiting to read or update shared Wait-for-LSN state."
#
# END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
new file mode 100644
index 00000000000..4dc328b1b07
--- /dev/null
+++ b/src/include/access/xlogwait.h
@@ -0,0 +1,98 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.h
+ * Declarations for LSN replay waiting routines.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogwait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOG_WAIT_H
+#define XLOG_WAIT_H
+
+#include "access/xlogdefs.h"
+#include "lib/pairingheap.h"
+#include "port/atomics.h"
+#include "storage/procnumber.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/*
+ * Result statuses for WaitForLSNReplay().
+ */
+typedef enum
+{
+ WAIT_LSN_RESULT_SUCCESS, /* Target LSN is reached */
+ WAIT_LSN_RESULT_NOT_IN_RECOVERY, /* Recovery ended before or during our
+ * wait */
+ WAIT_LSN_RESULT_TIMEOUT /* Timeout occurred */
+} WaitLSNResult;
+
+/*
+ * LSN type for waiting facility.
+ */
+typedef enum WaitLSNType
+{
+ WAIT_LSN_TYPE_REPLAY = 0, /* Waiting for replay on standby */
+ WAIT_LSN_TYPE_FLUSH = 1, /* Waiting for flush on primary */
+ WAIT_LSN_TYPE_COUNT = 2
+} WaitLSNType;
+
+/*
+ * WaitLSNProcInfo - the shared memory structure representing information
+ * about the single process, which may wait for LSN operations. An item of
+ * waitLSNState->procInfos array.
+ */
+typedef struct WaitLSNProcInfo
+{
+ /* LSN, which this process is waiting for */
+ XLogRecPtr waitLSN;
+
+ /* Process to wake up once the waitLSN is reached */
+ ProcNumber procno;
+
+ /* Heap membership flags for LSN types */
+ bool inHeap[WAIT_LSN_TYPE_COUNT];
+
+ /* Heap nodes for LSN types */
+ pairingheap_node heapNode[WAIT_LSN_TYPE_COUNT];
+} WaitLSNProcInfo;
+
+/*
+ * WaitLSNState - the shared memory state for the LSN waiting facility.
+ */
+typedef struct WaitLSNState
+{
+ /*
+ * The minimum LSN values some process is waiting for. Used for the
+ * fast-path checking if we need to wake up any waiters after replaying a
+ * WAL record. Could be read lock-less. Update protected by WaitLSNLock.
+ */
+ pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT];
+
+ /*
+ * A pairing heaps of waiting processes ordered by LSN values (least LSN
+ * is on top). Protected by WaitLSNLock.
+ */
+ pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT];
+
+ /*
+ * An array with per-process information, indexed by the process number.
+ * Protected by WaitLSNLock.
+ */
+ WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+
+extern PGDLLIMPORT WaitLSNState *waitLSNState;
+
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
+extern void WaitLSNCleanup(void);
+extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,
+ int64 timeout);
+
+#endif /* XLOG_WAIT_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..5b0ce383408 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -85,6 +85,7 @@ PG_LWLOCK(50, DSMRegistry)
PG_LWLOCK(51, InjectionPoint)
PG_LWLOCK(52, SerialControl)
PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, WaitLSN)
/*
* There also exist several built-in LWLock tranches. As with the predefined
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2ca7b75af57..73df31344be 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3267,6 +3267,10 @@ WaitEventIO
WaitEventIPC
WaitEventSet
WaitEventTimeout
+WaitLSNType
+WaitLSNState
+WaitLSNProcInfo
+WaitLSNResult
WaitPMResult
WalCloseMethod
WalCompression