Commit 754baa21 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Automatically terminate replication connections that are idle for more

than replication_timeout (a new GUC) milliseconds. The TCP timeout is often
too long, you want the master to notice a dead connection much sooner.
People complained about that in 9.0 too, but with synchronous replication
it's even more important to notice dead connections promptly.

Fujii Masao and Heikki Linnakangas
parent bc03c593
......@@ -2019,6 +2019,29 @@ SET ENABLE_SEQSCAN TO OFF;
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-replication-timeout" xreflabel="replication_timeout">
<term><varname>replication_timeout</varname> (<type>integer</type>)</term>
<indexterm>
<primary><varname>replication_timeout</> configuration parameter</primary>
</indexterm>
<listitem>
<para>
Terminate replication connections that are inactive longer
than the specified number of milliseconds. This is useful for
the primary server to detect a standby crash or network outage.
A value of zero means wait forever. This parameter can only be set in
the <filename>postgresql.conf</> file or on the server command line.
The default value is 60 seconds.
</para>
<para>
To prevent connections from being terminated prematurely,
<xref linkend="guc-wal-receiver-status-interval">
must be enabled on the standby, and its value must be less than the
value of <varname>replication_timeout</>.
</para>
</listitem>
</varlistentry>
</variablelist>
</sect2>
......@@ -2216,6 +2239,11 @@ SET ENABLE_SEQSCAN TO OFF;
the <filename>postgresql.conf</> file or on the server command line.
The default value is 10 seconds.
</para>
<para>
When <xref linkend="guc-replication-timeout"> is enabled on the primary,
<varname>wal_receiver_status_interval</> must be enabled, and its value
must be less than the value of <varname>replication_timeout</>.
</para>
</listitem>
</varlistentry>
......
......@@ -55,10 +55,12 @@
* pq_peekbyte - peek at next byte from connection
* pq_putbytes - send bytes to connection (not flushed until pq_flush)
* pq_flush - flush pending output
* pq_flush_if_writable - flush pending output if writable without blocking
* pq_getbyte_if_available - get a byte if available without blocking
*
* message-level I/O (and old-style-COPY-OUT cruft):
* pq_putmessage - send a normal message (suppressed in COPY OUT mode)
* pq_putmessage_noblock - buffer a normal message (suppressed in COPY OUT)
* pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
* pq_endcopyout - end a COPY OUT transfer
*
......@@ -92,6 +94,7 @@
#include "miscadmin.h"
#include "storage/ipc.h"
#include "utils/guc.h"
#include "utils/memutils.h"
/*
* Configuration options
......@@ -105,15 +108,21 @@ static char sock_path[MAXPGPATH];
/*
* Buffers for low-level I/O
* Buffers for low-level I/O.
*
* The receive buffer is fixed size. Send buffer is usually 8k, but can be
* enlarged by pq_putmessage_noblock() if the message doesn't fit otherwise.
*/
#define PQ_BUFFER_SIZE 8192
#define PQ_SEND_BUFFER_SIZE 8192
#define PQ_RECV_BUFFER_SIZE 8192
static char PqSendBuffer[PQ_BUFFER_SIZE];
static char *PqSendBuffer;
static int PqSendBufferSize; /* Size send buffer */
static int PqSendPointer; /* Next index to store a byte in PqSendBuffer */
static int PqSendStart; /* Next index to send a byte in PqSendBuffer */
static char PqRecvBuffer[PQ_BUFFER_SIZE];
static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
static int PqRecvPointer; /* Next index to read a byte from PqRecvBuffer */
static int PqRecvLength; /* End of data available in PqRecvBuffer */
......@@ -128,6 +137,7 @@ static bool DoingCopyOut;
static void pq_close(int code, Datum arg);
static int internal_putbytes(const char *s, size_t len);
static int internal_flush(void);
static void pq_set_nonblocking(bool nonblocking);
#ifdef HAVE_UNIX_SOCKETS
static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
......@@ -142,7 +152,9 @@ static int Setup_AF_UNIX(void);
void
pq_init(void)
{
PqSendPointer = PqRecvPointer = PqRecvLength = 0;
PqSendBufferSize = PQ_SEND_BUFFER_SIZE;
PqSendBuffer = MemoryContextAlloc(TopMemoryContext, PqSendBufferSize);
PqSendPointer = PqSendStart = PqRecvPointer = PqRecvLength = 0;
PqCommBusy = false;
DoingCopyOut = false;
on_proc_exit(pq_close, 0);
......@@ -732,6 +744,42 @@ TouchSocketFile(void)
* --------------------------------
*/
/* --------------------------------
* pq_set_nonblocking - set socket blocking/non-blocking
*
* Sets the socket non-blocking if nonblocking is TRUE, or sets it
* blocking otherwise.
* --------------------------------
*/
static void
pq_set_nonblocking(bool nonblocking)
{
if (MyProcPort->noblock == nonblocking)
return;
#ifdef WIN32
pgwin32_noblock = nonblocking ? 1 : 0;
#else
/*
* Use COMMERROR on failure, because ERROR would try to send the error
* to the client, which might require changing the mode again, leading
* to infinite recursion.
*/
if (nonblocking)
{
if (!pg_set_noblock(MyProcPort->sock))
ereport(COMMERROR,
(errmsg("could not set socket to non-blocking mode: %m")));
}
else
{
if (!pg_set_block(MyProcPort->sock))
ereport(COMMERROR,
(errmsg("could not set socket to blocking mode: %m")));
}
#endif
MyProcPort->noblock = nonblocking;
}
/* --------------------------------
* pq_recvbuf - load some bytes into the input buffer
......@@ -756,13 +804,16 @@ pq_recvbuf(void)
PqRecvLength = PqRecvPointer = 0;
}
/* Ensure that we're in blocking mode */
pq_set_nonblocking(false);
/* Can fill buffer from PqRecvLength and upwards */
for (;;)
{
int r;
r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
PQ_BUFFER_SIZE - PqRecvLength);
PQ_RECV_BUFFER_SIZE - PqRecvLength);
if (r < 0)
{
......@@ -825,7 +876,6 @@ pq_peekbyte(void)
return (unsigned char) PqRecvBuffer[PqRecvPointer];
}
/* --------------------------------
* pq_getbyte_if_available - get a single byte from connection,
* if available
......@@ -845,72 +895,38 @@ pq_getbyte_if_available(unsigned char *c)
return 1;
}
/* Temporarily put the socket into non-blocking mode */
#ifdef WIN32
pgwin32_noblock = 1;
#else
if (!pg_set_noblock(MyProcPort->sock))
ereport(ERROR,
(errmsg("could not set socket to non-blocking mode: %m")));
#endif
MyProcPort->noblock = true;
PG_TRY();
/* Put the socket into non-blocking mode */
pq_set_nonblocking(true);
r = secure_read(MyProcPort, c, 1);
if (r < 0)
{
r = secure_read(MyProcPort, c, 1);
if (r < 0)
/*
* Ok if no data available without blocking or interrupted (though
* EINTR really shouldn't happen with a non-blocking socket).
* Report other errors.
*/
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
r = 0;
else
{
/*
* Ok if no data available without blocking or interrupted (though
* EINTR really shouldn't happen with a non-blocking socket).
* Report other errors.
* Careful: an ereport() that tries to write to the client
* would cause recursion to here, leading to stack overflow
* and core dump! This message must go *only* to the
* postmaster log.
*/
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
r = 0;
else
{
/*
* Careful: an ereport() that tries to write to the client
* would cause recursion to here, leading to stack overflow
* and core dump! This message must go *only* to the
* postmaster log.
*/
ereport(COMMERROR,
(errcode_for_socket_access(),
errmsg("could not receive data from client: %m")));
r = EOF;
}
}
else if (r == 0)
{
/* EOF detected */
ereport(COMMERROR,
(errcode_for_socket_access(),
errmsg("could not receive data from client: %m")));
r = EOF;
}
}
PG_CATCH();
else if (r == 0)
{
/*
* The rest of the backend code assumes the socket is in blocking
* mode, so treat failure as FATAL.
*/
#ifdef WIN32
pgwin32_noblock = 0;
#else
if (!pg_set_block(MyProcPort->sock))
ereport(FATAL,
(errmsg("could not set socket to blocking mode: %m")));
#endif
MyProcPort->noblock = false;
PG_RE_THROW();
/* EOF detected */
r = EOF;
}
PG_END_TRY();
#ifdef WIN32
pgwin32_noblock = 0;
#else
if (!pg_set_block(MyProcPort->sock))
ereport(FATAL,
(errmsg("could not set socket to blocking mode: %m")));
#endif
MyProcPort->noblock = false;
return r;
}
......@@ -1138,10 +1154,13 @@ internal_putbytes(const char *s, size_t len)
while (len > 0)
{
/* If buffer is full, then flush it out */
if (PqSendPointer >= PQ_BUFFER_SIZE)
if (PqSendPointer >= PqSendBufferSize)
{
pq_set_nonblocking(false);
if (internal_flush())
return EOF;
amount = PQ_BUFFER_SIZE - PqSendPointer;
}
amount = PqSendBufferSize - PqSendPointer;
if (amount > len)
amount = len;
memcpy(PqSendBuffer + PqSendPointer, s, amount);
......@@ -1167,17 +1186,25 @@ pq_flush(void)
if (PqCommBusy)
return 0;
PqCommBusy = true;
pq_set_nonblocking(false);
res = internal_flush();
PqCommBusy = false;
return res;
}
/* --------------------------------
* internal_flush - flush pending output
*
* Returns 0 if OK (meaning everything was sent, or operation would block
* and the socket is in non-blocking mode), or EOF if trouble.
* --------------------------------
*/
static int
internal_flush(void)
{
static int last_reported_send_errno = 0;
char *bufptr = PqSendBuffer;
char *bufptr = PqSendBuffer + PqSendStart;
char *bufend = PqSendBuffer + PqSendPointer;
while (bufptr < bufend)
......@@ -1191,6 +1218,16 @@ internal_flush(void)
if (errno == EINTR)
continue; /* Ok if we were interrupted */
/*
* Ok if no data writable without blocking, and the socket
* is in non-blocking mode.
*/
if (errno == EAGAIN ||
errno == EWOULDBLOCK)
{
return 0;
}
/*
* Careful: an ereport() that tries to write to the client would
* cause recursion to here, leading to stack overflow and core
......@@ -1212,18 +1249,56 @@ internal_flush(void)
* We drop the buffered data anyway so that processing can
* continue, even though we'll probably quit soon.
*/
PqSendPointer = 0;
PqSendStart = PqSendPointer = 0;
return EOF;
}
last_reported_send_errno = 0; /* reset after any successful send */
bufptr += r;
PqSendStart += r;
}
PqSendPointer = 0;
PqSendStart = PqSendPointer = 0;
return 0;
}
/* --------------------------------
* pq_flush_if_writable - flush pending output if writable without blocking
*
* Returns 0 if OK, or EOF if trouble.
* --------------------------------
*/
int
pq_flush_if_writable(void)
{
int res;
/* Quick exit if nothing to do */
if (PqSendPointer == PqSendStart)
return 0;
/* No-op if reentrant call */
if (PqCommBusy)
return 0;
/* Temporarily put the socket into non-blocking mode */
pq_set_nonblocking(true);
PqCommBusy = true;
res = internal_flush();
PqCommBusy = false;
return res;
}
/* --------------------------------
* pq_is_send_pending - is there any pending data in the output buffer?
* --------------------------------
*/
bool
pq_is_send_pending(void)
{
return (PqSendStart < PqSendPointer);
}
/* --------------------------------
* Message-level I/O routines begin here.
......@@ -1285,6 +1360,33 @@ fail:
return EOF;
}
/* --------------------------------
* pq_putmessage_noblock - like pq_putmessage, but never blocks
*
* If the output buffer is too small to hold the message, the buffer
* is enlarged.
*/
void
pq_putmessage_noblock(char msgtype, const char *s, size_t len)
{
int res;
int required;
/*
* Ensure we have enough space in the output buffer for the message header
* as well as the message itself.
*/
required = PqSendPointer + 1 + 4 + len;
if (required > PqSendBufferSize)
{
PqSendBuffer = repalloc(PqSendBuffer, required);
PqSendBufferSize = required;
}
res = pq_putmessage(msgtype, s, len);
Assert(res == 0); /* should not fail when the message fits in buffer */
}
/* --------------------------------
* pq_startcopyout - inform libpq that an old-style COPY OUT transfer
* is beginning
......
......@@ -193,19 +193,21 @@ DisownLatch(volatile Latch *latch)
bool
WaitLatch(volatile Latch *latch, long timeout)
{
return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
}
/*
* Like WaitLatch, but will also return when there's data available in
* 'sock' for reading. Returns 0 if timeout was reached, 1 if the latch
* was set, or 2 if the scoket became readable.
* 'sock' for reading or writing. Returns 0 if timeout was reached,
* 1 if the latch was set, 2 if the socket became readable or writable.
*/
int
WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead,
bool forWrite, long timeout)
{
struct timeval tv, *tvp = NULL;
fd_set input_mask;
fd_set output_mask;
int rc;
int result = 0;
......@@ -241,14 +243,22 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
FD_ZERO(&input_mask);
FD_SET(selfpipe_readfd, &input_mask);
hifd = selfpipe_readfd;
if (sock != PGINVALID_SOCKET)
if (sock != PGINVALID_SOCKET && forRead)
{
FD_SET(sock, &input_mask);
if (sock > hifd)
hifd = sock;
}
rc = select(hifd + 1, &input_mask, NULL, NULL, tvp);
FD_ZERO(&output_mask);
if (sock != PGINVALID_SOCKET && forWrite)
{
FD_SET(sock, &output_mask);
if (sock > hifd)
hifd = sock;
}
rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp);
if (rc < 0)
{
if (errno == EINTR)
......@@ -263,7 +273,9 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, long timeout)
result = 0;
break;
}
if (sock != PGINVALID_SOCKET && FD_ISSET(sock, &input_mask))
if (sock != PGINVALID_SOCKET &&
((forRead && FD_ISSET(sock, &input_mask)) ||
(forWrite && FD_ISSET(sock, &output_mask))))
{
result = 2;
break; /* data available in socket */
......
......@@ -14,7 +14,8 @@
#include "postgres.h"
/*
* Indicate if pgwin32_recv() should operate in non-blocking mode.
* Indicate if pgwin32_recv() and pgwin32_send() should operate
* in non-blocking mode.
*
* Since the socket emulation layer always sets the actual socket to
* non-blocking mode in order to be able to deliver signals, we must
......@@ -399,6 +400,16 @@ pgwin32_send(SOCKET s, char *buf, int len, int flags)
return -1;
}
if (pgwin32_noblock)
{
/*
* No data sent, and we are in "emulated non-blocking mode", so
* return indicating that we'd block if we were to continue.
*/
errno = EWOULDBLOCK;
return -1;
}
/* No error, zero bytes (win2000+) or error+WSAEWOULDBLOCK (<=nt4) */
if (pgwin32_waitforsinglesocket(s, FD_WRITE | FD_CLOSE, INFINITE) == 0)
......
......@@ -85,11 +85,12 @@ DisownLatch(volatile Latch *latch)
bool
WaitLatch(volatile Latch *latch, long timeout)
{
return WaitLatchOrSocket(latch, PGINVALID_SOCKET, timeout) > 0;
return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0;
}
int
WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead,
bool forWrite, long timeout)
{
DWORD rc;
HANDLE events[3];
......@@ -103,10 +104,17 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
events[0] = latchevent;
events[1] = pgwin32_signal_event;
numevents = 2;
if (sock != PGINVALID_SOCKET)
if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{
int flags = 0;
if (forRead)
flags |= FD_READ;
if (forWrite)
flags |= FD_WRITE;
sockevent = WSACreateEvent();
WSAEventSelect(sock, sockevent, FD_READ);
WSAEventSelect(sock, sockevent, flags);
events[numevents++] = sockevent;
}
......@@ -139,8 +147,18 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
pgwin32_dispatch_queued_signals();
else if (rc == WAIT_OBJECT_0 + 2)
{
WSANETWORKEVENTS resEvents;
Assert(sock != PGINVALID_SOCKET);
result = 2;
ZeroMemory(&resEvents, sizeof(resEvents));
if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR)
ereport(FATAL,
(errmsg_internal("failed to enumerate network events: %i", (int) GetLastError())));
if ((forRead && resEvents.lNetworkEvents & FD_READ) ||
(forWrite && resEvents.lNetworkEvents & FD_WRITE))
result = 2;
break;
}
else if (rc != WAIT_OBJECT_0)
......@@ -148,7 +166,7 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, long timeout)
}
/* Clean up the handle we created for the socket */
if (sock != PGINVALID_SOCKET)
if (sock != PGINVALID_SOCKET && (forRead || forWrite))
{
WSAEventSelect(sock, sockevent, 0);
WSACloseEvent(sockevent);
......
......@@ -74,6 +74,7 @@ bool am_walsender = false; /* Am I a walsender process ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int WalSndDelay = 1000; /* max sleep time between some actions */
int replication_timeout = 60 * 1000; /* maximum time to send one WAL data message */
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
......@@ -95,6 +96,11 @@ static XLogRecPtr sentPtr = {0, 0};
*/
static StringInfoData reply_message;
/*
* Timestamp of the last receipt of the reply from the standby.
*/
static TimestampTz last_reply_timestamp;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
volatile sig_atomic_t walsender_shutdown_requested = false;
......@@ -113,7 +119,7 @@ static int WalSndLoop(void);
static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static bool XLogSend(char *msgbuf, bool *caughtup);
static void XLogSend(char *msgbuf, bool *caughtup);
static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd * cmd);
static void ProcessStandbyMessage(void);
......@@ -469,6 +475,7 @@ ProcessRepliesIfAny(void)
{
unsigned char firstchar;
int r;
int received = false;
for (;;)
{
......@@ -484,7 +491,7 @@ ProcessRepliesIfAny(void)
if (r == 0)
{
/* no data available without blocking */
return;
break;
}
/* Handle the very limited subset of commands expected in this phase */
......@@ -495,6 +502,7 @@ ProcessRepliesIfAny(void)
*/
case 'd':
ProcessStandbyMessage();
received = true;
break;
/*
......@@ -510,6 +518,12 @@ ProcessRepliesIfAny(void)
firstchar)));
}
}
/*
* Save the last reply timestamp if we've received at least
* one reply.
*/
if (received)
last_reply_timestamp = GetCurrentTimestamp();
}
/*
......@@ -688,6 +702,9 @@ WalSndLoop(void)
*/
initStringInfo(&reply_message);
/* Initialize the last reply timestamp */
last_reply_timestamp = GetCurrentTimestamp();
/* Loop forever, unless we get an error */
for (;;)
{
......@@ -706,19 +723,6 @@ WalSndLoop(void)
SyncRepInitConfig();
}
/*
* When SIGUSR2 arrives, we send all outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit.
*/
if (walsender_ready_to_stop)
{
if (!XLogSend(output_message, &caughtup))
break;
ProcessRepliesIfAny();
if (caughtup)
walsender_shutdown_requested = true;
}
/* Normal exit from the walsender is here */
if (walsender_shutdown_requested)
{
......@@ -730,11 +734,13 @@ WalSndLoop(void)
}
/*
* If we had sent all accumulated WAL in last round, nap for the
* configured time before retrying.
* If we don't have any pending data in the output buffer, try to
* send some more.
*/
if (caughtup)
if (!pq_is_send_pending())
{
XLogSend(output_message, &caughtup);
/*
* Even if we wrote all the WAL that was available when we started
* sending, more might have arrived while we were sending this
......@@ -742,28 +748,79 @@ WalSndLoop(void)
* received any signals from that time. Let's arm the latch
* again, and after that check that we're still up-to-date.
*/
ResetLatch(&MyWalSnd->latch);
if (!XLogSend(output_message, &caughtup))
break;
if (caughtup && !got_SIGHUP && !walsender_ready_to_stop && !walsender_shutdown_requested)
if (caughtup && !pq_is_send_pending())
{
/*
* XXX: We don't really need the periodic wakeups anymore,
* WaitLatchOrSocket should reliably wake up as soon as
* something interesting happens.
*/
ResetLatch(&MyWalSnd->latch);
/* Sleep */
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
WalSndDelay * 1000L);
XLogSend(output_message, &caughtup);
}
}
else
/* Flush pending output to the client */
if (pq_flush_if_writable() != 0)
break;
/*
* When SIGUSR2 arrives, we send any outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit.
*/
if (walsender_ready_to_stop && !pq_is_send_pending())
{
/* Attempt to send the log once every loop */
if (!XLogSend(output_message, &caughtup))
XLogSend(output_message, &caughtup);
ProcessRepliesIfAny();
if (caughtup && !pq_is_send_pending())
walsender_shutdown_requested = true;
}
if ((caughtup || pq_is_send_pending()) &&
!got_SIGHUP &&
!walsender_shutdown_requested)
{
TimestampTz finish_time;
long sleeptime;
/* Reschedule replication timeout */
if (replication_timeout > 0)
{
long secs;
int usecs;
finish_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
replication_timeout);
TimestampDifference(GetCurrentTimestamp(),
finish_time, &secs, &usecs);
sleeptime = secs * 1000 + usecs / 1000;
if (WalSndDelay < sleeptime)
sleeptime = WalSndDelay;
}
else
{
/*
* XXX: Without timeout, we don't really need the periodic
* wakeups anymore, WaitLatchOrSocket should reliably wake up
* as soon as something interesting happens.
*/
sleeptime = WalSndDelay;
}
/* Sleep */
WaitLatchOrSocket(&MyWalSnd->latch, MyProcPort->sock,
true, pq_is_send_pending(),
sleeptime * 1000L);
/* Check for replication timeout */
if (replication_timeout > 0 &&
GetCurrentTimestamp() >= finish_time)
{
/*
* Since typically expiration of replication timeout means
* communication problem, we don't send the error message
* to the standby.
*/
ereport(COMMERROR,
(errmsg("terminating walsender process due to replication timeout")));
break;
}
}
/*
......@@ -993,7 +1050,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
/*
* Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
* but not yet sent to the client, and send it.
* but not yet sent to the client, and buffer it in the libpq output
* buffer.
*
* msgbuf is a work area in which the output message is constructed. It's
* passed in just so we can avoid re-palloc'ing the buffer on each cycle.
......@@ -1001,10 +1059,9 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
*
* If there is no unsent WAL remaining, *caughtup is set to true, otherwise
* *caughtup is set to false.
*
* Returns true if OK, false if trouble.
*/
static bool
static void
XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
......@@ -1027,7 +1084,7 @@ XLogSend(char *msgbuf, bool *caughtup)
if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
return true;
return;
}
/*
......@@ -1099,11 +1156,7 @@ XLogSend(char *msgbuf, bool *caughtup)
memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
/* Flush pending output to the client */
if (pq_flush())
return false;
pq_putmessage_noblock('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
sentPtr = endptr;
......@@ -1127,7 +1180,7 @@ XLogSend(char *msgbuf, bool *caughtup)
set_ps_display(activitymsg, false);
}
return true;
return;
}
/* SIGHUP: set flag to re-read config file at next convenient time */
......
......@@ -1855,6 +1855,16 @@ static struct config_int ConfigureNamesInt[] =
1000, 1, 10000, NULL, NULL
},
{
{"replication_timeout", PGC_SIGHUP, WAL_REPLICATION,
gettext_noop("Sets the maximum time to wait for WAL replication."),
NULL,
GUC_UNIT_MS
},
&replication_timeout,
60 * 1000, 0, INT_MAX, NULL, NULL
},
{
{"commit_delay", PGC_USERSET, WAL_SETTINGS,
gettext_noop("Sets the delay in microseconds between transaction commit and "
......
......@@ -200,6 +200,7 @@
#wal_sender_delay = 1s # walsender cycle time, 1-10000 milliseconds
#wal_keep_segments = 0 # in logfile segments, 16MB each; 0 disables
#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed
#replication_timeout = 60s # in milliseconds, 0 is disabled
# - Standby Servers -
......
......@@ -60,7 +60,10 @@ extern int pq_peekbyte(void);
extern int pq_getbyte_if_available(unsigned char *c);
extern int pq_putbytes(const char *s, size_t len);
extern int pq_flush(void);
extern int pq_flush_if_writable(void);
extern bool pq_is_send_pending(void);
extern int pq_putmessage(char msgtype, const char *s, size_t len);
extern void pq_putmessage_noblock(char msgtype, const char *s, size_t len);
extern void pq_startcopyout(void);
extern void pq_endcopyout(bool errorAbort);
......
......@@ -98,6 +98,7 @@ extern volatile sig_atomic_t walsender_ready_to_stop;
/* user-settable parameters */
extern int WalSndDelay;
extern int max_wal_senders;
extern int replication_timeout;
extern int WalSenderMain(void);
extern void WalSndSignals(void);
......
......@@ -40,7 +40,7 @@ extern void OwnLatch(volatile Latch *latch);
extern void DisownLatch(volatile Latch *latch);
extern bool WaitLatch(volatile Latch *latch, long timeout);
extern int WaitLatchOrSocket(volatile Latch *latch, pgsocket sock,
long timeout);
bool forRead, bool forWrite, long timeout);
extern void SetLatch(volatile Latch *latch);
extern void ResetLatch(volatile Latch *latch);
#define TestLatch(latch) (((volatile Latch *) latch)->is_set)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment