Commit eda6dd32 authored by Neil Conway's avatar Neil Conway

GiST improvements:

- make sure we always invoke user-supplied GiST methods in a short-lived
  memory context. This means the backend isn't exposed to any memory leaks
  that be in those methods (in fact, it is probably a net loss for most
  GiST methods to bother manually freeing memory now). This also means
  we can do away with a lot of ugly manual memory management in the
  GiST code itself.

- keep the current page of a GiST index scan pinned, rather than doing a
  ReadBuffer() for each tuple produced by the scan. Since ReadBuffer() is
  expensive, this is a perf. win

- implement dead tuple killing for GiST indexes (which is easy to do, now
  that we keep a pin on the current scan page). Now all the builtin indexes
  implement dead tuple killing.

- cleanup a lot of ugly code in GiST
parent 818bfda1
<!--
$PostgreSQL: pgsql/doc/src/sgml/gist.sgml,v 1.17 2005/04/09 03:52:43 momjian Exp $
$PostgreSQL: pgsql/doc/src/sgml/gist.sgml,v 1.18 2005/05/17 00:59:30 neilc Exp $
-->
<chapter id="GiST">
......@@ -202,7 +202,7 @@ $PostgreSQL: pgsql/doc/src/sgml/gist.sgml,v 1.17 2005/04/09 03:52:43 momjian Exp
<para>
The lack of write-ahead logging is just a small matter of programming,
but since it isn't done yet, a crash could render a <acronym>GiST</acronym>
index inconsistent, forcing a REINDEX.
index inconsistent, forcing a <command>REINDEX</command>.
</para>
</sect1>
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.115 2005/05/15 04:08:29 neilc Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.116 2005/05/17 00:59:30 neilc Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -21,6 +21,7 @@
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "utils/memutils.h"
#undef GIST_PAGEADDITEM
......@@ -45,13 +46,13 @@
* and gistadjsubkey only
*/
#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \
if (isnullkey) { \
gistentryinit((evp), rkey, r, NULL, \
(OffsetNumber) 0, rkeyb, FALSE); \
} else { \
gistentryinit((evp), okey, r, NULL, \
(OffsetNumber) 0, okeyb, FALSE); \
} \
if (isnullkey) { \
gistentryinit((evp), rkey, r, NULL, \
(OffsetNumber) 0, rkeyb, FALSE); \
} else { \
gistentryinit((evp), okey, r, NULL, \
(OffsetNumber) 0, okeyb, FALSE); \
} \
} while(0)
#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
......@@ -65,6 +66,7 @@ typedef struct
GISTSTATE giststate;
int numindexattrs;
double indtuples;
MemoryContext tmpCxt;
} GISTBuildState;
......@@ -128,9 +130,8 @@ static void gistcentryinit(GISTSTATE *giststate, int nkey,
Relation r, Page pg,
OffsetNumber o, int b, bool l, bool isNull);
static void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
IndexTuple tuple, Page p, OffsetNumber o,
GISTENTRY *attdata, bool *decompvec, bool *isnull);
static void gistFreeAtt(Relation r, GISTENTRY *attdata, bool *decompvec);
IndexTuple tuple, Page p, OffsetNumber o,
GISTENTRY *attdata, bool *isnull);
static void gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2,
......@@ -143,7 +144,28 @@ static void gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber c
#endif
/*
* routine to build an index. Basically calls insert over and over
* Create and return a temporary memory context for use by GiST. We
* _always_ invoke user-provided methods in a temporary memory
* context, so that memory leaks in those functions cannot cause
* problems. Also, we use some additional temporary contexts in the
* GiST code itself, to avoid the need to do some awkward manual
* memory management.
*/
MemoryContext
createTempGistContext(void)
{
return AllocSetContextCreate(CurrentMemoryContext,
"GiST temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
}
/*
* Routine to build an index. Basically calls insert over and over.
*
* XXX: it would be nice to implement some sort of bulk-loading
* algorithm, but it is not clear how to do that.
*/
Datum
gistbuild(PG_FUNCTION_ARGS)
......@@ -155,10 +177,6 @@ gistbuild(PG_FUNCTION_ARGS)
GISTBuildState buildstate;
Buffer buffer;
/* no locking is needed */
initGISTstate(&buildstate.giststate, index);
/*
* We expect to be called exactly once for any index relation. If
* that's not the case, big trouble's what we have.
......@@ -167,6 +185,9 @@ gistbuild(PG_FUNCTION_ARGS)
elog(ERROR, "index \"%s\" already contains data",
RelationGetRelationName(index));
/* no locking is needed */
initGISTstate(&buildstate.giststate, index);
/* initialize the root page */
buffer = ReadBuffer(index, P_NEW);
GISTInitBuffer(buffer, F_LEAF);
......@@ -175,21 +196,27 @@ gistbuild(PG_FUNCTION_ARGS)
/* build the index */
buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
buildstate.indtuples = 0;
/*
* create a temporary memory context that is reset once for each
* tuple inserted into the index
*/
buildstate.tmpCxt = createTempGistContext();
/* do the heap scan */
reltuples = IndexBuildHeapScan(heap, index, indexInfo,
gistbuildCallback, (void *) &buildstate);
gistbuildCallback, (void *) &buildstate);
/* okay, all heap tuples are indexed */
MemoryContextDelete(buildstate.tmpCxt);
/* since we just counted the # of tuples, may as well update stats */
IndexCloseAndUpdateStats(heap, reltuples, index, buildstate.indtuples);
freeGISTstate(&buildstate.giststate);
#ifdef GISTDEBUG
gist_dumptree(index, 0, GISTP_ROOT, 0);
gist_dumptree(index, 0, GIST_ROOT_BLKNO, 0);
#endif
PG_RETURN_VOID();
}
......@@ -206,32 +233,26 @@ gistbuildCallback(Relation index,
{
GISTBuildState *buildstate = (GISTBuildState *) state;
IndexTuple itup;
bool compvec[INDEX_MAX_KEYS];
GISTENTRY tmpcentry;
int i;
MemoryContext oldCxt;
/* GiST cannot index tuples with leading NULLs */
if (isnull[0])
return;
oldCxt = MemoryContextSwitchTo(buildstate->tmpCxt);
/* immediately compress keys to normalize */
for (i = 0; i < buildstate->numindexattrs; i++)
{
if (isnull[i])
{
values[i] = (Datum) 0;
compvec[i] = FALSE;
}
else
{
gistcentryinit(&buildstate->giststate, i, &tmpcentry, values[i],
NULL, NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */ , TRUE, FALSE);
if (values[i] != tmpcentry.key &&
!(isAttByVal(&buildstate->giststate, i)))
compvec[i] = TRUE;
else
compvec[i] = FALSE;
-1 /* size is currently bogus */, TRUE, FALSE);
values[i] = tmpcentry.key;
}
}
......@@ -250,12 +271,8 @@ gistbuildCallback(Relation index,
gistdoinsert(index, itup, &buildstate->giststate);
buildstate->indtuples += 1;
for (i = 0; i < buildstate->numindexattrs; i++)
if (compvec[i])
pfree(DatumGetPointer(values[i]));
pfree(itup);
MemoryContextSwitchTo(oldCxt);
MemoryContextReset(buildstate->tmpCxt);
}
/*
......@@ -271,7 +288,6 @@ gistinsert(PG_FUNCTION_ARGS)
Datum *values = (Datum *) PG_GETARG_POINTER(1);
bool *isnull = (bool *) PG_GETARG_POINTER(2);
ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
Relation heapRel = (Relation) PG_GETARG_POINTER(4);
bool checkUnique = PG_GETARG_BOOL(5);
......@@ -280,7 +296,8 @@ gistinsert(PG_FUNCTION_ARGS)
GISTSTATE giststate;
GISTENTRY tmpentry;
int i;
bool compvec[INDEX_MAX_KEYS];
MemoryContext oldCxt;
MemoryContext insertCxt;
/*
* Since GIST is not marked "amconcurrent" in pg_am, caller should
......@@ -292,25 +309,21 @@ gistinsert(PG_FUNCTION_ARGS)
if (isnull[0])
PG_RETURN_BOOL(false);
insertCxt = createTempGistContext();
oldCxt = MemoryContextSwitchTo(insertCxt);
initGISTstate(&giststate, r);
/* immediately compress keys to normalize */
for (i = 0; i < r->rd_att->natts; i++)
{
if (isnull[i])
{
values[i] = (Datum) 0;
compvec[i] = FALSE;
}
else
{
gistcentryinit(&giststate, i, &tmpentry, values[i],
NULL, NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */ , TRUE, FALSE);
if (values[i] != tmpentry.key && !(isAttByVal(&giststate, i)))
compvec[i] = TRUE;
else
compvec[i] = FALSE;
-1 /* size is currently bogus */, TRUE, FALSE);
values[i] = tmpentry.key;
}
}
......@@ -319,11 +332,10 @@ gistinsert(PG_FUNCTION_ARGS)
gistdoinsert(r, itup, &giststate);
for (i = 0; i < r->rd_att->natts; i++)
if (compvec[i] == TRUE)
pfree(DatumGetPointer(values[i]));
pfree(itup);
/* cleanup */
freeGISTstate(&giststate);
MemoryContextSwitchTo(oldCxt);
MemoryContextDelete(insertCxt);
PG_RETURN_BOOL(true);
}
......@@ -370,36 +382,29 @@ gistPageAddItem(GISTSTATE *giststate,
if (retval == InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(r));
/* be tidy */
if (DatumGetPointer(tmpcentry.key) != NULL &&
tmpcentry.key != dentry->key &&
tmpcentry.key != datum)
pfree(DatumGetPointer(tmpcentry.key));
return (retval);
return retval;
}
#endif
/*
* Workhouse routine for doing insertion into a GiST index. Note that
* this routine assumes it is invoked in a short-lived memory context,
* so it does not bother releasing palloc'd allocations.
*/
static void
gistdoinsert(Relation r,
IndexTuple itup,
GISTSTATE *giststate)
gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
{
IndexTuple *instup;
int i,
ret,
int ret,
len = 1;
instup = (IndexTuple *) palloc(sizeof(IndexTuple));
instup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
memcpy(instup[0], itup, IndexTupleSize(itup));
ret = gistlayerinsert(r, GISTP_ROOT, &instup, &len, giststate);
ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate);
if (ret & SPLITED)
gistnewroot(r, instup, len);
for (i = 0; i < len; i++)
pfree(instup[i]);
pfree(instup);
}
static int
......@@ -410,7 +415,6 @@ gistlayerinsert(Relation r, BlockNumber blkno,
{
Buffer buffer;
Page page;
OffsetNumber child;
int ret;
GISTPageOpaque opaque;
......@@ -420,12 +424,20 @@ gistlayerinsert(Relation r, BlockNumber blkno,
if (!(opaque->flags & F_LEAF))
{
/* internal page, so we must walk on tree */
/* len IS equal 1 */
/*
* This is an internal page, so continue to walk down the
* tree. We find the child node that has the minimum insertion
* penalty and recursively invoke ourselves to modify that
* node. Once the recursive call returns, we may need to
* adjust the parent node for two reasons: the child node
* split, or the key in this node needs to be adjusted for the
* newly inserted key below us.
*/
ItemId iid;
BlockNumber nblkno;
ItemPointerData oldtid;
IndexTuple oldtup;
OffsetNumber child;
child = gistchoose(r, page, *(*itup), giststate);
iid = PageGetItemId(page, child);
......@@ -446,7 +458,7 @@ gistlayerinsert(Relation r, BlockNumber blkno,
return 0x00;
}
/* child does not splited */
/* child did not split */
if (!(ret & SPLITED))
{
IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate);
......@@ -458,11 +470,15 @@ gistlayerinsert(Relation r, BlockNumber blkno,
return 0x00;
}
pfree((*itup)[0]); /* !!! */
(*itup)[0] = newtup;
}
/* key is modified, so old version must be deleted */
/*
* This node's key has been modified, either because a child
* split occurred or because we needed to adjust our key for
* an insert in a child node. Therefore, remove the old
* version of this node's key.
*/
ItemPointerSet(&oldtid, blkno, child);
gistdelete(r, &oldtid);
......@@ -491,11 +507,6 @@ gistlayerinsert(Relation r, BlockNumber blkno,
oldlen = *len;
newitup = gistSplit(r, buffer, itvec, &tlen, giststate);
ReleaseBuffer(buffer);
do
pfree((*itup)[oldlen - 1]);
while ((--oldlen) > 0);
pfree((*itup));
pfree(itvec);
*itup = newitup;
*len = tlen; /* now tlen >= 2 */
}
......@@ -509,23 +520,17 @@ gistlayerinsert(Relation r, BlockNumber blkno,
FirstOffsetNumber
:
OffsetNumberNext(PageGetMaxOffsetNumber(page));
l = gistwritebuffer(r, page, (*itup), *len, off);
l = gistwritebuffer(r, page, *itup, *len, off);
WriteBuffer(buffer);
if (*len > 1)
{ /* previous insert ret & SPLITED != 0 */
int i;
/*
* child was splited, so we must form union for insertion in
* parent
*/
IndexTuple newtup = gistunion(r, (*itup), *len, giststate);
ItemPointerSet(&(newtup->t_tid), blkno, 1);
for (i = 0; i < *len; i++)
pfree((*itup)[i]);
(*itup)[0] = newtup;
*len = 1;
}
......@@ -544,23 +549,17 @@ gistwritebuffer(Relation r, Page page, IndexTuple *itup,
OffsetNumber l = InvalidOffsetNumber;
int i;
#ifdef GIST_PAGEADDITEM
GISTENTRY tmpdentry;
IndexTuple newtup;
bool IsNull;
#endif
for (i = 0; i < len; i++)
{
#ifdef GIST_PAGEADDITEM
GISTENTRY tmpdentry;
IndexTuple newtup;
bool IsNull;
l = gistPageAddItem(giststate, r, page,
(Item) itup[i], IndexTupleSize(itup[i]),
off, LP_USED, &tmpdentry, &newtup);
off = OffsetNumberNext(off);
if (DatumGetPointer(tmpdentry.key) != NULL &&
tmpdentry.key != index_getattr(itup[i], 1, r->rd_att, &IsNull))
pfree(DatumGetPointer(tmpdentry.key));
if (itup[i] != newtup)
pfree(newtup);
#else
l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
off, LP_USED);
......@@ -620,101 +619,77 @@ gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
}
/*
* return union of itup vector
* Return an IndexTuple containing the result of applying the "union"
* method to the specified IndexTuple vector.
*/
static IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
Datum attr[INDEX_MAX_KEYS];
bool whatfree[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
GistEntryVector *evec;
Datum datum;
int datumsize,
i,
j;
int i;
GISTENTRY centry[INDEX_MAX_KEYS];
bool *needfree;
IndexTuple newtup;
bool IsNull;
int reallen;
needfree = (bool *) palloc(((len == 1) ? 2 : len) * sizeof(bool));
evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
for (j = 0; j < r->rd_att->natts; j++)
for (i = 0; i < r->rd_att->natts; i++)
{
reallen = 0;
for (i = 0; i < len; i++)
Datum datum;
int j;
int real_len;
real_len = 0;
for (j = 0; j < len; j++)
{
datum = index_getattr(itvec[i], j + 1, giststate->tupdesc, &IsNull);
bool IsNull;
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, j,
&(evec->vector[reallen]),
gistdentryinit(giststate, i,
&(evec->vector[real_len]),
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull);
if ((!isAttByVal(giststate, j)) &&
evec->vector[reallen].key != datum)
needfree[reallen] = TRUE;
else
needfree[reallen] = FALSE;
reallen++;
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
real_len++;
}
if (reallen == 0)
/* If this tuple vector was all NULLs, the union is NULL */
if (real_len == 0)
{
attr[j] = (Datum) 0;
isnull[j] = TRUE;
whatfree[j] = FALSE;
attr[i] = (Datum) 0;
isnull[i] = TRUE;
}
else
{
if (reallen == 1)
int datumsize;
if (real_len == 1)
{
evec->n = 2;
gistentryinit(evec->vector[1],
evec->vector[0].key, r, NULL,
(OffsetNumber) 0, evec->vector[0].bytes, FALSE);
(OffsetNumber) 0, evec->vector[0].bytes, FALSE);
}
else
evec->n = reallen;
datum = FunctionCall2(&giststate->unionFn[j],
evec->n = real_len;
/* Compress the result of the union and store in attr array */
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
for (i = 0; i < reallen; i++)
if (needfree[i])
pfree(DatumGetPointer(evec->vector[i].key));
gistcentryinit(giststate, j, &centry[j], datum,
gistcentryinit(giststate, i, &centry[i], datum,
NULL, NULL, (OffsetNumber) 0,
datumsize, FALSE, FALSE);
isnull[j] = FALSE;
attr[j] = centry[j].key;
if (!isAttByVal(giststate, j))
{
whatfree[j] = TRUE;
if (centry[j].key != datum)
pfree(DatumGetPointer(datum));
}
else
whatfree[j] = FALSE;
isnull[i] = FALSE;
attr[i] = centry[i].key;
}
}
pfree(evec);
pfree(needfree);
newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
for (j = 0; j < r->rd_att->natts; j++)
if (whatfree[j])
pfree(DatumGetPointer(attr[j]));
return newtup;
return index_form_tuple(giststate->tupdesc, attr, isnull);
}
......@@ -725,24 +700,18 @@ static IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
GistEntryVector *evec;
Datum datum;
int datumsize;
bool result,
neednew = false;
bool isnull[INDEX_MAX_KEYS],
whatfree[INDEX_MAX_KEYS];
bool neednew = false;
bool isnull[INDEX_MAX_KEYS];
Datum attr[INDEX_MAX_KEYS];
GISTENTRY centry[INDEX_MAX_KEYS],
oldatt[INDEX_MAX_KEYS],
addatt[INDEX_MAX_KEYS],
*ev0p,
*ev1p;
bool olddec[INDEX_MAX_KEYS],
adddec[INDEX_MAX_KEYS];
bool oldisnull[INDEX_MAX_KEYS],
addisnull[INDEX_MAX_KEYS];
IndexTuple newtup = NULL;
int j;
int i;
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
evec->n = 2;
......@@ -750,39 +719,40 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
ev1p = &(evec->vector[1]);
gistDeCompressAtt(giststate, r, oldtup, NULL,
(OffsetNumber) 0, oldatt, olddec, oldisnull);
(OffsetNumber) 0, oldatt, oldisnull);
gistDeCompressAtt(giststate, r, addtup, NULL,
(OffsetNumber) 0, addatt, adddec, addisnull);
(OffsetNumber) 0, addatt, addisnull);
for (j = 0; j < r->rd_att->natts; j++)
for (i = 0; i < r->rd_att->natts; i++)
{
if (oldisnull[j] && addisnull[j])
if (oldisnull[i] && addisnull[i])
{
attr[j] = (Datum) 0;
isnull[j] = TRUE;
whatfree[j] = FALSE;
attr[i] = (Datum) 0;
isnull[i] = TRUE;
}
else
{
FILLEV(
oldisnull[j], oldatt[j].key, oldatt[j].bytes,
addisnull[j], addatt[j].key, addatt[j].bytes
);
Datum datum;
int datumsize;
datum = FunctionCall2(&giststate->unionFn[j],
FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
addisnull[i], addatt[i].key, addatt[i].bytes);
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
if (oldisnull[j] || addisnull[j])
if (oldisnull[i] || addisnull[i])
{
if (oldisnull[j])
if (oldisnull[i])
neednew = true;
}
else
{
FunctionCall3(&giststate->equalFn[j],
bool result;
FunctionCall3(&giststate->equalFn[i],
ev0p->key,
datum,
PointerGetDatum(&result));
......@@ -791,28 +761,14 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
neednew = true;
}
if (olddec[j])
pfree(DatumGetPointer(oldatt[j].key));
if (adddec[j])
pfree(DatumGetPointer(addatt[j].key));
gistcentryinit(giststate, j, &centry[j], datum,
gistcentryinit(giststate, i, &centry[i], datum,
NULL, NULL, (OffsetNumber) 0,
datumsize, FALSE, FALSE);
attr[j] = centry[j].key;
isnull[j] = FALSE;
if ((!isAttByVal(giststate, j)))
{
whatfree[j] = TRUE;
if (centry[j].key != datum)
pfree(DatumGetPointer(datum));
}
else
whatfree[j] = FALSE;
attr[i] = centry[i].key;
isnull[i] = FALSE;
}
}
pfree(evec);
if (neednew)
{
......@@ -821,33 +777,24 @@ gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *gis
newtup->t_tid = oldtup->t_tid;
}
for (j = 0; j < r->rd_att->natts; j++)
if (whatfree[j])
pfree(DatumGetPointer(attr[j]));
return newtup;
}
static void
gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
{
int i,
j,
lr;
Datum *attr;
bool *needfree,
IsNull;
int len,
*attrsize;
OffsetNumber *entries;
GistEntryVector *evec;
Datum datum;
int datumsize;
int reallen;
bool *isnull;
int lr;
for (lr = 0; lr <= 1; lr++)
for (lr = 0; lr < 2; lr++)
{
OffsetNumber *entries;
int i;
Datum *attr;
int len,
*attrsize;
bool *isnull;
GistEntryVector *evec;
if (lr)
{
attrsize = spl->spl_lattrsize;
......@@ -865,38 +812,41 @@ gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITV
isnull = spl->spl_risnull;
}
needfree = (bool *) palloc(((len == 1) ? 2 : len) * sizeof(bool));
evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
for (j = 1; j < r->rd_att->natts; j++)
for (i = 1; i < r->rd_att->natts; i++)
{
reallen = 0;
for (i = 0; i < len; i++)
int j;
Datum datum;
int datumsize;
int real_len;
real_len = 0;
for (j = 0; j < len; j++)
{
if (spl->spl_idgrp[entries[i]])
bool IsNull;
if (spl->spl_idgrp[entries[j]])
continue;
datum = index_getattr(itvec[entries[i] - 1], j + 1,
datum = index_getattr(itvec[entries[j] - 1], i + 1,
giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, j,
&(evec->vector[reallen]),
gistdentryinit(giststate, i,
&(evec->vector[real_len]),
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull);
if ((!isAttByVal(giststate, j)) &&
evec->vector[reallen].key != datum)
needfree[reallen] = TRUE;
else
needfree[reallen] = FALSE;
reallen++;
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
real_len++;
}
if (reallen == 0)
if (real_len == 0)
{
datum = (Datum) 0;
datumsize = 0;
isnull[j] = true;
isnull[i] = true;
}
else
{
......@@ -904,30 +854,23 @@ gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITV
* evec->vector[0].bytes may be not defined, so form union
* with itself
*/
if (reallen == 1)
if (real_len == 1)
{
evec->n = 2;
memcpy((void *) &(evec->vector[1]),
(void *) &(evec->vector[0]),
memcpy(&(evec->vector[1]), &(evec->vector[0]),
sizeof(GISTENTRY));
}
else
evec->n = reallen;
datum = FunctionCall2(&giststate->unionFn[j],
evec->n = real_len;
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
isnull[j] = false;
isnull[i] = false;
}
for (i = 0; i < reallen; i++)
if (needfree[i])
pfree(DatumGetPointer(evec->vector[i].key));
attr[j] = datum;
attrsize[j] = datumsize;
attr[i] = datum;
attrsize[i] = datumsize;
}
pfree(evec);
pfree(needfree);
}
}
......@@ -937,11 +880,8 @@ gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITV
static int
gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
{
int i,
j,
len;
int i;
int curid = 1;
bool result;
/*
* first key is always not null (see gistinsert), so we may not check
......@@ -949,6 +889,10 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
*/
for (i = 0; i < spl->spl_nleft; i++)
{
int j;
int len;
bool result;
if (spl->spl_idgrp[spl->spl_left[i]])
continue;
len = 0;
......@@ -996,8 +940,8 @@ gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
}
/*
* Insert equivalent tuples to left or right page
* with minimize penalty
* Insert equivalent tuples to left or right page with minimum
* penalty
*/
static void
gistadjsubkey(Relation r,
......@@ -1008,7 +952,6 @@ gistadjsubkey(Relation r,
{
int curlen;
OffsetNumber *curwpos;
bool decfree[INDEX_MAX_KEYS];
GISTENTRY entry,
identry[INDEX_MAX_KEYS],
*ev0p,
......@@ -1020,12 +963,12 @@ gistadjsubkey(Relation r,
bool isnull[INDEX_MAX_KEYS];
int i,
j;
Datum datum;
/* clear vectors */
curlen = v->spl_nleft;
curwpos = v->spl_left;
for (i = 0; i < v->spl_nleft; i++)
{
if (v->spl_idgrp[v->spl_left[i]] == 0)
{
*curwpos = v->spl_left[i];
......@@ -1033,11 +976,13 @@ gistadjsubkey(Relation r,
}
else
curlen--;
}
v->spl_nleft = curlen;
curlen = v->spl_nright;
curwpos = v->spl_right;
for (i = 0; i < v->spl_nright; i++)
{
if (v->spl_idgrp[v->spl_right[i]] == 0)
{
*curwpos = v->spl_right[i];
......@@ -1045,6 +990,7 @@ gistadjsubkey(Relation r,
}
else
curlen--;
}
v->spl_nright = curlen;
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
......@@ -1055,16 +1001,17 @@ gistadjsubkey(Relation r,
/* add equivalent tuple */
for (i = 0; i < *len; i++)
{
Datum datum;
if (v->spl_idgrp[i + 1] == 0) /* already inserted */
continue;
gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
identry, decfree, isnull);
identry, isnull);
v->spl_ngrp[v->spl_idgrp[i + 1]]--;
if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
(v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
(v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
{
/* force last in group */
rpenalty = 1.0;
lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
......@@ -1088,7 +1035,11 @@ gistadjsubkey(Relation r,
break;
}
}
/* add */
/*
* add
* XXX: refactor this to avoid duplicating code
*/
if (lpenalty < rpenalty)
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
......@@ -1103,17 +1054,13 @@ gistadjsubkey(Relation r,
}
else
{
FILLEV(
v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
isnull[j], identry[j].key, identry[j].bytes
);
FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
isnull[j], identry[j].key, identry[j].bytes);
datum = FunctionCall2(&giststate->unionFn[j],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
if ((!isAttByVal(giststate, j)) && !v->spl_lisnull[j])
pfree(DatumGetPointer(v->spl_lattr[j]));
v->spl_lattr[j] = datum;
v->spl_lattrsize[j] = datumsize;
v->spl_lisnull[j] = false;
......@@ -1134,28 +1081,20 @@ gistadjsubkey(Relation r,
}
else
{
FILLEV(
v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
isnull[j], identry[j].key, identry[j].bytes
);
FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
isnull[j], identry[j].key, identry[j].bytes);
datum = FunctionCall2(&giststate->unionFn[j],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
if ((!isAttByVal(giststate, j)) && !v->spl_risnull[j])
pfree(DatumGetPointer(v->spl_rattr[j]));
v->spl_rattr[j] = datum;
v->spl_rattrsize[j] = datumsize;
v->spl_risnull[j] = false;
}
}
}
gistFreeAtt(r, identry, decfree);
}
pfree(evec);
}
/*
......@@ -1181,13 +1120,8 @@ gistSplit(Relation r,
GISTPageOpaque opaque;
GIST_SPLITVEC v;
GistEntryVector *entryvec;
bool *decompvec;
int i,
j,
nlen;
int MaxGrpId = 1;
Datum datum;
bool IsNull;
p = (Page) BufferGetPage(buffer);
opaque = (GISTPageOpaque) PageGetSpecialPointer(p);
......@@ -1197,8 +1131,7 @@ gistSplit(Relation r,
* about to split the root, we need to do some hocus-pocus to enforce
* this guarantee.
*/
if (BufferGetBlockNumber(buffer) == GISTP_ROOT)
if (BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO)
{
leftbuf = ReadBuffer(r, P_NEW);
GISTInitBuffer(leftbuf, opaque->flags);
......@@ -1221,17 +1154,17 @@ gistSplit(Relation r,
/* generate the item array */
entryvec = palloc(GEVHDRSZ + (*len + 1) * sizeof(GISTENTRY));
entryvec->n = *len + 1;
decompvec = (bool *) palloc((*len + 1) * sizeof(bool));
for (i = 1; i <= *len; i++)
{
Datum datum;
bool IsNull;
datum = index_getattr(itup[i - 1], 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, 0, &(entryvec->vector[i]),
datum, r, p, i,
ATTSIZE(datum, giststate->tupdesc, 1, IsNull), FALSE, IsNull);
if ((!isAttByVal(giststate, 0)) && entryvec->vector[i].key != datum)
decompvec[i] = TRUE;
else
decompvec[i] = FALSE;
ATTSIZE(datum, giststate->tupdesc, 1, IsNull),
FALSE, IsNull);
}
/*
......@@ -1259,6 +1192,8 @@ gistSplit(Relation r,
*/
if (r->rd_att->natts > 1)
{
int MaxGrpId;
v.spl_idgrp = (int *) palloc0(sizeof(int) * (*len + 1));
v.spl_grpflag = (char *) palloc0(sizeof(char) * (*len + 1));
v.spl_ngrp = (int *) palloc(sizeof(int) * (*len + 1));
......@@ -1274,19 +1209,8 @@ gistSplit(Relation r,
*/
if (MaxGrpId > 1)
gistadjsubkey(r, itup, len, &v, giststate);
pfree(v.spl_idgrp);
pfree(v.spl_grpflag);
pfree(v.spl_ngrp);
}
/* clean up the entry vector: its keys need to be deleted, too */
for (i = 1; i <= *len; i++)
if (decompvec[i])
pfree(DatumGetPointer(entryvec->vector[i].key));
pfree(entryvec);
pfree(decompvec);
/* form left and right vector */
lvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nleft);
rvectup = (IndexTuple *) palloc(sizeof(IndexTuple) * v.spl_nright);
......@@ -1298,15 +1222,12 @@ gistSplit(Relation r,
rvectup[i] = itup[v.spl_right[i] - 1];
/* write on disk (may be need another split) */
/* write on disk (may need another split) */
if (gistnospace(right, rvectup, v.spl_nright))
{
nlen = v.spl_nright;
newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate);
ReleaseBuffer(rightbuf);
for (j = 1; j < r->rd_att->natts; j++)
if ((!isAttByVal(giststate, j)) && !v.spl_risnull[j])
pfree(DatumGetPointer(v.spl_rattr[j]));
}
else
{
......@@ -1321,7 +1242,6 @@ gistSplit(Relation r,
ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1);
}
if (gistnospace(left, lvectup, v.spl_nleft))
{
int llen = v.spl_nleft;
......@@ -1330,35 +1250,24 @@ gistSplit(Relation r,
lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate);
ReleaseBuffer(leftbuf);
for (j = 1; j < r->rd_att->natts; j++)
if ((!isAttByVal(giststate, j)) && !v.spl_lisnull[j])
pfree(DatumGetPointer(v.spl_lattr[j]));
newtup = gistjoinvector(newtup, &nlen, lntup, llen);
pfree(lntup);
}
else
{
OffsetNumber l;
l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
if (BufferGetBlockNumber(buffer) != GISTP_ROOT)
if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
PageRestoreTempPage(left, p);
WriteBuffer(leftbuf);
nlen += 1;
newtup = (IndexTuple *) repalloc((void *) newtup, sizeof(IndexTuple) * nlen);
newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull);
ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1);
}
/* !!! pfree */
pfree(rvectup);
pfree(lvectup);
pfree(v.spl_left);
pfree(v.spl_right);
*len = nlen;
return newtup;
}
......@@ -1369,7 +1278,7 @@ gistnewroot(Relation r, IndexTuple *itup, int len)
Buffer b;
Page p;
b = ReadBuffer(r, GISTP_ROOT);
b = ReadBuffer(r, GIST_ROOT_BLKNO);
GISTInitBuffer(b, 0);
p = BufferGetPage(b);
......@@ -1385,16 +1294,13 @@ GISTInitBuffer(Buffer b, uint32 f)
Size pageSize;
pageSize = BufferGetPageSize(b);
page = BufferGetPage(b);
PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
opaque->flags = f;
}
/*
* find entry with lowest penalty
*/
......@@ -1404,17 +1310,12 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
{
OffsetNumber maxoff;
OffsetNumber i;
Datum datum;
float usize;
OffsetNumber which;
float sum_grow,
which_grow[INDEX_MAX_KEYS];
GISTENTRY entry,
identry[INDEX_MAX_KEYS];
bool IsNull,
decompvec[INDEX_MAX_KEYS],
isnull[INDEX_MAX_KEYS];
int j;
bool isnull[INDEX_MAX_KEYS];
maxoff = PageGetMaxOffsetNumber(p);
*which_grow = -1.0;
......@@ -1422,21 +1323,26 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
sum_grow = 1;
gistDeCompressAtt(giststate, r,
it, NULL, (OffsetNumber) 0,
identry, decompvec, isnull);
identry, isnull);
for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
{
int j;
IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
sum_grow = 0;
for (j = 0; j < r->rd_att->natts; j++)
{
datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, j, &entry, datum, r, p, i, ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), FALSE, IsNull);
gistpenalty(giststate, j, &entry, IsNull, &identry[j], isnull[j], &usize);
Datum datum;
float usize;
bool IsNull;
if ((!isAttByVal(giststate, j)) && entry.key != datum)
pfree(DatumGetPointer(entry.key));
datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, j, &entry, datum, r, p, i,
ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
FALSE, IsNull);
gistpenalty(giststate, j, &entry, IsNull,
&identry[j], isnull[j], &usize);
if (which_grow[j] < 0 || usize < which_grow[j])
{
......@@ -1456,24 +1362,9 @@ gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
}
}
gistFreeAtt(r, identry, decompvec);
return which;
}
void
gistfreestack(GISTSTACK *s)
{
GISTSTACK *p;
while (s != NULL)
{
p = s->gs_parent;
pfree(s);
s = p;
}
}
/*
* Retail deletion of a single tuple.
*
......@@ -1593,7 +1484,6 @@ gistbulkdelete(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(result);
}
void
initGISTstate(GISTSTATE *giststate, Relation index)
{
......@@ -1608,22 +1498,22 @@ initGISTstate(GISTSTATE *giststate, Relation index)
for (i = 0; i < index->rd_att->natts; i++)
{
fmgr_info_copy(&(giststate->consistentFn[i]),
index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
index_getprocinfo(index, i + 1, GIST_CONSISTENT_PROC),
CurrentMemoryContext);
fmgr_info_copy(&(giststate->unionFn[i]),
index_getprocinfo(index, i + 1, GIST_UNION_PROC),
CurrentMemoryContext);
fmgr_info_copy(&(giststate->compressFn[i]),
index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
index_getprocinfo(index, i + 1, GIST_COMPRESS_PROC),
CurrentMemoryContext);
fmgr_info_copy(&(giststate->decompressFn[i]),
index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
index_getprocinfo(index, i + 1, GIST_DECOMPRESS_PROC),
CurrentMemoryContext);
fmgr_info_copy(&(giststate->penaltyFn[i]),
index_getprocinfo(index, i + 1, GIST_PENALTY_PROC),
CurrentMemoryContext);
fmgr_info_copy(&(giststate->picksplitFn[i]),
index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
index_getprocinfo(index, i + 1, GIST_PICKSPLIT_PROC),
CurrentMemoryContext);
fmgr_info_copy(&(giststate->equalFn[i]),
index_getprocinfo(index, i + 1, GIST_EQUAL_PROC),
......@@ -1703,11 +1593,8 @@ gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
PointerGetDatum(e)));
/* decompressFn may just return the given pointer */
if (dep != e)
{
gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
dep->bytes, dep->leafkey);
pfree(dep);
}
}
else
gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
......@@ -1732,11 +1619,8 @@ gistcentryinit(GISTSTATE *giststate, int nkey,
PointerGetDatum(e)));
/* compressFn may just return the given pointer */
if (cep != e)
{
gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
cep->bytes, cep->leafkey);
pfree(cep);
}
}
else
gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
......@@ -1746,79 +1630,42 @@ static IndexTuple
gistFormTuple(GISTSTATE *giststate, Relation r,
Datum attdata[], int datumsize[], bool isnull[])
{
IndexTuple tup;
bool whatfree[INDEX_MAX_KEYS];
GISTENTRY centry[INDEX_MAX_KEYS];
Datum compatt[INDEX_MAX_KEYS];
int j;
int i;
for (j = 0; j < r->rd_att->natts; j++)
for (i = 0; i < r->rd_att->natts; i++)
{
if (isnull[j])
{
compatt[j] = (Datum) 0;
whatfree[j] = FALSE;
}
if (isnull[i])
compatt[i] = (Datum) 0;
else
{
gistcentryinit(giststate, j, &centry[j], attdata[j],
gistcentryinit(giststate, i, &centry[i], attdata[i],
NULL, NULL, (OffsetNumber) 0,
datumsize[j], FALSE, FALSE);
compatt[j] = centry[j].key;
if (!isAttByVal(giststate, j))
{
whatfree[j] = TRUE;
if (centry[j].key != attdata[j])
pfree(DatumGetPointer(attdata[j]));
}
else
whatfree[j] = FALSE;
datumsize[i], FALSE, FALSE);
compatt[i] = centry[i].key;
}
}
tup = index_form_tuple(giststate->tupdesc, compatt, isnull);
for (j = 0; j < r->rd_att->natts; j++)
if (whatfree[j])
pfree(DatumGetPointer(compatt[j]));
return tup;
return index_form_tuple(giststate->tupdesc, compatt, isnull);
}
static void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
OffsetNumber o, GISTENTRY *attdata, bool *decompvec, bool *isnull)
OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
int i;
Datum datum;
for (i = 0; i < r->rd_att->natts; i++)
{
datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
gistdentryinit(giststate, i, &attdata[i],
datum, r, p, o,
ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]), FALSE, isnull[i]);
if (isAttByVal(giststate, i))
decompvec[i] = FALSE;
else
{
if (attdata[i].key == datum || isnull[i])
decompvec[i] = FALSE;
else
decompvec[i] = TRUE;
}
ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
FALSE, isnull[i]);
}
}
static void
gistFreeAtt(Relation r, GISTENTRY *attdata, bool *decompvec)
{
int i;
for (i = 0; i < r->rd_att->natts; i++)
if (decompvec[i])
pfree(DatumGetPointer(attdata[i].key));
}
static void
gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.45 2005/03/27 23:52:55 tgl Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.46 2005/05/17 00:59:30 neilc Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -16,41 +16,71 @@
#include "access/gist.h"
#include "executor/execdebug.h"
#include "utils/memutils.h"
static OffsetNumber gistfindnext(IndexScanDesc s, Page p, OffsetNumber n,
ScanDirection dir);
static bool gistscancache(IndexScanDesc s, ScanDirection dir);
static bool gistfirst(IndexScanDesc s, ScanDirection dir);
static bool gistnext(IndexScanDesc s, ScanDirection dir);
static bool gistindex_keytest(IndexTuple tuple,
int scanKeySize, ScanKey key, GISTSTATE *giststate,
Relation r, Page p, OffsetNumber offset);
static OffsetNumber gistfindnext(IndexScanDesc scan, OffsetNumber n,
ScanDirection dir);
static bool gistnext(IndexScanDesc scan, ScanDirection dir);
static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan,
OffsetNumber offset);
/*
* gistgettuple() -- Get the next tuple in the scan
*/
Datum
gistgettuple(PG_FUNCTION_ARGS)
{
IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
bool res;
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
Page page;
OffsetNumber offnum;
GISTScanOpaque so;
/* if we have it cached in the scan desc, just return the value */
if (gistscancache(s, dir))
PG_RETURN_BOOL(true);
so = (GISTScanOpaque) scan->opaque;
/* not cached, so we'll have to do some work */
if (ItemPointerIsValid(&(s->currentItemData)))
res = gistnext(s, dir);
else
res = gistfirst(s, dir);
PG_RETURN_BOOL(res);
/*
* If we have produced an index tuple in the past and the executor
* has informed us we need to mark it as "killed", do so now.
*
* XXX: right now there is no concurrent access. In the
* future, we should (a) get a read lock on the page (b) check
* that the location of the previously-fetched tuple hasn't
* changed due to concurrent insertions.
*/
if (scan->kill_prior_tuple && ItemPointerIsValid(&(scan->currentItemData)))
{
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->curbuf);
PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;
SetBufferCommitInfoNeedsSave(so->curbuf);
}
/*
* Get the next tuple that matches the search key. If asked to
* skip killed tuples, continue looping until we find a non-killed
* tuple that matches the search key.
*/
for (;;)
{
bool res = gistnext(scan, dir);
if (res == true && scan->ignore_killed_tuples)
{
offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
page = BufferGetPage(so->curbuf);
if (ItemIdDeleted(PageGetItemId(page, offnum)))
continue;
}
PG_RETURN_BOOL(res);
}
}
Datum
gistgetmulti(PG_FUNCTION_ARGS)
{
IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0);
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ItemPointer tids = (ItemPointer) PG_GETARG_POINTER(1);
int32 max_tids = PG_GETARG_INT32(2);
int32 *returned_tids = (int32 *) PG_GETARG_POINTER(3);
......@@ -60,13 +90,10 @@ gistgetmulti(PG_FUNCTION_ARGS)
/* XXX generic implementation: loop around guts of gistgettuple */
while (ntids < max_tids)
{
if (ItemPointerIsValid(&(s->currentItemData)))
res = gistnext(s, ForwardScanDirection);
else
res = gistfirst(s, ForwardScanDirection);
res = gistnext(scan, ForwardScanDirection);
if (!res)
break;
tids[ntids] = s->xs_ctup.t_self;
tids[ntids] = scan->xs_ctup.t_self;
ntids++;
}
......@@ -74,166 +101,123 @@ gistgetmulti(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(res);
}
/*
* Fetch a tuple that matchs the search key; this can be invoked
* either to fetch the first such tuple or subsequent matching
* tuples. Returns true iff a matching tuple was found.
*/
static bool
gistfirst(IndexScanDesc s, ScanDirection dir)
gistnext(IndexScanDesc scan, ScanDirection dir)
{
Buffer b;
Page p;
OffsetNumber n;
OffsetNumber maxoff;
GISTPageOpaque po;
GISTScanOpaque so;
GISTSTACK *stk;
BlockNumber blk;
IndexTuple it;
so = (GISTScanOpaque) s->opaque;
so = (GISTScanOpaque) scan->opaque;
if (ItemPointerIsValid(&scan->currentItemData) == false)
{
/* Being asked to fetch the first entry, so start at the root */
Assert(so->curbuf == InvalidBuffer);
so->curbuf = ReadBuffer(scan->indexRelation, GIST_ROOT_BLKNO);
}
b = ReadBuffer(s->indexRelation, GISTP_ROOT);
p = BufferGetPage(b);
p = BufferGetPage(so->curbuf);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
for (;;)
if (ItemPointerIsValid(&scan->currentItemData) == false)
{
maxoff = PageGetMaxOffsetNumber(p);
if (ScanDirectionIsBackward(dir))
n = gistfindnext(s, p, maxoff, dir);
n = PageGetMaxOffsetNumber(p);
else
n = gistfindnext(s, p, FirstOffsetNumber, dir);
while (n < FirstOffsetNumber || n > maxoff)
{
stk = so->s_stack;
if (stk == NULL)
{
ReleaseBuffer(b);
return false;
}
b = ReleaseAndReadBuffer(b, s->indexRelation, stk->gs_blk);
p = BufferGetPage(b);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
maxoff = PageGetMaxOffsetNumber(p);
if (ScanDirectionIsBackward(dir))
n = OffsetNumberPrev(stk->gs_child);
else
n = OffsetNumberNext(stk->gs_child);
so->s_stack = stk->gs_parent;
pfree(stk);
n = gistfindnext(s, p, n, dir);
}
if (po->flags & F_LEAF)
{
ItemPointerSet(&(s->currentItemData), BufferGetBlockNumber(b), n);
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
s->xs_ctup.t_self = it->t_tid;
ReleaseBuffer(b);
return true;
}
else
{
stk = (GISTSTACK *) palloc(sizeof(GISTSTACK));
stk->gs_child = n;
stk->gs_blk = BufferGetBlockNumber(b);
stk->gs_parent = so->s_stack;
so->s_stack = stk;
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
blk = ItemPointerGetBlockNumber(&(it->t_tid));
b = ReleaseAndReadBuffer(b, s->indexRelation, blk);
p = BufferGetPage(b);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
}
n = FirstOffsetNumber;
}
}
static bool
gistnext(IndexScanDesc s, ScanDirection dir)
{
Buffer b;
Page p;
OffsetNumber n;
OffsetNumber maxoff;
GISTPageOpaque po;
GISTScanOpaque so;
GISTSTACK *stk;
BlockNumber blk;
IndexTuple it;
so = (GISTScanOpaque) s->opaque;
blk = ItemPointerGetBlockNumber(&(s->currentItemData));
n = ItemPointerGetOffsetNumber(&(s->currentItemData));
if (ScanDirectionIsForward(dir))
n = OffsetNumberNext(n);
else
n = OffsetNumberPrev(n);
{
n = ItemPointerGetOffsetNumber(&(scan->currentItemData));
b = ReadBuffer(s->indexRelation, blk);
p = BufferGetPage(b);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
if (ScanDirectionIsBackward(dir))
n = OffsetNumberPrev(n);
else
n = OffsetNumberNext(n);
}
for (;;)
{
maxoff = PageGetMaxOffsetNumber(p);
n = gistfindnext(s, p, n, dir);
n = gistfindnext(scan, n, dir);
while (n < FirstOffsetNumber || n > maxoff)
if (!OffsetNumberIsValid(n))
{
stk = so->s_stack;
if (stk == NULL)
/*
* We ran out of matching index entries on the current
* page, so pop the top stack entry and use it to continue
* the search.
*/
/* If we're out of stack entries, we're done */
if (so->stack == NULL)
{
ReleaseBuffer(b);
ReleaseBuffer(so->curbuf);
so->curbuf = InvalidBuffer;
return false;
}
b = ReleaseAndReadBuffer(b, s->indexRelation, stk->gs_blk);
p = BufferGetPage(b);
stk = so->stack;
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
stk->block);
p = BufferGetPage(so->curbuf);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
maxoff = PageGetMaxOffsetNumber(p);
if (ScanDirectionIsBackward(dir))
n = OffsetNumberPrev(stk->gs_child);
n = OffsetNumberPrev(stk->offset);
else
n = OffsetNumberNext(stk->gs_child);
n = OffsetNumberNext(stk->offset);
so->s_stack = stk->gs_parent;
so->stack = stk->parent;
pfree(stk);
n = gistfindnext(s, p, n, dir);
continue;
}
if (po->flags & F_LEAF)
{
ItemPointerSet(&(s->currentItemData), BufferGetBlockNumber(b), n);
/*
* We've found a matching index entry in a leaf page, so
* return success. Note that we keep "curbuf" pinned so
* that we can efficiently resume the index scan later.
*/
ItemPointerSet(&(scan->currentItemData),
BufferGetBlockNumber(so->curbuf), n);
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
s->xs_ctup.t_self = it->t_tid;
ReleaseBuffer(b);
scan->xs_ctup.t_self = it->t_tid;
return true;
}
else
{
/*
* We've found an entry in an internal node whose key is
* consistent with the search key, so continue the search
* in the pointed-to child node (i.e. we search depth
* first). Push the current node onto the stack so we
* resume searching from this node later.
*/
BlockNumber child_block;
stk = (GISTSTACK *) palloc(sizeof(GISTSTACK));
stk->gs_child = n;
stk->gs_blk = BufferGetBlockNumber(b);
stk->gs_parent = so->s_stack;
so->s_stack = stk;
stk->offset = n;
stk->block = BufferGetBlockNumber(so->curbuf);
stk->parent = so->stack;
so->stack = stk;
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
blk = ItemPointerGetBlockNumber(&(it->t_tid));
child_block = ItemPointerGetBlockNumber(&(it->t_tid));
b = ReleaseAndReadBuffer(b, s->indexRelation, blk);
p = BufferGetPage(b);
so->curbuf = ReleaseAndReadBuffer(so->curbuf, scan->indexRelation,
child_block);
p = BufferGetPage(so->curbuf);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
if (ScanDirectionIsBackward(dir))
......@@ -244,19 +228,34 @@ gistnext(IndexScanDesc s, ScanDirection dir)
}
}
/* Similar to index_keytest, but decompresses the key in the IndexTuple */
/*
* Similar to index_keytest, but first decompress the key in the
* IndexTuple before passing it to the sk_func (and we have previously
* overwritten the sk_func to use the user-defined Consistent method,
* so we actually invoke that). Note that this function is always
* invoked in a short-lived memory context, so we don't need to worry
* about cleaning up allocated memory (either here or in the
* implementation of any Consistent methods).
*/
static bool
gistindex_keytest(IndexTuple tuple,
int scanKeySize,
ScanKey key,
GISTSTATE *giststate,
Relation r,
Page p,
IndexScanDesc scan,
OffsetNumber offset)
{
int keySize = scan->numberOfKeys;
ScanKey key = scan->keyData;
Relation r = scan->indexRelation;
GISTScanOpaque so;
Page p;
GISTSTATE *giststate;
so = (GISTScanOpaque) scan->opaque;
giststate = so->giststate;
p = BufferGetPage(so->curbuf);
IncrIndexProcessed();
while (scanKeySize > 0)
while (keySize > 0)
{
Datum datum;
bool isNull;
......@@ -297,53 +296,57 @@ gistindex_keytest(IndexTuple tuple,
Int32GetDatum(key->sk_strategy),
ObjectIdGetDatum(key->sk_subtype));
/* if index datum had to be decompressed, free it */
if (de.key != datum && !isAttByVal(giststate, key->sk_attno - 1))
if (DatumGetPointer(de.key) != NULL)
pfree(DatumGetPointer(de.key));
if (!DatumGetBool(test))
return false;
scanKeySize--;
keySize--;
key++;
}
return true;
}
/*
* Return the offset of the first index entry that is consistent with
* the search key after offset 'n' in the current page. If there are
* no more consistent entries, return InvalidOffsetNumber.
*/
static OffsetNumber
gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, ScanDirection dir)
gistfindnext(IndexScanDesc scan, OffsetNumber n, ScanDirection dir)
{
OffsetNumber maxoff;
IndexTuple it;
GISTPageOpaque po;
GISTScanOpaque so;
GISTSTATE *giststate;
OffsetNumber maxoff;
IndexTuple it;
GISTPageOpaque po;
GISTScanOpaque so;
MemoryContext oldcxt;
Page p;
so = (GISTScanOpaque) scan->opaque;
p = BufferGetPage(so->curbuf);
maxoff = PageGetMaxOffsetNumber(p);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
so = (GISTScanOpaque) s->opaque;
giststate = so->giststate;
/*
* Make sure we're in a short-lived memory context when we invoke
* a user-supplied GiST method in gistindex_keytest(), so we don't
* leak memory
*/
oldcxt = MemoryContextSwitchTo(so->tempCxt);
/*
* If we modified the index during the scan, we may have a pointer to
* a ghost tuple, before the scan. If this is the case, back up one.
*/
if (so->s_flags & GS_CURBEFORE)
if (so->flags & GS_CURBEFORE)
{
so->s_flags &= ~GS_CURBEFORE;
so->flags &= ~GS_CURBEFORE;
n = OffsetNumberPrev(n);
}
while (n >= FirstOffsetNumber && n <= maxoff)
{
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
if (gistindex_keytest(it,
s->numberOfKeys, s->keyData, giststate,
s->indexRelation, p, n))
if (gistindex_keytest(it, scan, n))
break;
if (ScanDirectionIsBackward(dir))
......@@ -352,28 +355,16 @@ gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, ScanDirection dir)
n = OffsetNumberNext(n);
}
return n;
}
static bool
gistscancache(IndexScanDesc s, ScanDirection dir)
{
Buffer b;
Page p;
OffsetNumber n;
IndexTuple it;
MemoryContextSwitchTo(oldcxt);
MemoryContextReset(so->tempCxt);
if (!(ScanDirectionIsNoMovement(dir)
&& ItemPointerIsValid(&(s->currentItemData))))
return false;
b = ReadBuffer(s->indexRelation,
ItemPointerGetBlockNumber(&(s->currentItemData)));
p = BufferGetPage(b);
n = ItemPointerGetOffsetNumber(&(s->currentItemData));
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
s->xs_ctup.t_self = it->t_tid;
ReleaseBuffer(b);
return true;
/*
* If we found a matching entry, return its offset; otherwise
* return InvalidOffsetNumber to inform the caller to go to the
* next page.
*/
if (n >= FirstOffsetNumber && n <= maxoff)
return n;
else
return InvalidOffsetNumber;
}
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.56 2004/12/31 21:59:10 pgsql Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistscan.c,v 1.57 2005/05/17 00:59:30 neilc Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -17,28 +17,29 @@
#include "access/genam.h"
#include "access/gist.h"
#include "access/gistscan.h"
#include "utils/memutils.h"
#include "utils/resowner.h"
/* routines defined and used here */
static void gistregscan(IndexScanDesc s);
static void gistdropscan(IndexScanDesc s);
static void gistadjone(IndexScanDesc s, int op, BlockNumber blkno,
static void gistregscan(IndexScanDesc scan);
static void gistdropscan(IndexScanDesc scan);
static void gistadjone(IndexScanDesc scan, int op, BlockNumber blkno,
OffsetNumber offnum);
static void adjuststack(GISTSTACK *stk, BlockNumber blkno);
static void adjustiptr(IndexScanDesc s, ItemPointer iptr,
static void adjustiptr(IndexScanDesc scan, ItemPointer iptr,
int op, BlockNumber blkno, OffsetNumber offnum);
static void gistfreestack(GISTSTACK *s);
/*
* Whenever we start a GiST scan in a backend, we register it in private
* space. Then if the GiST index gets updated, we check all registered
* scans and adjust them if the tuple they point at got moved by the
* update. We only need to do this in private space, because when we update
* an GiST we have a write lock on the tree, so no other process can have
* any locks at all on it. A single transaction can have write and read
* locks on the same object, so that's why we need to handle this case.
* Whenever we start a GiST scan in a backend, we register it in
* private space. Then if the GiST index gets updated, we check all
* registered scans and adjust them if the tuple they point at got
* moved by the update. We only need to do this in private space,
* because when we update an GiST we have a write lock on the tree, so
* no other process can have any locks at all on it. A single
* transaction can have write and read locks on the same object, so
* that's why we need to handle this case.
*/
typedef struct GISTScanListData
{
IndexScanDesc gsl_scan;
......@@ -57,65 +58,77 @@ gistbeginscan(PG_FUNCTION_ARGS)
Relation r = (Relation) PG_GETARG_POINTER(0);
int nkeys = PG_GETARG_INT32(1);
ScanKey key = (ScanKey) PG_GETARG_POINTER(2);
IndexScanDesc s;
IndexScanDesc scan;
s = RelationGetIndexScan(r, nkeys, key);
scan = RelationGetIndexScan(r, nkeys, key);
gistregscan(scan);
gistregscan(s);
PG_RETURN_POINTER(s);
PG_RETURN_POINTER(scan);
}
Datum
gistrescan(PG_FUNCTION_ARGS)
{
IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0);
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
ScanKey key = (ScanKey) PG_GETARG_POINTER(1);
GISTScanOpaque p;
GISTScanOpaque so;
int i;
/*
* Clear all the pointers.
*/
ItemPointerSetInvalid(&s->currentItemData);
ItemPointerSetInvalid(&s->currentMarkData);
ItemPointerSetInvalid(&scan->currentItemData);
ItemPointerSetInvalid(&scan->currentMarkData);
p = (GISTScanOpaque) s->opaque;
if (p != NULL)
so = (GISTScanOpaque) scan->opaque;
if (so != NULL)
{
/* rescan an existing indexscan --- reset state */
gistfreestack(p->s_stack);
gistfreestack(p->s_markstk);
p->s_stack = p->s_markstk = NULL;
p->s_flags = 0x0;
gistfreestack(so->stack);
gistfreestack(so->markstk);
so->stack = so->markstk = NULL;
so->flags = 0x0;
/* drop pins on buffers -- no locks held */
if (BufferIsValid(so->curbuf))
{
ReleaseBuffer(so->curbuf);
so->curbuf = InvalidBuffer;
}
if (BufferIsValid(so->markbuf))
{
ReleaseBuffer(so->markbuf);
so->markbuf = InvalidBuffer;
}
}
else
{
/* initialize opaque data */
p = (GISTScanOpaque) palloc(sizeof(GISTScanOpaqueData));
p->s_stack = p->s_markstk = NULL;
p->s_flags = 0x0;
s->opaque = p;
p->giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
initGISTstate(p->giststate, s->indexRelation);
so = (GISTScanOpaque) palloc(sizeof(GISTScanOpaqueData));
so->stack = so->markstk = NULL;
so->flags = 0x0;
so->tempCxt = createTempGistContext();
so->curbuf = so->markbuf = InvalidBuffer;
so->giststate = (GISTSTATE *) palloc(sizeof(GISTSTATE));
initGISTstate(so->giststate, scan->indexRelation);
scan->opaque = so;
}
/* Update scan key, if a new one is given */
if (key && s->numberOfKeys > 0)
if (key && scan->numberOfKeys > 0)
{
memmove(s->keyData,
key,
s->numberOfKeys * sizeof(ScanKeyData));
memmove(scan->keyData, key,
scan->numberOfKeys * sizeof(ScanKeyData));
/*
* Modify the scan key so that the Consistent function is called
* for all comparisons. The original operator is passed to the
* Consistent function in the form of its strategy number, which
* is available from the sk_strategy field, and its subtype from
* the sk_subtype field.
* Modify the scan key so that all the Consistent method is
* called for all comparisons. The original operator is passed
* to the Consistent function in the form of its strategy
* number, which is available from the sk_strategy field, and
* its subtype from the sk_subtype field.
*/
for (i = 0; i < s->numberOfKeys; i++)
s->keyData[i].sk_func = p->giststate->consistentFn[s->keyData[i].sk_attno - 1];
for (i = 0; i < scan->numberOfKeys; i++)
scan->keyData[i].sk_func = so->giststate->consistentFn[scan->keyData[i].sk_attno - 1];
}
PG_RETURN_VOID();
......@@ -124,35 +137,47 @@ gistrescan(PG_FUNCTION_ARGS)
Datum
gistmarkpos(PG_FUNCTION_ARGS)
{
IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque p;
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque so;
GISTSTACK *o,
*n,
*tmp;
s->currentMarkData = s->currentItemData;
p = (GISTScanOpaque) s->opaque;
if (p->s_flags & GS_CURBEFORE)
p->s_flags |= GS_MRKBEFORE;
scan->currentMarkData = scan->currentItemData;
so = (GISTScanOpaque) scan->opaque;
if (so->flags & GS_CURBEFORE)
so->flags |= GS_MRKBEFORE;
else
p->s_flags &= ~GS_MRKBEFORE;
so->flags &= ~GS_MRKBEFORE;
o = NULL;
n = p->s_stack;
n = so->stack;
/* copy the parent stack from the current item data */
while (n != NULL)
{
tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK));
tmp->gs_child = n->gs_child;
tmp->gs_blk = n->gs_blk;
tmp->gs_parent = o;
tmp->offset = n->offset;
tmp->block = n->block;
tmp->parent = o;
o = tmp;
n = n->gs_parent;
n = n->parent;
}
gistfreestack(p->s_markstk);
p->s_markstk = o;
gistfreestack(so->markstk);
so->markstk = o;
/* Update markbuf: make sure to bump ref count on curbuf */
if (BufferIsValid(so->markbuf))
{
ReleaseBuffer(so->markbuf);
so->markbuf = InvalidBuffer;
}
if (BufferIsValid(so->curbuf))
{
IncrBufferRefCount(so->curbuf);
so->markbuf = so->curbuf;
}
PG_RETURN_VOID();
}
......@@ -160,35 +185,47 @@ gistmarkpos(PG_FUNCTION_ARGS)
Datum
gistrestrpos(PG_FUNCTION_ARGS)
{
IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque p;
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque so;
GISTSTACK *o,
*n,
*tmp;
s->currentItemData = s->currentMarkData;
p = (GISTScanOpaque) s->opaque;
if (p->s_flags & GS_MRKBEFORE)
p->s_flags |= GS_CURBEFORE;
scan->currentItemData = scan->currentMarkData;
so = (GISTScanOpaque) scan->opaque;
if (so->flags & GS_MRKBEFORE)
so->flags |= GS_CURBEFORE;
else
p->s_flags &= ~GS_CURBEFORE;
so->flags &= ~GS_CURBEFORE;
o = NULL;
n = p->s_markstk;
n = so->markstk;
/* copy the parent stack from the current item data */
while (n != NULL)
{
tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK));
tmp->gs_child = n->gs_child;
tmp->gs_blk = n->gs_blk;
tmp->gs_parent = o;
tmp->offset = n->offset;
tmp->block = n->block;
tmp->parent = o;
o = tmp;
n = n->gs_parent;
n = n->parent;
}
gistfreestack(p->s_stack);
p->s_stack = o;
gistfreestack(so->stack);
so->stack = o;
/* Update curbuf: be sure to bump ref count on markbuf */
if (BufferIsValid(so->curbuf))
{
ReleaseBuffer(so->curbuf);
so->curbuf = InvalidBuffer;
}
if (BufferIsValid(so->markbuf))
{
IncrBufferRefCount(so->markbuf);
so->curbuf = so->markbuf;
}
PG_RETURN_VOID();
}
......@@ -196,52 +233,57 @@ gistrestrpos(PG_FUNCTION_ARGS)
Datum
gistendscan(PG_FUNCTION_ARGS)
{
IndexScanDesc s = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque p;
IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
GISTScanOpaque so;
p = (GISTScanOpaque) s->opaque;
so = (GISTScanOpaque) scan->opaque;
if (p != NULL)
if (so != NULL)
{
gistfreestack(p->s_stack);
gistfreestack(p->s_markstk);
if (p->giststate != NULL)
freeGISTstate(p->giststate);
pfree(s->opaque);
gistfreestack(so->stack);
gistfreestack(so->markstk);
if (so->giststate != NULL)
freeGISTstate(so->giststate);
/* drop pins on buffers -- we aren't holding any locks */
if (BufferIsValid(so->curbuf))
ReleaseBuffer(so->curbuf);
if (BufferIsValid(so->markbuf))
ReleaseBuffer(so->markbuf);
MemoryContextDelete(so->tempCxt);
pfree(scan->opaque);
}
gistdropscan(s);
/* XXX don't unset read lock -- two-phase locking */
gistdropscan(scan);
PG_RETURN_VOID();
}
static void
gistregscan(IndexScanDesc s)
gistregscan(IndexScanDesc scan)
{
GISTScanList l;
l = (GISTScanList) palloc(sizeof(GISTScanListData));
l->gsl_scan = s;
l->gsl_scan = scan;
l->gsl_owner = CurrentResourceOwner;
l->gsl_next = GISTScans;
GISTScans = l;
}
static void
gistdropscan(IndexScanDesc s)
gistdropscan(IndexScanDesc scan)
{
GISTScanList l;
GISTScanList prev;
prev = NULL;
for (l = GISTScans; l != NULL && l->gsl_scan != s; l = l->gsl_next)
for (l = GISTScans; l != NULL && l->gsl_scan != scan; l = l->gsl_next)
prev = l;
if (l == NULL)
elog(ERROR, "GiST scan list corrupted -- could not find 0x%p",
(void *) s);
(void *) scan);
if (prev == NULL)
GISTScans = l->gsl_next;
......@@ -313,22 +355,22 @@ gistadjscans(Relation rel, int op, BlockNumber blkno, OffsetNumber offnum)
* update. If so, we make the change here.
*/
static void
gistadjone(IndexScanDesc s,
gistadjone(IndexScanDesc scan,
int op,
BlockNumber blkno,
OffsetNumber offnum)
{
GISTScanOpaque so;
adjustiptr(s, &(s->currentItemData), op, blkno, offnum);
adjustiptr(s, &(s->currentMarkData), op, blkno, offnum);
adjustiptr(scan, &(scan->currentItemData), op, blkno, offnum);
adjustiptr(scan, &(scan->currentMarkData), op, blkno, offnum);
so = (GISTScanOpaque) s->opaque;
so = (GISTScanOpaque) scan->opaque;
if (op == GISTOP_SPLIT)
{
adjuststack(so->s_stack, blkno);
adjuststack(so->s_markstk, blkno);
adjuststack(so->stack, blkno);
adjuststack(so->markstk, blkno);
}
}
......@@ -340,7 +382,7 @@ gistadjone(IndexScanDesc s,
* the same page.
*/
static void
adjustiptr(IndexScanDesc s,
adjustiptr(IndexScanDesc scan,
ItemPointer iptr,
int op,
BlockNumber blkno,
......@@ -354,7 +396,7 @@ adjustiptr(IndexScanDesc s,
if (ItemPointerGetBlockNumber(iptr) == blkno)
{
curoff = ItemPointerGetOffsetNumber(iptr);
so = (GISTScanOpaque) s->opaque;
so = (GISTScanOpaque) scan->opaque;
switch (op)
{
......@@ -362,7 +404,6 @@ adjustiptr(IndexScanDesc s,
/* back up one if we need to */
if (curoff >= offnum)
{
if (curoff > FirstOffsetNumber)
{
/* just adjust the item pointer */
......@@ -375,10 +416,10 @@ adjustiptr(IndexScanDesc s,
* tuple
*/
ItemPointerSet(iptr, blkno, FirstOffsetNumber);
if (iptr == &(s->currentItemData))
so->s_flags |= GS_CURBEFORE;
if (iptr == &(scan->currentItemData))
so->flags |= GS_CURBEFORE;
else
so->s_flags |= GS_MRKBEFORE;
so->flags |= GS_MRKBEFORE;
}
}
break;
......@@ -386,10 +427,10 @@ adjustiptr(IndexScanDesc s,
case GISTOP_SPLIT:
/* back to start of page on split */
ItemPointerSet(iptr, blkno, FirstOffsetNumber);
if (iptr == &(s->currentItemData))
so->s_flags &= ~GS_CURBEFORE;
if (iptr == &(scan->currentItemData))
so->flags &= ~GS_CURBEFORE;
else
so->s_flags &= ~GS_MRKBEFORE;
so->flags &= ~GS_MRKBEFORE;
break;
default:
......@@ -417,9 +458,20 @@ adjuststack(GISTSTACK *stk, BlockNumber blkno)
{
while (stk != NULL)
{
if (stk->gs_blk == blkno)
stk->gs_child = FirstOffsetNumber;
if (stk->block == blkno)
stk->offset = FirstOffsetNumber;
stk = stk->parent;
}
}
stk = stk->gs_parent;
static void
gistfreestack(GISTSTACK *s)
{
while (s != NULL)
{
GISTSTACK *p = s->parent;
pfree(s);
s = p;
}
}
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/gist.h,v 1.44 2005/03/27 23:53:04 tgl Exp $
* $PostgreSQL: pgsql/src/include/access/gist.h,v 1.45 2005/05/17 00:59:30 neilc Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -54,13 +54,21 @@ typedef GISTPageOpaqueData *GISTPageOpaque;
#define GIST_LEAF(entry) (((GISTPageOpaque) PageGetSpecialPointer((entry)->page))->flags & F_LEAF)
/*
* When we descend a tree, we keep a stack of parent pointers.
* When we descend a tree, we keep a stack of parent pointers. This
* allows us to follow a chain of internal node points until we reach
* a leaf node, and then back up the stack to re-examine the internal
* nodes.
*
* 'parent' is the previous stack entry -- i.e. the node we arrived
* from. 'block' is the node's block number. 'offset' is the offset in
* the node's page that we stopped at (i.e. we followed the child
* pointer located at the specified offset).
*/
typedef struct GISTSTACK
{
struct GISTSTACK *gs_parent;
OffsetNumber gs_child;
BlockNumber gs_blk;
struct GISTSTACK *parent;
OffsetNumber offset;
BlockNumber block;
} GISTSTACK;
typedef struct GISTSTATE
......@@ -84,10 +92,13 @@ typedef struct GISTSTATE
*/
typedef struct GISTScanOpaqueData
{
struct GISTSTACK *s_stack;
struct GISTSTACK *s_markstk;
uint16 s_flags;
struct GISTSTATE *giststate;
GISTSTACK *stack;
GISTSTACK *markstk;
uint16 flags;
GISTSTATE *giststate;
MemoryContext tempCxt;
Buffer curbuf;
Buffer markbuf;
} GISTScanOpaqueData;
typedef GISTScanOpaqueData *GISTScanOpaque;
......@@ -101,8 +112,8 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
#define GS_CURBEFORE ((uint16) (1 << 0))
#define GS_MRKBEFORE ((uint16) (1 << 1))
/* root page of a gist */
#define GISTP_ROOT 0
/* root page of a gist index */
#define GIST_ROOT_BLKNO 0
/*
* When we update a relation on which we're doing a scan, we need to
......@@ -183,7 +194,6 @@ extern Datum gistbuild(PG_FUNCTION_ARGS);
extern Datum gistinsert(PG_FUNCTION_ARGS);
extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
extern void _gistdump(Relation r);
extern void gistfreestack(GISTSTACK *s);
extern void initGISTstate(GISTSTATE *giststate, Relation index);
extern void freeGISTstate(GISTSTATE *giststate);
extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
......@@ -193,6 +203,7 @@ extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_undo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(char *buf, uint8 xl_info, char *rec);
extern MemoryContext createTempGistContext(void);
/* gistget.c */
extern Datum gistgettuple(PG_FUNCTION_ARGS);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment