Commit 0eab92c0 authored by Tom Lane's avatar Tom Lane

Fix VACUUM so that it can use pages as move targets even if they do not

have any newly-dead tuples on them.  This is a longstanding deficiency
that prevents VACUUM from compacting a file as much as one would expect.
Change requires fixing repair_frag to not assume that fraged_pages is
a subset of vacuum_pages.
Also make some further cleanups of places that assumed page numbers fit
in int and tuple counts fit in uint32.
parent 4fe8490b
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.199 2001/06/29 16:34:30 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.200 2001/06/29 20:14:27 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -71,14 +71,14 @@ typedef struct VacPageData ...@@ -71,14 +71,14 @@ typedef struct VacPageData
Size free; /* FreeSpace on this Page */ Size free; /* FreeSpace on this Page */
uint16 offsets_used; /* Number of OffNums used by vacuum */ uint16 offsets_used; /* Number of OffNums used by vacuum */
uint16 offsets_free; /* Number of OffNums free or to be free */ uint16 offsets_free; /* Number of OffNums free or to be free */
OffsetNumber offsets[1]; /* Array of its OffNums */ OffsetNumber offsets[1]; /* Array of free OffNums */
} VacPageData; } VacPageData;
typedef VacPageData *VacPage; typedef VacPageData *VacPage;
typedef struct VacPageListData typedef struct VacPageListData
{ {
int empty_end_pages;/* Number of "empty" end-pages */ BlockNumber empty_end_pages; /* Number of "empty" end-pages */
int num_pages; /* Number of pages in pagedesc */ int num_pages; /* Number of pages in pagedesc */
int num_allocated_pages; /* Number of allocated pages in int num_allocated_pages; /* Number of allocated pages in
* pagedesc */ * pagedesc */
...@@ -146,7 +146,7 @@ static void vacuum_index(VacPageList vacpagelist, Relation indrel, ...@@ -146,7 +146,7 @@ static void vacuum_index(VacPageList vacpagelist, Relation indrel,
double num_tuples, int keep_tuples); double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples); static void scan_index(Relation indrel, double num_tuples);
static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist); static VacPage tid_reaped(ItemPointer itemptr, VacPageList vacpagelist);
static void reap_page(VacPageList vacpagelist, VacPage vacpage); static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew); static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void get_indices(Relation relation, int *nindices, Relation **Irel); static void get_indices(Relation relation, int *nindices, Relation **Irel);
static void close_indices(int nindices, Relation *Irel); static void close_indices(int nindices, Relation *Irel);
...@@ -610,10 +610,10 @@ vacuum_rel(Oid relid) ...@@ -610,10 +610,10 @@ vacuum_rel(Oid relid)
/* /*
* scan_heap() -- scan an open heap relation * scan_heap() -- scan an open heap relation
* *
* This routine sets commit times, constructs vacuum_pages list of * This routine sets commit status bits, constructs vacuum_pages (list
* empty/uninitialized pages and pages with dead tuples and * of pages we need to compact free space on and/or clean indexes of
* ~LP_USED line pointers, constructs fraged_pages list of pages * deleted tuples), constructs fraged_pages (list of pages with free
* appropriate for purposes of shrinking and maintains statistics * space that tuples could be moved into), and calculates statistics
* on the number of live tuples in a heap. * on the number of live tuples in a heap.
*/ */
static void static void
...@@ -625,8 +625,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -625,8 +625,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
ItemId itemid; ItemId itemid;
Buffer buf; Buffer buf;
HeapTupleData tuple; HeapTupleData tuple;
Page page,
tempPage = NULL;
OffsetNumber offnum, OffsetNumber offnum,
maxoff; maxoff;
bool pgchanged, bool pgchanged,
...@@ -634,21 +632,21 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -634,21 +632,21 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
notup; notup;
char *relname; char *relname;
VacPage vacpage, VacPage vacpage,
vp; vacpagecopy;
double num_tuples; BlockNumber empty_pages,
uint32 tups_vacuumed,
nkeep,
nunused,
ncrash,
empty_pages,
new_pages, new_pages,
changed_pages, changed_pages,
empty_end_pages; empty_end_pages;
Size free_size, double num_tuples,
tups_vacuumed,
nkeep,
nunused,
ncrash;
double free_size,
usable_free_size; usable_free_size;
Size min_tlen = MaxTupleSize; Size min_tlen = MaxTupleSize;
Size max_tlen = 0; Size max_tlen = 0;
int32 i; int i;
bool do_shrinking = true; bool do_shrinking = true;
VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData)); VTupleLink vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
int num_vtlinks = 0; int num_vtlinks = 0;
...@@ -660,21 +658,30 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -660,21 +658,30 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
relname = RelationGetRelationName(onerel); relname = RelationGetRelationName(onerel);
elog(MESSAGE_LEVEL, "--Relation %s--", relname); elog(MESSAGE_LEVEL, "--Relation %s--", relname);
tups_vacuumed = nkeep = nunused = ncrash = empty_pages = empty_pages = new_pages = changed_pages = empty_end_pages = 0;
new_pages = changed_pages = empty_end_pages = 0; num_tuples = tups_vacuumed = nkeep = nunused = ncrash = 0;
num_tuples = 0; free_size = 0;
free_size = usable_free_size = 0;
nblocks = RelationGetNumberOfBlocks(onerel); nblocks = RelationGetNumberOfBlocks(onerel);
/*
* We initially create each VacPage item in a maximal-sized workspace,
* then copy the workspace into a just-large-enough copy.
*/
vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber)); vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
vacpage->offsets_used = 0;
for (blkno = 0; blkno < nblocks; blkno++) for (blkno = 0; blkno < nblocks; blkno++)
{ {
Page page,
tempPage = NULL;
bool do_reap,
do_frag;
buf = ReadBuffer(onerel, blkno); buf = ReadBuffer(onerel, blkno);
page = BufferGetPage(buf); page = BufferGetPage(buf);
vacpage->blkno = blkno; vacpage->blkno = blkno;
vacpage->offsets_used = 0;
vacpage->offsets_free = 0; vacpage->offsets_free = 0;
if (PageIsNew(page)) if (PageIsNew(page))
...@@ -686,7 +693,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -686,7 +693,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
free_size += (vacpage->free - sizeof(ItemIdData)); free_size += (vacpage->free - sizeof(ItemIdData));
new_pages++; new_pages++;
empty_end_pages++; empty_end_pages++;
reap_page(vacuum_pages, vacpage); vacpagecopy = copy_vac_page(vacpage);
vpage_insert(vacuum_pages, vacpagecopy);
vpage_insert(fraged_pages, vacpagecopy);
WriteBuffer(buf); WriteBuffer(buf);
continue; continue;
} }
...@@ -697,7 +706,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -697,7 +706,9 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
free_size += (vacpage->free - sizeof(ItemIdData)); free_size += (vacpage->free - sizeof(ItemIdData));
empty_pages++; empty_pages++;
empty_end_pages++; empty_end_pages++;
reap_page(vacuum_pages, vacpage); vacpagecopy = copy_vac_page(vacpage);
vpage_insert(vacuum_pages, vacpagecopy);
vpage_insert(fraged_pages, vacpagecopy);
ReleaseBuffer(buf); ReleaseBuffer(buf);
continue; continue;
} }
...@@ -718,7 +729,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -718,7 +729,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
if (!ItemIdIsUsed(itemid)) if (!ItemIdIsUsed(itemid))
{ {
vacpage->offsets[vacpage->offsets_free++] = offnum; vacpage->offsets[vacpage->offsets_free++] = offnum;
nunused++; nunused += 1;
continue; continue;
} }
...@@ -777,7 +788,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -777,7 +788,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
* Not Aborted, Not Committed, Not in Progress - * Not Aborted, Not Committed, Not in Progress -
* so it's from crashed process. - vadim 11/26/96 * so it's from crashed process. - vadim 11/26/96
*/ */
ncrash++; ncrash += 1;
tupgone = true; tupgone = true;
} }
else else
...@@ -852,7 +863,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -852,7 +863,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
tuple.t_data->t_xmax >= XmaxRecent) tuple.t_data->t_xmax >= XmaxRecent)
{ {
tupgone = false; tupgone = false;
nkeep++; nkeep += 1;
if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED)) if (!(tuple.t_data->t_infomask & HEAP_XMAX_COMMITTED))
{ {
tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED; tuple.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
...@@ -909,7 +920,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -909,7 +920,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
pageSize = PageGetPageSize(page); pageSize = PageGetPageSize(page);
tempPage = (Page) palloc(pageSize); tempPage = (Page) palloc(pageSize);
memmove(tempPage, page, pageSize); memcpy(tempPage, page, pageSize);
} }
/* mark it unused on the temp page */ /* mark it unused on the temp page */
...@@ -917,7 +928,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -917,7 +928,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
lpp->lp_flags &= ~LP_USED; lpp->lp_flags &= ~LP_USED;
vacpage->offsets[vacpage->offsets_free++] = offnum; vacpage->offsets[vacpage->offsets_free++] = offnum;
tups_vacuumed++; tups_vacuumed += 1;
} }
else else
{ {
...@@ -931,21 +942,44 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -931,21 +942,44 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
} }
if (tempPage != (Page) NULL) if (tempPage != (Page) NULL)
{ /* Some tuples are gone */ {
/* Some tuples are removable; figure free space after removal */
PageRepairFragmentation(tempPage, NULL); PageRepairFragmentation(tempPage, NULL);
vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower; vacpage->free = ((PageHeader) tempPage)->pd_upper - ((PageHeader) tempPage)->pd_lower;
free_size += vacpage->free;
reap_page(vacuum_pages, vacpage);
pfree(tempPage); pfree(tempPage);
tempPage = (Page) NULL; do_reap = true;
} }
else if (vacpage->offsets_free > 0) else
{ /* there are only ~LP_USED line pointers */ {
/* Just use current available space */
vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower; vacpage->free = ((PageHeader) page)->pd_upper - ((PageHeader) page)->pd_lower;
/* Need to reap the page if it has ~LP_USED line pointers */
do_reap = (vacpage->offsets_free > 0);
}
free_size += vacpage->free; free_size += vacpage->free;
reap_page(vacuum_pages, vacpage); /*
* Add the page to fraged_pages if it has a useful amount of free
* space. "Useful" means enough for a minimal-sized tuple.
* But we don't know that accurately near the start of the relation,
* so add pages unconditionally if they have >= BLCKSZ/10 free space.
*/
do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ/10);
if (do_reap || do_frag)
{
vacpagecopy = copy_vac_page(vacpage);
if (do_reap)
vpage_insert(vacuum_pages, vacpagecopy);
if (do_frag)
vpage_insert(fraged_pages, vacpagecopy);
} }
if (notup)
empty_end_pages++;
else
empty_end_pages = 0;
if (pgchanged) if (pgchanged)
{ {
WriteBuffer(buf); WriteBuffer(buf);
...@@ -953,11 +987,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -953,11 +987,6 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
} }
else else
ReleaseBuffer(buf); ReleaseBuffer(buf);
if (notup)
empty_end_pages++;
else
empty_end_pages = 0;
} }
pfree(vacpage); pfree(vacpage);
...@@ -974,26 +1003,22 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -974,26 +1003,22 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
fraged_pages->empty_end_pages = empty_end_pages; fraged_pages->empty_end_pages = empty_end_pages;
/* /*
* Try to make fraged_pages keeping in mind that we can't use free * Clear the fraged_pages list if we found we couldn't shrink.
* space of "empty" end-pages and last page if it reaped. * Else, remove any "empty" end-pages from the list, and compute
* usable free space = free space in remaining pages.
*/ */
if (do_shrinking && vacuum_pages->num_pages - empty_end_pages > 0) if (do_shrinking)
{
int nusf; /* blocks usefull for re-using */
nusf = vacuum_pages->num_pages - empty_end_pages;
if ((vacuum_pages->pagedesc[nusf - 1])->blkno == nblocks - empty_end_pages - 1)
nusf--;
for (i = 0; i < nusf; i++)
{ {
vp = vacuum_pages->pagedesc[i]; Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
if (enough_space(vp, min_tlen)) fraged_pages->num_pages -= empty_end_pages;
{ usable_free_size = 0;
vpage_insert(fraged_pages, vp); for (i = 0; i < fraged_pages->num_pages; i++)
usable_free_size += vp->free; usable_free_size += fraged_pages->pagedesc[i]->free;
}
} }
else
{
fraged_pages->num_pages = 0;
usable_free_size = 0;
} }
if (usable_free_size > 0 && num_vtlinks > 0) if (usable_free_size > 0 && num_vtlinks > 0)
...@@ -1011,13 +1036,13 @@ scan_heap(VRelStats *vacrelstats, Relation onerel, ...@@ -1011,13 +1036,13 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
} }
elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \ elog(MESSAGE_LEVEL, "Pages %u: Changed %u, reaped %u, Empty %u, New %u; \
Tup %.0f: Vac %u, Keep/VTL %u/%u, Crash %u, UnUsed %u, MinLen %lu, MaxLen %lu; \ Tup %.0f: Vac %.0f, Keep/VTL %.0f/%u, Crash %.0f, UnUsed %.0f, MinLen %lu, MaxLen %lu; \
Re-using: Free/Avail. Space %lu/%lu; EndEmpty/Avail. Pages %u/%u. %s", Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u. %s",
nblocks, changed_pages, vacuum_pages->num_pages, empty_pages, nblocks, changed_pages, vacuum_pages->num_pages, empty_pages,
new_pages, num_tuples, tups_vacuumed, new_pages, num_tuples, tups_vacuumed,
nkeep, vacrelstats->num_vtlinks, ncrash, nkeep, vacrelstats->num_vtlinks, ncrash,
nunused, (unsigned long) min_tlen, (unsigned long) max_tlen, nunused, (unsigned long) min_tlen, (unsigned long) max_tlen,
(unsigned long) free_size, (unsigned long) usable_free_size, free_size, usable_free_size,
empty_end_pages, fraged_pages->num_pages, empty_end_pages, fraged_pages->num_pages,
show_rusage(&ru0)); show_rusage(&ru0));
...@@ -1043,7 +1068,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1043,7 +1068,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
CommandId myCID; CommandId myCID;
Buffer buf, Buffer buf,
cur_buffer; cur_buffer;
int nblocks, BlockNumber nblocks,
blkno; blkno;
BlockNumber last_move_dest_block = 0, BlockNumber last_move_dest_block = 0,
last_vacuum_block; last_vacuum_block;
...@@ -1093,10 +1118,19 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1093,10 +1118,19 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
Nvacpagelist.num_pages = 0; Nvacpagelist.num_pages = 0;
num_fraged_pages = fraged_pages->num_pages; num_fraged_pages = fraged_pages->num_pages;
Assert(vacuum_pages->num_pages > vacuum_pages->empty_end_pages); Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages; vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
if (vacuumed_pages > 0)
{
/* get last reaped page from vacuum_pages */
last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1]; last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
last_vacuum_block = last_vacuum_page->blkno; last_vacuum_block = last_vacuum_page->blkno;
}
else
{
last_vacuum_page = NULL;
last_vacuum_block = InvalidBlockNumber;
}
cur_buffer = InvalidBuffer; cur_buffer = InvalidBuffer;
num_moved = 0; num_moved = 0;
...@@ -1106,19 +1140,34 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1106,19 +1140,34 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
/* /*
* Scan pages backwards from the last nonempty page, trying to move * Scan pages backwards from the last nonempty page, trying to move
* tuples down to lower pages. Quit when we reach a page that we have * tuples down to lower pages. Quit when we reach a page that we have
* moved any tuples onto. Note that if a page is still in the * moved any tuples onto, or the first page if we haven't moved anything,
* fraged_pages list (list of candidate move-target pages) when we * or when we find a page we cannot completely empty (this last condition
* reach it, we will remove it from the list. This ensures we never * is handled by "break" statements within the loop).
* move a tuple up to a higher page number.
* *
* NB: this code depends on the vacuum_pages and fraged_pages lists being * NB: this code depends on the vacuum_pages and fraged_pages lists being
* in order, and on fraged_pages being a subset of vacuum_pages. * in order by blkno.
*/ */
nblocks = vacrelstats->rel_pages; nblocks = vacrelstats->rel_pages;
for (blkno = nblocks - vacuum_pages->empty_end_pages - 1; for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
blkno > last_move_dest_block; blkno > last_move_dest_block;
blkno--) blkno--)
{ {
/*
* Forget fraged_pages pages at or after this one; they're no longer
* useful as move targets, since we only want to move down. Note
* that since we stop the outer loop at last_move_dest_block, pages
* removed here cannot have had anything moved onto them already.
*/
while (num_fraged_pages > 0 &&
fraged_pages->pagedesc[num_fraged_pages-1]->blkno >= blkno)
{
Assert(fraged_pages->pagedesc[num_fraged_pages-1]->offsets_used == 0);
--num_fraged_pages;
}
/*
* Process this page of relation.
*/
buf = ReadBuffer(onerel, blkno); buf = ReadBuffer(onerel, blkno);
page = BufferGetPage(buf); page = BufferGetPage(buf);
...@@ -1127,10 +1176,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1127,10 +1176,13 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
isempty = PageIsEmpty(page); isempty = PageIsEmpty(page);
dowrite = false; dowrite = false;
if (blkno == last_vacuum_block) /* it's reaped page */
/* Is the page in the vacuum_pages list? */
if (blkno == last_vacuum_block)
{
if (last_vacuum_page->offsets_free > 0)
{ {
if (last_vacuum_page->offsets_free > 0) /* there are dead tuples */ /* there are dead tuples on this page - clean them */
{ /* on this page - clean */
Assert(!isempty); Assert(!isempty);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
vacuum_page(onerel, buf, last_vacuum_page); vacuum_page(onerel, buf, last_vacuum_page);
...@@ -1151,12 +1203,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1151,12 +1203,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
last_vacuum_page = NULL; last_vacuum_page = NULL;
last_vacuum_block = InvalidBlockNumber; last_vacuum_block = InvalidBlockNumber;
} }
if (num_fraged_pages > 0 &&
fraged_pages->pagedesc[num_fraged_pages - 1]->blkno == blkno)
{
/* page is in fraged_pages too; remove it */
--num_fraged_pages;
}
if (isempty) if (isempty)
{ {
ReleaseBuffer(buf); ReleaseBuffer(buf);
...@@ -1305,29 +1351,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1305,29 +1351,15 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
if (to_vacpage == NULL || if (to_vacpage == NULL ||
!enough_space(to_vacpage, tlen)) !enough_space(to_vacpage, tlen))
{ {
/*
* if to_vacpage no longer has enough free space
* to be useful, remove it from fraged_pages list
*/
if (to_vacpage != NULL &&
!enough_space(to_vacpage, vacrelstats->min_tlen))
{
Assert(num_fraged_pages > to_item);
memmove(fraged_pages->pagedesc + to_item,
fraged_pages->pagedesc + to_item + 1,
sizeof(VacPage) * (num_fraged_pages - to_item - 1));
num_fraged_pages--;
}
for (i = 0; i < num_fraged_pages; i++) for (i = 0; i < num_fraged_pages; i++)
{ {
if (enough_space(fraged_pages->pagedesc[i], tlen)) if (enough_space(fraged_pages->pagedesc[i], tlen))
break; break;
} }
/* can't move item anywhere */
if (i == num_fraged_pages) if (i == num_fraged_pages)
{ {
/* can't move item anywhere */
for (i = 0; i < num_vtmove; i++) for (i = 0; i < num_vtmove; i++)
{ {
Assert(vtmove[i].vacpage->offsets_used > 0); Assert(vtmove[i].vacpage->offsets_used > 0);
...@@ -1649,19 +1681,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1649,19 +1681,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
{ {
WriteBuffer(cur_buffer); WriteBuffer(cur_buffer);
cur_buffer = InvalidBuffer; cur_buffer = InvalidBuffer;
/*
* If previous target page is now too full to add *any*
* tuple to it, remove it from fraged_pages.
*/
if (!enough_space(cur_page, vacrelstats->min_tlen))
{
Assert(num_fraged_pages > cur_item);
memmove(fraged_pages->pagedesc + cur_item,
fraged_pages->pagedesc + cur_item + 1,
sizeof(VacPage) * (num_fraged_pages - cur_item - 1));
num_fraged_pages--;
}
} }
for (i = 0; i < num_fraged_pages; i++) for (i = 0; i < num_fraged_pages; i++)
{ {
...@@ -1835,7 +1854,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1835,7 +1854,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
qsort((char *) (vacpage->offsets), vacpage->offsets_free, qsort((char *) (vacpage->offsets), vacpage->offsets_free,
sizeof(OffsetNumber), vac_cmp_offno); sizeof(OffsetNumber), vac_cmp_offno);
} }
reap_page(&Nvacpagelist, vacpage); vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
WriteBuffer(buf); WriteBuffer(buf);
} }
else if (dowrite) else if (dowrite)
...@@ -1858,7 +1877,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1858,7 +1877,6 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
if (num_moved > 0) if (num_moved > 0)
{ {
/* /*
* We have to commit our tuple movings before we truncate the * We have to commit our tuple movings before we truncate the
* relation. Ideally we should do Commit/StartTransactionCommand * relation. Ideally we should do Commit/StartTransactionCommand
...@@ -1872,24 +1890,54 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1872,24 +1890,54 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
} }
/* /*
* Clean uncleaned reaped pages from vacuum_pages list list and set * We are not going to move any more tuples across pages, but we still
* xmin committed for inserted tuples * need to apply vacuum_page to compact free space in the remaining
* pages in vacuum_pages list. Note that some of these pages may also
* be in the fraged_pages list, and may have had tuples moved onto them;
* if so, we already did vacuum_page and needn't do it again.
*/ */
checked_moved = 0; for (i = 0, curpage = vacuum_pages->pagedesc;
for (i = 0, curpage = vacuum_pages->pagedesc; i < vacuumed_pages; i++, curpage++) i < vacuumed_pages;
i++, curpage++)
{ {
Assert((*curpage)->blkno < blkno); Assert((*curpage)->blkno < blkno);
if ((*curpage)->offsets_used == 0)
{
/* this page was not used as a move target, so must clean it */
buf = ReadBuffer(onerel, (*curpage)->blkno); buf = ReadBuffer(onerel, (*curpage)->blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf); page = BufferGetPage(buf);
if ((*curpage)->offsets_used == 0) /* this page was not used */
{
if (!PageIsEmpty(page)) if (!PageIsEmpty(page))
vacuum_page(onerel, buf, *curpage); vacuum_page(onerel, buf, *curpage);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf);
} }
else }
/* this page was used */
/*
* Now scan all the pages that we moved tuples onto and update
* tuple status bits. This is not really necessary, but will save time
* for future transactions examining these tuples.
*
* XXX Notice that this code fails to clear HEAP_MOVED_OFF tuples from
* pages that were move source pages but not move dest pages. One also
* wonders whether it wouldn't be better to skip this step and let the
* tuple status updates happen someplace that's not holding an exclusive
* lock on the relation.
*/
checked_moved = 0;
for (i = 0, curpage = fraged_pages->pagedesc;
i < num_fraged_pages;
i++, curpage++)
{ {
Assert((*curpage)->blkno < blkno);
if ((*curpage)->blkno > last_move_dest_block)
break; /* no need to scan any further */
if ((*curpage)->offsets_used == 0)
continue; /* this page was never used as a move dest */
buf = ReadBuffer(onerel, (*curpage)->blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
num_tuples = 0; num_tuples = 0;
max_offset = PageGetMaxOffsetNumber(page); max_offset = PageGetMaxOffsetNumber(page);
for (newoff = FirstOffsetNumber; for (newoff = FirstOffsetNumber;
...@@ -1916,11 +1964,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel, ...@@ -1916,11 +1964,10 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected"); elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
} }
} }
Assert((*curpage)->offsets_used == num_tuples);
checked_moved += num_tuples;
}
LockBuffer(buf, BUFFER_LOCK_UNLOCK); LockBuffer(buf, BUFFER_LOCK_UNLOCK);
WriteBuffer(buf); WriteBuffer(buf);
Assert((*curpage)->offsets_used == num_tuples);
checked_moved += num_tuples;
} }
Assert(num_moved == checked_moved); Assert(num_moved == checked_moved);
...@@ -2083,7 +2130,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages) ...@@ -2083,7 +2130,7 @@ vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
* tuples have correct on-row commit status on disk (see bufmgr.c's * tuples have correct on-row commit status on disk (see bufmgr.c's
* comments for FlushRelationBuffers()). * comments for FlushRelationBuffers()).
*/ */
Assert(vacrelstats->rel_pages >= (BlockNumber) vacuum_pages->empty_end_pages); Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages; relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
i = FlushRelationBuffers(onerel, relblocks); i = FlushRelationBuffers(onerel, relblocks);
...@@ -2367,34 +2414,35 @@ vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples, ...@@ -2367,34 +2414,35 @@ vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
heap_close(rd, RowExclusiveLock); heap_close(rd, RowExclusiveLock);
} }
/* /* Copy a VacPage structure */
* reap_page() -- save a page on the array of reaped pages. static VacPage
* copy_vac_page(VacPage vacpage)
* As a side effect of the way that the vacuuming loop for a given
* relation works, higher pages come after lower pages in the array
* (and highest tid on a page is last).
*/
static void
reap_page(VacPageList vacpagelist, VacPage vacpage)
{ {
VacPage newvacpage; VacPage newvacpage;
/* allocate a VacPageData entry */ /* allocate a VacPageData entry */
newvacpage = (VacPage) palloc(sizeof(VacPageData) + vacpage->offsets_free * sizeof(OffsetNumber)); newvacpage = (VacPage) palloc(sizeof(VacPageData) +
vacpage->offsets_free * sizeof(OffsetNumber));
/* fill it in */ /* fill it in */
if (vacpage->offsets_free > 0) if (vacpage->offsets_free > 0)
memmove(newvacpage->offsets, vacpage->offsets, vacpage->offsets_free * sizeof(OffsetNumber)); memcpy(newvacpage->offsets, vacpage->offsets,
vacpage->offsets_free * sizeof(OffsetNumber));
newvacpage->blkno = vacpage->blkno; newvacpage->blkno = vacpage->blkno;
newvacpage->free = vacpage->free; newvacpage->free = vacpage->free;
newvacpage->offsets_used = vacpage->offsets_used; newvacpage->offsets_used = vacpage->offsets_used;
newvacpage->offsets_free = vacpage->offsets_free; newvacpage->offsets_free = vacpage->offsets_free;
/* insert this page into vacpagelist list */ return newvacpage;
vpage_insert(vacpagelist, newvacpage);
} }
/*
* Add a VacPage pointer to a VacPageList.
*
* As a side effect of the way that scan_heap works,
* higher pages come after lower pages in the array
* (and highest tid on a page is last).
*/
static void static void
vpage_insert(VacPageList vacpagelist, VacPage vpnew) vpage_insert(VacPageList vacpagelist, VacPage vpnew)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment