Commit 9c83b54a authored by Tom Lane

Fix a recently-introduced race condition in LISTEN/NOTIFY handling.

Commit 566372b3 fixed some race conditions involving concurrent
SimpleLruTruncate calls, but it introduced new ones in async.c.
A newly-listening backend could attempt to read Notify SLRU pages that
were in process of being truncated, possibly causing an error.  Also,
the QUEUE_TAIL pointer could become set to a value that's not equal to
the queue position of any backend.  While that's fairly harmless in
v13 and up (thanks to commit 51004c71), in older branches it resulted
in near-permanent disabling of the queue truncation logic, so that
continued use of NOTIFY led to queue-fill warnings and eventual
inability to send any more notifies.  (A server restart is enough to
make that go away, but it's still pretty unpleasant.)

The core of the problem is confusion about whether QUEUE_TAIL
represents the "logical" tail of the queue (i.e., the oldest
still-interesting data) or the "physical" tail (the oldest data we've
not yet truncated away).  To fix, split that into two variables.
QUEUE_TAIL regains its definition as the logical tail, and we
introduce a new variable to track the oldest un-truncated page.

Per report from Mikael Gustavsson.  Like the previous patch,
back-patch to all supported branches.

Discussion: https://postgr.es/m/1b8561412e8a4f038d7a491c8b922788@smhi.se
parent 3df51ca8
@@ -255,7 +255,7 @@ typedef struct QueueBackendStatus
  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
  * entries of other backends and also change the head pointer.  When holding
  * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointer.
+ * can change the tail pointers.
  *
  * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need multiple locks, we first get
@@ -276,6 +276,8 @@ typedef struct AsyncQueueControl
 	QueuePosition head;			/* head points to the next free location */
 	QueuePosition tail;			/* tail must be <= the queue position of every
 								 * listening backend */
+	int			stopPage;		/* oldest unrecycled page; must be <=
+								 * tail.page */
 	BackendId	firstListener;	/* id of first listener, or InvalidBackendId */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
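To make the split described in the commit message concrete: QUEUE_TAIL stays the logical tail (nothing older is interesting to any listener), while the new stopPage is the physical tail (nothing older has been truncated yet). Below is a minimal, self-contained sketch of that invariant; the struct layout and example values are illustrative only, not PostgreSQL code.

/* Illustrative model only -- not PostgreSQL code.  Field names follow the
 * hunk above; everything else here is simplified/hypothetical. */
#include <assert.h>

typedef struct
{
	int			page;		/* SLRU page number */
	int			offset;		/* offset within that page */
} QueuePos;

typedef struct
{
	QueuePos	head;		/* next free location */
	QueuePos	tail;		/* logical tail: oldest still-interesting data */
	int			stopPage;	/* physical tail: oldest page not yet truncated */
} QueueModel;

int
main(void)
{
	/* A legal intermediate state: pages below 35 are logically dead but not
	 * yet removed, so stopPage lags tail.page. */
	QueueModel	q = {
		.head = {.page = 40, .offset = 100},
		.tail = {.page = 35, .offset = 0},
		.stopPage = 0
	};

	/* The invariant from the struct comment (ignoring wraparound): the
	 * physical tail never passes the logical tail, which never passes the
	 * head. */
	assert(q.stopPage <= q.tail.page);
	assert(q.tail.page <= q.head.page);
	return 0;
}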
@@ -286,6 +288,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
+#define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
 #define QUEUE_FIRST_LISTENER		(asyncQueueControl->firstListener)
 #define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
@@ -537,6 +540,7 @@ AsyncShmemInit(void)
 		/* First time through, so initialize it */
 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+		QUEUE_STOP_PAGE = 0;
 		QUEUE_FIRST_LISTENER = InvalidBackendId;
 		asyncQueueControl->lastQueueFillWarn = 0;
 		/* zero'th entry won't be used, but let's initialize it anyway */
@@ -1358,7 +1362,7 @@ asyncQueueIsFull(void)
 	nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
 	if (nexthead > QUEUE_MAX_PAGE)
 		nexthead = 0;			/* wrap around */
-	boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
+	boundary = QUEUE_STOP_PAGE;
 	boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
 	return asyncQueuePagePrecedes(nexthead, boundary);
 }
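The changed line above now rounds the physical tail down to an SLRU segment boundary before deciding whether the queue is full. Assuming the stock SLRU_PAGES_PER_SEGMENT value of 32, a small standalone sketch of just that arithmetic (QUEUE_MAX_PAGE handling and the wraparound-aware page comparison are omitted):

/* Sketch of the boundary computation only; values are made up. */
#include <stdio.h>

#define SLRU_PAGES_PER_SEGMENT 32	/* stock PostgreSQL value */

int
main(void)
{
	int			stopPage = 37;	/* physical tail: oldest un-truncated page */
	int			boundary;

	/* Round down to the first page of the segment holding the physical
	 * tail.  The queue counts as full once the page the head would next
	 * use precedes, in circular order, that segment's start. */
	boundary = stopPage - (stopPage % SLRU_PAGES_PER_SEGMENT);
	printf("stopPage=%d -> boundary=%d\n", stopPage, boundary);	/* 37 -> 32 */
	return 0;
}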
@@ -1572,6 +1576,11 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS)
  * Return the fraction of the queue that is currently occupied.
  *
  * The caller must hold NotifyQueueLock in (at least) shared mode.
+ *
+ * Note: we measure the distance to the logical tail page, not the physical
+ * tail page.  In some sense that's wrong, but the relative position of the
+ * physical tail is affected by details such as SLRU segment boundaries,
+ * so that a result based on that is unpleasantly unstable.
  */
 static double
 asyncQueueUsage(void)
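A rough illustration of why the new comment prefers the logical tail for the usage figure: the physical tail only moves when a whole segment is truncated, so a measurement based on it would lurch by up to a segment's worth of pages each time truncation finally happens. The numbers below are made up, and the real asyncQueueUsage() math is omitted:

/* Hypothetical illustration, not the real asyncQueueUsage() computation. */
#include <stdio.h>

#define SLRU_PAGES_PER_SEGMENT 32	/* stock PostgreSQL value */

int
main(void)
{
	int			headPage = 70;
	int			tailPage = 69;	/* logical tail: listeners nearly caught up */
	int			stopPage = 64;	/* physical tail: truncation last stopped at a
								 * segment boundary */

	/* Logical measure: one page outstanding -- moves smoothly as listeners
	 * advance. */
	printf("logical  span = %d pages\n", headPage - tailPage);

	/* Physical measure: includes pages that are merely not yet truncated,
	 * so the figure drops abruptly whenever a segment is removed. */
	printf("physical span = %d pages\n", headPage - stopPage);
	return 0;
}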
@@ -2178,7 +2187,23 @@ asyncQueueAdvanceTail(void)
 	/* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
 	LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
 
-	/* Compute the new tail. */
+	/*
+	 * Compute the new tail.  Pre-v13, it's essential that QUEUE_TAIL be exact
+	 * (ie, exactly match at least one backend's queue position), so it must
+	 * be updated atomically with the actual computation.  Since v13, we could
+	 * get away with not doing it like that, but it seems prudent to keep it
+	 * so.
+	 *
+	 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
+	 * must be advanced before we can truncate any data.  Thus, QUEUE_TAIL is
+	 * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest
+	 * un-truncated page.  When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL),
+	 * there are pages we can truncate but haven't yet finished doing so.
+	 *
+	 * For concurrency's sake, we don't want to hold NotifyQueueLock while
+	 * performing SimpleLruTruncate.  This is OK because no backend will try
+	 * to access the pages we are in the midst of truncating.
+	 */
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
 	min = QUEUE_HEAD;
 	for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
@@ -2186,7 +2211,8 @@ asyncQueueAdvanceTail(void)
 		Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
 		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
 	}
-	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
+	QUEUE_TAIL = min;
+	oldtailpage = QUEUE_STOP_PAGE;
 	LWLockRelease(NotifyQueueLock);
 
 	/*
@@ -2205,16 +2231,16 @@ asyncQueueAdvanceTail(void)
 		 * release the lock again.
 		 */
 		SimpleLruTruncate(NotifyCtl, newtailpage);
-	}
 
-	/*
-	 * Advertise the new tail.  This changes asyncQueueIsFull()'s verdict for
-	 * the segment immediately prior to the new tail, allowing fresh data into
-	 * that segment.
-	 */
-	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	QUEUE_TAIL = min;
-	LWLockRelease(NotifyQueueLock);
+		/*
+		 * Update QUEUE_STOP_PAGE.  This changes asyncQueueIsFull()'s verdict
+		 * for the segment immediately prior to the old tail, allowing fresh
+		 * data into that segment.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		QUEUE_STOP_PAGE = newtailpage;
+		LWLockRelease(NotifyQueueLock);
+	}
 
 	LWLockRelease(NotifyQueueTailLock);
 }
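Because the hunks above show the new asyncQueueAdvanceTail() only in fragments, here is a condensed paraphrase of the post-patch control flow. It reuses the lock and macro names from the diff but elides declarations, assertions, and surrounding comments, so treat it as a reading aid rather than the verbatim function:

/* Condensed paraphrase of the post-patch flow; not the verbatim function. */
static void
asyncQueueAdvanceTail_sketch(void)
{
	QueuePosition min;
	int			oldtailpage, newtailpage, boundary;

	/* One truncator at a time; see SimpleLruTruncate(). */
	LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);

	/* Step 1: advance the logical tail atomically with computing it, so
	 * QUEUE_TAIL always equals some backend's position (or the head). */
	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
	min = QUEUE_HEAD;
	for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
	QUEUE_TAIL = min;
	oldtailpage = QUEUE_STOP_PAGE;
	LWLockRelease(NotifyQueueLock);

	/* Step 2: truncate whole SLRU segments with NotifyQueueLock released;
	 * newly arriving listeners start from QUEUE_TAIL, which already points
	 * past everything being removed. */
	newtailpage = QUEUE_POS_PAGE(min);
	boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT);
	if (asyncQueuePagePrecedes(oldtailpage, boundary))
	{
		SimpleLruTruncate(NotifyCtl, newtailpage);

		/* Step 3: only now advertise the new physical tail, re-enabling
		 * inserts into the segment just freed. */
		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
		QUEUE_STOP_PAGE = newtailpage;
		LWLockRelease(NotifyQueueLock);
	}

	LWLockRelease(NotifyQueueTailLock);
}

The key ordering is: advance QUEUE_TAIL (logical) first, truncate with no NotifyQueueLock held, and only then advertise QUEUE_STOP_PAGE (physical), so an incoming listener can never be pointed at a page that is in the midst of being removed.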