Commit 51daa7bd authored by Tom Lane's avatar Tom Lane

Improve division of labor between execParallel.c and nodeGather[Merge].c.

Move the responsibility for creating/destroying TupleQueueReaders into
execParallel.c, to avoid duplicative coding in nodeGather.c and
nodeGatherMerge.c.  Also, instead of having DestroyTupleQueueReader do
shm_mq_detach, do it in the caller (which is now only ExecParallelFinish).
This means execParallel.c does both the attaching and detaching of the
tuple-queue-reader shm_mqs, which seems less weird than the previous
arrangement.

These changes also eliminate a vestigial memory leak (of the pei->tqueue
array).  It's now demonstrable that rescans of Gather or GatherMerge don't
leak memory.

Discussion: https://postgr.es/m/8670.1504192177@sss.pgh.pa.us
parent c039ba07
...@@ -534,9 +534,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, ...@@ -534,9 +534,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
pei->buffer_usage = bufusage_space; pei->buffer_usage = bufusage_space;
/* Set up tuple queues. */ /* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
/* We don't need the TupleQueueReaders yet, though. */
pei->reader = NULL;
/* /*
* If instrumentation options were supplied, allocate space for the data. * If instrumentation options were supplied, allocate space for the data.
* It only gets partially initialized here; the rest happens during * It only gets partially initialized here; the rest happens during
...@@ -603,6 +606,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, ...@@ -603,6 +606,37 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
return pei; return pei;
} }
/*
* Set up tuple queue readers to read the results of a parallel subplan.
* All the workers are expected to return tuples matching tupDesc.
*
* This is separate from ExecInitParallelPlan() because we can launch the
* worker processes and let them start doing something before we do this.
*/
void
ExecParallelCreateReaders(ParallelExecutorInfo *pei,
TupleDesc tupDesc)
{
int nworkers = pei->pcxt->nworkers_launched;
int i;
Assert(pei->reader == NULL);
if (nworkers > 0)
{
pei->reader = (TupleQueueReader **)
palloc(nworkers * sizeof(TupleQueueReader *));
for (i = 0; i < nworkers; i++)
{
shm_mq_set_handle(pei->tqueue[i],
pei->pcxt->worker[i].bgwhandle);
pei->reader[i] = CreateTupleQueueReader(pei->tqueue[i],
tupDesc);
}
}
}
/* /*
* Re-initialize the parallel executor shared memory state before launching * Re-initialize the parallel executor shared memory state before launching
* a fresh batch of workers. * a fresh batch of workers.
...@@ -616,6 +650,7 @@ ExecParallelReinitialize(PlanState *planstate, ...@@ -616,6 +650,7 @@ ExecParallelReinitialize(PlanState *planstate,
ReinitializeParallelDSM(pei->pcxt); ReinitializeParallelDSM(pei->pcxt);
pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
pei->reader = NULL;
pei->finished = false; pei->finished = false;
/* Traverse plan tree and let each child node reset associated state. */ /* Traverse plan tree and let each child node reset associated state. */
...@@ -741,16 +776,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate, ...@@ -741,16 +776,45 @@ ExecParallelRetrieveInstrumentation(PlanState *planstate,
void void
ExecParallelFinish(ParallelExecutorInfo *pei) ExecParallelFinish(ParallelExecutorInfo *pei)
{ {
int nworkers = pei->pcxt->nworkers_launched;
int i; int i;
/* Make this be a no-op if called twice in a row. */
if (pei->finished) if (pei->finished)
return; return;
/* First, wait for the workers to finish. */ /*
* Detach from tuple queues ASAP, so that any still-active workers will
* notice that no further results are wanted.
*/
if (pei->tqueue != NULL)
{
for (i = 0; i < nworkers; i++)
shm_mq_detach(pei->tqueue[i]);
pfree(pei->tqueue);
pei->tqueue = NULL;
}
/*
* While we're waiting for the workers to finish, let's get rid of the
* tuple queue readers. (Any other local cleanup could be done here too.)
*/
if (pei->reader != NULL)
{
for (i = 0; i < nworkers; i++)
DestroyTupleQueueReader(pei->reader[i]);
pfree(pei->reader);
pei->reader = NULL;
}
/* Now wait for the workers to finish. */
WaitForParallelWorkersToFinish(pei->pcxt); WaitForParallelWorkersToFinish(pei->pcxt);
/* Next, accumulate buffer usage. */ /*
for (i = 0; i < pei->pcxt->nworkers_launched; ++i) * Next, accumulate buffer usage. (This must wait for the workers to
* finish, or we might get incomplete data.)
*/
for (i = 0; i < nworkers; i++)
InstrAccumParallelQuery(&pei->buffer_usage[i]); InstrAccumParallelQuery(&pei->buffer_usage[i]);
/* Finally, accumulate instrumentation, if any. */ /* Finally, accumulate instrumentation, if any. */
......
...@@ -130,7 +130,6 @@ ExecGather(PlanState *pstate) ...@@ -130,7 +130,6 @@ ExecGather(PlanState *pstate)
{ {
GatherState *node = castNode(GatherState, pstate); GatherState *node = castNode(GatherState, pstate);
TupleTableSlot *fslot = node->funnel_slot; TupleTableSlot *fslot = node->funnel_slot;
int i;
TupleTableSlot *slot; TupleTableSlot *slot;
ExprContext *econtext; ExprContext *econtext;
...@@ -173,33 +172,30 @@ ExecGather(PlanState *pstate) ...@@ -173,33 +172,30 @@ ExecGather(PlanState *pstate)
LaunchParallelWorkers(pcxt); LaunchParallelWorkers(pcxt);
/* We save # workers launched for the benefit of EXPLAIN */ /* We save # workers launched for the benefit of EXPLAIN */
node->nworkers_launched = pcxt->nworkers_launched; node->nworkers_launched = pcxt->nworkers_launched;
node->nreaders = 0;
node->nextreader = 0;
/* Set up tuple queue readers to read the results. */ /* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0) if (pcxt->nworkers_launched > 0)
{ {
node->reader = palloc(pcxt->nworkers_launched * ExecParallelCreateReaders(node->pei,
sizeof(TupleQueueReader *)); fslot->tts_tupleDescriptor);
/* Make a working array showing the active readers */
for (i = 0; i < pcxt->nworkers_launched; ++i) node->nreaders = pcxt->nworkers_launched;
{ node->reader = (TupleQueueReader **)
shm_mq_set_handle(node->pei->tqueue[i], palloc(node->nreaders * sizeof(TupleQueueReader *));
pcxt->worker[i].bgwhandle); memcpy(node->reader, node->pei->reader,
node->reader[node->nreaders++] = node->nreaders * sizeof(TupleQueueReader *));
CreateTupleQueueReader(node->pei->tqueue[i],
fslot->tts_tupleDescriptor);
}
} }
else else
{ {
/* No workers? Then never mind. */ /* No workers? Then never mind. */
ExecShutdownGatherWorkers(node); node->nreaders = 0;
node->reader = NULL;
} }
node->nextreader = 0;
} }
/* Run plan locally if no workers or not single-copy. */ /* Run plan locally if no workers or not single-copy. */
node->need_to_scan_locally = (node->reader == NULL) node->need_to_scan_locally = (node->nreaders == 0)
|| !gather->single_copy; || !gather->single_copy;
node->initialized = true; node->initialized = true;
} }
...@@ -258,11 +254,11 @@ gather_getnext(GatherState *gatherstate) ...@@ -258,11 +254,11 @@ gather_getnext(GatherState *gatherstate)
MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory; MemoryContext tupleContext = gatherstate->ps.ps_ExprContext->ecxt_per_tuple_memory;
HeapTuple tup; HeapTuple tup;
while (gatherstate->reader != NULL || gatherstate->need_to_scan_locally) while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
{ {
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
if (gatherstate->reader != NULL) if (gatherstate->nreaders > 0)
{ {
MemoryContext oldContext; MemoryContext oldContext;
...@@ -319,19 +315,15 @@ gather_readnext(GatherState *gatherstate) ...@@ -319,19 +315,15 @@ gather_readnext(GatherState *gatherstate)
tup = TupleQueueReaderNext(reader, true, &readerdone); tup = TupleQueueReaderNext(reader, true, &readerdone);
/* /*
* If this reader is done, remove it, and collapse the array. If all * If this reader is done, remove it from our working array of active
* readers are done, clean up remaining worker state. * readers. If all readers are done, we're outta here.
*/ */
if (readerdone) if (readerdone)
{ {
Assert(!tup); Assert(!tup);
DestroyTupleQueueReader(reader);
--gatherstate->nreaders; --gatherstate->nreaders;
if (gatherstate->nreaders == 0) if (gatherstate->nreaders == 0)
{
ExecShutdownGatherWorkers(gatherstate);
return NULL; return NULL;
}
memmove(&gatherstate->reader[gatherstate->nextreader], memmove(&gatherstate->reader[gatherstate->nextreader],
&gatherstate->reader[gatherstate->nextreader + 1], &gatherstate->reader[gatherstate->nextreader + 1],
sizeof(TupleQueueReader *) sizeof(TupleQueueReader *)
...@@ -378,37 +370,25 @@ gather_readnext(GatherState *gatherstate) ...@@ -378,37 +370,25 @@ gather_readnext(GatherState *gatherstate)
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecShutdownGatherWorkers * ExecShutdownGatherWorkers
* *
* Destroy the parallel workers. Collect all the stats after * Stop all the parallel workers.
* workers are stopped, else some work done by workers won't be
* accounted.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
static void static void
ExecShutdownGatherWorkers(GatherState *node) ExecShutdownGatherWorkers(GatherState *node)
{ {
/* Shut down tuple queue readers before shutting down workers. */
if (node->reader != NULL)
{
int i;
for (i = 0; i < node->nreaders; ++i)
DestroyTupleQueueReader(node->reader[i]);
pfree(node->reader);
node->reader = NULL;
}
/* Now shut down the workers. */
if (node->pei != NULL) if (node->pei != NULL)
ExecParallelFinish(node->pei); ExecParallelFinish(node->pei);
/* Flush local copy of reader array */
if (node->reader)
pfree(node->reader);
node->reader = NULL;
} }
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecShutdownGather * ExecShutdownGather
* *
* Destroy the setup for parallel workers including parallel context. * Destroy the setup for parallel workers including parallel context.
* Collect all the stats after workers are stopped, else some work
* done by workers won't be accounted.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
......
...@@ -178,7 +178,6 @@ ExecGatherMerge(PlanState *pstate) ...@@ -178,7 +178,6 @@ ExecGatherMerge(PlanState *pstate)
GatherMergeState *node = castNode(GatherMergeState, pstate); GatherMergeState *node = castNode(GatherMergeState, pstate);
TupleTableSlot *slot; TupleTableSlot *slot;
ExprContext *econtext; ExprContext *econtext;
int i;
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
...@@ -214,27 +213,23 @@ ExecGatherMerge(PlanState *pstate) ...@@ -214,27 +213,23 @@ ExecGatherMerge(PlanState *pstate)
LaunchParallelWorkers(pcxt); LaunchParallelWorkers(pcxt);
/* We save # workers launched for the benefit of EXPLAIN */ /* We save # workers launched for the benefit of EXPLAIN */
node->nworkers_launched = pcxt->nworkers_launched; node->nworkers_launched = pcxt->nworkers_launched;
node->nreaders = 0;
/* Set up tuple queue readers to read the results. */ /* Set up tuple queue readers to read the results. */
if (pcxt->nworkers_launched > 0) if (pcxt->nworkers_launched > 0)
{ {
node->reader = palloc(pcxt->nworkers_launched * ExecParallelCreateReaders(node->pei, node->tupDesc);
sizeof(TupleQueueReader *)); /* Make a working array showing the active readers */
node->nreaders = pcxt->nworkers_launched;
for (i = 0; i < pcxt->nworkers_launched; ++i) node->reader = (TupleQueueReader **)
{ palloc(node->nreaders * sizeof(TupleQueueReader *));
shm_mq_set_handle(node->pei->tqueue[i], memcpy(node->reader, node->pei->reader,
pcxt->worker[i].bgwhandle); node->nreaders * sizeof(TupleQueueReader *));
node->reader[node->nreaders++] =
CreateTupleQueueReader(node->pei->tqueue[i],
node->tupDesc);
}
} }
else else
{ {
/* No workers? Then never mind. */ /* No workers? Then never mind. */
ExecShutdownGatherMergeWorkers(node); node->nreaders = 0;
node->reader = NULL;
} }
} }
...@@ -284,8 +279,6 @@ ExecEndGatherMerge(GatherMergeState *node) ...@@ -284,8 +279,6 @@ ExecEndGatherMerge(GatherMergeState *node)
* ExecShutdownGatherMerge * ExecShutdownGatherMerge
* *
* Destroy the setup for parallel workers including parallel context. * Destroy the setup for parallel workers including parallel context.
* Collect all the stats after workers are stopped, else some work
* done by workers won't be accounted.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
void void
...@@ -304,30 +297,19 @@ ExecShutdownGatherMerge(GatherMergeState *node) ...@@ -304,30 +297,19 @@ ExecShutdownGatherMerge(GatherMergeState *node)
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
* ExecShutdownGatherMergeWorkers * ExecShutdownGatherMergeWorkers
* *
* Destroy the parallel workers. Collect all the stats after * Stop all the parallel workers.
* workers are stopped, else some work done by workers won't be
* accounted.
* ---------------------------------------------------------------- * ----------------------------------------------------------------
*/ */
static void static void
ExecShutdownGatherMergeWorkers(GatherMergeState *node) ExecShutdownGatherMergeWorkers(GatherMergeState *node)
{ {
/* Shut down tuple queue readers before shutting down workers. */
if (node->reader != NULL)
{
int i;
for (i = 0; i < node->nreaders; ++i)
if (node->reader[i])
DestroyTupleQueueReader(node->reader[i]);
pfree(node->reader);
node->reader = NULL;
}
/* Now shut down the workers. */
if (node->pei != NULL) if (node->pei != NULL)
ExecParallelFinish(node->pei); ExecParallelFinish(node->pei);
/* Flush local copy of reader array */
if (node->reader)
pfree(node->reader);
node->reader = NULL;
} }
/* ---------------------------------------------------------------- /* ----------------------------------------------------------------
...@@ -672,8 +654,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) ...@@ -672,8 +654,6 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
else if (tuple_buffer->done) else if (tuple_buffer->done)
{ {
/* Reader is known to be exhausted. */ /* Reader is known to be exhausted. */
DestroyTupleQueueReader(gm_state->reader[reader - 1]);
gm_state->reader[reader - 1] = NULL;
return false; return false;
} }
else else
......
...@@ -651,11 +651,13 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc) ...@@ -651,11 +651,13 @@ CreateTupleQueueReader(shm_mq_handle *handle, TupleDesc tupledesc)
/* /*
* Destroy a tuple queue reader. * Destroy a tuple queue reader.
*
* Note: cleaning up the underlying shm_mq is the caller's responsibility.
* We won't access it here, as it may be detached already.
*/ */
void void
DestroyTupleQueueReader(TupleQueueReader *reader) DestroyTupleQueueReader(TupleQueueReader *reader)
{ {
shm_mq_detach(reader->queue);
if (reader->typmodmap != NULL) if (reader->typmodmap != NULL)
hash_destroy(reader->typmodmap); hash_destroy(reader->typmodmap);
/* Is it worth trying to free substructure of the remap tree? */ /* Is it worth trying to free substructure of the remap tree? */
......
...@@ -23,17 +23,21 @@ typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation; ...@@ -23,17 +23,21 @@ typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
typedef struct ParallelExecutorInfo typedef struct ParallelExecutorInfo
{ {
PlanState *planstate; PlanState *planstate; /* plan subtree we're running in parallel */
ParallelContext *pcxt; ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; BufferUsage *buffer_usage; /* points to bufusage area in DSM */
SharedExecutorInstrumentation *instrumentation; SharedExecutorInstrumentation *instrumentation; /* optional */
shm_mq_handle **tqueue; dsa_area *area; /* points to DSA area in DSM */
dsa_area *area; bool finished; /* set true by ExecParallelFinish */
bool finished; /* These two arrays have pcxt->nworkers_launched entries: */
shm_mq_handle **tqueue; /* tuple queues for worker output */
struct TupleQueueReader **reader; /* tuple reader/writer support */
} ParallelExecutorInfo; } ParallelExecutorInfo;
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers, int64 tuples_needed); EState *estate, int nworkers, int64 tuples_needed);
extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei,
TupleDesc tupDesc);
extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
extern void ExecParallelReinitialize(PlanState *planstate, extern void ExecParallelReinitialize(PlanState *planstate,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment