Commit e94568ec authored by Heikki Linnakangas's avatar Heikki Linnakangas

Change the way pre-reading in external sort's merge phase works.

Don't pre-read tuples into SortTuple slots during merge. Instead, use the
memory for larger read buffers in logtape.c. We're doing the same number
of READTUP() calls either way, but managing the pre-read SortTuple slots
is much more complicated. Also, the on-tape representation is more compact
than SortTuples, so we can fit more pre-read tuples into the same amount
of memory this way. And we have better cache-locality, when we use just a
small number of SortTuple slots.

Now that we only hold one tuple from each tape in the SortTuple slots, we
can greatly simplify the "batch memory" management. We now maintain a
small set of fixed-size slots to hold the tuples, and fall back to
palloc() for larger tuples. We use this method during all merge phases,
not just the final merge; it is also used when randomAccess is requested
and in the TSS_SORTEDONTAPE case. In other words, it's used whenever we
do an external sort.

Reviewed by Peter Geoghegan and Claudio Freire.

Discussion: <CAM3SWZTpaORV=yQGVCG8Q4axcZ3MvF-05xe39ZvORdU9JcD6hQ@mail.gmail.com>
parent e8bdee27
...@@ -52,12 +52,17 @@ ...@@ -52,12 +52,17 @@
* not clear this helps much, but it can't hurt. (XXX perhaps a LIFO * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
* policy for free blocks would be better?) * policy for free blocks would be better?)
* *
* To further make the I/Os more sequential, we can use a larger buffer
* when reading, and read multiple blocks from the same tape in one go,
* whenever the buffer becomes empty. LogicalTapeAssignReadBufferSize()
* can be used to set the size of the read buffer.
*
* To support the above policy of writing to the lowest free block, * To support the above policy of writing to the lowest free block,
* ltsGetFreeBlock sorts the list of free block numbers into decreasing * ltsGetFreeBlock sorts the list of free block numbers into decreasing
* order each time it is asked for a block and the list isn't currently * order each time it is asked for a block and the list isn't currently
* sorted. This is an efficient way to handle it because we expect cycles * sorted. This is an efficient way to handle it because we expect cycles
* of releasing many blocks followed by re-using many blocks, due to * of releasing many blocks followed by re-using many blocks, due to
* tuplesort.c's "preread" behavior. * the larger read buffer.
* *
* Since all the bookkeeping and buffer memory is allocated with palloc(), * Since all the bookkeeping and buffer memory is allocated with palloc(),
* and the underlying file(s) are made with OpenTemporaryFile, all resources * and the underlying file(s) are made with OpenTemporaryFile, all resources
...@@ -79,6 +84,7 @@ ...@@ -79,6 +84,7 @@
#include "storage/buffile.h" #include "storage/buffile.h"
#include "utils/logtape.h" #include "utils/logtape.h"
#include "utils/memutils.h"
/* /*
* Block indexes are "long"s, so we can fit this many per indirect block. * Block indexes are "long"s, so we can fit this many per indirect block.
...@@ -131,9 +137,18 @@ typedef struct LogicalTape ...@@ -131,9 +137,18 @@ typedef struct LogicalTape
* reading. * reading.
*/ */
char *buffer; /* physical buffer (separately palloc'd) */ char *buffer; /* physical buffer (separately palloc'd) */
int buffer_size; /* allocated size of the buffer */
long curBlockNumber; /* this block's logical blk# within tape */ long curBlockNumber; /* this block's logical blk# within tape */
int pos; /* next read/write position in buffer */ int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */ int nbytes; /* total # of valid bytes in buffer */
/*
* Desired buffer size to use when reading. To keep things simple, we use
* a single-block buffer when writing, or when reading a frozen tape. But
* when we are reading and will only read forwards, we allocate a larger
* buffer, determined by read_buffer_size.
*/
int read_buffer_size;
} LogicalTape; } LogicalTape;
/* /*
...@@ -227,6 +242,53 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer) ...@@ -227,6 +242,53 @@ ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
blocknum))); blocknum)));
} }
/*
 * Refill the tape's read buffer with as many consecutive blocks as fit.
 *
 * 'datablocknum' is the physical block to read first.  Pass -1 to have the
 * next block number looked up through the tape's indirect block
 * (lt->indirect) instead; in that case the caller must already have set
 * lt->curBlockNumber, because it is incremented here before each
 * looked-up block is read.
 *
 * The buffer is always restarted from scratch: lt->pos and lt->nbytes are
 * reset to zero, and lt->nbytes then grows by BLCKSZ for every full block
 * read, or by lt->lastBlockBytes for the tape's final block.  Blocks read
 * from a tape that is not frozen are released right after being read.
 *
 * Returns true if the buffer ended up holding any data, false on EOF.
 */
static bool
ltsReadFillBuffer(LogicalTapeSet *lts, LogicalTape *lt, long datablocknum)
{
	long		blkno = datablocknum;

	/* Start over with an empty buffer. */
	lt->nbytes = 0;
	lt->pos = 0;

	for (;;)
	{
		char	   *dest;

		/* Look up the next block number, unless we were handed one. */
		if (blkno == -1)
		{
			blkno = ltsRecallNextBlockNum(lts, lt->indirect, lt->frozen);
			if (blkno == -1L)
				break;			/* no more blocks: EOF */
			lt->curBlockNumber++;
		}

		/* Append the block right after the data already in the buffer. */
		dest = lt->buffer + lt->nbytes;
		ltsReadBlock(lts, blkno, (void *) dest);
		if (!lt->frozen)
			ltsReleaseBlock(lts, blkno);

		/* The last block of the tape may be only partially filled. */
		if (lt->curBlockNumber >= lt->numFullBlocks)
		{
			lt->nbytes += lt->lastBlockBytes;
			break;				/* EOF */
		}
		lt->nbytes += BLCKSZ;

		/* Stop once the buffer has no room left. */
		if (lt->nbytes >= lt->buffer_size)
			break;

		/* Any further block number must come from the indirect block. */
		blkno = -1;
	}

	return lt->nbytes > 0;
}
/* /*
* qsort comparator for sorting freeBlocks[] into decreasing order. * qsort comparator for sorting freeBlocks[] into decreasing order.
*/ */
...@@ -546,6 +608,8 @@ LogicalTapeSetCreate(int ntapes) ...@@ -546,6 +608,8 @@ LogicalTapeSetCreate(int ntapes)
lt->numFullBlocks = 0L; lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0; lt->lastBlockBytes = 0;
lt->buffer = NULL; lt->buffer = NULL;
lt->buffer_size = 0;
lt->read_buffer_size = BLCKSZ;
lt->curBlockNumber = 0L; lt->curBlockNumber = 0L;
lt->pos = 0; lt->pos = 0;
lt->nbytes = 0; lt->nbytes = 0;
...@@ -628,7 +692,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, ...@@ -628,7 +692,10 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
/* Allocate data buffer and first indirect block on first write */ /* Allocate data buffer and first indirect block on first write */
if (lt->buffer == NULL) if (lt->buffer == NULL)
{
lt->buffer = (char *) palloc(BLCKSZ); lt->buffer = (char *) palloc(BLCKSZ);
lt->buffer_size = BLCKSZ;
}
if (lt->indirect == NULL) if (lt->indirect == NULL)
{ {
lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock)); lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
...@@ -636,6 +703,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, ...@@ -636,6 +703,7 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
lt->indirect->nextup = NULL; lt->indirect->nextup = NULL;
} }
Assert(lt->buffer_size == BLCKSZ);
while (size > 0) while (size > 0)
{ {
if (lt->pos >= BLCKSZ) if (lt->pos >= BLCKSZ)
...@@ -709,18 +777,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) ...@@ -709,18 +777,19 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
Assert(lt->frozen); Assert(lt->frozen);
datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect); datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
} }
/* Allocate a read buffer */
if (lt->buffer)
pfree(lt->buffer);
lt->buffer = palloc(lt->read_buffer_size);
lt->buffer_size = lt->read_buffer_size;
/* Read the first block, or reset if tape is empty */ /* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L; lt->curBlockNumber = 0L;
lt->pos = 0; lt->pos = 0;
lt->nbytes = 0; lt->nbytes = 0;
if (datablocknum != -1L) if (datablocknum != -1L)
{ ltsReadFillBuffer(lts, lt, datablocknum);
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
if (!lt->frozen)
ltsReleaseBlock(lts, datablocknum);
lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
BLCKSZ : lt->lastBlockBytes;
}
} }
else else
{ {
...@@ -754,6 +823,13 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) ...@@ -754,6 +823,13 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
lt->curBlockNumber = 0L; lt->curBlockNumber = 0L;
lt->pos = 0; lt->pos = 0;
lt->nbytes = 0; lt->nbytes = 0;
if (lt->buffer)
{
pfree(lt->buffer);
lt->buffer = NULL;
lt->buffer_size = 0;
}
} }
} }
...@@ -779,20 +855,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum, ...@@ -779,20 +855,8 @@ LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
if (lt->pos >= lt->nbytes) if (lt->pos >= lt->nbytes)
{ {
/* Try to load more data into buffer. */ /* Try to load more data into buffer. */
long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, if (!ltsReadFillBuffer(lts, lt, -1))
lt->frozen);
if (datablocknum == -1L)
break; /* EOF */ break; /* EOF */
lt->curBlockNumber++;
lt->pos = 0;
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
if (!lt->frozen)
ltsReleaseBlock(lts, datablocknum);
lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
BLCKSZ : lt->lastBlockBytes;
if (lt->nbytes <= 0)
break; /* EOF (possible here?) */
} }
nthistime = lt->nbytes - lt->pos; nthistime = lt->nbytes - lt->pos;
...@@ -842,6 +906,22 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum) ...@@ -842,6 +906,22 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
lt->writing = false; lt->writing = false;
lt->frozen = true; lt->frozen = true;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true); datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
/*
* The seek and backspace functions assume a single block read buffer.
* That's OK with current usage. A larger buffer is helpful to make the
* read pattern of the backing file look more sequential to the OS, when
* we're reading from multiple tapes. But at the end of a sort, when a
* tape is frozen, we only read from a single tape anyway.
*/
if (!lt->buffer || lt->buffer_size != BLCKSZ)
{
if (lt->buffer)
pfree(lt->buffer);
lt->buffer = palloc(BLCKSZ);
lt->buffer_size = BLCKSZ;
}
/* Read the first block, or reset if tape is empty */ /* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L; lt->curBlockNumber = 0L;
lt->pos = 0; lt->pos = 0;
...@@ -875,6 +955,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size) ...@@ -875,6 +955,7 @@ LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
Assert(tapenum >= 0 && tapenum < lts->nTapes); Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum]; lt = &lts->tapes[tapenum];
Assert(lt->frozen); Assert(lt->frozen);
Assert(lt->buffer_size == BLCKSZ);
/* /*
* Easy case for seek within current block. * Easy case for seek within current block.
...@@ -941,6 +1022,7 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, ...@@ -941,6 +1022,7 @@ LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
lt = &lts->tapes[tapenum]; lt = &lts->tapes[tapenum];
Assert(lt->frozen); Assert(lt->frozen);
Assert(offset >= 0 && offset <= BLCKSZ); Assert(offset >= 0 && offset <= BLCKSZ);
Assert(lt->buffer_size == BLCKSZ);
/* /*
* Easy case for seek within current block. * Easy case for seek within current block.
...@@ -1002,6 +1084,10 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum, ...@@ -1002,6 +1084,10 @@ LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
Assert(tapenum >= 0 && tapenum < lts->nTapes); Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = &lts->tapes[tapenum]; lt = &lts->tapes[tapenum];
/* With a larger buffer, 'pos' wouldn't be the same as offset within page */
Assert(lt->buffer_size == BLCKSZ);
*blocknum = lt->curBlockNumber; *blocknum = lt->curBlockNumber;
*offset = lt->pos; *offset = lt->pos;
} }
...@@ -1014,3 +1100,28 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts) ...@@ -1014,3 +1100,28 @@ LogicalTapeSetBlocks(LogicalTapeSet *lts)
{ {
return lts->nFileBlocks; return lts->nFileBlocks;
} }
/*
 * Record the read buffer size to use for the given tape.
 *
 * 'avail_mem' is the amount of memory offered for this tape's read buffer.
 * The size actually stored in the tape's read_buffer_size is derived from
 * it as follows:
 *
 *	- raised to BLCKSZ if smaller, so there is always room for one block;
 *	- capped at MaxAllocSize, to avoid erroring out on an oversized
 *	  request (a multi-gigabyte buffer is unlikely to help anyway);
 *	- rounded down to a whole multiple of BLCKSZ.
 *
 * This function only stores the size; it does not allocate a buffer.
 */
void
LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum, size_t avail_mem)
{
	LogicalTape *tape;
	size_t		bufsize = avail_mem;

	Assert(tapenum >= 0 && tapenum < lts->nTapes);
	tape = &lts->tapes[tapenum];

	/* Clamp into the supported range. */
	if (bufsize < BLCKSZ)
		bufsize = BLCKSZ;
	if (bufsize > MaxAllocSize)
		bufsize = MaxAllocSize;

	/* Drop any partial trailing block. */
	bufsize = (bufsize / BLCKSZ) * BLCKSZ;

	tape->read_buffer_size = bufsize;
}
This diff is collapsed.
...@@ -39,6 +39,8 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, ...@@ -39,6 +39,8 @@ extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset); long blocknum, int offset);
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset); long *blocknum, int *offset);
extern void LogicalTapeAssignReadBufferSize(LogicalTapeSet *lts, int tapenum,
size_t bufsize);
extern long LogicalTapeSetBlocks(LogicalTapeSet *lts); extern long LogicalTapeSetBlocks(LogicalTapeSet *lts);
#endif /* LOGTAPE_H */ #endif /* LOGTAPE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment