Commit 355d3993 authored by Robert Haas's avatar Robert Haas

Add a Gather Merge executor node.

Like Gather, we spawn multiple workers and run the same plan in each
one; however, Gather Merge is used when each worker produces the same
output ordering and we want to preserve that output ordering while
merging together the streams of tuples from various workers.  (In a
way, Gather Merge is like a hybrid of Gather and MergeAppend.)

This works out to a win if it saves us from having to perform an
expensive Sort.  In cases where only a small amount of data would need
to be sorted, it may actually be faster to use a regular Gather node
and then sort the results afterward, because Gather Merge sometimes
needs to wait synchronously for tuples whereas a pure Gather generally
doesn't.  But if this avoids an expensive sort then it's a win.

Rushabh Lathia, reviewed and tested by Amit Kapila, Thomas Munro,
and Neha Sharma, and reviewed and revised by me.

Discussion: http://postgr.es/m/CAGPqQf09oPX-cQRpBKS0Gq49Z+m6KBxgxd_p9gX8CKk_d75HoQ@mail.gmail.com
parent a72f0365
......@@ -3497,6 +3497,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</listitem>
</varlistentry>
<varlistentry id="guc-enable-gathermerge" xreflabel="enable_gathermerge">
<term><varname>enable_gathermerge</varname> (<type>boolean</type>)
<indexterm>
<primary><varname>enable_gathermerge</> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Enables or disables the query planner's use of gather
merge plan types. The default is <literal>on</>.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-enable-hashagg" xreflabel="enable_hashagg">
<term><varname>enable_hashagg</varname> (<type>boolean</type>)
<indexterm>
......
......@@ -918,6 +918,9 @@ ExplainNode(PlanState *planstate, List *ancestors,
case T_Gather:
pname = sname = "Gather";
break;
case T_GatherMerge:
pname = sname = "Gather Merge";
break;
case T_IndexScan:
pname = sname = "Index Scan";
break;
......@@ -1411,6 +1414,26 @@ ExplainNode(PlanState *planstate, List *ancestors,
ExplainPropertyBool("Single Copy", gather->single_copy, es);
}
break;
case T_GatherMerge:
{
GatherMerge *gm = (GatherMerge *) plan;
show_scan_qual(plan->qual, "Filter", planstate, ancestors, es);
if (plan->qual)
show_instrumentation_count("Rows Removed by Filter", 1,
planstate, es);
ExplainPropertyInteger("Workers Planned",
gm->num_workers, es);
if (es->analyze)
{
int nworkers;
nworkers = ((GatherMergeState *) planstate)->nworkers_launched;
ExplainPropertyInteger("Workers Launched",
nworkers, es);
}
}
break;
case T_FunctionScan:
if (es->verbose)
{
......
......@@ -20,7 +20,7 @@ OBJS = execAmi.o execCurrent.o execGrouping.o execIndexing.o execJunk.o \
nodeBitmapHeapscan.o nodeBitmapIndexscan.o \
nodeCustom.o nodeFunctionscan.o nodeGather.o \
nodeHash.o nodeHashjoin.o nodeIndexscan.o nodeIndexonlyscan.o \
nodeLimit.o nodeLockRows.o \
nodeLimit.o nodeLockRows.o nodeGatherMerge.o \
nodeMaterial.o nodeMergeAppend.o nodeMergejoin.o nodeModifyTable.o \
nodeNestloop.o nodeProjectSet.o nodeRecursiveunion.o nodeResult.o \
nodeSamplescan.o nodeSeqscan.o nodeSetOp.o nodeSort.o nodeUnique.o \
......
......@@ -89,6 +89,7 @@
#include "executor/nodeForeignscan.h"
#include "executor/nodeFunctionscan.h"
#include "executor/nodeGather.h"
#include "executor/nodeGatherMerge.h"
#include "executor/nodeGroup.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
......@@ -326,6 +327,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
estate, eflags);
break;
case T_GatherMerge:
result = (PlanState *) ExecInitGatherMerge((GatherMerge *) node,
estate, eflags);
break;
case T_Hash:
result = (PlanState *) ExecInitHash((Hash *) node,
estate, eflags);
......@@ -535,6 +541,10 @@ ExecProcNode(PlanState *node)
result = ExecGather((GatherState *) node);
break;
case T_GatherMergeState:
result = ExecGatherMerge((GatherMergeState *) node);
break;
case T_HashState:
result = ExecHash((HashState *) node);
break;
......@@ -697,6 +707,10 @@ ExecEndNode(PlanState *node)
ExecEndGather((GatherState *) node);
break;
case T_GatherMergeState:
ExecEndGatherMerge((GatherMergeState *) node);
break;
case T_IndexScanState:
ExecEndIndexScan((IndexScanState *) node);
break;
......@@ -842,6 +856,9 @@ ExecShutdownNode(PlanState *node)
case T_CustomScanState:
ExecShutdownCustomScan((CustomScanState *) node);
break;
case T_GatherMergeState:
ExecShutdownGatherMerge((GatherMergeState *) node);
break;
default:
break;
}
......
This diff is collapsed.
......@@ -360,6 +360,31 @@ _copyGather(const Gather *from)
return newnode;
}
/*
 * _copyGatherMerge
 *
 * Produce a deep copy of a GatherMerge plan node, including its per-column
 * sort-key arrays.
 */
static GatherMerge *
_copyGatherMerge(const GatherMerge *from)
{
	GatherMerge *newnode = makeNode(GatherMerge);

	/*
	 * copy node superclass fields
	 */
	CopyPlanFields((const Plan *) from, (Plan *) newnode);

	/*
	 * copy remainder of node
	 */
	COPY_SCALAR_FIELD(num_workers);
	COPY_SCALAR_FIELD(numCols);
	/* the four sort-key arrays each have numCols entries */
	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
	COPY_POINTER_FIELD(collations, from->numCols * sizeof(Oid));
	COPY_POINTER_FIELD(nullsFirst, from->numCols * sizeof(bool));

	return newnode;
}
/*
* CopyScanFields
......@@ -4594,6 +4619,9 @@ copyObject(const void *from)
case T_Gather:
retval = _copyGather(from);
break;
case T_GatherMerge:
retval = _copyGatherMerge(from);
break;
case T_SeqScan:
retval = _copySeqScan(from);
break;
......
......@@ -457,6 +457,35 @@ _outGather(StringInfo str, const Gather *node)
WRITE_BOOL_FIELD(invisible);
}
/*
 * _outGatherMerge
 *
 * Emit the text representation of a GatherMerge plan node.  The sort-key
 * arrays (all of length numCols) are written out element by element.
 */
static void
_outGatherMerge(StringInfo str, const GatherMerge *node)
{
	int			i;

	WRITE_NODE_TYPE("GATHERMERGE");

	/* fields shared with all Plan nodes go first */
	_outPlanInfo(str, (const Plan *) node);

	WRITE_INT_FIELD(num_workers);
	WRITE_INT_FIELD(numCols);

	appendStringInfoString(str, " :sortColIdx");
	for (i = 0; i < node->numCols; i++)
		appendStringInfo(str, " %d", node->sortColIdx[i]);

	appendStringInfoString(str, " :sortOperators");
	for (i = 0; i < node->numCols; i++)
		appendStringInfo(str, " %u", node->sortOperators[i]);

	appendStringInfoString(str, " :collations");
	for (i = 0; i < node->numCols; i++)
		appendStringInfo(str, " %u", node->collations[i]);

	appendStringInfoString(str, " :nullsFirst");
	for (i = 0; i < node->numCols; i++)
		appendStringInfo(str, " %s", booltostr(node->nullsFirst[i]));
}
static void
_outScan(StringInfo str, const Scan *node)
{
......@@ -2016,6 +2045,17 @@ _outLimitPath(StringInfo str, const LimitPath *node)
WRITE_NODE_FIELD(limitCount);
}
/*
 * _outGatherMergePath
 *
 * Emit the text representation of a GatherMergePath.
 */
static void
_outGatherMergePath(StringInfo str, const GatherMergePath *node)
{
	WRITE_NODE_TYPE("GATHERMERGEPATH");

	/* common Path fields first */
	_outPathInfo(str, (const Path *) node);

	WRITE_NODE_FIELD(subpath);
	WRITE_INT_FIELD(num_workers);
}
static void
_outNestPath(StringInfo str, const NestPath *node)
{
......@@ -3473,6 +3513,9 @@ outNode(StringInfo str, const void *obj)
case T_Gather:
_outGather(str, obj);
break;
case T_GatherMerge:
_outGatherMerge(str, obj);
break;
case T_Scan:
_outScan(str, obj);
break;
......@@ -3809,6 +3852,9 @@ outNode(StringInfo str, const void *obj)
case T_LimitPath:
_outLimitPath(str, obj);
break;
case T_GatherMergePath:
_outGatherMergePath(str, obj);
break;
case T_NestPath:
_outNestPath(str, obj);
break;
......
......@@ -2137,6 +2137,26 @@ _readGather(void)
READ_DONE();
}
/*
 * _readGatherMerge
 *
 * Reconstruct a GatherMerge plan node from its text representation;
 * the inverse of _outGatherMerge.
 */
static GatherMerge *
_readGatherMerge(void)
{
	READ_LOCALS(GatherMerge);

	/* fields shared with all Plan nodes */
	ReadCommonPlan(&local_node->plan);

	READ_INT_FIELD(num_workers);
	/* numCols must be read before the arrays whose lengths depend on it */
	READ_INT_FIELD(numCols);
	READ_ATTRNUMBER_ARRAY(sortColIdx, local_node->numCols);
	READ_OID_ARRAY(sortOperators, local_node->numCols);
	READ_OID_ARRAY(collations, local_node->numCols);
	READ_BOOL_ARRAY(nullsFirst, local_node->numCols);

	READ_DONE();
}
/*
* _readHash
*/
......@@ -2577,6 +2597,8 @@ parseNodeString(void)
return_value = _readUnique();
else if (MATCH("GATHER", 6))
return_value = _readGather();
else if (MATCH("GATHERMERGE", 11))
return_value = _readGatherMerge();
else if (MATCH("HASH", 4))
return_value = _readHash();
else if (MATCH("SETOP", 5))
......
......@@ -2084,39 +2084,51 @@ set_worktable_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
/*
 * generate_gather_paths
 *		Generate parallel access paths for a relation by pushing a Gather or
 *		Gather Merge on top of a partial path.
 *
 * This must not be called until after we're done creating all partial paths
 * for the specified relation.  (Otherwise, add_partial_path might delete a
 * path that some GatherPath or GatherMergePath has a reference to.)
 */
void
generate_gather_paths(PlannerInfo *root, RelOptInfo *rel)
{
	Path	   *cheapest_partial_path;
	Path	   *simple_gather_path;
	ListCell   *lc;

	/* If there are no partial paths, there's nothing to do here. */
	if (rel->partial_pathlist == NIL)
		return;

	/*
	 * The output of Gather is always unsorted, so there's only one partial
	 * path of interest: the cheapest one.  That will be the one at the front
	 * of partial_pathlist because of the way add_partial_path works.
	 */
	cheapest_partial_path = linitial(rel->partial_pathlist);
	simple_gather_path = (Path *)
		create_gather_path(root, rel, cheapest_partial_path, rel->reltarget,
						   NULL, NULL);
	add_path(rel, simple_gather_path);

	/*
	 * For each useful ordering, we can consider an order-preserving Gather
	 * Merge.
	 */
	foreach (lc, rel->partial_pathlist)
	{
		Path	   *subpath = (Path *) lfirst(lc);
		GatherMergePath *path;

		/* An unsorted partial path gains nothing from Gather Merge. */
		if (subpath->pathkeys == NIL)
			continue;

		path = create_gather_merge_path(root, rel, subpath, rel->reltarget,
										subpath->pathkeys, NULL, NULL);
		add_path(rel, &path->path);
	}
}
/*
......
......@@ -126,6 +126,7 @@ bool enable_nestloop = true;
bool enable_material = true;
bool enable_mergejoin = true;
bool enable_hashjoin = true;
bool enable_gathermerge = true;
typedef struct
{
......@@ -372,6 +373,73 @@ cost_gather(GatherPath *path, PlannerInfo *root,
path->path.total_cost = (startup_cost + run_cost);
}
/*
 * cost_gather_merge
 *	  Determines and returns the cost of gather merge path.
 *
 * GatherMerge merges several pre-sorted input streams, using a heap that at
 * any given instant holds the next tuple from each stream.  If there are N
 * streams, we need about N*log2(N) tuple comparisons to construct the heap at
 * startup, and then for each output tuple, about log2(N) comparisons to
 * replace the top heap entry with the next tuple from the same stream.
 *
 * The caller supplies the subpath's costs (including any needed Sort) as
 * input_startup_cost/input_total_cost; 'rows' may override the row estimate.
 */
void
cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
				  RelOptInfo *rel, ParamPathInfo *param_info,
				  Cost input_startup_cost, Cost input_total_cost,
				  double *rows)
{
	Cost		startup_cost = 0;
	Cost		run_cost = 0;
	Cost		comparison_cost;
	double		N;
	double		logN;

	/* Mark the path with the correct row estimate */
	if (rows)
		path->path.rows = *rows;
	else if (param_info)
		path->path.rows = param_info->ppi_rows;
	else
		path->path.rows = rel->rows;

	/* Penalize (but don't forbid) the path if the GUC disables it. */
	if (!enable_gathermerge)
		startup_cost += disable_cost;

	/*
	 * Add one to the number of workers to account for the leader.  This might
	 * be overgenerous since the leader will do less work than other workers
	 * in typical cases, but we'll go with it for now.
	 */
	Assert(path->num_workers > 0);
	N = (double) path->num_workers + 1;
	logN = LOG2(N);

	/* Assumed cost per tuple comparison */
	comparison_cost = 2.0 * cpu_operator_cost;

	/* Heap creation cost */
	startup_cost += comparison_cost * N * logN;

	/* Per-tuple heap maintenance cost */
	run_cost += path->path.rows * comparison_cost * logN;

	/* small cost for heap management, like cost_merge_append */
	run_cost += cpu_operator_cost * path->path.rows;

	/*
	 * Parallel setup and communication cost.  Since Gather Merge, unlike
	 * Gather, requires us to block until a tuple is available from every
	 * worker, we bump the IPC cost up a little bit as compared with Gather.
	 * For lack of a better idea, charge an extra 5%.
	 */
	startup_cost += parallel_setup_cost;
	run_cost += parallel_tuple_cost * path->path.rows * 1.05;

	path->path.startup_cost = startup_cost + input_startup_cost;
	path->path.total_cost = (startup_cost + run_cost + input_total_cost);
}
/*
* cost_index
* Determines and returns the cost of scanning a relation using an index.
......
......@@ -277,6 +277,8 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
List *resultRelations, List *subplans,
List *withCheckOptionLists, List *returningLists,
List *rowMarks, OnConflictExpr *onconflict, int epqParam);
static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
GatherMergePath *best_path);
/*
......@@ -475,6 +477,10 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags)
(LimitPath *) best_path,
flags);
break;
case T_GatherMerge:
plan = (Plan *) create_gather_merge_plan(root,
(GatherMergePath *) best_path);
break;
default:
elog(ERROR, "unrecognized node type: %d",
(int) best_path->pathtype);
......@@ -1451,6 +1457,86 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
return gather_plan;
}
/*
 * create_gather_merge_plan
 *
 *	  Create a Gather Merge plan for 'best_path' and (recursively)
 *	  plans for its subpaths.
 *
 * Returns a GatherMerge plan node whose sort-key columns describe the
 * ordering the workers' streams share; a Sort is inserted below it if the
 * subplan isn't already suitably ordered.
 */
static GatherMerge *
create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
{
	GatherMerge *gm_plan;
	Plan	   *subplan;
	List	   *pathkeys = best_path->path.pathkeys;
	int			numsortkeys;
	AttrNumber *sortColIdx;
	Oid		   *sortOperators;
	Oid		   *collations;
	bool	   *nullsFirst;

	/* As with Gather, it's best to project away columns in the workers. */
	subplan = create_plan_recurse(root, best_path->subpath, CP_EXACT_TLIST);

	/* See create_merge_append_plan for why there's no make_xxx function */
	gm_plan = makeNode(GatherMerge);
	gm_plan->plan.targetlist = subplan->targetlist;
	gm_plan->num_workers = best_path->num_workers;
	copy_generic_path_info(&gm_plan->plan, &best_path->path);

	/* Gather Merge is pointless with no pathkeys; use Gather instead. */
	Assert(pathkeys != NIL);

	/* Compute sort column info, and adjust GatherMerge tlist as needed */
	(void) prepare_sort_from_pathkeys(&gm_plan->plan, pathkeys,
									  best_path->path.parent->relids,
									  NULL,
									  true,
									  &gm_plan->numCols,
									  &gm_plan->sortColIdx,
									  &gm_plan->sortOperators,
									  &gm_plan->collations,
									  &gm_plan->nullsFirst);

	/* Compute sort column info, and adjust subplan's tlist as needed */
	subplan = prepare_sort_from_pathkeys(subplan, pathkeys,
										 best_path->subpath->parent->relids,
										 gm_plan->sortColIdx,
										 false,
										 &numsortkeys,
										 &sortColIdx,
										 &sortOperators,
										 &collations,
										 &nullsFirst);

	/* As for MergeAppend, check that we got the same sort key information. */
	Assert(numsortkeys == gm_plan->numCols);
	if (memcmp(sortColIdx, gm_plan->sortColIdx,
			   numsortkeys * sizeof(AttrNumber)) != 0)
		elog(ERROR, "GatherMerge child's targetlist doesn't match GatherMerge");
	Assert(memcmp(sortOperators, gm_plan->sortOperators,
				  numsortkeys * sizeof(Oid)) == 0);
	Assert(memcmp(collations, gm_plan->collations,
				  numsortkeys * sizeof(Oid)) == 0);
	Assert(memcmp(nullsFirst, gm_plan->nullsFirst,
				  numsortkeys * sizeof(bool)) == 0);

	/* Now, insert a Sort node if subplan isn't sufficiently ordered */
	if (!pathkeys_contained_in(pathkeys, best_path->subpath->pathkeys))
		subplan = (Plan *) make_sort(subplan, numsortkeys,
									 sortColIdx, sortOperators,
									 collations, nullsFirst);

	/* Now insert the subplan under GatherMerge. */
	gm_plan->plan.lefttree = subplan;

	/* use parallel mode for parallel plans. */
	root->glob->parallelModeNeeded = true;

	return gm_plan;
}
/*
* create_projection_plan
*
......
......@@ -3663,8 +3663,7 @@ create_grouping_paths(PlannerInfo *root,
/*
* Now generate a complete GroupAgg Path atop of the cheapest partial
* path. We need only bother with the cheapest path here, as the
* output of Gather is never sorted.
* path. We can do this using either Gather or Gather Merge.
*/
if (grouped_rel->partial_pathlist)
{
......@@ -3711,6 +3710,70 @@ create_grouping_paths(PlannerInfo *root,
parse->groupClause,
(List *) parse->havingQual,
dNumGroups));
/*
* The point of using Gather Merge rather than Gather is that it
* can preserve the ordering of the input path, so there's no
* reason to try it unless (1) it's possible to produce more than
* one output row and (2) we want the output path to be ordered.
*/
if (parse->groupClause != NIL && root->group_pathkeys != NIL)
{
foreach(lc, grouped_rel->partial_pathlist)
{
Path *subpath = (Path *) lfirst(lc);
Path *gmpath;
double total_groups;
/*
* It's useful to consider paths that are already properly
* ordered for Gather Merge, because those don't need a
* sort. It's also useful to consider the cheapest path,
* because sorting it in parallel and then doing Gather
* Merge may be better than doing an unordered Gather
* followed by a sort. But there's no point in
* considering non-cheapest paths that aren't already
* sorted correctly.
*/
if (path != subpath &&
!pathkeys_contained_in(root->group_pathkeys,
subpath->pathkeys))
continue;
total_groups = subpath->rows * subpath->parallel_workers;
gmpath = (Path *)
create_gather_merge_path(root,
grouped_rel,
subpath,
NULL,
root->group_pathkeys,
NULL,
&total_groups);
if (parse->hasAggs)
add_path(grouped_rel, (Path *)
create_agg_path(root,
grouped_rel,
gmpath,
target,
parse->groupClause ? AGG_SORTED : AGG_PLAIN,
AGGSPLIT_FINAL_DESERIAL,
parse->groupClause,
(List *) parse->havingQual,
&agg_final_costs,
dNumGroups));
else
add_path(grouped_rel, (Path *)
create_group_path(root,
grouped_rel,
gmpath,
target,
parse->groupClause,
(List *) parse->havingQual,
dNumGroups));
}
}
}
}
......@@ -3808,6 +3871,16 @@ create_grouping_paths(PlannerInfo *root,
/* Now choose the best path(s) */
set_cheapest(grouped_rel);
/*
* We've been using the partial pathlist for the grouped relation to hold
* partially aggregated paths, but that's actually a little bit bogus
* because it's unsafe for later planning stages -- like ordered_rel ---
* to get the idea that they can use these partial paths as if they didn't
* need a FinalizeAggregate step. Zap the partial pathlist at this stage
* so we don't get confused.
*/
grouped_rel->partial_pathlist = NIL;
return grouped_rel;
}
......@@ -4275,6 +4348,56 @@ create_ordered_paths(PlannerInfo *root,
}
}
/*
* generate_gather_paths() will have already generated a simple Gather
* path for the best parallel path, if any, and the loop above will have
* considered sorting it. Similarly, generate_gather_paths() will also
* have generated order-preserving Gather Merge plans which can be used
* without sorting if they happen to match the sort_pathkeys, and the loop
* above will have handled those as well. However, there's one more
* possibility: it may make sense to sort the cheapest partial path
* according to the required output order and then use Gather Merge.
*/
if (ordered_rel->consider_parallel && root->sort_pathkeys != NIL &&
input_rel->partial_pathlist != NIL)
{
Path *cheapest_partial_path;
cheapest_partial_path = linitial(input_rel->partial_pathlist);
/*
* If cheapest partial path doesn't need a sort, this is redundant
* with what's already been tried.
*/
if (!pathkeys_contained_in(root->sort_pathkeys,
cheapest_partial_path->pathkeys))
{
Path *path;
double total_groups;
path = (Path *) create_sort_path(root,
ordered_rel,
cheapest_partial_path,
root->sort_pathkeys,
limit_tuples);
total_groups = cheapest_partial_path->rows *
cheapest_partial_path->parallel_workers;
path = (Path *)
create_gather_merge_path(root, ordered_rel,
path,
target, root->sort_pathkeys, NULL,
&total_groups);
/* Add projection step if needed */
if (path->pathtarget != target)
path = apply_projection_to_path(root, ordered_rel,
path, target);
add_path(ordered_rel, path);
}
}
/*
* If there is an FDW that's responsible for all baserels of the query,
* let it consider adding ForeignPaths.
......
......@@ -616,6 +616,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset)
break;
case T_Gather:
case T_GatherMerge:
set_upper_references(root, plan, rtoffset);
break;
......
......@@ -2700,6 +2700,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
case T_Sort:
case T_Unique:
case T_Gather:
case T_GatherMerge:
case T_SetOp:
case T_Group:
/* no node-type-specific fields need fixing */
......
......@@ -1627,6 +1627,66 @@ create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
return pathnode;
}
/*
 * create_gather_merge_path
 *
 *	  Creates a path corresponding to a gather merge scan, returning
 *	  the pathnode.
 *
 * 'subpath' is the partial path to be run in each worker; it must be
 * parallel-safe and 'pathkeys' (required, non-NIL) describes the sort order
 * to be preserved.  If subpath isn't already sorted that way, the cost of an
 * explicit Sort below the Gather Merge is included.  'target' may be NULL, in
 * which case the rel's reltarget is used.  'rows' optionally overrides the
 * row estimate (see cost_gather_merge).
 */
GatherMergePath *
create_gather_merge_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,
						 PathTarget *target, List *pathkeys,
						 Relids required_outer, double *rows)
{
	GatherMergePath *pathnode = makeNode(GatherMergePath);
	Cost		input_startup_cost = 0;
	Cost		input_total_cost = 0;

	Assert(subpath->parallel_safe);
	Assert(pathkeys);

	pathnode->path.pathtype = T_GatherMerge;
	pathnode->path.parent = rel;
	pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
														  required_outer);
	pathnode->path.parallel_aware = false;

	pathnode->subpath = subpath;
	pathnode->num_workers = subpath->parallel_workers;
	pathnode->path.pathkeys = pathkeys;
	pathnode->path.pathtarget = target ? target : rel->reltarget;

	/*
	 * Preliminary row estimate; cost_gather_merge will replace it if the
	 * caller supplied 'rows'.  (makeNode zeroed the field, so plain
	 * assignment is the clear way to write this.)
	 */
	pathnode->path.rows = subpath->rows;

	if (pathkeys_contained_in(pathkeys, subpath->pathkeys))
	{
		/* Subpath is adequately ordered, we won't need to sort it */
		input_startup_cost += subpath->startup_cost;
		input_total_cost += subpath->total_cost;
	}
	else
	{
		/* We'll need to insert a Sort node, so include cost for that */
		Path		sort_path;	/* dummy for result of cost_sort */

		cost_sort(&sort_path,
				  root,
				  pathkeys,
				  subpath->total_cost,
				  subpath->rows,
				  subpath->pathtarget->width,
				  0.0,
				  work_mem,
				  -1);
		input_startup_cost += sort_path.startup_cost;
		input_total_cost += sort_path.total_cost;
	}

	cost_gather_merge(pathnode, root, rel, pathnode->path.param_info,
					  input_startup_cost, input_total_cost, rows);

	return pathnode;
}
/*
* translate_sub_tlist - get subquery column numbers represented by tlist
*
......
......@@ -902,6 +902,15 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
{
{"enable_gathermerge", PGC_USERSET, QUERY_TUNING_METHOD,
gettext_noop("Enables the planner's use of gather merge plans."),
NULL
},
&enable_gathermerge,
true,
NULL, NULL, NULL
},
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
......
/*-------------------------------------------------------------------------
 *
 * nodeGatherMerge.h
 *		prototypes for nodeGatherMerge.c
 *
 *
 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/executor/nodeGatherMerge.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef NODEGATHERMERGE_H
#define NODEGATHERMERGE_H

#include "nodes/execnodes.h"

/*
 * Standard executor entry points for the Gather Merge node.  Note: the
 * pointer declarators are written "*node", not "* node"; the original
 * spacing was a pgindent artifact from the typedefs not yet being listed.
 */

/* Build executor state for a GatherMerge plan node */
extern GatherMergeState *ExecInitGatherMerge(GatherMerge *node,
					EState *estate,
					int eflags);
/* Return the next tuple in merged sort order (empty slot at end of scan) */
extern TupleTableSlot *ExecGatherMerge(GatherMergeState *node);
/* Clean up at end of execution */
extern void ExecEndGatherMerge(GatherMergeState *node);
/* Prepare the node to be rescanned from the start */
extern void ExecReScanGatherMerge(GatherMergeState *node);
/* Stop parallel workers early (e.g. once a LIMIT is satisfied) */
extern void ExecShutdownGatherMerge(GatherMergeState *node);

#endif   /* NODEGATHERMERGE_H */
......@@ -2094,6 +2094,35 @@ typedef struct GatherState
bool need_to_scan_locally;
} GatherState;
/* ----------------
 *	 GatherMergeState information
 *
 *		Gather merge nodes launch 1 or more parallel workers, run a
 *		subplan which produces sorted output in each worker, and then
 *		merge the results into a single sorted stream.
 * ----------------
 */
/*
 * Forward-declare the per-reader tuple buffer; the struct itself is private
 * to nodeGatherMerge.c.  (The original declaration said "GMReaderTuple",
 * which did not match the GMReaderTupleBuffer field type below.)
 */
struct GMReaderTupleBuffer;

typedef struct GatherMergeState
{
	PlanState	ps;				/* its first field is NodeTag */
	bool		initialized;
	struct ParallelExecutorInfo *pei;
	int			nreaders;
	int			nworkers_launched;	/* original number of workers */
	struct TupleQueueReader **reader;	/* one tuple queue per worker */
	TupleDesc	tupDesc;
	TupleTableSlot **gm_slots;
	struct binaryheap *gm_heap; /* binary heap of slot indices */
	bool		gm_initialized; /* gather merge initialized? */
	bool		need_to_scan_locally;
	int			gm_nkeys;		/* number of sort-key columns */
	SortSupport gm_sortkeys;	/* array of length gm_nkeys */
	struct GMReaderTupleBuffer *gm_tuple_buffers;	/* tuple buffer per
													 * reader */
} GatherMergeState;
/* ----------------
* HashState information
* ----------------
......
......@@ -77,6 +77,7 @@ typedef enum NodeTag
T_WindowAgg,
T_Unique,
T_Gather,
T_GatherMerge,
T_Hash,
T_SetOp,
T_LockRows,
......@@ -127,6 +128,7 @@ typedef enum NodeTag
T_WindowAggState,
T_UniqueState,
T_GatherState,
T_GatherMergeState,
T_HashState,
T_SetOpState,
T_LockRowsState,
......@@ -249,6 +251,7 @@ typedef enum NodeTag
T_MaterialPath,
T_UniquePath,
T_GatherPath,
T_GatherMergePath,
T_ProjectionPath,
T_ProjectSetPath,
T_SortPath,
......
......@@ -797,6 +797,22 @@ typedef struct Gather
bool invisible; /* suppress EXPLAIN display (for testing)? */
} Gather;
/* ------------
 *		gather merge node
 *
 * Like Gather, this launches parallel workers that each run the subplan,
 * but it merges their (individually sorted) output streams so as to
 * preserve the sort order described by the fields below.
 * ------------
 */
typedef struct GatherMerge
{
	Plan		plan;
	int			num_workers;	/* planned number of parallel workers */
	/* remaining fields are just like the sort-key info in struct Sort */
	int			numCols;		/* number of sort-key columns */
	AttrNumber *sortColIdx;		/* their indexes in the target list */
	Oid		   *sortOperators;	/* OIDs of operators to sort them by */
	Oid		   *collations;		/* OIDs of collations */
	bool	   *nullsFirst;		/* NULLS FIRST/LAST directions */
} GatherMerge;
/* ----------------
* hash build node
*
......
......@@ -1203,6 +1203,19 @@ typedef struct GatherPath
bool single_copy; /* don't execute path more than once */
} GatherPath;
/*
 * GatherMergePath runs several copies of a plan in parallel and collects
 * the results, merging them so as to preserve their common sort order.
 * For Gather Merge, the parallel leader always executes the plan as well
 * (note there is no single_copy field, unlike GatherPath).
 */
typedef struct GatherMergePath
{
	Path		path;
	Path	   *subpath;		/* path for each worker */
	int			num_workers;	/* number of workers sought to help */
} GatherMergePath;
/*
* All join-type paths share these fields.
*/
......
......@@ -66,6 +66,7 @@ extern bool enable_nestloop;
extern bool enable_material;
extern bool enable_mergejoin;
extern bool enable_hashjoin;
extern bool enable_gathermerge;
extern int constraint_exclusion;
extern double clamp_row_est(double nrows);
......@@ -205,5 +206,9 @@ extern Selectivity clause_selectivity(PlannerInfo *root,
int varRelid,
JoinType jointype,
SpecialJoinInfo *sjinfo);
extern void cost_gather_merge(GatherMergePath *path, PlannerInfo *root,
RelOptInfo *rel, ParamPathInfo *param_info,
Cost input_startup_cost, Cost input_total_cost,
double *rows);
#endif /* COST_H */
......@@ -78,6 +78,13 @@ extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel,
extern GatherPath *create_gather_path(PlannerInfo *root,
RelOptInfo *rel, Path *subpath, PathTarget *target,
Relids required_outer, double *rows);
extern GatherMergePath *create_gather_merge_path(PlannerInfo *root,
RelOptInfo *rel,
Path *subpath,
PathTarget *target,
List *pathkeys,
Relids required_outer,
double *rows);
extern SubqueryScanPath *create_subqueryscan_path(PlannerInfo *root,
RelOptInfo *rel, Path *subpath,
List *pathkeys, Relids required_outer);
......
......@@ -213,6 +213,33 @@ select count(*) from tenk1, tenk2 where tenk1.unique1 = tenk2.unique1;
reset enable_hashjoin;
reset enable_nestloop;
--test gather merge
set enable_hashagg to off;
explain (costs off)
select string4, count((unique2)) from tenk1 group by string4 order by string4;
QUERY PLAN
----------------------------------------------------
Finalize GroupAggregate
Group Key: string4
-> Gather Merge
Workers Planned: 4
-> Partial GroupAggregate
Group Key: string4
-> Sort
Sort Key: string4
-> Parallel Seq Scan on tenk1
(9 rows)
select string4, count((unique2)) from tenk1 group by string4 order by string4;
string4 | count
---------+-------
AAAAxx | 2500
HHHHxx | 2500
OOOOxx | 2500
VVVVxx | 2500
(4 rows)
reset enable_hashagg;
set force_parallel_mode=1;
explain (costs off)
select stringu1::int2 from tenk1 where unique1 = 1;
......
......@@ -73,6 +73,7 @@ select name, setting from pg_settings where name like 'enable%';
name | setting
----------------------+---------
enable_bitmapscan | on
enable_gathermerge | on
enable_hashagg | on
enable_hashjoin | on
enable_indexonlyscan | on
......@@ -83,7 +84,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
(11 rows)
(12 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail
......
......@@ -84,6 +84,17 @@ select count(*) from tenk1, tenk2 where tenk1.unique1 = tenk2.unique1;
reset enable_hashjoin;
reset enable_nestloop;
--test gather merge
set enable_hashagg to off;
explain (costs off)
select string4, count((unique2)) from tenk1 group by string4 order by string4;
select string4, count((unique2)) from tenk1 group by string4 order by string4;
reset enable_hashagg;
set force_parallel_mode=1;
explain (costs off)
......
......@@ -779,6 +779,9 @@ GV
Gather
GatherPath
GatherState
GatherMerge
GatherMergePath
GatherMergeState
Gene
GenericCosts
GenericExprState
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment