Commit 631118fe authored by Heikki Linnakangas's avatar Heikki Linnakangas

Get rid of the post-recovery cleanup step of GIN page splits.

Replace it with an approach similar to what GiST uses: when a page is split,
the left sibling is marked with a flag indicating that the parent hasn't been
updated yet. When the parent is updated, the flag is cleared. If an insertion
steps on a page with the flag set, it will finish split before proceeding
with the insertion.

The post-recovery cleanup mechanism was never totally reliable, as insertion
to the parent could fail e.g because of running out of memory or disk space,
leaving the tree in an inconsistent state.

This also divides the responsibility of WAL-logging more clearly between
the generic ginbtree.c code, and the parts specific to entry and posting
trees. There is now a common WAL record format for insertions and deletions,
which is written by ginbtree.c, followed by tree-specific payload, which is
returned by the placetopage- and split- callbacks.
parent ce5326ee
......@@ -18,6 +18,13 @@
#include "miscadmin.h"
#include "utils/rel.h"
static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
Buffer childbuf, GinStatsData *buildStats);
static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
bool freestack, GinStatsData *buildStats);
/*
* Lock buffer by needed method for search.
*/
......@@ -85,6 +92,13 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
access = ginTraverseLock(stack->buffer, searchMode);
/*
* If we're going to modify the tree, finish any incomplete splits we
* encounter on the way.
*/
if (!searchMode && GinPageIsIncompleteSplit(page))
ginFinishSplit(btree, stack, false, NULL);
/*
* ok, page is correctly locked, we should check to move right ..,
* root never has a right link, so small optimization
......@@ -101,6 +115,9 @@ ginFindLeafPage(GinBtree btree, bool searchMode)
stack->buffer = ginStepRight(stack->buffer, btree->index, access);
stack->blkno = rightlink;
page = BufferGetPage(stack->buffer);
if (!searchMode && GinPageIsIncompleteSplit(page))
ginFinishSplit(btree, stack, false, NULL);
}
if (GinPageIsLeaf(page)) /* we found, return locked page */
......@@ -187,7 +204,7 @@ freeGinBtreeStack(GinBtreeStack *stack)
* child's offset in stack->parent. The root page is never released, to
* to prevent conflict with vacuum process.
*/
void
static void
ginFindParents(GinBtree btree, GinBtreeStack *stack)
{
Page page;
......@@ -195,58 +212,55 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
BlockNumber blkno,
leftmostBlkno;
OffsetNumber offset;
GinBtreeStack *root = stack->parent;
GinBtreeStack *root;
GinBtreeStack *ptr;
if (!root)
/*
* Unwind the stack all the way up to the root, leaving only the root
* item.
*
* Be careful not to release the pin on the root page! The pin on root
* page is required to lock out concurrent vacuums on the tree.
*/
root = stack->parent;
while (root->parent)
{
/* XLog mode... */
root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
root->blkno = btree->rootBlkno;
root->buffer = ReadBuffer(btree->index, btree->rootBlkno);
LockBuffer(root->buffer, GIN_EXCLUSIVE);
root->parent = NULL;
ReleaseBuffer(root->buffer);
root = root->parent;
}
else
{
/*
* find root, we should not release root page until update is
* finished!!
*/
while (root->parent)
{
ReleaseBuffer(root->buffer);
root = root->parent;
}
Assert(root->blkno == btree->rootBlkno);
Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
LockBuffer(root->buffer, GIN_EXCLUSIVE);
}
Assert(root->blkno == btree->rootBlkno);
Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
root->off = InvalidOffsetNumber;
page = BufferGetPage(root->buffer);
Assert(!GinPageIsLeaf(page));
/* check trivial case */
if ((root->off = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) != InvalidOffsetNumber)
{
stack->parent = root;
return;
}
blkno = root->blkno;
buffer = root->buffer;
offset = InvalidOffsetNumber;
leftmostBlkno = blkno = btree->getLeftMostChild(btree, page);
LockBuffer(root->buffer, GIN_UNLOCK);
Assert(blkno != InvalidBlockNumber);
ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
for (;;)
{
buffer = ReadBuffer(btree->index, blkno);
LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
if (GinPageIsLeaf(page))
elog(ERROR, "Lost path");
if (GinPageIsIncompleteSplit(page))
{
Assert(blkno != btree->rootBlkno);
ptr->blkno = blkno;
ptr->buffer = buffer;
/*
* parent may be wrong, but if so, the ginFinishSplit call will
* recurse to call ginFindParents again to fix it.
*/
ptr->parent = root;
ptr->off = InvalidOffsetNumber;
ginFinishSplit(btree, ptr, false, NULL);
}
leftmostBlkno = btree->getLeftMostChild(btree, page);
while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
......@@ -259,11 +273,22 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
}
buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
page = BufferGetPage(buffer);
/* finish any incomplete splits, as above */
if (GinPageIsIncompleteSplit(page))
{
Assert(blkno != btree->rootBlkno);
ptr->blkno = blkno;
ptr->buffer = buffer;
ptr->parent = root;
ptr->off = InvalidOffsetNumber;
ginFinishSplit(btree, ptr, false, NULL);
}
}
if (blkno != InvalidBlockNumber)
{
ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
ptr->blkno = blkno;
ptr->buffer = buffer;
ptr->parent = root; /* it may be wrong, but in next call we will
......@@ -273,7 +298,9 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
return;
}
/* Descend down to next level */
blkno = leftmostBlkno;
buffer = ReadBuffer(btree->index, blkno);
}
}
......@@ -284,19 +311,38 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack)
* the parent needs to be updated. (a root split returns true as it doesn't
* need any further action by the caller to complete)
*
* When inserting a downlink to a internal page, the existing item at the
* given location is updated to point to 'updateblkno'.
* When inserting a downlink to a internal page, 'childbuf' contains the
* child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
* atomically with the insert. Also, the existing item at the given location
* is updated to point to 'updateblkno'.
*
* stack->buffer is locked on entry, and is kept locked.
*/
static bool
ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
GinStatsData *buildStats)
Buffer childbuf, GinStatsData *buildStats)
{
Page page = BufferGetPage(stack->buffer);
XLogRecData *rdata;
XLogRecData *payloadrdata;
bool fit;
uint16 xlflags = 0;
Page childpage = NULL;
if (GinPageIsData(page))
xlflags |= GIN_INSERT_ISDATA;
if (GinPageIsLeaf(page))
{
xlflags |= GIN_INSERT_ISLEAF;
Assert(!BufferIsValid(childbuf));
Assert(updateblkno == InvalidBlockNumber);
}
else
{
Assert(BufferIsValid(childbuf));
Assert(updateblkno != InvalidBlockNumber);
childpage = BufferGetPage(childbuf);
}
/*
* Try to put the incoming tuple on the page. If it doesn't fit,
......@@ -306,17 +352,63 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
START_CRIT_SECTION();
fit = btree->placeToPage(btree, stack->buffer, stack->off,
insertdata, updateblkno,
&rdata);
&payloadrdata);
if (fit)
{
MarkBufferDirty(stack->buffer);
/* An insert to an internal page finishes the split of the child. */
if (childbuf != InvalidBuffer)
{
GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
MarkBufferDirty(childbuf);
}
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
XLogRecData rdata[3];
ginxlogInsert xlrec;
BlockIdData childblknos[2];
xlrec.node = btree->index->rd_node;
xlrec.blkno = BufferGetBlockNumber(stack->buffer);
xlrec.offset = stack->off;
xlrec.flags = xlflags;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(ginxlogInsert);
/*
* Log information about child if this was an insertion of a
* downlink.
*/
if (childbuf != InvalidBuffer)
{
rdata[0].next = &rdata[1];
BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) childblknos;
rdata[1].len = sizeof(BlockIdData) * 2;
rdata[1].next = &rdata[2];
rdata[2].buffer = childbuf;
rdata[2].buffer_std = false;
rdata[2].data = NULL;
rdata[2].len = 0;
rdata[2].next = payloadrdata;
}
else
rdata[0].next = payloadrdata;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
PageSetLSN(page, recptr);
if (childbuf != InvalidBuffer)
PageSetLSN(childpage, recptr);
}
END_CRIT_SECTION();
......@@ -329,13 +421,16 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
Buffer rbuffer;
Page newlpage;
BlockNumber savedRightLink;
GinBtreeStack *parent;
Page lpage,
rpage;
Page rpage;
XLogRecData rdata[2];
ginxlogSplit data;
Buffer lbuffer = InvalidBuffer;
Page newrootpg = NULL;
END_CRIT_SECTION();
rbuffer = GinNewBuffer(btree->index);
/* During index build, count the new page */
if (buildStats)
{
......@@ -353,21 +448,50 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
*/
newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
insertdata, updateblkno,
&rdata);
&payloadrdata);
((ginxlogSplit *) (rdata->data))->rootBlkno = btree->rootBlkno;
data.node = btree->index->rd_node;
data.rblkno = BufferGetBlockNumber(rbuffer);
data.flags = xlflags;
if (childbuf != InvalidBuffer)
{
Page childpage = BufferGetPage(childbuf);
GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
parent = stack->parent;
data.leftChildBlkno = BufferGetBlockNumber(childbuf);
data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
}
else
data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &data;
rdata[0].len = sizeof(ginxlogSplit);
if (childbuf != InvalidBuffer)
{
rdata[0].next = &rdata[1];
rdata[1].buffer = childbuf;
rdata[1].buffer_std = false;
rdata[1].data = NULL;
rdata[1].len = 0;
rdata[1].next = payloadrdata;
}
else
rdata[0].next = payloadrdata;
rpage = BufferGetPage(rbuffer);
if (parent == NULL)
if (stack->parent == NULL)
{
/*
* split root, so we need to allocate new left page and place
* pointer on root to left and right page
*/
Buffer lbuffer = GinNewBuffer(btree->index);
lbuffer = GinNewBuffer(btree->index);
/* During index build, count the new page */
/* During index build, count the newly-added root page */
if (buildStats)
{
if (btree->isData)
......@@ -376,82 +500,97 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
buildStats->nEntryPages++;
}
((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
lpage = BufferGetPage(lbuffer);
rpage = BufferGetPage(rbuffer);
/*
* root never has a right-link, so we borrow the rrlink field to
* store the root block number.
*/
data.rrlink = BufferGetBlockNumber(stack->buffer);
data.lblkno = BufferGetBlockNumber(lbuffer);
data.flags |= GIN_SPLIT_ROOT;
GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
START_CRIT_SECTION();
GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
PageRestoreTempPage(newlpage, lpage);
btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
MarkBufferDirty(rbuffer);
MarkBufferDirty(lbuffer);
MarkBufferDirty(stack->buffer);
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
PageSetLSN(page, recptr);
PageSetLSN(lpage, recptr);
PageSetLSN(rpage, recptr);
}
UnlockReleaseBuffer(rbuffer);
UnlockReleaseBuffer(lbuffer);
END_CRIT_SECTION();
/* During index build, count the newly-added root page */
if (buildStats)
{
if (btree->isData)
buildStats->nDataPages++;
else
buildStats->nEntryPages++;
}
/*
* Construct a new root page containing downlinks to the new left
* and right pages. (do this in a temporary copy first rather
* than overwriting the original page directly, so that we can still
* abort gracefully if this fails.)
*/
newrootpg = PageGetTempPage(rpage);
GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF, BLCKSZ);
return true;
btree->fillRoot(btree, newrootpg,
BufferGetBlockNumber(lbuffer), newlpage,
BufferGetBlockNumber(rbuffer), rpage);
}
else
{
/* split non-root page */
((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
lpage = BufferGetPage(stack->buffer);
rpage = BufferGetPage(rbuffer);
data.rrlink = savedRightLink;
data.lblkno = BufferGetBlockNumber(stack->buffer);
GinPageGetOpaque(rpage)->rightlink = savedRightLink;
GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
}
/*
* Ok, we have the new contents of the left page in a temporary copy
* now (newlpage), and the newly-allocated right block has been filled
* in. The original page is still unchanged.
*
* If this is a root split, we also have a temporary page containing
* the new contents of the root. Copy the new left page to a
* newly-allocated block, and initialize the (original) root page the
* new copy. Otherwise, copy over the temporary copy of the new left
* page over the old left page.
*/
START_CRIT_SECTION();
PageRestoreTempPage(newlpage, lpage);
START_CRIT_SECTION();
MarkBufferDirty(rbuffer);
MarkBufferDirty(stack->buffer);
MarkBufferDirty(rbuffer);
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
if (stack->parent == NULL)
{
PageRestoreTempPage(newlpage, BufferGetPage(lbuffer));
MarkBufferDirty(lbuffer);
newlpage = newrootpg;
}
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
PageSetLSN(lpage, recptr);
PageSetLSN(rpage, recptr);
}
UnlockReleaseBuffer(rbuffer);
END_CRIT_SECTION();
PageRestoreTempPage(newlpage, BufferGetPage(stack->buffer));
MarkBufferDirty(stack->buffer);
return false;
/* write WAL record */
if (RelationNeedsWAL(btree->index))
{
XLogRecPtr recptr;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
PageSetLSN(BufferGetPage(stack->buffer), recptr);
PageSetLSN(rpage, recptr);
if (stack->parent == NULL)
PageSetLSN(BufferGetPage(lbuffer), recptr);
}
END_CRIT_SECTION();
/*
* We can release the lock on the right page now, but keep the
* original buffer locked.
*/
UnlockReleaseBuffer(rbuffer);
if (stack->parent == NULL)
UnlockReleaseBuffer(lbuffer);
/*
* If we split the root, we're done. Otherwise the split is not
* complete until the downlink for the new page has been inserted to
* the parent.
*/
if (stack->parent == NULL)
return true;
else
return false;
}
}
......@@ -460,13 +599,27 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
*
* On entry, stack->buffer is exclusively locked.
*
* NB: the passed-in stack is freed, as though by freeGinBtreeStack.
* If freestack is true, all the buffers are released and unlocked as we
* crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
* locked, and stack is unmodified, except for possibly moving right to find
* the correct parent of page.
*/
void
ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
static void
ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
GinStatsData *buildStats)
{
Page page;
bool done;
bool first = true;
/*
* freestack == false when we encounter an incompletely split page during a
* scan, while freestack == true is used in the normal scenario that a
* split is finished right after the initial insert.
*/
if (!freestack)
elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
stack->blkno, RelationGetRelationName(btree->index));
/* this loop crawls up the stack until the insertion is complete */
do
......@@ -475,19 +628,26 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
void *insertdata;
BlockNumber updateblkno;
insertdata = btree->prepareDownlink(btree, stack->buffer);
updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
/* search parent to lock */
LockBuffer(parent->buffer, GIN_EXCLUSIVE);
/*
* If the parent page was incompletely split, finish that split first,
* then continue with the current one.
*
* Note: we have to finish *all* incomplete splits we encounter, even
* if we have to move right. Otherwise we might choose as the target
* a page that has no downlink in the parent, and splitting it further
* would fail.
*/
if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
ginFinishSplit(btree, parent, false, buildStats);
/* move right if it's needed */
page = BufferGetPage(parent->buffer);
while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
{
BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
if (rightlink == InvalidBlockNumber)
if (GinPageRightMost(page))
{
/*
* rightmost page, but we don't find parent, we should use
......@@ -501,25 +661,44 @@ ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
}
parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
parent->blkno = rightlink;
parent->blkno = BufferGetBlockNumber(parent->buffer);
page = BufferGetPage(parent->buffer);
}
/* release the child */
UnlockReleaseBuffer(stack->buffer);
pfree(stack);
stack = parent;
if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
ginFinishSplit(btree, parent, false, buildStats);
}
/* insert the downlink to parent */
done = ginPlaceToPage(btree, stack,
/* insert the downlink */
insertdata = btree->prepareDownlink(btree, stack->buffer);
updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
done = ginPlaceToPage(btree, parent,
insertdata, updateblkno,
buildStats);
stack->buffer, buildStats);
pfree(insertdata);
/*
* If the caller requested to free the stack, unlock and release the
* child buffer now. Otherwise keep it pinned and locked, but if we
* have to recurse up the tree, we can unlock the upper pages, only
* keeping the page at the bottom of the stack locked.
*/
if (!first || freestack)
LockBuffer(stack->buffer, GIN_UNLOCK);
if (freestack)
{
ReleaseBuffer(stack->buffer);
pfree(stack);
}
stack = parent;
first = false;
} while (!done);
/* unlock the parent */
LockBuffer(stack->buffer, GIN_UNLOCK);
/* free the rest of the stack */
freeGinBtreeStack(stack);
if (freestack)
freeGinBtreeStack(stack);
}
/*
......@@ -540,14 +719,18 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
{
bool done;
/* If the leaf page was incompletely split, finish the split first */
if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
ginFinishSplit(btree, stack, false, buildStats);
done = ginPlaceToPage(btree, stack,
insertdata, InvalidBlockNumber,
buildStats);
InvalidBuffer, buildStats);
if (done)
{
LockBuffer(stack->buffer, GIN_UNLOCK);
freeGinBtreeStack(stack);
}
else
ginFinishSplit(btree, stack, buildStats);
ginFinishSplit(btree, stack, true, buildStats);
}
......@@ -227,6 +227,7 @@ GinDataPageAddItemPointer(Page page, ItemPointer data, OffsetNumber offset)
OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
char *ptr;
Assert(ItemPointerIsValid(data));
Assert(GinPageIsLeaf(page));
if (offset == InvalidOffsetNumber)
......@@ -255,6 +256,7 @@ GinDataPageAddPostingItem(Page page, PostingItem *data, OffsetNumber offset)
OffsetNumber maxoff = GinPageGetOpaque(page)->maxoff;
char *ptr;
Assert(PostingItemGetBlockNumber(data) != InvalidBlockNumber);
Assert(!GinPageIsLeaf(page));
if (offset == InvalidOffsetNumber)
......@@ -338,11 +340,8 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
XLogRecData **prdata)
{
Page page = BufferGetPage(buf);
int cnt = 0;
/* these must be static so they can be returned to caller */
static XLogRecData rdata[3];
static ginxlogInsert data;
static XLogRecData rdata[2];
/* quick exit if it doesn't fit */
if (!dataIsEnoughSpace(btree, buf, off, insertdata))
......@@ -359,45 +358,10 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
PostingItemSetBlockNumber(pitem, updateblkno);
}
data.updateBlkno = updateblkno;
data.node = btree->index->rd_node;
data.blkno = BufferGetBlockNumber(buf);
data.offset = off;
data.nitem = 1;
data.isDelete = FALSE;
data.isData = TRUE;
data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
/*
* Prevent full page write if child's split occurs. That is needed to
* remove incomplete splits while replaying WAL
*
* data.updateBlkno contains new block number (of newly created right
* page) for recently splited page.
*/
if (data.updateBlkno == InvalidBlockNumber)
{
rdata[0].buffer = buf;
rdata[0].buffer_std = FALSE;
rdata[0].data = NULL;
rdata[0].len = 0;
rdata[0].next = &rdata[1];
cnt++;
}
rdata[cnt].buffer = InvalidBuffer;
rdata[cnt].data = (char *) &data;
rdata[cnt].len = sizeof(ginxlogInsert);
rdata[cnt].next = &rdata[cnt + 1];
cnt++;
rdata[cnt].buffer = InvalidBuffer;
/* data and len filled in below */
rdata[cnt].next = NULL;
if (GinPageIsLeaf(page))
{
GinBtreeDataLeafInsertData *items = insertdata;
static ginxlogInsertDataLeaf data;
uint32 savedPos = items->curitem;
if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
......@@ -415,10 +379,18 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
{
GinDataPageAddItemPointer(page, items->items + items->curitem, off);
items->curitem++;
data.nitem = 1;
}
rdata[cnt].data = (char *) &items->items[savedPos];
rdata[cnt].len = sizeof(ItemPointerData) * data.nitem;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &data;
rdata[0].len = offsetof(ginxlogInsertDataLeaf, items);
rdata[0].next = &rdata[1];
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) &items->items[savedPos];
rdata[1].len = sizeof(ItemPointerData) * data.nitem;
rdata[1].next = NULL;
}
else
{
......@@ -426,8 +398,10 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
GinDataPageAddPostingItem(page, pitem, off);
rdata[cnt].data = (char *) pitem;
rdata[cnt].len = sizeof(PostingItem);
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) pitem;
rdata[0].len = sizeof(PostingItem);
rdata[0].next = NULL;
}
return true;
......@@ -456,8 +430,8 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
Size freeSpace;
/* these must be static so they can be returned to caller */
static ginxlogSplit data;
static XLogRecData rdata[4];
static ginxlogSplitData data;
static XLogRecData rdata[2];
static char vector[2 * BLCKSZ];
GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
......@@ -488,6 +462,7 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
if (isleaf && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
{
/* append new items to the end */
GinBtreeDataLeafInsertData *items = insertdata;
while (items->curitem < items->nitem &&
......@@ -566,25 +541,18 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
bound = GinDataPageGetRightBound(rpage);
*bound = oldbound;
data.node = btree->index->rd_node;
data.rootBlkno = InvalidBlockNumber;
data.lblkno = BufferGetBlockNumber(lbuf);
data.rblkno = BufferGetBlockNumber(rbuf);
data.separator = separator;
data.nitem = maxoff;
data.isData = TRUE;
data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
data.isRootSplit = FALSE;
data.rightbound = oldbound;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &data;
rdata[0].len = sizeof(ginxlogSplit);
rdata[0].len = sizeof(ginxlogSplitData);
rdata[0].next = &rdata[1];
rdata[1].buffer = InvalidBuffer;
rdata[1].data = vector;
rdata[1].len = MAXALIGN(maxoff * sizeofitem);
rdata[1].len = maxoff * sizeofitem;
rdata[1].next = NULL;
return lpage;
......@@ -610,21 +578,18 @@ dataPrepareDownlink(GinBtree btree, Buffer lbuf)
* Also called from ginxlog, should not use btree
*/
void
ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage)
{
Page page = BufferGetPage(root),
lpage = BufferGetPage(lbuf),
rpage = BufferGetPage(rbuf);
PostingItem li,
ri;
li.key = *GinDataPageGetRightBound(lpage);
PostingItemSetBlockNumber(&li, BufferGetBlockNumber(lbuf));
GinDataPageAddPostingItem(page, &li, InvalidOffsetNumber);
PostingItemSetBlockNumber(&li, lblkno);
GinDataPageAddPostingItem(root, &li, InvalidOffsetNumber);
ri.key = *GinDataPageGetRightBound(rpage);
PostingItemSetBlockNumber(&ri, BufferGetBlockNumber(rbuf));
GinDataPageAddPostingItem(page, &ri, InvalidOffsetNumber);
PostingItemSetBlockNumber(&ri, rblkno);
GinDataPageAddPostingItem(root, &ri, InvalidOffsetNumber);
}
/*
......
......@@ -504,7 +504,7 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
/* these must be static so they can be returned to caller */
static XLogRecData rdata[3];
static ginxlogInsert data;
static ginxlogInsertEntry data;
/* quick exit if it doesn't fit */
if (!entryIsEnoughSpace(btree, buf, off, insertData))
......@@ -512,7 +512,6 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
*prdata = rdata;
entryPreparePage(btree, page, off, insertData, updateblkno);
data.updateBlkno = updateblkno;
placed = PageAddItem(page,
(Item) insertData->entry,
......@@ -522,34 +521,11 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index));
data.node = btree->index->rd_node;
data.blkno = BufferGetBlockNumber(buf);
data.offset = off;
data.nitem = 1;
data.isDelete = insertData->isDelete;
data.isData = false;
data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
/*
* Prevent full page write if child's split occurs. That is needed to
* remove incomplete splits while replaying WAL
*
* data.updateBlkno contains new block number (of newly created right
* page) for recently splited page.
*/
if (data.updateBlkno == InvalidBlockNumber)
{
rdata[0].buffer = buf;
rdata[0].buffer_std = TRUE;
rdata[0].data = NULL;
rdata[0].len = 0;
rdata[0].next = &rdata[1];
cnt++;
}
rdata[cnt].buffer = InvalidBuffer;
rdata[cnt].data = (char *) &data;
rdata[cnt].len = sizeof(ginxlogInsert);
rdata[cnt].len = offsetof(ginxlogInsertEntry, tuple);
rdata[cnt].next = &rdata[cnt + 1];
cnt++;
......@@ -577,6 +553,7 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
maxoff,
separator = InvalidOffsetNumber;
Size totalsize = 0;
Size tupstoresize;
Size lsize = 0,
size;
char *ptr;
......@@ -588,18 +565,18 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
/* these must be static so they can be returned to caller */
static XLogRecData rdata[2];
static ginxlogSplit data;
static ginxlogSplitEntry data;
static char tupstore[2 * BLCKSZ];
*prdata = rdata;
data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
InvalidOffsetNumber : GinGetDownlink(insertData->entry);
data.updateBlkno = updateblkno;
entryPreparePage(btree, lpage, off, insertData, updateblkno);
/*
* First, append all the existing tuples and the new tuple we're inserting
* one after another in a temporary workspace.
*/
maxoff = PageGetMaxOffsetNumber(lpage);
ptr = tupstore;
for (i = FirstOffsetNumber; i <= maxoff; i++)
{
if (i == off)
......@@ -624,7 +601,12 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
ptr += size;
totalsize += size + sizeof(ItemIdData);
}
tupstoresize = ptr - tupstore;
/*
* Initialize the left and right pages, and copy all the tuples back to
* them.
*/
GinInitPage(rpage, GinPageGetOpaque(lpage)->flags, pageSize);
GinInitPage(lpage, GinPageGetOpaque(rpage)->flags, pageSize);
......@@ -654,24 +636,17 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
ptr += MAXALIGN(IndexTupleSize(itup));
}
data.node = btree->index->rd_node;
data.rootBlkno = InvalidBlockNumber;
data.lblkno = BufferGetBlockNumber(lbuf);
data.rblkno = BufferGetBlockNumber(rbuf);
data.separator = separator;
data.nitem = maxoff;
data.isData = FALSE;
data.isLeaf = GinPageIsLeaf(lpage) ? TRUE : FALSE;
data.isRootSplit = FALSE;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &data;
rdata[0].len = sizeof(ginxlogSplit);
rdata[0].len = sizeof(ginxlogSplitEntry);
rdata[0].next = &rdata[1];
rdata[1].buffer = InvalidBuffer;
rdata[1].data = tupstore;
rdata[1].len = MAXALIGN(totalsize);
rdata[1].len = tupstoresize;
rdata[1].next = NULL;
return lpage;
......@@ -702,24 +677,19 @@ entryPrepareDownlink(GinBtree btree, Buffer lbuf)
* Also called from ginxlog, should not use btree
*/
void
ginEntryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf)
ginEntryFillRoot(GinBtree btree, Page root,
BlockNumber lblkno, Page lpage,
BlockNumber rblkno, Page rpage)
{
Page page = BufferGetPage(root);
Page lpage = BufferGetPage(lbuf);
Page rpage = BufferGetPage(rbuf);
IndexTuple itup;
itup = GinFormInteriorTuple(getRightMostTuple(lpage),
lpage,
BufferGetBlockNumber(lbuf));
if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
itup = GinFormInteriorTuple(getRightMostTuple(lpage), lpage, lblkno);
if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index root page");
pfree(itup);
itup = GinFormInteriorTuple(getRightMostTuple(rpage),
rpage,
BufferGetBlockNumber(rbuf));
if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
itup = GinFormInteriorTuple(getRightMostTuple(rpage), rpage, rblkno);
if (PageAddItem(root, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index root page");
pfree(itup);
}
......
......@@ -18,55 +18,27 @@
#include "utils/memutils.h"
static MemoryContext opCtx; /* working memory for operations */
static MemoryContext topCtx;
typedef struct ginIncompleteSplit
{
RelFileNode node;
BlockNumber leftBlkno;
BlockNumber rightBlkno;
BlockNumber rootBlkno;
} ginIncompleteSplit;
static List *incomplete_splits;
static void
pushIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber rightBlkno, BlockNumber rootBlkno)
ginRedoClearIncompleteSplit(XLogRecPtr lsn, RelFileNode node, BlockNumber blkno)
{
ginIncompleteSplit *split;
MemoryContextSwitchTo(topCtx);
split = palloc(sizeof(ginIncompleteSplit));
split->node = node;
split->leftBlkno = leftBlkno;
split->rightBlkno = rightBlkno;
split->rootBlkno = rootBlkno;
incomplete_splits = lappend(incomplete_splits, split);
MemoryContextSwitchTo(opCtx);
}
Buffer buffer;
Page page;
static void
forgetIncompleteSplit(RelFileNode node, BlockNumber leftBlkno, BlockNumber updateBlkno)
{
ListCell *l;
buffer = XLogReadBuffer(node, blkno, false);
if (!BufferIsValid(buffer))
return; /* page was deleted, nothing to do */
page = (Page) BufferGetPage(buffer);
foreach(l, incomplete_splits)
if (lsn > PageGetLSN(page))
{
ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
if (RelFileNodeEquals(node, split->node) &&
leftBlkno == split->leftBlkno &&
updateBlkno == split->rightBlkno)
{
incomplete_splits = list_delete_ptr(incomplete_splits, split);
pfree(split);
break;
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
UnlockReleaseBuffer(buffer);
}
static void
......@@ -128,44 +100,105 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
}
static void
ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
ginRedoInsertEntry(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
void *rdata)
{
ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
Buffer buffer;
Page page;
Page page = BufferGetPage(buffer);
ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
IndexTuple itup;
/* first, forget any incomplete split this insertion completes */
if (data->isData)
if (rightblkno != InvalidBlockNumber)
{
Assert(data->isDelete == FALSE);
if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
{
PostingItem *pitem;
/* update link to right page after split */
Assert(!GinPageIsLeaf(page));
Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
GinSetDownlink(itup, rightblkno);
}
pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node,
PostingItemGetBlockNumber(pitem),
data->updateBlkno);
}
if (data->isDelete)
{
Assert(GinPageIsLeaf(page));
Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
PageIndexTupleDelete(page, offset);
}
itup = &data->tuple;
if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
{
RelFileNode node;
ForkNumber forknum;
BlockNumber blknum;
BufferGetTag(buffer, &node, &forknum, &blknum);
elog(ERROR, "failed to add item to index page in %u/%u/%u",
node.spcNode, node.dbNode, node.relNode);
}
}
static void
ginRedoInsertData(Buffer buffer, OffsetNumber offset, BlockNumber rightblkno,
void *rdata)
{
Page page = BufferGetPage(buffer);
if (GinPageIsLeaf(page))
{
ginxlogInsertDataLeaf *data = (ginxlogInsertDataLeaf *) rdata;
ItemPointerData *items = data->items;
OffsetNumber i;
for (i = 0; i < data->nitem; i++)
GinDataPageAddItemPointer(page, &items[i], offset + i);
}
else
{
if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
{
IndexTuple itup;
PostingItem *pitem = (PostingItem *) rdata;
PostingItem *oldpitem;
itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
forgetIncompleteSplit(data->node,
GinGetDownlink(itup),
data->updateBlkno);
}
/* update link to right page after split */
oldpitem = GinDataPageGetPostingItem(page, offset);
PostingItemSetBlockNumber(oldpitem, rightblkno);
GinDataPageAddPostingItem(page, pitem, offset);
}
}
static void
ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
{
ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
Buffer buffer;
Page page;
char *payload;
BlockNumber leftChildBlkno = InvalidBlockNumber;
BlockNumber rightChildBlkno = InvalidBlockNumber;
bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
/*
* First clear incomplete-split flag on child page if this finishes
* a split.
*/
if (!isLeaf)
{
leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
payload += sizeof(BlockIdData);
rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
payload += sizeof(BlockIdData);
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
ginRedoClearIncompleteSplit(lsn, data->node, leftChildBlkno);
}
/* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
if (record->xl_info & XLR_BKP_BLOCK(isLeaf ? 0 : 1))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
(void) RestoreBackupBlock(lsn, record, isLeaf ? 0 : 1, false, false);
return;
}
......@@ -176,74 +209,82 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
if (lsn > PageGetLSN(page))
{
if (data->isData)
/* How to insert the payload is tree-type specific */
if (data->flags & GIN_INSERT_ISDATA)
{
Assert(GinPageIsData(page));
if (data->isLeaf)
{
OffsetNumber i;
ItemPointerData *items = (ItemPointerData *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
Assert(GinPageIsLeaf(page));
Assert(data->updateBlkno == InvalidBlockNumber);
for (i = 0; i < data->nitem; i++)
GinDataPageAddItemPointer(page, &items[i], data->offset + i);
}
else
{
PostingItem *pitem;
Assert(!GinPageIsLeaf(page));
if (data->updateBlkno != InvalidBlockNumber)
{
/* update link to right page after split */
pitem = GinDataPageGetPostingItem(page, data->offset);
PostingItemSetBlockNumber(pitem, data->updateBlkno);
}
pitem = (PostingItem *) (XLogRecGetData(record) + sizeof(ginxlogInsert));
GinDataPageAddPostingItem(page, pitem, data->offset);
}
ginRedoInsertData(buffer, data->offset, rightChildBlkno, payload);
}
else
{
IndexTuple itup;
Assert(!GinPageIsData(page));
ginRedoInsertEntry(buffer, data->offset, rightChildBlkno, payload);
}
if (data->updateBlkno != InvalidBlockNumber)
{
/* update link to right page after split */
Assert(!GinPageIsLeaf(page));
Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, data->offset));
GinSetDownlink(itup, data->updateBlkno);
}
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
}
if (data->isDelete)
{
Assert(GinPageIsLeaf(page));
Assert(data->offset >= FirstOffsetNumber && data->offset <= PageGetMaxOffsetNumber(page));
PageIndexTupleDelete(page, data->offset);
}
UnlockReleaseBuffer(buffer);
}
itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsert));
static void
ginRedoSplitEntry(Page lpage, Page rpage, void *rdata)
{
ginxlogSplitEntry *data = (ginxlogSplitEntry *) rdata;
IndexTuple itup = (IndexTuple) ((char *) rdata + sizeof(ginxlogSplitEntry));
OffsetNumber i;
if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), data->offset, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page in %u/%u/%u",
data->node.spcNode, data->node.dbNode, data->node.relNode);
}
for (i = 0; i < data->separator; i++)
{
if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to gin index page");
itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
}
PageSetLSN(page, lsn);
for (i = data->separator; i < data->nitem; i++)
{
if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to gin index page");
itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
}
}
MarkBufferDirty(buffer);
static void
ginRedoSplitData(Page lpage, Page rpage, void *rdata)
{
ginxlogSplitData *data = (ginxlogSplitData *) rdata;
bool isleaf = GinPageIsLeaf(lpage);
char *ptr = (char *) rdata + sizeof(ginxlogSplitData);
OffsetNumber i;
ItemPointer bound;
if (isleaf)
{
ItemPointer items = (ItemPointer) ptr;
for (i = 0; i < data->separator; i++)
GinDataPageAddItemPointer(lpage, &items[i], InvalidOffsetNumber);
for (i = data->separator; i < data->nitem; i++)
GinDataPageAddItemPointer(rpage, &items[i], InvalidOffsetNumber);
}
else
{
PostingItem *items = (PostingItem *) ptr;
for (i = 0; i < data->separator; i++)
GinDataPageAddPostingItem(lpage, &items[i], InvalidOffsetNumber);
for (i = data->separator; i < data->nitem; i++)
GinDataPageAddPostingItem(rpage, &items[i], InvalidOffsetNumber);
}
UnlockReleaseBuffer(buffer);
/* set up right key */
bound = GinDataPageGetRightBound(lpage);
if (isleaf)
*bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
else
*bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
bound = GinDataPageGetRightBound(rpage);
*bound = data->rightbound;
}
static void
......@@ -255,14 +296,30 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
Page lpage,
rpage;
uint32 flags = 0;
char *payload;
bool isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
bool isData = (data->flags & GIN_INSERT_ISDATA) != 0;
bool isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
payload = XLogRecGetData(record) + sizeof(ginxlogSplit);
/*
* First clear incomplete-split flag on child page if this finishes
* a split
*/
if (!isLeaf)
{
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
ginRedoClearIncompleteSplit(lsn, data->node, data->leftChildBlkno);
}
if (data->isLeaf)
if (isLeaf)
flags |= GIN_LEAF;
if (data->isData)
flags |= GIN_DATA;
/* Backup blocks are not used in split records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
if (isData)
flags |= GIN_DATA;
lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer));
......@@ -275,64 +332,13 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
GinInitBuffer(rbuffer, flags);
GinPageGetOpaque(lpage)->rightlink = BufferGetBlockNumber(rbuffer);
GinPageGetOpaque(rpage)->rightlink = data->rrlink;
if (data->isData)
{
char *ptr = XLogRecGetData(record) + sizeof(ginxlogSplit);
Size sizeofitem = GinSizeOfDataPageItem(lpage);
OffsetNumber i;
ItemPointer bound;
for (i = 0; i < data->separator; i++)
{
if (data->isLeaf)
GinDataPageAddItemPointer(lpage, (ItemPointer) ptr, InvalidOffsetNumber);
else
GinDataPageAddPostingItem(lpage, (PostingItem *) ptr, InvalidOffsetNumber);
ptr += sizeofitem;
}
for (i = data->separator; i < data->nitem; i++)
{
if (data->isLeaf)
GinDataPageAddItemPointer(rpage, (ItemPointer) ptr, InvalidOffsetNumber);
else
GinDataPageAddPostingItem(rpage, (PostingItem *) ptr, InvalidOffsetNumber);
ptr += sizeofitem;
}
GinPageGetOpaque(rpage)->rightlink = isRoot ? InvalidBlockNumber : data->rrlink;
/* set up right key */
bound = GinDataPageGetRightBound(lpage);
if (data->isLeaf)
*bound = *GinDataPageGetItemPointer(lpage, GinPageGetOpaque(lpage)->maxoff);
else
*bound = GinDataPageGetPostingItem(lpage, GinPageGetOpaque(lpage)->maxoff)->key;
bound = GinDataPageGetRightBound(rpage);
*bound = data->rightbound;
}
/* Do the tree-type specific portion to restore the page contents */
if (isData)
ginRedoSplitData(lpage, rpage, payload);
else
{
IndexTuple itup = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogSplit));
OffsetNumber i;
for (i = 0; i < data->separator; i++)
{
if (PageAddItem(lpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page in %u/%u/%u",
data->node.spcNode, data->node.dbNode, data->node.relNode);
itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
}
for (i = data->separator; i < data->nitem; i++)
{
if (PageAddItem(rpage, (Item) itup, IndexTupleSize(itup), InvalidOffsetNumber, false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add item to index page in %u/%u/%u",
data->node.spcNode, data->node.dbNode, data->node.relNode);
itup = (IndexTuple) (((char *) itup) + MAXALIGN(IndexTupleSize(itup)));
}
}
ginRedoSplitEntry(lpage, rpage, payload);
PageSetLSN(rpage, lsn);
MarkBufferDirty(rbuffer);
......@@ -340,25 +346,31 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(lpage, lsn);
MarkBufferDirty(lbuffer);
if (!data->isLeaf && data->updateBlkno != InvalidBlockNumber)
forgetIncompleteSplit(data->node, data->leftChildBlkno, data->updateBlkno);
if (data->isRootSplit)
if (isRoot)
{
Buffer rootBuf = XLogReadBuffer(data->node, data->rootBlkno, true);
BlockNumber rootBlkno = data->rrlink;
Buffer rootBuf = XLogReadBuffer(data->node, rootBlkno, true);
Page rootPage = BufferGetPage(rootBuf);
GinInitBuffer(rootBuf, flags & ~GIN_LEAF);
if (data->isData)
if (isData)
{
Assert(data->rootBlkno != GIN_ROOT_BLKNO);
ginDataFillRoot(NULL, rootBuf, lbuffer, rbuffer);
Assert(rootBlkno != GIN_ROOT_BLKNO);
ginDataFillRoot(NULL, BufferGetPage(rootBuf),
BufferGetBlockNumber(lbuffer),
BufferGetPage(lbuffer),
BufferGetBlockNumber(rbuffer),
BufferGetPage(rbuffer));
}
else
{
Assert(data->rootBlkno == GIN_ROOT_BLKNO);
ginEntryFillRoot(NULL, rootBuf, lbuffer, rbuffer);
Assert(rootBlkno == GIN_ROOT_BLKNO);
ginEntryFillRoot(NULL, BufferGetPage(rootBuf),
BufferGetBlockNumber(lbuffer),
BufferGetPage(lbuffer),
BufferGetBlockNumber(rbuffer),
BufferGetPage(rbuffer));
}
PageSetLSN(rootPage, lsn);
......@@ -366,8 +378,6 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
MarkBufferDirty(rootBuf);
UnlockReleaseBuffer(rootBuf);
}
else
pushIncompleteSplit(data->node, data->lblkno, data->rblkno, data->rootBlkno);
UnlockReleaseBuffer(rbuffer);
UnlockReleaseBuffer(lbuffer);
......@@ -711,6 +721,7 @@ void
gin_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
MemoryContext oldCtx;
/*
* GIN indexes do not require any conflict processing. NB: If we ever
......@@ -718,7 +729,7 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
* killed tuples outside VACUUM, we'll need to handle that here.
*/
topCtx = MemoryContextSwitchTo(opCtx);
oldCtx = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIN_CREATE_INDEX:
......@@ -751,15 +762,13 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
default:
elog(PANIC, "gin_redo: unknown op code %u", info);
}
MemoryContextSwitchTo(topCtx);
MemoryContextSwitchTo(oldCtx);
MemoryContextReset(opCtx);
}
void
gin_xlog_startup(void)
{
incomplete_splits = NIL;
opCtx = AllocSetContextCreate(CurrentMemoryContext,
"GIN recovery temporary context",
ALLOCSET_DEFAULT_MINSIZE,
......@@ -767,84 +776,8 @@ gin_xlog_startup(void)
ALLOCSET_DEFAULT_MAXSIZE);
}
static void
ginContinueSplit(ginIncompleteSplit *split)
{
GinBtreeData btree;
GinState ginstate;
Relation reln;
Buffer buffer;
GinBtreeStack *stack;
/*
* elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
* split->leftBlkno, split->rightBlkno);
*/
buffer = XLogReadBuffer(split->node, split->leftBlkno, false);
/*
* Failure should be impossible here, because we wrote the page earlier.
*/
if (!BufferIsValid(buffer))
elog(PANIC, "ginContinueSplit: left block %u not found",
split->leftBlkno);
reln = CreateFakeRelcacheEntry(split->node);
if (split->rootBlkno == GIN_ROOT_BLKNO)
{
MemSet(&ginstate, 0, sizeof(ginstate));
ginstate.index = reln;
ginPrepareEntryScan(&btree,
InvalidOffsetNumber, (Datum) 0, GIN_CAT_NULL_KEY,
&ginstate);
}
else
{
ginPrepareDataScan(&btree, reln, split->rootBlkno);
}
stack = palloc(sizeof(GinBtreeStack));
stack->blkno = split->leftBlkno;
stack->buffer = buffer;
stack->off = InvalidOffsetNumber;
stack->parent = NULL;
ginFindParents(&btree, stack);
LockBuffer(stack->parent->buffer, GIN_UNLOCK);
ginFinishSplit(&btree, stack, NULL);
/* buffer is released by ginFinishSplit */
FreeFakeRelcacheEntry(reln);
}
void
gin_xlog_cleanup(void)
{
ListCell *l;
MemoryContext topCtx;
topCtx = MemoryContextSwitchTo(opCtx);
foreach(l, incomplete_splits)
{
ginIncompleteSplit *split = (ginIncompleteSplit *) lfirst(l);
ginContinueSplit(split);
MemoryContextReset(opCtx);
}
MemoryContextSwitchTo(topCtx);
MemoryContextDelete(opCtx);
incomplete_splits = NIL;
}
bool
gin_safe_restartpoint(void)
{
if (incomplete_splits)
return false;
return true;
}
......@@ -41,20 +41,45 @@ gin_desc(StringInfo buf, uint8 xl_info, char *rec)
desc_node(buf, ((ginxlogCreatePostingTree *) rec)->node, ((ginxlogCreatePostingTree *) rec)->blkno);
break;
case XLOG_GIN_INSERT:
appendStringInfoString(buf, "Insert item, ");
desc_node(buf, ((ginxlogInsert *) rec)->node, ((ginxlogInsert *) rec)->blkno);
appendStringInfo(buf, " offset: %u nitem: %u isdata: %c isleaf %c isdelete %c updateBlkno:%u",
((ginxlogInsert *) rec)->offset,
((ginxlogInsert *) rec)->nitem,
(((ginxlogInsert *) rec)->isData) ? 'T' : 'F',
(((ginxlogInsert *) rec)->isLeaf) ? 'T' : 'F',
(((ginxlogInsert *) rec)->isDelete) ? 'T' : 'F',
((ginxlogInsert *) rec)->updateBlkno);
{
ginxlogInsert *xlrec = (ginxlogInsert *) rec;
char *payload = rec + sizeof(ginxlogInsert);
appendStringInfoString(buf, "Insert item, ");
desc_node(buf, xlrec->node, xlrec->blkno);
appendStringInfo(buf, " offset: %u isdata: %c isleaf: %c",
xlrec->offset,
(xlrec->flags & GIN_INSERT_ISDATA) ? 'T' : 'F',
(xlrec->flags & GIN_INSERT_ISLEAF) ? 'T' : 'F');
if (!(xlrec->flags & GIN_INSERT_ISLEAF))
{
BlockNumber leftChildBlkno;
BlockNumber rightChildBlkno;
memcpy(&leftChildBlkno, payload, sizeof(BlockNumber));
payload += sizeof(BlockNumber);
memcpy(&rightChildBlkno, payload, sizeof(BlockNumber));
payload += sizeof(BlockNumber);
appendStringInfo(buf, " children: %u/%u",
leftChildBlkno, rightChildBlkno);
}
if (!(xlrec->flags & GIN_INSERT_ISDATA))
appendStringInfo(buf, " isdelete: %c",
(((ginxlogInsertEntry *) payload)->isDelete) ? 'T' : 'F');
else if (xlrec->flags & GIN_INSERT_ISLEAF)
appendStringInfo(buf, " nitem: %u",
(((ginxlogInsertDataLeaf *) payload)->nitem) ? 'T' : 'F');
else
appendStringInfo(buf, " pitem: %u-%u/%u",
PostingItemGetBlockNumber((PostingItem *) payload),
ItemPointerGetBlockNumber(&((PostingItem *) payload)->key),
ItemPointerGetOffsetNumber(&((PostingItem *) payload)->key));
}
break;
case XLOG_GIN_SPLIT:
appendStringInfoString(buf, "Page split, ");
desc_node(buf, ((ginxlogSplit *) rec)->node, ((ginxlogSplit *) rec)->lblkno);
appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->isRootSplit) ? 'T' : 'F');
appendStringInfo(buf, " isrootsplit: %c", (((ginxlogSplit *) rec)->flags & GIN_SPLIT_ROOT) ? 'T' : 'F');
break;
case XLOG_GIN_VACUUM_PAGE:
appendStringInfoString(buf, "Vacuum page, ");
......
......@@ -58,6 +58,5 @@ extern void gin_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gin_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void gin_xlog_startup(void);
extern void gin_xlog_cleanup(void);
extern bool gin_safe_restartpoint(void);
#endif /* GIN_H */
......@@ -48,6 +48,7 @@ typedef GinPageOpaqueData *GinPageOpaque;
#define GIN_META (1 << 3)
#define GIN_LIST (1 << 4)
#define GIN_LIST_FULLROW (1 << 5) /* makes sense only on GIN_LIST page */
#define GIN_INCOMPLETE_SPLIT (1 << 6) /* page was split, but parent not updated */
/* Page numbers of fixed-location pages */
#define GIN_METAPAGE_BLKNO (0)
......@@ -119,6 +120,7 @@ typedef struct GinMetaPageData
#define GinPageIsDeleted(page) ( GinPageGetOpaque(page)->flags & GIN_DELETED)
#define GinPageSetDeleted(page) ( GinPageGetOpaque(page)->flags |= GIN_DELETED)
#define GinPageSetNonDeleted(page) ( GinPageGetOpaque(page)->flags &= ~GIN_DELETED)
#define GinPageIsIncompleteSplit(page) ( GinPageGetOpaque(page)->flags & GIN_INCOMPLETE_SPLIT)
#define GinPageRightMost(page) ( GinPageGetOpaque(page)->rightlink == InvalidBlockNumber)
......@@ -336,41 +338,77 @@ typedef struct ginxlogInsert
{
RelFileNode node;
BlockNumber blkno;
BlockNumber updateBlkno;
uint16 flags; /* GIN_SPLIT_ISLEAF and/or GIN_SPLIT_ISDATA */
OffsetNumber offset;
bool isDelete;
bool isData;
bool isLeaf;
OffsetNumber nitem;
/*
* follows: tuples or ItemPointerData or PostingItem or list of
* ItemPointerData
* FOLLOWS:
*
* 1. if not leaf page, block numbers of the left and right child pages
* whose split this insertion finishes. As BlockIdData[2] (beware of adding
* fields before this that would make them not 16-bit aligned)
*
* 2. one of the following structs, depending on tree type.
*
* NB: the below structs are only 16-bit aligned when appended to a
* ginxlogInsert struct! Beware of adding fields to them that require
* stricter alignment.
*/
} ginxlogInsert;
typedef struct
{
bool isDelete;
IndexTupleData tuple; /* variable length */
} ginxlogInsertEntry;
typedef struct
{
OffsetNumber nitem;
ItemPointerData items[1]; /* variable length */
} ginxlogInsertDataLeaf;
/* In an insert to an internal data page, the payload is a PostingItem */
#define XLOG_GIN_SPLIT 0x30
typedef struct ginxlogSplit
{
RelFileNode node;
BlockNumber lblkno;
BlockNumber rootBlkno;
BlockNumber rblkno;
BlockNumber rrlink;
BlockNumber rrlink; /* right link, or root's blocknumber if root split */
BlockNumber leftChildBlkno; /* valid on a non-leaf split */
BlockNumber rightChildBlkno;
uint16 flags;
/* follows: one of the following structs */
} ginxlogSplit;
/*
* Flags used in ginxlogInsert and ginxlogSplit records
*/
#define GIN_INSERT_ISDATA 0x01 /* for both insert and split records */
#define GIN_INSERT_ISLEAF 0x02 /* .. */
#define GIN_SPLIT_ROOT 0x04 /* only for split records */
typedef struct
{
OffsetNumber separator;
OffsetNumber nitem;
bool isData;
bool isLeaf;
bool isRootSplit;
/* FOLLOWS: IndexTuples */
} ginxlogSplitEntry;
BlockNumber leftChildBlkno;
BlockNumber updateBlkno;
typedef struct
{
OffsetNumber separator;
OffsetNumber nitem;
ItemPointerData rightbound;
ItemPointerData rightbound; /* used only in posting tree */
/* follows: list of tuple or ItemPointerData or PostingItem */
} ginxlogSplit;
/* FOLLOWS: array of ItemPointers (for leaf) or PostingItems (non-leaf) */
} ginxlogSplitData;
#define XLOG_GIN_VACUUM_PAGE 0x40
......@@ -488,7 +526,7 @@ typedef struct GinBtreeData
bool (*placeToPage) (GinBtree, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
Page (*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
void *(*prepareDownlink) (GinBtree, Buffer);
void (*fillRoot) (GinBtree, Buffer, Buffer, Buffer);
void (*fillRoot) (GinBtree, Page, BlockNumber, Page, BlockNumber, Page);
bool isData;
......@@ -535,9 +573,6 @@ extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
extern void freeGinBtreeStack(GinBtreeStack *stack);
extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
void *insertdata, GinStatsData *buildStats);
extern void ginFindParents(GinBtree btree, GinBtreeStack *stack);
extern void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
GinStatsData *buildStats);
/* ginentrypage.c */
extern IndexTuple GinFormTuple(GinState *ginstate,
......@@ -547,7 +582,7 @@ extern void GinShortenTuple(IndexTuple itup, uint32 nipd);
extern void ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
Datum key, GinNullCategory category,
GinState *ginstate);
extern void ginEntryFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
extern void ginEntryFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
/* gindatapage.c */
extern BlockNumber createPostingTree(Relation index,
......@@ -560,7 +595,7 @@ extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
ItemPointerData *items, uint32 nitem,
GinStatsData *buildStats);
extern GinBtreeStack *ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno);
extern void ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
extern void ginDataFillRoot(GinBtree btree, Page root, BlockNumber lblkno, Page lpage, BlockNumber rblkno, Page rpage);
extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
/* ginscan.c */
......
......@@ -38,7 +38,7 @@ PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, NULL, NULL, NULL)
PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, NULL, NULL, NULL)
PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint)
PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, NULL, NULL, NULL)
PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint)
PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, NULL)
PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL)
PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, NULL, NULL, NULL)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_xlog_startup, spg_xlog_cleanup, NULL)
......@@ -55,7 +55,7 @@ typedef struct BkpBlock
/*
* Each page of XLOG file has a header like this:
*/
#define XLOG_PAGE_MAGIC 0xD076 /* can be used as WAL version indicator */
#define XLOG_PAGE_MAGIC 0xD077 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment