Commit 05c8482f authored by Amit Kapila's avatar Amit Kapila

Enable parallel SELECT for "INSERT INTO ... SELECT ...".

Parallel SELECT can't be utilized for INSERT in the following cases:
- INSERT statement uses the ON CONFLICT DO UPDATE clause
- Target table has a parallel-unsafe: trigger, index expression or
  predicate, column default expression or check constraint
- Target table has a parallel-unsafe domain constraint on any column
- Target table is a partitioned table with a parallel-unsafe partition key
  expression or support function

The planner is updated to perform additional parallel-safety checks for
the cases listed above, for determining whether it is safe to run INSERT
in parallel-mode with an underlying parallel SELECT. The planner will
consider using parallel SELECT for "INSERT INTO ... SELECT ...", provided
nothing unsafe is found from the additional parallel-safety checks, or
from the existing parallel-safety checks for SELECT.

While checking parallel-safety, we need to check it for all the partitions
on the table which can be costly especially when we decide not to use a
parallel plan. So, in a separate patch, we will introduce a GUC and or a
reloption to enable/disable parallelism for Insert statements.

Prior to entering parallel-mode for the execution of INSERT with parallel
SELECT, a TransactionId is acquired and assigned to the current
transaction state. This is necessary to prevent the INSERT from attempting
to assign the TransactionId whilst in parallel-mode, which is not allowed.
This approach has a disadvantage in that if the underlying SELECT does not
return any rows, then the TransactionId is not used, however that
shouldn't happen in practice in many cases.

Author: Greg Nancarrow, Amit Langote, Amit Kapila
Reviewed-by: Amit Langote, Hou Zhijie, Takayuki Tsunakawa, Antonin Houska, Bharath Rupireddy, Dilip Kumar, Vignesh C, Zhihong Yu, Amit Kapila
Tested-by: Tang, Haiying
Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com
Discussion: https://postgr.es/m/CAJcOf-fAdj=nDKMsRhQzndm-O13NY4dL6xGcEvdX5Xvbbi0V7g@mail.gmail.com
parent 0ba71107
...@@ -146,7 +146,9 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; ...@@ -146,7 +146,9 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
a CTE, no parallel plans for that query will be generated. As an a CTE, no parallel plans for that query will be generated. As an
exception, the commands <literal>CREATE TABLE ... AS</literal>, <literal>SELECT exception, the commands <literal>CREATE TABLE ... AS</literal>, <literal>SELECT
INTO</literal>, and <literal>CREATE MATERIALIZED VIEW</literal> which create a new INTO</literal>, and <literal>CREATE MATERIALIZED VIEW</literal> which create a new
table and populate it can use a parallel plan. table and populate it can use a parallel plan. Another exception is the command
<literal>INSERT INTO ... SELECT ...</literal> which can use a parallel plan for
the underlying <literal>SELECT</literal> part of the query.
</para> </para>
</listitem> </listitem>
......
...@@ -1014,6 +1014,32 @@ IsInParallelMode(void) ...@@ -1014,6 +1014,32 @@ IsInParallelMode(void)
return CurrentTransactionState->parallelModeLevel != 0; return CurrentTransactionState->parallelModeLevel != 0;
} }
/*
* PrepareParallelModePlanExec
*
* Prepare for entering parallel mode plan execution, based on command-type.
*/
void
PrepareParallelModePlanExec(CmdType commandType)
{
if (IsModifySupportedInParallelMode(commandType))
{
Assert(!IsInParallelMode());
/*
* Prepare for entering parallel mode by assigning a TransactionId.
* Failure to do this now would result in heap_insert() subsequently
* attempting to assign a TransactionId whilst in parallel-mode, which
* is not allowed.
*
* This approach has a disadvantage in that if the underlying SELECT
* does not return any rows, then the TransactionId is not used,
* however that shouldn't happen in practice in many cases.
*/
(void) GetCurrentTransactionId();
}
}
/* /*
* CommandCounterIncrement * CommandCounterIncrement
*/ */
......
...@@ -1512,7 +1512,10 @@ ExecutePlan(EState *estate, ...@@ -1512,7 +1512,10 @@ ExecutePlan(EState *estate,
estate->es_use_parallel_mode = use_parallel_mode; estate->es_use_parallel_mode = use_parallel_mode;
if (use_parallel_mode) if (use_parallel_mode)
{
PrepareParallelModePlanExec(estate->es_plannedstmt->commandType);
EnterParallelMode(); EnterParallelMode();
}
/* /*
* Loop until we've processed the proper number of tuples from the plan. * Loop until we've processed the proper number of tuples from the plan.
......
...@@ -96,6 +96,7 @@ _copyPlannedStmt(const PlannedStmt *from) ...@@ -96,6 +96,7 @@ _copyPlannedStmt(const PlannedStmt *from)
COPY_BITMAPSET_FIELD(rewindPlanIDs); COPY_BITMAPSET_FIELD(rewindPlanIDs);
COPY_NODE_FIELD(rowMarks); COPY_NODE_FIELD(rowMarks);
COPY_NODE_FIELD(relationOids); COPY_NODE_FIELD(relationOids);
COPY_NODE_FIELD(partitionOids);
COPY_NODE_FIELD(invalItems); COPY_NODE_FIELD(invalItems);
COPY_NODE_FIELD(paramExecTypes); COPY_NODE_FIELD(paramExecTypes);
COPY_NODE_FIELD(utilityStmt); COPY_NODE_FIELD(utilityStmt);
......
...@@ -314,6 +314,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node) ...@@ -314,6 +314,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node)
WRITE_BITMAPSET_FIELD(rewindPlanIDs); WRITE_BITMAPSET_FIELD(rewindPlanIDs);
WRITE_NODE_FIELD(rowMarks); WRITE_NODE_FIELD(rowMarks);
WRITE_NODE_FIELD(relationOids); WRITE_NODE_FIELD(relationOids);
WRITE_NODE_FIELD(partitionOids);
WRITE_NODE_FIELD(invalItems); WRITE_NODE_FIELD(invalItems);
WRITE_NODE_FIELD(paramExecTypes); WRITE_NODE_FIELD(paramExecTypes);
WRITE_NODE_FIELD(utilityStmt); WRITE_NODE_FIELD(utilityStmt);
...@@ -2221,6 +2222,7 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node) ...@@ -2221,6 +2222,7 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node)
WRITE_NODE_FIELD(resultRelations); WRITE_NODE_FIELD(resultRelations);
WRITE_NODE_FIELD(appendRelations); WRITE_NODE_FIELD(appendRelations);
WRITE_NODE_FIELD(relationOids); WRITE_NODE_FIELD(relationOids);
WRITE_NODE_FIELD(partitionOids);
WRITE_NODE_FIELD(invalItems); WRITE_NODE_FIELD(invalItems);
WRITE_NODE_FIELD(paramExecTypes); WRITE_NODE_FIELD(paramExecTypes);
WRITE_UINT_FIELD(lastPHId); WRITE_UINT_FIELD(lastPHId);
......
...@@ -1590,6 +1590,7 @@ _readPlannedStmt(void) ...@@ -1590,6 +1590,7 @@ _readPlannedStmt(void)
READ_BITMAPSET_FIELD(rewindPlanIDs); READ_BITMAPSET_FIELD(rewindPlanIDs);
READ_NODE_FIELD(rowMarks); READ_NODE_FIELD(rowMarks);
READ_NODE_FIELD(relationOids); READ_NODE_FIELD(relationOids);
READ_NODE_FIELD(partitionOids);
READ_NODE_FIELD(invalItems); READ_NODE_FIELD(invalItems);
READ_NODE_FIELD(paramExecTypes); READ_NODE_FIELD(paramExecTypes);
READ_NODE_FIELD(utilityStmt); READ_NODE_FIELD(utilityStmt);
......
...@@ -305,6 +305,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, ...@@ -305,6 +305,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
glob->resultRelations = NIL; glob->resultRelations = NIL;
glob->appendRelations = NIL; glob->appendRelations = NIL;
glob->relationOids = NIL; glob->relationOids = NIL;
glob->partitionOids = NIL;
glob->invalItems = NIL; glob->invalItems = NIL;
glob->paramExecTypes = NIL; glob->paramExecTypes = NIL;
glob->lastPHId = 0; glob->lastPHId = 0;
...@@ -316,16 +317,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, ...@@ -316,16 +317,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
/* /*
* Assess whether it's feasible to use parallel mode for this query. We * Assess whether it's feasible to use parallel mode for this query. We
* can't do this in a standalone backend, or if the command will try to * can't do this in a standalone backend, or if the command will try to
* modify any data, or if this is a cursor operation, or if GUCs are set * modify any data (except for Insert), or if this is a cursor operation,
* to values that don't permit parallelism, or if parallel-unsafe * or if GUCs are set to values that don't permit parallelism, or if
* functions are present in the query tree. * parallel-unsafe functions are present in the query tree.
* *
* (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT
* MATERIALIZED VIEW to use parallel plans, but as of now, only the leader * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as
* backend writes into a completely new table. In the future, we can * of now, only the leader backend writes into a completely new table. In
* extend it to allow workers to write into the table. However, to allow * the future, we can extend it to allow workers to write into the table.
* parallel updates and deletes, we have to solve other problems, * However, to allow parallel updates and deletes, we have to solve other
* especially around combo CIDs.) * problems, especially around combo CIDs.)
* *
* For now, we don't try to use parallel mode if we're running inside a * For now, we don't try to use parallel mode if we're running inside a
* parallel worker. We might eventually be able to relax this * parallel worker. We might eventually be able to relax this
...@@ -334,13 +335,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, ...@@ -334,13 +335,14 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
*/ */
if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
IsUnderPostmaster && IsUnderPostmaster &&
parse->commandType == CMD_SELECT && (parse->commandType == CMD_SELECT ||
is_parallel_allowed_for_modify(parse)) &&
!parse->hasModifyingCTE && !parse->hasModifyingCTE &&
max_parallel_workers_per_gather > 0 && max_parallel_workers_per_gather > 0 &&
!IsParallelWorker()) !IsParallelWorker())
{ {
/* all the cheap tests pass, so scan the query tree */ /* all the cheap tests pass, so scan the query tree */
glob->maxParallelHazard = max_parallel_hazard(parse); glob->maxParallelHazard = max_parallel_hazard(parse, glob);
glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE); glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
} }
else else
...@@ -521,6 +523,19 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, ...@@ -521,6 +523,19 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
result->rewindPlanIDs = glob->rewindPlanIDs; result->rewindPlanIDs = glob->rewindPlanIDs;
result->rowMarks = glob->finalrowmarks; result->rowMarks = glob->finalrowmarks;
result->relationOids = glob->relationOids; result->relationOids = glob->relationOids;
/*
* Register the Oids of parallel-safety-checked partitions as plan
* dependencies. This is only really needed in the case of a parallel plan
* so that if parallel-unsafe properties are subsequently defined on the
* partitions, the cached parallel plan will be invalidated, and a
* non-parallel plan will be generated.
*
* We also use this list to acquire locks on partitions before executing
* cached plan. See AcquireExecutorLocks().
*/
if (glob->partitionOids != NIL && glob->parallelModeNeeded)
result->partitionOids = glob->partitionOids;
result->invalItems = glob->invalItems; result->invalItems = glob->invalItems;
result->paramExecTypes = glob->paramExecTypes; result->paramExecTypes = glob->paramExecTypes;
/* utilityStmt should be null, but we might as well copy it */ /* utilityStmt should be null, but we might as well copy it */
......
This diff is collapsed.
...@@ -1735,6 +1735,23 @@ QueryListGetPrimaryStmt(List *stmts) ...@@ -1735,6 +1735,23 @@ QueryListGetPrimaryStmt(List *stmts)
return NULL; return NULL;
} }
static void
AcquireExecutorLocksOnPartitions(List *partitionOids, int lockmode,
bool acquire)
{
ListCell *lc;
foreach(lc, partitionOids)
{
Oid partOid = lfirst_oid(lc);
if (acquire)
LockRelationOid(partOid, lockmode);
else
UnlockRelationOid(partOid, lockmode);
}
}
/* /*
* AcquireExecutorLocks: acquire locks needed for execution of a cached plan; * AcquireExecutorLocks: acquire locks needed for execution of a cached plan;
* or release them if acquire is false. * or release them if acquire is false.
...@@ -1748,6 +1765,8 @@ AcquireExecutorLocks(List *stmt_list, bool acquire) ...@@ -1748,6 +1765,8 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
{ {
PlannedStmt *plannedstmt = lfirst_node(PlannedStmt, lc1); PlannedStmt *plannedstmt = lfirst_node(PlannedStmt, lc1);
ListCell *lc2; ListCell *lc2;
Index rti,
resultRelation = 0;
if (plannedstmt->commandType == CMD_UTILITY) if (plannedstmt->commandType == CMD_UTILITY)
{ {
...@@ -1765,6 +1784,9 @@ AcquireExecutorLocks(List *stmt_list, bool acquire) ...@@ -1765,6 +1784,9 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
continue; continue;
} }
rti = 1;
if (plannedstmt->resultRelations)
resultRelation = linitial_int(plannedstmt->resultRelations);
foreach(lc2, plannedstmt->rtable) foreach(lc2, plannedstmt->rtable)
{ {
RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc2); RangeTblEntry *rte = (RangeTblEntry *) lfirst(lc2);
...@@ -1782,6 +1804,14 @@ AcquireExecutorLocks(List *stmt_list, bool acquire) ...@@ -1782,6 +1804,14 @@ AcquireExecutorLocks(List *stmt_list, bool acquire)
LockRelationOid(rte->relid, rte->rellockmode); LockRelationOid(rte->relid, rte->rellockmode);
else else
UnlockRelationOid(rte->relid, rte->rellockmode); UnlockRelationOid(rte->relid, rte->rellockmode);
/* Lock partitions ahead of modifying them in parallel mode. */
if (rti == resultRelation &&
plannedstmt->partitionOids != NIL)
AcquireExecutorLocksOnPartitions(plannedstmt->partitionOids,
rte->rellockmode, acquire);
rti++;
} }
} }
} }
...@@ -1990,7 +2020,8 @@ PlanCacheRelCallback(Datum arg, Oid relid) ...@@ -1990,7 +2020,8 @@ PlanCacheRelCallback(Datum arg, Oid relid)
if (plannedstmt->commandType == CMD_UTILITY) if (plannedstmt->commandType == CMD_UTILITY)
continue; /* Ignore utility statements */ continue; /* Ignore utility statements */
if ((relid == InvalidOid) ? plannedstmt->relationOids != NIL : if ((relid == InvalidOid) ? plannedstmt->relationOids != NIL :
list_member_oid(plannedstmt->relationOids, relid)) (list_member_oid(plannedstmt->relationOids, relid) ||
list_member_oid(plannedstmt->partitionOids, relid)))
{ {
/* Invalidate the generic plan only */ /* Invalidate the generic plan only */
plansource->gplan->is_valid = false; plansource->gplan->is_valid = false;
......
...@@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse ...@@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
extern void EnterParallelMode(void); extern void EnterParallelMode(void);
extern void ExitParallelMode(void); extern void ExitParallelMode(void);
extern bool IsInParallelMode(void); extern bool IsInParallelMode(void);
extern void PrepareParallelModePlanExec(CmdType commandType);
/*
* IsModifySupportedInParallelMode
*
* Indicates whether execution of the specified table-modification command
* (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
* parallel-safety conditions.
*/
static inline bool
IsModifySupportedInParallelMode(CmdType commandType)
{
/* Currently only INSERT is supported */
return (commandType == CMD_INSERT);
}
#endif /* XACT_H */ #endif /* XACT_H */
...@@ -120,6 +120,8 @@ typedef struct PlannerGlobal ...@@ -120,6 +120,8 @@ typedef struct PlannerGlobal
List *relationOids; /* OIDs of relations the plan depends on */ List *relationOids; /* OIDs of relations the plan depends on */
List *partitionOids; /* OIDs of partitions the plan depends on */
List *invalItems; /* other dependencies, as PlanInvalItems */ List *invalItems; /* other dependencies, as PlanInvalItems */
List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */ List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */
......
...@@ -79,6 +79,8 @@ typedef struct PlannedStmt ...@@ -79,6 +79,8 @@ typedef struct PlannedStmt
List *relationOids; /* OIDs of relations the plan depends on */ List *relationOids; /* OIDs of relations the plan depends on */
List *partitionOids; /* OIDs of partitions the plan depends on */
List *invalItems; /* other dependencies, as PlanInvalItems */ List *invalItems; /* other dependencies, as PlanInvalItems */
List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */ List *paramExecTypes; /* type OIDs for PARAM_EXEC Params */
......
...@@ -32,7 +32,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause); ...@@ -32,7 +32,7 @@ extern double expression_returns_set_rows(PlannerInfo *root, Node *clause);
extern bool contain_subplans(Node *clause); extern bool contain_subplans(Node *clause);
extern char max_parallel_hazard(Query *parse); extern char max_parallel_hazard(Query *parse, PlannerGlobal *glob);
extern bool is_parallel_safe(PlannerInfo *root, Node *node); extern bool is_parallel_safe(PlannerInfo *root, Node *node);
extern bool contain_nonstrict_functions(Node *clause); extern bool contain_nonstrict_functions(Node *clause);
extern bool contain_exec_param(Node *clause, List *param_ids); extern bool contain_exec_param(Node *clause, List *param_ids);
...@@ -52,5 +52,6 @@ extern void CommuteOpExpr(OpExpr *clause); ...@@ -52,5 +52,6 @@ extern void CommuteOpExpr(OpExpr *clause);
extern Query *inline_set_returning_function(PlannerInfo *root, extern Query *inline_set_returning_function(PlannerInfo *root,
RangeTblEntry *rte); RangeTblEntry *rte);
extern bool is_parallel_allowed_for_modify(Query *parse);
#endif /* CLAUSES_H */ #endif /* CLAUSES_H */
This diff is collapsed.
...@@ -90,6 +90,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8 ...@@ -90,6 +90,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8
# run by itself so it can run parallel workers # run by itself so it can run parallel workers
test: select_parallel test: select_parallel
test: write_parallel test: write_parallel
test: insert_parallel
# no relation related tests can be put in this group # no relation related tests can be put in this group
test: publication subscription test: publication subscription
......
...@@ -148,6 +148,7 @@ test: stats_ext ...@@ -148,6 +148,7 @@ test: stats_ext
test: collate.linux.utf8 test: collate.linux.utf8
test: select_parallel test: select_parallel
test: write_parallel test: write_parallel
test: insert_parallel
test: publication test: publication
test: subscription test: subscription
test: select_views test: select_views
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment