Commit 71b3e93c authored by Vadim B. Mikheev's avatar Vadim B. Mikheev

Duplicates handling...

parent 3548a410
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.15 1997/06/06 03:11:42 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.16 1997/06/10 07:28:47 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -28,14 +28,16 @@ ...@@ -28,14 +28,16 @@
#endif #endif
static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem); static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);
static Buffer _bt_split(Relation rel, Buffer buf, BTItem hiRightItem); static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright);
static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit); static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit);
static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem); static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem); static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem); static void _bt_updateitem(Relation rel, Size keysz, Buffer buf, BTItem oldItem, BTItem newItem);
static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey); static bool _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, int keysz, ScanKey scankey);
#if 0
static InsertIndexResult _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey); static InsertIndexResult _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem hikey);
#endif
/* /*
* _bt_doinsert() -- Handle insertion of a single btitem in the tree. * _bt_doinsert() -- Handle insertion of a single btitem in the tree.
...@@ -52,14 +54,13 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel ...@@ -52,14 +54,13 @@ _bt_doinsert(Relation rel, BTItem btitem, bool index_is_unique, Relation heapRel
BTStack stack; BTStack stack;
Buffer buf; Buffer buf;
BlockNumber blkno; BlockNumber blkno;
int natts; int natts = rel->rd_rel->relnatts;
InsertIndexResult res; InsertIndexResult res;
itup = &(btitem->bti_itup); itup = &(btitem->bti_itup);
/* we need a scan key to do our search, so build one */ /* we need a scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup); itup_scankey = _bt_mkscankey(rel, itup);
natts = rel->rd_rel->relnatts;
/* find the page containing this key */ /* find the page containing this key */
stack = _bt_search(rel, natts, itup_scankey, &buf); stack = _bt_search(rel, natts, itup_scankey, &buf);
...@@ -223,17 +224,13 @@ _bt_insertonpg(Relation rel, ...@@ -223,17 +224,13 @@ _bt_insertonpg(Relation rel,
{ {
InsertIndexResult res; InsertIndexResult res;
Page page; Page page;
Buffer rbuf;
Buffer pbuf;
Page rpage;
BTItem ritem;
BTPageOpaque lpageop; BTPageOpaque lpageop;
BTPageOpaque rpageop; BlockNumber itup_blkno;
BlockNumber rbknum, itup_blkno;
OffsetNumber itup_off; OffsetNumber itup_off;
OffsetNumber firstright = InvalidOffsetNumber;
int itemsz; int itemsz;
Page ppage; bool do_split = false;
BTPageOpaque ppageop; bool keys_equal = false;
page = BufferGetPage(buf); page = BufferGetPage(buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page); lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
...@@ -248,8 +245,9 @@ _bt_insertonpg(Relation rel, ...@@ -248,8 +245,9 @@ _bt_insertonpg(Relation rel,
* page in the chain of duplicates then: * page in the chain of duplicates then:
* 1. if scankey == hikey (i.e. - new duplicate item) then * 1. if scankey == hikey (i.e. - new duplicate item) then
* insert it here; * insert it here;
* 2. if scankey < hikey then we grab new page, copy current page * 2. if scankey < hikey then:
* content there and insert new item on the current page. * 2.a if there is duplicate key(s) here - we force splitting;
* 2.b else - we may "eat" this page from duplicates chain.
*/ */
if ( lpageop->btpo_flags & BTP_CHAIN ) if ( lpageop->btpo_flags & BTP_CHAIN )
{ {
...@@ -274,26 +272,105 @@ _bt_insertonpg(Relation rel, ...@@ -274,26 +272,105 @@ _bt_insertonpg(Relation rel,
if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid, if ( !_bt_skeycmp (rel, keysz, scankey, page, hitemid,
BTLessStrategyNumber) ) BTLessStrategyNumber) )
elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates"); elog (FATAL, "btree: attempt to insert higher key on the leftmost page in the chain of duplicates");
return (_bt_shift(rel, buf, stack, keysz, scankey, btitem, hitem)); if ( maxoff > P_HIKEY ) /* have duplicate(s) */
{
firstright = P_FIRSTKEY;
do_split = true;
}
else /* "eat" page */
{
Buffer pbuf;
Page ppage;
itup_blkno = BufferGetBlockNumber(buf);
itup_off = PageAddItem(page, (Item) btitem, itemsz,
P_FIRSTKEY, LP_USED);
if ( itup_off == InvalidOffsetNumber )
elog (FATAL, "btree: failed to add item");
lpageop->btpo_flags &= ~BTP_CHAIN;
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
ppage = BufferGetPage(pbuf);
PageIndexTupleDelete(ppage, stack->bts_offset);
pfree(stack->bts_btitem);
stack->bts_btitem = _bt_formitem(&(btitem->bti_itup));
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
itup_blkno, P_HIKEY);
_bt_wrtbuf(rel, buf);
res = _bt_insertonpg(rel, pbuf, stack->bts_parent,
keysz, scankey, stack->bts_btitem,
NULL);
ItemPointerSet(&(res->pointerData), itup_blkno, itup_off);
return (res);
}
} }
else
{
keys_equal = true;
if ( PageGetFreeSpace(page) < itemsz )
do_split = true;
}
}
else if ( PageGetFreeSpace(page) < itemsz )
do_split = true;
else if ( PageGetFreeSpace(page) < 3*itemsz + 2*sizeof(ItemIdData) )
{
OffsetNumber offnum = (P_RIGHTMOST(lpageop)) ? P_HIKEY : P_FIRSTKEY;
OffsetNumber maxoff = PageGetMaxOffsetNumber (page);
ItemId itid;
BTItem previtem, chkitem;
Size maxsize;
Size currsize;
itid = PageGetItemId(page, offnum);
previtem = (BTItem) PageGetItem(page, itid);
maxsize = currsize = (ItemIdGetLength(itid) + sizeof(ItemIdData));
for (offnum = OffsetNumberNext(offnum);
offnum <= maxoff; offnum = OffsetNumberNext(offnum) )
{
itid = PageGetItemId(page, offnum);
chkitem = (BTItem) PageGetItem(page, itid);
if ( !_bt_itemcmp (rel, keysz, previtem, chkitem,
BTEqualStrategyNumber) )
{
if ( currsize > maxsize )
maxsize = currsize;
currsize = 0;
previtem = chkitem;
}
currsize += (ItemIdGetLength(itid) + sizeof(ItemIdData));
}
if ( currsize > maxsize )
maxsize = currsize;
maxsize += sizeof (PageHeaderData) +
DOUBLEALIGN (sizeof (BTPageOpaqueData));
if ( maxsize >= PageGetPageSize (page) / 2 )
do_split = true;
} }
if (PageGetFreeSpace(page) < itemsz) if ( do_split )
{ {
Buffer rbuf;
Page rpage;
BTItem ritem;
BlockNumber rbknum;
BTPageOpaque rpageop;
Buffer pbuf;
Page ppage;
BTPageOpaque ppageop;
BlockNumber bknum = BufferGetBlockNumber(buf); BlockNumber bknum = BufferGetBlockNumber(buf);
BTItem lowLeftItem; BTItem lowLeftItem;
BTItem hiRightItem = NULL; OffsetNumber maxoff;
bool shifted = false;
bool left_chained = ( lpageop->btpo_flags & BTP_CHAIN ) ? true : false;
/* /*
* If we have to split leaf page in the chain of duplicates * If we have to split leaf page in the chain of duplicates by
* then we try to look at our right sibling first. * new duplicate then we try to look at our right sibling first.
*/ */
if ( ( lpageop->btpo_flags & BTP_CHAIN ) && if ( ( lpageop->btpo_flags & BTP_CHAIN ) &&
( lpageop->btpo_flags & BTP_LEAF ) ) ( lpageop->btpo_flags & BTP_LEAF ) && keys_equal )
{ {
bool use_left = true; bool use_left = true;
bool keys_equal = false;
rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE); rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
rpage = BufferGetPage(rbuf); rpage = BufferGetPage(rbuf);
...@@ -309,29 +386,20 @@ _bt_insertonpg(Relation rel, ...@@ -309,29 +386,20 @@ _bt_insertonpg(Relation rel,
{ {
if ( !( rpageop->btpo_flags & BTP_CHAIN ) ) if ( !( rpageop->btpo_flags & BTP_CHAIN ) )
elog (FATAL, "btree: lost page in the chain of duplicates"); elog (FATAL, "btree: lost page in the chain of duplicates");
keys_equal = true;
} }
else if ( _bt_skeycmp (rel, keysz, scankey, rpage, else if ( _bt_skeycmp (rel, keysz, scankey, rpage,
PageGetItemId(rpage, P_HIKEY), PageGetItemId(rpage, P_HIKEY),
BTGreaterStrategyNumber) ) BTGreaterStrategyNumber) )
elog (FATAL, "btree: hikey is out of order"); elog (FATAL, "btree: hikey is out of order");
/* else if ( rpageop->btpo_flags & BTP_CHAIN )
* If hikey > scankey and BTP_CHAIN is ON /*
* then it's first page of the chain of higher keys: * If hikey > scankey then it's last page in chain and
* our left sibling hikey was lying! We can't add new * BTP_CHAIN must be OFF
* item here, but we can turn BTP_CHAIN off on our
* left page and overwrite its hikey.
*/ */
if ( !keys_equal && ( rpageop->btpo_flags & BTP_CHAIN ) ) elog (FATAL, "btree: lost last page in the chain of duplicates");
{
BTItem tmp;
lpageop->btpo_flags &= ~BTP_CHAIN;
tmp = (BTItem) PageGetItem(rpage, PageGetItemId(rpage, P_HIKEY));
hiRightItem = _bt_formitem(&(tmp->bti_itup));
}
/* if there is room here then we use this page. */ /* if there is room here then we use this page. */
else if ( PageGetFreeSpace (rpage) > itemsz ) if ( PageGetFreeSpace (rpage) > itemsz )
use_left = false; use_left = false;
} }
else /* rightmost page */ else /* rightmost page */
...@@ -349,12 +417,70 @@ _bt_insertonpg(Relation rel, ...@@ -349,12 +417,70 @@ _bt_insertonpg(Relation rel,
} }
_bt_relbuf(rel, rbuf, BT_WRITE); _bt_relbuf(rel, rbuf, BT_WRITE);
} }
/*
* If after splitting un-chained page we'll got chain of pages
* with duplicates then we want to know
* 1. on which of two pages new btitem will go (current
* _bt_findsplitloc is quite bad);
* 2. what parent (if there's one) thinking about it
* (remember about deletions)
*/
else if ( !( lpageop->btpo_flags & BTP_CHAIN ) )
{
OffsetNumber start = ( P_RIGHTMOST(lpageop) ) ? P_HIKEY : P_FIRSTKEY;
Size llimit;
maxoff = PageGetMaxOffsetNumber (page);
llimit = PageGetPageSize(page) - sizeof (PageHeaderData) -
DOUBLEALIGN (sizeof (BTPageOpaqueData))
+ sizeof(ItemIdData);
llimit /= 2;
firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit);
if ( _bt_itemcmp (rel, keysz,
(BTItem) PageGetItem(page, PageGetItemId(page, start)),
(BTItem) PageGetItem(page, PageGetItemId(page, firstright)),
BTEqualStrategyNumber) )
{
if ( _bt_skeycmp (rel, keysz, scankey, page,
PageGetItemId(page, firstright),
BTLessStrategyNumber) )
/*
* force moving current items to the new page:
* new item will go on the current page.
*/
firstright = start;
else
/*
* new btitem >= firstright, start item == firstright -
* new chain of duplicates: if this non-leftmost leaf
* page and parent item < start item then force moving
* all items to the new page - current page will be
* "empty" after it.
*/
{
if ( !P_LEFTMOST (lpageop) &&
( lpageop->btpo_flags & BTP_LEAF ) )
{
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
bknum, P_HIKEY);
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
if ( _bt_itemcmp (rel, keysz, stack->bts_btitem,
(BTItem) PageGetItem(page,
PageGetItemId(page, start)),
BTLessStrategyNumber) )
{
firstright = start;
shifted = true;
}
_bt_relbuf(rel, pbuf, BT_WRITE);
}
}
} /* else - no new chain if start item < firstright one */
}
/* split the buffer into left and right halves */ /* split the buffer into left and right halves */
rbuf = _bt_split(rel, buf, hiRightItem); rbuf = _bt_split(rel, buf, firstright);
if ( hiRightItem != (BTItem) NULL )
pfree (hiRightItem);
/* which new page (left half or right half) gets the tuple? */ /* which new page (left half or right half) gets the tuple? */
if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) { if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) {
...@@ -369,13 +495,24 @@ _bt_insertonpg(Relation rel, ...@@ -369,13 +495,24 @@ _bt_insertonpg(Relation rel,
itup_blkno = BufferGetBlockNumber(rbuf); itup_blkno = BufferGetBlockNumber(rbuf);
} }
lowLeftItem = (BTItem) PageGetItem(page, maxoff = PageGetMaxOffsetNumber (page);
if ( shifted )
{
if ( maxoff > P_FIRSTKEY )
elog (FATAL, "btree: shifted page is not empty");
lowLeftItem = (BTItem) NULL;
}
else
{
if ( maxoff < P_FIRSTKEY )
elog (FATAL, "btree: un-shifted page is empty");
lowLeftItem = (BTItem) PageGetItem(page,
PageGetItemId(page, P_FIRSTKEY)); PageGetItemId(page, P_FIRSTKEY));
if ( _bt_itemcmp (rel, keysz, lowLeftItem,
if ( _bt_itemcmp (rel, keysz, lowLeftItem,
(BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)),
BTEqualStrategyNumber) ) BTEqualStrategyNumber) )
lpageop->btpo_flags |= BTP_CHAIN; lpageop->btpo_flags |= BTP_CHAIN;
}
/* /*
* By here, * By here,
...@@ -405,6 +542,8 @@ _bt_insertonpg(Relation rel, ...@@ -405,6 +542,8 @@ _bt_insertonpg(Relation rel,
BTItem new_item; BTItem new_item;
OffsetNumber upditem_offset = P_HIKEY; OffsetNumber upditem_offset = P_HIKEY;
bool do_update = false; bool do_update = false;
bool update_in_place = true;
bool parent_chained;
/* form a index tuple that points at the new right page */ /* form a index tuple that points at the new right page */
rbknum = BufferGetBlockNumber(rbuf); rbknum = BufferGetBlockNumber(rbuf);
...@@ -449,6 +588,10 @@ _bt_insertonpg(Relation rel, ...@@ -449,6 +588,10 @@ _bt_insertonpg(Relation rel,
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
ppage = BufferGetPage(pbuf); ppage = BufferGetPage(pbuf);
ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage); ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
parent_chained = (( ppageop->btpo_flags & BTP_CHAIN )) ? true : false;
if ( parent_chained && !left_chained )
elog (FATAL, "nbtree: unexpected chained parent of unchained page");
/* /*
* If the key of new_item is < than the key of the item * If the key of new_item is < than the key of the item
...@@ -472,7 +615,8 @@ _bt_insertonpg(Relation rel, ...@@ -472,7 +615,8 @@ _bt_insertonpg(Relation rel,
*/ */
if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item, if ( _bt_itemcmp (rel, keysz, stack->bts_btitem, new_item,
BTGreaterStrategyNumber) || BTGreaterStrategyNumber) ||
( _bt_itemcmp(rel, keysz, stack->bts_btitem, ( !shifted &&
_bt_itemcmp(rel, keysz, stack->bts_btitem,
new_item, BTEqualStrategyNumber) && new_item, BTEqualStrategyNumber) &&
_bt_itemcmp(rel, keysz, lowLeftItem, _bt_itemcmp(rel, keysz, lowLeftItem,
new_item, BTLessStrategyNumber) ) ) new_item, BTLessStrategyNumber) ) )
...@@ -491,34 +635,17 @@ _bt_insertonpg(Relation rel, ...@@ -491,34 +635,17 @@ _bt_insertonpg(Relation rel,
elog (FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)", elog (FATAL, "btree: items are out of order (leftmost %d, stack %u, update %u)",
P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset); P_LEFTMOST(lpageop), stack->bts_offset, upditem_offset);
} }
/*
* There was bug caused by deletion all minimum keys (K1) from
* an index page and insertion there (up to page splitting)
* higher duplicate keys (K2): after it parent item for left
* page contained K1 and the next item (for new right page) - K2,
* - and scan for the key = K2 lost items on the left page.
* So, we have to update parent item if its key < minimum
* key on the left and minimum keys on the left and on the right
* are equal. It would be nice to update hikey on the previous
* page of the left one too, but we may get deadlock here
* (read comments in _bt_split), so we leave previous page
* hikey _inconsistent_, but there should to be BTP_CHAIN flag
* on it, which privents _bt_moveright from dangerous movings
* from there. - vadim 05/27/97
*/
else if ( _bt_itemcmp (rel, keysz, stack->bts_btitem,
lowLeftItem, BTLessStrategyNumber) &&
_bt_itemcmp (rel, keysz, new_item,
lowLeftItem, BTEqualStrategyNumber) )
{
do_update = true;
upditem_offset = stack->bts_offset;
}
if ( do_update ) if ( do_update )
{ {
/* Try to update in place. */ if ( shifted )
if ( DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) == elog (FATAL, "btree: attempt to update parent for shifted page");
/*
* Try to update in place. If out parent page is chained
* then we must forse insertion.
*/
if ( !parent_chained &&
DOUBLEALIGN (IndexTupleDSize (lowLeftItem->bti_itup)) ==
DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) ) DOUBLEALIGN (IndexTupleDSize (stack->bts_btitem->bti_itup)) )
{ {
_bt_updateitem(rel, keysz, pbuf, _bt_updateitem(rel, keysz, pbuf,
...@@ -528,6 +655,7 @@ _bt_insertonpg(Relation rel, ...@@ -528,6 +655,7 @@ _bt_insertonpg(Relation rel,
} }
else else
{ {
update_in_place = false;
PageIndexTupleDelete(ppage, upditem_offset); PageIndexTupleDelete(ppage, upditem_offset);
/* /*
...@@ -543,14 +671,18 @@ _bt_insertonpg(Relation rel, ...@@ -543,14 +671,18 @@ _bt_insertonpg(Relation rel,
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
bknum, P_HIKEY); bknum, P_HIKEY);
/* unlock the children before doing this */ /*
* Unlock the children before doing this
*
* Mmm ... I foresee problems here. - vadim 06/10/97
*/
_bt_relbuf(rel, buf, BT_WRITE); _bt_relbuf(rel, buf, BT_WRITE);
_bt_relbuf(rel, rbuf, BT_WRITE); _bt_relbuf(rel, rbuf, BT_WRITE);
/* /*
* A regular _bt_binsrch should find the right place to * A regular _bt_binsrch should find the right place to
* put the new entry, since it should be either lower * put the new entry, since it should be lower than any
* than any other key on the page or unique. * other key on the page.
* Therefore set afteritem to NULL. * Therefore set afteritem to NULL.
*/ */
newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup)); newskey = _bt_mkscankey(rel, &(stack->bts_btitem->bti_itup));
...@@ -575,9 +707,49 @@ _bt_insertonpg(Relation rel, ...@@ -575,9 +707,49 @@ _bt_insertonpg(Relation rel,
} }
newskey = _bt_mkscankey(rel, &(new_item->bti_itup)); newskey = _bt_mkscankey(rel, &(new_item->bti_itup));
afteritem = stack->bts_btitem;
if ( parent_chained && !update_in_place )
{
ppage = BufferGetPage(pbuf);
ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
if ( ppageop->btpo_flags & BTP_CHAIN )
elog (FATAL, "btree: unexpected BTP_CHAIN flag in parent after update");
if ( P_RIGHTMOST (ppageop) )
elog (FATAL, "btree: chained parent is RIGHTMOST after update");
maxoff = PageGetMaxOffsetNumber (ppage);
if ( maxoff != P_FIRSTKEY )
elog (FATAL, "btree: FIRSTKEY was unexpected in parent after update");
if ( _bt_skeycmp (rel, keysz, newskey, ppage,
PageGetItemId(ppage, P_FIRSTKEY),
BTLessEqualStrategyNumber) )
elog (FATAL, "btree: parent FIRSTKEY is >= duplicate key after update");
if ( !_bt_skeycmp (rel, keysz, newskey, ppage,
PageGetItemId(ppage, P_HIKEY),
BTEqualStrategyNumber) )
elog (FATAL, "btree: parent HIGHKEY is not equal duplicate key after update");
afteritem = (BTItem) NULL;
}
else if ( left_chained && !update_in_place )
{
ppage = BufferGetPage(pbuf);
ppageop = (BTPageOpaque) PageGetSpecialPointer(ppage);
if ( !P_RIGHTMOST (ppageop) &&
_bt_skeycmp (rel, keysz, newskey, ppage,
PageGetItemId(ppage, P_HIKEY),
BTGreaterStrategyNumber) )
afteritem = (BTItem) NULL;
}
if ( afteritem == (BTItem) NULL)
{
rbuf = _bt_getbuf(rel, ppageop->btpo_next, BT_WRITE);
_bt_relbuf(rel, pbuf, BT_WRITE);
pbuf = rbuf;
}
newres = _bt_insertonpg(rel, pbuf, stack->bts_parent, newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
keysz, newskey, new_item, keysz, newskey, new_item,
stack->bts_btitem); afteritem);
/* be tidy */ /* be tidy */
pfree(newres); pfree(newres);
...@@ -607,7 +779,7 @@ _bt_insertonpg(Relation rel, ...@@ -607,7 +779,7 @@ _bt_insertonpg(Relation rel,
* pin and lock on buf are maintained. * pin and lock on buf are maintained.
*/ */
static Buffer static Buffer
_bt_split(Relation rel, Buffer buf, BTItem hiRightItem) _bt_split(Relation rel, Buffer buf, OffsetNumber firstright)
{ {
Buffer rbuf; Buffer rbuf;
Page origpage; Page origpage;
...@@ -622,9 +794,7 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem) ...@@ -622,9 +794,7 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
OffsetNumber leftoff, rightoff; OffsetNumber leftoff, rightoff;
OffsetNumber start; OffsetNumber start;
OffsetNumber maxoff; OffsetNumber maxoff;
OffsetNumber firstright;
OffsetNumber i; OffsetNumber i;
Size llimit;
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE); rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
origpage = BufferGetPage(buf); origpage = BufferGetPage(buf);
...@@ -666,23 +836,9 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem) ...@@ -666,23 +836,9 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
/* splitting a non-rightmost page, start at the first data item */ /* splitting a non-rightmost page, start at the first data item */
start = P_FIRSTKEY; start = P_FIRSTKEY;
/* itemid = PageGetItemId(origpage, P_HIKEY);
* Copy the original high key to the new page if high key itemsz = ItemIdGetLength(itemid);
* was not passed by caller. item = (BTItem) PageGetItem(origpage, itemid);
*/
if ( hiRightItem == NULL )
{
itemid = PageGetItemId(origpage, P_HIKEY);
itemsz = ItemIdGetLength(itemid);
item = (BTItem) PageGetItem(origpage, itemid);
}
else
{
item = hiRightItem;
itemsz = IndexTupleDSize(hiRightItem->bti_itup)
+ (sizeof(BTItemData) - sizeof(IndexTupleData));
itemsz = DOUBLEALIGN(itemsz);
}
if ( PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber ) if ( PageAddItem(rightpage, (Item) item, itemsz, P_HIKEY, LP_USED) == InvalidOffsetNumber )
elog (FATAL, "btree: failed to add hikey to the right sibling"); elog (FATAL, "btree: failed to add hikey to the right sibling");
rightoff = P_FIRSTKEY; rightoff = P_FIRSTKEY;
...@@ -694,8 +850,11 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem) ...@@ -694,8 +850,11 @@ _bt_split(Relation rel, Buffer buf, BTItem hiRightItem)
rightoff = P_HIKEY; rightoff = P_HIKEY;
} }
maxoff = PageGetMaxOffsetNumber(origpage); maxoff = PageGetMaxOffsetNumber(origpage);
llimit = PageGetFreeSpace(leftpage) / 2; if ( firstright == InvalidOffsetNumber )
firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit); {
Size llimit = PageGetFreeSpace(leftpage) / 2;
firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit);
}
for (i = start; i <= maxoff; i = OffsetNumberNext(i)) { for (i = start; i <= maxoff; i = OffsetNumberNext(i)) {
itemid = PageGetItemId(origpage, i); itemid = PageGetItemId(origpage, i);
...@@ -814,6 +973,9 @@ _bt_findsplitloc(Relation rel, ...@@ -814,6 +973,9 @@ _bt_findsplitloc(Relation rel,
Size nbytes; Size nbytes;
int natts; int natts;
if ( start >= maxoff )
elog (FATAL, "btree: cannot split if start (%d) >= maxoff (%d)",
start, maxoff);
natts = rel->rd_rel->relnatts; natts = rel->rd_rel->relnatts;
saferight = start; saferight = start;
safeitemid = PageGetItemId(page, saferight); safeitemid = PageGetItemId(page, saferight);
...@@ -822,8 +984,8 @@ _bt_findsplitloc(Relation rel, ...@@ -822,8 +984,8 @@ _bt_findsplitloc(Relation rel,
i = OffsetNumberNext(start); i = OffsetNumberNext(start);
while (nbytes < llimit) { while (nbytes < llimit)
{
/* check the next item on the page */ /* check the next item on the page */
nxtitemid = PageGetItemId(page, i); nxtitemid = PageGetItemId(page, i);
nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData)); nbytes += (ItemIdGetLength(nxtitemid) + sizeof(ItemIdData));
...@@ -840,7 +1002,10 @@ _bt_findsplitloc(Relation rel, ...@@ -840,7 +1002,10 @@ _bt_findsplitloc(Relation rel,
safeitem = nxtitem; safeitem = nxtitem;
saferight = i; saferight = i;
} }
i = OffsetNumberNext(i); if ( i < maxoff )
i = OffsetNumberNext(i);
else
break;
} }
/* /*
...@@ -851,6 +1016,9 @@ _bt_findsplitloc(Relation rel, ...@@ -851,6 +1016,9 @@ _bt_findsplitloc(Relation rel,
if (saferight == start) if (saferight == start)
saferight = i; saferight = i;
if ( saferight == maxoff && ( maxoff - start ) > 1 )
saferight = start + ( maxoff - start ) / 2;
return (saferight); return (saferight);
} }
...@@ -1051,10 +1219,22 @@ _bt_goesonpg(Relation rel, ...@@ -1051,10 +1219,22 @@ _bt_goesonpg(Relation rel,
* If we have no adjacency information, and the item is equal to the * If we have no adjacency information, and the item is equal to the
* high key on the page (by here it is), then the item does not belong * high key on the page (by here it is), then the item does not belong
* on this page. * on this page.
*
* Now it's not true in all cases. - vadim 06/10/97
*/ */
if (afteritem == (BTItem) NULL) if (afteritem == (BTItem) NULL)
{
if ( opaque->btpo_flags & BTP_LEAF )
return (false);
if ( opaque->btpo_flags & BTP_CHAIN )
return (true);
if ( _bt_skeycmp (rel, keysz, scankey, page,
PageGetItemId(page, P_FIRSTKEY),
BTEqualStrategyNumber) )
return (true);
return (false); return (false);
}
/* damn, have to work for it. i hate that. */ /* damn, have to work for it. i hate that. */
maxoff = PageGetMaxOffsetNumber(page); maxoff = PageGetMaxOffsetNumber(page);
...@@ -1269,6 +1449,7 @@ _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum, ...@@ -1269,6 +1449,7 @@ _bt_isequal (TupleDesc itupdesc, Page page, OffsetNumber offnum,
return (true); return (true);
} }
#if 0
/* /*
* _bt_shift - insert btitem on the passed page after shifting page * _bt_shift - insert btitem on the passed page after shifting page
* to the right in the tree. * to the right in the tree.
...@@ -1404,3 +1585,4 @@ _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz, ...@@ -1404,3 +1585,4 @@ _bt_shift (Relation rel, Buffer buf, BTStack stack, int keysz,
return (res); return (res);
} }
#endif
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.20 1997/05/30 18:35:37 vadim Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.21 1997/06/10 07:28:50 vadim Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -159,6 +159,7 @@ _bt_moveright(Relation rel, ...@@ -159,6 +159,7 @@ _bt_moveright(Relation rel,
BTPageOpaque opaque; BTPageOpaque opaque;
ItemId hikey; ItemId hikey;
BlockNumber rblkno; BlockNumber rblkno;
int natts = rel->rd_rel->relnatts;
page = BufferGetPage(buf); page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page); opaque = (BTPageOpaque) PageGetSpecialPointer(page);
...@@ -195,26 +196,9 @@ _bt_moveright(Relation rel, ...@@ -195,26 +196,9 @@ _bt_moveright(Relation rel,
* on this page to do not lose "good" tuples if number * on this page to do not lose "good" tuples if number
* of attrs > keysize. Example: (2,0) - last items on * of attrs > keysize. Example: (2,0) - last items on
* this page, (2,1) - first item on next page (hikey), * this page, (2,1) - first item on next page (hikey),
* our scankey is x = 2. Scankey >= (2,1) because of * our scankey is x = 2. Scankey == (2,1) because of
* we compare first attrs only, but we shouldn't to move * we compare first attrs only, but we shouldn't to move
* right of here. - vadim 04/15/97 * right of here. - vadim 04/15/97
*
* XXX
* This code changed again! Actually, we break our
* duplicates handling in single case: if we insert
* new minimum key into leftmost page with duplicates
* and splitting doesn't occure then _bt_insertonpg doesn't
* worry about duplicates-rule. Fix _bt_insertonpg ?
* But I don't see why don't compare scankey with _last_
* item on the page instead of first one, in any cases.
* So - we do it in that way now. - vadim 05/26/97
*
* Also, if we are on an "pseudo-empty" leaf page (i.e. there is
* only hikey here) and scankey == hikey then we don't move
* right! It's fix for bug described in _bt_insertonpg(). It's
* right - at least till index cleanups are perfomed by vacuum
* in exclusive mode: so, though this page may be just splitted,
* it may not be "emptied" before we got here. - vadim 05/27/97
*/ */
if ( _bt_skeycmp (rel, keysz, scankey, page, hikey, if ( _bt_skeycmp (rel, keysz, scankey, page, hikey,
...@@ -227,14 +211,27 @@ _bt_moveright(Relation rel, ...@@ -227,14 +211,27 @@ _bt_moveright(Relation rel,
} }
if ( offmax > P_HIKEY ) if ( offmax > P_HIKEY )
{ {
if ( _bt_skeycmp (rel, keysz, scankey, page, if ( natts == keysz ) /* sanity checks */
{
if ( _bt_skeycmp (rel, keysz, scankey, page,
PageGetItemId (page, P_FIRSTKEY),
BTEqualStrategyNumber) )
elog (FATAL, "btree: BTP_CHAIN flag was expected");
if ( _bt_skeycmp (rel, keysz, scankey, page,
PageGetItemId (page, offmax),
BTEqualStrategyNumber) )
elog (FATAL, "btree: unexpected equal last item");
if ( _bt_skeycmp (rel, keysz, scankey, page,
PageGetItemId (page, offmax),
BTLessStrategyNumber) )
elog (FATAL, "btree: unexpected greater last item");
/* move right */
}
else if ( _bt_skeycmp (rel, keysz, scankey, page,
PageGetItemId (page, offmax), PageGetItemId (page, offmax),
BTLessEqualStrategyNumber) ) BTLessEqualStrategyNumber) )
break; break;
} }
else if ( offmax == P_HIKEY &&
( opaque->btpo_flags & BTP_LEAF ) )
break;
} }
/* step right one page */ /* step right one page */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment