Commit 88dc31e3 authored by Tom Lane's avatar Tom Lane

First cut at recycling space in btree indexes. Still some rough edges

to fix, but it seems to basically work...
parent 27854915
......@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/common/indextuple.c,v 1.63 2002/11/13 00:39:46 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/access/common/indextuple.c,v 1.64 2003/02/23 06:17:12 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -394,17 +394,16 @@ nocache_index_getattr(IndexTuple tup,
}
/*
* Copies source into target. If *target == NULL, we palloc space; otherwise
* we assume we have space that is already palloc'ed.
* Create a palloc'd copy of an index tuple.
*/
void
CopyIndexTuple(IndexTuple source, IndexTuple *target)
IndexTuple
CopyIndexTuple(IndexTuple source)
{
IndexTuple result;
Size size;
size = IndexTupleSize(source);
if (*target == NULL)
*target = (IndexTuple) palloc(size);
memmove((char *) *target, (char *) source, size);
result = (IndexTuple) palloc(size);
memcpy(result, source, size);
return result;
}
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.98 2003/02/22 00:45:03 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.99 2003/02/23 06:17:13 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -58,7 +58,6 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
int leftfree, int rightfree,
bool newitemonleft, Size firstrightitemsz);
static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
static void _bt_pgaddtup(Relation rel, Page page,
Size itemsize, BTItem btitem,
OffsetNumber itup_off, const char *where);
......@@ -666,7 +665,6 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
rightoff;
OffsetNumber maxoff;
OffsetNumber i;
BTItem lhikey;
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
origpage = BufferGetPage(buf);
......@@ -730,7 +728,6 @@ _bt_split(Relation rel, Buffer buf, OffsetNumber firstright,
itemsz = ItemIdGetLength(itemid);
item = (BTItem) PageGetItem(origpage, itemid);
}
lhikey = item;
if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
LP_USED) == InvalidOffsetNumber)
elog(PANIC, "btree: failed to add hikey to the left sibling");
......@@ -1262,7 +1259,7 @@ _bt_insert_parent(Relation rel,
*
* Returns InvalidBuffer if item not found (should not happen).
*/
static Buffer
Buffer
_bt_getstackbuf(Relation rel, BTStack stack, int access)
{
BlockNumber blkno;
......
......@@ -9,7 +9,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.60 2003/02/22 00:45:04 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtpage.c,v 1.61 2003/02/23 06:17:13 tgl Exp $
*
* NOTES
* Postgres btree pages look like ordinary relation pages. The opaque
......@@ -24,6 +24,7 @@
#include "access/nbtree.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
......@@ -391,7 +392,38 @@ _bt_getbuf(Relation rel, BlockNumber blkno, int access)
bool needLock;
Page page;
/* XXX soon: ask FSM about free space */
Assert(access == BT_WRITE);
/*
* First see if the FSM knows of any free pages.
*
* We can't trust the FSM's report unreservedly; we have to check
* that the page is still free. (For example, an already-free page
* could have been re-used between the time the last VACUUM scanned
* it and the time the VACUUM made its FSM updates.)
*
* The request size should be more than half of what btvacuumcleanup
* logs as the per-page free space. We use BLCKSZ/2 and BLCKSZ-1
* to try to get some use out of FSM's space management algorithm.
* XXX this needs some more thought...
*/
for (;;)
{
blkno = GetPageWithFreeSpace(&rel->rd_node, BLCKSZ/2);
if (blkno == InvalidBlockNumber)
break;
buf = ReadBuffer(rel, blkno);
LockBuffer(buf, access);
page = BufferGetPage(buf);
if (_bt_page_recyclable(page))
{
/* Okay to use page. Re-initialize and return it */
_bt_pageinit(page, BufferGetPageSize(buf));
return buf;
}
elog(DEBUG1, "_bt_getbuf: FSM returned nonrecyclable page");
_bt_relbuf(rel, buf);
}
/*
* Extend the relation by one page.
......@@ -487,6 +519,36 @@ _bt_pageinit(Page page, Size size)
PageInit(page, size, sizeof(BTPageOpaqueData));
}
/*
* _bt_page_recyclable() -- Is an existing page recyclable?
*
* This exists to make sure _bt_getbuf and btvacuumcleanup have the same
* policy about whether a page is safe to re-use.
*/
bool
_bt_page_recyclable(Page page)
{
BTPageOpaque opaque;
/*
* It's possible to find an all-zeroes page in an index --- for example,
* a backend might successfully extend the relation one page and then
* crash before it is able to make a WAL entry for adding the page.
* If we find a zeroed page then reclaim it.
*/
if (PageIsNew(page))
return true;
/*
* Otherwise, recycle if deleted and too old to have any processes
* interested in it.
*/
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISDELETED(opaque) &&
TransactionIdPrecedesOrEquals(opaque->btpo.xact, RecentGlobalXmin))
return true;
return false;
}
/*
* _bt_metaproot() -- Change the root page of the btree.
*
......@@ -605,3 +667,426 @@ _bt_itemdel(Relation rel, Buffer buf, ItemPointer tid)
END_CRIT_SECTION();
}
/*
* _bt_pagedel() -- Delete a page from the b-tree.
*
* This action unlinks the page from the b-tree structure, removing all
* pointers leading to it --- but not touching its own left and right links.
* The page cannot be physically reclaimed right away, since other processes
* may currently be trying to follow links leading to the page; they have to
* be allowed to use its right-link to recover. See nbtree/README.
*
* On entry, the target buffer must be pinned and read-locked. This lock and
* pin will be dropped before exiting.
*
* Returns the number of pages successfully deleted (zero on failure; could
* be more than one if parent blocks were deleted).
*
* NOTE: this leaks memory. Rather than trying to clean up everything
* carefully, it's better to run it in a temp context that can be reset
* frequently.
*/
int
_bt_pagedel(Relation rel, Buffer buf, bool vacuum_full)
{
BlockNumber target,
leftsib,
rightsib,
parent;
OffsetNumber poffset,
maxoff;
uint32 targetlevel,
ilevel;
ItemId itemid;
BTItem targetkey,
btitem;
ScanKey itup_scankey;
BTStack stack;
Buffer lbuf,
rbuf,
pbuf;
bool parent_half_dead;
bool parent_one_child;
bool rightsib_empty;
Buffer metabuf = InvalidBuffer;
Page metapg = NULL;
BTMetaPageData *metad = NULL;
Page page;
BTPageOpaque opaque;
/*
* We can never delete rightmost pages nor root pages. While at it,
* check that page is not already deleted and is empty.
*/
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
{
_bt_relbuf(rel, buf);
return 0;
}
/*
* Save info about page, including a copy of its high key (it must
* have one, being non-rightmost).
*/
target = BufferGetBlockNumber(buf);
targetlevel = opaque->btpo.level;
leftsib = opaque->btpo_prev;
itemid = PageGetItemId(page, P_HIKEY);
targetkey = CopyBTItem((BTItem) PageGetItem(page, itemid));
/*
* We need to get an approximate pointer to the page's parent page.
* Use the standard search mechanism to search for the page's high key;
* this will give us a link to either the current parent or someplace
* to its left (if there are multiple equal high keys). To avoid
* deadlocks, we'd better drop the target page lock first.
*/
_bt_relbuf(rel, buf);
/* we need a scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, &(targetkey->bti_itup));
/* find the leftmost leaf page containing this key */
stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
&lbuf, BT_READ);
/* don't need a pin on that either */
_bt_relbuf(rel, lbuf);
/*
* If we are trying to delete an interior page, _bt_search did more
* than we needed. Locate the stack item pointing to our parent level.
*/
ilevel = 0;
for (;;)
{
if (stack == NULL)
elog(ERROR, "_bt_pagedel: not enough stack items");
if (ilevel == targetlevel)
break;
stack = stack->bts_parent;
ilevel++;
}
/*
* We have to lock the pages we need to modify in the standard order:
* moving right, then up. Else we will deadlock against other writers.
*
* So, we need to find and write-lock the current left sibling of the
* target page. The sibling that was current a moment ago could have
* split, so we may have to move right. This search could fail if
* either the sibling or the target page was deleted by someone else
* meanwhile; if so, give up. (Right now, that should never happen,
* since page deletion is only done in VACUUM and there shouldn't be
* multiple VACUUMs concurrently on the same table.)
*/
if (leftsib != P_NONE)
{
lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
page = BufferGetPage(lbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
while (P_ISDELETED(opaque) || opaque->btpo_next != target)
{
/* step right one page */
leftsib = opaque->btpo_next;
_bt_relbuf(rel, lbuf);
if (leftsib == P_NONE)
{
elog(LOG, "_bt_pagedel: no left sibling (concurrent deletion?)");
return 0;
}
lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
page = BufferGetPage(lbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
}
}
else
lbuf = InvalidBuffer;
/*
* Next write-lock the target page itself. It should be okay to take just
* a write lock not a superexclusive lock, since no scans would stop on an
* empty page.
*/
buf = _bt_getbuf(rel, target, BT_WRITE);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
/*
* Check page is still empty etc, else abandon deletion. The empty check
* is necessary since someone else might have inserted into it while
* we didn't have it locked; the others are just for paranoia's sake.
*/
if (P_RIGHTMOST(opaque) || P_ISROOT(opaque) || P_ISDELETED(opaque) ||
P_FIRSTDATAKEY(opaque) <= PageGetMaxOffsetNumber(page))
{
_bt_relbuf(rel, buf);
if (BufferIsValid(lbuf))
_bt_relbuf(rel, lbuf);
return 0;
}
if (opaque->btpo_prev != leftsib)
elog(ERROR, "_bt_pagedel: left link changed unexpectedly");
/*
* And next write-lock the (current) right sibling.
*/
rightsib = opaque->btpo_next;
rbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
/*
* Next find and write-lock the current parent of the target page.
* This is essentially the same as the corresponding step of splitting.
*/
ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
target, P_HIKEY);
pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
if (pbuf == InvalidBuffer)
elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
"\n\tRecreate index %s.", RelationGetRelationName(rel));
parent = stack->bts_blkno;
poffset = stack->bts_offset;
/*
* If the target is the rightmost child of its parent, then we can't
* delete, unless it's also the only child --- in which case the parent
* changes to half-dead status.
*/
page = BufferGetPage(pbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
parent_half_dead = false;
parent_one_child = false;
if (poffset >= maxoff)
{
if (poffset == P_FIRSTDATAKEY(opaque))
parent_half_dead = true;
else
{
_bt_relbuf(rel, pbuf);
_bt_relbuf(rel, rbuf);
_bt_relbuf(rel, buf);
if (BufferIsValid(lbuf))
_bt_relbuf(rel, lbuf);
return 0;
}
}
else
{
/* Will there be exactly one child left in this parent? */
if (OffsetNumberNext(P_FIRSTDATAKEY(opaque)) == maxoff)
parent_one_child = true;
}
/*
* If we are deleting the next-to-last page on the target's level,
* then the rightsib is a candidate to become the new fast root.
* (In theory, it might be possible to push the fast root even further
* down, but the odds of doing so are slim, and the locking considerations
* daunting.)
*
* We can safely acquire a lock on the metapage here --- see comments for
* _bt_newroot().
*/
if (leftsib == P_NONE)
{
page = BufferGetPage(rbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->btpo.level == targetlevel);
if (P_RIGHTMOST(opaque))
{
/* rightsib will be the only one left on the level */
metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
metapg = BufferGetPage(metabuf);
metad = BTPageGetMeta(metapg);
/*
* The expected case here is btm_fastlevel == targetlevel+1;
* if the fastlevel is <= targetlevel, something is wrong, and we
* choose to overwrite it to fix it.
*/
if (metad->btm_fastlevel > targetlevel+1)
{
/* no update wanted */
_bt_relbuf(rel, metabuf);
metabuf = InvalidBuffer;
}
}
}
/*
* Here we begin doing the deletion.
*/
/* No elog(ERROR) until changes are logged */
START_CRIT_SECTION();
/*
* Update parent. The normal case is a tad tricky because we want to
* delete the target's downlink and the *following* key. Easiest way is
* to copy the right sibling's downlink over the target downlink, and then
* delete the following item.
*/
page = BufferGetPage(pbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (parent_half_dead)
{
PageIndexTupleDelete(page, poffset);
opaque->btpo_flags |= BTP_HALF_DEAD;
}
else
{
OffsetNumber nextoffset;
itemid = PageGetItemId(page, poffset);
btitem = (BTItem) PageGetItem(page, itemid);
Assert(ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)) == target);
ItemPointerSet(&(btitem->bti_itup.t_tid), rightsib, P_HIKEY);
nextoffset = OffsetNumberNext(poffset);
/* This part is just for double-checking */
itemid = PageGetItemId(page, nextoffset);
btitem = (BTItem) PageGetItem(page, itemid);
if (ItemPointerGetBlockNumber(&(btitem->bti_itup.t_tid)) != rightsib)
elog(PANIC, "_bt_pagedel: right sibling is not next child");
PageIndexTupleDelete(page, nextoffset);
}
/*
* Update siblings' side-links. Note the target page's side-links will
* continue to point to the siblings.
*/
if (BufferIsValid(lbuf))
{
page = BufferGetPage(lbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->btpo_next == target);
opaque->btpo_next = rightsib;
}
page = BufferGetPage(rbuf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
Assert(opaque->btpo_prev == target);
opaque->btpo_prev = leftsib;
rightsib_empty = (P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page));
/*
* Mark the page itself deleted. It can be recycled when all current
* transactions are gone; or immediately if we're doing VACUUM FULL.
*/
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
opaque->btpo_flags |= BTP_DELETED;
opaque->btpo.xact =
vacuum_full ? FrozenTransactionId : ReadNewTransactionId();
/* And update the metapage, if needed */
if (BufferIsValid(metabuf))
{
metad->btm_fastroot = rightsib;
metad->btm_fastlevel = targetlevel;
}
/* XLOG stuff */
if (!rel->rd_istemp)
{
xl_btree_delete_page xlrec;
xl_btree_metadata xlmeta;
uint8 xlinfo;
XLogRecPtr recptr;
XLogRecData rdata[5];
XLogRecData *nextrdata;
xlrec.target.node = rel->rd_node;
ItemPointerSet(&(xlrec.target.tid), parent, poffset);
xlrec.deadblk = target;
xlrec.leftblk = leftsib;
xlrec.rightblk = rightsib;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = SizeOfBtreeDeletePage;
rdata[0].next = nextrdata = &(rdata[1]);
if (BufferIsValid(metabuf))
{
xlmeta.root = metad->btm_root;
xlmeta.level = metad->btm_level;
xlmeta.fastroot = metad->btm_fastroot;
xlmeta.fastlevel = metad->btm_fastlevel;
nextrdata->buffer = InvalidBuffer;
nextrdata->data = (char *) &xlmeta;
nextrdata->len = sizeof(xl_btree_metadata);
nextrdata->next = nextrdata + 1;
nextrdata++;
xlinfo = XLOG_BTREE_DELETE_PAGE_META;
}
else
xlinfo = XLOG_BTREE_DELETE_PAGE;
nextrdata->buffer = pbuf;
nextrdata->data = NULL;
nextrdata->len = 0;
nextrdata->next = nextrdata + 1;
nextrdata++;
nextrdata->buffer = rbuf;
nextrdata->data = NULL;
nextrdata->len = 0;
nextrdata->next = NULL;
if (BufferIsValid(lbuf))
{
nextrdata->next = nextrdata + 1;
nextrdata++;
nextrdata->buffer = lbuf;
nextrdata->data = NULL;
nextrdata->len = 0;
nextrdata->next = NULL;
}
recptr = XLogInsert(RM_BTREE_ID, xlinfo, rdata);
if (BufferIsValid(metabuf))
{
PageSetLSN(metapg, recptr);
PageSetSUI(metapg, ThisStartUpID);
}
page = BufferGetPage(pbuf);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
page = BufferGetPage(rbuf);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
page = BufferGetPage(buf);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
if (BufferIsValid(lbuf))
{
page = BufferGetPage(lbuf);
PageSetLSN(page, recptr);
PageSetSUI(page, ThisStartUpID);
}
}
END_CRIT_SECTION();
/* Write and release buffers */
if (BufferIsValid(metabuf))
_bt_wrtbuf(rel, metabuf);
_bt_wrtbuf(rel, pbuf);
_bt_wrtbuf(rel, rbuf);
_bt_wrtbuf(rel, buf);
if (BufferIsValid(lbuf))
_bt_wrtbuf(rel, lbuf);
/*
* If parent became half dead, recurse to try to delete it. Otherwise,
* if right sibling is empty and is now the last child of the parent,
* recurse to try to delete it. (These cases cannot apply at the same
* time, though the second case might itself recurse to the first.)
*/
if (parent_half_dead)
{
buf = _bt_getbuf(rel, parent, BT_READ);
return _bt_pagedel(rel, buf, vacuum_full) + 1;
}
if (parent_one_child && rightsib_empty)
{
buf = _bt_getbuf(rel, rightsib, BT_READ);
return _bt_pagedel(rel, buf, vacuum_full) + 1;
}
return 1;
}
......@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.96 2003/02/22 00:45:04 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.97 2003/02/23 06:17:13 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -710,15 +710,16 @@ Datum
btvacuumcleanup(PG_FUNCTION_ARGS)
{
Relation rel = (Relation) PG_GETARG_POINTER(0);
#ifdef NOT_USED
IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
#endif
IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
BlockNumber num_pages;
BlockNumber blkno;
PageFreeSpaceInfo *pageSpaces;
int nFreePages,
maxFreePages;
BlockNumber pages_deleted = 0;
MemoryContext mycontext;
MemoryContext oldcontext;
Assert(stats != NULL);
......@@ -731,6 +732,13 @@ btvacuumcleanup(PG_FUNCTION_ARGS)
pageSpaces = (PageFreeSpaceInfo *) palloc(maxFreePages * sizeof(PageFreeSpaceInfo));
nFreePages = 0;
/* Create a temporary memory context to run _bt_pagedel in */
mycontext = AllocSetContextCreate(CurrentMemoryContext,
"_bt_pagedel",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/*
* Scan through all pages of index, except metapage. (Any pages added
* after we start the scan will not be examined; this should be fine,
......@@ -745,17 +753,53 @@ btvacuumcleanup(PG_FUNCTION_ARGS)
buf = _bt_getbuf(rel, blkno, BT_READ);
page = BufferGetPage(buf);
opaque = (BTPageOpaque) PageGetSpecialPointer(page);
if (P_ISDELETED(opaque))
if (_bt_page_recyclable(page))
{
/* XXX if safe-to-reclaim... */
/* Okay to recycle this page */
if (nFreePages < maxFreePages)
{
pageSpaces[nFreePages].blkno = blkno;
/* The avail-space value is bogus, but must be < BLCKSZ */
/* claimed avail-space must be < BLCKSZ */
pageSpaces[nFreePages].avail = BLCKSZ-1;
nFreePages++;
}
}
else if ((opaque->btpo_flags & BTP_HALF_DEAD) ||
P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page))
{
/* Empty, try to delete */
int ndel;
/* Run pagedel in a temp context to avoid memory leakage */
MemoryContextReset(mycontext);
oldcontext = MemoryContextSwitchTo(mycontext);
ndel = _bt_pagedel(rel, buf, info->vacuum_full);
pages_deleted += ndel;
/*
* During VACUUM FULL it's okay to recycle deleted pages
* immediately, since there can be no other transactions
* scanning the index. Note that we will only recycle the
* current page and not any parent pages that _bt_pagedel
* might have recursed to; this seems reasonable in the name
* of simplicity. (Trying to do otherwise would mean we'd
* have to sort the list of recyclable pages we're building.)
*/
if (ndel && info->vacuum_full)
{
if (nFreePages < maxFreePages)
{
pageSpaces[nFreePages].blkno = blkno;
/* claimed avail-space must be < BLCKSZ */
pageSpaces[nFreePages].avail = BLCKSZ-1;
nFreePages++;
}
}
MemoryContextSwitchTo(oldcontext);
continue; /* pagedel released buffer */
}
_bt_relbuf(rel, buf);
}
......@@ -768,6 +812,13 @@ btvacuumcleanup(PG_FUNCTION_ARGS)
pfree(pageSpaces);
MemoryContextDelete(mycontext);
if (pages_deleted > 0)
elog(info->message_level, "Index %s: %u pages, deleted %u; %u now free",
RelationGetRelationName(rel),
num_pages, pages_deleted, nFreePages);
/* update statistics */
stats->num_pages = num_pages;
stats->pages_free = nFreePages;
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.1 2003/02/21 00:06:21 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtxlog.c,v 1.2 2003/02/23 06:17:13 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -403,6 +403,171 @@ btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
UnlockAndWriteBuffer(buffer);
}
static void
btree_xlog_delete_page(bool redo, bool ismeta,
XLogRecPtr lsn, XLogRecord *record)
{
xl_btree_delete_page *xlrec = (xl_btree_delete_page *) XLogRecGetData(record);
Relation reln;
BlockNumber parent;
BlockNumber target;
BlockNumber leftsib;
BlockNumber rightsib;
Buffer buffer;
Page page;
BTPageOpaque pageop;
char *op = (redo) ? "redo" : "undo";
reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
if (!RelationIsValid(reln))
return;
parent = ItemPointerGetBlockNumber(&(xlrec->target.tid));
target = xlrec->deadblk;
leftsib = xlrec->leftblk;
rightsib = xlrec->rightblk;
/* parent page */
if (redo && !(record->xl_info & XLR_BKP_BLOCK_1))
{
buffer = XLogReadBuffer(false, reln, parent);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_delete_page_redo: parent block unfound");
page = (Page) BufferGetPage(buffer);
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_delete_page_redo: uninitialized parent page");
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
}
else
{
OffsetNumber poffset;
poffset = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
if (poffset >= PageGetMaxOffsetNumber(page))
{
Assert(poffset == P_FIRSTDATAKEY(pageop));
PageIndexTupleDelete(page, poffset);
pageop->btpo_flags |= BTP_HALF_DEAD;
}
else
{
ItemId itemid;
BTItem btitem;
OffsetNumber nextoffset;
itemid = PageGetItemId(page, poffset);
btitem = (BTItem) PageGetItem(page, itemid);
ItemPointerSet(&(btitem->bti_itup.t_tid), rightsib, P_HIKEY);
nextoffset = OffsetNumberNext(poffset);
PageIndexTupleDelete(page, nextoffset);
}
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
/* Fix left-link of right sibling */
if (redo && !(record->xl_info & XLR_BKP_BLOCK_2))
{
buffer = XLogReadBuffer(false, reln, rightsib);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_delete_page_redo: lost right sibling");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_delete_page_redo: uninitialized right sibling");
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
}
else
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_prev = leftsib;
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
/* Fix right-link of left sibling, if any */
if (redo && !(record->xl_info & XLR_BKP_BLOCK_3))
{
if (leftsib != P_NONE)
{
buffer = XLogReadBuffer(false, reln, leftsib);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_delete_page_redo: lost left sibling");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_delete_page_redo: uninitialized left sibling");
if (XLByteLE(lsn, PageGetLSN(page)))
{
UnlockAndReleaseBuffer(buffer);
}
else
{
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
pageop->btpo_next = rightsib;
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
}
}
/* Rewrite target page as empty deleted page */
buffer = XLogReadBuffer(false, reln, target);
if (!BufferIsValid(buffer))
elog(PANIC, "btree_delete_page_%s: lost target page", op);
page = (Page) BufferGetPage(buffer);
if (redo)
_bt_pageinit(page, BufferGetPageSize(buffer));
else if (PageIsNew((PageHeader) page))
elog(PANIC, "btree_delete_page_undo: uninitialized target page");
pageop = (BTPageOpaque) PageGetSpecialPointer(page);
if (redo)
{
pageop->btpo_prev = leftsib;
pageop->btpo_next = rightsib;
pageop->btpo.xact = FrozenTransactionId;
pageop->btpo_flags = BTP_DELETED;
PageSetLSN(page, lsn);
PageSetSUI(page, ThisStartUpID);
UnlockAndWriteBuffer(buffer);
}
else
{
/* undo */
if (XLByteLT(PageGetLSN(page), lsn))
elog(PANIC, "btree_delete_page_undo: bad left sibling LSN");
elog(PANIC, "btree_delete_page_undo: unimplemented");
}
/* Update metapage if needed */
if (redo) /* metapage changes not undoable */
{
if (ismeta)
{
xl_btree_metadata md;
memcpy(&md, (char *) xlrec + SizeOfBtreeDeletePage,
sizeof(xl_btree_metadata));
_bt_restore_meta(reln, lsn,
md.root, md.level,
md.fastroot, md.fastlevel);
}
}
}
static void
btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
......@@ -534,8 +699,10 @@ btree_redo(XLogRecPtr lsn, XLogRecord *record)
btree_xlog_delete(true, lsn, record);
break;
case XLOG_BTREE_DELETE_PAGE:
btree_xlog_delete_page(true, false, lsn, record);
break;
case XLOG_BTREE_DELETE_PAGE_META:
// ???
btree_xlog_delete_page(true, true, lsn, record);
break;
case XLOG_BTREE_NEWROOT:
btree_xlog_newroot(true, lsn, record);
......@@ -583,8 +750,10 @@ btree_undo(XLogRecPtr lsn, XLogRecord *record)
btree_xlog_delete(false, lsn, record);
break;
case XLOG_BTREE_DELETE_PAGE:
btree_xlog_delete_page(false, false, lsn, record);
break;
case XLOG_BTREE_DELETE_PAGE_META:
// ???
btree_xlog_delete_page(false, true, lsn, record);
break;
case XLOG_BTREE_NEWROOT:
btree_xlog_newroot(false, lsn, record);
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.14 2002/09/20 19:56:01 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/freespace/freespace.c,v 1.15 2003/02/23 06:17:13 tgl Exp $
*
*
* NOTES:
......@@ -681,7 +681,9 @@ free_chunk_chain(FSMChunk *fchunk)
* Look to see if a page with at least the specified amount of space is
* available in the given FSMRelation. If so, return its page number,
* and advance the nextPage counter so that the next inquiry will return
* a different page if possible. Return InvalidBlockNumber if no success.
* a different page if possible; also update the entry to show that the
* requested space is not available anymore. Return InvalidBlockNumber
* if no success.
*/
static BlockNumber
find_free_space(FSMRelation *fsmrel, Size spaceNeeded)
......@@ -713,6 +715,12 @@ find_free_space(FSMRelation *fsmrel, Size spaceNeeded)
/* Check the next page */
if ((Size) curChunk->bytes[chunkRelIndex] >= spaceNeeded)
{
/*
* Found what we want --- adjust the entry. In theory we could
* delete the entry immediately if it drops below threshold,
* but it seems better to wait till we next need space.
*/
curChunk->bytes[chunkRelIndex] -= (ItemLength) spaceNeeded;
fsmrel->nextPage = pageIndex + 1;
return curChunk->pages[chunkRelIndex];
}
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: itup.h,v 1.36 2002/08/25 17:20:01 tgl Exp $
* $Id: itup.h,v 1.37 2003/02/23 06:17:13 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -133,11 +133,11 @@ typedef InsertIndexResultData *InsertIndexResult;
)
/* indextuple.h */
/* routines in indextuple.c */
extern IndexTuple index_formtuple(TupleDesc tupleDescriptor,
Datum *value, char *null);
extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
TupleDesc tupleDesc, bool *isnull);
extern void CopyIndexTuple(IndexTuple source, IndexTuple *target);
extern IndexTuple CopyIndexTuple(IndexTuple source);
#endif /* ITUP_H */
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: nbtree.h,v 1.65 2003/02/22 00:45:05 tgl Exp $
* $Id: nbtree.h,v 1.66 2003/02/23 06:17:13 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -118,6 +118,8 @@ typedef struct BTItemData
typedef BTItemData *BTItem;
#define CopyBTItem(btitem) ((BTItem) CopyIndexTuple((IndexTuple) (btitem)))
/*
* For XLOG: size without alignment. Sizeof works as long as
* IndexTupleData has exactly 8 bytes.
......@@ -434,6 +436,7 @@ extern Datum btvacuumcleanup(PG_FUNCTION_ARGS);
*/
extern InsertIndexResult _bt_doinsert(Relation rel, BTItem btitem,
bool index_is_unique, Relation heapRel);
extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
extern void _bt_insert_parent(Relation rel, Buffer buf, Buffer rbuf,
BTStack stack, bool is_root, bool is_only);
......@@ -448,8 +451,10 @@ extern void _bt_relbuf(Relation rel, Buffer buf);
extern void _bt_wrtbuf(Relation rel, Buffer buf);
extern void _bt_wrtnorelbuf(Relation rel, Buffer buf);
extern void _bt_pageinit(Page page, Size size);
extern bool _bt_page_recyclable(Page page);
extern void _bt_metaproot(Relation rel, BlockNumber rootbknum, uint32 level);
extern void _bt_itemdel(Relation rel, Buffer buf, ItemPointer tid);
extern int _bt_pagedel(Relation rel, Buffer buf, bool vacuum_full);
/*
* prototypes for functions in nbtsearch.c
......@@ -488,7 +493,6 @@ extern BTItem _bt_formitem(IndexTuple itup);
/*
* prototypes for functions in nbtsort.c
*/
typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */
extern BTSpool *_bt_spoolinit(Relation index, bool isunique);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment