Commit 7b4ac199 authored by Robert Haas's avatar Robert Haas

Extend index AM API for parallel index scans.

This patch doesn't actually make any index AM parallel-aware, but it
provides the necessary functions at the AM layer to do so.

Rahila Syed, Amit Kapila, Robert Haas
parent 587cda35
......@@ -138,6 +138,9 @@ blhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = blendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
......
......@@ -131,6 +131,11 @@ typedef struct IndexAmRoutine
amendscan_function amendscan;
ammarkpos_function ammarkpos; /* can be NULL */
amrestrpos_function amrestrpos; /* can be NULL */
/* interface functions to support parallel index scans */
amestimateparallelscan_function amestimateparallelscan; /* can be NULL */
aminitparallelscan_function aminitparallelscan; /* can be NULL */
amparallelrescan_function amparallelrescan; /* can be NULL */
} IndexAmRoutine;
</programlisting>
</para>
......@@ -624,6 +629,68 @@ amrestrpos (IndexScanDesc scan);
the <structfield>amrestrpos</> field in its <structname>IndexAmRoutine</>
struct may be set to NULL.
</para>
<para>
In addition to supporting ordinary index scans, some types of index
may wish to support <firstterm>parallel index scans</>, which allow
multiple backends to cooperate in performing an index scan. The
index access method should arrange things so that each cooperating
process returns a subset of the tuples that would be performed by
an ordinary, non-parallel index scan, but in such a way that the
union of those subsets is equal to the set of tuples that would be
returned by an ordinary, non-parallel index scan. Furthermore, while
there need not be any global ordering of tuples returned by a parallel
scan, the ordering of that subset of tuples returned within each
cooperating backend must match the requested ordering. The following
functions may be implemented to support parallel index scans:
</para>
<para>
<programlisting>
Size
amestimateparallelscan (void);
</programlisting>
Estimate and return the number of bytes of dynamic shared memory which
the access method will be needed to perform a parallel scan. (This number
is in addition to, not in lieu of, the amount of space needed for
AM-independent data in <structname>ParallelIndexScanDescData</>.)
</para>
<para>
It is not necessary to implement this function for access methods which
do not support parallel scans or for which the number of additional bytes
of storage required is zero.
</para>
<para>
<programlisting>
void
aminitparallelscan (void *target);
</programlisting>
This function will be called to initialize dynamic shared memory at the
beginning of a parallel scan. <parameter>target</> will point to at least
the number of bytes previously returned by
<function>amestimateparallelscan</>, and this function may use that
amount of space to store whatever data it wishes.
</para>
<para>
It is not necessary to implement this function for access methods which
do not support parallel scans or in cases where the shared memory space
required needs no initialization.
</para>
<para>
<programlisting>
void
amparallelrescan (IndexScanDesc scan);
</programlisting>
This function, if implemented, will be called when a parallel index scan
must be restarted. It should reset any shared state set up by
<function>aminitparallelscan</> such that the scan will be restarted from
the beginning.
</para>
</sect1>
<sect1 id="index-scanning">
......
......@@ -112,6 +112,9 @@ brinhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = brinendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
......
......@@ -68,6 +68,9 @@ ginhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = ginendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
......
......@@ -89,6 +89,9 @@ gisthandler(PG_FUNCTION_ARGS)
amroutine->amendscan = gistendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
......
......@@ -86,6 +86,9 @@ hashhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = hashendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
......
......@@ -20,6 +20,10 @@
* index_insert - insert an index tuple into a relation
* index_markpos - mark a scan position
* index_restrpos - restore a scan position
* index_parallelscan_estimate - estimate shared memory for parallel scan
* index_parallelscan_initialize - initialize parallel scan
* index_parallelrescan - (re)start a parallel scan of an index
* index_beginscan_parallel - join parallel index scan
* index_getnext_tid - get the next TID from a scan
* index_fetch_heap - get the scan's next heap tuple
* index_getnext - get the next heap tuple from a scan
......@@ -120,7 +124,8 @@ do { \
} while(0)
static IndexScanDesc index_beginscan_internal(Relation indexRelation,
int nkeys, int norderbys, Snapshot snapshot);
int nkeys, int norderbys, Snapshot snapshot,
ParallelIndexScanDesc pscan, bool temp_snap);
/* ----------------------------------------------------------------
......@@ -219,7 +224,7 @@ index_beginscan(Relation heapRelation,
{
IndexScanDesc scan;
scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot);
scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot, NULL, false);
/*
* Save additional parameters into the scandesc. Everything else was set
......@@ -244,7 +249,7 @@ index_beginscan_bitmap(Relation indexRelation,
{
IndexScanDesc scan;
scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot);
scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot, NULL, false);
/*
* Save additional parameters into the scandesc. Everything else was set
......@@ -260,8 +265,11 @@ index_beginscan_bitmap(Relation indexRelation,
*/
static IndexScanDesc
index_beginscan_internal(Relation indexRelation,
int nkeys, int norderbys, Snapshot snapshot)
int nkeys, int norderbys, Snapshot snapshot,
ParallelIndexScanDesc pscan, bool temp_snap)
{
IndexScanDesc scan;
RELATION_CHECKS;
CHECK_REL_PROCEDURE(ambeginscan);
......@@ -276,8 +284,13 @@ index_beginscan_internal(Relation indexRelation,
/*
* Tell the AM to open a scan.
*/
return indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
scan = indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
norderbys);
/* Initialize information for parallel scan. */
scan->parallel_scan = pscan;
scan->xs_temp_snap = temp_snap;
return scan;
}
/* ----------------
......@@ -341,6 +354,9 @@ index_endscan(IndexScanDesc scan)
/* Release index refcount acquired by index_beginscan */
RelationDecrementReferenceCount(scan->indexRelation);
if (scan->xs_temp_snap)
UnregisterSnapshot(scan->xs_snapshot);
/* Release the scan data structure itself */
IndexScanEnd(scan);
}
......@@ -389,6 +405,115 @@ index_restrpos(IndexScanDesc scan)
scan->indexRelation->rd_amroutine->amrestrpos(scan);
}
/*
* index_parallelscan_estimate - estimate shared memory for parallel scan
*
* Currently, we don't pass any information to the AM-specific estimator,
* so it can probably only return a constant. In the future, we might need
* to pass more information.
*/
Size
index_parallelscan_estimate(Relation indexRelation, Snapshot snapshot)
{
Size nbytes;
RELATION_CHECKS;
nbytes = offsetof(ParallelIndexScanDescData, ps_snapshot_data);
nbytes = add_size(nbytes, EstimateSnapshotSpace(snapshot));
nbytes = MAXALIGN(nbytes);
/*
* If amestimateparallelscan is not provided, assume there is no
* AM-specific data needed. (It's hard to believe that could work, but
* it's easy enough to cater to it here.)
*/
if (indexRelation->rd_amroutine->amestimateparallelscan != NULL)
nbytes = add_size(nbytes,
indexRelation->rd_amroutine->amestimateparallelscan());
return nbytes;
}
/*
* index_parallelscan_initialize - initialize parallel scan
*
* We initialize both the ParallelIndexScanDesc proper and the AM-specific
* information which follows it.
*
* This function calls access method specific initialization routine to
* initialize am specific information. Call this just once in the leader
* process; then, individual workers attach via index_beginscan_parallel.
*/
void
index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
Snapshot snapshot, ParallelIndexScanDesc target)
{
Size offset;
RELATION_CHECKS;
offset = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
EstimateSnapshotSpace(snapshot));
offset = MAXALIGN(offset);
target->ps_relid = RelationGetRelid(heapRelation);
target->ps_indexid = RelationGetRelid(indexRelation);
target->ps_offset = offset;
SerializeSnapshot(snapshot, target->ps_snapshot_data);
/* aminitparallelscan is optional; assume no-op if not provided by AM */
if (indexRelation->rd_amroutine->aminitparallelscan != NULL)
{
void *amtarget;
amtarget = OffsetToPointer(target, offset);
indexRelation->rd_amroutine->aminitparallelscan(amtarget);
}
}
/* ----------------
* index_parallelrescan - (re)start a parallel scan of an index
* ----------------
*/
void
index_parallelrescan(IndexScanDesc scan)
{
SCAN_CHECKS;
/* amparallelrescan is optional; assume no-op if not provided by AM */
if (scan->indexRelation->rd_amroutine->amparallelrescan != NULL)
scan->indexRelation->rd_amroutine->amparallelrescan(scan);
}
/*
* index_beginscan_parallel - join parallel index scan
*
* Caller must be holding suitable locks on the heap and the index.
*/
IndexScanDesc
index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
int norderbys, ParallelIndexScanDesc pscan)
{
Snapshot snapshot;
IndexScanDesc scan;
Assert(RelationGetRelid(heaprel) == pscan->ps_relid);
snapshot = RestoreSnapshot(pscan->ps_snapshot_data);
RegisterSnapshot(snapshot);
scan = index_beginscan_internal(indexrel, nkeys, norderbys, snapshot,
pscan, true);
/*
* Save additional parameters into the scandesc. Everything else was set
* up by index_beginscan_internal.
*/
scan->heapRelation = heaprel;
scan->xs_snapshot = snapshot;
return scan;
}
/* ----------------
* index_getnext_tid - get the next TID from a scan
*
......
......@@ -118,6 +118,9 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amendscan = btendscan;
amroutine->ammarkpos = btmarkpos;
amroutine->amrestrpos = btrestrpos;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
......
......@@ -68,6 +68,9 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amendscan = spgendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
amroutine->amestimateparallelscan = NULL;
amroutine->aminitparallelscan = NULL;
amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
......
......@@ -137,6 +137,18 @@ typedef void (*ammarkpos_function) (IndexScanDesc scan);
/* restore marked scan position */
typedef void (*amrestrpos_function) (IndexScanDesc scan);
/*
* Callback function signatures - for parallel index scans.
*/
/* estimate size of parallel scan descriptor */
typedef Size (*amestimateparallelscan_function) (void);
/* prepare for parallel index scan */
typedef void (*aminitparallelscan_function) (void *target);
/* (re)start parallel index scan */
typedef void (*amparallelrescan_function) (IndexScanDesc scan);
/*
* API struct for an index AM. Note this must be stored in a single palloc'd
......@@ -196,6 +208,11 @@ typedef struct IndexAmRoutine
amendscan_function amendscan;
ammarkpos_function ammarkpos; /* can be NULL */
amrestrpos_function amrestrpos; /* can be NULL */
/* interface functions to support parallel index scans */
amestimateparallelscan_function amestimateparallelscan; /* can be NULL */
aminitparallelscan_function aminitparallelscan; /* can be NULL */
amparallelrescan_function amparallelrescan; /* can be NULL */
} IndexAmRoutine;
......
......@@ -83,6 +83,8 @@ typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr, void *state);
typedef struct IndexScanDescData *IndexScanDesc;
typedef struct SysScanDescData *SysScanDesc;
typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc;
/*
* Enumeration specifying the type of uniqueness check to perform in
* index_insert().
......@@ -144,6 +146,13 @@ extern void index_rescan(IndexScanDesc scan,
extern void index_endscan(IndexScanDesc scan);
extern void index_markpos(IndexScanDesc scan);
extern void index_restrpos(IndexScanDesc scan);
extern Size index_parallelscan_estimate(Relation indexrel, Snapshot snapshot);
extern void index_parallelscan_initialize(Relation heaprel, Relation indexrel,
Snapshot snapshot, ParallelIndexScanDesc target);
extern void index_parallelrescan(IndexScanDesc scan);
extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
Relation indexrel, int nkeys, int norderbys,
ParallelIndexScanDesc pscan);
extern ItemPointer index_getnext_tid(IndexScanDesc scan,
ScanDirection direction);
extern HeapTuple index_fetch_heap(IndexScanDesc scan);
......
......@@ -93,6 +93,7 @@ typedef struct IndexScanDescData
ScanKey keyData; /* array of index qualifier descriptors */
ScanKey orderByData; /* array of ordering op descriptors */
bool xs_want_itup; /* caller requests index tuples */
bool xs_temp_snap; /* unregister snapshot at scan end? */
/* signaling to index AM about killing index tuples */
bool kill_prior_tuple; /* last-returned tuple is dead */
......@@ -126,8 +127,20 @@ typedef struct IndexScanDescData
/* state data for traversing HOT chains in index_getnext */
bool xs_continue_hot; /* T if must keep walking HOT chain */
/* parallel index scan information, in shared memory */
ParallelIndexScanDesc parallel_scan;
} IndexScanDescData;
/* Generic structure for parallel scans */
typedef struct ParallelIndexScanDescData
{
Oid ps_relid;
Oid ps_indexid;
Size ps_offset; /* Offset in bytes of am specific structure */
char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
} ParallelIndexScanDescData;
/* Struct for heap-or-index scans of system tables */
typedef struct SysScanDescData
{
......
......@@ -527,6 +527,9 @@ typedef NameData *Name;
#define PointerIsAligned(pointer, type) \
(((uintptr_t)(pointer) % (sizeof (type))) == 0)
#define OffsetToPointer(base, offset) \
((void *)((char *) base + offset))
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))
#define RegProcedureIsValid(p) OidIsValid(p)
......
......@@ -1264,6 +1264,8 @@ OverrideSearchPath
OverrideStackEntry
PACE_HEADER
PACL
ParallelIndexScanDesc
ParallelIndexScanDescData
PATH
PBOOL
PCtxtHandle
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment