Commit 6d9fa526 authored by Peter Eisentraut's avatar Peter Eisentraut

pg_receivewal: Add --endpos option

This is primarily useful for making tests of this utility more
deterministic, to avoid the complexity of starting pg_receivewal as a
deamon in TAP tests.

While this is less useful than the equivalent pg_recvlogical option,
users can as well use it for example to enforce WAL streaming up to a
end-of-backup position, to save only a minimal amount of WAL.

Use this new option to stream WAL data in a deterministic way within a
new set of TAP tests.

Author: Michael Paquier <michael.paquier@gmail.com>
parent c1898c3e
...@@ -98,6 +98,22 @@ PostgreSQL documentation ...@@ -98,6 +98,22 @@ PostgreSQL documentation
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>-E <replaceable>lsn</replaceable></option></term>
<term><option>--endpos=<replaceable>lsn</replaceable></option></term>
<listitem>
<para>
Automatically stop replication and exit with normal exit status 0 when
receiving reaches the specified LSN.
</para>
<para>
If there is a record with LSN exactly equal to <replaceable>lsn</>,
the record will be processed.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>--if-not-exists</option></term> <term><option>--if-not-exists</option></term>
<listitem> <listitem>
......
...@@ -36,12 +36,13 @@ static int verbose = 0; ...@@ -36,12 +36,13 @@ static int verbose = 0;
static int compresslevel = 0; static int compresslevel = 0;
static int noloop = 0; static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static volatile bool time_to_abort = false; static volatile bool time_to_stop = false;
static bool do_create_slot = false; static bool do_create_slot = false;
static bool slot_exists_ok = false; static bool slot_exists_ok = false;
static bool do_drop_slot = false; static bool do_drop_slot = false;
static bool synchronous = false; static bool synchronous = false;
static char *replication_slot = NULL; static char *replication_slot = NULL;
static XLogRecPtr endpos = InvalidXLogRecPtr;
static void usage(void); static void usage(void);
...@@ -77,6 +78,7 @@ usage(void) ...@@ -77,6 +78,7 @@ usage(void)
printf(_(" %s [OPTION]...\n"), progname); printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nOptions:\n")); printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive write-ahead log files into this directory\n")); printf(_(" -D, --directory=DIR receive write-ahead log files into this directory\n"));
printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n")); printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -s, --status-interval=SECS\n" printf(_(" -s, --status-interval=SECS\n"
...@@ -112,6 +114,16 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) ...@@ -112,6 +114,16 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
progname, (uint32) (xlogpos >> 32), (uint32) xlogpos, progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
timeline); timeline);
if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
{
if (verbose)
fprintf(stderr, _("%s: stopped streaming at %X/%X (timeline %u)\n"),
progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
timeline);
time_to_stop = true;
return true;
}
/* /*
* Note that we report the previous, not current, position here. After a * Note that we report the previous, not current, position here. After a
* timeline switch, xlogpos points to the beginning of the segment because * timeline switch, xlogpos points to the beginning of the segment because
...@@ -128,7 +140,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) ...@@ -128,7 +140,7 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
prevtimeline = timeline; prevtimeline = timeline;
prevpos = xlogpos; prevpos = xlogpos;
if (time_to_abort) if (time_to_stop)
{ {
if (verbose) if (verbose)
fprintf(stderr, _("%s: received interrupt signal, exiting\n"), fprintf(stderr, _("%s: received interrupt signal, exiting\n"),
...@@ -448,7 +460,7 @@ StreamLog(void) ...@@ -448,7 +460,7 @@ StreamLog(void)
static void static void
sigint_handler(int signum) sigint_handler(int signum)
{ {
time_to_abort = true; time_to_stop = true;
} }
#endif #endif
...@@ -460,6 +472,7 @@ main(int argc, char **argv) ...@@ -460,6 +472,7 @@ main(int argc, char **argv)
{"version", no_argument, NULL, 'V'}, {"version", no_argument, NULL, 'V'},
{"directory", required_argument, NULL, 'D'}, {"directory", required_argument, NULL, 'D'},
{"dbname", required_argument, NULL, 'd'}, {"dbname", required_argument, NULL, 'd'},
{"endpos", required_argument, NULL, 'E'},
{"host", required_argument, NULL, 'h'}, {"host", required_argument, NULL, 'h'},
{"port", required_argument, NULL, 'p'}, {"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'}, {"username", required_argument, NULL, 'U'},
...@@ -481,6 +494,7 @@ main(int argc, char **argv) ...@@ -481,6 +494,7 @@ main(int argc, char **argv)
int c; int c;
int option_index; int option_index;
char *db_name; char *db_name;
uint32 hi, lo;
progname = get_progname(argv[0]); progname = get_progname(argv[0]);
set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup")); set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
...@@ -500,7 +514,7 @@ main(int argc, char **argv) ...@@ -500,7 +514,7 @@ main(int argc, char **argv)
} }
} }
while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:", while ((c = getopt_long(argc, argv, "D:d:E:h:p:U:s:S:nwWvZ:",
long_options, &option_index)) != -1) long_options, &option_index)) != -1)
{ {
switch (c) switch (c)
...@@ -544,6 +558,16 @@ main(int argc, char **argv) ...@@ -544,6 +558,16 @@ main(int argc, char **argv)
case 'S': case 'S':
replication_slot = pg_strdup(optarg); replication_slot = pg_strdup(optarg);
break; break;
case 'E':
if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
{
fprintf(stderr,
_("%s: could not parse end position \"%s\"\n"),
progname, optarg);
exit(1);
}
endpos = ((uint64) hi) << 32 | lo;
break;
case 'n': case 'n':
noloop = 1; noloop = 1;
break; break;
...@@ -714,11 +738,11 @@ main(int argc, char **argv) ...@@ -714,11 +738,11 @@ main(int argc, char **argv)
while (true) while (true)
{ {
StreamLog(); StreamLog();
if (time_to_abort) if (time_to_stop)
{ {
/* /*
* We've been Ctrl-C'ed. That's not an error, so exit without an * We've been Ctrl-C'ed or end of streaming position has been
* errorcode. * willingly reached, so exit without an error code.
*/ */
exit(0); exit(0);
} }
......
use strict; use strict;
use warnings; use warnings;
use TestLib; use TestLib;
use Test::More tests => 8; use PostgresNode;
use Test::More tests => 14;
program_help_ok('pg_receivewal'); program_help_ok('pg_receivewal');
program_version_ok('pg_receivewal'); program_version_ok('pg_receivewal');
program_options_handling_ok('pg_receivewal'); program_options_handling_ok('pg_receivewal');
my $primary = get_new_node('primary');
$primary->init(allows_streaming => 1);
$primary->start;
my $stream_dir = $primary->basedir . '/archive_wal';
mkdir($stream_dir);
# Sanity checks for command line options.
$primary->command_fails(['pg_receivewal'],
'pg_receivewal needs target directory specified');
$primary->command_fails(
[ 'pg_receivewal', '-D', $stream_dir, '--create-slot', '--drop-slot' ],
'failure if both --create-slot and --drop-slot specified');
$primary->command_fails(
[ 'pg_receivewal', '-D', $stream_dir, '--create-slot' ],
'failure if --create-slot specified without --slot');
# Slot creation and drop
my $slot_name = 'test';
$primary->command_ok(
[ 'pg_receivewal', '--slot', $slot_name, '--create-slot' ],
'creating a replication slot');
$primary->command_ok([ 'pg_receivewal', '--slot', $slot_name, '--drop-slot' ],
'dropping a replication slot');
# Generate some WAL. Use --synchronous at the same time to add more
# code coverage. Switch to the next segment first so that subsequent
# restarts of pg_receivewal will see this segment as full..
$primary->psql('postgres', 'CREATE TABLE test_table(x integer);');
$primary->psql('postgres', 'SELECT pg_switch_wal();');
my $nextlsn =
$primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
chomp($nextlsn);
$primary->psql('postgres',
'INSERT INTO test_table VALUES (generate_series(1,100));');
# Stream up to the given position.
$primary->command_ok(
[ 'pg_receivewal', '-D', $stream_dir, '--verbose',
'--endpos', $nextlsn, '--synchronous', '--no-loop' ],
'streaming some WAL with --synchronous');
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment