Commit 9de3aa65 authored by Heikki Linnakangas's avatar Heikki Linnakangas

Rewrite the GiST insertion logic so that we don't need the post-recovery

cleanup stage to finish incomplete inserts or splits anymore. There were two
reasons for the cleanup step:

1. When a new tuple was inserted to a leaf page, the downlink in the parent
needed to be updated to contain (ie. to be consistent with) the new key.
Updating the parent in turn might require recursively updating the parent of
the parent. We now handle that by updating the parent while traversing down
the tree, so that when we insert the leaf tuple, all the parents are already
consistent with the new key, and the tree is consistent at every step.
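
As an illustration only (this is not code from the patch), the per-level
adjustment boils down to "union the chosen downlink with the new key, and
replace it only if it changed". The integer ranges and the names range_union
and adjust_downlink below are invented for this sketch:

/* Toy model only -- not PostgreSQL code. Keys are 1-D integer ranges. */
#include <stdbool.h>
#include <stdio.h>

typedef struct { int lo, hi; } Range;          /* stand-in for a GiST key */

static Range range_union(Range a, Range b)     /* models the Union method */
{
    Range u;
    u.lo = (a.lo < b.lo) ? a.lo : b.lo;
    u.hi = (a.hi > b.hi) ? a.hi : b.hi;
    return u;
}

static bool range_eq(Range a, Range b)
{
    return a.lo == b.lo && a.hi == b.hi;
}

/*
 * While walking down, make the chosen downlink cover the new key.
 * Returns true if the parent tuple had to be replaced (the replacement
 * itself may split the parent page, which is not modelled here).
 */
static bool adjust_downlink(Range *downlink, Range newkey)
{
    Range u = range_union(*downlink, newkey);

    if (range_eq(u, *downlink))
        return false;          /* already consistent, nothing to do */
    *downlink = u;             /* replace the old downlink with the union */
    return true;
}

int main(void)
{
    Range downlink = {10, 20};
    Range newkey = {25, 25};

    if (adjust_downlink(&downlink, newkey))
        printf("downlink widened to [%d, %d]\n", downlink.lo, downlink.hi);
    return 0;
}

In the patch itself this corresponds to calling gistgetadjusted() on the
chosen downlink while descending, and replacing the tuple only when it
returns a new one.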

2. When a page is split, we need to insert the downlink for the new right
page(s), and update the downlink for the original page to not include keys
that moved to the right page(s). We now handle that by setting a new flag,
F_FOLLOW_RIGHT, on the non-rightmost pages in the split. When that flag is
set, scans always follow the rightlink, regardless of the NSN mechanism used
to detect concurrent page splits. That way the tree is consistent right after
split, even though the downlink is still missing. This is very similar to the
way B-tree splits are handled. When the downlink is inserted in the parent,
the flag is cleared. To keep the insertion algorithm simple, when an
insertion sees an incomplete split, indicated by the F_FOLLOW_RIGHT flag, it
finishes the split before doing anything else.
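
For illustration (again, not the patch's code), the scan-side decision can be
modelled as follows; PageState, must_follow_rightlink and the plain integer
LSN are stand-ins invented for this sketch:

/* Toy model only -- not PostgreSQL code. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t Lsn;                 /* stand-in for an XLog position */

typedef struct
{
    bool follow_right;                /* models the F_FOLLOW_RIGHT flag */
    Lsn  nsn;                         /* models the page's NSN */
} PageState;

/*
 * A scan remembers the LSN it saw on the parent page. It must visit the
 * right sibling if the split is not yet reflected in the parent, either
 * because the downlink is still missing (F_FOLLOW_RIGHT) or because the
 * split happened after the parent was read (NSN newer than parent LSN).
 */
static bool must_follow_rightlink(const PageState *page, Lsn parent_lsn)
{
    if (page->follow_right)
        return true;
    return page->nsn > parent_lsn;
}

int main(void)
{
    PageState p = {false, 200};

    printf("%d\n", must_follow_rightlink(&p, 100));   /* 1: split unseen */
    printf("%d\n", must_follow_rightlink(&p, 300));   /* 0: downlink seen */
    return 0;
}

The real check lives in gistScanPage(), which follows the rightlink when
GistFollowRight(page) is set or when parentlsn < nsn.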

These changes allow removing the whole "invalid tuple" mechanism, but I
retained the scan code to still follow invalid tuples correctly. While we
don't create any such tuples anymore, we want to handle them gracefully in
case you pg_upgrade a GiST index that has them. If we encounter any on an
insert, though, we just throw an error saying that you need to REINDEX.

The issue that got me into doing this is that if you did a checkpoint while
an insert or split was in progress, and the checkpoint finishes quickly so
that there is no WAL record related to the insert between RedoRecPtr and the
checkpoint record, recovery from that checkpoint would not know to finish
the incomplete insert. IOW, we have the same issue we solved with the
rm_safe_restartpoint mechanism during normal operation too. It's highly
unlikely to happen in practice, and this fix is far too large to backpatch,
so we're just going to live with it in previous versions, but this refactoring
fixes it going forward.

With this patch, you don't get the annoying
'index "FOO" needs VACUUM or REINDEX to finish crash recovery' notices
anymore if you crash at an unfortunate moment.
parent 7a1ca897
...@@ -709,33 +709,4 @@ my_distance(PG_FUNCTION_ARGS)
</sect1>
<sect1 id="gist-recovery">
<title>Crash Recovery</title>
<para>
Usually, replay of the WAL log is sufficient to restore the integrity
of a GiST index following a database crash. However, there are some
corner cases in which the index state is not fully rebuilt. The index
will still be functionally correct, but there might be some performance
degradation. When this occurs, the index can be repaired by
<command>VACUUM</>ing its table, or by rebuilding the index using
<command>REINDEX</>. In some cases a plain <command>VACUUM</> is
not sufficient, and either <command>VACUUM FULL</> or <command>REINDEX</>
is needed. The need for one of these procedures is indicated by occurrence
of this log message during crash recovery:
<programlisting>
LOG: index NNN/NNN/NNN needs VACUUM or REINDEX to finish crash recovery
</programlisting>
or this log message during routine index insertions:
<programlisting>
LOG: index "FOO" needs VACUUM or REINDEX to finish crash recovery
</programlisting>
If a plain <command>VACUUM</> finds itself unable to complete recovery
fully, it will return a notice:
<programlisting>
NOTICE: index "FOO" needs VACUUM FULL or REINDEX to finish crash recovery
</programlisting>
</para>
</sect1>
</chapter>
...@@ -108,43 +108,71 @@ Penalty is used for choosing a subtree to insert; method PickSplit is used for
the node splitting algorithm; method Union is used for propagating changes
upward to maintain the tree properties.
NOTICE: We modified original INSERT algorithm for performance reason. In
particularly, it is now a single-pass algorithm.

Function findLeaf is used to identify subtree for insertion. Page, in which
insertion is proceeded, is locked as well as its parent page. Functions
findParent and findPath are used to find parent pages, which could be changed
because of concurrent access. Function pageSplit is recurrent and could split
page by more than 2 pages, which could be necessary if keys have different
lengths or more than one key are inserted (in such situation, user defined
function pickSplit cannot guarantee free space on page).

findLeaf(new-key)
    push(stack, [root, 0]) //page, LSN
    while(true)
        ptr = top of stack
        latch( ptr->page, S-mode )
        ptr->lsn = ptr->page->lsn
        if ( exists ptr->parent AND ptr->parent->lsn < ptr->page->nsn )
            unlatch( ptr->page )
            pop stack
        else if ( ptr->page is not leaf )
            push( stack, [get_best_child(ptr->page, new-key), 0] )
            unlatch( ptr->page )
        else
            unlatch( ptr->page )
            latch( ptr->page, X-mode )
            if ( ptr->page is not leaf )
                //the only root page can become a non-leaf
                unlatch( ptr->page )
            else if ( ptr->parent->lsn < ptr->page->nsn )
                unlatch( ptr->page )
                pop stack
            else
                return stack
            end
        end
    end

To insert a tuple, we first have to find a suitable leaf page to insert to.
The algorithm walks down the tree, starting from the root, along the path
of smallest Penalty. At each step:

1. Has this page been split since we looked at the parent? If so, it's
possible that we should be inserting to the other half instead, so retreat
back to the parent.
2. If this is a leaf node, we've found our target node.
3. Otherwise use Penalty to pick a new target subtree.
4. Check the key representing the target subtree. If it doesn't already cover
the key we're inserting, replace it with the Union of the old downlink key
and the key being inserted. (Actually, we always call Union, and just skip
the replacement if the Unioned key is the same as the existing key)
5. Replacing the key in step 4 might cause the page to be split. In that case,
propagate the change upwards and restart the algorithm from the first parent
that didn't need to be split.
6. Walk down to the target subtree, and goto 1.

This differs from the insertion algorithm in the original paper. In the
original paper, you first walk down the tree until you reach a leaf page, and
then you adjust the downlink in the parent, and propagating the adjustment up,
all the way up to the root in the worst case. But we adjust the downlinks to
cover the new key already when we walk down, so that when we reach the leaf
page, we don't need to update the parents anymore, except to insert the
downlinks if we have to split the page. This makes crash recovery simpler:
after inserting a key to the page, the tree is immediately self-consistent
without having to update the parents. Even if we split a page and crash before
inserting the downlink to the parent, the tree is self-consistent because the
right half of the split is accessible via the rightlink of the left page
(which replaced the original page).

Note that the algorithm can walk up and down the tree before reaching a leaf
page, if internal pages need to split while adjusting the downlinks for the
new key. Eventually, you should reach the bottom, and proceed with the
insertion of the new tuple.

Once we've found the target page to insert to, we check if there's room
for the new tuple. If there is, the tuple is inserted, and we're done.
If it doesn't fit, however, the page needs to be split. Note that it is
possible that a page needs to be split into more than two pages, if keys have
different lengths or more than one key is being inserted at a time (which can
happen when inserting downlinks for a page split that resulted in more than
two pages at the lower level). After splitting a page, the parent page needs
to be updated. The downlink for the new page needs to be inserted, and the
downlink for the old page, which became the left half of the split, needs to
be updated to only cover those tuples that stayed on the left page. Inserting
the downlink in the parent can again lead to a page split, recursing up to the
root page in the worst case.
gistplacetopage is the workhorse function that performs one step of the
insertion. If the tuple fits, it inserts it to the given page, otherwise
it splits the page, and constructs the new downlink tuples for the split
pages. The caller must then call gistplacetopage() on the parent page to
insert the downlink tuples. The parent page that holds the downlink to
the child might have migrated as a result of concurrent splits of the
parent, gistfindCorrectParent() is used to find the parent page.
Splitting the root page works slightly differently. At root split,
gistplacetopage() allocates the new child pages and replaces the old root
page with the new root containing downlinks to the new children, all in one
operation.
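
To tie the walk-down description above together, here is a deliberately
stripped-down, self-contained C model of the descent (no locking, no page
splits, no WAL, and every name in it is invented for this sketch):

/* Toy model only -- not PostgreSQL code. Splits, locking and WAL omitted. */
#include <stdio.h>

#define MAXITEMS 8

typedef struct Node
{
    int  isleaf;
    int  nitems;
    int  lo[MAXITEMS];
    int  hi[MAXITEMS];                 /* keys; on inner nodes, downlinks */
    struct Node *child[MAXITEMS];      /* used on inner nodes only */
} Node;

/* Penalty: how much downlink i must grow to cover value v. */
static int penalty(const Node *n, int i, int v)
{
    int grow = 0;

    if (v < n->lo[i])
        grow += n->lo[i] - v;
    if (v > n->hi[i])
        grow += v - n->hi[i];
    return grow;
}

static void insert(Node *root, int v)
{
    Node *n = root;

    while (!n->isleaf)
    {
        int best = 0;
        int i;

        /* pick the subtree with the smallest penalty */
        for (i = 1; i < n->nitems; i++)
            if (penalty(n, i, v) < penalty(n, best, v))
                best = i;

        /* widen the downlink now, so the tree stays consistent */
        if (v < n->lo[best])
            n->lo[best] = v;
        if (v > n->hi[best])
            n->hi[best] = v;

        n = n->child[best];            /* descend */
    }

    /* leaf reached: parents are already consistent, just add the key */
    n->lo[n->nitems] = v;
    n->hi[n->nitems] = v;
    n->nitems++;
}

int main(void)
{
    Node leaf = {1, 1, {10}, {20}, {0}};
    Node root = {0, 1, {10}, {20}, {&leaf}};

    insert(&root, 42);
    printf("root downlink [%d, %d], leaf has %d keys\n",
           root.lo[0], root.hi[0], leaf.nitems);
    return 0;
}

In the real code this loop is gistdoinsert(), with gistchoose() picking the
subtree, gistgetadjusted() producing the widened downlink, and
gistinserttuples() doing the actual update.
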
findPath is a subroutine of findParent, used when the correct parent page
can't be found by following the rightlinks at the parent level:
findPath( stack item )
    push stack, [root, 0, 0] // page, LSN, parent
...@@ -165,9 +193,13 @@ findPath( stack item )
        pop stack
    end
gistFindCorrectParent is used to re-find the parent of a page during
insertion. It might have migrated to the right since we traversed down the
tree because of page splits.
findParent( stack item )
    parent = item->parent
    latch( parent->page, X-mode )
    if ( parent->page->lsn != parent->lsn )
        while(true)
            search parent tuple on parent->page, if found the return
...@@ -181,9 +213,13 @@ findParent( stack item )
        end
        newstack = findPath( item->parent )
        replace part of stack to new one
        latch( parent->page, X-mode )
        return findParent( item )
    end
pageSplit function decides how to distribute keys to the new pages after
page split:
pageSplit(page, allkeys)
    (lkeys, rkeys) = pickSplit( allkeys )
    if ( page is root )
...@@ -204,39 +240,44 @@ pageSplit(page, allkeys)
    return newkeys
placetopage(page, keysarray)
    if ( no space left on page )
        keysarray = pageSplit(page, [ extract_keys(page), keysarray])
        last page in chain gets old NSN,
        original and others - new NSN equals to LSN
        if ( page is root )
            make new root with keysarray
        end
    else
        put keysarray on page
        if ( length of keysarray > 1 )
            keysarray = [ union(keysarray) ]
        end
    end
insert(new-key)
    stack = findLeaf(new-key)
    keysarray = [new-key]
    ptr = top of stack
    while(true)
        findParent( ptr ) //findParent latches parent page
        keysarray = placetopage(ptr->page, keysarray)
        unlatch( ptr->page )
        pop stack;
        ptr = top of stack
        if (length of keysarray == 1)
            newboundingkey = union(oldboundingkey, keysarray)
            if (newboundingkey == oldboundingkey)
                unlatch ptr->page
                break loop
            end
        end
    end

Concurrency control
-------------------
As a rule of thumb, if you need to hold a lock on multiple pages at the
same time, the locks should be acquired in the following order: child page
before parent, and left-to-right at the same level. Always acquiring the
locks in the same order avoids deadlocks.

The search algorithm only looks at and locks one page at a time. Consequently
there's a race condition between a search and a page split. A page split
happens in two phases: 1. The page is split 2. The downlink is inserted to the
parent. If a search looks at the parent page between those steps, before the
downlink is inserted, it will still find the new right half by following the
rightlink on the left half. But it must not follow the rightlink if it saw the
downlink in the parent, or the page will be visited twice!

A split initially marks the left page with the F_FOLLOW_RIGHT flag. If a scan
sees that flag set, it knows that the right page is missing the downlink, and
should be visited too. When split inserts the downlink to the parent, it
clears the F_FOLLOW_RIGHT flag in the child, and sets the NSN field in the
child page header to match the LSN of the insertion on the parent. If the
F_FOLLOW_RIGHT flag is not set, a scan compares the NSN on the child and the
LSN it saw in the parent. If NSN < LSN, the scan looked at the parent page
before the downlink was inserted, so it should follow the rightlink. Otherwise
the scan saw the downlink in the parent page, and will/did follow that as
usual.
A scan can't normally see a page with the F_FOLLOW_RIGHT flag set, because
a page split keeps the child pages locked until the downlink has been inserted
to the parent and the flag cleared again. But if a crash happens in the middle
of a page split, before the downlinks are inserted into the parent, that will
leave a page with F_FOLLOW_RIGHT in the tree. Scans handle that just fine,
but we'll eventually want to fix that for performance reasons. And more
importantly, dealing with pages with missing downlink pointers in the parent
would complicate the insertion algorithm. So when an insertion sees a page
with F_FOLLOW_RIGHT set, it immediately tries to bring the split that
crashed in the middle to completion by adding the downlink in the parent.
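
The writer side of the F_FOLLOW_RIGHT/NSN bookkeeping described in this
section can be summarised with a similar toy model (not the real
gistplacetopage(), which additionally has to apply the child-page changes
after writing the parent's WAL record):

/* Toy model only -- not PostgreSQL code. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t Lsn;

typedef struct
{
    bool follow_right;                /* models F_FOLLOW_RIGHT */
    Lsn  nsn;                         /* models the NSN field */
} LeftPage;

/* Phase 1: right after the split, before the parent knows about it. */
static void split_page(LeftPage *left)
{
    left->follow_right = true;        /* scans must visit the right half */
}

/* Phase 2: the downlink was inserted into the parent at 'parent_lsn'. */
static void downlink_inserted(LeftPage *left, Lsn parent_lsn)
{
    left->nsn = parent_lsn;           /* older parent LSNs still follow right */
    left->follow_right = false;       /* newer ones rely on the downlink */
}

int main(void)
{
    LeftPage left = {false, 0};

    split_page(&left);
    downlink_inserted(&left, 1234);
    printf("nsn=%llu follow_right=%d\n",
           (unsigned long long) left.nsn, (int) left.follow_right);
    return 0;
}
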
Authors:
    Teodor Sigaev <teodor@sigaev.ru>
......
...@@ -31,6 +31,12 @@ typedef struct ...@@ -31,6 +31,12 @@ typedef struct
MemoryContext tmpCtx; MemoryContext tmpCtx;
} GISTBuildState; } GISTBuildState;
/* A List of these is used represent a split-in-progress. */
typedef struct
{
Buffer buf; /* the split page "half" */
IndexTuple downlink; /* downlink for this half. */
} GISTPageSplitInfo;
/* non-export function prototypes */ /* non-export function prototypes */
static void gistbuildCallback(Relation index, static void gistbuildCallback(Relation index,
...@@ -43,8 +49,13 @@ static void gistdoinsert(Relation r, ...@@ -43,8 +49,13 @@ static void gistdoinsert(Relation r,
IndexTuple itup, IndexTuple itup,
Size freespace, Size freespace,
GISTSTATE *GISTstate); GISTSTATE *GISTstate);
static void gistfindleaf(GISTInsertState *state, static void gistfixsplit(GISTInsertState *state, GISTSTATE *giststate);
GISTSTATE *giststate); static bool gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
GISTSTATE *giststate,
IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
Buffer leftchild);
static void gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
GISTSTATE *giststate, List *splitinfo);
#define ROTATEDIST(d) do { \ #define ROTATEDIST(d) do { \
...@@ -251,41 +262,52 @@ gistinsert(PG_FUNCTION_ARGS) ...@@ -251,41 +262,52 @@ gistinsert(PG_FUNCTION_ARGS)
/* /*
* Workhouse routine for doing insertion into a GiST index. Note that * Place tuples from 'itup' to 'buffer'. If 'oldoffnum' is valid, the tuple
* this routine assumes it is invoked in a short-lived memory context, * at that offset is atomically removed along with inserting the new tuples.
* so it does not bother releasing palloc'd allocations. * This is used to replace a tuple with a new one.
*
* If 'leftchildbuf' is valid, we're inserting the downlink for the page
* to the right of 'leftchildbuf', or updating the downlink for 'leftchildbuf'.
* F_FOLLOW_RIGHT flag on 'leftchildbuf' is cleared and NSN is set.
*
* If there is not enough room on the page, it is split. All the split
* pages are kept pinned and locked and returned in *splitinfo, the caller
* is responsible for inserting the downlinks for them. However, if
* 'buffer' is the root page and it needs to be split, gistplacetopage()
* performs the split as one atomic operation, and *splitinfo is set to NIL.
* In that case, we continue to hold the root page locked, and the child
* pages are released; note that new tuple(s) are *not* on the root page
* but in one of the new child pages.
*/ */
static void static bool
gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate) gistplacetopage(GISTInsertState *state, GISTSTATE *giststate,
Buffer buffer,
IndexTuple *itup, int ntup, OffsetNumber oldoffnum,
Buffer leftchildbuf,
List **splitinfo)
{ {
GISTInsertState state; Page page = BufferGetPage(buffer);
bool is_leaf = (GistPageIsLeaf(page)) ? true : false;
memset(&state, 0, sizeof(GISTInsertState)); XLogRecPtr recptr;
int i;
state.itup = (IndexTuple *) palloc(sizeof(IndexTuple)); bool is_split;
state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
memcpy(state.itup[0], itup, IndexTupleSize(itup));
state.ituplen = 1;
state.freespace = freespace;
state.r = r;
state.key = itup->t_tid;
state.needInsertComplete = true;
state.stack = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
state.stack->blkno = GIST_ROOT_BLKNO;
gistfindleaf(&state, giststate); /*
gistmakedeal(&state, giststate); * Refuse to modify a page that's incompletely split. This should
} * not happen because we finish any incomplete splits while we walk
* down the tree. However, it's remotely possible that another
* concurrent inserter splits a parent page, and errors out before
* completing the split. We will just throw an error in that case,
* and leave any split we had in progress unfinished too. The next
* insert that comes along will clean up the mess.
*/
if (GistFollowRight(page))
elog(ERROR, "concurrent GiST page split was incomplete");
static bool *splitinfo = NIL;
gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
{
bool is_splitted = false;
bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
/* /*
* if (!is_leaf) remove old key: This node's key has been modified, either * if isupdate, remove old key: This node's key has been modified, either
* because a child split occurred or because we needed to adjust our key * because a child split occurred or because we needed to adjust our key
* for an insert in a child node. Therefore, remove the old version of * for an insert in a child node. Therefore, remove the old version of
* this node's key. * this node's key.
...@@ -293,77 +315,134 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) ...@@ -293,77 +315,134 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
* for WAL replay, in the non-split case we handle this by setting up a * for WAL replay, in the non-split case we handle this by setting up a
* one-element todelete array; in the split case, it's handled implicitly * one-element todelete array; in the split case, it's handled implicitly
* because the tuple vector passed to gistSplit won't include this tuple. * because the tuple vector passed to gistSplit won't include this tuple.
*
* XXX: If we want to change fillfactors between node and leaf, fillfactor
* = (is_leaf ? state->leaf_fillfactor : state->node_fillfactor)
*/ */
if (gistnospace(state->stack->page, state->itup, state->ituplen, is_split = gistnospace(page, itup, ntup, oldoffnum, state->freespace);
is_leaf ? InvalidOffsetNumber : state->stack->childoffnum, if (is_split)
state->freespace))
{ {
/* no space for insertion */ /* no space for insertion */
IndexTuple *itvec; IndexTuple *itvec;
int tlen; int tlen;
SplitedPageLayout *dist = NULL, SplitedPageLayout *dist = NULL,
*ptr; *ptr;
BlockNumber rrlink = InvalidBlockNumber; BlockNumber oldrlink = InvalidBlockNumber;
GistNSN oldnsn; GistNSN oldnsn = { 0, 0 };
SplitedPageLayout rootpg;
BlockNumber blkno = BufferGetBlockNumber(buffer);
bool is_rootsplit;
is_splitted = true; is_rootsplit = (blkno == GIST_ROOT_BLKNO);
/* /*
* Form index tuples vector to split: remove old tuple if t's needed * Form index tuples vector to split. If we're replacing an old tuple,
* and add new tuples to vector * remove the old version from the vector.
*/ */
itvec = gistextractpage(state->stack->page, &tlen); itvec = gistextractpage(page, &tlen);
if (!is_leaf) if (OffsetNumberIsValid(oldoffnum))
{ {
/* on inner page we should remove old tuple */ /* on inner page we should remove old tuple */
int pos = state->stack->childoffnum - FirstOffsetNumber; int pos = oldoffnum - FirstOffsetNumber;
tlen--; tlen--;
if (pos != tlen) if (pos != tlen)
memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos)); memmove(itvec + pos, itvec + pos + 1, sizeof(IndexTuple) * (tlen - pos));
} }
itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen); itvec = gistjoinvector(itvec, &tlen, itup, ntup);
dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate); dist = gistSplit(state->r, page, itvec, tlen, giststate);
state->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * tlen);
state->ituplen = 0;
if (state->stack->blkno != GIST_ROOT_BLKNO)
{
/* /*
* if non-root split then we should not allocate new buffer, but * Set up pages to work with. Allocate new buffers for all but the
* we must create temporary page to operate * leftmost page. The original page becomes the new leftmost page,
* and is just replaced with the new contents.
*
* For a root-split, allocate new buffers for all child pages, the
* original page is overwritten with new root page containing
* downlinks to the new child pages.
*/ */
dist->buffer = state->stack->buffer; ptr = dist;
dist->page = PageGetTempPageCopySpecial(BufferGetPage(dist->buffer)); if (!is_rootsplit)
{
/* save old rightlink and NSN */
oldrlink = GistPageGetOpaque(page)->rightlink;
oldnsn = GistPageGetOpaque(page)->nsn;
dist->buffer = buffer;
dist->block.blkno = BufferGetBlockNumber(buffer);
dist->page = PageGetTempPageCopySpecial(BufferGetPage(buffer));
/* clean all flags except F_LEAF */ /* clean all flags except F_LEAF */
GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0; GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
ptr = ptr->next;
}
for (; ptr; ptr = ptr->next)
{
/* Allocate new page */
ptr->buffer = gistNewBuffer(state->r);
GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0);
ptr->page = BufferGetPage(ptr->buffer);
ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
} }
/* make new pages and fills them */ /*
* Now that we know whick blocks the new pages go to, set up downlink
* tuples to point to them.
*/
for (ptr = dist; ptr; ptr = ptr->next) for (ptr = dist; ptr; ptr = ptr->next)
{ {
ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
GistTupleSetValid(ptr->itup);
}
/*
* If this is a root split, we construct the new root page with the
* downlinks here directly, instead of requiring the caller to insert
* them. Add the new root page to the list along with the child pages.
*/
if (is_rootsplit)
{
IndexTuple *downlinks;
int ndownlinks = 0;
int i; int i;
char *data;
/* get new page */ rootpg.buffer = buffer;
if (ptr->buffer == InvalidBuffer) rootpg.page = PageGetTempPageCopySpecial(BufferGetPage(rootpg.buffer));
GistPageGetOpaque(rootpg.page)->flags = 0;
/* Prepare a vector of all the downlinks */
for (ptr = dist; ptr; ptr = ptr->next)
ndownlinks++;
downlinks = palloc(sizeof(IndexTuple) * ndownlinks);
for (i = 0, ptr = dist; ptr; ptr = ptr->next)
downlinks[i++] = ptr->itup;
rootpg.block.blkno = GIST_ROOT_BLKNO;
rootpg.block.num = ndownlinks;
rootpg.list = gistfillitupvec(downlinks, ndownlinks,
&(rootpg.lenlist));
rootpg.itup = NULL;
rootpg.next = dist;
dist = &rootpg;
}
else
{ {
ptr->buffer = gistNewBuffer(state->r); /* Prepare split-info to be returned to caller */
GISTInitBuffer(ptr->buffer, (is_leaf) ? F_LEAF : 0); for (ptr = dist; ptr; ptr = ptr->next)
ptr->page = BufferGetPage(ptr->buffer); {
GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
si->buf = ptr->buffer;
si->downlink = ptr->itup;
*splitinfo = lappend(*splitinfo, si);
}
} }
ptr->block.blkno = BufferGetBlockNumber(ptr->buffer);
/* /*
* fill page, we can do it because all these pages are new (ie not * Fill all pages. All the pages are new, ie. freshly allocated empty
* linked in tree or masked by temp page * pages, or a temporary copy of the old page.
*/ */
data = (char *) (ptr->list); for (ptr = dist; ptr; ptr = ptr->next)
{
char *data = (char *) (ptr->list);
for (i = 0; i < ptr->block.num; i++) for (i = 0; i < ptr->block.num; i++)
{ {
if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber) if (PageAddItem(ptr->page, (Item) data, IndexTupleSize((IndexTuple) data), i + FirstOffsetNumber, false, false) == InvalidOffsetNumber)
...@@ -371,245 +450,346 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) ...@@ -371,245 +450,346 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
data += IndexTupleSize((IndexTuple) data); data += IndexTupleSize((IndexTuple) data);
} }
/* set up ItemPointer and remember it for parent */ /* Set up rightlinks */
ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno); if (ptr->next && ptr->block.blkno != GIST_ROOT_BLKNO)
state->itup[state->ituplen] = ptr->itup; GistPageGetOpaque(ptr->page)->rightlink =
state->ituplen++; ptr->next->block.blkno;
} else
GistPageGetOpaque(ptr->page)->rightlink = oldrlink;
/* saves old rightlink */ if (ptr->next && !is_rootsplit)
if (state->stack->blkno != GIST_ROOT_BLKNO) GistMarkFollowRight(ptr->page);
rrlink = GistPageGetOpaque(dist->page)->rightlink; else
GistClearFollowRight(ptr->page);
/*
* Copy the NSN of the original page to all pages. The
* F_FOLLOW_RIGHT flags ensure that scans will follow the
* rightlinks until the downlinks are inserted.
*/
GistPageGetOpaque(ptr->page)->nsn = oldnsn;
}
START_CRIT_SECTION(); START_CRIT_SECTION();
/* /*
* must mark buffers dirty before XLogInsert, even though we'll still * Must mark buffers dirty before XLogInsert, even though we'll still
* be changing their opaque fields below. set up right links. * be changing their opaque fields below.
*/ */
for (ptr = dist; ptr; ptr = ptr->next) for (ptr = dist; ptr; ptr = ptr->next)
{
MarkBufferDirty(ptr->buffer); MarkBufferDirty(ptr->buffer);
GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ? if (BufferIsValid(leftchildbuf))
ptr->next->block.blkno : rrlink; MarkBufferDirty(leftchildbuf);
}
/* restore splitted non-root page */ /*
if (state->stack->blkno != GIST_ROOT_BLKNO) * The first page in the chain was a temporary working copy meant
{ * to replace the old page. Copy it over the old page.
*/
PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer)); PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
dist->page = BufferGetPage(dist->buffer); dist->page = BufferGetPage(dist->buffer);
}
/* Write the WAL record */
if (RelationNeedsWAL(state->r)) if (RelationNeedsWAL(state->r))
{ recptr = gistXLogSplit(state->r->rd_node, blkno, is_leaf,
XLogRecPtr recptr; dist, oldrlink, oldnsn, leftchildbuf);
XLogRecData *rdata; else
recptr = GetXLogRecPtrForTemp();
rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
is_leaf, &(state->key), dist);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
for (ptr = dist; ptr; ptr = ptr->next) for (ptr = dist; ptr; ptr = ptr->next)
{ {
PageSetLSN(ptr->page, recptr); PageSetLSN(ptr->page, recptr);
PageSetTLI(ptr->page, ThisTimeLineID); PageSetTLI(ptr->page, ThisTimeLineID);
} }
}
else
{
for (ptr = dist; ptr; ptr = ptr->next)
{
PageSetLSN(ptr->page, GetXLogRecPtrForTemp());
}
}
/* set up NSN */
oldnsn = GistPageGetOpaque(dist->page)->nsn;
if (state->stack->blkno == GIST_ROOT_BLKNO)
/* if root split we should put initial value */
oldnsn = PageGetLSN(dist->page);
for (ptr = dist; ptr; ptr = ptr->next)
{
/* only for last set oldnsn */
GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
PageGetLSN(ptr->page) : oldnsn;
}
/* /*
* release buffers, if it was a root split then release all buffers * Return the new child buffers to the caller.
* because we create all buffers *
* If this was a root split, we've already inserted the downlink
* pointers, in the form of a new root page. Therefore we can
* release all the new buffers, and keep just the root page locked.
*/ */
ptr = (state->stack->blkno == GIST_ROOT_BLKNO) ? dist : dist->next; if (is_rootsplit)
for (; ptr; ptr = ptr->next)
UnlockReleaseBuffer(ptr->buffer);
if (state->stack->blkno == GIST_ROOT_BLKNO)
{ {
gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key)); for (ptr = dist->next; ptr; ptr = ptr->next)
state->needInsertComplete = false; UnlockReleaseBuffer(ptr->buffer);
} }
END_CRIT_SECTION();
} }
else else
{ {
/* enough space */ /*
* Enough space. We also get here if ntuples==0.
*/
START_CRIT_SECTION(); START_CRIT_SECTION();
if (!is_leaf) if (OffsetNumberIsValid(oldoffnum))
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum); PageIndexTupleDelete(page, oldoffnum);
gistfillbuffer(state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber); gistfillbuffer(page, itup, ntup, InvalidOffsetNumber);
MarkBufferDirty(buffer);
MarkBufferDirty(state->stack->buffer); if (BufferIsValid(leftchildbuf))
MarkBufferDirty(leftchildbuf);
if (RelationNeedsWAL(state->r)) if (RelationNeedsWAL(state->r))
{ {
OffsetNumber noffs = 0, OffsetNumber ndeloffs = 0,
offs[1]; deloffs[1];
XLogRecPtr recptr;
XLogRecData *rdata;
if (!is_leaf) if (OffsetNumberIsValid(oldoffnum))
{ {
/* only on inner page we should delete previous version */ deloffs[0] = oldoffnum;
offs[0] = state->stack->childoffnum; ndeloffs = 1;
noffs = 1;
} }
rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer, recptr = gistXLogUpdate(state->r->rd_node, buffer,
offs, noffs, deloffs, ndeloffs, itup, ntup,
state->itup, state->ituplen, leftchildbuf);
&(state->key));
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata); PageSetLSN(page, recptr);
PageSetLSN(state->stack->page, recptr); PageSetTLI(page, ThisTimeLineID);
PageSetTLI(state->stack->page, ThisTimeLineID);
} }
else else
PageSetLSN(state->stack->page, GetXLogRecPtrForTemp()); {
recptr = GetXLogRecPtrForTemp();
if (state->stack->blkno == GIST_ROOT_BLKNO) PageSetLSN(page, recptr);
state->needInsertComplete = false; }
END_CRIT_SECTION();
if (state->ituplen > 1) *splitinfo = NIL;
{ /* previous is_splitted==true */ }
/* /*
* child was splited, so we must form union for insertion in * If we inserted the downlink for a child page, set NSN and clear
* parent * F_FOLLOW_RIGHT flag on the left child, so that concurrent scans know
* to follow the rightlink if and only if they looked at the parent page
* before we inserted the downlink.
*
* Note that we do this *after* writing the WAL record. That means that
* the possible full page image in the WAL record does not include
* these changes, and they must be replayed even if the page is restored
* from the full page image. There's a chicken-and-egg problem: if we
* updated the child pages first, we wouldn't know the recptr of the WAL
* record we're about to write.
*/ */
IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate); if (BufferIsValid(leftchildbuf))
ItemPointerSetBlockNumber(&(newtup->t_tid), state->stack->blkno);
state->itup[0] = newtup;
state->ituplen = 1;
}
else if (is_leaf)
{ {
/* Page leftpg = BufferGetPage(leftchildbuf);
* itup[0] store key to adjust parent, we set it to valid to
* correct check by GistTupleIsInvalid macro in gistgetadjusted() GistPageGetOpaque(leftpg)->nsn = recptr;
*/ GistClearFollowRight(leftpg);
ItemPointerSetBlockNumber(&(state->itup[0]->t_tid), state->stack->blkno);
GistTupleSetValid(state->itup[0]); PageSetLSN(leftpg, recptr);
} PageSetTLI(leftpg, ThisTimeLineID);
} }
return is_splitted;
END_CRIT_SECTION();
return is_split;
} }
/* /*
* returns stack of pages, all pages in stack are pinned, and * Workhouse routine for doing insertion into a GiST index. Note that
* leaf is X-locked * this routine assumes it is invoked in a short-lived memory context,
* so it does not bother releasing palloc'd allocations.
*/ */
static void static void
gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) gistdoinsert(Relation r, IndexTuple itup, Size freespace, GISTSTATE *giststate)
{ {
ItemId iid; ItemId iid;
IndexTuple idxtuple; IndexTuple idxtuple;
GISTPageOpaque opaque; GISTInsertStack firststack;
GISTInsertStack *stack;
GISTInsertState state;
bool xlocked = false;
memset(&state, 0, sizeof(GISTInsertState));
state.freespace = freespace;
state.r = r;
/* Start from the root */
firststack.blkno = GIST_ROOT_BLKNO;
firststack.lsn.xrecoff = 0;
firststack.parent = NULL;
state.stack = stack = &firststack;
/* /*
* walk down, We don't lock page for a long time, but so we should be * Walk down along the path of smallest penalty, updating the parent
* ready to recheck path in a bad case... We remember, that page->lsn * pointers with the key we're inserting as we go. If we crash in the
* should never be invalid. * middle, the tree is consistent, although the possible parent updates
* were a waste.
*/ */
for (;;) for (;;)
{ {
if (XLogRecPtrIsInvalid(state->stack->lsn)) if (XLogRecPtrIsInvalid(stack->lsn))
state->stack->buffer = ReadBuffer(state->r, state->stack->blkno); stack->buffer = ReadBuffer(state.r, stack->blkno);
LockBuffer(state->stack->buffer, GIST_SHARE);
gistcheckpage(state->r, state->stack->buffer); /*
* Be optimistic and grab shared lock first. Swap it for an
* exclusive lock later if we need to update the page.
*/
if (!xlocked)
{
LockBuffer(stack->buffer, GIST_SHARE);
gistcheckpage(state.r, stack->buffer);
}
state->stack->page = (Page) BufferGetPage(state->stack->buffer); stack->page = (Page) BufferGetPage(stack->buffer);
opaque = GistPageGetOpaque(state->stack->page); stack->lsn = PageGetLSN(stack->page);
Assert(!RelationNeedsWAL(state.r) || !XLogRecPtrIsInvalid(stack->lsn));
/*
* If this page was split but the downlink was never inserted to
* the parent because the inserting backend crashed before doing
* that, fix that now.
*/
if (GistFollowRight(stack->page))
{
if (!xlocked)
{
LockBuffer(stack->buffer, GIST_UNLOCK);
LockBuffer(stack->buffer, GIST_EXCLUSIVE);
xlocked = true;
/* someone might've completed the split when we unlocked */
if (!GistFollowRight(stack->page))
continue;
}
gistfixsplit(&state, giststate);
state->stack->lsn = PageGetLSN(state->stack->page); UnlockReleaseBuffer(stack->buffer);
Assert(!RelationNeedsWAL(state->r) || !XLogRecPtrIsInvalid(state->stack->lsn)); xlocked = false;
state.stack = stack = stack->parent;
continue;
}
if (state->stack->blkno != GIST_ROOT_BLKNO && if (stack->blkno != GIST_ROOT_BLKNO &&
XLByteLT(state->stack->parent->lsn, opaque->nsn)) XLByteLT(stack->parent->lsn,
GistPageGetOpaque(stack->page)->nsn))
{ {
/* /*
* caused split non-root page is detected, go up to parent to * Concurrent split detected. There's no guarantee that the
* choose best child * downlink for this page is consistent with the tuple we're
* inserting anymore, so go back to parent and rechoose the
* best child.
*/ */
UnlockReleaseBuffer(state->stack->buffer); UnlockReleaseBuffer(stack->buffer);
state->stack = state->stack->parent; xlocked = false;
state.stack = stack = stack->parent;
continue; continue;
} }
if (!GistPageIsLeaf(state->stack->page)) if (!GistPageIsLeaf(stack->page))
{ {
/* /*
* This is an internal page, so continue to walk down the tree. We * This is an internal page so continue to walk down the tree.
* find the child node that has the minimum insertion penalty and * Find the child node that has the minimum insertion penalty.
* recursively invoke ourselves to modify that node. Once the
* recursive call returns, we may need to adjust the parent node
* for two reasons: the child node split, or the key in this node
* needs to be adjusted for the newly inserted key below us.
*/ */
GISTInsertStack *item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack)); BlockNumber childblkno;
IndexTuple newtup;
GISTInsertStack *item;
state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate); stack->childoffnum = gistchoose(state.r, stack->page, itup, giststate);
iid = PageGetItemId(stack->page, stack->childoffnum);
idxtuple = (IndexTuple) PageGetItem(stack->page, iid);
childblkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid));
/*
* Check that it's not a leftover invalid tuple from pre-9.1
*/
if (GistTupleIsInvalid(idxtuple))
ereport(ERROR,
(errmsg("index \"%s\" contains an inner tuple marked as invalid",
RelationGetRelationName(r)),
errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
errhint("Please REINDEX it.")));
iid = PageGetItemId(state->stack->page, state->stack->childoffnum); /*
idxtuple = (IndexTuple) PageGetItem(state->stack->page, iid); * Check that the key representing the target child node is
item->blkno = ItemPointerGetBlockNumber(&(idxtuple->t_tid)); * consistent with the key we're inserting. Update it if it's not.
LockBuffer(state->stack->buffer, GIST_UNLOCK); */
newtup = gistgetadjusted(state.r, idxtuple, itup, giststate);
if (newtup)
{
/*
* Swap shared lock for an exclusive one. Beware, the page
* may change while we unlock/lock the page...
*/
if (!xlocked)
{
LockBuffer(stack->buffer, GIST_UNLOCK);
LockBuffer(stack->buffer, GIST_EXCLUSIVE);
xlocked = true;
stack->page = (Page) BufferGetPage(stack->buffer);
item->parent = state->stack; if (!XLByteEQ(PageGetLSN(stack->page), stack->lsn))
item->child = NULL; {
if (state->stack) /* the page was changed while we unlocked it, retry */
state->stack->child = item; continue;
state->stack = item; }
}
/*
* Update the tuple.
*
* gistinserthere() might have to split the page to make the
* updated tuple fit. It will adjust the stack so that after
* the call, we'll be holding a lock on the page containing
* the tuple, which might have moved right.
*
* Except if this causes a root split, gistinserthere()
* returns 'true'. In that case, stack only holds the new
* root, and the child page was released. Have to start
* all over.
*/
if (gistinserttuples(&state, stack, giststate, &newtup, 1,
stack->childoffnum, InvalidBuffer))
{
UnlockReleaseBuffer(stack->buffer);
xlocked = false;
state.stack = stack = stack->parent;
continue;
}
}
LockBuffer(stack->buffer, GIST_UNLOCK);
xlocked = false;
/* descend to the chosen child */
item = (GISTInsertStack *) palloc0(sizeof(GISTInsertStack));
item->blkno = childblkno;
item->parent = stack;
state.stack = stack = item;
} }
else else
{ {
/* be carefull, during unlock/lock page may be changed... */ /*
LockBuffer(state->stack->buffer, GIST_UNLOCK); * Leaf page. Insert the new key. We've already updated all the
LockBuffer(state->stack->buffer, GIST_EXCLUSIVE); * parents on the way down, but we might have to split the page
state->stack->page = (Page) BufferGetPage(state->stack->buffer); * if it doesn't fit. gistinserthere() will take care of that.
opaque = GistPageGetOpaque(state->stack->page); */
/*
* Swap shared lock for an exclusive one. Be careful, the page
* may change while we unlock/lock the page...
*/
if (!xlocked)
{
LockBuffer(stack->buffer, GIST_UNLOCK);
LockBuffer(stack->buffer, GIST_EXCLUSIVE);
xlocked = true;
stack->page = (Page) BufferGetPage(stack->buffer);
stack->lsn = PageGetLSN(stack->page);
if (state->stack->blkno == GIST_ROOT_BLKNO) if (stack->blkno == GIST_ROOT_BLKNO)
{ {
/* /*
* the only page can become inner instead of leaf is a root * the only page that can become inner instead of leaf
* page, so for root we should recheck it * is the root page, so for root we should recheck it
*/ */
if (!GistPageIsLeaf(state->stack->page)) if (!GistPageIsLeaf(stack->page))
{ {
/* /*
* very rarely situation: during unlock/lock index with * very rare situation: during unlock/lock index with
* number of pages = 1 was increased * number of pages = 1 was increased
*/ */
LockBuffer(state->stack->buffer, GIST_UNLOCK); LockBuffer(stack->buffer, GIST_UNLOCK);
xlocked = false;
continue; continue;
} }
...@@ -617,30 +797,34 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) ...@@ -617,30 +797,34 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
* we don't need to check root split, because checking * we don't need to check root split, because checking
* leaf/inner is enough to recognize split for root * leaf/inner is enough to recognize split for root
*/ */
} }
else if (XLByteLT(state->stack->parent->lsn, opaque->nsn)) else if (GistFollowRight(stack->page) ||
XLByteLT(stack->parent->lsn,
GistPageGetOpaque(stack->page)->nsn))
{ {
/* /*
* detecting split during unlock/lock, so we should find * The page was split while we momentarily unlocked the
* better child on parent * page. Go back to parent.
*/ */
UnlockReleaseBuffer(stack->buffer);
/* forget buffer */ xlocked = false;
UnlockReleaseBuffer(state->stack->buffer); state.stack = stack = stack->parent;
state->stack = state->stack->parent;
continue; continue;
} }
}
state->stack->lsn = PageGetLSN(state->stack->page); /* now state.stack->(page, buffer and blkno) points to leaf page */
/* ok we found a leaf page and it X-locked */ gistinserttuples(&state, stack, giststate, &itup, 1,
InvalidOffsetNumber, InvalidBuffer);
LockBuffer(stack->buffer, GIST_UNLOCK);
/* Release any pins we might still hold before exiting */
for (; stack; stack = stack->parent)
ReleaseBuffer(stack->buffer);
break; break;
} }
} }
/* now state->stack->(page, buffer and blkno) points to leaf page */
} }
/* /*
...@@ -648,7 +832,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate) ...@@ -648,7 +832,7 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
* *
* returns from the beginning of closest parent; * returns from the beginning of closest parent;
* *
* To prevent deadlocks, this should lock only one page simultaneously. * To prevent deadlocks, this should lock only one page at a time.
*/ */
GISTInsertStack * GISTInsertStack *
gistFindPath(Relation r, BlockNumber child) gistFindPath(Relation r, BlockNumber child)
...@@ -683,6 +867,13 @@ gistFindPath(Relation r, BlockNumber child) ...@@ -683,6 +867,13 @@ gistFindPath(Relation r, BlockNumber child)
top->lsn = PageGetLSN(page); top->lsn = PageGetLSN(page);
/*
* If F_FOLLOW_RIGHT is set, the page to the right doesn't have a
* downlink. This should not normally happen..
*/
if (GistFollowRight(page))
elog(ERROR, "concurrent GiST page split was incomplete");
if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) && if (top->parent && XLByteLT(top->parent->lsn, GistPageGetOpaque(page)->nsn) &&
GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ ) GistPageGetOpaque(page)->rightlink != InvalidBlockNumber /* sanity check */ )
{ {
...@@ -711,8 +902,6 @@ gistFindPath(Relation r, BlockNumber child) ...@@ -711,8 +902,6 @@ gistFindPath(Relation r, BlockNumber child)
ptr = top; ptr = top;
while (ptr->parent) while (ptr->parent)
{ {
/* set child link */
ptr->parent->child = ptr;
/* move childoffnum.. */ /* move childoffnum.. */
if (ptr == top) if (ptr == top)
{ {
...@@ -754,17 +943,16 @@ gistFindPath(Relation r, BlockNumber child) ...@@ -754,17 +943,16 @@ gistFindPath(Relation r, BlockNumber child)
return NULL; return NULL;
} }
/* /*
* Returns X-locked parent of stack page * Updates the stack so that child->parent is the correct parent of the
* child. child->parent must be exclusively locked on entry, and will
* remain so at exit, but it might not be the same page anymore.
*/ */
static void static void
gistFindCorrectParent(Relation r, GISTInsertStack *child) gistFindCorrectParent(Relation r, GISTInsertStack *child)
{ {
GISTInsertStack *parent = child->parent; GISTInsertStack *parent = child->parent;
LockBuffer(parent->buffer, GIST_EXCLUSIVE);
gistcheckpage(r, parent->buffer); gistcheckpage(r, parent->buffer);
parent->page = (Page) BufferGetPage(parent->buffer); parent->page = (Page) BufferGetPage(parent->buffer);
...@@ -836,83 +1024,231 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child) ...@@ -836,83 +1024,231 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
/* install new chain of parents to stack */ /* install new chain of parents to stack */
child->parent = parent; child->parent = parent;
parent->child = child;
/* make recursive call to normal processing */ /* make recursive call to normal processing */
LockBuffer(child->parent->buffer, GIST_EXCLUSIVE);
gistFindCorrectParent(r, child); gistFindCorrectParent(r, child);
} }
return; return;
} }
void /*
gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) * Form a downlink pointer for the page in 'buf'.
*/
static IndexTuple
gistformdownlink(Relation rel, Buffer buf, GISTSTATE *giststate,
GISTInsertStack *stack)
{ {
int is_splitted; Page page = BufferGetPage(buf);
ItemId iid; OffsetNumber maxoff;
IndexTuple oldtup, OffsetNumber offset;
newtup; IndexTuple downlink = NULL;
/* walk up */ maxoff = PageGetMaxOffsetNumber(page);
while (true) for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset))
{ {
/* IndexTuple ituple = (IndexTuple)
* After this call: 1. if child page was splited, then itup contains PageGetItem(page, PageGetItemId(page, offset));
* keys for each page 2. if child page wasn't splited, then itup if (downlink == NULL)
* contains additional for adjustment of current key downlink = CopyIndexTuple(ituple);
*/ else
if (state->stack->parent)
{ {
IndexTuple newdownlink;
newdownlink = gistgetadjusted(rel, downlink, ituple,
giststate);
if (newdownlink)
downlink = newdownlink;
}
}
/* /*
* X-lock parent page before proceed child, gistFindCorrectParent * If the page is completely empty, we can't form a meaningful
* should find and lock it * downlink for it. But we have to insert a downlink for the page.
* Any key will do, as long as its consistent with the downlink of
* parent page, so that we can legally insert it to the parent.
* A minimal one that matches as few scans as possible would be best,
* to keep scans from doing useless work, but we don't know how to
* construct that. So we just use the downlink of the original page
* that was split - that's as far from optimal as it can get but will
* do..
*/ */
gistFindCorrectParent(state->r, state->stack); if (!downlink)
{
ItemId iid;
LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
gistFindCorrectParent(rel, stack);
iid = PageGetItemId(stack->parent->page, stack->parent->childoffnum);
downlink = (IndexTuple) PageGetItem(stack->parent->page, iid);
downlink = CopyIndexTuple(downlink);
LockBuffer(stack->parent->buffer, GIST_UNLOCK);
} }
is_splitted = gistplacetopage(state, giststate);
/* parent locked above, so release child buffer */ ItemPointerSetBlockNumber(&(downlink->t_tid), BufferGetBlockNumber(buf));
UnlockReleaseBuffer(state->stack->buffer); GistTupleSetValid(downlink);
/* pop parent page from stack */ return downlink;
state->stack = state->stack->parent; }
/* stack is void */
if (!state->stack) /*
break; * Complete the incomplete split of state->stack->page.
*/
static void
gistfixsplit(GISTInsertState *state, GISTSTATE *giststate)
{
GISTInsertStack *stack = state->stack;
Buffer buf;
Page page;
List *splitinfo = NIL;
elog(LOG, "fixing incomplete split in index \"%s\", block %u",
RelationGetRelationName(state->r), stack->blkno);
Assert(GistFollowRight(stack->page));
Assert(OffsetNumberIsValid(stack->parent->childoffnum));
buf = stack->buffer;
/* /*
* child did not split, so we can check is it needed to update parent * Read the chain of split pages, following the rightlinks. Construct
* tuple * a downlink tuple for each page.
*/ */
if (!is_splitted) for (;;)
{
GISTPageSplitInfo *si = palloc(sizeof(GISTPageSplitInfo));
IndexTuple downlink;
page = BufferGetPage(buf);
/* Form the new downlink tuples to insert to parent */
downlink = gistformdownlink(state->r, buf, giststate, stack);
si->buf = buf;
si->downlink = downlink;
splitinfo = lappend(splitinfo, si);
if (GistFollowRight(page))
{ {
/* parent's tuple */ /* lock next page */
iid = PageGetItemId(state->stack->page, state->stack->childoffnum); buf = ReadBuffer(state->r, GistPageGetOpaque(page)->rightlink);
oldtup = (IndexTuple) PageGetItem(state->stack->page, iid); LockBuffer(buf, GIST_EXCLUSIVE);
newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate); }
else
if (!newtup)
{ /* not need to update key */
LockBuffer(state->stack->buffer, GIST_UNLOCK);
break; break;
} }
state->itup[0] = newtup; /* Insert the downlinks */
gistfinishsplit(state, stack, giststate, splitinfo);
}
/*
* Insert tuples to stack->buffer. If 'oldoffnum' is valid, the new tuples
* replace an old tuple at oldoffnum. The caller must hold an exclusive lock
* on the page.
*
* If leftchild is valid, we're inserting/updating the downlink for the
* page to the right of leftchild. We clear the F_FOLLOW_RIGHT flag and
* update NSN on leftchild, atomically with the insertion of the downlink.
*
* Returns 'true' if the page had to be split. On return, we will continue
* to hold an exclusive lock on state->stack->buffer, but if we had to split
* the page, it might not contain the tuple we just inserted/updated.
*/
static bool
gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
GISTSTATE *giststate,
IndexTuple *tuples, int ntup, OffsetNumber oldoffnum,
Buffer leftchild)
{
List *splitinfo;
bool is_split;
is_split = gistplacetopage(state, giststate, stack->buffer,
tuples, ntup, oldoffnum,
leftchild,
&splitinfo);
if (splitinfo)
gistfinishsplit(state, stack, giststate, splitinfo);
return is_split;
}
/*
* Finish an incomplete split by inserting/updating the downlinks in
* parent page. 'splitinfo' contains all the child pages, exclusively-locked,
* involved in the split, from left-to-right.
*/
static void
gistfinishsplit(GISTInsertState *state, GISTInsertStack *stack,
GISTSTATE *giststate, List *splitinfo)
{
ListCell *lc;
List *reversed;
GISTPageSplitInfo *right;
GISTPageSplitInfo *left;
IndexTuple tuples[2];
/* A split always contains at least two halves */
Assert(list_length(splitinfo) >= 2);
/*
* We need to insert downlinks for each new page, and update the
* downlink for the original (leftmost) page in the split. Begin at
* the rightmost page, inserting one downlink at a time until there's
* only two pages left. Finally insert the downlink for the last new
* page and update the downlink for the original page as one operation.
*/
/* for convenience, create a copy of the list in reverse order */
reversed = NIL;
foreach(lc, splitinfo)
{
reversed = lcons(lfirst(lc), reversed);
} }
} /* while */
/* release all parent buffers */ LockBuffer(stack->parent->buffer, GIST_EXCLUSIVE);
while (state->stack) gistFindCorrectParent(state->r, stack);
while(list_length(reversed) > 2)
{ {
ReleaseBuffer(state->stack->buffer); right = (GISTPageSplitInfo *) linitial(reversed);
state->stack = state->stack->parent; left = (GISTPageSplitInfo *) lsecond(reversed);
if (gistinserttuples(state, stack->parent, giststate,
&right->downlink, 1,
InvalidOffsetNumber,
left->buf))
{
/*
* If the parent page was split, need to relocate the original
* parent pointer.
*/
gistFindCorrectParent(state->r, stack);
}
UnlockReleaseBuffer(right->buf);
reversed = list_delete_first(reversed);
} }
/* say to xlog that insert is completed */ right = (GISTPageSplitInfo *) linitial(reversed);
if (state->needInsertComplete && RelationNeedsWAL(state->r)) left = (GISTPageSplitInfo *) lsecond(reversed);
gistxlogInsertCompletion(state->r->rd_node, &(state->key), 1);
/*
* Finally insert downlink for the remaining right page and update the
* downlink for the original page to not contain the tuples that were
* moved to the new pages.
*/
tuples[0] = left->downlink;
tuples[1] = right->downlink;
gistinserttuples(state, stack->parent, giststate,
tuples, 2,
stack->parent->childoffnum,
left->buf);
LockBuffer(stack->parent->buffer, GIST_UNLOCK);
UnlockReleaseBuffer(right->buf);
Assert(left->buf == stack->buffer);
} }
/* /*
...@@ -963,8 +1299,7 @@ gistSplit(Relation r, ...@@ -963,8 +1299,7 @@ gistSplit(Relation r,
ROTATEDIST(res); ROTATEDIST(res);
res->block.num = v.splitVector.spl_nright; res->block.num = v.splitVector.spl_nright;
res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist)); res->list = gistfillitupvec(rvectup, v.splitVector.spl_nright, &(res->lenlist));
res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false) res->itup = gistFormTuple(giststate, r, v.spl_rattr, v.spl_risnull, false);
: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
} }
if (!gistfitpage(lvectup, v.splitVector.spl_nleft)) if (!gistfitpage(lvectup, v.splitVector.spl_nleft))
...@@ -986,50 +1321,12 @@ gistSplit(Relation r, ...@@ -986,50 +1321,12 @@ gistSplit(Relation r,
ROTATEDIST(res); ROTATEDIST(res);
res->block.num = v.splitVector.spl_nleft; res->block.num = v.splitVector.spl_nleft;
res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist)); res->list = gistfillitupvec(lvectup, v.splitVector.spl_nleft, &(res->lenlist));
res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false) res->itup = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lisnull, false);
: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
} }
return res; return res;
} }
/*
* buffer must be pinned and locked by caller
*/
void
gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key)
{
Page page;
Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
page = BufferGetPage(buffer);
START_CRIT_SECTION();
GISTInitBuffer(buffer, 0);
gistfillbuffer(page, itup, len, FirstOffsetNumber);
MarkBufferDirty(buffer);
if (RelationNeedsWAL(r))
{
XLogRecPtr recptr;
XLogRecData *rdata;
rdata = formUpdateRdata(r->rd_node, buffer,
NULL, 0,
itup, len, key);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
}
else
PageSetLSN(page, GetXLogRecPtrForTemp());
END_CRIT_SECTION();
}
/* /*
* Fill a GISTSTATE with information about the index * Fill a GISTSTATE with information about the index
*/ */
......
...@@ -254,9 +254,15 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances, ...@@ -254,9 +254,15 @@ gistScanPage(IndexScanDesc scan, GISTSearchItem *pageItem, double *myDistances,
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
opaque = GistPageGetOpaque(page); opaque = GistPageGetOpaque(page);
/* check if page split occurred since visit to parent */ /*
* Check if we need to follow the rightlink. We need to follow it if the
* page was concurrently split since we visited the parent (in which case
* parentlsn < nsn), or if the the system crashed after a page split but
* before the downlink was inserted into the parent.
*/
if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) && if (!XLogRecPtrIsInvalid(pageItem->data.parentlsn) &&
XLByteLT(pageItem->data.parentlsn, opaque->nsn) && (GistFollowRight(page) ||
XLByteLT(pageItem->data.parentlsn, opaque->nsn)) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ ) opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{ {
/* There was a page split, follow right link to add pages */ /* There was a page split, follow right link to add pages */
......
...@@ -499,58 +499,6 @@ gistSplitHalf(GIST_SPLITVEC *v, int len) ...@@ -499,58 +499,6 @@ gistSplitHalf(GIST_SPLITVEC *v, int len)
v->spl_left[v->spl_nleft++] = i; v->spl_left[v->spl_nleft++] = i;
} }
/*
* if it was invalid tuple then we need special processing.
* We move all invalid tuples on right page.
*
* if there is no place on left page, gistSplit will be called one more
* time for left page.
*
* Normally, we never exec this code, but after crash replay it's possible
* to get 'invalid' tuples (probability is low enough)
*/
static void
gistSplitByInvalid(GISTSTATE *giststate, GistSplitVector *v, IndexTuple *itup, int len)
{
int i;
static OffsetNumber offInvTuples[MaxOffsetNumber];
int nOffInvTuples = 0;
for (i = 1; i <= len; i++)
if (GistTupleIsInvalid(itup[i - 1]))
offInvTuples[nOffInvTuples++] = i;
if (nOffInvTuples == len)
{
/* corner case, all tuples are invalid */
v->spl_rightvalid = v->spl_leftvalid = false;
gistSplitHalf(&v->splitVector, len);
}
else
{
GistSplitUnion gsvp;
v->splitVector.spl_right = offInvTuples;
v->splitVector.spl_nright = nOffInvTuples;
v->spl_rightvalid = false;
v->splitVector.spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->splitVector.spl_nleft = 0;
for (i = 1; i <= len; i++)
if (!GistTupleIsInvalid(itup[i - 1]))
v->splitVector.spl_left[v->splitVector.spl_nleft++] = i;
v->spl_leftvalid = true;
gsvp.equiv = NULL;
gsvp.attr = v->spl_lattr;
gsvp.len = v->splitVector.spl_nleft;
gsvp.entries = v->splitVector.spl_left;
gsvp.isnull = v->spl_lisnull;
gistunionsubkeyvec(giststate, itup, &gsvp, 0);
}
}
/* /*
* trys to split page by attno key, in a case of null * trys to split page by attno key, in a case of null
* values move its to separate page. * values move its to separate page.
...@@ -568,12 +516,6 @@ gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist ...@@ -568,12 +516,6 @@ gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist
Datum datum; Datum datum;
bool IsNull; bool IsNull;
if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
{
gistSplitByInvalid(giststate, v, itup, len);
return;
}
datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull); datum = index_getattr(itup[i - 1], attno + 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, attno, &(entryvec->vector[i]), gistdentryinit(giststate, attno, &(entryvec->vector[i]),
datum, r, page, i, datum, r, page, i,
...@@ -582,8 +524,6 @@ gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist ...@@ -582,8 +524,6 @@ gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *gist
offNullTuples[nOffNullTuples++] = i; offNullTuples[nOffNullTuples++] = i;
} }
v->spl_leftvalid = v->spl_rightvalid = true;
if (nOffNullTuples == len) if (nOffNullTuples == len)
{ {
/* /*
......
...@@ -152,7 +152,7 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) ...@@ -152,7 +152,7 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen)
* invalid tuple. Resulting Datums aren't compressed. * invalid tuple. Resulting Datums aren't compressed.
*/ */
bool void
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull) Datum *attr, bool *isnull)
{ {
...@@ -180,10 +180,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke ...@@ -180,10 +180,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
Datum datum; Datum datum;
bool IsNull; bool IsNull;
if (GistTupleIsInvalid(itvec[j]))
return FALSE; /* signals that union with invalid tuple =>
* result is invalid */
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull) if (IsNull)
continue; continue;
...@@ -218,8 +214,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke ...@@ -218,8 +214,6 @@ gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startke
isnull[i] = FALSE; isnull[i] = FALSE;
} }
} }
return TRUE;
} }
/* /*
...@@ -231,8 +225,7 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) ...@@ -231,8 +225,7 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{ {
memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts);
if (!gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS)) gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS);
return gist_form_invalid_tuple(InvalidBlockNumber);
return gistFormTuple(giststate, r, attrS, isnullS, false); return gistFormTuple(giststate, r, attrS, isnullS, false);
} }
...@@ -328,9 +321,6 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis ...@@ -328,9 +321,6 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
IndexTuple newtup = NULL; IndexTuple newtup = NULL;
int i; int i;
if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup))
return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid)));
gistDeCompressAtt(giststate, r, oldtup, NULL, gistDeCompressAtt(giststate, r, oldtup, NULL,
(OffsetNumber) 0, oldentries, oldisnull); (OffsetNumber) 0, oldentries, oldisnull);
...@@ -401,14 +391,6 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ ...@@ -401,14 +391,6 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
int j; int j;
IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup))
{
ereport(LOG,
(errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery",
RelationGetRelationName(r))));
continue;
}
sum_grow = 0; sum_grow = 0;
for (j = 0; j < r->rd_att->natts; j++) for (j = 0; j < r->rd_att->natts; j++)
{ {
...@@ -521,7 +503,11 @@ gistFormTuple(GISTSTATE *giststate, Relation r, ...@@ -521,7 +503,11 @@ gistFormTuple(GISTSTATE *giststate, Relation r,
} }
res = index_form_tuple(giststate->tupdesc, compatt, isnull); res = index_form_tuple(giststate->tupdesc, compatt, isnull);
GistTupleSetValid(res); /*
* The offset number on tuples on internal pages is unused. For historical
* reasons, it is set to 0xffff.
*/
ItemPointerSetOffsetNumber( &(res->t_tid), 0xffff);
return res; return res;
} }
......
...@@ -26,13 +26,6 @@ ...@@ -26,13 +26,6 @@
#include "utils/memutils.h" #include "utils/memutils.h"
typedef struct GistBulkDeleteResult
{
IndexBulkDeleteResult std; /* common state */
bool needReindex;
} GistBulkDeleteResult;
/* /*
* VACUUM cleanup: update FSM * VACUUM cleanup: update FSM
*/ */
...@@ -40,7 +33,7 @@ Datum ...@@ -40,7 +33,7 @@ Datum
gistvacuumcleanup(PG_FUNCTION_ARGS) gistvacuumcleanup(PG_FUNCTION_ARGS)
{ {
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1); IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
Relation rel = info->index; Relation rel = info->index;
BlockNumber npages, BlockNumber npages,
blkno; blkno;
...@@ -56,10 +49,10 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) ...@@ -56,10 +49,10 @@ gistvacuumcleanup(PG_FUNCTION_ARGS)
/* Set up all-zero stats if gistbulkdelete wasn't called */ /* Set up all-zero stats if gistbulkdelete wasn't called */
if (stats == NULL) if (stats == NULL)
{ {
stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult)); stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
/* use heap's tuple count */ /* use heap's tuple count */
stats->std.num_index_tuples = info->num_heap_tuples; stats->num_index_tuples = info->num_heap_tuples;
stats->std.estimated_count = info->estimated_count; stats->estimated_count = info->estimated_count;
/* /*
* XXX the above is wrong if index is partial. Would it be OK to just * XXX the above is wrong if index is partial. Would it be OK to just
...@@ -67,11 +60,6 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) ...@@ -67,11 +60,6 @@ gistvacuumcleanup(PG_FUNCTION_ARGS)
*/ */
} }
if (stats->needReindex)
ereport(NOTICE,
(errmsg("index \"%s\" needs VACUUM FULL or REINDEX to finish crash recovery",
RelationGetRelationName(rel))));
/* /*
* Need lock unless it's local to this backend. * Need lock unless it's local to this backend.
*/ */
...@@ -112,10 +100,10 @@ gistvacuumcleanup(PG_FUNCTION_ARGS) ...@@ -112,10 +100,10 @@ gistvacuumcleanup(PG_FUNCTION_ARGS)
IndexFreeSpaceMapVacuum(info->index); IndexFreeSpaceMapVacuum(info->index);
/* return statistics */ /* return statistics */
stats->std.pages_free = totFreePages; stats->pages_free = totFreePages;
if (needLock) if (needLock)
LockRelationForExtension(rel, ExclusiveLock); LockRelationForExtension(rel, ExclusiveLock);
stats->std.num_pages = RelationGetNumberOfBlocks(rel); stats->num_pages = RelationGetNumberOfBlocks(rel);
if (needLock) if (needLock)
UnlockRelationForExtension(rel, ExclusiveLock); UnlockRelationForExtension(rel, ExclusiveLock);
...@@ -135,7 +123,7 @@ pushStackIfSplited(Page page, GistBDItem *stack) ...@@ -135,7 +123,7 @@ pushStackIfSplited(Page page, GistBDItem *stack)
GISTPageOpaque opaque = GistPageGetOpaque(page); GISTPageOpaque opaque = GistPageGetOpaque(page);
if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) && if (stack->blkno != GIST_ROOT_BLKNO && !XLogRecPtrIsInvalid(stack->parentlsn) &&
XLByteLT(stack->parentlsn, opaque->nsn) && (GistFollowRight(page) || XLByteLT(stack->parentlsn, opaque->nsn)) &&
opaque->rightlink != InvalidBlockNumber /* sanity check */ ) opaque->rightlink != InvalidBlockNumber /* sanity check */ )
{ {
/* split page detected, install right link to the stack */ /* split page detected, install right link to the stack */
...@@ -162,7 +150,7 @@ Datum ...@@ -162,7 +150,7 @@ Datum
gistbulkdelete(PG_FUNCTION_ARGS) gistbulkdelete(PG_FUNCTION_ARGS)
{ {
IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0); IndexVacuumInfo *info = (IndexVacuumInfo *) PG_GETARG_POINTER(0);
GistBulkDeleteResult *stats = (GistBulkDeleteResult *) PG_GETARG_POINTER(1); IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(1);
IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(2);
void *callback_state = (void *) PG_GETARG_POINTER(3); void *callback_state = (void *) PG_GETARG_POINTER(3);
Relation rel = info->index; Relation rel = info->index;
...@@ -171,10 +159,10 @@ gistbulkdelete(PG_FUNCTION_ARGS) ...@@ -171,10 +159,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
/* first time through? */ /* first time through? */
if (stats == NULL) if (stats == NULL)
stats = (GistBulkDeleteResult *) palloc0(sizeof(GistBulkDeleteResult)); stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
/* we'll re-count the tuples each time */ /* we'll re-count the tuples each time */
stats->std.estimated_count = false; stats->estimated_count = false;
stats->std.num_index_tuples = 0; stats->num_index_tuples = 0;
stack = (GistBDItem *) palloc0(sizeof(GistBDItem)); stack = (GistBDItem *) palloc0(sizeof(GistBDItem));
stack->blkno = GIST_ROOT_BLKNO; stack->blkno = GIST_ROOT_BLKNO;
...@@ -232,10 +220,10 @@ gistbulkdelete(PG_FUNCTION_ARGS) ...@@ -232,10 +220,10 @@ gistbulkdelete(PG_FUNCTION_ARGS)
{ {
todelete[ntodelete] = i - ntodelete; todelete[ntodelete] = i - ntodelete;
ntodelete++; ntodelete++;
stats->std.tuples_removed += 1; stats->tuples_removed += 1;
} }
else else
stats->std.num_index_tuples += 1; stats->num_index_tuples += 1;
} }
if (ntodelete) if (ntodelete)
...@@ -250,22 +238,13 @@ gistbulkdelete(PG_FUNCTION_ARGS) ...@@ -250,22 +238,13 @@ gistbulkdelete(PG_FUNCTION_ARGS)
if (RelationNeedsWAL(rel)) if (RelationNeedsWAL(rel))
{ {
XLogRecData *rdata;
XLogRecPtr recptr; XLogRecPtr recptr;
gistxlogPageUpdate *xlinfo;
rdata = formUpdateRdata(rel->rd_node, buffer, recptr = gistXLogUpdate(rel->rd_node, buffer,
todelete, ntodelete, todelete, ntodelete,
NULL, 0, NULL, 0, InvalidBuffer);
NULL);
xlinfo = (gistxlogPageUpdate *) rdata->next->data;
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr); PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
pfree(xlinfo);
pfree(rdata);
} }
else else
PageSetLSN(page, GetXLogRecPtrForTemp()); PageSetLSN(page, GetXLogRecPtrForTemp());
...@@ -293,7 +272,11 @@ gistbulkdelete(PG_FUNCTION_ARGS) ...@@ -293,7 +272,11 @@ gistbulkdelete(PG_FUNCTION_ARGS)
stack->next = ptr; stack->next = ptr;
if (GistTupleIsInvalid(idxtuple)) if (GistTupleIsInvalid(idxtuple))
stats->needReindex = true; ereport(LOG,
(errmsg("index \"%s\" contains an inner tuple marked as invalid",
RelationGetRelationName(rel)),
errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
errhint("Please REINDEX it.")));
} }
} }
......
...@@ -20,15 +20,6 @@ ...@@ -20,15 +20,6 @@
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/rel.h" #include "utils/rel.h"
typedef struct
{
gistxlogPageUpdate *data;
int len;
IndexTuple *itup;
OffsetNumber *todelete;
} PageUpdateRecord;
typedef struct typedef struct
{ {
gistxlogPage *header; gistxlogPage *header;
...@@ -41,144 +32,37 @@ typedef struct ...@@ -41,144 +32,37 @@ typedef struct
NewPage *page; NewPage *page;
} PageSplitRecord; } PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
typedef struct gistIncompleteInsert
{
RelFileNode node;
BlockNumber origblkno; /* for splits */
ItemPointerData key;
int lenblk;
BlockNumber *blkno;
XLogRecPtr lsn;
BlockNumber *path;
int pathlen;
} gistIncompleteInsert;
static MemoryContext opCtx; /* working memory for operations */ static MemoryContext opCtx; /* working memory for operations */
static MemoryContext insertCtx; /* holds incomplete_inserts list */
static List *incomplete_inserts;
#define ItemPointerEQ(a, b) \
( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \
ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) )
/*
* Replay the clearing of F_FOLLOW_RIGHT flag.
*/
static void static void
pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
BlockNumber *blkno, int lenblk, BlockNumber leftblkno)
PageSplitRecord *xlinfo /* to extract blkno info */ )
{ {
MemoryContext oldCxt; Buffer buffer;
gistIncompleteInsert *ninsert;
if (!ItemPointerIsValid(&key))
/*
* if key is null then we should not store insertion as incomplete,
* because it's a vacuum operation..
*/
return;
oldCxt = MemoryContextSwitchTo(insertCtx);
ninsert = (gistIncompleteInsert *) palloc(sizeof(gistIncompleteInsert));
ninsert->node = node;
ninsert->key = key;
ninsert->lsn = lsn;
if (lenblk && blkno) buffer = XLogReadBuffer(node, leftblkno, false);
{ if (BufferIsValid(buffer))
ninsert->lenblk = lenblk;
ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
memcpy(ninsert->blkno, blkno, sizeof(BlockNumber) * ninsert->lenblk);
ninsert->origblkno = *blkno;
}
else
{ {
int i; Page page = (Page) BufferGetPage(buffer);
Assert(xlinfo);
ninsert->lenblk = xlinfo->data->npage;
ninsert->blkno = (BlockNumber *) palloc(sizeof(BlockNumber) * ninsert->lenblk);
for (i = 0; i < ninsert->lenblk; i++)
ninsert->blkno[i] = xlinfo->page[i].header->blkno;
ninsert->origblkno = xlinfo->data->origblkno;
}
Assert(ninsert->lenblk > 0);
/* /*
* Stick the new incomplete insert onto the front of the list, not the * Note that we still update the page even if page LSN is equal to the
* back. This is so that gist_xlog_cleanup will process incompletions in * LSN of this record, because the updated NSN is not included in the
* last-in-first-out order. * full page image.
*/ */
incomplete_inserts = lcons(ninsert, incomplete_inserts); if (!XLByteLT(lsn, PageGetLSN(page)))
MemoryContextSwitchTo(oldCxt);
}
static void
forgetIncompleteInsert(RelFileNode node, ItemPointerData key)
{
ListCell *l;
if (!ItemPointerIsValid(&key))
return;
if (incomplete_inserts == NIL)
return;
foreach(l, incomplete_inserts)
{
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key)))
{
/* found */
incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
pfree(insert->blkno);
pfree(insert);
break;
}
}
}
static void
decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
{
char *begin = XLogRecGetData(record),
*ptr;
int i = 0,
addpath = 0;
decoded->data = (gistxlogPageUpdate *) begin;
if (decoded->data->ntodelete)
{ {
decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath); GistPageGetOpaque(page)->nsn = lsn;
addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete); GistClearFollowRight(page);
}
else
decoded->todelete = NULL;
decoded->len = 0; PageSetLSN(page, lsn);
ptr = begin + sizeof(gistxlogPageUpdate) + addpath; PageSetTLI(page, ThisTimeLineID);
while (ptr - begin < record->xl_len) MarkBufferDirty(buffer);
{
decoded->len++;
ptr += IndexTupleSize((IndexTuple) ptr);
} }
UnlockReleaseBuffer(buffer);
decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len);
ptr = begin + sizeof(gistxlogPageUpdate) + addpath;
while (ptr - begin < record->xl_len)
{
decoded->itup[i] = (IndexTuple) ptr;
ptr += IndexTupleSize(decoded->itup[i]);
i++;
} }
} }
...@@ -186,29 +70,22 @@ decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) ...@@ -186,29 +70,22 @@ decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record)
* redo any page update (except page split) * redo any page update (except page split)
*/ */
static void static void
gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
{ {
gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) XLogRecGetData(record); char *begin = XLogRecGetData(record);
PageUpdateRecord xlrec; gistxlogPageUpdate *xldata = (gistxlogPageUpdate *) begin;
Buffer buffer; Buffer buffer;
Page page; Page page;
char *data;
/* we must fix incomplete_inserts list even if XLR_BKP_BLOCK_1 is set */ if (BlockNumberIsValid(xldata->leftchild))
forgetIncompleteInsert(xldata->node, xldata->key); gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
if (!isnewroot && xldata->blkno != GIST_ROOT_BLKNO)
/* operation with root always finalizes insertion */
pushIncompleteInsert(xldata->node, lsn, xldata->key,
&(xldata->blkno), 1,
NULL);
/* nothing else to do if page was backed up (and no info to do it with) */ /* nothing more to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1) if (record->xl_info & XLR_BKP_BLOCK_1)
return; return;
decodePageUpdateRecord(&xlrec, record); buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
buffer = XLogReadBuffer(xlrec.data->node, xlrec.data->blkno, false);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
return; return;
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
...@@ -219,28 +96,49 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) ...@@ -219,28 +96,49 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot)
return; return;
} }
if (isnewroot) data = begin + sizeof(gistxlogPageUpdate);
GISTInitBuffer(buffer, 0);
else if (xlrec.data->ntodelete) /* Delete old tuples */
if (xldata->ntodelete > 0)
{ {
int i; int i;
OffsetNumber *todelete = (OffsetNumber *) data;
data += sizeof(OffsetNumber) * xldata->ntodelete;
for (i = 0; i < xlrec.data->ntodelete; i++) for (i = 0; i < xldata->ntodelete; i++)
PageIndexTupleDelete(page, xlrec.todelete[i]); PageIndexTupleDelete(page, todelete[i]);
if (GistPageIsLeaf(page)) if (GistPageIsLeaf(page))
GistMarkTuplesDeleted(page); GistMarkTuplesDeleted(page);
} }
/* add tuples */ /* add tuples */
if (xlrec.len > 0) if (data - begin < record->xl_len)
gistfillbuffer(page, xlrec.itup, xlrec.len, InvalidOffsetNumber); {
OffsetNumber off = (PageIsEmpty(page)) ? FirstOffsetNumber :
OffsetNumberNext(PageGetMaxOffsetNumber(page));
while (data - begin < record->xl_len)
{
IndexTuple itup = (IndexTuple) data;
Size sz = IndexTupleSize(itup);
OffsetNumber l;
data += sz;
l = PageAddItem(page, (Item) itup, sz, off, false, false);
if (l == InvalidOffsetNumber)
elog(ERROR, "failed to add item to GiST index page, size %d bytes",
(int) sz);
off++;
}
}
else
{
/* /*
* special case: leafpage, nothing to insert, nothing to delete, then * special case: leafpage, nothing to insert, nothing to delete, then
* vacuum marks page * vacuum marks page
*/ */
if (GistPageIsLeaf(page) && xlrec.len == 0 && xlrec.data->ntodelete == 0) if (GistPageIsLeaf(page) && xldata->ntodelete == 0)
GistClearTuplesDeleted(page); GistClearTuplesDeleted(page);
}
if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO)
...@@ -315,41 +213,67 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) ...@@ -315,41 +213,67 @@ decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
static void static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{ {
gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
PageSplitRecord xlrec; PageSplitRecord xlrec;
Buffer buffer; Buffer buffer;
Page page; Page page;
int i; int i;
int flags; bool isrootsplit = false;
if (BlockNumberIsValid(xldata->leftchild))
gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
decodePageSplitRecord(&xlrec, record); decodePageSplitRecord(&xlrec, record);
flags = xlrec.data->origleaf ? F_LEAF : 0;
/* loop around all pages */ /* loop around all pages */
for (i = 0; i < xlrec.data->npage; i++) for (i = 0; i < xlrec.data->npage; i++)
{ {
NewPage *newpage = xlrec.page + i; NewPage *newpage = xlrec.page + i;
int flags;
if (newpage->header->blkno == GIST_ROOT_BLKNO)
{
Assert(i == 0);
isrootsplit = true;
}
buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true); buffer = XLogReadBuffer(xlrec.data->node, newpage->header->blkno, true);
Assert(BufferIsValid(buffer)); Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
/* ok, clear buffer */ /* ok, clear buffer */
if (xlrec.data->origleaf && newpage->header->blkno != GIST_ROOT_BLKNO)
flags = F_LEAF;
else
flags = 0;
GISTInitBuffer(buffer, flags); GISTInitBuffer(buffer, flags);
/* and fill it */ /* and fill it */
gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber); gistfillbuffer(page, newpage->itup, newpage->header->num, FirstOffsetNumber);
if (newpage->header->blkno == GIST_ROOT_BLKNO)
{
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
GistPageGetOpaque(page)->nsn = xldata->orignsn;
GistClearFollowRight(page);
}
else
{
if (i < xlrec.data->npage - 1)
GistPageGetOpaque(page)->rightlink = xlrec.page[i + 1].header->blkno;
else
GistPageGetOpaque(page)->rightlink = xldata->origrlink;
GistPageGetOpaque(page)->nsn = xldata->orignsn;
if (i < xlrec.data->npage - 1 && !isrootsplit)
GistMarkFollowRight(page);
else
GistClearFollowRight(page);
}
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
pushIncompleteInsert(xlrec.data->node, lsn, xlrec.data->key,
NULL, 0,
&xlrec);
} }
static void static void
...@@ -372,24 +296,6 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) ...@@ -372,24 +296,6 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
static void
gistRedoCompleteInsert(XLogRecPtr lsn, XLogRecord *record)
{
char *begin = XLogRecGetData(record),
*ptr;
gistxlogInsertComplete *xlrec;
xlrec = (gistxlogInsertComplete *) begin;
ptr = begin + sizeof(gistxlogInsertComplete);
while (ptr - begin < record->xl_len)
{
Assert(record->xl_len - (ptr - begin) >= sizeof(ItemPointerData));
forgetIncompleteInsert(xlrec->node, *((ItemPointerData *) ptr));
ptr += sizeof(ItemPointerData);
}
}
void void
gist_redo(XLogRecPtr lsn, XLogRecord *record) gist_redo(XLogRecPtr lsn, XLogRecord *record)
{ {
...@@ -401,30 +307,23 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -401,30 +307,23 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
* implement a similar optimization we have in b-tree, and remove killed * implement a similar optimization we have in b-tree, and remove killed
* tuples outside VACUUM, we'll need to handle that here. * tuples outside VACUUM, we'll need to handle that here.
*/ */
RestoreBkpBlocks(lsn, record, false); RestoreBkpBlocks(lsn, record, false);
oldCxt = MemoryContextSwitchTo(opCtx); oldCxt = MemoryContextSwitchTo(opCtx);
switch (info) switch (info)
{ {
case XLOG_GIST_PAGE_UPDATE: case XLOG_GIST_PAGE_UPDATE:
gistRedoPageUpdateRecord(lsn, record, false); gistRedoPageUpdateRecord(lsn, record);
break; break;
case XLOG_GIST_PAGE_DELETE: case XLOG_GIST_PAGE_DELETE:
gistRedoPageDeleteRecord(lsn, record); gistRedoPageDeleteRecord(lsn, record);
break; break;
case XLOG_GIST_NEW_ROOT:
gistRedoPageUpdateRecord(lsn, record, true);
break;
case XLOG_GIST_PAGE_SPLIT: case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record); gistRedoPageSplitRecord(lsn, record);
break; break;
case XLOG_GIST_CREATE_INDEX: case XLOG_GIST_CREATE_INDEX:
gistRedoCreateIndex(lsn, record); gistRedoCreateIndex(lsn, record);
break; break;
case XLOG_GIST_INSERT_COMPLETE:
gistRedoCompleteInsert(lsn, record);
break;
default: default:
elog(PANIC, "gist_redo: unknown op code %u", info); elog(PANIC, "gist_redo: unknown op code %u", info);
} }
...@@ -434,20 +333,16 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -434,20 +333,16 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
} }
static void static void
out_target(StringInfo buf, RelFileNode node, ItemPointerData key) out_target(StringInfo buf, RelFileNode node)
{ {
appendStringInfo(buf, "rel %u/%u/%u", appendStringInfo(buf, "rel %u/%u/%u",
node.spcNode, node.dbNode, node.relNode); node.spcNode, node.dbNode, node.relNode);
if (ItemPointerIsValid(&key))
appendStringInfo(buf, "; tid %u/%u",
ItemPointerGetBlockNumber(&key),
ItemPointerGetOffsetNumber(&key));
} }
static void static void
out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
{ {
out_target(buf, xlrec->node, xlrec->key); out_target(buf, xlrec->node);
appendStringInfo(buf, "; block number %u", xlrec->blkno); appendStringInfo(buf, "; block number %u", xlrec->blkno);
} }
...@@ -463,7 +358,7 @@ static void ...@@ -463,7 +358,7 @@ static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{ {
appendStringInfo(buf, "page_split: "); appendStringInfo(buf, "page_split: ");
out_target(buf, xlrec->node, xlrec->key); out_target(buf, xlrec->node);
appendStringInfo(buf, "; block number %u splits to %d pages", appendStringInfo(buf, "; block number %u splits to %d pages",
xlrec->origblkno, xlrec->npage); xlrec->origblkno, xlrec->npage);
} }
...@@ -482,10 +377,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -482,10 +377,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
case XLOG_GIST_PAGE_DELETE: case XLOG_GIST_PAGE_DELETE:
out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec); out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
break; break;
case XLOG_GIST_NEW_ROOT:
appendStringInfo(buf, "new_root: ");
out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key);
break;
case XLOG_GIST_PAGE_SPLIT: case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
break; break;
...@@ -495,415 +386,102 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -495,415 +386,102 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
((RelFileNode *) rec)->dbNode, ((RelFileNode *) rec)->dbNode,
((RelFileNode *) rec)->relNode); ((RelFileNode *) rec)->relNode);
break; break;
case XLOG_GIST_INSERT_COMPLETE:
appendStringInfo(buf, "complete_insert: rel %u/%u/%u",
((gistxlogInsertComplete *) rec)->node.spcNode,
((gistxlogInsertComplete *) rec)->node.dbNode,
((gistxlogInsertComplete *) rec)->node.relNode);
break;
default: default:
appendStringInfo(buf, "unknown gist op code %u", info); appendStringInfo(buf, "unknown gist op code %u", info);
break; break;
} }
} }
IndexTuple
gist_form_invalid_tuple(BlockNumber blkno)
{
/*
* we don't alloc space for null's bitmap, this is invalid tuple, be
* carefull in read and write code
*/
Size size = IndexInfoFindDataOffset(0);
IndexTuple tuple = (IndexTuple) palloc0(size);
tuple->t_info |= size;
ItemPointerSetBlockNumber(&(tuple->t_tid), blkno);
GistTupleSetInvalid(tuple);
return tuple;
}
static void
gistxlogFindPath(Relation index, gistIncompleteInsert *insert)
{
GISTInsertStack *top;
insert->pathlen = 0;
insert->path = NULL;
if ((top = gistFindPath(index, insert->origblkno)) != NULL)
{
int i;
GISTInsertStack *ptr;
for (ptr = top; ptr; ptr = ptr->parent)
insert->pathlen++;
insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen);
i = 0;
for (ptr = top; ptr; ptr = ptr->parent)
insert->path[i++] = ptr->blkno;
}
else
elog(ERROR, "lost parent for block %u", insert->origblkno);
}
static SplitedPageLayout *
gistMakePageLayout(Buffer *buffers, int nbuffers)
{
SplitedPageLayout *res = NULL,
*resptr;
while (nbuffers-- > 0)
{
Page page = BufferGetPage(buffers[nbuffers]);
IndexTuple *vec;
int veclen;
resptr = (SplitedPageLayout *) palloc0(sizeof(SplitedPageLayout));
resptr->block.blkno = BufferGetBlockNumber(buffers[nbuffers]);
resptr->block.num = PageGetMaxOffsetNumber(page);
vec = gistextractpage(page, &veclen);
resptr->list = gistfillitupvec(vec, veclen, &(resptr->lenlist));
resptr->next = res;
res = resptr;
}
return res;
}
/*
* Continue insert after crash. In normal situations, there aren't any
* incomplete inserts, but if a crash occurs partway through an insertion
* sequence, we'll need to finish making the index valid at the end of WAL
* replay.
*
* Note that we assume the index is now in a valid state, except for the
* unfinished insertion. In particular it's safe to invoke gistFindPath();
* there shouldn't be any garbage pages for it to run into.
*
* To complete insert we can't use basic insertion algorithm because
* during insertion we can't call user-defined support functions of opclass.
* So, we insert 'invalid' tuples without real key and do it by separate algorithm.
* 'invalid' tuple should be updated by vacuum full.
*/
static void
gistContinueInsert(gistIncompleteInsert *insert)
{
IndexTuple *itup;
int i,
lenitup;
Relation index;
index = CreateFakeRelcacheEntry(insert->node);
/*
* needed vector itup never will be more than initial lenblkno+2, because
* during this processing Indextuple can be only smaller
*/
lenitup = insert->lenblk;
itup = (IndexTuple *) palloc(sizeof(IndexTuple) * (lenitup + 2 /* guarantee root split */ ));
for (i = 0; i < insert->lenblk; i++)
itup[i] = gist_form_invalid_tuple(insert->blkno[i]);
/*
* any insertion of itup[] should make LOG message about
*/
if (insert->origblkno == GIST_ROOT_BLKNO)
{
/*
* it was split root, so we should only make new root. it can't be
* simple insert into root, we should replace all content of root.
*/
Buffer buffer = XLogReadBuffer(insert->node, GIST_ROOT_BLKNO, true);
gistnewroot(index, buffer, itup, lenitup, NULL);
UnlockReleaseBuffer(buffer);
}
else
{
Buffer *buffers;
Page *pages;
int numbuffer;
OffsetNumber *todelete;
/* construct path */
gistxlogFindPath(index, insert);
Assert(insert->pathlen > 0);
buffers = (Buffer *) palloc(sizeof(Buffer) * (insert->lenblk + 2 /* guarantee root split */ ));
pages = (Page *) palloc(sizeof(Page) * (insert->lenblk + 2 /* guarantee root split */ ));
todelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (insert->lenblk + 2 /* guarantee root split */ ));
for (i = 0; i < insert->pathlen; i++)
{
int j,
k,
pituplen = 0;
uint8 xlinfo;
XLogRecData *rdata;
XLogRecPtr recptr;
Buffer tempbuffer = InvalidBuffer;
int ntodelete = 0;
numbuffer = 1;
buffers[0] = ReadBuffer(index, insert->path[i]);
LockBuffer(buffers[0], GIST_EXCLUSIVE);
/*
* we check buffer, because we restored page earlier
*/
gistcheckpage(index, buffers[0]);
pages[0] = BufferGetPage(buffers[0]);
Assert(!GistPageIsLeaf(pages[0]));
pituplen = PageGetMaxOffsetNumber(pages[0]);
/* find remove old IndexTuples to remove */
for (j = 0; j < pituplen && ntodelete < lenitup; j++)
{
BlockNumber blkno;
ItemId iid = PageGetItemId(pages[0], j + FirstOffsetNumber);
IndexTuple idxtup = (IndexTuple) PageGetItem(pages[0], iid);
blkno = ItemPointerGetBlockNumber(&(idxtup->t_tid));
for (k = 0; k < lenitup; k++)
if (ItemPointerGetBlockNumber(&(itup[k]->t_tid)) == blkno)
{
todelete[ntodelete] = j + FirstOffsetNumber - ntodelete;
ntodelete++;
break;
}
}
if (ntodelete == 0)
elog(PANIC, "gistContinueInsert: cannot find pointer to page(s)");
/*
* we check space with subtraction only first tuple to delete,
* hope, that wiil be enough space....
*/
if (gistnospace(pages[0], itup, lenitup, *todelete, 0))
{
/* no space left on page, so we must split */
buffers[numbuffer] = ReadBuffer(index, P_NEW);
LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE);
GISTInitBuffer(buffers[numbuffer], 0);
pages[numbuffer] = BufferGetPage(buffers[numbuffer]);
gistfillbuffer(pages[numbuffer], itup, lenitup, FirstOffsetNumber);
numbuffer++;
if (BufferGetBlockNumber(buffers[0]) == GIST_ROOT_BLKNO)
{
Buffer tmp;
/*
* we split root, just copy content from root to new page
*/
/* sanity check */
if (i + 1 != insert->pathlen)
elog(PANIC, "unexpected pathlen in index \"%s\"",
RelationGetRelationName(index));
/* fill new page, root will be changed later */
tempbuffer = ReadBuffer(index, P_NEW);
LockBuffer(tempbuffer, GIST_EXCLUSIVE);
memcpy(BufferGetPage(tempbuffer), pages[0], BufferGetPageSize(tempbuffer));
/* swap buffers[0] (was root) and temp buffer */
tmp = buffers[0];
buffers[0] = tempbuffer;
tempbuffer = tmp; /* now in tempbuffer GIST_ROOT_BLKNO,
* it is still unchanged */
pages[0] = BufferGetPage(buffers[0]);
}
START_CRIT_SECTION();
for (j = 0; j < ntodelete; j++)
PageIndexTupleDelete(pages[0], todelete[j]);
xlinfo = XLOG_GIST_PAGE_SPLIT;
rdata = formSplitRdata(index->rd_node, insert->path[i],
false, &(insert->key),
gistMakePageLayout(buffers, numbuffer));
}
else
{
START_CRIT_SECTION();
for (j = 0; j < ntodelete; j++)
PageIndexTupleDelete(pages[0], todelete[j]);
gistfillbuffer(pages[0], itup, lenitup, InvalidOffsetNumber);
xlinfo = XLOG_GIST_PAGE_UPDATE;
rdata = formUpdateRdata(index->rd_node, buffers[0],
todelete, ntodelete,
itup, lenitup, &(insert->key));
}
/*
* use insert->key as mark for completion of insert (form*Rdata()
* above) for following possible replays
*/
/* write pages, we should mark it dirty befor XLogInsert() */
for (j = 0; j < numbuffer; j++)
{
GistPageGetOpaque(pages[j])->rightlink = InvalidBlockNumber;
MarkBufferDirty(buffers[j]);
}
recptr = XLogInsert(RM_GIST_ID, xlinfo, rdata);
for (j = 0; j < numbuffer; j++)
{
PageSetLSN(pages[j], recptr);
PageSetTLI(pages[j], ThisTimeLineID);
}
END_CRIT_SECTION();
lenitup = numbuffer;
for (j = 0; j < numbuffer; j++)
{
itup[j] = gist_form_invalid_tuple(BufferGetBlockNumber(buffers[j]));
UnlockReleaseBuffer(buffers[j]);
}
if (tempbuffer != InvalidBuffer)
{
/*
* it was a root split, so fill it by new values
*/
gistnewroot(index, tempbuffer, itup, lenitup, &(insert->key));
UnlockReleaseBuffer(tempbuffer);
}
}
}
FreeFakeRelcacheEntry(index);
ereport(LOG,
(errmsg("index %u/%u/%u needs VACUUM FULL or REINDEX to finish crash recovery",
insert->node.spcNode, insert->node.dbNode, insert->node.relNode),
errdetail("Incomplete insertion detected during crash replay.")));
}
void void
gist_xlog_startup(void) gist_xlog_startup(void)
{ {
incomplete_inserts = NIL;
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
"GiST recovery temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
opCtx = createTempGistContext(); opCtx = createTempGistContext();
} }
void void
gist_xlog_cleanup(void) gist_xlog_cleanup(void)
{ {
ListCell *l;
MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(opCtx);
foreach(l, incomplete_inserts)
{
gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l);
gistContinueInsert(insert);
MemoryContextReset(opCtx);
}
MemoryContextSwitchTo(oldCxt);
MemoryContextDelete(opCtx); MemoryContextDelete(opCtx);
MemoryContextDelete(insertCtx);
} }
bool /*
gist_safe_restartpoint(void) * Write WAL record of a page split.
{ */
if (incomplete_inserts) XLogRecPtr
return false; gistXLogSplit(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
return true; SplitedPageLayout *dist,
} BlockNumber origrlink, GistNSN orignsn,
Buffer leftchildbuf)
XLogRecData *
formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
ItemPointer key, SplitedPageLayout *dist)
{ {
XLogRecData *rdata; XLogRecData *rdata;
gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); gistxlogPageSplit xlrec;
SplitedPageLayout *ptr; SplitedPageLayout *ptr;
int npage = 0, int npage = 0,
cur = 1; cur;
XLogRecPtr recptr;
ptr = dist; for (ptr = dist; ptr; ptr = ptr->next)
while (ptr)
{
npage++; npage++;
ptr = ptr->next;
}
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2)); rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (npage * 2 + 2));
xlrec->node = node; xlrec.node = node;
xlrec->origblkno = blkno; xlrec.origblkno = blkno;
xlrec->origleaf = page_is_leaf; xlrec.origrlink = origrlink;
xlrec->npage = (uint16) npage; xlrec.orignsn = orignsn;
if (key) xlrec.origleaf = page_is_leaf;
xlrec->key = *key; xlrec.npage = (uint16) npage;
else xlrec.leftchild =
ItemPointerSetInvalid(&(xlrec->key)); BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
rdata[0].buffer = InvalidBuffer; rdata[0].data = (char *) &xlrec;
rdata[0].data = (char *) xlrec;
rdata[0].len = sizeof(gistxlogPageSplit); rdata[0].len = sizeof(gistxlogPageSplit);
rdata[0].next = NULL; rdata[0].buffer = InvalidBuffer;
cur = 1;
/*
* Include a full page image of the child buf. (only necessary if a
* checkpoint happened since the child page was split)
*/
if (BufferIsValid(leftchildbuf))
{
rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].data = NULL;
rdata[cur].len = 0;
rdata[cur].buffer = leftchildbuf;
rdata[cur].buffer_std = true;
cur++;
}
ptr = dist; for (ptr = dist; ptr; ptr = ptr->next)
while (ptr)
{ {
rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer; rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) &(ptr->block); rdata[cur].data = (char *) &(ptr->block);
rdata[cur].len = sizeof(gistxlogPage); rdata[cur].len = sizeof(gistxlogPage);
rdata[cur - 1].next = &(rdata[cur]);
cur++; cur++;
rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].buffer = InvalidBuffer; rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char *) (ptr->list); rdata[cur].data = (char *) (ptr->list);
rdata[cur].len = ptr->lenlist; rdata[cur].len = ptr->lenlist;
rdata[cur - 1].next = &(rdata[cur]);
rdata[cur].next = NULL;
cur++; cur++;
ptr = ptr->next;
} }
rdata[cur - 1].next = NULL;
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
return rdata; pfree(rdata);
return recptr;
} }
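For readers following the record format: the data assembled above is a gistxlogPageSplit header followed, for each new page, by a gistxlogPage header and that page's tuples back to back; the leftchildbuf entry only registers the buffer for a possible full-page image and contributes no record data. The walker below is a rough sketch of how the redo side steps through that layout (decodePageSplitRecord in gistxlog.c is the real decoder; error handling and output allocation are omitted here):

#include "postgres.h"
#include "access/gist_private.h"
#include "access/xlog.h"

/* Sketch only: step through the data area of an XLOG_GIST_PAGE_SPLIT record. */
static void
walk_page_split_record(XLogRecord *record)
{
	char	   *ptr = XLogRecGetData(record);
	gistxlogPageSplit *xldata = (gistxlogPageSplit *) ptr;
	int			i;

	ptr += sizeof(gistxlogPageSplit);
	for (i = 0; i < xldata->npage; i++)
	{
		gistxlogPage *pghdr = (gistxlogPage *) ptr;
		int			j;

		ptr += sizeof(gistxlogPage);
		/* the page's tuples follow immediately; each one encodes its own size */
		for (j = 0; j < pghdr->num; j++)
			ptr += IndexTupleSize((IndexTuple) ptr);
	}
}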
/* /*
* Construct the rdata array for an XLOG record describing a page update * Write XLOG record describing a page update. The update can include any
* (deletion and/or insertion of tuples on a single index page). * number of deletions and/or insertions of tuples on a single index page.
*
* If this update inserts a downlink for a split page, also record that
* the F_FOLLOW_RIGHT flag on the child page is cleared and NSN set.
* *
* Note that both the todelete array and the tuples are marked as belonging * Note that both the todelete array and the tuples are marked as belonging
* to the target buffer; they need not be stored in XLOG if XLogInsert decides * to the target buffer; they need not be stored in XLOG if XLogInsert decides
...@@ -911,27 +489,26 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ...@@ -911,27 +489,26 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf,
* at least one rdata item referencing the buffer, even when ntodelete and * at least one rdata item referencing the buffer, even when ntodelete and
* ituplen are both zero; this ensures that XLogInsert knows about the buffer. * ituplen are both zero; this ensures that XLogInsert knows about the buffer.
*/ */
XLogRecData * XLogRecPtr
formUpdateRdata(RelFileNode node, Buffer buffer, gistXLogUpdate(RelFileNode node, Buffer buffer,
OffsetNumber *todelete, int ntodelete, OffsetNumber *todelete, int ntodelete,
IndexTuple *itup, int ituplen, ItemPointer key) IndexTuple *itup, int ituplen,
Buffer leftchildbuf)
{ {
XLogRecData *rdata; XLogRecData *rdata;
gistxlogPageUpdate *xlrec; gistxlogPageUpdate *xlrec;
int cur, int cur,
i; i;
XLogRecPtr recptr;
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen)); rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen));
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node; xlrec->node = node;
xlrec->blkno = BufferGetBlockNumber(buffer); xlrec->blkno = BufferGetBlockNumber(buffer);
xlrec->ntodelete = ntodelete; xlrec->ntodelete = ntodelete;
xlrec->leftchild =
if (key) BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
xlrec->key = *key;
else
ItemPointerSetInvalid(&(xlrec->key));
rdata[0].buffer = buffer; rdata[0].buffer = buffer;
rdata[0].buffer_std = true; rdata[0].buffer_std = true;
...@@ -945,13 +522,13 @@ formUpdateRdata(RelFileNode node, Buffer buffer, ...@@ -945,13 +522,13 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
rdata[1].next = &(rdata[2]); rdata[1].next = &(rdata[2]);
rdata[2].data = (char *) todelete; rdata[2].data = (char *) todelete;
rdata[2].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); rdata[2].len = sizeof(OffsetNumber) * ntodelete;
rdata[2].buffer = buffer; rdata[2].buffer = buffer;
rdata[2].buffer_std = true; rdata[2].buffer_std = true;
rdata[2].next = NULL;
/* new tuples */
cur = 3; cur = 3;
/* new tuples */
for (i = 0; i < ituplen; i++) for (i = 0; i < ituplen; i++)
{ {
rdata[cur - 1].next = &(rdata[cur]); rdata[cur - 1].next = &(rdata[cur]);
...@@ -959,38 +536,26 @@ formUpdateRdata(RelFileNode node, Buffer buffer, ...@@ -959,38 +536,26 @@ formUpdateRdata(RelFileNode node, Buffer buffer,
rdata[cur].len = IndexTupleSize(itup[i]); rdata[cur].len = IndexTupleSize(itup[i]);
rdata[cur].buffer = buffer; rdata[cur].buffer = buffer;
rdata[cur].buffer_std = true; rdata[cur].buffer_std = true;
rdata[cur].next = NULL;
cur++; cur++;
} }
return rdata; /*
} * Include a full page image of the child buf. (only necessary if
* a checkpoint happened since the child page was split)
XLogRecPtr */
gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len) if (BufferIsValid(leftchildbuf))
{ {
gistxlogInsertComplete xlrec; rdata[cur - 1].next = &(rdata[cur]);
XLogRecData rdata[2]; rdata[cur].data = NULL;
XLogRecPtr recptr; rdata[cur].len = 0;
rdata[cur].buffer = leftchildbuf;
Assert(len > 0); rdata[cur].buffer_std = true;
xlrec.node = node; cur++;
}
rdata[0].buffer = InvalidBuffer; rdata[cur - 1].next = NULL;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof(gistxlogInsertComplete);
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) keys;
rdata[1].len = sizeof(ItemPointerData) * len;
rdata[1].next = NULL;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, rdata);
END_CRIT_SECTION(); recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
pfree(rdata);
return recptr; return recptr;
} }
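A caller-side sketch of the new function, modeled on the gistvacuum.c hunk earlier in this commit; the critical section and the MarkBufferDirty placement are assumptions about the surrounding code and are not shown in that hunk:

/* Assumed caller pattern (sketch): modify the page, then WAL-log the change. */
START_CRIT_SECTION();

/* ... PageIndexTupleDelete() / gistfillbuffer() on 'page' ... */
MarkBufferDirty(buffer);

if (RelationNeedsWAL(rel))
{
	XLogRecPtr	recptr;

	recptr = gistXLogUpdate(rel->rd_node, buffer,
							todelete, ntodelete,
							NULL, 0, InvalidBuffer);
	PageSetLSN(page, recptr);
	PageSetTLI(page, ThisTimeLineID);
}
else
	PageSetLSN(page, GetXLogRecPtrForTemp());

END_CRIT_SECTION();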
...@@ -40,6 +40,6 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = { ...@@ -40,6 +40,6 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint}, {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup, btree_safe_restartpoint},
{"Hash", hash_redo, hash_desc, NULL, NULL, NULL}, {"Hash", hash_redo, hash_desc, NULL, NULL, NULL},
{"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint}, {"Gin", gin_redo, gin_desc, gin_xlog_startup, gin_xlog_cleanup, gin_safe_restartpoint},
{"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, gist_safe_restartpoint}, {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup, NULL},
{"Sequence", seq_redo, seq_desc, NULL, NULL, NULL} {"Sequence", seq_redo, seq_desc, NULL, NULL, NULL}
}; };
...@@ -58,9 +58,10 @@ ...@@ -58,9 +58,10 @@
/* /*
* Page opaque data in a GiST index page. * Page opaque data in a GiST index page.
*/ */
#define F_LEAF (1 << 0) #define F_LEAF (1 << 0) /* leaf page */
#define F_DELETED (1 << 1) #define F_DELETED (1 << 1) /* the page has been deleted */
#define F_TUPLES_DELETED (1 << 2) #define F_TUPLES_DELETED (1 << 2) /* some tuples on the page are dead */
#define F_FOLLOW_RIGHT (1 << 3) /* page to the right has no downlink */
typedef XLogRecPtr GistNSN; typedef XLogRecPtr GistNSN;
...@@ -132,6 +133,10 @@ typedef struct GISTENTRY ...@@ -132,6 +133,10 @@ typedef struct GISTENTRY
#define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED) #define GistMarkTuplesDeleted(page) ( GistPageGetOpaque(page)->flags |= F_TUPLES_DELETED)
#define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED) #define GistClearTuplesDeleted(page) ( GistPageGetOpaque(page)->flags &= ~F_TUPLES_DELETED)
#define GistFollowRight(page) ( GistPageGetOpaque(page)->flags & F_FOLLOW_RIGHT)
#define GistMarkFollowRight(page) ( GistPageGetOpaque(page)->flags |= F_FOLLOW_RIGHT)
#define GistClearFollowRight(page) ( GistPageGetOpaque(page)->flags &= ~F_FOLLOW_RIGHT)
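Read together with gistRedoClearFollowRight earlier in this commit, the intended life cycle of the flag is roughly the sketch below; the function names and the leftpage/downlink_lsn identifiers are illustrative only, while the macros and the nsn field are the real ones:

/* Illustrative sketch, not code from the patch. */
static void
mark_split_incomplete(Page leftpage)
{
	/*
	 * The downlink for the right sibling is not in the parent yet, so scans
	 * must follow this page's rightlink unconditionally.
	 */
	GistMarkFollowRight(leftpage);
}

static void
mark_split_complete(Page leftpage, XLogRecPtr downlink_lsn)
{
	/* record how recent the split was, so the NSN check keeps working */
	GistPageGetOpaque(leftpage)->nsn = downlink_lsn;
	/* ... and stop forcing scans to the right */
	GistClearFollowRight(leftpage);
}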
/* /*
* Vector of GISTENTRY structs; user-defined methods union and picksplit * Vector of GISTENTRY structs; user-defined methods union and picksplit
* take it as one of their arguments * take it as one of their arguments
......
...@@ -132,9 +132,9 @@ typedef GISTScanOpaqueData *GISTScanOpaque; ...@@ -132,9 +132,9 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
/* XLog stuff */ /* XLog stuff */
#define XLOG_GIST_PAGE_UPDATE 0x00 #define XLOG_GIST_PAGE_UPDATE 0x00
#define XLOG_GIST_NEW_ROOT 0x20 /* #define XLOG_GIST_NEW_ROOT 0x20 */ /* not used anymore */
#define XLOG_GIST_PAGE_SPLIT 0x30 #define XLOG_GIST_PAGE_SPLIT 0x30
#define XLOG_GIST_INSERT_COMPLETE 0x40 /* #define XLOG_GIST_INSERT_COMPLETE 0x40 */ /* not used anymore */
#define XLOG_GIST_CREATE_INDEX 0x50 #define XLOG_GIST_CREATE_INDEX 0x50
#define XLOG_GIST_PAGE_DELETE 0x60 #define XLOG_GIST_PAGE_DELETE 0x60
...@@ -144,9 +144,10 @@ typedef struct gistxlogPageUpdate ...@@ -144,9 +144,10 @@ typedef struct gistxlogPageUpdate
BlockNumber blkno; BlockNumber blkno;
/* /*
* It used to identify completeness of insert. Sets to leaf itup * If this operation completes a page split, by inserting a downlink for
* the split page, leftchild points to the left half of the split.
*/ */
ItemPointerData key; BlockNumber leftchild;
/* number of deleted offsets */ /* number of deleted offsets */
uint16 ntodelete; uint16 ntodelete;
...@@ -160,11 +161,12 @@ typedef struct gistxlogPageSplit ...@@ -160,11 +161,12 @@ typedef struct gistxlogPageSplit
{ {
RelFileNode node; RelFileNode node;
BlockNumber origblkno; /* splitted page */ BlockNumber origblkno; /* splitted page */
BlockNumber origrlink; /* rightlink of the page before split */
GistNSN orignsn; /* NSN of the page before split */
bool origleaf; /* was splitted page a leaf page? */ bool origleaf; /* was splitted page a leaf page? */
uint16 npage;
/* see comments on gistxlogPageUpdate */ BlockNumber leftchild; /* like in gistxlogPageUpdate */
ItemPointerData key; uint16 npage; /* # of pages in the split */
/* /*
* follow: 1. gistxlogPage and array of IndexTupleData per page * follow: 1. gistxlogPage and array of IndexTupleData per page
...@@ -177,12 +179,6 @@ typedef struct gistxlogPage ...@@ -177,12 +179,6 @@ typedef struct gistxlogPage
int num; /* number of index tuples following */ int num; /* number of index tuples following */
} gistxlogPage; } gistxlogPage;
typedef struct gistxlogInsertComplete
{
RelFileNode node;
/* follows ItemPointerData key to clean */
} gistxlogInsertComplete;
typedef struct gistxlogPageDelete typedef struct gistxlogPageDelete
{ {
RelFileNode node; RelFileNode node;
...@@ -206,7 +202,6 @@ typedef struct SplitedPageLayout ...@@ -206,7 +202,6 @@ typedef struct SplitedPageLayout
* GISTInsertStack used for locking buffers and transfer arguments during * GISTInsertStack used for locking buffers and transfer arguments during
* insertion * insertion
*/ */
typedef struct GISTInsertStack typedef struct GISTInsertStack
{ {
/* current page */ /* current page */
...@@ -223,9 +218,8 @@ typedef struct GISTInsertStack ...@@ -223,9 +218,8 @@ typedef struct GISTInsertStack
/* child's offset */ /* child's offset */
OffsetNumber childoffnum; OffsetNumber childoffnum;
/* pointer to parent and child */ /* pointer to parent */
struct GISTInsertStack *parent; struct GISTInsertStack *parent;
struct GISTInsertStack *child;
/* for gistFindPath */ /* for gistFindPath */
struct GISTInsertStack *next; struct GISTInsertStack *next;
...@@ -238,12 +232,10 @@ typedef struct GistSplitVector ...@@ -238,12 +232,10 @@ typedef struct GistSplitVector
Datum spl_lattr[INDEX_MAX_KEYS]; /* Union of subkeys in Datum spl_lattr[INDEX_MAX_KEYS]; /* Union of subkeys in
* spl_left */ * spl_left */
bool spl_lisnull[INDEX_MAX_KEYS]; bool spl_lisnull[INDEX_MAX_KEYS];
bool spl_leftvalid;
Datum spl_rattr[INDEX_MAX_KEYS]; /* Union of subkeys in Datum spl_rattr[INDEX_MAX_KEYS]; /* Union of subkeys in
* spl_right */ * spl_right */
bool spl_risnull[INDEX_MAX_KEYS]; bool spl_risnull[INDEX_MAX_KEYS];
bool spl_rightvalid;
bool *spl_equiv; /* equivalent tuples which can be freely bool *spl_equiv; /* equivalent tuples which can be freely
* distributed between left and right pages */ * distributed between left and right pages */
...@@ -252,28 +244,40 @@ typedef struct GistSplitVector ...@@ -252,28 +244,40 @@ typedef struct GistSplitVector
typedef struct typedef struct
{ {
Relation r; Relation r;
IndexTuple *itup; /* in/out, points to compressed entry */
int ituplen; /* length of itup */
Size freespace; /* free space to be left */ Size freespace; /* free space to be left */
GISTInsertStack *stack;
bool needInsertComplete;
/* pointer to heap tuple */ GISTInsertStack *stack;
ItemPointerData key;
} GISTInsertState; } GISTInsertState;
/* root page of a gist index */ /* root page of a gist index */
#define GIST_ROOT_BLKNO 0 #define GIST_ROOT_BLKNO 0
/* /*
* mark tuples on inner pages during recovery * Before PostgreSQL 9.1, we used to rely on so-called "invalid tuples" on inner
* pages to finish crash recovery of incomplete page splits. If a crash
* happened in the middle of a page split, so that the downlink pointers were
* not yet inserted, crash recovery inserted a special downlink pointer. The
* semantics of an invalid tuple were that if you encounter one in a scan,
* it must always be followed, because we don't know if the tuples on the
* child page match or not.
*
* We no longer create such invalid tuples, we now mark the left-half of such
* an incomplete split with the F_FOLLOW_RIGHT flag instead, and finish the
* split properly the next time we need to insert on that page. To retain
* on-disk compatibility for the sake of pg_upgrade, we still store 0xffff as
* the offset number of all inner tuples. If we encounter any invalid tuples
* with 0xfffe during insertion, we throw an error, though scans still handle
* them. You should only encounter invalid tuples if you pg_upgrade a pre-9.1
* gist index which already has invalid tuples in it because of a crash. That
* should be rare, and you are recommended to REINDEX anyway if you have any
* invalid tuples in an index, so throwing an error is as far as we go with
* supporting that.
*/ */
#define TUPLE_IS_VALID 0xffff #define TUPLE_IS_VALID 0xffff
#define TUPLE_IS_INVALID 0xfffe #define TUPLE_IS_INVALID 0xfffe
#define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID ) #define GistTupleIsInvalid(itup) ( ItemPointerGetOffsetNumber( &((itup)->t_tid) ) == TUPLE_IS_INVALID )
#define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID ) #define GistTupleSetValid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_VALID )
#define GistTupleSetInvalid(itup) ItemPointerSetOffsetNumber( &((itup)->t_tid), TUPLE_IS_INVALID )
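As a concrete illustration of the policy in the comment above (scans still cope with 0xfffe tuples, insertions refuse them), an insertion that hits one is expected to fail roughly like this; idxtuple and r are placeholders, and the wording mirrors the LOG message added in the gistvacuum.c hunk, just at ERROR level:

/* Sketch only; not a quote of the insertion code. */
if (GistTupleIsInvalid(idxtuple))
	ereport(ERROR,
			(errmsg("index \"%s\" contains an inner tuple marked as invalid",
					RelationGetRelationName(r)),
			 errdetail("This is caused by an incomplete page split at crash recovery before upgrading to 9.1."),
			 errhint("Please REINDEX it.")));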
/* gist.c */ /* gist.c */
extern Datum gistbuild(PG_FUNCTION_ARGS); extern Datum gistbuild(PG_FUNCTION_ARGS);
...@@ -281,8 +285,6 @@ extern Datum gistinsert(PG_FUNCTION_ARGS); ...@@ -281,8 +285,6 @@ extern Datum gistinsert(PG_FUNCTION_ARGS);
extern MemoryContext createTempGistContext(void); extern MemoryContext createTempGistContext(void);
extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void initGISTstate(GISTSTATE *giststate, Relation index);
extern void freeGISTstate(GISTSTATE *giststate); extern void freeGISTstate(GISTSTATE *giststate);
extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup, extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
int len, GISTSTATE *giststate); int len, GISTSTATE *giststate);
...@@ -294,18 +296,17 @@ extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); ...@@ -294,18 +296,17 @@ extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec); extern void gist_desc(StringInfo buf, uint8 xl_info, char *rec);
extern void gist_xlog_startup(void); extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void); extern void gist_xlog_cleanup(void);
extern bool gist_safe_restartpoint(void);
extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer, extern XLogRecPtr gistXLogUpdate(RelFileNode node, Buffer buffer,
OffsetNumber *todelete, int ntodelete, OffsetNumber *todelete, int ntodelete,
IndexTuple *itup, int ituplen, ItemPointer key); IndexTuple *itup, int ntup,
Buffer leftchild);
extern XLogRecData *formSplitRdata(RelFileNode node, extern XLogRecPtr gistXLogSplit(RelFileNode node,
BlockNumber blkno, bool page_is_leaf, BlockNumber blkno, bool page_is_leaf,
ItemPointer key, SplitedPageLayout *dist); SplitedPageLayout *dist,
BlockNumber origrlink, GistNSN oldnsn,
extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len); Buffer leftchild);
/* gistget.c */ /* gistget.c */
extern Datum gistgettuple(PG_FUNCTION_ARGS); extern Datum gistgettuple(PG_FUNCTION_ARGS);
...@@ -357,7 +358,7 @@ extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, ...@@ -357,7 +358,7 @@ extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
extern float gistpenalty(GISTSTATE *giststate, int attno, extern float gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1, GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2); GISTENTRY *key2, bool isNull2);
extern bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, extern void gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull); Datum *attr, bool *isnull);
extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b); extern bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b);
extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
......