summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/sequencesync.c
blob: 717c82328f284172b0d2c8b437ce5ecf99d2a1c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
/*-------------------------------------------------------------------------
 * sequencesync.c
 *	  PostgreSQL logical replication: sequence synchronization
 *
 * Copyright (c) 2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/sequencesync.c
 *
 * NOTES
 *	  This file contains code for sequence synchronization for
 *	  logical replication.
 *
 * Sequences requiring synchronization are tracked in the pg_subscription_rel
 * catalog.
 *
 * Sequences to be synchronized will be added with state INIT when either of
 * the following commands is executed:
 * CREATE SUBSCRIPTION
 * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
 *
 * Executing the following command resets all sequences in the subscription to
 * state INIT, triggering re-synchronization:
 * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
 *
 * The apply worker periodically scans pg_subscription_rel for sequences in
 * INIT state. When such sequences are found, it spawns a sequencesync worker
 * to handle synchronization.
 *
 * A single sequencesync worker is responsible for synchronizing all sequences.
 * It begins by retrieving the list of sequences that are flagged for
 * synchronization, i.e., those in the INIT state. These sequences are then
 * processed in batches, allowing multiple entries to be synchronized within a
 * single transaction. The worker fetches the current sequence values and page
 * LSNs from the remote publisher, updates the corresponding sequences on the
 * local subscriber, and finally marks each sequence as READY upon successful
 * synchronization.
 *
 * Sequence state transitions follow this pattern:
 *   INIT -> READY
 *
 * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
 * sequences are synchronized per transaction. The locks on the sequence
 * relation will be periodically released at each transaction commit.
 *
 * XXX: We didn't choose launcher process to maintain the launch of sequencesync
 * worker as it didn't have database connection to access the sequences from the
 * pg_subscription_rel system catalog that need to be synchronized.
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/table.h"
#include "catalog/pg_sequence.h"
#include "catalog/pg_subscription_rel.h"
#include "commands/sequence.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicalworker.h"
#include "replication/worker_internal.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"

#define REMOTE_SEQ_COL_COUNT 10

typedef enum CopySeqResult
{
	COPYSEQ_SUCCESS,
	COPYSEQ_MISMATCH,
	COPYSEQ_INSUFFICIENT_PERM,
	COPYSEQ_SKIPPED
} CopySeqResult;

static List *seqinfos = NIL;

/*
 * Apply worker determines if sequence synchronization is needed.
 *
 * Start a sequencesync worker if one is not already running. The active
 * sequencesync worker will handle all pending sequence synchronization. If any
 * sequences remain unsynchronized after it exits, a new worker can be started
 * in the next iteration.
 */
void
ProcessSequencesForSync(void)
{
	LogicalRepWorker *sequencesync_worker;
	int			nsyncworkers;
	bool		has_pending_sequences;
	bool		started_tx;

	FetchRelationStates(NULL, &has_pending_sequences, &started_tx);

	if (started_tx)
	{
		CommitTransactionCommand();
		pgstat_report_stat(true);
	}

	if (!has_pending_sequences)
		return;

	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);

	/* Check if there is a sequencesync worker already running? */
	sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
												 MyLogicalRepWorker->subid,
												 InvalidOid, true);
	if (sequencesync_worker)
	{
		LWLockRelease(LogicalRepWorkerLock);
		return;
	}

	/*
	 * Count running sync workers for this subscription, while we have the
	 * lock.
	 */
	nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
	LWLockRelease(LogicalRepWorkerLock);

	/*
	 * It is okay to read/update last_seqsync_start_time here in apply worker
	 * as we have already ensured that sync worker doesn't exist.
	 */
	launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
					   &MyLogicalRepWorker->last_seqsync_start_time);
}

/*
 * get_sequences_string
 *
 * Build a comma-separated string of schema-qualified sequence names
 * for the given list of sequence indexes.
 */
static void
get_sequences_string(List *seqindexes, StringInfo buf)
{
	resetStringInfo(buf);
	foreach_int(seqidx, seqindexes)
	{
		LogicalRepSequenceInfo *seqinfo =
			(LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);

		if (buf->len > 0)
			appendStringInfoString(buf, ", ");

		appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
	}
}

/*
 * report_sequence_errors
 *
 * Report discrepancies found during sequence synchronization between
 * the publisher and subscriber. Emits warnings for:
 * a) mismatched definitions or concurrent rename
 * b) insufficient privileges
 * c) missing sequences on the subscriber
 * Then raises an ERROR to indicate synchronization failure.
 */
static void
report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
					   List *missing_seqs_idx)
{
	StringInfo	seqstr;

	/* Quick exit if there are no errors to report */
	if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
		return;

	seqstr = makeStringInfo();

	if (mismatched_seqs_idx)
	{
		get_sequences_string(mismatched_seqs_idx, seqstr);
		ereport(WARNING,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
							  "mismatched or renamed sequences on subscriber (%s)",
							  list_length(mismatched_seqs_idx),
							  seqstr->data));
	}

	if (insuffperm_seqs_idx)
	{
		get_sequences_string(insuffperm_seqs_idx, seqstr);
		ereport(WARNING,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg_plural("insufficient privileges on sequence (%s)",
							  "insufficient privileges on sequences (%s)",
							  list_length(insuffperm_seqs_idx),
							  seqstr->data));
	}

	if (missing_seqs_idx)
	{
		get_sequences_string(missing_seqs_idx, seqstr);
		ereport(WARNING,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg_plural("missing sequence on publisher (%s)",
							  "missing sequences on publisher (%s)",
							  list_length(missing_seqs_idx),
							  seqstr->data));
	}

	ereport(ERROR,
			errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
			errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
				   MySubscription->name));
}

/*
 * get_and_validate_seq_info
 *
 * Extracts remote sequence information from the tuple slot received from the
 * publisher, and validates it against the corresponding local sequence
 * definition.
 */
static CopySeqResult
get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
						  LogicalRepSequenceInfo **seqinfo, int *seqidx)
{
	bool		isnull;
	int			col = 0;
	Oid			remote_typid;
	int64		remote_start;
	int64		remote_increment;
	int64		remote_min;
	int64		remote_max;
	bool		remote_cycle;
	CopySeqResult result = COPYSEQ_SUCCESS;
	HeapTuple	tup;
	Form_pg_sequence local_seq;
	LogicalRepSequenceInfo *seqinfo_local;

	*seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	/* Identify the corresponding local sequence for the given index. */
	*seqinfo = seqinfo_local =
		(LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);

	seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
	Assert(!isnull);

	/* Sanity check */
	Assert(col == REMOTE_SEQ_COL_COUNT);

	seqinfo_local->found_on_pub = true;

	*sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);

	/* Sequence was concurrently dropped? */
	if (!*sequence_rel)
		return COPYSEQ_SKIPPED;

	tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));

	/* Sequence was concurrently dropped? */
	if (!HeapTupleIsValid(tup))
		elog(ERROR, "cache lookup failed for sequence %u",
			 seqinfo_local->localrelid);

	local_seq = (Form_pg_sequence) GETSTRUCT(tup);

	/* Sequence parameters for remote/local are the same? */
	if (local_seq->seqtypid != remote_typid ||
		local_seq->seqstart != remote_start ||
		local_seq->seqincrement != remote_increment ||
		local_seq->seqmin != remote_min ||
		local_seq->seqmax != remote_max ||
		local_seq->seqcycle != remote_cycle)
		result = COPYSEQ_MISMATCH;

	/* Sequence was concurrently renamed? */
	if (strcmp(seqinfo_local->nspname,
			   get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
		strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
		result = COPYSEQ_MISMATCH;

	ReleaseSysCache(tup);
	return result;
}

/*
 * Apply remote sequence state to local sequence and mark it as
 * synchronized (READY).
 */
static CopySeqResult
copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
{
	UserContext ucxt;
	AclResult	aclresult;
	bool		run_as_owner = MySubscription->runasowner;
	Oid			seqoid = seqinfo->localrelid;

	/*
	 * If the user did not opt to run as the owner of the subscription
	 * ('run_as_owner'), then copy the sequence as the owner of the sequence.
	 */
	if (!run_as_owner)
		SwitchToUntrustedUser(seqowner, &ucxt);

	aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);

	if (aclresult != ACLCHECK_OK)
	{
		if (!run_as_owner)
			RestoreUserContext(&ucxt);

		return COPYSEQ_INSUFFICIENT_PERM;
	}

	/*
	 * The log counter (log_cnt) tracks how many sequence values are still
	 * unused locally. It is only relevant to the local node and managed
	 * internally by nextval() when allocating new ranges. Since log_cnt does
	 * not affect the visible sequence state (like last_value or is_called)
	 * and is only used for local caching, it need not be copied to the
	 * subscriber during synchronization.
	 */
	SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);

	if (!run_as_owner)
		RestoreUserContext(&ucxt);

	/*
	 * Record the remote sequence's LSN in pg_subscription_rel and mark the
	 * sequence as READY.
	 */
	UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
							   seqinfo->page_lsn, false);

	return COPYSEQ_SUCCESS;
}

/*
 * Copy existing data of sequences from the publisher.
 */
static void
copy_sequences(WalReceiverConn *conn)
{
	int			cur_batch_base_index = 0;
	int			n_seqinfos = list_length(seqinfos);
	List	   *mismatched_seqs_idx = NIL;
	List	   *missing_seqs_idx = NIL;
	List	   *insuffperm_seqs_idx = NIL;
	StringInfo	seqstr = makeStringInfo();
	StringInfo	cmd = makeStringInfo();
	MemoryContext oldctx;

#define MAX_SEQUENCES_SYNC_PER_BATCH 100

	elog(DEBUG1,
		 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
		 MySubscription->name, n_seqinfos);

	while (cur_batch_base_index < n_seqinfos)
	{
		Oid			seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
		BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
		int			batch_size = 0;
		int			batch_succeeded_count = 0;
		int			batch_mismatched_count = 0;
		int			batch_skipped_count = 0;
		int			batch_insuffperm_count = 0;
		int			batch_missing_count;
		Relation	sequence_rel;

		WalRcvExecResult *res;
		TupleTableSlot *slot;

		StartTransactionCommand();

		for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
		{
			LogicalRepSequenceInfo *seqinfo =
				(LogicalRepSequenceInfo *) list_nth(seqinfos, idx);

			if (seqstr->len > 0)
				appendStringInfoString(seqstr, ", ");

			appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)",
							 seqinfo->nspname, seqinfo->seqname, idx);

			if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
				break;
		}

		/*
		 * We deliberately avoid acquiring a local lock on the sequence before
		 * querying the publisher to prevent potential distributed deadlocks
		 * in bi-directional replication setups.
		 *
		 * Example scenario:
		 *
		 * - On each node, a background worker acquires a lock on a sequence
		 * as part of a sync operation.
		 *
		 * - Concurrently, a user transaction attempts to alter the same
		 * sequence, waiting on the background worker's lock.
		 *
		 * - Meanwhile, a query from the other node tries to access metadata
		 * that depends on the completion of the alter operation.
		 *
		 * - This creates a circular wait across nodes:
		 *
		 * Node-1: Query -> waits on Alter -> waits on Sync Worker
		 *
		 * Node-2: Query -> waits on Alter -> waits on Sync Worker
		 *
		 * Since each node only sees part of the wait graph, the deadlock may
		 * go undetected, leading to indefinite blocking.
		 *
		 * Note: Each entry in VALUES includes an index 'seqidx' that
		 * represents the sequence's position in the local 'seqinfos' list.
		 * This index is propagated to the query results and later used to
		 * directly map the fetched publisher sequence rows back to their
		 * corresponding local entries without relying on result order or name
		 * matching.
		 */
		appendStringInfo(cmd,
						 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
						 "       seq.seqstart, seq.seqincrement, seq.seqmin,\n"
						 "       seq.seqmax, seq.seqcycle\n"
						 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
						 "JOIN pg_namespace n ON n.nspname = s.schname\n"
						 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
						 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
						 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
						 seqstr->data);

		res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
		if (res->status != WALRCV_OK_TUPLES)
			ereport(ERROR,
					errcode(ERRCODE_CONNECTION_FAILURE),
					errmsg("could not fetch sequence information from the publisher: %s",
						   res->err));

		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
		{
			CopySeqResult sync_status;
			LogicalRepSequenceInfo *seqinfo;
			int			seqidx;

			CHECK_FOR_INTERRUPTS();

			if (ConfigReloadPending)
			{
				ConfigReloadPending = false;
				ProcessConfigFile(PGC_SIGHUP);
			}

			sync_status = get_and_validate_seq_info(slot, &sequence_rel,
													&seqinfo, &seqidx);
			if (sync_status == COPYSEQ_SUCCESS)
				sync_status = copy_sequence(seqinfo,
											sequence_rel->rd_rel->relowner);

			switch (sync_status)
			{
				case COPYSEQ_SUCCESS:
					elog(DEBUG1,
						 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
						 MySubscription->name, seqinfo->nspname,
						 seqinfo->seqname);
					batch_succeeded_count++;
					break;
				case COPYSEQ_MISMATCH:

					/*
					 * Remember mismatched sequences in a long-lived memory
					 * context since these will be used after the transaction
					 * is committed.
					 */
					oldctx = MemoryContextSwitchTo(ApplyContext);
					mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
													  seqidx);
					MemoryContextSwitchTo(oldctx);
					batch_mismatched_count++;
					break;
				case COPYSEQ_INSUFFICIENT_PERM:

					/*
					 * Remember sequences with insufficient privileges in a
					 * long-lived memory context since these will be used
					 * after the transaction is committed.
					 */
					oldctx = MemoryContextSwitchTo(ApplyContext);
					insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
													  seqidx);
					MemoryContextSwitchTo(oldctx);
					batch_insuffperm_count++;
					break;
				case COPYSEQ_SKIPPED:
					ereport(LOG,
							errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
								   seqinfo->nspname,
								   seqinfo->seqname));
					batch_skipped_count++;
					break;
			}

			if (sequence_rel)
				table_close(sequence_rel, NoLock);
		}

		ExecDropSingleTupleTableSlot(slot);
		walrcv_clear_result(res);
		resetStringInfo(seqstr);
		resetStringInfo(cmd);

		batch_missing_count = batch_size - (batch_succeeded_count +
											batch_mismatched_count +
											batch_insuffperm_count +
											batch_skipped_count);

		elog(DEBUG1,
			 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
			 MySubscription->name,
			 (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
			 batch_size, batch_succeeded_count, batch_mismatched_count,
			 batch_insuffperm_count, batch_missing_count, batch_skipped_count);

		/* Commit this batch, and prepare for next batch */
		CommitTransactionCommand();

		if (batch_missing_count)
		{
			for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
			{
				LogicalRepSequenceInfo *seqinfo =
					(LogicalRepSequenceInfo *) list_nth(seqinfos, idx);

				/* If the sequence was not found on publisher, record it */
				if (!seqinfo->found_on_pub)
					missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
			}
		}

		/*
		 * cur_batch_base_index is not incremented sequentially because some
		 * sequences may be missing, and the number of fetched rows may not
		 * match the batch size.
		 */
		cur_batch_base_index += batch_size;
	}

	/* Report mismatches, permission issues, or missing sequences */
	report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
						   missing_seqs_idx);
}

/*
 * Identifies sequences that require synchronization and initiates the
 * synchronization process.
 */
static void
LogicalRepSyncSequences(void)
{
	char	   *err;
	bool		must_use_password;
	Relation	rel;
	HeapTuple	tup;
	ScanKeyData skey[2];
	SysScanDesc scan;
	Oid			subid = MyLogicalRepWorker->subid;
	StringInfoData app_name;

	StartTransactionCommand();

	rel = table_open(SubscriptionRelRelationId, AccessShareLock);

	ScanKeyInit(&skey[0],
				Anum_pg_subscription_rel_srsubid,
				BTEqualStrategyNumber, F_OIDEQ,
				ObjectIdGetDatum(subid));

	ScanKeyInit(&skey[1],
				Anum_pg_subscription_rel_srsubstate,
				BTEqualStrategyNumber, F_CHAREQ,
				CharGetDatum(SUBREL_STATE_INIT));

	scan = systable_beginscan(rel, InvalidOid, false,
							  NULL, 2, skey);
	while (HeapTupleIsValid(tup = systable_getnext(scan)))
	{
		Form_pg_subscription_rel subrel;
		LogicalRepSequenceInfo *seq;
		Relation	sequence_rel;
		MemoryContext oldctx;

		CHECK_FOR_INTERRUPTS();

		subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);

		sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);

		/* Skip if sequence was dropped concurrently */
		if (!sequence_rel)
			continue;

		/* Skip if the relation is not a sequence */
		if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
		{
			table_close(sequence_rel, NoLock);
			continue;
		}

		/*
		 * Worker needs to process sequences across transaction boundary, so
		 * allocate them under long-lived context.
		 */
		oldctx = MemoryContextSwitchTo(ApplyContext);

		seq = palloc0_object(LogicalRepSequenceInfo);
		seq->localrelid = subrel->srrelid;
		seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
		seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
		seqinfos = lappend(seqinfos, seq);

		MemoryContextSwitchTo(oldctx);

		table_close(sequence_rel, NoLock);
	}

	/* Cleanup */
	systable_endscan(scan);
	table_close(rel, AccessShareLock);

	CommitTransactionCommand();

	/*
	 * Exit early if no catalog entries found, likely due to concurrent drops.
	 */
	if (!seqinfos)
		return;

	/* Is the use of a password mandatory? */
	must_use_password = MySubscription->passwordrequired &&
		!MySubscription->ownersuperuser;

	initStringInfo(&app_name);
	appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
					 MySubscription->oid, GetSystemIdentifier());

	/*
	 * Establish the connection to the publisher for sequence synchronization.
	 */
	LogRepWorkerWalRcvConn =
		walrcv_connect(MySubscription->conninfo, true, true,
					   must_use_password,
					   app_name.data, &err);
	if (LogRepWorkerWalRcvConn == NULL)
		ereport(ERROR,
				errcode(ERRCODE_CONNECTION_FAILURE),
				errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
					   MySubscription->name, err));

	pfree(app_name.data);

	copy_sequences(LogRepWorkerWalRcvConn);
}

/*
 * Execute the initial sync with error handling. Disable the subscription,
 * if required.
 *
 * Note that we don't handle FATAL errors which are probably because of system
 * resource error and are not repeatable.
 */
static void
start_sequence_sync()
{
	Assert(am_sequencesync_worker());

	PG_TRY();
	{
		/* Call initial sync. */
		LogicalRepSyncSequences();
	}
	PG_CATCH();
	{
		if (MySubscription->disableonerr)
			DisableSubscriptionAndExit();
		else
		{
			/*
			 * Report the worker failed during sequence synchronization. Abort
			 * the current transaction so that the stats message is sent in an
			 * idle state.
			 */
			AbortOutOfAnyTransaction();
			PG_RE_THROW();
		}
	}
	PG_END_TRY();
}

/* Logical Replication sequencesync worker entry point */
void
SequenceSyncWorkerMain(Datum main_arg)
{
	int			worker_slot = DatumGetInt32(main_arg);

	SetupApplyOrSyncWorker(worker_slot);

	start_sequence_sync();

	FinishSyncWorker();
}