Commit ab727167 authored by Robert Haas

Support Parallel Append plan nodes.

When we create an Append node, we can spread out the workers over the
subplans instead of piling on to each subplan one at a time, which
should typically be a bit more efficient, both because the startup
cost of any plan executed entirely by one worker is paid only once and
also because of reduced contention.  We can also construct Append
plans using a mix of partial and non-partial subplans, which may allow
for parallelism in places that otherwise couldn't support it.
Unfortunately, this patch doesn't handle the important case of
parallelizing UNION ALL by running each branch in a separate worker;
the executor infrastructure is added here, but more planner work is
needed.

Amit Khandekar, Robert Haas, Amul Sul, reviewed and tested by
Ashutosh Bapat, Amit Langote, Rafia Sabih, Amit Kapila, and
Rajkumar Raghuwanshi.

Discussion: http://postgr.es/m/CAJ3gD9dy0K_E8r727heqXoBmWZ83HwLFwdcaSSmBQ1+S+vRuUQ@mail.gmail.com
parent 8097d189
...@@ -3633,6 +3633,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class=" ...@@ -3633,6 +3633,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry id="guc-enable-parallel-append" xreflabel="enable_parallel_append">
<term><varname>enable_parallel_append</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>enable_parallel_append</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Enables or disables the query planner's use of parallel-aware
append plan types. The default is <literal>on</>.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-enable-partition-wise-join" xreflabel="enable_partition_wise_join"> <varlistentry id="guc-enable-partition-wise-join" xreflabel="enable_partition_wise_join">
<term><varname>enable_partition_wise_join</varname> (<type>boolean</type>) <term><varname>enable_partition_wise_join</varname> (<type>boolean</type>)
<indexterm> <indexterm>
......
...@@ -845,7 +845,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -845,7 +845,7 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<tbody> <tbody>
<row> <row>
<entry morerows="62"><literal>LWLock</literal></entry> <entry morerows="63"><literal>LWLock</literal></entry>
<entry><literal>ShmemIndexLock</literal></entry> <entry><literal>ShmemIndexLock</literal></entry>
<entry>Waiting to find or allocate space in shared memory.</entry> <entry>Waiting to find or allocate space in shared memory.</entry>
</row> </row>
...@@ -1116,6 +1116,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -1116,6 +1116,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
<entry><literal>tbm</literal></entry> <entry><literal>tbm</literal></entry>
<entry>Waiting for TBM shared iterator lock.</entry> <entry>Waiting for TBM shared iterator lock.</entry>
</row> </row>
<row>
<entry><literal>parallel_append</literal></entry>
<entry>Waiting to choose the next subplan during Parallel Append plan
execution.</entry>
</row>
<row> <row>
<entry morerows="9"><literal>Lock</literal></entry> <entry morerows="9"><literal>Lock</literal></entry>
<entry><literal>relation</literal></entry> <entry><literal>relation</literal></entry>
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "executor/execExpr.h" #include "executor/execExpr.h"
#include "executor/execParallel.h" #include "executor/execParallel.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h" #include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h" #include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h" #include "executor/nodeForeignscan.h"
...@@ -250,6 +251,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ...@@ -250,6 +251,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecForeignScanEstimate((ForeignScanState *) planstate, ExecForeignScanEstimate((ForeignScanState *) planstate,
e->pcxt); e->pcxt);
break; break;
case T_AppendState:
if (planstate->plan->parallel_aware)
ExecAppendEstimate((AppendState *) planstate,
e->pcxt);
break;
case T_CustomScanState: case T_CustomScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecCustomScanEstimate((CustomScanState *) planstate, ExecCustomScanEstimate((CustomScanState *) planstate,
...@@ -453,6 +459,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ...@@ -453,6 +459,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecForeignScanInitializeDSM((ForeignScanState *) planstate, ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt); d->pcxt);
break; break;
case T_AppendState:
if (planstate->plan->parallel_aware)
ExecAppendInitializeDSM((AppendState *) planstate,
d->pcxt);
break;
case T_CustomScanState: case T_CustomScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecCustomScanInitializeDSM((CustomScanState *) planstate, ExecCustomScanInitializeDSM((CustomScanState *) planstate,
...@@ -884,6 +895,10 @@ ExecParallelReInitializeDSM(PlanState *planstate, ...@@ -884,6 +895,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
ExecForeignScanReInitializeDSM((ForeignScanState *) planstate, ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
pcxt); pcxt);
break; break;
case T_AppendState:
if (planstate->plan->parallel_aware)
ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
break;
case T_CustomScanState: case T_CustomScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecCustomScanReInitializeDSM((CustomScanState *) planstate, ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
...@@ -1194,6 +1209,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ...@@ -1194,6 +1209,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecForeignScanInitializeWorker((ForeignScanState *) planstate, ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
pwcxt); pwcxt);
break; break;
case T_AppendState:
if (planstate->plan->parallel_aware)
ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
break;
case T_CustomScanState: case T_CustomScanState:
if (planstate->plan->parallel_aware) if (planstate->plan->parallel_aware)
ExecCustomScanInitializeWorker((CustomScanState *) planstate, ExecCustomScanInitializeWorker((CustomScanState *) planstate,
......
This diff is collapsed.
...@@ -242,6 +242,7 @@ _copyAppend(const Append *from) ...@@ -242,6 +242,7 @@ _copyAppend(const Append *from)
*/ */
COPY_NODE_FIELD(partitioned_rels); COPY_NODE_FIELD(partitioned_rels);
COPY_NODE_FIELD(appendplans); COPY_NODE_FIELD(appendplans);
COPY_SCALAR_FIELD(first_partial_plan);
return newnode; return newnode;
} }
......
...@@ -1249,6 +1249,44 @@ list_copy_tail(const List *oldlist, int nskip) ...@@ -1249,6 +1249,44 @@ list_copy_tail(const List *oldlist, int nskip)
return newlist; return newlist;
} }
/*
 * Sort a list using qsort.
 *
 * Returns a newly built list whose elements are those of "list", ordered as
 * defined by "cmp".  The input list is not modified: fresh cells are
 * allocated for the result, so the caller's list remains valid afterwards.
 * As with any shallow list copy, the data the cells point to is shared, not
 * duplicated.  An empty input yields NIL.
 *
 * The comparator function receives arguments of type ListCell **.
 */
List *
list_qsort(const List *list, list_qsort_comparator cmp)
{
	ListCell   *cell;
	int			i;
	int			len = list_length(list);
	ListCell  **list_arr;
	List	   *new_list;
	ListCell   *prev_cell;

	/* Empty list is easy */
	if (len == 0)
		return NIL;

	/* Flatten the list cells into an array so that qsort can be applied */
	i = 0;
	list_arr = palloc(sizeof(ListCell *) * len);
	foreach(cell, list)
		list_arr[i++] = cell;

	qsort(list_arr, len, sizeof(ListCell *), cmp);

	new_list = (List *) palloc(sizeof(List));
	new_list->type = list->type;
	new_list->length = len;
	new_list->head = NULL;
	new_list->tail = NULL;

	/*
	 * Build the result out of fresh cells.  Re-linking the original cells
	 * here would rewrite the "next" pointers underneath the caller's list
	 * header and leave the (const) input list corrupted.
	 */
	prev_cell = NULL;
	for (i = 0; i < len; i++)
	{
		ListCell   *new_cell = (ListCell *) palloc(sizeof(ListCell));

		/* Copy the payload; the link is set up separately below */
		*new_cell = *list_arr[i];
		new_cell->next = NULL;

		if (prev_cell == NULL)
			new_list->head = new_cell;
		else
			prev_cell->next = new_cell;
		prev_cell = new_cell;
	}
	new_list->tail = prev_cell;

	/* Might as well free the workspace array */
	pfree(list_arr);

	return new_list;
}
/* /*
* Temporary compatibility functions * Temporary compatibility functions
* *
......
...@@ -399,6 +399,7 @@ _outAppend(StringInfo str, const Append *node) ...@@ -399,6 +399,7 @@ _outAppend(StringInfo str, const Append *node)
WRITE_NODE_FIELD(partitioned_rels); WRITE_NODE_FIELD(partitioned_rels);
WRITE_NODE_FIELD(appendplans); WRITE_NODE_FIELD(appendplans);
WRITE_INT_FIELD(first_partial_plan);
} }
static void static void
......
...@@ -1600,6 +1600,7 @@ _readAppend(void) ...@@ -1600,6 +1600,7 @@ _readAppend(void)
READ_NODE_FIELD(partitioned_rels); READ_NODE_FIELD(partitioned_rels);
READ_NODE_FIELD(appendplans); READ_NODE_FIELD(appendplans);
READ_INT_FIELD(first_partial_plan);
READ_DONE(); READ_DONE();
} }
......
This diff is collapsed.
...@@ -128,6 +128,7 @@ bool enable_mergejoin = true; ...@@ -128,6 +128,7 @@ bool enable_mergejoin = true;
bool enable_hashjoin = true; bool enable_hashjoin = true;
bool enable_gathermerge = true; bool enable_gathermerge = true;
bool enable_partition_wise_join = false; bool enable_partition_wise_join = false;
bool enable_parallel_append = true;
typedef struct typedef struct
{ {
...@@ -160,6 +161,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root, ...@@ -160,6 +161,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root,
Relids inner_relids, Relids inner_relids,
SpecialJoinInfo *sjinfo, SpecialJoinInfo *sjinfo,
List **restrictlist); List **restrictlist);
static Cost append_nonpartial_cost(List *subpaths, int numpaths,
int parallel_workers);
static void set_rel_width(PlannerInfo *root, RelOptInfo *rel); static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
static double relation_byte_size(double tuples, int width); static double relation_byte_size(double tuples, int width);
static double page_size(double tuples, int width); static double page_size(double tuples, int width);
...@@ -1741,6 +1744,167 @@ cost_sort(Path *path, PlannerInfo *root, ...@@ -1741,6 +1744,167 @@ cost_sort(Path *path, PlannerInfo *root,
path->total_cost = startup_cost + run_cost; path->total_cost = startup_cost + run_cost;
} }
/*
 * append_nonpartial_cost
 *	  Estimate the cost of the non-partial paths in a Parallel Append.
 *	  The non-partial paths are assumed to be the first "numpaths" paths
 *	  from the subpaths list, and to be in order of decreasing cost.
 *
 * The estimate simulates handing each path, in list order, to whichever
 * worker currently has the least accumulated cost; the result is the largest
 * per-worker total.  Returns 0 when there are no non-partial paths.
 */
static Cost
append_nonpartial_cost(List *subpaths, int numpaths, int parallel_workers)
{
	Cost	   *costarr;
	Cost		result;
	int			arrlen;
	ListCell   *l;
	ListCell   *cell;
	int			i;
	int			path_index;
	int			min_index;
	int			max_index;

	if (numpaths <= 0)
		return 0;

	/*
	 * Array length is number of workers or number of relevant paths,
	 * whichever is less.  Never let it drop below one: with a zero-length
	 * array min_index would become -1 below and we would scribble outside
	 * the array.  A single slot simply sums up all the path costs.
	 */
	arrlen = Min(parallel_workers, numpaths);
	if (arrlen < 1)
		arrlen = 1;
	costarr = (Cost *) palloc(sizeof(Cost) * arrlen);

	/* The first few paths will each be claimed by a different worker. */
	path_index = 0;
	foreach(cell, subpaths)
	{
		Path	   *subpath = (Path *) lfirst(cell);

		if (path_index == arrlen)
			break;
		costarr[path_index++] = subpath->total_cost;
	}

	/*
	 * Since subpaths are sorted by decreasing cost, the last one will have
	 * the minimum cost.
	 */
	min_index = arrlen - 1;

	/*
	 * For each of the remaining subpaths, add its cost to the array element
	 * with minimum cost.  "cell" was left pointing at the first path not yet
	 * assigned (or is NULL if the list was exhausted).
	 */
	for_each_cell(l, cell)
	{
		Path	   *subpath = (Path *) lfirst(l);

		/* Consider only the non-partial paths */
		if (path_index++ == numpaths)
			break;

		costarr[min_index] += subpath->total_cost;

		/* Update the new min cost array index */
		for (min_index = i = 0; i < arrlen; i++)
		{
			if (costarr[i] < costarr[min_index])
				min_index = i;
		}
	}

	/* Find the highest cost from the array */
	for (max_index = i = 0; i < arrlen; i++)
	{
		if (costarr[i] > costarr[max_index])
			max_index = i;
	}
	result = costarr[max_index];

	/* Release the work array rather than leaving it behind */
	pfree(costarr);

	return result;
}
/*
 * cost_append
 *	  Determines and returns the cost of an Append node.
 *
 * Fills in apath->path.startup_cost, total_cost and rows from the subpaths.
 * All three are reset first, so the result does not depend on whatever the
 * fields held on entry.  With no subpaths, all three are left at zero.
 *
 * We charge nothing extra for the Append itself, which perhaps is too
 * optimistic, but since it doesn't do any selection or projection, it is a
 * pretty cheap node.
 */
void
cost_append(AppendPath *apath)
{
	ListCell   *l;

	apath->path.startup_cost = 0;
	apath->path.total_cost = 0;
	/* rows is accumulated with += below, so it must start from zero too */
	apath->path.rows = 0;

	if (apath->subpaths == NIL)
		return;

	if (!apath->path.parallel_aware)
	{
		Path	   *firstpath = (Path *) linitial(apath->subpaths);

		/*
		 * Startup cost of non-parallel-aware Append is the startup cost of
		 * first subpath.
		 */
		apath->path.startup_cost = firstpath->startup_cost;

		/* Compute rows and costs as sums of subplan rows and costs. */
		foreach(l, apath->subpaths)
		{
			Path	   *subpath = (Path *) lfirst(l);

			apath->path.rows += subpath->rows;
			apath->path.total_cost += subpath->total_cost;
		}
	}
	else						/* parallel-aware */
	{
		int			i = 0;
		double		parallel_divisor = get_parallel_divisor(&apath->path);

		/* Calculate startup cost. */
		foreach(l, apath->subpaths)
		{
			Path	   *subpath = (Path *) lfirst(l);

			/*
			 * Append will start returning tuples when the child node having
			 * lowest startup cost is done setting up. We consider only the
			 * first few subplans that immediately get a worker assigned.
			 */
			if (i == 0)
				apath->path.startup_cost = subpath->startup_cost;
			else if (i < apath->path.parallel_workers)
				apath->path.startup_cost = Min(apath->path.startup_cost,
											   subpath->startup_cost);

			/*
			 * Apply parallel divisor to non-partial subpaths.  Also add the
			 * cost of partial paths to the total cost, but ignore non-partial
			 * paths for now.
			 */
			if (i < apath->first_partial_path)
				apath->path.rows += subpath->rows / parallel_divisor;
			else
			{
				apath->path.rows += subpath->rows;
				apath->path.total_cost += subpath->total_cost;
			}

			i++;
		}

		/* Add cost for non-partial subpaths. */
		apath->path.total_cost +=
			append_nonpartial_cost(apath->subpaths,
								   apath->first_partial_path,
								   apath->path.parallel_workers);
	}
}
/* /*
* cost_merge_append * cost_merge_append
* Determines and returns the cost of a MergeAppend node. * Determines and returns the cost of a MergeAppend node.
......
...@@ -1232,7 +1232,8 @@ mark_dummy_rel(RelOptInfo *rel) ...@@ -1232,7 +1232,8 @@ mark_dummy_rel(RelOptInfo *rel)
rel->partial_pathlist = NIL; rel->partial_pathlist = NIL;
/* Set up the dummy path */ /* Set up the dummy path */
add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL)); add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL,
0, false, NIL, -1));
/* Set or update cheapest_total_path and related fields */ /* Set or update cheapest_total_path and related fields */
set_cheapest(rel); set_cheapest(rel);
......
...@@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual ...@@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
Index scanrelid, char *enrname); Index scanrelid, char *enrname);
static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
Index scanrelid, int wtParam); Index scanrelid, int wtParam);
static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels); static Append *make_append(List *appendplans, int first_partial_plan,
List *tlist, List *partitioned_rels);
static RecursiveUnion *make_recursive_union(List *tlist, static RecursiveUnion *make_recursive_union(List *tlist,
Plan *lefttree, Plan *lefttree,
Plan *righttree, Plan *righttree,
...@@ -1059,7 +1060,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) ...@@ -1059,7 +1060,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path)
* parent-rel Vars it'll be asked to emit. * parent-rel Vars it'll be asked to emit.
*/ */
plan = make_append(subplans, tlist, best_path->partitioned_rels); plan = make_append(subplans, best_path->first_partial_path,
tlist, best_path->partitioned_rels);
copy_generic_path_info(&plan->plan, (Path *) best_path); copy_generic_path_info(&plan->plan, (Path *) best_path);
...@@ -5294,7 +5296,8 @@ make_foreignscan(List *qptlist, ...@@ -5294,7 +5296,8 @@ make_foreignscan(List *qptlist,
} }
static Append * static Append *
make_append(List *appendplans, List *tlist, List *partitioned_rels) make_append(List *appendplans, int first_partial_plan,
List *tlist, List *partitioned_rels)
{ {
Append *node = makeNode(Append); Append *node = makeNode(Append);
Plan *plan = &node->plan; Plan *plan = &node->plan;
...@@ -5305,6 +5308,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels) ...@@ -5305,6 +5308,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels)
plan->righttree = NULL; plan->righttree = NULL;
node->partitioned_rels = partitioned_rels; node->partitioned_rels = partitioned_rels;
node->appendplans = appendplans; node->appendplans = appendplans;
node->first_partial_plan = first_partial_plan;
return node; return node;
} }
......
...@@ -3680,9 +3680,12 @@ create_grouping_paths(PlannerInfo *root, ...@@ -3680,9 +3680,12 @@ create_grouping_paths(PlannerInfo *root,
path = (Path *) path = (Path *)
create_append_path(grouped_rel, create_append_path(grouped_rel,
paths, paths,
NIL,
NULL, NULL,
0, 0,
NIL); false,
NIL,
-1);
path->pathtarget = target; path->pathtarget = target;
} }
else else
......
...@@ -590,8 +590,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root, ...@@ -590,8 +590,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root,
/* /*
* Append the child results together. * Append the child results together.
*/ */
path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); path = (Path *) create_append_path(result_rel, pathlist, NIL,
NULL, 0, false, NIL, -1);
/* We have to manually jam the right tlist into the path; ick */ /* We have to manually jam the right tlist into the path; ick */
path->pathtarget = create_pathtarget(root, tlist); path->pathtarget = create_pathtarget(root, tlist);
...@@ -702,7 +702,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root, ...@@ -702,7 +702,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root,
/* /*
* Append the child results together. * Append the child results together.
*/ */
path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); path = (Path *) create_append_path(result_rel, pathlist, NIL,
NULL, 0, false, NIL, -1);
/* We have to manually jam the right tlist into the path; ick */ /* We have to manually jam the right tlist into the path; ick */
path->pathtarget = create_pathtarget(root, tlist); path->pathtarget = create_pathtarget(root, tlist);
......
...@@ -51,6 +51,8 @@ typedef enum ...@@ -51,6 +51,8 @@ typedef enum
#define STD_FUZZ_FACTOR 1.01 #define STD_FUZZ_FACTOR 1.01
static List *translate_sub_tlist(List *tlist, int relid); static List *translate_sub_tlist(List *tlist, int relid);
static int append_total_cost_compare(const void *a, const void *b);
static int append_startup_cost_compare(const void *a, const void *b);
static List *reparameterize_pathlist_by_child(PlannerInfo *root, static List *reparameterize_pathlist_by_child(PlannerInfo *root,
List *pathlist, List *pathlist,
RelOptInfo *child_rel); RelOptInfo *child_rel);
...@@ -1208,44 +1210,50 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, ...@@ -1208,44 +1210,50 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
* Note that we must handle subpaths = NIL, representing a dummy access path. * Note that we must handle subpaths = NIL, representing a dummy access path.
*/ */
AppendPath * AppendPath *
create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, create_append_path(RelOptInfo *rel,
int parallel_workers, List *partitioned_rels) List *subpaths, List *partial_subpaths,
Relids required_outer,
int parallel_workers, bool parallel_aware,
List *partitioned_rels, double rows)
{ {
AppendPath *pathnode = makeNode(AppendPath); AppendPath *pathnode = makeNode(AppendPath);
ListCell *l; ListCell *l;
Assert(!parallel_aware || parallel_workers > 0);
pathnode->path.pathtype = T_Append; pathnode->path.pathtype = T_Append;
pathnode->path.parent = rel; pathnode->path.parent = rel;
pathnode->path.pathtarget = rel->reltarget; pathnode->path.pathtarget = rel->reltarget;
pathnode->path.param_info = get_appendrel_parampathinfo(rel, pathnode->path.param_info = get_appendrel_parampathinfo(rel,
required_outer); required_outer);
pathnode->path.parallel_aware = false; pathnode->path.parallel_aware = parallel_aware;
pathnode->path.parallel_safe = rel->consider_parallel; pathnode->path.parallel_safe = rel->consider_parallel;
pathnode->path.parallel_workers = parallel_workers; pathnode->path.parallel_workers = parallel_workers;
pathnode->path.pathkeys = NIL; /* result is always considered unsorted */ pathnode->path.pathkeys = NIL; /* result is always considered unsorted */
pathnode->partitioned_rels = list_copy(partitioned_rels); pathnode->partitioned_rels = list_copy(partitioned_rels);
pathnode->subpaths = subpaths;
/* /*
* We don't bother with inventing a cost_append(), but just do it here. * For parallel append, non-partial paths are sorted by descending total
* * costs. That way, the total time to finish all non-partial paths is
* Compute rows and costs as sums of subplan rows and costs. We charge * minimized. Also, the partial paths are sorted by descending startup
* nothing extra for the Append itself, which perhaps is too optimistic, * costs. There may be some paths that require to do startup work by a
* but since it doesn't do any selection or projection, it is a pretty * single worker. In such case, it's better for workers to choose the
* cheap node. * expensive ones first, whereas the leader should choose the cheapest
* startup plan.
*/ */
pathnode->path.rows = 0; if (pathnode->path.parallel_aware)
pathnode->path.startup_cost = 0; {
pathnode->path.total_cost = 0; subpaths = list_qsort(subpaths, append_total_cost_compare);
partial_subpaths = list_qsort(partial_subpaths,
append_startup_cost_compare);
}
pathnode->first_partial_path = list_length(subpaths);
pathnode->subpaths = list_concat(subpaths, partial_subpaths);
foreach(l, subpaths) foreach(l, subpaths)
{ {
Path *subpath = (Path *) lfirst(l); Path *subpath = (Path *) lfirst(l);
pathnode->path.rows += subpath->rows;
if (l == list_head(subpaths)) /* first node? */
pathnode->path.startup_cost = subpath->startup_cost;
pathnode->path.total_cost += subpath->total_cost;
pathnode->path.parallel_safe = pathnode->path.parallel_safe && pathnode->path.parallel_safe = pathnode->path.parallel_safe &&
subpath->parallel_safe; subpath->parallel_safe;
...@@ -1253,9 +1261,53 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, ...@@ -1253,9 +1261,53 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer,
Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer));
} }
Assert(!parallel_aware || pathnode->path.parallel_safe);
cost_append(pathnode);
/* If the caller provided a row estimate, override the computed value. */
if (rows >= 0)
pathnode->path.rows = rows;
return pathnode; return pathnode;
} }
/*
* append_total_cost_compare
* list_qsort comparator for sorting append child paths by total_cost
*/
static int
append_total_cost_compare(const void *a, const void *b)
{
Path *path1 = (Path *) lfirst(*(ListCell **) a);
Path *path2 = (Path *) lfirst(*(ListCell **) b);
if (path1->total_cost > path2->total_cost)
return -1;
if (path1->total_cost < path2->total_cost)
return 1;
return 0;
}
/*
* append_startup_cost_compare
* list_qsort comparator for sorting append child paths by startup_cost
*/
static int
append_startup_cost_compare(const void *a, const void *b)
{
Path *path1 = (Path *) lfirst(*(ListCell **) a);
Path *path2 = (Path *) lfirst(*(ListCell **) b);
if (path1->startup_cost > path2->startup_cost)
return -1;
if (path1->startup_cost < path2->startup_cost)
return 1;
return 0;
}
/* /*
* create_merge_append_path * create_merge_append_path
* Creates a path corresponding to a MergeAppend plan, returning the * Creates a path corresponding to a MergeAppend plan, returning the
......
...@@ -517,6 +517,7 @@ RegisterLWLockTranches(void) ...@@ -517,6 +517,7 @@ RegisterLWLockTranches(void)
LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE, LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
"session_typmod_table"); "session_typmod_table");
LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
/* Register named tranches. */ /* Register named tranches. */
for (i = 0; i < NamedLWLockTrancheRequests; i++) for (i = 0; i < NamedLWLockTrancheRequests; i++)
......
...@@ -920,6 +920,15 @@ static struct config_bool ConfigureNamesBool[] = ...@@ -920,6 +920,15 @@ static struct config_bool ConfigureNamesBool[] =
false, false,
NULL, NULL, NULL NULL, NULL, NULL
}, },
{
{"enable_parallel_append", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of parallel append plans."),
NULL
},
&enable_parallel_append,
true,
NULL, NULL, NULL
},
{ {
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO, {"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
......
...@@ -296,6 +296,7 @@ ...@@ -296,6 +296,7 @@
#enable_material = on #enable_material = on
#enable_mergejoin = on #enable_mergejoin = on
#enable_nestloop = on #enable_nestloop = on
#enable_parallel_append = on
#enable_seqscan = on #enable_seqscan = on
#enable_sort = on #enable_sort = on
#enable_tidscan = on #enable_tidscan = on
......
...@@ -14,10 +14,15 @@ ...@@ -14,10 +14,15 @@
#ifndef NODEAPPEND_H #ifndef NODEAPPEND_H
#define NODEAPPEND_H #define NODEAPPEND_H
#include "access/parallel.h"
#include "nodes/execnodes.h" #include "nodes/execnodes.h"
extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags); extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags);
extern void ExecEndAppend(AppendState *node); extern void ExecEndAppend(AppendState *node);
extern void ExecReScanAppend(AppendState *node); extern void ExecReScanAppend(AppendState *node);
extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
#endif /* NODEAPPEND_H */ #endif /* NODEAPPEND_H */
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "lib/pairingheap.h" #include "lib/pairingheap.h"
#include "nodes/params.h" #include "nodes/params.h"
#include "nodes/plannodes.h" #include "nodes/plannodes.h"
#include "storage/spin.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/queryenvironment.h" #include "utils/queryenvironment.h"
#include "utils/reltrigger.h" #include "utils/reltrigger.h"
...@@ -1000,13 +1001,22 @@ typedef struct ModifyTableState ...@@ -1000,13 +1001,22 @@ typedef struct ModifyTableState
* whichplan which plan is being executed (0 .. n-1) * whichplan which plan is being executed (0 .. n-1)
* ---------------- * ----------------
*/ */
typedef struct AppendState
struct AppendState;
typedef struct AppendState AppendState;
struct ParallelAppendState;
typedef struct ParallelAppendState ParallelAppendState;
struct AppendState
{ {
PlanState ps; /* its first field is NodeTag */ PlanState ps; /* its first field is NodeTag */
PlanState **appendplans; /* array of PlanStates for my inputs */ PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans; int as_nplans;
int as_whichplan; int as_whichplan;
} AppendState; ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
bool (*choose_next_subplan) (AppendState *);
};
/* ---------------- /* ----------------
* MergeAppendState information * MergeAppendState information
......
...@@ -269,6 +269,9 @@ extern void list_free_deep(List *list); ...@@ -269,6 +269,9 @@ extern void list_free_deep(List *list);
extern List *list_copy(const List *list); extern List *list_copy(const List *list);
extern List *list_copy_tail(const List *list, int nskip); extern List *list_copy_tail(const List *list, int nskip);
typedef int (*list_qsort_comparator) (const void *a, const void *b);
extern List *list_qsort(const List *list, list_qsort_comparator cmp);
/* /*
* To ease migration to the new list API, a set of compatibility * To ease migration to the new list API, a set of compatibility
* macros are provided that reduce the impact of the list API changes * macros are provided that reduce the impact of the list API changes
......
...@@ -248,6 +248,7 @@ typedef struct Append ...@@ -248,6 +248,7 @@ typedef struct Append
/* RT indexes of non-leaf tables in a partition tree */ /* RT indexes of non-leaf tables in a partition tree */
List *partitioned_rels; List *partitioned_rels;
List *appendplans; List *appendplans;
int first_partial_plan;
} Append; } Append;
/* ---------------- /* ----------------
......
...@@ -1255,6 +1255,9 @@ typedef struct CustomPath ...@@ -1255,6 +1255,9 @@ typedef struct CustomPath
* AppendPath represents an Append plan, ie, successive execution of * AppendPath represents an Append plan, ie, successive execution of
* several member plans. * several member plans.
* *
* For partial Append, 'subpaths' contains non-partial subpaths followed by
* partial subpaths.
*
* Note: it is possible for "subpaths" to contain only one, or even no, * Note: it is possible for "subpaths" to contain only one, or even no,
* elements. These cases are optimized during create_append_plan. * elements. These cases are optimized during create_append_plan.
* In particular, an AppendPath with no subpaths is a "dummy" path that * In particular, an AppendPath with no subpaths is a "dummy" path that
...@@ -1266,6 +1269,9 @@ typedef struct AppendPath ...@@ -1266,6 +1269,9 @@ typedef struct AppendPath
/* RT indexes of non-leaf tables in a partition tree */ /* RT indexes of non-leaf tables in a partition tree */
List *partitioned_rels; List *partitioned_rels;
List *subpaths; /* list of component Paths */ List *subpaths; /* list of component Paths */
/* Index of first partial path in subpaths */
int first_partial_path;
} AppendPath; } AppendPath;
#define IS_DUMMY_PATH(p) \ #define IS_DUMMY_PATH(p) \
......
...@@ -68,6 +68,7 @@ extern bool enable_mergejoin; ...@@ -68,6 +68,7 @@ extern bool enable_mergejoin;
extern bool enable_hashjoin; extern bool enable_hashjoin;
extern bool enable_gathermerge; extern bool enable_gathermerge;
extern bool enable_partition_wise_join; extern bool enable_partition_wise_join;
extern bool enable_parallel_append;
extern int constraint_exclusion; extern int constraint_exclusion;
extern double clamp_row_est(double nrows); extern double clamp_row_est(double nrows);
...@@ -106,6 +107,7 @@ extern void cost_sort(Path *path, PlannerInfo *root, ...@@ -106,6 +107,7 @@ extern void cost_sort(Path *path, PlannerInfo *root,
List *pathkeys, Cost input_cost, double tuples, int width, List *pathkeys, Cost input_cost, double tuples, int width,
Cost comparison_cost, int sort_mem, Cost comparison_cost, int sort_mem,
double limit_tuples); double limit_tuples);
extern void cost_append(AppendPath *path);
extern void cost_merge_append(Path *path, PlannerInfo *root, extern void cost_merge_append(Path *path, PlannerInfo *root,
List *pathkeys, int n_streams, List *pathkeys, int n_streams,
Cost input_startup_cost, Cost input_total_cost, Cost input_startup_cost, Cost input_total_cost,
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
#ifndef PATHNODE_H #ifndef PATHNODE_H
#define PATHNODE_H #define PATHNODE_H
#include "nodes/bitmapset.h"
#include "nodes/relation.h" #include "nodes/relation.h"
...@@ -63,9 +64,11 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root, ...@@ -63,9 +64,11 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root,
List *bitmapquals); List *bitmapquals);
extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
List *tidquals, Relids required_outer); List *tidquals, Relids required_outer);
extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths, extern AppendPath *create_append_path(RelOptInfo *rel,
Relids required_outer, int parallel_workers, List *subpaths, List *partial_subpaths,
List *partitioned_rels); Relids required_outer,
int parallel_workers, bool parallel_aware,
List *partitioned_rels, double rows);
extern MergeAppendPath *create_merge_append_path(PlannerInfo *root, extern MergeAppendPath *create_merge_append_path(PlannerInfo *root,
RelOptInfo *rel, RelOptInfo *rel,
List *subpaths, List *subpaths,
......
...@@ -216,6 +216,7 @@ typedef enum BuiltinTrancheIds ...@@ -216,6 +216,7 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_SESSION_RECORD_TABLE, LWTRANCHE_SESSION_RECORD_TABLE,
LWTRANCHE_SESSION_TYPMOD_TABLE, LWTRANCHE_SESSION_TYPMOD_TABLE,
LWTRANCHE_TBM, LWTRANCHE_TBM,
LWTRANCHE_PARALLEL_APPEND,
LWTRANCHE_FIRST_USER_DEFINED LWTRANCHE_FIRST_USER_DEFINED
} BuiltinTrancheIds; } BuiltinTrancheIds;
......
...@@ -1404,6 +1404,7 @@ select min(1-id) from matest0; ...@@ -1404,6 +1404,7 @@ select min(1-id) from matest0;
reset enable_indexscan; reset enable_indexscan;
set enable_seqscan = off; -- plan with fewest seqscans should be merge set enable_seqscan = off; -- plan with fewest seqscans should be merge
set enable_parallel_append = off; -- Don't let parallel-append interfere
explain (verbose, costs off) select * from matest0 order by 1-id; explain (verbose, costs off) select * from matest0 order by 1-id;
QUERY PLAN QUERY PLAN
------------------------------------------------------------------ ------------------------------------------------------------------
...@@ -1470,6 +1471,7 @@ select min(1-id) from matest0; ...@@ -1470,6 +1471,7 @@ select min(1-id) from matest0;
(1 row) (1 row)
reset enable_seqscan; reset enable_seqscan;
reset enable_parallel_append;
drop table matest0 cascade; drop table matest0 cascade;
NOTICE: drop cascades to 3 other objects NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table matest1 DETAIL: drop cascades to table matest1
......
...@@ -11,8 +11,88 @@ set parallel_setup_cost=0; ...@@ -11,8 +11,88 @@ set parallel_setup_cost=0;
set parallel_tuple_cost=0; set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0; set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4; set max_parallel_workers_per_gather=4;
-- Parallel Append with partial-subplans
explain (costs off) explain (costs off)
select count(*) from a_star; select round(avg(aa)), sum(aa) from a_star;
QUERY PLAN
-----------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 3
-> Partial Aggregate
-> Parallel Append
-> Parallel Seq Scan on a_star
-> Parallel Seq Scan on b_star
-> Parallel Seq Scan on c_star
-> Parallel Seq Scan on d_star
-> Parallel Seq Scan on e_star
-> Parallel Seq Scan on f_star
(11 rows)
select round(avg(aa)), sum(aa) from a_star;
round | sum
-------+-----
14 | 355
(1 row)
-- Parallel Append with both partial and non-partial subplans
alter table c_star set (parallel_workers = 0);
alter table d_star set (parallel_workers = 0);
explain (costs off)
select round(avg(aa)), sum(aa) from a_star;
QUERY PLAN
-----------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 3
-> Partial Aggregate
-> Parallel Append
-> Seq Scan on d_star
-> Seq Scan on c_star
-> Parallel Seq Scan on a_star
-> Parallel Seq Scan on b_star
-> Parallel Seq Scan on e_star
-> Parallel Seq Scan on f_star
(11 rows)
-- Parallel Append with only non-partial subplans
alter table a_star set (parallel_workers = 0);
alter table b_star set (parallel_workers = 0);
alter table e_star set (parallel_workers = 0);
alter table f_star set (parallel_workers = 0);
explain (costs off)
select round(avg(aa)), sum(aa) from a_star;
QUERY PLAN
--------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 3
-> Partial Aggregate
-> Parallel Append
-> Seq Scan on d_star
-> Seq Scan on f_star
-> Seq Scan on e_star
-> Seq Scan on b_star
-> Seq Scan on c_star
-> Seq Scan on a_star
(11 rows)
select round(avg(aa)), sum(aa) from a_star;
round | sum
-------+-----
14 | 355
(1 row)
-- Disable Parallel Append
alter table a_star reset (parallel_workers);
alter table b_star reset (parallel_workers);
alter table c_star reset (parallel_workers);
alter table d_star reset (parallel_workers);
alter table e_star reset (parallel_workers);
alter table f_star reset (parallel_workers);
set enable_parallel_append to off;
explain (costs off)
select round(avg(aa)), sum(aa) from a_star;
QUERY PLAN QUERY PLAN
----------------------------------------------------- -----------------------------------------------------
Finalize Aggregate Finalize Aggregate
...@@ -28,12 +108,13 @@ explain (costs off) ...@@ -28,12 +108,13 @@ explain (costs off)
-> Parallel Seq Scan on f_star -> Parallel Seq Scan on f_star
(11 rows) (11 rows)
select count(*) from a_star; select round(avg(aa)), sum(aa) from a_star;
count round | sum
------- -------+-----
50 14 | 355
(1 row) (1 row)
reset enable_parallel_append;
-- test with leader participation disabled -- test with leader participation disabled
set parallel_leader_participation = off; set parallel_leader_participation = off;
explain (costs off) explain (costs off)
......
...@@ -81,11 +81,12 @@ select name, setting from pg_settings where name like 'enable%'; ...@@ -81,11 +81,12 @@ select name, setting from pg_settings where name like 'enable%';
enable_material | on enable_material | on
enable_mergejoin | on enable_mergejoin | on
enable_nestloop | on enable_nestloop | on
enable_parallel_append | on
enable_partition_wise_join | off enable_partition_wise_join | off
enable_seqscan | on enable_seqscan | on
enable_sort | on enable_sort | on
enable_tidscan | on enable_tidscan | on
(13 rows) (14 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail -- more-or-less working. We can't test their contents in any great detail
......
...@@ -508,11 +508,13 @@ select min(1-id) from matest0; ...@@ -508,11 +508,13 @@ select min(1-id) from matest0;
reset enable_indexscan; reset enable_indexscan;
set enable_seqscan = off; -- plan with fewest seqscans should be merge set enable_seqscan = off; -- plan with fewest seqscans should be merge
set enable_parallel_append = off; -- Don't let parallel-append interfere
explain (verbose, costs off) select * from matest0 order by 1-id; explain (verbose, costs off) select * from matest0 order by 1-id;
select * from matest0 order by 1-id; select * from matest0 order by 1-id;
explain (verbose, costs off) select min(1-id) from matest0; explain (verbose, costs off) select min(1-id) from matest0;
select min(1-id) from matest0; select min(1-id) from matest0;
reset enable_seqscan; reset enable_seqscan;
reset enable_parallel_append;
drop table matest0 cascade; drop table matest0 cascade;
......
...@@ -15,9 +15,38 @@ set parallel_tuple_cost=0; ...@@ -15,9 +15,38 @@ set parallel_tuple_cost=0;
set min_parallel_table_scan_size=0; set min_parallel_table_scan_size=0;
set max_parallel_workers_per_gather=4; set max_parallel_workers_per_gather=4;
-- Parallel Append with partial-subplans
explain (costs off) explain (costs off)
select count(*) from a_star; select round(avg(aa)), sum(aa) from a_star;
select count(*) from a_star; select round(avg(aa)), sum(aa) from a_star;
-- Parallel Append with both partial and non-partial subplans
alter table c_star set (parallel_workers = 0);
alter table d_star set (parallel_workers = 0);
explain (costs off)
select round(avg(aa)), sum(aa) from a_star;
-- Parallel Append with only non-partial subplans
alter table a_star set (parallel_workers = 0);
alter table b_star set (parallel_workers = 0);
alter table e_star set (parallel_workers = 0);
alter table f_star set (parallel_workers = 0);
explain (costs off)
select round(avg(aa)), sum(aa) from a_star;
select round(avg(aa)), sum(aa) from a_star;
-- Disable Parallel Append
alter table a_star reset (parallel_workers);
alter table b_star reset (parallel_workers);
alter table c_star reset (parallel_workers);
alter table d_star reset (parallel_workers);
alter table e_star reset (parallel_workers);
alter table f_star reset (parallel_workers);
set enable_parallel_append to off;
explain (costs off)
select round(avg(aa)), sum(aa) from a_star;
select round(avg(aa)), sum(aa) from a_star;
reset enable_parallel_append;
-- test with leader participation disabled -- test with leader participation disabled
set parallel_leader_participation = off; set parallel_leader_participation = off;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment