Commit 4dab3d5a authored by Tom Lane's avatar Tom Lane

Change the autovacuum launcher to use WaitLatch instead of a poll loop.

In pursuit of this (and with the expectation that WaitLatch will be needed
in more places), convert the latch field that was already added to PGPROC
for sync rep into a generic latch that is activated for all PGPROC-owning
processes, and change many of the standard backend signal handlers to set
that latch when a signal happens.  This will allow WaitLatch callers to be
wakened properly by these signals.

In passing, fix a whole bunch of signal handlers that had been hacked to do
things that might change errno, without adding the necessary save/restore
logic for errno.  Also make some minor fixes in unix_latch.c, and clean
up bizarre and unsafe scheme for disowning the process's latch.  Much of
this has to be back-patched into 9.1.

Peter Geoghegan, with additional work by Tom
parent 1f1b70a7
...@@ -9957,34 +9957,50 @@ startupproc_quickdie(SIGNAL_ARGS) ...@@ -9957,34 +9957,50 @@ startupproc_quickdie(SIGNAL_ARGS)
static void static void
StartupProcSigUsr1Handler(SIGNAL_ARGS) StartupProcSigUsr1Handler(SIGNAL_ARGS)
{ {
int save_errno = errno;
latch_sigusr1_handler(); latch_sigusr1_handler();
errno = save_errno;
} }
/* SIGUSR2: set flag to finish recovery */ /* SIGUSR2: set flag to finish recovery */
static void static void
StartupProcTriggerHandler(SIGNAL_ARGS) StartupProcTriggerHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
promote_triggered = true; promote_triggered = true;
WakeupRecovery(); WakeupRecovery();
errno = save_errno;
} }
/* SIGHUP: set flag to re-read config file at next convenient time */ /* SIGHUP: set flag to re-read config file at next convenient time */
static void static void
StartupProcSigHupHandler(SIGNAL_ARGS) StartupProcSigHupHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGHUP = true; got_SIGHUP = true;
WakeupRecovery(); WakeupRecovery();
errno = save_errno;
} }
/* SIGTERM: set flag to abort redo and exit */ /* SIGTERM: set flag to abort redo and exit */
static void static void
StartupProcShutdownHandler(SIGNAL_ARGS) StartupProcShutdownHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
if (in_restore_command) if (in_restore_command)
proc_exit(1); proc_exit(1);
else else
shutdown_requested = true; shutdown_requested = true;
WakeupRecovery(); WakeupRecovery();
errno = save_errno;
} }
/* Handle SIGHUP and SIGTERM signals of startup process */ /* Handle SIGHUP and SIGTERM signals of startup process */
......
...@@ -138,9 +138,12 @@ DisownLatch(volatile Latch *latch) ...@@ -138,9 +138,12 @@ DisownLatch(volatile Latch *latch)
* function returns immediately. * function returns immediately.
* *
* The 'timeout' is given in milliseconds. It must be >= 0 if WL_TIMEOUT flag * The 'timeout' is given in milliseconds. It must be >= 0 if WL_TIMEOUT flag
* is given. On some platforms, signals cause the timeout to be restarted, * is given. On some platforms, signals do not interrupt the wait, or even
* so beware that the function can sleep for several times longer than the * cause the timeout to be restarted, so beware that the function can sleep
* specified timeout. * for several times longer than the requested timeout. However, this
* difficulty is not so great as it seems, because the signal handlers for any
* signals that the caller should respond to ought to be programmed to end the
* wait by calling SetLatch. Ideally, the timeout parameter is vestigial.
* *
* The latch must be owned by the current process, ie. it must be a * The latch must be owned by the current process, ie. it must be a
* backend-local latch initialized with InitLatch, or a shared latch * backend-local latch initialized with InitLatch, or a shared latch
...@@ -261,6 +264,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, ...@@ -261,6 +264,7 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
{ {
if (errno == EINTR) if (errno == EINTR)
continue; continue;
waiting = false;
ereport(ERROR, ereport(ERROR,
(errcode_for_socket_access(), (errcode_for_socket_access(),
errmsg("select() failed: %m"))); errmsg("select() failed: %m")));
...@@ -294,6 +298,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, ...@@ -294,6 +298,10 @@ WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock,
* Sets a latch and wakes up anyone waiting on it. * Sets a latch and wakes up anyone waiting on it.
* *
* This is cheap if the latch is already set, otherwise not so much. * This is cheap if the latch is already set, otherwise not so much.
*
* NB: when calling this in a signal handler, be sure to save and restore
* errno around it. (That's standard practice in most signal handlers, of
* course, but we used to omit it in handlers that only set a flag.)
*/ */
void void
SetLatch(volatile Latch *latch) SetLatch(volatile Latch *latch)
...@@ -339,7 +347,10 @@ SetLatch(volatile Latch *latch) ...@@ -339,7 +347,10 @@ SetLatch(volatile Latch *latch)
if (owner_pid == 0) if (owner_pid == 0)
return; return;
else if (owner_pid == MyProcPid) else if (owner_pid == MyProcPid)
sendSelfPipeByte(); {
if (waiting)
sendSelfPipeByte();
}
else else
kill(owner_pid, SIGUSR1); kill(owner_pid, SIGUSR1);
} }
...@@ -371,7 +382,11 @@ ResetLatch(volatile Latch *latch) ...@@ -371,7 +382,11 @@ ResetLatch(volatile Latch *latch)
* SetLatch uses SIGUSR1 to wake up the process waiting on the latch. * SetLatch uses SIGUSR1 to wake up the process waiting on the latch.
* *
* Wake up WaitLatch, if we're waiting. (We might not be, since SIGUSR1 is * Wake up WaitLatch, if we're waiting. (We might not be, since SIGUSR1 is
* overloaded for multiple purposes.) * overloaded for multiple purposes; or we might not have reached WaitLatch
* yet, in which case we don't need to fill the pipe either.)
*
* NB: when calling this in a signal handler, be sure to save and restore
* errno around it.
*/ */
void void
latch_sigusr1_handler(void) latch_sigusr1_handler(void)
...@@ -435,13 +450,19 @@ retry: ...@@ -435,13 +450,19 @@ retry:
} }
} }
/* Read all available data from the self-pipe */ /*
* Read all available data from the self-pipe
*
* Note: this is only called when waiting = true. If it fails and doesn't
* return, it must reset that flag first (though ideally, this will never
* happen).
*/
static void static void
drainSelfPipe(void) drainSelfPipe(void)
{ {
/* /*
* There shouldn't normally be more than one byte in the pipe, or maybe a * There shouldn't normally be more than one byte in the pipe, or maybe a
* few more if multiple processes run SetLatch at the same instant. * few bytes if multiple processes run SetLatch at the same instant.
*/ */
char buf[16]; char buf[16];
int rc; int rc;
...@@ -456,9 +477,21 @@ drainSelfPipe(void) ...@@ -456,9 +477,21 @@ drainSelfPipe(void)
else if (errno == EINTR) else if (errno == EINTR)
continue; /* retry */ continue; /* retry */
else else
{
waiting = false;
elog(ERROR, "read() on self-pipe failed: %m"); elog(ERROR, "read() on self-pipe failed: %m");
}
} }
else if (rc == 0) else if (rc == 0)
{
waiting = false;
elog(ERROR, "unexpected EOF on self-pipe"); elog(ERROR, "unexpected EOF on self-pipe");
}
else if (rc < sizeof(buf))
{
/* we successfully drained the pipe; no need to read() again */
break;
}
/* else buffer wasn't big enough, so read again */
} }
} }
...@@ -84,6 +84,7 @@ ...@@ -84,6 +84,7 @@
#include "postmaster/postmaster.h" #include "postmaster/postmaster.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/pmsignal.h" #include "storage/pmsignal.h"
#include "storage/proc.h" #include "storage/proc.h"
#include "storage/procsignal.h" #include "storage/procsignal.h"
...@@ -553,11 +554,11 @@ AutoVacLauncherMain(int argc, char *argv[]) ...@@ -553,11 +554,11 @@ AutoVacLauncherMain(int argc, char *argv[])
Dlelem *elem; Dlelem *elem;
/* /*
* Emergency bailout if postmaster has died. This is to avoid the * This loop is a bit different from the normal use of WaitLatch,
* necessity for manual cleanup of all postmaster children. * because we'd like to sleep before the first launch of a child
* process. So it's WaitLatch, then ResetLatch, then check for
* wakening conditions.
*/ */
if (!PostmasterIsAlive())
proc_exit(1);
launcher_determine_sleep((AutoVacuumShmem->av_freeWorkers != NULL), launcher_determine_sleep((AutoVacuumShmem->av_freeWorkers != NULL),
false, &nap); false, &nap);
...@@ -566,42 +567,24 @@ AutoVacLauncherMain(int argc, char *argv[]) ...@@ -566,42 +567,24 @@ AutoVacLauncherMain(int argc, char *argv[])
EnableCatchupInterrupt(); EnableCatchupInterrupt();
/* /*
* Sleep for a while according to schedule. * Wait until naptime expires or we get some type of signal (all the
* * signal handlers will wake us by calling SetLatch).
* On some platforms, signals won't interrupt the sleep. To ensure we
* respond reasonably promptly when someone signals us, break down the
* sleep into 1-second increments, and check for interrupts after each
* nap.
*/ */
while (nap.tv_sec > 0 || nap.tv_usec > 0) WaitLatch(&MyProc->procLatch,
{ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
uint32 sleeptime; (nap.tv_sec * 1000L) + (nap.tv_usec / 1000L));
if (nap.tv_sec > 0)
{
sleeptime = 1000000;
nap.tv_sec--;
}
else
{
sleeptime = nap.tv_usec;
nap.tv_usec = 0;
}
pg_usleep(sleeptime);
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (!PostmasterIsAlive())
proc_exit(1);
if (got_SIGTERM || got_SIGHUP || got_SIGUSR2) ResetLatch(&MyProc->procLatch);
break;
}
DisableCatchupInterrupt(); DisableCatchupInterrupt();
/*
* Emergency bailout if postmaster has died. This is to avoid the
* necessity for manual cleanup of all postmaster children.
*/
if (!PostmasterIsAlive())
proc_exit(1);
/* the normal shutdown case */ /* the normal shutdown case */
if (got_SIGTERM) if (got_SIGTERM)
break; break;
...@@ -1321,21 +1304,39 @@ AutoVacWorkerFailed(void) ...@@ -1321,21 +1304,39 @@ AutoVacWorkerFailed(void)
static void static void
avl_sighup_handler(SIGNAL_ARGS) avl_sighup_handler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGHUP = true; got_SIGHUP = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
} }
/* SIGUSR2: a worker is up and running, or just finished, or failed to fork */ /* SIGUSR2: a worker is up and running, or just finished, or failed to fork */
static void static void
avl_sigusr2_handler(SIGNAL_ARGS) avl_sigusr2_handler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGUSR2 = true; got_SIGUSR2 = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
} }
/* SIGTERM: time to die */ /* SIGTERM: time to die */
static void static void
avl_sigterm_handler(SIGNAL_ARGS) avl_sigterm_handler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGTERM = true; got_SIGTERM = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
} }
......
...@@ -288,16 +288,21 @@ pgarch_exit(SIGNAL_ARGS) ...@@ -288,16 +288,21 @@ pgarch_exit(SIGNAL_ARGS)
static void static void
ArchSigHupHandler(SIGNAL_ARGS) ArchSigHupHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
/* set flag to re-read config file at next convenient time */ /* set flag to re-read config file at next convenient time */
got_SIGHUP = true; got_SIGHUP = true;
/* let the waiting loop iterate */
SetLatch(&mainloop_latch); SetLatch(&mainloop_latch);
errno = save_errno;
} }
/* SIGTERM signal handler for archiver process */ /* SIGTERM signal handler for archiver process */
static void static void
ArchSigTermHandler(SIGNAL_ARGS) ArchSigTermHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
/* /*
* The postmaster never sends us SIGTERM, so we assume that this means * The postmaster never sends us SIGTERM, so we assume that this means
* that init is trying to shut down the whole system. If we hang around * that init is trying to shut down the whole system. If we hang around
...@@ -305,28 +310,35 @@ ArchSigTermHandler(SIGNAL_ARGS) ...@@ -305,28 +310,35 @@ ArchSigTermHandler(SIGNAL_ARGS)
* archive commands. * archive commands.
*/ */
got_SIGTERM = true; got_SIGTERM = true;
/* let the waiting loop iterate */
SetLatch(&mainloop_latch); SetLatch(&mainloop_latch);
errno = save_errno;
} }
/* SIGUSR1 signal handler for archiver process */ /* SIGUSR1 signal handler for archiver process */
static void static void
pgarch_waken(SIGNAL_ARGS) pgarch_waken(SIGNAL_ARGS)
{ {
int save_errno = errno;
/* set flag that there is work to be done */ /* set flag that there is work to be done */
wakened = true; wakened = true;
/* let the waiting loop iterate */
SetLatch(&mainloop_latch); SetLatch(&mainloop_latch);
errno = save_errno;
} }
/* SIGUSR2 signal handler for archiver process */ /* SIGUSR2 signal handler for archiver process */
static void static void
pgarch_waken_stop(SIGNAL_ARGS) pgarch_waken_stop(SIGNAL_ARGS)
{ {
int save_errno = errno;
/* set flag to do a final cycle and shut down afterwards */ /* set flag to do a final cycle and shut down afterwards */
ready_to_stop = true; ready_to_stop = true;
/* let the waiting loop iterate */
SetLatch(&mainloop_latch); SetLatch(&mainloop_latch);
errno = save_errno;
} }
/* /*
......
...@@ -111,9 +111,6 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ...@@ -111,9 +111,6 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks))); Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
Assert(WalSndCtl != NULL); Assert(WalSndCtl != NULL);
/* Reset the latch before adding ourselves to the queue. */
ResetLatch(&MyProc->waitLatch);
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING); Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);
...@@ -167,7 +164,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ...@@ -167,7 +164,7 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
int syncRepState; int syncRepState;
/* Must reset the latch before testing state. */ /* Must reset the latch before testing state. */
ResetLatch(&MyProc->waitLatch); ResetLatch(&MyProc->procLatch);
/* /*
* Try checking the state without the lock first. There's no * Try checking the state without the lock first. There's no
...@@ -247,11 +244,10 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) ...@@ -247,11 +244,10 @@ SyncRepWaitForLSN(XLogRecPtr XactCommitLSN)
} }
/* /*
* Wait on latch for up to 60 seconds. This allows us to check for * Wait on latch. Any condition that should wake us up will set
* cancel/die signal or postmaster death regularly while waiting. Note * the latch, so no need for timeout.
* that timeout here does not necessarily release from loop.
*/ */
WaitLatch(&MyProc->waitLatch, WL_LATCH_SET | WL_TIMEOUT, 60000L); WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1);
} }
/* /*
...@@ -322,7 +318,7 @@ SyncRepCancelWait(void) ...@@ -322,7 +318,7 @@ SyncRepCancelWait(void)
} }
void void
SyncRepCleanupAtProcExit(int code, Datum arg) SyncRepCleanupAtProcExit(void)
{ {
if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
{ {
...@@ -330,8 +326,6 @@ SyncRepCleanupAtProcExit(int code, Datum arg) ...@@ -330,8 +326,6 @@ SyncRepCleanupAtProcExit(int code, Datum arg)
SHMQueueDelete(&(MyProc->syncRepLinks)); SHMQueueDelete(&(MyProc->syncRepLinks));
LWLockRelease(SyncRepLock); LWLockRelease(SyncRepLock);
} }
DisownLatch(&MyProc->waitLatch);
} }
/* /*
...@@ -567,9 +561,7 @@ SyncRepWakeQueue(bool all) ...@@ -567,9 +561,7 @@ SyncRepWakeQueue(bool all)
/* /*
* Wake only when we have set state and removed from queue. * Wake only when we have set state and removed from queue.
*/ */
Assert(SHMQueueIsDetached(&(thisproc->syncRepLinks))); SetLatch(&(thisproc->procLatch));
Assert(thisproc->syncRepState == SYNC_REP_WAIT_COMPLETE);
SetLatch(&(thisproc->waitLatch));
numprocs++; numprocs++;
} }
......
...@@ -374,11 +374,15 @@ WalRcvSigHupHandler(SIGNAL_ARGS) ...@@ -374,11 +374,15 @@ WalRcvSigHupHandler(SIGNAL_ARGS)
static void static void
WalRcvShutdownHandler(SIGNAL_ARGS) WalRcvShutdownHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGTERM = true; got_SIGTERM = true;
/* Don't joggle the elbow of proc_exit */ /* Don't joggle the elbow of proc_exit */
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
ProcessWalRcvInterrupts(); ProcessWalRcvInterrupts();
errno = save_errno;
} }
/* /*
......
...@@ -1249,18 +1249,26 @@ WalSndRqstFileReload(void) ...@@ -1249,18 +1249,26 @@ WalSndRqstFileReload(void)
static void static void
WalSndSigHupHandler(SIGNAL_ARGS) WalSndSigHupHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGHUP = true; got_SIGHUP = true;
if (MyWalSnd) if (MyWalSnd)
SetLatch(&MyWalSnd->latch); SetLatch(&MyWalSnd->latch);
errno = save_errno;
} }
/* SIGTERM: set flag to shut down */ /* SIGTERM: set flag to shut down */
static void static void
WalSndShutdownHandler(SIGNAL_ARGS) WalSndShutdownHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
walsender_shutdown_requested = true; walsender_shutdown_requested = true;
if (MyWalSnd) if (MyWalSnd)
SetLatch(&MyWalSnd->latch); SetLatch(&MyWalSnd->latch);
errno = save_errno;
} }
/* /*
...@@ -1299,16 +1307,24 @@ WalSndQuickDieHandler(SIGNAL_ARGS) ...@@ -1299,16 +1307,24 @@ WalSndQuickDieHandler(SIGNAL_ARGS)
static void static void
WalSndXLogSendHandler(SIGNAL_ARGS) WalSndXLogSendHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
latch_sigusr1_handler(); latch_sigusr1_handler();
errno = save_errno;
} }
/* SIGUSR2: set flag to do a last cycle and shut down afterwards */ /* SIGUSR2: set flag to do a last cycle and shut down afterwards */
static void static void
WalSndLastCycleHandler(SIGNAL_ARGS) WalSndLastCycleHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
walsender_ready_to_stop = true; walsender_ready_to_stop = true;
if (MyWalSnd) if (MyWalSnd)
SetLatch(&MyWalSnd->latch); SetLatch(&MyWalSnd->latch);
errno = save_errno;
} }
/* Set up signal handlers */ /* Set up signal handlers */
......
...@@ -196,9 +196,11 @@ InitProcGlobal(void) ...@@ -196,9 +196,11 @@ InitProcGlobal(void)
for (i = 0; i < TotalProcs; i++) for (i = 0; i < TotalProcs; i++)
{ {
/* Common initialization for all PGPROCs, regardless of type. */ /* Common initialization for all PGPROCs, regardless of type. */
/* Set up per-PGPROC semaphore, latch, and backendLock */
PGSemaphoreCreate(&(procs[i].sem)); PGSemaphoreCreate(&(procs[i].sem));
InitSharedLatch(&(procs[i].procLatch));
procs[i].backendLock = LWLockAssign(); procs[i].backendLock = LWLockAssign();
InitSharedLatch(&procs[i].waitLatch);
/* /*
* Newly created PGPROCs for normal backends or for autovacuum must * Newly created PGPROCs for normal backends or for autovacuum must
...@@ -300,8 +302,8 @@ InitProcess(void) ...@@ -300,8 +302,8 @@ InitProcess(void)
MarkPostmasterChildActive(); MarkPostmasterChildActive();
/* /*
* Initialize all fields of MyProc, except for the semaphore which was * Initialize all fields of MyProc, except for the semaphore and latch,
* prepared for us by InitProcGlobal. * which were prepared for us by InitProcGlobal.
*/ */
SHMQueueElemInit(&(MyProc->links)); SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK; MyProc->waitStatus = STATUS_OK;
...@@ -327,12 +329,17 @@ InitProcess(void) ...@@ -327,12 +329,17 @@ InitProcess(void)
SHMQueueInit(&(MyProc->myProcLocks[i])); SHMQueueInit(&(MyProc->myProcLocks[i]));
MyProc->recoveryConflictPending = false; MyProc->recoveryConflictPending = false;
/* Initialise for sync rep */ /* Initialize fields for sync rep */
MyProc->waitLSN.xlogid = 0; MyProc->waitLSN.xlogid = 0;
MyProc->waitLSN.xrecoff = 0; MyProc->waitLSN.xrecoff = 0;
MyProc->syncRepState = SYNC_REP_NOT_WAITING; MyProc->syncRepState = SYNC_REP_NOT_WAITING;
SHMQueueElemInit(&(MyProc->syncRepLinks)); SHMQueueElemInit(&(MyProc->syncRepLinks));
OwnLatch(&MyProc->waitLatch);
/*
* Acquire ownership of the PGPROC's latch, so that we can use WaitLatch.
* Note that there's no particular need to do ResetLatch here.
*/
OwnLatch(&MyProc->procLatch);
/* /*
* We might be reusing a semaphore that belonged to a failed process. So * We might be reusing a semaphore that belonged to a failed process. So
...@@ -373,7 +380,6 @@ InitProcessPhase2(void) ...@@ -373,7 +380,6 @@ InitProcessPhase2(void)
/* /*
* Arrange to clean that up at backend exit. * Arrange to clean that up at backend exit.
*/ */
on_shmem_exit(SyncRepCleanupAtProcExit, 0);
on_shmem_exit(RemoveProcFromArray, 0); on_shmem_exit(RemoveProcFromArray, 0);
} }
...@@ -448,8 +454,8 @@ InitAuxiliaryProcess(void) ...@@ -448,8 +454,8 @@ InitAuxiliaryProcess(void)
SpinLockRelease(ProcStructLock); SpinLockRelease(ProcStructLock);
/* /*
* Initialize all fields of MyProc, except for the semaphore which was * Initialize all fields of MyProc, except for the semaphore and latch,
* prepared for us by InitProcGlobal. * which were prepared for us by InitProcGlobal.
*/ */
SHMQueueElemInit(&(MyProc->links)); SHMQueueElemInit(&(MyProc->links));
MyProc->waitStatus = STATUS_OK; MyProc->waitStatus = STATUS_OK;
...@@ -469,6 +475,12 @@ InitAuxiliaryProcess(void) ...@@ -469,6 +475,12 @@ InitAuxiliaryProcess(void)
for (i = 0; i < NUM_LOCK_PARTITIONS; i++) for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
SHMQueueInit(&(MyProc->myProcLocks[i])); SHMQueueInit(&(MyProc->myProcLocks[i]));
/*
* Acquire ownership of the PGPROC's latch, so that we can use WaitLatch.
* Note that there's no particular need to do ResetLatch here.
*/
OwnLatch(&MyProc->procLatch);
/* /*
* We might be reusing a semaphore that belonged to a failed process. So * We might be reusing a semaphore that belonged to a failed process. So
* be careful and reinitialize its value here. (This is not strictly * be careful and reinitialize its value here. (This is not strictly
...@@ -671,6 +683,9 @@ ProcKill(int code, Datum arg) ...@@ -671,6 +683,9 @@ ProcKill(int code, Datum arg)
Assert(MyProc != NULL); Assert(MyProc != NULL);
/* Make sure we're out of the sync rep lists */
SyncRepCleanupAtProcExit();
/* /*
* Release any LW locks I am holding. There really shouldn't be any, but * Release any LW locks I am holding. There really shouldn't be any, but
* it's cheap to check again before we cut the knees off the LWLock * it's cheap to check again before we cut the knees off the LWLock
...@@ -678,6 +693,9 @@ ProcKill(int code, Datum arg) ...@@ -678,6 +693,9 @@ ProcKill(int code, Datum arg)
*/ */
LWLockReleaseAll(); LWLockReleaseAll();
/* Release ownership of the process's latch, too */
DisownLatch(&MyProc->procLatch);
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
/* Return PGPROC structure (and semaphore) to appropriate freelist */ /* Return PGPROC structure (and semaphore) to appropriate freelist */
...@@ -733,6 +751,9 @@ AuxiliaryProcKill(int code, Datum arg) ...@@ -733,6 +751,9 @@ AuxiliaryProcKill(int code, Datum arg)
/* Release any LW locks I am holding (see notes above) */ /* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll(); LWLockReleaseAll();
/* Release ownership of the process's latch, too */
DisownLatch(&MyProc->procLatch);
SpinLockAcquire(ProcStructLock); SpinLockAcquire(ProcStructLock);
/* Mark auxiliary proc no longer in use */ /* Mark auxiliary proc no longer in use */
...@@ -1610,6 +1631,10 @@ handle_sig_alarm(SIGNAL_ARGS) ...@@ -1610,6 +1631,10 @@ handle_sig_alarm(SIGNAL_ARGS)
{ {
int save_errno = errno; int save_errno = errno;
/* SIGALRM is cause for waking anything waiting on the process latch */
if (MyProc)
SetLatch(&MyProc->procLatch);
if (deadlock_timeout_active) if (deadlock_timeout_active)
{ {
deadlock_timeout_active = false; deadlock_timeout_active = false;
......
...@@ -2643,11 +2643,12 @@ die(SIGNAL_ARGS) ...@@ -2643,11 +2643,12 @@ die(SIGNAL_ARGS)
InterruptHoldoffCount--; InterruptHoldoffCount--;
ProcessInterrupts(); ProcessInterrupts();
} }
/* Interrupt any sync rep wait which is currently in progress. */
SetLatch(&(MyProc->waitLatch));
} }
/* If we're still here, waken anything waiting on the process latch */
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno; errno = save_errno;
} }
...@@ -2684,11 +2685,12 @@ StatementCancelHandler(SIGNAL_ARGS) ...@@ -2684,11 +2685,12 @@ StatementCancelHandler(SIGNAL_ARGS)
InterruptHoldoffCount--; InterruptHoldoffCount--;
ProcessInterrupts(); ProcessInterrupts();
} }
/* Interrupt any sync rep wait which is currently in progress. */
SetLatch(&(MyProc->waitLatch));
} }
/* If we're still here, waken anything waiting on the process latch */
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno; errno = save_errno;
} }
...@@ -2696,6 +2698,7 @@ StatementCancelHandler(SIGNAL_ARGS) ...@@ -2696,6 +2698,7 @@ StatementCancelHandler(SIGNAL_ARGS)
void void
FloatExceptionHandler(SIGNAL_ARGS) FloatExceptionHandler(SIGNAL_ARGS)
{ {
/* We're not returning, so no need to save errno */
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_FLOATING_POINT_EXCEPTION), (errcode(ERRCODE_FLOATING_POINT_EXCEPTION),
errmsg("floating-point exception"), errmsg("floating-point exception"),
...@@ -2708,7 +2711,13 @@ FloatExceptionHandler(SIGNAL_ARGS) ...@@ -2708,7 +2711,13 @@ FloatExceptionHandler(SIGNAL_ARGS)
static void static void
SigHupHandler(SIGNAL_ARGS) SigHupHandler(SIGNAL_ARGS)
{ {
int save_errno = errno;
got_SIGHUP = true; got_SIGHUP = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
} }
/* /*
......
...@@ -33,8 +33,8 @@ extern char *SyncRepStandbyNames; ...@@ -33,8 +33,8 @@ extern char *SyncRepStandbyNames;
/* called by user backend */ /* called by user backend */
extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN);
/* callback at backend exit */ /* called at backend exit */
extern void SyncRepCleanupAtProcExit(int code, Datum arg); extern void SyncRepCleanupAtProcExit(void);
/* called by wal sender */ /* called by wal sender */
extern void SyncRepInitConfig(void); extern void SyncRepInitConfig(void);
......
...@@ -82,6 +82,8 @@ struct PGPROC ...@@ -82,6 +82,8 @@ struct PGPROC
PGSemaphoreData sem; /* ONE semaphore to sleep on */ PGSemaphoreData sem; /* ONE semaphore to sleep on */
int waitStatus; /* STATUS_WAITING, STATUS_OK or STATUS_ERROR */ int waitStatus; /* STATUS_WAITING, STATUS_OK or STATUS_ERROR */
Latch procLatch; /* generic latch for process */
LocalTransactionId lxid; /* local id of top-level transaction currently LocalTransactionId lxid; /* local id of top-level transaction currently
* being executed by this proc, if running; * being executed by this proc, if running;
* else InvalidLocalTransactionId */ * else InvalidLocalTransactionId */
...@@ -132,7 +134,6 @@ struct PGPROC ...@@ -132,7 +134,6 @@ struct PGPROC
* syncRepState must not be touched except by owning process or WALSender. * syncRepState must not be touched except by owning process or WALSender.
* syncRepLinks used only while holding SyncRepLock. * syncRepLinks used only while holding SyncRepLock.
*/ */
Latch waitLatch; /* allow us to wait for sync rep */
XLogRecPtr waitLSN; /* waiting for this LSN or higher */ XLogRecPtr waitLSN; /* waiting for this LSN or higher */
int syncRepState; /* wait state for sync rep */ int syncRepState; /* wait state for sync rep */
SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment