Commit 7df2c1f8 authored by Tom Lane

Force rescanning of parallel-aware scan nodes below a Gather[Merge].

The ExecReScan machinery contains various optimizations for postponing
or skipping rescans of plan subtrees; for example a HashAgg node may
conclude that it can re-use the table it built before, instead of
re-reading its input subtree.  But that is wrong if the input contains
a parallel-aware table scan node, since the portion of the table scanned
by the leader process is likely to vary from one rescan to the next.
This explains the timing-dependent buildfarm failures we saw after
commit a2b70c89.

The established mechanism for showing that a plan node's output is
potentially variable is to mark it as depending on some runtime Param.
Hence, to fix this, invent a dummy Param (one that has a PARAM_EXEC
parameter number, but carries no actual value) associated with each Gather
or GatherMerge node, mark parallel-aware nodes below that node as dependent
on that Param, and arrange for ExecReScanGather[Merge] to flag that Param
as changed whenever the Gather[Merge] node is rescanned.
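As an illustration of that control flow, here is a minimal, standalone C sketch (not PostgreSQL source; the types and function names are invented, and a plain bitmask stands in for the Bitmapset-based chgParam set). It models how flagging the dummy Param defers the child's rescan to its first execution, while a Gather with no rescan Param still rescans its child synchronously:

```c
#include <assert.h>

#define NO_RESCAN_PARAM (-1)

/* Hypothetical stand-in for a child plan node's rescan-related state. */
typedef struct ChildState
{
	unsigned	chgParam;		/* bitmask standing in for the chgParam set */
	int			rescan_count;	/* how many times the child actually rescanned */
} ChildState;

/* Hypothetical stand-in for a Gather/GatherMerge node. */
typedef struct GatherSketch
{
	int			rescan_param;	/* dummy PARAM_EXEC slot ID, or -1 if none */
	ChildState *child;
} GatherSketch;

/* What the child's ReScan does: reset scan state, consume pending params. */
static void
child_rescan(ChildState *child)
{
	child->rescan_count++;
	child->chgParam = 0;
}

/* What the first execution of the child does: rescan first if any
 * parameter it depends on has changed. */
static void
child_exec(ChildState *child)
{
	if (child->chgParam != 0)
		child_rescan(child);
}

/* Models the patched rescan logic: flag the dummy Param as changed, and
 * rescan the child immediately only when no params are pending (otherwise
 * the child's first execution will do it). */
static void
gather_rescan(GatherSketch *node)
{
	if (node->rescan_param >= 0)
		node->child->chgParam |= 1u << node->rescan_param;

	if (node->child->chgParam == 0)
		child_rescan(node->child);
}
```

With a rescan Param assigned, a rescan of the sketch's Gather leaves the child's rescan pending until it next executes; with `rescan_param = -1`, the child is rescanned immediately, as before.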

This solution breaks an undocumented assumption made by the parallel
executor logic, namely that all rescans of nodes below a Gather[Merge]
will happen synchronously during the ReScan of the top node itself.
But that's fundamentally contrary to the design of the ExecReScan code,
and so was doomed to fail someday anyway (even if you want to argue
that the bug being fixed here wasn't a failure of that assumption).
A follow-on patch will address that issue.  In the meantime, the worst
that's expected to happen is that given very bad timing luck, the leader
might have to do all the work during a rescan, because workers think
they have nothing to do, if they are able to start up before the eventual
ReScan of the leader's parallel-aware table scan node has reset the
shared scan state.

Although this problem exists in 9.6, there does not seem to be any way
for it to manifest there.  Without GatherMerge, it seems that a plan tree
that has a rescan-short-circuiting node below Gather will always also
have one above it that will short-circuit in the same cases, preventing
the Gather from being rescanned.  Hence we won't take the risk of
back-patching this change into 9.6.  But v10 needs it.

Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
parent 00f6d5c2
@@ -432,6 +432,9 @@ ExecShutdownGather(GatherState *node)
 void
 ExecReScanGather(GatherState *node)
 {
+	Gather	   *gather = (Gather *) node->ps.plan;
+	PlanState  *outerPlan = outerPlanState(node);
+
 	/*
 	 * Re-initialize the parallel workers to perform rescan of relation. We
 	 * want to gracefully shutdown all the workers so that they should be able
@@ -445,5 +448,22 @@ ExecReScanGather(GatherState *node)
 	if (node->pei)
 		ExecParallelReinitialize(node->pei);
 
-	ExecReScan(node->ps.lefttree);
+	/*
+	 * Set child node's chgParam to tell it that the next scan might deliver a
+	 * different set of rows within the leader process.  (The overall rowset
+	 * shouldn't change, but the leader process's subset might; hence nodes
+	 * between here and the parallel table scan node mustn't optimize on the
+	 * assumption of an unchanging rowset.)
+	 */
+	if (gather->rescan_param >= 0)
+		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
+											 gather->rescan_param);
+
+	/*
+	 * if chgParam of subnode is not null then plan will be re-scanned by
+	 * first ExecProcNode.
+	 */
+	if (outerPlan->chgParam == NULL)
+		ExecReScan(outerPlan);
 }
@@ -327,6 +327,9 @@ ExecShutdownGatherMergeWorkers(GatherMergeState *node)
 void
 ExecReScanGatherMerge(GatherMergeState *node)
 {
+	GatherMerge *gm = (GatherMerge *) node->ps.plan;
+	PlanState  *outerPlan = outerPlanState(node);
+
 	/*
 	 * Re-initialize the parallel workers to perform rescan of relation. We
 	 * want to gracefully shutdown all the workers so that they should be able
@@ -341,7 +344,24 @@ ExecReScanGatherMerge(GatherMergeState *node)
 	if (node->pei)
 		ExecParallelReinitialize(node->pei);
 
-	ExecReScan(node->ps.lefttree);
+	/*
+	 * Set child node's chgParam to tell it that the next scan might deliver a
+	 * different set of rows within the leader process.  (The overall rowset
+	 * shouldn't change, but the leader process's subset might; hence nodes
+	 * between here and the parallel table scan node mustn't optimize on the
+	 * assumption of an unchanging rowset.)
+	 */
+	if (gm->rescan_param >= 0)
+		outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
+											 gm->rescan_param);
+
+	/*
+	 * if chgParam of subnode is not null then plan will be re-scanned by
+	 * first ExecProcNode.
+	 */
+	if (outerPlan->chgParam == NULL)
+		ExecReScan(outerPlan);
 }
 
 /*
...
@@ -361,6 +361,7 @@ _copyGather(const Gather *from)
 	 * copy remainder of node
 	 */
 	COPY_SCALAR_FIELD(num_workers);
+	COPY_SCALAR_FIELD(rescan_param);
 	COPY_SCALAR_FIELD(single_copy);
 	COPY_SCALAR_FIELD(invisible);
@@ -384,6 +385,7 @@ _copyGatherMerge(const GatherMerge *from)
 	 * copy remainder of node
 	 */
 	COPY_SCALAR_FIELD(num_workers);
+	COPY_SCALAR_FIELD(rescan_param);
 	COPY_SCALAR_FIELD(numCols);
 	COPY_POINTER_FIELD(sortColIdx, from->numCols * sizeof(AttrNumber));
 	COPY_POINTER_FIELD(sortOperators, from->numCols * sizeof(Oid));
...
@@ -479,6 +479,7 @@ _outGather(StringInfo str, const Gather *node)
 	_outPlanInfo(str, (const Plan *) node);
 
 	WRITE_INT_FIELD(num_workers);
+	WRITE_INT_FIELD(rescan_param);
 	WRITE_BOOL_FIELD(single_copy);
 	WRITE_BOOL_FIELD(invisible);
 }
@@ -493,6 +494,7 @@ _outGatherMerge(StringInfo str, const GatherMerge *node)
 	_outPlanInfo(str, (const Plan *) node);
 
 	WRITE_INT_FIELD(num_workers);
+	WRITE_INT_FIELD(rescan_param);
 	WRITE_INT_FIELD(numCols);
 
 	appendStringInfoString(str, " :sortColIdx");
...
@@ -2163,6 +2163,7 @@ _readGather(void)
 	ReadCommonPlan(&local_node->plan);
 
 	READ_INT_FIELD(num_workers);
+	READ_INT_FIELD(rescan_param);
 	READ_BOOL_FIELD(single_copy);
 	READ_BOOL_FIELD(invisible);
@@ -2180,6 +2181,7 @@ _readGatherMerge(void)
 	ReadCommonPlan(&local_node->plan);
 
 	READ_INT_FIELD(num_workers);
+	READ_INT_FIELD(rescan_param);
 	READ_INT_FIELD(numCols);
 	READ_ATTRNUMBER_ARRAY(sortColIdx, local_node->numCols);
 	READ_OID_ARRAY(sortOperators, local_node->numCols);
...
@@ -374,6 +374,7 @@ RelOptInfo - a relation or joined relations
  MaterialPath - a Material plan node
  UniquePath - remove duplicate rows (either by hashing or sorting)
  GatherPath - collect the results of parallel workers
+ GatherMergePath - collect parallel results, preserving their common sort order
  ProjectionPath - a Result plan node with child (used for projection)
  ProjectSetPath - a ProjectSet plan node applied to some sub-path
  SortPath - a Sort plan node applied to some sub-path
@@ -1030,7 +1031,7 @@ either by an entire query or some portion of the query in such a way that
 some of that work can be done by one or more worker processes, which are
 called parallel workers.  Parallel workers are a subtype of dynamic
 background workers; see src/backend/access/transam/README.parallel for a
-fuller description.  Academic literature on parallel query suggests that
-parallel execution strategies can be divided into essentially two
+fuller description.  The academic literature on parallel query suggests
+that parallel execution strategies can be divided into essentially two
 categories: pipelined parallelism, where the execution of the query is
 divided into multiple stages and each stage is handled by a separate
@@ -1046,16 +1047,14 @@ that the underlying table be partitioned.  It only requires that (1)
 there is some method of dividing the data from at least one of the base
 tables involved in the relation across multiple processes, (2) allowing
 each process to handle its own portion of the data, and then (3)
-collecting the results.  Requirements (2) and (3) is satisfied by the
-executor node Gather, which launches any number of worker processes and
-executes its single child plan in all of them (and perhaps in the leader
-also, if the children aren't generating enough data to keep the leader
-busy).  Requirement (1) is handled by the SeqScan node: when invoked
-with parallel_aware = true, this node will, in effect, partition the
-table on a block by block basis, returning a subset of the tuples from
-the relation in each worker where that SeqScan is executed.  A similar
-scheme could be (and probably should be) implemented for bitmap heap
-scans.
+collecting the results.  Requirements (2) and (3) are satisfied by the
+executor node Gather (or GatherMerge), which launches any number of worker
+processes and executes its single child plan in all of them, and perhaps
+in the leader also, if the children aren't generating enough data to keep
+the leader busy.  Requirement (1) is handled by the table scan node: when
+invoked with parallel_aware = true, this node will, in effect, partition
+the table on a block by block basis, returning a subset of the tuples from
+the relation in each worker where that scan node is executed.
 
 Just as we do for non-parallel access methods, we build Paths to
 represent access strategies that can be used in a parallel plan.  These
...
...@@ -267,7 +267,7 @@ static Unique *make_unique_from_sortclauses(Plan *lefttree, List *distinctList); ...@@ -267,7 +267,7 @@ static Unique *make_unique_from_sortclauses(Plan *lefttree, List *distinctList);
static Unique *make_unique_from_pathkeys(Plan *lefttree, static Unique *make_unique_from_pathkeys(Plan *lefttree,
List *pathkeys, int numCols); List *pathkeys, int numCols);
static Gather *make_gather(List *qptlist, List *qpqual, static Gather *make_gather(List *qptlist, List *qpqual,
int nworkers, bool single_copy, Plan *subplan); int nworkers, int rescan_param, bool single_copy, Plan *subplan);
static SetOp *make_setop(SetOpCmd cmd, SetOpStrategy strategy, Plan *lefttree, static SetOp *make_setop(SetOpCmd cmd, SetOpStrategy strategy, Plan *lefttree,
List *distinctList, AttrNumber flagColIdx, int firstFlag, List *distinctList, AttrNumber flagColIdx, int firstFlag,
long numGroups); long numGroups);
...@@ -1471,6 +1471,7 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path) ...@@ -1471,6 +1471,7 @@ create_gather_plan(PlannerInfo *root, GatherPath *best_path)
gather_plan = make_gather(tlist, gather_plan = make_gather(tlist,
NIL, NIL,
best_path->num_workers, best_path->num_workers,
SS_assign_special_param(root),
best_path->single_copy, best_path->single_copy,
subplan); subplan);
...@@ -1505,6 +1506,9 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path) ...@@ -1505,6 +1506,9 @@ create_gather_merge_plan(PlannerInfo *root, GatherMergePath *best_path)
gm_plan->num_workers = best_path->num_workers; gm_plan->num_workers = best_path->num_workers;
copy_generic_path_info(&gm_plan->plan, &best_path->path); copy_generic_path_info(&gm_plan->plan, &best_path->path);
/* Assign the rescan Param. */
gm_plan->rescan_param = SS_assign_special_param(root);
/* Gather Merge is pointless with no pathkeys; use Gather instead. */ /* Gather Merge is pointless with no pathkeys; use Gather instead. */
Assert(pathkeys != NIL); Assert(pathkeys != NIL);
...@@ -6238,6 +6242,7 @@ static Gather * ...@@ -6238,6 +6242,7 @@ static Gather *
make_gather(List *qptlist, make_gather(List *qptlist,
List *qpqual, List *qpqual,
int nworkers, int nworkers,
int rescan_param,
bool single_copy, bool single_copy,
Plan *subplan) Plan *subplan)
{ {
...@@ -6249,6 +6254,7 @@ make_gather(List *qptlist, ...@@ -6249,6 +6254,7 @@ make_gather(List *qptlist,
plan->lefttree = subplan; plan->lefttree = subplan;
plan->righttree = NULL; plan->righttree = NULL;
node->num_workers = nworkers; node->num_workers = nworkers;
node->rescan_param = rescan_param;
node->single_copy = single_copy; node->single_copy = single_copy;
node->invisible = false; node->invisible = false;
......
@@ -374,6 +374,12 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
 		gather->single_copy = true;
 		gather->invisible = (force_parallel_mode == FORCE_PARALLEL_REGRESS);
 
+		/*
+		 * Since this Gather has no parallel-aware descendants to signal to,
+		 * we don't need a rescan Param.
+		 */
+		gather->rescan_param = -1;
+
 		/*
 		 * Ideally we'd use cost_gather here, but setting up dummy path data
 		 * to satisfy it doesn't seem much cleaner than knowing what it does.
...
@@ -79,6 +79,7 @@ static Node *process_sublinks_mutator(Node *node,
 				process_sublinks_context *context);
 static Bitmapset *finalize_plan(PlannerInfo *root,
 			  Plan *plan,
+			  int gather_param,
 			  Bitmapset *valid_params,
 			  Bitmapset *scan_params);
 static bool finalize_primnode(Node *node, finalize_primnode_context *context);
@@ -2217,12 +2218,15 @@ void
 SS_finalize_plan(PlannerInfo *root, Plan *plan)
 {
 	/* No setup needed, just recurse through plan tree. */
-	(void) finalize_plan(root, plan, root->outer_params, NULL);
+	(void) finalize_plan(root, plan, -1, root->outer_params, NULL);
 }
 
 /*
  * Recursive processing of all nodes in the plan tree
  *
+ * gather_param is the rescan_param of an ancestral Gather/GatherMerge,
+ * or -1 if there is none.
+ *
  * valid_params is the set of param IDs supplied by outer plan levels
  * that are valid to reference in this plan node or its children.
  *
@@ -2249,7 +2253,9 @@ SS_finalize_plan(PlannerInfo *root, Plan *plan)
  * can be handled more cleanly.
  */
 static Bitmapset *
-finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
+finalize_plan(PlannerInfo *root, Plan *plan,
+			  int gather_param,
+			  Bitmapset *valid_params,
 			  Bitmapset *scan_params)
 {
 	finalize_primnode_context context;
@@ -2302,6 +2308,18 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 	finalize_primnode((Node *) plan->targetlist, &context);
 	finalize_primnode((Node *) plan->qual, &context);
 
+	/*
+	 * If it's a parallel-aware scan node, mark it as dependent on the parent
+	 * Gather/GatherMerge's rescan Param.
+	 */
+	if (plan->parallel_aware)
+	{
+		if (gather_param < 0)
+			elog(ERROR, "parallel-aware plan node is not below a Gather");
+		context.paramids =
+			bms_add_member(context.paramids, gather_param);
+	}
+
 	/* Check additional node-type-specific fields */
 	switch (nodeTag(plan))
 	{
@@ -2512,6 +2530,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 					bms_add_members(context.paramids,
 									finalize_plan(root,
 												  (Plan *) lfirst(lc),
+												  gather_param,
 												  valid_params,
 												  scan_params));
 			}
@@ -2542,6 +2561,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 					bms_add_members(context.paramids,
 									finalize_plan(root,
 												  (Plan *) lfirst(l),
+												  gather_param,
 												  valid_params,
 												  scan_params));
 			}
@@ -2558,6 +2578,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 					bms_add_members(context.paramids,
 									finalize_plan(root,
 												  (Plan *) lfirst(l),
+												  gather_param,
 												  valid_params,
 												  scan_params));
 			}
@@ -2574,6 +2595,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 					bms_add_members(context.paramids,
 									finalize_plan(root,
 												  (Plan *) lfirst(l),
+												  gather_param,
 												  valid_params,
 												  scan_params));
 			}
@@ -2590,6 +2612,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 					bms_add_members(context.paramids,
 									finalize_plan(root,
 												  (Plan *) lfirst(l),
+												  gather_param,
 												  valid_params,
 												  scan_params));
 			}
@@ -2606,6 +2629,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 					bms_add_members(context.paramids,
 									finalize_plan(root,
 												  (Plan *) lfirst(l),
+												  gather_param,
 												  valid_params,
 												  scan_params));
 			}
@@ -2697,13 +2721,51 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 							  &context);
 			break;
 
+		case T_Gather:
+			/* child nodes are allowed to reference rescan_param, if any */
+			locally_added_param = ((Gather *) plan)->rescan_param;
+			if (locally_added_param >= 0)
+			{
+				valid_params = bms_add_member(bms_copy(valid_params),
+											  locally_added_param);
+
+				/*
+				 * We currently don't support nested Gathers.  The issue so
+				 * far as this function is concerned would be how to identify
+				 * which child nodes depend on which Gather.
+				 */
+				Assert(gather_param < 0);
+				/* Pass down rescan_param to child parallel-aware nodes */
+				gather_param = locally_added_param;
+			}
+			/* rescan_param does *not* get added to scan_params */
+			break;
+
+		case T_GatherMerge:
+			/* child nodes are allowed to reference rescan_param, if any */
+			locally_added_param = ((GatherMerge *) plan)->rescan_param;
+			if (locally_added_param >= 0)
+			{
+				valid_params = bms_add_member(bms_copy(valid_params),
+											  locally_added_param);
+
+				/*
+				 * We currently don't support nested Gathers.  The issue so
+				 * far as this function is concerned would be how to identify
+				 * which child nodes depend on which Gather.
+				 */
+				Assert(gather_param < 0);
+				/* Pass down rescan_param to child parallel-aware nodes */
+				gather_param = locally_added_param;
+			}
+			/* rescan_param does *not* get added to scan_params */
+			break;
+
 		case T_ProjectSet:
 		case T_Hash:
 		case T_Material:
 		case T_Sort:
 		case T_Unique:
-		case T_Gather:
-		case T_GatherMerge:
 		case T_SetOp:
 		case T_Group:
 			/* no node-type-specific fields need fixing */
@@ -2717,6 +2779,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 	/* Process left and right child plans, if any */
 	child_params = finalize_plan(root,
 								 plan->lefttree,
+								 gather_param,
 								 valid_params,
 								 scan_params);
 	context.paramids = bms_add_members(context.paramids, child_params);
@@ -2726,6 +2789,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 		/* right child can reference nestloop_params as well as valid_params */
 		child_params = finalize_plan(root,
 									 plan->righttree,
+									 gather_param,
 									 bms_union(nestloop_params, valid_params),
 									 scan_params);
 		/* ... and they don't count as parameters used at my level */
@@ -2737,6 +2801,7 @@ finalize_plan(PlannerInfo *root, Plan *plan, Bitmapset *valid_params,
 		/* easy case */
 		child_params = finalize_plan(root,
 									 plan->righttree,
+									 gather_param,
 									 valid_params,
 									 scan_params);
 	}
...
@@ -825,13 +825,21 @@ typedef struct Unique
 /* ------------
  *		gather node
+ *
+ * Note: rescan_param is the ID of a PARAM_EXEC parameter slot.  That slot
+ * will never actually contain a value, but the Gather node must flag it as
+ * having changed whenever it is rescanned.  The child parallel-aware scan
+ * nodes are marked as depending on that parameter, so that the rescan
+ * machinery is aware that their output is likely to change across rescans.
+ * In some cases we don't need a rescan Param, so rescan_param is set to -1.
  * ------------
  */
 typedef struct Gather
 {
 	Plan		plan;
-	int			num_workers;
-	bool		single_copy;
+	int			num_workers;	/* planned number of worker processes */
+	int			rescan_param;	/* ID of Param that signals a rescan, or -1 */
+	bool		single_copy;	/* don't execute plan more than once */
 	bool		invisible;		/* suppress EXPLAIN display (for testing)? */
 } Gather;
@@ -842,7 +850,8 @@ typedef struct Gather
 typedef struct GatherMerge
 {
 	Plan		plan;
-	int			num_workers;
+	int			num_workers;	/* planned number of worker processes */
+	int			rescan_param;	/* ID of Param that signals a rescan, or -1 */
 	/* remaining fields are just like the sort-key info in struct Sort */
 	int			numCols;		/* number of sort-key columns */
 	AttrNumber *sortColIdx;		/* their indexes in the target list */
...
...@@ -1268,9 +1268,9 @@ typedef struct GatherPath ...@@ -1268,9 +1268,9 @@ typedef struct GatherPath
} GatherPath; } GatherPath;
/* /*
* GatherMergePath runs several copies of a plan in parallel and * GatherMergePath runs several copies of a plan in parallel and collects
* collects the results. For gather merge parallel leader always execute the * the results, preserving their common sort order. For gather merge, the
* plan. * parallel leader always executes the plan too, so we don't need single_copy.
*/ */
typedef struct GatherMergePath typedef struct GatherMergePath
{ {
......