Commit 6a2a70a0 authored by Thomas Munro's avatar Thomas Munro

Use signalfd(2) for epoll latches.

Cut down on system calls and other overheads by reading from a signalfd
instead of using a signal handler and self-pipe.  Affects Linux sytems,
and possibly others including illumos that implement the Linux epoll and
signalfd interfaces.
Reviewed-by: default avatarAndres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA+hUKGJjxPDpzBE0a3hyUywBvaZuC89yx3jK9RFZgfv_KHU7gg@mail.gmail.com
parent 83709a0d
...@@ -35,13 +35,15 @@ sigset_t UnBlockSig, ...@@ -35,13 +35,15 @@ sigset_t UnBlockSig,
* collection; it's essentially BlockSig minus SIGTERM, SIGQUIT, SIGALRM. * collection; it's essentially BlockSig minus SIGTERM, SIGQUIT, SIGALRM.
* *
* UnBlockSig is the set of signals to block when we don't want to block * UnBlockSig is the set of signals to block when we don't want to block
* signals (is this ever nonzero??) * signals.
*/ */
void void
pqinitmask(void) pqinitmask(void)
{ {
sigemptyset(&UnBlockSig); sigemptyset(&UnBlockSig);
/* Note: InitializeLatchSupport() modifies UnBlockSig. */
/* First set all signals, then clear some. */ /* First set all signals, then clear some. */
sigfillset(&BlockSig); sigfillset(&BlockSig);
sigfillset(&StartupBlockSig); sigfillset(&StartupBlockSig);
......
...@@ -3,21 +3,20 @@ ...@@ -3,21 +3,20 @@
* latch.c * latch.c
* Routines for inter-process latches * Routines for inter-process latches
* *
* The Unix implementation uses the so-called self-pipe trick to overcome the * The poll() implementation uses the so-called self-pipe trick to overcome the
* race condition involved with poll() (or epoll_wait() on linux) and setting * race condition involved with poll() and setting a global flag in the signal
* a global flag in the signal handler. When a latch is set and the current * handler. When a latch is set and the current process is waiting for it, the
* process is waiting for it, the signal handler wakes up the poll() in * signal handler wakes up the poll() in WaitLatch by writing a byte to a pipe.
* WaitLatch by writing a byte to a pipe. A signal by itself doesn't interrupt * A signal by itself doesn't interrupt poll() on all platforms, and even on
* poll() on all platforms, and even on platforms where it does, a signal that * platforms where it does, a signal that arrives just before the poll() call
* arrives just before the poll() call does not prevent poll() from entering * does not prevent poll() from entering sleep. An incoming byte on a pipe
* sleep. An incoming byte on a pipe however reliably interrupts the sleep, * however reliably interrupts the sleep, and causes poll() to return
* and causes poll() to return immediately even if the signal arrives before * immediately even if the signal arrives before poll() begins.
* poll() begins.
* *
* When SetLatch is called from the same process that owns the latch, * The epoll() implementation overcomes the race with a different technique: it
* SetLatch writes the byte directly to the pipe. If it's owned by another * keeps SIGURG blocked and consumes from a signalfd() descriptor instead. We
* process, SIGURG is sent and the signal handler in the waiting process * don't need to register a signal handler or create our own self-pipe. We
* writes the byte to the pipe on behalf of the signaling process. * assume that any system that has Linux epoll() also has Linux signalfd().
* *
* The Windows implementation uses Windows events that are inherited by all * The Windows implementation uses Windows events that are inherited by all
* postmaster child processes. There's no need for the self-pipe trick there. * postmaster child processes. There's no need for the self-pipe trick there.
...@@ -46,6 +45,7 @@ ...@@ -46,6 +45,7 @@
#include <poll.h> #include <poll.h>
#endif #endif
#include "libpq/pqsignal.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h" #include "pgstat.h"
#include "port/atomics.h" #include "port/atomics.h"
...@@ -79,6 +79,10 @@ ...@@ -79,6 +79,10 @@
#error "no wait set implementation available" #error "no wait set implementation available"
#endif #endif
#ifdef WAIT_USE_EPOLL
#include <sys/signalfd.h>
#endif
/* typedef in latch.h */ /* typedef in latch.h */
struct WaitEventSet struct WaitEventSet
{ {
...@@ -139,7 +143,14 @@ static WaitEventSet *LatchWaitSet; ...@@ -139,7 +143,14 @@ static WaitEventSet *LatchWaitSet;
#ifndef WIN32 #ifndef WIN32
/* Are we currently in WaitLatch? The signal handler would like to know. */ /* Are we currently in WaitLatch? The signal handler would like to know. */
static volatile sig_atomic_t waiting = false; static volatile sig_atomic_t waiting = false;
#endif
#ifdef WAIT_USE_EPOLL
/* On Linux, we'll receive SIGURG via a signalfd file descriptor. */
static int signal_fd = -1;
#endif
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
/* Read and write ends of the self-pipe */ /* Read and write ends of the self-pipe */
static int selfpipe_readfd = -1; static int selfpipe_readfd = -1;
static int selfpipe_writefd = -1; static int selfpipe_writefd = -1;
...@@ -150,8 +161,11 @@ static int selfpipe_owner_pid = 0; ...@@ -150,8 +161,11 @@ static int selfpipe_owner_pid = 0;
/* Private function prototypes */ /* Private function prototypes */
static void latch_sigurg_handler(SIGNAL_ARGS); static void latch_sigurg_handler(SIGNAL_ARGS);
static void sendSelfPipeByte(void); static void sendSelfPipeByte(void);
static void drainSelfPipe(void); #endif
#endif /* WIN32 */
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
static void drain(void);
#endif
#if defined(WAIT_USE_EPOLL) #if defined(WAIT_USE_EPOLL)
static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action); static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action);
...@@ -175,7 +189,7 @@ static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -175,7 +189,7 @@ static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
void void
InitializeLatchSupport(void) InitializeLatchSupport(void)
{ {
#ifndef WIN32 #if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
int pipefd[2]; int pipefd[2];
if (IsUnderPostmaster) if (IsUnderPostmaster)
...@@ -247,8 +261,21 @@ InitializeLatchSupport(void) ...@@ -247,8 +261,21 @@ InitializeLatchSupport(void)
ReserveExternalFD(); ReserveExternalFD();
pqsignal(SIGURG, latch_sigurg_handler); pqsignal(SIGURG, latch_sigurg_handler);
#else #endif
/* currently, nothing to do here for Windows */
#ifdef WAIT_USE_EPOLL
sigset_t signalfd_mask;
/* Block SIGURG, because we'll receive it through a signalfd. */
sigaddset(&UnBlockSig, SIGURG);
/* Set up the signalfd to receive SIGURG notifications. */
sigemptyset(&signalfd_mask);
sigaddset(&signalfd_mask, SIGURG);
signal_fd = signalfd(-1, &signalfd_mask, SFD_NONBLOCK | SFD_CLOEXEC);
if (signal_fd < 0)
elog(FATAL, "signalfd() failed");
ReserveExternalFD();
#endif #endif
} }
...@@ -273,7 +300,9 @@ InitializeLatchWaitSet(void) ...@@ -273,7 +300,9 @@ InitializeLatchWaitSet(void)
void void
ShutdownLatchSupport(void) ShutdownLatchSupport(void)
{ {
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
pqsignal(SIGURG, SIG_IGN); pqsignal(SIGURG, SIG_IGN);
#endif
if (LatchWaitSet) if (LatchWaitSet)
{ {
...@@ -281,11 +310,18 @@ ShutdownLatchSupport(void) ...@@ -281,11 +310,18 @@ ShutdownLatchSupport(void)
LatchWaitSet = NULL; LatchWaitSet = NULL;
} }
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
close(selfpipe_readfd); close(selfpipe_readfd);
close(selfpipe_writefd); close(selfpipe_writefd);
selfpipe_readfd = -1; selfpipe_readfd = -1;
selfpipe_writefd = -1; selfpipe_writefd = -1;
selfpipe_owner_pid = InvalidPid; selfpipe_owner_pid = InvalidPid;
#endif
#if defined(WAIT_USE_EPOLL)
close(signal_fd);
signal_fd = -1;
#endif
} }
/* /*
...@@ -299,10 +335,10 @@ InitLatch(Latch *latch) ...@@ -299,10 +335,10 @@ InitLatch(Latch *latch)
latch->owner_pid = MyProcPid; latch->owner_pid = MyProcPid;
latch->is_shared = false; latch->is_shared = false;
#ifndef WIN32 #if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
/* Assert InitializeLatchSupport has been called in this process */ /* Assert InitializeLatchSupport has been called in this process */
Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid); Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid);
#else #elif defined(WAIT_USE_WIN32)
latch->event = CreateEvent(NULL, TRUE, FALSE, NULL); latch->event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (latch->event == NULL) if (latch->event == NULL)
elog(ERROR, "CreateEvent failed: error code %lu", GetLastError()); elog(ERROR, "CreateEvent failed: error code %lu", GetLastError());
...@@ -363,7 +399,7 @@ OwnLatch(Latch *latch) ...@@ -363,7 +399,7 @@ OwnLatch(Latch *latch)
/* Sanity checks */ /* Sanity checks */
Assert(latch->is_shared); Assert(latch->is_shared);
#ifndef WIN32 #if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
/* Assert InitializeLatchSupport has been called in this process */ /* Assert InitializeLatchSupport has been called in this process */
Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid); Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid);
#endif #endif
...@@ -550,9 +586,9 @@ SetLatch(Latch *latch) ...@@ -550,9 +586,9 @@ SetLatch(Latch *latch)
/* /*
* See if anyone's waiting for the latch. It can be the current process if * See if anyone's waiting for the latch. It can be the current process if
* we're in a signal handler. We use the self-pipe to wake up the * we're in a signal handler. We use the self-pipe or SIGURG to ourselves
* poll()/epoll_wait() in that case. If it's another process, send a * to wake up WaitEventSetWaitBlock() without races in that case. If it's
* signal. * another process, send a signal.
* *
* Fetch owner_pid only once, in case the latch is concurrently getting * Fetch owner_pid only once, in case the latch is concurrently getting
* owned or disowned. XXX: This assumes that pid_t is atomic, which isn't * owned or disowned. XXX: This assumes that pid_t is atomic, which isn't
...@@ -575,11 +611,17 @@ SetLatch(Latch *latch) ...@@ -575,11 +611,17 @@ SetLatch(Latch *latch)
return; return;
else if (owner_pid == MyProcPid) else if (owner_pid == MyProcPid)
{ {
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
if (waiting) if (waiting)
sendSelfPipeByte(); sendSelfPipeByte();
#else
if (waiting)
kill(MyProcPid, SIGURG);
#endif
} }
else else
kill(owner_pid, SIGURG); kill(owner_pid, SIGURG);
#else #else
/* /*
...@@ -856,8 +898,13 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, ...@@ -856,8 +898,13 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
{ {
set->latch = latch; set->latch = latch;
set->latch_pos = event->pos; set->latch_pos = event->pos;
#ifndef WIN32 #if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
event->fd = selfpipe_readfd; event->fd = selfpipe_readfd;
#elif defined(WAIT_USE_EPOLL)
event->fd = signal_fd;
#else
event->fd = PGINVALID_SOCKET;
return event->pos;
#endif #endif
} }
else if (events == WL_POSTMASTER_DEATH) else if (events == WL_POSTMASTER_DEATH)
...@@ -932,12 +979,13 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch) ...@@ -932,12 +979,13 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
if (latch && latch->owner_pid != MyProcPid) if (latch && latch->owner_pid != MyProcPid)
elog(ERROR, "cannot wait on a latch owned by another process"); elog(ERROR, "cannot wait on a latch owned by another process");
set->latch = latch; set->latch = latch;
/* /*
* On Unix, we don't need to modify the kernel object because the * On Unix, we don't need to modify the kernel object because the
* underlying pipe is the same for all latches so we can return * underlying pipe (if there is one) is the same for all latches so we
* immediately. On Windows, we need to update our array of handles, * can return immediately. On Windows, we need to update our array of
* but we leave the old one in place and tolerate spurious wakeups if * handles, but we leave the old one in place and tolerate spurious
* the latch is disabled. * wakeups if the latch is disabled.
*/ */
#if defined(WAIT_USE_WIN32) #if defined(WAIT_USE_WIN32)
if (!latch) if (!latch)
...@@ -1421,8 +1469,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -1421,8 +1469,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
if (cur_event->events == WL_LATCH_SET && if (cur_event->events == WL_LATCH_SET &&
cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP)) cur_epoll_event->events & (EPOLLIN | EPOLLERR | EPOLLHUP))
{ {
/* There's data in the self-pipe, clear it. */ /* Drain the signalfd. */
drainSelfPipe(); drain();
if (set->latch && set->latch->is_set) if (set->latch && set->latch->is_set)
{ {
...@@ -1575,7 +1623,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -1575,7 +1623,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
cur_kqueue_event->filter == EVFILT_READ) cur_kqueue_event->filter == EVFILT_READ)
{ {
/* There's data in the self-pipe, clear it. */ /* There's data in the self-pipe, clear it. */
drainSelfPipe(); drain();
if (set->latch && set->latch->is_set) if (set->latch && set->latch->is_set)
{ {
...@@ -1691,7 +1739,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -1691,7 +1739,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
(cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL))) (cur_pollfd->revents & (POLLIN | POLLHUP | POLLERR | POLLNVAL)))
{ {
/* There's data in the self-pipe, clear it. */ /* There's data in the self-pipe, clear it. */
drainSelfPipe(); drain();
if (set->latch && set->latch->is_set) if (set->latch && set->latch->is_set)
{ {
...@@ -1951,7 +1999,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -1951,7 +1999,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
} }
#endif #endif
#ifndef WIN32 #if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE)
/* /*
* SetLatch uses SIGURG to wake up the process waiting on the latch. * SetLatch uses SIGURG to wake up the process waiting on the latch.
* *
...@@ -1967,10 +2016,8 @@ latch_sigurg_handler(SIGNAL_ARGS) ...@@ -1967,10 +2016,8 @@ latch_sigurg_handler(SIGNAL_ARGS)
errno = save_errno; errno = save_errno;
} }
#endif /* !WIN32 */
/* Send one byte to the self-pipe, to wake up WaitLatch */ /* Send one byte to the self-pipe, to wake up WaitLatch */
#ifndef WIN32
static void static void
sendSelfPipeByte(void) sendSelfPipeByte(void)
{ {
...@@ -2000,45 +2047,58 @@ retry: ...@@ -2000,45 +2047,58 @@ retry:
return; return;
} }
} }
#endif /* !WIN32 */
#endif
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_EPOLL)
/* /*
* Read all available data from the self-pipe * Read all available data from self-pipe or signalfd.
* *
* Note: this is only called when waiting = true. If it fails and doesn't * Note: this is only called when waiting = true. If it fails and doesn't
* return, it must reset that flag first (though ideally, this will never * return, it must reset that flag first (though ideally, this will never
* happen). * happen).
*/ */
#ifndef WIN32
static void static void
drainSelfPipe(void) drain(void)
{ {
/* char buf[1024];
* There shouldn't normally be more than one byte in the pipe, or maybe a
* few bytes if multiple processes run SetLatch at the same instant.
*/
char buf[16];
int rc; int rc;
int fd;
#ifdef WAIT_USE_POLL
fd = selfpipe_readfd;
#else
fd = signal_fd;
#endif
for (;;) for (;;)
{ {
rc = read(selfpipe_readfd, buf, sizeof(buf)); rc = read(fd, buf, sizeof(buf));
if (rc < 0) if (rc < 0)
{ {
if (errno == EAGAIN || errno == EWOULDBLOCK) if (errno == EAGAIN || errno == EWOULDBLOCK)
break; /* the pipe is empty */ break; /* the descriptor is empty */
else if (errno == EINTR) else if (errno == EINTR)
continue; /* retry */ continue; /* retry */
else else
{ {
waiting = false; waiting = false;
#ifdef WAIT_USE_POLL
elog(ERROR, "read() on self-pipe failed: %m"); elog(ERROR, "read() on self-pipe failed: %m");
#else
elog(ERROR, "read() on signalfd failed: %m");
#endif
} }
} }
else if (rc == 0) else if (rc == 0)
{ {
waiting = false; waiting = false;
#ifdef WAIT_USE_POLL
elog(ERROR, "unexpected EOF on self-pipe"); elog(ERROR, "unexpected EOF on self-pipe");
#else
elog(ERROR, "unexpected EOF on signalfd");
#endif
} }
else if (rc < sizeof(buf)) else if (rc < sizeof(buf))
{ {
...@@ -2048,4 +2108,5 @@ drainSelfPipe(void) ...@@ -2048,4 +2108,5 @@ drainSelfPipe(void)
/* else buffer wasn't big enough, so read again */ /* else buffer wasn't big enough, so read again */
} }
} }
#endif /* !WIN32 */
#endif
...@@ -118,6 +118,11 @@ InitPostmasterChild(void) ...@@ -118,6 +118,11 @@ InitPostmasterChild(void)
/* We don't want the postmaster's proc_exit() handlers */ /* We don't want the postmaster's proc_exit() handlers */
on_exit_reset(); on_exit_reset();
/* In EXEC_BACKEND case we will not have inherited BlockSig etc values */
#ifdef EXEC_BACKEND
pqinitmask();
#endif
/* Initialize process-local latch support */ /* Initialize process-local latch support */
InitializeLatchSupport(); InitializeLatchSupport();
MyLatch = &LocalLatchData; MyLatch = &LocalLatchData;
...@@ -135,11 +140,6 @@ InitPostmasterChild(void) ...@@ -135,11 +140,6 @@ InitPostmasterChild(void)
elog(FATAL, "setsid() failed: %m"); elog(FATAL, "setsid() failed: %m");
#endif #endif
/* In EXEC_BACKEND case we will not have inherited BlockSig etc values */
#ifdef EXEC_BACKEND
pqinitmask();
#endif
/* /*
* Every postmaster child process is expected to respond promptly to * Every postmaster child process is expected to respond promptly to
* SIGQUIT at all times. Therefore we centrally remove SIGQUIT from * SIGQUIT at all times. Therefore we centrally remove SIGQUIT from
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment