diff options
Diffstat (limited to 'src/backend/commands')
| -rw-r--r-- | src/backend/commands/Makefile | 3 | ||||
| -rw-r--r-- | src/backend/commands/meson.build | 1 | ||||
| -rw-r--r-- | src/backend/commands/wait.c | 212 |
3 files changed, 215 insertions, 1 deletions
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index cb2fbdc7c60..f99acfd2b4b 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -64,6 +64,7 @@ OBJS = \ vacuum.o \ vacuumparallel.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index dd4cde41d32..9f640ad4810 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -53,4 +53,5 @@ backend_sources += files( 'vacuumparallel.c', 'variable.c', 'view.c', + 'wait.c', ) diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c new file mode 100644 index 00000000000..67068a92dbf --- /dev/null +++ b/src/backend/commands/wait.c @@ -0,0 +1,212 @@ +/*------------------------------------------------------------------------- + * + * wait.c + * Implements WAIT FOR, which allows waiting for events such as + * time passing or LSN having been replayed on replica. + * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/wait.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <math.h> + +#include "access/xlogrecovery.h" +#include "access/xlogwait.h" +#include "commands/defrem.h" +#include "commands/wait.h" +#include "executor/executor.h" +#include "parser/parse_node.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" + + +void +ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) +{ + XLogRecPtr lsn; + int64 timeout = 0; + WaitLSNResult waitLSNResult; + bool throw = true; + TupleDesc tupdesc; + TupOutputState *tstate; + const char *result = "<unset>"; + bool timeout_specified = false; + bool no_throw_specified = false; + + /* Parse and validate the mandatory LSN */ + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(stmt->lsn_literal))); + + foreach_node(DefElem, defel, stmt->options) + { + if (strcmp(defel->defname, "timeout") == 0) + { + char *timeout_str; + const char *hintmsg; + double result; + + if (timeout_specified) + errorConflictingDefElem(defel, pstate); + timeout_specified = true; + + timeout_str = defGetString(defel); + + if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg)) + { + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid timeout value: \"%s\"", timeout_str), + hintmsg ? errhint("%s", _(hintmsg)) : 0); + } + + /* + * Get rid of any fractional part in the input. This is so we + * don't fail on just-out-of-range values that would round into + * range. + */ + result = rint(result); + + /* Range check */ + if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result))) + ereport(ERROR, + errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("timeout value is out of range")); + + if (result < 0) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("timeout cannot be negative")); + + timeout = (int64) result; + } + else if (strcmp(defel->defname, "no_throw") == 0) + { + if (no_throw_specified) + errorConflictingDefElem(defel, pstate); + + no_throw_specified = true; + + throw = !defGetBoolean(defel); + } + else + { + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("option \"%s\" not recognized", + defel->defname), + parser_errposition(pstate, defel->location)); + } + } + + /* + * We are going to wait for the LSN replay. We should first care that we + * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. + * Otherwise, our snapshot could prevent the replay of WAL records + * implying a kind of self-deadlock. This is the reason why WAIT FOR is a + * command, not a procedure or function. + * + * At first, we should check there is no active snapshot. According to + * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is + * processed with a snapshot. Thankfully, we can pop this snapshot, + * because PortalRunUtility() can tolerate this. + */ + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + + /* + * At second, invalidate a catalog snapshot if any. And we should be done + * with the preparation. + */ + InvalidateCatalogSnapshot(); + + /* Give up if there is still an active or registered snapshot. */ + if (HaveRegisteredOrActiveSnapshot()) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("WAIT FOR must be only called without an active or registered snapshot"), + errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED.")); + + /* + * As the result we should hold no snapshot, and correspondingly our xmin + * should be unset. + */ + Assert(MyProc->xmin == InvalidTransactionId); + + waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout); + + /* + * Process the result of WaitForLSNReplay(). Throw appropriate error if + * needed. + */ + switch (waitLSNResult) + { + case WAIT_LSN_RESULT_SUCCESS: + /* Nothing to do on success */ + result = "success"; + break; + + case WAIT_LSN_RESULT_TIMEOUT: + if (throw) + ereport(ERROR, + errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + else + result = "timeout"; + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + if (throw) + { + if (PromoteIsTriggered()) + { + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + } + else + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for the replay LSN can only be executed during recovery.")); + } + else + result = "not in recovery"; + break; + } + + /* need a tuple descriptor representing a single TEXT column */ + tupdesc = WaitStmtResultDesc(stmt); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + /* Send it */ + do_text_output_oneline(tstate, result); + + end_tup_output(tstate); +} + +TupleDesc +WaitStmtResultDesc(WaitStmt *stmt) +{ + TupleDesc tupdesc; + + /* Need a tuple descriptor representing a single TEXT column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + return tupdesc; +} |
