Commit 6148656a authored by Thomas Munro's avatar Thomas Munro

Use EVFILT_SIGNAL for kqueue latches.

Cut down on system calls and other overheads by waiting for SIGURG
explicitly with kqueue instead of using a signal handler and self-pipe.
Affects *BSD and macOS systems.

This leaves only the poll implementation with a signal handler and the
traditional self-pipe trick.

Discussion: https://postgr.es/m/CA+hUKGJjxPDpzBE0a3hyUywBvaZuC89yx3jK9RFZgfv_KHU7gg@mail.gmail.com
parent 6a2a70a0
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
* don't need to register a signal handler or create our own self-pipe. We * don't need to register a signal handler or create our own self-pipe. We
* assume that any system that has Linux epoll() also has Linux signalfd(). * assume that any system that has Linux epoll() also has Linux signalfd().
* *
* The kqueue() implementation waits for SIGURG with EVFILT_SIGNAL.
*
* The Windows implementation uses Windows events that are inherited by all * The Windows implementation uses Windows events that are inherited by all
* postmaster child processes. There's no need for the self-pipe trick there. * postmaster child processes. There's no need for the self-pipe trick there.
* *
...@@ -150,7 +152,7 @@ static volatile sig_atomic_t waiting = false; ...@@ -150,7 +152,7 @@ static volatile sig_atomic_t waiting = false;
static int signal_fd = -1; static int signal_fd = -1;
#endif #endif
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
/* Read and write ends of the self-pipe */ /* Read and write ends of the self-pipe */
static int selfpipe_readfd = -1; static int selfpipe_readfd = -1;
static int selfpipe_writefd = -1; static int selfpipe_writefd = -1;
...@@ -189,7 +191,7 @@ static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -189,7 +191,7 @@ static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
void void
InitializeLatchSupport(void) InitializeLatchSupport(void)
{ {
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
int pipefd[2]; int pipefd[2];
if (IsUnderPostmaster) if (IsUnderPostmaster)
...@@ -277,6 +279,11 @@ InitializeLatchSupport(void) ...@@ -277,6 +279,11 @@ InitializeLatchSupport(void)
elog(FATAL, "signalfd() failed"); elog(FATAL, "signalfd() failed");
ReserveExternalFD(); ReserveExternalFD();
#endif #endif
#ifdef WAIT_USE_KQUEUE
/* Ignore SIGURG, because we'll receive it via kqueue. */
pqsignal(SIGURG, SIG_IGN);
#endif
} }
void void
...@@ -300,7 +307,7 @@ InitializeLatchWaitSet(void) ...@@ -300,7 +307,7 @@ InitializeLatchWaitSet(void)
void void
ShutdownLatchSupport(void) ShutdownLatchSupport(void)
{ {
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
pqsignal(SIGURG, SIG_IGN); pqsignal(SIGURG, SIG_IGN);
#endif #endif
...@@ -310,7 +317,7 @@ ShutdownLatchSupport(void) ...@@ -310,7 +317,7 @@ ShutdownLatchSupport(void)
LatchWaitSet = NULL; LatchWaitSet = NULL;
} }
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
close(selfpipe_readfd); close(selfpipe_readfd);
close(selfpipe_writefd); close(selfpipe_writefd);
selfpipe_readfd = -1; selfpipe_readfd = -1;
...@@ -335,7 +342,7 @@ InitLatch(Latch *latch) ...@@ -335,7 +342,7 @@ InitLatch(Latch *latch)
latch->owner_pid = MyProcPid; latch->owner_pid = MyProcPid;
latch->is_shared = false; latch->is_shared = false;
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
/* Assert InitializeLatchSupport has been called in this process */ /* Assert InitializeLatchSupport has been called in this process */
Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid); Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid);
#elif defined(WAIT_USE_WIN32) #elif defined(WAIT_USE_WIN32)
...@@ -399,7 +406,7 @@ OwnLatch(Latch *latch) ...@@ -399,7 +406,7 @@ OwnLatch(Latch *latch)
/* Sanity checks */ /* Sanity checks */
Assert(latch->is_shared); Assert(latch->is_shared);
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
/* Assert InitializeLatchSupport has been called in this process */ /* Assert InitializeLatchSupport has been called in this process */
Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid); Assert(selfpipe_readfd >= 0 && selfpipe_owner_pid == MyProcPid);
#endif #endif
...@@ -611,7 +618,7 @@ SetLatch(Latch *latch) ...@@ -611,7 +618,7 @@ SetLatch(Latch *latch)
return; return;
else if (owner_pid == MyProcPid) else if (owner_pid == MyProcPid)
{ {
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
if (waiting) if (waiting)
sendSelfPipeByte(); sendSelfPipeByte();
#else #else
...@@ -898,13 +905,15 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch, ...@@ -898,13 +905,15 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
{ {
set->latch = latch; set->latch = latch;
set->latch_pos = event->pos; set->latch_pos = event->pos;
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
event->fd = selfpipe_readfd; event->fd = selfpipe_readfd;
#elif defined(WAIT_USE_EPOLL) #elif defined(WAIT_USE_EPOLL)
event->fd = signal_fd; event->fd = signal_fd;
#else #else
event->fd = PGINVALID_SOCKET; event->fd = PGINVALID_SOCKET;
#ifdef WAIT_USE_EPOLL
return event->pos; return event->pos;
#endif
#endif #endif
} }
else if (events == WL_POSTMASTER_DEATH) else if (events == WL_POSTMASTER_DEATH)
...@@ -1125,6 +1134,18 @@ WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event) ...@@ -1125,6 +1134,18 @@ WaitEventAdjustKqueueAddPostmaster(struct kevent *k_ev, WaitEvent *event)
AccessWaitEvent(k_ev) = event; AccessWaitEvent(k_ev) = event;
} }
static inline void
WaitEventAdjustKqueueAddLatch(struct kevent *k_ev, WaitEvent *event)
{
/* For now latch can only be added, not removed. */
k_ev->ident = SIGURG;
k_ev->filter = EVFILT_SIGNAL;
k_ev->flags = EV_ADD;
k_ev->fflags = 0;
k_ev->data = 0;
AccessWaitEvent(k_ev) = event;
}
/* /*
* old_events is the previous event mask, used to compute what has changed. * old_events is the previous event mask, used to compute what has changed.
*/ */
...@@ -1156,6 +1177,11 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) ...@@ -1156,6 +1177,11 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
*/ */
WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event); WaitEventAdjustKqueueAddPostmaster(&k_ev[count++], event);
} }
else if (event->events == WL_LATCH_SET)
{
/* We detect latch wakeup using a signal event. */
WaitEventAdjustKqueueAddLatch(&k_ev[count++], event);
}
else else
{ {
/* /*
...@@ -1163,11 +1189,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events) ...@@ -1163,11 +1189,9 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
* old event mask to the new event mask, since kevent treats readable * old event mask to the new event mask, since kevent treats readable
* and writable as separate events. * and writable as separate events.
*/ */
if (old_events == WL_LATCH_SET || if (old_events & WL_SOCKET_READABLE)
(old_events & WL_SOCKET_READABLE))
old_filt_read = true; old_filt_read = true;
if (event->events == WL_LATCH_SET || if (event->events & WL_SOCKET_READABLE)
(event->events & WL_SOCKET_READABLE))
new_filt_read = true; new_filt_read = true;
if (old_events & WL_SOCKET_WRITEABLE) if (old_events & WL_SOCKET_WRITEABLE)
old_filt_write = true; old_filt_write = true;
...@@ -1620,11 +1644,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -1620,11 +1644,8 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
occurred_events->events = 0; occurred_events->events = 0;
if (cur_event->events == WL_LATCH_SET && if (cur_event->events == WL_LATCH_SET &&
cur_kqueue_event->filter == EVFILT_READ) cur_kqueue_event->filter == EVFILT_SIGNAL)
{ {
/* There's data in the self-pipe, clear it. */
drain();
if (set->latch && set->latch->is_set) if (set->latch && set->latch->is_set)
{ {
occurred_events->fd = PGINVALID_SOCKET; occurred_events->fd = PGINVALID_SOCKET;
...@@ -1999,7 +2020,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout, ...@@ -1999,7 +2020,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
} }
#endif #endif
#if defined(WAIT_USE_POLL) || defined(WAIT_USE_KQUEUE) #if defined(WAIT_USE_POLL)
/* /*
* SetLatch uses SIGURG to wake up the process waiting on the latch. * SetLatch uses SIGURG to wake up the process waiting on the latch.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment