diff options
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
| -rw-r--r-- | src/backend/replication/logical/launcher.c | 57 |
1 files changed, 50 insertions, 7 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 95b5cae9a55..6c6d4015ba7 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -248,9 +248,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, * Walks the workers array and searches for one that matches given worker type, * subscription id, and relation id. * - * For apply workers, the relid should be set to InvalidOid, as they manage - * changes across all tables. For table sync workers, the relid should be set - * to the OID of the relation being synchronized. + * For both apply workers and sequencesync workers, the relid should be set to + * InvalidOid, as these workers handle changes across all tables and sequences + * respectively, rather than targeting a specific relation. For tablesync + * workers, the relid should be set to the OID of the relation being + * synchronized. */ LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, @@ -334,6 +336,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, int nparallelapplyworkers; TimestampTz now; bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_sequencesync_worker = (wtype == WORKERTYPE_SEQUENCESYNC); bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); /*---------- @@ -422,7 +425,8 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) + if ((is_tablesync_worker || is_sequencesync_worker) && + nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -478,6 +482,7 @@ retry: TIMESTAMP_NOBEGIN(worker->last_recv_time); worker->reply_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->reply_time); + worker->last_seqsync_start_time = 0; /* Before releasing lock, remember generation for future identification. */ generation = worker->generation; @@ -511,8 +516,16 @@ retry: memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle)); break; + case WORKERTYPE_SEQUENCESYNC: + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication sequencesync worker for subscription %u", + subid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker"); + break; + case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -849,6 +862,33 @@ logicalrep_launcher_onexit(int code, Datum arg) } /* + * Reset the last_seqsync_start_time of the sequencesync worker in the + * subscription's apply worker. + * + * Note that this value is not stored in the sequencesync worker, because that + * has finished already and is about to exit. + */ +void +logicalrep_reset_seqsync_start_time(void) +{ + LogicalRepWorker *worker; + + /* + * The apply worker can't access last_seqsync_start_time concurrently, so + * it is okay to use SHARED lock here. See ProcessSequencesForSync(). + */ + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + worker = logicalrep_worker_find(WORKERTYPE_APPLY, + MyLogicalRepWorker->subid, InvalidOid, + true); + if (worker) + worker->last_seqsync_start_time = 0; + + LWLockRelease(LogicalRepWorkerLock); +} + +/* * Cleanup function. * * Called on logical replication worker exit. @@ -896,7 +936,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (w->subid == subid && (isTableSyncWorker(w) || isSequenceSyncWorker(w))) res++; } @@ -1610,7 +1650,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; @@ -1650,6 +1690,9 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) case WORKERTYPE_PARALLEL_APPLY: values[9] = CStringGetTextDatum("parallel apply"); break; + case WORKERTYPE_SEQUENCESYNC: + values[9] = CStringGetTextDatum("sequence synchronization"); + break; case WORKERTYPE_TABLESYNC: values[9] = CStringGetTextDatum("table synchronization"); break; |
