Commit 37c83936 authored by Teodor Sigaev's avatar Teodor Sigaev

WAL for GiST. It work for online backup and so on, but on

recovery after crash (power loss etc) it may say that it can't restore
index and index should be reindexed.

Some refactoring code.
parent d6636543
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
# Makefile for access/gist # Makefile for access/gist
# #
# IDENTIFICATION # IDENTIFICATION
# $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.12 2003/11/29 19:51:39 pgsql Exp $ # $PostgreSQL: pgsql/src/backend/access/gist/Makefile,v 1.13 2005/06/14 11:45:13 teodor Exp $
# #
#------------------------------------------------------------------------- #-------------------------------------------------------------------------
...@@ -12,7 +12,7 @@ subdir = src/backend/access/gist ...@@ -12,7 +12,7 @@ subdir = src/backend/access/gist
top_builddir = ../../../.. top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global include $(top_builddir)/src/Makefile.global
OBJS = gist.o gistget.o gistscan.o OBJS = gist.o gistutil.o gistxlog.o gistget.o gistscan.o
all: SUBSYS.o all: SUBSYS.o
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.118 2005/06/06 17:01:21 tgl Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.119 2005/06/14 11:45:13 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -23,43 +23,6 @@ ...@@ -23,43 +23,6 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#undef GIST_PAGEADDITEM
#define ATTSIZE(datum, tupdesc, i, isnull) \
( \
(isnull) ? 0 : \
att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
)
/* result's status */
#define INSERTED 0x01
#define SPLITED 0x02
/* group flags ( in gistSplit ) */
#define LEFT_ADDED 0x01
#define RIGHT_ADDED 0x02
#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
/*
* This defines only for shorter code, used in gistgetadjusted
* and gistadjsubkey only
*/
#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \
if (isnullkey) { \
gistentryinit((evp), rkey, r, NULL, \
(OffsetNumber) 0, rkeyb, FALSE); \
} else { \
gistentryinit((evp), okey, r, NULL, \
(OffsetNumber) 0, okeyb, FALSE); \
} \
} while(0)
#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \
FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \
} while(0);
/* Working state for gistbuild and its callback */ /* Working state for gistbuild and its callback */
typedef struct typedef struct
{ {
...@@ -80,62 +43,34 @@ static void gistbuildCallback(Relation index, ...@@ -80,62 +43,34 @@ static void gistbuildCallback(Relation index,
static void gistdoinsert(Relation r, static void gistdoinsert(Relation r,
IndexTuple itup, IndexTuple itup,
GISTSTATE *GISTstate); GISTSTATE *GISTstate);
static int gistlayerinsert(Relation r, BlockNumber blkno, static void gistfindleaf(GISTInsertState *state,
IndexTuple **itup,
int *len,
GISTSTATE *giststate);
static OffsetNumber gistwritebuffer(Relation r,
Page page,
IndexTuple *itup,
int len,
OffsetNumber off);
static bool gistnospace(Page page, IndexTuple *itvec, int len);
static IndexTuple *gistreadbuffer(Buffer buffer, int *len);
static IndexTuple *gistjoinvector(
IndexTuple *itvec, int *len,
IndexTuple *additvec, int addlen);
static IndexTuple gistunion(Relation r, IndexTuple *itvec,
int len, GISTSTATE *giststate);
static IndexTuple gistgetadjusted(Relation r,
IndexTuple oldtup,
IndexTuple addtup,
GISTSTATE *giststate); GISTSTATE *giststate);
static int gistfindgroup(GISTSTATE *giststate,
GISTENTRY *valvec, GIST_SPLITVEC *spl);
static void gistadjsubkey(Relation r, typedef struct PageLayout {
IndexTuple *itup, int *len, gistxlogPage block;
GIST_SPLITVEC *v, OffsetNumber *list;
GISTSTATE *giststate); Buffer buffer; /* to write after all proceed */
static IndexTuple gistFormTuple(GISTSTATE *giststate,
Relation r, Datum *attdata, int *datumsize, bool *isnull); struct PageLayout *next;
} PageLayout;
#define ROTATEDIST(d) do { \
PageLayout *tmp=(PageLayout*)palloc(sizeof(PageLayout)); \
memset(tmp,0,sizeof(PageLayout)); \
tmp->next = (d); \
(d)=tmp; \
} while(0)
static IndexTuple *gistSplit(Relation r, static IndexTuple *gistSplit(Relation r,
Buffer buffer, Buffer buffer,
IndexTuple *itup, IndexTuple *itup,
int *len, int *len,
PageLayout **dist,
GISTSTATE *giststate); GISTSTATE *giststate);
static void gistnewroot(Relation r, IndexTuple *itup, int len);
static void GISTInitBuffer(Buffer b, uint32 f);
static OffsetNumber gistchoose(Relation r, Page p,
IndexTuple it,
GISTSTATE *giststate);
static void gistdelete(Relation r, ItemPointer tid);
#ifdef GIST_PAGEADDITEM
static IndexTuple gist_tuple_replacekey(Relation r,
GISTENTRY entry, IndexTuple t);
#endif
static void gistcentryinit(GISTSTATE *giststate, int nkey,
GISTENTRY *e, Datum k,
Relation r, Page pg,
OffsetNumber o, int b, bool l, bool isNull);
static void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
IndexTuple tuple, Page p, OffsetNumber o,
GISTENTRY *attdata, bool *isnull);
static void gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2,
float *penalty);
#undef GISTDEBUG #undef GISTDEBUG
...@@ -191,6 +126,26 @@ gistbuild(PG_FUNCTION_ARGS) ...@@ -191,6 +126,26 @@ gistbuild(PG_FUNCTION_ARGS)
/* initialize the root page */ /* initialize the root page */
buffer = ReadBuffer(index, P_NEW); buffer = ReadBuffer(index, P_NEW);
GISTInitBuffer(buffer, F_LEAF); GISTInitBuffer(buffer, F_LEAF);
if ( !index->rd_istemp ) {
XLogRecPtr recptr;
XLogRecData rdata;
Page page;
rdata.buffer = InvalidBuffer;
rdata.data = (char*)&(index->rd_node);
rdata.len = sizeof(RelFileNode);
rdata.next = NULL;
page = BufferGetPage(buffer);
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
}
WriteBuffer(buffer); WriteBuffer(buffer);
/* build the index */ /* build the index */
...@@ -340,51 +295,6 @@ gistinsert(PG_FUNCTION_ARGS) ...@@ -340,51 +295,6 @@ gistinsert(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(true); PG_RETURN_BOOL(true);
} }
#ifdef GIST_PAGEADDITEM
/*
* Take a compressed entry, and install it on a page. Since we now know
* where the entry will live, we decompress it and recompress it using
* that knowledge (some compression routines may want to fish around
* on the page, for example, or do something special for leaf nodes.)
*/
static OffsetNumber
gistPageAddItem(GISTSTATE *giststate,
Relation r,
Page page,
Item item,
Size size,
OffsetNumber offsetNumber,
ItemIdFlags flags,
GISTENTRY *dentry,
IndexTuple *newtup)
{
GISTENTRY tmpcentry;
IndexTuple itup = (IndexTuple) item;
OffsetNumber retval;
Datum datum;
bool IsNull;
/*
* recompress the item given that we now know the exact page and
* offset for insertion
*/
datum = index_getattr(itup, 1, r->rd_att, &IsNull);
gistdentryinit(giststate, 0, dentry, datum,
(Relation) 0, (Page) 0,
(OffsetNumber) InvalidOffsetNumber,
ATTSIZE(datum, r, 1, IsNull),
FALSE, IsNull);
gistcentryinit(giststate, 0, &tmpcentry, dentry->key, r, page,
offsetNumber, dentry->bytes, FALSE);
*newtup = gist_tuple_replacekey(r, tmpcentry, itup);
retval = PageAddItem(page, (Item) *newtup, IndexTupleSize(*newtup),
offsetNumber, flags);
if (retval == InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(r));
return retval;
}
#endif
/* /*
* Workhouse routine for doing insertion into a GiST index. Note that * Workhouse routine for doing insertion into a GiST index. Note that
...@@ -394,706 +304,365 @@ gistPageAddItem(GISTSTATE *giststate, ...@@ -394,706 +304,365 @@ gistPageAddItem(GISTSTATE *giststate,
static void static void
gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate) gistdoinsert(Relation r, IndexTuple itup, GISTSTATE *giststate)
{ {
IndexTuple *instup; GISTInsertState state;
int ret,
len = 1;
instup = (IndexTuple *) palloc(sizeof(IndexTuple)); memset(&state, 0, sizeof(GISTInsertState));
instup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
memcpy(instup[0], itup, IndexTupleSize(itup));
ret = gistlayerinsert(r, GIST_ROOT_BLKNO, &instup, &len, giststate); state.itup = (IndexTuple *) palloc(sizeof(IndexTuple));
if (ret & SPLITED) state.itup[0] = (IndexTuple) palloc(IndexTupleSize(itup));
gistnewroot(r, instup, len); memcpy(state.itup[0], itup, IndexTupleSize(itup));
} state.ituplen=1;
state.r = r;
state.key = itup->t_tid;
state.needInsertComplete = true;
state.xlog_mode = false;
static int state.stack = (GISTInsertStack*)palloc(sizeof(GISTInsertStack));
gistlayerinsert(Relation r, BlockNumber blkno, memset( state.stack, 0, sizeof(GISTInsertStack));
IndexTuple **itup, /* in - out, has compressed entry */ state.stack->blkno=GIST_ROOT_BLKNO;
int *len, /* in - out */
GISTSTATE *giststate)
{
Buffer buffer;
Page page;
int ret;
GISTPageOpaque opaque;
buffer = ReadBuffer(r, blkno); gistfindleaf(&state, giststate);
page = (Page) BufferGetPage(buffer); gistmakedeal(&state, giststate);
opaque = (GISTPageOpaque) PageGetSpecialPointer(page); }
if (!(opaque->flags & F_LEAF)) static bool
gistplacetopage(GISTInsertState *state, GISTSTATE *giststate) {
bool is_splitted = false;
if (gistnospace(state->stack->page, state->itup, state->ituplen))
{ {
/* /* no space for insertion */
* This is an internal page, so continue to walk down the IndexTuple *itvec,
* tree. We find the child node that has the minimum insertion *newitup;
* penalty and recursively invoke ourselves to modify that int tlen,olen;
* node. Once the recursive call returns, we may need to PageLayout *dist=NULL, *ptr;
* adjust the parent node for two reasons: the child node
* split, or the key in this node needs to be adjusted for the memset(&dist, 0, sizeof(PageLayout));
* newly inserted key below us. is_splitted = true;
*/ itvec = gistextractbuffer(state->stack->buffer, &tlen);
ItemId iid; olen=tlen;
BlockNumber nblkno; itvec = gistjoinvector(itvec, &tlen, state->itup, state->ituplen);
ItemPointerData oldtid; newitup = gistSplit(state->r, state->stack->buffer, itvec, &tlen, &dist, giststate);
IndexTuple oldtup;
OffsetNumber child; if ( !state->r->rd_istemp && !state->xlog_mode) {
gistxlogPageSplit xlrec;
child = gistchoose(r, page, *(*itup), giststate); XLogRecPtr recptr;
iid = PageGetItemId(page, child); XLogRecData *rdata;
oldtup = (IndexTuple) PageGetItem(page, iid); int i, npage = 0, cur=1;
nblkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
ptr=dist;
while( ptr ) {
npage++;
ptr=ptr->next;
}
/* rdata = (XLogRecData*)palloc(sizeof(XLogRecData)*(npage*2 + state->ituplen + 2));
* After this call: 1. if child page was splited, then itup
* contains keys for each page 2. if child page wasn't splited, xlrec.node = state->r->rd_node;
* then itup contains additional for adjustment of current key xlrec.origblkno = state->stack->blkno;
*/ xlrec.npage = npage;
ret = gistlayerinsert(r, nblkno, itup, len, giststate); xlrec.nitup = state->ituplen;
xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
xlrec.key = state->key;
xlrec.pathlen = (uint16)state->pathlen;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof( gistxlogPageSplit );
rdata[0].next = NULL;
if ( state->pathlen>=0 ) {
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) (state->path);
rdata[1].len = sizeof( BlockNumber ) * state->pathlen;
rdata[1].next = NULL;
cur++;
}
/* new tuples */
for(i=0;i<state->ituplen;i++) {
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)(state->itup[i]);
rdata[cur].len = IndexTupleSize(state->itup[i]);
rdata[cur-1].next = &(rdata[cur]);
cur++;
}
/* nothing inserted in child */ /* new page layout */
if (!(ret & INSERTED)) ptr=dist;
{ while(ptr) {
ReleaseBuffer(buffer); rdata[cur].buffer = InvalidBuffer;
return 0x00; rdata[cur].data = (char*)&(ptr->block);
} rdata[cur].len = sizeof(gistxlogPage);
rdata[cur-1].next = &(rdata[cur]);
cur++;
rdata[cur].buffer = InvalidBuffer;
rdata[cur].data = (char*)(ptr->list);
rdata[cur].len = MAXALIGN(sizeof(OffsetNumber)*ptr->block.num);
if ( rdata[cur].len > sizeof(OffsetNumber)*ptr->block.num )
rdata[cur].data = repalloc( rdata[cur].data, rdata[cur].len );
rdata[cur-1].next = &(rdata[cur]);
rdata[cur].next=NULL;
cur++;
ptr=ptr->next;
}
/* child did not split */ START_CRIT_SECTION();
if (!(ret & SPLITED))
{
IndexTuple newtup = gistgetadjusted(r, oldtup, (*itup)[0], giststate);
if (!newtup) recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
{ ptr = dist;
/* not need to update key */ while(ptr) {
ReleaseBuffer(buffer); PageSetLSN(BufferGetPage(ptr->buffer), recptr);
return 0x00; PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
ptr=ptr->next;
} }
(*itup)[0] = newtup; END_CRIT_SECTION();
} }
/* ptr = dist;
* This node's key has been modified, either because a child while(ptr) {
* split occurred or because we needed to adjust our key for WriteBuffer(ptr->buffer);
* an insert in a child node. Therefore, remove the old ptr=ptr->next;
* version of this node's key. }
*/
ItemPointerSet(&oldtid, blkno, child);
gistdelete(r, &oldtid);
/*
* if child was splitted, new key for child will be inserted in
* the end list of child, so we must say to any scans that page is
* changed beginning from 'child' offset
*/
if (ret & SPLITED)
gistadjscans(r, GISTOP_SPLIT, blkno, child);
}
ret = INSERTED; state->itup = newitup;
state->ituplen = tlen; /* now tlen >= 2 */
if (gistnospace(page, (*itup), *len)) if ( state->stack->blkno == GIST_ROOT_BLKNO ) {
{ gistnewroot(state->r, state->itup, state->ituplen, &(state->key), state->xlog_mode);
/* no space for insertion */ state->needInsertComplete=false;
IndexTuple *itvec, }
*newitup; if ( state->xlog_mode )
int tlen, LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
oldlen; ReleaseBuffer(state->stack->buffer);
ret |= SPLITED;
itvec = gistreadbuffer(buffer, &tlen);
itvec = gistjoinvector(itvec, &tlen, (*itup), *len);
oldlen = *len;
newitup = gistSplit(r, buffer, itvec, &tlen, giststate);
ReleaseBuffer(buffer);
*itup = newitup;
*len = tlen; /* now tlen >= 2 */
} }
else else
{ {
/* enough space */ /* enough space */
OffsetNumber off, OffsetNumber off, l;
l;
off = (PageIsEmpty(page)) ? off = (PageIsEmpty(state->stack->page)) ?
FirstOffsetNumber FirstOffsetNumber
: :
OffsetNumberNext(PageGetMaxOffsetNumber(page)); OffsetNumberNext(PageGetMaxOffsetNumber(state->stack->page));
l = gistwritebuffer(r, page, *itup, *len, off); l = gistfillbuffer(state->r, state->stack->page, state->itup, state->ituplen, off);
WriteBuffer(buffer); if ( !state->r->rd_istemp && !state->xlog_mode) {
gistxlogEntryUpdate xlrec;
XLogRecPtr recptr;
XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( state->ituplen + 2 ) );
int i, cur=0;
xlrec.node = state->r->rd_node;
xlrec.blkno = state->stack->blkno;
xlrec.todeleteoffnum = ( state->stack->todelete ) ? state->stack->childoffnum : InvalidOffsetNumber;
xlrec.key = state->key;
xlrec.pathlen = (uint16)state->pathlen;
rdata[0].buffer = InvalidBuffer;
rdata[0].data = (char *) &xlrec;
rdata[0].len = sizeof( gistxlogEntryUpdate );
rdata[0].next = NULL;
if ( state->pathlen>=0 ) {
rdata[0].next = &(rdata[1]);
rdata[1].buffer = InvalidBuffer;
rdata[1].data = (char *) (state->path);
rdata[1].len = sizeof( BlockNumber ) * state->pathlen;
rdata[1].next = NULL;
cur++;
}
for(i=1; i<=state->ituplen; i++) { /* adding tuples */
rdata[i+cur].buffer = InvalidBuffer;
rdata[i+cur].data = (char*)(state->itup[i-1]);
rdata[i+cur].len = IndexTupleSize(state->itup[i-1]);
rdata[i+cur].next = NULL;
rdata[i-1+cur].next = &(rdata[i+cur]);
}
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
PageSetLSN(state->stack->page, recptr);
PageSetTLI(state->stack->page, ThisTimeLineID);
END_CRIT_SECTION();
}
if ( state->stack->blkno == GIST_ROOT_BLKNO )
state->needInsertComplete=false;
if (*len > 1) if ( state->xlog_mode )
{ /* previous insert ret & SPLITED != 0 */ LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(state->stack->buffer);
if (state->ituplen > 1)
{ /* previous is_splitted==true */
/* /*
* child was splited, so we must form union for insertion in * child was splited, so we must form union for insertion in
* parent * parent
*/ */
IndexTuple newtup = gistunion(r, (*itup), *len, giststate); IndexTuple newtup = gistunion(state->r, state->itup, state->ituplen, giststate);
ItemPointerSet(&(newtup->t_tid), blkno, 1); ItemPointerSet(&(newtup->t_tid), state->stack->blkno, FirstOffsetNumber);
(*itup)[0] = newtup; state->itup[0] = newtup;
*len = 1; state->ituplen = 1;
} }
} }
return is_splitted;
return ret;
}
/*
* Write itup vector to page, has no control of free space
*/
static OffsetNumber
gistwritebuffer(Relation r, Page page, IndexTuple *itup,
int len, OffsetNumber off)
{
OffsetNumber l = InvalidOffsetNumber;
int i;
for (i = 0; i < len; i++)
{
#ifdef GIST_PAGEADDITEM
GISTENTRY tmpdentry;
IndexTuple newtup;
bool IsNull;
l = gistPageAddItem(giststate, r, page,
(Item) itup[i], IndexTupleSize(itup[i]),
off, LP_USED, &tmpdentry, &newtup);
off = OffsetNumberNext(off);
#else
l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
off, LP_USED);
if (l == InvalidOffsetNumber)
elog(ERROR, "failed to add index item to \"%s\"",
RelationGetRelationName(r));
#endif
}
return l;
} }
/* static void
* Check space for itup vector on page gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
*/
static bool
gistnospace(Page page, IndexTuple *itvec, int len)
{
unsigned int size = 0;
int i;
for (i = 0; i < len; i++)
size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
return (PageGetFreeSpace(page) < size);
}
/*
* Read buffer into itup vector
*/
static IndexTuple *
gistreadbuffer(Buffer buffer, int *len /* out */ )
{
OffsetNumber i,
maxoff;
IndexTuple *itvec;
Page p = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(p);
*len = maxoff;
itvec = palloc(sizeof(IndexTuple) * maxoff);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
return itvec;
}
/*
* join two vectors into one
*/
static IndexTuple *
gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
{
itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
*len += addlen;
return itvec;
}
/*
* Return an IndexTuple containing the result of applying the "union"
* method to the specified IndexTuple vector.
*/
static IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{ {
Datum attr[INDEX_MAX_KEYS]; ItemId iid;
bool isnull[INDEX_MAX_KEYS]; IndexTuple oldtup;
GistEntryVector *evec; GISTInsertStack *ptr;
int i;
GISTENTRY centry[INDEX_MAX_KEYS];
evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum;
int j;
int real_len;
real_len = 0; /* walk down */
for (j = 0; j < len; j++) while( true ) {
{ GISTPageOpaque opaque;
bool IsNull;
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, i,
&(evec->vector[real_len]),
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
real_len++;
}
/* If this tuple vector was all NULLs, the union is NULL */ state->stack->buffer = ReadBuffer(state->r, state->stack->blkno);
if (real_len == 0) state->stack->page = (Page) BufferGetPage(state->stack->buffer);
{ opaque = (GISTPageOpaque) PageGetSpecialPointer(state->stack->page);
attr[i] = (Datum) 0;
isnull[i] = TRUE; if (!(opaque->flags & F_LEAF))
}
else
{ {
int datumsize; /*
* This is an internal page, so continue to walk down the
if (real_len == 1) * tree. We find the child node that has the minimum insertion
{ * penalty and recursively invoke ourselves to modify that
evec->n = 2; * node. Once the recursive call returns, we may need to
gistentryinit(evec->vector[1], * adjust the parent node for two reasons: the child node
evec->vector[0].key, r, NULL, * split, or the key in this node needs to be adjusted for the
(OffsetNumber) 0, evec->vector[0].bytes, FALSE); * newly inserted key below us.
} */
else GISTInsertStack *item=(GISTInsertStack*)palloc(sizeof(GISTInsertStack));
evec->n = real_len;
state->stack->childoffnum = gistchoose(state->r, state->stack->page, state->itup[0], giststate);
/* Compress the result of the union and store in attr array */
datum = FunctionCall2(&giststate->unionFn[i], iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
PointerGetDatum(evec), oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
PointerGetDatum(&datumsize)); item->blkno = ItemPointerGetBlockNumber(&(oldtup->t_tid));
item->parent = state->stack;
gistcentryinit(giststate, i, &centry[i], datum, item->todelete = false;
NULL, NULL, (OffsetNumber) 0, state->stack = item;
datumsize, FALSE, FALSE); } else
isnull[i] = FALSE; break;
attr[i] = centry[i].key;
}
} }
return index_form_tuple(giststate->tupdesc, attr, isnull); /* now state->stack->(page, buffer and blkno) points to leaf page, so insert */
}
/*
* Forms union of oldtup and addtup, if union == oldtup then return NULL
*/
static IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
GistEntryVector *evec;
bool neednew = false;
bool isnull[INDEX_MAX_KEYS];
Datum attr[INDEX_MAX_KEYS];
GISTENTRY centry[INDEX_MAX_KEYS],
oldatt[INDEX_MAX_KEYS],
addatt[INDEX_MAX_KEYS],
*ev0p,
*ev1p;
bool oldisnull[INDEX_MAX_KEYS],
addisnull[INDEX_MAX_KEYS];
IndexTuple newtup = NULL;
int i;
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
evec->n = 2;
ev0p = &(evec->vector[0]);
ev1p = &(evec->vector[1]);
gistDeCompressAtt(giststate, r, oldtup, NULL,
(OffsetNumber) 0, oldatt, oldisnull);
gistDeCompressAtt(giststate, r, addtup, NULL,
(OffsetNumber) 0, addatt, addisnull);
for (i = 0; i < r->rd_att->natts; i++)
{
if (oldisnull[i] && addisnull[i])
{
attr[i] = (Datum) 0;
isnull[i] = TRUE;
}
else
{
Datum datum;
int datumsize;
FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
addisnull[i], addatt[i].key, addatt[i].bytes);
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
if (oldisnull[i] || addisnull[i])
{
if (oldisnull[i])
neednew = true;
}
else
{
bool result;
FunctionCall3(&giststate->equalFn[i],
ev0p->key,
datum,
PointerGetDatum(&result));
if (!result) /* form state->path to work xlog */
neednew = true; ptr = state->stack;
} state->pathlen=1;
while( ptr ) {
gistcentryinit(giststate, i, &centry[i], datum, state->pathlen++;
NULL, NULL, (OffsetNumber) 0, ptr=ptr->parent;
datumsize, FALSE, FALSE);
attr[i] = centry[i].key;
isnull[i] = FALSE;
}
} }
state->path=(BlockNumber*)palloc(sizeof(BlockNumber)*state->pathlen);
if (neednew) ptr = state->stack;
{ state->pathlen=0;
/* need to update key */ while( ptr ) {
newtup = index_form_tuple(giststate->tupdesc, attr, isnull); state->path[ state->pathlen ] = ptr->blkno;
newtup->t_tid = oldtup->t_tid; state->pathlen++;
ptr=ptr->parent;
} }
state->pathlen--;
return newtup; state->path++;
} }
static void
gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
{
int lr;
for (lr = 0; lr < 2; lr++)
{
OffsetNumber *entries;
int i;
Datum *attr;
int len,
*attrsize;
bool *isnull;
GistEntryVector *evec;
if (lr)
{
attrsize = spl->spl_lattrsize;
attr = spl->spl_lattr;
len = spl->spl_nleft;
entries = spl->spl_left;
isnull = spl->spl_lisnull;
}
else
{
attrsize = spl->spl_rattrsize;
attr = spl->spl_rattr;
len = spl->spl_nright;
entries = spl->spl_right;
isnull = spl->spl_risnull;
}
evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
for (i = 1; i < r->rd_att->natts; i++)
{
int j;
Datum datum;
int datumsize;
int real_len;
real_len = 0;
for (j = 0; j < len; j++)
{
bool IsNull;
if (spl->spl_idgrp[entries[j]])
continue;
datum = index_getattr(itvec[entries[j] - 1], i + 1,
giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, i,
&(evec->vector[real_len]),
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
real_len++;
}
if (real_len == 0) void
{ gistmakedeal(GISTInsertState *state, GISTSTATE *giststate) {
datum = (Datum) 0; int is_splitted;
datumsize = 0; ItemId iid;
isnull[i] = true; IndexTuple oldtup, newtup;
}
else
{
/*
* evec->vector[0].bytes may be not defined, so form union
* with itself
*/
if (real_len == 1)
{
evec->n = 2;
memcpy(&(evec->vector[1]), &(evec->vector[0]),
sizeof(GISTENTRY));
}
else
evec->n = real_len;
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
isnull[i] = false;
}
attr[i] = datum; /* walk up */
attrsize[i] = datumsize; while( true ) {
} /*
} * After this call: 1. if child page was splited, then itup
} * contains keys for each page 2. if child page wasn't splited,
* then itup contains additional for adjustment of current key
*/
/* is_splitted = gistplacetopage(state, giststate );
* find group in vector with equal value
*/
static int
gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
{
int i;
int curid = 1;
/*
* first key is always not null (see gistinsert), so we may not check
* for nulls
*/
for (i = 0; i < spl->spl_nleft; i++)
{
int j;
int len;
bool result;
if (spl->spl_idgrp[spl->spl_left[i]])
continue;
len = 0;
/* find all equal value in right part */
for (j = 0; j < spl->spl_nright; j++)
{
if (spl->spl_idgrp[spl->spl_right[j]])
continue;
FunctionCall3(&giststate->equalFn[0],
valvec[spl->spl_left[i]].key,
valvec[spl->spl_right[j]].key,
PointerGetDatum(&result));
if (result)
{
spl->spl_idgrp[spl->spl_right[j]] = curid;
len++;
}
}
/* find all other equal value in left part */
if (len)
{
/* add current val to list of equal values */
spl->spl_idgrp[spl->spl_left[i]] = curid;
/* searching .. */
for (j = i + 1; j < spl->spl_nleft; j++)
{
if (spl->spl_idgrp[spl->spl_left[j]])
continue;
FunctionCall3(&giststate->equalFn[0],
valvec[spl->spl_left[i]].key,
valvec[spl->spl_left[j]].key,
PointerGetDatum(&result));
if (result)
{
spl->spl_idgrp[spl->spl_left[j]] = curid;
len++;
}
}
spl->spl_ngrp[curid] = len + 1;
curid++;
}
}
return curid; /* pop page from stack */
} state->stack = state->stack->parent;
state->pathlen--;
state->path++;
/* stack is void */
if ( ! state->stack )
break;
/*
* Insert equivalent tuples to left or right page with minimum
* penalty
*/
static void
gistadjsubkey(Relation r,
IndexTuple *itup, /* contains compressed entry */
int *len,
GIST_SPLITVEC *v,
GISTSTATE *giststate)
{
int curlen;
OffsetNumber *curwpos;
GISTENTRY entry,
identry[INDEX_MAX_KEYS],
*ev0p,
*ev1p;
float lpenalty,
rpenalty;
GistEntryVector *evec;
int datumsize;
bool isnull[INDEX_MAX_KEYS];
int i,
j;
/* clear vectors */ /* child did not split */
curlen = v->spl_nleft; if (!is_splitted)
curwpos = v->spl_left;
for (i = 0; i < v->spl_nleft; i++)
{
if (v->spl_idgrp[v->spl_left[i]] == 0)
{ {
*curwpos = v->spl_left[i]; /* parent's tuple */
curwpos++; iid = PageGetItemId(state->stack->page, state->stack->childoffnum);
} oldtup = (IndexTuple) PageGetItem(state->stack->page, iid);
else newtup = gistgetadjusted(state->r, oldtup, state->itup[0], giststate);
curlen--;
} if (!newtup) /* not need to update key */
v->spl_nleft = curlen; break;
curlen = v->spl_nright; state->itup[0] = newtup;
curwpos = v->spl_right;
for (i = 0; i < v->spl_nright; i++)
{
if (v->spl_idgrp[v->spl_right[i]] == 0)
{
*curwpos = v->spl_right[i];
curwpos++;
} }
else
curlen--; /*
* This node's key has been modified, either because a child
* split occurred or because we needed to adjust our key for
* an insert in a child node. Therefore, remove the old
* version of this node's key.
*/
gistadjscans(state->r, GISTOP_DEL, state->stack->blkno, state->stack->childoffnum);
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
if ( !state->r->rd_istemp )
state->stack->todelete = true;
/*
* if child was splitted, new key for child will be inserted in
* the end list of child, so we must say to any scans that page is
* changed beginning from 'child' offset
*/
if (is_splitted)
gistadjscans(state->r, GISTOP_SPLIT, state->stack->blkno, state->stack->childoffnum);
} /* while */
/* release all buffers */
while( state->stack ) {
if ( state->xlog_mode )
LockBuffer(state->stack->buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(state->stack->buffer);
state->stack = state->stack->parent;
} }
v->spl_nright = curlen;
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ); /* say to xlog that insert is completed */
evec->n = 2; if ( !state->xlog_mode && state->needInsertComplete && !state->r->rd_istemp ) {
ev0p = &(evec->vector[0]); gistxlogInsertComplete xlrec;
ev1p = &(evec->vector[1]); XLogRecData rdata;
xlrec.node = state->r->rd_node;
xlrec.key = state->key;
rdata.buffer = InvalidBuffer;
rdata.data = (char *) &xlrec;
rdata.len = sizeof( gistxlogInsertComplete );
rdata.next = NULL;
/* add equivalent tuple */ START_CRIT_SECTION();
for (i = 0; i < *len; i++)
{
Datum datum;
if (v->spl_idgrp[i + 1] == 0) /* already inserted */ XLogInsert(RM_GIST_ID, XLOG_GIST_INSERT_COMPLETE, &rdata);
continue;
gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
identry, isnull);
v->spl_ngrp[v->spl_idgrp[i + 1]]--; END_CRIT_SECTION();
if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
(v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
{
/* force last in group */
rpenalty = 1.0;
lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
}
else
{
/* where? */
for (j = 1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_lattr[j], r, NULL,
(OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
&identry[j], isnull[j], &lpenalty);
gistentryinit(entry, v->spl_rattr[j], r, NULL,
(OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
gistpenalty(giststate, j, &entry, v->spl_risnull[j],
&identry[j], isnull[j], &rpenalty);
if (lpenalty != rpenalty)
break;
}
}
/*
* add
* XXX: refactor this to avoid duplicating code
*/
if (lpenalty < rpenalty)
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
v->spl_left[v->spl_nleft] = i + 1;
v->spl_nleft++;
for (j = 1; j < r->rd_att->natts; j++)
{
if (isnull[j] && v->spl_lisnull[j])
{
v->spl_lattr[j] = (Datum) 0;
v->spl_lattrsize[j] = 0;
}
else
{
FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
isnull[j], identry[j].key, identry[j].bytes);
datum = FunctionCall2(&giststate->unionFn[j],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
v->spl_lattr[j] = datum;
v->spl_lattrsize[j] = datumsize;
v->spl_lisnull[j] = false;
}
}
}
else
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
v->spl_right[v->spl_nright] = i + 1;
v->spl_nright++;
for (j = 1; j < r->rd_att->natts; j++)
{
if (isnull[j] && v->spl_risnull[j])
{
v->spl_rattr[j] = (Datum) 0;
v->spl_rattrsize[j] = 0;
}
else
{
FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
isnull[j], identry[j].key, identry[j].bytes);
datum = FunctionCall2(&giststate->unionFn[j],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
v->spl_rattr[j] = datum;
v->spl_rattrsize[j] = datumsize;
v->spl_risnull[j] = false;
}
}
}
} }
} }
...@@ -1105,6 +674,7 @@ gistSplit(Relation r, ...@@ -1105,6 +674,7 @@ gistSplit(Relation r,
Buffer buffer, Buffer buffer,
IndexTuple *itup, /* contains compressed entry */ IndexTuple *itup, /* contains compressed entry */
int *len, int *len,
PageLayout **dist,
GISTSTATE *giststate) GISTSTATE *giststate)
{ {
Page p; Page p;
...@@ -1225,29 +795,57 @@ gistSplit(Relation r, ...@@ -1225,29 +795,57 @@ gistSplit(Relation r,
/* write on disk (may need another split) */ /* write on disk (may need another split) */
if (gistnospace(right, rvectup, v.spl_nright)) if (gistnospace(right, rvectup, v.spl_nright))
{ {
int i;
PageLayout *d, *origd=*dist;
nlen = v.spl_nright; nlen = v.spl_nright;
newtup = gistSplit(r, rightbuf, rvectup, &nlen, giststate); newtup = gistSplit(r, rightbuf, rvectup, &nlen, dist, giststate);
/* XLOG stuff */
d=*dist;
/* translate offsetnumbers to our */
while( d && d!=origd ) {
for(i=0;i<d->block.num;i++)
d->list[i] = v.spl_right[ d->list[i]-1 ];
d=d->next;
}
ReleaseBuffer(rightbuf); ReleaseBuffer(rightbuf);
} }
else else
{ {
OffsetNumber l; OffsetNumber l;
l = gistwritebuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber); l = gistfillbuffer(r, right, rvectup, v.spl_nright, FirstOffsetNumber);
WriteBuffer(rightbuf); /* XLOG stuff */
ROTATEDIST(*dist);
(*dist)->block.blkno = BufferGetBlockNumber(rightbuf);
(*dist)->block.num = v.spl_nright;
(*dist)->list = v.spl_right;
(*dist)->buffer = rightbuf;
nlen = 1; nlen = 1;
newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1); newtup = (IndexTuple *) palloc(sizeof(IndexTuple) * 1);
newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull); newtup[0] = gistFormTuple(giststate, r, v.spl_rattr, v.spl_rattrsize, v.spl_risnull);
ItemPointerSet(&(newtup[0]->t_tid), rbknum, 1); ItemPointerSet(&(newtup[0]->t_tid), rbknum, FirstOffsetNumber);
} }
if (gistnospace(left, lvectup, v.spl_nleft)) if (gistnospace(left, lvectup, v.spl_nleft))
{ {
int llen = v.spl_nleft; int llen = v.spl_nleft;
IndexTuple *lntup; IndexTuple *lntup;
int i;
lntup = gistSplit(r, leftbuf, lvectup, &llen, giststate); PageLayout *d, *origd=*dist;
lntup = gistSplit(r, leftbuf, lvectup, &llen, dist, giststate);
/* XLOG stuff */
d=*dist;
/* translate offsetnumbers to our */
while( d && d!=origd ) {
for(i=0;i<d->block.num;i++)
d->list[i] = v.spl_left[ d->list[i]-1 ];
d=d->next;
}
ReleaseBuffer(leftbuf); ReleaseBuffer(leftbuf);
newtup = gistjoinvector(newtup, &nlen, lntup, llen); newtup = gistjoinvector(newtup, &nlen, lntup, llen);
...@@ -1256,151 +854,76 @@ gistSplit(Relation r, ...@@ -1256,151 +854,76 @@ gistSplit(Relation r,
{ {
OffsetNumber l; OffsetNumber l;
l = gistwritebuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber); l = gistfillbuffer(r, left, lvectup, v.spl_nleft, FirstOffsetNumber);
if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO) if (BufferGetBlockNumber(buffer) != GIST_ROOT_BLKNO)
PageRestoreTempPage(left, p); PageRestoreTempPage(left, p);
WriteBuffer(leftbuf); /* XLOG stuff */
ROTATEDIST(*dist);
(*dist)->block.blkno = BufferGetBlockNumber(leftbuf);
(*dist)->block.num = v.spl_nleft;
(*dist)->list = v.spl_left;
(*dist)->buffer = leftbuf;
nlen += 1; nlen += 1;
newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen); newtup = (IndexTuple *) repalloc(newtup, sizeof(IndexTuple) * nlen);
newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull); newtup[nlen - 1] = gistFormTuple(giststate, r, v.spl_lattr, v.spl_lattrsize, v.spl_lisnull);
ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, 1); ItemPointerSet(&(newtup[nlen - 1]->t_tid), lbknum, FirstOffsetNumber);
} }
*len = nlen; *len = nlen;
return newtup; return newtup;
} }
static void void
gistnewroot(Relation r, IndexTuple *itup, int len) gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode)
{
Buffer b;
Page p;
b = ReadBuffer(r, GIST_ROOT_BLKNO);
GISTInitBuffer(b, 0);
p = BufferGetPage(b);
gistwritebuffer(r, p, itup, len, FirstOffsetNumber);
WriteBuffer(b);
}
static void
GISTInitBuffer(Buffer b, uint32 f)
{ {
GISTPageOpaque opaque; Buffer buffer;
Page page; Page page;
Size pageSize;
pageSize = BufferGetPageSize(b);
page = BufferGetPage(b);
PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
opaque->flags = f;
}
/*
* find entry with lowest penalty
*/
static OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
GISTSTATE *giststate)
{
OffsetNumber maxoff;
OffsetNumber i;
OffsetNumber which;
float sum_grow,
which_grow[INDEX_MAX_KEYS];
GISTENTRY entry,
identry[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
maxoff = PageGetMaxOffsetNumber(p);
*which_grow = -1.0;
which = -1;
sum_grow = 1;
gistDeCompressAtt(giststate, r,
it, NULL, (OffsetNumber) 0,
identry, isnull);
for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
{
int j;
IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
sum_grow = 0; buffer = (xlog_mode) ? XLogReadBuffer(false, r, GIST_ROOT_BLKNO) : ReadBuffer(r, GIST_ROOT_BLKNO);
for (j = 0; j < r->rd_att->natts; j++) GISTInitBuffer(buffer, 0);
{ page = BufferGetPage(buffer);
Datum datum;
float usize; gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
bool IsNull; if ( !xlog_mode && !r->rd_istemp ) {
gistxlogEntryUpdate xlrec;
datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull); XLogRecPtr recptr;
gistdentryinit(giststate, j, &entry, datum, r, p, i, XLogRecData *rdata = (XLogRecData*)palloc( sizeof(XLogRecData) * ( len + 1 ) );
ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull), int i;
FALSE, IsNull);
gistpenalty(giststate, j, &entry, IsNull, xlrec.node = r->rd_node;
&identry[j], isnull[j], &usize); xlrec.blkno = GIST_ROOT_BLKNO;
xlrec.todeleteoffnum = InvalidOffsetNumber;
if (which_grow[j] < 0 || usize < which_grow[j]) xlrec.key = *key;
{ xlrec.pathlen=0;
which = i;
which_grow[j] = usize; rdata[0].buffer = InvalidBuffer;
if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber) rdata[0].data = (char *) &xlrec;
which_grow[j + 1] = -1; rdata[0].len = sizeof( gistxlogEntryUpdate );
sum_grow += which_grow[j]; rdata[0].next = NULL;
}
else if (which_grow[j] == usize) for(i=1; i<=len; i++) {
sum_grow += usize; rdata[i].buffer = InvalidBuffer;
else rdata[i].data = (char*)(itup[i-1]);
{ rdata[i].len = IndexTupleSize(itup[i-1]);
sum_grow = 1; rdata[i].next = NULL;
break; rdata[i-1].next = &(rdata[i]);
} }
}
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
} }
if ( xlog_mode )
return which; LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
} }
/*
* Retail deletion of a single tuple.
*
* NB: this is no longer called externally, but is still needed by
* gistlayerinsert(). That dependency will have to be fixed if GIST
* is ever going to allow concurrent insertions.
*/
static void
gistdelete(Relation r, ItemPointer tid)
{
BlockNumber blkno;
OffsetNumber offnum;
Buffer buf;
Page page;
/*
* Since GIST is not marked "amconcurrent" in pg_am, caller should
* have acquired exclusive lock on index relation. We need no locking
* here.
*/
blkno = ItemPointerGetBlockNumber(tid);
offnum = ItemPointerGetOffsetNumber(tid);
/* adjust any scans that will be affected by this deletion */
/* NB: this works only for scans in *this* backend! */
gistadjscans(r, GISTOP_DEL, blkno, offnum);
/* delete the index tuple */
buf = ReadBuffer(r, blkno);
page = BufferGetPage(buf);
PageIndexTupleDelete(page, offnum);
WriteBuffer(buf);
}
/* /*
* Bulk deletion of all index entries pointing to a set of heap tuples. * Bulk deletion of all index entries pointing to a set of heap tuples.
...@@ -1462,6 +985,30 @@ gistbulkdelete(PG_FUNCTION_ARGS) ...@@ -1462,6 +985,30 @@ gistbulkdelete(PG_FUNCTION_ARGS)
page = BufferGetPage(buf); page = BufferGetPage(buf);
PageIndexTupleDelete(page, offnum); PageIndexTupleDelete(page, offnum);
if ( !rel->rd_istemp ) {
gistxlogEntryUpdate xlrec;
XLogRecPtr recptr;
XLogRecData rdata;
xlrec.node = rel->rd_node;
xlrec.blkno = blkno;
xlrec.todeleteoffnum = offnum;
xlrec.pathlen=0;
ItemPointerSetInvalid( &(xlrec.key) );
rdata.buffer = InvalidBuffer;
rdata.data = (char *) &xlrec;
rdata.len = sizeof( gistxlogEntryUpdate );
rdata.next = NULL;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_DELETE, &rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
}
WriteBuffer(buf); WriteBuffer(buf);
...@@ -1527,159 +1074,6 @@ freeGISTstate(GISTSTATE *giststate) ...@@ -1527,159 +1074,6 @@ freeGISTstate(GISTSTATE *giststate)
/* no work */ /* no work */
} }
#ifdef GIST_PAGEADDITEM
/*
* Given an IndexTuple to be inserted on a page, this routine replaces
* the key with another key, which may involve generating a new IndexTuple
* if the sizes don't match or if the null status changes.
*
* XXX this only works for a single-column index tuple!
*/
static IndexTuple
gist_tuple_replacekey(Relation r, GISTENTRY entry, IndexTuple t)
{
bool IsNull;
Datum datum = index_getattr(t, 1, r->rd_att, &IsNull);
/*
* If new entry fits in index tuple, copy it in. To avoid worrying
* about null-value bitmask, pass it off to the general
* index_form_tuple routine if either the previous or new value is
* NULL.
*/
if (!IsNull && DatumGetPointer(entry.key) != NULL &&
(Size) entry.bytes <= ATTSIZE(datum, r, 1, IsNull))
{
memcpy(DatumGetPointer(datum),
DatumGetPointer(entry.key),
entry.bytes);
/* clear out old size */
t->t_info &= ~INDEX_SIZE_MASK;
/* or in new size */
t->t_info |= MAXALIGN(entry.bytes + sizeof(IndexTupleData));
return t;
}
else
{
/* generate a new index tuple for the compressed entry */
TupleDesc tupDesc = r->rd_att;
IndexTuple newtup;
bool isnull;
isnull = (DatumGetPointer(entry.key) == NULL);
newtup = index_form_tuple(tupDesc, &(entry.key), &isnull);
newtup->t_tid = t->t_tid;
return newtup;
}
}
#endif
/*
* initialize a GiST entry with a decompressed version of key
*/
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
Datum k, Relation r, Page pg, OffsetNumber o,
int b, bool l, bool isNull)
{
if (b && !isNull)
{
GISTENTRY *dep;
gistentryinit(*e, k, r, pg, o, b, l);
dep = (GISTENTRY *)
DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
PointerGetDatum(e)));
/* decompressFn may just return the given pointer */
if (dep != e)
gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
dep->bytes, dep->leafkey);
}
else
gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
}
/*
* initialize a GiST entry with a compressed version of key
*/
static void
gistcentryinit(GISTSTATE *giststate, int nkey,
GISTENTRY *e, Datum k, Relation r,
Page pg, OffsetNumber o, int b, bool l, bool isNull)
{
if (!isNull)
{
GISTENTRY *cep;
gistentryinit(*e, k, r, pg, o, b, l);
cep = (GISTENTRY *)
DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
PointerGetDatum(e)));
/* compressFn may just return the given pointer */
if (cep != e)
gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
cep->bytes, cep->leafkey);
}
else
gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
}
static IndexTuple
gistFormTuple(GISTSTATE *giststate, Relation r,
Datum attdata[], int datumsize[], bool isnull[])
{
GISTENTRY centry[INDEX_MAX_KEYS];
Datum compatt[INDEX_MAX_KEYS];
int i;
for (i = 0; i < r->rd_att->natts; i++)
{
if (isnull[i])
compatt[i] = (Datum) 0;
else
{
gistcentryinit(giststate, i, &centry[i], attdata[i],
NULL, NULL, (OffsetNumber) 0,
datumsize[i], FALSE, FALSE);
compatt[i] = centry[i].key;
}
}
return index_form_tuple(giststate->tupdesc, compatt, isnull);
}
static void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
int i;
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
gistdentryinit(giststate, i, &attdata[i],
datum, r, p, o,
ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
FALSE, isnull[i]);
}
}
static void
gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2, float *penalty)
{
if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
*penalty = 0.0;
else
FunctionCall3(&giststate->penaltyFn[attno],
PointerGetDatum(key1),
PointerGetDatum(key2),
PointerGetDatum(penalty));
}
#ifdef GISTDEBUG #ifdef GISTDEBUG
static void static void
gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
...@@ -1726,13 +1120,3 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff) ...@@ -1726,13 +1120,3 @@ gist_dumptree(Relation r, int level, BlockNumber blk, OffsetNumber coff)
} }
#endif /* defined GISTDEBUG */ #endif /* defined GISTDEBUG */
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
elog(PANIC, "gist_redo: unimplemented");
}
void
gist_desc(char *buf, uint8 xl_info, char *rec)
{
}
...@@ -8,14 +8,14 @@ ...@@ -8,14 +8,14 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.47 2005/05/17 03:34:18 neilc Exp $ * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.48 2005/06/14 11:45:13 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
#include "access/gist_private.h"
#include "access/itup.h" #include "access/itup.h"
#include "access/gist_private.h"
#include "executor/execdebug.h" #include "executor/execdebug.h"
#include "utils/memutils.h" #include "utils/memutils.h"
......
/*-------------------------------------------------------------------------
*
* gistutil.c
* utilities routines for the postgres GiST index access method.
*
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistutil.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/gist_private.h"
#include "access/gistscan.h"
#include "access/heapam.h"
#include "catalog/index.h"
#include "miscadmin.h"
/* group flags ( in gistadjsubkey ) */
#define LEFT_ADDED 0x01
#define RIGHT_ADDED 0x02
#define BOTH_ADDED ( LEFT_ADDED | RIGHT_ADDED )
/*
* This defines is only for shorter code, used in gistgetadjusted
* and gistadjsubkey only
*/
#define FILLITEM(evp, isnullkey, okey, okeyb, rkey, rkeyb) do { \
if (isnullkey) { \
gistentryinit((evp), rkey, r, NULL, \
(OffsetNumber) 0, rkeyb, FALSE); \
} else { \
gistentryinit((evp), okey, r, NULL, \
(OffsetNumber) 0, okeyb, FALSE); \
} \
} while(0)
#define FILLEV(isnull1, key1, key1b, isnull2, key2, key2b) do { \
FILLITEM(*ev0p, isnull1, key1, key1b, key2, key2b); \
FILLITEM(*ev1p, isnull2, key2, key2b, key1, key1b); \
} while(0);
static void
gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2, float *penalty);
/*
* Write itup vector to page, has no control of free space
*/
OffsetNumber
gistfillbuffer(Relation r, Page page, IndexTuple *itup,
int len, OffsetNumber off)
{
OffsetNumber l = InvalidOffsetNumber;
int i;
for (i = 0; i < len; i++)
{
l = PageAddItem(page, (Item) itup[i], IndexTupleSize(itup[i]),
off, LP_USED);
if (l == InvalidOffsetNumber)
elog(ERROR, "gistfillbuffer: failed to add index item to \"%s\"",
RelationGetRelationName(r));
off++;
}
return l;
}
/*
* Check space for itup vector on page
*/
bool
gistnospace(Page page, IndexTuple *itvec, int len)
{
unsigned int size = 0;
int i;
for (i = 0; i < len; i++)
size += IndexTupleSize(itvec[i]) + sizeof(ItemIdData);
return (PageGetFreeSpace(page) < size);
}
/*
* Read buffer into itup vector
*/
IndexTuple *
gistextractbuffer(Buffer buffer, int *len /* out */ )
{
OffsetNumber i,
maxoff;
IndexTuple *itvec;
Page p = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(p);
*len = maxoff;
itvec = palloc(sizeof(IndexTuple) * maxoff);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
itvec[i - 1] = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
return itvec;
}
/*
* join two vectors into one
*/
IndexTuple *
gistjoinvector(IndexTuple *itvec, int *len, IndexTuple *additvec, int addlen)
{
itvec = (IndexTuple *) repalloc((void *) itvec, sizeof(IndexTuple) * ((*len) + addlen));
memmove(&itvec[*len], additvec, sizeof(IndexTuple) * addlen);
*len += addlen;
return itvec;
}
/*
* Return an IndexTuple containing the result of applying the "union"
* method to the specified IndexTuple vector.
*/
IndexTuple
gistunion(Relation r, IndexTuple *itvec, int len, GISTSTATE *giststate)
{
Datum attr[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
GistEntryVector *evec;
int i;
GISTENTRY centry[INDEX_MAX_KEYS];
evec = (GistEntryVector *) palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum;
int j;
int real_len;
real_len = 0;
for (j = 0; j < len; j++)
{
bool IsNull;
datum = index_getattr(itvec[j], i + 1, giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, i,
&(evec->vector[real_len]),
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
real_len++;
}
/* If this tuple vector was all NULLs, the union is NULL */
if (real_len == 0)
{
attr[i] = (Datum) 0;
isnull[i] = TRUE;
}
else
{
int datumsize;
if (real_len == 1)
{
evec->n = 2;
gistentryinit(evec->vector[1],
evec->vector[0].key, r, NULL,
(OffsetNumber) 0, evec->vector[0].bytes, FALSE);
}
else
evec->n = real_len;
/* Compress the result of the union and store in attr array */
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
gistcentryinit(giststate, i, &centry[i], datum,
NULL, NULL, (OffsetNumber) 0,
datumsize, FALSE, FALSE);
isnull[i] = FALSE;
attr[i] = centry[i].key;
}
}
return index_form_tuple(giststate->tupdesc, attr, isnull);
}
/*
* Forms union of oldtup and addtup, if union == oldtup then return NULL
*/
IndexTuple
gistgetadjusted(Relation r, IndexTuple oldtup, IndexTuple addtup, GISTSTATE *giststate)
{
GistEntryVector *evec;
bool neednew = false;
bool isnull[INDEX_MAX_KEYS];
Datum attr[INDEX_MAX_KEYS];
GISTENTRY centry[INDEX_MAX_KEYS],
oldatt[INDEX_MAX_KEYS],
addatt[INDEX_MAX_KEYS],
*ev0p,
*ev1p;
bool oldisnull[INDEX_MAX_KEYS],
addisnull[INDEX_MAX_KEYS];
IndexTuple newtup = NULL;
int i;
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
evec->n = 2;
ev0p = &(evec->vector[0]);
ev1p = &(evec->vector[1]);
gistDeCompressAtt(giststate, r, oldtup, NULL,
(OffsetNumber) 0, oldatt, oldisnull);
gistDeCompressAtt(giststate, r, addtup, NULL,
(OffsetNumber) 0, addatt, addisnull);
for (i = 0; i < r->rd_att->natts; i++)
{
if (oldisnull[i] && addisnull[i])
{
attr[i] = (Datum) 0;
isnull[i] = TRUE;
}
else
{
Datum datum;
int datumsize;
FILLEV(oldisnull[i], oldatt[i].key, oldatt[i].bytes,
addisnull[i], addatt[i].key, addatt[i].bytes);
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
if (oldisnull[i] || addisnull[i])
{
if (oldisnull[i])
neednew = true;
}
else
{
bool result;
FunctionCall3(&giststate->equalFn[i],
ev0p->key,
datum,
PointerGetDatum(&result));
if (!result)
neednew = true;
}
gistcentryinit(giststate, i, &centry[i], datum,
NULL, NULL, (OffsetNumber) 0,
datumsize, FALSE, FALSE);
attr[i] = centry[i].key;
isnull[i] = FALSE;
}
}
if (neednew)
{
/* need to update key */
newtup = index_form_tuple(giststate->tupdesc, attr, isnull);
newtup->t_tid = oldtup->t_tid;
}
return newtup;
}
void
gistunionsubkey(Relation r, GISTSTATE *giststate, IndexTuple *itvec, GIST_SPLITVEC *spl)
{
int lr;
for (lr = 0; lr < 2; lr++)
{
OffsetNumber *entries;
int i;
Datum *attr;
int len,
*attrsize;
bool *isnull;
GistEntryVector *evec;
if (lr)
{
attrsize = spl->spl_lattrsize;
attr = spl->spl_lattr;
len = spl->spl_nleft;
entries = spl->spl_left;
isnull = spl->spl_lisnull;
}
else
{
attrsize = spl->spl_rattrsize;
attr = spl->spl_rattr;
len = spl->spl_nright;
entries = spl->spl_right;
isnull = spl->spl_risnull;
}
evec = palloc(((len == 1) ? 2 : len) * sizeof(GISTENTRY) + GEVHDRSZ);
for (i = 1; i < r->rd_att->natts; i++)
{
int j;
Datum datum;
int datumsize;
int real_len;
real_len = 0;
for (j = 0; j < len; j++)
{
bool IsNull;
if (spl->spl_idgrp[entries[j]])
continue;
datum = index_getattr(itvec[entries[j] - 1], i + 1,
giststate->tupdesc, &IsNull);
if (IsNull)
continue;
gistdentryinit(giststate, i,
&(evec->vector[real_len]),
datum,
NULL, NULL, (OffsetNumber) 0,
ATTSIZE(datum, giststate->tupdesc, i + 1, IsNull),
FALSE, IsNull);
real_len++;
}
if (real_len == 0)
{
datum = (Datum) 0;
datumsize = 0;
isnull[i] = true;
}
else
{
/*
* evec->vector[0].bytes may be not defined, so form union
* with itself
*/
if (real_len == 1)
{
evec->n = 2;
memcpy(&(evec->vector[1]), &(evec->vector[0]),
sizeof(GISTENTRY));
}
else
evec->n = real_len;
datum = FunctionCall2(&giststate->unionFn[i],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
isnull[i] = false;
}
attr[i] = datum;
attrsize[i] = datumsize;
}
}
}
/*
* find group in vector with equal value
*/
int
gistfindgroup(GISTSTATE *giststate, GISTENTRY *valvec, GIST_SPLITVEC *spl)
{
int i;
int curid = 1;
/*
* first key is always not null (see gistinsert), so we may not check
* for nulls
*/
for (i = 0; i < spl->spl_nleft; i++)
{
int j;
int len;
bool result;
if (spl->spl_idgrp[spl->spl_left[i]])
continue;
len = 0;
/* find all equal value in right part */
for (j = 0; j < spl->spl_nright; j++)
{
if (spl->spl_idgrp[spl->spl_right[j]])
continue;
FunctionCall3(&giststate->equalFn[0],
valvec[spl->spl_left[i]].key,
valvec[spl->spl_right[j]].key,
PointerGetDatum(&result));
if (result)
{
spl->spl_idgrp[spl->spl_right[j]] = curid;
len++;
}
}
/* find all other equal value in left part */
if (len)
{
/* add current val to list of equal values */
spl->spl_idgrp[spl->spl_left[i]] = curid;
/* searching .. */
for (j = i + 1; j < spl->spl_nleft; j++)
{
if (spl->spl_idgrp[spl->spl_left[j]])
continue;
FunctionCall3(&giststate->equalFn[0],
valvec[spl->spl_left[i]].key,
valvec[spl->spl_left[j]].key,
PointerGetDatum(&result));
if (result)
{
spl->spl_idgrp[spl->spl_left[j]] = curid;
len++;
}
}
spl->spl_ngrp[curid] = len + 1;
curid++;
}
}
return curid;
}
/*
* Insert equivalent tuples to left or right page with minimum
* penalty
*/
void
gistadjsubkey(Relation r,
IndexTuple *itup, /* contains compressed entry */
int *len,
GIST_SPLITVEC *v,
GISTSTATE *giststate)
{
int curlen;
OffsetNumber *curwpos;
GISTENTRY entry,
identry[INDEX_MAX_KEYS],
*ev0p,
*ev1p;
float lpenalty,
rpenalty;
GistEntryVector *evec;
int datumsize;
bool isnull[INDEX_MAX_KEYS];
int i,
j;
/* clear vectors */
curlen = v->spl_nleft;
curwpos = v->spl_left;
for (i = 0; i < v->spl_nleft; i++)
{
if (v->spl_idgrp[v->spl_left[i]] == 0)
{
*curwpos = v->spl_left[i];
curwpos++;
}
else
curlen--;
}
v->spl_nleft = curlen;
curlen = v->spl_nright;
curwpos = v->spl_right;
for (i = 0; i < v->spl_nright; i++)
{
if (v->spl_idgrp[v->spl_right[i]] == 0)
{
*curwpos = v->spl_right[i];
curwpos++;
}
else
curlen--;
}
v->spl_nright = curlen;
evec = palloc(2 * sizeof(GISTENTRY) + GEVHDRSZ);
evec->n = 2;
ev0p = &(evec->vector[0]);
ev1p = &(evec->vector[1]);
/* add equivalent tuple */
for (i = 0; i < *len; i++)
{
Datum datum;
if (v->spl_idgrp[i + 1] == 0) /* already inserted */
continue;
gistDeCompressAtt(giststate, r, itup[i], NULL, (OffsetNumber) 0,
identry, isnull);
v->spl_ngrp[v->spl_idgrp[i + 1]]--;
if (v->spl_ngrp[v->spl_idgrp[i + 1]] == 0 &&
(v->spl_grpflag[v->spl_idgrp[i + 1]] & BOTH_ADDED) != BOTH_ADDED)
{
/* force last in group */
rpenalty = 1.0;
lpenalty = (v->spl_grpflag[v->spl_idgrp[i + 1]] & LEFT_ADDED) ? 2.0 : 0.0;
}
else
{
/* where? */
for (j = 1; j < r->rd_att->natts; j++)
{
gistentryinit(entry, v->spl_lattr[j], r, NULL,
(OffsetNumber) 0, v->spl_lattrsize[j], FALSE);
gistpenalty(giststate, j, &entry, v->spl_lisnull[j],
&identry[j], isnull[j], &lpenalty);
gistentryinit(entry, v->spl_rattr[j], r, NULL,
(OffsetNumber) 0, v->spl_rattrsize[j], FALSE);
gistpenalty(giststate, j, &entry, v->spl_risnull[j],
&identry[j], isnull[j], &rpenalty);
if (lpenalty != rpenalty)
break;
}
}
/*
* add
* XXX: refactor this to avoid duplicating code
*/
if (lpenalty < rpenalty)
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= LEFT_ADDED;
v->spl_left[v->spl_nleft] = i + 1;
v->spl_nleft++;
for (j = 1; j < r->rd_att->natts; j++)
{
if (isnull[j] && v->spl_lisnull[j])
{
v->spl_lattr[j] = (Datum) 0;
v->spl_lattrsize[j] = 0;
}
else
{
FILLEV(v->spl_lisnull[j], v->spl_lattr[j], v->spl_lattrsize[j],
isnull[j], identry[j].key, identry[j].bytes);
datum = FunctionCall2(&giststate->unionFn[j],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
v->spl_lattr[j] = datum;
v->spl_lattrsize[j] = datumsize;
v->spl_lisnull[j] = false;
}
}
}
else
{
v->spl_grpflag[v->spl_idgrp[i + 1]] |= RIGHT_ADDED;
v->spl_right[v->spl_nright] = i + 1;
v->spl_nright++;
for (j = 1; j < r->rd_att->natts; j++)
{
if (isnull[j] && v->spl_risnull[j])
{
v->spl_rattr[j] = (Datum) 0;
v->spl_rattrsize[j] = 0;
}
else
{
FILLEV(v->spl_risnull[j], v->spl_rattr[j], v->spl_rattrsize[j],
isnull[j], identry[j].key, identry[j].bytes);
datum = FunctionCall2(&giststate->unionFn[j],
PointerGetDatum(evec),
PointerGetDatum(&datumsize));
v->spl_rattr[j] = datum;
v->spl_rattrsize[j] = datumsize;
v->spl_risnull[j] = false;
}
}
}
}
}
/*
* find entry with lowest penalty
*/
OffsetNumber
gistchoose(Relation r, Page p, IndexTuple it, /* it has compressed entry */
GISTSTATE *giststate)
{
OffsetNumber maxoff;
OffsetNumber i;
OffsetNumber which;
float sum_grow,
which_grow[INDEX_MAX_KEYS];
GISTENTRY entry,
identry[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
maxoff = PageGetMaxOffsetNumber(p);
*which_grow = -1.0;
which = -1;
sum_grow = 1;
gistDeCompressAtt(giststate, r,
it, NULL, (OffsetNumber) 0,
identry, isnull);
for (i = FirstOffsetNumber; i <= maxoff && sum_grow; i = OffsetNumberNext(i))
{
int j;
IndexTuple itup = (IndexTuple) PageGetItem(p, PageGetItemId(p, i));
sum_grow = 0;
for (j = 0; j < r->rd_att->natts; j++)
{
Datum datum;
float usize;
bool IsNull;
datum = index_getattr(itup, j + 1, giststate->tupdesc, &IsNull);
gistdentryinit(giststate, j, &entry, datum, r, p, i,
ATTSIZE(datum, giststate->tupdesc, j + 1, IsNull),
FALSE, IsNull);
gistpenalty(giststate, j, &entry, IsNull,
&identry[j], isnull[j], &usize);
if (which_grow[j] < 0 || usize < which_grow[j])
{
which = i;
which_grow[j] = usize;
if (j < r->rd_att->natts - 1 && i == FirstOffsetNumber)
which_grow[j + 1] = -1;
sum_grow += which_grow[j];
}
else if (which_grow[j] == usize)
sum_grow += usize;
else
{
sum_grow = 1;
break;
}
}
}
return which;
}
/*
* initialize a GiST entry with a decompressed version of key
*/
void
gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
Datum k, Relation r, Page pg, OffsetNumber o,
int b, bool l, bool isNull)
{
if (b && !isNull)
{
GISTENTRY *dep;
gistentryinit(*e, k, r, pg, o, b, l);
dep = (GISTENTRY *)
DatumGetPointer(FunctionCall1(&giststate->decompressFn[nkey],
PointerGetDatum(e)));
/* decompressFn may just return the given pointer */
if (dep != e)
gistentryinit(*e, dep->key, dep->rel, dep->page, dep->offset,
dep->bytes, dep->leafkey);
}
else
gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
}
/*
* initialize a GiST entry with a compressed version of key
*/
void
gistcentryinit(GISTSTATE *giststate, int nkey,
GISTENTRY *e, Datum k, Relation r,
Page pg, OffsetNumber o, int b, bool l, bool isNull)
{
if (!isNull)
{
GISTENTRY *cep;
gistentryinit(*e, k, r, pg, o, b, l);
cep = (GISTENTRY *)
DatumGetPointer(FunctionCall1(&giststate->compressFn[nkey],
PointerGetDatum(e)));
/* compressFn may just return the given pointer */
if (cep != e)
gistentryinit(*e, cep->key, cep->rel, cep->page, cep->offset,
cep->bytes, cep->leafkey);
}
else
gistentryinit(*e, (Datum) 0, r, pg, o, 0, l);
}
IndexTuple
gistFormTuple(GISTSTATE *giststate, Relation r,
Datum attdata[], int datumsize[], bool isnull[])
{
GISTENTRY centry[INDEX_MAX_KEYS];
Datum compatt[INDEX_MAX_KEYS];
int i;
for (i = 0; i < r->rd_att->natts; i++)
{
if (isnull[i])
compatt[i] = (Datum) 0;
else
{
gistcentryinit(giststate, i, &centry[i], attdata[i],
NULL, NULL, (OffsetNumber) 0,
datumsize[i], FALSE, FALSE);
compatt[i] = centry[i].key;
}
}
return index_form_tuple(giststate->tupdesc, compatt, isnull);
}
void
gistDeCompressAtt(GISTSTATE *giststate, Relation r, IndexTuple tuple, Page p,
OffsetNumber o, GISTENTRY *attdata, bool *isnull)
{
int i;
for (i = 0; i < r->rd_att->natts; i++)
{
Datum datum = index_getattr(tuple, i + 1, giststate->tupdesc, &isnull[i]);
gistdentryinit(giststate, i, &attdata[i],
datum, r, p, o,
ATTSIZE(datum, giststate->tupdesc, i + 1, isnull[i]),
FALSE, isnull[i]);
}
}
static void
gistpenalty(GISTSTATE *giststate, int attno,
GISTENTRY *key1, bool isNull1,
GISTENTRY *key2, bool isNull2, float *penalty)
{
if (giststate->penaltyFn[attno].fn_strict && (isNull1 || isNull2))
*penalty = 0.0;
else
FunctionCall3(&giststate->penaltyFn[attno],
PointerGetDatum(key1),
PointerGetDatum(key2),
PointerGetDatum(penalty));
}
void
GISTInitBuffer(Buffer b, uint32 f)
{
GISTPageOpaque opaque;
Page page;
Size pageSize;
pageSize = BufferGetPageSize(b);
page = BufferGetPage(b);
PageInit(page, pageSize, sizeof(GISTPageOpaqueData));
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
opaque->flags = f;
}
/*-------------------------------------------------------------------------
*
* gistxlog.c
* WAL replay logic for GiST.
*
*
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.1 2005/06/14 11:45:13 teodor Exp $
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/gist_private.h"
#include "access/gistscan.h"
#include "access/heapam.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "utils/memutils.h"
typedef struct {
gistxlogEntryUpdate *data;
int len;
IndexTuple *itup;
BlockNumber *path;
} EntryUpdateRecord;
typedef struct {
gistxlogPage *header;
OffsetNumber *offnum;
/* to work with */
Page page;
Buffer buffer;
bool is_ok;
} NewPage;
typedef struct {
gistxlogPageSplit *data;
NewPage *page;
IndexTuple *itup;
BlockNumber *path;
} PageSplitRecord;
/* track for incomplete inserts, idea was taken from nbtxlog.c */
typedef struct gistIncompleteInsert {
RelFileNode node;
ItemPointerData key;
int lenblk;
BlockNumber *blkno;
int pathlen;
BlockNumber *path;
} gistIncompleteInsert;
MemoryContext opCtx;
MemoryContext insertCtx;
static List *incomplete_inserts;
#define ItemPointerEQ( a, b ) \
( \
ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(a) && \
ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \
)
static void
pushIncompleteInsert(RelFileNode node, ItemPointerData key,
BlockNumber *blkno, int lenblk,
BlockNumber *path, int pathlen,
PageSplitRecord *xlinfo /* to extract blkno info */ ) {
MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx);
gistIncompleteInsert *ninsert = (gistIncompleteInsert*)palloc( sizeof(gistIncompleteInsert) );
ninsert->node = node;
ninsert->key = key;
if ( lenblk && blkno ) {
ninsert->lenblk = lenblk;
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
memcpy(ninsert->blkno, blkno, sizeof(BlockNumber)*ninsert->lenblk);
} else {
int i;
Assert( xlinfo );
ninsert->lenblk = xlinfo->data->npage;
ninsert->blkno = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->lenblk );
for(i=0;i<ninsert->lenblk;i++)
ninsert->blkno[i] = xlinfo->page[i].header->blkno;
}
Assert( ninsert->lenblk>0 );
if ( path && ninsert->pathlen ) {
ninsert->pathlen = pathlen;
ninsert->path = (BlockNumber*)palloc( sizeof(BlockNumber)*ninsert->pathlen );
memcpy(ninsert->path, path, sizeof(BlockNumber)*ninsert->pathlen);
} else {
ninsert->pathlen = 0;
ninsert->path = NULL;
}
incomplete_inserts = lappend(incomplete_inserts, ninsert);
MemoryContextSwitchTo(oldCxt);
}
static void
forgetIncompleteInsert(RelFileNode node, ItemPointerData key) {
ListCell *l;
foreach(l, incomplete_inserts) {
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
if ( RelFileNodeEquals(node, insert->node) && ItemPointerEQ( &(insert->key), &(key) ) ) {
/* found */
if ( insert->path ) pfree( insert->path );
pfree( insert->blkno );
incomplete_inserts = list_delete_ptr(incomplete_inserts, insert);
pfree( insert );
break;
}
}
}
static void
decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
int i=0, addpath=0;
decoded->data = (gistxlogEntryUpdate*)begin;
if ( decoded->data->pathlen ) {
addpath = sizeof(BlockNumber) * decoded->data->pathlen;
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
} else
decoded->path = NULL;
decoded->len=0;
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
while( ptr - begin < record->xl_len ) {
decoded->len++;
ptr += IndexTupleSize( (IndexTuple)ptr );
}
decoded->itup=(IndexTuple*)palloc( sizeof( IndexTuple ) * decoded->len );
ptr=begin+sizeof( gistxlogEntryUpdate ) + addpath;
while( ptr - begin < record->xl_len ) {
decoded->itup[i] = (IndexTuple)ptr;
ptr += IndexTupleSize( decoded->itup[i] );
i++;
}
}
static void
gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) {
EntryUpdateRecord xlrec;
Relation reln;
Buffer buffer;
Page page;
decodeEntryUpdateRecord( &xlrec, record );
reln = XLogOpenRelation(xlrec.data->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer(false, reln, xlrec.data->blkno);
if (!BufferIsValid(buffer))
elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
page = (Page) BufferGetPage(buffer);
if ( isnewroot ) {
if ( !PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page)) ) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
} else {
if ( PageIsNew((PageHeader) page) )
elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
if (XLByteLE(lsn, PageGetLSN(page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
}
if ( isnewroot )
GISTInitBuffer(buffer, 0);
else if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
/* add tuples */
if ( xlrec.len > 0 ) {
OffsetNumber off = (PageIsEmpty(page)) ?
FirstOffsetNumber
:
OffsetNumberNext(PageGetMaxOffsetNumber(page));
gistfillbuffer(reln, page, xlrec.itup, xlrec.len, off);
}
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
if ( incomplete_inserts != NIL )
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
if ( !isnewroot && xlrec.data->blkno!=GIST_ROOT_BLKNO )
pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
&(xlrec.data->blkno), 1,
xlrec.path, xlrec.data->pathlen,
NULL);
}
}
static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) {
char *begin = XLogRecGetData(record), *ptr;
int i=0, addpath = 0;
decoded->data = (gistxlogPageSplit*)begin;
decoded->page = (NewPage*)palloc( sizeof(NewPage) * decoded->data->npage );
decoded->itup = (IndexTuple*)palloc( sizeof(IndexTuple) * decoded->data->nitup );
if ( decoded->data->pathlen ) {
addpath = sizeof(BlockNumber) * decoded->data->pathlen;
decoded->path = (BlockNumber*)(begin+sizeof( gistxlogEntryUpdate ));
} else
decoded->path = NULL;
ptr=begin+sizeof( gistxlogPageSplit ) + addpath;
for(i=0;i<decoded->data->nitup;i++) {
Assert( ptr - begin < record->xl_len );
decoded->itup[i] = (IndexTuple)ptr;
ptr += IndexTupleSize( decoded->itup[i] );
}
for(i=0;i<decoded->data->npage;i++) {
Assert( ptr - begin < record->xl_len );
decoded->page[i].header = (gistxlogPage*)ptr;
ptr += sizeof(gistxlogPage);
Assert( ptr - begin < record->xl_len );
decoded->page[i].offnum = (OffsetNumber*)ptr;
ptr += MAXALIGN( sizeof(OffsetNumber) * decoded->page[i].header->num );
}
}
static void
gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record ) {
PageSplitRecord xlrec;
Relation reln;
Buffer buffer;
Page page;
int i, len=0;
IndexTuple *itup, *institup;
GISTPageOpaque opaque;
bool release=true;
decodePageSplitRecord( &xlrec, record );
reln = XLogOpenRelation(xlrec.data->node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer( false, reln, xlrec.data->origblkno);
if (!BufferIsValid(buffer))
elog(PANIC, "gistRedoEntryUpdateRecord: block unfound");
page = (Page) BufferGetPage(buffer);
if (PageIsNew((PageHeader) page))
elog(PANIC, "gistRedoEntryUpdateRecord: uninitialized page");
if (XLByteLE(lsn, PageGetLSN(page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
if ( xlrec.data->todeleteoffnum != InvalidOffsetNumber )
PageIndexTupleDelete(page, xlrec.data->todeleteoffnum);
itup = gistextractbuffer(buffer, &len);
itup = gistjoinvector(itup, &len, xlrec.itup, xlrec.data->nitup);
institup = (IndexTuple*)palloc( sizeof(IndexTuple) * len );
opaque = (GISTPageOpaque) PageGetSpecialPointer(page);
for(i=0;i<xlrec.data->npage;i++) {
int j;
NewPage *newpage = xlrec.page + i;
/* prepare itup vector */
for(j=0;j<newpage->header->num;j++)
institup[j] = itup[ newpage->offnum[j] - 1 ];
if ( newpage->header->blkno == xlrec.data->origblkno ) {
/* IncrBufferRefCount(buffer); */
newpage->page = (Page) PageGetTempPage(page, sizeof(GISTPageOpaqueData));
newpage->buffer = buffer;
newpage->is_ok=false;
} else {
newpage->buffer = XLogReadBuffer(true, reln, newpage->header->blkno);
if (!BufferIsValid(newpage->buffer))
elog(PANIC, "gistRedoPageSplitRecord: lost page");
newpage->page = (Page) BufferGetPage(newpage->buffer);
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(newpage->page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
newpage->is_ok=true;
continue; /* good page */
} else {
newpage->is_ok=false;
GISTInitBuffer(newpage->buffer, opaque->flags & F_LEAF);
}
}
gistfillbuffer(reln, newpage->page, institup, newpage->header->num, FirstOffsetNumber);
}
for(i=0;i<xlrec.data->npage;i++) {
NewPage *newpage = xlrec.page + i;
if ( newpage->is_ok )
continue;
if ( newpage->header->blkno == xlrec.data->origblkno ) {
PageRestoreTempPage(newpage->page, page);
release = false;
}
PageSetLSN(newpage->page, lsn);
PageSetTLI(newpage->page, ThisTimeLineID);
LockBuffer(newpage->buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(newpage->buffer);
}
if ( release ) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
}
if ( ItemPointerIsValid( &(xlrec.data->key) ) ) {
if ( incomplete_inserts != NIL )
forgetIncompleteInsert(xlrec.data->node, xlrec.data->key);
pushIncompleteInsert(xlrec.data->node, xlrec.data->key,
NULL, 0,
xlrec.path, xlrec.data->pathlen,
&xlrec);
}
}
static void
gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) {
RelFileNode *node = (RelFileNode*)XLogRecGetData(record);
Relation reln;
Buffer buffer;
Page page;
reln = XLogOpenRelation(*node);
if (!RelationIsValid(reln))
return;
buffer = XLogReadBuffer( true, reln, GIST_ROOT_BLKNO);
if (!BufferIsValid(buffer))
elog(PANIC, "gistRedoCreateIndex: block unfound");
page = (Page) BufferGetPage(buffer);
if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) {
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
return;
}
GISTInitBuffer(buffer, F_LEAF);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
WriteBuffer(buffer);
}
void
gist_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info) {
case XLOG_GIST_ENTRY_UPDATE:
case XLOG_GIST_ENTRY_DELETE:
gistRedoEntryUpdateRecord(lsn, record,false);
break;
case XLOG_GIST_NEW_ROOT:
gistRedoEntryUpdateRecord(lsn, record,true);
break;
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record);
break;
case XLOG_GIST_CREATE_INDEX:
gistRedoCreateIndex(lsn, record);
break;
case XLOG_GIST_INSERT_COMPLETE:
forgetIncompleteInsert( ((gistxlogInsertComplete*)XLogRecGetData(record))->node,
((gistxlogInsertComplete*)XLogRecGetData(record))->key );
break;
default:
elog(PANIC, "gist_redo: unknown op code %u", info);
}
MemoryContextSwitchTo(oldCxt);
MemoryContextReset(opCtx);
}
static void
out_target(char *buf, RelFileNode node, ItemPointerData key)
{
sprintf(buf + strlen(buf), "rel %u/%u/%u; tid %u/%u",
node.spcNode, node.dbNode, node.relNode,
ItemPointerGetBlockNumber(&key),
ItemPointerGetOffsetNumber(&key));
}
static void
out_gistxlogEntryUpdate(char *buf, gistxlogEntryUpdate *xlrec) {
out_target(buf, xlrec->node, xlrec->key);
sprintf(buf + strlen(buf), "; block number %u; update offset %u;",
xlrec->blkno, xlrec->todeleteoffnum);
}
static void
out_gistxlogPageSplit(char *buf, gistxlogPageSplit *xlrec) {
strcat(buf, "page_split: ");
out_target(buf, xlrec->node, xlrec->key);
sprintf(buf + strlen(buf), "; block number %u; update offset %u; add %d tuples; split to %d pages",
xlrec->origblkno, xlrec->todeleteoffnum,
xlrec->nitup, xlrec->npage);
}
void
gist_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
switch (info) {
case XLOG_GIST_ENTRY_UPDATE:
strcat(buf, "entry_update: ");
out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
break;
case XLOG_GIST_ENTRY_DELETE:
strcat(buf, "entry_delete: ");
out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate*)rec);
break;
case XLOG_GIST_NEW_ROOT:
strcat(buf, "new_root: ");
out_target(buf, ((gistxlogEntryUpdate*)rec)->node, ((gistxlogEntryUpdate*)rec)->key);
break;
case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit*)rec);
break;
case XLOG_GIST_CREATE_INDEX:
sprintf(buf + strlen(buf), "create_index: rel %u/%u/%u",
((RelFileNode*)rec)->spcNode,
((RelFileNode*)rec)->dbNode,
((RelFileNode*)rec)->relNode);
break;
case XLOG_GIST_INSERT_COMPLETE:
strcat(buf, "insert_complete: ");
out_target(buf, ((gistxlogInsertComplete*)rec)->node, ((gistxlogInsertComplete*)rec)->key);
default:
elog(PANIC, "gist_desc: unknown op code %u", info);
}
}
#ifdef GIST_INCOMPLETE_INSERT
static void
gistContinueInsert(gistIncompleteInsert *insert) {
GISTSTATE giststate;
GISTInsertState state;
int i;
MemoryContext oldCxt;
oldCxt = MemoryContextSwitchTo(opCtx);
state.r = XLogOpenRelation(insert->node);
if (!RelationIsValid(state.r))
return;
initGISTstate(&giststate, state.r);
state.needInsertComplete=false;
ItemPointerSetInvalid( &(state.key) );
state.path=NULL;
state.pathlen=0;
state.xlog_mode = true;
/* form union tuples */
state.itup = (IndexTuple*)palloc(sizeof(IndexTuple)*insert->lenblk);
state.ituplen = insert->lenblk;
for(i=0;i<insert->lenblk;i++) {
int len=0;
IndexTuple *itup;
Buffer buffer;
Page page;
buffer = XLogReadBuffer(false, state.r, insert->blkno[i]);
if (!BufferIsValid(buffer))
elog(PANIC, "gistContinueInsert: block unfound");
page = (Page) BufferGetPage(buffer);
if ( PageIsNew((PageHeader)page) )
elog(PANIC, "gistContinueInsert: uninitialized page");
itup = gistextractbuffer(buffer, &len);
state.itup[i] = gistunion(state.r, itup, len, &giststate);
ItemPointerSet( &(state.itup[i]->t_tid), insert->blkno[i], FirstOffsetNumber );
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
}
if ( insert->pathlen==0 ) {
/*it was split root, so we should only make new root*/
gistnewroot(state.r, state.itup, state.ituplen, &(state.key), true);
MemoryContextSwitchTo(oldCxt);
MemoryContextReset(opCtx);
return;
}
/* form stack */
state.stack=NULL;
for(i=0;i<insert->pathlen;i++) {
int j,len=0;
IndexTuple *itup;
GISTInsertStack *top = (GISTInsertStack*)palloc( sizeof(GISTInsertStack) );
top->blkno = insert->path[i];
top->buffer = XLogReadBuffer(false, state.r, top->blkno);
if (!BufferIsValid(top->buffer))
elog(PANIC, "gistContinueInsert: block unfound");
top->page = (Page) BufferGetPage(top->buffer);
if ( PageIsNew((PageHeader)(top->page)) )
elog(PANIC, "gistContinueInsert: uninitialized page");
top->todelete = false;
/* find childoffnum */
itup = gistextractbuffer(top->buffer, &len);
top->childoffnum=InvalidOffsetNumber;
for(j=0;j<len && top->childoffnum==InvalidOffsetNumber;j++) {
BlockNumber blkno = ItemPointerGetBlockNumber( &(itup[j]->t_tid) );
if ( i==0 ) {
int k;
for(k=0;k<insert->lenblk;k++)
if ( insert->blkno[k] == blkno ) {
top->childoffnum = j+1;
break;
}
} else if ( insert->path[i-1]==blkno )
top->childoffnum = j+1;
}
if ( top->childoffnum==InvalidOffsetNumber ) {
elog(WARNING, "gistContinueInsert: unknown parent, REINDEX GiST Indexes");
return;
}
if ( i==0 )
PageIndexTupleDelete(top->page, top->childoffnum);
/* install item on right place in stack */
top->parent=NULL;
if ( state.stack ) {
GISTInsertStack *ptr = state.stack;
while( ptr->parent )
ptr = ptr->parent;
ptr->parent=top;
} else
state.stack = top;
}
/* Good. Now we can continue insert */
gistmakedeal(&state, &giststate);
MemoryContextSwitchTo(oldCxt);
MemoryContextReset(opCtx);
}
#endif
void
gist_xlog_startup(void) {
incomplete_inserts=NIL;
insertCtx = AllocSetContextCreate(CurrentMemoryContext,
"GiST insert in xlog temporary context",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
opCtx = createTempGistContext();
}
void
gist_xlog_cleanup(void) {
ListCell *l;
foreach(l, incomplete_inserts) {
gistIncompleteInsert *insert = (gistIncompleteInsert*) lfirst(l);
char buf[1024];
*buf='\0';
out_target(buf, insert->node, insert->key);
elog(LOG,"Incomplete insert: %s; It's needed to reindex", buf);
#ifdef GIST_INCOMPLETE_INSERT
gistContinueInsert(insert);
#endif
}
MemoryContextDelete(opCtx);
MemoryContextDelete(insertCtx);
}
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
* *
* Resource managers definition * Resource managers definition
* *
* $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.19 2005/06/08 15:50:26 tgl Exp $ * $PostgreSQL: pgsql/src/backend/access/transam/rmgr.c,v 1.20 2005/06/14 11:45:14 teodor Exp $
*/ */
#include "postgres.h" #include "postgres.h"
...@@ -37,6 +37,6 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = { ...@@ -37,6 +37,6 @@ const RmgrData RmgrTable[RM_MAX_ID + 1] = {
{"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup}, {"Btree", btree_redo, btree_desc, btree_xlog_startup, btree_xlog_cleanup},
{"Hash", hash_redo, hash_desc, NULL, NULL}, {"Hash", hash_redo, hash_desc, NULL, NULL},
{"Rtree", rtree_redo, rtree_desc, NULL, NULL}, {"Rtree", rtree_redo, rtree_desc, NULL, NULL},
{"Gist", gist_redo, gist_desc, NULL, NULL}, {"Gist", gist_redo, gist_desc, gist_xlog_startup, gist_xlog_cleanup},
{"Sequence", seq_redo, seq_desc, NULL, NULL} {"Sequence", seq_redo, seq_desc, NULL, NULL}
}; };
...@@ -7,15 +7,17 @@ ...@@ -7,15 +7,17 @@
* Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.2 2005/06/06 17:01:24 tgl Exp $ * $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.3 2005/06/14 11:45:14 teodor Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#ifndef GIST_PRIVATE_H #ifndef GIST_PRIVATE_H
#define GIST_PRIVATE_H #define GIST_PRIVATE_H
#include "access/itup.h"
#include "access/gist.h" #include "access/gist.h"
#include "access/xlog.h" #include "access/xlog.h"
#include "access/xlogdefs.h"
#include "fmgr.h" #include "fmgr.h"
/* /*
...@@ -66,6 +68,42 @@ typedef struct GISTScanOpaqueData ...@@ -66,6 +68,42 @@ typedef struct GISTScanOpaqueData
typedef GISTScanOpaqueData *GISTScanOpaque; typedef GISTScanOpaqueData *GISTScanOpaque;
/*
* GISTInsertStack used for locking buffers and transfer arguments during
* insertion
*/
typedef struct GISTInsertStack {
/* current page */
BlockNumber blkno;
Buffer buffer;
Page page;
/* child's offset */
OffsetNumber childoffnum;
/* pointer to parent */
struct GISTInsertStack *parent;
bool todelete;
} GISTInsertStack;
typedef struct {
Relation r;
IndexTuple *itup; /* in/out, points to compressed entry */
int ituplen; /* length of itup */
GISTInsertStack *stack;
bool needInsertComplete;
bool xlog_mode;
/* pointer to heap tuple */
ItemPointerData key;
/* path to stroe in XLog */
BlockNumber *path;
int pathlen;
} GISTInsertState;
/* /*
* When we're doing a scan and updating a tree at the same time, the * When we're doing a scan and updating a tree at the same time, the
* updates may affect the scan. We use the flags entry of the scan's * updates may affect the scan. We use the flags entry of the scan's
...@@ -89,6 +127,72 @@ typedef GISTScanOpaqueData *GISTScanOpaque; ...@@ -89,6 +127,72 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
#define GISTOP_DEL 0 #define GISTOP_DEL 0
#define GISTOP_SPLIT 1 #define GISTOP_SPLIT 1
#define ATTSIZE(datum, tupdesc, i, isnull) \
( \
(isnull) ? 0 : \
att_addlength(0, (tupdesc)->attrs[(i)-1]->attlen, (datum)) \
)
/* XLog stuff */
#define XLOG_GIST_ENTRY_UPDATE 0x00
#define XLOG_GIST_ENTRY_DELETE 0x10
#define XLOG_GIST_NEW_ROOT 0x20
typedef struct gistxlogEntryUpdate {
RelFileNode node;
BlockNumber blkno;
/* if todeleteoffnum!=InvalidOffsetNumber then delete it. */
OffsetNumber todeleteoffnum;
uint16 pathlen;
/*
* It used to identify compliteness of insert.
* Sets to leaf itup
*/
ItemPointerData key;
/* follow:
* 1. path to root (BlockNumber)
* 2. tuples to insert
*/
} gistxlogEntryUpdate;
#define XLOG_GIST_PAGE_SPLIT 0x30
typedef struct gistxlogPageSplit {
RelFileNode node;
BlockNumber origblkno; /*splitted page*/
OffsetNumber todeleteoffnum;
uint16 pathlen;
int npage;
int nitup;
/* see comments on gistxlogEntryUpdate */
ItemPointerData key;
/* follow:
* 1. path to root (BlockNumber)
* 2. tuples to insert
* 3. gistxlogPage and array of OffsetNumber per page
*/
} gistxlogPageSplit;
typedef struct gistxlogPage {
BlockNumber blkno;
int num;
} gistxlogPage;
#define XLOG_GIST_INSERT_COMPLETE 0x40
typedef struct gistxlogInsertComplete {
RelFileNode node;
ItemPointerData key;
} gistxlogInsertComplete;
#define XLOG_GIST_CREATE_INDEX 0x50
/* gist.c */ /* gist.c */
extern Datum gistbuild(PG_FUNCTION_ARGS); extern Datum gistbuild(PG_FUNCTION_ARGS);
extern Datum gistinsert(PG_FUNCTION_ARGS); extern Datum gistinsert(PG_FUNCTION_ARGS);
...@@ -96,15 +200,57 @@ extern Datum gistbulkdelete(PG_FUNCTION_ARGS); ...@@ -96,15 +200,57 @@ extern Datum gistbulkdelete(PG_FUNCTION_ARGS);
extern MemoryContext createTempGistContext(void); extern MemoryContext createTempGistContext(void);
extern void initGISTstate(GISTSTATE *giststate, Relation index); extern void initGISTstate(GISTSTATE *giststate, Relation index);
extern void freeGISTstate(GISTSTATE *giststate); extern void freeGISTstate(GISTSTATE *giststate);
extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e, extern void gistnewroot(Relation r, IndexTuple *itup, int len, ItemPointer key, bool xlog_mode);
Datum k, Relation r, Page pg, OffsetNumber o, extern void gistmakedeal(GISTInsertState *state, GISTSTATE *giststate);
int b, bool l, bool isNull);
/* gistxlog.c */
extern void gist_redo(XLogRecPtr lsn, XLogRecord *record); extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
extern void gist_desc(char *buf, uint8 xl_info, char *rec); extern void gist_desc(char *buf, uint8 xl_info, char *rec);
extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void);
/* gistget.c */ /* gistget.c */
extern Datum gistgettuple(PG_FUNCTION_ARGS); extern Datum gistgettuple(PG_FUNCTION_ARGS);
extern Datum gistgetmulti(PG_FUNCTION_ARGS); extern Datum gistgetmulti(PG_FUNCTION_ARGS);
/* gistutil.c */
extern OffsetNumber gistfillbuffer(Relation r, Page page, IndexTuple *itup,
int len, OffsetNumber off);
extern bool gistnospace(Page page, IndexTuple *itvec, int len);
extern IndexTuple * gistextractbuffer(Buffer buffer, int *len /* out */ );
extern IndexTuple * gistjoinvector(
IndexTuple *itvec, int *len,
IndexTuple *additvec, int addlen);
extern IndexTuple gistunion(Relation r, IndexTuple *itvec,
int len, GISTSTATE *giststate);
extern IndexTuple gistgetadjusted(Relation r,
IndexTuple oldtup,
IndexTuple addtup,
GISTSTATE *giststate);
extern int gistfindgroup(GISTSTATE *giststate,
GISTENTRY *valvec, GIST_SPLITVEC *spl);
extern void gistadjsubkey(Relation r,
IndexTuple *itup, int *len,
GIST_SPLITVEC *v,
GISTSTATE *giststate);
extern IndexTuple gistFormTuple(GISTSTATE *giststate,
Relation r, Datum *attdata, int *datumsize, bool *isnull);
extern OffsetNumber gistchoose(Relation r, Page p,
IndexTuple it,
GISTSTATE *giststate);
extern void gistcentryinit(GISTSTATE *giststate, int nkey,
GISTENTRY *e, Datum k,
Relation r, Page pg,
OffsetNumber o, int b, bool l, bool isNull);
extern void gistDeCompressAtt(GISTSTATE *giststate, Relation r,
IndexTuple tuple, Page p, OffsetNumber o,
GISTENTRY *attdata, bool *isnull);
extern void gistunionsubkey(Relation r, GISTSTATE *giststate,
IndexTuple *itvec, GIST_SPLITVEC *spl);
extern void GISTInitBuffer(Buffer b, uint32 f);
extern void gistdentryinit(GISTSTATE *giststate, int nkey, GISTENTRY *e,
Datum k, Relation r, Page pg, OffsetNumber o,
int b, bool l, bool isNull);
#endif /* GIST_PRIVATE_H */ #endif /* GIST_PRIVATE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment