summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/syncutils.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2025-11-05 05:54:25 +0000
committerAmit Kapila <akapila@postgresql.org>2025-11-05 05:59:58 +0000
commit5509055d6956745532e65ab218e15b99d87d66ce (patch)
tree6d3776b6e028dbe5a465657395c5c624b0b21a75 /src/backend/replication/logical/syncutils.c
parent1fd981f05369340a8afa4d013a350b0b2ac6e33e (diff)
Add sequence synchronization for logical replication.
This patch introduces sequence synchronization. Sequences that are synced will have 2 states: - INIT (needs [re]synchronizing) - READY (is already synchronized) A new sequencesync worker is launched as needed to synchronize sequences. A single sequencesync worker is responsible for synchronizing all sequences. It begins by retrieving the list of sequences that are flagged for synchronization, i.e., those in the INIT state. These sequences are then processed in batches, allowing multiple entries to be synchronized within a single transaction. The worker fetches the current sequence values and page LSNs from the remote publisher, updates the corresponding sequences on the local subscriber, and finally marks each sequence as READY upon successful synchronization. Sequence synchronization occurs in 3 places: 1) CREATE SUBSCRIPTION - The command syntax remains unchanged. - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize all sequences. 2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - The command syntax remains unchanged. - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize only newly added sequences. 3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES - A new command introduced for PG19 by f0b3573c3a. - All sequences in pg_subscription_rel are reset to INIT state. - Initiate the sequencesync worker to synchronize all sequences. - Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command, addition and removal of missing sequences will not be done in this case. Author: Vignesh C <vignesh21@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Peter Smith <smithpb2250@gmail.com> Reviewed-by: Nisha Moond <nisha.moond412@gmail.com> Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Chao Li <li.evan.chao@gmail.com> Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/syncutils.c')
-rw-r--r--src/backend/replication/logical/syncutils.c139
1 files changed, 115 insertions, 24 deletions
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index ae8c9385916..a696b1f2dc0 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -16,6 +16,7 @@
#include "catalog/pg_subscription_rel.h"
#include "pgstat.h"
+#include "replication/logicallauncher.h"
#include "replication/worker_internal.h"
#include "storage/ipc.h"
#include "utils/lsyscache.h"
@@ -48,6 +49,8 @@ static SyncingRelationsState relation_states_validity = SYNC_RELATIONS_STATE_NEE
pg_noreturn void
FinishSyncWorker(void)
{
+ Assert(am_sequencesync_worker() || am_tablesync_worker());
+
/*
* Commit any outstanding transaction. This is the usual case, unless
* there was nothing to do for the table.
@@ -61,16 +64,31 @@ FinishSyncWorker(void)
/* And flush all writes. */
XLogFlush(GetXLogWriteRecPtr());
- StartTransactionCommand();
- ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
- MySubscription->name,
- get_rel_name(MyLogicalRepWorker->relid))));
- CommitTransactionCommand();
+ if (am_sequencesync_worker())
+ {
+ ereport(LOG,
+ errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished",
+ MySubscription->name));
+
+ /*
+ * Reset last_seqsync_start_time, so that next time a sequencesync
+ * worker is needed it can be started promptly.
+ */
+ logicalrep_reset_seqsync_start_time();
+ }
+ else
+ {
+ StartTransactionCommand();
+ ereport(LOG,
+ errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
+ MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid)));
+ CommitTransactionCommand();
- /* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
- InvalidOid);
+ /* Find the leader apply worker and signal it. */
+ logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
+ InvalidOid);
+ }
/* Stop gracefully */
proc_exit(0);
@@ -86,7 +104,52 @@ InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue)
}
/*
- * Process possible state change(s) of relations that are being synchronized.
+ * Attempt to launch a sync worker for one or more sequences or a table, if
+ * a worker slot is available and the retry interval has elapsed.
+ *
+ * wtype: sync worker type.
+ * nsyncworkers: Number of currently running sync workers for the subscription.
+ * relid: InvalidOid for sequencesync worker, actual relid for tablesync
+ * worker.
+ * last_start_time: Pointer to the last start time of the worker.
+ */
+void
+launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid,
+ TimestampTz *last_start_time)
+{
+ TimestampTz now;
+
+ Assert((wtype == WORKERTYPE_TABLESYNC && OidIsValid(relid)) ||
+ (wtype == WORKERTYPE_SEQUENCESYNC && !OidIsValid(relid)));
+
+ /* If there is a free sync worker slot, start a new sync worker */
+ if (nsyncworkers >= max_sync_workers_per_subscription)
+ return;
+
+ now = GetCurrentTimestamp();
+
+ if (!(*last_start_time) ||
+ TimestampDifferenceExceeds(*last_start_time, now,
+ wal_retrieve_retry_interval))
+ {
+ /*
+ * Set the last_start_time even if we fail to start the worker, so
+ * that we won't retry until wal_retrieve_retry_interval has elapsed.
+ */
+ *last_start_time = now;
+ (void) logicalrep_worker_launch(wtype,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ relid, DSM_HANDLE_INVALID, false);
+ }
+}
+
+/*
+ * Process possible state change(s) of relations that are being synchronized
+ * and start new tablesync workers for the newly added tables. Also, start a
+ * new sequencesync worker for the newly added sequences.
*/
void
ProcessSyncingRelations(XLogRecPtr current_lsn)
@@ -108,6 +171,12 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
case WORKERTYPE_APPLY:
ProcessSyncingTablesForApply(current_lsn);
+ ProcessSequencesForSync();
+ break;
+
+ case WORKERTYPE_SEQUENCESYNC:
+ /* Should never happen. */
+ elog(ERROR, "sequence synchronization worker is not expected to process relations");
break;
case WORKERTYPE_UNKNOWN:
@@ -117,17 +186,29 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
}
/*
- * Common code to fetch the up-to-date sync state info into the static lists.
+ * Common code to fetch the up-to-date sync state info for tables and sequences.
*
- * Returns true if subscription has 1 or more tables, else false.
+ * The pg_subscription_rel catalog is shared by tables and sequences. Changes
+ * to either sequences or tables can affect the validity of relation states, so
+ * we identify non-READY tables and non-READY sequences together to ensure
+ * consistency.
*
- * Note: If this function started the transaction (indicated by the parameter)
- * then it is the caller's responsibility to commit it.
+ * has_pending_subtables: true if the subscription has one or more tables that
+ * are not in READY state, otherwise false.
+ * has_pending_subsequences: true if the subscription has one or more sequences
+ * that are not in READY state, otherwise false.
*/
-bool
-FetchRelationStates(bool *started_tx)
+void
+FetchRelationStates(bool *has_pending_subtables,
+ bool *has_pending_subsequences,
+ bool *started_tx)
{
+ /*
+ * has_subtables and has_subsequences_non_ready are declared as static,
+ * since the same value can be used until the system table is invalidated.
+ */
static bool has_subtables = false;
+ static bool has_subsequences_non_ready = false;
*started_tx = false;
@@ -135,10 +216,10 @@ FetchRelationStates(bool *started_tx)
{
MemoryContext oldctx;
List *rstates;
- ListCell *lc;
SubscriptionRelState *rstate;
relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
+ has_subsequences_non_ready = false;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
@@ -150,17 +231,23 @@ FetchRelationStates(bool *started_tx)
*started_tx = true;
}
- /* Fetch tables that are in non-ready state. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true, false,
+ /* Fetch tables and sequences that are in non-READY state. */
+ rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
true);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
- foreach(lc, rstates)
+ foreach_ptr(SubscriptionRelState, subrel, rstates)
{
- rstate = palloc(sizeof(SubscriptionRelState));
- memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState));
- table_states_not_ready = lappend(table_states_not_ready, rstate);
+ if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
+ has_subsequences_non_ready = true;
+ else
+ {
+ rstate = palloc(sizeof(SubscriptionRelState));
+ memcpy(rstate, subrel, sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready,
+ rstate);
+ }
}
MemoryContextSwitchTo(oldctx);
@@ -185,5 +272,9 @@ FetchRelationStates(bool *started_tx)
relation_states_validity = SYNC_RELATIONS_STATE_VALID;
}
- return has_subtables;
+ if (has_pending_subtables)
+ *has_pending_subtables = has_subtables;
+
+ if (has_pending_subsequences)
+ *has_pending_subsequences = has_subsequences_non_ready;
}