diff options
| author | Amit Kapila <akapila@postgresql.org> | 2025-11-05 05:54:25 +0000 |
|---|---|---|
| committer | Amit Kapila <akapila@postgresql.org> | 2025-11-05 05:59:58 +0000 |
| commit | 5509055d6956745532e65ab218e15b99d87d66ce (patch) | |
| tree | 6d3776b6e028dbe5a465657395c5c624b0b21a75 /src/backend/replication/logical/worker.c | |
| parent | 1fd981f05369340a8afa4d013a350b0b2ac6e33e (diff) | |
Add sequence synchronization for logical replication.
This patch introduces sequence synchronization. Sequences that are synced
will have 2 states:
- INIT (needs [re]synchronizing)
- READY (is already synchronized)
A new sequencesync worker is launched as needed to synchronize sequences.
A single sequencesync worker is responsible for synchronizing all
sequences. It begins by retrieving the list of sequences that are flagged
for synchronization, i.e., those in the INIT state. These sequences are
then processed in batches, allowing multiple entries to be synchronized
within a single transaction. The worker fetches the current sequence
values and page LSNs from the remote publisher, updates the corresponding
sequences on the local subscriber, and finally marks each sequence as
READY upon successful synchronization.
Sequence synchronization occurs in 3 places:
1) CREATE SUBSCRIPTION
- The command syntax remains unchanged.
- The subscriber retrieves sequences associated with publications.
- Published sequences are added to pg_subscription_rel with INIT
state.
- Initiate the sequencesync worker to synchronize all sequences.
2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION
- The command syntax remains unchanged.
- Dropped published sequences are removed from pg_subscription_rel.
- Newly published sequences are added to pg_subscription_rel with INIT
state.
- Initiate the sequencesync worker to synchronize only newly added
sequences.
3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES
- A new command introduced for PG19 by f0b3573c3a.
- All sequences in pg_subscription_rel are reset to INIT state.
- Initiate the sequencesync worker to synchronize all sequences.
- Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command,
addition and removal of missing sequences will not be done in this
case.
Author: Vignesh C <vignesh21@gmail.com>
Reviewed-by: shveta malik <shveta.malik@gmail.com>
Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Nisha Moond <nisha.moond412@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Chao Li <li.evan.chao@gmail.com>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/worker.c')
| -rw-r--r-- | src/backend/replication/logical/worker.c | 76 |
1 files changed, 57 insertions, 19 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7edd1c9cf06..e1c757c911e 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -702,6 +702,11 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); + case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ + elog(ERROR, "sequence synchronization worker is not expected to apply changes"); + break; + case WORKERTYPE_UNKNOWN: /* Should never happen. */ elog(ERROR, "Unknown worker type"); @@ -1243,7 +1248,10 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1365,7 +1373,10 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -1421,7 +1432,10 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); clear_subscription_skip_lsn(prepare_data.end_lsn); @@ -1487,7 +1501,10 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn, InvalidXLogRecPtr); in_remote_transaction = false; - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(rollback_data.rollback_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -1622,7 +1639,10 @@ apply_handle_stream_prepare(StringInfo s) pgstat_report_stat(false); - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(prepare_data.end_lsn); /* @@ -2465,7 +2485,10 @@ apply_handle_stream_commit(StringInfo s) break; } - /* Process any tables that are being synchronized in parallel. */ + /* + * Process any tables that are being synchronized in parallel, as well as + * any newly added tables or sequences. + */ ProcessSyncingRelations(commit_data.end_lsn); pgstat_report_activity(STATE_IDLE, NULL); @@ -4137,7 +4160,10 @@ LogicalRepApplyLoop(XLogRecPtr last_received) AcceptInvalidationMessages(); maybe_reread_subscription(); - /* Process any table synchronization changes. */ + /* + * Process any relations that are being synchronized in parallel + * and any newly added tables or sequences. + */ ProcessSyncingRelations(last_received); } @@ -5700,8 +5726,8 @@ run_apply_worker() } /* - * Common initialization for leader apply worker, parallel apply worker and - * tablesync worker. + * Common initialization for leader apply worker, parallel apply worker, + * tablesync worker and sequencesync worker. * * Initialize the database connection, in-memory subscription and necessary * config options. @@ -5809,13 +5835,17 @@ InitializeLogRepWorker(void) if (am_tablesync_worker()) ereport(LOG, - (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", - MySubscription->name, - get_rel_name(MyLogicalRepWorker->relid)))); + errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started", + MySubscription->name, + get_rel_name(MyLogicalRepWorker->relid))); + else if (am_sequencesync_worker()) + ereport(LOG, + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", + MySubscription->name)); else ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" has started", - MySubscription->name))); + errmsg("logical replication apply worker for subscription \"%s\" has started", + MySubscription->name)); CommitTransactionCommand(); } @@ -5831,14 +5861,16 @@ replorigin_reset(int code, Datum arg) replorigin_session_origin_timestamp = 0; } -/* Common function to setup the leader apply or tablesync worker. */ +/* + * Common function to setup the leader apply, tablesync and sequencesync worker. + */ void SetupApplyOrSyncWorker(int worker_slot) { /* Attach to slot */ logicalrep_worker_attach(worker_slot); - Assert(am_tablesync_worker() || am_leader_apply_worker()); + Assert(am_tablesync_worker() || am_sequencesync_worker() || am_leader_apply_worker()); /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -5921,9 +5953,15 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); + if (am_leader_apply_worker() || am_tablesync_worker()) + { + /* + * Report the worker failed during either table synchronization or + * apply. + */ + pgstat_report_subscription_error(MyLogicalRepWorker->subid, + !am_tablesync_worker()); + } /* Disable the subscription */ StartTransactionCommand(); |
