Commit d70610c4 authored by Tom Lane's avatar Tom Lane

Several fixes for hash indexes that involve changing the on-disk index

layout; therefore, this change forces REINDEX of hash indexes (though
not a full initdb).  Widen hashm_ntuples to double so that hash space
management doesn't get confused by more than 4G entries; enlarge the
allowed number of free-space-bitmap pages; replace the useless bshift
field with a useful bmshift field; eliminate 4 bytes of wasted space
in the per-page special area.
parent 8b2450c8
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.66 2003/09/02 02:18:38 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hash.c,v 1.67 2003/09/02 18:13:29 tgl Exp $
*
* NOTES
* This file contains only the public interface routines.
......@@ -449,9 +449,7 @@ hashbulkdelete(PG_FUNCTION_ARGS)
BlockNumber num_pages;
double tuples_removed;
double num_index_tuples;
uint32 deleted_tuples;
uint32 tuples_remaining;
uint32 orig_ntuples;
double orig_ntuples;
Bucket orig_maxbucket;
Bucket cur_maxbucket;
Bucket cur_bucket;
......@@ -459,15 +457,8 @@ hashbulkdelete(PG_FUNCTION_ARGS)
HashMetaPage metap;
HashMetaPageData local_metapage;
/*
* keep track of counts in both float form (to return) and integer form
* (to update hashm_ntuples). It'd be better to make hashm_ntuples a
* double, but that will have to wait for an initdb.
*/
tuples_removed = 0;
num_index_tuples = 0;
deleted_tuples = 0;
tuples_remaining = 0;
/*
* Read the metapage to fetch original bucket and tuple counts. Also,
......@@ -479,7 +470,7 @@ hashbulkdelete(PG_FUNCTION_ARGS)
*/
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
orig_maxbucket = metap->hashm_maxbucket;
orig_ntuples = metap->hashm_ntuples;
memcpy(&local_metapage, metap, sizeof(local_metapage));
......@@ -514,7 +505,7 @@ loop_top:
buf = _hash_getbuf(rel, blkno, HASH_WRITE);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == cur_bucket);
......@@ -546,14 +537,12 @@ loop_top:
maxoffno = OffsetNumberPrev(maxoffno);
tuples_removed += 1;
deleted_tuples += 1;
}
else
{
offno = OffsetNumberNext(offno);
num_index_tuples += 1;
tuples_remaining += 1;
}
}
......@@ -584,7 +573,7 @@ loop_top:
/* Write-lock metapage and check for split since we started */
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
if (cur_maxbucket != metap->hashm_maxbucket)
{
......@@ -604,7 +593,7 @@ loop_top:
* No one has split or inserted anything since start of scan,
* so believe our count as gospel.
*/
metap->hashm_ntuples = tuples_remaining;
metap->hashm_ntuples = num_index_tuples;
}
else
{
......@@ -613,8 +602,8 @@ loop_top:
* double-scanned tuples in split buckets. Proceed by
* dead-reckoning.
*/
if (metap->hashm_ntuples > deleted_tuples)
metap->hashm_ntuples -= deleted_tuples;
if (metap->hashm_ntuples > tuples_removed)
metap->hashm_ntuples -= tuples_removed;
else
metap->hashm_ntuples = 0;
num_index_tuples = metap->hashm_ntuples;
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.28 2003/09/01 20:26:34 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashinsert.c,v 1.29 2003/09/02 18:13:30 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -43,7 +43,7 @@ _hash_doinsert(Relation rel, HashItem hitem)
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/* we need a scan key to do our search, so build one */
itup = &(hitem->hash_itup);
......@@ -57,7 +57,7 @@ _hash_doinsert(Relation rel, HashItem hitem)
*/
_hash_search(rel, natts, itup_scankey, &buf, metap);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE);
/*
* trade in our read lock for a write lock so that we can do the
......@@ -120,10 +120,10 @@ _hash_insertonpg(Relation rel,
Bucket bucket;
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
bucket = pageopaque->hasho_bucket;
......@@ -166,7 +166,7 @@ _hash_insertonpg(Relation rel,
elog(ERROR, "hash item too large");
}
}
_hash_checkpage(page, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, page, LH_OVERFLOW_PAGE);
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(pageopaque->hasho_bucket == bucket);
}
......@@ -195,7 +195,7 @@ _hash_insertonpg(Relation rel,
if (do_expand ||
(metap->hashm_ntuples / (metap->hashm_maxbucket + 1))
> metap->hashm_ffactor)
> (double) metap->hashm_ffactor)
_hash_expandtable(rel, metabuf);
_hash_relbuf(rel, metabuf, HASH_READ);
return res;
......@@ -220,7 +220,7 @@ _hash_pgaddtup(Relation rel,
Page page;
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
itup_off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
if (PageAddItem(page, (Item) hitem, itemsize, itup_off, LP_USED)
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.39 2003/09/02 02:18:38 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashovfl.c,v 1.40 2003/09/02 18:13:30 tgl Exp $
*
* NOTES
* Overflow pages look like ordinary relation pages.
......@@ -97,12 +97,12 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
/* this had better be the last page in a bucket chain */
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(!BlockNumberIsValid(pageopaque->hasho_nextblkno));
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/* allocate an empty overflow page */
ovflblkno = _hash_getovflpage(rel, metabuf);
......@@ -114,9 +114,9 @@ _hash_addovflpage(Relation rel, Buffer metabuf, Buffer buf)
ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
ovflopaque->hasho_prevblkno = BufferGetBlockNumber(buf);
ovflopaque->hasho_nextblkno = InvalidBlockNumber;
ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
ovflopaque->hasho_oaddr = 0;
ovflopaque->hasho_bucket = pageopaque->hasho_bucket;
ovflopaque->hasho_flag = LH_OVERFLOW_PAGE;
ovflopaque->hasho_filler = HASHO_FILL;
_hash_wrtnorelbuf(ovflbuf);
/* logically chain overflow page to previous page */
......@@ -174,7 +174,7 @@ _hash_getovflpage(Relation rel, Buffer metabuf)
mapblkno = metap->hashm_mapp[i];
mapbuf = _hash_getbuf(rel, mapblkno, HASH_WRITE);
mappage = BufferGetPage(mapbuf);
_hash_checkpage(mappage, LH_BITMAP_PAGE);
_hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
freep = HashPageGetBitmap(mappage);
if (i != first_page)
......@@ -310,11 +310,11 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
ovflblkno = BufferGetBlockNumber(ovflbuf);
ovflpage = BufferGetPage(ovflbuf);
_hash_checkpage(ovflpage, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, ovflpage, LH_OVERFLOW_PAGE);
ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
nextblkno = ovflopaque->hasho_nextblkno;
prevblkno = ovflopaque->hasho_prevblkno;
......@@ -337,7 +337,7 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
Page prevpage = BufferGetPage(prevbuf);
HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);
_hash_checkpage(prevpage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, prevpage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
Assert(prevopaque->hasho_bucket == bucket);
prevopaque->hasho_nextblkno = nextblkno;
_hash_wrtbuf(rel, prevbuf);
......@@ -348,7 +348,7 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
Page nextpage = BufferGetPage(nextbuf);
HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);
_hash_checkpage(nextpage, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, nextpage, LH_OVERFLOW_PAGE);
Assert(nextopaque->hasho_bucket == bucket);
nextopaque->hasho_prevblkno = prevblkno;
_hash_wrtbuf(rel, nextbuf);
......@@ -368,7 +368,7 @@ _hash_freeovflpage(Relation rel, Buffer ovflbuf)
mapbuf = _hash_getbuf(rel, blkno, HASH_WRITE);
mappage = BufferGetPage(mapbuf);
_hash_checkpage(mappage, LH_BITMAP_PAGE);
_hash_checkpage(rel, mappage, LH_BITMAP_PAGE);
freep = HashPageGetBitmap(mappage);
CLRBIT(freep, bitmapbit);
_hash_wrtbuf(rel, mapbuf);
......@@ -406,11 +406,11 @@ _hash_initbitmap(Relation rel, HashMetaPage metap, BlockNumber blkno)
pg = BufferGetPage(buf);
_hash_pageinit(pg, BufferGetPageSize(buf));
op = (HashPageOpaque) PageGetSpecialPointer(pg);
op->hasho_oaddr = 0;
op->hasho_prevblkno = InvalidBlockNumber;
op->hasho_nextblkno = InvalidBlockNumber;
op->hasho_flag = LH_BITMAP_PAGE;
op->hasho_bucket = -1;
op->hasho_flag = LH_BITMAP_PAGE;
op->hasho_filler = HASHO_FILL;
/* set all of the bits to 1 */
freep = HashPageGetBitmap(pg);
......@@ -471,7 +471,7 @@ _hash_squeezebucket(Relation rel,
wblkno = bucket_blkno;
wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
wpage = BufferGetPage(wbuf);
_hash_checkpage(wpage, LH_BUCKET_PAGE);
_hash_checkpage(rel, wpage, LH_BUCKET_PAGE);
wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
/*
......@@ -495,7 +495,7 @@ _hash_squeezebucket(Relation rel,
_hash_relbuf(rel, rbuf, HASH_WRITE);
rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
rpage = BufferGetPage(rbuf);
_hash_checkpage(rpage, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE);
Assert(!PageIsEmpty(rpage));
ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
Assert(ropaque->hasho_bucket == bucket);
......@@ -531,7 +531,7 @@ _hash_squeezebucket(Relation rel,
wbuf = _hash_getbuf(rel, wblkno, HASH_WRITE);
wpage = BufferGetPage(wbuf);
_hash_checkpage(wpage, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, wpage, LH_OVERFLOW_PAGE);
Assert(!PageIsEmpty(wpage));
wopaque = (HashPageOpaque) PageGetSpecialPointer(wpage);
Assert(wopaque->hasho_bucket == bucket);
......@@ -576,7 +576,7 @@ _hash_squeezebucket(Relation rel,
rbuf = _hash_getbuf(rel, rblkno, HASH_WRITE);
rpage = BufferGetPage(rbuf);
_hash_checkpage(rpage, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, rpage, LH_OVERFLOW_PAGE);
Assert(!PageIsEmpty(rpage));
ropaque = (HashPageOpaque) PageGetSpecialPointer(rpage);
Assert(ropaque->hasho_bucket == bucket);
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.40 2003/09/02 02:18:38 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashpage.c,v 1.41 2003/09/02 18:13:31 tgl Exp $
*
* NOTES
* Postgres hash pages look like ordinary relation pages. The opaque
......@@ -98,11 +98,11 @@ _hash_metapinit(Relation rel)
_hash_pageinit(pg, BufferGetPageSize(metabuf));
pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
pageopaque->hasho_oaddr = 0;
pageopaque->hasho_prevblkno = InvalidBlockNumber;
pageopaque->hasho_nextblkno = InvalidBlockNumber;
pageopaque->hasho_flag = LH_META_PAGE;
pageopaque->hasho_bucket = -1;
pageopaque->hasho_flag = LH_META_PAGE;
pageopaque->hasho_filler = HASHO_FILL;
metap = (HashMetaPage) pg;
......@@ -112,14 +112,17 @@ _hash_metapinit(Relation rel)
metap->hashm_nmaps = 0;
metap->hashm_ffactor = DEFAULT_FFACTOR;
metap->hashm_bsize = BufferGetPageSize(metabuf);
metap->hashm_bshift = _hash_log2(metap->hashm_bsize);
/* page size must be power of 2 */
Assert(metap->hashm_bsize == (1 << metap->hashm_bshift));
/* bitmap size is half of page size, to keep it also power of 2 */
metap->hashm_bmsize = (metap->hashm_bsize >> 1);
Assert(metap->hashm_bsize >= metap->hashm_bmsize +
MAXALIGN(sizeof(PageHeaderData)) +
MAXALIGN(sizeof(HashPageOpaqueData)));
/* find largest bitmap array size that will fit in page size */
for (i = _hash_log2(metap->hashm_bsize); i > 0; --i)
{
if ((1 << i) <= (metap->hashm_bsize -
(MAXALIGN(sizeof(PageHeaderData)) +
MAXALIGN(sizeof(HashPageOpaqueData)))))
break;
}
Assert(i > 0);
metap->hashm_bmsize = 1 << i;
metap->hashm_bmshift = i + BYTE_TO_BIT;
Assert((1 << BMPG_SHIFT(metap)) == (BMPG_MASK(metap) + 1));
metap->hashm_procid = index_getprocid(rel, 1, HASHPROC);
......@@ -147,11 +150,11 @@ _hash_metapinit(Relation rel)
pg = BufferGetPage(buf);
_hash_pageinit(pg, BufferGetPageSize(buf));
pageopaque = (HashPageOpaque) PageGetSpecialPointer(pg);
pageopaque->hasho_oaddr = 0;
pageopaque->hasho_prevblkno = InvalidBlockNumber;
pageopaque->hasho_nextblkno = InvalidBlockNumber;
pageopaque->hasho_flag = LH_BUCKET_PAGE;
pageopaque->hasho_bucket = i;
pageopaque->hasho_flag = LH_BUCKET_PAGE;
pageopaque->hasho_filler = HASHO_FILL;
_hash_wrtbuf(rel, buf);
}
......@@ -343,49 +346,6 @@ _hash_unsetpagelock(Relation rel,
}
}
/*
* Delete a hash index item.
*
* It is safe to delete an item after acquiring a regular WRITE lock on
* the page, because no other backend can hold a READ lock on the page,
* and that means no other backend currently has an indexscan stopped on
* any item of the item being deleted. Our own backend might have such
* an indexscan (in fact *will*, since that's how VACUUM found the item
* in the first place), but _hash_adjscans will fix the scan position.
*/
void
_hash_pagedel(Relation rel, ItemPointer tid)
{
Buffer buf;
Buffer metabuf;
Page page;
BlockNumber blkno;
OffsetNumber offno;
HashMetaPage metap;
HashPageOpaque opaque;
blkno = ItemPointerGetBlockNumber(tid);
offno = ItemPointerGetOffsetNumber(tid);
buf = _hash_getbuf(rel, blkno, HASH_WRITE);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
PageIndexTupleDelete(page, offno);
if (PageIsEmpty(page) && (opaque->hasho_flag & LH_OVERFLOW_PAGE))
_hash_freeovflpage(rel, buf);
else
_hash_wrtbuf(rel, buf);
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_WRITE);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
metap->hashm_ntuples--;
_hash_wrtbuf(rel, metabuf);
}
/*
* Expand the hash table by creating one new bucket.
*/
......@@ -398,7 +358,7 @@ _hash_expandtable(Relation rel, Buffer metabuf)
uint32 spare_ndx;
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
_hash_chgbufaccess(rel, metabuf, HASH_READ, HASH_WRITE);
......@@ -474,7 +434,7 @@ _hash_splitbucket(Relation rel,
TupleDesc itupdesc = RelationGetDescr(rel);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/* get the buffers & pages */
start_oblkno = BUCKET_TO_BLKNO(metap, obucket);
......@@ -491,9 +451,9 @@ _hash_splitbucket(Relation rel,
nopaque = (HashPageOpaque) PageGetSpecialPointer(npage);
nopaque->hasho_prevblkno = InvalidBlockNumber;
nopaque->hasho_nextblkno = InvalidBlockNumber;
nopaque->hasho_flag = LH_BUCKET_PAGE;
nopaque->hasho_oaddr = 0;
nopaque->hasho_bucket = nbucket;
nopaque->hasho_flag = LH_BUCKET_PAGE;
nopaque->hasho_filler = HASHO_FILL;
_hash_wrtnorelbuf(nbuf);
/*
......@@ -503,7 +463,7 @@ _hash_splitbucket(Relation rel,
* XXX we should only need this once, if we are careful to preserve the
* invariant that overflow pages are never empty.
*/
_hash_checkpage(opage, LH_BUCKET_PAGE);
_hash_checkpage(rel, opage, LH_BUCKET_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
if (PageIsEmpty(opage))
{
......@@ -521,7 +481,7 @@ _hash_splitbucket(Relation rel,
}
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
_hash_checkpage(opage, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
if (PageIsEmpty(opage))
elog(ERROR, "empty hash overflow page %u", oblkno);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
......@@ -556,7 +516,7 @@ _hash_splitbucket(Relation rel,
_hash_wrtbuf(rel, obuf);
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
_hash_checkpage(opage, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
/* we're guaranteed that an ovfl page has at least 1 tuple */
if (PageIsEmpty(opage))
......@@ -606,7 +566,7 @@ _hash_splitbucket(Relation rel,
_hash_wrtbuf(rel, nbuf);
nbuf = ovflbuf;
npage = BufferGetPage(nbuf);
_hash_checkpage(npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, npage, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
}
noffnum = OffsetNumberNext(PageGetMaxOffsetNumber(npage));
......@@ -653,7 +613,7 @@ _hash_splitbucket(Relation rel,
*/
obuf = _hash_getbuf(rel, oblkno, HASH_WRITE);
opage = BufferGetPage(obuf);
_hash_checkpage(opage, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, opage, LH_OVERFLOW_PAGE);
oopaque = (HashPageOpaque) PageGetSpecialPointer(opage);
if (PageIsEmpty(opage))
elog(ERROR, "empty hash overflow page %u", oblkno);
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.32 2003/09/02 02:18:38 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashsearch.c,v 1.33 2003/09/02 18:13:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -34,17 +34,21 @@ _hash_search(Relation rel,
BlockNumber blkno;
Bucket bucket;
if (scankey == NULL)
if (scankey == NULL ||
(scankey[0].sk_flags & SK_ISNULL))
{
/*
* If the scankey is empty, all tuples will satisfy the
* scan so we start the scan at the first bucket (bucket 0).
*
* If the scankey is NULL, no tuples will satisfy the search;
* this should have been checked already, but arbitrarily return
* bucket zero.
*/
bucket = 0;
}
else
{
Assert(!(scankey[0].sk_flags & SK_ISNULL));
bucket = _hash_call(rel, metap, scankey[0].sk_argument);
}
......@@ -96,7 +100,7 @@ _hash_next(IndexScanDesc scan, ScanDirection dir)
current = &(scan->currentItemData);
offnum = ItemPointerGetOffsetNumber(current);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
hitem = (HashItem) PageGetItem(page, PageGetItemId(page, offnum));
itup = &hitem->hash_itup;
scan->xs_ctup.t_self = itup->t_tid;
......@@ -117,7 +121,7 @@ _hash_readnext(Relation rel,
{
*bufp = _hash_getbuf(rel, blkno, HASH_READ);
*pagep = BufferGetPage(*bufp);
_hash_checkpage(*pagep, LH_OVERFLOW_PAGE);
_hash_checkpage(rel, *pagep, LH_OVERFLOW_PAGE);
*opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
Assert(!PageIsEmpty(*pagep));
}
......@@ -136,7 +140,7 @@ _hash_readprev(Relation rel,
{
*bufp = _hash_getbuf(rel, blkno, HASH_READ);
*pagep = BufferGetPage(*bufp);
_hash_checkpage(*pagep, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, *pagep, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
*opaquep = (HashPageOpaque) PageGetSpecialPointer(*pagep);
if (PageIsEmpty(*pagep))
{
......@@ -177,7 +181,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_READ);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
/*
* XXX -- The attribute number stored in the scan key is the attno in
......@@ -188,7 +192,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
/* find the correct bucket page and load it into buf */
_hash_search(rel, 1, scan->keyData, &buf, metap);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
/*
......@@ -235,7 +239,7 @@ _hash_first(IndexScanDesc scan, ScanDirection dir)
current = &(scan->currentItemData);
offnum = ItemPointerGetOffsetNumber(current);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
hitem = (HashItem) PageGetItem(page, PageGetItemId(page, offnum));
itup = &hitem->hash_itup;
scan->xs_ctup.t_self = itup->t_tid;
......@@ -279,11 +283,11 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
allbuckets = (scan->numberOfKeys < 1);
metap = (HashMetaPage) BufferGetPage(metabuf);
_hash_checkpage((Page) metap, LH_META_PAGE);
_hash_checkpage(rel, (Page) metap, LH_META_PAGE);
buf = *bufP;
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
/*
......@@ -336,7 +340,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
blkno = BUCKET_TO_BLKNO(metap, bucket);
buf = _hash_getbuf(rel, blkno, HASH_READ);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == bucket);
while (PageIsEmpty(page) &&
......@@ -386,7 +390,7 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir, Buffer metabuf)
blkno = BUCKET_TO_BLKNO(metap, bucket);
buf = _hash_getbuf(rel, blkno, HASH_READ);
page = BufferGetPage(buf);
_hash_checkpage(page, LH_BUCKET_PAGE);
_hash_checkpage(rel, page, LH_BUCKET_PAGE);
opaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->hasho_bucket == bucket);
while (BlockNumberIsValid(opaque->hasho_nextblkno))
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.34 2003/09/02 02:18:38 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/hash/hashutil.c,v 1.35 2003/09/02 18:13:31 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -143,10 +143,33 @@ _hash_log2(uint32 num)
* _hash_checkpage -- sanity checks on the format of all hash pages
*/
void
_hash_checkpage(Page page, int flags)
_hash_checkpage(Relation rel, Page page, int flags)
{
#ifdef USE_ASSERT_CHECKING
Assert(page);
/*
* When checking the metapage, always verify magic number and version.
*/
if (flags == LH_META_PAGE)
{
HashMetaPage metap = (HashMetaPage) page;
if (metap->hashm_magic != HASH_MAGIC)
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("index \"%s\" is not a hash index",
RelationGetRelationName(rel))));
if (metap->hashm_version != HASH_VERSION)
ereport(ERROR,
(errcode(ERRCODE_INDEX_CORRUPTED),
errmsg("index \"%s\" has wrong hash version, please REINDEX it",
RelationGetRelationName(rel))));
}
/*
* These other checks are for debugging purposes only.
*/
#ifdef USE_ASSERT_CHECKING
Assert(((PageHeader) (page))->pd_lower >= SizeOfPageHeaderData);
Assert(((PageHeader) (page))->pd_upper <=
(BLCKSZ - MAXALIGN(sizeof(HashPageOpaqueData))));
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: hash.h,v 1.51 2003/09/02 02:18:38 tgl Exp $
* $Id: hash.h,v 1.52 2003/09/02 18:13:32 tgl Exp $
*
* NOTES
* modeled after Margo Seltzer's hash implementation for unix.
......@@ -51,15 +51,24 @@ typedef uint32 Bucket;
typedef struct HashPageOpaqueData
{
bits16 hasho_flag; /* page type code, see above */
Bucket hasho_bucket; /* bucket number this pg belongs to */
bits16 hasho_oaddr; /* no longer used; delete someday */
BlockNumber hasho_nextblkno; /* next ovfl blkno */
BlockNumber hasho_prevblkno; /* previous ovfl (or bucket) blkno */
BlockNumber hasho_nextblkno; /* next ovfl blkno */
Bucket hasho_bucket; /* bucket number this pg belongs to */
uint16 hasho_flag; /* page type code, see above */
uint16 hasho_filler; /* available for future use */
/*
* We presently set hasho_filler to HASHO_FILL (0x1234); this is for
* the convenience of pg_filedump, which otherwise would have a hard
* time telling HashPageOpaqueData from BTPageOpaqueData. If we ever
* need that space for some other purpose, pg_filedump will have to
* find another way.
*/
} HashPageOpaqueData;
typedef HashPageOpaqueData *HashPageOpaque;
#define HASHO_FILL 0x1234
/*
* ScanOpaqueData is used to remember which buffers we're currently
* examining in the scan. We keep these buffers locked and pinned and
......@@ -81,7 +90,7 @@ typedef HashScanOpaqueData *HashScanOpaque;
#define HASH_METAPAGE 0 /* metapage is always block 0 */
#define HASH_MAGIC 0x6440640
#define HASH_VERSION 0
#define HASH_VERSION 1 /* new for Pg 7.4 */
/*
* Spares[] holds the number of overflow pages currently allocated at or
......@@ -99,25 +108,24 @@ typedef HashScanOpaqueData *HashScanOpaque;
*
* The limitation on the size of spares[] comes from the fact that there's
* no point in having more than 2^32 buckets with only uint32 hashcodes.
* There is no particularly good reason for bitmaps[] to be the same size,
* but we're stuck with that until we want to force an initdb. (With 8K
* block size, 32 bitmaps limit us to 8 Gb of overflow space...)
* There is no particular upper limit on the size of mapp[], other than
* needing to fit into the metapage. (With 8K block size, 128 bitmaps
* limit us to 64 Gb of overflow space...)
*/
#define HASH_MAX_SPLITPOINTS 32
#define HASH_MAX_BITMAPS 32
#define HASH_MAX_BITMAPS 128
typedef struct HashMetaPageData
{
PageHeaderData hashm_phdr; /* pad for page header (do not use) */
uint32 hashm_magic; /* magic no. for hash tables */
uint32 hashm_version; /* version ID */
uint32 hashm_ntuples; /* number of tuples stored in the table */
double hashm_ntuples; /* number of tuples stored in the table */
uint16 hashm_ffactor; /* target fill factor (tuples/bucket) */
uint16 hashm_bsize; /* index page size (bytes) - must be a power
* of 2 */
uint16 hashm_bshift; /* log2(bsize) */
uint16 hashm_bmsize; /* bitmap array size (bytes) - must be
* exactly half of hashm_bsize */
uint16 hashm_bsize; /* index page size (bytes) */
uint16 hashm_bmsize; /* bitmap array size (bytes) - must be a
* power of 2 */
uint16 hashm_bmshift; /* log2(bitmap array size in BITS) */
uint32 hashm_maxbucket; /* ID of maximum bucket in use */
uint32 hashm_highmask; /* mask to modulo into entire table */
uint32 hashm_lowmask; /* mask to modulo into lower half of table */
......@@ -125,10 +133,10 @@ typedef struct HashMetaPageData
* allocated */
uint32 hashm_firstfree; /* lowest-number free ovflpage (bit#) */
uint32 hashm_nmaps; /* number of bitmap pages */
RegProcedure hashm_procid; /* hash procedure id from pg_proc */
uint32 hashm_spares[HASH_MAX_SPLITPOINTS]; /* spare pages before
* each splitpoint */
BlockNumber hashm_mapp[HASH_MAX_BITMAPS]; /* blknos of ovfl bitmaps */
RegProcedure hashm_procid; /* hash procedure id from pg_proc */
} HashMetaPageData;
typedef HashMetaPageData *HashMetaPage;
......@@ -151,16 +159,12 @@ typedef HashItemData *HashItem;
* Bitmap pages do not contain tuples. They do contain the standard
* page headers and trailers; however, everything in between is a
* giant bit array. The number of bits that fit on a page obviously
* depends on the page size and the header/trailer overhead. In the
* present implementation, we use exactly half of a page for bitmap,
* so that we have a power-of-2 bits per page.
*
* The fact that the metapage has separate bsize and bmsize fields,
* but only one bshift field, is a design error that ought to be fixed.
* depends on the page size and the header/trailer overhead. We require
* the number of bits per page to be a power of 2.
*/
#define BMPGSZ_BYTE(metap) ((metap)->hashm_bmsize)
#define BMPGSZ_BIT(metap) ((metap)->hashm_bmsize << BYTE_TO_BIT)
#define BMPG_SHIFT(metap) ((metap)->hashm_bshift - 1 + BYTE_TO_BIT)
#define BMPG_SHIFT(metap) ((metap)->hashm_bmshift)
#define BMPG_MASK(metap) (BMPGSZ_BIT(metap) - 1)
#define HashPageGetBitmap(pg) \
((uint32 *) (((char *) (pg)) + MAXALIGN(sizeof(PageHeaderData))))
......@@ -254,7 +258,6 @@ extern void _hash_wrtnorelbuf(Buffer buf);
extern void _hash_chgbufaccess(Relation rel, Buffer buf, int from_access,
int to_access);
extern void _hash_pageinit(Page page, Size size);
extern void _hash_pagedel(Relation rel, ItemPointer tid);
extern void _hash_expandtable(Relation rel, Buffer metabuf);
/* hashscan.c */
......@@ -278,7 +281,7 @@ extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
extern HashItem _hash_formitem(IndexTuple itup);
extern Bucket _hash_call(Relation rel, HashMetaPage metap, Datum key);
extern uint32 _hash_log2(uint32 num);
extern void _hash_checkpage(Page page, int flags);
extern void _hash_checkpage(Relation rel, Page page, int flags);
/* hash.c */
extern void hash_redo(XLogRecPtr lsn, XLogRecord *record);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment