Commit a9dad564 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Teach pg_basebackup and pg_receivexlog to reply to server keepalives.

Without this, the connection will be killed after timeout if
wal_sender_timeout is set in the server.

Original patch by Amit Kapila, modified by me to fit recent changes in the
code.
parent 9e45e038
...@@ -377,10 +377,10 @@ PostgreSQL documentation ...@@ -377,10 +377,10 @@ PostgreSQL documentation
<listitem> <listitem>
<para> <para>
Specifies the number of seconds between status packets sent back to the Specifies the number of seconds between status packets sent back to the
server. This is required when streaming the transaction log (using server. This allows for easier monitoring of the progress from server.
<literal>--xlog=stream</literal>) if replication timeout is configured A value of zero disables the periodic status updates completely,
on the server, and allows for easier monitoring. A value of zero disables although an update will still be sent when requested by the server, to
the status updates completely. The default value is 10 seconds. avoid timeout disconnect. The default value is 10 seconds.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -155,9 +155,10 @@ PostgreSQL documentation ...@@ -155,9 +155,10 @@ PostgreSQL documentation
<listitem> <listitem>
<para> <para>
Specifies the number of seconds between status packets sent back to the Specifies the number of seconds between status packets sent back to the
server. This is required if replication timeout is configured on the server. This allows for easier monitoring of the progress from server.
server, and allows for easier monitoring. A value of zero disables the A value of zero disables the periodic status updates completely,
status updates completely. The default value is 10 seconds. although an update will still be sent when requested by the server, to
avoid timeout disconnect. The default value is 10 seconds.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -287,7 +287,7 @@ recvint64(char *buf) ...@@ -287,7 +287,7 @@ recvint64(char *buf)
* Send a Standby Status Update message to server. * Send a Standby Status Update message to server.
*/ */
static bool static bool
sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now) sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
{ {
char replybuf[1 + 8 + 8 + 8 + 8 + 1]; char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0; int len = 0;
...@@ -302,7 +302,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now) ...@@ -302,7 +302,7 @@ sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now)
len += 8; len += 8;
sendint64(now, &replybuf[len]); /* sendTime */ sendint64(now, &replybuf[len]); /* sendTime */
len += 8; len += 8;
replybuf[len] = 0; /* replyRequested */ replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1; len += 1;
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
...@@ -413,6 +413,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -413,6 +413,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
int bytes_left; int bytes_left;
int bytes_written; int bytes_written;
int64 now; int64 now;
int hdr_len;
if (copybuf != NULL) if (copybuf != NULL)
{ {
...@@ -441,7 +442,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -441,7 +442,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
standby_message_timeout)) standby_message_timeout))
{ {
/* Time to send feedback! */ /* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now)) if (!sendFeedback(conn, blockpos, now, false))
goto error; goto error;
last_status = now; last_status = now;
} }
...@@ -520,10 +521,34 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -520,10 +521,34 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Check the message type. */ /* Check the message type. */
if (copybuf[0] == 'k') if (copybuf[0] == 'k')
{ {
int pos;
bool replyRequested;
/* /*
* keepalive message, sent in 9.2 and newer. We just ignore this * Parse the keepalive message, enclosed in the CopyData message.
* message completely, but need to skip past it in the stream. * We just check if the server requested a reply, and ignore the
* rest.
*/ */
pos = 1; /* skip msgtype 'k' */
pos += 8; /* skip walEnd */
pos += 8; /* skip sendTime */
if (r < pos + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
goto error;
}
replyRequested = copybuf[pos];
/* If the server requested an immediate reply, send one. */
if (replyRequested)
{
now = localGetCurrentTimestamp();
if (!sendFeedback(conn, blockpos, now, false))
goto error;
last_status = now;
}
continue; continue;
} }
else if (copybuf[0] != 'w') else if (copybuf[0] != 'w')
...@@ -538,8 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -538,8 +563,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* message. We only need the WAL location field (dataStart), the rest * message. We only need the WAL location field (dataStart), the rest
* of the header is ignored. * of the header is ignored.
*/ */
#define STREAMING_HEADER_SIZE (1 /* msgtype */ + 8 /* dataStart */ + 8 /* walEnd */ + 8 /* sendTime */) hdr_len = 1; /* msgtype 'w' */
if (r < STREAMING_HEADER_SIZE + 1) hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
if (r < hdr_len + 1)
{ {
fprintf(stderr, _("%s: streaming header too small: %d\n"), fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r); progname, r);
...@@ -578,7 +606,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -578,7 +606,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
} }
} }
bytes_left = r - STREAMING_HEADER_SIZE; bytes_left = r - hdr_len;
bytes_written = 0; bytes_written = 0;
while (bytes_left) while (bytes_left)
...@@ -604,7 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -604,7 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
} }
if (write(walfile, if (write(walfile,
copybuf + STREAMING_HEADER_SIZE + bytes_written, copybuf + hdr_len + bytes_written,
bytes_to_write) != bytes_to_write) bytes_to_write) != bytes_to_write)
{ {
fprintf(stderr, fprintf(stderr,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment