Commit 4016bdef authored by Tom Lane's avatar Tom Lane

Fix assorted bugs in GIN's WAL replay logic.

The original coding was quite sloppy about handling the case where
XLogReadBuffer fails (because the page has since been deleted).  This
would result in either "bad buffer id: 0" or an Assert failure during
replay, if indeed the page were no longer there.  In a couple of places
it also neglected to check whether the change had already been applied,
which would probably result in corrupted index contents.  I believe that
bug #5703 is an instance of the first problem.  These issues could show up
without replication, but only if you were unfortunate enough to crash
between modification of a GIN index and the next checkpoint.

Back-patch to 8.2, which is as far back as GIN has WAL support.
parent 220e45bf
...@@ -77,11 +77,13 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) ...@@ -77,11 +77,13 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true); MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
Assert(BufferIsValid(MetaBuffer)); Assert(BufferIsValid(MetaBuffer));
page = (Page) BufferGetPage(MetaBuffer);
GinInitMetabuffer(MetaBuffer); GinInitMetabuffer(MetaBuffer);
page = (Page) BufferGetPage(MetaBuffer);
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(MetaBuffer);
RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true); RootBuffer = XLogReadBuffer(*node, GIN_ROOT_BLKNO, true);
Assert(BufferIsValid(RootBuffer)); Assert(BufferIsValid(RootBuffer));
...@@ -91,11 +93,10 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) ...@@ -91,11 +93,10 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(MetaBuffer);
UnlockReleaseBuffer(MetaBuffer);
MarkBufferDirty(RootBuffer); MarkBufferDirty(RootBuffer);
UnlockReleaseBuffer(RootBuffer); UnlockReleaseBuffer(RootBuffer);
UnlockReleaseBuffer(MetaBuffer);
} }
static void static void
...@@ -128,21 +129,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) ...@@ -128,21 +129,49 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer; Buffer buffer;
Page page; Page page;
/* first, forget any incomplete split this insertion completes */
if (data->isData)
{
Assert(data->isDelete == FALSE);
if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
{
PostingItem *pitem;
pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node,
PostingItemGetBlockNumber(pitem),
data->updateBlkno);
}
}
else
{
if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
{
IndexTuple itup;
itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node,
GinItemPointerGetBlockNumber(&itup->t_tid),
data->updateBlkno);
}
}
/* nothing else to do if page was backed up */ /* nothing else to do if page was backed up */
if (record->xl_info & XLR_BKP_BLOCK_1) if (record->xl_info & XLR_BKP_BLOCK_1)
return; return;
buffer = XLogReadBuffer(data->node, data->blkno, false); buffer = XLogReadBuffer(data->node, data->blkno, false);
Assert(BufferIsValid(buffer)); if (!BufferIsValid(buffer))
return; /* page was deleted, nothing to do */
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
if (data->isData) if (data->isData)
{ {
Assert(data->isDelete == FALSE);
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
if (!XLByteLE(lsn, PageGetLSN(page)))
{
if (data->isLeaf) if (data->isLeaf)
{ {
OffsetNumber i; OffsetNumber i;
...@@ -172,23 +201,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) ...@@ -172,23 +201,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
GinDataPageAddItem(page, pitem, data->offset); GinDataPageAddItem(page, pitem, data->offset);
} }
} }
if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
{
PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
}
}
else else
{ {
IndexTuple itup; IndexTuple itup;
Assert(!GinPageIsData(page)); Assert(!GinPageIsData(page));
if (!XLByteLE(lsn, PageGetLSN(page)))
{
if (data->updateBlkno != InvalidBlockNumber) if (data->updateBlkno != InvalidBlockNumber)
{ {
/* update link to right page after split */ /* update link to right page after split */
...@@ -212,20 +230,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) ...@@ -212,20 +230,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
data->node.spcNode, data->node.dbNode, data->node.relNode); data->node.spcNode, data->node.dbNode, data->node.relNode);
} }
if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
{
itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
}
}
if (!XLByteLE(lsn, PageGetLSN(page)))
{
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
} }
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
...@@ -244,7 +254,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) ...@@ -244,7 +254,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (data->isData) if (data->isData)
flags |= GIN_DATA; flags |= GIN_DATA;
lbuffer = XLogReadBuffer(data->node, data->lblkno, data->isRootSplit); lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer)); Assert(BufferIsValid(lbuffer));
lpage = (Page) BufferGetPage(lbuffer); lpage = (Page) BufferGetPage(lbuffer);
GinInitBuffer(lbuffer, flags); GinInitBuffer(lbuffer, flags);
...@@ -321,7 +331,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) ...@@ -321,7 +331,7 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (data->isRootSplit) if (data->isRootSplit)
{ {
Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, false); Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
Page rootPage = BufferGetPage(rootBuf); Page rootPage = BufferGetPage(rootBuf);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF); GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
...@@ -357,14 +367,17 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) ...@@ -357,14 +367,17 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer; Buffer buffer;
Page page; Page page;
/* nothing else to do if page was backed up (and no info to do it with) */ /* nothing to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1) if (record->xl_info & XLR_BKP_BLOCK_1)
return; return;
buffer = XLogReadBuffer(data->node, data->blkno, false); buffer = XLogReadBuffer(data->node, data->blkno, false);
Assert(BufferIsValid(buffer)); if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
if (GinPageIsData(page)) if (GinPageIsData(page))
{ {
memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage), memcpy(GinDataPageGetData(page), XLogRecGetData(record) + sizeof(ginxlogVacuumPage),
...@@ -394,8 +407,9 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) ...@@ -394,8 +407,9 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
...@@ -409,39 +423,57 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) ...@@ -409,39 +423,57 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
if (!(record->xl_info & XLR_BKP_BLOCK_1)) if (!(record->xl_info & XLR_BKP_BLOCK_1))
{ {
buffer = XLogReadBuffer(data->node, data->blkno, false); buffer = XLogReadBuffer(data->node, data->blkno, false);
if (BufferIsValid(buffer))
{
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED; GinPageGetOpaque(page)->flags = GIN_DELETED;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
}
if (!(record->xl_info & XLR_BKP_BLOCK_2)) if (!(record->xl_info & XLR_BKP_BLOCK_2))
{ {
buffer = XLogReadBuffer(data->node, data->parentBlkno, false); buffer = XLogReadBuffer(data->node, data->parentBlkno, false);
if (BufferIsValid(buffer))
{
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
Assert(!GinPageIsLeaf(page)); Assert(!GinPageIsLeaf(page));
PageDeletePostingItem(page, data->parentOffset); PageDeletePostingItem(page, data->parentOffset);
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
}
if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber) if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber)
{ {
buffer = XLogReadBuffer(data->node, data->leftBlkno, false); buffer = XLogReadBuffer(data->node, data->leftBlkno, false);
if (BufferIsValid(buffer))
{
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
if (!XLByteLE(lsn, PageGetLSN(page)))
{
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink; GinPageGetOpaque(page)->rightlink = data->rightLink;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
}
} }
static void static void
...@@ -450,8 +482,11 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) ...@@ -450,8 +482,11 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record); ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
Buffer metabuffer; Buffer metabuffer;
Page metapage; Page metapage;
Buffer buffer;
metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false); metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
if (!BufferIsValid(metabuffer))
elog(PANIC, "GIN metapage disappeared");
metapage = BufferGetPage(metabuffer); metapage = BufferGetPage(metabuffer);
if (!XLByteLE(lsn, PageGetLSN(metapage))) if (!XLByteLE(lsn, PageGetLSN(metapage)))
...@@ -469,7 +504,9 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) ...@@ -469,7 +504,9 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
*/ */
if (!(record->xl_info & XLR_BKP_BLOCK_1)) if (!(record->xl_info & XLR_BKP_BLOCK_1))
{ {
Buffer buffer = XLogReadBuffer(data->node, data->metadata.tail, false); buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
if (BufferIsValid(buffer))
{
Page page = BufferGetPage(buffer); Page page = BufferGetPage(buffer);
if (!XLByteLE(lsn, PageGetLSN(page))) if (!XLByteLE(lsn, PageGetLSN(page)))
...@@ -505,13 +542,15 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) ...@@ -505,13 +542,15 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
} }
}
else if (data->prevTail != InvalidBlockNumber) else if (data->prevTail != InvalidBlockNumber)
{ {
/* /*
* New tail * New tail
*/ */
buffer = XLogReadBuffer(data->node, data->prevTail, false);
Buffer buffer = XLogReadBuffer(data->node, data->prevTail, false); if (BufferIsValid(buffer))
{
Page page = BufferGetPage(buffer); Page page = BufferGetPage(buffer);
if (!XLByteLE(lsn, PageGetLSN(page))) if (!XLByteLE(lsn, PageGetLSN(page)))
...@@ -524,6 +563,7 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) ...@@ -524,6 +563,7 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
} }
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
}
UnlockReleaseBuffer(metabuffer); UnlockReleaseBuffer(metabuffer);
} }
...@@ -544,6 +584,7 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record) ...@@ -544,6 +584,7 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
return; return;
buffer = XLogReadBuffer(data->node, data->blkno, true); buffer = XLogReadBuffer(data->node, data->blkno, true);
Assert(BufferIsValid(buffer));
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
GinInitBuffer(buffer, GIN_LIST); GinInitBuffer(buffer, GIN_LIST);
...@@ -587,6 +628,8 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) ...@@ -587,6 +628,8 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
int i; int i;
metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false); metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
if (!BufferIsValid(metabuffer))
elog(PANIC, "GIN metapage disappeared");
metapage = BufferGetPage(metabuffer); metapage = BufferGetPage(metabuffer);
if (!XLByteLE(lsn, PageGetLSN(metapage))) if (!XLByteLE(lsn, PageGetLSN(metapage)))
...@@ -600,6 +643,9 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) ...@@ -600,6 +643,9 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
for (i = 0; i < data->ndeleted; i++) for (i = 0; i < data->ndeleted; i++)
{ {
Buffer buffer = XLogReadBuffer(data->node, data->toDelete[i], false); Buffer buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
if (BufferIsValid(buffer))
{
Page page = BufferGetPage(buffer); Page page = BufferGetPage(buffer);
if (!XLByteLE(lsn, PageGetLSN(page))) if (!XLByteLE(lsn, PageGetLSN(page)))
...@@ -613,6 +659,7 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) ...@@ -613,6 +659,7 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
}
UnlockReleaseBuffer(metabuffer); UnlockReleaseBuffer(metabuffer);
} }
...@@ -755,6 +802,13 @@ ginContinueSplit(ginIncompleteSplit *split) ...@@ -755,6 +802,13 @@ ginContinueSplit(ginIncompleteSplit *split)
*/ */
buffer = XLogReadBuffer(split->node, split->leftBlkno, false); buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
/*
* Failure should be impossible here, because we wrote the page earlier.
*/
if (!BufferIsValid(buffer))
elog(PANIC, "ginContinueSplit: left block %u not found",
split->leftBlkno);
reln = CreateFakeRelcacheEntry(split->node); reln = CreateFakeRelcacheEntry(split->node);
if (split->rootBlkno == GIN_ROOT_BLKNO) if (split->rootBlkno == GIN_ROOT_BLKNO)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment