summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xact.c6
-rw-r--r--src/backend/access/transam/xlog.c7
-rw-r--r--src/backend/access/transam/xlogrecovery.c11
-rw-r--r--src/backend/commands/Makefile3
-rw-r--r--src/backend/commands/meson.build1
-rw-r--r--src/backend/commands/wait.c212
-rw-r--r--src/backend/parser/gram.y33
-rw-r--r--src/backend/storage/lmgr/proc.c6
-rw-r--r--src/backend/tcop/pquery.c12
-rw-r--r--src/backend/tcop/utility.c22
-rw-r--r--src/include/commands/wait.h22
-rw-r--r--src/include/nodes/parsenodes.h8
-rw-r--r--src/include/parser/kwlist.h2
-rw-r--r--src/include/tcop/cmdtaglist.h1
-rw-r--r--src/test/recovery/meson.build3
-rw-r--r--src/test/recovery/t/049_wait_for_lsn.pl302
-rw-r--r--src/tools/pgindent/typedefs.list1
17 files changed, 641 insertions, 11 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 2cf3d4e92b7..092e197eba3 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -31,6 +31,7 @@
#include "access/xloginsert.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_enum.h"
@@ -2843,6 +2844,11 @@ AbortTransaction(void)
*/
LWLockReleaseAll();
+ /*
+ * Cleanup waiting for LSN if any.
+ */
+ WaitLSNCleanup();
+
/* Clear wait information and command progress indicator */
pgstat_report_wait_end();
pgstat_progress_end_command();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9900e3e0179..7c959051e11 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -62,6 +62,7 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
@@ -6228,6 +6229,12 @@ StartupXLOG(void)
LWLockRelease(ControlFileLock);
/*
+ * Wake up all waiters for replay LSN. They need to report an error that
+ * recovery was ended before reaching the target LSN.
+ */
+ WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+
+ /*
* Shutdown the recovery environment. This must occur after
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())
* and after switching SharedRecoveryState to RECOVERY_STATE_DONE so as
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 93c50831b26..550de6e4a59 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -40,6 +40,7 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
@@ -1838,6 +1839,16 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for then walk
+ * over the shared memory array and set latches to notify the
+ * waiters.
+ */
+ if (waitLSNState &&
+ (XLogRecoveryCtl->lastReplayedEndRecPtr >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index cb2fbdc7c60..f99acfd2b4b 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -64,6 +64,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index dd4cde41d32..9f640ad4810 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -53,4 +53,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'wait.c',
)
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..67068a92dbf
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,212 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows waiting for events such as
+ * time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "commands/defrem.h"
+#include "commands/wait.h"
+#include "executor/executor.h"
+#include "parser/parse_node.h"
+#include "storage/proc.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+
+void
+ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
+{
+ XLogRecPtr lsn;
+ int64 timeout = 0;
+ WaitLSNResult waitLSNResult;
+ bool throw = true;
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ const char *result = "<unset>";
+ bool timeout_specified = false;
+ bool no_throw_specified = false;
+
+ /* Parse and validate the mandatory LSN */
+ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(stmt->lsn_literal)));
+
+ foreach_node(DefElem, defel, stmt->options)
+ {
+ if (strcmp(defel->defname, "timeout") == 0)
+ {
+ char *timeout_str;
+ const char *hintmsg;
+ double result;
+
+ if (timeout_specified)
+ errorConflictingDefElem(defel, pstate);
+ timeout_specified = true;
+
+ timeout_str = defGetString(defel);
+
+ if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid timeout value: \"%s\"", timeout_str),
+ hintmsg ? errhint("%s", _(hintmsg)) : 0);
+ }
+
+ /*
+ * Get rid of any fractional part in the input. This is so we
+ * don't fail on just-out-of-range values that would round into
+ * range.
+ */
+ result = rint(result);
+
+ /* Range check */
+ if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
+ ereport(ERROR,
+ errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("timeout value is out of range"));
+
+ if (result < 0)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("timeout cannot be negative"));
+
+ timeout = (int64) result;
+ }
+ else if (strcmp(defel->defname, "no_throw") == 0)
+ {
+ if (no_throw_specified)
+ errorConflictingDefElem(defel, pstate);
+
+ no_throw_specified = true;
+
+ throw = !defGetBoolean(defel);
+ }
+ else
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("option \"%s\" not recognized",
+ defel->defname),
+ parser_errposition(pstate, defel->location));
+ }
+ }
+
+ /*
+ * We are going to wait for the LSN replay. We should first care that we
+ * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+ * Otherwise, our snapshot could prevent the replay of WAL records
+ * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
+ * command, not a procedure or function.
+ *
+ * At first, we should check there is no active snapshot. According to
+ * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
+ * processed with a snapshot. Thankfully, we can pop this snapshot,
+ * because PortalRunUtility() can tolerate this.
+ */
+ if (ActiveSnapshotSet())
+ PopActiveSnapshot();
+
+ /*
+ * At second, invalidate a catalog snapshot if any. And we should be done
+ * with the preparation.
+ */
+ InvalidateCatalogSnapshot();
+
+ /* Give up if there is still an active or registered snapshot. */
+ if (HaveRegisteredOrActiveSnapshot())
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("WAIT FOR must be only called without an active or registered snapshot"),
+ errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED."));
+
+ /*
+ * As the result we should hold no snapshot, and correspondingly our xmin
+ * should be unset.
+ */
+ Assert(MyProc->xmin == InvalidTransactionId);
+
+ waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+
+ /*
+ * Process the result of WaitForLSNReplay(). Throw appropriate error if
+ * needed.
+ */
+ switch (waitLSNResult)
+ {
+ case WAIT_LSN_RESULT_SUCCESS:
+ /* Nothing to do on success */
+ result = "success";
+ break;
+
+ case WAIT_LSN_RESULT_TIMEOUT:
+ if (throw)
+ ereport(ERROR,
+ errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ else
+ result = "timeout";
+ break;
+
+ case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
+ if (throw)
+ {
+ if (PromoteIsTriggered())
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ }
+ else
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for the replay LSN can only be executed during recovery."));
+ }
+ else
+ result = "not in recovery";
+ break;
+ }
+
+ /* need a tuple descriptor representing a single TEXT column */
+ tupdesc = WaitStmtResultDesc(stmt);
+
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+ /* Send it */
+ do_text_output_oneline(tstate, result);
+
+ end_tup_output(tstate);
+}
+
+TupleDesc
+WaitStmtResultDesc(WaitStmt *stmt)
+{
+ TupleDesc tupdesc;
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0);
+ return tupdesc;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 8a0470d5b84..57fe0186547 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -308,7 +308,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -325,6 +325,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <boolean> opt_concurrently
%type <dbehavior> opt_drop_behavior
%type <list> opt_utility_option_list
+%type <list> opt_wait_with_clause
%type <list> utility_option_list
%type <defelt> utility_option_elem
%type <str> utility_option_name
@@ -678,7 +679,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
json_object_constructor_null_clause_opt
json_array_constructor_null_clause_opt
-
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
* They must be listed first so that their numeric codes do not depend on
@@ -748,7 +748,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN_P
MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE MERGE_ACTION METHOD
MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -792,7 +792,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VIRTUAL VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -1120,6 +1120,7 @@ stmt:
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -16482,6 +16483,26 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * WAIT FOR LSN
+ *
+ *****************************************************************************/
+
+WaitStmt:
+ WAIT FOR LSN_P Sconst opt_wait_with_clause
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn_literal = $4;
+ n->options = $5;
+ $$ = (Node *) n;
+ }
+ ;
+
+opt_wait_with_clause:
+ WITH '(' utility_option_list ')' { $$ = $3; }
+ | /*EMPTY*/ { $$ = NIL; }
+ ;
/*
* Aggregate decoration clauses
@@ -17969,6 +17990,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN_P
| MAPPING
| MATCH
| MATCHED
@@ -18139,6 +18161,7 @@ unreserved_keyword:
| VIEWS
| VIRTUAL
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
@@ -18585,6 +18608,7 @@ bare_label_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN_P
| MAPPING
| MATCH
| MATCHED
@@ -18796,6 +18820,7 @@ bare_label_keyword:
| VIEWS
| VIRTUAL
| VOLATILE
+ | WAIT
| WHEN
| WHITESPACE_P
| WORK
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 96f29aafc39..26b201eadb8 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -947,6 +948,11 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /*
+ * Cleanup waiting for LSN if any.
+ */
+ WaitLSNCleanup();
+
/* Cancel any pending condition variable sleep, too */
ConditionVariableCancelSleep();
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 74179139fa9..fde78c55160 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1158,10 +1158,11 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
MemoryContextSwitchTo(portal->portalContext);
/*
- * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
- * under us, so don't complain if it's now empty. Otherwise, our snapshot
- * should be the top one; pop it. Note that this could be a different
- * snapshot from the one we made above; see EnsurePortalSnapshotExists.
+ * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot
+ * stack from under us, so don't complain if it's now empty. Otherwise,
+ * our snapshot should be the top one; pop it. Note that this could be a
+ * different snapshot from the one we made above; see
+ * EnsurePortalSnapshotExists.
*/
if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
{
@@ -1738,7 +1739,8 @@ PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
IsA(utilityStmt, ListenStmt) ||
IsA(utilityStmt, NotifyStmt) ||
IsA(utilityStmt, UnlistenStmt) ||
- IsA(utilityStmt, CheckPointStmt))
+ IsA(utilityStmt, CheckPointStmt) ||
+ IsA(utilityStmt, WaitStmt))
return false;
return true;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 918db53dd5e..082967c0a86 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -266,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_PrepareStmt:
case T_UnlistenStmt:
case T_VariableSetStmt:
+ case T_WaitStmt:
{
/*
* These modify only backend-local state, so they're OK to run
@@ -1055,6 +1057,12 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ ExecWaitStmt(pstate, (WaitStmt *) parsetree, dest);
+ }
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2059,6 +2067,9 @@ UtilityReturnsTuples(Node *parsetree)
case T_VariableShowStmt:
return true;
+ case T_WaitStmt:
+ return true;
+
default:
return false;
}
@@ -2114,6 +2125,9 @@ UtilityTupleDescriptor(Node *parsetree)
return GetPGVariableResultDesc(n->name);
}
+ case T_WaitStmt:
+ return WaitStmtResultDesc((WaitStmt *) parsetree);
+
default:
return NULL;
}
@@ -3091,6 +3105,10 @@ CreateCommandTag(Node *parsetree)
}
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
/* already-planned queries */
case T_PlannedStmt:
{
@@ -3689,6 +3707,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_DDL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
/* already-planned queries */
case T_PlannedStmt:
{
diff --git a/src/include/commands/wait.h b/src/include/commands/wait.h
new file mode 100644
index 00000000000..ce332134fb3
--- /dev/null
+++ b/src/include/commands/wait.h
@@ -0,0 +1,22 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.h
+ * prototypes for commands/wait.c
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * src/include/commands/wait.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAIT_H
+#define WAIT_H
+
+#include "nodes/parsenodes.h"
+#include "parser/parse_node.h"
+#include "tcop/dest.h"
+
+extern void ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest);
+extern TupleDesc WaitStmtResultDesc(WaitStmt *stmt);
+
+#endif /* WAIT_H */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index ecbddd12e1b..d14294a4ece 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4385,4 +4385,12 @@ typedef struct DropSubscriptionStmt
DropBehavior behavior; /* RESTRICT or CASCADE behavior */
} DropSubscriptionStmt;
+typedef struct WaitStmt
+{
+ NodeTag type;
+ char *lsn_literal; /* LSN string from grammar */
+ List *options; /* List of DefElem nodes */
+} WaitStmt;
+
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 84182eaaae2..5d4fe27ef96 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -270,6 +270,7 @@ PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("lsn", LSN_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("matched", MATCHED, UNRESERVED_KEYWORD, BARE_LABEL)
@@ -496,6 +497,7 @@ PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("virtual", VIRTUAL, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("wait", WAIT, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/include/tcop/cmdtaglist.h b/src/include/tcop/cmdtaglist.h
index d250a714d59..c4606d65043 100644
--- a/src/include/tcop/cmdtaglist.h
+++ b/src/include/tcop/cmdtaglist.h
@@ -217,3 +217,4 @@ PG_CMDTAG(CMDTAG_TRUNCATE_TABLE, "TRUNCATE TABLE", false, false, false)
PG_CMDTAG(CMDTAG_UNLISTEN, "UNLISTEN", false, false, false)
PG_CMDTAG(CMDTAG_UPDATE, "UPDATE", false, false, true)
PG_CMDTAG(CMDTAG_VACUUM, "VACUUM", false, false, false)
+PG_CMDTAG(CMDTAG_WAIT, "WAIT", false, false, false)
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 52993c32dbb..523a5cd5b52 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -56,7 +56,8 @@ tests += {
't/045_archive_restartpoint.pl',
't/046_checkpoint_logical_slot.pl',
't/047_checkpoint_physical_slot.pl',
- 't/048_vacuum_horizon_floor.pl'
+ 't/048_vacuum_horizon_floor.pl',
+ 't/049_wait_for_lsn.pl',
],
},
}
diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl
new file mode 100644
index 00000000000..e0ddb06a2f0
--- /dev/null
+++ b/src/test/recovery/t/049_wait_for_lsn.pl
@@ -0,0 +1,302 @@
+# Checks waiting for the LSN replay on standby using
+# the WAIT FOR command.
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize primary node
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# And some content and take a backup
+$node_primary->safe_psql('postgres',
+ "CREATE TABLE wait_test AS SELECT generate_series(1,10) AS a");
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Create a streaming standby with a 1 second delay from the backup
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $delay = 1;
+$node_standby->init_from_backup($node_primary, $backup_name,
+ has_streaming => 1);
+$node_standby->append_conf(
+ 'postgresql.conf', qq[
+ recovery_min_apply_delay = '${delay}s'
+]);
+$node_standby->start;
+
+# 1. Make sure that WAIT FOR works: add new content to
+# primary and memorize primary's insert LSN, then wait for that LSN to be
+# replayed on standby.
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(11, 20))");
+my $lsn1 =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn1}' WITH (timeout '1d');
+ SELECT pg_lsn_cmp(pg_last_wal_replay_lsn(), '${lsn1}'::pg_lsn);
+]);
+
+# Make sure the current LSN on standby is at least as big as the LSN we
+# observed on primary's before.
+ok((split("\n", $output))[-1] >= 0,
+ "standby reached the same LSN as primary after WAIT FOR");
+
+# 2. Check that new data is visible after calling WAIT FOR
+$node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (generate_series(21, 30))");
+my $lsn2 =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn2}';
+ SELECT count(*) FROM wait_test;
+]);
+
+# Make sure the count(*) on standby reflects the recent changes on primary
+ok((split("\n", $output))[-1] eq 30,
+ "standby reached the same LSN as primary");
+
+# 3. Check that waiting for unreachable LSN triggers the timeout. The
+# unreachable LSN must be well in advance. So WAL records issued by
+# the concurrent autovacuum could not affect that.
+my $lsn3 =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $stderr;
+$node_standby->safe_psql('postgres',
+ "WAIT FOR LSN '${lsn2}' WITH (timeout '10ms');");
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${lsn3}' WITH (timeout '1000ms');",
+ stderr => \$stderr);
+ok( $stderr =~ /timed out while waiting for target LSN/,
+ "get timeout on waiting for unreachable LSN");
+
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn2}' WITH (timeout '0.1s', no_throw);]);
+ok($output eq "success",
+ "WAIT FOR returns correct status after successful waiting");
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]);
+ok($output eq "timeout", "WAIT FOR returns correct status after timeout");
+
+# 4. Check that WAIT FOR triggers an error if called on primary,
+# within another function, or inside a transaction with an isolation level
+# higher than READ COMMITTED.
+
+$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';",
+ stderr => \$stderr);
+ok( $stderr =~ /recovery is not in progress/,
+ "get an error when running on the primary");
+
+$node_standby->psql(
+ 'postgres',
+ "BEGIN ISOLATION LEVEL REPEATABLE READ; SELECT 1; WAIT FOR LSN '${lsn3}';",
+ stderr => \$stderr);
+ok( $stderr =~
+ /WAIT FOR must be only called without an active or registered snapshot/,
+ "get an error when running in a transaction with an isolation level higher than REPEATABLE READ"
+);
+
+$node_primary->safe_psql(
+ 'postgres', qq[
+CREATE FUNCTION pg_wal_replay_wait_wrap(target_lsn pg_lsn) RETURNS void AS \$\$
+ BEGIN
+ EXECUTE format('WAIT FOR LSN %L;', target_lsn);
+ END
+\$\$
+LANGUAGE plpgsql;
+]);
+
+$node_primary->wait_for_catchup($node_standby);
+$node_standby->psql(
+ 'postgres',
+ "SELECT pg_wal_replay_wait_wrap('${lsn3}');",
+ stderr => \$stderr);
+ok( $stderr =~
+ /WAIT FOR must be only called without an active or registered snapshot/,
+ "get an error when running within another function");
+
+# 5. Check parameter validation error cases on standby before promotion
+my $test_lsn =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+# Test negative timeout
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' WITH (timeout '-1000ms');",
+ stderr => \$stderr);
+ok($stderr =~ /timeout cannot be negative/, "get error for negative timeout");
+
+# Test unknown parameter with WITH clause
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' WITH (unknown_param 'value');",
+ stderr => \$stderr);
+ok($stderr =~ /option "unknown_param" not recognized/,
+ "get error for unknown parameter");
+
+# Test duplicate TIMEOUT parameter with WITH clause
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' WITH (timeout '1000', timeout '2000');",
+ stderr => \$stderr);
+ok( $stderr =~ /conflicting or redundant options/,
+ "get error for duplicate TIMEOUT parameter");
+
+# Test duplicate NO_THROW parameter with WITH clause
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' WITH (no_throw, no_throw);",
+ stderr => \$stderr);
+ok( $stderr =~ /conflicting or redundant options/,
+ "get error for duplicate NO_THROW parameter");
+
+# Test syntax error - options without WITH keyword
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' (timeout '100ms');",
+ stderr => \$stderr);
+ok($stderr =~ /syntax error/,
+ "get syntax error when options specified without WITH keyword");
+
+# Test syntax error - missing LSN
+$node_standby->psql('postgres', "WAIT FOR TIMEOUT 1000;", stderr => \$stderr);
+ok($stderr =~ /syntax error/, "get syntax error for missing LSN");
+
+# Test invalid LSN format
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN 'invalid_lsn';",
+ stderr => \$stderr);
+ok($stderr =~ /invalid input syntax for type pg_lsn/,
+ "get error for invalid LSN format");
+
+# Test invalid timeout format
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' WITH (timeout 'invalid');",
+ stderr => \$stderr);
+ok($stderr =~ /invalid timeout value/,
+ "get error for invalid timeout format");
+
+# Test new WITH clause syntax
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn2}' WITH (timeout '0.1s', no_throw);]);
+ok($output eq "success", "WAIT FOR WITH clause syntax works correctly");
+
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn3}' WITH (timeout 100, no_throw);]);
+ok($output eq "timeout",
+ "WAIT FOR WITH clause returns correct timeout status");
+
+# Test WITH clause error case - invalid option
+$node_standby->psql(
+ 'postgres',
+ "WAIT FOR LSN '${test_lsn}' WITH (invalid_option 'value');",
+ stderr => \$stderr);
+ok( $stderr =~ /option "invalid_option" not recognized/,
+ "get error for invalid WITH clause option");
+
+# 6. Check the scenario of multiple LSN waiters. We make 5 background
+# psql sessions each waiting for a corresponding insertion. When waiting is
+# finished, stored procedures logs if there are visible as many rows as
+# should be.
+$node_primary->safe_psql(
+ 'postgres', qq[
+CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
+ DECLARE
+ count int;
+ BEGIN
+ SELECT count(*) FROM wait_test INTO count;
+ IF count >= 31 + i THEN
+ RAISE LOG 'count %', i;
+ END IF;
+ END
+\$\$
+LANGUAGE plpgsql;
+]);
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+my @psql_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_primary->safe_psql('postgres',
+ "INSERT INTO wait_test VALUES (${i});");
+ my $lsn =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn()");
+ $psql_sessions[$i] = $node_standby->background_psql('postgres');
+ $psql_sessions[$i]->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${lsn}';
+ SELECT log_count(${i});
+ ]);
+}
+my $log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+for (my $i = 0; $i < 5; $i++)
+{
+ $node_standby->wait_for_log("count ${i}", $log_offset);
+ $psql_sessions[$i]->quit;
+}
+
+ok(1, 'multiple LSN waiters reported consistent data');
+
+# 7. Check that the standby promotion terminates the wait on LSN. Start
+# waiting for an unreachable LSN then promote. Check the log for the relevant
+# error message. Also, check that waiting for already replayed LSN doesn't
+# cause an error even after promotion.
+my $lsn4 =
+ $node_primary->safe_psql('postgres',
+ "SELECT pg_current_wal_insert_lsn() + 10000000000");
+my $lsn5 =
+ $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+my $psql_session = $node_standby->background_psql('postgres');
+$psql_session->query_until(
+ qr/start/, qq[
+ \\echo start
+ WAIT FOR LSN '${lsn4}';
+]);
+
+# Make sure standby will be promoted at least at the primary insert LSN we
+# have just observed. Use pg_switch_wal() to force the insert LSN to be
+# written then wait for standby to catchup.
+$node_primary->safe_psql('postgres', 'SELECT pg_switch_wal();');
+$node_primary->wait_for_catchup($node_standby);
+
+$log_offset = -s $node_standby->logfile;
+$node_standby->promote;
+$node_standby->wait_for_log('recovery is not in progress', $log_offset);
+
+ok(1, 'got error after standby promote');
+
+$node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
+
+ok(1, 'wait for already replayed LSN exits immediately even after promotion');
+
+$output = $node_standby->safe_psql(
+ 'postgres', qq[
+ WAIT FOR LSN '${lsn4}' WITH (timeout '10ms', no_throw);]);
+ok($output eq "not in recovery",
+ "WAIT FOR returns correct status after standby promotion");
+
+
+$node_standby->stop;
+$node_primary->stop;
+
+# If we send \q with $psql_session->quit the command can be sent to the session
+# already closed. So \q is in initial script, here we only finish IPC::Run.
+$psql_session->{run}->finish;
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 73df31344be..432509277c9 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3272,6 +3272,7 @@ WaitLSNState
WaitLSNProcInfo
WaitLSNResult
WaitPMResult
+WaitStmt
WalCloseMethod
WalCompression
WalInsertClass