summaryrefslogtreecommitdiff
path: root/src/backend/utils
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/utils')
-rw-r--r--src/backend/utils/sort/tuplesort.c556
1 files changed, 516 insertions, 40 deletions
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 67d86ed83b4..c3f666e2f11 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -138,7 +138,8 @@ bool optimize_bounded_sort = true;
* The objects we actually sort are SortTuple structs. These contain
* a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
* which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree(). SortTuples also contain the tuple's
+ * can be freed by a simple pfree() (except during final on-the-fly merge,
+ * when memory is used in batch). SortTuples also contain the tuple's
* first key column in Datum/nullflag format, and an index integer.
*
* Storing the first key column lets us save heap_getattr or index_getattr
@@ -220,11 +221,13 @@ struct Tuplesortstate
* tuples to return? */
bool boundUsed; /* true if we made use of a bounded heap */
int bound; /* if bounded, the maximum number of tuples */
+ bool tuples; /* Can SortTuple.tuple ever be set? */
int64 availMem; /* remaining memory available, in bytes */
int64 allowedMem; /* total memory allowed, in bytes */
int maxTapes; /* number of tapes (Knuth's T) */
int tapeRange; /* maxTapes-1 (Knuth's P) */
- MemoryContext sortcontext; /* memory context holding all sort data */
+ MemoryContext sortcontext; /* memory context holding most sort data */
+ MemoryContext tuplecontext; /* memory context holding tuple data */
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
/*
@@ -281,6 +284,15 @@ struct Tuplesortstate
bool growmemtuples; /* memtuples' growth still underway? */
/*
+ * Memory for tuples is sometimes allocated in batch, rather than
+ * incrementally. This implies that incremental memory accounting has been
+ * abandoned. Currently, this only happens for the final on-the-fly merge
+ * step. Large batch allocations can store tuples (e.g. IndexTuples)
+ * without palloc() fragmentation and other overhead.
+ */
+ bool batchUsed;
+
+ /*
* While building initial runs, this is the current output run number
* (starting at 0). Afterwards, it is the number of initial runs we made.
*/
@@ -315,6 +327,21 @@ struct Tuplesortstate
int mergefirstfree; /* first slot never used in this merge */
/*
+ * Per-tape batch state, when final on-the-fly merge consumes memory from
+ * just a few large allocations.
+ *
+ * Aside from the general benefits of performing fewer individual retail
+ * palloc() calls, this also helps make merging more cache efficient, since
+ * each tape's tuples must naturally be accessed sequentially (in sorted
+ * order).
+ */
+ int64 spacePerTape; /* Space (memory) for tuples (not slots) */
+ char **mergetuples; /* Each tape's memory allocation */
+ char **mergecurrent; /* Current offset into each tape's memory */
+ char **mergetail; /* Last item's start point for each tape */
+ char **mergeoverflow; /* Retail palloc() "overflow" for each tape */
+
+ /*
* Variables for Algorithm D. Note that destTape is a "logical" tape
* number, ie, an index into the tp_xxx[] arrays. Be careful to keep
* "logical" and "actual" tape numbers straight!
@@ -389,9 +416,8 @@ struct Tuplesortstate
* tuplesort_begin_datum and used only by the DatumTuple routines.
*/
Oid datumType;
- /* we need typelen and byval in order to know how to copy the Datums. */
+ /* we need typelen in order to know how to copy the Datums. */
int datumTypeLen;
- bool datumTypeByVal;
/*
* Resource snapshot for time of sort start.
@@ -405,7 +431,7 @@ struct Tuplesortstate
#define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
#define WRITETUP(state,tape,stup) ((*(state)->writetup) (state, tape, stup))
#define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define LACKMEM(state) ((state)->availMem < 0)
+#define LACKMEM(state) ((state)->availMem < 0 && !(state)->batchUsed)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
@@ -447,7 +473,13 @@ struct Tuplesortstate
* rather than the originally-requested size. This is important since
* palloc can add substantial overhead. It's not a complete answer since
* we won't count any wasted space in palloc allocation blocks, but it's
- * a lot better than what we were doing before 7.3.
+ * a lot better than what we were doing before 7.3. As of 9.6, a
+ * separate memory context is used for caller passed tuples. Resetting
+ * it at certain key increments significantly ameliorates fragmentation.
+ * Note that this places a responsibility on readtup and copytup routines
+ * to use the right memory context for these tuples (and to not use the
+ * reset context for anything whose lifetime needs to span multiple
+ * external sort runs).
*/
/* When using this macro, beware of double evaluation of len */
@@ -465,7 +497,14 @@ static void inittapes(Tuplesortstate *state);
static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state);
+static void beginmerge(Tuplesortstate *state, bool finalMerge);
+static void batchmemtuples(Tuplesortstate *state);
+static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
+static void mergebatchone(Tuplesortstate *state, int srcTape,
+ SortTuple *stup, bool *should_free);
+static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
+ SortTuple *rtup, bool *should_free);
+static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
static void mergepreread(Tuplesortstate *state);
static void mergeprereadone(Tuplesortstate *state, int srcTape);
static void dumptuples(Tuplesortstate *state, bool alltuples);
@@ -477,6 +516,7 @@ static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
+static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
static int comparetup_heap(const SortTuple *a, const SortTuple *b,
Tuplesortstate *state);
static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -543,6 +583,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
{
Tuplesortstate *state;
MemoryContext sortcontext;
+ MemoryContext tuplecontext;
MemoryContext oldcontext;
/*
@@ -550,12 +591,27 @@ tuplesort_begin_common(int workMem, bool randomAccess)
* needed by the sort will live inside this context.
*/
sortcontext = AllocSetContextCreate(CurrentMemoryContext,
- "TupleSort",
+ "TupleSort main",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/*
+ * Caller tuple (e.g. IndexTuple) memory context.
+ *
+ * A dedicated child content used exclusively for caller passed tuples
+ * eases memory management. Resetting at key points reduces fragmentation.
+ * Note that the memtuples array of SortTuples is allocated in the parent
+ * context, not this context, because there is no need to free memtuples
+ * early.
+ */
+ tuplecontext = AllocSetContextCreate(sortcontext,
+ "Caller tuples",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /*
* Make the Tuplesortstate within the per-sort context. This way, we
* don't need a separate pfree() operation for it at shutdown.
*/
@@ -571,10 +627,12 @@ tuplesort_begin_common(int workMem, bool randomAccess)
state->status = TSS_INITIAL;
state->randomAccess = randomAccess;
state->bounded = false;
+ state->tuples = true;
state->boundUsed = false;
state->allowedMem = workMem * (int64) 1024;
state->availMem = state->allowedMem;
state->sortcontext = sortcontext;
+ state->tuplecontext = tuplecontext;
state->tapeset = NULL;
state->memtupcount = 0;
@@ -587,6 +645,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
state->growmemtuples = true;
+ state->batchUsed = false;
state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
USEMEM(state, GetMemoryChunkSpace(state->memtuples));
@@ -922,7 +981,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
/* lookup necessary attributes of the datum type */
get_typlenbyval(datumType, &typlen, &typbyval);
state->datumTypeLen = typlen;
- state->datumTypeByVal = typbyval;
+ state->tuples = !typbyval;
/* Prepare SortSupport data */
state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
@@ -1258,7 +1317,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
ItemPointer self, Datum *values,
bool *isnull)
{
- MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
SortTuple stup;
Datum original;
IndexTuple tuple;
@@ -1273,6 +1332,8 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
RelationGetDescr(state->indexRel),
&stup.isnull1);
+ MemoryContextSwitchTo(state->sortcontext);
+
if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
{
/*
@@ -1333,7 +1394,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
void
tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
{
- MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
SortTuple stup;
/*
@@ -1348,7 +1409,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
* identical to stup.tuple.
*/
- if (isNull || state->datumTypeByVal)
+ if (isNull || !state->tuples)
{
/*
* Set datum1 to zeroed representation for NULLs (to be consistent, and
@@ -1357,6 +1418,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
stup.datum1 = !isNull ? val : (Datum) 0;
stup.isnull1 = isNull;
stup.tuple = NULL; /* no separate storage */
+ MemoryContextSwitchTo(state->sortcontext);
}
else
{
@@ -1365,6 +1427,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
stup.isnull1 = false;
stup.tuple = DatumGetPointer(original);
USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+ MemoryContextSwitchTo(state->sortcontext);
if (!state->sortKeys->abbrev_converter)
{
@@ -1670,6 +1733,7 @@ tuplesort_performsort(Tuplesortstate *state)
* Internal routine to fetch the next tuple in either forward or back
* direction into *stup. Returns FALSE if no more tuples.
* If *should_free is set, the caller must pfree stup.tuple when done with it.
+ * Otherwise, caller should not use tuple following next call here.
*/
static bool
tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
@@ -1681,6 +1745,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
{
case TSS_SORTEDINMEM:
Assert(forward || state->randomAccess);
+ Assert(!state->batchUsed);
*should_free = false;
if (forward)
{
@@ -1725,6 +1790,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
case TSS_SORTEDONTAPE:
Assert(forward || state->randomAccess);
+ Assert(!state->batchUsed);
*should_free = true;
if (forward)
{
@@ -1810,7 +1876,9 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
case TSS_FINALMERGE:
Assert(forward);
- *should_free = true;
+ Assert(state->batchUsed || !state->tuples);
+ /* For now, assume tuple is stored in tape's batch memory */
+ *should_free = false;
/*
* This code should match the inner loop of mergeonerun().
@@ -1818,18 +1886,17 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
if (state->memtupcount > 0)
{
int srcTape = state->memtuples[0].tupindex;
- Size tuplen;
int tupIndex;
SortTuple *newtup;
+ /*
+ * Returned tuple is still counted in our memory space most
+ * of the time. See mergebatchone() for discussion of why
+ * caller may occasionally be required to free returned
+ * tuple, and how preread memory is managed with regard to
+ * edge cases more generally.
+ */
*stup = state->memtuples[0];
- /* returned tuple is no longer counted in our memory space */
- if (stup->tuple)
- {
- tuplen = GetMemoryChunkSpace(stup->tuple);
- state->availMem += tuplen;
- state->mergeavailmem[srcTape] += tuplen;
- }
tuplesort_heap_siftup(state, false);
if ((tupIndex = state->mergenext[srcTape]) == 0)
{
@@ -1837,15 +1904,25 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* out of preloaded data on this tape, try to read more
*
* Unlike mergeonerun(), we only preload from the single
- * tape that's run dry. See mergepreread() comments.
+ * tape that's run dry, though not before preparing its
+ * batch memory for a new round of sequential consumption.
+ * See mergepreread() comments.
*/
+ if (state->batchUsed)
+ mergebatchone(state, srcTape, stup, should_free);
+
mergeprereadone(state, srcTape);
/*
* if still no data, we've reached end of run on this tape
*/
if ((tupIndex = state->mergenext[srcTape]) == 0)
+ {
+ /* Free tape's buffer, avoiding dangling pointer */
+ if (state->batchUsed)
+ mergebatchfreetape(state, srcTape, stup, should_free);
return true;
+ }
}
/* pull next preread tuple from list, insert in heap */
newtup = &state->memtuples[tupIndex];
@@ -1912,6 +1989,8 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
* Fetch the next tuple in either forward or back direction.
* Returns NULL if no more tuples. If *should_free is set, the
* caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
*/
HeapTuple
tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
@@ -1931,6 +2010,8 @@ tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
* Fetch the next index tuple in either forward or back direction.
* Returns NULL if no more tuples. If *should_free is set, the
* caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
*/
IndexTuple
tuplesort_getindextuple(Tuplesortstate *state, bool forward,
@@ -1979,7 +2060,7 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
if (state->sortKeys->abbrev_converter && abbrev)
*abbrev = stup.datum1;
- if (stup.isnull1 || state->datumTypeByVal)
+ if (stup.isnull1 || !state->tuples)
{
*val = stup.datum1;
*isNull = stup.isnull1;
@@ -2162,6 +2243,10 @@ inittapes(Tuplesortstate *state)
state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
+ state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
+ state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
+ state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
+ state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
@@ -2320,7 +2405,7 @@ mergeruns(Tuplesortstate *state)
/* Tell logtape.c we won't be writing anymore */
LogicalTapeSetForgetFreeSpace(state->tapeset);
/* Initialize for the final merge pass */
- beginmerge(state);
+ beginmerge(state, state->tuples);
state->status = TSS_FINALMERGE;
return;
}
@@ -2412,7 +2497,7 @@ mergeonerun(Tuplesortstate *state)
* Start the merge by loading one tuple from each active source tape into
* the heap. We can also decrease the input run/dummy run counts.
*/
- beginmerge(state);
+ beginmerge(state, false);
/*
* Execute merge by repeatedly extracting lowest tuple in heap, writing it
@@ -2451,6 +2536,12 @@ mergeonerun(Tuplesortstate *state)
}
/*
+ * Reset tuple memory, now that no caller tuples are needed in memory.
+ * This prevents fragmentation.
+ */
+ MemoryContextReset(state->tuplecontext);
+
+ /*
* When the heap empties, we're done. Write an end-of-run marker on the
* output tape, and increment its count of real runs.
*/
@@ -2471,9 +2562,12 @@ mergeonerun(Tuplesortstate *state)
* which tapes contain active input runs in mergeactive[]. Then, load
* as many tuples as we can from each active input tape, and finally
* fill the merge heap with the first tuple from each active tape.
+ *
+ * finalMergeBatch indicates if this is the beginning of a final on-the-fly
+ * merge where a batched allocation of tuple memory is required.
*/
static void
-beginmerge(Tuplesortstate *state)
+beginmerge(Tuplesortstate *state, bool finalMergeBatch)
{
int activeTapes;
int tapenum;
@@ -2511,6 +2605,18 @@ beginmerge(Tuplesortstate *state)
state->mergefreelist = 0; /* nothing in the freelist */
state->mergefirstfree = activeTapes; /* 1st slot avail for preread */
+ if (finalMergeBatch)
+ {
+ /* Free outright buffers for tape never actually allocated */
+ FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
+
+ /*
+ * Grow memtuples one last time, since the palloc() overhead no longer
+ * incurred can make a big difference
+ */
+ batchmemtuples(state);
+ }
+
/*
* Initialize space allocation to let each active input tape have an equal
* share of preread space.
@@ -2518,7 +2624,7 @@ beginmerge(Tuplesortstate *state)
Assert(activeTapes > 0);
slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
Assert(slotsPerTape > 0);
- spacePerTape = state->availMem / activeTapes;
+ spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
{
if (state->mergeactive[srcTape])
@@ -2529,6 +2635,15 @@ beginmerge(Tuplesortstate *state)
}
/*
+ * Preallocate tuple batch memory for each tape. This is the memory used
+ * for tuples themselves (not SortTuples), so it's never used by
+ * pass-by-value datum sorts. Memory allocation is performed here at most
+ * once per sort, just in advance of the final on-the-fly merge step.
+ */
+ if (finalMergeBatch)
+ mergebatch(state, spacePerTape);
+
+ /*
* Preread as many tuples as possible (and at least one) from each active
* tape
*/
@@ -2551,8 +2666,316 @@ beginmerge(Tuplesortstate *state)
tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex;
state->mergeavailslots[srcTape]++;
+
+#ifdef TRACE_SORT
+ if (trace_sort && finalMergeBatch)
+ {
+ int64 perTapeKB = (spacePerTape + 1023) / 1024;
+ int64 usedSpaceKB;
+ int usedSlots;
+
+ /*
+ * Report how effective batchmemtuples() was in balancing
+ * the number of slots against the need for memory for the
+ * underlying tuples (e.g. IndexTuples). The big preread of
+ * all tapes when switching to FINALMERGE state should be
+ * fairly representative of memory utilization during the
+ * final merge step, and in any case is the only point at
+ * which all tapes are guaranteed to have depleted either
+ * their batch memory allowance or slot allowance. Ideally,
+ * both will be completely depleted for every tape by now.
+ */
+ usedSpaceKB = (state->mergecurrent[srcTape] -
+ state->mergetuples[srcTape] + 1023) / 1024;
+ usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
+
+ elog(LOG, "tape %d initially used %ld KB of %ld KB batch "
+ "(%2.3f) and %d out of %d slots (%2.3f)", srcTape,
+ usedSpaceKB, perTapeKB,
+ (double) usedSpaceKB / (double) perTapeKB,
+ usedSlots, slotsPerTape,
+ (double) usedSlots / (double) slotsPerTape);
+ }
+#endif
+ }
+ }
+}
+
+/*
+ * batchmemtuples - grow memtuples without palloc overhead
+ *
+ * When called, availMem should be approximately the amount of memory we'd
+ * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
+ * that were allocated with palloc() overhead, and in doing so use up all
+ * allocated slots. However, though slots and tuple memory is in balance
+ * following the last grow_memtuples() call, that's predicated on the observed
+ * average tuple size for the "final" grow_memtuples() call, which includes
+ * palloc overhead.
+ *
+ * This will perform an actual final grow_memtuples() call without any palloc()
+ * overhead, rebalancing the use of memory between slots and tuples.
+ */
+static void
+batchmemtuples(Tuplesortstate *state)
+{
+ int64 refund;
+ int64 availMemLessRefund;
+ int memtupsize = state->memtupsize;
+
+ /* For simplicity, assume no memtuples are actually currently counted */
+ Assert(state->memtupcount == 0);
+
+ /*
+ * Refund STANDARDCHUNKHEADERSIZE per tuple.
+ *
+ * This sometimes fails to make memory use prefectly balanced, but it
+ * should never make the situation worse. Note that Assert-enabled builds
+ * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
+ */
+ refund = memtupsize * STANDARDCHUNKHEADERSIZE;
+ availMemLessRefund = state->availMem - refund;
+
+ /*
+ * To establish balanced memory use after refunding palloc overhead,
+ * temporarily have our accounting indicate that we've allocated all
+ * memory we're allowed to less that refund, and call grow_memtuples()
+ * to have it increase the number of slots.
+ */
+ state->growmemtuples = true;
+ USEMEM(state, availMemLessRefund);
+ (void) grow_memtuples(state);
+ /* Should not matter, but be tidy */
+ FREEMEM(state, availMemLessRefund);
+ state->growmemtuples = false;
+
+#ifdef TRACE_SORT
+ if (trace_sort)
+ {
+ Size OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
+ Size NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
+
+ elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
+ (double) NewKb / (double) OldKb,
+ memtupsize, OldKb,
+ state->memtupsize, NewKb);
+ }
+#endif
+}
+
+/*
+ * mergebatch - initialize tuple memory in batch
+ *
+ * This allows sequential access to sorted tuples buffered in memory from
+ * tapes/runs on disk during a final on-the-fly merge step. Note that the
+ * memory is not used for SortTuples, but for the underlying tuples (e.g.
+ * MinimalTuples).
+ *
+ * Note that when batch memory is used, there is a simple division of space
+ * into large buffers (one per active tape). The conventional incremental
+ * memory accounting (calling USEMEM() and FREEMEM()) is abandoned. Instead,
+ * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
+ * performed, which is then immediately detected in a way that is analogous to
+ * LACKMEM(). This keeps each tape's use of memory fair, which is always a
+ * goal.
+ */
+static void
+mergebatch(Tuplesortstate *state, int64 spacePerTape)
+{
+ int srcTape;
+
+ Assert(state->activeTapes > 0);
+ Assert(state->tuples);
+
+ /*
+ * For the purposes of tuplesort's memory accounting, the batch allocation
+ * is special, and regular memory accounting through USEMEM() calls is
+ * abandoned (see mergeprereadone()).
+ */
+ for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+ {
+ char *mergetuples;
+
+ if (!state->mergeactive[srcTape])
+ continue;
+
+ /* Allocate buffer for each active tape */
+ mergetuples = MemoryContextAllocHuge(state->tuplecontext,
+ spacePerTape);
+
+ /* Initialize state for tape */
+ state->mergetuples[srcTape] = mergetuples;
+ state->mergecurrent[srcTape] = mergetuples;
+ state->mergetail[srcTape] = mergetuples;
+ state->mergeoverflow[srcTape] = NULL;
+ }
+
+ state->batchUsed = true;
+ state->spacePerTape = spacePerTape;
+}
+
+/*
+ * mergebatchone - prepare batch memory for one merge input tape
+ *
+ * This is called following the exhaustion of preread tuples for one input
+ * tape. All that actually occurs is that the state for the source tape is
+ * reset to indicate that all memory may be reused.
+ *
+ * This routine must deal with fixing up the tuple that is about to be returned
+ * to the client, due to "overflow" allocations.
+ */
+static void
+mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+ bool *should_free)
+{
+ Assert(state->batchUsed);
+
+ /*
+ * Tuple about to be returned to caller ("stup") is final preread tuple
+ * from tape, just removed from the top of the heap. Special steps around
+ * memory management must be performed for that tuple, to make sure it
+ * isn't overwritten early.
+ */
+ if (!state->mergeoverflow[srcTape])
+ {
+ Size tupLen;
+
+ /*
+ * Mark tuple buffer range for reuse, but be careful to move final,
+ * tail tuple to start of space for next run so that it's available
+ * to caller when stup is returned, and remains available at least
+ * until the next tuple is requested.
+ */
+ tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+ state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+ memmove(state->mergecurrent[srcTape], state->mergetail[srcTape],
+ tupLen);
+
+ /* Make SortTuple at top of the merge heap point to new tuple */
+ rtup->tuple = (void *) state->mergecurrent[srcTape];
+
+ state->mergetail[srcTape] = state->mergecurrent[srcTape];
+ state->mergecurrent[srcTape] += tupLen;
+ }
+ else
+ {
+ /*
+ * Handle an "overflow" retail palloc.
+ *
+ * This is needed when we run out of tuple memory for the tape.
+ */
+ state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+ state->mergetail[srcTape] = state->mergetuples[srcTape];
+
+ if (rtup->tuple)
+ {
+ Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
+ /* Caller should free palloc'd tuple */
+ *should_free = true;
}
+ state->mergeoverflow[srcTape] = NULL;
+ }
+}
+
+/*
+ * mergebatchfreetape - handle final clean-up for batch memory once tape is
+ * about to become exhausted
+ *
+ * All tuples are returned from tape, but a single final tuple, *rtup, is to be
+ * passed back to caller. Free tape's batch allocation buffer while ensuring
+ * that the final tuple is managed appropriately.
+ */
+static void
+mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+ bool *should_free)
+{
+ Assert(state->batchUsed);
+ Assert(state->status == TSS_FINALMERGE);
+
+ /*
+ * Tuple may or may not already be an overflow allocation from
+ * mergebatchone()
+ */
+ if (!*should_free && rtup->tuple)
+ {
+ /*
+ * Final tuple still in tape's batch allocation.
+ *
+ * Return palloc()'d copy to caller, and have it freed in a similar
+ * manner to overflow allocation. Otherwise, we'd free batch memory
+ * and pass back a pointer to garbage. Note that we deliberately
+ * allocate this in the parent tuplesort context, to be on the safe
+ * side.
+ */
+ Size tuplen;
+ void *oldTuple = rtup->tuple;
+
+ tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+ rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
+ memcpy(rtup->tuple, oldTuple, tuplen);
+ *should_free = true;
+ }
+
+ /* Free spacePerTape-sized buffer */
+ pfree(state->mergetuples[srcTape]);
+}
+
+/*
+ * mergebatchalloc - allocate memory for one tuple using a batch memory
+ * "logical allocation".
+ *
+ * This is used for the final on-the-fly merge phase only. READTUP() routines
+ * receive memory from here in place of palloc() and USEMEM() calls.
+ *
+ * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
+ * contiguous order (while allowing safe reuse of memory made available to
+ * each tape). This maximizes locality of access as tuples are returned by
+ * final merge.
+ *
+ * Caller must not subsequently attempt to free memory returned here. In
+ * general, only mergebatch* functions know about how memory returned from
+ * here should be freed, and this function's caller must ensure that batch
+ * memory management code will definitely have the opportunity to do the right
+ * thing during the final on-the-fly merge.
+ */
+static void *
+mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+ Size reserve_tuplen = MAXALIGN(tuplen);
+ char *ret;
+
+ /* Should overflow at most once before mergebatchone() call: */
+ Assert(state->mergeoverflow[tapenum] == NULL);
+ Assert(state->batchUsed);
+
+ /* It should be possible to use precisely spacePerTape memory at once */
+ if (state->mergecurrent[tapenum] + reserve_tuplen <=
+ state->mergetuples[tapenum] + state->spacePerTape)
+ {
+ /*
+ * Usual case -- caller is returned pointer into its tape's buffer, and
+ * an offset from that point is recorded as where tape has consumed up
+ * to for current round of preloading.
+ */
+ ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
+ state->mergecurrent[tapenum] += reserve_tuplen;
}
+ else
+ {
+ /*
+ * Allocate memory, and record as tape's overflow allocation. This
+ * will be detected quickly, in a similar fashion to a LACKMEM()
+ * condition, and should not happen again before a new round of
+ * preloading for caller's tape. Note that we deliberately allocate
+ * this in the parent tuplesort context, to be on the safe side.
+ *
+ * Sometimes, this does not happen because merging runs out of slots
+ * before running out of memory.
+ */
+ ret = state->mergeoverflow[tapenum] =
+ MemoryContextAlloc(state->sortcontext, tuplen);
+ }
+
+ return ret;
}
/*
@@ -2576,7 +2999,9 @@ beginmerge(Tuplesortstate *state)
* that state and so no point in scanning through all the tapes to fix one.
* (Moreover, there may be quite a lot of inactive tapes in that state, since
* we might have had many fewer runs than tapes. In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.)
+ * merge we can expect most of the tapes to be active. Plus, only
+ * FINALMERGE state has to consider memory management for a batch
+ * allocation.)
*/
static void
mergepreread(Tuplesortstate *state)
@@ -2605,9 +3030,20 @@ mergeprereadone(Tuplesortstate *state, int srcTape)
if (!state->mergeactive[srcTape])
return; /* tape's run is already exhausted */
+
+ /*
+ * Manage per-tape availMem. Only actually matters when batch memory not
+ * in use.
+ */
priorAvail = state->availMem;
state->availMem = state->mergeavailmem[srcTape];
- while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
+
+ /*
+ * When batch memory is used if final on-the-fly merge, only mergeoverflow
+ * test is relevant; otherwise, only LACKMEM() test is relevant.
+ */
+ while ((state->mergeavailslots[srcTape] > 0 &&
+ state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
state->mergenext[srcTape] == 0)
{
/* read next tuple, if any */
@@ -3093,6 +3529,42 @@ markrunend(Tuplesortstate *state, int tapenum)
LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
}
+/*
+ * Get memory for tuple from within READTUP() routine. Allocate
+ * memory and account for that, or consume from tape's batch
+ * allocation.
+ *
+ * Memory returned here in the final on-the-fly merge case is recycled
+ * from tape's batch allocation. Otherwise, callers must pfree() or
+ * reset tuple child memory context, and account for that with a
+ * FREEMEM(). Currently, this only ever needs to happen in WRITETUP()
+ * routines.
+ */
+static void *
+readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+ if (state->batchUsed)
+ {
+ /*
+ * No USEMEM() call, because during final on-the-fly merge
+ * accounting is based on tape-private state. ("Overflow"
+ * allocations are detected as an indication that a new round
+ * or preloading is required. Preloading marks existing
+ * contents of tape's batch buffer for reuse.)
+ */
+ return mergebatchalloc(state, tapenum, tuplen);
+ }
+ else
+ {
+ char *ret;
+
+ /* Batch allocation yet to be performed */
+ ret = MemoryContextAlloc(state->tuplecontext, tuplen);
+ USEMEM(state, GetMemoryChunkSpace(ret));
+ return ret;
+ }
+}
+
/*
* Routines specialized for HeapTuple (actually MinimalTuple) case
@@ -3171,6 +3643,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
Datum original;
MinimalTuple tuple;
HeapTupleData htup;
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
/* copy the tuple into sort storage */
tuple = ExecCopySlotMinimalTuple(slot);
@@ -3184,6 +3657,8 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
state->tupDesc,
&stup->isnull1);
+ MemoryContextSwitchTo(oldcontext);
+
if (!state->sortKeys->abbrev_converter || stup->isnull1)
{
/*
@@ -3266,11 +3741,10 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
{
unsigned int tupbodylen = len - sizeof(int);
unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
- MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
+ MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
char *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
HeapTupleData htup;
- USEMEM(state, GetMemoryChunkSpace(tuple));
/* read in the tuple proper */
tuple->t_len = tuplen;
LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3409,12 +3883,15 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
{
HeapTuple tuple = (HeapTuple) tup;
Datum original;
+ MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
/* copy the tuple into sort storage */
tuple = heap_copytuple(tuple);
stup->tuple = (void *) tuple;
USEMEM(state, GetMemoryChunkSpace(tuple));
+ MemoryContextSwitchTo(oldcontext);
+
/*
* set up first-column key value, and potentially abbreviate, if it's a
* simple column
@@ -3501,9 +3978,10 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int tuplen)
{
unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
- HeapTuple tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
+ HeapTuple tuple = (HeapTuple) readtup_alloc(state,
+ tapenum,
+ t_len + HEAPTUPLESIZE);
- USEMEM(state, GetMemoryChunkSpace(tuple));
/* Reconstruct the HeapTupleData header */
tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
tuple->t_len = t_len;
@@ -3722,7 +4200,7 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
Datum original;
/* copy the tuple into sort storage */
- newtuple = (IndexTuple) palloc(tuplen);
+ newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
memcpy(newtuple, tuple, tuplen);
USEMEM(state, GetMemoryChunkSpace(newtuple));
stup->tuple = (void *) newtuple;
@@ -3804,9 +4282,8 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
int tapenum, unsigned int len)
{
unsigned int tuplen = len - sizeof(unsigned int);
- IndexTuple tuple = (IndexTuple) palloc(tuplen);
+ IndexTuple tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
- USEMEM(state, GetMemoryChunkSpace(tuple));
LogicalTapeReadExact(state->tapeset, tapenum,
tuple, tuplen);
if (state->randomAccess) /* need trailing length word? */
@@ -3864,7 +4341,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
waddr = NULL;
tuplen = 0;
}
- else if (state->datumTypeByVal)
+ else if (!state->tuples)
{
waddr = &stup->datum1;
tuplen = sizeof(Datum);
@@ -3906,7 +4383,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
stup->isnull1 = true;
stup->tuple = NULL;
}
- else if (state->datumTypeByVal)
+ else if (!state->tuples)
{
Assert(tuplen == sizeof(Datum));
LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3916,14 +4393,13 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
}
else
{
- void *raddr = palloc(tuplen);
+ void *raddr = readtup_alloc(state, tapenum, tuplen);
LogicalTapeReadExact(state->tapeset, tapenum,
raddr, tuplen);
stup->datum1 = PointerGetDatum(raddr);
stup->isnull1 = false;
stup->tuple = raddr;
- USEMEM(state, GetMemoryChunkSpace(raddr));
}
if (state->randomAccess) /* need trailing length word? */