Commit 10dd8df6 authored by Teodor Sigaev's avatar Teodor Sigaev

Reduce size of critical section and remove call of user-defined functions in

insertion and deletion, modify gistSplit() to do not use buffers.

 TODO: gistvacuumcleanup and XLOG
parent 12049d34
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.132 2006/04/03 13:44:33 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.133 2006/05/10 09:19:54 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -52,6 +52,8 @@ static void gistfindleaf(GISTInsertState *state,
#define ROTATEDIST(d) do { \
SplitedPageLayout *tmp=(SplitedPageLayout*)palloc(sizeof(SplitedPageLayout)); \
memset(tmp,0,sizeof(SplitedPageLayout)); \
tmp->block.blkno = InvalidBlockNumber; \
tmp->buffer = InvalidBuffer; \
tmp->next = (d); \
(d)=tmp; \
} while(0)
......@@ -309,52 +311,111 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
bool is_splitted = false;
bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
/*
* XXX this code really ought to work by locking, but not modifying,
* all the buffers it needs; then starting a critical section; then
* modifying the buffers in an already-determined way and writing an
* XLOG record to reflect that. Since it doesn't, we've got to put
* a critical section around the entire process, which is horrible
* from a robustness point of view.
*/
START_CRIT_SECTION();
if (!is_leaf)
/*
* if (!is_leaf) remove old key:
* This node's key has been modified, either because a child split
* occurred or because we needed to adjust our key for an insert in a
* child node. Therefore, remove the old version of this node's key.
*
* Note: for WAL replay, in the non-split case we handle this by
* for WAL replay, in the non-split case we handle this by
* setting up a one-element todelete array; in the split case, it's
* handled implicitly because the tuple vector passed to gistSplit
* won't include this tuple.
*/
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
if (gistnospace(state->stack->page, state->itup, state->ituplen))
if (gistnospace(state->stack->page, state->itup, state->ituplen, (is_leaf) ? InvalidOffsetNumber : state->stack->childoffnum))
{
/* no space for insertion */
IndexTuple *itvec,
*newitup;
IndexTuple *itvec;
int tlen;
SplitedPageLayout *dist = NULL,
*ptr;
BlockNumber rrlink = InvalidBlockNumber;
GistNSN oldnsn;
is_splitted = true;
/*
* Form index tuples vector to split:
* remove old tuple if t's needed and add new tuples to vector
*/
itvec = gistextractbuffer(state->stack->buffer, &tlen);
if ( !is_leaf ) {
/* on inner page we should remove old tuple */
int pos = state->stack->childoffnum - FirstOffsetNumber;
tlen--;
if ( pos != tlen )
memmove( itvec+pos, itvec + pos + 1, sizeof( IndexTuple ) * (tlen-pos) );
}
itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
dist = gistSplit(state->r, state->stack->page, itvec, tlen, giststate);
state->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * tlen);
state->ituplen = 0;
if (state->stack->blkno != GIST_ROOT_BLKNO) {
/* if non-root split then we should not allocate new buffer,
but we must create temporary page to operate */
dist->buffer = state->stack->buffer;
dist->page = PageGetTempPage( BufferGetPage(dist->buffer), sizeof(GISTPageOpaqueData) );
/*clean all flags except F_LEAF */
GistPageGetOpaque(dist->page)->flags = (is_leaf) ? F_LEAF : 0;
}
/* make new pages and fills them */
for (ptr = dist; ptr; ptr = ptr->next) {
int i;
char *data;
/* get new page */
if ( ptr->buffer == InvalidBuffer ) {
ptr->buffer = gistNewBuffer( state->r );
GISTInitBuffer( ptr->buffer, (is_leaf) ? F_LEAF : 0 );
ptr->page = BufferGetPage(ptr->buffer);
}
ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
/* fill page, we can do it becouse all this pages are new (ie not linked in tree
or masked by temp page */
data = (char*)(ptr->list);
for(i=0;i<ptr->block.num;i++) {
if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(state->r));
data += IndexTupleSize((IndexTuple)data);
}
/* set up ItemPointer and remmeber it for parent */
ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
state->itup[ state->ituplen ] = ptr->itup;
state->ituplen++;
}
/* saves old rightlink */
if ( state->stack->blkno != GIST_ROOT_BLKNO )
rrlink = GistPageGetOpaque(dist->page)->rightlink;
START_CRIT_SECTION();
/*
* must mark buffers dirty before XLogInsert, even though we'll
* still be changing their opaque fields below
* still be changing their opaque fields below.
* set up right links.
*/
for (ptr = dist; ptr; ptr = ptr->next)
{
MarkBufferDirty(ptr->buffer);
GistPageGetOpaque(ptr->page)->rightlink = (ptr->next) ?
ptr->next->block.blkno : rrlink;
}
/* restore splitted non-root page */
if ( state->stack->blkno != GIST_ROOT_BLKNO ) {
PageRestoreTempPage( dist->page, BufferGetPage( dist->buffer ) );
dist->page = BufferGetPage( dist->buffer );
}
if (!state->r->rd_istemp)
......@@ -366,88 +427,44 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
is_leaf, &(state->key), dist);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
for (ptr = dist; ptr; ptr = ptr->next)
{
PageSetLSN(BufferGetPage(ptr->buffer), recptr);
PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
PageSetLSN(ptr->page, recptr);
PageSetTLI(ptr->page, ThisTimeLineID);
}
}
else
{
for (ptr = dist; ptr; ptr = ptr->next)
{
PageSetLSN(BufferGetPage(ptr->buffer), XLogRecPtrForTemp);
PageSetLSN(ptr->page, XLogRecPtrForTemp);
}
}
state->itup = newitup;
state->ituplen = tlen; /* now tlen >= 2 */
/* set up NSN */
oldnsn = GistPageGetOpaque(dist->page)->nsn;
if ( state->stack->blkno == GIST_ROOT_BLKNO )
/* if root split we should put initial value */
oldnsn = PageGetLSN(dist->page);
if (state->stack->blkno == GIST_ROOT_BLKNO)
{
gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
state->needInsertComplete = false;
for (ptr = dist; ptr; ptr = ptr->next)
{
Page page = (Page) BufferGetPage(ptr->buffer);
GistPageGetOpaque(page)->rightlink = (ptr->next) ?
ptr->next->block.blkno : InvalidBlockNumber;
GistPageGetOpaque(page)->nsn = PageGetLSN(page);
UnlockReleaseBuffer(ptr->buffer);
}
}
else
{
Page page;
BlockNumber rightrightlink = InvalidBlockNumber;
SplitedPageLayout *ourpage = NULL;
GistNSN oldnsn;
GISTPageOpaque opaque;
/* move origpage to first in chain */
if (dist->block.blkno != state->stack->blkno)
{
ptr = dist;
while (ptr->next)
{
if (ptr->next->block.blkno == state->stack->blkno)
{
ourpage = ptr->next;
ptr->next = ptr->next->next;
ourpage->next = dist;
dist = ourpage;
break;
}
ptr = ptr->next;
}
Assert(ourpage != NULL);
for (ptr = dist; ptr; ptr = ptr->next) {
/* only for last set oldnsn */
GistPageGetOpaque(ptr->page)->nsn = (ptr->next) ?
PageGetLSN(ptr->page) : oldnsn;
}
else
ourpage = dist;
/* now gets all needed data, and sets nsn's */
page = (Page) BufferGetPage(ourpage->buffer);
opaque = GistPageGetOpaque(page);
rightrightlink = opaque->rightlink;
oldnsn = opaque->nsn;
opaque->nsn = PageGetLSN(page);
opaque->rightlink = ourpage->next->block.blkno;
/*
* fill and release all new pages. They isn't linked into tree yet
* release buffers, if it was a root split then
* release all buffers because we create all buffers
*/
for (ptr = ourpage->next; ptr; ptr = ptr->next)
{
page = (Page) BufferGetPage(ptr->buffer);
GistPageGetOpaque(page)->rightlink = (ptr->next) ?
ptr->next->block.blkno : rightrightlink;
/* only for last set oldnsn */
GistPageGetOpaque(page)->nsn = (ptr->next) ?
opaque->nsn : oldnsn;
ptr = ( state->stack->blkno == GIST_ROOT_BLKNO ) ? dist : dist->next;
for(; ptr; ptr = ptr->next)
UnlockReleaseBuffer(ptr->buffer);
}
if (state->stack->blkno == GIST_ROOT_BLKNO) {
gistnewroot(state->r, state->stack->buffer, state->itup, state->ituplen, &(state->key));
state->needInsertComplete = false;
}
END_CRIT_SECTION();
......@@ -455,13 +472,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
else
{
/* enough space */
XLogRecPtr oldlsn;
START_CRIT_SECTION();
if (!is_leaf)
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, InvalidOffsetNumber);
MarkBufferDirty(state->stack->buffer);
oldlsn = PageGetLSN(state->stack->page);
if (!state->r->rd_istemp)
{
OffsetNumber noffs = 0,
......@@ -921,77 +939,55 @@ gistToRealOffset(OffsetNumber *arr, int len, OffsetNumber *reasloffset)
arr[i] = reasloffset[arr[i]];
}
static IndexTupleData *
gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
char *ptr, *ret = palloc(BLCKSZ);
int i;
ptr = ret;
for (i = 0; i < veclen; i++) {
memcpy(ptr, vec[i], IndexTupleSize(vec[i]));
ptr += IndexTupleSize(vec[i]);
}
*memlen = ptr - ret;
Assert( *memlen < BLCKSZ );
return (IndexTupleData*)ret;
}
/*
* gistSplit -- split a page in the tree.
*/
IndexTuple *
SplitedPageLayout *
gistSplit(Relation r,
Buffer buffer,
Page page,
IndexTuple *itup, /* contains compressed entry */
int *len,
SplitedPageLayout **dist,
int len,
GISTSTATE *giststate)
{
Page p;
Buffer leftbuf,
rightbuf;
Page left,
right;
IndexTuple *lvectup,
*rvectup,
*newtup;
BlockNumber lbknum,
rbknum;
GISTPageOpaque opaque;
*rvectup;
GIST_SPLITVEC v;
GistEntryVector *entryvec;
int i,
fakeoffset,
nlen;
fakeoffset;
OffsetNumber *realoffset;
IndexTuple *cleaneditup = itup;
int lencleaneditup = *len;
p = (Page) BufferGetPage(buffer);
opaque = GistPageGetOpaque(p);
/*
* The root of the tree is the first block in the relation. If we're
* about to split the root, we need to do some hocus-pocus to enforce this
* guarantee.
*/
if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
{
leftbuf = gistNewBuffer(r);
GISTInitBuffer(leftbuf, opaque->flags & F_LEAF);
lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf);
}
else
{
leftbuf = buffer;
/* IncrBufferRefCount(buffer); */
lbknum = BufferGetBlockNumber(buffer);
left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
}
rightbuf = gistNewBuffer(r);
GISTInitBuffer(rightbuf, opaque->flags & F_LEAF);
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
int lencleaneditup = len;
SplitedPageLayout *res = NULL;
/* generate the item array */
realoffset = palloc((*len + 1) * sizeof(OffsetNumber));
entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
entryvec->n = *len + 1;
realoffset = palloc((len + 1) * sizeof(OffsetNumber));
entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
entryvec->n = len + 1;
fakeoffset = FirstOffsetNumber;
for (i = 1; i <= *len; i++)
for (i = 1; i <= len; i++)
{
Datum datum;
bool IsNull;
if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup[i - 1]))
if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
{
entryvec->n--;
/* remember position of invalid tuple */
......@@ -1001,7 +997,7 @@ gistSplit(Relation r,
datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, 0, &(entryvec->vector[fakeoffset]),
datum, r, p, i,
datum, r, page, i,
ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
FALSE, IsNull);
realoffset[fakeoffset] = i;
......@@ -1013,14 +1009,14 @@ gistSplit(Relation r,
* possible, we move all invalid tuples on right page. We should remember,
* that union with invalid tuples is a invalid tuple.
*/
if (entryvec->n != *len + 1)
if (entryvec->n != len + 1)
{
lencleaneditup = entryvec->n - 1;
cleaneditup = (IndexTuple *) palloc(lencleaneditup * sizeof(IndexTuple));
for (i = 1; i < entryvec->n; i++)
cleaneditup[i - 1] = itup[realoffset[i] - 1];
if (gistnospace(left, cleaneditup, lencleaneditup))
if (!gistfitpage(cleaneditup, lencleaneditup))
{
/* no space on left to put all good tuples, so picksplit */
gistUserPicksplit(r, entryvec, &v, cleaneditup, lencleaneditup, giststate);
......@@ -1041,8 +1037,8 @@ gistSplit(Relation r,
v.spl_leftvalid = v.spl_rightvalid = false;
v.spl_nright = 0;
v.spl_nleft = 0;
for (i = 1; i <= *len; i++)
if (i - 1 < *len / 2)
for (i = 1; i <= len; i++)
if (i - 1 < len / 2)
v.spl_left[v.spl_nleft++] = i;
else
v.spl_right[v.spl_nright++] = i;
......@@ -1071,14 +1067,14 @@ gistSplit(Relation r,
else
{
/* there is no invalid tuples, so usial processing */
gistUserPicksplit(r, entryvec, &v, itup, *len, giststate);
gistUserPicksplit(r, entryvec, &v, itup, len, giststate);
v.spl_leftvalid = v.spl_rightvalid = true;
}
/* form left and right vector */
lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (*len + 1));
lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
for (i = 0; i < v.spl_nleft; i++)
lvectup[i] = itup[v.spl_left[i] - 1];
......@@ -1087,87 +1083,48 @@ gistSplit(Relation r,
rvectup[i] = itup[v.spl_right[i] - 1];
/* place invalid tuples on right page if itsn't done yet */
for (fakeoffset = entryvec->n; fakeoffset < *len + 1 && lencleaneditup; fakeoffset++)
for (fakeoffset = entryvec->n; fakeoffset < len + 1 && lencleaneditup; fakeoffset++)
{
rvectup[v.spl_nright++] = itup[realoffset[fakeoffset] - 1];
}
/* write on disk (may need another split) */
if (gistnospace(right, rvectup, v.spl_nright))
/* finalyze splitting (may need another split) */
if (!gistfitpage(rvectup, v.spl_nright))
{
nlen = v.spl_nright;
newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
/* ReleaseBuffer(rightbuf); */
res = gistSplit(r, page, rvectup, v.spl_nright, giststate);
}
else
{
char *ptr;
gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
/* XLOG stuff */
ROTATEDIST(*dist);
(*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
(*dist)->block.num = v.spl_nright;
(*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
ptr = (char *) ((*dist)->list);
for (i = 0; i < v.spl_nright; i++)
{
memcpy(ptr, rvectup[i], IndexTupleSize(rvectup[i]));
ptr += IndexTupleSize(rvectup[i]);
}
(*dist)->lenlist = ptr - ((char *) ((*dist)->list));
(*dist)->buffer = rightbuf;
nlen = 1;
newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
newtup[0] = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
: gist_form_invalid_tuple(rbknum);
ItemPointerSetBlockNumber(&(newtup[0]->t_tid), rbknum);
ROTATEDIST(res);
res->block.num = v.spl_nright;
res->list = gistfillitupvec(rvectup, v.spl_nright, &( res->lenlist ) );
res->itup = (v.spl_rightvalid) ? gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull)
: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
}
if (gistnospace(left, lvectup, v.spl_nleft))
if (!gistfitpage(lvectup, v.spl_nleft))
{
int llen = v.spl_nleft;
IndexTuple *lntup;
SplitedPageLayout *resptr, *subres;
resptr = subres = gistSplit(r, page, lvectup, v.spl_nleft, giststate);
lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
/* ReleaseBuffer(leftbuf); */
/* install on list's tail */
while( resptr->next )
resptr = resptr->next;
newtup = gistjoinvector(newtup, &nlen, lntup, llen);
resptr->next = res;
res = subres;
}
else
{
char *ptr;
gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
/* XLOG stuff */
ROTATEDIST(*dist);
(*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
(*dist)->block.num = v.spl_nleft;
(*dist)->list = (IndexTupleData *) palloc(BLCKSZ);
ptr = (char *) ((*dist)->list);
for (i = 0; i < v.spl_nleft; i++)
{
memcpy(ptr, lvectup[i], IndexTupleSize(lvectup[i]));
ptr += IndexTupleSize(lvectup[i]);
ROTATEDIST(res);
res->block.num = v.spl_nleft;
res->list = gistfillitupvec(lvectup, v.spl_nleft, &( res->lenlist ) );
res->itup = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
: gist_form_invalid_tuple(GIST_ROOT_BLKNO);
}
(*dist)->lenlist = ptr - ((char *) ((*dist)->list));
(*dist)->buffer = leftbuf;
if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
PageRestoreTempPage(left, p);
nlen += 1;
newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
newtup[nlen - 1] = (v.spl_leftvalid) ? gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull)
: gist_form_invalid_tuple(lbknum);
ItemPointerSetBlockNumber(&(newtup[nlen - 1]->t_tid), lbknum);
}
GistClearTuplesDeleted(p);
*len = nlen;
return newtup;
return res;
}
/*
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.10 2006/03/05 15:58:20 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.11 2006/05/10 09:19:54 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
......@@ -81,15 +81,31 @@ gistfillbuffer(Relation r, Page page, IndexTuple *itup,
* Check space for itup vector on page
*/
bool
gistnospace(Page page, IndexTuple *itvec, int len)
gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete)
{
unsigned int size = 0;
unsigned int size = 0, deleted = 0;
int i;
for (i = 0; i < len; i++)
size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
return (PageGetFreeSpace(page) < size);
if ( todelete != InvalidOffsetNumber ) {
IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete));
deleted = IndexTupleSize(itup) + sizeof(ItemIdData);
}
return (PageGetFreeSpace(page) + deleted < size);
}
bool
gistfitpage(IndexTuple *itvec, int len) {
int i;
Size size=0;
for(i=0;i<len;i++)
size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
return (size <= GiSTPageSize);
}
/*
......@@ -107,7 +123,7 @@ gistextractbuffer(Buffer buffer, int *len /* out */ )
*len = maxoff;
itvec = palloc(sizeof(IndexTuple) * maxoff);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
return itvec;
}
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.19 2006/05/02 22:25:10 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.20 2006/05/10 09:19:54 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -85,10 +85,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
if (GistPageIsLeaf(page))
{
if (GistTuplesDeleted(page))
{
needunion = needwrite = true;
GistClearTuplesDeleted(page);
}
}
else
{
......@@ -157,30 +154,54 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
if (curlenaddon)
{
/* insert updated tuples */
if (gistnospace(page, addon, curlenaddon))
if (gistnospace(page, addon, curlenaddon, InvalidOffsetNumber))
{
/* there is no space on page to insert tuples */
IndexTuple *vec;
SplitedPageLayout *dist = NULL,
*ptr;
int i;
int i, veclen=0;
MemoryContext oldCtx = MemoryContextSwitchTo(gv->opCtx);
vec = gistextractbuffer(buffer, &(res.ituplen));
vec = gistjoinvector(vec, &(res.ituplen), addon, curlenaddon);
res.itup = gistSplit(gv->index, buffer, vec, &(res.ituplen), &dist, &(gv->giststate));
vec = gistextractbuffer(buffer, &veclen);
vec = gistjoinvector(vec, &veclen, addon, curlenaddon);
dist = gistSplit(gv->index, page, vec, veclen, &(gv->giststate));
MemoryContextSwitchTo(oldCtx);
vec = (IndexTuple *) palloc(sizeof(IndexTuple) * res.ituplen);
for (i = 0; i < res.ituplen; i++)
{
vec[i] = (IndexTuple) palloc(IndexTupleSize(res.itup[i]));
memcpy(vec[i], res.itup[i], IndexTupleSize(res.itup[i]));
if (blkno != GIST_ROOT_BLKNO) {
/* if non-root split then we should not allocate new buffer */
dist->buffer = buffer;
dist->page = BufferGetPage(dist->buffer);
GistPageGetOpaque(dist->page)->flags = 0;
}
res.itup = vec;
for (ptr = dist; ptr; ptr = ptr->next)
{
res.itup = (IndexTuple *) palloc(sizeof(IndexTuple) * veclen);
res.ituplen = 0;
/* make new pages and fills them */
for (ptr = dist; ptr; ptr = ptr->next) {
char *data;
if ( ptr->buffer == InvalidBuffer ) {
ptr->buffer = gistNewBuffer( gv->index );
GISTInitBuffer( ptr->buffer, 0 );
ptr->page = BufferGetPage(ptr->buffer);
}
ptr->block.blkno = BufferGetBlockNumber( ptr->buffer );
data = (char*)(ptr->list);
for(i=0;i<ptr->block.num;i++) {
if ( PageAddItem(ptr->page, (Item)data, IndexTupleSize((IndexTuple)data), i+FirstOffsetNumber, LP_USED) == InvalidOffsetNumber )
elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(gv->index));
data += IndexTupleSize((IndexTuple)data);
}
ItemPointerSetBlockNumber(&(ptr->itup->t_tid), ptr->block.blkno);
res.itup[ res.ituplen ] = (IndexTuple)palloc(IndexTupleSize(ptr->itup));
memcpy( res.itup[ res.ituplen ], ptr->itup, IndexTupleSize(ptr->itup) );
res.ituplen++;
MarkBufferDirty(ptr->buffer);
}
......@@ -218,10 +239,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
for (ptr = dist; ptr; ptr = ptr->next)
{
/* we must keep the buffer lock on the head page */
/* we must keep the buffer pin on the head page */
if (BufferGetBlockNumber(ptr->buffer) != blkno)
LockBuffer(ptr->buffer, GIST_UNLOCK);
ReleaseBuffer(ptr->buffer);
UnlockReleaseBuffer( ptr->buffer );
}
if (blkno == GIST_ROOT_BLKNO)
......@@ -294,6 +314,7 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
if (needwrite)
{
MarkBufferDirty(buffer);
GistClearTuplesDeleted(page);
if (!gv->index->rd_istemp)
{
......@@ -570,14 +591,7 @@ gistbulkdelete(PG_FUNCTION_ARGS)
/*
* Remove deletable tuples from page
*
* XXX try to make this critical section shorter. Could do it
* by separating the callback loop from the actual tuple deletion,
* but that would affect the definition of the todelete[] array
* passed into the WAL record (because the indexes would all be
* pre-deletion).
*/
START_CRIT_SECTION();
maxoff = PageGetMaxOffsetNumber(page);
......@@ -588,13 +602,9 @@ gistbulkdelete(PG_FUNCTION_ARGS)
if (callback(&(idxtuple->t_tid), callback_state))
{
PageIndexTupleDelete(page, i);
todelete[ntodelete] = i;
i--;
maxoff--;
todelete[ntodelete] = i-ntodelete;
ntodelete++;
stats->std.tuples_removed += 1;
Assert(maxoff == PageGetMaxOffsetNumber(page));
}
else
stats->std.num_index_tuples += 1;
......@@ -602,10 +612,14 @@ gistbulkdelete(PG_FUNCTION_ARGS)
if (ntodelete)
{
GistMarkTuplesDeleted(page);
START_CRIT_SECTION();
MarkBufferDirty(buffer);
for(i=0;i<ntodelete;i++)
PageIndexTupleDelete(page, todelete[i]);
GistMarkTuplesDeleted(page);
if (!rel->rd_istemp)
{
XLogRecData *rdata;
......@@ -627,10 +641,11 @@ gistbulkdelete(PG_FUNCTION_ARGS)
}
else
PageSetLSN(page, XLogRecPtrForTemp);
}
END_CRIT_SECTION();
}
}
else
{
/* check for split proceeded after look at parent */
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.15 2006/04/03 16:45:50 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.16 2006/05/10 09:19:54 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
......@@ -625,7 +625,7 @@ gistContinueInsert(gistIncompleteInsert *insert)
}
}
if (gistnospace(pages[numbuffer - 1], itup, lenitup))
if (gistnospace(pages[numbuffer - 1], itup, lenitup, InvalidOffsetNumber))
{
/* no space left on page, so we must split */
buffers[numbuffer] = ReadBuffer(index, P_NEW);
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.13 2006/05/10 09:19:54 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -138,6 +138,8 @@ typedef struct SplitedPageLayout
gistxlogPage block;
IndexTupleData *list;
int lenlist;
IndexTuple itup; /* union key for page */
Page page; /* to operate */
Buffer buffer; /* to write after all proceed */
struct SplitedPageLayout *next;
......@@ -234,8 +236,8 @@ extern void freeGISTstate(GISTSTATE *giststate);
extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer key);
extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
int len, GISTSTATE *giststate);
extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child);
......@@ -261,11 +263,16 @@ extern Datum gistgettuple(PG_FUNCTION_ARGS);
extern Datum gistgetmulti(PG_FUNCTION_ARGS);
/* gistutil.c */
#define GiSTPageSize \
( BLCKSZ - SizeOfPageHeaderData - MAXALIGN(sizeof(GISTPageOpaqueData)) )
extern bool gistfitpage(IndexTuple *itvec, int len);
extern bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete);
extern void gistcheckpage(Relation rel, Buffer buf);
extern Buffer gistNewBuffer(Relation r);
extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
int len, OffsetNumber off);
extern bool gistnospace(Page page, IndexTuple *itvec, int len);
extern IndexTuple *gistextractbuffer(Buffer buffer, int *len /* out */ );
extern IndexTuple *gistjoinvector(
IndexTuple *itvec, int *len,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment