summaryrefslogtreecommitdiff
path: root/src/backend/commands/waitlsn.c
blob: 63e9ebf1730c2ee5ce726d6d197bc9e7a631a24e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
/*-------------------------------------------------------------------------
 *
 * waitlsn.c
 *	  Implements waiting for the given LSN, which is used in
 *	  CALL pg_wal_replay_wait(target_lsn pg_lsn, timeout int8), where the
 *	  timeout is given in milliseconds.
 *
 * Copyright (c) 2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/commands/waitlsn.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include <float.h>
#include <math.h>

#include "pgstat.h"
#include "access/xlog.h"
#include "access/xlogrecovery.h"
#include "commands/waitlsn.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/pg_lsn.h"
#include "utils/snapmgr.h"
#include "utils/fmgrprotos.h"
#include "utils/wait_event_types.h"

/* Add to / delete from shared memory array */
static void addLSNWaiter(XLogRecPtr lsn);
static void deleteLSNWaiter(void);

/*
 * Pointer to the shared WaitLSNState; attached/initialized by
 * WaitLSNShmemInit().
 */
struct WaitLSNState *waitLSN = NULL;

/*
 * Set while this backend may have an entry in waitLSN->procInfos, so that
 * WaitLSNCleanup() knows whether it needs to remove one.  Note the startup
 * process can remove the entry on our behalf (WaitLSNSetLatches) without
 * touching this flag, hence "may have".
 */
static volatile sig_atomic_t haveShmemItem = false;

/*
 * Compute the shared memory footprint of WaitLSNState: the fixed-size header
 * followed by one WaitLSNProcInfo slot for each of the MaxBackends backends.
 * add_size()/mul_size() guard against size overflow.
 */
Size
WaitLSNShmemSize(void)
{
	Size		headerSize = offsetof(WaitLSNState, procInfos);
	Size		slotsSize = mul_size(MaxBackends, sizeof(WaitLSNProcInfo));

	return add_size(headerSize, slotsSize);
}

/*
 * Attach to the WaitLSNState struct in shared memory, creating it if needed.
 * Only the process that creates it initializes the contents: the spinlock,
 * an empty waiter array, and minLSN = PG_UINT64_MAX (meaning "no waiters").
 */
void
WaitLSNShmemInit(void)
{
	bool		alreadyExists;

	waitLSN = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
											   WaitLSNShmemSize(),
											   &alreadyExists);

	/* Someone else already set it up; nothing more to do. */
	if (alreadyExists)
		return;

	SpinLockInit(&waitLSN->mutex);
	waitLSN->numWaitedProcs = 0;
	pg_atomic_init_u64(&waitLSN->minLSN, PG_UINT64_MAX);
}

/*
 * Add the information about the LSN waiter backend to the shared memory
 * array.
 *
 * procInfos[] is kept sorted by ascending waitLSN, so procInfos[0] always
 * holds the smallest awaited LSN.  minLSN is refreshed from that head element
 * so the startup process can cheaply tell whether any waiter is due.
 */
static void
addLSNWaiter(XLogRecPtr lsn)
{
	WaitLSNProcInfo cur;
	int			i;

	cur.procnum = MyProcNumber;
	cur.waitLSN = lsn;

	SpinLockAcquire(&waitLSN->mutex);

	/* The array only has room for MaxBackends entries. */
	Assert(waitLSN->numWaitedProcs < MaxBackends);

	/*
	 * Insert 'cur' at its sorted position: from the first element whose LSN
	 * is >= ours onwards, keep swapping so every following element moves one
	 * slot to the right.  The displaced last element lands in the new slot.
	 */
	for (i = 0; i < waitLSN->numWaitedProcs; i++)
	{
		if (waitLSN->procInfos[i].waitLSN >= cur.waitLSN)
		{
			WaitLSNProcInfo tmp;

			tmp = waitLSN->procInfos[i];
			waitLSN->procInfos[i] = cur;
			cur = tmp;
		}
	}
	waitLSN->procInfos[i] = cur;
	waitLSN->numWaitedProcs++;

	/*
	 * Publish the minimum awaited LSN, which is at the head of the sorted
	 * array.  (procInfos[i] is the tail, i.e. the maximum.)
	 */
	pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
	SpinLockRelease(&waitLSN->mutex);
}

/*
 * Delete the information about the LSN waiter backend from the shared memory
 * array.
 *
 * It is fine for our entry to be absent: the startup process may already
 * have removed it in WaitLSNSetLatches().  In that case this is a no-op.
 */
static void
deleteLSNWaiter(void)
{
	int			i;
	bool		found = false;

	SpinLockAcquire(&waitLSN->mutex);

	/*
	 * Find our entry and shift every following entry one slot to the left,
	 * which preserves the ascending waitLSN order.
	 */
	for (i = 0; i < waitLSN->numWaitedProcs; i++)
	{
		if (waitLSN->procInfos[i].procnum == MyProcNumber)
			found = true;

		if (found && i < waitLSN->numWaitedProcs - 1)
		{
			waitLSN->procInfos[i] = waitLSN->procInfos[i + 1];
		}
	}

	if (!found)
	{
		SpinLockRelease(&waitLSN->mutex);
		return;
	}
	waitLSN->numWaitedProcs--;

	/*
	 * Republish the minimum awaited LSN from the head of the sorted array.
	 * Note 'i' is past the end of the remaining entries here and must not be
	 * used as an index.
	 */
	if (waitLSN->numWaitedProcs > 0)
		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
	else
		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);

	SpinLockRelease(&waitLSN->mutex);
}

/*
 * Set latches of LSN waiters whose LSN has been replayed.  Set latches of all
 * LSN waiters when InvalidXLogRecPtr is given.
 *
 * Woken waiters are removed from the shared array here, so they do not
 * linger until they wake up and delete themselves.
 */
void
WaitLSNSetLatches(XLogRecPtr currentLSN)
{
	int			i;
	int		   *wakeUpProcNums;
	int			numWakeUpProcs;

	wakeUpProcNums = palloc(sizeof(int) * MaxBackends);

	SpinLockAcquire(&waitLSN->mutex);

	/*
	 * procInfos[] is sorted by ascending waitLSN, so the processes whose
	 * waited LSN is already replayed form a prefix of it.  Remember them; we
	 * set their latches only after releasing the spinlock.
	 */
	for (i = 0; i < waitLSN->numWaitedProcs; i++)
	{
		if (!XLogRecPtrIsInvalid(currentLSN) &&
			waitLSN->procInfos[i].waitLSN > currentLSN)
			break;

		wakeUpProcNums[i] = waitLSN->procInfos[i].procnum;
	}
	numWakeUpProcs = i;

	/*
	 * Immediately remove those processes from the shmem array by shifting
	 * the remaining entries to the front.  Otherwise, shmem array items would
	 * stay here till the corresponding processes wake up and delete
	 * themselves.
	 */
	for (i = 0; i < waitLSN->numWaitedProcs - numWakeUpProcs; i++)
		waitLSN->procInfos[i] = waitLSN->procInfos[i + numWakeUpProcs];
	waitLSN->numWaitedProcs -= numWakeUpProcs;

	/*
	 * Republish the minimum awaited LSN from the head of the sorted array.
	 * After the loop above 'i' equals the new count, i.e. one past the last
	 * valid entry, so it must not be used as an index.
	 */
	if (waitLSN->numWaitedProcs > 0)
		pg_atomic_write_u64(&waitLSN->minLSN, waitLSN->procInfos[0].waitLSN);
	else
		pg_atomic_write_u64(&waitLSN->minLSN, PG_UINT64_MAX);

	SpinLockRelease(&waitLSN->mutex);

	/*
	 * Set latches for processes whose waited LSNs are already replayed.  This
	 * involves spinlocks, so we shouldn't do it while holding a spinlock.
	 */
	for (i = 0; i < numWakeUpProcs; i++)
	{
		PGPROC	   *backend;

		backend = GetPGProcByNumber(wakeUpProcNums[i]);
		SetLatch(&backend->procLatch);
	}
	pfree(wakeUpProcNums);
}

/*
 * Delete our item from shmem array if any.
 */
void
WaitLSNCleanup(void)
{
	if (haveShmemItem)
		deleteLSNWaiter();
}

/*
 * Wait using MyLatch till the given LSN is replayed, the postmaster dies or
 * timeout happens.
 *
 * targetLSN: the LSN whose replay we wait for.
 * timeout:   maximum wait in milliseconds; a value <= 0 means no time limit.
 *
 * Raises ERROR if recovery is not in progress (or ends before targetLSN is
 * replayed), or if the timeout expires first.
 */
