Commit 2f29f011 authored by Alvaro Herrera's avatar Alvaro Herrera

pg_basebackup: stylistic adjustments

The most user-visible part of this is to change the long options
--statusint and --noloop to --status-interval and --no-loop,
respectively, per discussion.

Also, consistently enclose file names in double quotes, per our
conventions; and consistently use the term "transaction log file" to
talk about WAL segments.  (Someday we may need to go over this
terminology and make it consistent across the whole source code.)

Finally, reflow the code to better fit in 80 columns, and have pgindent
fix it up some more.
parent 04d2956f
...@@ -346,7 +346,7 @@ PostgreSQL documentation ...@@ -346,7 +346,7 @@ PostgreSQL documentation
<variablelist> <variablelist>
<varlistentry> <varlistentry>
<term><option>-s <replaceable class="parameter">interval</replaceable></option></term> <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
<term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term> <term><option>--status-interval=<replaceable class="parameter">interval</replaceable></option></term>
<listitem> <listitem>
<para> <para>
Specifies the number of seconds between status packets sent back to the Specifies the number of seconds between status packets sent back to the
......
...@@ -96,7 +96,7 @@ PostgreSQL documentation ...@@ -96,7 +96,7 @@ PostgreSQL documentation
<variablelist> <variablelist>
<varlistentry> <varlistentry>
<term><option>-n</option></term> <term><option>-n</option></term>
<term><option>--noloop</option></term> <term><option>--no-loop</option></term>
<listitem> <listitem>
<para> <para>
Don't loop on connection errors. Instead, exit right away with Don't loop on connection errors. Instead, exit right away with
...@@ -124,7 +124,7 @@ PostgreSQL documentation ...@@ -124,7 +124,7 @@ PostgreSQL documentation
<variablelist> <variablelist>
<varlistentry> <varlistentry>
<term><option>-s <replaceable class="parameter">interval</replaceable></option></term> <term><option>-s <replaceable class="parameter">interval</replaceable></option></term>
<term><option>--statusint=<replaceable class="parameter">interval</replaceable></option></term> <term><option>--status-interval=<replaceable class="parameter">interval</replaceable></option></term>
<listitem> <listitem>
<para> <para>
Specifies the number of seconds between status packets sent back to the Specifies the number of seconds between status packets sent back to the
......
...@@ -79,7 +79,8 @@ static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); ...@@ -79,7 +79,8 @@ static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void); static void BaseBackup(void);
static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
bool segment_finished);
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
static const char * static const char *
...@@ -96,7 +97,6 @@ get_gz_error(gzFile gzf) ...@@ -96,7 +97,6 @@ get_gz_error(gzFile gzf)
} }
#endif #endif
static void static void
usage(void) usage(void)
{ {
...@@ -121,7 +121,8 @@ usage(void) ...@@ -121,7 +121,8 @@ usage(void)
printf(_(" -V, --version output version information, then exit\n")); printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n")); printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n")); printf(_("\nConnection options:\n"));
printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n")); printf(_(" -s, --status-interval=INTERVAL\n"
" time between status packets sent to server (in seconds)\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n")); printf(_(" -U, --username=NAME connect as specified database user\n"));
...@@ -140,7 +141,8 @@ usage(void) ...@@ -140,7 +141,8 @@ usage(void)
* time to stop. * time to stop.
*/ */
static bool static bool
reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished) reached_end_position(XLogRecPtr segendpos, uint32 timeline,
bool segment_finished)
{ {
if (!has_xlogendptr) if (!has_xlogendptr)
{ {
...@@ -176,7 +178,8 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finishe ...@@ -176,7 +178,8 @@ reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finishe
if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2) if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
{ {
fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"), fprintf(stderr,
_("%s: could not parse transaction log location \"%s\"\n"),
progname, xlogend); progname, xlogend);
exit(1); exit(1);
} }
...@@ -234,7 +237,8 @@ LogStreamerMain(logstreamer_param *param) ...@@ -234,7 +237,8 @@ LogStreamerMain(logstreamer_param *param)
{ {
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir, param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout, true)) reached_end_position, standby_message_timeout,
true))
/* /*
* Any errors will already have been reported in the function process, * Any errors will already have been reported in the function process,
...@@ -266,7 +270,8 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) ...@@ -266,7 +270,8 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
/* Convert the starting position */ /* Convert the starting position */
if (sscanf(startpos, "%X/%X", &hi, &lo) != 2) if (sscanf(startpos, "%X/%X", &hi, &lo) != 2)
{ {
fprintf(stderr, _("%s: invalid format of xlog location: %s\n"), fprintf(stderr,
_("%s: could not parse transaction log location \"%s\"\n"),
progname, startpos); progname, startpos);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -278,7 +283,8 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) ...@@ -278,7 +283,8 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
/* Create our background pipe */ /* Create our background pipe */
if (pipe(bgpipe) < 0) if (pipe(bgpipe) < 0)
{ {
fprintf(stderr, _("%s: could not create pipe for background process: %s\n"), fprintf(stderr,
_("%s: could not create pipe for background process: %s\n"),
progname, strerror(errno)); progname, strerror(errno));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -407,7 +413,8 @@ progress_report(int tablespacenum, const char *filename) ...@@ -407,7 +413,8 @@ progress_report(int tablespacenum, const char *filename)
* translatable strings. And we only test for INT64_FORMAT availability * translatable strings. And we only test for INT64_FORMAT availability
* in snprintf, not fprintf. * in snprintf, not fprintf.
*/ */
snprintf(totaldone_str, sizeof(totaldone_str), INT64_FORMAT, totaldone / 1024); snprintf(totaldone_str, sizeof(totaldone_str), INT64_FORMAT,
totaldone / 1024);
snprintf(totalsize_str, sizeof(totalsize_str), INT64_FORMAT, totalsize); snprintf(totalsize_str, sizeof(totalsize_str), INT64_FORMAT, totalsize);
if (verbose) if (verbose)
...@@ -422,20 +429,23 @@ progress_report(int tablespacenum, const char *filename) ...@@ -422,20 +429,23 @@ progress_report(int tablespacenum, const char *filename)
ngettext("%s/%s kB (100%%), %d/%d tablespace %35s", ngettext("%s/%s kB (100%%), %d/%d tablespace %35s",
"%s/%s kB (100%%), %d/%d tablespaces %35s", "%s/%s kB (100%%), %d/%d tablespaces %35s",
tablespacecount), tablespacecount),
totaldone_str, totalsize_str, tablespacenum, tablespacecount, ""); totaldone_str, totalsize_str,
tablespacenum, tablespacecount, "");
else else
fprintf(stderr, fprintf(stderr,
ngettext("%s/%s kB (%d%%), %d/%d tablespace (%-30.30s)", ngettext("%s/%s kB (%d%%), %d/%d tablespace (%-30.30s)",
"%s/%s kB (%d%%), %d/%d tablespaces (%-30.30s)", "%s/%s kB (%d%%), %d/%d tablespaces (%-30.30s)",
tablespacecount), tablespacecount),
totaldone_str, totalsize_str, percent, tablespacenum, tablespacecount, filename); totaldone_str, totalsize_str, percent,
tablespacenum, tablespacecount, filename);
} }
else else
fprintf(stderr, fprintf(stderr,
ngettext("%s/%s kB (%d%%), %d/%d tablespace", ngettext("%s/%s kB (%d%%), %d/%d tablespace",
"%s/%s kB (%d%%), %d/%d tablespaces", "%s/%s kB (%d%%), %d/%d tablespaces",
tablespacecount), tablespacecount),
totaldone_str, totalsize_str, percent, tablespacenum, tablespacecount); totaldone_str, totalsize_str, percent,
tablespacenum, tablespacecount);
fprintf(stderr, "\r"); fprintf(stderr, "\r");
} }
...@@ -463,7 +473,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -463,7 +473,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#endif #endif
if (PQgetisnull(res, rownum, 0)) if (PQgetisnull(res, rownum, 0))
{
/* /*
* Base tablespaces * Base tablespaces
*/ */
...@@ -473,9 +483,11 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -473,9 +483,11 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
if (compresslevel != 0) if (compresslevel != 0)
{ {
ztarfile = gzdopen(dup(fileno(stdout)), "wb"); ztarfile = gzdopen(dup(fileno(stdout)), "wb");
if (gzsetparams(ztarfile, compresslevel, Z_DEFAULT_STRATEGY) != Z_OK) if (gzsetparams(ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
{ {
fprintf(stderr, _("%s: could not set compression level %d: %s\n"), fprintf(stderr,
_("%s: could not set compression level %d: %s\n"),
progname, compresslevel, get_gz_error(ztarfile)); progname, compresslevel, get_gz_error(ztarfile));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -491,9 +503,11 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -491,9 +503,11 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
{ {
snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir); snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
ztarfile = gzopen(filename, "wb"); ztarfile = gzopen(filename, "wb");
if (gzsetparams(ztarfile, compresslevel, Z_DEFAULT_STRATEGY) != Z_OK) if (gzsetparams(ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
{ {
fprintf(stderr, _("%s: could not set compression level %d: %s\n"), fprintf(stderr,
_("%s: could not set compression level %d: %s\n"),
progname, compresslevel, get_gz_error(ztarfile)); progname, compresslevel, get_gz_error(ztarfile));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -505,6 +519,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -505,6 +519,7 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
tarfile = fopen(filename, "wb"); tarfile = fopen(filename, "wb");
} }
} }
}
else else
{ {
/* /*
...@@ -513,11 +528,14 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -513,11 +528,14 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (compresslevel != 0) if (compresslevel != 0)
{ {
snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir, PQgetvalue(res, rownum, 0)); snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
PQgetvalue(res, rownum, 0));
ztarfile = gzopen(filename, "wb"); ztarfile = gzopen(filename, "wb");
if (gzsetparams(ztarfile, compresslevel, Z_DEFAULT_STRATEGY) != Z_OK) if (gzsetparams(ztarfile, compresslevel,
Z_DEFAULT_STRATEGY) != Z_OK)
{ {
fprintf(stderr, _("%s: could not set compression level %d: %s\n"), fprintf(stderr,
_("%s: could not set compression level %d: %s\n"),
progname, compresslevel, get_gz_error(ztarfile)); progname, compresslevel, get_gz_error(ztarfile));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -525,7 +543,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -525,7 +543,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
else else
#endif #endif
{ {
snprintf(filename, sizeof(filename), "%s/%s.tar", basedir, PQgetvalue(res, rownum, 0)); snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
PQgetvalue(res, rownum, 0));
tarfile = fopen(filename, "wb"); tarfile = fopen(filename, "wb");
} }
} }
...@@ -536,7 +555,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -536,7 +555,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
if (!ztarfile) if (!ztarfile)
{ {
/* Compression is in use */ /* Compression is in use */
fprintf(stderr, _("%s: could not create compressed file \"%s\": %s\n"), fprintf(stderr,
_("%s: could not create compressed file \"%s\": %s\n"),
progname, filename, get_gz_error(ztarfile)); progname, filename, get_gz_error(ztarfile));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -589,9 +609,11 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -589,9 +609,11 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
if (ztarfile != NULL) if (ztarfile != NULL)
{ {
if (gzwrite(ztarfile, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf)) if (gzwrite(ztarfile, zerobuf, sizeof(zerobuf)) !=
sizeof(zerobuf))
{ {
fprintf(stderr, _("%s: could not write to compressed file \"%s\": %s\n"), fprintf(stderr,
_("%s: could not write to compressed file \"%s\": %s\n"),
progname, filename, get_gz_error(ztarfile)); progname, filename, get_gz_error(ztarfile));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -601,7 +623,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -601,7 +623,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
{ {
if (fwrite(zerobuf, sizeof(zerobuf), 1, tarfile) != 1) if (fwrite(zerobuf, sizeof(zerobuf), 1, tarfile) != 1)
{ {
fprintf(stderr, _("%s: could not write to file \"%s\": %s\n"), fprintf(stderr,
_("%s: could not write to file \"%s\": %s\n"),
progname, filename, strerror(errno)); progname, filename, strerror(errno));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -612,7 +635,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -612,7 +635,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
{ {
if (gzclose(ztarfile) != 0) if (gzclose(ztarfile) != 0)
{ {
fprintf(stderr, _("%s: could not close compressed file \"%s\": %s\n"), fprintf(stderr,
_("%s: could not close compressed file \"%s\": %s\n"),
progname, filename, get_gz_error(ztarfile)); progname, filename, get_gz_error(ztarfile));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -624,7 +648,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -624,7 +648,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
{ {
if (fclose(tarfile) != 0) if (fclose(tarfile) != 0)
{ {
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), fprintf(stderr,
_("%s: could not close file \"%s\": %s\n"),
progname, filename, strerror(errno)); progname, filename, strerror(errno));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -645,7 +670,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -645,7 +670,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
{ {
if (gzwrite(ztarfile, copybuf, r) != r) if (gzwrite(ztarfile, copybuf, r) != r)
{ {
fprintf(stderr, _("%s: could not write to compressed file \"%s\": %s\n"), fprintf(stderr,
_("%s: could not write to compressed file \"%s\": %s\n"),
progname, filename, get_gz_error(ztarfile)); progname, filename, get_gz_error(ztarfile));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -773,7 +799,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -773,7 +799,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
/* /*
* First part of header is zero terminated filename * First part of header is zero terminated filename
*/ */
snprintf(filename, sizeof(filename), "%s/%s", current_path, copybuf); snprintf(filename, sizeof(filename), "%s/%s", current_path,
copybuf);
if (filename[strlen(filename) - 1] == '/') if (filename[strlen(filename) - 1] == '/')
{ {
/* /*
...@@ -802,7 +829,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -802,7 +829,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
} }
#ifndef WIN32 #ifndef WIN32
if (chmod(filename, (mode_t) filemode)) if (chmod(filename, (mode_t) filemode))
fprintf(stderr, _("%s: could not set permissions on directory \"%s\": %s\n"), fprintf(stderr,
_("%s: could not set permissions on directory \"%s\": %s\n"),
progname, filename, strerror(errno)); progname, filename, strerror(errno));
#endif #endif
} }
...@@ -822,7 +850,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -822,7 +850,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
} }
else else
{ {
fprintf(stderr, _("%s: unrecognized link indicator \"%c\"\n"), fprintf(stderr,
_("%s: unrecognized link indicator \"%c\"\n"),
progname, copybuf[156]); progname, copybuf[156]);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -900,7 +929,9 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -900,7 +929,9 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
if (file != NULL) if (file != NULL)
{ {
fprintf(stderr, _("%s: COPY stream ended before last file was finished\n"), progname); fprintf(stderr,
_("%s: COPY stream ended before last file was finished\n"),
progname);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -935,14 +966,15 @@ BaseBackup(void) ...@@ -935,14 +966,15 @@ BaseBackup(void)
res = PQexec(conn, "IDENTIFY_SYSTEM"); res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, _("%s: could not identify system: %s"), fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, PQerrorMessage(conn)); progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (PQntuples(res) != 1 || PQnfields(res) != 3) if (PQntuples(res) != 1 || PQnfields(res) != 3)
{ {
fprintf(stderr, _("%s: could not identify system, got %d rows and %d fields\n"), fprintf(stderr,
progname, PQntuples(res), PQnfields(res)); _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
sysidentifier = strdup(PQgetvalue(res, 0, 0)); sysidentifier = strdup(PQgetvalue(res, 0, 0));
...@@ -953,7 +985,8 @@ BaseBackup(void) ...@@ -953,7 +985,8 @@ BaseBackup(void)
* Start the actual backup * Start the actual backup
*/ */
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i); PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s", snprintf(current_path, sizeof(current_path),
"BASE_BACKUP LABEL '%s' %s %s %s %s",
escaped_label, escaped_label,
showprogress ? "PROGRESS" : "", showprogress ? "PROGRESS" : "",
includewal && !streamwal ? "WAL" : "", includewal && !streamwal ? "WAL" : "",
...@@ -962,8 +995,8 @@ BaseBackup(void) ...@@ -962,8 +995,8 @@ BaseBackup(void)
if (PQsendQuery(conn, current_path) == 0) if (PQsendQuery(conn, current_path) == 0)
{ {
fprintf(stderr, _("%s: could not send base backup command: %s"), fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, PQerrorMessage(conn)); progname, "BASE_BACKUP", PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -985,7 +1018,7 @@ BaseBackup(void) ...@@ -985,7 +1018,7 @@ BaseBackup(void)
} }
strcpy(xlogstart, PQgetvalue(res, 0, 0)); strcpy(xlogstart, PQgetvalue(res, 0, 0));
if (verbose && includewal) if (verbose && includewal)
fprintf(stderr, "xlog start point: %s\n", xlogstart); fprintf(stderr, "transaction log start point: %s\n", xlogstart);
PQclear(res); PQclear(res);
MemSet(xlogend, 0, sizeof(xlogend)); MemSet(xlogend, 0, sizeof(xlogend));
...@@ -1029,7 +1062,8 @@ BaseBackup(void) ...@@ -1029,7 +1062,8 @@ BaseBackup(void)
*/ */
if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1) if (format == 't' && strcmp(basedir, "-") == 0 && PQntuples(res) > 1)
{ {
fprintf(stderr, _("%s: can only write single tablespace to stdout, database has %d\n"), fprintf(stderr,
_("%s: can only write single tablespace to stdout, database has %d\n"),
progname, PQntuples(res)); progname, PQntuples(res));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -1070,19 +1104,21 @@ BaseBackup(void) ...@@ -1070,19 +1104,21 @@ BaseBackup(void)
res = PQgetResult(conn); res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, _("%s: could not get WAL end position from server: %s"), fprintf(stderr,
_("%s: could not get transaction log end position from server: %s"),
progname, PQerrorMessage(conn)); progname, PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (PQntuples(res) != 1) if (PQntuples(res) != 1)
{ {
fprintf(stderr, _("%s: no WAL end position returned from server\n"), fprintf(stderr,
_("%s: no transaction log end position returned from server\n"),
progname); progname);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
strcpy(xlogend, PQgetvalue(res, 0, 0)); strcpy(xlogend, PQgetvalue(res, 0, 0));
if (verbose && includewal) if (verbose && includewal)
fprintf(stderr, "xlog end point: %s\n", xlogend); fprintf(stderr, "transaction log end point: %s\n", xlogend);
PQclear(res); PQclear(res);
res = PQgetResult(conn); res = PQgetResult(conn);
...@@ -1105,12 +1141,14 @@ BaseBackup(void) ...@@ -1105,12 +1141,14 @@ BaseBackup(void)
#endif #endif
if (verbose) if (verbose)
fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname); fprintf(stderr,
_("%s: waiting for background process to finish streaming...\n"), progname);
#ifndef WIN32 #ifndef WIN32
if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend)) if (write(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
{ {
fprintf(stderr, _("%s: could not send command to background pipe: %s\n"), fprintf(stderr,
_("%s: could not send command to background pipe: %s\n"),
progname, strerror(errno)); progname, strerror(errno));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -1151,7 +1189,8 @@ BaseBackup(void) ...@@ -1151,7 +1189,8 @@ BaseBackup(void)
*/ */
if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2) if (sscanf(xlogend, "%X/%X", &hi, &lo) != 2)
{ {
fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"), fprintf(stderr,
_("%s: could not parse transaction log location \"%s\"\n"),
progname, xlogend); progname, xlogend);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -1159,7 +1198,8 @@ BaseBackup(void) ...@@ -1159,7 +1198,8 @@ BaseBackup(void)
InterlockedIncrement(&has_xlogendptr); InterlockedIncrement(&has_xlogendptr);
/* First wait for the thread to exit */ /* First wait for the thread to exit */
if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0) if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) !=
WAIT_OBJECT_0)
{ {
_dosmaperr(GetLastError()); _dosmaperr(GetLastError());
fprintf(stderr, _("%s: could not wait for child thread: %s\n"), fprintf(stderr, _("%s: could not wait for child thread: %s\n"),
...@@ -1213,7 +1253,7 @@ main(int argc, char **argv) ...@@ -1213,7 +1253,7 @@ main(int argc, char **argv)
{"username", required_argument, NULL, 'U'}, {"username", required_argument, NULL, 'U'},
{"no-password", no_argument, NULL, 'w'}, {"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'}, {"password", no_argument, NULL, 'W'},
{"statusint", required_argument, NULL, 's'}, {"status-interval", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'}, {"verbose", no_argument, NULL, 'v'},
{"progress", no_argument, NULL, 'P'}, {"progress", no_argument, NULL, 'P'},
{NULL, 0, NULL, 0} {NULL, 0, NULL, 0}
...@@ -1255,7 +1295,8 @@ main(int argc, char **argv) ...@@ -1255,7 +1295,8 @@ main(int argc, char **argv)
format = 't'; format = 't';
else else
{ {
fprintf(stderr, _("%s: invalid output format \"%s\", must be \"plain\" or \"tar\"\n"), fprintf(stderr,
_("%s: invalid output format \"%s\", must be \"plain\" or \"tar\"\n"),
progname, optarg); progname, optarg);
exit(1); exit(1);
} }
...@@ -1263,7 +1304,8 @@ main(int argc, char **argv) ...@@ -1263,7 +1304,8 @@ main(int argc, char **argv)
case 'x': case 'x':
if (includewal) if (includewal)
{ {
fprintf(stderr, _("%s: cannot specify both --xlog and --xlog-method\n"), fprintf(stderr,
_("%s: cannot specify both --xlog and --xlog-method\n"),
progname); progname);
exit(1); exit(1);
} }
...@@ -1274,7 +1316,8 @@ main(int argc, char **argv) ...@@ -1274,7 +1316,8 @@ main(int argc, char **argv)
case 'X': case 'X':
if (includewal) if (includewal)
{ {
fprintf(stderr, _("%s: cannot specify both --xlog and --xlog-method\n"), fprintf(stderr,
_("%s: cannot specify both --xlog and --xlog-method\n"),
progname); progname);
exit(1); exit(1);
} }
...@@ -1288,7 +1331,8 @@ main(int argc, char **argv) ...@@ -1288,7 +1331,8 @@ main(int argc, char **argv)
streamwal = true; streamwal = true;
else else
{ {
fprintf(stderr, _("%s: invalid xlog-method option \"%s\", must be empty, \"fetch\", or \"stream\"\n"), fprintf(stderr,
_("%s: invalid xlog-method option \"%s\", must be empty, \"fetch\", or \"stream\"\n"),
progname, optarg); progname, optarg);
exit(1); exit(1);
} }
...@@ -1430,7 +1474,6 @@ main(int argc, char **argv) ...@@ -1430,7 +1474,6 @@ main(int argc, char **argv)
if (format == 'p' || strcmp(basedir, "-") != 0) if (format == 'p' || strcmp(basedir, "-") != 0)
verify_dir_is_empty_or_create(basedir); verify_dir_is_empty_or_create(basedir);
BaseBackup(); BaseBackup();
return 0; return 0;
......
...@@ -45,9 +45,11 @@ volatile bool time_to_abort = false; ...@@ -45,9 +45,11 @@ volatile bool time_to_abort = false;
static void usage(void); static void usage(void);
static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline); static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos,
uint32 currenttimeline);
static void StreamLog(); static void StreamLog();
static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished); static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
bool segment_finished);
static void static void
usage(void) usage(void)
...@@ -58,12 +60,13 @@ usage(void) ...@@ -58,12 +60,13 @@ usage(void)
printf(_(" %s [OPTION]...\n"), progname); printf(_(" %s [OPTION]...\n"), progname);
printf(_("\nOptions:\n")); printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --noloop do not loop on connection lost\n")); printf(_(" -n, --no-loop do not loop on connection lost\n"));
printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n")); printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n")); printf(_(" -?, --help show this help, then exit\n"));
printf(_("\nConnection options:\n")); printf(_("\nConnection options:\n"));
printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n")); printf(_(" -s, --status-interval=INTERVAL\n"
" time between status packets sent to server (in seconds)\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -p, --port=PORT database server port number\n"));
printf(_(" -U, --username=NAME connect as specified database user\n")); printf(_(" -U, --username=NAME connect as specified database user\n"));
...@@ -123,7 +126,8 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) ...@@ -123,7 +126,8 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
seg; seg;
XLogSegNo segno; XLogSegNo segno;
if (strcmp(dirent->d_name, ".") == 0 || strcmp(dirent->d_name, "..") == 0) if (strcmp(dirent->d_name, ".") == 0 ||
strcmp(dirent->d_name, "..") == 0)
continue; continue;
/* xlog files are always 24 characters */ /* xlog files are always 24 characters */
...@@ -149,7 +153,8 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) ...@@ -149,7 +153,8 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
*/ */
if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3) if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
{ {
fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"), fprintf(stderr,
_("%s: could not parse transaction log file name \"%s\"\n"),
progname, dirent->d_name); progname, dirent->d_name);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -179,7 +184,8 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) ...@@ -179,7 +184,8 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
} }
else else
{ {
fprintf(stderr, _("%s: segment file '%s' is incorrect size %d, skipping\n"), fprintf(stderr,
_("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
progname, dirent->d_name, (int) statbuf.st_size); progname, dirent->d_name, (int) statbuf.st_size);
continue; continue;
} }
...@@ -232,20 +238,22 @@ StreamLog(void) ...@@ -232,20 +238,22 @@ StreamLog(void)
res = PQexec(conn, "IDENTIFY_SYSTEM"); res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, _("%s: could not identify system: %s\n"), fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, PQerrorMessage(conn)); progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (PQntuples(res) != 1 || PQnfields(res) != 3) if (PQntuples(res) != 1 || PQnfields(res) != 3)
{ {
fprintf(stderr, _("%s: could not identify system, got %d rows and %d fields\n"), fprintf(stderr,
progname, PQntuples(res), PQnfields(res)); _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3);
disconnect_and_exit(1); disconnect_and_exit(1);
} }
timeline = atoi(PQgetvalue(res, 0, 1)); timeline = atoi(PQgetvalue(res, 0, 1));
if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2) if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
{ {
fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"), fprintf(stderr,
_("%s: could not parse transaction log location \"%s\"\n"),
progname, PQgetvalue(res, 0, 2)); progname, PQgetvalue(res, 0, 2));
disconnect_and_exit(1); disconnect_and_exit(1);
} }
...@@ -266,14 +274,13 @@ StreamLog(void) ...@@ -266,14 +274,13 @@ StreamLog(void)
* Start the replication * Start the replication
*/ */
if (verbose) if (verbose)
fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), fprintf(stderr,
progname, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
(uint32) (startpos >> 32), (uint32) startpos, progname, (uint32) (startpos >> 32), (uint32) startpos,
timeline); timeline);
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir, ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
stop_streaming, stop_streaming, standby_message_timeout, false);
standby_message_timeout, false);
PQfinish(conn); PQfinish(conn);
} }
...@@ -301,15 +308,14 @@ main(int argc, char **argv) ...@@ -301,15 +308,14 @@ main(int argc, char **argv)
{"host", required_argument, NULL, 'h'}, {"host", required_argument, NULL, 'h'},
{"port", required_argument, NULL, 'p'}, {"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'}, {"username", required_argument, NULL, 'U'},
{"noloop", no_argument, NULL, 'n'}, {"no-loop", no_argument, NULL, 'n'},
{"no-password", no_argument, NULL, 'w'}, {"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'}, {"password", no_argument, NULL, 'W'},
{"statusint", required_argument, NULL, 's'}, {"status-interval", required_argument, NULL, 's'},
{"verbose", no_argument, NULL, 'v'}, {"verbose", no_argument, NULL, 'v'},
{NULL, 0, NULL, 0} {NULL, 0, NULL, 0}
}; };
int c; int c;
int option_index; int option_index;
progname = get_progname(argv[0]); progname = get_progname(argv[0]);
...@@ -322,8 +328,8 @@ main(int argc, char **argv) ...@@ -322,8 +328,8 @@ main(int argc, char **argv)
usage(); usage();
exit(0); exit(0);
} }
else if (strcmp(argv[1], "-V") == 0 else if (strcmp(argv[1], "-V") == 0 ||
|| strcmp(argv[1], "--version") == 0) strcmp(argv[1], "--version") == 0)
{ {
puts("pg_receivexlog (PostgreSQL) " PG_VERSION); puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
exit(0); exit(0);
...@@ -417,12 +423,13 @@ main(int argc, char **argv) ...@@ -417,12 +423,13 @@ main(int argc, char **argv)
{ {
StreamLog(); StreamLog();
if (time_to_abort) if (time_to_abort)
{
/* /*
* We've been Ctrl-C'ed. That's not an error, so exit without an * We've been Ctrl-C'ed. That's not an error, so exit without an
* errorcode. * errorcode.
*/ */
exit(0); exit(0);
}
else if (noloop) else if (noloop)
{ {
fprintf(stderr, _("%s: disconnected.\n"), progname); fprintf(stderr, _("%s: disconnected.\n"), progname);
...@@ -430,7 +437,8 @@ main(int argc, char **argv) ...@@ -430,7 +437,8 @@ main(int argc, char **argv)
} }
else else
{ {
fprintf(stderr, _("%s: disconnected. Waiting %d seconds to try again\n"), fprintf(stderr,
_("%s: disconnected. Waiting %d seconds to try again\n"),
progname, RECONNECT_SLEEP_TIME); progname, RECONNECT_SLEEP_TIME);
pg_usleep(RECONNECT_SLEEP_TIME * 1000000); pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
} }
......
...@@ -50,7 +50,8 @@ static int walfile = -1; ...@@ -50,7 +50,8 @@ static int walfile = -1;
* The file will be padded to 16Mb with zeroes. * The file will be padded to 16Mb with zeroes.
*/ */
static int static int
open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf) open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
char *namebuf)
{ {
int f; int f;
char fn[MAXPGPATH]; char fn[MAXPGPATH];
...@@ -66,7 +67,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu ...@@ -66,7 +67,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1) if (f == -1)
{ {
fprintf(stderr, _("%s: could not open WAL segment %s: %s\n"), fprintf(stderr,
_("%s: could not open transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno)); progname, fn, strerror(errno));
return -1; return -1;
} }
...@@ -77,7 +79,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu ...@@ -77,7 +79,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
*/ */
if (fstat(f, &statbuf) != 0) if (fstat(f, &statbuf) != 0)
{ {
fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"), fprintf(stderr,
_("%s: could not stat transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno)); progname, fn, strerror(errno));
close(f); close(f);
return -1; return -1;
...@@ -86,7 +89,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu ...@@ -86,7 +89,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
return f; /* File is open and ready to use */ return f; /* File is open and ready to use */
if (statbuf.st_size != 0) if (statbuf.st_size != 0)
{ {
fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"), fprintf(stderr,
_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
progname, fn, (int) statbuf.st_size, XLogSegSize); progname, fn, (int) statbuf.st_size, XLogSegSize);
close(f); close(f);
return -1; return -1;
...@@ -98,7 +102,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu ...@@ -98,7 +102,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
{ {
if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{ {
fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"), fprintf(stderr,
_("%s: could not pad transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno)); progname, fn, strerror(errno));
free(zerobuf); free(zerobuf);
close(f); close(f);
...@@ -110,7 +115,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu ...@@ -110,7 +115,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
if (lseek(f, SEEK_SET, 0) != 0) if (lseek(f, SEEK_SET, 0) != 0)
{ {
fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"), fprintf(stderr,
_("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno)); progname, fn, strerror(errno));
close(f); close(f);
return -1; return -1;
...@@ -119,7 +125,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu ...@@ -119,7 +125,8 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
} }
/* /*
* Close the current WAL file, and rename it to the correct filename if it's complete. * Close the current WAL file, and rename it to the correct filename if it's
* complete.
* *
* If segment_complete is true, rename the current WAL file even if we've not * If segment_complete is true, rename the current WAL file even if we've not
* completed writing the whole segment. * completed writing the whole segment.
...@@ -131,21 +138,22 @@ close_walfile(char *basedir, char *walname, bool segment_complete) ...@@ -131,21 +138,22 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
if (currpos == -1) if (currpos == -1)
{ {
fprintf(stderr, _("%s: could not get current position in file %s: %s\n"), fprintf(stderr,
_("%s: could not determine seek position in file \"%s\": %s\n"),
progname, walname, strerror(errno)); progname, walname, strerror(errno));
return false; return false;
} }
if (fsync(walfile) != 0) if (fsync(walfile) != 0)
{ {
fprintf(stderr, _("%s: could not fsync file %s: %s\n"), fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
progname, walname, strerror(errno)); progname, walname, strerror(errno));
return false; return false;
} }
if (close(walfile) != 0) if (close(walfile) != 0)
{ {
fprintf(stderr, _("%s: could not close file %s: %s\n"), fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, walname, strerror(errno)); progname, walname, strerror(errno));
walfile = -1; walfile = -1;
return false; return false;
...@@ -165,13 +173,14 @@ close_walfile(char *basedir, char *walname, bool segment_complete) ...@@ -165,13 +173,14 @@ close_walfile(char *basedir, char *walname, bool segment_complete)
snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname); snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
if (rename(oldfn, newfn) != 0) if (rename(oldfn, newfn) != 0)
{ {
fprintf(stderr, _("%s: could not rename file %s: %s\n"), fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
progname, walname, strerror(errno)); progname, walname, strerror(errno));
return false; return false;
} }
} }
else else
fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"), fprintf(stderr,
_("%s: not renaming \"%s\", segment is not complete.\n"),
progname, walname); progname, walname);
return true; return true;
...@@ -271,7 +280,10 @@ localTimestampDifferenceExceeds(TimestampTz start_time, ...@@ -271,7 +280,10 @@ localTimestampDifferenceExceeds(TimestampTz start_time,
* Note: The log position *must* be at a log segment start! * Note: The log position *must* be at a log segment start!
*/ */
bool bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, bool rename_partial) ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, bool rename_partial)
{ {
char query[128]; char query[128];
char current_walfile_name[MAXPGPATH]; char current_walfile_name[MAXPGPATH];
...@@ -286,27 +298,33 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -286,27 +298,33 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
res = PQexec(conn, "IDENTIFY_SYSTEM"); res = PQexec(conn, "IDENTIFY_SYSTEM");
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
fprintf(stderr, _("%s: could not identify system: %s\n"), fprintf(stderr,
progname, PQerrorMessage(conn)); _("%s: could not send replication command \"%s\": %s"),
progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
PQclear(res); PQclear(res);
return false; return false;
} }
if (PQnfields(res) != 3 || PQntuples(res) != 1) if (PQnfields(res) != 3 || PQntuples(res) != 1)
{ {
fprintf(stderr, _("%s: could not identify system, got %d rows and %d fields\n"), fprintf(stderr,
progname, PQntuples(res), PQnfields(res)); _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d fields\n"),
progname, PQntuples(res), PQnfields(res), 1, 3);
PQclear(res); PQclear(res);
return false; return false;
} }
if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0) if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{ {
fprintf(stderr, _("%s: system identifier does not match between base backup and streaming connection\n"), progname); fprintf(stderr,
_("%s: system identifier does not match between base backup and streaming connection\n"),
progname);
PQclear(res); PQclear(res);
return false; return false;
} }
if (timeline != atoi(PQgetvalue(res, 0, 1))) if (timeline != atoi(PQgetvalue(res, 0, 1)))
{ {
fprintf(stderr, _("%s: timeline does not match between base backup and streaming connection\n"), progname); fprintf(stderr,
_("%s: timeline does not match between base backup and streaming connection\n"),
progname);
PQclear(res); PQclear(res);
return false; return false;
} }
...@@ -319,8 +337,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -319,8 +337,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
res = PQexec(conn, query); res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH) if (PQresultStatus(res) != PGRES_COPY_BOTH)
{ {
fprintf(stderr, _("%s: could not start replication: %s\n"), fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
progname, PQresultErrorMessage(res)); progname, "START_REPLICATION", PQresultErrorMessage(res));
PQclear(res); PQclear(res);
return false; return false;
} }
...@@ -348,7 +366,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -348,7 +366,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
*/ */
if (stream_stop && stream_stop(blockpos, timeline, false)) if (stream_stop && stream_stop(blockpos, timeline, false))
{ {
if (walfile != -1 && !close_walfile(basedir, current_walfile_name, rename_partial)) if (walfile != -1 && !close_walfile(basedir, current_walfile_name,
rename_partial))
/* Potential error message is written by close_walfile */ /* Potential error message is written by close_walfile */
goto error; goto error;
return true; return true;
...@@ -364,8 +383,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -364,8 +383,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
{ {
/* Time to send feedback! */ /* Time to send feedback! */
char replybuf[sizeof(StandbyReplyMessage) + 1]; char replybuf[sizeof(StandbyReplyMessage) + 1];
StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1); StandbyReplyMessage *replymsg;
replymsg = (StandbyReplyMessage *) (replybuf + 1);
replymsg->write = blockpos; replymsg->write = blockpos;
replymsg->flush = InvalidXLogRecPtr; replymsg->flush = InvalidXLogRecPtr;
replymsg->apply = InvalidXLogRecPtr; replymsg->apply = InvalidXLogRecPtr;
...@@ -433,7 +453,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -433,7 +453,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
/* Else there is actually data on the socket */ /* Else there is actually data on the socket */
if (PQconsumeInput(conn) == 0) if (PQconsumeInput(conn) == 0)
{ {
fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"), fprintf(stderr,
_("%s: could not receive data from WAL stream: %s"),
progname, PQerrorMessage(conn)); progname, PQerrorMessage(conn));
goto error; goto error;
} }
...@@ -444,7 +465,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -444,7 +465,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
break; break;
if (r == -2) if (r == -2)
{ {
fprintf(stderr, _("%s: could not read copy data: %s\n"), fprintf(stderr, _("%s: could not read COPY data: %s"),
progname, PQerrorMessage(conn)); progname, PQerrorMessage(conn));
goto error; goto error;
} }
...@@ -456,7 +477,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -456,7 +477,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
*/ */
if (r != STREAMING_KEEPALIVE_SIZE) if (r != STREAMING_KEEPALIVE_SIZE)
{ {
fprintf(stderr, _("%s: keepalive message is incorrect size: %d\n"), fprintf(stderr,
_("%s: keepalive message has incorrect size %d\n"),
progname, r); progname, r);
goto error; goto error;
} }
...@@ -488,7 +510,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -488,7 +510,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
/* No file open yet */ /* No file open yet */
if (xlogoff != 0) if (xlogoff != 0)
{ {
fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"), fprintf(stderr,
_("%s: received transaction log record for offset %u with no file open\n"),
progname, xlogoff); progname, xlogoff);
goto error; goto error;
} }
...@@ -499,7 +522,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -499,7 +522,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
/* XXX: store seek value don't reseek all the time */ /* XXX: store seek value don't reseek all the time */
if (lseek(walfile, 0, SEEK_CUR) != xlogoff) if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{ {
fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"), fprintf(stderr,
_("%s: got WAL data offset %08x, expected %08x\n"),
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
goto error; goto error;
} }
...@@ -534,10 +558,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -534,10 +558,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
copybuf + STREAMING_HEADER_SIZE + bytes_written, copybuf + STREAMING_HEADER_SIZE + bytes_written,
bytes_to_write) != bytes_to_write) bytes_to_write) != bytes_to_write)
{ {
fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"), fprintf(stderr,
progname, _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
bytes_to_write, progname, bytes_to_write, current_walfile_name,
current_walfile_name,
strerror(errno)); strerror(errno));
goto error; goto error;
} }
...@@ -581,7 +604,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -581,7 +604,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
res = PQgetResult(conn); res = PQgetResult(conn);
if (PQresultStatus(res) != PGRES_COMMAND_OK) if (PQresultStatus(res) != PGRES_COMMAND_OK)
{ {
fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"), fprintf(stderr,
_("%s: unexpected termination of replication stream: %s"),
progname, PQresultErrorMessage(res)); progname, PQresultErrorMessage(res));
goto error; goto error;
} }
......
...@@ -184,7 +184,8 @@ GetConnection(void) ...@@ -184,7 +184,8 @@ GetConnection(void)
tmpparam = PQparameterStatus(tmpconn, "integer_datetimes"); tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
if (!tmpparam) if (!tmpparam)
{ {
fprintf(stderr, _("%s: could not determine server setting for integer_datetimes\n"), fprintf(stderr,
_("%s: could not determine server setting for integer_datetimes\n"),
progname); progname);
PQfinish(tmpconn); PQfinish(tmpconn);
exit(1); exit(1);
...@@ -196,7 +197,8 @@ GetConnection(void) ...@@ -196,7 +197,8 @@ GetConnection(void)
if (strcmp(tmpparam, "off") != 0) if (strcmp(tmpparam, "off") != 0)
#endif #endif
{ {
fprintf(stderr, _("%s: integer_datetimes compile flag does not match server\n"), fprintf(stderr,
_("%s: integer_datetimes compile flag does not match server\n"),
progname); progname);
PQfinish(tmpconn); PQfinish(tmpconn);
exit(1); exit(1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment