Commit d2158b02 authored by Teodor Sigaev's avatar Teodor Sigaev

* Add support NULL to GiST.

* some refactoring and simplify code int gistutil.c and gist.c
* now in some cases it can be called used-defined
  picksplit method for non-first column in index, but here
	is a place to do more.
* small fix of docs related to support NULL.
parent 86722057
<!-- $PostgreSQL: pgsql/doc/src/sgml/indexam.sgml,v 2.11 2006/05/10 23:18:38 tgl Exp $ -->
<!-- $PostgreSQL: pgsql/doc/src/sgml/indexam.sgml,v 2.12 2006/05/24 11:01:39 teodor Exp $ -->
<chapter id="indexam">
<title>Index Access Method Interface Definition</title>
......@@ -126,8 +126,7 @@
used to scan for rows with <literal>a = 4</literal>, which is wrong if the
index omits rows where <literal>b</> is null.
It is, however, OK to omit rows where the first indexed column is null.
(GiST currently does so.) Thus,
<structfield>amindexnulls</structfield> should be set true only if the
Thus, <structfield>amindexnulls</structfield> should be set true only if the
index access method indexes all rows, including arbitrary combinations of
null values.
</para>
......
<!-- $PostgreSQL: pgsql/doc/src/sgml/indices.sgml,v 1.56 2006/01/18 21:29:45 momjian Exp $ -->
<!-- $PostgreSQL: pgsql/doc/src/sgml/indices.sgml,v 1.57 2006/05/24 11:01:39 teodor Exp $ -->
<chapter id="indexes">
<title id="indexes-title">Indexes</title>
......@@ -290,13 +290,13 @@ CREATE INDEX test2_mm_idx ON test2 (major, minor);
</para>
<para>
A multicolumn GiST index can only be used when there is a query condition
on its leading column. Conditions on additional columns restrict the
entries returned by the index, but the condition on the first column is the
most important one for determining how much of the index needs to be
scanned. A GiST index will be relatively ineffective if its first column
has only a few distinct values, even if there are many distinct values in
additional columns.
A multicolumn GiST index can be used with query conditions that
involve any subset of the index's columns. Conditions on additional
columns restrict the entries returned by the index, but the condition on
the first column is the most important one for determining how much of
the index needs to be scanned. A GiST index will be relatively
ineffective if its first column has only a few distinct values, even if
there are many distinct values in additional columns.
</para>
<para>
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.136 2006/05/19 16:15:17 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.137 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -181,32 +181,13 @@ gistbuildCallback(Relation index,
{
GISTBuildState *buildstate = (GISTBuildState *) state;
IndexTuple itup;
GISTENTRY tmpcentry;
int i;
MemoryContext oldCtx;
/* GiST cannot index tuples with leading NULLs */
if (isnull[0])
return;
oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);
/* immediately compress keys to normalize */
for (i = 0; i < buildstate->numindexattrs; i++)
{
if (isnull[i])
values[i] = (Datum) 0;
else
{
gistcentryinit(&buildstate->giststate, i, &tmpcentry, values[i],
NULL, NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */ , TRUE, FALSE);
values[i] = tmpcentry.key;
}
}
/* form an index tuple and point it at the heap tuple */
itup = index_form_tuple(buildstate->giststate.tupdesc, values, isnull);
itup = gistFormTuple(&buildstate->giststate, index,
values, NULL /* size is currently bogus */, isnull);
itup->t_tid = htup->t_self;
/*
......@@ -243,34 +224,16 @@ gistinsert(PG_FUNCTION_ARGS)
#endif
IndexTuple itup;
GISTSTATE giststate;
GISTENTRY tmpentry;
int i;
MemoryContext oldCtx;
MemoryContext insertCtx;
/* GiST cannot index tuples with leading NULLs */
if (isnull[0])
PG_RETURN_BOOL(false);
insertCtx = createTempGistContext();
oldCtx = MemoryContextSwitchTo(insertCtx);
initGISTstate(&giststate, r);
/* immediately compress keys to normalize */
for (i = 0; i < r->rd_att->natts; i++)
{
if (isnull[i])
values[i] = (Datum) 0;
else
{
gistcentryinit(&giststate, i, &tmpentry, values[i],
NULL, NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */ , TRUE, FALSE);
values[i] = tmpentry.key;
}
}
itup = index_form_tuple(giststate.tupdesc, values, isnull);
itup = gistFormTuple(&giststate, r,
values, NULL /* size is currently bogus */, isnull);
itup->t_tid = *ht_ctid;
gistdoinsert(r, itup, &giststate);
......@@ -937,51 +900,23 @@ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate)
}
/*
* gistSplit -- split a page in the tree.
* simple split page
*/
SplitedPageLayout *
gistSplit(Relation r,
Page page,
IndexTuple *itup, /* contains compressed entry */
int len,
GISTSTATE *giststate)
{
IndexTuple *lvectup,
*rvectup;
GIST_SPLITVEC v;
GistEntryVector *entryvec;
static void
gistSplitHalf(GIST_SPLITVEC *v, int len) {
int i;
OffsetNumber offInvTuples[ MaxOffsetNumber ];
int nOffInvTuples = 0;
SplitedPageLayout *res = NULL;
/* generate the item array */
entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
entryvec->n = len + 1;
for (i = 1; i <= len; i++)
{
Datum datum;
bool IsNull;
if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1]))
/* remember position of invalid tuple */
offInvTuples[ nOffInvTuples++ ] = i;
if ( nOffInvTuples > 0 )
/* we can safely do not decompress other keys, because
we will do splecial processing, but
it's needed to find another invalid tuples */
continue;
datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, 0, &(entryvec->vector[i]),
datum, r, page, i,
ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
FALSE, IsNull);
}
v->spl_nright = v->spl_nleft = 0;
v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->spl_right= (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
for(i = 1; i <= len; i++)
if ( i<len/2 )
v->spl_right[ v->spl_nright++ ] = i;
else
v->spl_left[ v->spl_nleft++ ] = i;
}
/*
/*
* if it was invalid tuple then we need special processing.
* We move all invalid tuples on right page.
*
......@@ -991,37 +926,142 @@ gistSplit(Relation r,
* Normally, we never exec this code, but after crash replay it's possible
* to get 'invalid' tuples (probability is low enough)
*/
if (nOffInvTuples > 0)
{
static void
gistSplitByInvalid(GISTSTATE *giststate, GIST_SPLITVEC *v, IndexTuple *itup, int len) {
int i;
static OffsetNumber offInvTuples[ MaxOffsetNumber ];
int nOffInvTuples = 0;
for (i = 1; i <= len; i++)
if ( GistTupleIsInvalid(itup[i - 1]) )
offInvTuples[ nOffInvTuples++ ] = i;
if ( nOffInvTuples == len ) {
/* corner case, all tuples are invalid */
v->spl_rightvalid= v->spl_leftvalid = false;
gistSplitHalf( v, len );
} else {
GistSplitVec gsvp;
v.spl_right = offInvTuples;
v.spl_nright = nOffInvTuples;
v.spl_rightvalid = false;
v->spl_right = offInvTuples;
v->spl_nright = nOffInvTuples;
v->spl_rightvalid = false;
v.spl_left = (OffsetNumber *) palloc(entryvec->n * sizeof(OffsetNumber));
v.spl_nleft = 0;
v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->spl_nleft = 0;
for(i = 1; i <= len; i++)
if ( !GistTupleIsInvalid(itup[i - 1]) )
v.spl_left[ v.spl_nleft++ ] = i;
v.spl_leftvalid = true;
v->spl_left[ v->spl_nleft++ ] = i;
v->spl_leftvalid = true;
gsvp.idgrp = NULL;
gsvp.attrsize = v.spl_lattrsize;
gsvp.attr = v.spl_lattr;
gsvp.len = v.spl_nleft;
gsvp.entries = v.spl_left;
gsvp.isnull = v.spl_lisnull;
gsvp.attrsize = v->spl_lattrsize;
gsvp.attr = v->spl_lattr;
gsvp.len = v->spl_nleft;
gsvp.entries = v->spl_left;
gsvp.isnull = v->spl_lisnull;
gistunionsubkeyvec(giststate, itup, &gsvp, 0);
}
}
/*
* trys to split page by attno key, in a case of null
* values move its to separate page.
*/
static void
gistSplitByKey(Relation r, Page page, IndexTuple *itup, int len, GISTSTATE *giststate,
GIST_SPLITVEC *v, GistEntryVector *entryvec, int attno) {
int i;
static OffsetNumber offNullTuples[ MaxOffsetNumber ];
int nOffNullTuples = 0;
for (i = 1; i <= len; i++) {
Datum datum;
bool IsNull;
gistunionsubkeyvec(giststate, itup, &gsvp, true);
if (!GistPageIsLeaf(page) && GistTupleIsInvalid(itup[i - 1])) {
gistSplitByInvalid(giststate, v, itup, len);
return;
}
datum = index_getattr(itup[i - 1], attno+1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, attno, &(entryvec->vector[i]),
datum, r, page, i,
ATTSIZE(datum, giststate->tupdesc, attno+1, IsNull),
FALSE, IsNull);
if ( IsNull )
offNullTuples[ nOffNullTuples++ ] = i;
}
v->spl_leftvalid = v->spl_rightvalid = true;
if ( nOffNullTuples == len ) {
/*
* Corner case: All keys in attno column are null, we should try to
* by keys in next column. It all keys in all columns
* are NULL just split page half by half
*/
v->spl_risnull[attno] = v->spl_lisnull[attno] = TRUE;
if ( attno+1 == r->rd_att->natts )
gistSplitHalf( v, len );
else
{
/* there is no invalid tuples, so usial processing */
gistUserPicksplit(r, entryvec, &v, itup, len, giststate);
v.spl_leftvalid = v.spl_rightvalid = true;
gistSplitByKey(r, page, itup, len, giststate, v, entryvec, attno+1);
} else if ( nOffNullTuples > 0 ) {
int j=0;
/*
* We don't want to mix NULLs and not-NULLs keys
* on one page, so move nulls to right page
*/
v->spl_right = offNullTuples;
v->spl_nright = nOffNullTuples;
v->spl_risnull[attno] = TRUE;
v->spl_left = (OffsetNumber *) palloc(len * sizeof(OffsetNumber));
v->spl_nleft = 0;
for(i = 1; i <= len; i++)
if ( j<v->spl_nright && offNullTuples[j] == i )
j++;
else
v->spl_left[ v->spl_nleft++ ] = i;
v->spl_idgrp = NULL;
gistunionsubkey(giststate, itup, v, 0);
} else {
/*
* all keys are not-null
*/
gistUserPicksplit(r, entryvec, attno, v, itup, len, giststate);
}
}
/*
* gistSplit -- split a page in the tree and fill struct
* used for XLOG and real writes buffers. Function is recursive, ie
* it will split page until keys will fit in every page.
*/
SplitedPageLayout *
gistSplit(Relation r,
Page page,
IndexTuple *itup, /* contains compressed entry */
int len,
GISTSTATE *giststate)
{
IndexTuple *lvectup,
*rvectup;
GIST_SPLITVEC v;
GistEntryVector *entryvec;
int i;
SplitedPageLayout *res = NULL;
/* generate the item array */
entryvec = palloc(GEVHDRSZ + (len + 1) * sizeof(GISTENTRY));
entryvec->n = len + 1;
gistSplitByKey(r, page, itup, len, giststate,
&v, entryvec, 0);
/* form left and right vector */
lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * (len + 1));
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.56 2006/03/05 15:58:20 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.57 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -361,7 +361,7 @@ gistindex_keytest(IndexTuple tuple,
IncrIndexProcessed();
/*
* Tuple doesn't restore after crash recovery because of inclomplete
* Tuple doesn't restore after crash recovery because of incomplete
* insert
*/
if (!GistPageIsLeaf(p) && GistTupleIsInvalid(tuple))
......@@ -378,14 +378,15 @@ gistindex_keytest(IndexTuple tuple,
key->sk_attno,
giststate->tupdesc,
&isNull);
/* is the index entry NULL? */
if (isNull)
{
/* XXX eventually should check if SK_ISNULL */
if ( key->sk_flags & SK_ISNULL ) {
/* is the compared-to datum NULL? on non-leaf page it's possible
to have nulls in childs :( */
if ( isNull || !GistPageIsLeaf(p) )
return true;
return false;
}
/* is the compared-to datum NULL? */
if (key->sk_flags & SK_ISNULL)
} else if ( isNull )
return false;
gistdentryinit(giststate, key->sk_attno - 1, &de,
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.13 2006/05/19 16:15:17 teodor Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.14 2006/05/24 11:01:39 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
......@@ -26,30 +26,18 @@
#define RIGHT_ADDED 0x02
#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
static float gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2);
/*
* This defines is only for shorter code, used in gistgetadjusted
* and gistadjsubkey only
* static *S used for temrorary storage (saves stack and palloc() call)
*/
#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \
if (isnullkey) { \
gistentryinit((evp), rkey, r, NULL, \
(OffsetNumber) 0, rkeyb, FALSE); \
} else { \
gistentryinit((evp), okey, r, NULL, \
(OffsetNumber) 0, okeyb, FALSE); \
} \
} while(0)
#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \
FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \
} while(0);
static void gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2, float *penalty);
static int attrsizeS[INDEX_MAX_KEYS];
static Datum attrS[INDEX_MAX_KEYS];
static bool isnullS[INDEX_MAX_KEYS];
/*
* Write itup vector to page, has no control of free space
......@@ -164,87 +152,115 @@ gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) {
}
/*
* Return an IndexTuple containing the result of applying the "union"
* method to the specified IndexTuple vector.
* Make unions of keys in IndexTuple vector, return FALSE if itvec contains
* invalid tuple. Resulting Datums aren't compressed.
*/
IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
Datum attr[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
GistEntryVector *evec;
static bool
gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey,
Datum *attr, bool *isnull, int *attrsize ) {
int i;
GISTENTRY centry[INDEX_MAX_KEYS];
IndexTuple res;
GistEntryVector *evec;
evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
for (i = 0; i < len; i++)
if (GistTupleIsInvalid(itvec[i]))
return gist_form_invalid_tuple(InvalidBlockNumber);
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum;
for (i = startkey; i < giststate->tupdesc->natts; i++) {
int j;
int real_len;
real_len = 0;
for (j = 0; j < len; j++)
{
evec->n = 0;
for (j = 0; j < len; j++) {
Datum datum;
bool IsNull;
if (GistTupleIsInvalid(itvec[j]))
return FALSE; /* signals that union with invalid tuple => result is invalid */
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, i,
&(evec->vector[real_len]),
evec->vector + evec->n,
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
real_len++;
evec->n++;
}
/* If this tuple vector was all NULLs, the union is NULL */
if (real_len == 0)
{
if ( evec->n == 0 ) {
attr[i] = (Datum) 0;
attrsize[i] = (Datum) 0;
isnull[i] = TRUE;
}
else
{
int datumsize;
if (real_len == 1)
{
} else {
if (evec->n == 1) {
evec->n = 2;
gistentryinit(evec->vector[1],
evec->vector[0].key, r, NULL,
(OffsetNumber) 0, evec->vector[0].bytes, FALSE);
evec->vector[1] = evec->vector[0];
}
else
evec->n = real_len;
/* Compress the result of the union and store in attr array */
datum = FunctionCall2(&giststate->unionFn[i],
/* Make union and store in attr array */
attr[i] = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
PointerGetDatum(attrsize + i));
gistcentryinit(giststate, i, &centry[i], datum,
NULL, NULL, (OffsetNumber) 0,
datumsize, FALSE, FALSE);
isnull[i] = FALSE;
attr[i] = centry[i].key;
}
}
res = index_form_tuple(giststate->tupdesc, attr, isnull);
GistTupleSetValid(res);
return res;
return TRUE;
}
/*
* Return an IndexTuple containing the result of applying the "union"
* method to the specified IndexTuple vector.
*/
IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS, attrsizeS) )
return gist_form_invalid_tuple(InvalidBlockNumber);
return gistFormTuple(giststate, r, attrS, attrsizeS, isnullS);
}
/*
* makes union of two key
*/
static void
gistMakeUnionKey( GISTSTATE *giststate, int attno,
GISTENTRY *entry1, bool isnull1,
GISTENTRY *entry2, bool isnull2,
Datum *dst, int *dstsize, bool *dstisnull ) {
static char storage[ 2 * sizeof(GISTENTRY) + GEVHDRSZ ];
GistEntryVector *evec = (GistEntryVector*)storage;
evec->n = 2;
if ( isnull1 && isnull2 ) {
*dstisnull = TRUE;
*dst = (Datum)0;
*dstsize = 0;
} else {
if ( isnull1 == FALSE && isnull2 == FALSE ) {
evec->vector[0] = *entry1;
evec->vector[1] = *entry2;
} else if ( isnull1 == FALSE ) {
evec->vector[0] = *entry1;
evec->vector[1] = *entry1;
} else {
evec->vector[0] = *entry2;
evec->vector[1] = *entry2;
}
*dstisnull = FALSE;
*dst = FunctionCall2(&giststate->unionFn[attno],
PointerGetDatum(evec),
PointerGetDatum(dstsize));
}
}
/*
* Forms union of oldtup and addtup, if union == oldtup then return NULL
......@@ -252,15 +268,9 @@ gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
GistEntryVector *evec;
bool neednew = false;
bool isnull[INDEX_MAX_KEYS];
Datum attr[INDEX_MAX_KEYS];
GISTENTRY centry[INDEX_MAX_KEYS],
oldatt[INDEX_MAX_KEYS],
addatt[INDEX_MAX_KEYS],
*ev0p,
*ev1p;
bool neednew = FALSE;
GISTENTRY oldentries[INDEX_MAX_KEYS],
addentries[INDEX_MAX_KEYS];
bool oldisnull[INDEX_MAX_KEYS],
addisnull[INDEX_MAX_KEYS];
IndexTuple newtup = NULL;
......@@ -269,147 +279,83 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup))
return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid)));
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
evec->n = 2;
ev0p = &(evec->vector[0]);
ev1p = &(evec->vector[1]);
gistDeCompressAtt(giststate, r, oldtup, NULL,
(OffsetNumber) 0, oldatt, oldisnull);
(OffsetNumber) 0, oldentries, oldisnull);
gistDeCompressAtt(giststate, r, addtup, NULL,
(OffsetNumber) 0, addatt, addisnull);
(OffsetNumber) 0, addentries, addisnull);
for (i = 0; i < r->rd_att->natts; i++)
{
if (oldisnull[i] && addisnull[i])
{
attr[i] = (Datum) 0;
isnull[i] = TRUE;
}
else
{
Datum datum;
int datumsize;
for(i = 0; i < r->rd_att->natts; i++) {
gistMakeUnionKey( giststate, i,
oldentries + i, oldisnull[i],
addentries + i, addisnull[i],
attrS + i, attrsizeS + i, isnullS + i );
FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
addisnull[i], addatt[i].key, addatt[i].bytes);
if ( neednew )
/* we already need new key, so we can skip check */
continue;
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
if ( isnullS[i] )
/* union of key may be NULL if and only if both keys are NULL */
continue;
if (oldisnull[i] || addisnull[i])
{
if (oldisnull[i])
if ( !addisnull[i] ) {
if ( oldisnull[i] )
neednew = true;
}
else
{
else {
bool result;
FunctionCall3(&giststate->equalFn[i],
ev0p->key,
datum,
oldentries[i].key,
attrS[i],
PointerGetDatum(&result));
if (!result)
neednew = true;
}
gistcentryinit(giststate, i, &centry[i], datum,
NULL, NULL, (OffsetNumber) 0,
datumsize, FALSE, FALSE);
attr[i] = centry[i].key;
isnull[i] = FALSE;
}
}
if (neednew)
{
/* need to update key */
newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
newtup = gistFormTuple(giststate, r, attrS, attrsizeS, isnullS);
newtup->t_tid = oldtup->t_tid;
}
return newtup;
}
/*
* Forms unions of subkeys after page split, but
* uses only tuples aren't in groups of equalent tuples
*/
void
gistunionsubkeyvec(GISTSTATE *giststate, IndexTuple *itvec,
GistSplitVec *gsvp, bool isall) {
int i;
GistEntryVector *evec;
evec = palloc(((gsvp->len < 2) ? 2 : gsvp->len) * sizeof(GISTENTRY) + GEVHDRSZ);
GistSplitVec *gsvp, int startkey) {
IndexTuple *cleanedItVec;
int i, cleanedLen=0;
for (i = (isall) ? 0 : 1; i < giststate->tupdesc->natts; i++)
{
int j;
Datum datum;
int datumsize;
int real_len;
real_len = 0;
for (j = 0; j < gsvp->len; j++)
{
bool IsNull;
cleanedItVec = (IndexTuple*)palloc(sizeof(IndexTuple) * gsvp->len);
if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[j]])
for(i=0;i<gsvp->len;i++) {
if ( gsvp->idgrp && gsvp->idgrp[gsvp->entries[i]])
continue;
datum = index_getattr(itvec[gsvp->entries[j] - 1], i + 1,
giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, i,
&(evec->vector[real_len]),
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
real_len++;
cleanedItVec[cleanedLen++] = itvec[gsvp->entries[i] - 1];
}
if (real_len == 0)
{
datum = (Datum) 0;
datumsize = 0;
gsvp->isnull[i] = true;
}
else
{
/*
* evec->vector[0].bytes may be not defined, so form union
* with itself
*/
if (real_len == 1)
{
evec->n = 2;
memcpy(&(evec->vector[1]), &(evec->vector[0]),
sizeof(GISTENTRY));
}
else
evec->n = real_len;
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
gsvp->isnull[i] = false;
}
gistMakeUnionItVec(giststate, cleanedItVec, cleanedLen, startkey,
gsvp->attr, gsvp->isnull, gsvp->attrsize);
gsvp->attr[i] = datum;
gsvp->attrsize[i] = datumsize;
}
pfree( cleanedItVec );
}
/*
* unions subkey for after user picksplit over first column
* unions subkeys for after user picksplit over attno-1 column
*/
static void
gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
void
gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl, int attno)
{
GistSplitVec gsvp;
......@@ -421,7 +367,7 @@ gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
gsvp.entries = spl->spl_left;
gsvp.isnull = spl->spl_lisnull;
gistunionsubkeyvec(giststate, itvec, &gsvp, false);
gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
gsvp.attrsize = spl->spl_rattrsize;
gsvp.attr = spl->spl_rattr;
......@@ -429,20 +375,20 @@ gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
gsvp.entries = spl->spl_right;
gsvp.isnull = spl->spl_risnull;
gistunionsubkeyvec(giststate, itvec, &gsvp, false);
gistunionsubkeyvec(giststate, itvec, &gsvp, attno);
}
/*
* find group in vector with equal value
*/
int
gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
static int
gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl, int attno)
{
int i;
int curid = 1;
/*
* first key is always not null (see gistinsert), so we may not check for
* attno key is always not null (see gistSplitByKey), so we may not check for
* nulls
*/
for (i = 0; i < spl->spl_nleft; i++)
......@@ -459,7 +405,7 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
{
if (spl->spl_idgrp[spl->spl_right[j]])
continue;
FunctionCall3(&giststate->equalFn[0],
FunctionCall3(&giststate->equalFn[attno],
valvec[spl->spl_left[i]].key,
valvec[spl->spl_right[j]].key,
PointerGetDatum(&result));
......@@ -479,7 +425,7 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
{
if (spl->spl_idgrp[spl->spl_left[j]])
continue;
FunctionCall3(&giststate->equalFn[0],
FunctionCall3(&giststate->equalFn[attno],
valvec[spl->spl_left[i]].key,
valvec[spl->spl_left[j]].key,
PointerGetDatum(&result));
......@@ -501,23 +447,20 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
* Insert equivalent tuples to left or right page with minimum
* penalty
*/
void
static void
gistadjsubkey(Relation r,
IndexTuple *itup, /* contains compressed entry */
int len,
GIST_SPLITVEC *v,
GISTSTATE *giststate)
GISTSTATE *giststate,
int attno)
{
int curlen;
OffsetNumber *curwpos;
GISTENTRY entry,
identry[INDEX_MAX_KEYS],
*ev0p,
*ev1p;
float lpenalty,
rpenalty;
GistEntryVector *evec;
int datumsize;
identry[INDEX_MAX_KEYS];
float lpenalty = 0,
rpenalty = 0;
bool isnull[INDEX_MAX_KEYS];
int i,
j;
......@@ -551,16 +494,9 @@ gistadjsubkey(Relation r,
}
v->spl_nright = curlen;
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
evec->n = 2;
ev0p = &(evec->vector[0]);
ev1p = &(evec->vector[1]);
/* add equivalent tuple */
for (i = 0; i < len; i++)
{
Datum datum;
if (v->spl_idgrp[i + 1] == 0) /* already inserted */
continue;
gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
......@@ -577,17 +513,17 @@ gistadjsubkey(Relation r,
else
{
/* where? */
for (j = 1; j < r->rd_att->natts; j++)
for (j = attno+1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_lattr[j], r, NULL,
(OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
&identry[j], isnull[j], &lpenalty);
lpenalty = gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
&identry[j], isnull[j]);
gistentryinit(entry, v->spl_rattr[j], r, NULL,
(OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
gistpenalty(giststate, j, &entry, v->spl_risnull[j],
&identry[j], isnull[j], &rpenalty);
rpenalty = gistpenalty(giststate, j, &entry, v->spl_risnull[j],
&identry[j], isnull[j]);
if (lpenalty != rpenalty)
break;
......@@ -600,55 +536,31 @@ gistadjsubkey(Relation r,
if (lpenalty < rpenalty)
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
v->spl_left[v->spl_nleft] = i + 1;
v->spl_nleft++;
for (j = 1; j < r->rd_att->natts; j++)
{
if (isnull[j] && v->spl_lisnull[j])
{
v->spl_lattr[j] = (Datum) 0;
v->spl_lattrsize[j] = 0;
}
else
{
FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
isnull[j], identry[j].key, identry[j].bytes);
v->spl_left[v->spl_nleft++] = i + 1;
datum = FunctionCall2(&giststate->unionFn[j],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
v->spl_lattr[j] = datum;
v->spl_lattrsize[j] = datumsize;
v->spl_lisnull[j] = false;
}
for (j = attno+1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_lattr[j], r, NULL,
(OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
gistMakeUnionKey( giststate, j,
&entry, v->spl_lisnull[j],
identry + j, isnull[j],
v->spl_lattr + j, v->spl_lattrsize + j, v->spl_lisnull + j );
}
}
else
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
v->spl_right[v->spl_nright] = i + 1;
v->spl_nright++;
for (j = 1; j < r->rd_att->natts; j++)
{
if (isnull[j] && v->spl_risnull[j])
{
v->spl_rattr[j] = (Datum) 0;
v->spl_rattrsize[j] = 0;
}
else
{
FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
isnull[j], identry[j].key, identry[j].bytes);
datum = FunctionCall2(&giststate->unionFn[j],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
v->spl_right[v->spl_nright++] = i + 1;
v->spl_rattr[j] = datum;
v->spl_rattrsize[j] = datumsize;
v->spl_risnull[j] = false;
}
for (j = attno+1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_rattr[j], r, NULL,
(OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
gistMakeUnionKey( giststate, j,
&entry, v->spl_risnull[j],
identry + j, isnull[j],
v->spl_rattr + j, v->spl_rattrsize + j, v->spl_risnull + j );
}
}
}
......@@ -702,8 +614,8 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
gistdentryinit(giststate, j, &entry, datum, r, p, i,
ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
FALSE, IsNull);
gistpenalty(giststate, j, &entry, IsNull,
&identry[j], isnull[j], &usize);
usize = gistpenalty(giststate, j, &entry, IsNull,
&identry[j], isnull[j]);
if (which_grow[j] < 0 || usize < which_grow[j])
{
......@@ -796,8 +708,10 @@ gistFormTuple(GISTSTATE *giststate, Relation r,
else
{
gistcentryinit(giststate, i, &centry[i], attdata[i],
NULL, NULL, (OffsetNumber) 0,
datumsize[i], FALSE, FALSE);
r, NULL, (OffsetNumber) 0,
(datumsize) ? datumsize[i] : -1,
(datumsize) ? FALSE : TRUE,
FALSE);
compatt[i] = centry[i].key;
}
}
......@@ -824,29 +738,35 @@ gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
}
}
static void
static float
gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2, float *penalty)
GISTENTRY *orig, bool isNullOrig,
GISTENTRY *add, bool isNullAdd)
{
if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
*penalty = 0.0;
else
float penalty = 0.0;
if ( giststate->penaltyFn[attno].fn_strict==FALSE || ( isNullOrig == FALSE && isNullAdd == FALSE ) )
FunctionCall3(&giststate->penaltyFn[attno],
PointerGetDatum(key1),
PointerGetDatum(key2),
PointerGetDatum(penalty));
PointerGetDatum(orig),
PointerGetDatum(add),
PointerGetDatum(&penalty));
else if ( isNullOrig && isNullAdd )
penalty = 0.0;
else
penalty = 1e10; /* try to prevent to mix null and non-null value */
return penalty;
}
void
gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v,
IndexTuple *itup, int len, GISTSTATE *giststate)
{
/*
* now let the user-defined picksplit function set up the split vector; in
* entryvec have no null value!!
*/
FunctionCall2(&giststate->picksplitFn[0],
FunctionCall2(&giststate->picksplitFn[attno],
PointerGetDatum(entryvec),
PointerGetDatum(v));
......@@ -856,16 +776,16 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
if (v->spl_right[v->spl_nright - 1] == InvalidOffsetNumber)
v->spl_right[v->spl_nright - 1] = (OffsetNumber) (entryvec->n - 1);
v->spl_lattr[0] = v->spl_ldatum;
v->spl_rattr[0] = v->spl_rdatum;
v->spl_lisnull[0] = false;
v->spl_risnull[0] = false;
v->spl_lattr[attno] = v->spl_ldatum;
v->spl_rattr[attno] = v->spl_rdatum;
v->spl_lisnull[attno] = false;
v->spl_risnull[attno] = false;
/*
* if index is multikey, then we must to try get smaller bounding box for
* subkey(s)
*/
if (giststate->tupdesc->natts > 1)
if (giststate->tupdesc->natts > 1 && attno+1 != giststate->tupdesc->natts)
{
int MaxGrpId;
......@@ -873,17 +793,17 @@ gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
v->spl_grpflag = (char *) palloc0(sizeof(char) * entryvec->n);
v->spl_ngrp = (int *) palloc(sizeof(int) * entryvec->n);
MaxGrpId = gistfindgroup(giststate, entryvec->vector, v);
MaxGrpId = gistfindgroup(giststate, entryvec->vector, v, attno);
/* form union of sub keys for each page (l,p) */
gistunionsubkey(giststate, itup, v);
gistunionsubkey(giststate, itup, v, attno + 1);
/*
* if possible, we insert equivalent tuples with control by penalty
* for a subkey(s)
*/
if (MaxGrpId > 1)
gistadjsubkey(r, itup, len, v, giststate);
gistadjsubkey(r, itup, len, v, giststate, attno);
}
}
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.15 2006/05/19 16:15:17 teodor Exp $
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.16 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -206,17 +206,6 @@ typedef struct
/* root page of a gist index */
#define GIST_ROOT_BLKNO 0
/*
* When we update a relation on which we're doing a scan, we need to
* check the scan and fix it if the update affected any of the pages
* it touches. Otherwise, we can miss records that we should see.
* The only times we need to do this are for deletions and splits. See
* the code in gistscan.c for how the scan is fixed. These two
* constants tell us what sort of operation changed the index.
*/
#define GISTOP_DEL 0
/* #define GISTOP_SPLIT 1 */
#define ATTSIZE(datum, tupdesc, i, isnull) \
( \
(isnull) ? 0 : \
......@@ -291,12 +280,6 @@ extern IndexTuple gistgetadjusted(Relation r,
IndexTuple oldtup,
IndexTuple addtup,
GISTSTATE *giststate);
extern int gistfindgroup(GISTSTATE *giststate,
GISTENTRY *valvec, GIST_SPLITVEC *spl);
extern void gistadjsubkey(Relation r,
IndexTuple *itup, int len,
GIST_SPLITVEC *v,
GISTSTATE *giststate);
extern IndexTuple gistFormTuple(GISTSTATE *giststate,
Relation r, Datum *attdata, int *datumsize, bool *isnull);
......@@ -321,13 +304,15 @@ typedef struct {
} GistSplitVec;
extern void gistunionsubkeyvec(GISTSTATE *giststate,
IndexTuple *itvec, GistSplitVec *gsvp, bool isall);
IndexTuple *itvec, GistSplitVec *gsvp, int startkey);
extern void gistunionsubkey(GISTSTATE *giststate, IndexTuple *itvec,
GIST_SPLITVEC *spl, int attno);
extern void GISTInitBuffer(Buffer b, uint32 f);
extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
Datum k, Relation r, Page pg, OffsetNumber o,
int b, bool l, bool isNull);
void gistUserPicksplit(Relation r, GistEntryVector *entryvec, GIST_SPLITVEC *v,
void gistUserPicksplit(Relation r, GistEntryVector *entryvec, int attno, GIST_SPLITVEC *v,
IndexTuple *itup, int len, GISTSTATE *giststate);
/* gistvacuum.c */
......
......@@ -37,7 +37,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.333 2006/05/19 19:08:26 alvherre Exp $
* $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.334 2006/05/24 11:01:39 teodor Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 200605191
#define CATALOG_VERSION_NO 200605241
#endif
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.42 2006/05/02 22:25:10 tgl Exp $
* $PostgreSQL: pgsql/src/include/catalog/pg_am.h,v 1.43 2006/05/24 11:01:39 teodor Exp $
*
* NOTES
* the genbki.sh script reads this file and generates .bki
......@@ -114,7 +114,7 @@ DESCR("b-tree index access method");
DATA(insert OID = 405 ( hash 1 1 0 f f f f f t f hashinsert hashbeginscan hashgettuple hashgetmulti hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbulkdelete hashvacuumcleanup hashcostestimate ));
DESCR("hash index access method");
#define HASH_AM_OID 405
DATA(insert OID = 783 ( gist 100 7 0 f t f f t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
DATA(insert OID = 783 ( gist 100 7 0 f t t t t t t gistinsert gistbeginscan gistgettuple gistgetmulti gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbulkdelete gistvacuumcleanup gistcostestimate ));
DESCR("GiST index access method");
#define GIST_AM_OID 783
DATA(insert OID = 2742 ( gin 100 4 0 f f f f t t f gininsert ginbeginscan gingettuple gingetmulti ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbulkdelete ginvacuumcleanup gincostestimate ));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment