Commit 7aea8e4f authored by Robert Haas's avatar Robert Haas

Determine whether it's safe to attempt a parallel plan for a query.

Commit 924bcf4f introduced a framework
for parallel computation in PostgreSQL that makes most but not all
built-in functions safe to execute in parallel mode.  In order to have
parallel query, we'll need to be able to determine whether that query
contains functions (either built-in or user-defined) that cannot be
safely executed in parallel mode.  This requires those functions to be
labeled, so this patch introduces an infrastructure for that.  Some
functions currently labeled as safe may need to be revised depending on
how pending issues related to heavyweight locking under paralllelism
are resolved.

Parallel plans can't be used except for the case where the query will
run to completion.  If portal execution were suspended, the parallel
mode restrictions would need to remain in effect during that time, but
that might make other queries fail.  Therefore, this patch introduces
a framework that enables consideration of parallel plans only when it
is known that the plan will be run to completion.  This probably needs
some refinement; for example, at bind time, we do not know whether a
query run via the extended protocol will be execution to completion or
run with a limited fetch count.  Having the client indicate its
intentions at bind time would constitute a wire protocol break.  Some
contexts in which parallel mode would be safe are not adjusted by this
patch; the default is not to try parallel plans except from call sites
that have been updated to say that such plans are OK.

This commit doesn't introduce any parallel paths or plans; it just
provides a way to determine whether they could potentially be used.
I'm committing it on the theory that the remaining parallel sequential
scan patches will also get committed to this release, hopefully in the
not-too-distant future.

Robert Haas and Amit Kapila.  Reviewed (in earlier versions) by Noah
Misch.
parent b44d92b6
......@@ -4994,6 +4994,23 @@
</entry>
</row>
<row>
<entry><structfield>proparallel</structfield></entry>
<entry><type>char</type></entry>
<entry></entry>
<entry>
<structfield>proparallel</structfield> tells whether the function
can be safely run in parallel mode.
It is <literal>s</literal> for functions which are safe to run in
parallel mode without restriction.
It is <literal>r</literal> for functions which can be run in parallel
mode, but their execution is restricted to the parallel group leader;
parallel worker processes cannot invoke these functions.
It is <literal>u</literal> for functions which are unsafe in parallel
mode; the presence of such a function forces a serial execution plan.
</entry>
</row>
<row>
<entry><structfield>pronargs</structfield></entry>
<entry><type>int2</type></entry>
......
......@@ -35,6 +35,7 @@ ALTER FUNCTION <replaceable>name</replaceable> ( [ [ <replaceable class="paramet
CALLED ON NULL INPUT | RETURNS NULL ON NULL INPUT | STRICT
IMMUTABLE | STABLE | VOLATILE | [ NOT ] LEAKPROOF
[ EXTERNAL ] SECURITY INVOKER | [ EXTERNAL ] SECURITY DEFINER
PARALLEL { UNSAFE | RESTRICTED | SAFE }
COST <replaceable class="parameter">execution_cost</replaceable>
ROWS <replaceable class="parameter">result_rows</replaceable>
SET <replaceable class="parameter">configuration_parameter</replaceable> { TO | = } { <replaceable class="parameter">value</replaceable> | DEFAULT }
......@@ -191,6 +192,17 @@ ALTER FUNCTION <replaceable>name</replaceable> ( [ [ <replaceable class="paramet
</listitem>
</varlistentry>
<varlistentry>
<term><literal>PARALLEL</literal></term>
<listitem>
<para>
Change whether the function is deemed safe for parallelism.
See <xref linkend="sql-createfunction"> for details.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>LEAKPROOF</literal></term>
<listitem>
......
......@@ -30,6 +30,7 @@ CREATE [ OR REPLACE ] FUNCTION
| IMMUTABLE | STABLE | VOLATILE | [ NOT ] LEAKPROOF
| CALLED ON NULL INPUT | RETURNS NULL ON NULL INPUT | STRICT
| [ EXTERNAL ] SECURITY INVOKER | [ EXTERNAL ] SECURITY DEFINER
| PARALLEL { UNSAFE | RESTRICTED | SAFE }
| COST <replaceable class="parameter">execution_cost</replaceable>
| ROWS <replaceable class="parameter">result_rows</replaceable>
| SET <replaceable class="parameter">configuration_parameter</replaceable> { TO <replaceable class="parameter">value</replaceable> | = <replaceable class="parameter">value</replaceable> | FROM CURRENT }
......@@ -411,6 +412,43 @@ CREATE [ OR REPLACE ] FUNCTION
</listitem>
</varlistentry>
<varlistentry>
<term><literal>PARALLEL</literal></term>
<listitem>
<para><literal>PARALLEL UNSAFE</literal> indicates that the function
can't be executed in parallel mode and the presence of such a
function in an SQL statement forces a serial execution plan. This is
the default. <literal>PARALLEL RESTRICTED</literal> indicates that
the function can be executed in parallel mode, but the execution is
restricted to parallel group leader. <literal>PARALLEL SAFE</literal>
indicates that the function is safe to run in parallel mode without
restriction.
</para>
<para>
Functions should be labeled parallel unsafe if they modify any database
state, or if they make changes to the transaction such as using
sub-transactions, or if they access sequences or attempt to make
persistent changes to settings (e.g. <literal>setval</>). They should
be labeled as parallel restricted if they access temporary tables,
client connection state, cursors, prepared statements, or miscellaneous
backend-local state which the system cannot synchronize in parallel mode
(e.g. <literal>setseed</> cannot be executed other than by the group
leader because a change made by another process would not be reflected
in the leader). In general, if a function is labeled as being safe when
it is restricted or unsafe, or if it is labeled as being restricted when
it is in fact unsafe, it may throw errors or produce wrong answers
when used in a parallel query. C-language functions could in theory
exhibit totally undefined behavior if mislabeled, since there is no way
for the system to protect itself against arbitrary C code, but in most
likely cases the result will be no worse than for any other function.
If in doubt, functions should be labeled as <literal>UNSAFE</>, which is
the default.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">execution_cost</replaceable></term>
......
......@@ -540,6 +540,7 @@ AggregateCreate(const char *aggName,
false, /* isStrict (not needed for agg) */
PROVOLATILE_IMMUTABLE, /* volatility (not
* needed for agg) */
PROPARALLEL_UNSAFE,
parameterTypes, /* paramTypes */
allParameterTypes, /* allParamTypes */
parameterModes, /* parameterModes */
......
......@@ -83,6 +83,7 @@ ProcedureCreate(const char *procedureName,
bool isLeakProof,
bool isStrict,
char volatility,
char parallel,
oidvector *parameterTypes,
Datum allParameterTypes,
Datum parameterModes,
......@@ -344,6 +345,7 @@ ProcedureCreate(const char *procedureName,
values[Anum_pg_proc_proisstrict - 1] = BoolGetDatum(isStrict);
values[Anum_pg_proc_proretset - 1] = BoolGetDatum(returnsSet);
values[Anum_pg_proc_provolatile - 1] = CharGetDatum(volatility);
values[Anum_pg_proc_proparallel - 1] = CharGetDatum(parallel);
values[Anum_pg_proc_pronargs - 1] = UInt16GetDatum(parameterCount);
values[Anum_pg_proc_pronargdefaults - 1] = UInt16GetDatum(list_length(parameterDefaults));
values[Anum_pg_proc_prorettype - 1] = ObjectIdGetDatum(returnType);
......
......@@ -348,7 +348,7 @@ ExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
INSTR_TIME_SET_CURRENT(planstart);
/* plan the query */
plan = pg_plan_query(query, 0, params);
plan = pg_plan_query(query, CURSOR_OPT_PARALLEL_OK, params);
INSTR_TIME_SET_CURRENT(planduration);
INSTR_TIME_SUBTRACT(planduration, planstart);
......
......@@ -707,7 +707,7 @@ execute_sql_string(const char *sql, const char *filename)
sql,
NULL,
0);
stmt_list = pg_plan_queries(stmt_list, 0, NULL);
stmt_list = pg_plan_queries(stmt_list, CURSOR_OPT_PARALLEL_OK, NULL);
foreach(lc2, stmt_list)
{
......
......@@ -465,7 +465,8 @@ compute_common_attribute(DefElem *defel,
DefElem **leakproof_item,
List **set_items,
DefElem **cost_item,
DefElem **rows_item)
DefElem **rows_item,
DefElem **parallel_item)
{
if (strcmp(defel->defname, "volatility") == 0)
{
......@@ -513,6 +514,13 @@ compute_common_attribute(DefElem *defel,
*rows_item = defel;
}
else if (strcmp(defel->defname, "parallel") == 0)
{
if (*parallel_item)
goto duplicate_error;
*parallel_item = defel;
}
else
return false;
......@@ -544,6 +552,27 @@ interpret_func_volatility(DefElem *defel)
}
}
static char
interpret_func_parallel(DefElem *defel)
{
char *str = strVal(defel->arg);
if (strcmp(str, "safe") == 0)
return PROPARALLEL_SAFE;
else if (strcmp(str, "unsafe") == 0)
return PROPARALLEL_UNSAFE;
else if (strcmp(str, "restricted") == 0)
return PROPARALLEL_RESTRICTED;
else
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("parallel option \"%s\" not recognized",
str)));
return PROPARALLEL_UNSAFE; /* keep compiler quiet */
}
}
/*
* Update a proconfig value according to a list of VariableSetStmt items.
*
......@@ -592,7 +621,8 @@ compute_attributes_sql_style(List *options,
bool *leakproof_p,
ArrayType **proconfig,
float4 *procost,
float4 *prorows)
float4 *prorows,
char *parallel_p)
{
ListCell *option;
DefElem *as_item = NULL;
......@@ -606,6 +636,7 @@ compute_attributes_sql_style(List *options,
List *set_items = NIL;
DefElem *cost_item = NULL;
DefElem *rows_item = NULL;
DefElem *parallel_item = NULL;
foreach(option, options)
{
......@@ -650,7 +681,8 @@ compute_attributes_sql_style(List *options,
&leakproof_item,
&set_items,
&cost_item,
&rows_item))
&rows_item,
&parallel_item))
{
/* recognized common option */
continue;
......@@ -712,6 +744,8 @@ compute_attributes_sql_style(List *options,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("ROWS must be positive")));
}
if (parallel_item)
*parallel_p = interpret_func_parallel(parallel_item);
}
......@@ -858,6 +892,7 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
HeapTuple languageTuple;
Form_pg_language languageStruct;
List *as_clause;
char parallel;
/* Convert list of names to a name and namespace */
namespaceId = QualifiedNameGetCreationNamespace(stmt->funcname,
......@@ -878,13 +913,14 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
proconfig = NULL;
procost = -1; /* indicates not set */
prorows = -1; /* indicates not set */
parallel = PROPARALLEL_UNSAFE;
/* override attributes from explicit list */
compute_attributes_sql_style(stmt->options,
&as_clause, &language, &transformDefElem,
&isWindowFunc, &volatility,
&isStrict, &security, &isLeakProof,
&proconfig, &procost, &prorows);
&proconfig, &procost, &prorows, &parallel);
/* Look up the language and validate permissions */
languageTuple = SearchSysCache1(LANGNAME, PointerGetDatum(language));
......@@ -1061,6 +1097,7 @@ CreateFunction(CreateFunctionStmt *stmt, const char *queryString)
isLeakProof,
isStrict,
volatility,
parallel,
parameterTypes,
PointerGetDatum(allParameterTypes),
PointerGetDatum(parameterModes),
......@@ -1141,6 +1178,7 @@ AlterFunction(AlterFunctionStmt *stmt)
List *set_items = NIL;
DefElem *cost_item = NULL;
DefElem *rows_item = NULL;
DefElem *parallel_item = NULL;
ObjectAddress address;
rel = heap_open(ProcedureRelationId, RowExclusiveLock);
......@@ -1178,7 +1216,8 @@ AlterFunction(AlterFunctionStmt *stmt)
&leakproof_item,
&set_items,
&cost_item,
&rows_item) == false)
&rows_item,
&parallel_item) == false)
elog(ERROR, "option \"%s\" not recognized", defel->defname);
}
......@@ -1250,6 +1289,8 @@ AlterFunction(AlterFunctionStmt *stmt)
tup = heap_modify_tuple(tup, RelationGetDescr(rel),
repl_val, repl_null, repl_repl);
}
if (parallel_item)
procForm->proparallel = interpret_func_parallel(parallel_item);
/* Do the update */
simple_heap_update(rel, &tup->t_self, tup);
......
......@@ -135,6 +135,7 @@ CreateProceduralLanguage(CreatePLangStmt *stmt)
false, /* isLeakProof */
false, /* isStrict */
PROVOLATILE_VOLATILE,
PROPARALLEL_UNSAFE,
buildoidvector(funcargtypes, 0),
PointerGetDatum(NULL),
PointerGetDatum(NULL),
......@@ -174,6 +175,7 @@ CreateProceduralLanguage(CreatePLangStmt *stmt)
false, /* isLeakProof */
true, /* isStrict */
PROVOLATILE_VOLATILE,
PROPARALLEL_UNSAFE,
buildoidvector(funcargtypes, 1),
PointerGetDatum(NULL),
PointerGetDatum(NULL),
......@@ -216,6 +218,7 @@ CreateProceduralLanguage(CreatePLangStmt *stmt)
false, /* isLeakProof */
true, /* isStrict */
PROVOLATILE_VOLATILE,
PROPARALLEL_UNSAFE,
buildoidvector(funcargtypes, 1),
PointerGetDatum(NULL),
PointerGetDatum(NULL),
......
......@@ -1611,6 +1611,7 @@ makeRangeConstructors(const char *name, Oid namespace,
false, /* leakproof */
false, /* isStrict */
PROVOLATILE_IMMUTABLE, /* volatility */
PROPARALLEL_SAFE, /* parallel safety */
constructorArgTypesVector, /* parameterTypes */
PointerGetDatum(NULL), /* allParameterTypes */
PointerGetDatum(NULL), /* parameterModes */
......
......@@ -243,6 +243,11 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
if (!(eflags & (EXEC_FLAG_SKIP_TRIGGERS | EXEC_FLAG_EXPLAIN_ONLY)))
AfterTriggerBeginQuery();
/* Enter parallel mode, if required by the query. */
if (queryDesc->plannedstmt->parallelModeNeeded &&
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
EnterParallelMode();
MemoryContextSwitchTo(oldcontext);
}
......@@ -474,6 +479,11 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
*/
MemoryContextSwitchTo(oldcontext);
/* Exit parallel mode, if it was required by the query. */
if (queryDesc->plannedstmt->parallelModeNeeded &&
!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY))
ExitParallelMode();
/*
* Release EState and per-query memory context. This should release
* everything the executor has allocated.
......
......@@ -496,7 +496,9 @@ init_execution_state(List *queryTree_list,
if (queryTree->commandType == CMD_UTILITY)
stmt = queryTree->utilityStmt;
else
stmt = (Node *) pg_plan_query(queryTree, 0, NULL);
stmt = (Node *) pg_plan_query(queryTree,
fcache->readonly_func ? CURSOR_OPT_PARALLEL_OK : 0,
NULL);
/* Precheck all commands for validity in a function */
if (IsA(stmt, TransactionStmt))
......
......@@ -94,6 +94,7 @@ _copyPlannedStmt(const PlannedStmt *from)
COPY_NODE_FIELD(invalItems);
COPY_SCALAR_FIELD(nParamExec);
COPY_SCALAR_FIELD(hasRowSecurity);
COPY_SCALAR_FIELD(parallelModeNeeded);
return newnode;
}
......
......@@ -256,6 +256,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node)
WRITE_NODE_FIELD(invalItems);
WRITE_INT_FIELD(nParamExec);
WRITE_BOOL_FIELD(hasRowSecurity);
WRITE_BOOL_FIELD(parallelModeNeeded);
}
/*
......@@ -1787,6 +1788,8 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node)
WRITE_UINT_FIELD(lastRowMarkId);
WRITE_BOOL_FIELD(transientPlan);
WRITE_BOOL_FIELD(hasRowSecurity);
WRITE_BOOL_FIELD(parallelModeOK);
WRITE_BOOL_FIELD(parallelModeNeeded);
}
static void
......
......@@ -19,6 +19,7 @@
#include <math.h>
#include "access/htup_details.h"
#include "access/parallel.h"
#include "executor/executor.h"
#include "executor/nodeAgg.h"
#include "foreign/fdwapi.h"
......@@ -43,6 +44,7 @@
#include "parser/parsetree.h"
#include "parser/parse_agg.h"
#include "rewrite/rewriteManip.h"
#include "storage/dsm_impl.h"
#include "utils/rel.h"
#include "utils/selfuncs.h"
......@@ -197,6 +199,48 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
glob->transientPlan = false;
glob->hasRowSecurity = false;
/*
* Assess whether it's feasible to use parallel mode for this query.
* We can't do this in a standalone backend, or if the command will
* try to modify any data, or if this is a cursor operation, or if any
* parallel-unsafe functions are present in the query tree.
*
* For now, we don't try to use parallel mode if we're running inside
* a parallel worker. We might eventually be able to relax this
* restriction, but for now it seems best not to have parallel workers
* trying to create their own parallel workers.
*/
glob->parallelModeOK = (cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster && dynamic_shared_memory_type != DSM_IMPL_NONE &&
parse->commandType == CMD_SELECT && !parse->hasModifyingCTE &&
parse->utilityStmt == NULL && !IsParallelWorker() &&
!contain_parallel_unsafe((Node *) parse);
/*
* glob->parallelModeOK should tell us whether it's necessary to impose
* the parallel mode restrictions, but we don't actually want to impose
* them unless we choose a parallel plan, so that people who mislabel
* their functions but don't use parallelism anyway aren't harmed.
* However, it's useful for testing purposes to be able to force the
* restrictions to be imposed whenever a parallel plan is actually chosen
* or not.
*
* (It's been suggested that we should always impose these restrictions
* whenever glob->parallelModeOK is true, so that it's easier to notice
* incorrectly-labeled functions sooner. That might be the right thing
* to do, but for now I've taken this approach. We could also control
* this with a GUC.)
*
* FIXME: It's assumed that code further down will set parallelModeNeeded
* to true if a parallel path is actually chosen. Since the core
* parallelism code isn't committed yet, this currently never happens.
*/
#ifdef FORCE_PARALLEL_MODE
glob->parallelModeNeeded = glob->parallelModeOK;
#else
glob->parallelModeNeeded = false;
#endif
/* Determine what fraction of the plan is likely to be scanned */
if (cursorOptions & CURSOR_OPT_FAST_PLAN)
{
......@@ -293,6 +337,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
result->invalItems = glob->invalItems;
result->nParamExec = glob->nParamExec;
result->hasRowSecurity = glob->hasRowSecurity;
result->parallelModeNeeded = glob->parallelModeNeeded;
return result;
}
......
......@@ -96,6 +96,7 @@ static bool contain_subplans_walker(Node *node, void *context);
static bool contain_mutable_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_walker(Node *node, void *context);
static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context);
static bool contain_parallel_unsafe_walker(Node *node, void *context);
static bool contain_nonstrict_functions_walker(Node *node, void *context);
static bool contain_leaked_vars_walker(Node *node, void *context);
static Relids find_nonnullable_rels_walker(Node *node, bool top_level);
......@@ -1198,6 +1199,123 @@ contain_volatile_functions_not_nextval_walker(Node *node, void *context)
context);
}
/*****************************************************************************
* Check queries for parallel-unsafe constructs
*****************************************************************************/
bool
contain_parallel_unsafe(Node *node)
{
return contain_parallel_unsafe_walker(node, NULL);
}
static bool
contain_parallel_unsafe_walker(Node *node, void *context)
{
if (node == NULL)
return false;
if (IsA(node, FuncExpr))
{
FuncExpr *expr = (FuncExpr *) node;
if (func_parallel(expr->funcid) == PROPARALLEL_UNSAFE)
return true;
/* else fall through to check args */
}
else if (IsA(node, OpExpr))
{
OpExpr *expr = (OpExpr *) node;
set_opfuncid(expr);
if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE)
return true;
/* else fall through to check args */
}
else if (IsA(node, DistinctExpr))
{
DistinctExpr *expr = (DistinctExpr *) node;
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE)
return true;
/* else fall through to check args */
}
else if (IsA(node, NullIfExpr))
{
NullIfExpr *expr = (NullIfExpr *) node;
set_opfuncid((OpExpr *) expr); /* rely on struct equivalence */
if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE)
return true;
/* else fall through to check args */
}
else if (IsA(node, ScalarArrayOpExpr))
{
ScalarArrayOpExpr *expr = (ScalarArrayOpExpr *) node;
set_sa_opfuncid(expr);
if (func_parallel(expr->opfuncid) == PROPARALLEL_UNSAFE)
return true;
/* else fall through to check args */
}
else if (IsA(node, CoerceViaIO))
{
CoerceViaIO *expr = (CoerceViaIO *) node;
Oid iofunc;
Oid typioparam;
bool typisvarlena;
/* check the result type's input function */
getTypeInputInfo(expr->resulttype,
&iofunc, &typioparam);
if (func_parallel(iofunc) == PROPARALLEL_UNSAFE)
return true;
/* check the input type's output function */
getTypeOutputInfo(exprType((Node *) expr->arg),
&iofunc, &typisvarlena);
if (func_parallel(iofunc) == PROPARALLEL_UNSAFE)
return true;
/* else fall through to check args */
}
else if (IsA(node, ArrayCoerceExpr))
{
ArrayCoerceExpr *expr = (ArrayCoerceExpr *) node;
if (OidIsValid(expr->elemfuncid) &&
func_parallel(expr->elemfuncid) == PROPARALLEL_UNSAFE)
return true;
/* else fall through to check args */
}
else if (IsA(node, RowCompareExpr))
{
/* RowCompare probably can't have volatile ops, but check anyway */
RowCompareExpr *rcexpr = (RowCompareExpr *) node;
ListCell *opid;
foreach(opid, rcexpr->opnos)
{
if (op_volatile(lfirst_oid(opid)) == PROPARALLEL_UNSAFE)
return true;
}
/* else fall through to check args */
}
else if (IsA(node, Query))
{
Query *query = (Query *) node;
if (query->rowMarks != NULL)
return true;
/* Recurse into subselects */
return query_tree_walker(query,
contain_parallel_unsafe_walker,
context, 0);
}
return expression_tree_walker(node,
contain_parallel_unsafe_walker,
context);
}
/*****************************************************************************
* Check clauses for nonstrict functions
*****************************************************************************/
......
......@@ -613,8 +613,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
OBJECT_P OF OFF OFFSET OIDS ON ONLY OPERATOR OPTION OPTIONS OR
ORDER ORDINALITY OUT_P OUTER_P OVER OVERLAPS OVERLAY OWNED OWNER
PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POLICY POSITION
PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
PARALLEL PARSER PARTIAL PARTITION PASSING PASSWORD PLACING PLANS POLICY
POSITION PRECEDING PRECISION PRESERVE PREPARE PREPARED PRIMARY
PRIOR PRIVILEGES PROCEDURAL PROCEDURE PROGRAM
QUOTE
......@@ -7065,6 +7065,10 @@ common_func_opt_item:
/* we abuse the normal content of a DefElem here */
$$ = makeDefElem("set", (Node *)$1);
}
| PARALLEL ColId
{
$$ = makeDefElem("parallel", (Node *)makeString($2));
}
;
createfunc_opt_item:
......@@ -13778,6 +13782,7 @@ unreserved_keyword:
| OVER
| OWNED
| OWNER
| PARALLEL
| PARSER
| PARTIAL
| PARTITION
......
......@@ -1030,7 +1030,8 @@ exec_simple_query(const char *query_string)
querytree_list = pg_analyze_and_rewrite(parsetree, query_string,
NULL, 0);
plantree_list = pg_plan_queries(querytree_list, 0, NULL);
plantree_list = pg_plan_queries(querytree_list,
CURSOR_OPT_PARALLEL_OK, NULL);
/* Done with the snapshot used for parsing/planning */
if (snapshot_set)
......
......@@ -1539,6 +1539,25 @@ func_volatile(Oid funcid)
return result;
}
/*
* func_parallel
* Given procedure id, return the function's proparallel flag.
*/
char
func_parallel(Oid funcid)
{
HeapTuple tp;
char result;
tp = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for function %u", funcid);
result = ((Form_pg_proc) GETSTRUCT(tp))->proparallel;
ReleaseSysCache(tp);
return result;
}
/*
* get_func_leakproof
* Given procedure id, return the function's leakproof field.
......
......@@ -10204,6 +10204,7 @@ dumpFunc(Archive *fout, DumpOptions *dopt, FuncInfo *finfo)
char *proconfig;
char *procost;
char *prorows;
char *proparallel;
char *lanname;
char *rettypename;
int nallargs;
......@@ -10228,7 +10229,25 @@ dumpFunc(Archive *fout, DumpOptions *dopt, FuncInfo *finfo)
selectSourceSchema(fout, finfo->dobj.namespace->dobj.name);
/* Fetch function-specific details */
if (fout->remoteVersion >= 90500)
if (fout->remoteVersion >= 90600)
{
/*
* proparallel was added in 9.6
*/
appendPQExpBuffer(query,
"SELECT proretset, prosrc, probin, "
"pg_catalog.pg_get_function_arguments(oid) AS funcargs, "
"pg_catalog.pg_get_function_identity_arguments(oid) AS funciargs, "
"pg_catalog.pg_get_function_result(oid) AS funcresult, "
"array_to_string(protrftypes, ' ') AS protrftypes, "
"proiswindow, provolatile, proisstrict, prosecdef, "
"proleakproof, proconfig, procost, prorows, proparallel, "
"(SELECT lanname FROM pg_catalog.pg_language WHERE oid = prolang) AS lanname "
"FROM pg_catalog.pg_proc "
"WHERE oid = '%u'::pg_catalog.oid",
finfo->dobj.catId.oid);
}
else if (fout->remoteVersion >= 90500)
{
/*
* protrftypes was added in 9.5
......@@ -10410,6 +10429,12 @@ dumpFunc(Archive *fout, DumpOptions *dopt, FuncInfo *finfo)
proconfig = PQgetvalue(res, 0, PQfnumber(res, "proconfig"));
procost = PQgetvalue(res, 0, PQfnumber(res, "procost"));
prorows = PQgetvalue(res, 0, PQfnumber(res, "prorows"));
if (PQfnumber(res, "proparallel") != -1)
proparallel = PQgetvalue(res, 0, PQfnumber(res, "proparallel"));
else
proparallel = NULL;
lanname = PQgetvalue(res, 0, PQfnumber(res, "lanname"));
/*
......@@ -10608,6 +10633,17 @@ dumpFunc(Archive *fout, DumpOptions *dopt, FuncInfo *finfo)
strcmp(prorows, "0") != 0 && strcmp(prorows, "1000") != 0)
appendPQExpBuffer(q, " ROWS %s", prorows);
if (proparallel != NULL && proparallel[0] != PROPARALLEL_UNSAFE)
{
if (proparallel[0] == PROPARALLEL_SAFE)
appendPQExpBufferStr(q, " PARALLEL SAFE");
else if (proparallel[0] == PROPARALLEL_RESTRICTED)
appendPQExpBufferStr(q, " PARALLEL RESTRICTED");
else if (proparallel[0] != PROPARALLEL_UNSAFE)
exit_horribly(NULL, "unrecognized proparallel value for function \"%s\"\n",
finfo->dobj.name);
}
for (i = 0; i < nconfigitems; i++)
{
/* we feel free to scribble on configitems[] here */
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201508111
#define CATALOG_VERSION_NO 201509161
#endif
......@@ -144,7 +144,7 @@ DATA(insert OID = 1247 ( pg_type PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t
DESCR("");
DATA(insert OID = 1249 ( pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 21 0 f f f f f f t n 3 1 _null_ _null_ ));
DESCR("");
DATA(insert OID = 1255 ( pg_proc PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f f t n 3 1 _null_ _null_ ));
DATA(insert OID = 1255 ( pg_proc PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 29 0 t f f f f f t n 3 1 _null_ _null_ ));
DESCR("");
DATA(insert OID = 1259 ( pg_class PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f t n 3 1 _null_ _null_ ));
DESCR("");
......
This diff is collapsed.
......@@ -33,6 +33,7 @@ extern ObjectAddress ProcedureCreate(const char *procedureName,
bool isLeakProof,
bool isStrict,
char volatility,
char parallel,
oidvector *parameterTypes,
Datum allParameterTypes,
Datum parameterModes,
......
......@@ -2351,6 +2351,7 @@ typedef struct SecLabelStmt
#define CURSOR_OPT_FAST_PLAN 0x0020 /* prefer fast-start plan */
#define CURSOR_OPT_GENERIC_PLAN 0x0040 /* force use of generic plan */
#define CURSOR_OPT_CUSTOM_PLAN 0x0080 /* force use of custom plan */
#define CURSOR_OPT_PARALLEL_OK 0x0100 /* parallel mode OK */
typedef struct DeclareCursorStmt
{
......
......@@ -71,6 +71,8 @@ typedef struct PlannedStmt
int nParamExec; /* number of PARAM_EXEC Params used */
bool hasRowSecurity; /* row security applied? */
bool parallelModeNeeded; /* parallel mode required to execute? */
} PlannedStmt;
/* macro for fetching the Plan associated with a SubPlan node */
......
......@@ -102,6 +102,10 @@ typedef struct PlannerGlobal
bool transientPlan; /* redo plan when TransactionXmin changes? */
bool hasRowSecurity; /* row security applied? */
bool parallelModeOK; /* parallel mode potentially OK? */
bool parallelModeNeeded; /* parallel mode actually required? */
} PlannerGlobal;
/* macro for fetching the Plan associated with a SubPlan node */
......
......@@ -62,6 +62,7 @@ extern bool contain_subplans(Node *clause);
extern bool contain_mutable_functions(Node *clause);
extern bool contain_volatile_functions(Node *clause);
extern bool contain_volatile_functions_not_nextval(Node *clause);
extern bool contain_parallel_unsafe(Node *node);
extern bool contain_nonstrict_functions(Node *clause);
extern bool contain_leaked_vars(Node *clause);
......
......@@ -281,6 +281,7 @@ PG_KEYWORD("overlaps", OVERLAPS, TYPE_FUNC_NAME_KEYWORD)
PG_KEYWORD("overlay", OVERLAY, COL_NAME_KEYWORD)
PG_KEYWORD("owned", OWNED, UNRESERVED_KEYWORD)
PG_KEYWORD("owner", OWNER, UNRESERVED_KEYWORD)
PG_KEYWORD("parallel", PARALLEL, UNRESERVED_KEYWORD)
PG_KEYWORD("parser", PARSER, UNRESERVED_KEYWORD)
PG_KEYWORD("partial", PARTIAL, UNRESERVED_KEYWORD)
PG_KEYWORD("partition", PARTITION, UNRESERVED_KEYWORD)
......
......@@ -93,6 +93,7 @@ extern Oid get_func_variadictype(Oid funcid);
extern bool get_func_retset(Oid funcid);
extern bool func_strict(Oid funcid);
extern char func_volatile(Oid funcid);
extern char func_parallel(Oid funcid);
extern bool get_func_leakproof(Oid funcid);
extern float4 get_func_cost(Oid funcid);
extern float4 get_func_rows(Oid funcid);
......
......@@ -230,7 +230,8 @@ static Datum exec_eval_expr(PLpgSQL_execstate *estate,
Oid *rettype,
int32 *rettypmod);
static int exec_run_select(PLpgSQL_execstate *estate,
PLpgSQL_expr *expr, long maxtuples, Portal *portalP);
PLpgSQL_expr *expr, long maxtuples, Portal *portalP,
bool parallelOK);
static int exec_for_query(PLpgSQL_execstate *estate, PLpgSQL_stmt_forq *stmt,
Portal portal, bool prefetch_ok);
static ParamListInfo setup_param_list(PLpgSQL_execstate *estate,
......@@ -1563,7 +1564,7 @@ exec_stmt_perform(PLpgSQL_execstate *estate, PLpgSQL_stmt_perform *stmt)
{
PLpgSQL_expr *expr = stmt->expr;
(void) exec_run_select(estate, expr, 0, NULL);
(void) exec_run_select(estate, expr, 0, NULL, true);
exec_set_found(estate, (estate->eval_processed != 0));
exec_eval_cleanup(estate);
......@@ -2107,7 +2108,7 @@ exec_stmt_fors(PLpgSQL_execstate *estate, PLpgSQL_stmt_fors *stmt)
/*
* Open the implicit cursor for the statement using exec_run_select
*/
exec_run_select(estate, stmt->query, 0, &portal);
exec_run_select(estate, stmt->query, 0, &portal, false);
/*
* Execute the loop
......@@ -2869,14 +2870,15 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
if (stmt->query != NULL)
{
/* static query */
exec_run_select(estate, stmt->query, 0, &portal);
exec_run_select(estate, stmt->query, 0, &portal, true);
}
else
{
/* RETURN QUERY EXECUTE */
Assert(stmt->dynquery != NULL);
portal = exec_dynquery_with_params(estate, stmt->dynquery,
stmt->params, NULL, 0);
stmt->params, NULL,
CURSOR_OPT_PARALLEL_OK);
}
tupmap = convert_tuples_by_position(portal->tupDesc,
......@@ -5019,7 +5021,7 @@ exec_eval_expr(PLpgSQL_execstate *estate,
/*
* Else do it the hard way via exec_run_select
*/
rc = exec_run_select(estate, expr, 2, NULL);
rc = exec_run_select(estate, expr, 2, NULL, false);
if (rc != SPI_OK_SELECT)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
......@@ -5075,7 +5077,8 @@ exec_eval_expr(PLpgSQL_execstate *estate,
*/
static int
exec_run_select(PLpgSQL_execstate *estate,
PLpgSQL_expr *expr, long maxtuples, Portal *portalP)
PLpgSQL_expr *expr, long maxtuples, Portal *portalP,
bool parallelOK)
{
ParamListInfo paramLI;
int rc;
......@@ -5084,7 +5087,8 @@ exec_run_select(PLpgSQL_execstate *estate,
* On the first call for this expression generate the plan
*/
if (expr->plan == NULL)
exec_prepare_plan(estate, expr, 0);
exec_prepare_plan(estate, expr, parallelOK ?
CURSOR_OPT_PARALLEL_OK : 0);
/*
* If a portal was requested, put the query into the portal
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment