From b18c09ee3a8a35bb2cbc39f7bf83c618b936b1a1 Mon Sep 17 00:00:00 2001
From: "Vadim B. Mikheev" <vadim4o@yahoo.com>
Date: Fri, 2 Feb 2001 19:49:15 +0000
Subject: [PATCH] Runtime tree recovery is implemented, just testing is left
 -:)

---
 src/backend/access/nbtree/nbtinsert.c | 269 +++++++++++++++++++-------
 1 file changed, 202 insertions(+), 67 deletions(-)

diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index b44b7839de..6a4b925cd1 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.79 2001/01/31 01:08:36 vadim Exp $
+ *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.80 2001/02/02 19:49:15 vadim Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -38,7 +38,10 @@ extern bool FixBTree;
 
 Buffer _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release);
 static void _bt_fixtree(Relation rel, BlockNumber blkno);
-static BlockNumber _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
+static void _bt_fixbranch(Relation rel, BlockNumber lblkno, 
+				BlockNumber rblkno, BTStack true_stack);
+static void _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit);
+static void _bt_fixup(Relation rel, Buffer buf);
 static OffsetNumber _bt_getoff(Page page, BlockNumber blkno);
 
 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
@@ -64,7 +67,7 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
 static void _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
 							  int leftfree, int rightfree,
 							  bool newitemonleft, Size firstrightitemsz);
-static Buffer _bt_getstackbuf(Relation rel, BTStack stack);
+static Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
 static void _bt_pgaddtup(Relation rel, Page page,
 						 Size itemsize, BTItem btitem,
 						 OffsetNumber itup_off, const char *where);
@@ -497,34 +500,7 @@ _bt_insertonpg(Relation rel,
 						elog(ERROR, "bt_insertonpg: no root page found");
 					_bt_wrtbuf(rel, rbuf);
 					_bt_wrtnorelbuf(rel, buf);
-					while(! P_LEFTMOST(lpageop))
-					{
-						BlockNumber		blkno = lpageop->btpo_prev;
-						LockBuffer(buf, BUFFER_LOCK_UNLOCK);
-						ReleaseBuffer(buf);
-						buf = _bt_getbuf(rel, blkno, BT_WRITE);
-						page = BufferGetPage(buf);
-						lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
-						/*
-						 * If someone else already created parent pages
-						 * then it's time for _bt_fixtree() to check upper
-						 * levels and fix them, if required.
-						 */
-						if (lpageop->btpo_parent != BTREE_METAPAGE)
-						{
-							blkno = lpageop->btpo_parent;
-							_bt_relbuf(rel, buf, BT_WRITE);
-							_bt_fixtree(rel, blkno);
-							goto formres;
-						}
-					}
-					/*
-					 * Ok, we are on the leftmost page, it's write locked
-					 * by us and its btpo_parent points to meta page - time
-					 * for _bt_fixroot().
-					 */
-					 buf = _bt_fixroot(rel, buf, true);
-					 _bt_relbuf(rel, buf, BT_WRITE);
+					_bt_fixup(rel, buf);
 					goto formres;
 				}
 
@@ -561,16 +537,22 @@ _bt_insertonpg(Relation rel,
 			ItemPointerSet(&(stack->bts_btitem.bti_itup.t_tid),
 						   bknum, P_HIKEY);
 
-			pbuf = _bt_getstackbuf(rel, stack);
-
-			if (pbuf == InvalidBuffer)
-				elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
-					 "\n\tRecreate index %s.", RelationGetRelationName(rel));
+			pbuf = _bt_getstackbuf(rel, stack, BT_WRITE);
 
 			/* Now we can write and unlock the children */
 			_bt_wrtbuf(rel, rbuf);
 			_bt_wrtbuf(rel, buf);
 
+			if (pbuf == InvalidBuffer)
+			{
+				if (!FixBTree)
+					elog(ERROR, "_bt_getstackbuf: my bits moved right off the end of the world!"
+						 "\n\tRecreate index %s.", RelationGetRelationName(rel));
+				pfree(new_item);
+				_bt_fixbranch(rel, bknum, rbknum, stack);
+				goto formres;
+			}
+
 			/* Recursively update the parent */
 			newres = _bt_insertonpg(rel, pbuf, stack->bts_parent,
 									0, NULL, new_item, stack->bts_offset);
@@ -1132,7 +1114,7 @@ _bt_checksplitloc(FindSplitData *state, OffsetNumber firstright,
  *		Also, re-set bts_blkno & bts_offset if changed.
  */
 static Buffer
-_bt_getstackbuf(Relation rel, BTStack stack)
+_bt_getstackbuf(Relation rel, BTStack stack, int access)
 {
 	BlockNumber blkno;
 	Buffer		buf;
@@ -1145,7 +1127,7 @@ _bt_getstackbuf(Relation rel, BTStack stack)
 	BTPageOpaque opaque;
 
 	blkno = stack->bts_blkno;
-	buf = _bt_getbuf(rel, blkno, BT_WRITE);
+	buf = _bt_getbuf(rel, blkno, access);
 	page = BufferGetPage(buf);
 	opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 	maxoff = PageGetMaxOffsetNumber(page);
@@ -1179,13 +1161,13 @@ _bt_getstackbuf(Relation rel, BTStack stack)
 		/* by here, the item we're looking for moved right at least one page */
 		if (P_RIGHTMOST(opaque))
 		{
-			_bt_relbuf(rel, buf, BT_WRITE);
+			_bt_relbuf(rel, buf, access);
 			return(InvalidBuffer);
 		}
 
 		blkno = opaque->btpo_next;
-		_bt_relbuf(rel, buf, BT_WRITE);
-		buf = _bt_getbuf(rel, blkno, BT_WRITE);
+		_bt_relbuf(rel, buf, access);
+		buf = _bt_getbuf(rel, blkno, access);
 		page = BufferGetPage(buf);
 		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 		maxoff = PageGetMaxOffsetNumber(page);
@@ -1400,10 +1382,13 @@ _bt_fixroot(Relation rel, Buffer oldrootbuf, bool release)
 
 	/*
 	 * Now read other pages (if any) on level and add them to new root.
+	 * Here we break one of our locking rules - never hold lock on parent
+	 * page when acquiring lock on its child, - but we free from deadlock:
+	 *
 	 * If concurrent process will split one of pages on this level then it
-	 * will notice either btpo_parent == metablock or btpo_parent == rootblk.
-	 * In first case it will give up its locks and try to lock leftmost page
-	 * buffer (oldrootbuf) to fix root - ie it will wait for us and let us
+	 * will see either btpo_parent == metablock or btpo_parent == rootblk.
+	 * In first case it will give up its locks and walk to the leftmost page
+	 * (oldrootbuf) in _bt_fixup() - ie it will wait for us and let us
 	 * continue. In second case it will try to lock rootbuf keeping its locks
 	 * on buffers we already passed, also waiting for us. If we'll have to
 	 * unlock rootbuf (split it) and that process will have to split page
@@ -1547,14 +1532,12 @@ _bt_fixtree(Relation rel, BlockNumber blkno)
  * Check/fix level starting from page in buffer buf up to block
  * limit on *child* level (or till rightmost child page if limit
  * is InvalidBlockNumber). Start buffer must be read locked.
- * No pins/locks are held on exit. Returns block number of last
- * visited/pointing-to-limit page on *check/fix* level.
+ * No pins/locks are held on exit.
  */
-static BlockNumber
+static void
 _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 {
 	BlockNumber		blkno = BufferGetBlockNumber(buf);
-	BlockNumber		pblkno = blkno;
 	Page			page;
 	BTPageOpaque	opaque;
 	BlockNumber		cblkno[3];
@@ -1639,22 +1622,20 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 			BTStackData		stack;
 
 			stack.bts_parent = NULL;
-			stack.bts_blkno = pblkno;
+			stack.bts_blkno = blkno;
 			stack.bts_offset = InvalidOffsetNumber;
 			ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid),
 							cblkno[0], P_HIKEY);
 
-			buf = _bt_getstackbuf(rel, &stack);
+			buf = _bt_getstackbuf(rel, &stack, BT_WRITE);
 			if (buf == InvalidBuffer)
 				elog(ERROR, "bt_fixlevel: pointer disappeared (need to recreate index)");
 
 			page = BufferGetPage(buf);
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 			coff[0] = stack.bts_offset;
-			pblkno = BufferGetBlockNumber(buf);
-			parblk[0] = pblkno;
-			if (cblkno[0] == limit)
-				blkno = pblkno;		/* where we have seen pointer to limit */
+			blkno = BufferGetBlockNumber(buf);
+			parblk[0] = blkno;
 
 			/* Check/insert missed pointers */
 			for (i = 1; i <= cidx; i++)
@@ -1668,8 +1649,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 					if (parblk[i] == parblk[i - 1] &&
 								coff[i] != coff[i - 1] + 1)
 						elog(ERROR, "bt_fixlevel: invalid item order(2) (need to recreate index)");
-					if (cblkno[i] == limit)
-						blkno = parblk[i];
 					continue;
 				}
 				/* Have to check next page ? */
@@ -1688,10 +1667,8 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 						buf = newbuf;
 						page = newpage;
 						opaque = newopaque;
-						pblkno = BufferGetBlockNumber(buf);
-						parblk[i] = pblkno;
-						if (cblkno[i] == limit)
-							blkno = pblkno;
+						blkno = BufferGetBlockNumber(buf);
+						parblk[i] = blkno;
 						continue;
 					}
 					/* unfound - need to insert on current page */
@@ -1730,7 +1707,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 						page = BufferGetPage(buf);
 						opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 					}
-					pblkno = BufferGetBlockNumber(buf);
+					blkno = BufferGetBlockNumber(buf);
 					coff[i] = itup_off;
 				}
 				else
@@ -1740,9 +1717,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 				}
 
 				pfree(btitem);
-				parblk[i] = pblkno;
-				if (cblkno[i] == limit)
-					blkno = pblkno;
+				parblk[i] = blkno;
 			}
 
 			/* copy page with pointer to cblkno[cidx] to temp storage */
@@ -1750,8 +1725,6 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 			_bt_relbuf(rel, buf, BT_WRITE);
 			page = (Page)tbuf;
 			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
-			if (limit == InvalidBlockNumber)
-				blkno = pblkno;		/* last visited page */
 		}
 
 		/* Continue if current check/fix level page is rightmost */
@@ -1766,7 +1739,7 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 		{
 			if (cidx == 2)
 				_bt_relbuf(rel, cbuf[2], BT_READ);
-			return(blkno);
+			return;
 		}
 		if (cblkno[0] == limit || cblkno[1] == limit)
 			goodbye = true;
@@ -1778,6 +1751,168 @@ _bt_fixlevel(Relation rel, Buffer buf, BlockNumber limit)
 	}
 }
 
+/*
+ * Check/fix part of tree - branch - up from parent of level with blocks
+ * lblkno and rblknum. We first ensure that parent level has pointers
+ * to both lblkno & rblknum and if those pointers are on different
+ * parent pages then do the same for parent level, etc. No locks must
+ * be held on target level and upper on entry. No locks will be held
+ * on exit. Stack created when traversing tree down should be provided and
+ * it must points to parent level. rblkno must be on the right from lblkno.
+ * (This function is special edition of more expensive _bt_fixtree(),
+ * but it doesn't guarantee full consistency of tree.)
+ */
+static void
+_bt_fixbranch(Relation rel, BlockNumber lblkno, 
+				BlockNumber rblkno, BTStack true_stack)
+{
+	BlockNumber		blkno = true_stack->bts_blkno;
+	BTStackData		stack;
+	BTPageOpaque	opaque;
+	Buffer			buf, rbuf;
+	Page			page;
+	OffsetNumber	offnum;
+
+	true_stack = true_stack->bts_parent;
+	for ( ; ; )
+	{
+		buf = _bt_getbuf(rel, blkno, BT_READ);
+
+		/* Check/fix parent level pointed by blkno */
+		_bt_fixlevel(rel, buf, rblkno);
+
+		/*
+		 * Here parent level should have pointers for both
+		 * lblkno and rblkno and we have to find them.
+		 */
+		stack.bts_parent = NULL;
+		stack.bts_blkno = blkno;
+		stack.bts_offset = InvalidOffsetNumber;
+		ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), lblkno, P_HIKEY);
+		buf = _bt_getstackbuf(rel, &stack, BT_READ);
+		if (buf == InvalidBuffer)
+			elog(ERROR, "bt_fixbranch: left pointer unfound (need to recreate index)");
+		page = BufferGetPage(buf);
+		offnum = _bt_getoff(page, rblkno);
+
+		if (offnum != InvalidOffsetNumber)	/* right pointer found */
+		{
+			if (offnum <= stack.bts_offset)
+				elog(ERROR, "bt_fixbranch: invalid item order (need to recreate index)");
+			_bt_relbuf(rel, buf, BT_READ);
+			return;
+		}
+
+		/* Pointers are on different parent pages - find right one */
+		lblkno = BufferGetBlockNumber(buf);
+
+		stack.bts_parent = NULL;
+		stack.bts_blkno = lblkno;
+		stack.bts_offset = InvalidOffsetNumber;
+		ItemPointerSet(&(stack.bts_btitem.bti_itup.t_tid), rblkno, P_HIKEY);
+		rbuf = _bt_getstackbuf(rel, &stack, BT_READ);
+		if (rbuf == InvalidBuffer)
+			elog(ERROR, "bt_fixbranch: right pointer unfound (need to recreate index)");
+		rblkno = BufferGetBlockNumber(rbuf);
+		_bt_relbuf(rel, rbuf, BT_READ);
+
+		/*
+		 * If we have parent item in true_stack then go up one level and
+		 * ensure that it has pointers to new lblkno & rblkno.
+		 */
+		if (true_stack)
+		{
+			_bt_relbuf(rel, buf, BT_READ);
+			blkno = true_stack->bts_blkno;
+			true_stack = true_stack->bts_parent;
+			continue;
+		}
+
+		/*
+		 * Well, we are on the level that was root or unexistent when
+		 * we started traversing tree down. If btpo_parent is updated
+		 * then we'll use it to continue, else we'll fix/restore upper
+		 * levels entirely.
+		 */
+		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+		if (opaque->btpo_parent != BTREE_METAPAGE)
+		{
+			blkno = opaque->btpo_parent;
+			_bt_relbuf(rel, buf, BT_READ);
+			continue;
+		}
+
+		/* Have to switch to excl buf lock and re-check btpo_parent */
+		_bt_relbuf(rel, buf, BT_READ);
+		buf = _bt_getbuf(rel, blkno, BT_WRITE);
+		page = BufferGetPage(buf);
+		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+		if (opaque->btpo_parent != BTREE_METAPAGE)
+		{
+			blkno = opaque->btpo_parent;
+			_bt_relbuf(rel, buf, BT_WRITE);
+			continue;
+		}
+
+		/*
+		 * We hold excl lock on some internal page with unupdated
+		 * btpo_parent - time for _bt_fixup.
+		 */
+		break;
+	}
+
+	_bt_fixup(rel, buf);
+
+	return;
+}
+
+/*
+ * Having buf excl locked this routine walks to the left on level and
+ * uses either _bt_fixtree() or _bt_fixroot() to create/check&fix upper
+ * levels. No buffer pins/locks will be held on exit.
+ */
+static void
+_bt_fixup(Relation rel, Buffer buf)
+{
+	Page			page;
+	BTPageOpaque	opaque;
+	BlockNumber		blkno;
+
+	for ( ; ; )
+	{
+		page = BufferGetPage(buf);
+		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+		/*
+		 * If someone else already created parent pages
+		 * then it's time for _bt_fixtree() to check upper
+		 * levels and fix them, if required.
+		 */
+		if (opaque->btpo_parent != BTREE_METAPAGE)
+		{
+			blkno = opaque->btpo_parent;
+			_bt_relbuf(rel, buf, BT_WRITE);
+			_bt_fixtree(rel, blkno);
+			return;
+		}
+		if (P_LEFTMOST(opaque))
+			break;
+		blkno = opaque->btpo_prev;
+		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+		ReleaseBuffer(buf);
+		buf = _bt_getbuf(rel, blkno, BT_WRITE);
+	}
+
+	/*
+	 * Ok, we are on the leftmost page, it's write locked
+	 * by us and its btpo_parent points to meta page - time
+	 * for _bt_fixroot().
+	 */
+	 buf = _bt_fixroot(rel, buf, true);
+	 _bt_relbuf(rel, buf, BT_WRITE);
+
+	return;
+}
+
 static OffsetNumber
 _bt_getoff(Page page, BlockNumber blkno)
 {
-- 
2.24.1