summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/syncutils.c
blob: a696b1f2dc0d9acfacfbcf0cefb3fc147c47d81b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/*-------------------------------------------------------------------------
 * syncutils.c
 *	  PostgreSQL logical replication: common synchronization code
 *
 * Copyright (c) 2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/syncutils.c
 *
 * NOTES
 *	  This file contains code common to the synchronization workers.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "catalog/pg_subscription_rel.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"

/*
 * Validity of the cached subscription relation states.
 *
 * The state starts out as SYNC_RELATIONS_STATE_NEEDS_REBUILD and is put back
 * there whenever the invalidation callback runs. A rebuild moves it to
 * SYNC_RELATIONS_STATE_REBUILD_STARTED and then, provided it was not reset
 * in the meantime, to SYNC_RELATIONS_STATE_VALID.
 */
typedef enum
{
	/*
	 * The subscription relations state is no longer valid; the subscription
	 * relations should be rebuilt.
	 */
	SYNC_RELATIONS_STATE_NEEDS_REBUILD = 0,

	/* The subscription relations state is being rebuilt. */
	SYNC_RELATIONS_STATE_REBUILD_STARTED = 1,

	/* The subscription relations state is up-to-date and valid. */
	SYNC_RELATIONS_STATE_VALID = 2,
} SyncingRelationsState;

/* Nothing has been built yet, so begin in the "needs rebuild" state. */
static SyncingRelationsState relation_states_validity =
	SYNC_RELATIONS_STATE_NEEDS_REBUILD;

/*
 * Terminate the current synchronization worker.
 *
 * Commits any transaction that is still open, flushes outstanding writes,
 * logs a completion message appropriate to the worker type and then exits
 * the process. This function does not return.
 */
pg_noreturn void
FinishSyncWorker(void)
{
	Assert(am_sequencesync_worker() || am_tablesync_worker());

	/*
	 * A transaction is normally still in progress here; the exception is
	 * when there was nothing to do for the table. Commit it if present.
	 */
	if (IsTransactionState())
	{
		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	/* Flush everything written so far. */
	XLogFlush(GetXLogWriteRecPtr());

	if (!am_sequencesync_worker())
	{
		/*
		 * Table synchronization worker: the completion message includes the
		 * relation name, which is looked up inside a short transaction of
		 * its own.
		 */
		StartTransactionCommand();
		ereport(LOG,
				errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
					   MySubscription->name,
					   get_rel_name(MyLogicalRepWorker->relid)));
		CommitTransactionCommand();

		/* Locate the leader apply worker and wake it up. */
		logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
								 InvalidOid);
	}
	else
	{
		/* Sequence synchronization worker. */
		ereport(LOG,
				errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
					   MySubscription->name));

		/*
		 * Clear last_seqsync_start_time so that the next sequencesync worker
		 * that is needed can be started promptly.
		 */
		logicalrep_reset_seqsync_start_time();
	}

	/* Exit gracefully. */
	proc_exit(0);
}

/*
 * Callback from syscache invalidation.
 *
 * Marks the cached subscription relation states as needing a rebuild. The
 * rebuild itself is not done here; FetchRelationStates() performs it the
 * next time it is called and finds the state not marked valid.
 *
 * None of the arguments (arg, cacheid, hashvalue) are used.
 */
void
InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
{
	relation_states_validity = SYNC_RELATIONS_STATE_NEEDS_REBUILD;
}

/*
 * Try to start a sync worker, either a tablesync worker for one table or a
 * sequencesync worker for one or more sequences.
 *
 * Nothing is launched when the subscription already has the maximum number
 * of sync workers running, or when the retry interval since the previous
 * launch attempt has not yet elapsed.
 *
 * wtype: type of sync worker to launch.
 * nsyncworkers: number of sync workers currently running for the
 * subscription.
 * relid: the table's relid for a tablesync worker, InvalidOid for a
 * sequencesync worker.
 * last_start_time: in/out; time of the previous launch attempt (zero if
 * there has been none). Updated to the current time whenever a launch is
 * attempted.
 */
void
launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
				   TimestampTz *last_start_time)
{
	TimestampTz attempt_time;

	Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
		   (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));

	/* Give up right away unless a sync worker slot is free. */
	if (nsyncworkers >= max_sync_workers_per_subscription)
		return;

	attempt_time = GetCurrentTimestamp();

	/*
	 * If there was an earlier attempt and wal_retrieve_retry_interval has
	 * not elapsed since then, it is too soon to try again.
	 */
	if (*last_start_time != 0 &&
		!TimestampDifferenceExceeds(*last_start_time, attempt_time,
									wal_retrieve_retry_interval))
		return;

	/*
	 * Record the attempt before launching, so that even if the launch fails
	 * we won't retry until wal_retrieve_retry_interval has elapsed.
	 */
	*last_start_time = attempt_time;
	(void) logicalrep_worker_launch(wtype,
									MyLogicalRepWorker->dbid,
									MySubscription->oid,
									MySubscription->name,
									MyLogicalRepWorker->userid,
									relid, DSM_HANDLE_INVALID, false);
}

/*
 * Handle possible state changes of relations that are being synchronized,
 * dispatching on the type of the current worker.
 *
 * For the apply worker this also covers starting new tablesync workers for
 * newly added tables and a new sequencesync worker for newly added
 * sequences.
 *
 * current_lsn: passed through to the table-processing routines.
 */
void
ProcessSyncingRelations(XLogRecPtr current_lsn)
{
	switch (MyLogicalRepWorker->type)
	{
		case WORKERTYPE_APPLY:
			/* Tables first, then sequences. */
			ProcessSyncingTablesForApply(current_lsn);
			ProcessSequencesForSync();
			break;

		case WORKERTYPE_TABLESYNC:
			ProcessSyncingTablesForSync(current_lsn);
			break;

		case WORKERTYPE_PARALLEL_APPLY:

			/*
			 * Nothing to do: parallel apply workers only operate on tables
			 * that are in a READY state. See pa_can_start() and
			 * should_apply_changes_for_rel().
			 */
			break;

		case WORKERTYPE_SEQUENCESYNC:
			/* Not expected to be reached. */
			elog(ERROR, "sequence synchronization worker is not expected to process relations");
			break;

		case WORKERTYPE_UNKNOWN:
			/* Not expected to be reached. */
			elog(ERROR, "Unknown worker type");
			break;
	}
}

/*
 * Common code to fetch the up-to-date sync state info for tables and sequences.
 *
 * The pg_subscription_rel catalog is shared by tables and sequences. Changes
 * to either sequences or tables can affect the validity of relation states, so
 * we identify non-READY tables and non-READY sequences together to ensure
 * consistency.
 *
 * If the cached state is not marked valid, it is rebuilt here:
 * table_states_not_ready is emptied and refilled with copies of the
 * non-sequence entries returned by GetSubscriptionRelations(), and the cached
 * flags are recomputed. Otherwise the values cached by the previous rebuild
 * are returned without touching the catalog.
 *
 * has_pending_subtables: if not NULL, set to true if the subscription has any
 * tables at all, otherwise false. Note that, despite the parameter name, this
 * is not restricted to tables in a non-READY state: it is true when non-READY
 * tables were found or when HasSubscriptionTables() reports that the
 * subscription has tables. The non-READY tables themselves are available in
 * table_states_not_ready.
 * has_pending_subsequences: if not NULL, set to true if the subscription has
 * one or more sequences that are not in READY state, otherwise false.
 * started_tx: must not be NULL. Set to true if this function had to start a
 * transaction because none was in progress, otherwise false. The transaction
 * is not committed here, so it is still open on return.
 */
void
FetchRelationStates(bool *has_pending_subtables,
					bool *has_pending_subsequences,
					bool *started_tx)
{
	/*
	 * has_subtables and has_subsequences_non_ready are declared as static,
	 * since the same value can be used until the system table is invalidated.
	 */
	static bool has_subtables = false;
	static bool has_subsequences_non_ready = false;

	*started_tx = false;

	if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
	{
		MemoryContext oldctx;
		List	   *rstates;
		SubscriptionRelState *rstate;

		/*
		 * Mark the rebuild as in progress. If InvalidateSyncingRelStates()
		 * runs while we are rebuilding, it resets the state to
		 * NEEDS_REBUILD, which the check at the end of this block detects.
		 */
		relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
		has_subsequences_non_ready = false;

		/* Clean the old lists. */
		list_free_deep(table_states_not_ready);
		table_states_not_ready = NIL;

		/*
		 * The catalog lookups below are done inside a transaction; start one
		 * if none is in progress and report that to the caller.
		 */
		if (!IsTransactionState())
		{
			StartTransactionCommand();
			*started_tx = true;
		}

		/* Fetch tables and sequences that are in non-READY state. */
		rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
										   true);

		/* Allocate the tracking info in a permanent memory context. */
		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
		foreach_ptr(SubscriptionRelState, subrel, rstates)
		{
			/*
			 * Sequences are only noted via a flag; every other relation is
			 * copied into table_states_not_ready.
			 */
			if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
				has_subsequences_non_ready = true;
			else
			{
				/* Make a copy that lives in CacheMemoryContext. */
				rstate = palloc(sizeof(SubscriptionRelState));
				memcpy(rstate, subrel, sizeof(SubscriptionRelState));
				table_states_not_ready = lappend(table_states_not_ready,
												 rstate);
			}
		}
		MemoryContextSwitchTo(oldctx);

		/*
		 * Does the subscription have tables?
		 *
		 * If there were not-READY tables found then we know it does. But if
		 * table_states_not_ready was empty we still need to check again to
		 * see if there are 0 tables.
		 */
		has_subtables = (table_states_not_ready != NIL) ||
			HasSubscriptionTables(MySubscription->oid);

		/*
		 * If the subscription relation cache has been invalidated since we
		 * entered this routine, we still use and return the relations we just
		 * finished constructing, to avoid infinite loops, but we leave the
		 * table states marked as stale so that we'll rebuild it again on next
		 * access. Otherwise, we mark the table states as valid.
		 */
		if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
			relation_states_validity = SYNC_RELATIONS_STATE_VALID;
	}

	/* Both of these output parameters are optional. */
	if (has_pending_subtables)
		*has_pending_subtables = has_subtables;

	if (has_pending_subsequences)
		*has_pending_subsequences = has_subsequences_non_ready;
}