summaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/launcher.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--src/backend/replication/logical/launcher.c40
1 files changed, 27 insertions, 13 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 218cefe86e2..95b5cae9a55 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -245,20 +245,25 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
}
/*
- * Walks the workers array and searches for one that matches given
- * subscription id and relid.
+ * Walks the workers array and searches for one that matches given worker type,
+ * subscription id, and relation id.
*
- * We are only interested in the leader apply worker or table sync worker.
+ * For apply workers, the relid should be set to InvalidOid, as they manage
+ * changes across all tables. For table sync workers, the relid should be set
+ * to the OID of the relation being synchronized.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
+ bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
+ /* relid must be valid only for table sync workers */
+ Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
- /* Search for attached worker for a given subscription id. */
+ /* Search for an attached worker that matches the specified criteria. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
@@ -268,7 +273,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
continue;
if (w->in_use && w->subid == subid && w->relid == relid &&
- (!only_running || w->proc))
+ w->type == wtype && (!only_running || w->proc))
{
res = w;
break;
@@ -627,16 +632,20 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
}
/*
- * Stop the logical replication worker for subid/relid, if any.
+ * Stop the logical replication worker that matches the specified worker type,
+ * subscription id, and relation id.
*/
void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
{
LogicalRepWorker *worker;
+ /* relid must be valid only for table sync workers */
+ Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid, relid, false);
+ worker = logicalrep_worker_find(wtype, subid, relid, false);
if (worker)
{
@@ -694,16 +703,20 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
}
/*
- * Wake up (using latch) any logical replication worker for specified sub/rel.
+ * Wake up (using latch) any logical replication worker that matches the
+ * specified worker type, subscription id, and relation id.
*/
void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
{
LogicalRepWorker *worker;
+ /* relid must be valid only for table sync workers */
+ Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid, relid, true);
+ worker = logicalrep_worker_find(wtype, subid, relid, true);
if (worker)
logicalrep_worker_wakeup_ptr(worker);
@@ -1260,7 +1273,8 @@ ApplyLauncherMain(Datum main_arg)
continue;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+ w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
+ false);
if (w != NULL)
{