Commit c4c22747 authored by Heikki Linnakangas

Fix bugs in cascading replication with recovery_target_timeline='latest'

The cascading replication code assumed that the current RecoveryTargetTLI
never changes, but that's not true with recovery_target_timeline='latest'.
The obvious upshot of that is that RecoveryTargetTLI in shared memory needs
to be protected by a lock. A less obvious consequence is that when a
cascading standby is connected, and the standby switches to a new target
timeline after scanning the archive, it will continue to stream WAL to the
cascading standby, but from the wrong file, i.e. the file of the previous
timeline. For example, if the standby is currently streaming from the middle
of file 000000010000000000000005, and the timeline changes, the standby
will continue to stream from that file. However, the WAL on the new
timeline is in file 000000020000000000000005, so the standby sends garbage
from 000000010000000000000005 to the cascading standby, instead of the
correct WAL from file 000000020000000000000005.

This also fixes a related bug where a partial WAL segment is restored from
the archive and streamed to a cascading standby. The code assumed that when
a WAL segment is copied from the archive, it can immediately be fully
streamed to a cascading standby. However, if the segment is only partially
filled, i.e. has the right size but only the first N bytes contain valid WAL,
that's not safe. That can happen if a partial WAL segment is manually copied
to the archive, or if a partial WAL segment is archived because a server is
started up on a new timeline within that segment. The cascading standby will
get confused if the WAL it received is not valid, and will get stuck until
it's restarted. This patch fixes that problem by not allowing WAL restored
from the archive to be streamed to a cascading standby until it's been
replayed, and thus validated.
parent cdf91edb
...@@ -407,7 +407,6 @@ typedef struct XLogCtlData ...@@ -407,7 +407,6 @@ typedef struct XLogCtlData
XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */
int XLogCacheBlck; /* highest allocated xlog buffer index */ int XLogCacheBlck; /* highest allocated xlog buffer index */
TimeLineID ThisTimeLineID; TimeLineID ThisTimeLineID;
TimeLineID RecoveryTargetTLI;
/* /*
* archiveCleanupCommand is read from recovery.conf but needs to be in * archiveCleanupCommand is read from recovery.conf but needs to be in
...@@ -456,14 +455,14 @@ typedef struct XLogCtlData ...@@ -456,14 +455,14 @@ typedef struct XLogCtlData
XLogRecPtr recoveryLastRecPtr; XLogRecPtr recoveryLastRecPtr;
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */ /* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
TimestampTz recoveryLastXTime; TimestampTz recoveryLastXTime;
/* current effective recovery target timeline */
TimeLineID RecoveryTargetTLI;
/* /*
* timestamp of when we started replaying the current chunk of WAL data, * timestamp of when we started replaying the current chunk of WAL data,
* only relevant for replication or archive recovery * only relevant for replication or archive recovery
*/ */
TimestampTz currentChunkStartTime; TimestampTz currentChunkStartTime;
/* end of the last record restored from the archive */
XLogRecPtr restoreLastRecPtr;
/* Are we requested to pause recovery? */ /* Are we requested to pause recovery? */
bool recoveryPause; bool recoveryPause;
...@@ -2817,18 +2816,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, ...@@ -2817,18 +2816,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
if (reload) if (reload)
WalSndRqstFileReload(); WalSndRqstFileReload();
/*
* Calculate the end location of the restored WAL file and save it in
* shmem. It's used as current standby flush position, and cascading
* walsenders try to send WAL records up to this location.
*/
XLogSegNoOffsetToRecPtr(segno, 0, endptr);
XLByteAdvance(endptr, XLogSegSize);
SpinLockAcquire(&xlogctl->info_lck);
xlogctl->restoreLastRecPtr = endptr;
SpinLockRelease(&xlogctl->info_lck);
/* Signal walsender that new WAL has arrived */ /* Signal walsender that new WAL has arrived */
if (AllowCascadeReplication()) if (AllowCascadeReplication())
WalSndWakeup(); WalSndWakeup();
...@@ -4470,12 +4457,17 @@ rescanLatestTimeLine(void) ...@@ -4470,12 +4457,17 @@ rescanLatestTimeLine(void)
ThisTimeLineID))); ThisTimeLineID)));
else else
{ {
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
/* Switch target */ /* Switch target */
recoveryTargetTLI = newtarget; recoveryTargetTLI = newtarget;
list_free(expectedTLIs); list_free(expectedTLIs);
expectedTLIs = newExpectedTLIs; expectedTLIs = newExpectedTLIs;
XLogCtl->RecoveryTargetTLI = recoveryTargetTLI; SpinLockAcquire(&xlogctl->info_lck);
xlogctl->RecoveryTargetTLI = recoveryTargetTLI;
SpinLockRelease(&xlogctl->info_lck);
ereport(LOG, ereport(LOG,
(errmsg("new target timeline is %u", (errmsg("new target timeline is %u",
...@@ -7513,13 +7505,20 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch) ...@@ -7513,13 +7505,20 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
} }
/* /*
* GetRecoveryTargetTLI - get the recovery target timeline ID * GetRecoveryTargetTLI - get the current recovery target timeline ID
*/ */
TimeLineID TimeLineID
GetRecoveryTargetTLI(void) GetRecoveryTargetTLI(void)
{ {
/* RecoveryTargetTLI doesn't change so we need no lock to copy it */ /* use volatile pointer to prevent code rearrangement */
return XLogCtl->RecoveryTargetTLI; volatile XLogCtlData *xlogctl = XLogCtl;
TimeLineID result;
SpinLockAcquire(&xlogctl->info_lck);
result = xlogctl->RecoveryTargetTLI;
SpinLockRelease(&xlogctl->info_lck);
return result;
} }
/* /*
...@@ -8309,7 +8308,7 @@ CreateRestartPoint(int flags) ...@@ -8309,7 +8308,7 @@ CreateRestartPoint(int flags)
XLogRecPtr endptr; XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */ /* Get the current (or recent) end of xlog */
endptr = GetStandbyFlushRecPtr(); endptr = GetStandbyFlushRecPtr(NULL);
KeepLogSeg(endptr, &_logSegNo); KeepLogSeg(endptr, &_logSegNo);
_logSegNo--; _logSegNo--;
...@@ -9818,14 +9817,13 @@ do_pg_abort_backup(void) ...@@ -9818,14 +9817,13 @@ do_pg_abort_backup(void)
/* /*
* Get latest redo apply position. * Get latest redo apply position.
* *
* Optionally, returns the end byte position of the last restored * Optionally, returns the current recovery target timeline. Callers not
* WAL segment. Callers not interested in that value may pass * interested in that may pass NULL for targetTLI.
* NULL for restoreLastRecPtr.
* *
* Exported to allow WALReceiver to read the pointer directly. * Exported to allow WALReceiver to read the pointer directly.
*/ */
XLogRecPtr XLogRecPtr
GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) GetXLogReplayRecPtr(TimeLineID *targetTLI)
{ {
/* use volatile pointer to prevent code rearrangement */ /* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl; volatile XLogCtlData *xlogctl = XLogCtl;
...@@ -9833,8 +9831,8 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) ...@@ -9833,8 +9831,8 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
SpinLockAcquire(&xlogctl->info_lck); SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->recoveryLastRecPtr; recptr = xlogctl->recoveryLastRecPtr;
if (restoreLastRecPtr) if (targetTLI)
*restoreLastRecPtr = xlogctl->restoreLastRecPtr; *targetTLI = xlogctl->RecoveryTargetTLI;
SpinLockRelease(&xlogctl->info_lck); SpinLockRelease(&xlogctl->info_lck);
return recptr; return recptr;
...@@ -9843,21 +9841,23 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr) ...@@ -9843,21 +9841,23 @@ GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr)
/* /*
* Get current standby flush position, ie, the last WAL position * Get current standby flush position, ie, the last WAL position
* known to be fsync'd to disk in standby. * known to be fsync'd to disk in standby.
*
* If 'targetTLI' is not NULL, it's set to the current recovery target
* timeline.
*/ */
XLogRecPtr XLogRecPtr
GetStandbyFlushRecPtr(void) GetStandbyFlushRecPtr(TimeLineID *targetTLI)
{ {
XLogRecPtr receivePtr; XLogRecPtr receivePtr;
XLogRecPtr replayPtr; XLogRecPtr replayPtr;
XLogRecPtr restorePtr;
receivePtr = GetWalRcvWriteRecPtr(NULL); receivePtr = GetWalRcvWriteRecPtr(NULL);
replayPtr = GetXLogReplayRecPtr(&restorePtr); replayPtr = GetXLogReplayRecPtr(targetTLI);
if (XLByteLT(receivePtr, replayPtr)) if (XLByteLT(receivePtr, replayPtr))
return XLByteLT(replayPtr, restorePtr) ? restorePtr : replayPtr; return replayPtr;
else else
return XLByteLT(receivePtr, restorePtr) ? restorePtr : receivePtr; return receivePtr;
} }
/* /*
......
...@@ -303,7 +303,7 @@ IdentifySystem(void) ...@@ -303,7 +303,7 @@ IdentifySystem(void)
GetSystemIdentifier()); GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr(); logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
...@@ -1137,7 +1137,31 @@ XLogSend(char *msgbuf, bool *caughtup) ...@@ -1137,7 +1137,31 @@ XLogSend(char *msgbuf, bool *caughtup)
* subsequently crashes and restarts, slaves must not have applied any WAL * subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master. * that gets lost on the master.
*/ */
SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr(); if (am_cascading_walsender)
{
TimeLineID currentTargetTLI;
SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI);
/*
* If the recovery target timeline changed, bail out. It's a bit
* unfortunate that we have to just disconnect, but there is no way
* to tell the client that the timeline changed. We also don't know
* exactly where the switch happened, so we cannot safely try to send
* up to the switchover point before disconnecting.
*/
if (currentTargetTLI != ThisTimeLineID)
{
if (!walsender_ready_to_stop)
ereport(LOG,
(errmsg("terminating walsender process to force cascaded standby "
"to update timeline and reconnect")));
walsender_ready_to_stop = true;
*caughtup = true;
return;
}
}
else
SendRqstPtr = GetFlushRecPtr();
/* Quick exit if nothing to do */ /* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr)) if (XLByteLE(SendRqstPtr, sentPtr))
......
...@@ -285,8 +285,8 @@ extern bool RecoveryInProgress(void); ...@@ -285,8 +285,8 @@ extern bool RecoveryInProgress(void);
extern bool HotStandbyActive(void); extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void); extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream); extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern XLogRecPtr GetXLogReplayRecPtr(XLogRecPtr *restoreLastRecPtr); extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI);
extern XLogRecPtr GetStandbyFlushRecPtr(void); extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI);
extern XLogRecPtr GetXLogInsertRecPtr(void); extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void); extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void); extern bool RecoveryIsPaused(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment