Commit 04eee1fa authored by Heikki Linnakangas's avatar Heikki Linnakangas

More GIN refactoring.

Split off the portion of ginInsertValue that inserts the tuple to current
level into a separate function, ginPlaceToPage. ginInsertValue's charter
is now to recurse up the tree to insert the downlink, when a page split is
required.

This is in preparation for a patch to change the way incomplete splits are
handled, which will need to do these operations separately. And IMHO makes
the code more readable anyway.
parent 50101263
...@@ -280,80 +280,115 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack, ...@@ -280,80 +280,115 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
} }
/* /*
* Insert value (stored in GinBtree) to tree described by stack * Returns true if the insertion is done, false if the page was split and
* * downlink insertion is pending.
* During an index build, buildStats is non-null and the counters
* it contains are incremented as needed.
* *
* NB: the passed-in stack is freed, as though by freeGinBtreeStack. * stack->buffer is locked on entry, and is kept locked.
*/ */
void static bool
ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
GinStatsData *buildStats)
{ {
GinBtreeStack *parent; Page page = BufferGetPage(stack->buffer);
BlockNumber rootBlkno; XLogRecData *rdata;
Page page, bool fit;
rpage,
lpage;
/* extract root BlockNumber from stack */ START_CRIT_SECTION();
Assert(stack != NULL); fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata);
parent = stack; if (fit)
while (parent->parent) {
parent = parent->parent; MarkBufferDirty(stack->buffer);
rootBlkno = parent->blkno;
Assert(BlockNumberIsValid(rootBlkno));
/* this loop crawls up the stack until the insertion is complete */ if (RelationNeedsWAL(btree->index))
for (;;) {
XLogRecPtr recptr;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
return true;
}
else
{ {
XLogRecData *rdata; /* Didn't fit, have to split */
Buffer rbuffer;
Page newlpage;
BlockNumber savedRightLink; BlockNumber savedRightLink;
bool fit; GinBtreeStack *parent;
Page lpage,
rpage;
END_CRIT_SECTION();
rbuffer = GinNewBuffer(btree->index);
page = BufferGetPage(stack->buffer);
savedRightLink = GinPageGetOpaque(page)->rightlink; savedRightLink = GinPageGetOpaque(page)->rightlink;
START_CRIT_SECTION(); /*
fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata); * newlpage is a pointer to memory page, it is not associated with
if (fit) * a buffer. stack->buffer is not touched yet.
*/
newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
/* During index build, count the newly-split page */
if (buildStats)
{ {
if (btree->isData)
buildStats->nDataPages++;
else
buildStats->nEntryPages++;
}
parent = stack->parent;
if (parent == NULL)
{
/*
* split root, so we need to allocate new left page and place
* pointer on root to left and right page
*/
Buffer lbuffer = GinNewBuffer(btree->index);
((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
lpage = BufferGetPage(lbuffer);
rpage = BufferGetPage(rbuffer);
GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
START_CRIT_SECTION();
GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
PageRestoreTempPage(newlpage, lpage);
btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
MarkBufferDirty(rbuffer);
MarkBufferDirty(lbuffer);
MarkBufferDirty(stack->buffer); MarkBufferDirty(stack->buffer);
if (RelationNeedsWAL(btree->index)) if (RelationNeedsWAL(btree->index))
{ {
XLogRecPtr recptr; XLogRecPtr recptr;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT, rdata); recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
PageSetLSN(lpage, recptr);
PageSetLSN(rpage, recptr);
} }
LockBuffer(stack->buffer, GIN_UNLOCK); UnlockReleaseBuffer(rbuffer);
END_CRIT_SECTION(); UnlockReleaseBuffer(lbuffer);
freeGinBtreeStack(stack);
return;
}
else
{
/* Didn't fit, have to split */
Buffer rbuffer;
Page newlpage;
END_CRIT_SECTION(); END_CRIT_SECTION();
rbuffer = GinNewBuffer(btree->index); /* During index build, count the newly-added root page */
/*
* newlpage is a pointer to memory page, it is not associated with
* a buffer. stack->buffer is not touched yet.
*/
newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
/* During index build, count the newly-split page */
if (buildStats) if (buildStats)
{ {
if (btree->isData) if (btree->isData)
...@@ -362,98 +397,83 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) ...@@ -362,98 +397,83 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
buildStats->nEntryPages++; buildStats->nEntryPages++;
} }
parent = stack->parent; return true;
}
if (parent == NULL) else
{ {
/* /* split non-root page */
* split root, so we need to allocate new left page and place ((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
* pointer on root to left and right page ((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
*/
Buffer lbuffer = GinNewBuffer(btree->index);
((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
page = BufferGetPage(stack->buffer);
lpage = BufferGetPage(lbuffer);
rpage = BufferGetPage(rbuffer);
GinPageGetOpaque(rpage)->rightlink = InvalidBlockNumber;
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
((ginxlogSplit *) (rdata->data))->lblkno = BufferGetBlockNumber(lbuffer);
START_CRIT_SECTION();
GinInitBuffer(stack->buffer, GinPageGetOpaque(newlpage)->flags & ~GIN_LEAF);
PageRestoreTempPage(newlpage, lpage);
btree->fillRoot(btree, stack->buffer, lbuffer, rbuffer);
MarkBufferDirty(rbuffer);
MarkBufferDirty(lbuffer);
MarkBufferDirty(stack->buffer);
if (RelationNeedsWAL(btree->index)) lpage = BufferGetPage(stack->buffer);
{ rpage = BufferGetPage(rbuffer);
XLogRecPtr recptr;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); GinPageGetOpaque(rpage)->rightlink = savedRightLink;
PageSetLSN(page, recptr); GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
PageSetLSN(lpage, recptr);
PageSetLSN(rpage, recptr);
}
UnlockReleaseBuffer(rbuffer); START_CRIT_SECTION();
UnlockReleaseBuffer(lbuffer); PageRestoreTempPage(newlpage, lpage);
LockBuffer(stack->buffer, GIN_UNLOCK);
END_CRIT_SECTION();
freeGinBtreeStack(stack); MarkBufferDirty(rbuffer);
MarkBufferDirty(stack->buffer);
/* During index build, count the newly-added root page */ if (RelationNeedsWAL(btree->index))
if (buildStats) {
{ XLogRecPtr recptr;
if (btree->isData)
buildStats->nDataPages++;
else
buildStats->nEntryPages++;
}
return; recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata);
PageSetLSN(lpage, recptr);
PageSetLSN(rpage, recptr);
} }
else UnlockReleaseBuffer(rbuffer);
{ END_CRIT_SECTION();
/* split non-root page */
((ginxlogSplit *) (rdata->data))->isRootSplit = FALSE;
((ginxlogSplit *) (rdata->data))->rrlink = savedRightLink;
lpage = BufferGetPage(stack->buffer); return false;
rpage = BufferGetPage(rbuffer); }
}
}
GinPageGetOpaque(rpage)->rightlink = savedRightLink; /*
GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer); * Insert value (stored in GinBtree) to tree described by stack
*
* During an index build, buildStats is non-null and the counters
* it contains are incremented as needed.
*
* NB: the passed-in stack is freed, as though by freeGinBtreeStack.
*/
void
ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
{
GinBtreeStack *parent;
BlockNumber rootBlkno;
Page page;
/* extract root BlockNumber from stack */
Assert(stack != NULL);
parent = stack;
while (parent->parent)
parent = parent->parent;
rootBlkno = parent->blkno;
Assert(BlockNumberIsValid(rootBlkno));
START_CRIT_SECTION(); /* this loop crawls up the stack until the insertion is complete */
PageRestoreTempPage(newlpage, lpage); for (;;)
{
bool done;
MarkBufferDirty(rbuffer); done = ginPlaceToPage(btree, rootBlkno, stack, buildStats);
MarkBufferDirty(stack->buffer);
if (RelationNeedsWAL(btree->index)) /* just to be extra sure we don't delete anything by accident... */
{ btree->isDelete = FALSE;
XLogRecPtr recptr;
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT, rdata); if (done)
PageSetLSN(lpage, recptr); {
PageSetLSN(rpage, recptr); LockBuffer(stack->buffer, GIN_UNLOCK);
} freeGinBtreeStack(stack);
UnlockReleaseBuffer(rbuffer); break;
END_CRIT_SECTION();
}
} }
btree->prepareDownlink(btree, stack->buffer); btree->prepareDownlink(btree, stack->buffer);
btree->isDelete = FALSE;
/* search parent to lock */ /* search parent to lock */
LockBuffer(parent->buffer, GIN_EXCLUSIVE); LockBuffer(parent->buffer, GIN_EXCLUSIVE);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment