summaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/access/transam/xact.c6
-rw-r--r--src/backend/access/transam/xlog.c7
-rw-r--r--src/backend/access/transam/xlogrecovery.c11
-rw-r--r--src/backend/commands/Makefile3
-rw-r--r--src/backend/commands/meson.build1
-rw-r--r--src/backend/commands/wait.c212
-rw-r--r--src/backend/parser/gram.y33
-rw-r--r--src/backend/storage/lmgr/proc.c6
-rw-r--r--src/backend/tcop/pquery.c12
-rw-r--r--src/backend/tcop/utility.c22
10 files changed, 303 insertions, 10 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 2cf3d4e92b7..092e197eba3 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -31,6 +31,7 @@
#include "access/xloginsert.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_enum.h"
@@ -2843,6 +2844,11 @@ AbortTransaction(void)
*/
LWLockReleaseAll();
+ /*
+ * Cleanup waiting for LSN if any.
+ */
+ WaitLSNCleanup();
+
/* Clear wait information and command progress indicator */
pgstat_report_wait_end();
pgstat_progress_end_command();
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9900e3e0179..7c959051e11 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -62,6 +62,7 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
@@ -6228,6 +6229,12 @@ StartupXLOG(void)
LWLockRelease(ControlFileLock);
/*
+ * Wake up all waiters for replay LSN. They need to report an error that
+ * recovery was ended before reaching the target LSN.
+ */
+ WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+
+ /*
* Shutdown the recovery environment. This must occur after
* RecoverPreparedTransactions() (see notes in lock_twophase_recover())
* and after switching SharedRecoveryState to RECOVERY_STATE_DONE so as
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 93c50831b26..550de6e4a59 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -40,6 +40,7 @@
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "backup/basebackup.h"
#include "catalog/pg_control.h"
#include "commands/tablespace.h"
@@ -1838,6 +1839,16 @@ PerformWalRecovery(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for then walk
+ * over the shared memory array and set latches to notify the
+ * waiters.
+ */
+ if (waitLSNState &&
+ (XLogRecoveryCtl->lastReplayedEndRecPtr >=
+ pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
+ WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index cb2fbdc7c60..f99acfd2b4b 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -64,6 +64,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index dd4cde41d32..9f640ad4810 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -53,4 +53,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'wait.c',
)
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..67068a92dbf
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,212 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows waiting for events such as
+ * time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "commands/defrem.h"
+#include "commands/wait.h"
+#include "executor/executor.h"
+#include "parser/parse_node.h"
+#include "storage/proc.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+
+void
+ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
+{
+ XLogRecPtr lsn;
+ int64 timeout = 0;
+ WaitLSNResult waitLSNResult;
+ bool throw = true;
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ const char *result = "<unset>";
+ bool timeout_specified = false;
+ bool no_throw_specified = false;
+
+ /* Parse and validate the mandatory LSN */
+ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(stmt->lsn_literal)));
+
+ foreach_node(DefElem, defel, stmt->options)
+ {
+ if (strcmp(defel->defname, "timeout") == 0)
+ {
+ char *timeout_str;
+ const char *hintmsg;
+ double result;
+
+ if (timeout_specified)
+ errorConflictingDefElem(defel, pstate);
+ timeout_specified = true;
+
+ timeout_str = defGetString(defel);
+
+ if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid timeout value: \"%s\"", timeout_str),
+ hintmsg ? errhint("%s", _(hintmsg)) : 0);
+ }
+
+ /*
+ * Get rid of any fractional part in the input. This is so we
+ * don't fail on just-out-of-range values that would round into
+ * range.
+ */
+ result = rint(result);
+
+ /* Range check */
+ if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
+ ereport(ERROR,
+ errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("timeout value is out of range"));
+
+ if (result < 0)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("timeout cannot be negative"));
+
+ timeout = (int64) result;
+ }
+ else if (strcmp(defel->defname, "no_throw") == 0)
+ {
+ if (no_throw_specified)
+ errorConflictingDefElem(defel, pstate);
+
+ no_throw_specified = true;
+
+ throw = !defGetBoolean(defel);
+ }
+ else
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("option \"%s\" not recognized",
+ defel->defname),
+ parser_errposition(pstate, defel->location));
+ }
+ }
+
+ /*
+ * We are going to wait for the LSN replay. We should first care that we
+ * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+ * Otherwise, our snapshot could prevent the replay of WAL records
+ * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
+ * command, not a procedure or function.
+ *
+ * At first, we should check there is no active snapshot. According to
+ * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
+ * processed with a snapshot. Thankfully, we can pop this snapshot,
+ * because PortalRunUtility() can tolerate this.
+ */
+ if (ActiveSnapshotSet())
+ PopActiveSnapshot();
+
+ /*
+ * At second, invalidate a catalog snapshot if any. And we should be done
+ * with the preparation.
+ */
+ InvalidateCatalogSnapshot();
+
+ /* Give up if there is still an active or registered snapshot. */
+ if (HaveRegisteredOrActiveSnapshot())
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("WAIT FOR must be only called without an active or registered snapshot"),
+ errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED."));
+
+ /*
+ * As the result we should hold no snapshot, and correspondingly our xmin
+ * should be unset.
+ */
+ Assert(MyProc->xmin == InvalidTransactionId);
+
+ waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+
+ /*
+ * Process the result of WaitForLSNReplay(). Throw appropriate error if
+ * needed.
+ */
+ switch (waitLSNResult)
+ {
+ case WAIT_LSN_RESULT_SUCCESS:
+ /* Nothing to do on success */
+ result = "success";
+ break;
+
+ case WAIT_LSN_RESULT_TIMEOUT:
+ if (throw)
+ ereport(ERROR,
+ errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ else
+ result = "timeout";
+ break;
+
+ case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
+ if (throw)
+ {
+ if (PromoteIsTriggered())
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ }
+ else
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for the replay LSN can only be executed during recovery."));
+ }
+ else
+ result = "not in recovery";
+ break;
+ }
+
+ /* need a tuple descriptor representing a single TEXT column */
+ tupdesc = WaitStmtResultDesc(stmt);
+
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+ /* Send it */
+ do_text_output_oneline(tstate, result);
+
+ end_tup_output(tstate);
+}
+
+TupleDesc
+WaitStmtResultDesc(WaitStmt *stmt)
+{
+ TupleDesc tupdesc;
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0);
+ return tupdesc;
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 8a0470d5b84..57fe0186547 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -308,7 +308,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt
UnlistenStmt UpdateStmt VacuumStmt
VariableResetStmt VariableSetStmt VariableShowStmt
- ViewStmt CheckPointStmt CreateConversionStmt
+ ViewStmt WaitStmt CheckPointStmt CreateConversionStmt
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
@@ -325,6 +325,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <boolean> opt_concurrently
%type <dbehavior> opt_drop_behavior
%type <list> opt_utility_option_list
+%type <list> opt_wait_with_clause
%type <list> utility_option_list
%type <defelt> utility_option_elem
%type <str> utility_option_name
@@ -678,7 +679,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
json_object_constructor_null_clause_opt
json_array_constructor_null_clause_opt
-
/*
* Non-keyword token types. These are hard-wired into the "flex" lexer.
* They must be listed first so that their numeric codes do not depend on
@@ -748,7 +748,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN_P
MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE MERGE_ACTION METHOD
MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -792,7 +792,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VIRTUAL VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAIT WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES
XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE
@@ -1120,6 +1120,7 @@ stmt:
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -16482,6 +16483,26 @@ xml_passing_mech:
| BY VALUE_P
;
+/*****************************************************************************
+ *
+ * WAIT FOR LSN
+ *
+ *****************************************************************************/
+
+WaitStmt:
+ WAIT FOR LSN_P Sconst opt_wait_with_clause
+ {
+ WaitStmt *n = makeNode(WaitStmt);
+ n->lsn_literal = $4;
+ n->options = $5;
+ $$ = (Node *) n;
+ }
+ ;
+
+opt_wait_with_clause:
+ WITH '(' utility_option_list ')' { $$ = $3; }
+ | /*EMPTY*/ { $$ = NIL; }
+ ;
/*
* Aggregate decoration clauses
@@ -17969,6 +17990,7 @@ unreserved_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN_P
| MAPPING
| MATCH
| MATCHED
@@ -18139,6 +18161,7 @@ unreserved_keyword:
| VIEWS
| VIRTUAL
| VOLATILE
+ | WAIT
| WHITESPACE_P
| WITHIN
| WITHOUT
@@ -18585,6 +18608,7 @@ bare_label_keyword:
| LOCK_P
| LOCKED
| LOGGED
+ | LSN_P
| MAPPING
| MATCH
| MATCHED
@@ -18796,6 +18820,7 @@ bare_label_keyword:
| VIEWS
| VIRTUAL
| VOLATILE
+ | WAIT
| WHEN
| WHITESPACE_P
| WORK
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 96f29aafc39..26b201eadb8 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -36,6 +36,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xlogutils.h"
+#include "access/xlogwait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -947,6 +948,11 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /*
+ * Cleanup waiting for LSN if any.
+ */
+ WaitLSNCleanup();
+
/* Cancel any pending condition variable sleep, too */
ConditionVariableCancelSleep();
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 74179139fa9..fde78c55160 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -1158,10 +1158,11 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt,
MemoryContextSwitchTo(portal->portalContext);
/*
- * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from
- * under us, so don't complain if it's now empty. Otherwise, our snapshot
- * should be the top one; pop it. Note that this could be a different
- * snapshot from the one we made above; see EnsurePortalSnapshotExists.
+ * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot
+ * stack from under us, so don't complain if it's now empty. Otherwise,
+ * our snapshot should be the top one; pop it. Note that this could be a
+ * different snapshot from the one we made above; see
+ * EnsurePortalSnapshotExists.
*/
if (portal->portalSnapshot != NULL && ActiveSnapshotSet())
{
@@ -1738,7 +1739,8 @@ PlannedStmtRequiresSnapshot(PlannedStmt *pstmt)
IsA(utilityStmt, ListenStmt) ||
IsA(utilityStmt, NotifyStmt) ||
IsA(utilityStmt, UnlistenStmt) ||
- IsA(utilityStmt, CheckPointStmt))
+ IsA(utilityStmt, CheckPointStmt) ||
+ IsA(utilityStmt, WaitStmt))
return false;
return true;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 918db53dd5e..082967c0a86 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -56,6 +56,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -266,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree)
case T_PrepareStmt:
case T_UnlistenStmt:
case T_VariableSetStmt:
+ case T_WaitStmt:
{
/*
* These modify only backend-local state, so they're OK to run
@@ -1055,6 +1057,12 @@ standard_ProcessUtility(PlannedStmt *pstmt,
break;
}
+ case T_WaitStmt:
+ {
+ ExecWaitStmt(pstate, (WaitStmt *) parsetree, dest);
+ }
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(pstate, pstmt, queryString,
@@ -2059,6 +2067,9 @@ UtilityReturnsTuples(Node *parsetree)
case T_VariableShowStmt:
return true;
+ case T_WaitStmt:
+ return true;
+
default:
return false;
}
@@ -2114,6 +2125,9 @@ UtilityTupleDescriptor(Node *parsetree)
return GetPGVariableResultDesc(n->name);
}
+ case T_WaitStmt:
+ return WaitStmtResultDesc((WaitStmt *) parsetree);
+
default:
return NULL;
}
@@ -3091,6 +3105,10 @@ CreateCommandTag(Node *parsetree)
}
break;
+ case T_WaitStmt:
+ tag = CMDTAG_WAIT;
+ break;
+
/* already-planned queries */
case T_PlannedStmt:
{
@@ -3689,6 +3707,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_DDL;
break;
+ case T_WaitStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
/* already-planned queries */
case T_PlannedStmt:
{