summaryrefslogtreecommitdiff
path: root/src/backend/commands
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands')
-rw-r--r--src/backend/commands/Makefile3
-rw-r--r--src/backend/commands/meson.build1
-rw-r--r--src/backend/commands/wait.c212
3 files changed, 215 insertions, 1 deletions
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index cb2fbdc7c60..f99acfd2b4b 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -64,6 +64,7 @@ OBJS = \
vacuum.o \
vacuumparallel.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build
index dd4cde41d32..9f640ad4810 100644
--- a/src/backend/commands/meson.build
+++ b/src/backend/commands/meson.build
@@ -53,4 +53,5 @@ backend_sources += files(
'vacuumparallel.c',
'variable.c',
'view.c',
+ 'wait.c',
)
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
new file mode 100644
index 00000000000..67068a92dbf
--- /dev/null
+++ b/src/backend/commands/wait.c
@@ -0,0 +1,212 @@
+/*-------------------------------------------------------------------------
+ *
+ * wait.c
+ * Implements WAIT FOR, which allows waiting for events such as
+ * time passing or LSN having been replayed on replica.
+ *
+ * Portions Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/commands/wait.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
+#include "commands/defrem.h"
+#include "commands/wait.h"
+#include "executor/executor.h"
+#include "parser/parse_node.h"
+#include "storage/proc.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+
+
+void
+ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
+{
+ XLogRecPtr lsn;
+ int64 timeout = 0;
+ WaitLSNResult waitLSNResult;
+ bool throw = true;
+ TupleDesc tupdesc;
+ TupOutputState *tstate;
+ const char *result = "<unset>";
+ bool timeout_specified = false;
+ bool no_throw_specified = false;
+
+ /* Parse and validate the mandatory LSN */
+ lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
+ CStringGetDatum(stmt->lsn_literal)));
+
+ foreach_node(DefElem, defel, stmt->options)
+ {
+ if (strcmp(defel->defname, "timeout") == 0)
+ {
+ char *timeout_str;
+ const char *hintmsg;
+ double result;
+
+ if (timeout_specified)
+ errorConflictingDefElem(defel, pstate);
+ timeout_specified = true;
+
+ timeout_str = defGetString(defel);
+
+ if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("invalid timeout value: \"%s\"", timeout_str),
+ hintmsg ? errhint("%s", _(hintmsg)) : 0);
+ }
+
+ /*
+ * Get rid of any fractional part in the input. This is so we
+ * don't fail on just-out-of-range values that would round into
+ * range.
+ */
+ result = rint(result);
+
+ /* Range check */
+ if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
+ ereport(ERROR,
+ errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("timeout value is out of range"));
+
+ if (result < 0)
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("timeout cannot be negative"));
+
+ timeout = (int64) result;
+ }
+ else if (strcmp(defel->defname, "no_throw") == 0)
+ {
+ if (no_throw_specified)
+ errorConflictingDefElem(defel, pstate);
+
+ no_throw_specified = true;
+
+ throw = !defGetBoolean(defel);
+ }
+ else
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("option \"%s\" not recognized",
+ defel->defname),
+ parser_errposition(pstate, defel->location));
+ }
+ }
+
+ /*
+ * We are going to wait for the LSN replay. We should first care that we
+ * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+ * Otherwise, our snapshot could prevent the replay of WAL records
+ * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
+ * command, not a procedure or function.
+ *
+ * At first, we should check there is no active snapshot. According to
+ * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
+ * processed with a snapshot. Thankfully, we can pop this snapshot,
+ * because PortalRunUtility() can tolerate this.
+ */
+ if (ActiveSnapshotSet())
+ PopActiveSnapshot();
+
+ /*
+ * At second, invalidate a catalog snapshot if any. And we should be done
+ * with the preparation.
+ */
+ InvalidateCatalogSnapshot();
+
+ /* Give up if there is still an active or registered snapshot. */
+ if (HaveRegisteredOrActiveSnapshot())
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("WAIT FOR must be only called without an active or registered snapshot"),
+ errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED."));
+
+ /*
+ * As the result we should hold no snapshot, and correspondingly our xmin
+ * should be unset.
+ */
+ Assert(MyProc->xmin == InvalidTransactionId);
+
+ waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+
+ /*
+ * Process the result of WaitForLSNReplay(). Throw appropriate error if
+ * needed.
+ */
+ switch (waitLSNResult)
+ {
+ case WAIT_LSN_RESULT_SUCCESS:
+ /* Nothing to do on success */
+ result = "success";
+ break;
+
+ case WAIT_LSN_RESULT_TIMEOUT:
+ if (throw)
+ ereport(ERROR,
+ errcode(ERRCODE_QUERY_CANCELED),
+ errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ else
+ result = "timeout";
+ break;
+
+ case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
+ if (throw)
+ {
+ if (PromoteIsTriggered())
+ {
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
+ LSN_FORMAT_ARGS(lsn),
+ LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+ }
+ else
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("recovery is not in progress"),
+ errhint("Waiting for the replay LSN can only be executed during recovery."));
+ }
+ else
+ result = "not in recovery";
+ break;
+ }
+
+ /* need a tuple descriptor representing a single TEXT column */
+ tupdesc = WaitStmtResultDesc(stmt);
+
+ /* prepare for projection of tuples */
+ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
+
+ /* Send it */
+ do_text_output_oneline(tstate, result);
+
+ end_tup_output(tstate);
+}
+
+TupleDesc
+WaitStmtResultDesc(WaitStmt *stmt)
+{
+ TupleDesc tupdesc;
+
+ /* Need a tuple descriptor representing a single TEXT column */
+ tupdesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
+ TEXTOID, -1, 0);
+ return tupdesc;
+}