diff options
Diffstat (limited to 'src/backend/executor/nodeGather.c')
-rw-r--r-- | src/backend/executor/nodeGather.c | 138 |
1 files changed, 108 insertions, 30 deletions
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index 5f589614dc2..850c67e022a 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -36,11 +36,13 @@ #include "executor/nodeGather.h" #include "executor/nodeSubplan.h" #include "executor/tqueue.h" +#include "miscadmin.h" #include "utils/memutils.h" #include "utils/rel.h" static TupleTableSlot *gather_getnext(GatherState *gatherstate); +static HeapTuple gather_readnext(GatherState *gatherstate); static void ExecShutdownGatherWorkers(GatherState *node); @@ -125,6 +127,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) TupleTableSlot * ExecGather(GatherState *node) { + TupleTableSlot *fslot = node->funnel_slot; int i; TupleTableSlot *slot; TupleTableSlot *resultSlot; @@ -148,6 +151,7 @@ ExecGather(GatherState *node) */ if (gather->num_workers > 0 && IsInParallelMode()) { + ParallelContext *pcxt; bool got_any_worker = false; /* Initialize the workers required to execute Gather node. */ @@ -160,18 +164,26 @@ ExecGather(GatherState *node) * Register backend workers. We might not get as many as we * requested, or indeed any at all. */ - LaunchParallelWorkers(node->pei->pcxt); + pcxt = node->pei->pcxt; + LaunchParallelWorkers(pcxt); - /* Set up a tuple queue to collect the results. */ - node->funnel = CreateTupleQueueFunnel(); - for (i = 0; i < node->pei->pcxt->nworkers; ++i) + /* Set up tuple queue readers to read the results. */ + if (pcxt->nworkers > 0) { - if (node->pei->pcxt->worker[i].bgwhandle) + node->nreaders = 0; + node->reader = + palloc(pcxt->nworkers * sizeof(TupleQueueReader *)); + + for (i = 0; i < pcxt->nworkers; ++i) { + if (pcxt->worker[i].bgwhandle == NULL) + continue; + shm_mq_set_handle(node->pei->tqueue[i], - node->pei->pcxt->worker[i].bgwhandle); - RegisterTupleQueueOnFunnel(node->funnel, - node->pei->tqueue[i]); + pcxt->worker[i].bgwhandle); + node->reader[node->nreaders++] = + CreateTupleQueueReader(node->pei->tqueue[i], + fslot->tts_tupleDescriptor); got_any_worker = true; } } @@ -182,7 +194,7 @@ ExecGather(GatherState *node) } /* Run plan locally if no workers or not single-copy. */ - node->need_to_scan_locally = (node->funnel == NULL) + node->need_to_scan_locally = (node->reader == NULL) || !gather->single_copy; node->initialized = true; } @@ -254,13 +266,9 @@ ExecEndGather(GatherState *node) } /* - * gather_getnext - * - * Get the next tuple from shared memory queue. This function - * is responsible for fetching tuples from all the queues associated - * with worker backends used in Gather node execution and if there is - * no data available from queues or no worker is available, it does - * fetch the data from local node. + * Read the next tuple. We might fetch a tuple from one of the tuple queues + * using gather_readnext, or if no tuple queue contains a tuple and the + * single_copy flag is not set, we might generate one locally instead. */ static TupleTableSlot * gather_getnext(GatherState *gatherstate) @@ -270,18 +278,11 @@ gather_getnext(GatherState *gatherstate) TupleTableSlot *fslot = gatherstate->funnel_slot; HeapTuple tup; - while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally) + while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) { - if (gatherstate->funnel != NULL) + if (gatherstate->reader != NULL) { - bool done = false; - - /* wait only if local scan is done */ - tup = TupleQueueFunnelNext(gatherstate->funnel, - gatherstate->need_to_scan_locally, - &done); - if (done) - ExecShutdownGatherWorkers(gatherstate); + tup = gather_readnext(gatherstate); if (HeapTupleIsValid(tup)) { @@ -309,6 +310,80 @@ gather_getnext(GatherState *gatherstate) return ExecClearTuple(fslot); } +/* + * Attempt to read a tuple from one of our parallel workers. + */ +static HeapTuple +gather_readnext(GatherState *gatherstate) +{ + int waitpos = gatherstate->nextreader; + + for (;;) + { + TupleQueueReader *reader; + HeapTuple tup; + bool readerdone; + + /* Make sure we've read all messages from workers. */ + HandleParallelMessages(); + + /* Attempt to read a tuple, but don't block if none is available. */ + reader = gatherstate->reader[gatherstate->nextreader]; + tup = TupleQueueReaderNext(reader, true, &readerdone); + + /* + * If this reader is done, remove it. If all readers are done, + * clean up remaining worker state. + */ + if (readerdone) + { + DestroyTupleQueueReader(reader); + --gatherstate->nreaders; + if (gatherstate->nreaders == 0) + { + ExecShutdownGather(gatherstate); + return NULL; + } + else + { + memmove(&gatherstate->reader[gatherstate->nextreader], + &gatherstate->reader[gatherstate->nextreader + 1], + sizeof(TupleQueueReader *) + * (gatherstate->nreaders - gatherstate->nextreader)); + if (gatherstate->nextreader >= gatherstate->nreaders) + gatherstate->nextreader = 0; + if (gatherstate->nextreader < waitpos) + --waitpos; + } + continue; + } + + /* Advance nextreader pointer in round-robin fashion. */ + gatherstate->nextreader = + (gatherstate->nextreader + 1) % gatherstate->nreaders; + + /* If we got a tuple, return it. */ + if (tup) + return tup; + + /* Have we visited every TupleQueueReader? */ + if (gatherstate->nextreader == waitpos) + { + /* + * If (still) running plan locally, return NULL so caller can + * generate another tuple from the local copy of the plan. + */ + if (gatherstate->need_to_scan_locally) + return NULL; + + /* Nothing to do except wait for developments. */ + WaitLatch(MyLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(MyLatch); + } + } +} + /* ---------------------------------------------------------------- * ExecShutdownGatherWorkers * @@ -320,11 +395,14 @@ gather_getnext(GatherState *gatherstate) void ExecShutdownGatherWorkers(GatherState *node) { - /* Shut down tuple queue funnel before shutting down workers. */ - if (node->funnel != NULL) + /* Shut down tuple queue readers before shutting down workers. */ + if (node->reader != NULL) { - DestroyTupleQueueFunnel(node->funnel); - node->funnel = NULL; + int i; + + for (i = 0; i < node->nreaders; ++i) + DestroyTupleQueueReader(node->reader[i]); + node->reader = NULL; } /* Now shut down the workers. */ |