void
WaitForLSN(XLogRecPtr targetLSN, int64 timeout)
{
	XLogRecPtr	currentLSN;
	TimestampTz endtime = 0;

	/* Shouldn't be called when shmem isn't initialized */
	Assert(waitLSN);

	/* Should be only called by a backend */
	Assert(MyBackendType == B_BACKEND);

	if (!RecoveryInProgress())
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("recovery is not in progress"),
				 errhint("Waiting for LSN can only be executed during recovery.")));

	/* If target LSN is already replayed, exit immediately */
	if (targetLSN <= GetXLogReplayRecPtr(NULL))
		return;

	if (timeout > 0)
		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);

	/*
	 * Register in the shared array so the startup process sets our latch
	 * once targetLSN is replayed.  The flag tells WaitLSNCleanup() there may
	 * be an entry to remove if we leave this function via an ERROR.
	 */
	addLSNWaiter(targetLSN);
	haveShmemItem = true;

	for (;;)
	{
		int			rc;
		int			latch_events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
		long		delay_ms = 0;

		/* Check if the waited LSN has been replayed */
		currentLSN = GetXLogReplayRecPtr(NULL);
		if (targetLSN <= currentLSN)
			break;

		/* Recheck that recovery is still in-progress */
		if (!RecoveryInProgress())
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("recovery is not in progress"),
					 errdetail("Recovery ended before replaying the target LSN %X/%X; last replay LSN %X/%X.",
							   LSN_FORMAT_ARGS(targetLSN),
							   LSN_FORMAT_ARGS(currentLSN))));

		if (timeout > 0)
		{
			/* TimestampTz is in microseconds; convert the remainder to ms */
			delay_ms = (endtime - GetCurrentTimestamp()) / 1000;
			latch_events |= WL_TIMEOUT;
			if (delay_ms <= 0)
				break;
		}

		CHECK_FOR_INTERRUPTS();

		rc = WaitLatch(MyLatch, latch_events, delay_ms,
					   WAIT_EVENT_WAIT_FOR_WAL_REPLAY);

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	/*
	 * Remove our entry from the shared array.  On success the startup
	 * process has usually removed it already in WaitLSNSetLatches(), but not
	 * necessarily: we may have observed the replayed LSN before it processed
	 * the waiters.  deleteLSNWaiter() is a no-op when the entry is gone, so
	 * always call it rather than risk leaving a stale entry behind.
	 */
	deleteLSNWaiter();
	haveShmemItem = false;

	if (targetLSN > currentLSN)
		ereport(ERROR,
				(errcode(ERRCODE_QUERY_CANCELED),
				 errmsg("timed out while waiting for target LSN %X/%X to be replayed; current replay LSN %X/%X",
						LSN_FORMAT_ARGS(targetLSN),
						LSN_FORMAT_ARGS(currentLSN))));
}

/*
 * SQL-callable procedure: CALL pg_wal_replay_wait(target_lsn, timeout).
 *
 * Argument 0 is the target LSN, argument 1 the timeout in milliseconds
 * (0 = wait indefinitely; negative values are rejected).  Drops any snapshot
 * held by this backend before waiting, then delegates to WaitForLSN().
 */
Datum
pg_wal_replay_wait(PG_FUNCTION_ARGS)
{
	XLogRecPtr	lsn = PG_GETARG_LSN(0);
	int64		timeout_ms = PG_GETARG_INT64(1);
	CallContext *callctx = (CallContext *) fcinfo->context;

	if (timeout_ms < 0)
		ereport(ERROR,
				(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
				 errmsg("\"timeout\" must not be negative")));

	/*
	 * Before waiting for replay we must not hold a snapshot, i.e. our
	 * MyProc->xmin must be invalid.  Otherwise our snapshot could block the
	 * replay of WAL records we are waiting for -- a self-deadlock.  This is
	 * why pg_wal_replay_wait() is a procedure rather than a function.
	 *
	 * First, require a non-atomic context: the CALL must not be wrapped in a
	 * transaction block, another procedure call, or a function call.
	 */
	if (callctx->atomic)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("pg_wal_replay_wait() must be only called in non-atomic context"),
				 errdetail("Make sure pg_wal_replay_wait() isn't called within a transaction, another procedure, or a function.")));

	/*
	 * Second, per PlannedStmtRequiresSnapshot(), a CallStmt is executed with
	 * an active snapshot even in a non-atomic context.  Popping it here is
	 * safe because PortalRunUtility() tolerates that.
	 */
	if (ActiveSnapshotSet())
		PopActiveSnapshot();
	Assert(!ActiveSnapshotSet());
	InvalidateCatalogSnapshot();
	Assert(MyProc->xmin == InvalidTransactionId);

	WaitForLSN(lsn, timeout_ms);

	PG_RETURN_VOID();
}