Commit 2badb5af authored by Robert Haas's avatar Robert Haas

Report an ERROR if a parallel worker fails to start properly.

Commit 28724fd9 fixed things so that
if a background worker fails to start due to fork() failure or because
it is terminated before startup succeeds, BGWH_STOPPED will be
reported.  However, that only helps if the code that uses the
background worker machinery notices the change in status, and the code
in parallel.c did not.

To fix that, do two things.  First, make sure that when a worker
exits, it triggers the leader to read from error queues.  That way, if
a worker which has attached to an error queue exits uncleanly, the
leader is sure to throw some error, either the contents of the
ErrorResponse sent by the worker, or "lost connection to parallel
worker" if it exited without sending one.  To cover the case where
the worker never starts up in the first place or exits before
attaching to the error queue, the ParallelContext now keeps track
of which workers have sent at least one message via the error
queue.  A worker which sends no messages by the time the parallel
operation finishes will be checked to see whether it exited before
attaching to the error queue; if so, a new error message, "parallel
worker failed to initialize", will be reported.  If not, we'll
continue to wait until it either starts up and exits cleanly, starts
up and exits uncleanly, or fails to start, and then take the
appropriate action.

Patch by me, reviewed by Amit Kapila.

Discussion: http://postgr.es/m/CA+TgmoYnBgXgdTu6wk5YPdWhmgabYc9nY_pFLq=tB=FSLYkD8Q@mail.gmail.com
parent 160a4f62
...@@ -113,6 +113,9 @@ static FixedParallelState *MyFixedParallelState; ...@@ -113,6 +113,9 @@ static FixedParallelState *MyFixedParallelState;
/* List of active parallel contexts. */ /* List of active parallel contexts. */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
/* Backend-local copy of data from FixedParallelState. */
static pid_t ParallelMasterPid;
/* /*
* List of internal parallel worker entry points. We need this for * List of internal parallel worker entry points. We need this for
* reasons explained in LookupParallelWorkerFunction(), below. * reasons explained in LookupParallelWorkerFunction(), below.
...@@ -133,6 +136,7 @@ static const struct ...@@ -133,6 +136,7 @@ static const struct
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg); static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt); static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname); static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);
/* /*
...@@ -433,6 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt) ...@@ -433,6 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
WaitForParallelWorkersToFinish(pcxt); WaitForParallelWorkersToFinish(pcxt);
WaitForParallelWorkersToExit(pcxt); WaitForParallelWorkersToExit(pcxt);
pcxt->nworkers_launched = 0; pcxt->nworkers_launched = 0;
if (pcxt->any_message_received)
{
pfree(pcxt->any_message_received);
pcxt->any_message_received = NULL;
}
} }
/* Reset a few bits of fixed parallel state to a clean state. */ /* Reset a few bits of fixed parallel state to a clean state. */
...@@ -531,6 +540,14 @@ LaunchParallelWorkers(ParallelContext *pcxt) ...@@ -531,6 +540,14 @@ LaunchParallelWorkers(ParallelContext *pcxt)
} }
} }
/*
* Now that nworkers_launched has taken its final value, we can initialize
* any_message_received.
*/
if (pcxt->nworkers_launched > 0)
pcxt->any_message_received =
palloc0(sizeof(bool) * pcxt->nworkers_launched);
/* Restore previous memory context. */ /* Restore previous memory context. */
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
} }
...@@ -552,6 +569,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) ...@@ -552,6 +569,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
for (;;) for (;;)
{ {
bool anyone_alive = false; bool anyone_alive = false;
int nfinished = 0;
int i; int i;
/* /*
...@@ -563,7 +581,15 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) ...@@ -563,7 +581,15 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
for (i = 0; i < pcxt->nworkers_launched; ++i) for (i = 0; i < pcxt->nworkers_launched; ++i)
{ {
if (pcxt->worker[i].error_mqh != NULL) /*
* If error_mqh is NULL, then the worker has already exited
* cleanly. If we have received a message through error_mqh from
* the worker, we know it started up cleanly, and therefore we're
* certain to be notified when it exits.
*/
if (pcxt->worker[i].error_mqh == NULL)
++nfinished;
else if (pcxt->any_message_received[i])
{ {
anyone_alive = true; anyone_alive = true;
break; break;
...@@ -571,7 +597,62 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt) ...@@ -571,7 +597,62 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
} }
if (!anyone_alive) if (!anyone_alive)
break; {
/* If all workers are known to have finished, we're done. */
if (nfinished >= pcxt->nworkers_launched)
{
Assert(nfinished == pcxt->nworkers_launched);
break;
}
/*
* We didn't detect any living workers, but not all workers are
* known to have exited cleanly. Either not all workers have
* launched yet, or maybe some of them failed to start or
* terminated abnormally.
*/
for (i = 0; i < pcxt->nworkers_launched; ++i)
{
pid_t pid;
shm_mq *mq;
/*
* If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
* should just keep waiting. If it is BGWH_STOPPED, then
* further investigation is needed.
*/
if (pcxt->worker[i].error_mqh == NULL ||
pcxt->worker[i].bgwhandle == NULL ||
GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
&pid) != BGWH_STOPPED)
continue;
/*
* Check whether the worker ended up stopped without ever
* attaching to the error queue. If so, the postmaster was
* unable to fork the worker or it exited without initializing
* properly. We must throw an error, since the caller may
* have been expecting the worker to do some work before
* exiting.
*/
mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
if (shm_mq_get_sender(mq) == NULL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("parallel worker failed to initialize"),
errhint("More details may be available in the server log.")));
/*
* The worker is stopped, but is attached to the error queue.
* Unless there's a bug somewhere, this will only happen when
* the worker writes messages and terminates after the
* CHECK_FOR_INTERRUPTS() near the top of this function and
* before the call to GetBackgroundWorkerPid(). In that case,
* or latch should have been set as well and the right things
* will happen on the next pass through the loop.
*/
}
}
WaitLatch(MyLatch, WL_LATCH_SET, -1, WaitLatch(MyLatch, WL_LATCH_SET, -1,
WAIT_EVENT_PARALLEL_FINISH); WAIT_EVENT_PARALLEL_FINISH);
...@@ -828,6 +909,9 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) ...@@ -828,6 +909,9 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{ {
char msgtype; char msgtype;
if (pcxt->any_message_received != NULL)
pcxt->any_message_received[i] = true;
msgtype = pq_getmsgbyte(msg); msgtype = pq_getmsgbyte(msg);
switch (msgtype) switch (msgtype)
...@@ -1024,11 +1108,16 @@ ParallelWorkerMain(Datum main_arg) ...@@ -1024,11 +1108,16 @@ ParallelWorkerMain(Datum main_arg)
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false); fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
MyFixedParallelState = fps; MyFixedParallelState = fps;
/* Arrange to signal the leader if we exit. */
ParallelMasterPid = fps->parallel_master_pid;
ParallelMasterBackendId = fps->parallel_master_backend_id;
on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
/* /*
* Now that we have a worker number, we can find and attach to the error * Now we can find and attach to the error queue provided for us. That's
* queue provided for us. That's good, because until we do that, any * good, because until we do that, any errors that happen here will not be
* errors that happen here will not be reported back to the process that * reported back to the process that requested that this worker be
* requested that this worker be launched. * launched.
*/ */
error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false); error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
mq = (shm_mq *) (error_queue_space + mq = (shm_mq *) (error_queue_space +
...@@ -1146,9 +1235,6 @@ ParallelWorkerMain(Datum main_arg) ...@@ -1146,9 +1235,6 @@ ParallelWorkerMain(Datum main_arg)
SetTempNamespaceState(fps->temp_namespace_id, SetTempNamespaceState(fps->temp_namespace_id,
fps->temp_toast_namespace_id); fps->temp_toast_namespace_id);
/* Set ParallelMasterBackendId so we know how to address temp relations. */
ParallelMasterBackendId = fps->parallel_master_backend_id;
/* Restore reindex state. */ /* Restore reindex state. */
reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false); reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
RestoreReindexState(reindexspace); RestoreReindexState(reindexspace);
...@@ -1197,6 +1283,20 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end) ...@@ -1197,6 +1283,20 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
SpinLockRelease(&fps->mutex); SpinLockRelease(&fps->mutex);
} }
/*
* Make sure the leader tries to read from our error queue one more time.
* This guards against the case where we exit uncleanly without sending an
* ErrorResponse to the leader, for example because some code calls proc_exit
* directly.
*/
static void
ParallelWorkerShutdown(int code, Datum arg)
{
SendProcSignal(ParallelMasterPid,
PROCSIG_PARALLEL_MESSAGE,
ParallelMasterBackendId);
}
/* /*
* Look up (and possibly load) a parallel worker entry point function. * Look up (and possibly load) a parallel worker entry point function.
* *
......
...@@ -43,6 +43,7 @@ typedef struct ParallelContext ...@@ -43,6 +43,7 @@ typedef struct ParallelContext
void *private_memory; void *private_memory;
shm_toc *toc; shm_toc *toc;
ParallelWorkerInfo *worker; ParallelWorkerInfo *worker;
bool *any_message_received;
} ParallelContext; } ParallelContext;
typedef struct ParallelWorkerContext typedef struct ParallelWorkerContext
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment