Commit 1ff92eea authored by Heikki Linnakangas's avatar Heikki Linnakangas

Fix sloppiness in the timeline switch over streaming replication patch.

Here's another attempt at fixing the logic that decides how far the WAL can
be streamed, which was still broken if the timeline changed while streaming.
You would get an assertion failure. The way the logic is now written is more
readable, too.

Thom Brown reported the assertion failure.
parent 36e4456d
...@@ -169,7 +169,7 @@ static void WalSndLoop(void); ...@@ -169,7 +169,7 @@ static void WalSndLoop(void);
static void InitWalSenderSlot(void); static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg); static void WalSndKill(int code, Datum arg);
static void XLogSend(bool *caughtup); static void XLogSend(bool *caughtup);
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID currentTLI); static XLogRecPtr GetStandbyFlushRecPtr(void);
static void IdentifySystem(void); static void IdentifySystem(void);
static void StartReplication(StartReplicationCmd *cmd); static void StartReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void); static void ProcessStandbyMessage(void);
...@@ -250,7 +250,7 @@ IdentifySystem(void) ...@@ -250,7 +250,7 @@ IdentifySystem(void)
if (am_cascading_walsender) if (am_cascading_walsender)
{ {
/* this also updates ThisTimeLineID */ /* this also updates ThisTimeLineID */
logptr = GetStandbyFlushRecPtr(0); logptr = GetStandbyFlushRecPtr();
} }
else else
logptr = GetInsertRecPtr(); logptr = GetInsertRecPtr();
...@@ -423,7 +423,7 @@ StartReplication(StartReplicationCmd *cmd) ...@@ -423,7 +423,7 @@ StartReplication(StartReplicationCmd *cmd)
if (am_cascading_walsender) if (am_cascading_walsender)
{ {
/* this also updates ThisTimeLineID */ /* this also updates ThisTimeLineID */
FlushPtr = GetStandbyFlushRecPtr(0); FlushPtr = GetStandbyFlushRecPtr();
} }
else else
FlushPtr = GetFlushRecPtr(); FlushPtr = GetFlushRecPtr();
...@@ -1310,7 +1310,6 @@ static void ...@@ -1310,7 +1310,6 @@ static void
XLogSend(bool *caughtup) XLogSend(bool *caughtup)
{ {
XLogRecPtr SendRqstPtr; XLogRecPtr SendRqstPtr;
XLogRecPtr FlushPtr;
XLogRecPtr startptr; XLogRecPtr startptr;
XLogRecPtr endptr; XLogRecPtr endptr;
Size nbytes; Size nbytes;
...@@ -1321,33 +1320,39 @@ XLogSend(bool *caughtup) ...@@ -1321,33 +1320,39 @@ XLogSend(bool *caughtup)
return; return;
} }
/* Figure out how far we can safely send the WAL. */
if (sendTimeLineIsHistoric)
{
/* /*
* Attempt to send all data that's already been written out and fsync'd to * Streaming an old timeline timeline that's in this server's history,
* disk. We cannot go further than what's been written out given the * but is not the one we're currently inserting or replaying. It can
* current implementation of XLogRead(). And in any case it's unsafe to * be streamed up to the point where we switched off that timeline.
* send WAL that is not securely down to disk on the master: if the master
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/ */
if (am_cascading_walsender) SendRqstPtr = sendTimeLineValidUpto;
FlushPtr = GetStandbyFlushRecPtr(sendTimeLine); }
else else if (am_cascading_walsender)
FlushPtr = GetFlushRecPtr(); {
/* /*
* In a cascading standby, the current recovery target timeline can * Streaming the latest timeline on a standby.
* change, or we can be promoted. In either case, the current timeline *
* becomes historic. We need to detect that so that we don't try to stream * Attempt to send all WAL that has already been replayed, so that
* past the point where we switched to another timeline. It's checked * we know it's valid. If we're receiving WAL through streaming
* after calculating FlushPtr, to avoid a race condition: if the timeline * replication, it's also OK to send any WAL that has been received
* becomes historic just after we checked that it was still current, it * but not replayed.
* should still be OK to stream it up to the FlushPtr that was calculated *
* before it became historic. * The timeline we're recovering from can change, or we can be
* promoted. In either case, the current timeline becomes historic.
* We need to detect that so that we don't try to stream past the
* point where we switched to another timeline. We check for promotion
* or timeline switch after calculating FlushPtr, to avoid a race
* condition: if the timeline becomes historic just after we checked
* that it was still current, it's still be OK to stream it up to the
* FlushPtr that was calculated before it became historic.
*/ */
if (!sendTimeLineIsHistoric && am_cascading_walsender)
{
bool becameHistoric = false; bool becameHistoric = false;
SendRqstPtr = GetStandbyFlushRecPtr();
if (!RecoveryInProgress()) if (!RecoveryInProgress())
{ {
/* /*
...@@ -1361,7 +1366,8 @@ XLogSend(bool *caughtup) ...@@ -1361,7 +1366,8 @@ XLogSend(bool *caughtup)
{ {
/* /*
* Still a cascading standby. But is the timeline we're sending * Still a cascading standby. But is the timeline we're sending
* still the one recovery is recovering from? * still the one recovery is recovering from? ThisTimeLineID was
* updated by the GetStandbyFlushRecPtr() call above.
*/ */
if (sendTimeLine != ThisTimeLineID) if (sendTimeLine != ThisTimeLineID)
becameHistoric = true; becameHistoric = true;
...@@ -1391,7 +1397,23 @@ XLogSend(bool *caughtup) ...@@ -1391,7 +1397,23 @@ XLogSend(bool *caughtup)
(uint32) sentPtr); (uint32) sentPtr);
sendTimeLineIsHistoric = true; sendTimeLineIsHistoric = true;
SendRqstPtr = sendTimeLineValidUpto;
}
} }
else
{
/*
* Streaming the current timeline on a master.
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
* given the current implementation of XLogRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* master: if the master subsequently crashes and restarts, slaves
* must not have applied any WAL that gets lost on the master.
*/
SendRqstPtr = GetFlushRecPtr();
} }
/* /*
...@@ -1413,15 +1435,7 @@ XLogSend(bool *caughtup) ...@@ -1413,15 +1435,7 @@ XLogSend(bool *caughtup)
return; return;
} }
/* /* Do we have any work to do? */
* Stream up to the point known to be flushed to disk, or to the end of
* this timeline, whichever comes first.
*/
if (sendTimeLineIsHistoric && XLByteLT(sendTimeLineValidUpto, FlushPtr))
SendRqstPtr = sendTimeLineValidUpto;
else
SendRqstPtr = FlushPtr;
Assert(XLByteLE(sentPtr, SendRqstPtr)); Assert(XLByteLE(sentPtr, SendRqstPtr));
if (XLByteLE(SendRqstPtr, sentPtr)) if (XLByteLE(SendRqstPtr, sentPtr))
{ {
...@@ -1522,15 +1536,11 @@ XLogSend(bool *caughtup) ...@@ -1522,15 +1536,11 @@ XLogSend(bool *caughtup)
* can be sent to the standby. This should only be called when in recovery, * can be sent to the standby. This should only be called when in recovery,
* ie. we're streaming to a cascaded standby. * ie. we're streaming to a cascaded standby.
* *
* If currentTLI is non-zero, the function returns the point that the WAL on
* the given timeline has been flushed upto. If recovery has already switched
* to a different timeline, InvalidXLogRecPtr is returned.
*
* As a side-effect, ThisTimeLineID is updated to the TLI of the last * As a side-effect, ThisTimeLineID is updated to the TLI of the last
* replayed WAL record. * replayed WAL record.
*/ */
static XLogRecPtr static XLogRecPtr
GetStandbyFlushRecPtr(TimeLineID currentTLI) GetStandbyFlushRecPtr(void)
{ {
XLogRecPtr replayPtr; XLogRecPtr replayPtr;
TimeLineID replayTLI; TimeLineID replayTLI;
...@@ -1549,11 +1559,8 @@ GetStandbyFlushRecPtr(TimeLineID currentTLI) ...@@ -1549,11 +1559,8 @@ GetStandbyFlushRecPtr(TimeLineID currentTLI)
ThisTimeLineID = replayTLI; ThisTimeLineID = replayTLI;
if (currentTLI != replayTLI && currentTLI != 0)
return InvalidXLogRecPtr;
result = replayPtr; result = replayPtr;
if (receiveTLI == currentTLI && receivePtr > replayPtr) if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
result = receivePtr; result = receivePtr;
return result; return result;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment