Commit 64b2e7ad authored by Robert Haas's avatar Robert Haas

Pass extra data to bgworkers, and use this to fix parallel contexts.

Up until now, the total amount of data that could be passed to a
background worker at startup was one datum, which can be a small as
4 bytes on some systems.  That's enough to pass a dsm_handle or an
array index, but not much else.  Add a bgw_extra flag to the
BackgroundWorker struct, allowing up to 128 bytes to be passed to
a new worker on any platform.

Use this to fix a problem I recently discovered with the parallel
context machinery added in 9.5: the master assigns each worker an
array index, and each worker subsequently assigns itself an array
index, and there's nothing to guarantee that the two sets of indexes
match, leading to chaos.

Normally, I would not back-patch the change to add bgw_extra, since it
is basically a feature addition.  However, since 9.5 is still in beta
and there seems to be no other sensible way to repair the broken
parallel context machinery, back-patch to 9.5.  Existing background
worker code can ignore the bgw_extra field without a problem, but
might need to be recompiled since the structure size has changed.

Report and patch by me.  Review by Amit Kapila.
parent 59464bd6
...@@ -58,6 +58,7 @@ typedef struct BackgroundWorker ...@@ -58,6 +58,7 @@ typedef struct BackgroundWorker
char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */ char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */ char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
Datum bgw_main_arg; Datum bgw_main_arg;
char bgw_extra[BGW_EXTRALEN];
int bgw_notify_pid; int bgw_notify_pid;
} BackgroundWorker; } BackgroundWorker;
</programlisting> </programlisting>
...@@ -182,6 +183,13 @@ typedef struct BackgroundWorker ...@@ -182,6 +183,13 @@ typedef struct BackgroundWorker
new background worker process. new background worker process.
</para> </para>
<para>
<structfield>bgw_extra</structfield> can contain extra data to be passed
to the background worker. Unlike <structfield>bgw_main_arg</>, this data
is not passed as an argument to the worker's main function, but it can be
accessed via <literal>MyBgworkerEntry</literal>, as discussed above.
</para>
<para> <para>
<structfield>bgw_notify_pid</structfield> is the PID of a PostgreSQL <structfield>bgw_notify_pid</structfield> is the PID of a PostgreSQL
backend process to which the postmaster should send <literal>SIGUSR1</> backend process to which the postmaster should send <literal>SIGUSR1</>
......
...@@ -77,10 +77,6 @@ typedef struct FixedParallelState ...@@ -77,10 +77,6 @@ typedef struct FixedParallelState
/* Mutex protects remaining fields. */ /* Mutex protects remaining fields. */
slock_t mutex; slock_t mutex;
/* Track whether workers have attached. */
int workers_expected;
int workers_attached;
/* Maximum XactLastRecEnd of any worker. */ /* Maximum XactLastRecEnd of any worker. */
XLogRecPtr last_xlog_end; XLogRecPtr last_xlog_end;
} FixedParallelState; } FixedParallelState;
...@@ -295,8 +291,6 @@ InitializeParallelDSM(ParallelContext *pcxt) ...@@ -295,8 +291,6 @@ InitializeParallelDSM(ParallelContext *pcxt)
fps->parallel_master_backend_id = MyBackendId; fps->parallel_master_backend_id = MyBackendId;
fps->entrypoint = pcxt->entrypoint; fps->entrypoint = pcxt->entrypoint;
SpinLockInit(&fps->mutex); SpinLockInit(&fps->mutex);
fps->workers_expected = pcxt->nworkers;
fps->workers_attached = 0;
fps->last_xlog_end = 0; fps->last_xlog_end = 0;
shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
...@@ -403,7 +397,6 @@ ReinitializeParallelDSM(ParallelContext *pcxt) ...@@ -403,7 +397,6 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
/* Reset a few bits of fixed parallel state to a clean state. */ /* Reset a few bits of fixed parallel state to a clean state. */
fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED); fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
fps->workers_attached = 0;
fps->last_xlog_end = 0; fps->last_xlog_end = 0;
/* Recreate error queues. */ /* Recreate error queues. */
...@@ -455,6 +448,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) ...@@ -455,6 +448,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
worker.bgw_main = ParallelWorkerMain; worker.bgw_main = ParallelWorkerMain;
worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg)); worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
worker.bgw_notify_pid = MyProcPid; worker.bgw_notify_pid = MyProcPid;
memset(&worker.bgw_extra, 0, BGW_EXTRALEN);
/* /*
* Start workers. * Start workers.
...@@ -466,6 +460,7 @@ LaunchParallelWorkers(ParallelContext *pcxt) ...@@ -466,6 +460,7 @@ LaunchParallelWorkers(ParallelContext *pcxt)
*/ */
for (i = 0; i < pcxt->nworkers; ++i) for (i = 0; i < pcxt->nworkers; ++i)
{ {
memcpy(worker.bgw_extra, &i, sizeof(int));
if (!any_registrations_failed && if (!any_registrations_failed &&
RegisterDynamicBackgroundWorker(&worker, RegisterDynamicBackgroundWorker(&worker,
&pcxt->worker[i].bgwhandle)) &pcxt->worker[i].bgwhandle))
...@@ -891,6 +886,10 @@ ParallelWorkerMain(Datum main_arg) ...@@ -891,6 +886,10 @@ ParallelWorkerMain(Datum main_arg)
pqsignal(SIGTERM, die); pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals(); BackgroundWorkerUnblockSignals();
/* Determine and set our parallel worker number. */
Assert(ParallelWorkerNumber == -1);
memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));
/* Set up a memory context and resource owner. */ /* Set up a memory context and resource owner. */
Assert(CurrentResourceOwner == NULL); Assert(CurrentResourceOwner == NULL);
CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel"); CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
...@@ -915,18 +914,9 @@ ParallelWorkerMain(Datum main_arg) ...@@ -915,18 +914,9 @@ ParallelWorkerMain(Datum main_arg)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("bad magic number in dynamic shared memory segment"))); errmsg("bad magic number in dynamic shared memory segment")));
/* Determine and set our worker number. */ /* Look up fixed parallel state. */
fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
Assert(fps != NULL); Assert(fps != NULL);
Assert(ParallelWorkerNumber == -1);
SpinLockAcquire(&fps->mutex);
if (fps->workers_attached < fps->workers_expected)
ParallelWorkerNumber = fps->workers_attached++;
SpinLockRelease(&fps->mutex);
if (ParallelWorkerNumber < 0)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("too many parallel workers already attached")));
MyFixedParallelState = fps; MyFixedParallelState = fps;
/* /*
......
...@@ -314,6 +314,7 @@ BackgroundWorkerStateChange(void) ...@@ -314,6 +314,7 @@ BackgroundWorkerStateChange(void)
rw->rw_worker.bgw_restart_time = slot->worker.bgw_restart_time; rw->rw_worker.bgw_restart_time = slot->worker.bgw_restart_time;
rw->rw_worker.bgw_main = slot->worker.bgw_main; rw->rw_worker.bgw_main = slot->worker.bgw_main;
rw->rw_worker.bgw_main_arg = slot->worker.bgw_main_arg; rw->rw_worker.bgw_main_arg = slot->worker.bgw_main_arg;
memcpy(rw->rw_worker.bgw_extra, slot->worker.bgw_extra, BGW_EXTRALEN);
/* /*
* Copy the PID to be notified about state changes, but only if the * Copy the PID to be notified about state changes, but only if the
......
...@@ -74,6 +74,7 @@ typedef enum ...@@ -74,6 +74,7 @@ typedef enum
#define BGW_DEFAULT_RESTART_INTERVAL 60 #define BGW_DEFAULT_RESTART_INTERVAL 60
#define BGW_NEVER_RESTART -1 #define BGW_NEVER_RESTART -1
#define BGW_MAXLEN 64 #define BGW_MAXLEN 64
#define BGW_EXTRALEN 128
typedef struct BackgroundWorker typedef struct BackgroundWorker
{ {
...@@ -85,6 +86,7 @@ typedef struct BackgroundWorker ...@@ -85,6 +86,7 @@ typedef struct BackgroundWorker
char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */ char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */ char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
Datum bgw_main_arg; Datum bgw_main_arg;
char bgw_extra[BGW_EXTRALEN];
pid_t bgw_notify_pid; /* SIGUSR1 this backend on start/stop */ pid_t bgw_notify_pid; /* SIGUSR1 this backend on start/stop */
} BackgroundWorker; } BackgroundWorker;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment