Commit e7cc8437 authored by Magnus Hagander's avatar Magnus Hagander

Pre-pad WAL files when streaming transaction log

Instead of filling files as they appear, pre-pad the
WAL files received when streaming xlog the same way
that the server does. Data is streamed into a .partial
file which is then rename()d into place when it's complete,
but it will always be 16MB.

This also means that the starting position for pg_receivexlog
is now simply right after the last complete segment, and we
never need to deal with partial segments there.

Patch by me, review by Fujii Masao
parent 4429f6a9
...@@ -71,33 +71,10 @@ usage(void) ...@@ -71,33 +71,10 @@ usage(void)
static bool static bool
segment_callback(XLogRecPtr segendpos, uint32 timeline) segment_callback(XLogRecPtr segendpos, uint32 timeline)
{ {
char fn[MAXPGPATH];
struct stat statbuf;
if (verbose) if (verbose)
fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
progname, segendpos.xlogid, segendpos.xrecoff, timeline); progname, segendpos.xlogid, segendpos.xrecoff, timeline);
/*
* Check if there is a partial file for the name we just finished, and if
* there is, remove it under the assumption that we have now got all the
* data we need.
*/
segendpos.xrecoff /= XLOG_SEG_SIZE;
PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
basedir, timeline,
segendpos.xlogid,
segendpos.xrecoff);
if (stat(fn, &statbuf) == 0)
{
/* File existed, get rid of it */
if (verbose)
fprintf(stderr, _("%s: removing file \"%s\"\n"),
progname, fn);
unlink(fn);
}
/* /*
* Never abort from this - we handle all aborting in continue_streaming() * Never abort from this - we handle all aborting in continue_streaming()
*/ */
...@@ -119,9 +96,8 @@ continue_streaming(void) ...@@ -119,9 +96,8 @@ continue_streaming(void)
/* /*
* Determine starting location for streaming, based on: * Determine starting location for streaming, based on:
* 1. If there are existing xlog segments, start at the end of the last one * 1. If there are existing xlog segments, start at the end of the last one
* 2. If the last one is a partial segment, rename it and start over, since * that is complete (size matches XLogSegSize)
* we don't sync after every write. * 2. If no valid xlog exists, start from the beginning of the current
* 3. If no existing xlog exists, start from the beginning of the current
* WAL segment. * WAL segment.
*/ */
static XLogRecPtr static XLogRecPtr
...@@ -133,7 +109,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) ...@@ -133,7 +109,6 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
bool b; bool b;
uint32 high_log = 0; uint32 high_log = 0;
uint32 high_seg = 0; uint32 high_seg = 0;
bool partial = false;
dir = opendir(basedir); dir = opendir(basedir);
if (dir == NULL) if (dir == NULL)
...@@ -195,7 +170,7 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) ...@@ -195,7 +170,7 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
disconnect_and_exit(1); disconnect_and_exit(1);
} }
if (statbuf.st_size == 16 * 1024 * 1024) if (statbuf.st_size == XLOG_SEG_SIZE)
{ {
/* Completed segment */ /* Completed segment */
if (log > high_log || if (log > high_log ||
...@@ -208,37 +183,9 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) ...@@ -208,37 +183,9 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
} }
else else
{ {
/* fprintf(stderr, _("%s: segment file '%s' is incorrect size %d, skipping\n"),
* This is a partial file. Rename it out of the way. progname, dirent->d_name, (int) statbuf.st_size);
*/ continue;
char newfn[MAXPGPATH];
fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
progname, dirent->d_name, dirent->d_name);
snprintf(newfn, sizeof(newfn), "%s/%s.partial",
basedir, dirent->d_name);
if (stat(newfn, &statbuf) == 0)
{
/*
* XXX: perhaps we should only error out if the existing file
* is larger?
*/
fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
progname, newfn);
disconnect_and_exit(1);
}
if (rename(fullpath, newfn) != 0)
{
fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
progname, fullpath, newfn, strerror(errno));
disconnect_and_exit(1);
}
/* Don't continue looking for more, we assume this is the last */
partial = true;
break;
} }
} }
...@@ -247,17 +194,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) ...@@ -247,17 +194,11 @@ FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
if (high_log > 0 || high_seg > 0) if (high_log > 0 || high_seg > 0)
{ {
XLogRecPtr high_ptr; XLogRecPtr high_ptr;
/*
if (!partial) * Move the starting pointer to the start of the next segment,
{ * since the highest one we've seen was completed.
/* */
* If the segment was partial, the pointer is already at the right NextLogSeg(high_log, high_seg);
* location since we want to re-transmit that segment. If it was
* not, we need to move it to the next segment, since we are
* tracking the last one that was complete.
*/
NextLogSeg(high_log, high_seg);
}
high_ptr.xlogid = high_log; high_ptr.xlogid = high_log;
high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE; high_ptr.xrecoff = high_seg * XLOG_SEG_SIZE;
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
#include "receivelog.h" #include "receivelog.h"
#include "streamutil.h" #include "streamutil.h"
#include <sys/stat.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/types.h> #include <sys/types.h>
#include <unistd.h> #include <unistd.h>
...@@ -41,24 +42,128 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0}; ...@@ -41,24 +42,128 @@ const XLogRecPtr InvalidXLogRecPtr = {0, 0};
* Open a new WAL file in the specified directory. Store the name * Open a new WAL file in the specified directory. Store the name
* (not including the full directory) in namebuf. Assumes there is * (not including the full directory) in namebuf. Assumes there is
* enough room in this buffer... * enough room in this buffer...
*
* The file will be padded to 16Mb with zeroes.
*/ */
static int
open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
{
    int         f;
    char        fn[MAXPGPATH];
    struct stat statbuf;
    char       *zerobuf;
    int         bytes;

    /*
     * Build the segment name into the caller's buffer (without directory),
     * then the on-disk path, which carries a ".partial" suffix while the
     * segment is still being received.
     */
    XLogFileName(namebuf, timeline, startpoint.xlogid,
                 startpoint.xrecoff / XLOG_SEG_SIZE);

    snprintf(fn, sizeof(fn), "%s/%s.partial", basedir, namebuf);
    f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
    if (f == -1)
    {
        fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
                progname, fn, strerror(errno));
        return -1;
    }

    /*
     * Verify that the file is either empty (just created), or a complete
     * XLogSegSize segment. Anything in between indicates a corrupt file.
     */
    if (fstat(f, &statbuf) != 0)
    {
        fprintf(stderr, _("%s: could not stat WAL segment %s: %s\n"),
                progname, fn, strerror(errno));
        close(f);
        return -1;
    }
    if (statbuf.st_size == XLogSegSize)
        return f;               /* File is open and ready to use */
    if (statbuf.st_size != 0)
    {
        fprintf(stderr, _("%s: WAL segment %s is %d bytes, should be 0 or %d\n"),
                progname, fn, (int) statbuf.st_size, XLogSegSize);
        close(f);
        return -1;
    }

    /* New, empty, file. So pad it to the full segment size with zeroes */
    zerobuf = xmalloc0(XLOG_BLCKSZ);
    for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
    {
        /* Clear errno so a short write that sets nothing can be detected */
        errno = 0;
        if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
        {
            /* A short write without errno most likely means the disk is full */
            if (errno == 0)
                errno = ENOSPC;
            fprintf(stderr, _("%s: could not pad WAL segment %s: %s\n"),
                    progname, fn, strerror(errno));
            free(zerobuf);      /* do not leak the padding buffer on failure */
            close(f);
            unlink(fn);         /* remove the incompletely padded file */
            return -1;
        }
    }
    free(zerobuf);

    /* Rewind so the caller starts writing at the beginning of the segment */
    if (lseek(f, 0, SEEK_SET) != 0)
    {
        fprintf(stderr, _("%s: could not seek back to beginning of WAL segment %s: %s\n"),
                progname, fn, strerror(errno));
        close(f);
        return -1;
    }
    return f;
}
/*
 * Flush and close the WAL file open on descriptor 'walfile'. If the write
 * position has reached the end of the segment (XLOG_SEG_SIZE), the
 * "<walname>.partial" file in 'basedir' is renamed to "<walname>";
 * otherwise the .partial file is left in place and a notice is printed.
 *
 * Returns true on success, false (after printing a message to stderr) on
 * any failure. The descriptor is closed on every path, so the caller must
 * not use it again after this call.
 */
static bool
close_walfile(int walfile, char *basedir, char *walname)
{
    /* Current write offset tells us whether the segment was fully received */
    off_t       currpos = lseek(walfile, 0, SEEK_CUR);

    if (currpos == -1)
    {
        fprintf(stderr, _("%s: could not get current position in file %s: %s\n"),
                progname, walname, strerror(errno));
        close(walfile);         /* don't leak the descriptor on failure */
        return false;
    }

    if (fsync(walfile) != 0)
    {
        fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
                progname, walname, strerror(errno));
        close(walfile);         /* don't leak the descriptor on failure */
        return false;
    }

    if (close(walfile) != 0)
    {
        fprintf(stderr, _("%s: could not close file %s: %s\n"),
                progname, walname, strerror(errno));
        return false;
    }

    /*
     * Rename the .partial file only if we've completed writing the
     * whole segment.
     */
    if (currpos == XLOG_SEG_SIZE)
    {
        char        oldfn[MAXPGPATH];
        char        newfn[MAXPGPATH];

        snprintf(oldfn, sizeof(oldfn), "%s/%s.partial", basedir, walname);
        snprintf(newfn, sizeof(newfn), "%s/%s", basedir, walname);
        if (rename(oldfn, newfn) != 0)
        {
            fprintf(stderr, _("%s: could not rename file %s: %s\n"),
                    progname, walname, strerror(errno));
            return false;
        }
    }
    else
    {
        fprintf(stderr, _("%s: not renaming %s, segment is not complete.\n"),
                progname, walname);
    }

    return true;
}
/* /*
* Local version of GetCurrentTimestamp(), since we are not linked with * Local version of GetCurrentTimestamp(), since we are not linked with
* backend code. * backend code.
...@@ -178,10 +283,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -178,10 +283,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
if (stream_continue && stream_continue()) if (stream_continue && stream_continue())
{ {
if (walfile != -1) if (walfile != -1)
{ /* Potential error message is written by close_walfile */
fsync(walfile); return close_walfile(walfile, basedir, current_walfile_name);
close(walfile);
}
return true; return true;
} }
...@@ -360,8 +463,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -360,8 +463,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
/* Did we reach the end of a WAL segment? */ /* Did we reach the end of a WAL segment? */
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0) if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
{ {
fsync(walfile); if (!close_walfile(walfile, basedir, current_walfile_name))
close(walfile); /* Error message written in close_walfile() */
return false;
walfile = -1; walfile = -1;
xlogoff = 0; xlogoff = 0;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment