Commit 2a77355e authored by Heikki Linnakangas's avatar Heikki Linnakangas

Change the retry-loop in standby mode to also try restoring files from

pg_xlog directory. This is essential for replaying WAL records that
were streamed from the master, after a standby server restart.

If a corrupt record is seen in a file restored from the archive or
streamed from the master, log it as a WARNING and keep retrying. If the
corruption is permanent, and not just a glitch in the whatever copies the
files to the archive or a network error not caught by CRC checks in TCP
for example, we will keep retrying and logging the WARNING indefinitely.
But that's better than shutting down completely, the standby is still
useful for running read-only queries. In PITR the recovery ends at such a
corrupt record, which is a bit questionable, but that's the behavior we
had in previous releases and we don't feel like chaning it now. It does
make sense for tools like pg_standby.
parent feb5087a
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.384 2010/03/21 00:17:58 petere Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.385 2010/03/30 16:23:57 heikki Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -449,21 +449,34 @@ static uint32 openLogId = 0; ...@@ -449,21 +449,34 @@ static uint32 openLogId = 0;
static uint32 openLogSeg = 0; static uint32 openLogSeg = 0;
static uint32 openLogOff = 0; static uint32 openLogOff = 0;
/*
* Codes indicating where we got a WAL file from during recovery, or where
* to attempt to get one.
*/
#define XLOG_FROM_ARCHIVE (1<<0) /* Restored using restore_command */
#define XLOG_FROM_PG_XLOG (1<<1) /* Existing file in pg_xlog */
#define XLOG_FROM_STREAM (1<<2) /* Streamed from master */
/* /*
* These variables are used similarly to the ones above, but for reading * These variables are used similarly to the ones above, but for reading
* the XLOG. Note, however, that readOff generally represents the offset * the XLOG. Note, however, that readOff generally represents the offset
* of the page just read, not the seek position of the FD itself, which * of the page just read, not the seek position of the FD itself, which
* will be just past that page. readLen indicates how much of the current * will be just past that page. readLen indicates how much of the current
* page has been read into readBuf. * page has been read into readBuf, and readSource indicates where we got
* the currently open file from.
*/ */
static int readFile = -1; static int readFile = -1;
static uint32 readId = 0; static uint32 readId = 0;
static uint32 readSeg = 0; static uint32 readSeg = 0;
static uint32 readOff = 0; static uint32 readOff = 0;
static uint32 readLen = 0; static uint32 readLen = 0;
static int readSource = 0; /* XLOG_FROM_* code */
/* Is the currently open segment being streamed from primary? */ /*
static bool readStreamed = false; * Keeps track of which sources we've tried to read the current WAL
* record from and failed.
*/
static int failedSources = 0;
/* Buffer for currently read page (XLOG_BLCKSZ bytes) */ /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
static char *readBuf = NULL; static char *readBuf = NULL;
...@@ -517,11 +530,12 @@ static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, ...@@ -517,11 +530,12 @@ static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
bool find_free, int *max_advance, bool find_free, int *max_advance,
bool use_lock); bool use_lock);
static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
bool fromArchive, bool notexistOk); int source, bool notexistOk);
static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, static int XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode,
bool fromArchive); int sources);
static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, static bool XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
bool randAccess); bool randAccess);
static int emode_for_corrupt_record(int emode);
static void XLogFileClose(void); static void XLogFileClose(void);
static bool RestoreArchivedFile(char *path, const char *xlogfname, static bool RestoreArchivedFile(char *path, const char *xlogfname,
const char *recovername, off_t expectedSize); const char *recovername, off_t expectedSize);
...@@ -2573,7 +2587,7 @@ XLogFileOpen(uint32 log, uint32 seg) ...@@ -2573,7 +2587,7 @@ XLogFileOpen(uint32 log, uint32 seg)
*/ */
static int static int
XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
bool fromArchive, bool notfoundOk) int source, bool notfoundOk)
{ {
char xlogfname[MAXFNAMELEN]; char xlogfname[MAXFNAMELEN];
char activitymsg[MAXFNAMELEN + 16]; char activitymsg[MAXFNAMELEN + 16];
...@@ -2582,8 +2596,9 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ...@@ -2582,8 +2596,9 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
XLogFileName(xlogfname, tli, log, seg); XLogFileName(xlogfname, tli, log, seg);
if (fromArchive) switch (source)
{ {
case XLOG_FROM_ARCHIVE:
/* Report recovery progress in PS display */ /* Report recovery progress in PS display */
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
xlogfname); xlogfname);
...@@ -2594,11 +2609,15 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ...@@ -2594,11 +2609,15 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
XLogSegSize); XLogSegSize);
if (!restoredFromArchive) if (!restoredFromArchive)
return -1; return -1;
} break;
else
{ case XLOG_FROM_PG_XLOG:
XLogFilePath(path, tli, log, seg); XLogFilePath(path, tli, log, seg);
restoredFromArchive = false; restoredFromArchive = false;
break;
default:
elog(ERROR, "invalid XLogFileRead source %d", source);
} }
fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
...@@ -2612,6 +2631,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ...@@ -2612,6 +2631,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
xlogfname); xlogfname);
set_ps_display(activitymsg, false); set_ps_display(activitymsg, false);
readSource = source;
return fd; return fd;
} }
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */ if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
...@@ -2630,7 +2650,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli, ...@@ -2630,7 +2650,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
* searched in pg_xlog if not found in archive. * searched in pg_xlog if not found in archive.
*/ */
static int static int
XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive) XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources)
{ {
char path[MAXPGPATH]; char path[MAXPGPATH];
ListCell *cell; ListCell *cell;
...@@ -2653,20 +2673,19 @@ XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive) ...@@ -2653,20 +2673,19 @@ XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, bool fromArchive)
if (tli < curFileTLI) if (tli < curFileTLI)
break; /* don't bother looking at too-old TLIs */ break; /* don't bother looking at too-old TLIs */
fd = XLogFileRead(log, seg, emode, tli, fromArchive, true); if (sources & XLOG_FROM_ARCHIVE)
{
fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_ARCHIVE, true);
if (fd != -1) if (fd != -1)
{
elog(DEBUG1, "got WAL segment from archive");
return fd; return fd;
}
}
/* if (sources & XLOG_FROM_PG_XLOG)
* If not in StandbyMode, fall back to searching pg_xlog. In
* StandbyMode we're streaming segments from the primary to pg_xlog,
* and we mustn't confuse the (possibly partial) segments in pg_xlog
* with complete segments ready to be applied. We rather wait for the
* records to arrive through streaming.
*/
if (!StandbyMode && fromArchive)
{ {
fd = XLogFileRead(log, seg, emode, tli, false, true); fd = XLogFileRead(log, seg, emode, tli, XLOG_FROM_PG_XLOG, true);
if (fd != -1) if (fd != -1)
return fd; return fd;
} }
...@@ -3520,7 +3539,7 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) ...@@ -3520,7 +3539,7 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
* the returned record pointer always points there. * the returned record pointer always points there.
*/ */
static XLogRecord * static XLogRecord *
ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
{ {
XLogRecord *record; XLogRecord *record;
char *buffer; char *buffer;
...@@ -3530,17 +3549,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3530,17 +3549,6 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
total_len; total_len;
uint32 targetRecOff; uint32 targetRecOff;
uint32 pageHeaderSize; uint32 pageHeaderSize;
int emode;
/*
* We don't expect any invalid records during streaming recovery: we
* should never hit the end of WAL because we wait for it to be streamed.
* Therefore treat any broken WAL as PANIC, instead of failing over.
*/
if (StandbyMode)
emode = PANIC;
else
emode = emode_arg;
if (readBuf == NULL) if (readBuf == NULL)
{ {
...@@ -3593,6 +3601,9 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3593,6 +3601,9 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
randAccess = true; /* allow curFileTLI to go backwards too */ randAccess = true; /* allow curFileTLI to go backwards too */
} }
/* This is the first try to read this page. */
failedSources = 0;
retry:
/* Read the page containing the record */ /* Read the page containing the record */
if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess)) if (!XLogPageRead(RecPtr, emode, fetching_ckpt, randAccess))
return NULL; return NULL;
...@@ -3611,7 +3622,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3611,7 +3622,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
} }
else if (targetRecOff < pageHeaderSize) else if (targetRecOff < pageHeaderSize)
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("invalid record offset at %X/%X", (errmsg("invalid record offset at %X/%X",
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
...@@ -3619,7 +3630,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3619,7 +3630,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
targetRecOff == pageHeaderSize) targetRecOff == pageHeaderSize)
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("contrecord is requested by %X/%X", (errmsg("contrecord is requested by %X/%X",
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
...@@ -3634,7 +3645,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3634,7 +3645,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
{ {
if (record->xl_len != 0) if (record->xl_len != 0)
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("invalid xlog switch record at %X/%X", (errmsg("invalid xlog switch record at %X/%X",
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
...@@ -3642,7 +3653,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3642,7 +3653,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
} }
else if (record->xl_len == 0) else if (record->xl_len == 0)
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("record with zero length at %X/%X", (errmsg("record with zero length at %X/%X",
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
...@@ -3651,14 +3662,14 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3651,14 +3662,14 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
record->xl_tot_len > SizeOfXLogRecord + record->xl_len + record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("invalid record length at %X/%X", (errmsg("invalid record length at %X/%X",
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
if (record->xl_rmid > RM_MAX_ID) if (record->xl_rmid > RM_MAX_ID)
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("invalid resource manager ID %u at %X/%X", (errmsg("invalid resource manager ID %u at %X/%X",
record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff))); record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
...@@ -3671,7 +3682,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3671,7 +3682,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
*/ */
if (!XLByteLT(record->xl_prev, *RecPtr)) if (!XLByteLT(record->xl_prev, *RecPtr))
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("record with incorrect prev-link %X/%X at %X/%X", (errmsg("record with incorrect prev-link %X/%X at %X/%X",
record->xl_prev.xlogid, record->xl_prev.xrecoff, record->xl_prev.xlogid, record->xl_prev.xrecoff,
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
...@@ -3687,7 +3698,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3687,7 +3698,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
*/ */
if (!XLByteEQ(record->xl_prev, ReadRecPtr)) if (!XLByteEQ(record->xl_prev, ReadRecPtr))
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("record with incorrect prev-link %X/%X at %X/%X", (errmsg("record with incorrect prev-link %X/%X at %X/%X",
record->xl_prev.xlogid, record->xl_prev.xrecoff, record->xl_prev.xlogid, record->xl_prev.xrecoff,
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
...@@ -3716,7 +3727,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3716,7 +3727,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
{ {
readRecordBufSize = 0; readRecordBufSize = 0;
/* We treat this as a "bogus data" condition */ /* We treat this as a "bogus data" condition */
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("record length %u at %X/%X too long", (errmsg("record length %u at %X/%X too long",
total_len, RecPtr->xlogid, RecPtr->xrecoff))); total_len, RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
...@@ -3756,7 +3767,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3756,7 +3767,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
/* Check that the continuation record looks valid */ /* Check that the continuation record looks valid */
if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)) if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("there is no contrecord flag in log file %u, segment %u, offset %u", (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
readId, readSeg, readOff))); readId, readSeg, readOff)));
goto next_record_is_invalid; goto next_record_is_invalid;
...@@ -3766,7 +3777,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3766,7 +3777,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
if (contrecord->xl_rem_len == 0 || if (contrecord->xl_rem_len == 0 ||
total_len != (contrecord->xl_rem_len + gotlen)) total_len != (contrecord->xl_rem_len + gotlen))
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u", (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
contrecord->xl_rem_len, contrecord->xl_rem_len,
readId, readSeg, readOff))); readId, readSeg, readOff)));
...@@ -3784,7 +3795,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3784,7 +3795,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
contrecord->xl_rem_len); contrecord->xl_rem_len);
break; break;
} }
if (!RecordIsValid(record, *RecPtr, emode)) if (!RecordIsValid(record, *RecPtr, emode_for_corrupt_record(emode)))
goto next_record_is_invalid; goto next_record_is_invalid;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
EndRecPtr.xlogid = readId; EndRecPtr.xlogid = readId;
...@@ -3798,7 +3809,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3798,7 +3809,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
} }
/* Record does not cross a page boundary */ /* Record does not cross a page boundary */
if (!RecordIsValid(record, *RecPtr, emode)) if (!RecordIsValid(record, *RecPtr, emode_for_corrupt_record(emode)))
goto next_record_is_invalid; goto next_record_is_invalid;
EndRecPtr.xlogid = RecPtr->xlogid; EndRecPtr.xlogid = RecPtr->xlogid;
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
...@@ -3824,12 +3835,19 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt) ...@@ -3824,12 +3835,19 @@ ReadRecord(XLogRecPtr *RecPtr, int emode_arg, bool fetching_ckpt)
} }
return (XLogRecord *) buffer; return (XLogRecord *) buffer;
next_record_is_invalid:; next_record_is_invalid:
failedSources |= readSource;
if (readFile >= 0) if (readFile >= 0)
{ {
close(readFile); close(readFile);
readFile = -1; readFile = -1;
} }
/* In standby-mode, keep trying */
if (StandbyMode)
goto retry;
else
return NULL; return NULL;
} }
...@@ -8731,10 +8749,24 @@ StartupProcessMain(void) ...@@ -8731,10 +8749,24 @@ StartupProcessMain(void)
/* /*
* Read the XLOG page containing RecPtr into readBuf (if not read already). * Read the XLOG page containing RecPtr into readBuf (if not read already).
* Returns true if successful, false otherwise or fails if emode is PANIC. * Returns true if the page is read successfully.
* *
* This is responsible for restoring files from archive as needed, as well * This is responsible for restoring files from archive as needed, as well
* as for waiting for the requested WAL record to arrive in standby mode. * as for waiting for the requested WAL record to arrive in standby mode.
*
* 'emode' specifies the log level used for reporting "file not found" or
* "end of WAL" situations in archive recovery, or in standby mode when a
* trigger file is found. If set to WARNING or below, XLogPageRead() returns
* false in those situations, on higher log levels the ereport() won't
* return.
*
* In standby mode, if after a successful return of XLogPageRead() the
* caller finds the record it's interested in to be broken, it should
* ereport the error with the level determined by
* emode_for_corrupt_record(), and then set "failedSources |= readSource"
* and call XLogPageRead() again with the same arguments. This lets
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
*/ */
static bool static bool
XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
...@@ -8746,13 +8778,14 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8746,13 +8778,14 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
uint32 targetRecOff; uint32 targetRecOff;
uint32 targetId; uint32 targetId;
uint32 targetSeg; uint32 targetSeg;
static pg_time_t last_fail_time = 0;
XLByteToSeg(*RecPtr, targetId, targetSeg); XLByteToSeg(*RecPtr, targetId, targetSeg);
targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ; targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
/* Fast exit if we have read the record in the current buffer already */ /* Fast exit if we have read the record in the current buffer already */
if (targetId == readId && targetSeg == readSeg && if (failedSources == 0 && targetId == readId && targetSeg == readSeg &&
targetPageOff == readOff && targetRecOff < readLen) targetPageOff == readOff && targetRecOff < readLen)
return true; return true;
...@@ -8764,18 +8797,18 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8764,18 +8797,18 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
{ {
close(readFile); close(readFile);
readFile = -1; readFile = -1;
readSource = 0;
} }
XLByteToSeg(*RecPtr, readId, readSeg); XLByteToSeg(*RecPtr, readId, readSeg);
retry:
/* See if we need to retrieve more data */ /* See if we need to retrieve more data */
if (readFile < 0 || if (readFile < 0 ||
(readStreamed && !XLByteLT(*RecPtr, receivedUpto))) (readSource == XLOG_FROM_STREAM && !XLByteLT(*RecPtr, receivedUpto)))
{ {
if (StandbyMode) if (StandbyMode)
{ {
bool last_restore_failed = false;
/* /*
* In standby mode, wait for the requested record to become * In standby mode, wait for the requested record to become
* available, either via restore_command succeeding to restore the * available, either via restore_command succeeding to restore the
...@@ -8800,15 +8833,16 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8800,15 +8833,16 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
{ {
readFile = readFile =
XLogFileRead(readId, readSeg, PANIC, XLogFileRead(readId, readSeg, PANIC,
recoveryTargetTLI, false, false); recoveryTargetTLI,
XLOG_FROM_PG_XLOG, false);
switched_segment = true; switched_segment = true;
readStreamed = true; readSource = XLOG_FROM_STREAM;
} }
break; break;
} }
if (CheckForStandbyTrigger()) if (CheckForStandbyTrigger())
goto next_record_is_invalid; goto triggered;
/* /*
* When streaming is active, we want to react quickly when * When streaming is active, we want to react quickly when
...@@ -8818,6 +8852,9 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8818,6 +8852,9 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
} }
else else
{ {
int sources;
pg_time_t now;
/* /*
* Until walreceiver manages to reconnect, poll the * Until walreceiver manages to reconnect, poll the
* archive. * archive.
...@@ -8830,48 +8867,73 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8830,48 +8867,73 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
/* Reset curFileTLI if random fetch. */ /* Reset curFileTLI if random fetch. */
if (randAccess) if (randAccess)
curFileTLI = 0; curFileTLI = 0;
readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2, true);
switched_segment = true;
readStreamed = false;
if (readFile != -1)
{
elog(DEBUG1, "got WAL segment from archive");
break;
}
/* /*
* If we succeeded restoring some segments from archive * Try to restore the file from archive, or read an
* since the last connection attempt (or we haven't tried * existing file from pg_xlog.
* streaming yet, retry immediately. But if we haven't,
* assume the problem is persistent, so be less
* aggressive.
*/ */
if (last_restore_failed) sources = XLOG_FROM_ARCHIVE | XLOG_FROM_PG_XLOG;
if (!(sources & ~failedSources))
{ {
/* /*
* Check to see if the trigger file exists. Note that * We've exhausted all options for retrieving the
* we do this only after failure, so when you create * file. Retry ...
* the trigger file, we still finish replaying as much
* as we can before failover.
*/ */
if (CheckForStandbyTrigger()) failedSources = 0;
goto next_record_is_invalid;
pg_usleep(5000000L); /* 5 seconds */ /*
* ... but sleep first if it hasn't been long since
* last attempt.
*/
now = (pg_time_t) time(NULL);
if ((now - last_fail_time) < 5)
{
pg_usleep(1000000L * (5 - (now - last_fail_time)));
now = (pg_time_t) time(NULL);
} }
last_restore_failed = true; last_fail_time = now;
/* /*
* Nope, not found in archive. Try to stream it. * If primary_conninfo is set, launch walreceiver to
* try to stream the missing WAL, before retrying
* to restore from archive/pg_xlog.
* *
* If fetching_ckpt is TRUE, RecPtr points to the initial * If fetching_ckpt is TRUE, RecPtr points to the
* checkpoint location. In that case, we use RedoStartLSN * initial checkpoint location. In that case, we use
* as the streaming start position instead of RecPtr, so * RedoStartLSN as the streaming start position instead
* that when we later jump backwards to start redo at * of RecPtr, so that when we later jump backwards to
* RedoStartLSN, we will have the logs streamed already. * start redo at RedoStartLSN, we will have the logs
* streamed already.
*/ */
if (PrimaryConnInfo) if (PrimaryConnInfo)
RequestXLogStreaming(fetching_ckpt ? RedoStartLSN : *RecPtr, {
RequestXLogStreaming(
fetching_ckpt ? RedoStartLSN : *RecPtr,
PrimaryConnInfo); PrimaryConnInfo);
continue;
}
}
/* Don't try to read from a source that just failed */
sources &= ~failedSources;
readFile = XLogFileReadAnyTLI(readId, readSeg, DEBUG2,
sources);
switched_segment = true;
if (readFile != -1)
break;
/*
* Nope, not found in archive and/or pg_xlog.
*/
failedSources |= sources;
/*
* Check to see if the trigger file exists. Note that
* we do this only after failure, so when you create
* the trigger file, we still finish replaying as much
* as we can from archive and pg_xlog before failover.
*/
if (CheckForStandbyTrigger())
goto triggered;
} }
/* /*
...@@ -8886,13 +8948,19 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8886,13 +8948,19 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
/* In archive or crash recovery. */ /* In archive or crash recovery. */
if (readFile < 0) if (readFile < 0)
{ {
int sources;
/* Reset curFileTLI if random fetch. */ /* Reset curFileTLI if random fetch. */
if (randAccess) if (randAccess)
curFileTLI = 0; curFileTLI = 0;
sources = XLOG_FROM_PG_XLOG;
if (InArchiveRecovery)
sources |= XLOG_FROM_ARCHIVE;
readFile = XLogFileReadAnyTLI(readId, readSeg, emode, readFile = XLogFileReadAnyTLI(readId, readSeg, emode,
InArchiveRecovery); sources);
switched_segment = true; switched_segment = true;
readStreamed = false;
if (readFile < 0) if (readFile < 0)
return false; return false;
} }
...@@ -8900,8 +8968,8 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8900,8 +8968,8 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
} }
/* /*
* At this point, we have the right segment open and we know the requested * At this point, we have the right segment open and if we're streaming
* record is in it. * we know the requested record is in it.
*/ */
Assert(readFile != -1); Assert(readFile != -1);
...@@ -8911,7 +8979,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8911,7 +8979,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
* requested record has been received, but this is for the benefit of * requested record has been received, but this is for the benefit of
* future calls, to allow quick exit at the top of this function. * future calls, to allow quick exit at the top of this function.
*/ */
if (readStreamed) if (readSource == XLOG_FROM_STREAM)
{ {
if (RecPtr->xlogid != receivedUpto.xlogid || if (RecPtr->xlogid != receivedUpto.xlogid ||
(RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ)) (RecPtr->xrecoff / XLOG_BLCKSZ) != (receivedUpto.xrecoff / XLOG_BLCKSZ))
...@@ -8936,13 +9004,14 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8936,13 +9004,14 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
readOff = 0; readOff = 0;
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not read from log file %u, segment %u, offset %u: %m", errmsg("could not read from log file %u, segment %u, offset %u: %m",
readId, readSeg, readOff))); readId, readSeg, readOff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) if (!ValidXLOGHeader((XLogPageHeader) readBuf,
emode_for_corrupt_record(emode)))
goto next_record_is_invalid; goto next_record_is_invalid;
} }
...@@ -8950,7 +9019,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8950,7 +9019,7 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
readOff = targetPageOff; readOff = targetPageOff;
if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not seek in log file %u, segment %u to offset %u: %m", errmsg("could not seek in log file %u, segment %u to offset %u: %m",
readId, readSeg, readOff))); readId, readSeg, readOff)));
...@@ -8958,13 +9027,13 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8958,13 +9027,13 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
} }
if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ) if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
{ {
ereport(emode, ereport(emode_for_corrupt_record(emode),
(errcode_for_file_access(), (errcode_for_file_access(),
errmsg("could not read from log file %u, segment %u, offset %u: %m", errmsg("could not read from log file %u, segment %u, offset %u: %m",
readId, readSeg, readOff))); readId, readSeg, readOff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode_for_corrupt_record(emode)))
goto next_record_is_invalid; goto next_record_is_invalid;
Assert(targetId == readId); Assert(targetId == readId);
...@@ -8975,13 +9044,62 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt, ...@@ -8975,13 +9044,62 @@ XLogPageRead(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt,
return true; return true;
next_record_is_invalid: next_record_is_invalid:
failedSources |= readSource;
if (readFile >= 0) if (readFile >= 0)
close(readFile); close(readFile);
readFile = -1; readFile = -1;
readStreamed = false;
readLen = 0; readLen = 0;
readSource = 0;
/* In standby-mode, keep trying */
if (StandbyMode)
goto retry;
else
return false; return false;
triggered:
if (readFile >= 0)
close(readFile);
readFile = -1;
readLen = 0;
readSource = 0;
return false;
}
/*
* Determine what log level should be used to report a corrupt WAL record
* in the current WAL page, previously read by XLogPageRead().
*
* 'emode' is the error mode that would be used to report a file-not-found
* or legitimate end-of-WAL situation. It is upgraded to WARNING or PANIC
* if a corrupt record is not expected at this point.
*/
static int
emode_for_corrupt_record(int emode)
{
/*
* We don't expect any invalid records in archive or in records streamed
* from master. Files in the archive should be complete, and we should
* never hit the end of WAL because we stop and wait for more WAL to
* arrive before replaying it.
*
* In standby mode, throw a WARNING and keep retrying. If we're lucky
* it's a transient error and will go away by itself, and in any case
* it's better to keep the standby open for any possible read-only
* queries. We throw WARNING in PITR as well, which causes the recovery
* to end. That's questionable, you probably would want to abort the
* recovery if the archive is corrupt and investigate the situation.
* But that's the behavior we've always had, and it does make sense
* for tools like pg_standby that implement a standby mode externally.
*/
if (readSource == XLOG_FROM_STREAM || readSource == XLOG_FROM_ARCHIVE)
{
if (emode < WARNING)
emode = WARNING;
}
return emode;
} }
/* /*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment