/*------------------------------------------------------------------------- * * gistutil.c * utilities routines for the postgres GiST index access method. * * * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.16 2006/06/28 12:00:14 teodor Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" #include "access/genam.h" #include "access/gist_private.h" #include "access/gistscan.h" #include "access/heapam.h" #include "catalog/index.h" #include "miscadmin.h" #include "storage/freespace.h" /* * static *S used for temrorary storage (saves stack and palloc() call) */ static Datum attrS[INDEX_MAX_KEYS]; static bool isnullS[INDEX_MAX_KEYS]; /* * Write itup vector to page, has no control of free space */ OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup, int len, OffsetNumber off) { OffsetNumber l = InvalidOffsetNumber; int i; if (off == InvalidOffsetNumber) off = (PageIsEmpty(page)) ? FirstOffsetNumber : OffsetNumberNext(PageGetMaxOffsetNumber(page)); for (i = 0; i < len; i++) { l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]), off, LP_USED); if (l == InvalidOffsetNumber) elog(ERROR, "failed to add item to index page in \"%s\"", RelationGetRelationName(r)); off++; } return l; } /* * Check space for itup vector on page */ bool gistnospace(Page page, IndexTuple *itvec, int len, OffsetNumber todelete) { unsigned int size = 0, deleted = 0; int i; for (i = 0; i < len; i++) size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); if ( todelete != InvalidOffsetNumber ) { IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, todelete)); deleted = IndexTupleSize(itup) + sizeof(ItemIdData); } return (PageGetFreeSpace(page) + deleted < size); } bool gistfitpage(IndexTuple *itvec, int len) { int i; Size size=0; for(i=0;i<len;i++) size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData); return (size <= GiSTPageSize); } /* * Read buffer into itup vector */ IndexTuple * gistextractpage(Page page, int *len /* out */ ) { OffsetNumber i, maxoff; IndexTuple *itvec; maxoff = PageGetMaxOffsetNumber(page); *len = maxoff; itvec = palloc(sizeof(IndexTuple) * maxoff); for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i)) itvec[i - FirstOffsetNumber] = (IndexTuple) PageGetItem(page, PageGetItemId(page, i)); return itvec; } /* * join two vectors into one */ IndexTuple * gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen) { itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen)); memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen); *len += addlen; return itvec; } /* * make plain IndexTupleVector */ IndexTupleData * gistfillitupvec(IndexTuple *vec, int veclen, int *memlen) { char *ptr, *ret; int i; *memlen=0; for (i = 0; i < veclen; i++) *memlen += IndexTupleSize(vec[i]); ptr = ret = palloc(*memlen); for (i = 0; i < veclen; i++) { memcpy(ptr, vec[i], IndexTupleSize(vec[i])); ptr += IndexTupleSize(vec[i]); } return (IndexTupleData*)ret; } /* * Make unions of keys in IndexTuple vector, return FALSE if itvec contains * invalid tuple. Resulting Datums aren't compressed. */ bool gistMakeUnionItVec(GISTSTATE *giststate, IndexTuple *itvec, int len, int startkey, Datum *attr, bool *isnull ) { int i; GistEntryVector *evec; int attrsize; evec = (GistEntryVector *) palloc( ( len + 2 ) * sizeof(GISTENTRY) + GEVHDRSZ); for (i = startkey; i < giststate->tupdesc->natts; i++) { int j; evec->n = 0; if ( !isnull[i] ) { gistentryinit( evec->vector[evec->n], attr[i], NULL, NULL, (OffsetNumber) 0, FALSE); evec->n++; } for (j = 0; j < len; j++) { Datum datum; bool IsNull; if (GistTupleIsInvalid(itvec[j])) return FALSE; /* signals that union with invalid tuple => result is invalid */ datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull); if (IsNull) continue; gistdentryinit(giststate, i, evec->vector + evec->n, datum, NULL, NULL, (OffsetNumber) 0, FALSE, IsNull); evec->n++; } /* If this tuple vector was all NULLs, the union is NULL */ if ( evec->n == 0 ) { attr[i] = (Datum) 0; isnull[i] = TRUE; } else { if (evec->n == 1) { evec->n = 2; evec->vector[1] = evec->vector[0]; } /* Make union and store in attr array */ attr[i] = FunctionCall2(&giststate->unionFn[i], PointerGetDatum(evec), PointerGetDatum(&attrsize)); isnull[i] = FALSE; } } return TRUE; } /* * Return an IndexTuple containing the result of applying the "union" * method to the specified IndexTuple vector. */ IndexTuple gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate) { memset(isnullS, TRUE, sizeof(bool) * giststate->tupdesc->natts); if ( !gistMakeUnionItVec(giststate, itvec, len, 0, attrS, isnullS ) ) return gist_form_invalid_tuple(InvalidBlockNumber); return gistFormTuple(giststate, r, attrS, isnullS, false); } /* * makes union of two key */ void gistMakeUnionKey( GISTSTATE *giststate, int attno, GISTENTRY *entry1, bool isnull1, GISTENTRY *entry2, bool isnull2, Datum *dst, bool *dstisnull ) { int dstsize; static char storage[ 2 * sizeof(GISTENTRY) + GEVHDRSZ ]; GistEntryVector *evec = (GistEntryVector*)storage; evec->n = 2; if ( isnull1 && isnull2 ) { *dstisnull = TRUE; *dst = (Datum)0; } else { if ( isnull1 == FALSE && isnull2 == FALSE ) { evec->vector[0] = *entry1; evec->vector[1] = *entry2; } else if ( isnull1 == FALSE ) { evec->vector[0] = *entry1; evec->vector[1] = *entry1; } else { evec->vector[0] = *entry2; evec->vector[1] = *entry2; } *dstisnull = FALSE; *dst = FunctionCall2(&giststate->unionFn[attno], PointerGetDatum(evec), PointerGetDatum(&dstsize)); } } bool gistKeyIsEQ(GISTSTATE *giststate, int attno, Datum a, Datum b) { bool result; FunctionCall3(&giststate->equalFn[attno], a, b, PointerGetDatum(&result)); return result; } /* * Decompress all keys in tuple */ void gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p, OffsetNumber o, GISTENTRY *attdata, bool *isnull) { int i; for (i = 0; i < r->rd_att->natts; i++) { Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]); gistdentryinit(giststate, i, &attdata[i], datum, r, p, o, FALSE, isnull[i]); } } /* * Forms union of oldtup and addtup, if union == oldtup then return NULL */ IndexTuple gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate) { bool neednew = FALSE; GISTENTRY oldentries[INDEX_MAX_KEYS], addentries[INDEX_MAX_KEYS]; bool oldisnull[INDEX_MAX_KEYS], addisnull[INDEX_MAX_KEYS]; IndexTuple newtup = NULL; int i; if (GistTupleIsInvalid(oldtup) || GistTupleIsInvalid(addtup)) return gist_form_invalid_tuple(ItemPointerGetBlockNumber(&(oldtup->t_tid))); gistDeCompressAtt(giststate, r, oldtup, NULL, (OffsetNumber) 0, oldentries, oldisnull); gistDeCompressAtt(giststate, r, addtup, NULL, (OffsetNumber) 0, addentries, addisnull); for(i = 0; i < r->rd_att->natts; i++) { gistMakeUnionKey( giststate, i, oldentries + i, oldisnull[i], addentries + i, addisnull[i], attrS + i, isnullS + i ); if ( neednew ) /* we already need new key, so we can skip check */ continue; if ( isnullS[i] ) /* union of key may be NULL if and only if both keys are NULL */ continue; if ( !addisnull[i] ) { if ( oldisnull[i] || gistKeyIsEQ(giststate, i, oldentries[i].key, attrS[i])==false ) neednew = true; } } if (neednew) { /* need to update key */ newtup = gistFormTuple(giststate, r, attrS, isnullS, false); newtup->t_tid = oldtup->t_tid; } return newtup; } /* * find entry with lowest penalty */ OffsetNumber gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */ GISTSTATE *giststate) { OffsetNumber maxoff; OffsetNumber i; OffsetNumber which; float sum_grow, which_grow[INDEX_MAX_KEYS]; GISTENTRY entry, identry[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; maxoff = PageGetMaxOffsetNumber(p); *which_grow = -1.0; which = InvalidOffsetNumber; sum_grow = 1; gistDeCompressAtt(giststate, r, it, NULL, (OffsetNumber) 0, identry, isnull); Assert( maxoff >= FirstOffsetNumber ); Assert( !GistPageIsLeaf(p) ); for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i)) { int j; IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i)); if (!GistPageIsLeaf(p) && GistTupleIsInvalid(itup)) { ereport(LOG, (errmsg("index \"%s\" needs VACUUM or REINDEX to finish crash recovery", RelationGetRelationName(r)))); continue; } sum_grow = 0; for (j = 0; j < r->rd_att->natts; j++) { Datum datum; float usize; bool IsNull; datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); gistdentryinit(giststate, j, &entry, datum, r, p, i, FALSE, IsNull); usize = gistpenalty(giststate, j, &entry, IsNull, &identry[j], isnull[j]); if (which_grow[j] < 0 || usize < which_grow[j]) { which = i; which_grow[j] = usize; if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber) which_grow[j + 1] = -1; sum_grow += which_grow[j]; } else if (which_grow[j] == usize) sum_grow += usize; else { sum_grow = 1; break; } } } if (which == InvalidOffsetNumber) which = FirstOffsetNumber; return which; } /* * initialize a GiST entry with a decompressed version of key */ void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, bool l, bool isNull) { if (!isNull) { GISTENTRY *dep; gistentryinit(*e, k, r, pg, o, l); dep = (GISTENTRY *) DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey], PointerGetDatum(e))); /* decompressFn may just return the given pointer */ if (dep != e) gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset, dep->leafkey); } else gistentryinit(*e, (Datum) 0, r, pg, o, l); } /* * initialize a GiST entry with a compressed version of key */ void gistcentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, Datum k, Relation r, Page pg, OffsetNumber o, bool l, bool isNull) { if (!isNull) { GISTENTRY *cep; gistentryinit(*e, k, r, pg, o, l); cep = (GISTENTRY *) DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey], PointerGetDatum(e))); /* compressFn may just return the given pointer */ if (cep != e) gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset, cep->leafkey); } else gistentryinit(*e, (Datum) 0, r, pg, o, l); } IndexTuple gistFormTuple(GISTSTATE *giststate, Relation r, Datum attdata[], bool isnull[], bool newValues) { GISTENTRY centry[INDEX_MAX_KEYS]; Datum compatt[INDEX_MAX_KEYS]; int i; IndexTuple res; for (i = 0; i < r->rd_att->natts; i++) { if (isnull[i]) compatt[i] = (Datum) 0; else { gistcentryinit(giststate, i, ¢ry[i], attdata[i], r, NULL, (OffsetNumber) 0, newValues, FALSE); compatt[i] = centry[i].key; } } res = index_form_tuple(giststate->tupdesc, compatt, isnull); GistTupleSetValid(res); return res; } float gistpenalty(GISTSTATE *giststate, int attno, GISTENTRY *orig, bool isNullOrig, GISTENTRY *add, bool isNullAdd) { float penalty = 0.0; if ( giststate->penaltyFn[attno].fn_strict==FALSE || ( isNullOrig == FALSE && isNullAdd == FALSE ) ) FunctionCall3(&giststate->penaltyFn[attno], PointerGetDatum(orig), PointerGetDatum(add), PointerGetDatum(&penalty)); else if ( isNullOrig && isNullAdd ) penalty = 0.0; else penalty = 1e10; /* try to prevent to mix null and non-null value */ return penalty; } /* * Initialize a new index page */ void GISTInitBuffer(Buffer b, uint32 f) { GISTPageOpaque opaque; Page page; Size pageSize; pageSize = BufferGetPageSize(b); page = BufferGetPage(b); PageInit(page, pageSize, sizeof(GISTPageOpaqueData)); opaque = GistPageGetOpaque(page); opaque->flags = f; opaque->rightlink = InvalidBlockNumber; /* page was already zeroed by PageInit, so this is not needed: */ /* memset(&(opaque->nsn), 0, sizeof(GistNSN)); */ } /* * Verify that a freshly-read page looks sane. */ void gistcheckpage(Relation rel, Buffer buf) { Page page = BufferGetPage(buf); /* * ReadBuffer verifies that every newly-read page passes * PageHeaderIsValid, which means it either contains a reasonably sane * page header or is all-zero. We have to defend against the all-zero * case, however. */ if (PageIsNew(page)) ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("index \"%s\" contains unexpected zero page at block %u", RelationGetRelationName(rel), BufferGetBlockNumber(buf)), errhint("Please REINDEX it."))); /* * Additionally check that the special area looks sane. */ if (((PageHeader) (page))->pd_special != (BLCKSZ - MAXALIGN(sizeof(GISTPageOpaqueData)))) ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), errmsg("index \"%s\" contains corrupted page at block %u", RelationGetRelationName(rel), BufferGetBlockNumber(buf)), errhint("Please REINDEX it."))); } /* * Allocate a new page (either by recycling, or by extending the index file) * * The returned buffer is already pinned and exclusive-locked * * Caller is responsible for initializing the page by calling GISTInitBuffer */ Buffer gistNewBuffer(Relation r) { Buffer buffer; bool needLock; /* First, try to get a page from FSM */ for (;;) { BlockNumber blkno = GetFreeIndexPage(&r->rd_node); if (blkno == InvalidBlockNumber) break; /* nothing left in FSM */ buffer = ReadBuffer(r, blkno); /* * We have to guard against the possibility that someone else already * recycled this page; the buffer may be locked if so. */ if (ConditionalLockBuffer(buffer)) { Page page = BufferGetPage(buffer); if (PageIsNew(page)) return buffer; /* OK to use, if never initialized */ gistcheckpage(r, buffer); if (GistPageIsDeleted(page)) return buffer; /* OK to use */ LockBuffer(buffer, GIST_UNLOCK); } /* Can't use it, so release buffer and try again */ ReleaseBuffer(buffer); } /* Must extend the file */ needLock = !RELATION_IS_LOCAL(r); if (needLock) LockRelationForExtension(r, ExclusiveLock); buffer = ReadBuffer(r, P_NEW); LockBuffer(buffer, GIST_EXCLUSIVE); if (needLock) UnlockRelationForExtension(r, ExclusiveLock); return buffer; }