Commit 3a1f8611 authored by Robert Haas

Update parallel executor support to reuse the same DSM.

Commit b0b0d84b purported to make it
possible to relaunch workers using the same parallel context, but it had
an unpleasant race condition: we might reinitialize after the workers
have sent their last control message but before they have detached the
DSM, leading to crashes.  Repair by introducing a new ParallelContext
operation, ReinitializeParallelDSM.

Adjust execParallel.c to use this new support, so that we can rescan a
Gather node by relaunching workers but without needing to recreate the
DSM.

Amit Kapila, with some adjustments by me.  Extracted from the latest parallel
sequential scan patch.
parent c6baec92
@@ -222,7 +222,9 @@ pattern looks like this:
     ExitParallelMode();

-If desired, after WaitForParallelWorkersToFinish() has been called, another
-call to LaunchParallelWorkers() can be made using the same parallel context.
-Calls to these two functions can be alternated any number of times before
-destroying the parallel context.
+If desired, after WaitForParallelWorkersToFinish() has been called, the
+context can be reset so that workers can be launched anew using the same
+parallel context.  To do this, first call ReinitializeParallelDSM() to
+reinitialize state managed by the parallel context machinery itself; then,
+perform any other necessary resetting of state; after that, you can again
+call LaunchParallelWorkers.
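
As a rough sketch of the calling pattern the README text above describes (illustrative only: my_entrypoint and nworkers are placeholder names, and the estimate/initialize/DSM-fill steps and error handling are elided):

    EnterParallelMode();
    /* my_entrypoint and nworkers are placeholders for the caller's entrypoint and worker count */
    pcxt = CreateParallelContext(my_entrypoint, nworkers);
    /* ... estimate sizes, call InitializeParallelDSM(pcxt), fill in shared state ... */
    LaunchParallelWorkers(pcxt);
    /* ... cooperate with the workers, read their results ... */
    WaitForParallelWorkersToFinish(pcxt);

    /* Launch a fresh set of workers against the same DSM segment. */
    ReinitializeParallelDSM(pcxt);
    /* ... reset any caller-managed state in the segment here ... */
    LaunchParallelWorkers(pcxt);
    WaitForParallelWorkersToFinish(pcxt);

    DestroyParallelContext(pcxt);
    ExitParallelMode();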
@@ -110,6 +110,7 @@ static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
 static void ParallelErrorContext(void *arg);
 static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
 static void ParallelWorkerMain(Datum main_arg);
+static void WaitForParallelWorkersToExit(ParallelContext *pcxt);

 /*
  * Establish a new parallel context.  This should be done after entering
@@ -383,6 +384,46 @@ InitializeParallelDSM(ParallelContext *pcxt)
     MemoryContextSwitchTo(oldcontext);
 }

+/*
+ * Reinitialize the dynamic shared memory segment for a parallel context such
+ * that we could launch workers for it again.
+ */
+void
+ReinitializeParallelDSM(ParallelContext *pcxt)
+{
+    FixedParallelState *fps;
+    char       *error_queue_space;
+    int         i;
+
+    if (pcxt->nworkers_launched == 0)
+        return;
+
+    WaitForParallelWorkersToFinish(pcxt);
+    WaitForParallelWorkersToExit(pcxt);
+
+    /* Reset a few bits of fixed parallel state to a clean state. */
+    fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
+    fps->workers_attached = 0;
+    fps->last_xlog_end = 0;
+
+    /* Recreate error queues. */
+    error_queue_space =
+        shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
+    for (i = 0; i < pcxt->nworkers; ++i)
+    {
+        char       *start;
+        shm_mq     *mq;
+
+        start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
+        mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
+        shm_mq_set_receiver(mq, MyProc);
+        pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
+    }
+
+    /* Reset number of workers launched. */
+    pcxt->nworkers_launched = 0;
+}
+
 /*
  * Launch parallel workers.
  */
@@ -404,52 +445,6 @@ LaunchParallelWorkers(ParallelContext *pcxt)
     /* We might be running in a short-lived memory context. */
     oldcontext = MemoryContextSwitchTo(TopTransactionContext);

-    /*
-     * This function can be called for a parallel context for which it has
-     * already been called previously, but only if all of the old workers
-     * have already exited.  When this case arises, we need to do some extra
-     * reinitialization.
-     */
-    if (pcxt->nworkers_launched > 0)
-    {
-        FixedParallelState *fps;
-        char       *error_queue_space;
-
-        /* Clean out old worker handles. */
-        for (i = 0; i < pcxt->nworkers; ++i)
-        {
-            if (pcxt->worker[i].error_mqh != NULL)
-                elog(ERROR, "previously launched worker still alive");
-            if (pcxt->worker[i].bgwhandle != NULL)
-            {
-                pfree(pcxt->worker[i].bgwhandle);
-                pcxt->worker[i].bgwhandle = NULL;
-            }
-        }
-
-        /* Reset a few bits of fixed parallel state to a clean state. */
-        fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
-        fps->workers_attached = 0;
-        fps->last_xlog_end = 0;
-
-        /* Recreate error queues. */
-        error_queue_space =
-            shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
-        for (i = 0; i < pcxt->nworkers; ++i)
-        {
-            char       *start;
-            shm_mq     *mq;
-
-            start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
-            mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
-            shm_mq_set_receiver(mq, MyProc);
-            pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
-        }
-
-        /* Reset number of workers launched. */
-        pcxt->nworkers_launched = 0;
-    }
-
     /* Configure a worker. */
     snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
              MyProcPid);
@@ -501,7 +496,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 }

 /*
- * Wait for all workers to exit.
+ * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
  * important to call this function afterwards.  We must not miss any errors
@@ -552,6 +547,46 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
     }
 }

+/*
+ * Wait for all workers to exit.
+ *
+ * This function ensures that workers have been completely shutdown.  The
+ * difference between WaitForParallelWorkersToFinish and this function is
+ * that former just ensures that last message sent by worker backend is
+ * received by master backend whereas this ensures the complete shutdown.
+ */
+static void
+WaitForParallelWorkersToExit(ParallelContext *pcxt)
+{
+    int         i;
+
+    /* Wait until the workers actually die. */
+    for (i = 0; i < pcxt->nworkers; ++i)
+    {
+        BgwHandleStatus status;
+
+        if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
+            continue;
+
+        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
+
+        /*
+         * If the postmaster kicked the bucket, we have no chance of cleaning
+         * up safely -- we won't be able to tell when our workers are actually
+         * dead.  This doesn't necessitate a PANIC since they will all abort
+         * eventually, but we can't safely continue this session.
+         */
+        if (status == BGWH_POSTMASTER_DIED)
+            ereport(FATAL,
+                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                     errmsg("postmaster exited during a parallel transaction")));
+
+        /* Release memory. */
+        pfree(pcxt->worker[i].bgwhandle);
+        pcxt->worker[i].bgwhandle = NULL;
+    }
+}
+
 /*
  * Destroy a parallel context.
  *
@@ -578,10 +613,10 @@ DestroyParallelContext(ParallelContext *pcxt)
     {
         for (i = 0; i < pcxt->nworkers; ++i)
         {
-            if (pcxt->worker[i].bgwhandle != NULL)
-                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
             if (pcxt->worker[i].error_mqh != NULL)
             {
+                TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
+
                 pfree(pcxt->worker[i].error_mqh);
                 pcxt->worker[i].error_mqh = NULL;
             }
@@ -609,38 +644,14 @@ DestroyParallelContext(ParallelContext *pcxt)
         pcxt->private_memory = NULL;
     }

-    /* Wait until the workers actually die. */
-    for (i = 0; i < pcxt->nworkers; ++i)
-    {
-        BgwHandleStatus status;
-
-        if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
-            continue;
-
-        /*
-         * We can't finish transaction commit or abort until all of the
-         * workers are dead.  This means, in particular, that we can't respond
-         * to interrupts at this stage.
-         */
-        HOLD_INTERRUPTS();
-        status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
-        RESUME_INTERRUPTS();
-
-        /*
-         * If the postmaster kicked the bucket, we have no chance of cleaning
-         * up safely -- we won't be able to tell when our workers are actually
-         * dead.  This doesn't necessitate a PANIC since they will all abort
-         * eventually, but we can't safely continue this session.
-         */
-        if (status == BGWH_POSTMASTER_DIED)
-            ereport(FATAL,
-                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
-                     errmsg("postmaster exited during a parallel transaction")));
-
-        /* Release memory. */
-        pfree(pcxt->worker[i].bgwhandle);
-        pcxt->worker[i].bgwhandle = NULL;
-    }
+    /*
+     * We can't finish transaction commit or abort until all of the
+     * workers have exited.  This means, in particular, that we can't respond
+     * to interrupts at this stage.
+     */
+    HOLD_INTERRUPTS();
+    WaitForParallelWorkersToExit(pcxt);
+    RESUME_INTERRUPTS();

     /* Free the worker array itself. */
     if (pcxt->worker != NULL)
@@ -799,9 +810,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
         case 'X':               /* Terminate, indicating clean exit */
             {
-                pfree(pcxt->worker[i].bgwhandle);
                 pfree(pcxt->worker[i].error_mqh);
-                pcxt->worker[i].bgwhandle = NULL;
                 pcxt->worker[i].error_mqh = NULL;
                 break;
             }
...
@@ -84,7 +84,8 @@ static bool ExecParallelEstimate(PlanState *node,
                          ExecParallelEstimateContext *e);
 static bool ExecParallelInitializeDSM(PlanState *node,
                           ExecParallelInitializeDSMContext *d);
-static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt);
+static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
+                             bool reinitialize);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
                         SharedExecutorInstrumentation *instrumentation);
@@ -217,7 +218,7 @@ ExecParallelInitializeDSM(PlanState *planstate,
  * to the main backend and start the workers.
  */
 static shm_mq_handle **
-ExecParallelSetupTupleQueues(ParallelContext *pcxt)
+ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 {
     shm_mq_handle **responseq;
     char       *tqueuespace;
@@ -231,9 +232,16 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
     responseq = (shm_mq_handle **)
         palloc(pcxt->nworkers * sizeof(shm_mq_handle *));

-    /* Allocate space from the DSM for the queues themselves. */
-    tqueuespace = shm_toc_allocate(pcxt->toc,
-                                   PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+    /*
+     * If not reinitializing, allocate space from the DSM for the queues;
+     * otherwise, find the already allocated space.
+     */
+    if (!reinitialize)
+        tqueuespace =
+            shm_toc_allocate(pcxt->toc,
+                             PARALLEL_TUPLE_QUEUE_SIZE * pcxt->nworkers);
+    else
+        tqueuespace = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE);

     /* Create the queues, and become the receiver for each. */
     for (i = 0; i < pcxt->nworkers; ++i)
@@ -248,12 +256,23 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt)
     }

     /* Add array of queues to shm_toc, so others can find it. */
-    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);
+    if (!reinitialize)
+        shm_toc_insert(pcxt->toc, PARALLEL_KEY_TUPLE_QUEUE, tqueuespace);

     /* Return array of handles. */
     return responseq;
 }

+/*
+ * Re-initialize the response queues for backend workers to return tuples
+ * to the main backend and start the workers.
+ */
+shm_mq_handle **
+ExecParallelReinitializeTupleQueues(ParallelContext *pcxt)
+{
+    return ExecParallelSetupTupleQueues(pcxt, true);
+}
+
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
@@ -363,7 +382,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
     pei->buffer_usage = bufusage_space;

     /* Set up tuple queues. */
-    pei->tqueue = ExecParallelSetupTupleQueues(pcxt);
+    pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);

     /*
      * If instrumentation options were supplied, allocate space for the
...
@@ -41,6 +41,7 @@

 static TupleTableSlot *gather_getnext(GatherState *gatherstate);
+static void ExecShutdownGatherWorkers(GatherState *node);


 /* ----------------------------------------------------------------
@@ -150,9 +151,10 @@ ExecGather(GatherState *node)
         bool        got_any_worker = false;

         /* Initialize the workers required to execute Gather node. */
-        node->pei = ExecInitParallelPlan(node->ps.lefttree,
-                                         estate,
-                                         gather->num_workers);
+        if (!node->pei)
+            node->pei = ExecInitParallelPlan(node->ps.lefttree,
+                                             estate,
+                                             gather->num_workers);

         /*
          * Register backend workers.  We might not get as many as we
@@ -279,7 +281,7 @@ gather_getnext(GatherState *gatherstate)
                                                gatherstate->need_to_scan_locally,
                                                &done);
         if (done)
-            ExecShutdownGather(gatherstate);
+            ExecShutdownGatherWorkers(gatherstate);

         if (HeapTupleIsValid(tup))
         {
@@ -308,15 +310,15 @@ gather_getnext(GatherState *gatherstate)
 }

 /* ----------------------------------------------------------------
- *      ExecShutdownGather
+ *      ExecShutdownGatherWorkers
  *
- *      Destroy the setup for parallel workers.  Collect all the
- *      stats after workers are stopped, else some work done by
- *      workers won't be accounted.
+ *      Destroy the parallel workers.  Collect all the stats after
+ *      workers are stopped, else some work done by workers won't be
+ *      accounted.
  * ----------------------------------------------------------------
  */
 void
-ExecShutdownGather(GatherState *node)
+ExecShutdownGatherWorkers(GatherState *node)
 {
     /* Shut down tuple queue funnel before shutting down workers. */
     if (node->funnel != NULL)
@@ -327,8 +329,25 @@ ExecShutdownGather(GatherState *node)

     /* Now shut down the workers. */
     if (node->pei != NULL)
+        ExecParallelFinish(node->pei);
+}
+
+/* ----------------------------------------------------------------
+ *      ExecShutdownGather
+ *
+ *      Destroy the setup for parallel workers including parallel context.
+ *      Collect all the stats after workers are stopped, else some work
+ *      done by workers won't be accounted.
+ * ----------------------------------------------------------------
+ */
+void
+ExecShutdownGather(GatherState *node)
+{
+    ExecShutdownGatherWorkers(node);
+
+    /* Now destroy the parallel context. */
+    if (node->pei != NULL)
     {
-        ExecParallelFinish(node->pei);
         ExecParallelCleanup(node->pei);
         node->pei = NULL;
     }
@@ -349,14 +368,21 @@ void
 ExecReScanGather(GatherState *node)
 {
     /*
-     * Re-initialize the parallel context and workers to perform rescan of
-     * relation.  We want to gracefully shutdown all the workers so that they
+     * Re-initialize the parallel workers to perform rescan of relation.
+     * We want to gracefully shutdown all the workers so that they
      * should be able to propagate any error or other information to master
-     * backend before dying.
+     * backend before dying.  Parallel context will be reused for rescan.
     */
-    ExecShutdownGather(node);
+    ExecShutdownGatherWorkers(node);

     node->initialized = false;

+    if (node->pei)
+    {
+        ReinitializeParallelDSM(node->pei->pcxt);
+        node->pei->tqueue =
+            ExecParallelReinitializeTupleQueues(node->pei->pcxt);
+    }
+
     ExecReScan(node->ps.lefttree);
 }
@@ -56,6 +56,7 @@ extern bool InitializingParallelWorker;
 extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
 extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
 extern void InitializeParallelDSM(ParallelContext *);
+extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *);
 extern void WaitForParallelWorkersToFinish(ParallelContext *);
 extern void DestroyParallelContext(ParallelContext *);
...
@@ -33,5 +33,6 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
                      EState *estate, int nworkers);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
+extern shm_mq_handle **ExecParallelReinitializeTupleQueues(ParallelContext *pcxt);

 #endif   /* EXECPARALLEL_H */