Commit 061e7efb authored by Heikki Linnakangas

Allow WAL record header to be split across pages.

This saves a few bytes of WAL space, but the real motivation is to make it
predictable how much WAL space a record requires, as it no longer depends
on whether we need to waste the last few bytes at the end of a WAL page
because the header doesn't fit.

The total length field of the WAL record, xl_tot_len, is moved to the
beginning of the WAL record header, so that it is still always found on the
first page where a WAL record begins.

Bump the WAL version number again, as this is an incompatible change.
parent 20ba5ca6
...@@ -653,7 +653,9 @@ static void CleanupBackupHistory(void); ...@@ -653,7 +653,9 @@ static void CleanupBackupHistory(void);
static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt);
static void CheckRecoveryConsistency(void); static void CheckRecoveryConsistency(void);
static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); static bool ValidXLogPageHeader(XLogPageHeader hdr, int emode);
static bool ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record,
int emode, bool randAccess);
static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
static List *readTimeLineHistory(TimeLineID targetTLI); static List *readTimeLineHistory(TimeLineID targetTLI);
static bool existsTimeLineHistory(TimeLineID probeTLI); static bool existsTimeLineHistory(TimeLineID probeTLI);
...@@ -695,7 +697,6 @@ XLogRecPtr ...@@ -695,7 +697,6 @@ XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{ {
XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecord *record;
XLogRecPtr RecPtr; XLogRecPtr RecPtr;
XLogRecPtr WriteRqst; XLogRecPtr WriteRqst;
uint32 freespace; uint32 freespace;
...@@ -709,6 +710,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) ...@@ -709,6 +710,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
XLogRecData hdr_rdt;
pg_crc32 rdata_crc; pg_crc32 rdata_crc;
uint32 len, uint32 len,
write_len; write_len;
...@@ -717,6 +719,15 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) ...@@ -717,6 +719,15 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
bool doPageWrites; bool doPageWrites;
bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
uint8 info_orig = info; uint8 info_orig = info;
static XLogRecord *rechdr;
if (rechdr == NULL)
{
rechdr = malloc(SizeOfXLogRecord);
if (rechdr == NULL)
elog(ERROR, "out of memory");
MemSet(rechdr, 0, SizeOfXLogRecord);
}
/* cross-check on whether we should be here or not */ /* cross-check on whether we should be here or not */
if (!XLogInsertAllowed()) if (!XLogInsertAllowed())
...@@ -903,6 +914,22 @@ begin:; ...@@ -903,6 +914,22 @@ begin:;
for (rdt = rdata; rdt != NULL; rdt = rdt->next) for (rdt = rdata; rdt != NULL; rdt = rdt->next)
COMP_CRC32(rdata_crc, rdt->data, rdt->len); COMP_CRC32(rdata_crc, rdt->data, rdt->len);
/*
* Construct record header (prev-link and CRC are filled in later), and
* make that the first chunk in the chain.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = SizeOfXLogRecord + write_len;
rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
hdr_rdt.next = rdata;
hdr_rdt.data = (char *) rechdr;
hdr_rdt.len = SizeOfXLogRecord;
write_len += SizeOfXLogRecord;
START_CRIT_SECTION(); START_CRIT_SECTION();
/* Now wait to get insert lock */ /* Now wait to get insert lock */
...@@ -962,12 +989,12 @@ begin:; ...@@ -962,12 +989,12 @@ begin:;
} }
/* /*
* If there isn't enough space on the current XLOG page for a record * If the current page is completely full, the record goes to the next
* header, advance to the next page (leaving the unused space as zeroes). * page, right after the page header.
*/ */
updrqst = false; updrqst = false;
freespace = INSERT_FREESPACE(Insert); freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord) if (freespace == 0)
{ {
updrqst = AdvanceXLInsertBuffer(false); updrqst = AdvanceXLInsertBuffer(false);
freespace = INSERT_FREESPACE(Insert); freespace = INSERT_FREESPACE(Insert);
...@@ -1009,21 +1036,13 @@ begin:; ...@@ -1009,21 +1036,13 @@ begin:;
return RecPtr; return RecPtr;
} }
/* Insert record header */ /* Finish the record header */
rechdr->xl_prev = Insert->PrevRecord;
record = (XLogRecord *) Insert->currpos;
record->xl_prev = Insert->PrevRecord;
record->xl_xid = GetCurrentTransactionIdIfAny();
record->xl_tot_len = SizeOfXLogRecord + write_len;
record->xl_len = len; /* doesn't include backup blocks */
record->xl_info = info;
record->xl_rmid = rmid;
/* Now we can finish computing the record's CRC */ /* Now we can finish computing the record's CRC */
COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32), COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc));
SizeOfXLogRecord - sizeof(pg_crc32));
FIN_CRC32(rdata_crc); FIN_CRC32(rdata_crc);
record->xl_crc = rdata_crc; rechdr->xl_crc = rdata_crc;
#ifdef WAL_DEBUG #ifdef WAL_DEBUG
if (XLOG_DEBUG) if (XLOG_DEBUG)
...@@ -1033,11 +1052,11 @@ begin:; ...@@ -1033,11 +1052,11 @@ begin:;
initStringInfo(&buf); initStringInfo(&buf);
appendStringInfo(&buf, "INSERT @ %X/%X: ", appendStringInfo(&buf, "INSERT @ %X/%X: ",
RecPtr.xlogid, RecPtr.xrecoff); RecPtr.xlogid, RecPtr.xrecoff);
xlog_outrec(&buf, record); xlog_outrec(&buf, rechdr);
if (rdata->data != NULL) if (rdata->data != NULL)
{ {
appendStringInfo(&buf, " - "); appendStringInfo(&buf, " - ");
RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data); RmgrTable[rechdr->xl_rmid].rm_desc(&buf, rechdr->xl_info, rdata->data);
} }
elog(LOG, "%s", buf.data); elog(LOG, "%s", buf.data);
pfree(buf.data); pfree(buf.data);
...@@ -1048,12 +1067,10 @@ begin:; ...@@ -1048,12 +1067,10 @@ begin:;
ProcLastRecPtr = RecPtr; ProcLastRecPtr = RecPtr;
Insert->PrevRecord = RecPtr; Insert->PrevRecord = RecPtr;
Insert->currpos += SizeOfXLogRecord;
freespace -= SizeOfXLogRecord;
/* /*
* Append the data, including backup blocks if any * Append the data, including backup blocks if any
*/ */
rdata = &hdr_rdt;
while (write_len) while (write_len)
{ {
while (rdata->data == NULL) while (rdata->data == NULL)
...@@ -1171,7 +1188,7 @@ begin:; ...@@ -1171,7 +1188,7 @@ begin:;
/* normal case, ie not xlog switch */ /* normal case, ie not xlog switch */
/* Need to update shared LogwrtRqst if some block was filled up */ /* Need to update shared LogwrtRqst if some block was filled up */
if (freespace < SizeOfXLogRecord) if (freespace == 0)
{ {
/* curridx is filled and available for writing out */ /* curridx is filled and available for writing out */
updrqst = true; updrqst = true;
...@@ -2090,7 +2107,7 @@ XLogFlush(XLogRecPtr record) ...@@ -2090,7 +2107,7 @@ XLogFlush(XLogRecPtr record)
XLogCtlInsert *Insert = &XLogCtl->Insert; XLogCtlInsert *Insert = &XLogCtl->Insert;
uint32 freespace = INSERT_FREESPACE(Insert); uint32 freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord) /* buffer is full */ if (freespace == 0) /* buffer is full */
WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
else else
{ {
...@@ -3705,8 +3722,7 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) ...@@ -3705,8 +3722,7 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
} }
/* Finally include the record header */ /* Finally include the record header */
COMP_CRC32(crc, (char *) record + sizeof(pg_crc32), COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
SizeOfXLogRecord - sizeof(pg_crc32));
FIN_CRC32(crc); FIN_CRC32(crc);
if (!EQ_CRC32(record->xl_crc, crc)) if (!EQ_CRC32(record->xl_crc, crc))
...@@ -3736,13 +3752,13 @@ static XLogRecord * ...@@ -3736,13 +3752,13 @@ static XLogRecord *
ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
{ {
XLogRecord *record; XLogRecord *record;
char *buffer;
XLogRecPtr tmpRecPtr = EndRecPtr; XLogRecPtr tmpRecPtr = EndRecPtr;
bool randAccess = false; bool randAccess = false;
uint32 len, uint32 len,
total_len; total_len;
uint32 targetRecOff; uint32 targetRecOff;
uint32 pageHeaderSize; uint32 pageHeaderSize;
bool gotheader;
if (readBuf == NULL) if (readBuf == NULL)
{ {
...@@ -3762,17 +3778,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) ...@@ -3762,17 +3778,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
RecPtr = &tmpRecPtr; RecPtr = &tmpRecPtr;
/* /*
* RecPtr is pointing to end+1 of the previous WAL record. We must * RecPtr is pointing to end+1 of the previous WAL record. If
* advance it if necessary to where the next record starts. First, * we're at a page boundary, no more records can fit on the current
* align to next page if no more records can fit on the current page. * page. We must skip over the page header, but we can't do that
*/ * until we've read in the page, since the header size is variable.
if (XLOG_BLCKSZ - (RecPtr->xrecoff % XLOG_BLCKSZ) < SizeOfXLogRecord)
NextLogPage(*RecPtr);
/*
* If at page start, we must skip over the page header. But we can't
* do that until we've read in the page, since the header size is
* variable.
*/ */
} }
else else
...@@ -3793,7 +3802,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) ...@@ -3793,7 +3802,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt)
* to go backwards (but we can't reset that variable right here, since * to go backwards (but we can't reset that variable right here, since
* we might not change files at all). * we might not change files at all).
*/ */
lastPageTLI = 0; /* see comment in ValidXLOGHeader */ lastPageTLI = 0; /* see comment in ValidXLogPageHeader */
randAccess = true; /* allow curFileTLI to go backwards too */ randAccess = true; /* allow curFileTLI to go backwards too */
} }
...@@ -3833,76 +3842,15 @@ retry: ...@@ -3833,76 +3842,15 @@ retry:
RecPtr->xlogid, RecPtr->xrecoff))); RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
/* /*
* xl_len == 0 is bad data for everything except XLOG SWITCH, where it is * NB: Even though we use an XLogRecord pointer here, the whole record
* required. * header might not fit on this page. xl_tot_len is the first field in
*/ * struct, so it must be on this page, but we cannot safely access any
if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) * other fields yet.
{
if (record->xl_len != 0)
{
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("invalid xlog switch record at %X/%X",
RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
}
else if (record->xl_len == 0)
{
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("record with zero length at %X/%X",
RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
{
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("invalid record length at %X/%X",
RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
if (record->xl_rmid > RM_MAX_ID)
{
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("invalid resource manager ID %u at %X/%X",
record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
if (randAccess)
{
/*
* We can't exactly verify the prev-link, but surely it should be less
* than the record's own address.
*/
if (!XLByteLT(record->xl_prev, *RecPtr))
{
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("record with incorrect prev-link %X/%X at %X/%X",
record->xl_prev.xlogid, record->xl_prev.xrecoff,
RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
}
else
{
/*
* Record's prev-link should exactly match our previous location. This
* check guards against torn WAL pages where a stale but valid-looking
* WAL record starts on a sector boundary.
*/ */
if (!XLByteEQ(record->xl_prev, ReadRecPtr)) record = (XLogRecord *) (readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
{ total_len = record->xl_tot_len;
ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("record with incorrect prev-link %X/%X at %X/%X",
record->xl_prev.xlogid, record->xl_prev.xrecoff,
RecPtr->xlogid, RecPtr->xrecoff)));
goto next_record_is_invalid;
}
}
/* /*
* Allocate or enlarge readRecordBuf as needed. To avoid useless small * Allocate or enlarge readRecordBuf as needed. To avoid useless small
...@@ -3911,7 +3859,6 @@ retry: ...@@ -3911,7 +3859,6 @@ retry:
* enough for all "normal" records, but very large commit or abort records * enough for all "normal" records, but very large commit or abort records
* might need more space.) * might need more space.)
*/ */
total_len = record->xl_tot_len;
if (total_len > readRecordBufSize) if (total_len > readRecordBufSize)
{ {
uint32 newSize = total_len; uint32 newSize = total_len;
...@@ -3933,7 +3880,19 @@ retry: ...@@ -3933,7 +3880,19 @@ retry:
readRecordBufSize = newSize; readRecordBufSize = newSize;
} }
buffer = readRecordBuf; /*
* If we got the whole header already, validate it immediately. Otherwise
* we validate it after reading the rest of the header from the next page.
*/
if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
{
if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
goto next_record_is_invalid;
gotheader = true;
}
else
gotheader = false;
len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ; len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
if (total_len > len) if (total_len > len)
{ {
...@@ -3941,16 +3900,19 @@ retry: ...@@ -3941,16 +3900,19 @@ retry:
char *contrecord; char *contrecord;
XLogPageHeader pageHeader; XLogPageHeader pageHeader;
XLogRecPtr pagelsn; XLogRecPtr pagelsn;
uint32 gotlen = len; char *buffer;
uint32 gotlen;
/* Initialize pagelsn to the beginning of the page this record is on */ /* Initialize pagelsn to the beginning of the page this record is on */
pagelsn = *RecPtr; pagelsn = *RecPtr;
pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ; pagelsn.xrecoff = (pagelsn.xrecoff / XLOG_BLCKSZ) * XLOG_BLCKSZ;
memcpy(buffer, record, len); /* Copy the first fragment of the record from the first page. */
record = (XLogRecord *) buffer; memcpy(readRecordBuf, readBuf + RecPtr->xrecoff % XLOG_BLCKSZ, len);
buffer += len; buffer = readRecordBuf + len;
for (;;) gotlen = len;
do
{ {
/* Calculate pointer to beginning of next page */ /* Calculate pointer to beginning of next page */
XLByteAdvance(pagelsn, XLOG_BLCKSZ); XLByteAdvance(pagelsn, XLOG_BLCKSZ);
...@@ -3958,8 +3920,9 @@ retry: ...@@ -3958,8 +3920,9 @@ retry:
if (!XLogPageRead(&pagelsn, emode, false, false)) if (!XLogPageRead(&pagelsn, emode, false, false))
return NULL; return NULL;
/* Check that the continuation record looks valid */ /* Check that the continuation on next page looks valid */
if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD)) pageHeader = (XLogPageHeader) readBuf;
if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
{ {
ereport(emode_for_corrupt_record(emode, *RecPtr), ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("there is no contrecord flag in log segment %s, offset %u", (errmsg("there is no contrecord flag in log segment %s, offset %u",
...@@ -3967,14 +3930,13 @@ retry: ...@@ -3967,14 +3930,13 @@ retry:
readOff))); readOff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
pageHeader = (XLogPageHeader) readBuf; /*
pageHeaderSize = XLogPageHeaderSize(pageHeader); * Cross-check that xlp_rem_len agrees with how much of the record
contrecord = (char *) readBuf + pageHeaderSize; * we expect there to be left.
*/
if (pageHeader->xlp_rem_len == 0 || if (pageHeader->xlp_rem_len == 0 ||
total_len != (pageHeader->xlp_rem_len + gotlen)) total_len != (pageHeader->xlp_rem_len + gotlen))
{ {
char fname[MAXFNAMELEN];
XLogFileName(fname, curFileTLI, readSegNo);
ereport(emode_for_corrupt_record(emode, *RecPtr), ereport(emode_for_corrupt_record(emode, *RecPtr),
(errmsg("invalid contrecord length %u in log segment %s, offset %u", (errmsg("invalid contrecord length %u in log segment %s, offset %u",
pageHeader->xlp_rem_len, pageHeader->xlp_rem_len,
...@@ -3982,17 +3944,28 @@ retry: ...@@ -3982,17 +3944,28 @@ retry:
readOff))); readOff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
/* Append the continuation from this page to the buffer */
pageHeaderSize = XLogPageHeaderSize(pageHeader);
contrecord = (char *) readBuf + pageHeaderSize;
len = XLOG_BLCKSZ - pageHeaderSize; len = XLOG_BLCKSZ - pageHeaderSize;
if (pageHeader->xlp_rem_len > len) if (pageHeader->xlp_rem_len < len)
{ len = pageHeader->xlp_rem_len;
memcpy(buffer, (char *) contrecord, len); memcpy(buffer, (char *) contrecord, len);
gotlen += len;
buffer += len; buffer += len;
continue; gotlen += len;
}
memcpy(buffer, (char *) contrecord, pageHeader->xlp_rem_len); /* If we just reassembled the record header, validate it. */
break; if (!gotheader)
{
record = (XLogRecord *) readRecordBuf;
if (!ValidXLogRecordHeader(RecPtr, record, emode, randAccess))
goto next_record_is_invalid;
gotheader = true;
} }
} while (pageHeader->xlp_rem_len > len);
record = (XLogRecord *) readRecordBuf;
if (!RecordIsValid(record, *RecPtr, emode)) if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid; goto next_record_is_invalid;
pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf); pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
...@@ -4001,10 +3974,9 @@ retry: ...@@ -4001,10 +3974,9 @@ retry:
readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len), readOff + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len),
EndRecPtr); EndRecPtr);
ReadRecPtr = *RecPtr; ReadRecPtr = *RecPtr;
/* needn't worry about XLOG SWITCH, it can't cross page boundaries */
return record;
} }
else
{
/* Record does not cross a page boundary */ /* Record does not cross a page boundary */
if (!RecordIsValid(record, *RecPtr, emode)) if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid; goto next_record_is_invalid;
...@@ -4012,7 +3984,8 @@ retry: ...@@ -4012,7 +3984,8 @@ retry:
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
ReadRecPtr = *RecPtr; ReadRecPtr = *RecPtr;
memcpy(buffer, record, total_len); memcpy(readRecordBuf, record, total_len);
}
/* /*
* Special processing if it's an XLOG SWITCH record * Special processing if it's an XLOG SWITCH record
...@@ -4030,7 +4003,7 @@ retry: ...@@ -4030,7 +4003,7 @@ retry:
*/ */
readOff = XLogSegSize - XLOG_BLCKSZ; readOff = XLogSegSize - XLOG_BLCKSZ;
} }
return (XLogRecord *) buffer; return record;
next_record_is_invalid: next_record_is_invalid:
failedSources |= readSource; failedSources |= readSource;
...@@ -4055,7 +4028,7 @@ next_record_is_invalid: ...@@ -4055,7 +4028,7 @@ next_record_is_invalid:
* ReadRecord. It's not intended for use from anywhere else. * ReadRecord. It's not intended for use from anywhere else.
*/ */
static bool static bool
ValidXLOGHeader(XLogPageHeader hdr, int emode) ValidXLogPageHeader(XLogPageHeader hdr, int emode)
{ {
XLogRecPtr recaddr; XLogRecPtr recaddr;
...@@ -4173,6 +4146,88 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode) ...@@ -4173,6 +4146,88 @@ ValidXLOGHeader(XLogPageHeader hdr, int emode)
return true; return true;
} }
/*
 * Sanity-check the header of an XLOG record.
 *
 * RecPtr is the location the record was read from; it is used in the
 * messages and in the prev-link check.  record must point at a complete
 * record header.  emode is passed through emode_for_corrupt_record() to
 * choose the level at which problems are reported.  randAccess selects
 * the looser prev-link check (prev-link merely has to precede RecPtr)
 * instead of the exact comparison against ReadRecPtr.
 *
 * Returns true when every check passes.  On the first failing check the
 * problem is reported and false is returned.
 *
 * This exists only so that ReadRecord does not have to duplicate the
 * checks; it is not meant to be called from anywhere else.
 */
static bool
ValidXLogRecordHeader(XLogRecPtr *RecPtr, XLogRecord *record, int emode,
					  bool randAccess)
{
	bool		is_switch;
	bool		bad_prev;

	is_switch = (record->xl_rmid == RM_XLOG_ID &&
				 record->xl_info == XLOG_SWITCH);

	/* An XLOG SWITCH record is required to carry no rmgr data ... */
	if (is_switch && record->xl_len != 0)
	{
		ereport(emode_for_corrupt_record(emode, *RecPtr),
				(errmsg("invalid xlog switch record at %X/%X",
						RecPtr->xlogid, RecPtr->xrecoff)));
		return false;
	}

	/* ... whereas for every other record, xl_len == 0 is bad data. */
	if (!is_switch && record->xl_len == 0)
	{
		ereport(emode_for_corrupt_record(emode, *RecPtr),
				(errmsg("record with zero length at %X/%X",
						RecPtr->xlogid, RecPtr->xrecoff)));
		return false;
	}

	/*
	 * The total length must cover at least the header plus the rmgr data,
	 * and at most that plus the maximum number of backup blocks.
	 */
	if (!(record->xl_tot_len >= SizeOfXLogRecord + record->xl_len &&
		  record->xl_tot_len <= SizeOfXLogRecord + record->xl_len +
		  XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)))
	{
		ereport(emode_for_corrupt_record(emode, *RecPtr),
				(errmsg("invalid record length at %X/%X",
						RecPtr->xlogid, RecPtr->xrecoff)));
		return false;
	}

	/* The resource manager ID must be within the known range. */
	if (record->xl_rmid > RM_MAX_ID)
	{
		ereport(emode_for_corrupt_record(emode, *RecPtr),
				(errmsg("invalid resource manager ID %u at %X/%X",
						record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
		return false;
	}

	/*
	 * Check the prev-link.  With random access we cannot verify it exactly,
	 * but surely it should be less than the record's own address.
	 * Otherwise it should exactly match our previous location; that check
	 * guards against torn WAL pages where a stale but valid-looking WAL
	 * record starts on a sector boundary.
	 */
	if (randAccess)
		bad_prev = !XLByteLT(record->xl_prev, *RecPtr);
	else
		bad_prev = !XLByteEQ(record->xl_prev, ReadRecPtr);

	if (bad_prev)
	{
		ereport(emode_for_corrupt_record(emode, *RecPtr),
				(errmsg("record with incorrect prev-link %X/%X at %X/%X",
						record->xl_prev.xlogid, record->xl_prev.xrecoff,
						RecPtr->xlogid, RecPtr->xrecoff)));
		return false;
	}

	return true;
}
/* /*
* Try to read a timeline's history file. * Try to read a timeline's history file.
* *
...@@ -5182,8 +5237,7 @@ BootStrapXLOG(void) ...@@ -5182,8 +5237,7 @@ BootStrapXLOG(void)
INIT_CRC32(crc); INIT_CRC32(crc);
COMP_CRC32(crc, &checkPoint, sizeof(checkPoint)); COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
COMP_CRC32(crc, (char *) record + sizeof(pg_crc32), COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
SizeOfXLogRecord - sizeof(pg_crc32));
FIN_CRC32(crc); FIN_CRC32(crc);
record->xl_crc = crc; record->xl_crc = crc;
...@@ -7720,7 +7774,7 @@ CreateCheckPoint(int flags) ...@@ -7720,7 +7774,7 @@ CreateCheckPoint(int flags)
* checkpoint, even though physically before it. Got that? * checkpoint, even though physically before it. Got that?
*/ */
freespace = INSERT_FREESPACE(Insert); freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord) if (freespace == 0)
{ {
(void) AdvanceXLInsertBuffer(false); (void) AdvanceXLInsertBuffer(false);
/* OK to ignore update return flag, since we will do flush anyway */ /* OK to ignore update return flag, since we will do flush anyway */
...@@ -10285,7 +10339,7 @@ retry: ...@@ -10285,7 +10339,7 @@ retry:
fname, readOff))); fname, readOff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid; goto next_record_is_invalid;
} }
...@@ -10311,7 +10365,7 @@ retry: ...@@ -10311,7 +10365,7 @@ retry:
fname, readOff))); fname, readOff)));
goto next_record_is_invalid; goto next_record_is_invalid;
} }
if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode)) if (!ValidXLogPageHeader((XLogPageHeader) readBuf, emode))
goto next_record_is_invalid; goto next_record_is_invalid;
Assert(targetSegNo == readSegNo); Assert(targetSegNo == readSegNo);
......
...@@ -942,8 +942,7 @@ WriteEmptyXLOG(void) ...@@ -942,8 +942,7 @@ WriteEmptyXLOG(void)
INIT_CRC32(crc); INIT_CRC32(crc);
COMP_CRC32(crc, &ControlFile.checkPointCopy, sizeof(CheckPoint)); COMP_CRC32(crc, &ControlFile.checkPointCopy, sizeof(CheckPoint));
COMP_CRC32(crc, (char *) record + sizeof(pg_crc32), COMP_CRC32(crc, (char *) record, offsetof(XLogRecord, xl_crc));
SizeOfXLogRecord - sizeof(pg_crc32));
FIN_CRC32(crc); FIN_CRC32(crc);
record->xl_crc = crc; record->xl_crc = crc;
......
...@@ -40,15 +40,16 @@ ...@@ -40,15 +40,16 @@
*/ */
typedef struct XLogRecord typedef struct XLogRecord
{ {
pg_crc32 xl_crc; /* CRC for this record */
XLogRecPtr xl_prev; /* ptr to previous record in log */
TransactionId xl_xid; /* xact id */
uint32 xl_tot_len; /* total len of entire record */ uint32 xl_tot_len; /* total len of entire record */
TransactionId xl_xid; /* xact id */
uint32 xl_len; /* total len of rmgr data */ uint32 xl_len; /* total len of rmgr data */
uint8 xl_info; /* flag bits, see below */ uint8 xl_info; /* flag bits, see below */
RmgrId xl_rmid; /* resource manager for this record */ RmgrId xl_rmid; /* resource manager for this record */
/* 2 bytes of padding here, initialize to zero */
XLogRecPtr xl_prev; /* ptr to previous record in log */
pg_crc32 xl_crc; /* CRC for this record */
/* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */ /* If MAXALIGN==8, there are 4 wasted bytes here */
/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */ /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
......
...@@ -51,7 +51,7 @@ typedef struct BkpBlock ...@@ -51,7 +51,7 @@ typedef struct BkpBlock
/* /*
* Each page of XLOG file has a header like this: * Each page of XLOG file has a header like this:
*/ */
#define XLOG_PAGE_MAGIC 0xD073 /* can be used as WAL version indicator */ #define XLOG_PAGE_MAGIC 0xD074 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData typedef struct XLogPageHeaderData
{ {
...@@ -63,9 +63,7 @@ typedef struct XLogPageHeaderData ...@@ -63,9 +63,7 @@ typedef struct XLogPageHeaderData
/* /*
* When there is not enough space on current page for whole record, we * When there is not enough space on current page for whole record, we
* continue on the next page. xlp_rem_len is the number of bytes * continue on the next page. xlp_rem_len is the number of bytes
* remaining from a previous page. (However, the XLogRecord header will * remaining from a previous page.
* never be split across pages; if there's less than SizeOfXLogRecord
* space left at the end of a page, we just waste it.)
* *
* Note that xlp_rem_len includes backup-block data; that is, it tracks * Note that xlp_rem_len includes backup-block data; that is, it tracks
* xl_tot_len not xl_len in the initial header. Also note that the * xl_tot_len not xl_len in the initial header. Also note that the
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment