Commit 56c7d8d4 authored by Magnus Hagander

Allow pg_basebackup to stream transaction log in tar mode

This will write the received transaction log into a file called
pg_wal.tar(.gz) next to the other tarfiles instead of writing it to
base.tar. When using fetch mode, the transaction log is still written to
base.tar like before, and when used against a pre-10 server, the file
is named pg_xlog.tar.

To do this, implement a new concept of a "walmethod", which is
responsible for writing the WAL. Two implementations exist, one that
writes to a plain directory (which is also used by pg_receivexlog) and
one that writes to a tar file with optional compression.

Reviewed by Michael Paquier
parent 1885c884
...@@ -180,7 +180,8 @@ PostgreSQL documentation ...@@ -180,7 +180,8 @@ PostgreSQL documentation
target directory, the tar contents will be written to target directory, the tar contents will be written to
standard output, suitable for piping to for example standard output, suitable for piping to for example
<productname>gzip</productname>. This is only possible if <productname>gzip</productname>. This is only possible if
the cluster has no additional tablespaces. the cluster has no additional tablespaces and transaction
log streaming is not used.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -323,6 +324,10 @@ PostgreSQL documentation ...@@ -323,6 +324,10 @@ PostgreSQL documentation
If the log has been rotated when it's time to transfer it, the If the log has been rotated when it's time to transfer it, the
backup will fail and be unusable. backup will fail and be unusable.
</para> </para>
<para>
The transaction log files will be written to
the <filename>base.tar</filename> file.
</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -339,6 +344,11 @@ PostgreSQL documentation ...@@ -339,6 +344,11 @@ PostgreSQL documentation
client can keep up with transaction log received, using this mode client can keep up with transaction log received, using this mode
requires no extra transaction logs to be saved on the master. requires no extra transaction logs to be saved on the master.
</para> </para>
<para>
The transaction log files are written to a separate file
named <filename>pg_wal.tar</filename> (if the server is a version
earlier than 10, the file will be named <filename>pg_xlog.tar</filename>).
</para>
</listitem> </listitem>
</varlistentry> </varlistentry>
</variablelist> </variablelist>
...@@ -353,7 +363,8 @@ PostgreSQL documentation ...@@ -353,7 +363,8 @@ PostgreSQL documentation
<para> <para>
Enables gzip compression of tar file output, with the default Enables gzip compression of tar file output, with the default
compression level. Compression is only available when using compression level. Compression is only available when using
the tar format. the tar format, and the suffix <filename>.gz</filename> will
automatically be added to all tar filenames.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -366,7 +377,8 @@ PostgreSQL documentation ...@@ -366,7 +377,8 @@ PostgreSQL documentation
Enables gzip compression of tar file output, and specifies the Enables gzip compression of tar file output, and specifies the
compression level (0 through 9, 0 being no compression and 9 being best compression level (0 through 9, 0 being no compression and 9 being best
compression). Compression is only available when using the tar compression). Compression is only available when using the tar
format. format, and the suffix <filename>.gz</filename> will
automatically be added to all tar filenames.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global ...@@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq
OBJS=receivelog.o streamutil.o $(WIN32RES) OBJS=receivelog.o streamutil.o walmethods.o $(WIN32RES)
all: pg_basebackup pg_receivexlog pg_recvlogical all: pg_basebackup pg_receivexlog pg_recvlogical
......
...@@ -449,7 +449,7 @@ typedef struct ...@@ -449,7 +449,7 @@ typedef struct
{ {
PGconn *bgconn; PGconn *bgconn;
XLogRecPtr startptr; XLogRecPtr startptr;
char xlogdir[MAXPGPATH]; char xlog[MAXPGPATH]; /* directory or tarfile depending on mode */
char *sysidentifier; char *sysidentifier;
int timeline; int timeline;
} logstreamer_param; } logstreamer_param;
...@@ -470,9 +470,13 @@ LogStreamerMain(logstreamer_param *param) ...@@ -470,9 +470,13 @@ LogStreamerMain(logstreamer_param *param)
stream.synchronous = false; stream.synchronous = false;
stream.do_sync = do_sync; stream.do_sync = do_sync;
stream.mark_done = true; stream.mark_done = true;
stream.basedir = param->xlogdir;
stream.partial_suffix = NULL; stream.partial_suffix = NULL;
if (format == 'p')
stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
else
stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
if (!ReceiveXlogStream(param->bgconn, &stream)) if (!ReceiveXlogStream(param->bgconn, &stream))
/* /*
...@@ -482,6 +486,14 @@ LogStreamerMain(logstreamer_param *param) ...@@ -482,6 +486,14 @@ LogStreamerMain(logstreamer_param *param)
*/ */
return 1; return 1;
if (!stream.walmethod->finish())
{
fprintf(stderr,
_("%s: could not finish writing WAL files: %s\n"),
progname, strerror(errno));
return 1;
}
PQfinish(param->bgconn); PQfinish(param->bgconn);
return 0; return 0;
} }
...@@ -533,28 +545,32 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier) ...@@ -533,28 +545,32 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
exit(1); exit(1);
/* In post-10 cluster, pg_xlog has been renamed to pg_wal */ /* In post-10 cluster, pg_xlog has been renamed to pg_wal */
snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/%s", snprintf(param->xlog, sizeof(param->xlog), "%s/%s",
basedir, basedir,
PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ? PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
"pg_xlog" : "pg_wal"); "pg_xlog" : "pg_wal");
/*
* Create pg_wal/archive_status or pg_xlog/archive_status (and thus
* pg_wal or pg_xlog) depending on the target server so we can write to
* basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar
* file may arrive later.
*/
snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
basedir,
PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
"pg_xlog" : "pg_wal");
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST) if (format == 'p')
{ {
fprintf(stderr, /*
_("%s: could not create directory \"%s\": %s\n"), * Create pg_wal/archive_status or pg_xlog/archive_status (and thus
progname, statusdir, strerror(errno)); * pg_wal or pg_xlog) depending on the target server so we can write to
disconnect_and_exit(1); * basedir/pg_wal or basedir/pg_xlog as the directory entry in the tar
* file may arrive later.
*/
snprintf(statusdir, sizeof(statusdir), "%s/%s/archive_status",
basedir,
PQserverVersion(conn) < MINIMUM_VERSION_FOR_PG_WAL ?
"pg_xlog" : "pg_wal");
if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
{
fprintf(stderr,
_("%s: could not create directory \"%s\": %s\n"),
progname, statusdir, strerror(errno));
disconnect_and_exit(1);
}
} }
/* /*
...@@ -2245,16 +2261,6 @@ main(int argc, char **argv) ...@@ -2245,16 +2261,6 @@ main(int argc, char **argv)
exit(1); exit(1);
} }
if (format != 'p' && streamwal)
{
fprintf(stderr,
_("%s: WAL streaming can only be used in plain mode\n"),
progname);
fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
progname);
exit(1);
}
if (replication_slot && !streamwal) if (replication_slot && !streamwal)
{ {
fprintf(stderr, fprintf(stderr,
......
...@@ -338,11 +338,19 @@ StreamLog(void) ...@@ -338,11 +338,19 @@ StreamLog(void)
stream.synchronous = synchronous; stream.synchronous = synchronous;
stream.do_sync = true; stream.do_sync = true;
stream.mark_done = false; stream.mark_done = false;
stream.basedir = basedir; stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
stream.partial_suffix = ".partial"; stream.partial_suffix = ".partial";
ReceiveXlogStream(conn, &stream); ReceiveXlogStream(conn, &stream);
if (!stream.walmethod->finish())
{
fprintf(stderr,
_("%s: could not finish writing WAL files: %s\n"),
progname, strerror(errno));
return;
}
PQfinish(conn); PQfinish(conn);
conn = NULL; conn = NULL;
} }
......
...@@ -30,7 +30,7 @@ ...@@ -30,7 +30,7 @@
/* fd and filename for currently open WAL file */ /* fd and filename for currently open WAL file */
static int walfile = -1; static Walfile *walfile = NULL;
static char current_walfile_name[MAXPGPATH] = ""; static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false; static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
...@@ -56,29 +56,23 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, ...@@ -56,29 +56,23 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline); uint32 *timeline);
static bool static bool
mark_file_as_archived(const char *basedir, const char *fname, bool do_sync) mark_file_as_archived(StreamCtl *stream, const char *fname)
{ {
int fd; Walfile *f;
static char tmppath[MAXPGPATH]; static char tmppath[MAXPGPATH];
snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done", snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
basedir, fname); fname);
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); f = stream->walmethod->open_for_write(tmppath, NULL, 0);
if (fd < 0) if (f == NULL)
{ {
fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"), fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
progname, tmppath, strerror(errno)); progname, tmppath, stream->walmethod->getlasterror());
return false; return false;
} }
close(fd); stream->walmethod->close(f, CLOSE_NORMAL);
if (do_sync && fsync_fname(tmppath, false, progname) != 0)
return false;
if (do_sync && fsync_parent_path(tmppath, progname) != 0)
return false;
return true; return true;
} }
...@@ -95,121 +89,82 @@ mark_file_as_archived(const char *basedir, const char *fname, bool do_sync) ...@@ -95,121 +89,82 @@ mark_file_as_archived(const char *basedir, const char *fname, bool do_sync)
static bool static bool
open_walfile(StreamCtl *stream, XLogRecPtr startpoint) open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
{ {
int f; Walfile *f;
char fn[MAXPGPATH]; char fn[MAXPGPATH];
struct stat statbuf; ssize_t size;
char *zerobuf;
int bytes;
XLogSegNo segno; XLogSegNo segno;
XLByteToSeg(startpoint, segno); XLByteToSeg(startpoint, segno);
XLogFileName(current_walfile_name, stream->timeline, segno); XLogFileName(current_walfile_name, stream->timeline, segno);
snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name, snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
stream->partial_suffix ? stream->partial_suffix : ""); stream->partial_suffix ? stream->partial_suffix : "");
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1)
{
fprintf(stderr,
_("%s: could not open transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno));
return false;
}
/* /*
* Verify that the file is either empty (just created), or a complete * When streaming to files, if an existing file exists we verify that it's
* XLogSegSize segment. Anything in between indicates a corrupt file. * either empty (just created), or a complete XLogSegSize segment (in
* which case it has been created and padded). Anything else indicates a
* corrupt file.
*
* When streaming to tar, no file with this name will exist before, so we
* never have to verify a size.
*/ */
if (fstat(f, &statbuf) != 0) if (stream->walmethod->existsfile(fn))
{ {
fprintf(stderr, size = stream->walmethod->get_file_size(fn);
_("%s: could not stat transaction log file \"%s\": %s\n"), if (size < 0)
progname, fn, strerror(errno));
close(f);
return false;
}
if (statbuf.st_size == XLogSegSize)
{
/*
* fsync, in case of a previous crash between padding and fsyncing the
* file.
*/
if (stream->do_sync)
{ {
if (fsync_fname(fn, false, progname) != 0 || fprintf(stderr,
fsync_parent_path(fn, progname) != 0) _("%s: could not get size of transaction log file \"%s\": %s\n"),
progname, fn, stream->walmethod->getlasterror());
return false;
}
if (size == XLogSegSize)
{
/* Already padded file. Open it for use */
f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
if (f == NULL)
{ {
/* error already printed */ fprintf(stderr,
close(f); _("%s: could not open existing transaction log file \"%s\": %s\n"),
progname, fn, stream->walmethod->getlasterror());
return false; return false;
} }
}
/* File is open and ready to use */ /* fsync file in case of a previous crash */
walfile = f; if (!stream->walmethod->fsync(f))
return true; {
} stream->walmethod->close(f, CLOSE_UNLINK);
if (statbuf.st_size != 0) return false;
{ }
fprintf(stderr,
_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
progname, fn, (int) statbuf.st_size, XLogSegSize);
close(f);
return false;
}
/* walfile = f;
* New, empty, file. So pad it to 16Mb with zeroes. If we fail partway return true;
* through padding, we should attempt to unlink the file on failure, so as }
* not to leave behind a partially-filled file. if (size != 0)
*/
zerobuf = pg_malloc0(XLOG_BLCKSZ);
for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
{
errno = 0;
if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{ {
/* if write didn't set errno, assume problem is no disk space */ /* if write didn't set errno, assume problem is no disk space */
if (errno == 0) if (errno == 0)
errno = ENOSPC; errno = ENOSPC;
fprintf(stderr, fprintf(stderr,
_("%s: could not pad transaction log file \"%s\": %s\n"), _("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
progname, fn, strerror(errno)); progname, fn, (int) size, XLogSegSize);
free(zerobuf);
close(f);
unlink(fn);
return false; return false;
} }
/* File existed and was empty, so fall through and open */
} }
free(zerobuf);
/* /* No file existed, so create one */
* fsync WAL file and containing directory, to ensure the file is
* persistently created and zeroed. That's particularly important when
* using synchronous mode, where the file is modified and fsynced
* in-place, without a directory fsync.
*/
if (stream->do_sync)
{
if (fsync_fname(fn, false, progname) != 0 ||
fsync_parent_path(fn, progname) != 0)
{
/* error already printed */
close(f);
return false;
}
}
if (lseek(f, SEEK_SET, 0) != 0) f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
if (f == NULL)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: could not seek to beginning of transaction log file \"%s\": %s\n"), _("%s: could not open transaction log file \"%s\": %s\n"),
progname, fn, strerror(errno)); progname, fn, stream->walmethod->getlasterror());
close(f);
return false; return false;
} }
/* File is open and ready to use */
walfile = f; walfile = f;
return true; return true;
} }
...@@ -223,59 +178,46 @@ static bool ...@@ -223,59 +178,46 @@ static bool
close_walfile(StreamCtl *stream, XLogRecPtr pos) close_walfile(StreamCtl *stream, XLogRecPtr pos)
{ {
off_t currpos; off_t currpos;
int r;
if (walfile == -1) if (walfile == NULL)
return true; return true;
currpos = lseek(walfile, 0, SEEK_CUR); currpos = stream->walmethod->get_current_pos(walfile);
if (currpos == -1) if (currpos == -1)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: could not determine seek position in file \"%s\": %s\n"), _("%s: could not determine seek position in file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno)); progname, current_walfile_name, stream->walmethod->getlasterror());
close(walfile); stream->walmethod->close(walfile, CLOSE_UNLINK);
walfile = -1; walfile = NULL;
return false; return false;
} }
if (stream->do_sync && fsync(walfile) != 0) if (stream->partial_suffix)
{ {
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), if (currpos == XLOG_SEG_SIZE)
progname, current_walfile_name, strerror(errno)); r = stream->walmethod->close(walfile, CLOSE_NORMAL);
close(walfile); else
walfile = -1; {
return false; fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, stream->partial_suffix);
r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
}
} }
else
r = stream->walmethod->close(walfile, CLOSE_NORMAL);
if (close(walfile) != 0) walfile = NULL;
if (r != 0)
{ {
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno)); progname, current_walfile_name, stream->walmethod->getlasterror());
walfile = -1;
return false; return false;
} }
walfile = -1;
/*
* If we finished writing a .partial file, rename it into place.
*/
if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
{
char oldfn[MAXPGPATH];
char newfn[MAXPGPATH];
snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
if (durable_rename(oldfn, newfn, progname) != 0)
{
/* durable_rename produced a log entry */
return false;
}
}
else if (stream->partial_suffix)
fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, stream->partial_suffix);
/* /*
* Mark file as archived if requested by the caller - pg_basebackup needs * Mark file as archived if requested by the caller - pg_basebackup needs
...@@ -286,8 +228,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) ...@@ -286,8 +228,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
if (currpos == XLOG_SEG_SIZE && stream->mark_done) if (currpos == XLOG_SEG_SIZE && stream->mark_done)
{ {
/* writes error message if failed */ /* writes error message if failed */
if (!mark_file_as_archived(stream->basedir, current_walfile_name, if (!mark_file_as_archived(stream, current_walfile_name))
stream->do_sync))
return false; return false;
} }
...@@ -302,9 +243,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos) ...@@ -302,9 +243,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
static bool static bool
existsTimeLineHistoryFile(StreamCtl *stream) existsTimeLineHistoryFile(StreamCtl *stream)
{ {
char path[MAXPGPATH];
char histfname[MAXFNAMELEN]; char histfname[MAXFNAMELEN];
int fd;
/* /*
* Timeline 1 never has a history file. We treat that as if it existed, * Timeline 1 never has a history file. We treat that as if it existed,
...@@ -315,31 +254,15 @@ existsTimeLineHistoryFile(StreamCtl *stream) ...@@ -315,31 +254,15 @@ existsTimeLineHistoryFile(StreamCtl *stream)
TLHistoryFileName(histfname, stream->timeline); TLHistoryFileName(histfname, stream->timeline);
snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); return stream->walmethod->existsfile(histfname);
fd = open(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0)
{
if (errno != ENOENT)
fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
progname, path, strerror(errno));
return false;
}
else
{
close(fd);
return true;
}
} }
static bool static bool
writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
{ {
int size = strlen(content); int size = strlen(content);
char path[MAXPGPATH];
char tmppath[MAXPGPATH];
char histfname[MAXFNAMELEN]; char histfname[MAXFNAMELEN];
int fd; Walfile *f;
/* /*
* Check that the server's idea of how timeline history files should be * Check that the server's idea of how timeline history files should be
...@@ -353,53 +276,31 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) ...@@ -353,53 +276,31 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
return false; return false;
} }
snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname); f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
if (f == NULL)
/*
* Write into a temp file name.
*/
snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
unlink(tmppath);
fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
{ {
fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"), fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
progname, tmppath, strerror(errno)); progname, histfname, stream->walmethod->getlasterror());
return false; return false;
} }
errno = 0; if ((int) stream->walmethod->write(f, content, size) != size)
if ((int) write(fd, content, size) != size)
{ {
int save_errno = errno; fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
progname, histfname, stream->walmethod->getlasterror());
/* /*
* If we fail to make the file, delete it to release disk space * If we fail to make the file, delete it to release disk space
*/ */
close(fd); stream->walmethod->close(f, CLOSE_UNLINK);
unlink(tmppath);
errno = save_errno;
fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
progname, tmppath, strerror(errno));
return false; return false;
} }
if (close(fd) != 0) if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
{ {
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, tmppath, strerror(errno)); progname, histfname, stream->walmethod->getlasterror());
return false;
}
/*
* Now move the completed history file into place with its final name.
*/
if (durable_rename(tmppath, path, progname) < 0)
{
/* durable_rename produced a log entry */
return false; return false;
} }
...@@ -407,8 +308,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content) ...@@ -407,8 +308,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
if (stream->mark_done) if (stream->mark_done)
{ {
/* writes error message if failed */ /* writes error message if failed */
if (!mark_file_as_archived(stream->basedir, histfname, if (!mark_file_as_archived(stream, histfname))
stream->do_sync))
return false; return false;
} }
...@@ -618,7 +518,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) ...@@ -618,7 +518,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{ {
/* /*
* Fetch the timeline history file for this timeline, if we don't have * Fetch the timeline history file for this timeline, if we don't have
* it already. * it already. When streaming log to tar, this will always return
* false, as we are never streaming into an existing file and
* therefore there can be no pre-existing timeline history file.
*/ */
if (!existsTimeLineHistoryFile(stream)) if (!existsTimeLineHistoryFile(stream))
{ {
...@@ -777,10 +679,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream) ...@@ -777,10 +679,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
} }
error: error:
if (walfile != -1 && close(walfile) != 0) if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0)
fprintf(stderr, _("%s: could not close file \"%s\": %s\n"), fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno)); progname, current_walfile_name, stream->walmethod->getlasterror());
walfile = -1; walfile = NULL;
return false; return false;
} }
...@@ -864,12 +766,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream, ...@@ -864,12 +766,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
* If synchronous option is true, issue sync command as soon as there * If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet. * are WAL data which has not been flushed yet.
*/ */
if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1) if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
{ {
if (stream->do_sync && fsync(walfile) != 0) if (stream->walmethod->fsync(walfile) != 0)
{ {
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno)); progname, current_walfile_name, stream->walmethod->getlasterror());
goto error; goto error;
} }
lastFlushPosition = blockpos; lastFlushPosition = blockpos;
...@@ -1100,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ...@@ -1100,7 +1002,7 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
if (replyRequested && still_sending) if (replyRequested && still_sending)
{ {
if (reportFlushPosition && lastFlushPosition < blockpos && if (reportFlushPosition && lastFlushPosition < blockpos &&
walfile != -1) walfile != NULL)
{ {
/* /*
* If a valid flush location needs to be reported, flush the * If a valid flush location needs to be reported, flush the
...@@ -1109,10 +1011,10 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ...@@ -1109,10 +1011,10 @@ ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* data has been successfully replicated or not, at the normal * data has been successfully replicated or not, at the normal
* shutdown of the server. * shutdown of the server.
*/ */
if (stream->do_sync && fsync(walfile) != 0) if (stream->walmethod->fsync(walfile) != 0)
{ {
fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
progname, current_walfile_name, strerror(errno)); progname, current_walfile_name, stream->walmethod->getlasterror());
return false; return false;
} }
lastFlushPosition = blockpos; lastFlushPosition = blockpos;
...@@ -1170,7 +1072,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ...@@ -1170,7 +1072,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
* Verify that the initial location in the stream matches where we think * Verify that the initial location in the stream matches where we think
* we are. * we are.
*/ */
if (walfile == -1) if (walfile == NULL)
{ {
/* No file open yet */ /* No file open yet */
if (xlogoff != 0) if (xlogoff != 0)
...@@ -1184,12 +1086,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ...@@ -1184,12 +1086,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
else else
{ {
/* More data in existing segment */ /* More data in existing segment */
/* XXX: store seek value don't reseek all the time */ if (stream->walmethod->get_current_pos(walfile) != xlogoff)
if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: got WAL data offset %08x, expected %08x\n"), _("%s: got WAL data offset %08x, expected %08x\n"),
progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
return false; return false;
} }
} }
...@@ -1210,7 +1111,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ...@@ -1210,7 +1111,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
else else
bytes_to_write = bytes_left; bytes_to_write = bytes_left;
if (walfile == -1) if (walfile == NULL)
{ {
if (!open_walfile(stream, *blockpos)) if (!open_walfile(stream, *blockpos))
{ {
...@@ -1219,14 +1120,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len, ...@@ -1219,14 +1120,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
} }
} }
if (write(walfile, if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
copybuf + hdr_len + bytes_written, bytes_to_write) != bytes_to_write)
bytes_to_write) != bytes_to_write)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: could not write %u bytes to WAL file \"%s\": %s\n"), _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
progname, bytes_to_write, current_walfile_name, progname, bytes_to_write, current_walfile_name,
strerror(errno)); stream->walmethod->getlasterror());
return false; return false;
} }
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#define RECEIVELOG_H #define RECEIVELOG_H
#include "libpq-fe.h" #include "libpq-fe.h"
#include "walmethods.h"
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
...@@ -41,7 +42,7 @@ typedef struct StreamCtl ...@@ -41,7 +42,7 @@ typedef struct StreamCtl
stream_stop_callback stream_stop; /* Stop streaming when returns true */ stream_stop_callback stream_stop; /* Stop streaming when returns true */
char *basedir; /* Received segments written to this dir */ WalWriteMethod *walmethod; /* How to write the WAL */
char *partial_suffix; /* Suffix appended to partially received files */ char *partial_suffix; /* Suffix appended to partially received files */
} StreamCtl; } StreamCtl;
......
...@@ -4,7 +4,7 @@ use Cwd; ...@@ -4,7 +4,7 @@ use Cwd;
use Config; use Config;
use PostgresNode; use PostgresNode;
use TestLib; use TestLib;
use Test::More tests => 67; use Test::More tests => 69;
program_help_ok('pg_basebackup'); program_help_ok('pg_basebackup');
program_version_ok('pg_basebackup'); program_version_ok('pg_basebackup');
...@@ -237,6 +237,10 @@ $node->command_ok( ...@@ -237,6 +237,10 @@ $node->command_ok(
'pg_basebackup -X stream runs'); 'pg_basebackup -X stream runs');
ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")), ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")),
'WAL files copied'); 'WAL files copied');
$node->command_ok(
[ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' ],
'pg_basebackup -X stream runs in tar mode');
ok(-f "$tempdir/backupxst/pg_wal.tar", "tar file was created");
$node->command_fails( $node->command_fails(
[ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1' ], [ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1' ],
......
/*-------------------------------------------------------------------------
*
* walmethods.c - implementations of different ways to write received wal
*
* NOTE! The caller must ensure that only one method is instantiated in
* any given program, and that it's only instantiated once!
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/walmethods.c
*-------------------------------------------------------------------------
*/
#include "postgres_fe.h"
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#ifdef HAVE_LIBZ
#include <zlib.h>
#endif
#include "pgtar.h"
#include "common/file_utils.h"
#include "receivelog.h"
#include "streamutil.h"
/* Size of zlib buffer for .tar.gz */
#define ZLIB_OUT_SIZE 4096
/*-------------------------------------------------------------------------
* WalDirectoryMethod - write wal to a directory looking like pg_xlog
*-------------------------------------------------------------------------
*/
/*
 * Global static data for the directory method.  The functions in this file
 * read their configuration through the single file-level pointer dir_data.
 */
typedef struct DirectoryMethodData
{
/* directory prefix prepended to every file name handed to this method */
char *basedir;
/* when true, newly opened files and their parent directory are fsynced */
bool sync;
} DirectoryMethodData;
/* NOTE(review): presumably allocated by CreateWalDirectoryMethod() -- confirm */
static DirectoryMethodData *dir_data = NULL;
/*
 * Local file handle: the per-file state the directory method hands back to
 * callers as an opaque Walfile.
 */
typedef struct DirectoryMethodFile
{
/* descriptor returned by open() for the file */
int fd;
/* running total of bytes written, cached so no lseek() is needed */
off_t currpos;
/* copy of the file name as passed by the caller (no basedir, no suffix) */
char *pathname;
/* copy of the path actually opened: basedir, name and temporary suffix */
char *fullpath;
/* copy of the temporary suffix, or NULL when the caller passed none */
char *temp_suffix;
} DirectoryMethodFile;
/*
 * dir_getlasterror - describe the most recent failure of the directory
 * method.
 *
 * The directory method reports failures through errno, so the message is
 * simply the system's text for the current errno value.
 */
static char *
dir_getlasterror(void)
{
	int			last_errno = errno;

	return strerror(last_errno);
}
/*
 * dir_open_for_write - open (creating if necessary) a file for writing.
 *
 * The file opened is "<basedir>/<pathname><temp_suffix>", where basedir comes
 * from dir_data and temp_suffix may be NULL (meaning no suffix).
 *
 * If pad_to_size is nonzero, the file is filled with zeroes using
 * XLOG_BLCKSZ-sized writes until at least pad_to_size bytes have been
 * written, and the file position is then moved back to the start.
 *
 * When dir_data->sync is set, the file and its parent directory are fsynced
 * before the handle is returned.
 *
 * Returns a newly allocated DirectoryMethodFile (as a Walfile) on success.
 * Returns NULL on failure, with errno describing the problem so that
 * dir_getlasterror() produces a meaningful message.
 */
static Walfile
dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
{
	static char tmppath[MAXPGPATH];
	int			fd;
	DirectoryMethodFile *f;

	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");

	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
	if (fd < 0)
		return NULL;

	if (pad_to_size)
	{
		/* Always pre-pad on regular files */
		char	   *zerobuf;
		size_t		bytes;

		zerobuf = pg_malloc0(XLOG_BLCKSZ);
		for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
		{
			/*
			 * Clear errno first: a short write returns a byte count without
			 * touching errno, and a stale value would produce a misleading
			 * error message.
			 */
			errno = 0;
			if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
			{
				/* if write didn't set errno, assume problem is no disk space */
				int			save_errno = errno ? errno : ENOSPC;

				pg_free(zerobuf);
				close(fd);
				errno = save_errno;
				return NULL;
			}
		}
		pg_free(zerobuf);

		if (lseek(fd, 0, SEEK_SET) != 0)
		{
			int			save_errno = errno;

			close(fd);
			errno = save_errno;
			return NULL;
		}
	}

	/*
	 * fsync WAL file and containing directory, to ensure the file is
	 * persistently created and zeroed (if padded). That's particularly
	 * important when using synchronous mode, where the file is modified and
	 * fsynced in-place, without a directory fsync.
	 */
	if (dir_data->sync)
	{
		if (fsync_fname(tmppath, false, progname) != 0 ||
			fsync_parent_path(tmppath, progname) != 0)
		{
			/* keep close() from overwriting the errno of the failed fsync */
			int			save_errno = errno;

			close(fd);
			errno = save_errno;
			return NULL;
		}
	}

	/* pg_malloc0 zeroes the struct, so temp_suffix stays NULL if not given */
	f = pg_malloc0(sizeof(DirectoryMethodFile));
	f->fd = fd;
	f->currpos = 0;
	f->pathname = pg_strdup(pathname);
	f->fullpath = pg_strdup(tmppath);
	if (temp_suffix)
		f->temp_suffix = pg_strdup(temp_suffix);

	return f;
}
/*
 * Write count bytes from buf to the file behind handle f.
 *
 * Returns whatever write() returned.  When that is a positive byte count,
 * the handle's cached position is advanced by the same amount.
 */
static ssize_t
dir_write(Walfile f, const void *buf, size_t count)
{
	DirectoryMethodFile *file = (DirectoryMethodFile *) f;
	ssize_t		written;

	Assert(f != NULL);

	written = write(file->fd, buf, count);
	if (written <= 0)
		return written;

	file->currpos += written;
	return written;
}
/*
 * Report the write position of the file behind handle f.
 *
 * The value comes from the counter maintained by dir_write() rather than
 * from asking the operating system, which avoids repeated seeks.
 */
static off_t
dir_get_current_pos(Walfile f)
{
	DirectoryMethodFile *file;

	Assert(f != NULL);

	file = (DirectoryMethodFile *) f;
	return file->currpos;
}
/*
 * Close the file behind handle f and release the handle.
 *
 * What happens after the descriptor has been closed successfully depends on
 * method:
 *	CLOSE_NORMAL with a temp suffix	- the file is durably renamed from its
 *									  suffixed name to its final name
 *	CLOSE_UNLINK					- the file (under the name it was
 *									  opened with) is removed
 *	anything else					- the file and its directory are fsync'ed
 *									  if sync mode is enabled
 *
 * Returns 0 on success, otherwise the nonzero result of the step that failed.
 * The handle is freed in every case.
 */
static int
dir_close(Walfile f, WalCloseMethod method)
{
	DirectoryMethodFile *file = (DirectoryMethodFile *) f;
	static char srcpath[MAXPGPATH];
	static char dstpath[MAXPGPATH];
	int			rc;

	Assert(f != NULL);

	rc = close(file->fd);
	if (rc != 0)
		goto release;

	if (method == CLOSE_NORMAL && file->temp_suffix)
	{
		/* Normal close of a temp-suffixed file: move it to its final name */
		snprintf(srcpath, sizeof(srcpath), "%s/%s%s",
				 dir_data->basedir, file->pathname, file->temp_suffix);
		snprintf(dstpath, sizeof(dstpath), "%s/%s",
				 dir_data->basedir, file->pathname);
		rc = durable_rename(srcpath, dstpath, progname);
	}
	else if (method == CLOSE_UNLINK)
	{
		/* Caller wants the file gone now that it is closed */
		snprintf(srcpath, sizeof(srcpath), "%s/%s%s",
				 dir_data->basedir, file->pathname,
				 file->temp_suffix ? file->temp_suffix : "");
		rc = unlink(srcpath);
	}
	else if (dir_data->sync)
	{
		/*
		 * CLOSE_NORMAL without a temp suffix, or CLOSE_NO_RENAME: the file
		 * keeps its name, so only make it and its directory entry durable.
		 */
		rc = fsync_fname(file->fullpath, false, progname);
		if (rc == 0)
			rc = fsync_parent_path(file->fullpath, progname);
	}

release:
	pg_free(file->pathname);
	pg_free(file->fullpath);
	if (file->temp_suffix)
		pg_free(file->temp_suffix);
	pg_free(file);

	return rc;
}
/*
 * Flush the file behind handle f to stable storage.
 *
 * Does nothing and reports success when sync mode is disabled; otherwise
 * returns the result of fsync() on the file's descriptor.
 */
static int
dir_fsync(Walfile f)
{
	Assert(f != NULL);

	if (dir_data->sync)
	{
		DirectoryMethodFile *file = (DirectoryMethodFile *) f;

		return fsync(file->fd);
	}

	return 0;
}
/*
 * Return the size in bytes of "<basedir>/<pathname>" as reported by stat(),
 * or -1 if stat() fails.
 */
static ssize_t
dir_get_file_size(const char *pathname)
{
	static char fullpath[MAXPGPATH];
	struct stat st;

	snprintf(fullpath, sizeof(fullpath), "%s/%s",
			 dir_data->basedir, pathname);

	if (stat(fullpath, &st) == 0)
		return st.st_size;

	return -1;
}
/*
 * Report whether "<basedir>/<pathname>" can be opened for reading.
 *
 * The probe descriptor is closed again before returning.
 */
static bool
dir_existsfile(const char *pathname)
{
	static char fullpath[MAXPGPATH];
	int			probe_fd;
	bool		found;

	snprintf(fullpath, sizeof(fullpath), "%s/%s",
			 dir_data->basedir, pathname);

	probe_fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
	found = (probe_fd >= 0);
	if (found)
		close(probe_fd);

	return found;
}
/*
 * Finish use of the directory method.
 *
 * Individual files are fsync'ed when they are closed; in sync mode the base
 * directory itself is fsync'ed here as well.  Returns false only if that
 * directory fsync fails.
 */
static bool
dir_finish(void)
{
	if (!dir_data->sync)
		return true;

	return fsync_fname(dir_data->basedir, true, progname) == 0;
}
/*
 * Build a WalWriteMethod that writes each WAL file as a regular file below
 * basedir.
 *
 * basedir	directory to write into; the string is copied
 * sync		whether files and directories should be fsync'ed
 *
 * This also sets up the file-global dir_data state used by all dir_*
 * callbacks.
 */
WalWriteMethod *
CreateWalDirectoryMethod(const char *basedir, bool sync)
{
	WalWriteMethod *result;

	/* Global state consulted by the callbacks */
	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
	dir_data->sync = sync;
	dir_data->basedir = pg_strdup(basedir);

	/* Callback table handed back to the caller */
	result = pg_malloc0(sizeof(WalWriteMethod));
	result->getlasterror = dir_getlasterror;
	result->finish = dir_finish;
	result->existsfile = dir_existsfile;
	result->fsync = dir_fsync;
	result->close = dir_close;
	result->get_file_size = dir_get_file_size;
	result->get_current_pos = dir_get_current_pos;
	result->write = dir_write;
	result->open_for_write = dir_open_for_write;

	return result;
}
/*-------------------------------------------------------------------------
* WalTarMethod - write wal to a tar file containing pg_xlog contents
*-------------------------------------------------------------------------
*/
/*
 * State of the single member file currently being written into the tar
 * archive.  Allocated by tar_open_for_write() and released by tar_close().
 */
typedef struct TarMethodFile
{
off_t ofs_start; /* Where does the *header* for this file start */
/* Number of content bytes written for this member so far */
off_t currpos;
/* The member's 512-byte tar header; rewritten with the final size on close */
char header[512];
/* Copy of the caller-supplied name, without the temp suffix */
char *pathname;
/* Size the member should be zero-padded to, or 0 if no padding was requested */
size_t pad_to_size;
} TarMethodFile;
/*
 * Global state of the tar method.  The single instance, tar_data, is
 * allocated by CreateWalTarMethod().
 */
typedef struct TarMethodData
{
/* Name of the archive: the caller's base name plus ".tar" or ".tar.gz" */
char *tarfilename;
/* Descriptor of the archive; -1 until the first member is opened */
int fd;
/* Compression level passed to zlib; 0 means the archive is not compressed */
int compression;
/* Whether the finished archive and its directory are fsync'ed */
bool sync;
/* The member currently open for writing, or NULL if there is none */
TarMethodFile *currentfile;
/* Custom error message; an empty string means "use errno instead" */
char lasterror[1024];
#ifdef HAVE_LIBZ
/* zlib deflate stream, set up when the archive is first opened */
z_streamp zp;
/* Output buffer that deflate() fills before it is written to fd */
void *zlibOut;
#endif
} TarMethodData;
/* The one and only tar-method state; NULL until the method is created */
static TarMethodData *tar_data = NULL;
/*
 * Helpers for the tar method's custom error message.  tar_clear_error()
 * empties the message so that tar_getlasterror() falls back to errno;
 * tar_set_error() stores msg, truncating it to the size of the buffer.
 */
#define tar_clear_error() tar_data->lasterror[0] = '\0'
#define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror))
/*
 * Return a description of the tar method's most recent failure.
 *
 * A custom message recorded with tar_set_error() takes precedence; when none
 * is present the failure is assumed to have set errno, and the system's text
 * for it is returned.
 */
static char *
tar_getlasterror(void)
{
	bool		have_custom_message = (tar_data->lasterror[0] != '\0');

	return have_custom_message ? tar_data->lasterror : strerror(errno);
}
#ifdef HAVE_LIBZ
/*
 * Feed count bytes from buf through the deflate stream and write whatever
 * compressed output becomes available to the archive.
 *
 * buf		input data; may be NULL when count is 0 and only a flush is wanted
 * count	number of input bytes
 * flush	if true, deflate with Z_FINISH until the stream reports its end
 *			and then reset the stream so that it can accept new data
 *
 * Returns true on success.  On failure returns false with either a custom
 * error message set (zlib failures) or errno set (write failures).
 */
static bool
tar_write_compressed_data(void *buf, size_t count, bool flush)
{
	tar_data->zp->next_in = buf;
	tar_data->zp->avail_in = count;

	while (tar_data->zp->avail_in || flush)
	{
		int			r;

		r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
		if (r == Z_STREAM_ERROR)
		{
			tar_set_error("deflate failed");
			return false;
		}

		if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
		{
			size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;

			errno = 0;
			if (write(tar_data->fd, tar_data->zlibOut, len) != (ssize_t) len)
			{
				/*
				 * A short write does not set errno; assume the disk is full
				 * so that the error reported to the user is meaningful.
				 */
				if (errno == 0)
					errno = ENOSPC;
				return false;
			}

			/* The buffer has been drained; hand all of it back to zlib */
			tar_data->zp->next_out = tar_data->zlibOut;
			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
		}

		if (r == Z_STREAM_END)
			break;
	}

	if (flush)
	{
		/* Reset the stream for writing */
		if (deflateReset(tar_data->zp) != Z_OK)
		{
			tar_set_error("deflateReset failed");
			return false;
		}
	}

	return true;
}
#endif
/*
 * Append count bytes from buf to the tar member behind handle f.
 *
 * Without compression this is a plain write() to the archive and the result
 * of write() is returned; a positive result advances the member's position.
 * With compression the data is passed through zlib and either count (all
 * input consumed) or -1 is returned.
 *
 * If compression was requested in a build that lacks zlib, -1 is returned
 * with a custom error message rather than falling off the end of the
 * function.
 */
static ssize_t
tar_write(Walfile f, const void *buf, size_t count)
{
	ssize_t		r;

	Assert(f != NULL);
	tar_clear_error();

	/* Tarfile will always be positioned at the end */
	if (!tar_data->compression)
	{
		r = write(tar_data->fd, buf, count);
		if (r > 0)
			((TarMethodFile *) f)->currpos += r;
		return r;
	}

#ifdef HAVE_LIBZ
	if (!tar_write_compressed_data((void *) buf, count, false))
		return -1;
	((TarMethodFile *) f)->currpos += count;
	return count;
#else
	/* Compression requested, but this build cannot provide it */
	tar_set_error("compression is not supported by this build");
	return -1;
#endif
}
/*
 * Append the given number of zero bytes to tar member f, in chunks of at
 * most XLOG_BLCKSZ bytes, using tar_write().
 *
 * Returns true once all bytes have been written.  Returns false as soon as
 * tar_write() reports a failure or makes no progress; errno (or the custom
 * tar error set by tar_write) then describes the problem.
 */
static bool
tar_write_padding_data(TarMethodFile * f, size_t bytes)
{
	char	   *zerobuf = pg_malloc0(XLOG_BLCKSZ);
	size_t		bytesleft = bytes;

	while (bytesleft)
	{
		size_t		bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;

		/*
		 * The result must be kept in a signed type: tar_write() signals
		 * failure with -1, which an unsigned variable could never detect.
		 */
		ssize_t		r = tar_write(f, zerobuf, bytestowrite);

		if (r <= 0)
		{
			/* r == 0 means no progress; treat it as out of space */
			int			save_errno = (r == 0) ? ENOSPC : errno;

			pg_free(zerobuf);
			errno = save_errno;
			return false;
		}
		bytesleft -= r;
	}

	pg_free(zerobuf);
	return true;
}
/*
 * Start a new member file in the tar archive.
 *
 * pathname		final name of the member
 * temp_suffix	optional suffix appended to the name stored in the header
 *				while the member is being written; may be NULL
 * pad_to_size	if nonzero, the size the member is zero-padded to.  For an
 *				uncompressed archive the padding is written immediately and
 *				the file position is moved back to the start of the member's
 *				data; for a compressed archive it is written on close.
 *
 * The archive itself (and the zlib stream, when compressing) is created on
 * the first call.  Only one member can be open at a time.
 *
 * Returns the handle for the new member, or NULL on failure with either a
 * custom error message or errno set.
 */
static Walfile
tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
{
	int			save_errno;
	static char tmppath[MAXPGPATH];

	tar_clear_error();

	if (tar_data->fd < 0)
	{
		/*
		 * We open the tar file only when we first try to write to it.
		 */
		tar_data->fd = open(tar_data->tarfilename,
							O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
		if (tar_data->fd < 0)
			return NULL;

#ifdef HAVE_LIBZ
		if (tar_data->compression)
		{
			tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
			tar_data->zp->zalloc = Z_NULL;
			tar_data->zp->zfree = Z_NULL;
			tar_data->zp->opaque = Z_NULL;
			tar_data->zp->next_out = tar_data->zlibOut;
			tar_data->zp->avail_out = ZLIB_OUT_SIZE;

			/*
			 * Initialize deflation library. Adding the magic value 16 to the
			 * default 15 for the windowBits parameter makes the output be
			 * gzip instead of zlib.
			 */
			if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
			{
				pg_free(tar_data->zp);
				tar_data->zp = NULL;
				tar_set_error("deflateInit2 failed");
				return NULL;
			}
		}
#endif

		/* There's no tar header itself, the file starts with regular files */
	}

	Assert(tar_data->currentfile == NULL);
	if (tar_data->currentfile != NULL)
	{
		tar_set_error("implementation error: tar files can't have more than one open file\n");
		return NULL;
	}

	tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));

	snprintf(tmppath, sizeof(tmppath), "%s%s",
			 pathname, temp_suffix ? temp_suffix : "");

	/* Create a header with size set to 0 - we will fill out the size on close */
	if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
	{
		pg_free(tar_data->currentfile);
		tar_data->currentfile = NULL;
		tar_set_error("could not create tar header");
		return NULL;
	}

#ifdef HAVE_LIBZ
	if (tar_data->compression)
	{
		/* Flush existing data */
		if (!tar_write_compressed_data(NULL, 0, true))
			return NULL;

		/* Turn off compression for header */
		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
		{
			tar_set_error("deflateParams failed");
			return NULL;
		}
	}
#endif

	/* Remember where this member's header starts, for the rewrite on close */
	tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
	if (tar_data->currentfile->ofs_start == -1)
	{
		save_errno = errno;
		pg_free(tar_data->currentfile);
		tar_data->currentfile = NULL;
		errno = save_errno;
		return NULL;
	}
	tar_data->currentfile->currpos = 0;

	if (!tar_data->compression)
	{
		errno = 0;
		if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
		{
			/* a short write leaves errno unset; report it as disk full */
			save_errno = errno ? errno : ENOSPC;
			pg_free(tar_data->currentfile);
			tar_data->currentfile = NULL;
			errno = save_errno;
			return NULL;
		}
	}
#ifdef HAVE_LIBZ
	else
	{
		/* Write header through the zlib APIs but with no compression */
		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
			return NULL;

		/* Re-enable compression for the rest of the file */
		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
		{
			tar_set_error("deflateParams failed");
			return NULL;
		}
	}
#endif

	tar_data->currentfile->pathname = pg_strdup(pathname);

	/*
	 * Uncompressed files are padded on creation, but for compression we can't
	 * do that
	 */
	if (pad_to_size)
	{
		tar_data->currentfile->pad_to_size = pad_to_size;
		if (!tar_data->compression)
		{
			/* Uncompressed, so pad now; a failed pad must not go unnoticed */
			if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
				return NULL;

			/* Seek back to start */
			if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
				return NULL;

			tar_data->currentfile->currpos = 0;
		}
	}

	return tar_data->currentfile;
}
/*
 * Size lookup is not implemented for the tar method because nothing needs
 * it at present.  Always fails with errno set to ENOSYS.
 */
static ssize_t
tar_get_file_size(const char *pathname)
{
	const ssize_t not_supported = -1;

	tar_clear_error();

	errno = ENOSYS;
	return not_supported;
}
/*
 * Report how many content bytes have been written to tar member f, and
 * clear any custom tar error message.
 */
static off_t
tar_get_current_pos(Walfile f)
{
	TarMethodFile *member;

	Assert(f != NULL);
	tar_clear_error();

	member = (TarMethodFile *) f;
	return member->currpos;
}
/*
 * Flush the tar archive to stable storage.
 *
 * Nothing is done, and success is reported, when sync mode is disabled (the
 * same rule dir_fsync() applies) or when the archive is compressed.
 * Otherwise returns the result of fsync() on the archive's descriptor.
 */
static int
tar_fsync(Walfile f)
{
	Assert(f != NULL);
	tar_clear_error();

	/* Honor the caller's request not to sync */
	if (!tar_data->sync)
		return 0;

	/*
	 * Always sync the whole tarfile, because that's all we can do. This makes
	 * no sense on compressed files, so just ignore those.
	 */
	if (tar_data->compression)
		return 0;

	return fsync(tar_data->fd);
}
/*
 * Finish the tar member behind handle f.
 *
 * CLOSE_UNLINK removes the member by truncating the archive back to the
 * start of the member's header; this is refused for compressed archives.
 *
 * For the other close methods the member is zero-padded to the size asked
 * for at open time, then padded to a multiple of 512 bytes, and its header
 * is rewritten in place with the final size and checksum.  With CLOSE_NORMAL
 * the header's name field is also replaced by the member's final name (that
 * is, without any temp suffix).  Finally the archive is synced.
 *
 * Returns 0 on success and -1 on failure, with a custom error message or
 * errno set.  The handle is freed only on success.
 */
static int
tar_close(Walfile f, WalCloseMethod method)
{
	ssize_t		filesize;
	int			padding;
	TarMethodFile *tf = (TarMethodFile *) f;

	Assert(f != NULL);
	tar_clear_error();

	if (method == CLOSE_UNLINK)
	{
		if (tar_data->compression)
		{
			tar_set_error("unlink not supported with compression");
			return -1;
		}

		/*
		 * Unlink the file that we just wrote to the tar. We do this by
		 * truncating it to the start of the header. This is safe as we only
		 * allow writing of the very last file.
		 */
		if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
			return -1;

		pg_free(tf->pathname);
		pg_free(tf);
		tar_data->currentfile = NULL;

		return 0;
	}

	/*
	 * Pad the file itself with zeroes if necessary. Note that this is
	 * different from the tar format padding -- this is the padding we asked
	 * for when the file was opened.
	 */
	if (tf->pad_to_size)
	{
		if (tar_data->compression)
		{
			/*
			 * A compressed tarfile is padded on close since we cannot know
			 * the size of the compressed output until the end.
			 */
			size_t		sizeleft = tf->pad_to_size - tf->currpos;

			if (sizeleft)
			{
				if (!tar_write_padding_data(tf, sizeleft))
					return -1;
			}
		}
		else
		{
			/*
			 * An uncompressed tarfile was padded on creation, so just adjust
			 * the current position as if we seeked to the end.
			 */
			tf->currpos = tf->pad_to_size;
		}
	}

	/*
	 * Get the size of the file, and pad the current data up to the nearest
	 * 512 byte boundary.
	 */
	filesize = tar_get_current_pos(f);
	padding = ((filesize + 511) & ~511) - filesize;
	if (padding)
	{
		char		zerobuf[512];

		MemSet(zerobuf, 0, padding);
		if (tar_write(f, zerobuf, padding) != padding)
			return -1;
	}

#ifdef HAVE_LIBZ
	if (tar_data->compression)
	{
		/* Flush the current buffer */
		if (!tar_write_compressed_data(NULL, 0, true))
		{
			errno = EINVAL;
			return -1;
		}
	}
#endif

	/*
	 * Now go back and update the header with the correct filesize and
	 * possibly also renaming the file. We overwrite the entire current header
	 * when done, including the checksum.
	 */
	print_tar_number(&(tf->header[124]), 12, filesize);

	if (method == CLOSE_NORMAL)
	{
		/*
		 * We overwrite it with what it was before if we have no tempname,
		 * since we're going to write the buffer anyway.
		 */
		strlcpy(&(tf->header[0]), tf->pathname, 100);
	}

	print_tar_number(&(tf->header[148]), 8, tarChecksum(tf->header));

	if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != tf->ofs_start)
		return -1;

	if (!tar_data->compression)
	{
		errno = 0;
		if (write(tar_data->fd, tf->header, 512) != 512)
		{
			/* a short write leaves errno unset; report it as disk full */
			if (errno == 0)
				errno = ENOSPC;
			return -1;
		}
	}
#ifdef HAVE_LIBZ
	else
	{
		/* Turn off compression */
		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
		{
			tar_set_error("deflateParams failed");
			return -1;
		}

		/* Overwrite the header, assuming the size will be the same */
		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
			return -1;

		/* Turn compression back on */
		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
		{
			tar_set_error("deflateParams failed");
			return -1;
		}
	}
#endif

	/* Move file pointer back down to end, so we can write the next file */
	if (lseek(tar_data->fd, 0, SEEK_END) < 0)
		return -1;

	/*
	 * Always fsync on close, so the padding gets fsynced.  A failed sync
	 * means the data may not be durable, so report it to the caller.
	 */
	if (tar_fsync(f) != 0)
		return -1;

	/* Clean up and done */
	pg_free(tf->pathname);
	pg_free(tf);
	tar_data->currentfile = NULL;

	return 0;
}
/*
 * Report whether a member named pathname already exists in the archive.
 *
 * The tar method only ever deals with archives it creates itself, so no
 * pre-existing member can be present and the answer is always "no".
 */
static bool
tar_existsfile(const char *pathname)
{
	const bool	already_present = false;

	tar_clear_error();

	return already_present;
}
/*
 * Complete and close the tar archive.
 *
 * Any member that is still open is closed with CLOSE_NORMAL, the two empty
 * 512-byte blocks that terminate a tar archive are appended, a compressed
 * stream is finished and torn down, and the archive is closed.  In sync mode
 * the archive contents, the archive file and its parent directory are
 * fsync'ed.
 *
 * Returns true on success; false on failure with a custom error message or
 * errno set.
 */
static bool
tar_finish(void)
{
	char		zerobuf[1024];

	tar_clear_error();

	if (tar_data->currentfile)
	{
		if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
			return false;
	}

	/* A tarfile always ends with two empty blocks */
	MemSet(zerobuf, 0, sizeof(zerobuf));
	if (!tar_data->compression)
	{
		errno = 0;
		if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
		{
			/* a short write leaves errno unset; report it as disk full */
			if (errno == 0)
				errno = ENOSPC;
			return false;
		}
	}
#ifdef HAVE_LIBZ
	else
	{
		if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
			return false;

		/* Also flush all data to make sure the gzip stream is finished */
		tar_data->zp->next_in = NULL;
		tar_data->zp->avail_in = 0;
		while (true)
		{
			int			r;

			r = deflate(tar_data->zp, Z_FINISH);

			if (r == Z_STREAM_ERROR)
			{
				tar_set_error("deflate failed");
				return false;
			}
			if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
			{
				size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;

				errno = 0;
				if (write(tar_data->fd, tar_data->zlibOut, len) != (ssize_t) len)
				{
					if (errno == 0)
						errno = ENOSPC;
					return false;
				}

				/*
				 * Give the drained buffer back to zlib.  Without this a
				 * second pass through the loop would find no output space
				 * and could never reach Z_STREAM_END.
				 */
				tar_data->zp->next_out = tar_data->zlibOut;
				tar_data->zp->avail_out = ZLIB_OUT_SIZE;
			}
			if (r == Z_STREAM_END)
				break;
		}

		if (deflateEnd(tar_data->zp) != Z_OK)
		{
			tar_set_error("deflateEnd failed");
			return false;
		}
	}
#endif

	/* sync the empty blocks as well, since they're after the last file */
	if (tar_data->sync)
	{
		if (fsync(tar_data->fd) != 0)
			return false;
	}

	if (close(tar_data->fd) != 0)
		return false;

	tar_data->fd = -1;

	if (tar_data->sync)
	{
		if (fsync_fname(tar_data->tarfilename, false, progname) != 0)
			return false;
		if (fsync_parent_path(tar_data->tarfilename, progname) != 0)
			return false;
	}

	return true;
}
/*
 * Build a WalWriteMethod that writes all WAL files into one tar archive.
 *
 * tarbase		archive name without extension; ".tar" is appended, or
 *				".tar.gz" when compression is nonzero
 * compression	zlib compression level; 0 disables compression
 * sync			whether the archive and its directory should be fsync'ed
 *
 * This also sets up the file-global tar_data state used by all tar_*
 * callbacks.  The archive itself is not created until the first member is
 * opened.
 */
WalWriteMethod *
CreateWalTarMethod(const char *tarbase, int compression, bool sync)
{
	WalWriteMethod *method;
	const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";

	method = pg_malloc0(sizeof(WalWriteMethod));
	method->open_for_write = tar_open_for_write;
	method->write = tar_write;
	method->get_current_pos = tar_get_current_pos;
	method->get_file_size = tar_get_file_size;
	method->close = tar_close;
	method->fsync = tar_fsync;
	method->existsfile = tar_existsfile;
	method->finish = tar_finish;
	method->getlasterror = tar_getlasterror;

	tar_data = pg_malloc0(sizeof(TarMethodData));
	tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
	sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
	tar_data->fd = -1;
	tar_data->compression = compression;
	tar_data->sync = sync;

	/*
	 * The zlib output buffer is a member of TarMethodData only when zlib
	 * support is compiled in, so it must only be touched in that case.
	 */
#ifdef HAVE_LIBZ
	if (compression)
		tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
#endif

	return method;
}
/*-------------------------------------------------------------------------
*
* walmethods.h
*
* Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/bin/pg_basebackup/walmethods.h
*-------------------------------------------------------------------------
*/
/*
 * Opaque handle for a file opened through a WalWriteMethod.  Each method
 * casts it to its own private per-file structure.
 */
typedef void *Walfile;
/*
 * How a file should be treated when it is closed through
 * WalWriteMethod.close.
 */
typedef enum
{
	CLOSE_NORMAL,				/* keep the file; if it was opened with a
								 * temp suffix, it gets its final name */
	CLOSE_UNLINK,				/* remove the file */
	CLOSE_NO_RENAME				/* keep the file under the name it was opened
								 * with, temp suffix included */
} WalCloseMethod;
/*
 * Table of callbacks implementing one way of storing received WAL.
 * Instances are obtained from the CreateWal*Method() functions.
 */
typedef struct WalWriteMethod WalWriteMethod;
struct WalWriteMethod
{
/*
 * Open pathname for writing, using temp_suffix (may be NULL) as a temporary
 * name suffix and zero-padding the file to pad_to_size bytes if that is
 * nonzero.  Returns NULL on failure.
 */
Walfile(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
/* Close a file as directed by method; returns 0 on success */
int (*close) (Walfile f, WalCloseMethod method);
/* Report whether a file of the given name already exists */
bool (*existsfile) (const char *pathname);
/* Return the size of the named file, or -1 on failure */
ssize_t (*get_file_size) (const char *pathname);
/* Write count bytes from buf; returns the bytes written or a negative value */
ssize_t (*write) (Walfile f, const void *buf, size_t count);
/* Return the number of bytes written to the file so far */
off_t (*get_current_pos) (Walfile f);
/* Flush the file to stable storage; returns 0 on success */
int (*fsync) (Walfile f);
/* Finish use of the method as a whole; returns false on failure */
bool (*finish) (void);
/* Return a message describing the most recent failure */
char *(*getlasterror) (void);
};
/*
 * Available WAL methods:
 * - WalDirectoryMethod - write WAL to regular files in a standard pg_xlog
 * - WalTarMethod - write WAL to a tarfile corresponding to pg_xlog
 * (only implements the methods required for pg_basebackup,
 * not all those required for pg_receivexlog)
 *
 * Only one method may be instantiated in a given program, and only once.
 */
WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
...@@ -22,4 +22,5 @@ enum tarError ...@@ -22,4 +22,5 @@ enum tarError
extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget,
pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime); pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime);
extern uint64 read_tar_number(const char *s, int len); extern uint64 read_tar_number(const char *s, int len);
extern void print_tar_number(char *s, int len, uint64 val);
extern int tarChecksum(char *header); extern int tarChecksum(char *header);
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
* support only non-negative numbers, so we don't worry about the GNU rules * support only non-negative numbers, so we don't worry about the GNU rules
* for handling negative numbers.) * for handling negative numbers.)
*/ */
static void void
print_tar_number(char *s, int len, uint64 val) print_tar_number(char *s, int len, uint64 val)
{ {
if (val < (((uint64) 1) << ((len - 1) * 3))) if (val < (((uint64) 1) << ((len - 1) * 3)))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment