summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/sequencesync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/sequencesync.c')
-rw-r--r--src/backend/replication/logical/sequencesync.c745
1 files changed, 745 insertions, 0 deletions
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
new file mode 100644
index 00000000000..717c82328f2
--- /dev/null
+++ b/src/backend/replication/logical/sequencesync.c
@@ -0,0 +1,745 @@
+/*-------------------------------------------------------------------------
+ * sequencesync.c
+ * PostgreSQL logical replication: sequence synchronization
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/replication/logical/sequencesync.c
+ *
+ * NOTES
+ * This file contains code for sequence synchronization for
+ * logical replication.
+ *
+ * Sequences requiring synchronization are tracked in the pg_subscription_rel
+ * catalog.
+ *
+ * Sequences to be synchronized will be added with state INIT when either of
+ * the following commands is executed:
+ * CREATE SUBSCRIPTION
+ * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
+ *
+ * Executing the following command resets all sequences in the subscription to
+ * state INIT, triggering re-synchronization:
+ * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
+ *
+ * The apply worker periodically scans pg_subscription_rel for sequences in
+ * INIT state. When such sequences are found, it spawns a sequencesync worker
+ * to handle synchronization.
+ *
+ * A single sequencesync worker is responsible for synchronizing all sequences.
+ * It begins by retrieving the list of sequences that are flagged for
+ * synchronization, i.e., those in the INIT state. These sequences are then
+ * processed in batches, allowing multiple entries to be synchronized within a
+ * single transaction. The worker fetches the current sequence values and page
+ * LSNs from the remote publisher, updates the corresponding sequences on the
+ * local subscriber, and finally marks each sequence as READY upon successful
+ * synchronization.
+ *
+ * Sequence state transitions follow this pattern:
+ * INIT -> READY
+ *
+ * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
+ * sequences are synchronized per transaction. The locks on the sequence
+ * relation will be periodically released at each transaction commit.
+ *
+ * XXX: We didn't choose launcher process to maintain the launch of sequencesync
+ * worker as it didn't have database connection to access the sequences from the
+ * pg_subscription_rel system catalog that need to be synchronized.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/table.h"
+#include "catalog/pg_sequence.h"
+#include "catalog/pg_subscription_rel.h"
+#include "commands/sequence.h"
+#include "pgstat.h"
+#include "postmaster/interrupt.h"
+#include "replication/logicalworker.h"
+#include "replication/worker_internal.h"
+#include "utils/acl.h"
+#include "utils/fmgroids.h"
+#include "utils/guc.h"
+#include "utils/inval.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"
+#include "utils/usercontext.h"
+
+#define REMOTE_SEQ_COL_COUNT 10
+
+typedef enum CopySeqResult
+{
+ COPYSEQ_SUCCESS,
+ COPYSEQ_MISMATCH,
+ COPYSEQ_INSUFFICIENT_PERM,
+ COPYSEQ_SKIPPED
+} CopySeqResult;
+
+static List *seqinfos = NIL;
+
+/*
+ * Apply worker determines if sequence synchronization is needed.
+ *
+ * Start a sequencesync worker if one is not already running. The active
+ * sequencesync worker will handle all pending sequence synchronization. If any
+ * sequences remain unsynchronized after it exits, a new worker can be started
+ * in the next iteration.
+ */
+void
+ProcessSequencesForSync(void)
+{
+ LogicalRepWorker *sequencesync_worker;
+ int nsyncworkers;
+ bool has_pending_sequences;
+ bool started_tx;
+
+ FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
+
+ if (started_tx)
+ {
+ CommitTransactionCommand();
+ pgstat_report_stat(true);
+ }
+
+ if (!has_pending_sequences)
+ return;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ /* Check if there is a sequencesync worker already running? */
+ sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
+ MyLogicalRepWorker->subid,
+ InvalidOid, true);
+ if (sequencesync_worker)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+ return;
+ }
+
+ /*
+ * Count running sync workers for this subscription, while we have the
+ * lock.
+ */
+ nsyncworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid);
+ LWLockRelease(LogicalRepWorkerLock);
+
+ /*
+ * It is okay to read/update last_seqsync_start_time here in apply worker
+ * as we have already ensured that sync worker doesn't exist.
+ */
+ launch_sync_worker(WORKERTYPE_SEQUENCESYNC, nsyncworkers, InvalidOid,
+ &MyLogicalRepWorker->last_seqsync_start_time);
+}
+
+/*
+ * get_sequences_string
+ *
+ * Build a comma-separated string of schema-qualified sequence names
+ * for the given list of sequence indexes.
+ */
+static void
+get_sequences_string(List *seqindexes, StringInfo buf)
+{
+ resetStringInfo(buf);
+ foreach_int(seqidx, seqindexes)
+ {
+ LogicalRepSequenceInfo *seqinfo =
+ (LogicalRepSequenceInfo *) list_nth(seqinfos, seqidx);
+
+ if (buf->len > 0)
+ appendStringInfoString(buf, ", ");
+
+ appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
+ }
+}
+
+/*
+ * report_sequence_errors
+ *
+ * Report discrepancies found during sequence synchronization between
+ * the publisher and subscriber. Emits warnings for:
+ * a) mismatched definitions or concurrent rename
+ * b) insufficient privileges
+ * c) missing sequences on the subscriber
+ * Then raises an ERROR to indicate synchronization failure.
+ */
+static void
+report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
+ List *missing_seqs_idx)
+{
+ StringInfo seqstr;
+
+ /* Quick exit if there are no errors to report */
+ if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
+ return;
+
+ seqstr = makeStringInfo();
+
+ if (mismatched_seqs_idx)
+ {
+ get_sequences_string(mismatched_seqs_idx, seqstr);
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
+ "mismatched or renamed sequences on subscriber (%s)",
+ list_length(mismatched_seqs_idx),
+ seqstr->data));
+ }
+
+ if (insuffperm_seqs_idx)
+ {
+ get_sequences_string(insuffperm_seqs_idx, seqstr);
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("insufficient privileges on sequence (%s)",
+ "insufficient privileges on sequences (%s)",
+ list_length(insuffperm_seqs_idx),
+ seqstr->data));
+ }
+
+ if (missing_seqs_idx)
+ {
+ get_sequences_string(missing_seqs_idx, seqstr);
+ ereport(WARNING,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("missing sequence on publisher (%s)",
+ "missing sequences on publisher (%s)",
+ list_length(missing_seqs_idx),
+ seqstr->data));
+ }
+
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
+ MySubscription->name));
+}
+
+/*
+ * get_and_validate_seq_info
+ *
+ * Extracts remote sequence information from the tuple slot received from the
+ * publisher, and validates it against the corresponding local sequence
+ * definition.
+ */
+static CopySeqResult
+get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
+ LogicalRepSequenceInfo **seqinfo, int *seqidx)
+{
+ bool isnull;
+ int col = 0;
+ Oid remote_typid;
+ int64 remote_start;
+ int64 remote_increment;
+ int64 remote_min;
+ int64 remote_max;
+ bool remote_cycle;
+ CopySeqResult result = COPYSEQ_SUCCESS;
+ HeapTuple tup;
+ Form_pg_sequence local_seq;
+ LogicalRepSequenceInfo *seqinfo_local;
+
+ *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ /* Identify the corresponding local sequence for the given index. */
+ *seqinfo = seqinfo_local =
+ (LogicalRepSequenceInfo *) list_nth(seqinfos, *seqidx);
+
+ seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
+ Assert(!isnull);
+
+ /* Sanity check */
+ Assert(col == REMOTE_SEQ_COL_COUNT);
+
+ seqinfo_local->found_on_pub = true;
+
+ *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
+
+ /* Sequence was concurrently dropped? */
+ if (!*sequence_rel)
+ return COPYSEQ_SKIPPED;
+
+ tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
+
+ /* Sequence was concurrently dropped? */
+ if (!HeapTupleIsValid(tup))
+ elog(ERROR, "cache lookup failed for sequence %u",
+ seqinfo_local->localrelid);
+
+ local_seq = (Form_pg_sequence) GETSTRUCT(tup);
+
+ /* Sequence parameters for remote/local are the same? */
+ if (local_seq->seqtypid != remote_typid ||
+ local_seq->seqstart != remote_start ||
+ local_seq->seqincrement != remote_increment ||
+ local_seq->seqmin != remote_min ||
+ local_seq->seqmax != remote_max ||
+ local_seq->seqcycle != remote_cycle)
+ result = COPYSEQ_MISMATCH;
+
+ /* Sequence was concurrently renamed? */
+ if (strcmp(seqinfo_local->nspname,
+ get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
+ strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
+ result = COPYSEQ_MISMATCH;
+
+ ReleaseSysCache(tup);
+ return result;
+}
+
+/*
+ * Apply remote sequence state to local sequence and mark it as
+ * synchronized (READY).
+ */
+static CopySeqResult
+copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
+{
+ UserContext ucxt;
+ AclResult aclresult;
+ bool run_as_owner = MySubscription->runasowner;
+ Oid seqoid = seqinfo->localrelid;
+
+ /*
+ * If the user did not opt to run as the owner of the subscription
+ * ('run_as_owner'), then copy the sequence as the owner of the sequence.
+ */
+ if (!run_as_owner)
+ SwitchToUntrustedUser(seqowner, &ucxt);
+
+ aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
+
+ if (aclresult != ACLCHECK_OK)
+ {
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
+
+ return COPYSEQ_INSUFFICIENT_PERM;
+ }
+
+ /*
+ * The log counter (log_cnt) tracks how many sequence values are still
+ * unused locally. It is only relevant to the local node and managed
+ * internally by nextval() when allocating new ranges. Since log_cnt does
+ * not affect the visible sequence state (like last_value or is_called)
+ * and is only used for local caching, it need not be copied to the
+ * subscriber during synchronization.
+ */
+ SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
+
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
+
+ /*
+ * Record the remote sequence's LSN in pg_subscription_rel and mark the
+ * sequence as READY.
+ */
+ UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
+ seqinfo->page_lsn, false);
+
+ return COPYSEQ_SUCCESS;
+}
+
+/*
+ * Copy existing data of sequences from the publisher.
+ */
+static void
+copy_sequences(WalReceiverConn *conn)
+{
+ int cur_batch_base_index = 0;
+ int n_seqinfos = list_length(seqinfos);
+ List *mismatched_seqs_idx = NIL;
+ List *missing_seqs_idx = NIL;
+ List *insuffperm_seqs_idx = NIL;
+ StringInfo seqstr = makeStringInfo();
+ StringInfo cmd = makeStringInfo();
+ MemoryContext oldctx;
+
+#define MAX_SEQUENCES_SYNC_PER_BATCH 100
+
+ elog(DEBUG1,
+ "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
+ MySubscription->name, n_seqinfos);
+
+ while (cur_batch_base_index < n_seqinfos)
+ {
+ Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
+ BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
+ int batch_size = 0;
+ int batch_succeeded_count = 0;
+ int batch_mismatched_count = 0;
+ int batch_skipped_count = 0;
+ int batch_insuffperm_count = 0;
+ int batch_missing_count;
+ Relation sequence_rel;
+
+ WalRcvExecResult *res;
+ TupleTableSlot *slot;
+
+ StartTransactionCommand();
+
+ for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
+ {
+ LogicalRepSequenceInfo *seqinfo =
+ (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
+
+ if (seqstr->len > 0)
+ appendStringInfoString(seqstr, ", ");
+
+ appendStringInfo(seqstr, "(\'%s\', \'%s\', %d)",
+ seqinfo->nspname, seqinfo->seqname, idx);
+
+ if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
+ break;
+ }
+
+ /*
+ * We deliberately avoid acquiring a local lock on the sequence before
+ * querying the publisher to prevent potential distributed deadlocks
+ * in bi-directional replication setups.
+ *
+ * Example scenario:
+ *
+ * - On each node, a background worker acquires a lock on a sequence
+ * as part of a sync operation.
+ *
+ * - Concurrently, a user transaction attempts to alter the same
+ * sequence, waiting on the background worker's lock.
+ *
+ * - Meanwhile, a query from the other node tries to access metadata
+ * that depends on the completion of the alter operation.
+ *
+ * - This creates a circular wait across nodes:
+ *
+ * Node-1: Query -> waits on Alter -> waits on Sync Worker
+ *
+ * Node-2: Query -> waits on Alter -> waits on Sync Worker
+ *
+ * Since each node only sees part of the wait graph, the deadlock may
+ * go undetected, leading to indefinite blocking.
+ *
+ * Note: Each entry in VALUES includes an index 'seqidx' that
+ * represents the sequence's position in the local 'seqinfos' list.
+ * This index is propagated to the query results and later used to
+ * directly map the fetched publisher sequence rows back to their
+ * corresponding local entries without relying on result order or name
+ * matching.
+ */
+ appendStringInfo(cmd,
+ "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
+ " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
+ " seq.seqmax, seq.seqcycle\n"
+ "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
+ "JOIN pg_namespace n ON n.nspname = s.schname\n"
+ "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
+ "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
+ "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
+ seqstr->data);
+
+ res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("could not fetch sequence information from the publisher: %s",
+ res->err));
+
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ {
+ CopySeqResult sync_status;
+ LogicalRepSequenceInfo *seqinfo;
+ int seqidx;
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ sync_status = get_and_validate_seq_info(slot, &sequence_rel,
+ &seqinfo, &seqidx);
+ if (sync_status == COPYSEQ_SUCCESS)
+ sync_status = copy_sequence(seqinfo,
+ sequence_rel->rd_rel->relowner);
+
+ switch (sync_status)
+ {
+ case COPYSEQ_SUCCESS:
+ elog(DEBUG1,
+ "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
+ MySubscription->name, seqinfo->nspname,
+ seqinfo->seqname);
+ batch_succeeded_count++;
+ break;
+ case COPYSEQ_MISMATCH:
+
+ /*
+ * Remember mismatched sequences in a long-lived memory
+ * context since these will be used after the transaction
+ * is committed.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
+ seqidx);
+ MemoryContextSwitchTo(oldctx);
+ batch_mismatched_count++;
+ break;
+ case COPYSEQ_INSUFFICIENT_PERM:
+
+ /*
+ * Remember sequences with insufficient privileges in a
+ * long-lived memory context since these will be used
+ * after the transaction is committed.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
+ seqidx);
+ MemoryContextSwitchTo(oldctx);
+ batch_insuffperm_count++;
+ break;
+ case COPYSEQ_SKIPPED:
+ ereport(LOG,
+ errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
+ seqinfo->nspname,
+ seqinfo->seqname));
+ batch_skipped_count++;
+ break;
+ }
+
+ if (sequence_rel)
+ table_close(sequence_rel, NoLock);
+ }
+
+ ExecDropSingleTupleTableSlot(slot);
+ walrcv_clear_result(res);
+ resetStringInfo(seqstr);
+ resetStringInfo(cmd);
+
+ batch_missing_count = batch_size - (batch_succeeded_count +
+ batch_mismatched_count +
+ batch_insuffperm_count +
+ batch_skipped_count);
+
+ elog(DEBUG1,
+ "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
+ MySubscription->name,
+ (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
+ batch_size, batch_succeeded_count, batch_mismatched_count,
+ batch_insuffperm_count, batch_missing_count, batch_skipped_count);
+
+ /* Commit this batch, and prepare for next batch */
+ CommitTransactionCommand();
+
+ if (batch_missing_count)
+ {
+ for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
+ {
+ LogicalRepSequenceInfo *seqinfo =
+ (LogicalRepSequenceInfo *) list_nth(seqinfos, idx);
+
+ /* If the sequence was not found on publisher, record it */
+ if (!seqinfo->found_on_pub)
+ missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
+ }
+ }
+
+ /*
+ * cur_batch_base_index is not incremented sequentially because some
+ * sequences may be missing, and the number of fetched rows may not
+ * match the batch size.
+ */
+ cur_batch_base_index += batch_size;
+ }
+
+ /* Report mismatches, permission issues, or missing sequences */
+ report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
+ missing_seqs_idx);
+}
+
+/*
+ * Identifies sequences that require synchronization and initiates the
+ * synchronization process.
+ */
+static void
+LogicalRepSyncSequences(void)
+{
+ char *err;
+ bool must_use_password;
+ Relation rel;
+ HeapTuple tup;
+ ScanKeyData skey[2];
+ SysScanDesc scan;
+ Oid subid = MyLogicalRepWorker->subid;
+ StringInfoData app_name;
+
+ StartTransactionCommand();
+
+ rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+
+ ScanKeyInit(&skey[0],
+ Anum_pg_subscription_rel_srsubid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(subid));
+
+ ScanKeyInit(&skey[1],
+ Anum_pg_subscription_rel_srsubstate,
+ BTEqualStrategyNumber, F_CHAREQ,
+ CharGetDatum(SUBREL_STATE_INIT));
+
+ scan = systable_beginscan(rel, InvalidOid, false,
+ NULL, 2, skey);
+ while (HeapTupleIsValid(tup = systable_getnext(scan)))
+ {
+ Form_pg_subscription_rel subrel;
+ LogicalRepSequenceInfo *seq;
+ Relation sequence_rel;
+ MemoryContext oldctx;
+
+ CHECK_FOR_INTERRUPTS();
+
+ subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+
+ sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
+
+ /* Skip if sequence was dropped concurrently */
+ if (!sequence_rel)
+ continue;
+
+ /* Skip if the relation is not a sequence */
+ if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
+ {
+ table_close(sequence_rel, NoLock);
+ continue;
+ }
+
+ /*
+ * Worker needs to process sequences across transaction boundary, so
+ * allocate them under long-lived context.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+
+ seq = palloc0_object(LogicalRepSequenceInfo);
+ seq->localrelid = subrel->srrelid;
+ seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
+ seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
+ seqinfos = lappend(seqinfos, seq);
+
+ MemoryContextSwitchTo(oldctx);
+
+ table_close(sequence_rel, NoLock);
+ }
+
+ /* Cleanup */
+ systable_endscan(scan);
+ table_close(rel, AccessShareLock);
+
+ CommitTransactionCommand();
+
+ /*
+ * Exit early if no catalog entries found, likely due to concurrent drops.
+ */
+ if (!seqinfos)
+ return;
+
+ /* Is the use of a password mandatory? */
+ must_use_password = MySubscription->passwordrequired &&
+ !MySubscription->ownersuperuser;
+
+ initStringInfo(&app_name);
+ appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
+ MySubscription->oid, GetSystemIdentifier());
+
+ /*
+ * Establish the connection to the publisher for sequence synchronization.
+ */
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, true,
+ must_use_password,
+ app_name.data, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
+ MySubscription->name, err));
+
+ pfree(app_name.data);
+
+ copy_sequences(LogRepWorkerWalRcvConn);
+}
+
+/*
+ * Execute the initial sync with error handling. Disable the subscription,
+ * if required.
+ *
+ * Note that we don't handle FATAL errors which are probably because of system
+ * resource error and are not repeatable.
+ */
+static void
+start_sequence_sync()
+{
+ Assert(am_sequencesync_worker());
+
+ PG_TRY();
+ {
+ /* Call initial sync. */
+ LogicalRepSyncSequences();
+ }
+ PG_CATCH();
+ {
+ if (MySubscription->disableonerr)
+ DisableSubscriptionAndExit();
+ else
+ {
+ /*
+ * Report the worker failed during sequence synchronization. Abort
+ * the current transaction so that the stats message is sent in an
+ * idle state.
+ */
+ AbortOutOfAnyTransaction();
+ PG_RE_THROW();
+ }
+ }
+ PG_END_TRY();
+}
+
+/* Logical Replication sequencesync worker entry point */
+void
+SequenceSyncWorkerMain(Datum main_arg)
+{
+ int worker_slot = DatumGetInt32(main_arg);
+
+ SetupApplyOrSyncWorker(worker_slot);
+
+ start_sequence_sync();
+
+ FinishSyncWorker();
+}