Commit b0d5036c authored by Hiroshi Inoue's avatar Hiroshi Inoue

CREATE btree INDEX takes dead tuples into account when old transactions

are running.
parent 5ab40f0b
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.62 2000/07/21 06:42:32 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.63 2000/08/10 02:33:20 inoue Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "catalog/index.h" #include "catalog/index.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/sinval.h"
bool BuildingBtree = false; /* see comment in btbuild() */ bool BuildingBtree = false; /* see comment in btbuild() */
...@@ -70,6 +71,16 @@ btbuild(PG_FUNCTION_ARGS) ...@@ -70,6 +71,16 @@ btbuild(PG_FUNCTION_ARGS)
BTSpool *spool = NULL; BTSpool *spool = NULL;
BTItem btitem; BTItem btitem;
bool usefast; bool usefast;
Snapshot snapshot;
TransactionId XmaxRecent;
/*
* spool2 is needed only when the index is an unique index.
* Dead tuples are put into spool2 instead of spool in
* order to avoid uniqueness check.
*/
BTSpool *spool2 = NULL;
bool tupleIsAlive;
int dead_count;
/* note that this is a new btree */ /* note that this is a new btree */
BuildingBtree = true; BuildingBtree = true;
...@@ -135,13 +146,41 @@ btbuild(PG_FUNCTION_ARGS) ...@@ -135,13 +146,41 @@ btbuild(PG_FUNCTION_ARGS)
nhtups = nitups = 0; nhtups = nitups = 0;
if (usefast) if (usefast)
{
spool = _bt_spoolinit(index, indexInfo->ii_Unique); spool = _bt_spoolinit(index, indexInfo->ii_Unique);
/*
* Different from spool,the uniqueness isn't checked
* for spool2.
*/
if (indexInfo->ii_Unique)
spool2 = _bt_spoolinit(index, false);
}
/* start a heap scan */ /* start a heap scan */
hscan = heap_beginscan(heap, 0, SnapshotNow, 0, (ScanKey) NULL); dead_count = 0;
snapshot = (IsBootstrapProcessingMode() ? SnapshotNow : SnapshotAny);
hscan = heap_beginscan(heap, 0, snapshot, 0, (ScanKey) NULL);
XmaxRecent = 0;
if (snapshot == SnapshotAny)
GetXmaxRecent(&XmaxRecent);
while (HeapTupleIsValid(htup = heap_getnext(hscan, 0))) while (HeapTupleIsValid(htup = heap_getnext(hscan, 0)))
{ {
if (snapshot == SnapshotAny)
{
tupleIsAlive = HeapTupleSatisfiesNow(htup->t_data);
if (!tupleIsAlive)
{
if ((htup->t_data->t_infomask & HEAP_XMIN_INVALID) != 0)
continue;
if (htup->t_data->t_infomask & HEAP_XMAX_COMMITTED &&
htup->t_data->t_xmax < XmaxRecent)
continue;
}
}
else
tupleIsAlive = true;
MemoryContextReset(econtext->ecxt_per_tuple_memory); MemoryContextReset(econtext->ecxt_per_tuple_memory);
nhtups++; nhtups++;
...@@ -222,7 +261,15 @@ btbuild(PG_FUNCTION_ARGS) ...@@ -222,7 +261,15 @@ btbuild(PG_FUNCTION_ARGS)
* into the btree. * into the btree.
*/ */
if (usefast) if (usefast)
{
if (tupleIsAlive || !spool2)
_bt_spool(btitem, spool); _bt_spool(btitem, spool);
else /* dead tuples are put into spool2 */
{
dead_count++;
_bt_spool(btitem, spool2);
}
}
else else
res = _bt_doinsert(index, btitem, indexInfo->ii_Unique, heap); res = _bt_doinsert(index, btitem, indexInfo->ii_Unique, heap);
...@@ -234,6 +281,11 @@ btbuild(PG_FUNCTION_ARGS) ...@@ -234,6 +281,11 @@ btbuild(PG_FUNCTION_ARGS)
/* okay, all heap tuples are indexed */ /* okay, all heap tuples are indexed */
heap_endscan(hscan); heap_endscan(hscan);
if (spool2 && !dead_count) /* spool2 was found to be unnecessary */
{
_bt_spooldestroy(spool2);
spool2 = NULL;
}
#ifndef OMIT_PARTIAL_INDEX #ifndef OMIT_PARTIAL_INDEX
if (pred != NULL || oldPred != NULL) if (pred != NULL || oldPred != NULL)
...@@ -250,8 +302,10 @@ btbuild(PG_FUNCTION_ARGS) ...@@ -250,8 +302,10 @@ btbuild(PG_FUNCTION_ARGS)
*/ */
if (usefast) if (usefast)
{ {
_bt_leafbuild(spool); _bt_leafbuild(spool, spool2);
_bt_spooldestroy(spool); _bt_spooldestroy(spool);
if (spool2)
_bt_spooldestroy(spool2);
} }
#ifdef BTREE_BUILD_STATS #ifdef BTREE_BUILD_STATS
......
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsort.c,v 1.56 2000/07/21 22:14:09 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtsort.c,v 1.57 2000/08/10 02:33:20 inoue Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -95,7 +95,7 @@ static void _bt_sortaddtup(Page page, Size itemsize, ...@@ -95,7 +95,7 @@ static void _bt_sortaddtup(Page page, Size itemsize,
BTItem btitem, OffsetNumber itup_off); BTItem btitem, OffsetNumber itup_off);
static void _bt_buildadd(Relation index, BTPageState *state, BTItem bti); static void _bt_buildadd(Relation index, BTPageState *state, BTItem bti);
static void _bt_uppershutdown(Relation index, BTPageState *state); static void _bt_uppershutdown(Relation index, BTPageState *state);
static void _bt_load(Relation index, BTSpool *btspool); static void _bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2);
/* /*
...@@ -153,7 +153,7 @@ _bt_spool(BTItem btitem, BTSpool *btspool) ...@@ -153,7 +153,7 @@ _bt_spool(BTItem btitem, BTSpool *btspool)
* create an entire btree. * create an entire btree.
*/ */
void void
_bt_leafbuild(BTSpool *btspool) _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
{ {
#ifdef BTREE_BUILD_STATS #ifdef BTREE_BUILD_STATS
if (Show_btree_build_stats) if (Show_btree_build_stats)
...@@ -165,7 +165,9 @@ _bt_leafbuild(BTSpool *btspool) ...@@ -165,7 +165,9 @@ _bt_leafbuild(BTSpool *btspool)
#endif /* BTREE_BUILD_STATS */ #endif /* BTREE_BUILD_STATS */
tuplesort_performsort(btspool->sortstate); tuplesort_performsort(btspool->sortstate);
_bt_load(btspool->index, btspool); if (btspool2)
tuplesort_performsort(btspool2->sortstate);
_bt_load(btspool->index, btspool, btspool2);
} }
...@@ -523,29 +525,107 @@ _bt_uppershutdown(Relation index, BTPageState *state) ...@@ -523,29 +525,107 @@ _bt_uppershutdown(Relation index, BTPageState *state)
* btree leaves. * btree leaves.
*/ */
static void static void
_bt_load(Relation index, BTSpool *btspool) _bt_load(Relation index, BTSpool *btspool, BTSpool *btspool2)
{ {
BTPageState *state = NULL; BTPageState *state = NULL;
bool merge = (btspool2 != NULL);
BTItem bti, bti2 = NULL;
bool should_free, should_free2, load1;
TupleDesc tupdes = RelationGetDescr(index);
int i, keysz = RelationGetNumberOfAttributes(index);
ScanKey indexScanKey = NULL;
if (merge)
{
/*
* Another BTSpool for dead tuples exists.
* Now we have to merge btspool and btspool2.
*/
ScanKey entry;
Datum attrDatum1, attrDatum2;
bool isFirstNull, isSecondNull;
int32 compare;
/* the preparation of merge */
bti = (BTItem) tuplesort_getindextuple(btspool->sortstate, true, &should_free);
bti2 = (BTItem) tuplesort_getindextuple(btspool2->sortstate, true, &should_free2);
indexScanKey = _bt_mkscankey_nodata(index);
for (;;) for (;;)
{ {
BTItem bti; load1 = true; /* load BTSpool next ? */
bool should_free; if (NULL == bti2)
{
if (NULL == bti)
break;
}
else if (NULL != bti)
{
bti = (BTItem) tuplesort_getindextuple(btspool->sortstate, true, for (i = 1; i <= keysz; i++)
&should_free); {
if (bti == (BTItem) NULL) entry = indexScanKey + i - 1;
attrDatum1 = index_getattr((IndexTuple)bti, i, tupdes, &isFirstNull);
attrDatum2 = index_getattr((IndexTuple)bti2, i, tupdes, &isSecondNull);
if (isFirstNull)
{
if (!isSecondNull)
{
load1 = false;
break;
}
}
else if (isSecondNull)
break;
else
{
compare = DatumGetInt32(FunctionCall2(&entry->sk_func, attrDatum1, attrDatum2));
if (compare > 0)
{
load1 = false;
break;
}
else if (compare < 0)
break; break;
}
}
}
else
load1 = false;
/* When we see first tuple, create first index page */ /* When we see first tuple, create first index page */
if (state == NULL) if (state == NULL)
state = _bt_pagestate(index, BTP_LEAF, 0); state = _bt_pagestate(index, BTP_LEAF, 0);
if (load1)
{
_bt_buildadd(index, state, bti); _bt_buildadd(index, state, bti);
if (should_free)
pfree((void *) bti);
bti = (BTItem) tuplesort_getindextuple(btspool->sortstate, true, &should_free);
}
else
{
_bt_buildadd(index, state, bti2);
if (should_free2)
pfree((void *) bti2);
bti2 = (BTItem) tuplesort_getindextuple(btspool2->sortstate, true, &should_free2);
}
}
_bt_freeskey(indexScanKey);
}
else /* merge is unnecessary */
{
while (bti = (BTItem) tuplesort_getindextuple(btspool->sortstate, true, &should_free), bti != (BTItem) NULL)
{
/* When we see first tuple, create first index page */
if (state == NULL)
state = _bt_pagestate(index, BTP_LEAF, 0);
_bt_buildadd(index, state, bti);
if (should_free) if (should_free)
pfree((void *) bti); pfree((void *) bti);
} }
}
/* Close down final pages, if we had any data at all */ /* Close down final pages, if we had any data at all */
if (state != NULL) if (state != NULL)
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $Id: nbtree.h,v 1.40 2000/07/25 04:47:57 tgl Exp $ * $Id: nbtree.h,v 1.41 2000/08/10 02:33:19 inoue Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -289,6 +289,6 @@ typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */ ...@@ -289,6 +289,6 @@ typedef struct BTSpool BTSpool; /* opaque type known only within nbtsort.c */
extern BTSpool *_bt_spoolinit(Relation index, bool isunique); extern BTSpool *_bt_spoolinit(Relation index, bool isunique);
extern void _bt_spooldestroy(BTSpool *btspool); extern void _bt_spooldestroy(BTSpool *btspool);
extern void _bt_spool(BTItem btitem, BTSpool *btspool); extern void _bt_spool(BTItem btitem, BTSpool *btspool);
extern void _bt_leafbuild(BTSpool *btspool); extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
#endif /* NBTREE_H */ #endif /* NBTREE_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment