Commit ce5326ee authored by Heikki Linnakangas

More GIN refactoring.

Separate the insertion payload from the more static portions of GinBtree.
GinBtree now only contains information related to searching the tree, and
the information of what to insert is passed separately.

Add root block number to GinBtree, instead of passing it around all the
functions as argument.

Split off ginFinishSplit() from ginInsertValue(). ginFinishSplit is
responsible for finding the parent and inserting the downlink to it.
parent 4118f7e8
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "utils/rel.h" #include "utils/rel.h"
/* /*
* Locks buffer by needed method for search. * Lock buffer by needed method for search.
*/ */
static int static int
ginTraverseLock(Buffer buffer, bool searchMode) ginTraverseLock(Buffer buffer, bool searchMode)
...@@ -53,9 +53,9 @@ ginTraverseLock(Buffer buffer, bool searchMode) ...@@ -53,9 +53,9 @@ ginTraverseLock(Buffer buffer, bool searchMode)
} }
/* /*
* Descends the tree to the leaf page that contains or would contain the * Descend the tree to the leaf page that contains or would contain the key
* key we're searching for. The key should already be filled in 'btree', * we're searching for. The key should already be filled in 'btree', in
* in tree-type specific manner. If btree->fullScan is true, descends to the * tree-type specific manner. If btree->fullScan is true, descends to the
* leftmost leaf page. * leftmost leaf page.
* *
* If 'searchmode' is false, on return stack->buffer is exclusively locked, * If 'searchmode' is false, on return stack->buffer is exclusively locked,
...@@ -63,13 +63,13 @@ ginTraverseLock(Buffer buffer, bool searchMode) ...@@ -63,13 +63,13 @@ ginTraverseLock(Buffer buffer, bool searchMode)
* is share-locked, and stack->parent is NULL. * is share-locked, and stack->parent is NULL.
*/ */
GinBtreeStack * GinBtreeStack *
ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode) ginFindLeafPage(GinBtree btree, bool searchMode)
{ {
GinBtreeStack *stack; GinBtreeStack *stack;
stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack)); stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
stack->blkno = rootBlkno; stack->blkno = btree->rootBlkno;
stack->buffer = ReadBuffer(btree->index, rootBlkno); stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
stack->parent = NULL; stack->parent = NULL;
stack->predictNumber = 1; stack->predictNumber = 1;
...@@ -89,7 +89,7 @@ ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode) ...@@ -89,7 +89,7 @@ ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode)
* ok, page is correctly locked, we should check to move right .., * ok, page is correctly locked, we should check to move right ..,
* root never has a right link, so small optimization * root never has a right link, so small optimization
*/ */
while (btree->fullScan == FALSE && stack->blkno != rootBlkno && while (btree->fullScan == FALSE && stack->blkno != btree->rootBlkno &&
btree->isMoveRight(btree, page)) btree->isMoveRight(btree, page))
{ {
BlockNumber rightlink = GinPageGetOpaque(page)->rightlink; BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
...@@ -158,8 +158,8 @@ ginStepRight(Buffer buffer, Relation index, int lockmode) ...@@ -158,8 +158,8 @@ ginStepRight(Buffer buffer, Relation index, int lockmode)
elog(ERROR, "right sibling of GIN page is of different type"); elog(ERROR, "right sibling of GIN page is of different type");
/* /*
* Given the proper lock sequence above, we should never land on a * Given the proper lock sequence above, we should never land on a deleted
* deleted page. * page.
*/ */
if (GinPageIsDeleted(page)) if (GinPageIsDeleted(page))
elog(ERROR, "right sibling of GIN page was deleted"); elog(ERROR, "right sibling of GIN page was deleted");
...@@ -183,14 +183,12 @@ freeGinBtreeStack(GinBtreeStack *stack) ...@@ -183,14 +183,12 @@ freeGinBtreeStack(GinBtreeStack *stack)
} }
/* /*
* Try to find parent for current stack position, returns correct * Try to find parent for current stack position. Returns correct parent and
* parent and child's offset in stack->parent. * child's offset in stack->parent. The root page is never released,
* Function should never release root page to prevent conflicts * to prevent conflict with vacuum process.
* with vacuum process
*/ */
void void
ginFindParents(GinBtree btree, GinBtreeStack *stack, ginFindParents(GinBtree btree, GinBtreeStack *stack)
BlockNumber rootBlkno)
{ {
Page page; Page page;
Buffer buffer; Buffer buffer;
...@@ -204,8 +202,8 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack, ...@@ -204,8 +202,8 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
{ {
/* XLog mode... */ /* XLog mode... */
root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack)); root = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
root->blkno = rootBlkno; root->blkno = btree->rootBlkno;
root->buffer = ReadBuffer(btree->index, rootBlkno); root->buffer = ReadBuffer(btree->index, btree->rootBlkno);
LockBuffer(root->buffer, GIN_EXCLUSIVE); LockBuffer(root->buffer, GIN_EXCLUSIVE);
root->parent = NULL; root->parent = NULL;
} }
...@@ -221,8 +219,8 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack, ...@@ -221,8 +219,8 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
root = root->parent; root = root->parent;
} }
Assert(root->blkno == rootBlkno); Assert(root->blkno == btree->rootBlkno);
Assert(BufferGetBlockNumber(root->buffer) == rootBlkno); Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
LockBuffer(root->buffer, GIN_EXCLUSIVE); LockBuffer(root->buffer, GIN_EXCLUSIVE);
} }
root->off = InvalidOffsetNumber; root->off = InvalidOffsetNumber;
...@@ -268,7 +266,7 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack, ...@@ -268,7 +266,7 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack)); ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
ptr->blkno = blkno; ptr->blkno = blkno;
ptr->buffer = buffer; ptr->buffer = buffer;
ptr->parent = root; /* it's may be wrong, but in next call we will ptr->parent = root; /* it may be wrong, but in next call we will
* correct */ * correct */
ptr->off = offset; ptr->off = offset;
stack->parent = ptr; stack->parent = ptr;
...@@ -280,21 +278,35 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack, ...@@ -280,21 +278,35 @@ ginFindParents(GinBtree btree, GinBtreeStack *stack,
} }
/* /*
* Returns true if the insertion is done, false if the page was split and * Insert a new item to a page.
* downlink insertion is pending. *
* Returns true if the insertion was finished. On false, the page was split and
* the parent needs to be updated. (a root split returns true as it doesn't
* need any further action by the caller to complete)
*
* When inserting a downlink to an internal page, the existing item at the
* given location is updated to point to 'updateblkno'.
* *
* stack->buffer is locked on entry, and is kept locked. * stack->buffer is locked on entry, and is kept locked.
*/ */
static bool static bool
ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack, ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
void *insertdata, BlockNumber updateblkno,
GinStatsData *buildStats) GinStatsData *buildStats)
{ {
Page page = BufferGetPage(stack->buffer); Page page = BufferGetPage(stack->buffer);
XLogRecData *rdata; XLogRecData *rdata;
bool fit; bool fit;
/*
* Try to put the incoming tuple on the page. If it doesn't fit,
* placeToPage method will return false and leave the page unmodified, and
* we'll have to split the page.
*/
START_CRIT_SECTION(); START_CRIT_SECTION();
fit = btree->placeToPage(btree, stack->buffer, stack->off, &rdata); fit = btree->placeToPage(btree, stack->buffer, stack->off,
insertdata, updateblkno,
&rdata);
if (fit) if (fit)
{ {
MarkBufferDirty(stack->buffer); MarkBufferDirty(stack->buffer);
...@@ -324,18 +336,7 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack, ...@@ -324,18 +336,7 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
END_CRIT_SECTION(); END_CRIT_SECTION();
rbuffer = GinNewBuffer(btree->index); rbuffer = GinNewBuffer(btree->index);
/* During index build, count the new page */
savedRightLink = GinPageGetOpaque(page)->rightlink;
/*
* newlpage is a pointer to memory page, it is not associated with
* a buffer. stack->buffer is not touched yet.
*/
newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off, &rdata);
((ginxlogSplit *) (rdata->data))->rootBlkno = rootBlkno;
/* During index build, count the newly-split page */
if (buildStats) if (buildStats)
{ {
if (btree->isData) if (btree->isData)
...@@ -344,6 +345,18 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack, ...@@ -344,6 +345,18 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
buildStats->nEntryPages++; buildStats->nEntryPages++;
} }
savedRightLink = GinPageGetOpaque(page)->rightlink;
/*
* newlpage is a pointer to memory page, it is not associated with a
* buffer. stack->buffer is not touched yet.
*/
newlpage = btree->splitPage(btree, stack->buffer, rbuffer, stack->off,
insertdata, updateblkno,
&rdata);
((ginxlogSplit *) (rdata->data))->rootBlkno = btree->rootBlkno;
parent = stack->parent; parent = stack->parent;
if (parent == NULL) if (parent == NULL)
...@@ -354,6 +367,15 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack, ...@@ -354,6 +367,15 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
*/ */
Buffer lbuffer = GinNewBuffer(btree->index); Buffer lbuffer = GinNewBuffer(btree->index);
/* During index build, count the new page */
if (buildStats)
{
if (btree->isData)
buildStats->nDataPages++;
else
buildStats->nEntryPages++;
}
((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE; ((ginxlogSplit *) (rdata->data))->isRootSplit = TRUE;
((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber; ((ginxlogSplit *) (rdata->data))->rrlink = InvalidBlockNumber;
...@@ -434,46 +456,27 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack, ...@@ -434,46 +456,27 @@ ginPlaceToPage(GinBtree btree, BlockNumber rootBlkno, GinBtreeStack *stack,
} }
/* /*
* Insert value (stored in GinBtree) to tree described by stack * Finish a split by inserting the downlink for the new page to parent.
* *
* During an index build, buildStats is non-null and the counters * On entry, stack->buffer is exclusively locked.
* it contains are incremented as needed.
* *
* NB: the passed-in stack is freed, as though by freeGinBtreeStack. * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
*/ */
void void
ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) ginFinishSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
{ {
GinBtreeStack *parent;
BlockNumber rootBlkno;
Page page; Page page;
/* extract root BlockNumber from stack */
Assert(stack != NULL);
parent = stack;
while (parent->parent)
parent = parent->parent;
rootBlkno = parent->blkno;
Assert(BlockNumberIsValid(rootBlkno));
/* this loop crawls up the stack until the insertion is complete */
for (;;)
{
bool done; bool done;
done = ginPlaceToPage(btree, rootBlkno, stack, buildStats); /* this loop crawls up the stack until the insertion is complete */
do
/* just to be extra sure we don't delete anything by accident... */
btree->isDelete = FALSE;
if (done)
{ {
LockBuffer(stack->buffer, GIN_UNLOCK); GinBtreeStack *parent = stack->parent;
freeGinBtreeStack(stack); void *insertdata;
break; BlockNumber updateblkno;
}
btree->prepareDownlink(btree, stack->buffer); insertdata = btree->prepareDownlink(btree, stack->buffer);
updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
/* search parent to lock */ /* search parent to lock */
LockBuffer(parent->buffer, GIN_EXCLUSIVE); LockBuffer(parent->buffer, GIN_EXCLUSIVE);
...@@ -491,7 +494,7 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) ...@@ -491,7 +494,7 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
* plain search... * plain search...
*/ */
LockBuffer(parent->buffer, GIN_UNLOCK); LockBuffer(parent->buffer, GIN_UNLOCK);
ginFindParents(btree, stack, rootBlkno); ginFindParents(btree, stack);
parent = stack->parent; parent = stack->parent;
Assert(parent != NULL); Assert(parent != NULL);
break; break;
...@@ -502,8 +505,49 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats) ...@@ -502,8 +505,49 @@ ginInsertValue(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats)
page = BufferGetPage(parent->buffer); page = BufferGetPage(parent->buffer);
} }
/* release the child */
UnlockReleaseBuffer(stack->buffer); UnlockReleaseBuffer(stack->buffer);
pfree(stack); pfree(stack);
stack = parent; stack = parent;
/* insert the downlink to parent */
done = ginPlaceToPage(btree, stack,
insertdata, updateblkno,
buildStats);
pfree(insertdata);
} while (!done);
LockBuffer(stack->buffer, GIN_UNLOCK);
/* free the rest of the stack */
freeGinBtreeStack(stack);
}
/*
* Insert a value to tree described by stack.
*
* The value to be inserted is given in 'insertdata'. Its format depends
* on whether this is an entry or data tree, ginInsertValue just passes it
* through to the tree-specific callback function.
*
* During an index build, buildStats is non-null and the counters it contains
* are incremented as needed.
*
* NB: the passed-in stack is freed, as though by freeGinBtreeStack.
*/
void
ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
GinStatsData *buildStats)
{
bool done;
done = ginPlaceToPage(btree, stack,
insertdata, InvalidBlockNumber,
buildStats);
if (done)
{
LockBuffer(stack->buffer, GIN_UNLOCK);
freeGinBtreeStack(stack);
} }
else
ginFinishSplit(btree, stack, buildStats);
} }
...@@ -30,7 +30,7 @@ dataIsMoveRight(GinBtree btree, Page page) ...@@ -30,7 +30,7 @@ dataIsMoveRight(GinBtree btree, Page page)
if (GinPageRightMost(page)) if (GinPageRightMost(page))
return FALSE; return FALSE;
return (ginCompareItemPointers(btree->items + btree->curitem, iptr) > 0) ? TRUE : FALSE; return (ginCompareItemPointers(&btree->itemptr, iptr) > 0) ? TRUE : FALSE;
} }
/* /*
...@@ -80,7 +80,7 @@ dataLocateItem(GinBtree btree, GinBtreeStack *stack) ...@@ -80,7 +80,7 @@ dataLocateItem(GinBtree btree, GinBtreeStack *stack)
else else
{ {
pitem = GinDataPageGetPostingItem(page, mid); pitem = GinDataPageGetPostingItem(page, mid);
result = ginCompareItemPointers(btree->items + btree->curitem, &(pitem->key)); result = ginCompareItemPointers(&btree->itemptr, &(pitem->key));
} }
if (result == 0) if (result == 0)
...@@ -138,7 +138,7 @@ dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack) ...@@ -138,7 +138,7 @@ dataLocateLeafItem(GinBtree btree, GinBtreeStack *stack)
{ {
OffsetNumber mid = low + ((high - low) / 2); OffsetNumber mid = low + ((high - low) / 2);
result = ginCompareItemPointers(btree->items + btree->curitem, result = ginCompareItemPointers(&btree->itemptr,
GinDataPageGetItemPointer(page, mid)); GinDataPageGetItemPointer(page, mid));
if (result == 0) if (result == 0)
...@@ -298,18 +298,19 @@ GinPageDeletePostingItem(Page page, OffsetNumber offset) ...@@ -298,18 +298,19 @@ GinPageDeletePostingItem(Page page, OffsetNumber offset)
* item pointer never deletes! * item pointer never deletes!
*/ */
static bool static bool
dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off) dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off, void *insertdata)
{ {
Page page = BufferGetPage(buf); Page page = BufferGetPage(buf);
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
Assert(!btree->isDelete);
if (GinPageIsLeaf(page)) if (GinPageIsLeaf(page))
{ {
GinBtreeDataLeafInsertData *items = insertdata;
if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff) if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
{ {
if ((btree->nitem - btree->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page)) if ((items->nitem - items->curitem) * sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
return true; return true;
} }
else if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page)) else if (sizeof(ItemPointerData) <= GinDataPageGetFreeSpace(page))
...@@ -321,43 +322,22 @@ dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off) ...@@ -321,43 +322,22 @@ dataIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
return false; return false;
} }
/*
* In case of previous split update old child blkno to
* new right page
* item pointer never deletes!
*/
static BlockNumber
dataPrepareData(GinBtree btree, Page page, OffsetNumber off)
{
BlockNumber ret = InvalidBlockNumber;
Assert(GinPageIsData(page));
if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber)
{
PostingItem *pitem = GinDataPageGetPostingItem(page, off);
PostingItemSetBlockNumber(pitem, btree->rightblkno);
ret = btree->rightblkno;
}
btree->rightblkno = InvalidBlockNumber;
return ret;
}
/* /*
* Places keys to page and fills WAL record. In case leaf page and * Places keys to page and fills WAL record. In case leaf page and
* build mode puts all ItemPointers to page. * build mode puts all ItemPointers to page.
* *
* If none of the keys fit, returns false without modifying the page. * If none of the keys fit, returns false without modifying the page.
*
* On insertion to an internal node, in addition to inserting the given item,
* the downlink of the existing item at 'off' is updated to point to
* 'updateblkno'.
*/ */
static bool static bool
dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
void *insertdata, BlockNumber updateblkno,
XLogRecData **prdata) XLogRecData **prdata)
{ {
Page page = BufferGetPage(buf); Page page = BufferGetPage(buf);
int sizeofitem = GinSizeOfDataPageItem(page);
int cnt = 0; int cnt = 0;
/* these must be static so they can be returned to caller */ /* these must be static so they can be returned to caller */
...@@ -365,14 +345,21 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, ...@@ -365,14 +345,21 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
static ginxlogInsert data; static ginxlogInsert data;
/* quick exit if it doesn't fit */ /* quick exit if it doesn't fit */
if (!dataIsEnoughSpace(btree, buf, off)) if (!dataIsEnoughSpace(btree, buf, off, insertdata))
return false; return false;
*prdata = rdata; *prdata = rdata;
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
data.updateBlkno = dataPrepareData(btree, page, off); /* Update existing downlink to point to next page (on internal page) */
if (!GinPageIsLeaf(page))
{
PostingItem *pitem = GinDataPageGetPostingItem(page, off);
PostingItemSetBlockNumber(pitem, updateblkno);
}
data.updateBlkno = updateblkno;
data.node = btree->index->rd_node; data.node = btree->index->rd_node;
data.blkno = BufferGetBlockNumber(buf); data.blkno = BufferGetBlockNumber(buf);
data.offset = off; data.offset = off;
...@@ -405,34 +392,43 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, ...@@ -405,34 +392,43 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
cnt++; cnt++;
rdata[cnt].buffer = InvalidBuffer; rdata[cnt].buffer = InvalidBuffer;
rdata[cnt].data = (GinPageIsLeaf(page)) ? ((char *) (btree->items + btree->curitem)) : ((char *) &(btree->pitem)); /* data and len filled in below */
rdata[cnt].len = sizeofitem;
rdata[cnt].next = NULL; rdata[cnt].next = NULL;
if (GinPageIsLeaf(page)) if (GinPageIsLeaf(page))
{ {
GinBtreeDataLeafInsertData *items = insertdata;
uint32 savedPos = items->curitem;
if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff) if (GinPageRightMost(page) && off > GinPageGetOpaque(page)->maxoff)
{ {
/* usually, create index... */ /* usually, create index... */
uint32 savedPos = btree->curitem; while (items->curitem < items->nitem)
while (btree->curitem < btree->nitem)
{ {
GinDataPageAddItemPointer(page, btree->items + btree->curitem, off); GinDataPageAddItemPointer(page, items->items + items->curitem, off);
off++; off++;
btree->curitem++; items->curitem++;
} }
data.nitem = btree->curitem - savedPos; data.nitem = items->curitem - savedPos;
rdata[cnt].len = sizeofitem * data.nitem;
} }
else else
{ {
GinDataPageAddItemPointer(page, btree->items + btree->curitem, off); GinDataPageAddItemPointer(page, items->items + items->curitem, off);
btree->curitem++; items->curitem++;
} }
rdata[cnt].data = (char *) &items->items[savedPos];
rdata[cnt].len = sizeof(ItemPointerData) * data.nitem;
} }
else else
GinDataPageAddPostingItem(page, &(btree->pitem), off); {
PostingItem *pitem = insertdata;
GinDataPageAddPostingItem(page, pitem, off);
rdata[cnt].data = (char *) pitem;
rdata[cnt].len = sizeof(PostingItem);
}
return true; return true;
} }
...@@ -444,7 +440,8 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, ...@@ -444,7 +440,8 @@ dataPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
* left page * left page
*/ */
static Page static Page
dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata) dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
void *insertdata, BlockNumber updateblkno, XLogRecData **prdata)
{ {
char *ptr; char *ptr;
OffsetNumber separator; OffsetNumber separator;
...@@ -457,7 +454,6 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe ...@@ -457,7 +454,6 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
Page rpage = BufferGetPage(rbuf); Page rpage = BufferGetPage(rbuf);
Size pageSize = PageGetPageSize(lpage); Size pageSize = PageGetPageSize(lpage);
Size freeSpace; Size freeSpace;
uint32 nCopied = 1;
/* these must be static so they can be returned to caller */ /* these must be static so they can be returned to caller */
static ginxlogSplit data; static ginxlogSplit data;
...@@ -468,9 +464,14 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe ...@@ -468,9 +464,14 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
freeSpace = GinDataPageGetFreeSpace(rpage); freeSpace = GinDataPageGetFreeSpace(rpage);
*prdata = rdata; *prdata = rdata;
data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
InvalidOffsetNumber : PostingItemGetBlockNumber(&(btree->pitem)); /* Update existing downlink to point to next page (on internal page) */
data.updateBlkno = dataPrepareData(btree, lpage, off); if (!isleaf)
{
PostingItem *pitem = GinDataPageGetPostingItem(lpage, off);
PostingItemSetBlockNumber(pitem, updateblkno);
}
if (isleaf) if (isleaf)
{ {
...@@ -487,16 +488,16 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe ...@@ -487,16 +488,16 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
if (isleaf && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff) if (isleaf && GinPageRightMost(lpage) && off > GinPageGetOpaque(lpage)->maxoff)
{ {
nCopied = 0; GinBtreeDataLeafInsertData *items = insertdata;
while (btree->curitem < btree->nitem &&
while (items->curitem < items->nitem &&
maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData))) maxoff * sizeof(ItemPointerData) < 2 * (freeSpace - sizeof(ItemPointerData)))
{ {
memcpy(vector + maxoff * sizeof(ItemPointerData), memcpy(vector + maxoff * sizeof(ItemPointerData),
btree->items + btree->curitem, items->items + items->curitem,
sizeof(ItemPointerData)); sizeof(ItemPointerData));
maxoff++; maxoff++;
nCopied++; items->curitem++;
btree->curitem++;
} }
} }
else else
...@@ -506,11 +507,17 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe ...@@ -506,11 +507,17 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem); memmove(ptr + sizeofitem, ptr, (maxoff - off + 1) * sizeofitem);
if (isleaf) if (isleaf)
{ {
memcpy(ptr, btree->items + btree->curitem, sizeofitem); GinBtreeDataLeafInsertData *items = insertdata;
btree->curitem++;
memcpy(ptr, items->items + items->curitem, sizeofitem);
items->curitem++;
} }
else else
memcpy(ptr, &(btree->pitem), sizeofitem); {
PostingItem *pitem = insertdata;
memcpy(ptr, pitem, sizeofitem);
}
maxoff++; maxoff++;
} }
...@@ -584,16 +591,18 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe ...@@ -584,16 +591,18 @@ dataSplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRe
} }
/* /*
* Prepare the state in 'btree' for inserting a downlink for given buffer. * Construct insertion payload for inserting the downlink for given buffer.
*/ */
static void static void *
dataPrepareDownlink(GinBtree btree, Buffer lbuf) dataPrepareDownlink(GinBtree btree, Buffer lbuf)
{ {
PostingItem *pitem = palloc(sizeof(PostingItem));
Page lpage = BufferGetPage(lbuf); Page lpage = BufferGetPage(lbuf);
PostingItemSetBlockNumber(&(btree->pitem), BufferGetBlockNumber(lbuf)); PostingItemSetBlockNumber(pitem, BufferGetBlockNumber(lbuf));
btree->pitem.key = *GinDataPageGetRightBound(lpage); pitem->key = *GinDataPageGetRightBound(lpage);
btree->rightblkno = GinPageGetOpaque(lpage)->rightlink;
return pitem;
} }
/* /*
...@@ -698,11 +707,12 @@ createPostingTree(Relation index, ItemPointerData *items, uint32 nitems, ...@@ -698,11 +707,12 @@ createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
} }
void void
ginPrepareDataScan(GinBtree btree, Relation index) ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno)
{ {
memset(btree, 0, sizeof(GinBtreeData)); memset(btree, 0, sizeof(GinBtreeData));
btree->index = index; btree->index = index;
btree->rootBlkno = rootBlkno;
btree->findChildPage = dataLocateItem; btree->findChildPage = dataLocateItem;
btree->getLeftMostChild = dataGetLeftMostPage; btree->getLeftMostChild = dataGetLeftMostPage;
...@@ -715,7 +725,6 @@ ginPrepareDataScan(GinBtree btree, Relation index) ...@@ -715,7 +725,6 @@ ginPrepareDataScan(GinBtree btree, Relation index)
btree->prepareDownlink = dataPrepareDownlink; btree->prepareDownlink = dataPrepareDownlink;
btree->isData = TRUE; btree->isData = TRUE;
btree->isDelete = FALSE;
btree->fullScan = FALSE; btree->fullScan = FALSE;
btree->isBuild = FALSE; btree->isBuild = FALSE;
} }
...@@ -729,29 +738,32 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno, ...@@ -729,29 +738,32 @@ ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
GinStatsData *buildStats) GinStatsData *buildStats)
{ {
GinBtreeData btree; GinBtreeData btree;
GinBtreeDataLeafInsertData insertdata;
GinBtreeStack *stack; GinBtreeStack *stack;
ginPrepareDataScan(&btree, index); ginPrepareDataScan(&btree, index, rootBlkno);
btree.isBuild = (buildStats != NULL); btree.isBuild = (buildStats != NULL);
btree.items = items; insertdata.items = items;
btree.nitem = nitem; insertdata.nitem = nitem;
btree.curitem = 0; insertdata.curitem = 0;
while (btree.curitem < btree.nitem) while (insertdata.curitem < insertdata.nitem)
{ {
stack = ginFindLeafPage(&btree, rootBlkno, false); /* search for the leaf page where the first item should go to */
btree.itemptr = insertdata.items[insertdata.curitem];
stack = ginFindLeafPage(&btree, false);
if (btree.findItem(&btree, stack)) if (btree.findItem(&btree, stack))
{ {
/* /*
* btree.items[btree.curitem] already exists in index * Current item already exists in index.
*/ */
btree.curitem++; insertdata.curitem++;
LockBuffer(stack->buffer, GIN_UNLOCK); LockBuffer(stack->buffer, GIN_UNLOCK);
freeGinBtreeStack(stack); freeGinBtreeStack(stack);
} }
else else
ginInsertValue(&btree, stack, buildStats); ginInsertValue(&btree, stack, &insertdata, buildStats);
} }
} }
...@@ -764,11 +776,11 @@ ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno) ...@@ -764,11 +776,11 @@ ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno)
GinBtreeData btree; GinBtreeData btree;
GinBtreeStack *stack; GinBtreeStack *stack;
ginPrepareDataScan(&btree, index); ginPrepareDataScan(&btree, index, rootBlkno);
btree.fullScan = TRUE; btree.fullScan = TRUE;
stack = ginFindLeafPage(&btree, rootBlkno, TRUE); stack = ginFindLeafPage(&btree, TRUE);
return stack; return stack;
} }
...@@ -112,6 +112,7 @@ GinFormTuple(GinState *ginstate, ...@@ -112,6 +112,7 @@ GinFormTuple(GinState *ginstate,
if (newsize != IndexTupleSize(itup)) if (newsize != IndexTupleSize(itup))
{ {
itup = repalloc(itup, newsize); itup = repalloc(itup, newsize);
/* /*
* PostgreSQL 9.3 and earlier did not clear this new space, so we * PostgreSQL 9.3 and earlier did not clear this new space, so we
* might find uninitialized padding when reading tuples from disk. * might find uninitialized padding when reading tuples from disk.
...@@ -431,22 +432,26 @@ entryGetLeftMostPage(GinBtree btree, Page page) ...@@ -431,22 +432,26 @@ entryGetLeftMostPage(GinBtree btree, Page page)
} }
static bool static bool
entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off) entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off,
GinBtreeEntryInsertData *insertData)
{ {
Size itupsz = 0; Size releasedsz = 0;
Size addedsz;
Page page = BufferGetPage(buf); Page page = BufferGetPage(buf);
Assert(btree->entry); Assert(insertData->entry);
Assert(!GinPageIsData(page)); Assert(!GinPageIsData(page));
if (btree->isDelete) if (insertData->isDelete)
{ {
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off)); IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
itupsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData); releasedsz = MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData);
} }
if (PageGetFreeSpace(page) + itupsz >= MAXALIGN(IndexTupleSize(btree->entry)) + sizeof(ItemIdData)) addedsz = MAXALIGN(IndexTupleSize(insertData->entry)) + sizeof(ItemIdData);
if (PageGetFreeSpace(page) + releasedsz >= addedsz)
return true; return true;
return false; return false;
...@@ -457,42 +462,42 @@ entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off) ...@@ -457,42 +462,42 @@ entryIsEnoughSpace(GinBtree btree, Buffer buf, OffsetNumber off)
* should update it, update old child blkno to new right page * should update it, update old child blkno to new right page
* if child split occurred * if child split occurred
*/ */
static BlockNumber static void
entryPreparePage(GinBtree btree, Page page, OffsetNumber off) entryPreparePage(GinBtree btree, Page page, OffsetNumber off,
GinBtreeEntryInsertData *insertData, BlockNumber updateblkno)
{ {
BlockNumber ret = InvalidBlockNumber; Assert(insertData->entry);
Assert(btree->entry);
Assert(!GinPageIsData(page)); Assert(!GinPageIsData(page));
if (btree->isDelete) if (insertData->isDelete)
{ {
Assert(GinPageIsLeaf(page)); Assert(GinPageIsLeaf(page));
PageIndexTupleDelete(page, off); PageIndexTupleDelete(page, off);
} }
if (!GinPageIsLeaf(page) && btree->rightblkno != InvalidBlockNumber) if (!GinPageIsLeaf(page) && updateblkno != InvalidBlockNumber)
{ {
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off)); IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, off));
GinSetDownlink(itup, btree->rightblkno); GinSetDownlink(itup, updateblkno);
ret = btree->rightblkno;
} }
btree->rightblkno = InvalidBlockNumber;
return ret;
} }
/* /*
* Place tuple on page and fills WAL record * Place tuple on page and fills WAL record
* *
* If the tuple doesn't fit, returns false without modifying the page. * If the tuple doesn't fit, returns false without modifying the page.
*
* On insertion to an internal node, in addition to inserting the given item,
* the downlink of the existing item at 'off' is updated to point to
* 'updateblkno'.
*/ */
static bool static bool
entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
void *insertPayload, BlockNumber updateblkno,
XLogRecData **prdata) XLogRecData **prdata)
{ {
GinBtreeEntryInsertData *insertData = insertPayload;
Page page = BufferGetPage(buf); Page page = BufferGetPage(buf);
OffsetNumber placed; OffsetNumber placed;
int cnt = 0; int cnt = 0;
...@@ -502,13 +507,17 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, ...@@ -502,13 +507,17 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
static ginxlogInsert data; static ginxlogInsert data;
/* quick exit if it doesn't fit */ /* quick exit if it doesn't fit */
if (!entryIsEnoughSpace(btree, buf, off)) if (!entryIsEnoughSpace(btree, buf, off, insertData))
return false; return false;
*prdata = rdata; *prdata = rdata;
data.updateBlkno = entryPreparePage(btree, page, off); entryPreparePage(btree, page, off, insertData, updateblkno);
data.updateBlkno = updateblkno;
placed = PageAddItem(page, (Item) btree->entry, IndexTupleSize(btree->entry), off, false, false); placed = PageAddItem(page,
(Item) insertData->entry,
IndexTupleSize(insertData->entry),
off, false, false);
if (placed != off) if (placed != off)
elog(ERROR, "failed to add item to index page in \"%s\"", elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index)); RelationGetRelationName(btree->index));
...@@ -517,7 +526,7 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, ...@@ -517,7 +526,7 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
data.blkno = BufferGetBlockNumber(buf); data.blkno = BufferGetBlockNumber(buf);
data.offset = off; data.offset = off;
data.nitem = 1; data.nitem = 1;
data.isDelete = btree->isDelete; data.isDelete = insertData->isDelete;
data.isData = false; data.isData = false;
data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE; data.isLeaf = GinPageIsLeaf(page) ? TRUE : FALSE;
...@@ -545,12 +554,10 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, ...@@ -545,12 +554,10 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
cnt++; cnt++;
rdata[cnt].buffer = InvalidBuffer; rdata[cnt].buffer = InvalidBuffer;
rdata[cnt].data = (char *) btree->entry; rdata[cnt].data = (char *) insertData->entry;
rdata[cnt].len = IndexTupleSize(btree->entry); rdata[cnt].len = IndexTupleSize(insertData->entry);
rdata[cnt].next = NULL; rdata[cnt].next = NULL;
btree->entry = NULL;
return true; return true;
} }
...@@ -561,8 +568,11 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off, ...@@ -561,8 +568,11 @@ entryPlaceToPage(GinBtree btree, Buffer buf, OffsetNumber off,
* an equal number! * an equal number!
*/ */
static Page static Page
entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogRecData **prdata) entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off,
void *insertPayload,
BlockNumber updateblkno, XLogRecData **prdata)
{ {
GinBtreeEntryInsertData *insertData = insertPayload;
OffsetNumber i, OffsetNumber i,
maxoff, maxoff,
separator = InvalidOffsetNumber; separator = InvalidOffsetNumber;
...@@ -583,8 +593,9 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR ...@@ -583,8 +593,9 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
*prdata = rdata; *prdata = rdata;
data.leftChildBlkno = (GinPageIsLeaf(lpage)) ? data.leftChildBlkno = (GinPageIsLeaf(lpage)) ?
InvalidOffsetNumber : GinGetDownlink(btree->entry); InvalidOffsetNumber : GinGetDownlink(insertData->entry);
data.updateBlkno = entryPreparePage(btree, lpage, off); data.updateBlkno = updateblkno;
entryPreparePage(btree, lpage, off, insertData, updateblkno);
maxoff = PageGetMaxOffsetNumber(lpage); maxoff = PageGetMaxOffsetNumber(lpage);
ptr = tupstore; ptr = tupstore;
...@@ -593,8 +604,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR ...@@ -593,8 +604,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
{ {
if (i == off) if (i == off)
{ {
size = MAXALIGN(IndexTupleSize(btree->entry)); size = MAXALIGN(IndexTupleSize(insertData->entry));
memcpy(ptr, btree->entry, size); memcpy(ptr, insertData->entry, size);
ptr += size; ptr += size;
totalsize += size + sizeof(ItemIdData); totalsize += size + sizeof(ItemIdData);
} }
...@@ -608,8 +619,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR ...@@ -608,8 +619,8 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
if (off == maxoff + 1) if (off == maxoff + 1)
{ {
size = MAXALIGN(IndexTupleSize(btree->entry)); size = MAXALIGN(IndexTupleSize(insertData->entry));
memcpy(ptr, btree->entry, size); memcpy(ptr, insertData->entry, size);
ptr += size; ptr += size;
totalsize += size + sizeof(ItemIdData); totalsize += size + sizeof(ItemIdData);
} }
...@@ -667,20 +678,23 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR ...@@ -667,20 +678,23 @@ entrySplitPage(GinBtree btree, Buffer lbuf, Buffer rbuf, OffsetNumber off, XLogR
} }
/* /*
* Prepare the state in 'btree' for inserting a downlink for given buffer. * Construct insertion payload for inserting the downlink for given buffer.
*/ */
static void static void *
entryPrepareDownlink(GinBtree btree, Buffer lbuf) entryPrepareDownlink(GinBtree btree, Buffer lbuf)
{ {
GinBtreeEntryInsertData *insertData;
Page lpage = BufferGetPage(lbuf); Page lpage = BufferGetPage(lbuf);
BlockNumber lblkno = BufferGetBlockNumber(lbuf);
IndexTuple itup; IndexTuple itup;
itup = getRightMostTuple(lpage); itup = getRightMostTuple(lpage);
btree->entry = GinFormInteriorTuple(itup, insertData = palloc(sizeof(GinBtreeEntryInsertData));
lpage, insertData->entry = GinFormInteriorTuple(itup, lpage, lblkno);
BufferGetBlockNumber(lbuf)); insertData->isDelete = false;
btree->rightblkno = GinPageGetOpaque(lpage)->rightlink;
return insertData;
} }
/* /*
...@@ -724,6 +738,7 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum, ...@@ -724,6 +738,7 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
memset(btree, 0, sizeof(GinBtreeData)); memset(btree, 0, sizeof(GinBtreeData));
btree->index = ginstate->index; btree->index = ginstate->index;
btree->rootBlkno = GIN_ROOT_BLKNO;
btree->ginstate = ginstate; btree->ginstate = ginstate;
btree->findChildPage = entryLocateEntry; btree->findChildPage = entryLocateEntry;
...@@ -743,5 +758,4 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum, ...@@ -743,5 +758,4 @@ ginPrepareEntryScan(GinBtree btree, OffsetNumber attnum,
btree->entryAttnum = attnum; btree->entryAttnum = attnum;
btree->entryKey = key; btree->entryKey = key;
btree->entryCategory = category; btree->entryCategory = category;
btree->isDelete = FALSE;
} }
...@@ -374,7 +374,7 @@ restartScanEntry: ...@@ -374,7 +374,7 @@ restartScanEntry:
ginPrepareEntryScan(&btreeEntry, entry->attnum, ginPrepareEntryScan(&btreeEntry, entry->attnum,
entry->queryKey, entry->queryCategory, entry->queryKey, entry->queryCategory,
ginstate); ginstate);
stackEntry = ginFindLeafPage(&btreeEntry, GIN_ROOT_BLKNO, true); stackEntry = ginFindLeafPage(&btreeEntry, true);
page = BufferGetPage(stackEntry->buffer); page = BufferGetPage(stackEntry->buffer);
needUnlock = TRUE; needUnlock = TRUE;
......
...@@ -163,17 +163,20 @@ ginEntryInsert(GinState *ginstate, ...@@ -163,17 +163,20 @@ ginEntryInsert(GinState *ginstate,
GinStatsData *buildStats) GinStatsData *buildStats)
{ {
GinBtreeData btree; GinBtreeData btree;
GinBtreeEntryInsertData insertdata;
GinBtreeStack *stack; GinBtreeStack *stack;
IndexTuple itup; IndexTuple itup;
Page page; Page page;
insertdata.isDelete = FALSE;
/* During index build, count the to-be-inserted entry */ /* During index build, count the to-be-inserted entry */
if (buildStats) if (buildStats)
buildStats->nEntries++; buildStats->nEntries++;
ginPrepareEntryScan(&btree, attnum, key, category, ginstate); ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
stack = ginFindLeafPage(&btree, GIN_ROOT_BLKNO, false); stack = ginFindLeafPage(&btree, false);
page = BufferGetPage(stack->buffer); page = BufferGetPage(stack->buffer);
if (btree.findItem(&btree, stack)) if (btree.findItem(&btree, stack))
...@@ -201,7 +204,7 @@ ginEntryInsert(GinState *ginstate, ...@@ -201,7 +204,7 @@ ginEntryInsert(GinState *ginstate,
itup = addItemPointersToLeafTuple(ginstate, itup, itup = addItemPointersToLeafTuple(ginstate, itup,
items, nitem, buildStats); items, nitem, buildStats);
btree.isDelete = TRUE; insertdata.isDelete = TRUE;
} }
else else
{ {
...@@ -211,8 +214,8 @@ ginEntryInsert(GinState *ginstate, ...@@ -211,8 +214,8 @@ ginEntryInsert(GinState *ginstate,
} }
/* Insert the new or modified leaf tuple */ /* Insert the new or modified leaf tuple */
btree.entry = itup; insertdata.entry = itup;
ginInsertValue(&btree, stack, buildStats); ginInsertValue(&btree, stack, &insertdata, buildStats);
pfree(itup); pfree(itup);
} }
......
...@@ -774,7 +774,7 @@ ginContinueSplit(ginIncompleteSplit *split) ...@@ -774,7 +774,7 @@ ginContinueSplit(ginIncompleteSplit *split)
GinState ginstate; GinState ginstate;
Relation reln; Relation reln;
Buffer buffer; Buffer buffer;
GinBtreeStack stack; GinBtreeStack *stack;
/* /*
* elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno, * elog(NOTICE,"ginContinueSplit root:%u l:%u r:%u", split->rootBlkno,
...@@ -802,22 +802,22 @@ ginContinueSplit(ginIncompleteSplit *split) ...@@ -802,22 +802,22 @@ ginContinueSplit(ginIncompleteSplit *split)
} }
else else
{ {
ginPrepareDataScan(&btree, reln); ginPrepareDataScan(&btree, reln, split->rootBlkno);
} }
stack.blkno = split->leftBlkno; stack = palloc(sizeof(GinBtreeStack));
stack.buffer = buffer; stack->blkno = split->leftBlkno;
stack.off = InvalidOffsetNumber; stack->buffer = buffer;
stack.parent = NULL; stack->off = InvalidOffsetNumber;
stack->parent = NULL;
ginFindParents(&btree, &stack, split->rootBlkno); ginFindParents(&btree, stack);
LockBuffer(stack->parent->buffer, GIN_UNLOCK);
ginFinishSplit(&btree, stack, NULL);
btree.prepareDownlink(&btree, buffer); /* buffer is released by ginFinishSplit */
ginInsertValue(&btree, stack.parent, NULL);
FreeFakeRelcacheEntry(reln); FreeFakeRelcacheEntry(reln);
UnlockReleaseBuffer(buffer);
} }
void void
......
...@@ -485,41 +485,59 @@ typedef struct GinBtreeData ...@@ -485,41 +485,59 @@ typedef struct GinBtreeData
/* insert methods */ /* insert methods */
OffsetNumber (*findChildPtr) (GinBtree, Page, BlockNumber, OffsetNumber); OffsetNumber (*findChildPtr) (GinBtree, Page, BlockNumber, OffsetNumber);
bool (*placeToPage) (GinBtree, Buffer, OffsetNumber, XLogRecData **); bool (*placeToPage) (GinBtree, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
Page (*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, XLogRecData **); Page (*splitPage) (GinBtree, Buffer, Buffer, OffsetNumber, void *, BlockNumber, XLogRecData **);
void (*prepareDownlink) (GinBtree, Buffer); void *(*prepareDownlink) (GinBtree, Buffer);
void (*fillRoot) (GinBtree, Buffer, Buffer, Buffer); void (*fillRoot) (GinBtree, Buffer, Buffer, Buffer);
bool isData; bool isData;
Relation index; Relation index;
BlockNumber rootBlkno;
GinState *ginstate; /* not valid in a data scan */ GinState *ginstate; /* not valid in a data scan */
bool fullScan; bool fullScan;
bool isBuild; bool isBuild;
BlockNumber rightblkno; /* Search key for Entry tree */
/* Entry options */
OffsetNumber entryAttnum; OffsetNumber entryAttnum;
Datum entryKey; Datum entryKey;
GinNullCategory entryCategory; GinNullCategory entryCategory;
IndexTuple entry;
bool isDelete;
/* Data (posting tree) options */ /* Search key for data tree (posting tree) */
ItemPointerData itemptr;
} GinBtreeData;
/* This represents a tuple to be inserted to entry tree. */
typedef struct
{
IndexTuple entry; /* tuple to insert */
bool isDelete; /* delete old tuple at same offset? */
} GinBtreeEntryInsertData;
/*
* This represents an itempointer, or many itempointers, to be inserted to
* a data (posting tree) leaf page
*/
typedef struct
{
ItemPointerData *items; ItemPointerData *items;
uint32 nitem; uint32 nitem;
uint32 curitem; uint32 curitem;
} GinBtreeDataLeafInsertData;
PostingItem pitem; /*
} GinBtreeData; * For internal data (posting tree) pages, the insertion payload is a
* PostingItem
*/
extern GinBtreeStack *ginFindLeafPage(GinBtree btree, BlockNumber rootBlkno, bool searchMode); extern GinBtreeStack *ginFindLeafPage(GinBtree btree, bool searchMode);
extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode); extern Buffer ginStepRight(Buffer buffer, Relation index, int lockmode);
extern void freeGinBtreeStack(GinBtreeStack *stack); extern void freeGinBtreeStack(GinBtreeStack *stack);
extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack, extern void ginInsertValue(GinBtree btree, GinBtreeStack *stack,
void *insertdata, GinStatsData *buildStats);
extern void ginFindParents(GinBtree btree, GinBtreeStack *stack);
extern void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
GinStatsData *buildStats); GinStatsData *buildStats);
extern void ginFindParents(GinBtree btree, GinBtreeStack *stack, BlockNumber rootBlkno);
/* ginentrypage.c */ /* ginentrypage.c */
extern IndexTuple GinFormTuple(GinState *ginstate, extern IndexTuple GinFormTuple(GinState *ginstate,
...@@ -543,7 +561,7 @@ extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno, ...@@ -543,7 +561,7 @@ extern void ginInsertItemPointers(Relation index, BlockNumber rootBlkno,
GinStatsData *buildStats); GinStatsData *buildStats);
extern GinBtreeStack *ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno); extern GinBtreeStack *ginScanBeginPostingTree(Relation index, BlockNumber rootBlkno);
extern void ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf); extern void ginDataFillRoot(GinBtree btree, Buffer root, Buffer lbuf, Buffer rbuf);
extern void ginPrepareDataScan(GinBtree btree, Relation index); extern void ginPrepareDataScan(GinBtree btree, Relation index, BlockNumber rootBlkno);
/* ginscan.c */ /* ginscan.c */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment