Commit ee7ca559 authored by Robert Haas's avatar Robert Haas

Add a C API for parallel heap scans.

Using this API, one backend can set up a ParallelHeapScanDesc to
which multiple backends can then attach.  Each tuple in the relation
will be returned to exactly one of the scanning backends.  Only
forward scans are supported, and rescans must be carefully
coordinated.

This is not exposed to the planner or executor yet.

The original version of this code was written by me.  Amit Kapila
reviewed it, tested it, and improved it, including adding support for
synchronized scans, per review comments from Jeff Davis.  Extensive
testing of this and related patches was performed by Haribabu Kommi.
Final cleanup of this patch by me.
parent b0b0d84b
......@@ -63,6 +63,7 @@
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "storage/spin.h"
#include "storage/standby.h"
#include "utils/datum.h"
#include "utils/inval.h"
......@@ -80,12 +81,14 @@ bool synchronize_seqscans = true;
static HeapScanDesc heap_beginscan_internal(Relation relation,
Snapshot snapshot,
int nkeys, ScanKey key,
ParallelHeapScanDesc parallel_scan,
bool allow_strat,
bool allow_sync,
bool allow_pagemode,
bool is_bitmapscan,
bool is_samplescan,
bool temp_snap);
static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
......@@ -226,7 +229,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* results for a non-MVCC snapshot, the caller must hold some higher-level
* lock that ensures the interesting tuple(s) won't change.)
*/
scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
if (scan->rs_parallel != NULL)
scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
else
scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
/*
* If the table is large relative to NBuffers, use a bulk-read access
......@@ -237,7 +243,8 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* behaviors, independently of the size of the table; also there is a GUC
* variable that can disable synchronized scanning.)
*
* During a rescan, don't make a new strategy object if we don't have to.
* Note that heap_parallelscan_initialize has a very similar test; if you
* change this, consider changing that one, too.
*/
if (!RelationUsesLocalBuffers(scan->rs_rd) &&
scan->rs_nblocks > NBuffers / 4)
......@@ -250,6 +257,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
if (allow_strat)
{
/* During a rescan, keep the previous strategy object. */
if (scan->rs_strategy == NULL)
scan->rs_strategy = GetAccessStrategy(BAS_BULKREAD);
}
......@@ -260,7 +268,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
scan->rs_strategy = NULL;
}
if (keep_startblock)
if (scan->rs_parallel != NULL)
{
/* For parallel scan, believe whatever ParallelHeapScanDesc says. */
scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
}
else if (keep_startblock)
{
/*
* When rescanning, we want to keep the previous startblock setting,
......@@ -496,7 +509,20 @@ heapgettup(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
page = scan->rs_startblock; /* first page */
if (scan->rs_parallel != NULL)
{
page = heap_parallelscan_nextpage(scan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
}
else
page = scan->rs_startblock; /* first page */
heapgetpage(scan, page);
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
......@@ -519,6 +545,9 @@ heapgettup(HeapScanDesc scan,
}
else if (backward)
{
/* backward parallel scan not supported */
Assert(scan->rs_parallel == NULL);
if (!scan->rs_inited)
{
/*
......@@ -669,6 +698,11 @@ heapgettup(HeapScanDesc scan,
page = scan->rs_nblocks;
page--;
}
else if (scan->rs_parallel != NULL)
{
page = heap_parallelscan_nextpage(scan);
finished = (page == InvalidBlockNumber);
}
else
{
page++;
......@@ -773,7 +807,20 @@ heapgettup_pagemode(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
page = scan->rs_startblock; /* first page */
if (scan->rs_parallel != NULL)
{
page = heap_parallelscan_nextpage(scan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
{
Assert(!BufferIsValid(scan->rs_cbuf));
tuple->t_data = NULL;
return;
}
}
else
page = scan->rs_startblock; /* first page */
heapgetpage(scan, page);
lineindex = 0;
scan->rs_inited = true;
......@@ -793,6 +840,9 @@ heapgettup_pagemode(HeapScanDesc scan,
}
else if (backward)
{
/* backward parallel scan not supported */
Assert(scan->rs_parallel == NULL);
if (!scan->rs_inited)
{
/*
......@@ -932,6 +982,11 @@ heapgettup_pagemode(HeapScanDesc scan,
page = scan->rs_nblocks;
page--;
}
else if (scan->rs_parallel != NULL)
{
page = heap_parallelscan_nextpage(scan);
finished = (page == InvalidBlockNumber);
}
else
{
page++;
......@@ -1341,7 +1396,7 @@ HeapScanDesc
heap_beginscan(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key)
{
return heap_beginscan_internal(relation, snapshot, nkeys, key,
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, false);
}
......@@ -1351,7 +1406,7 @@ heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
Oid relid = RelationGetRelid(relation);
Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
return heap_beginscan_internal(relation, snapshot, nkeys, key,
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
true, true, true, false, false, true);
}
......@@ -1360,7 +1415,7 @@ heap_beginscan_strat(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync)
{
return heap_beginscan_internal(relation, snapshot, nkeys, key,
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
allow_strat, allow_sync, true,
false, false, false);
}
......@@ -1369,7 +1424,7 @@ HeapScanDesc
heap_beginscan_bm(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key)
{
return heap_beginscan_internal(relation, snapshot, nkeys, key,
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
false, false, true, true, false, false);
}
......@@ -1378,7 +1433,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
bool allow_strat, bool allow_sync, bool allow_pagemode)
{
return heap_beginscan_internal(relation, snapshot, nkeys, key,
return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
allow_strat, allow_sync, allow_pagemode,
false, true, false);
}
......@@ -1386,6 +1441,7 @@ heap_beginscan_sampling(Relation relation, Snapshot snapshot,
static HeapScanDesc
heap_beginscan_internal(Relation relation, Snapshot snapshot,
int nkeys, ScanKey key,
ParallelHeapScanDesc parallel_scan,
bool allow_strat,
bool allow_sync,
bool allow_pagemode,
......@@ -1418,6 +1474,7 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
scan->rs_allow_strat = allow_strat;
scan->rs_allow_sync = allow_sync;
scan->rs_temp_snap = temp_snap;
scan->rs_parallel = parallel_scan;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
......@@ -1473,6 +1530,25 @@ heap_rescan(HeapScanDesc scan,
* reinitialize scan descriptor
*/
initscan(scan, key, true);
/*
* reset parallel scan, if present
*/
if (scan->rs_parallel != NULL)
{
ParallelHeapScanDesc parallel_scan;
/*
* Caller is responsible for making sure that all workers have
* finished the scan before calling this, so it really shouldn't be
* necessary to acquire the mutex at all. We acquire it anyway, just
* to be tidy.
*/
parallel_scan = scan->rs_parallel;
SpinLockAcquire(&parallel_scan->phs_mutex);
parallel_scan->phs_cblock = parallel_scan->phs_startblock;
SpinLockRelease(&parallel_scan->phs_mutex);
}
}
/* ----------------
......@@ -1531,6 +1607,154 @@ heap_endscan(HeapScanDesc scan)
pfree(scan);
}
/* ----------------
* heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
*
* Sadly, this doesn't reduce to a constant, because the size required
* to serialize the snapshot can vary.
* ----------------
*/
Size
heap_parallelscan_estimate(Snapshot snapshot)
{
return add_size(offsetof(ParallelHeapScanDescData, phs_snapshot_data),
EstimateSnapshotSpace(snapshot));
}
/* ----------------
* heap_parallelscan_initialize - initialize ParallelHeapScanDesc
*
* Must allow as many bytes of shared memory as returned by
* heap_parallelscan_estimate. Call this just once in the leader
* process; then, individual workers attach via heap_beginscan_parallel.
* ----------------
*/
void
heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
Snapshot snapshot)
{
target->phs_relid = RelationGetRelid(relation);
target->phs_nblocks = RelationGetNumberOfBlocks(relation);
/* compare phs_syncscan initialization to similar logic in initscan */
target->phs_syncscan = synchronize_seqscans &&
!RelationUsesLocalBuffers(relation) &&
target->phs_nblocks > NBuffers / 4;
SpinLockInit(&target->phs_mutex);
target->phs_cblock = InvalidBlockNumber;
target->phs_startblock = InvalidBlockNumber;
SerializeSnapshot(snapshot, target->phs_snapshot_data);
}
/* ----------------
* heap_beginscan_parallel - join a parallel scan
*
* Caller must hold a suitable lock on the correct relation.
* ----------------
*/
HeapScanDesc
heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
{
Snapshot snapshot;
Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
RegisterSnapshot(snapshot);
return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
true, true, true, false, false, true);
}
/* ----------------
* heap_parallelscan_nextpage - get the next page to scan
*
* Get the next page to scan. Even if there are no pages left to scan,
* another backend could have grabbed a page to scan and not yet finished
* looking at it, so it doesn't follow that the scan is done when the
* first backend gets an InvalidBlockNumber return.
* ----------------
*/
static BlockNumber
heap_parallelscan_nextpage(HeapScanDesc scan)
{
BlockNumber page = InvalidBlockNumber;
BlockNumber sync_startpage = InvalidBlockNumber;
BlockNumber report_page = InvalidBlockNumber;
ParallelHeapScanDesc parallel_scan;
Assert(scan->rs_parallel);
parallel_scan = scan->rs_parallel;
retry:
/* Grab the spinlock. */
SpinLockAcquire(&parallel_scan->phs_mutex);
/*
* If the scan's startblock has not yet been initialized, we must do so
* now. If this is not a synchronized scan, we just start at block 0, but
* if it is a synchronized scan, we must get the starting position from
* the synchronized scan machinery. We can't hold the spinlock while
* doing that, though, so release the spinlock, get the information we
* need, and retry. If nobody else has initialized the scan in the
* meantime, we'll fill in the value we fetched on the second time
* through.
*/
if (parallel_scan->phs_startblock == InvalidBlockNumber)
{
if (!parallel_scan->phs_syncscan)
parallel_scan->phs_startblock = 0;
else if (sync_startpage != InvalidBlockNumber)
parallel_scan->phs_startblock = sync_startpage;
else
{
SpinLockRelease(&parallel_scan->phs_mutex);
sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
goto retry;
}
parallel_scan->phs_cblock = parallel_scan->phs_startblock;
}
/*
* The current block number is the next one that needs to be scanned,
* unless it's InvalidBlockNumber already, in which case there are no more
* blocks to scan. After remembering the current value, we must advance
* it so that the next call to this function returns the next block to be
* scanned.
*/
page = parallel_scan->phs_cblock;
if (page != InvalidBlockNumber)
{
parallel_scan->phs_cblock++;
if (parallel_scan->phs_cblock >= scan->rs_nblocks)
parallel_scan->phs_cblock = 0;
if (parallel_scan->phs_cblock == parallel_scan->phs_startblock)
{
parallel_scan->phs_cblock = InvalidBlockNumber;
report_page = parallel_scan->phs_startblock;
}
}
/* Release the lock. */
SpinLockRelease(&parallel_scan->phs_mutex);
/*
* Report scan location. Normally, we report the current page number.
* When we reach the end of the scan, though, we report the starting page,
* not the ending page, just so the starting positions for later scans
* doesn't slew backwards. We only report the position at the end of the
* scan once, though: subsequent callers will have report nothing, since
* they will have page == InvalidBlockNumber.
*/
if (scan->rs_syncscan)
{
if (report_page == InvalidBlockNumber)
report_page = page;
if (report_page != InvalidBlockNumber)
ss_report_location(scan->rs_rd, report_page);
}
return page;
}
/* ----------------
* heap_getnext - retrieve next tuple in scan
*
......
......@@ -96,8 +96,9 @@ extern Relation heap_openrv_extended(const RangeVar *relation,
#define heap_close(r,l) relation_close(r,l)
/* struct definition appears in relscan.h */
/* struct definitions appear in relscan.h */
typedef struct HeapScanDescData *HeapScanDesc;
typedef struct ParallelHeapScanDescData *ParallelHeapScanDesc;
/*
* HeapScanIsValid
......@@ -126,6 +127,11 @@ extern void heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
extern void heap_endscan(HeapScanDesc scan);
extern HeapTuple heap_getnext(HeapScanDesc scan, ScanDirection direction);
extern Size heap_parallelscan_estimate(Snapshot snapshot);
extern void heap_parallelscan_initialize(ParallelHeapScanDesc target,
Relation relation, Snapshot snapshot);
extern HeapScanDesc heap_beginscan_parallel(Relation, ParallelHeapScanDesc);
extern bool heap_fetch(Relation relation, Snapshot snapshot,
HeapTuple tuple, Buffer *userbuf, bool keep_buf,
Relation stats_relation);
......
......@@ -20,6 +20,25 @@
#include "access/itup.h"
#include "access/tupdesc.h"
/*
* Shared state for parallel heap scan.
*
* Each backend participating in a parallel heap scan has its own
* HeapScanDesc in backend-private memory, and those objects all contain
* a pointer to this structure. The information here must be sufficient
* to properly initialize each new HeapScanDesc as workers join the scan,
* and it must act as a font of block numbers for those workers.
*/
typedef struct ParallelHeapScanDescData
{
Oid phs_relid; /* OID of relation to scan */
bool phs_syncscan; /* report location to syncscan logic? */
BlockNumber phs_nblocks; /* # blocks in relation at start of scan */
slock_t phs_mutex; /* mutual exclusion for block number fields */
BlockNumber phs_startblock; /* starting block number */
BlockNumber phs_cblock; /* current block number */
char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
} ParallelHeapScanDescData;
typedef struct HeapScanDescData
{
......@@ -49,6 +68,7 @@ typedef struct HeapScanDescData
BlockNumber rs_cblock; /* current block # in scan, if any */
Buffer rs_cbuf; /* current buffer in scan, if any */
/* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */
ParallelHeapScanDesc rs_parallel; /* parallel scan information */
/* these fields only used in page-at-a-time mode and for bitmap scans */
int rs_cindex; /* current tuple's index in vistuples */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment