Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r--  src/backend/executor/nodeGather.c  138
1 file changed, 108 insertions, 30 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 5f589614dc2..850c67e022a 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -36,11 +36,13 @@
#include "executor/nodeGather.h"
#include "executor/nodeSubplan.h"
#include "executor/tqueue.h"
+#include "miscadmin.h"
#include "utils/memutils.h"
#include "utils/rel.h"
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static HeapTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);
@@ -125,6 +127,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
TupleTableSlot *
ExecGather(GatherState *node)
{
+ TupleTableSlot *fslot = node->funnel_slot;
int i;
TupleTableSlot *slot;
TupleTableSlot *resultSlot;
@@ -148,6 +151,7 @@ ExecGather(GatherState *node)
*/
if (gather->num_workers > 0 && IsInParallelMode())
{
+ ParallelContext *pcxt;
bool got_any_worker = false;
/* Initialize the workers required to execute Gather node. */
@@ -160,18 +164,26 @@ ExecGather(GatherState *node)
* Register backend workers. We might not get as many as we
* requested, or indeed any at all.
*/
- LaunchParallelWorkers(node->pei->pcxt);
+ pcxt = node->pei->pcxt;
+ LaunchParallelWorkers(pcxt);
- /* Set up a tuple queue to collect the results. */
- node->funnel = CreateTupleQueueFunnel();
- for (i = 0; i < node->pei->pcxt->nworkers; ++i)
+ /* Set up tuple queue readers to read the results. */
+ if (pcxt->nworkers > 0)
{
- if (node->pei->pcxt->worker[i].bgwhandle)
+ node->nreaders = 0;
+ node->reader =
+ palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
+
+ for (i = 0; i < pcxt->nworkers; ++i)
{
+ if (pcxt->worker[i].bgwhandle == NULL)
+ continue;
+
shm_mq_set_handle(node->pei->tqueue[i],
- node->pei->pcxt->worker[i].bgwhandle);
- RegisterTupleQueueOnFunnel(node->funnel,
- node->pei->tqueue[i]);
+ pcxt->worker[i].bgwhandle);
+ node->reader[node->nreaders++] =
+ CreateTupleQueueReader(node->pei->tqueue[i],
+ fslot->tts_tupleDescriptor);
got_any_worker = true;
}
}
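
A note on the reader setup just added: the array is allocated for pcxt->nworkers, the number of workers requested, but only workers whose bgwhandle is non-NULL get a reader, so nreaders may come out smaller and the array stays dense, with no holes to skip during reads. A minimal standalone sketch of that allocate-for-the-maximum, compact-the-survivors pattern follows; launch_and_make_reader() is a hypothetical stand-in for the LaunchParallelWorkers()/CreateTupleQueueReader() pair, not part of the real API.

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct Reader { int worker_id; } Reader;   /* toy stand-in type */

    /* Hypothetical launcher: pretend every third worker fails to start. */
    static Reader *
    launch_and_make_reader(int i)
    {
        if (i % 3 == 2)
            return NULL;                /* launch failure: no reader created */
        Reader *r = malloc(sizeof(Reader));
        r->worker_id = i;
        return r;
    }

    int
    main(void)
    {
        int      nworkers = 8;          /* workers requested */
        int      nreaders = 0;          /* readers actually created */
        Reader **reader = malloc(nworkers * sizeof(Reader *));

        /* Size for the maximum, but keep the array dense: skip failures. */
        for (int i = 0; i < nworkers; ++i)
        {
            Reader *r = launch_and_make_reader(i);
            if (r == NULL)
                continue;
            reader[nreaders++] = r;
        }
        printf("requested %d workers, created %d readers\n", nworkers, nreaders);
        return 0;
    }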
@@ -182,7 +194,7 @@ ExecGather(GatherState *node)
}
/* Run plan locally if no workers or not single-copy. */
- node->need_to_scan_locally = (node->funnel == NULL)
+ node->need_to_scan_locally = (node->reader == NULL)
|| !gather->single_copy;
node->initialized = true;
}
@@ -254,13 +266,9 @@ ExecEndGather(GatherState *node)
}
/*
- * gather_getnext
- *
- * Get the next tuple from shared memory queue. This function
- * is responsible for fetching tuples from all the queues associated
- * with worker backends used in Gather node execution and if there is
- * no data available from queues or no worker is available, it does
- * fetch the data from local node.
+ * Read the next tuple. We might fetch a tuple from one of the tuple queues
+ * using gather_readnext, or if no tuple queue contains a tuple and the
+ * single_copy flag is not set, we might generate one locally instead.
*/
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
@@ -270,18 +278,11 @@ gather_getnext(GatherState *gatherstate)
TupleTableSlot *fslot = gatherstate->funnel_slot;
HeapTuple tup;
- while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
+ while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally)
{
- if (gatherstate->funnel != NULL)
+ if (gatherstate->reader != NULL)
{
- bool done = false;
-
- /* wait only if local scan is done */
- tup = TupleQueueFunnelNext(gatherstate->funnel,
- gatherstate->need_to_scan_locally,
- &done);
- if (done)
- ExecShutdownGatherWorkers(gatherstate);
+ tup = gather_readnext(gatherstate);
if (HeapTupleIsValid(tup))
{
@@ -309,6 +310,80 @@ gather_getnext(GatherState *gatherstate)
return ExecClearTuple(fslot);
}
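
With gather_readnext() factored out, gather_getnext() (ending above) is a loop that alternates between the workers' tuple queues and, when need_to_scan_locally is set, the leader's own copy of the plan. The sketch below shows that shape in isolation. read_remote() and read_local() are hypothetical stand-ins for gather_readnext() and ExecProcNode(); unlike the real gather_readnext(), this toy remote source returns NULL only when exhausted, whereas the real one can also return NULL just to give the local scan a turn.

    #include <stdio.h>

    typedef int Tuple;                  /* toy stand-in for HeapTuple */

    /* Toy sources: each yields a few tuples, then sets *done and
     * returns NULL.  Stand-ins for gather_readnext()/ExecProcNode(). */
    static Tuple *
    read_remote(int *done)
    {
        static Tuple vals[] = {101, 102, 103};
        static int   pos = 0;
        if (pos < 3)
            return &vals[pos++];
        *done = 1;
        return NULL;
    }

    static Tuple *
    read_local(int *done)
    {
        static Tuple vals[] = {1, 2};
        static int   pos = 0;
        if (pos < 2)
            return &vals[pos++];
        *done = 1;
        return NULL;
    }

    /* The gather_getnext() shape: alternate sources until both are dry. */
    static Tuple *
    get_next(void)
    {
        static int remote_done = 0;
        static int local_done = 0;

        while (!remote_done || !local_done)
        {
            if (!remote_done)
            {
                Tuple *tup = read_remote(&remote_done);
                if (tup != NULL)
                    return tup;
            }
            if (!local_done)
            {
                Tuple *tup = read_local(&local_done);
                if (tup != NULL)
                    return tup;
            }
        }
        return NULL;                    /* no rows left anywhere */
    }

    int
    main(void)
    {
        Tuple *t;
        while ((t = get_next()) != NULL)
            printf("got %d\n", *t);
        return 0;
    }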
+/*
+ * Attempt to read a tuple from one of our parallel workers.
+ */
+static HeapTuple
+gather_readnext(GatherState *gatherstate)
+{
+ int waitpos = gatherstate->nextreader;
+
+ for (;;)
+ {
+ TupleQueueReader *reader;
+ HeapTuple tup;
+ bool readerdone;
+
+ /* Make sure we've read all messages from workers. */
+ HandleParallelMessages();
+
+ /* Attempt to read a tuple, but don't block if none is available. */
+ reader = gatherstate->reader[gatherstate->nextreader];
+ tup = TupleQueueReaderNext(reader, true, &readerdone);
+
+ /*
+ * If this reader is done, remove it. If all readers are done,
+ * clean up remaining worker state.
+ */
+ if (readerdone)
+ {
+ DestroyTupleQueueReader(reader);
+ --gatherstate->nreaders;
+ if (gatherstate->nreaders == 0)
+ {
+ ExecShutdownGather(gatherstate);
+ return NULL;
+ }
+ else
+ {
+ memmove(&gatherstate->reader[gatherstate->nextreader],
+ &gatherstate->reader[gatherstate->nextreader + 1],
+ sizeof(TupleQueueReader *)
+ * (gatherstate->nreaders - gatherstate->nextreader));
+ if (gatherstate->nextreader >= gatherstate->nreaders)
+ gatherstate->nextreader = 0;
+ if (gatherstate->nextreader < waitpos)
+ --waitpos;
+ }
+ continue;
+ }
+
+ /* Advance nextreader pointer in round-robin fashion. */
+ gatherstate->nextreader =
+ (gatherstate->nextreader + 1) % gatherstate->nreaders;
+
+ /* If we got a tuple, return it. */
+ if (tup)
+ return tup;
+
+ /* Have we visited every TupleQueueReader? */
+ if (gatherstate->nextreader == waitpos)
+ {
+ /*
+ * If (still) running plan locally, return NULL so caller can
+ * generate another tuple from the local copy of the plan.
+ */
+ if (gatherstate->need_to_scan_locally)
+ return NULL;
+
+ /* Nothing to do except wait for developments. */
+ WaitLatch(MyLatch, WL_LATCH_SET, 0);
+ CHECK_FOR_INTERRUPTS();
+ ResetLatch(MyLatch);
+ }
+ }
+}
+
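The delicate part of gather_readnext() above is the bookkeeping when a reader reports it is done: memmove() closes the gap so the surviving readers stay contiguous, nextreader wraps to 0 if it now points past the end of the shortened array, and waitpos, which remembers where the current polling sweep began, slides down by one if it sat beyond the removal point. A self-contained sketch of exactly that array maintenance, outside the executor:

    #include <stdio.h>
    #include <string.h>

    #define NREADERS_MAX 4

    int
    main(void)
    {
        /* Toy "readers": labels standing in for TupleQueueReader *. */
        const char *reader[NREADERS_MAX] = {"A", "B", "C", "D"};
        int nreaders = NREADERS_MAX;
        int nextreader = 1;             /* about to poll "B" */
        int waitpos = 3;                /* this sweep started at "D" */

        /* Suppose reader[nextreader] just reported done: remove it. */
        --nreaders;
        memmove(&reader[nextreader], &reader[nextreader + 1],
                sizeof(const char *) * (nreaders - nextreader));
        if (nextreader >= nreaders)     /* fell off the end: wrap around */
            nextreader = 0;
        if (nextreader < waitpos)       /* removal shifted waitpos down too */
            --waitpos;

        for (int i = 0; i < nreaders; ++i)
            printf("reader[%d] = %s\n", i, reader[i]);
        printf("nextreader = %d, waitpos = %d\n", nextreader, waitpos);
        return 0;
    }
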
/* ----------------------------------------------------------------
* ExecShutdownGatherWorkers
*
@@ -320,11 +395,14 @@ gather_getnext(GatherState *gatherstate)
void
ExecShutdownGatherWorkers(GatherState *node)
{
- /* Shut down tuple queue funnel before shutting down workers. */
- if (node->funnel != NULL)
+ /* Shut down tuple queue readers before shutting down workers. */
+ if (node->reader != NULL)
{
- DestroyTupleQueueFunnel(node->funnel);
- node->funnel = NULL;
+ int i;
+
+ for (i = 0; i < node->nreaders; ++i)
+ DestroyTupleQueueReader(node->reader[i]);
+ node->reader = NULL;
}
/* Now shut down the workers. */
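
The ordering here is deliberate: the readers are destroyed before the workers that feed them, the reverse of the setup order in ExecGather(). A toy sketch of that teardown order, with destroy_reader() and shutdown_workers() as hypothetical stand-ins for DestroyTupleQueueReader() and the worker shutdown that follows:

    #include <stdio.h>
    #include <stdlib.h>

    typedef struct Reader { int id; } Reader;   /* toy stand-in type */

    static void
    destroy_reader(Reader *r)
    {
        printf("destroying reader %d\n", r->id);
        free(r);
    }

    static void
    shutdown_workers(void)
    {
        printf("shutting down workers\n");
    }

    int
    main(void)
    {
        int      nreaders = 3;
        Reader **reader = malloc(nreaders * sizeof(Reader *));

        for (int i = 0; i < nreaders; ++i)
        {
            reader[i] = malloc(sizeof(Reader));
            reader[i]->id = i;
        }

        /* Stop consuming from the queues first... */
        for (int i = 0; i < nreaders; ++i)
            destroy_reader(reader[i]);
        free(reader);

        /* ...and only then take down the producers. */
        shutdown_workers();
        return 0;
    }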