Commit 3df92bbd authored by Peter Geoghegan's avatar Peter Geoghegan

Rename nbtree split REDO routine variables.

Make the nbtree page split REDO routine variable names consistent with
_bt_split() (which handles the original execution of page splits).
These names make the code easier to follow by making the distinction
between the original page and the left half of the split clear.  (The
left half of the split page is a temp page that REDO creates to replace
the origpage contents.)

Also reduce the elevel used when adding a new high key to the temp page
from PANIC to ERROR to be consistent.  We already only raise an ERROR
when data item PageAddItem() temp page calls fail.
parent 199cec97
...@@ -256,20 +256,20 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -256,20 +256,20 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
XLogRecPtr lsn = record->EndRecPtr; XLogRecPtr lsn = record->EndRecPtr;
xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record); xl_btree_split *xlrec = (xl_btree_split *) XLogRecGetData(record);
bool isleaf = (xlrec->level == 0); bool isleaf = (xlrec->level == 0);
Buffer lbuf; Buffer buf;
Buffer rbuf; Buffer rbuf;
Page rpage; Page rpage;
BTPageOpaque ropaque; BTPageOpaque ropaque;
char *datapos; char *datapos;
Size datalen; Size datalen;
BlockNumber leftsib; BlockNumber origpagenumber;
BlockNumber rightsib; BlockNumber rightpagenumber;
BlockNumber rnext; BlockNumber spagenumber;
XLogRecGetBlockTag(record, 0, NULL, NULL, &leftsib); XLogRecGetBlockTag(record, 0, NULL, NULL, &origpagenumber);
XLogRecGetBlockTag(record, 1, NULL, NULL, &rightsib); XLogRecGetBlockTag(record, 1, NULL, NULL, &rightpagenumber);
if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &rnext)) if (!XLogRecGetBlockTag(record, 2, NULL, NULL, &spagenumber))
rnext = P_NONE; spagenumber = P_NONE;
/* /*
* Clear the incomplete split flag on the left sibling of the child page * Clear the incomplete split flag on the left sibling of the child page
...@@ -287,8 +287,8 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -287,8 +287,8 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
_bt_pageinit(rpage, BufferGetPageSize(rbuf)); _bt_pageinit(rpage, BufferGetPageSize(rbuf));
ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage); ropaque = (BTPageOpaque) PageGetSpecialPointer(rpage);
ropaque->btpo_prev = leftsib; ropaque->btpo_prev = origpagenumber;
ropaque->btpo_next = rnext; ropaque->btpo_next = spagenumber;
ropaque->btpo.level = xlrec->level; ropaque->btpo.level = xlrec->level;
ropaque->btpo_flags = isleaf ? BTP_LEAF : 0; ropaque->btpo_flags = isleaf ? BTP_LEAF : 0;
ropaque->btpo_cycleid = 0; ropaque->btpo_cycleid = 0;
...@@ -298,8 +298,8 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -298,8 +298,8 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
PageSetLSN(rpage, lsn); PageSetLSN(rpage, lsn);
MarkBufferDirty(rbuf); MarkBufferDirty(rbuf);
/* Now reconstruct left (original) sibling page */ /* Now reconstruct original page (left half of split) */
if (XLogReadBufferForRedo(record, 0, &lbuf) == BLK_NEEDS_REDO) if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
{ {
/* /*
* To retain the same physical order of the tuples that they had, we * To retain the same physical order of the tuples that they had, we
...@@ -309,15 +309,15 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -309,15 +309,15 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
* checking possible. See also _bt_restore_page(), which does the * checking possible. See also _bt_restore_page(), which does the
* same for the right page. * same for the right page.
*/ */
Page lpage = (Page) BufferGetPage(lbuf); Page origpage = (Page) BufferGetPage(buf);
BTPageOpaque lopaque = (BTPageOpaque) PageGetSpecialPointer(lpage); BTPageOpaque oopaque = (BTPageOpaque) PageGetSpecialPointer(origpage);
OffsetNumber off; OffsetNumber off;
IndexTuple newitem = NULL, IndexTuple newitem = NULL,
left_hikey = NULL, left_hikey = NULL,
nposting = NULL; nposting = NULL;
Size newitemsz = 0, Size newitemsz = 0,
left_hikeysz = 0; left_hikeysz = 0;
Page newlpage; Page leftpage;
OffsetNumber leftoff, OffsetNumber leftoff,
replacepostingoff = InvalidOffsetNumber; replacepostingoff = InvalidOffsetNumber;
...@@ -340,8 +340,8 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -340,8 +340,8 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
/* Use mutable, aligned newitem copy in _bt_swap_posting() */ /* Use mutable, aligned newitem copy in _bt_swap_posting() */
newitem = CopyIndexTuple(newitem); newitem = CopyIndexTuple(newitem);
itemid = PageGetItemId(lpage, replacepostingoff); itemid = PageGetItemId(origpage, replacepostingoff);
oposting = (IndexTuple) PageGetItem(lpage, itemid); oposting = (IndexTuple) PageGetItem(origpage, itemid);
nposting = _bt_swap_posting(newitem, oposting, nposting = _bt_swap_posting(newitem, oposting,
xlrec->postingoff); xlrec->postingoff);
} }
...@@ -359,16 +359,16 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -359,16 +359,16 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
Assert(datalen == 0); Assert(datalen == 0);
newlpage = PageGetTempPageCopySpecial(lpage); leftpage = PageGetTempPageCopySpecial(origpage);
/* Set high key */ /* Add high key tuple from WAL record to temp page */
leftoff = P_HIKEY; leftoff = P_HIKEY;
if (PageAddItem(newlpage, (Item) left_hikey, left_hikeysz, if (PageAddItem(leftpage, (Item) left_hikey, left_hikeysz, P_HIKEY,
P_HIKEY, false, false) == InvalidOffsetNumber) false, false) == InvalidOffsetNumber)
elog(PANIC, "failed to add high key to left page after split"); elog(ERROR, "failed to add high key to left page after split");
leftoff = OffsetNumberNext(leftoff); leftoff = OffsetNumberNext(leftoff);
for (off = P_FIRSTDATAKEY(lopaque); off < xlrec->firstrightoff; off++) for (off = P_FIRSTDATAKEY(oopaque); off < xlrec->firstrightoff; off++)
{ {
ItemId itemid; ItemId itemid;
Size itemsz; Size itemsz;
...@@ -379,7 +379,7 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -379,7 +379,7 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
{ {
Assert(newitemonleft || Assert(newitemonleft ||
xlrec->firstrightoff == xlrec->newitemoff); xlrec->firstrightoff == xlrec->newitemoff);
if (PageAddItem(newlpage, (Item) nposting, if (PageAddItem(leftpage, (Item) nposting,
MAXALIGN(IndexTupleSize(nposting)), leftoff, MAXALIGN(IndexTupleSize(nposting)), leftoff,
false, false) == InvalidOffsetNumber) false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new posting list item to left page after split"); elog(ERROR, "failed to add new posting list item to left page after split");
...@@ -390,16 +390,16 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -390,16 +390,16 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
/* add the new item if it was inserted on left page */ /* add the new item if it was inserted on left page */
else if (newitemonleft && off == xlrec->newitemoff) else if (newitemonleft && off == xlrec->newitemoff)
{ {
if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff, if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber) false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split"); elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff); leftoff = OffsetNumberNext(leftoff);
} }
itemid = PageGetItemId(lpage, off); itemid = PageGetItemId(origpage, off);
itemsz = ItemIdGetLength(itemid); itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(lpage, itemid); item = (IndexTuple) PageGetItem(origpage, itemid);
if (PageAddItem(newlpage, (Item) item, itemsz, leftoff, if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
false, false) == InvalidOffsetNumber) false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add old item to left page after split"); elog(ERROR, "failed to add old item to left page after split");
leftoff = OffsetNumberNext(leftoff); leftoff = OffsetNumberNext(leftoff);
...@@ -408,31 +408,31 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -408,31 +408,31 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
/* cope with possibility that newitem goes at the end */ /* cope with possibility that newitem goes at the end */
if (newitemonleft && off == xlrec->newitemoff) if (newitemonleft && off == xlrec->newitemoff)
{ {
if (PageAddItem(newlpage, (Item) newitem, newitemsz, leftoff, if (PageAddItem(leftpage, (Item) newitem, newitemsz, leftoff,
false, false) == InvalidOffsetNumber) false, false) == InvalidOffsetNumber)
elog(ERROR, "failed to add new item to left page after split"); elog(ERROR, "failed to add new item to left page after split");
leftoff = OffsetNumberNext(leftoff); leftoff = OffsetNumberNext(leftoff);
} }
PageRestoreTempPage(newlpage, lpage); PageRestoreTempPage(leftpage, origpage);
/* Fix opaque fields */ /* Fix opaque fields */
lopaque->btpo_flags = BTP_INCOMPLETE_SPLIT; oopaque->btpo_flags = BTP_INCOMPLETE_SPLIT;
if (isleaf) if (isleaf)
lopaque->btpo_flags |= BTP_LEAF; oopaque->btpo_flags |= BTP_LEAF;
lopaque->btpo_next = rightsib; oopaque->btpo_next = rightpagenumber;
lopaque->btpo_cycleid = 0; oopaque->btpo_cycleid = 0;
PageSetLSN(lpage, lsn); PageSetLSN(origpage, lsn);
MarkBufferDirty(lbuf); MarkBufferDirty(buf);
} }
/* /*
* We no longer need the buffers. They must be released together, so that * We no longer need the buffers. They must be released together, so that
* readers cannot observe two inconsistent halves. * readers cannot observe two inconsistent halves.
*/ */
if (BufferIsValid(lbuf)) if (BufferIsValid(buf))
UnlockReleaseBuffer(lbuf); UnlockReleaseBuffer(buf);
UnlockReleaseBuffer(rbuf); UnlockReleaseBuffer(rbuf);
/* /*
...@@ -443,22 +443,22 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record) ...@@ -443,22 +443,22 @@ btree_xlog_split(bool newitemonleft, XLogReaderState *record)
* replay, because no other index update can be in progress, and readers * replay, because no other index update can be in progress, and readers
* will cope properly when following an obsolete left-link. * will cope properly when following an obsolete left-link.
*/ */
if (rnext != P_NONE) if (spagenumber != P_NONE)
{ {
Buffer buffer; Buffer sbuf;
if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO) if (XLogReadBufferForRedo(record, 2, &sbuf) == BLK_NEEDS_REDO)
{ {
Page page = (Page) BufferGetPage(buffer); Page spage = (Page) BufferGetPage(sbuf);
BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page); BTPageOpaque spageop = (BTPageOpaque) PageGetSpecialPointer(spage);
pageop->btpo_prev = rightsib; spageop->btpo_prev = rightpagenumber;
PageSetLSN(page, lsn); PageSetLSN(spage, lsn);
MarkBufferDirty(buffer); MarkBufferDirty(sbuf);
} }
if (BufferIsValid(buffer)) if (BufferIsValid(sbuf))
UnlockReleaseBuffer(buffer); UnlockReleaseBuffer(sbuf);
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment