Commit b0f18cb7 authored by Robert Haas's avatar Robert Haas

hash: Refactor bucket squeeze code.

In preparation for adding write-ahead logging to hash indexes,
refactor _hash_freeovflpage and _hash_squeezebucket so that all
related page modifications happen in a single section of code.  The
previous coding assumed that it would be fine to move tuples one at a
time, and also that the various operations involved in freeing an
overflow page didn't necessarily all need to be done together, all
of which is true if you don't care about write-ahead logging.

Amit Kapila, with slight changes by me.
parent 817f2a58
......@@ -228,3 +228,44 @@ _hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup)
return itup_off;
}
/*
* _hash_pgaddmultitup() -- add a tuple vector to a particular page in the
* index.
*
* This routine has same requirements for locking and tuple ordering as
* _hash_pgaddtup().
*
* Returns the offset number array at which the tuples were inserted.
*/
void
_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
OffsetNumber *itup_offsets, uint16 nitups)
{
OffsetNumber itup_off;
Page page;
uint32 hashkey;
int i;
_hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
page = BufferGetPage(buf);
for (i = 0; i < nitups; i++)
{
Size itemsize;
itemsize = IndexTupleDSize(*itups[i]);
itemsize = MAXALIGN(itemsize);
/* Find where to insert the tuple (preserving page's hashkey ordering) */
hashkey = _hash_get_indextuple_hashkey(itups[i]);
itup_off = _hash_binsearch(page, hashkey);
itup_offsets[i] = itup_off;
if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
== InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(rel));
}
}
......@@ -391,6 +391,8 @@ _hash_firstfreebit(uint32 map)
* Remove this overflow page from its bucket's chain, and mark the page as
* free. On entry, ovflbuf is write-locked; it is released before exiting.
*
* Add the tuples (itups) to wbuf.
*
* Since this function is invoked in VACUUM, we provide an access strategy
* parameter that controls fetches of the bucket pages.
*
......@@ -403,13 +405,16 @@ _hash_firstfreebit(uint32 map)
* has a lock on same.
*/
BlockNumber
_hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
_hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
Size *tups_size, uint16 nitups,
BufferAccessStrategy bstrategy)
{
HashMetaPage metap;
Buffer metabuf;
Buffer mapbuf;
Buffer prevbuf = InvalidBuffer;
Buffer nextbuf = InvalidBuffer;
BlockNumber ovflblkno;
BlockNumber prevblkno;
BlockNumber blkno;
......@@ -434,15 +439,6 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
writeblkno = BufferGetBlockNumber(wbuf);
bucket = ovflopaque->hasho_bucket;
/*
* Zero the page for debugging's sake; then write and release it. (Note:
* if we failed to zero the page here, we'd have problems with the Assert
* in _hash_pageinit() when the page is reused.)
*/
MemSet(ovflpage, 0, BufferGetPageSize(ovflbuf));
MarkBufferDirty(ovflbuf);
_hash_relbuf(rel, ovflbuf);
/*
* Fix up the bucket chain. this is a doubly-linked list, so we must fix
* up the bucket chain members behind and ahead of the overflow page being
......@@ -451,9 +447,6 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
*/
if (BlockNumberIsValid(prevblkno))
{
Page prevpage;
HashPageOpaque prevopaque;
if (prevblkno == writeblkno)
prevbuf = wbuf;
else
......@@ -462,32 +455,13 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
HASH_WRITE,
LH_BUCKET_PAGE | LH_OVERFLOW_PAGE,
bstrategy);
prevpage = BufferGetPage(prevbuf);
prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
Assert(prevopaque->hasho_bucket == bucket);
prevopaque->hasho_nextblkno = nextblkno;
MarkBufferDirty(prevbuf);
if (prevblkno != writeblkno)
_hash_relbuf(rel, prevbuf);
}
if (BlockNumberIsValid(nextblkno))
{
Buffer nextbuf = _hash_getbuf_with_strategy(rel,
nextblkno,
HASH_WRITE,
LH_OVERFLOW_PAGE,
bstrategy);
Page nextpage = BufferGetPage(nextbuf);
HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
Assert(nextopaque->hasho_bucket == bucket);
nextopaque->hasho_prevblkno = prevblkno;
MarkBufferDirty(nextbuf);
_hash_relbuf(rel, nextbuf);
}
nextbuf = _hash_getbuf_with_strategy(rel,
nextblkno,
HASH_WRITE,
LH_OVERFLOW_PAGE,
bstrategy);
/* Note: bstrategy is intentionally not used for metapage and bitmap */
......@@ -508,24 +482,71 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
/* Release metapage lock while we access the bitmap page */
LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
/* Clear the bitmap bit to indicate that this overflow page is free */
/* read the bitmap page to clear the bitmap bit */
mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE, LH_BITMAP_PAGE);
mappage = BufferGetPage(mapbuf);
freep = HashPageGetBitmap(mappage);
Assert(ISSET(freep, bitmapbit));
CLRBIT(freep, bitmapbit);
MarkBufferDirty(mapbuf);
_hash_relbuf(rel, mapbuf);
/* Get write-lock on metapage to update firstfree */
LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);
/*
* we have to insert tuples on the "write" page, being careful to preserve
* hashkey ordering. (If we insert many tuples into the same "write" page
* it would be worth qsort'ing them).
*/
if (nitups > 0)
{
_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
MarkBufferDirty(wbuf);
}
/* Initialize the freed overflow page. */
_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));
MarkBufferDirty(ovflbuf);
if (BufferIsValid(prevbuf))
{
Page prevpage = BufferGetPage(prevbuf);
HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
Assert(prevopaque->hasho_bucket == bucket);
prevopaque->hasho_nextblkno = nextblkno;
MarkBufferDirty(prevbuf);
}
if (BufferIsValid(nextbuf))
{
Page nextpage = BufferGetPage(nextbuf);
HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
Assert(nextopaque->hasho_bucket == bucket);
nextopaque->hasho_prevblkno = prevblkno;
MarkBufferDirty(nextbuf);
}
/* Clear the bitmap bit to indicate that this overflow page is free */
CLRBIT(freep, bitmapbit);
MarkBufferDirty(mapbuf);
/* if this is now the first free page, update hashm_firstfree */
if (ovflbitno < metap->hashm_firstfree)
{
metap->hashm_firstfree = ovflbitno;
MarkBufferDirty(metabuf);
}
/* release previous bucket if it is not same as write bucket */
if (BufferIsValid(prevbuf) && prevblkno != writeblkno)
_hash_relbuf(rel, prevbuf);
if (BufferIsValid(ovflbuf))
_hash_relbuf(rel, ovflbuf);
if (BufferIsValid(nextbuf))
_hash_relbuf(rel, nextbuf);
_hash_relbuf(rel, mapbuf);
_hash_relbuf(rel, metabuf);
return nextblkno;
......@@ -640,7 +661,6 @@ _hash_squeezebucket(Relation rel,
Page rpage;
HashPageOpaque wopaque;
HashPageOpaque ropaque;
bool wbuf_dirty;
/*
* start squeezing into the primary bucket page.
......@@ -686,15 +706,21 @@ _hash_squeezebucket(Relation rel,
/*
* squeeze the tuples.
*/
wbuf_dirty = false;
for (;;)
{
OffsetNumber roffnum;
OffsetNumber maxroffnum;
OffsetNumber deletable[MaxOffsetNumber];
int ndeletable = 0;
IndexTuple itups[MaxIndexTuplesPerPage];
Size tups_size[MaxIndexTuplesPerPage];
OffsetNumber itup_offsets[MaxIndexTuplesPerPage];
uint16 ndeletable = 0;
uint16 nitups = 0;
Size all_tups_size = 0;
int i;
bool retain_pin = false;
readpage:
/* Scan each tuple in "read" page */
maxroffnum = PageGetMaxOffsetNumber(rpage);
for (roffnum = FirstOffsetNumber;
......@@ -715,11 +741,13 @@ _hash_squeezebucket(Relation rel,
/*
* Walk up the bucket chain, looking for a page big enough for
* this item. Exit if we reach the read page.
* this item and all other accumulated items. Exit if we reach
* the read page.
*/
while (PageGetFreeSpace(wpage) < itemsz)
while (PageGetFreeSpaceForMultipleTuples(wpage, nitups + 1) < (all_tups_size + itemsz))
{
Buffer next_wbuf = InvalidBuffer;
bool tups_moved = false;
Assert(!PageIsEmpty(wpage));
......@@ -737,12 +765,30 @@ _hash_squeezebucket(Relation rel,
LH_OVERFLOW_PAGE,
bstrategy);
if (nitups > 0)
{
Assert(nitups == ndeletable);
/*
* we have to insert tuples on the "write" page, being
* careful to preserve hashkey ordering. (If we insert
* many tuples into the same "write" page it would be
* worth qsort'ing them).
*/
_hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
MarkBufferDirty(wbuf);
/* Delete tuples we already moved off read page */
PageIndexMultiDelete(rpage, deletable, ndeletable);
MarkBufferDirty(rbuf);
tups_moved = true;
}
/*
* release the lock on previous page after acquiring the lock
* on next page
*/
if (wbuf_dirty)
MarkBufferDirty(wbuf);
if (retain_pin)
LockBuffer(wbuf, BUFFER_LOCK_UNLOCK);
else
......@@ -751,12 +797,6 @@ _hash_squeezebucket(Relation rel,
/* nothing more to do if we reached the read page */
if (rblkno == wblkno)
{
if (ndeletable > 0)
{
/* Delete tuples we already moved off read page */
PageIndexMultiDelete(rpage, deletable, ndeletable);
MarkBufferDirty(rbuf);
}
_hash_relbuf(rel, rbuf);
return;
}
......@@ -765,21 +805,34 @@ _hash_squeezebucket(Relation rel,
wpage = BufferGetPage(wbuf);
wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
Assert(wopaque->hasho_bucket == bucket);
wbuf_dirty = false;
retain_pin = false;
}
/*
* we have found room so insert on the "write" page, being careful
* to preserve hashkey ordering. (If we insert many tuples into
* the same "write" page it would be worth qsort'ing instead of
* doing repeated _hash_pgaddtup.)
*/
(void) _hash_pgaddtup(rel, wbuf, itemsz, itup);
wbuf_dirty = true;
/* be tidy */
for (i = 0; i < nitups; i++)
pfree(itups[i]);
nitups = 0;
all_tups_size = 0;
ndeletable = 0;
/*
* after moving the tuples, rpage would have been compacted,
* so we need to rescan it.
*/
if (tups_moved)
goto readpage;
}
/* remember tuple for deletion from "read" page */
deletable[ndeletable++] = roffnum;
/*
* we need a copy of index tuples as they can be freed as part of
* overflow page, however we need them to write a WAL record in
* _hash_freeovflpage.
*/
itups[nitups] = CopyIndexTuple(itup);
tups_size[nitups++] = itemsz;
all_tups_size += itemsz;
}
/*
......@@ -797,10 +850,12 @@ _hash_squeezebucket(Relation rel,
Assert(BlockNumberIsValid(rblkno));
/* free this overflow page (releases rbuf) */
_hash_freeovflpage(rel, rbuf, wbuf, bstrategy);
_hash_freeovflpage(rel, bucket_buf, rbuf, wbuf, itups, itup_offsets,
tups_size, nitups, bstrategy);
if (wbuf_dirty)
MarkBufferDirty(wbuf);
/* be tidy */
for (i = 0; i < nitups; i++)
pfree(itups[i]);
/* are we freeing the page adjacent to wbuf? */
if (rblkno == wblkno)
......
......@@ -470,7 +470,6 @@ _hash_metapinit(Relation rel, double num_tuples, ForkNumber forkNum)
void
_hash_pageinit(Page page, Size size)
{
Assert(PageIsNew(page));
PageInit(page, size, sizeof(HashPageOpaqueData));
}
......
......@@ -597,6 +597,33 @@ PageGetFreeSpace(Page page)
return (Size) space;
}
/*
* PageGetFreeSpaceForMultipleTuples
* Returns the size of the free (allocatable) space on a page,
* reduced by the space needed for multiple new line pointers.
*
* Note: this should usually only be used on index pages. Use
* PageGetHeapFreeSpace on heap pages.
*/
Size
PageGetFreeSpaceForMultipleTuples(Page page, int ntups)
{
int space;
/*
* Use signed arithmetic here so that we behave sensibly if pd_lower >
* pd_upper.
*/
space = (int) ((PageHeader) page)->pd_upper -
(int) ((PageHeader) page)->pd_lower;
if (space < (int) (ntups * sizeof(ItemIdData)))
return 0;
space -= ntups * sizeof(ItemIdData);
return (Size) space;
}
/*
* PageGetExactFreeSpace
* Returns the size of the free (allocatable) space on a page,
......
......@@ -303,11 +303,14 @@ extern Datum hash_uint32(uint32 k);
extern void _hash_doinsert(Relation rel, IndexTuple itup);
extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
Size itemsize, IndexTuple itup);
extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
OffsetNumber *itup_offsets, uint16 nitups);
/* hashovfl.c */
extern Buffer _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf, bool retain_pin);
extern BlockNumber _hash_freeovflpage(Relation rel, Buffer ovflbuf, Buffer wbuf,
BufferAccessStrategy bstrategy);
extern BlockNumber _hash_freeovflpage(Relation rel, Buffer bucketbuf, Buffer ovflbuf,
Buffer wbuf, IndexTuple *itups, OffsetNumber *itup_offsets,
Size *tups_size, uint16 nitups, BufferAccessStrategy bstrategy);
extern void _hash_initbitmap(Relation rel, HashMetaPage metap,
BlockNumber blkno, ForkNumber forkNum);
extern void _hash_squeezebucket(Relation rel,
......
......@@ -425,6 +425,7 @@ extern Page PageGetTempPageCopySpecial(Page page);
extern void PageRestoreTempPage(Page tempPage, Page oldPage);
extern void PageRepairFragmentation(Page page);
extern Size PageGetFreeSpace(Page page);
extern Size PageGetFreeSpaceForMultipleTuples(Page page, int ntups);
extern Size PageGetExactFreeSpace(Page page);
extern Size PageGetHeapFreeSpace(Page page);
extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment