/*-------------------------------------------------------------------------
 *
 * nbtree.c
 *	  Implementation of Lehman and Yao's btree management algorithm for
 *	  Postgres.
 *
 * NOTES
 *	  This file contains only the public interface routines.
 *
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/access/nbtree/nbtree.c,v 1.115 2004/05/08 19:09:24 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "miscadmin.h"
#include "storage/freespace.h"
#include "storage/smgr.h"


/* Working state for btbuild and its callback */
typedef struct
{
	bool		usefast;
	bool		isUnique;
	bool		haveDead;
	Relation	heapRel;
	BTSpool    *spool;

	/*
	 * spool2 is needed only when the index is a unique index.  Dead
	 * tuples are put into spool2 instead of spool in order to avoid
	 * the uniqueness check.
	 */
	BTSpool    *spool2;
	double		indtuples;
} BTBuildState;


bool		FastBuild = true;	/* use SORT instead of insertion build */

static void _bt_restscan(IndexScanDesc scan);
static void btbuildCallback(Relation index,
				HeapTuple htup,
				Datum *attdata,
				char *nulls,
				bool tupleIsAlive,
				void *state);


/*
 * AtEOXact_nbtree() --- clean up nbtree subsystem at xact abort or commit.
 */
void
AtEOXact_nbtree(void)
{
	/* nothing to do at the moment */
}


/*
 *	btbuild() -- build a new btree index.
 *
 *		We use a global variable to record the fact that we're creating
 *		a new index.  This is used to avoid high-concurrency locking,
 *		since the index won't be visible until this transaction commits
 *		and since building is guaranteed to be single-threaded.
 */
Datum
btbuild(PG_FUNCTION_ARGS)
{
	Relation	heap = (Relation) PG_GETARG_POINTER(0);
	Relation	index = (Relation) PG_GETARG_POINTER(1);
	IndexInfo  *indexInfo = (IndexInfo *) PG_GETARG_POINTER(2);
	double		reltuples;
	BTBuildState buildstate;

	/*
	 * bootstrap processing does something strange, so don't use
	 * sort/build for initial catalog indices.	at some point i need to
	 * look harder at this.  (there is some kind of incremental processing
	 * going on there.) -- pma 08/29/95
	 */
	buildstate.usefast = (FastBuild && IsNormalProcessingMode());
	buildstate.isUnique = indexInfo->ii_Unique;
	buildstate.haveDead = false;
	buildstate.heapRel = heap;
	buildstate.spool = NULL;
	buildstate.spool2 = NULL;
	buildstate.indtuples = 0;

#ifdef BTREE_BUILD_STATS
	if (log_btree_build_stats)
		ResetUsage();
#endif   /* BTREE_BUILD_STATS */

	/*
	 * We expect to be called exactly once for any index relation. If
	 * that's not the case, big trouble's what we have.
	 */
	if (RelationGetNumberOfBlocks(index) != 0)
		elog(ERROR, "index \"%s\" already contains data",
			 RelationGetRelationName(index));

	/* initialize the btree index metadata page */
	/* mark it valid right away only if using slow build */
	_bt_metapinit(index, !buildstate.usefast);

	if (buildstate.usefast)
	{
		buildstate.spool = _bt_spoolinit(index, indexInfo->ii_Unique, false);

		/*
		 * If building a unique index, put dead tuples in a second spool
		 * to keep them out of the uniqueness check.
		 */
		if (indexInfo->ii_Unique)
			buildstate.spool2 = _bt_spoolinit(index, false, true);
	}

	/* do the heap scan */
	reltuples = IndexBuildHeapScan(heap, index, indexInfo,
								   btbuildCallback, (void *) &buildstate);

	/* okay, all heap tuples are indexed */
	if (buildstate.spool2 && !buildstate.haveDead)
	{
		/* spool2 turns out to be unnecessary */
		_bt_spooldestroy(buildstate.spool2);
		buildstate.spool2 = NULL;
	}

	/*
	 * if we are doing bottom-up btree build, finish the build by (1)
	 * completing the sort of the spool file, (2) inserting the sorted
	 * tuples into btree pages and (3) building the upper levels.
	 */
	if (buildstate.usefast)
	{
		_bt_leafbuild(buildstate.spool, buildstate.spool2);
		_bt_spooldestroy(buildstate.spool);
		if (buildstate.spool2)
			_bt_spooldestroy(buildstate.spool2);
	}

#ifdef BTREE_BUILD_STATS
	if (log_btree_build_stats)
	{
		ShowUsage("BTREE BUILD STATS");
		ResetUsage();
	}
#endif   /* BTREE_BUILD_STATS */

	/*
	 * Since we just counted the tuples in the heap, we update its stats
	 * in pg_class to guarantee that the planner takes advantage of the
	 * index we just created.  But, only update statistics during normal
	 * index definitions, not for indices on system catalogs created
	 * during bootstrap processing.  We must close the relations before
	 * updating statistics to guarantee that the relcache entries are
	 * flushed when we increment the command counter in UpdateStats(). But
	 * we do not release any locks on the relations; those will be held
	 * until end of transaction.
	 */
	if (IsNormalProcessingMode())
	{
		Oid			hrelid = RelationGetRelid(heap);
		Oid			irelid = RelationGetRelid(index);

		heap_close(heap, NoLock);
		index_close(index);
		UpdateStats(hrelid, reltuples);
		UpdateStats(irelid, buildstate.indtuples);
	}

	PG_RETURN_VOID();
}
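
/*
 * Illustrative note, not part of the original file: btbuild is never called
 * directly; the core code looks up the access method's "ambuild" procedure
 * in pg_am and reaches it through the fmgr.  A hedged sketch of the call
 * made from catalog/index.c (see index_build() for the authoritative form):
 *
 *		OidFunctionCall3(procedure,
 *						 PointerGetDatum(heapRelation),
 *						 PointerGetDatum(indexRelation),
 *						 PointerGetDatum(indexInfo));
 */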

/*
 * Per-tuple callback from IndexBuildHeapScan
 */
static void
btbuildCallback(Relation index,
				HeapTuple htup,
				Datum *attdata,
				char *nulls,
				bool tupleIsAlive,
				void *state)
{
	BTBuildState *buildstate = (BTBuildState *) state;
	IndexTuple	itup;
	BTItem		btitem;
	InsertIndexResult res;

	/* form an index tuple and point it at the heap tuple */
	itup = index_formtuple(RelationGetDescr(index), attdata, nulls);
	itup->t_tid = htup->t_self;

	btitem = _bt_formitem(itup);

	/*
	 * if we are doing bottom-up btree build, we insert the index tuple
	 * into a spool file for subsequent processing.  otherwise, we insert
	 * into the btree.
	 */
	if (buildstate->usefast)
	{
		if (tupleIsAlive || buildstate->spool2 == NULL)
			_bt_spool(btitem, buildstate->spool);
		else
		{
			/* dead tuples are put into spool2 */
			buildstate->haveDead = true;
			_bt_spool(btitem, buildstate->spool2);
		}
	}
	else
	{
		res = _bt_doinsert(index, btitem,
						   buildstate->isUnique, buildstate->heapRel);
		if (res)
			pfree(res);
	}

	buildstate->indtuples += 1;

	pfree(btitem);
	pfree(itup);
}
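
/*
 * Illustrative note, not part of the original file: the signature above is
 * dictated by the IndexBuildCallback typedef, a hedged sketch of which (as
 * declared in catalog/index.h in this code's era) is
 *
 *		typedef void (*IndexBuildCallback) (Relation index,
 *											HeapTuple htup,
 *											Datum *attdata,
 *											char *nulls,
 *											bool tupleIsAlive,
 *											void *state);
 *
 * IndexBuildHeapScan() invokes it once per heap tuple it decides to index.
 */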

/*
 *	btinsert() -- insert an index tuple into a btree.
 *
 *		Descend the tree recursively, find the appropriate location for our
 *		new tuple, put it there, set its unique OID as appropriate, and
 *		return an InsertIndexResult to the caller.
 */
Datum
btinsert(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	Datum	   *datum = (Datum *) PG_GETARG_POINTER(1);
	char	   *nulls = (char *) PG_GETARG_POINTER(2);
	ItemPointer ht_ctid = (ItemPointer) PG_GETARG_POINTER(3);
	Relation	heapRel = (Relation) PG_GETARG_POINTER(4);
	bool		checkUnique = PG_GETARG_BOOL(5);
	InsertIndexResult res;
	BTItem		btitem;
	IndexTuple	itup;

	/* generate an index tuple */
	itup = index_formtuple(RelationGetDescr(rel), datum, nulls);
	itup->t_tid = *ht_ctid;
	btitem = _bt_formitem(itup);

	res = _bt_doinsert(rel, btitem, checkUnique, heapRel);

	pfree(btitem);
	pfree(itup);

	PG_RETURN_POINTER(res);
}
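
/*
 * Illustrative note, not part of the original file: the six arguments above
 * mirror index_insert() in access/index/indexam.c, which is how executor
 * code reaches this routine via the fmgr.  A hedged sketch of a caller (the
 * final "true" requests the uniqueness check):
 *
 *		index_insert(indexRel, datums, nulls, &(heapTuple->t_self),
 *					 heapRel, true);
 */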

/*
 *	btgettuple() -- Get the next tuple in the scan.
 */
Datum
btgettuple(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ScanDirection dir = (ScanDirection) PG_GETARG_INT32(1);
	BTScanOpaque so = (BTScanOpaque) scan->opaque;
	Page		page;
	OffsetNumber offnum;
	bool		res;

	/*
	 * If we've already initialized this scan, we can just advance it in
	 * the appropriate direction.  If we haven't done so yet, we call a
	 * routine to get the first item in the scan.
	 */
	if (ItemPointerIsValid(&(scan->currentItemData)))
	{
		/*
		 * Restore scan position using heap TID returned by previous call
		 * to btgettuple(). _bt_restscan() re-grabs the read lock on the
		 * buffer, too.
		 */
		_bt_restscan(scan);

		/*
		 * Check to see if we should kill the previously-fetched tuple.
		 */
		if (scan->kill_prior_tuple)
		{
			/*
			 * Yes, so mark it by setting the LP_DELETE bit in the item
			 * flags.
			 */
			offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
			page = BufferGetPage(so->btso_curbuf);
			PageGetItemId(page, offnum)->lp_flags |= LP_DELETE;

			/*
			 * Since this can be redone later if needed, it's treated the
			 * same as a commit-hint-bit status update for heap tuples: we
			 * mark the buffer dirty but don't make a WAL log entry.
			 */
			SetBufferCommitInfoNeedsSave(so->btso_curbuf);
		}

		/*
		 * Now continue the scan.
		 */
		res = _bt_next(scan, dir);
	}
	else
		res = _bt_first(scan, dir);

	/*
	 * Skip killed tuples if asked to.
	 */
	if (scan->ignore_killed_tuples)
	{
		while (res)
		{
			offnum = ItemPointerGetOffsetNumber(&(scan->currentItemData));
			page = BufferGetPage(so->btso_curbuf);
			if (!ItemIdDeleted(PageGetItemId(page, offnum)))
				break;
			res = _bt_next(scan, dir);
		}
	}

	/*
	 * Save heap TID to use it in _bt_restscan.  Then release the read
	 * lock on the buffer so that we aren't blocking other backends.
	 *
	 * NOTE: we do keep the pin on the buffer!  This is essential to ensure
	 * that someone else doesn't delete the index entry we are stopped on.
	 */
	if (res)
	{
		((BTScanOpaque) scan->opaque)->curHeapIptr = scan->xs_ctup.t_self;
		LockBuffer(((BTScanOpaque) scan->opaque)->btso_curbuf,
				   BUFFER_LOCK_UNLOCK);
	}

	PG_RETURN_BOOL(res);
}
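
/*
 * Illustrative note, not part of the original file: a scan reaches
 * btgettuple through the generic index-scan API in access/index/indexam.c.
 * A hedged sketch of the usual calling pattern:
 *
 *		scan = index_beginscan(heapRel, indexRel, snapshot, nkeys, keys);
 *		while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
 *		{
 *			... process tuple ...
 *		}
 *		index_endscan(scan);
 *
 * index_getnext() also sets scan->kill_prior_tuple when the previously
 * returned tuple proves dead to everyone, which is what drives the
 * LP_DELETE marking seen above.
 */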

/*
 *	btbeginscan() -- start a scan on a btree index
 */
Datum
btbeginscan(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	int			keysz = PG_GETARG_INT32(1);
	ScanKey		scankey = (ScanKey) PG_GETARG_POINTER(2);
	IndexScanDesc scan;

	/* get the scan */
	scan = RelationGetIndexScan(rel, keysz, scankey);

	PG_RETURN_POINTER(scan);
}

/*
 *	btrescan() -- rescan an index relation
 */
Datum
btrescan(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ScanKey		scankey = (ScanKey) PG_GETARG_POINTER(1);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	if (so == NULL)				/* if called from btbeginscan */
	{
		so = (BTScanOpaque) palloc(sizeof(BTScanOpaqueData));
		so->btso_curbuf = so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(&(so->curHeapIptr));
		ItemPointerSetInvalid(&(so->mrkHeapIptr));
		if (scan->numberOfKeys > 0)
			so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData));
		else
			so->keyData = NULL;
		scan->opaque = so;
	}

	/* we aren't holding any read locks, but gotta drop the pins */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(&(so->curHeapIptr));
		ItemPointerSetInvalid(iptr);
	}

	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(&(so->mrkHeapIptr));
		ItemPointerSetInvalid(iptr);
	}

	/*
	 * Reset the scan keys. Note that the key-ordering work moved to
	 * _bt_first.	   - vadim 05/05/97
	 */
	if (scankey && scan->numberOfKeys > 0)
		memmove(scan->keyData,
				scankey,
				scan->numberOfKeys * sizeof(ScanKeyData));
	so->numberOfKeys = 0;		/* until _bt_preprocess_keys sets it */

	PG_RETURN_VOID();
}

/*
 *	btendscan() -- close down a scan
 */
Datum
btendscan(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pins */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		if (BufferIsValid(so->btso_curbuf))
			ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		if (BufferIsValid(so->btso_mrkbuf))
			ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	if (so->keyData != NULL)
		pfree(so->keyData);
	pfree(so);

	PG_RETURN_VOID();
}

/*
 *	btmarkpos() -- save current scan position
 */
Datum
btmarkpos(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pin */
	if (ItemPointerIsValid(iptr = &(scan->currentMarkData)))
	{
		ReleaseBuffer(so->btso_mrkbuf);
		so->btso_mrkbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	/* bump pin on current buffer for assignment to mark buffer */
	if (ItemPointerIsValid(&(scan->currentItemData)))
	{
		so->btso_mrkbuf = ReadBuffer(scan->indexRelation,
								  BufferGetBlockNumber(so->btso_curbuf));
		scan->currentMarkData = scan->currentItemData;
		so->mrkHeapIptr = so->curHeapIptr;
	}

	PG_RETURN_VOID();
}

/*
 *	btrestrpos() -- restore scan to last saved position
 */
Datum
btrestrpos(PG_FUNCTION_ARGS)
{
	IndexScanDesc scan = (IndexScanDesc) PG_GETARG_POINTER(0);
	ItemPointer iptr;
	BTScanOpaque so;

	so = (BTScanOpaque) scan->opaque;

	/* we aren't holding any read locks, but gotta drop the pin */
	if (ItemPointerIsValid(iptr = &(scan->currentItemData)))
	{
		ReleaseBuffer(so->btso_curbuf);
		so->btso_curbuf = InvalidBuffer;
		ItemPointerSetInvalid(iptr);
	}

	/* bump pin on marked buffer */
	if (ItemPointerIsValid(&(scan->currentMarkData)))
	{
		so->btso_curbuf = ReadBuffer(scan->indexRelation,
								  BufferGetBlockNumber(so->btso_mrkbuf));
		scan->currentItemData = scan->currentMarkData;
		so->curHeapIptr = so->mrkHeapIptr;
	}

	PG_RETURN_VOID();
}
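
/*
 * Illustrative note, not part of the original file: btmarkpos/btrestrpos
 * back the generic index_markpos() and index_restrpos() calls, used for
 * example by merge joins that must re-read part of the inner input:
 *
 *		index_markpos(scan);		remember the current position
 *		... advance the scan ...
 *		index_restrpos(scan);		back to the remembered position
 *
 * The extra ReadBuffer in btmarkpos is what takes a second pin on the
 * marked page, so its entries cannot be removed while the mark lives.
 */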

/*
 * Bulk deletion of all index entries pointing to a set of heap tuples.
 * The set of target tuples is specified via a callback routine that tells
 * whether any given heap tuple (identified by ItemPointer) is being deleted.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
btbulkdelete(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	IndexBulkDeleteCallback callback = (IndexBulkDeleteCallback) PG_GETARG_POINTER(1);
	void	   *callback_state = (void *) PG_GETARG_POINTER(2);
	IndexBulkDeleteResult *result;
	double		tuples_removed;
	double		num_index_tuples;
	OffsetNumber deletable[BLCKSZ / sizeof(OffsetNumber)];
	int			ndeletable;
	Buffer		buf;
	BlockNumber num_pages;

	tuples_removed = 0;
	num_index_tuples = 0;

	/*
	 * The outer loop iterates over index leaf pages, the inner over items
	 * on a leaf page.  We issue just one _bt_delitems() call per page, so
	 * as to minimize WAL traffic.
	 *
	 * Note that we exclusive-lock every leaf page containing data items, in
	 * sequence left to right.  It sounds attractive to only
	 * exclusive-lock those containing items we need to delete, but
	 * unfortunately that is not safe: we could then pass a stopped
	 * indexscan, which could in rare cases lead to deleting the item it
	 * needs to find when it resumes.  (See _bt_restscan --- this could
	 * only happen if an indexscan stops on a deletable item and then a
	 * page split moves that item into a page further to its right, which
	 * the indexscan will have no pin on.)  We can skip obtaining
	 * exclusive lock on empty pages though, since no indexscan could be
	 * stopped on those.
	 */
	buf = _bt_get_endpoint(rel, 0, false);
	if (BufferIsValid(buf))		/* check for empty index */
	{
		for (;;)
		{
			Page		page;
			BTPageOpaque opaque;
			OffsetNumber offnum,
						minoff,
						maxoff;
			BlockNumber nextpage;

			vacuum_delay_point();

			ndeletable = 0;
			page = BufferGetPage(buf);
			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
			minoff = P_FIRSTDATAKEY(opaque);
			maxoff = PageGetMaxOffsetNumber(page);
			/* We probably cannot see deleted pages, but skip 'em if so */
			if (minoff <= maxoff && !P_ISDELETED(opaque))
			{
				/*
				 * Trade in the initial read lock for a super-exclusive
				 * write lock on this page.
				 */
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
				LockBufferForCleanup(buf);

				/*
				 * Recompute minoff/maxoff, both of which could have
				 * changed while we weren't holding the lock.
				 */
				minoff = P_FIRSTDATAKEY(opaque);
				maxoff = PageGetMaxOffsetNumber(page);

				/*
				 * Scan over all items to see which ones need to be
				 * deleted according to the callback function.
				 */
				for (offnum = minoff;
					 offnum <= maxoff;
					 offnum = OffsetNumberNext(offnum))
				{
					BTItem		btitem;
					ItemPointer htup;

					btitem = (BTItem) PageGetItem(page,
											PageGetItemId(page, offnum));
					htup = &(btitem->bti_itup.t_tid);
					if (callback(htup, callback_state))
					{
						deletable[ndeletable++] = offnum;
						tuples_removed += 1;
					}
					else
						num_index_tuples += 1;
				}
			}

			/*
			 * If we need to delete anything, do it and write the buffer;
			 * else just release the buffer.
			 */
			nextpage = opaque->btpo_next;
			if (ndeletable > 0)
			{
				_bt_delitems(rel, buf, deletable, ndeletable);
				_bt_wrtbuf(rel, buf);
			}
			else
				_bt_relbuf(rel, buf);
			/* And advance to next page, if any */
			if (nextpage == P_NONE)
				break;
			buf = _bt_getbuf(rel, nextpage, BT_READ);
		}
	}

	/* return statistics */
	num_pages = RelationGetNumberOfBlocks(rel);

	result = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
	result->num_pages = num_pages;
	result->num_index_tuples = num_index_tuples;
	result->tuples_removed = tuples_removed;

	PG_RETURN_POINTER(result);
}
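
#ifdef NOT_USED
/*
 * Illustrative sketch, not part of the original file: the shape of a
 * callback that btbulkdelete expects.  IndexBulkDeleteCallback is declared
 * in access/genam.h as
 *
 *		typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr,
 *												 void *state);
 *
 * VACUUM's real callback is lazy_tid_reaped() in commands/vacuumlazy.c;
 * treating "state" as a single dead TID here is purely hypothetical.
 */
static bool
example_tid_reaped(ItemPointer itemptr, void *state)
{
	ItemPointer deadtid = (ItemPointer) state;

	/* report the entry as deletable iff it points at the dead TID */
	return ItemPointerEquals(itemptr, deadtid);
}
#endif   /* NOT_USED */
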
/*
 * Post-VACUUM cleanup.
 *
 * Here, we scan looking for pages we can delete or return to the freelist.
 *
 * Result: a palloc'd struct containing statistical info for VACUUM displays.
 */
Datum
btvacuumcleanup(PG_FUNCTION_ARGS)
{
	Relation	rel = (Relation) PG_GETARG_POINTER(0);
	IndexVacuumCleanupInfo *info = (IndexVacuumCleanupInfo *) PG_GETARG_POINTER(1);
	IndexBulkDeleteResult *stats = (IndexBulkDeleteResult *) PG_GETARG_POINTER(2);
	BlockNumber num_pages;
	BlockNumber blkno;
	BlockNumber *freePages;
	int			nFreePages,
				maxFreePages;
	BlockNumber pages_deleted = 0;
	MemoryContext mycontext;
	MemoryContext oldcontext;

	Assert(stats != NULL);

	num_pages = RelationGetNumberOfBlocks(rel);

	/* No point in remembering more than MaxFSMPages pages */
	maxFreePages = MaxFSMPages;
	if ((BlockNumber) maxFreePages > num_pages)
		maxFreePages = (int) num_pages + 1;		/* +1 to avoid palloc(0) */
	freePages = (BlockNumber *) palloc(maxFreePages * sizeof(BlockNumber));
	nFreePages = 0;

	/* Create a temporary memory context to run _bt_pagedel in */
	mycontext = AllocSetContextCreate(CurrentMemoryContext,
									  "_bt_pagedel",
									  ALLOCSET_DEFAULT_MINSIZE,
									  ALLOCSET_DEFAULT_INITSIZE,
									  ALLOCSET_DEFAULT_MAXSIZE);

	/*
	 * Scan through all pages of index, except metapage.  (Any pages added
	 * after we start the scan will not be examined; this should be fine,
	 * since they can't possibly be empty.)
	 */
	for (blkno = BTREE_METAPAGE + 1; blkno < num_pages; blkno++)
	{
		Buffer		buf;
		Page		page;
		BTPageOpaque opaque;

		buf = _bt_getbuf(rel, blkno, BT_READ);
		page = BufferGetPage(buf);
		opaque = (BTPageOpaque) PageGetSpecialPointer(page);
		if (_bt_page_recyclable(page))
		{
			/* Okay to recycle this page */
			if (nFreePages < maxFreePages)
				freePages[nFreePages++] = blkno;
			pages_deleted++;
		}
		else if (P_ISDELETED(opaque))
		{
			/* Already deleted, but can't recycle yet */
			pages_deleted++;
		}
		else if ((opaque->btpo_flags & BTP_HALF_DEAD) ||
				 P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page))
		{
			/* Empty, try to delete */
			int			ndel;

			/* Run pagedel in a temp context to avoid memory leakage */
			MemoryContextReset(mycontext);
			oldcontext = MemoryContextSwitchTo(mycontext);

			ndel = _bt_pagedel(rel, buf, info->vacuum_full);

			/* count only this page, else may double-count parent */
			if (ndel)
				pages_deleted++;

			/*
			 * During VACUUM FULL it's okay to recycle deleted pages
			 * immediately, since there can be no other transactions
			 * scanning the index.  Note that we will only recycle the
			 * current page and not any parent pages that _bt_pagedel
			 * might have recursed to; this seems reasonable in the name
			 * of simplicity.  (Trying to do otherwise would mean we'd
			 * have to sort the list of recyclable pages we're building.)
			 */
			if (ndel && info->vacuum_full)
			{
				if (nFreePages < maxFreePages)
					freePages[nFreePages++] = blkno;
			}

			MemoryContextSwitchTo(oldcontext);
			continue;			/* pagedel released buffer */
		}
		_bt_relbuf(rel, buf);
	}

	/*
	 * During VACUUM FULL, we truncate off any recyclable pages at the end
	 * of the index.  In a normal vacuum it'd be unsafe to do this except
	 * by acquiring exclusive lock on the index and then rechecking all
	 * the pages; doesn't seem worth it.
	 */
	if (info->vacuum_full && nFreePages > 0)
	{
		BlockNumber new_pages = num_pages;

		while (nFreePages > 0 && freePages[nFreePages - 1] == new_pages - 1)
		{
			new_pages--;
			pages_deleted--;
			nFreePages--;
		}
		if (new_pages != num_pages)
		{
			int			i;

			/*
			 * Okay to truncate.
			 *
			 * First, flush any shared buffers for the blocks we intend to
			 * delete.  FlushRelationBuffers is a bit more than we need
			 * for this, since it will also write out dirty buffers for
			 * blocks we aren't deleting, but it's the closest thing in
			 * bufmgr's API.
			 */
			i = FlushRelationBuffers(rel, new_pages);
			if (i < 0)
				elog(ERROR, "FlushRelationBuffers returned %d", i);

			/*
			 * Do the physical truncation.
			 */
			RelationTruncate(rel, new_pages);
			num_pages = new_pages;
		}
	}

	/*
	 * Update the shared Free Space Map with the info we now have about
	 * free pages in the index, discarding any old info the map may have.
	 * We do not need to sort the page numbers; they're in order already.
	 */
	RecordIndexFreeSpace(&rel->rd_node, nFreePages, freePages);

	pfree(freePages);

	MemoryContextDelete(mycontext);

	/* update statistics */
	stats->num_pages = num_pages;
	stats->pages_deleted = pages_deleted;
	stats->pages_free = nFreePages;

	PG_RETURN_POINTER(stats);
}
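
/*
 * Illustrative note, not part of the original file: VACUUM drives the two
 * entry points above in sequence, passing the result struct from the first
 * into the second.  A hedged sketch of the pattern in the vacuum code:
 *
 *		stats = index_bulk_delete(indexRel, my_callback, my_state);
 *		stats = index_vacuum_cleanup(indexRel, &vacinfo, stats);
 *
 * which is why btvacuumcleanup can assume stats is non-NULL (hence the
 * Assert) and only fills in the page-level counters on top of the tuple
 * counts collected by btbulkdelete.
 */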

/*
 * Restore scan position when btgettuple is called to continue a scan.
 *
 * This is nontrivial because concurrent insertions might have moved the
 * index tuple we stopped on.  We assume the tuple can only have moved to
 * the right from our stop point, because we kept a pin on the buffer,
 * and so no deletion can have occurred on that page.
 *
 * On entry, we have a pin but no read lock on the buffer that contained
 * the index tuple we stopped the scan on.  On exit, we have pin and read
 * lock on the buffer that now contains that index tuple, and the scandesc's
 * current position is updated to point at it.
 */
static void
_bt_restscan(IndexScanDesc scan)
{
	Relation	rel = scan->indexRelation;
	BTScanOpaque so = (BTScanOpaque) scan->opaque;
	Buffer		buf = so->btso_curbuf;
	Page		page;
	ItemPointer current = &(scan->currentItemData);
	OffsetNumber offnum = ItemPointerGetOffsetNumber(current),
				maxoff;
	BTPageOpaque opaque;
	Buffer		nextbuf;
	ItemPointer target = &(so->curHeapIptr);
	BTItem		item;
	BlockNumber blkno;

	/*
	 * Reacquire read lock on the buffer.  (We should still have a
	 * reference-count pin on it, so need not get that.)
	 */
	LockBuffer(buf, BT_READ);

	page = BufferGetPage(buf);
	maxoff = PageGetMaxOffsetNumber(page);
	opaque = (BTPageOpaque) PageGetSpecialPointer(page);

	/*
	 * An invalid target is used as a flag: the first index tuple on the
	 * page was deleted, but we did not move left (that would slow down
	 * vacuum), so just set current->ip_posid to just before the first
	 * index tuple on the current page (_bt_step will move it right)...
	 * XXX still needed?
	 */
	if (!ItemPointerIsValid(target))
	{
		ItemPointerSetOffsetNumber(current,
							   OffsetNumberPrev(P_FIRSTDATAKEY(opaque)));
		return;
	}

	/*
	 * The item we were on may have moved right due to insertions. Find it
	 * again.  We use the heap TID to identify the item uniquely.
	 */
	for (;;)
	{
		/* Check for item on this page */
		for (;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			item = (BTItem) PageGetItem(page, PageGetItemId(page, offnum));
			if (BTTidSame(item->bti_itup.t_tid, *target))
			{
				/* Found it */
				current->ip_posid = offnum;
				return;
			}
		}

		/*
		 * The item we're looking for moved right at least one page, so
		 * move right.  We are careful here to pin and read-lock the next
		 * non-dead page before releasing the current one.  This ensures
		 * that a concurrent btbulkdelete scan cannot pass our position
		 * --- if it did, it might be able to reach and delete our target
		 * item before we can find it again.
		 */
		if (P_RIGHTMOST(opaque))
			elog(ERROR, "failed to re-find previous key in \"%s\"",
				 RelationGetRelationName(rel));
		/* Advance to next non-dead page --- there must be one */
		nextbuf = InvalidBuffer;
		for (;;)
		{
			blkno = opaque->btpo_next;
			nextbuf = _bt_relandgetbuf(rel, nextbuf, blkno, BT_READ);
			page = BufferGetPage(nextbuf);
			opaque = (BTPageOpaque) PageGetSpecialPointer(page);
			if (!P_IGNORE(opaque))
				break;
			if (P_RIGHTMOST(opaque))
				elog(ERROR, "fell off the end of \"%s\"",
					 RelationGetRelationName(rel));
		}
		_bt_relbuf(rel, buf);
		so->btso_curbuf = buf = nextbuf;
		maxoff = PageGetMaxOffsetNumber(page);
		offnum = P_FIRSTDATAKEY(opaque);
		ItemPointerSet(current, blkno, offnum);
	}
}