Commit 24992c6d authored by Tom Lane's avatar Tom Lane

Rewrite PageIndexDeleteNoCompact into a form that only deletes 1 tuple.

The full generality of deleting an arbitrary number of tuples is no longer
needed, so let's save some code and cycles by replacing the original coding
with an implementation based on PageIndexTupleDelete.

We can always get back the old code from git if we need it again for new
callers (though I don't care for its willingness to mess with line pointers
it wasn't told to mess with).

Discussion: <552.1473445163@sss.pgh.pa.us>
parent 1a4be103
...@@ -245,7 +245,7 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange, ...@@ -245,7 +245,7 @@ brin_doupdate(Relation idxrel, BlockNumber pagesPerRange,
if (extended) if (extended)
brin_page_init(BufferGetPage(newbuf), BRIN_PAGETYPE_REGULAR); brin_page_init(BufferGetPage(newbuf), BRIN_PAGETYPE_REGULAR);
PageIndexDeleteNoCompact(oldpage, &oldoff, 1); PageIndexTupleDeleteNoCompact(oldpage, oldoff);
newoff = PageAddItem(newpage, (Item) newtup, newsz, newoff = PageAddItem(newpage, (Item) newtup, newsz,
InvalidOffsetNumber, false, false); InvalidOffsetNumber, false, false);
if (newoff == InvalidOffsetNumber) if (newoff == InvalidOffsetNumber)
......
...@@ -148,10 +148,8 @@ brin_xlog_update(XLogReaderState *record) ...@@ -148,10 +148,8 @@ brin_xlog_update(XLogReaderState *record)
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
offnum = xlrec->oldOffnum; offnum = xlrec->oldOffnum;
if (PageGetMaxOffsetNumber(page) + 1 < offnum)
elog(PANIC, "brin_xlog_update: invalid max offset number");
PageIndexDeleteNoCompact(page, &offnum, 1); PageIndexTupleDeleteNoCompact(page, offnum);
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
......
...@@ -415,8 +415,7 @@ PageRestoreTempPage(Page tempPage, Page oldPage) ...@@ -415,8 +415,7 @@ PageRestoreTempPage(Page tempPage, Page oldPage)
} }
/* /*
* sorting support for PageRepairFragmentation, PageIndexMultiDelete, * sorting support for PageRepairFragmentation and PageIndexMultiDelete
* PageIndexDeleteNoCompact
*/ */
typedef struct itemIdSortData typedef struct itemIdSortData
{ {
...@@ -762,15 +761,14 @@ PageIndexTupleDelete(Page page, OffsetNumber offnum) ...@@ -762,15 +761,14 @@ PageIndexTupleDelete(Page page, OffsetNumber offnum)
* Now move everything between the old upper bound (beginning of tuple * Now move everything between the old upper bound (beginning of tuple
* space) and the beginning of the deleted tuple forward, so that space in * space) and the beginning of the deleted tuple forward, so that space in
* the middle of the page is left free. If we've just deleted the tuple * the middle of the page is left free. If we've just deleted the tuple
* at the beginning of tuple space, then there's no need to do the copy * at the beginning of tuple space, then there's no need to do the copy.
* (and bcopy on some architectures SEGV's if asked to move zero bytes).
*/ */
/* beginning of tuple space */ /* beginning of tuple space */
addr = (char *) page + phdr->pd_upper; addr = (char *) page + phdr->pd_upper;
if (offset > phdr->pd_upper) if (offset > phdr->pd_upper)
memmove(addr + size, addr, (int) (offset - phdr->pd_upper)); memmove(addr + size, addr, offset - phdr->pd_upper);
/* adjust free space boundary pointers */ /* adjust free space boundary pointers */
phdr->pd_upper += size; phdr->pd_upper += size;
...@@ -918,155 +916,105 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems) ...@@ -918,155 +916,105 @@ PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems)
compactify_tuples(itemidbase, nused, page); compactify_tuples(itemidbase, nused, page);
} }
/* /*
* PageIndexDeleteNoCompact * PageIndexTupleDeleteNoCompact
* Delete the given items for an index page, and defragment the resulting
* free space, but do not compact the item pointers array.
*
* itemnos is the array of tuples to delete; nitems is its size. maxIdxTuples
* is the maximum number of tuples that can exist in a page.
* *
* Unused items at the end of the array are removed. * Remove the specified tuple from an index page, but set its line pointer
* to "unused" instead of compacting it out, except that it can be removed
* if it's the last line pointer on the page.
* *
* This is used for index AMs that require that existing TIDs of live tuples * This is used for index AMs that require that existing TIDs of live tuples
* remain unchanged. * remain unchanged, and are willing to allow unused line pointers instead.
*/ */
void void
PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, int nitems) PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offnum)
{ {
PageHeader phdr = (PageHeader) page; PageHeader phdr = (PageHeader) page;
LocationIndex pd_lower = phdr->pd_lower; char *addr;
LocationIndex pd_upper = phdr->pd_upper; ItemId tup;
LocationIndex pd_special = phdr->pd_special; Size size;
unsigned offset;
int nline; int nline;
bool empty;
OffsetNumber offnum;
int nextitm;
/* /*
* As with PageRepairFragmentation, paranoia seems justified. * As with PageRepairFragmentation, paranoia seems justified.
*/ */
if (pd_lower < SizeOfPageHeaderData || if (phdr->pd_lower < SizeOfPageHeaderData ||
pd_lower > pd_upper || phdr->pd_lower > phdr->pd_upper ||
pd_upper > pd_special || phdr->pd_upper > phdr->pd_special ||
pd_special > BLCKSZ || phdr->pd_special > BLCKSZ ||
pd_special != MAXALIGN(pd_special)) phdr->pd_special != MAXALIGN(phdr->pd_special))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u", errmsg("corrupted page pointers: lower = %u, upper = %u, special = %u",
pd_lower, pd_upper, pd_special))); phdr->pd_lower, phdr->pd_upper, phdr->pd_special)));
/*
* Scan the existing item pointer array and mark as unused those that are
* in our kill-list; make sure any non-interesting ones are marked unused
* as well.
*/
nline = PageGetMaxOffsetNumber(page); nline = PageGetMaxOffsetNumber(page);
empty = true; if ((int) offnum <= 0 || (int) offnum > nline)
nextitm = 0; elog(ERROR, "invalid index offnum: %u", offnum);
for (offnum = FirstOffsetNumber; offnum <= nline; offnum = OffsetNumberNext(offnum))
{
ItemId lp;
ItemLength itemlen;
ItemOffset offset;
lp = PageGetItemId(page, offnum);
itemlen = ItemIdGetLength(lp); tup = PageGetItemId(page, offnum);
offset = ItemIdGetOffset(lp); Assert(ItemIdHasStorage(tup));
size = ItemIdGetLength(tup);
offset = ItemIdGetOffset(tup);
if (ItemIdIsUsed(lp)) if (offset < phdr->pd_upper || (offset + size) > phdr->pd_special ||
{
if (offset < pd_upper ||
(offset + itemlen) > pd_special ||
offset != MAXALIGN(offset)) offset != MAXALIGN(offset))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATA_CORRUPTED), (errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item pointer: offset = %u, length = %u", errmsg("corrupted item pointer: offset = %u, size = %u",
offset, (unsigned int) itemlen))); offset, (unsigned int) size)));
if (nextitm < nitems && offnum == itemnos[nextitm])
{
/* this one is on our list to delete, so mark it unused */
ItemIdSetUnused(lp);
nextitm++;
}
else if (ItemIdHasStorage(lp))
{
/* This one's live -- must do the compaction dance */
empty = false;
}
else
{
/* get rid of this one too */
ItemIdSetUnused(lp);
}
}
}
/* this will catch invalid or out-of-order itemnos[] */ /* Amount of space to actually be deleted */
if (nextitm != nitems) size = MAXALIGN(size);
elog(ERROR, "incorrect index offsets supplied");
if (empty) /*
{ * Either set the item pointer to "unused", or zap it if it's the last
/* Page is completely empty, so just reset it quickly */ * one. (Note: it's possible that the next-to-last one(s) are already
phdr->pd_lower = SizeOfPageHeaderData; * unused, but we do not trouble to try to compact them out if so.)
phdr->pd_upper = pd_special; */
} if ((int) offnum < nline)
ItemIdSetUnused(tup);
else else
{ {
/* There are live items: need to compact the page the hard way */ phdr->pd_lower -= sizeof(ItemIdData);
itemIdSortData itemidbase[MaxOffsetNumber]; nline--; /* there's one less than when we started */
itemIdSort itemidptr; }
int i;
Size totallen;
/* /*
* Scan the page taking note of each item that we need to preserve. * Now move everything between the old upper bound (beginning of tuple
* This includes both live items (those that contain data) and * space) and the beginning of the deleted tuple forward, so that space in
* interspersed unused ones. It's critical to preserve these unused * the middle of the page is left free. If we've just deleted the tuple
* items, because otherwise the offset numbers for later live items * at the beginning of tuple space, then there's no need to do the copy.
* would change, which is not acceptable. Unused items might get used
* again later; that is fine.
*/ */
itemidptr = itemidbase;
totallen = 0;
PageClearHasFreeLinePointers(page);
for (i = 0; i < nline; i++)
{
ItemId lp;
itemidptr->offsetindex = i; /* beginning of tuple space */
addr = (char *) page + phdr->pd_upper;
lp = PageGetItemId(page, i + 1); if (offset > phdr->pd_upper)
if (ItemIdHasStorage(lp)) memmove(addr + size, addr, offset - phdr->pd_upper);
{
itemidptr->itemoff = ItemIdGetOffset(lp);
itemidptr->alignedlen = MAXALIGN(ItemIdGetLength(lp));
totallen += itemidptr->alignedlen;
itemidptr++;
}
else
{
PageSetHasFreeLinePointers(page);
ItemIdSetUnused(lp);
}
}
nline = itemidptr - itemidbase;
/* By here, there are exactly nline elements in itemidbase array */
if (totallen > (Size) (pd_special - pd_lower)) /* adjust free space boundary pointer */
ereport(ERROR, phdr->pd_upper += size;
(errcode(ERRCODE_DATA_CORRUPTED),
errmsg("corrupted item lengths: total %u, available space %u",
(unsigned int) totallen, pd_special - pd_lower)));
/* /*
* Defragment the data areas of each tuple, being careful to preserve * Finally, we need to adjust the linp entries that remain.
* each item's position in the linp array. *
* Anything that used to be before the deleted tuple's data was moved
* forward by the size of the deleted tuple.
*/ */
compactify_tuples(itemidbase, nline, page); if (!PageIsEmpty(page))
{
int i;
for (i = 1; i <= nline; i++)
{
ItemId ii = PageGetItemId(phdr, i);
if (ItemIdHasStorage(ii) && ItemIdGetOffset(ii) <= offset)
ii->lp_off += size;
}
} }
} }
......
...@@ -429,8 +429,7 @@ extern Size PageGetExactFreeSpace(Page page); ...@@ -429,8 +429,7 @@ extern Size PageGetExactFreeSpace(Page page);
extern Size PageGetHeapFreeSpace(Page page); extern Size PageGetHeapFreeSpace(Page page);
extern void PageIndexTupleDelete(Page page, OffsetNumber offset); extern void PageIndexTupleDelete(Page page, OffsetNumber offset);
extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems); extern void PageIndexMultiDelete(Page page, OffsetNumber *itemnos, int nitems);
extern void PageIndexDeleteNoCompact(Page page, OffsetNumber *itemnos, extern void PageIndexTupleDeleteNoCompact(Page page, OffsetNumber offset);
int nitems);
extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum, extern bool PageIndexTupleOverwrite(Page page, OffsetNumber offnum,
Item newtup, Size newsize); Item newtup, Size newsize);
extern char *PageSetChecksumCopy(Page page, BlockNumber blkno); extern char *PageSetChecksumCopy(Page page, BlockNumber blkno);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment