Commit 166b61a8 authored by Robert Haas's avatar Robert Haas

Avoid aggregating worker instrumentation multiple times.

Amit Kapila, per design ideas from me.
parent adeee974
...@@ -277,13 +277,15 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) ...@@ -277,13 +277,15 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
} }
/* /*
* Re-initialize the response queues for backend workers to return tuples * Re-initialize the parallel executor info such that it can be reused by
* to the main backend and start the workers. * workers.
*/ */
shm_mq_handle ** void
ExecParallelReinitializeTupleQueues(ParallelContext *pcxt) ExecParallelReinitialize(ParallelExecutorInfo *pei)
{ {
return ExecParallelSetupTupleQueues(pcxt, true); ReinitializeParallelDSM(pei->pcxt);
pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
pei->finished = false;
} }
/* /*
...@@ -308,6 +310,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) ...@@ -308,6 +310,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
/* Allocate object for return value. */ /* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo)); pei = palloc0(sizeof(ParallelExecutorInfo));
pei->finished = false;
pei->planstate = planstate; pei->planstate = planstate;
/* Fix up and serialize plan to be sent to workers. */ /* Fix up and serialize plan to be sent to workers. */
...@@ -469,6 +472,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei) ...@@ -469,6 +472,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
{ {
int i; int i;
if (pei->finished)
return;
/* First, wait for the workers to finish. */ /* First, wait for the workers to finish. */
WaitForParallelWorkersToFinish(pei->pcxt); WaitForParallelWorkersToFinish(pei->pcxt);
...@@ -480,6 +486,8 @@ ExecParallelFinish(ParallelExecutorInfo *pei) ...@@ -480,6 +486,8 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
if (pei->instrumentation) if (pei->instrumentation)
ExecParallelRetrieveInstrumentation(pei->planstate, ExecParallelRetrieveInstrumentation(pei->planstate,
pei->instrumentation); pei->instrumentation);
pei->finished = true;
} }
/* /*
......
...@@ -456,11 +456,7 @@ ExecReScanGather(GatherState *node) ...@@ -456,11 +456,7 @@ ExecReScanGather(GatherState *node)
node->initialized = false; node->initialized = false;
if (node->pei) if (node->pei)
{ ExecParallelReinitialize(node->pei);
ReinitializeParallelDSM(node->pei->pcxt);
node->pei->tqueue =
ExecParallelReinitializeTupleQueues(node->pei->pcxt);
}
ExecReScan(node->ps.lefttree); ExecReScan(node->ps.lefttree);
} }
...@@ -27,12 +27,13 @@ typedef struct ParallelExecutorInfo ...@@ -27,12 +27,13 @@ typedef struct ParallelExecutorInfo
BufferUsage *buffer_usage; BufferUsage *buffer_usage;
SharedExecutorInstrumentation *instrumentation; SharedExecutorInstrumentation *instrumentation;
shm_mq_handle **tqueue; shm_mq_handle **tqueue;
bool finished;
} ParallelExecutorInfo; } ParallelExecutorInfo;
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate, extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
EState *estate, int nworkers); EState *estate, int nworkers);
extern void ExecParallelFinish(ParallelExecutorInfo *pei); extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei); extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt); extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
#endif /* EXECPARALLEL_H */ #endif /* EXECPARALLEL_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment