• Teodor Sigaev's avatar
    GIN improvements · 23416364
    Teodor Sigaev authored
    - Replace sorted array of entries in maintenance_work_mem to binary tree,
      this should improve create performance.
    - More precisely calculate allocated memory, eliminate leaks
      with user-defined extractValue()
    - Improve wordings in tsearch2
    23416364
gininsert.c 10.1 KB
/*-------------------------------------------------------------------------
 *
 * gininsert.c
 *    insert routines for the postgres inverted index access method.
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *          $PostgreSQL: pgsql/src/backend/access/gin/gininsert.c,v 1.3 2006/07/11 16:55:34 teodor Exp $
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "access/genam.h"
#include "access/gin.h"
#include "access/heapam.h"
#include "catalog/index.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "utils/memutils.h"
#include "access/tuptoaster.h"

typedef struct {
	GinState			ginstate;
	double				indtuples;
	MemoryContext		tmpCtx;
	MemoryContext		funcCtx;
	BuildAccumulator	accum;
} GinBuildState;

/*
 * Creates posting tree with one page. Function
 * suppose that items[] fits to page
 */
static BlockNumber
createPostingTree( Relation index, ItemPointerData *items, uint32 nitems ) {
	BlockNumber blkno;
	Buffer buffer = GinNewBuffer(index);
	Page page;

	START_CRIT_SECTION();

	GinInitBuffer( buffer, GIN_DATA|GIN_LEAF );
	page = BufferGetPage(buffer);
	blkno = BufferGetBlockNumber(buffer);

	memcpy( GinDataPageGetData(page), items, sizeof(ItemPointerData) * nitems );
	GinPageGetOpaque(page)->maxoff = nitems;

	if (!index->rd_istemp) {
		XLogRecPtr  recptr;
		XLogRecData rdata[2];
		ginxlogCreatePostingTree	data;

		data.node = index->rd_node;
		data.blkno = blkno;
		data.nitem = nitems;

		rdata[0].buffer = InvalidBuffer;
		rdata[0].data = (char *) &data;
		rdata[0].len = sizeof(ginxlogCreatePostingTree);
		rdata[0].next = &rdata[1];

		rdata[1].buffer = InvalidBuffer;
		rdata[1].data = (char *) items;
		rdata[1].len = sizeof(ItemPointerData) * nitems;
		rdata[1].next = NULL;



		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_PTREE, rdata);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);

	} 

	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);

	END_CRIT_SECTION();

	return blkno;
}


/*
 * Adds array of item pointers to tuple's posting list or
 * creates posting tree and tuple pointed to tree in a case
 * of not enough space.  Max size of tuple is defined in
 * GinFormTuple().
 */
static IndexTuple
addItemPointersToTuple(Relation index, GinState *ginstate, GinBtreeStack *stack, 
		IndexTuple old, ItemPointerData *items, uint32 nitem, bool isBuild) {
	bool isnull;
	Datum key = index_getattr(old, FirstOffsetNumber, ginstate->tupdesc, &isnull);
	IndexTuple res = GinFormTuple(ginstate, key, NULL, nitem + GinGetNPosting(old));

	if ( res ) {
		/* good, small enough */
		MergeItemPointers( GinGetPosting(res),
			GinGetPosting(old), GinGetNPosting(old),
			items,	nitem
		);
		
		GinSetNPosting(res, nitem + GinGetNPosting(old));
	} else {
		BlockNumber postingRoot;
		GinPostingTreeScan *gdi;

		/* posting list becomes big, so we need to make posting's tree */
		res = GinFormTuple(ginstate, key, NULL, 0);
		postingRoot = createPostingTree(index, GinGetPosting(old), GinGetNPosting(old));
		GinSetPostingTree(res, postingRoot);

		gdi = prepareScanPostingTree(index, postingRoot, FALSE); 
		gdi->btree.isBuild = isBuild;

		insertItemPointer(gdi, items, nitem);

		pfree(gdi);
	}

	return res;
}

/*
 * Inserts only one entry to the index, but it can adds more that 1 
 * ItemPointer. 
 */
static void
ginEntryInsert( Relation index, GinState *ginstate, Datum value, ItemPointerData *items, uint32 nitem, bool isBuild) {
	GinBtreeData	btree;
	GinBtreeStack *stack;
	IndexTuple itup;
	Page page;

	prepareEntryScan( &btree, index, value, ginstate );

	stack = ginFindLeafPage(&btree, NULL);
	page = BufferGetPage( stack->buffer );

	if ( btree.findItem( &btree, stack ) ) {
		/* found entry */
		itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, stack->off));

		if ( GinIsPostingTree(itup) ) {
			/* lock root of posting tree */
			GinPostingTreeScan *gdi;
			BlockNumber	rootPostingTree = GinGetPostingTree(itup); 

			/* release all stack */
			LockBuffer(stack->buffer, GIN_UNLOCK);
			freeGinBtreeStack( stack );

			/* insert into posting tree */
			gdi = prepareScanPostingTree( index, rootPostingTree, FALSE );
			gdi->btree.isBuild = isBuild;
			insertItemPointer(gdi, items, nitem);

			return;
		}

		itup = addItemPointersToTuple(index, ginstate, stack, itup, items, nitem, isBuild);

		btree.isDelete = TRUE;
	} else {
		/* We suppose, that tuple can store at list one itempointer */
		itup = GinFormTuple( ginstate, value, items, 1);
		if ( itup==NULL || IndexTupleSize(itup) >= GinMaxItemSize  )
			elog(ERROR, "huge tuple");

		if ( nitem>1 ) {
			IndexTuple	previtup = itup;

			itup =  addItemPointersToTuple(index, ginstate, stack, previtup, items+1, nitem-1, isBuild);
			pfree(previtup);
		}
	}

	btree.entry = itup;
	ginInsertValue(&btree, stack);
	pfree( itup );
}

/*
 * Saves indexed value in memory accumulator during index creation
 * Function isnt use during normal insert
 */
static uint32
ginHeapTupleBulkInsert(GinBuildState *buildstate, Datum value, ItemPointer heapptr) {
	Datum	*entries;
	uint32 nentries;
	MemoryContext oldCtx;

	oldCtx = MemoryContextSwitchTo(buildstate->funcCtx);
	entries = extractEntriesSU( buildstate->accum.ginstate, value, &nentries);
	MemoryContextSwitchTo(oldCtx);

	if ( nentries==0 )
		/* nothing to insert */
		return 0;

	ginInsertRecordBA( &buildstate->accum, heapptr, entries, nentries);

	MemoryContextReset(buildstate->funcCtx);

	return nentries;
}

static void 
ginBuildCallback(Relation index, HeapTuple htup, Datum *values,
					bool *isnull, bool tupleIsAlive, void *state) {

	GinBuildState	*buildstate = (GinBuildState*)state;
	MemoryContext oldCtx;

	if ( *isnull )
		return;

	oldCtx = MemoryContextSwitchTo(buildstate->tmpCtx);

	buildstate->indtuples += ginHeapTupleBulkInsert(buildstate, *values, &htup->t_self);

	/* we use only half maintenance_work_mem, because there is some leaks 
		during insertion and extract values */ 
	if ( buildstate->accum.allocatedMemory >= maintenance_work_mem*1024L/2L ) { 
		ItemPointerData	*list;
		Datum entry;
		uint32	nlist;

		while( (list=ginGetEntry(&buildstate->accum, &entry, &nlist)) != NULL )
			ginEntryInsert(index, &buildstate->ginstate, entry, list, nlist, TRUE);

		MemoryContextReset(buildstate->tmpCtx);
		ginInitBA(&buildstate->accum);
	}

	MemoryContextSwitchTo(oldCtx);
}

Datum
ginbuild(PG_FUNCTION_ARGS) {
	Relation    heap = (Relation) PG_GETARG_POINTER(0);
	Relation    index = (Relation) PG_GETARG_POINTER(1);
	IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
	IndexBuildResult *result;
	double      reltuples;
	GinBuildState	buildstate;
	Buffer		buffer;
	ItemPointerData	*list;
	Datum entry;
	uint32	nlist;
	MemoryContext oldCtx;

	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			RelationGetRelationName(index));

	initGinState(&buildstate.ginstate, index);

	/* initialize the root page */
	buffer = GinNewBuffer(index);
	START_CRIT_SECTION();
	GinInitBuffer(buffer, GIN_LEAF);
	if (!index->rd_istemp) {
		XLogRecPtr  recptr;
		XLogRecData rdata;
		Page        page;

		rdata.buffer = InvalidBuffer;
		rdata.data = (char *) &(index->rd_node);
		rdata.len = sizeof(RelFileNode);
		rdata.next = NULL;

		page = BufferGetPage(buffer);


		recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX, &rdata);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);

	} 

	MarkBufferDirty(buffer);
	UnlockReleaseBuffer(buffer);
	END_CRIT_SECTION();

	/* build the index */
	buildstate.indtuples = 0;

	/*
	 * create a temporary memory context that is reset once for each tuple
	 * inserted into the index
	 */
	buildstate.tmpCtx = AllocSetContextCreate(CurrentMemoryContext,
												"Gin build temporary context",
												ALLOCSET_DEFAULT_MINSIZE,
												ALLOCSET_DEFAULT_INITSIZE,
												ALLOCSET_DEFAULT_MAXSIZE);

	buildstate.funcCtx = AllocSetContextCreate(buildstate.tmpCtx,
												"Gin build temporary context for user-defined function",
												ALLOCSET_DEFAULT_MINSIZE,
												ALLOCSET_DEFAULT_INITSIZE,
												ALLOCSET_DEFAULT_MAXSIZE);

	buildstate.accum.ginstate = &buildstate.ginstate;
	ginInitBA( &buildstate.accum );

	/* do the heap scan */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo,
									ginBuildCallback, (void *) &buildstate);

	oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
	while( (list=ginGetEntry(&buildstate.accum, &entry, &nlist)) != NULL )
		ginEntryInsert(index, &buildstate.ginstate, entry, list, nlist, TRUE);
	MemoryContextSwitchTo(oldCtx);

	MemoryContextDelete(buildstate.tmpCtx);

	/*
	 * Return statistics
	 */
	result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));

	result->heap_tuples = reltuples;
	result->index_tuples = buildstate.indtuples;

	PG_RETURN_POINTER(result);
}

/*
 * Inserts value during normal insertion
 */
static uint32
ginHeapTupleInsert( Relation index, GinState *ginstate, Datum value, ItemPointer item) {
	Datum	*entries;
	uint32 i,nentries;

	entries = extractEntriesSU( ginstate, value, &nentries);

	if ( nentries==0 )
		/* nothing to insert */
		return 0;

	for(i=0;i<nentries;i++) 
		ginEntryInsert(index, ginstate, entries[i], item, 1, FALSE);

	return nentries;
}

Datum
gininsert(PG_FUNCTION_ARGS) {
	Relation    index = (Relation) PG_GETARG_POINTER(0);
	Datum      *values = (Datum *) PG_GETARG_POINTER(1);
	bool       *isnull = (bool *) PG_GETARG_POINTER(2);
	ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
#ifdef NOT_USED
	Relation    heapRel = (Relation) PG_GETARG_POINTER(4);
	bool        checkUnique = PG_GETARG_BOOL(5);
#endif
	GinState 	ginstate;
	MemoryContext oldCtx;
	MemoryContext insertCtx;
	uint32 res;

	if ( *isnull )
		PG_RETURN_BOOL(false);

	insertCtx = AllocSetContextCreate(CurrentMemoryContext,
												"Gin insert temporary context",
												ALLOCSET_DEFAULT_MINSIZE,
												ALLOCSET_DEFAULT_INITSIZE,
												ALLOCSET_DEFAULT_MAXSIZE);

	oldCtx = MemoryContextSwitchTo(insertCtx);

	initGinState(&ginstate, index);

	res = ginHeapTupleInsert(index, &ginstate, *values, ht_ctid); 

	MemoryContextSwitchTo(oldCtx);
	MemoryContextDelete(insertCtx);

	PG_RETURN_BOOL(res>0);
}