Commit 38c83c9b authored by Magnus Hagander's avatar Magnus Hagander

Refactor receivelog.c parameters

Much cruft had accumulated over time with a large number of parameters
passed down between functions very deep. With this refactoring, instead
introduce a StreamCtl structure that holds the parameters, and pass around
a pointer to this structure instead. This makes it much easier to add or
remove fields that are needed deeper down in the implementation without
having to modify every function header in the file.

Patch by me after much nagging from Andres
Reviewed by Craig Ringer and Daniel Gustafsson
parent 73e7e49d
...@@ -372,10 +372,20 @@ typedef struct ...@@ -372,10 +372,20 @@ typedef struct
static int static int
LogStreamerMain(logstreamer_param *param) LogStreamerMain(logstreamer_param *param)
{ {
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, StreamCtl stream;
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout, MemSet(&stream, sizeof(stream), 0);
NULL, false, true)) stream.startpos = param->startptr;
stream.timeline = param->timeline;
stream.sysidentifier = param->sysidentifier;
stream.stream_stop = reached_end_position;
stream.standby_message_timeout = standby_message_timeout;
stream.synchronous = false;
stream.mark_done = true;
stream.basedir = param->xlogdir;
stream.partial_suffix = NULL;
if (!ReceiveXlogStream(param->bgconn, &stream))
/* /*
* Any errors will already have been reported in the function process, * Any errors will already have been reported in the function process,
......
...@@ -276,10 +276,11 @@ FindStreamingStart(uint32 *tli) ...@@ -276,10 +276,11 @@ FindStreamingStart(uint32 *tli)
static void static void
StreamLog(void) StreamLog(void)
{ {
XLogRecPtr startpos, XLogRecPtr serverpos;
serverpos; TimeLineID servertli;
TimeLineID starttli, StreamCtl stream;
servertli;
MemSet(&stream, 0, sizeof(stream));
/* /*
* Connect in replication mode to the server * Connect in replication mode to the server
...@@ -311,17 +312,17 @@ StreamLog(void) ...@@ -311,17 +312,17 @@ StreamLog(void)
/* /*
* Figure out where to start streaming. * Figure out where to start streaming.
*/ */
startpos = FindStreamingStart(&starttli); stream.startpos = FindStreamingStart(&stream.timeline);
if (startpos == InvalidXLogRecPtr) if (stream.startpos == InvalidXLogRecPtr)
{ {
startpos = serverpos; stream.startpos = serverpos;
starttli = servertli; stream.timeline = servertli;
} }
/* /*
* Always start streaming at the beginning of a segment * Always start streaming at the beginning of a segment
*/ */
startpos -= startpos % XLOG_SEG_SIZE; stream.startpos -= stream.startpos % XLOG_SEG_SIZE;
/* /*
* Start the replication * Start the replication
...@@ -329,12 +330,17 @@ StreamLog(void) ...@@ -329,12 +330,17 @@ StreamLog(void)
if (verbose) if (verbose)
fprintf(stderr, fprintf(stderr,
_("%s: starting log streaming at %X/%X (timeline %u)\n"), _("%s: starting log streaming at %X/%X (timeline %u)\n"),
progname, (uint32) (startpos >> 32), (uint32) startpos, progname, (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
starttli); stream.timeline);
stream.stream_stop = stop_streaming;
stream.standby_message_timeout = standby_message_timeout;
stream.synchronous = synchronous;
stream.mark_done = false;
stream.basedir = basedir;
stream.partial_suffix = ".partial";
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ReceiveXlogStream(conn, &stream);
stop_streaming, standby_message_timeout, ".partial",
synchronous, false);
PQfinish(conn); PQfinish(conn);
conn = NULL; conn = NULL;
......
...@@ -33,27 +33,18 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; ...@@ -33,27 +33,18 @@ static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static bool still_sending = true; /* feedback still needs to be sent? */ static bool still_sending = true; /* feedback still needs to be sent? */
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
uint32 timeline, char *basedir, XLogRecPtr *stoppos);
stream_stop_callback stream_stop, int standby_message_timeout,
char *partial_suffix, XLogRecPtr *stoppos,
bool synchronous, bool mark_done);
static int CopyStreamPoll(PGconn *conn, long timeout_ms); static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer); static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
XLogRecPtr blockpos, int64 *last_status); XLogRecPtr blockpos, int64 *last_status);
static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline, XLogRecPtr *blockpos);
char *basedir, stream_stop_callback stream_stop, static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
char *partial_suffix, bool mark_done); XLogRecPtr blockpos, XLogRecPtr *stoppos);
static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf, static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
XLogRecPtr blockpos, char *basedir, char *partial_suffix, XLogRecPtr *stoppos);
XLogRecPtr *stoppos, bool mark_done);
static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop,
char *partial_suffix, XLogRecPtr *stoppos,
bool mark_done);
static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout, static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
int64 last_status); int64 last_status);
...@@ -99,8 +90,7 @@ mark_file_as_archived(const char *basedir, const char *fname) ...@@ -99,8 +90,7 @@ mark_file_as_archived(const char *basedir, const char *fname)
* partial_suffix) is stored in current_walfile_name. * partial_suffix) is stored in current_walfile_name.
*/ */
static bool static bool
open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
char *partial_suffix)
{ {
int f; int f;
char fn[MAXPGPATH]; char fn[MAXPGPATH];
...@@ -110,10 +100,10 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, ...@@ -110,10 +100,10 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
XLogSegNo segno; XLogSegNo segno;
XLByteToSeg(startpoint, segno); XLByteToSeg(startpoint, segno);
XLogFileName(current_walfile_name, timeline, segno); XLogFileName(current_walfile_name, stream->timeline, segno);
snprintf(fn, sizeof(fn), "%s/%s%s", basedir, current_walfile_name, snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
partial_suffix ? partial_suffix : ""); stream->partial_suffix ? stream->partial_suffix : "");
f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR); f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
if (f == -1) if (f == -1)
{ {
...@@ -185,7 +175,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, ...@@ -185,7 +175,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir,
* and returns false, otherwise returns true. * and returns false, otherwise returns true.
*/ */
static bool static bool
close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done) close_walfile(StreamCtl *stream, XLogRecPtr pos)
{ {
off_t currpos; off_t currpos;
...@@ -220,13 +210,13 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don ...@@ -220,13 +210,13 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
/* /*
* If we finished writing a .partial file, rename it into place. * If we finished writing a .partial file, rename it into place.
*/ */
if (currpos == XLOG_SEG_SIZE && partial_suffix) if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
{ {
char oldfn[MAXPGPATH]; char oldfn[MAXPGPATH];
char newfn[MAXPGPATH]; char newfn[MAXPGPATH];
snprintf(oldfn, sizeof(oldfn), "%s/%s%s", basedir, current_walfile_name, partial_suffix); snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
snprintf(newfn, sizeof(newfn), "%s/%s", basedir, current_walfile_name); snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
if (rename(oldfn, newfn) != 0) if (rename(oldfn, newfn) != 0)
{ {
fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"), fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
...@@ -234,10 +224,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don ...@@ -234,10 +224,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
return false; return false;
} }
} }
else if (partial_suffix) else if (stream->partial_suffix)
fprintf(stderr, fprintf(stderr,
_("%s: not renaming \"%s%s\", segment is not complete\n"), _("%s: not renaming \"%s%s\", segment is not complete\n"),
progname, current_walfile_name, partial_suffix); progname, current_walfile_name, stream->partial_suffix);
/* /*
* Mark file as archived if requested by the caller - pg_basebackup needs * Mark file as archived if requested by the caller - pg_basebackup needs
...@@ -245,10 +235,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don ...@@ -245,10 +235,10 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
* new node. This is in line with walreceiver.c always doing a * new node. This is in line with walreceiver.c always doing a
* XLogArchiveForceDone() after a complete segment. * XLogArchiveForceDone() after a complete segment.
*/ */
if (currpos == XLOG_SEG_SIZE && mark_done) if (currpos == XLOG_SEG_SIZE && stream->mark_done)
{ {
/* writes error message if failed */ /* writes error message if failed */
if (!mark_file_as_archived(basedir, current_walfile_name)) if (!mark_file_as_archived(stream->basedir, current_walfile_name))
return false; return false;
} }
...@@ -261,7 +251,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don ...@@ -261,7 +251,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_don
* Check if a timeline history file exists. * Check if a timeline history file exists.
*/ */
static bool static bool
existsTimeLineHistoryFile(char *basedir, TimeLineID tli) existsTimeLineHistoryFile(StreamCtl *stream)
{ {
char path[MAXPGPATH]; char path[MAXPGPATH];
char histfname[MAXFNAMELEN]; char histfname[MAXFNAMELEN];
...@@ -271,12 +261,12 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) ...@@ -271,12 +261,12 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
* Timeline 1 never has a history file. We treat that as if it existed, * Timeline 1 never has a history file. We treat that as if it existed,
* since we never need to stream it. * since we never need to stream it.
*/ */
if (tli == 1) if (stream->timeline == 1)
return true; return true;
TLHistoryFileName(histfname, tli); TLHistoryFileName(histfname, stream->timeline);
snprintf(path, sizeof(path), "%s/%s", basedir, histfname); snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
fd = open(path, O_RDONLY | PG_BINARY, 0); fd = open(path, O_RDONLY | PG_BINARY, 0);
if (fd < 0) if (fd < 0)
...@@ -294,8 +284,7 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli) ...@@ -294,8 +284,7 @@ existsTimeLineHistoryFile(char *basedir, TimeLineID tli)
} }
static bool static bool
writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
char *content, bool mark_done)
{ {
int size = strlen(content); int size = strlen(content);
char path[MAXPGPATH]; char path[MAXPGPATH];
...@@ -307,15 +296,15 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, ...@@ -307,15 +296,15 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
* Check that the server's idea of how timeline history files should be * Check that the server's idea of how timeline history files should be
* named matches ours. * named matches ours.
*/ */
TLHistoryFileName(histfname, tli); TLHistoryFileName(histfname, stream->timeline);
if (strcmp(histfname, filename) != 0) if (strcmp(histfname, filename) != 0)
{ {
fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"), fprintf(stderr, _("%s: server reported unexpected history file name for timeline %u: %s\n"),
progname, tli, filename); progname, stream->timeline, filename);
return false; return false;
} }
snprintf(path, sizeof(path), "%s/%s", basedir, histfname); snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
/* /*
* Write into a temp file name. * Write into a temp file name.
...@@ -375,10 +364,10 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, ...@@ -375,10 +364,10 @@ writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
} }
/* Maintain archive_status, check close_walfile() for details. */ /* Maintain archive_status, check close_walfile() for details. */
if (mark_done) if (stream->mark_done)
{ {
/* writes error message if failed */ /* writes error message if failed */
if (!mark_file_as_archived(basedir, histfname)) if (!mark_file_as_archived(stream->basedir, histfname))
return false; return false;
} }
...@@ -468,6 +457,8 @@ CheckServerVersionForStreaming(PGconn *conn) ...@@ -468,6 +457,8 @@ CheckServerVersionForStreaming(PGconn *conn)
/* /*
* Receive a log stream starting at the specified position. * Receive a log stream starting at the specified position.
* *
* Individual parameters are passed through the StreamCtl structure.
*
* If sysidentifier is specified, validate that both the system * If sysidentifier is specified, validate that both the system
* identifier and the timeline matches the specified ones * identifier and the timeline matches the specified ones
* (by sending an extra IDENTIFY_SYSTEM command) * (by sending an extra IDENTIFY_SYSTEM command)
...@@ -498,11 +489,7 @@ CheckServerVersionForStreaming(PGconn *conn) ...@@ -498,11 +489,7 @@ CheckServerVersionForStreaming(PGconn *conn)
* Note: The log position *must* be at a log segment start! * Note: The log position *must* be at a log segment start!
*/ */
bool bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
bool synchronous, bool mark_done)
{ {
char query[128]; char query[128];
char slotcmd[128]; char slotcmd[128];
...@@ -539,7 +526,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -539,7 +526,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
slotcmd[0] = 0; slotcmd[0] = 0;
} }
if (sysidentifier != NULL) if (stream->sysidentifier != NULL)
{ {
/* Validate system identifier hasn't changed */ /* Validate system identifier hasn't changed */
res = PQexec(conn, "IDENTIFY_SYSTEM"); res = PQexec(conn, "IDENTIFY_SYSTEM");
...@@ -559,7 +546,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -559,7 +546,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res); PQclear(res);
return false; return false;
} }
if (strcmp(sysidentifier, PQgetvalue(res, 0, 0)) != 0) if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: system identifier does not match between base backup and streaming connection\n"), _("%s: system identifier does not match between base backup and streaming connection\n"),
...@@ -567,11 +554,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -567,11 +554,11 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res); PQclear(res);
return false; return false;
} }
if (timeline > atoi(PQgetvalue(res, 0, 1))) if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
{ {
fprintf(stderr, fprintf(stderr,
_("%s: starting timeline %u is not present in the server\n"), _("%s: starting timeline %u is not present in the server\n"),
progname, timeline); progname, stream->timeline);
PQclear(res); PQclear(res);
return false; return false;
} }
...@@ -582,7 +569,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -582,7 +569,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* initialize flush position to starting point, it's the caller's * initialize flush position to starting point, it's the caller's
* responsibility that that's sane. * responsibility that that's sane.
*/ */
lastFlushPosition = startpos; lastFlushPosition = stream->startpos;
while (1) while (1)
{ {
...@@ -590,9 +577,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -590,9 +577,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Fetch the timeline history file for this timeline, if we don't have * Fetch the timeline history file for this timeline, if we don't have
* it already. * it already.
*/ */
if (!existsTimeLineHistoryFile(basedir, timeline)) if (!existsTimeLineHistoryFile(stream))
{ {
snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", timeline); snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
res = PQexec(conn, query); res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_TUPLES_OK) if (PQresultStatus(res) != PGRES_TUPLES_OK)
{ {
...@@ -615,10 +602,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -615,10 +602,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
} }
/* Write the history file to disk */ /* Write the history file to disk */
writeTimeLineHistoryFile(basedir, timeline, writeTimeLineHistoryFile(stream,
PQgetvalue(res, 0, 0), PQgetvalue(res, 0, 0),
PQgetvalue(res, 0, 1), PQgetvalue(res, 0, 1));
mark_done);
PQclear(res); PQclear(res);
} }
...@@ -627,14 +613,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -627,14 +613,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Before we start streaming from the requested location, check if the * Before we start streaming from the requested location, check if the
* callback tells us to stop here. * callback tells us to stop here.
*/ */
if (stream_stop(startpos, timeline, false)) if (stream->stream_stop(stream->startpos, stream->timeline, false))
return true; return true;
/* Initiate the replication stream at specified location */ /* Initiate the replication stream at specified location */
snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u", snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
slotcmd, slotcmd,
(uint32) (startpos >> 32), (uint32) startpos, (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
timeline); stream->timeline);
res = PQexec(conn, query); res = PQexec(conn, query);
if (PQresultStatus(res) != PGRES_COPY_BOTH) if (PQresultStatus(res) != PGRES_COPY_BOTH)
{ {
...@@ -646,9 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -646,9 +632,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
PQclear(res); PQclear(res);
/* Stream the WAL */ /* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, res = HandleCopyStream(conn, stream, &stoppos);
standby_message_timeout, partial_suffix,
&stoppos, synchronous, mark_done);
if (res == NULL) if (res == NULL)
goto error; goto error;
...@@ -676,26 +660,26 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -676,26 +660,26 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
uint32 newtimeline; uint32 newtimeline;
bool parsed; bool parsed;
parsed = ReadEndOfStreamingResult(res, &startpos, &newtimeline); parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
PQclear(res); PQclear(res);
if (!parsed) if (!parsed)
goto error; goto error;
/* Sanity check the values the server gave us */ /* Sanity check the values the server gave us */
if (newtimeline <= timeline) if (newtimeline <= stream->timeline)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: server reported unexpected next timeline %u, following timeline %u\n"), _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
progname, newtimeline, timeline); progname, newtimeline, stream->timeline);
goto error; goto error;
} }
if (startpos > stoppos) if (stream->startpos > stoppos)
{ {
fprintf(stderr, fprintf(stderr,
_("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"), _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
progname, progname,
timeline, (uint32) (stoppos >> 32), (uint32) stoppos, stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
newtimeline, (uint32) (startpos >> 32), (uint32) startpos); newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
goto error; goto error;
} }
...@@ -715,8 +699,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -715,8 +699,8 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Loop back to start streaming from the new timeline. Always * Loop back to start streaming from the new timeline. Always
* start streaming at the beginning of a segment. * start streaming at the beginning of a segment.
*/ */
timeline = newtimeline; stream->timeline = newtimeline;
startpos = startpos - (startpos % XLOG_SEG_SIZE); stream->startpos = stream->startpos - (stream->startpos % XLOG_SEG_SIZE);
continue; continue;
} }
else if (PQresultStatus(res) == PGRES_COMMAND_OK) else if (PQresultStatus(res) == PGRES_COMMAND_OK)
...@@ -729,7 +713,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -729,7 +713,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* Check if the callback thinks it's OK to stop here. If not, * Check if the callback thinks it's OK to stop here. If not,
* complain. * complain.
*/ */
if (stream_stop(stoppos, timeline, false)) if (stream->stream_stop(stoppos, stream->timeline, false))
return true; return true;
else else
{ {
...@@ -810,14 +794,12 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) ...@@ -810,14 +794,12 @@ ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
* On any other sort of error, returns NULL. * On any other sort of error, returns NULL.
*/ */
static PGresult * static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, HandleCopyStream(PGconn *conn, StreamCtl *stream,
char *basedir, stream_stop_callback stream_stop, XLogRecPtr *stoppos)
int standby_message_timeout, char *partial_suffix,
XLogRecPtr *stoppos, bool synchronous, bool mark_done)
{ {
char *copybuf = NULL; char *copybuf = NULL;
int64 last_status = -1; int64 last_status = -1;
XLogRecPtr blockpos = startpos; XLogRecPtr blockpos = stream->startpos;
still_sending = true; still_sending = true;
...@@ -830,9 +812,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -830,9 +812,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* /*
* Check if we should continue streaming, or abort at this point. * Check if we should continue streaming, or abort at this point.
*/ */
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
stream_stop, partial_suffix, stoppos,
mark_done))
goto error; goto error;
now = feGetCurrentTimestamp(); now = feGetCurrentTimestamp();
...@@ -841,7 +821,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -841,7 +821,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
* If synchronous option is true, issue sync command as soon as there * If synchronous option is true, issue sync command as soon as there
* are WAL data which has not been flushed yet. * are WAL data which has not been flushed yet.
*/ */
if (synchronous && lastFlushPosition < blockpos && walfile != -1) if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
{ {
if (fsync(walfile) != 0) if (fsync(walfile) != 0)
{ {
...@@ -863,9 +843,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -863,9 +843,9 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* /*
* Potentially send a status message to the master * Potentially send a status message to the master
*/ */
if (still_sending && standby_message_timeout > 0 && if (still_sending && stream->standby_message_timeout > 0 &&
feTimestampDifferenceExceeds(last_status, now, feTimestampDifferenceExceeds(last_status, now,
standby_message_timeout)) stream->standby_message_timeout))
{ {
/* Time to send feedback! */ /* Time to send feedback! */
if (!sendFeedback(conn, blockpos, now, false)) if (!sendFeedback(conn, blockpos, now, false))
...@@ -876,7 +856,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -876,7 +856,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* /*
* Calculate how long send/receive loops should sleep * Calculate how long send/receive loops should sleep
*/ */
sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout, sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
last_status); last_status);
r = CopyStreamReceive(conn, sleeptime, &copybuf); r = CopyStreamReceive(conn, sleeptime, &copybuf);
...@@ -886,9 +866,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -886,9 +866,7 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
goto error; goto error;
if (r == -2) if (r == -2)
{ {
PGresult *res = HandleEndOfCopyStream(conn, copybuf, blockpos, PGresult *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);
basedir, partial_suffix,
stoppos, mark_done);
if (res == NULL) if (res == NULL)
goto error; goto error;
...@@ -905,18 +883,14 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, ...@@ -905,18 +883,14 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
} }
else if (copybuf[0] == 'w') else if (copybuf[0] == 'w')
{ {
if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos, if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
timeline, basedir, stream_stop,
partial_suffix, mark_done))
goto error; goto error;
/* /*
* Check if we should continue streaming, or abort at this * Check if we should continue streaming, or abort at this
* point. * point.
*/ */
if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir, if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
stream_stop, partial_suffix, stoppos,
mark_done))
goto error; goto error;
} }
else else
...@@ -1114,10 +1088,8 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len, ...@@ -1114,10 +1088,8 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
* Process XLogData message. * Process XLogData message.
*/ */
static bool static bool
ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
XLogRecPtr *blockpos, uint32 timeline, XLogRecPtr *blockpos)
char *basedir, stream_stop_callback stream_stop,
char *partial_suffix, bool mark_done)
{ {
int xlogoff; int xlogoff;
int bytes_left; int bytes_left;
...@@ -1197,8 +1169,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ...@@ -1197,8 +1169,7 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
if (walfile == -1) if (walfile == -1)
{ {
if (!open_walfile(*blockpos, timeline, if (!open_walfile(stream, *blockpos))
basedir, partial_suffix))
{ {
/* Error logged by open_walfile */ /* Error logged by open_walfile */
return false; return false;
...@@ -1225,13 +1196,13 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ...@@ -1225,13 +1196,13 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
/* Did we reach the end of a WAL segment? */ /* Did we reach the end of a WAL segment? */
if (*blockpos % XLOG_SEG_SIZE == 0) if (*blockpos % XLOG_SEG_SIZE == 0)
{ {
if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done)) if (!close_walfile(stream, *blockpos))
/* Error message written in close_walfile() */ /* Error message written in close_walfile() */
return false; return false;
xlogoff = 0; xlogoff = 0;
if (still_sending && stream_stop(*blockpos, timeline, true)) if (still_sending && stream->stream_stop(*blockpos, stream->timeline, true))
{ {
if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
{ {
...@@ -1253,9 +1224,8 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len, ...@@ -1253,9 +1224,8 @@ ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
* Handle end of the copy stream. * Handle end of the copy stream.
*/ */
static PGresult * static PGresult *
HandleEndOfCopyStream(PGconn *conn, char *copybuf, HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
XLogRecPtr blockpos, char *basedir, char *partial_suffix, XLogRecPtr blockpos, XLogRecPtr *stoppos)
XLogRecPtr *stoppos, bool mark_done)
{ {
PGresult *res = PQgetResult(conn); PGresult *res = PQgetResult(conn);
...@@ -1266,7 +1236,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, ...@@ -1266,7 +1236,7 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
*/ */
if (still_sending) if (still_sending)
{ {
if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) if (!close_walfile(stream, blockpos))
{ {
/* Error message written in close_walfile() */ /* Error message written in close_walfile() */
PQclear(res); PQclear(res);
...@@ -1296,13 +1266,12 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf, ...@@ -1296,13 +1266,12 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
* Check if we should continue streaming, or abort at this point. * Check if we should continue streaming, or abort at this point.
*/ */
static bool static bool
CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline, CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
char *basedir, stream_stop_callback stream_stop, XLogRecPtr *stoppos)
char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
{ {
if (still_sending && stream_stop(blockpos, timeline, false)) if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
{ {
if (!close_walfile(basedir, partial_suffix, blockpos, mark_done)) if (!close_walfile(stream, blockpos))
{ {
/* Potential error message is written by close_walfile */ /* Potential error message is written by close_walfile */
return false; return false;
......
...@@ -22,16 +22,31 @@ ...@@ -22,16 +22,31 @@
*/ */
typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished); typedef bool (*stream_stop_callback) (XLogRecPtr segendpos, uint32 timeline, bool segment_finished);
/*
* Global parameters when receiving xlog stream. For details about the individual fields,
* see the function comment for ReceiveXlogStream().
*/
typedef struct StreamCtl
{
XLogRecPtr startpos; /* Start position for streaming */
TimeLineID timeline; /* Timeline to stream data from */
char *sysidentifier; /* Validate this system identifier and
* timeline */
int standby_message_timeout; /* Send status messages this
* often */
bool synchronous; /* Flush data on write */
bool mark_done; /* Mark segment as done in generated archive */
stream_stop_callback stream_stop; /* Stop streaming when returns true */
char *basedir; /* Received segments written to this dir */
char *partial_suffix; /* Suffix appended to partially received files */
} StreamCtl;
extern bool CheckServerVersionForStreaming(PGconn *conn); extern bool CheckServerVersionForStreaming(PGconn *conn);
extern bool ReceiveXlogStream(PGconn *conn, extern bool ReceiveXlogStream(PGconn *conn,
XLogRecPtr startpos, StreamCtl *stream);
uint32 timeline,
char *sysidentifier,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
char *partial_suffix,
bool synchronous,
bool mark_done);
#endif /* RECEIVELOG_H */ #endif /* RECEIVELOG_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment