Commit 6805e02c authored by Fujii Masao's avatar Fujii Masao

Refactor pg_receivexlog main loop code, for readability, take 2.

Previously the source codes for processing the received data and handling
the end of stream were included in pg_receivexlog main loop. This commit
splits out them as separate functions. This is useful for improving the
readability of main loop code and making the future pg_receivexlog-related
patch simpler.
parent e3da0d4d
...@@ -31,12 +31,23 @@ static char current_walfile_name[MAXPGPATH] = ""; ...@@ -31,12 +31,23 @@ static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false; static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir, uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout, stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos); char *partial_suffix, XLogRecPtr *stoppos);
static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
char *partial_suffix);
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
XLogRecPtr *stoppos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline); uint32 *timeline);
...@@ -740,16 +751,13 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -740,16 +751,13 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *copybuf = NULL; char *copybuf = NULL;
int64 last_status = -1; int64 last_status = -1;
XLogRecPtr blockpos = startpos; XLogRecPtr blockpos = startpos;
bool still_sending = true;
still_sending = true;
while (1) while (1)
{ {
int r; int r;
int xlogoff;
int bytes_left;
int bytes_written;
int64 now; int64 now;
int hdr_len;
long sleeptime; long sleeptime;
/* /*
...@@ -818,198 +826,26 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -818,198 +826,26 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
goto error; goto error;
if (r == -2) if (r == -2)
{ {
PGresult *res = PQgetResult(conn); PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
basedir, partial_suffix, stoppos);
/* if (res == NULL)
* The server closed its end of the copy stream. If we haven't goto error;
* closed ours already, we need to do so now, unless the server else
* threw an error, in which case we don't. return res;
*/
if (still_sending)
{
if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Error message written in close_walfile() */
PQclear(res);
goto error;
}
if (PQresultStatus(res) == PGRES_COPY_IN)
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr,
_("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
PQclear(res);
goto error;
}
PQclear(res);
res = PQgetResult(conn);
}
still_sending = false;
}
if (copybuf != NULL)
PQfreemem(copybuf);
copybuf = NULL;
*stoppos = blockpos;
return res;
} }
/* Check the message type. */ /* Check the message type. */
if (copybuf[0] == 'k') if (copybuf[0] == 'k')
{ {
int pos; if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
bool replyRequested; &last_status))
/*
* Parse the keepalive message, enclosed in the CopyData message.
* We just check if the server requested a reply, and ignore the
* rest.
*/
pos = 1; /* skip msgtype 'k' */
pos += 8; /* skip walEnd */
pos += 8; /* skip sendTime */
if (r < pos + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
goto error; goto error;
}
replyRequested = copybuf[pos];
/* If the server requested an immediate reply, send one. */
if (replyRequested && still_sending)
{
now = feGetCurrentTimestamp();
if (!sendFeedback(conn, blockpos, now, false))
goto error;
last_status = now;
}
} }
else if (copybuf[0] == 'w') else if (copybuf[0] == 'w')
{ {
/* if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
* Once we've decided we don't want to receive any more, just timeline, basedir, stream_stop, partial_suffix))
* ignore any subsequent XLogData messages.
*/
if (!still_sending)
continue;
/*
* Read the header of the XLogData message, enclosed in the
* CopyData message. We only need the WAL location field
* (dataStart), the rest of the header is ignored.
*/
hdr_len = 1; /* msgtype 'w' */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
if (r < hdr_len)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, r);
goto error; goto error;
}
blockpos = fe_recvint64(&copybuf[1]);
/* Extract WAL location for this block */
xlogoff = blockpos % XLOG_SEG_SIZE;
/*
* Verify that the initial location in the stream matches where we
* think we are.
*/
if (walfile == -1)
{
/* No file open yet */
if (xlogoff != 0)
{
fprintf(stderr,
_("%s: received transaction log record for offset %u with no file open\n"),
progname, xlogoff);
goto error;
}
}
else
{
/* More data in existing segment */
/* XXX: store seek value don't reseek all the time */
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{
fprintf(stderr,
_("%s: got WAL data offset %08x, expected %08x\n"),
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
goto error;
}
}
bytes_left = r - hdr_len;
bytes_written = 0;
while (bytes_left)
{
int bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach
* XLOG_SEG_SIZE.
*/
if (xlogoff + bytes_left > XLOG_SEG_SIZE)
bytes_to_write = XLOG_SEG_SIZE - xlogoff;
else
bytes_to_write = bytes_left;
if (walfile == -1)
{
if (!open_walfile(blockpos, timeline,
basedir, partial_suffix))
{
/* Error logged by open_walfile */
goto error;
}
}
if (write(walfile,
copybuf + hdr_len + bytes_written,
bytes_to_write) != bytes_to_write)
{
fprintf(stderr,
_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
progname, bytes_to_write, current_walfile_name,
strerror(errno));
goto error;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
blockpos += bytes_to_write;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if (blockpos % XLOG_SEG_SIZE == 0)
{
if (!close_walfile(basedir, partial_suffix, blockpos))
/* Error message written in close_walfile() */
goto error;
xlogoff = 0;
if (still_sending && stream_stop(blockpos, timeline, true))
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr, _("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
goto error;
}
still_sending = false;
break; /* ignore the rest of this XLogData packet */
}
}
}
/* No more data left to write, receive next copy packet */
} }
else else
{ {
...@@ -1135,3 +971,225 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer) ...@@ -1135,3 +971,225 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
*buffer = copybuf; *buffer = copybuf;
return rawlen; return rawlen;
} }
/*
* Process the keepalive message.
*/
static bool
ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status)
{
int pos;
bool replyRequested;
int64 now;
/*
* Parse the keepalive message, enclosed in the CopyData message.
* We just check if the server requested a reply, and ignore the
* rest.
*/
pos = 1; /* skip msgtype 'k' */
pos += 8; /* skip walEnd */
pos += 8; /* skip sendTime */
if (len < pos + 1)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, len);
return false;
}
replyRequested = copybuf[pos];
/* If the server requested an immediate reply, send one. */
if (replyRequested && still_sending)
{
now = feGetCurrentTimestamp();
if (!sendFeedback(conn, blockpos, now, false))
return false;
*last_status = now;
}
return true;
}
/*
* Process XLogData message.
*/
static bool
ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
char *partial_suffix)
{
int xlogoff;
int bytes_left;
int bytes_written;
int hdr_len;
/*
* Once we've decided we don't want to receive any more, just
* ignore any subsequent XLogData messages.
*/
if (!(still_sending))
return true;
/*
* Read the header of the XLogData message, enclosed in the
* CopyData message. We only need the WAL location field
* (dataStart), the rest of the header is ignored.
*/
hdr_len = 1; /* msgtype 'w' */
hdr_len += 8; /* dataStart */
hdr_len += 8; /* walEnd */
hdr_len += 8; /* sendTime */
if (len < hdr_len)
{
fprintf(stderr, _("%s: streaming header too small: %d\n"),
progname, len);
return false;
}
*blockpos = fe_recvint64(&copybuf[1]);
/* Extract WAL location for this block */
xlogoff = *blockpos % XLOG_SEG_SIZE;
/*
* Verify that the initial location in the stream matches where we
* think we are.
*/
if (walfile == -1)
{
/* No file open yet */
if (xlogoff != 0)
{
fprintf(stderr,
_("%s: received transaction log record for offset %u with no file open\n"),
progname, xlogoff);
return false;
}
}
else
{
/* More data in existing segment */
/* XXX: store seek value don't reseek all the time */
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{
fprintf(stderr,
_("%s: got WAL data offset %08x, expected %08x\n"),
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
return false;
}
}
bytes_left = len - hdr_len;
bytes_written = 0;
while (bytes_left)
{
int bytes_to_write;
/*
* If crossing a WAL boundary, only write up until we reach
* XLOG_SEG_SIZE.
*/
if (xlogoff + bytes_left > XLOG_SEG_SIZE)
bytes_to_write = XLOG_SEG_SIZE - xlogoff;
else
bytes_to_write = bytes_left;
if (walfile == -1)
{
if (!open_walfile(*blockpos, timeline,
basedir, partial_suffix))
{
/* Error logged by open_walfile */
return false;
}
}
if (write(walfile,
copybuf + hdr_len + bytes_written,
bytes_to_write) != bytes_to_write)
{
fprintf(stderr,
_("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
progname, bytes_to_write, current_walfile_name,
strerror(errno));
return false;
}
/* Write was successful, advance our position */
bytes_written += bytes_to_write;
bytes_left -= bytes_to_write;
*blockpos += bytes_to_write;
xlogoff += bytes_to_write;
/* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0)
{
if (!close_walfile(basedir, partial_suffix, *blockpos))
/* Error message written in close_walfile() */
return false;
xlogoff = 0;
if (still_sending && stream_stop(*blockpos, timeline, true))
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr, _("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
return false;
}
still_sending = false;
return true; /* ignore the rest of this XLogData packet */
}
}
}
/* No more data left to write, receive next copy packet */
return true;
}
/*
* Handle end of the copy stream.
*/
static PGresult *
HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
XLogRecPtr *stoppos)
{
PGresult *res = PQgetResult(conn);
/*
* The server closed its end of the copy stream. If we haven't
* closed ours already, we need to do so now, unless the server
* threw an error, in which case we don't.
*/
if (still_sending)
{
if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Error message written in close_walfile() */
PQclear(res);
return NULL;
}
if (PQresultStatus(res) == PGRES_COPY_IN)
{
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr,
_("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
PQclear(res);
return NULL;
}
res = PQgetResult(conn);
}
still_sending = false;
}
if (copybuf != NULL)
PQfreemem(copybuf);
*stoppos = blockpos;
return res;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment