diff options
Diffstat (limited to 'src/backend/replication/logical')
| -rw-r--r-- | src/backend/replication/logical/launcher.c | 40 | ||||
| -rw-r--r-- | src/backend/replication/logical/syncutils.c | 3 | ||||
| -rw-r--r-- | src/backend/replication/logical/tablesync.c | 11 | ||||
| -rw-r--r-- | src/backend/replication/logical/worker.c | 8 | 
4 files changed, 41 insertions, 21 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 218cefe86e2..95b5cae9a55 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -245,20 +245,25 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,  }  /* - * Walks the workers array and searches for one that matches given - * subscription id and relid. + * Walks the workers array and searches for one that matches given worker type, + * subscription id, and relation id.   * - * We are only interested in the leader apply worker or table sync worker. + * For apply workers, the relid should be set to InvalidOid, as they manage + * changes across all tables. For table sync workers, the relid should be set + * to the OID of the relation being synchronized.   */  LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, bool only_running) +logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, +					   bool only_running)  {  	int			i;  	LogicalRepWorker *res = NULL; +	/* relid must be valid only for table sync workers */ +	Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));  	Assert(LWLockHeldByMe(LogicalRepWorkerLock)); -	/* Search for attached worker for a given subscription id. */ +	/* Search for an attached worker that matches the specified criteria. */  	for (i = 0; i < max_logical_replication_workers; i++)  	{  		LogicalRepWorker *w = &LogicalRepCtx->workers[i]; @@ -268,7 +273,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)  			continue;  		if (w->in_use && w->subid == subid && w->relid == relid && -			(!only_running || w->proc)) +			w->type == wtype && (!only_running || w->proc))  		{  			res = w;  			break; @@ -627,16 +632,20 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)  }  /* - * Stop the logical replication worker for subid/relid, if any. + * Stop the logical replication worker that matches the specified worker type, + * subscription id, and relation id.   */  void -logicalrep_worker_stop(Oid subid, Oid relid) +logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)  {  	LogicalRepWorker *worker; +	/* relid must be valid only for table sync workers */ +	Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid)); +  	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); -	worker = logicalrep_worker_find(subid, relid, false); +	worker = logicalrep_worker_find(wtype, subid, relid, false);  	if (worker)  	{ @@ -694,16 +703,20 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)  }  /* - * Wake up (using latch) any logical replication worker for specified sub/rel. + * Wake up (using latch) any logical replication worker that matches the + * specified worker type, subscription id, and relation id.   */  void -logicalrep_worker_wakeup(Oid subid, Oid relid) +logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)  {  	LogicalRepWorker *worker; +	/* relid must be valid only for table sync workers */ +	Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid)); +  	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); -	worker = logicalrep_worker_find(subid, relid, true); +	worker = logicalrep_worker_find(wtype, subid, relid, true);  	if (worker)  		logicalrep_worker_wakeup_ptr(worker); @@ -1260,7 +1273,8 @@ ApplyLauncherMain(Datum main_arg)  				continue;  			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); -			w = logicalrep_worker_find(sub->oid, InvalidOid, false); +			w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid, +									   false);  			if (w != NULL)  			{ diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c index e452a1e78d4..ae8c9385916 100644 --- a/src/backend/replication/logical/syncutils.c +++ b/src/backend/replication/logical/syncutils.c @@ -69,7 +69,8 @@ FinishSyncWorker(void)  	CommitTransactionCommand();  	/* Find the leader apply worker and signal it. */ -	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); +	logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid, +							 InvalidOid);  	/* Stop gracefully */  	proc_exit(0); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 40e1ed3c20e..58c98488d7b 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -160,7 +160,8 @@ wait_for_table_state_change(Oid relid, char expected_state)  		/* Check if the sync worker is still running and bail if not. */  		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); -		worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid, +		worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC, +										MyLogicalRepWorker->subid, relid,  										false);  		LWLockRelease(LogicalRepWorkerLock);  		if (!worker) @@ -207,8 +208,9 @@ wait_for_worker_state_change(char expected_state)  		 * waiting.  		 */  		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); -		worker = logicalrep_worker_find(MyLogicalRepWorker->subid, -										InvalidOid, false); +		worker = logicalrep_worker_find(WORKERTYPE_APPLY, +										MyLogicalRepWorker->subid, InvalidOid, +										false);  		if (worker && worker->proc)  			logicalrep_worker_wakeup_ptr(worker);  		LWLockRelease(LogicalRepWorkerLock); @@ -476,7 +478,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)  			 */  			LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); -			syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, +			syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC, +												MyLogicalRepWorker->subid,  												rstate->relid, false);  			if (syncworker) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5df5a4612b6..7edd1c9cf06 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1817,7 +1817,8 @@ apply_handle_stream_start(StringInfo s)  				 * Signal the leader apply worker, as it may be waiting for  				 * us.  				 */ -				logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); +				logicalrep_worker_wakeup(WORKERTYPE_APPLY, +										 MyLogicalRepWorker->subid, InvalidOid);  			}  			parallel_stream_nchanges = 0; @@ -3284,8 +3285,9 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,  		 * maybe_advance_nonremovable_xid() for details).  		 */  		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); -		leader = logicalrep_worker_find(MyLogicalRepWorker->subid, -										InvalidOid, false); +		leader = logicalrep_worker_find(WORKERTYPE_APPLY, +										MyLogicalRepWorker->subid, InvalidOid, +										false);  		if (!leader)  		{  			ereport(ERROR,  | 
