Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r--	src/backend/replication/logical/launcher.c	| 220
1 file changed, 185 insertions(+), 35 deletions(-)
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index a69e371c050..afb7acddaa6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -55,6 +55,7 @@
/* GUC variables */
int max_logical_replication_workers = 4;
int max_sync_workers_per_subscription = 2;
+int max_parallel_apply_workers_per_subscription = 2;
LogicalRepWorker *MyLogicalRepWorker = NULL;
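
For reference, a GUC like this also needs an entry in the server's configuration tables. A minimal sketch of such an entry for src/backend/utils/misc/guc_tables.c (illustrative only; the group, description, and bounds here are assumptions, not part of this diff):

	{
		{"max_parallel_apply_workers_per_subscription",
			PGC_SIGHUP,
			REPLICATION_SUBSCRIBERS,
			gettext_noop("Maximum number of parallel apply workers per subscription."),
			NULL,
		},
		&max_parallel_apply_workers_per_subscription,
		2, 0, MAX_PARALLEL_WORKER_LIMIT,	/* assumed default and bounds */
		NULL, NULL, NULL
	},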
@@ -74,6 +75,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
static void logicalrep_worker_onexit(int code, Datum arg);
static void logicalrep_worker_detach(void);
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static int logicalrep_pa_worker_count(Oid subid);
static bool on_commit_launcher_wakeup = false;
@@ -152,8 +154,10 @@ get_subscription_list(void)
*
* This is only needed for cleaning up the shared memory in case the worker
* fails to attach.
+ *
+ * Returns whether the attach was successful.
*/
-static void
+static bool
WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle)
@@ -169,11 +173,11 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- /* Worker either died or has started; no need to do anything. */
+ /* Worker either died or has started. Return false if died. */
if (!worker->in_use || worker->proc)
{
LWLockRelease(LogicalRepWorkerLock);
- return;
+ return worker->in_use;
}
LWLockRelease(LogicalRepWorkerLock);
@@ -188,7 +192,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if (generation == worker->generation)
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
- return;
+ return false;
}
/*
@@ -210,6 +214,8 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/*
* Walks the workers array and searches for one that matches given
* subscription id and relid.
+ *
+ * We are only interested in the leader apply worker or table sync worker.
*/
LogicalRepWorker *
logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
@@ -224,6 +230,10 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+ /* Skip parallel apply workers. */
+ if (isParallelApplyWorker(w))
+ continue;
+
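
isParallelApplyWorker() is not defined in this file. Given that logicalrep_worker_launch() below sets apply_leader_pid only for parallel apply workers, a plausible sketch of the macro (an assumption about worker_internal.h, which this diff does not show):

	/* True if the slot is in use by a parallel apply worker. */
	#define isParallelApplyWorker(worker) \
		((worker)->apply_leader_pid != InvalidPid)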
if (w->in_use && w->subid == subid && w->relid == relid &&
(!only_running || w->proc))
{
@@ -260,11 +270,13 @@ logicalrep_workers_find(Oid subid, bool only_running)
}
/*
- * Start new apply background worker, if possible.
+ * Start new logical replication background worker, if possible.
+ *
+ * Returns true on success, false on failure.
*/
-void
+bool
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
- Oid relid)
+ Oid relid, dsm_handle subworker_dsm)
{
BackgroundWorker bgw;
BackgroundWorkerHandle *bgw_handle;
@@ -273,7 +285,12 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int slot = 0;
LogicalRepWorker *worker = NULL;
int nsyncworkers;
+ int nparallelapplyworkers;
TimestampTz now;
+ bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
+
+ /* Sanity check - tablesync worker cannot be a subworker */
+ Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -351,7 +368,20 @@ retry:
if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
- return;
+ return false;
+ }
+
+ nparallelapplyworkers = logicalrep_pa_worker_count(subid);
+
+ /*
+ * Return false if the number of parallel apply workers reached the limit
+ * per subscription.
+ */
+ if (is_parallel_apply_worker &&
+ nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+ return false;
}
/*
@@ -365,7 +395,7 @@ retry:
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of logical replication worker slots"),
errhint("You might need to increase max_logical_replication_workers.")));
- return;
+ return false;
}
/* Prepare the worker slot. */
@@ -380,6 +410,8 @@ retry:
worker->relstate = SUBREL_STATE_UNKNOWN;
worker->relstate_lsn = InvalidXLogRecPtr;
worker->stream_fileset = NULL;
+ worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
+ worker->parallel_apply = is_parallel_apply_worker;
worker->last_lsn = InvalidXLogRecPtr;
TIMESTAMP_NOBEGIN(worker->last_send_time);
TIMESTAMP_NOBEGIN(worker->last_recv_time);
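
The two new assignments imply matching fields in the LogicalRepWorker struct; a sketch of the shape of those additions in worker_internal.h (paraphrased from their usage here, not shown in this diff):

	/*
	 * PID of the leader apply worker if this slot is used by a parallel
	 * apply worker, InvalidPid otherwise.
	 */
	pid_t		apply_leader_pid;

	/* Indicates whether the worker was launched as a parallel apply worker. */
	bool		parallel_apply;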
@@ -397,19 +429,34 @@ retry:
BGWORKER_BACKEND_DATABASE_CONNECTION;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
- snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+
+ if (is_parallel_apply_worker)
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ParallelApplyWorkerMain");
+ else
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+
if (OidIsValid(relid))
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u sync %u", subid, relid);
+ else if (is_parallel_apply_worker)
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication parallel apply worker for subscription %u", subid);
else
snprintf(bgw.bgw_name, BGW_MAXLEN,
- "logical replication worker for subscription %u", subid);
- snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
+ "logical replication apply worker for subscription %u", subid);
+
+ if (is_parallel_apply_worker)
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
+ else
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");
bgw.bgw_restart_time = BGW_NEVER_RESTART;
bgw.bgw_notify_pid = MyProcPid;
bgw.bgw_main_arg = Int32GetDatum(slot);
+ if (is_parallel_apply_worker)
+ memcpy(bgw.bgw_extra, &subworker_dsm, sizeof(dsm_handle));
+
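Since bgw_main_arg already carries the slot number, the DSM handle travels in bgw_extra. The worker side (ParallelApplyWorkerMain) would recover it with the mirror-image memcpy(); a hedged sketch, with illustrative variable names:

	dsm_handle	handle;
	dsm_segment *seg;

	/* Recover the DSM handle the launcher stashed in bgw_extra. */
	memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));

	seg = dsm_attach(handle);
	if (seg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));
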
if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
{
/* Failed to start worker, so clean up the worker slot. */
@@ -422,33 +469,23 @@ retry:
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("out of background worker slots"),
errhint("You might need to increase max_worker_processes.")));
- return;
+ return false;
}
/* Now wait until it attaches. */
- WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
+ return WaitForReplicationWorkerAttach(worker, generation, bgw_handle);
}
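
With the bool return in place, a caller launching a parallel apply worker can treat failure as non-fatal and let the leader apply the transaction itself. A hypothetical caller sketch (winfo and its dsm_seg field are assumptions; the real call site would live in the parallel apply code):

	launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
										MySubscription->oid,
										MySubscription->name,
										MyLogicalRepWorker->userid,
										InvalidOid, /* not a tablesync worker */
										dsm_segment_handle(winfo->dsm_seg));
	if (!launched)
	{
		/* No free slot or the per-subscription limit was reached. */
		return NULL;
	}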
/*
- * Stop the logical replication worker for subid/relid, if any, and wait until
- * it detaches from the slot.
+ * Internal function to stop the worker and wait until it detaches from the
+ * slot.
*/
-void
-logicalrep_worker_stop(Oid subid, Oid relid)
+static void
+logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
{
- LogicalRepWorker *worker;
uint16 generation;
- LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-
- worker = logicalrep_worker_find(subid, relid, false);
-
- /* No worker, nothing to do. */
- if (!worker)
- {
- LWLockRelease(LogicalRepWorkerLock);
- return;
- }
+ Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED));
/*
* Remember which generation was our worker so we can check if what we see
@@ -486,10 +523,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
* different, meaning that a different worker has taken the slot.
*/
if (!worker->in_use || worker->generation != generation)
- {
- LWLockRelease(LogicalRepWorkerLock);
return;
- }
/* Worker has assigned proc, so it has started. */
if (worker->proc)
@@ -497,7 +531,7 @@ logicalrep_worker_stop(Oid subid, Oid relid)
}
/* Now terminate the worker ... */
- kill(worker->proc->pid, SIGTERM);
+ kill(worker->proc->pid, signo);
/* ... and wait for it to die. */
for (;;)
@@ -523,6 +557,53 @@ logicalrep_worker_stop(Oid subid, Oid relid)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
}
+}
+
+/*
+ * Stop the logical replication worker for subid/relid, if any.
+ */
+void
+logicalrep_worker_stop(Oid subid, Oid relid)
+{
+ LogicalRepWorker *worker;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ worker = logicalrep_worker_find(subid, relid, false);
+
+ if (worker)
+ {
+ Assert(!isParallelApplyWorker(worker));
+ logicalrep_worker_stop_internal(worker, SIGTERM);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+}
+
+/*
+ * Stop the logical replication parallel apply worker corresponding to the
+ * input slot number.
+ *
+ * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
+ * worker so that the worker exits cleanly.
+ */
+void
+logicalrep_pa_worker_stop(int slot_no, uint16 generation)
+{
+ LogicalRepWorker *worker;
+
+ Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ worker = &LogicalRepCtx->workers[slot_no];
+ Assert(isParallelApplyWorker(worker));
+
+ /*
+ * Only stop the worker if the generation matches and the worker is alive.
+ */
+ if (worker->generation == generation && worker->proc)
+ logicalrep_worker_stop_internal(worker, SIGINT);
LWLockRelease(LogicalRepWorkerLock);
}
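
A hypothetical call from the leader's side, assuming the parallel apply worker publishes its slot number and generation through its shared memory area (field names are illustrative):

	/* Ask the pooled parallel apply worker to exit cleanly. */
	logicalrep_pa_worker_stop(winfo->shared->logicalrep_worker_slot_no,
							  winfo->shared->logicalrep_worker_generation);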
@@ -595,11 +676,40 @@ logicalrep_worker_attach(int slot)
}
/*
- * Detach the worker (cleans up the worker info).
+ * Stop the parallel apply workers if any, and detach the leader apply worker
+ * (cleans up the worker info).
*/
static void
logicalrep_worker_detach(void)
{
+ /* Stop the parallel apply workers. */
+ if (am_leader_apply_worker())
+ {
+ List *workers;
+ ListCell *lc;
+
+ /*
+ * Detach from the error_mq_handle for all parallel apply workers
+ * before terminating them. This prevents the leader apply worker from
+ * receiving the worker termination message and sending it to logs
+ * when the same is already done by the parallel worker.
+ */
+ pa_detach_all_error_mq();
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+
+ workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
+ foreach(lc, workers)
+ {
+ LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+
+ if (isParallelApplyWorker(w))
+ logicalrep_worker_stop_internal(w, SIGTERM);
+ }
+
+ LWLockRelease(LogicalRepWorkerLock);
+ }
+
/* Block concurrent access. */
LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
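
pa_detach_all_error_mq(), called above, is implemented in the parallel apply code rather than in this file. A hedged sketch of what it presumably does, assuming a leader-side pool list and a per-worker error queue handle (both assumptions):

	void
	pa_detach_all_error_mq(void)
	{
		ListCell   *lc;

		foreach(lc, ParallelApplyWorkerPool)
		{
			ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);

			/* Stop consuming this worker's error queue before killing it. */
			if (winfo->error_mq_handle)
			{
				shm_mq_detach(winfo->error_mq_handle);
				winfo->error_mq_handle = NULL;
			}
		}
	}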
@@ -622,6 +732,8 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
worker->userid = InvalidOid;
worker->subid = InvalidOid;
worker->relid = InvalidOid;
+ worker->apply_leader_pid = InvalidPid;
+ worker->parallel_apply = false;
}
/*
@@ -653,6 +765,13 @@ logicalrep_worker_onexit(int code, Datum arg)
if (MyLogicalRepWorker->stream_fileset != NULL)
FileSetDeleteAll(MyLogicalRepWorker->stream_fileset);
+ /*
+ * Session level locks may be acquired outside of a transaction in
+ * parallel apply mode and will not be released when the worker
+ * terminates, so manually release all locks before the worker exits.
+ */
+ LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+
ApplyLauncherWakeup();
}
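
As an example of the kind of lock this is defending against: a session-level lock taken outside any transaction survives transaction end and would otherwise leak at worker exit. A sketch using the generic lmgr API (the actual lock tags used by parallel apply are not shown in this diff):

	/*
	 * Session-level lock: not released at transaction end, so it must be
	 * released explicitly, or by the LockReleaseAll() call above at exit.
	 */
	LockSharedObjectForSession(SubscriptionRelationId,
							   MyLogicalRepWorker->subid, 0,
							   AccessShareLock);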
@@ -681,6 +800,33 @@ logicalrep_sync_worker_count(Oid subid)
}
/*
+ * Count the number of registered (but not necessarily running) parallel apply
+ * workers for a subscription.
+ */
+static int
+logicalrep_pa_worker_count(Oid subid)
+{
+ int i;
+ int res = 0;
+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+
+ /*
+ * Scan all attached parallel apply workers, only counting those which
+ * have the given subscription id.
+ */
+ for (i = 0; i < max_logical_replication_workers; i++)
+ {
+ LogicalRepWorker *w = &LogicalRepCtx->workers[i];
+
+ if (w->subid == subid && isParallelApplyWorker(w))
+ res++;
+ }
+
+ return res;
+}
+
+/*
* ApplyLauncherShmemSize
* Compute space needed for replication launcher shared memory
*/
@@ -869,7 +1015,7 @@ ApplyLauncherMain(Datum main_arg)
wait_time = wal_retrieve_retry_interval;
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid);
+ sub->owner, InvalidOid, DSM_HANDLE_INVALID);
}
}
@@ -952,6 +1098,10 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
if (OidIsValid(subid) && worker.subid != subid)
continue;
+ /* Skip if this is a parallel apply worker */
+ if (isParallelApplyWorker(&worker))
+ continue;
+
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);