Commit b0b0d84b authored by Robert Haas's avatar Robert Haas

Allow a parallel context to relaunch workers.

This may allow some callers to avoid the overhead involved in tearing
down a parallel context and then setting up a new one, which means
releasing the DSM and then allocating and populating a new one.  I
suspect we'll want to revise the Gather node to make use of this new
capability, but even if not it may be useful elsewhere and requires
very little additional code.
parent afdfcd3f
......@@ -221,3 +221,8 @@ pattern looks like this:
DestroyParallelContext(pcxt);
ExitParallelMode();
If desired, after WaitForParallelWorkersToFinish() has been called, another
call to LaunchParallelWorkers() can be made using the same parallel context.
Calls to these two functions can be alternated any number of times before
destroying the parallel context.
......@@ -404,6 +404,52 @@ LaunchParallelWorkers(ParallelContext *pcxt)
/* We might be running in a short-lived memory context. */
oldcontext = MemoryContextSwitchTo(TopTransactionContext);
/*
* This function can be called for a parallel context for which it has
* already been called previously, but only if all of the old workers
* have already exited. When this case arises, we need to do some extra
* reinitialization.
*/
if (pcxt->nworkers_launched > 0)
{
FixedParallelState *fps;
char *error_queue_space;
/* Clean out old worker handles. */
for (i = 0; i < pcxt->nworkers; ++i)
{
if (pcxt->worker[i].error_mqh != NULL)
elog(ERROR, "previously launched worker still alive");
if (pcxt->worker[i].bgwhandle != NULL)
{
pfree(pcxt->worker[i].bgwhandle);
pcxt->worker[i].bgwhandle = NULL;
}
}
/* Reset a few bits of fixed parallel state to a clean state. */
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
fps->workers_attached = 0;
fps->last_xlog_end = 0;
/* Recreate error queues. */
error_queue_space =
shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
for (i = 0; i < pcxt->nworkers; ++i)
{
char *start;
shm_mq *mq;
start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
shm_mq_set_receiver(mq, MyProc);
pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
}
/* Reset number of workers launched. */
pcxt->nworkers_launched = 0;
}
/* Configure a worker. */
snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
MyProcPid);
......@@ -428,8 +474,11 @@ LaunchParallelWorkers(ParallelContext *pcxt)
if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle))
{
shm_mq_set_handle(pcxt->worker[i].error_mqh,
pcxt->worker[i].bgwhandle);
pcxt->nworkers_launched++;
}
else
{
/*
......
......@@ -35,6 +35,7 @@ typedef struct ParallelContext
dlist_node node;
SubTransactionId subid;
int nworkers;
int nworkers_launched;
parallel_worker_main_type entrypoint;
char *library_name;
char *function_name;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment