Commit 0fedb4ea authored by Andrew Dunstan

Fix walsender timeouts when decoding a large transaction

The logical slots have a fast code path for sending data so as not to
impose too high a per-message overhead. The fast path skips checks for
interrupts and timeouts. However, the existing coding failed to consider
the fact that a transaction with a large number of changes may take a
very long time to be processed and sent to the client. This causes the
walsender to ignore interrupts for potentially a long time and, more
importantly, it will result in the walsender being killed due to
timeout at the end of such a transaction.

This commit changes the fast path to also check for interrupts and only
allows calling the fast path when the last keepalive check happened less
than half the walsender timeout ago. Otherwise the slower code path will
be taken.

Backpatched to 9.4

Petr Jelinek, reviewed by Kyotaro HORIGUCHI, Yura Sokolov, Craig
Ringer and Robert Haas.

Discussion: https://postgr.es/m/e082a56a-fd95-a250-3bae-0fff93832510@2ndquadrant.com
parent 1fcd0ade
...@@ -1151,6 +1151,8 @@ static void ...@@ -1151,6 +1151,8 @@ static void
WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
bool last_write) bool last_write)
{ {
TimestampTz now;
/* output previously gathered data in a CopyData packet */ /* output previously gathered data in a CopyData packet */
pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
...@@ -1160,23 +1162,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ...@@ -1160,23 +1162,54 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
* several releases by streaming physical replication. * several releases by streaming physical replication.
*/ */
resetStringInfo(&tmpbuf); resetStringInfo(&tmpbuf);
pq_sendint64(&tmpbuf, GetCurrentTimestamp()); now = GetCurrentTimestamp();
pq_sendint64(&tmpbuf, now);
memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64)); tmpbuf.data, sizeof(int64));
/* fast path */ CHECK_FOR_INTERRUPTS();
/* Try to flush pending output to the client */ /* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0) if (pq_flush_if_writable() != 0)
WalSndShutdown(); WalSndShutdown();
if (!pq_is_send_pending()) /* Try taking fast path unless we get too close to walsender timeout. */
if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2) &&
!pq_is_send_pending())
{
return; return;
}
/* If we have pending write here, go to slow path */
for (;;) for (;;)
{ {
int wakeEvents; int wakeEvents;
long sleeptime; long sleeptime;
TimestampTz now;
/* Check for input from the client */
ProcessRepliesIfAny();
now = GetCurrentTimestamp();
/* die if timeout was reached */
WalSndCheckTimeOut(now);
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary(now);
if (!pq_is_send_pending())
break;
sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
/* Sleep until something happens or we time out */
WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_WRITE_DATA);
/* /*
* Emergency bailout if postmaster has died. This is to avoid the * Emergency bailout if postmaster has died. This is to avoid the
...@@ -1198,34 +1231,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ...@@ -1198,34 +1231,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
SyncRepInitConfig(); SyncRepInitConfig();
} }
/* Check for input from the client */
ProcessRepliesIfAny();
/* Try to flush pending output to the client */ /* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0) if (pq_flush_if_writable() != 0)
WalSndShutdown(); WalSndShutdown();
/* If we finished clearing the buffered data, we're done here. */
if (!pq_is_send_pending())
break;
now = GetCurrentTimestamp();
/* die if timeout was reached */
WalSndCheckTimeOut(now);
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary(now);
sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
/* Sleep until something happens or we time out */
WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime,
WAIT_EVENT_WAL_SENDER_WRITE_DATA);
} }
/* reactivate latch so WalSndLoop knows to continue */ /* reactivate latch so WalSndLoop knows to continue */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment