diff options
| -rw-r--r-- | src/backend/access/transam/Makefile | 3 | ||||
| -rw-r--r-- | src/backend/access/transam/meson.build | 1 | ||||
| -rw-r--r-- | src/backend/access/transam/xlogwait.c | 409 | ||||
| -rw-r--r-- | src/backend/storage/ipc/ipci.c | 3 | ||||
| -rw-r--r-- | src/backend/utils/activity/wait_event_names.txt | 3 | ||||
| -rw-r--r-- | src/include/access/xlogwait.h | 98 | ||||
| -rw-r--r-- | src/include/storage/lwlocklist.h | 1 | ||||
| -rw-r--r-- | src/tools/pgindent/typedefs.list | 4 |
8 files changed, 521 insertions, 1 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 661c55a9db7..a32f473e0a2 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -36,7 +36,8 @@ OBJS = \
 	xlogreader.o \
 	xlogrecovery.o \
 	xlogstats.o \
-	xlogutils.o
+	xlogutils.o \
+	xlogwait.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index e8ae9b13c8e..74a62ab3eab 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -24,6 +24,7 @@ backend_sources += files(
   'xlogrecovery.c',
   'xlogstats.c',
   'xlogutils.c',
+  'xlogwait.c',
 )
 
 # used by frontend programs to build a frontend xlogreader
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
new file mode 100644
index 00000000000..e04567cfd67
--- /dev/null
+++ b/src/backend/access/transam/xlogwait.c
@@ -0,0 +1,409 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.c
+ *	  Implements waiting for WAL operations to reach specific LSNs.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/transam/xlogwait.c
+ *
+ * NOTES
+ *		This file implements waiting for WAL operations to reach specific
+ *		LSNs, both on physical standbys and on primary servers.  The idea is
+ *		simple: a process that wants to wait publishes the LSN it needs in
+ *		shared memory, and the appropriate process (the startup process on a
+ *		standby, or the WAL writer/a backend on a primary) wakes it up once
+ *		that LSN has been reached.
+ *
+ *		The shared memory of this module consists of the per-process array
+ *		procInfos, holding the awaited LSN of each process.  The elements of
+ *		that array are linked into the pairing heaps waitersHeap (one per LSN
+ *		type), which makes finding the least awaited LSN very fast.
+ *
+ *		In addition, the least awaited LSN is cached as minWaitedLSN.  A
+ *		waiter publishes information about itself in shared memory and then
+ *		waits on its latch until it is woken up by the appropriate process,
+ *		the standby is promoted, or the postmaster dies.  Afterwards it
+ *		removes the information about itself from shared memory.
+ *
+ *		On standby servers: after replaying a WAL record, the startup process
+ *		first performs the fast-path check minWaitedLSN > replayLSN.  If that
+ *		check fails, it inspects waitersHeap and wakes up the backends whose
+ *		awaited LSNs have been reached.
+ *
+ *		On primary servers: after flushing WAL, the WAL writer or backend
+ *		process performs the same kind of check against the flush LSN and
+ *		wakes up the waiters whose target flush LSNs have been reached.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <float.h>
+#include <math.h>
+
+#include "access/xlog.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+#include "utils/fmgrprotos.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+
+static int	waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
+						void *arg);
+
+/* Pointer to the module's shared state; stays NULL until shmem is set up. */
+struct WaitLSNState *waitLSNState = NULL;
+
+/*
+ * Report the amount of shared memory needed for WaitLSNState: the fixed
+ * part of the struct followed by one WaitLSNProcInfo slot for each of the
+ * MaxBackends + NUM_AUXILIARY_PROCS processes.
+ */
+Size
+WaitLSNShmemSize(void)
+{
+	Size		procInfosSize;
+
+	procInfosSize = mul_size(MaxBackends + NUM_AUXILIARY_PROCS,
+							 sizeof(WaitLSNProcInfo));
+
+	return add_size(offsetof(WaitLSNState, procInfos), procInfosSize);
+}
+
+/* Initialize the WaitLSNState in the shared memory.
*/
+void
+WaitLSNShmemInit(void)
+{
+	bool		alreadyExists;
+
+	waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
+													WaitLSNShmemSize(),
+													&alreadyExists);
+
+	/* Only initialize the contents if ShmemInitStruct() did not find them. */
+	if (alreadyExists)
+		return;
+
+	/* One waiters heap and one cached minimum per LSN type. */
+	for (int lsnType = 0; lsnType < WAIT_LSN_TYPE_COUNT; lsnType++)
+	{
+		pg_atomic_init_u64(&waitLSNState->minWaitedLSN[lsnType],
+						   PG_UINT64_MAX);
+		pairingheap_initialize(&waitLSNState->waitersHeap[lsnType],
+							   waitlsn_cmp,
+							   (void *) (uintptr_t) lsnType);
+	}
+
+	/* Zero all per-process slots. */
+	memset(&waitLSNState->procInfos, 0,
+		   (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
+}
+
+/*
+ * Comparator for the LSN waiters heaps.  'arg' carries the index of the heap
+ * (the LSN type) and selects which heapNode[] member links the entries.
+ *
+ * The result is inverted with respect to the LSN order (1 when a's LSN is
+ * smaller, -1 when it is larger), so that the waiter with the smallest LSN
+ * ends up at the top of the heap.
+ */
+static int
+waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
+{
+	int			heapIdx = (uintptr_t) arg;
+	XLogRecPtr	lsnA;
+	XLogRecPtr	lsnB;
+
+	lsnA = pairingheap_const_container(WaitLSNProcInfo, heapNode[heapIdx], a)->waitLSN;
+	lsnB = pairingheap_const_container(WaitLSNProcInfo, heapNode[heapIdx], b)->waitLSN;
+
+	if (lsnA == lsnB)
+		return 0;
+
+	return (lsnA < lsnB) ? 1 : -1;
+}
+
+/*
+ * Recompute the cached minimum waited LSN of the given LSN type from the top
+ * of its heap; PG_UINT64_MAX is stored when nobody is waiting.  All callers
+ * in this file hold WaitLSNLock.
+ */
+static void
+updateMinWaitedLSN(WaitLSNType lsnType)
+{
+	int			heapIdx = (int) lsnType;
+	pairingheap *heap;
+	XLogRecPtr	newMin;
+
+	Assert(heapIdx >= 0 && heapIdx < (int) WAIT_LSN_TYPE_COUNT);
+
+	heap = &waitLSNState->waitersHeap[heapIdx];
+
+	if (pairingheap_is_empty(heap))
+		newMin = PG_UINT64_MAX;
+	else
+	{
+		pairingheap_node *top = pairingheap_first(heap);
+
+		newMin = pairingheap_container(WaitLSNProcInfo, heapNode[heapIdx], top)->waitLSN;
+	}
+
+	pg_atomic_write_u64(&waitLSNState->minWaitedLSN[heapIdx], newMin);
+}
+
+/*
+ * Register the current process as a waiter for 'lsn' in the heap that
+ * belongs to 'lsnType', and refresh that heap's cached minimum.
+ */
+static void
+addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
+{
+	int			heapIdx = (int) lsnType;
+	WaitLSNProcInfo *self = &waitLSNState->procInfos[MyProcNumber];
+
+	Assert(heapIdx >= 0 && heapIdx < (int) WAIT_LSN_TYPE_COUNT);
+
+	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+	/* Publish who we are and what we wait for before linking into the heap */
+	self->procno = MyProcNumber;
+	self->waitLSN = lsn;
+
+	Assert(!self->inHeap[heapIdx]);
+	pairingheap_add(&waitLSNState->waitersHeap[heapIdx],
+					&self->heapNode[heapIdx]);
+	self->inHeap[heapIdx] = true;
+	updateMinWaitedLSN(lsnType);
+
+	LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Unregister the current process from the heap that belongs to 'lsnType'.
+ * Does nothing (apart from taking the lock) if the process is not in that
+ * heap, e.g. because a waker has already removed it.
+ */
+static void
+deleteLSNWaiter(WaitLSNType lsnType)
+{
+	int			heapIdx = (int) lsnType;
+	WaitLSNProcInfo *self = &waitLSNState->procInfos[MyProcNumber];
+
+	Assert(heapIdx >= 0 && heapIdx < (int) WAIT_LSN_TYPE_COUNT);
+
+	LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+	if (self->inHeap[heapIdx])
+	{
+		pairingheap_remove(&waitLSNState->waitersHeap[heapIdx],
+						   &self->heapNode[heapIdx]);
+		self->inHeap[heapIdx] = false;
+		updateMinWaitedLSN(lsnType);
+	}
+
+	LWLockRelease(WaitLSNLock);
+}
+
+/*
+ * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
+ * on the stack.  It should be enough to take single iteration for most cases.
+ */
+#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
+
+/*
+ * Remove waiters whose LSN has been reached from the heap and set their
+ * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
+ * and set latches for all waiters.
+ *
+ * This function first accumulates waiters to wake up into an array, then
+ * wakes them up without holding a WaitLSNLock.  The array size is static and
+ * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE.  That should be more than enough
+ * to wake up all the waiters at once in the vast majority of cases.  However,
+ * if there are more waiters, this function will loop to process them in
+ * multiple chunks.
+ */
+static void
+wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
+{
+	ProcNumber	wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
+	int			numWakeUpProcs;
+
+	/*
+	 * Index of the heap to process.  It must stay intact across iterations
+	 * of the outer loop, so the latch-setting loop below uses its own
+	 * counter rather than reusing this variable.
+	 */
+	const int	heapIdx = (int) lsnType;
+
+	Assert(heapIdx >= 0 && heapIdx < (int) WAIT_LSN_TYPE_COUNT);
+
+	do
+	{
+		int			j;
+
+		numWakeUpProcs = 0;
+		LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
+
+		/*
+		 * Iterate the waiters heap until we find LSN not yet reached.  Record
+		 * process numbers to wake up, but send wakeups after releasing lock.
+		 */
+		while (!pairingheap_is_empty(&waitLSNState->waitersHeap[heapIdx]))
+		{
+			pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[heapIdx]);
+			WaitLSNProcInfo *procInfo;
+
+			/* Get procInfo using appropriate heap node */
+			procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[heapIdx], node);
+
+			/* An invalid currentLSN means "wake everybody" */
+			if (!XLogRecPtrIsInvalid(currentLSN) && procInfo->waitLSN > currentLSN)
+				break;
+
+			Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
+			wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
+			(void) pairingheap_remove_first(&waitLSNState->waitersHeap[heapIdx]);
+
+			/* Update appropriate flag */
+			procInfo->inHeap[heapIdx] = false;
+
+			/* Batch is full; wake these up and come back for the rest */
+			if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
+				break;
+		}
+
+		updateMinWaitedLSN(lsnType);
+		LWLockRelease(WaitLSNLock);
+
+		/*
+		 * Set latches for processes whose waited LSNs have been reached.
+		 * Since SetLatch() is a time-consuming operation, we do this outside
+		 * of WaitLSNLock.  This is safe because procLatch is never freed, so
+		 * at worst we may set a latch for the wrong process or for no process
+		 * at all, which is harmless.
+		 */
+		for (j = 0; j < numWakeUpProcs; j++)
+			SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
+
+	} while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
+}
+
+/*
+ * Wake up processes waiting for LSN of the given type to reach currentLSN.
+ *
+ * Passing InvalidXLogRecPtr as currentLSN wakes up all waiters of that type.
+ */
+void
+WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
+{
+	int			i = (int) lsnType;
+
+	Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
+
+	/*
+	 * Fast path check: nothing to do if even the least awaited LSN is still
+	 * ahead of currentLSN.  The check must be skipped for InvalidXLogRecPtr,
+	 * which requests waking up everybody; comparing against it would make us
+	 * return early and drop the request.
+	 */
+	if (!XLogRecPtrIsInvalid(currentLSN) &&
+		pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
+		return;
+
+	wakeupWaiters(lsnType, currentLSN);
+}
+
+/*
+ * Clean up LSN waiters for exiting process
+ */
+void
+WaitLSNCleanup(void)
+{
+	if (waitLSNState)
+	{
+		int			i;
+
+		/*
+		 * We do a fast-path check of the heap flags without the lock.  These
+		 * flags are set to true only by the process itself.  So, it's only
+		 * possible to get a false positive.  But that will be eliminated by a
+		 * recheck inside deleteLSNWaiter().
+		 */
+
+		for (i = 0; i < (int) WAIT_LSN_TYPE_COUNT; i++)
+		{
+			if (waitLSNState->procInfos[MyProcNumber].inHeap[i])
+				deleteLSNWaiter((WaitLSNType) i);
+		}
+	}
+}
+
+/*
+ * Wait using MyLatch till the given LSN is reached, the timeout expires,
+ * the replica gets promoted (replay waits only), or the postmaster dies.
+ *
+ * lsnType selects what is awaited: WAIT_LSN_TYPE_REPLAY compares targetLSN
+ * with the replay position, any other type with the flush position.
+ * timeout is given in milliseconds; a value <= 0 means no timeout.
+ *
+ * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
+ * Returns WAIT_LSN_RESULT_TIMEOUT if the timeout expired before the target
+ * LSN was reached.
+ * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if a replay wait is run not in
+ * recovery, or replica got promoted before the target LSN reached.
+ *
+ * Raises FATAL if the postmaster dies while waiting.
+ */
+WaitLSNResult
+WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
+{
+	XLogRecPtr	currentLSN;
+	TimestampTz endtime = 0;
+	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	/* Shouldn't be called when shmem isn't initialized */
+	Assert(waitLSNState);
+
+	/*
+	 * Should have a valid proc number.  procInfos has a slot for auxiliary
+	 * processes as well, see WaitLSNShmemSize().
+	 */
+	Assert(MyProcNumber >= 0 &&
+		   MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
+
+	if (timeout > 0)
+	{
+		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
+		wake_events |= WL_TIMEOUT;
+	}
+
+	/*
+	 * Add our process to the waiters heap.  It might happen that target LSN
+	 * gets reached before we do.  The check at the beginning of the loop
+	 * below prevents the race condition.
+	 */
+	addLSNWaiter(targetLSN, lsnType);
+
+	for (;;)
+	{
+		int			rc;
+		long		delay_ms = -1;
+
+		if (lsnType == WAIT_LSN_TYPE_REPLAY)
+		{
+			currentLSN = GetXLogReplayRecPtr(NULL);
+
+			/*
+			 * Check that recovery is still in-progress.  This only makes
+			 * sense for replay waits: a flush wait runs on a primary, where
+			 * recovery is never in progress, so applying the check there
+			 * would make every flush wait return immediately.
+			 */
+			if (!RecoveryInProgress())
+			{
+				/*
+				 * Recovery was ended, but check if target LSN was already
+				 * reached.
+				 */
+				deleteLSNWaiter(lsnType);
+
+				if (PromoteIsTriggered() && targetLSN <= currentLSN)
+					return WAIT_LSN_RESULT_SUCCESS;
+				return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
+			}
+		}
+		else
+			currentLSN = GetFlushRecPtr(NULL);
+
+		/* Check if the waited LSN has been reached */
+		if (targetLSN <= currentLSN)
+			break;
+
+		if (timeout > 0)
+		{
+			delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
+			if (delay_ms <= 0)
+				break;
+		}
+
+		CHECK_FOR_INTERRUPTS();
+
+		rc = WaitLatch(MyLatch, wake_events, delay_ms,
+					   (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+
+		/*
+		 * Emergency bailout if postmaster has died.  This is to avoid the
+		 * necessity for manual cleanup of all postmaster children.
+		 */
+		if (rc & WL_POSTMASTER_DEATH)
+			ereport(FATAL,
+					errcode(ERRCODE_ADMIN_SHUTDOWN),
+					errmsg("terminating connection due to unexpected postmaster exit"),
+					errcontext("while waiting for LSN"));
+
+		if (rc & WL_LATCH_SET)
+			ResetLatch(MyLatch);
+	}
+
+	/*
+	 * Delete our process from the shared memory heap.  We might already be
+	 * deleted by the process that woke us up.  The 'inHeap' flags prevents us
+	 * from the double deletion.
+	 */
+	deleteLSNWaiter(lsnType);
+
+	/*
+	 * If we didn't reach the target LSN, we must be exited by timeout.
+	 */
+	if (targetLSN > currentLSN)
+		return WAIT_LSN_RESULT_TIMEOUT;
+
+	return WAIT_LSN_RESULT_SUCCESS;
+}
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2fa045e6b0f..10ffce8d174 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -24,6 +24,7 @@
 #include "access/twophase.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -150,6 +151,7 @@ CalculateShmemSize(int *num_semaphores)
 	size = add_size(size, InjectionPointShmemSize());
 	size = add_size(size, SlotSyncShmemSize());
 	size = add_size(size, AioShmemSize());
+	size = add_size(size, WaitLSNShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -343,6 +345,7 @@ CreateOrAttachShmemStructs(void)
 	WaitEventCustomShmemInit();
 	InjectionPointShmemInit();
 	AioShmemInit();
+	WaitLSNShmemInit();
 }
 
 /*
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..c1ac71ff7f2 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,6 +89,8 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
+WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_REPLAY	"Waiting for WAL replay to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
@@ -355,6 +357,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
+WaitLSN	"Waiting to read or update shared Wait-for-LSN state."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
new file mode 100644
index 00000000000..4dc328b1b07
--- /dev/null
+++ b/src/include/access/xlogwait.h
@@ -0,0 +1,98 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogwait.h
+ *	  Declarations for the routines that wait for WAL replay or WAL flush
+ *	  to reach a given LSN.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogwait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOG_WAIT_H
+#define XLOG_WAIT_H
+
+#include "access/xlogdefs.h"
+#include "lib/pairingheap.h"
+#include "port/atomics.h"
+#include "storage/procnumber.h"
+#include "storage/spin.h"
+#include "tcop/dest.h"
+
+/*
+ * Result statuses for WaitForLSN().
+ */
+typedef enum
+{
+	WAIT_LSN_RESULT_SUCCESS,	/* Target LSN is reached */
+	WAIT_LSN_RESULT_NOT_IN_RECOVERY,	/* Recovery ended before or during our
+										 * wait */
+	WAIT_LSN_RESULT_TIMEOUT		/* Timeout occurred */
+} WaitLSNResult;
+
+/*
+ * LSN type for waiting facility.  The values are used as array indexes
+ * (see the arrays sized by WAIT_LSN_TYPE_COUNT below), so they must stay
+ * dense and start at zero, with WAIT_LSN_TYPE_COUNT last.
+ */
+typedef enum WaitLSNType
+{
+	WAIT_LSN_TYPE_REPLAY = 0,	/* Waiting for replay on standby */
+	WAIT_LSN_TYPE_FLUSH = 1,	/* Waiting for flush on primary */
+	WAIT_LSN_TYPE_COUNT = 2
+} WaitLSNType;
+
+/*
+ * WaitLSNProcInfo - the shared memory structure representing information
+ * about the single process, which may wait for LSN operations.  An item of
+ * waitLSNState->procInfos array.
+ */
+typedef struct WaitLSNProcInfo
+{
+	/* LSN, which this process is waiting for */
+	XLogRecPtr	waitLSN;
+
+	/* Process to wake up once the waitLSN is reached */
+	ProcNumber	procno;
+
+	/* Heap membership flags, one per LSN type */
+	bool		inHeap[WAIT_LSN_TYPE_COUNT];
+
+	/* Heap nodes, one per LSN type */
+	pairingheap_node heapNode[WAIT_LSN_TYPE_COUNT];
+} WaitLSNProcInfo;
+
+/*
+ * WaitLSNState - the shared memory state for the LSN waiting facility.
+ */
+typedef struct WaitLSNState
+{
+	/*
+	 * The minimum LSN value some process is waiting for, one per LSN type.
+	 * Used for the fast-path check of whether any waiters need to be woken
+	 * up after replaying or flushing WAL.  Could be read lock-less.  Update
+	 * protected by WaitLSNLock.
+	 */
+	pg_atomic_uint64 minWaitedLSN[WAIT_LSN_TYPE_COUNT];
+
+	/*
+	 * Pairing heaps of waiting processes, one per LSN type, ordered by LSN
+	 * values (least LSN is on top).  Protected by WaitLSNLock.
+	 */
+	pairingheap waitersHeap[WAIT_LSN_TYPE_COUNT];
+
+	/*
+	 * An array with per-process information, indexed by the process number.
+	 * Protected by WaitLSNLock.
+	 */
+	WaitLSNProcInfo procInfos[FLEXIBLE_ARRAY_MEMBER];
+} WaitLSNState;
+
+
+extern PGDLLIMPORT WaitLSNState *waitLSNState;
+
+extern Size WaitLSNShmemSize(void);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
+extern void WaitLSNCleanup(void);
+extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,
+								int64 timeout);
+
+#endif							/* XLOG_WAIT_H */
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..5b0ce383408 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -85,6 +85,7 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, WaitLSN)
 
 /*
  * There also exist several built-in LWLock tranches.  As with the predefined
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2ca7b75af57..73df31344be 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3267,6 +3267,10 @@ WaitEventIO
 WaitEventIPC
 WaitEventSet
 WaitEventTimeout
+WaitLSNProcInfo
+WaitLSNResult
+WaitLSNState
+WaitLSNType
 WaitPMResult
 WalCloseMethod
 WalCompression |
