Commit 3cda10f4 authored by Heikki Linnakangas

Use atomic ops to hand out pages to scan in parallel scan.

With a lot of CPUs, the spinlock that protects the current scan location
in a parallel scan can become a bottleneck. Use an atomic fetch-and-add
instruction instead.

David Rowley

Discussion: https://www.postgresql.org/message-id/CAKJS1f9tgsPhqBcoPjv9_KUPZvTLCZ4jy%3DB%3DbhqgaKn7cYzm-w@mail.gmail.com
parent 0c504a80
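
The idea is easiest to see in isolation. Below is a minimal, self-contained sketch (not taken from the commit) of the allocation scheme it introduces: workers claim blocks by atomically incrementing a shared counter and mapping the result onto block numbers, instead of advancing a current-block field under a spinlock. The real patch does this with pg_atomic_fetch_add_u64() on phs_nallocated, as the diff below shows; the C11 atomics and the ScanState, scan_next_block and INVALID_BLOCK names here are illustrative stand-ins, not PostgreSQL APIs.

/*
 * Sketch of handing out scan pages with an atomic fetch-and-add.
 * C11 <stdatomic.h> stands in for PostgreSQL's port/atomics.h.
 */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define INVALID_BLOCK ((uint32_t) 0xFFFFFFFF)

typedef struct ScanState
{
    uint32_t    nblocks;            /* number of blocks in the relation */
    uint32_t    startblock;         /* where the (possibly synchronized) scan starts */
    _Atomic uint64_t nallocated;    /* blocks handed out so far; 64 bits so the
                                     * overshoot past nblocks cannot wrap */
} ScanState;

/* Hand out the next block, or INVALID_BLOCK once the whole relation is claimed. */
static uint32_t
scan_next_block(ScanState *scan)
{
    uint64_t    nallocated = atomic_fetch_add(&scan->nallocated, 1);

    if (nallocated >= scan->nblocks)
        return INVALID_BLOCK;       /* every block has already been handed out */

    /* Map the counter onto block numbers, wrapping around at nblocks. */
    return (uint32_t) ((nallocated + scan->startblock) % scan->nblocks);
}

int
main(void)
{
    ScanState   scan = {.nblocks = 5, .startblock = 3, .nallocated = 0};
    uint32_t    blk;

    /* A single "worker" draining the scan prints: 3 4 0 1 2 */
    while ((blk = scan_next_block(&scan)) != INVALID_BLOCK)
        printf("%u ", blk);
    printf("\n");
    return 0;
}

Because each call does one lock-free fetch-and-add, many workers can claim pages concurrently without contending on a spinlock; only the one-time start-block initialization still needs the mutex.
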
@@ -58,6 +58,7 @@
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "port/atomics.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #include "storage/lmgr.h"
@@ -89,6 +90,7 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
                    bool is_bitmapscan,
                    bool is_samplescan,
                    bool temp_snap);
+static void heap_parallelscan_startblock_init(HeapScanDesc scan);
 static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
                    TransactionId xid, CommandId cid, int options);
@@ -510,6 +512,8 @@ heapgettup(HeapScanDesc scan,
        }
        if (scan->rs_parallel != NULL)
        {
+           heap_parallelscan_startblock_init(scan);
+
            page = heap_parallelscan_nextpage(scan);

            /* Other processes might have already finished the scan. */
@@ -812,6 +816,8 @@ heapgettup_pagemode(HeapScanDesc scan,
        }
        if (scan->rs_parallel != NULL)
        {
+           heap_parallelscan_startblock_init(scan);
+
            page = heap_parallelscan_nextpage(scan);

            /* Other processes might have already finished the scan. */
@@ -1535,14 +1541,10 @@ heap_rescan(HeapScanDesc scan,
        /*
         * Caller is responsible for making sure that all workers have
-        * finished the scan before calling this, so it really shouldn't be
-        * necessary to acquire the mutex at all.  We acquire it anyway, just
-        * to be tidy.
+        * finished the scan before calling this.
         */
        parallel_scan = scan->rs_parallel;
-       SpinLockAcquire(&parallel_scan->phs_mutex);
-       parallel_scan->phs_cblock = parallel_scan->phs_startblock;
-       SpinLockRelease(&parallel_scan->phs_mutex);
+       pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
    }
 }
@@ -1635,8 +1637,8 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
        !RelationUsesLocalBuffers(relation) &&
        target->phs_nblocks > NBuffers / 4;
    SpinLockInit(&target->phs_mutex);
-   target->phs_cblock = InvalidBlockNumber;
    target->phs_startblock = InvalidBlockNumber;
+   pg_atomic_write_u64(&target->phs_nallocated, 0);
    SerializeSnapshot(snapshot, target->phs_snapshot_data);
 }
@@ -1660,20 +1662,17 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
 }

 /* ----------------
- *     heap_parallelscan_nextpage - get the next page to scan
+ *     heap_parallelscan_startblock_init - find and set the scan's startblock
  *
- *     Get the next page to scan.  Even if there are no pages left to scan,
- *     another backend could have grabbed a page to scan and not yet finished
- *     looking at it, so it doesn't follow that the scan is done when the
- *     first backend gets an InvalidBlockNumber return.
+ *     Determine where the parallel seq scan should start.  This function may
+ *     be called many times, once by each parallel worker.  We must be careful
+ *     only to set the startblock once.
  * ----------------
  */
-static BlockNumber
-heap_parallelscan_nextpage(HeapScanDesc scan)
+static void
+heap_parallelscan_startblock_init(HeapScanDesc scan)
 {
-   BlockNumber page = InvalidBlockNumber;
    BlockNumber sync_startpage = InvalidBlockNumber;
-   BlockNumber report_page = InvalidBlockNumber;
    ParallelHeapScanDesc parallel_scan;

    Assert(scan->rs_parallel);
@@ -1705,46 +1704,63 @@ retry:
            sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
            goto retry;
        }
-       parallel_scan->phs_cblock = parallel_scan->phs_startblock;
    }
+   SpinLockRelease(&parallel_scan->phs_mutex);
+}

-   /*
-    * The current block number is the next one that needs to be scanned,
-    * unless it's InvalidBlockNumber already, in which case there are no more
-    * blocks to scan.  After remembering the current value, we must advance
-    * it so that the next call to this function returns the next block to be
-    * scanned.
-    */
-   page = parallel_scan->phs_cblock;
-   if (page != InvalidBlockNumber)
-   {
-       parallel_scan->phs_cblock++;
-       if (parallel_scan->phs_cblock >= scan->rs_nblocks)
-           parallel_scan->phs_cblock = 0;
-       if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
-       {
-           parallel_scan->phs_cblock = InvalidBlockNumber;
-           report_page = parallel_scan->phs_startblock;
-       }
-   }
-
-   /* Release the lock. */
-   SpinLockRelease(&parallel_scan->phs_mutex);
+/* ----------------
+ *     heap_parallelscan_nextpage - get the next page to scan
+ *
+ *     Get the next page to scan.  Even if there are no pages left to scan,
+ *     another backend could have grabbed a page to scan and not yet finished
+ *     looking at it, so it doesn't follow that the scan is done when the
+ *     first backend gets an InvalidBlockNumber return.
+ * ----------------
+ */
+static BlockNumber
+heap_parallelscan_nextpage(HeapScanDesc scan)
+{
+   BlockNumber page;
+   ParallelHeapScanDesc parallel_scan;
+   uint64      nallocated;
+
+   Assert(scan->rs_parallel);
+   parallel_scan = scan->rs_parallel;
+
+   /*
+    * phs_nallocated tracks how many pages have been allocated to workers
+    * already.  When phs_nallocated >= rs_nblocks, all blocks have been
+    * allocated.
+    *
+    * Because we use an atomic fetch-and-add to fetch the current value, the
+    * phs_nallocated counter will exceed rs_nblocks, because workers will
+    * still increment the value, when they try to allocate the next block but
+    * all blocks have been allocated already.  The counter must be 64 bits
+    * wide because of that, to avoid wrapping around when rs_nblocks is close
+    * to 2^32.
+    *
+    * The actual page to return is calculated by adding the counter to the
+    * starting block number, modulo nblocks.
+    */
+   nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
+   if (nallocated >= scan->rs_nblocks)
+       page = InvalidBlockNumber;  /* all blocks have been allocated */
+   else
+       page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;

    /*
     * Report scan location.  Normally, we report the current page number.
     * When we reach the end of the scan, though, we report the starting page,
     * not the ending page, just so the starting positions for later scans
     * doesn't slew backwards.  We only report the position at the end of the
-    * scan once, though: subsequent callers will have report nothing, since
-    * they will have page == InvalidBlockNumber.
+    * scan once, though: subsequent callers will report nothing.
     */
    if (scan->rs_syncscan)
    {
-       if (report_page == InvalidBlockNumber)
-           report_page = page;
-       if (report_page != InvalidBlockNumber)
-           ss_report_location(scan->rs_rd, report_page);
+       if (page != InvalidBlockNumber)
+           ss_report_location(scan->rs_rd, page);
+       else if (nallocated == scan->rs_nblocks)
+           ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
    }

    return page;
@@ -35,9 +35,10 @@ typedef struct ParallelHeapScanDescData
    Oid         phs_relid;      /* OID of relation to scan */
    bool        phs_syncscan;   /* report location to syncscan logic? */
    BlockNumber phs_nblocks;    /* # blocks in relation at start of scan */
-   slock_t     phs_mutex;      /* mutual exclusion for block number fields */
+   slock_t     phs_mutex;      /* mutual exclusion for setting startblock */
    BlockNumber phs_startblock; /* starting block number */
-   BlockNumber phs_cblock;     /* current block number */
+   pg_atomic_uint64 phs_nallocated;    /* number of blocks allocated to
+                                        * workers so far. */
    char        phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
 } ParallelHeapScanDescData;
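
One detail worth illustrating is the code comment above about the counter width: every call does a fetch-and-add even after the scan is exhausted, so the counter keeps growing past rs_nblocks, and a counter no wider than the block count could eventually wrap and appear to be back inside the scan. The scaled-down sketch below is not PostgreSQL code; an 8-bit counter stands in for a hypothetical 32-bit phs_nallocated, and 250 blocks stand in for a relation close to 2^32 blocks.

#include <stdint.h>
#include <stdio.h>

int
main(void)
{
    uint8_t     narrow_counter = 0; /* same width class as the block count */
    uint64_t    wide_counter = 0;   /* what the patch uses; never wraps in practice */
    uint32_t    nblocks = 250;      /* stand-in for a relation near the counter's limit */

    /* Simulate workers that keep asking for pages after the scan is done. */
    for (uint64_t call = 0; call < 400; call++)
    {
        uint8_t     n8 = narrow_counter++;  /* wraps back to 0 after 255 */
        uint64_t    n64 = wide_counter++;   /* keeps growing */

        if (n8 < nblocks && n64 >= nblocks)
        {
            /* The narrow counter looks "in range" again: blocks would be re-issued. */
            printf("call %llu: narrow counter would re-issue block %u\n",
                   (unsigned long long) call, (unsigned) n8);
            break;
        }
    }
    return 0;
}
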