1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
|
/*-------------------------------------------------------------------------
*
* wait.c
* Implements WAIT FOR, which allows waiting for events such as
* time passing or LSN having been replayed on replica.
*
* Portions Copyright (c) 2025, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/commands/wait.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include <math.h>
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
#include "commands/defrem.h"
#include "commands/wait.h"
#include "executor/executor.h"
#include "parser/parse_node.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
void
ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
{
XLogRecPtr lsn;
int64 timeout = 0;
WaitLSNResult waitLSNResult;
bool throw = true;
TupleDesc tupdesc;
TupOutputState *tstate;
const char *result = "<unset>";
bool timeout_specified = false;
bool no_throw_specified = false;
/* Parse and validate the mandatory LSN */
lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
CStringGetDatum(stmt->lsn_literal)));
foreach_node(DefElem, defel, stmt->options)
{
if (strcmp(defel->defname, "timeout") == 0)
{
char *timeout_str;
const char *hintmsg;
double result;
if (timeout_specified)
errorConflictingDefElem(defel, pstate);
timeout_specified = true;
timeout_str = defGetString(defel);
if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
{
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid timeout value: \"%s\"", timeout_str),
hintmsg ? errhint("%s", _(hintmsg)) : 0);
}
/*
* Get rid of any fractional part in the input. This is so we
* don't fail on just-out-of-range values that would round into
* range.
*/
result = rint(result);
/* Range check */
if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
ereport(ERROR,
errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
errmsg("timeout value is out of range"));
if (result < 0)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("timeout cannot be negative"));
timeout = (int64) result;
}
else if (strcmp(defel->defname, "no_throw") == 0)
{
if (no_throw_specified)
errorConflictingDefElem(defel, pstate);
no_throw_specified = true;
throw = !defGetBoolean(defel);
}
else
{
ereport(ERROR,
errcode(ERRCODE_SYNTAX_ERROR),
errmsg("option \"%s\" not recognized",
defel->defname),
parser_errposition(pstate, defel->location));
}
}
/*
* We are going to wait for the LSN replay. We should first care that we
* don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
* Otherwise, our snapshot could prevent the replay of WAL records
* implying a kind of self-deadlock. This is the reason why WAIT FOR is a
* command, not a procedure or function.
*
* At first, we should check there is no active snapshot. According to
* PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
* processed with a snapshot. Thankfully, we can pop this snapshot,
* because PortalRunUtility() can tolerate this.
*/
if (ActiveSnapshotSet())
PopActiveSnapshot();
/*
* At second, invalidate a catalog snapshot if any. And we should be done
* with the preparation.
*/
InvalidateCatalogSnapshot();
/* Give up if there is still an active or registered snapshot. */
if (HaveRegisteredOrActiveSnapshot())
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("WAIT FOR must be only called without an active or registered snapshot"),
errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED."));
/*
* As the result we should hold no snapshot, and correspondingly our xmin
* should be unset.
*/
Assert(MyProc->xmin == InvalidTransactionId);
waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
/*
* Process the result of WaitForLSNReplay(). Throw appropriate error if
* needed.
*/
switch (waitLSNResult)
{
case WAIT_LSN_RESULT_SUCCESS:
/* Nothing to do on success */
result = "success";
break;
case WAIT_LSN_RESULT_TIMEOUT:
if (throw)
ereport(ERROR,
errcode(ERRCODE_QUERY_CANCELED),
errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
LSN_FORMAT_ARGS(lsn),
LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
else
result = "timeout";
break;
case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
if (throw)
{
if (PromoteIsTriggered())
{
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is not in progress"),
errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
LSN_FORMAT_ARGS(lsn),
LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
}
else
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is not in progress"),
errhint("Waiting for the replay LSN can only be executed during recovery."));
}
else
result = "not in recovery";
break;
}
/* need a tuple descriptor representing a single TEXT column */
tupdesc = WaitStmtResultDesc(stmt);
/* prepare for projection of tuples */
tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
/* Send it */
do_text_output_oneline(tstate, result);
end_tup_output(tstate);
}
TupleDesc
WaitStmtResultDesc(WaitStmt *stmt)
{
TupleDesc tupdesc;
/* Need a tuple descriptor representing a single TEXT column */
tupdesc = CreateTemplateTupleDesc(1);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0);
return tupdesc;
}
|