Commit dca0762e authored by Vadim B. Mikheev's avatar Vadim B. Mikheev

Couple additional functions to fix tree at runtime.

Need in one more function to handle "my bits moved..."
case. FixBTree is still FALSE.
parent ed7910ef
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.78 2001/01/29 07:28:16 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.79 2001/01/31 01:08:36 vadim Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -37,7 +37,9 @@ typedef struct
extern bool FixBTree;
Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
static void _bt_fixtree(Relation rel, BlockNumber blkno, BTStack stack);
static void _bt_fixtree(Relation rel, BlockNumber blkno);
static BlockNumber _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
static OffsetNumber _bt_getoff(Page page, BlockNumber blkno);
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
......@@ -512,7 +514,7 @@ _bt_insertonpg(Relation rel,
{
blkno = lpageop->btpo_parent;
_bt_relbuf(rel, buf, BT_WRITE);
_bt_fixtree(rel, blkno, NULL);
_bt_fixtree(rel, blkno);
goto formres;
}
}
......@@ -561,6 +563,10 @@ _bt_insertonpg(Relation rel,
pbuf = _bt_getstackbuf(rel, stack);
if (pbuf == InvalidBuffer)
elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
/* Now we can write and unlock the children */
_bt_wrtbuf(rel, rbuf);
_bt_wrtbuf(rel, buf);
......@@ -1172,8 +1178,10 @@ _bt_getstackbuf(Relation rel, BTStack stack)
}
/* by here, the item we're looking for moved right at least one page */
if (P_RIGHTMOST(opaque))
elog(FATAL, "_bt_getstackbuf: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
{
_bt_relbuf(rel, buf, BT_WRITE);
return(InvalidBuffer);
}
blkno = opaque->btpo_next;
_bt_relbuf(rel, buf, BT_WRITE);
......@@ -1450,6 +1458,7 @@ _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
/* give up left buffer */
_bt_relbuf(rel, leftbuf, BT_WRITE);
pfree(btitem);
leftbuf = rightbuf;
leftpage = rightpage;
leftopaque = rightopaque;
......@@ -1477,10 +1486,318 @@ _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
return(rootbuf);
}
/*
* Using blkno of leftmost page on a level inside tree this func
* checks/fixes tree from this level up to the root page.
*/
static void
_bt_fixtree(Relation rel, BlockNumber blkno, BTStack stack)
_bt_fixtree(Relation rel, BlockNumber blkno)
{
Buffer buf;
Page page;
BTPageOpaque opaque;
BlockNumber pblkno;
elog(ERROR, "bt_fixtree: unimplemented , yet (need to recreate index)");
for ( ; ; )
{
buf = _bt_getbuf(rel, blkno, BT_READ);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (! P_LEFTMOST(opaque) || P_ISLEAF(opaque))
elog(ERROR, "bt_fixtree: invalid start page (need to recreate index)");
pblkno = opaque->btpo_parent;
/* check/fix entire level */
_bt_fixlevel(rel, buf, InvalidBlockNumber);
/*
* No pins/locks are held here. Re-read start page if its
* btpo_parent pointed to meta page else go up one level.
*/
if (pblkno == BTREE_METAPAGE)
{
buf = _bt_getbuf(rel, blkno, BT_WRITE);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISROOT(opaque))
{
/* Tree is Ok now */
_bt_relbuf(rel, buf, BT_WRITE);
return;
}
pblkno = opaque->btpo_parent;
/* Call _bt_fixroot() if there is no upper level */
if (pblkno == BTREE_METAPAGE)
{
buf = _bt_fixroot(rel, buf, true);
_bt_relbuf(rel, buf, BT_WRITE);
return;
}
/* Have to go up one level */
_bt_relbuf(rel, buf, BT_WRITE);
blkno = pblkno;
}
}
}
/*
* Check/fix level starting from page in buffer buf up to block
* limit on *child* level (or till rightmost child page if limit
* is InvalidBlockNumber). Start buffer must be read locked.
* No pins/locks are held on exit. Returns block number of last
* visited/pointing-to-limit page on *check/fix* level.
*/
static BlockNumber
_bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
{
elog(ERROR, "bt_fixtree: unimplemented , yet");
BlockNumber blkno = BufferGetBlockNumber(buf);
BlockNumber pblkno = blkno;
Page page;
BTPageOpaque opaque;
BlockNumber cblkno[3];
OffsetNumber coff[3];
Buffer cbuf[3];
Page cpage[3];
BTPageOpaque copaque[3];
BTItem btitem;
int cidx, i;
bool goodbye = false;
char tbuf[BLCKSZ];
page = BufferGetPage(buf);
/* copy page to temp storage */
memmove(tbuf, page, PageGetPageSize(page));
_bt_relbuf(rel, buf, BT_READ);
page = (Page)tbuf;
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/* Initialize first child data */
coff[0] = P_FIRSTDATAKEY(opaque);
if (coff[0] > PageGetMaxOffsetNumber(page))
elog(ERROR, "bt_fixlevel: invalid maxoff on start page (need to recreate index)");
btitem = (BTItem) PageGetItem(page, PageGetItemId(page, coff[0]));
cblkno[0] = ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid));
cbuf[0] = _bt_getbuf(rel, cblkno[0], BT_READ);
cpage[0] = BufferGetPage(cbuf[0]);
copaque[0] = (BTPageOpaque) PageGetSpecialPointer(cpage[0]);
if (P_LEFTMOST(opaque) && ! P_LEFTMOST(copaque[0]))
elog(ERROR, "bt_fixtlevel: non-leftmost child page of leftmost parent (need to recreate index)");
/* caller should take care and avoid this */
if (P_RIGHTMOST(copaque[0]))
elog(ERROR, "bt_fixtlevel: invalid start child (need to recreate index)");
for ( ; ; )
{
/*
* Read up to 2 more child pages and look for pointers
* to them in *saved* parent page
*/
coff[1] = coff[2] = InvalidOffsetNumber;
for (cidx = 0; cidx < 2; )
{
cidx++;
cblkno[cidx] = (copaque[cidx - 1])->btpo_next;
cbuf[cidx] = _bt_getbuf(rel, cblkno[cidx], BT_READ);
cpage[cidx] = BufferGetPage(cbuf[cidx]);
copaque[cidx] = (BTPageOpaque) PageGetSpecialPointer(cpage[cidx]);
coff[cidx] = _bt_getoff(page, cblkno[cidx]);
/* sanity check */
if (coff[cidx] != InvalidOffsetNumber)
{
for (i = cidx - 1; i >= 0; i--)
{
if (coff[i] == InvalidOffsetNumber)
continue;
if (coff[cidx] != coff[i] + 1)
elog(ERROR, "bt_fixlevel: invalid item order(1) (need to recreate index)");
break;
}
}
if (P_RIGHTMOST(copaque[cidx]))
break;
}
/*
* Read parent page and insert missed pointers.
*/
if (coff[1] == InvalidOffsetNumber ||
(cidx == 2 && coff[2] == InvalidOffsetNumber))
{
Buffer newbuf;
Page newpage;
BTPageOpaque newopaque;
BTItem ritem;
Size itemsz;
OffsetNumber newitemoff;
BlockNumber parblk[3];
BTStackData stack;
stack.bts_parent = NULL;
stack.bts_blkno = pblkno;
stack.bts_offset = InvalidOffsetNumber;
ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid),
cblkno[0], P_HIKEY);
buf = _bt_getstackbuf(rel, &stack);
if (buf == InvalidBuffer)
elog(ERROR, "bt_fixlevel: pointer disappeared (need to recreate index)");
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
coff[0] = stack.bts_offset;
pblkno = BufferGetBlockNumber(buf);
parblk[0] = pblkno;
if (cblkno[0] == limit)
blkno = pblkno; /* where we have seen pointer to limit */
/* Check/insert missed pointers */
for (i = 1; i <= cidx; i++)
{
coff[i] = _bt_getoff(page, cblkno[i]);
/* sanity check */
parblk[i] = BufferGetBlockNumber(buf);
if (coff[i] != InvalidOffsetNumber)
{
if (parblk[i] == parblk[i - 1] &&
coff[i] != coff[i - 1] + 1)
elog(ERROR, "bt_fixlevel: invalid item order(2) (need to recreate index)");
if (cblkno[i] == limit)
blkno = parblk[i];
continue;
}
/* Have to check next page ? */
if ((! P_RIGHTMOST(opaque)) &&
coff[i - 1] == PageGetMaxOffsetNumber(page)) /* yes */
{
newbuf = _bt_getbuf(rel, opaque->btpo_next, BT_WRITE);
newpage = BufferGetPage(newbuf);
newopaque = (BTPageOpaque) PageGetSpecialPointer(newpage);
coff[i] = _bt_getoff(newpage, cblkno[i]);
if (coff[i] != InvalidOffsetNumber) /* found ! */
{
if (coff[i] != P_FIRSTDATAKEY(newopaque))
elog(ERROR, "bt_fixlevel: invalid item order(3) (need to recreate index)");
_bt_relbuf(rel, buf, BT_WRITE);
buf = newbuf;
page = newpage;
opaque = newopaque;
pblkno = BufferGetBlockNumber(buf);
parblk[i] = pblkno;
if (cblkno[i] == limit)
blkno = pblkno;
continue;
}
/* unfound - need to insert on current page */
_bt_relbuf(rel, newbuf, BT_WRITE);
}
/* insert pointer */
ritem = (BTItem) PageGetItem(cpage[i - 1],
PageGetItemId(cpage[i - 1], P_HIKEY));
btitem = _bt_formitem(&(ritem->bti_itup));
ItemPointerSet(&(btitem->bti_itup.t_tid), cblkno[i], P_HIKEY);
itemsz = IndexTupleDSize(btitem->bti_itup)
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = MAXALIGN(itemsz);
newitemoff = coff[i - 1] + 1;
if (PageGetFreeSpace(page) < itemsz)
{
OffsetNumber firstright;
OffsetNumber itup_off;
BlockNumber itup_blkno;
bool newitemonleft;
firstright = _bt_findsplitloc(rel, page,
newitemoff, itemsz, &newitemonleft);
newbuf = _bt_split(rel, buf, firstright,
newitemoff, itemsz, btitem, newitemonleft,
&itup_off, &itup_blkno);
/* what buffer we need in ? */
if (newitemonleft)
_bt_relbuf(rel, newbuf, BT_WRITE);
else
{
_bt_relbuf(rel, buf, BT_WRITE);
buf = newbuf;
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
}
pblkno = BufferGetBlockNumber(buf);
coff[i] = itup_off;
}
else
{
_bt_insertuple(rel, buf, itemsz, btitem, newitemoff);
coff[i] = newitemoff;
}
pfree(btitem);
parblk[i] = pblkno;
if (cblkno[i] == limit)
blkno = pblkno;
}
/* copy page with pointer to cblkno[cidx] to temp storage */
memmove(tbuf, page, PageGetPageSize(page));
_bt_relbuf(rel, buf, BT_WRITE);
page = (Page)tbuf;
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (limit == InvalidBlockNumber)
blkno = pblkno; /* last visited page */
}
/* Continue if current check/fix level page is rightmost */
if (P_RIGHTMOST(opaque))
goodbye = false;
/* Pointers to child pages are Ok - right end of child level ? */
_bt_relbuf(rel, cbuf[0], BT_READ);
_bt_relbuf(rel, cbuf[1], BT_READ);
if (cidx == 1 ||
(cidx == 2 && (P_RIGHTMOST(copaque[2]) || goodbye)))
{
if (cidx == 2)
_bt_relbuf(rel, cbuf[2], BT_READ);
return(blkno);
}
if (cblkno[0] == limit || cblkno[1] == limit)
goodbye = true;
cblkno[0] = cblkno[2];
cbuf[0] = cbuf[2];
cpage[0] = cpage[2];
copaque[0] = copaque[2];
coff[0] = coff[2];
}
}
static OffsetNumber
_bt_getoff(Page page, BlockNumber blkno)
{
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
OffsetNumber offnum = P_FIRSTDATAKEY(opaque);
BlockNumber curblkno;
ItemId itemid;
BTItem item;
for ( ; offnum <= maxoff; offnum++)
{
itemid = PageGetItemId(page, offnum);
item = (BTItem) PageGetItem(page, itemid);
curblkno = ItemPointerGetBlockNumber(&(item->bti_itup.t_tid));
if (curblkno == blkno)
return(offnum);
}
return(InvalidOffsetNumber);
}
/*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment