summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2025-11-05 05:54:25 +0000
committerAmit Kapila <akapila@postgresql.org>2025-11-05 05:59:58 +0000
commit5509055d6956745532e65ab218e15b99d87d66ce (patch)
tree6d3776b6e028dbe5a465657395c5c624b0b21a75 /src/backend/replication/logical/launcher.c
parent1fd981f05369340a8afa4d013a350b0b2ac6e33e (diff)
Add sequence synchronization for logical replication.
This patch introduces sequence synchronization. Sequences that are synced will have 2 states: - INIT (needs [re]synchronizing) - READY (is already synchronized) A new sequencesync worker is launched as needed to synchronize sequences. A single sequencesync worker is responsible for synchronizing all sequences. It begins by retrieving the list of sequences that are flagged for synchronization, i.e., those in the INIT state. These sequences are then processed in batches, allowing multiple entries to be synchronized within a single transaction. The worker fetches the current sequence values and page LSNs from the remote publisher, updates the corresponding sequences on the local subscriber, and finally marks each sequence as READY upon successful synchronization. Sequence synchronization occurs in 3 places: 1) CREATE SUBSCRIPTION - The command syntax remains unchanged. - The subscriber retrieves sequences associated with publications. - Published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize all sequences. 2) ALTER SUBSCRIPTION ... REFRESH PUBLICATION - The command syntax remains unchanged. - Dropped published sequences are removed from pg_subscription_rel. - Newly published sequences are added to pg_subscription_rel with INIT state. - Initiate the sequencesync worker to synchronize only newly added sequences. 3) ALTER SUBSCRIPTION ... REFRESH SEQUENCES - A new command introduced for PG19 by f0b3573c3a. - All sequences in pg_subscription_rel are reset to INIT state. - Initiate the sequencesync worker to synchronize all sequences. - Unlike "ALTER SUBSCRIPTION ... REFRESH PUBLICATION" command, addition and removal of missing sequences will not be done in this case. Author: Vignesh C <vignesh21@gmail.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Hou Zhijie <houzj.fnst@fujitsu.com> Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Peter Smith <smithpb2250@gmail.com> Reviewed-by: Nisha Moond <nisha.moond412@gmail.com> Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Chao Li <li.evan.chao@gmail.com> Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c57
1 files changed, 50 insertions, 7 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 95b5cae9a55..6c6d4015ba7 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -248,9 +248,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
* Walks the workers array and searches for one that matches given worker type,
* subscription id, and relation id.
*
- * For apply workers, the relid should be set to InvalidOid, as they manage
- * changes across all tables. For table sync workers, the relid should be set
- * to the OID of the relation being synchronized.
+ * For both apply workers and sequencesync workers, the relid should be set to
+ * InvalidOid, as these workers handle changes across all tables and sequences
+ * respectively, rather than targeting a specific relation. For tablesync
+ * workers, the relid should be set to the OID of the relation being
+ * synchronized.
*/
LogicalRepWorker *
logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
@@ -334,6 +336,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
int nparallelapplyworkers;
TimestampTz now;
bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+ bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
/*----------
@@ -422,7 +425,8 @@ retry:
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
+ if ((is_tablesync_worker || is_sequencesync_worker) &&
+ nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
@@ -478,6 +482,7 @@ retry:
TIMESTAMP_NOBEGIN(worker->last_recv_time);
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
+ worker->last_seqsync_start_time = 0;
/* Before releasing lock, remember generation for future identification. */
generation = worker->generation;
@@ -511,8 +516,16 @@ retry:
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
break;
+ case WORKERTYPE_SEQUENCESYNC:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication sequencesync worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
+ break;
+
case WORKERTYPE_TABLESYNC:
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication tablesync worker for subscription %u sync %u",
subid,
@@ -849,6 +862,33 @@ logicalrep_launcher_onexit(int code, Datum arg)
}
/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ *
+ * Note that this value is not stored in the sequencesync worker, because that
+ * has finished already and is about to exit.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+ LogicalRepWorker *worker;
+
+ /*
+ * The apply worker can't access last_seqsync_start_time concurrently, so
+ * it is okay to use SHARED lock here. See ProcessSequencesForSync().
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ worker = logicalrep_worker_find(WORKERTYPE_APPLY,
+ MyLogicalRepWorker->subid, InvalidOid,
+ true);
+ if (worker)
+ worker->last_seqsync_start_time = 0;
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
* Cleanup function.
*
* Called on logical replication worker exit.
@@ -896,7 +936,7 @@ logicalrep_sync_worker_count(Oid subid)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (isTablesyncWorker(w) && w->subid == subid)
+ if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
res++;
}
@@ -1610,7 +1650,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
- if (isTablesyncWorker(&worker))
+ if (isTableSyncWorker(&worker))
values[1] = ObjectIdGetDatum(worker.relid);
else
nulls[1] = true;
@@ -1650,6 +1690,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
case WORKERTYPE_PARALLEL_APPLY:
values[9] = CStringGetTextDatum("parallel apply");
break;
+ case WORKERTYPE_SEQUENCESYNC:
+ values[9] = CStringGetTextDatum("sequence synchronization");
+ break;
case WORKERTYPE_TABLESYNC:
values[9] = CStringGetTextDatum("table synchronization");
break;