author     David Rowley <drowley@postgresql.org>  2020-06-19 17:24:27 +1200
committer  David Rowley <drowley@postgresql.org>  2020-06-19 17:24:27 +1200
commit     9bdb300dedf086cc54edf740088208e6b24307ef
tree       62d1699e8bd93fcd6ce59b16d5fe4bfa6c09c4d7 /src/backend/executor
parent     f219167910ad33dfd8f1b0bba15323d71a91c4e9
Fix EXPLAIN ANALYZE for parallel HashAgg plans
Since 1f39bce02, HashAgg nodes have had the ability to spill to disk when memory consumption exceeds work_mem. That commit added new properties to EXPLAIN ANALYZE to show the maximum memory usage and disk usage, however, it didn't quite go as far as showing that information for parallel workers. Since workers may have experienced something very different from the main process, we should show this information per worker, as is done in Sort.

Reviewed-by: Justin Pryzby
Reviewed-by: Jeff Davis
Discussion: https://postgr.es/m/CAApHDvpEKbfZa18mM1TD7qV6PG+w97pwCWq5tVD0dX7e11gRJw@mail.gmail.com
Backpatch-through: 13, where the hashagg spilling code was added.
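The hunks below read and write fields of AggregateInstrumentation and SharedAggInfo, whose definitions live in src/include/executor/nodeAgg.h and therefore fall outside this diffstat (limited to src/backend/executor). As a rough sketch inferred from the fields used in this diff, not the verbatim header text, the shared state looks like this:

typedef struct AggregateInstrumentation
{
	Size		hash_mem_peak;		/* peak hash table memory usage */
	uint64		hash_disk_used;		/* kB of disk space used */
	int			hash_batches_used;	/* batches used during execution */
} AggregateInstrumentation;

typedef struct SharedAggInfo
{
	int			num_workers;	/* filled in by ExecAggInitializeDSM() */
	AggregateInstrumentation sinstrument[FLEXIBLE_ARRAY_MEMBER];
} SharedAggInfo;

The flexible array member is why ExecAggEstimate() and ExecAggInitializeDSM() below size the shared chunk as offsetof(SharedAggInfo, sinstrument) plus nworkers * sizeof(AggregateInstrumentation).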
Diffstat (limited to 'src/backend/executor')
-rw-r--r--  src/backend/executor/execParallel.c   19
-rw-r--r--  src/backend/executor/nodeAgg.c       103
2 files changed, 119 insertions, 3 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 41cb41481df..382e78fb7fe 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -25,6 +25,7 @@
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeAgg.h"
#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
@@ -288,7 +289,10 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortEstimate((IncrementalSortState *) planstate, e->pcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggEstimate((AggState *) planstate, e->pcxt);
+ break;
default:
break;
}
@@ -505,7 +509,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
/* even when not parallel-aware, for EXPLAIN ANALYZE */
ExecIncrementalSortInitializeDSM((IncrementalSortState *) planstate, d->pcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggInitializeDSM((AggState *) planstate, d->pcxt);
+ break;
default:
break;
}
@@ -1048,6 +1055,9 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
case T_HashState:
ExecHashRetrieveInstrumentation((HashState *) planstate);
break;
+ case T_AggState:
+ ExecAggRetrieveInstrumentation((AggState *) planstate);
+ break;
default:
break;
}
@@ -1336,7 +1346,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecIncrementalSortInitializeWorker((IncrementalSortState *) planstate,
pwcxt);
break;
-
+ case T_AggState:
+ /* even when not parallel-aware, for EXPLAIN ANALYZE */
+ ExecAggInitializeWorker((AggState *) planstate, pwcxt);
+ break;
default:
break;
}
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 331acee2814..a20554ae65a 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -240,6 +240,7 @@
#include "postgres.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_aggregate.h"
#include "catalog/pg_proc.h"
@@ -4483,6 +4484,22 @@ ExecEndAgg(AggState *node)
int numGroupingSets = Max(node->maxsets, 1);
int setno;
+ /*
+ * When ending a parallel worker, copy the statistics gathered by the
+ * worker back into shared memory so that it can be picked up by the main
+ * process to report in EXPLAIN ANALYZE.
+ */
+ if (node->shared_info && IsParallelWorker())
+ {
+ AggregateInstrumentation *si;
+
+ Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
+ si = &node->shared_info->sinstrument[ParallelWorkerNumber];
+ si->hash_batches_used = node->hash_batches_used;
+ si->hash_disk_used = node->hash_disk_used;
+ si->hash_mem_peak = node->hash_mem_peak;
+ }
+
/* Make sure we have closed any open tuplesorts */
if (node->sort_in)
@@ -4854,3 +4871,89 @@ aggregate_dummy(PG_FUNCTION_ARGS)
fcinfo->flinfo->fn_oid);
return (Datum) 0; /* keep compiler quiet */
}
+
+/* ----------------------------------------------------------------
+ * Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+ /* ----------------------------------------------------------------
+ * ExecAggEstimate
+ *
+ * Estimate space required to propagate aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggEstimate(AggState *node, ParallelContext *pcxt)
+{
+ Size size;
+
+ /* don't need this if not instrumenting or no workers */
+ if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+ return;
+
+ size = mul_size(pcxt->nworkers, sizeof(AggregateInstrumentation));
+ size = add_size(size, offsetof(SharedAggInfo, sinstrument));
+ shm_toc_estimate_chunk(&pcxt->estimator, size);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAggInitializeDSM
+ *
+ * Initialize DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt)
+{
+ Size size;
+
+ /* don't need this if not instrumenting or no workers */
+ if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+ return;
+
+ size = offsetof(SharedAggInfo, sinstrument)
+ + pcxt->nworkers * sizeof(AggregateInstrumentation);
+ node->shared_info = shm_toc_allocate(pcxt->toc, size);
+ /* ensure any unfilled slots will contain zeroes */
+ memset(node->shared_info, 0, size);
+ node->shared_info->num_workers = pcxt->nworkers;
+ shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+ node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAggInitializeWorker
+ *
+ * Attach worker to DSM space for aggregate statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt)
+{
+ node->shared_info =
+ shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAggRetrieveInstrumentation
+ *
+ * Transfer aggregate statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAggRetrieveInstrumentation(AggState *node)
+{
+ Size size;
+ SharedAggInfo *si;
+
+ if (node->shared_info == NULL)
+ return;
+
+ size = offsetof(SharedAggInfo, sinstrument)
+ + node->shared_info->num_workers * sizeof(AggregateInstrumentation);
+ si = palloc(size);
+ memcpy(si, node->shared_info, size);
+ node->shared_info = si;
+}
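
Not part of this commit's executor changes, but for context: once ExecAggRetrieveInstrumentation() has copied the DSM contents into leader-local memory, the EXPLAIN code (in src/backend/commands/explain.c) can walk the per-worker array. A minimal sketch of that consumer side follows; report_worker_hashagg() is a hypothetical placeholder for the real output routine, not an actual explain.c function:

/* Hypothetical reporting helper, standing in for explain.c's output calls */
extern void report_worker_hashagg(int workernum, int batches,
								  Size mem_peak, uint64 disk_used);

static void
show_hashagg_worker_stats(AggState *aggstate)
{
	SharedAggInfo *shared = aggstate->shared_info;
	int			n;

	/* nothing to show unless instrumentation was retrieved from DSM */
	if (shared == NULL)
		return;

	for (n = 0; n < shared->num_workers; n++)
	{
		AggregateInstrumentation *sinstrument = &shared->sinstrument[n];

		/* slots for workers that never ran stay zeroed (see the memset in
		 * ExecAggInitializeDSM above), so skip empty entries */
		if (sinstrument->hash_mem_peak == 0 && sinstrument->hash_disk_used == 0)
			continue;

		report_worker_hashagg(n,
							  sinstrument->hash_batches_used,
							  sinstrument->hash_mem_peak,
							  sinstrument->hash_disk_used);
	}
}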