Commit eb11de8f authored by Heikki Linnakangas's avatar Heikki Linnakangas

Sanity-check that a page zeroed by redo routine is marked with WILL_INIT.

There was already a sanity-check in the other direction: if a page was
marked with WILL_INIT, it had to be initialized by the redo routine. It's
not strictly necessary for correctness that a page is marked with WILL_INIT
if it's going to be initialized at redo, but it's a missed optimization if
nothing else.

Fix a few instances of this issue in SP-GiST, where a block in WAL record
was not marked with WILL_INIT, but was in fact always initialized at redo.
We were creating a full-page image of the page unnecessarily in those
cases.

Backpatch to 9.5, where the new WILL_INIT flag was added.
parent e52b690c
...@@ -291,12 +291,16 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, ...@@ -291,12 +291,16 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
if (RelationNeedsWAL(index)) if (RelationNeedsWAL(index))
{ {
XLogRecPtr recptr; XLogRecPtr recptr;
int flags;
XLogBeginInsert(); XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(xlrec)); XLogRegisterData((char *) &xlrec, sizeof(xlrec));
XLogRegisterData((char *) leafTuple, leafTuple->size); XLogRegisterData((char *) leafTuple, leafTuple->size);
XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); flags = REGBUF_STANDARD;
if (xlrec.newPage)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(0, current->buffer, flags);
if (xlrec.offnumParent != InvalidOffsetNumber) if (xlrec.offnumParent != InvalidOffsetNumber)
XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD); XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
...@@ -1348,12 +1352,16 @@ doPickSplit(Relation index, SpGistState *state, ...@@ -1348,12 +1352,16 @@ doPickSplit(Relation index, SpGistState *state,
XLogRegisterData((char *) innerTuple, innerTuple->size); XLogRegisterData((char *) innerTuple, innerTuple->size);
XLogRegisterData(leafdata, leafptr - leafdata); XLogRegisterData(leafdata, leafptr - leafdata);
/* Old leaf page */
if (BufferIsValid(saveCurrent.buffer))
{
flags = REGBUF_STANDARD; flags = REGBUF_STANDARD;
if (xlrec.initSrc) if (xlrec.initSrc)
flags |= REGBUF_WILL_INIT; flags |= REGBUF_WILL_INIT;
if (BufferIsValid(saveCurrent.buffer))
XLogRegisterBuffer(0, saveCurrent.buffer, flags); XLogRegisterBuffer(0, saveCurrent.buffer, flags);
}
/* New leaf page */
if (BufferIsValid(newLeafBuffer)) if (BufferIsValid(newLeafBuffer))
{ {
flags = REGBUF_STANDARD; flags = REGBUF_STANDARD;
...@@ -1361,7 +1369,14 @@ doPickSplit(Relation index, SpGistState *state, ...@@ -1361,7 +1369,14 @@ doPickSplit(Relation index, SpGistState *state,
flags |= REGBUF_WILL_INIT; flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(1, newLeafBuffer, flags); XLogRegisterBuffer(1, newLeafBuffer, flags);
} }
XLogRegisterBuffer(2, current->buffer, REGBUF_STANDARD);
/* Inner page */
flags = REGBUF_STANDARD;
if (xlrec.initInner)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(2, current->buffer, flags);
/* Parent page, if different from inner page */
if (parent->buffer != InvalidBuffer) if (parent->buffer != InvalidBuffer)
{ {
if (parent->buffer != current->buffer) if (parent->buffer != current->buffer)
...@@ -1631,13 +1646,17 @@ spgAddNodeAction(Relation index, SpGistState *state, ...@@ -1631,13 +1646,17 @@ spgAddNodeAction(Relation index, SpGistState *state,
if (RelationNeedsWAL(index)) if (RelationNeedsWAL(index))
{ {
XLogRecPtr recptr; XLogRecPtr recptr;
int flags;
XLogBeginInsert(); XLogBeginInsert();
/* orig page */ /* orig page */
XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD); XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
/* new page */ /* new page */
XLogRegisterBuffer(1, current->buffer, REGBUF_STANDARD); flags = REGBUF_STANDARD;
if (xlrec.newPage)
flags |= REGBUF_WILL_INIT;
XLogRegisterBuffer(1, current->buffer, flags);
/* parent page (if different from orig and new) */ /* parent page (if different from orig and new) */
if (xlrec.parentBlk == 2) if (xlrec.parentBlk == 2)
XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
......
...@@ -328,6 +328,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -328,6 +328,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
ForkNumber forknum; ForkNumber forknum;
BlockNumber blkno; BlockNumber blkno;
Page page; Page page;
bool zeromode;
bool willinit;
if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno)) if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
{ {
...@@ -335,6 +337,17 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -335,6 +337,17 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
elog(PANIC, "failed to locate backup block with ID %d", block_id); elog(PANIC, "failed to locate backup block with ID %d", block_id);
} }
/*
* Make sure that if the block is marked with WILL_INIT, the caller is
* going to initialize it. And vice versa.
*/
zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
willinit = (record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0;
if (willinit && !zeromode)
elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
if (!willinit && zeromode)
elog(PANIC, "block to be initialized in redo routine must be marked with WILL_INIT flag in the WAL record");
/* If it's a full-page image, restore it. */ /* If it's a full-page image, restore it. */
if (XLogRecHasBlockImage(record, block_id)) if (XLogRecHasBlockImage(record, block_id))
{ {
...@@ -359,12 +372,6 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, ...@@ -359,12 +372,6 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
} }
else else
{ {
if ((record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0 &&
mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
{
elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
}
*buf = XLogReadBufferExtended(rnode, forknum, blkno, mode); *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode);
if (BufferIsValid(*buf)) if (BufferIsValid(*buf))
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment