Commit 80558c1f authored by Robert Haas's avatar Robert Haas

Generate parallel sequential scan plans in simple cases.

Add a new flag, consider_parallel, to each RelOptInfo, indicating
whether a plan for that relation could conceivably be run inside of
a parallel worker.  Right now, we're pretty conservative: for example,
it might be possible to defer applying a parallel-restricted qual
in a worker, and later do it in the leader, but right now we just
don't try to parallelize access to that relation.  That's probably
the right decision in most cases, anyway.

Using the new flag, generate parallel sequential scan plans for plain
baserels, meaning that we now have parallel sequential scan in
PostgreSQL.  The logic here is pretty unsophisticated right now: the
costing model probably isn't right in detail, and we can't push joins
beneath Gather nodes, so the number of plans that can actually benefit
from this is pretty limited right now.  Lots more work is needed.
Nevertheless, it seems time to enable this functionality so that all
this code can actually be tested easily by users and developers.

Note that, if you wish to test this functionality, it will be
necessary to set max_parallel_degree to a value greater than the
default of 0.  Once a few more loose ends have been tidied up here, we
might want to consider changing the default value of this GUC, but
I'm leaving it alone for now.

Along the way, fix a bug in cost_gather: the previous coding thought
that a Gather node's transfer overhead should be costed on the basis of
the relation size rather than the number of tuples that actually need
to be passed off to the leader.

Patch by me, reviewed in earlier versions by Amit Kapila.
parent f0661c4e
...@@ -1882,6 +1882,7 @@ _outRelOptInfo(StringInfo str, const RelOptInfo *node) ...@@ -1882,6 +1882,7 @@ _outRelOptInfo(StringInfo str, const RelOptInfo *node)
WRITE_INT_FIELD(width); WRITE_INT_FIELD(width);
WRITE_BOOL_FIELD(consider_startup); WRITE_BOOL_FIELD(consider_startup);
WRITE_BOOL_FIELD(consider_param_startup); WRITE_BOOL_FIELD(consider_param_startup);
WRITE_BOOL_FIELD(consider_parallel);
WRITE_NODE_FIELD(reltargetlist); WRITE_NODE_FIELD(reltargetlist);
WRITE_NODE_FIELD(pathlist); WRITE_NODE_FIELD(pathlist);
WRITE_NODE_FIELD(ppilist); WRITE_NODE_FIELD(ppilist);
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "access/tsmapi.h" #include "access/tsmapi.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "catalog/pg_operator.h" #include "catalog/pg_operator.h"
#include "catalog/pg_proc.h"
#include "foreign/fdwapi.h" #include "foreign/fdwapi.h"
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
...@@ -71,6 +72,9 @@ static void set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, ...@@ -71,6 +72,9 @@ static void set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
Index rti, RangeTblEntry *rte); Index rti, RangeTblEntry *rte);
static void set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel, static void set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte); RangeTblEntry *rte);
static void set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte);
static bool function_rte_parallel_ok(RangeTblEntry *rte);
static void set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, static void set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte); RangeTblEntry *rte);
static void set_tablesample_rel_size(PlannerInfo *root, RelOptInfo *rel, static void set_tablesample_rel_size(PlannerInfo *root, RelOptInfo *rel,
...@@ -158,7 +162,8 @@ make_one_rel(PlannerInfo *root, List *joinlist) ...@@ -158,7 +162,8 @@ make_one_rel(PlannerInfo *root, List *joinlist)
set_base_rel_consider_startup(root); set_base_rel_consider_startup(root);
/* /*
* Generate access paths for the base rels. * Generate access paths for the base rels. set_base_rel_sizes also
* sets the consider_parallel flag for each baserel, if appropriate.
*/ */
set_base_rel_sizes(root); set_base_rel_sizes(root);
set_base_rel_pathlists(root); set_base_rel_pathlists(root);
...@@ -222,9 +227,12 @@ set_base_rel_consider_startup(PlannerInfo *root) ...@@ -222,9 +227,12 @@ set_base_rel_consider_startup(PlannerInfo *root)
/* /*
* set_base_rel_sizes * set_base_rel_sizes
* Set the size estimates (rows and widths) for each base-relation entry. * Set the size estimates (rows and widths) for each base-relation entry.
* Also determine whether to consider parallel paths for base relations.
* *
* We do this in a separate pass over the base rels so that rowcount * We do this in a separate pass over the base rels so that rowcount
* estimates are available for parameterized path generation. * estimates are available for parameterized path generation, and also so
* that the consider_parallel flag is set correctly before we begin to
* generate paths.
*/ */
static void static void
set_base_rel_sizes(PlannerInfo *root) set_base_rel_sizes(PlannerInfo *root)
...@@ -234,6 +242,7 @@ set_base_rel_sizes(PlannerInfo *root) ...@@ -234,6 +242,7 @@ set_base_rel_sizes(PlannerInfo *root)
for (rti = 1; rti < root->simple_rel_array_size; rti++) for (rti = 1; rti < root->simple_rel_array_size; rti++)
{ {
RelOptInfo *rel = root->simple_rel_array[rti]; RelOptInfo *rel = root->simple_rel_array[rti];
RangeTblEntry *rte;
/* there may be empty slots corresponding to non-baserel RTEs */ /* there may be empty slots corresponding to non-baserel RTEs */
if (rel == NULL) if (rel == NULL)
...@@ -245,7 +254,19 @@ set_base_rel_sizes(PlannerInfo *root) ...@@ -245,7 +254,19 @@ set_base_rel_sizes(PlannerInfo *root)
if (rel->reloptkind != RELOPT_BASEREL) if (rel->reloptkind != RELOPT_BASEREL)
continue; continue;
set_rel_size(root, rel, rti, root->simple_rte_array[rti]); rte = root->simple_rte_array[rti];
/*
* If parallelism is allowable for this query in general, see whether
* it's allowable for this rel in particular. We have to do this
* before set_rel_size, because that if this is an inheritance parent,
* set_append_rel_size will pass the consider_parallel flag down to
* inheritance children.
*/
if (root->glob->parallelModeOK)
set_rel_consider_parallel(root, rel, rte);
set_rel_size(root, rel, rti, rte);
} }
} }
...@@ -458,6 +479,131 @@ set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) ...@@ -458,6 +479,131 @@ set_plain_rel_size(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
set_baserel_size_estimates(root, rel); set_baserel_size_estimates(root, rel);
} }
/*
* If this relation could possibly be scanned from within a worker, then set
* the consider_parallel flag. The flag has previously been initialized to
* false, so we just bail out if it becomes clear that we can't safely set it.
*/
static void
set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel,
RangeTblEntry *rte)
{
/* Don't call this if parallelism is disallowed for the entire query. */
Assert(root->glob->parallelModeOK);
/* Don't call this for non-baserels. */
Assert(rel->reloptkind == RELOPT_BASEREL);
/* Assorted checks based on rtekind. */
switch (rte->rtekind)
{
case RTE_RELATION:
/*
* Currently, parallel workers can't access the leader's temporary
* tables. We could possibly relax this if the wrote all of its
* local buffers at the start of the query and made no changes
* thereafter (maybe we could allow hint bit changes), and if we
* taught the workers to read them. Writing a large number of
* temporary buffers could be expensive, though, and we don't have
* the rest of the necessary infrastructure right now anyway. So
* for now, bail out if we see a temporary table.
*/
if (get_rel_persistence(rte->relid) == RELPERSISTENCE_TEMP)
return;
/*
* Table sampling can be pushed down to workers if the sample
* function and its arguments are safe.
*/
if (rte->tablesample != NULL)
{
Oid proparallel = func_parallel(rte->tablesample->tsmhandler);
if (proparallel != PROPARALLEL_SAFE)
return;
if (has_parallel_hazard((Node *) rte->tablesample->args,
false))
return;
return;
}
break;
case RTE_SUBQUERY:
/*
* Subplans currently aren't passed to workers. Even if they
* were, the subplan might be using parallelism internally, and
* we can't support nested Gather nodes at present. Finally,
* we don't have a good way of knowing whether the subplan
* involves any parallel-restricted operations. It would be
* nice to relax this restriction some day, but it's going to
* take a fair amount of work.
*/
return;
case RTE_JOIN:
/* Shouldn't happen; we're only considering baserels here. */
Assert(false);
return;
case RTE_FUNCTION:
/* Check for parallel-restricted functions. */
if (!function_rte_parallel_ok(rte))
return;
break;
case RTE_VALUES:
/*
* The data for a VALUES clause is stored in the plan tree itself,
* so scanning it in a worker is fine.
*/
break;
case RTE_CTE:
/*
* CTE tuplestores aren't shared among parallel workers, so we
* force all CTE scans to happen in the leader. Also, populating
* the CTE would require executing a subplan that's not available
* in the worker, might be parallel-restricted, and must get
* executed only once.
*/
return;
}
/*
* If there's anything in baserestrictinfo that's parallel-restricted,
* we give up on parallelizing access to this relation. We could consider
* instead postponing application of the restricted quals until we're
* above all the parallelism in the plan tree, but it's not clear that
* this would be a win in very many cases, and it might be tricky to make
* outer join clauses work correctly.
*/
if (has_parallel_hazard((Node *) rel->baserestrictinfo, false))
return;
/* We have a winner. */
rel->consider_parallel = true;
}
/*
* Check whether a function RTE is scanning something parallel-restricted.
*/
static bool
function_rte_parallel_ok(RangeTblEntry *rte)
{
ListCell *lc;
foreach(lc, rte->functions)
{
RangeTblFunction *rtfunc = (RangeTblFunction *) lfirst(lc);
Assert(IsA(rtfunc, RangeTblFunction));
if (has_parallel_hazard(rtfunc->funcexpr, false))
return false;
}
return true;
}
/* /*
* set_plain_rel_pathlist * set_plain_rel_pathlist
* Build access paths for a plain relation (no subquery, no inheritance) * Build access paths for a plain relation (no subquery, no inheritance)
...@@ -466,6 +612,7 @@ static void ...@@ -466,6 +612,7 @@ static void
set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
{ {
Relids required_outer; Relids required_outer;
int parallel_threshold = 1000;
/* /*
* We don't support pushing join clauses into the quals of a seqscan, but * We don't support pushing join clauses into the quals of a seqscan, but
...@@ -477,6 +624,40 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte) ...@@ -477,6 +624,40 @@ set_plain_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)
/* Consider sequential scan */ /* Consider sequential scan */
add_path(rel, create_seqscan_path(root, rel, required_outer, 0)); add_path(rel, create_seqscan_path(root, rel, required_outer, 0));
/* Consider parallel sequential scan */
if (rel->consider_parallel && rel->pages > parallel_threshold &&
required_outer == NULL)
{
Path *path;
int parallel_degree = 1;
/*
* Limit the degree of parallelism logarithmically based on the size
* of the relation. This probably needs to be a good deal more
* sophisticated, but we need something here for now.
*/
while (rel->pages > parallel_threshold * 3 &&
parallel_degree < max_parallel_degree)
{
parallel_degree++;
parallel_threshold *= 3;
if (parallel_threshold >= PG_INT32_MAX / 3)
break;
}
/*
* Ideally we should consider postponing the gather operation until
* much later, after we've pushed joins and so on atop the parallel
* sequential scan path. But we don't have the infrastructure for
* that yet, so just do this for now.
*/
path = create_seqscan_path(root, rel, required_outer, parallel_degree);
path = (Path *)
create_gather_path(root, rel, path, required_outer,
parallel_degree);
add_path(rel, path);
}
/* Consider index scans */ /* Consider index scans */
create_index_paths(root, rel); create_index_paths(root, rel);
...@@ -714,6 +895,9 @@ set_append_rel_size(PlannerInfo *root, RelOptInfo *rel, ...@@ -714,6 +895,9 @@ set_append_rel_size(PlannerInfo *root, RelOptInfo *rel,
continue; continue;
} }
/* Copy consider_parallel flag from parent. */
childrel->consider_parallel = rel->consider_parallel;
/* /*
* CE failed, so finish copying/modifying targetlist and join quals. * CE failed, so finish copying/modifying targetlist and join quals.
* *
......
...@@ -334,7 +334,7 @@ cost_gather(GatherPath *path, PlannerInfo *root, ...@@ -334,7 +334,7 @@ cost_gather(GatherPath *path, PlannerInfo *root,
/* Parallel setup and communication cost. */ /* Parallel setup and communication cost. */
startup_cost += parallel_setup_cost; startup_cost += parallel_setup_cost;
run_cost += parallel_tuple_cost * rel->tuples; run_cost += parallel_tuple_cost * path->path.rows;
path->path.startup_cost = startup_cost; path->path.startup_cost = startup_cost;
path->path.total_cost = (startup_cost + run_cost); path->path.total_cost = (startup_cost + run_cost);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "optimizer/clauses.h"
#include "optimizer/orclauses.h" #include "optimizer/orclauses.h"
#include "optimizer/pathnode.h" #include "optimizer/pathnode.h"
#include "optimizer/paths.h" #include "optimizer/paths.h"
...@@ -70,6 +71,17 @@ query_planner(PlannerInfo *root, List *tlist, ...@@ -70,6 +71,17 @@ query_planner(PlannerInfo *root, List *tlist,
/* We need a dummy joinrel to describe the empty set of baserels */ /* We need a dummy joinrel to describe the empty set of baserels */
final_rel = build_empty_join_rel(root); final_rel = build_empty_join_rel(root);
/*
* If query allows parallelism in general, check whether the quals
* are parallel-restricted. There's currently no real benefit to
* setting this flag correctly because we can't yet reference subplans
* from parallel workers. But that might change someday, so set this
* correctly anyway.
*/
if (root->glob->parallelModeOK)
final_rel->consider_parallel =
!has_parallel_hazard(parse->jointree->quals, false);
/* The only path for it is a trivial Result path */ /* The only path for it is a trivial Result path */
add_path(final_rel, (Path *) add_path(final_rel, (Path *)
create_result_path((List *) parse->jointree->quals)); create_result_path((List *) parse->jointree->quals));
......
...@@ -204,7 +204,8 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) ...@@ -204,7 +204,8 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
/* /*
* Assess whether it's feasible to use parallel mode for this query. * Assess whether it's feasible to use parallel mode for this query.
* We can't do this in a standalone backend, or if the command will * We can't do this in a standalone backend, or if the command will
* try to modify any data, or if this is a cursor operation, or if any * try to modify any data, or if this is a cursor operation, or if
* GUCs are set to values that don't permit parallelism, or if
* parallel-unsafe functions are present in the query tree. * parallel-unsafe functions are present in the query tree.
* *
* For now, we don't try to use parallel mode if we're running inside * For now, we don't try to use parallel mode if we're running inside
...@@ -223,9 +224,9 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) ...@@ -223,9 +224,9 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
glob->parallelModeOK = (cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && glob->parallelModeOK = (cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster && dynamic_shared_memory_type != DSM_IMPL_NONE && IsUnderPostmaster && dynamic_shared_memory_type != DSM_IMPL_NONE &&
parse->commandType == CMD_SELECT && !parse->hasModifyingCTE && parse->commandType == CMD_SELECT && !parse->hasModifyingCTE &&
parse->utilityStmt == NULL && !IsParallelWorker() && parse->utilityStmt == NULL && max_parallel_degree > 0 &&
!IsolationIsSerializable() && !IsParallelWorker() && !IsolationIsSerializable() &&
!contain_parallel_unsafe((Node *) parse); !has_parallel_hazard((Node *) parse, true);
/* /*
* glob->parallelModeOK should tell us whether it's necessary to impose * glob->parallelModeOK should tell us whether it's necessary to impose
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/pg_aggregate.h" #include "catalog/pg_aggregate.h"
#include "catalog/pg_class.h"
#include "catalog/pg_language.h" #include "catalog/pg_language.h"
#include "catalog/pg_operator.h" #include "catalog/pg_operator.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
...@@ -87,6 +88,11 @@ typedef struct ...@@ -87,6 +88,11 @@ typedef struct
char *prosrc; char *prosrc;
} inline_error_callback_arg; } inline_error_callback_arg;
typedef struct
{
bool allow_restricted;
} has_parallel_hazard_arg;
static bool contain_agg_clause_walker(Node *node, void *context); static bool contain_agg_clause_walker(Node *node, void *context);
static bool count_agg_clauses_walker(Node *node, static bool count_agg_clauses_walker(Node *node,
count_agg_clauses_context *context); count_agg_clauses_context *context);
...@@ -96,7 +102,11 @@ static bool contain_subplans_walker(Node *node, void *context); ...@@ -96,7 +102,11 @@ static bool contain_subplans_walker(Node *node, void *context);
static bool contain_mutable_functions_walker(Node *node, void *context); static bool contain_mutable_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_walker(Node *node, void *context); static bool contain_volatile_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context); static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
static bool contain_parallel_unsafe_walker(Node *node, void *context); static bool has_parallel_hazard_walker(Node *node,
has_parallel_hazard_arg *context);
static bool parallel_too_dangerous(char proparallel,
has_parallel_hazard_arg *context);
static bool typeid_is_temp(Oid typeid);
static bool contain_nonstrict_functions_walker(Node *node, void *context); static bool contain_nonstrict_functions_walker(Node *node, void *context);
static bool contain_leaked_vars_walker(Node *node, void *context); static bool contain_leaked_vars_walker(Node *node, void *context);
static Relids find_nonnullable_rels_walker(Node *node, bool top_level); static Relids find_nonnullable_rels_walker(Node *node, bool top_level);
...@@ -1200,63 +1210,159 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context) ...@@ -1200,63 +1210,159 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context)
} }
/***************************************************************************** /*****************************************************************************
* Check queries for parallel-unsafe constructs * Check queries for parallel unsafe and/or restricted constructs
*****************************************************************************/ *****************************************************************************/
/*
* Check whether a node tree contains parallel hazards. This is used both
* on the entire query tree, to see whether the query can be parallelized at
* all, and also to evaluate whether a particular expression is safe to
* run in a parallel worker. We could separate these concerns into two
* different functions, but there's enough overlap that it doesn't seem
* worthwhile.
*/
bool bool
contain_parallel_unsafe(Node *node) has_parallel_hazard(Node *node, bool allow_restricted)
{ {
return contain_parallel_unsafe_walker(node, NULL); has_parallel_hazard_arg context;
context.allow_restricted = allow_restricted;
return has_parallel_hazard_walker(node, &context);
} }
static bool static bool
contain_parallel_unsafe_walker(Node *node, void *context) has_parallel_hazard_walker(Node *node, has_parallel_hazard_arg *context)
{ {
if (node == NULL) if (node == NULL)
return false; return false;
/*
* When we're first invoked on a completely unplanned tree, we must
* recurse through Query objects to as to locate parallel-unsafe
* constructs anywhere in the tree.
*
* Later, we'll be called again for specific quals, possibly after
* some planning has been done, we may encounter SubPlan, SubLink,
* or AlternativeSubLink nodes. Currently, there's no need to recurse
* through these; they can't be unsafe, since we've already cleared
* the entire query of unsafe operations, and they're definitely
* parallel-restricted.
*/
if (IsA(node, Query))
{
Query *query = (Query *) node;
if (query->rowMarks != NULL)
return true;
/* Recurse into subselects */
return query_tree_walker(query,
has_parallel_hazard_walker,
context, 0);
}
else if (IsA(node, SubPlan) || IsA(node, SubLink) ||
IsA(node, AlternativeSubPlan) || IsA(node, Param))
{
/*
* Since we don't have the ability to push subplans down to workers
* at present, we treat subplan references as parallel-restricted.
*/
if (!context->allow_restricted)
return true;
}
/* This is just a notational convenience for callers. */
if (IsA(node, RestrictInfo))
{
RestrictInfo *rinfo = (RestrictInfo *) node;
return has_parallel_hazard_walker((Node *) rinfo->clause, context);
}
/*
* It is an error for a parallel worker to touch a temporary table in any
* way, so we can't handle nodes whose type is the rowtype of such a table.
*/
if (!context->allow_restricted)
{
switch (nodeTag(node))
{
case T_Var:
case T_Const:
case T_Param:
case T_Aggref:
case T_WindowFunc:
case T_ArrayRef:
case T_FuncExpr:
case T_NamedArgExpr:
case T_OpExpr:
case T_DistinctExpr:
case T_NullIfExpr:
case T_FieldSelect:
case T_FieldStore:
case T_RelabelType:
case T_CoerceViaIO:
case T_ArrayCoerceExpr:
case T_ConvertRowtypeExpr:
case T_CaseExpr:
case T_CaseTestExpr:
case T_ArrayExpr:
case T_RowExpr:
case T_CoalesceExpr:
case T_MinMaxExpr:
case T_CoerceToDomain:
case T_CoerceToDomainValue:
case T_SetToDefault:
if (typeid_is_temp(exprType(node)))
return true;
break;
default:
break;
}
}
/*
* For each node that might potentially call a function, we need to
* examine the pg_proc.proparallel marking for that function to see
* whether it's safe enough for the current value of allow_restricted.
*/
if (IsA(node, FuncExpr)) if (IsA(node, FuncExpr))
{ {
FuncExpr *expr = (FuncExpr *) node; FuncExpr *expr = (FuncExpr *) node;
if (func_parallel(expr->funcid) == PROPARALLEL_UNSAFE) if (parallel_too_dangerous(func_parallel(expr->funcid), context))
return true; return true;
/* else fall through to check args */
} }
else if (IsA(node, OpExpr)) else if (IsA(node, OpExpr))
{ {
OpExpr *expr = (OpExpr *) node; OpExpr *expr = (OpExpr *) node;
set_opfuncid(expr); set_opfuncid(expr);
if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE) if (parallel_too_dangerous(func_parallel(expr->opfuncid), context))
return true; return true;
/* else fall through to check args */
} }
else if (IsA(node, DistinctExpr)) else if (IsA(node, DistinctExpr))
{ {
DistinctExpr *expr = (DistinctExpr *) node; DistinctExpr *expr = (DistinctExpr *) node;
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */ set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE) if (parallel_too_dangerous(func_parallel(expr->opfuncid), context))
return true; return true;
/* else fall through to check args */
} }
else if (IsA(node, NullIfExpr)) else if (IsA(node, NullIfExpr))
{ {
NullIfExpr *expr = (NullIfExpr *) node; NullIfExpr *expr = (NullIfExpr *) node;
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */ set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE) if (parallel_too_dangerous(func_parallel(expr->opfuncid), context))
return true; return true;
/* else fall through to check args */
} }
else if (IsA(node, ScalarArrayOpExpr)) else if (IsA(node, ScalarArrayOpExpr))
{ {
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) node; ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) node;
set_sa_opfuncid(expr); set_sa_opfuncid(expr);
if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE) if (parallel_too_dangerous(func_parallel(expr->opfuncid), context))
return true; return true;
/* else fall through to check args */
} }
else if (IsA(node, CoerceViaIO)) else if (IsA(node, CoerceViaIO))
{ {
...@@ -1268,54 +1374,61 @@ contain_parallel_unsafe_walker(Node *node, void *context) ...@@ -1268,54 +1374,61 @@ contain_parallel_unsafe_walker(Node *node, void *context)
/* check the result type's input function */ /* check the result type's input function */
getTypeInputInfo(expr->resulttype, getTypeInputInfo(expr->resulttype,
&iofunc, &typioparam); &iofunc, &typioparam);
if (func_parallel(iofunc) == PROPARALLEL_UNSAFE) if (parallel_too_dangerous(func_parallel(iofunc), context))
return true; return true;
/* check the input type's output function */ /* check the input type's output function */
getTypeOutputInfo(exprType((Node *) expr->arg), getTypeOutputInfo(exprType((Node *) expr->arg),
&iofunc, &typisvarlena); &iofunc, &typisvarlena);
if (func_parallel(iofunc) == PROPARALLEL_UNSAFE) if (parallel_too_dangerous(func_parallel(iofunc), context))
return true; return true;
/* else fall through to check args */
} }
else if (IsA(node, ArrayCoerceExpr)) else if (IsA(node, ArrayCoerceExpr))
{ {
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) node; ArrayCoerceExpr *expr = (ArrayCoerceExpr *) node;
if (OidIsValid(expr->elemfuncid) && if (OidIsValid(expr->elemfuncid) &&
func_parallel(expr->elemfuncid) == PROPARALLEL_UNSAFE) parallel_too_dangerous(func_parallel(expr->elemfuncid), context))
return true; return true;
/* else fall through to check args */
} }
else if (IsA(node, RowCompareExpr)) else if (IsA(node, RowCompareExpr))
{ {
/* RowCompare probably can't have volatile ops, but check anyway */
RowCompareExpr *rcexpr = (RowCompareExpr *) node; RowCompareExpr *rcexpr = (RowCompareExpr *) node;
ListCell *opid; ListCell *opid;
foreach(opid, rcexpr->opnos) foreach(opid, rcexpr->opnos)
{ {
if (op_volatile(lfirst_oid(opid)) == PROPARALLEL_UNSAFE) Oid opfuncid = get_opcode(lfirst_oid(opid));
if (parallel_too_dangerous(func_parallel(opfuncid), context))
return true; return true;
} }
/* else fall through to check args */
} }
else if (IsA(node, Query))
{
Query *query = (Query *) node;
if (query->rowMarks != NULL)
return true;
/* Recurse into subselects */ /* ... and recurse to check substructure */
return query_tree_walker(query,
contain_parallel_unsafe_walker,
context, 0);
}
return expression_tree_walker(node, return expression_tree_walker(node,
contain_parallel_unsafe_walker, has_parallel_hazard_walker,
context); context);
} }
static bool
parallel_too_dangerous(char proparallel, has_parallel_hazard_arg *context)
{
if (context->allow_restricted)
return proparallel == PROPARALLEL_UNSAFE;
else
return proparallel != PROPARALLEL_SAFE;
}
static bool
typeid_is_temp(Oid typeid)
{
Oid relid = get_typ_typrelid(typeid);
if (!OidIsValid(relid))
return false;
return (get_rel_persistence(relid) == RELPERSISTENCE_TEMP);
}
/***************************************************************************** /*****************************************************************************
* Check clauses for nonstrict functions * Check clauses for nonstrict functions
*****************************************************************************/ *****************************************************************************/
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "optimizer/clauses.h"
#include "optimizer/cost.h" #include "optimizer/cost.h"
#include "optimizer/pathnode.h" #include "optimizer/pathnode.h"
#include "optimizer/paths.h" #include "optimizer/paths.h"
...@@ -102,6 +103,7 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptKind reloptkind) ...@@ -102,6 +103,7 @@ build_simple_rel(PlannerInfo *root, int relid, RelOptKind reloptkind)
/* cheap startup cost is interesting iff not all tuples to be retrieved */ /* cheap startup cost is interesting iff not all tuples to be retrieved */
rel->consider_startup = (root->tuple_fraction > 0); rel->consider_startup = (root->tuple_fraction > 0);
rel->consider_param_startup = false; /* might get changed later */ rel->consider_param_startup = false; /* might get changed later */
rel->consider_parallel = false; /* might get changed later */
rel->reltargetlist = NIL; rel->reltargetlist = NIL;
rel->pathlist = NIL; rel->pathlist = NIL;
rel->ppilist = NIL; rel->ppilist = NIL;
...@@ -363,6 +365,7 @@ build_join_rel(PlannerInfo *root, ...@@ -363,6 +365,7 @@ build_join_rel(PlannerInfo *root,
/* cheap startup cost is interesting iff not all tuples to be retrieved */ /* cheap startup cost is interesting iff not all tuples to be retrieved */
joinrel->consider_startup = (root->tuple_fraction > 0); joinrel->consider_startup = (root->tuple_fraction > 0);
joinrel->consider_param_startup = false; joinrel->consider_param_startup = false;
joinrel->consider_parallel = false;
joinrel->reltargetlist = NIL; joinrel->reltargetlist = NIL;
joinrel->pathlist = NIL; joinrel->pathlist = NIL;
joinrel->ppilist = NIL; joinrel->ppilist = NIL;
...@@ -441,6 +444,24 @@ build_join_rel(PlannerInfo *root, ...@@ -441,6 +444,24 @@ build_join_rel(PlannerInfo *root,
set_joinrel_size_estimates(root, joinrel, outer_rel, inner_rel, set_joinrel_size_estimates(root, joinrel, outer_rel, inner_rel,
sjinfo, restrictlist); sjinfo, restrictlist);
/*
* Set the consider_parallel flag if this joinrel could potentially be
* scanned within a parallel worker. If this flag is false for either
* inner_rel or outer_rel, then it must be false for the joinrel also.
* Even if both are true, there might be parallel-restricted quals at
* our level.
*
* Note that if there are more than two rels in this relation, they
* could be divided between inner_rel and outer_rel in any arbitary
* way. We assume this doesn't matter, because we should hit all the
* same baserels and joinclauses while building up to this joinrel no
* matter which we take; therefore, we should make the same decision
* here however we get here.
*/
if (inner_rel->consider_parallel && outer_rel->consider_parallel &&
!has_parallel_hazard((Node *) restrictlist, false))
joinrel->consider_parallel = true;
/* /*
* Add the joinrel to the query's joinrel list, and store it into the * Add the joinrel to the query's joinrel list, and store it into the
* auxiliary hashtable if there is one. NB: GEQO requires us to append * auxiliary hashtable if there is one. NB: GEQO requires us to append
......
...@@ -1787,6 +1787,28 @@ get_rel_tablespace(Oid relid) ...@@ -1787,6 +1787,28 @@ get_rel_tablespace(Oid relid)
return InvalidOid; return InvalidOid;
} }
/*
* get_rel_persistence
*
* Returns the relpersistence associated with a given relation.
*/
char
get_rel_persistence(Oid relid)
{
HeapTuple tp;
Form_pg_class reltup;
char result;
tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for relation %u", relid);
reltup = (Form_pg_class) GETSTRUCT(tp);
result = reltup->relpersistence;
ReleaseSysCache(tp);
return result;
}
/* ---------- TRANSFORM CACHE ---------- */ /* ---------- TRANSFORM CACHE ---------- */
......
...@@ -452,6 +452,7 @@ typedef struct RelOptInfo ...@@ -452,6 +452,7 @@ typedef struct RelOptInfo
/* per-relation planner control flags */ /* per-relation planner control flags */
bool consider_startup; /* keep cheap-startup-cost paths? */ bool consider_startup; /* keep cheap-startup-cost paths? */
bool consider_param_startup; /* ditto, for parameterized paths? */ bool consider_param_startup; /* ditto, for parameterized paths? */
bool consider_parallel; /* consider parallel paths? */
/* materialization information */ /* materialization information */
List *reltargetlist; /* Vars to be output by scan of relation */ List *reltargetlist; /* Vars to be output by scan of relation */
......
...@@ -62,7 +62,7 @@ extern bool contain_subplans(Node *clause); ...@@ -62,7 +62,7 @@ extern bool contain_subplans(Node *clause);
extern bool contain_mutable_functions(Node *clause); extern bool contain_mutable_functions(Node *clause);
extern bool contain_volatile_functions(Node *clause); extern bool contain_volatile_functions(Node *clause);
extern bool contain_volatile_functions_not_nextval(Node *clause); extern bool contain_volatile_functions_not_nextval(Node *clause);
extern bool contain_parallel_unsafe(Node *node); extern bool has_parallel_hazard(Node *node, bool allow_restricted);
extern bool contain_nonstrict_functions(Node *clause); extern bool contain_nonstrict_functions(Node *clause);
extern bool contain_leaked_vars(Node *clause); extern bool contain_leaked_vars(Node *clause);
......
...@@ -103,6 +103,7 @@ extern Oid get_rel_namespace(Oid relid); ...@@ -103,6 +103,7 @@ extern Oid get_rel_namespace(Oid relid);
extern Oid get_rel_type_id(Oid relid); extern Oid get_rel_type_id(Oid relid);
extern char get_rel_relkind(Oid relid); extern char get_rel_relkind(Oid relid);
extern Oid get_rel_tablespace(Oid relid); extern Oid get_rel_tablespace(Oid relid);
extern char get_rel_persistence(Oid relid);
extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes); extern Oid get_transform_fromsql(Oid typid, Oid langid, List *trftypes);
extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes); extern Oid get_transform_tosql(Oid typid, Oid langid, List *trftypes);
extern bool get_typisdefined(Oid typid); extern bool get_typisdefined(Oid typid);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment