Commit 59c02a36 authored by Heikki Linnakangas

Fix assert failure at end of recovery, broken by XLogInsert scaling patch.

Initialization of the first XLOG buffer at end-of-recovery was broken for
the case that the last read WAL record ended at a page boundary. Instead of
trying to copy the last full xlog page to the buffer cache in that case,
just set shared state so that the next page is initialized when the first
WAL record after startup is inserted. (That's what we did in earlier
versions, too.)

To make the shared state required for that case less surprising, replace the
XLogCtl->curridx variable, which was the index of the latest initialized
buffer, with an XLogRecPtr of how far the buffers have been initialized.
That also allows us to get rid of the XLogRecEndPtrToBufIdx macro.

While we're at it, make a similar change for XLogCtl->Write.curridx, getting
rid of that variable and calculating the next buffer to write from
XLogCtl->LogwrtResult instead.
parent 3f2adace
...@@ -457,15 +457,6 @@ typedef struct XLogCtlInsert ...@@ -457,15 +457,6 @@ typedef struct XLogCtlInsert
XLogRecPtr lastBackupStart; XLogRecPtr lastBackupStart;
} XLogCtlInsert; } XLogCtlInsert;
/*
* Shared state data for XLogWrite/XLogFlush.
*
* This is the definition from the removed side of the diff. The struct is
* embedded in XLogCtlData as the "Write" member, which is annotated there
* as protected by WALWriteLock.
*/
typedef struct XLogCtlWrite
{
/*
* Cache index of next block to write. XLogWrite() starts its loop at this
* buffer; after writing, it leaves the index on the same buffer if the
* last page written was partial, and moves it to the next buffer otherwise.
*/
int curridx;
/*
* Time of last xlog segment switch. Set to the current time by XLogWrite()
* and by StartupXLOG() (to start the archive_timeout timer running), and
* read by GetLastSegSwitchTime() while holding WALWriteLock in shared mode.
*/
pg_time_t lastSegSwitchTime;
} XLogCtlWrite;
/* /*
* Total shared-memory state for XLOG. * Total shared-memory state for XLOG.
*/ */
...@@ -482,12 +473,12 @@ typedef struct XLogCtlData ...@@ -482,12 +473,12 @@ typedef struct XLogCtlData
XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG XLogSegNo lastRemovedSegNo; /* latest removed/recycled XLOG
* segment */ * segment */
/* Fake LSN counter, for unlogged relations. Protected by ulsn_lck */ /* Fake LSN counter, for unlogged relations. Protected by ulsn_lck. */
XLogRecPtr unloggedLSN; XLogRecPtr unloggedLSN;
slock_t ulsn_lck; slock_t ulsn_lck;
/* Protected by WALWriteLock: */ /* Time of last xlog segment switch. Protected by WALWriteLock. */
XLogCtlWrite Write; pg_time_t lastSegSwitchTime;
/* /*
* Protected by info_lck and WALWriteLock (you must hold either lock to * Protected by info_lck and WALWriteLock (you must hold either lock to
...@@ -496,15 +487,15 @@ typedef struct XLogCtlData ...@@ -496,15 +487,15 @@ typedef struct XLogCtlData
XLogwrtResult LogwrtResult; XLogwrtResult LogwrtResult;
/* /*
* Latest initialized block index in cache. * Latest initialized page in the cache (last byte position + 1).
* *
* To change curridx and the identity of a buffer, you need to hold * To change the identity of a buffer (and InitializedUpTo), you need to
* WALBufMappingLock. To change the identity of a buffer that's still * hold WALBufMappingLock. To change the identity of a buffer that's still
* dirty, the old page needs to be written out first, and for that you * dirty, the old page needs to be written out first, and for that you
* need WALWriteLock, and you need to ensure that there are no in-progress * need WALWriteLock, and you need to ensure that there are no in-progress
* insertions to the page by calling WaitXLogInsertionsToFinish(). * insertions to the page by calling WaitXLogInsertionsToFinish().
*/ */
int curridx; XLogRecPtr InitializedUpTo;
/* /*
* These values do not change after startup, although the pointed-to pages * These values do not change after startup, although the pointed-to pages
...@@ -618,16 +609,10 @@ static ControlFileData *ControlFile = NULL; ...@@ -618,16 +609,10 @@ static ControlFileData *ControlFile = NULL;
/* /*
* XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or
* would hold if it was in cache, the page containing 'recptr'. * would hold if it was in cache, the page containing 'recptr'.
*
* XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a
* page is taken to mean the previous page.
*/ */
#define XLogRecPtrToBufIdx(recptr) \ #define XLogRecPtrToBufIdx(recptr) \
(((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1)) (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
#define XLogRecEndPtrToBufIdx(recptr) \
((((recptr) - 1) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1))
/* /*
* These are the number of bytes in a WAL page and segment usable for WAL data. * These are the number of bytes in a WAL page and segment usable for WAL data.
*/ */
...@@ -2409,9 +2394,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) ...@@ -2409,9 +2394,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
* Now that we have the lock, check if someone initialized the page * Now that we have the lock, check if someone initialized the page
* already. * already.
*/ */
while (upto >= XLogCtl->xlblocks[XLogCtl->curridx] || opportunistic) while (upto >= XLogCtl->InitializedUpTo || opportunistic)
{ {
nextidx = NextBufIdx(XLogCtl->curridx); nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
/* /*
* Get ending-offset of the buffer page we need to replace (this may * Get ending-offset of the buffer page we need to replace (this may
...@@ -2484,11 +2469,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) ...@@ -2484,11 +2469,9 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
* Now the next buffer slot is free and we can set it up to be the next * Now the next buffer slot is free and we can set it up to be the next
* output page. * output page.
*/ */
NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx]; NewPageBeginPtr = XLogCtl->InitializedUpTo;
NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ; NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ;
Assert(NewPageEndPtr % XLOG_BLCKSZ == 0);
Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx);
Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx); Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx);
NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
...@@ -2547,7 +2530,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) ...@@ -2547,7 +2530,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
*((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr; *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr;
XLogCtl->curridx = nextidx; XLogCtl->InitializedUpTo = NewPageEndPtr;
npages++; npages++;
} }
...@@ -2598,7 +2581,6 @@ XLogCheckpointNeeded(XLogSegNo new_segno) ...@@ -2598,7 +2581,6 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
static void static void
XLogWrite(XLogwrtRqst WriteRqst, bool flexible) XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
{ {
XLogCtlWrite *Write = &XLogCtl->Write;
bool ispartialpage; bool ispartialpage;
bool last_iteration; bool last_iteration;
bool finishing_seg; bool finishing_seg;
...@@ -2631,12 +2613,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) ...@@ -2631,12 +2613,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
/* /*
* Within the loop, curridx is the cache block index of the page to * Within the loop, curridx is the cache block index of the page to
* consider writing. We advance Write->curridx only after successfully * consider writing. Begin at the buffer containing the next unwritten
* writing pages. (Right now, this refinement is useless since we are * page, or last partially written page.
* going to PANIC if any error occurs anyway; but someday it may come in
* useful.)
*/ */
curridx = Write->curridx; curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
while (LogwrtResult.Write < WriteRqst.Write) while (LogwrtResult.Write < WriteRqst.Write)
{ {
...@@ -2747,7 +2727,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) ...@@ -2747,7 +2727,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
/* Update state for write */ /* Update state for write */
openLogOff += nbytes; openLogOff += nbytes;
Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
npages = 0; npages = 0;
/* /*
...@@ -2775,7 +2754,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) ...@@ -2775,7 +2754,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
if (XLogArchivingActive()) if (XLogArchivingActive())
XLogArchiveNotifySeg(openLogSegNo); XLogArchiveNotifySeg(openLogSegNo);
Write->lastSegSwitchTime = (pg_time_t) time(NULL); XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
/* /*
* Request a checkpoint if we've consumed too much xlog since * Request a checkpoint if we've consumed too much xlog since
...@@ -2807,7 +2786,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible) ...@@ -2807,7 +2786,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
} }
Assert(npages == 0); Assert(npages == 0);
Assert(curridx == Write->curridx);
/* /*
* If asked to flush, do so * If asked to flush, do so
...@@ -6021,12 +5999,10 @@ StartupXLOG(void) ...@@ -6021,12 +5999,10 @@ StartupXLOG(void)
XLogSegNo endLogSegNo; XLogSegNo endLogSegNo;
TimeLineID PrevTimeLineID; TimeLineID PrevTimeLineID;
XLogRecord *record; XLogRecord *record;
uint32 freespace;
TransactionId oldestActiveXID; TransactionId oldestActiveXID;
bool backupEndRequired = false; bool backupEndRequired = false;
bool backupFromStandby = false; bool backupFromStandby = false;
DBState dbstate_at_startup; DBState dbstate_at_startup;
int firstIdx;
XLogReaderState *xlogreader; XLogReaderState *xlogreader;
XLogPageReadPrivate private; XLogPageReadPrivate private;
bool fast_promoted = false; bool fast_promoted = false;
...@@ -7034,48 +7010,51 @@ StartupXLOG(void) ...@@ -7034,48 +7010,51 @@ StartupXLOG(void)
openLogOff = 0; openLogOff = 0;
Insert = &XLogCtl->Insert; Insert = &XLogCtl->Insert;
Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec); Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec);
Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
firstIdx = XLogRecEndPtrToBufIdx(EndOfLog);
XLogCtl->curridx = firstIdx;
XLogCtl->xlblocks[firstIdx] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
/* /*
* Tricky point here: readBuf contains the *last* block that the LastRec * Tricky point here: readBuf contains the *last* block that the LastRec
* record spans, not the one it starts in. The last block is indeed the * record spans, not the one it starts in. The last block is indeed the
* one we want to use. * one we want to use.
*/ */
Assert(readOff == (XLogCtl->xlblocks[firstIdx] - XLOG_BLCKSZ) % XLogSegSize); if (EndOfLog % XLOG_BLCKSZ != 0)
memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], xlogreader->readBuf, XLOG_BLCKSZ); {
Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog); char *page;
int len;
int firstIdx;
XLogRecPtr pageBeginPtr;
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; pageBeginPtr = EndOfLog - (EndOfLog % XLOG_BLCKSZ);
Assert(readOff == pageBeginPtr % XLogSegSize);
XLogCtl->LogwrtResult = LogwrtResult; firstIdx = XLogRecPtrToBufIdx(EndOfLog);
XLogCtl->LogwrtRqst.Write = EndOfLog; /* Copy the valid part of the last block, and zero the rest */
XLogCtl->LogwrtRqst.Flush = EndOfLog; page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ];
len = EndOfLog % XLOG_BLCKSZ;
memcpy(page, xlogreader->readBuf, len);
memset(page + len, 0, XLOG_BLCKSZ - len);
freespace = INSERT_FREESPACE(EndOfLog); XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ;
if (freespace > 0) XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ;
{
/* Make sure rest of page is zero */
MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog % XLOG_BLCKSZ, 0, freespace);
XLogCtl->Write.curridx = firstIdx;
} }
else else
{ {
/* /*
* Whenever LogwrtResult points to exactly the end of a page, * There is no partial block to copy. Just set InitializedUpTo,
* Write.curridx must point to the *next* page (see XLogWrite()). * and let the first attempt to insert a log record to initialize
* * the next buffer.
* Note: it might seem we should do AdvanceXLInsertBuffer() here, but
* this is sufficient. The first actual attempt to insert a log
* record will advance the insert state.
*/ */
XLogCtl->Write.curridx = NextBufIdx(firstIdx); XLogCtl->InitializedUpTo = EndOfLog;
} }
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
XLogCtl->LogwrtResult = LogwrtResult;
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
/* Pre-scan prepared transactions to find out the range of XIDs present */ /* Pre-scan prepared transactions to find out the range of XIDs present */
oldestActiveXID = PrescanPreparedTransactions(NULL, NULL); oldestActiveXID = PrescanPreparedTransactions(NULL, NULL);
...@@ -7199,7 +7178,7 @@ StartupXLOG(void) ...@@ -7199,7 +7178,7 @@ StartupXLOG(void)
LWLockRelease(ControlFileLock); LWLockRelease(ControlFileLock);
/* start the archive_timeout timer running */ /* start the archive_timeout timer running */
XLogCtl->Write.lastSegSwitchTime = (pg_time_t) time(NULL); XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
/* also initialize latestCompletedXid, to nextXid - 1 */ /* also initialize latestCompletedXid, to nextXid - 1 */
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
...@@ -7710,7 +7689,7 @@ GetLastSegSwitchTime(void) ...@@ -7710,7 +7689,7 @@ GetLastSegSwitchTime(void)
/* Need WALWriteLock, but shared lock is sufficient */ /* Need WALWriteLock, but shared lock is sufficient */
LWLockAcquire(WALWriteLock, LW_SHARED); LWLockAcquire(WALWriteLock, LW_SHARED);
result = XLogCtl->Write.lastSegSwitchTime; result = XLogCtl->lastSegSwitchTime;
LWLockRelease(WALWriteLock); LWLockRelease(WALWriteLock);
return result; return result;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment