summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c57
1 files changed, 50 insertions, 7 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 95b5cae9a55..6c6d4015ba7 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -248,9 +248,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
* Walks the workers array and searches for one that matches given worker type,
* subscription id, and relation id.
*
- * For apply workers, the relid should be set to InvalidOid, as they manage
- * changes across all tables. For table sync workers, the relid should be set
- * to the OID of the relation being synchronized.
+ * For both apply workers and sequencesync workers, the relid should be set to
+ * InvalidOid, as these workers handle changes across all tables and sequences
+ * respectively, rather than targeting a specific relation. For tablesync
+ * workers, the relid should be set to the OID of the relation being
+ * synchronized.
*/
LogicalRepWorker *
logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
@@ -334,6 +336,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
int nparallelapplyworkers;
TimestampTz now;
bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+ bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC);
bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
/*----------
@@ -422,7 +425,8 @@ retry:
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
+ if ((is_tablesync_worker || is_sequencesync_worker) &&
+ nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
@@ -478,6 +482,7 @@ retry:
TIMESTAMP_NOBEGIN(worker->last_recv_time);
worker->reply_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->reply_time);
+ worker->last_seqsync_start_time = 0;
/* Before releasing lock, remember generation for future identification. */
generation = worker->generation;
@@ -511,8 +516,16 @@ retry:
memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
break;
+ case WORKERTYPE_SEQUENCESYNC:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication sequencesync worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
+ break;
+
case WORKERTYPE_TABLESYNC:
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication tablesync worker for subscription %u sync %u",
subid,
@@ -849,6 +862,33 @@ logicalrep_launcher_onexit(int code, Datum arg)
}
/*
+ * Reset the last_seqsync_start_time of the sequencesync worker in the
+ * subscription's apply worker.
+ *
+ * Note that this value is not stored in the sequencesync worker, because that
+ * has finished already and is about to exit.
+ */
+void
+logicalrep_reset_seqsync_start_time(void)
+{
+ LogicalRepWorker *worker;
+
+ /*
+ * The apply worker can't access last_seqsync_start_time concurrently, so
+ * it is okay to use SHARED lock here. See ProcessSequencesForSync().
+ */
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ worker = logicalrep_worker_find(WORKERTYPE_APPLY,
+ MyLogicalRepWorker->subid, InvalidOid,
+ true);
+ if (worker)
+ worker->last_seqsync_start_time = 0;
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
* Cleanup function.
*
* Called on logical replication worker exit.
@@ -896,7 +936,7 @@ logicalrep_sync_worker_count(Oid subid)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (isTablesyncWorker(w) && w->subid == subid)
+ if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w)))
res++;
}
@@ -1610,7 +1650,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
- if (isTablesyncWorker(&worker))
+ if (isTableSyncWorker(&worker))
values[1] = ObjectIdGetDatum(worker.relid);
else
nulls[1] = true;
@@ -1650,6 +1690,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
case WORKERTYPE_PARALLEL_APPLY:
values[9] = CStringGetTextDatum("parallel apply");
break;
+ case WORKERTYPE_SEQUENCESYNC:
+ values[9] = CStringGetTextDatum("sequence synchronization");
+ break;
case WORKERTYPE_TABLESYNC:
values[9] = CStringGetTextDatum("table synchronization");
break;