Commit c4f99d20 authored by Fujii Masao

Add --synchronous option to pg_receivexlog, for more reliable WAL writing.

Previously pg_receivexlog flushed WAL data only when WAL file was switched.
Then 3dad73e7 added -F option to pg_receivexlog so that users could control
how frequently sync commands were issued to WAL files. It also allowed users
to make pg_receivexlog flush WAL data immediately after writing by
specifying -1 in -F option. However, feedback messages were not sent back
immediately even after a flush location was updated. So even if WAL data
was flushed in real time, the server could not see that for a while.

This commit removes the -F option from pg_receivexlog and adds --synchronous.
If --synchronous is specified, like the standby's wal receiver, pg_receivexlog
flushes WAL data as soon as there is WAL data which has not been flushed yet.
Then it sends back the feedback message identifying the latest flush location
to the server. This option is useful, for example, for making pg_receivexlog
behave as a synchronous standby by using a replication slot.

Original patch by Furuya Osamu, heavily rewritten by me.
Reviewed by Heikki Linnakangas, Alvaro Herrera and Sawada Masahiko.
parent bc241488
...@@ -48,6 +48,13 @@ PostgreSQL documentation ...@@ -48,6 +48,13 @@ PostgreSQL documentation
<application>pg_receivexlog</application>. <application>pg_receivexlog</application>.
</para> </para>
<para>
Unlike the standby's WAL receiver, <application>pg_receivexlog</>
by default flushes WAL data only when a WAL file is closed.
The <literal>--synchronous</> option must be specified to flush WAL data
in real time and ensure that it is safely flushed to disk.
</para>
<para> <para>
The transaction log is streamed over a regular The transaction log is streamed over a regular
<productname>PostgreSQL</productname> connection, and uses the replication <productname>PostgreSQL</productname> connection, and uses the replication
...@@ -85,21 +92,6 @@ PostgreSQL documentation ...@@ -85,21 +92,6 @@ PostgreSQL documentation
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>-F <replaceable class="parameter">interval</replaceable></option></term>
<term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term>
<listitem>
<para>
Specifies the maximum time to issue sync commands to ensure the
received WAL file is safely flushed to disk, in seconds. The default
value is zero, which disables issuing fsyncs except when WAL file is
closed. If <literal>-1</literal> is specified, WAL file is flushed as
soon as possible, that is, as soon as there are WAL data which has
not been flushed yet.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>-n</option></term> <term><option>-n</option></term>
<term><option>--no-loop</option></term> <term><option>--no-loop</option></term>
...@@ -135,15 +127,25 @@ PostgreSQL documentation ...@@ -135,15 +127,25 @@ PostgreSQL documentation
When this option is used, <application>pg_receivexlog</> will report When this option is used, <application>pg_receivexlog</> will report
a flush position to the server, indicating when each segment has been a flush position to the server, indicating when each segment has been
synchronized to disk so that the server can remove that segment if it synchronized to disk so that the server can remove that segment if it
is not otherwise needed. When using this parameter, it is important is not otherwise needed. <literal>--synchronous</literal> option must
to make sure that <application>pg_receivexlog</> cannot become the be specified when making <application>pg_receivexlog</> run as
synchronous standby through an incautious setting of synchronous standby by using replication slot. Otherwise WAL data
<xref linkend="guc-synchronous-standby-names">; it does not flush cannot be flushed frequently enough for this to work correctly.
data frequently enough for this to work correctly.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>--synchronous</option></term>
<listitem>
<para>
Issue sync commands as soon as there is WAL data which has not been
flushed yet. Also, status packets are sent back to the server just after
WAL data is flushed, regardless of the <literal>--status-interval</> setting.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>-v</option></term> <term><option>-v</option></term>
<term><option>--verbose</option></term> <term><option>--verbose</option></term>
......
...@@ -370,7 +370,7 @@ LogStreamerMain(logstreamer_param *param) ...@@ -370,7 +370,7 @@ LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir, param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout, reached_end_position, standby_message_timeout,
NULL, 0)) NULL, false))
/* /*
* Any errors will already have been reported in the function process, * Any errors will already have been reported in the function process,
......
...@@ -36,10 +36,10 @@ static char *basedir = NULL; ...@@ -36,10 +36,10 @@ static char *basedir = NULL;
static int verbose = 0; static int verbose = 0;
static int noloop = 0; static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static int fsync_interval = 0; /* 0 = default */
static volatile bool time_to_abort = false; static volatile bool time_to_abort = false;
static bool do_create_slot = false; static bool do_create_slot = false;
static bool do_drop_slot = false; static bool do_drop_slot = false;
static bool synchronous = false;
static void usage(void); static void usage(void);
...@@ -66,12 +66,11 @@ usage(void) ...@@ -66,12 +66,11 @@ usage(void)
printf(_(" %s [OPTION]...\n"), progname); printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nOptions:\n")); printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -F --fsync-interval=SECS\n"
" time between fsyncs to transaction log files (default: %d)\n"), (fsync_interval / 1000));
printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -s, --status-interval=SECS\n" printf(_(" -s, --status-interval=SECS\n"
" time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000)); " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
printf(_(" -S, --slot=SLOTNAME replication slot to use\n")); printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
printf(_(" --synchronous flush transaction log immediately after writing\n"));
printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n")); printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n")); printf(_(" -?, --help show this help, then exit\n"));
...@@ -343,7 +342,7 @@ StreamLog(void) ...@@ -343,7 +342,7 @@ StreamLog(void)
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial", stop_streaming, standby_message_timeout, ".partial",
fsync_interval); synchronous);
PQfinish(conn); PQfinish(conn);
conn = NULL; conn = NULL;
...@@ -374,7 +373,6 @@ main(int argc, char **argv) ...@@ -374,7 +373,6 @@ main(int argc, char **argv)
{"port", required_argument, NULL, 'p'}, {"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'}, {"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'}, {"no-loop", no_argument, NULL, 'n'},
{"fsync-interval", required_argument, NULL, 'F'},
{"no-password", no_argument, NULL, 'w'}, {"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'}, {"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'}, {"status-interval", required_argument, NULL, 's'},
...@@ -383,6 +381,7 @@ main(int argc, char **argv) ...@@ -383,6 +381,7 @@ main(int argc, char **argv)
/* action */ /* action */
{"create-slot", no_argument, NULL, 1}, {"create-slot", no_argument, NULL, 1},
{"drop-slot", no_argument, NULL, 2}, {"drop-slot", no_argument, NULL, 2},
{"synchronous", no_argument, NULL, 3},
{NULL, 0, NULL, 0} {NULL, 0, NULL, 0}
}; };
...@@ -408,7 +407,7 @@ main(int argc, char **argv) ...@@ -408,7 +407,7 @@ main(int argc, char **argv)
} }
} }
while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv", while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
long_options, &option_index)) != -1) long_options, &option_index)) != -1)
{ {
switch (c) switch (c)
...@@ -455,15 +454,6 @@ main(int argc, char **argv) ...@@ -455,15 +454,6 @@ main(int argc, char **argv)
case 'n': case 'n':
noloop = 1; noloop = 1;
break; break;
case 'F':
fsync_interval = atoi(optarg) * 1000;
if (fsync_interval < -1000)
{
fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
progname, optarg);
exit(1);
}
break;
case 'v': case 'v':
verbose++; verbose++;
break; break;
...@@ -474,6 +464,9 @@ main(int argc, char **argv) ...@@ -474,6 +464,9 @@ main(int argc, char **argv)
case 2: case 2:
do_drop_slot = true; do_drop_slot = true;
break; break;
case 3:
synchronous = true;
break;
default: default:
/* /*
......
...@@ -31,14 +31,13 @@ static char current_walfile_name[MAXPGPATH] = ""; ...@@ -31,14 +31,13 @@ static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false; static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static int64 last_fsync = -1; /* timestamp of last WAL file flush */
static bool still_sending = true; /* feedback still needs to be sent? */ static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir, uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout, stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos, char *partial_suffix, XLogRecPtr *stoppos,
int fsync_interval); bool synchronous);
static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
...@@ -55,8 +54,7 @@ static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, ...@@ -55,8 +54,7 @@ static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
stream_stop_callback stream_stop, stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos); char *partial_suffix, XLogRecPtr *stoppos);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status, int fsync_interval, int64 last_status);
XLogRecPtr blockpos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline); uint32 *timeline);
...@@ -209,7 +207,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) ...@@ -209,7 +207,6 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
progname, current_walfile_name, partial_suffix); progname, current_walfile_name, partial_suffix);
lastFlushPosition = pos; lastFlushPosition = pos;
last_fsync = feGetCurrentTimestamp();
return true; return true;
} }
...@@ -440,8 +437,8 @@ CheckServerVersionForStreaming(PGconn *conn) ...@@ -440,8 +437,8 @@ CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files, * allows you to tell the difference between partial and completed files,
* so that you can continue later where you left. * so that you can continue later where you left.
* *
* fsync_interval controls how often we flush to the received WAL file, * If 'synchronous' is true, the received WAL is flushed as soon as written,
* in milliseconds. * otherwise only when the WAL file is closed.
* *
* Note: The log position *must* be at a log segment start! * Note: The log position *must* be at a log segment start!
*/ */
...@@ -450,7 +447,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -450,7 +447,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir, char *sysidentifier, char *basedir,
stream_stop_callback stream_stop, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix, int standby_message_timeout, char *partial_suffix,
int fsync_interval) bool synchronous)
{ {
char query[128]; char query[128];
char slotcmd[128]; char slotcmd[128];
...@@ -595,7 +592,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -595,7 +592,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */ /* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix, standby_message_timeout, partial_suffix,
&stoppos, fsync_interval); &stoppos, synchronous);
if (res == NULL) if (res == NULL)
goto error; goto error;
...@@ -760,7 +757,7 @@ static PGresult * ...@@ -760,7 +757,7 @@ static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop, char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix, int standby_message_timeout, char *partial_suffix,
XLogRecPtr *stoppos, int fsync_interval) XLogRecPtr *stoppos, bool synchronous)
{ {
char *copybuf = NULL; char *copybuf = NULL;
int64 last_status = -1; int64 last_status = -1;
...@@ -784,14 +781,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -784,14 +781,10 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
now = feGetCurrentTimestamp(); now = feGetCurrentTimestamp();
/* /*
* If fsync_interval has elapsed since last WAL flush and we've written * If synchronous option is true, issue sync command as soon as
* some WAL data, flush them to disk. * there are WAL data which has not been flushed yet.
*/ */
if (lastFlushPosition < blockpos && if (synchronous && lastFlushPosition < blockpos && walfile != -1)
walfile != -1 &&
((fsync_interval > 0 &&
feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
fsync_interval < 0))
{ {
if (fsync(walfile) != 0) if (fsync(walfile) != 0)
{ {
...@@ -799,9 +792,15 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -799,9 +792,15 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
progname, current_walfile_name, strerror(errno)); progname, current_walfile_name, strerror(errno));
goto error; goto error;
} }
lastFlushPosition = blockpos; lastFlushPosition = blockpos;
last_fsync = now;
/*
* Send feedback so that the server sees the latest WAL locations
* immediately.
*/
if (!sendFeedback(conn, blockpos, now, false))
goto error;
last_status = now;
} }
/* /*
...@@ -821,7 +820,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -821,7 +820,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Calculate how long send/receive loops should sleep * Calculate how long send/receive loops should sleep
*/ */
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
last_status, fsync_interval, blockpos); last_status);
r = CopyStreamReceive(conn, sleeptime, &copybuf); r = CopyStreamReceive(conn, sleeptime, &copybuf);
while (r != 0) while (r != 0)
...@@ -1244,34 +1243,22 @@ CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, ...@@ -1244,34 +1243,22 @@ CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
*/ */
static long static long
CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status, int fsync_interval, XLogRecPtr blockpos) int64 last_status)
{ {
int64 targettime = 0;
int64 status_targettime = 0; int64 status_targettime = 0;
int64 fsync_targettime = 0;
long sleeptime; long sleeptime;
if (standby_message_timeout && still_sending) if (standby_message_timeout && still_sending)
status_targettime = last_status + status_targettime = last_status +
(standby_message_timeout - 1) * ((int64) 1000); (standby_message_timeout - 1) * ((int64) 1000);
if (fsync_interval > 0 && lastFlushPosition < blockpos) if (status_targettime > 0)
fsync_targettime = last_fsync +
(fsync_interval - 1) * ((int64) 1000);
if ((status_targettime < fsync_targettime && status_targettime > 0) ||
fsync_targettime == 0)
targettime = status_targettime;
else
targettime = fsync_targettime;
if (targettime > 0)
{ {
long secs; long secs;
int usecs; int usecs;
feTimestampDifference(now, feTimestampDifference(now,
targettime, status_targettime,
&secs, &secs,
&usecs); &usecs);
/* Always sleep at least 1 sec */ /* Always sleep at least 1 sec */
......
...@@ -31,6 +31,6 @@ extern bool ReceiveXlogStream(PGconn *conn, ...@@ -31,6 +31,6 @@ extern bool ReceiveXlogStream(PGconn *conn,
stream_stop_callback stream_stop, stream_stop_callback stream_stop,
int standby_message_timeout, int standby_message_timeout,
char *partial_suffix, char *partial_suffix,
int fsync_interval); bool synchronous);
#endif /* RECEIVELOG_H */ #endif /* RECEIVELOG_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment