Commit e78fe652 authored by Marc G. Fournier's avatar Marc G. Fournier

Oops, thanks to Dan McGuirk for pointing out that I missed part of

the commit :(

Here's the rest of the GiST code thta was missing...
parent 2fd6061e
#-------------------------------------------------------------------------
#
# Makefile.inc--
# Makefile for access/rtree (R-Tree access method)
#
# Copyright (c) 1994, Regents of the University of California
#
#
# IDENTIFICATION
# /usr/local/devel/pglite/cvs/src/backend/access/rtree/Makefile.inc,v 1.2 1995/03/21 06:50:48 andrew Exp
#
#-------------------------------------------------------------------------
SUBSRCS+= gist.c gistget.c gistscan.c giststrat.c
/*-------------------------------------------------------------------------
*
* gist.c--
* interface routines for the postgres GiST index access method.
*
*
*
* IDENTIFICATION
*
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/elog.h"
#include "utils/palloc.h"
#include "utils/rel.h"
#include "utils/excid.h"
#include "access/heapam.h"
#include "access/genam.h"
#include "access/gist.h"
#include "access/gistscan.h"
#include "access/funcindex.h"
#include "access/tupdesc.h"
#include "nodes/execnodes.h"
#include "nodes/plannodes.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
#include "catalog/index.h"
/* non-export function prototypes */
static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup,
GISTSTATE *GISTstate);
static InsertIndexResult gistentryinsert(Relation r, GISTSTACK *stk,
IndexTuple tup,
GISTSTATE *giststate);
static void gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, GISTSTATE *giststate);
static void gistAdjustKeys(Relation r, GISTSTACK *stk, BlockNumber blk,
char *datum, int att_size, GISTSTATE *giststate);
static void gistintinsert(Relation r, GISTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, GISTSTATE *giststate);
static InsertIndexResult gistSplit(Relation r, Buffer buffer,
GISTSTACK *stack, IndexTuple itup,
GISTSTATE *giststate);
static void gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple lt,
IndexTuple rt);
static void GISTInitBuffer(Buffer b, uint32 f);
static BlockNumber gistChooseSubtree(Relation r, IndexTuple itup, int level,
GISTSTATE *giststate,
GISTSTACK **retstack, Buffer *leafbuf);
static OffsetNumber gistchoose(Relation r, Page p, IndexTuple it,
GISTSTATE *giststate);
static int gistnospace(Page p, IndexTuple it);
void gistdelete(Relation r, ItemPointer tid);
static IndexTuple gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t);
/*
** routine to build an index. Basically calls insert over and over
*/
void
gistbuild(Relation heap,
Relation index,
int natts,
AttrNumber *attnum,
IndexStrategy istrat,
uint16 pint,
Datum *params,
FuncIndexInfo *finfo,
PredInfo *predInfo)
{
HeapScanDesc scan;
Buffer buffer;
AttrNumber i;
HeapTuple htup;
IndexTuple itup;
TupleDesc hd, id;
InsertIndexResult res;
Datum *d;
bool *nulls;
int nb, nh, ni;
ExprContext *econtext;
TupleTable tupleTable;
TupleTableSlot *slot;
Oid hrelid, irelid;
Node *pred, *oldPred;
GISTSTATE giststate;
GISTENTRY tmpcentry;
bool *compvec;
/* GiSTs only know how to do stupid locking now */
RelationSetLockForWrite(index);
setheapoverride(TRUE); /* so we can see the new pg_index tuple */
initGISTstate(&giststate, index);
setheapoverride(FALSE);
pred = predInfo->pred;
oldPred = predInfo->oldPred;
/*
* We expect to be called exactly once for any index relation.
* If that's not the case, big trouble's what we have.
*/
if (oldPred == NULL && (nb = RelationGetNumberOfBlocks(index)) != 0)
elog(WARN, "%.16s already contains data", &(index->rd_rel->relname.data[0]));
/* initialize the root page (if this is a new index) */
if (oldPred == NULL) {
buffer = ReadBuffer(index, P_NEW);
GISTInitBuffer(buffer, F_LEAF);
WriteBuffer(buffer);
}
/* init the tuple descriptors and get set for a heap scan */
hd = RelationGetTupleDescriptor(heap);
id = RelationGetTupleDescriptor(index);
d = (Datum *)palloc(natts * sizeof (*d));
nulls = (bool *)palloc(natts * sizeof (*nulls));
/*
* If this is a predicate (partial) index, we will need to evaluate the
* predicate using ExecQual, which requires the current tuple to be in a
* slot of a TupleTable. In addition, ExecQual must have an ExprContext
* referring to that slot. Here, we initialize dummy TupleTable and
* ExprContext objects for this purpose. --Nels, Feb '92
*/
#ifndef OMIT_PARTIAL_INDEX
if (pred != NULL || oldPred != NULL) {
tupleTable = ExecCreateTupleTable(1);
slot = ExecAllocTableSlot(tupleTable);
econtext = makeNode(ExprContext);
FillDummyExprContext(econtext, slot, hd, buffer);
}
#endif /* OMIT_PARTIAL_INDEX */
scan = heap_beginscan(heap, 0, NowTimeQual, 0, (ScanKey) NULL);
htup = heap_getnext(scan, 0, &buffer);
/* int the tuples as we insert them */
nh = ni = 0;
for (; HeapTupleIsValid(htup); htup = heap_getnext(scan, 0, &buffer)) {
nh++;
/*
* If oldPred != NULL, this is an EXTEND INDEX command, so skip
* this tuple if it was already in the existing partial index
*/
if (oldPred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
/*SetSlotContents(slot, htup); */
slot->val = htup;
if (ExecQual((List*)oldPred, econtext) == true) {
ni++;
continue;
}
#endif /* OMIT_PARTIAL_INDEX */
}
/* Skip this tuple if it doesn't satisfy the partial-index predicate */
if (pred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
/*SetSlotContents(slot, htup); */
slot->val = htup;
if (ExecQual((List*)pred, econtext) == false)
continue;
#endif /* OMIT_PARTIAL_INDEX */
}
ni++;
/*
* For the current heap tuple, extract all the attributes
* we use in this index, and note which are null.
*/
for (i = 1; i <= natts; i++) {
int attoff;
bool attnull;
/*
* Offsets are from the start of the tuple, and are
* zero-based; indices are one-based. The next call
* returns i - 1. That's data hiding for you.
*/
attoff = AttrNumberGetAttrOffset(i);
/*
d[attoff] = HeapTupleGetAttributeValue(htup, buffer,
*/
d[attoff] = GetIndexValue(htup,
hd,
attoff,
attnum,
finfo,
&attnull,
buffer);
nulls[attoff] = (attnull ? 'n' : ' ');
}
/* immediately compress keys to normalize */
compvec = (bool *)palloc(sizeof(bool) * natts);
for (i = 0; i < natts; i++) {
gistcentryinit(&giststate, &tmpcentry, (char *)d[i],
(Relation) NULL, (Page) NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */, TRUE);
if (d[i] != (Datum)tmpcentry.pred && !(giststate.keytypbyval))
compvec[i] = TRUE;
else compvec[i] = FALSE;
d[i] = (Datum)tmpcentry.pred;
}
/* form an index tuple and point it at the heap tuple */
itup = index_formtuple(id, &d[0], nulls);
itup->t_tid = htup->t_ctid;
/*
* Since we already have the index relation locked, we
* call gistdoinsert directly. Normal access method calls
* dispatch through gistinsert, which locks the relation
* for write. This is the right thing to do if you're
* inserting single tups, but not when you're initializing
* the whole index at once.
*/
res = gistdoinsert(index, itup, &giststate);
for (i = 0; i < natts; i++)
if (compvec[i] == TRUE) pfree((char *)d[i]);
pfree(itup);
pfree(res);
pfree(compvec);
}
/* okay, all heap tuples are indexed */
heap_endscan(scan);
RelationUnsetLockForWrite(index);
if (pred != NULL || oldPred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
ExecDestroyTupleTable(tupleTable, true);
pfree(econtext);
#endif /* OMIT_PARTIAL_INDEX */
}
/*
* Since we just inted the tuples in the heap, we update its
* stats in pg_relation to guarantee that the planner takes
* advantage of the index we just created. UpdateStats() does a
* CommandinterIncrement(), which flushes changed entries from
* the system relcache. The act of constructing an index changes
* these heap and index tuples in the system catalogs, so they
* need to be flushed. We close them to guarantee that they
* will be.
*/
hrelid = heap->rd_id;
irelid = index->rd_id;
heap_close(heap);
index_close(index);
UpdateStats(hrelid, nh, true);
UpdateStats(irelid, ni, false);
if (oldPred != NULL) {
if (ni == nh) pred = NULL;
UpdateIndexPredicate(irelid, oldPred, pred);
}
/* be tidy */
pfree(nulls);
pfree(d);
}
/*
* gistinsert -- wrapper for GiST tuple insertion.
*
* This is the public interface routine for tuple insertion in GiSTs.
* It doesn't do any work; just locks the relation and passes the buck.
*/
InsertIndexResult
gistinsert(Relation r, Datum *datum, char *nulls, ItemPointer ht_ctid)
{
InsertIndexResult res;
IndexTuple itup;
GISTSTATE giststate;
GISTENTRY tmpentry;
int i;
bool *compvec;
initGISTstate(&giststate, r);
/* immediately compress keys to normalize */
compvec = (bool *)palloc(sizeof(bool) * r->rd_att->natts);
for (i = 0; i < r->rd_att->natts; i++) {
gistcentryinit(&giststate, &tmpentry, (char *)datum[i],
(Relation) NULL, (Page) NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */, TRUE);
if (datum[i] != (Datum)tmpentry.pred && !(giststate.keytypbyval))
compvec[i] = TRUE;
else compvec[i] = FALSE;
datum[i] = (Datum)tmpentry.pred;
}
itup = index_formtuple(RelationGetTupleDescriptor(r), datum, nulls);
itup->t_tid = *ht_ctid;
RelationSetLockForWrite(r);
res = gistdoinsert(r, itup, &giststate);
for (i = 0; i < r->rd_att->natts; i++)
if (compvec[i] == TRUE) pfree((char *)datum[i]);
pfree(itup);
pfree(compvec);
/* XXX two-phase locking -- don't unlock the relation until EOT */
return (res);
}
/*
** Take a compressed entry, and install it on a page. Since we now know
** where the entry will live, we decompress it and recompress it using
** that knowledge (some compression routines may want to fish around
** on the page, for example, or do something special for leaf nodes.)
*/
static OffsetNumber
gistPageAddItem(GISTSTATE *giststate,
Relation r,
Page page,
Item item,
Size size,
OffsetNumber offsetNumber,
ItemIdFlags flags,
GISTENTRY *dentry,
IndexTuple *newtup)
{
GISTENTRY tmpcentry;
IndexTuple itup = (IndexTuple)item;
/* recompress the item given that we now know the exact page and
offset for insertion */
gistdentryinit(giststate, dentry,
(((char *) itup) + sizeof(IndexTupleData)),
(Relation)0, (Page)0, (OffsetNumber)InvalidOffsetNumber,
IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE);
gistcentryinit(giststate, &tmpcentry, dentry->pred, r, page,
offsetNumber, dentry->bytes, FALSE);
*newtup = gist_tuple_replacekey(r, *dentry, itup);
/* be tidy */
if (tmpcentry.pred != dentry->pred
&& tmpcentry.pred != (((char *) itup) + sizeof(IndexTupleData)))
pfree(tmpcentry.pred);
return(PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
offsetNumber, flags));
}
static InsertIndexResult
gistdoinsert(Relation r,
IndexTuple itup, /* itup contains compressed entry */
GISTSTATE *giststate)
{
char *datum, *newdatum;
GISTENTRY entry, tmpdentry;
InsertIndexResult res;
OffsetNumber l;
GISTSTACK *stack, *tmpstk;
Buffer buffer;
BlockNumber blk;
Page page;
OffsetNumber off;
IndexTuple newtup;
/* 3rd arg is ignored for now */
blk = gistChooseSubtree(r, itup, 0, giststate, &stack, &buffer);
page = (Page) BufferGetPage(buffer);
if (gistnospace(page, itup)) {
/* need to do a split */
res = gistSplit(r, buffer, stack, itup, giststate);
gistfreestack(stack);
WriteBuffer(buffer); /* don't forget to release buffer! */
return (res);
}
if (PageIsEmpty(page))
off = FirstOffsetNumber;
else
off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
/* add the item and write the buffer */
l = gistPageAddItem(giststate, r, page, (Item) itup, IndexTupleSize(itup),
off, LP_USED, &tmpdentry, &newtup);
WriteBuffer(buffer);
/* now expand the page boundary in the parent to include the new child */
gistAdjustKeys(r, stack, blk, tmpdentry.pred, tmpdentry.bytes, giststate);
gistfreestack(stack);
/* be tidy */
if (itup != newtup)
pfree(newtup);
if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData)))
pfree(tmpdentry.pred);
/* build and return an InsertIndexResult for this insertion */
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
ItemPointerSet(&(res->pointerData), blk, l);
return (res);
}
static BlockNumber
gistChooseSubtree(Relation r, IndexTuple itup, /* itup has compressed entry */
int level,
GISTSTATE *giststate,
GISTSTACK **retstack /*out*/,
Buffer *leafbuf /*out*/)
{
Buffer buffer;
BlockNumber blk;
GISTSTACK *stack;
Page page;
GISTPageOpaque opaque;
IndexTuple which;
blk = GISTP_ROOT;
buffer = InvalidBuffer;
stack = (GISTSTACK *) NULL;
do {
/* let go of current buffer before getting next */
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer);
/* get next buffer */
buffer = ReadBuffer(r, blk);
page = (Page) BufferGetPage(buffer);
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
if (!(opaque->flags & F_LEAF)) {
GISTSTACK *n;
ItemId iid;
n = (GISTSTACK *) palloc(sizeof(GISTSTACK));
n->gs_parent = stack;
n->gs_blk = blk;
n->gs_child = gistchoose(r, page, itup, giststate);
stack = n;
iid = PageGetItemId(page, n->gs_child);
which = (IndexTuple) PageGetItem(page, iid);
blk = ItemPointerGetBlockNumber(&(which->t_tid));
}
} while (!(opaque->flags & F_LEAF));
*retstack = stack;
*leafbuf = buffer;
return(blk);
}
static void
gistAdjustKeys(Relation r,
GISTSTACK *stk,
BlockNumber blk,
char *datum, /* datum is uncompressed */
int att_size,
GISTSTATE *giststate)
{
char *oldud;
Page p;
Buffer b;
bool result;
bytea *evec;
GISTENTRY centry, *ev0p, *ev1p, *dentryp;
int size, datumsize;
IndexTuple tid;
if (stk == (GISTSTACK *) NULL)
return;
b = ReadBuffer(r, stk->gs_blk);
p = BufferGetPage(b);
oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->gs_child));
tid = (IndexTuple) oldud;
size = IndexTupleSize((IndexTuple)oldud) - sizeof(IndexTupleData);
oldud += sizeof(IndexTupleData);
evec = (bytea *) palloc(2*sizeof(GISTENTRY) + VARHDRSZ);
VARSIZE(evec) = 2*sizeof(GISTENTRY) + VARHDRSZ;
/* insert decompressed oldud into entry vector */
gistdentryinit(giststate, &((GISTENTRY *)VARDATA(evec))[0],
oldud, r, p, stk->gs_child,
size, FALSE);
ev0p = &((GISTENTRY *)VARDATA(evec))[0];
/* insert datum entry into entry vector */
gistentryinit(((GISTENTRY *)VARDATA(evec))[1], datum,
(Relation)NULL,(Page)NULL,(OffsetNumber)0, att_size, FALSE);
ev1p = &((GISTENTRY *)VARDATA(evec))[1];
/* form union of decompressed entries */
datum = (char *) (giststate->unionFn)(evec, &datumsize);
/* did union leave decompressed version of oldud unchanged? */
(giststate->equalFn)(ev0p->pred, datum, &result);
if (!result) {
TupleDesc td = RelationGetTupleDescriptor(r);
/* compress datum for storage on page */
gistcentryinit(giststate, &centry, datum, ev0p->rel, ev0p->page,
ev0p->offset, datumsize, FALSE);
if (td->attrs[0]->attlen >= 0) {
memmove(oldud, centry.pred, att_size);
gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size,
giststate);
}
else if (VARSIZE(centry.pred) == VARSIZE(oldud)) {
memmove(oldud, centry.pred, VARSIZE(centry.pred));
gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size,
giststate);
}
else {
/*
** new datum is not the same size as the old.
** We have to delete the old entry and insert the new
** one. Note that this may cause a split here!
*/
IndexTuple newtup;
ItemPointerData oldtid;
char *isnull;
TupleDesc tupDesc;
InsertIndexResult res;
/* delete old tuple */
ItemPointerSet(&oldtid, stk->gs_blk, stk->gs_child);
gistdelete(r, (ItemPointer)&oldtid);
/* generate and insert new tuple */
tupDesc = r->rd_att;
isnull = (char *) palloc(r->rd_rel->relnatts);
memset(isnull, ' ', r->rd_rel->relnatts);
newtup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &centry.pred, isnull);
pfree(isnull);
/* set pointer in new tuple to point to current child */
ItemPointerSet(&oldtid, blk, 1);
newtup->t_tid = oldtid;
/* inserting the new entry also adjust keys above */
res = gistentryinsert(r, stk, newtup, giststate);
/* in stack, set info to point to new tuple */
stk->gs_blk = ItemPointerGetBlockNumber(&(res->pointerData));
stk->gs_child = ItemPointerGetOffsetNumber(&(res->pointerData));
pfree(res);
}
WriteBuffer(b);
if (centry.pred != datum)
pfree(datum);
}
else {
ReleaseBuffer(b);
}
pfree(evec);
}
/*
* gistSplit -- split a page in the tree.
*
*/
static InsertIndexResult
gistSplit(Relation r,
Buffer buffer,
GISTSTACK *stack,
IndexTuple itup, /* contains compressed entry */
GISTSTATE *giststate)
{
Page p;
Buffer leftbuf, rightbuf;
Page left, right;
ItemId itemid;
IndexTuple item;
IndexTuple ltup, rtup, newtup;
OffsetNumber maxoff;
OffsetNumber i;
OffsetNumber leftoff, rightoff;
BlockNumber lbknum, rbknum;
BlockNumber bufblock;
GISTPageOpaque opaque;
int blank;
InsertIndexResult res;
char *isnull;
GIST_SPLITVEC v;
TupleDesc tupDesc;
bytea *entryvec;
bool *decompvec;
IndexTuple item_1;
GISTENTRY tmpdentry, tmpentry;
char *datum;
isnull = (char *) palloc(r->rd_rel->relnatts);
for (blank = 0; blank < r->rd_rel->relnatts; blank++)
isnull[blank] = ' ';
p = (Page) BufferGetPage(buffer);
opaque = (GISTPageOpaque) PageGetSpecialPointer(p);
/*
* The root of the tree is the first block in the relation. If
* we're about to split the root, we need to do some hocus-pocus
* to enforce this guarantee.
*/
if (BufferGetBlockNumber(buffer) == GISTP_ROOT) {
leftbuf = ReadBuffer(r, P_NEW);
GISTInitBuffer(leftbuf, opaque->flags);
lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf);
} else {
leftbuf = buffer;
IncrBufferRefCount(buffer);
lbknum = BufferGetBlockNumber(buffer);
left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
}
rightbuf = ReadBuffer(r, P_NEW);
GISTInitBuffer(rightbuf, opaque->flags);
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
/* generate the item array */
maxoff = PageGetMaxOffsetNumber(p);
entryvec = (bytea *)palloc(VARHDRSZ + (maxoff + 2) * sizeof(GISTENTRY));
decompvec = (bool *)palloc(VARHDRSZ + (maxoff + 2) * sizeof(bool));
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
item_1 = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
gistdentryinit(giststate, &((GISTENTRY *)VARDATA(entryvec))[i],
(((char *) item_1) + sizeof(IndexTupleData)),
r, p, i,
IndexTupleSize(item_1) - sizeof(IndexTupleData), FALSE);
if ((char *)(((GISTENTRY *)VARDATA(entryvec))[i].pred)
== (((char *) item_1) + sizeof(IndexTupleData)))
decompvec[i] = FALSE;
else decompvec[i] = TRUE;
}
/* add the new datum as the last entry */
gistdentryinit(giststate, &(((GISTENTRY *)VARDATA(entryvec))[maxoff+1]),
(((char *) itup) + sizeof(IndexTupleData)),
(Relation)NULL, (Page)NULL,
(OffsetNumber)0, tmpentry.bytes, FALSE);
if ((char *)(((GISTENTRY *)VARDATA(entryvec))[maxoff+1]).pred !=
(((char *) itup) + sizeof(IndexTupleData)))
decompvec[maxoff+1] = TRUE;
else decompvec[maxoff+1] = FALSE;
VARSIZE(entryvec) = (maxoff + 2) * sizeof(GISTENTRY) + VARHDRSZ;
/* now let the user-defined picksplit function set up the split vector */
(giststate->picksplitFn)(entryvec, &v);
/* compress ldatum and rdatum */
gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation)NULL,
(Page)NULL, (OffsetNumber)0,
((GISTENTRY *)VARDATA(entryvec))[i].bytes, FALSE);
if (v.spl_ldatum != tmpentry.pred)
pfree(v.spl_ldatum);
v.spl_ldatum = tmpentry.pred;
gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation)NULL,
(Page)NULL, (OffsetNumber)0,
((GISTENTRY *)VARDATA(entryvec))[i].bytes, FALSE);
if (v.spl_rdatum != tmpentry.pred)
pfree(v.spl_rdatum);
v.spl_rdatum = tmpentry.pred;
/* clean up the entry vector: its preds need to be deleted, too */
for (i = FirstOffsetNumber; i <= maxoff+1; i = OffsetNumberNext(i))
if (decompvec[i])
pfree(((GISTENTRY *)VARDATA(entryvec))[i].pred);
pfree(entryvec);
pfree(decompvec);
leftoff = rightoff = FirstOffsetNumber;
maxoff = PageGetMaxOffsetNumber(p);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
itemid = PageGetItemId(p, i);
item = (IndexTuple) PageGetItem(p, itemid);
if (i == *(v.spl_left)) {
(void) gistPageAddItem(giststate, r, left, (Item) item,
IndexTupleSize(item),
leftoff, LP_USED, &tmpdentry, &newtup);
leftoff = OffsetNumberNext(leftoff);
v.spl_left++; /* advance in left split vector */
/* be tidy */
if (tmpdentry.pred != (((char *) item) + sizeof(IndexTupleData)))
pfree(tmpdentry.pred);
if ((IndexTuple)item != newtup)
pfree(newtup);
}
else {
(void) gistPageAddItem(giststate, r, right, (Item) item,
IndexTupleSize(item),
rightoff, LP_USED, &tmpdentry, &newtup);
rightoff = OffsetNumberNext(rightoff);
v.spl_right++; /* advance in right split vector */
/* be tidy */
if (tmpdentry.pred != (((char *) item) + sizeof(IndexTupleData)))
pfree(tmpdentry.pred);
if (item != newtup)
pfree(newtup);
}
}
/* build an InsertIndexResult for this insertion */
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
/* now insert the new index tuple */
if (*(v.spl_left) != FirstOffsetNumber) {
(void) gistPageAddItem(giststate, r, left, (Item) itup,
IndexTupleSize(itup),
leftoff, LP_USED, &tmpdentry, &newtup);
leftoff = OffsetNumberNext(leftoff);
ItemPointerSet(&(res->pointerData), lbknum, leftoff);
/* be tidy */
if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData)))
pfree(tmpdentry.pred);
if (itup != newtup)
pfree(newtup);
} else {
(void) gistPageAddItem(giststate, r, right, (Item) itup,
IndexTupleSize(itup),
rightoff, LP_USED, &tmpdentry, &newtup);
rightoff = OffsetNumberNext(rightoff);
ItemPointerSet(&(res->pointerData), rbknum, rightoff);
/* be tidy */
if (tmpdentry.pred != (((char *) itup) + sizeof(IndexTupleData)))
pfree(tmpdentry.pred);
if (itup != newtup)
pfree(newtup);
}
if ((bufblock = BufferGetBlockNumber(buffer)) != GISTP_ROOT) {
PageRestoreTempPage(left, p);
}
WriteBuffer(leftbuf);
WriteBuffer(rightbuf);
/*
* Okay, the page is split. We have three things left to do:
*
* 1) Adjust any active scans on this index to cope with changes
* we introduced in its structure by splitting this page.
*
* 2) "Tighten" the bounding box of the pointer to the left
* page in the parent node in the tree, if any. Since we
* moved a bunch of stuff off the left page, we expect it
* to get smaller. This happens in the internal insertion
* routine.
*
* 3) Insert a pointer to the right page in the parent. This
* may cause the parent to split. If it does, we need to
* repeat steps one and two for each split node in the tree.
*/
/* adjust active scans */
gistadjscans(r, GISTOP_SPLIT, bufblock, FirstOffsetNumber);
tupDesc = r->rd_att;
ltup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &(v.spl_ldatum), isnull);
rtup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &(v.spl_rdatum), isnull);
pfree(isnull);
/* set pointers to new child pages in the internal index tuples */
ItemPointerSet(&(ltup->t_tid), lbknum, 1);
ItemPointerSet(&(rtup->t_tid), rbknum, 1);
gistintinsert(r, stack, ltup, rtup, giststate);
pfree(ltup);
pfree(rtup);
return (res);
}
/*
** After a split, we need to overwrite the old entry's key in the parent,
** and install install an entry for the new key into the parent.
*/
static void
gistintinsert(Relation r,
GISTSTACK *stk,
IndexTuple ltup, /* new version of entry for old page */
IndexTuple rtup, /* entry for new page */
GISTSTATE *giststate)
{
IndexTuple old;
Buffer b;
Page p;
ItemPointerData ltid;
if (stk == (GISTSTACK *) NULL) {
gistnewroot(giststate, r, ltup, rtup);
return;
}
/* remove old left pointer, insert the 2 new entries */
ItemPointerSet(&ltid, stk->gs_blk, stk->gs_child);
gistdelete(r, (ItemPointer)&ltid);
gistentryinserttwo(r, stk, ltup, rtup, giststate);
}
/*
** Insert two entries onto one page, handling a split for either one!
*/
static void
gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, GISTSTATE *giststate)
{
Buffer b;
Page p;
InsertIndexResult res;
OffsetNumber off;
bytea *evec;
char *datum;
int size;
GISTENTRY tmpentry;
IndexTuple newtup;
b = ReadBuffer(r, stk->gs_blk);
p = BufferGetPage(b);
if (gistnospace(p, ltup)) {
res = gistSplit(r, b, stk->gs_parent, ltup, giststate);
WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */
pfree(res);
gistdoinsert(r, rtup, giststate);
} else {
(void) gistPageAddItem(giststate, r, p, (Item)ltup,
IndexTupleSize(ltup), InvalidOffsetNumber,
LP_USED, &tmpentry, &newtup);
WriteBuffer(b);
gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred,
tmpentry.bytes, giststate);
/* be tidy */
if (tmpentry.pred != (((char *) ltup) + sizeof(IndexTupleData)))
pfree(tmpentry.pred);
if (ltup != newtup)
pfree(newtup);
(void)gistentryinsert(r, stk, rtup, giststate);
}
}
/*
** Insert an entry onto a page
*/
static InsertIndexResult
gistentryinsert(Relation r, GISTSTACK *stk, IndexTuple tup,
GISTSTATE *giststate)
{
Buffer b;
Page p;
InsertIndexResult res;
bytea *evec;
char *datum;
int size;
OffsetNumber off;
GISTENTRY tmpentry;
IndexTuple newtup;
b = ReadBuffer(r, stk->gs_blk);
p = BufferGetPage(b);
if (gistnospace(p, tup)) {
res = gistSplit(r, b, stk->gs_parent, tup, giststate);
WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */
return(res);
}
else {
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
off = gistPageAddItem(giststate, r, p, (Item) tup, IndexTupleSize(tup),
InvalidOffsetNumber, LP_USED, &tmpentry, &newtup);
WriteBuffer(b);
ItemPointerSet(&(res->pointerData), stk->gs_blk, off);
gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred,
tmpentry.bytes, giststate);
/* be tidy */
if (tmpentry.pred != (((char *) tup) + sizeof(IndexTupleData)))
pfree(tmpentry.pred);
if (tup != newtup)
pfree(newtup);
return(res);
}
}
static void
gistnewroot(GISTSTATE *giststate, Relation r, IndexTuple lt, IndexTuple rt)
{
Buffer b;
Page p;
GISTENTRY tmpentry;
IndexTuple newtup;
b = ReadBuffer(r, GISTP_ROOT);
GISTInitBuffer(b, 0);
p = BufferGetPage(b);
(void) gistPageAddItem(giststate, r, p, (Item) lt, IndexTupleSize(lt),
FirstOffsetNumber,
LP_USED, &tmpentry, &newtup);
/* be tidy */
if (tmpentry.pred != (((char *) lt) + sizeof(IndexTupleData)))
pfree(tmpentry.pred);
if (lt != newtup)
pfree(newtup);
(void) gistPageAddItem(giststate, r, p, (Item) rt, IndexTupleSize(rt),
OffsetNumberNext(FirstOffsetNumber), LP_USED,
&tmpentry, &newtup);
/* be tidy */
if (tmpentry.pred != (((char *) rt) + sizeof(IndexTupleData)))
pfree(tmpentry.pred);
if (rt != newtup)
pfree(newtup);
WriteBuffer(b);
}
static void
GISTInitBuffer(Buffer b, uint32 f)
{
GISTPageOpaque opaque;
Page page;
Size pageSize;
pageSize = BufferGetPageSize(b);
page = BufferGetPage(b);
memset(page, 0, (int) pageSize);
PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
opaque->flags = f;
}
/*
** find entry with lowest penalty
*/
static OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
GISTSTATE *giststate)
{
OffsetNumber maxoff;
OffsetNumber i;
char *ud, *id;
char *datum;
float usize, dsize;
OffsetNumber which;
float which_grow;
GISTENTRY entry, identry;
int size, idsize;
idsize = IndexTupleSize(it) - sizeof(IndexTupleData);
id = ((char *) it) + sizeof(IndexTupleData);
maxoff = PageGetMaxOffsetNumber(p);
which_grow = -1.0;
which = -1;
gistdentryinit(giststate,&identry,id,(Relation)NULL,(Page)NULL,
(OffsetNumber)0, idsize, FALSE);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
datum = (char *) PageGetItem(p, PageGetItemId(p, i));
size = IndexTupleSize(datum) - sizeof(IndexTupleData);
datum += sizeof(IndexTupleData);
gistdentryinit(giststate,&entry,datum,r,p,i,size,FALSE);
(giststate->penaltyFn)(entry, identry, &usize);
if (which_grow < 0 || usize < which_grow) {
which = i;
which_grow = usize;
if (which_grow == 0)
break;
}
if (entry.pred != datum)
pfree(entry.pred);
}
if (identry.pred != id)
pfree(identry.pred);
return (which);
}
static int
gistnospace(Page p, IndexTuple it)
{
return (PageGetFreeSpace(p) < IndexTupleSize(it));
}
void
gistfreestack(GISTSTACK *s)
{
GISTSTACK *p;
while (s != (GISTSTACK *) NULL) {
p = s->gs_parent;
pfree(s);
s = p;
}
}
/*
** remove an entry from a page
*/
void
gistdelete(Relation r, ItemPointer tid)
{
BlockNumber blkno;
OffsetNumber offnum;
Buffer buf;
Page page;
/* must write-lock on delete */
RelationSetLockForWrite(r);
blkno = ItemPointerGetBlockNumber(tid);
offnum = ItemPointerGetOffsetNumber(tid);
/* adjust any scans that will be affected by this deletion */
gistadjscans(r, GISTOP_DEL, blkno, offnum);
/* delete the index tuple */
buf = ReadBuffer(r, blkno);
page = BufferGetPage(buf);
PageIndexTupleDelete(page, offnum);
WriteBuffer(buf);
/* XXX -- two-phase locking, don't release the write lock */
}
void
initGISTstate(GISTSTATE *giststate, Relation index)
{
RegProcedure consistent_proc, union_proc, compress_proc, decompress_proc;
RegProcedure penalty_proc, picksplit_proc, equal_proc;
func_ptr user_fn;
int pronargs;
HeapTuple htup;
IndexTupleForm itupform;
consistent_proc = index_getprocid(index, 1, GIST_CONSISTENT_PROC);
union_proc = index_getprocid(index, 1, GIST_UNION_PROC);
compress_proc = index_getprocid(index, 1, GIST_COMPRESS_PROC);
decompress_proc = index_getprocid(index, 1, GIST_DECOMPRESS_PROC);
penalty_proc = index_getprocid(index, 1, GIST_PENALTY_PROC);
picksplit_proc = index_getprocid(index, 1, GIST_PICKSPLIT_PROC);
equal_proc = index_getprocid(index, 1, GIST_EQUAL_PROC);
fmgr_info(consistent_proc, &user_fn, &pronargs);
giststate->consistentFn = user_fn;
fmgr_info(union_proc, &user_fn, &pronargs);
giststate->unionFn = user_fn;
fmgr_info(compress_proc, &user_fn, &pronargs);
giststate->compressFn = user_fn;
fmgr_info(decompress_proc, &user_fn, &pronargs);
giststate->decompressFn = user_fn;
fmgr_info(penalty_proc, &user_fn, &pronargs);
giststate->penaltyFn = user_fn;
fmgr_info(picksplit_proc, &user_fn, &pronargs);
giststate->picksplitFn = user_fn;
fmgr_info(equal_proc, &user_fn, &pronargs);
giststate->equalFn = user_fn;
/* see if key type is different from type of attribute being indexed */
htup = SearchSysCacheTuple(INDEXRELID, ObjectIdGetDatum(index->rd_id),
0,0,0);
itupform = (IndexTupleForm)GETSTRUCT(htup);
if (!HeapTupleIsValid(htup))
elog(WARN, "initGISTstate: index %d not found", index->rd_id);
giststate->haskeytype = itupform->indhaskeytype;
if (giststate->haskeytype) {
/* key type is different -- is it byval? */
htup = SearchSysCacheTuple(ATTNUM,
ObjectIdGetDatum(itupform->indexrelid),
UInt16GetDatum(FirstOffsetNumber),
0,0);
if (!HeapTupleIsValid(htup)) {
elog(WARN, "initGISTstate: no attribute tuple %d %d",
itupform->indexrelid, FirstOffsetNumber);
return;
}
giststate->keytypbyval = (((AttributeTupleForm)htup)->attbyval);
}
else
giststate->keytypbyval = FALSE;
return;
}
/*
** Given an IndexTuple to be inserted on a page, this routine replaces
** the key with another key, which may involve generating a new IndexTuple
** if the sizes don't match
*/
static IndexTuple
gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
{
char * datum = (((char *) t) + sizeof(IndexTupleData));
/* if new entry fits in index tuple, copy it in */
if (entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData)) {
memcpy(datum, entry.pred, entry.bytes);
/* clear out old size */
t->t_info &= 0xe000;
/* or in new size */
t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData));
return(t);
}
else {
/* generate a new index tuple for the compressed entry */
TupleDesc tupDesc = r->rd_att;
IndexTuple newtup;
char *isnull;
int blank;
isnull = (char *) palloc(r->rd_rel->relnatts);
for (blank = 0; blank < r->rd_rel->relnatts; blank++)
isnull[blank] = ' ';
newtup = (IndexTuple) index_formtuple(tupDesc,
(Datum *)&(entry.pred),
isnull);
newtup->t_tid = t->t_tid;
pfree(isnull);
return(newtup);
}
}
/*
** initialize a GiST entry with a decompressed version of pred
*/
void
gistdentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r,
Page pg, OffsetNumber o, int b, bool l)
{
GISTENTRY *dep;
gistentryinit(*e, pr, r, pg, o, b, l);
if (giststate->haskeytype) {
dep = (GISTENTRY *)((giststate->decompressFn)(e));
gistentryinit(*e, dep->pred, dep->rel, dep->page, dep->offset, dep->bytes,
dep->leafkey);
if (dep != e) pfree(dep);
}
}
/*
** initialize a GiST entry with a compressed version of pred
*/
void
gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r,
Page pg, OffsetNumber o, int b, bool l)
{
GISTENTRY *cep;
gistentryinit(*e, pr, r, pg, o, b, l);
if (giststate->haskeytype) {
cep = (GISTENTRY *)((giststate->compressFn)(e));
gistentryinit(*e, cep->pred, cep->rel, cep->page, cep->offset, cep->bytes,
cep->leafkey);
if (cep != e) pfree(cep);
}
}
#define GISTDEBUG
#ifdef GISTDEBUG
extern char *text_range_out();
extern char *int_range_out();
/*
** sloppy debugging support routine, requires recompilation with appropriate
** "out" method for the index keys. Could be fixed to find that info
** in the catalogs...
*/
void
_gistdump(Relation r)
{
Buffer buf;
Page page;
OffsetNumber offnum, maxoff;
BlockNumber blkno;
BlockNumber nblocks;
GISTPageOpaque po;
IndexTuple itup;
BlockNumber itblkno;
OffsetNumber itoffno;
char *datum;
char *itkey;
nblocks = RelationGetNumberOfBlocks(r);
for (blkno = 0; blkno < nblocks; blkno++) {
buf = ReadBuffer(r, blkno);
page = BufferGetPage(buf);
po = (GISTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
printf("Page %d maxoff %d <%s>\n", blkno, maxoff,
(po->flags & F_LEAF ? "LEAF" : "INTERNAL"));
if (PageIsEmpty(page)) {
ReleaseBuffer(buf);
continue;
}
for (offnum = FirstOffsetNumber;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum)) {
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid));
datum = ((char *) itup);
datum += sizeof(IndexTupleData);
/* get out function for type of key, and out it! */
itkey = (char *) int_range_out(datum);
/* itkey = " unable to print"; */
printf("\t[%d] size %d heap <%d,%d> key:%s\n",
offnum, IndexTupleSize(itup), itblkno, itoffno, itkey);
pfree(itkey);
}
ReleaseBuffer(buf);
}
}
#define TRLOWER(tr) (((tr)->bytes))
#define TRUPPER(tr) (&((tr)->bytes[MAXALIGN(VARSIZE(TRLOWER(tr)))]))
typedef struct txtrange {
/* flag: NINF means that lower is negative infinity; PINF means that
** upper is positive infinity. 0 means that both are numbers.
*/
int32 vl_len;
int32 flag;
char bytes[2];
} TXTRANGE;
typedef struct intrange {
int lower;
int upper;
/* flag: NINF means that lower is negative infinity; PINF means that
** upper is positive infinity. 0 means that both are numbers.
*/
int flag;
} INTRANGE;
char *text_range_out(TXTRANGE *r)
{
char *result;
char *lower, *upper;
if (r == NULL)
return(NULL);
result = (char *)palloc(16 + VARSIZE(TRLOWER(r)) + VARSIZE(TRUPPER(r))
- 2*VARHDRSZ);
lower = (char *)palloc(VARSIZE(TRLOWER(r)) + 1 - VARHDRSZ);
memcpy(lower, VARDATA(TRLOWER(r)), VARSIZE(TRLOWER(r)) - VARHDRSZ);
lower[VARSIZE(TRLOWER(r)) - VARHDRSZ] = '\0';
upper = (char *)palloc(VARSIZE(TRUPPER(r)) + 1 - VARHDRSZ);
memcpy(upper, VARDATA(TRUPPER(r)), VARSIZE(TRUPPER(r)) - VARHDRSZ);
upper[VARSIZE(TRUPPER(r)) - VARHDRSZ] = '\0';
(void) sprintf(result, "[%s,%s): %d", lower, upper, r->flag);
pfree(lower);
pfree(upper);
return(result);
}
char *
int_range_out(INTRANGE *r)
{
char *result;
if (r == NULL)
return(NULL);
result = (char *)palloc(80);
(void) sprintf(result, "[%d,%d): %d",r->lower, r->upper, r->flag);
return(result);
}
#endif /* defined GISTDEBUG */
/*-------------------------------------------------------------------------
*
* gistget.c--
* fetch tuples from a GiST scan.
*
*
*
* IDENTIFICATION
* /usr/local/devel/pglite/cvs/src/backend/access/gisr/gistget.c,v 1.9 1995/08/01 20:16:02 jolly Exp
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/elog.h"
#include "utils/palloc.h"
#include "utils/rel.h"
#include "access/heapam.h"
#include "access/genam.h"
#include "access/iqual.h"
#include "access/gist.h"
#include "access/sdir.h"
#include "executor/execdebug.h"
static OffsetNumber gistfindnext(IndexScanDesc s, Page p, OffsetNumber n,
ScanDirection dir);
static RetrieveIndexResult gistscancache(IndexScanDesc s, ScanDirection dir);
static RetrieveIndexResult gistfirst(IndexScanDesc s, ScanDirection dir);
static RetrieveIndexResult gistnext(IndexScanDesc s, ScanDirection dir);
static ItemPointer gistheapptr(Relation r, ItemPointer itemp);
RetrieveIndexResult
gistgettuple(IndexScanDesc s, ScanDirection dir)
{
RetrieveIndexResult res;
/* if we have it cached in the scan desc, just return the value */
if ((res = gistscancache(s, dir)) != (RetrieveIndexResult) NULL)
return (res);
/* not cached, so we'll have to do some work */
if (ItemPointerIsValid(&(s->currentItemData))) {
res = gistnext(s, dir);
} else {
res = gistfirst(s, dir);
}
return (res);
}
static RetrieveIndexResult
gistfirst(IndexScanDesc s, ScanDirection dir)
{
Buffer b;
Page p;
OffsetNumber n;
OffsetNumber maxoff;
RetrieveIndexResult res;
GISTPageOpaque po;
GISTScanOpaque so;
GISTSTACK *stk;
BlockNumber blk;
IndexTuple it;
ItemPointer ip;
b = ReadBuffer(s->relation, GISTP_ROOT);
p = BufferGetPage(b);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
so = (GISTScanOpaque) s->opaque;
for (;;) {
maxoff = PageGetMaxOffsetNumber(p);
if (ScanDirectionIsBackward(dir))
n = gistfindnext(s, p, maxoff, dir);
else
n = gistfindnext(s, p, FirstOffsetNumber, dir);
while (n < FirstOffsetNumber || n > maxoff) {
ReleaseBuffer(b);
if (so->s_stack == (GISTSTACK *) NULL)
return ((RetrieveIndexResult) NULL);
stk = so->s_stack;
b = ReadBuffer(s->relation, stk->gs_blk);
p = BufferGetPage(b);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
maxoff = PageGetMaxOffsetNumber(p);
if (ScanDirectionIsBackward(dir)) {
n = OffsetNumberPrev(stk->gs_child);
} else {
n = OffsetNumberNext(stk->gs_child);
}
so->s_stack = stk->gs_parent;
pfree(stk);
n = gistfindnext(s, p, n, dir);
}
if (po->flags & F_LEAF) {
ItemPointerSet(&(s->currentItemData), BufferGetBlockNumber(b), n);
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
ip = (ItemPointer) palloc(sizeof(ItemPointerData));
memmove((char *) ip, (char *) &(it->t_tid),
sizeof(ItemPointerData));
ReleaseBuffer(b);
res = FormRetrieveIndexResult(&(s->currentItemData), ip);
return (res);
} else {
stk = (GISTSTACK *) palloc(sizeof(GISTSTACK));
stk->gs_child = n;
stk->gs_blk = BufferGetBlockNumber(b);
stk->gs_parent = so->s_stack;
so->s_stack = stk;
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
blk = ItemPointerGetBlockNumber(&(it->t_tid));
ReleaseBuffer(b);
b = ReadBuffer(s->relation, blk);
p = BufferGetPage(b);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
}
}
}
static RetrieveIndexResult
gistnext(IndexScanDesc s, ScanDirection dir)
{
Buffer b;
Page p;
OffsetNumber n;
OffsetNumber maxoff;
RetrieveIndexResult res;
GISTPageOpaque po;
GISTScanOpaque so;
GISTSTACK *stk;
BlockNumber blk;
IndexTuple it;
ItemPointer ip;
blk = ItemPointerGetBlockNumber(&(s->currentItemData));
n = ItemPointerGetOffsetNumber(&(s->currentItemData));
if (ScanDirectionIsForward(dir)) {
n = OffsetNumberNext(n);
} else {
n = OffsetNumberPrev(n);
}
b = ReadBuffer(s->relation, blk);
p = BufferGetPage(b);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
so = (GISTScanOpaque) s->opaque;
for (;;) {
maxoff = PageGetMaxOffsetNumber(p);
n = gistfindnext(s, p, n, dir);
while (n < FirstOffsetNumber || n > maxoff) {
ReleaseBuffer(b);
if (so->s_stack == (GISTSTACK *) NULL)
return ((RetrieveIndexResult) NULL);
stk = so->s_stack;
b = ReadBuffer(s->relation, stk->gs_blk);
p = BufferGetPage(b);
maxoff = PageGetMaxOffsetNumber(p);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
if (ScanDirectionIsBackward(dir)) {
n = OffsetNumberPrev(stk->gs_child);
} else {
n = OffsetNumberNext(stk->gs_child);
}
so->s_stack = stk->gs_parent;
pfree(stk);
n = gistfindnext(s, p, n, dir);
}
if (po->flags & F_LEAF) {
ItemPointerSet(&(s->currentItemData), BufferGetBlockNumber(b), n);
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
ip = (ItemPointer) palloc(sizeof(ItemPointerData));
memmove((char *) ip, (char *) &(it->t_tid),
sizeof(ItemPointerData));
ReleaseBuffer(b);
res = FormRetrieveIndexResult(&(s->currentItemData), ip);
return (res);
} else {
stk = (GISTSTACK *) palloc(sizeof(GISTSTACK));
stk->gs_child = n;
stk->gs_blk = BufferGetBlockNumber(b);
stk->gs_parent = so->s_stack;
so->s_stack = stk;
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
blk = ItemPointerGetBlockNumber(&(it->t_tid));
ReleaseBuffer(b);
b = ReadBuffer(s->relation, blk);
p = BufferGetPage(b);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
if (ScanDirectionIsBackward(dir)) {
n = PageGetMaxOffsetNumber(p);
} else {
n = FirstOffsetNumber;
}
}
}
}
/* Similar to index_keytest, but decompresses the key in the IndexTuple */
bool
gistindex_keytest(IndexTuple tuple,
TupleDesc tupdesc,
int scanKeySize,
ScanKey key,
GISTSTATE *giststate,
Relation r,
Page p,
OffsetNumber offset)
{
bool isNull;
Datum datum;
int test;
GISTENTRY de;
IncrIndexProcessed();
while (scanKeySize > 0) {
datum = index_getattr(tuple,
1,
tupdesc,
&isNull);
gistdentryinit(giststate, &de, (char *)datum, r, p, offset,
IndexTupleSize(tuple) - sizeof(IndexTupleData),
FALSE);
if (isNull) {
/* XXX eventually should check if SK_ISNULL */
return (false);
}
if (key[0].sk_flags & SK_COMMUTE) {
test = (int) (*(key[0].sk_func))
(DatumGetPointer(key[0].sk_argument),
&de, key[0].sk_procedure);
} else {
test = (int) (*(key[0].sk_func))
(&de,
DatumGetPointer(key[0].sk_argument),
key[0].sk_procedure);
}
if (!test == !(key[0].sk_flags & SK_NEGATE)) {
return (false);
}
scanKeySize -= 1;
key++;
}
return (true);
}
static OffsetNumber
gistfindnext(IndexScanDesc s, Page p, OffsetNumber n, ScanDirection dir)
{
OffsetNumber maxoff;
char *it;
GISTPageOpaque po;
GISTScanOpaque so;
GISTENTRY de;
GISTSTATE *giststate;
maxoff = PageGetMaxOffsetNumber(p);
po = (GISTPageOpaque) PageGetSpecialPointer(p);
so = (GISTScanOpaque) s->opaque;
giststate = so->giststate;
/*
* If we modified the index during the scan, we may have a pointer to
* a ghost tuple, before the scan. If this is the case, back up one.
*/
if (so->s_flags & GS_CURBEFORE) {
so->s_flags &= ~GS_CURBEFORE;
n = OffsetNumberPrev(n);
}
while (n >= FirstOffsetNumber && n <= maxoff) {
it = (char *) PageGetItem(p, PageGetItemId(p, n));
if (gistindex_keytest((IndexTuple) it,
RelationGetTupleDescriptor(s->relation),
s->numberOfKeys, s->keyData, giststate,
s->relation, p, n))
break;
if (ScanDirectionIsBackward(dir)) {
n = OffsetNumberPrev(n);
} else {
n = OffsetNumberNext(n);
}
}
return (n);
}
static RetrieveIndexResult
gistscancache(IndexScanDesc s, ScanDirection dir)
{
RetrieveIndexResult res;
ItemPointer ip;
if (!(ScanDirectionIsNoMovement(dir)
&& ItemPointerIsValid(&(s->currentItemData)))) {
return ((RetrieveIndexResult) NULL);
}
ip = gistheapptr(s->relation, &(s->currentItemData));
if (ItemPointerIsValid(ip))
res = FormRetrieveIndexResult(&(s->currentItemData), ip);
else
res = (RetrieveIndexResult) NULL;
return (res);
}
/*
* gistheapptr returns the item pointer to the tuple in the heap relation
* for which itemp is the index relation item pointer.
*/
static ItemPointer
gistheapptr(Relation r, ItemPointer itemp)
{
Buffer b;
Page p;
IndexTuple it;
ItemPointer ip;
OffsetNumber n;
ip = (ItemPointer) palloc(sizeof(ItemPointerData));
if (ItemPointerIsValid(itemp)) {
b = ReadBuffer(r, ItemPointerGetBlockNumber(itemp));
p = BufferGetPage(b);
n = ItemPointerGetOffsetNumber(itemp);
it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n));
memmove((char *) ip, (char *) &(it->t_tid),
sizeof(ItemPointerData));
ReleaseBuffer(b);
} else {
ItemPointerSetInvalid(ip);
}
return (ip);
}
/*-------------------------------------------------------------------------
*
* gist.c--
* interface routines for the postgres GiST indexed access method.
*
*
*
* IDENTIFICATION
* /usr/local/devel/pglite/cvs/src/backend/access/rtree/rtree.c,v 1.16 1995/08/01 20:16:03 jolly Exp
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/elog.h"
#include "utils/palloc.h"
#include "utils/rel.h"
#include "utils/excid.h"
#include "access/heapam.h"
#include "access/genam.h"
#include "access/gist.h"
#include "access/gistscan.h"
#include "access/funcindex.h"
#include "access/tupdesc.h"
#include "nodes/execnodes.h"
#include "nodes/plannodes.h"
#include "executor/executor.h"
#include "executor/tuptable.h"
#include "catalog/index.h"
/* non-export function prototypes */
static InsertIndexResult gistdoinsert(Relation r, IndexTuple itup,
GISTSTATE *GISTstate);
static InsertIndexResult gistentryinsert(Relation r, GISTSTACK *stk,
IndexTuple tup,
GISTSTATE *giststate);
static void gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, GISTSTATE *giststate);
static void gistAdjustKeys(Relation r, GISTSTACK *stk, BlockNumber blk,
char *datum, int att_size, GISTSTATE *giststate);
static void gistintinsert(Relation r, GISTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, GISTSTATE *giststate);
static InsertIndexResult gistSplit(Relation r, Buffer buffer,
GISTSTACK *stack, IndexTuple itup,
GISTSTATE *giststate);
static void gistnewroot(Relation r, IndexTuple lt, IndexTuple rt);
static void GISTInitBuffer(Buffer b, uint32 f);
static BlockNumber gistChooseSubtree(Relation r, IndexTuple itup, int level,
GISTSTATE *giststate,
GISTSTACK **retstack, Buffer *leafbuf);
static OffsetNumber gistchoose(Relation r, Page p, IndexTuple it,
GISTSTATE *giststate);
static int gistnospace(Page p, IndexTuple it);
void gistdelete(Relation r, ItemPointer tid);
static IndexTuple gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t);
void
gistbuild(Relation heap,
Relation index,
int natts,
AttrNumber *attnum,
IndexStrategy istrat,
uint16 pint,
Datum *params,
FuncIndexInfo *finfo,
PredInfo *predInfo)
{
HeapScanDesc scan;
Buffer buffer;
AttrNumber i;
HeapTuple htup;
IndexTuple itup;
TupleDesc hd, id;
InsertIndexResult res;
Datum *d;
bool *nulls;
int nb, nh, ni;
ExprContext *econtext;
TupleTable tupleTable;
TupleTableSlot *slot;
Oid hrelid, irelid;
Node *pred, *oldPred;
GISTSTATE giststate;
GISTENTRY tmpentry;
bool *decompvec;
initGISTstate(&giststate, index);
/* GiSTs only know how to do stupid locking now */
RelationSetLockForWrite(index);
pred = predInfo->pred;
oldPred = predInfo->oldPred;
/*
* We expect to be called exactly once for any index relation.
* If that's not the case, big trouble's what we have.
*/
if (oldPred == NULL && (nb = RelationGetNumberOfBlocks(index)) != 0)
elog(WARN, "%.16s already contains data", &(index->rd_rel->relname.data[0]));
/* initialize the root page (if this is a new index) */
if (oldPred == NULL) {
buffer = ReadBuffer(index, P_NEW);
GISTInitBuffer(buffer, F_LEAF);
WriteBuffer(buffer);
}
/* init the tuple descriptors and get set for a heap scan */
hd = RelationGetTupleDescriptor(heap);
id = RelationGetTupleDescriptor(index);
d = (Datum *)palloc(natts * sizeof (*d));
nulls = (bool *)palloc(natts * sizeof (*nulls));
/*
* If this is a predicate (partial) index, we will need to evaluate the
* predicate using ExecQual, which requires the current tuple to be in a
* slot of a TupleTable. In addition, ExecQual must have an ExprContext
* referring to that slot. Here, we initialize dummy TupleTable and
* ExprContext objects for this purpose. --Nels, Feb '92
*/
#ifndef OMIT_PARTIAL_INDEX
if (pred != NULL || oldPred != NULL) {
tupleTable = ExecCreateTupleTable(1);
slot = ExecAllocTableSlot(tupleTable);
econtext = makeNode(ExprContext);
FillDummyExprContext(econtext, slot, hd, buffer);
}
#endif /* OMIT_PARTIAL_INDEX */
scan = heap_beginscan(heap, 0, NowTimeQual, 0, (ScanKey) NULL);
htup = heap_getnext(scan, 0, &buffer);
/* int the tuples as we insert them */
nh = ni = 0;
for (; HeapTupleIsValid(htup); htup = heap_getnext(scan, 0, &buffer)) {
nh++;
/*
* If oldPred != NULL, this is an EXTEND INDEX command, so skip
* this tuple if it was already in the existing partial index
*/
if (oldPred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
/*SetSlotContents(slot, htup); */
slot->val = htup;
if (ExecQual((List*)oldPred, econtext) == true) {
ni++;
continue;
}
#endif /* OMIT_PARTIAL_INDEX */
}
/* Skip this tuple if it doesn't satisfy the partial-index predicate */
if (pred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
/*SetSlotContents(slot, htup); */
slot->val = htup;
if (ExecQual((List*)pred, econtext) == false)
continue;
#endif /* OMIT_PARTIAL_INDEX */
}
ni++;
/*
* For the current heap tuple, extract all the attributes
* we use in this index, and note which are null.
*/
for (i = 1; i <= natts; i++) {
int attoff;
bool attnull;
/*
* Offsets are from the start of the tuple, and are
* zero-based; indices are one-based. The next call
* returns i - 1. That's data hiding for you.
*/
attoff = AttrNumberGetAttrOffset(i);
/*
d[attoff] = HeapTupleGetAttributeValue(htup, buffer,
*/
d[attoff] = GetIndexValue(htup,
hd,
attoff,
attnum,
finfo,
&attnull,
buffer);
nulls[attoff] = (attnull ? 'n' : ' ');
}
/* immediately compress keys, and generate an index tuple */
decompvec = (bool *)palloc(sizeof(bool) * natts);
for (i = 0; i < natts; i++) {
gistcentryinit(&giststate, &tmpentry, (char *)d[i], (Relation) NULL,
(Page) NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */, TRUE);
if (d[i] != (Datum)tmpentry.pred && tmpentry.bytes > sizeof(int32))
decompvec[i] = TRUE;
else decompvec[i] = FALSE;
d[i] = (Datum)tmpentry.pred;
}
/* form an index tuple and point it at the heap tuple */
itup = index_formtuple(id, &d[0], nulls);
itup->t_tid = htup->t_ctid;
/*
* Since we already have the index relation locked, we
* call gistdoinsert directly. Normal access method calls
* dispatch through gistinsert, which locks the relation
* for write. This is the right thing to do if you're
* inserting single tups, but not when you're initializing
* the whole index at once.
*/
res = gistdoinsert(index, itup, &giststate);
for (i = 0; i < natts; i++)
if (decompvec[i] == TRUE) pfree((char *)d[i]);
pfree(itup);
pfree(res);
}
/* okay, all heap tuples are indexed */
heap_endscan(scan);
RelationUnsetLockForWrite(index);
if (pred != NULL || oldPred != NULL) {
#ifndef OMIT_PARTIAL_INDEX
ExecDestroyTupleTable(tupleTable, true);
pfree(econtext);
#endif /* OMIT_PARTIAL_INDEX */
}
/*
* Since we just inted the tuples in the heap, we update its
* stats in pg_relation to guarantee that the planner takes
* advantage of the index we just created. UpdateStats() does a
* CommandinterIncrement(), which flushes changed entries from
* the system relcache. The act of constructing an index changes
* these heap and index tuples in the system catalogs, so they
* need to be flushed. We close them to guarantee that they
* will be.
*/
hrelid = heap->rd_id;
irelid = index->rd_id;
heap_close(heap);
index_close(index);
UpdateStats(hrelid, nh, true);
UpdateStats(irelid, ni, false);
if (oldPred != NULL) {
if (ni == nh) pred = NULL;
UpdateIndexPredicate(irelid, oldPred, pred);
}
/* be tidy */
pfree(nulls);
pfree(d);
}
/*
* gistinsert -- wrapper for GiST tuple insertion.
*
* This is the public interface routine for tuple insertion in GiSTs.
* It doesn't do any work; just locks the relation and passes the buck.
*/
InsertIndexResult
gistinsert(Relation r, Datum *datum, char *nulls, ItemPointer ht_ctid)
{
InsertIndexResult res;
IndexTuple itup;
GISTSTATE giststate;
GISTENTRY tmpentry;
int i;
bool *decompvec;
initGISTstate(&giststate, r);
/* immediately compress keys, and generate an index tuple */
decompvec = (bool *)palloc(sizeof(bool) * r->rd_att->natts);
for (i = 0; i < r->rd_att->natts; i++) {
gistcentryinit(&giststate, &tmpentry, (char *)datum[i], (Relation)NULL,
(Page) NULL, (OffsetNumber) 0,
-1 /* size is currently bogus */, TRUE);
if (datum[i] != (Datum)tmpentry.pred && tmpentry.bytes > sizeof(int32))
decompvec[i] = TRUE;
else decompvec[i] = FALSE;
datum[i] = (Datum)tmpentry.pred;
}
itup = index_formtuple(RelationGetTupleDescriptor(r), datum, nulls);
itup->t_tid = *ht_ctid;
RelationSetLockForWrite(r);
res = gistdoinsert(r, itup, &giststate);
for (i = 0; i < r->rd_att->natts; i++)
if (decompvec[i] == TRUE) pfree((char *)datum[i]);
pfree(itup);
/* XXX two-phase locking -- don't unlock the relation until EOT */
return (res);
}
/* itup contains original uncompressed entry */
static InsertIndexResult
gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
{
char *datum, *newdatum;
GISTENTRY entry, tmpentry;
InsertIndexResult res;
OffsetNumber l;
GISTSTACK *stack, *tmpstk;
Buffer buffer;
BlockNumber blk;
Page page;
/* 3rd arg is ignored for now */
blk = gistChooseSubtree(r, itup, 0, giststate, &stack, &buffer);
page = (Page) BufferGetPage(buffer);
if (gistnospace(page, itup)) {
/* need to do a split */
res = gistSplit(r, buffer, stack, itup, giststate);
gistfreestack(stack);
WriteBuffer(buffer); /* don't forget to release buffer! */
return (res);
}
/* add the item and write the buffer */
if (PageIsEmpty(page)) {
l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
FirstOffsetNumber,
LP_USED);
} else {
l = PageAddItem(page, (Item) itup, IndexTupleSize(itup),
OffsetNumberNext(PageGetMaxOffsetNumber(page)),
LP_USED);
}
WriteBuffer(buffer);
/* now expand the page boundary in the parent to include the new child */
datum = (((char *) itup) + sizeof(IndexTupleData));
gistdentryinit(giststate, &tmpentry, datum, (Relation)0, (Page)0,
(OffsetNumber)0,
IndexTupleSize(itup) - sizeof(IndexTupleData), FALSE);
gistAdjustKeys(r, stack, blk, tmpentry.pred, tmpentry.bytes, giststate);
gistfreestack(stack);
if (tmpentry.pred != datum)
pfree(tmpentry.pred);
/* build and return an InsertIndexResult for this insertion */
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
ItemPointerSet(&(res->pointerData), blk, l);
return (res);
}
/* itup contains compressed entry */
static BlockNumber
gistChooseSubtree(Relation r, IndexTuple itup, int level,
GISTSTATE *giststate,
GISTSTACK **retstack /*out*/,
Buffer *leafbuf /*out*/)
{
Buffer buffer;
BlockNumber blk;
GISTSTACK *stack;
Page page;
GISTPageOpaque opaque;
IndexTuple which;
blk = GISTP_ROOT;
buffer = InvalidBuffer;
stack = (GISTSTACK *) NULL;
do {
/* let go of current buffer before getting next */
if (buffer != InvalidBuffer)
ReleaseBuffer(buffer);
/* get next buffer */
buffer = ReadBuffer(r, blk);
page = (Page) BufferGetPage(buffer);
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
if (!(opaque->flags & F_LEAF)) {
GISTSTACK *n;
ItemId iid;
n = (GISTSTACK *) palloc(sizeof(GISTSTACK));
n->gs_parent = stack;
n->gs_blk = blk;
n->gs_child = gistchoose(r, page, itup, giststate);
stack = n;
iid = PageGetItemId(page, n->gs_child);
which = (IndexTuple) PageGetItem(page, iid);
blk = ItemPointerGetBlockNumber(&(which->t_tid));
}
} while (!(opaque->flags & F_LEAF));
*retstack = stack;
*leafbuf = buffer;
return(blk);
}
/* datum is uncompressed */
static void
gistAdjustKeys(Relation r,
GISTSTACK *stk,
BlockNumber blk,
char *datum,
int att_size,
GISTSTATE *giststate)
{
char *oldud;
Page p;
Buffer b;
bool result;
bytea *evec;
GISTENTRY centry, *ev0p, *ev1p, *dentryp;
int size, datumsize;
IndexTuple tid;
if (stk == (GISTSTACK *) NULL)
return;
b = ReadBuffer(r, stk->gs_blk);
p = BufferGetPage(b);
oldud = (char *) PageGetItem(p, PageGetItemId(p, stk->gs_child));
tid = (IndexTuple) oldud;
size = IndexTupleSize((IndexTuple)oldud) - sizeof(IndexTupleData);
oldud += sizeof(IndexTupleData);
evec = (bytea *) palloc(2*sizeof(GISTENTRY) + VARHDRSZ);
VARSIZE(evec) = 2*sizeof(GISTENTRY) + VARHDRSZ;
/* insert decompressed oldud into entry vector */
gistdentryinit(giststate, &((GISTENTRY *)VARDATA(evec))[0],
oldud, r, p, stk->gs_child,
size, FALSE);
ev0p = &((GISTENTRY *)VARDATA(evec))[0];
/* insert datum entry into entry vector */
gistentryinit(((GISTENTRY *)VARDATA(evec))[1], datum,
(Relation)NULL,(Page)NULL,(OffsetNumber)0, att_size, FALSE);
ev1p = &((GISTENTRY *)VARDATA(evec))[1];
/* form union of decompressed entries */
datum = (char *) (giststate->unionFn)(evec, &datumsize);
/* did union leave decompressed version of oldud unchanged? */
(giststate->equalFn)(ev0p->pred, datum, &result);
if (!result) {
TupleDesc td = RelationGetTupleDescriptor(r);
/* compress datum for storage on page */
gistcentryinit(giststate, &centry, datum, ev0p->rel, ev0p->page,
ev0p->offset, datumsize, FALSE);
if (td->attrs[0]->attlen >= 0) {
memmove(oldud, centry.pred, att_size);
gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size,
giststate);
}
else if (VARSIZE(centry.pred) == VARSIZE(oldud)) {
memmove(oldud, centry.pred, VARSIZE(centry.pred));
gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, datum, att_size,
giststate);
}
else {
/* new datum is not the same size as the old.
** We have to delete the old entry and insert the new
** one. Note that this may cause a split here!
*/
IndexTuple newtup;
ItemPointerData oldtid;
char *isnull;
TupleDesc tupDesc;
InsertIndexResult res;
/* delete old tuple */
ItemPointerSet(&oldtid, stk->gs_blk, stk->gs_child);
gistdelete(r, (ItemPointer)&oldtid);
/* generate and insert new tuple */
tupDesc = r->rd_att;
isnull = (char *) palloc(r->rd_rel->relnatts);
memset(isnull, ' ', r->rd_rel->relnatts);
newtup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &centry.pred, isnull);
pfree(isnull);
/* set pointer in new tuple to point to current child */
ItemPointerSet(&oldtid, blk, 1);
newtup->t_tid = oldtid;
/* inserting the new entry also adjust keys above */
res = gistentryinsert(r, stk, newtup, giststate);
/* in stack, set info to point to new tuple */
stk->gs_blk = ItemPointerGetBlockNumber(&(res->pointerData));
stk->gs_child = ItemPointerGetOffsetNumber(&(res->pointerData));
pfree(res);
}
WriteBuffer(b);
if (centry.pred != datum)
pfree(datum);
}
else {
ReleaseBuffer(b);
}
pfree(evec);
}
/*
* gistSplit -- split a page in the tree.
*
*/
static InsertIndexResult
gistSplit(Relation r,
Buffer buffer,
GISTSTACK *stack,
IndexTuple itup, /* contains compressed entry */
GISTSTATE *giststate)
{
Page p;
Buffer leftbuf, rightbuf;
Page left, right;
ItemId itemid;
IndexTuple item;
IndexTuple ltup, rtup;
OffsetNumber maxoff;
OffsetNumber i;
OffsetNumber leftoff, rightoff;
BlockNumber lbknum, rbknum;
BlockNumber bufblock;
GISTPageOpaque opaque;
int blank;
InsertIndexResult res;
char *isnull;
GIST_SPLITVEC v;
TupleDesc tupDesc;
bytea *entryvec;
bool *decompvec;
IndexTuple item_1;
GISTENTRY tmpcentry, *tmpdentryp, tmpentry;
char *datum;
isnull = (char *) palloc(r->rd_rel->relnatts);
for (blank = 0; blank < r->rd_rel->relnatts; blank++)
isnull[blank] = ' ';
p = (Page) BufferGetPage(buffer);
opaque = (GISTPageOpaque) PageGetSpecialPointer(p);
/*
* The root of the tree is the first block in the relation. If
* we're about to split the root, we need to do some hocus-pocus
* to enforce this guarantee.
*/
if (BufferGetBlockNumber(buffer) == GISTP_ROOT) {
leftbuf = ReadBuffer(r, P_NEW);
GISTInitBuffer(leftbuf, opaque->flags);
lbknum = BufferGetBlockNumber(leftbuf);
left = (Page) BufferGetPage(leftbuf);
} else {
leftbuf = buffer;
IncrBufferRefCount(buffer);
lbknum = BufferGetBlockNumber(buffer);
left = (Page) PageGetTempPage(p, sizeof(GISTPageOpaqueData));
}
rightbuf = ReadBuffer(r, P_NEW);
GISTInitBuffer(rightbuf, opaque->flags);
rbknum = BufferGetBlockNumber(rightbuf);
right = (Page) BufferGetPage(rightbuf);
/* generate the item array */
maxoff = PageGetMaxOffsetNumber(p);
entryvec = (bytea *)palloc(VARHDRSZ + (maxoff + 2) * sizeof(GISTENTRY));
decompvec = (bool *)palloc(VARHDRSZ + (maxoff + 2) * sizeof(bool));
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
item_1 = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
gistdentryinit(giststate, &((GISTENTRY *)VARDATA(entryvec))[i],
(((char *) item_1) + sizeof(IndexTupleData)),
r, p, i,
IndexTupleSize(item_1) - sizeof(IndexTupleData), FALSE);
if ((char *)(((GISTENTRY *)VARDATA(entryvec))[i].pred)
== (((char *) item_1) + sizeof(IndexTupleData)))
decompvec[i] = FALSE;
else decompvec[i] = TRUE;
}
/* add the new datum as the last entry */
gistdentryinit(giststate, &(((GISTENTRY *)VARDATA(entryvec))[maxoff+1]),
(((char *) itup) + sizeof(IndexTupleData)),
(Relation)NULL, (Page)NULL,
(OffsetNumber)0, tmpentry.bytes, FALSE);
if ((char *)(((GISTENTRY *)VARDATA(entryvec))[maxoff+1]).pred !=
(((char *) itup) + sizeof(IndexTupleData)))
decompvec[maxoff+1] = TRUE;
else decompvec[maxoff+1] = FALSE;
VARSIZE(entryvec) = (maxoff + 2) * sizeof(GISTENTRY) + VARHDRSZ;
/* now let the user-defined picksplit function set up the split vector */
(giststate->picksplitFn)(entryvec, &v);
/* compress ldatum and rdatum */
gistcentryinit(giststate, &tmpentry, v.spl_ldatum, (Relation)NULL,
(Page)NULL, (OffsetNumber)0,
((GISTENTRY *)VARDATA(entryvec))[i].bytes, FALSE);
if (v.spl_ldatum != tmpentry.pred)
pfree(v.spl_ldatum);
v.spl_ldatum = tmpentry.pred;
gistcentryinit(giststate, &tmpentry, v.spl_rdatum, (Relation)NULL,
(Page)NULL, (OffsetNumber)0,
((GISTENTRY *)VARDATA(entryvec))[i].bytes, FALSE);
if (v.spl_rdatum != tmpentry.pred)
pfree(v.spl_rdatum);
v.spl_rdatum = tmpentry.pred;
/* clean up the entry vector: its preds need to be deleted, too */
for (i = FirstOffsetNumber; i <= maxoff+1; i = OffsetNumberNext(i))
if (decompvec[i])
pfree(((GISTENTRY *)VARDATA(entryvec))[i].pred);
pfree(entryvec);
pfree(decompvec);
leftoff = rightoff = FirstOffsetNumber;
maxoff = PageGetMaxOffsetNumber(p);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
itemid = PageGetItemId(p, i);
item = (IndexTuple) PageGetItem(p, itemid);
if (i == *(v.spl_left)) {
(void) PageAddItem(left, (Item) item, IndexTupleSize(item),
leftoff, LP_USED);
leftoff = OffsetNumberNext(leftoff);
v.spl_left++; /* advance in left split vector */
} else {
(void) PageAddItem(right, (Item) item, IndexTupleSize(item),
rightoff, LP_USED);
rightoff = OffsetNumberNext(rightoff);
v.spl_right++; /* advance in right split vector */
}
}
/* build an InsertIndexResult for this insertion */
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
/* now insert the new index tuple */
if (*(v.spl_left) != FirstOffsetNumber) {
(void) PageAddItem(left, (Item) itup, IndexTupleSize(itup),
leftoff, LP_USED);
leftoff = OffsetNumberNext(leftoff);
ItemPointerSet(&(res->pointerData), lbknum, leftoff);
} else {
(void) PageAddItem(right, (Item) itup, IndexTupleSize(itup),
rightoff, LP_USED);
rightoff = OffsetNumberNext(rightoff);
ItemPointerSet(&(res->pointerData), rbknum, rightoff);
}
if ((bufblock = BufferGetBlockNumber(buffer)) != GISTP_ROOT) {
PageRestoreTempPage(left, p);
}
WriteBuffer(leftbuf);
WriteBuffer(rightbuf);
/*
* Okay, the page is split. We have three things left to do:
*
* 1) Adjust any active scans on this index to cope with changes
* we introduced in its structure by splitting this page.
*
* 2) "Tighten" the bounding box of the pointer to the left
* page in the parent node in the tree, if any. Since we
* moved a bunch of stuff off the left page, we expect it
* to get smaller. This happens in the internal insertion
* routine.
*
* 3) Insert a pointer to the right page in the parent. This
* may cause the parent to split. If it does, we need to
* repeat steps one and two for each split node in the tree.
*/
/* adjust active scans */
gistadjscans(r, GISTOP_SPLIT, bufblock, FirstOffsetNumber);
tupDesc = r->rd_att;
ltup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &(v.spl_ldatum), isnull);
rtup = (IndexTuple) index_formtuple(tupDesc,
(Datum *) &(v.spl_rdatum), isnull);
pfree(isnull);
/* set pointers to new child pages in the internal index tuples */
ItemPointerSet(&(ltup->t_tid), lbknum, 1);
ItemPointerSet(&(rtup->t_tid), rbknum, 1);
gistintinsert(r, stack, ltup, rtup, giststate);
pfree(ltup);
pfree(rtup);
return (res);
}
static void
gistintinsert(Relation r,
GISTSTACK *stk,
IndexTuple ltup,
IndexTuple rtup,
GISTSTATE *giststate)
{
IndexTuple old;
Buffer b;
Page p;
ItemPointerData ltid;
if (stk == (GISTSTACK *) NULL) {
gistnewroot(r, ltup, rtup);
return;
}
/* remove old left pointer, insert the 2 new entries */
ItemPointerSet(&ltid, stk->gs_blk, stk->gs_child);
gistdelete(r, (ItemPointer)&ltid);
gistentryinserttwo(r, stk, ltup, rtup, giststate);
}
static void
gistentryinserttwo(Relation r, GISTSTACK *stk, IndexTuple ltup,
IndexTuple rtup, GISTSTATE *giststate)
{
Buffer b;
Page p;
InsertIndexResult res;
OffsetNumber off;
bytea *evec;
char *datum;
int size;
GISTENTRY tmpentry;
b = ReadBuffer(r, stk->gs_blk);
p = BufferGetPage(b);
if (gistnospace(p, ltup)) {
res = gistSplit(r, b, stk->gs_parent, ltup, giststate);
WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */
pfree(res);
gistdoinsert(r, rtup, giststate);
} else {
(void) PageAddItem(p, (Item)ltup, IndexTupleSize(ltup),
InvalidOffsetNumber, LP_USED);
WriteBuffer(b);
datum = (((char *) ltup) + sizeof(IndexTupleData));
size = IndexTupleSize(ltup) - sizeof(IndexTupleData);
gistdentryinit(giststate, &tmpentry, datum, (Relation)0, (Page)0,
(OffsetNumber)0, size, 0);
gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred,
tmpentry.bytes, giststate);
if (tmpentry.pred != datum)
pfree(tmpentry.pred);
(void)gistentryinsert(r, stk, rtup, giststate);
}
}
static InsertIndexResult
gistentryinsert(Relation r, GISTSTACK *stk, IndexTuple tup,
GISTSTATE *giststate)
{
Buffer b;
Page p;
InsertIndexResult res;
bytea *evec;
char *datum;
int size;
OffsetNumber off;
GISTENTRY tmpentry;
b = ReadBuffer(r, stk->gs_blk);
p = BufferGetPage(b);
if (gistnospace(p, tup)) {
res = gistSplit(r, b, stk->gs_parent, tup, giststate);
WriteBuffer(b); /* don't forget to release buffer! - 01/31/94 */
return(res);
}
else {
res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData));
off = PageAddItem(p, (Item) tup, IndexTupleSize(tup),
InvalidOffsetNumber, LP_USED);
WriteBuffer(b);
ItemPointerSet(&(res->pointerData), stk->gs_blk, off);
datum = (((char *) tup) + sizeof(IndexTupleData));
size = IndexTupleSize(tup) - sizeof(IndexTupleData);
gistdentryinit(giststate, &tmpentry, datum, (Relation)0, (Page)0,
(OffsetNumber)0, size, 0);
gistAdjustKeys(r, stk->gs_parent, stk->gs_blk, tmpentry.pred,
tmpentry.bytes, giststate);
if (tmpentry.pred != datum)
pfree(tmpentry.pred);
return(res);
}
}
static void
gistnewroot(Relation r, IndexTuple lt, IndexTuple rt)
{
Buffer b;
Page p;
b = ReadBuffer(r, GISTP_ROOT);
GISTInitBuffer(b, 0);
p = BufferGetPage(b);
(void) PageAddItem(p, (Item) lt, IndexTupleSize(lt),
FirstOffsetNumber, LP_USED);
(void) PageAddItem(p, (Item) rt, IndexTupleSize(rt),
OffsetNumberNext(FirstOffsetNumber), LP_USED);
WriteBuffer(b);
}
static void
GISTInitBuffer(Buffer b, uint32 f)
{
GISTPageOpaque opaque;
Page page;
Size pageSize;
pageSize = BufferGetPageSize(b);
page = BufferGetPage(b);
memset(page, 0, (int) pageSize);
PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
opaque->flags = f;
}
/* it contains compressed entry */
static OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it, GISTSTATE *giststate)
{
OffsetNumber maxoff;
OffsetNumber i;
char *ud, *id;
char *datum;
float usize, dsize;
OffsetNumber which;
float which_grow;
GISTENTRY entry, identry;
int size, idsize;
idsize = IndexTupleSize(it) - sizeof(IndexTupleData);
id = ((char *) it) + sizeof(IndexTupleData);
maxoff = PageGetMaxOffsetNumber(p);
which_grow = -1.0;
which = -1;
gistdentryinit(giststate,&identry,id,(Relation)NULL,(Page)NULL,
(OffsetNumber)0, idsize, FALSE);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) {
datum = (char *) PageGetItem(p, PageGetItemId(p, i));
size = IndexTupleSize(datum) - sizeof(IndexTupleData);
datum += sizeof(IndexTupleData);
gistdentryinit(giststate,&entry,datum,r,p,i,size,FALSE);
(giststate->penaltyFn)(entry, identry, &usize);
if (which_grow < 0 || usize < which_grow) {
which = i;
which_grow = usize;
if (which_grow == 0)
break;
}
if (entry.pred != datum)
pfree(entry.pred);
}
if (identry.pred != id)
pfree(identry.pred);
return (which);
}
static int
gistnospace(Page p, IndexTuple it)
{
return (PageGetFreeSpace(p) < IndexTupleSize(it));
}
void
gistfreestack(GISTSTACK *s)
{
GISTSTACK *p;
while (s != (GISTSTACK *) NULL) {
p = s->gs_parent;
pfree(s);
s = p;
}
}
void
gistdelete(Relation r, ItemPointer tid)
{
BlockNumber blkno;
OffsetNumber offnum;
Buffer buf;
Page page;
/* must write-lock on delete */
RelationSetLockForWrite(r);
blkno = ItemPointerGetBlockNumber(tid);
offnum = ItemPointerGetOffsetNumber(tid);
/* adjust any scans that will be affected by this deletion */
gistadjscans(r, GISTOP_DEL, blkno, offnum);
/* delete the index tuple */
buf = ReadBuffer(r, blkno);
page = BufferGetPage(buf);
PageIndexTupleDelete(page, offnum);
WriteBuffer(buf);
/* XXX -- two-phase locking, don't release the write lock */
}
void
initGISTstate(GISTSTATE *giststate, Relation index)
{
RegProcedure consistent_proc, union_proc, compress_proc, decompress_proc;
RegProcedure penalty_proc, picksplit_proc, equal_proc;
func_ptr user_fn;
int pronargs;
consistent_proc = index_getprocid(index, 1, GIST_CONSISTENT_PROC);
union_proc = index_getprocid(index, 1, GIST_UNION_PROC);
compress_proc = index_getprocid(index, 1, GIST_COMPRESS_PROC);
decompress_proc = index_getprocid(index, 1, GIST_DECOMPRESS_PROC);
penalty_proc = index_getprocid(index, 1, GIST_PENALTY_PROC);
picksplit_proc = index_getprocid(index, 1, GIST_PICKSPLIT_PROC);
equal_proc = index_getprocid(index, 1, GIST_EQUAL_PROC);
fmgr_info(consistent_proc, &user_fn, &pronargs);
giststate->consistentFn = user_fn;
fmgr_info(union_proc, &user_fn, &pronargs);
giststate->unionFn = user_fn;
fmgr_info(compress_proc, &user_fn, &pronargs);
giststate->compressFn = user_fn;
fmgr_info(decompress_proc, &user_fn, &pronargs);
giststate->decompressFn = user_fn;
fmgr_info(penalty_proc, &user_fn, &pronargs);
giststate->penaltyFn = user_fn;
fmgr_info(picksplit_proc, &user_fn, &pronargs);
giststate->picksplitFn = user_fn;
fmgr_info(equal_proc, &user_fn, &pronargs);
giststate->equalFn = user_fn;
return;
}
static IndexTuple
gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
{
char * datum = (((char *) t) + sizeof(IndexTupleData));
/* if new entry fits in index tuple, copy it in */
if (entry.bytes < IndexTupleSize(t) - sizeof(IndexTupleData)) {
memcpy(datum, entry.pred, entry.bytes);
/* clear out old size */
t->t_info &= 0xe000;
/* or in new size */
t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData));
return(t);
}
else {
/* generate a new index tuple for the compressed entry */
TupleDesc tupDesc = r->rd_att;
IndexTuple newtup;
char *isnull;
int blank;
isnull = (char *) palloc(r->rd_rel->relnatts);
for (blank = 0; blank < r->rd_rel->relnatts; blank++)
isnull[blank] = ' ';
newtup = (IndexTuple) index_formtuple(tupDesc,
(Datum *)&(entry.pred),
isnull);
newtup->t_tid = t->t_tid;
pfree(isnull);
return(newtup);
}
}
void
gistdentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r,
Page pg, OffsetNumber o, int b, bool l)
{
GISTENTRY *dep;
gistentryinit(*e, pr, r, pg, o, b, l);
dep = (GISTENTRY *)((giststate->decompressFn)(e));
gistentryinit(*e, dep->pred, dep->rel, dep->page, dep->offset, dep->bytes,
dep->leafkey);
if (dep != e) pfree(dep);
}
void
gistcentryinit(GISTSTATE *giststate, GISTENTRY *e, char *pr, Relation r,
Page pg, OffsetNumber o, int b, bool l)
{
GISTENTRY *cep;
gistentryinit(*e, pr, r, pg, o, b, l);
cep = (GISTENTRY *)((giststate->compressFn)(e));
gistentryinit(*e, cep->pred, cep->rel, cep->page, cep->offset, cep->bytes,
cep->leafkey);
if (cep != e) pfree(cep);
}
#define GISTDEBUG
#ifdef GISTDEBUG
extern char *text_range_out();
extern char *int_range_out();
void
_gistdump(Relation r)
{
Buffer buf;
Page page;
OffsetNumber offnum, maxoff;
BlockNumber blkno;
BlockNumber nblocks;
GISTPageOpaque po;
IndexTuple itup;
BlockNumber itblkno;
OffsetNumber itoffno;
char *datum;
char *itkey;
nblocks = RelationGetNumberOfBlocks(r);
for (blkno = 0; blkno < nblocks; blkno++) {
buf = ReadBuffer(r, blkno);
page = BufferGetPage(buf);
po = (GISTPageOpaque) PageGetSpecialPointer(page);
maxoff = PageGetMaxOffsetNumber(page);
printf("Page %d maxoff %d <%s>\n", blkno, maxoff,
(po->flags & F_LEAF ? "LEAF" : "INTERNAL"));
if (PageIsEmpty(page)) {
ReleaseBuffer(buf);
continue;
}
for (offnum = FirstOffsetNumber;
offnum <= maxoff;
offnum = OffsetNumberNext(offnum)) {
itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
itblkno = ItemPointerGetBlockNumber(&(itup->t_tid));
itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid));
datum = ((char *) itup);
datum += sizeof(IndexTupleData);
/* get out function for type of key, and out it! */
itkey = (char *) int_range_out(datum);
/* itkey = " unable to print"; */
printf("\t[%d] size %d heap <%d,%d> key:%s\n",
offnum, IndexTupleSize(itup), itblkno, itoffno, itkey);
pfree(itkey);
}
ReleaseBuffer(buf);
}
}
#define TRLOWER(tr) (((tr)->bytes))
#define TRUPPER(tr) (&((tr)->bytes[MAXALIGN(VARSIZE(TRLOWER(tr)))]))
typedef struct txtrange {
/* flag: NINF means that lower is negative infinity; PINF means that
** upper is positive infinity. 0 means that both are numbers.
*/
int32 vl_len;
int32 flag;
char bytes[2];
} TXTRANGE;
typedef struct intrange {
int lower;
int upper;
/* flag: NINF means that lower is negative infinity; PINF means that
** upper is positive infinity. 0 means that both are numbers.
*/
int flag;
} INTRANGE;
char *text_range_out(TXTRANGE *r)
{
char *result;
char *lower, *upper;
if (r == NULL)
return(NULL);
result = (char *)palloc(16 + VARSIZE(TRLOWER(r)) + VARSIZE(TRUPPER(r))
- 2*VARHDRSZ);
lower = (char *)palloc(VARSIZE(TRLOWER(r)) + 1 - VARHDRSZ);
memcpy(lower, VARDATA(TRLOWER(r)), VARSIZE(TRLOWER(r)) - VARHDRSZ);
lower[VARSIZE(TRLOWER(r)) - VARHDRSZ] = '\0';
upper = (char *)palloc(VARSIZE(TRUPPER(r)) + 1 - VARHDRSZ);
memcpy(upper, VARDATA(TRUPPER(r)), VARSIZE(TRUPPER(r)) - VARHDRSZ);
upper[VARSIZE(TRUPPER(r)) - VARHDRSZ] = '\0';
(void) sprintf(result, "[%s,%s): %d", lower, upper, r->flag);
pfree(lower);
pfree(upper);
return(result);
}
char *
int_range_out(INTRANGE *r)
{
char *result;
if (r == NULL)
return(NULL);
result = (char *)palloc(80);
(void) sprintf(result, "[%d,%d): %d",r->lower, r->upper, r->flag);
return(result);
}
#endif /* defined GISTDEBUG */
/*-------------------------------------------------------------------------
*
* gistscan.c--
* routines to manage scans on index relations
*
*
* IDENTIFICATION
* /usr/local/devel/pglite/cvs/src/backend/access/gist/gistscan.c,v 1.7 1995/06/14 00:10:05 jolly Exp
*
*-------------------------------------------------------------------------
*/
#include "c.h"
#include "postgres.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "utils/elog.h"
#include "utils/palloc.h"
#include "utils/rel.h"
#include "access/heapam.h"
#include "access/genam.h"
#include "access/gist.h"
#include "access/giststrat.h"
/* routines defined and used here */
static void gistregscan(IndexScanDesc s);
static void gistdropscan(IndexScanDesc s);
static void gistadjone(IndexScanDesc s, int op, BlockNumber blkno,
OffsetNumber offnum);
static void adjuststack(GISTSTACK *stk, BlockNumber blkno,
OffsetNumber offnum);
static void adjustiptr(IndexScanDesc s, ItemPointer iptr,
int op, BlockNumber blkno, OffsetNumber offnum);
/*
* Whenever we start a GiST scan in a backend, we register it in private
* space. Then if the GiST index gets updated, we check all registered
* scans and adjust them if the tuple they point at got moved by the
* update. We only need to do this in private space, because when we update
* an GiST we have a write lock on the tree, so no other process can have
* any locks at all on it. A single transaction can have write and read
* locks on the same object, so that's why we need to handle this case.
*/
typedef struct GISTScanListData {
IndexScanDesc gsl_scan;
struct GISTScanListData *gsl_next;
} GISTScanListData;
typedef GISTScanListData *GISTScanList;
/* pointer to list of local scans on GiSTs */
static GISTScanList GISTScans = (GISTScanList) NULL;
IndexScanDesc
gistbeginscan(Relation r,
bool fromEnd,
uint16 nkeys,
ScanKey key)
{
IndexScanDesc s;
RelationSetLockForRead(r);
s = RelationGetIndexScan(r, fromEnd, nkeys, key);
gistregscan(s);
return (s);
}
void
gistrescan(IndexScanDesc s, bool fromEnd, ScanKey key)
{
GISTScanOpaque p;
int i;
if (!IndexScanIsValid(s)) {
elog(WARN, "gistrescan: invalid scan.");
return;
}
/*
* Clear all the pointers.
*/
ItemPointerSetInvalid(&s->previousItemData);
ItemPointerSetInvalid(&s->currentItemData);
ItemPointerSetInvalid(&s->nextItemData);
ItemPointerSetInvalid(&s->previousMarkData);
ItemPointerSetInvalid(&s->currentMarkData);
ItemPointerSetInvalid(&s->nextMarkData);
/*
* Set flags.
*/
if (RelationGetNumberOfBlocks(s->relation) == 0) {
s->flags = ScanUnmarked;
} else if (fromEnd) {
s->flags = ScanUnmarked | ScanUncheckedPrevious;
} else {
s->flags = ScanUnmarked | ScanUncheckedNext;
}
s->scanFromEnd = fromEnd;
if (s->numberOfKeys > 0) {
memmove(s->keyData,
key,
s->numberOfKeys * sizeof(ScanKeyData));
}
p = (GISTScanOpaque) s->opaque;
if (p != (GISTScanOpaque) NULL) {
freestack(p->s_stack);
freestack(p->s_markstk);
p->s_stack = p->s_markstk = (GISTSTACK *) NULL;
p->s_flags = 0x0;
} else {
/* initialize opaque data */
p = (GISTScanOpaque) palloc(sizeof(GISTScanOpaqueData));
p->s_stack = p->s_markstk = (GISTSTACK *) NULL;
p->s_flags = 0x0;
s->opaque = p;
p->giststate = (GISTSTATE *)palloc(sizeof(GISTSTATE));
initGISTstate(p->giststate, s->relation);
if (s->numberOfKeys > 0)
/*
** Play games here with the scan key to use the Consistent
** function for all comparisons:
** 1) the sk_procedure field will now be used to hold the
** strategy number
** 2) the sk_func field will point to the Consistent function
*/
for (i = 0; i < s->numberOfKeys; i++) {
/* s->keyData[i].sk_procedure
= index_getprocid(s->relation, 1, GIST_CONSISTENT_PROC); */
s->keyData[i].sk_procedure
= RelationGetGISTStrategy(s->relation, s->keyData[i].sk_attno,
s->keyData[i].sk_procedure);
s->keyData[i].sk_func = p->giststate->consistentFn;
}
}
}
void
gistmarkpos(IndexScanDesc s)
{
GISTScanOpaque p;
GISTSTACK *o, *n, *tmp;
s->currentMarkData = s->currentItemData;
p = (GISTScanOpaque) s->opaque;
if (p->s_flags & GS_CURBEFORE)
p->s_flags |= GS_MRKBEFORE;
else
p->s_flags &= ~GS_MRKBEFORE;
o = (GISTSTACK *) NULL;
n = p->s_stack;
/* copy the parent stack from the current item data */
while (n != (GISTSTACK *) NULL) {
tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK));
tmp->gs_child = n->gs_child;
tmp->gs_blk = n->gs_blk;
tmp->gs_parent = o;
o = tmp;
n = n->gs_parent;
}
freestack(p->s_markstk);
p->s_markstk = o;
}
void
gistrestrpos(IndexScanDesc s)
{
GISTScanOpaque p;
GISTSTACK *o, *n, *tmp;
s->currentItemData = s->currentMarkData;
p = (GISTScanOpaque) s->opaque;
if (p->s_flags & GS_MRKBEFORE)
p->s_flags |= GS_CURBEFORE;
else
p->s_flags &= ~GS_CURBEFORE;
o = (GISTSTACK *) NULL;
n = p->s_markstk;
/* copy the parent stack from the current item data */
while (n != (GISTSTACK *) NULL) {
tmp = (GISTSTACK *) palloc(sizeof(GISTSTACK));
tmp->gs_child = n->gs_child;
tmp->gs_blk = n->gs_blk;
tmp->gs_parent = o;
o = tmp;
n = n->gs_parent;
}
freestack(p->s_stack);
p->s_stack = o;
}
void
gistendscan(IndexScanDesc s)
{
GISTScanOpaque p;
p = (GISTScanOpaque) s->opaque;
if (p != (GISTScanOpaque) NULL) {
freestack(p->s_stack);
freestack(p->s_markstk);
}
gistdropscan(s);
/* XXX don't unset read lock -- two-phase locking */
}
static void
gistregscan(IndexScanDesc s)
{
GISTScanList l;
l = (GISTScanList) palloc(sizeof(GISTScanListData));
l->gsl_scan = s;
l->gsl_next = GISTScans;
GISTScans = l;
}
static void
gistdropscan(IndexScanDesc s)
{
GISTScanList l;
GISTScanList prev;
prev = (GISTScanList) NULL;
for (l = GISTScans;
l != (GISTScanList) NULL && l->gsl_scan != s;
l = l->gsl_next) {
prev = l;
}
if (l == (GISTScanList) NULL)
elog(WARN, "GiST scan list corrupted -- cannot find 0x%lx", s);
if (prev == (GISTScanList) NULL)
GISTScans = l->gsl_next;
else
prev->gsl_next = l->gsl_next;
pfree(l);
}
void
gistadjscans(Relation r, int op, BlockNumber blkno, OffsetNumber offnum)
{
GISTScanList l;
Oid relid;
relid = r->rd_id;
for (l = GISTScans; l != (GISTScanList) NULL; l = l->gsl_next) {
if (l->gsl_scan->relation->rd_id == relid)
gistadjone(l->gsl_scan, op, blkno, offnum);
}
}
/*
* gistadjone() -- adjust one scan for update.
*
* By here, the scan passed in is on a modified relation. Op tells
* us what the modification is, and blkno and offind tell us what
* block and offset index were affected. This routine checks the
* current and marked positions, and the current and marked stacks,
* to see if any stored location needs to be changed because of the
* update. If so, we make the change here.
*/
static void
gistadjone(IndexScanDesc s,
int op,
BlockNumber blkno,
OffsetNumber offnum)
{
GISTScanOpaque so;
adjustiptr(s, &(s->currentItemData), op, blkno, offnum);
adjustiptr(s, &(s->currentMarkData), op, blkno, offnum);
so = (GISTScanOpaque) s->opaque;
if (op == GISTOP_SPLIT) {
adjuststack(so->s_stack, blkno, offnum);
adjuststack(so->s_markstk, blkno, offnum);
}
}
/*
* adjustiptr() -- adjust current and marked item pointers in the scan
*
* Depending on the type of update and the place it happened, we
* need to do nothing, to back up one record, or to start over on
* the same page.
*/
static void
adjustiptr(IndexScanDesc s,
ItemPointer iptr,
int op,
BlockNumber blkno,
OffsetNumber offnum)
{
OffsetNumber curoff;
GISTScanOpaque so;
if (ItemPointerIsValid(iptr)) {
if (ItemPointerGetBlockNumber(iptr) == blkno) {
curoff = ItemPointerGetOffsetNumber(iptr);
so = (GISTScanOpaque) s->opaque;
switch (op) {
case GISTOP_DEL:
/* back up one if we need to */
if (curoff >= offnum) {
if (curoff > FirstOffsetNumber) {
/* just adjust the item pointer */
ItemPointerSet(iptr, blkno, OffsetNumberPrev(curoff));
} else {
/* remember that we're before the current tuple */
ItemPointerSet(iptr, blkno, FirstOffsetNumber);
if (iptr == &(s->currentItemData))
so->s_flags |= GS_CURBEFORE;
else
so->s_flags |= GS_MRKBEFORE;
}
}
break;
case GISTOP_SPLIT:
/* back to start of page on split */
ItemPointerSet(iptr, blkno, FirstOffsetNumber);
if (iptr == &(s->currentItemData))
so->s_flags &= ~GS_CURBEFORE;
else
so->s_flags &= ~GS_MRKBEFORE;
break;
default:
elog(WARN, "Bad operation in GiST scan adjust: %d", op);
}
}
}
}
/*
* adjuststack() -- adjust the supplied stack for a split on a page in
* the index we're scanning.
*
* If a page on our parent stack has split, we need to back up to the
* beginning of the page and rescan it. The reason for this is that
* the split algorithm for GiSTs doesn't order tuples in any useful
* way on a single page. This means on that a split, we may wind up
* looking at some heap tuples more than once. This is handled in the
* access method update code for heaps; if we've modified the tuple we
* are looking at already in this transaction, we ignore the update
* request.
*/
/*ARGSUSED*/
static void
adjuststack(GISTSTACK *stk,
BlockNumber blkno,
OffsetNumber offnum)
{
while (stk != (GISTSTACK *) NULL) {
if (stk->gs_blk == blkno)
stk->gs_child = FirstOffsetNumber;
stk = stk->gs_parent;
}
}
/*-------------------------------------------------------------------------
*
* giststrat.c--
* strategy map data for GiSTs.
*
* Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* /usr/local/devel/pglite/cvs/src/backend/access/gist/giststrat.c,v 1.4 1995/06/14 00:10:05 jolly Exp
*
*-------------------------------------------------------------------------
*/
#include "c.h"
#include "utils/rel.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "access/istrat.h"
#include "access/gist.h"
/*
* Note: negate, commute, and negatecommute all assume that operators are
* ordered as follows in the strategy map:
*
* contains, contained-by
*
* The negate, commute, and negatecommute arrays are used by the planner
* to plan indexed scans over data that appears in the qualificiation in
* a boolean negation, or whose operands appear in the wrong order. For
* example, if the operator "<%" means "contains", and the user says
*
* where not rel.box <% "(10,10,20,20)"::box
*
* the planner can plan an index scan by noting that GiST indices have
* an operator in their operator class for negating <%.
*
* Similarly, if the user says something like
*
* where "(10,10,20,20)"::box <% rel.box
*
* the planner can see that the GiST index on rel.box has an operator in
* its opclass for commuting <%, and plan the scan using that operator.
* This added complexity in the access methods makes the planner a lot easier
* to write.
*/
/* if a op b, what operator tells us if (not a op b)? */
static StrategyNumber GISTNegate[GISTNStrategies] = {
InvalidStrategy,
InvalidStrategy,
InvalidStrategy
};
/* if a op_1 b, what is the operator op_2 such that b op_2 a? */
static StrategyNumber GISTCommute[GISTNStrategies] = {
InvalidStrategy,
InvalidStrategy,
InvalidStrategy
};
/* if a op_1 b, what is the operator op_2 such that (b !op_2 a)? */
static StrategyNumber GISTNegateCommute[GISTNStrategies] = {
InvalidStrategy,
InvalidStrategy,
InvalidStrategy
};
/*
* GiSTs do not currently support TermData (see rtree/rtstrat.c for
* discussion of
* TermData) -- such logic must be encoded in the user's Consistent function.
*/
/*
* If you were sufficiently attentive to detail, you would go through
* the ExpressionData pain above for every one of the strategies
* we defined. I am not. Now we declare the StrategyEvaluationData
* structure that gets shipped around to help the planner and the access
* method decide what sort of scan it should do, based on (a) what the
* user asked for, (b) what operators are defined for a particular opclass,
* and (c) the reams of information we supplied above.
*
* The idea of all of this initialized data is to make life easier on the
* user when he defines a new operator class to use this access method.
* By filling in all the data, we let him get away with leaving holes in his
* operator class, and still let him use the index. The added complexity
* in the access methods just isn't worth the trouble, though.
*/
static StrategyEvaluationData GISTEvaluationData = {
GISTNStrategies, /* # of strategies */
(StrategyTransformMap) GISTNegate, /* how to do (not qual) */
(StrategyTransformMap) GISTCommute, /* how to swap operands */
(StrategyTransformMap) GISTNegateCommute, /* how to do both */
NULL
};
StrategyNumber
RelationGetGISTStrategy(Relation r,
AttrNumber attnum,
RegProcedure proc)
{
return (RelationGetStrategy(r, attnum, &GISTEvaluationData, proc));
}
bool
RelationInvokeGISTStrategy(Relation r,
AttrNumber attnum,
StrategyNumber s,
Datum left,
Datum right)
{
return (RelationInvokeStrategy(r, &GISTEvaluationData, attnum, s,
left, right));
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment