Commit d6c9e05c authored by Tom Lane's avatar Tom Lane

Fix assorted bugs in contrib/bloom.

In blinsert(), cope with the possibility that a page we pull from the
notFullPage list is marked BLOOM_DELETED.  This could happen if VACUUM
recently marked it deleted but hasn't (yet) updated the metapage.
We can re-use such a page safely, but we *must* reinitialize it so that
it's no longer marked deleted.

Fix blvacuum() so that it updates the notFullPage list even if it's
going to update it to empty.  The previous "optimization" of skipping
the update seems pretty dubious, since it means that the next blinsert()
will uselessly visit whatever pages we left in the list.

Uniformly treat PageIsNew pages the same as deleted pages.  This should
allow proper recovery if a crash occurs just after relation extension.

Properly use vacuum_delay_point, not assorted ad-hoc CHECK_FOR_INTERRUPTS
calls, in the blvacuum() main loop.

Fix broken tuple-counting logic: blvacuum.c counted the number of live
index tuples over again in each scan, leading to VACUUM VERBOSE reporting
some multiple of the actual number of surviving index tuples after any
vacuum that removed any tuples (since they'd be counted in blvacuum, maybe
more than once, and then again in blvacuumcleanup, without ever zeroing the
counter).  It's sufficient to count them in blvacuumcleanup.

stats->estimated_count is a boolean, not a counter, and we don't want
to set it true, so don't add tuple counts to it.

Add a couple of Asserts that we don't overrun available space on a bloom
page.  I don't think there's any bug there today, but the way the
FreeBlockNumberArray size calculation is set up is scarily fragile, and
BloomPageGetFreeSpace isn't much better.  The Asserts should help catch
any future mistakes.

Per investigation of a report from Jeff Janes.  I think the first item
above may explain his report; the other changes were things I noticed
while casting about for an explanation.

Report: <CAMkU=1xEUuBphDwDmB1WjN4+td4kpnEniFaTBxnk1xzHCw8_OQ@mail.gmail.com>
parent ed0097e4
...@@ -237,6 +237,13 @@ blinsert(Relation index, Datum *values, bool *isnull, ...@@ -237,6 +237,13 @@ blinsert(Relation index, Datum *values, bool *isnull,
state = GenericXLogStart(index); state = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(state, buffer, 0); page = GenericXLogRegisterBuffer(state, buffer, 0);
/*
* We might have found a page that was recently deleted by VACUUM. If
* so, we can reuse it, but we must reinitialize it.
*/
if (PageIsNew(page) || BloomPageIsDeleted(page))
BloomInitPage(page, 0);
if (BloomPageAddItem(&blstate, page, itup)) if (BloomPageAddItem(&blstate, page, itup))
{ {
/* Success! Apply the change, clean up, and exit */ /* Success! Apply the change, clean up, and exit */
...@@ -295,6 +302,10 @@ blinsert(Relation index, Datum *values, bool *isnull, ...@@ -295,6 +302,10 @@ blinsert(Relation index, Datum *values, bool *isnull,
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
page = GenericXLogRegisterBuffer(state, buffer, 0); page = GenericXLogRegisterBuffer(state, buffer, 0);
/* Basically same logic as above */
if (PageIsNew(page) || BloomPageIsDeleted(page))
BloomInitPage(page, 0);
if (BloomPageAddItem(&blstate, page, itup)) if (BloomPageAddItem(&blstate, page, itup))
{ {
/* Success! Apply the changes, clean up, and exit */ /* Success! Apply the changes, clean up, and exit */
......
...@@ -135,7 +135,7 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm) ...@@ -135,7 +135,7 @@ blgetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page); TestForOldSnapshot(scan->xs_snapshot, scan->indexRelation, page);
if (!BloomPageIsDeleted(page)) if (!PageIsNew(page) && !BloomPageIsDeleted(page))
{ {
OffsetNumber offset, OffsetNumber offset,
maxOffset = BloomPageGetMaxOffset(page); maxOffset = BloomPageGetMaxOffset(page);
......
...@@ -299,7 +299,7 @@ BloomFormTuple(BloomState *state, ItemPointer iptr, Datum *values, bool *isnull) ...@@ -299,7 +299,7 @@ BloomFormTuple(BloomState *state, ItemPointer iptr, Datum *values, bool *isnull)
/* /*
* Add new bloom tuple to the page. Returns true if new tuple was successfully * Add new bloom tuple to the page. Returns true if new tuple was successfully
* added to the page. Returns false if it doesn't fit the page. * added to the page. Returns false if it doesn't fit on the page.
*/ */
bool bool
BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple) BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple)
...@@ -308,7 +308,10 @@ BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple) ...@@ -308,7 +308,10 @@ BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple)
BloomPageOpaque opaque; BloomPageOpaque opaque;
Pointer ptr; Pointer ptr;
/* Does new tuple fit the page */ /* We shouldn't be pointed to an invalid page */
Assert(!PageIsNew(page) && !BloomPageIsDeleted(page));
/* Does new tuple fit on the page? */
if (BloomPageGetFreeSpace(state, page) < state->sizeOfBloomTuple) if (BloomPageGetFreeSpace(state, page) < state->sizeOfBloomTuple)
return false; return false;
...@@ -322,6 +325,9 @@ BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple) ...@@ -322,6 +325,9 @@ BloomPageAddItem(BloomState *state, Page page, BloomTuple *tuple)
ptr = (Pointer) BloomPageGetTuple(state, page, opaque->maxoff + 1); ptr = (Pointer) BloomPageGetTuple(state, page, opaque->maxoff + 1);
((PageHeader) page)->pd_lower = ptr - page; ((PageHeader) page)->pd_lower = ptr - page;
/* Assert we didn't overrun available space */
Assert(((PageHeader) page)->pd_lower <= ((PageHeader) page)->pd_upper);
return true; return true;
} }
...@@ -424,6 +430,9 @@ BloomFillMetapage(Relation index, Page metaPage) ...@@ -424,6 +430,9 @@ BloomFillMetapage(Relation index, Page metaPage)
metadata->magickNumber = BLOOM_MAGICK_NUMBER; metadata->magickNumber = BLOOM_MAGICK_NUMBER;
metadata->opts = *opts; metadata->opts = *opts;
((PageHeader) metaPage)->pd_lower += sizeof(BloomMetaPageData); ((PageHeader) metaPage)->pd_lower += sizeof(BloomMetaPageData);
/* If this fails, probably FreeBlockNumberArray size calc is wrong: */
Assert(((PageHeader) metaPage)->pd_lower <= ((PageHeader) metaPage)->pd_upper);
} }
/* /*
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "postgres.h" #include "postgres.h"
#include "access/genam.h" #include "access/genam.h"
#include "bloom.h"
#include "catalog/storage.h" #include "catalog/storage.h"
#include "commands/vacuum.h" #include "commands/vacuum.h"
#include "miscadmin.h" #include "miscadmin.h"
...@@ -21,7 +22,6 @@ ...@@ -21,7 +22,6 @@
#include "storage/indexfsm.h" #include "storage/indexfsm.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "bloom.h"
/* /*
* Bulk deletion of all index entries pointing to a set of heap tuples. * Bulk deletion of all index entries pointing to a set of heap tuples.
...@@ -42,6 +42,7 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, ...@@ -42,6 +42,7 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
BloomState state; BloomState state;
Buffer buffer; Buffer buffer;
Page page; Page page;
BloomMetaPageData *metaData;
GenericXLogState *gxlogState; GenericXLogState *gxlogState;
if (stats == NULL) if (stats == NULL)
...@@ -60,6 +61,8 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, ...@@ -60,6 +61,8 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
*itupPtr, *itupPtr,
*itupEnd; *itupEnd;
vacuum_delay_point();
buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno, buffer = ReadBufferExtended(index, MAIN_FORKNUM, blkno,
RBM_NORMAL, info->strategy); RBM_NORMAL, info->strategy);
...@@ -67,15 +70,18 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, ...@@ -67,15 +70,18 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
gxlogState = GenericXLogStart(index); gxlogState = GenericXLogStart(index);
page = GenericXLogRegisterBuffer(gxlogState, buffer, 0); page = GenericXLogRegisterBuffer(gxlogState, buffer, 0);
if (BloomPageIsDeleted(page)) /* Ignore empty/deleted pages until blvacuumcleanup() */
if (PageIsNew(page) || BloomPageIsDeleted(page))
{ {
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
GenericXLogAbort(gxlogState); GenericXLogAbort(gxlogState);
CHECK_FOR_INTERRUPTS();
continue; continue;
} }
/* Iterate over the tuples */ /*
* Iterate over the tuples. itup points to current tuple being
* scanned, itupPtr points to where to save next non-deleted tuple.
*/
itup = itupPtr = BloomPageGetTuple(&state, page, FirstOffsetNumber); itup = itupPtr = BloomPageGetTuple(&state, page, FirstOffsetNumber);
itupEnd = BloomPageGetTuple(&state, page, itupEnd = BloomPageGetTuple(&state, page,
OffsetNumberNext(BloomPageGetMaxOffset(page))); OffsetNumberNext(BloomPageGetMaxOffset(page)));
...@@ -84,36 +90,32 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, ...@@ -84,36 +90,32 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
/* Do we have to delete this tuple? */ /* Do we have to delete this tuple? */
if (callback(&itup->heapPtr, callback_state)) if (callback(&itup->heapPtr, callback_state))
{ {
stats->tuples_removed += 1; /* Yes; adjust count of tuples that will be left on page */
BloomPageGetOpaque(page)->maxoff--; BloomPageGetOpaque(page)->maxoff--;
stats->tuples_removed += 1;
} }
else else
{ {
/* No; copy it to itupPtr++, but skip copy if not needed */
if (itupPtr != itup) if (itupPtr != itup)
{
/*
* If we already delete something before, we have to move
* this tuple backward.
*/
memmove((Pointer) itupPtr, (Pointer) itup, memmove((Pointer) itupPtr, (Pointer) itup,
state.sizeOfBloomTuple); state.sizeOfBloomTuple);
}
stats->num_index_tuples++;
itupPtr = BloomPageGetNextTuple(&state, itupPtr); itupPtr = BloomPageGetNextTuple(&state, itupPtr);
} }
itup = BloomPageGetNextTuple(&state, itup); itup = BloomPageGetNextTuple(&state, itup);
} }
/* Assert that we counted correctly */
Assert(itupPtr == BloomPageGetTuple(&state, page, Assert(itupPtr == BloomPageGetTuple(&state, page,
OffsetNumberNext(BloomPageGetMaxOffset(page)))); OffsetNumberNext(BloomPageGetMaxOffset(page))));
/* /*
* Add page to notFullPage list if we will not mark page as deleted * Add page to new notFullPage list if we will not mark page as
* and there is a free space on it * deleted and there is free space on it
*/ */
if (BloomPageGetMaxOffset(page) != 0 && if (BloomPageGetMaxOffset(page) != 0 &&
BloomPageGetFreeSpace(&state, page) > state.sizeOfBloomTuple && BloomPageGetFreeSpace(&state, page) >= state.sizeOfBloomTuple &&
countPage < BloomMetaBlockN) countPage < BloomMetaBlockN)
notFullPage[countPage++] = blkno; notFullPage[countPage++] = blkno;
...@@ -134,13 +136,13 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, ...@@ -134,13 +136,13 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
GenericXLogAbort(gxlogState); GenericXLogAbort(gxlogState);
} }
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
CHECK_FOR_INTERRUPTS();
} }
if (countPage > 0) /*
{ * Update the metapage's notFullPage list with whatever we found. Our
BloomMetaPageData *metaData; * info could already be out of date at this point, but blinsert() will
* cope if so.
*/
buffer = ReadBuffer(index, BLOOM_METAPAGE_BLKNO); buffer = ReadBuffer(index, BLOOM_METAPAGE_BLKNO);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
...@@ -154,7 +156,6 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats, ...@@ -154,7 +156,6 @@ blbulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
GenericXLogFinish(gxlogState); GenericXLogFinish(gxlogState);
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
}
return stats; return stats;
} }
...@@ -170,7 +171,6 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) ...@@ -170,7 +171,6 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
Relation index = info->index; Relation index = info->index;
BlockNumber npages, BlockNumber npages,
blkno; blkno;
BlockNumber totFreePages;
if (info->analyze_only) if (info->analyze_only)
return stats; return stats;
...@@ -183,7 +183,9 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) ...@@ -183,7 +183,9 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
* statistics. * statistics.
*/ */
npages = RelationGetNumberOfBlocks(index); npages = RelationGetNumberOfBlocks(index);
totFreePages = 0; stats->num_pages = npages;
stats->pages_free = 0;
stats->num_index_tuples = 0;
for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++) for (blkno = BLOOM_HEAD_BLKNO; blkno < npages; blkno++)
{ {
Buffer buffer; Buffer buffer;
...@@ -196,23 +198,20 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats) ...@@ -196,23 +198,20 @@ blvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
LockBuffer(buffer, BUFFER_LOCK_SHARE); LockBuffer(buffer, BUFFER_LOCK_SHARE);
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
if (BloomPageIsDeleted(page)) if (PageIsNew(page) || BloomPageIsDeleted(page))
{ {
RecordFreeIndexPage(index, blkno); RecordFreeIndexPage(index, blkno);
totFreePages++; stats->pages_free++;
} }
else else
{ {
stats->num_index_tuples += BloomPageGetMaxOffset(page); stats->num_index_tuples += BloomPageGetMaxOffset(page);
stats->estimated_count += BloomPageGetMaxOffset(page);
} }
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
IndexFreeSpaceMapVacuum(info->index); IndexFreeSpaceMapVacuum(info->index);
stats->pages_free = totFreePages;
stats->num_pages = RelationGetNumberOfBlocks(index);
return stats; return stats;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment