/*-------------------------------------------------------------------------
 *
 * nbtree.c
 *	  Implementation of Lehman and Yao's btree management algorithm for
 *	  Postgres.
 *
 * NOTES
 *	  This file contains only the public interface routines.
 *
 *
 * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtree.c,v 1.122 2004/11/17 03:13:38 neilc Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/smgr.h"


/* Working state for btbuild and its callback */
typedef struct
{
	bool		usefast;
	bool		isUnique;
	bool		haveDead;
	Relation	heapRel;
	BTSpool    *spool;

	/*
	 * spool2 is needed only when the index is a unique index.  Dead
	 * tuples are put into spool2 instead of spool in order to avoid
	 * the uniqueness check.
	 */
	BTSpool    *spool2;
	double		indtuples;
} BTBuildState;


bool		FastBuild = true;	/* use SORT instead of insertion build */

static void _bt_restscan(IndexScanDesc scan);
static void btbuildCallback(Relation index,
				HeapTuple htup,
				Datum *attdata,
				char *nulls,
				bool tupleIsAlive,
				void *state);


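/*
 * All of the public entry points below use the fmgr calling convention:
 * the actual arguments arrive packed in fcinfo and are unpacked with the
 * PG_GETARG_xxx macros.  The backend normally reaches these functions
 * through the pg_am catalog entries for the btree access method; as an
 * illustrative sketch only, a direct call would look like:
 *
 *		Datum	d = DirectFunctionCall2(btgettuple,
 *										PointerGetDatum(scan),
 *										Int32GetDatum(ForwardScanDirection));
 */
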
/*
 *	btbuild() -- build a new btree index.
 */
Datum
btbuild(PG_FUNCTION_ARGS)
{
	Relation	heap = (Relation) PG_GETARG_POINTER(0);
	Relation	index = (Relation) PG_GETARG_POINTER(1);
	IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
	double		reltuples;
	BTBuildState buildstate;

	/*
	 * Bootstrap processing does something strange, so don't use
	 * sort/build for initial catalog indices.  At some point I need to
	 * look harder at this.  (There is some kind of incremental processing
	 * going on there.) -- pma 08/29/95
	 */
	buildstate.usefast = (FastBuild && IsNormalProcessingMode());
	buildstate.isUnique = indexInfo->ii_Unique;
	buildstate.haveDead = false;
	buildstate.heapRel = heap;
	buildstate.spool = NULL;
	buildstate.spool2 = NULL;
	buildstate.indtuples = 0;

#ifdef BTREE_BUILD_STATS
	if (log_btree_build_stats)
		ResetUsage();
#endif   /* BTREE_BUILD_STATS */

	/*
	 * We expect to be called exactly once for any index relation.  If
	 * that's not the case, we're in big trouble.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	if (buildstate.usefast)
	{
		buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique, false);

		/*
		 * If building a unique index, put dead tuples in a second spool
		 * to keep them out of the uniqueness check.
		 */
		if (indexInfo->ii_Unique)
			buildstate.spool2 = _bt_spoolinit(index, false, true);
	}
	else
	{
		/* if using slow build, initialize the btree index metadata page */
		_bt_metapinit(index);
	}

	/* do the heap scan */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo,
								   btbuildCallback, (void *) &buildstate);

	/* okay, all heap tuples are indexed */
	if (buildstate.spool2 && !buildstate.haveDead)
	{
		/* spool2 turns out to be unnecessary */
		_bt_spooldestroy(buildstate.spool2);
		buildstate.spool2 = NULL;
	}

	/*
	 * If we are doing a bottom-up btree build, finish the build by (1)
	 * completing the sort of the spool file, (2) inserting the sorted
	 * tuples into btree pages and (3) building the upper levels.
	 */
	if (buildstate.usefast)
	{
		_bt_leafbuild(buildstate.spool, buildstate.spool2);
		_bt_spooldestroy(buildstate.spool);
		if (buildstate.spool2)
			_bt_spooldestroy(buildstate.spool2);
	}

#ifdef BTREE_BUILD_STATS
	if (log_btree_build_stats)
	{
		ShowUsage("BTREE BUILD STATS");
		ResetUsage();
	}
#endif   /* BTREE_BUILD_STATS */

	/*
	 * Since we just counted the tuples in the heap, we update its stats
	 * in pg_class to guarantee that the planner takes advantage of the
	 * index we just created.  But we only update statistics during
	 * normal index definitions, not for indices on system catalogs
	 * created during bootstrap processing.  We must close the relations
	 * before updating statistics to guarantee that the relcache entries
	 * are flushed when we increment the command counter in UpdateStats().
	 * But we do not release any locks on the relations; those will be
	 * held until end of transaction.
	 */
	if (IsNormalProcessingMode())
	{
		Oid			hrelid = RelationGetRelid(heap);
		Oid			irelid = RelationGetRelid(index);

		heap_close(heap, NoLock);
		index_close(index);
		UpdateStats(hrelid, reltuples);
		UpdateStats(irelid, buildstate.indtuples);
	}

	PG_RETURN_VOID();
}

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
btbuildCallback(Relation index,
				HeapTuple htup,
				Datum *attdata,
				char *nulls,
				bool tupleIsAlive,
				void *state)
{
	BTBuildState *buildstate = (BTBuildState *) state;
	IndexTuple	itup;
	BTItem		btitem;
	InsertIndexResult res;

	/* form an index tuple and point it at the heap tuple */
	itup = index_formtuple(RelationGetDescr(index), attdata, nulls);
	itup->t_tid = htup->t_self;

	btitem = _bt_formitem(itup);

	/*
	 * If we are doing a bottom-up btree build, we insert the index tuple
	 * into a spool file for subsequent processing.  Otherwise, we insert
	 * it into the btree.
	 */
	if (buildstate->usefast)
	{
		if (tupleIsAlive || buildstate->spool2 == NULL)
			_bt_spool(btitem, buildstate->spool);
		else
		{
			/* dead tuples are put into spool2 */
			buildstate->haveDead = true;
			_bt_spool(btitem, buildstate->spool2);
		}
	}
	else
	{
		res = _bt_doinsert(index, btitem,
						   buildstate->isUnique, buildstate->heapRel);
		if (res)
			pfree(res);
	}

	buildstate->indtuples += 1;

	pfree(btitem);
	pfree(itup);
}

/*
 *	btinsert() -- insert an index tuple into a btree.
 *
 *		Descend the tree recursively, find the appropriate location for our
 *		new tuple, put it there, set its unique OID as appropriate, and
 *		return an InsertIndexResult to the caller.
 */
Datum
btinsert(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	Datum	   *datum = (Datum *) PG_GETARG_POINTER(1);
	char	   *nulls = (char *) PG_GETARG_POINTER(2);
	ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
	Relation	heapRel = (Relation) PG_GETARG_POINTER(4);
	bool		checkUnique = PG_GETARG_BOOL(5);
	InsertIndexResult res;
	BTItem		btitem;
	IndexTuple	itup;

	/* generate an index tuple */
	itup = index_formtuple(RelationGetDescr(rel), datum, nulls);
	itup->t_tid = *ht_ctid;
	btitem = _bt_formitem(itup);

	res = _bt_doinsert(rel, btitem, checkUnique, heapRel);

	pfree(btitem);
	pfree(itup);

	PG_RETURN_POINTER(res);
}

/*
 *	btgettuple() -- Get the next tuple in the scan.
 */
Datum
btgettuple(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
	BTScanOpaque so = (BTScanOpaque) scan->opaque;
	Page		page;
	OffsetNumber offnum;
	bool		res;

	/*
	 * If we've already initialized this scan, we can just advance it in
	 * the appropriate direction.  If we haven't done so yet, we call a
	 * routine to get the first item in the scan.
	 */
	if (ItemPointerIsValid(&(scan->currentItemData)))
	{
		/*
		 * Restore scan position using heap TID returned by previous call
		 * to btgettuple(). _bt_restscan() re-grabs the read lock on the
		 * buffer, too.
		 */
		_bt_restscan(scan);

		/*
		 * Check to see if we should kill the previously-fetched tuple.
		 */
		if (scan->kill_prior_tuple)
		{
			/*
			 * Yes, so mark it by setting the LP_DELETE bit in the item
			 * flags.
			 */
			offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
			page = BufferGetPage(so->btso_curbuf);
			PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;

			/*
			 * Since this can be redone later if needed, it's treated the
			 * same as a commit-hint-bit status update for heap tuples: we
			 * mark the buffer dirty but don't make a WAL log entry.
			 */
			SetBufferCommitInfoNeedsSave(so->btso_curbuf);
		}

		/*
		 * Now continue the scan.
		 */
		res = _bt_next(scan, dir);
	}
	else
		res = _bt_first(scan, dir);

	/*
	 * Skip killed tuples if asked to.
	 */
	if (scan->ignore_killed_tuples)
	{
		while (res)
		{
			offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
			page = BufferGetPage(so->btso_curbuf);
			if (!ItemIdDeleted(PageGetItemId(page, offnum)))
				break;
			res = _bt_next(scan, dir);
		}
	}

	/*
	 * Save heap TID to use it in _bt_restscan.  Then release the read
	 * lock on the buffer so that we aren't blocking other backends.
	 *
	 * NOTE: we do keep the pin on the buffer!	This is essential to ensure
	 * that someone else doesn't delete the index entry we are stopped on.
	 */
	if (res)
	{
		((BTScanOpaque) scan->opaque)->curHeapIptr = scan->xs_ctup.t_self;
		LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf,
				   BUFFER_LOCK_UNLOCK);
	}

	PG_RETURN_BOOL(res);
}

/*
 *	btbeginscan() -- start a scan on a btree index
 */
Datum
btbeginscan(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	int			keysz = PG_GETARG_INT32(1);
	ScanKey		scankey = (ScanKey) PG_GETARG_POINTER(2);
	IndexScanDesc scan;

	/* get the scan */
	scan = RelationGetIndexScan(rel, keysz, scankey);

	PG_RETURN_POINTER(scan);
}

/*
 *	btrescan() -- rescan an index relation
 */
Datum
btrescan(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ScanKey		scankey = (ScanKey) PG_GETARG_POINTER(1);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	if (so == NULL)				/* if called from btbeginscan */
	{
		so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
		so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(&(so->curHeapIptr));
		ItemPointerSetInvalid(&(so->mrkHeapIptr));
		if (scan->numberOfKeys > 0)
			so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
		else
			so->keyData = NULL;
		scan->opaque = so;
	}

	/* we aren't holding any read locks, but gotta drop the pins */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(&(so->curHeapIptr));
		ItemPointerSetInvalid(iptr);
	}

	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(&(so->mrkHeapIptr));
		ItemPointerSetInvalid(iptr);
	}

	/*
	 * Reset the scan keys.  Note that the key-ordering logic moved to
	 * _bt_first.	   - vadim 05/05/97
	 */
	if (scankey && scan->numberOfKeys > 0)
		memmove(scan->keyData,
				scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
	so->numberOfKeys = 0;		/* until _bt_preprocess_keys sets it */

	PG_RETURN_VOID();
}

/*
 *	btendscan() -- close down a scan
 */
Datum
btendscan(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pins */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		if (BufferIsValid(so->btso_curbuf))
			ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		if (BufferIsValid(so->btso_mrkbuf))
			ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	if (so->keyData != NULL)
		pfree(so->keyData);
	pfree(so);

	PG_RETURN_VOID();
}

/*
 *	btmarkpos() -- save current scan position
 */
Datum
btmarkpos(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pin */
	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	/* bump pin on current buffer for assignment to mark buffer */
	if (ItemPointerIsValid(&(scan->currentItemData)))
	{
		IncrBufferRefCount(so->btso_curbuf);
		so->btso_mrkbuf = so->btso_curbuf;
		scan->currentMarkData = scan->currentItemData;
		so->mrkHeapIptr = so->curHeapIptr;
	}

	PG_RETURN_VOID();
}

/*
 *	btrestrpos() -- restore scan to last saved position
 */
Datum
btrestrpos(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pin */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	/* bump pin on marked buffer */
	if (ItemPointerIsValid(&(scan->currentMarkData)))
	{
		IncrBufferRefCount(so->btso_mrkbuf);
		so->btso_curbuf = so->btso_mrkbuf;
		scan->currentItemData = scan->currentMarkData;
		so->curHeapIptr = so->mrkHeapIptr;
	}

	PG_RETURN_VOID();
}

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
btbulkdelete(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
	void	   *callback_state = (void *) PG_GETARG_POINTER(2);
	IndexBulkDeleteResult *result;
	double		tuples_removed;
	double		num_index_tuples;
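	/*
	 * deletable[] collects the offsets of items to be removed from the
	 * current page.  (Sizing note, added for clarity: each item on a page
	 * requires a line pointer, which is larger than an OffsetNumber, so
	 * BLCKSZ / sizeof(OffsetNumber) safely overestimates the number of
	 * items that can fit on one page.)
	 */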
	OffsetNumber deletable[BLCKSZ / sizeof(OffsetNumber)];
	int			ndeletable;
	Buffer		buf;
	BlockNumber num_pages;

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * The outer loop iterates over index leaf pages, the inner over items
	 * on a leaf page.  We issue just one _bt_delitems() call per page, so
	 * as to minimize WAL traffic.
	 *
	 * Note that we exclusive-lock every leaf page containing data items, in
	 * sequence left to right.  It sounds attractive to only
	 * exclusive-lock those containing items we need to delete, but
	 * unfortunately that is not safe: we could then pass a stopped
	 * indexscan, which could in rare cases lead to deleting the item it
	 * needs to find when it resumes.  (See _bt_restscan --- this could
	 * only happen if an indexscan stops on a deletable item and then a
	 * page split moves that item into a page further to its right, which
	 * the indexscan will have no pin on.)  We can skip obtaining
	 * exclusive lock on empty pages though, since no indexscan could be
	 * stopped on those.
	 */
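	/* start at the leftmost leaf page (level 0, rightmost = false) */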
	buf = _bt_get_endpoint(rel, 0, false);
	if (BufferIsValid(buf))		/* check for empty index */
	{
		for (;;)
		{
			Page		page;
			BTPageOpaque opaque;
			OffsetNumber offnum,
						minoff,
						maxoff;
			BlockNumber nextpage;

			vacuum_delay_point();

			ndeletable = 0;
			page = BufferGetPage(buf);
			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);
			/* We probably cannot see deleted pages, but skip 'em if so */
			if (minoff <= maxoff && !P_ISDELETED(opaque))
			{
				/*
				 * Trade in the initial read lock for a super-exclusive
				 * write lock on this page.
				 */
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
				LockBufferForCleanup(buf);
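
				/*
				 * (LockBufferForCleanup waits until ours is the only pin
				 * on the buffer; that is what guarantees no indexscan is
				 * currently stopped on this page.)
				 */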

				/*
				 * Recompute minoff/maxoff, both of which could have
				 * changed while we weren't holding the lock.
				 */
				minoff = P_FIRSTDATAKEY(opaque);
				maxoff = PageGetMaxOffsetNumber(page);

				/*
				 * Scan over all items to see which ones need to be
				 * deleted according to the callback function.
				 */
				for (offnum = minoff;
					 offnum <= maxoff;
					 offnum = OffsetNumberNext(offnum))
				{
					BTItem		btitem;
					ItemPointer htup;

					btitem = (BTItem) PageGetItem(page,
											PageGetItemId(page, offnum));
					htup = &(btitem->bti_itup.t_tid);
					if (callback(htup, callback_state))
					{
						deletable[ndeletable++] = offnum;
						tuples_removed += 1;
					}
					else
						num_index_tuples += 1;
				}
			}

			/*
			 * If we need to delete anything, do it and write the buffer;
			 * else just release the buffer.  (Note that we must fetch the
			 * page's right-link before releasing the buffer, since the
			 * page could change under us afterwards.)
			 */
			nextpage = opaque->btpo_next;
			if (ndeletable > 0)
			{
				_bt_delitems(rel, buf, deletable, ndeletable);
				_bt_wrtbuf(rel, buf);
			}
			else
				_bt_relbuf(rel, buf);
			/* And advance to next page, if any */
			if (nextpage == P_NONE)
				break;
			buf = _bt_getbuf(rel, nextpage, BT_READ);
		}
	}

	/* return statistics */
	num_pages = RelationGetNumberOfBlocks(rel);

	result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	result->num_pages = num_pages;
	result->num_index_tuples = num_index_tuples;
	result->tuples_removed = tuples_removed;

	PG_RETURN_POINTER(result);
}

/*
 * Post-VACUUM cleanup.
 *
 * Here, we scan looking for pages we can delete or return to the freelist.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
btvacuumcleanup(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
	IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
	BlockNumber num_pages;
	BlockNumber blkno;
	BlockNumber *freePages;
	int			nFreePages,
				maxFreePages;
	BlockNumber pages_deleted = 0;
	MemoryContext mycontext;
	MemoryContext oldcontext;

	Assert(stats != NULL);

	num_pages = RelationGetNumberOfBlocks(rel);

	/* No point in remembering more than MaxFSMPages pages */
	maxFreePages = MaxFSMPages;
	if ((BlockNumber) maxFreePages > num_pages)
		maxFreePages = (int) num_pages;
	freePages = (BlockNumber *) palloc(maxFreePages * sizeof(BlockNumber));
	nFreePages = 0;

	/* Create a temporary memory context to run _bt_pagedel in */
	mycontext = AllocSetContextCreate(CurrentMemoryContext,
									  "_bt_pagedel",
									  ALLOCSET_DEFAULT_MINSIZE,
									  ALLOCSET_DEFAULT_INITSIZE,
									  ALLOCSET_DEFAULT_MAXSIZE);

	/*
	 * Scan through all pages of index, except metapage.  (Any pages added
	 * after we start the scan will not be examined; this should be fine,
	 * since they can't possibly be empty.)
	 */
	for (blkno = BTREE_METAPAGE + 1; blkno < num_pages; blkno++)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, BT_READ);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		if (_bt_page_recyclable(page))
		{
			/* Okay to recycle this page */
			if (nFreePages < maxFreePages)
				freePages[nFreePages++] = blkno;
			pages_deleted++;
		}
		else if (P_ISDELETED(opaque))
		{
			/* Already deleted, but can't recycle yet */
			pages_deleted++;
		}
		else if ((opaque->btpo_flags & BTP_HALF_DEAD) ||
				 P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page))
		{
			/* Empty, try to delete */
			int			ndel;

			/* Run pagedel in a temp context to avoid memory leakage */
			MemoryContextReset(mycontext);
			oldcontext = MemoryContextSwitchTo(mycontext);

			ndel = _bt_pagedel(rel, buf, info->vacuum_full);

			/* count only this page, else may double-count parent */
			if (ndel)
				pages_deleted++;

			/*
			 * During VACUUM FULL it's okay to recycle deleted pages
			 * immediately, since there can be no other transactions
			 * scanning the index.  Note that we will only recycle the
			 * current page and not any parent pages that _bt_pagedel
			 * might have recursed to; this seems reasonable in the name
			 * of simplicity.  (Trying to do otherwise would mean we'd
			 * have to sort the list of recyclable pages we're building.)
			 */
			if (ndel && info->vacuum_full)
			{
				if (nFreePages < maxFreePages)
					freePages[nFreePages++] = blkno;
			}

			MemoryContextSwitchTo(oldcontext);
			continue;			/* pagedel released buffer */
		}
		_bt_relbuf(rel, buf);
	}

	/*
	 * During VACUUM FULL, we truncate off any recyclable pages at the end
	 * of the index.  In a normal vacuum it'd be unsafe to do this except
	 * by acquiring exclusive lock on the index and then rechecking all
	 * the pages; doesn't seem worth it.
	 */
	if (info->vacuum_full && nFreePages > 0)
	{
		BlockNumber new_pages = num_pages;

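		/*
		 * freePages[] is in block-number order, so recyclable pages at
		 * the very end of the index sit at the tail of the array; peel
		 * off however many are contiguous with the end, since only that
		 * trailing run can be truncated away.
		 */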
		while (nFreePages > 0 && freePages[nFreePages - 1] == new_pages - 1)
		{
			new_pages--;
			pages_deleted--;
			nFreePages--;
		}
		if (new_pages != num_pages)
		{
			/*
			 * Okay to truncate.
			 *
			 * First, flush any shared buffers for the blocks we intend to
			 * delete.  FlushRelationBuffers is a bit more than we need
			 * for this, since it will also write out dirty buffers for
			 * blocks we aren't deleting, but it's the closest thing in
			 * bufmgr's API.
			 */
			FlushRelationBuffers(rel, new_pages);

			/*
			 * Do the physical truncation.
			 */
			RelationTruncate(rel, new_pages);
			num_pages = new_pages;
		}
	}

	/*
	 * Update the shared Free Space Map with the info we now have about
	 * free pages in the index, discarding any old info the map may have.
	 * We do not need to sort the page numbers; they're in order already.
	 */
	RecordIndexFreeSpace(&rel->rd_node, nFreePages, freePages);

	pfree(freePages);

	MemoryContextDelete(mycontext);

	/* update statistics */
	stats->num_pages = num_pages;
	stats->pages_deleted = pages_deleted;
	stats->pages_free = nFreePages;

	PG_RETURN_POINTER(stats);
}

/*
 * Restore scan position when btgettuple is called to continue a scan.
 *
 * This is nontrivial because concurrent insertions might have moved the
 * index tuple we stopped on.  We assume the tuple can only have moved to
 * the right from our stop point, because we kept a pin on the buffer,
 * and so no deletion can have occurred on that page.
 *
 * On entry, we have a pin but no read lock on the buffer that contained
 * the index tuple we stopped the scan on.  On exit, we have pin and read
 * lock on the buffer that now contains that index tuple, and the scandesc's
 * current position is updated to point at it.
 */
static void
_bt_restscan(IndexScanDesc scan)
{
	Relation	rel = scan->indexRelation;
	BTScanOpaque so = (BTScanOpaque) scan->opaque;
	Buffer		buf = so->btso_curbuf;
	Page		page;
	ItemPointer current = &(scan->currentItemData);
	OffsetNumber offnum = ItemPointerGetOffsetNumber(current),
				maxoff;
	BTPageOpaque opaque;
	Buffer		nextbuf;
	ItemPointer target = &(so->curHeapIptr);
	BTItem		item;
	BlockNumber blkno;

	/*
	 * Reacquire read lock on the buffer.  (We should still have a
	 * reference-count pin on it, so need not get that.)
	 */
	LockBuffer(buf, BT_READ);

	page = BufferGetPage(buf);
	maxoff = PageGetMaxOffsetNumber(page);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/*
	 * An invalid target heap TID is used as a flag: it means the first
	 * index tuple on the page was deleted, but we did not move left
	 * (that would slow down vacuum).  Instead we set current->ip_posid
	 * to just before the first index tuple on the current page, and
	 * _bt_step will move it right.  XXX still needed?
	 */
	if (!ItemPointerIsValid(target))
	{
		ItemPointerSetOffsetNumber(current,
							   OffsetNumberPrev(P_FIRSTDATAKEY(opaque)));
		return;
	}

	/*
	 * The item we were on may have moved right due to insertions. Find it
	 * again.  We use the heap TID to identify the item uniquely.
	 */
	for (;;)
	{
		/* Check for item on this page */
		for (;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			item = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
			if (BTTidSame(item->bti_itup.t_tid, *target))
			{
				/* Found it */
				current->ip_posid = offnum;
				return;
			}
		}

		/*
		 * The item we're looking for moved right at least one page, so
		 * move right.  We are careful here to pin and read-lock the next
		 * non-dead page before releasing the current one.  This ensures
		 * that a concurrent btbulkdelete scan cannot pass our position
		 * --- if it did, it might be able to reach and delete our target
		 * item before we can find it again.
		 */
		if (P_RIGHTMOST(opaque))
			elog(ERROR, "failed to re-find previous key in \"%s\"",
				 RelationGetRelationName(rel));
		/* Advance to next non-dead page --- there must be one */
		nextbuf = InvalidBuffer;
		for (;;)
		{
			blkno = opaque->btpo_next;
			nextbuf = _bt_relandgetbuf(rel, nextbuf, blkno, BT_READ);
			page = BufferGetPage(nextbuf);
			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
			if (!P_IGNORE(opaque))
				break;
			if (P_RIGHTMOST(opaque))
				elog(ERROR, "fell off the end of \"%s\"",
					 RelationGetRelationName(rel));
		}
		_bt_relbuf(rel, buf);
		so->btso_curbuf = buf = nextbuf;
		maxoff = PageGetMaxOffsetNumber(page);
		offnum = P_FIRSTDATAKEY(opaque);
		ItemPointerSet(current, blkno, offnum);
	}
}