From 7e174fa793a2df89fe03d002a5087ef67abcdde8 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Fri, 4 Aug 2017 21:14:35 -0400 Subject: Only kill sync workers at commit time in subscription DDL This allows a transaction abort to avoid killing those workers. Author: Petr Jelinek --- src/backend/replication/logical/launcher.c | 83 +++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 2 deletions(-) (limited to 'src/backend/replication/logical/launcher.c') diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index d165d518e1b..0f9e5755b9e 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -73,6 +73,14 @@ typedef struct LogicalRepCtxStruct LogicalRepCtxStruct *LogicalRepCtx; +typedef struct LogicalRepWorkerId +{ + Oid subid; + Oid relid; +} LogicalRepWorkerId; + +static List *on_commit_stop_workers = NIL; + static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); @@ -249,6 +257,30 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) return res; } +/* + * Similar to logicalrep_worker_find(), but returns list of all workers for + * the subscription, instead just one. + */ +List * +logicalrep_workers_find(Oid subid, bool only_running) +{ + int i; + List *res = NIL; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (w->in_use && w->subid == subid && (!only_running || w->proc)) + res = lappend(res, w); + } + + return res; +} + /* * Start new apply background worker. */ @@ -513,6 +545,27 @@ logicalrep_worker_stop(Oid subid, Oid relid) LWLockRelease(LogicalRepWorkerLock); } +/* + * Request worker for specified sub/rel to be stopped on commit. + */ +void +logicalrep_worker_stop_at_commit(Oid subid, Oid relid) +{ + LogicalRepWorkerId *wid; + MemoryContext oldctx; + + /* Make sure we store the info in context that survives until commit. */ + oldctx = MemoryContextSwitchTo(TopTransactionContext); + + wid = palloc(sizeof(LogicalRepWorkerId)); + wid->subid = subid; + wid->relid = relid; + + on_commit_stop_workers = lappend(on_commit_stop_workers, wid); + + MemoryContextSwitchTo(oldctx); +} + /* * Wake up (using latch) any logical replication worker for specified sub/rel. */ @@ -753,15 +806,41 @@ ApplyLauncherShmemInit(void) } } +/* + * Check whether current transaction has manipulated logical replication + * workers. + */ +bool +XactManipulatesLogicalReplicationWorkers(void) +{ + return (on_commit_stop_workers != NIL); +} + /* * Wakeup the launcher on commit if requested. */ void AtEOXact_ApplyLauncher(bool isCommit) { - if (isCommit && on_commit_launcher_wakeup) - ApplyLauncherWakeup(); + if (isCommit) + { + ListCell *lc; + foreach (lc, on_commit_stop_workers) + { + LogicalRepWorkerId *wid = lfirst(lc); + logicalrep_worker_stop(wid->subid, wid->relid); + } + + if (on_commit_launcher_wakeup) + ApplyLauncherWakeup(); + } + + /* + * No need to pfree on_commit_stop_workers. It was allocated in + * transaction memory context, which is going to be cleaned soon. + */ + on_commit_stop_workers = NIL; on_commit_launcher_wakeup = false; } -- cgit v1.2.3