diff options
| -rw-r--r-- | doc/src/sgml/high-availability.sgml | 54 | ||||
| -rw-r--r-- | doc/src/sgml/ref/allfiles.sgml | 1 | ||||
| -rw-r--r-- | doc/src/sgml/ref/wait_for.sgml | 234 | ||||
| -rw-r--r-- | doc/src/sgml/reference.sgml | 1 | ||||
| -rw-r--r-- | src/backend/access/transam/xact.c | 6 | ||||
| -rw-r--r-- | src/backend/access/transam/xlog.c | 7 | ||||
| -rw-r--r-- | src/backend/access/transam/xlogrecovery.c | 11 | ||||
| -rw-r--r-- | src/backend/commands/Makefile | 3 | ||||
| -rw-r--r-- | src/backend/commands/meson.build | 1 | ||||
| -rw-r--r-- | src/backend/commands/wait.c | 212 | ||||
| -rw-r--r-- | src/backend/parser/gram.y | 33 | ||||
| -rw-r--r-- | src/backend/storage/lmgr/proc.c | 6 | ||||
| -rw-r--r-- | src/backend/tcop/pquery.c | 12 | ||||
| -rw-r--r-- | src/backend/tcop/utility.c | 22 | ||||
| -rw-r--r-- | src/include/commands/wait.h | 22 | ||||
| -rw-r--r-- | src/include/nodes/parsenodes.h | 8 | ||||
| -rw-r--r-- | src/include/parser/kwlist.h | 2 | ||||
| -rw-r--r-- | src/include/tcop/cmdtaglist.h | 1 | ||||
| -rw-r--r-- | src/test/recovery/meson.build | 3 | ||||
| -rw-r--r-- | src/test/recovery/t/049_wait_for_lsn.pl | 302 | ||||
| -rw-r--r-- | src/tools/pgindent/typedefs.list | 1 |
21 files changed, 931 insertions, 11 deletions
diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index b47d8b4106e..742deb037b7 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -1376,6 +1376,60 @@ synchronous_standby_names = 'ANY 2 (s1, s2, s3)' </sect3> </sect2> + <sect2 id="read-your-writes-consistency"> + <title>Read-Your-Writes Consistency</title> + + <para> + In asynchronous replication, there is always a short window where changes + on the primary may not yet be visible on the standby due to replication + lag. This can lead to inconsistencies when an application writes data on + the primary and then immediately issues a read query on the standby. + However, it is possible to address this without switching to synchronous + replication. + </para> + + <para> + To address this, PostgreSQL offers a mechanism for read-your-writes + consistency. The key idea is to ensure that a client sees its own writes + by synchronizing the WAL replay on the standby with the known point of + change on the primary. + </para> + + <para> + This is achieved by the following steps. After performing write + operations, the application retrieves the current WAL location using a + function call like this. + + <programlisting> +postgres=# SELECT pg_current_wal_insert_lsn(); +pg_current_wal_insert_lsn +-------------------- +0/306EE20 +(1 row) + </programlisting> + </para> + + <para> + The <acronym>LSN</acronym> obtained from the primary is then communicated + to the standby server. This can be managed at the application level or + via the connection pooler. On the standby, the application issues the + <xref linkend="sql-wait-for"/> command to block further processing until + the standby's WAL replay process reaches (or exceeds) the specified + <acronym>LSN</acronym>. 
+ + <programlisting> +postgres=# WAIT FOR LSN '0/306EE20'; + status +-------- + success +(1 row) + </programlisting> + Once the command returns a status of success, it guarantees that all + changes up to the provided <acronym>LSN</acronym> have been applied, + ensuring that subsequent read queries will reflect the latest updates. + </para> + </sect2> + <sect2 id="continuous-archiving-in-standby"> <title>Continuous Archiving in Standby</title> diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml index f5be638867a..e167406c744 100644 --- a/doc/src/sgml/ref/allfiles.sgml +++ b/doc/src/sgml/ref/allfiles.sgml @@ -188,6 +188,7 @@ Complete list of usable sgml source files in this directory. <!ENTITY update SYSTEM "update.sgml"> <!ENTITY vacuum SYSTEM "vacuum.sgml"> <!ENTITY values SYSTEM "values.sgml"> +<!ENTITY waitFor SYSTEM "wait_for.sgml"> <!-- applications and utilities --> <!ENTITY clusterdb SYSTEM "clusterdb.sgml"> diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml new file mode 100644 index 00000000000..3b8e842d1de --- /dev/null +++ b/doc/src/sgml/ref/wait_for.sgml @@ -0,0 +1,234 @@ +<!-- +doc/src/sgml/ref/wait_for.sgml +PostgreSQL documentation +--> + +<refentry id="sql-wait-for"> + <indexterm zone="sql-wait-for"> + <primary>WAIT FOR</primary> + </indexterm> + + <refmeta> + <refentrytitle>WAIT FOR</refentrytitle> + <manvolnum>7</manvolnum> + <refmiscinfo>SQL - Language Statements</refmiscinfo> + </refmeta> + + <refnamediv> + <refname>WAIT FOR</refname> + <refpurpose>wait for target <acronym>LSN</acronym> to be replayed, optionally with a timeout</refpurpose> + </refnamediv> + + <refsynopsisdiv> +<synopsis> +WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] 
) ] + +<phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase> + + TIMEOUT '<replaceable class="parameter">timeout</replaceable>' + NO_THROW +</synopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + + <para> + Waits until recovery replays <parameter>lsn</parameter>. + If no <parameter>timeout</parameter> is specified or it is set to + zero, this command waits indefinitely for the + <parameter>lsn</parameter>. + On timeout, or if the server is promoted before + <parameter>lsn</parameter> is reached, an error is emitted, + unless <literal>NO_THROW</literal> is specified in the WITH clause. + If <parameter>NO_THROW</parameter> is specified, then the command + doesn't throw errors. + </para> + + <para> + The possible return values are <literal>success</literal>, + <literal>timeout</literal>, and <literal>not in recovery</literal>. + </para> + </refsect1> + + <refsect1> + <title>Parameters</title> + + <variablelist> + <varlistentry> + <term><replaceable class="parameter">lsn</replaceable></term> + <listitem> + <para> + Specifies the target <acronym>LSN</acronym> to wait for. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>WITH ( <replaceable class="parameter">option</replaceable> [, ...] )</literal></term> + <listitem> + <para> + This clause specifies optional parameters for the wait operation. + The following parameters are supported: + + <variablelist> + <varlistentry> + <term><literal>TIMEOUT</literal> '<replaceable class="parameter">timeout</replaceable>'</term> + <listitem> + <para> + When specified and <parameter>timeout</parameter> is greater than zero, + the command waits until <parameter>lsn</parameter> is reached or + the specified <parameter>timeout</parameter> has elapsed. + </para> + <para> + The <parameter>timeout</parameter> might be given as integer number of + milliseconds. 
Also it might be given as a string literal with + an integer number of milliseconds or a number with a unit + (see <xref linkend="config-setting-names-values"/>). + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>NO_THROW</literal></term> + <listitem> + <para> + Specify to not throw an error in the case of timeout or + running on the primary. In this case the result status can be obtained from + the return value. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect1> + + <refsect1> + <title>Outputs</title> + + <variablelist> + <varlistentry> + <term><literal>success</literal></term> + <listitem> + <para> + This return value denotes that we have successfully reached + the target <parameter>lsn</parameter>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>timeout</literal></term> + <listitem> + <para> + This return value denotes that the timeout happened before reaching + the target <parameter>lsn</parameter>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>not in recovery</literal></term> + <listitem> + <para> + This return value denotes that the database server is not in a recovery + state. This might mean either the database server was not in recovery + at the moment of receiving the command, or it was promoted before + reaching the target <parameter>lsn</parameter>. + </para> + </listitem> + </varlistentry> + </variablelist> + </refsect1> + + <refsect1> + <title>Notes</title> + + <para> + The <command>WAIT FOR</command> command waits until + <parameter>lsn</parameter> has been replayed on the standby. + That is, after this command execution, the value returned by + <function>pg_last_wal_replay_lsn</function> should be greater than or equal + to the <parameter>lsn</parameter> value. This is useful to achieve + read-your-writes consistency while using an asynchronous replica for reads and + the primary for writes. 
In that case, the <acronym>lsn</acronym> of the last + modification should be stored on the client application side or the + connection pooler side. + </para> + + <para> + <command>WAIT FOR</command> command should be called on standby. + If a user runs <command>WAIT FOR</command> on primary, it + will error out unless <parameter>NO_THROW</parameter> is specified in the WITH clause. + However, if <command>WAIT FOR</command> is + called on primary promoted from standby and <literal>lsn</literal> + was already replayed, then the <command>WAIT FOR</command> command just + exits immediately. + </para> + +</refsect1> + + <refsect1> + <title>Examples</title> + + <para> + You can use <command>WAIT FOR</command> command to wait for + the <type>pg_lsn</type> value. For example, an application could update + the <literal>movie</literal> table and get the <acronym>lsn</acronym> after + changes just made. This example uses <function>pg_current_wal_insert_lsn</function> + on primary server to get the <acronym>lsn</acronym> given that + <varname>synchronous_commit</varname> could be set to + <literal>off</literal>. + + <programlisting> +postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama'; +UPDATE 100 +postgres=# SELECT pg_current_wal_insert_lsn(); +pg_current_wal_insert_lsn +-------------------- +0/306EE20 +(1 row) +</programlisting> + + Then an application could run <command>WAIT FOR</command> + with the <parameter>lsn</parameter> obtained from primary. After that the + changes made on primary should be guaranteed to be visible on replica. + +<programlisting> +postgres=# WAIT FOR LSN '0/306EE20'; + status +-------- + success +(1 row) +postgres=# SELECT * FROM movie WHERE genre = 'Drama'; + genre +------- +(0 rows) +</programlisting> + </para> + + <para> + If the target LSN is not reached before the timeout, the error is thrown. 
+ +<programlisting> +postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s'); +ERROR: timed out while waiting for target LSN 0/306EE20 to be replayed; current replay LSN 0/306EA60 +</programlisting> + </para> + + <para> + The same example uses <command>WAIT FOR</command> with + <parameter>NO_THROW</parameter> option. +<programlisting> +postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW); + status +-------- + timeout +(1 row) +</programlisting> + </para> + </refsect1> +</refentry> diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml index ff85ace83fc..2cf02c37b17 100644 --- a/doc/src/sgml/reference.sgml +++ b/doc/src/sgml/reference.sgml @@ -216,6 +216,7 @@ &update; &vacuum; &values; + &waitFor; </reference> diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2cf3d4e92b7..092e197eba3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -31,6 +31,7 @@ #include "access/xloginsert.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" @@ -2843,6 +2844,11 @@ AbortTransaction(void) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Clear wait information and command progress indicator */ pgstat_report_wait_end(); pgstat_progress_end_command(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9900e3e0179..7c959051e11 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -62,6 +62,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" @@ -6228,6 +6229,12 @@ StartupXLOG(void) LWLockRelease(ControlFileLock); /* + * Wake up all waiters for replay LSN. 
They need to report an error that + * recovery was ended before reaching the target LSN. + */ + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr); + + /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions() (see notes in lock_twophase_recover()) * and after switching SharedRecoveryState to RECOVERY_STATE_DONE so as diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 93c50831b26..550de6e4a59 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -40,6 +40,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/pg_control.h" #include "commands/tablespace.h" @@ -1838,6 +1839,16 @@ PerformWalRecovery(void) break; } + /* + * If we replayed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. 
+ */ + if (waitLSNState && + (XLogRecoveryCtl->lastReplayedEndRecPtr >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr); + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index cb2fbdc7c60..f99acfd2b4b 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -64,6 +64,7 @@ OBJS = \ vacuum.o \ vacuumparallel.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index dd4cde41d32..9f640ad4810 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -53,4 +53,5 @@ backend_sources += files( 'vacuumparallel.c', 'variable.c', 'view.c', + 'wait.c', ) diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c new file mode 100644 index 00000000000..67068a92dbf --- /dev/null +++ b/src/backend/commands/wait.c @@ -0,0 +1,212 @@ +/*------------------------------------------------------------------------- + * + * wait.c + * Implements WAIT FOR, which allows waiting for events such as + * time passing or LSN having been replayed on replica. 
+ * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/wait.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <math.h> + +#include "access/xlogrecovery.h" +#include "access/xlogwait.h" +#include "commands/defrem.h" +#include "commands/wait.h" +#include "executor/executor.h" +#include "parser/parse_node.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" + + +void +ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) +{ + XLogRecPtr lsn; + int64 timeout = 0; + WaitLSNResult waitLSNResult; + bool throw = true; + TupleDesc tupdesc; + TupOutputState *tstate; + const char *result = "<unset>"; + bool timeout_specified = false; + bool no_throw_specified = false; + + /* Parse and validate the mandatory LSN */ + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(stmt->lsn_literal))); + + foreach_node(DefElem, defel, stmt->options) + { + if (strcmp(defel->defname, "timeout") == 0) + { + char *timeout_str; + const char *hintmsg; + double result; + + if (timeout_specified) + errorConflictingDefElem(defel, pstate); + timeout_specified = true; + + timeout_str = defGetString(defel); + + if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg)) + { + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid timeout value: \"%s\"", timeout_str), + hintmsg ? errhint("%s", _(hintmsg)) : 0); + } + + /* + * Get rid of any fractional part in the input. This is so we + * don't fail on just-out-of-range values that would round into + * range. 
+ */ + result = rint(result); + + /* Range check */ + if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result))) + ereport(ERROR, + errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("timeout value is out of range")); + + if (result < 0) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("timeout cannot be negative")); + + timeout = (int64) result; + } + else if (strcmp(defel->defname, "no_throw") == 0) + { + if (no_throw_specified) + errorConflictingDefElem(defel, pstate); + + no_throw_specified = true; + + throw = !defGetBoolean(defel); + } + else + { + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("option \"%s\" not recognized", + defel->defname), + parser_errposition(pstate, defel->location)); + } + } + + /* + * We are going to wait for the LSN replay. We should first care that we + * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. + * Otherwise, our snapshot could prevent the replay of WAL records + * implying a kind of self-deadlock. This is the reason why WAIT FOR is a + * command, not a procedure or function. + * + * At first, we should check there is no active snapshot. According to + * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is + * processed with a snapshot. Thankfully, we can pop this snapshot, + * because PortalRunUtility() can tolerate this. + */ + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + + /* + * At second, invalidate a catalog snapshot if any. And we should be done + * with the preparation. + */ + InvalidateCatalogSnapshot(); + + /* Give up if there is still an active or registered snapshot. 
*/ + if (HaveRegisteredOrActiveSnapshot()) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("WAIT FOR must be only called without an active or registered snapshot"), + errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED.")); + + /* + * As a result, we should hold no snapshot, and correspondingly our xmin + * should be unset. + */ + Assert(MyProc->xmin == InvalidTransactionId); + + waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout); + + /* + * Process the result of WaitForLSN(). Throw appropriate error if + * needed. + */ + switch (waitLSNResult) + { + case WAIT_LSN_RESULT_SUCCESS: + /* Nothing to do on success */ + result = "success"; + break; + + case WAIT_LSN_RESULT_TIMEOUT: + if (throw) + ereport(ERROR, + errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + else + result = "timeout"; + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + if (throw) + { + if (PromoteIsTriggered()) + { + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + } + else + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for the replay LSN can only be executed during recovery.")); + } + else + result = "not in recovery"; + break; + } + + /* need a tuple descriptor representing a single TEXT column */ + tupdesc = WaitStmtResultDesc(stmt); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + /* Send it */ + do_text_output_oneline(tstate, 
result); + + end_tup_output(tstate); +} + +TupleDesc +WaitStmtResultDesc(WaitStmt *stmt) +{ + TupleDesc tupdesc; + + /* Need a tuple descriptor representing a single TEXT column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + return tupdesc; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 8a0470d5b84..57fe0186547 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -308,7 +308,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt UnlistenStmt UpdateStmt VacuumStmt VariableResetStmt VariableSetStmt VariableShowStmt - ViewStmt CheckPointStmt CreateConversionStmt + ViewStmt WaitStmt CheckPointStmt CreateConversionStmt DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt @@ -325,6 +325,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <boolean> opt_concurrently %type <dbehavior> opt_drop_behavior %type <list> opt_utility_option_list +%type <list> opt_wait_with_clause %type <list> utility_option_list %type <defelt> utility_option_elem %type <str> utility_option_name @@ -678,7 +679,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); json_object_constructor_null_clause_opt json_array_constructor_null_clause_opt - /* * Non-keyword token types. These are hard-wired into the "flex" lexer. 
* They must be listed first so that their numeric codes do not depend on @@ -748,7 +748,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LABEL LANGUAGE LARGE_P LAST_P LATERAL_P LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL - LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED + LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN_P MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE MERGE_ACTION METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE @@ -792,7 +792,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING VERBOSE VERSION_P VIEW VIEWS VIRTUAL VOLATILE - WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE + WAIT WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE @@ -1120,6 +1120,7 @@ stmt: | VariableSetStmt | VariableShowStmt | ViewStmt + | WaitStmt | /*EMPTY*/ { $$ = NULL; } ; @@ -16482,6 +16483,26 @@ xml_passing_mech: | BY VALUE_P ; +/***************************************************************************** + * + * WAIT FOR LSN + * + *****************************************************************************/ + +WaitStmt: + WAIT FOR LSN_P Sconst opt_wait_with_clause + { + WaitStmt *n = makeNode(WaitStmt); + n->lsn_literal = $4; + n->options = $5; + $$ = (Node *) n; + } + ; + +opt_wait_with_clause: + WITH '(' utility_option_list ')' { $$ = $3; } + | /*EMPTY*/ { $$ = NIL; } + ; /* * Aggregate decoration clauses @@ -17969,6 +17990,7 @@ unreserved_keyword: | LOCK_P | LOCKED | LOGGED + | LSN_P | MAPPING | MATCH | MATCHED @@ -18139,6 +18161,7 @@ unreserved_keyword: | VIEWS | VIRTUAL | VOLATILE + | WAIT | WHITESPACE_P | WITHIN | WITHOUT @@ -18585,6 +18608,7 @@ bare_label_keyword: | LOCK_P | LOCKED | LOGGED + | LSN_P | MAPPING | MATCH | MATCHED @@ 
-18796,6 +18820,7 @@ bare_label_keyword: | VIEWS | VIRTUAL | VOLATILE + | WAIT | WHEN | WHITESPACE_P | WORK diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 96f29aafc39..26b201eadb8 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -36,6 +36,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -947,6 +948,11 @@ ProcKill(int code, Datum arg) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Cancel any pending condition variable sleep, too */ ConditionVariableCancelSleep(); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 74179139fa9..fde78c55160 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -1158,10 +1158,11 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt, MemoryContextSwitchTo(portal->portalContext); /* - * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from - * under us, so don't complain if it's now empty. Otherwise, our snapshot - * should be the top one; pop it. Note that this could be a different - * snapshot from the one we made above; see EnsurePortalSnapshotExists. + * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot + * stack from under us, so don't complain if it's now empty. Otherwise, + * our snapshot should be the top one; pop it. Note that this could be a + * different snapshot from the one we made above; see + * EnsurePortalSnapshotExists. 
*/ if (portal->portalSnapshot != NULL && ActiveSnapshotSet()) { @@ -1738,7 +1739,8 @@ PlannedStmtRequiresSnapshot(PlannedStmt *pstmt) IsA(utilityStmt, ListenStmt) || IsA(utilityStmt, NotifyStmt) || IsA(utilityStmt, UnlistenStmt) || - IsA(utilityStmt, CheckPointStmt)) + IsA(utilityStmt, CheckPointStmt) || + IsA(utilityStmt, WaitStmt)) return false; return true; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 918db53dd5e..082967c0a86 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -56,6 +56,7 @@ #include "commands/user.h" #include "commands/vacuum.h" #include "commands/view.h" +#include "commands/wait.h" #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" @@ -266,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree) case T_PrepareStmt: case T_UnlistenStmt: case T_VariableSetStmt: + case T_WaitStmt: { /* * These modify only backend-local state, so they're OK to run @@ -1055,6 +1057,12 @@ standard_ProcessUtility(PlannedStmt *pstmt, break; } + case T_WaitStmt: + { + ExecWaitStmt(pstate, (WaitStmt *) parsetree, dest); + } + break; + default: /* All other statement types have event trigger support */ ProcessUtilitySlow(pstate, pstmt, queryString, @@ -2059,6 +2067,9 @@ UtilityReturnsTuples(Node *parsetree) case T_VariableShowStmt: return true; + case T_WaitStmt: + return true; + default: return false; } @@ -2114,6 +2125,9 @@ UtilityTupleDescriptor(Node *parsetree) return GetPGVariableResultDesc(n->name); } + case T_WaitStmt: + return WaitStmtResultDesc((WaitStmt *) parsetree); + default: return NULL; } @@ -3091,6 +3105,10 @@ CreateCommandTag(Node *parsetree) } break; + case T_WaitStmt: + tag = CMDTAG_WAIT; + break; + /* already-planned queries */ case T_PlannedStmt: { @@ -3689,6 +3707,10 @@ GetCommandLogLevel(Node *parsetree) lev = LOGSTMT_DDL; break; + case T_WaitStmt: + lev = LOGSTMT_ALL; + break; + /* already-planned queries */ case T_PlannedStmt: { diff --git 
a/src/include/commands/wait.h b/src/include/commands/wait.h new file mode 100644 index 00000000000..ce332134fb3 --- /dev/null +++ b/src/include/commands/wait.h @@ -0,0 +1,22 @@ +/*------------------------------------------------------------------------- + * + * wait.h + * prototypes for commands/wait.c + * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * src/include/commands/wait.h + * + *------------------------------------------------------------------------- + */ +#ifndef WAIT_H +#define WAIT_H + +#include "nodes/parsenodes.h" +#include "parser/parse_node.h" +#include "tcop/dest.h" + +extern void ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest); +extern TupleDesc WaitStmtResultDesc(WaitStmt *stmt); + +#endif /* WAIT_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index ecbddd12e1b..d14294a4ece 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4385,4 +4385,12 @@ typedef struct DropSubscriptionStmt DropBehavior behavior; /* RESTRICT or CASCADE behavior */ } DropSubscriptionStmt; +typedef struct WaitStmt +{ + NodeTag type; + char *lsn_literal; /* LSN string from grammar */ + List *options; /* List of DefElem nodes */ +} WaitStmt; + + #endif /* PARSENODES_H */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 84182eaaae2..5d4fe27ef96 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -270,6 +270,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("lsn", LSN_P, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD, BARE_LABEL) @@ -496,6 +497,7 
@@ PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("virtual", VIRTUAL, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL) PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h index d250a714d59..c4606d65043 100644 --- a/src/include/tcop/cmdtaglist.h +++ b/src/include/tcop/cmdtaglist.h @@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false) PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false) PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true) PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false) +PG_CMDTAG(CMDTAG_WAIT, "WAIT", false, false, false) diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 52993c32dbb..523a5cd5b52 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -56,7 +56,8 @@ tests += { 't/045_archive_restartpoint.pl', 't/046_checkpoint_logical_slot.pl', 't/047_checkpoint_physical_slot.pl', - 't/048_vacuum_horizon_floor.pl' + 't/048_vacuum_horizon_floor.pl', + 't/049_wait_for_lsn.pl', ], }, } diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl new file mode 100644 index 00000000000..e0ddb06a2f0 --- /dev/null +++ b/src/test/recovery/t/049_wait_for_lsn.pl @@ -0,0 +1,302 @@ +# Checks waiting for the LSN replay on standby using +# the WAIT FOR command. 
+use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize primary node +my $node_primary = PostgreSQL::Test::Cluster->new('primary'); +$node_primary->init(allows_streaming => 1); +$node_primary->start; + +# Add some content and take a backup +$node_primary->safe_psql('postgres', + "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a"); +my $backup_name = 'my_backup'; +$node_primary->backup($backup_name); + +# Create a streaming standby with a 1 second delay from the backup +my $node_standby = PostgreSQL::Test::Cluster->new('standby'); +my $delay = 1; +$node_standby->init_from_backup($node_primary, $backup_name, + has_streaming => 1); +$node_standby->append_conf( + 'postgresql.conf', qq[ + recovery_min_apply_delay = '${delay}s' +]); +$node_standby->start; + +# 1. Make sure that WAIT FOR works: add new content to +# primary and memorize primary's insert LSN, then wait for that LSN to be +# replayed on standby. +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(11, 20))"); +my $lsn1 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +my $output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn1}' WITH (timeout '1d'); + SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn); +]); + +# Make sure the current LSN on standby is at least as big as the LSN we +# observed on the primary before. +ok((split("\n", $output))[-1] >= 0, + "standby reached the same LSN as primary after WAIT FOR"); + +# 2. 
Check that new data is visible after calling WAIT FOR +$node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (generate_series(21, 30))"); +my $lsn2 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn2}'; + SELECT count(*) FROM wait_test; +]); + +# Make sure the count(*) on standby reflects the recent changes on primary +ok((split("\n", $output))[-1] eq 30, + "standby reached the same LSN as primary"); + +# 3. Check that waiting for an unreachable LSN triggers the timeout. The +# unreachable LSN must be far enough ahead that WAL records issued by a +# concurrent autovacuum cannot reach it. +my $lsn3 = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn() + 10000000000"); +my $stderr; +$node_standby->safe_psql('postgres', + "WAIT FOR LSN '${lsn2}' WITH (timeout '10ms');"); +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${lsn3}' WITH (timeout '1000ms');", + stderr => \$stderr); +ok( $stderr =~ /timed out while waiting for target LSN/, + "get timeout on waiting for unreachable LSN"); + +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn2}' WITH (timeout '0.1s', no_throw);]); +ok($output eq "success", + "WAIT FOR returns correct status after successful waiting"); +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]); +ok($output eq "timeout", "WAIT FOR returns correct status after timeout"); + +# 4. Check that WAIT FOR triggers an error if called on primary, +# within another function, or inside a transaction with an isolation level +# higher than READ COMMITTED. 
+ +$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';", + stderr => \$stderr); +ok( $stderr =~ /recovery is not in progress/, + "get an error when running on the primary"); + +$node_standby->psql( + 'postgres', + "BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT 1; WAIT FOR LSN '${lsn3}';", + stderr => \$stderr); +ok( $stderr =~ + /WAIT FOR must be only called without an active or registered snapshot/, + "get an error when running in a transaction with an isolation level higher than REPEATABLE READ" +); + +$node_primary->safe_psql( + 'postgres', qq[ +CREATE FUNCTION pg_wal_replay_wait_wrap(target_lsn pg_lsn) RETURNS void AS \$\$ + BEGIN + EXECUTE format('WAIT FOR LSN %L;', target_lsn); + END +\$\$ +LANGUAGE plpgsql; +]); + +$node_primary->wait_for_catchup($node_standby); +$node_standby->psql( + 'postgres', + "SELECT pg_wal_replay_wait_wrap('${lsn3}');", + stderr => \$stderr); +ok( $stderr =~ + /WAIT FOR must be only called without an active or registered snapshot/, + "get an error when running within another function"); + +# 5. 
Check parameter validation error cases on standby before promotion +my $test_lsn = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); + +# Test negative timeout +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' WITH (timeout '-1000ms');", + stderr => \$stderr); +ok($stderr =~ /timeout cannot be negative/, "get error for negative timeout"); + +# Test unknown parameter with WITH clause +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' WITH (unknown_param 'value');", + stderr => \$stderr); +ok($stderr =~ /option "unknown_param" not recognized/, + "get error for unknown parameter"); + +# Test duplicate TIMEOUT parameter with WITH clause +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' WITH (timeout '1000', timeout '2000');", + stderr => \$stderr); +ok( $stderr =~ /conflicting or redundant options/, + "get error for duplicate TIMEOUT parameter"); + +# Test duplicate NO_THROW parameter with WITH clause +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' WITH (no_throw, no_throw);", + stderr => \$stderr); +ok( $stderr =~ /conflicting or redundant options/, + "get error for duplicate NO_THROW parameter"); + +# Test syntax error - options without WITH keyword +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' (timeout '100ms');", + stderr => \$stderr); +ok($stderr =~ /syntax error/, + "get syntax error when options specified without WITH keyword"); + +# Test syntax error - missing LSN +$node_standby->psql('postgres', "WAIT FOR TIMEOUT 1000;", stderr => \$stderr); +ok($stderr =~ /syntax error/, "get syntax error for missing LSN"); + +# Test invalid LSN format +$node_standby->psql( + 'postgres', + "WAIT FOR LSN 'invalid_lsn';", + stderr => \$stderr); +ok($stderr =~ /invalid input syntax for type pg_lsn/, + "get error for invalid LSN format"); + +# Test invalid timeout format +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' WITH (timeout 'invalid');", + 
stderr => \$stderr); +ok($stderr =~ /invalid timeout value/, + "get error for invalid timeout format"); + +# Test new WITH clause syntax +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn2}' WITH (timeout '0.1s', no_throw);]); +ok($output eq "success", "WAIT FOR WITH clause syntax works correctly"); + +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn3}' WITH (timeout 100, no_throw);]); +ok($output eq "timeout", + "WAIT FOR WITH clause returns correct timeout status"); + +# Test WITH clause error case - invalid option +$node_standby->psql( + 'postgres', + "WAIT FOR LSN '${test_lsn}' WITH (invalid_option 'value');", + stderr => \$stderr); +ok( $stderr =~ /option "invalid_option" not recognized/, + "get error for invalid WITH clause option"); + +# 6. Check the scenario of multiple LSN waiters. We make 5 background +# psql sessions each waiting for a corresponding insertion. When waiting is +# finished, a stored function logs a message if at least as many rows are +# visible as expected. 
+$node_primary->safe_psql( + 'postgres', qq[ +CREATE FUNCTION log_count(i int) RETURNS void AS \$\$ + DECLARE + count int; + BEGIN + SELECT count(*) FROM wait_test INTO count; + IF count >= 31 + i THEN + RAISE LOG 'count %', i; + END IF; + END +\$\$ +LANGUAGE plpgsql; +]); +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();"); +my @psql_sessions; +for (my $i = 0; $i < 5; $i++) +{ + $node_primary->safe_psql('postgres', + "INSERT INTO wait_test VALUES (${i});"); + my $lsn = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn()"); + $psql_sessions[$i] = $node_standby->background_psql('postgres'); + $psql_sessions[$i]->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${lsn}'; + SELECT log_count(${i}); + ]); +} +my $log_offset = -s $node_standby->logfile; +$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();"); +for (my $i = 0; $i < 5; $i++) +{ + $node_standby->wait_for_log("count ${i}", $log_offset); + $psql_sessions[$i]->quit; +} + +ok(1, 'multiple LSN waiters reported consistent data'); + +# 7. Check that the standby promotion terminates the wait on LSN. Start +# waiting for an unreachable LSN then promote. Check the log for the relevant +# error message. Also, check that waiting for already replayed LSN doesn't +# cause an error even after promotion. +my $lsn4 = + $node_primary->safe_psql('postgres', + "SELECT pg_current_wal_insert_lsn() + 10000000000"); +my $lsn5 = + $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()"); +my $psql_session = $node_standby->background_psql('postgres'); +$psql_session->query_until( + qr/start/, qq[ + \\echo start + WAIT FOR LSN '${lsn4}'; +]); + +# Make sure standby will be promoted at least at the primary insert LSN we +# have just observed. Use pg_switch_wal() to force the insert LSN to be +# written then wait for standby to catchup. 
+$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();'); +$node_primary->wait_for_catchup($node_standby); + +$log_offset = -s $node_standby->logfile; +$node_standby->promote; +$node_standby->wait_for_log('recovery is not in progress', $log_offset); + +ok(1, 'got error after standby promote'); + +$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';"); + +ok(1, 'wait for already replayed LSN exits immediately even after promotion'); + +$output = $node_standby->safe_psql( + 'postgres', qq[ + WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]); +ok($output eq "not in recovery", + "WAIT FOR returns correct status after standby promotion"); + + +$node_standby->stop; +$node_primary->stop; + +# If we send \q with $psql_session->quit the command can be sent to the session +# already closed. So \q is in initial script, here we only finish IPC::Run. +$psql_session->{run}->finish; + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 73df31344be..432509277c9 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3272,6 +3272,7 @@ WaitLSNState WaitLSNProcInfo WaitLSNResult WaitPMResult +WaitStmt WalCloseMethod WalCompression WalInsertClass |
