Commit 74cbe966 authored by Fujii Masao's avatar Fujii Masao

Refactor pg_receivexlog main loop code, for readability.

Previously the source codes for receiving the data and for
polling the socket were included in pg_receivexlog main loop.
This commit splits out them as separate functions. This is
useful for improving the readability of main loop code and
making the future pg_receivexlog-related patch simpler.
parent 644d8535
......@@ -35,6 +35,8 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
......@@ -744,12 +746,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
int bytes_written;
int64 now;
int hdr_len;
if (copybuf != NULL)
{
PQfreemem(copybuf);
copybuf = NULL;
}
long sleeptime;
/*
* Check if we should continue streaming, or abort at this point.
......@@ -784,67 +781,38 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
last_status = now;
}
r = PQgetCopyData(conn, &copybuf, 1);
if (r == 0)
/*
* Compute how long send/receive loops should sleep
*/
if (standby_message_timeout && still_sending)
{
/*
* No data available. Wait for some to appear, but not longer than
* the specified timeout, so that we can ping the server.
*/
fd_set input_mask;
struct timeval timeout;
struct timeval *timeoutptr;
FD_ZERO(&input_mask);
FD_SET(PQsocket(conn), &input_mask);
if (standby_message_timeout && still_sending)
int64 targettime;
long secs;
int usecs;
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
feTimestampDifference(now,
targettime,
&secs,
&usecs);
/* Always sleep at least 1 sec */
if (secs <= 0)
{
int64 targettime;
long secs;
int usecs;
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
feTimestampDifference(now,
targettime,
&secs,
&usecs);
if (secs <= 0)
timeout.tv_sec = 1; /* Always sleep at least 1 sec */
else
timeout.tv_sec = secs;
timeout.tv_usec = usecs;
timeoutptr = &timeout;
secs = 1;
usecs = 0;
}
else
timeoutptr = NULL;
r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
if (r == 0 || (r < 0 && errno == EINTR))
{
/*
* Got a timeout or signal. Continue the loop and either
* deliver a status packet to the server or just go back into
* blocking.
*/
continue;
}
else if (r < 0)
{
fprintf(stderr, _("%s: select() failed: %s\n"),
progname, strerror(errno));
goto error;
}
/* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
fprintf(stderr,
_("%s: could not receive data from WAL stream: %s"),
progname, PQerrorMessage(conn));
goto error;
}
continue;
sleeptime = secs * 1000 + usecs / 1000;
}
else
sleeptime = -1;
r = CopyStreamReceive(conn, sleeptime, &copybuf);
if (r == 0)
continue;
if (r == -1)
goto error;
if (r == -2)
{
PGresult *res = PQgetResult(conn);
......@@ -877,15 +845,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
if (copybuf != NULL)
PQfreemem(copybuf);
copybuf = NULL;
*stoppos = blockpos;
return res;
}
if (r == -2)
{
fprintf(stderr, _("%s: could not read COPY data: %s"),
progname, PQerrorMessage(conn));
goto error;
}
/* Check the message type. */
if (copybuf[0] == 'k')
......@@ -1056,3 +1019,115 @@ error:
PQfreemem(copybuf);
return NULL;
}
/*
* Wait until we can read CopyData message, or timeout.
*
* Returns 1 if data has become available for reading, 0 if timed out
* or interrupted by signal, and -1 on an error.
*/
static int
CopyStreamPoll(PGconn *conn, long timeout_ms)
{
int ret;
fd_set input_mask;
struct timeval timeout;
struct timeval *timeoutptr;
if (PQsocket(conn) < 0)
{
fprintf(stderr, _("%s: socket not open"), progname);
return -1;
}
FD_ZERO(&input_mask);
FD_SET(PQsocket(conn), &input_mask);
if (timeout_ms < 0)
timeoutptr = NULL;
else
{
timeout.tv_sec = timeout_ms / 1000L;
timeout.tv_usec = (timeout_ms % 1000L) * 1000L;
timeoutptr = &timeout;
}
ret = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
if (ret == 0 || (ret < 0 && errno == EINTR))
return 0; /* Got a timeout or signal */
else if (ret < 0)
{
fprintf(stderr, _("%s: select() failed: %s\n"),
progname, strerror(errno));
return -1;
}
return 1;
}
/*
* Receive CopyData message available from XLOG stream, blocking for
* maximum of 'timeout' ms.
*
* If data was received, returns the length of the data. *buffer is set to
* point to a buffer holding the received message. The buffer is only valid
* until the next CopyStreamReceive call.
*
* 0 if no data was available within timeout, or wait was interrupted
* by signal. -1 on error. -2 if the server ended the COPY.
*/
static int
CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
{
static char *copybuf = NULL;
int rawlen;
if (copybuf != NULL)
PQfreemem(copybuf);
copybuf = NULL;
*buffer = NULL;
/* Try to receive a CopyData message */
rawlen = PQgetCopyData(conn, &copybuf, 1);
if (rawlen == 0)
{
/*
* No data available. Wait for some to appear, but not longer than
* the specified timeout, so that we can ping the server.
*/
if (timeout > 0)
{
int ret;
ret = CopyStreamPoll(conn, timeout);
if (ret <= 0)
return ret;
}
/* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0)
{
fprintf(stderr,
_("%s: could not receive data from WAL stream: %s"),
progname, PQerrorMessage(conn));
return -1;
}
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(conn, &copybuf, 1);
if (rawlen == 0)
return 0;
}
if (rawlen == -1) /* end-of-streaming or error */
return -2;
if (rawlen == -2)
{
fprintf(stderr, _("%s: could not read COPY data: %s"),
progname, PQerrorMessage(conn));
return -1;
}
/* Return received messages to caller */
*buffer = copybuf;
return rawlen;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment