Commit 31d96581 authored by Magnus Hagander's avatar Magnus Hagander

Fix base backup streaming xlog from standby

When backing up from a standby server, the backup process
will not automatically switch xlog segment. So we must
accept a partially transferred xlog file in this case, but
rename it into position anyway.

In passing, merge the two callbacks for segment end and
stop stream into a single callback, since their implementations
were close to identical, and rename this callback to
reflect that it stops streaming rather than continues it.

Patch by Magnus Hagander, review by Fujii Masao
parent d226e236
...@@ -78,7 +78,7 @@ static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); ...@@ -78,7 +78,7 @@ static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
static void BaseBackup(void); static void BaseBackup(void);
static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
#ifdef HAVE_LIBZ #ifdef HAVE_LIBZ
static const char * static const char *
...@@ -129,8 +129,7 @@ usage(void) ...@@ -129,8 +129,7 @@ usage(void)
/* /*
* Called in the background process whenever a complete segment of WAL * Called in the background process every time data is received.
* has been received.
* On Unix, we check to see if there is any data on our pipe * On Unix, we check to see if there is any data on our pipe
* (which would mean we have a stop position), and if it is, check if * (which would mean we have a stop position), and if it is, check if
* it is time to stop. * it is time to stop.
...@@ -138,7 +137,7 @@ usage(void) ...@@ -138,7 +137,7 @@ usage(void)
* time to stop. * time to stop.
*/ */
static bool static bool
segment_callback(XLogRecPtr segendpos, uint32 timeline) reached_end_position(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
{ {
if (!has_xlogendptr) if (!has_xlogendptr)
{ {
...@@ -231,7 +230,7 @@ LogStreamerMain(logstreamer_param * param) ...@@ -231,7 +230,7 @@ LogStreamerMain(logstreamer_param * param)
{ {
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir, param->sysidentifier, param->xlogdir,
segment_callback, NULL, standby_message_timeout)) reached_end_position, standby_message_timeout, true))
/* /*
* Any errors will already have been reported in the function process, * Any errors will already have been reported in the function process,
......
...@@ -43,7 +43,7 @@ volatile bool time_to_abort = false; ...@@ -43,7 +43,7 @@ volatile bool time_to_abort = false;
static void usage(void); static void usage(void);
static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline); static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
static void StreamLog(); static void StreamLog();
static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
static void static void
usage(void) usage(void)
...@@ -69,21 +69,12 @@ usage(void) ...@@ -69,21 +69,12 @@ usage(void)
} }
static bool static bool
segment_callback(XLogRecPtr segendpos, uint32 timeline) stop_streaming(XLogRecPtr segendpos, uint32 timeline, bool segment_finished)
{ {
if (verbose) if (verbose && segment_finished)
fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
progname, segendpos.xlogid, segendpos.xrecoff, timeline); progname, segendpos.xlogid, segendpos.xrecoff, timeline);
/*
* Never abort from this - we handle all aborting in continue_streaming()
*/
return false;
}
static bool
continue_streaming(void)
{
if (time_to_abort) if (time_to_abort)
{ {
fprintf(stderr, _("%s: received interrupt signal, exiting.\n"), fprintf(stderr, _("%s: received interrupt signal, exiting.\n"),
...@@ -268,8 +259,8 @@ StreamLog(void) ...@@ -268,8 +259,8 @@ StreamLog(void)
progname, startpos.xlogid, startpos.xrecoff, timeline); progname, startpos.xlogid, startpos.xrecoff, timeline);
ReceiveXlogStream(conn, startpos, timeline, NULL, basedir, ReceiveXlogStream(conn, startpos, timeline, NULL, basedir,
segment_callback, continue_streaming, stop_streaming,
standby_message_timeout); standby_message_timeout, false);
PQfinish(conn); PQfinish(conn);
} }
......
...@@ -113,8 +113,14 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu ...@@ -113,8 +113,14 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
return f; return f;
} }
/*
* Close the current WAL file, and rename it to the correct filename if it's complete.
*
* If segment_complete is true, rename the current WAL file even if we've not
* completed writing the whole segment.
*/
static bool static bool
close_walfile(int walfile, char *basedir, char *walname) close_walfile(int walfile, char *basedir, char *walname, bool segment_complete)
{ {
off_t currpos = lseek(walfile, 0, SEEK_CUR); off_t currpos = lseek(walfile, 0, SEEK_CUR);
...@@ -141,9 +147,9 @@ close_walfile(int walfile, char *basedir, char *walname) ...@@ -141,9 +147,9 @@ close_walfile(int walfile, char *basedir, char *walname)
/* /*
* Rename the .partial file only if we've completed writing the * Rename the .partial file only if we've completed writing the
* whole segment. * whole segment or segment_complete is true.
*/ */
if (currpos == XLOG_SEG_SIZE) if (currpos == XLOG_SEG_SIZE || segment_complete)
{ {
char oldfn[MAXPGPATH]; char oldfn[MAXPGPATH];
char newfn[MAXPGPATH]; char newfn[MAXPGPATH];
...@@ -199,11 +205,10 @@ localGetCurrentTimestamp(void) ...@@ -199,11 +205,10 @@ localGetCurrentTimestamp(void)
* All received segments will be written to the directory * All received segments will be written to the directory
* specified by basedir. * specified by basedir.
* *
* The segment_finish callback will be called after each segment * The stream_stop callback will be called every time data
* has been finished, and the stream_continue callback will be * is received, and whenever a segment is completed. If it returns
* called every time data is received. If either of these callbacks * true, the streaming will stop and the function
* return true, the streaming will stop and the function * return. As long as it returns false, streaming will continue
* return. As long as they return false, streaming will continue
* indefinitely. * indefinitely.
* *
* standby_message_timeout controls how often we send a message * standby_message_timeout controls how often we send a message
...@@ -214,7 +219,7 @@ localGetCurrentTimestamp(void) ...@@ -214,7 +219,7 @@ localGetCurrentTimestamp(void)
* Note: The log position *must* be at a log segment start! * Note: The log position *must* be at a log segment start!
*/ */
bool bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout) ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, bool rename_partial)
{ {
char query[128]; char query[128];
char current_walfile_name[MAXPGPATH]; char current_walfile_name[MAXPGPATH];
...@@ -288,11 +293,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -288,11 +293,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
/* /*
* Check if we should continue streaming, or abort at this point. * Check if we should continue streaming, or abort at this point.
*/ */
if (stream_continue && stream_continue()) if (stream_stop && stream_stop(blockpos, timeline, false))
{ {
if (walfile != -1) if (walfile != -1)
/* Potential error message is written by close_walfile */ /* Potential error message is written by close_walfile */
return close_walfile(walfile, basedir, current_walfile_name); return close_walfile(walfile, basedir, current_walfile_name, rename_partial);
return true; return true;
} }
...@@ -486,20 +491,20 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi ...@@ -486,20 +491,20 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
/* Did we reach the end of a WAL segment? */ /* Did we reach the end of a WAL segment? */
if (blockpos.xrecoff % XLOG_SEG_SIZE == 0) if (blockpos.xrecoff % XLOG_SEG_SIZE == 0)
{ {
if (!close_walfile(walfile, basedir, current_walfile_name)) if (!close_walfile(walfile, basedir, current_walfile_name, false))
/* Error message written in close_walfile() */ /* Error message written in close_walfile() */
return false; return false;
walfile = -1; walfile = -1;
xlogoff = 0; xlogoff = 0;
if (segment_finish != NULL) if (stream_stop != NULL)
{ {
/* /*
* Callback when the segment finished, and return if it * Callback when the segment finished, and return if it
* told us to. * told us to.
*/ */
if (segment_finish(blockpos, timeline)) if (stream_stop(blockpos, timeline, true))
return true; return true;
} }
} }
......
#include "access/xlogdefs.h" #include "access/xlogdefs.h"
/* /*
* Called whenever a segment is finished, return true to stop * Called before trying to read more data or when a segment is
* the streaming at this point. * finished. Return true to stop streaming.
*/ */
typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline); typedef bool (*stream_stop_callback)(XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
/*
* Called before trying to read more data. Return true to stop
* the streaming at this point.
*/
typedef bool (*stream_continue_callback)(void);
extern bool ReceiveXlogStream(PGconn *conn, extern bool ReceiveXlogStream(PGconn *conn,
XLogRecPtr startpos, XLogRecPtr startpos,
uint32 timeline, uint32 timeline,
char *sysidentifier, char *sysidentifier,
char *basedir, char *basedir,
segment_finish_callback segment_finish, stream_stop_callback stream_stop,
stream_continue_callback stream_continue, int standby_message_timeout,
int standby_message_timeout); bool rename_partial);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment