Commit 6977b8b7 authored by Robert Haas

Port single-page btree vacuum logic to hash indexes.

This is advantageous for hash indexes for the same reasons it's good
for btrees: it accelerates space recycling, reducing bloat.

Ashutosh Sharma, reviewed by Amit Kapila and by me.  A bit of
additional hacking by me.

Discussion: http://postgr.es/m/CAE9k0PkRSyzx8dOnokEpUi2A-RFZK72WN0h9DEMv_ut9q6bPRw@mail.gmail.com
parent 2038bf41
...@@ -284,7 +284,10 @@ The insertion algorithm is rather similar: ...@@ -284,7 +284,10 @@ The insertion algorithm is rather similar:
if we get the lock on both the buckets if we get the lock on both the buckets
finish the split using algorithm mentioned below for split finish the split using algorithm mentioned below for split
release the pin on old bucket and restart the insert from beginning. release the pin on old bucket and restart the insert from beginning.
if current page is full, release lock but not pin, read/exclusive-lock if current page is full, first check if this page contains any dead tuples.
if yes, remove the dead tuples from the current page and check again
whether enough space is available. If enough space is found, insert the tuple; else
release lock but not pin, read/exclusive-lock
next page; repeat as needed next page; repeat as needed
>> see below if no space in any page of bucket >> see below if no space in any page of bucket
take buffer content lock in exclusive mode on metapage take buffer content lock in exclusive mode on metapage
......
...@@ -36,6 +36,7 @@ typedef struct ...@@ -36,6 +36,7 @@ typedef struct
{ {
HSpool *spool; /* NULL if not using spooling */ HSpool *spool; /* NULL if not using spooling */
double indtuples; /* # tuples accepted into index */ double indtuples; /* # tuples accepted into index */
Relation heapRel; /* heap relation descriptor */
} HashBuildState; } HashBuildState;
static void hashbuildCallback(Relation index, static void hashbuildCallback(Relation index,
...@@ -154,6 +155,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo) ...@@ -154,6 +155,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
/* prepare to build the index */ /* prepare to build the index */
buildstate.indtuples = 0; buildstate.indtuples = 0;
buildstate.heapRel = heap;
/* do the heap scan */ /* do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo, true, reltuples = IndexBuildHeapScan(heap, index, indexInfo, true,
...@@ -162,7 +164,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo) ...@@ -162,7 +164,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
if (buildstate.spool) if (buildstate.spool)
{ {
/* sort the tuples and insert them into the index */ /* sort the tuples and insert them into the index */
_h_indexbuild(buildstate.spool); _h_indexbuild(buildstate.spool, buildstate.heapRel);
_h_spooldestroy(buildstate.spool); _h_spooldestroy(buildstate.spool);
} }
...@@ -218,7 +220,7 @@ hashbuildCallback(Relation index, ...@@ -218,7 +220,7 @@ hashbuildCallback(Relation index,
itup = index_form_tuple(RelationGetDescr(index), itup = index_form_tuple(RelationGetDescr(index),
index_values, index_isnull); index_values, index_isnull);
itup->t_tid = htup->t_self; itup->t_tid = htup->t_self;
_hash_doinsert(index, itup); _hash_doinsert(index, itup, buildstate->heapRel);
pfree(itup); pfree(itup);
} }
...@@ -251,7 +253,7 @@ hashinsert(Relation rel, Datum *values, bool *isnull, ...@@ -251,7 +253,7 @@ hashinsert(Relation rel, Datum *values, bool *isnull,
itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull); itup = index_form_tuple(RelationGetDescr(rel), index_values, index_isnull);
itup->t_tid = *ht_ctid; itup->t_tid = *ht_ctid;
_hash_doinsert(rel, itup); _hash_doinsert(rel, itup, heapRel);
pfree(itup); pfree(itup);
...@@ -331,14 +333,24 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir) ...@@ -331,14 +333,24 @@ hashgettuple(IndexScanDesc scan, ScanDirection dir)
if (scan->kill_prior_tuple) if (scan->kill_prior_tuple)
{ {
/* /*
* Yes, so mark it by setting the LP_DEAD state in the item flags. * Yes, so remember it for later. (We'll deal with all such
*/ * tuples at once right after leaving the index page or at
ItemIdMarkDead(PageGetItemId(page, offnum)); * end of scan.) In case if caller reverses the indexscan
* direction it is quite possible that the same item might
/* * get entered multiple times. But, we don't detect that;
* Since this can be redone later if needed, mark as a hint. * instead, we just forget any excess entries.
*/ */
MarkBufferDirtyHint(buf, true); if (so->killedItems == NULL)
so->killedItems = palloc(MaxIndexTuplesPerPage *
sizeof(HashScanPosItem));
if (so->numKilled < MaxIndexTuplesPerPage)
{
so->killedItems[so->numKilled].heapTid = so->hashso_heappos;
so->killedItems[so->numKilled].indexOffset =
ItemPointerGetOffsetNumber(&(so->hashso_curpos));
so->numKilled++;
}
} }
/* /*
...@@ -446,6 +458,9 @@ hashbeginscan(Relation rel, int nkeys, int norderbys) ...@@ -446,6 +458,9 @@ hashbeginscan(Relation rel, int nkeys, int norderbys)
so->hashso_buc_populated = false; so->hashso_buc_populated = false;
so->hashso_buc_split = false; so->hashso_buc_split = false;
so->killedItems = NULL;
so->numKilled = 0;
scan->opaque = so; scan->opaque = so;
return scan; return scan;
...@@ -461,6 +476,10 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys, ...@@ -461,6 +476,10 @@ hashrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
HashScanOpaque so = (HashScanOpaque) scan->opaque; HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation; Relation rel = scan->indexRelation;
/* Before leaving current page, deal with any killed items */
if (so->numKilled > 0)
_hash_kill_items(scan);
_hash_dropscanbuf(rel, so); _hash_dropscanbuf(rel, so);
/* set position invalid (this will cause _hash_first call) */ /* set position invalid (this will cause _hash_first call) */
...@@ -488,8 +507,14 @@ hashendscan(IndexScanDesc scan) ...@@ -488,8 +507,14 @@ hashendscan(IndexScanDesc scan)
HashScanOpaque so = (HashScanOpaque) scan->opaque; HashScanOpaque so = (HashScanOpaque) scan->opaque;
Relation rel = scan->indexRelation; Relation rel = scan->indexRelation;
/* Before leaving current page, deal with any killed items */
if (so->numKilled > 0)
_hash_kill_items(scan);
_hash_dropscanbuf(rel, so); _hash_dropscanbuf(rel, so);
if (so->killedItems != NULL)
pfree(so->killedItems);
pfree(so); pfree(so);
scan->opaque = NULL; scan->opaque = NULL;
} }
...@@ -848,6 +873,16 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf, ...@@ -848,6 +873,16 @@ hashbucketcleanup(Relation rel, Bucket cur_bucket, Buffer bucket_buf,
PageIndexMultiDelete(page, deletable, ndeletable); PageIndexMultiDelete(page, deletable, ndeletable);
bucket_dirty = true; bucket_dirty = true;
/*
* Let us mark the page as clean if vacuum removes the DEAD tuples
* from an index page. We do this by clearing LH_PAGE_HAS_DEAD_TUPLES
* flag. Clearing this flag is just a hint; replay won't redo this.
*/
if (tuples_removed && *tuples_removed > 0 &&
opaque->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
MarkBufferDirty(buf); MarkBufferDirty(buf);
/* XLOG stuff */ /* XLOG stuff */
......
...@@ -14,10 +14,15 @@ ...@@ -14,10 +14,15 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "access/heapam_xlog.h"
#include "access/bufmask.h" #include "access/bufmask.h"
#include "access/hash.h" #include "access/hash.h"
#include "access/hash_xlog.h" #include "access/hash_xlog.h"
#include "access/xlogutils.h" #include "access/xlogutils.h"
#include "access/xlog.h"
#include "access/transam.h"
#include "storage/procarray.h"
#include "miscadmin.h"
/* /*
* replay a hash index meta page * replay a hash index meta page
...@@ -915,6 +920,235 @@ hash_xlog_update_meta_page(XLogReaderState *record) ...@@ -915,6 +920,235 @@ hash_xlog_update_meta_page(XLogReaderState *record)
UnlockReleaseBuffer(metabuf); UnlockReleaseBuffer(metabuf);
} }
/*
 * Get the latestRemovedXid from the heap pages pointed at by the index
 * tuples being deleted.  See also btree_xlog_delete_get_latestRemovedXid,
 * on which this function is based.
 *
 * The XLOG_HASH_VACUUM_ONE_PAGE record registers the index page being
 * cleaned as block 0, with the array of deleted offset numbers attached to
 * it as block data, and the metapage as block 1.  Both the index page and
 * the offset array must therefore be fetched through block reference 0.
 *
 * Returns InvalidTransactionId if no backends are connected, if any page
 * needed for the computation cannot be read, if the offset array is not
 * available in the record, or if none of the referenced heap items has
 * storage.  Otherwise returns the newest xid found on the heap tuples the
 * deleted index tuples point at.
 */
static TransactionId
hash_xlog_vacuum_get_latestRemovedXid(XLogReaderState *record)
{
	xl_hash_vacuum_one_page *xlrec;
	OffsetNumber *unused;
	Buffer		ibuffer,
				hbuffer;
	Page		ipage,
				hpage;
	RelFileNode rnode;
	BlockNumber blkno;
	ItemId		iitemid,
				hitemid;
	IndexTuple	itup;
	HeapTupleHeader htuphdr;
	BlockNumber hblkno;
	OffsetNumber hoffnum;
	TransactionId latestRemovedXid = InvalidTransactionId;
	int			i;
	char	   *ptr;
	Size		len;

	xlrec = (xl_hash_vacuum_one_page *) XLogRecGetData(record);

	/*
	 * If there's nothing running on the standby we don't need to derive a
	 * full latestRemovedXid value, so use a fast path out of here.  This
	 * returns InvalidTransactionId, and so will conflict with all HS
	 * transactions; but since we just worked out that that's zero people,
	 * it's OK.
	 *
	 * XXX There is a race condition here, which is that a new backend might
	 * start just after we look.  If so, it cannot need to conflict, but this
	 * coding will result in throwing a conflict anyway.
	 */
	if (CountDBBackends(InvalidOid) == 0)
		return latestRemovedXid;

	/*
	 * Get index page.  It is block 0 of the record (block 1 is the
	 * metapage).  If the DB is consistent, this should not fail, nor should
	 * any of the heap page fetches below.  If one does, we return
	 * InvalidTransactionId to cancel all HS transactions.  That's probably
	 * overkill, but it's safe, and certainly better than panicking here.
	 */
	XLogRecGetBlockTag(record, 0, &rnode, NULL, &blkno);
	ibuffer = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, RBM_NORMAL);

	if (!BufferIsValid(ibuffer))
		return InvalidTransactionId;
	LockBuffer(ibuffer, HASH_READ);
	ipage = (Page) BufferGetPage(ibuffer);

	/*
	 * Fetch the array of deleted offsets, which was registered as data of
	 * block 0.  The array may be absent (NOTE(review): presumably when the
	 * block was logged as a full-page image without its data -- confirm) or
	 * shorter than ntuples claims; in either case we cannot inspect the
	 * deleted items, so take the same conservative exit as for an unreadable
	 * page rather than dereferencing a bad pointer.
	 */
	ptr = XLogRecGetBlockData(record, 0, &len);
	if (ptr == NULL ||
		xlrec->ntuples > (double) (len / sizeof(OffsetNumber)))
	{
		UnlockReleaseBuffer(ibuffer);
		return InvalidTransactionId;
	}
	unused = (OffsetNumber *) ptr;

	/*
	 * Loop through the deleted index items to obtain the TransactionId from
	 * the heap items they point to.
	 */
	for (i = 0; i < xlrec->ntuples; i++)
	{
		/*
		 * Identify the index tuple about to be deleted.
		 */
		iitemid = PageGetItemId(ipage, unused[i]);
		itup = (IndexTuple) PageGetItem(ipage, iitemid);

		/*
		 * Locate the heap page that the index tuple points at
		 */
		hblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
		hbuffer = XLogReadBufferExtended(xlrec->hnode, MAIN_FORKNUM,
										 hblkno, RBM_NORMAL);

		if (!BufferIsValid(hbuffer))
		{
			UnlockReleaseBuffer(ibuffer);
			return InvalidTransactionId;
		}
		LockBuffer(hbuffer, HASH_READ);
		hpage = (Page) BufferGetPage(hbuffer);

		/*
		 * Look up the heap tuple header that the index tuple points at by
		 * using the heap node supplied with the xlrec.  We can't use
		 * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer.
		 * Note that we are not looking at tuple data here, just headers.
		 */
		hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid));
		hitemid = PageGetItemId(hpage, hoffnum);

		/*
		 * Follow any redirections until we find something useful.
		 */
		while (ItemIdIsRedirected(hitemid))
		{
			hoffnum = ItemIdGetRedirect(hitemid);
			hitemid = PageGetItemId(hpage, hoffnum);
			CHECK_FOR_INTERRUPTS();
		}

		/*
		 * If the heap item has storage, then read the header and use that to
		 * set latestRemovedXid.
		 *
		 * Some LP_DEAD items may not be accessible, so we ignore them.
		 */
		if (ItemIdHasStorage(hitemid))
		{
			htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
			HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
		}
		else if (ItemIdIsDead(hitemid))
		{
			/*
			 * Conjecture: if hitemid is dead then it had xids before the xids
			 * marked on LP_NORMAL items.  So we just ignore this item and move
			 * onto the next, for the purposes of calculating
			 * latestRemovedxids.
			 */
		}
		else
			Assert(!ItemIdIsUsed(hitemid));

		UnlockReleaseBuffer(hbuffer);
	}

	UnlockReleaseBuffer(ibuffer);

	/*
	 * If all heap tuples were LP_DEAD then we will be returning
	 * InvalidTransactionId here, which avoids conflicts.  This matches
	 * existing logic which assumes that LP_DEAD tuples must already be older
	 * than the latestRemovedXid on the cleanup record that set them as
	 * LP_DEAD, hence must already have generated a conflict.
	 */
	return latestRemovedXid;
}
/*
 * replay delete operation in hash index to remove
 * tuples marked as DEAD during index tuple insertion.
 *
 * Block 0 of the record is the index page that was cleaned; its block data
 * is the array of offset numbers that were deleted.  Block 1 is the
 * metapage, whose tuple count is decremented by the number of tuples
 * removed.
 */
static void
hash_xlog_vacuum_one_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_vacuum_one_page *xldata;
	Buffer		buffer;
	Buffer		metabuf;
	Page		page;
	XLogRedoAction action;
	HashPageOpaque pageopaque;

	xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);

	/*
	 * If we have any conflict processing to do, it must happen before we
	 * update the page.
	 *
	 * Hash index records that are marked as LP_DEAD and being removed during
	 * hash index tuple insertion can conflict with standby queries.  You might
	 * think that vacuum records would conflict as well, but we've handled
	 * that already.  XLOG_HEAP2_CLEANUP_INFO records provide the highest xid
	 * cleaned by the vacuum of the heap and so we can resolve any conflicts
	 * just once when that arrives.  After that we know that no conflicts
	 * exist from individual hash index vacuum records on that index.
	 */
	if (InHotStandby)
	{
		TransactionId latestRemovedXid =
			hash_xlog_vacuum_get_latestRemovedXid(record);
		RelFileNode rnode;

		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
	}

	/* The "true" argument requests a cleanup lock on the index page. */
	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);

	if (action == BLK_NEEDS_REDO)
	{
		char	   *ptr;
		Size		len;

		ptr = XLogRecGetBlockData(record, 0, &len);

		page = (Page) BufferGetPage(buffer);

		if (len > 0)
		{
			OffsetNumber *unused;
			OffsetNumber *unend;

			unused = (OffsetNumber *) ptr;
			unend = (OffsetNumber *) ((char *) ptr + len);

			if ((unend - unused) > 0)
				PageIndexMultiDelete(page, unused, unend - unused);
		}

		/*
		 * Mark the page as not containing any LP_DEAD items, mirroring what
		 * _hash_vacuum_one_page() does on the primary when it deletes the
		 * tuples.  Without this the replayed page keeps a stale hint flag.
		 */
		pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
		pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

		PageSetLSN(page, lsn);
		MarkBufferDirty(buffer);
	}
	if (BufferIsValid(buffer))
		UnlockReleaseBuffer(buffer);

	/* Now apply the tuple-count adjustment to the metapage (block 1). */
	if (XLogReadBufferForRedo(record, 1, &metabuf) == BLK_NEEDS_REDO)
	{
		Page		metapage;
		HashMetaPage metap;

		metapage = BufferGetPage(metabuf);
		metap = HashPageGetMeta(metapage);

		metap->hashm_ntuples -= xldata->ntuples;

		PageSetLSN(metapage, lsn);
		MarkBufferDirty(metabuf);
	}
	if (BufferIsValid(metabuf))
		UnlockReleaseBuffer(metabuf);
}
void void
hash_redo(XLogReaderState *record) hash_redo(XLogReaderState *record)
{ {
...@@ -958,6 +1192,9 @@ hash_redo(XLogReaderState *record) ...@@ -958,6 +1192,9 @@ hash_redo(XLogReaderState *record)
case XLOG_HASH_UPDATE_META_PAGE: case XLOG_HASH_UPDATE_META_PAGE:
hash_xlog_update_meta_page(record); hash_xlog_update_meta_page(record);
break; break;
case XLOG_HASH_VACUUM_ONE_PAGE:
hash_xlog_vacuum_one_page(record);
break;
default: default:
elog(PANIC, "hash_redo: unknown op code %u", info); elog(PANIC, "hash_redo: unknown op code %u", info);
} }
......
...@@ -17,9 +17,14 @@ ...@@ -17,9 +17,14 @@
#include "access/hash.h" #include "access/hash.h"
#include "access/hash_xlog.h" #include "access/hash_xlog.h"
#include "access/heapam.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "storage/lwlock.h"
#include "storage/buf_internals.h"
static void _hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
RelFileNode hnode);
/* /*
* _hash_doinsert() -- Handle insertion of a single index tuple. * _hash_doinsert() -- Handle insertion of a single index tuple.
...@@ -28,7 +33,7 @@ ...@@ -28,7 +33,7 @@
* and hashinsert. By here, itup is completely filled in. * and hashinsert. By here, itup is completely filled in.
*/ */
void void
_hash_doinsert(Relation rel, IndexTuple itup) _hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel)
{ {
Buffer buf = InvalidBuffer; Buffer buf = InvalidBuffer;
Buffer bucket_buf; Buffer bucket_buf;
...@@ -118,10 +123,30 @@ restart_insert: ...@@ -118,10 +123,30 @@ restart_insert:
/* Do the insertion */ /* Do the insertion */
while (PageGetFreeSpace(page) < itemsz) while (PageGetFreeSpace(page) < itemsz)
{ {
BlockNumber nextblkno;
/*
* Check if current page has any DEAD tuples. If yes,
* delete these tuples and see if we can get a space for
* the new item to be inserted before moving to the next
* page in the bucket chain.
*/
if (H_HAS_DEAD_TUPLES(pageopaque))
{
if (IsBufferCleanupOK(buf))
{
_hash_vacuum_one_page(rel, metabuf, buf, heapRel->rd_node);
if (PageGetFreeSpace(page) >= itemsz)
break; /* OK, now we have enough space */
}
}
/* /*
* no space on this page; check for an overflow page * no space on this page; check for an overflow page
*/ */
BlockNumber nextblkno = pageopaque->hasho_nextblkno; nextblkno = pageopaque->hasho_nextblkno;
if (BlockNumberIsValid(nextblkno)) if (BlockNumberIsValid(nextblkno))
{ {
...@@ -157,7 +182,7 @@ restart_insert: ...@@ -157,7 +182,7 @@ restart_insert:
Assert(PageGetFreeSpace(page) >= itemsz); Assert(PageGetFreeSpace(page) >= itemsz);
} }
pageopaque = (HashPageOpaque) PageGetSpecialPointer(page); pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
Assert(pageopaque->hasho_flag == LH_OVERFLOW_PAGE); Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
Assert(pageopaque->hasho_bucket == bucket); Assert(pageopaque->hasho_bucket == bucket);
} }
...@@ -300,3 +325,93 @@ _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups, ...@@ -300,3 +325,93 @@ _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
RelationGetRelationName(rel)); RelationGetRelationName(rel));
} }
} }
/*
 *	_hash_vacuum_one_page - vacuum just one index page.
 *
 *	Collects the offsets of every LP_DEAD item on the page held in "buf" and,
 *	if there are any, deletes them, clears the page's
 *	LH_PAGE_HAS_DEAD_TUPLES flag, and subtracts the number removed from the
 *	metapage's tuple count.  The change is WAL-logged as
 *	XLOG_HASH_VACUUM_ONE_PAGE when the relation needs WAL; "hnode" (the heap
 *	relation's file node) is stored in that record.
 *
 *	The caller must already hold a cleanup lock on the page being modified.
 *	The metapage is exclusive-locked here only for the duration of the
 *	update and unlocked again before returning.
 */
static void
_hash_vacuum_one_page(Relation rel, Buffer metabuf, Buffer buf,
					  RelFileNode hnode)
{
	OffsetNumber dead_offsets[MaxOffsetNumber];
	int			ndead = 0;
	double		removed = 0;
	Page		page = BufferGetPage(buf);
	OffsetNumber lastoff = PageGetMaxOffsetNumber(page);
	OffsetNumber cur;
	HashPageOpaque opaque;
	HashMetaPage metap;

	/* Gather the offsets of all items currently marked LP_DEAD. */
	for (cur = FirstOffsetNumber; cur <= lastoff; cur = OffsetNumberNext(cur))
	{
		if (!ItemIdIsDead(PageGetItemId(page, cur)))
			continue;

		dead_offsets[ndead++] = cur;
		removed += 1;
	}

	/* Nothing to reclaim: leave both pages untouched. */
	if (ndead == 0)
		return;

	/* Write-lock the meta page so that we can decrement tuple count. */
	LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

	/* No ereport(ERROR) until changes are logged */
	START_CRIT_SECTION();

	PageIndexMultiDelete(page, dead_offsets, ndead);

	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
	opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

	metap = HashPageGetMeta(BufferGetPage(metabuf));
	metap->hashm_ntuples -= removed;

	MarkBufferDirty(buf);
	MarkBufferDirty(metabuf);

	/* XLOG stuff */
	if (RelationNeedsWAL(rel))
	{
		xl_hash_vacuum_one_page walrec;
		XLogRecPtr	endptr;

		walrec.hnode = hnode;
		walrec.ntuples = removed;

		XLogBeginInsert();
		XLogRegisterData((char *) &walrec, SizeOfHashVacuumOnePage);

		/* block 0: the cleaned page plus the list of deleted offsets */
		XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
		XLogRegisterBufData(0, (char *) dead_offsets,
							ndead * sizeof(OffsetNumber));

		/* block 1: the metapage */
		XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);

		endptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);

		PageSetLSN(BufferGetPage(buf), endptr);
		PageSetLSN(BufferGetPage(metabuf), endptr);
	}

	END_CRIT_SECTION();

	/* Tuple count is updated; release the write lock on the meta page. */
	LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
}
...@@ -465,6 +465,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir) ...@@ -465,6 +465,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
break; /* yes, so exit for-loop */ break; /* yes, so exit for-loop */
} }
/* Before leaving current page, deal with any killed items */
if (so->numKilled > 0)
_hash_kill_items(scan);
/* /*
* ran off the end of this page, try the next * ran off the end of this page, try the next
*/ */
...@@ -518,6 +522,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir) ...@@ -518,6 +522,10 @@ _hash_step(IndexScanDesc scan, Buffer *bufP, ScanDirection dir)
break; /* yes, so exit for-loop */ break; /* yes, so exit for-loop */
} }
/* Before leaving current page, deal with any killed items */
if (so->numKilled > 0)
_hash_kill_items(scan);
/* /*
* ran off the end of this page, try the next * ran off the end of this page, try the next
*/ */
......
...@@ -101,7 +101,7 @@ _h_spool(HSpool *hspool, ItemPointer self, Datum *values, bool *isnull) ...@@ -101,7 +101,7 @@ _h_spool(HSpool *hspool, ItemPointer self, Datum *values, bool *isnull)
* create an entire index. * create an entire index.
*/ */
void void
_h_indexbuild(HSpool *hspool) _h_indexbuild(HSpool *hspool, Relation heapRel)
{ {
IndexTuple itup; IndexTuple itup;
#ifdef USE_ASSERT_CHECKING #ifdef USE_ASSERT_CHECKING
...@@ -126,6 +126,6 @@ _h_indexbuild(HSpool *hspool) ...@@ -126,6 +126,6 @@ _h_indexbuild(HSpool *hspool)
Assert(hashkey >= lasthashkey); Assert(hashkey >= lasthashkey);
#endif #endif
_hash_doinsert(hspool->index, itup); _hash_doinsert(hspool->index, itup, heapRel);
} }
} }
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "access/relscan.h" #include "access/relscan.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "storage/buf_internals.h"
#define CALC_NEW_BUCKET(old_bucket, lowmask) \ #define CALC_NEW_BUCKET(old_bucket, lowmask) \
old_bucket | (lowmask + 1) old_bucket | (lowmask + 1)
...@@ -446,3 +447,70 @@ _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket, ...@@ -446,3 +447,70 @@ _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket,
return new_bucket; return new_bucket;
} }
/*
 * _hash_kill_items - set LP_DEAD state for items an indexscan caller has
 * told us were killed.
 *
 * The scan's private state (scan->opaque) carries the current buffer and
 * the list of killed items remembered for it; callers are expected to
 * invoke this only when so->numKilled > 0.
 *
 * For each remembered item we search forward from its recorded offset and
 * mark the first index tuple whose heap TID matches.  An item with no
 * match at or after that offset is silently skipped.
 */
void
_hash_kill_items(IndexScanDesc scan)
{
	HashScanOpaque so = (HashScanOpaque) scan->opaque;
	int			nkilled = so->numKilled;
	bool		marked_any = false;
	Page		page;
	HashPageOpaque opaque;
	OffsetNumber lastoff;
	int			k;

	Assert(so->numKilled > 0);
	Assert(so->killedItems != NULL);

	/*
	 * Reset the remembered count up front, so that these entries are never
	 * looked for on some other page.
	 */
	so->numKilled = 0;

	page = BufferGetPage(so->hashso_curbuf);
	opaque = (HashPageOpaque) PageGetSpecialPointer(page);
	lastoff = PageGetMaxOffsetNumber(page);

	for (k = 0; k < nkilled; k++)
	{
		OffsetNumber off;

		for (off = so->killedItems[k].indexOffset;
			 off <= lastoff;
			 off = OffsetNumberNext(off))
		{
			ItemId		iid = PageGetItemId(page, off);
			IndexTuple	ituple = (IndexTuple) PageGetItem(page, iid);

			if (!ItemPointerEquals(&ituple->t_tid, &so->killedItems[k].heapTid))
				continue;

			/* found the item */
			ItemIdMarkDead(iid);
			marked_any = true;
			break;
		}
	}

	if (!marked_any)
		return;

	/*
	 * Since this can be redone later if needed, mark as dirty hint.
	 * Whenever we mark anything LP_DEAD, we also set the page's
	 * LH_PAGE_HAS_DEAD_TUPLES flag, which is likewise just a hint.
	 */
	opaque->hasho_flag |= LH_PAGE_HAS_DEAD_TUPLES;
	MarkBufferDirtyHint(so->hashso_curbuf, true);
}
...@@ -154,6 +154,8 @@ hash_identify(uint8 info) ...@@ -154,6 +154,8 @@ hash_identify(uint8 info)
case XLOG_HASH_UPDATE_META_PAGE: case XLOG_HASH_UPDATE_META_PAGE:
id = "UPDATE_META_PAGE"; id = "UPDATE_META_PAGE";
break; break;
case XLOG_HASH_VACUUM_ONE_PAGE:
id = "VACUUM_ONE_PAGE";
} }
return id; return id;
......
...@@ -57,6 +57,7 @@ typedef uint32 Bucket; ...@@ -57,6 +57,7 @@ typedef uint32 Bucket;
#define LH_BUCKET_BEING_POPULATED (1 << 4) #define LH_BUCKET_BEING_POPULATED (1 << 4)
#define LH_BUCKET_BEING_SPLIT (1 << 5) #define LH_BUCKET_BEING_SPLIT (1 << 5)
#define LH_BUCKET_NEEDS_SPLIT_CLEANUP (1 << 6) #define LH_BUCKET_NEEDS_SPLIT_CLEANUP (1 << 6)
#define LH_PAGE_HAS_DEAD_TUPLES (1 << 7)
#define LH_PAGE_TYPE \ #define LH_PAGE_TYPE \
(LH_OVERFLOW_PAGE|LH_BUCKET_PAGE|LH_BITMAP_PAGE|LH_META_PAGE) (LH_OVERFLOW_PAGE|LH_BUCKET_PAGE|LH_BITMAP_PAGE|LH_META_PAGE)
...@@ -86,6 +87,7 @@ typedef HashPageOpaqueData *HashPageOpaque; ...@@ -86,6 +87,7 @@ typedef HashPageOpaqueData *HashPageOpaque;
#define H_NEEDS_SPLIT_CLEANUP(opaque) ((opaque)->hasho_flag & LH_BUCKET_NEEDS_SPLIT_CLEANUP) #define H_NEEDS_SPLIT_CLEANUP(opaque) ((opaque)->hasho_flag & LH_BUCKET_NEEDS_SPLIT_CLEANUP)
#define H_BUCKET_BEING_SPLIT(opaque) ((opaque)->hasho_flag & LH_BUCKET_BEING_SPLIT) #define H_BUCKET_BEING_SPLIT(opaque) ((opaque)->hasho_flag & LH_BUCKET_BEING_SPLIT)
#define H_BUCKET_BEING_POPULATED(opaque) ((opaque)->hasho_flag & LH_BUCKET_BEING_POPULATED) #define H_BUCKET_BEING_POPULATED(opaque) ((opaque)->hasho_flag & LH_BUCKET_BEING_POPULATED)
#define H_HAS_DEAD_TUPLES(opaque) ((opaque)->hasho_flag & LH_PAGE_HAS_DEAD_TUPLES)
/* /*
* The page ID is for the convenience of pg_filedump and similar utilities, * The page ID is for the convenience of pg_filedump and similar utilities,
...@@ -95,6 +97,13 @@ typedef HashPageOpaqueData *HashPageOpaque; ...@@ -95,6 +97,13 @@ typedef HashPageOpaqueData *HashPageOpaque;
*/ */
#define HASHO_PAGE_ID 0xFF80 #define HASHO_PAGE_ID 0xFF80
/*
 * One remembered index entry of a scan.  A hash scan keeps an array of
 * these (killedItems in HashScanOpaqueData) for tuples the caller reported
 * as killed.  _hash_kill_items() later starts at indexOffset and marks the
 * first index tuple whose heap TID equals heapTid.
 */
typedef struct HashScanPosItem /* what we remember about each match */
{
ItemPointerData heapTid; /* TID of referenced heap item */
OffsetNumber indexOffset; /* index item's location within page */
} HashScanPosItem;
/* /*
* HashScanOpaqueData is private state for a hash index scan. * HashScanOpaqueData is private state for a hash index scan.
*/ */
...@@ -135,6 +144,9 @@ typedef struct HashScanOpaqueData ...@@ -135,6 +144,9 @@ typedef struct HashScanOpaqueData
* referred only when hashso_buc_populated is true. * referred only when hashso_buc_populated is true.
*/ */
bool hashso_buc_split; bool hashso_buc_split;
/* info about killed items if any (killedItems is NULL if never used) */
HashScanPosItem *killedItems; /* tids and offset numbers of killed items */
int numKilled; /* number of currently stored items */
} HashScanOpaqueData; } HashScanOpaqueData;
typedef HashScanOpaqueData *HashScanOpaque; typedef HashScanOpaqueData *HashScanOpaque;
...@@ -300,7 +312,7 @@ extern Datum hash_uint32(uint32 k); ...@@ -300,7 +312,7 @@ extern Datum hash_uint32(uint32 k);
/* private routines */ /* private routines */
/* hashinsert.c */ /* hashinsert.c */
extern void _hash_doinsert(Relation rel, IndexTuple itup); extern void _hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel);
extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf, extern OffsetNumber _hash_pgaddtup(Relation rel, Buffer buf,
Size itemsize, IndexTuple itup); Size itemsize, IndexTuple itup);
extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups, extern void _hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
...@@ -361,7 +373,7 @@ extern HSpool *_h_spoolinit(Relation heap, Relation index, uint32 num_buckets); ...@@ -361,7 +373,7 @@ extern HSpool *_h_spoolinit(Relation heap, Relation index, uint32 num_buckets);
extern void _h_spooldestroy(HSpool *hspool); extern void _h_spooldestroy(HSpool *hspool);
extern void _h_spool(HSpool *hspool, ItemPointer self, extern void _h_spool(HSpool *hspool, ItemPointer self,
Datum *values, bool *isnull); Datum *values, bool *isnull);
extern void _h_indexbuild(HSpool *hspool); extern void _h_indexbuild(HSpool *hspool, Relation heapRel);
/* hashutil.c */ /* hashutil.c */
extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup); extern bool _hash_checkqual(IndexScanDesc scan, IndexTuple itup);
...@@ -381,6 +393,7 @@ extern BlockNumber _hash_get_oldblock_from_newbucket(Relation rel, Bucket new_bu ...@@ -381,6 +393,7 @@ extern BlockNumber _hash_get_oldblock_from_newbucket(Relation rel, Bucket new_bu
extern BlockNumber _hash_get_newblock_from_oldbucket(Relation rel, Bucket old_bucket); extern BlockNumber _hash_get_newblock_from_oldbucket(Relation rel, Bucket old_bucket);
extern Bucket _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket, extern Bucket _hash_get_newbucket_from_oldbucket(Relation rel, Bucket old_bucket,
uint32 lowmask, uint32 maxbucket); uint32 lowmask, uint32 maxbucket);
extern void _hash_kill_items(IndexScanDesc scan);
/* hash.c */ /* hash.c */
extern void hashbucketcleanup(Relation rel, Bucket cur_bucket, extern void hashbucketcleanup(Relation rel, Bucket cur_bucket,
......
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
#define XLOG_HASH_UPDATE_META_PAGE 0xB0 /* update meta page after #define XLOG_HASH_UPDATE_META_PAGE 0xB0 /* update meta page after
* vacuum */ * vacuum */
#define XLOG_HASH_VACUUM_ONE_PAGE 0xC0 /* remove dead tuples from index page */
/* /*
* xl_hash_split_allocate_page flag values, 8 bits are available. * xl_hash_split_allocate_page flag values, 8 bits are available.
...@@ -250,6 +251,24 @@ typedef struct xl_hash_init_bitmap_page ...@@ -250,6 +251,24 @@ typedef struct xl_hash_init_bitmap_page
#define SizeOfHashInitBitmapPage \ #define SizeOfHashInitBitmapPage \
(offsetof(xl_hash_init_bitmap_page, bmsize) + sizeof(uint16)) (offsetof(xl_hash_init_bitmap_page, bmsize) + sizeof(uint16))
/*
 * This is what we need for index tuple deletion and to
 * update the meta page.
 *
 * This data record is used for XLOG_HASH_VACUUM_ONE_PAGE
 *
 * Backup Blk 0: bucket page (the index page being cleaned; its block data
 *				 is the array of OffsetNumbers of the deleted items)
 * Backup Blk 1: meta page
 */
typedef struct xl_hash_vacuum_one_page
{
/* file node of the heap relation; replay uses it to read heap pages */
RelFileNode hnode;
/* number of index tuples removed; subtracted from hashm_ntuples on replay */
double ntuples;
} xl_hash_vacuum_one_page;
/* size of the fixed part of the record, up to and including ntuples */
#define SizeOfHashVacuumOnePage \
(offsetof(xl_hash_vacuum_one_page, ntuples) + sizeof(double))
extern void hash_redo(XLogReaderState *record); extern void hash_redo(XLogReaderState *record);
extern void hash_desc(StringInfo buf, XLogReaderState *record); extern void hash_desc(StringInfo buf, XLogReaderState *record);
extern const char *hash_identify(uint8 info); extern const char *hash_identify(uint8 info);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment