Commit 8cb62426 authored by Tom Lane's avatar Tom Lane

Replace inefficient _bt_invokestrat calls with direct calls to the

appropriate btree three-way comparison routine.  Not clear why the
three-way comparison routines were being used in some paths and not
others in btree --- incomplete changes by someone long ago, maybe?
Anyway, this makes for a nice speedup in CREATE INDEX.
parent 49353692
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.54 2000/01/26 05:55:58 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.55 2000/02/18 06:32:33 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -20,8 +20,11 @@ ...@@ -20,8 +20,11 @@
static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem); static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf, BTStack stack, int keysz, ScanKey scankey, BTItem btitem, BTItem afteritem);
static Buffer _bt_split(Relation rel, Buffer buf, OffsetNumber firstright); static Buffer _bt_split(Relation rel, Size keysz, ScanKey scankey,
static OffsetNumber _bt_findsplitloc(Relation rel, Page page, OffsetNumber start, OffsetNumber maxoff, Size llimit); Buffer buf, OffsetNumber firstright);
static OffsetNumber _bt_findsplitloc(Relation rel, Size keysz, ScanKey scankey,
Page page, OffsetNumber start,
OffsetNumber maxoff, Size llimit);
static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf); static void _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem); static OffsetNumber _bt_pgaddtup(Relation rel, Buffer buf, int keysz, ScanKey itup_scankey, Size itemsize, BTItem btitem, BTItem afteritem);
static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem); static bool _bt_goesonpg(Relation rel, Buffer buf, Size keysz, ScanKey scankey, BTItem afteritem);
...@@ -297,7 +300,7 @@ _bt_insertonpg(Relation rel, ...@@ -297,7 +300,7 @@ _bt_insertonpg(Relation rel,
hitemid = PageGetItemId(page, P_HIKEY); hitemid = PageGetItemId(page, P_HIKEY);
hitem = (BTItem) PageGetItem(page, hitemid); hitem = (BTItem) PageGetItem(page, hitemid);
if (maxoff > P_HIKEY && if (maxoff > P_HIKEY &&
!_bt_itemcmp(rel, keysz, hitem, !_bt_itemcmp(rel, keysz, scankey, hitem,
(BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)), (BTItem) PageGetItem(page, PageGetItemId(page, P_FIRSTKEY)),
BTEqualStrategyNumber)) BTEqualStrategyNumber))
elog(FATAL, "btree: bad key on the page in the chain of duplicates"); elog(FATAL, "btree: bad key on the page in the chain of duplicates");
...@@ -373,7 +376,8 @@ _bt_insertonpg(Relation rel, ...@@ -373,7 +376,8 @@ _bt_insertonpg(Relation rel,
{ {
itid = PageGetItemId(page, offnum); itid = PageGetItemId(page, offnum);
chkitem = (BTItem) PageGetItem(page, itid); chkitem = (BTItem) PageGetItem(page, itid);
if (!_bt_itemcmp(rel, keysz, previtem, chkitem, if (!_bt_itemcmp(rel, keysz, scankey,
previtem, chkitem,
BTEqualStrategyNumber)) BTEqualStrategyNumber))
{ {
if (currsize > maxsize) if (currsize > maxsize)
...@@ -471,9 +475,10 @@ _bt_insertonpg(Relation rel, ...@@ -471,9 +475,10 @@ _bt_insertonpg(Relation rel,
MAXALIGN(sizeof(BTPageOpaqueData)) MAXALIGN(sizeof(BTPageOpaqueData))
+sizeof(ItemIdData); +sizeof(ItemIdData);
llimit /= 2; llimit /= 2;
firstright = _bt_findsplitloc(rel, page, start, maxoff, llimit); firstright = _bt_findsplitloc(rel, keysz, scankey,
page, start, maxoff, llimit);
if (_bt_itemcmp(rel, keysz, if (_bt_itemcmp(rel, keysz, scankey,
(BTItem) PageGetItem(page, PageGetItemId(page, start)), (BTItem) PageGetItem(page, PageGetItemId(page, start)),
(BTItem) PageGetItem(page, PageGetItemId(page, firstright)), (BTItem) PageGetItem(page, PageGetItemId(page, firstright)),
BTEqualStrategyNumber)) BTEqualStrategyNumber))
...@@ -503,7 +508,8 @@ _bt_insertonpg(Relation rel, ...@@ -503,7 +508,8 @@ _bt_insertonpg(Relation rel,
ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid), ItemPointerSet(&(stack->bts_btitem->bti_itup.t_tid),
bknum, P_HIKEY); bknum, P_HIKEY);
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
if (_bt_itemcmp(rel, keysz, stack->bts_btitem, if (_bt_itemcmp(rel, keysz, scankey,
stack->bts_btitem,
(BTItem) PageGetItem(page, (BTItem) PageGetItem(page,
PageGetItemId(page, start)), PageGetItemId(page, start)),
BTLessStrategyNumber)) BTLessStrategyNumber))
...@@ -519,7 +525,7 @@ _bt_insertonpg(Relation rel, ...@@ -519,7 +525,7 @@ _bt_insertonpg(Relation rel,
} }
/* split the buffer into left and right halves */ /* split the buffer into left and right halves */
rbuf = _bt_split(rel, buf, firstright); rbuf = _bt_split(rel, keysz, scankey, buf, firstright);
/* which new page (left half or right half) gets the tuple? */ /* which new page (left half or right half) gets the tuple? */
if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem)) if (_bt_goesonpg(rel, buf, keysz, scankey, afteritem))
...@@ -550,7 +556,7 @@ _bt_insertonpg(Relation rel, ...@@ -550,7 +556,7 @@ _bt_insertonpg(Relation rel,
elog(FATAL, "btree: un-shifted page is empty"); elog(FATAL, "btree: un-shifted page is empty");
lowLeftItem = (BTItem) PageGetItem(page, lowLeftItem = (BTItem) PageGetItem(page,
PageGetItemId(page, P_FIRSTKEY)); PageGetItemId(page, P_FIRSTKEY));
if (_bt_itemcmp(rel, keysz, lowLeftItem, if (_bt_itemcmp(rel, keysz, scankey, lowLeftItem,
(BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)), (BTItem) PageGetItem(page, PageGetItemId(page, P_HIKEY)),
BTEqualStrategyNumber)) BTEqualStrategyNumber))
lpageop->btpo_flags |= BTP_CHAIN; lpageop->btpo_flags |= BTP_CHAIN;
...@@ -613,7 +619,8 @@ l_spl: ; ...@@ -613,7 +619,8 @@ l_spl: ;
{ {
ritem = (BTItem) PageGetItem(rpage, ritem = (BTItem) PageGetItem(rpage,
PageGetItemId(rpage, P_FIRSTKEY)); PageGetItemId(rpage, P_FIRSTKEY));
if (_bt_itemcmp(rel, keysz, ritem, if (_bt_itemcmp(rel, keysz, scankey,
ritem,
(BTItem) PageGetItem(rpage, (BTItem) PageGetItem(rpage,
PageGetItemId(rpage, P_HIKEY)), PageGetItemId(rpage, P_HIKEY)),
BTEqualStrategyNumber)) BTEqualStrategyNumber))
...@@ -663,13 +670,16 @@ l_spl: ; ...@@ -663,13 +670,16 @@ l_spl: ;
* possible in splitting leftmost page) and current parent * possible in splitting leftmost page) and current parent
* item == new_item. - vadim 05/27/97 * item == new_item. - vadim 05/27/97
*/ */
if (_bt_itemcmp(rel, keysz, stack->bts_btitem, new_item, if (_bt_itemcmp(rel, keysz, scankey,
stack->bts_btitem, new_item,
BTGreaterStrategyNumber) || BTGreaterStrategyNumber) ||
(!shifted && (!shifted &&
_bt_itemcmp(rel, keysz, stack->bts_btitem, _bt_itemcmp(rel, keysz, scankey,
new_item, BTEqualStrategyNumber) && stack->bts_btitem, new_item,
_bt_itemcmp(rel, keysz, lowLeftItem, BTEqualStrategyNumber) &&
new_item, BTLessStrategyNumber))) _bt_itemcmp(rel, keysz, scankey,
lowLeftItem, new_item,
BTLessStrategyNumber)))
{ {
do_update = true; do_update = true;
...@@ -831,7 +841,8 @@ l_spl: ; ...@@ -831,7 +841,8 @@ l_spl: ;
* pin and lock on buf are maintained. * pin and lock on buf are maintained.
*/ */
static Buffer static Buffer
_bt_split(Relation rel, Buffer buf, OffsetNumber firstright) _bt_split(Relation rel, Size keysz, ScanKey scankey,
Buffer buf, OffsetNumber firstright)
{ {
Buffer rbuf; Buffer rbuf;
Page origpage; Page origpage;
...@@ -915,7 +926,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright) ...@@ -915,7 +926,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright)
{ {
Size llimit = PageGetFreeSpace(leftpage) / 2; Size llimit = PageGetFreeSpace(leftpage) / 2;
firstright = _bt_findsplitloc(rel, origpage, start, maxoff, llimit); firstright = _bt_findsplitloc(rel, keysz, scankey,
origpage, start, maxoff, llimit);
} }
for (i = start; i <= maxoff; i = OffsetNumberNext(i)) for (i = start; i <= maxoff; i = OffsetNumberNext(i))
...@@ -1027,6 +1039,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright) ...@@ -1027,6 +1039,8 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright)
*/ */
static OffsetNumber static OffsetNumber
_bt_findsplitloc(Relation rel, _bt_findsplitloc(Relation rel,
Size keysz,
ScanKey scankey,
Page page, Page page,
OffsetNumber start, OffsetNumber start,
OffsetNumber maxoff, OffsetNumber maxoff,
...@@ -1039,12 +1053,10 @@ _bt_findsplitloc(Relation rel, ...@@ -1039,12 +1053,10 @@ _bt_findsplitloc(Relation rel,
BTItem safeitem, BTItem safeitem,
nxtitem; nxtitem;
Size nbytes; Size nbytes;
int natts;
if (start >= maxoff) if (start >= maxoff)
elog(FATAL, "btree: cannot split if start (%d) >= maxoff (%d)", elog(FATAL, "btree: cannot split if start (%d) >= maxoff (%d)",
start, maxoff); start, maxoff);
natts = rel->rd_rel->relnatts;
saferight = start; saferight = start;
safeitemid = PageGetItemId(page, saferight); safeitemid = PageGetItemId(page, saferight);
nbytes = ItemIdGetLength(safeitemid) + sizeof(ItemIdData); nbytes = ItemIdGetLength(safeitemid) + sizeof(ItemIdData);
...@@ -1064,7 +1076,7 @@ _bt_findsplitloc(Relation rel, ...@@ -1064,7 +1076,7 @@ _bt_findsplitloc(Relation rel,
* at isn't equal to the last safe one we saw, then it's our new * at isn't equal to the last safe one we saw, then it's our new
* safe tuple. * safe tuple.
*/ */
if (!_bt_itemcmp(rel, natts, if (!_bt_itemcmp(rel, keysz, scankey,
safeitem, nxtitem, BTEqualStrategyNumber)) safeitem, nxtitem, BTEqualStrategyNumber))
{ {
safeitem = nxtitem; safeitem = nxtitem;
...@@ -1345,92 +1357,94 @@ _bt_goesonpg(Relation rel, ...@@ -1345,92 +1357,94 @@ _bt_goesonpg(Relation rel,
} }
/* /*
* _bt_itemcmp() -- compare item1 to item2 using a requested * _bt_tuplecompare() -- compare two IndexTuples,
* strategy (<, <=, =, >=, >) * return -1, 0, or +1
* *
*/ */
bool int32
_bt_itemcmp(Relation rel, _bt_tuplecompare(Relation rel,
Size keysz, Size keysz,
BTItem item1, ScanKey scankey,
BTItem item2, IndexTuple tuple1,
StrategyNumber strat) IndexTuple tuple2)
{ {
TupleDesc tupDes; TupleDesc tupDes;
IndexTuple indexTuple1,
indexTuple2;
Datum attrDatum1,
attrDatum2;
int i; int i;
bool isFirstNull, int32 compare = 0;
isSecondNull;
bool compare;
bool useEqual = false;
if (strat == BTLessEqualStrategyNumber)
{
useEqual = true;
strat = BTLessStrategyNumber;
}
else if (strat == BTGreaterEqualStrategyNumber)
{
useEqual = true;
strat = BTGreaterStrategyNumber;
}
tupDes = RelationGetDescr(rel); tupDes = RelationGetDescr(rel);
indexTuple1 = &(item1->bti_itup);
indexTuple2 = &(item2->bti_itup);
for (i = 1; i <= keysz; i++) for (i = 1; i <= keysz; i++)
{ {
attrDatum1 = index_getattr(indexTuple1, i, tupDes, &isFirstNull); ScanKey entry = &scankey[i - 1];
attrDatum2 = index_getattr(indexTuple2, i, tupDes, &isSecondNull); Datum attrDatum1,
attrDatum2;
bool isFirstNull,
isSecondNull;
attrDatum1 = index_getattr(tuple1, i, tupDes, &isFirstNull);
attrDatum2 = index_getattr(tuple2, i, tupDes, &isSecondNull);
/* see comments about NULLs handling in btbuild */ /* see comments about NULLs handling in btbuild */
if (isFirstNull) /* attr in item1 is NULL */ if (isFirstNull) /* attr in tuple1 is NULL */
{ {
if (isSecondNull) /* attr in item2 is NULL too */ if (isSecondNull) /* attr in tuple2 is NULL too */
compare = (strat == BTEqualStrategyNumber) ? true : false; compare = 0;
else else
compare = (strat == BTGreaterStrategyNumber) ? true : false; compare = 1; /* NULL ">" not-NULL */
} }
else if (isSecondNull) /* attr in item1 is NOT_NULL and */ else if (isSecondNull) /* attr in tuple1 is NOT_NULL and */
{ /* and attr in item2 is NULL */ { /* attr in tuple2 is NULL */
compare = (strat == BTLessStrategyNumber) ? true : false; compare = -1; /* not-NULL "<" NULL */
} }
else else
compare = _bt_invokestrat(rel, i, strat, attrDatum1, attrDatum2);
if (compare) /* true for one of ">, <, =" */
{ {
if (strat != BTEqualStrategyNumber) compare = (int32) FMGR_PTR2(&entry->sk_func,
return true; attrDatum1, attrDatum2);
} }
else
/* false for one of ">, <, =" */
{
if (strat == BTEqualStrategyNumber)
return false;
/* if (compare != 0)
* if original strat was "<=, >=" OR "<, >" but some break; /* done when we find unequal attributes */
* attribute(s) left - need to test for Equality }
return compare;
}
/*
* _bt_itemcmp() -- compare two BTItems using a requested
* strategy (<, <=, =, >=, >)
*
*/ */
if (useEqual || i < keysz) bool
_bt_itemcmp(Relation rel,
Size keysz,
ScanKey scankey,
BTItem item1,
BTItem item2,
StrategyNumber strat)
{
int32 compare;
compare = _bt_tuplecompare(rel, keysz, scankey,
&(item1->bti_itup),
&(item2->bti_itup));
switch (strat)
{ {
if (isFirstNull || isSecondNull) case BTLessStrategyNumber:
compare = (isFirstNull && isSecondNull) ? true : false; return (bool) (compare < 0);
else case BTLessEqualStrategyNumber:
compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, return (bool) (compare <= 0);
attrDatum1, attrDatum2); case BTEqualStrategyNumber:
if (compare) /* item1' and item2' attributes are equal */ return (bool) (compare == 0);
continue; /* - try to compare next attributes */ case BTGreaterEqualStrategyNumber:
return (bool) (compare >= 0);
case BTGreaterStrategyNumber:
return (bool) (compare > 0);
} }
elog(ERROR, "_bt_itemcmp: bogus strategy %d", (int) strat);
return false; return false;
}
}
return true;
} }
/* /*
...@@ -1585,7 +1599,7 @@ _bt_shift(Relation rel, Buffer buf, BTStack stack, int keysz, ...@@ -1585,7 +1599,7 @@ _bt_shift(Relation rel, Buffer buf, BTStack stack, int keysz,
/* init old page opaque */ /* init old page opaque */
pageop->btpo_flags = npageop->btpo_flags; /* restore flags */ pageop->btpo_flags = npageop->btpo_flags; /* restore flags */
pageop->btpo_flags &= ~BTP_CHAIN; pageop->btpo_flags &= ~BTP_CHAIN;
if (_bt_itemcmp(rel, keysz, hikey, btitem, BTEqualStrategyNumber)) if (_bt_itemcmp(rel, keysz, scankey, hikey, btitem, BTEqualStrategyNumber))
pageop->btpo_flags |= BTP_CHAIN; pageop->btpo_flags |= BTP_CHAIN;
pageop->btpo_prev = npageop->btpo_prev; /* restore prev */ pageop->btpo_prev = npageop->btpo_prev; /* restore prev */
pageop->btpo_next = nbknum; /* next points to the new page */ pageop->btpo_next = nbknum; /* next points to the new page */
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.56 2000/01/26 05:55:58 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsearch.c,v 1.57 2000/02/18 06:32:39 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -264,36 +264,21 @@ _bt_skeycmp(Relation rel, ...@@ -264,36 +264,21 @@ _bt_skeycmp(Relation rel,
BTItem item; BTItem item;
IndexTuple indexTuple; IndexTuple indexTuple;
TupleDesc tupDes; TupleDesc tupDes;
ScanKey entry;
int i; int i;
Datum attrDatum; int32 compare = 0;
Datum keyDatum;
bool compare;
bool isNull;
bool useEqual = false;
bool keyNull;
if (strat == BTLessEqualStrategyNumber)
{
useEqual = true;
strat = BTLessStrategyNumber;
}
else if (strat == BTGreaterEqualStrategyNumber)
{
useEqual = true;
strat = BTGreaterStrategyNumber;
}
item = (BTItem) PageGetItem(page, itemid); item = (BTItem) PageGetItem(page, itemid);
indexTuple = &(item->bti_itup); indexTuple = &(item->bti_itup);
tupDes = RelationGetDescr(rel); tupDes = RelationGetDescr(rel);
/* see if the comparison is true for all of the key attributes */
for (i = 1; i <= keysz; i++) for (i = 1; i <= keysz; i++)
{ {
ScanKey entry = &scankey[i - 1];
Datum attrDatum;
bool isNull;
Datum keyDatum;
entry = &scankey[i - 1];
Assert(entry->sk_attno == i); Assert(entry->sk_attno == i);
attrDatum = index_getattr(indexTuple, attrDatum = index_getattr(indexTuple,
entry->sk_attno, entry->sk_attno,
...@@ -304,54 +289,40 @@ _bt_skeycmp(Relation rel, ...@@ -304,54 +289,40 @@ _bt_skeycmp(Relation rel,
/* see comments about NULLs handling in btbuild */ /* see comments about NULLs handling in btbuild */
if (entry->sk_flags & SK_ISNULL) /* key is NULL */ if (entry->sk_flags & SK_ISNULL) /* key is NULL */
{ {
Assert(entry->sk_procedure == F_NULLVALUE);
keyNull = true;
if (isNull) if (isNull)
compare = (strat == BTEqualStrategyNumber) ? true : false; compare = 0; /* NULL key "=" NULL datum */
else else
compare = (strat == BTGreaterStrategyNumber) ? true : false; compare = 1; /* NULL key ">" not-NULL datum */
} }
else if (isNull) /* key is NOT_NULL and item is NULL */ else if (isNull) /* key is NOT_NULL and item is NULL */
{ {
keyNull = false; compare = -1; /* not-NULL key "<" NULL datum */
compare = (strat == BTLessStrategyNumber) ? true : false;
} }
else else
{ {
keyNull = false; compare = (int32) FMGR_PTR2(&entry->sk_func, keyDatum, attrDatum);
compare = _bt_invokestrat(rel, i, strat, keyDatum, attrDatum);
} }
if (compare) /* true for one of ">, <, =" */ if (compare != 0)
{ break; /* done when we find unequal attributes */
if (strat != BTEqualStrategyNumber)
return true;
} }
else
/* false for one of ">, <, =" */
{
if (strat == BTEqualStrategyNumber)
return false;
/* switch (strat)
* if original strat was "<=, >=" OR "<, >" but some
* attribute(s) left - need to test for Equality
*/
if (useEqual || i < keysz)
{ {
if (keyNull || isNull) case BTLessStrategyNumber:
compare = (keyNull && isNull) ? true : false; return (bool) (compare < 0);
else case BTLessEqualStrategyNumber:
compare = _bt_invokestrat(rel, i, BTEqualStrategyNumber, return (bool) (compare <= 0);
keyDatum, attrDatum); case BTEqualStrategyNumber:
if (compare) /* key' and item' attributes are equal */ return (bool) (compare == 0);
continue; /* - try to compare next attributes */ case BTGreaterEqualStrategyNumber:
} return (bool) (compare >= 0);
return false; case BTGreaterStrategyNumber:
} return (bool) (compare > 0);
} }
return true; elog(ERROR, "_bt_skeycmp: bogus strategy %d", (int) strat);
return false;
} }
/* /*
...@@ -610,7 +581,6 @@ _bt_compare(Relation rel, ...@@ -610,7 +581,6 @@ _bt_compare(Relation rel,
/* see comments about NULLs handling in btbuild */ /* see comments about NULLs handling in btbuild */
if (entry->sk_flags & SK_ISNULL) /* key is NULL */ if (entry->sk_flags & SK_ISNULL) /* key is NULL */
{ {
Assert(entry->sk_procedure == F_NULLVALUE);
if (null) if (null)
tmpres = (long) 0; /* NULL "=" NULL */ tmpres = (long) 0; /* NULL "=" NULL */
else else
......
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsort.c,v 1.50 2000/01/26 05:55:59 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsort.c,v 1.51 2000/02/18 06:32:39 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -69,12 +69,13 @@ struct BTSpool ...@@ -69,12 +69,13 @@ struct BTSpool
static void _bt_load(Relation index, BTSpool *btspool); static void _bt_load(Relation index, BTSpool *btspool);
static BTItem _bt_buildadd(Relation index, BTPageState *state, BTItem bti, static BTItem _bt_buildadd(Relation index, Size keysz, ScanKey scankey,
int flags); BTPageState *state, BTItem bti, int flags);
static BTItem _bt_minitem(Page opage, BlockNumber oblkno, int atend); static BTItem _bt_minitem(Page opage, BlockNumber oblkno, int atend);
static BTPageState *_bt_pagestate(Relation index, int flags, static BTPageState *_bt_pagestate(Relation index, int flags,
int level, bool doupper); int level, bool doupper);
static void _bt_uppershutdown(Relation index, BTPageState *state); static void _bt_uppershutdown(Relation index, Size keysz, ScanKey scankey,
BTPageState *state);
/* /*
...@@ -282,7 +283,8 @@ _bt_minitem(Page opage, BlockNumber oblkno, int atend) ...@@ -282,7 +283,8 @@ _bt_minitem(Page opage, BlockNumber oblkno, int atend)
* if all keys are unique, 'first' will always be the same as 'last'. * if all keys are unique, 'first' will always be the same as 'last'.
*/ */
static BTItem static BTItem
_bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags) _bt_buildadd(Relation index, Size keysz, ScanKey scankey,
BTPageState *state, BTItem bti, int flags)
{ {
Buffer nbuf; Buffer nbuf;
Page npage; Page npage;
...@@ -402,7 +404,7 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags) ...@@ -402,7 +404,7 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags)
nopaque->btpo_prev = BufferGetBlockNumber(obuf); nopaque->btpo_prev = BufferGetBlockNumber(obuf);
nopaque->btpo_next = P_NONE; nopaque->btpo_next = P_NONE;
if (_bt_itemcmp(index, index->rd_att->natts, if (_bt_itemcmp(index, keysz, scankey,
(BTItem) PageGetItem(opage, PageGetItemId(opage, P_HIKEY)), (BTItem) PageGetItem(opage, PageGetItemId(opage, P_HIKEY)),
(BTItem) PageGetItem(opage, PageGetItemId(opage, P_FIRSTKEY)), (BTItem) PageGetItem(opage, PageGetItemId(opage, P_FIRSTKEY)),
BTEqualStrategyNumber)) BTEqualStrategyNumber))
...@@ -424,7 +426,7 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags) ...@@ -424,7 +426,7 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags)
_bt_pagestate(index, 0, state->btps_level + 1, true); _bt_pagestate(index, 0, state->btps_level + 1, true);
} }
nbti = _bt_minitem(opage, BufferGetBlockNumber(obuf), 0); nbti = _bt_minitem(opage, BufferGetBlockNumber(obuf), 0);
_bt_buildadd(index, state->btps_next, nbti, 0); _bt_buildadd(index, keysz, scankey, state->btps_next, nbti, 0);
pfree((void *) nbti); pfree((void *) nbti);
} }
...@@ -454,7 +456,7 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags) ...@@ -454,7 +456,7 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags)
#endif #endif
if (last_bti == (BTItem) NULL) if (last_bti == (BTItem) NULL)
first_off = P_FIRSTKEY; first_off = P_FIRSTKEY;
else if (!_bt_itemcmp(index, index->rd_att->natts, else if (!_bt_itemcmp(index, keysz, scankey,
bti, last_bti, BTEqualStrategyNumber)) bti, last_bti, BTEqualStrategyNumber))
first_off = off; first_off = off;
last_off = off; last_off = off;
...@@ -470,7 +472,8 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags) ...@@ -470,7 +472,8 @@ _bt_buildadd(Relation index, BTPageState *state, BTItem bti, int flags)
} }
static void static void
_bt_uppershutdown(Relation index, BTPageState *state) _bt_uppershutdown(Relation index, Size keysz, ScanKey scankey,
BTPageState *state)
{ {
BTPageState *s; BTPageState *s;
BlockNumber blkno; BlockNumber blkno;
...@@ -499,7 +502,7 @@ _bt_uppershutdown(Relation index, BTPageState *state) ...@@ -499,7 +502,7 @@ _bt_uppershutdown(Relation index, BTPageState *state)
else else
{ {
bti = _bt_minitem(s->btps_page, blkno, 0); bti = _bt_minitem(s->btps_page, blkno, 0);
_bt_buildadd(index, s->btps_next, bti, 0); _bt_buildadd(index, keysz, scankey, s->btps_next, bti, 0);
pfree((void *) bti); pfree((void *) bti);
} }
} }
...@@ -521,6 +524,8 @@ static void ...@@ -521,6 +524,8 @@ static void
_bt_load(Relation index, BTSpool *btspool) _bt_load(Relation index, BTSpool *btspool)
{ {
BTPageState *state; BTPageState *state;
ScanKey skey;
int natts;
BTItem bti; BTItem bti;
bool should_free; bool should_free;
...@@ -529,93 +534,21 @@ _bt_load(Relation index, BTSpool *btspool) ...@@ -529,93 +534,21 @@ _bt_load(Relation index, BTSpool *btspool)
*/ */
state = _bt_pagestate(index, BTP_LEAF, 0, true); state = _bt_pagestate(index, BTP_LEAF, 0, true);
skey = _bt_mkscankey_nodata(index);
natts = RelationGetNumberOfAttributes(index);
for (;;) for (;;)
{ {
bti = (BTItem) tuplesort_getindextuple(btspool->sortstate, true, bti = (BTItem) tuplesort_getindextuple(btspool->sortstate, true,
&should_free); &should_free);
if (bti == (BTItem) NULL) if (bti == (BTItem) NULL)
break; break;
_bt_buildadd(index, state, bti, BTP_LEAF); _bt_buildadd(index, natts, skey, state, bti, BTP_LEAF);
if (should_free) if (should_free)
pfree((void *) bti); pfree((void *) bti);
} }
_bt_uppershutdown(index, state); _bt_uppershutdown(index, natts, skey, state);
}
/*
* given the (appropriately side-linked) leaf pages of a btree,
* construct the corresponding upper levels. we do this by inserting
* minimum keys from each page into parent pages as needed. the
* format of the internal pages is otherwise the same as for leaf
* pages.
*
* this routine is not called during conventional bulk-loading (in
* which case we can just build the upper levels as we create the
* sorted bottom level). it is only used for index recycling.
*/
#ifdef NOT_USED
void
_bt_upperbuild(Relation index)
{
Buffer rbuf;
BlockNumber blk;
Page rpage;
BTPageOpaque ropaque;
BTPageState *state;
BTItem nbti;
/*
* find the first leaf block. while we're at it, clear the BTP_ROOT
* flag that we set while building it (so we could find it later).
*/
rbuf = _bt_getroot(index, BT_WRITE);
blk = BufferGetBlockNumber(rbuf);
rpage = BufferGetPage(rbuf);
ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
ropaque->btpo_flags &= ~BTP_ROOT;
_bt_wrtbuf(index, rbuf);
state = _bt_pagestate(index, 0, 0, true);
/* for each page... */ _bt_freeskey(skey);
do
{
#ifdef NOT_USED
printf("\t\tblk=%d\n", blk);
#endif
rbuf = _bt_getbuf(index, blk, BT_READ);
rpage = BufferGetPage(rbuf);
ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
/* for each item... */
if (!PageIsEmpty(rpage))
{
/*
* form a new index tuple corresponding to the minimum key of
* the lower page and insert it into a page at this level.
*/
nbti = _bt_minitem(rpage, blk, P_RIGHTMOST(ropaque));
#ifdef FASTBUILD_DEBUG
{
bool isnull;
Datum d = index_getattr(&(nbti->bti_itup), 1, index->rd_att,
&isnull);
printf("_bt_upperbuild: inserting <%x> at %d\n",
d, state->btps_level);
}
#endif
_bt_buildadd(index, state, nbti, 0);
pfree((void *) nbti);
}
blk = ropaque->btpo_next;
_bt_relbuf(index, rbuf, BT_READ);
} while (blk != P_NONE);
_bt_uppershutdown(index, state);
} }
#endif
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/Attic/nbtstrat.c,v 1.11 2000/01/26 05:55:59 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/Attic/nbtstrat.c,v 1.12 2000/02/18 06:32:39 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -118,6 +118,8 @@ _bt_getstrat(Relation rel, ...@@ -118,6 +118,8 @@ _bt_getstrat(Relation rel,
return strat; return strat;
} }
#ifdef NOT_USED
bool bool
_bt_invokestrat(Relation rel, _bt_invokestrat(Relation rel,
AttrNumber attno, AttrNumber attno,
...@@ -128,3 +130,5 @@ _bt_invokestrat(Relation rel, ...@@ -128,3 +130,5 @@ _bt_invokestrat(Relation rel,
return (RelationInvokeStrategy(rel, &BTEvaluationData, attno, strat, return (RelationInvokeStrategy(rel, &BTEvaluationData, attno, strat,
left, right)); left, right));
} }
#endif
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.34 2000/01/26 05:55:59 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtutils.c,v 1.35 2000/02/18 06:32:39 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -23,7 +23,14 @@ ...@@ -23,7 +23,14 @@
extern int NIndexTupleProcessed; extern int NIndexTupleProcessed;
/*
* _bt_mkscankey
* Build a scan key that contains comparison data from itup
* as well as comparator routines appropriate to the key datatypes.
*
* The result is intended for use with _bt_skeycmp() or _bt_compare(),
* although it could be used with _bt_itemcmp() or _bt_tuplecompare().
*/
ScanKey ScanKey
_bt_mkscankey(Relation rel, IndexTuple itup) _bt_mkscankey(Relation rel, IndexTuple itup)
{ {
...@@ -31,35 +38,67 @@ _bt_mkscankey(Relation rel, IndexTuple itup) ...@@ -31,35 +38,67 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
TupleDesc itupdesc; TupleDesc itupdesc;
int natts; int natts;
int i; int i;
Datum arg;
RegProcedure proc; RegProcedure proc;
Datum arg;
bool null; bool null;
bits16 flag; bits16 flag;
natts = rel->rd_rel->relnatts;
itupdesc = RelationGetDescr(rel); itupdesc = RelationGetDescr(rel);
natts = RelationGetNumberOfAttributes(rel);
skey = (ScanKey) palloc(natts * sizeof(ScanKeyData)); skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
for (i = 0; i < natts; i++) for (i = 0; i < natts; i++)
{ {
proc = index_getprocid(rel, i + 1, BTORDER_PROC);
arg = index_getattr(itup, i + 1, itupdesc, &null); arg = index_getattr(itup, i + 1, itupdesc, &null);
if (null) flag = null ? SK_ISNULL : 0x0;
{ ScanKeyEntryInitialize(&skey[i],
proc = F_NULLVALUE; flag,
flag = SK_ISNULL; (AttrNumber) (i + 1),
proc,
arg);
} }
else
return skey;
}
/*
* _bt_mkscankey_nodata
* Build a scan key that contains comparator routines appropriate to
* the key datatypes, but no comparison data.
*
* The result can be used with _bt_itemcmp() or _bt_tuplecompare(),
* but not with _bt_skeycmp() or _bt_compare().
*/
ScanKey
_bt_mkscankey_nodata(Relation rel)
{
ScanKey skey;
int natts;
int i;
RegProcedure proc;
natts = RelationGetNumberOfAttributes(rel);
skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
for (i = 0; i < natts; i++)
{ {
proc = index_getprocid(rel, i + 1, BTORDER_PROC); proc = index_getprocid(rel, i + 1, BTORDER_PROC);
flag = 0x0; ScanKeyEntryInitialize(&skey[i],
} SK_ISNULL,
ScanKeyEntryInitialize(&skey[i], flag, (AttrNumber) (i + 1), proc, arg); (AttrNumber) (i + 1),
proc,
(Datum) NULL);
} }
return skey; return skey;
} }
/*
* free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
*/
void void
_bt_freeskey(ScanKey skey) _bt_freeskey(ScanKey skey)
{ {
......
...@@ -78,7 +78,7 @@ ...@@ -78,7 +78,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.5 2000/01/26 05:57:33 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/utils/sort/tuplesort.c,v 1.6 2000/02/18 06:32:30 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -253,6 +253,7 @@ struct Tuplesortstate ...@@ -253,6 +253,7 @@ struct Tuplesortstate
* by tuplesort_begin_index and used only by the IndexTuple routines. * by tuplesort_begin_index and used only by the IndexTuple routines.
*/ */
Relation indexRel; Relation indexRel;
ScanKey indexScanKey;
bool enforceUnique; /* complain if we find duplicate tuples */ bool enforceUnique; /* complain if we find duplicate tuples */
/* /*
...@@ -476,6 +477,8 @@ tuplesort_begin_index(Relation indexRel, ...@@ -476,6 +477,8 @@ tuplesort_begin_index(Relation indexRel,
state->tuplesize = tuplesize_index; state->tuplesize = tuplesize_index;
state->indexRel = indexRel; state->indexRel = indexRel;
/* see comments below about btree dependence of this code... */
state->indexScanKey = _bt_mkscankey_nodata(indexRel);
state->enforceUnique = enforceUnique; state->enforceUnique = enforceUnique;
return state; return state;
...@@ -1745,40 +1748,56 @@ tuplesize_heap(Tuplesortstate *state, void *tup) ...@@ -1745,40 +1748,56 @@ tuplesize_heap(Tuplesortstate *state, void *tup)
static int static int
comparetup_index(Tuplesortstate *state, const void *a, const void *b) comparetup_index(Tuplesortstate *state, const void *a, const void *b)
{ {
IndexTuple ltup = (IndexTuple) a; /*
IndexTuple rtup = (IndexTuple) b; * This is almost the same as _bt_tuplecompare(), but we need to
TupleDesc itdesc = state->indexRel->rd_att; * keep track of whether any null fields are present.
bool equal_isnull = false; */
IndexTuple tuple1 = (IndexTuple) a;
IndexTuple tuple2 = (IndexTuple) b;
Relation rel = state->indexRel;
Size keysz = RelationGetNumberOfAttributes(rel);
ScanKey scankey = state->indexScanKey;
TupleDesc tupDes;
int i; int i;
bool equal_hasnull = false;
tupDes = RelationGetDescr(rel);
for (i = 1; i <= itdesc->natts; i++) for (i = 1; i <= keysz; i++)
{ {
Datum lattr, ScanKey entry = &scankey[i - 1];
rattr; Datum attrDatum1,
bool isnull1, attrDatum2;
isnull2; bool isFirstNull,
isSecondNull;
int32 compare;
lattr = index_getattr(ltup, i, itdesc, &isnull1); attrDatum1 = index_getattr(tuple1, i, tupDes, &isFirstNull);
rattr = index_getattr(rtup, i, itdesc, &isnull2); attrDatum2 = index_getattr(tuple2, i, tupDes, &isSecondNull);
if (isnull1) /* see comments about NULLs handling in btbuild */
if (isFirstNull) /* attr in tuple1 is NULL */
{ {
if (!isnull2) if (isSecondNull) /* attr in tuple2 is NULL too */
return 1; /* NULL sorts after non-NULL */ {
equal_isnull = true; compare = 0;
continue; equal_hasnull = true;
}
else
compare = 1; /* NULL ">" not-NULL */
}
else if (isSecondNull) /* attr in tuple1 is NOT_NULL and */
{ /* attr in tuple2 is NULL */
compare = -1; /* not-NULL "<" NULL */
}
else
{
compare = (int32) FMGR_PTR2(&entry->sk_func,
attrDatum1, attrDatum2);
} }
else if (isnull2)
return -1;
if (_bt_invokestrat(state->indexRel, i, if (compare != 0)
BTGreaterStrategyNumber, return (int) compare; /* done when we find unequal attributes */
lattr, rattr))
return 1;
if (_bt_invokestrat(state->indexRel, i,
BTGreaterStrategyNumber,
rattr, lattr))
return -1;
} }
/* /*
...@@ -1790,7 +1809,7 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b) ...@@ -1790,7 +1809,7 @@ comparetup_index(Tuplesortstate *state, const void *a, const void *b)
* the sort algorithm wouldn't have checked whether one must appear * the sort algorithm wouldn't have checked whether one must appear
* before the other. * before the other.
*/ */
if (state->enforceUnique && !equal_isnull) if (state->enforceUnique && !equal_hasnull)
elog(ERROR, "Cannot create unique index. Table contains non-unique values"); elog(ERROR, "Cannot create unique index. Table contains non-unique values");
return 0; return 0;
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: nbtree.h,v 1.33 2000/01/26 05:57:50 momjian Exp $ * $Id: nbtree.h,v 1.34 2000/02/18 06:32:28 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -205,10 +205,10 @@ typedef struct BTPageState ...@@ -205,10 +205,10 @@ typedef struct BTPageState
*/ */
extern InsertIndexResult _bt_doinsert(Relation rel, BTItem btitem, extern InsertIndexResult _bt_doinsert(Relation rel, BTItem btitem,
bool index_is_unique, Relation heapRel); bool index_is_unique, Relation heapRel);
extern int32 _bt_tuplecompare(Relation rel, Size keysz, ScanKey scankey,
/* default is to allow duplicates */ IndexTuple tuple1, IndexTuple tuple2);
extern bool _bt_itemcmp(Relation rel, Size keysz, BTItem item1, BTItem item2, extern bool _bt_itemcmp(Relation rel, Size keysz, ScanKey scankey,
StrategyNumber strat); BTItem item1, BTItem item2, StrategyNumber strat);
/* /*
* prototypes for functions in nbtpage.c * prototypes for functions in nbtpage.c
...@@ -273,13 +273,12 @@ extern bool _bt_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir); ...@@ -273,13 +273,12 @@ extern bool _bt_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir);
*/ */
extern StrategyNumber _bt_getstrat(Relation rel, AttrNumber attno, extern StrategyNumber _bt_getstrat(Relation rel, AttrNumber attno,
RegProcedure proc); RegProcedure proc);
extern bool _bt_invokestrat(Relation rel, AttrNumber attno,
StrategyNumber strat, Datum left, Datum right);
/* /*
* prototypes for functions in nbtutils.c * prototypes for functions in nbtutils.c
*/ */
extern ScanKey _bt_mkscankey(Relation rel, IndexTuple itup); extern ScanKey _bt_mkscankey(Relation rel, IndexTuple itup);
extern ScanKey _bt_mkscankey_nodata(Relation rel);
extern void _bt_freeskey(ScanKey skey); extern void _bt_freeskey(ScanKey skey);
extern void _bt_freestack(BTStack stack); extern void _bt_freestack(BTStack stack);
extern void _bt_orderkeys(Relation relation, BTScanOpaque so); extern void _bt_orderkeys(Relation relation, BTScanOpaque so);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment