Commit 09368d23 authored by Teodor Sigaev's avatar Teodor Sigaev

Fix 'all at one page bug' in picksplit method of R-tree emulation. Add defense

from buggy user-defined picksplit to GiST.
parent 1eef90d0
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.15 2009/01/01 17:23:35 momjian Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistproc.c,v 1.16 2009/04/06 14:27:27 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -272,6 +272,69 @@ chooseLR(GIST_SPLITVEC *v, ...@@ -272,6 +272,69 @@ chooseLR(GIST_SPLITVEC *v,
v->spl_ldatum_exists = v->spl_rdatum_exists = false; v->spl_ldatum_exists = v->spl_rdatum_exists = false;
} }
/*
* Trivial split: half of entries will be placed on one page
* and another half - to another
*/
static void
fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
{
OffsetNumber i,
maxoff;
BOX *unionL = NULL,
*unionR = NULL;
int nbytes;
maxoff = entryvec->n - 1;
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);
v->spl_nleft = v->spl_nright = 0;
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
BOX * cur = DatumGetBoxP(entryvec->vector[i].key);
if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
{
v->spl_left[v->spl_nleft] = i;
if (unionL == NULL)
{
unionL = (BOX *) palloc(sizeof(BOX));
*unionL = *cur;
}
else
adjustBox(unionL, cur);
v->spl_nleft++;
}
else
{
v->spl_right[v->spl_nright] = i;
if (unionR == NULL)
{
unionR = (BOX *) palloc(sizeof(BOX));
*unionR = *cur;
}
else
adjustBox(unionR, cur);
v->spl_nright++;
}
}
if (v->spl_ldatum_exists)
adjustBox(unionL, DatumGetBoxP(v->spl_ldatum));
v->spl_ldatum = BoxPGetDatum(unionL);
if (v->spl_rdatum_exists)
adjustBox(unionR, DatumGetBoxP(v->spl_rdatum));
v->spl_rdatum = BoxPGetDatum(unionR);
v->spl_ldatum_exists = v->spl_rdatum_exists = false;
}
/* /*
* The GiST PickSplit method * The GiST PickSplit method
* *
...@@ -324,52 +387,22 @@ gist_box_picksplit(PG_FUNCTION_ARGS) ...@@ -324,52 +387,22 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
adjustBox(&pageunion, cur); adjustBox(&pageunion, cur);
} }
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
listL = (OffsetNumber *) palloc(nbytes);
listR = (OffsetNumber *) palloc(nbytes);
unionL = (BOX *) palloc(sizeof(BOX));
unionR = (BOX *) palloc(sizeof(BOX));
if (allisequal) if (allisequal)
{ {
cur = DatumGetBoxP(entryvec->vector[OffsetNumberNext(FirstOffsetNumber)].key); /*
if (memcmp((void *) cur, (void *) &pageunion, sizeof(BOX)) == 0) * All entries are the same
{ */
v->spl_left = listL; fallbackSplit(entryvec, v);
v->spl_right = listR; PG_RETURN_POINTER(v);
v->spl_nleft = v->spl_nright = 0;
memcpy((void *) unionL, (void *) &pageunion, sizeof(BOX));
memcpy((void *) unionR, (void *) &pageunion, sizeof(BOX));
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
{
v->spl_left[v->spl_nleft] = i;
v->spl_nleft++;
}
else
{
v->spl_right[v->spl_nright] = i;
v->spl_nright++;
}
}
if (v->spl_ldatum_exists)
adjustBox(unionL, DatumGetBoxP(v->spl_ldatum));
v->spl_ldatum = BoxPGetDatum(unionL);
if (v->spl_rdatum_exists)
adjustBox(unionR, DatumGetBoxP(v->spl_rdatum));
v->spl_rdatum = BoxPGetDatum(unionR);
v->spl_ldatum_exists = v->spl_rdatum_exists = false;
PG_RETURN_POINTER(v);
}
} }
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
listL = (OffsetNumber *) palloc(nbytes);
listR = (OffsetNumber *) palloc(nbytes);
listB = (OffsetNumber *) palloc(nbytes); listB = (OffsetNumber *) palloc(nbytes);
listT = (OffsetNumber *) palloc(nbytes); listT = (OffsetNumber *) palloc(nbytes);
unionL = (BOX *) palloc(sizeof(BOX));
unionR = (BOX *) palloc(sizeof(BOX));
unionB = (BOX *) palloc(sizeof(BOX)); unionB = (BOX *) palloc(sizeof(BOX));
unionT = (BOX *) palloc(sizeof(BOX)); unionT = (BOX *) palloc(sizeof(BOX));
...@@ -452,6 +485,12 @@ gist_box_picksplit(PG_FUNCTION_ARGS) ...@@ -452,6 +485,12 @@ gist_box_picksplit(PG_FUNCTION_ARGS)
else else
ADDLIST(listT, unionT, posT, i); ADDLIST(listT, unionT, posT, i);
} }
if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
{
fallbackSplit(entryvec, v);
PG_RETURN_POINTER(v);
}
} }
/* which split more optimal? */ /* which split more optimal? */
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.7 2009/01/01 17:23:35 momjian Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistsplit.c,v 1.8 2009/04/06 14:27:27 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -280,6 +280,63 @@ supportSecondarySplit(Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC ...@@ -280,6 +280,63 @@ supportSecondarySplit(Relation r, GISTSTATE *giststate, int attno, GIST_SPLITVEC
sv->spl_ldatum_exists = sv->spl_rdatum_exists = false; sv->spl_ldatum_exists = sv->spl_rdatum_exists = false;
} }
/*
* Trivial picksplit implementaion. Function called only
* if user-defined picksplit puts all keys to the one page.
* That is a bug of user-defined picksplit but we'd like
* to "fix" that.
*/
static void
genericPickSplit(GISTSTATE *giststate, GistEntryVector *entryvec, GIST_SPLITVEC *v, int attno)
{
OffsetNumber i,
maxoff;
int nbytes;
GistEntryVector *evec;
maxoff = entryvec->n - 1;
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
v->spl_left = (OffsetNumber *) palloc(nbytes);
v->spl_right = (OffsetNumber *) palloc(nbytes);
v->spl_nleft = v->spl_nright = 0;
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
if (i <= (maxoff - FirstOffsetNumber + 1) / 2)
{
v->spl_left[v->spl_nleft] = i;
v->spl_nleft++;
}
else
{
v->spl_right[v->spl_nright] = i;
v->spl_nright++;
}
}
/*
* Form unions of each page
*/
evec = palloc( sizeof(GISTENTRY) * entryvec->n + GEVHDRSZ );
evec->n = v->spl_nleft;
memcpy(evec->vector, entryvec->vector + FirstOffsetNumber,
sizeof(GISTENTRY) * evec->n);
v->spl_ldatum = FunctionCall2(&giststate->unionFn[attno],
PointerGetDatum(evec),
PointerGetDatum(&nbytes));
evec->n = v->spl_nright;
memcpy(evec->vector, entryvec->vector + FirstOffsetNumber + v->spl_nleft,
sizeof(GISTENTRY) * evec->n);
v->spl_rdatum = FunctionCall2(&giststate->unionFn[attno],
PointerGetDatum(evec),
PointerGetDatum(&nbytes));
}
/* /*
* Calls user picksplit method for attno columns to split vector to * Calls user picksplit method for attno columns to split vector to
* two vectors. May use attno+n columns data to * two vectors. May use attno+n columns data to
...@@ -296,7 +353,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVec ...@@ -296,7 +353,7 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVec
/* /*
* now let the user-defined picksplit function set up the split vector; in * now let the user-defined picksplit function set up the split vector; in
* entryvec have no null value!! * entryvec there is no null value!!
*/ */
sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true; sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
...@@ -308,18 +365,43 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVec ...@@ -308,18 +365,43 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GistSplitVec
PointerGetDatum(entryvec), PointerGetDatum(entryvec),
PointerGetDatum(sv)); PointerGetDatum(sv));
/* compatibility with old code */ if ( sv->spl_nleft == 0 || sv->spl_nright == 0 )
if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber) {
sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1); ereport(DEBUG1,
if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber) (errcode(ERRCODE_INTERNAL_ERROR),
sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1); errmsg("Picksplit method for %d column of index \"%s\" failed",
attno+1, RelationGetRelationName(r)),
errhint("Index is not optimal, to optimize it contact developer or try to use the column as a second one in create index command")));
if (sv->spl_ldatum_exists || sv->spl_rdatum_exists) /*
* Reinit GIST_SPLITVEC. Although that fields are not used
* by genericPickSplit(), let us set up it for further processing
*/
sv->spl_ldatum_exists = (v->spl_lisnull[attno]) ? false : true;
sv->spl_rdatum_exists = (v->spl_risnull[attno]) ? false : true;
sv->spl_ldatum = v->spl_lattr[attno];
sv->spl_rdatum = v->spl_rattr[attno];
genericPickSplit(giststate, entryvec, sv, attno);
if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
}
else
{ {
elog(LOG, "PickSplit method of %d columns of index '%s' doesn't support secondary split", /* compatibility with old code */
attno + 1, RelationGetRelationName(r)); if (sv->spl_left[sv->spl_nleft - 1] == InvalidOffsetNumber)
sv->spl_left[sv->spl_nleft - 1] = (OffsetNumber) (entryvec->n - 1);
if (sv->spl_right[sv->spl_nright - 1] == InvalidOffsetNumber)
sv->spl_right[sv->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]); if (sv->spl_ldatum_exists || sv->spl_rdatum_exists)
{
elog(LOG, "PickSplit method of %d columns of index '%s' doesn't support secondary split",
attno + 1, RelationGetRelationName(r));
supportSecondarySplit(r, giststate, attno, sv, v->spl_lattr[attno], v->spl_rattr[attno]);
}
} }
v->spl_lattr[attno] = sv->spl_ldatum; v->spl_lattr[attno] = sv->spl_ldatum;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment