Commit cf627ab4 authored by Tom Lane

Further performance improvements in sorting: reduce number of comparisons

during initial run formation by keeping both current run and next-run
tuples in the same heap (yup, Knuth is smarter than I am).  And, during
merge passes, make use of available sort memory to load multiple tuples
from any one input 'tape' at a time, thereby improving locality of
access to the temp file.
parent 2a5f3869
@@ -19,13 +19,16 @@
 * disk space as soon as each block is read from its "tape".
 *
 * We do not form the initial runs using Knuth's recommended replacement
 * selection data structure (Algorithm 5.4.1R), because it uses a fixed
 * number of records in memory at all times.  Since we are dealing with
 * tuples that may vary considerably in size, we want to be able to vary
 * the number of records kept in memory to ensure full utilization of the
 * allowed sort memory space.  So, we keep the tuples in a variable-size
 * heap, with the next record to go out at the top of the heap.  Like
 * Algorithm 5.4.1R, each record is stored with the run number that it
 * must go into, and we use (run number, key) as the ordering key for the
 * heap.  When the run number at the top of the heap changes, we know that
 * no more records of the prior run are left in the heap.
 *
 * The (approximate) amount of memory allowed for any one sort operation
 * is given in kilobytes by the external variable SortMem.  Initially,
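A minimal sketch of the ordering rule just described (illustrative only; RunTaggedTuple and compare_sort_keys are made-up names, not identifiers from tuplesort.c):

    typedef struct
    {
        int   run_number;       /* run this tuple is destined for */
        void *tuple;            /* the tuple itself */
    } RunTaggedTuple;

    extern int compare_sort_keys(const void *a, const void *b);   /* placeholder */

    static int
    run_tagged_compare(const RunTaggedTuple *a, const RunTaggedTuple *b)
    {
        if (a->run_number != b->run_number)
            return a->run_number - b->run_number;     /* lower run sorts first */
        return compare_sort_keys(a->tuple, b->tuple); /* then the ordinary key */
    }

Because the run number dominates the comparison, every tuple tagged for the next run sorts after all remaining current-run tuples, so the current run drains completely out of the heap before the next run begins.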
@@ -35,13 +38,32 @@
 * tuples just by scanning the tuple array sequentially.  If we do exceed
 * SortMem, we construct a heap using Algorithm H and begin to emit tuples
 * into sorted runs in temporary tapes, emitting just enough tuples at each
 * step to get back within the SortMem limit.  Whenever the run number at
 * the top of the heap changes, we begin a new run with a new output tape
 * (selected per Algorithm D).  After the end of the input is reached,
 * we dump out remaining tuples in memory into a final run (or two),
 * then merge the runs using Algorithm D.
 *
 * When merging runs, we use a heap containing just the frontmost tuple from
 * each source run; we repeatedly output the smallest tuple and insert the
 * next tuple from its source tape (if any).  When the heap empties, the merge
 * is complete.  The basic merge algorithm thus needs very little memory ---
 * only M tuples for an M-way merge, and M is at most six in the present code.
 * However, we can still make good use of our full SortMem allocation by
 * pre-reading additional tuples from each source tape.  Without prereading,
 * our access pattern to the temporary file would be very erratic; on average
 * we'd read one block from each of M source tapes during the same time that
 * we're writing M blocks to the output tape, so there is no sequentiality of
 * access at all, defeating the read-ahead methods used by most Unix kernels.
 * Worse, the output tape gets written into a very random sequence of blocks
 * of the temp file, ensuring that things will be even worse when it comes
 * time to read that tape.  A straightforward merge pass thus ends up doing a
 * lot of waiting for disk seeks.  We can improve matters by prereading from
 * each source tape sequentially, loading about SortMem/M bytes from each tape
 * in turn.  Then we run the merge algorithm, writing but not reading until
 * one of the preloaded tuple series runs out.  Then we switch back to preread
 * mode, fill memory again, and repeat.  This approach helps to localize both
 * read and write accesses.
 *
 * When the caller requests random access to the sort result, we form
 * the final sorted run on a logical tape which is then "frozen", so
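The prereading strategy described above can be summarized with a small sketch (illustrative only; the helper functions are placeholders, and the real code apportions memory per active tape inside beginmerge and mergepreread):

    #define NUM_INPUT_TAPES 6

    extern void load_tuples_sequentially(int tapeno, long bytes);  /* placeholder */
    extern int  merge_until_some_tape_is_empty(void);              /* placeholder;
                                                                    * returns 0 when
                                                                    * all runs end */

    static void
    merge_with_prereading(long sort_mem_bytes)
    {
        long    per_tape = sort_mem_bytes / NUM_INPUT_TAPES;
        int     t;

        for (;;)
        {
            /* Preread phase: one long sequential read from each input tape. */
            for (t = 0; t < NUM_INPUT_TAPES; t++)
                load_tuples_sequentially(t, per_tape);

            /* Merge phase: write output only, until some tape's buffer drains. */
            if (!merge_until_some_tape_is_empty())
                break;
        }
    }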
@@ -55,7 +77,7 @@
 * Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.2 1999/10/30 17:27:15 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
@@ -129,30 +151,65 @@ struct Tuplesortstate
     * of memory space consumed.
     */
    void       *(*readtup) (Tuplesortstate *state, int tapenum, unsigned int len);

    /*
     * Obtain memory space occupied by a stored tuple.  (This routine is
     * only needed in the FINALMERGE case, since copytup, writetup, and
     * readtup are expected to adjust availMem appropriately.)
     */
    unsigned int (*tuplesize) (Tuplesortstate *state, void *tup);

    /*
     * This array holds pointers to tuples in sort memory.  If we are in
     * state INITIAL, the tuples are in no particular order; if we are in
     * state SORTEDINMEM, the tuples are in final sorted order; in states
     * BUILDRUNS and FINALMERGE, the tuples are organized in "heap" order
     * per Algorithm H.  (Note that memtupcount only counts the tuples that
     * are part of the heap --- during merge passes, memtuples[] entries
     * beyond TAPERANGE are never in the heap and are used to hold
     * pre-read tuples.)  In state SORTEDONTAPE, the array is not used.
     */
    void      **memtuples;      /* array of pointers to palloc'd tuples */
    int         memtupcount;    /* number of tuples currently present */
    int         memtupsize;     /* allocated length of memtuples array */

    /*
     * While building initial runs, this array holds the run number for each
     * tuple in memtuples[].  During merge passes, we re-use it to hold the
     * input tape number that each tuple in the heap was read from, or to hold
     * the index of the next tuple pre-read from the same tape in the case of
     * pre-read entries.  This array is never allocated unless we need to use
     * tapes.  Whenever it is allocated, it has the same length as
     * memtuples[].
     */
    int        *memtupindex;    /* index value associated with memtuples[i] */

    /*
     * While building initial runs, this is the current output run number
     * (starting at 0).  Afterwards, it is the number of initial runs we made.
     */
    int         currentRun;

    /*
     * These variables are only used during merge passes.  mergeactive[i]
     * is true if we are reading an input run from (actual) tape number i
     * and have not yet exhausted that run.  mergenext[i] is the memtuples
     * index of the next pre-read tuple (next to be loaded into the heap)
     * for tape i, or 0 if we are out of pre-read tuples.  mergelast[i]
     * similarly points to the last pre-read tuple from each tape.
     * mergeavailmem[i] is the amount of unused space allocated for tape i.
     * mergefreelist and mergefirstfree keep track of unused locations
     * in the memtuples[] array.  memtupindex[] links together pre-read
     * tuples for each tape as well as recycled locations in mergefreelist.
     * It is OK to use 0 as a null link in these lists, because memtuples[0]
     * is part of the merge heap and is never a pre-read tuple.
     */
    bool        mergeactive[MAXTAPES];      /* Active input run source? */
    int         mergenext[MAXTAPES];        /* first preread tuple for each source */
    int         mergelast[MAXTAPES];        /* last preread tuple for each source */
    long        mergeavailmem[MAXTAPES];    /* availMem for prereading tapes */
    long        spacePerTape;   /* actual per-tape target usage */
    int         mergefreelist;  /* head of freelist of recycled slots */
    int         mergefirstfree; /* first slot never used in this merge */

    /*
     * Variables for Algorithm D.  Note that destTape is a "logical" tape
@@ -166,8 +223,6 @@ struct Tuplesortstate
    int         tp_dummy[MAXTAPES];     /* # of dummy runs for each tape (D[]) */
    int         tp_tapenum[MAXTAPES];   /* Actual tape numbers (TAPE[]) */

    /*
     * These variables are used after completion of sorting to keep track
     * of the next tuple to return.  (In the tape case, the tape's current
@@ -202,6 +257,7 @@ struct Tuplesortstate
#define COPYTUP(state,tup)      ((*(state)->copytup) (state, tup))
#define WRITETUP(state,tape,tup)    ((*(state)->writetup) (state, tape, tup))
#define READTUP(state,tape,len)     ((*(state)->readtup) (state, tape, len))
#define TUPLESIZE(state,tup)    ((*(state)->tuplesize) (state, tup))
#define LACKMEM(state)          ((state)->availMem < 0)
#define USEMEM(state,amt)       ((state)->availMem -= (amt))
#define FREEMEM(state,amt)      ((state)->availMem += (amt))
@@ -239,7 +295,7 @@ struct Tuplesortstate
 *
 * We count space requested for tuples against the SortMem limit.
 * Fixed-size space (primarily the LogicalTapeSet I/O buffers) is not
 * counted, nor do we count the variable-size memtuples and memtupindex
 * arrays.  (Even though those could grow pretty large, they should be
 * small compared to the tuples proper, so this is not unreasonable.)
 *
@@ -271,11 +327,11 @@ static void selectnewtape(Tuplesortstate *state);
static void mergeruns(Tuplesortstate *state);
static void mergeonerun(Tuplesortstate *state);
static void beginmerge(Tuplesortstate *state);
static void mergepreread(Tuplesortstate *state);
static void dumptuples(Tuplesortstate *state, bool alltuples);
static void tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
                      int tupleindex, bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum);
static int  qsort_comparetup(const void *a, const void *b);
@@ -285,12 +341,14 @@ static void *copytup_heap(Tuplesortstate *state, void *tup);
static void writetup_heap(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_heap(Tuplesortstate *state, int tapenum,
              unsigned int len);
static unsigned int tuplesize_heap(Tuplesortstate *state, void *tup);
static int comparetup_index(Tuplesortstate *state,
                 const void *a, const void *b);
static void *copytup_index(Tuplesortstate *state, void *tup);
static void writetup_index(Tuplesortstate *state, int tapenum, void *tup);
static void *readtup_index(Tuplesortstate *state, int tapenum,
               unsigned int len);
static unsigned int tuplesize_index(Tuplesortstate *state, void *tup);

/*
 * Since qsort(3) will not pass any context info to qsort_comparetup(),
@@ -332,10 +390,9 @@ tuplesort_begin_common(bool randomAccess)
    state->memtupsize = 1024;   /* initial guess */
    state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));

    state->memtupindex = NULL;  /* until and unless needed */

    state->currentRun = 0;

    /* Algorithm D variables will be initialized by inittapes, if needed */
@@ -359,6 +416,7 @@ tuplesort_begin_heap(TupleDesc tupDesc,
    state->copytup = copytup_heap;
    state->writetup = writetup_heap;
    state->readtup = readtup_heap;
    state->tuplesize = tuplesize_heap;

    state->tupDesc = tupDesc;
    state->nKeys = nkeys;
@@ -378,6 +436,7 @@ tuplesort_begin_index(Relation indexRel,
    state->copytup = copytup_index;
    state->writetup = writetup_index;
    state->readtup = readtup_index;
    state->tuplesize = tuplesize_index;

    state->indexRel = indexRel;
    state->enforceUnique = enforceUnique;
@@ -403,14 +462,8 @@ tuplesort_end(Tuplesortstate *state)
            pfree(state->memtuples[i]);
        pfree(state->memtuples);
    }
    if (state->memtupindex)
        pfree(state->memtupindex);
}

/*
@@ -450,7 +503,6 @@ tuplesort_puttuple(Tuplesortstate *state, void *tuple)
             * Nope; time to switch to tape-based operation.
             */
            inittapes(state);

            /*
             * Dump tuples until we are back under the limit.
             */
@@ -458,36 +510,23 @@ tuplesort_puttuple(Tuplesortstate *state, void *tuple)
            break;
        case TSS_BUILDRUNS:
            /*
             * Insert the copied tuple into the heap, with run number
             * currentRun if it can go into the current run, else run
             * number currentRun+1.  The tuple can go into the current run
             * if it is >= the first not-yet-output tuple.  (Actually,
             * it could go into the current run if it is >= the most recently
             * output tuple ... but that would require keeping around the
             * tuple we last output, and it's simplest to let writetup free
             * each tuple as soon as it's written.)
             *
             * Note there will always be at least one tuple in the heap
             * at this point; see dumptuples.
             */
            Assert(state->memtupcount > 0);
            if (COMPARETUP(state, tuple, state->memtuples[0]) >= 0)
                tuplesort_heap_insert(state, tuple, state->currentRun, true);
            else
                tuplesort_heap_insert(state, tuple, state->currentRun+1, true);

            /*
             * If we are over the memory limit, dump tuples till we're under.
             */
@@ -529,7 +568,7 @@ tuplesort_performsort(Tuplesortstate *state)
             * Finish tape-based sort.  First, flush all tuples remaining
             * in memory out to tape; then merge until we have a single
             * remaining run (or, if !randomAccess, one run per tape).
             * Note that mergeruns sets the correct state->status.
             */
            dumptuples(state, true);
            mergeruns(state);
@@ -675,17 +714,35 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
            /*
             * This code should match the inner loop of mergeonerun().
             */
            if (state->memtupcount > 0)
            {
                int     srcTape = state->memtupindex[0];
                unsigned int tuplen;
                int     tupIndex;
                void   *newtup;

                tup = state->memtuples[0];
                /* returned tuple is no longer counted in our memory space */
                tuplen = TUPLESIZE(state, tup);
                state->availMem += tuplen;
                state->mergeavailmem[srcTape] += tuplen;
                tuplesort_heap_siftup(state, false);
                if ((tupIndex = state->mergenext[srcTape]) == 0)
                {
                    /* out of preloaded data on this tape, try to read more */
                    mergepreread(state);
                    /* if still no data, we've reached end of run on this tape */
                    if ((tupIndex = state->mergenext[srcTape]) == 0)
                        return tup;
                }
                /* pull next preread tuple from list, insert in heap */
                newtup = state->memtuples[tupIndex];
                state->mergenext[srcTape] = state->memtupindex[tupIndex];
                if (state->mergenext[srcTape] == 0)
                    state->mergelast[srcTape] = 0;
                state->memtupindex[tupIndex] = state->mergefreelist;
                state->mergefreelist = tupIndex;
                tuplesort_heap_insert(state, newtup, srcTape, false);
                return tup;
            }
            return NULL;
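The three places that consume pre-read tuples (the FINALMERGE branch above, mergeonerun, and beginmerge) all repeat the same unlink-and-recycle steps on the memtupindex[] lists. A condensed sketch of that pattern follows (this helper does not exist in the file; it is shown only to make the list manipulation explicit):

    /* Sketch: pop the next pre-read tuple for srcTape, or NULL if none. */
    static void *
    next_preread_tuple(Tuplesortstate *state, int srcTape)
    {
        int     tupIndex = state->mergenext[srcTape];
        void   *tup;

        if (tupIndex == 0)
            return NULL;                    /* no pre-read tuple available */

        tup = state->memtuples[tupIndex];
        /* advance the per-tape list; 0 means the list is now empty */
        state->mergenext[srcTape] = state->memtupindex[tupIndex];
        if (state->mergenext[srcTape] == 0)
            state->mergelast[srcTape] = 0;
        /* recycle the memtuples[] slot onto the freelist */
        state->memtupindex[tupIndex] = state->mergefreelist;
        state->mergefreelist = tupIndex;
        return tup;
    }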
@@ -704,18 +761,31 @@ tuplesort_gettuple(Tuplesortstate *state, bool forward,
static void
inittapes(Tuplesortstate *state)
{
    int         ntuples,
                j;

    state->tapeset = LogicalTapeSetCreate(MAXTAPES);

    /*
     * Allocate the memtupindex array, same size as memtuples.
     */
    state->memtupindex = (int *) palloc(state->memtupsize * sizeof(int));

    /*
     * Convert the unsorted contents of memtuples[] into a heap.
     * Each tuple is marked as belonging to run number zero.
     *
     * NOTE: we pass false for checkIndex since there's no point in
     * comparing indexes in this step, even though we do intend the
     * indexes to be part of the sort key...
     */
    ntuples = state->memtupcount;
    state->memtupcount = 0;     /* make the heap empty */
    for (j = 0; j < ntuples; j++)
        tuplesort_heap_insert(state, state->memtuples[j], 0, false);
    Assert(state->memtupcount == ntuples);

    state->currentRun = 0;

    /*
     * Initialize variables of Algorithm D (step D1).
@@ -733,8 +803,6 @@ inittapes(Tuplesortstate *state)
    state->Level = 1;
    state->destTape = 0;

    state->status = TSS_BUILDRUNS;
}
@@ -750,9 +818,6 @@ selectnewtape(Tuplesortstate *state)
    int         j;
    int         a;

    /* Step D3: advance j (destTape) */
    if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape+1])
    {
@@ -791,13 +856,13 @@ mergeruns(Tuplesortstate *state)
                svDummy;

    Assert(state->status == TSS_BUILDRUNS);
    Assert(state->memtupcount == 0);

    /*
     * If we produced only one initial run (quite likely if the total
     * data volume is between 1X and 2X SortMem), we can just use that
     * tape as the finished output, rather than doing a useless merge.
     */
    if (state->currentRun == 1)
    {
        state->result_tape = state->tp_tapenum[state->destTape];
        /* must freeze and rewind the finished output tape */
@@ -896,8 +961,10 @@ mergeonerun(Tuplesortstate *state)
{
    int         destTape = state->tp_tapenum[TAPERANGE];
    int         srcTape;
    int         tupIndex;
    void       *tup;
    long        priorAvail,
                spaceFreed;

    /*
     * Start the merge by loading one tuple from each active source tape
@@ -910,18 +977,34 @@ mergeonerun(Tuplesortstate *state)
     * writing it out, and replacing it with next tuple from same tape
     * (if there is another one).
     */
    while (state->memtupcount > 0)
    {
        /* write the tuple to destTape */
        priorAvail = state->availMem;
        srcTape = state->memtupindex[0];
        WRITETUP(state, destTape, state->memtuples[0]);
        /* writetup adjusted total free space, now fix per-tape space */
        spaceFreed = state->availMem - priorAvail;
        state->mergeavailmem[srcTape] += spaceFreed;
        /* compact the heap */
        tuplesort_heap_siftup(state, false);
        if ((tupIndex = state->mergenext[srcTape]) == 0)
        {
            /* out of preloaded data on this tape, try to read more */
            mergepreread(state);
            /* if still no data, we've reached end of run on this tape */
            if ((tupIndex = state->mergenext[srcTape]) == 0)
                continue;
        }
        /* pull next preread tuple from list, insert in heap */
        tup = state->memtuples[tupIndex];
        state->mergenext[srcTape] = state->memtupindex[tupIndex];
        if (state->mergenext[srcTape] == 0)
            state->mergelast[srcTape] = 0;
        state->memtupindex[tupIndex] = state->mergefreelist;
        state->mergefreelist = tupIndex;
        tuplesort_heap_insert(state, tup, srcTape, false);
    }

    /*
     * When the heap empties, we're done.  Write an end-of-run marker
     * on the output tape, and increment its count of real runs.
@@ -933,21 +1016,31 @@ mergeonerun(Tuplesortstate *state)
/*
 * beginmerge - initialize for a merge pass
 *
 * We decrease the counts of real and dummy runs for each tape, and mark
 * which tapes contain active input runs in mergeactive[].  Then, load
 * as many tuples as we can from each active input tape, and finally
 * fill the merge heap with the first tuple from each active tape.
 */
static void
beginmerge(Tuplesortstate *state)
{
    int         activeTapes;
    int         tapenum;
    int         srcTape;

    /* Heap should be empty here */
    Assert(state->memtupcount == 0);

    /* Clear merge-pass state variables */
    memset(state->mergeactive, 0, sizeof(state->mergeactive));
    memset(state->mergenext, 0, sizeof(state->mergenext));
    memset(state->mergelast, 0, sizeof(state->mergelast));
    memset(state->mergeavailmem, 0, sizeof(state->mergeavailmem));
    state->mergefreelist = 0;       /* nothing in the freelist */
    state->mergefirstfree = MAXTAPES;   /* first slot available for preread */

    /* Adjust run counts and mark the active tapes */
    activeTapes = 0;
    for (tapenum = 0; tapenum < TAPERANGE; tapenum++)
    {
        if (state->tp_dummy[tapenum] > 0)
@@ -959,34 +1052,135 @@ beginmerge(Tuplesortstate *state)
            Assert(state->tp_runs[tapenum] > 0);
            state->tp_runs[tapenum]--;
            srcTape = state->tp_tapenum[tapenum];
            state->mergeactive[srcTape] = true;
            activeTapes++;
        }
    }
/*
* Initialize space allocation to let each active input tape have
* an equal share of preread space.
*/
Assert(activeTapes > 0);
state->spacePerTape = state->availMem / activeTapes;
for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
{
if (state->mergeactive[srcTape])
state->mergeavailmem[srcTape] = state->spacePerTape;
}
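    /*
     * Worked example (hypothetical numbers, not taken from the source): if
     * availMem is about 6 MB at this point and five input tapes are active,
     * spacePerTape comes out near 1.2 MB, so each preread cycle tries to
     * pull roughly 1.2 MB of tuples sequentially from every active tape.
     */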
/*
* Preread as many tuples as possible (and at least one) from each
* active tape
*/
mergepreread(state);
/* Load the merge heap with the first tuple from each input tape */
for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
{
int tupIndex = state->mergenext[srcTape];
void *tup;
if (tupIndex)
{
tup = state->memtuples[tupIndex];
state->mergenext[srcTape] = state->memtupindex[tupIndex];
if (state->mergenext[srcTape] == 0)
state->mergelast[srcTape] = 0;
state->memtupindex[tupIndex] = state->mergefreelist;
state->mergefreelist = tupIndex;
tuplesort_heap_insert(state, tup, srcTape, false);
}
}
    }
}

/*
 * mergepreread - load tuples from merge input tapes
 *
 * This routine exists to improve sequentiality of reads during a merge pass,
 * as explained in the header comments of this file.  Load tuples from each
 * active source tape until the tape's run is exhausted or it has used up
 * its fair share of available memory.  In any case, we guarantee that there
 * is at least one preread tuple available from each unexhausted input tape.
 */
static void
mergepreread(Tuplesortstate *state)
{
    int         srcTape;
    unsigned int tuplen;
    void       *tup;
    int         tupIndex;
    long        priorAvail,
                spaceUsed;

    for (srcTape = 0; srcTape < MAXTAPES; srcTape++)
    {
        if (! state->mergeactive[srcTape])
            continue;
/*
* Skip reading from any tape that still has at least half
* of its target memory filled with tuples (threshold fraction
* may need adjustment?). This avoids reading just a few tuples
* when the incoming runs are not being consumed evenly.
*/
if (state->mergenext[srcTape] != 0 &&
state->mergeavailmem[srcTape] <= state->spacePerTape / 2)
continue;
/*
* Read tuples from this tape until it has used up its free memory,
* but ensure that we have at least one.
*/
priorAvail = state->availMem;
state->availMem = state->mergeavailmem[srcTape];
while (! LACKMEM(state) || state->mergenext[srcTape] == 0)
{
/* read next tuple, if any */
if ((tuplen = getlen(state, srcTape, true)) == 0)
{
state->mergeactive[srcTape] = false;
break;
}
tup = READTUP(state, srcTape, tuplen);
/* find or make a free slot in memtuples[] for it */
tupIndex = state->mergefreelist;
if (tupIndex)
state->mergefreelist = state->memtupindex[tupIndex];
else
{
tupIndex = state->mergefirstfree++;
/* Might need to enlarge arrays! */
if (tupIndex >= state->memtupsize)
{
state->memtupsize *= 2;
state->memtuples = (void **)
repalloc(state->memtuples,
state->memtupsize * sizeof(void *));
state->memtupindex = (int *)
repalloc(state->memtupindex,
state->memtupsize * sizeof(int));
}
}
/* store tuple, append to list for its tape */
state->memtuples[tupIndex] = tup;
state->memtupindex[tupIndex] = 0;
if (state->mergelast[srcTape])
state->memtupindex[state->mergelast[srcTape]] = tupIndex;
else
state->mergenext[srcTape] = tupIndex;
state->mergelast[srcTape] = tupIndex;
}
/* update per-tape and global availmem counts */
spaceUsed = state->mergeavailmem[srcTape] - state->availMem;
state->mergeavailmem[srcTape] = state->availMem;
state->availMem = priorAvail - spaceUsed;
}
}
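/*
 * Summary of the merge-pass flow implied by the routines above (informal
 * note, not code from the file): beginmerge() marks the active tapes,
 * divides availMem into spacePerTape shares, calls mergepreread() to fill
 * memory, and loads the heap with one tuple per tape; mergeonerun() then
 * repeatedly writes the heap top to the output tape, calling mergepreread()
 * again whenever some tape's preread list runs dry.
 */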
/*
 * dumptuples - remove tuples from heap and write to tape
 *
 * This is used during initial-run building, but not during merging.
 *
 * When alltuples = false, dump only enough tuples to get under the
 * availMem limit (and leave at least one tuple in the heap in any case,
 * since puttuple assumes it always has a tuple to compare to).
@@ -994,37 +1188,42 @@ beginrun(Tuplesortstate *state)
 * When alltuples = true, dump everything currently in memory.
 * (This case is only used at end of input data.)
 *
 * If we empty the heap, close out the current run and return (this should
 * only happen at end of input data).  If we see that the tuple run number
 * at the top of the heap has changed, start a new run.
 */
static void
dumptuples(Tuplesortstate *state, bool alltuples)
{
    while (alltuples ||
           (LACKMEM(state) && state->memtupcount > 1))
    {
        /*
         * Dump the heap's frontmost entry, and sift up to remove it
         * from the heap.
         */
        Assert(state->memtupcount > 0);
        WRITETUP(state, state->tp_tapenum[state->destTape],
                 state->memtuples[0]);
        tuplesort_heap_siftup(state, true);

        /*
         * If the heap is empty *or* top run number has changed,
         * we've finished the current run.
         */
        if (state->memtupcount == 0 ||
            state->currentRun != state->memtupindex[0])
        {
            markrunend(state, state->tp_tapenum[state->destTape]);
            state->currentRun++;
            state->tp_runs[state->destTape]++;
            state->tp_dummy[state->destTape]--; /* per Alg D step D2 */

            /*
             * Done if heap is empty, else prepare for new run.
             */
            if (state->memtupcount == 0)
                break;
            Assert(state->currentRun == state->memtupindex[0]);
            selectnewtape(state);
        }
    }
}
@@ -1119,88 +1318,102 @@ tuplesort_restorepos(Tuplesortstate *state)
/*
 * Heap manipulation routines, per Knuth's Algorithm 5.2.3H.
 *
 * The heap lives in state->memtuples[], with parallel data storage
 * for indexes in state->memtupindex[].  If checkIndex is true, use
 * the tuple index as the front of the sort key; otherwise, no.
 */
#define HEAPCOMPARE(tup1,index1,tup2,index2) \
(checkIndex && (index1 != index2) ? index1 - index2 : \
COMPARETUP(state, tup1, tup2))
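/*
 * Worked example (hypothetical values): during run building, checkIndex is
 * true.  If the heap top belongs to run 0 and an incoming tuple has been
 * assigned run 1, HEAPCOMPARE yields 1 - 0 = 1 without calling COMPARETUP,
 * so the run-1 tuple sorts after every remaining run-0 tuple regardless of
 * its key.  During merging, checkIndex is false and the index slot holds a
 * tape number instead, so only the keys are compared.
 */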
/*
 * Insert a new tuple into an empty or existing heap, maintaining the
 * heap invariant.
 */
static void
tuplesort_heap_insert(Tuplesortstate *state, void *tuple,
                      int tupleindex, bool checkIndex)
{
    void      **memtuples;
    int        *memtupindex;
    int         j;

    /*
     * Make sure memtuples[] can handle another entry.
     */
    if (state->memtupcount >= state->memtupsize)
    {
        state->memtupsize *= 2;
        state->memtuples = (void **)
            repalloc(state->memtuples,
                     state->memtupsize * sizeof(void *));
        state->memtupindex = (int *)
            repalloc(state->memtupindex,
                     state->memtupsize * sizeof(int));
    }
    memtuples = state->memtuples;
    memtupindex = state->memtupindex;

    /*
     * Sift-up the new entry, per Knuth 5.2.3 exercise 16.
     * Note that Knuth is using 1-based array indexes, not 0-based.
     */
    j = state->memtupcount++;
    while (j > 0)
    {
        int     i = (j-1) >> 1;

        if (HEAPCOMPARE(tuple, tupleindex,
                        memtuples[i], memtupindex[i]) >= 0)
            break;
        memtuples[j] = memtuples[i];
        memtupindex[j] = memtupindex[i];
        j = i;
    }
    memtuples[j] = tuple;
    memtupindex[j] = tupleindex;
}

/*
 * The tuple at state->memtuples[0] has been removed from the heap.
 * Decrement memtupcount, and sift up to maintain the heap invariant.
 */
static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex)
{
    void      **memtuples = state->memtuples;
    int        *memtupindex = state->memtupindex;
    void       *tuple;
    int         tupindex,
                i,
                n;

    if (--state->memtupcount <= 0)
        return;
    n = state->memtupcount;
    tuple = memtuples[n];       /* tuple that must be reinserted */
    tupindex = memtupindex[n];
    i = 0;                      /* i is where the "hole" is */
    for (;;)
    {
        int     j = 2*i + 1;

        if (j >= n)
            break;
        if (j+1 < n &&
            HEAPCOMPARE(memtuples[j], memtupindex[j],
                        memtuples[j+1], memtupindex[j+1]) > 0)
            j++;
        if (HEAPCOMPARE(tuple, tupindex,
                        memtuples[j], memtupindex[j]) <= 0)
            break;
        memtuples[i] = memtuples[j];
        memtupindex[i] = memtupindex[j];
        i = j;
    }
    memtuples[i] = tuple;
    memtupindex[i] = tupindex;
}
@@ -1252,6 +1465,7 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
{
    HeapTuple   ltup = (HeapTuple) a;
    HeapTuple   rtup = (HeapTuple) b;
    TupleDesc   tupDesc = state->tupDesc;
    int         nkey;

    for (nkey = 0; nkey < state->nKeys; nkey++)
@@ -1265,11 +1479,11 @@ comparetup_heap(Tuplesortstate *state, const void *a, const void *b)
        lattr = heap_getattr(ltup,
                             scanKey->sk_attno,
                             tupDesc,
                             &isnull1);
        rattr = heap_getattr(rtup,
                             scanKey->sk_attno,
                             tupDesc,
                             &isnull2);

        if (isnull1)
        {
@@ -1351,6 +1565,14 @@ readtup_heap(Tuplesortstate *state, int tapenum, unsigned int len)
    return (void *) tuple;
}
static unsigned int
tuplesize_heap(Tuplesortstate *state, void *tup)
{
HeapTuple tuple = (HeapTuple) tup;
return HEAPTUPLESIZE + tuple->t_len;
}
/*
 * Routines specialized for IndexTuple case
@@ -1368,16 +1590,17 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
    IndexTuple  rtup = (IndexTuple) b;
    TupleDesc   itdesc = state->indexRel->rd_att;
    bool        equal_isnull = false;
    int         i;

    for (i = 1; i <= itdesc->natts; i++)
    {
        Datum       lattr,
                    rattr;
        bool        isnull1,
                    isnull2;

        lattr = index_getattr(ltup, i, itdesc, &isnull1);
        rattr = index_getattr(rtup, i, itdesc, &isnull2);

        if (isnull1)
        {
@@ -1389,11 +1612,11 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
        else if (isnull2)
            return -1;

        if (_bt_invokestrat(state->indexRel, i,
                            BTGreaterStrategyNumber,
                            lattr, rattr))
            return 1;
        if (_bt_invokestrat(state->indexRel, i,
                            BTGreaterStrategyNumber,
                            rattr, lattr))
            return -1;
@@ -1463,3 +1686,12 @@ readtup_index(Tuplesortstate *state, int tapenum, unsigned int len)
        elog(ERROR, "tuplesort: unexpected end of data");
    return (void *) tuple;
}
static unsigned int
tuplesize_index(Tuplesortstate *state, void *tup)
{
IndexTuple tuple = (IndexTuple) tup;
unsigned int tuplen = IndexTupleSize(tuple);
return tuplen;
}