Commit 4f85fde8 authored by Andres Freund's avatar Andres Freund

Introduce and use infrastructure for interrupt processing during client reads.

Up to now large swathes of backend code ran inside signal handlers
while reading commands from the client, to allow for speedy reaction to
asynchronous events. Most prominently shared invalidation and NOTIFY
handling. That means that complex code like the starting/stopping of
transactions is run in signal handlers...  The required code was
fragile and verbose, and is likely to contain bugs.

That approach also severely limited what could be done while
communicating with the client. As the read might be from within
openssl it wasn't safely possible to trigger an error, e.g. to cancel
a backend in idle-in-transaction state. We did that in some cases,
namely fatal errors, nonetheless.

Now that FE/BE communication in the backend employs non-blocking
sockets and latches to block, we can quite simply interrupt reads from
signal handlers by setting the latch. That allows us to signal an
interrupted read, which is supposed to be retried after returning from
within the ssl library.

As signal handlers now only need to set the latch to guarantee timely
interrupt processing, remove a fair amount of complicated & fragile
code from async.c and sinval.c.

We could now actually start to process some kinds of interrupts, like
sinval ones, more often that before, but that seems better done
separately.

This work will hopefully allow to handle cases like being blocked by
sending data, interrupting idle transactions and similar to be
implemented without too much effort.  In addition to allowing getting
rid of ImmediateInterruptOK, that is.

Author: Andres Freund
Reviewed-By: Heikki Linnakangas
parent 387da188
...@@ -79,10 +79,12 @@ ...@@ -79,10 +79,12 @@
* either, but just process the queue directly. * either, but just process the queue directly.
* *
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* can call inbound-notify processing immediately if this backend is idle * sets the process's latch, which triggers the event to be processed
* (ie, it is waiting for a frontend command and is not within a transaction * immediately if this backend is idle (i.e., it is waiting for a frontend
* block). Otherwise the handler may only set a flag, which will cause the * command and is not within a transaction block. C.f.
* processing to occur just before we next go idle. * ProcessClientReadInterrupt()). Otherwise the handler may only set a
* flag, which will cause the processing to occur just before we next go
* idle.
* *
* Inbound-notify processing consists of reading all of the notifications * Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification * that have arrived since scanning last time. We read every notification
...@@ -126,6 +128,7 @@ ...@@ -126,6 +128,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/procarray.h" #include "storage/procarray.h"
#include "storage/procsignal.h" #include "storage/procsignal.h"
#include "storage/sinval.h" #include "storage/sinval.h"
...@@ -334,17 +337,13 @@ static List *pendingNotifies = NIL; /* list of Notifications */ ...@@ -334,17 +337,13 @@ static List *pendingNotifies = NIL; /* list of Notifications */
static List *upperPendingNotifies = NIL; /* list of upper-xact lists */ static List *upperPendingNotifies = NIL; /* list of upper-xact lists */
/* /*
* State for inbound notifications consists of two flags: one saying whether * Inbound notifications are initially processed by HandleNotifyInterrupt(),
* the signal handler is currently allowed to call ProcessIncomingNotify * called from inside a signal handler. That just sets the
* directly, and one saying whether the signal has occurred but the handler * notifyInterruptPending flag and sets the process
* was not allowed to call ProcessIncomingNotify at the time. * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
* * actually deal with the interrupt.
* NB: the "volatile" on these declarations is critical! If your compiler
* does not grok "volatile", you'd be best advised to compile this file
* with all optimization turned off.
*/ */
static volatile sig_atomic_t notifyInterruptEnabled = 0; volatile sig_atomic_t notifyInterruptPending = false;
static volatile sig_atomic_t notifyInterruptOccurred = 0;
/* True if we've registered an on_shmem_exit cleanup */ /* True if we've registered an on_shmem_exit cleanup */
static bool unlistenExitRegistered = false; static bool unlistenExitRegistered = false;
...@@ -1625,164 +1624,45 @@ AtSubAbort_Notify(void) ...@@ -1625,164 +1624,45 @@ AtSubAbort_Notify(void)
/* /*
* HandleNotifyInterrupt * HandleNotifyInterrupt
* *
* This is called when PROCSIG_NOTIFY_INTERRUPT is received. * Signal handler portion of interrupt handling. Let the backend know
* * that there's a pending notify interrupt. If we're currently reading
* If we are idle (notifyInterruptEnabled is set), we can safely invoke * from the client, this will interrupt the read and
* ProcessIncomingNotify directly. Otherwise, just set a flag * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt().
* to do it later.
*/ */
void void
HandleNotifyInterrupt(void) HandleNotifyInterrupt(void)
{ {
/* /*
* Note: this is called by a SIGNAL HANDLER. You must be very wary what * Note: this is called by a SIGNAL HANDLER. You must be very wary what
* you do here. Some helpful soul had this routine sprinkled with * you do here.
* TPRINTFs, which would likely lead to corruption of stdio buffers if
* they were ever turned on.
*/ */
/* Don't joggle the elbow of proc_exit */ /* signal that work needs to be done */
if (proc_exit_inprogress) notifyInterruptPending = true;
return;
if (notifyInterruptEnabled)
{
bool save_ImmediateInterruptOK = ImmediateInterruptOK;
/*
* We may be called while ImmediateInterruptOK is true; turn it off
* while messing with the NOTIFY state. This prevents problems if
* SIGINT or similar arrives while we're working. Just to be real
* sure, bump the interrupt holdoff counter as well. That way, even
* if something inside ProcessIncomingNotify() transiently sets
* ImmediateInterruptOK (eg while waiting on a lock), we won't get
* interrupted until we're done with the notify interrupt.
*/
ImmediateInterruptOK = false;
HOLD_INTERRUPTS();
/*
* I'm not sure whether some flavors of Unix might allow another
* SIGUSR1 occurrence to recursively interrupt this routine. To cope
* with the possibility, we do the same sort of dance that
* EnableNotifyInterrupt must do --- see that routine for comments.
*/
notifyInterruptEnabled = 0; /* disable any recursive signal */
notifyInterruptOccurred = 1; /* do at least one iteration */
for (;;)
{
notifyInterruptEnabled = 1;
if (!notifyInterruptOccurred)
break;
notifyInterruptEnabled = 0;
if (notifyInterruptOccurred)
{
/* Here, it is finally safe to do stuff. */
if (Trace_notify)
elog(DEBUG1, "HandleNotifyInterrupt: perform async notify");
ProcessIncomingNotify();
if (Trace_notify)
elog(DEBUG1, "HandleNotifyInterrupt: done");
}
}
/* /* make sure the event is processed in due course */
* Restore the holdoff level and ImmediateInterruptOK, and check for SetLatch(MyLatch);
* interrupts if needed.
*/
RESUME_INTERRUPTS();
ImmediateInterruptOK = save_ImmediateInterruptOK;
if (save_ImmediateInterruptOK)
CHECK_FOR_INTERRUPTS();
}
else
{
/*
* In this path it is NOT SAFE to do much of anything, except this:
*/
notifyInterruptOccurred = 1;
}
} }
/* /*
* EnableNotifyInterrupt * ProcessNotifyInterrupt
* *
* This is called by the PostgresMain main loop just before waiting * This is called just after waiting for a frontend command. If a
* for a frontend command. If we are truly idle (ie, *not* inside * interrupt arrives (via HandleNotifyInterrupt()) while reading, the
* a transaction block), then process any pending inbound notifies, * read will be interrupted via the process's latch, and this routine
* and enable the signal handler to process future notifies directly. * will get called. If we are truly idle (ie, *not* inside a transaction
* * block), process the incoming notifies.
* NOTE: the signal handler starts out disabled, and stays so until
* PostgresMain calls this the first time.
*/ */
void void
EnableNotifyInterrupt(void) ProcessNotifyInterrupt(void)
{ {
if (IsTransactionOrTransactionBlock()) if (IsTransactionOrTransactionBlock())
return; /* not really idle */ return; /* not really idle */
/* while (notifyInterruptPending)
* This code is tricky because we are communicating with a signal handler
* that could interrupt us at any point. If we just checked
* notifyInterruptOccurred and then set notifyInterruptEnabled, we could
* fail to respond promptly to a signal that happens in between those two
* steps. (A very small time window, perhaps, but Murphy's Law says you
* can hit it...) Instead, we first set the enable flag, then test the
* occurred flag. If we see an unserviced interrupt has occurred, we
* re-clear the enable flag before going off to do the service work. (That
* prevents re-entrant invocation of ProcessIncomingNotify() if another
* interrupt occurs.) If an interrupt comes in between the setting and
* clearing of notifyInterruptEnabled, then it will have done the service
* work and left notifyInterruptOccurred zero, so we have to check again
* after clearing enable. The whole thing has to be in a loop in case
* another interrupt occurs while we're servicing the first. Once we get
* out of the loop, enable is set and we know there is no unserviced
* interrupt.
*
* NB: an overenthusiastic optimizing compiler could easily break this
* code. Hopefully, they all understand what "volatile" means these days.
*/
for (;;)
{
notifyInterruptEnabled = 1;
if (!notifyInterruptOccurred)
break;
notifyInterruptEnabled = 0;
if (notifyInterruptOccurred)
{
if (Trace_notify)
elog(DEBUG1, "EnableNotifyInterrupt: perform async notify");
ProcessIncomingNotify(); ProcessIncomingNotify();
if (Trace_notify)
elog(DEBUG1, "EnableNotifyInterrupt: done");
}
}
} }
/*
* DisableNotifyInterrupt
*
* This is called by the PostgresMain main loop just after receiving
* a frontend command. Signal handler execution of inbound notifies
* is disabled until the next EnableNotifyInterrupt call.
*
* The PROCSIG_CATCHUP_INTERRUPT signal handler also needs to call this,
* so as to prevent conflicts if one signal interrupts the other. So we
* must return the previous state of the flag.
*/
bool
DisableNotifyInterrupt(void)
{
bool result = (notifyInterruptEnabled != 0);
notifyInterruptEnabled = 0;
return result;
}
/* /*
* Read all pending notifications from the queue, and deliver appropriate * Read all pending notifications from the queue, and deliver appropriate
...@@ -2076,9 +1956,10 @@ asyncQueueAdvanceTail(void) ...@@ -2076,9 +1956,10 @@ asyncQueueAdvanceTail(void)
/* /*
* ProcessIncomingNotify * ProcessIncomingNotify
* *
* Deal with arriving NOTIFYs from other backends. * Deal with arriving NOTIFYs from other backends as soon as it's safe to
* This is called either directly from the PROCSIG_NOTIFY_INTERRUPT * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
* signal handler, or the next time control reaches the outer idle loop. * signal handler, but isn't anymore.
*
* Scan the queue for arriving notifications and report them to my front * Scan the queue for arriving notifications and report them to my front
* end. * end.
* *
...@@ -2087,18 +1968,13 @@ asyncQueueAdvanceTail(void) ...@@ -2087,18 +1968,13 @@ asyncQueueAdvanceTail(void)
static void static void
ProcessIncomingNotify(void) ProcessIncomingNotify(void)
{ {
bool catchup_enabled;
/* We *must* reset the flag */ /* We *must* reset the flag */
notifyInterruptOccurred = 0; notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */ /* Do nothing else if we aren't actively listening */
if (listenChannels == NIL) if (listenChannels == NIL)
return; return;
/* Must prevent catchup interrupt while I am running */
catchup_enabled = DisableCatchupInterrupt();
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "ProcessIncomingNotify"); elog(DEBUG1, "ProcessIncomingNotify");
...@@ -2123,9 +1999,6 @@ ProcessIncomingNotify(void) ...@@ -2123,9 +1999,6 @@ ProcessIncomingNotify(void)
if (Trace_notify) if (Trace_notify)
elog(DEBUG1, "ProcessIncomingNotify: done"); elog(DEBUG1, "ProcessIncomingNotify: done");
if (catchup_enabled)
EnableCatchupInterrupt();
} }
/* /*
......
...@@ -511,6 +511,7 @@ be_tls_read(Port *port, void *ptr, size_t len) ...@@ -511,6 +511,7 @@ be_tls_read(Port *port, void *ptr, size_t len)
ssize_t n; ssize_t n;
int err; int err;
int waitfor; int waitfor;
int latchret;
rloop: rloop:
errno = 0; errno = 0;
...@@ -531,12 +532,29 @@ rloop: ...@@ -531,12 +532,29 @@ rloop:
break; break;
} }
waitfor = WL_LATCH_SET;
if (err == SSL_ERROR_WANT_READ) if (err == SSL_ERROR_WANT_READ)
waitfor = WL_SOCKET_READABLE; waitfor |= WL_SOCKET_READABLE;
else else
waitfor = WL_SOCKET_WRITEABLE; waitfor |= WL_SOCKET_WRITEABLE;
WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0); latchret = WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0);
/*
* We'll, among other situations, get here if the low level
* routine doing the actual recv() via the socket got interrupted
* by a signal. That's so we can handle interrupts once outside
* openssl, so we don't jump out from underneath its covers. We
* can check this both, when reading and writing, because even
* when writing that's just openssl's doing, not a 'proper' write
* initiated by postgres.
*/
if (latchret & WL_LATCH_SET)
{
ResetLatch(MyLatch);
ProcessClientReadInterrupt(); /* preserves errno */
}
goto rloop; goto rloop;
case SSL_ERROR_SYSCALL: case SSL_ERROR_SYSCALL:
/* leave it to caller to ereport the value of errno */ /* leave it to caller to ereport the value of errno */
...@@ -647,6 +665,10 @@ wloop: ...@@ -647,6 +665,10 @@ wloop:
waitfor = WL_SOCKET_WRITEABLE; waitfor = WL_SOCKET_WRITEABLE;
WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0); WaitLatchOrSocket(MyLatch, waitfor, port->sock, 0);
/*
* XXX: We'll, at some later point, likely want to add interrupt
* processing here.
*/
goto wloop; goto wloop;
case SSL_ERROR_SYSCALL: case SSL_ERROR_SYSCALL:
/* leave it to caller to ereport the value of errno */ /* leave it to caller to ereport the value of errno */
......
...@@ -128,6 +128,7 @@ secure_read(Port *port, void *ptr, size_t len) ...@@ -128,6 +128,7 @@ secure_read(Port *port, void *ptr, size_t len)
{ {
ssize_t n; ssize_t n;
retry:
#ifdef USE_SSL #ifdef USE_SSL
if (port->ssl_in_use) if (port->ssl_in_use)
{ {
...@@ -139,6 +140,14 @@ secure_read(Port *port, void *ptr, size_t len) ...@@ -139,6 +140,14 @@ secure_read(Port *port, void *ptr, size_t len)
n = secure_raw_read(port, ptr, len); n = secure_raw_read(port, ptr, len);
} }
/* Process interrupts that happened while (or before) receiving. */
ProcessClientReadInterrupt(); /* preserves errno */
/* retry after processing interrupts */
if (n < 0 && errno == EINTR)
{
goto retry;
}
return n; return n;
} }
...@@ -147,8 +156,6 @@ secure_raw_read(Port *port, void *ptr, size_t len) ...@@ -147,8 +156,6 @@ secure_raw_read(Port *port, void *ptr, size_t len)
{ {
ssize_t n; ssize_t n;
prepare_for_client_read();
/* /*
* Try to read from the socket without blocking. If it succeeds we're * Try to read from the socket without blocking. If it succeeds we're
* done, otherwise we'll wait for the socket using the latch mechanism. * done, otherwise we'll wait for the socket using the latch mechanism.
...@@ -168,10 +175,20 @@ rloop: ...@@ -168,10 +175,20 @@ rloop:
int save_errno = errno; int save_errno = errno;
w = WaitLatchOrSocket(MyLatch, w = WaitLatchOrSocket(MyLatch,
WL_SOCKET_READABLE, WL_LATCH_SET | WL_SOCKET_READABLE,
port->sock, 0); port->sock, 0);
if (w & WL_SOCKET_READABLE) if (w & WL_LATCH_SET)
{
ResetLatch(MyLatch);
/*
* Force a return, so interrupts can be processed when not
* (possibly) underneath a ssl library.
*/
errno = EINTR;
return -1;
}
else if (w & WL_SOCKET_READABLE)
{ {
goto rloop; goto rloop;
} }
...@@ -183,8 +200,6 @@ rloop: ...@@ -183,8 +200,6 @@ rloop:
errno = save_errno; errno = save_errno;
} }
client_read_ended();
return n; return n;
} }
...@@ -197,6 +212,7 @@ secure_write(Port *port, void *ptr, size_t len) ...@@ -197,6 +212,7 @@ secure_write(Port *port, void *ptr, size_t len)
{ {
ssize_t n; ssize_t n;
retry:
#ifdef USE_SSL #ifdef USE_SSL
if (port->ssl_in_use) if (port->ssl_in_use)
{ {
...@@ -208,6 +224,21 @@ secure_write(Port *port, void *ptr, size_t len) ...@@ -208,6 +224,21 @@ secure_write(Port *port, void *ptr, size_t len)
n = secure_raw_write(port, ptr, len); n = secure_raw_write(port, ptr, len);
} }
/*
* XXX: We'll, at some later point, likely want to add interrupt
* processing here.
*/
/*
* Retry after processing interrupts. This can be triggered even though we
* don't check for latch set's during writing yet, because SSL
* renegotiations might have required reading from the socket.
*/
if (n < 0 && errno == EINTR)
{
goto retry;
}
return n; return n;
} }
......
...@@ -582,9 +582,6 @@ AutoVacLauncherMain(int argc, char *argv[]) ...@@ -582,9 +582,6 @@ AutoVacLauncherMain(int argc, char *argv[])
launcher_determine_sleep(!dlist_is_empty(&AutoVacuumShmem->av_freeWorkers), launcher_determine_sleep(!dlist_is_empty(&AutoVacuumShmem->av_freeWorkers),
false, &nap); false, &nap);
/* Allow sinval catchup interrupts while sleeping */
EnableCatchupInterrupt();
/* /*
* Wait until naptime expires or we get some type of signal (all the * Wait until naptime expires or we get some type of signal (all the
* signal handlers will wake us by calling SetLatch). * signal handlers will wake us by calling SetLatch).
...@@ -595,7 +592,8 @@ AutoVacLauncherMain(int argc, char *argv[]) ...@@ -595,7 +592,8 @@ AutoVacLauncherMain(int argc, char *argv[])
ResetLatch(MyLatch); ResetLatch(MyLatch);
DisableCatchupInterrupt(); /* Process sinval catchup interrupts that happened while sleeping */
ProcessCatchupInterrupt();
/* /*
* Emergency bailout if postmaster has died. This is to avoid the * Emergency bailout if postmaster has died. This is to avoid the
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "commands/async.h" #include "commands/async.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/ipc.h" #include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/sinvaladt.h" #include "storage/sinvaladt.h"
#include "utils/inval.h" #include "utils/inval.h"
...@@ -32,19 +33,12 @@ uint64 SharedInvalidMessageCounter; ...@@ -32,19 +33,12 @@ uint64 SharedInvalidMessageCounter;
* through a cache reset exercise. This is done by sending * through a cache reset exercise. This is done by sending
* PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind. * PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far behind.
* *
* State for catchup events consists of two flags: one saying whether * The signal handler will set a interrupt pending flag and will set the
* the signal handler is currently allowed to call ProcessCatchupEvent * processes latch. Whenever starting to read from the client, or when
* directly, and one saying whether the signal has occurred but the handler * interrupted while doing so, ProcessClientReadInterrupt() will call
* was not allowed to call ProcessCatchupEvent at the time. * ProcessCatchupEvent().
*
* NB: the "volatile" on these declarations is critical! If your compiler
* does not grok "volatile", you'd be best advised to compile this file
* with all optimization turned off.
*/ */
static volatile int catchupInterruptEnabled = 0; volatile sig_atomic_t catchupInterruptPending = false;
static volatile int catchupInterruptOccurred = 0;
static void ProcessCatchupEvent(void);
/* /*
...@@ -141,9 +135,9 @@ ReceiveSharedInvalidMessages( ...@@ -141,9 +135,9 @@ ReceiveSharedInvalidMessages(
* catchup signal this way avoids creating spikes in system load for what * catchup signal this way avoids creating spikes in system load for what
* should be just a background maintenance activity. * should be just a background maintenance activity.
*/ */
if (catchupInterruptOccurred) if (catchupInterruptPending)
{ {
catchupInterruptOccurred = 0; catchupInterruptPending = false;
elog(DEBUG4, "sinval catchup complete, cleaning queue"); elog(DEBUG4, "sinval catchup complete, cleaning queue");
SICleanupQueue(false, 0); SICleanupQueue(false, 0);
} }
...@@ -155,12 +149,9 @@ ReceiveSharedInvalidMessages( ...@@ -155,12 +149,9 @@ ReceiveSharedInvalidMessages(
* *
* This is called when PROCSIG_CATCHUP_INTERRUPT is received. * This is called when PROCSIG_CATCHUP_INTERRUPT is received.
* *
* If we are idle (catchupInterruptEnabled is set), we can safely * We used to directly call ProcessCatchupEvent directly when idle. These days
* invoke ProcessCatchupEvent directly. Otherwise, just set a flag * we just set a flag to do it later and notify the process of that fact by
* to do it later. (Note that it's quite possible for normal processing * setting the process's latch.
* of the current transaction to cause ReceiveSharedInvalidMessages()
* to be run later on; in that case the flag will get cleared again,
* since there's no longer any reason to do anything.)
*/ */
void void
HandleCatchupInterrupt(void) HandleCatchupInterrupt(void)
...@@ -170,155 +161,29 @@ HandleCatchupInterrupt(void) ...@@ -170,155 +161,29 @@ HandleCatchupInterrupt(void)
* you do here. * you do here.
*/ */
/* Don't joggle the elbow of proc_exit */ catchupInterruptPending = true;
if (proc_exit_inprogress)
return;
if (catchupInterruptEnabled)
{
bool save_ImmediateInterruptOK = ImmediateInterruptOK;
/*
* We may be called while ImmediateInterruptOK is true; turn it off
* while messing with the catchup state. This prevents problems if
* SIGINT or similar arrives while we're working. Just to be real
* sure, bump the interrupt holdoff counter as well. That way, even
* if something inside ProcessCatchupEvent() transiently sets
* ImmediateInterruptOK (eg while waiting on a lock), we won't get
* interrupted until we're done with the catchup interrupt.
*/
ImmediateInterruptOK = false;
HOLD_INTERRUPTS();
/*
* I'm not sure whether some flavors of Unix might allow another
* SIGUSR1 occurrence to recursively interrupt this routine. To cope
* with the possibility, we do the same sort of dance that
* EnableCatchupInterrupt must do --- see that routine for comments.
*/
catchupInterruptEnabled = 0; /* disable any recursive signal */
catchupInterruptOccurred = 1; /* do at least one iteration */
for (;;)
{
catchupInterruptEnabled = 1;
if (!catchupInterruptOccurred)
break;
catchupInterruptEnabled = 0;
if (catchupInterruptOccurred)
{
/* Here, it is finally safe to do stuff. */
ProcessCatchupEvent();
}
}
/* /* make sure the event is processed in due course */
* Restore the holdoff level and ImmediateInterruptOK, and check for SetLatch(MyLatch);
* interrupts if needed.
*/
RESUME_INTERRUPTS();
ImmediateInterruptOK = save_ImmediateInterruptOK;
if (save_ImmediateInterruptOK)
CHECK_FOR_INTERRUPTS();
}
else
{
/*
* In this path it is NOT SAFE to do much of anything, except this:
*/
catchupInterruptOccurred = 1;
}
} }
/* /*
* EnableCatchupInterrupt * ProcessCatchupInterrupt
*
* This is called by the PostgresMain main loop just before waiting
* for a frontend command. We process any pending catchup events,
* and enable the signal handler to process future events directly.
* *
* NOTE: the signal handler starts out disabled, and stays so until * The portion of catchup interrupt handling that runs outside of the signal
* PostgresMain calls this the first time. * handler, which allows it to actually process pending invalidations.
*/ */
void void
EnableCatchupInterrupt(void) ProcessCatchupInterrupt(void)
{ {
/* while (catchupInterruptPending)
* This code is tricky because we are communicating with a signal handler
* that could interrupt us at any point. If we just checked
* catchupInterruptOccurred and then set catchupInterruptEnabled, we could
* fail to respond promptly to a signal that happens in between those two
* steps. (A very small time window, perhaps, but Murphy's Law says you
* can hit it...) Instead, we first set the enable flag, then test the
* occurred flag. If we see an unserviced interrupt has occurred, we
* re-clear the enable flag before going off to do the service work. (That
* prevents re-entrant invocation of ProcessCatchupEvent() if another
* interrupt occurs.) If an interrupt comes in between the setting and
* clearing of catchupInterruptEnabled, then it will have done the service
* work and left catchupInterruptOccurred zero, so we have to check again
* after clearing enable. The whole thing has to be in a loop in case
* another interrupt occurs while we're servicing the first. Once we get
* out of the loop, enable is set and we know there is no unserviced
* interrupt.
*
* NB: an overenthusiastic optimizing compiler could easily break this
* code. Hopefully, they all understand what "volatile" means these days.
*/
for (;;)
{ {
catchupInterruptEnabled = 1;
if (!catchupInterruptOccurred)
break;
catchupInterruptEnabled = 0;
if (catchupInterruptOccurred)
ProcessCatchupEvent();
}
}
/*
* DisableCatchupInterrupt
*
* This is called by the PostgresMain main loop just after receiving
* a frontend command. Signal handler execution of catchup events
* is disabled until the next EnableCatchupInterrupt call.
*
* The PROCSIG_NOTIFY_INTERRUPT signal handler also needs to call this,
* so as to prevent conflicts if one signal interrupts the other. So we
* must return the previous state of the flag.
*/
bool
DisableCatchupInterrupt(void)
{
bool result = (catchupInterruptEnabled != 0);
catchupInterruptEnabled = 0;
return result;
}
/*
* ProcessCatchupEvent
*
* Respond to a catchup event (PROCSIG_CATCHUP_INTERRUPT) from another
* backend.
*
* This is called either directly from the PROCSIG_CATCHUP_INTERRUPT
* signal handler, or the next time control reaches the outer idle loop
* (assuming there's still anything to do by then).
*/
static void
ProcessCatchupEvent(void)
{
bool notify_enabled;
/* Must prevent notify interrupt while I am running */
notify_enabled = DisableNotifyInterrupt();
/* /*
* What we need to do here is cause ReceiveSharedInvalidMessages() to run, * What we need to do here is cause ReceiveSharedInvalidMessages() to
* which will do the necessary work and also reset the * run, which will do the necessary work and also reset the
* catchupInterruptOccurred flag. If we are inside a transaction we can * catchupInterruptPending flag. If we are inside a transaction we
* just call AcceptInvalidationMessages() to do this. If we aren't, we * can just call AcceptInvalidationMessages() to do this. If we
* start and immediately end a transaction; the call to * aren't, we start and immediately end a transaction; the call to
* AcceptInvalidationMessages() happens down inside transaction start. * AcceptInvalidationMessages() happens down inside transaction start.
* *
* It is awfully tempting to just call AcceptInvalidationMessages() * It is awfully tempting to just call AcceptInvalidationMessages()
...@@ -337,7 +202,5 @@ ProcessCatchupEvent(void) ...@@ -337,7 +202,5 @@ ProcessCatchupEvent(void)
StartTransactionCommand(); StartTransactionCommand();
CommitTransactionCommand(); CommitTransactionCommand();
} }
}
if (notify_enabled)
EnableNotifyInterrupt();
} }
...@@ -301,17 +301,25 @@ InteractiveBackend(StringInfo inBuf) ...@@ -301,17 +301,25 @@ InteractiveBackend(StringInfo inBuf)
* interactive_getc -- collect one character from stdin * interactive_getc -- collect one character from stdin
* *
* Even though we are not reading from a "client" process, we still want to * Even though we are not reading from a "client" process, we still want to
* respond to signals, particularly SIGTERM/SIGQUIT. Hence we must use * respond to signals, particularly SIGTERM/SIGQUIT.
* prepare_for_client_read and client_read_ended.
*/ */
static int static int
interactive_getc(void) interactive_getc(void)
{ {
int c; int c;
prepare_for_client_read(); /*
* This will not process catchup interrupts or notifications while
* reading. But those can't really be relevant for a standalone backend
* anyway. To properly handle SIGTERM there's a hack in die() that
* directly processes interrupts at this stage...
*/
CHECK_FOR_INTERRUPTS();
c = getc(stdin); c = getc(stdin);
client_read_ended();
ProcessClientReadInterrupt();
return c; return c;
} }
...@@ -513,53 +521,33 @@ ReadCommand(StringInfo inBuf) ...@@ -513,53 +521,33 @@ ReadCommand(StringInfo inBuf)
} }
/* /*
* prepare_for_client_read -- set up to possibly block on client input * ProcessClientReadInterrupt() - Process interrupts specific to client reads
* *
* This must be called immediately before any low-level read from the * This is called just after low-level reads. That might be after the read
* client connection. It is necessary to do it at a sufficiently low level * finished successfully, or it was interrupted via interrupt.
* that there won't be any other operations except the read kernel call
* itself between this call and the subsequent client_read_ended() call.
* In particular there mustn't be use of malloc() or other potentially
* non-reentrant libc functions. This restriction makes it safe for us
* to allow interrupt service routines to execute nontrivial code while
* we are waiting for input.
*/
void
prepare_for_client_read(void)
{
if (DoingCommandRead)
{
/* Enable immediate processing of asynchronous signals */
EnableNotifyInterrupt();
EnableCatchupInterrupt();
/* Allow die interrupts to be processed while waiting */
ImmediateInterruptOK = true;
/* And don't forget to detect one that already arrived */
CHECK_FOR_INTERRUPTS();
}
}
/*
* client_read_ended -- get out of the client-input state
* *
* This is called just after low-level reads. It must preserve errno! * Must preserve errno!
*/ */
void void
client_read_ended(void) ProcessClientReadInterrupt(void)
{ {
int save_errno = errno;
if (DoingCommandRead) if (DoingCommandRead)
{ {
int save_errno = errno; /* Check for general interrupts that arrived while reading */
CHECK_FOR_INTERRUPTS();
ImmediateInterruptOK = false; /* Process sinval catchup interrupts that happened while reading */
if (catchupInterruptPending)
ProcessCatchupInterrupt();
DisableNotifyInterrupt(); /* Process sinval catchup interrupts that happened while reading */
DisableCatchupInterrupt(); if (notifyInterruptPending)
ProcessNotifyInterrupt();
}
errno = save_errno; errno = save_errno;
}
} }
...@@ -2626,6 +2614,15 @@ die(SIGNAL_ARGS) ...@@ -2626,6 +2614,15 @@ die(SIGNAL_ARGS)
/* If we're still here, waken anything waiting on the process latch */ /* If we're still here, waken anything waiting on the process latch */
SetLatch(MyLatch); SetLatch(MyLatch);
/*
* If we're in single user mode, we want to quit immediately - we can't
* rely on latches as they wouldn't work when stdin/stdout is a
* file. Rather ugly, but it's unlikely to be worthwhile to invest much
* more effort just for the benefit of single user mode.
*/
if (DoingCommandRead && whereToSendOutput != DestRemote)
ProcessInterrupts();
errno = save_errno; errno = save_errno;
} }
...@@ -2834,8 +2831,6 @@ ProcessInterrupts(void) ...@@ -2834,8 +2831,6 @@ ProcessInterrupts(void)
QueryCancelPending = false; /* ProcDie trumps QueryCancel */ QueryCancelPending = false; /* ProcDie trumps QueryCancel */
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
/* As in quickdie, don't risk sending to client during auth */ /* As in quickdie, don't risk sending to client during auth */
if (ClientAuthInProgress && whereToSendOutput == DestRemote) if (ClientAuthInProgress && whereToSendOutput == DestRemote)
whereToSendOutput = DestNone; whereToSendOutput = DestNone;
...@@ -2871,8 +2866,6 @@ ProcessInterrupts(void) ...@@ -2871,8 +2866,6 @@ ProcessInterrupts(void)
QueryCancelPending = false; /* lost connection trumps QueryCancel */ QueryCancelPending = false; /* lost connection trumps QueryCancel */
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
/* don't send to client, we already know the connection to be dead. */ /* don't send to client, we already know the connection to be dead. */
whereToSendOutput = DestNone; whereToSendOutput = DestNone;
ereport(FATAL, ereport(FATAL,
...@@ -2892,8 +2885,6 @@ ProcessInterrupts(void) ...@@ -2892,8 +2885,6 @@ ProcessInterrupts(void)
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
RecoveryConflictPending = false; RecoveryConflictPending = false;
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
pgstat_report_recovery_conflict(RecoveryConflictReason); pgstat_report_recovery_conflict(RecoveryConflictReason);
ereport(FATAL, ereport(FATAL,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
...@@ -2926,8 +2917,6 @@ ProcessInterrupts(void) ...@@ -2926,8 +2917,6 @@ ProcessInterrupts(void)
{ {
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
/* As in quickdie, don't risk sending to client during auth */ /* As in quickdie, don't risk sending to client during auth */
if (whereToSendOutput == DestRemote) if (whereToSendOutput == DestRemote)
whereToSendOutput = DestNone; whereToSendOutput = DestNone;
...@@ -2945,8 +2934,6 @@ ProcessInterrupts(void) ...@@ -2945,8 +2934,6 @@ ProcessInterrupts(void)
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
(void) get_timeout_indicator(STATEMENT_TIMEOUT, true); (void) get_timeout_indicator(STATEMENT_TIMEOUT, true);
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE), (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("canceling statement due to lock timeout"))); errmsg("canceling statement due to lock timeout")));
...@@ -2955,8 +2942,6 @@ ProcessInterrupts(void) ...@@ -2955,8 +2942,6 @@ ProcessInterrupts(void)
{ {
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED), (errcode(ERRCODE_QUERY_CANCELED),
errmsg("canceling statement due to statement timeout"))); errmsg("canceling statement due to statement timeout")));
...@@ -2965,8 +2950,6 @@ ProcessInterrupts(void) ...@@ -2965,8 +2950,6 @@ ProcessInterrupts(void)
{ {
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED), (errcode(ERRCODE_QUERY_CANCELED),
errmsg("canceling autovacuum task"))); errmsg("canceling autovacuum task")));
...@@ -2976,8 +2959,6 @@ ProcessInterrupts(void) ...@@ -2976,8 +2959,6 @@ ProcessInterrupts(void)
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
RecoveryConflictPending = false; RecoveryConflictPending = false;
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
pgstat_report_recovery_conflict(RecoveryConflictReason); pgstat_report_recovery_conflict(RecoveryConflictReason);
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
...@@ -2994,13 +2975,12 @@ ProcessInterrupts(void) ...@@ -2994,13 +2975,12 @@ ProcessInterrupts(void)
{ {
ImmediateInterruptOK = false; /* not idle anymore */ ImmediateInterruptOK = false; /* not idle anymore */
LockErrorCleanup(); LockErrorCleanup();
DisableNotifyInterrupt();
DisableCatchupInterrupt();
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_QUERY_CANCELED), (errcode(ERRCODE_QUERY_CANCELED),
errmsg("canceling statement due to user request"))); errmsg("canceling statement due to user request")));
} }
} }
/* If we get here, do nothing (probably, QueryCancelPending was reset) */ /* If we get here, do nothing (probably, QueryCancelPending was reset) */
} }
...@@ -3843,14 +3823,8 @@ PostgresMain(int argc, char *argv[], ...@@ -3843,14 +3823,8 @@ PostgresMain(int argc, char *argv[],
disable_all_timeouts(false); disable_all_timeouts(false);
QueryCancelPending = false; /* second to avoid race condition */ QueryCancelPending = false; /* second to avoid race condition */
/* /* Not reading from the client anymore. */
* Turn off these interrupts too. This is only needed here and not in
* other exception-catching places since these interrupts are only
* enabled while we wait for client input.
*/
DoingCommandRead = false; DoingCommandRead = false;
DisableNotifyInterrupt();
DisableCatchupInterrupt();
/* Make sure libpq is in a good state */ /* Make sure libpq is in a good state */
pq_comm_reset(); pq_comm_reset();
......
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
#ifndef ASYNC_H #ifndef ASYNC_H
#define ASYNC_H #define ASYNC_H
#include <signal.h>
#include "fmgr.h" #include "fmgr.h"
/* /*
...@@ -21,6 +23,7 @@ ...@@ -21,6 +23,7 @@
#define NUM_ASYNC_BUFFERS 8 #define NUM_ASYNC_BUFFERS 8
extern bool Trace_notify; extern bool Trace_notify;
extern volatile sig_atomic_t notifyInterruptPending;
extern Size AsyncShmemSize(void); extern Size AsyncShmemSize(void);
extern void AsyncShmemInit(void); extern void AsyncShmemInit(void);
...@@ -48,12 +51,7 @@ extern void ProcessCompletedNotifies(void); ...@@ -48,12 +51,7 @@ extern void ProcessCompletedNotifies(void);
/* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */ /* signal handler for inbound notifies (PROCSIG_NOTIFY_INTERRUPT) */
extern void HandleNotifyInterrupt(void); extern void HandleNotifyInterrupt(void);
/* /* process interrupts */
* enable/disable processing of inbound notifies directly from signal handler. extern void ProcessNotifyInterrupt(void);
* The enable routine first performs processing of any inbound notifies that
* have occurred since the last disable.
*/
extern void EnableNotifyInterrupt(void);
extern bool DisableNotifyInterrupt(void);
#endif /* ASYNC_H */ #endif /* ASYNC_H */
...@@ -14,8 +14,9 @@ ...@@ -14,8 +14,9 @@
#ifndef SINVAL_H #ifndef SINVAL_H
#define SINVAL_H #define SINVAL_H
#include "storage/relfilenode.h" #include <signal.h>
#include "storage/relfilenode.h"
/* /*
* We support several types of shared-invalidation messages: * We support several types of shared-invalidation messages:
...@@ -123,6 +124,7 @@ typedef union ...@@ -123,6 +124,7 @@ typedef union
/* Counter of messages processed; don't worry about overflow. */ /* Counter of messages processed; don't worry about overflow. */
extern uint64 SharedInvalidMessageCounter; extern uint64 SharedInvalidMessageCounter;
extern volatile sig_atomic_t catchupInterruptPending;
extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, extern void SendSharedInvalidMessages(const SharedInvalidationMessage *msgs,
int n); int n);
...@@ -138,8 +140,7 @@ extern void HandleCatchupInterrupt(void); ...@@ -138,8 +140,7 @@ extern void HandleCatchupInterrupt(void);
* The enable routine first performs processing of any catchup events that * The enable routine first performs processing of any catchup events that
* have occurred since the last disable. * have occurred since the last disable.
*/ */
extern void EnableCatchupInterrupt(void); extern void ProcessCatchupInterrupt(void);
extern bool DisableCatchupInterrupt(void);
extern int xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs, extern int xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
bool *RelcacheInitFileInval); bool *RelcacheInitFileInval);
......
...@@ -67,8 +67,8 @@ extern void StatementCancelHandler(SIGNAL_ARGS); ...@@ -67,8 +67,8 @@ extern void StatementCancelHandler(SIGNAL_ARGS);
extern void FloatExceptionHandler(SIGNAL_ARGS) __attribute__((noreturn)); extern void FloatExceptionHandler(SIGNAL_ARGS) __attribute__((noreturn));
extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from SIGUSR1 extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from SIGUSR1
* handler */ * handler */
extern void prepare_for_client_read(void); extern void ProcessClientReadInterrupt(void);
extern void client_read_ended(void);
extern void process_postgres_switches(int argc, char *argv[], extern void process_postgres_switches(int argc, char *argv[],
GucContext ctx, const char **dbname); GucContext ctx, const char **dbname);
extern void PostgresMain(int argc, char *argv[], extern void PostgresMain(int argc, char *argv[],
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment