Commit bf5ccf38 authored by Teodor Sigaev's avatar Teodor Sigaev

- Add check of already changed page while replay WAL. This touches only

ginRedoInsert(), because other ginRedo* functions rewrite whole page or
make changes which could be applied several times without consistent's loss

- Remove check of identifying of corresponding split record:
it's possible that replaying of WAL starts after actual page split, but before
removing of that split from incomplete splits list. In this case, that check
cause FATAL error.

Per stress test which reproduces bug reported by Craig McElroy
<craig.mcelroy@contegix.com>
parent f7967d4c
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.9 2007/09/20 17:56:30 tgl Exp $ * $PostgreSQL: pgsql/src/backend/access/gin/ginxlog.c,v 1.10 2007/10/29 19:26:57 teodor Exp $
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
...@@ -53,7 +53,6 @@ static void ...@@ -53,7 +53,6 @@ static void
forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno) forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
{ {
ListCell *l; ListCell *l;
bool found = false;
foreach(l, incomplete_splits) foreach(l, incomplete_splits)
{ {
...@@ -62,16 +61,9 @@ forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updat ...@@ -62,16 +61,9 @@ forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updat
if (RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno) if (RelFileNodeEquals(node, split->node) && leftBlkno == split->leftBlkno && updateBlkno == split->rightBlkno)
{ {
incomplete_splits = list_delete_ptr(incomplete_splits, split); incomplete_splits = list_delete_ptr(incomplete_splits, split);
found = true;
break; break;
} }
} }
if (!found)
{
elog(ERROR, "failed to identify corresponding split record for %u/%u/%u",
node.relNode, leftBlkno, updateBlkno);
}
} }
static void static void
...@@ -129,7 +121,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) ...@@ -129,7 +121,7 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer; Buffer buffer;
Page page; Page page;
/* nothing else to do if page was backed up (and no info to do it with) */ /* nothing else to do if page was backed up */
if (record->xl_info & XLR_BKP_BLOCK_1) if (record->xl_info & XLR_BKP_BLOCK_1)
return; return;
...@@ -143,37 +135,44 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) ...@@ -143,37 +135,44 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
Assert(data->isDelete == FALSE); Assert(data->isDelete == FALSE);
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
if (data->isLeaf) if ( ! XLByteLE(lsn, PageGetLSN(page)) )
{ {
OffsetNumber i; if (data->isLeaf)
ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); {
OffsetNumber i;
ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
Assert(GinPageIsLeaf(page)); Assert(GinPageIsLeaf(page));
Assert(data->updateBlkno == InvalidBlockNumber); Assert(data->updateBlkno == InvalidBlockNumber);
for (i = 0; i < data->nitem; i++) for (i = 0; i < data->nitem; i++)
GinDataPageAddItem(page, items + i, data->offset + i); GinDataPageAddItem(page, items + i, data->offset + i);
} }
else else
{ {
PostingItem *pitem; PostingItem *pitem;
Assert(!GinPageIsLeaf(page)); Assert(!GinPageIsLeaf(page));
if (data->updateBlkno != InvalidBlockNumber) if (data->updateBlkno != InvalidBlockNumber)
{ {
/* update link to right page after split */ /* update link to right page after split */
pitem = (PostingItem *) GinDataPageGetItem(page, data->offset); pitem = (PostingItem *) GinDataPageGetItem(page, data->offset);
PostingItemSetBlockNumber(pitem, data->updateBlkno); PostingItemSetBlockNumber(pitem, data->updateBlkno);
} }
pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert)); pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
GinDataPageAddItem(page, pitem, data->offset); GinDataPageAddItem(page, pitem, data->offset);
}
}
if (data->updateBlkno != InvalidBlockNumber) if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno); {
PostingItem *pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node, PostingItemGetBlockNumber(pitem), data->updateBlkno);
} }
} }
else else
{ {
...@@ -181,36 +180,45 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) ...@@ -181,36 +180,45 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
Assert(!GinPageIsData(page)); Assert(!GinPageIsData(page));
if (data->updateBlkno != InvalidBlockNumber) if ( ! XLByteLE(lsn, PageGetLSN(page)) )
{ {
/* update link to right page after split */ if (data->updateBlkno != InvalidBlockNumber)
Assert(!GinPageIsLeaf(page)); {
Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page)); /* update link to right page after split */
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset)); Assert(!GinPageIsLeaf(page));
ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber); Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
} itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
ItemPointerSet(&itup->t_tid, data->updateBlkno, InvalidOffsetNumber);
}
if (data->isDelete) if (data->isDelete)
{ {
Assert(GinPageIsLeaf(page)); Assert(GinPageIsLeaf(page));
Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page)); Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
PageIndexTupleDelete(page, data->offset); PageIndexTupleDelete(page, data->offset);
} }
itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert)); itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber) if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page in %u/%u/%u", elog(ERROR, "failed to add item to index page in %u/%u/%u",
data->node.spcNode, data->node.dbNode, data->node.relNode); data->node.spcNode, data->node.dbNode, data->node.relNode);
}
if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber) if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
{
itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno); forgetIncompleteSplit(data->node, GinItemPointerGetBlockNumber(&itup->t_tid), data->updateBlkno);
}
} }
PageSetLSN(page, lsn); if ( ! XLByteLE(lsn, PageGetLSN(page)) )
PageSetTLI(page, ThisTimeLineID); {
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment