author | Tom Lane <tgl@sss.pgh.pa.us> | 2017-06-30 14:57:06 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2017-06-30 14:57:14 -0400 |
commit | 1f201a818a5910a37530cc929bd345688f827942 (patch) | |
tree | 8aec05ddd9c9ebf8d2815daf9e496cba7403c349 /src/backend/replication/logical/launcher.c | |
parent | 1db49c3b6d2399f8f83a97f1fa34e749b9fada7c (diff) |
Fix race conditions and missed wakeups in syncrep worker signaling.

When a sync worker is waiting for the associated apply worker to notice
that it's in SYNCWAIT state, wait_for_worker_state_change() would just
patiently wait for that to happen. This generally required waiting for
the 1-second timeout in LogicalRepApplyLoop to elapse. Kicking the worker
via its latch makes things significantly snappier.

While at it, fix race conditions that could potentially result in crashes:
we can *not* call logicalrep_worker_wakeup_ptr() once we've released the
LogicalRepWorkerLock, because worker->proc might've been reset to NULL
after we do that (indeed, there's no really solid reason to believe that
the LogicalRepWorker slot even belongs to the same worker anymore).
In logicalrep_worker_wakeup(), we can just move the wakeup inside the
lock scope. In process_syncing_tables_for_apply(), a bit more code
rearrangement is needed.

Also improve some nearby comments.

Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 12 |
1 files changed, 9 insertions, 3 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 86a2b14807f..961110c94be 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -515,7 +515,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
- * Wake up (using latch) the logical replication worker.
+ * Wake up (using latch) any logical replication worker for specified sub/rel.
  */
 void
 logicalrep_worker_wakeup(Oid subid, Oid relid)
@@ -523,19 +523,25 @@ logicalrep_worker_wakeup(Oid subid, Oid relid)
 	LogicalRepWorker *worker;
 
 	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
 	worker = logicalrep_worker_find(subid, relid, true);
-	LWLockRelease(LogicalRepWorkerLock);
 
 	if (worker)
 		logicalrep_worker_wakeup_ptr(worker);
+
+	LWLockRelease(LogicalRepWorkerLock);
 }
 
 /*
- * Wake up (using latch) the logical replication worker.
+ * Wake up (using latch) the specified logical replication worker.
+ *
+ * Caller must hold lock, else worker->proc could change under us.
  */
 void
 logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
 {
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
 	SetLatch(&worker->proc->procLatch);
 }
 
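
The locking rule this commit enforces (look up the worker and signal it inside the same lock scope) can be sketched outside PostgreSQL as follows. This is only an illustrative model, not PostgreSQL code: it uses a plain pthread mutex in place of LogicalRepWorkerLock (which the real code takes in LW_SHARED mode), an integer flag in place of a latch, and a thread-local boolean in place of LWLockHeldByMe(). All `Demo*`/`demo_*` names are hypothetical, and the assumption that the detach path clears `proc` under the same lock is part of the model.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for PGPROC and LogicalRepWorker. */
typedef struct DemoProc { int latch_set; } DemoProc;
typedef struct DemoWorker { DemoProc *proc; } DemoWorker;

static pthread_mutex_t worker_lock = PTHREAD_MUTEX_INITIALIZER;
static _Thread_local bool lock_held_by_me = false; /* models LWLockHeldByMe() */
static DemoWorker slot;                            /* single shared worker slot */

static void demo_lock(void)
{
    pthread_mutex_lock(&worker_lock);
    lock_held_by_me = true;
}

static void demo_unlock(void)
{
    lock_held_by_me = false;
    pthread_mutex_unlock(&worker_lock);
}

/* Caller must hold worker_lock, else worker->proc could change under us. */
static void demo_wakeup_ptr(DemoWorker *worker)
{
    assert(lock_held_by_me);
    worker->proc->latch_set = 1;    /* models SetLatch() */
}

/* Find the worker and wake it, all inside one lock scope.
 * Returns true if a worker was found and signaled. */
static bool demo_wakeup(void)
{
    bool woke = false;

    demo_lock();
    DemoWorker *worker = (slot.proc != NULL) ? &slot : NULL;  /* models the lookup */
    if (worker)
    {
        demo_wakeup_ptr(worker);
        woke = true;
    }
    demo_unlock();  /* released only after the wakeup, as in the fixed code */
    return woke;
}

/* Attach/detach modify slot.proc under the same lock (model assumption). */
static void demo_worker_attach(DemoProc *proc)
{
    demo_lock();
    slot.proc = proc;
    demo_unlock();
}

static void demo_worker_detach(void)
{
    demo_lock();
    slot.proc = NULL;
    demo_unlock();
}
```

In this model, releasing the mutex before calling `demo_wakeup_ptr()` would both trip the assertion and reopen the window in which `demo_worker_detach()` could set `slot.proc` to NULL between the lookup and the dereference, which is the crash scenario the commit message describes.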