Commit 51004c71 authored by Tom Lane's avatar Tom Lane

Make some efficiency improvements in LISTEN/NOTIFY.

Move the responsibility for advancing the NOTIFY queue tail pointer
from the listener(s) to the notification sender, and only have the
sender do it once every few queue pages, rather than after every batch
of notifications as at present.  This reduces the number of times we
execute asyncQueueAdvanceTail, and reduces contention when there are
multiple listeners (since that function requires exclusive lock).
This change relies on the observation that we don't really need the tail
pointer to be exactly up-to-date.  It's certainly not necessary to
attempt to release disk space more often than once per SLRU segment.
The only other usage of the tail pointer is that an incoming listener,
if it's the only listener in its database, will need to scan the queue
forward from the tail; but that's surely a less performance-critical
path than routine sending and receiving of notifies.  We compromise by
advancing the tail pointer after every 4 pages of output, so that it
shouldn't get more than a few pages behind.

Also, when sending signals to other backends after adding notify
message(s) to the queue, recognize that only backends in our own
database are going to care about those messages, so only such
backends really need to be awakened promptly.  Backends in other
databases should get kicked if they're well behind on reading the
queue, else they'll hold back the global tail pointer; but wakening
them for every single message is pointless.  This change can
substantially reduce signal traffic if listeners are spread among
many databases.  It won't help for the common case of only a single
active database, but the extra check costs very little.

Martijn van Oosterhout, with some adjustments by me

Discussion: https://postgr.es/m/CADWG95vtRBFDdrx1JdT1_9nhOFw48KaeTev6F_LtDQAFVpSPhA@mail.gmail.com
Discussion: https://postgr.es/m/CADWG95uFj8rLM52Er80JnhRsTbb_AqPP1ANHS8XQRGbqLrU+jA@mail.gmail.com
parent 72c48c3f
...@@ -75,8 +75,10 @@ ...@@ -75,8 +75,10 @@
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
* to every listening backend (we don't know which backend is listening on * to every listening backend (we don't know which backend is listening on
* which channel so we must signal them all). We can exclude backends that * which channel so we must signal them all). We can exclude backends that
* are already up to date, though. We don't bother with a self-signal * are already up to date, though, and we can also exclude backends that
* either, but just process the queue directly. * are in other databases (unless they are way behind and should be kicked
* to make them advance their pointers). We don't bother with a
* self-signal either, but just process the queue directly.
* *
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
* sets the process's latch, which triggers the event to be processed * sets the process's latch, which triggers the event to be processed
...@@ -89,13 +91,14 @@ ...@@ -89,13 +91,14 @@
* Inbound-notify processing consists of reading all of the notifications * Inbound-notify processing consists of reading all of the notifications
* that have arrived since scanning last time. We read every notification * that have arrived since scanning last time. We read every notification
* until we reach either a notification from an uncommitted transaction or * until we reach either a notification from an uncommitted transaction or
* the head pointer's position. Then we check if we were the laziest * the head pointer's position.
* backend: if our pointer is set to the same position as the global tail *
* pointer is set, then we move the global tail pointer ahead to where the * 6. To avoid SLRU wraparound and limit disk space consumption, the tail
* second-laziest backend is (in general, we take the MIN of the current * pointer needs to be advanced so that old pages can be truncated.
* head position and all active backends' new tail pointers). Whenever we * This is relatively expensive (notably, it requires an exclusive lock),
* move the global tail pointer we also truncate now-unused pages (i.e., * so we don't want to do it often. We make sending backends do this work
* delete files in pg_notify/ that are no longer used). * if they advanced the queue head into a new page, but only once every
* QUEUE_CLEANUP_DELAY pages.
* *
* An application that listens on the same channel it notifies will get * An application that listens on the same channel it notifies will get
* NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful,
...@@ -211,6 +214,19 @@ typedef struct QueuePosition ...@@ -211,6 +214,19 @@ typedef struct QueuePosition
(x).page != (y).page ? (x) : \ (x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y)) (x).offset > (y).offset ? (x) : (y))
/*
* Parameter determining how often we try to advance the tail pointer:
* we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
* also the distance by which a backend in another database needs to be
* behind before we'll decide we need to wake it up to advance its pointer.
*
* Resist the temptation to make this really large. While that would save
* work in some places, it would add cost in others. In particular, this
* should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends
* catch up before the pages they'll need to read fall out of SLRU cache.
*/
#define QUEUE_CLEANUP_DELAY 4
/* /*
* Struct describing a listening backend's status * Struct describing a listening backend's status
*/ */
...@@ -252,8 +268,8 @@ typedef struct QueueBackendStatus ...@@ -252,8 +268,8 @@ typedef struct QueueBackendStatus
typedef struct AsyncQueueControl typedef struct AsyncQueueControl
{ {
QueuePosition head; /* head points to the next free location */ QueuePosition head; /* head points to the next free location */
QueuePosition tail; /* the global tail is equivalent to the pos of QueuePosition tail; /* tail must be <= the queue position of every
* the "slowest" backend */ * listening backend */
BackendId firstListener; /* id of first listener, or InvalidBackendId */ BackendId firstListener; /* id of first listener, or InvalidBackendId */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
...@@ -402,10 +418,14 @@ static bool amRegisteredListener = false; ...@@ -402,10 +418,14 @@ static bool amRegisteredListener = false;
/* has this backend sent notifications in the current transaction? */ /* has this backend sent notifications in the current transaction? */
static bool backendHasSentNotifications = false; static bool backendHasSentNotifications = false;
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool backendTryAdvanceTail = false;
/* GUC parameter */ /* GUC parameter */
bool Trace_notify = false; bool Trace_notify = false;
/* local function prototypes */ /* local function prototypes */
static int asyncQueuePageDiff(int p, int q);
static bool asyncQueuePagePrecedes(int p, int q); static bool asyncQueuePagePrecedes(int p, int q);
static void queue_listen(ListenActionKind action, const char *channel); static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg); static void Async_UnlistenOnExit(int code, Datum arg);
...@@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); ...@@ -421,7 +441,7 @@ static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
static double asyncQueueUsage(void); static double asyncQueueUsage(void);
static void asyncQueueFillWarning(void); static void asyncQueueFillWarning(void);
static bool SignalBackends(void); static void SignalBackends(void);
static void asyncQueueReadAllNotifications(void); static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop, QueuePosition stop,
...@@ -436,10 +456,11 @@ static int notification_match(const void *key1, const void *key2, Size keysize); ...@@ -436,10 +456,11 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void); static void ClearPendingActionsAndNotifies(void);
/* /*
* We will work on the page range of 0..QUEUE_MAX_PAGE. * Compute the difference between two queue page numbers (i.e., p - q),
* accounting for wraparound.
*/ */
static bool static int
asyncQueuePagePrecedes(int p, int q) asyncQueuePageDiff(int p, int q)
{ {
int diff; int diff;
...@@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q) ...@@ -455,7 +476,14 @@ asyncQueuePagePrecedes(int p, int q)
diff -= QUEUE_MAX_PAGE + 1; diff -= QUEUE_MAX_PAGE + 1;
else if (diff < -((QUEUE_MAX_PAGE + 1) / 2)) else if (diff < -((QUEUE_MAX_PAGE + 1) / 2))
diff += QUEUE_MAX_PAGE + 1; diff += QUEUE_MAX_PAGE + 1;
return diff < 0; return diff;
}
/* Is p < q, accounting for wraparound? */
static bool
asyncQueuePagePrecedes(int p, int q)
{
return asyncQueuePageDiff(p, q) < 0;
} }
/* /*
...@@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void) ...@@ -1051,8 +1079,6 @@ Exec_ListenPreCommit(void)
* notification to the frontend. Also, although our transaction might * notification to the frontend. Also, although our transaction might
* have executed NOTIFY, those message(s) aren't queued yet so we can't * have executed NOTIFY, those message(s) aren't queued yet so we can't
* see them in the queue. * see them in the queue.
*
* This will also advance the global tail pointer if possible.
*/ */
if (!QUEUE_POS_EQUAL(max, head)) if (!QUEUE_POS_EQUAL(max, head))
asyncQueueReadAllNotifications(); asyncQueueReadAllNotifications();
...@@ -1138,6 +1164,8 @@ Exec_UnlistenAllCommit(void) ...@@ -1138,6 +1164,8 @@ Exec_UnlistenAllCommit(void)
* of a transaction. If we issued any notifications in the just-completed * of a transaction. If we issued any notifications in the just-completed
* transaction, send signals to other backends to process them, and also * transaction, send signals to other backends to process them, and also
* process the queue ourselves to send messages to our own frontend. * process the queue ourselves to send messages to our own frontend.
* Also, if we filled enough queue pages with new notifies, try to advance
* the queue tail pointer.
* *
* The reason that this is not done in AtCommit_Notify is that there is * The reason that this is not done in AtCommit_Notify is that there is
* a nonzero chance of errors here (for example, encoding conversion errors * a nonzero chance of errors here (for example, encoding conversion errors
...@@ -1156,7 +1184,6 @@ void ...@@ -1156,7 +1184,6 @@ void
ProcessCompletedNotifies(void) ProcessCompletedNotifies(void)
{ {
MemoryContext caller_context; MemoryContext caller_context;
bool signalled;
/* Nothing to do if we didn't send any notifications */ /* Nothing to do if we didn't send any notifications */
if (!backendHasSentNotifications) if (!backendHasSentNotifications)
...@@ -1185,23 +1212,20 @@ ProcessCompletedNotifies(void) ...@@ -1185,23 +1212,20 @@ ProcessCompletedNotifies(void)
StartTransactionCommand(); StartTransactionCommand();
/* Send signals to other backends */ /* Send signals to other backends */
signalled = SignalBackends(); SignalBackends();
if (listenChannels != NIL) if (listenChannels != NIL)
{ {
/* Read the queue ourselves, and send relevant stuff to the frontend */ /* Read the queue ourselves, and send relevant stuff to the frontend */
asyncQueueReadAllNotifications(); asyncQueueReadAllNotifications();
} }
else if (!signalled)
{
/* /*
* If we found no other listening backends, and we aren't listening * If it's time to try to advance the global tail pointer, do that.
* ourselves, then we must execute asyncQueueAdvanceTail to flush the
* queue, because ain't nobody else gonna do it. This prevents queue
* overflow when we're sending useless notifies to nobody. (A new
* listener could have joined since we looked, but if so this is
* harmless.)
*/ */
if (backendTryAdvanceTail)
{
backendTryAdvanceTail = false;
asyncQueueAdvanceTail(); asyncQueueAdvanceTail();
} }
...@@ -1242,8 +1266,6 @@ IsListeningOn(const char *channel) ...@@ -1242,8 +1266,6 @@ IsListeningOn(const char *channel)
static void static void
asyncQueueUnregister(void) asyncQueueUnregister(void)
{ {
bool advanceTail;
Assert(listenChannels == NIL); /* else caller error */ Assert(listenChannels == NIL); /* else caller error */
if (!amRegisteredListener) /* nothing to do */ if (!amRegisteredListener) /* nothing to do */
...@@ -1253,10 +1275,7 @@ asyncQueueUnregister(void) ...@@ -1253,10 +1275,7 @@ asyncQueueUnregister(void)
* Need exclusive lock here to manipulate list links. * Need exclusive lock here to manipulate list links.
*/ */
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
/* check if entry is valid and oldest ... */ /* Mark our entry as invalid */
advanceTail = (MyProcPid == QUEUE_BACKEND_PID(MyBackendId)) &&
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
/* ... then mark it invalid */
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid; QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid; QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
/* and remove it from the list */ /* and remove it from the list */
...@@ -1278,10 +1297,6 @@ asyncQueueUnregister(void) ...@@ -1278,10 +1297,6 @@ asyncQueueUnregister(void)
/* mark ourselves as no longer listed in the global array */ /* mark ourselves as no longer listed in the global array */
amRegisteredListener = false; amRegisteredListener = false;
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
} }
/* /*
...@@ -1467,6 +1482,15 @@ asyncQueueAddEntries(ListCell *nextNotify) ...@@ -1467,6 +1482,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
* page without overrunning the queue. * page without overrunning the queue.
*/ */
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head)); slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
/*
* If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
* set flag to remember that we should try to advance the tail
* pointer (we don't want to actually do that right here).
*/
if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
backendTryAdvanceTail = true;
/* And exit the loop */ /* And exit the loop */
break; break;
} }
...@@ -1570,31 +1594,30 @@ asyncQueueFillWarning(void) ...@@ -1570,31 +1594,30 @@ asyncQueueFillWarning(void)
} }
/* /*
* Send signals to all listening backends (except our own). * Send signals to listening backends.
* *
* Returns true if we sent at least one signal. * We never signal our own process; that should be handled by our caller.
* *
* Since we need EXCLUSIVE lock anyway we also check the position of the other * Normally we signal only backends in our own database, since only those
* backends and in case one is already up-to-date we don't signal it. * backends could be interested in notifies we send. However, if there's
* This can happen if concurrent notifying transactions have sent a signal and * notify traffic in our database but no traffic in another database that
* the signaled backend has read the other notifications and ours in the same * does have listener(s), those listeners will fall further and further
* step. * behind. Waken them anyway if they're far enough behind, so that they'll
* advance their queue position pointers, allowing the global tail to advance.
* *
* Since we know the BackendId and the Pid the signalling is quite cheap. * Since we know the BackendId and the Pid the signalling is quite cheap.
*/ */
static bool static void
SignalBackends(void) SignalBackends(void)
{ {
bool signalled = false;
int32 *pids; int32 *pids;
BackendId *ids; BackendId *ids;
int count; int count;
int32 pid;
/* /*
* Identify all backends that are listening and not already up-to-date. We * Identify backends that we need to signal. We don't want to send
* don't want to send signals while holding the AsyncQueueLock, so we just * signals while holding the AsyncQueueLock, so this loop just builds a
* build a list of target PIDs. * list of target PIDs.
* *
* XXX in principle these pallocs could fail, which would be bad. Maybe * XXX in principle these pallocs could fail, which would be bad. Maybe
* preallocate the arrays? But in practice this is only run in trivial * preallocate the arrays? But in practice this is only run in trivial
...@@ -1607,26 +1630,43 @@ SignalBackends(void) ...@@ -1607,26 +1630,43 @@ SignalBackends(void)
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE); LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i)) for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
{ {
pid = QUEUE_BACKEND_PID(i); int32 pid = QUEUE_BACKEND_PID(i);
QueuePosition pos;
Assert(pid != InvalidPid); Assert(pid != InvalidPid);
if (pid != MyProcPid) if (pid == MyProcPid)
continue; /* never signal self */
pos = QUEUE_BACKEND_POS(i);
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
{ {
QueuePosition pos = QUEUE_BACKEND_POS(i); /*
* Always signal listeners in our own database, unless they're
if (!QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) * already caught up (unlikely, but possible).
*/
if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
continue;
}
else
{ {
/*
* Listeners in other databases should be signaled only if they
* are far behind.
*/
if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
continue;
}
/* OK, need to signal this one */
pids[count] = pid; pids[count] = pid;
ids[count] = i; ids[count] = i;
count++; count++;
} }
}
}
LWLockRelease(AsyncQueueLock); LWLockRelease(AsyncQueueLock);
/* Now send signals */ /* Now send signals */
for (int i = 0; i < count; i++) for (int i = 0; i < count; i++)
{ {
pid = pids[i]; int32 pid = pids[i];
/* /*
* Note: assuming things aren't broken, a signal failure here could * Note: assuming things aren't broken, a signal failure here could
...@@ -1636,14 +1676,10 @@ SignalBackends(void) ...@@ -1636,14 +1676,10 @@ SignalBackends(void)
*/ */
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0) if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid); elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
else
signalled = true;
} }
pfree(pids); pfree(pids);
pfree(ids); pfree(ids);
return signalled;
} }
/* /*
...@@ -1844,7 +1880,6 @@ asyncQueueReadAllNotifications(void) ...@@ -1844,7 +1880,6 @@ asyncQueueReadAllNotifications(void)
QueuePosition oldpos; QueuePosition oldpos;
QueuePosition head; QueuePosition head;
Snapshot snapshot; Snapshot snapshot;
bool advanceTail;
/* page_buffer must be adequately aligned, so use a union */ /* page_buffer must be adequately aligned, so use a union */
union union
...@@ -1966,13 +2001,8 @@ asyncQueueReadAllNotifications(void) ...@@ -1966,13 +2001,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */ /* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED); LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos; QUEUE_BACKEND_POS(MyBackendId) = pos;
advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock); LWLockRelease(AsyncQueueLock);
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
...@@ -1980,13 +2010,8 @@ asyncQueueReadAllNotifications(void) ...@@ -1980,13 +2010,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */ /* Update shared state */
LWLockAcquire(AsyncQueueLock, LW_SHARED); LWLockAcquire(AsyncQueueLock, LW_SHARED);
QUEUE_BACKEND_POS(MyBackendId) = pos; QUEUE_BACKEND_POS(MyBackendId) = pos;
advanceTail = QUEUE_POS_EQUAL(oldpos, QUEUE_TAIL);
LWLockRelease(AsyncQueueLock); LWLockRelease(AsyncQueueLock);
/* If we were the laziest backend, try to advance the tail pointer */
if (advanceTail)
asyncQueueAdvanceTail();
/* Done with snapshot */ /* Done with snapshot */
UnregisterSnapshot(snapshot); UnregisterSnapshot(snapshot);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment