Commit bfb93742 authored by Tom Lane's avatar Tom Lane

Fix fuzzy thinking in ReinitializeParallelDSM().

The fact that no workers were successfully launched in the previous
iteration does not excuse us from setting up properly to try again.
This appears to explain crashes I saw in parallel regression testing
due to error_mqh being NULL when it shouldn't be.

Minor other cosmetic fixes too.
parent 75be6646
...@@ -191,8 +191,8 @@ CreateParallelContextForExternalFunction(char *library_name, ...@@ -191,8 +191,8 @@ CreateParallelContextForExternalFunction(char *library_name,
/* /*
* Establish the dynamic shared memory segment for a parallel context and * Establish the dynamic shared memory segment for a parallel context and
* copied state and other bookkeeping information that will need by parallel * copy state and other bookkeeping information that will be needed by
* workers into it. * parallel workers into it.
*/ */
void void
InitializeParallelDSM(ParallelContext *pcxt) InitializeParallelDSM(ParallelContext *pcxt)
...@@ -271,7 +271,7 @@ InitializeParallelDSM(ParallelContext *pcxt) ...@@ -271,7 +271,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
* parallelism than to fail outright. * parallelism than to fail outright.
*/ */
segsize = shm_toc_estimate(&pcxt->estimator); segsize = shm_toc_estimate(&pcxt->estimator);
if (pcxt->nworkers != 0) if (pcxt->nworkers > 0)
pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS); pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
if (pcxt->seg != NULL) if (pcxt->seg != NULL)
pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
...@@ -397,11 +397,13 @@ ReinitializeParallelDSM(ParallelContext *pcxt) ...@@ -397,11 +397,13 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
char *error_queue_space; char *error_queue_space;
int i; int i;
if (pcxt->nworkers_launched == 0) /* Wait for any old workers to exit. */
return; if (pcxt->nworkers_launched > 0)
{
WaitForParallelWorkersToFinish(pcxt); WaitForParallelWorkersToFinish(pcxt);
WaitForParallelWorkersToExit(pcxt); WaitForParallelWorkersToExit(pcxt);
pcxt->nworkers_launched = 0;
}
/* Reset a few bits of fixed parallel state to a clean state. */ /* Reset a few bits of fixed parallel state to a clean state. */
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
...@@ -420,9 +422,6 @@ ReinitializeParallelDSM(ParallelContext *pcxt) ...@@ -420,9 +422,6 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
shm_mq_set_receiver(mq, MyProc); shm_mq_set_receiver(mq, MyProc);
pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
} }
/* Reset number of workers launched. */
pcxt->nworkers_launched = 0;
} }
/* /*
...@@ -493,6 +492,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) ...@@ -493,6 +492,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
*/ */
any_registrations_failed = true; any_registrations_failed = true;
pcxt->worker[i].bgwhandle = NULL; pcxt->worker[i].bgwhandle = NULL;
pfree(pcxt->worker[i].error_mqh);
pcxt->worker[i].error_mqh = NULL; pcxt->worker[i].error_mqh = NULL;
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment