Commit 9155580f authored by Heikki Linnakangas's avatar Heikki Linnakangas

Generate less WAL during GiST, GIN and SP-GiST index build.

Instead of WAL-logging every modification during the build separately,
first build the index without any WAL-logging, and make a separate pass
through the index at the end, to write all pages to the WAL. This
significantly reduces the amount of WAL generated, and is usually also
faster, despite the extra I/O needed for the extra scan through the index.
WAL generated this way is also faster to replay.

For GiST, the LSN-NSN interlock makes this a little tricky. All pages must
be marked with a valid (i.e. non-zero) LSN, so that the parent-child
LSN-NSN interlock works correctly. We now use magic value 1 for that during
index build. Change the fake LSN counter to begin from 1000, so that 1 is
safely smaller than any real or fake LSN. 2 would've been enough for our
purposes, but let's reserve a bigger range, in case we need more special
values in the future.

Author: Anastasia Lubennikova, Andrey V. Lepikhov
Reviewed-by: Heikki Linnakangas, Dmitry Dolgov
parent 5f768045
......@@ -396,7 +396,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
/* It will fit, perform the insertion */
START_CRIT_SECTION();
if (RelationNeedsWAL(btree->index))
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogBeginInsert();
XLogRegisterBuffer(0, stack->buffer, REGBUF_STANDARD);
......@@ -417,7 +417,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
MarkBufferDirty(childbuf);
}
if (RelationNeedsWAL(btree->index))
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogRecPtr recptr;
ginxlogInsert xlrec;
......@@ -595,7 +595,7 @@ ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
}
/* write WAL record */
if (RelationNeedsWAL(btree->index))
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogRecPtr recptr;
......
......@@ -593,7 +593,7 @@ dataBeginPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
* Great, all the items fit on a single page. If needed, prepare data
* for a WAL record describing the changes we'll make.
*/
if (RelationNeedsWAL(btree->index))
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
computeLeafRecompressWALData(leaf);
/*
......@@ -719,7 +719,7 @@ dataExecPlaceToPageLeaf(GinBtree btree, Buffer buf, GinBtreeStack *stack,
dataPlaceToPageLeafRecompress(buf, leaf);
/* If needed, register WAL data built by computeLeafRecompressWALData */
if (RelationNeedsWAL(btree->index))
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
XLogRegisterBufData(0, leaf->walinfo, leaf->walinfolen);
}
......@@ -1152,7 +1152,7 @@ dataExecPlaceToPageInternal(GinBtree btree, Buffer buf, GinBtreeStack *stack,
pitem = (PostingItem *) insertdata;
GinDataPageAddPostingItem(page, pitem, off);
if (RelationNeedsWAL(btree->index))
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
/*
* This must be static, because it has to survive until XLogInsert,
......@@ -1773,6 +1773,7 @@ createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
Pointer ptr;
int nrootitems;
int rootsize;
bool is_build = (buildStats != NULL);
/* Construct the new root page in memory first. */
tmppage = (Page) palloc(BLCKSZ);
......@@ -1826,7 +1827,7 @@ createPostingTree(Relation index, ItemPointerData *items, uint32 nitems,
PageRestoreTempPage(tmppage, page);
MarkBufferDirty(buffer);
if (RelationNeedsWAL(index))
if (RelationNeedsWAL(index) && !is_build)
{
XLogRecPtr recptr;
ginxlogCreatePostingTree data;
......
......@@ -571,7 +571,7 @@ entryExecPlaceToPage(GinBtree btree, Buffer buf, GinBtreeStack *stack,
elog(ERROR, "failed to add item to index page in \"%s\"",
RelationGetRelationName(btree->index));
if (RelationNeedsWAL(btree->index))
if (RelationNeedsWAL(btree->index) && !btree->isBuild)
{
/*
* This must be static, because it has to survive until XLogInsert,
......
......@@ -195,6 +195,7 @@ ginEntryInsert(GinState *ginstate,
buildStats->nEntries++;
ginPrepareEntryScan(&btree, attnum, key, category, ginstate);
btree.isBuild = (buildStats != NULL);
stack = ginFindLeafPage(&btree, false, false, NULL);
page = BufferGetPage(stack->buffer);
......@@ -347,23 +348,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
GinInitBuffer(RootBuffer, GIN_LEAF);
MarkBufferDirty(RootBuffer);
if (RelationNeedsWAL(index))
{
XLogRecPtr recptr;
Page page;
XLogBeginInsert();
XLogRegisterBuffer(0, MetaBuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
XLogRegisterBuffer(1, RootBuffer, REGBUF_WILL_INIT);
recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_CREATE_INDEX);
page = BufferGetPage(RootBuffer);
PageSetLSN(page, recptr);
page = BufferGetPage(MetaBuffer);
PageSetLSN(page, recptr);
}
UnlockReleaseBuffer(MetaBuffer);
UnlockReleaseBuffer(RootBuffer);
......@@ -419,7 +403,18 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
* Update metapage stats
*/
buildstate.buildStats.nTotalPages = RelationGetNumberOfBlocks(index);
ginUpdateStats(index, &buildstate.buildStats);
ginUpdateStats(index, &buildstate.buildStats, true);
/*
* We didn't write WAL records as we built the index, so if WAL-logging is
* required, write all pages to the WAL now.
*/
if (RelationNeedsWAL(index))
{
log_newpage_range(index, MAIN_FORKNUM,
0, RelationGetNumberOfBlocks(index),
true);
}
/*
* Return statistics
......
......@@ -662,7 +662,7 @@ ginGetStats(Relation index, GinStatsData *stats)
* Note: nPendingPages and ginVersion are *not* copied over
*/
void
ginUpdateStats(Relation index, const GinStatsData *stats)
ginUpdateStats(Relation index, const GinStatsData *stats, bool is_build)
{
Buffer metabuffer;
Page metapage;
......@@ -692,7 +692,7 @@ ginUpdateStats(Relation index, const GinStatsData *stats)
MarkBufferDirty(metabuffer);
if (RelationNeedsWAL(index))
if (RelationNeedsWAL(index) && !is_build)
{
XLogRecPtr recptr;
ginxlogUpdateMeta data;
......
......@@ -759,7 +759,7 @@ ginvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
/* Update the metapage with accurate page and entry counts */
idxStat.nTotalPages = npages;
ginUpdateStats(info->index, &idxStat);
ginUpdateStats(info->index, &idxStat, false);
/* Finally, vacuum the FSM */
IndexFreeSpaceMapVacuum(info->index);
......
......@@ -40,36 +40,6 @@ ginRedoClearIncompleteSplit(XLogReaderState *record, uint8 block_id)
UnlockReleaseBuffer(buffer);
}
static void
ginRedoCreateIndex(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
Buffer RootBuffer,
MetaBuffer;
Page page;
MetaBuffer = XLogInitBufferForRedo(record, 0);
Assert(BufferGetBlockNumber(MetaBuffer) == GIN_METAPAGE_BLKNO);
page = (Page) BufferGetPage(MetaBuffer);
GinInitMetabuffer(MetaBuffer);
PageSetLSN(page, lsn);
MarkBufferDirty(MetaBuffer);
RootBuffer = XLogInitBufferForRedo(record, 1);
Assert(BufferGetBlockNumber(RootBuffer) == GIN_ROOT_BLKNO);
page = (Page) BufferGetPage(RootBuffer);
GinInitBuffer(RootBuffer, GIN_LEAF);
PageSetLSN(page, lsn);
MarkBufferDirty(RootBuffer);
UnlockReleaseBuffer(RootBuffer);
UnlockReleaseBuffer(MetaBuffer);
}
static void
ginRedoCreatePTree(XLogReaderState *record)
{
......@@ -767,9 +737,6 @@ gin_redo(XLogReaderState *record)
oldCtx = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_GIN_CREATE_INDEX:
ginRedoCreateIndex(record);
break;
case XLOG_GIN_CREATE_PTREE:
ginRedoCreatePTree(record);
break;
......
......@@ -173,7 +173,7 @@ gistinsert(Relation r, Datum *values, bool *isnull,
values, isnull, true /* size is currently bogus */ );
itup->t_tid = *ht_ctid;
gistdoinsert(r, itup, 0, giststate, heapRel);
gistdoinsert(r, itup, 0, giststate, heapRel, false);
/* cleanup */
MemoryContextSwitchTo(oldCxt);
......@@ -220,7 +220,8 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
Buffer leftchildbuf,
List **splitinfo,
bool markfollowright,
Relation heapRel)
Relation heapRel,
bool is_build)
{
BlockNumber blkno = BufferGetBlockNumber(buffer);
Page page = BufferGetPage(buffer);
......@@ -459,7 +460,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
* insertion for that. NB: The number of pages and data segments
* specified here must match the calculations in gistXLogSplit()!
*/
if (RelationNeedsWAL(rel))
if (!is_build && RelationNeedsWAL(rel))
XLogEnsureRecordSpace(npage, 1 + npage * 2);
START_CRIT_SECTION();
......@@ -480,18 +481,30 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
PageRestoreTempPage(dist->page, BufferGetPage(dist->buffer));
dist->page = BufferGetPage(dist->buffer);
/* Write the WAL record */
/*
* Write the WAL record.
*
* If we're building a new index, however, we don't WAL-log changes
* yet. The LSN-NSN interlock between parent and child requires that
* LSNs never move backwards, so set the LSNs to a value that's
* smaller than any real or fake unlogged LSN that might be generated
* later. (There can't be any concurrent scans during index build, so
* we don't need to be able to detect concurrent splits yet.)
*/
if (is_build)
recptr = GistBuildLSN;
else
{
if (RelationNeedsWAL(rel))
recptr = gistXLogSplit(is_leaf,
dist, oldrlink, oldnsn, leftchildbuf,
markfollowright);
else
recptr = gistGetFakeLSN(rel);
}
for (ptr = dist; ptr; ptr = ptr->next)
{
PageSetLSN(ptr->page, recptr);
}
/*
* Return the new child buffers to the caller.
......@@ -545,6 +558,10 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
if (BufferIsValid(leftchildbuf))
MarkBufferDirty(leftchildbuf);
if (is_build)
recptr = GistBuildLSN;
else
{
if (RelationNeedsWAL(rel))
{
OffsetNumber ndeloffs = 0,
......@@ -559,14 +576,11 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
recptr = gistXLogUpdate(buffer,
deloffs, ndeloffs, itup, ntup,
leftchildbuf);
PageSetLSN(page, recptr);
}
else
{
recptr = gistGetFakeLSN(rel);
PageSetLSN(page, recptr);
}
PageSetLSN(page, recptr);
if (newblkno)
*newblkno = blkno;
......@@ -607,7 +621,7 @@ gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
*/
void
gistdoinsert(Relation r, IndexTuple itup, Size freespace,
GISTSTATE *giststate, Relation heapRel)
GISTSTATE *giststate, Relation heapRel, bool is_build)
{
ItemId iid;
IndexTuple idxtuple;
......@@ -620,6 +634,7 @@ gistdoinsert(Relation r, IndexTuple itup, Size freespace,
state.freespace = freespace;
state.r = r;
state.heapRel = heapRel;
state.is_build = is_build;
/* Start from the root */
firststack.blkno = GIST_ROOT_BLKNO;
......@@ -1252,7 +1267,8 @@ gistinserttuples(GISTInsertState *state, GISTInsertStack *stack,
leftchild,
&splitinfo,
true,
state->heapRel);
state->heapRel,
state->is_build);
/*
* Before recursing up in case the page was split, release locks on the
......
......@@ -180,19 +180,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
GISTInitBuffer(buffer, F_LEAF);
MarkBufferDirty(buffer);
if (RelationNeedsWAL(index))
{
XLogRecPtr recptr;
XLogBeginInsert();
XLogRegisterBuffer(0, buffer, REGBUF_WILL_INIT);
recptr = XLogInsert(RM_GIST_ID, XLOG_GIST_CREATE_INDEX);
PageSetLSN(page, recptr);
}
else
PageSetLSN(page, gistGetFakeLSN(heap));
PageSetLSN(page, GistBuildLSN);
UnlockReleaseBuffer(buffer);
......@@ -226,6 +214,17 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
freeGISTstate(buildstate.giststate);
/*
* We didn't write WAL records as we built the index, so if WAL-logging is
* required, write all pages to the WAL now.
*/
if (RelationNeedsWAL(index))
{
log_newpage_range(index, MAIN_FORKNUM,
0, RelationGetNumberOfBlocks(index),
true);
}
/*
* Return statistics
*/
......@@ -488,7 +487,7 @@ gistBuildCallback(Relation index,
* locked, we call gistdoinsert directly.
*/
gistdoinsert(index, itup, buildstate->freespace,
buildstate->giststate, buildstate->heaprel);
buildstate->giststate, buildstate->heaprel, true);
}
/* Update tuple count and total size. */
......@@ -695,7 +694,7 @@ gistbufferinginserttuples(GISTBuildState *buildstate, Buffer buffer, int level,
InvalidBuffer,
&splitinfo,
false,
buildstate->heaprel);
buildstate->heaprel, true);
/*
* If this is a root split, update the root path item kept in memory. This
......
......@@ -1008,7 +1008,7 @@ gistproperty(Oid index_oid, int attno,
XLogRecPtr
gistGetFakeLSN(Relation rel)
{
static XLogRecPtr counter = 1;
static XLogRecPtr counter = FirstNormalUnloggedLSN;
if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
{
......
......@@ -490,25 +490,6 @@ gistRedoPageSplitRecord(XLogReaderState *record)
UnlockReleaseBuffer(firstbuffer);
}
static void
gistRedoCreateIndex(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
Buffer buffer;
Page page;
buffer = XLogInitBufferForRedo(record, 0);
Assert(BufferGetBlockNumber(buffer) == GIST_ROOT_BLKNO);
page = (Page) BufferGetPage(buffer);
GISTInitBuffer(buffer, F_LEAF);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
/* redo page deletion */
static void
gistRedoPageDelete(XLogReaderState *record)
......@@ -594,9 +575,6 @@ gist_redo(XLogReaderState *record)
case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(record);
break;
case XLOG_GIST_CREATE_INDEX:
gistRedoCreateIndex(record);
break;
case XLOG_GIST_PAGE_DELETE:
gistRedoPageDelete(record);
break;
......
......@@ -78,9 +78,6 @@ gin_desc(StringInfo buf, XLogReaderState *record)
switch (info)
{
case XLOG_GIN_CREATE_INDEX:
/* no further information */
break;
case XLOG_GIN_CREATE_PTREE:
/* no further information */
break;
......@@ -188,9 +185,6 @@ gin_identify(uint8 info)
switch (info & ~XLR_INFO_MASK)
{
case XLOG_GIN_CREATE_INDEX:
id = "CREATE_INDEX";
break;
case XLOG_GIN_CREATE_PTREE:
id = "CREATE_PTREE";
break;
......
......@@ -71,8 +71,6 @@ gist_desc(StringInfo buf, XLogReaderState *record)
case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
break;
case XLOG_GIST_CREATE_INDEX:
break;
case XLOG_GIST_PAGE_DELETE:
out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
break;
......@@ -98,9 +96,6 @@ gist_identify(uint8 info)
case XLOG_GIST_PAGE_SPLIT:
id = "PAGE_SPLIT";
break;
case XLOG_GIST_CREATE_INDEX:
id = "CREATE_INDEX";
break;
case XLOG_GIST_PAGE_DELETE:
id = "PAGE_DELETE";
break;
......
......@@ -24,8 +24,6 @@ spg_desc(StringInfo buf, XLogReaderState *record)
switch (info)
{
case XLOG_SPGIST_CREATE_INDEX:
break;
case XLOG_SPGIST_ADD_LEAF:
{
spgxlogAddLeaf *xlrec = (spgxlogAddLeaf *) rec;
......@@ -88,9 +86,6 @@ spg_identify(uint8 info)
switch (info & ~XLR_INFO_MASK)
{
case XLOG_SPGIST_CREATE_INDEX:
id = "CREATE_INDEX";
break;
case XLOG_SPGIST_ADD_LEAF:
id = "ADD_LEAF";
break;
......
......@@ -289,7 +289,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
MarkBufferDirty(current->buffer);
if (RelationNeedsWAL(index))
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
int flags;
......@@ -516,7 +516,7 @@ moveLeafs(Relation index, SpGistState *state,
MarkBufferDirty(current->buffer);
MarkBufferDirty(nbuf);
if (RelationNeedsWAL(index))
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
......@@ -1334,7 +1334,7 @@ doPickSplit(Relation index, SpGistState *state,
saveCurrent.buffer = InvalidBuffer;
}
if (RelationNeedsWAL(index))
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
int flags;
......@@ -1531,7 +1531,7 @@ spgAddNodeAction(Relation index, SpGistState *state,
MarkBufferDirty(current->buffer);
if (RelationNeedsWAL(index))
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
......@@ -1644,7 +1644,7 @@ spgAddNodeAction(Relation index, SpGistState *state,
MarkBufferDirty(saveCurrent.buffer);
if (RelationNeedsWAL(index))
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
int flags;
......@@ -1840,7 +1840,7 @@ spgSplitNodeAction(Relation index, SpGistState *state,
MarkBufferDirty(current->buffer);
if (RelationNeedsWAL(index))
if (RelationNeedsWAL(index) && !state->isBuild)
{
XLogRecPtr recptr;
......
......@@ -105,26 +105,6 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS);
MarkBufferDirty(nullbuffer);
if (RelationNeedsWAL(index))
{
XLogRecPtr recptr;
XLogBeginInsert();
/*
* Replay will re-initialize the pages, so don't take full pages
* images. No other data to log.
*/
XLogRegisterBuffer(0, metabuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
XLogRegisterBuffer(1, rootbuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
XLogRegisterBuffer(2, nullbuffer, REGBUF_WILL_INIT | REGBUF_STANDARD);
recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_CREATE_INDEX);
PageSetLSN(BufferGetPage(metabuffer), recptr);
PageSetLSN(BufferGetPage(rootbuffer), recptr);
PageSetLSN(BufferGetPage(nullbuffer), recptr);
}
END_CRIT_SECTION();
......@@ -151,6 +131,17 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
SpGistUpdateMetaPage(index);
/*
* We didn't write WAL records as we built the index, so if WAL-logging is
* required, write all pages to the WAL now.
*/
if (RelationNeedsWAL(index))
{
log_newpage_range(index, MAIN_FORKNUM,
0, RelationGetNumberOfBlocks(index),
true);
}
result = (IndexBuildResult *) palloc0(sizeof(IndexBuildResult));
result->heap_tuples = reltuples;
result->index_tuples = buildstate.indtuples;
......
......@@ -72,38 +72,6 @@ addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
size);
}
static void
spgRedoCreateIndex(XLogReaderState *record)
{
XLogRecPtr lsn = record->EndRecPtr;
Buffer buffer;
Page page;
buffer = XLogInitBufferForRedo(record, 0);
Assert(BufferGetBlockNumber(buffer) == SPGIST_METAPAGE_BLKNO);
page = (Page) BufferGetPage(buffer);
SpGistInitMetapage(page);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
buffer = XLogInitBufferForRedo(record, 1);
Assert(BufferGetBlockNumber(buffer) == SPGIST_ROOT_BLKNO);
SpGistInitBuffer(buffer, SPGIST_LEAF);
page = (Page) BufferGetPage(buffer);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
buffer = XLogInitBufferForRedo(record, 2);
Assert(BufferGetBlockNumber(buffer) == SPGIST_NULL_BLKNO);
SpGistInitBuffer(buffer, SPGIST_LEAF | SPGIST_NULLS);
page = (Page) BufferGetPage(buffer);
PageSetLSN(page, lsn);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
static void
spgRedoAddLeaf(XLogReaderState *record)
{
......@@ -976,9 +944,6 @@ spg_redo(XLogReaderState *record)
oldCxt = MemoryContextSwitchTo(opCtx);
switch (info)
{
case XLOG_SPGIST_CREATE_INDEX:
spgRedoCreateIndex(record);
break;
case XLOG_SPGIST_ADD_LEAF:
spgRedoAddLeaf(record);
break;
......
......@@ -5242,7 +5242,7 @@ BootStrapXLOG(void)
ControlFile->time = checkPoint.time;
ControlFile->checkPoint = checkPoint.redo;
ControlFile->checkPointCopy = checkPoint;
ControlFile->unloggedLSN = 1;
ControlFile->unloggedLSN = FirstNormalUnloggedLSN;
/* Set important parameter values for use when replaying WAL */
ControlFile->MaxConnections = MaxConnections;
......@@ -9781,12 +9781,11 @@ xlog_redo(XLogReaderState *record)
}
else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT)
{
Buffer buffer;
/*
* Full-page image (FPI) records contain nothing else but a backup
* block. The block reference must include a full-page image -
* otherwise there would be no point in this record.
* block (or multiple backup blocks). Every block reference must
* include a full-page image - otherwise there would be no point in
* this record.
*
* No recovery conflicts are generated by these generic records - if a
* resource manager needs to generate conflicts, it has to define a
......@@ -9798,10 +9797,15 @@ xlog_redo(XLogReaderState *record)
* XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
* code just to distinguish them for statistics purposes.
*/
if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++)
{
Buffer buffer;
if (XLogReadBufferForRedo(record, block_id, &buffer) != BLK_RESTORED)
elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block");
UnlockReleaseBuffer(buffer);
}
}
else if (info == XLOG_BACKUP_END)
{
XLogRecPtr startpoint;
......
......@@ -1021,6 +1021,88 @@ log_newpage_buffer(Buffer buffer, bool page_std)
return log_newpage(&rnode, forkNum, blkno, page, page_std);
}
/*
* WAL-log a range of blocks in a relation.
*
* An image of all pages with block numbers 'startblk' <= X < 'endblock' is
* written to the WAL. If the range is large, this is done in multiple WAL
* records.
*
* If all page follows the standard page layout, with a PageHeader and unused
* space between pd_lower and pd_upper, set 'page_std' to true. That allows
* the unused space to be left out from the WAL records, making them smaller.
*
* NOTE: This function acquires exclusive-locks on the pages. Typically, this
* is used on a newly-built relation, and the caller is holding a
* AccessExclusiveLock on it, so no other backend can be accessing it at the
* same time. If that's not the case, you must ensure that this does not
* cause a deadlock through some other means.
*/
void
log_newpage_range(Relation rel, ForkNumber forkNum,
BlockNumber startblk, BlockNumber endblk,
bool page_std)
{
BlockNumber blkno;
/*
* Iterate over all the pages in the range. They are collected into
* batches of XLR_MAX_BLOCK_ID pages, and a single WAL-record is written
* for each batch.
*/
XLogEnsureRecordSpace(XLR_MAX_BLOCK_ID - 1, 0);
blkno = startblk;
while (blkno < endblk)
{
Buffer bufpack[XLR_MAX_BLOCK_ID];
XLogRecPtr recptr;
int nbufs;
int i;
CHECK_FOR_INTERRUPTS();
/* Collect a batch of blocks. */
nbufs = 0;
while (nbufs < XLR_MAX_BLOCK_ID && blkno < endblk)
{
Buffer buf = ReadBuffer(rel, blkno);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
/*
* Completely empty pages are not WAL-logged. Writing a WAL record
* would change the LSN, and we don't want that. We want the page
* to stay empty.
*/
if (!PageIsNew(BufferGetPage(buf)))
bufpack[nbufs++] = buf;
else
UnlockReleaseBuffer(buf);
blkno++;
}
/* Write WAL record for this batch. */
XLogBeginInsert();
START_CRIT_SECTION();
for (i = 0; i < nbufs; i++)
{
XLogRegisterBuffer(i, bufpack[i], REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
MarkBufferDirty(bufpack[i]);
}
recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
for (i = 0; i < nbufs; i++)
{
PageSetLSN(BufferGetPage(bufpack[i]), recptr);
UnlockReleaseBuffer(bufpack[i]);
}
END_CRIT_SECTION();
}
}
/*
* Allocate working buffers needed for WAL record construction.
*/
......
......@@ -71,6 +71,7 @@ extern int gin_pending_list_limit;
/* ginutil.c */
extern void ginGetStats(Relation index, GinStatsData *stats);
extern void ginUpdateStats(Relation index, const GinStatsData *stats);
extern void ginUpdateStats(Relation index, const GinStatsData *stats,
bool is_build);
#endif /* GIN_H */
......@@ -16,8 +16,6 @@
#include "lib/stringinfo.h"
#include "storage/off.h"
#define XLOG_GIN_CREATE_INDEX 0x00
#define XLOG_GIN_CREATE_PTREE 0x10
typedef struct ginxlogCreatePostingTree
......
......@@ -49,6 +49,13 @@
typedef XLogRecPtr GistNSN;
/*
* A bogus LSN / NSN value used during index build. Must be smaller than any
* real or fake unlogged LSN, so that after an index build finishes, all the
* splits are considered completed.
*/
#define GistBuildLSN ((XLogRecPtr) 1)
/*
* For on-disk compatibility with pre-9.3 servers, NSN is stored as two
* 32-bit fields on disk, same as LSNs.
......
......@@ -244,6 +244,7 @@ typedef struct
Relation r;
Relation heapRel;
Size freespace; /* free space to be left */
bool is_build;
GISTInsertStack *stack;
} GISTInsertState;
......@@ -393,7 +394,8 @@ extern void gistdoinsert(Relation r,
IndexTuple itup,
Size freespace,
GISTSTATE *GISTstate,
Relation heapRel);
Relation heapRel,
bool is_build);
/* A List of these is returned from gistplacetopage() in *splitinfo */
typedef struct
......@@ -409,7 +411,8 @@ extern bool gistplacetopage(Relation rel, Size freespace, GISTSTATE *giststate,
Buffer leftchildbuf,
List **splitinfo,
bool markleftchild,
Relation heapRel);
Relation heapRel,
bool is_build);
extern SplitedPageLayout *gistSplit(Relation r, Page page, IndexTuple *itup,
int len, GISTSTATE *giststate);
......
......@@ -23,7 +23,7 @@
* FSM */
#define XLOG_GIST_PAGE_SPLIT 0x30
/* #define XLOG_GIST_INSERT_COMPLETE 0x40 */ /* not used anymore */
#define XLOG_GIST_CREATE_INDEX 0x50
/* #define XLOG_GIST_CREATE_INDEX 0x50 */ /* not used anymore */
#define XLOG_GIST_PAGE_DELETE 0x60
/*
......
......@@ -18,7 +18,7 @@
#include "storage/off.h"
/* XLOG record types for SPGiST */
#define XLOG_SPGIST_CREATE_INDEX 0x00
/* #define XLOG_SPGIST_CREATE_INDEX 0x00 */ /* not used anymore */
#define XLOG_SPGIST_ADD_LEAF 0x10
#define XLOG_SPGIST_MOVE_LEAFS 0x20
#define XLOG_SPGIST_ADD_NODE 0x30
......
......@@ -28,6 +28,13 @@ typedef uint64 XLogRecPtr;
#define InvalidXLogRecPtr 0
#define XLogRecPtrIsInvalid(r) ((r) == InvalidXLogRecPtr)
/*
* First LSN to use for "fake" LSNs.
*
* Values smaller than this can be used for special per-AM purposes.
*/
#define FirstNormalUnloggedLSN ((XLogRecPtr) 1000)
/*
* XLogSegNo - physical log file sequence number.
*/
......
......@@ -16,6 +16,7 @@
#include "storage/block.h"
#include "storage/buf.h"
#include "storage/relfilenode.h"
#include "utils/relcache.h"
/*
* The minimum size of the WAL construction working area. If you need to
......@@ -54,6 +55,8 @@ extern bool XLogCheckBufferNeedsBackup(Buffer buffer);
extern XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum,
BlockNumber blk, char *page, bool page_std);
extern XLogRecPtr log_newpage_buffer(Buffer buffer, bool page_std);
extern void log_newpage_range(Relation rel, ForkNumber forkNum,
BlockNumber startblk, BlockNumber endblk, bool page_std);
extern XLogRecPtr XLogSaveBufferForHint(Buffer buffer, bool buffer_std);
extern void InitXLogInsert(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment