Commit 6f7cddc7 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Now that START_REPLICATION returns the next timeline's ID after reaching end

of timeline, take advantage of that in walreceiver.

Startup process is still in control of choosign the target timeline, by
scanning the timeline history files present in pg_xlog, but walreceiver now
uses the next timeline's ID to fetch its history file immediately after it
has finished streaming the old timeline. Before, the standby would first try
to restart streaming on the old timeline, which fetches the missing timeline
history file as a side-effect, and only then restart from the new timeline.
This patch eliminates the extra iteration, which speeds up the timeline
switch and reduces the noise in the log caused by the extra restart on the
old timeline.
parent 2ff65553
......@@ -50,7 +50,7 @@ static void libpqrcv_connect(char *conninfo);
static void libpqrcv_identify_system(TimeLineID *primary_tli);
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
static void libpqrcv_endstreaming(void);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
static int libpqrcv_receive(int timeout, char **buffer);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
......@@ -199,10 +199,11 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
}
/*
* Stop streaming WAL data.
* Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
* reported by the server, or 0 if it did not report it.
*/
static void
libpqrcv_endstreaming(void)
libpqrcv_endstreaming(TimeLineID *next_tli)
{
PGresult *res;
......@@ -211,33 +212,42 @@ libpqrcv_endstreaming(void)
(errmsg("could not send end-of-streaming message to primary: %s",
PQerrorMessage(streamConn))));
/* Read the command result after COPY is finished */
while ((res = PQgetResult(streamConn)) != NULL)
/*
* After COPY is finished, we should receive a result set indicating the
* next timeline's ID, or just CommandComplete if the server was shut down.
*
* If we had not yet received CopyDone from the backend, PGRES_COPY_IN
* would also be possible. However, at the moment this function is only
* called after receiving CopyDone from the backend - the walreceiver
* never terminates replication on its own initiative.
*/
res = PQgetResult(streamConn);
if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
/*
* After Copy, if the streaming ended because we reached end of the
* timeline, server sends one result set with the next timeline's ID.
* We don't need it, so just slurp and ignore it.
*
* If we had not yet received CopyDone from the backend, PGRES_COPY_IN
* is also possible. However, at the moment this function is only
* called after receiving CopyDone from the backend - the walreceiver
* never terminates replication on its own initiative.
*/
switch (PQresultStatus(res))
{
case PGRES_COMMAND_OK:
case PGRES_TUPLES_OK:
break;
default:
ereport(ERROR,
(errmsg("error reading result of streaming command: %s",
PQerrorMessage(streamConn))));
}
/* Read the next timeline's ID */
if (PQnfields(res) != 1 || PQntuples(res) != 1)
ereport(ERROR,
(errmsg("unexpected result set after end-of-streaming")));
*next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
PQclear(res);
/* the result set should be followed by CommandComplete */
res = PQgetResult(streamConn);
}
else
*next_tli = 0;
if (PQresultStatus(res) != PGRES_COMMAND_OK)
ereport(ERROR,
(errmsg("error reading result of streaming command: %s",
PQerrorMessage(streamConn))));
/* Verify that there are no more results */
res = PQgetResult(streamConn);
if (res != NULL)
ereport(ERROR,
(errmsg("unexpected result after CommandComplete: %s",
PQerrorMessage(streamConn))));
}
/*
......
......@@ -505,8 +505,15 @@ WalReceiverMain(void)
* our side, too.
*/
EnableWalRcvImmediateExit();
walrcv_endstreaming();
walrcv_endstreaming(&primaryTLI);
DisableWalRcvImmediateExit();
/*
* If the server had switched to a new timeline that we didn't know
* about when we began streaming, fetch its timeline history file
* now.
*/
WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
}
else
ereport(LOG,
......
......@@ -128,7 +128,7 @@ extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistor
typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
typedef void (*walrcv_endstreaming_type) (void);
typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
typedef int (*walrcv_receive_type) (int timeout, char **buffer);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment