Commit 5e6d8d2b authored by Robert Haas

Allow parallel workers to execute subplans.

This doesn't do anything to make Param nodes anything other than
parallel-restricted, so this only helps with uncorrelated subplans,
and it's not necessarily very cheap because each worker will run the
subplan separately (just as a Hash Join will build a separate copy of
the hash table in each participating process), but it's a first step
toward supporting cases that are more likely to help in practice, and
is occasionally useful on its own.

Amit Kapila, reviewed and tested by Rafia Sabih, Dilip Kumar, and
me.

Discussion: http://postgr.es/m/CAA4eK1+e8Z45D2n+rnDMDYsVEb5iW7jqaCH_tvPMYau=1Rru9w@mail.gmail.com
parent 8da9a226
...@@ -156,7 +156,7 @@ ExecSerializePlan(Plan *plan, EState *estate) ...@@ -156,7 +156,7 @@ ExecSerializePlan(Plan *plan, EState *estate)
pstmt->planTree = plan; pstmt->planTree = plan;
pstmt->rtable = estate->es_range_table; pstmt->rtable = estate->es_range_table;
pstmt->resultRelations = NIL; pstmt->resultRelations = NIL;
pstmt->subplans = NIL; pstmt->subplans = estate->es_plannedstmt->subplans;
pstmt->rewindPlanIDs = NULL; pstmt->rewindPlanIDs = NULL;
pstmt->rowMarks = NIL; pstmt->rowMarks = NIL;
pstmt->relationOids = NIL; pstmt->relationOids = NIL;
......
...@@ -1495,6 +1495,7 @@ _copySubPlan(const SubPlan *from) ...@@ -1495,6 +1495,7 @@ _copySubPlan(const SubPlan *from)
COPY_SCALAR_FIELD(firstColCollation); COPY_SCALAR_FIELD(firstColCollation);
COPY_SCALAR_FIELD(useHashTable); COPY_SCALAR_FIELD(useHashTable);
COPY_SCALAR_FIELD(unknownEqFalse); COPY_SCALAR_FIELD(unknownEqFalse);
COPY_SCALAR_FIELD(parallel_safe);
COPY_NODE_FIELD(setParam); COPY_NODE_FIELD(setParam);
COPY_NODE_FIELD(parParam); COPY_NODE_FIELD(parParam);
COPY_NODE_FIELD(args); COPY_NODE_FIELD(args);
......
...@@ -423,6 +423,7 @@ _equalSubPlan(const SubPlan *a, const SubPlan *b) ...@@ -423,6 +423,7 @@ _equalSubPlan(const SubPlan *a, const SubPlan *b)
COMPARE_SCALAR_FIELD(firstColCollation); COMPARE_SCALAR_FIELD(firstColCollation);
COMPARE_SCALAR_FIELD(useHashTable); COMPARE_SCALAR_FIELD(useHashTable);
COMPARE_SCALAR_FIELD(unknownEqFalse); COMPARE_SCALAR_FIELD(unknownEqFalse);
COMPARE_SCALAR_FIELD(parallel_safe);
COMPARE_NODE_FIELD(setParam); COMPARE_NODE_FIELD(setParam);
COMPARE_NODE_FIELD(parParam); COMPARE_NODE_FIELD(parParam);
COMPARE_NODE_FIELD(args); COMPARE_NODE_FIELD(args);
......
...@@ -1226,6 +1226,7 @@ _outSubPlan(StringInfo str, const SubPlan *node) ...@@ -1226,6 +1226,7 @@ _outSubPlan(StringInfo str, const SubPlan *node)
WRITE_OID_FIELD(firstColCollation); WRITE_OID_FIELD(firstColCollation);
WRITE_BOOL_FIELD(useHashTable); WRITE_BOOL_FIELD(useHashTable);
WRITE_BOOL_FIELD(unknownEqFalse); WRITE_BOOL_FIELD(unknownEqFalse);
WRITE_BOOL_FIELD(parallel_safe);
WRITE_NODE_FIELD(setParam); WRITE_NODE_FIELD(setParam);
WRITE_NODE_FIELD(parParam); WRITE_NODE_FIELD(parParam);
WRITE_NODE_FIELD(args); WRITE_NODE_FIELD(args);
......
...@@ -2233,6 +2233,7 @@ _readSubPlan(void) ...@@ -2233,6 +2233,7 @@ _readSubPlan(void)
READ_OID_FIELD(firstColCollation); READ_OID_FIELD(firstColCollation);
READ_BOOL_FIELD(useHashTable); READ_BOOL_FIELD(useHashTable);
READ_BOOL_FIELD(unknownEqFalse); READ_BOOL_FIELD(unknownEqFalse);
READ_BOOL_FIELD(parallel_safe);
READ_NODE_FIELD(setParam); READ_NODE_FIELD(setParam);
READ_NODE_FIELD(parParam); READ_NODE_FIELD(parParam);
READ_NODE_FIELD(args); READ_NODE_FIELD(args);
......
...@@ -58,7 +58,7 @@ static Node *build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot, ...@@ -58,7 +58,7 @@ static Node *build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
List *plan_params, List *plan_params,
SubLinkType subLinkType, int subLinkId, SubLinkType subLinkType, int subLinkId,
Node *testexpr, bool adjust_testexpr, Node *testexpr, bool adjust_testexpr,
bool unknownEqFalse); bool unknownEqFalse, bool parallel_safe);
static List *generate_subquery_params(PlannerInfo *root, List *tlist, static List *generate_subquery_params(PlannerInfo *root, List *tlist,
List **paramIds); List **paramIds);
static List *generate_subquery_vars(PlannerInfo *root, List *tlist, static List *generate_subquery_vars(PlannerInfo *root, List *tlist,
...@@ -551,7 +551,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery, ...@@ -551,7 +551,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
/* And convert to SubPlan or InitPlan format. */ /* And convert to SubPlan or InitPlan format. */
result = build_subplan(root, plan, subroot, plan_params, result = build_subplan(root, plan, subroot, plan_params,
subLinkType, subLinkId, subLinkType, subLinkId,
testexpr, true, isTopQual); testexpr, true, isTopQual,
best_path->parallel_safe);
/* /*
* If it's a correlated EXISTS with an unimportant targetlist, we might be * If it's a correlated EXISTS with an unimportant targetlist, we might be
...@@ -604,7 +605,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery, ...@@ -604,7 +605,8 @@ make_subplan(PlannerInfo *root, Query *orig_subquery,
plan_params, plan_params,
ANY_SUBLINK, 0, ANY_SUBLINK, 0,
newtestexpr, newtestexpr,
false, true); false, true,
best_path->parallel_safe);
/* Check we got what we expected */ /* Check we got what we expected */
Assert(IsA(hashplan, SubPlan)); Assert(IsA(hashplan, SubPlan));
Assert(hashplan->parParam == NIL); Assert(hashplan->parParam == NIL);
...@@ -634,7 +636,7 @@ build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot, ...@@ -634,7 +636,7 @@ build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
List *plan_params, List *plan_params,
SubLinkType subLinkType, int subLinkId, SubLinkType subLinkType, int subLinkId,
Node *testexpr, bool adjust_testexpr, Node *testexpr, bool adjust_testexpr,
bool unknownEqFalse) bool unknownEqFalse, bool parallel_safe)
{ {
Node *result; Node *result;
SubPlan *splan; SubPlan *splan;
...@@ -653,6 +655,7 @@ build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot, ...@@ -653,6 +655,7 @@ build_subplan(PlannerInfo *root, Plan *plan, PlannerInfo *subroot,
&splan->firstColCollation); &splan->firstColCollation);
splan->useHashTable = false; splan->useHashTable = false;
splan->unknownEqFalse = unknownEqFalse; splan->unknownEqFalse = unknownEqFalse;
splan->parallel_safe = parallel_safe;
splan->setParam = NIL; splan->setParam = NIL;
splan->parParam = NIL; splan->parParam = NIL;
splan->args = NIL; splan->args = NIL;
...@@ -1213,6 +1216,12 @@ SS_process_ctes(PlannerInfo *root) ...@@ -1213,6 +1216,12 @@ SS_process_ctes(PlannerInfo *root)
&splan->firstColCollation); &splan->firstColCollation);
splan->useHashTable = false; splan->useHashTable = false;
splan->unknownEqFalse = false; splan->unknownEqFalse = false;
/*
* CTE scans are not considered for parallelism (cf
* set_rel_consider_parallel).
*/
splan->parallel_safe = false;
splan->setParam = NIL; splan->setParam = NIL;
splan->parParam = NIL; splan->parParam = NIL;
splan->args = NIL; splan->args = NIL;
......
...@@ -1162,21 +1162,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) ...@@ -1162,21 +1162,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
} }
/* /*
* Since we don't have the ability to push subplans down to workers at * Really we should not see SubLink during a max_interesting == restricted
* present, we treat subplan references as parallel-restricted. We need * scan, but if we do, return true.
* not worry about examining their contents; if they are unsafe, we would
* have found that out while examining the whole tree before reduction of
* sublinks to subplans. (Really we should not see SubLink during a
* max_interesting == restricted scan, but if we do, return true.)
*/ */
else if (IsA(node, SubLink) || else if (IsA(node, SubLink))
IsA(node, SubPlan) ||
IsA(node, AlternativeSubPlan))
{ {
if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
return true; return true;
} }
/* We can push the subplans only if they are parallel-safe. */
else if (IsA(node, SubPlan))
return !((SubPlan *) node)->parallel_safe;
/* /*
* We can't pass Params to workers at the moment either, so they are also * We can't pass Params to workers at the moment either, so they are also
* parallel-restricted. * parallel-restricted.
......
...@@ -677,6 +677,7 @@ typedef struct SubPlan ...@@ -677,6 +677,7 @@ typedef struct SubPlan
bool unknownEqFalse; /* TRUE if it's okay to return FALSE when the bool unknownEqFalse; /* TRUE if it's okay to return FALSE when the
* spec result is UNKNOWN; this allows much * spec result is UNKNOWN; this allows much
* simpler handling of null values */ * simpler handling of null values */
bool parallel_safe; /* OK to use as part of parallel plan? */
/* Information for passing params into and out of the subselect: */ /* Information for passing params into and out of the subselect: */
/* setParam and parParam are lists of integers (param IDs) */ /* setParam and parParam are lists of integers (param IDs) */
List *setParam; /* initplan subqueries have to set these List *setParam; /* initplan subqueries have to set these
......
...@@ -99,6 +99,32 @@ explain (costs off) ...@@ -99,6 +99,32 @@ explain (costs off)
-> Index Only Scan using tenk1_unique1 on tenk1 -> Index Only Scan using tenk1_unique1 on tenk1
(3 rows) (3 rows)
-- test parallel plans for queries containing un-correlated subplans.
alter table tenk2 set (parallel_workers = 0);
explain (costs off)
select count(*) from tenk1 where (two, four) not in
(select hundred, thousand from tenk2 where thousand > 100);
QUERY PLAN
------------------------------------------------------
Finalize Aggregate
-> Gather
Workers Planned: 4
-> Partial Aggregate
-> Parallel Seq Scan on tenk1
Filter: (NOT (hashed SubPlan 1))
SubPlan 1
-> Seq Scan on tenk2
Filter: (thousand > 100)
(9 rows)
select count(*) from tenk1 where (two, four) not in
(select hundred, thousand from tenk2 where thousand > 100);
count
-------
10000
(1 row)
alter table tenk2 reset (parallel_workers);
set force_parallel_mode=1; set force_parallel_mode=1;
explain (costs off) explain (costs off)
select stringu1::int2 from tenk1 where unique1 = 1; select stringu1::int2 from tenk1 where unique1 = 1;
......
...@@ -39,6 +39,15 @@ explain (costs off) ...@@ -39,6 +39,15 @@ explain (costs off)
select sum(parallel_restricted(unique1)) from tenk1 select sum(parallel_restricted(unique1)) from tenk1
group by(parallel_restricted(unique1)); group by(parallel_restricted(unique1));
-- test parallel plans for queries containing un-correlated subplans.
alter table tenk2 set (parallel_workers = 0);
explain (costs off)
select count(*) from tenk1 where (two, four) not in
(select hundred, thousand from tenk2 where thousand > 100);
select count(*) from tenk1 where (two, four) not in
(select hundred, thousand from tenk2 where thousand > 100);
alter table tenk2 reset (parallel_workers);
set force_parallel_mode=1; set force_parallel_mode=1;
explain (costs off) explain (costs off)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment