diff options
author | Amit Kapila <akapila@postgresql.org> | 2023-01-18 09:03:12 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2023-01-18 09:03:12 +0530 |
commit | d540a02a724b9643205abce8c5644a0f0908f6e3 (patch) | |
tree | bb5103b31bfcbfb162bb1544c3e4766d66d13780 /src/backend/replication/logical/launcher.c | |
parent | 14bdb3f13de16523609d838b725540af5e23ddd3 (diff) |
Display the leader apply worker's PID for parallel apply workers.
Add leader_pid to pg_stat_subscription. leader_pid is the process ID of
the leader apply worker if this process is a parallel apply worker. If
this field is NULL, it indicates that the process is a leader apply
worker or a synchronization worker. The new column makes it easier to
distinguish parallel apply workers from other kinds of workers and helps
to identify the leader for the parallel workers corresponding to a
particular subscription.
Additionally, update the leader_pid column in pg_stat_activity as well to
display the PID of the leader apply worker for parallel apply workers.
Author: Hou Zhijie
Reviewed-by: Peter Smith, Sawada Masahiko, Amit Kapila, Shveta Mallik
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/launcher.c')
-rw-r--r-- | src/backend/replication/logical/launcher.c | 64 |
1 files changed, 47 insertions, 17 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index afb7acddaa6..27e58566cec 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -410,7 +410,7 @@ retry: worker->relstate = SUBREL_STATE_UNKNOWN; worker->relstate_lsn = InvalidXLogRecPtr; worker->stream_fileset = NULL; - worker->apply_leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; + worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); @@ -732,7 +732,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) worker->userid = InvalidOid; worker->subid = InvalidOid; worker->relid = InvalidOid; - worker->apply_leader_pid = InvalidPid; + worker->leader_pid = InvalidPid; worker->parallel_apply = false; } @@ -1067,12 +1067,40 @@ IsLogicalLauncher(void) } /* + * Return the pid of the leader apply worker if the given pid is the pid of a + * parallel apply worker, otherwise, return InvalidPid. + */ +pid_t +GetLeaderApplyWorkerPid(pid_t pid) +{ + int leader_pid = InvalidPid; + int i; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + + if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid) + { + leader_pid = w->leader_pid; + break; + } + } + + LWLockRelease(LogicalRepWorkerLock); + + return leader_pid; +} + +/* * Returns state of the subscriptions. */ Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 8 +#define PG_STAT_GET_SUBSCRIPTION_COLS 9 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -1098,10 +1126,6 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) if (OidIsValid(subid) && worker.subid != subid) continue; - /* Skip if this is a parallel apply worker */ - if (isParallelApplyWorker(&worker)) - continue; - worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); @@ -1110,26 +1134,32 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) else nulls[1] = true; values[2] = Int32GetDatum(worker_pid); - if (XLogRecPtrIsInvalid(worker.last_lsn)) + + if (isParallelApplyWorker(&worker)) + values[3] = Int32GetDatum(worker.leader_pid); + else nulls[3] = true; + + if (XLogRecPtrIsInvalid(worker.last_lsn)) + nulls[4] = true; else - values[3] = LSNGetDatum(worker.last_lsn); + values[4] = LSNGetDatum(worker.last_lsn); if (worker.last_send_time == 0) - nulls[4] = true; + nulls[5] = true; else - values[4] = TimestampTzGetDatum(worker.last_send_time); + values[5] = TimestampTzGetDatum(worker.last_send_time); if (worker.last_recv_time == 0) - nulls[5] = true; + nulls[6] = true; else - values[5] = TimestampTzGetDatum(worker.last_recv_time); + values[6] = TimestampTzGetDatum(worker.last_recv_time); if (XLogRecPtrIsInvalid(worker.reply_lsn)) - nulls[6] = true; + nulls[7] = true; else - values[6] = LSNGetDatum(worker.reply_lsn); + values[7] = LSNGetDatum(worker.reply_lsn); if (worker.reply_time == 0) - nulls[7] = true; + nulls[8] = true; else - values[7] = TimestampTzGetDatum(worker.reply_time); + values[8] = TimestampTzGetDatum(worker.reply_time); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); |