summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2025-11-05 05:54:25 +0000
committerAmit Kapila <akapila@postgresql.org>2025-11-05 05:59:58 +0000
commit5509055d6956745532e65ab218e15b99d87d66ce (patch)
tree6d3776b6e028dbe5a465657395c5c624b0b21a75 /src/backend/replication/logical/worker.c
parent1fd981f05369340a8afa4d013a350b0b2ac6e33e (diff)
Add sequence synchronization for logical replication.
This patch introduces sequence synchronization. Sequences that are synced will have 2 states: - INIT (needs [re]synchronizing) - READY (is already synchronized) A new sequencesync worker is launched as needed to synchronize sequences. A single sequencesync worker is responsible for synchronizing all sequences. It begins by retrieving the list of sequences that are flagged for synchronization, i.e., those in the INIT state. These sequences are then processed in batches, allowing multiple entries to be synchronized within a single transaction. The worker fetches the current sequence values and page LSNs from the remote publisher, updates the corresponding sequences on the local subscriber, and finally marks each sequence as READY upon successful synchronization. Sequence synchronization occurs in 3 places: 1) CREATE SUBSCRIPTION - The command syntax remains unchanged. - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize all sequences. 2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - The command syntax remains unchanged. - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize only newly added sequences. 3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES - A new command introduced for PG19 by f0b3573c3a. - All sequences in pg_subscription_rel are reset to INIT state. - Initiate the sequencesync worker to synchronize all sequences. - Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command, addition and removal of missing sequences will not be done in this case. Author: Vignesh C <vignesh21@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Peter Smith <smithpb2250@gmail.com> Reviewed-by: Nisha Moond <nisha.moond412@gmail.com> Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Chao Li <li.evan.chao@gmail.com> Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c76
1 files changed, 57 insertions, 19 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7edd1c9cf06..e1c757c911e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
(rel->state == SUBREL_STATE_SYNCDONE &&
rel->statelsn <= remote_final_lsn));
+ case WORKERTYPE_SEQUENCESYNC:
+ /* Should never happen. */
+ elog(ERROR, "sequence synchronization worker is not expected to apply changes");
+ break;
+
case WORKERTYPE_UNKNOWN:
/* Should never happen. */
elog(ERROR, "Unknown worker type");
@@ -1243,7 +1248,10 @@ apply_handle_commit(StringInfo s)
apply_handle_commit_internal(&commit_data);
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1365,7 +1373,10 @@ apply_handle_prepare(StringInfo s)
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
/*
@@ -1421,7 +1432,10 @@ apply_handle_commit_prepared(StringInfo s)
store_flush_position(prepare_data.end_lsn, XactLastCommitEnd);
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
clear_subscription_skip_lsn(prepare_data.end_lsn);
@@ -1487,7 +1501,10 @@ apply_handle_rollback_prepared(StringInfo s)
store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr);
in_remote_transaction = false;
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(rollback_data.rollback_end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1622,7 +1639,10 @@ apply_handle_stream_prepare(StringInfo s)
pgstat_report_stat(false);
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(prepare_data.end_lsn);
/*
@@ -2465,7 +2485,10 @@ apply_handle_stream_commit(StringInfo s)
break;
}
- /* Process any tables that are being synchronized in parallel. */
+ /*
+ * Process any tables that are being synchronized in parallel, as well as
+ * any newly added tables or sequences.
+ */
ProcessSyncingRelations(commit_data.end_lsn);
pgstat_report_activity(STATE_IDLE, NULL);
@@ -4137,7 +4160,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
AcceptInvalidationMessages();
maybe_reread_subscription();
- /* Process any table synchronization changes. */
+ /*
+ * Process any relations that are being synchronized in parallel
+ * and any newly added tables or sequences.
+ */
ProcessSyncingRelations(last_received);
}
@@ -5700,8 +5726,8 @@ run_apply_worker()
}
/*
- * Common initialization for leader apply worker, parallel apply worker and
- * tablesync worker.
+ * Common initialization for leader apply worker, parallel apply worker,
+ * tablesync worker and sequencesync worker.
*
* Initialize the database connection, in-memory subscription and necessary
* config options.
@@ -5809,13 +5835,17 @@ InitializeLogRepWorker(void)
if (am_tablesync_worker())
ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
+ errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid)));
+ else if (am_sequencesync_worker())
+ ereport(LOG,
+ errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started",
+ MySubscription->name));
else
ereport(LOG,
- (errmsg("logical replication apply worker for subscription \"%s\" has started",
- MySubscription->name)));
+ errmsg("logical replication apply worker for subscription \"%s\" has started",
+ MySubscription->name));
CommitTransactionCommand();
}
@@ -5831,14 +5861,16 @@ replorigin_reset(int code, Datum arg)
replorigin_session_origin_timestamp = 0;
}
-/* Common function to setup the leader apply or tablesync worker. */
+/*
+ * Common function to setup the leader apply, tablesync and sequencesync worker.
+ */
void
SetupApplyOrSyncWorker(int worker_slot)
{
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
- Assert(am_tablesync_worker() || am_leader_apply_worker());
+ Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker());
/* Setup signal handling */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -5921,9 +5953,15 @@ DisableSubscriptionAndExit(void)
RESUME_INTERRUPTS();
- /* Report the worker failed during either table synchronization or apply */
- pgstat_report_subscription_error(MyLogicalRepWorker->subid,
- !am_tablesync_worker());
+ if (am_leader_apply_worker() || am_tablesync_worker())
+ {
+ /*
+ * Report the worker failed during either table synchronization or
+ * apply.
+ */
+ pgstat_report_subscription_error(MyLogicalRepWorker->subid,
+ !am_tablesync_worker());
+ }
/* Disable the subscription */
StartTransactionCommand();