/*-------------------------------------------------------------------------
 *
 * vacuum.c
 *	  The postgres vacuum cleaner.
 *
 * This file includes the "full" version of VACUUM, as well as control code
 * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.	See
 * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
 *
 *
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/vacuum.c,v 1.333 2006/07/10 16:20:50 alvherre Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <sys/time.h>
#include <unistd.h>

#include "access/clog.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/subtrans.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "catalog/pg_database.h"
#include "catalog/pg_index.h"
#include "commands/dbcommands.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "postmaster/autovacuum.h"
#include "storage/freespace.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "tcop/pquery.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/flatfiles.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_rusage.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
#include "pgstat.h"


/*
 * VacPage structures keep track of each page on which we find useful
 * amounts of free space.
 */
typedef struct VacPageData
{
	BlockNumber blkno;			/* BlockNumber of this Page */
	Size		free;			/* FreeSpace on this Page */
	uint16		offsets_used;	/* Number of OffNums used by vacuum */
	uint16		offsets_free;	/* Number of OffNums free or to be free */
	OffsetNumber offsets[1];	/* Array of free OffNums */
} VacPageData;

typedef VacPageData *VacPage;

typedef struct VacPageListData
{
	BlockNumber empty_end_pages;	/* Number of "empty" end-pages */
	int			num_pages;		/* Number of pages in pagedesc */
	int			num_allocated_pages;	/* Number of allocated pages in
										 * pagedesc */
	VacPage    *pagedesc;		/* Descriptions of pages */
} VacPageListData;

typedef VacPageListData *VacPageList;

/*
 * The "vtlinks" array keeps information about each recently-updated tuple
 * ("recent" meaning its XMAX is too new to let us recycle the tuple).
 * We store the tuple's own TID as well as its t_ctid (its link to the next
 * newer tuple version).  Searching in this array allows us to follow update
 * chains backwards from newer to older tuples.  When we move a member of an
 * update chain, we must move *all* the live members of the chain, so that we
 * can maintain their t_ctid link relationships (we must not just overwrite
 * t_ctid in an existing tuple).
 *
 * Note: because t_ctid links can be stale (this would only occur if a prior
 * VACUUM crashed partway through), it is possible that new_tid points to an
 * empty slot or unrelated tuple.  We have to check the linkage as we follow
 * it, just as is done in EvalPlanQual.
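 *
 * For example, if version A of a row was recently updated to B, and B to C,
 * then A and B each get a vtlinks entry; searching the array by new_tid lets
 * us step back from C to B and from B to A, so the chain is moved as a unit.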
 */
typedef struct VTupleLinkData
{
	ItemPointerData new_tid;	/* t_ctid of an updated tuple */
	ItemPointerData this_tid;	/* t_self of the tuple */
} VTupleLinkData;

typedef VTupleLinkData *VTupleLink;

/*
 * We use an array of VTupleMoveData to plan a chain tuple move fully
 * before we do it.
 */
typedef struct VTupleMoveData
{
	ItemPointerData tid;		/* tuple ID */
	VacPage		vacpage;		/* where to move it to */
	bool		cleanVpd;		/* clean vacpage before using? */
} VTupleMoveData;

typedef VTupleMoveData *VTupleMove;

/*
 * VRelStats contains the data acquired by scan_heap for use later
 */
typedef struct VRelStats
{
	/* miscellaneous statistics */
	BlockNumber rel_pages;
	double		rel_tuples;
	Size		min_tlen;
	Size		max_tlen;
	bool		hasindex;
	TransactionId minxid;	/* Minimum Xid present anywhere on table */
	/* vtlinks array for tuple chain following - sorted by new_tid */
	int			num_vtlinks;
	VTupleLink	vtlinks;
} VRelStats;

/*----------------------------------------------------------------------
 * ExecContext:
 *
 * As these variables always appear together, we put them into one struct
 * and pull initialization and cleanup into separate routines.
 * ExecContext is used by repair_frag() and move_xxx_tuple().  More
 * accurately:	It is *used* only in move_xxx_tuple(), but because this
 * routine is called many times, we initialize the struct just once in
 * repair_frag() and pass it on to move_xxx_tuple().
 */
typedef struct ExecContextData
{
	ResultRelInfo *resultRelInfo;
	EState	   *estate;
	TupleTableSlot *slot;
} ExecContextData;

typedef ExecContextData *ExecContext;

static void
ExecContext_Init(ExecContext ec, Relation rel)
{
	TupleDesc	tupdesc = RelationGetDescr(rel);

	/*
	 * We need a ResultRelInfo and an EState so we can use the regular
	 * executor's index-entry-making machinery.
	 */
	ec->estate = CreateExecutorState();

	ec->resultRelInfo = makeNode(ResultRelInfo);
	ec->resultRelInfo->ri_RangeTableIndex = 1;	/* dummy */
	ec->resultRelInfo->ri_RelationDesc = rel;
	ec->resultRelInfo->ri_TrigDesc = NULL;		/* we don't fire triggers */

	ExecOpenIndices(ec->resultRelInfo);

	ec->estate->es_result_relations = ec->resultRelInfo;
	ec->estate->es_num_result_relations = 1;
	ec->estate->es_result_relation_info = ec->resultRelInfo;

	/* Set up a tuple slot too */
	ec->slot = MakeSingleTupleTableSlot(tupdesc);
}

static void
ExecContext_Finish(ExecContext ec)
{
	ExecDropSingleTupleTableSlot(ec->slot);
	ExecCloseIndices(ec->resultRelInfo);
	FreeExecutorState(ec->estate);
}

/*
 * End of ExecContext Implementation
 *----------------------------------------------------------------------
 */

static MemoryContext vac_context = NULL;

static int	elevel = -1;


/* non-export function prototypes */
static List *get_rel_oids(List *relids, const RangeVar *vacrel,
			 const char *stmttype);
static void vac_update_dbminxid(Oid dbid,
					TransactionId *minxid,
					TransactionId *vacuumxid);
static void vac_truncate_clog(TransactionId myminxid, TransactionId myvacxid);
static void vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind);
static void full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
static void scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages,
		  TransactionId FreezeLimit, TransactionId OldestXmin);
static void repair_frag(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacuum_pages, VacPageList fraged_pages,
			int nindexes, Relation *Irel, TransactionId OldestXmin);
static void move_chain_tuple(Relation rel,
				 Buffer old_buf, Page old_page, HeapTuple old_tup,
				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
				 ExecContext ec, ItemPointer ctid, bool cleanVpd);
static void move_plain_tuple(Relation rel,
				 Buffer old_buf, Page old_page, HeapTuple old_tup,
				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
				 ExecContext ec);
static void update_hint_bits(Relation rel, VacPageList fraged_pages,
				 int num_fraged_pages, BlockNumber last_move_dest_block,
				 int num_moved);
static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacpagelist);
static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
static void vacuum_index(VacPageList vacpagelist, Relation indrel,
			 double num_tuples, int keep_tuples);
static void scan_index(Relation indrel, double num_tuples);
static bool tid_reaped(ItemPointer itemptr, void *state);
static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
			   BlockNumber rel_pages);
static VacPage copy_vac_page(VacPage vacpage);
static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
static void *vac_bsearch(const void *key, const void *base,
			size_t nelem, size_t size,
			int (*compar) (const void *, const void *));
static int	vac_cmp_blk(const void *left, const void *right);
static int	vac_cmp_offno(const void *left, const void *right);
static int	vac_cmp_vtlinks(const void *left, const void *right);
static bool enough_space(VacPage vacpage, Size len);
static Size	PageGetFreeSpaceWithFillFactor(Relation relation, Page page);


/****************************************************************************
 *																			*
 *			Code common to all flavors of VACUUM and ANALYZE				*
 *																			*
 ****************************************************************************
 */


/*
 * Primary entry point for VACUUM and ANALYZE commands.
 *
 * relids is normally NIL; if it is not, then it provides the list of
 * relation OIDs to be processed, and vacstmt->relation is ignored.
 * (The non-NIL case is currently only used by autovacuum.)
 *
 * It is the caller's responsibility that both vacstmt and relids
 * (if given) be allocated in a memory context that won't disappear
 * at transaction commit.  In fact this context must be QueryContext
 * to avoid complaints from PreventTransactionChain.
 */
void
vacuum(VacuumStmt *vacstmt, List *relids)
{
	const char *stmttype = vacstmt->vacuum ? "VACUUM" : "ANALYZE";
	volatile MemoryContext anl_context = NULL;
	volatile bool all_rels,
				in_outer_xact,
				use_own_xacts;
	List	   *relations;

	if (vacstmt->verbose)
		elevel = INFO;
	else
		elevel = DEBUG2;

	/*
	 * We cannot run VACUUM inside a user transaction block; if we were inside
	 * a transaction, then our commit- and start-transaction-command calls
	 * would not have the intended effect! Furthermore, the forced commit that
	 * occurs before truncating the relation's file would have the effect of
	 * committing the rest of the user's transaction too, which would
	 * certainly not be the desired behavior.  (This only applies to VACUUM
	 * FULL, though.  We could in theory run lazy VACUUM inside a transaction
	 * block, but we choose to disallow that case because we'd rather commit
	 * as soon as possible after finishing the vacuum.	This is mainly so that
	 * we can let go the AccessExclusiveLock that we may be holding.)
	 *
	 * ANALYZE (without VACUUM) can run either way.
	 */
	if (vacstmt->vacuum)
	{
		PreventTransactionChain((void *) vacstmt, stmttype);
		in_outer_xact = false;
	}
	else
		in_outer_xact = IsInTransactionChain((void *) vacstmt);

	/*
	 * Disallow the combination VACUUM FULL FREEZE; although it would mostly
	 * work, VACUUM FULL's ability to move tuples around means that it is
	 * injecting its own XID into tuple visibility checks.	We'd have to
	 * guarantee that every moved tuple is properly marked XMIN_COMMITTED or
	 * XMIN_INVALID before the end of the operation.  There are corner cases
	 * where this does not happen, and getting rid of them all seems hard (not
	 * to mention fragile to maintain).  On the whole it's not worth it
	 * compared to telling people to use two operations.  See pgsql-hackers
	 * discussion of 27-Nov-2004, and comments below for update_hint_bits().
	 *
	 * Note: this is enforced here, and not in the grammar, since (a) we can
	 * give a better error message, and (b) we might want to allow it again
	 * someday.
	 */
	if (vacstmt->vacuum && vacstmt->full && vacstmt->freeze)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("VACUUM FULL FREEZE is not supported"),
				 errhint("Use VACUUM FULL, then VACUUM FREEZE.")));

	/*
	 * Send info about dead objects to the statistics collector, unless
	 * we are in autovacuum --- autovacuum.c does this for itself.
	 */
	if (vacstmt->vacuum && !IsAutoVacuumProcess())
		pgstat_vacuum_tabstat();

	/*
	 * Create special memory context for cross-transaction storage.
	 *
	 * Since it is a child of PortalContext, it will go away eventually even
	 * if we suffer an error; there's no need for special abort cleanup logic.
	 */
	vac_context = AllocSetContextCreate(PortalContext,
										"Vacuum",
										ALLOCSET_DEFAULT_MINSIZE,
										ALLOCSET_DEFAULT_INITSIZE,
										ALLOCSET_DEFAULT_MAXSIZE);

	/* Remember whether we are processing everything in the DB */
	all_rels = (relids == NIL && vacstmt->relation == NULL);

	/*
	 * Build list of relations to process, unless caller gave us one. (If we
	 * build one, we put it in vac_context for safekeeping.)
	 */
	relations = get_rel_oids(relids, vacstmt->relation, stmttype);

	/*
	 * Decide whether we need to start/commit our own transactions.
	 *
	 * For VACUUM (with or without ANALYZE): always do so, so that we can
	 * release locks as soon as possible.  (We could possibly use the outer
	 * transaction for a one-table VACUUM, but handling TOAST tables would be
	 * problematic.)
	 *
	 * For ANALYZE (no VACUUM): if inside a transaction block, we cannot
	 * start/commit our own transactions.  Also, there's no need to do so if
	 * only processing one relation.  For multiple relations when not within a
	 * transaction block, use own transactions so we can release locks sooner.
	 */
	if (vacstmt->vacuum)
		use_own_xacts = true;
	else
	{
		Assert(vacstmt->analyze);
		if (in_outer_xact)
			use_own_xacts = false;
		else if (list_length(relations) > 1)
			use_own_xacts = true;
		else
			use_own_xacts = false;
	}

	/*
	 * If we are running ANALYZE without per-table transactions, we'll need a
	 * memory context with table lifetime.
	 */
	if (!use_own_xacts)
		anl_context = AllocSetContextCreate(PortalContext,
											"Analyze",
											ALLOCSET_DEFAULT_MINSIZE,
											ALLOCSET_DEFAULT_INITSIZE,
											ALLOCSET_DEFAULT_MAXSIZE);

	/*
	 * vacuum_rel expects to be entered with no transaction active; it will
	 * start and commit its own transaction.  But we are called by an SQL
	 * command, and so we are executing inside a transaction already. We
	 * commit the transaction started in PostgresMain() here, and start
	 * another one before exiting to match the commit waiting for us back in
	 * PostgresMain().
	 */
	if (use_own_xacts)
	{
		/* matches the StartTransaction in PostgresMain() */
		CommitTransactionCommand();
	}

	/* Turn vacuum cost accounting on or off */
	PG_TRY();
	{
		ListCell   *cur;

		VacuumCostActive = (VacuumCostDelay > 0);
		VacuumCostBalance = 0;

		/*
		 * Loop to process each selected relation.
		 */
		foreach(cur, relations)
		{
			Oid			relid = lfirst_oid(cur);

			if (vacstmt->vacuum)
				vacuum_rel(relid, vacstmt, RELKIND_RELATION);

			if (vacstmt->analyze)
			{
				MemoryContext old_context = NULL;

				/*
				 * If using separate xacts, start one for analyze. Otherwise,
				 * we can use the outer transaction, but we still need to call
				 * analyze_rel in a memory context that will be cleaned up on
				 * return (else we leak memory while processing multiple
				 * tables).
				 */
				if (use_own_xacts)
				{
					StartTransactionCommand();
					/* functions in indexes may want a snapshot set */
					ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
				}
				else
					old_context = MemoryContextSwitchTo(anl_context);

				/*
				 * Tell the buffer replacement strategy that vacuum is causing
				 * the IO
				 */
				StrategyHintVacuum(true);

				analyze_rel(relid, vacstmt);

				StrategyHintVacuum(false);

				if (use_own_xacts)
					CommitTransactionCommand();
				else
				{
					MemoryContextSwitchTo(old_context);
					MemoryContextResetAndDeleteChildren(anl_context);
				}
			}
		}
	}
	PG_CATCH();
	{
		/* Make sure cost accounting is turned off after error */
		VacuumCostActive = false;
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* Turn off vacuum cost accounting */
	VacuumCostActive = false;

	/*
	 * Finish up processing.
	 */
	if (use_own_xacts)
	{
		/* here, we are not in a transaction */

		/*
		 * This matches the CommitTransaction waiting for us in
		 * PostgresMain().
		 */
		StartTransactionCommand();
		/*
		 * Re-establish the transaction snapshot.  This is wasted effort
		 * when we are called as a normal utility command, because the
		 * new transaction will be dropped immediately by PostgresMain();
		 * but it's necessary if we are called from autovacuum because
		 * autovacuum might continue on to do an ANALYZE-only call.
		 */
		ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
	}

	if (vacstmt->vacuum)
	{
		TransactionId	minxid,
						vacuumxid;

		/*
		 * If it was a database-wide VACUUM, print FSM usage statistics (we
		 * don't make you be superuser to see these).
		 */
		if (all_rels)
			PrintFreeSpaceMapStatistics(elevel);

		/* Update pg_database.datminxid and datvacuumxid */
		vac_update_dbminxid(MyDatabaseId, &minxid, &vacuumxid);

		/* Try to truncate pg_clog. */
		vac_truncate_clog(minxid, vacuumxid);
	}

	/*
	 * Clean up working storage --- note we must do this after
	 * StartTransactionCommand, else we might be trying to delete the active
	 * context!
	 */
	MemoryContextDelete(vac_context);
	vac_context = NULL;

	if (anl_context)
		MemoryContextDelete(anl_context);
}

/*
 * Build a list of Oids for each relation to be processed
 *
 * The list is built in vac_context so that it will survive across our
 * per-relation transactions.
 */
static List *
get_rel_oids(List *relids, const RangeVar *vacrel, const char *stmttype)
{
	List	   *oid_list = NIL;
	MemoryContext oldcontext;

	/* List supplied by VACUUM's caller? */
	if (relids)
		return relids;

	if (vacrel)
	{
		/* Process a specific relation */
		Oid			relid;

		relid = RangeVarGetRelid(vacrel, false);

		/* Make a relation list entry for this guy */
		oldcontext = MemoryContextSwitchTo(vac_context);
		oid_list = lappend_oid(oid_list, relid);
		MemoryContextSwitchTo(oldcontext);
	}
	else
	{
		/* Process all plain relations listed in pg_class */
		Relation	pgclass;
		HeapScanDesc scan;
		HeapTuple	tuple;
		ScanKeyData key;

		ScanKeyInit(&key,
					Anum_pg_class_relkind,
					BTEqualStrategyNumber, F_CHAREQ,
					CharGetDatum(RELKIND_RELATION));

		pgclass = heap_open(RelationRelationId, AccessShareLock);

		scan = heap_beginscan(pgclass, SnapshotNow, 1, &key);

		while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
		{
			/* Make a relation list entry for this guy */
			oldcontext = MemoryContextSwitchTo(vac_context);
			oid_list = lappend_oid(oid_list, HeapTupleGetOid(tuple));
			MemoryContextSwitchTo(oldcontext);
		}

		heap_endscan(scan);
		heap_close(pgclass, AccessShareLock);
	}

	return oid_list;
}

/*
 * vacuum_set_xid_limits() -- compute oldest-Xmin and freeze cutoff points
 */
void
vacuum_set_xid_limits(VacuumStmt *vacstmt, bool sharedRel,
					  TransactionId *oldestXmin,
					  TransactionId *freezeLimit)
{
	TransactionId limit;

	*oldestXmin = GetOldestXmin(sharedRel);

	Assert(TransactionIdIsNormal(*oldestXmin));

	if (vacstmt->freeze)
	{
		/* FREEZE option: use oldest Xmin as freeze cutoff too */
		limit = *oldestXmin;
	}
	else
	{
		/*
		 * Normal case: freeze cutoff is well in the past, to wit, about
		 * halfway to the wrap horizon
		 */
		limit = GetCurrentTransactionId() - (MaxTransactionId >> 2);
	}
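
	/*
	 * Note: MaxTransactionId >> 2 is roughly one billion XIDs, i.e. about
	 * half of the ~2-billion-XID wraparound horizon, which is what
	 * "halfway" means above.
	 */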

	/*
	 * Be careful not to generate a "permanent" XID
	 */
	if (!TransactionIdIsNormal(limit))
		limit = FirstNormalTransactionId;

	/*
	 * Ensure sane relationship of limits
	 */
	if (TransactionIdFollows(limit, *oldestXmin))
	{
		ereport(WARNING,
				(errmsg("oldest xmin is far in the past"),
				 errhint("Close open transactions soon to avoid wraparound problems.")));
		limit = *oldestXmin;
	}

	*freezeLimit = limit;
}


/*
 *	vac_update_relstats() -- update statistics for one relation
 *
 *		Update the whole-relation statistics that are kept in its pg_class
 *		row.  There are additional stats that will be updated if we are
 *		doing ANALYZE, but we always update these stats.  This routine works
 *		for both index and heap relation entries in pg_class.
 *
 *		We violate transaction semantics here by overwriting the rel's
 *		existing pg_class tuple with the new values.  This is reasonably
 *		safe since the new values are correct whether or not this transaction
 *		commits.  The reason for this is that if we updated these tuples in
 *		the usual way, vacuuming pg_class itself wouldn't work very well ---
 *		by the time we got done with a vacuum cycle, most of the tuples in
 *		pg_class would've been obsoleted.  Of course, this only works for
 *		fixed-size never-null columns, but these are.
 *
 *		This routine is shared by full VACUUM, lazy VACUUM, and stand-alone
 *		ANALYZE.
 */
void
vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
					bool hasindex, TransactionId minxid,
					TransactionId vacuumxid)
{
	Relation	rd;
	HeapTuple	ctup;
	Form_pg_class pgcform;
	bool		dirty;

	rd = heap_open(RelationRelationId, RowExclusiveLock);

	/* Fetch a copy of the tuple to scribble on */
	ctup = SearchSysCacheCopy(RELOID,
							  ObjectIdGetDatum(relid),
							  0, 0, 0);
	if (!HeapTupleIsValid(ctup))
		elog(ERROR, "pg_class entry for relid %u vanished during vacuuming",
			 relid);
	pgcform = (Form_pg_class) GETSTRUCT(ctup);

	/* Apply required updates, if any, to copied tuple */

	dirty = false;
	if (pgcform->relpages != (int32) num_pages)
	{
		pgcform->relpages = (int32) num_pages;
		dirty = true;
	}
	if (pgcform->reltuples != (float4) num_tuples)
	{
		pgcform->reltuples = (float4) num_tuples;
		dirty = true;
	}
	if (pgcform->relhasindex != hasindex)
	{
		pgcform->relhasindex = hasindex;
		dirty = true;
	}
	/*
	 * If we have discovered that there are no indexes, then there's no
	 * primary key either.	This could be done more thoroughly...
	 */
	if (!hasindex)
	{
		if (pgcform->relhaspkey)
		{
			pgcform->relhaspkey = false;
			dirty = true;
		}
	}
	if (TransactionIdIsValid(minxid) && pgcform->relminxid != minxid)
	{
		pgcform->relminxid = minxid;
		dirty = true;
	}
	if (TransactionIdIsValid(vacuumxid) && pgcform->relvacuumxid != vacuumxid)
	{
		pgcform->relvacuumxid = vacuumxid;
		dirty = true;
	}

	/*
	 * If anything changed, write out the tuple
	 */
	if (dirty)
		heap_inplace_update(rd, ctup);

	heap_close(rd, RowExclusiveLock);
}


/*
 *	vac_update_dbminxid() -- update the minimum Xid present in one database
 *
 * 		Update pg_database's datminxid and datvacuumxid, and the flat-file copy
 * 		of it.  datminxid is updated to the minimum of all relminxid found in
 * 		pg_class.  datvacuumxid is updated to the minimum of all relvacuumxid
 * 		found in pg_class.  The values are also returned in minxid and
 * 		vacuumxid, respectively.
 *
 *		We violate transaction semantics here by overwriting the database's
 *		existing pg_database tuple with the new values.  This is reasonably
 *		safe since the new values are correct whether or not this transaction
 *		commits.  As with vac_update_relstats, this avoids leaving dead tuples
 *		behind after a VACUUM.
 *
 *		This routine is shared by full and lazy VACUUM.
 */
static void
vac_update_dbminxid(Oid dbid, TransactionId *minxid, TransactionId *vacuumxid)
{
	HeapTuple	tuple;
	Form_pg_database dbform;
	Relation	relation;
	SysScanDesc	scan;
	HeapTuple	classTup;
	TransactionId	newMinXid = InvalidTransactionId;
	TransactionId	newVacXid = InvalidTransactionId;
	bool		dirty = false;

	/* 
	 * We must seqscan pg_class to find the minimum Xid, because there
	 * is no index that can help us here.
	 */
	relation = heap_open(RelationRelationId, AccessShareLock);

	scan = systable_beginscan(relation, InvalidOid, false,
							  SnapshotNow, 0, NULL);

	while ((classTup = systable_getnext(scan)) != NULL)
	{
		Form_pg_class classForm;

		classForm = (Form_pg_class) GETSTRUCT(classTup);

		/*
		 * Only consider heap and TOAST tables (anything else should have
		 * InvalidTransactionId in both fields anyway.)
		 */
		if (classForm->relkind != RELKIND_RELATION &&
			classForm->relkind != RELKIND_TOASTVALUE)
			continue;

		Assert(TransactionIdIsNormal(classForm->relminxid));
		Assert(TransactionIdIsNormal(classForm->relvacuumxid));

		/*
		 * Compute the minimum relminxid in all the tables in the database.
		 */
		if ((!TransactionIdIsValid(newMinXid) ||
			 TransactionIdPrecedes(classForm->relminxid, newMinXid)))
			newMinXid = classForm->relminxid;

		/* ditto, for relvacuumxid */
		if ((!TransactionIdIsValid(newVacXid) ||
			 TransactionIdPrecedes(classForm->relvacuumxid, newVacXid)))
			newVacXid = classForm->relvacuumxid;
	}

	/* we're done with pg_class */
	systable_endscan(scan);
	heap_close(relation, AccessShareLock);

	Assert(TransactionIdIsNormal(newMinXid));
	Assert(TransactionIdIsNormal(newVacXid));

	/* Now fetch the pg_database tuple we need to update. */
	relation = heap_open(DatabaseRelationId, RowExclusiveLock);

	/* Fetch a copy of the tuple to scribble on */
	tuple = SearchSysCacheCopy(DATABASEOID,
							   ObjectIdGetDatum(dbid),
							   0, 0, 0);
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "could not find tuple for database %u", dbid);
	dbform = (Form_pg_database) GETSTRUCT(tuple);

	if (TransactionIdPrecedes(dbform->datminxid, newMinXid))
	{
		dbform->datminxid = newMinXid;
		dirty = true;
	}
	if (TransactionIdPrecedes(dbform->datvacuumxid, newVacXid))
	{
		dbform->datvacuumxid = newVacXid;
		dirty = true;
	}

	if (dirty)
		heap_inplace_update(relation, tuple);

	heap_freetuple(tuple);
	heap_close(relation, RowExclusiveLock);

	/* set return values */
	*minxid = newMinXid;
	*vacuumxid = newVacXid;

	/* Mark the flat-file copy of pg_database for update at commit */
	database_file_update_needed();
}


/*
 *	vac_truncate_clog() -- attempt to truncate the commit log
 *
 *		Scan pg_database to determine the system-wide oldest datvacuumxid,
 *		and use it to truncate the transaction commit log (pg_clog).
 *		Also update the XID wrap limit point maintained by varsup.c.
 *
 *		We also generate a warning if the system-wide oldest datfrozenxid
 *		seems to be in danger of wrapping around.  This is a long-in-advance
 *		warning; if we start getting uncomfortably close, GetNewTransactionId
 *		will generate more-annoying warnings, and ultimately refuse to issue
 *		any more new XIDs.
 *
 *		The passed XIDs are simply the ones I just wrote into my pg_database
 *		entry.	They're used to initialize the "min" calculations.
 *
 *		This routine is shared by full and lazy VACUUM.  Note that it is only
 *		applied after a database-wide VACUUM operation.
 */
static void
vac_truncate_clog(TransactionId myminxid, TransactionId myvacxid)
{
	TransactionId myXID = GetCurrentTransactionId();
	TransactionId minXID;
	TransactionId vacuumXID;
	Relation	relation;
	HeapScanDesc scan;
	HeapTuple	tuple;
	int32		age;
	NameData	oldest_datname;
	bool		vacuumAlreadyWrapped = false;
	bool		minAlreadyWrapped = false;

	/* Initialize the minimum values. */
	minXID = myminxid;
	vacuumXID = myvacxid;
	namestrcpy(&oldest_datname, get_database_name(MyDatabaseId));

	/*
	 * Note: the "already wrapped" cases should now be impossible due to the
	 * defenses in GetNewTransactionId, but we keep them anyway.
	 */
	relation = heap_open(DatabaseRelationId, AccessShareLock);

	scan = heap_beginscan(relation, SnapshotNow, 0, NULL);

	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		Form_pg_database dbform = (Form_pg_database) GETSTRUCT(tuple);

		Assert(TransactionIdIsNormal(dbform->datvacuumxid));
		Assert(TransactionIdIsNormal(dbform->datminxid));

		if (TransactionIdPrecedes(myXID, dbform->datvacuumxid))
			vacuumAlreadyWrapped = true;
		else if (TransactionIdPrecedes(dbform->datvacuumxid, vacuumXID))
			vacuumXID = dbform->datvacuumxid;

		if (TransactionIdPrecedes(myXID, dbform->datminxid))
			minAlreadyWrapped = true;
		else if (TransactionIdPrecedes(dbform->datminxid, minXID))
		{
			minXID = dbform->datminxid;
			namecpy(&oldest_datname, &dbform->datname);
		}
	}

	heap_endscan(scan);

	heap_close(relation, AccessShareLock);

	/*
	 * Do not truncate CLOG if we seem to have suffered wraparound already;
	 * the computed minimum XID might be bogus.
	 */
	if (vacuumAlreadyWrapped)
	{
		ereport(WARNING,
				(errmsg("some databases have not been vacuumed in over 2 billion transactions"),
				 errdetail("You may have already suffered transaction-wraparound data loss.")));
		return;
	}

	/* Truncate CLOG to the oldest vacuumxid */
	TruncateCLOG(vacuumXID);

	/*
	 * Do not update varsup.c if we seem to have suffered wraparound already;
	 * the computed XID might be bogus.
	 */
	if (minAlreadyWrapped)
	{
		ereport(WARNING,
				(errmsg("some databases have not been vacuumed in over 1 billion transactions"),
				 errhint("Better vacuum them soon, or you may have a wraparound failure.")));
		return;
	}

	/* Update the wrap limit for GetNewTransactionId */
	SetTransactionIdLimit(minXID, &oldest_datname);

	/* Give warning about impending wraparound problems */
	age = (int32) (myXID - minXID);
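	/*
	 * Note: (MaxTransactionId >> 3) * 3 is roughly 1.6 billion XIDs, about
	 * three-quarters of the ~2-billion-XID wrap horizon, so this warning
	 * fires well before GetNewTransactionId starts complaining (per the
	 * header comment above).
	 */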
	if (age > (int32) ((MaxTransactionId >> 3) * 3))
		ereport(WARNING,
		   (errmsg("database \"%s\" must be vacuumed within %u transactions",
				   NameStr(oldest_datname),
				   (MaxTransactionId >> 1) - age),
			errhint("To avoid a database shutdown, execute a full-database VACUUM in \"%s\".",
					NameStr(oldest_datname))));

	/*
	 * Have the postmaster start an autovacuum iteration.  If the user has
	 * autovacuum configured, this is not needed; otherwise, we need to make
	 * sure we have some mechanism to cope with transaction Id wraparound.
	 * Ideally this would only be needed for template databases, because all
	 * other databases should be kept nicely pruned by regular vacuuming.
	 *
	 * XXX -- the test we use here is fairly arbitrary.  Note that in the
	 * autovacuum database-wide code, a template database is always processed
	 * with VACUUM FREEZE, so we can be sure that it will be truly frozen so
	 * it won't need to be processed here again soon.
	 *
	 * FIXME -- here we could get into a kind of loop if the database being
	 * chosen is not actually a template database, because we'll not freeze
	 * it, so its age may not really decrease if there are any live
	 * non-freezable tuples.  Consider forcing a vacuum freeze if autovacuum
	 * is invoked by a backend.  On the other hand, forcing a vacuum freeze
	 * on a user database may not be a very polite thing to do.
	 */
	if (!AutoVacuumingActive() && age > (int32) ((MaxTransactionId >> 3) * 3))
		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC);
}


/****************************************************************************
 *																			*
 *			Code common to both flavors of VACUUM							*
 *																			*
 ****************************************************************************
 */


/*
 *	vacuum_rel() -- vacuum one heap relation
 *
 *		Doing one heap at a time incurs extra overhead, since we need to
 *		check that the heap exists again just before we vacuum it.	The
 *		reason that we do this is so that vacuuming can be spread across
 *		many small transactions.  Otherwise, two-phase locking would require
 *		us to lock the entire database during one pass of the vacuum cleaner.
 *
 *		At entry and exit, we are not inside a transaction.
 */
static void
vacuum_rel(Oid relid, VacuumStmt *vacstmt, char expected_relkind)
{
	LOCKMODE	lmode;
	Relation	onerel;
	LockRelId	onerelid;
	Oid			toast_relid;

	/* Begin a transaction for vacuuming this relation */
	StartTransactionCommand();
	/* functions in indexes may want a snapshot set */
	ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());

	/*
	 * Tell the cache replacement strategy that vacuum is causing all
	 * following IO
	 */
	StrategyHintVacuum(true);

	/*
	 * Check for user-requested abort.	Note we want this to be inside a
	 * transaction, so xact.c doesn't issue useless WARNING.
	 */
	CHECK_FOR_INTERRUPTS();

	/*
	 * Race condition -- if the pg_class tuple has gone away since the last
	 * time we saw it, we don't need to vacuum it.
	 */
	if (!SearchSysCacheExists(RELOID,
							  ObjectIdGetDatum(relid),
							  0, 0, 0))
	{
		StrategyHintVacuum(false);
		CommitTransactionCommand();
		return;
	}

	/*
	 * Determine the type of lock we want --- hard exclusive lock for a FULL
	 * vacuum, but just ShareUpdateExclusiveLock for concurrent vacuum. Either
	 * way, we can be sure that no other backend is vacuuming the same table.
	 */
	lmode = vacstmt->full ? AccessExclusiveLock : ShareUpdateExclusiveLock;

	/*
	 * Open the class, get an appropriate lock on it, and check permissions.
	 *
	 * We allow the user to vacuum a table if he is superuser, the table
	 * owner, or the database owner (but in the latter case, only if it's not
	 * a shared relation).	pg_class_ownercheck includes the superuser case.
	 *
	 * Note we choose to treat permissions failure as a WARNING and keep
	 * trying to vacuum the rest of the DB --- is this appropriate?
	 */
	onerel = relation_open(relid, lmode);

	if (!(pg_class_ownercheck(RelationGetRelid(onerel), GetUserId()) ||
		  (pg_database_ownercheck(MyDatabaseId, GetUserId()) && !onerel->rd_rel->relisshared)))
	{
		ereport(WARNING,
				(errmsg("skipping \"%s\" --- only table or database owner can vacuum it",
						RelationGetRelationName(onerel))));
		relation_close(onerel, lmode);
		StrategyHintVacuum(false);
		CommitTransactionCommand();
		return;
	}

	/*
	 * Check that it's a plain table; we used to do this in get_rel_oids() but
	 * seems safer to check after we've locked the relation.
	 */
	if (onerel->rd_rel->relkind != expected_relkind)
	{
		ereport(WARNING,
				(errmsg("skipping \"%s\" --- cannot vacuum indexes, views, or special system tables",
						RelationGetRelationName(onerel))));
		relation_close(onerel, lmode);
		StrategyHintVacuum(false);
		CommitTransactionCommand();
		return;
	}

	/*
	 * Silently ignore tables that are temp tables of other backends ---
	 * trying to vacuum these will lead to great unhappiness, since their
	 * contents are probably not up-to-date on disk.  (We don't throw a
	 * warning here; it would just lead to chatter during a database-wide
	 * VACUUM.)
	 */
	if (isOtherTempNamespace(RelationGetNamespace(onerel)))
	{
		relation_close(onerel, lmode);
		StrategyHintVacuum(false);
		CommitTransactionCommand();
		return;			/* assume no long-lived data in temp tables */
	}

	/*
	 * Get a session-level lock too. This will protect our access to the
	 * relation across multiple transactions, so that we can vacuum the
	 * relation's TOAST table (if any) secure in the knowledge that no one is
	 * deleting the parent relation.
	 *
	 * NOTE: this cannot block, even if someone else is waiting for access,
	 * because the lock manager knows that both lock requests are from the
	 * same process.
	 */
	onerelid = onerel->rd_lockInfo.lockRelId;
	LockRelationForSession(&onerelid, onerel->rd_istemp, lmode);

	/*
	 * Remember the relation's TOAST relation for later
	 */
	toast_relid = onerel->rd_rel->reltoastrelid;

	/*
	 * Do the actual work --- either FULL or "lazy" vacuum
	 */
	if (vacstmt->full)
		full_vacuum_rel(onerel, vacstmt);
	else
		lazy_vacuum_rel(onerel, vacstmt);

	/* all done with this class, but hold lock until commit */
	relation_close(onerel, NoLock);

	/*
	 * Complete the transaction and free all temporary memory used.
	 */
	StrategyHintVacuum(false);
	CommitTransactionCommand();

	/*
	 * If the relation has a secondary toast rel, vacuum that too while we
	 * still hold the session lock on the master table.  Note however that
	 * "analyze" will not get done on the toast table.	This is good, because
	 * the toaster always uses hardcoded index access and statistics are
	 * totally unimportant for toast relations.
	 */
	if (toast_relid != InvalidOid)
		vacuum_rel(toast_relid, vacstmt, RELKIND_TOASTVALUE);

	/*
	 * Now release the session-level lock on the master table.
	 */
	UnlockRelationForSession(&onerelid, lmode);

	return;
}


/****************************************************************************
 *																			*
 *			Code for VACUUM FULL (only)										*
 *																			*
 ****************************************************************************
 */


/*
 *	full_vacuum_rel() -- perform FULL VACUUM for one heap relation
 *
 *		This routine vacuums a single heap, cleans out its indexes, and
 *		updates its num_pages and num_tuples statistics.
 *
 *		At entry, we have already established a transaction and opened
 *		and locked the relation.
 */
static void
full_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
{
	VacPageListData vacuum_pages;		/* List of pages to vacuum and/or
										 * clean indexes */
	VacPageListData fraged_pages;		/* List of pages with space enough for
										 * re-using */
	Relation   *Irel;
	int			nindexes,
				i;
	VRelStats  *vacrelstats;
	TransactionId FreezeLimit,
				  OldestXmin;

	vacuum_set_xid_limits(vacstmt, onerel->rd_rel->relisshared,
						  &OldestXmin, &FreezeLimit);

	/*
	 * Set up statistics-gathering machinery.
	 */
	vacrelstats = (VRelStats *) palloc(sizeof(VRelStats));
	vacrelstats->rel_pages = 0;
	vacrelstats->rel_tuples = 0;
	vacrelstats->hasindex = false;

	/*
	 * Set initial minimum Xid, which will be updated if a smaller Xid is found
	 * in the relation by scan_heap.
	 *
	 * We use RecentXmin here (the minimum Xid that belongs to a transaction
	 * that is still open according to our snapshot), because it is the
	 * earliest transaction that could insert new tuples in the table after our
	 * VACUUM is done.
	 */
	vacrelstats->minxid = RecentXmin;

	/* scan the heap */
	vacuum_pages.num_pages = fraged_pages.num_pages = 0;
	scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages, FreezeLimit,
			  OldestXmin);

	/* Now open all indexes of the relation */
	vac_open_indexes(onerel, AccessExclusiveLock, &nindexes, &Irel);
	if (nindexes > 0)
		vacrelstats->hasindex = true;

	/* Clean/scan index relation(s) */
	if (Irel != NULL)
	{
		if (vacuum_pages.num_pages > 0)
		{
			for (i = 0; i < nindexes; i++)
				vacuum_index(&vacuum_pages, Irel[i],
							 vacrelstats->rel_tuples, 0);
		}
		else
		{
			/* just scan indexes to update statistic */
			for (i = 0; i < nindexes; i++)
				scan_index(Irel[i], vacrelstats->rel_tuples);
		}
	}

	if (fraged_pages.num_pages > 0)
	{
		/* Try to shrink heap */
		repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
					nindexes, Irel, OldestXmin);
		vac_close_indexes(nindexes, Irel, NoLock);
	}
	else
	{
		vac_close_indexes(nindexes, Irel, NoLock);
		if (vacuum_pages.num_pages > 0)
		{
			/* Clean pages from vacuum_pages list */
			vacuum_heap(vacrelstats, onerel, &vacuum_pages);
		}
	}

	/* update shared free space map with final free space info */
	vac_update_fsm(onerel, &fraged_pages, vacrelstats->rel_pages);

	/* update statistics in pg_class */
	vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
						vacrelstats->rel_tuples, vacrelstats->hasindex,
						vacrelstats->minxid, OldestXmin);

	/* report results to the stats collector, too */
	pgstat_report_vacuum(RelationGetRelid(onerel), onerel->rd_rel->relisshared,
						 vacstmt->analyze, vacrelstats->rel_tuples);
}


/*
 *	scan_heap() -- scan an open heap relation
 *
 *		This routine sets commit status bits, constructs vacuum_pages (list
 *		of pages we need to compact free space on and/or clean indexes of
 *		deleted tuples), constructs fraged_pages (list of pages with free
 *		space that tuples could be moved into), and calculates statistics
 *		on the number of live tuples in the heap.
 *
 *		It also updates the minimum Xid found anywhere on the table in
 *		vacrelstats->minxid, for later storing it in pg_class.relminxid.
 */
static void
scan_heap(VRelStats *vacrelstats, Relation onerel,
		  VacPageList vacuum_pages, VacPageList fraged_pages,
		  TransactionId FreezeLimit, TransactionId OldestXmin)
{
	BlockNumber nblocks,
				blkno;
	char	   *relname;
	VacPage		vacpage;
	BlockNumber empty_pages,
				empty_end_pages;
	double		num_tuples,
				tups_vacuumed,
				nkeep,
				nunused;
	double		free_space,
				usable_free_space;
	Size		min_tlen = MaxTupleSize;
	Size		max_tlen = 0;
	bool		do_shrinking = true;
	VTupleLink	vtlinks = (VTupleLink) palloc(100 * sizeof(VTupleLinkData));
	int			num_vtlinks = 0;
	int			free_vtlinks = 100;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	relname = RelationGetRelationName(onerel);
	ereport(elevel,
			(errmsg("vacuuming \"%s.%s\"",
					get_namespace_name(RelationGetNamespace(onerel)),
					relname)));

	empty_pages = empty_end_pages = 0;
	num_tuples = tups_vacuumed = nkeep = nunused = 0;
	free_space = 0;

	nblocks = RelationGetNumberOfBlocks(onerel);

	/*
	 * We initially create each VacPage item in a maximal-sized workspace,
	 * then copy the workspace into a just-large-enough copy.
	 */
	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));

	for (blkno = 0; blkno < nblocks; blkno++)
	{
		Page		page,
					tempPage = NULL;
		bool		do_reap,
					do_frag;
		Buffer		buf;
		OffsetNumber offnum,
					maxoff;
		bool		pgchanged,
					notup;

		vacuum_delay_point();

		buf = ReadBuffer(onerel, blkno);
		page = BufferGetPage(buf);

		/*
		 * Since we are holding exclusive lock on the relation, no other
		 * backend can be accessing the page; however it is possible that the
		 * background writer will try to write the page if it's already marked
		 * dirty.  To ensure that invalid data doesn't get written to disk, we
		 * must take exclusive buffer lock wherever we potentially modify
		 * pages.
		 */
		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

		vacpage->blkno = blkno;
		vacpage->offsets_used = 0;
		vacpage->offsets_free = 0;

		if (PageIsNew(page))
		{
			VacPage		vacpagecopy;

			ereport(WARNING,
			   (errmsg("relation \"%s\" page %u is uninitialized --- fixing",
					   relname, blkno)));
			PageInit(page, BufferGetPageSize(buf), 0);
			MarkBufferDirty(buf);
			vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
			free_space += vacpage->free;
			empty_pages++;
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			vpage_insert(fraged_pages, vacpagecopy);
			UnlockReleaseBuffer(buf);
			continue;
		}

		if (PageIsEmpty(page))
		{
			VacPage		vacpagecopy;

			vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
			free_space += vacpage->free;
			empty_pages++;
			empty_end_pages++;
			vacpagecopy = copy_vac_page(vacpage);
			vpage_insert(vacuum_pages, vacpagecopy);
			vpage_insert(fraged_pages, vacpagecopy);
			UnlockReleaseBuffer(buf);
			continue;
		}

		pgchanged = false;
		notup = true;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			ItemId		itemid = PageGetItemId(page, offnum);
			bool		tupgone = false;
			HeapTupleData tuple;

			/*
			 * Collect un-used items too - it's possible to have indexes
			 * pointing here after crash.
			 */
			if (!ItemIdIsUsed(itemid))
			{
				vacpage->offsets[vacpage->offsets_free++] = offnum;
				nunused += 1;
				continue;
			}

			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf))
			{
				case HEAPTUPLE_DEAD:
					tupgone = true;		/* we can delete the tuple */
					break;
				case HEAPTUPLE_LIVE:

					/*
					 * Tuple is good.  Consider whether to replace its xmin
					 * value with FrozenTransactionId.
					 */
					if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple.t_data)) &&
						TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
											  FreezeLimit))
					{
						HeapTupleHeaderSetXmin(tuple.t_data, FrozenTransactionId);
						/* infomask should be okay already */
						Assert(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED);
						pgchanged = true;
					}

					/*
					 * Other checks...
					 */
					if (onerel->rd_rel->relhasoids &&
						!OidIsValid(HeapTupleGetOid(&tuple)))
						elog(WARNING, "relation \"%s\" TID %u/%u: OID is invalid",
							 relname, blkno, offnum);
					break;
				case HEAPTUPLE_RECENTLY_DEAD:

					/*
					 * If tuple is recently deleted then we must not remove it
					 * from relation.
					 */
					nkeep += 1;

					/*
					 * If we do shrinking and this tuple is updated one then
					 * remember it to construct updated tuple dependencies.
					 */
					if (do_shrinking &&
						!(ItemPointerEquals(&(tuple.t_self),
											&(tuple.t_data->t_ctid))))
					{
						if (free_vtlinks == 0)
						{
							free_vtlinks = 1000;
							vtlinks = (VTupleLink) repalloc(vtlinks,
											   (free_vtlinks + num_vtlinks) *
													 sizeof(VTupleLinkData));
						}
						vtlinks[num_vtlinks].new_tid = tuple.t_data->t_ctid;
						vtlinks[num_vtlinks].this_tid = tuple.t_self;
						free_vtlinks--;
						num_vtlinks++;
					}
					break;
				case HEAPTUPLE_INSERT_IN_PROGRESS:

					/*
					 * This should not happen, since we hold exclusive lock on
					 * the relation; shouldn't we raise an error? (Actually,
					 * it can happen in system catalogs, since we tend to
					 * release write lock before commit there.)
					 */
					ereport(NOTICE,
							(errmsg("relation \"%s\" TID %u/%u: InsertTransactionInProgress %u --- can't shrink relation",
									relname, blkno, offnum, HeapTupleHeaderGetXmin(tuple.t_data))));
					do_shrinking = false;
					break;
				case HEAPTUPLE_DELETE_IN_PROGRESS:

					/*
					 * This should not happen, since we hold exclusive lock on
					 * the relation; shouldn't we raise an error? (Actually,
					 * it can happen in system catalogs, since we tend to
					 * release write lock before commit there.)
					 */
					ereport(NOTICE,
							(errmsg("relation \"%s\" TID %u/%u: DeleteTransactionInProgress %u --- can't shrink relation",
									relname, blkno, offnum, HeapTupleHeaderGetXmax(tuple.t_data))));
					do_shrinking = false;
					break;
				default:
					elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
					break;
			}

			if (tupgone)
			{
				ItemId		lpp;

				/*
				 * Here we are building a temporary copy of the page with dead
				 * tuples removed.	Below we will apply
				 * PageRepairFragmentation to the copy, so that we can
				 * determine how much space will be available after removal of
				 * dead tuples.  But note we are NOT changing the real page
				 * yet...
				 */
				if (tempPage == NULL)
				{
					Size		pageSize;

					pageSize = PageGetPageSize(page);
					tempPage = (Page) palloc(pageSize);
					memcpy(tempPage, page, pageSize);
				}

				/* mark it unused on the temp page */
				lpp = PageGetItemId(tempPage, offnum);
				lpp->lp_flags &= ~LP_USED;

				vacpage->offsets[vacpage->offsets_free++] = offnum;
				tups_vacuumed += 1;
			}
			else
			{
				TransactionId	min;

				num_tuples += 1;
				notup = false;
				if (tuple.t_len < min_tlen)
					min_tlen = tuple.t_len;
				if (tuple.t_len > max_tlen)
					max_tlen = tuple.t_len;

				/* 
				 * If the tuple is alive, we consider it for the "minxid"
				 * calculations.
				 */
				min = vactuple_get_minxid(&tuple);
				if (TransactionIdIsValid(min) &&
					TransactionIdPrecedes(min, vacrelstats->minxid))
					vacrelstats->minxid = min;
			}
		}						/* scan along page */

		if (tempPage != NULL)
		{
			/* Some tuples are removable; figure free space after removal */
			PageRepairFragmentation(tempPage, NULL);
			vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, tempPage);
			pfree(tempPage);
			do_reap = true;
		}
		else
		{
			/* Just use current available space */
			vacpage->free = PageGetFreeSpaceWithFillFactor(onerel, page);
			/* Need to reap the page if it has ~LP_USED line pointers */
			do_reap = (vacpage->offsets_free > 0);
		}

		free_space += vacpage->free;

		/*
		 * Add the page to fraged_pages if it has a useful amount of free
		 * space.  "Useful" means enough for a minimal-sized tuple. But we
		 * don't know that accurately near the start of the relation, so add
		 * pages unconditionally if they have >= BLCKSZ/10 free space.
		 */
		do_frag = (vacpage->free >= min_tlen || vacpage->free >= BLCKSZ / 10);

		if (do_reap || do_frag)
		{
			VacPage		vacpagecopy = copy_vac_page(vacpage);

			if (do_reap)
				vpage_insert(vacuum_pages, vacpagecopy);
			if (do_frag)
				vpage_insert(fraged_pages, vacpagecopy);
		}

		/*
		 * Include the page in empty_end_pages if it will be empty after
		 * vacuuming; this is to keep us from using it as a move destination.
		 */
		if (notup)
		{
			empty_pages++;
			empty_end_pages++;
		}
		else
			empty_end_pages = 0;

		if (pgchanged)
			MarkBufferDirty(buf);
		UnlockReleaseBuffer(buf);
	}

	pfree(vacpage);

	/* save stats in the rel list for use later */
	vacrelstats->rel_tuples = num_tuples;
	vacrelstats->rel_pages = nblocks;
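	/* an empty relation has no meaningful min/max tuple lengths; report zero */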
	if (num_tuples == 0)
		min_tlen = max_tlen = 0;
	vacrelstats->min_tlen = min_tlen;
	vacrelstats->max_tlen = max_tlen;

	vacuum_pages->empty_end_pages = empty_end_pages;
	fraged_pages->empty_end_pages = empty_end_pages;

	/*
	 * Clear the fraged_pages list if we found we couldn't shrink. Else,
	 * remove any "empty" end-pages from the list, and compute usable free
	 * space = free space in remaining pages.
	 */
	if (do_shrinking)
	{
		int			i;

		Assert((BlockNumber) fraged_pages->num_pages >= empty_end_pages);
		fraged_pages->num_pages -= empty_end_pages;
		usable_free_space = 0;
		for (i = 0; i < fraged_pages->num_pages; i++)
			usable_free_space += fraged_pages->pagedesc[i]->free;
	}
	else
	{
		fraged_pages->num_pages = 0;
		usable_free_space = 0;
	}

	/* don't bother to save vtlinks if we will not call repair_frag */
	if (fraged_pages->num_pages > 0 && num_vtlinks > 0)
	{
		qsort((char *) vtlinks, num_vtlinks, sizeof(VTupleLinkData),
			  vac_cmp_vtlinks);
		vacrelstats->vtlinks = vtlinks;
		vacrelstats->num_vtlinks = num_vtlinks;
	}
	else
	{
		vacrelstats->vtlinks = NULL;
		vacrelstats->num_vtlinks = 0;
		pfree(vtlinks);
	}

	ereport(elevel,
			(errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
					RelationGetRelationName(onerel),
					tups_vacuumed, num_tuples, nblocks),
			 errdetail("%.0f dead row versions cannot be removed yet.\n"
			  "Nonremovable row versions range from %lu to %lu bytes long.\n"
					   "There were %.0f unused item pointers.\n"
	   "Total free space (including removable row versions) is %.0f bytes.\n"
					   "%u pages are or will become empty, including %u at the end of the table.\n"
	 "%u pages containing %.0f free bytes are potential move destinations.\n"
					   "%s.",
					   nkeep,
					   (unsigned long) min_tlen, (unsigned long) max_tlen,
					   nunused,
					   free_space,
					   empty_pages, empty_end_pages,
					   fraged_pages->num_pages, usable_free_space,
					   pg_rusage_show(&ru0))));
}

/*
 * vactuple_get_minxid
 *
 * Get the minimum relevant Xid for a tuple, not considering FrozenXid.
 * Return InvalidTransactionId if none (i.e., xmin = FrozenXid and xmax = InvalidTransactionId).
 * This is for the purpose of calculating pg_class.relminxid for a table
 * we're vacuuming.
 */
TransactionId
vactuple_get_minxid(HeapTuple tuple)
{
	TransactionId	min = InvalidTransactionId;

	/* 
	 * Initialize calculations with Xmin.  NB -- may be FrozenXid and
	 * we don't want that one.
	 */
	if (TransactionIdIsNormal(HeapTupleHeaderGetXmin(tuple->t_data)))
		min = HeapTupleHeaderGetXmin(tuple->t_data);

	/*
	 * If Xmax is not marked INVALID, we assume it's valid without making
	 * further checks on it --- it must be recently obsoleted or still running,
	 * else HeapTupleSatisfiesVacuum would have deemed it removable.
	 */
	if (!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID))
	{
		TransactionId	 xmax = HeapTupleHeaderGetXmax(tuple->t_data);

		/* If xmax is a plain Xid, consider it by itself */
		if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI))
		{
			if (!TransactionIdIsValid(min) ||
				(TransactionIdIsNormal(xmax) &&
				 TransactionIdPrecedes(xmax, min)))
				min = xmax;
		}
		else
		{
			/* If it's a MultiXactId, consider each of its members */
			TransactionId *members;
			int			nmembers,
						membno;

			nmembers = GetMultiXactIdMembers(xmax, &members);

			for (membno = 0; membno < nmembers; membno++)
			{
				if (!TransactionIdIsValid(min) ||
					TransactionIdPrecedes(members[membno], min))
					min = members[membno];
			}
		}
	}

	return min;
}

/*
 *	repair_frag() -- try to repair relation's fragmentation
 *
 *		This routine marks dead tuples as unused and tries to re-use dead space
 *		by moving tuples (inserting index entries as needed).  It builds
 *		Nvacpagelist, a list of pages freed by moving tuples, and cleans their
 *		index entries after committing the current transaction (in a hackish
 *		manner: without releasing locks or freeing memory!).  Finally it
 *		truncates the relation if end-blocks have become empty.
 */
static void
repair_frag(VRelStats *vacrelstats, Relation onerel,
			VacPageList vacuum_pages, VacPageList fraged_pages,
			int nindexes, Relation *Irel, TransactionId OldestXmin)
{
	TransactionId myXID = GetCurrentTransactionId();
	Buffer		dst_buffer = InvalidBuffer;
	BlockNumber nblocks,
				blkno;
	BlockNumber last_move_dest_block = 0,
				last_vacuum_block;
	Page		dst_page = NULL;
	ExecContextData ec;
	VacPageListData Nvacpagelist;
	VacPage		dst_vacpage = NULL,
				last_vacuum_page,
				vacpage,
			   *curpage;
	int			i;
	int			num_moved = 0,
				num_fraged_pages,
				vacuumed_pages;
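	/*
	 * keep_tuples counts tuples moved off a page during chain moves that the
	 * main scan has not yet re-encountered (see the notes near vacuum_index).
	 */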
	int			keep_tuples = 0;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ExecContext_Init(&ec, onerel);

	Nvacpagelist.num_pages = 0;
	num_fraged_pages = fraged_pages->num_pages;
	Assert((BlockNumber) vacuum_pages->num_pages >= vacuum_pages->empty_end_pages);
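	/* number of reaped pages, not counting the empty pages at the end of the rel */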
	vacuumed_pages = vacuum_pages->num_pages - vacuum_pages->empty_end_pages;
	if (vacuumed_pages > 0)
	{
		/* get last reaped page from vacuum_pages */
		last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
		last_vacuum_block = last_vacuum_page->blkno;
	}
	else
	{
		last_vacuum_page = NULL;
		last_vacuum_block = InvalidBlockNumber;
	}

	vacpage = (VacPage) palloc(sizeof(VacPageData) + MaxOffsetNumber * sizeof(OffsetNumber));
	vacpage->offsets_used = vacpage->offsets_free = 0;

	/*
	 * Scan pages backwards from the last nonempty page, trying to move tuples
	 * down to lower pages.  Quit when we reach a page that we have moved any
	 * tuples onto, or the first page if we haven't moved anything, or when we
	 * find a page we cannot completely empty (this last condition is handled
	 * by "break" statements within the loop).
	 *
	 * NB: this code depends on the vacuum_pages and fraged_pages lists being
	 * in order by blkno.
	 */
	nblocks = vacrelstats->rel_pages;
	for (blkno = nblocks - vacuum_pages->empty_end_pages - 1;
		 blkno > last_move_dest_block;
		 blkno--)
	{
		Buffer		buf;
		Page		page;
		OffsetNumber offnum,
					maxoff;
		bool		isempty,
					chain_tuple_moved;

		vacuum_delay_point();

		/*
		 * Forget fraged_pages pages at or after this one; they're no longer
		 * useful as move targets, since we only want to move down. Note that
		 * since we stop the outer loop at last_move_dest_block, pages removed
		 * here cannot have had anything moved onto them already.
		 *
		 * Also note that we don't change the stored fraged_pages list, only
		 * our local variable num_fraged_pages; so the forgotten pages are
		 * still available to be loaded into the free space map later.
		 */
		while (num_fraged_pages > 0 &&
			   fraged_pages->pagedesc[num_fraged_pages - 1]->blkno >= blkno)
		{
			Assert(fraged_pages->pagedesc[num_fraged_pages - 1]->offsets_used == 0);
			--num_fraged_pages;
		}

		/*
		 * Process this page of relation.
		 */
		buf = ReadBuffer(onerel, blkno);
		page = BufferGetPage(buf);

		vacpage->offsets_free = 0;

		isempty = PageIsEmpty(page);

		/* Is the page in the vacuum_pages list? */
		if (blkno == last_vacuum_block)
		{
			if (last_vacuum_page->offsets_free > 0)
			{
				/* there are dead tuples on this page - clean them */
				Assert(!isempty);
				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
				vacuum_page(onerel, buf, last_vacuum_page);
				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
			}
			else
				Assert(isempty);
			--vacuumed_pages;
			if (vacuumed_pages > 0)
			{
				/* get prev reaped page from vacuum_pages */
				last_vacuum_page = vacuum_pages->pagedesc[vacuumed_pages - 1];
				last_vacuum_block = last_vacuum_page->blkno;
			}
			else
			{
				last_vacuum_page = NULL;
				last_vacuum_block = InvalidBlockNumber;
			}
			if (isempty)
			{
				ReleaseBuffer(buf);
				continue;
			}
		}
		else
			Assert(!isempty);

		chain_tuple_moved = false;		/* no chain tuples have been moved
										 * off this page yet */
		vacpage->blkno = blkno;
		maxoff = PageGetMaxOffsetNumber(page);
		for (offnum = FirstOffsetNumber;
			 offnum <= maxoff;
			 offnum = OffsetNumberNext(offnum))
		{
			Size		tuple_len;
			HeapTupleData tuple;
			ItemId		itemid = PageGetItemId(page, offnum);

			if (!ItemIdIsUsed(itemid))
				continue;

			tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
			tuple_len = tuple.t_len = ItemIdGetLength(itemid);
			ItemPointerSet(&(tuple.t_self), blkno, offnum);

			/* ---
			 * VACUUM FULL has an exclusive lock on the relation.  So
			 * normally no other transaction can have pending INSERTs or
			 * DELETEs in this relation.  A tuple is either:
			 *		(a) a tuple in a system catalog, inserted or deleted
			 *			by a not yet committed transaction
			 *		(b) known dead (XMIN_INVALID, or XMAX_COMMITTED and xmax
			 *			is visible to all active transactions)
			 *		(c) inserted by a committed xact (XMIN_COMMITTED)
			 *		(d) moved by the currently running VACUUM.
			 *		(e) deleted (XMAX_COMMITTED) but at least one active
			 *			transaction does not see the deleting transaction
			 * In case (a) we wouldn't be in repair_frag() at all.
			 * In case (b) we cannot be here, because scan_heap() has
			 * already marked the item as unused, see continue above. Case
			 * (c) is what normally is to be expected. Case (d) is only
			 * possible, if a whole tuple chain has been moved while
			 * processing this or a higher numbered block.
			 * ---
			 */
			if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED))
			{
				if (tuple.t_data->t_infomask & HEAP_MOVED_IN)
					elog(ERROR, "HEAP_MOVED_IN was not expected");
				if (!(tuple.t_data->t_infomask & HEAP_MOVED_OFF))
					elog(ERROR, "HEAP_MOVED_OFF was expected");

				/*
				 * MOVED_OFF by another VACUUM would have caused the
				 * visibility check to set XMIN_COMMITTED or XMIN_INVALID.
				 */
				if (HeapTupleHeaderGetXvac(tuple.t_data) != myXID)
					elog(ERROR, "invalid XVAC in tuple header");

				/*
				 * If this (chain) tuple has already been moved by this same
				 * VACUUM, we must check whether it is recorded in vacpage,
				 * i.e. whether it was moved while cleaning this page or an
				 * earlier one.
				 */

				/* Can't we Assert(keep_tuples > 0) here? */
				if (keep_tuples == 0)
					continue;
				if (chain_tuple_moved)
				{
					/* some chains were moved while cleaning this page */
					Assert(vacpage->offsets_free > 0);
					for (i = 0; i < vacpage->offsets_free; i++)
					{
						if (vacpage->offsets[i] == offnum)
							break;
					}
					if (i >= vacpage->offsets_free)		/* not found */
					{
						vacpage->offsets[vacpage->offsets_free++] = offnum;
						keep_tuples--;
					}
				}
				else
				{
					vacpage->offsets[vacpage->offsets_free++] = offnum;
					keep_tuples--;
				}
				continue;
			}

			/*
			 * If this tuple is in a chain of tuples created in updates by
			 * "recent" transactions then we have to move the whole chain of
			 * tuples to other places, so that we can write new t_ctid links
			 * that preserve the chain relationship.
			 *
			 * This test is complicated.  Read it as "if tuple is a recently
			 * created updated version, OR if it is an obsoleted version". (In
			 * the second half of the test, we needn't make any check on XMAX
			 * --- it must be recently obsoleted, else scan_heap would have
			 * deemed it removable.)
			 *
			 * NOTE: this test is not 100% accurate: it is possible for a
			 * tuple to be an updated one with recent xmin, and yet not match
			 * any new_tid entry in the vtlinks list.  Presumably there was
			 * once a parent tuple with xmax matching the xmin, but it's
			 * possible that that tuple has been removed --- for example, if
			 * it had xmin = xmax and wasn't itself an updated version, then
			 * HeapTupleSatisfiesVacuum would deem it removable as soon as the
			 * xmin xact completes.
			 *
			 * To be on the safe side, we abandon the repair_frag process if
			 * we cannot find the parent tuple in vtlinks.	This may be overly
			 * conservative; AFAICS it would be safe to move the chain.
			 */
			if (((tuple.t_data->t_infomask & HEAP_UPDATED) &&
				 !TransactionIdPrecedes(HeapTupleHeaderGetXmin(tuple.t_data),
										OldestXmin)) ||
				(!(tuple.t_data->t_infomask & (HEAP_XMAX_INVALID |
											   HEAP_IS_LOCKED)) &&
				 !(ItemPointerEquals(&(tuple.t_self),
									 &(tuple.t_data->t_ctid)))))
			{
				Buffer		Cbuf = buf;
				bool		freeCbuf = false;
				bool		chain_move_failed = false;
				ItemPointerData Ctid;
				HeapTupleData tp = tuple;
				Size		tlen = tuple_len;
				VTupleMove	vtmove;
				int			num_vtmove;
				int			free_vtmove;
				VacPage		to_vacpage = NULL;
				int			to_item = 0;
				int			ti;

				if (dst_buffer != InvalidBuffer)
				{
					ReleaseBuffer(dst_buffer);
					dst_buffer = InvalidBuffer;
				}

				/* Quick exit if we have no vtlinks to search in */
				if (vacrelstats->vtlinks == NULL)
				{
					elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
					break;		/* out of walk-along-page loop */
				}

				/*
				 * If this tuple is in the begin/middle of the chain then we
				 * have to move to the end of chain.  As with any t_ctid
				 * chase, we have to verify that each new tuple is really the
				 * descendant of the tuple we came from.
				 */
				while (!(tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
												  HEAP_IS_LOCKED)) &&
					   !(ItemPointerEquals(&(tp.t_self),
										   &(tp.t_data->t_ctid))))
				{
					ItemPointerData nextTid;
					TransactionId priorXmax;
					Buffer		nextBuf;
					Page		nextPage;
					OffsetNumber nextOffnum;
					ItemId		nextItemid;
					HeapTupleHeader nextTdata;

					nextTid = tp.t_data->t_ctid;
					priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
					/* assume block# is OK (see heap_fetch comments) */
					nextBuf = ReadBuffer(onerel,
										 ItemPointerGetBlockNumber(&nextTid));
					nextPage = BufferGetPage(nextBuf);
					/* If bogus or unused slot, assume tp is end of chain */
					nextOffnum = ItemPointerGetOffsetNumber(&nextTid);
					if (nextOffnum < FirstOffsetNumber ||
						nextOffnum > PageGetMaxOffsetNumber(nextPage))
					{
						ReleaseBuffer(nextBuf);
						break;
					}
					nextItemid = PageGetItemId(nextPage, nextOffnum);
					if (!ItemIdIsUsed(nextItemid))
					{
						ReleaseBuffer(nextBuf);
						break;
					}
					/* if not matching XMIN, assume tp is end of chain */
					nextTdata = (HeapTupleHeader) PageGetItem(nextPage,
															  nextItemid);
					if (!TransactionIdEquals(HeapTupleHeaderGetXmin(nextTdata),
											 priorXmax))
					{
						ReleaseBuffer(nextBuf);
						break;
					}
					/* OK, switch our attention to the next tuple in chain */
					tp.t_data = nextTdata;
					tp.t_self = nextTid;
					tlen = tp.t_len = ItemIdGetLength(nextItemid);
					if (freeCbuf)
						ReleaseBuffer(Cbuf);
					Cbuf = nextBuf;
					freeCbuf = true;
				}

				/* Set up workspace for planning the chain move */
				vtmove = (VTupleMove) palloc(100 * sizeof(VTupleMoveData));
				num_vtmove = 0;
				free_vtmove = 100;

				/*
				 * Now, walk backwards up the chain (towards older tuples) and
				 * check if all items in chain can be moved.  We record all
				 * the moves that need to be made in the vtmove array.
				 */
				for (;;)
				{
					Buffer		Pbuf;
					Page		Ppage;
					ItemId		Pitemid;
					HeapTupleHeader PTdata;
					VTupleLinkData vtld,
							   *vtlp;

					/* Identify a target page to move this tuple to */
					if (to_vacpage == NULL ||
						!enough_space(to_vacpage, tlen))
					{
						for (i = 0; i < num_fraged_pages; i++)
						{
							if (enough_space(fraged_pages->pagedesc[i], tlen))
								break;
						}

						if (i == num_fraged_pages)
						{
							/* can't move item anywhere */
							chain_move_failed = true;
							break;		/* out of check-all-items loop */
						}
						to_item = i;
						to_vacpage = fraged_pages->pagedesc[to_item];
					}
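					/*
					 * Reserve space on the chosen target page for this tuple,
					 * including a new line pointer if no reusable one remains.
					 */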
					to_vacpage->free -= MAXALIGN(tlen);
					if (to_vacpage->offsets_used >= to_vacpage->offsets_free)
						to_vacpage->free -= sizeof(ItemIdData);
					(to_vacpage->offsets_used)++;

					/* Add an entry to vtmove list */
					if (free_vtmove == 0)
					{
						free_vtmove = 1000;
						vtmove = (VTupleMove)
							repalloc(vtmove,
									 (free_vtmove + num_vtmove) *
									 sizeof(VTupleMoveData));
					}
					vtmove[num_vtmove].tid = tp.t_self;
					vtmove[num_vtmove].vacpage = to_vacpage;
					if (to_vacpage->offsets_used == 1)
						vtmove[num_vtmove].cleanVpd = true;
					else
						vtmove[num_vtmove].cleanVpd = false;
					free_vtmove--;
					num_vtmove++;

					/* Done if at beginning of chain */
					if (!(tp.t_data->t_infomask & HEAP_UPDATED) ||
					 TransactionIdPrecedes(HeapTupleHeaderGetXmin(tp.t_data),
										   OldestXmin))
						break;	/* out of check-all-items loop */

					/* Move to tuple with prior row version */
					vtld.new_tid = tp.t_self;
					vtlp = (VTupleLink)
						vac_bsearch((void *) &vtld,
									(void *) (vacrelstats->vtlinks),
									vacrelstats->num_vtlinks,
									sizeof(VTupleLinkData),
									vac_cmp_vtlinks);
					if (vtlp == NULL)
					{
						/* see discussion above */
						elog(DEBUG2, "parent item in update-chain not found --- can't continue repair_frag");
						chain_move_failed = true;
						break;	/* out of check-all-items loop */
					}
					tp.t_self = vtlp->this_tid;
					Pbuf = ReadBuffer(onerel,
									ItemPointerGetBlockNumber(&(tp.t_self)));
					Ppage = BufferGetPage(Pbuf);
					Pitemid = PageGetItemId(Ppage,
								   ItemPointerGetOffsetNumber(&(tp.t_self)));
					/* this can't happen since we saw tuple earlier: */
					if (!ItemIdIsUsed(Pitemid))
						elog(ERROR, "parent itemid marked as unused");
					PTdata = (HeapTupleHeader) PageGetItem(Ppage, Pitemid);

					/* ctid should not have changed since we saved it */
					Assert(ItemPointerEquals(&(vtld.new_tid),
											 &(PTdata->t_ctid)));

					/*
					 * See the note above about cases where the child item is
					 * no longer used.  Because we do not currently remove the
					 * no-longer-useful part of an update chain, it is possible
					 * to find a non-matching parent row here.  As in that
					 * case, we simply stop shrinking rather than try to track
					 * down the real parent row. - vadim 06/11/99
					 */
					if ((PTdata->t_infomask & HEAP_XMAX_IS_MULTI) ||
						!(TransactionIdEquals(HeapTupleHeaderGetXmax(PTdata),
										 HeapTupleHeaderGetXmin(tp.t_data))))
					{
						ReleaseBuffer(Pbuf);
						elog(DEBUG2, "too old parent tuple found --- can't continue repair_frag");
						chain_move_failed = true;
						break;	/* out of check-all-items loop */
					}
					tp.t_data = PTdata;
					tlen = tp.t_len = ItemIdGetLength(Pitemid);
					if (freeCbuf)
						ReleaseBuffer(Cbuf);
					Cbuf = Pbuf;
					freeCbuf = true;
				}				/* end of check-all-items loop */

				if (freeCbuf)
					ReleaseBuffer(Cbuf);
				freeCbuf = false;

				if (chain_move_failed)
				{
					/*
					 * Undo changes to offsets_used state.	We don't bother
					 * cleaning up the amount-free state, since we're not
					 * going to do any further tuple motion.
					 */
					for (i = 0; i < num_vtmove; i++)
					{
						Assert(vtmove[i].vacpage->offsets_used > 0);
						(vtmove[i].vacpage->offsets_used)--;
					}
					pfree(vtmove);
					break;		/* out of walk-along-page loop */
				}

				/*
				 * Okay, move the whole tuple chain in reverse order.
				 *
				 * Ctid tracks the new location of the previously-moved tuple.
				 */
				ItemPointerSetInvalid(&Ctid);
				for (ti = 0; ti < num_vtmove; ti++)
				{
					VacPage		destvacpage = vtmove[ti].vacpage;
					Page		Cpage;
					ItemId		Citemid;

					/* Get page to move from */
					tuple.t_self = vtmove[ti].tid;
					Cbuf = ReadBuffer(onerel,
								 ItemPointerGetBlockNumber(&(tuple.t_self)));

					/* Get page to move to */
					dst_buffer = ReadBuffer(onerel, destvacpage->blkno);

					LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
					if (dst_buffer != Cbuf)
						LockBuffer(Cbuf, BUFFER_LOCK_EXCLUSIVE);

					dst_page = BufferGetPage(dst_buffer);
					Cpage = BufferGetPage(Cbuf);

					Citemid = PageGetItemId(Cpage,
								ItemPointerGetOffsetNumber(&(tuple.t_self)));
					tuple.t_data = (HeapTupleHeader) PageGetItem(Cpage, Citemid);
					tuple_len = tuple.t_len = ItemIdGetLength(Citemid);

					move_chain_tuple(onerel, Cbuf, Cpage, &tuple,
									 dst_buffer, dst_page, destvacpage,
									 &ec, &Ctid, vtmove[ti].cleanVpd);

					num_moved++;
					if (destvacpage->blkno > last_move_dest_block)
						last_move_dest_block = destvacpage->blkno;

					/*
					 * Remember that we moved tuple from the current page
					 * (corresponding index tuple will be cleaned).
					 */
					if (Cbuf == buf)
						vacpage->offsets[vacpage->offsets_free++] =
							ItemPointerGetOffsetNumber(&(tuple.t_self));
					else
						keep_tuples++;

					ReleaseBuffer(dst_buffer);
					ReleaseBuffer(Cbuf);
				}				/* end of move-the-tuple-chain loop */

				dst_buffer = InvalidBuffer;
				pfree(vtmove);
				chain_tuple_moved = true;

				/* advance to next tuple in walk-along-page loop */
				continue;
			}					/* end of is-tuple-in-chain test */

			/* try to find new page for this tuple */
			if (dst_buffer == InvalidBuffer ||
				!enough_space(dst_vacpage, tuple_len))
			{
				if (dst_buffer != InvalidBuffer)
				{
					ReleaseBuffer(dst_buffer);
					dst_buffer = InvalidBuffer;
				}
				for (i = 0; i < num_fraged_pages; i++)
				{
					if (enough_space(fraged_pages->pagedesc[i], tuple_len))
						break;
				}
				if (i == num_fraged_pages)
					break;		/* can't move item anywhere */
				dst_vacpage = fraged_pages->pagedesc[i];
				dst_buffer = ReadBuffer(onerel, dst_vacpage->blkno);
				LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);
				dst_page = BufferGetPage(dst_buffer);
				/* if this page was not used before - clean it */
				if (!PageIsEmpty(dst_page) && dst_vacpage->offsets_used == 0)
					vacuum_page(onerel, dst_buffer, dst_vacpage);
			}
			else
				LockBuffer(dst_buffer, BUFFER_LOCK_EXCLUSIVE);

			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);

			move_plain_tuple(onerel, buf, page, &tuple,
							 dst_buffer, dst_page, dst_vacpage, &ec);

			num_moved++;
			if (dst_vacpage->blkno > last_move_dest_block)
				last_move_dest_block = dst_vacpage->blkno;

			/*
			 * Remember that we moved tuple from the current page
			 * (corresponding index tuple will be cleaned).
			 */
			vacpage->offsets[vacpage->offsets_free++] = offnum;
		}						/* walk along page */

		/*
		 * If we broke out of the walk-along-page loop early (ie, still have
		 * offnum <= maxoff), then we failed to move some tuple off this page.
		 * No point in shrinking any more, so clean up and exit the per-page
		 * loop.
		 */
		if (offnum < maxoff && keep_tuples > 0)
		{
			OffsetNumber off;

			/*
			 * Fix vacpage state for any unvisited tuples remaining on page
			 */
			for (off = OffsetNumberNext(offnum);
				 off <= maxoff;
				 off = OffsetNumberNext(off))
			{
				ItemId		itemid = PageGetItemId(page, off);
				HeapTupleHeader htup;

				if (!ItemIdIsUsed(itemid))
					continue;
				htup = (HeapTupleHeader) PageGetItem(page, itemid);
				if (htup->t_infomask & HEAP_XMIN_COMMITTED)
					continue;

				/*
				 * See comments in the walk-along-page loop above about why
				 * only MOVED_OFF tuples should be found here.
				 */
				if (htup->t_infomask & HEAP_MOVED_IN)
					elog(ERROR, "HEAP_MOVED_IN was not expected");
				if (!(htup->t_infomask & HEAP_MOVED_OFF))
					elog(ERROR, "HEAP_MOVED_OFF was expected");
				if (HeapTupleHeaderGetXvac(htup) != myXID)
					elog(ERROR, "invalid XVAC in tuple header");

				if (chain_tuple_moved)
				{
					/* some chains were moved while cleaning this page */
					Assert(vacpage->offsets_free > 0);
					for (i = 0; i < vacpage->offsets_free; i++)
					{
						if (vacpage->offsets[i] == off)
							break;
					}
					if (i >= vacpage->offsets_free)		/* not found */
					{
						vacpage->offsets[vacpage->offsets_free++] = off;
						Assert(keep_tuples > 0);
						keep_tuples--;
					}
				}
				else
				{
					vacpage->offsets[vacpage->offsets_free++] = off;
					Assert(keep_tuples > 0);
					keep_tuples--;
				}
			}
		}

		if (vacpage->offsets_free > 0)	/* some tuples were moved */
		{
			if (chain_tuple_moved)		/* else - they are ordered */
			{
				qsort((char *) (vacpage->offsets), vacpage->offsets_free,
					  sizeof(OffsetNumber), vac_cmp_offno);
			}
			vpage_insert(&Nvacpagelist, copy_vac_page(vacpage));
		}

		ReleaseBuffer(buf);

		if (offnum <= maxoff)
			break;				/* had to quit early, see above note */

	}							/* walk along relation */

	blkno++;					/* new number of blocks */

	if (dst_buffer != InvalidBuffer)
	{
		Assert(num_moved > 0);
		ReleaseBuffer(dst_buffer);
	}

	if (num_moved > 0)
	{
		/*
		 * We have to commit our tuple movings before we truncate the
		 * relation.  Ideally we should do Commit/StartTransactionCommand
		 * here, relying on the session-level table lock to protect our
		 * exclusive access to the relation.  However, that would require a
		 * lot of extra code to close and re-open the relation, indexes, etc.
		 * For now, a quick hack: record status of current transaction as
		 * committed, and continue.
		 */
		RecordTransactionCommit();
	}

	/*
	 * We are not going to move any more tuples across pages, but we still
	 * need to apply vacuum_page to compact free space in the remaining pages
	 * in vacuum_pages list.  Note that some of these pages may also be in the
	 * fraged_pages list, and may have had tuples moved onto them; if so, we
	 * already did vacuum_page and needn't do it again.
	 */
	for (i = 0, curpage = vacuum_pages->pagedesc;
		 i < vacuumed_pages;
		 i++, curpage++)
	{
		vacuum_delay_point();

		Assert((*curpage)->blkno < blkno);
		if ((*curpage)->offsets_used == 0)
		{
			Buffer		buf;
			Page		page;

			/* this page was not used as a move target, so must clean it */
			buf = ReadBuffer(onerel, (*curpage)->blkno);
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
			page = BufferGetPage(buf);
			if (!PageIsEmpty(page))
				vacuum_page(onerel, buf, *curpage);
			UnlockReleaseBuffer(buf);
		}
	}

	/*
	 * Now scan all the pages that we moved tuples onto and update tuple
	 * status bits.  This is not really necessary, but will save time for
	 * future transactions examining these tuples.
	 */
	update_hint_bits(onerel, fraged_pages, num_fraged_pages,
					 last_move_dest_block, num_moved);

	/*
	 * It'd be cleaner to make this report at the bottom of this routine, but
	 * then the rusage would double-count the second pass of index vacuuming.
	 * So do it here and ignore the relatively small amount of processing that
	 * occurs below.
	 */
	ereport(elevel,
			(errmsg("\"%s\": moved %u row versions, truncated %u to %u pages",
					RelationGetRelationName(onerel),
					num_moved, nblocks, blkno),
			 errdetail("%s.",
					   pg_rusage_show(&ru0))));

	/*
	 * Reflect the motion of system tuples to catalog cache here.
	 */
	CommandCounterIncrement();

	if (Nvacpagelist.num_pages > 0)
	{
		/* vacuum indexes again if needed */
		if (Irel != NULL)
		{
			VacPage    *vpleft,
					   *vpright,
						vpsave;

			/*
			 * Re-sort Nvacpagelist.pagedesc: the list was built while
			 * scanning the relation backwards, so reverse it to put it back
			 * into ascending block-number order.
			 */
			for (vpleft = Nvacpagelist.pagedesc,
				 vpright = Nvacpagelist.pagedesc + Nvacpagelist.num_pages - 1;
				 vpleft < vpright; vpleft++, vpright--)
			{
				vpsave = *vpleft;
				*vpleft = *vpright;
				*vpright = vpsave;
			}

			/*
			 * keep_tuples is the number of tuples that have been moved off a
			 * page during chain moves but not been scanned over subsequently.
			 * The tuple ids of these tuples are not recorded as free offsets
			 * for any VacPage, so they will not be cleared from the indexes.
			 */
			Assert(keep_tuples >= 0);
			for (i = 0; i < nindexes; i++)
				vacuum_index(&Nvacpagelist, Irel[i],
							 vacrelstats->rel_tuples, keep_tuples);
		}

		/*
		 * Clean moved-off tuples from last page in Nvacpagelist list.
		 *
		 * We need only do this in this one page, because higher-numbered
		 * pages are going to be truncated from the relation entirely. But see
		 * comments for update_hint_bits().
		 */
		if (vacpage->blkno == (blkno - 1) &&
			vacpage->offsets_free > 0)
		{
			Buffer		buf;
			Page		page;
			OffsetNumber unused[MaxOffsetNumber];
			OffsetNumber offnum,
						maxoff;
			int			uncnt;
			int			num_tuples = 0;

			buf = ReadBuffer(onerel, vacpage->blkno);
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
			page = BufferGetPage(buf);
			maxoff = PageGetMaxOffsetNumber(page);
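			/* mark each MOVED_OFF tuple's line pointer on this page as unused */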
			for (offnum = FirstOffsetNumber;
				 offnum <= maxoff;
				 offnum = OffsetNumberNext(offnum))
			{
				ItemId		itemid = PageGetItemId(page, offnum);
				HeapTupleHeader htup;

				if (!ItemIdIsUsed(itemid))
					continue;
				htup = (HeapTupleHeader) PageGetItem(page, itemid);
				if (htup->t_infomask & HEAP_XMIN_COMMITTED)
					continue;

				/*
				 * See comments in the walk-along-page loop above about why
				 * only MOVED_OFF tuples should be found here.
				 */
				if (htup->t_infomask & HEAP_MOVED_IN)
					elog(ERROR, "HEAP_MOVED_IN was not expected");
				if (!(htup->t_infomask & HEAP_MOVED_OFF))
					elog(ERROR, "HEAP_MOVED_OFF was expected");
				if (HeapTupleHeaderGetXvac(htup) != myXID)
					elog(ERROR, "invalid XVAC in tuple header");

				itemid->lp_flags &= ~LP_USED;
				num_tuples++;
			}
			Assert(vacpage->offsets_free == num_tuples);

			START_CRIT_SECTION();

			uncnt = PageRepairFragmentation(page, unused);

			MarkBufferDirty(buf);

			/* XLOG stuff */
			if (!onerel->rd_istemp)
			{
				XLogRecPtr	recptr;

				recptr = log_heap_clean(onerel, buf, unused, uncnt);
				PageSetLSN(page, recptr);
				PageSetTLI(page, ThisTimeLineID);
			}
			else
			{
				/*
				 * No XLOG record, but still need to flag that XID exists on
				 * disk
				 */
				MyXactMadeTempRelUpdate = true;
			}

			END_CRIT_SECTION();

			UnlockReleaseBuffer(buf);
		}

		/* now - free new list of reaped pages */
		curpage = Nvacpagelist.pagedesc;
		for (i = 0; i < Nvacpagelist.num_pages; i++, curpage++)
			pfree(*curpage);
		pfree(Nvacpagelist.pagedesc);
	}

	/* Truncate relation, if needed */
	if (blkno < nblocks)
	{
		RelationTruncate(onerel, blkno);
		vacrelstats->rel_pages = blkno; /* set new number of blocks */
	}

	/* clean up */
	pfree(vacpage);
	if (vacrelstats->vtlinks != NULL)
		pfree(vacrelstats->vtlinks);

	ExecContext_Finish(&ec);
}

/*
 *	move_chain_tuple() -- move one tuple that is part of a tuple chain
 *
 *		This routine moves old_tup from old_page to dst_page.
 *		old_page and dst_page might be the same page.
 *		On entry old_buf and dst_buf are locked exclusively, both locks (or
 *		the single lock, if this is a intra-page-move) are released before
 *		exit.
 *
 *		Yes, a routine with ten parameters is ugly, but it's still better
 *		than having these 120 lines of code in repair_frag() which is
 *		already too long and almost unreadable.
 */
static void
move_chain_tuple(Relation rel,
				 Buffer old_buf, Page old_page, HeapTuple old_tup,
				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
				 ExecContext ec, ItemPointer ctid, bool cleanVpd)
{
	TransactionId myXID = GetCurrentTransactionId();
	HeapTupleData newtup;
	OffsetNumber newoff;
	ItemId		newitemid;
	Size		tuple_len = old_tup->t_len;

	/*
	 * make a modifiable copy of the source tuple.
	 */
	heap_copytuple_with_tuple(old_tup, &newtup);

	/*
	 * register invalidation of source tuple in catcaches.
	 */
	CacheInvalidateHeapTuple(rel, old_tup);

	/* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
	START_CRIT_SECTION();

	/*
	 * mark the source tuple MOVED_OFF.
	 */
	old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
									 HEAP_XMIN_INVALID |
									 HEAP_MOVED_IN);
	old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
	HeapTupleHeaderSetXvac(old_tup->t_data, myXID);

	/*
	 * If this page was not used before - clean it.
	 *
	 * NOTE: a nasty bug used to lurk here.  It is possible for the source and
	 * destination pages to be the same (since this tuple-chain member can be
	 * on a page lower than the one we're currently processing in the outer
	 * loop).  If that's true, then after vacuum_page() the source tuple will
	 * have been moved, and tuple.t_data will be pointing at garbage.
	 * Therefore we must do everything that uses old_tup->t_data BEFORE this
	 * step!!
	 *
	 * This path is different from the other callers of vacuum_page, because
	 * we have already incremented the vacpage's offsets_used field to account
	 * for the tuple(s) we expect to move onto the page. Therefore
	 * vacuum_page's check for offsets_used == 0 is wrong. But since that's a
	 * good debugging check for all other callers, we work around it here
	 * rather than remove it.
	 */
	if (!PageIsEmpty(dst_page) && cleanVpd)
	{
		int			sv_offsets_used = dst_vacpage->offsets_used;

		dst_vacpage->offsets_used = 0;
		vacuum_page(rel, dst_buf, dst_vacpage);
		dst_vacpage->offsets_used = sv_offsets_used;
	}

	/*
2685 2686
	 * Update the state of the copied tuple, and store it on the destination
	 * page.
	 */
	newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
								   HEAP_XMIN_INVALID |
								   HEAP_MOVED_OFF);
	newtup.t_data->t_infomask |= HEAP_MOVED_IN;
	HeapTupleHeaderSetXvac(newtup.t_data, myXID);
	newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
						 InvalidOffsetNumber, LP_USED);
	if (newoff == InvalidOffsetNumber)
		elog(PANIC, "failed to add item with len = %lu to page %u while moving tuple chain",
			 (unsigned long) tuple_len, dst_vacpage->blkno);
	newitemid = PageGetItemId(dst_page, newoff);
	/* drop temporary copy, and point to the version on the dest page */
	pfree(newtup.t_data);
	newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);

	ItemPointerSet(&(newtup.t_self), dst_vacpage->blkno, newoff);

	/*
	 * Set new tuple's t_ctid pointing to itself if last tuple in chain, and
	 * to next tuple in chain otherwise.  (Since we move the chain in reverse
	 * order, this is actually the previously processed tuple.)
	 */
	if (!ItemPointerIsValid(ctid))
		newtup.t_data->t_ctid = newtup.t_self;
	else
		newtup.t_data->t_ctid = *ctid;
	*ctid = newtup.t_self;
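	/* the next (older) chain member we move will set its t_ctid to this TID */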

	MarkBufferDirty(dst_buf);
	if (dst_buf != old_buf)
		MarkBufferDirty(old_buf);

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		XLogRecPtr	recptr = log_heap_move(rel, old_buf, old_tup->t_self,
										   dst_buf, &newtup);

		if (old_buf != dst_buf)
		{
			PageSetLSN(old_page, recptr);
			PageSetTLI(old_page, ThisTimeLineID);
		}
		PageSetLSN(dst_page, recptr);
		PageSetTLI(dst_page, ThisTimeLineID);
	}
	else
	{
		/*
		 * No XLOG record, but still need to flag that XID exists on disk
		 */
		MyXactMadeTempRelUpdate = true;
	}

	END_CRIT_SECTION();

	LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
	if (dst_buf != old_buf)
		LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);

	/* Create index entries for the moved tuple */
	if (ec->resultRelInfo->ri_NumIndices > 0)
	{
		ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
		ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
		ResetPerTupleExprContext(ec->estate);
	}
}

/*
 *	move_plain_tuple() -- move one tuple that is not part of a chain
 *
 *		This routine moves old_tup from old_page to dst_page.
 *		On entry old_buf and dst_buf are locked exclusively, both locks are
 *		released before exit.
 *
 *		Yes, a routine with eight parameters is ugly, but it's still better
 *		than having these 90 lines of code in repair_frag() which is already
 *		too long and almost unreadable.
 */
static void
move_plain_tuple(Relation rel,
				 Buffer old_buf, Page old_page, HeapTuple old_tup,
				 Buffer dst_buf, Page dst_page, VacPage dst_vacpage,
				 ExecContext ec)
{
	TransactionId myXID = GetCurrentTransactionId();
	HeapTupleData newtup;
	OffsetNumber newoff;
	ItemId		newitemid;
	Size		tuple_len = old_tup->t_len;

	/* copy tuple */
	heap_copytuple_with_tuple(old_tup, &newtup);

	/*
	 * register invalidation of source tuple in catcaches.
	 *
	 * (Note: we do not need to register the copied tuple, because we are not
	 * changing the tuple contents and so there cannot be any need to flush
	 * negative catcache entries.)
	 */
	CacheInvalidateHeapTuple(rel, old_tup);

	/* NO EREPORT(ERROR) TILL CHANGES ARE LOGGED */
	START_CRIT_SECTION();

	/*
	 * Mark new tuple as MOVED_IN by me.
	 */
	newtup.t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
								   HEAP_XMIN_INVALID |
								   HEAP_MOVED_OFF);
	newtup.t_data->t_infomask |= HEAP_MOVED_IN;
	HeapTupleHeaderSetXvac(newtup.t_data, myXID);

	/* add tuple to the page */
	newoff = PageAddItem(dst_page, (Item) newtup.t_data, tuple_len,
						 InvalidOffsetNumber, LP_USED);
	if (newoff == InvalidOffsetNumber)
		elog(PANIC, "failed to add item with len = %lu to page %u (free space %lu, nusd %u, noff %u)",
			 (unsigned long) tuple_len,
			 dst_vacpage->blkno, (unsigned long) dst_vacpage->free,
			 dst_vacpage->offsets_used, dst_vacpage->offsets_free);
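	/* drop temporary copy, and point to the version on the dest page */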
	newitemid = PageGetItemId(dst_page, newoff);
	pfree(newtup.t_data);
	newtup.t_data = (HeapTupleHeader) PageGetItem(dst_page, newitemid);
	ItemPointerSet(&(newtup.t_data->t_ctid), dst_vacpage->blkno, newoff);
	newtup.t_self = newtup.t_data->t_ctid;

	/*
	 * Mark old tuple as MOVED_OFF by me.
	 */
	old_tup->t_data->t_infomask &= ~(HEAP_XMIN_COMMITTED |
									 HEAP_XMIN_INVALID |
									 HEAP_MOVED_IN);
	old_tup->t_data->t_infomask |= HEAP_MOVED_OFF;
	HeapTupleHeaderSetXvac(old_tup->t_data, myXID);

	MarkBufferDirty(dst_buf);
	MarkBufferDirty(old_buf);

	/* XLOG stuff */
	if (!rel->rd_istemp)
	{
		XLogRecPtr	recptr = log_heap_move(rel, old_buf, old_tup->t_self,
										   dst_buf, &newtup);

		PageSetLSN(old_page, recptr);
		PageSetTLI(old_page, ThisTimeLineID);
		PageSetLSN(dst_page, recptr);
		PageSetTLI(dst_page, ThisTimeLineID);
	}
	else
	{
		/*
		 * No XLOG record, but still need to flag that XID exists on disk
		 */
		MyXactMadeTempRelUpdate = true;
	}

	END_CRIT_SECTION();

	dst_vacpage->free = PageGetFreeSpaceWithFillFactor(rel, dst_page);
	LockBuffer(dst_buf, BUFFER_LOCK_UNLOCK);
	LockBuffer(old_buf, BUFFER_LOCK_UNLOCK);

	dst_vacpage->offsets_used++;

	/* create index entries for the moved tuple, if needed */
	if (ec->resultRelInfo->ri_NumIndices > 0)
	{
		ExecStoreTuple(&newtup, ec->slot, InvalidBuffer, false);
		ExecInsertIndexTuples(ec->slot, &(newtup.t_self), ec->estate, true);
		ResetPerTupleExprContext(ec->estate);
	}
}

/*
 *	update_hint_bits() -- update hint bits in destination pages
 *
 * Scan all the pages that we moved tuples onto and update tuple status bits.
 * This is normally not really necessary, but it will save time for future
 * transactions examining these tuples.
 *
 * This pass guarantees that all HEAP_MOVED_IN tuples are marked as
 * XMIN_COMMITTED, so that future tqual tests won't need to check their XVAC.
 *
 * BUT NOTICE that this code fails to clear HEAP_MOVED_OFF tuples from
 * pages that were move source pages but not move dest pages.  The bulk
 * of the move source pages will be physically truncated from the relation,
 * and the last page remaining in the rel will be fixed separately in
 * repair_frag(), so the only cases where a MOVED_OFF tuple won't get its
 * hint bits updated are tuples that are moved as part of a chain and were
 * on pages that were not either move destinations nor at the end of the rel.
 * To completely ensure that no MOVED_OFF tuples remain unmarked, we'd have
 * to remember and revisit those pages too.
 *
 * Because of this omission, VACUUM FULL FREEZE is not a safe combination;
 * it's possible that the VACUUM's own XID remains exposed as something that
 * tqual tests would need to check.
 *
 * For the non-freeze case, one wonders whether it wouldn't be better to skip
 * this work entirely, and let the tuple status updates happen someplace
 * that's not holding an exclusive lock on the relation.
Bruce Momjian's avatar
Bruce Momjian committed
2893 2894 2895 2896 2897
 */
static void
update_hint_bits(Relation rel, VacPageList fraged_pages, int num_fraged_pages,
				 BlockNumber last_move_dest_block, int num_moved)
{
	TransactionId myXID = GetCurrentTransactionId();
	int			checked_moved = 0;
	int			i;
	VacPage    *curpage;

	for (i = 0, curpage = fraged_pages->pagedesc;
		 i < num_fraged_pages;
		 i++, curpage++)
	{
		Buffer		buf;
		Page		page;
		OffsetNumber max_offset;
		OffsetNumber off;
		int			num_tuples = 0;

		vacuum_delay_point();

		if ((*curpage)->blkno > last_move_dest_block)
			break;				/* no need to scan any further */
		if ((*curpage)->offsets_used == 0)
			continue;			/* this page was never used as a move dest */
		buf = ReadBuffer(rel, (*curpage)->blkno);
		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
		page = BufferGetPage(buf);
		max_offset = PageGetMaxOffsetNumber(page);
		for (off = FirstOffsetNumber;
			 off <= max_offset;
			 off = OffsetNumberNext(off))
		{
			ItemId		itemid = PageGetItemId(page, off);
			HeapTupleHeader htup;

			if (!ItemIdIsUsed(itemid))
				continue;
			htup = (HeapTupleHeader) PageGetItem(page, itemid);
			if (htup->t_infomask & HEAP_XMIN_COMMITTED)
				continue;

			/*
			 * Here we may see either MOVED_OFF or MOVED_IN tuples.
			 */
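			/*
			 * (As I understand it: a MOVED_IN tuple is a copy we moved onto
			 * this page, so once our own transaction is known committed it
			 * can simply be hinted XMIN_COMMITTED; a MOVED_OFF tuple found
			 * here was moved away as part of a chain, so the copy on this
			 * page is dead and gets XMIN_INVALID instead.)
			 */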
			if (!(htup->t_infomask & HEAP_MOVED))
				elog(ERROR, "HEAP_MOVED_OFF/HEAP_MOVED_IN was expected");
			if (HeapTupleHeaderGetXvac(htup) != myXID)
				elog(ERROR, "invalid XVAC in tuple header");

			if (htup->t_infomask & HEAP_MOVED_IN)
			{
				htup->t_infomask |= HEAP_XMIN_COMMITTED;
				htup->t_infomask &= ~HEAP_MOVED;
				num_tuples++;
			}
			else
				htup->t_infomask |= HEAP_XMIN_INVALID;
		}
		MarkBufferDirty(buf);
		UnlockReleaseBuffer(buf);
		Assert((*curpage)->offsets_used == num_tuples);
		checked_moved += num_tuples;
	}
	Assert(num_moved == checked_moved);
}

/*
 *	vacuum_heap() -- free dead tuples
 *
 *		This routine marks dead tuples as unused and truncates the relation
 *		if there are "empty" end-blocks.
 */
static void
vacuum_heap(VRelStats *vacrelstats, Relation onerel, VacPageList vacuum_pages)
{
	Buffer		buf;
	VacPage    *vacpage;
	BlockNumber relblocks;
	int			nblocks;
	int			i;

	nblocks = vacuum_pages->num_pages;
	nblocks -= vacuum_pages->empty_end_pages;	/* nothing to do with them */

	for (i = 0, vacpage = vacuum_pages->pagedesc; i < nblocks; i++, vacpage++)
	{
		vacuum_delay_point();

		if ((*vacpage)->offsets_free > 0)
		{
			buf = ReadBuffer(onerel, (*vacpage)->blkno);
			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
			vacuum_page(onerel, buf, *vacpage);
			UnlockReleaseBuffer(buf);
		}
	}

	/* Truncate relation if there are some empty end-pages */
	Assert(vacrelstats->rel_pages >= vacuum_pages->empty_end_pages);
	if (vacuum_pages->empty_end_pages > 0)
	{
		relblocks = vacrelstats->rel_pages - vacuum_pages->empty_end_pages;
		ereport(elevel,
				(errmsg("\"%s\": truncated %u to %u pages",
						RelationGetRelationName(onerel),
						vacrelstats->rel_pages, relblocks)));
		RelationTruncate(onerel, relblocks);
		vacrelstats->rel_pages = relblocks;		/* set new number of blocks */
	}
}

/*
 *	vacuum_page() -- free dead tuples on a page
 *					 and repair its fragmentation.
 *
 * Caller must hold pin and lock on buffer.
 */
static void
vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
{
	OffsetNumber unused[MaxOffsetNumber];
	int			uncnt;
	Page		page = BufferGetPage(buffer);
	ItemId		itemid;
	int			i;

	/* There shouldn't be any tuples moved onto the page yet! */
	Assert(vacpage->offsets_used == 0);

	START_CRIT_SECTION();

	for (i = 0; i < vacpage->offsets_free; i++)
	{
		itemid = PageGetItemId(page, vacpage->offsets[i]);
		itemid->lp_flags &= ~LP_USED;
	}
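	/*
	 * (The loop above just clears LP_USED on each reaped line pointer;
	 * PageRepairFragmentation below then compacts the remaining tuples and
	 * reports the unused offsets so they can go into the WAL record.)
	 */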

	uncnt = PageRepairFragmentation(page, unused);

	MarkBufferDirty(buffer);

	/* XLOG stuff */
	if (!onerel->rd_istemp)
	{
		XLogRecPtr	recptr;

		recptr = log_heap_clean(onerel, buffer, unused, uncnt);
		PageSetLSN(page, recptr);
		PageSetTLI(page, ThisTimeLineID);
	}
	else
	{
		/* No XLOG record, but still need to flag that XID exists on disk */
		MyXactMadeTempRelUpdate = true;
	}

	END_CRIT_SECTION();
}

/*
 *	scan_index() -- scan one index relation to update pg_class statistics.
 *
 * We use this when we have no deletions to do.
 */
static void
scan_index(Relation indrel, double num_tuples)
{
	IndexBulkDeleteResult *stats;
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indrel;
	ivinfo.vacuum_full = true;
	ivinfo.message_level = elevel;
	ivinfo.num_heap_tuples = num_tuples;

	stats = index_vacuum_cleanup(&ivinfo, NULL);

	if (!stats)
		return;

	/* now update statistics in pg_class */
	vac_update_relstats(RelationGetRelid(indrel),
						stats->num_pages, stats->num_index_tuples,
						false, InvalidTransactionId, InvalidTransactionId);

	ereport(elevel,
			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
					RelationGetRelationName(indrel),
					stats->num_index_tuples,
					stats->num_pages),
	errdetail("%u index pages have been deleted, %u are currently reusable.\n"
			  "%s.",
			  stats->pages_deleted, stats->pages_free,
			  pg_rusage_show(&ru0))));

	/*
	 * Check for tuple count mismatch.  If the index is partial, then it's OK
	 * for it to have fewer tuples than the heap; otherwise we have trouble.
	 */
	if (stats->num_index_tuples != num_tuples)
	{
		if (stats->num_index_tuples > num_tuples ||
			!vac_is_partial_index(indrel))
			ereport(WARNING,
					(errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
							RelationGetRelationName(indrel),
							stats->num_index_tuples, num_tuples),
					 errhint("Rebuild the index with REINDEX.")));
	}

	pfree(stats);
}

/*
 *	vacuum_index() -- vacuum one index relation.
 *
 *		vacpagelist is the VacPageList of the heap we're currently vacuuming.
 *		It's locked.  Indrel is an index relation on the vacuumed heap.
 *
 *		We don't bother to set locks on the index relation here, since
 *		the parent table is exclusive-locked already.
 *
 *		Finally, we arrange to update the index relation's statistics in
 *		pg_class.
 */
static void
vacuum_index(VacPageList vacpagelist, Relation indrel,
			 double num_tuples, int keep_tuples)
{
	IndexBulkDeleteResult *stats;
	IndexVacuumInfo ivinfo;
	PGRUsage	ru0;

	pg_rusage_init(&ru0);

	ivinfo.index = indrel;
	ivinfo.vacuum_full = true;
	ivinfo.message_level = elevel;
	ivinfo.num_heap_tuples = num_tuples + keep_tuples;

	/* Do bulk deletion */
	stats = index_bulk_delete(&ivinfo, NULL, tid_reaped, (void *) vacpagelist);

	/* Do post-VACUUM cleanup */
	stats = index_vacuum_cleanup(&ivinfo, stats);

	if (!stats)
		return;

	/* now update statistics in pg_class */
	vac_update_relstats(RelationGetRelid(indrel),
						stats->num_pages, stats->num_index_tuples,
						false, InvalidTransactionId, InvalidTransactionId);

	ereport(elevel,
			(errmsg("index \"%s\" now contains %.0f row versions in %u pages",
					RelationGetRelationName(indrel),
					stats->num_index_tuples,
					stats->num_pages),
			 errdetail("%.0f index row versions were removed.\n"
			 "%u index pages have been deleted, %u are currently reusable.\n"
					   "%s.",
					   stats->tuples_removed,
					   stats->pages_deleted, stats->pages_free,
					   pg_rusage_show(&ru0))));

	/*
	 * Check for tuple count mismatch.  If the index is partial, then it's OK
	 * for it to have fewer tuples than the heap; otherwise we have trouble.
	 */
	if (stats->num_index_tuples != num_tuples + keep_tuples)
	{
		if (stats->num_index_tuples > num_tuples + keep_tuples ||
			!vac_is_partial_index(indrel))
			ereport(WARNING,
					(errmsg("index \"%s\" contains %.0f row versions, but table contains %.0f row versions",
							RelationGetRelationName(indrel),
						  stats->num_index_tuples, num_tuples + keep_tuples),
					 errhint("Rebuild the index with REINDEX.")));
	}

	pfree(stats);
}

/*
 *	tid_reaped() -- is a particular tid reaped?
 *
 *		This has the right signature to be an IndexBulkDeleteCallback.
 *
 *		vacpagelist->pagedesc is sorted in the right (blkno) order.
 */
static bool
tid_reaped(ItemPointer itemptr, void *state)
{
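	/*
	 * (Two-level lookup: first binary-search the per-page descriptors by
	 * block number, then binary-search that page's reaped offsets.  A page
	 * with offsets_free == 0 is an entirely empty page, so every tid on it
	 * counts as reaped.)
	 */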
	VacPageList vacpagelist = (VacPageList) state;
	OffsetNumber ioffno;
	OffsetNumber *voff;
	VacPage		vp,
			   *vpp;
	VacPageData vacpage;

	vacpage.blkno = ItemPointerGetBlockNumber(itemptr);
	ioffno = ItemPointerGetOffsetNumber(itemptr);

	vp = &vacpage;
	vpp = (VacPage *) vac_bsearch((void *) &vp,
								  (void *) (vacpagelist->pagedesc),
								  vacpagelist->num_pages,
								  sizeof(VacPage),
								  vac_cmp_blk);

	if (vpp == NULL)
		return false;

	/* ok - we are on a partially or fully reaped page */
	vp = *vpp;

	if (vp->offsets_free == 0)
	{
		/* this is EmptyPage, so claim all tuples on it are reaped!!! */
		return true;
	}

	voff = (OffsetNumber *) vac_bsearch((void *) &ioffno,
										(void *) (vp->offsets),
										vp->offsets_free,
										sizeof(OffsetNumber),
										vac_cmp_offno);

	if (voff == NULL)
		return false;

	/* tid is reaped */
	return true;
}

/*
 * Update the shared Free Space Map with the info we now have about
 * free space in the relation, discarding any old info the map may have.
 */
static void
vac_update_fsm(Relation onerel, VacPageList fraged_pages,
			   BlockNumber rel_pages)
{
	int			nPages = fraged_pages->num_pages;
	VacPage    *pagedesc = fraged_pages->pagedesc;
	Size		threshold;
	PageFreeSpaceInfo *pageSpaces;
	int			outPages;
	int			i;

	/*
	 * We only report pages with free space at least equal to the average
	 * request size --- this avoids cluttering FSM with uselessly-small bits
	 * of space.  Although FSM would discard pages with little free space
	 * anyway, it's important to do this prefiltering because (a) it reduces
	 * the time spent holding the FSM lock in RecordRelationFreeSpace, and (b)
	 * FSM uses the number of pages reported as a statistic for guiding space
	 * management.  If we didn't threshold our reports the same way
	 * vacuumlazy.c does, we'd be skewing that statistic.
	 */
	threshold = GetAvgFSMRequestSize(&onerel->rd_node);

	pageSpaces = (PageFreeSpaceInfo *)
		palloc(nPages * sizeof(PageFreeSpaceInfo));
	outPages = 0;

	for (i = 0; i < nPages; i++)
	{
		/*
		 * fraged_pages may contain entries for pages that we later decided to
		 * truncate from the relation; don't enter them into the free space
		 * map!
		 */
		if (pagedesc[i]->blkno >= rel_pages)
			break;

		if (pagedesc[i]->free >= threshold)
		{
			pageSpaces[outPages].blkno = pagedesc[i]->blkno;
			pageSpaces[outPages].avail = pagedesc[i]->free;
			outPages++;
		}
	}

	RecordRelationFreeSpace(&onerel->rd_node, outPages, pageSpaces);

	pfree(pageSpaces);
}

/* Copy a VacPage structure */
static VacPage
copy_vac_page(VacPage vacpage)
{
	VacPage		newvacpage;

	/* allocate a VacPageData entry */
	newvacpage = (VacPage) palloc(sizeof(VacPageData) +
							   vacpage->offsets_free * sizeof(OffsetNumber));

	/* fill it in */
	if (vacpage->offsets_free > 0)
		memcpy(newvacpage->offsets, vacpage->offsets,
			   vacpage->offsets_free * sizeof(OffsetNumber));
	newvacpage->blkno = vacpage->blkno;
	newvacpage->free = vacpage->free;
	newvacpage->offsets_used = vacpage->offsets_used;
	newvacpage->offsets_free = vacpage->offsets_free;

	return newvacpage;
}

/*
 * Add a VacPage pointer to a VacPageList.
 *
 *		As a side effect of the way that scan_heap works,
 *		higher pages come after lower pages in the array
 *		(and highest tid on a page is last).
 */
static void
vpage_insert(VacPageList vacpagelist, VacPage vpnew)
{
#define PG_NPAGEDESC 1024
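	/*
	 * (The pagedesc array starts at PG_NPAGEDESC entries and doubles each
	 * time it fills up, so appends stay amortized O(1).)
	 */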

	/* allocate a VacPage entry if needed */
	if (vacpagelist->num_pages == 0)
	{
		vacpagelist->pagedesc = (VacPage *) palloc(PG_NPAGEDESC * sizeof(VacPage));
		vacpagelist->num_allocated_pages = PG_NPAGEDESC;
	}
	else if (vacpagelist->num_pages >= vacpagelist->num_allocated_pages)
	{
		vacpagelist->num_allocated_pages *= 2;
		vacpagelist->pagedesc = (VacPage *) repalloc(vacpagelist->pagedesc, vacpagelist->num_allocated_pages * sizeof(VacPage));
	}
	vacpagelist->pagedesc[vacpagelist->num_pages] = vpnew;
	(vacpagelist->num_pages)++;
}

/*
 * vac_bsearch: just like standard C library routine bsearch(),
 * except that we first test to see whether the target key is outside
 * the range of the table entries.  This case is handled relatively slowly
 * by the normal binary search algorithm (ie, no faster than any other key)
 * but it occurs often enough in VACUUM to be worth optimizing.
 */
static void *
vac_bsearch(const void *key, const void *base,
			size_t nelem, size_t size,
			int (*compar) (const void *, const void *))
{
	int			res;
	const void *last;

	if (nelem == 0)
		return NULL;
	res = compar(key, base);
	if (res < 0)
		return NULL;
	if (res == 0)
		return (void *) base;
	if (nelem > 1)
	{
		last = (const void *) ((const char *) base + (nelem - 1) * size);
		res = compar(key, last);
		if (res > 0)
			return NULL;
		if (res == 0)
			return (void *) last;
	}
	if (nelem <= 2)
		return NULL;			/* already checked 'em all */
	return bsearch(key, base, nelem, size, compar);
}

/*
 * Comparator routines for use with qsort() and bsearch().
 */
static int
vac_cmp_blk(const void *left, const void *right)
{
	BlockNumber lblk,
				rblk;

	lblk = (*((VacPage *) left))->blkno;
	rblk = (*((VacPage *) right))->blkno;

	if (lblk < rblk)
		return -1;
	if (lblk == rblk)
		return 0;
	return 1;
}

static int
vac_cmp_offno(const void *left, const void *right)
{
	if (*(OffsetNumber *) left < *(OffsetNumber *) right)
		return -1;
	if (*(OffsetNumber *) left == *(OffsetNumber *) right)
		return 0;
	return 1;
}

static int
vac_cmp_vtlinks(const void *left, const void *right)
{
	if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi <
		((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
		return -1;
	if (((VTupleLink) left)->new_tid.ip_blkid.bi_hi >
		((VTupleLink) right)->new_tid.ip_blkid.bi_hi)
		return 1;
	/* bi_hi-es are equal */
	if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo <
		((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
		return -1;
	if (((VTupleLink) left)->new_tid.ip_blkid.bi_lo >
		((VTupleLink) right)->new_tid.ip_blkid.bi_lo)
		return 1;
	/* bi_lo-es are equal */
	if (((VTupleLink) left)->new_tid.ip_posid <
		((VTupleLink) right)->new_tid.ip_posid)
		return -1;
	if (((VTupleLink) left)->new_tid.ip_posid >
		((VTupleLink) right)->new_tid.ip_posid)
		return 1;
	return 0;
}


/*
 * Open all the indexes of the given relation, obtaining the specified kind
 * of lock on each.  Return an array of Relation pointers for the indexes
 * into *Irel, and the number of indexes into *nindexes.
 */
void
vac_open_indexes(Relation relation, LOCKMODE lockmode,
				 int *nindexes, Relation **Irel)
{
	List	   *indexoidlist;
	ListCell   *indexoidscan;
	int			i;

	indexoidlist = RelationGetIndexList(relation);

	*nindexes = list_length(indexoidlist);

	if (*nindexes > 0)
		*Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
	else
		*Irel = NULL;

	i = 0;
	foreach(indexoidscan, indexoidlist)
	{
		Oid			indexoid = lfirst_oid(indexoidscan);
		Relation	ind;

		ind = index_open(indexoid);
		(*Irel)[i++] = ind;
		LockRelation(ind, lockmode);
	}

	list_free(indexoidlist);
}

/*
 * Release the resources acquired by vac_open_indexes.  Optionally release
 * the locks (say NoLock to keep 'em).
 */
void
vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode)
{
	if (Irel == NULL)
		return;

	while (nindexes--)
	{
		Relation	ind = Irel[nindexes];

		if (lockmode != NoLock)
			UnlockRelation(ind, lockmode);
		index_close(ind);
	}
	pfree(Irel);
}


/*
 * Is an index partial (ie, could it contain fewer tuples than the heap?)
 */
bool
vac_is_partial_index(Relation indrel)
{
	/*
	 * If the index's AM doesn't support nulls, it's partial for our purposes
	 */
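	/*
	 * (For example, hash indexes of this vintage don't store NULLs, so they
	 * can legitimately hold fewer entries than the heap has rows.)
	 */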
	if (!indrel->rd_am->amindexnulls)
		return true;

	/* Otherwise, look to see if there's a partial-index predicate */
	if (!heap_attisnull(indrel->rd_indextuple, Anum_pg_index_indpred))
		return true;

	return false;
}


static bool
enough_space(VacPage vacpage, Size len)
{
	len = MAXALIGN(len);

	if (len > vacpage->free)
		return false;

	/* if there are free itemid(s) and len <= free_space... */
	if (vacpage->offsets_used < vacpage->offsets_free)
		return true;

	/* noff_used >= noff_free and so we'll have to allocate new itemid */
	if (len + sizeof(ItemIdData) <= vacpage->free)
		return true;

	return false;
}

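/*
 * Free space on the page, less the space reserved by the relation's
 * fillfactor.  (For example, with the default heap fillfactor nothing is
 * held back, while fillfactor 70 on an 8 kB page reserves roughly 2.4 kB;
 * only the excess above the reservation is reported as usable here.)
 */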
static Size
PageGetFreeSpaceWithFillFactor(Relation relation, Page page)
{
	PageHeader	pd = (PageHeader) page;
	Size		freespace = pd->pd_upper - pd->pd_lower;
	Size		targetfree;

	targetfree = RelationGetTargetPageFreeSpace(relation,
												HEAP_DEFAULT_FILLFACTOR);
	if (freespace > targetfree)
		return freespace - targetfree;
	else
		return 0;
}

/*
 * vacuum_delay_point --- check for interrupts and cost-based delay.
 *
 * This should be called in each major loop of VACUUM processing,
 * typically once per page processed.
 */
void
vacuum_delay_point(void)
{
	/* Always check for interrupts */
	CHECK_FOR_INTERRUPTS();

	/* Nap if appropriate */
	if (VacuumCostActive && !InterruptPending &&
		VacuumCostBalance >= VacuumCostLimit)
	{
		int			msec;

		msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit;
		if (msec > VacuumCostDelay * 4)
			msec = VacuumCostDelay * 4;
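		/*
		 * (For example, with vacuum_cost_delay = 10ms, vacuum_cost_limit =
		 * 200 and an accumulated balance of 500, this sleeps for
		 * min(10 * 500 / 200, 10 * 4) = 25ms.)
		 */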

		pg_usleep(msec * 1000L);

		VacuumCostBalance = 0;

		/* Might have gotten an interrupt while sleeping */
		CHECK_FOR_INTERRUPTS();
	}
}