Commit 7c75ef57 authored by Robert Haas's avatar Robert Haas

hash: Implement page-at-a-time scan.

Commit 09cb5c0e added a similar
optimization to btree back in 2006, but nobody bothered to implement
the same thing for hash indexes, probably because they weren't
WAL-logged and had lots of other performance problems as well.  As
with the corresponding btree case, this eliminates the problem of
potentially needing to refind our position within the page, and cuts
down on pin/unpin traffic as well.

Ashutosh Sharma, reviewed by Alexander Korotkov, Jesper Pedersen,
Amit Kapila, and me.  Some final edits to comments and README by
me.

Discussion: http://postgr.es/m/CAE9k0Pm3KTx93K8_5j6VMzG4h5F+SyknxUwXrN-zqSZ9X8ZS3w@mail.gmail.com
parent 0f574a7a
......@@ -259,10 +259,11 @@ The reader algorithm is:
-- then, per read request:
reacquire content lock on current page
step to next page if necessary (no chaining of content locks, but keep
the pin on the primary bucket throughout the scan; we also maintain
a pin on the page currently being scanned)
get tuple
release content lock
the pin on the primary bucket throughout the scan)
save all the matching tuples from current index page into an items array
release pin and content lock (but if it is primary bucket page retain
its pin till the end of the scan)
get tuple from an item array
-- at scan shutdown:
release all pins still held
......@@ -270,15 +271,13 @@ Holding the buffer pin on the primary bucket page for the whole scan prevents
the reader's current-tuple pointer from being invalidated by splits or
compactions. (Of course, other buckets can still be split or compacted.)
To keep concurrency reasonably good, we require readers to cope with
concurrent insertions, which means that they have to be able to re-find
their current scan position after re-acquiring the buffer content lock on
page. Since deletion is not possible while a reader holds the pin on bucket,
and we assume that heap tuple TIDs are unique, this can be implemented by
searching for the same heap tuple TID previously returned. Insertion does
not move index entries across pages, so the previously-returned index entry
should always be on the same page, at the same or higher offset number,
as it was before.
To minimize lock/unlock traffic, hash index scan always searches the entire
hash page to identify all the matching items at once, copying their heap tuple
IDs into backend-local storage. The heap tuple IDs are then processed while not
holding any page lock within the index thereby, allowing concurrent insertion
to happen on the same index page without any requirement of re-finding the
current scan position for the reader. We do continue to hold a pin on the
bucket page, to protect against concurrent deletions and bucket split.
To allow for scans during a bucket split, if at the start of the scan, the
bucket is marked as bucket-being-populated, it scan all the tuples in that
......@@ -415,23 +414,43 @@ The fourth operation is garbage collection (bulk deletion):
Note that this is designed to allow concurrent splits and scans. If a split
occurs, tuples relocated into the new bucket will be visited twice by the
scan, but that does no harm. As we release the lock on bucket page during
cleanup scan of a bucket, it will allow concurrent scan to start on a bucket
and ensures that scan will always be behind cleanup. It is must to keep scans
behind cleanup, else vacuum could decrease the TIDs that are required to
complete the scan. Now, as the scan that returns multiple tuples from the
same bucket page always expect next valid TID to be greater than or equal to
the current TID, it might miss the tuples. This holds true for backward scans
as well (backward scans first traverse each bucket starting from first bucket
to last overflow page in the chain). We must be careful about the statistics
reported by the VACUUM operation. What we can do is count the number of
tuples scanned, and believe this in preference to the stored tuple count if
the stored tuple count and number of buckets did *not* change at any time
during the scan. This provides a way of correcting the stored tuple count if
it gets out of sync for some reason. But if a split or insertion does occur
concurrently, the scan count is untrustworthy; instead, subtract the number of
tuples deleted from the stored tuple count and use that.
scan, but that does no harm. See also "Interlocking Between Scans and
VACUUM", below.
We must be careful about the statistics reported by the VACUUM operation.
What we can do is count the number of tuples scanned, and believe this in
preference to the stored tuple count if the stored tuple count and number of
buckets did *not* change at any time during the scan. This provides a way of
correcting the stored tuple count if it gets out of sync for some reason. But
if a split or insertion does occur concurrently, the scan count is
untrustworthy; instead, subtract the number of tuples deleted from the stored
tuple count and use that.
Interlocking Between Scans and VACUUM
-------------------------------------
Since we release the lock on bucket page during a cleanup scan of a bucket, a
concurrent scan could start in that bucket before we've finished vacuuming it.
If a scan gets ahead of cleanup, we could have the following problem: (1) the
scan sees heap TIDs that are about to be removed before they are processed by
VACUUM, (2) the scan decides that one or more of those TIDs are dead, (3)
VACUUM completes, (3) one or more of the TIDs the scan decided were dead are
reused for an unrelated tuple, and finally (5) the scan wakes up and
erroneously kills the new tuple.
Note that this requires VACUUM and a scan to be active in the same bucket at
the same time. If VACUUM completes before the scan starts, the scan never has
a chance to see the dead tuples; if the scan completes before the VACUUM
starts, the heap TIDs can't have been reused meanwhile. Furthermore, VACUUM
can't start on a bucket that has an active scan, because the scan holds a pin
on the primary bucket page, and VACUUM must take a cleanup lock on that page
in order to begin cleanup. Therefore, the only way this problem can occur is
for a scan to start after VACUUM has released the cleanup lock on the bucket
but before it has processed the entire bucket and then overtake the cleanup
operation.
Currently, we prevent this using lock chaining: cleanup locks the next page
in the chain before releasing the lock and pin on the page just processed.
Free Space Management
---------------------
......
......@@ -268,65 +268,20 @@ bool
hashgettuple(IndexScanDesc scan, ScanDirection dir)
{
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
Buffer buf;
Page page;
OffsetNumber offnum;
ItemPointer current;
bool res;
/* Hash indexes are always lossy since we store only the hash code */
scan->xs_recheck = true;
/*
* We hold pin but not lock on current buffer while outside the hash AM.
* Reacquire the read lock here.
*/
if (BufferIsValid(so->hashso_curbuf))
LockBuffer(so->hashso_curbuf, BUFFER_LOCK_SHARE);
/*
* If we've already initialized this scan, we can just advance it in the
* appropriate direction. If we haven't done so yet, we call a routine to
* get the first item in the scan.
*/
current = &(so->hashso_curpos);
if (ItemPointerIsValid(current))
if (!HashScanPosIsValid(so->currPos))
res = _hash_first(scan, dir);
else
{
/*
* An insertion into the current index page could have happened while
* we didn't have read lock on it. Re-find our position by looking
* for the TID we previously returned. (Because we hold a pin on the
* primary bucket page, no deletions or splits could have occurred;
* therefore we can expect that the TID still exists in the current
* index page, at an offset >= where we were.)
*/
OffsetNumber maxoffnum;
buf = so->hashso_curbuf;
Assert(BufferIsValid(buf));
page = BufferGetPage(buf);
/*
* We don't need test for old snapshot here as the current buffer is
* pinned, so vacuum can't clean the page.
*/
maxoffnum = PageGetMaxOffsetNumber(page);
for (offnum = ItemPointerGetOffsetNumber(current);
offnum <= maxoffnum;
offnum = OffsetNumberNext(offnum))
{
IndexTuple itup;
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
if (ItemPointerEquals(&(so->hashso_heappos), &(itup->t_tid)))
break;
}
if (offnum > maxoffnum)
elog(ERROR, "failed to re-find scan position within index \"%s\"",
RelationGetRelationName(rel));
ItemPointerSetOffsetNumber(current, offnum);
/*
* Check to see if we should kill the previously-fetched tuple.
*/
......@@ -341,16 +296,11 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
* entries.
*/
if (so->killedItems == NULL)
so->killedItems = palloc(MaxIndexTuplesPerPage *
sizeof(HashScanPosItem));
so->killedItems = (int *)
palloc(MaxIndexTuplesPerPage * sizeof(int));
if (so->numKilled < MaxIndexTuplesPerPage)
{
so->killedItems[so->numKilled].heapTid = so->hashso_heappos;
so->killedItems[so->numKilled].indexOffset =
ItemPointerGetOffsetNumber(&(so->hashso_curpos));
so->numKilled++;
}
so->killedItems[so->numKilled++] = so->currPos.itemIndex;
}
/*
......@@ -358,30 +308,6 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
*/
res = _hash_next(scan, dir);
}
else
res = _hash_first(scan, dir);
/*
* Skip killed tuples if asked to.
*/
if (scan->ignore_killed_tuples)
{
while (res)
{
offnum = ItemPointerGetOffsetNumber(current);
page = BufferGetPage(so->hashso_curbuf);
if (!ItemIdIsDead(PageGetItemId(page, offnum)))
break;
res = _hash_next(scan, dir);
}
}
/* Release read lock on current buffer, but keep it pinned */
if (BufferIsValid(so->hashso_curbuf))
LockBuffer(so->hashso_curbuf, BUFFER_LOCK_UNLOCK);
/* Return current heap TID on success */
scan->xs_ctup.t_self = so->hashso_heappos;
return res;
}
......@@ -396,35 +322,21 @@ hashgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
HashScanOpaque so = (HashScanOpaque) scan->opaque;
bool res;
int64 ntids = 0;
HashScanPosItem *currItem;
res = _hash_first(scan, ForwardScanDirection);
while (res)
{
bool add_tuple;
currItem = &so->currPos.items[so->currPos.itemIndex];
/*
* Skip killed tuples if asked to.
* _hash_first and _hash_next handle eliminate dead index entries
* whenever scan->ignored_killed_tuples is true. Therefore, there's
* nothing to do here except add the results to the TIDBitmap.
*/
if (scan->ignore_killed_tuples)
{
Page page;
OffsetNumber offnum;
offnum = ItemPointerGetOffsetNumber(&(so->hashso_curpos));
page = BufferGetPage(so->hashso_curbuf);
add_tuple = !ItemIdIsDead(PageGetItemId(page, offnum));
}
else
add_tuple = true;
/* Save tuple ID, and continue scanning */
if (add_tuple)
{
/* Note we mark the tuple ID as requiring recheck */
tbm_add_tuples(tbm, &(so->hashso_heappos), 1, true);
ntids++;
}
tbm_add_tuples(tbm, &(currItem->heapTid), 1, true);
ntids++;
res = _hash_next(scan, ForwardScanDirection);
}
......@@ -448,12 +360,9 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
scan = RelationGetIndexScan(rel, nkeys, norderbys);
so = (HashScanOpaque) palloc(sizeof(HashScanOpaqueData));
so->hashso_curbuf = InvalidBuffer;
HashScanPosInvalidate(so->currPos);
so->hashso_bucket_buf = InvalidBuffer;
so->hashso_split_bucket_buf = InvalidBuffer;
/* set position invalid (this will cause _hash_first call) */
ItemPointerSetInvalid(&(so->hashso_curpos));
ItemPointerSetInvalid(&(so->hashso_heappos));
so->hashso_buc_populated = false;
so->hashso_buc_split = false;
......@@ -476,22 +385,17 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
/*
* Before leaving current page, deal with any killed items. Also, ensure
* that we acquire lock on current page before calling _hash_kill_items.
*/
if (so->numKilled > 0)
if (HashScanPosIsValid(so->currPos))
{
LockBuffer(so->hashso_curbuf, BUFFER_LOCK_SHARE);
_hash_kill_items(scan);
LockBuffer(so->hashso_curbuf, BUFFER_LOCK_UNLOCK);
/* Before leaving current page, deal with any killed items */
if (so->numKilled > 0)
_hash_kill_items(scan);
}
_hash_dropscanbuf(rel, so);
/* set position invalid (this will cause _hash_first call) */
ItemPointerSetInvalid(&(so->hashso_curpos));
ItemPointerSetInvalid(&(so->hashso_heappos));
HashScanPosInvalidate(so->currPos);
/* Update scan key, if a new one is given */
if (scankey && scan->numberOfKeys > 0)
......@@ -514,15 +418,11 @@ hashendscan(IndexScanDesc scan)
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
/*
* Before leaving current page, deal with any killed items. Also, ensure
* that we acquire lock on current page before calling _hash_kill_items.
*/
if (so->numKilled > 0)
if (HashScanPosIsValid(so->currPos))
{
LockBuffer(so->hashso_curbuf, BUFFER_LOCK_SHARE);
_hash_kill_items(scan);
LockBuffer(so->hashso_curbuf, BUFFER_LOCK_UNLOCK);
/* Before leaving current page, deal with any killed items */
if (so->numKilled > 0)
_hash_kill_items(scan);
}
_hash_dropscanbuf(rel, so);
......@@ -755,16 +655,15 @@ hashvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
* primary bucket page. The lock won't necessarily be held continuously,
* though, because we'll release it when visiting overflow pages.
*
* It would be very bad if this function cleaned a page while some other
* backend was in the midst of scanning it, because hashgettuple assumes
* that the next valid TID will be greater than or equal to the current
* valid TID. There can't be any concurrent scans in progress when we first
* enter this function because of the cleanup lock we hold on the primary
* bucket page, but as soon as we release that lock, there might be. We
* handle that by conspiring to prevent those scans from passing our cleanup
* scan. To do that, we lock the next page in the bucket chain before
* releasing the lock on the previous page. (This type of lock chaining is
* not ideal, so we might want to look for a better solution at some point.)
* There can't be any concurrent scans in progress when we first enter this
* function because of the cleanup lock we hold on the primary bucket page,
* but as soon as we release that lock, there might be. If those scans got
* ahead of our cleanup scan, they might see a tuple before we kill it and
* wake up only after VACUUM has completed and the TID has been recycled for
* an unrelated tuple. To avoid that calamity, we prevent scans from passing
* our cleanup scan by locking the next page in the bucket chain before
* releasing the lock on the previous page. (This type of lock chaining is not
* ideal, so we might want to look for a better solution at some point.)
*
* We need to retain a pin on the primary bucket to ensure that no concurrent
* split can start.
......
......@@ -298,20 +298,20 @@ _hash_dropscanbuf(Relation rel, HashScanOpaque so)
{
/* release pin we hold on primary bucket page */
if (BufferIsValid(so->hashso_bucket_buf) &&
so->hashso_bucket_buf != so->hashso_curbuf)
so->hashso_bucket_buf != so->currPos.buf)
_hash_dropbuf(rel, so->hashso_bucket_buf);
so->hashso_bucket_buf = InvalidBuffer;
/* release pin we hold on primary bucket page of bucket being split */
if (BufferIsValid(so->hashso_split_bucket_buf) &&
so->hashso_split_bucket_buf != so->hashso_curbuf)
so->hashso_split_bucket_buf != so->currPos.buf)
_hash_dropbuf(rel, so->hashso_split_bucket_buf);
so->hashso_split_bucket_buf = InvalidBuffer;
/* release any pin we still hold */
if (BufferIsValid(so->hashso_curbuf))
_hash_dropbuf(rel, so->hashso_curbuf);
so->hashso_curbuf = InvalidBuffer;
if (BufferIsValid(so->currPos.buf))
_hash_dropbuf(rel, so->currPos.buf);
so->currPos.buf = InvalidBuffer;
/* reset split scan */
so->hashso_buc_populated = false;
......
......@@ -20,44 +20,105 @@
#include "pgstat.h"
#include "utils/rel.h"
static bool _hash_readpage(IndexScanDesc scan, Buffer *bufP,
ScanDirection dir);
static int _hash_load_qualified_items(IndexScanDesc scan, Page page,
OffsetNumber offnum, ScanDirection dir);
static inline void _hash_saveitem(HashScanOpaque so, int itemIndex,
OffsetNumber offnum, IndexTuple itup);
static void _hash_readnext(IndexScanDesc scan, Buffer *bufp,
Page *pagep, HashPageOpaque *opaquep);
/*
* _hash_next() -- Get the next item in a scan.
*
* On entry, we have a valid hashso_curpos in the scan, and a
* pin and read lock on the page that contains that item.
* We find the next item in the scan, if any.
* On success exit, we have the page containing the next item
* pinned and locked.
* On entry, so->currPos describes the current page, which may
* be pinned but not locked, and so->currPos.itemIndex identifies
* which item was previously returned.
*
* On successful exit, scan->xs_ctup.t_self is set to the TID
* of the next heap tuple. so->currPos is updated as needed.
*
* On failure exit (no more tuples), we return FALSE with pin
* held on bucket page but no pins or locks held on overflow
* page.
*/
bool
_hash_next(IndexScanDesc scan, ScanDirection dir)
{
Relation rel = scan->indexRelation;
HashScanOpaque so = (HashScanOpaque) scan->opaque;
HashScanPosItem *currItem;
BlockNumber blkno;
Buffer buf;
Page page;
OffsetNumber offnum;
ItemPointer current;
IndexTuple itup;
/* we still have the buffer pinned and read-locked */
buf = so->hashso_curbuf;
Assert(BufferIsValid(buf));
bool end_of_scan = false;
/*
* step to next valid tuple.
* Advance to the next tuple on the current page; or if done, try to read
* data from the next or previous page based on the scan direction. Before
* moving to the next or previous page make sure that we deal with all the
* killed items.
*/
if (!_hash_step(scan, &buf, dir))
if (ScanDirectionIsForward(dir))
{
if (++so->currPos.itemIndex > so->currPos.lastItem)
{
if (so->numKilled > 0)
_hash_kill_items(scan);
blkno = so->currPos.nextPage;
if (BlockNumberIsValid(blkno))
{
buf = _hash_getbuf(rel, blkno, HASH_READ, LH_OVERFLOW_PAGE);
TestForOldSnapshot(scan->xs_snapshot, rel, BufferGetPage(buf));
if (!_hash_readpage(scan, &buf, dir))
end_of_scan = true;
}
else
end_of_scan = true;
}
}
else
{
if (--so->currPos.itemIndex < so->currPos.firstItem)
{
if (so->numKilled > 0)
_hash_kill_items(scan);
blkno = so->currPos.prevPage;
if (BlockNumberIsValid(blkno))
{
buf = _hash_getbuf(rel, blkno, HASH_READ,
LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
TestForOldSnapshot(scan->xs_snapshot, rel, BufferGetPage(buf));
/*
* We always maintain the pin on bucket page for whole scan
* operation, so releasing the additional pin we have acquired
* here.
*/
if (buf == so->hashso_bucket_buf ||
buf == so->hashso_split_bucket_buf)
_hash_dropbuf(rel, buf);
if (!_hash_readpage(scan, &buf, dir))
end_of_scan = true;
}
else
end_of_scan = true;
}
}
if (end_of_scan)
{
_hash_dropscanbuf(rel, so);
HashScanPosInvalidate(so->currPos);
return false;
}
/* if we're here, _hash_step found a valid tuple */
current = &(so->hashso_curpos);
offnum = ItemPointerGetOffsetNumber(current);
_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
page = BufferGetPage(buf);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
so->hashso_heappos = itup->t_tid;
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
return true;
}
......@@ -212,11 +273,18 @@ _hash_readprev(IndexScanDesc scan,
/*
* _hash_first() -- Find the first item in a scan.
*
* Find the first item in the index that
* satisfies the qualification associated with the scan descriptor. On
* success, the page containing the current index tuple is read locked
* and pinned, and the scan's opaque data entry is updated to
* include the buffer.
* We find the first item (or, if backward scan, the last item) in the
* index that satisfies the qualification associated with the scan
* descriptor.
*
* On successful exit, if the page containing current index tuple is an
* overflow page, both pin and lock are released whereas if it is a bucket
* page then it is pinned but not locked and data about the matching
* tuple(s) on the page has been loaded into so->currPos,
* scan->xs_ctup.t_self is set to the heap TID of the current tuple.
*
* On failure exit (no more tuples), we return FALSE, with pin held on
* bucket page but no pins or locks held on overflow page.
*/
bool
_hash_first(IndexScanDesc scan, ScanDirection dir)
......@@ -229,15 +297,10 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
Buffer buf;
Page page;
HashPageOpaque opaque;
IndexTuple itup;
ItemPointer current;
OffsetNumber offnum;
HashScanPosItem *currItem;
pgstat_count_index_scan(rel);
current = &(so->hashso_curpos);
ItemPointerSetInvalid(current);
/*
* We do not support hash scans with no index qualification, because we
* would have to read the whole index rather than just one bucket. That
......@@ -356,222 +419,308 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
_hash_readnext(scan, &buf, &page, &opaque);
}
/* Now find the first tuple satisfying the qualification */
if (!_hash_step(scan, &buf, dir))
/* remember which buffer we have pinned, if any */
Assert(BufferIsInvalid(so->currPos.buf));
so->currPos.buf = buf;
/* Now find all the tuples satisfying the qualification from a page */
if (!_hash_readpage(scan, &buf, dir))
return false;
/* if we're here, _hash_step found a valid tuple */
offnum = ItemPointerGetOffsetNumber(current);
_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
page = BufferGetPage(buf);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
so->hashso_heappos = itup->t_tid;
/* OK, itemIndex says what to return */
currItem = &so->currPos.items[so->currPos.itemIndex];
scan->xs_ctup.t_self = currItem->heapTid;
/* if we're here, _hash_readpage found a valid tuples */
return true;
}
/*
* _hash_step() -- step to the next valid item in a scan in the bucket.
*
* If no valid record exists in the requested direction, return
* false. Else, return true and set the hashso_curpos for the
* scan to the right thing.
* _hash_readpage() -- Load data from current index page into so->currPos
*
* Here we need to ensure that if the scan has started during split, then
* skip the tuples that are moved by split while scanning bucket being
* populated and then scan the bucket being split to cover all such
* tuples. This is done to ensure that we don't miss tuples in the scans
* that are started during split.
* We scan all the items in the current index page and save them into
* so->currPos if it satisfies the qualification. If no matching items
* are found in the current page, we move to the next or previous page
* in a bucket chain as indicated by the direction.
*
* 'bufP' points to the current buffer, which is pinned and read-locked.
* On success exit, we have pin and read-lock on whichever page
* contains the right item; on failure, we have released all buffers.
* Return true if any matching items are found else return false.
*/
bool
_hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
static bool
_hash_readpage(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
{
Relation rel = scan->indexRelation;
HashScanOpaque so = (HashScanOpaque) scan->opaque;
ItemPointer current;
Buffer buf;
Page page;
HashPageOpaque opaque;
OffsetNumber maxoff;
OffsetNumber offnum;
BlockNumber blkno;
IndexTuple itup;
current = &(so->hashso_curpos);
uint16 itemIndex;
buf = *bufP;
Assert(BufferIsValid(buf));
_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
page = BufferGetPage(buf);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
/*
* If _hash_step is called from _hash_first, current will not be valid, so
* we can't dereference it. However, in that case, we presumably want to
* start at the beginning/end of the page...
*/
maxoff = PageGetMaxOffsetNumber(page);
if (ItemPointerIsValid(current))
offnum = ItemPointerGetOffsetNumber(current);
else
offnum = InvalidOffsetNumber;
so->currPos.buf = buf;
/*
* 'offnum' now points to the last tuple we examined (if any).
*
* continue to step through tuples until: 1) we get to the end of the
* bucket chain or 2) we find a valid tuple.
* We save the LSN of the page as we read it, so that we know whether it
* is safe to apply LP_DEAD hints to the page later.
*/
do
so->currPos.lsn = PageGetLSN(page);
so->currPos.currPage = BufferGetBlockNumber(buf);
if (ScanDirectionIsForward(dir))
{
switch (dir)
BlockNumber prev_blkno = InvalidBlockNumber;
for (;;)
{
case ForwardScanDirection:
if (offnum != InvalidOffsetNumber)
offnum = OffsetNumberNext(offnum); /* move forward */
else
{
/* new page, locate starting position by binary search */
offnum = _hash_binsearch(page, so->hashso_sk_hash);
}
for (;;)
{
/*
* check if we're still in the range of items with the
* target hash key
*/
if (offnum <= maxoff)
{
Assert(offnum >= FirstOffsetNumber);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
/*
* skip the tuples that are moved by split operation
* for the scan that has started when split was in
* progress
*/
if (so->hashso_buc_populated && !so->hashso_buc_split &&
(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK))
{
offnum = OffsetNumberNext(offnum); /* move forward */
continue;
}
if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup))
break; /* yes, so exit for-loop */
}
/* Before leaving current page, deal with any killed items */
if (so->numKilled > 0)
_hash_kill_items(scan);
/*
* ran off the end of this page, try the next
*/
_hash_readnext(scan, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch(page, so->hashso_sk_hash);
}
else
{
itup = NULL;
break; /* exit for-loop */
}
}
/* new page, locate starting position by binary search */
offnum = _hash_binsearch(page, so->hashso_sk_hash);
itemIndex = _hash_load_qualified_items(scan, page, offnum, dir);
if (itemIndex != 0)
break;
case BackwardScanDirection:
if (offnum != InvalidOffsetNumber)
offnum = OffsetNumberPrev(offnum); /* move back */
else
{
/* new page, locate starting position by binary search */
offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
}
for (;;)
{
/*
* check if we're still in the range of items with the
* target hash key
*/
if (offnum >= FirstOffsetNumber)
{
Assert(offnum <= maxoff);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
/*
* skip the tuples that are moved by split operation
* for the scan that has started when split was in
* progress
*/
if (so->hashso_buc_populated && !so->hashso_buc_split &&
(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK))
{
offnum = OffsetNumberPrev(offnum); /* move back */
continue;
}
if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup))
break; /* yes, so exit for-loop */
}
/* Before leaving current page, deal with any killed items */
if (so->numKilled > 0)
_hash_kill_items(scan);
/*
* ran off the end of this page, try the next
*/
_hash_readprev(scan, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
TestForOldSnapshot(scan->xs_snapshot, rel, page);
maxoff = PageGetMaxOffsetNumber(page);
offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
}
else
{
itup = NULL;
break; /* exit for-loop */
}
}
/*
* Could not find any matching tuples in the current page, move to
* the next page. Before leaving the current page, deal with any
* killed items.
*/
if (so->numKilled > 0)
_hash_kill_items(scan);
/*
* If this is a primary bucket page, hasho_prevblkno is not a real
* block number.
*/
if (so->currPos.buf == so->hashso_bucket_buf ||
so->currPos.buf == so->hashso_split_bucket_buf)
prev_blkno = InvalidBlockNumber;
else
prev_blkno = opaque->hasho_prevblkno;
_hash_readnext(scan, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
so->currPos.buf = buf;
so->currPos.currPage = BufferGetBlockNumber(buf);
so->currPos.lsn = PageGetLSN(page);
}
else
{
/*
* Remember next and previous block numbers for scrollable
* cursors to know the start position and return FALSE
* indicating that no more matching tuples were found. Also,
* don't reset currPage or lsn, because we expect
* _hash_kill_items to be called for the old page after this
* function returns.
*/
so->currPos.prevPage = prev_blkno;
so->currPos.nextPage = InvalidBlockNumber;
so->currPos.buf = buf;
return false;
}
}
so->currPos.firstItem = 0;
so->currPos.lastItem = itemIndex - 1;
so->currPos.itemIndex = 0;
}
else
{
BlockNumber next_blkno = InvalidBlockNumber;
for (;;)
{
/* new page, locate starting position by binary search */
offnum = _hash_binsearch_last(page, so->hashso_sk_hash);
itemIndex = _hash_load_qualified_items(scan, page, offnum, dir);
if (itemIndex != MaxIndexTuplesPerPage)
break;
default:
/* NoMovementScanDirection */
/* this should not be reached */
itup = NULL;
/*
* Could not find any matching tuples in the current page, move to
* the previous page. Before leaving the current page, deal with
* any killed items.
*/
if (so->numKilled > 0)
_hash_kill_items(scan);
if (so->currPos.buf == so->hashso_bucket_buf ||
so->currPos.buf == so->hashso_split_bucket_buf)
next_blkno = opaque->hasho_nextblkno;
_hash_readprev(scan, &buf, &page, &opaque);
if (BufferIsValid(buf))
{
so->currPos.buf = buf;
so->currPos.currPage = BufferGetBlockNumber(buf);
so->currPos.lsn = PageGetLSN(page);
}
else
{
/*
* Remember next and previous block numbers for scrollable
* cursors to know the start position and return FALSE
* indicating that no more matching tuples were found. Also,
* don't reset currPage or lsn, because we expect
* _hash_kill_items to be called for the old page after this
* function returns.
*/
so->currPos.prevPage = InvalidBlockNumber;
so->currPos.nextPage = next_blkno;
so->currPos.buf = buf;
return false;
}
}
so->currPos.firstItem = itemIndex;
so->currPos.lastItem = MaxIndexTuplesPerPage - 1;
so->currPos.itemIndex = MaxIndexTuplesPerPage - 1;
}
if (so->currPos.buf == so->hashso_bucket_buf ||
so->currPos.buf == so->hashso_split_bucket_buf)
{
so->currPos.prevPage = InvalidBlockNumber;
so->currPos.nextPage = opaque->hasho_nextblkno;
LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
}
else
{
so->currPos.prevPage = opaque->hasho_prevblkno;
so->currPos.nextPage = opaque->hasho_nextblkno;
_hash_relbuf(rel, so->currPos.buf);
so->currPos.buf = InvalidBuffer;
}
Assert(so->currPos.firstItem <= so->currPos.lastItem);
return true;
}
/*
* Load all the qualified items from a current index page
* into so->currPos. Helper function for _hash_readpage.
*/
static int
_hash_load_qualified_items(IndexScanDesc scan, Page page,
OffsetNumber offnum, ScanDirection dir)
{
HashScanOpaque so = (HashScanOpaque) scan->opaque;
IndexTuple itup;
int itemIndex;
OffsetNumber maxoff;
maxoff = PageGetMaxOffsetNumber(page);
if (ScanDirectionIsForward(dir))
{
/* load items[] in ascending order */
itemIndex = 0;
while (offnum <= maxoff)
{
Assert(offnum >= FirstOffsetNumber);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
/*
* skip the tuples that are moved by split operation for the scan
* that has started when split was in progress. Also, skip the
* tuples that are marked as dead.
*/
if ((so->hashso_buc_populated && !so->hashso_buc_split &&
(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK)) ||
(scan->ignore_killed_tuples &&
(ItemIdIsDead(PageGetItemId(page, offnum)))))
{
offnum = OffsetNumberNext(offnum); /* move forward */
continue;
}
if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup) &&
_hash_checkqual(scan, itup))
{
/* tuple is qualified, so remember it */
_hash_saveitem(so, itemIndex, offnum, itup);
itemIndex++;
}
else
{
/*
* No more matching tuples exist in this page. so, exit while
* loop.
*/
break;
}
offnum = OffsetNumberNext(offnum);
}
if (itup == NULL)
Assert(itemIndex <= MaxIndexTuplesPerPage);
return itemIndex;
}
else
{
/* load items[] in descending order */
itemIndex = MaxIndexTuplesPerPage;
while (offnum >= FirstOffsetNumber)
{
Assert(offnum <= maxoff);
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
/*
* We ran off the end of the bucket without finding a match.
* Release the pin on bucket buffers. Normally, such pins are
* released at end of scan, however scrolling cursors can
* reacquire the bucket lock and pin in the same scan multiple
* times.
* skip the tuples that are moved by split operation for the scan
* that has started when split was in progress. Also, skip the
* tuples that are marked as dead.
*/
*bufP = so->hashso_curbuf = InvalidBuffer;
ItemPointerSetInvalid(current);
_hash_dropscanbuf(rel, so);
return false;
if ((so->hashso_buc_populated && !so->hashso_buc_split &&
(itup->t_info & INDEX_MOVED_BY_SPLIT_MASK)) ||
(scan->ignore_killed_tuples &&
(ItemIdIsDead(PageGetItemId(page, offnum)))))
{
offnum = OffsetNumberPrev(offnum); /* move back */
continue;
}
if (so->hashso_sk_hash == _hash_get_indextuple_hashkey(itup) &&
_hash_checkqual(scan, itup))
{
itemIndex--;
/* tuple is qualified, so remember it */
_hash_saveitem(so, itemIndex, offnum, itup);
}
else
{
/*
* No more matching tuples exist in this page. so, exit while
* loop.
*/
break;
}
offnum = OffsetNumberPrev(offnum);
}
/* check the tuple quals, loop around if not met */
} while (!_hash_checkqual(scan, itup));
Assert(itemIndex >= 0);
return itemIndex;
}
}
/* Save an index item into so->currPos.items[itemIndex] */
static inline void
_hash_saveitem(HashScanOpaque so, int itemIndex,
OffsetNumber offnum, IndexTuple itup)
{
HashScanPosItem *currItem = &so->currPos.items[itemIndex];
/* if we made it to here, we've found a valid tuple */
blkno = BufferGetBlockNumber(buf);
*bufP = so->hashso_curbuf = buf;
ItemPointerSet(current, blkno, offnum);
return true;
currItem->heapTid = itup->t_tid;
currItem->indexOffset = offnum;
}
......@@ -522,13 +522,30 @@ _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket,
* current page and killed tuples thereon (generally, this should only be
* called if so->numKilled > 0).
*
* The caller does not have a lock on the page and may or may not have the
* page pinned in a buffer. Note that read-lock is sufficient for setting
* LP_DEAD status (which is only a hint).
*
* The caller must have pin on bucket buffer, but may or may not have pin
* on overflow buffer, as indicated by HashScanPosIsPinned(so->currPos).
*
* We match items by heap TID before assuming they are the right ones to
* delete.
*
* Note that we keep the pin on the bucket page throughout the scan. Hence,
* there is no chance of VACUUM deleting any items from that page. However,
* having pin on the overflow page doesn't guarantee that vacuum won't delete
* any items.
*
* See _bt_killitems() for more details.
*/
void
_hash_kill_items(IndexScanDesc scan)
{
HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation;
BlockNumber blkno;
Buffer buf;
Page page;
HashPageOpaque opaque;
OffsetNumber offnum,
......@@ -536,9 +553,11 @@ _hash_kill_items(IndexScanDesc scan)
int numKilled = so->numKilled;
int i;
bool killedsomething = false;
bool havePin = false;
Assert(so->numKilled > 0);
Assert(so->killedItems != NULL);
Assert(HashScanPosIsValid(so->currPos));
/*
* Always reset the scan state, so we don't look for same items on other
......@@ -546,20 +565,54 @@ _hash_kill_items(IndexScanDesc scan)
*/
so->numKilled = 0;
page = BufferGetPage(so->hashso_curbuf);
blkno = so->currPos.currPage;
if (HashScanPosIsPinned(so->currPos))
{
/*
* We already have pin on this buffer, so, all we need to do is
* acquire lock on it.
*/
havePin = true;
buf = so->currPos.buf;
LockBuffer(buf, BUFFER_LOCK_SHARE);
}
else
buf = _hash_getbuf(rel, blkno, HASH_READ, LH_OVERFLOW_PAGE);
/*
* If page LSN differs it means that the page was modified since the last
* read. killedItems could be not valid so applying LP_DEAD hints is not
* safe.
*/
page = BufferGetPage(buf);
if (PageGetLSN(page) != so->currPos.lsn)
{
if (havePin)
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
else
_hash_relbuf(rel, buf);
return;
}
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
for (i = 0; i < numKilled; i++)
{
offnum = so->killedItems[i].indexOffset;
int itemIndex = so->killedItems[i];
HashScanPosItem *currItem = &so->currPos.items[itemIndex];
offnum = currItem->indexOffset;
Assert(itemIndex >= so->currPos.firstItem &&
itemIndex <= so->currPos.lastItem);
while (offnum <= maxoff)
{
ItemId iid = PageGetItemId(page, offnum);
IndexTuple ituple = (IndexTuple) PageGetItem(page, iid);
if (ItemPointerEquals(&ituple->t_tid, &so->killedItems[i].heapTid))
if (ItemPointerEquals(&ituple->t_tid, &currItem->heapTid))
{
/* found the item */
ItemIdMarkDead(iid);
......@@ -578,6 +631,12 @@ _hash_kill_items(IndexScanDesc scan)
if (killedsomething)
{
opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
MarkBufferDirtyHint(so->hashso_curbuf, true);
MarkBufferDirtyHint(buf, true);
}
if (so->hashso_bucket_buf == so->currPos.buf ||
havePin)
LockBuffer(so->currPos.buf, BUFFER_LOCK_UNLOCK);
else
_hash_relbuf(rel, buf);
}
......@@ -114,6 +114,53 @@ typedef struct HashScanPosItem /* what we remember about each match */
OffsetNumber indexOffset; /* index item's location within page */
} HashScanPosItem;
typedef struct HashScanPosData
{
Buffer buf; /* if valid, the buffer is pinned */
XLogRecPtr lsn; /* pos in the WAL stream when page was read */
BlockNumber currPage; /* current hash index page */
BlockNumber nextPage; /* next overflow page */
BlockNumber prevPage; /* prev overflow or bucket page */
/*
* The items array is always ordered in index order (ie, increasing
* indexoffset). When scanning backwards it is convenient to fill the
* array back-to-front, so we start at the last slot and fill downwards.
* Hence we need both a first-valid-entry and a last-valid-entry counter.
* itemIndex is a cursor showing which entry was last returned to caller.
*/
int firstItem; /* first valid index in items[] */
int lastItem; /* last valid index in items[] */
int itemIndex; /* current index in items[] */
HashScanPosItem items[MaxIndexTuplesPerPage]; /* MUST BE LAST */
} HashScanPosData;
#define HashScanPosIsPinned(scanpos) \
( \
AssertMacro(BlockNumberIsValid((scanpos).currPage) || \
!BufferIsValid((scanpos).buf)), \
BufferIsValid((scanpos).buf) \
)
#define HashScanPosIsValid(scanpos) \
( \
AssertMacro(BlockNumberIsValid((scanpos).currPage) || \
!BufferIsValid((scanpos).buf)), \
BlockNumberIsValid((scanpos).currPage) \
)
#define HashScanPosInvalidate(scanpos) \
do { \
(scanpos).buf = InvalidBuffer; \
(scanpos).lsn = InvalidXLogRecPtr; \
(scanpos).currPage = InvalidBlockNumber; \
(scanpos).nextPage = InvalidBlockNumber; \
(scanpos).prevPage = InvalidBlockNumber; \
(scanpos).firstItem = 0; \
(scanpos).lastItem = 0; \
(scanpos).itemIndex = 0; \
} while (0);
/*
* HashScanOpaqueData is private state for a hash index scan.
......@@ -123,14 +170,6 @@ typedef struct HashScanOpaqueData
/* Hash value of the scan key, ie, the hash key we seek */
uint32 hashso_sk_hash;
/*
* We also want to remember which buffer we're currently examining in the
* scan. We keep the buffer pinned (but not locked) across hashgettuple
* calls, in order to avoid doing a ReadBuffer() for every tuple in the
* index.
*/
Buffer hashso_curbuf;
/* remember the buffer associated with primary bucket */
Buffer hashso_bucket_buf;
......@@ -141,12 +180,6 @@ typedef struct HashScanOpaqueData
*/
Buffer hashso_split_bucket_buf;
/* Current position of the scan, as an index TID */
ItemPointerData hashso_curpos;
/* Current position of the scan, as a heap TID */
ItemPointerData hashso_heappos;
/* Whether scan starts on bucket being populated due to split */
bool hashso_buc_populated;
......@@ -156,8 +189,14 @@ typedef struct HashScanOpaqueData
*/
bool hashso_buc_split;
/* info about killed items if any (killedItems is NULL if never used) */
HashScanPosItem *killedItems; /* tids and offset numbers of killed items */
int *killedItems; /* currPos.items indexes of killed items */
int numKilled; /* number of currently stored items */
/*
* Identify all the matching items on a page and save them in
* HashScanPosData
*/
HashScanPosData currPos; /* current position data */
} HashScanOpaqueData;
typedef HashScanOpaqueData *HashScanOpaque;
......@@ -401,7 +440,6 @@ extern void _hash_finish_split(Relation rel, Buffer metabuf, Buffer obuf,
/* hashsearch.c */
extern bool _hash_next(IndexScanDesc scan, ScanDirection dir);
extern bool _hash_first(IndexScanDesc scan, ScanDirection dir);
extern bool _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir);
/* hashsort.c */
typedef struct HSpool HSpool; /* opaque struct in hashsort.c */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment