Commit 5890790b authored by Teodor Sigaev

Rework completion of incomplete inserts. Now it writes WAL log during inserts.
parent 19892feb
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.21 2006/05/17 16:34:59 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.22 2006/05/19 11:10:25 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -104,19 +104,25 @@ gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) { ...@@ -104,19 +104,25 @@ gistDeleteSubtree( GistVacuum *gv, BlockNumber blkno ) {
if (!gv->index->rd_istemp) if (!gv->index->rd_istemp)
{ {
XLogRecData rdata; XLogRecData rdata[2];
XLogRecPtr recptr; XLogRecPtr recptr;
gistxlogPageDelete xlrec; gistxlogPageDelete xlrec;
xlrec.node = gv->index->rd_node; xlrec.node = gv->index->rd_node;
xlrec.blkno = blkno; xlrec.blkno = blkno;
rdata.buffer = InvalidBuffer; rdata[0].buffer = buffer;
rdata.data = (char *) &xlrec; rdata[0].buffer_std = true;
rdata.len = sizeof(gistxlogPageDelete); rdata[0].data = NULL;
rdata.next = NULL; rdata[0].len = 0;
rdata[0].next = &(rdata[1]);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, &rdata); rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) &xlrec;
rdata[1].len = sizeof(gistxlogPageDelete);
rdata[1].next = NULL;
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_DELETE, rdata);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
} }
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.17 2006/05/17 16:34:59 teodor Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.18 2006/05/19 11:10:25 teodor Exp $
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
...@@ -73,8 +73,18 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, ...@@ -73,8 +73,18 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key,
BlockNumber *blkno, int lenblk, BlockNumber *blkno, int lenblk,
PageSplitRecord *xlinfo /* to extract blkno info */ ) PageSplitRecord *xlinfo /* to extract blkno info */ )
{ {
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx); MemoryContext oldCxt;
gistIncompleteInsert *ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert)); gistIncompleteInsert *ninsert;
if ( !ItemPointerIsValid(&key) )
/*
* If the key is invalid we must not record the insertion as incomplete,
* because the record comes from a vacuum operation.
*/
return;
oldCxt = MemoryContextSwitchTo(insertCtx);
ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
ninsert->node = node; ninsert->node = node;
ninsert->key = key; ninsert->key = key;
...@@ -115,6 +125,12 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) ...@@ -115,6 +125,12 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
{ {
ListCell *l; ListCell *l;
if ( !ItemPointerIsValid(&key) )
return;
if (incomplete_inserts==NIL)
return;
foreach(l, incomplete_inserts) foreach(l, incomplete_inserts)
{ {
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
...@@ -180,16 +196,13 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) ...@@ -180,16 +196,13 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
Page page; Page page;
/* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */ /* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */
if (ItemPointerIsValid(&(xldata->key)))
{
if (incomplete_inserts != NIL)
forgetIncompleteInsert(xldata->node, xldata->key); forgetIncompleteInsert(xldata->node, xldata->key);
if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO) if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
/* operation with root always finalizes insertion */
pushIncompleteInsert(xldata->node, lsn, xldata->key, pushIncompleteInsert(xldata->node, lsn, xldata->key,
&(xldata->blkno), 1, &(xldata->blkno), 1,
NULL); NULL);
}
/* nothing else to do if page was backed up (and no info to do it with) */ /* nothing else to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1) if (record->xl_info & XLR_BKP_BLOCK_1)
...@@ -252,12 +265,15 @@ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record) ...@@ -252,12 +265,15 @@ gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer; Buffer buffer;
Page page; Page page;
/* nothing else to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
reln = XLogOpenRelation(xldata->node); reln = XLogOpenRelation(xldata->node);
buffer = XLogReadBuffer(reln, xldata->blkno, false); buffer = XLogReadBuffer(reln, xldata->blkno, false);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
return; return;
GISTInitBuffer( buffer, 0 );
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
GistPageSetDeleted(page); GistPageSetDeleted(page);
...@@ -333,15 +349,11 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) ...@@ -333,15 +349,11 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
if (ItemPointerIsValid(&(xlrec.data->key)))
{
if (incomplete_inserts != NIL)
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key); forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key, pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
NULL, 0, NULL, 0,
&xlrec); &xlrec);
}
} }
static void static void
...@@ -536,7 +548,43 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert) ...@@ -536,7 +548,43 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
insert->path[i++] = ptr->blkno; insert->path[i++] = ptr->blkno;
} }
else else
elog(LOG, "lost parent for block %u", insert->origblkno); elog(ERROR, "lost parent for block %u", insert->origblkno);
}
/*
 * gistMakePageLayout
 *
 * Build a singly linked list of SplitedPageLayout entries describing the
 * current contents of the given buffers, one entry per buffer.  Each entry
 * records the buffer's block number, the number of items on its page and a
 * contiguous palloc'd copy of every index tuple found on that page.
 *
 * buffers  - array of buffers whose pages are to be described
 * nbuffers - number of elements in buffers[]
 *
 * The array is walked from the last element down to the first and every new
 * entry is pushed onto the head of the list, so the returned list is in the
 * same order as buffers[].  Returns NULL when nbuffers is not positive.
 *
 * In this file the result is handed to formSplitRdata() by
 * gistContinueInsert().
 */
static SplitedPageLayout*
gistMakePageLayout(Buffer *buffers, int nbuffers) {
SplitedPageLayout *res=NULL, *resptr;
/* post-decrement: inside the body nbuffers is the index of the current buffer */
while( nbuffers-- > 0 ) {
Page page = BufferGetPage( buffers[ nbuffers ] );
IndexTuple idxtup;
OffsetNumber i;
char *ptr;
/* zero-filled allocation: lenlist must start at 0 for the += below */
resptr = (SplitedPageLayout*)palloc0( sizeof(SplitedPageLayout) );
resptr->block.blkno = BufferGetBlockNumber( buffers[ nbuffers ] );
resptr->block.num = PageGetMaxOffsetNumber( page );
/* first pass: sum the sizes of all tuples on the page */
for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
resptr->lenlist += IndexTupleSize(idxtup);
}
/* second pass: copy the tuples back to back into one buffer of that size */
resptr->list = (IndexTupleData*)palloc( resptr->lenlist );
ptr = (char*)(resptr->list);
for(i=FirstOffsetNumber; i<= PageGetMaxOffsetNumber( page ); i++) {
idxtup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
memcpy( ptr, idxtup, IndexTupleSize(idxtup) );
ptr += IndexTupleSize(idxtup);
}
/* prepend, so that walking the array backwards yields a forward-ordered list */
resptr->next = res;
res = resptr;
}
return res;
} }
/* /*
...@@ -549,10 +597,10 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert) ...@@ -549,10 +597,10 @@ gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
* unfinished insertion. In particular it's safe to invoke gistFindPath(); * unfinished insertion. In particular it's safe to invoke gistFindPath();
* there shouldn't be any garbage pages for it to run into. * there shouldn't be any garbage pages for it to run into.
* *
* Although stored LSN in gistIncompleteInsert is a LSN of child page, * To complete insert we can't use basic insertion algorithm because
* we can compare it with LSN of parent, because parent is always locked * during insertion we can't call user-defined support functions of opclass.
* while we change child page (look at gistmakedeal). So if parent's LSN is * So, we insert 'invalid' tuples without real key and do it by separate algorithm.
* less than stored lsn then changes in parent aren't done yet. * 'invalid' tuple should be updated by vacuum full.
*/ */
static void static void
gistContinueInsert(gistIncompleteInsert *insert) gistContinueInsert(gistIncompleteInsert *insert)
...@@ -574,39 +622,27 @@ gistContinueInsert(gistIncompleteInsert *insert) ...@@ -574,39 +622,27 @@ gistContinueInsert(gistIncompleteInsert *insert)
for (i = 0; i < insert->lenblk; i++) for (i = 0; i < insert->lenblk; i++)
itup[i] = gist_form_invalid_tuple(insert->blkno[i]); itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
/*
* any insertion of itup[] should make LOG message about
*/
if (insert->origblkno == GIST_ROOT_BLKNO) if (insert->origblkno == GIST_ROOT_BLKNO)
{ {
/* /*
* it was split root, so we should only make new root. it can't be * it was split root, so we should only make new root. it can't be
* simple insert into root, look at call pushIncompleteInsert in * simple insert into root, we should replace all content of root.
* gistRedoPageSplitRecord
*/ */
Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true); Buffer buffer = XLogReadBuffer(index, GIST_ROOT_BLKNO, true);
Page page;
Assert(BufferIsValid(buffer)); gistnewroot(index, buffer, itup, lenitup, NULL);
page = BufferGetPage(buffer);
GISTInitBuffer(buffer, 0);
gistfillbuffer(index, page, itup, lenitup, FirstOffsetNumber);
PageSetLSN(page, insert->lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
/*
* XXX fall out to avoid making LOG message at bottom of routine.
* I think the logic for when to emit that message is all wrong...
*/
return;
} }
else else
{ {
Buffer *buffers; Buffer *buffers;
Page *pages; Page *pages;
int numbuffer; int numbuffer;
OffsetNumber *todelete;
/* construct path */ /* construct path */
gistxlogFindPath(index, insert); gistxlogFindPath(index, insert);
...@@ -615,49 +651,60 @@ gistContinueInsert(gistIncompleteInsert *insert) ...@@ -615,49 +651,60 @@ gistContinueInsert(gistIncompleteInsert *insert)
buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ )); buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ )); pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
for (i = 0; i < insert->pathlen; i++) for (i = 0; i < insert->pathlen; i++)
{ {
int j, int j,
k, k,
pituplen = 0, pituplen = 0;
childfound = 0; XLogRecData *rdata;
XLogRecPtr recptr;
Buffer tempbuffer = InvalidBuffer;
int ntodelete = 0;
numbuffer = 1; numbuffer = 1;
buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]); buffers[0] = ReadBuffer(index, insert->path[i]);
LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE); LockBuffer(buffers[0], GIST_EXCLUSIVE);
pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]); /*
* we check buffer, because we restored page earlier
*/
gistcheckpage(index, buffers[0]);
if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1]))) pages[0] = BufferGetPage(buffers[0]);
{ Assert( !GistPageIsLeaf(pages[0]) );
UnlockReleaseBuffer(buffers[numbuffer - 1]);
return;
}
pituplen = PageGetMaxOffsetNumber(pages[numbuffer - 1]); pituplen = PageGetMaxOffsetNumber(pages[0]);
/* remove old IndexTuples */ /* find old IndexTuples to remove */
for (j = 0; j < pituplen && childfound < lenitup; j++) for (j = 0; j < pituplen && ntodelete < lenitup; j++)
{ {
BlockNumber blkno; BlockNumber blkno;
ItemId iid = PageGetItemId(pages[numbuffer - 1], j + FirstOffsetNumber); ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
IndexTuple idxtup = (IndexTuple) PageGetItem(pages[numbuffer - 1], iid); IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid);
blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid)); blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
for (k = 0; k < lenitup; k++) for (k = 0; k < lenitup; k++)
if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno) if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
{ {
PageIndexTupleDelete(pages[numbuffer - 1], j + FirstOffsetNumber); todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
j--; ntodelete++;
pituplen--;
childfound++;
break; break;
} }
} }
if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber)) if ( ntodelete == 0 )
elog(PANIC,"gistContinueInsert: can't find pointer to page(s)");
/*
* We check for free space taking into account only the first tuple to be
* deleted; we hope that will leave enough space.
*/
if (gistnospace(pages[0], itup, lenitup, *todelete))
{ {
/* no space left on page, so we must split */ /* no space left on page, so we must split */
buffers[numbuffer] = ReadBuffer(index, P_NEW); buffers[numbuffer] = ReadBuffer(index, P_NEW);
LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
...@@ -668,62 +715,86 @@ gistContinueInsert(gistIncompleteInsert *insert) ...@@ -668,62 +715,86 @@ gistContinueInsert(gistIncompleteInsert *insert)
if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO) if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
{ {
IndexTuple *parentitup; Buffer tmp;
/* /*
* we split root, just copy tuples from old root to new * we split root, just copy content from root to new page
* page
*/ */
parentitup = gistextractpage(pages[numbuffer - 1],
&pituplen);
/* sanity check */ /* sanity check */
if (i + 1 != insert->pathlen) if (i + 1 != insert->pathlen)
elog(PANIC, "unexpected pathlen in index \"%s\"", elog(PANIC, "unexpected pathlen in index \"%s\"",
RelationGetRelationName(index)); RelationGetRelationName(index));
/* fill new page */ /* fill new page, root will be changed later */
buffers[numbuffer] = ReadBuffer(index, P_NEW); tempbuffer = ReadBuffer(index, P_NEW);
LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); LockBuffer(tempbuffer, GIST_EXCLUSIVE);
GISTInitBuffer(buffers[numbuffer], 0); memcpy( BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer) );
pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber);
numbuffer++;
/* fill root page */ /* swap buffers[0] (was root) and temp buffer */
GISTInitBuffer(buffers[0], 0); tmp = buffers[0];
for (j = 1; j < numbuffer; j++) buffers[0] = tempbuffer;
{ tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO, it is still unchanged */
IndexTuple tuple = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
pages[0] = BufferGetPage(buffers[0]);
if (PageAddItem(pages[0],
(Item) tuple,
IndexTupleSize(tuple),
(OffsetNumber) j,
LP_USED) == InvalidOffsetNumber)
elog(PANIC, "failed to add item to index page in \"%s\"",
RelationGetRelationName(index));
}
} }
START_CRIT_SECTION();
for(j=0;j<ntodelete;j++)
PageIndexTupleDelete(pages[0], todelete[j]);
rdata = formSplitRdata(index->rd_node, insert->path[i],
false, &(insert->key),
gistMakePageLayout( buffers, numbuffer ) );
} else {
START_CRIT_SECTION();
for(j=0;j<ntodelete;j++)
PageIndexTupleDelete(pages[0], todelete[j]);
gistfillbuffer(index, pages[0], itup, lenitup, InvalidOffsetNumber);
rdata = formUpdateRdata(index->rd_node, buffers[0],
todelete, ntodelete,
itup, lenitup, &(insert->key));
} }
else
gistfillbuffer(index, pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber);
lenitup = numbuffer; /*
* use insert->key as mark for completion of insert (form*Rdata() above)
* for following possible replays
*/
/* write pages with XLOG LSN */
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
for (j = 0; j < numbuffer; j++) for (j = 0; j < numbuffer; j++)
{ {
itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j])); PageSetLSN(pages[j], recptr);
PageSetLSN(pages[j], insert->lsn);
PageSetTLI(pages[j], ThisTimeLineID); PageSetTLI(pages[j], ThisTimeLineID);
GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber; GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
MarkBufferDirty(buffers[j]); MarkBufferDirty(buffers[j]);
}
END_CRIT_SECTION();
lenitup = numbuffer;
for (j = 0; j < numbuffer; j++) {
itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
UnlockReleaseBuffer(buffers[j]); UnlockReleaseBuffer(buffers[j]);
} }
if ( tempbuffer != InvalidBuffer ) {
/*
* it was a root split, so fill it by new values
*/
gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
UnlockReleaseBuffer(tempbuffer);
}
} }
} }
ereport(LOG, ereport(LOG,
(errmsg("index %u/%u/%u needs VACUUM or REINDEX to finish crash recovery", (errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
insert->node.spcNode, insert->node.dbNode, insert->node.relNode), insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
errdetail("Incomplete insertion detected during crash replay."))); errdetail("Incomplete insertion detected during crash replay.")));
} }
...@@ -747,6 +818,7 @@ gist_xlog_cleanup(void) ...@@ -747,6 +818,7 @@ gist_xlog_cleanup(void)
MemoryContext oldCxt; MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(opCtx); oldCxt = MemoryContextSwitchTo(opCtx);
foreach(l, incomplete_inserts) foreach(l, incomplete_inserts)
{ {
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment