Commit b18c09ee authored by Vadim B. Mikheev's avatar Vadim B. Mikheev

Runtime tree recovery is implemented, just testing is left -:)

parent b60c57da
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.79 2001/01/31 01:08:36 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.80 2001/02/02 19:49:15 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -38,7 +38,10 @@ extern bool FixBTree; ...@@ -38,7 +38,10 @@ extern bool FixBTree;
Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release); Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
static void _bt_fixtree(Relation rel, BlockNumber blkno); static void _bt_fixtree(Relation rel, BlockNumber blkno);
static BlockNumber _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit); static void _bt_fixbranch(Relation rel, BlockNumber lblkno,
BlockNumber rblkno, BTStack true_stack);
static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
static void _bt_fixup(Relation rel, Buffer buf);
static OffsetNumber _bt_getoff(Page page, BlockNumber blkno); static OffsetNumber _bt_getoff(Page page, BlockNumber blkno);
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
...@@ -64,7 +67,7 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page, ...@@ -64,7 +67,7 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
int leftfree, int rightfree, int leftfree, int rightfree,
bool newitemonleft, Size firstrightitemsz); bool newitemonleft, Size firstrightitemsz);
static Buffer _bt_getstackbuf(Relation rel, BTStack stack); static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
static void _bt_pgaddtup(Relation rel, Page page, static void _bt_pgaddtup(Relation rel, Page page,
Size itemsize, BTItem btitem, Size itemsize, BTItem btitem,
OffsetNumber itup_off, const char *where); OffsetNumber itup_off, const char *where);
...@@ -497,34 +500,7 @@ _bt_insertonpg(Relation rel, ...@@ -497,34 +500,7 @@ _bt_insertonpg(Relation rel,
elog(ERROR, "bt_insertonpg: no root page found"); elog(ERROR, "bt_insertonpg: no root page found");
_bt_wrtbuf(rel, rbuf); _bt_wrtbuf(rel, rbuf);
_bt_wrtnorelbuf(rel, buf); _bt_wrtnorelbuf(rel, buf);
while(! P_LEFTMOST(lpageop)) _bt_fixup(rel, buf);
{
BlockNumber blkno = lpageop->btpo_prev;
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buf);
buf = _bt_getbuf(rel, blkno, BT_WRITE);
page = BufferGetPage(buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/*
* If someone else already created parent pages
* then it's time for _bt_fixtree() to check upper
* levels and fix them, if required.
*/
if (lpageop->btpo_parent != BTREE_METAPAGE)
{
blkno = lpageop->btpo_parent;
_bt_relbuf(rel, buf, BT_WRITE);
_bt_fixtree(rel, blkno);
goto formres;
}
}
/*
* Ok, we are on the leftmost page, it's write locked
* by us and its btpo_parent points to meta page - time
* for _bt_fixroot().
*/
buf = _bt_fixroot(rel, buf, true);
_bt_relbuf(rel, buf, BT_WRITE);
goto formres; goto formres;
} }
...@@ -561,16 +537,22 @@ _bt_insertonpg(Relation rel, ...@@ -561,16 +537,22 @@ _bt_insertonpg(Relation rel,
ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid), ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
bknum, P_HIKEY); bknum, P_HIKEY);
pbuf = _bt_getstackbuf(rel, stack); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
if (pbuf == InvalidBuffer)
elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
/* Now we can write and unlock the children */ /* Now we can write and unlock the children */
_bt_wrtbuf(rel, rbuf); _bt_wrtbuf(rel, rbuf);
_bt_wrtbuf(rel, buf); _bt_wrtbuf(rel, buf);
if (pbuf == InvalidBuffer)
{
if (!FixBTree)
elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
pfree(new_item);
_bt_fixbranch(rel, bknum, rbknum, stack);
goto formres;
}
/* Recursively update the parent */ /* Recursively update the parent */
newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
0, NULL, new_item, stack->bts_offset); 0, NULL, new_item, stack->bts_offset);
...@@ -1132,7 +1114,7 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright, ...@@ -1132,7 +1114,7 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
* Also, re-set bts_blkno & bts_offset if changed. * Also, re-set bts_blkno & bts_offset if changed.
*/ */
static Buffer static Buffer
_bt_getstackbuf(Relation rel, BTStack stack) _bt_getstackbuf(Relation rel, BTStack stack, int access)
{ {
BlockNumber blkno; BlockNumber blkno;
Buffer buf; Buffer buf;
...@@ -1145,7 +1127,7 @@ _bt_getstackbuf(Relation rel, BTStack stack) ...@@ -1145,7 +1127,7 @@ _bt_getstackbuf(Relation rel, BTStack stack)
BTPageOpaque opaque; BTPageOpaque opaque;
blkno = stack->bts_blkno; blkno = stack->bts_blkno;
buf = _bt_getbuf(rel, blkno, BT_WRITE); buf = _bt_getbuf(rel, blkno, access);
page = BufferGetPage(buf); page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page); maxoff = PageGetMaxOffsetNumber(page);
...@@ -1179,13 +1161,13 @@ _bt_getstackbuf(Relation rel, BTStack stack) ...@@ -1179,13 +1161,13 @@ _bt_getstackbuf(Relation rel, BTStack stack)
/* by here, the item we're looking for moved right at least one page */ /* by here, the item we're looking for moved right at least one page */
if (P_RIGHTMOST(opaque)) if (P_RIGHTMOST(opaque))
{ {
_bt_relbuf(rel, buf, BT_WRITE); _bt_relbuf(rel, buf, access);
return(InvalidBuffer); return(InvalidBuffer);
} }
blkno = opaque->btpo_next; blkno = opaque->btpo_next;
_bt_relbuf(rel, buf, BT_WRITE); _bt_relbuf(rel, buf, access);
buf = _bt_getbuf(rel, blkno, BT_WRITE); buf = _bt_getbuf(rel, blkno, access);
page = BufferGetPage(buf); page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page); maxoff = PageGetMaxOffsetNumber(page);
...@@ -1400,10 +1382,13 @@ _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release) ...@@ -1400,10 +1382,13 @@ _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
/* /*
* Now read other pages (if any) on level and add them to new root. * Now read other pages (if any) on level and add them to new root.
* Here we break one of our locking rules - never hold lock on parent
* page when acquiring lock on its child, - but we free from deadlock:
*
* If concurrent process will split one of pages on this level then it * If concurrent process will split one of pages on this level then it
* will notice either btpo_parent == metablock or btpo_parent == rootblk. * will see either btpo_parent == metablock or btpo_parent == rootblk.
* In first case it will give up its locks and try to lock leftmost page * In first case it will give up its locks and walk to the leftmost page
* buffer (oldrootbuf) to fix root - ie it will wait for us and let us * (oldrootbuf) in _bt_fixup() - ie it will wait for us and let us
* continue. In second case it will try to lock rootbuf keeping its locks * continue. In second case it will try to lock rootbuf keeping its locks
* on buffers we already passed, also waiting for us. If we'll have to * on buffers we already passed, also waiting for us. If we'll have to
* unlock rootbuf (split it) and that process will have to split page * unlock rootbuf (split it) and that process will have to split page
...@@ -1547,14 +1532,12 @@ _bt_fixtree(Relation rel, BlockNumber blkno) ...@@ -1547,14 +1532,12 @@ _bt_fixtree(Relation rel, BlockNumber blkno)
* Check/fix level starting from page in buffer buf up to block * Check/fix level starting from page in buffer buf up to block
* limit on *child* level (or till rightmost child page if limit * limit on *child* level (or till rightmost child page if limit
* is InvalidBlockNumber). Start buffer must be read locked. * is InvalidBlockNumber). Start buffer must be read locked.
* No pins/locks are held on exit. Returns block number of last * No pins/locks are held on exit.
* visited/pointing-to-limit page on *check/fix* level.
*/ */
static BlockNumber static void
_bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
{ {
BlockNumber blkno = BufferGetBlockNumber(buf); BlockNumber blkno = BufferGetBlockNumber(buf);
BlockNumber pblkno = blkno;
Page page; Page page;
BTPageOpaque opaque; BTPageOpaque opaque;
BlockNumber cblkno[3]; BlockNumber cblkno[3];
...@@ -1639,22 +1622,20 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) ...@@ -1639,22 +1622,20 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
BTStackData stack; BTStackData stack;
stack.bts_parent = NULL; stack.bts_parent = NULL;
stack.bts_blkno = pblkno; stack.bts_blkno = blkno;
stack.bts_offset = InvalidOffsetNumber; stack.bts_offset = InvalidOffsetNumber;
ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid),
cblkno[0], P_HIKEY); cblkno[0], P_HIKEY);
buf = _bt_getstackbuf(rel, &stack); buf = _bt_getstackbuf(rel, &stack, BT_WRITE);
if (buf == InvalidBuffer) if (buf == InvalidBuffer)
elog(ERROR, "bt_fixlevel: pointer disappeared (need to recreate index)"); elog(ERROR, "bt_fixlevel: pointer disappeared (need to recreate index)");
page = BufferGetPage(buf); page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
coff[0] = stack.bts_offset; coff[0] = stack.bts_offset;
pblkno = BufferGetBlockNumber(buf); blkno = BufferGetBlockNumber(buf);
parblk[0] = pblkno; parblk[0] = blkno;
if (cblkno[0] == limit)
blkno = pblkno; /* where we have seen pointer to limit */
/* Check/insert missed pointers */ /* Check/insert missed pointers */
for (i = 1; i <= cidx; i++) for (i = 1; i <= cidx; i++)
...@@ -1668,8 +1649,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) ...@@ -1668,8 +1649,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
if (parblk[i] == parblk[i - 1] && if (parblk[i] == parblk[i - 1] &&
coff[i] != coff[i - 1] + 1) coff[i] != coff[i - 1] + 1)
elog(ERROR, "bt_fixlevel: invalid item order(2) (need to recreate index)"); elog(ERROR, "bt_fixlevel: invalid item order(2) (need to recreate index)");
if (cblkno[i] == limit)
blkno = parblk[i];
continue; continue;
} }
/* Have to check next page ? */ /* Have to check next page ? */
...@@ -1688,10 +1667,8 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) ...@@ -1688,10 +1667,8 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
buf = newbuf; buf = newbuf;
page = newpage; page = newpage;
opaque = newopaque; opaque = newopaque;
pblkno = BufferGetBlockNumber(buf); blkno = BufferGetBlockNumber(buf);
parblk[i] = pblkno; parblk[i] = blkno;
if (cblkno[i] == limit)
blkno = pblkno;
continue; continue;
} }
/* unfound - need to insert on current page */ /* unfound - need to insert on current page */
...@@ -1730,7 +1707,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) ...@@ -1730,7 +1707,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
page = BufferGetPage(buf); page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
} }
pblkno = BufferGetBlockNumber(buf); blkno = BufferGetBlockNumber(buf);
coff[i] = itup_off; coff[i] = itup_off;
} }
else else
...@@ -1740,9 +1717,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) ...@@ -1740,9 +1717,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
} }
pfree(btitem); pfree(btitem);
parblk[i] = pblkno; parblk[i] = blkno;
if (cblkno[i] == limit)
blkno = pblkno;
} }
/* copy page with pointer to cblkno[cidx] to temp storage */ /* copy page with pointer to cblkno[cidx] to temp storage */
...@@ -1750,8 +1725,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) ...@@ -1750,8 +1725,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
_bt_relbuf(rel, buf, BT_WRITE); _bt_relbuf(rel, buf, BT_WRITE);
page = (Page)tbuf; page = (Page)tbuf;
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (limit == InvalidBlockNumber)
blkno = pblkno; /* last visited page */
} }
/* Continue if current check/fix level page is rightmost */ /* Continue if current check/fix level page is rightmost */
...@@ -1766,7 +1739,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) ...@@ -1766,7 +1739,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
{ {
if (cidx == 2) if (cidx == 2)
_bt_relbuf(rel, cbuf[2], BT_READ); _bt_relbuf(rel, cbuf[2], BT_READ);
return(blkno); return;
} }
if (cblkno[0] == limit || cblkno[1] == limit) if (cblkno[0] == limit || cblkno[1] == limit)
goodbye = true; goodbye = true;
...@@ -1778,6 +1751,168 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit) ...@@ -1778,6 +1751,168 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
} }
} }
/*
* Check/fix part of tree - branch - up from parent of level with blocks
* lblkno and rblknum. We first ensure that parent level has pointers
* to both lblkno & rblknum and if those pointers are on different
* parent pages then do the same for parent level, etc. No locks must
* be held on target level and upper on entry. No locks will be held
* on exit. Stack created when traversing tree down should be provided and
* it must points to parent level. rblkno must be on the right from lblkno.
* (This function is special edition of more expensive _bt_fixtree(),
* but it doesn't guarantee full consistency of tree.)
*/
static void
_bt_fixbranch(Relation rel, BlockNumber lblkno,
BlockNumber rblkno, BTStack true_stack)
{
BlockNumber blkno = true_stack->bts_blkno;
BTStackData stack;
BTPageOpaque opaque;
Buffer buf, rbuf;
Page page;
OffsetNumber offnum;
true_stack = true_stack->bts_parent;
for ( ; ; )
{
buf = _bt_getbuf(rel, blkno, BT_READ);
/* Check/fix parent level pointed by blkno */
_bt_fixlevel(rel, buf, rblkno);
/*
* Here parent level should have pointers for both
* lblkno and rblkno and we have to find them.
*/
stack.bts_parent = NULL;
stack.bts_blkno = blkno;
stack.bts_offset = InvalidOffsetNumber;
ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY);
buf = _bt_getstackbuf(rel, &stack, BT_READ);
if (buf == InvalidBuffer)
elog(ERROR, "bt_fixbranch: left pointer unfound (need to recreate index)");
page = BufferGetPage(buf);
offnum = _bt_getoff(page, rblkno);
if (offnum != InvalidOffsetNumber) /* right pointer found */
{
if (offnum <= stack.bts_offset)
elog(ERROR, "bt_fixbranch: invalid item order (need to recreate index)");
_bt_relbuf(rel, buf, BT_READ);
return;
}
/* Pointers are on different parent pages - find right one */
lblkno = BufferGetBlockNumber(buf);
stack.bts_parent = NULL;
stack.bts_blkno = lblkno;
stack.bts_offset = InvalidOffsetNumber;
ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY);
rbuf = _bt_getstackbuf(rel, &stack, BT_READ);
if (rbuf == InvalidBuffer)
elog(ERROR, "bt_fixbranch: right pointer unfound (need to recreate index)");
rblkno = BufferGetBlockNumber(rbuf);
_bt_relbuf(rel, rbuf, BT_READ);
/*
* If we have parent item in true_stack then go up one level and
* ensure that it has pointers to new lblkno & rblkno.
*/
if (true_stack)
{
_bt_relbuf(rel, buf, BT_READ);
blkno = true_stack->bts_blkno;
true_stack = true_stack->bts_parent;
continue;
}
/*
* Well, we are on the level that was root or unexistent when
* we started traversing tree down. If btpo_parent is updated
* then we'll use it to continue, else we'll fix/restore upper
* levels entirely.
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (opaque->btpo_parent != BTREE_METAPAGE)
{
blkno = opaque->btpo_parent;
_bt_relbuf(rel, buf, BT_READ);
continue;
}
/* Have to switch to excl buf lock and re-check btpo_parent */
_bt_relbuf(rel, buf, BT_READ);
buf = _bt_getbuf(rel, blkno, BT_WRITE);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (opaque->btpo_parent != BTREE_METAPAGE)
{
blkno = opaque->btpo_parent;
_bt_relbuf(rel, buf, BT_WRITE);
continue;
}
/*
* We hold excl lock on some internal page with unupdated
* btpo_parent - time for _bt_fixup.
*/
break;
}
_bt_fixup(rel, buf);
return;
}
/*
* Having buf excl locked this routine walks to the left on level and
* uses either _bt_fixtree() or _bt_fixroot() to create/check&fix upper
* levels. No buffer pins/locks will be held on exit.
*/
static void
_bt_fixup(Relation rel, Buffer buf)
{
Page page;
BTPageOpaque opaque;
BlockNumber blkno;
for ( ; ; )
{
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
* If someone else already created parent pages
* then it's time for _bt_fixtree() to check upper
* levels and fix them, if required.
*/
if (opaque->btpo_parent != BTREE_METAPAGE)
{
blkno = opaque->btpo_parent;
_bt_relbuf(rel, buf, BT_WRITE);
_bt_fixtree(rel, blkno);
return;
}
if (P_LEFTMOST(opaque))
break;
blkno = opaque->btpo_prev;
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buf);
buf = _bt_getbuf(rel, blkno, BT_WRITE);
}
/*
* Ok, we are on the leftmost page, it's write locked
* by us and its btpo_parent points to meta page - time
* for _bt_fixroot().
*/
buf = _bt_fixroot(rel, buf, true);
_bt_relbuf(rel, buf, BT_WRITE);
return;
}
static OffsetNumber static OffsetNumber
_bt_getoff(Page page, BlockNumber blkno) _bt_getoff(Page page, BlockNumber blkno)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment