Commit 24598337 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Implement binary heap replace-top operation in a smarter way.

In external sort's merge phase, we maintain a binary heap holding the next
tuple from each input tape. On each step, the topmost tuple is returned,
and replaced with the next tuple from the same tape. We were doing the
replacement by deleting the top node in one operation, and inserting the
next tuple after that. However, you can do a "replace-top" operation more
efficiently, in one "sift-up". A deletion will always walk the heap from
top to bottom, but in a replacement, we can stop as soon as we find the
right place for the new tuple. This is particularly helpful, if the tapes
are not in completely random order, so that the next tuple from a tape is
likely to land near the top of the heap.

Peter Geoghegan, reviewed by Claudio Freire, with some editing by me.

Discussion: <CAM3SWZRhBhiknTF_=NjDSnNZ11hx=U_SEYwbc5vd=x7M4mMiCw@mail.gmail.com>
parent f2717c79
...@@ -69,25 +69,25 @@ ...@@ -69,25 +69,25 @@
* using Algorithm D. * using Algorithm D.
* *
* When merging runs, we use a heap containing just the frontmost tuple from * When merging runs, we use a heap containing just the frontmost tuple from
* each source run; we repeatedly output the smallest tuple and insert the * each source run; we repeatedly output the smallest tuple and replace it
* next tuple from its source tape (if any). When the heap empties, the merge * with the next tuple from its source tape (if any). When the heap empties,
* is complete. The basic merge algorithm thus needs very little memory --- * the merge is complete. The basic merge algorithm thus needs very little
* only M tuples for an M-way merge, and M is constrained to a small number. * memory --- only M tuples for an M-way merge, and M is constrained to a
* However, we can still make good use of our full workMem allocation by * small number. However, we can still make good use of our full workMem
* pre-reading additional tuples from each source tape. Without prereading, * allocation by pre-reading additional tuples from each source tape. Without
* our access pattern to the temporary file would be very erratic; on average * prereading, our access pattern to the temporary file would be very erratic;
* we'd read one block from each of M source tapes during the same time that * on average we'd read one block from each of M source tapes during the same
* we're writing M blocks to the output tape, so there is no sequentiality of * time that we're writing M blocks to the output tape, so there is no
* access at all, defeating the read-ahead methods used by most Unix kernels. * sequentiality of access at all, defeating the read-ahead methods used by
* Worse, the output tape gets written into a very random sequence of blocks * most Unix kernels. Worse, the output tape gets written into a very random
* of the temp file, ensuring that things will be even worse when it comes * sequence of blocks of the temp file, ensuring that things will be even
* time to read that tape. A straightforward merge pass thus ends up doing a * worse when it comes time to read that tape. A straightforward merge pass
* lot of waiting for disk seeks. We can improve matters by prereading from * thus ends up doing a lot of waiting for disk seeks. We can improve matters
* each source tape sequentially, loading about workMem/M bytes from each tape * by prereading from each source tape sequentially, loading about workMem/M
* in turn. Then we run the merge algorithm, writing but not reading until * bytes from each tape in turn. Then we run the merge algorithm, writing but
* one of the preloaded tuple series runs out. Then we switch back to preread * not reading until one of the preloaded tuple series runs out. Then we
* mode, fill memory again, and repeat. This approach helps to localize both * switch back to preread mode, fill memory again, and repeat. This approach
* read and write accesses. * helps to localize both read and write accesses.
* *
* When the caller requests random access to the sort result, we form * When the caller requests random access to the sort result, we form
* the final sorted run on a logical tape which is then "frozen", so * the final sorted run on a logical tape which is then "frozen", so
...@@ -569,8 +569,10 @@ static void make_bounded_heap(Tuplesortstate *state); ...@@ -569,8 +569,10 @@ static void make_bounded_heap(Tuplesortstate *state);
static void sort_bounded_heap(Tuplesortstate *state); static void sort_bounded_heap(Tuplesortstate *state);
static void tuplesort_sort_memtuples(Tuplesortstate *state); static void tuplesort_sort_memtuples(Tuplesortstate *state);
static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple, static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex); bool checkIndex);
static void tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex); static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
bool checkIndex);
static void tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex);
static void reversedirection(Tuplesortstate *state); static void reversedirection(Tuplesortstate *state);
static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
static void markrunend(Tuplesortstate *state, int tapenum); static void markrunend(Tuplesortstate *state, int tapenum);
...@@ -1617,10 +1619,10 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple) ...@@ -1617,10 +1619,10 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
} }
else else
{ {
/* discard top of heap, sift up, insert new tuple */ /* discard top of heap, replacing it with the new tuple */
free_sort_tuple(state, &state->memtuples[0]); free_sort_tuple(state, &state->memtuples[0]);
tuplesort_heap_siftup(state, false); tuple->tupindex = 0; /* not used */
tuplesort_heap_insert(state, tuple, 0, false); tuplesort_heap_replace_top(state, tuple, false);
} }
break; break;
...@@ -1665,7 +1667,8 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple) ...@@ -1665,7 +1667,8 @@ puttuple_common(Tuplesortstate *state, SortTuple *tuple)
* initial COMPARETUP() call is required for the tuple, to * initial COMPARETUP() call is required for the tuple, to
* determine that the tuple does not belong in RUN_FIRST). * determine that the tuple does not belong in RUN_FIRST).
*/ */
tuplesort_heap_insert(state, tuple, state->currentRun, true); tuple->tupindex = state->currentRun;
tuplesort_heap_insert(state, tuple, true);
} }
else else
{ {
...@@ -1987,7 +1990,6 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, ...@@ -1987,7 +1990,6 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
* more generally. * more generally.
*/ */
*stup = state->memtuples[0]; *stup = state->memtuples[0];
tuplesort_heap_siftup(state, false);
if ((tupIndex = state->mergenext[srcTape]) == 0) if ((tupIndex = state->mergenext[srcTape]) == 0)
{ {
/* /*
...@@ -2008,18 +2010,25 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, ...@@ -2008,18 +2010,25 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
*/ */
if ((tupIndex = state->mergenext[srcTape]) == 0) if ((tupIndex = state->mergenext[srcTape]) == 0)
{ {
/* Remove the top node from the heap */
tuplesort_heap_delete_top(state, false);
/* Free tape's buffer, avoiding dangling pointer */ /* Free tape's buffer, avoiding dangling pointer */
if (state->batchUsed) if (state->batchUsed)
mergebatchfreetape(state, srcTape, stup, should_free); mergebatchfreetape(state, srcTape, stup, should_free);
return true; return true;
} }
} }
/* pull next preread tuple from list, insert in heap */
/*
* pull next preread tuple from list, and replace the returned
* tuple at top of the heap with it.
*/
newtup = &state->memtuples[tupIndex]; newtup = &state->memtuples[tupIndex];
state->mergenext[srcTape] = newtup->tupindex; state->mergenext[srcTape] = newtup->tupindex;
if (state->mergenext[srcTape] == 0) if (state->mergenext[srcTape] == 0)
state->mergelast[srcTape] = 0; state->mergelast[srcTape] = 0;
tuplesort_heap_insert(state, newtup, srcTape, false); newtup->tupindex = srcTape;
tuplesort_heap_replace_top(state, newtup, false);
/* put the now-unused memtuples entry on the freelist */ /* put the now-unused memtuples entry on the freelist */
newtup->tupindex = state->mergefreelist; newtup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex; state->mergefreelist = tupIndex;
...@@ -2394,7 +2403,8 @@ inittapes(Tuplesortstate *state) ...@@ -2394,7 +2403,8 @@ inittapes(Tuplesortstate *state)
/* Must copy source tuple to avoid possible overwrite */ /* Must copy source tuple to avoid possible overwrite */
SortTuple stup = state->memtuples[j]; SortTuple stup = state->memtuples[j];
tuplesort_heap_insert(state, &stup, 0, false); stup.tupindex = RUN_FIRST;
tuplesort_heap_insert(state, &stup, false);
} }
Assert(state->memtupcount == ntuples); Assert(state->memtupcount == ntuples);
} }
...@@ -2642,22 +2652,29 @@ mergeonerun(Tuplesortstate *state) ...@@ -2642,22 +2652,29 @@ mergeonerun(Tuplesortstate *state)
/* writetup adjusted total free space, now fix per-tape space */ /* writetup adjusted total free space, now fix per-tape space */
spaceFreed = state->availMem - priorAvail; spaceFreed = state->availMem - priorAvail;
state->mergeavailmem[srcTape] += spaceFreed; state->mergeavailmem[srcTape] += spaceFreed;
/* compact the heap */
tuplesort_heap_siftup(state, false);
if ((tupIndex = state->mergenext[srcTape]) == 0) if ((tupIndex = state->mergenext[srcTape]) == 0)
{ {
/* out of preloaded data on this tape, try to read more */ /* out of preloaded data on this tape, try to read more */
mergepreread(state); mergepreread(state);
/* if still no data, we've reached end of run on this tape */ /* if still no data, we've reached end of run on this tape */
if ((tupIndex = state->mergenext[srcTape]) == 0) if ((tupIndex = state->mergenext[srcTape]) == 0)
{
/* remove the written-out tuple from the heap */
tuplesort_heap_delete_top(state, false);
continue; continue;
} }
/* pull next preread tuple from list, insert in heap */ }
/*
* pull next preread tuple from list, and replace the written-out
* tuple in the heap with it.
*/
tup = &state->memtuples[tupIndex]; tup = &state->memtuples[tupIndex];
state->mergenext[srcTape] = tup->tupindex; state->mergenext[srcTape] = tup->tupindex;
if (state->mergenext[srcTape] == 0) if (state->mergenext[srcTape] == 0)
state->mergelast[srcTape] = 0; state->mergelast[srcTape] = 0;
tuplesort_heap_insert(state, tup, srcTape, false); tup->tupindex = srcTape;
tuplesort_heap_replace_top(state, tup, false);
/* put the now-unused memtuples entry on the freelist */ /* put the now-unused memtuples entry on the freelist */
tup->tupindex = state->mergefreelist; tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex; state->mergefreelist = tupIndex;
...@@ -2793,7 +2810,8 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch) ...@@ -2793,7 +2810,8 @@ beginmerge(Tuplesortstate *state, bool finalMergeBatch)
state->mergenext[srcTape] = tup->tupindex; state->mergenext[srcTape] = tup->tupindex;
if (state->mergenext[srcTape] == 0) if (state->mergenext[srcTape] == 0)
state->mergelast[srcTape] = 0; state->mergelast[srcTape] = 0;
tuplesort_heap_insert(state, tup, srcTape, false); tup->tupindex = srcTape;
tuplesort_heap_insert(state, tup, false);
/* put the now-unused memtuples entry on the freelist */ /* put the now-unused memtuples entry on the freelist */
tup->tupindex = state->mergefreelist; tup->tupindex = state->mergefreelist;
state->mergefreelist = tupIndex; state->mergefreelist = tupIndex;
...@@ -3275,13 +3293,12 @@ dumptuples(Tuplesortstate *state, bool alltuples) ...@@ -3275,13 +3293,12 @@ dumptuples(Tuplesortstate *state, bool alltuples)
* Still holding out for a case favorable to replacement * Still holding out for a case favorable to replacement
* selection. Still incrementally spilling using heap. * selection. Still incrementally spilling using heap.
* *
* Dump the heap's frontmost entry, and sift up to remove it from * Dump the heap's frontmost entry, and remove it from the heap.
* the heap.
*/ */
Assert(state->memtupcount > 0); Assert(state->memtupcount > 0);
WRITETUP(state, state->tp_tapenum[state->destTape], WRITETUP(state, state->tp_tapenum[state->destTape],
&state->memtuples[0]); &state->memtuples[0]);
tuplesort_heap_siftup(state, true); tuplesort_heap_delete_top(state, true);
} }
else else
{ {
...@@ -3644,27 +3661,29 @@ make_bounded_heap(Tuplesortstate *state) ...@@ -3644,27 +3661,29 @@ make_bounded_heap(Tuplesortstate *state)
state->memtupcount = 0; /* make the heap empty */ state->memtupcount = 0; /* make the heap empty */
for (i = 0; i < tupcount; i++) for (i = 0; i < tupcount; i++)
{ {
if (state->memtupcount >= state->bound && if (state->memtupcount < state->bound)
COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
{
/* New tuple would just get thrown out, so skip it */
free_sort_tuple(state, &state->memtuples[i]);
CHECK_FOR_INTERRUPTS();
}
else
{ {
/* Insert next tuple into heap */ /* Insert next tuple into heap */
/* Must copy source tuple to avoid possible overwrite */ /* Must copy source tuple to avoid possible overwrite */
SortTuple stup = state->memtuples[i]; SortTuple stup = state->memtuples[i];
tuplesort_heap_insert(state, &stup, 0, false); stup.tupindex = 0; /* not used */
tuplesort_heap_insert(state, &stup, false);
/* If heap too full, discard largest entry */ }
if (state->memtupcount > state->bound) else
{ {
free_sort_tuple(state, &state->memtuples[0]); /*
tuplesort_heap_siftup(state, false); * The heap is full. Replace the largest entry with the new
* tuple, or just discard it, if it's larger than anything already
* in the heap.
*/
if (COMPARETUP(state, &state->memtuples[i], &state->memtuples[0]) <= 0)
{
free_sort_tuple(state, &state->memtuples[i]);
CHECK_FOR_INTERRUPTS();
} }
else
tuplesort_heap_replace_top(state, &state->memtuples[i], false);
} }
} }
...@@ -3685,16 +3704,16 @@ sort_bounded_heap(Tuplesortstate *state) ...@@ -3685,16 +3704,16 @@ sort_bounded_heap(Tuplesortstate *state)
Assert(tupcount == state->bound); Assert(tupcount == state->bound);
/* /*
* We can unheapify in place because each sift-up will remove the largest * We can unheapify in place because each delete-top call will remove the
* entry, which we can promptly store in the newly freed slot at the end. * largest entry, which we can promptly store in the newly freed slot at
* Once we're down to a single-entry heap, we're done. * the end. Once we're down to a single-entry heap, we're done.
*/ */
while (state->memtupcount > 1) while (state->memtupcount > 1)
{ {
SortTuple stup = state->memtuples[0]; SortTuple stup = state->memtuples[0];
/* this sifts-up the next-largest entry and decreases memtupcount */ /* this sifts-up the next-largest entry and decreases memtupcount */
tuplesort_heap_siftup(state, false); tuplesort_heap_delete_top(state, false);
state->memtuples[state->memtupcount] = stup; state->memtuples[state->memtupcount] = stup;
} }
state->memtupcount = tupcount; state->memtupcount = tupcount;
...@@ -3738,30 +3757,21 @@ tuplesort_sort_memtuples(Tuplesortstate *state) ...@@ -3738,30 +3757,21 @@ tuplesort_sort_memtuples(Tuplesortstate *state)
* Insert a new tuple into an empty or existing heap, maintaining the * Insert a new tuple into an empty or existing heap, maintaining the
* heap invariant. Caller is responsible for ensuring there's room. * heap invariant. Caller is responsible for ensuring there's room.
* *
* Note: we assume *tuple is a temporary variable that can be scribbled on. * Note: For some callers, tuple points to a memtuples[] entry above the
* For some callers, tuple actually points to a memtuples[] entry above the
* end of the heap. This is safe as long as it's not immediately adjacent * end of the heap. This is safe as long as it's not immediately adjacent
* to the end of the heap (ie, in the [memtupcount] array entry) --- if it * to the end of the heap (ie, in the [memtupcount] array entry) --- if it
* is, it might get overwritten before being moved into the heap! * is, it might get overwritten before being moved into the heap!
*/ */
static void static void
tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple, tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
int tupleindex, bool checkIndex) bool checkIndex)
{ {
SortTuple *memtuples; SortTuple *memtuples;
int j; int j;
/*
* Save the tupleindex --- see notes above about writing on *tuple. It's a
* historical artifact that tupleindex is passed as a separate argument
* and not in *tuple, but it's notationally convenient so let's leave it
* that way.
*/
tuple->tupindex = tupleindex;
memtuples = state->memtuples; memtuples = state->memtuples;
Assert(state->memtupcount < state->memtupsize); Assert(state->memtupcount < state->memtupsize);
Assert(!checkIndex || tupleindex == RUN_FIRST); Assert(!checkIndex || tuple->tupindex == RUN_FIRST);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
...@@ -3783,25 +3793,51 @@ tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple, ...@@ -3783,25 +3793,51 @@ tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple,
} }
/* /*
* The tuple at state->memtuples[0] has been removed from the heap. * Remove the tuple at state->memtuples[0] from the heap. Decrement
* Decrement memtupcount, and sift up to maintain the heap invariant. * memtupcount, and sift up to maintain the heap invariant.
*
* The caller has already free'd the tuple the top node points to,
* if necessary.
*/ */
static void static void
tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex) tuplesort_heap_delete_top(Tuplesortstate *state, bool checkIndex)
{ {
SortTuple *memtuples = state->memtuples; SortTuple *memtuples = state->memtuples;
SortTuple *tuple; SortTuple *tuple;
int i,
n;
Assert(!checkIndex || state->currentRun == RUN_FIRST); Assert(!checkIndex || state->currentRun == RUN_FIRST);
if (--state->memtupcount <= 0) if (--state->memtupcount <= 0)
return; return;
/*
* Remove the last tuple in the heap, and re-insert it, by replacing the
* current top node with it.
*/
tuple = &memtuples[state->memtupcount];
tuplesort_heap_replace_top(state, tuple, checkIndex);
}
/*
* Replace the tuple at state->memtuples[0] with a new tuple. Sift up to
* maintain the heap invariant.
*
* This corresponds to Knuth's "sift-up" algorithm (Algorithm 5.2.3H,
* Heapsort, steps H3-H8).
*/
static void
tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple,
bool checkIndex)
{
SortTuple *memtuples = state->memtuples;
int i,
n;
Assert(!checkIndex || state->currentRun == RUN_FIRST);
Assert(state->memtupcount >= 1);
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
n = state->memtupcount; n = state->memtupcount;
tuple = &memtuples[n]; /* tuple that must be reinserted */
i = 0; /* i is where the "hole" is */ i = 0; /* i is where the "hole" is */
for (;;) for (;;)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment