Commit 89395bfa authored by Tom Lane

Improve gist XLOG code to follow the coding rules needed to prevent
torn-page problems.  This introduces some issues of its own, mainly
that there are now some critical sections of unreasonably broad scope,
but it's a step forward anyway.  Further cleanup will require some
code refactoring that I'd prefer to get Oleg and Teodor involved in.
parent 4243f238
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.129 2006/03/05 15:58:20 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gist.c,v 1.130 2006/03/30 23:03:09 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -90,6 +90,7 @@ gistbuild(PG_FUNCTION_ARGS)
double reltuples;
GISTBuildState buildstate;
Buffer buffer;
Page page;
/*
* We expect to be called exactly once for any index relation. If that's
......@@ -104,33 +105,33 @@ gistbuild(PG_FUNCTION_ARGS)
/* initialize the root page */
buffer = gistNewBuffer(index);
Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
page = BufferGetPage(buffer);
START_CRIT_SECTION();
GISTInitBuffer(buffer, F_LEAF);
if (!index->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData rdata;
Page page;
rdata.buffer = InvalidBuffer;
rdata.data = (char *) &(index->rd_node);
rdata.len = sizeof(RelFileNode);
rdata.buffer = InvalidBuffer;
rdata.next = NULL;
page = BufferGetPage(buffer);
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX, &rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
}
else
PageSetLSN(BufferGetPage(buffer), XLogRecPtrForTemp);
PageSetLSN(page, XLogRecPtrForTemp);
LockBuffer(buffer, GIST_UNLOCK);
WriteBuffer(buffer);
END_CRIT_SECTION();
/* build the index */
buildstate.numindexattrs = indexInfo->ii_NumIndexAttrs;
buildstate.indtuples = 0;
......@@ -305,6 +306,15 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
bool is_splitted = false;
bool is_leaf = (GistPageIsLeaf(state->stack->page)) ? true : false;
/*
* XXX this code really ought to work by locking, but not modifying,
* all the buffers it needs; then starting a critical section; then
* modifying the buffers in an already-determined way and writing an
* XLOG record to reflect that. Since it doesn't, we've got to put
* a critical section around the entire process, which is horrible
* from a robustness point of view.
*/
START_CRIT_SECTION();
if (!is_leaf)
......@@ -312,6 +322,11 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
* This node's key has been modified, either because a child split
* occurred or because we needed to adjust our key for an insert in a
* child node. Therefore, remove the old version of this node's key.
*
* Note: for WAL replay, in the non-split case we handle this by
* setting up a one-element todelete array; in the split case, it's
* handled implicitly because the tuple vector passed to gistSplit
* won't include this tuple.
*/
PageIndexTupleDelete(state->stack->page, state->stack->childoffnum);
......@@ -336,9 +351,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
XLogRecData *rdata;
rdata = formSplitRdata(state->r->rd_node, state->stack->blkno,
&(state->key), dist);
START_CRIT_SECTION();
is_leaf, &(state->key), dist);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
ptr = dist;
......@@ -348,8 +361,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
PageSetTLI(BufferGetPage(ptr->buffer), ThisTimeLineID);
ptr = ptr->next;
}
END_CRIT_SECTION();
}
else
{
......@@ -410,7 +421,6 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
else
ourpage = dist;
/* now gets all needed data, and sets nsn's */
page = (Page) BufferGetPage(ourpage->buffer);
opaque = GistPageGetOpaque(page);
......@@ -437,9 +447,12 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
WriteBuffer(ptr->buffer);
ptr = ptr->next;
}
}
WriteNoReleaseBuffer(state->stack->buffer);
}
END_CRIT_SECTION();
}
else
{
/* enough space */
......@@ -451,7 +464,7 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
if (!state->r->rd_istemp)
{
OffsetNumber noffs = 0,
offs[MAXALIGN(sizeof(OffsetNumber)) / sizeof(OffsetNumber)];
offs[1];
XLogRecPtr recptr;
XLogRecData *rdata;
......@@ -462,17 +475,14 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
noffs = 1;
}
rdata = formUpdateRdata(state->r->rd_node, state->stack->blkno,
offs, noffs, false, state->itup, state->ituplen,
rdata = formUpdateRdata(state->r->rd_node, state->stack->buffer,
offs, noffs, false,
state->itup, state->ituplen,
&(state->key));
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(state->stack->page, recptr);
PageSetTLI(state->stack->page, ThisTimeLineID);
END_CRIT_SECTION();
}
else
PageSetLSN(state->stack->page, XLogRecPtrForTemp);
......@@ -481,6 +491,8 @@ gistplacetopage(GISTInsertState *state, GISTSTATE *giststate)
state->needInsertComplete = false;
WriteNoReleaseBuffer(state->stack->buffer);
END_CRIT_SECTION();
if (!is_leaf) /* small optimization: inform scan ablout
* deleting... */
gistadjscans(state->r, GISTOP_DEL, state->stack->blkno,
......@@ -636,30 +648,14 @@ gistfindleaf(GISTInsertState *state, GISTSTATE *giststate)
}
/*
* Should have the same interface as XLogReadBuffer
*/
static Buffer
gistReadAndLockBuffer(Relation r, BlockNumber blkno)
{
Buffer buffer = ReadBuffer(r, blkno);
LockBuffer(buffer, GIST_SHARE);
return buffer;
}
/*
* Traverse the tree to find path from root page.
* Traverse the tree to find path from root page to specified "child" block.
*
* returns from the beginning of closest parent;
*
* Function is used in both regular and recovery mode, so must work with
* different read functions (gistReadAndLockBuffer and XLogReadBuffer)
*
* To prevent deadlocks, this should lock only one page simultaneously.
*/
GISTInsertStack *
gistFindPath(Relation r, BlockNumber child,
Buffer (*myReadBuffer) (Relation, BlockNumber))
gistFindPath(Relation r, BlockNumber child)
{
Page page;
Buffer buffer;
......@@ -677,7 +673,8 @@ gistFindPath(Relation r, BlockNumber child,
while (top && top->blkno != child)
{
buffer = myReadBuffer(r, top->blkno); /* locks buffer */
buffer = ReadBuffer(r, top->blkno);
LockBuffer(buffer, GIST_SHARE);
gistcheckpage(r, buffer);
page = (Page) BufferGetPage(buffer);
......@@ -833,7 +830,7 @@ gistFindCorrectParent(Relation r, GISTInsertStack *child)
}
/* ok, find new path */
ptr = parent = gistFindPath(r, child->blkno, gistReadAndLockBuffer);
ptr = parent = gistFindPath(r, child->blkno);
Assert(ptr != NULL);
/* read all buffers as expected by caller */
......@@ -1192,27 +1189,31 @@ gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, ItemPointer ke
Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
page = BufferGetPage(buffer);
GISTInitBuffer(buffer, 0);
START_CRIT_SECTION();
GISTInitBuffer(buffer, 0); /* XXX not F_LEAF? */
gistfillbuffer(r, page, itup, len, FirstOffsetNumber);
if (!r->rd_istemp)
{
XLogRecPtr recptr;
XLogRecData *rdata;
rdata = formUpdateRdata(r->rd_node, GIST_ROOT_BLKNO,
NULL, 0, false, itup, len, key);
START_CRIT_SECTION();
rdata = formUpdateRdata(r->rd_node, buffer,
NULL, 0, false,
itup, len, key);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_NEW_ROOT, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
}
else
PageSetLSN(page, XLogRecPtrForTemp);
WriteNoReleaseBuffer(buffer);
END_CRIT_SECTION();
}
void
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.16 2006/03/05 15:58:20 momjian Exp $
* $PostgreSQL: pgsql/src/backend/access/gist/gistvacuum.c,v 1.17 2006/03/30 23:03:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -80,6 +80,12 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
page = (Page) BufferGetPage(buffer);
maxoff = PageGetMaxOffsetNumber(page);
/*
* XXX need to reduce scope of changes to page so we can make this
* critical section less extensive
*/
START_CRIT_SECTION();
if (GistPageIsLeaf(page))
{
if (GistTuplesDeleted(page))
......@@ -188,11 +194,9 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
ItemPointerSet(&key, blkno, TUPLE_IS_VALID);
rdata = formSplitRdata(gv->index->rd_node, blkno,
&key, dist);
false, &key, dist);
xlinfo = rdata->data;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_SPLIT, rdata);
ptr = dist;
while (ptr)
......@@ -202,7 +206,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
ptr = ptr->next;
}
END_CRIT_SECTION();
pfree(xlinfo);
pfree(rdata);
}
......@@ -235,8 +238,6 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
oldCtx = MemoryContextSwitchTo(gv->opCtx);
gistnewroot(gv->index, buffer, res.itup, res.ituplen, &key);
MemoryContextSwitchTo(oldCtx);
WriteNoReleaseBuffer(buffer);
}
needwrite = false;
......@@ -302,15 +303,14 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
XLogRecPtr recptr;
char *xlinfo;
rdata = formUpdateRdata(gv->index->rd_node, blkno, todelete, ntodelete,
res.emptypage, addon, curlenaddon, NULL);
rdata = formUpdateRdata(gv->index->rd_node, buffer,
todelete, ntodelete, res.emptypage,
addon, curlenaddon, NULL);
xlinfo = rdata->data;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
pfree(xlinfo);
pfree(rdata);
......@@ -322,6 +322,8 @@ gistVacuumUpdate(GistVacuum *gv, BlockNumber blkno, bool needunion)
else
ReleaseBuffer(buffer);
END_CRIT_SECTION();
if (ncompleted && !gv->index->rd_istemp)
gistxlogInsertCompletion(gv->index->rd_node, completed, ncompleted);
......@@ -579,6 +581,17 @@ gistbulkdelete(PG_FUNCTION_ARGS)
*/
pushStackIfSplited(page, stack);
/*
* Remove deletable tuples from page
*
* XXX try to make this critical section shorter. Could do it
* by separating the callback loop from the actual tuple deletion,
* but that would affect the definition of the todelete[] array
* passed into the WAL record (because the indexes would all be
* pre-deletion).
*/
START_CRIT_SECTION();
maxoff = PageGetMaxOffsetNumber(page);
for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
......@@ -608,17 +621,17 @@ gistbulkdelete(PG_FUNCTION_ARGS)
{
XLogRecData *rdata;
XLogRecPtr recptr;
gistxlogEntryUpdate *xlinfo;
gistxlogPageUpdate *xlinfo;
rdata = formUpdateRdata(rel->rd_node, stack->blkno, todelete, ntodelete,
false, NULL, 0, NULL);
xlinfo = (gistxlogEntryUpdate *) rdata->data;
rdata = formUpdateRdata(rel->rd_node, buffer,
todelete, ntodelete, false,
NULL, 0,
NULL);
xlinfo = (gistxlogPageUpdate *) rdata->data;
START_CRIT_SECTION();
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_ENTRY_UPDATE, rdata);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_PAGE_UPDATE, rdata);
PageSetLSN(page, recptr);
PageSetTLI(page, ThisTimeLineID);
END_CRIT_SECTION();
pfree(xlinfo);
pfree(rdata);
......@@ -627,6 +640,8 @@ gistbulkdelete(PG_FUNCTION_ARGS)
PageSetLSN(page, XLogRecPtrForTemp);
WriteNoReleaseBuffer(buffer);
}
END_CRIT_SECTION();
}
else
{
......
This diff is collapsed.
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.11 2006/03/24 04:32:13 tgl Exp $
* $PostgreSQL: pgsql/src/include/access/gist_private.h,v 1.12 2006/03/30 23:03:10 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -80,11 +80,13 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
/* XLog stuff */
extern const XLogRecPtr XLogRecPtrForTemp;
#define XLOG_GIST_ENTRY_UPDATE 0x00
#define XLOG_GIST_ENTRY_DELETE 0x10
#define XLOG_GIST_PAGE_UPDATE 0x00
#define XLOG_GIST_NEW_ROOT 0x20
#define XLOG_GIST_PAGE_SPLIT 0x30
#define XLOG_GIST_INSERT_COMPLETE 0x40
#define XLOG_GIST_CREATE_INDEX 0x50
typedef struct gistxlogEntryUpdate
typedef struct gistxlogPageUpdate
{
RelFileNode node;
BlockNumber blkno;
......@@ -100,17 +102,16 @@ typedef struct gistxlogEntryUpdate
/*
* follow: 1. todelete OffsetNumbers 2. tuples to insert
*/
} gistxlogEntryUpdate;
#define XLOG_GIST_PAGE_SPLIT 0x30
} gistxlogPageUpdate;
typedef struct gistxlogPageSplit
{
RelFileNode node;
BlockNumber origblkno; /* splitted page */
bool origleaf; /* was splitted page a leaf page? */
uint16 npage;
/* see comments on gistxlogEntryUpdate */
/* see comments on gistxlogPageUpdate */
ItemPointerData key;
/*
......@@ -118,22 +119,19 @@ typedef struct gistxlogPageSplit
*/
} gistxlogPageSplit;
#define XLOG_GIST_INSERT_COMPLETE 0x40
typedef struct gistxlogPage
{
BlockNumber blkno;
int num;
int num; /* number of index tuples following */
} gistxlogPage;
#define XLOG_GIST_CREATE_INDEX 0x50
typedef struct gistxlogInsertComplete
{
RelFileNode node;
/* follows ItemPointerData key to clean */
} gistxlogInsertComplete;
/* SplitedPageLayout - gistSplit function result */
typedef struct SplitedPageLayout
{
......@@ -239,8 +237,7 @@ extern void gistnewroot(Relation r, Buffer buffer, IndexTuple *itup, int len, It
extern IndexTuple *gistSplit(Relation r, Buffer buffer, IndexTuple *itup,
int *len, SplitedPageLayout **dist, GISTSTATE *giststate);
extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child,
Buffer (*myReadBuffer) (Relation, BlockNumber));
extern GISTInsertStack *gistFindPath(Relation r, BlockNumber child);
/* gistxlog.c */
extern void gist_redo(XLogRecPtr lsn, XLogRecord *record);
......@@ -249,11 +246,12 @@ extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void);
extern IndexTuple gist_form_invalid_tuple(BlockNumber blkno);
extern XLogRecData *formUpdateRdata(RelFileNode node, BlockNumber blkno,
extern XLogRecData *formUpdateRdata(RelFileNode node, Buffer buffer,
OffsetNumber *todelete, int ntodelete, bool emptypage,
IndexTuple *itup, int ituplen, ItemPointer key);
extern XLogRecData *formSplitRdata(RelFileNode node, BlockNumber blkno,
extern XLogRecData *formSplitRdata(RelFileNode node,
BlockNumber blkno, bool page_is_leaf,
ItemPointer key, SplitedPageLayout *dist);
extern XLogRecPtr gistxlogInsertCompletion(RelFileNode node, ItemPointerData *keys, int len);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment