diff options
Diffstat (limited to 'src/backend')
| -rw-r--r-- | src/backend/access/transam/xact.c | 6 | ||||
| -rw-r--r-- | src/backend/access/transam/xlog.c | 7 | ||||
| -rw-r--r-- | src/backend/access/transam/xlogrecovery.c | 11 | ||||
| -rw-r--r-- | src/backend/commands/Makefile | 3 | ||||
| -rw-r--r-- | src/backend/commands/meson.build | 1 | ||||
| -rw-r--r-- | src/backend/commands/wait.c | 212 | ||||
| -rw-r--r-- | src/backend/parser/gram.y | 33 | ||||
| -rw-r--r-- | src/backend/storage/lmgr/proc.c | 6 | ||||
| -rw-r--r-- | src/backend/tcop/pquery.c | 12 | ||||
| -rw-r--r-- | src/backend/tcop/utility.c | 22 |
10 files changed, 303 insertions, 10 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2cf3d4e92b7..092e197eba3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -31,6 +31,7 @@ #include "access/xloginsert.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "catalog/index.h" #include "catalog/namespace.h" #include "catalog/pg_enum.h" @@ -2843,6 +2844,11 @@ AbortTransaction(void) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Clear wait information and command progress indicator */ pgstat_report_wait_end(); pgstat_progress_end_command(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9900e3e0179..7c959051e11 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -62,6 +62,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" @@ -6228,6 +6229,12 @@ StartupXLOG(void) LWLockRelease(ControlFileLock); /* + * Wake up all waiters for replay LSN. They need to report an error that + * recovery was ended before reaching the target LSN. + */ + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr); + + /* * Shutdown the recovery environment. This must occur after * RecoverPreparedTransactions() (see notes in lock_twophase_recover()) * and after switching SharedRecoveryState to RECOVERY_STATE_DONE so as diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 93c50831b26..550de6e4a59 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -40,6 +40,7 @@ #include "access/xlogreader.h" #include "access/xlogrecovery.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "backup/basebackup.h" #include "catalog/pg_control.h" #include "commands/tablespace.h" @@ -1838,6 +1839,16 @@ PerformWalRecovery(void) break; } + /* + * If we replayed an LSN that someone was waiting for then walk + * over the shared memory array and set latches to notify the + * waiters. + */ + if (waitLSNState && + (XLogRecoveryCtl->lastReplayedEndRecPtr >= + pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY]))) + WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr); + /* Else, try to fetch the next WAL record */ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI); } while (record != NULL); diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile index cb2fbdc7c60..f99acfd2b4b 100644 --- a/src/backend/commands/Makefile +++ b/src/backend/commands/Makefile @@ -64,6 +64,7 @@ OBJS = \ vacuum.o \ vacuumparallel.o \ variable.o \ - view.o + view.o \ + wait.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index dd4cde41d32..9f640ad4810 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -53,4 +53,5 @@ backend_sources += files( 'vacuumparallel.c', 'variable.c', 'view.c', + 'wait.c', ) diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c new file mode 100644 index 00000000000..67068a92dbf --- /dev/null +++ b/src/backend/commands/wait.c @@ -0,0 +1,212 @@ +/*------------------------------------------------------------------------- + * + * wait.c + * Implements WAIT FOR, which allows waiting for events such as + * time passing or LSN having been replayed on replica. + * + * Portions Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/commands/wait.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <math.h> + +#include "access/xlogrecovery.h" +#include "access/xlogwait.h" +#include "commands/defrem.h" +#include "commands/wait.h" +#include "executor/executor.h" +#include "parser/parse_node.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/pg_lsn.h" +#include "utils/snapmgr.h" + + +void +ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest) +{ + XLogRecPtr lsn; + int64 timeout = 0; + WaitLSNResult waitLSNResult; + bool throw = true; + TupleDesc tupdesc; + TupOutputState *tstate; + const char *result = "<unset>"; + bool timeout_specified = false; + bool no_throw_specified = false; + + /* Parse and validate the mandatory LSN */ + lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, + CStringGetDatum(stmt->lsn_literal))); + + foreach_node(DefElem, defel, stmt->options) + { + if (strcmp(defel->defname, "timeout") == 0) + { + char *timeout_str; + const char *hintmsg; + double result; + + if (timeout_specified) + errorConflictingDefElem(defel, pstate); + timeout_specified = true; + + timeout_str = defGetString(defel); + + if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg)) + { + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid timeout value: \"%s\"", timeout_str), + hintmsg ? errhint("%s", _(hintmsg)) : 0); + } + + /* + * Get rid of any fractional part in the input. This is so we + * don't fail on just-out-of-range values that would round into + * range. + */ + result = rint(result); + + /* Range check */ + if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result))) + ereport(ERROR, + errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("timeout value is out of range")); + + if (result < 0) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("timeout cannot be negative")); + + timeout = (int64) result; + } + else if (strcmp(defel->defname, "no_throw") == 0) + { + if (no_throw_specified) + errorConflictingDefElem(defel, pstate); + + no_throw_specified = true; + + throw = !defGetBoolean(defel); + } + else + { + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("option \"%s\" not recognized", + defel->defname), + parser_errposition(pstate, defel->location)); + } + } + + /* + * We are going to wait for the LSN replay. We should first care that we + * don't hold a snapshot and correspondingly our MyProc->xmin is invalid. + * Otherwise, our snapshot could prevent the replay of WAL records + * implying a kind of self-deadlock. This is the reason why WAIT FOR is a + * command, not a procedure or function. + * + * At first, we should check there is no active snapshot. According to + * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is + * processed with a snapshot. Thankfully, we can pop this snapshot, + * because PortalRunUtility() can tolerate this. + */ + if (ActiveSnapshotSet()) + PopActiveSnapshot(); + + /* + * At second, invalidate a catalog snapshot if any. And we should be done + * with the preparation. + */ + InvalidateCatalogSnapshot(); + + /* Give up if there is still an active or registered snapshot. */ + if (HaveRegisteredOrActiveSnapshot()) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("WAIT FOR must be only called without an active or registered snapshot"), + errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED.")); + + /* + * As the result we should hold no snapshot, and correspondingly our xmin + * should be unset. + */ + Assert(MyProc->xmin == InvalidTransactionId); + + waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout); + + /* + * Process the result of WaitForLSNReplay(). Throw appropriate error if + * needed. + */ + switch (waitLSNResult) + { + case WAIT_LSN_RESULT_SUCCESS: + /* Nothing to do on success */ + result = "success"; + break; + + case WAIT_LSN_RESULT_TIMEOUT: + if (throw) + ereport(ERROR, + errcode(ERRCODE_QUERY_CANCELED), + errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + else + result = "timeout"; + break; + + case WAIT_LSN_RESULT_NOT_IN_RECOVERY: + if (throw) + { + if (PromoteIsTriggered()) + { + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.", + LSN_FORMAT_ARGS(lsn), + LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL)))); + } + else + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("recovery is not in progress"), + errhint("Waiting for the replay LSN can only be executed during recovery.")); + } + else + result = "not in recovery"; + break; + } + + /* need a tuple descriptor representing a single TEXT column */ + tupdesc = WaitStmtResultDesc(stmt); + + /* prepare for projection of tuples */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + /* Send it */ + do_text_output_oneline(tstate, result); + + end_tup_output(tstate); +} + +TupleDesc +WaitStmtResultDesc(WaitStmt *stmt) +{ + TupleDesc tupdesc; + + /* Need a tuple descriptor representing a single TEXT column */ + tupdesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + return tupdesc; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 8a0470d5b84..57fe0186547 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -308,7 +308,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); SecLabelStmt SelectStmt TransactionStmt TransactionStmtLegacy TruncateStmt UnlistenStmt UpdateStmt VacuumStmt VariableResetStmt VariableSetStmt VariableShowStmt - ViewStmt CheckPointStmt CreateConversionStmt + ViewStmt WaitStmt CheckPointStmt CreateConversionStmt DeallocateStmt PrepareStmt ExecuteStmt DropOwnedStmt ReassignOwnedStmt AlterTSConfigurationStmt AlterTSDictionaryStmt @@ -325,6 +325,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type <boolean> opt_concurrently %type <dbehavior> opt_drop_behavior %type <list> opt_utility_option_list +%type <list> opt_wait_with_clause %type <list> utility_option_list %type <defelt> utility_option_elem %type <str> utility_option_name @@ -678,7 +679,6 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); json_object_constructor_null_clause_opt json_array_constructor_null_clause_opt - /* * Non-keyword token types. These are hard-wired into the "flex" lexer. * They must be listed first so that their numeric codes do not depend on @@ -748,7 +748,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); LABEL LANGUAGE LARGE_P LAST_P LATERAL_P LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL - LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED + LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED LSN_P MAPPING MATCH MATCHED MATERIALIZED MAXVALUE MERGE MERGE_ACTION METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE @@ -792,7 +792,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING VERBOSE VERSION_P VIEW VIEWS VIRTUAL VOLATILE - WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE + WAIT WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLNAMESPACES XMLPARSE XMLPI XMLROOT XMLSERIALIZE XMLTABLE @@ -1120,6 +1120,7 @@ stmt: | VariableSetStmt | VariableShowStmt | ViewStmt + | WaitStmt | /*EMPTY*/ { $$ = NULL; } ; @@ -16482,6 +16483,26 @@ xml_passing_mech: | BY VALUE_P ; +/***************************************************************************** + * + * WAIT FOR LSN + * + *****************************************************************************/ + +WaitStmt: + WAIT FOR LSN_P Sconst opt_wait_with_clause + { + WaitStmt *n = makeNode(WaitStmt); + n->lsn_literal = $4; + n->options = $5; + $$ = (Node *) n; + } + ; + +opt_wait_with_clause: + WITH '(' utility_option_list ')' { $$ = $3; } + | /*EMPTY*/ { $$ = NIL; } + ; /* * Aggregate decoration clauses @@ -17969,6 +17990,7 @@ unreserved_keyword: | LOCK_P | LOCKED | LOGGED + | LSN_P | MAPPING | MATCH | MATCHED @@ -18139,6 +18161,7 @@ unreserved_keyword: | VIEWS | VIRTUAL | VOLATILE + | WAIT | WHITESPACE_P | WITHIN | WITHOUT @@ -18585,6 +18608,7 @@ bare_label_keyword: | LOCK_P | LOCKED | LOGGED + | LSN_P | MAPPING | MATCH | MATCHED @@ -18796,6 +18820,7 @@ bare_label_keyword: | VIEWS | VIRTUAL | VOLATILE + | WAIT | WHEN | WHITESPACE_P | WORK diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 96f29aafc39..26b201eadb8 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -36,6 +36,7 @@ #include "access/transam.h" #include "access/twophase.h" #include "access/xlogutils.h" +#include "access/xlogwait.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -947,6 +948,11 @@ ProcKill(int code, Datum arg) */ LWLockReleaseAll(); + /* + * Cleanup waiting for LSN if any. + */ + WaitLSNCleanup(); + /* Cancel any pending condition variable sleep, too */ ConditionVariableCancelSleep(); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 74179139fa9..fde78c55160 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -1158,10 +1158,11 @@ PortalRunUtility(Portal portal, PlannedStmt *pstmt, MemoryContextSwitchTo(portal->portalContext); /* - * Some utility commands (e.g., VACUUM) pop the ActiveSnapshot stack from - * under us, so don't complain if it's now empty. Otherwise, our snapshot - * should be the top one; pop it. Note that this could be a different - * snapshot from the one we made above; see EnsurePortalSnapshotExists. + * Some utility commands (e.g., VACUUM, WAIT FOR) pop the ActiveSnapshot + * stack from under us, so don't complain if it's now empty. Otherwise, + * our snapshot should be the top one; pop it. Note that this could be a + * different snapshot from the one we made above; see + * EnsurePortalSnapshotExists. */ if (portal->portalSnapshot != NULL && ActiveSnapshotSet()) { @@ -1738,7 +1739,8 @@ PlannedStmtRequiresSnapshot(PlannedStmt *pstmt) IsA(utilityStmt, ListenStmt) || IsA(utilityStmt, NotifyStmt) || IsA(utilityStmt, UnlistenStmt) || - IsA(utilityStmt, CheckPointStmt)) + IsA(utilityStmt, CheckPointStmt) || + IsA(utilityStmt, WaitStmt)) return false; return true; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 918db53dd5e..082967c0a86 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -56,6 +56,7 @@ #include "commands/user.h" #include "commands/vacuum.h" #include "commands/view.h" +#include "commands/wait.h" #include "miscadmin.h" #include "parser/parse_utilcmd.h" #include "postmaster/bgwriter.h" @@ -266,6 +267,7 @@ ClassifyUtilityCommandAsReadOnly(Node *parsetree) case T_PrepareStmt: case T_UnlistenStmt: case T_VariableSetStmt: + case T_WaitStmt: { /* * These modify only backend-local state, so they're OK to run @@ -1055,6 +1057,12 @@ standard_ProcessUtility(PlannedStmt *pstmt, break; } + case T_WaitStmt: + { + ExecWaitStmt(pstate, (WaitStmt *) parsetree, dest); + } + break; + default: /* All other statement types have event trigger support */ ProcessUtilitySlow(pstate, pstmt, queryString, @@ -2059,6 +2067,9 @@ UtilityReturnsTuples(Node *parsetree) case T_VariableShowStmt: return true; + case T_WaitStmt: + return true; + default: return false; } @@ -2114,6 +2125,9 @@ UtilityTupleDescriptor(Node *parsetree) return GetPGVariableResultDesc(n->name); } + case T_WaitStmt: + return WaitStmtResultDesc((WaitStmt *) parsetree); + default: return NULL; } @@ -3091,6 +3105,10 @@ CreateCommandTag(Node *parsetree) } break; + case T_WaitStmt: + tag = CMDTAG_WAIT; + break; + /* already-planned queries */ case T_PlannedStmt: { @@ -3689,6 +3707,10 @@ GetCommandLogLevel(Node *parsetree) lev = LOGSTMT_DDL; break; + case T_WaitStmt: + lev = LOGSTMT_ALL; + break; + /* already-planned queries */ case T_PlannedStmt: { |
