Commit ae90128d authored by Tom Lane

Fix NOTIFY to cope with I/O problems, such as out-of-disk-space.

The LISTEN/NOTIFY subsystem got confused if SimpleLruZeroPage failed,
which would typically happen as a result of a write() failure while
attempting to dump a dirty pg_notify page out of memory.  Subsequently,
all attempts to send more NOTIFY messages would fail with messages like
'Could not read from file "pg_notify/nnnn" at offset nnnnn: Success'.
Only restarting the server would clear this condition.  Per reports from
Kevin Grittner and Christoph Berg.

Back-patch to 9.0, where the problem was introduced during the
LISTEN/NOTIFY rewrite.
parent 9e26326a
...@@ -1285,6 +1285,7 @@ static ListCell * ...@@ -1285,6 +1285,7 @@ static ListCell *
asyncQueueAddEntries(ListCell *nextNotify) asyncQueueAddEntries(ListCell *nextNotify)
{ {
AsyncQueueEntry qe; AsyncQueueEntry qe;
QueuePosition queue_head;
int pageno; int pageno;
int offset; int offset;
int slotno; int slotno;
...@@ -1292,8 +1293,21 @@ asyncQueueAddEntries(ListCell *nextNotify) ...@@ -1292,8 +1293,21 @@ asyncQueueAddEntries(ListCell *nextNotify)
/* We hold both AsyncQueueLock and AsyncCtlLock during this operation */ /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
/*
* We work with a local copy of QUEUE_HEAD, which we write back to shared
* memory upon exiting. The reason for this is that if we have to advance
* to a new page, SimpleLruZeroPage might fail (out of disk space, for
* instance), and we must not advance QUEUE_HEAD if it does. (Otherwise,
* subsequent insertions would try to put entries into a page that slru.c
* thinks doesn't exist yet.) So, use a local position variable. Note
* that if we do fail, any already-inserted queue entries are forgotten;
* this is okay, since they'd be useless anyway after our transaction
* rolls back.
*/
queue_head = QUEUE_HEAD;
/* Fetch the current page */ /* Fetch the current page */
pageno = QUEUE_POS_PAGE(QUEUE_HEAD); pageno = QUEUE_POS_PAGE(queue_head);
slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId); slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId);
/* Note we mark the page dirty before writing in it */ /* Note we mark the page dirty before writing in it */
AsyncCtl->shared->page_dirty[slotno] = true; AsyncCtl->shared->page_dirty[slotno] = true;
...@@ -1305,7 +1319,7 @@ asyncQueueAddEntries(ListCell *nextNotify) ...@@ -1305,7 +1319,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
/* Construct a valid queue entry in local variable qe */ /* Construct a valid queue entry in local variable qe */
asyncQueueNotificationToEntry(n, &qe); asyncQueueNotificationToEntry(n, &qe);
offset = QUEUE_POS_OFFSET(QUEUE_HEAD); offset = QUEUE_POS_OFFSET(queue_head);
/* Check whether the entry really fits on the current page */ /* Check whether the entry really fits on the current page */
if (offset + qe.length <= QUEUE_PAGESIZE) if (offset + qe.length <= QUEUE_PAGESIZE)
...@@ -1331,8 +1345,8 @@ asyncQueueAddEntries(ListCell *nextNotify) ...@@ -1331,8 +1345,8 @@ asyncQueueAddEntries(ListCell *nextNotify)
&qe, &qe,
qe.length); qe.length);
/* Advance QUEUE_HEAD appropriately, and note if page is full */ /* Advance queue_head appropriately, and detect if page is full */
if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length)) if (asyncQueueAdvance(&(queue_head), qe.length))
{ {
/* /*
* Page is full, so we're done here, but first fill the next page * Page is full, so we're done here, but first fill the next page
...@@ -1342,12 +1356,15 @@ asyncQueueAddEntries(ListCell *nextNotify) ...@@ -1342,12 +1356,15 @@ asyncQueueAddEntries(ListCell *nextNotify)
* asyncQueueIsFull() ensured that there is room to create this * asyncQueueIsFull() ensured that there is room to create this
* page without overrunning the queue. * page without overrunning the queue.
*/ */
slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
/* And exit the loop */ /* And exit the loop */
break; break;
} }
} }
/* Success, so update the global QUEUE_HEAD */
QUEUE_HEAD = queue_head;
LWLockRelease(AsyncCtlLock); LWLockRelease(AsyncCtlLock);
return nextNotify; return nextNotify;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment