Commit 3dad73e7 authored by Fujii Masao's avatar Fujii Masao

Add -F option to pg_receivexlog, for specifying fsync interval.

This allows us to specify the maximum time to issue fsync to ensure
the received WAL file is safely flushed to disk. Without this,
pg_receivexlog always flushes WAL file only when it's closed and
which can cause WAL data to be lost at the event of a crash.

Furuya Osamu, heavily modified by me.
parent 1add956a
......@@ -105,6 +105,21 @@ PostgreSQL documentation
</listitem>
</varlistentry>
<varlistentry>
<term><option>-F <replaceable class="parameter">interval</replaceable></option></term>
<term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term>
<listitem>
<para>
Specifies the maximum time to issue sync commands to ensure the
received WAL file is safely flushed to disk, in seconds. The default
value is zero, which disables issuing fsyncs except when WAL file is
closed. If <literal>-1</literal> is specified, WAL file is flushed as
soon as possible, that is, as soon as there are WAL data which has
not been flushed yet.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><option>-v</option></term>
<term><option>--verbose</option></term>
......
......@@ -371,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
NULL))
NULL, 0))
/*
* Any errors will already have been reported in the function process,
......
......@@ -36,6 +36,7 @@ static char *basedir = NULL;
static int verbose = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false;
......@@ -62,6 +63,8 @@ usage(void)
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -F --fsync-interval=INTERVAL\n"
" frequency of syncs to transaction log files (in seconds)\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
......@@ -330,7 +333,8 @@ StreamLog(void)
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial");
stop_streaming, standby_message_timeout, ".partial",
fsync_interval);
PQfinish(conn);
}
......@@ -360,6 +364,7 @@ main(int argc, char **argv)
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'},
{"fsync-interval", required_argument, NULL, 'F'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
......@@ -389,7 +394,7 @@ main(int argc, char **argv)
}
}
while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
......@@ -436,6 +441,15 @@ main(int argc, char **argv)
case 'n':
noloop = 1;
break;
case 'F':
fsync_interval = atoi(optarg) * 1000;
if (fsync_interval < -1000)
{
fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
progname, optarg);
exit(1);
}
break;
case 'v':
verbose++;
break;
......
......@@ -31,12 +31,14 @@ static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static int64 last_fsync = -1; /* timestamp of last WAL file flush */
static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos);
char *partial_suffix, XLogRecPtr *stoppos,
int fsync_interval);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
......@@ -48,6 +50,13 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix,
XLogRecPtr *stoppos);
static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status, int fsync_interval,
XLogRecPtr blockpos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
......@@ -200,6 +209,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
progname, current_walfile_name, partial_suffix);
lastFlushPosition = pos;
last_fsync = feGetCurrentTimestamp();
return true;
}
......@@ -430,13 +440,17 @@ CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
* fsync_interval controls how often we flush to the received WAL file,
* in milliseconds.
*
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix)
int standby_message_timeout, char *partial_suffix,
int fsync_interval)
{
char query[128];
char slotcmd[128];
......@@ -581,7 +595,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
&stoppos);
&stoppos, fsync_interval);
if (res == NULL)
goto error;
......@@ -746,7 +760,7 @@ static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
XLogRecPtr *stoppos)
XLogRecPtr *stoppos, int fsync_interval)
{
char *copybuf = NULL;
int64 last_status = -1;
......@@ -763,26 +777,36 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/*
* Check if we should continue streaming, or abort at this point.
*/
if (still_sending && stream_stop(blockpos, timeline, false))
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
stream_stop, partial_suffix, stoppos))
goto error;
now = feGetCurrentTimestamp();
/*
* If fsync_interval has elapsed since last WAL flush and we've written
* some WAL data, flush them to disk.
*/
if (lastFlushPosition < blockpos &&
walfile != -1 &&
((fsync_interval > 0 &&
feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
fsync_interval < 0))
{
if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Potential error message is written by close_walfile */
goto error;
}
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
if (fsync(walfile) != 0)
{
fprintf(stderr, _("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno));
goto error;
}
still_sending = false;
lastFlushPosition = blockpos;
last_fsync = now;
}
/*
* Potentially send a status message to the master
*/
now = feGetCurrentTimestamp();
if (still_sending && standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now,
standby_message_timeout))
......@@ -794,64 +818,58 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
}
/*
* Compute how long send/receive loops should sleep
* Calculate how long send/receive loops should sleep
*/
if (standby_message_timeout && still_sending)
{
int64 targettime;
long secs;
int usecs;
targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
feTimestampDifference(now,
targettime,
&secs,
&usecs);
/* Always sleep at least 1 sec */
if (secs <= 0)
{
secs = 1;
usecs = 0;
}
sleeptime = secs * 1000 + usecs / 1000;
}
else
sleeptime = -1;
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
last_status, fsync_interval, blockpos);
r = CopyStreamReceive(conn, sleeptime, &copybuf);
if (r == 0)
continue;
if (r == -1)
goto error;
if (r == -2)
while (r != 0)
{
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
basedir, partial_suffix, stoppos);
if (res == NULL)
if (r == -1)
goto error;
else
return res;
}
if (r == -2)
{
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
basedir, partial_suffix, stoppos);
if (res == NULL)
goto error;
else
return res;
}
/* Check the message type. */
if (copybuf[0] == 'k')
{
if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
&last_status))
goto error;
}
else if (copybuf[0] == 'w')
{
if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
timeline, basedir, stream_stop, partial_suffix))
/* Check the message type. */
if (copybuf[0] == 'k')
{
if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
&last_status))
goto error;
}
else if (copybuf[0] == 'w')
{
if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
timeline, basedir, stream_stop, partial_suffix))
goto error;
/*
* Check if we should continue streaming, or abort at this point.
*/
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
stream_stop, partial_suffix, stoppos))
goto error;
}
else
{
fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
progname, copybuf[0]);
goto error;
}
else
{
fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
progname, copybuf[0]);
goto error;
}
/*
* Process the received data, and any subsequent data we
* can read without blocking.
*/
r = CopyStreamReceive(conn, 0, &copybuf);
}
}
......@@ -1193,3 +1211,80 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*stoppos = blockpos;
return res;
}
/*
* Check if we should continue streaming, or abort at this point.
*/
static bool
CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos)
{
if (still_sending && stream_stop(blockpos, timeline, false))
{
if (!close_walfile(basedir, partial_suffix, blockpos))
{
/* Potential error message is written by close_walfile */
return false;
}
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{
fprintf(stderr, _("%s: could not send copy-end packet: %s"),
progname, PQerrorMessage(conn));
return false;
}
still_sending = false;
}
return true;
}
/*
* Calculate how long send/receive loops should sleep
*/
static long
CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status, int fsync_interval, XLogRecPtr blockpos)
{
int64 targettime = 0;
int64 status_targettime = 0;
int64 fsync_targettime = 0;
long sleeptime;
if (standby_message_timeout && still_sending)
status_targettime = last_status +
(standby_message_timeout - 1) * ((int64) 1000);
if (fsync_interval > 0 && lastFlushPosition < blockpos)
fsync_targettime = last_fsync +
(fsync_interval - 1) * ((int64) 1000);
if ((status_targettime < fsync_targettime && status_targettime > 0) ||
fsync_targettime == 0)
targettime = status_targettime;
else
targettime = fsync_targettime;
if (targettime > 0)
{
long secs;
int usecs;
feTimestampDifference(now,
targettime,
&secs,
&usecs);
/* Always sleep at least 1 sec */
if (secs <= 0)
{
secs = 1;
usecs = 0;
}
sleeptime = secs * 1000 + usecs / 1000;
}
else
sleeptime = -1;
return sleeptime;
}
......@@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix);
char *partial_suffix,
int fsync_interval);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment