Commit fca85f8e authored by Tom Lane's avatar Tom Lane

Fix walsender to exit promptly if client requests shutdown.

It's possible for WalSndWaitForWal to be asked to wait for WAL that doesn't
exist yet.  That's fine, in fact it's the normal situation if we're caught
up; but when the client requests shutdown we should not keep waiting.
The previous coding could wait indefinitely if the source server was idle.

In passing, improve the rather weak comments in this area, and slightly
rearrange some related code for better readability.

Back-patch to 9.4 where this code was introduced.

Discussion: https://postgr.es/m/14154.1498781234@sss.pgh.pa.us
parent 13a57710
...@@ -764,15 +764,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req ...@@ -764,15 +764,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
/* make sure we have enough WAL available */ /* make sure we have enough WAL available */
flushptr = WalSndWaitForWal(targetPagePtr + reqLen); flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
/* more than one block available */ /* fail if not (implies we are going to shut down) */
if (targetPagePtr + XLOG_BLCKSZ <= flushptr) if (flushptr < targetPagePtr + reqLen)
count = XLOG_BLCKSZ;
/* not enough WAL synced, that can happen during shutdown */
else if (targetPagePtr + reqLen > flushptr)
return -1; return -1;
/* part of the page available */
if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
count = XLOG_BLCKSZ; /* more than one block available */
else else
count = flushptr - targetPagePtr; count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */ /* now actually read the data, we know it's there */
XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ); XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
...@@ -1266,7 +1265,11 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ...@@ -1266,7 +1265,11 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
} }
/* /*
* Wait till WAL < loc is flushed to disk so it can be safely read. * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
*
* Returns end LSN of flushed WAL. Normally this will be >= loc, but
* if we detect a shutdown request (either from postmaster or client)
* we will return early, so caller must always check.
*/ */
static XLogRecPtr static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc) WalSndWaitForWal(XLogRecPtr loc)
...@@ -1333,9 +1336,7 @@ WalSndWaitForWal(XLogRecPtr loc) ...@@ -1333,9 +1336,7 @@ WalSndWaitForWal(XLogRecPtr loc)
RecentFlushPtr = GetXLogReplayRecPtr(NULL); RecentFlushPtr = GetXLogReplayRecPtr(NULL);
/* /*
* If postmaster asked us to stop, don't wait here anymore. This will * If postmaster asked us to stop, don't wait anymore.
* cause the xlogreader to return without reading a full record, which
* is the fastest way to reach the mainloop which then can quit.
* *
* It's important to do this check after the recomputation of * It's important to do this check after the recomputation of
* RecentFlushPtr, so we can send all remaining data before shutting * RecentFlushPtr, so we can send all remaining data before shutting
...@@ -1366,14 +1367,20 @@ WalSndWaitForWal(XLogRecPtr loc) ...@@ -1366,14 +1367,20 @@ WalSndWaitForWal(XLogRecPtr loc)
WalSndCaughtUp = true; WalSndCaughtUp = true;
/* /*
* Try to flush pending output to the client. Also wait for the socket * Try to flush any pending output to the client.
* becoming writable, if there's still pending output after an attempt
* to flush. Otherwise we might just sit on output data while waiting
* for new WAL being generated.
*/ */
if (pq_flush_if_writable() != 0) if (pq_flush_if_writable() != 0)
WalSndShutdown(); WalSndShutdown();
/*
* If we have received CopyDone from the client, sent CopyDone
* ourselves, and the output buffer is empty, it's time to exit
* streaming, so fail the current WAL fetch request.
*/
if (streamingDoneReceiving && streamingDoneSending &&
!pq_is_send_pending())
break;
now = GetCurrentTimestamp(); now = GetCurrentTimestamp();
/* die if timeout was reached */ /* die if timeout was reached */
...@@ -1382,6 +1389,13 @@ WalSndWaitForWal(XLogRecPtr loc) ...@@ -1382,6 +1389,13 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Send keepalive if the time has come */ /* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary(now); WalSndKeepaliveIfNecessary(now);
/*
* Sleep until something happens or we time out. Also wait for the
* socket becoming writable, if there's still pending output.
* Otherwise we might sit on sendable output data while waiting for
* new WAL to be generated. (But if we have nothing to send, we don't
* want to wake on socket-writable.)
*/
sleeptime = WalSndComputeSleeptime(now); sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
...@@ -1390,7 +1404,6 @@ WalSndWaitForWal(XLogRecPtr loc) ...@@ -1390,7 +1404,6 @@ WalSndWaitForWal(XLogRecPtr loc)
if (pq_is_send_pending()) if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE; wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */
WaitLatchOrSocket(MyLatch, wakeEvents, WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime, MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_WAIT_WAL); WAIT_EVENT_WAL_SENDER_WAIT_WAL);
...@@ -2115,7 +2128,8 @@ WalSndLoop(WalSndSendDataCallback send_data) ...@@ -2115,7 +2128,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
* ourselves, and the output buffer is empty, it's time to exit * ourselves, and the output buffer is empty, it's time to exit
* streaming. * streaming.
*/ */
if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving) if (streamingDoneReceiving && streamingDoneSending &&
!pq_is_send_pending())
break; break;
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment