Commit 090d0f20 authored by Robert Haas's avatar Robert Haas

Allow discovery of whether a dynamic background worker is running.

Using the infrastructure provided by this patch, it's possible either
to wait for the startup of a dynamically-registered background worker,
or to poll the status of such a worker without waiting.  In either
case, the current PID of the worker process can also be obtained.
As usual, worker_spi is updated to demonstrate the new functionality.

Patch by me.  Review by Andres Freund.
parent c9e2e2db
......@@ -4,6 +4,6 @@
\echo Use "CREATE EXTENSION worker_spi" to load this file. \quit
CREATE FUNCTION worker_spi_launch(pg_catalog.int4)
RETURNS pg_catalog.bool STRICT
RETURNS pg_catalog.int4 STRICT
AS 'MODULE_PATHNAME'
LANGUAGE C;
......@@ -365,6 +365,9 @@ worker_spi_launch(PG_FUNCTION_ARGS)
{
int32 i = PG_GETARG_INT32(0);
BackgroundWorker worker;
BackgroundWorkerHandle *handle;
BgwHandleStatus status;
pid_t pid;
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
......@@ -375,6 +378,25 @@ worker_spi_launch(PG_FUNCTION_ARGS)
sprintf(worker.bgw_function_name, "worker_spi_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
worker.bgw_main_arg = Int32GetDatum(i);
PG_RETURN_BOOL(RegisterDynamicBackgroundWorker(&worker));
/* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
PG_RETURN_NULL();
status = WaitForBackgroundWorkerStartup(handle, &pid);
if (status == BGWH_STOPPED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("could not start background process"),
errhint("More details may be available in the server log.")));
if (status == BGWH_POSTMASTER_DIED)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
errmsg("cannot start background processes without postmaster"),
errhint("Kill all remaining database processes and restart the database.")));
Assert(status == BGWH_STARTED);
PG_RETURN_INT32(pid);
}
......@@ -37,11 +37,11 @@
<function>RegisterBackgroundWorker(<type>BackgroundWorker *worker</type>)</function>
from its <function>_PG_init()</>. Background workers can also be started
after the system is up and running by calling the function
<function>RegisterDynamicBackgroundWorker</function>(<type>BackgroundWorker
*worker</type>). Unlike <function>RegisterBackgroundWorker</>, which can
only be called from within the postmaster,
<function>RegisterDynamicBackgroundWorker</function> must be called from
a regular backend.
<function>RegisterDynamicBackgroundWorker(<type>BackgroundWorker
*worker, BackgroundWorkerHandle **handle</type>)</function>. Unlike
<function>RegisterBackgroundWorker</>, which can only be called from within
the postmaster, <function>RegisterDynamicBackgroundWorker</function> must be
called from a regular backend.
</para>
<para>
......@@ -58,6 +58,7 @@ typedef struct BackgroundWorker
char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
Datum bgw_main_arg;
int bgw_notify_pid;
} BackgroundWorker;
</programlisting>
</para>
......@@ -135,6 +136,15 @@ typedef struct BackgroundWorker
<structfield>bgw_main</structfield> is NULL.
</para>
<para>
<structfield>bgw_notify_pid</structfield> is the PID of a PostgreSQL
backend process to which the postmaster should send <literal>SIGUSR1</>
when the process is started or exits. It should be 0 for workers registered
at postmaster startup time, or when the backend registering the worker does
not wish to wait for the worker to start up. Otherwise, it should be
initialized to <literal>MyProcPid</>.
</para>
<para>Once running, the process can connect to a database by calling
<function>BackgroundWorkerInitializeConnection(<parameter>char *dbname</parameter>, <parameter>char *username</parameter>)</function>.
This allows the process to run transactions and queries using the
......@@ -165,6 +175,40 @@ typedef struct BackgroundWorker
<command>postgres</> itself has terminated.
</para>
<para>
When a background worker is registered using the
<function>RegisterDynamicBackgroundWorker</function> function, it is
possible for the backend performing the registration to obtain information
the status of the worker. Backends wishing to do this should pass the
address of a <type>BackgroundWorkerHandle *</type> as the second argument
to <function>RegisterDynamicBackgroundWorker</function>. If the worker
is successfully registered, this pointer will be initialized with an
opaque handle that can subsequently be passed to
<function>GetBackgroundWorkerPid(<parameter>BackgroundWorkerHandle *</parameter>, <parameter>pid_t *</parameter>)</function>.
This function can be used to poll the status of the worker: a return
value of <literal>BGWH_NOT_YET_STARTED</> indicates that the worker has not
yet been started by the postmaster; <literal>BGWH_STOPPED</literal>
indicates that it has been started but is no longer running; and
<literal>BGWH_STARTED</literal> indicates that it is currently running.
In this last case, the PID will also be returned via the second argument.
</para>
<para>
In some cases, a process which registers a background worker may wish to
wait for the worker to start up. This can be accomplished by initializing
<structfield>bgw_notify_pid</structfield> to <literal>MyProcPid</> and
then passing the <type>BackgroundWorkerHandle *</type> obtained at
registration time to
<function>WaitForBackgroundWorkerStartup(<parameter>BackgroundWorkerHandle
*handle</parameter>, <parameter>pid_t *</parameter>)</function> function.
This function will block until the postmaster has attempted to start the
background worker, or until the postmaster dies. If the background runner
is running, the return value will <literal>BGWH_STARTED</>, and
the PID will be written to the provided address. Otherwise, the return
value will be <literal>BGWH_STOPPED</literal> or
<literal>BGWH_POSTMASTER_DIED</literal>.
</para>
<para>
The <filename>worker_spi</> contrib module contains a working example,
which demonstrates some useful techniques.
......
......@@ -207,8 +207,6 @@ typedef struct QueueBackendStatus
QueuePosition pos; /* backend has read queue up to here */
} QueueBackendStatus;
#define InvalidPid (-1)
/*
* Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
*
......
......@@ -18,6 +18,7 @@
#include "miscadmin.h"
#include "libpq/pqsignal.h"
#include "postmaster/bgworker_internals.h"
#include "postmaster/postmaster.h"
#include "storage/barrier.h"
#include "storage/ipc.h"
#include "storage/latch.h"
......@@ -66,6 +67,8 @@ slist_head BackgroundWorkerList = SLIST_STATIC_INIT(BackgroundWorkerList);
typedef struct BackgroundWorkerSlot
{
bool in_use;
pid_t pid; /* InvalidPid = not started yet; 0 = dead */
uint64 generation; /* incremented when slot is recycled */
BackgroundWorker worker;
} BackgroundWorkerSlot;
......@@ -75,6 +78,12 @@ typedef struct BackgroundWorkerArray
BackgroundWorkerSlot slot[FLEXIBLE_ARRAY_MEMBER];
} BackgroundWorkerArray;
struct BackgroundWorkerHandle
{
int slot;
uint64 generation;
};
BackgroundWorkerArray *BackgroundWorkerData;
/*
......@@ -125,7 +134,10 @@ BackgroundWorkerShmemInit(void)
rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur);
Assert(slotno < max_worker_processes);
slot->in_use = true;
slot->pid = InvalidPid;
slot->generation = 0;
rw->rw_shmem_slot = slotno;
rw->rw_worker.bgw_notify_pid = 0; /* might be reinit after crash */
memcpy(&slot->worker, &rw->rw_worker, sizeof(BackgroundWorker));
++slotno;
}
......@@ -244,7 +256,7 @@ BackgroundWorkerStateChange(void)
slot->worker.bgw_function_name, BGW_MAXLEN);
/*
* Copy remaining fields.
* Copy various fixed-size fields.
*
* flags, start_time, and restart_time are examined by the
* postmaster, but nothing too bad will happen if they are
......@@ -257,6 +269,23 @@ BackgroundWorkerStateChange(void)
rw->rw_worker.bgw_main = slot->worker.bgw_main;
rw->rw_worker.bgw_main_arg = slot->worker.bgw_main_arg;
/*
* Copy the PID to be notified about state changes, but only if
* the postmaster knows about a backend with that PID. It isn't
* an error if the postmaster doesn't know about the PID, because
* the backend that requested the worker could have died (or been
* killed) just after doing so. Nonetheless, at least until we get
* some experience with how this plays out in the wild, log a message
* at a relative high debug level.
*/
rw->rw_worker.bgw_notify_pid = slot->worker.bgw_notify_pid;
if (!PostmasterMarkPIDForWorkerNotify(rw->rw_worker.bgw_notify_pid))
{
elog(DEBUG1, "worker notification PID %u is not valid",
rw->rw_worker.bgw_notify_pid);
rw->rw_worker.bgw_notify_pid = 0;
}
/* Initialize postmaster bookkeeping. */
rw->rw_backend = NULL;
rw->rw_pid = 0;
......@@ -302,6 +331,44 @@ ForgetBackgroundWorker(slist_mutable_iter *cur)
free(rw);
}
/*
* Report the PID of a newly-launched background worker in shared memory.
*
* This function should only be called from the postmaster.
*/
void
ReportBackgroundWorkerPID(RegisteredBgWorker *rw)
{
BackgroundWorkerSlot *slot;
Assert(rw->rw_shmem_slot < max_worker_processes);
slot = &BackgroundWorkerData->slot[rw->rw_shmem_slot];
slot->pid = rw->rw_pid;
if (rw->rw_worker.bgw_notify_pid != 0)
kill(rw->rw_worker.bgw_notify_pid, SIGUSR1);
}
/*
* Cancel SIGUSR1 notifications for a PID belonging to an exiting backend.
*
* This function should only be called from the postmaster.
*/
void
BackgroundWorkerStopNotifications(pid_t pid)
{
slist_iter siter;
slist_foreach(siter, &BackgroundWorkerList)
{
RegisteredBgWorker *rw;
rw = slist_container(RegisteredBgWorker, rw_lnode, siter.cur);
if (rw->rw_worker.bgw_notify_pid == pid)
rw->rw_worker.bgw_notify_pid = 0;
}
}
#ifdef EXEC_BACKEND
/*
* In EXEC_BACKEND mode, workers use this to retrieve their details from
......@@ -602,6 +669,15 @@ RegisterBackgroundWorker(BackgroundWorker *worker)
if (!SanityCheckBackgroundWorker(worker, LOG))
return;
if (worker->bgw_notify_pid != 0)
{
ereport(LOG,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("background worker \"%s\": only dynamic background workers can request notification",
worker->bgw_name)));
return;
}
/*
* Enforce maximum number of workers. Note this is overly restrictive: we
* could allow more non-shmem-connected workers, because these don't count
......@@ -647,12 +723,18 @@ RegisterBackgroundWorker(BackgroundWorker *worker)
*
* Returns true on success and false on failure. Failure typically indicates
* that no background worker slots are currently available.
*
* If handle != NULL, we'll set *handle to a pointer that can subsequently
* be used as an argument to GetBackgroundWorkerPid(). The caller can
* free this pointer using pfree(), if desired.
*/
bool
RegisterDynamicBackgroundWorker(BackgroundWorker *worker)
RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
BackgroundWorkerHandle **handle)
{
int slotno;
bool success = false;
uint64 generation;
/*
* We can't register dynamic background workers from the postmaster.
......@@ -680,6 +762,9 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker)
if (!slot->in_use)
{
memcpy(&slot->worker, worker, sizeof(BackgroundWorker));
slot->pid = InvalidPid; /* indicates not started yet */
slot->generation++;
generation = slot->generation;
/*
* Make sure postmaster doesn't see the slot as in use before
......@@ -699,5 +784,122 @@ RegisterDynamicBackgroundWorker(BackgroundWorker *worker)
if (success)
SendPostmasterSignal(PMSIGNAL_BACKGROUND_WORKER_CHANGE);
/*
* If we found a slot and the user has provided a handle, initialize it.
*/
if (success && handle)
{
*handle = palloc(sizeof(BackgroundWorkerHandle));
(*handle)->slot = slotno;
(*handle)->generation = generation;
}
return success;
}
/*
* Get the PID of a dynamically-registered background worker.
*
* If the worker is determined to be running, the return value will be
* BGWH_STARTED and *pidp will get the PID of the worker process.
* Otherwise, the return value will be BGWH_NOT_YET_STARTED if the worker
* hasn't been started yet, and BGWH_STOPPED if the worker was previously
* running but is no longer.
*
* In the latter case, the worker may be stopped temporarily (if it is
* configured for automatic restart, or if it exited with code 0) or gone
* for good (if it is configured not to restart and exited with code 1).
*/
BgwHandleStatus
GetBackgroundWorkerPid(BackgroundWorkerHandle *handle, pid_t *pidp)
{
BackgroundWorkerSlot *slot;
pid_t pid;
Assert(handle->slot < max_worker_processes);
slot = &BackgroundWorkerData->slot[handle->slot];
/*
* We could probably arrange to synchronize access to data using
* memory barriers only, but for now, let's just keep it simple and
* grab the lock. It seems unlikely that there will be enough traffic
* here to result in meaningful contention.
*/
LWLockAcquire(BackgroundWorkerLock, LW_SHARED);
/*
* The generation number can't be concurrently changed while we hold the
* lock. The pid, which is updated by the postmaster, can change at any
* time, but we assume such changes are atomic. So the value we read
* won't be garbage, but it might be out of date by the time the caller
* examines it (but that's unavoidable anyway).
*/
if (handle->generation != slot->generation)
pid = 0;
else
pid = slot->pid;
/* All done. */
LWLockRelease(BackgroundWorkerLock);
if (pid == 0)
return BGWH_STOPPED;
else if (pid == InvalidPid)
return BGWH_NOT_YET_STARTED;
*pidp = pid;
return BGWH_STARTED;
}
/*
* Wait for a background worker to start up.
*
* This is like GetBackgroundWorkerPid(), except that if the worker has not
* yet started, we wait for it to do so; thus, BGWH_NOT_YET_STARTED is never
* returned. However, if the postmaster has died, we give up and return
* BGWH_POSTMASTER_DIED, since it that case we know that startup will not
* take place.
*/
BgwHandleStatus
WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *handle, pid_t *pidp)
{
BgwHandleStatus status;
pid_t pid;
int rc;
bool save_set_latch_on_sigusr1;
save_set_latch_on_sigusr1 = set_latch_on_sigusr1;
set_latch_on_sigusr1 = true;
PG_TRY();
{
for (;;)
{
CHECK_FOR_INTERRUPTS();
status = GetBackgroundWorkerPid(handle, &pid);
if (status != BGWH_NOT_YET_STARTED)
break;
rc = WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
if (rc & WL_POSTMASTER_DEATH)
{
status = BGWH_POSTMASTER_DIED;
break;
}
ResetLatch(&MyProc->procLatch);
}
}
PG_CATCH();
{
set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
PG_RE_THROW();
}
PG_END_TRY();
set_latch_on_sigusr1 = save_set_latch_on_sigusr1;
*pidp = pid;
return status;
}
......@@ -170,6 +170,7 @@ typedef struct bkend
*/
int bkend_type;
bool dead_end; /* is it going to send an error and quit? */
bool bgworker_notify; /* gets bgworker start/stop notifications */
dlist_node elem; /* list link in BackendList */
} Backend;
......@@ -2877,11 +2878,20 @@ CleanupBackgroundWorker(int pid,
#ifdef EXEC_BACKEND
ShmemBackendArrayRemove(rw->rw_backend);
#endif
/*
* It's possible that this background worker started some OTHER
* background worker and asked to be notified when that worker
* started or stopped. If so, cancel any notifications destined
* for the now-dead backend.
*/
if (rw->rw_backend->bgworker_notify)
BackgroundWorkerStopNotifications(rw->rw_pid);
free(rw->rw_backend);
rw->rw_backend = NULL;
}
rw->rw_pid = 0;
rw->rw_child_slot = 0;
ReportBackgroundWorkerPID(rw); /* report child death */
LogChildExit(LOG, namebuf, pid, exitstatus);
......@@ -2955,6 +2965,18 @@ CleanupBackend(int pid,
ShmemBackendArrayRemove(bp);
#endif
}
if (bp->bgworker_notify)
{
/*
* This backend may have been slated to receive SIGUSR1
* when some background worker started or stopped. Cancel
* those notifications, as we don't want to signal PIDs that
* are not PostgreSQL backends. This gets skipped in the
* (probably very common) case where the backend has never
* requested any such notifications.
*/
BackgroundWorkerStopNotifications(bp->pid);
}
dlist_delete(iter.cur);
free(bp);
break;
......@@ -3018,6 +3040,7 @@ HandleChildCrash(int pid, int exitstatus, const char *procname)
rw->rw_pid = 0;
rw->rw_child_slot = 0;
/* don't reset crashed_at */
/* don't report child stop, either */
/* Keep looping so we can signal remaining workers */
}
else
......@@ -3712,6 +3735,9 @@ BackendStartup(Port *port)
else
bn->child_slot = 0;
/* Hasn't asked to be notified about any bgworkers yet */
bn->bgworker_notify = false;
#ifdef EXEC_BACKEND
pid = backend_forkexec(port);
#else /* !EXEC_BACKEND */
......@@ -5122,6 +5148,7 @@ StartAutovacuumWorker(void)
/* Autovac workers are not dead_end and need a child slot */
bn->dead_end = false;
bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
bn->bgworker_notify = false;
bn->pid = StartAutoVacWorker();
if (bn->pid > 0)
......@@ -5318,6 +5345,7 @@ do_start_bgworker(RegisteredBgWorker *rw)
rw->rw_pid = worker_pid;
if (rw->rw_backend)
rw->rw_backend->pid = rw->rw_pid;
ReportBackgroundWorkerPID(rw);
}
}
......@@ -5400,6 +5428,7 @@ assign_backendlist_entry(RegisteredBgWorker *rw)
bn->child_slot = MyPMChildSlot = AssignPostmasterChildSlot();
bn->bkend_type = BACKEND_TYPE_BGWORKER;
bn->dead_end = false;
bn->bgworker_notify = false;
rw->rw_backend = bn;
rw->rw_child_slot = bn->child_slot;
......@@ -5510,6 +5539,29 @@ maybe_start_bgworker(void)
StartWorkerNeeded = false;
}
/*
* When a backend asks to be notified about worker state changes, we
* set a flag in its backend entry. The background worker machinery needs
* to know when such backends exit.
*/
bool
PostmasterMarkPIDForWorkerNotify(int pid)
{
dlist_iter iter;
Backend *bp;
dlist_foreach(iter, &BackendList)
{
bp = dlist_container(Backend, elem, iter.cur);
if (bp->pid == pid)
{
bp->bgworker_notify = true;
return true;
}
}
return false;
}
#ifdef EXEC_BACKEND
/*
......
......@@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "storage/latch.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
......@@ -57,6 +58,14 @@ typedef struct
*/
#define NumProcSignalSlots (MaxBackends + NUM_AUXPROCTYPES)
/*
* If this flag is set, the process latch will be set whenever SIGUSR1
* is received. This is useful when waiting for a signal from the postmaster.
* Spurious wakeups must be expected. Make sure that the flag is cleared
* in the error path.
*/
bool set_latch_on_sigusr1;
static ProcSignalSlot *ProcSignalSlots = NULL;
static volatile ProcSignalSlot *MyProcSignalSlot = NULL;
......@@ -276,6 +285,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
if (set_latch_on_sigusr1)
SetLatch(&MyProc->procLatch);
latch_sigusr1_handler();
errno = save_errno;
......
......@@ -28,6 +28,8 @@
#define PG_BACKEND_VERSIONSTR "postgres (PostgreSQL) " PG_VERSION "\n"
#define InvalidPid (-1)
/*****************************************************************************
* System interrupt and critical section handling
......
......@@ -80,13 +80,32 @@ typedef struct BackgroundWorker
char bgw_library_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
char bgw_function_name[BGW_MAXLEN]; /* only if bgw_main is NULL */
Datum bgw_main_arg;
pid_t bgw_notify_pid; /* SIGUSR1 this backend on start/stop */
} BackgroundWorker;
typedef enum BgwHandleStatus
{
BGWH_STARTED, /* worker is running */
BGWH_NOT_YET_STARTED, /* worker hasn't been started yet */
BGWH_STOPPED, /* worker has exited */
BGWH_POSTMASTER_DIED /* postmaster died; worker status unclear */
} BgwHandleStatus;
struct BackgroundWorkerHandle;
typedef struct BackgroundWorkerHandle BackgroundWorkerHandle;
/* Register a new bgworker during shared_preload_libraries */
extern void RegisterBackgroundWorker(BackgroundWorker *worker);
/* Register a new bgworker from a regular backend */
extern bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker);
extern bool RegisterDynamicBackgroundWorker(BackgroundWorker *worker,
BackgroundWorkerHandle **handle);
/* Query the status of a bgworker */
extern BgwHandleStatus GetBackgroundWorkerPid(BackgroundWorkerHandle *handle,
pid_t *pidp);
extern BgwHandleStatus WaitForBackgroundWorkerStartup(BackgroundWorkerHandle *
handle, pid_t *pid);
/* This is valid in a running worker */
extern BackgroundWorker *MyBgworkerEntry;
......
......@@ -40,6 +40,8 @@ extern Size BackgroundWorkerShmemSize(void);
extern void BackgroundWorkerShmemInit(void);
extern void BackgroundWorkerStateChange(void);
extern void ForgetBackgroundWorker(slist_mutable_iter *cur);
extern void ReportBackgroundWorkerPID(RegisteredBgWorker *);
extern void BackgroundWorkerStopNotifications(pid_t pid);
/* Function to start a background worker, called from postmaster.c */
extern void StartBackgroundWorker(void);
......
......@@ -52,6 +52,7 @@ extern void ClosePostmasterPorts(bool am_syslogger);
extern int MaxLivePostmasterChildren(void);
extern int GetNumShmemAttachedBgworkers(void);
extern bool PostmasterMarkPIDForWorkerNotify(int);
#ifdef EXEC_BACKEND
extern pid_t postmaster_forkexec(int argc, char *argv[]);
......
......@@ -54,5 +54,6 @@ extern int SendProcSignal(pid_t pid, ProcSignalReason reason,
BackendId backendId);
extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
extern bool set_latch_on_sigusr1;
#endif /* PROCSIGNAL_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment