Commit 0011c009 authored by Robert Haas

Improve memory management for external sorts.

Introduce a new memory context which stores tuple data, and reset it
at the end of each merge pass; this helps avoid memory fragmentation
and, consequently, overallocation.  Also, for the final merge pass,
eliminate memory context chunk header overhead entirely by allocating
all of the memory used for buffering tuples during the merge in a
single chunk.  Since this modestly increases the number of tuples we
can store, grow the memtuples array a bit so that we're less likely to
run short of slots there.

Peter Geoghegan.  Review and testing of patches in this series by
Jeff Janes, Greg Stark, Mithun Cy, and me.
parent 55c3a04d
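
Two ideas do the work here.  First, tuple memory moves into a dedicated child memory context that is reset at the end of each merge pass, so an entire pass's worth of tuples is released at once instead of through retail pfree() calls that leave holes behind.  Second, during the final on-the-fly merge, each tape's tuples are carved out of one large allocation, so the per-chunk palloc() header is paid once per tape rather than once per tuple.  A rough sketch of the saving, assuming the 16-byte STANDARDCHUNKHEADERSIZE of a 64-bit non-assert build (illustrative arithmetic, not from the commit):

	/* retail palloc() of a 24-byte IndexTuple costs at least */
	16 (chunk header) + MAXALIGN(24) = 40 bytes per tuple
	/* (power-of-two rounding in aset.c can add more) */

	/* carved from a per-tape batch chunk, it costs */
	MAXALIGN(24) = 24 bytes per tuple, plus one header per tape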
@@ -138,7 +138,8 @@ bool		optimize_bounded_sort = true;
  * The objects we actually sort are SortTuple structs.  These contain
  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
  * which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree().  SortTuples also contain the tuple's
+ * can be freed by a simple pfree() (except during final on-the-fly merge,
+ * when memory is used in batch).  SortTuples also contain the tuple's
  * first key column in Datum/nullflag format, and an index integer.
  *
  * Storing the first key column lets us save heap_getattr or index_getattr
@@ -220,11 +221,13 @@ struct Tuplesortstate
 								 * tuples to return? */
 	bool		boundUsed;		/* true if we made use of a bounded heap */
 	int			bound;			/* if bounded, the maximum number of tuples */
+	bool		tuples;			/* Can SortTuple.tuple ever be set? */
 	int64		availMem;		/* remaining memory available, in bytes */
 	int64		allowedMem;		/* total memory allowed, in bytes */
 	int			maxTapes;		/* number of tapes (Knuth's T) */
 	int			tapeRange;		/* maxTapes-1 (Knuth's P) */
-	MemoryContext sortcontext;	/* memory context holding all sort data */
+	MemoryContext sortcontext;	/* memory context holding most sort data */
+	MemoryContext tuplecontext;	/* memory context holding tuple data */
 	LogicalTapeSet *tapeset;	/* logtape.c object for tapes in a temp file */

 	/*
@@ -280,6 +283,15 @@ struct Tuplesortstate
 	int			memtupsize;		/* allocated length of memtuples array */
 	bool		growmemtuples;	/* memtuples' growth still underway? */

+	/*
+	 * Memory for tuples is sometimes allocated in batch, rather than
+	 * incrementally.  This implies that incremental memory accounting has
+	 * been abandoned.  Currently, this only happens for the final on-the-fly
+	 * merge step.  Large batch allocations can store tuples (e.g.
+	 * IndexTuples) without palloc() fragmentation and other overhead.
+	 */
+	bool		batchUsed;
+
 	/*
 	 * While building initial runs, this is the current output run number
 	 * (starting at 0).  Afterwards, it is the number of initial runs we made.
@@ -314,6 +326,21 @@ struct Tuplesortstate
 	int			mergefreelist;	/* head of freelist of recycled slots */
 	int			mergefirstfree; /* first slot never used in this merge */

+	/*
+	 * Per-tape batch state, when final on-the-fly merge consumes memory from
+	 * just a few large allocations.
+	 *
+	 * Aside from the general benefits of performing fewer individual retail
+	 * palloc() calls, this also helps make merging more cache efficient,
+	 * since each tape's tuples must naturally be accessed sequentially (in
+	 * sorted order).
+	 */
+	int64		spacePerTape;	/* Space (memory) for tuples (not slots) */
+	char	  **mergetuples;	/* Each tape's memory allocation */
+	char	  **mergecurrent;	/* Current offset into each tape's memory */
+	char	  **mergetail;		/* Last item's start point for each tape */
+	char	  **mergeoverflow;	/* Retail palloc() "overflow" for each tape */
+
 	/*
 	 * Variables for Algorithm D.  Note that destTape is a "logical" tape
 	 * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
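
Between refills during the final merge, each tape t's pointers obey a simple invariant (a reading aid, not text from the patch):

	mergetuples[t] <= mergetail[t] <= mergecurrent[t] <= mergetuples[t] + spacePerTape

mergetail[t] marks the start of the most recently handed-out tuple, which is what lets the recycling code preserve exactly that tuple when a tape's buffer is reused, and mergeoverflow[t] is non-NULL only in the brief window after a tape has exceeded its budget.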
@@ -389,9 +416,8 @@ struct Tuplesortstate
 	 * tuplesort_begin_datum and used only by the DatumTuple routines.
 	 */
 	Oid			datumType;
-	/* we need typelen and byval in order to know how to copy the Datums. */
+	/* we need typelen in order to know how to copy the Datums. */
 	int			datumTypeLen;
-	bool		datumTypeByVal;

 	/*
 	 * Resource snapshot for time of sort start.
@@ -405,7 +431,7 @@ struct Tuplesortstate
 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
 #define WRITETUP(state,tape,stup)	((*(state)->writetup) (state, tape, stup))
 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define LACKMEM(state)		((state)->availMem < 0)
+#define LACKMEM(state)		((state)->availMem < 0 && !(state)->batchUsed)
 #define USEMEM(state,amt)	((state)->availMem -= (amt))
 #define FREEMEM(state,amt)	((state)->availMem += (amt))

@@ -447,7 +473,13 @@ struct Tuplesortstate
  * rather than the originally-requested size.  This is important since
  * palloc can add substantial overhead.  It's not a complete answer since
  * we won't count any wasted space in palloc allocation blocks, but it's
- * a lot better than what we were doing before 7.3.
+ * a lot better than what we were doing before 7.3.  As of 9.6, a
+ * separate memory context is used for caller-passed tuples.  Resetting
+ * it at certain key increments significantly ameliorates fragmentation.
+ * Note that this places a responsibility on readtup and copytup routines
+ * to use the right memory context for these tuples (and to not use the
+ * reset context for anything whose lifetime needs to span multiple
+ * external sort runs).
 */

 /* When using this macro, beware of double evaluation of len */
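
A charge and its later refund must be measured the same way, which is why GetMemoryChunkSpace() rather than the requested length appears on both sides.  A minimal sketch of the convention (illustrative; USEMEM and FREEMEM are the macros defined above):

	char	   *tup = MemoryContextAlloc(state->tuplecontext, tuplen);

	USEMEM(state, GetMemoryChunkSpace(tup));	/* charge actual chunk size */
	/* ... use the tuple ... */
	FREEMEM(state, GetMemoryChunkSpace(tup));	/* refund the same amount */
	pfree(tup);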
@@ -465,7 +497,14 @@ static void inittapes(Tuplesortstate *state);
 static void selectnewtape(Tuplesortstate *state);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state);
+static void beginmerge(Tuplesortstate *state, bool finalMerge);
+static void batchmemtuples(Tuplesortstate *state);
+static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
+static void mergebatchone(Tuplesortstate *state, int srcTape,
+			  SortTuple *stup, bool *should_free);
+static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
+				   SortTuple *rtup, bool *should_free);
+static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
 static void mergepreread(Tuplesortstate *state);
 static void mergeprereadone(Tuplesortstate *state, int srcTape);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
@@ -477,6 +516,7 @@ static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
 static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
+static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
 				Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
@@ -543,6 +583,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 {
 	Tuplesortstate *state;
 	MemoryContext sortcontext;
+	MemoryContext tuplecontext;
 	MemoryContext oldcontext;

 	/*
@@ -550,7 +591,22 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	 * needed by the sort will live inside this context.
 	 */
 	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
-										"TupleSort",
+										"TupleSort main",
+										ALLOCSET_DEFAULT_MINSIZE,
+										ALLOCSET_DEFAULT_INITSIZE,
+										ALLOCSET_DEFAULT_MAXSIZE);
+
+	/*
+	 * Caller tuple (e.g. IndexTuple) memory context.
+	 *
+	 * A dedicated child context used exclusively for caller-passed tuples
+	 * eases memory management.  Resetting at key points reduces
+	 * fragmentation.  Note that the memtuples array of SortTuples is
+	 * allocated in the parent context, not this context, because there is
+	 * no need to free memtuples early.
+	 */
+	tuplecontext = AllocSetContextCreate(sortcontext,
+										 "Caller tuples",
 										 ALLOCSET_DEFAULT_MINSIZE,
 										 ALLOCSET_DEFAULT_INITSIZE,
 										 ALLOCSET_DEFAULT_MAXSIZE);
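
The resulting context tree, and the way it is torn down, can be sketched with the same 9.6-era mcxt.c calls used above (real API; the fragment is illustrative rather than lifted from the patch):

	sortcontext = AllocSetContextCreate(CurrentMemoryContext,
										"TupleSort main",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);
	tuplecontext = AllocSetContextCreate(sortcontext,
										 "Caller tuples",
										 ALLOCSET_DEFAULT_MINSIZE,
										 ALLOCSET_DEFAULT_INITSIZE,
										 ALLOCSET_DEFAULT_MAXSIZE);

	/* each merge pass allocates short-lived tuples in tuplecontext ... */
	MemoryContextReset(tuplecontext);	/* ... then drops them all at once,
										 * carrying no fragmentation into
										 * the next pass */

	MemoryContextDelete(sortcontext);	/* deleting the parent reclaims the
										 * child as well */

Long-lived state such as the memtuples array stays in sortcontext precisely so the reset cannot touch it.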
@@ -571,10 +627,12 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 	state->status = TSS_INITIAL;
 	state->randomAccess = randomAccess;
 	state->bounded = false;
+	state->tuples = true;
 	state->boundUsed = false;
 	state->allowedMem = workMem * (int64) 1024;
 	state->availMem = state->allowedMem;
 	state->sortcontext = sortcontext;
+	state->tuplecontext = tuplecontext;
 	state->tapeset = NULL;

 	state->memtupcount = 0;
@@ -587,6 +645,7 @@ tuplesort_begin_common(int workMem, bool randomAccess)
 				ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);

 	state->growmemtuples = true;
+	state->batchUsed = false;
 	state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));

 	USEMEM(state, GetMemoryChunkSpace(state->memtuples));
@@ -922,7 +981,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation,
 	/* lookup necessary attributes of the datum type */
 	get_typlenbyval(datumType, &typlen, &typbyval);
 	state->datumTypeLen = typlen;
-	state->datumTypeByVal = typbyval;
+	state->tuples = !typbyval;

 	/* Prepare SortSupport data */
 	state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
@@ -1258,7 +1317,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 							  ItemPointer self, Datum *values,
 							  bool *isnull)
 {
-	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 	SortTuple	stup;
 	Datum		original;
 	IndexTuple	tuple;
@@ -1273,6 +1332,8 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 								 RelationGetDescr(state->indexRel),
 								 &stup.isnull1);

+	MemoryContextSwitchTo(state->sortcontext);
+
 	if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
 	{
 		/*
@@ -1333,7 +1394,7 @@ tuplesort_putindextuplevalues(Tuplesortstate *state, Relation rel,
 void
 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 {
-	MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 	SortTuple	stup;

 	/*
@@ -1348,7 +1409,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 	 * identical to stup.tuple.
 	 */

-	if (isNull || state->datumTypeByVal)
+	if (isNull || !state->tuples)
 	{
 		/*
 		 * Set datum1 to zeroed representation for NULLs (to be consistent,
@@ -1357,6 +1418,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 		stup.datum1 = !isNull ? val : (Datum) 0;
 		stup.isnull1 = isNull;
 		stup.tuple = NULL;		/* no separate storage */
+		MemoryContextSwitchTo(state->sortcontext);
 	}
 	else
 	{
@@ -1365,6 +1427,7 @@ tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 		stup.isnull1 = false;
 		stup.tuple = DatumGetPointer(original);
 		USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+		MemoryContextSwitchTo(state->sortcontext);

 		if (!state->sortKeys->abbrev_converter)
 		{
@@ -1670,6 +1733,7 @@ tuplesort_performsort(Tuplesortstate *state)
 * Internal routine to fetch the next tuple in either forward or back
 * direction into *stup.  Returns FALSE if no more tuples.
 * If *should_free is set, the caller must pfree stup.tuple when done with it.
+ * Otherwise, caller should not use tuple following next call here.
 */
 static bool
 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
@@ -1681,6 +1745,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 	{
 		case TSS_SORTEDINMEM:
 			Assert(forward || state->randomAccess);
+			Assert(!state->batchUsed);
 			*should_free = false;
 			if (forward)
 			{
@@ -1725,6 +1790,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 		case TSS_SORTEDONTAPE:
 			Assert(forward || state->randomAccess);
+			Assert(!state->batchUsed);
 			*should_free = true;
 			if (forward)
 			{
@@ -1810,7 +1876,9 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 		case TSS_FINALMERGE:
 			Assert(forward);
-			*should_free = true;
+			Assert(state->batchUsed || !state->tuples);
+			/* For now, assume tuple is stored in tape's batch memory */
+			*should_free = false;

 			/*
 			 * This code should match the inner loop of mergeonerun().
@@ -1818,18 +1886,17 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 			if (state->memtupcount > 0)
 			{
 				int			srcTape = state->memtuples[0].tupindex;
-				Size		tuplen;
 				int			tupIndex;
 				SortTuple  *newtup;

+				/*
+				 * Returned tuple is still counted in our memory space most
+				 * of the time.  See mergebatchone() for discussion of why
+				 * caller may occasionally be required to free returned
+				 * tuple, and how preread memory is managed with regard to
+				 * edge cases more generally.
+				 */
 				*stup = state->memtuples[0];
-				/* returned tuple is no longer counted in our memory space */
-				if (stup->tuple)
-				{
-					tuplen = GetMemoryChunkSpace(stup->tuple);
-					state->availMem += tuplen;
-					state->mergeavailmem[srcTape] += tuplen;
-				}
 				tuplesort_heap_siftup(state, false);
 				if ((tupIndex = state->mergenext[srcTape]) == 0)
 				{
@@ -1837,16 +1904,26 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
 					 * out of preloaded data on this tape, try to read more
 					 *
 					 * Unlike mergeonerun(), we only preload from the single
-					 * tape that's run dry.  See mergepreread() comments.
+					 * tape that's run dry, though not before preparing its
+					 * batch memory for a new round of sequential
+					 * consumption.  See mergepreread() comments.
 					 */
+					if (state->batchUsed)
+						mergebatchone(state, srcTape, stup, should_free);
+
 					mergeprereadone(state, srcTape);

 					/*
 					 * if still no data, we've reached end of run on this tape
 					 */
 					if ((tupIndex = state->mergenext[srcTape]) == 0)
+					{
+						/* Free tape's buffer, avoiding dangling pointer */
+						if (state->batchUsed)
+							mergebatchfreetape(state, srcTape, stup, should_free);
 						return true;
+					}
 				}
 				/* pull next preread tuple from list, insert in heap */
 				newtup = &state->memtuples[tupIndex];
 				state->mergenext[srcTape] = newtup->tupindex;
@@ -1912,6 +1989,8 @@ tuplesort_gettupleslot(Tuplesortstate *state, bool forward,
 * Fetch the next tuple in either forward or back direction.
 * Returns NULL if no more tuples.  If *should_free is set, the
 * caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
 */
 HeapTuple
 tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
@@ -1931,6 +2010,8 @@ tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
 * Fetch the next index tuple in either forward or back direction.
 * Returns NULL if no more tuples.  If *should_free is set, the
 * caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
 */
 IndexTuple
 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
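
The relaxed contract matters to callers: when *should_free comes back false during the final merge, the returned tuple lives in a tape's batch buffer and is only guaranteed valid until the next fetch.  A hedged usage sketch (the loop body is illustrative, not from the patch):

	IndexTuple	tuple;
	bool		should_free;

	while ((tuple = tuplesort_getindextuple(state, true, &should_free)) != NULL)
	{
		/* use tuple now; copy it if it must outlive this iteration */
		if (should_free)
			pfree(tuple);
	}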
@@ -1979,7 +2060,7 @@ tuplesort_getdatum(Tuplesortstate *state, bool forward,
 	if (state->sortKeys->abbrev_converter && abbrev)
 		*abbrev = stup.datum1;

-	if (stup.isnull1 || state->datumTypeByVal)
+	if (stup.isnull1 || !state->tuples)
 	{
 		*val = stup.datum1;
 		*isNull = stup.isnull1;
@@ -2162,6 +2243,10 @@ inittapes(Tuplesortstate *state)
 	state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
 	state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
 	state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
+	state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
+	state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
+	state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
+	state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
 	state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
 	state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
 	state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
@@ -2320,7 +2405,7 @@ mergeruns(Tuplesortstate *state)
 		/* Tell logtape.c we won't be writing anymore */
 		LogicalTapeSetForgetFreeSpace(state->tapeset);
 		/* Initialize for the final merge pass */
-		beginmerge(state);
+		beginmerge(state, state->tuples);
 		state->status = TSS_FINALMERGE;
 		return;
 	}
@@ -2412,7 +2497,7 @@ mergeonerun(Tuplesortstate *state)
 	 * Start the merge by loading one tuple from each active source tape into
 	 * the heap.  We can also decrease the input run/dummy run counts.
 	 */
-	beginmerge(state);
+	beginmerge(state, false);

 	/*
 	 * Execute merge by repeatedly extracting lowest tuple in heap, writing it
@@ -2450,6 +2535,12 @@ mergeonerun(Tuplesortstate *state)
 		state->mergeavailslots[srcTape]++;
 	}

+	/*
+	 * Reset tuple memory, now that no caller tuples are needed in memory.
+	 * This prevents fragmentation.
+	 */
+	MemoryContextReset(state->tuplecontext);
+
 	/*
 	 * When the heap empties, we're done.  Write an end-of-run marker on the
 	 * output tape, and increment its count of real runs.
@@ -2471,9 +2562,12 @@ mergeonerun(Tuplesortstate *state)
 * which tapes contain active input runs in mergeactive[].  Then, load
 * as many tuples as we can from each active input tape, and finally
 * fill the merge heap with the first tuple from each active tape.
+ *
+ * finalMergeBatch indicates if this is the beginning of a final on-the-fly
+ * merge where a batched allocation of tuple memory is required.
 */
 static void
-beginmerge(Tuplesortstate *state)
+beginmerge(Tuplesortstate *state, bool finalMergeBatch)
 {
 	int			activeTapes;
 	int			tapenum;
@@ -2511,6 +2605,18 @@ beginmerge(Tuplesortstate *state)
 	state->mergefreelist = 0;	/* nothing in the freelist */
 	state->mergefirstfree = activeTapes;	/* 1st slot avail for preread */

+	if (finalMergeBatch)
+	{
+		/* Free outright buffers for tape never actually allocated */
+		FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
+
+		/*
+		 * Grow memtuples one last time, since the palloc() overhead no
+		 * longer incurred can make a big difference
+		 */
+		batchmemtuples(state);
+	}
+
 	/*
 	 * Initialize space allocation to let each active input tape have an
 	 * equal share of preread space.
@@ -2518,7 +2624,7 @@ beginmerge(Tuplesortstate *state)
 	Assert(activeTapes > 0);
 	slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
 	Assert(slotsPerTape > 0);
-	spacePerTape = state->availMem / activeTapes;
+	spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
 	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
 	{
 		if (state->mergeactive[srcTape])
@@ -2528,6 +2634,15 @@ beginmerge(Tuplesortstate *state)
 		}
 	}

+	/*
+	 * Preallocate tuple batch memory for each tape.  This is the memory
+	 * used for tuples themselves (not SortTuples), so it's never used by
+	 * pass-by-value datum sorts.  Memory allocation is performed here at
+	 * most once per sort, just in advance of the final on-the-fly merge
+	 * step.
+	 */
+	if (finalMergeBatch)
+		mergebatch(state, spacePerTape);
+
 	/*
 	 * Preread as many tuples as possible (and at least one) from each active
 	 * tape
@@ -2551,8 +2666,316 @@ beginmerge(Tuplesortstate *state)
 				tup->tupindex = state->mergefreelist;
 				state->mergefreelist = tupIndex;
 				state->mergeavailslots[srcTape]++;
+
+#ifdef TRACE_SORT
+				if (trace_sort && finalMergeBatch)
+				{
+					int64		perTapeKB = (spacePerTape + 1023) / 1024;
+					int64		usedSpaceKB;
+					int			usedSlots;
+
+					/*
+					 * Report how effective batchmemtuples() was in balancing
+					 * the number of slots against the need for memory for
+					 * the underlying tuples (e.g. IndexTuples).  The big
+					 * preread of all tapes when switching to FINALMERGE
+					 * state should be fairly representative of memory
+					 * utilization during the final merge step, and in any
+					 * case is the only point at which all tapes are
+					 * guaranteed to have depleted either their batch memory
+					 * allowance or slot allowance.  Ideally, both will be
+					 * completely depleted for every tape by now.
+					 */
+					usedSpaceKB = (state->mergecurrent[srcTape] -
+								   state->mergetuples[srcTape] + 1023) / 1024;
+					usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
+
+					elog(LOG, "tape %d initially used %ld KB of %ld KB batch "
+						 "(%2.3f) and %d out of %d slots (%2.3f)", srcTape,
+						 usedSpaceKB, perTapeKB,
+						 (double) usedSpaceKB / (double) perTapeKB,
+						 usedSlots, slotsPerTape,
+						 (double) usedSlots / (double) slotsPerTape);
+				}
+#endif
 			}
 		}
 	}
+
+/*
+ * batchmemtuples - grow memtuples without palloc overhead
+ *
+ * When called, availMem should be approximately the amount of memory we'd
+ * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
+ * that were allocated with palloc() overhead, and in doing so use up all
+ * allocated slots.  However, though slots and tuple memory is in balance
+ * following the last grow_memtuples() call, that's predicated on the
+ * observed average tuple size for the "final" grow_memtuples() call, which
+ * includes palloc overhead.
+ *
+ * This will perform an actual final grow_memtuples() call without any
+ * palloc() overhead, rebalancing the use of memory between slots and tuples.
+ */
+static void
+batchmemtuples(Tuplesortstate *state)
+{
+	int64		refund;
+	int64		availMemLessRefund;
+	int			memtupsize = state->memtupsize;
+
+	/* For simplicity, assume no memtuples are actually currently counted */
+	Assert(state->memtupcount == 0);
+
+	/*
+	 * Refund STANDARDCHUNKHEADERSIZE per tuple.
+	 *
+	 * This sometimes fails to make memory use perfectly balanced, but it
+	 * should never make the situation worse.  Note that Assert-enabled
+	 * builds get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
+	 */
+	refund = memtupsize * STANDARDCHUNKHEADERSIZE;
+	availMemLessRefund = state->availMem - refund;
+
+	/*
+	 * To establish balanced memory use after refunding palloc overhead,
+	 * temporarily have our accounting indicate that we've allocated all
+	 * memory we're allowed to, less that refund, and call grow_memtuples()
+	 * to have it increase the number of slots.
+	 */
+	state->growmemtuples = true;
+	USEMEM(state, availMemLessRefund);
+	(void) grow_memtuples(state);
+	/* Should not matter, but be tidy */
+	FREEMEM(state, availMemLessRefund);
+	state->growmemtuples = false;
+
+#ifdef TRACE_SORT
+	if (trace_sort)
+	{
+		Size		OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
+		Size		NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
+
+		elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
+			 (double) NewKb / (double) OldKb,
+			 memtupsize, OldKb,
+			 state->memtupsize, NewKb);
+	}
+#endif
+}
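
To make the refund concrete, a hypothetical case (not from the patch): a sort whose memtuples array grew to 1,000,000 slots, on a 64-bit non-assert build where STANDARDCHUNKHEADERSIZE is 16 bytes, gets

	refund = 1000000 * 16 = 16000000 bytes (roughly 15 MB)

handed back to the accounting, which the grow_memtuples() call above can then spend on additional SortTuple slots, since tuples read during the final merge no longer pay that header.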
+
+/*
+ * mergebatch - initialize tuple memory in batch
+ *
+ * This allows sequential access to sorted tuples buffered in memory from
+ * tapes/runs on disk during a final on-the-fly merge step.  Note that the
+ * memory is not used for SortTuples, but for the underlying tuples (e.g.
+ * MinimalTuples).
+ *
+ * Note that when batch memory is used, there is a simple division of space
+ * into large buffers (one per active tape).  The conventional incremental
+ * memory accounting (calling USEMEM() and FREEMEM()) is abandoned.  Instead,
+ * when each tape's memory budget is exceeded, a retail palloc() "overflow"
+ * is performed, which is then immediately detected in a way that is
+ * analogous to LACKMEM().  This keeps each tape's use of memory fair, which
+ * is always a goal.
+ */
+static void
+mergebatch(Tuplesortstate *state, int64 spacePerTape)
+{
+	int			srcTape;
+
+	Assert(state->activeTapes > 0);
+	Assert(state->tuples);
+
+	/*
+	 * For the purposes of tuplesort's memory accounting, the batch
+	 * allocation is special, and regular memory accounting through USEMEM()
+	 * calls is abandoned (see mergeprereadone()).
+	 */
+	for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+	{
+		char	   *mergetuples;
+
+		if (!state->mergeactive[srcTape])
+			continue;
+
+		/* Allocate buffer for each active tape */
+		mergetuples = MemoryContextAllocHuge(state->tuplecontext,
+											 spacePerTape);
+
+		/* Initialize state for tape */
+		state->mergetuples[srcTape] = mergetuples;
+		state->mergecurrent[srcTape] = mergetuples;
+		state->mergetail[srcTape] = mergetuples;
+		state->mergeoverflow[srcTape] = NULL;
+	}
+
+	state->batchUsed = true;
+	state->spacePerTape = spacePerTape;
+}
+
+/*
+ * mergebatchone - prepare batch memory for one merge input tape
+ *
+ * This is called following the exhaustion of preread tuples for one input
+ * tape.  All that actually occurs is that the state for the source tape is
+ * reset to indicate that all memory may be reused.
+ *
+ * This routine must deal with fixing up the tuple that is about to be
+ * returned to the client, due to "overflow" allocations.
+ */
+static void
+mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+			  bool *should_free)
+{
+	Assert(state->batchUsed);
+
+	/*
+	 * Tuple about to be returned to caller ("stup") is final preread tuple
+	 * from tape, just removed from the top of the heap.  Special steps
+	 * around memory management must be performed for that tuple, to make
+	 * sure it isn't overwritten early.
+	 */
+	if (!state->mergeoverflow[srcTape])
+	{
+		Size		tupLen;
+
+		/*
+		 * Mark tuple buffer range for reuse, but be careful to move final,
+		 * tail tuple to start of space for next run so that it's available
+		 * to caller when stup is returned, and remains available at least
+		 * until the next tuple is requested.
+		 */
+		tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+		state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+		memmove(state->mergecurrent[srcTape], state->mergetail[srcTape],
+				tupLen);
+
+		/* Make SortTuple at top of the merge heap point to new tuple */
+		rtup->tuple = (void *) state->mergecurrent[srcTape];
+
+		state->mergetail[srcTape] = state->mergecurrent[srcTape];
+		state->mergecurrent[srcTape] += tupLen;
+	}
+	else
+	{
+		/*
+		 * Handle an "overflow" retail palloc.
+		 *
+		 * This is needed when we run out of tuple memory for the tape.
+		 */
+		state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+		state->mergetail[srcTape] = state->mergetuples[srcTape];
+
+		if (rtup->tuple)
+		{
+			Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
+			/* Caller should free palloc'd tuple */
+			*should_free = true;
+		}
+		state->mergeoverflow[srcTape] = NULL;
+	}
+}
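
The memmove() above is the subtle step: the tuple about to be returned is the last one carved from the buffer, so before the buffer is recycled it is copied to the front, keeping the caller's pointer valid until the next fetch.  Schematically, for one tape (an illustration, not from the patch):

	before:  [ consumed tuples .............. | last tuple | unused ]
	         ^ mergetuples                    ^ mergetail  ^ mergecurrent

	after:   [ last tuple | reusable ............................. ]
	         ^ mergetuples = mergetail        mergecurrent = mergetuples + tupLen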
+
+/*
+ * mergebatchfreetape - handle final clean-up for batch memory once tape is
+ * about to become exhausted
+ *
+ * All tuples are returned from tape, but a single final tuple, *rtup, is to
+ * be passed back to caller.  Free tape's batch allocation buffer while
+ * ensuring that the final tuple is managed appropriately.
+ */
+static void
+mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+				   bool *should_free)
+{
+	Assert(state->batchUsed);
+	Assert(state->status == TSS_FINALMERGE);
+
+	/*
+	 * Tuple may or may not already be an overflow allocation from
+	 * mergebatchone()
+	 */
+	if (!*should_free && rtup->tuple)
+	{
+		/*
+		 * Final tuple still in tape's batch allocation.
+		 *
+		 * Return palloc()'d copy to caller, and have it freed in a similar
+		 * manner to overflow allocation.  Otherwise, we'd free batch memory
+		 * and pass back a pointer to garbage.  Note that we deliberately
+		 * allocate this in the parent tuplesort context, to be on the safe
+		 * side.
+		 */
+		Size		tuplen;
+		void	   *oldTuple = rtup->tuple;
+
+		tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+		rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
+		memcpy(rtup->tuple, oldTuple, tuplen);
+		*should_free = true;
+	}
+
+	/* Free spacePerTape-sized buffer */
+	pfree(state->mergetuples[srcTape]);
+}
+
+/*
+ * mergebatchalloc - allocate memory for one tuple using a batch memory
+ * "logical allocation".
+ *
+ * This is used for the final on-the-fly merge phase only.  READTUP()
+ * routines receive memory from here in place of palloc() and USEMEM()
+ * calls.
+ *
+ * Tuple tapenum is passed, ensuring each tape's tuples are stored in
+ * sorted, contiguous order (while allowing safe reuse of memory made
+ * available to each tape).  This maximizes locality of access as tuples
+ * are returned by final merge.
+ *
+ * Caller must not subsequently attempt to free memory returned here.  In
+ * general, only mergebatch* functions know about how memory returned from
+ * here should be freed, and this function's caller must ensure that batch
+ * memory management code will definitely have the opportunity to do the
+ * right thing during the final on-the-fly merge.
+ */
+static void *
+mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+	Size		reserve_tuplen = MAXALIGN(tuplen);
+	char	   *ret;
+
+	/* Should overflow at most once before mergebatchone() call: */
+	Assert(state->mergeoverflow[tapenum] == NULL);
+	Assert(state->batchUsed);
+
+	/* It should be possible to use precisely spacePerTape memory at once */
+	if (state->mergecurrent[tapenum] + reserve_tuplen <=
+		state->mergetuples[tapenum] + state->spacePerTape)
+	{
+		/*
+		 * Usual case -- caller is returned pointer into its tape's buffer,
+		 * and an offset from that point is recorded as where tape has
+		 * consumed up to for current round of preloading.
+		 */
+		ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
+		state->mergecurrent[tapenum] += reserve_tuplen;
+	}
+	else
+	{
+		/*
+		 * Allocate memory, and record as tape's overflow allocation.  This
+		 * will be detected quickly, in a similar fashion to a LACKMEM()
+		 * condition, and should not happen again before a new round of
+		 * preloading for caller's tape.  Note that we deliberately allocate
+		 * this in the parent tuplesort context, to be on the safe side.
+		 *
+		 * Sometimes, this does not happen because merging runs out of slots
+		 * before running out of memory.
+		 */
+		ret = state->mergeoverflow[tapenum] =
+			MemoryContextAlloc(state->sortcontext, tuplen);
+	}
+
+	return ret;
+}

 /*
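Stripped of tuplesort specifics, mergebatchalloc() is a per-tape bump allocator with a retail fallback.  A minimal standalone C sketch of the same idea (all names hypothetical, error handling omitted):

	#include <stdlib.h>

	#define ALIGN_UP(x)	(((x) + 7) & ~(size_t) 7)	/* like MAXALIGN */

	typedef struct TapeArena
	{
		char	   *base;		/* the tape's single large buffer */
		char	   *current;	/* bump pointer */
		size_t		space;		/* total buffer size */
		char	   *overflow;	/* retail fallback, NULL when unused */
	} TapeArena;

	/* Hand out a slice of the arena, or fall back to malloc on overflow. */
	static void *
	arena_alloc(TapeArena *a, size_t len)
	{
		size_t		reserved = ALIGN_UP(len);

		if (a->current + reserved <= a->base + a->space)
		{
			void	   *ret = a->current;

			a->current += reserved;
			return ret;
		}
		/* overflow: the next refill must notice this and reset the arena */
		a->overflow = malloc(len);
		return a->overflow;
	}

	/* Recycle the whole arena at once -- the analogue of a tape refill. */
	static void
	arena_reset(TapeArena *a)
	{
		a->current = a->base;
		free(a->overflow);
		a->overflow = NULL;
	}

Because freeing is a pointer reset, a tape pays exactly one allocation for the whole merge unless it overflows, and tuples land contiguously in the order they will be consumed.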
@@ -2576,7 +2999,9 @@ beginmerge(Tuplesortstate *state)
 * that state and so no point in scanning through all the tapes to fix one.
 * (Moreover, there may be quite a lot of inactive tapes in that state, since
 * we might have had many fewer runs than tapes.  In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.)
+ * merge we can expect most of the tapes to be active.  Plus, only
+ * FINALMERGE state has to consider memory management for a batch
+ * allocation.)
 */
 static void
 mergepreread(Tuplesortstate *state)
@@ -2605,9 +3030,20 @@ mergeprereadone(Tuplesortstate *state, int srcTape)
 	if (!state->mergeactive[srcTape])
 		return;					/* tape's run is already exhausted */
+
+	/*
+	 * Manage per-tape availMem.  Only actually matters when batch memory
+	 * not in use.
+	 */
 	priorAvail = state->availMem;
 	state->availMem = state->mergeavailmem[srcTape];
-	while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
+
+	/*
+	 * When batch memory is used in final on-the-fly merge, only the
+	 * mergeoverflow test is relevant; otherwise, only the LACKMEM() test is
+	 * relevant.
+	 */
+	while ((state->mergeavailslots[srcTape] > 0 &&
+			state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
 		   state->mergenext[srcTape] == 0)
 	{
 		/* read next tuple, if any */
@@ -3093,6 +3529,42 @@ markrunend(Tuplesortstate *state, int tapenum)
 	LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
 }

+/*
+ * Get memory for tuple from within READTUP() routine.  Allocate memory and
+ * account for that, or consume from tape's batch allocation.
+ *
+ * Memory returned here in the final on-the-fly merge case is recycled from
+ * tape's batch allocation.  Otherwise, callers must pfree() or reset tuple
+ * child memory context, and account for that with a FREEMEM().  Currently,
+ * this only ever needs to happen in WRITETUP() routines.
+ */
+static void *
+readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+	if (state->batchUsed)
+	{
+		/*
+		 * No USEMEM() call, because during final on-the-fly merge
+		 * accounting is based on tape-private state.  ("Overflow"
+		 * allocations are detected as an indication that a new round of
+		 * preloading is required.  Preloading marks existing contents of
+		 * tape's batch buffer for reuse.)
+		 */
+		return mergebatchalloc(state, tapenum, tuplen);
+	}
+	else
+	{
+		char	   *ret;
+
+		/* Batch allocation yet to be performed */
+		ret = MemoryContextAlloc(state->tuplecontext, tuplen);
+		USEMEM(state, GetMemoryChunkSpace(ret));
+		return ret;
+	}
+}

 /*
  * Routines specialized for HeapTuple (actually MinimalTuple) case
@@ -3171,6 +3643,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
 	Datum		original;
 	MinimalTuple tuple;
 	HeapTupleData htup;
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);

 	/* copy the tuple into sort storage */
 	tuple = ExecCopySlotMinimalTuple(slot);
@@ -3184,6 +3657,8 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup)
 								state->tupDesc,
 								&stup->isnull1);

+	MemoryContextSwitchTo(oldcontext);
+
 	if (!state->sortKeys->abbrev_converter || stup->isnull1)
 	{
 		/*
@@ -3266,11 +3741,10 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup,
 {
 	unsigned int tupbodylen = len - sizeof(int);
 	unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-	MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
+	MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
 	char	   *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
 	HeapTupleData htup;

-	USEMEM(state, GetMemoryChunkSpace(tuple));
 	/* read in the tuple proper */
 	tuple->t_len = tuplen;
 	LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3409,12 +3883,15 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup)
 {
 	HeapTuple	tuple = (HeapTuple) tup;
 	Datum		original;
+	MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);

 	/* copy the tuple into sort storage */
 	tuple = heap_copytuple(tuple);
 	stup->tuple = (void *) tuple;
 	USEMEM(state, GetMemoryChunkSpace(tuple));

+	MemoryContextSwitchTo(oldcontext);
+
 	/*
 	 * set up first-column key value, and potentially abbreviate, if it's a
 	 * simple column
@@ -3501,9 +3978,10 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup,
 			   int tapenum, unsigned int tuplen)
 {
 	unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
-	HeapTuple	tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
+	HeapTuple	tuple = (HeapTuple) readtup_alloc(state,
+												  tapenum,
+												  t_len + HEAPTUPLESIZE);

-	USEMEM(state, GetMemoryChunkSpace(tuple));
 	/* Reconstruct the HeapTupleData header */
 	tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
 	tuple->t_len = t_len;
@@ -3722,7 +4200,7 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup)
 	Datum		original;

 	/* copy the tuple into sort storage */
-	newtuple = (IndexTuple) palloc(tuplen);
+	newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
 	memcpy(newtuple, tuple, tuplen);
 	USEMEM(state, GetMemoryChunkSpace(newtuple));
 	stup->tuple = (void *) newtuple;
@@ -3804,9 +4282,8 @@ readtup_index(Tuplesortstate *state, SortTuple *stup,
 			  int tapenum, unsigned int len)
 {
 	unsigned int tuplen = len - sizeof(unsigned int);
-	IndexTuple	tuple = (IndexTuple) palloc(tuplen);
+	IndexTuple	tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);

-	USEMEM(state, GetMemoryChunkSpace(tuple));
 	LogicalTapeReadExact(state->tapeset, tapenum,
 						 tuple, tuplen);
 	if (state->randomAccess)	/* need trailing length word? */
@@ -3864,7 +4341,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup)
 		waddr = NULL;
 		tuplen = 0;
 	}
-	else if (state->datumTypeByVal)
+	else if (!state->tuples)
 	{
 		waddr = &stup->datum1;
 		tuplen = sizeof(Datum);
@@ -3906,7 +4383,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 		stup->isnull1 = true;
 		stup->tuple = NULL;
 	}
-	else if (state->datumTypeByVal)
+	else if (!state->tuples)
 	{
 		Assert(tuplen == sizeof(Datum));
 		LogicalTapeReadExact(state->tapeset, tapenum,
@@ -3916,14 +4393,13 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	}
 	else
 	{
-		void	   *raddr = palloc(tuplen);
+		void	   *raddr = readtup_alloc(state, tapenum, tuplen);

 		LogicalTapeReadExact(state->tapeset, tapenum,
 							 raddr, tuplen);
 		stup->datum1 = PointerGetDatum(raddr);
 		stup->isnull1 = false;
 		stup->tuple = raddr;
-		USEMEM(state, GetMemoryChunkSpace(raddr));
 	}

 	if (state->randomAccess)	/* need trailing length word? */