Commit 2c0a4858 authored by Andres Freund's avatar Andres Freund

Prevent WAL files created by pg_basebackup -x/X from being archived again.

WAL (and timeline history) files created by pg_basebackup did not
maintain the new base backup's archive status. That's currently not a
problem if the new node is used as a standby - but if that node is
promoted all still existing files can get archived again.  With a high
wal_keep_segment settings that can happen a significant time later -
which is quite confusing.

Change both the backend (for the -x/-X fetch case) and pg_basebackup
(for -X stream) itself to always mark WAL/timeline files included in
the base backup as .done. That's in line with walreceiver.c doing so.

The verbosity of the pg_basebackup changes show pretty clearly that it
needs some refactoring, but that'd result in not be backpatchable
changes.

Backpatch to 9.1 where pg_basebackup was introduced.

Discussion: 20141205002854.GE21964@awork2.anarazel.de
parent ccb161b6
...@@ -471,6 +471,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) ...@@ -471,6 +471,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
errmsg("unexpected WAL file size \"%s\"", walFiles[i]))); errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
} }
/* send the WAL file itself */
_tarWriteHeader(pathbuf, NULL, &statbuf); _tarWriteHeader(pathbuf, NULL, &statbuf);
while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0) while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
...@@ -497,7 +498,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) ...@@ -497,7 +498,17 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
} }
/* XLogSegSize is a multiple of 512, so no need for padding */ /* XLogSegSize is a multiple of 512, so no need for padding */
FreeFile(fp); FreeFile(fp);
/*
* Mark file as archived, otherwise files can get archived again
* after promotion of a new node. This is in line with
* walreceiver.c always doing a XLogArchiveForceDone() after a
* complete segment.
*/
StatusFilePath(pathbuf, walFiles[i], ".done");
sendFileWithContent(pathbuf, "");
} }
/* /*
...@@ -521,6 +532,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir) ...@@ -521,6 +532,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
errmsg("could not stat file \"%s\": %m", pathbuf))); errmsg("could not stat file \"%s\": %m", pathbuf)));
sendFile(pathbuf, pathbuf, &statbuf, false); sendFile(pathbuf, pathbuf, &statbuf, false);
/* unconditionally mark file as archived */
StatusFilePath(pathbuf, fname, ".done");
sendFileWithContent(pathbuf, "");
} }
/* Send CopyDone message for the last tar file */ /* Send CopyDone message for the last tar file */
...@@ -1021,6 +1036,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces) ...@@ -1021,6 +1036,15 @@ sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf); _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
} }
size += 512; /* Size of the header just added */ size += 512; /* Size of the header just added */
/*
* Also send archive_status directory (by hackishly reusing
* statbuf from above ...).
*/
if (!sizeonly)
_tarWriteHeader("./pg_xlog/archive_status", NULL, &statbuf);
size += 512; /* Size of the header just added */
continue; /* don't recurse into pg_xlog */ continue; /* don't recurse into pg_xlog */
} }
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <zlib.h> #include <zlib.h>
#endif #endif
#include "common/string.h"
#include "getopt_long.h" #include "getopt_long.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "pqexpbuffer.h" #include "pqexpbuffer.h"
...@@ -370,7 +371,7 @@ LogStreamerMain(logstreamer_param *param) ...@@ -370,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir, param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout, reached_end_position, standby_message_timeout,
NULL, false)) NULL, false, true))
/* /*
* Any errors will already have been reported in the function process, * Any errors will already have been reported in the function process,
...@@ -394,6 +395,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) ...@@ -394,6 +395,7 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
logstreamer_param *param; logstreamer_param *param;
uint32 hi, uint32 hi,
lo; lo;
char statusdir[MAXPGPATH];
param = pg_malloc0(sizeof(logstreamer_param)); param = pg_malloc0(sizeof(logstreamer_param));
param->timeline = timeline; param->timeline = timeline;
...@@ -428,13 +430,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) ...@@ -428,13 +430,23 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
/* Error message already written in GetConnection() */ /* Error message already written in GetConnection() */
exit(1); exit(1);
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
/* /*
* Always in plain format, so we can write to basedir/pg_xlog. But the * Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
* directory entry in the tar file may arrive later, so make sure it's * basedir/pg_xlog as the directory entry in the tar file may arrive
* created before we start. * later.
*/ */
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir); snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
verify_dir_is_empty_or_create(param->xlogdir); basedir);
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
{
fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
progname, statusdir, strerror(errno));
disconnect_and_exit(1);
}
/* /*
* Start a child process and tell it to start streaming. On Unix, this is * Start a child process and tell it to start streaming. On Unix, this is
...@@ -1236,11 +1248,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) ...@@ -1236,11 +1248,12 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
* by the wal receiver process. Also, when transaction * by the wal receiver process. Also, when transaction
* log directory location was specified, pg_xlog has * log directory location was specified, pg_xlog has
* already been created as a symbolic link before * already been created as a symbolic link before
* starting the actual backup. So just ignore failure * starting the actual backup. So just ignore creation
* on them. * failures on related directories.
*/ */
if ((!streamwal && (strcmp(xlog_dir, "") == 0)) if (!((pg_str_endswith(filename, "/pg_xlog") ||
|| strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0) pg_str_endswith(filename, "/archive_status")) &&
errno == EEXIST))
{ {
fprintf(stderr, fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"), _("%s: could not create directory \"%s\": %s\n"),
......
...@@ -342,7 +342,7 @@ StreamLog(void) ...@@ -342,7 +342,7 @@ StreamLog(void)
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
stop_streaming, standby_message_timeout, ".partial", stop_streaming, standby_message_timeout, ".partial",
synchronous); synchronous, false);
PQfinish(conn); PQfinish(conn);
conn = NULL; conn = NULL;
......
...@@ -37,7 +37,7 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, ...@@ -37,7 +37,7 @@ static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir, uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout, stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos, char *partial_suffix, XLogRecPtr *stoppos,
bool synchronous); bool synchronous, bool mark_done);
static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
...@@ -45,20 +45,50 @@ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, ...@@ -45,20 +45,50 @@ static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline, XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop, char *basedir, stream_stop_callback stream_stop,
char *partial_suffix); char *partial_suffix, bool mark_done);
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix, XLogRecPtr blockpos, char *basedir, char *partial_suffix,
XLogRecPtr *stoppos); XLogRecPtr *stoppos, bool mark_done);
static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
uint32 timeline, char *basedir, uint32 timeline, char *basedir,
stream_stop_callback stream_stop, stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos); char *partial_suffix, XLogRecPtr *stoppos,
bool mark_done);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status); int64 last_status);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline); uint32 *timeline);
static bool
mark_file_as_archived(const char *basedir, const char *fname)
{
int fd;
static char tmppath[MAXPGPATH];
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
basedir, fname);
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
{
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
progname, tmppath, strerror(errno));
return false;
}
if (fsync(fd) != 0)
{
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
progname, tmppath, strerror(errno));
return false;
}
close(fd);
return true;
}
/* /*
* Open a new WAL file in the specified directory. * Open a new WAL file in the specified directory.
* *
...@@ -152,7 +182,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, ...@@ -152,7 +182,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true. * and returns false, otherwise returns true.
*/ */
static bool static bool
close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
{ {
off_t currpos; off_t currpos;
...@@ -206,6 +236,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos) ...@@ -206,6 +236,19 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
_("%s: not renaming \"%s%s\", segment is not complete\n"), _("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix); progname, current_walfile_name, partial_suffix);
/*
* Mark file as archived if requested by the caller - pg_basebackup needs
* to do so as files can otherwise get archived again after promotion of a
* new node. This is in line with walreceiver.c always doing a
* XLogArchiveForceDone() after a complete segment.
*/
if (currpos == XLOG_SEG_SIZE && mark_done)
{
/* writes error message if failed */
if (!mark_file_as_archived(basedir, current_walfile_name))
return false;
}
lastFlushPosition = pos; lastFlushPosition = pos;
return true; return true;
} }
...@@ -248,7 +291,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) ...@@ -248,7 +291,8 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
} }
static bool static bool
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content) writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
char *content, bool mark_done)
{ {
int size = strlen(content); int size = strlen(content);
char path[MAXPGPATH]; char path[MAXPGPATH];
...@@ -327,6 +371,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co ...@@ -327,6 +371,14 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *co
return false; return false;
} }
/* Maintain archive_status, check close_walfile() for details. */
if (mark_done)
{
/* writes error message if failed */
if (!mark_file_as_archived(basedir, histfname))
return false;
}
return true; return true;
} }
...@@ -447,7 +499,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -447,7 +499,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir, char *sysidentifier, char *basedir,
stream_stop_callback stream_stop, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix, int standby_message_timeout, char *partial_suffix,
bool synchronous) bool synchronous, bool mark_done)
{ {
char query[128]; char query[128];
char slotcmd[128]; char slotcmd[128];
...@@ -562,7 +614,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -562,7 +614,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Write the history file to disk */ /* Write the history file to disk */
writeTimeLineHistoryFile(basedir, timeline, writeTimeLineHistoryFile(basedir, timeline,
PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 0),
PQgetvalue(res, 0, 1)); PQgetvalue(res, 0, 1),
mark_done);
PQclear(res); PQclear(res);
} }
...@@ -592,7 +645,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -592,7 +645,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */ /* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix, standby_message_timeout, partial_suffix,
&stoppos, synchronous); &stoppos, synchronous, mark_done);
if (res == NULL) if (res == NULL)
goto error; goto error;
...@@ -757,7 +810,7 @@ static PGresult * ...@@ -757,7 +810,7 @@ static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop, char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix, int standby_message_timeout, char *partial_suffix,
XLogRecPtr *stoppos, bool synchronous) XLogRecPtr *stoppos, bool synchronous, bool mark_done)
{ {
char *copybuf = NULL; char *copybuf = NULL;
int64 last_status = -1; int64 last_status = -1;
...@@ -775,7 +828,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -775,7 +828,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Check if we should continue streaming, or abort at this point. * Check if we should continue streaming, or abort at this point.
*/ */
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
stream_stop, partial_suffix, stoppos)) stream_stop, partial_suffix, stoppos,
mark_done))
goto error; goto error;
now = feGetCurrentTimestamp(); now = feGetCurrentTimestamp();
...@@ -830,7 +884,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -830,7 +884,8 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
if (r == -2) if (r == -2)
{ {
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
basedir, partial_suffix, stoppos); basedir, partial_suffix,
stoppos, mark_done);
if (res == NULL) if (res == NULL)
goto error; goto error;
else else
...@@ -847,14 +902,16 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -847,14 +902,16 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
else if (copybuf[0] == 'w') else if (copybuf[0] == 'w')
{ {
if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
timeline, basedir, stream_stop, partial_suffix)) timeline, basedir, stream_stop,
partial_suffix, true))
goto error; goto error;
/* /*
* Check if we should continue streaming, or abort at this point. * Check if we should continue streaming, or abort at this point.
*/ */
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
stream_stop, partial_suffix, stoppos)) stream_stop, partial_suffix, stoppos,
mark_done))
goto error; goto error;
} }
else else
...@@ -1055,7 +1112,7 @@ static bool ...@@ -1055,7 +1112,7 @@ static bool
ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline, XLogRecPtr *blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop, char *basedir, stream_stop_callback stream_stop,
char *partial_suffix) char *partial_suffix, bool mark_done)
{ {
int xlogoff; int xlogoff;
int bytes_left; int bytes_left;
...@@ -1163,7 +1220,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ...@@ -1163,7 +1220,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
/* Did we reach the end of a WAL segment? */ /* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0) if (*blockpos % XLOG_SEG_SIZE == 0)
{ {
if (!close_walfile(basedir, partial_suffix, *blockpos)) if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
/* Error message written in close_walfile() */ /* Error message written in close_walfile() */
return false; return false;
...@@ -1193,7 +1250,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ...@@ -1193,7 +1250,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
static PGresult * static PGresult *
HandleEndOfCopyStream(PGconn *conn, char *copybuf, HandleEndOfCopyStream(PGconn *conn, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix, XLogRecPtr blockpos, char *basedir, char *partial_suffix,
XLogRecPtr *stoppos) XLogRecPtr *stoppos, bool mark_done)
{ {
PGresult *res = PQgetResult(conn); PGresult *res = PQgetResult(conn);
...@@ -1204,7 +1261,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, ...@@ -1204,7 +1261,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*/ */
if (still_sending) if (still_sending)
{ {
if (!close_walfile(basedir, partial_suffix, blockpos)) if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
{ {
/* Error message written in close_walfile() */ /* Error message written in close_walfile() */
PQclear(res); PQclear(res);
...@@ -1236,11 +1293,11 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, ...@@ -1236,11 +1293,11 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
static bool static bool
CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop, char *basedir, stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos) char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
{ {
if (still_sending && stream_stop(blockpos, timeline, false)) if (still_sending && stream_stop(blockpos, timeline, false))
{ {
if (!close_walfile(basedir, partial_suffix, blockpos)) if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
{ {
/* Potential error message is written by close_walfile */ /* Potential error message is written by close_walfile */
return false; return false;
......
...@@ -31,6 +31,7 @@ extern bool ReceiveXlogStream(PGconn *conn, ...@@ -31,6 +31,7 @@ extern bool ReceiveXlogStream(PGconn *conn,
stream_stop_callback stream_stop, stream_stop_callback stream_stop,
int standby_message_timeout, int standby_message_timeout,
char *partial_suffix, char *partial_suffix,
bool synchronous); bool synchronous,
bool mark_done);
#endif /* RECEIVELOG_H */ #endif /* RECEIVELOG_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment