/*-------------------------------------------------------------------------
 *
 * nbtree.c
 *	  Implementation of Lehman and Yao's btree management algorithm for
 *	  Postgres.
 *
 * NOTES
 *	  This file contains only the public interface routines.
 *
 *
 * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtree.c,v 1.66 2000/10/20 11:01:03 vadim Exp $
 *
 *-------------------------------------------------------------------------
 */

20
#include "postgres.h"
Marc G. Fournier's avatar
Marc G. Fournier committed
21

22 23
#include "access/genam.h"
#include "access/heapam.h"
Bruce Momjian's avatar
Bruce Momjian committed
24
#include "access/nbtree.h"
25
#include "catalog/index.h"
Bruce Momjian's avatar
Bruce Momjian committed
26
#include "executor/executor.h"
27
#include "miscadmin.h"
28
#include "storage/sinval.h"
29

30

31 32 33
bool		BuildingBtree = false;		/* see comment in btbuild() */
bool		FastBuild = true;	/* use sort/build instead of insertion
								 * build */
34

35
static void _bt_restscan(IndexScanDesc scan);
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
36

37
/*
38
 *	btbuild() -- build a new btree index.
39
 *
40 41 42 43
 *		We use a global variable to record the fact that we're creating
 *		a new index.  This is used to avoid high-concurrency locking,
 *		since the index won't be visible until this transaction commits
 *		and since building is guaranteed to be single-threaded.
44
 */
45 46
Datum
btbuild(PG_FUNCTION_ARGS)
47
{
48 49
	Relation		heap = (Relation) PG_GETARG_POINTER(0);
	Relation		index = (Relation) PG_GETARG_POINTER(1);
50 51
	IndexInfo	   *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
	Node		   *oldPred = (Node *) PG_GETARG_POINTER(3);
52
#ifdef NOT_USED
53
	IndexStrategy	istrat = (IndexStrategy) PG_GETARG_POINTER(4);
54
#endif
55 56 57 58 59
	HeapScanDesc hscan;
	HeapTuple	htup;
	IndexTuple	itup;
	TupleDesc	htupdesc,
				itupdesc;
60 61
	Datum		attdata[INDEX_MAX_KEYS];
	char		nulls[INDEX_MAX_KEYS];
62 63
	int			nhtups,
				nitups;
64
	Node	   *pred = indexInfo->ii_Predicate;
65
#ifndef OMIT_PARTIAL_INDEX
66 67
	TupleTable	tupleTable;
	TupleTableSlot *slot;
68
#endif
69 70
	ExprContext *econtext;
	InsertIndexResult res = NULL;
71
	BTSpool    *spool = NULL;
72
	BTItem		btitem;
73
	bool		usefast;
74 75 76 77 78 79 80 81 82 83
	Snapshot	snapshot;
	TransactionId	XmaxRecent;
	/*
	 * spool2 is needed only when the index is an unique index.
	 * Dead tuples are put into spool2 instead of spool in
	 * order to avoid uniqueness check.
	 */
	BTSpool		*spool2 = NULL;
	bool		tupleIsAlive;
	int		dead_count;
84

85 86 87
	/* note that this is a new btree */
	BuildingBtree = true;

88
	/*
89 90 91 92
	 * bootstrap processing does something strange, so don't use
	 * sort/build for initial catalog indices.	at some point i need to
	 * look harder at this.  (there is some kind of incremental processing
	 * going on there.) -- pma 08/29/95
93
	 */
94 95 96
	usefast = (FastBuild && IsNormalProcessingMode());

#ifdef BTREE_BUILD_STATS
97
	if (Show_btree_build_stats)
98
		ResetUsage();
99
#endif /* BTREE_BUILD_STATS */
100 101 102 103 104 105

	/* initialize the btree index metadata page (if this is a new index) */
	if (oldPred == NULL)
		_bt_metapinit(index);

	/* get tuple descriptors for heap and index relations */
106 107
	htupdesc = RelationGetDescr(heap);
	itupdesc = RelationGetDescr(index);
108

109
	/*
110 111 112 113
	 * If this is a predicate (partial) index, we will need to evaluate
	 * the predicate using ExecQual, which requires the current tuple to
	 * be in a slot of a TupleTable.  In addition, ExecQual must have an
	 * ExprContext referring to that slot.	Here, we initialize dummy
114 115 116 117
	 * TupleTable and ExprContext objects for this purpose. --Nels, Feb 92
	 *
	 * We construct the ExprContext anyway since we need a per-tuple
	 * temporary memory context for function evaluation -- tgl July 00
118
	 */
119 120 121 122 123
#ifndef OMIT_PARTIAL_INDEX
	if (pred != NULL || oldPred != NULL)
	{
		tupleTable = ExecCreateTupleTable(1);
		slot = ExecAllocTableSlot(tupleTable);
124
		ExecSetSlotDescriptor(slot, htupdesc);
125 126 127 128 129 130 131 132 133 134

		/*
		 * we never want to use sort/build if we are extending an existing
		 * partial index -- it works by inserting the newly-qualifying
		 * tuples into the existing index. (sort/build would overwrite the
		 * existing index with one consisting of the newly-qualifying
		 * tuples.)
		 */
		usefast = false;
	}
135 136 137 138 139 140 141 142
	else
	{
		tupleTable = NULL;
		slot = NULL;
	}
	econtext = MakeExprContext(slot, TransactionCommandContext);
#else
	econtext = MakeExprContext(NULL, TransactionCommandContext);
143
#endif	 /* OMIT_PARTIAL_INDEX */
144 145 146 147 148

	/* build the index */
	nhtups = nitups = 0;

	if (usefast)
149
	{
150
		spool = _bt_spoolinit(index, indexInfo->ii_Unique);
151 152 153 154 155 156 157
		/*
	 	 * Different from spool,the uniqueness isn't checked
		 * for spool2.
	 	 */
		if (indexInfo->ii_Unique)
			spool2 = _bt_spoolinit(index, false);
	}
158

159
	/* start a heap scan */
160 161 162 163 164 165
	dead_count = 0;
	snapshot = (IsBootstrapProcessingMode() ? SnapshotNow : SnapshotAny);
	hscan = heap_beginscan(heap, 0, snapshot, 0, (ScanKey) NULL);
	XmaxRecent = 0;
	if (snapshot == SnapshotAny)
		GetXmaxRecent(&XmaxRecent);
166

167 168
	while (HeapTupleIsValid(htup = heap_getnext(hscan, 0)))
	{
169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
		if (snapshot == SnapshotAny)
		{
			tupleIsAlive = HeapTupleSatisfiesNow(htup->t_data);
			if (!tupleIsAlive)
			{
				if ((htup->t_data->t_infomask & HEAP_XMIN_INVALID) != 0)
					continue;
				if (htup->t_data->t_infomask & HEAP_XMAX_COMMITTED &&
					htup->t_data->t_xmax < XmaxRecent)
					continue;
			}
		}
		else
			tupleIsAlive = true;
		
184 185
		MemoryContextReset(econtext->ecxt_per_tuple_memory);

186 187
		nhtups++;

188
#ifndef OMIT_PARTIAL_INDEX
189 190 191 192 193 194 195
		/*
		 * If oldPred != NULL, this is an EXTEND INDEX command, so skip
		 * this tuple if it was already in the existing partial index
		 */
		if (oldPred != NULL)
		{
			slot->val = htup;
196
			if (ExecQual((List *) oldPred, econtext, false))
197 198 199 200 201 202 203 204 205 206 207 208 209
			{
				nitups++;
				continue;
			}
		}

		/*
		 * Skip this tuple if it doesn't satisfy the partial-index
		 * predicate
		 */
		if (pred != NULL)
		{
			slot->val = htup;
210
			if (!ExecQual((List *) pred, econtext, false))
211 212
				continue;
		}
213
#endif	 /* OMIT_PARTIAL_INDEX */
214

215
		nitups++;
216 217 218 219 220

		/*
		 * For the current heap tuple, extract all the attributes we use
		 * in this index, and note which are null.
		 */
221 222 223 224 225 226
		FormIndexDatum(indexInfo,
					   htup,
					   htupdesc,
					   econtext->ecxt_per_tuple_memory,
					   attdata,
					   nulls);
227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248

		/* form an index tuple and point it at the heap tuple */
		itup = index_formtuple(itupdesc, attdata, nulls);

		/*
		 * If the single index key is null, we don't insert it into the
		 * index.  Btrees support scans on <, <=, =, >=, and >. Relational
		 * algebra says that A op B (where op is one of the operators
		 * above) returns null if either A or B is null.  This means that
		 * no qualification used in an index scan could ever return true
		 * on a null attribute.  It also means that indices can't be used
		 * by ISNULL or NOTNULL scans, but that's an artifact of the
		 * strategy map architecture chosen in 1986, not of the way nulls
		 * are handled here.
		 */

		/*
		 * New comments: NULLs handling. While we can't do NULL
		 * comparison, we can follow simple rule for ordering items on
		 * btree pages - NULLs greater NOT_NULLs and NULL = NULL is TRUE.
		 * Sure, it's just rule for placing/finding items and no more -
		 * keytest'll return FALSE for a = 5 for items having 'a' isNULL.
249 250
		 * Look at _bt_compare for how it works.
		 *				 - vadim 03/23/97
251 252 253 254
		 *
		 * if (itup->t_info & INDEX_NULL_MASK) { pfree(itup); continue; }
		 */

255
		itup->t_tid = htup->t_self;
256 257 258 259
		btitem = _bt_formitem(itup);

		/*
		 * if we are doing bottom-up btree build, we insert the index into
260
		 * a spool file for subsequent processing.	otherwise, we insert
261 262 263
		 * into the btree.
		 */
		if (usefast)
264 265 266 267 268 269 270 271 272
		{
			if (tupleIsAlive || !spool2)
				_bt_spool(btitem, spool);
			else /* dead tuples are put into spool2 */
			{
				dead_count++;
				_bt_spool(btitem, spool2);
			}
		}
273
		else
274
			res = _bt_doinsert(index, btitem, indexInfo->ii_Unique, heap);
275 276 277 278 279

		pfree(btitem);
		pfree(itup);
		if (res)
			pfree(res);
280
	}
281 282 283

	/* okay, all heap tuples are indexed */
	heap_endscan(hscan);
284 285 286 287 288
	if (spool2 && !dead_count) /* spool2 was found to be unnecessary */
	{
		_bt_spooldestroy(spool2);
		spool2 = NULL;
	}
289

290
#ifndef OMIT_PARTIAL_INDEX
291 292
	if (pred != NULL || oldPred != NULL)
	{
293
		ExecDropTupleTable(tupleTable, true);
294
	}
295
#endif	 /* OMIT_PARTIAL_INDEX */
296
	FreeExprContext(econtext);
297

298
	/*
299 300 301
	 * if we are doing bottom-up btree build, finish the build by (1)
	 * completing the sort of the spool file, (2) inserting the sorted
	 * tuples into btree pages and (3) building the upper levels.
302
	 */
303 304
	if (usefast)
	{
305
		_bt_leafbuild(spool, spool2);
306
		_bt_spooldestroy(spool);
307 308
		if (spool2)
			_bt_spooldestroy(spool2);
309
	}
310 311

#ifdef BTREE_BUILD_STATS
312
	if (Show_btree_build_stats)
313
	{
314
		fprintf(stderr, "BTREE BUILD STATS\n");
315 316
		ShowUsage();
		ResetUsage();
317
	}
318
#endif /* BTREE_BUILD_STATS */
319 320

	/*
321 322
	 * Since we just counted the tuples in the heap, we update its stats
	 * in pg_class to guarantee that the planner takes advantage of the
323 324 325 326 327 328 329
	 * index we just created.  But, only update statistics during normal
	 * index definitions, not for indices on system catalogs created
	 * during bootstrap processing.  We must close the relations before
	 * updating statistics to guarantee that the relcache entries are
	 * flushed when we increment the command counter in UpdateStats(). But
	 * we do not release any locks on the relations; those will be held
	 * until end of transaction.
330
	 */
331 332
	if (IsNormalProcessingMode())
	{
333 334
		Oid			hrelid = RelationGetRelid(heap);
		Oid			irelid = RelationGetRelid(index);
Hiroshi Inoue's avatar
Hiroshi Inoue committed
335
		bool		inplace = IsReindexProcessing();
336 337

		heap_close(heap, NoLock);
338
		index_close(index);
339

Hiroshi Inoue's avatar
Hiroshi Inoue committed
340 341
		UpdateStats(hrelid, nhtups, inplace);
		UpdateStats(irelid, nitups, inplace);
342 343 344 345
		if (oldPred != NULL)
		{
			if (nitups == nhtups)
				pred = NULL;
Hiroshi Inoue's avatar
Hiroshi Inoue committed
346 347
			if (!inplace)
				UpdateIndexPredicate(irelid, oldPred, pred);
348
		}
349 350
	}

351 352
	/* all done */
	BuildingBtree = false;
353

354
	PG_RETURN_VOID();
355 356 357
}

/*
358
 *	btinsert() -- insert an index tuple into a btree.
359
 *
360 361 362
 *		Descend the tree recursively, find the appropriate location for our
 *		new tuple, put it there, set its unique OID as appropriate, and
 *		return an InsertIndexResult to the caller.
363
 */
364 365
Datum
btinsert(PG_FUNCTION_ARGS)
366
{
367 368 369 370 371 372
	Relation		rel = (Relation) PG_GETARG_POINTER(0);
	Datum		   *datum = (Datum *) PG_GETARG_POINTER(1);
	char		   *nulls = (char *) PG_GETARG_POINTER(2);
	ItemPointer		ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
	Relation		heapRel = (Relation) PG_GETARG_POINTER(4);
	InsertIndexResult res;
373 374
	BTItem		btitem;
	IndexTuple	itup;
375 376

	/* generate an index tuple */
377
	itup = index_formtuple(RelationGetDescr(rel), datum, nulls);
378 379 380
	itup->t_tid = *ht_ctid;
	btitem = _bt_formitem(itup);

381
	res = _bt_doinsert(rel, btitem, rel->rd_uniqueindex, heapRel);
382 383 384 385

	pfree(btitem);
	pfree(itup);

386
	PG_RETURN_POINTER(res);
387 388 389
}

/*
 *	btgettuple() -- Get the next tuple in the scan.
 *
 *		Arguments (fmgr-style): the IndexScanDesc and a ScanDirection.
 *		Returns a RetrieveIndexResult pointer (NULL when the scan is
 *		exhausted).
 */
Datum
btgettuple(PG_FUNCTION_ARGS)
{
	IndexScanDesc		scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ScanDirection		dir = (ScanDirection) PG_GETARG_INT32(1);
	RetrieveIndexResult res;

	/*
	 * If we've already initialized this scan, we can just advance it in
	 * the appropriate direction.  If we haven't done so yet, we call a
	 * routine to get the first item in the scan.
	 */

	if (ItemPointerIsValid(&(scan->currentItemData)))
	{
		/*
		 * Restore scan position using heap TID returned by previous call
		 * to btgettuple(). _bt_restscan() re-grabs the read lock on
		 * the buffer, too.
		 */
		_bt_restscan(scan);
		res = _bt_next(scan, dir);
	}
	else
		res = _bt_first(scan, dir);

	/*
	 * Save heap TID to use it in _bt_restscan.  Then release the read
	 * lock on the buffer so that we aren't blocking other backends.
	 * NOTE: we do keep the pin on the buffer!
	 */
	if (res)
	{
		((BTScanOpaque) scan->opaque)->curHeapIptr = res->heap_iptr;
		LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf,
				   BUFFER_LOCK_UNLOCK);
	}

	PG_RETURN_POINTER(res);
}

/*
434
 *	btbeginscan() -- start a scan on a btree index
435
 */
436 437
Datum
btbeginscan(PG_FUNCTION_ARGS)
438
{
439 440 441 442
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	bool		fromEnd = PG_GETARG_BOOL(1);
	uint16		keysz = PG_GETARG_UINT16(2);
	ScanKey		scankey = (ScanKey) PG_GETARG_POINTER(3);
443
	IndexScanDesc scan;
444 445 446 447 448 449 450

	/* get the scan */
	scan = RelationGetIndexScan(rel, fromEnd, keysz, scankey);

	/* register scan in case we change pages it's using */
	_bt_regscan(scan);

451
	PG_RETURN_POINTER(scan);
452 453 454
}

/*
 *	btrescan() -- rescan an index relation
 *
 *		Also does first-time initialization of the BTScanOpaque state
 *		when invoked right after btbeginscan (scan->opaque still NULL).
 */
Datum
btrescan(PG_FUNCTION_ARGS)
{
	IndexScanDesc	scan = (IndexScanDesc) PG_GETARG_POINTER(0);
#ifdef NOT_USED					/* XXX surely it's wrong to ignore this? */
	bool			fromEnd = PG_GETARG_BOOL(1);
#endif
	ScanKey			scankey = (ScanKey) PG_GETARG_POINTER(2);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	if (so == NULL)				/* if called from btbeginscan */
	{
		/* allocate and zero-out the per-scan opaque state */
		so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
		so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer;
		so->keyData = (ScanKey) NULL;
		if (scan->numberOfKeys > 0)
			so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
		scan->opaque = so;
		scan->flags = 0x0;
	}

	/* we aren't holding any read locks, but gotta drop the pins */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	/*
	 * Reset the scan keys. Note that keys ordering stuff moved to
	 * _bt_first.	   - vadim 05/05/97
	 */
	so->numberOfKeys = scan->numberOfKeys;
	if (scan->numberOfKeys > 0)
	{
		memmove(scan->keyData,
				scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
		memmove(so->keyData,
				scankey,
				so->numberOfKeys * sizeof(ScanKeyData));
	}

	PG_RETURN_VOID();
}

/*
 * btmovescan() -- restart a scan with a new first-key argument.
 *
 * Drops the pin on the current buffer (no read lock is held at this
 * point) and invalidates the current position, then installs v as the
 * argument of the first scan key.
 */
void
btmovescan(IndexScanDesc scan, Datum v)
{
	BTScanOpaque opaqueState = (BTScanOpaque) scan->opaque;
	ItemPointer curPos = &(scan->currentItemData);

	/* we aren't holding any read locks, but gotta drop the pin */
	if (ItemPointerIsValid(curPos))
	{
		ReleaseBuffer(opaqueState->btso_curbuf);
		opaqueState->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(curPos);
	}

	opaqueState->keyData[0].sk_argument = v;
}

/*
 *	btendscan() -- close down a scan
 *
 *		Releases remaining buffer pins, frees the per-scan key array and
 *		opaque state, and unregisters the scan.
 */
Datum
btendscan(PG_FUNCTION_ARGS)
{
	IndexScanDesc	scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pins */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		if (BufferIsValid(so->btso_curbuf))
			ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		if (BufferIsValid(so->btso_mrkbuf))
			ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	/* free scan-local memory */
	if (so->keyData != (ScanKey) NULL)
		pfree(so->keyData);
	pfree(so);

	/* remove this scan from the list maintained for _bt_adjscans */
	_bt_dropscan(scan);

	PG_RETURN_VOID();
}

/*
 *	btmarkpos() -- save current scan position
 *
 *		Drops any previously-marked position, then duplicates the pin on
 *		the current buffer and copies the current item pointer and heap
 *		TID into the mark fields.
 */
Datum
btmarkpos(PG_FUNCTION_ARGS)
{
	IndexScanDesc	scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pin */
	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	/* bump pin on current buffer for assignment to mark buffer */
	if (ItemPointerIsValid(&(scan->currentItemData)))
	{
		/* ReadBuffer on the same block acquires a second pin */
		so->btso_mrkbuf = ReadBuffer(scan->relation,
								  BufferGetBlockNumber(so->btso_curbuf));
		scan->currentMarkData = scan->currentItemData;
		so->mrkHeapIptr = so->curHeapIptr;
	}

	PG_RETURN_VOID();
}

/*
 *	btrestrpos() -- restore scan to last saved position
 *
 *		Mirror image of btmarkpos(): drops the pin on the current buffer
 *		and re-establishes the current position from the mark fields.
 */
Datum
btrestrpos(PG_FUNCTION_ARGS)
{
	IndexScanDesc	scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pin */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	/* bump pin on marked buffer */
	if (ItemPointerIsValid(&(scan->currentMarkData)))
	{
		/* ReadBuffer on the same block acquires a second pin */
		so->btso_curbuf = ReadBuffer(scan->relation,
								  BufferGetBlockNumber(so->btso_mrkbuf));
		scan->currentItemData = scan->currentMarkData;
		so->curHeapIptr = so->mrkHeapIptr;
	}

	PG_RETURN_VOID();
}

/* stubs */
636 637
Datum
btdelete(PG_FUNCTION_ARGS)
638
{
639 640 641
	Relation		rel = (Relation) PG_GETARG_POINTER(0);
	ItemPointer		tid = (ItemPointer) PG_GETARG_POINTER(1);

642
	/* adjust any active scans that will be affected by this deletion */
643
	_bt_adjscans(rel, tid);
644 645 646

	/* delete the data from the page */
	_bt_pagedel(rel, tid);
647

648
	PG_RETURN_VOID();
649
}
Vadim B. Mikheev's avatar
Vadim B. Mikheev committed
650

651 652 653
/*
 * Restore scan position when btgettuple is called to continue a scan.
 *
 * While the read lock was released (between btgettuple calls), the item
 * the scan was positioned on may have been moved right by concurrent
 * insertions/page splits; we relocate it by its heap TID, following
 * right-links as far as necessary.  On return, the read lock on the
 * (possibly new) current buffer is held and scan->currentItemData and
 * so->btso_curbuf are updated.
 */
static void
_bt_restscan(IndexScanDesc scan)
{
	Relation	rel = scan->relation;
	BTScanOpaque so = (BTScanOpaque) scan->opaque;
	Buffer		buf = so->btso_curbuf;
	Page		page;
	ItemPointer current = &(scan->currentItemData);
	OffsetNumber offnum = ItemPointerGetOffsetNumber(current),
				maxoff;
	BTPageOpaque opaque;
	ItemPointerData target = so->curHeapIptr;	/* heap TID we were on */
	BTItem		item;
	BlockNumber blkno;

	/*
	 * Get back the read lock we were holding on the buffer.
	 * (We still have a reference-count pin on it, though.)
	 */
	LockBuffer(buf, BT_READ);

	page = BufferGetPage(buf);
	maxoff = PageGetMaxOffsetNumber(page);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/*
	 * We use this as flag when first index tuple on page is deleted but
	 * we do not move left (this would slowdown vacuum) - so we set
	 * current->ip_posid before first index tuple on the current page
	 * (_bt_step will move it right)...
	 */
	if (!ItemPointerIsValid(&target))
	{
		ItemPointerSetOffsetNumber(current,
								   OffsetNumberPrev(P_FIRSTDATAKEY(opaque)));
		return;
	}

	/*
	 * The item we were on may have moved right due to insertions.
	 * Find it again.
	 */
	for (;;)
	{
		/* Check for item on this page */
		for (;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			item = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
			/* compare heap TIDs field-by-field (block hi/lo and posid) */
			if (item->bti_itup.t_tid.ip_blkid.bi_hi ==
				target.ip_blkid.bi_hi &&
				item->bti_itup.t_tid.ip_blkid.bi_lo ==
				target.ip_blkid.bi_lo &&
				item->bti_itup.t_tid.ip_posid == target.ip_posid)
			{
				current->ip_posid = offnum;
				return;
			}
		}

		/*
		 * By here, the item we're looking for moved right at least one page
		 */
		if (P_RIGHTMOST(opaque))
			elog(FATAL, "_bt_restscan: my bits moved right off the end of the world!"
				 "\n\tRecreate index %s.", RelationGetRelationName(rel));

		/* step right: release this page, read-lock the next one */
		blkno = opaque->btpo_next;
		_bt_relbuf(rel, buf, BT_READ);
		buf = _bt_getbuf(rel, blkno, BT_READ);
		page = BufferGetPage(buf);
		maxoff = PageGetMaxOffsetNumber(page);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		offnum = P_FIRSTDATAKEY(opaque);
		ItemPointerSet(current, blkno, offnum);
		so->btso_curbuf = buf;
	}
}
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839

#ifdef XLOG
/*
 * btree_redo() -- dispatch a btree WAL record to the proper redo handler.
 */
void btree_redo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8	info = record->xl_info & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_BTREE_DELETE:
			btree_xlog_delete(true, lsn, record);
			break;
		case XLOG_BTREE_INSERT:
			btree_xlog_insert(true, lsn, record);
			break;
		case XLOG_BTREE_SPLIT:
			btree_xlog_split(true, false, lsn, record);	/* new item on the right */
			break;
		case XLOG_BTREE_SPLEFT:
			btree_xlog_split(true, true, lsn, record);	/* new item on the left */
			break;
		case XLOG_BTREE_NEWROOT:
			btree_xlog_newroot(true, lsn, record);
			break;
		default:
			elog(STOP, "btree_redo: unknown op code %u", info);
	}
}

/*
 * btree_undo() -- dispatch a btree WAL record to the proper undo handler.
 */
void btree_undo(XLogRecPtr lsn, XLogRecord *record)
{
	uint8	info = record->xl_info & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_BTREE_DELETE:
			btree_xlog_delete(false, lsn, record);
			break;
		case XLOG_BTREE_INSERT:
			btree_xlog_insert(false, lsn, record);
			break;
		case XLOG_BTREE_SPLIT:
			btree_xlog_split(false, false, lsn, record);/* new item on the right */
			break;
		case XLOG_BTREE_SPLEFT:
			btree_xlog_split(false, true, lsn, record);	/* new item on the left */
			break;
		case XLOG_BTREE_NEWROOT:
			btree_xlog_newroot(false, lsn, record);
			break;
		default:
			elog(STOP, "btree_undo: unknown op code %u", info);
	}
}

/*
 * btree_xlog_delete() -- replay deletion of a single btree item.
 *
 * Fixes over the previous version:
 *   - `reln` was declared `Relation *` but assigned the `Relation` result
 *     of XLogOpenRelation() and passed to RelationIsValid()/XLogReadBuffer(),
 *     which take a Relation (see the sibling xlog routines in this file).
 *   - The buffer was never unlocked/released, the page LSN was never
 *     checked before applying the change, and the page was never marked
 *     dirty with its new LSN/SUI -- now handled exactly as in the redo
 *     path of btree_xlog_insert() below.
 */
static void btree_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_delete	   *xlrec;
	Relation			reln;
	Buffer				buffer;
	Page				page;

	/* deletion has no undo processing; only redo is handled */
	if (!redo)
		return;

	xlrec = (xl_btree_delete *) XLogRecGetData(record);
	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
	if (!RelationIsValid(reln))
		return;
	buffer = XLogReadBuffer(false, reln,
				ItemPointerGetBlockNumber(&(xlrec->target.tid)));
	if (!BufferIsValid(buffer))
		elog(STOP, "btree_delete_redo: block unfound");
	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(STOP, "btree_delete_redo: uninitialized page");

	/* if the page already reflects this record, nothing to do */
	if (XLByteLE(lsn, PageGetLSN(page)))
	{
		UnlockAndReleaseBuffer(buffer);
		return;
	}

	PageIndexTupleDelete(page, ItemPointerGetOffsetNumber(&(xlrec->target.tid)));

	PageSetLSN(page, lsn);
	PageSetSUI(page, ThisStartUpID);
	UnlockAndWriteBuffer(buffer);
}

/*
 * btree_xlog_insert() -- replay (redo) or roll back (undo) insertion of
 * a single btree item.
 *
 * Fix: `reln` was declared `Relation *` but assigned the `Relation`
 * result of XLogOpenRelation() and passed to RelationIsValid(),
 * XLogReadBuffer() and _bt_del_item(), all of which take a Relation.
 */
static void btree_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_insert	   *xlrec;
	Relation			reln;
	Buffer				buffer;
	Page				page;
	BTPageOpaque		pageop;

	xlrec = (xl_btree_insert *) XLogRecGetData(record);
	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
	if (!RelationIsValid(reln))
		return;
	buffer = XLogReadBuffer((redo) ? true : false, reln,
				ItemPointerGetBlockNumber(&(xlrec->target.tid)));
	if (!BufferIsValid(buffer))
		return;
	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(STOP, "btree_insert_%s: uninitialized page",
			(redo) ? "redo" : "undo");
	pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	if (redo)
	{
		/* skip if the page already reflects this record */
		if (XLByteLE(lsn, PageGetLSN(page)))
			UnlockAndReleaseBuffer(buffer);
		else
		{
			Size		hsize = SizeOfBtreeInsert;
			RelFileNode	hnode;

			if (P_ISLEAF(pageop))
			{
				/* leaf records carry a CommandId + heap RelFileNode header */
				hsize += (sizeof(CommandId) + sizeof(RelFileNode));
				memcpy(&hnode, (char *) xlrec + SizeOfBtreeInsert +
							sizeof(CommandId), sizeof(RelFileNode));
			}
			/*
			 * NOTE(review): for non-leaf pages hnode is left
			 * uninitialized here; presumably _bt_add_item ignores it in
			 * that case -- confirm against its definition.
			 */

			if (! _bt_add_item(page,
					ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
					(char *) xlrec + hsize,
					record->xl_len - hsize,
					hnode))
				elog(STOP, "btree_insert_redo: failed to add item");

			PageSetLSN(page, lsn);
			PageSetSUI(page, ThisStartUpID);
			UnlockAndWriteBuffer(buffer);
		}
	}
	else
	{
		BTItemData		btdata;

		if (XLByteLT(PageGetLSN(page), lsn))
			elog(STOP, "btree_insert_undo: bad page LSN");

		/* only leaf insertions need undo; internal pages are left alone */
		if (! P_ISLEAF(pageop))
		{
			UnlockAndReleaseBuffer(buffer);
			return;
		}

		memcpy(&btdata, (char *) xlrec + SizeOfBtreeInsert +
			sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));

		_bt_del_item(reln, buffer, &btdata, true, lsn, record);

	}

	return;
}

static void
btree_xlog_split(bool redo, bool onleft, XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_split	   *xlrec;
	Relation		   *reln;
	BlockNumber			blkno;
	BlockNumber			parent;
	Buffer				buffer;
	Page				page;
	BTPageOpaque		pageop;
	char			   *op = (redo) ? "redo" : "undo";
	bool				isleaf;

	xlrec = (xl_btree_split*) XLogRecGetData(record);
	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->target.node);
	if (!RelationIsValid(reln))
		return;

	/* Left (original) sibling */
	blkno = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
					BlockIdGetBlockNumber(xlrec->otherblk);
	buffer = XLogReadBuffer(false, reln, blkno);
	if (!BufferIsValid(buffer))
		elog(STOP, "btree_split_%s: lost left sibling", op);

	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(STOP, "btree_split_%s: uninitialized left sibling", op);

	pageop = (BTPageOpaque) PageGetSpecialPointer(page);
	isleaf = P_ISLEAF(pageop);
	parent = pageop->btpo_parent;

	if (redo)
	{
		if (XLByteLE(lsn, PageGetLSN(page)))
			UnlockAndReleaseBuffer(buffer);
		else
		{
			/* Delete items related to new right sibling */
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
911
			_bt_fix_left_page(page, record, onleft);
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
912 913 914 915 916 917 918 919 920 921 922 923 924 925 926

			if (onleft)
			{
				BTItemData	btdata;
				Size		hsize = SizeOfBtreeSplit;
				Size		itemsz;
				RelFileNode	hnode;

				pageop->btpo_next = BlockIdGetBlockNumber(xlrec->otherblk);
				if (isleaf)
				{
					hsize += (sizeof(CommandId) + sizeof(RelFileNode));
					memcpy(&hnode, (char*)xlrec + SizeOfBtreeSplit + 
								sizeof(CommandId), sizeof(RelFileNode));
				}
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
927 928 929 930 931 932 933
				else
				{
					memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
					itemsz = IndexTupleDSize(btdata.bti_itup) +
								(sizeof(BTItemData) - sizeof(IndexTupleData));
					hsize += itemsz;
				}
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
934 935 936 937 938 939 940 941 942

				memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
				itemsz = IndexTupleDSize(btdata.bti_itup) +
							(sizeof(BTItemData) - sizeof(IndexTupleData));

				if (! _bt_add_item(page, 
						ItemPointerGetOffsetNumber(&(xlrec->target.tid)),
						(char*)xlrec + hsize,
						itemsz,
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
943
						hnode))
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003
					elog(STOP, "btree_split_redo: failed to add item");
			}
			else
				pageop->btpo_next = ItemPointerGetBlockNumber(&(xlrec->target.tid));

			PageSetLSN(page, lsn);
			PageSetSUI(page, ThisStartUpID);
			UnlockAndWriteBuffer(buffer);
		}
	}
	else	/* undo */
	{
		if (XLByteLT(PageGetLSN(page), lsn))
			elog(STOP, "btree_split_undo: bad left sibling LSN");

		if (! isleaf || ! onleft)
			UnlockAndReleaseBuffer(buffer);
		else
		{
			BTItemData		btdata;

			memcpy(&btdata, (char*)xlrec + SizeOfBtreeSplit + 
				sizeof(CommandId) + sizeof(RelFileNode), sizeof(BTItemData));

			_bt_del_item(reln, buffer, &btdata, false, lsn, record);
		}
	}

	/* Right (new) sibling */
	blkno = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) : 
					ItemPointerGetBlockNumber(&(xlrec->target.tid));
	buffer = XLogReadBuffer((redo) ? true : false, reln, blkno);
	if (!BufferIsValid(buffer))
		elog(STOP, "btree_split_%s: lost right sibling", op);

	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
	{
		if (!redo)
			elog(STOP, "btree_split_undo: uninitialized right sibling");
		PageInit(page, BufferGetPageSize(buffer), 0);
	}

	if (redo)
	{
		if (XLByteLE(lsn, PageGetLSN(page)))
			UnlockAndReleaseBuffer(buffer);
		else
		{
			Size		hsize = SizeOfBtreeSplit;
			BTItemData	btdata;
			Size		itemsz;

			_bt_pageinit(page, BufferGetPageSize(buffer));
			pageop = (BTPageOpaque) PageGetSpecialPointer(page);
			if (isleaf)
			{
				pageop->btpo_flags |= BTP_LEAF;
				hsize += (sizeof(CommandId) + sizeof(RelFileNode));
			}
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
1004 1005 1006 1007 1008 1009 1010
			else
			{
				memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
				itemsz = IndexTupleDSize(btdata.bti_itup) +
							(sizeof(BTItemData) - sizeof(IndexTupleData));
				hsize += itemsz;
			}
Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214
			if (onleft)		/* skip target item */
			{
				memcpy(&btdata, (char*)xlrec + hsize, sizeof(BTItemData));
				itemsz = IndexTupleDSize(btdata.bti_itup) +
							(sizeof(BTItemData) - sizeof(IndexTupleData));
				hsize += itemsz;
			}

			for (char* item = (char*)xlrec + hsize;
					item < (char*)record + record->xl_len; )
			{
				memcpy(&btdata, item, sizeof(BTItemData));
				itemsz = IndexTupleDSize(btdata.bti_itup) +
							(sizeof(BTItemData) - sizeof(IndexTupleData));
				itemsz = MAXALIGN(itemsz);
				if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,	
						LP_USED) == InvalidOffsetNumber)
					elog(STOP, "btree_split_redo: can't add item to right sibling");
				item += itemsz;
			}

			pageop->btpo_prev = (onleft) ? ItemPointerGetBlockNumber(&(xlrec->target.tid)) :
					BlockIdGetBlockNumber(xlrec->otherblk);
			pageop->btpo_next = BlockIdGetBlockNumber(xlrec->rightblk);
			pageop->btpo_parent = parent;

			PageSetLSN(page, lsn);
			PageSetSUI(page, ThisStartUpID);
			UnlockAndWriteBuffer(buffer);
		}
	}
	else	/* undo */
	{
		if (XLByteLT(PageGetLSN(page), lsn))
			elog(STOP, "btree_split_undo: bad right sibling LSN");

		if (! isleaf || onleft)
			UnlockAndReleaseBuffer(buffer);
		else
		{
			char		tbuf[BLCKSZ];
			int			cnt;
			char	   *item;
			Size		itemsz;

			item = (char*)xlrec + SizeOfBtreeSplit +
					sizeof(CommandId) + sizeof(RelFileNode);
			for (cnt = 0; item < (char*)record + record->xl_len; )
			{
				BTItem	btitem = (BTItem)
					(tbuf + cnt * (MAXALIGN(sizeof(BTItemData))));
				memcpy(btitem, item, sizeof(BTItemData));
				itemsz = IndexTupleDSize(btitem->bti_itup) +
							(sizeof(BTItemData) - sizeof(IndexTupleData));
				itemsz = MAXALIGN(itemsz);
				item += itemsz;
				cnt++;
			}
			cnt -= ItemPointerGetOffsetNumber(&(xlrec->target.tid));
			if (cnt < 0)
				elog(STOP, "btree_split_undo: target item unfound in right sibling");

			item = tbuf + cnt * (MAXALIGN(sizeof(BTItemData)));

			_bt_del_item(reln, buffer, (BTItem)item, false, lsn, record);
		}
	}

	/* Right (next) page */
	blkno = BlockIdGetBlockNumber(xlrec->rightblk);
	buffer = XLogReadBuffer(false, reln, blkno);
	if (!BufferIsValid(buffer))
		elog(STOP, "btree_split_%s: lost next right page", op);

	page = (Page) BufferGetPage(buffer);
	if (PageIsNew((PageHeader) page))
		elog(STOP, "btree_split_%s: uninitialized next right page", op);

	if (redo)
	{
		if (XLByteLE(lsn, PageGetLSN(page)))
			UnlockAndReleaseBuffer(buffer);
		else
		{
			pageop = (BTPageOpaque) PageGetSpecialPointer(page);
			pageop->btpo_prev = (onleft) ? BlockIdGetBlockNumber(xlrec->otherblk) :
					ItemPointerGetBlockNumber(&(xlrec->target.tid));

			PageSetLSN(page, lsn);
			PageSetSUI(page, ThisStartUpID);
			UnlockAndWriteBuffer(buffer);
		}
	}
	else	/* undo */
	{
		if (XLByteLT(PageGetLSN(page), lsn))
			elog(STOP, "btree_split_undo: bad next right page LSN");

		UnlockAndReleaseBuffer(buffer);
	}

}

/*
 * Redo the creation of a new btree root page.
 *
 * There is no undo action: a new root only becomes visible once the
 * metapage points at it, so on rollback it is simply never reached.
 *
 * The WAL record carries the root block number and, for a root created
 * by splitting the old root, the items to restore on the new root page.
 */
static void
btree_xlog_newroot(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
	xl_btree_newroot *xlrec;
	Relation	reln;			/* was erroneously declared Relation* */
	Buffer		buffer;
	Page		page;
	BTPageOpaque pageop;		/* was used below without a declaration */
	Buffer		metabuf;
	Page		metapg;

	if (!redo)					/* nothing to undo - see above */
		return;

	xlrec = (xl_btree_newroot *) XLogRecGetData(record);
	reln = XLogOpenRelation(redo, RM_BTREE_ID, xlrec->node);
	if (!RelationIsValid(reln))
		return;
	buffer = XLogReadBuffer(true, reln, BlockIdGetBlockNumber(&(xlrec->rootblk)));
	if (!BufferIsValid(buffer))
		elog(STOP, "btree_newroot_redo: no root page");
	metabuf = XLogReadBuffer(false, reln, BTREE_METAPAGE);
	if (!BufferIsValid(metabuf))	/* was testing buffer, not metabuf */
		elog(STOP, "btree_newroot_redo: no metapage");
	page = (Page) BufferGetPage(buffer);

	if (PageIsNew((PageHeader) page) || XLByteLT(PageGetLSN(page), lsn))
	{
		/* (re)initialize the root page from the record */
		_bt_pageinit(page, BufferGetPageSize(buffer));
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);

		pageop->btpo_flags |= BTP_ROOT;
		pageop->btpo_prev = pageop->btpo_next = P_NONE;
		pageop->btpo_parent = BTREE_METAPAGE;

		if (record->xl_len == SizeOfBtreeNewroot)	/* no children */
			pageop->btpo_flags |= BTP_LEAF;
		else
		{
			BTItemData	btdata;
			Size		itemsz;
			char	   *item;

			/*
			 * Items follow the fixed-size record header.  Each is added
			 * at FirstOffsetNumber, matching the order the original
			 * operation produced.
			 */
			for (item = (char *) xlrec + SizeOfBtreeNewroot;
				 item < (char *) record + record->xl_len;
				 item += itemsz)
			{
				memcpy(&btdata, item, sizeof(BTItemData));
				itemsz = IndexTupleDSize(btdata.bti_itup) +
					(sizeof(BTItemData) - sizeof(IndexTupleData));
				itemsz = MAXALIGN(itemsz);
				if (PageAddItem(page, (Item) item, itemsz, FirstOffsetNumber,
								LP_USED) == InvalidOffsetNumber)
					elog(STOP, "btree_newroot_redo: can't add item");
			}
		}

		PageSetLSN(page, lsn);
		PageSetSUI(page, ThisStartUpID);
		UnlockAndWriteBuffer(buffer);
	}
	else
		UnlockAndReleaseBuffer(buffer);

	/* Now repoint the metapage at the new root */
	metapg = BufferGetPage(metabuf);
	if (PageIsNew((PageHeader) metapg))
	{
		BTMetaPageData md;

		_bt_pageinit(metapg, BufferGetPageSize(metabuf));
		md.btm_magic = BTREE_MAGIC;
		md.btm_version = BTREE_VERSION;
		md.btm_root = P_NONE;
		md.btm_level = 0;
		/* was BTPageGetMeta(pg): "pg" is not declared anywhere here */
		memcpy((char *) BTPageGetMeta(metapg), (char *) &md, sizeof(md));
	}

	if (XLByteLT(PageGetLSN(metapg), lsn))
	{
		BTMetaPageData *metad = BTPageGetMeta(metapg);

		metad->btm_root = BlockIdGetBlockNumber(&(xlrec->rootblk));
		(metad->btm_level)++;
		PageSetLSN(metapg, lsn);
		PageSetSUI(metapg, ThisStartUpID);
		UnlockAndWriteBuffer(metabuf);
	}
	else
		UnlockAndReleaseBuffer(metabuf);

	return;
}

/*
 * UNDO insertion on *leaf* page:
 * - find inserted tuple (walking right from the given page if needed,
 *   since the tuple may have been moved by concurrent splits);
 * - delete it if the corresponding heap tuple was inserted by the same
 *   transaction (crash recovery), or mark it LP_DELETE for cleanup
 *   (normal rollback).
 *
 * 'buffer' is pinned and locked on entry; it is always released (written
 * or not) before returning.  'insert' selects the error-message wording
 * only ("insert" undo vs. "split" undo).
 */
static void
_bt_del_item(Relation reln, Buffer buffer, BTItem btitem, bool insert,
			 XLogRecPtr lsn, XLogRecord *record)
{
	char	   *xlrec = (char *) XLogRecGetData(record);
	Page		page = (Page) BufferGetPage(buffer);
	BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
	BlockNumber blkno;
	OffsetNumber offno;
	ItemId		lp;
	BTItem		item;

	/* Search this page, then keep moving right until found or proven lost */
	for (;;)
	{
		OffsetNumber maxoff = PageGetMaxOffsetNumber(page);

		for (offno = P_FIRSTDATAKEY(pageop);
			 offno <= maxoff;
			 offno = OffsetNumberNext(offno))
		{
			lp = PageGetItemId(page, offno);
			item = (BTItem) PageGetItem(page, lp);
			if (BTItemSame(item, btitem))
				break;
		}
		if (offno <= maxoff)
			break;				/* found on this page */

		offno = InvalidOffsetNumber;
		if (P_RIGHTMOST(pageop))
			break;
		blkno = pageop->btpo_next;
		UnlockAndReleaseBuffer(buffer);
		buffer = XLogReadBuffer(false, reln, blkno);
		if (!BufferIsValid(buffer))
			elog(STOP, "btree_%s_undo: lost right sibling",
				 (insert) ? "insert" : "split");
		page = (Page) BufferGetPage(buffer);
		if (PageIsNew((PageHeader) page))
			elog(STOP, "btree_%s_undo: uninitialized right sibling",
				 (insert) ? "insert" : "split");
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		/*
		 * Pages to the right with an older LSN cannot contain the tuple:
		 * it could only have moved there by a split logged after 'lsn'.
		 */
		if (XLByteLT(PageGetLSN(page), lsn))
			break;
	}

	if (offno == InvalidOffsetNumber)	/* not found */
	{
		if (!InRecovery)
			elog(STOP, "btree_%s_undo: lost target tuple in rollback",
				 (insert) ? "insert" : "split");
		UnlockAndReleaseBuffer(buffer);
		return;
	}

	lp = PageGetItemId(page, offno);

	if (InRecovery)				/* check heap tuple */
	{
		if (!ItemIdDeleted(lp))
		{
			int			result;
			CommandId	cid;
			RelFileNode hnode;
			Size		hsize = (insert) ? SizeOfBtreeInsert : SizeOfBtreeSplit;

			/*
			 * CommandId and the heap relation's RelFileNode were logged
			 * right after the fixed record header; use them to verify
			 * the heap tuple really belongs to the aborting xaction.
			 */
			memcpy(&cid, (char *) xlrec + hsize, sizeof(CommandId));
			memcpy(&hnode, (char *) xlrec + hsize + sizeof(CommandId), sizeof(RelFileNode));
			result = XLogIsOwnerOfTuple(hnode, &(btitem->bti_itup.t_tid),
										record->xl_xid, cid);
			if (result < 0)		/* not owner */
			{
				UnlockAndReleaseBuffer(buffer);
				return;
			}
		}
		PageIndexTupleDelete(page, offno);
		pageop = (BTPageOpaque) PageGetSpecialPointer(page);
		pageop->btpo_flags |= BTP_REORDER;
		UnlockAndWriteBuffer(buffer);
		return;
	}

	/* normal rollback */
	if (ItemIdDeleted(lp))		/* marked for deletion ?! */
		elog(STOP, "btree_%s_undo: deleted target tuple in rollback",
			 (insert) ? "insert" : "split");

	lp->lp_flags |= LP_DELETE;
	MarkBufferForCleanup(buffer, IndexPageCleanup);
	return;
}

/*
 * Add an item to 'page' at 'offno' during WAL redo.
 *
 * If 'offno' is beyond the current end of the page (items were
 * reordered by an undo on this page), fall back to appending and make
 * sure BTP_REORDER is set.  If the page is full and is a leaf, try
 * once to reclaim space by removing index tuples whose heap tuples are
 * dead (_bt_cleanup_page) and retry.
 *
 * Returns true on success, false if the item could not be placed.
 */
static bool
_bt_add_item(Page page, OffsetNumber offno,
			 char *item, Size size, RelFileNode hnode)
{
	BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);

	if (offno > PageGetMaxOffsetNumber(page) + 1)
	{
		if (!(pageop->btpo_flags & BTP_REORDER))
		{
			elog(NOTICE, "btree_add_item: BTP_REORDER flag was expected");
			pageop->btpo_flags |= BTP_REORDER;
		}
		offno = PageGetMaxOffsetNumber(page) + 1;
	}

	if (PageAddItem(page, (Item) item, size, offno,
					LP_USED) == InvalidOffsetNumber)
	{
		/* oops, not enough space - try to delete dead tuples */
		bool		result;

		if (!P_ISLEAF(pageop))
			return (false);
		result = _bt_cleanup_page(page, hnode);
		if (!result || PageAddItem(page, (Item) item, size, offno,
								   LP_USED) == InvalidOffsetNumber)
			return (false);
	}

	return (true);
}

Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454
static bool
_bt_cleanup_page(Page page, RelFileNode hnode)
{
	OffsetNumber	maxoff = PageGetMaxOffsetNumber(page);
	OffsetNumber	offno;
	ItemId			lp;
	BTItem			item;
	bool			result = false;

	for (offno = P_FIRSTDATAKEY(pageop); offno <= maxoff; )
	{
		lp = PageGetItemId(page, offno);
		item = (BTItem) PageGetItem(page, lp);
		if (XLogIsValidTuple(hnode, &(item->bti_itup.t_tid))
			offno = OffsetNumberNext(offno);
		else
		{
			PageIndexTupleDelete(page, offno);
			maxoff = PageGetMaxOffsetNumber(page);
			result = true;
		}
	}

	return(result);
}

/*
 * Remove from left sibling items belonging to right sibling
 * and change P_HIKEY.
 *
 * Called during split redo: the WAL record holds the items that were
 * moved to the right sibling; each is located on the left page (if
 * present) and deleted.  Afterwards a new high key is installed on the
 * left page: for a non-leaf split it was logged explicitly, for a leaf
 * split it is the last item that remains / was moved (copy of the
 * first right-sibling item).
 */
static void
_bt_fix_left_page(Page page, XLogRecord *record, bool onleft)
{
	char	   *xlrec = (char *) XLogRecGetData(record);
	BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(page);
	Size		hsize = SizeOfBtreeSplit;
	RelFileNode hnode;
	BTItemData	btdata;
	OffsetNumber maxoff = PageGetMaxOffsetNumber(page);
	OffsetNumber offno;
	char	   *item;
	Size		itemsz;
	char	   *previtem = NULL;
	char	   *lhikey = NULL;
	Size		lhisize = 0;

	/*
	 * Zero hnode so the non-leaf path below doesn't pass an
	 * indeterminate struct by value (it is only consulted for leaf
	 * pages, but must still be a defined value).
	 */
	memset(&hnode, 0, sizeof(hnode));

	if (pageop->btpo_flags & BTP_LEAF)
	{
		/* leaf split records carry CommandId + heap RelFileNode */
		hsize += (sizeof(CommandId) + sizeof(RelFileNode));
		memcpy(&hnode, (char *) xlrec + SizeOfBtreeSplit +
			   sizeof(CommandId), sizeof(RelFileNode));
	}
	else
	{
		/* non-leaf: the new left hi-key was logged first */
		lhikey = (char *) xlrec + hsize;
		memcpy(&btdata, lhikey, sizeof(BTItemData));
		lhisize = IndexTupleDSize(btdata.bti_itup) +
			(sizeof(BTItemData) - sizeof(IndexTupleData));
		hsize += lhisize;
	}

	if (!P_RIGHTMOST(pageop))
		PageIndexTupleDelete(page, P_HIKEY);

	if (onleft)					/* skip target item */
	{
		memcpy(&btdata, (char *) xlrec + hsize, sizeof(BTItemData));
		itemsz = IndexTupleDSize(btdata.bti_itup) +
			(sizeof(BTItemData) - sizeof(IndexTupleData));
		hsize += itemsz;
	}

	/* delete every logged right-sibling item that is still on this page */
	for (item = (char *) xlrec + hsize;;)
	{
		memcpy(&btdata, item, sizeof(BTItemData));
		for (offno = P_FIRSTDATAKEY(pageop);
			 offno <= maxoff;
			 offno = OffsetNumberNext(offno))
		{
			ItemId		lp = PageGetItemId(page, offno);
			BTItem		btitem = (BTItem) PageGetItem(page, lp);

			if (BTItemSame(&btdata, btitem))
			{
				PageIndexTupleDelete(page, offno);
				break;
			}
		}

		itemsz = IndexTupleDSize(btdata.bti_itup) +
			(sizeof(BTItemData) - sizeof(IndexTupleData));
		itemsz = MAXALIGN(itemsz);

		if (item + itemsz < (char *) record + record->xl_len)
		{
			previtem = item;
			item += itemsz;
		}
		else
			break;
	}

	/* time to insert hi-key */
	if (pageop->btpo_flags & BTP_LEAF)
	{
		/* leaf hi-key: copy of the first item moved to the right page */
		lhikey = (P_RIGHTMOST(pageop)) ? item : previtem;
		memcpy(&btdata, lhikey, sizeof(BTItemData));
		lhisize = IndexTupleDSize(btdata.bti_itup) +
			(sizeof(BTItemData) - sizeof(IndexTupleData));
	}

	/*
	 * _bt_add_item takes RelFileNode by value; the original passed
	 * &hnode here, a pointer/struct mismatch.
	 */
	if (!_bt_add_item(page,
					  P_HIKEY,
					  lhikey,
					  lhisize,
					  hnode))
		elog(STOP, "btree_split_redo: failed to add hi key to left sibling");

	return;
}

Vadim B. Mikheev's avatar
WAL  
Vadim B. Mikheev committed
1455
#endif