Commit 3bbf668d authored by Tom Lane's avatar Tom Lane

Fix multiple problems in WAL replay.

Most of the replay functions for WAL record types that modify more than
one page failed to ensure that those pages were locked correctly to ensure
that concurrent queries could not see inconsistent page states.  This is
a hangover from coding decisions made long before Hot Standby was added,
when it was hardly necessary to acquire buffer locks during WAL replay
at all, let alone hold them for carefully-chosen periods.

The key problem was that RestoreBkpBlocks was written to hold lock on each
page restored from a full-page image for only as long as it took to update
that page.  This was guaranteed to break any WAL replay function in which
there was any update-ordering constraint between pages, because even if the
nominal order of the pages is the right one, any mixture of full-page and
non-full-page updates in the same record would result in out-of-order
updates.  Moreover, it wouldn't work for situations where there's a
requirement to maintain lock on one page while updating another.  Failure
to honor an update ordering constraint in this way is thought to be the
cause of bug #7648 from Daniel Farina: what seems to have happened there
is that a btree page being split was rewritten from a full-page image
before the new right sibling page was written, and because lock on the
original page was not maintained it was possible for hot standby queries to
try to traverse the page's right-link to the not-yet-existing sibling page.

To fix, get rid of RestoreBkpBlocks as such, and instead create a new
function RestoreBackupBlock that restores just one full-page image at a
time.  This function can be invoked by WAL replay functions at the points
where they would otherwise perform non-full-page updates; in this way, the
physical order of page updates remains the same no matter which pages are
replaced by full-page images.  We can then further adjust the logic in
individual replay functions if it is necessary to hold buffer locks
for overlapping periods.  A side benefit is that we can simplify the
handling of concurrency conflict resolution by moving that code into the
record-type-specfic functions; there's no more need to contort the code
layout to keep conflict resolution in front of the RestoreBkpBlocks call.

In connection with that, standardize on zero-based numbering rather than
one-based numbering for referencing the full-page images.  In HEAD, I
removed the macros XLR_BKP_BLOCK_1 through XLR_BKP_BLOCK_4.  They are
still there in the header files in previous branches, but are no longer
used by the code.

In addition, fix some other bugs identified in the course of making these
changes:

spgRedoAddNode could fail to update the parent downlink at all, if the
parent tuple is in the same page as either the old or new split tuple and
we're not doing a full-page image: it would get fooled by the LSN having
been advanced already.  This would result in permanent index corruption,
not just transient failure of concurrent queries.

Also, ginHeapTupleFastInsert's "merge lists" case failed to mark the old
tail page as a candidate for a full-page image; in the worst case this
could result in torn-page corruption.

heap_xlog_freeze() was inconsistent about using a cleanup lock or plain
exclusive lock: it did the former in the normal path but the latter for a
full-page image.  A plain exclusive lock seems sufficient, so change to
that.

Also, remove gistRedoPageDeleteRecord(), which has been dead code since
VACUUM FULL was rewritten.

Back-patch to 9.0, where hot standby was introduced.  Note however that 9.0
had a significantly different WAL-logging scheme for GIST index updates,
and it doesn't appear possible to make that scheme safe for concurrent hot
standby queries, because it can leave inconsistent states in the index even
between WAL records.  Given the lack of complaints from the field, we won't
work too hard on fixing that branch.
parent 9b3ac49e
...@@ -290,7 +290,7 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) ...@@ -290,7 +290,7 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
if (metadata->head == InvalidBlockNumber) if (metadata->head == InvalidBlockNumber)
{ {
/* /*
* Main list is empty, so just copy sublist into main list * Main list is empty, so just insert sublist as main list
*/ */
START_CRIT_SECTION(); START_CRIT_SECTION();
...@@ -313,6 +313,14 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector) ...@@ -313,6 +313,14 @@ ginHeapTupleFastInsert(GinState *ginstate, GinTupleCollector *collector)
LockBuffer(buffer, GIN_EXCLUSIVE); LockBuffer(buffer, GIN_EXCLUSIVE);
page = BufferGetPage(buffer); page = BufferGetPage(buffer);
rdata[0].next = rdata + 1;
rdata[1].buffer = buffer;
rdata[1].buffer_std = true;
rdata[1].data = NULL;
rdata[1].len = 0;
rdata[1].next = NULL;
Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber); Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
START_CRIT_SECTION(); START_CRIT_SECTION();
......
...@@ -77,6 +77,9 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) ...@@ -77,6 +77,9 @@ ginRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
MetaBuffer; MetaBuffer;
Page page; Page page;
/* Backup blocks are not used in create_index records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true); MetaBuffer = XLogReadBuffer(*node, GIN_METAPAGE_BLKNO, true);
Assert(BufferIsValid(MetaBuffer)); Assert(BufferIsValid(MetaBuffer));
page = (Page) BufferGetPage(MetaBuffer); page = (Page) BufferGetPage(MetaBuffer);
...@@ -109,6 +112,9 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record) ...@@ -109,6 +112,9 @@ ginRedoCreatePTree(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer; Buffer buffer;
Page page; Page page;
/* Backup blocks are not used in create_ptree records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
buffer = XLogReadBuffer(data->node, data->blkno, true); buffer = XLogReadBuffer(data->node, data->blkno, true);
Assert(BufferIsValid(buffer)); Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
...@@ -159,9 +165,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record) ...@@ -159,9 +165,12 @@ ginRedoInsert(XLogRecPtr lsn, XLogRecord *record)
} }
} }
/* nothing else to do if page was backed up */ /* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK_1) if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return; return;
}
buffer = XLogReadBuffer(data->node, data->blkno, false); buffer = XLogReadBuffer(data->node, data->blkno, false);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
...@@ -256,6 +265,9 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record) ...@@ -256,6 +265,9 @@ ginRedoSplit(XLogRecPtr lsn, XLogRecord *record)
if (data->isData) if (data->isData)
flags |= GIN_DATA; flags |= GIN_DATA;
/* Backup blocks are not used in split records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
lbuffer = XLogReadBuffer(data->node, data->lblkno, true); lbuffer = XLogReadBuffer(data->node, data->lblkno, true);
Assert(BufferIsValid(lbuffer)); Assert(BufferIsValid(lbuffer));
lpage = (Page) BufferGetPage(lbuffer); lpage = (Page) BufferGetPage(lbuffer);
...@@ -369,9 +381,12 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record) ...@@ -369,9 +381,12 @@ ginRedoVacuumPage(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer; Buffer buffer;
Page page; Page page;
/* nothing to do if page was backed up (and no info to do it with) */ /* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK_1) if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return; return;
}
buffer = XLogReadBuffer(data->node, data->blkno, false); buffer = XLogReadBuffer(data->node, data->blkno, false);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
...@@ -420,33 +435,38 @@ static void ...@@ -420,33 +435,38 @@ static void
ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
{ {
ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record); ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
Buffer buffer; Buffer dbuffer;
Buffer pbuffer;
Buffer lbuffer;
Page page; Page page;
if (!(record->xl_info & XLR_BKP_BLOCK_1)) if (record->xl_info & XLR_BKP_BLOCK(0))
dbuffer = RestoreBackupBlock(lsn, record, 0, false, true);
else
{ {
buffer = XLogReadBuffer(data->node, data->blkno, false); dbuffer = XLogReadBuffer(data->node, data->blkno, false);
if (BufferIsValid(buffer)) if (BufferIsValid(dbuffer))
{ {
page = BufferGetPage(buffer); page = BufferGetPage(dbuffer);
if (!XLByteLE(lsn, PageGetLSN(page))) if (!XLByteLE(lsn, PageGetLSN(page)))
{ {
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
GinPageGetOpaque(page)->flags = GIN_DELETED; GinPageGetOpaque(page)->flags = GIN_DELETED;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(dbuffer);
} }
UnlockReleaseBuffer(buffer);
} }
} }
if (!(record->xl_info & XLR_BKP_BLOCK_2)) if (record->xl_info & XLR_BKP_BLOCK(1))
pbuffer = RestoreBackupBlock(lsn, record, 1, false, true);
else
{ {
buffer = XLogReadBuffer(data->node, data->parentBlkno, false); pbuffer = XLogReadBuffer(data->node, data->parentBlkno, false);
if (BufferIsValid(buffer)) if (BufferIsValid(pbuffer))
{ {
page = BufferGetPage(buffer); page = BufferGetPage(pbuffer);
if (!XLByteLE(lsn, PageGetLSN(page))) if (!XLByteLE(lsn, PageGetLSN(page)))
{ {
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
...@@ -454,29 +474,35 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record) ...@@ -454,29 +474,35 @@ ginRedoDeletePage(XLogRecPtr lsn, XLogRecord *record)
GinPageDeletePostingItem(page, data->parentOffset); GinPageDeletePostingItem(page, data->parentOffset);
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(pbuffer);
} }
UnlockReleaseBuffer(buffer);
} }
} }
if (!(record->xl_info & XLR_BKP_BLOCK_3) && data->leftBlkno != InvalidBlockNumber) if (record->xl_info & XLR_BKP_BLOCK(2))
(void) RestoreBackupBlock(lsn, record, 2, false, false);
else if (data->leftBlkno != InvalidBlockNumber)
{ {
buffer = XLogReadBuffer(data->node, data->leftBlkno, false); lbuffer = XLogReadBuffer(data->node, data->leftBlkno, false);
if (BufferIsValid(buffer)) if (BufferIsValid(lbuffer))
{ {
page = BufferGetPage(buffer); page = BufferGetPage(lbuffer);
if (!XLByteLE(lsn, PageGetLSN(page))) if (!XLByteLE(lsn, PageGetLSN(page)))
{ {
Assert(GinPageIsData(page)); Assert(GinPageIsData(page));
GinPageGetOpaque(page)->rightlink = data->rightLink; GinPageGetOpaque(page)->rightlink = data->rightLink;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(lbuffer);
} }
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(lbuffer);
} }
} }
if (BufferIsValid(pbuffer))
UnlockReleaseBuffer(pbuffer);
if (BufferIsValid(dbuffer))
UnlockReleaseBuffer(dbuffer);
} }
static void static void
...@@ -505,7 +531,9 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) ...@@ -505,7 +531,9 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
/* /*
* insert into tail page * insert into tail page
*/ */
if (!(record->xl_info & XLR_BKP_BLOCK_1)) if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
{ {
buffer = XLogReadBuffer(data->node, data->metadata.tail, false); buffer = XLogReadBuffer(data->node, data->metadata.tail, false);
if (BufferIsValid(buffer)) if (BufferIsValid(buffer))
...@@ -553,6 +581,10 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) ...@@ -553,6 +581,10 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
/* /*
* New tail * New tail
*/ */
if (record->xl_info & XLR_BKP_BLOCK(0))
(void) RestoreBackupBlock(lsn, record, 0, false, false);
else
{
buffer = XLogReadBuffer(data->node, data->prevTail, false); buffer = XLogReadBuffer(data->node, data->prevTail, false);
if (BufferIsValid(buffer)) if (BufferIsValid(buffer))
{ {
...@@ -569,6 +601,7 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record) ...@@ -569,6 +601,7 @@ ginRedoUpdateMetapage(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
} }
}
UnlockReleaseBuffer(metabuffer); UnlockReleaseBuffer(metabuffer);
} }
...@@ -585,8 +618,12 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record) ...@@ -585,8 +618,12 @@ ginRedoInsertListPage(XLogRecPtr lsn, XLogRecord *record)
tupsize; tupsize;
IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage)); IndexTuple tuples = (IndexTuple) (XLogRecGetData(record) + sizeof(ginxlogInsertListPage));
if (record->xl_info & XLR_BKP_BLOCK_1) /* If we have a full-page image, restore it and we're done */
if (record->xl_info & XLR_BKP_BLOCK(0))
{
(void) RestoreBackupBlock(lsn, record, 0, false, false);
return; return;
}
buffer = XLogReadBuffer(data->node, data->blkno, true); buffer = XLogReadBuffer(data->node, data->blkno, true);
Assert(BufferIsValid(buffer)); Assert(BufferIsValid(buffer));
...@@ -632,6 +669,9 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) ...@@ -632,6 +669,9 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
Page metapage; Page metapage;
int i; int i;
/* Backup blocks are not used in delete_listpage records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false); metabuffer = XLogReadBuffer(data->node, GIN_METAPAGE_BLKNO, false);
if (!BufferIsValid(metabuffer)) if (!BufferIsValid(metabuffer))
return; /* assume index was deleted, nothing to do */ return; /* assume index was deleted, nothing to do */
...@@ -645,6 +685,16 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record) ...@@ -645,6 +685,16 @@ ginRedoDeleteListPages(XLogRecPtr lsn, XLogRecord *record)
MarkBufferDirty(metabuffer); MarkBufferDirty(metabuffer);
} }
/*
* In normal operation, shiftList() takes exclusive lock on all the
* pages-to-be-deleted simultaneously. During replay, however, it should
* be all right to lock them one at a time. This is dependent on the fact
* that we are deleting pages from the head of the list, and that readers
* share-lock the next page before releasing the one they are on. So we
* cannot get past a reader that is on, or due to visit, any page we are
* going to delete. New incoming readers will block behind our metapage
* lock and then see a fully updated page list.
*/
for (i = 0; i < data->ndeleted; i++) for (i = 0; i < data->ndeleted; i++)
{ {
Buffer buffer = XLogReadBuffer(data->node, data->toDelete[i], false); Buffer buffer = XLogReadBuffer(data->node, data->toDelete[i], false);
...@@ -678,7 +728,6 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -678,7 +728,6 @@ gin_redo(XLogRecPtr lsn, XLogRecord *record)
* implement a similar optimization as we have in b-tree, and remove * implement a similar optimization as we have in b-tree, and remove
* killed tuples outside VACUUM, we'll need to handle that here. * killed tuples outside VACUUM, we'll need to handle that here.
*/ */
RestoreBkpBlocks(lsn, record, false);
topCtx = MemoryContextSwitchTo(opCtx); topCtx = MemoryContextSwitchTo(opCtx);
switch (info) switch (info)
......
...@@ -32,23 +32,37 @@ typedef struct ...@@ -32,23 +32,37 @@ typedef struct
static MemoryContext opCtx; /* working memory for operations */ static MemoryContext opCtx; /* working memory for operations */
/* /*
* Replay the clearing of F_FOLLOW_RIGHT flag. * Replay the clearing of F_FOLLOW_RIGHT flag on a child page.
*
* Even if the WAL record includes a full-page image, we have to update the
* follow-right flag, because that change is not included in the full-page
* image. To be sure that the intermediate state with the wrong flag value is
* not visible to concurrent Hot Standby queries, this function handles
* restoring the full-page image as well as updating the flag. (Note that
* we never need to do anything else to the child page in the current WAL
* action.)
*/ */
static void static void
gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, gistRedoClearFollowRight(XLogRecPtr lsn, XLogRecord *record, int block_index,
BlockNumber leftblkno) RelFileNode node, BlockNumber childblkno)
{ {
Buffer buffer; Buffer buffer;
Page page;
buffer = XLogReadBuffer(node, leftblkno, false); if (record->xl_info & XLR_BKP_BLOCK(block_index))
if (BufferIsValid(buffer)) buffer = RestoreBackupBlock(lsn, record, block_index, false, true);
else
{ {
Page page = (Page) BufferGetPage(buffer); buffer = XLogReadBuffer(node, childblkno, false);
if (!BufferIsValid(buffer))
return; /* page was deleted, nothing to do */
}
page = (Page) BufferGetPage(buffer);
/* /*
* Note that we still update the page even if page LSN is equal to the * Note that we still update the page even if page LSN is equal to the LSN
* LSN of this record, because the updated NSN is not included in the * of this record, because the updated NSN is not included in the full
* full page image. * page image.
*/ */
if (!XLByteLT(lsn, PageGetLSN(page))) if (!XLByteLT(lsn, PageGetLSN(page)))
{ {
...@@ -60,7 +74,6 @@ gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn, ...@@ -60,7 +74,6 @@ gistRedoClearFollowRight(RelFileNode node, XLogRecPtr lsn,
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
} }
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
}
} }
/* /*
...@@ -75,18 +88,37 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) ...@@ -75,18 +88,37 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
Page page; Page page;
char *data; char *data;
/*
* We need to acquire and hold lock on target page while updating the left
* child page. If we have a full-page image of target page, getting the
* lock is a side-effect of restoring that image. Note that even if the
* target page no longer exists, we'll still attempt to replay the change
* on the child page.
*/
if (record->xl_info & XLR_BKP_BLOCK(0))
buffer = RestoreBackupBlock(lsn, record, 0, false, true);
else
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
/* Fix follow-right data on left child page */
if (BlockNumberIsValid(xldata->leftchild)) if (BlockNumberIsValid(xldata->leftchild))
gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild); gistRedoClearFollowRight(lsn, record, 1,
xldata->node, xldata->leftchild);
/* nothing more to do if page was backed up (and no info to do it with) */ /* Done if target page no longer exists */
if (record->xl_info & XLR_BKP_BLOCK_1) if (!BufferIsValid(buffer))
return; return;
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false); /* nothing more to do if page was backed up (and no info to do it with) */
if (!BufferIsValid(buffer)) if (record->xl_info & XLR_BKP_BLOCK(0))
{
UnlockReleaseBuffer(buffer);
return; return;
}
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
/* nothing more to do if change already applied */
if (XLByteLE(lsn, PageGetLSN(page))) if (XLByteLE(lsn, PageGetLSN(page)))
{ {
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
...@@ -140,13 +172,16 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) ...@@ -140,13 +172,16 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
GistClearTuplesDeleted(page); GistClearTuplesDeleted(page);
} }
if (!GistPageIsLeaf(page) && PageGetMaxOffsetNumber(page) == InvalidOffsetNumber && xldata->blkno == GIST_ROOT_BLKNO) if (!GistPageIsLeaf(page) &&
PageGetMaxOffsetNumber(page) == InvalidOffsetNumber &&
xldata->blkno == GIST_ROOT_BLKNO)
{
/* /*
* all links on non-leaf root page was deleted by vacuum full, so root * all links on non-leaf root page was deleted by vacuum full, so root
* page becomes a leaf * page becomes a leaf
*/ */
GistPageSetLeaf(page); GistPageSetLeaf(page);
}
GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; GistPageGetOpaque(page)->rightlink = InvalidBlockNumber;
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
...@@ -155,30 +190,6 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record) ...@@ -155,30 +190,6 @@ gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record)
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
static void
gistRedoPageDeleteRecord(XLogRecPtr lsn, XLogRecord *record)
{
gistxlogPageDelete *xldata = (gistxlogPageDelete *) XLogRecGetData(record);
Buffer buffer;
Page page;
/* nothing else to do if page was backed up (and no info to do it with) */
if (record->xl_info & XLR_BKP_BLOCK_1)
return;
buffer = XLogReadBuffer(xldata->node, xldata->blkno, false);
if (!BufferIsValid(buffer))
return;
page = (Page) BufferGetPage(buffer);
GistPageSetDeleted(page);
PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer);
UnlockReleaseBuffer(buffer);
}
static void static void
decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record) decodePageSplitRecord(PageSplitRecord *decoded, XLogRecord *record)
{ {
...@@ -215,15 +226,22 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) ...@@ -215,15 +226,22 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
{ {
gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record); gistxlogPageSplit *xldata = (gistxlogPageSplit *) XLogRecGetData(record);
PageSplitRecord xlrec; PageSplitRecord xlrec;
Buffer firstbuffer = InvalidBuffer;
Buffer buffer; Buffer buffer;
Page page; Page page;
int i; int i;
bool isrootsplit = false; bool isrootsplit = false;
if (BlockNumberIsValid(xldata->leftchild))
gistRedoClearFollowRight(xldata->node, lsn, xldata->leftchild);
decodePageSplitRecord(&xlrec, record); decodePageSplitRecord(&xlrec, record);
/*
* We must hold lock on the first-listed page throughout the action,
* including while updating the left child page (if any). We can unlock
* remaining pages in the list as soon as they've been written, because
* there is no path for concurrent queries to reach those pages without
* first visiting the first-listed page.
*/
/* loop around all pages */ /* loop around all pages */
for (i = 0; i < xlrec.data->npage; i++) for (i = 0; i < xlrec.data->npage; i++)
{ {
...@@ -273,8 +291,20 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) ...@@ -273,8 +291,20 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record)
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
if (i == 0)
firstbuffer = buffer;
else
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
} }
/* Fix follow-right data on left child page, if any */
if (BlockNumberIsValid(xldata->leftchild))
gistRedoClearFollowRight(lsn, record, 0,
xldata->node, xldata->leftchild);
/* Finally, release lock on the first page */
UnlockReleaseBuffer(firstbuffer);
} }
static void static void
...@@ -284,6 +314,9 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record) ...@@ -284,6 +314,9 @@ gistRedoCreateIndex(XLogRecPtr lsn, XLogRecord *record)
Buffer buffer; Buffer buffer;
Page page; Page page;
/* Backup blocks are not used in create_index records */
Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true); buffer = XLogReadBuffer(*node, GIST_ROOT_BLKNO, true);
Assert(BufferIsValid(buffer)); Assert(BufferIsValid(buffer));
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
...@@ -308,7 +341,6 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -308,7 +341,6 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
* implement a similar optimization we have in b-tree, and remove killed * implement a similar optimization we have in b-tree, and remove killed
* tuples outside VACUUM, we'll need to handle that here. * tuples outside VACUUM, we'll need to handle that here.
*/ */
RestoreBkpBlocks(lsn, record, false);
oldCxt = MemoryContextSwitchTo(opCtx); oldCxt = MemoryContextSwitchTo(opCtx);
switch (info) switch (info)
...@@ -316,9 +348,6 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) ...@@ -316,9 +348,6 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record)
case XLOG_GIST_PAGE_UPDATE: case XLOG_GIST_PAGE_UPDATE:
gistRedoPageUpdateRecord(lsn, record); gistRedoPageUpdateRecord(lsn, record);
break; break;
case XLOG_GIST_PAGE_DELETE:
gistRedoPageDeleteRecord(lsn, record);
break;
case XLOG_GIST_PAGE_SPLIT: case XLOG_GIST_PAGE_SPLIT:
gistRedoPageSplitRecord(lsn, record); gistRedoPageSplitRecord(lsn, record);
break; break;
...@@ -347,14 +376,6 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) ...@@ -347,14 +376,6 @@ out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec)
appendStringInfo(buf, "; block number %u", xlrec->blkno); appendStringInfo(buf, "; block number %u", xlrec->blkno);
} }
static void
out_gistxlogPageDelete(StringInfo buf, gistxlogPageDelete *xlrec)
{
appendStringInfo(buf, "page_delete: rel %u/%u/%u; blkno %u",
xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
xlrec->blkno);
}
static void static void
out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec) out_gistxlogPageSplit(StringInfo buf, gistxlogPageSplit *xlrec)
{ {
...@@ -375,9 +396,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) ...@@ -375,9 +396,6 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec)
appendStringInfo(buf, "page_update: "); appendStringInfo(buf, "page_update: ");
out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec); out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec);
break; break;
case XLOG_GIST_PAGE_DELETE:
out_gistxlogPageDelete(buf, (gistxlogPageDelete *) rec);
break;
case XLOG_GIST_PAGE_SPLIT: case XLOG_GIST_PAGE_SPLIT:
out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec);
break; break;
...@@ -498,37 +516,30 @@ gistXLogUpdate(RelFileNode node, Buffer buffer, ...@@ -498,37 +516,30 @@ gistXLogUpdate(RelFileNode node, Buffer buffer,
Buffer leftchildbuf) Buffer leftchildbuf)
{ {
XLogRecData *rdata; XLogRecData *rdata;
gistxlogPageUpdate *xlrec; gistxlogPageUpdate xlrec;
int cur, int cur,
i; i;
XLogRecPtr recptr; XLogRecPtr recptr;
rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (4 + ituplen)); rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (3 + ituplen));
xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate));
xlrec->node = node; xlrec.node = node;
xlrec->blkno = BufferGetBlockNumber(buffer); xlrec.blkno = BufferGetBlockNumber(buffer);
xlrec->ntodelete = ntodelete; xlrec.ntodelete = ntodelete;
xlrec->leftchild = xlrec.leftchild =
BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber; BufferIsValid(leftchildbuf) ? BufferGetBlockNumber(leftchildbuf) : InvalidBlockNumber;
rdata[0].buffer = buffer; rdata[0].data = (char *) &xlrec;
rdata[0].buffer_std = true; rdata[0].len = sizeof(gistxlogPageUpdate);
rdata[0].data = NULL; rdata[0].buffer = InvalidBuffer;
rdata[0].len = 0;
rdata[0].next = &(rdata[1]); rdata[0].next = &(rdata[1]);
rdata[1].data = (char *) xlrec; rdata[1].data = (char *) todelete;
rdata[1].len = sizeof(gistxlogPageUpdate); rdata[1].len = sizeof(OffsetNumber) * ntodelete;
rdata[1].buffer = InvalidBuffer; rdata[1].buffer = buffer;
rdata[1].next = &(rdata[2]); rdata[1].buffer_std = true;
rdata[2].data = (char *) todelete;
rdata[2].len = sizeof(OffsetNumber) * ntodelete;
rdata[2].buffer = buffer;
rdata[2].buffer_std = true;
cur = 3; cur = 2;
/* new tuples */ /* new tuples */
for (i = 0; i < ituplen; i++) for (i = 0; i < ituplen; i++)
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
...@@ -438,8 +438,9 @@ critical section.) ...@@ -438,8 +438,9 @@ critical section.)
4. Mark the shared buffer(s) as dirty with MarkBufferDirty(). (This must 4. Mark the shared buffer(s) as dirty with MarkBufferDirty(). (This must
happen before the WAL record is inserted; see notes in SyncOneBuffer().) happen before the WAL record is inserted; see notes in SyncOneBuffer().)
5. Build a WAL log record and pass it to XLogInsert(); then update the page's 5. If the relation requires WAL-logging, build a WAL log record and pass it
LSN and TLI using the returned XLOG location. For instance, to XLogInsert(); then update the page's LSN and TLI using the returned XLOG
location. For instance,
recptr = XLogInsert(rmgr_id, info, rdata); recptr = XLogInsert(rmgr_id, info, rdata);
...@@ -466,9 +467,9 @@ which buffers were handled that way --- otherwise they may be misled about ...@@ -466,9 +467,9 @@ which buffers were handled that way --- otherwise they may be misled about
what the XLOG record actually contains. XLOG records that describe multi-page what the XLOG record actually contains. XLOG records that describe multi-page
changes therefore require some care to design: you must be certain that you changes therefore require some care to design: you must be certain that you
know what data is indicated by each "BKP" bit. An example of the trickiness know what data is indicated by each "BKP" bit. An example of the trickiness
is that in a HEAP_UPDATE record, BKP(1) normally is associated with the source is that in a HEAP_UPDATE record, BKP(0) normally is associated with the source
page and BKP(2) is associated with the destination page --- but if these are page and BKP(1) is associated with the destination page --- but if these are
the same page, only BKP(1) would have been set. the same page, only BKP(0) would have been set.
For this reason as well as the risk of deadlocking on buffer locks, it's best For this reason as well as the risk of deadlocking on buffer locks, it's best
to design WAL records so that they reflect small atomic actions involving just to design WAL records so that they reflect small atomic actions involving just
...@@ -497,12 +498,19 @@ incrementally update the page, the rdata array *must* mention the buffer ...@@ -497,12 +498,19 @@ incrementally update the page, the rdata array *must* mention the buffer
ID at least once; otherwise there is no defense against torn-page problems. ID at least once; otherwise there is no defense against torn-page problems.
The standard replay-routine pattern for this case is The standard replay-routine pattern for this case is
if (record->xl_info & XLR_BKP_BLOCK_n) if (record->xl_info & XLR_BKP_BLOCK(N))
<< do nothing, page was rewritten from logged copy >>; {
/* apply the change from the full-page image */
(void) RestoreBackupBlock(lsn, record, N, false, false);
return;
}
buffer = XLogReadBuffer(rnode, blkno, false); buffer = XLogReadBuffer(rnode, blkno, false);
if (!BufferIsValid(buffer)) if (!BufferIsValid(buffer))
<< do nothing, page has been deleted >>; {
/* page has been deleted, so we need do nothing */
return;
}
page = (Page) BufferGetPage(buffer); page = (Page) BufferGetPage(buffer);
if (XLByteLE(lsn, PageGetLSN(page))) if (XLByteLE(lsn, PageGetLSN(page)))
...@@ -520,13 +528,42 @@ The standard replay-routine pattern for this case is ...@@ -520,13 +528,42 @@ The standard replay-routine pattern for this case is
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
As noted above, for a multi-page update you need to be able to determine As noted above, for a multi-page update you need to be able to determine
which XLR_BKP_BLOCK_n flag applies to each page. If a WAL record reflects which XLR_BKP_BLOCK(N) flag applies to each page. If a WAL record reflects
a combination of fully-rewritable and incremental updates, then the rewritable a combination of fully-rewritable and incremental updates, then the rewritable
pages don't count for the XLR_BKP_BLOCK_n numbering. (XLR_BKP_BLOCK_n is pages don't count for the XLR_BKP_BLOCK(N) numbering. (XLR_BKP_BLOCK(N) is
associated with the n'th distinct buffer ID seen in the "rdata" array, and associated with the N'th distinct buffer ID seen in the "rdata" array, and
per the above discussion, fully-rewritable buffers shouldn't be mentioned in per the above discussion, fully-rewritable buffers shouldn't be mentioned in
"rdata".) "rdata".)
When replaying a WAL record that describes changes on multiple pages, you
must be careful to lock the pages properly to prevent concurrent Hot Standby
queries from seeing an inconsistent state. If this requires that two
or more buffer locks be held concurrently, the coding pattern shown above
is too simplistic, since it assumes the routine can exit as soon as it's
known the current page requires no modification. Instead, you might have
something like
if (record->xl_info & XLR_BKP_BLOCK(0))
{
/* apply the change from the full-page image */
buffer0 = RestoreBackupBlock(lsn, record, 0, false, true);
}
else
{
buffer0 = XLogReadBuffer(rnode, blkno, false);
if (BufferIsValid(buffer0))
{
... apply the change if not already done ...
MarkBufferDirty(buffer0);
}
}
... similarly apply the changes for remaining pages ...
/* and now we can release the lock on the first page */
if (BufferIsValid(buffer0))
UnlockReleaseBuffer(buffer0);
Due to all these constraints, complex changes (such as a multilevel index Due to all these constraints, complex changes (such as a multilevel index
insertion) normally need to be described by a series of atomic-action WAL insertion) normally need to be described by a series of atomic-action WAL
records. What do you do if the intermediate states are not self-consistent? records. What do you do if the intermediate states are not self-consistent?
......
...@@ -835,8 +835,8 @@ begin:; ...@@ -835,8 +835,8 @@ begin:;
* At the exit of this loop, write_len includes the backup block data. * At the exit of this loop, write_len includes the backup block data.
* *
* Also set the appropriate info bits to show which buffers were backed * Also set the appropriate info bits to show which buffers were backed
* up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
* buffer value (ignoring InvalidBuffer) appearing in the rdata chain. * value (ignoring InvalidBuffer) appearing in the rdata chain.
*/ */
rdt_lastnormal = rdt; rdt_lastnormal = rdt;
write_len = len; write_len = len;
...@@ -848,7 +848,7 @@ begin:; ...@@ -848,7 +848,7 @@ begin:;
if (!dtbuf_bkp[i]) if (!dtbuf_bkp[i])
continue; continue;
info |= XLR_SET_BKP_BLOCK(i); info |= XLR_BKP_BLOCK(i);
bkpb = &(dtbuf_xlg[i]); bkpb = &(dtbuf_xlg[i]);
page = (char *) BufferGetBlock(dtbuf[i]); page = (char *) BufferGetBlock(dtbuf[i]);
...@@ -3080,9 +3080,16 @@ CleanupBackupHistory(void) ...@@ -3080,9 +3080,16 @@ CleanupBackupHistory(void)
} }
/* /*
* Restore the backup blocks present in an XLOG record, if any. * Restore a full-page image from a backup block attached to an XLOG record.
* *
* We assume all of the record has been read into memory at *record. * lsn: LSN of the XLOG record being replayed
* record: the complete XLOG record
* block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1)
* get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock
* keep_buffer: TRUE to return the buffer still locked and pinned
*
* Returns the buffer number containing the page. Note this is not terribly
* useful unless keep_buffer is specified as TRUE.
* *
* Note: when a backup block is available in XLOG, we restore it * Note: when a backup block is available in XLOG, we restore it
* unconditionally, even if the page in the database appears newer. * unconditionally, even if the page in the database appears newer.
...@@ -3093,15 +3100,20 @@ CleanupBackupHistory(void) ...@@ -3093,15 +3100,20 @@ CleanupBackupHistory(void)
* modifications of the page that appear in XLOG, rather than possibly * modifications of the page that appear in XLOG, rather than possibly
* ignoring them as already applied, but that's not a huge drawback. * ignoring them as already applied, but that's not a huge drawback.
* *
* If 'cleanup' is true, a cleanup lock is used when restoring blocks. * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer,
* Otherwise, a normal exclusive lock is used. During crash recovery, that's * else a normal exclusive lock is used. During crash recovery, that's just
* just pro forma because there can't be any regular backends in the system, * pro forma because there can't be any regular backends in the system, but
* but in hot standby mode the distinction is important. The 'cleanup' * in hot standby mode the distinction is important.
* argument applies to all backup blocks in the WAL record, that suffices for *
* now. * If 'keep_buffer' is true, return without releasing the buffer lock and pin;
* then caller is responsible for doing UnlockReleaseBuffer() later. This
* is needed in some cases when replaying XLOG records that touch multiple
* pages, to prevent inconsistent states from being visible to other backends.
* (Again, that's only important in hot standby mode.)
*/ */
void Buffer
RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup) RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
bool get_cleanup_lock, bool keep_buffer)
{ {
Buffer buffer; Buffer buffer;
Page page; Page page;
...@@ -3109,22 +3121,23 @@ RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup) ...@@ -3109,22 +3121,23 @@ RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
char *blk; char *blk;
int i; int i;
if (!(record->xl_info & XLR_BKP_BLOCK_MASK)) /* Locate requested BkpBlock in the record */
return;
blk = (char *) XLogRecGetData(record) + record->xl_len; blk = (char *) XLogRecGetData(record) + record->xl_len;
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{ {
if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) if (!(record->xl_info & XLR_BKP_BLOCK(i)))
continue; continue;
memcpy(&bkpb, blk, sizeof(BkpBlock)); memcpy(&bkpb, blk, sizeof(BkpBlock));
blk += sizeof(BkpBlock); blk += sizeof(BkpBlock);
if (i == block_index)
{
/* Found it, apply the update */
buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block, buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
RBM_ZERO); RBM_ZERO);
Assert(BufferIsValid(buffer)); Assert(BufferIsValid(buffer));
if (cleanup) if (get_cleanup_lock)
LockBufferForCleanup(buffer); LockBufferForCleanup(buffer);
else else
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
...@@ -3148,10 +3161,19 @@ RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup) ...@@ -3148,10 +3161,19 @@ RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
PageSetLSN(page, lsn); PageSetLSN(page, lsn);
PageSetTLI(page, ThisTimeLineID); PageSetTLI(page, ThisTimeLineID);
MarkBufferDirty(buffer); MarkBufferDirty(buffer);
if (!keep_buffer)
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(buffer);
return buffer;
}
blk += BLCKSZ - bkpb.hole_length; blk += BLCKSZ - bkpb.hole_length;
} }
/* Caller specified a bogus block_index */
elog(ERROR, "failed to restore block_index %d", block_index);
return InvalidBuffer; /* keep compiler quiet */
} }
/* /*
...@@ -3193,7 +3215,7 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) ...@@ -3193,7 +3215,7 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
{ {
uint32 blen; uint32 blen;
if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) if (!(record->xl_info & XLR_BKP_BLOCK(i)))
continue; continue;
if (remaining < sizeof(BkpBlock)) if (remaining < sizeof(BkpBlock))
...@@ -8081,7 +8103,8 @@ xlog_outrec(StringInfo buf, XLogRecord *record) ...@@ -8081,7 +8103,8 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
int i; int i;
appendStringInfo(buf, "prev %X/%X; xid %u", appendStringInfo(buf, "prev %X/%X; xid %u",
(uint32) (record->xl_prev >> 32), (uint32) record->xl_prev, (uint32) (record->xl_prev >> 32),
(uint32) record->xl_prev,
record->xl_xid); record->xl_xid);
appendStringInfo(buf, "; len %u", appendStringInfo(buf, "; len %u",
...@@ -8089,8 +8112,8 @@ xlog_outrec(StringInfo buf, XLogRecord *record) ...@@ -8089,8 +8112,8 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{ {
if (record->xl_info & XLR_SET_BKP_BLOCK(i)) if (record->xl_info & XLR_BKP_BLOCK(i))
appendStringInfo(buf, "; bkpb%d", i + 1); appendStringInfo(buf, "; bkpb%d", i);
} }
appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name); appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
......
...@@ -167,7 +167,7 @@ typedef GISTScanOpaqueData *GISTScanOpaque; ...@@ -167,7 +167,7 @@ typedef GISTScanOpaqueData *GISTScanOpaque;
#define XLOG_GIST_PAGE_SPLIT 0x30 #define XLOG_GIST_PAGE_SPLIT 0x30
/* #define XLOG_GIST_INSERT_COMPLETE 0x40 */ /* not used anymore */ /* #define XLOG_GIST_INSERT_COMPLETE 0x40 */ /* not used anymore */
#define XLOG_GIST_CREATE_INDEX 0x50 #define XLOG_GIST_CREATE_INDEX 0x50
#define XLOG_GIST_PAGE_DELETE 0x60 /* #define XLOG_GIST_PAGE_DELETE 0x60 */ /* not used anymore */
typedef struct gistxlogPageUpdate typedef struct gistxlogPageUpdate
{ {
...@@ -211,12 +211,6 @@ typedef struct gistxlogPage ...@@ -211,12 +211,6 @@ typedef struct gistxlogPage
int num; /* number of index tuples following */ int num; /* number of index tuples following */
} gistxlogPage; } gistxlogPage;
typedef struct gistxlogPageDelete
{
RelFileNode node;
BlockNumber blkno;
} gistxlogPageDelete;
/* SplitedPageLayout - gistSplit function result */ /* SplitedPageLayout - gistSplit function result */
typedef struct SplitedPageLayout typedef struct SplitedPageLayout
{ {
......
...@@ -71,11 +71,7 @@ typedef struct XLogRecord ...@@ -71,11 +71,7 @@ typedef struct XLogRecord
*/ */
#define XLR_BKP_BLOCK_MASK 0x0F /* all info bits used for bkp blocks */ #define XLR_BKP_BLOCK_MASK 0x0F /* all info bits used for bkp blocks */
#define XLR_MAX_BKP_BLOCKS 4 #define XLR_MAX_BKP_BLOCKS 4
#define XLR_SET_BKP_BLOCK(iblk) (0x08 >> (iblk)) #define XLR_BKP_BLOCK(iblk) (0x08 >> (iblk)) /* iblk in 0..3 */
#define XLR_BKP_BLOCK_1 XLR_SET_BKP_BLOCK(0) /* 0x08 */
#define XLR_BKP_BLOCK_2 XLR_SET_BKP_BLOCK(1) /* 0x04 */
#define XLR_BKP_BLOCK_3 XLR_SET_BKP_BLOCK(2) /* 0x02 */
#define XLR_BKP_BLOCK_4 XLR_SET_BKP_BLOCK(3) /* 0x01 */
/* Sync methods */ /* Sync methods */
#define SYNC_METHOD_FSYNC 0 #define SYNC_METHOD_FSYNC 0
...@@ -94,13 +90,13 @@ extern int sync_method; ...@@ -94,13 +90,13 @@ extern int sync_method;
* If buffer is valid then XLOG will check if buffer must be backed up * If buffer is valid then XLOG will check if buffer must be backed up
* (ie, whether this is first change of that page since last checkpoint). * (ie, whether this is first change of that page since last checkpoint).
* If so, the whole page contents are attached to the XLOG record, and XLOG * If so, the whole page contents are attached to the XLOG record, and XLOG
* sets XLR_BKP_BLOCK_X bit in xl_info. Note that the buffer must be pinned * sets XLR_BKP_BLOCK(N) bit in xl_info. Note that the buffer must be pinned
* and exclusive-locked by the caller, so that it won't change under us. * and exclusive-locked by the caller, so that it won't change under us.
* NB: when the buffer is backed up, we DO NOT insert the data pointed to by * NB: when the buffer is backed up, we DO NOT insert the data pointed to by
* this XLogRecData struct into the XLOG record, since we assume it's present * this XLogRecData struct into the XLOG record, since we assume it's present
* in the buffer. Therefore, rmgr redo routines MUST pay attention to * in the buffer. Therefore, rmgr redo routines MUST pay attention to
* XLR_BKP_BLOCK_X to know what is actually stored in the XLOG record. * XLR_BKP_BLOCK(N) to know what is actually stored in the XLOG record.
* The i'th XLR_BKP_BLOCK bit corresponds to the i'th distinct buffer * The N'th XLR_BKP_BLOCK bit corresponds to the N'th distinct buffer
* value (ignoring InvalidBuffer) appearing in the rdata chain. * value (ignoring InvalidBuffer) appearing in the rdata chain.
* *
* When buffer is valid, caller must set buffer_std to indicate whether the * When buffer is valid, caller must set buffer_std to indicate whether the
...@@ -274,7 +270,9 @@ extern int XLogFileOpen(XLogSegNo segno); ...@@ -274,7 +270,9 @@ extern int XLogFileOpen(XLogSegNo segno);
extern void XLogGetLastRemoved(XLogSegNo *segno); extern void XLogGetLastRemoved(XLogSegNo *segno);
extern void XLogSetAsyncXactLSN(XLogRecPtr record); extern void XLogSetAsyncXactLSN(XLogRecPtr record);
extern void RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup); extern Buffer RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record,
int block_index,
bool get_cleanup_lock, bool keep_buffer);
extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record); extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec); extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment