Commit df4685fb authored by Robert Haas's avatar Robert Haas

Minor optimizations based on ParallelContext having nworkers_launched.

Originally, we didn't have nworkers_launched, so code that used parallel
contexts had to be preprared for the possibility that not all of the
workers requested actually got launched.  But now we can count on knowing
the number of workers that were successfully launched, which can shave
off a few cycles and simplify some code slightly.

Amit Kapila, reviewed by Haribabu Kommi, per a suggestion from Peter
Geoghegan.
parent 546cd0d7
......@@ -520,7 +520,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
*/
CHECK_FOR_INTERRUPTS();
for (i = 0; i < pcxt->nworkers; ++i)
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
if (pcxt->worker[i].error_mqh != NULL)
{
......@@ -560,7 +560,7 @@ WaitForParallelWorkersToExit(ParallelContext *pcxt)
int i;
/* Wait until the workers actually die. */
for (i = 0; i < pcxt->nworkers; ++i)
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
BgwHandleStatus status;
......@@ -610,7 +610,7 @@ DestroyParallelContext(ParallelContext *pcxt)
/* Kill each worker in turn, and forget their error queues. */
if (pcxt->worker != NULL)
{
for (i = 0; i < pcxt->nworkers; ++i)
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
if (pcxt->worker[i].error_mqh != NULL)
{
......@@ -708,7 +708,7 @@ HandleParallelMessages(void)
if (pcxt->worker == NULL)
continue;
for (i = 0; i < pcxt->nworkers; ++i)
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
/*
* Read as many messages as we can from each worker, but stop when
......
......@@ -522,7 +522,7 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
WaitForParallelWorkersToFinish(pei->pcxt);
/* Next, accumulate buffer usage. */
for (i = 0; i < pei->pcxt->nworkers; ++i)
for (i = 0; i < pei->pcxt->nworkers_launched; ++i)
InstrAccumParallelQuery(&pei->buffer_usage[i]);
/* Finally, accumulate instrumentation, if any. */
......
......@@ -153,7 +153,6 @@ ExecGather(GatherState *node)
if (gather->num_workers > 0 && IsInParallelMode())
{
ParallelContext *pcxt;
bool got_any_worker = false;
/* Initialize the workers required to execute Gather node. */
if (!node->pei)
......@@ -169,30 +168,27 @@ ExecGather(GatherState *node)
LaunchParallelWorkers(pcxt);
/* Set up tuple queue readers to read the results. */
if (pcxt->nworkers > 0)
if (pcxt->nworkers_launched > 0)
{
node->nreaders = 0;
node->reader =
palloc(pcxt->nworkers * sizeof(TupleQueueReader *));
palloc(pcxt->nworkers_launched * sizeof(TupleQueueReader *));
for (i = 0; i < pcxt->nworkers; ++i)
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
if (pcxt->worker[i].bgwhandle == NULL)
continue;
shm_mq_set_handle(node->pei->tqueue[i],
pcxt->worker[i].bgwhandle);
node->reader[node->nreaders++] =
CreateTupleQueueReader(node->pei->tqueue[i],
fslot->tts_tupleDescriptor);
got_any_worker = true;
}
}
else
{
/* No workers? Then never mind. */
if (!got_any_worker)
ExecShutdownGatherWorkers(node);
}
}
/* Run plan locally if no workers or not single-copy. */
node->need_to_scan_locally = (node->reader == NULL)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment