Commit 9da0cc35 authored by Robert Haas

Support parallel btree index builds.

To make this work, tuplesort.c and logtape.c must also support
parallelism, so this patch adds that infrastructure and then applies
it to the particular case of parallel btree index builds.  Testing
to date shows that this can often be 2-3x faster than a serial
index build.

The model for deciding how many workers to use is fairly primitive
at present, but it's better than not having the feature.  We can
refine it as we get more experience.
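
For example (a minimal sketch; the table and column names are
hypothetical, and the cost model may still request fewer workers
than the limit):

    SET max_parallel_maintenance_workers = 4;
    CREATE INDEX accounts_balance_idx ON accounts (balance);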

Peter Geoghegan with some help from Rushabh Lathia.  While Heikki
Linnakangas is not an author of this patch, he wrote other patches
without which this feature would not have been possible, and
therefore the release notes should possibly credit him as an author
of this feature.  Reviewed by Claudio Freire, Heikki Linnakangas,
Thomas Munro, Tels, Amit Kapila, me.

Discussion: http://postgr.es/m/CAM3SWZQKM=Pzc=CAHzRixKjp2eO5Q0Jg1SoFQqeXFQ647JiwqQ@mail.gmail.com
Discussion: http://postgr.es/m/CAH2-Wz=AxWqDoVvGU7dq856S4r6sJAj6DBn7VMtigkB33N5eyg@mail.gmail.com
parent 9aef1731
@@ -135,7 +135,8 @@ blbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
 	/* Do the heap scan */
 	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   bloomBuildCallback, (void *) &buildstate);
+								   bloomBuildCallback, (void *) &buildstate,
+								   NULL);
 
 	/*
 	 * There are could be some items in cached page. Flush this page if
...
@@ -2022,7 +2022,8 @@ include_dir 'conf.d'
       <para>
        When changing this value, consider also adjusting
-       <xref linkend="guc-max-parallel-workers"/> and
+       <xref linkend="guc-max-parallel-workers"/>,
+       <xref linkend="guc-max-parallel-workers-maintenance"/>, and
        <xref linkend="guc-max-parallel-workers-per-gather"/>.
       </para>
      </listitem>
@@ -2070,6 +2071,44 @@ include_dir 'conf.d'
      </listitem>
     </varlistentry>
 
+    <varlistentry id="guc-max-parallel-workers-maintenance" xreflabel="max_parallel_maintenance_workers">
+     <term><varname>max_parallel_maintenance_workers</varname> (<type>integer</type>)
+     <indexterm>
+      <primary><varname>max_parallel_maintenance_workers</varname> configuration parameter</primary>
+     </indexterm>
+     </term>
+     <listitem>
+      <para>
+       Sets the maximum number of parallel workers that can be
+       started by a single utility command.  Currently, the only
+       parallel utility command that supports the use of parallel
+       workers is <command>CREATE INDEX</command>, and only when
+       building a B-tree index.  Parallel workers are taken from the
+       pool of processes established by <xref
+       linkend="guc-max-worker-processes"/>, limited by <xref
+       linkend="guc-max-parallel-workers"/>.  Note that the requested
+       number of workers may not actually be available at runtime.
+       If this occurs, the utility operation will run with fewer
+       workers than expected.  The default value is 2.  Setting this
+       value to 0 disables the use of parallel workers by utility
+       commands.
+      </para>
+      <para>
+       Note that parallel utility commands should not consume
+       substantially more memory than equivalent non-parallel
+       operations.  This strategy differs from that of parallel
+       query, where resource limits generally apply per worker
+       process.  Parallel utility commands treat the resource limit
+       <varname>maintenance_work_mem</varname> as a limit to be applied to
+       the entire utility command, regardless of the number of
+       parallel worker processes.  However, parallel utility
+       commands may still consume substantially more CPU resources
+       and I/O bandwidth.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry id="guc-max-parallel-workers" xreflabel="max_parallel_workers">
      <term><varname>max_parallel_workers</varname> (<type>integer</type>)
      <indexterm>
@@ -2079,8 +2118,9 @@ include_dir 'conf.d'
      <listitem>
       <para>
        Sets the maximum number of workers that the system can support for
-       parallel queries.  The default value is 8.  When increasing or
+       parallel operations.  The default value is 8.  When increasing or
        decreasing this value, consider also adjusting
+       <xref linkend="guc-max-parallel-workers-maintenance"/> and
        <xref linkend="guc-max-parallel-workers-per-gather"/>.
        Also, note that a setting for this value which is higher than
        <xref linkend="guc-max-worker-processes"/> will have no effect,
...
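
As a usage sketch of the new GUC documented above (session-level
settings; values illustrative):

    SET max_parallel_maintenance_workers = 4;  -- allow up to 4 workers per utility command
    SET max_parallel_maintenance_workers = 0;  -- disable parallel utility commands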
@@ -1263,7 +1263,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
         <entry>Waiting in an extension.</entry>
        </row>
        <row>
-        <entry morerows="32"><literal>IPC</literal></entry>
+        <entry morerows="33"><literal>IPC</literal></entry>
         <entry><literal>BgWorkerShutdown</literal></entry>
         <entry>Waiting for background worker to shut down.</entry>
        </row>
@@ -1371,6 +1371,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
         <entry><literal>ParallelBitmapScan</literal></entry>
         <entry>Waiting for parallel bitmap scan to become initialized.</entry>
        </row>
+       <row>
+        <entry><literal>ParallelCreateIndexScan</literal></entry>
+        <entry>Waiting for parallel <command>CREATE INDEX</command> workers to finish heap scan.</entry>
+       </row>
        <row>
         <entry><literal>ProcArrayGroupUpdate</literal></entry>
         <entry>Waiting for group leader to clear transaction id at transaction end.</entry>
@@ -3900,13 +3904,15 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
      </row>
      <row>
       <entry><literal>sort-start</literal></entry>
-      <entry><literal>(int, bool, int, int, bool)</literal></entry>
+      <entry><literal>(int, bool, int, int, bool, int)</literal></entry>
       <entry>Probe that fires when a sort operation is started.
       arg0 indicates heap, index or datum sort.
       arg1 is true for unique-value enforcement.
       arg2 is the number of key columns.
       arg3 is the number of kilobytes of work memory allowed.
-      arg4 is true if random access to the sort result is required.</entry>
+      arg4 is true if random access to the sort result is required.
+      arg5 indicates serial when <literal>0</literal>, parallel worker when
+      <literal>1</literal>, or parallel leader when <literal>2</literal>.</entry>
      </row>
      <row>
       <entry><literal>sort-done</literal></entry>
...
@@ -599,6 +599,64 @@ Indexes:
   which would drive the machine into swapping.
  </para>
 
+ <para>
+  <productname>PostgreSQL</productname> can build indexes while
+  leveraging multiple CPUs in order to process the table rows faster.
+  This feature is known as <firstterm>parallel index
+  build</firstterm>.  For index methods that support building indexes
+  in parallel (currently, only B-tree),
+  <varname>maintenance_work_mem</varname> specifies the maximum
+  amount of memory that can be used by each index build operation as
+  a whole, regardless of how many worker processes were started.
+  Generally, a cost model automatically determines how many worker
+  processes should be requested, if any.
+ </para>
+
+ <para>
+  Parallel index builds may benefit from increasing
+  <varname>maintenance_work_mem</varname> where an equivalent serial
+  index build will see little or no benefit.  Note that
+  <varname>maintenance_work_mem</varname> may influence the number of
+  worker processes requested, since parallel workers must have at
+  least a <literal>32MB</literal> share of the total
+  <varname>maintenance_work_mem</varname> budget.  There must also be
+  a remaining <literal>32MB</literal> share for the leader process.
+  Increasing <xref linkend="guc-max-parallel-workers-maintenance"/>
+  may allow more workers to be used, which will reduce the time
+  needed for index creation, so long as the index build is not
+  already I/O bound.  Of course, there should also be sufficient
+  CPU capacity that would otherwise lie idle.
+ </para>
+
+ <para>
+  Setting a value for <literal>parallel_workers</literal> via <xref
+  linkend="sql-altertable"/> directly controls how many parallel
+  worker processes will be requested by a <command>CREATE
+  INDEX</command> against the table.  This bypasses the cost model
+  completely, and prevents <varname>maintenance_work_mem</varname>
+  from affecting how many parallel workers are requested.  Setting
+  <literal>parallel_workers</literal> to 0 via <command>ALTER
+  TABLE</command> will disable parallel index builds on the table in
+  all cases.
+ </para>
+
+ <tip>
+  <para>
+   You might want to reset <literal>parallel_workers</literal> after
+   setting it as part of tuning an index build.  This avoids
+   inadvertent changes to query plans, since
+   <literal>parallel_workers</literal> affects
+   <emphasis>all</emphasis> parallel table scans.
+  </para>
+ </tip>
+
+ <para>
+  While <command>CREATE INDEX</command> with the
+  <literal>CONCURRENTLY</literal> option supports parallel builds
+  without special restrictions, only the first table scan is actually
+  performed in parallel.
+ </para>
+
 <para>
  Use <xref linkend="sql-dropindex"/>
  to remove an index.
...
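
The tuning workflow described above looks like this in SQL (table,
column, and index names hypothetical):

    ALTER TABLE measurements SET (parallel_workers = 4);  -- bypass the cost model
    CREATE INDEX measurements_time_idx ON measurements (recorded_at);
    ALTER TABLE measurements RESET (parallel_workers);    -- keep query plans unaffected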
@@ -1228,8 +1228,8 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
      This sets the number of workers that should be used to assist a parallel
      scan of this table.  If not set, the system will determine a value based
      on the relation size.  The actual number of workers chosen by the planner
-     may be less, for example due to
-     the setting of <xref linkend="guc-max-worker-processes"/>.
+     or by utility statements that use parallel scans may be less, for example
+     due to the setting of <xref linkend="guc-max-worker-processes"/>.
     </para>
    </listitem>
   </varlistentry>
...
@@ -706,7 +706,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	 * heap blocks in physical order.
 	 */
 	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-								   brinbuildCallback, (void *) state);
+								   brinbuildCallback, (void *) state, NULL);
 
 	/* process the final batch */
 	form_and_insert_tuple(state);
@@ -1205,7 +1205,7 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
 	state->bs_currRangeStart = heapBlk;
 	IndexBuildHeapRangeScan(heapRel, state->bs_irel, indexInfo, false, true,
 							heapBlk, scanNumBlks,
-							brinbuildCallback, (void *) state);
+							brinbuildCallback, (void *) state, NULL);
 
 	/*
 	 * Now we update the values obtained by the scan with the placeholder
...
@@ -391,7 +391,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	 * prefers to receive tuples in TID order.
 	 */
 	reltuples = IndexBuildHeapScan(heap, index, indexInfo, false,
-								   ginBuildCallback, (void *) &buildstate);
+								   ginBuildCallback, (void *) &buildstate, NULL);
 
 	/* dump remaining entries to the index */
 	oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
...
@@ -203,7 +203,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 	 * Do the heap scan.
 	 */
 	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   gistBuildCallback, (void *) &buildstate);
+								   gistBuildCallback, (void *) &buildstate, NULL);
 
 	/*
 	 * If buffering was used, flush out all the tuples that are still in the
...
@@ -159,7 +159,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
 	/* do the heap scan */
 	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   hashbuildCallback, (void *) &buildstate);
+								   hashbuildCallback, (void *) &buildstate, NULL);
 
 	if (buildstate.spool)
 	{
...
@@ -82,6 +82,7 @@ _h_spoolinit(Relation heap, Relation index, uint32 num_buckets)
 												   hspool->low_mask,
 												   hspool->max_buckets,
 												   maintenance_work_mem,
+												   NULL,
 												   false);
 
 	return hspool;
...
@@ -1627,7 +1627,16 @@ heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
 	SpinLockInit(&target->phs_mutex);
 	target->phs_startblock = InvalidBlockNumber;
 	pg_atomic_init_u64(&target->phs_nallocated, 0);
-	SerializeSnapshot(snapshot, target->phs_snapshot_data);
+	if (IsMVCCSnapshot(snapshot))
+	{
+		SerializeSnapshot(snapshot, target->phs_snapshot_data);
+		target->phs_snapshot_any = false;
+	}
+	else
+	{
+		Assert(snapshot == SnapshotAny);
+		target->phs_snapshot_any = true;
+	}
 }
 
 /* ----------------
@@ -1655,11 +1664,22 @@ heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
 	Snapshot	snapshot;
 
 	Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
-	snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
-	RegisterSnapshot(snapshot);
+
+	if (!parallel_scan->phs_snapshot_any)
+	{
+		/* Snapshot was serialized -- restore it */
+		snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
+		RegisterSnapshot(snapshot);
+	}
+	else
+	{
+		/* SnapshotAny passed by caller (not serialized) */
+		snapshot = SnapshotAny;
+	}
 
 	return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
-								   true, true, true, false, false, true);
+								   true, true, true, false, false,
+								   !parallel_scan->phs_snapshot_any);
 }
 
 /* ----------------
...
@@ -21,36 +21,19 @@
 #include "access/nbtree.h"
 #include "access/relscan.h"
 #include "access/xlog.h"
-#include "catalog/index.h"
 #include "commands/vacuum.h"
-#include "nodes/execnodes.h"
 #include "pgstat.h"
 #include "storage/condition_variable.h"
 #include "storage/indexfsm.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
-#include "tcop/tcopprot.h"		/* pgrminclude ignore */
 #include "utils/builtins.h"
 #include "utils/index_selfuncs.h"
 #include "utils/memutils.h"
 
-/* Working state for btbuild and its callback */
-typedef struct
-{
-	bool		isUnique;
-	bool		haveDead;
-	Relation	heapRel;
-	BTSpool    *spool;
-
-	/*
-	 * spool2 is needed only when the index is a unique index. Dead tuples are
-	 * put into spool2 instead of spool in order to avoid uniqueness check.
-	 */
-	BTSpool    *spool2;
-	double		indtuples;
-} BTBuildState;
-
 /* Working state needed by btvacuumpage */
 typedef struct
 {
@@ -104,12 +87,6 @@ typedef struct BTParallelScanDescData
 typedef struct BTParallelScanDescData *BTParallelScanDesc;
 
-static void btbuildCallback(Relation index,
-				HeapTuple htup,
-				Datum *values,
-				bool *isnull,
-				bool tupleIsAlive,
-				void *state);
 static void btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
 			 IndexBulkDeleteCallback callback, void *callback_state,
 			 BTCycleId cycleid);
@@ -166,115 +143,6 @@ bthandler(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(amroutine);
 }
 
-/*
- *	btbuild() -- build a new btree index.
- */
-IndexBuildResult *
-btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
-{
-	IndexBuildResult *result;
-	double		reltuples;
-	BTBuildState buildstate;
-
-	buildstate.isUnique = indexInfo->ii_Unique;
-	buildstate.haveDead = false;
-	buildstate.heapRel = heap;
-	buildstate.spool = NULL;
-	buildstate.spool2 = NULL;
-	buildstate.indtuples = 0;
-
-#ifdef BTREE_BUILD_STATS
-	if (log_btree_build_stats)
-		ResetUsage();
-#endif							/* BTREE_BUILD_STATS */
-
-	/*
-	 * We expect to be called exactly once for any index relation. If that's
-	 * not the case, big trouble's what we have.
-	 */
-	if (RelationGetNumberOfBlocks(index) != 0)
-		elog(ERROR, "index \"%s\" already contains data",
-			 RelationGetRelationName(index));
-
-	buildstate.spool = _bt_spoolinit(heap, index, indexInfo->ii_Unique, false);
-
-	/*
-	 * If building a unique index, put dead tuples in a second spool to keep
-	 * them out of the uniqueness check.
-	 */
-	if (indexInfo->ii_Unique)
-		buildstate.spool2 = _bt_spoolinit(heap, index, false, true);
-
-	/* do the heap scan */
-	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   btbuildCallback, (void *) &buildstate);
-
-	/* okay, all heap tuples are indexed */
-	if (buildstate.spool2 && !buildstate.haveDead)
-	{
-		/* spool2 turns out to be unnecessary */
-		_bt_spooldestroy(buildstate.spool2);
-		buildstate.spool2 = NULL;
-	}
-
-	/*
-	 * Finish the build by (1) completing the sort of the spool file, (2)
-	 * inserting the sorted tuples into btree pages and (3) building the upper
-	 * levels.
-	 */
-	_bt_leafbuild(buildstate.spool, buildstate.spool2);
-	_bt_spooldestroy(buildstate.spool);
-	if (buildstate.spool2)
-		_bt_spooldestroy(buildstate.spool2);
-
-#ifdef BTREE_BUILD_STATS
-	if (log_btree_build_stats)
-	{
-		ShowUsage("BTREE BUILD STATS");
-		ResetUsage();
-	}
-#endif							/* BTREE_BUILD_STATS */
-
-	/*
-	 * Return statistics
-	 */
-	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
-
-	result->heap_tuples = reltuples;
-	result->index_tuples = buildstate.indtuples;
-
-	return result;
-}
-
-/*
- * Per-tuple callback from IndexBuildHeapScan
- */
-static void
-btbuildCallback(Relation index,
-				HeapTuple htup,
-				Datum *values,
-				bool *isnull,
-				bool tupleIsAlive,
-				void *state)
-{
-	BTBuildState *buildstate = (BTBuildState *) state;
-
-	/*
-	 * insert the index tuple into the appropriate spool file for subsequent
-	 * processing
-	 */
-	if (tupleIsAlive || buildstate->spool2 == NULL)
-		_bt_spool(buildstate->spool, &htup->t_self, values, isnull);
-	else
-	{
-		/* dead tuples are put into spool2 */
-		buildstate->haveDead = true;
-		_bt_spool(buildstate->spool2, &htup->t_self, values, isnull);
-	}
-
-	buildstate->indtuples += 1;
-}
-
 /*
  *	btbuildempty() -- build an empty btree index in the initialization fork
  */
...
This diff is collapsed.
@@ -138,7 +138,8 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 											  ALLOCSET_DEFAULT_SIZES);
 
 	reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
-								   spgistBuildCallback, (void *) &buildstate);
+								   spgistBuildCallback, (void *) &buildstate,
+								   NULL);
 
 	MemoryContextDelete(buildstate.tmpCtx);
...
@@ -14,6 +14,7 @@
 #include "postgres.h"
 
+#include "access/nbtree.h"
 #include "access/parallel.h"
 #include "access/session.h"
 #include "access/xact.h"
@@ -129,6 +130,9 @@ static const struct
 {
 	{
 		"ParallelQueryMain", ParallelQueryMain
+	},
+	{
+		"_bt_parallel_build_main", _bt_parallel_build_main
 	}
 };
@@ -146,7 +150,7 @@ static void ParallelWorkerShutdown(int code, Datum arg);
  */
 ParallelContext *
 CreateParallelContext(const char *library_name, const char *function_name,
-					  int nworkers)
+					  int nworkers, bool serializable_okay)
 {
 	MemoryContext oldcontext;
 	ParallelContext *pcxt;
@@ -167,9 +171,11 @@ CreateParallelContext(const char *library_name, const char *function_name,
 	/*
 	 * If we are running under serializable isolation, we can't use parallel
 	 * workers, at least not until somebody enhances that mechanism to be
-	 * parallel-aware.
+	 * parallel-aware.  Utility statement callers may ask us to ignore this
+	 * restriction because they're always able to safely ignore the fact that
+	 * SIREAD locks do not work with parallelism.
 	 */
-	if (IsolationIsSerializable())
+	if (IsolationIsSerializable() && !serializable_okay)
 		nworkers = 0;
 
 	/* We might be running in a short-lived memory context. */
...
@@ -1137,7 +1137,7 @@ build_indices(void)
 		heap = heap_open(ILHead->il_heap, NoLock);
 		ind = index_open(ILHead->il_ind, NoLock);
 
-		index_build(heap, ind, ILHead->il_info, false, false);
+		index_build(heap, ind, ILHead->il_info, false, false, false);
 
 		index_close(ind, NoLock);
 		heap_close(heap, NoLock);
...
@@ -2841,7 +2841,7 @@ RelationTruncateIndexes(Relation heapRelation)
 		/* Initialize the index and rebuild */
 		/* Note: we do not need to re-establish pkey setting */
-		index_build(heapRelation, currentIndex, indexInfo, false, true);
+		index_build(heapRelation, currentIndex, indexInfo, false, true, false);
 
 		/* We're done with this index */
 		index_close(currentIndex, NoLock);
...
@@ -56,6 +56,7 @@
 #include "nodes/makefuncs.h"
 #include "nodes/nodeFuncs.h"
 #include "optimizer/clauses.h"
+#include "optimizer/planner.h"
 #include "parser/parser.h"
 #include "rewrite/rewriteManip.h"
 #include "storage/bufmgr.h"
@@ -902,7 +903,7 @@ index_create(Relation heapRelation,
 	Assert(indexRelationId == RelationGetRelid(indexRelation));
 
 	/*
-	 * Obtain exclusive lock on it.  Although no other backends can see it
+	 * Obtain exclusive lock on it.  Although no other transactions can see it
 	 * until we commit, this prevents deadlock-risk complaints from lock
 	 * manager in cases such as CLUSTER.
 	 */
@@ -1159,7 +1160,8 @@ index_create(Relation heapRelation,
 	}
 	else
 	{
-		index_build(heapRelation, indexRelation, indexInfo, isprimary, false);
+		index_build(heapRelation, indexRelation, indexInfo, isprimary, false,
+					true);
 	}
 
 	/*
@@ -1746,6 +1748,7 @@ BuildIndexInfo(Relation index)
 	/* initialize index-build state to default */
 	ii->ii_Concurrent = false;
 	ii->ii_BrokenHotChain = false;
+	ii->ii_ParallelWorkers = 0;
 
 	/* set up for possible use by index AM */
 	ii->ii_Am = index->rd_rel->relam;
@@ -2164,6 +2167,7 @@ index_update_stats(Relation rel,
  *
  * isprimary tells whether to mark the index as a primary-key index.
  * isreindex indicates we are recreating a previously-existing index.
+ * parallel indicates if parallelism may be useful.
  *
  * Note: when reindexing an existing index, isprimary can be false even if
  * the index is a PK; it's already properly marked and need not be re-marked.
@@ -2177,7 +2181,8 @@ index_build(Relation heapRelation,
 			Relation indexRelation,
 			IndexInfo *indexInfo,
 			bool isprimary,
-			bool isreindex)
+			bool isreindex,
+			bool parallel)
 {
 	IndexBuildResult *stats;
 	Oid			save_userid;
@@ -2192,10 +2197,31 @@ index_build(Relation heapRelation,
 	Assert(PointerIsValid(indexRelation->rd_amroutine->ambuild));
 	Assert(PointerIsValid(indexRelation->rd_amroutine->ambuildempty));
 
-	ereport(DEBUG1,
-			(errmsg("building index \"%s\" on table \"%s\"",
-					RelationGetRelationName(indexRelation),
-					RelationGetRelationName(heapRelation))));
+	/*
+	 * Determine worker process details for parallel CREATE INDEX.  Currently,
+	 * only btree has support for parallel builds.
+	 *
+	 * Note that planner considers parallel safety for us.
+	 */
+	if (parallel && IsNormalProcessingMode() &&
+		indexRelation->rd_rel->relam == BTREE_AM_OID)
+		indexInfo->ii_ParallelWorkers =
+			plan_create_index_workers(RelationGetRelid(heapRelation),
+									  RelationGetRelid(indexRelation));
+
+	if (indexInfo->ii_ParallelWorkers == 0)
+		ereport(DEBUG1,
+				(errmsg("building index \"%s\" on table \"%s\" serially",
+						RelationGetRelationName(indexRelation),
+						RelationGetRelationName(heapRelation))));
+	else
+		ereport(DEBUG1,
+				(errmsg_plural("building index \"%s\" on table \"%s\" with request for %d parallel worker",
+							   "building index \"%s\" on table \"%s\" with request for %d parallel workers",
+							   indexInfo->ii_ParallelWorkers,
+							   RelationGetRelationName(indexRelation),
+							   RelationGetRelationName(heapRelation),
+							   indexInfo->ii_ParallelWorkers)));
 
 	/*
 	 * Switch to the table owner's userid, so that any index functions are run
@@ -2347,13 +2373,14 @@ IndexBuildHeapScan(Relation heapRelation,
 				   IndexInfo *indexInfo,
 				   bool allow_sync,
 				   IndexBuildCallback callback,
-				   void *callback_state)
+				   void *callback_state,
+				   HeapScanDesc scan)
 {
 	return IndexBuildHeapRangeScan(heapRelation, indexRelation,
 								   indexInfo, allow_sync,
 								   false,
 								   0, InvalidBlockNumber,
-								   callback, callback_state);
+								   callback, callback_state, scan);
 }
 
 /*
@@ -2375,11 +2402,11 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 						BlockNumber start_blockno,
 						BlockNumber numblocks,
 						IndexBuildCallback callback,
-						void *callback_state)
+						void *callback_state,
+						HeapScanDesc scan)
 {
 	bool		is_system_catalog;
 	bool		checking_uniqueness;
-	HeapScanDesc scan;
 	HeapTuple	heapTuple;
 	Datum		values[INDEX_MAX_KEYS];
 	bool		isnull[INDEX_MAX_KEYS];
@@ -2389,6 +2416,7 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	EState	   *estate;
 	ExprContext *econtext;
 	Snapshot	snapshot;
+	bool		need_unregister_snapshot = false;
 	TransactionId OldestXmin;
 	BlockNumber root_blkno = InvalidBlockNumber;
 	OffsetNumber root_offsets[MaxHeapTuplesPerPage];
@@ -2432,27 +2460,59 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	 * concurrent build, or during bootstrap, we take a regular MVCC snapshot
 	 * and index whatever's live according to that.
 	 */
-	if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent)
-	{
-		snapshot = RegisterSnapshot(GetTransactionSnapshot());
-		OldestXmin = InvalidTransactionId;	/* not used */
+	OldestXmin = InvalidTransactionId;
 
-		/* "any visible" mode is not compatible with this */
-		Assert(!anyvisible);
-	}
-	else
+	/* okay to ignore lazy VACUUMs here */
+	if (!IsBootstrapProcessingMode() && !indexInfo->ii_Concurrent)
+		OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
+
+	if (!scan)
 	{
-		snapshot = SnapshotAny;
-		/* okay to ignore lazy VACUUMs here */
-		OldestXmin = GetOldestXmin(heapRelation, PROCARRAY_FLAGS_VACUUM);
-	}
+		/*
+		 * Serial index build.
+		 *
+		 * Must begin our own heap scan in this case.  We may also need to
+		 * register a snapshot whose lifetime is under our direct control.
+		 */
+		if (!TransactionIdIsValid(OldestXmin))
+		{
+			snapshot = RegisterSnapshot(GetTransactionSnapshot());
+			need_unregister_snapshot = true;
+		}
+		else
+			snapshot = SnapshotAny;
 
-	scan = heap_beginscan_strat(heapRelation,	/* relation */
-								snapshot,	/* snapshot */
-								0,	/* number of keys */
-								NULL,	/* scan key */
-								true,	/* buffer access strategy OK */
-								allow_sync);	/* syncscan OK? */
+		scan = heap_beginscan_strat(heapRelation,	/* relation */
+									snapshot,	/* snapshot */
+									0,	/* number of keys */
+									NULL,	/* scan key */
+									true,	/* buffer access strategy OK */
+									allow_sync);	/* syncscan OK? */
+	}
+	else
+	{
+		/*
+		 * Parallel index build.
+		 *
+		 * Parallel case never registers/unregisters own snapshot.  Snapshot
+		 * is taken from parallel heap scan, and is SnapshotAny or an MVCC
+		 * snapshot, based on same criteria as serial case.
+		 */
+		Assert(!IsBootstrapProcessingMode());
+		Assert(allow_sync);
+		snapshot = scan->rs_snapshot;
+	}
+
+	/*
+	 * Must call GetOldestXmin() with SnapshotAny.  Should never call
+	 * GetOldestXmin() with MVCC snapshot. (It's especially worth checking
+	 * this for parallel builds, since ambuild routines that support parallel
+	 * builds must work these details out for themselves.)
+	 */
+	Assert(snapshot == SnapshotAny || IsMVCCSnapshot(snapshot));
+	Assert(snapshot == SnapshotAny ? TransactionIdIsValid(OldestXmin) :
+		   !TransactionIdIsValid(OldestXmin));
+	Assert(snapshot == SnapshotAny || !anyvisible);
 
 	/* set our scan endpoints */
 	if (!allow_sync)
@@ -2783,8 +2843,8 @@ IndexBuildHeapRangeScan(Relation heapRelation,
 	heap_endscan(scan);
 
-	/* we can now forget our snapshot, if set */
-	if (IsBootstrapProcessingMode() || indexInfo->ii_Concurrent)
+	/* we can now forget our snapshot, if set and registered by us */
+	if (need_unregister_snapshot)
 		UnregisterSnapshot(snapshot);
 
 	ExecDropSingleTupleTableSlot(slot);
@@ -3027,7 +3087,7 @@ validate_index(Oid heapId, Oid indexId, Snapshot snapshot)
 	state.tuplesort = tuplesort_begin_datum(INT8OID, Int8LessOperator,
 											InvalidOid, false,
 											maintenance_work_mem,
-											false);
+											NULL, false);
 	state.htups = state.itups = state.tups_inserted = 0;
 
 	(void) index_bulk_delete(&ivinfo, NULL,
@@ -3552,7 +3612,7 @@ reindex_index(Oid indexId, bool skip_constraint_checks, char persistence,
 		/* Initialize the index and rebuild */
 		/* Note: we do not need to re-establish pkey setting */
-		index_build(heapRelation, iRel, indexInfo, false, true);
+		index_build(heapRelation, iRel, indexInfo, false, true, true);
 	}
 	PG_CATCH();
 	{
@@ -3911,8 +3971,7 @@ SetReindexProcessing(Oid heapOid, Oid indexOid)
 static void
 ResetReindexProcessing(void)
 {
-	if (IsInParallelMode())
-		elog(ERROR, "cannot modify reindex state during a parallel operation");
+	/* This may be called in leader error path */
 	currentlyReindexedHeap = InvalidOid;
 	currentlyReindexedIndex = InvalidOid;
 }
...
@@ -315,6 +315,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 	indexInfo->ii_ReadyForInserts = true;
 	indexInfo->ii_Concurrent = false;
 	indexInfo->ii_BrokenHotChain = false;
+	indexInfo->ii_ParallelWorkers = 0;
 	indexInfo->ii_Am = BTREE_AM_OID;
 	indexInfo->ii_AmCache = NULL;
 	indexInfo->ii_Context = CurrentMemoryContext;
...
@@ -909,7 +909,8 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, bool verbose,
 	/* Set up sorting if wanted */
 	if (use_sort)
 		tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex,
-											maintenance_work_mem, false);
+											maintenance_work_mem,
+											NULL, false);
 	else
 		tuplesort = NULL;
...
@@ -380,6 +380,10 @@ DefineIndex(Oid relationId,
 	 * this will typically require the caller to have already locked the
 	 * relation.  To avoid lock upgrade hazards, that lock should be at least
 	 * as strong as the one we take here.
+	 *
+	 * NB: If the lock strength here ever changes, code that is run by
+	 * parallel workers under the control of certain particular ambuild
+	 * functions will need to be updated, too.
 	 */
 	lockmode = stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock;
 	rel = heap_open(relationId, lockmode);
@@ -617,6 +621,7 @@ DefineIndex(Oid relationId,
 	indexInfo->ii_ReadyForInserts = !stmt->concurrent;
 	indexInfo->ii_Concurrent = stmt->concurrent;
 	indexInfo->ii_BrokenHotChain = false;
+	indexInfo->ii_ParallelWorkers = 0;
 	indexInfo->ii_Am = accessMethodId;
 	indexInfo->ii_AmCache = NULL;
 	indexInfo->ii_Context = CurrentMemoryContext;
@@ -1000,7 +1005,7 @@ DefineIndex(Oid relationId,
 	indexInfo->ii_BrokenHotChain = false;
 
 	/* Now build the index */
-	index_build(rel, indexRelation, indexInfo, stmt->primary, false);
+	index_build(rel, indexRelation, indexInfo, stmt->primary, false, true);
 
 	/* Close both the relations, but keep the locks */
 	heap_close(rel, NoLock);
...
@@ -592,7 +592,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	pstmt_data = ExecSerializePlan(planstate->plan, estate);
 
 	/* Create a parallel context. */
-	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
+	pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
 	pei->pcxt = pcxt;
 
 	/*
...
@@ -373,7 +373,7 @@ initialize_phase(AggState *aggstate, int newphase)
 												  sortnode->collations,
 												  sortnode->nullsFirst,
 												  work_mem,
-												  false);
+												  NULL, false);
 	}
 
 	aggstate->current_phase = newphase;
@@ -450,7 +450,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 								  pertrans->sortOperators[0],
 								  pertrans->sortCollations[0],
 								  pertrans->sortNullsFirst[0],
-								  work_mem, false);
+								  work_mem, NULL, false);
 	}
 	else
 		pertrans->sortstates[aggstate->current_set] =
@@ -460,7 +460,7 @@ initialize_aggregate(AggState *aggstate, AggStatePerTrans pertrans,
 							  pertrans->sortOperators,
 							  pertrans->sortCollations,
 							  pertrans->sortNullsFirst,
-							  work_mem, false);
+							  work_mem, NULL, false);
 	}
 
 	/*
...
@@ -93,7 +93,7 @@ ExecSort(PlanState *pstate)
 											  plannode->collations,
 											  plannode->nullsFirst,
 											  work_mem,
-											  node->randomAccess);
+											  NULL, node->randomAccess);
 		if (node->bounded)
 			tuplesort_set_bound(tuplesortstate, node->bound);
 		node->tuplesortstate = (void *) tuplesortstate;
...
@@ -720,7 +720,8 @@ create_plain_partial_paths(PlannerInfo *root, RelOptInfo *rel)
 {
 	int			parallel_workers;
 
-	parallel_workers = compute_parallel_worker(rel, rel->pages, -1);
+	parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
+											   max_parallel_workers_per_gather);
 
 	/* If any limit was set to zero, the user doesn't want a parallel scan. */
 	if (parallel_workers <= 0)
@@ -3299,7 +3300,8 @@ create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
 	pages_fetched = compute_bitmap_pages(root, rel, bitmapqual, 1.0,
 										 NULL, NULL);
 
-	parallel_workers = compute_parallel_worker(rel, pages_fetched, -1);
+	parallel_workers = compute_parallel_worker(rel, pages_fetched, -1,
+											   max_parallel_workers_per_gather);
 
 	if (parallel_workers <= 0)
 		return;
@@ -3319,9 +3321,13 @@ create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
  *
  * "index_pages" is the number of pages from the index that we expect to scan, or
  * -1 if we don't expect to scan any.
+ *
+ * "max_workers" is caller's limit on the number of workers.  This typically
+ * comes from a GUC.
  */
 int
-compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages)
+compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages,
+						int max_workers)
 {
 	int			parallel_workers = 0;
@@ -3392,10 +3398,8 @@ compute_parallel_worker(RelOptInfo *rel, double heap_pages, double index_pages)
 		}
 	}
 
-	/*
-	 * In no case use more than max_parallel_workers_per_gather workers.
-	 */
-	parallel_workers = Min(parallel_workers, max_parallel_workers_per_gather);
+	/* In no case use more than caller supplied maximum number of workers */
+	parallel_workers = Min(parallel_workers, max_workers);
 
 	return parallel_workers;
 }
...
@@ -682,7 +682,9 @@ cost_index(IndexPath *path, PlannerInfo *root, double loop_count,
 		 * order.
 		 */
 		path->path.parallel_workers = compute_parallel_worker(baserel,
-															  rand_heap_pages, index_pages);
+															  rand_heap_pages,
+															  index_pages,
+															  max_parallel_workers_per_gather);
 
 		/*
 		 * Fall out if workers can't be assigned for parallel scan, because in
...
@@ -5793,6 +5793,142 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid)
 	return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost);
 }
 
+/*
+ * plan_create_index_workers
+ *		Use the planner to decide how many parallel worker processes
+ *		CREATE INDEX should request for use
+ *
+ * tableOid is the table on which the index is to be built.  indexOid is the
+ * OID of an index to be created or reindexed (which must be a btree index).
+ *
+ * Return value is the number of parallel worker processes to request.  It
+ * may be unsafe to proceed if this is 0.  Note that this does not include the
+ * leader participating as a worker (value is always a number of parallel
+ * worker processes).
+ *
+ * Note: caller had better already hold some type of lock on the table and
+ * index.
+ */
+int
+plan_create_index_workers(Oid tableOid, Oid indexOid)
+{
+	PlannerInfo *root;
+	Query	   *query;
+	PlannerGlobal *glob;
+	RangeTblEntry *rte;
+	Relation	heap;
+	Relation	index;
+	RelOptInfo *rel;
+	int			parallel_workers;
+	BlockNumber heap_blocks;
+	double		reltuples;
+	double		allvisfrac;
+
+	/* Return immediately when parallelism disabled */
+	if (max_parallel_maintenance_workers == 0)
+		return 0;
+
+	/* Set up largely-dummy planner state */
+	query = makeNode(Query);
+	query->commandType = CMD_SELECT;
+
+	glob = makeNode(PlannerGlobal);
+
+	root = makeNode(PlannerInfo);
+	root->parse = query;
+	root->glob = glob;
+	root->query_level = 1;
+	root->planner_cxt = CurrentMemoryContext;
+	root->wt_param_id = -1;
+
+	/*
+	 * Build a minimal RTE.
+	 *
+	 * Set the target's table to be an inheritance parent.  This is a kludge
+	 * that prevents problems within get_relation_info(), which does not
+	 * expect that any IndexOptInfo is currently undergoing REINDEX.
+	 */
+	rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = tableOid;
+	rte->relkind = RELKIND_RELATION;	/* Don't be too picky. */
+	rte->lateral = false;
+	rte->inh = true;
+	rte->inFromCl = true;
+	query->rtable = list_make1(rte);
+
+	/* Set up RTE/RelOptInfo arrays */
+	setup_simple_rel_arrays(root);
+
+	/* Build RelOptInfo */
+	rel = build_simple_rel(root, 1, NULL);
+
+	heap = heap_open(tableOid, NoLock);
+	index = index_open(indexOid, NoLock);
+
+	/*
+	 * Determine if it's safe to proceed.
+	 *
+	 * Currently, parallel workers can't access the leader's temporary tables.
+	 * Furthermore, any index predicate or index expressions must be parallel
+	 * safe.
+	 */
+	if (heap->rd_rel->relpersistence == RELPERSISTENCE_TEMP ||
+		!is_parallel_safe(root, (Node *) RelationGetIndexExpressions(index)) ||
+		!is_parallel_safe(root, (Node *) RelationGetIndexPredicate(index)))
+	{
+		parallel_workers = 0;
+		goto done;
+	}
+
+	/*
+	 * If parallel_workers storage parameter is set for the table, accept that
+	 * as the number of parallel worker processes to launch (though still cap
+	 * at max_parallel_maintenance_workers).  Note that we deliberately do not
+	 * consider any other factor when parallel_workers is set. (e.g., memory
+	 * use by workers.)
+	 */
+	if (rel->rel_parallel_workers != -1)
+	{
+		parallel_workers = Min(rel->rel_parallel_workers,
+							   max_parallel_maintenance_workers);
+		goto done;
+	}
+
+	/*
+	 * Estimate heap relation size ourselves, since rel->pages cannot be
+	 * trusted (heap RTE was marked as inheritance parent)
+	 */
+	estimate_rel_size(heap, NULL, &heap_blocks, &reltuples, &allvisfrac);
+
+	/*
+	 * Determine number of workers to scan the heap relation using generic
+	 * model
+	 */
+	parallel_workers = compute_parallel_worker(rel, heap_blocks, -1,
+											   max_parallel_maintenance_workers);
+
+	/*
+	 * Cap workers based on available maintenance_work_mem as needed.
+	 *
+	 * Note that each tuplesort participant receives an even share of the
+	 * total maintenance_work_mem budget.  Aim to leave participants
+	 * (including the leader as a participant) with no less than 32MB of
+	 * memory.  This leaves cases where maintenance_work_mem is set to 64MB
+	 * immediately past the threshold of being capable of launching a single
+	 * parallel worker to sort.
+	 */
+	while (parallel_workers > 0 &&
+		   maintenance_work_mem / (parallel_workers + 1) < 32768L)
+		parallel_workers--;
+
+done:
+	index_close(index, NoLock);
+	heap_close(heap, NoLock);
+
+	return parallel_workers;
+}
+
 /*
  * get_partitioned_child_rels
  *		Returns a list of the RT indexes of the partitioned child relations
...
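
A worked example of the maintenance_work_mem cap in
plan_create_index_workers(): maintenance_work_mem is measured in
kilobytes, so with a 64MB setting (65536 kB) and, say, 3 workers
suggested by compute_parallel_worker(), each of the 4 participants
(leader included) would get 65536/4 = 16384 kB, below the 32768 kB
floor; the loop therefore decrements down to 1 worker, leaving the
leader and the single worker 32768 kB each.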
@@ -3655,6 +3655,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_PARALLEL_BITMAP_SCAN:
 			event_name = "ParallelBitmapScan";
 			break;
+		case WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN:
+			event_name = "ParallelCreateIndexScan";
+			break;
 		case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
 			event_name = "ProcArrayGroupUpdate";
 			break;
...
@@ -271,7 +271,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
  * Open a file that was previously created in another backend (or this one)
  * with BufFileCreateShared in the same SharedFileSet using the same name.
  * The backend that created the file must have called BufFileClose() or
- * BufFileExport() to make sure that it is ready to be opened by other
+ * BufFileExportShared() to make sure that it is ready to be opened by other
  * backends and render it read-only.
  */
 BufFile *
@@ -800,3 +800,62 @@ BufFileTellBlock(BufFile *file)
 }
 #endif
+
+/*
+ * Return the current file size.  Counts any holes left behind by
+ * BufFileViewAppend as part of the size.
+ */
+off_t
+BufFileSize(BufFile *file)
+{
+	return ((file->numFiles - 1) * (off_t) MAX_PHYSICAL_FILESIZE) +
+		FileGetSize(file->files[file->numFiles - 1]);
+}
+
+/*
+ * Append the contents of source file (managed within shared fileset) to
+ * end of target file (managed within same shared fileset).
+ *
+ * Note that operation subsumes ownership of underlying resources from
+ * "source".  Caller should never call BufFileClose against source having
+ * called here first.  Resource owners for source and target must match,
+ * too.
+ *
+ * This operation works by manipulating lists of segment files, so the
+ * file content is always appended at a MAX_PHYSICAL_FILESIZE-aligned
+ * boundary, typically creating empty holes before the boundary.  These
+ * areas do not contain any interesting data, and cannot be read from by
+ * caller.
+ *
+ * Returns the block number within target where the contents of source
+ * begins.  Caller should apply this as an offset when working off block
+ * positions that are in terms of the original BufFile space.
+ */
+long
+BufFileAppend(BufFile *target, BufFile *source)
+{
+	long		startBlock = target->numFiles * BUFFILE_SEG_SIZE;
+	int			newNumFiles = target->numFiles + source->numFiles;
+	int			i;
+
+	Assert(target->fileset != NULL);
+	Assert(source->readOnly);
+	Assert(!source->dirty);
+	Assert(source->fileset != NULL);
+
+	if (target->resowner != source->resowner)
+		elog(ERROR, "could not append BufFile with non-matching resource owner");
+
+	target->files = (File *)
+		repalloc(target->files, sizeof(File) * newNumFiles);
+	target->offsets = (off_t *)
+		repalloc(target->offsets, sizeof(off_t) * newNumFiles);
+	for (i = target->numFiles; i < newNumFiles; i++)
+	{
+		target->files[i] = source->files[i - target->numFiles];
+		target->offsets[i] = 0L;
+	}
+	target->numFiles = newNumFiles;
+
+	return startBlock;
+}
@@ -2262,6 +2262,16 @@ FileGetRawMode(File file)
 	return VfdCache[file].fileMode;
 }
 
+/*
+ * FileGetSize - returns the size of file
+ */
+off_t
+FileGetSize(File file)
+{
+	Assert(FileIsValid(file));
+
+	return VfdCache[file].fileSize;
+}
+
 /*
  * Make room for another allocatedDescs[] array entry if needed and possible.
  * Returns true if an array element is available.
...
@@ -291,6 +291,7 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
 												   qstate->sortCollations,
 												   qstate->sortNullsFirsts,
 												   work_mem,
+												   NULL,
 												   qstate->rescan_needed);
 	else
 		osastate->sortstate = tuplesort_begin_datum(qstate->sortColType,
@@ -298,6 +299,7 @@ ordered_set_startup(FunctionCallInfo fcinfo, bool use_tuples)
 													qstate->sortCollation,
 													qstate->sortNullsFirst,
 													work_mem,
+													NULL,
 													qstate->rescan_needed);
 
 	osastate->number_of_rows = 0;
...
@@ -112,6 +112,7 @@ bool		enableFsync = true;
 bool		allowSystemTableMods = false;
 int			work_mem = 1024;
 int			maintenance_work_mem = 16384;
+int			max_parallel_maintenance_workers = 2;
 
 /*
  * Primary determinants of sizes of shared-memory structures.
...
@@ -2734,6 +2734,16 @@ static struct config_int ConfigureNamesInt[] =
 		check_autovacuum_max_workers, NULL, NULL
 	},
 
+	{
+		{"max_parallel_maintenance_workers", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
+			gettext_noop("Sets the maximum number of parallel processes per maintenance operation."),
+			NULL
+		},
+		&max_parallel_maintenance_workers,
+		2, 0, 1024,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"max_parallel_workers_per_gather", PGC_USERSET, RESOURCES_ASYNCHRONOUS,
 			gettext_noop("Sets the maximum number of parallel processes per executor node."),
...
@@ -163,10 +163,11 @@
 #effective_io_concurrency = 1		# 1-1000; 0 disables prefetching
 #max_worker_processes = 8		# (change requires restart)
+#max_parallel_maintenance_workers = 2	# taken from max_parallel_workers
 #max_parallel_workers_per_gather = 2	# taken from max_parallel_workers
 #parallel_leader_participation = on
 #max_parallel_workers = 8		# maximum number of max_worker_processes that
-					# can be used in parallel queries
+					# can be used in parallel operations
 #old_snapshot_threshold = -1		# 1min-60d; -1 disables; 0 is immediate
 					# (change requires restart)
 #backend_flush_after = 0		# measured in pages, 0 disables
...
@@ -52,7 +52,7 @@ provider postgresql {
 	probe query__done(const char *);
 	probe statement__status(const char *);
 
-	probe sort__start(int, bool, int, int, bool);
+	probe sort__start(int, bool, int, int, bool, int);
 	probe sort__done(bool, long);
 
 	probe buffer__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int, bool);
...
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "catalog/pg_index.h" #include "catalog/pg_index.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/shm_toc.h"
/* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */ /* There's room for a 16-bit vacuum cycle ID in BTPageOpaqueData */
typedef uint16 BTCycleId; typedef uint16 BTCycleId;
...@@ -430,8 +431,6 @@ typedef BTScanOpaqueData *BTScanOpaque; ...@@ -430,8 +431,6 @@ typedef BTScanOpaqueData *BTScanOpaque;
/* /*
* external entry points for btree, in nbtree.c * external entry points for btree, in nbtree.c
*/ */
extern IndexBuildResult *btbuild(Relation heap, Relation index,
struct IndexInfo *indexInfo);
extern void btbuildempty(Relation index); extern void btbuildempty(Relation index);
extern bool btinsert(Relation rel, Datum *values, bool *isnull, extern bool btinsert(Relation rel, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel, ItemPointer ht_ctid, Relation heapRel,
...@@ -547,13 +546,8 @@ extern bool btvalidate(Oid opclassoid); ...@@ -547,13 +546,8 @@ extern bool btvalidate(Oid opclassoid);
/* /*
* prototypes for functions in nbtsort.c * prototypes for functions in nbtsort.c
*/ */
typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */ extern IndexBuildResult *btbuild(Relation heap, Relation index,
struct IndexInfo *indexInfo);
extern BTSpool *_bt_spoolinit(Relation heap, Relation index, extern void _bt_parallel_build_main(dsm_segment *seg, shm_toc *toc);
bool isunique, bool isdead);
extern void _bt_spooldestroy(BTSpool *btspool);
extern void _bt_spool(BTSpool *btspool, ItemPointer self,
Datum *values, bool *isnull);
extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
#endif /* NBTREE_H */ #endif /* NBTREE_H */
...@@ -59,7 +59,9 @@ extern PGDLLIMPORT bool InitializingParallelWorker; ...@@ -59,7 +59,9 @@ extern PGDLLIMPORT bool InitializingParallelWorker;
#define IsParallelWorker() (ParallelWorkerNumber >= 0) #define IsParallelWorker() (ParallelWorkerNumber >= 0)
extern ParallelContext *CreateParallelContext(const char *library_name, const char *function_name, int nworkers); extern ParallelContext *CreateParallelContext(const char *library_name,
const char *function_name, int nworkers,
bool serializable_okay);
extern void InitializeParallelDSM(ParallelContext *pcxt); extern void InitializeParallelDSM(ParallelContext *pcxt);
extern void ReinitializeParallelDSM(ParallelContext *pcxt); extern void ReinitializeParallelDSM(ParallelContext *pcxt);
extern void LaunchParallelWorkers(ParallelContext *pcxt); extern void LaunchParallelWorkers(ParallelContext *pcxt);
......
...@@ -39,6 +39,7 @@ typedef struct ParallelHeapScanDescData ...@@ -39,6 +39,7 @@ typedef struct ParallelHeapScanDescData
BlockNumber phs_startblock; /* starting block number */ BlockNumber phs_startblock; /* starting block number */
pg_atomic_uint64 phs_nallocated; /* number of blocks allocated to pg_atomic_uint64 phs_nallocated; /* number of blocks allocated to
* workers so far. */ * workers so far. */
bool phs_snapshot_any; /* SnapshotAny, not phs_snapshot_data? */
char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
} ParallelHeapScanDescData; } ParallelHeapScanDescData;
......
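The new phs_snapshot_any flag records that the scan runs under SnapshotAny, which has no serializable state. A minimal sketch of how scan initialization might branch on it (the helper name is hypothetical; the real logic lives in heapam.c):

#include "postgres.h"
#include "access/relscan.h"
#include "utils/snapmgr.h"
#include "utils/tqual.h"

/*
 * Hypothetical helper: stash either the SnapshotAny marker or the
 * serialized MVCC snapshot in the parallel scan descriptor.
 */
static void
store_scan_snapshot(ParallelHeapScanDesc pscan, Snapshot snapshot)
{
	if (snapshot == SnapshotAny)
		pscan->phs_snapshot_any = true;		/* nothing to serialize */
	else
	{
		pscan->phs_snapshot_any = false;
		SerializeSnapshot(snapshot, pscan->phs_snapshot_data);
	}
}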
...@@ -104,14 +104,16 @@ extern void index_build(Relation heapRelation, ...@@ -104,14 +104,16 @@ extern void index_build(Relation heapRelation,
Relation indexRelation, Relation indexRelation,
IndexInfo *indexInfo, IndexInfo *indexInfo,
bool isprimary, bool isprimary,
bool isreindex); bool isreindex,
bool parallel);
extern double IndexBuildHeapScan(Relation heapRelation, extern double IndexBuildHeapScan(Relation heapRelation,
Relation indexRelation, Relation indexRelation,
IndexInfo *indexInfo, IndexInfo *indexInfo,
bool allow_sync, bool allow_sync,
IndexBuildCallback callback, IndexBuildCallback callback,
void *callback_state); void *callback_state,
HeapScanDesc scan);
extern double IndexBuildHeapRangeScan(Relation heapRelation, extern double IndexBuildHeapRangeScan(Relation heapRelation,
Relation indexRelation, Relation indexRelation,
IndexInfo *indexInfo, IndexInfo *indexInfo,
...@@ -120,7 +122,8 @@ extern double IndexBuildHeapRangeScan(Relation heapRelation, ...@@ -120,7 +122,8 @@ extern double IndexBuildHeapRangeScan(Relation heapRelation,
BlockNumber start_blockno, BlockNumber start_blockno,
BlockNumber end_blockno, BlockNumber end_blockno,
IndexBuildCallback callback, IndexBuildCallback callback,
void *callback_state); void *callback_state,
HeapScanDesc scan);
extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot); extern void validate_index(Oid heapId, Oid indexId, Snapshot snapshot);
......
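Serial callers simply pass NULL for the new HeapScanDesc argument and IndexBuildHeapScan begins its own scan, while a parallel worker hands in the scan it joined. A hedged fragment (build_callback, buildstate, and pscan are placeholders):

/* Serial index build: no pre-established scan. */
reltuples = IndexBuildHeapScan(heapRelation, indexRelation, indexInfo,
							   true, build_callback, (void *) &buildstate,
							   NULL);

/* Parallel worker: join the shared scan, then drive the same callback. */
scan = heap_beginscan_parallel(heapRelation, pscan);
reltuples = IndexBuildHeapScan(heapRelation, indexRelation, indexInfo,
							   true, build_callback, (void *) &buildstate,
							   scan);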
...@@ -241,6 +241,7 @@ extern bool enableFsync; ...@@ -241,6 +241,7 @@ extern bool enableFsync;
extern PGDLLIMPORT bool allowSystemTableMods; extern PGDLLIMPORT bool allowSystemTableMods;
extern PGDLLIMPORT int work_mem; extern PGDLLIMPORT int work_mem;
extern PGDLLIMPORT int maintenance_work_mem; extern PGDLLIMPORT int maintenance_work_mem;
extern PGDLLIMPORT int max_parallel_maintenance_workers;
extern int VacuumCostPageHit; extern int VacuumCostPageHit;
extern int VacuumCostPageMiss; extern int VacuumCostPageMiss;
......
...@@ -132,11 +132,12 @@ typedef struct ExprState ...@@ -132,11 +132,12 @@ typedef struct ExprState
* ReadyForInserts is it valid for inserts? * ReadyForInserts is it valid for inserts?
* Concurrent are we doing a concurrent index build? * Concurrent are we doing a concurrent index build?
* BrokenHotChain did we detect any broken HOT chains? * BrokenHotChain did we detect any broken HOT chains?
* ParallelWorkers # of workers requested (excludes leader)
* AmCache private cache area for index AM * AmCache private cache area for index AM
* Context memory context holding this IndexInfo * Context memory context holding this IndexInfo
* *
* ii_Concurrent and ii_BrokenHotChain are used only during index build; * ii_Concurrent, ii_BrokenHotChain, and ii_ParallelWorkers are used only
* they're conventionally set to false otherwise. * during index build; they're conventionally zeroed otherwise.
* ---------------- * ----------------
*/ */
typedef struct IndexInfo typedef struct IndexInfo
...@@ -158,6 +159,7 @@ typedef struct IndexInfo ...@@ -158,6 +159,7 @@ typedef struct IndexInfo
bool ii_ReadyForInserts; bool ii_ReadyForInserts;
bool ii_Concurrent; bool ii_Concurrent;
bool ii_BrokenHotChain; bool ii_BrokenHotChain;
int ii_ParallelWorkers;
Oid ii_Am; Oid ii_Am;
void *ii_AmCache; void *ii_AmCache;
MemoryContext ii_Context; MemoryContext ii_Context;
......
...@@ -55,7 +55,7 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed, ...@@ -55,7 +55,7 @@ extern RelOptInfo *standard_join_search(PlannerInfo *root, int levels_needed,
extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel); extern void generate_gather_paths(PlannerInfo *root, RelOptInfo *rel);
extern int compute_parallel_worker(RelOptInfo *rel, double heap_pages, extern int compute_parallel_worker(RelOptInfo *rel, double heap_pages,
double index_pages); double index_pages, int max_workers);
extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel, extern void create_partial_bitmap_paths(PlannerInfo *root, RelOptInfo *rel,
Path *bitmapqual); Path *bitmapqual);
extern void generate_partition_wise_join_paths(PlannerInfo *root, extern void generate_partition_wise_join_paths(PlannerInfo *root,
......
...@@ -56,6 +56,7 @@ extern Expr *expression_planner(Expr *expr); ...@@ -56,6 +56,7 @@ extern Expr *expression_planner(Expr *expr);
extern Expr *preprocess_phv_expression(PlannerInfo *root, Expr *expr); extern Expr *preprocess_phv_expression(PlannerInfo *root, Expr *expr);
extern bool plan_cluster_use_sort(Oid tableOid, Oid indexOid); extern bool plan_cluster_use_sort(Oid tableOid, Oid indexOid);
extern int plan_create_index_workers(Oid tableOid, Oid indexOid);
extern List *get_partitioned_child_rels(PlannerInfo *root, Index rti, extern List *get_partitioned_child_rels(PlannerInfo *root, Index rti,
bool *part_cols_updated); bool *part_cols_updated);
......
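plan_create_index_workers() is what lets index_build fill in the new ii_ParallelWorkers field; it already accounts for max_parallel_maintenance_workers internally. A plausible call-site sketch, not the exact committed code:

/* Decide how many workers this CREATE INDEX should request, and record
 * the answer in the IndexInfo for the index AM to act on. */
if (parallel)
	indexInfo->ii_ParallelWorkers =
		plan_create_index_workers(RelationGetRelid(heapRelation),
								  RelationGetRelid(indexRelation));
else
	indexInfo->ii_ParallelWorkers = 0;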
...@@ -826,6 +826,7 @@ typedef enum ...@@ -826,6 +826,7 @@ typedef enum
WAIT_EVENT_MQ_SEND, WAIT_EVENT_MQ_SEND,
WAIT_EVENT_PARALLEL_FINISH, WAIT_EVENT_PARALLEL_FINISH,
WAIT_EVENT_PARALLEL_BITMAP_SCAN, WAIT_EVENT_PARALLEL_BITMAP_SCAN,
WAIT_EVENT_PARALLEL_CREATE_INDEX_SCAN,
WAIT_EVENT_PROCARRAY_GROUP_UPDATE, WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
WAIT_EVENT_CLOG_GROUP_UPDATE, WAIT_EVENT_CLOG_GROUP_UPDATE,
WAIT_EVENT_REPLICATION_ORIGIN_DROP, WAIT_EVENT_REPLICATION_ORIGIN_DROP,
......
...@@ -43,6 +43,8 @@ extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size); ...@@ -43,6 +43,8 @@ extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size);
extern int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence); extern int BufFileSeek(BufFile *file, int fileno, off_t offset, int whence);
extern void BufFileTell(BufFile *file, int *fileno, off_t *offset); extern void BufFileTell(BufFile *file, int *fileno, off_t *offset);
extern int BufFileSeekBlock(BufFile *file, long blknum); extern int BufFileSeekBlock(BufFile *file, long blknum);
extern off_t BufFileSize(BufFile *file);
extern long BufFileAppend(BufFile *target, BufFile *source);
extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name); extern BufFile *BufFileCreateShared(SharedFileSet *fileset, const char *name);
extern void BufFileExportShared(BufFile *file); extern void BufFileExportShared(BufFile *file);
......
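These two additions are what let the leader splice each worker's exported tape file onto its own; a hedged fragment (leader_file and worker_file are placeholders):

/* Append a worker's exported (read-only) BufFile onto the leader's file.
 * The return value is the block number at which the worker's blocks now
 * start, so per-tape block offsets can be remapped. */
long		startblock = BufFileAppend(leader_file, worker_file);
off_t		nbytes = BufFileSize(worker_file);	/* e.g., for space accounting */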
...@@ -78,6 +78,7 @@ extern char *FilePathName(File file); ...@@ -78,6 +78,7 @@ extern char *FilePathName(File file);
extern int FileGetRawDesc(File file); extern int FileGetRawDesc(File file);
extern int FileGetRawFlags(File file); extern int FileGetRawFlags(File file);
extern mode_t FileGetRawMode(File file); extern mode_t FileGetRawMode(File file);
extern off_t FileGetSize(File file);
/* Operations used for sharing named temporary files */ /* Operations used for sharing named temporary files */
extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure); extern File PathNameCreateTemporaryFile(const char *name, bool error_on_failure);
......
...@@ -16,15 +16,49 @@ ...@@ -16,15 +16,49 @@
#ifndef LOGTAPE_H #ifndef LOGTAPE_H
#define LOGTAPE_H #define LOGTAPE_H
#include "storage/sharedfileset.h"
/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */ /* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
typedef struct LogicalTapeSet LogicalTapeSet; typedef struct LogicalTapeSet LogicalTapeSet;
/*
* The approach tuplesort.c takes to parallel external sorts is that workers,
* whose state is almost the same as independent serial sorts, are made to
* produce a final materialized tape of sorted output in all cases. This is
* frozen, just like any case requiring a final materialized tape. However,
* there is one difference, which is that freezing will also export an
* underlying shared fileset BufFile for sharing. Freezing produces TapeShare
* metadata for the worker when this happens, which is passed along through
* shared memory to the leader.
*
* The leader process can then pass an array of TapeShare metadata (one per
* worker participant) to LogicalTapeSetCreate(), alongside a handle to a
* shared fileset, which is sufficient to construct a new logical tapeset that
* consists of each of the tapes materialized by workers.
*
* Note that while logtape.c does create an empty leader tape at the end of the
* tapeset in the leader case, it can never be written to due to a restriction
* in the shared buffile infrastructure.
*/
typedef struct TapeShare
{
/*
* firstblocknumber is the first block that should be read from the
* materialized tape.
*
* buffilesize is the size of associated BufFile following freezing.
*/
long firstblocknumber;
off_t buffilesize;
} TapeShare;
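A minimal sketch of the handoff described above, with the shared-memory copy elided; lts, tapenum, shares, fileset, and nlaunched are placeholders:

/* Worker side: freeze the final materialized tape, which also exports
 * the underlying BufFile, and report where reading should start. */
TapeShare	share;

LogicalTapeFreeze(lts, tapenum, &share);
/* ... copy 'share' into shared memory for the leader ... */

/* Leader side: build one tapeset over every worker's frozen tape.
 * Passing worker = -1 marks this as the leader; the extra tape at the
 * end is the never-written leader tape noted above. */
LogicalTapeSet *unified =
	LogicalTapeSetCreate(nlaunched + 1, shares, fileset, -1);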
/* /*
* prototypes for functions in logtape.c * prototypes for functions in logtape.c
*/ */
extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes); extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes, TapeShare *shared,
SharedFileSet *fileset, int worker);
extern void LogicalTapeSetClose(LogicalTapeSet *lts); extern void LogicalTapeSetClose(LogicalTapeSet *lts);
extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts); extern void LogicalTapeSetForgetFreeSpace(LogicalTapeSet *lts);
extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
...@@ -34,7 +68,8 @@ extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, ...@@ -34,7 +68,8 @@ extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum, extern void LogicalTapeRewindForRead(LogicalTapeSet *lts, int tapenum,
size_t buffer_size); size_t buffer_size);
extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum); extern void LogicalTapeRewindForWrite(LogicalTapeSet *lts, int tapenum);
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum); extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum,
TapeShare *share);
extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, extern size_t LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size); size_t size);
extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, extern void LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
......
...@@ -8,7 +8,8 @@ ...@@ -8,7 +8,8 @@
* if necessary). It works efficiently for both small and large amounts * if necessary). It works efficiently for both small and large amounts
* of data. Small amounts are sorted in-memory using qsort(). Large * of data. Small amounts are sorted in-memory using qsort(). Large
* amounts are sorted using temporary files and a standard external sort * amounts are sorted using temporary files and a standard external sort
* algorithm. * algorithm. Parallel sorts use a variant of this external sort
* algorithm, and are typically used only for large amounts of data.
* *
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
...@@ -23,13 +24,39 @@ ...@@ -23,13 +24,39 @@
#include "access/itup.h" #include "access/itup.h"
#include "executor/tuptable.h" #include "executor/tuptable.h"
#include "fmgr.h" #include "fmgr.h"
#include "storage/dsm.h"
#include "utils/relcache.h" #include "utils/relcache.h"
/* Tuplesortstate is an opaque type whose details are not known outside /*
* tuplesort.c. * Tuplesortstate and Sharedsort are opaque types whose details are not
* known outside tuplesort.c.
*/ */
typedef struct Tuplesortstate Tuplesortstate; typedef struct Tuplesortstate Tuplesortstate;
typedef struct Sharedsort Sharedsort;
/*
* Tuplesort parallel coordination state, allocated by each participant in
* local memory. Participant caller initializes everything. See usage notes
* below.
*/
typedef struct SortCoordinateData
{
/* Worker process? If not, must be leader. */
bool isWorker;
/*
* Number of participants known to have been launched, as passed in by
* the leader process (workers set this to -1). The count includes the
* leader's own worker-like participation, if any.
*/
int nParticipants;
/* Private opaque state (points to shared memory) */
Sharedsort *sharedsort;
} SortCoordinateData;
typedef struct SortCoordinateData *SortCoordinate;
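Filling one in takes only a few lines. A hedged sketch for the leader, where sharedsort and launched are placeholders; a worker would instead set isWorker = true and nParticipants = -1:

SortCoordinate coordinate = (SortCoordinate)
	palloc0(sizeof(SortCoordinateData));

coordinate->isWorker = false;			/* this is the leader */
coordinate->nParticipants = launched;	/* workers actually started */
coordinate->sharedsort = sharedsort;	/* lives in the DSM segment */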
/* /*
* Data structures for reporting sort statistics. Note that * Data structures for reporting sort statistics. Note that
...@@ -66,6 +93,8 @@ typedef struct TuplesortInstrumentation ...@@ -66,6 +93,8 @@ typedef struct TuplesortInstrumentation
* sorting HeapTuples and two more for sorting IndexTuples. Yet another * sorting HeapTuples and two more for sorting IndexTuples. Yet another
* API supports sorting bare Datums. * API supports sorting bare Datums.
* *
* Serial sort callers should pass NULL for their coordinate argument.
*
* The "heap" API actually stores/sorts MinimalTuples, which means it doesn't * The "heap" API actually stores/sorts MinimalTuples, which means it doesn't
* preserve the system columns (tuple identity and transaction visibility * preserve the system columns (tuple identity and transaction visibility
* info). The sort keys are specified by column numbers within the tuples * info). The sort keys are specified by column numbers within the tuples
...@@ -84,30 +113,107 @@ typedef struct TuplesortInstrumentation ...@@ -84,30 +113,107 @@ typedef struct TuplesortInstrumentation
* *
* The "index_hash" API is similar to index_btree, but the tuples are * The "index_hash" API is similar to index_btree, but the tuples are
* actually sorted by their hash codes not the raw data. * actually sorted by their hash codes not the raw data.
*
* Parallel sort callers are required to coordinate multiple tuplesort states
* in a leader process and one or more worker processes. The leader process
* must launch workers, and have each perform an independent "partial"
* tuplesort, typically fed by the parallel heap interface. The leader later
* produces the final output (internally, it merges runs output by workers).
*
* Callers must do the following to perform a sort in parallel using multiple
* worker processes:
*
* 1. Request tuplesort-private shared memory for n workers. Use
* tuplesort_estimate_shared() to get the required size.
* 2. Have leader process initialize allocated shared memory using
* tuplesort_initialize_shared(). Launch workers.
* 3. Initialize a coordinate argument in the leader process and in each
* worker process. This has a pointer to the shared tuplesort-private
* structure, as well as some caller-initialized fields. The leader's
* coordinate argument reliably indicates the number of workers launched
* (this field is unused by workers).
* 4. Begin a tuplesort using some appropriate tuplesort_begin* routine
* (passing the coordinate argument) within each worker. The workMem
* arguments need not be identical. All other arguments should match
* exactly, though.
* 5. tuplesort_attach_shared() should be called by all workers. Feed tuples
* to each worker, and call tuplesort_performsort() within each when input
* is exhausted.
* 6. Call tuplesort_end() in each worker process. Worker processes can shut
* down once tuplesort_end() returns.
* 7. Begin a tuplesort in the leader using the same tuplesort_begin*
* routine, passing a leader-appropriate coordinate argument (this can
* happen as early as step 3, since only the number of workers successfully
* launched needs to be known). The leader must now wait for workers to
* finish. The caller must use its own mechanism to ensure that the next
* step isn't reached until all workers have called and returned from
* tuplesort_performsort(). (Note that it's okay if workers have already
* also called tuplesort_end() by then.)
* 8. Call tuplesort_performsort() in the leader. Consume output using the
* appropriate tuplesort_get* routine. The leader can skip this step if
* tuplesort turns out to be unnecessary.
* 9. Call tuplesort_end() in the leader (these steps are sketched in code
* just after this comment).
*
* This division of labor assumes nothing about how input tuples are produced,
* but does require that caller combine the state of multiple tuplesorts for
* any purpose other than producing the final output. For example, callers
* must consider that tuplesort_get_stats() reports on only one worker's role
* in a sort (or the leader's role), and not statistics for the sort as a
* whole.
*
* Note that callers may use the leader process to sort runs as if it were
* an independent worker process (prior to the process performing a leader
* sort to produce the final sorted output). Doing so only requires a second
* "partial" tuplesort within the leader process, initialized like that of a
* worker process. The steps above don't touch on this directly. The only
* difference is that the tuplesort_attach_shared() call is never needed
* within the leader process, because the backend as a whole holds the
* shared fileset reference. A worker Tuplesortstate in the leader is
* expected to do exactly the same amount of total initial processing work
* as a worker process Tuplesortstate, since the leader process has nothing
* else to do before workers finish.
*
* Note that only a very small amount of memory will be allocated prior to
* the leader state first consuming input, and that workers will free the
* vast majority of their memory upon returning from tuplesort_performsort().
* Callers can rely on this to arrange for memory to be used in a way that
* respects a workMem-style budget across an entire parallel sort operation.
*
* Callers are responsible for parallel safety in general. However, they
* can at least rely on there being no parallel safety hazards within
* tuplesort, because tuplesort thinks of the sort as several independent
* sorts whose results are combined. Since, in general, the behavior of
* sort operators is immutable, caller need only worry about the parallel
* safety of whatever the process is through which input tuples are
* generated (typically, caller uses a parallel heap scan).
*/ */
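A condensed, hedged sketch of those nine steps for a heap sort. SHARED_SORT_KEY, nworkers, the tuple-feeding loop, and the wait for workers are placeholders, and error handling is omitted:

/* Steps 1-2, leader: size, allocate, and initialize the shared state. */
Sharedsort *sharedsort = (Sharedsort *)
	shm_toc_allocate(pcxt->toc, tuplesort_estimate_shared(nworkers));

tuplesort_initialize_shared(sharedsort, nworkers, pcxt->seg);
shm_toc_insert(pcxt->toc, SHARED_SORT_KEY, sharedsort);
LaunchParallelWorkers(pcxt);

/* Steps 3-6, in each worker process. */
SortCoordinateData wcoord;
Tuplesortstate *wstate;

wcoord.isWorker = true;
wcoord.nParticipants = -1;
wcoord.sharedsort = sharedsort;	/* found via shm_toc_lookup() in a worker */
wstate = tuplesort_begin_heap(tupDesc, nkeys, attNums, sortOperators,
							  sortCollations, nullsFirstFlags,
							  work_mem, &wcoord, false);
tuplesort_attach_shared(sharedsort, seg);
/* ... feed tuples, e.g. from a parallel heap scan ... */
tuplesort_performsort(wstate);
tuplesort_end(wstate);

/* Steps 7-9, leader, once all workers have returned from performsort. */
SortCoordinateData lcoord;
Tuplesortstate *lstate;

lcoord.isWorker = false;
lcoord.nParticipants = pcxt->nworkers_launched;
lcoord.sharedsort = sharedsort;
lstate = tuplesort_begin_heap(tupDesc, nkeys, attNums, sortOperators,
							  sortCollations, nullsFirstFlags,
							  work_mem, &lcoord, false);
tuplesort_performsort(lstate);	/* merges the workers' sorted runs */
/* ... consume with a tuplesort_get* routine ... */
tuplesort_end(lstate);

Note that both begin calls pass randomAccess = false, matching the restriction below that parallel sorts do not support random access.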
extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc, extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
int nkeys, AttrNumber *attNums, int nkeys, AttrNumber *attNums,
Oid *sortOperators, Oid *sortCollations, Oid *sortOperators, Oid *sortCollations,
bool *nullsFirstFlags, bool *nullsFirstFlags,
int workMem, bool randomAccess); int workMem, SortCoordinate coordinate,
bool randomAccess);
extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc, extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
Relation indexRel, Relation indexRel, int workMem,
int workMem, bool randomAccess); SortCoordinate coordinate, bool randomAccess);
extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel, extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel,
Relation indexRel, Relation indexRel,
bool enforceUnique, bool enforceUnique,
int workMem, bool randomAccess); int workMem, SortCoordinate coordinate,
bool randomAccess);
extern Tuplesortstate *tuplesort_begin_index_hash(Relation heapRel, extern Tuplesortstate *tuplesort_begin_index_hash(Relation heapRel,
Relation indexRel, Relation indexRel,
uint32 high_mask, uint32 high_mask,
uint32 low_mask, uint32 low_mask,
uint32 max_buckets, uint32 max_buckets,
int workMem, bool randomAccess); int workMem, SortCoordinate coordinate,
bool randomAccess);
extern Tuplesortstate *tuplesort_begin_datum(Oid datumType, extern Tuplesortstate *tuplesort_begin_datum(Oid datumType,
Oid sortOperator, Oid sortCollation, Oid sortOperator, Oid sortCollation,
bool nullsFirstFlag, bool nullsFirstFlag,
int workMem, bool randomAccess); int workMem, SortCoordinate coordinate,
bool randomAccess);
extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound); extern void tuplesort_set_bound(Tuplesortstate *state, int64 bound);
...@@ -141,10 +247,16 @@ extern const char *tuplesort_space_type_name(TuplesortSpaceType t); ...@@ -141,10 +247,16 @@ extern const char *tuplesort_space_type_name(TuplesortSpaceType t);
extern int tuplesort_merge_order(int64 allowedMem); extern int tuplesort_merge_order(int64 allowedMem);
extern Size tuplesort_estimate_shared(int nworkers);
extern void tuplesort_initialize_shared(Sharedsort *shared, int nWorkers,
dsm_segment *seg);
extern void tuplesort_attach_shared(Sharedsort *shared, dsm_segment *seg);
/* /*
* These routines may only be called if randomAccess was specified 'true'. * These routines may only be called if randomAccess was specified 'true'.
* Likewise, backwards scan in gettuple/getdatum is only allowed if * Likewise, backwards scan in gettuple/getdatum is only allowed if
* randomAccess was specified. * randomAccess was specified. Note that parallel sorts do not support
* randomAccess.
*/ */
extern void tuplesort_rescan(Tuplesortstate *state); extern void tuplesort_rescan(Tuplesortstate *state);
......
...@@ -165,6 +165,7 @@ BTArrayKeyInfo ...@@ -165,6 +165,7 @@ BTArrayKeyInfo
BTBuildState BTBuildState
BTCycleId BTCycleId
BTIndexStat BTIndexStat
BTLeader
BTMetaPageData BTMetaPageData
BTOneVacInfo BTOneVacInfo
BTPS_State BTPS_State
...@@ -178,6 +179,7 @@ BTScanOpaqueData ...@@ -178,6 +179,7 @@ BTScanOpaqueData
BTScanPos BTScanPos
BTScanPosData BTScanPosData
BTScanPosItem BTScanPosItem
BTShared
BTSortArrayContext BTSortArrayContext
BTSpool BTSpool
BTStack BTStack
...@@ -2047,6 +2049,7 @@ SharedSortInfo ...@@ -2047,6 +2049,7 @@ SharedSortInfo
SharedTuplestore SharedTuplestore
SharedTuplestoreAccessor SharedTuplestoreAccessor
SharedTypmodTableEntry SharedTypmodTableEntry
Sharedsort
ShellTypeInfo ShellTypeInfo
ShippableCacheEntry ShippableCacheEntry
ShippableCacheKey ShippableCacheKey
...@@ -2091,6 +2094,8 @@ Sort ...@@ -2091,6 +2094,8 @@ Sort
SortBy SortBy
SortByDir SortByDir
SortByNulls SortByNulls
SortCoordinate
SortCoordinateData
SortGroupClause SortGroupClause
SortItem SortItem
SortPath SortPath
...@@ -2234,6 +2239,7 @@ TableSpaceOpts ...@@ -2234,6 +2239,7 @@ TableSpaceOpts
TablespaceList TablespaceList
TablespaceListCell TablespaceListCell
TapeBlockTrailer TapeBlockTrailer
TapeShare
TarMethodData TarMethodData
TarMethodFile TarMethodFile
TargetEntry TargetEntry
......