Diffstat (limited to 'src/backend/executor/nodeHash.c')
-rw-r--r--  src/backend/executor/nodeHash.c | 1647
1 file changed, 1577 insertions, 70 deletions
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index afd7384e945..4284e8682a0 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -10,6 +10,8 @@
* IDENTIFICATION
* src/backend/executor/nodeHash.c
*
+ * See note on parallelism in nodeHashjoin.c.
+ *
*-------------------------------------------------------------------------
*/
/*
@@ -25,6 +27,7 @@
#include <limits.h>
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "catalog/pg_statistic.h"
#include "commands/tablespace.h"
#include "executor/execdebug.h"
@@ -32,6 +35,8 @@
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "port/atomics.h"
#include "utils/dynahash.h"
#include "utils/memutils.h"
#include "utils/lsyscache.h"
@@ -40,6 +45,8 @@
static void ExecHashIncreaseNumBatches(HashJoinTable hashtable);
static void ExecHashIncreaseNumBuckets(HashJoinTable hashtable);
+static void ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable);
+static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable);
static void ExecHashBuildSkewHash(HashJoinTable hashtable, Hash *node,
int mcvsToUse);
static void ExecHashSkewTableInsert(HashJoinTable hashtable,
@@ -49,6 +56,30 @@ static void ExecHashSkewTableInsert(HashJoinTable hashtable,
static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
static void *dense_alloc(HashJoinTable hashtable, Size size);
+static HashJoinTuple ExecParallelHashTupleAlloc(HashJoinTable hashtable,
+ size_t size,
+ dsa_pointer *shared);
+static void MultiExecPrivateHash(HashState *node);
+static void MultiExecParallelHash(HashState *node);
+static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable table,
+ int bucketno);
+static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table,
+ HashJoinTuple tuple);
+static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+ HashJoinTuple tuple,
+ dsa_pointer tuple_shared);
+static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch);
+static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable);
+static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable);
+static void ExecParallelHashRepartitionRest(HashJoinTable hashtable);
+static HashMemoryChunk ExecParallelHashPopChunkQueue(HashJoinTable table,
+ dsa_pointer *shared);
+static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
+ int batchno,
+ size_t size);
+static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
+static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+
/* ----------------------------------------------------------------
* ExecHash
@@ -73,6 +104,39 @@ ExecHash(PlanState *pstate)
Node *
MultiExecHash(HashState *node)
{
+ /* must provide our own instrumentation support */
+ if (node->ps.instrument)
+ InstrStartNode(node->ps.instrument);
+
+ if (node->parallel_state != NULL)
+ MultiExecParallelHash(node);
+ else
+ MultiExecPrivateHash(node);
+
+ /* must provide our own instrumentation support */
+ if (node->ps.instrument)
+ InstrStopNode(node->ps.instrument, node->hashtable->partialTuples);
+
+ /*
+ * We do not return the hash table directly because it's not a subtype of
+ * Node, and so would violate the MultiExecProcNode API. Instead, our
+ * parent Hashjoin node is expected to know how to fish it out of our node
+ * state. Ugly but not really worth cleaning up, since Hashjoin knows
+ * quite a bit more about Hash besides that.
+ */
+ return NULL;
+}
+
+/* ----------------------------------------------------------------
+ * MultiExecPrivateHash
+ *
+ * parallel-oblivious version, building a backend-private
+ * hash table and (if necessary) batch files.
+ * ----------------------------------------------------------------
+ */
+static void
+MultiExecPrivateHash(HashState *node)
+{
PlanState *outerNode;
List *hashkeys;
HashJoinTable hashtable;
@@ -80,10 +144,6 @@ MultiExecHash(HashState *node)
ExprContext *econtext;
uint32 hashvalue;
- /* must provide our own instrumentation support */
- if (node->ps.instrument)
- InstrStartNode(node->ps.instrument);
-
/*
* get state info from node
*/
@@ -138,18 +198,147 @@ MultiExecHash(HashState *node)
if (hashtable->spaceUsed > hashtable->spacePeak)
hashtable->spacePeak = hashtable->spaceUsed;
- /* must provide our own instrumentation support */
- if (node->ps.instrument)
- InstrStopNode(node->ps.instrument, hashtable->totalTuples);
+ hashtable->partialTuples = hashtable->totalTuples;
+}
+
+/* ----------------------------------------------------------------
+ * MultiExecParallelHash
+ *
+ * parallel-aware version, building a shared hash table and
+ * (if necessary) batch files using the combined effort of
+ * a set of co-operating backends.
+ * ----------------------------------------------------------------
+ */
+static void
+MultiExecParallelHash(HashState *node)
+{
+ ParallelHashJoinState *pstate;
+ PlanState *outerNode;
+ List *hashkeys;
+ HashJoinTable hashtable;
+ TupleTableSlot *slot;
+ ExprContext *econtext;
+ uint32 hashvalue;
+ Barrier *build_barrier;
+ int i;
/*
- * We do not return the hash table directly because it's not a subtype of
- * Node, and so would violate the MultiExecProcNode API. Instead, our
- * parent Hashjoin node is expected to know how to fish it out of our node
- * state. Ugly but not really worth cleaning up, since Hashjoin knows
- * quite a bit more about Hash besides that.
+ * get state info from node
*/
- return NULL;
+ outerNode = outerPlanState(node);
+ hashtable = node->hashtable;
+
+ /*
+ * set expression context
+ */
+ hashkeys = node->hashkeys;
+ econtext = node->ps.ps_ExprContext;
+
+ /*
+ * Synchronize the parallel hash table build. At this stage we know that
+ * the shared hash table has been or is being set up by
+ * ExecHashTableCreate(), but we don't know if our peers have returned
+ * from there or are here in MultiExecParallelHash(), and if so how far
+ * through they are. To find out, we check the build_barrier phase and
+ * then jump to the right step in the build algorithm.
+ */
+ pstate = hashtable->parallel_state;
+ build_barrier = &pstate->build_barrier;
+ Assert(BarrierPhase(build_barrier) >= PHJ_BUILD_ALLOCATING);
+ switch (BarrierPhase(build_barrier))
+ {
+ case PHJ_BUILD_ALLOCATING:
+
+ /*
+ * Either I just allocated the initial hash table in
+ * ExecHashTableCreate(), or someone else is doing that. Either
+ * way, wait for everyone to arrive here so we can proceed.
+ */
+ BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_BUILD_HASHING_INNER:
+
+ /*
+ * It's time to begin hashing, or if we just arrived here then
+ * hashing is already underway, so join in that effort. While
+ * hashing we have to be prepared to help increase the number of
+ * batches or buckets at any time, and if we arrived here when
+ * that was already underway we'll have to help complete that work
+ * immediately so that it's safe to access batches and buckets
+ * below.
+ */
+ if (PHJ_GROW_BATCHES_PHASE(BarrierAttach(&pstate->grow_batches_barrier)) !=
+ PHJ_GROW_BATCHES_ELECTING)
+ ExecParallelHashIncreaseNumBatches(hashtable);
+ if (PHJ_GROW_BUCKETS_PHASE(BarrierAttach(&pstate->grow_buckets_barrier)) !=
+ PHJ_GROW_BUCKETS_ELECTING)
+ ExecParallelHashIncreaseNumBuckets(hashtable);
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+ ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+ for (;;)
+ {
+ slot = ExecProcNode(outerNode);
+ if (TupIsNull(slot))
+ break;
+ econtext->ecxt_innertuple = slot;
+ if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
+ false, hashtable->keepNulls,
+ &hashvalue))
+ ExecParallelHashTableInsert(hashtable, slot, hashvalue);
+ hashtable->partialTuples++;
+ }
+ BarrierDetach(&pstate->grow_buckets_barrier);
+ BarrierDetach(&pstate->grow_batches_barrier);
+
+ /*
+ * Make sure that any tuples we wrote to disk are visible to
+ * others before anyone tries to load them.
+ */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ sts_end_write(hashtable->batches[i].inner_tuples);
+
+ /*
+ * Update shared counters. We need an accurate total tuple count
+ * to control the empty table optimization.
+ */
+ ExecParallelHashMergeCounters(hashtable);
+
+ /*
+ * Wait for everyone to finish building and flushing files and
+ * counters.
+ */
+ if (BarrierArriveAndWait(build_barrier,
+ WAIT_EVENT_HASH_BUILD_HASHING_INNER))
+ {
+ /*
+ * Elect one backend to disable any further growth. Batches
+ * are now fixed. While building them we made sure they'd fit
+ * in our memory budget when we load them back in later (or we
+ * tried to do that and gave up because we detected extreme
+ * skew).
+ */
+ pstate->growth = PHJ_GROWTH_DISABLED;
+ }
+ }
+
+ /*
+ * We're not yet attached to a batch. We all agree on the dimensions and
+ * number of inner tuples (for the empty table optimization).
+ */
+ hashtable->curbatch = -1;
+ hashtable->nbuckets = pstate->nbuckets;
+ hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
+ hashtable->totalTuples = pstate->total_tuples;
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+
+ /*
+ * The next synchronization point is in ExecHashJoin's HJ_BUILD_HASHTABLE
+ * case, which will bring the build phase to PHJ_BUILD_DONE (if it isn't
+ * there already).
+ */
+ Assert(BarrierPhase(build_barrier) == PHJ_BUILD_HASHING_OUTER ||
+ BarrierPhase(build_barrier) == PHJ_BUILD_DONE);
}
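The build_barrier handling above is an instance of the coordination idiom used throughout this patch: attach to a barrier, switch on the phase observed at that moment, and fall through the remaining cases so that a late-arriving participant only performs whatever work is still outstanding. A minimal sketch of the idiom, assuming the real Barrier API from storage/barrier.h but hypothetical phase numbers and work functions:

#include "postgres.h"
#include "storage/barrier.h"

#define PHASE_BUILD 0			/* hypothetical phase numbers */
#define PHASE_PROBE 1

static void build_step(void) { /* hypothetical build work */ }
static void probe_step(void) { /* hypothetical probe work */ }

static void
participate(Barrier *barrier)
{
	switch (BarrierAttach(barrier))
	{
		case PHASE_BUILD:
			build_step();
			/* Wait for all participants; the barrier advances to PHASE_PROBE. */
			BarrierArriveAndWait(barrier, 0);
			/* Fall through. */

		case PHASE_PROBE:
			probe_step();
			BarrierArriveAndWait(barrier, 0);
			break;

		default:
			/* Attached after PHASE_PROBE: nothing left to help with. */
			break;
	}
	BarrierDetach(barrier);
}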
/* ----------------------------------------------------------------
@@ -240,12 +429,15 @@ ExecEndHash(HashState *node)
* ----------------------------------------------------------------
*/
HashJoinTable
-ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
+ExecHashTableCreate(HashState *state, List *hashOperators, bool keepNulls)
{
+ Hash *node;
HashJoinTable hashtable;
Plan *outerNode;
+ size_t space_allowed;
int nbuckets;
int nbatch;
+ double rows;
int num_skew_mcvs;
int log2_nbuckets;
int nkeys;
@@ -258,10 +450,22 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
* "outer" subtree of this node, but the inner relation of the hashjoin).
* Compute the appropriate size of the hash table.
*/
+ node = (Hash *) state->ps.plan;
outerNode = outerPlan(node);
- ExecChooseHashTableSize(outerNode->plan_rows, outerNode->plan_width,
+ /*
+ * If this is a shared hash table with a partial plan, then we can't use
+ * outerNode->plan_rows to estimate its size. We need an estimate of the
+ * total number of rows across all copies of the partial plan.
+ */
+ rows = node->plan.parallel_aware ? node->rows_total : outerNode->plan_rows;
+
+ ExecChooseHashTableSize(rows, outerNode->plan_width,
OidIsValid(node->skewTable),
+ state->parallel_state != NULL,
+ state->parallel_state != NULL ?
+ state->parallel_state->nparticipants - 1 : 0,
+ &space_allowed,
&nbuckets, &nbatch, &num_skew_mcvs);
/* nbuckets must be a power of 2 */
@@ -280,7 +484,7 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
hashtable->nbuckets_optimal = nbuckets;
hashtable->log2_nbuckets = log2_nbuckets;
hashtable->log2_nbuckets_optimal = log2_nbuckets;
- hashtable->buckets = NULL;
+ hashtable->buckets.unshared = NULL;
hashtable->keepNulls = keepNulls;
hashtable->skewEnabled = false;
hashtable->skewBucket = NULL;
@@ -293,16 +497,21 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
hashtable->nbatch_outstart = nbatch;
hashtable->growEnabled = true;
hashtable->totalTuples = 0;
+ hashtable->partialTuples = 0;
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
- hashtable->spaceAllowed = work_mem * 1024L;
+ hashtable->spaceAllowed = space_allowed;
hashtable->spaceUsedSkew = 0;
hashtable->spaceAllowedSkew =
hashtable->spaceAllowed * SKEW_WORK_MEM_PERCENT / 100;
hashtable->chunks = NULL;
+ hashtable->current_chunk = NULL;
+ hashtable->parallel_state = state->parallel_state;
+ hashtable->area = state->ps.state->es_query_dsa;
+ hashtable->batches = NULL;
#ifdef HJDEBUG
printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
@@ -351,10 +560,11 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
- if (nbatch > 1)
+ if (nbatch > 1 && hashtable->parallel_state == NULL)
{
/*
- * allocate and initialize the file arrays in hashCxt
+ * allocate and initialize the file arrays in hashCxt (not needed for
+ * parallel case which uses shared tuplestores instead of raw files)
*/
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
@@ -365,23 +575,77 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
PrepareTempTablespaces();
}
- /*
- * Prepare context for the first-scan space allocations; allocate the
- * hashbucket array therein, and set each bucket "empty".
- */
- MemoryContextSwitchTo(hashtable->batchCxt);
+ MemoryContextSwitchTo(oldcxt);
- hashtable->buckets = (HashJoinTuple *)
- palloc0(nbuckets * sizeof(HashJoinTuple));
+ if (hashtable->parallel_state)
+ {
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ Barrier *build_barrier;
- /*
- * Set up for skew optimization, if possible and there's a need for more
- * than one batch. (In a one-batch join, there's no point in it.)
- */
- if (nbatch > 1)
- ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
+ /*
+ * Attach to the build barrier. The corresponding detach operation is
+ * in ExecHashTableDetach. Note that we won't attach to the
+ * batch_barrier for batch 0 yet. We'll attach later and start it out
+ * in PHJ_BATCH_PROBING phase, because batch 0 is allocated up front
+ * and then loaded while hashing (the standard hybrid hash join
+ * algorithm), and we'll coordinate that using build_barrier.
+ */
+ build_barrier = &pstate->build_barrier;
+ BarrierAttach(build_barrier);
- MemoryContextSwitchTo(oldcxt);
+ /*
+ * So far we have no idea whether there are any other participants,
+ * and if so, what phase they are working on. The only thing we care
+ * about at this point is whether someone has already created the
+ * SharedHashJoinBatch objects and the hash table for batch 0. One
+ * backend will be elected to do that now if necessary.
+ */
+ if (BarrierPhase(build_barrier) == PHJ_BUILD_ELECTING &&
+ BarrierArriveAndWait(build_barrier, WAIT_EVENT_HASH_BUILD_ELECTING))
+ {
+ pstate->nbatch = nbatch;
+ pstate->space_allowed = space_allowed;
+ pstate->growth = PHJ_GROWTH_OK;
+
+ /* Set up the shared state for coordinating batches. */
+ ExecParallelHashJoinSetUpBatches(hashtable, nbatch);
+
+ /*
+ * Allocate batch 0's hash table up front so we can load it
+ * directly while hashing.
+ */
+ pstate->nbuckets = nbuckets;
+ ExecParallelHashTableAlloc(hashtable, 0);
+ }
+
+ /*
+ * The next Parallel Hash synchronization point is in
+ * MultiExecParallelHash(), which will progress it all the way to
+ * PHJ_BUILD_DONE. The caller must not return control from this
+ * executor node between now and then.
+ */
+ }
+ else
+ {
+ /*
+ * Prepare context for the first-scan space allocations; allocate the
+ * hashbucket array therein, and set each bucket "empty".
+ */
+ MemoryContextSwitchTo(hashtable->batchCxt);
+
+ hashtable->buckets.unshared = (HashJoinTuple *)
+ palloc0(nbuckets * sizeof(HashJoinTuple));
+
+ /*
+ * Set up for skew optimization, if possible and there's a need for
+ * more than one batch. (In a one-batch join, there's no point in
+ * it.)
+ */
+ if (nbatch > 1)
+ ExecHashBuildSkewHash(hashtable, node, num_skew_mcvs);
+
+ MemoryContextSwitchTo(oldcxt);
+ }
return hashtable;
}
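The election at PHJ_BUILD_ELECTING above relies on BarrierArriveAndWait()'s return value: when the last participant arrives, exactly one backend gets a true result and performs the serial setup (here, creating the shared batch array and batch 0's bucket array) while the others simply proceed once it is done. A hedged sketch of that idiom, with serial_setup as a hypothetical callback:

#include "postgres.h"
#include "storage/barrier.h"

static void
run_serial_step(Barrier *barrier, void (*serial_setup) (void))
{
	/* Exactly one of the attached participants sees a true return value. */
	if (BarrierArriveAndWait(barrier, 0))
		serial_setup();
	/* Everyone resumes here, in the barrier's next phase. */
}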
@@ -399,6 +663,9 @@ ExecHashTableCreate(Hash *node, List *hashOperators, bool keepNulls)
void
ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
+ bool try_combined_work_mem,
+ int parallel_workers,
+ size_t *space_allowed,
int *numbuckets,
int *numbatches,
int *num_skew_mcvs)
@@ -434,6 +701,16 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
hash_table_bytes = work_mem * 1024L;
/*
+ * Parallel Hash tries to use the combined work_mem of all workers to
+ * avoid the need to batch. If that won't work, it falls back to work_mem
+ * per worker and tries to process batches in parallel.
+ */
+ if (try_combined_work_mem)
+ hash_table_bytes += hash_table_bytes * parallel_workers;
+
+ *space_allowed = hash_table_bytes;
+
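As a worked example of the combined budget (illustrative numbers only): with work_mem = 4MB and two workers plus the leader (parallel_workers = 2), hash_table_bytes becomes 4MB + 2 * 4MB = 12MB, so an inner relation of roughly 10MB can be hashed in a single shared batch even though it would not fit in any one participant's work_mem. If even the combined budget would force multiple batches, the recursive call further down redoes the calculation with try_combined_work_mem set to false, i.e. with the plain per-participant budget.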
+ /*
* If skew optimization is possible, estimate the number of skew buckets
* that will fit in the memory allowed, and decrement the assumed space
* available for the main hash table accordingly.
@@ -478,7 +755,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
* Note that both nbuckets and nbatch must be powers of 2 to make
* ExecHashGetBucketAndBatch fast.
*/
- max_pointers = (work_mem * 1024L) / sizeof(HashJoinTuple);
+ max_pointers = *space_allowed / sizeof(HashJoinTuple);
max_pointers = Min(max_pointers, MaxAllocSize / sizeof(HashJoinTuple));
/* If max_pointers isn't a power of 2, must round it down to one */
mppow2 = 1L << my_log2(max_pointers);
@@ -511,6 +788,21 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
long bucket_size;
/*
+ * If Parallel Hash with combined work_mem would still need multiple
+ * batches, we'll have to fall back to regular work_mem budget.
+ */
+ if (try_combined_work_mem)
+ {
+ ExecChooseHashTableSize(ntuples, tupwidth, useskew,
+ false, parallel_workers,
+ space_allowed,
+ numbuckets,
+ numbatches,
+ num_skew_mcvs);
+ return;
+ }
+
+ /*
* Estimate the number of buckets we'll want to have when work_mem is
* entirely full. Each bucket will contain a bucket pointer plus
* NTUP_PER_BUCKET tuples, whose projected size already includes
@@ -564,14 +856,17 @@ ExecHashTableDestroy(HashJoinTable hashtable)
/*
* Make sure all the temp files are closed. We skip batch 0, since it
* can't have any temp files (and the arrays might not even exist if
- * nbatch is only 1).
+ * nbatch is only 1). Parallel hash joins don't use these files.
*/
- for (i = 1; i < hashtable->nbatch; i++)
+ if (hashtable->innerBatchFile != NULL)
{
- if (hashtable->innerBatchFile[i])
- BufFileClose(hashtable->innerBatchFile[i]);
- if (hashtable->outerBatchFile[i])
- BufFileClose(hashtable->outerBatchFile[i]);
+ for (i = 1; i < hashtable->nbatch; i++)
+ {
+ if (hashtable->innerBatchFile[i])
+ BufFileClose(hashtable->innerBatchFile[i]);
+ if (hashtable->outerBatchFile[i])
+ BufFileClose(hashtable->outerBatchFile[i]);
+ }
}
/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -657,8 +952,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
hashtable->nbuckets = hashtable->nbuckets_optimal;
hashtable->log2_nbuckets = hashtable->log2_nbuckets_optimal;
- hashtable->buckets = repalloc(hashtable->buckets,
- sizeof(HashJoinTuple) * hashtable->nbuckets);
+ hashtable->buckets.unshared =
+ repalloc(hashtable->buckets.unshared,
+ sizeof(HashJoinTuple) * hashtable->nbuckets);
}
/*
@@ -666,14 +962,15 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
* buckets now and not have to keep track which tuples in the buckets have
* already been processed. We will free the old chunks as we go.
*/
- memset(hashtable->buckets, 0, sizeof(HashJoinTuple) * hashtable->nbuckets);
+ memset(hashtable->buckets.unshared, 0,
+ sizeof(HashJoinTuple) * hashtable->nbuckets);
oldchunks = hashtable->chunks;
hashtable->chunks = NULL;
/* so, let's scan through the old chunks, and all tuples in each chunk */
while (oldchunks != NULL)
{
- HashMemoryChunk nextchunk = oldchunks->next;
+ HashMemoryChunk nextchunk = oldchunks->next.unshared;
/* position within the buffer (up to oldchunks->used) */
size_t idx = 0;
@@ -700,8 +997,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
memcpy(copyTuple, hashTuple, hashTupleSize);
/* and add it back to the appropriate bucket */
- copyTuple->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = copyTuple;
+ copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+ hashtable->buckets.unshared[bucketno] = copyTuple;
}
else
{
@@ -751,6 +1048,380 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
/*
+ * ExecParallelHashIncreaseNumBatches
+ * Every participant attached to grow_batches_barrier must run this function
+ * when it observes growth == PHJ_GROWTH_NEED_MORE_BATCHES.
+ */
+static void
+ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int i;
+
+ Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+
+ /*
+ * It's unlikely, but we need to be prepared for new participants to show
+ * up while we're in the middle of this operation so we need to switch on
+ * barrier phase here.
+ */
+ switch (PHJ_GROW_BATCHES_PHASE(BarrierPhase(&pstate->grow_batches_barrier)))
+ {
+ case PHJ_GROW_BATCHES_ELECTING:
+
+ /*
+ * Elect one participant to prepare to grow the number of batches.
+ * This involves reallocating or resetting the buckets of batch 0
+ * in preparation for all participants to begin repartitioning the
+ * tuples.
+ */
+ if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_ELECTING))
+ {
+ dsa_pointer_atomic *buckets;
+ ParallelHashJoinBatch *old_batch0;
+ int new_nbatch;
+ int i;
+
+ /* Move the old batch out of the way. */
+ old_batch0 = hashtable->batches[0].shared;
+ pstate->old_batches = pstate->batches;
+ pstate->old_nbatch = hashtable->nbatch;
+ pstate->batches = InvalidDsaPointer;
+
+ /* Free this backend's old accessors. */
+ ExecParallelHashCloseBatchAccessors(hashtable);
+
+ /* Figure out how many batches to use. */
+ if (hashtable->nbatch == 1)
+ {
+ /*
+ * We are going from single-batch to multi-batch. We need
+ * to switch from one large combined memory budget to the
+ * regular work_mem budget.
+ */
+ pstate->space_allowed = work_mem * 1024L;
+
+ /*
+ * The combined work_mem of all participants wasn't
+ * enough. Therefore one batch per participant would be
+ * approximately equivalent and would probably also be
+ * insufficient. So try two batches per participant,
+ * rounded up to a power of two.
+ */
+ new_nbatch = 1 << my_log2(pstate->nparticipants * 2);
+ }
+ else
+ {
+ /*
+ * We were already multi-batched. Try doubling the number
+ * of batches.
+ */
+ new_nbatch = hashtable->nbatch * 2;
+ }
+
+ /* Allocate new larger generation of batches. */
+ Assert(hashtable->nbatch == pstate->nbatch);
+ ExecParallelHashJoinSetUpBatches(hashtable, new_nbatch);
+ Assert(hashtable->nbatch == pstate->nbatch);
+
+ /* Replace or recycle batch 0's bucket array. */
+ if (pstate->old_nbatch == 1)
+ {
+ double dtuples;
+ double dbuckets;
+ int new_nbuckets;
+
+ /*
+ * We probably also need a smaller bucket array. How many
+ * tuples do we expect per batch, assuming we have only
+ * half of them so far? Normally we don't need to change
+ * the bucket array's size, because the size of each batch
+ * stays the same as we add more batches, but in this
+ * special case we move from a large batch to many smaller
+ * batches and it would be wasteful to keep the large
+ * array.
+ */
+ dtuples = (old_batch0->ntuples * 2.0) / new_nbatch;
+ dbuckets = ceil(dtuples / NTUP_PER_BUCKET);
+ dbuckets = Min(dbuckets,
+ MaxAllocSize / sizeof(dsa_pointer_atomic));
+ new_nbuckets = (int) dbuckets;
+ new_nbuckets = Max(new_nbuckets, 1024);
+ new_nbuckets = 1 << my_log2(new_nbuckets);
+ dsa_free(hashtable->area, old_batch0->buckets);
+ hashtable->batches[0].shared->buckets =
+ dsa_allocate(hashtable->area,
+ sizeof(dsa_pointer_atomic) * new_nbuckets);
+ buckets = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area,
+ hashtable->batches[0].shared->buckets);
+ for (i = 0; i < new_nbuckets; ++i)
+ dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
+ pstate->nbuckets = new_nbuckets;
+ }
+ else
+ {
+ /* Recycle the existing bucket array. */
+ hashtable->batches[0].shared->buckets = old_batch0->buckets;
+ buckets = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area, old_batch0->buckets);
+ for (i = 0; i < hashtable->nbuckets; ++i)
+ dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer);
+ }
+
+ /* Move all chunks to the work queue for parallel processing. */
+ pstate->chunk_work_queue = old_batch0->chunks;
+
+ /* Disable further growth temporarily while we're growing. */
+ pstate->growth = PHJ_GROWTH_DISABLED;
+ }
+ else
+ {
+ /* All other participants just flush their tuples to disk. */
+ ExecParallelHashCloseBatchAccessors(hashtable);
+ }
+ /* Fall through. */
+
+ case PHJ_GROW_BATCHES_ALLOCATING:
+ /* Wait for the above to be finished. */
+ BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_GROW_BATCHES_REPARTITIONING:
+ /* Make sure that we have the current dimensions and buckets. */
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+ ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+ /* Then partition, flush counters. */
+ ExecParallelHashRepartitionFirst(hashtable);
+ ExecParallelHashRepartitionRest(hashtable);
+ ExecParallelHashMergeCounters(hashtable);
+ /* Wait for the above to be finished. */
+ BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_REPARTITIONING);
+ /* Fall through. */
+
+ case PHJ_GROW_BATCHES_DECIDING:
+
+ /*
+ * Elect one participant to clean up and decide whether further
+ * repartitioning is needed, or should be disabled because it's
+ * not helping.
+ */
+ if (BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_DECIDING))
+ {
+ bool space_exhausted = false;
+ bool extreme_skew_detected = false;
+
+ /* Make sure that we have the current dimensions and buckets. */
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+ ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+
+ /* Are any of the new generation of batches exhausted? */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ ParallelHashJoinBatch *batch = hashtable->batches[i].shared;
+
+ if (batch->space_exhausted ||
+ batch->estimated_size > pstate->space_allowed)
+ {
+ int parent;
+
+ space_exhausted = true;
+
+ /*
+ * Did this batch receive ALL of the tuples from its
+ * parent batch? That would indicate that further
+ * repartitioning isn't going to help (the hash values
+ * are probably all the same).
+ */
+ parent = i % pstate->old_nbatch;
+ if (batch->ntuples == hashtable->batches[parent].shared->old_ntuples)
+ extreme_skew_detected = true;
+ }
+ }
+
+ /* Don't keep growing if it's not helping or we'd overflow. */
+ if (extreme_skew_detected || hashtable->nbatch >= INT_MAX / 2)
+ pstate->growth = PHJ_GROWTH_DISABLED;
+ else if (space_exhausted)
+ pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
+ else
+ pstate->growth = PHJ_GROWTH_OK;
+
+ /* Free the old batches in shared memory. */
+ dsa_free(hashtable->area, pstate->old_batches);
+ pstate->old_batches = InvalidDsaPointer;
+ }
+ /* Fall through. */
+
+ case PHJ_GROW_BATCHES_FINISHING:
+ /* Wait for the above to complete. */
+ BarrierArriveAndWait(&pstate->grow_batches_barrier,
+ WAIT_EVENT_HASH_GROW_BATCHES_FINISHING);
+ }
+}
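Two worked examples for the sizing and skew decisions above (illustrative numbers only): with nparticipants = 3, the single-batch to multi-batch transition picks new_nbatch = 1 << my_log2(3 * 2) = 8, i.e. two batches per participant rounded up to the next power of two. And because a tuple's batch number is a bit mask over its hash value, doubling nbatch from 4 to 8 can only split old batch 2 into new batches 2 and 6 (both have i % old_nbatch = 2), which is what the PHJ_GROW_BATCHES_DECIDING step exploits: a new batch that received every one of its parent's tuples cannot be split any further by adding still more batches.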
+
+/*
+ * Repartition the tuples currently loaded into memory for inner batch 0
+ * because the number of batches has been increased. Some tuples are retained
+ * in memory and some are written out to a later batch.
+ */
+static void
+ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
+{
+ dsa_pointer chunk_shared;
+ HashMemoryChunk chunk;
+
+ Assert(hashtable->nbatch == hashtable->parallel_state->nbatch);
+
+ while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_shared)))
+ {
+ size_t idx = 0;
+
+ /* Repartition all tuples in this chunk. */
+ while (idx < chunk->used)
+ {
+ HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);
+ MinimalTuple tuple = HJTUPLE_MINTUPLE(hashTuple);
+ HashJoinTuple copyTuple;
+ dsa_pointer shared;
+ int bucketno;
+ int batchno;
+
+ ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+ &bucketno, &batchno);
+
+ Assert(batchno < hashtable->nbatch);
+ if (batchno == 0)
+ {
+ /* It still belongs in batch 0. Copy to a new chunk. */
+ copyTuple =
+ ExecParallelHashTupleAlloc(hashtable,
+ HJTUPLE_OVERHEAD + tuple->t_len,
+ &shared);
+ copyTuple->hashvalue = hashTuple->hashvalue;
+ memcpy(HJTUPLE_MINTUPLE(copyTuple), tuple, tuple->t_len);
+ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
+ copyTuple, shared);
+ }
+ else
+ {
+ size_t tuple_size =
+ MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
+
+ /* It belongs in a later batch. */
+ hashtable->batches[batchno].estimated_size += tuple_size;
+ sts_puttuple(hashtable->batches[batchno].inner_tuples,
+ &hashTuple->hashvalue, tuple);
+ }
+
+ /* Count this tuple. */
+ ++hashtable->batches[0].old_ntuples;
+ ++hashtable->batches[batchno].ntuples;
+
+ idx += MAXALIGN(HJTUPLE_OVERHEAD +
+ HJTUPLE_MINTUPLE(hashTuple)->t_len);
+ }
+
+ /* Free this chunk. */
+ dsa_free(hashtable->area, chunk_shared);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+}
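The repartitioning above leans on ExecHashGetBucketAndBatch(), which this patch leaves unchanged: because nbuckets and nbatch are both powers of two, the bucket and batch numbers are simply separate bit fields of the 32-bit hash value. A simplified sketch of that scheme (not the exact function body):

#include "postgres.h"

static void
bucket_and_batch(uint32 hashvalue, uint32 nbuckets, int log2_nbuckets,
				 uint32 nbatch, int *bucketno, int *batchno)
{
	/* The low-order bits select a bucket within the batch's hash table. */
	*bucketno = (int) (hashvalue & (nbuckets - 1));

	/* The next bits up select the batch; a single batch is always batch 0. */
	if (nbatch > 1)
		*batchno = (int) ((hashvalue >> log2_nbuckets) & (nbatch - 1));
	else
		*batchno = 0;
}

Keeping both counts as powers of two is what makes this a pair of mask-and-shift operations, which is also why ExecChooseHashTableSize() insists on rounding them that way.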
+
+/*
+ * Help repartition inner batches 1..n.
+ */
+static void
+ExecParallelHashRepartitionRest(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int old_nbatch = pstate->old_nbatch;
+ SharedTuplestoreAccessor **old_inner_tuples;
+ ParallelHashJoinBatch *old_batches;
+ int i;
+
+ /* Get our hands on the previous generation of batches. */
+ old_batches = (ParallelHashJoinBatch *)
+ dsa_get_address(hashtable->area, pstate->old_batches);
+ old_inner_tuples = palloc0(sizeof(SharedTuplestoreAccessor *) * old_nbatch);
+ for (i = 1; i < old_nbatch; ++i)
+ {
+ ParallelHashJoinBatch *shared =
+ NthParallelHashJoinBatch(old_batches, i);
+
+ old_inner_tuples[i] = sts_attach(ParallelHashJoinBatchInner(shared),
+ ParallelWorkerNumber + 1,
+ &pstate->fileset);
+ }
+
+ /* Join in the effort to repartition them. */
+ for (i = 1; i < old_nbatch; ++i)
+ {
+ MinimalTuple tuple;
+ uint32 hashvalue;
+
+ /* Scan one partition from the previous generation. */
+ sts_begin_parallel_scan(old_inner_tuples[i]);
+ while ((tuple = sts_parallel_scan_next(old_inner_tuples[i], &hashvalue)))
+ {
+ size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
+ int bucketno;
+ int batchno;
+
+ /* Decide which partition it goes to in the new generation. */
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno,
+ &batchno);
+
+ hashtable->batches[batchno].estimated_size += tuple_size;
+ ++hashtable->batches[batchno].ntuples;
+ ++hashtable->batches[i].old_ntuples;
+
+ /* Store the tuple in its new batch. */
+ sts_puttuple(hashtable->batches[batchno].inner_tuples,
+ &hashvalue, tuple);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+ sts_end_parallel_scan(old_inner_tuples[i]);
+ }
+
+ pfree(old_inner_tuples);
+}
+
+/*
+ * Transfer the backend-local per-batch counters to the shared totals.
+ */
+static void
+ExecParallelHashMergeCounters(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int i;
+
+ LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+ pstate->total_tuples = 0;
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ ParallelHashJoinBatchAccessor *batch = &hashtable->batches[i];
+
+ batch->shared->size += batch->size;
+ batch->shared->estimated_size += batch->estimated_size;
+ batch->shared->ntuples += batch->ntuples;
+ batch->shared->old_ntuples += batch->old_ntuples;
+ batch->size = 0;
+ batch->estimated_size = 0;
+ batch->ntuples = 0;
+ batch->old_ntuples = 0;
+ pstate->total_tuples += batch->shared->ntuples;
+ }
+ LWLockRelease(&pstate->lock);
+}
+
+/*
* ExecHashIncreaseNumBuckets
* increase the original number of buckets in order to reduce
* number of tuples per bucket
@@ -782,14 +1453,15 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
* ExecHashIncreaseNumBatches, but without all the copying into new
* chunks)
*/
- hashtable->buckets =
- (HashJoinTuple *) repalloc(hashtable->buckets,
+ hashtable->buckets.unshared =
+ (HashJoinTuple *) repalloc(hashtable->buckets.unshared,
hashtable->nbuckets * sizeof(HashJoinTuple));
- memset(hashtable->buckets, 0, hashtable->nbuckets * sizeof(HashJoinTuple));
+ memset(hashtable->buckets.unshared, 0,
+ hashtable->nbuckets * sizeof(HashJoinTuple));
/* scan through all tuples in all chunks to rebuild the hash table */
- for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next)
+ for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
{
/* process all tuples stored in this chunk */
size_t idx = 0;
@@ -804,8 +1476,8 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
&bucketno, &batchno);
/* add the tuple to the proper bucket */
- hashTuple->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = hashTuple;
+ hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+ hashtable->buckets.unshared[bucketno] = hashTuple;
/* advance index past the tuple */
idx += MAXALIGN(HJTUPLE_OVERHEAD +
@@ -817,6 +1489,93 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
}
}
+static void
+ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int i;
+ HashMemoryChunk chunk;
+ dsa_pointer chunk_s;
+
+ Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+
+ /*
+ * It's unlikely, but we need to be prepared for new participants to show
+ * up while we're in the middle of this operation so we need to switch on
+ * barrier phase here.
+ */
+ switch (PHJ_GROW_BUCKETS_PHASE(BarrierPhase(&pstate->grow_buckets_barrier)))
+ {
+ case PHJ_GROW_BUCKETS_ELECTING:
+ /* Elect one participant to prepare to increase nbuckets. */
+ if (BarrierArriveAndWait(&pstate->grow_buckets_barrier,
+ WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING))
+ {
+ size_t size;
+ dsa_pointer_atomic *buckets;
+
+ /* Double the size of the bucket array. */
+ pstate->nbuckets *= 2;
+ size = pstate->nbuckets * sizeof(dsa_pointer_atomic);
+ hashtable->batches[0].shared->size += size / 2;
+ dsa_free(hashtable->area, hashtable->batches[0].shared->buckets);
+ hashtable->batches[0].shared->buckets =
+ dsa_allocate(hashtable->area, size);
+ buckets = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area,
+ hashtable->batches[0].shared->buckets);
+ for (i = 0; i < pstate->nbuckets; ++i)
+ dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
+
+ /* Put the chunk list onto the work queue. */
+ pstate->chunk_work_queue = hashtable->batches[0].shared->chunks;
+
+ /* Clear the flag. */
+ pstate->growth = PHJ_GROWTH_OK;
+ }
+ /* Fall through. */
+
+ case PHJ_GROW_BUCKETS_ALLOCATING:
+ /* Wait for the above to complete. */
+ BarrierArriveAndWait(&pstate->grow_buckets_barrier,
+ WAIT_EVENT_HASH_GROW_BUCKETS_ALLOCATING);
+ /* Fall through. */
+
+ case PHJ_GROW_BUCKETS_REINSERTING:
+ /* Reinsert all tuples into the hash table. */
+ ExecParallelHashEnsureBatchAccessors(hashtable);
+ ExecParallelHashTableSetCurrentBatch(hashtable, 0);
+ while ((chunk = ExecParallelHashPopChunkQueue(hashtable, &chunk_s)))
+ {
+ size_t idx = 0;
+
+ while (idx < chunk->used)
+ {
+ HashJoinTuple hashTuple = (HashJoinTuple) (chunk->data + idx);
+ dsa_pointer shared = chunk_s + HASH_CHUNK_HEADER_SIZE + idx;
+ int bucketno;
+ int batchno;
+
+ ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue,
+ &bucketno, &batchno);
+ Assert(batchno == 0);
+
+ /* add the tuple to the proper bucket */
+ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
+ hashTuple, shared);
+
+ /* advance index past the tuple */
+ idx += MAXALIGN(HJTUPLE_OVERHEAD +
+ HJTUPLE_MINTUPLE(hashTuple)->t_len);
+ }
+
+ /* allow this loop to be cancellable */
+ CHECK_FOR_INTERRUPTS();
+ }
+ BarrierArriveAndWait(&pstate->grow_buckets_barrier,
+ WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING);
+ }
+}
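Note how the reinsertion loop above recovers each tuple's dsa_pointer without per-tuple bookkeeping: a chunk is one contiguous DSA allocation, so the shared address of the tuple at offset idx is simply chunk_s + HASH_CHUNK_HEADER_SIZE + idx, the shared counterpart of the backend-local address chunk->data + idx.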
/*
* ExecHashTableInsert
@@ -869,8 +1628,8 @@ ExecHashTableInsert(HashJoinTable hashtable,
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
/* Push it onto the front of the bucket's list */
- hashTuple->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = hashTuple;
+ hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+ hashtable->buckets.unshared[bucketno] = hashTuple;
/*
* Increase the (optimal) number of buckets if we just exceeded the
@@ -911,6 +1670,94 @@ ExecHashTableInsert(HashJoinTable hashtable,
}
/*
+ * ExecParallelHashTableInsert
+ * insert a tuple into a shared hash table or shared batch tuplestore
+ */
+void
+ExecParallelHashTableInsert(HashJoinTable hashtable,
+ TupleTableSlot *slot,
+ uint32 hashvalue)
+{
+ MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
+ dsa_pointer shared;
+ int bucketno;
+ int batchno;
+
+retry:
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
+
+ if (batchno == 0)
+ {
+ HashJoinTuple hashTuple;
+
+ /* Try to load it into memory. */
+ Assert(BarrierPhase(&hashtable->parallel_state->build_barrier) ==
+ PHJ_BUILD_HASHING_INNER);
+ hashTuple = ExecParallelHashTupleAlloc(hashtable,
+ HJTUPLE_OVERHEAD + tuple->t_len,
+ &shared);
+ if (hashTuple == NULL)
+ goto retry;
+
+ /* Store the hash value in the HashJoinTuple header. */
+ hashTuple->hashvalue = hashvalue;
+ memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
+
+ /* Push it onto the front of the bucket's list */
+ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
+ hashTuple, shared);
+ }
+ else
+ {
+ size_t tuple_size = MAXALIGN(HJTUPLE_OVERHEAD + tuple->t_len);
+
+ Assert(batchno > 0);
+
+ /* Try to preallocate space in the batch if necessary. */
+ if (hashtable->batches[batchno].preallocated < tuple_size)
+ {
+ if (!ExecParallelHashTuplePrealloc(hashtable, batchno, tuple_size))
+ goto retry;
+ }
+
+ Assert(hashtable->batches[batchno].preallocated >= tuple_size);
+ hashtable->batches[batchno].preallocated -= tuple_size;
+ sts_puttuple(hashtable->batches[batchno].inner_tuples, &hashvalue,
+ tuple);
+ }
+ ++hashtable->batches[batchno].ntuples;
+}
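The retry label above is the caller's half of the contract documented at ExecParallelHashTupleAlloc() and ExecParallelHashTuplePrealloc(): when either of them reports that it had to help change the number of batches or buckets, the hash value must be split into bucketno and batchno again, because the tuple may now belong to a different batch than it did before the change.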
+
+/*
+ * Insert a tuple into the current hash table. Unlike
+ * ExecParallelHashTableInsert, this version is not prepared to send the tuple
+ * to other batches or to run out of memory, and should only be called with
+ * tuples that belong in the current batch once growth has been disabled.
+ */
+void
+ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
+ TupleTableSlot *slot,
+ uint32 hashvalue)
+{
+ MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot);
+ HashJoinTuple hashTuple;
+ dsa_pointer shared;
+ int batchno;
+ int bucketno;
+
+ ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
+ Assert(batchno == hashtable->curbatch);
+ hashTuple = ExecParallelHashTupleAlloc(hashtable,
+ HJTUPLE_OVERHEAD + tuple->t_len,
+ &shared);
+ hashTuple->hashvalue = hashvalue;
+ memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
+ HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
+ ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
+ hashTuple, shared);
+}
+
+/*
* ExecHashGetHashValue
* Compute the hash value for a tuple
*
@@ -1076,11 +1923,11 @@ ExecScanHashBucket(HashJoinState *hjstate,
* otherwise scan the standard hashtable bucket.
*/
if (hashTuple != NULL)
- hashTuple = hashTuple->next;
+ hashTuple = hashTuple->next.unshared;
else if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
else
- hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
+ hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
while (hashTuple != NULL)
{
@@ -1104,7 +1951,67 @@ ExecScanHashBucket(HashJoinState *hjstate,
}
}
- hashTuple = hashTuple->next;
+ hashTuple = hashTuple->next.unshared;
+ }
+
+ /*
+ * no match
+ */
+ return false;
+}
+
+/*
+ * ExecParallelScanHashBucket
+ * scan a hash bucket for matches to the current outer tuple
+ *
+ * The current outer tuple must be stored in econtext->ecxt_outertuple.
+ *
+ * On success, the inner tuple is stored into hjstate->hj_CurTuple and
+ * econtext->ecxt_innertuple, using hjstate->hj_HashTupleSlot as the slot
+ * for the latter.
+ */
+bool
+ExecParallelScanHashBucket(HashJoinState *hjstate,
+ ExprContext *econtext)
+{
+ ExprState *hjclauses = hjstate->hashclauses;
+ HashJoinTable hashtable = hjstate->hj_HashTable;
+ HashJoinTuple hashTuple = hjstate->hj_CurTuple;
+ uint32 hashvalue = hjstate->hj_CurHashValue;
+
+ /*
+ * hj_CurTuple is the address of the tuple last returned from the current
+ * bucket, or NULL if it's time to start scanning a new bucket.
+ */
+ if (hashTuple != NULL)
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
+ else
+ hashTuple = ExecParallelHashFirstTuple(hashtable,
+ hjstate->hj_CurBucketNo);
+
+ while (hashTuple != NULL)
+ {
+ if (hashTuple->hashvalue == hashvalue)
+ {
+ TupleTableSlot *inntuple;
+
+ /* insert hashtable's tuple into exec slot so ExecQual sees it */
+ inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+ hjstate->hj_HashTupleSlot,
+ false); /* do not pfree */
+ econtext->ecxt_innertuple = inntuple;
+
+ /* reset temp memory each time to avoid leaks from qual expr */
+ ResetExprContext(econtext);
+
+ if (ExecQual(hjclauses, econtext))
+ {
+ hjstate->hj_CurTuple = hashTuple;
+ return true;
+ }
+ }
+
+ hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple);
}
/*
@@ -1155,10 +2062,10 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
* bucket.
*/
if (hashTuple != NULL)
- hashTuple = hashTuple->next;
+ hashTuple = hashTuple->next.unshared;
else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
{
- hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
+ hashTuple = hashtable->buckets.unshared[hjstate->hj_CurBucketNo];
hjstate->hj_CurBucketNo++;
}
else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
@@ -1194,7 +2101,7 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
return true;
}
- hashTuple = hashTuple->next;
+ hashTuple = hashTuple->next.unshared;
}
/* allow this loop to be cancellable */
@@ -1226,7 +2133,7 @@ ExecHashTableReset(HashJoinTable hashtable)
oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
/* Reallocate and reinitialize the hash bucket headers. */
- hashtable->buckets = (HashJoinTuple *)
+ hashtable->buckets.unshared = (HashJoinTuple *)
palloc0(nbuckets * sizeof(HashJoinTuple));
hashtable->spaceUsed = 0;
@@ -1250,7 +2157,8 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable)
/* Reset all flags in the main table ... */
for (i = 0; i < hashtable->nbuckets; i++)
{
- for (tuple = hashtable->buckets[i]; tuple != NULL; tuple = tuple->next)
+ for (tuple = hashtable->buckets.unshared[i]; tuple != NULL;
+ tuple = tuple->next.unshared)
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
}
@@ -1260,7 +2168,7 @@ ExecHashTableResetMatchFlags(HashJoinTable hashtable)
int j = hashtable->skewBucketNums[i];
HashSkewBucket *skewBucket = hashtable->skewBucket[j];
- for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next)
+ for (tuple = skewBucket->tuples; tuple != NULL; tuple = tuple->next.unshared)
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(tuple));
}
}
@@ -1505,8 +2413,9 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
/* Push it onto the front of the skew bucket's list */
- hashTuple->next = hashtable->skewBucket[bucketNumber]->tuples;
+ hashTuple->next.unshared = hashtable->skewBucket[bucketNumber]->tuples;
hashtable->skewBucket[bucketNumber]->tuples = hashTuple;
+ Assert(hashTuple != hashTuple->next.unshared);
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
@@ -1554,7 +2463,7 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
hashTuple = bucket->tuples;
while (hashTuple != NULL)
{
- HashJoinTuple nextHashTuple = hashTuple->next;
+ HashJoinTuple nextHashTuple = hashTuple->next.unshared;
MinimalTuple tuple;
Size tupleSize;
@@ -1580,8 +2489,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
memcpy(copyTuple, hashTuple, tupleSize);
pfree(hashTuple);
- copyTuple->next = hashtable->buckets[bucketno];
- hashtable->buckets[bucketno] = copyTuple;
+ copyTuple->next.unshared = hashtable->buckets.unshared[bucketno];
+ hashtable->buckets.unshared[bucketno] = copyTuple;
/* We have reduced skew space, but overall space doesn't change */
hashtable->spaceUsedSkew -= tupleSize;
@@ -1760,11 +2669,11 @@ dense_alloc(HashJoinTable hashtable, Size size)
if (hashtable->chunks != NULL)
{
newChunk->next = hashtable->chunks->next;
- hashtable->chunks->next = newChunk;
+ hashtable->chunks->next.unshared = newChunk;
}
else
{
- newChunk->next = hashtable->chunks;
+ newChunk->next.unshared = hashtable->chunks;
hashtable->chunks = newChunk;
}
@@ -1789,7 +2698,7 @@ dense_alloc(HashJoinTable hashtable, Size size)
newChunk->used = size;
newChunk->ntuples = 1;
- newChunk->next = hashtable->chunks;
+ newChunk->next.unshared = hashtable->chunks;
hashtable->chunks = newChunk;
return newChunk->data;
@@ -1803,3 +2712,601 @@ dense_alloc(HashJoinTable hashtable, Size size)
/* return pointer to the start of the tuple memory */
return ptr;
}
+
+/*
+ * Allocate space for a tuple in shared dense storage. This is equivalent to
+ * dense_alloc but for Parallel Hash using shared memory.
+ *
+ * While loading a tuple into shared memory, we might run out of memory and
+ * decide to repartition, or determine that the load factor is too high and
+ * decide to expand the bucket array, or discover that another participant has
+ * commanded us to help do that. Return NULL if the number of buckets or batches
+ * has changed, indicating that the caller must retry (considering the
+ * possibility that the tuple no longer belongs in the same batch).
+ */
+static HashJoinTuple
+ExecParallelHashTupleAlloc(HashJoinTable hashtable, size_t size,
+ dsa_pointer *shared)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ dsa_pointer chunk_shared;
+ HashMemoryChunk chunk;
+ Size chunk_size;
+ HashJoinTuple result;
+ int curbatch = hashtable->curbatch;
+
+ size = MAXALIGN(size);
+
+ /*
+ * Fast path: if there is enough space in this backend's current chunk,
+ * then we can allocate without any locking.
+ */
+ chunk = hashtable->current_chunk;
+ if (chunk != NULL &&
+ size < HASH_CHUNK_THRESHOLD &&
+ chunk->maxlen - chunk->used >= size)
+ {
+
+ chunk_shared = hashtable->current_chunk_shared;
+ Assert(chunk == dsa_get_address(hashtable->area, chunk_shared));
+ *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE + chunk->used;
+ result = (HashJoinTuple) (chunk->data + chunk->used);
+ chunk->used += size;
+
+ Assert(chunk->used <= chunk->maxlen);
+ Assert(result == dsa_get_address(hashtable->area, *shared));
+
+ return result;
+ }
+
+ /* Slow path: try to allocate a new chunk. */
+ LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+
+ /*
+ * Check if we need to help increase the number of buckets or batches.
+ */
+ if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
+ pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
+ {
+ ParallelHashGrowth growth = pstate->growth;
+
+ hashtable->current_chunk = NULL;
+ LWLockRelease(&pstate->lock);
+
+ /* Another participant has commanded us to help grow. */
+ if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
+ ExecParallelHashIncreaseNumBatches(hashtable);
+ else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
+ ExecParallelHashIncreaseNumBuckets(hashtable);
+
+ /* The caller must retry. */
+ return NULL;
+ }
+
+ /* Oversized tuples get their own chunk. */
+ if (size > HASH_CHUNK_THRESHOLD)
+ chunk_size = size + HASH_CHUNK_HEADER_SIZE;
+ else
+ chunk_size = HASH_CHUNK_SIZE;
+
+ /* Check if it's time to grow batches or buckets. */
+ if (pstate->growth != PHJ_GROWTH_DISABLED)
+ {
+ Assert(curbatch == 0);
+ Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
+
+ /*
+ * Check if our space limit would be exceeded. To avoid choking on
+ * very large tuples or very low work_mem setting, we'll always allow
+ * each backend to allocate at least one chunk.
+ */
+ if (hashtable->batches[0].at_least_one_chunk &&
+ hashtable->batches[0].shared->size +
+ chunk_size > pstate->space_allowed)
+ {
+ pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
+ hashtable->batches[0].shared->space_exhausted = true;
+ LWLockRelease(&pstate->lock);
+
+ return NULL;
+ }
+
+ /* Check if our load factor limit would be exceeded. */
+ if (hashtable->nbatch == 1)
+ {
+ hashtable->batches[0].shared->ntuples += hashtable->batches[0].ntuples;
+ hashtable->batches[0].ntuples = 0;
+ if (hashtable->batches[0].shared->ntuples + 1 >
+ hashtable->nbuckets * NTUP_PER_BUCKET &&
+ hashtable->nbuckets < (INT_MAX / 2))
+ {
+ pstate->growth = PHJ_GROWTH_NEED_MORE_BUCKETS;
+ LWLockRelease(&pstate->lock);
+
+ return NULL;
+ }
+ }
+ }
+
+ /* We are cleared to allocate a new chunk. */
+ chunk_shared = dsa_allocate(hashtable->area, chunk_size);
+ hashtable->batches[curbatch].shared->size += chunk_size;
+ hashtable->batches[curbatch].at_least_one_chunk = true;
+
+ /* Set up the chunk. */
+ chunk = (HashMemoryChunk) dsa_get_address(hashtable->area, chunk_shared);
+ *shared = chunk_shared + HASH_CHUNK_HEADER_SIZE;
+ chunk->maxlen = chunk_size - HASH_CHUNK_HEADER_SIZE;
+ chunk->used = size;
+
+ /*
+ * Push it onto the list of chunks, so that it can be found if we need to
+ * increase the number of buckets or batches (batch 0 only) and later for
+ * freeing the memory (all batches).
+ */
+ chunk->next.shared = hashtable->batches[curbatch].shared->chunks;
+ hashtable->batches[curbatch].shared->chunks = chunk_shared;
+
+ if (size <= HASH_CHUNK_THRESHOLD)
+ {
+ /*
+ * Make this the current chunk so that we can use the fast path to
+ * fill the rest of it up in future calls.
+ */
+ hashtable->current_chunk = chunk;
+ hashtable->current_chunk_shared = chunk_shared;
+ }
+ LWLockRelease(&pstate->lock);
+
+ Assert(chunk->data == dsa_get_address(hashtable->area, *shared));
+ result = (HashJoinTuple) chunk->data;
+
+ return result;
+}
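To make the two growth triggers above concrete (illustrative numbers, assuming NTUP_PER_BUCKET is 1): with a single batch and nbuckets = 1024, the allocation that would take the shared tuple count past 1024 sets growth to PHJ_GROWTH_NEED_MORE_BUCKETS and returns NULL, so the next allocation attempt helps run ExecParallelHashIncreaseNumBuckets() instead; and once a backend already owns at least one chunk, any chunk allocation that would push the batch's shared size past space_allowed marks the batch space_exhausted and requests PHJ_GROWTH_NEED_MORE_BATCHES.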
+
+/*
+ * One backend needs to set up the shared batch state including tuplestores.
+ * Other backends will ensure they have correctly configured accessors by
+ * calling ExecParallelHashEnsureBatchAccessors().
+ */
+static void
+ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ ParallelHashJoinBatch *batches;
+ MemoryContext oldcxt;
+ int i;
+
+ Assert(hashtable->batches == NULL);
+
+ /* Allocate space. */
+ pstate->batches =
+ dsa_allocate0(hashtable->area,
+ EstimateParallelHashJoinBatch(hashtable) * nbatch);
+ pstate->nbatch = nbatch;
+ batches = dsa_get_address(hashtable->area, pstate->batches);
+
+ /* Use hash join memory context. */
+ oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+
+ /* Allocate this backend's accessor array. */
+ hashtable->nbatch = nbatch;
+ hashtable->batches = (ParallelHashJoinBatchAccessor *)
+ palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch);
+
+ /* Set up the shared state, tuplestores and backend-local accessors. */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
+ ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
+ char name[MAXPGPATH];
+
+ /*
+ * All members of shared were zero-initialized. We just need to set
+ * up the Barrier.
+ */
+ BarrierInit(&shared->batch_barrier, 0);
+ if (i == 0)
+ {
+ /* Batch 0 doesn't need to be loaded. */
+ BarrierAttach(&shared->batch_barrier);
+ while (BarrierPhase(&shared->batch_barrier) < PHJ_BATCH_PROBING)
+ BarrierArriveAndWait(&shared->batch_barrier, 0);
+ BarrierDetach(&shared->batch_barrier);
+ }
+
+ /* Initialize accessor state. All members were zero-initialized. */
+ accessor->shared = shared;
+
+ /* Initialize the shared tuplestores. */
+ snprintf(name, sizeof(name), "i%dof%d", i, hashtable->nbatch);
+ accessor->inner_tuples =
+ sts_initialize(ParallelHashJoinBatchInner(shared),
+ pstate->nparticipants,
+ ParallelWorkerNumber + 1,
+ sizeof(uint32),
+ SHARED_TUPLESTORE_SINGLE_PASS,
+ &pstate->fileset,
+ name);
+ snprintf(name, sizeof(name), "o%dof%d", i, hashtable->nbatch);
+ accessor->outer_tuples =
+ sts_initialize(ParallelHashJoinBatchOuter(shared,
+ pstate->nparticipants),
+ pstate->nparticipants,
+ ParallelWorkerNumber + 1,
+ sizeof(uint32),
+ SHARED_TUPLESTORE_SINGLE_PASS,
+ &pstate->fileset,
+ name);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Free the current set of ParallelHashJoinBatchAccessor objects.
+ */
+static void
+ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable)
+{
+ int i;
+
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ /* Make sure no files are left open. */
+ sts_end_write(hashtable->batches[i].inner_tuples);
+ sts_end_write(hashtable->batches[i].outer_tuples);
+ sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
+ }
+ pfree(hashtable->batches);
+ hashtable->batches = NULL;
+}
+
+/*
+ * Make sure this backend has up-to-date accessors for the current set of
+ * batches.
+ */
+static void
+ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ ParallelHashJoinBatch *batches;
+ MemoryContext oldcxt;
+ int i;
+
+ if (hashtable->batches != NULL)
+ {
+ if (hashtable->nbatch == pstate->nbatch)
+ return;
+ ExecParallelHashCloseBatchAccessors(hashtable);
+ }
+
+ /*
+ * It's possible for a backend to start up very late so that the whole
+ * join is finished and the shm state for tracking batches has already
+ * been freed by ExecHashTableDetach(). In that case we'll just leave
+ * hashtable->batches as NULL so that ExecParallelHashJoinNewBatch() gives
+ * up early.
+ */
+ if (!DsaPointerIsValid(pstate->batches))
+ return;
+
+ /* Use hash join memory context. */
+ oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+
+ /* Allocate this backend's accessor array. */
+ hashtable->nbatch = pstate->nbatch;
+ hashtable->batches = (ParallelHashJoinBatchAccessor *)
+ palloc0(sizeof(ParallelHashJoinBatchAccessor) * hashtable->nbatch);
+
+ /* Find the base of the pseudo-array of ParallelHashJoinBatch objects. */
+ batches = (ParallelHashJoinBatch *)
+ dsa_get_address(hashtable->area, pstate->batches);
+
+ /* Set up the accessor array and attach to the tuplestores. */
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ ParallelHashJoinBatchAccessor *accessor = &hashtable->batches[i];
+ ParallelHashJoinBatch *shared = NthParallelHashJoinBatch(batches, i);
+
+ accessor->shared = shared;
+ accessor->preallocated = 0;
+ accessor->done = false;
+ accessor->inner_tuples =
+ sts_attach(ParallelHashJoinBatchInner(shared),
+ ParallelWorkerNumber + 1,
+ &pstate->fileset);
+ accessor->outer_tuples =
+ sts_attach(ParallelHashJoinBatchOuter(shared,
+ pstate->nparticipants),
+ ParallelWorkerNumber + 1,
+ &pstate->fileset);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Allocate an empty shared memory hash table for a given batch.
+ */
+void
+ExecParallelHashTableAlloc(HashJoinTable hashtable, int batchno)
+{
+ ParallelHashJoinBatch *batch = hashtable->batches[batchno].shared;
+ dsa_pointer_atomic *buckets;
+ int nbuckets = hashtable->parallel_state->nbuckets;
+ int i;
+
+ batch->buckets =
+ dsa_allocate(hashtable->area, sizeof(dsa_pointer_atomic) * nbuckets);
+ buckets = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area, batch->buckets);
+ for (i = 0; i < nbuckets; ++i)
+ dsa_pointer_atomic_init(&buckets[i], InvalidDsaPointer);
+}
+
+/*
+ * If we are currently attached to a shared hash join batch, detach. If we
+ * are last to detach, clean up.
+ */
+void
+ExecHashTableDetachBatch(HashJoinTable hashtable)
+{
+ if (hashtable->parallel_state != NULL &&
+ hashtable->curbatch >= 0)
+ {
+ int curbatch = hashtable->curbatch;
+ ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared;
+
+ /* Make sure any temporary files are closed. */
+ sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples);
+
+ /* Detach from the batch we were last working on. */
+ if (BarrierArriveAndDetach(&batch->batch_barrier))
+ {
+ /*
+ * Technically we shouldn't access the barrier because we're no
+ * longer attached, but since there is no way it's moving after
+ * this point it seems safe to make the following assertion.
+ */
+ Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE);
+
+ /* Free shared chunks and buckets. */
+ while (DsaPointerIsValid(batch->chunks))
+ {
+ HashMemoryChunk chunk =
+ dsa_get_address(hashtable->area, batch->chunks);
+ dsa_pointer next = chunk->next.shared;
+
+ dsa_free(hashtable->area, batch->chunks);
+ batch->chunks = next;
+ }
+ if (DsaPointerIsValid(batch->buckets))
+ {
+ dsa_free(hashtable->area, batch->buckets);
+ batch->buckets = InvalidDsaPointer;
+ }
+ }
+ ExecParallelHashUpdateSpacePeak(hashtable, curbatch);
+ /* Remember that we are not attached to a batch. */
+ hashtable->curbatch = -1;
+ }
+}
+
+/*
+ * Detach from all shared resources. If we are last to detach, clean up.
+ */
+void
+ExecHashTableDetach(HashJoinTable hashtable)
+{
+ if (hashtable->parallel_state)
+ {
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ int i;
+
+ /* Make sure any temporary files are closed. */
+ if (hashtable->batches)
+ {
+ for (i = 0; i < hashtable->nbatch; ++i)
+ {
+ sts_end_write(hashtable->batches[i].inner_tuples);
+ sts_end_write(hashtable->batches[i].outer_tuples);
+ sts_end_parallel_scan(hashtable->batches[i].inner_tuples);
+ sts_end_parallel_scan(hashtable->batches[i].outer_tuples);
+ }
+ }
+
+ /* If we're last to detach, clean up shared memory. */
+ if (BarrierDetach(&pstate->build_barrier))
+ {
+ if (DsaPointerIsValid(pstate->batches))
+ {
+ dsa_free(hashtable->area, pstate->batches);
+ pstate->batches = InvalidDsaPointer;
+ }
+ }
+
+ hashtable->parallel_state = NULL;
+ }
+}
+
+/*
+ * Get the first tuple in a given bucket identified by number.
+ */
+static inline HashJoinTuple
+ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno)
+{
+ HashJoinTuple tuple;
+ dsa_pointer p;
+
+ Assert(hashtable->parallel_state);
+ p = dsa_pointer_atomic_read(&hashtable->buckets.shared[bucketno]);
+ tuple = (HashJoinTuple) dsa_get_address(hashtable->area, p);
+
+ return tuple;
+}
+
+/*
+ * Get the next tuple in the same bucket as 'tuple'.
+ */
+static inline HashJoinTuple
+ExecParallelHashNextTuple(HashJoinTable hashtable, HashJoinTuple tuple)
+{
+ HashJoinTuple next;
+
+ Assert(hashtable->parallel_state);
+ next = (HashJoinTuple) dsa_get_address(hashtable->area, tuple->next.shared);
+
+ return next;
+}
+
+/*
+ * Insert a tuple at the front of a chain of tuples in DSA memory atomically.
+ */
+static inline void
+ExecParallelHashPushTuple(dsa_pointer_atomic *head,
+ HashJoinTuple tuple,
+ dsa_pointer tuple_shared)
+{
+ for (;;)
+ {
+ tuple->next.shared = dsa_pointer_atomic_read(head);
+ if (dsa_pointer_atomic_compare_exchange(head,
+ &tuple->next.shared,
+ tuple_shared))
+ break;
+ }
+}
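ExecParallelHashPushTuple() is the usual lock-free push onto the head of a linked list: read the current head into the new tuple's next pointer, publish the tuple with a compare-and-exchange, and retry with a freshly read head if another participant pushed first.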
+
+/*
+ * Prepare to work on a given batch.
+ */
+void
+ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno)
+{
+ Assert(hashtable->batches[batchno].shared->buckets != InvalidDsaPointer);
+
+ hashtable->curbatch = batchno;
+ hashtable->buckets.shared = (dsa_pointer_atomic *)
+ dsa_get_address(hashtable->area,
+ hashtable->batches[batchno].shared->buckets);
+ hashtable->nbuckets = hashtable->parallel_state->nbuckets;
+ hashtable->log2_nbuckets = my_log2(hashtable->nbuckets);
+ hashtable->current_chunk = NULL;
+ hashtable->current_chunk_shared = InvalidDsaPointer;
+ hashtable->batches[batchno].at_least_one_chunk = false;
+}
+
+/*
+ * Take the next available chunk from the queue of chunks being worked on in
+ * parallel. Return NULL if there are none left. Otherwise return a pointer
+ * to the chunk, and set *shared to the DSA pointer to the chunk.
+ */
+static HashMemoryChunk
+ExecParallelHashPopChunkQueue(HashJoinTable hashtable, dsa_pointer *shared)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ HashMemoryChunk chunk;
+
+ LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+ if (DsaPointerIsValid(pstate->chunk_work_queue))
+ {
+ *shared = pstate->chunk_work_queue;
+ chunk = (HashMemoryChunk)
+ dsa_get_address(hashtable->area, *shared);
+ pstate->chunk_work_queue = chunk->next.shared;
+ }
+ else
+ chunk = NULL;
+ LWLockRelease(&pstate->lock);
+
+ return chunk;
+}
+
+/*
+ * Increase the space preallocated in this backend for a given inner batch by
+ * at least a given amount. This allows us to track whether a given batch
+ * would fit in memory when loaded back in. Also increase the number of
+ * batches or buckets if required.
+ *
+ * This maintains a running estimation of how much space will be taken when we
+ * load the batch back into memory by simulating the way chunks will be handed
+ * out to workers. It's not perfectly accurate because the tuples will be
+ * packed into memory chunks differently by ExecParallelHashTupleAlloc(), but
+ * it should be pretty close. It tends to overestimate by a fraction of a
+ * chunk per worker since all workers gang up to preallocate during hashing,
+ * but workers tend to reload batches alone if there are enough to go around,
+ * leaving fewer partially filled chunks. This effect is bounded by
+ * nparticipants.
+ *
+ * Return false if the number of batches or buckets has changed, and the
+ * caller should reconsider which batch a given tuple now belongs in and call
+ * again.
+ */
+static bool
+ExecParallelHashTuplePrealloc(HashJoinTable hashtable, int batchno, size_t size)
+{
+ ParallelHashJoinState *pstate = hashtable->parallel_state;
+ ParallelHashJoinBatchAccessor *batch = &hashtable->batches[batchno];
+ size_t want = Max(size, HASH_CHUNK_SIZE - HASH_CHUNK_HEADER_SIZE);
+
+ Assert(batchno > 0);
+ Assert(batchno < hashtable->nbatch);
+
+ LWLockAcquire(&pstate->lock, LW_EXCLUSIVE);
+
+ /* Has another participant commanded us to help grow? */
+ if (pstate->growth == PHJ_GROWTH_NEED_MORE_BATCHES ||
+ pstate->growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
+ {
+ ParallelHashGrowth growth = pstate->growth;
+
+ LWLockRelease(&pstate->lock);
+ if (growth == PHJ_GROWTH_NEED_MORE_BATCHES)
+ ExecParallelHashIncreaseNumBatches(hashtable);
+ else if (growth == PHJ_GROWTH_NEED_MORE_BUCKETS)
+ ExecParallelHashIncreaseNumBuckets(hashtable);
+
+ return false;
+ }
+
+ if (pstate->growth != PHJ_GROWTH_DISABLED &&
+ batch->at_least_one_chunk &&
+ (batch->shared->estimated_size + size > pstate->space_allowed))
+ {
+ /*
+ * We have determined that this batch would exceed the space budget if
+ * loaded into memory. Command all participants to help repartition.
+ */
+ batch->shared->space_exhausted = true;
+ pstate->growth = PHJ_GROWTH_NEED_MORE_BATCHES;
+ LWLockRelease(&pstate->lock);
+
+ return false;
+ }
+
+ batch->at_least_one_chunk = true;
+ batch->shared->estimated_size += want + HASH_CHUNK_HEADER_SIZE;
+ batch->preallocated = want;
+ LWLockRelease(&pstate->lock);
+
+ return true;
+}
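As an illustration (assuming HASH_CHUNK_SIZE is 32kB), the first spilled tuple headed for a batch reserves want = one chunk's worth of payload as local preallocation and charges a full 32kB chunk against the batch's estimated_size; with tuples of roughly 64 bytes after MAXALIGN, ExecParallelHashTableInsert() can then write on the order of five hundred further tuples to that batch, drawing down the preallocation locally, before it has to take pstate->lock here again.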
+
+/*
+ * Update this backend's copy of hashtable->spacePeak to account for a given
+ * batch. This is called at the end of hashing for batch 0, and then for each
+ * batch when it is done or discovered to be already done. The result is used
+ * for EXPLAIN output.
+ */
+void
+ExecParallelHashUpdateSpacePeak(HashJoinTable hashtable, int batchno)
+{
+ size_t size;
+
+ size = hashtable->batches[batchno].shared->size;
+ size += sizeof(dsa_pointer_atomic) * hashtable->nbuckets;
+ hashtable->spacePeak = Max(hashtable->spacePeak, size);
+}