Commit 56788d21 authored by David Rowley's avatar David Rowley

Allocate consecutive blocks during parallel seqscans

Previously we would allocate blocks to parallel workers during a parallel
sequential scan 1 block at a time.  Since other workers were likely to
request a block before a worker returns for another block number to work
on, this could lead to non-sequential I/O patterns in each worker which
could cause the operating system's readahead to perform poorly or not at
all.

Here we change things so that we allocate consecutive "chunks" of blocks
to workers and have them work on those until they're done, at which time
we allocate another chunk for the worker.  The size of these chunks is
based on the size of the relation.

Initial patch here was by Thomas Munro which showed some good improvements
just having a fixed chunk size of 64 blocks with a simple ramp-down near
the end of the scan. The revisions of the patch to make the chunk size
based on the relation size and the adjusted ramp-down in powers of two was
done by me, along with quite extensive benchmarking to determine the
optimal chunk sizes.

For the most part, benchmarks have shown significant performance
improvements for large parallel sequential scans on Linux, FreeBSD and
Windows using SSDs.  It's less clear how this affects the performance of
cloud providers.  Tests done so far are unable to obtain stable enough
performance to provide meaningful benchmark results.  It is possible that
this could cause some performance regressions on more obscure filesystems,
so we may need to later provide users with some ability to get something
closer to the old behavior.  For now, let's leave that until we see that
it's really required.

Author: Thomas Munro, David Rowley
Reviewed-by: Ranier Vilela, Soumyadeep Chakraborty, Robert Haas
Reviewed-by: Amit Kapila, Kirk Jamison
Discussion: https://postgr.es/m/CA+hUKGJ_EErDv41YycXcbMbCBkztA34+z1ts9VQH+ACRuvpxig@mail.gmail.com
parent 11a68e4b
......@@ -520,12 +520,14 @@ heapgettup(HeapScanDesc scan,
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
ParallelBlockTableScanWorker pbscanwork =
(ParallelBlockTableScanWorker) scan->rs_base.rs_private;
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
pbscan);
pbscanwork, pbscan);
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
pbscan);
pbscanwork, pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
......@@ -720,9 +722,11 @@ heapgettup(HeapScanDesc scan,
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
ParallelBlockTableScanWorker pbscanwork =
(ParallelBlockTableScanWorker) scan->rs_base.rs_private;
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
pbscan);
pbscanwork, pbscan);
finished = (page == InvalidBlockNumber);
}
else
......@@ -834,12 +838,14 @@ heapgettup_pagemode(HeapScanDesc scan,
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
ParallelBlockTableScanWorker pbscanwork =
(ParallelBlockTableScanWorker) scan->rs_base.rs_private;
table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
pbscan);
pbscanwork, pbscan);
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
pbscan);
pbscanwork, pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
......@@ -1019,9 +1025,11 @@ heapgettup_pagemode(HeapScanDesc scan,
{
ParallelBlockTableScanDesc pbscan =
(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
ParallelBlockTableScanWorker pbscanwork =
(ParallelBlockTableScanWorker) scan->rs_base.rs_private;
page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
pbscan);
pbscanwork, pbscan);
finished = (page == InvalidBlockNumber);
}
else
......@@ -1155,6 +1163,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
scan->rs_base.rs_nkeys = nkeys;
scan->rs_base.rs_flags = flags;
scan->rs_base.rs_parallel = parallel_scan;
scan->rs_base.rs_private =
palloc(sizeof(ParallelBlockTableScanWorkerData));
scan->rs_strategy = NULL; /* set in initscan */
/*
......
......@@ -25,10 +25,24 @@
#include "access/tableam.h"
#include "access/xact.h"
#include "optimizer/plancat.h"
#include "port/pg_bitutils.h"
#include "storage/bufmgr.h"
#include "storage/shmem.h"
#include "storage/smgr.h"
/*
* Constants to control the behavior of block allocation to parallel workers
* during a parallel seqscan. Technically these values do not need to be
* powers of 2, but having them as powers of 2 makes the math more optimal
* and makes the ramp-down stepping more even.
*/
/* The number of I/O chunks we try to break a parallel seqscan down into */
#define PARALLEL_SEQSCAN_NCHUNKS 2048
/* Ramp down size of allocations when we've only this number of chunks left */
#define PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS 64
/* Cap the size of parallel I/O chunks to this number of blocks */
#define PARALLEL_SEQSCAN_MAX_CHUNK_SIZE 8192
/* GUC variables */
char *default_table_access_method = DEFAULT_TABLE_ACCESS_METHOD;
......@@ -408,10 +422,37 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
* to set the startblock once.
*/
void
table_block_parallelscan_startblock_init(Relation rel, ParallelBlockTableScanDesc pbscan)
table_block_parallelscan_startblock_init(Relation rel,
ParallelBlockTableScanWorker pbscanwork,
ParallelBlockTableScanDesc pbscan)
{
BlockNumber sync_startpage = InvalidBlockNumber;
/* Reset the state we use for controlling allocation size. */
memset(pbscanwork, 0, sizeof(*pbscanwork));
StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
"pg_nextpower2_32 may be too small for non-standard BlockNumber width");
/*
* We determine the chunk size based on the size of the relation. First we
* split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks but we then
* take the next highest power of 2 number of the chunk size. This means
* we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS
* and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
*/
pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
PARALLEL_SEQSCAN_NCHUNKS, 1));
/*
* Ensure we don't go over the maximum chunk size with larger tables. This
* means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
* tables. Too large a chunk size has been shown to be detrimental to
* synchronous scan performance.
*/
pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
retry:
/* Grab the spinlock. */
SpinLockAcquire(&pbscan->phs_mutex);
......@@ -451,13 +492,40 @@ retry:
* backend gets an InvalidBlockNumber return.
*/
BlockNumber
table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbscan)
table_block_parallelscan_nextpage(Relation rel,
ParallelBlockTableScanWorker pbscanwork,
ParallelBlockTableScanDesc pbscan)
{
BlockNumber page;
uint64 nallocated;
/*
* phs_nallocated tracks how many pages have been allocated to workers
* The logic below allocates block numbers out to parallel workers in a
* way that each worker will receive a set of consecutive block numbers to
* scan. Earlier versions of this would allocate the next highest block
* number to the next worker to call this function. This would generally
* result in workers never receiving consecutive block numbers. Some
* operating systems would not detect the sequential I/O pattern due to
* each backend being a different process which could result in poor
* performance due to inefficient or no readahead. To work around this
* issue, we now allocate a range of block numbers for each worker and
* when they come back for another block, we give them the next one in
* that range until the range is complete. When the worker completes the
* range of blocks we then allocate another range for it and return the
* first block number from that range.
*
* Here we name these ranges of blocks "chunks". The initial size of
* these chunks is determined in table_block_parallelscan_startblock_init
* based on the size of the relation. Towards the end of the scan, we
* start making reductions in the size of the chunks in order to attempt
* to divide the remaining work over all the workers as evenly as
* possible.
*
* Here pbscanwork is local worker memory. phsw_chunk_remaining tracks
* the number of blocks remaining in the chunk. When that reaches 0 then
* we must allocate a new chunk for the worker.
*
* phs_nallocated tracks how many blocks have been allocated to workers
* already. When phs_nallocated >= rs_nblocks, all blocks have been
* allocated.
*
......@@ -468,10 +536,50 @@ table_block_parallelscan_nextpage(Relation rel, ParallelBlockTableScanDesc pbsca
* wide because of that, to avoid wrapping around when rs_nblocks is close
* to 2^32.
*
* The actual page to return is calculated by adding the counter to the
* The actual block to return is calculated by adding the counter to the
* starting block number, modulo nblocks.
*/
nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);
/*
* First check if we have any remaining blocks in a previous chunk for
* this worker. We must consume all of the blocks from that before we
* allocate a new chunk to the worker.
*/
if (pbscanwork->phsw_chunk_remaining > 0)
{
/*
* Give them the next block in the range and update the remaining
* number of blocks.
*/
nallocated = ++pbscanwork->phsw_nallocated;
pbscanwork->phsw_chunk_remaining--;
}
else
{
/*
* When we've only got PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS chunks
* remaining in the scan, we half the chunk size. Since we reduce the
* chunk size here, we'll hit this again after doing
* PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS at the new size. After a few
* iterations of this, we'll end up doing the last few blocks with the
* chunk size set to 1.
*/
if (pbscanwork->phsw_chunk_size > 1 &&
pbscanwork->phsw_nallocated > pbscan->phs_nblocks -
(pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
pbscanwork->phsw_chunk_size >>= 1;
nallocated = pbscanwork->phsw_nallocated =
pg_atomic_fetch_add_u64(&pbscan->phs_nallocated,
pbscanwork->phsw_chunk_size);
/*
* Set the remaining number of blocks in this chunk so that subsequent
* calls from this worker continue on with this chunk until it's done.
*/
pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
}
if (nallocated >= pbscan->phs_nblocks)
page = InvalidBlockNumber; /* all blocks have been allocated */
else
......
......@@ -42,9 +42,9 @@ typedef struct TableScanDescData
*/
uint32 rs_flags;
void *rs_private; /* per-worker private memory for AM to use */
struct ParallelTableScanDescData *rs_parallel; /* parallel scan
* information */
} TableScanDescData;
typedef struct TableScanDescData *TableScanDesc;
......@@ -81,6 +81,18 @@ typedef struct ParallelBlockTableScanDescData
} ParallelBlockTableScanDescData;
typedef struct ParallelBlockTableScanDescData *ParallelBlockTableScanDesc;
/*
* Per backend state for parallel table scan, for block-oriented storage.
*/
typedef struct ParallelBlockTableScanWorkerData
{
uint64 phsw_nallocated; /* Current # of blocks into the scan */
uint32 phsw_chunk_remaining; /* # blocks left in this chunk */
uint32 phsw_chunk_size; /* The number of blocks to allocate in
* each I/O chunk for the scan */
} ParallelBlockTableScanWorkerData;
typedef struct ParallelBlockTableScanWorkerData *ParallelBlockTableScanWorker;
/*
* Base class for fetches from a table via an index. This is the base-class
* for such scans, which needs to be embedded in the respective struct for
......
......@@ -1793,8 +1793,10 @@ extern Size table_block_parallelscan_initialize(Relation rel,
extern void table_block_parallelscan_reinitialize(Relation rel,
ParallelTableScanDesc pscan);
extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
ParallelBlockTableScanWorker pbscanwork,
ParallelBlockTableScanDesc pbscan);
extern void table_block_parallelscan_startblock_init(Relation rel,
ParallelBlockTableScanWorker pbscanwork,
ParallelBlockTableScanDesc pbscan);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment