Commit e5adcb78 authored by Peter Geoghegan

Refactor nbtree insertion scankeys.

Use dedicated struct to represent nbtree insertion scan keys.  Having a
dedicated struct makes the difference between search type scankeys and
insertion scankeys a lot clearer, and simplifies the signature of
several related functions.  This is based on a suggestion by Andrey
Lepikhov.

Streamline how unique index insertions cache binary search progress.
Cache the state of in-progress binary searches within _bt_check_unique()
for later instead of having callers avoid repeating the binary search in
an ad-hoc manner.  This makes it easy to add a new optimization:
_bt_check_unique() now falls out of its loop immediately in the common
case where it's already clear that there couldn't possibly be a
duplicate.

The new _bt_check_unique() scheme makes it a lot easier to manage cached
binary search effort afterwards, from within _bt_findinsertloc().  This
is needed for the upcoming patch to make nbtree tuples unique by
treating heap TID as a final tiebreaker column.  Unique key binary
searches need to restore lower and upper bounds.  They cannot simply
continue to use the >= lower bound as the offset to insert at, because
the heap TID tiebreaker column must be used in comparisons for the
restored binary search (unlike the original _bt_check_unique() binary
search, where scankey's heap TID column must be omitted).

Author: Peter Geoghegan, Heikki Linnakangas
Reviewed-By: Heikki Linnakangas, Andrey Lepikhov
Discussion: https://postgr.es/m/CAH2-WzmE6AhUdk9NdWBf4K3HjWXZBX3+umC7mH7+WDrKcRtsOw@mail.gmail.com
parent 550b9d26
......@@ -127,9 +127,9 @@ static void bt_check_every_level(Relation rel, Relation heaprel,
static BtreeLevel bt_check_level_from_leftmost(BtreeCheckState *state,
BtreeLevel level);
static void bt_target_page_check(BtreeCheckState *state);
static ScanKey bt_right_page_check_scankey(BtreeCheckState *state);
static void bt_downlink_check(BtreeCheckState *state, BlockNumber childblock,
ScanKey targetkey);
static BTScanInsert bt_right_page_check_scankey(BtreeCheckState *state);
static void bt_downlink_check(BtreeCheckState *state, BTScanInsert targetkey,
BlockNumber childblock);
static void bt_downlink_missing_check(BtreeCheckState *state);
static void bt_tuple_present_callback(Relation index, HeapTuple htup,
Datum *values, bool *isnull,
......@@ -139,14 +139,14 @@ static IndexTuple bt_normalize_tuple(BtreeCheckState *state,
static inline bool offset_is_negative_infinity(BTPageOpaque opaque,
OffsetNumber offset);
static inline bool invariant_leq_offset(BtreeCheckState *state,
ScanKey key,
BTScanInsert key,
OffsetNumber upperbound);
static inline bool invariant_geq_offset(BtreeCheckState *state,
ScanKey key,
BTScanInsert key,
OffsetNumber lowerbound);
static inline bool invariant_leq_nontarget_offset(BtreeCheckState *state,
Page other,
ScanKey key,
BTScanInsert key,
Page nontarget,
OffsetNumber upperbound);
static Page palloc_btree_page(BtreeCheckState *state, BlockNumber blocknum);
......@@ -838,8 +838,8 @@ bt_target_page_check(BtreeCheckState *state)
{
ItemId itemid;
IndexTuple itup;
ScanKey skey;
size_t tupsize;
BTScanInsert skey;
CHECK_FOR_INTERRUPTS();
......@@ -1030,7 +1030,7 @@ bt_target_page_check(BtreeCheckState *state)
*/
else if (offset == max)
{
ScanKey rightkey;
BTScanInsert rightkey;
/* Get item in next/right page */
rightkey = bt_right_page_check_scankey(state);
......@@ -1082,7 +1082,7 @@ bt_target_page_check(BtreeCheckState *state)
{
BlockNumber childblock = BTreeInnerTupleGetDownLink(itup);
bt_downlink_check(state, childblock, skey);
bt_downlink_check(state, skey, childblock);
}
}
......@@ -1111,11 +1111,12 @@ bt_target_page_check(BtreeCheckState *state)
* Note that !readonly callers must reverify that target page has not
* been concurrently deleted.
*/
static ScanKey
static BTScanInsert
bt_right_page_check_scankey(BtreeCheckState *state)
{
BTPageOpaque opaque;
ItemId rightitem;
IndexTuple firstitup;
BlockNumber targetnext;
Page rightpage;
OffsetNumber nline;
......@@ -1303,8 +1304,8 @@ bt_right_page_check_scankey(BtreeCheckState *state)
* Return first real item scankey. Note that this relies on right page
* memory remaining allocated.
*/
return _bt_mkscankey(state->rel,
(IndexTuple) PageGetItem(rightpage, rightitem));
firstitup = (IndexTuple) PageGetItem(rightpage, rightitem);
return _bt_mkscankey(state->rel, firstitup);
}
/*
......@@ -1317,8 +1318,8 @@ bt_right_page_check_scankey(BtreeCheckState *state)
* verification this way around is much more practical.
*/
static void
bt_downlink_check(BtreeCheckState *state, BlockNumber childblock,
ScanKey targetkey)
bt_downlink_check(BtreeCheckState *state, BTScanInsert targetkey,
BlockNumber childblock)
{
OffsetNumber offset;
OffsetNumber maxoffset;
......@@ -1423,8 +1424,7 @@ bt_downlink_check(BtreeCheckState *state, BlockNumber childblock,
if (offset_is_negative_infinity(copaque, offset))
continue;
if (!invariant_leq_nontarget_offset(state, child,
targetkey, offset))
if (!invariant_leq_nontarget_offset(state, targetkey, child, offset))
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("down-link lower bound invariant violated for index \"%s\"",
......@@ -1864,13 +1864,12 @@ offset_is_negative_infinity(BTPageOpaque opaque, OffsetNumber offset)
* to corruption.
*/
static inline bool
invariant_leq_offset(BtreeCheckState *state, ScanKey key,
invariant_leq_offset(BtreeCheckState *state, BTScanInsert key,
OffsetNumber upperbound)
{
int16 nkeyatts = IndexRelationGetNumberOfKeyAttributes(state->rel);
int32 cmp;
cmp = _bt_compare(state->rel, nkeyatts, key, state->target, upperbound);
cmp = _bt_compare(state->rel, key, state->target, upperbound);
return cmp <= 0;
}
......@@ -1883,13 +1882,12 @@ invariant_leq_offset(BtreeCheckState *state, ScanKey key,
* to corruption.
*/
static inline bool
invariant_geq_offset(BtreeCheckState *state, ScanKey key,
invariant_geq_offset(BtreeCheckState *state, BTScanInsert key,
OffsetNumber lowerbound)
{
int16 nkeyatts = IndexRelationGetNumberOfKeyAttributes(state->rel);
int32 cmp;
cmp = _bt_compare(state->rel, nkeyatts, key, state->target, lowerbound);
cmp = _bt_compare(state->rel, key, state->target, lowerbound);
return cmp >= 0;
}
......@@ -1905,14 +1903,12 @@ invariant_geq_offset(BtreeCheckState *state, ScanKey key,
* to corruption.
*/
static inline bool
invariant_leq_nontarget_offset(BtreeCheckState *state,
Page nontarget, ScanKey key,
OffsetNumber upperbound)
invariant_leq_nontarget_offset(BtreeCheckState *state, BTScanInsert key,
Page nontarget, OffsetNumber upperbound)
{
int16 nkeyatts = IndexRelationGetNumberOfKeyAttributes(state->rel);
int32 cmp;
cmp = _bt_compare(state->rel, nkeyatts, key, nontarget, upperbound);
cmp = _bt_compare(state->rel, key, nontarget, upperbound);
return cmp <= 0;
}
......
......@@ -598,19 +598,22 @@ scankey point to comparison functions that return boolean, such as int4lt.
There might be more than one scankey entry for a given index column, or
none at all. (We require the keys to appear in index column order, but
the order of multiple keys for a given column is unspecified.) An
insertion scankey uses the same array-of-ScanKey data structure, but the
sk_func pointers point to btree comparison support functions (ie, 3-way
comparators that return int4 values interpreted as <0, =0, >0). In an
insertion scankey there is exactly one entry per index column. Insertion
scankeys are built within the btree code (eg, by _bt_mkscankey()) and are
used to locate the starting point of a scan, as well as for locating the
place to insert a new index tuple. (Note: in the case of an insertion
scankey built from a search scankey, there might be fewer keys than
index columns, indicating that we have no constraints for the remaining
index columns.) After we have located the starting point of a scan, the
original search scankey is consulted as each index entry is sequentially
scanned to decide whether to return the entry and whether the scan can
stop (see _bt_checkkeys()).
insertion scankey ("BTScanInsert" data structure) uses a similar
array-of-ScanKey data structure, but the sk_func pointers point to btree
comparison support functions (ie, 3-way comparators that return int4 values
interpreted as <0, =0, >0). In an insertion scankey there is at most one
entry per index column. There is also other data about the rules used to
locate where to begin the scan, such as whether or not the scan is a
"nextkey" scan. Insertion scankeys are built within the btree code (eg, by
_bt_mkscankey()) and are used to locate the starting point of a scan, as
well as for locating the place to insert a new index tuple. (Note: in the
case of an insertion scankey built from a search scankey or built from a
truncated pivot tuple, there might be fewer keys than index columns,
indicating that we have no constraints for the remaining index columns.)
After we have located the starting point of a scan, the original search
scankey is consulted as each index entry is sequentially scanned to decide
whether to return the entry and whether the scan can stop (see
_bt_checkkeys()).
We use the term "pivot" index tuples to distinguish tuples which don't point
to heap tuples, but rather are used for tree navigation. Pivot tuples include
......
......@@ -51,19 +51,16 @@ typedef struct
static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
Relation heapRel, Buffer buf, OffsetNumber offset,
ScanKey itup_scankey,
static TransactionId _bt_check_unique(Relation rel, BTInsertState insertstate,
Relation heapRel,
IndexUniqueCheck checkUnique, bool *is_unique,
uint32 *speculativeToken);
static void _bt_findinsertloc(Relation rel,
Buffer *bufptr,
OffsetNumber *offsetptr,
int keysz,
ScanKey scankey,
IndexTuple newtup,
static OffsetNumber _bt_findinsertloc(Relation rel,
BTInsertState insertstate,
bool checkingunique,
BTStack stack,
Relation heapRel);
static void _bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack);
static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
BTStack stack,
IndexTuple itup,
......@@ -83,8 +80,8 @@ static void _bt_checksplitloc(FindSplitData *state,
int dataitemstoleft, Size firstoldonrightsz);
static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static bool _bt_isequal(TupleDesc itupdesc, BTScanInsert itup_key,
Page page, OffsetNumber offnum);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
/*
......@@ -110,18 +107,26 @@ _bt_doinsert(Relation rel, IndexTuple itup,
IndexUniqueCheck checkUnique, Relation heapRel)
{
bool is_unique = false;
int indnkeyatts;
ScanKey itup_scankey;
BTInsertStateData insertstate;
BTScanInsert itup_key;
BTStack stack = NULL;
Buffer buf;
OffsetNumber offset;
bool fastpath;
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
Assert(indnkeyatts != 0);
bool checkingunique = (checkUnique != UNIQUE_CHECK_NO);
/* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
itup_key = _bt_mkscankey(rel, itup);
/*
* Fill in the BTInsertState working area, to track the current page and
* position within the page to insert on
*/
insertstate.itup = itup;
/* PageAddItem will MAXALIGN(), but be consistent */
insertstate.itemsz = MAXALIGN(IndexTupleSize(itup));
insertstate.itup_key = itup_key;
insertstate.bounds_valid = false;
insertstate.buf = InvalidBuffer;
/*
* It's very common to have an index on an auto-incremented or
......@@ -144,10 +149,8 @@ _bt_doinsert(Relation rel, IndexTuple itup,
*/
top:
fastpath = false;
offset = InvalidOffsetNumber;
if (RelationGetTargetBlock(rel) != InvalidBlockNumber)
{
Size itemsz;
Page page;
BTPageOpaque lpageop;
......@@ -166,9 +169,6 @@ top:
page = BufferGetPage(buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
itemsz = IndexTupleSize(itup);
itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this
* but we need to be consistent */
/*
* Check if the page is still the rightmost leaf page, has enough
......@@ -177,10 +177,9 @@ top:
*/
if (P_ISLEAF(lpageop) && P_RIGHTMOST(lpageop) &&
!P_IGNORE(lpageop) &&
(PageGetFreeSpace(page) > itemsz) &&
(PageGetFreeSpace(page) > insertstate.itemsz) &&
PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
_bt_compare(rel, indnkeyatts, itup_scankey, page,
P_FIRSTDATAKEY(lpageop)) > 0)
_bt_compare(rel, itup_key, page, P_FIRSTDATAKEY(lpageop)) > 0)
{
/*
* The right-most block should never have an incomplete split.
......@@ -219,10 +218,12 @@ top:
* Find the first page containing this key. Buffer returned by
* _bt_search() is locked in exclusive mode.
*/
stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE,
NULL);
stack = _bt_search(rel, itup_key, &buf, BT_WRITE, NULL);
}
insertstate.buf = buf;
buf = InvalidBuffer; /* insertstate.buf now owns the buffer */
/*
* If we're not allowing duplicates, make sure the key isn't already in
* the index.
......@@ -244,19 +245,19 @@ top:
* let the tuple in and return false for possibly non-unique, or true for
* definitely unique.
*/
if (checkUnique != UNIQUE_CHECK_NO)
if (checkingunique)
{
TransactionId xwait;
uint32 speculativeToken;
offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false);
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
checkUnique, &is_unique, &speculativeToken);
xwait = _bt_check_unique(rel, &insertstate, heapRel, checkUnique,
&is_unique, &speculativeToken);
if (TransactionIdIsValid(xwait))
{
/* Have to wait for the other guy ... */
_bt_relbuf(rel, buf);
_bt_relbuf(rel, insertstate.buf);
insertstate.buf = InvalidBuffer;
/*
* If it's a speculative insertion, wait for it to finish (ie. to
......@@ -277,6 +278,8 @@ top:
if (checkUnique != UNIQUE_CHECK_EXISTING)
{
OffsetNumber newitemoff;
/*
* The only conflict predicate locking cares about for indexes is when
* an index tuple insert conflicts with an existing lock. Since the
......@@ -286,22 +289,28 @@ top:
* This reasoning also applies to INCLUDE indexes, whose extra
* attributes are not considered part of the key space.
*/
CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
_bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup,
stack, heapRel);
_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
CheckForSerializableConflictIn(rel, NULL, insertstate.buf);
/*
* Do the insertion. Note that insertstate contains cached binary
* search bounds established within _bt_check_unique when insertion is
* checkingunique.
*/
newitemoff = _bt_findinsertloc(rel, &insertstate, checkingunique,
stack, heapRel);
_bt_insertonpg(rel, insertstate.buf, InvalidBuffer, stack, itup,
newitemoff, false);
}
else
{
/* just release the buffer */
_bt_relbuf(rel, buf);
_bt_relbuf(rel, insertstate.buf);
}
/* be tidy */
if (stack)
_bt_freestack(stack);
_bt_freeskey(itup_scankey);
pfree(itup_key);
return is_unique;
}
......@@ -309,10 +318,6 @@ top:
/*
* _bt_check_unique() -- Check for violation of unique index constraint
*
* offset points to the first possible item that could conflict. It can
* also point to end-of-page, which means that the first tuple to check
* is the first tuple on the next page.
*
* Returns InvalidTransactionId if there is no conflict, else an xact ID
* we must wait for to see if it commits a conflicting tuple. If an actual
* conflict is detected, no return --- just ereport(). If an xact ID is
......@@ -324,16 +329,21 @@ top:
* InvalidTransactionId because we don't want to wait. In this case we
* set *is_unique to false if there is a potential conflict, and the
* core code must redo the uniqueness check later.
*
* As a side-effect, sets state in insertstate that can later be used by
* _bt_findinsertloc() to reuse most of the binary search work we do
* here.
*/
static TransactionId
_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
_bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
IndexUniqueCheck checkUnique, bool *is_unique,
uint32 *speculativeToken)
{
TupleDesc itupdesc = RelationGetDescr(rel);
int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
IndexTuple itup = insertstate->itup;
BTScanInsert itup_key = insertstate->itup_key;
SnapshotData SnapshotDirty;
OffsetNumber offset;
OffsetNumber maxoff;
Page page;
BTPageOpaque opaque;
......@@ -345,13 +355,22 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
InitDirtySnapshot(SnapshotDirty);
page = BufferGetPage(buf);
page = BufferGetPage(insertstate->buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
/*
* Find the first tuple with the same key.
*
* This also saves the binary search bounds in insertstate. We use them
* in the fastpath below, but also in the _bt_findinsertloc() call later.
*/
offset = _bt_binsrch_insert(rel, insertstate);
/*
* Scan over all equal tuples, looking for live conflicts.
*/
Assert(!insertstate->bounds_valid || insertstate->low == offset);
for (;;)
{
ItemId curitemid;
......@@ -364,21 +383,40 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
*/
if (offset <= maxoff)
{
/*
* Fastpath: In most cases, we can use cached search bounds to
* limit our consideration to items that are definitely
* duplicates. This fastpath doesn't apply when the original page
* is empty, or when initial offset is past the end of the
* original page, which may indicate that we need to examine a
* second or subsequent page.
*
* Note that this optimization avoids calling _bt_isequal()
* entirely when there are no duplicates, as long as the offset
* where the key will go is not at the end of the page.
*/
if (nbuf == InvalidBuffer && offset == insertstate->stricthigh)
{
Assert(insertstate->bounds_valid);
Assert(insertstate->low >= P_FIRSTDATAKEY(opaque));
Assert(insertstate->low <= insertstate->stricthigh);
Assert(!_bt_isequal(itupdesc, itup_key, page, offset));
break;
}
curitemid = PageGetItemId(page, offset);
/*
* We can skip items that are marked killed.
*
* Formerly, we applied _bt_isequal() before checking the kill
* flag, so as to fall out of the item loop as soon as possible.
* However, in the presence of heavy update activity an index may
* contain many killed items with the same key; running
* _bt_isequal() on each killed item gets expensive. Furthermore
* it is likely that the non-killed version of each key appears
* first, so that we didn't actually get to exit any sooner
* anyway. So now we just advance over killed items as quickly as
* we can. We only apply _bt_isequal() when we get to a non-killed
* item or the end of the page.
* In the presence of heavy update activity an index may contain
* many killed items with the same key; running _bt_isequal() on
* each killed item gets expensive. Just advance over killed
* items as quickly as we can. We only apply _bt_isequal() when
* we get to a non-killed item. Even those comparisons could be
* avoided (in the common case where there is only one page to
* visit) by reusing bounds, but just skipping dead items is fast
* enough.
*/
if (!ItemIdIsDead(curitemid))
{
......@@ -391,7 +429,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
* in real comparison, but only for ordering/finding items on
* pages. - vadim 03/24/97
*/
if (!_bt_isequal(itupdesc, page, offset, indnkeyatts, itup_scankey))
if (!_bt_isequal(itupdesc, itup_key, page, offset))
break; /* we're past all the equal tuples */
/* okay, we gotta fetch the heap tuple ... */
......@@ -488,7 +526,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
* otherwise be masked by this unique constraint
* violation.
*/
CheckForSerializableConflictIn(rel, NULL, buf);
CheckForSerializableConflictIn(rel, NULL, insertstate->buf);
/*
* This is a definite conflict. Break the tuple down into
......@@ -500,7 +538,8 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
*/
if (nbuf != InvalidBuffer)
_bt_relbuf(rel, nbuf);
_bt_relbuf(rel, buf);
_bt_relbuf(rel, insertstate->buf);
insertstate->buf = InvalidBuffer;
{
Datum values[INDEX_MAX_KEYS];
......@@ -540,7 +579,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
if (nbuf != InvalidBuffer)
MarkBufferDirtyHint(nbuf, true);
else
MarkBufferDirtyHint(buf, true);
MarkBufferDirtyHint(insertstate->buf, true);
}
}
}
......@@ -552,11 +591,14 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
offset = OffsetNumberNext(offset);
else
{
int highkeycmp;
/* If scankey == hikey we gotta check the next page too */
if (P_RIGHTMOST(opaque))
break;
if (!_bt_isequal(itupdesc, page, P_HIKEY,
indnkeyatts, itup_scankey))
highkeycmp = _bt_compare(rel, itup_key, page, P_HIKEY);
Assert(highkeycmp <= 0);
if (highkeycmp != 0)
break;
/* Advance to next non-dead page --- there must be one */
for (;;)
......@@ -600,57 +642,41 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
/*
* _bt_findinsertloc() -- Finds an insert location for a tuple
*
* On entry, insertstate buffer contains the first legal page the new
* tuple could be inserted to. It is exclusive-locked and pinned by the
* caller.
*
* If the new key is equal to one or more existing keys, we can
* legitimately place it anywhere in the series of equal keys --- in fact,
* if the new key is equal to the page's "high key" we can place it on
* the next page. If it is equal to the high key, and there's not room
* to insert the new tuple on the current page without splitting, then
* we can move right hoping to find more free space and avoid a split.
* (We should not move right indefinitely, however, since that leads to
* O(N^2) insertion behavior in the presence of many equal keys.)
* Once we have chosen the page to put the key on, we'll insert it before
* any existing equal keys because of the way _bt_binsrch() works.
*
* If there's not enough room in the space, we try to make room by
* removing any LP_DEAD tuples.
* Furthermore, if there's not enough room on a page, we try to make
* room by removing any LP_DEAD tuples.
*
* On entry, *bufptr and *offsetptr point to the first legal position
* where the new tuple could be inserted. The caller should hold an
* exclusive lock on *bufptr. *offsetptr can also be set to
* InvalidOffsetNumber, in which case the function will search for the
* right location within the page if needed. On exit, they point to the
* chosen insert location. If _bt_findinsertloc decides to move right,
* the lock and pin on the original page will be released and the new
* page returned to the caller is exclusively locked instead.
* On exit, insertstate buffer contains the chosen insertion page, and
* the offset within that page is returned. If _bt_findinsertloc needed
* to move right, the lock and pin on the original page are released, and
* the new buffer is exclusively locked and pinned instead.
*
* newtup is the new tuple we're inserting, and scankey is an insertion
* type scan key for it.
* If insertstate contains cached binary search bounds, we will take
* advantage of them. This avoids repeating comparisons that we made in
* _bt_check_unique() already.
*/
static void
static OffsetNumber
_bt_findinsertloc(Relation rel,
Buffer *bufptr,
OffsetNumber *offsetptr,
int keysz,
ScanKey scankey,
IndexTuple newtup,
BTInsertState insertstate,
bool checkingunique,
BTStack stack,
Relation heapRel)
{
Buffer buf = *bufptr;
Page page = BufferGetPage(buf);
Size itemsz;
BTScanInsert itup_key = insertstate->itup_key;
Page page = BufferGetPage(insertstate->buf);
BTPageOpaque lpageop;
bool movedright,
vacuumed;
OffsetNumber newitemoff;
OffsetNumber firstlegaloff = *offsetptr;
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
itemsz = IndexTupleSize(newtup);
itemsz = MAXALIGN(itemsz); /* be safe, PageAddItem will do this but we
* need to be consistent */
/*
* Check whether the item can fit on a btree page at all. (Eventually, we
* ought to try to apply TOAST methods if not.) We actually need to be
......@@ -660,11 +686,11 @@ _bt_findinsertloc(Relation rel,
*
* NOTE: if you change this, see also the similar code in _bt_buildadd().
*/
if (itemsz > BTMaxItemSize(page))
if (insertstate->itemsz > BTMaxItemSize(page))
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
itemsz, BTMaxItemSize(page),
insertstate->itemsz, BTMaxItemSize(page),
RelationGetRelationName(rel)),
errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
"Consider a function index of an MD5 hash of the value, "
......@@ -690,100 +716,113 @@ _bt_findinsertloc(Relation rel,
* excellent job of preventing O(N^2) behavior with many equal keys.
*----------
*/
movedright = false;
vacuumed = false;
while (PageGetFreeSpace(page) < itemsz)
{
Buffer rbuf;
BlockNumber rblkno;
Assert(P_ISLEAF(lpageop) && !P_INCOMPLETE_SPLIT(lpageop));
Assert(!insertstate->bounds_valid || checkingunique);
while (PageGetFreeSpace(page) < insertstate->itemsz)
{
/*
* before considering moving right, see if we can obtain enough space
* by erasing LP_DEAD items
*/
if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
if (P_HAS_GARBAGE(lpageop))
{
_bt_vacuum_one_page(rel, buf, heapRel);
_bt_vacuum_one_page(rel, insertstate->buf, heapRel);
insertstate->bounds_valid = false;
/*
* remember that we vacuumed this page, because that makes the
* hint supplied by the caller invalid
*/
vacuumed = true;
if (PageGetFreeSpace(page) >= itemsz)
if (PageGetFreeSpace(page) >= insertstate->itemsz)
break; /* OK, now we have enough space */
}
/*
* nope, so check conditions (b) and (c) enumerated above
* Nope, so check conditions (b) and (c) enumerated above
*
* The earlier _bt_check_unique() call may well have established a
* strict upper bound on the offset for the new item. If it's not the
* last item of the page (i.e. if there is at least one tuple on the
* page that's greater than the tuple we're inserting to) then we know
* that the tuple belongs on this page. We can skip the high key
* check.
*/
if (insertstate->bounds_valid &&
insertstate->low <= insertstate->stricthigh &&
insertstate->stricthigh <= PageGetMaxOffsetNumber(page))
break;
if (P_RIGHTMOST(lpageop) ||
_bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
_bt_compare(rel, itup_key, page, P_HIKEY) != 0 ||
random() <= (MAX_RANDOM_VALUE / 100))
break;
/*
* step right to next non-dead page
*
* must write-lock that page before releasing write lock on current
* page; else someone else's _bt_check_unique scan could fail to see
* our insertion. write locks on intermediate dead pages won't do
* because we don't know when they will get de-linked from the tree.
*/
rbuf = InvalidBuffer;
_bt_stepright(rel, insertstate, stack);
/* Update local state after stepping right */
page = BufferGetPage(insertstate->buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
}
rblkno = lpageop->btpo_next;
for (;;)
{
rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
page = BufferGetPage(rbuf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
/*
* We should now be on the correct page. Find the offset within the page
* for the new tuple. (Possibly reusing earlier search bounds.)
*/
Assert(P_RIGHTMOST(lpageop) ||
_bt_compare(rel, itup_key, page, P_HIKEY) <= 0);
/*
* If this page was incompletely split, finish the split now. We
* do this while holding a lock on the left sibling, which is not
* good because finishing the split could be a fairly lengthy
* operation. But this should happen very seldom.
*/
if (P_INCOMPLETE_SPLIT(lpageop))
{
_bt_finish_split(rel, rbuf, stack);
rbuf = InvalidBuffer;
continue;
}
return _bt_binsrch_insert(rel, insertstate);
}
if (!P_IGNORE(lpageop))
break;
if (P_RIGHTMOST(lpageop))
elog(ERROR, "fell off the end of index \"%s\"",
RelationGetRelationName(rel));
/*
* Step right to next non-dead page, during insertion.
*
* This is a bit more complicated than moving right in a search. We must
* write-lock the target page before releasing write lock on current page;
* else someone else's _bt_check_unique scan could fail to see our insertion.
* Write locks on intermediate dead pages won't do because we don't know when
* they will get de-linked from the tree.
*/
static void
_bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack)
{
Page page;
BTPageOpaque lpageop;
Buffer rbuf;
BlockNumber rblkno;
page = BufferGetPage(insertstate->buf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
rbuf = InvalidBuffer;
rblkno = lpageop->btpo_next;
for (;;)
{
rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
page = BufferGetPage(rbuf);
lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
rblkno = lpageop->btpo_next;
/*
* If this page was incompletely split, finish the split now. We do
* this while holding a lock on the left sibling, which is not good
* because finishing the split could be a fairly lengthy operation.
* But this should happen very seldom.
*/
if (P_INCOMPLETE_SPLIT(lpageop))
{
_bt_finish_split(rel, rbuf, stack);
rbuf = InvalidBuffer;
continue;
}
_bt_relbuf(rel, buf);
buf = rbuf;
movedright = true;
vacuumed = false;
}
/*
* Now we are on the right page, so find the insert position. If we moved
* right at all, we know we should insert at the start of the page. If we
* didn't move right, we can use the firstlegaloff hint if the caller
* supplied one, unless we vacuumed the page which might have moved tuples
* around making the hint invalid. If we didn't move right or can't use
* the hint, find the position by searching.
*/
if (movedright)
newitemoff = P_FIRSTDATAKEY(lpageop);
else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
newitemoff = firstlegaloff;
else
newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
if (!P_IGNORE(lpageop))
break;
if (P_RIGHTMOST(lpageop))
elog(ERROR, "fell off the end of index \"%s\"",
RelationGetRelationName(rel));
*bufptr = buf;
*offsetptr = newitemoff;
rblkno = lpageop->btpo_next;
}
/* rbuf locked; unlock buf, update state for caller */
_bt_relbuf(rel, insertstate->buf);
insertstate->buf = rbuf;
insertstate->bounds_valid = false;
}
/*----------
......@@ -2312,24 +2351,21 @@ _bt_pgaddtup(Page page,
* Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
*/
static bool
_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey)
_bt_isequal(TupleDesc itupdesc, BTScanInsert itup_key, Page page,
OffsetNumber offnum)
{
IndexTuple itup;
ScanKey scankey;
int i;
/* Better be comparing to a leaf item */
/* Better be comparing to a non-pivot item */
Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
Assert(offnum >= P_FIRSTDATAKEY((BTPageOpaque) PageGetSpecialPointer(page)));
scankey = itup_key->scankeys;
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
/*
* It's okay that we might perform a comparison against a truncated page
* high key when caller needs to determine if _bt_check_unique scan must
* continue on to the next page. Caller never asks us to compare non-key
* attributes within an INCLUDE index.
*/
for (i = 1; i <= keysz; i++)
for (i = 1; i <= itup_key->keysz; i++)
{
AttrNumber attno;
Datum datum;
......@@ -2377,6 +2413,8 @@ _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
Page page = BufferGetPage(buffer);
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(P_ISLEAF(opaque));
/*
* Scan over all items to see which ones need to be deleted according to
* LP_DEAD flags.
......
......@@ -1371,7 +1371,7 @@ _bt_pagedel(Relation rel, Buffer buf)
*/
if (!stack)
{
ScanKey itup_scankey;
BTScanInsert itup_key;
ItemId itemid;
IndexTuple targetkey;
Buffer lbuf;
......@@ -1421,12 +1421,10 @@ _bt_pagedel(Relation rel, Buffer buf)
}
/* we need an insertion scan key for the search, so build one */
itup_scankey = _bt_mkscankey(rel, targetkey);
/* find the leftmost leaf page containing this key */
stack = _bt_search(rel,
IndexRelationGetNumberOfKeyAttributes(rel),
itup_scankey, false, &lbuf, BT_READ, NULL);
/* don't need a pin on the page */
itup_key = _bt_mkscankey(rel, targetkey);
/* get stack to leaf page by searching index */
stack = _bt_search(rel, itup_key, &lbuf, BT_READ, NULL);
/* don't need a lock or second pin on the page */
_bt_relbuf(rel, lbuf);
/*
......
......@@ -25,6 +25,7 @@
static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
static OffsetNumber _bt_binsrch(Relation rel, BTScanInsert key, Buffer buf);
static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
OffsetNumber offnum);
static void _bt_saveitem(BTScanOpaque so, int itemIndex,
......@@ -70,13 +71,9 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
* _bt_search() -- Search the tree for a particular scankey,
* or more precisely for the first leaf page it could be on.
*
* The passed scankey must be an insertion-type scankey (see nbtree/README),
* The passed scankey is an insertion-type scankey (see nbtree/README),
* but it can omit the rightmost column(s) of the index.
*
* When nextkey is false (the usual case), we are looking for the first
* item >= scankey. When nextkey is true, we are looking for the first
* item strictly greater than scankey.
*
* Return value is a stack of parent-page pointers. *bufP is set to the
* address of the leaf-page buffer, which is read-locked and pinned.
* No locks are held on the parent pages, however!
......@@ -92,8 +89,8 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
* during the search will be finished.
*/
BTStack
_bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
Buffer *bufP, int access, Snapshot snapshot)
_bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access,
Snapshot snapshot)
{
BTStack stack_in = NULL;
int page_access = BT_READ;
......@@ -129,8 +126,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
* if the leaf page is split and we insert to the parent page). But
* this is a good opportunity to finish splits of internal pages too.
*/
*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
(access == BT_WRITE), stack_in,
*bufP = _bt_moveright(rel, key, *bufP, (access == BT_WRITE), stack_in,
page_access, snapshot);
/* if this is a leaf page, we're done */
......@@ -143,7 +139,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
* Find the appropriate item on the internal page, and get the child
* page that it points to.
*/
offnum = _bt_binsrch(rel, *bufP, keysz, scankey, nextkey);
offnum = _bt_binsrch(rel, key, *bufP);
itemid = PageGetItemId(page, offnum);
itup = (IndexTuple) PageGetItem(page, itemid);
blkno = BTreeInnerTupleGetDownLink(itup);
......@@ -197,8 +193,8 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
* need to move right in the tree. See Lehman and Yao for an
* excruciatingly precise description.
*/
*bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
true, stack_in, BT_WRITE, snapshot);
*bufP = _bt_moveright(rel, key, *bufP, true, stack_in, BT_WRITE,
snapshot);
}
return stack_in;
......@@ -214,16 +210,17 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
* or strictly to the right of it.
*
* This routine decides whether or not we need to move right in the
* tree by examining the high key entry on the page. If that entry
* is strictly less than the scankey, or <= the scankey in the nextkey=true
* case, then we followed the wrong link and we need to move right.
* tree by examining the high key entry on the page. If that entry is
* strictly less than the scankey, or <= the scankey in the
* key.nextkey=true case, then we followed the wrong link and we need
* to move right.
*
* The passed scankey must be an insertion-type scankey (see nbtree/README),
* but it can omit the rightmost column(s) of the index.
* The passed insertion-type scankey can omit the rightmost column(s) of the
* index. (see nbtree/README)
*
* When nextkey is false (the usual case), we are looking for the first
* item >= scankey. When nextkey is true, we are looking for the first
* item strictly greater than scankey.
* When key.nextkey is false (the usual case), we are looking for the first
* item >= key. When key.nextkey is true, we are looking for the first item
* strictly greater than key.
*
* If forupdate is true, we will attempt to finish any incomplete splits
* that we encounter. This is required when locking a target page for an
......@@ -240,10 +237,8 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
*/
Buffer
_bt_moveright(Relation rel,
BTScanInsert key,
Buffer buf,
int keysz,
ScanKey scankey,
bool nextkey,
bool forupdate,
BTStack stack,
int access,
......@@ -268,7 +263,7 @@ _bt_moveright(Relation rel,
* We also have to move right if we followed a link that brought us to a
* dead page.
*/
cmpval = nextkey ? 0 : 1;
cmpval = key->nextkey ? 0 : 1;
for (;;)
{
......@@ -303,7 +298,7 @@ _bt_moveright(Relation rel,
continue;
}
if (P_IGNORE(opaque) || _bt_compare(rel, keysz, scankey, page, P_HIKEY) >= cmpval)
if (P_IGNORE(opaque) || _bt_compare(rel, key, page, P_HIKEY) >= cmpval)
{
/* step right one page */
buf = _bt_relandgetbuf(rel, buf, opaque->btpo_next, access);
......@@ -323,13 +318,6 @@ _bt_moveright(Relation rel,
/*
* _bt_binsrch() -- Do a binary search for a key on a particular page.
*
* The passed scankey must be an insertion-type scankey (see nbtree/README),
* but it can omit the rightmost column(s) of the index.
*
* When nextkey is false (the usual case), we are looking for the first
* item >= scankey. When nextkey is true, we are looking for the first
* item strictly greater than scankey.
*
* On a leaf page, _bt_binsrch() returns the OffsetNumber of the first
* key >= given scankey, or > scankey if nextkey is true. (NOTE: in
* particular, this means it is possible to return a value 1 greater than the
......@@ -347,12 +335,10 @@ _bt_moveright(Relation rel,
* the given page. _bt_binsrch() has no lock or refcount side effects
* on the buffer.
*/
OffsetNumber
static OffsetNumber
_bt_binsrch(Relation rel,
Buffer buf,
int keysz,
ScanKey scankey,
bool nextkey)
BTScanInsert key,
Buffer buf)
{
Page page;
BTPageOpaque opaque;
......@@ -374,7 +360,7 @@ _bt_binsrch(Relation rel,
* This can never happen on an internal page, however, since they are
* never empty (an internal page must have children).
*/
if (high < low)
if (unlikely(high < low))
return low;
/*
......@@ -391,7 +377,7 @@ _bt_binsrch(Relation rel,
*/
high++; /* establish the loop invariant for high */
cmpval = nextkey ? 0 : 1; /* select comparison value */
cmpval = key->nextkey ? 0 : 1; /* select comparison value */
while (high > low)
{
......@@ -399,7 +385,7 @@ _bt_binsrch(Relation rel,
/* We have low <= mid < high, so mid points at a real slot */
result = _bt_compare(rel, keysz, scankey, page, mid);
result = _bt_compare(rel, key, page, mid);
if (result >= cmpval)
low = mid + 1;
......@@ -426,14 +412,120 @@ _bt_binsrch(Relation rel,
return OffsetNumberPrev(low);
}
/*----------
* _bt_compare() -- Compare scankey to a particular tuple on the page.
/*
*
* The passed scankey must be an insertion-type scankey (see nbtree/README),
* but it can omit the rightmost column(s) of the index.
* _bt_binsrch_insert() -- Cacheable, incremental leaf page binary search.
*
* Like _bt_binsrch(), but with support for caching the binary search
* bounds. Only used during insertion, and only on the leaf page that it
* looks like caller will insert tuple on. Exclusive-locked and pinned
* leaf page is contained within insertstate.
*
* Caches the bounds fields in insertstate so that a subsequent call can
* reuse the low and strict high bounds of original binary search. Callers
* that use these fields directly must be prepared for the case where low
* and/or stricthigh are not on the same page (one or both exceed maxoff
* for the page). The case where there are no items on the page (high <
* low) makes bounds invalid.
*
* Caller is responsible for invalidating bounds when it modifies the page
* before calling here a second time.
*/
OffsetNumber
_bt_binsrch_insert(Relation rel, BTInsertState insertstate)
{
	BTScanInsert key = insertstate->itup_key;
	Page		page = BufferGetPage(insertstate->buf);
	BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
	OffsetNumber lo,
				hi,
				strict_hi;

	/* Insertions only ever target leaf pages with a !nextkey scankey */
	Assert(P_ISLEAF(opaque));
	Assert(!key->nextkey);

	if (insertstate->bounds_valid)
	{
		/* Pick up where an earlier search of this same page left off */
		lo = insertstate->low;
		hi = insertstate->stricthigh;
	}
	else
	{
		/* No cached bounds: search the whole range of data items */
		lo = P_FIRSTDATAKEY(opaque);
		hi = PageGetMaxOffsetNumber(page);
	}

	/*
	 * Nothing to search: hand back the first usable slot and make sure
	 * nobody tries to reuse bounds from this call.
	 */
	if (unlikely(hi < lo))
	{
		insertstate->low = InvalidOffsetNumber;
		insertstate->stricthigh = InvalidOffsetNumber;
		insertstate->bounds_valid = false;
		return lo;
	}

	/*
	 * Loop invariant: every slot before 'lo' is < scan key, every slot at or
	 * after 'hi' is >= scan key.  A fresh search starts with 'hi' on the last
	 * real item, so bump it one past the end to establish the invariant; a
	 * cached stricthigh already satisfies it.
	 *
	 * 'strict_hi' tracks a position known to be strictly > scan key, which
	 * saves the caller search effort later on.
	 */
	if (!insertstate->bounds_valid)
		hi++;
	strict_hi = hi;

	while (hi > lo)
	{
		/* lo <= mid < hi, so mid always refers to a real slot */
		OffsetNumber mid = lo + ((hi - lo) / 2);
		int32		cmp = _bt_compare(rel, key, page, mid);

		if (cmp > 0)
		{
			/* scan key sorts after item at mid: answer is to the right */
			lo = mid + 1;
			continue;
		}

		/* item at mid is >= scan key: tighten the upper bound */
		hi = mid;
		if (cmp < 0)
			strict_hi = hi;		/* ... and it is strictly greater */
	}

	/*
	 * 'lo' is now the first slot whose item is >= scan key, possibly the
	 * last slot + 1.  'strict_hi' may likewise be the last slot + 1; that is
	 * still a usable upper bound if we are called again with cached bounds.
	 */
	insertstate->low = lo;
	insertstate->stricthigh = strict_hi;
	insertstate->bounds_valid = true;

	return lo;
}
/*----------
* _bt_compare() -- Compare insertion-type scankey to tuple on a page.
*
* keysz: number of key conditions to be checked (might be less than the
* number of index columns!)
* page/offnum: location of btree item to be compared to.
*
* This routine returns:
......@@ -446,25 +538,26 @@ _bt_binsrch(Relation rel,
*
* CRUCIAL NOTE: on a non-leaf page, the first data key is assumed to be
* "minus infinity": this routine will always claim it is less than the
* scankey. The actual key value stored (if any, which there probably isn't)
* does not matter. This convention allows us to implement the Lehman and
* Yao convention that the first down-link pointer is before the first key.
* See backend/access/nbtree/README for details.
* scankey. The actual key value stored is explicitly truncated to 0
* attributes (explicitly minus infinity) with version 3+ indexes, but
* that isn't relied upon. This allows us to implement the Lehman and
* Yao convention that the first down-link pointer is before the first
* key. See backend/access/nbtree/README for details.
*----------
*/
int32
_bt_compare(Relation rel,
int keysz,
ScanKey scankey,
BTScanInsert key,
Page page,
OffsetNumber offnum)
{
TupleDesc itupdesc = RelationGetDescr(rel);
BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
IndexTuple itup;
int i;
ScanKey scankey;
Assert(_bt_check_natts(rel, page, offnum));
Assert(key->keysz <= IndexRelationGetNumberOfKeyAttributes(rel));
/*
* Force result ">" if target item is first data item on an internal page
......@@ -487,7 +580,8 @@ _bt_compare(Relation rel,
* _bt_first).
*/
for (i = 1; i <= keysz; i++)
scankey = key->scankeys;
for (int i = 1; i <= key->keysz; i++)
{
Datum datum;
bool isNull;
......@@ -573,8 +667,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
StrategyNumber strat;
bool nextkey;
bool goback;
BTScanInsertData inskey;
ScanKey startKeys[INDEX_MAX_KEYS];
ScanKeyData scankeys[INDEX_MAX_KEYS];
ScanKeyData notnullkeys[INDEX_MAX_KEYS];
int keysCount = 0;
int i;
......@@ -820,8 +914,9 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
/*
* We want to start the scan somewhere within the index. Set up an
* insertion scankey we can use to search for the boundary point we
* identified above. The insertion scankey is built in the local
* scankeys[] array, using the keys identified by startKeys[].
* identified above. The insertion scankey is built using the keys
* identified by startKeys[]. (Remaining insertion scankey fields are
* initialized after initial-positioning strategy is finalized.)
*/
Assert(keysCount <= INDEX_MAX_KEYS);
for (i = 0; i < keysCount; i++)
......@@ -849,7 +944,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
_bt_parallel_done(scan);
return false;
}
memcpy(scankeys + i, subkey, sizeof(ScanKeyData));
memcpy(inskey.scankeys + i, subkey, sizeof(ScanKeyData));
/*
* If the row comparison is the last positioning key we accepted,
......@@ -881,7 +976,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
if (subkey->sk_flags & SK_ISNULL)
break; /* can't use null keys */
Assert(keysCount < INDEX_MAX_KEYS);
memcpy(scankeys + keysCount, subkey, sizeof(ScanKeyData));
memcpy(inskey.scankeys + keysCount, subkey,
sizeof(ScanKeyData));
keysCount++;
if (subkey->sk_flags & SK_ROW_END)
{
......@@ -927,7 +1023,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
FmgrInfo *procinfo;
procinfo = index_getprocinfo(rel, cur->sk_attno, BTORDER_PROC);
ScanKeyEntryInitializeWithInfo(scankeys + i,
ScanKeyEntryInitializeWithInfo(inskey.scankeys + i,
cur->sk_flags,
cur->sk_attno,
InvalidStrategy,
......@@ -948,7 +1044,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
elog(ERROR, "missing support function %d(%u,%u) for attribute %d of index \"%s\"",
BTORDER_PROC, rel->rd_opcintype[i], cur->sk_subtype,
cur->sk_attno, RelationGetRelationName(rel));
ScanKeyEntryInitialize(scankeys + i,
ScanKeyEntryInitialize(inskey.scankeys + i,
cur->sk_flags,
cur->sk_attno,
InvalidStrategy,
......@@ -1051,12 +1147,15 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
return false;
}
/* Initialize remaining insertion scan key fields */
inskey.nextkey = nextkey;
inskey.keysz = keysCount;
/*
* Use the manufactured insertion scan key to descend the tree and
* position ourselves on the target leaf page.
*/
stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
scan->xs_snapshot);
stack = _bt_search(rel, &inskey, &buf, BT_READ, scan->xs_snapshot);
/* don't need to keep the stack around... */
_bt_freestack(stack);
......@@ -1085,7 +1184,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
_bt_initialize_more_data(so, dir);
/* position to the precise item on the page */
offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
offnum = _bt_binsrch(rel, &inskey, buf);
/*
* If nextkey = false, we are positioned at the first item >= scan key, or
......
......@@ -263,6 +263,7 @@ typedef struct BTWriteState
{
Relation heap;
Relation index;
BTScanInsert inskey; /* generic insertion scankey */
bool btws_use_wal; /* dump pages to WAL? */
BlockNumber btws_pages_alloced; /* # pages allocated */
BlockNumber btws_pages_written; /* # pages written out */
......@@ -540,6 +541,7 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
wstate.heap = btspool->heap;
wstate.index = btspool->index;
wstate.inskey = _bt_mkscankey(wstate.index, NULL);
/*
* We need to log index creation in WAL iff WAL archiving/streaming is
......@@ -1085,7 +1087,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
TupleDesc tupdes = RelationGetDescr(wstate->index);
int i,
keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index);
ScanKey indexScanKey = NULL;
SortSupport sortKeys;
if (merge)
......@@ -1098,7 +1099,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
/* the preparation of merge */
itup = tuplesort_getindextuple(btspool->sortstate, true);
itup2 = tuplesort_getindextuple(btspool2->sortstate, true);
indexScanKey = _bt_mkscankey_nodata(wstate->index);
/* Prepare SortSupport data for each column */
sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
......@@ -1106,7 +1106,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
for (i = 0; i < keysz; i++)
{
SortSupport sortKey = sortKeys + i;
ScanKey scanKey = indexScanKey + i;
ScanKey scanKey = wstate->inskey->scankeys + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
......@@ -1125,8 +1125,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
for (;;)
{
load1 = true; /* load BTSpool next ? */
......
......@@ -56,34 +56,37 @@ static bool _bt_check_rowcompare(ScanKey skey,
* Build an insertion scan key that contains comparison data from itup
* as well as comparator routines appropriate to the key datatypes.
*
* The result is intended for use with _bt_compare().
* Result is intended for use with _bt_compare(). Callers that don't
* need to fill out the insertion scankey arguments (e.g. they use an
* ad-hoc comparison routine) can pass a NULL index tuple.
*/
ScanKey
BTScanInsert
_bt_mkscankey(Relation rel, IndexTuple itup)
{
BTScanInsert key;
ScanKey skey;
TupleDesc itupdesc;
int indnatts PG_USED_FOR_ASSERTS_ONLY;
int indnkeyatts;
int16 *indoption;
int tupnatts;
int i;
itupdesc = RelationGetDescr(rel);
indnatts = IndexRelationGetNumberOfAttributes(rel);
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
indoption = rel->rd_indoption;
tupnatts = itup ? BTreeTupleGetNAtts(itup, rel) : 0;
Assert(indnkeyatts > 0);
Assert(indnkeyatts <= indnatts);
Assert(BTreeTupleGetNAtts(itup, rel) == indnatts ||
BTreeTupleGetNAtts(itup, rel) == indnkeyatts);
Assert(tupnatts <= IndexRelationGetNumberOfAttributes(rel));
/*
* We'll execute search using scan key constructed on key columns. Non-key
* (INCLUDE index) columns are always omitted from scan keys.
*/
skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
key = palloc(offsetof(BTScanInsertData, scankeys) +
sizeof(ScanKeyData) * indnkeyatts);
key->nextkey = false;
key->keysz = Min(indnkeyatts, tupnatts);
skey = key->scankeys;
for (i = 0; i < indnkeyatts; i++)
{
FmgrInfo *procinfo;
......@@ -96,56 +99,20 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
* comparison can be needed.
*/
procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
arg = index_getattr(itup, i + 1, itupdesc, &null);
flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
ScanKeyEntryInitializeWithInfo(&skey[i],
flags,
(AttrNumber) (i + 1),
InvalidStrategy,
InvalidOid,
rel->rd_indcollation[i],
procinfo,
arg);
}
return skey;
}
/*
* _bt_mkscankey_nodata
* Build an insertion scan key that contains 3-way comparator routines
* appropriate to the key datatypes, but no comparison data. The
* comparison data ultimately used must match the key datatypes.
*
* The result cannot be used with _bt_compare(), unless comparison
* data is first stored into the key entries. Currently this
* routine is only called by nbtsort.c and tuplesort.c, which have
* their own comparison routines.
*/
ScanKey
_bt_mkscankey_nodata(Relation rel)
{
ScanKey skey;
int indnkeyatts;
int16 *indoption;
int i;
indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
indoption = rel->rd_indoption;
skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
for (i = 0; i < indnkeyatts; i++)
{
FmgrInfo *procinfo;
int flags;
/*
* We can use the cached (default) support procs since no cross-type
* comparison can be needed.
* Key arguments built when caller provides no tuple are
* defensively represented as NULL values. They should never be
* used.
*/
procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
if (i < tupnatts)
arg = index_getattr(itup, i + 1, itupdesc, &null);
else
{
arg = (Datum) 0;
null = true;
}
flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
ScanKeyEntryInitializeWithInfo(&skey[i],
flags,
(AttrNumber) (i + 1),
......@@ -153,19 +120,10 @@ _bt_mkscankey_nodata(Relation rel)
InvalidOid,
rel->rd_indcollation[i],
procinfo,
(Datum) 0);
arg);
}
return skey;
}
/*
* free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
*/
void
_bt_freeskey(ScanKey skey)
{
pfree(skey);
return key;
}
/*
......
......@@ -884,7 +884,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
{
Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
randomAccess);
ScanKey indexScanKey;
BTScanInsert indexScanKey;
MemoryContext oldcontext;
int i;
......@@ -919,7 +919,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
state->tupDesc = tupDesc; /* assume we need not copy tupDesc */
indexScanKey = _bt_mkscankey_nodata(indexRel);
indexScanKey = _bt_mkscankey(indexRel, NULL);
if (state->indexInfo->ii_Expressions != NULL)
{
......@@ -945,7 +945,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
for (i = 0; i < state->nKeys; i++)
{
SortSupport sortKey = state->sortKeys + i;
ScanKey scanKey = indexScanKey + i;
ScanKey scanKey = indexScanKey->scankeys + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
......@@ -964,7 +964,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
pfree(indexScanKey);
MemoryContextSwitchTo(oldcontext);
......@@ -981,7 +981,7 @@ tuplesort_begin_index_btree(Relation heapRel,
{
Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
randomAccess);
ScanKey indexScanKey;
BTScanInsert indexScanKey;
MemoryContext oldcontext;
int i;
......@@ -1014,7 +1014,7 @@ tuplesort_begin_index_btree(Relation heapRel,
state->indexRel = indexRel;
state->enforceUnique = enforceUnique;
indexScanKey = _bt_mkscankey_nodata(indexRel);
indexScanKey = _bt_mkscankey(indexRel, NULL);
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(state->nKeys *
......@@ -1023,7 +1023,7 @@ tuplesort_begin_index_btree(Relation heapRel,
for (i = 0; i < state->nKeys; i++)
{
SortSupport sortKey = state->sortKeys + i;
ScanKey scanKey = indexScanKey + i;
ScanKey scanKey = indexScanKey->scankeys + i;
int16 strategy;
sortKey->ssup_cxt = CurrentMemoryContext;
......@@ -1042,7 +1042,7 @@ tuplesort_begin_index_btree(Relation heapRel,
PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
}
_bt_freeskey(indexScanKey);
pfree(indexScanKey);
MemoryContextSwitchTo(oldcontext);
......
......@@ -319,6 +319,64 @@ typedef struct BTStackData
typedef BTStackData *BTStack;
/*
* BTScanInsert is the btree-private state needed to find an initial position
* for an indexscan, or to insert new tuples -- an "insertion scankey" (not to
* be confused with a search scankey). It's used to descend a B-Tree using
* _bt_search.
*
* When nextkey is false (the usual case), _bt_search and _bt_binsrch will
* locate the first item >= scankey. When nextkey is true, they will locate
* the first item > scan key.
*
* scankeys is an array of scan key entries for attributes that are compared.
* keysz is the size of the array. During insertion, there must be a scan key
* for every attribute, but when starting a regular index scan some can be
* omitted. The array is used as a flexible array member, though it's sized
* in a way that makes it possible to use stack allocations. See
* nbtree/README for full details.
*/
typedef struct BTScanInsertData
{
/*
* false (the usual case): searches locate the first item >= the key.
* true: searches locate the first item strictly > the key.
*/
bool nextkey;
/*
* Number of leading scankeys[] entries that take part in comparisons;
* this is the loop bound used by _bt_compare.  It can be smaller than the
* number of initialized entries (_bt_mkscankey sets it to
* Min(key attributes, attributes in the tuple)).
*/
int keysz; /* Size of scankeys array */
/*
* Declared with INDEX_MAX_KEYS entries so the struct can live on the stack
* (as _bt_first does), but _bt_mkscankey palloc's only
* offsetof(BTScanInsertData, scankeys) plus one entry per key attribute.
* Do not assume a heap-allocated key has INDEX_MAX_KEYS entries.
*/
ScanKeyData scankeys[INDEX_MAX_KEYS]; /* Must appear last */
} BTScanInsertData;
/* Pointer form passed to _bt_search, _bt_moveright, _bt_compare, etc. */
typedef BTScanInsertData *BTScanInsert;
/*
* BTInsertStateData is a working area used during insertion.
*
* This is filled in after descending the tree to the first leaf page the new
* tuple might belong on. Tracks the current position while performing
* uniqueness check, before we have determined which exact page to insert
* to.
*
* (This should be private to nbtinsert.c, but it's also used by
* _bt_binsrch_insert)
*/
typedef struct BTInsertStateData
{
IndexTuple itup; /* Item we're inserting */
Size itemsz; /* Size of itup -- should be MAXALIGN()'d */
BTScanInsert itup_key; /* Insertion scankey */
/* Buffer containing leaf page we're likely to insert itup on */
Buffer buf;
/*
* Cache of bounds within the current buffer. Only used for insertions
* where _bt_check_unique is called. See _bt_binsrch_insert and
* _bt_findinsertloc for details.
*/
/*
* bounds_valid: true once _bt_binsrch_insert has stored low/stricthigh for
* the page in buf; it sets this to false (and both offsets to
* InvalidOffsetNumber) when the page has no items to search.
*/
bool bounds_valid;
/*
* low: offset returned by the last _bt_binsrch_insert call, i.e. the first
* slot whose item is >= itup_key.  May be one past the last slot on the
* page.
*/
OffsetNumber low;
/*
* stricthigh: an offset whose item is known to be strictly > itup_key,
* used as the upper bound when the search is resumed.  May also be one
* past the last slot on the page.
*/
OffsetNumber stricthigh;
} BTInsertStateData;
/* Pointer form taken by _bt_binsrch_insert */
typedef BTInsertStateData *BTInsertState;
/*
* BTScanOpaqueData is the btree-private state needed for an indexscan.
* This consists of preprocessed scan keys (see _bt_preprocess_keys() for
......@@ -558,16 +616,12 @@ extern int _bt_pagedel(Relation rel, Buffer buf);
/*
* prototypes for functions in nbtsearch.c
*/
extern BTStack _bt_search(Relation rel,
int keysz, ScanKey scankey, bool nextkey,
Buffer *bufP, int access, Snapshot snapshot);
extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
int access, Snapshot snapshot);
extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
ScanKey scankey, bool nextkey);
extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
Page page, OffsetNumber offnum);
extern BTStack _bt_search(Relation rel, BTScanInsert key, Buffer *bufP,
int access, Snapshot snapshot);
extern Buffer _bt_moveright(Relation rel, BTScanInsert key, Buffer buf,
bool forupdate, BTStack stack, int access, Snapshot snapshot);
extern OffsetNumber _bt_binsrch_insert(Relation rel, BTInsertState insertstate);
extern int32 _bt_compare(Relation rel, BTScanInsert key, Page page, OffsetNumber offnum);
extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
......@@ -576,9 +630,7 @@ extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
/*
* prototypes for functions in nbtutils.c
*/
extern ScanKey _bt_mkscankey(Relation rel, IndexTuple itup);
extern ScanKey _bt_mkscankey_nodata(Relation rel);
extern void _bt_freeskey(ScanKey skey);
extern BTScanInsert _bt_mkscankey(Relation rel, IndexTuple itup);
extern void _bt_freestack(BTStack stack);
extern void _bt_preprocess_array_keys(IndexScanDesc scan);
extern void _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment