/*------------------------------------------------------------------------- * * rtree.c * interface routines for the postgres rtree indexed access method. * * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION * $Header: /cvsroot/pgsql/src/backend/access/rtree/Attic/rtree.c,v 1.64 2001/09/29 03:46:12 momjian Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/genam.h" #include "access/heapam.h" #include "access/rtree.h" #include "access/xlogutils.h" #include "catalog/index.h" #include "executor/executor.h" #include "miscadmin.h" /* * XXX We assume that all datatypes indexable in rtrees are pass-by-reference. * To fix this, you'd need to improve the IndexTupleGetDatum() macro, and * do something with the various datum-pfreeing code. However, it's not that * unreasonable an assumption in practice. */ #define IndexTupleGetDatum(itup) \ PointerGetDatum(((char *) (itup)) + sizeof(IndexTupleData)) /* * Space-allocation macros. Note we count the item's line pointer in its size. */ #define RTPageAvailSpace \ (BLCKSZ - (sizeof(PageHeaderData) - sizeof(ItemIdData)) \ - MAXALIGN(sizeof(RTreePageOpaqueData))) #define IndexTupleTotalSize(itup) \ (MAXALIGN(IndexTupleSize(itup)) + sizeof(ItemIdData)) #define IndexTupleAttSize(itup) \ (IndexTupleSize(itup) - sizeof(IndexTupleData)) /* results of rtpicksplit() */ typedef struct SPLITVEC { OffsetNumber *spl_left; int spl_nleft; Datum spl_ldatum; OffsetNumber *spl_right; int spl_nright; Datum spl_rdatum; } SPLITVEC; /* for sorting tuples by cost, for picking split */ typedef struct SPLITCOST { OffsetNumber offset_number; float cost_differential; bool choose_left; } SPLITCOST; typedef struct RTSTATE { FmgrInfo unionFn; /* union function */ FmgrInfo sizeFn; /* size function */ FmgrInfo interFn; /* intersection function */ } RTSTATE; /* Working state for rtbuild and its callback */ typedef struct { RTSTATE rtState; double indtuples; } RTBuildState; /* non-export function prototypes */ static void rtbuildCallback(Relation index, HeapTuple htup, Datum *attdata, char *nulls, bool tupleIsAlive, void *state); static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate); static void rttighten(Relation r, RTSTACK *stk, Datum datum, int att_size, RTSTATE *rtstate); static InsertIndexResult rtdosplit(Relation r, Buffer buffer, RTSTACK *stack, IndexTuple itup, RTSTATE *rtstate); static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup, IndexTuple rtup, RTSTATE *rtstate); static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt); static void rtpicksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup, RTSTATE *rtstate); static void RTInitBuffer(Buffer b, uint32 f); static OffsetNumber choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate); static int nospace(Page p, IndexTuple it); static void initRtstate(RTSTATE *rtstate, Relation index); static int qsort_comp_splitcost(const void *a, const void *b); /* * routine to build an index. 
Basically calls insert over and over */ Datum rtbuild(PG_FUNCTION_ARGS) { Relation heap = (Relation) PG_GETARG_POINTER(0); Relation index = (Relation) PG_GETARG_POINTER(1); IndexInfo *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2); double reltuples; RTBuildState buildstate; Buffer buffer; /* no locking is needed */ initRtstate(&buildstate.rtState, index); /* * We expect to be called exactly once for any index relation. If * that's not the case, big trouble's what we have. */ if (RelationGetNumberOfBlocks(index) != 0) elog(ERROR, "%s already contains data", RelationGetRelationName(index)); /* initialize the root page */ buffer = ReadBuffer(index, P_NEW); RTInitBuffer(buffer, F_LEAF); WriteBuffer(buffer); /* build the index */ buildstate.indtuples = 0; /* do the heap scan */ reltuples = IndexBuildHeapScan(heap, index, indexInfo, rtbuildCallback, (void *) &buildstate); /* okay, all heap tuples are indexed */ /* * Since we just counted the tuples in the heap, we update its stats * in pg_class to guarantee that the planner takes advantage of the * index we just created. But, only update statistics during normal * index definitions, not for indices on system catalogs created * during bootstrap processing. We must close the relations before * updating statistics to guarantee that the relcache entries are * flushed when we increment the command counter in UpdateStats(). But * we do not release any locks on the relations; those will be held * until end of transaction. */ if (IsNormalProcessingMode()) { Oid hrelid = RelationGetRelid(heap); Oid irelid = RelationGetRelid(index); heap_close(heap, NoLock); index_close(index); UpdateStats(hrelid, reltuples); UpdateStats(irelid, buildstate.indtuples); } PG_RETURN_VOID(); } /* * Per-tuple callback from IndexBuildHeapScan */ static void rtbuildCallback(Relation index, HeapTuple htup, Datum *attdata, char *nulls, bool tupleIsAlive, void *state) { RTBuildState *buildstate = (RTBuildState *) state; IndexTuple itup; InsertIndexResult res; /* form an index tuple and point it at the heap tuple */ itup = index_formtuple(RelationGetDescr(index), attdata, nulls); itup->t_tid = htup->t_self; /* rtree indexes don't index nulls, see notes in rtinsert */ if (IndexTupleHasNulls(itup)) { pfree(itup); return; } /* * Since we already have the index relation locked, we call * rtdoinsert directly. Normal access method calls dispatch * through rtinsert, which locks the relation for write. This is * the right thing to do if you're inserting single tups, but not * when you're initializing the whole index at once. */ res = rtdoinsert(index, itup, &buildstate->rtState); if (res) pfree(res); buildstate->indtuples += 1; pfree(itup); } /* * rtinsert -- wrapper for rtree tuple insertion. * * This is the public interface routine for tuple insertion in rtrees. * It doesn't do any work; just locks the relation and passes the buck. */ Datum rtinsert(PG_FUNCTION_ARGS) { Relation r = (Relation) PG_GETARG_POINTER(0); Datum *datum = (Datum *) PG_GETARG_POINTER(1); char *nulls = (char *) PG_GETARG_POINTER(2); ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3); #ifdef NOT_USED Relation heapRel = (Relation) PG_GETARG_POINTER(4); #endif InsertIndexResult res; IndexTuple itup; RTSTATE rtState; /* generate an index tuple */ itup = index_formtuple(RelationGetDescr(r), datum, nulls); itup->t_tid = *ht_ctid; /* * Currently, rtrees do not support indexing NULLs; considerable * infrastructure work would have to be done to do anything reasonable * with a NULL. 
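	 * For now, any index tuple containing a NULL is simply not entered into
	 * the index at all (see the test just below).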
*/ if (IndexTupleHasNulls(itup)) { pfree(itup); PG_RETURN_POINTER((InsertIndexResult) NULL); } initRtstate(&rtState, r); /* * Since rtree is not marked "amconcurrent" in pg_am, caller should * have acquired exclusive lock on index relation. We need no locking * here. */ res = rtdoinsert(r, itup, &rtState); PG_RETURN_POINTER(res); } static InsertIndexResult rtdoinsert(Relation r, IndexTuple itup, RTSTATE *rtstate) { Page page; Buffer buffer; BlockNumber blk; IndexTuple which; OffsetNumber l; RTSTACK *stack; InsertIndexResult res; RTreePageOpaque opaque; Datum datum; blk = P_ROOT; buffer = InvalidBuffer; stack = (RTSTACK *) NULL; do { /* let go of current buffer before getting next */ if (buffer != InvalidBuffer) ReleaseBuffer(buffer); /* get next buffer */ buffer = ReadBuffer(r, blk); page = (Page) BufferGetPage(buffer); opaque = (RTreePageOpaque) PageGetSpecialPointer(page); if (!(opaque->flags & F_LEAF)) { RTSTACK *n; ItemId iid; n = (RTSTACK *) palloc(sizeof(RTSTACK)); n->rts_parent = stack; n->rts_blk = blk; n->rts_child = choose(r, page, itup, rtstate); stack = n; iid = PageGetItemId(page, n->rts_child); which = (IndexTuple) PageGetItem(page, iid); blk = ItemPointerGetBlockNumber(&(which->t_tid)); } } while (!(opaque->flags & F_LEAF)); if (nospace(page, itup)) { /* need to do a split */ res = rtdosplit(r, buffer, stack, itup, rtstate); freestack(stack); WriteBuffer(buffer); /* don't forget to release buffer! */ return res; } /* add the item and write the buffer */ if (PageIsEmpty(page)) { l = PageAddItem(page, (Item) itup, IndexTupleSize(itup), FirstOffsetNumber, LP_USED); } else { l = PageAddItem(page, (Item) itup, IndexTupleSize(itup), OffsetNumberNext(PageGetMaxOffsetNumber(page)), LP_USED); } if (l == InvalidOffsetNumber) elog(ERROR, "rtdoinsert: failed to add index item to %s", RelationGetRelationName(r)); WriteBuffer(buffer); datum = IndexTupleGetDatum(itup); /* now expand the page boundary in the parent to include the new child */ rttighten(r, stack, datum, IndexTupleAttSize(itup), rtstate); freestack(stack); /* build and return an InsertIndexResult for this insertion */ res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); ItemPointerSet(&(res->pointerData), blk, l); return res; } static void rttighten(Relation r, RTSTACK *stk, Datum datum, int att_size, RTSTATE *rtstate) { Datum oldud; Datum tdatum; Page p; float old_size, newd_size; Buffer b; if (stk == (RTSTACK *) NULL) return; b = ReadBuffer(r, stk->rts_blk); p = BufferGetPage(b); oldud = IndexTupleGetDatum(PageGetItem(p, PageGetItemId(p, stk->rts_child))); FunctionCall2(&rtstate->sizeFn, oldud, PointerGetDatum(&old_size)); datum = FunctionCall2(&rtstate->unionFn, oldud, datum); FunctionCall2(&rtstate->sizeFn, datum, PointerGetDatum(&newd_size)); /* * If newd_size == 0 we have degenerate rectangles, so we * don't know if there was any change, so we have to * assume there was. */ if ((newd_size == 0) || (newd_size != old_size)) { TupleDesc td = RelationGetDescr(r); if (td->attrs[0]->attlen < 0) { /* * This is an internal page, so 'oldud' had better be a union * (constant-length) key, too. (See comment below.) */ Assert(VARSIZE(DatumGetPointer(datum)) == VARSIZE(DatumGetPointer(oldud))); memmove(DatumGetPointer(oldud), DatumGetPointer(datum), VARSIZE(DatumGetPointer(datum))); } else { memmove(DatumGetPointer(oldud), DatumGetPointer(datum), att_size); } WriteBuffer(b); /* * The user may be defining an index on variable-sized data (like * polygons). 
If so, we need to get a constant-sized datum for * insertion on the internal page. We do this by calling the * union proc, which is required to return a rectangle. */ tdatum = FunctionCall2(&rtstate->unionFn, datum, datum); rttighten(r, stk->rts_parent, tdatum, att_size, rtstate); pfree(DatumGetPointer(tdatum)); } else ReleaseBuffer(b); pfree(DatumGetPointer(datum)); } /* * rtdosplit -- split a page in the tree. * * rtpicksplit does the interesting work of choosing the split. * This routine just does the bit-pushing. */ static InsertIndexResult rtdosplit(Relation r, Buffer buffer, RTSTACK *stack, IndexTuple itup, RTSTATE *rtstate) { Page p; Buffer leftbuf, rightbuf; Page left, right; ItemId itemid; IndexTuple item; IndexTuple ltup, rtup; OffsetNumber maxoff; OffsetNumber i; OffsetNumber leftoff, rightoff; BlockNumber lbknum, rbknum; BlockNumber bufblock; RTreePageOpaque opaque; int blank; InsertIndexResult res; char *isnull; SPLITVEC v; OffsetNumber *spl_left, *spl_right; TupleDesc tupDesc; int n; OffsetNumber newitemoff; p = (Page) BufferGetPage(buffer); opaque = (RTreePageOpaque) PageGetSpecialPointer(p); rtpicksplit(r, p, &v, itup, rtstate); /* * The root of the tree is the first block in the relation. If we're * about to split the root, we need to do some hocus-pocus to enforce * this guarantee. */ if (BufferGetBlockNumber(buffer) == P_ROOT) { leftbuf = ReadBuffer(r, P_NEW); RTInitBuffer(leftbuf, opaque->flags); lbknum = BufferGetBlockNumber(leftbuf); left = (Page) BufferGetPage(leftbuf); } else { leftbuf = buffer; IncrBufferRefCount(buffer); lbknum = BufferGetBlockNumber(buffer); left = (Page) PageGetTempPage(p, sizeof(RTreePageOpaqueData)); } rightbuf = ReadBuffer(r, P_NEW); RTInitBuffer(rightbuf, opaque->flags); rbknum = BufferGetBlockNumber(rightbuf); right = (Page) BufferGetPage(rightbuf); spl_left = v.spl_left; spl_right = v.spl_right; leftoff = rightoff = FirstOffsetNumber; maxoff = PageGetMaxOffsetNumber(p); newitemoff = OffsetNumberNext(maxoff); /* build an InsertIndexResult for this insertion */ res = (InsertIndexResult) palloc(sizeof(InsertIndexResultData)); /* * spl_left contains a list of the offset numbers of the * tuples that will go to the left page. For each offset * number, get the tuple item, then add the item to the * left page. Similarly for the right side. 
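	 * The incoming tuple itself appears in these vectors under the phony
	 * offset number newitemoff (one past the last real offset on the old
	 * page), and is picked up from itup rather than from the old page in
	 * the loops below.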
*/ /* fill left node */ for (n = 0; n < v.spl_nleft; n++) { i = *spl_left; if (i == newitemoff) item = itup; else { itemid = PageGetItemId(p, i); item = (IndexTuple) PageGetItem(p, itemid); } if (PageAddItem(left, (Item) item, IndexTupleSize(item), leftoff, LP_USED) == InvalidOffsetNumber) elog(ERROR, "rtdosplit: failed to add index item to %s", RelationGetRelationName(r)); leftoff = OffsetNumberNext(leftoff); if (i == newitemoff) ItemPointerSet(&(res->pointerData), lbknum, leftoff); spl_left++; /* advance in left split vector */ } /* fill right node */ for (n = 0; n < v.spl_nright; n++) { i = *spl_right; if (i == newitemoff) item = itup; else { itemid = PageGetItemId(p, i); item = (IndexTuple) PageGetItem(p, itemid); } if (PageAddItem(right, (Item) item, IndexTupleSize(item), rightoff, LP_USED) == InvalidOffsetNumber) elog(ERROR, "rtdosplit: failed to add index item to %s", RelationGetRelationName(r)); rightoff = OffsetNumberNext(rightoff); if (i == newitemoff) ItemPointerSet(&(res->pointerData), rbknum, rightoff); spl_right++; /* advance in right split vector */ } /* Make sure we consumed all of the split vectors, and release 'em */ Assert(*spl_left == InvalidOffsetNumber); Assert(*spl_right == InvalidOffsetNumber); pfree(v.spl_left); pfree(v.spl_right); if ((bufblock = BufferGetBlockNumber(buffer)) != P_ROOT) PageRestoreTempPage(left, p); WriteBuffer(leftbuf); WriteBuffer(rightbuf); /* * Okay, the page is split. We have three things left to do: * * 1) Adjust any active scans on this index to cope with changes we * introduced in its structure by splitting this page. * * 2) "Tighten" the bounding box of the pointer to the left page in the * parent node in the tree, if any. Since we moved a bunch of stuff * off the left page, we expect it to get smaller. This happens in * the internal insertion routine. * * 3) Insert a pointer to the right page in the parent. This may cause * the parent to split. If it does, we need to repeat steps one and * two for each split node in the tree. */ /* adjust active scans */ rtadjscans(r, RTOP_SPLIT, bufblock, FirstOffsetNumber); tupDesc = r->rd_att; isnull = (char *) palloc(r->rd_rel->relnatts); for (blank = 0; blank < r->rd_rel->relnatts; blank++) isnull[blank] = ' '; ltup = (IndexTuple) index_formtuple(tupDesc, &(v.spl_ldatum), isnull); rtup = (IndexTuple) index_formtuple(tupDesc, &(v.spl_rdatum), isnull); pfree(isnull); /* set pointers to new child pages in the internal index tuples */ ItemPointerSet(&(ltup->t_tid), lbknum, 1); ItemPointerSet(&(rtup->t_tid), rbknum, 1); rtintinsert(r, stack, ltup, rtup, rtstate); pfree(ltup); pfree(rtup); return res; } static void rtintinsert(Relation r, RTSTACK *stk, IndexTuple ltup, IndexTuple rtup, RTSTATE *rtstate) { IndexTuple old; Buffer b; Page p; Datum ldatum, rdatum, newdatum; InsertIndexResult res; if (stk == (RTSTACK *) NULL) { rtnewroot(r, ltup, rtup); return; } b = ReadBuffer(r, stk->rts_blk); p = BufferGetPage(b); old = (IndexTuple) PageGetItem(p, PageGetItemId(p, stk->rts_child)); /* * This is a hack. Right now, we force rtree internal keys to be * constant size. To fix this, need delete the old key and add both * left and right for the two new pages. The insertion of left may * force a split if the new left key is bigger than the old key. 
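	 * Until that is done, a new key whose size differs from the existing
	 * entry's is simply rejected (see the check just below).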
*/ if (IndexTupleSize(old) != IndexTupleSize(ltup)) elog(ERROR, "Variable-length rtree keys are not supported."); /* install pointer to left child */ memmove(old, ltup, IndexTupleSize(ltup)); if (nospace(p, rtup)) { newdatum = IndexTupleGetDatum(ltup); rttighten(r, stk->rts_parent, newdatum, IndexTupleAttSize(ltup), rtstate); res = rtdosplit(r, b, stk->rts_parent, rtup, rtstate); WriteBuffer(b); /* don't forget to release buffer! - * 01/31/94 */ pfree(res); } else { if (PageAddItem(p, (Item) rtup, IndexTupleSize(rtup), PageGetMaxOffsetNumber(p), LP_USED) == InvalidOffsetNumber) elog(ERROR, "rtintinsert: failed to add index item to %s", RelationGetRelationName(r)); WriteBuffer(b); ldatum = IndexTupleGetDatum(ltup); rdatum = IndexTupleGetDatum(rtup); newdatum = FunctionCall2(&rtstate->unionFn, ldatum, rdatum); rttighten(r, stk->rts_parent, newdatum, IndexTupleAttSize(rtup), rtstate); pfree(DatumGetPointer(newdatum)); } } static void rtnewroot(Relation r, IndexTuple lt, IndexTuple rt) { Buffer b; Page p; b = ReadBuffer(r, P_ROOT); RTInitBuffer(b, 0); p = BufferGetPage(b); if (PageAddItem(p, (Item) lt, IndexTupleSize(lt), FirstOffsetNumber, LP_USED) == InvalidOffsetNumber) elog(ERROR, "rtnewroot: failed to add index item to %s", RelationGetRelationName(r)); if (PageAddItem(p, (Item) rt, IndexTupleSize(rt), OffsetNumberNext(FirstOffsetNumber), LP_USED) == InvalidOffsetNumber) elog(ERROR, "rtnewroot: failed to add index item to %s", RelationGetRelationName(r)); WriteBuffer(b); } /* * Choose how to split an rtree page into two pages. * * We return two vectors of index item numbers, one for the items to be * put on the left page, one for the items to be put on the right page. * In addition, the item to be added (itup) is listed in the appropriate * vector. It is represented by item number N+1 (N = # of items on page). * * Both vectors have a terminating sentinel value of InvalidOffsetNumber, * but the sentinal value is no longer used, because the SPLITVEC * vector also contains the length of each vector, and that information * is now used to iterate over them in rtdosplit(). --kbb, 21 Sept 2001 * * The bounding-box datums for the two new pages are also returned in *v. * * This is the quadratic-cost split algorithm Guttman describes in * his paper. The reason we chose it is that you can implement this * with less information about the data types on which you're operating. * * We must also deal with a consideration not found in Guttman's algorithm: * variable-length data. In particular, the incoming item might be * large enough that not just any split will work. In the worst case, * our "split" may have to be the new item on one page and all the existing * items on the other. Short of that, we have to take care that we do not * make a split that leaves both pages too full for the new item. 
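 *
 * In outline, the split proceeds in two phases: first we pick as seeds the
 * pair of tuples that would waste the most space if kept on the same page
 * (waste being measured as union size minus intersection size); then we
 * assign each remaining tuple, in decreasing order of how strongly it
 * prefers one side, to whichever page's bounding union would grow the
 * least, while ensuring that at least one page keeps enough room for the
 * incoming tuple.  As a rough illustration, assuming the built-in box
 * operator class: for the boxes (0,0)-(1,1) and (5,5)-(6,6) the union has
 * area 36 and the intersection is empty, so the pair "wastes" 36 units of
 * area and makes a good choice of seeds.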
*/ static void rtpicksplit(Relation r, Page page, SPLITVEC *v, IndexTuple itup, RTSTATE *rtstate) { OffsetNumber maxoff, newitemoff; OffsetNumber i, j; IndexTuple item_1, item_2; Datum datum_alpha, datum_beta; Datum datum_l, datum_r; Datum union_d, union_dl, union_dr; Datum inter_d; bool firsttime; float size_alpha, size_beta, size_union, size_inter; float size_waste, waste; float size_l, size_r; int nbytes; OffsetNumber seed_1 = 0, seed_2 = 0; OffsetNumber *left, *right; Size newitemsz, item_1_sz, item_2_sz, left_avail_space, right_avail_space; int total_num_tuples, num_tuples_without_seeds, max_after_split; /* in Guttman's lingo, (M - m) */ float diff; /* diff between cost of putting tuple left or right */ SPLITCOST *cost_vector; int n; /* * First, make sure the new item is not so large that we can't * possibly fit it on a page, even by itself. (It's sufficient to * make this test here, since any oversize tuple must lead to a page * split attempt.) */ newitemsz = IndexTupleTotalSize(itup); if (newitemsz > RTPageAvailSpace) elog(ERROR, "rtree: index item size %lu exceeds maximum %lu", (unsigned long) newitemsz, (unsigned long) RTPageAvailSpace); maxoff = PageGetMaxOffsetNumber(page); newitemoff = OffsetNumberNext(maxoff); /* phony index for new * item */ total_num_tuples = newitemoff; num_tuples_without_seeds = total_num_tuples - 2; max_after_split = total_num_tuples / 2; /* works for m = M/2 */ /* Make arrays big enough for worst case, including sentinel */ nbytes = (maxoff + 2) * sizeof(OffsetNumber); v->spl_left = (OffsetNumber *) palloc(nbytes); v->spl_right = (OffsetNumber *) palloc(nbytes); firsttime = true; waste = 0.0; for (i = FirstOffsetNumber; i < maxoff; i = OffsetNumberNext(i)) { item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); datum_alpha = IndexTupleGetDatum(item_1); item_1_sz = IndexTupleTotalSize(item_1); for (j = OffsetNumberNext(i); j <= maxoff; j = OffsetNumberNext(j)) { item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, j)); datum_beta = IndexTupleGetDatum(item_2); item_2_sz = IndexTupleTotalSize(item_2); /* * Ignore seed pairs that don't leave room for the new item on * either split page. */ if (newitemsz + item_1_sz > RTPageAvailSpace && newitemsz + item_2_sz > RTPageAvailSpace) continue; /* compute the wasted space by unioning these guys */ union_d = FunctionCall2(&rtstate->unionFn, datum_alpha, datum_beta); FunctionCall2(&rtstate->sizeFn, union_d, PointerGetDatum(&size_union)); inter_d = FunctionCall2(&rtstate->interFn, datum_alpha, datum_beta); /* * The interFn may return a NULL pointer (not an SQL null!) to * indicate no intersection. sizeFn must cope with this. */ FunctionCall2(&rtstate->sizeFn, inter_d, PointerGetDatum(&size_inter)); size_waste = size_union - size_inter; if (DatumGetPointer(union_d) != NULL) pfree(DatumGetPointer(union_d)); if (DatumGetPointer(inter_d) != NULL) pfree(DatumGetPointer(inter_d)); /* * are these a more promising split that what we've already * seen? */ if (size_waste > waste || firsttime) { waste = size_waste; seed_1 = i; seed_2 = j; firsttime = false; } } } if (firsttime) { /* * There is no possible split except to put the new item on its * own page. Since we still have to compute the union rectangles, * we play dumb and run through the split algorithm anyway, * setting seed_1 = first item on page and seed_2 = new item. 
*/ seed_1 = FirstOffsetNumber; seed_2 = newitemoff; } item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_1)); datum_alpha = IndexTupleGetDatum(item_1); datum_l = FunctionCall2(&rtstate->unionFn, datum_alpha, datum_alpha); FunctionCall2(&rtstate->sizeFn, datum_l, PointerGetDatum(&size_l)); left_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_1); if (seed_2 == newitemoff) { item_2 = itup; /* Needn't leave room for new item in calculations below */ newitemsz = 0; } else item_2 = (IndexTuple) PageGetItem(page, PageGetItemId(page, seed_2)); datum_beta = IndexTupleGetDatum(item_2); datum_r = FunctionCall2(&rtstate->unionFn, datum_beta, datum_beta); FunctionCall2(&rtstate->sizeFn, datum_r, PointerGetDatum(&size_r)); right_avail_space = RTPageAvailSpace - IndexTupleTotalSize(item_2); /* * Now split up the regions between the two seeds. * * The cost_vector array will contain hints for determining where * each tuple should go. Each record in the array will contain * a boolean, choose_left, that indicates which node the tuple * prefers to be on, and the absolute difference in cost between * putting the tuple in its favored node and in the other node. * * Later, we will sort the cost_vector in descending order by cost * difference, and consider the tuples in that order for * placement. That way, the tuples that *really* want to be in * one node or the other get to choose first, and the tuples that * don't really care choose last. * * First, build the cost_vector array. The new index tuple will * also be handled in this loop, and represented in the array, * with i==newitemoff. * * In the case of variable size tuples it is possible that we only * have the two seeds and no other tuples, in which case we don't * do any of this cost_vector stuff. */ /* to keep compiler quiet */ cost_vector = (SPLITCOST *) NULL; if (num_tuples_without_seeds > 0) { cost_vector = (SPLITCOST *) palloc(num_tuples_without_seeds * sizeof(SPLITCOST)); n = 0; for (i = FirstOffsetNumber; i <= newitemoff; i = OffsetNumberNext(i)) { /* Compute new union datums and sizes for both choices */ if ((i == seed_1) || (i == seed_2)) continue; else if (i == newitemoff) item_1 = itup; else item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); datum_alpha = IndexTupleGetDatum(item_1); union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha); union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha); FunctionCall2(&rtstate->sizeFn, union_dl, PointerGetDatum(&size_alpha)); FunctionCall2(&rtstate->sizeFn, union_dr, PointerGetDatum(&size_beta)); diff = (size_alpha - size_l) - (size_beta - size_r); cost_vector[n].offset_number = i; cost_vector[n].cost_differential = fabs(diff); cost_vector[n].choose_left = (diff < 0); n++; } /* * Sort the array. The function qsort_comp_splitcost is * set up "backwards", to provided descending order. */ qsort(cost_vector, num_tuples_without_seeds, sizeof(SPLITCOST), &qsort_comp_splitcost); } /* * Now make the final decisions about where each tuple will go, * and build the vectors to return in the SPLITVEC record. * * The cost_vector array contains (descriptions of) all the * tuples, in the order that we want to consider them, so we * we just iterate through it and place each tuple in left * or right nodes, according to the criteria described below. */ left = v->spl_left; v->spl_nleft = 0; right = v->spl_right; v->spl_nright = 0; /* Place the seeds first. * left avail space, left union, right avail space, and right * union have already been adjusted for the seeds. 
*/ *left++ = seed_1; v->spl_nleft++; *right++ = seed_2; v->spl_nright++; for (n = 0; n < num_tuples_without_seeds; n++) { bool left_feasible, right_feasible, choose_left; /* * We need to figure out which page needs the least * enlargement in order to store the item. */ i = cost_vector[n].offset_number; /* Compute new union datums and sizes for both possible additions */ if (i == newitemoff) { item_1 = itup; /* Needn't leave room for new item anymore */ newitemsz = 0; } else item_1 = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); item_1_sz = IndexTupleTotalSize(item_1); datum_alpha = IndexTupleGetDatum(item_1); union_dl = FunctionCall2(&rtstate->unionFn, datum_l, datum_alpha); union_dr = FunctionCall2(&rtstate->unionFn, datum_r, datum_alpha); FunctionCall2(&rtstate->sizeFn, union_dl, PointerGetDatum(&size_alpha)); FunctionCall2(&rtstate->sizeFn, union_dr, PointerGetDatum(&size_beta)); /* * We prefer the page that shows smaller enlargement of its union * area (Guttman's algorithm), but we must take care that at least * one page will still have room for the new item after this one * is added. * * (We know that all the old items together can fit on one page, so * we need not worry about any other problem than failing to fit * the new item.) * * Guttman's algorithm actually has two factors to consider (in * order): 1. if one node has so many tuples already assigned to * it that the other needs all the rest in order to satisfy the * condition that neither node has fewer than m tuples, then * that is decisive; 2. otherwise, choose the page that shows * the smaller enlargement of its union area. * * I have chosen m = M/2, where M is the maximum number of * tuples on a page. (Actually, this is only strictly * true for fixed size tuples. For variable size tuples, * there still might have to be only one tuple on a page, * if it is really big. But even with variable size * tuples we still try to get m as close as possible to M/2.) * * The question of which page shows the smaller enlargement of * its union area has already been answered, and the answer * stored in the choose_left field of the SPLITCOST record. */ left_feasible = (left_avail_space >= item_1_sz && ((left_avail_space - item_1_sz) >= newitemsz || right_avail_space >= newitemsz)); right_feasible = (right_avail_space >= item_1_sz && ((right_avail_space - item_1_sz) >= newitemsz || left_avail_space >= newitemsz)); if (left_feasible && right_feasible) { /* * Both feasible, use Guttman's algorithm. * First check the m condition described above, and if * that doesn't apply, choose the page with the smaller * enlargement of its union area. 
*/ if (v->spl_nleft > max_after_split) choose_left = false; else if (v->spl_nright > max_after_split) choose_left = true; else choose_left = cost_vector[n].choose_left; } else if (left_feasible) choose_left = true; else if (right_feasible) choose_left = false; else { elog(ERROR, "rtpicksplit: failed to find a workable page split"); choose_left = false;/* keep compiler quiet */ } if (choose_left) { pfree(DatumGetPointer(datum_l)); pfree(DatumGetPointer(union_dr)); datum_l = union_dl; size_l = size_alpha; left_avail_space -= item_1_sz; *left++ = i; v->spl_nleft++; } else { pfree(DatumGetPointer(datum_r)); pfree(DatumGetPointer(union_dl)); datum_r = union_dr; size_r = size_beta; right_avail_space -= item_1_sz; *right++ = i; v->spl_nright++; } } if (num_tuples_without_seeds > 0) { pfree(cost_vector); } *left = *right = InvalidOffsetNumber; /* add ending sentinels */ v->spl_ldatum = datum_l; v->spl_rdatum = datum_r; } static void RTInitBuffer(Buffer b, uint32 f) { RTreePageOpaque opaque; Page page; Size pageSize; pageSize = BufferGetPageSize(b); page = BufferGetPage(b); MemSet(page, 0, (int) pageSize); PageInit(page, pageSize, sizeof(RTreePageOpaqueData)); opaque = (RTreePageOpaque) PageGetSpecialPointer(page); opaque->flags = f; } static OffsetNumber choose(Relation r, Page p, IndexTuple it, RTSTATE *rtstate) { OffsetNumber maxoff; OffsetNumber i; Datum ud, id; Datum datum; float usize, dsize; OffsetNumber which; float which_grow; id = IndexTupleGetDatum(it); maxoff = PageGetMaxOffsetNumber(p); which_grow = -1.0; which = -1; for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) { datum = IndexTupleGetDatum(PageGetItem(p, PageGetItemId(p, i))); FunctionCall2(&rtstate->sizeFn, datum, PointerGetDatum(&dsize)); ud = FunctionCall2(&rtstate->unionFn, datum, id); FunctionCall2(&rtstate->sizeFn, ud, PointerGetDatum(&usize)); pfree(DatumGetPointer(ud)); if (which_grow < 0 || usize - dsize < which_grow) { which = i; which_grow = usize - dsize; if (which_grow == 0) break; } } return which; } static int nospace(Page p, IndexTuple it) { return PageGetFreeSpace(p) < IndexTupleSize(it); } void freestack(RTSTACK *s) { RTSTACK *p; while (s != (RTSTACK *) NULL) { p = s->rts_parent; pfree(s); s = p; } } /* * Bulk deletion of all index entries pointing to a set of heap tuples. * The set of target tuples is specified via a callback routine that tells * whether any given heap tuple (identified by ItemPointer) is being deleted. * * Result: a palloc'd struct containing statistical info for VACUUM displays. */ Datum rtbulkdelete(PG_FUNCTION_ARGS) { Relation rel = (Relation) PG_GETARG_POINTER(0); IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1); void *callback_state = (void *) PG_GETARG_POINTER(2); IndexBulkDeleteResult *result; BlockNumber num_pages; double tuples_removed; double num_index_tuples; RetrieveIndexResult res; IndexScanDesc iscan; tuples_removed = 0; num_index_tuples = 0; /* * Since rtree is not marked "amconcurrent" in pg_am, caller should * have acquired exclusive lock on index relation. We need no locking * here. */ /* * XXX generic implementation --- should be improved! 
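	 * It runs a plain index scan over the whole index and deletes the
	 * qualifying entries one at a time, adjusting any affected scans (in
	 * practice only our own) as it goes.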
*/ /* walk through the entire index */ iscan = index_beginscan(rel, false, 0, (ScanKey) NULL); while ((res = index_getnext(iscan, ForwardScanDirection)) != (RetrieveIndexResult) NULL) { ItemPointer heapptr = &res->heap_iptr; if (callback(heapptr, callback_state)) { ItemPointer indexptr = &res->index_iptr; BlockNumber blkno; OffsetNumber offnum; Buffer buf; Page page; blkno = ItemPointerGetBlockNumber(indexptr); offnum = ItemPointerGetOffsetNumber(indexptr); /* adjust any scans that will be affected by this deletion */ /* (namely, my own scan) */ rtadjscans(rel, RTOP_DEL, blkno, offnum); /* delete the index tuple */ buf = ReadBuffer(rel, blkno); page = BufferGetPage(buf); PageIndexTupleDelete(page, offnum); WriteBuffer(buf); tuples_removed += 1; } else num_index_tuples += 1; pfree(res); } index_endscan(iscan); /* return statistics */ num_pages = RelationGetNumberOfBlocks(rel); result = (IndexBulkDeleteResult *) palloc(sizeof(IndexBulkDeleteResult)); result->num_pages = num_pages; result->tuples_removed = tuples_removed; result->num_index_tuples = num_index_tuples; PG_RETURN_POINTER(result); } static void initRtstate(RTSTATE *rtstate, Relation index) { RegProcedure union_proc, size_proc, inter_proc; union_proc = index_getprocid(index, 1, RT_UNION_PROC); size_proc = index_getprocid(index, 1, RT_SIZE_PROC); inter_proc = index_getprocid(index, 1, RT_INTER_PROC); fmgr_info(union_proc, &rtstate->unionFn); fmgr_info(size_proc, &rtstate->sizeFn); fmgr_info(inter_proc, &rtstate->interFn); return; } /* for sorting SPLITCOST records in descending order */ static int qsort_comp_splitcost(const void *a, const void *b) { float diff = ((SPLITCOST *)a)->cost_differential - ((SPLITCOST *)b)->cost_differential; if (diff < 0) return 1; else if (diff > 0) return -1; else return 0; } #ifdef RTDEBUG void _rtdump(Relation r) { Buffer buf; Page page; OffsetNumber offnum, maxoff; BlockNumber blkno; BlockNumber nblocks; RTreePageOpaque po; IndexTuple itup; BlockNumber itblkno; OffsetNumber itoffno; Datum datum; char *itkey; nblocks = RelationGetNumberOfBlocks(r); for (blkno = 0; blkno < nblocks; blkno++) { buf = ReadBuffer(r, blkno); page = BufferGetPage(buf); po = (RTreePageOpaque) PageGetSpecialPointer(page); maxoff = PageGetMaxOffsetNumber(page); printf("Page %d maxoff %d <%s>\n", blkno, maxoff, (po->flags & F_LEAF ? "LEAF" : "INTERNAL")); if (PageIsEmpty(page)) { ReleaseBuffer(buf); continue; } for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) { itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); itblkno = ItemPointerGetBlockNumber(&(itup->t_tid)); itoffno = ItemPointerGetOffsetNumber(&(itup->t_tid)); datum = IndexTupleGetDatum(itup); itkey = DatumGetCString(DirectFunctionCall1(box_out, datum)); printf("\t[%d] size %d heap <%d,%d> key:%s\n", offnum, IndexTupleSize(itup), itblkno, itoffno, itkey); pfree(itkey); } ReleaseBuffer(buf); } } #endif /* defined RTDEBUG */ void rtree_redo(XLogRecPtr lsn, XLogRecord *record) { elog(STOP, "rtree_redo: unimplemented"); } void rtree_undo(XLogRecPtr lsn, XLogRecord *record) { elog(STOP, "rtree_undo: unimplemented"); } void rtree_desc(char *buf, uint8 xl_info, char *rec) { }