Commit 1296d5c5 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Fix a couple of error-handling bugs in the xlogreader patch.

XLogReadRecord should reset its state on every error, to make sure it
re-reads the page on next call. It was inconsistent in that some errors did
that, but some did not.

In ReadRecord(), don't give up on an error if we're in standby mode. The
loop was set up to retry, but the checks within the loop broke out of the
loop on any error.

Andres Freund, with some tweaking by me.
parent b14f81bc
......@@ -3180,7 +3180,8 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
* try to read a record just after the last one previously read.
*
* If no valid record is available, returns NULL, or fails if emode is PANIC.
* (emode must be either PANIC, LOG)
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*
* The record is copied into readRecordBuf, so that on successful return,
* the returned record pointer always points there.
......@@ -3209,12 +3210,6 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
EndRecPtr = xlogreader->EndRecPtr;
if (record == NULL)
{
/* not all failures fill errormsg; report those that do */
if (errormsg && errormsg[0] != '\0')
ereport(emode_for_corrupt_record(emode,
RecPtr ? RecPtr : EndRecPtr),
(errmsg_internal("%s", errormsg) /* already translated */));
lastSourceFailed = true;
if (readFile >= 0)
......@@ -3222,7 +3217,20 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
close(readFile);
readFile = -1;
}
break;
/*
* We only end up here without a message when XLogPageRead() failed
* - in that case we already logged something.
* In StandbyMode that only happens if we have been triggered, so
* we shouldn't loop anymore in that case.
*/
if (errormsg)
ereport(emode_for_corrupt_record(emode,
RecPtr ? RecPtr : EndRecPtr),
(errmsg_internal("%s", errormsg) /* already translated */));
/* Give up, or retry if we're in standby mode. */
continue;
}
/*
......@@ -3234,6 +3242,8 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
XLogSegNo segno;
int32 offset;
lastSourceFailed = true;
XLByteToSeg(xlogreader->latestPagePtr, segno);
offset = xlogreader->latestPagePtr % XLogSegSize;
XLogFileName(fname, xlogreader->readPageTLI, segno);
......@@ -3243,9 +3253,10 @@ ReadRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, int emode,
xlogreader->latestPageTLI,
fname,
offset)));
return false;
record = NULL;
continue;
}
} while (StandbyMode && record == NULL);
} while (StandbyMode && record == NULL && !CheckForStandbyTrigger());
return record;
}
......
......@@ -222,11 +222,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogRecord);
if (readOff < 0)
{
if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf;
return NULL;
}
goto err;
/*
* ReadPageInternal always returns at least the page header, so we can
......@@ -246,8 +242,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
{
report_invalid_record(state, "invalid record offset at %X/%X",
(uint32) (RecPtr >> 32), (uint32) RecPtr);
*errormsg = state->errormsg_buf;
return NULL;
goto err;
}
if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
......@@ -255,8 +250,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
{
report_invalid_record(state, "contrecord is requested by %X/%X",
(uint32) (RecPtr >> 32), (uint32) RecPtr);
*errormsg = state->errormsg_buf;
return NULL;
goto err;
}
/* ReadPageInternal has verified the page header */
......@@ -270,11 +264,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
targetPagePtr,
Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
if (readOff < 0)
{
if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf;
return NULL;
}
goto err;
/*
* Read the record length.
......@@ -300,11 +290,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
{
if (!ValidXLogRecordHeader(state, RecPtr, state->ReadRecPtr, record,
randAccess))
{
if (state->errormsg_buf[0] != '\0')
*errormsg = state->errormsg_buf;
return NULL;
}
goto err;
gotheader = true;
}
else
......@@ -314,8 +300,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
{
report_invalid_record(state, "invalid record length at %X/%X",
(uint32) (RecPtr >> 32), (uint32) RecPtr);
*errormsg = state->errormsg_buf;
return NULL;
goto err;
}
gotheader = false;
}
......@@ -330,8 +315,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
report_invalid_record(state, "record length %u at %X/%X too long",
total_len,
(uint32) (RecPtr >> 32), (uint32) RecPtr);
*errormsg = state->errormsg_buf;
return NULL;
goto err;
}
len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment