Commit bfc78d71 authored by Robert Haas's avatar Robert Haas

Rewrite interaction of parallel mode with parallel executor support.

In the previous coding, before returning from ExecutorRun, we'd shut
down all parallel workers.  This was dead wrong if ExecutorRun was
called with a non-zero tuple count; it had the effect of truncating
the query output.  To fix, give ExecutePlan control over whether to
enter parallel mode, and have it refuse to do so if the tuple count
is non-zero.  Rewrite the Gather logic so that it can cope with being
called outside parallel mode.

Commit 7aea8e4f is largely to blame
for this problem, though this patch modifies some subsequently-committed
code which relied on the guarantees it purported to make.
parent 816e336f
...@@ -76,6 +76,7 @@ static void CheckValidRowMarkRel(Relation rel, RowMarkType markType); ...@@ -76,6 +76,7 @@ static void CheckValidRowMarkRel(Relation rel, RowMarkType markType);
static void ExecPostprocessPlan(EState *estate); static void ExecPostprocessPlan(EState *estate);
static void ExecEndPlan(PlanState *planstate, EState *estate); static void ExecEndPlan(PlanState *planstate, EState *estate);
static void ExecutePlan(EState *estate, PlanState *planstate, static void ExecutePlan(EState *estate, PlanState *planstate,
bool use_parallel_mode,
CmdType operation, CmdType operation,
bool sendTuples, bool sendTuples,
long numberTuples, long numberTuples,
...@@ -243,11 +244,6 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) ...@@ -243,11 +244,6 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
if (!(eflags & (EXEC_FLAG_SKIP_TRIGGERS | EXEC_FLAG_EXPLAIN_ONLY))) if (!(eflags & (EXEC_FLAG_SKIP_TRIGGERS | EXEC_FLAG_EXPLAIN_ONLY)))
AfterTriggerBeginQuery(); AfterTriggerBeginQuery();
/* Enter parallel mode, if required by the query. */
if (queryDesc->plannedstmt->parallelModeNeeded &&
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
EnterParallelMode();
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
} }
...@@ -341,15 +337,13 @@ standard_ExecutorRun(QueryDesc *queryDesc, ...@@ -341,15 +337,13 @@ standard_ExecutorRun(QueryDesc *queryDesc,
if (!ScanDirectionIsNoMovement(direction)) if (!ScanDirectionIsNoMovement(direction))
ExecutePlan(estate, ExecutePlan(estate,
queryDesc->planstate, queryDesc->planstate,
queryDesc->plannedstmt->parallelModeNeeded,
operation, operation,
sendTuples, sendTuples,
count, count,
direction, direction,
dest); dest);
/* Allow nodes to release or shut down resources. */
(void) ExecShutdownNode(queryDesc->planstate);
/* /*
* shutdown tuple receiver, if we started it * shutdown tuple receiver, if we started it
*/ */
...@@ -482,11 +476,6 @@ standard_ExecutorEnd(QueryDesc *queryDesc) ...@@ -482,11 +476,6 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
*/ */
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
/* Exit parallel mode, if it was required by the query. */
if (queryDesc->plannedstmt->parallelModeNeeded &&
!(estate->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY))
ExitParallelMode();
/* /*
* Release EState and per-query memory context. This should release * Release EState and per-query memory context. This should release
* everything the executor has allocated. * everything the executor has allocated.
...@@ -1529,6 +1518,7 @@ ExecEndPlan(PlanState *planstate, EState *estate) ...@@ -1529,6 +1518,7 @@ ExecEndPlan(PlanState *planstate, EState *estate)
static void static void
ExecutePlan(EState *estate, ExecutePlan(EState *estate,
PlanState *planstate, PlanState *planstate,
bool use_parallel_mode,
CmdType operation, CmdType operation,
bool sendTuples, bool sendTuples,
long numberTuples, long numberTuples,
...@@ -1548,6 +1538,20 @@ ExecutePlan(EState *estate, ...@@ -1548,6 +1538,20 @@ ExecutePlan(EState *estate,
*/ */
estate->es_direction = direction; estate->es_direction = direction;
/*
* If a tuple count was supplied, we must force the plan to run without
* parallelism, because we might exit early.
*/
if (numberTuples != 0)
use_parallel_mode = false;
/*
* If a tuple count was supplied, we must force the plan to run without
* parallelism, because we might exit early.
*/
if (use_parallel_mode)
EnterParallelMode();
/* /*
* Loop until we've processed the proper number of tuples from the plan. * Loop until we've processed the proper number of tuples from the plan.
*/ */
...@@ -1566,7 +1570,11 @@ ExecutePlan(EState *estate, ...@@ -1566,7 +1570,11 @@ ExecutePlan(EState *estate,
* process so we just end the loop... * process so we just end the loop...
*/ */
if (TupIsNull(slot)) if (TupIsNull(slot))
{
/* Allow nodes to release or shut down resources. */
(void) ExecShutdownNode(planstate);
break; break;
}
/* /*
* If we have a junk filter, then project a new tuple with the junk * If we have a junk filter, then project a new tuple with the junk
...@@ -1603,6 +1611,9 @@ ExecutePlan(EState *estate, ...@@ -1603,6 +1611,9 @@ ExecutePlan(EState *estate,
if (numberTuples && numberTuples == current_tuple_count) if (numberTuples && numberTuples == current_tuple_count)
break; break;
} }
if (use_parallel_mode)
ExitParallelMode();
} }
......
...@@ -442,6 +442,23 @@ ExecParallelFinish(ParallelExecutorInfo *pei) ...@@ -442,6 +442,23 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
pei->instrumentation); pei->instrumentation);
} }
/*
* Clean up whatever ParallelExecutreInfo resources still exist after
* ExecParallelFinish. We separate these routines because someone might
* want to examine the contents of the DSM after ExecParallelFinish and
* before calling this routine.
*/
void
ExecParallelCleanup(ParallelExecutorInfo *pei)
{
if (pei->pcxt != NULL)
{
DestroyParallelContext(pei->pcxt);
pei->pcxt = NULL;
}
pfree(pei);
}
/* /*
* Create a DestReceiver to write tuples we produce to the shm_mq designated * Create a DestReceiver to write tuples we produce to the shm_mq designated
* for that purpose. * for that purpose.
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "postgres.h" #include "postgres.h"
#include "access/relscan.h" #include "access/relscan.h"
#include "access/xact.h"
#include "executor/execdebug.h" #include "executor/execdebug.h"
#include "executor/execParallel.h" #include "executor/execParallel.h"
#include "executor/nodeGather.h" #include "executor/nodeGather.h"
...@@ -45,7 +46,6 @@ ExecInitGather(Gather *node, EState *estate, int eflags) ...@@ -45,7 +46,6 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate = makeNode(GatherState); gatherstate = makeNode(GatherState);
gatherstate->ps.plan = (Plan *) node; gatherstate->ps.plan = (Plan *) node;
gatherstate->ps.state = estate; gatherstate->ps.state = estate;
gatherstate->need_to_scan_workers = false;
gatherstate->need_to_scan_locally = !node->single_copy; gatherstate->need_to_scan_locally = !node->single_copy;
/* /*
...@@ -106,52 +106,57 @@ ExecGather(GatherState *node) ...@@ -106,52 +106,57 @@ ExecGather(GatherState *node)
* needs to allocate large dynamic segement, so it is better to do if it * needs to allocate large dynamic segement, so it is better to do if it
* is really needed. * is really needed.
*/ */
if (!node->pei) if (!node->initialized)
{ {
EState *estate = node->ps.state; EState *estate = node->ps.state;
Gather *gather = (Gather *) node->ps.plan;
/* Initialize the workers required to execute Gather node. */
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
((Gather *) (node->ps.plan))->num_workers);
/* /*
* Register backend workers. If the required number of workers are not * Sometimes we might have to run without parallelism; but if
* available then we perform the scan with available workers and if * parallel mode is active then we can try to fire up some workers.
* there are no more workers available, then the Gather node will just
* scan locally.
*/ */
LaunchParallelWorkers(node->pei->pcxt); if (gather->num_workers > 0 && IsInParallelMode())
node->funnel = CreateTupleQueueFunnel();
for (i = 0; i < node->pei->pcxt->nworkers; ++i)
{ {
if (node->pei->pcxt->worker[i].bgwhandle) bool got_any_worker = false;
/* Initialize the workers required to execute Gather node. */
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
gather->num_workers);
/*
* Register backend workers. We might not get as many as we
* requested, or indeed any at all.
*/
LaunchParallelWorkers(node->pei->pcxt);
/* Set up a tuple queue to collect the results. */
node->funnel = CreateTupleQueueFunnel();
for (i = 0; i < node->pei->pcxt->nworkers; ++i)
{ {
shm_mq_set_handle(node->pei->tqueue[i], if (node->pei->pcxt->worker[i].bgwhandle)
node->pei->pcxt->worker[i].bgwhandle); {
RegisterTupleQueueOnFunnel(node->funnel, node->pei->tqueue[i]); shm_mq_set_handle(node->pei->tqueue[i],
node->need_to_scan_workers = true; node->pei->pcxt->worker[i].bgwhandle);
RegisterTupleQueueOnFunnel(node->funnel,
node->pei->tqueue[i]);
got_any_worker = true;
}
} }
/* No workers? Then never mind. */
if (!got_any_worker)
ExecShutdownGather(node);
} }
/* If no workers are available, we must always scan locally. */ /* Run plan locally if no workers or not single-copy. */
if (!node->need_to_scan_workers) node->need_to_scan_locally = (node->funnel == NULL)
node->need_to_scan_locally = true; || !gather->single_copy;
node->initialized = true;
} }
slot = gather_getnext(node); slot = gather_getnext(node);
if (TupIsNull(slot))
{
/*
* Destroy the parallel context once we complete fetching all the
* tuples. Otherwise, the DSM and workers will stick around for the
* lifetime of the entire statement.
*/
ExecShutdownGather(node);
}
return slot; return slot;
} }
...@@ -194,10 +199,9 @@ gather_getnext(GatherState *gatherstate) ...@@ -194,10 +199,9 @@ gather_getnext(GatherState *gatherstate)
*/ */
slot = gatherstate->ps.ps_ProjInfo->pi_slot; slot = gatherstate->ps.ps_ProjInfo->pi_slot;
while (gatherstate->need_to_scan_workers || while (gatherstate->funnel != NULL || gatherstate->need_to_scan_locally)
gatherstate->need_to_scan_locally)
{ {
if (gatherstate->need_to_scan_workers) if (gatherstate->funnel != NULL)
{ {
bool done = false; bool done = false;
...@@ -206,7 +210,7 @@ gather_getnext(GatherState *gatherstate) ...@@ -206,7 +210,7 @@ gather_getnext(GatherState *gatherstate)
gatherstate->need_to_scan_locally, gatherstate->need_to_scan_locally,
&done); &done);
if (done) if (done)
gatherstate->need_to_scan_workers = false; ExecShutdownGather(gatherstate);
if (HeapTupleIsValid(tup)) if (HeapTupleIsValid(tup))
{ {
...@@ -247,30 +251,20 @@ gather_getnext(GatherState *gatherstate) ...@@ -247,30 +251,20 @@ gather_getnext(GatherState *gatherstate)
void void
ExecShutdownGather(GatherState *node) ExecShutdownGather(GatherState *node)
{ {
Gather *gather; /* Shut down tuple queue funnel before shutting down workers. */
if (node->funnel != NULL)
if (node->pei == NULL || node->pei->pcxt == NULL)
return;
/*
* Ensure all workers have finished before destroying the parallel context
* to ensure a clean exit.
*/
if (node->funnel)
{ {
DestroyTupleQueueFunnel(node->funnel); DestroyTupleQueueFunnel(node->funnel);
node->funnel = NULL; node->funnel = NULL;
} }
ExecParallelFinish(node->pei); /* Now shut down the workers. */
if (node->pei != NULL)
/* destroy parallel context. */ {
DestroyParallelContext(node->pei->pcxt); ExecParallelFinish(node->pei);
node->pei->pcxt = NULL; ExecParallelCleanup(node->pei);
node->pei = NULL;
gather = (Gather *) node->ps.plan; }
node->need_to_scan_locally = !gather->single_copy;
node->need_to_scan_workers = false;
} }
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
...@@ -295,5 +289,7 @@ ExecReScanGather(GatherState *node) ...@@ -295,5 +289,7 @@ ExecReScanGather(GatherState *node)
*/ */
ExecShutdownGather(node); ExecShutdownGather(node);
node->initialized = false;
ExecReScan(node->ps.lefttree); ExecReScan(node->ps.lefttree);
} }
...@@ -32,5 +32,6 @@ typedef struct ParallelExecutorInfo ...@@ -32,5 +32,6 @@ typedef struct ParallelExecutorInfo
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers); EState *estate, int nworkers);
extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
#endif /* EXECPARALLEL_H */ #endif /* EXECPARALLEL_H */
...@@ -1961,9 +1961,9 @@ typedef struct UniqueState ...@@ -1961,9 +1961,9 @@ typedef struct UniqueState
typedef struct GatherState typedef struct GatherState
{ {
PlanState ps; /* its first field is NodeTag */ PlanState ps; /* its first field is NodeTag */
bool initialized;
struct ParallelExecutorInfo *pei; struct ParallelExecutorInfo *pei;
struct TupleQueueFunnel *funnel; struct TupleQueueFunnel *funnel;
bool need_to_scan_workers;
bool need_to_scan_locally; bool need_to_scan_locally;
} GatherState; } GatherState;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment