/*-------------------------------------------------------------------------
 *
 * cluster.c
 *	  CLUSTER a table on an index.
 *
 * There is hardly anything left of Paul Brown's original implementation...
 *
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994-5, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.138 2005/05/10 13:16:26 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
18
#include "postgres.h"
19

20
#include "access/genam.h"
Bruce Momjian's avatar
Bruce Momjian committed
21
#include "access/heapam.h"
22
#include "catalog/catalog.h"
23
#include "catalog/dependency.h"
Bruce Momjian's avatar
Bruce Momjian committed
24
#include "catalog/heap.h"
25
#include "catalog/index.h"
26
#include "catalog/indexing.h"
27
#include "catalog/namespace.h"
28
#include "commands/cluster.h"
29
#include "commands/tablecmds.h"
30
#include "miscadmin.h"
31
#include "utils/acl.h"
32
#include "utils/fmgroids.h"
33
#include "utils/inval.h"
34
#include "utils/lsyscache.h"
35
#include "utils/memutils.h"
Bruce Momjian's avatar
Bruce Momjian committed
36
#include "utils/syscache.h"
37
#include "utils/relcache.h"
38

39 40 41

/*
 * This struct is used to pass around the information on tables to be
42 43 44 45 46
 * clustered. We need this so we can make a list of them when invoked without
 * a specific table/index pair.
 */
typedef struct
{
Bruce Momjian's avatar
Bruce Momjian committed
47 48
	Oid			tableOid;
	Oid			indexOid;
49
} RelToCluster;
50

51

52
static void cluster_rel(RelToCluster *rv, bool recheck);
53
static void rebuild_relation(Relation OldHeap, Oid indexOid);
54
static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
55 56 57 58 59
static List *get_tables_to_cluster(MemoryContext cluster_context);



/*---------------------------------------------------------------------------
 * cluster
 *		Top-level entry point for the CLUSTER command.
 *
 * CLUSTER can process many tables in one invocation.  Doing all of them in
 * a single transaction would require holding exclusive locks on every one
 * of the tables simultaneously, which is very likely to deadlock.
 *
 * So, much as VACUUM does, each relation is clustered in a transaction of
 * its own.  That requires us to:
 *	- keep the work list in a separate memory context, so that it survives
 *	  across transaction boundaries
 *	- start a fresh transaction for each relation to be clustered
 *	- re-validate each work-list entry, since somebody may have dropped the
 *	  relation behind our back, or clustered it on a different index
 *	- end the transaction
 *
 * None of that overhead applies when a single relation is named.
 *
 * A relation may also be named without an index.  In that case the index
 * having indisclustered set is looked up, and an ERROR is thrown if the
 * table has no such index.
 *---------------------------------------------------------------------------
 */
void
cluster(ClusterStmt *stmt)
{
	Oid			tableOid;
	Oid			indexOid = InvalidOid;
	Relation	rel;
	RelToCluster target;

	if (stmt->relation == NULL)
	{
		/*
		 * Multi-relation form: cluster every table that has some index
		 * with indisclustered set.
		 */
		MemoryContext cluster_context;
		List	   *worklist;
		ListCell   *cell;

		/*
		 * This form must not run inside a user transaction block; we
		 * would be holding locks way too long.
		 */
		PreventTransactionChain((void *) stmt, "CLUSTER");

		/*
		 * Working storage that has to outlive individual transactions.
		 *
		 * It is made a child of PortalContext, so it will go away even if
		 * we error out.
		 */
		cluster_context = AllocSetContextCreate(PortalContext,
												"Cluster",
												ALLOCSET_DEFAULT_MINSIZE,
												ALLOCSET_DEFAULT_INITSIZE,
												ALLOCSET_DEFAULT_MAXSIZE);

		/* Collect the work list; it is allocated in cluster_context. */
		worklist = get_tables_to_cluster(cluster_context);

		/* Leave the transaction we were started in */
		CommitTransactionCommand();

		/* Now process the collected entries, one transaction apiece */
		foreach(cell, worklist)
		{
			RelToCluster *next = (RelToCluster *) lfirst(cell);

			StartTransactionCommand();
			/* functions in indexes may want a snapshot set */
			ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
			cluster_rel(next, true);
			CommitTransactionCommand();
		}

		/* Open a transaction again for the cleanup work */
		StartTransactionCommand();

		/* Release the working storage */
		MemoryContextDelete(cluster_context);
		return;
	}

	/*
	 * Single-relation form.  Locate the table and lock it.
	 */
	rel = heap_openrv(stmt->relation, AccessExclusiveLock);
	tableOid = RelationGetRelid(rel);

	/* Only the owner may cluster the table */
	if (!pg_class_ownercheck(tableOid, GetUserId()))
		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
					   RelationGetRelationName(rel));

	if (stmt->indexname != NULL)
	{
		/*
		 * An index was named.  It is looked up in the namespace of the
		 * table, since that is where it is expected to live.
		 */
		indexOid = get_relname_relid(stmt->indexname,
									 rel->rd_rel->relnamespace);
		if (!OidIsValid(indexOid))
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_OBJECT),
					 errmsg("index \"%s\" for table \"%s\" does not exist",
							stmt->indexname, stmt->relation->relname)));
	}
	else
	{
		/*
		 * No index was named: search the table's indexes for the one
		 * marked indisclustered.  indexOid stays InvalidOid unless one is
		 * found.
		 */
		ListCell   *cell;

		foreach(cell, RelationGetIndexList(rel))
		{
			Oid			candidate = lfirst_oid(cell);
			HeapTuple	idxtuple;

			idxtuple = SearchSysCache(INDEXRELID,
									  ObjectIdGetDatum(candidate),
									  0, 0, 0);
			if (!HeapTupleIsValid(idxtuple))
				elog(ERROR, "cache lookup failed for index %u", candidate);

			if (((Form_pg_index) GETSTRUCT(idxtuple))->indisclustered)
			{
				ReleaseSysCache(idxtuple);
				indexOid = candidate;
				break;
			}
			ReleaseSysCache(idxtuple);
		}

		if (!OidIsValid(indexOid))
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_OBJECT),
					 errmsg("there is no previously clustered index for table \"%s\"",
							stmt->relation->relname)));
	}

	/* The remaining checks are carried out by cluster_rel() */
	target.tableOid = tableOid;
	target.indexOid = indexOid;

	/* Drop our reference to the relation, but hold the lock till commit */
	heap_close(rel, NoLock);

	/* Go do it */
	cluster_rel(&target, false);
}
218

219
/*
220
 * cluster_rel
221
 *
222 223
 * This clusters the table by creating a new, clustered table and
 * swapping the relfilenodes of the new table and the old table, so
Bruce Momjian's avatar
Bruce Momjian committed
224
 * the OID of the original table is preserved.	Thus we do not lose
225
 * GRANT, inheritance nor references to this table (this was a bug
226
 * in releases thru 7.3).
227
 *
228 229 230 231
 * Also create new indexes and swap the filenodes with the old indexes the
 * same way we do for the relation.  Since we are effectively bulk-loading
 * the new table, it's better to create the indexes afterwards than to fill
 * them incrementally while we load the table.
232
 */
233
static void
234
cluster_rel(RelToCluster *rvtc, bool recheck)
235
{
236
	Relation	OldHeap;
237

238 239 240
	/* Check for user-requested abort. */
	CHECK_FOR_INTERRUPTS();

241 242 243 244
	/*
	 * Since we may open a new transaction for each relation, we have to
	 * check that the relation still is what we think it is.
	 *
Bruce Momjian's avatar
Bruce Momjian committed
245 246
	 * If this is a single-transaction CLUSTER, we can skip these tests. We
	 * *must* skip the one on indisclustered since it would reject an
247
	 * attempt to cluster a not-previously-clustered index.
248
	 */
249 250
	if (recheck)
	{
Bruce Momjian's avatar
Bruce Momjian committed
251 252
		HeapTuple	tuple;
		Form_pg_index indexForm;
253 254 255 256 257 258 259

		/*
		 * Check if the relation and index still exist before opening them
		 */
		if (!SearchSysCacheExists(RELOID,
								  ObjectIdGetDatum(rvtc->tableOid),
								  0, 0, 0) ||
260 261 262
			!SearchSysCacheExists(RELOID,
								  ObjectIdGetDatum(rvtc->indexOid),
								  0, 0, 0))
263
			return;
264

265
		/* Check that the user still owns the relation */
266
		if (!pg_class_ownercheck(rvtc->tableOid, GetUserId()))
267
			return;
268

269 270 271
		/*
		 * Check that the index is still the one with indisclustered set.
		 */
272 273 274
		tuple = SearchSysCache(INDEXRELID,
							   ObjectIdGetDatum(rvtc->indexOid),
							   0, 0, 0);
275 276
		if (!HeapTupleIsValid(tuple))
			return;				/* could have gone away... */
277 278 279 280 281 282 283 284 285
		indexForm = (Form_pg_index) GETSTRUCT(tuple);
		if (!indexForm->indisclustered)
		{
			ReleaseSysCache(tuple);
			return;
		}
		ReleaseSysCache(tuple);
	}

286
	/*
287
	 * We grab exclusive access to the target rel and index for the
288
	 * duration of the transaction.  (This is redundant for the single-
Bruce Momjian's avatar
Bruce Momjian committed
289 290
	 * transaction case, since cluster() already did it.)  The index lock
	 * is taken inside check_index_is_clusterable.
291
	 */
292
	OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
293

294
	/* Check index is valid to cluster on */
295
	check_index_is_clusterable(OldHeap, rvtc->indexOid, recheck);
296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311

	/* rebuild_relation does all the dirty work */
	rebuild_relation(OldHeap, rvtc->indexOid);

	/* NB: rebuild_relation does heap_close() on OldHeap */
}

/*
 * Verify that the specified index is a legitimate index to cluster on
 *
 * Side effect: obtains exclusive lock on the index.  The caller should
 * already have exclusive lock on the table, so the index lock is likely
 * redundant, but it seems best to grab it anyway to ensure the index
 * definition can't change under us.
 */
void
312
check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck)
313 314 315 316
{
	Relation	OldIndex;

	OldIndex = index_open(indexOid);
317
	LockRelation(OldIndex, AccessExclusiveLock);
318

319
	/*
320
	 * Check that index is in fact an index on the given relation
321
	 */
322
	if (OldIndex->rd_index == NULL ||
323
		OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
324 325 326 327 328
		ereport(ERROR,
				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
				 errmsg("\"%s\" is not an index for table \"%s\"",
						RelationGetRelationName(OldIndex),
						RelationGetRelationName(OldHeap))));
329

330
	/*
Bruce Momjian's avatar
Bruce Momjian committed
331 332 333 334
	 * Disallow clustering on incomplete indexes (those that might not
	 * index every row of the relation).  We could relax this by making a
	 * separate seqscan pass over the table to copy the missing rows, but
	 * that seems expensive and tedious.
335
	 */
336
	if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred))
337 338
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
339 340 341
				 errmsg("cannot cluster on partial index \"%s\"",
						RelationGetRelationName(OldIndex))));
	
342 343 344 345 346 347
	if (!OldIndex->rd_am->amindexnulls)
	{
		AttrNumber	colno;

		/*
		 * If the AM doesn't index nulls, then it's a partial index unless
Bruce Momjian's avatar
Bruce Momjian committed
348
		 * we can prove all the rows are non-null.	Note we only need look
349 350 351
		 * at the first column; multicolumn-capable AMs are *required* to
		 * index nulls in columns after the first.
		 */
352
		colno = OldIndex->rd_index->indkey.values[0];
353 354 355
		if (colno > 0)
		{
			/* ordinary user attribute */
356
			if (!OldHeap->rd_att->attrs[colno - 1]->attnotnull)
357 358
				ereport(ERROR,
						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
359 360 361 362 363 364 365
						 errmsg("cannot cluster on index \"%s\" because access method\n"
								"does not handle null values",
							  RelationGetRelationName(OldIndex)),
						 errhint("You may be able to work around this by marking column \"%s\" NOT NULL%s",
							NameStr(OldHeap->rd_att->attrs[colno - 1]->attname),
							recheck ? ",\nor use ALTER TABLE ... SET WITHOUT CLUSTER to remove the cluster\n"
									"specification from the table." : ".")));
366 367 368 369 370 371 372
		}
		else if (colno < 0)
		{
			/* system column --- okay, always non-null */
		}
		else
			/* index expression, lose... */
373 374
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
375 376 377
					 errmsg("cannot cluster on expressional index \"%s\" because its index access\n"
							"method does not handle null values",
						RelationGetRelationName(OldIndex))));
378 379
	}

380
	/*
Bruce Momjian's avatar
Bruce Momjian committed
381 382 383 384 385
	 * Disallow clustering system relations.  This will definitely NOT
	 * work for shared relations (we have no way to update pg_class rows
	 * in other databases), nor for nailed-in-cache relations (the
	 * relfilenode values for those are hardwired, see relcache.c).  It
	 * might work for other system relations, but I ain't gonna risk it.
386 387
	 */
	if (IsSystemRelation(OldHeap))
388 389 390 391
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("\"%s\" is a system catalog",
						RelationGetRelationName(OldHeap))));
392

393 394 395 396 397
	/*
	 * Don't allow cluster on temp tables of other backends ... their
	 * local buffer manager is not going to cope.
	 */
	if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
398 399
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
Bruce Momjian's avatar
Bruce Momjian committed
400
		   errmsg("cannot cluster temporary tables of other sessions")));
401

402
	/* Drop relcache refcnt on OldIndex, but keep lock */
403
	index_close(OldIndex);
Bruce Momjian's avatar
Bruce Momjian committed
404 405
}

406
/*
 * mark_index_clustered
 *		Record in pg_index that rel is clustered on the index indexOid.
 *
 * Any other index of rel that currently carries the indisclustered flag
 * has it cleared.  Passing indexOid == InvalidOid therefore marks all
 * indexes of rel not-clustered.
 */
void
mark_index_clustered(Relation rel, Oid indexOid)
{
	Relation	pg_index;
	ListCell   *cell;
	HeapTuple	tup;
	Form_pg_index form;

	/*
	 * Quick exit: if the requested index already has the flag, there is
	 * nothing to change.
	 */
	if (OidIsValid(indexOid))
	{
		tup = SearchSysCache(INDEXRELID,
							 ObjectIdGetDatum(indexOid),
							 0, 0, 0);
		if (!HeapTupleIsValid(tup))
			elog(ERROR, "cache lookup failed for index %u", indexOid);
		form = (Form_pg_index) GETSTRUCT(tup);

		if (form->indisclustered)
		{
			ReleaseSysCache(tup);
			return;
		}

		ReleaseSysCache(tup);
	}

	/*
	 * Visit every index of the relation, setting or clearing the flag as
	 * required.
	 */
	pg_index = heap_open(IndexRelationId, RowExclusiveLock);

	foreach(cell, RelationGetIndexList(rel))
	{
		Oid			curIndexOid = lfirst_oid(cell);

		/* We need a modifiable copy of the pg_index row */
		tup = SearchSysCacheCopy(INDEXRELID,
								 ObjectIdGetDatum(curIndexOid),
								 0, 0, 0);
		if (!HeapTupleIsValid(tup))
			elog(ERROR, "cache lookup failed for index %u", curIndexOid);
		form = (Form_pg_index) GETSTRUCT(tup);

		/*
		 * Two kinds of row need rewriting, and in both the flag simply
		 * flips:
		 *
		 * - the flag is set: it must be stale (the quick-exit test above
		 * established that the requested index does not carry it), so it
		 * gets cleared;
		 *
		 * - the flag is clear and this is the requested index: it gets set.
		 */
		if (form->indisclustered || curIndexOid == indexOid)
		{
			form->indisclustered = !form->indisclustered;
			simple_heap_update(pg_index, &tup->t_self, tup);
			CatalogUpdateIndexes(pg_index, tup);
			/* Ensure we see the update in the index's relcache entry */
			CacheInvalidateRelcacheByRelid(curIndexOid);
		}

		heap_freetuple(tup);
	}

	heap_close(pg_index, RowExclusiveLock);
}

/*
 * rebuild_relation: rebuild an existing relation in index order
484 485
 *
 * OldHeap: table to rebuild --- must be opened and exclusive-locked!
486
 * indexOid: index to cluster by
487 488 489
 *
 * NB: this routine closes OldHeap at the right time; caller should not.
 */
490
static void
491
rebuild_relation(Relation OldHeap, Oid indexOid)
Bruce Momjian's avatar
Bruce Momjian committed
492
{
493
	Oid			tableOid = RelationGetRelid(OldHeap);
494
	Oid			tableSpace = OldHeap->rd_rel->reltablespace;
Bruce Momjian's avatar
Bruce Momjian committed
495 496 497 498
	Oid			OIDNewHeap;
	char		NewHeapName[NAMEDATALEN];
	ObjectAddress object;

499 500
	/* Mark the correct index as clustered */
	mark_index_clustered(OldHeap, indexOid);
501 502 503 504

	/* Close relcache entry, but keep lock until transaction commit */
	heap_close(OldHeap, NoLock);

505
	/*
506
	 * Create the new heap, using a temporary name in the same namespace
Bruce Momjian's avatar
Bruce Momjian committed
507 508 509 510 511
	 * as the existing table.  NOTE: there is some risk of collision with
	 * user relnames.  Working around this seems more trouble than it's
	 * worth; in particular, we can't create the new heap in a different
	 * namespace from the old, or we will have problems with the TEMP
	 * status of temp tables.
512
	 */
513
	snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", tableOid);
514

515
	OIDNewHeap = make_new_heap(tableOid, NewHeapName, tableSpace);
Bruce Momjian's avatar
Bruce Momjian committed
516

Bruce Momjian's avatar
Bruce Momjian committed
517 518 519 520
	/*
	 * We don't need CommandCounterIncrement() because make_new_heap did
	 * it.
	 */
521

522 523 524
	/*
	 * Copy the heap data into the new table in the desired order.
	 */
525
	copy_heap_data(OIDNewHeap, tableOid, indexOid);
526

527
	/* To make the new heap's data visible (probably not needed?). */
528 529
	CommandCounterIncrement();

530 531
	/* Swap the physical files of the old and new heaps. */
	swap_relation_files(tableOid, OIDNewHeap);
532 533

	CommandCounterIncrement();
534

535
	/* Destroy new heap with old filenode */
536
	object.classId = RelationRelationId;
537
	object.objectId = OIDNewHeap;
538
	object.objectSubId = 0;
539

540 541
	/*
	 * The new relation is local to our transaction and we know nothing
542 543
	 * depends on it, so DROP_RESTRICT should be OK.
	 */
544 545 546
	performDeletion(&object, DROP_RESTRICT);

	/* performDeletion does CommandCounterIncrement at end */
Bruce Momjian's avatar
Bruce Momjian committed
547

548
	/*
Bruce Momjian's avatar
Bruce Momjian committed
549 550
	 * Rebuild each index on the relation (but not the toast table, which
	 * is all-new at this point).  We do not need
551
	 * CommandCounterIncrement() because reindex_relation does it.
552
	 */
553
	reindex_relation(tableOid, false);
554 555
}

556 557 558
/*
 * Create the new table that we will fill with correctly-ordered data.
 */
559
Oid
560
make_new_heap(Oid OIDOldHeap, const char *NewName, Oid NewTableSpace)
561
{
562 563 564
	TupleDesc	OldHeapDesc,
				tupdesc;
	Oid			OIDNewHeap;
565
	Relation	OldHeap;
566

567
	OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
568
	OldHeapDesc = RelationGetDescr(OldHeap);
569 570

	/*
571 572
	 * Need to make a copy of the tuple descriptor, since
	 * heap_create_with_catalog modifies it.
573
	 */
574
	tupdesc = CreateTupleDescCopyConstr(OldHeapDesc);
575

576 577
	OIDNewHeap = heap_create_with_catalog(NewName,
										  RelationGetNamespace(OldHeap),
Bruce Momjian's avatar
Bruce Momjian committed
578
										  NewTableSpace,
579
										  InvalidOid,
580
										  tupdesc,
581
										  OldHeap->rd_rel->relkind,
582
										  OldHeap->rd_rel->relisshared,
583 584
										  true,
										  0,
585
										  ONCOMMIT_NOOP,
586
										  allowSystemTableMods);
587

588
	/*
589 590
	 * Advance command counter so that the newly-created relation's
	 * catalog tuples will be visible to heap_open.
591 592 593 594
	 */
	CommandCounterIncrement();

	/*
595 596 597
	 * If necessary, create a TOAST table for the new relation. Note that
	 * AlterTableCreateToastTable ends with CommandCounterIncrement(), so
	 * that the TOAST table will be visible for insertion.
598
	 */
599
	AlterTableCreateToastTable(OIDNewHeap, true);
600

601
	heap_close(OldHeap, NoLock);
602

603
	return OIDNewHeap;
604 605
}

606 607 608
/*
 * Do the physical copying of heap data.
 */
609
static void
610
copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
611
{
612 613 614
	Relation	NewHeap,
				OldHeap,
				OldIndex;
615 616 617 618 619
	TupleDesc	oldTupDesc;
	TupleDesc	newTupDesc;
	int			natts;
	Datum	   *values;
	char	   *nulls;
620 621
	IndexScanDesc scan;
	HeapTuple	tuple;
622 623

	/*
624
	 * Open the relations we need.
625
	 */
626 627 628
	NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
	OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
	OldIndex = index_open(OIDOldIndex);
629

630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647
	/*
	 * Their tuple descriptors should be exactly alike, but here we only
	 * need assume that they have the same number of columns.
	 */
	oldTupDesc = RelationGetDescr(OldHeap);
	newTupDesc = RelationGetDescr(NewHeap);
	Assert(newTupDesc->natts == oldTupDesc->natts);

	/* Preallocate values/nulls arrays */
	natts = newTupDesc->natts;
	values = (Datum *) palloc0(natts * sizeof(Datum));
	nulls = (char *) palloc(natts * sizeof(char));
	memset(nulls, 'n', natts * sizeof(char));

	/*
	 * Scan through the OldHeap on the OldIndex and copy each tuple into the
	 * NewHeap.
	 */
648
	scan = index_beginscan(OldHeap, OldIndex, SnapshotNow, 0, (ScanKey) NULL);
649

650
	while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
651
	{
652
		/*
653 654 655 656 657 658 659 660 661 662 663 664
		 * We cannot simply pass the tuple to heap_insert(), for several
		 * reasons:
		 *
		 * 1. heap_insert() will overwrite the commit-status fields of the
		 * tuple it's handed.  This would trash the source relation, which is
		 * bad news if we abort later on.  (This was a bug in releases thru
		 * 7.0)
		 *
		 * 2. We'd like to squeeze out the values of any dropped columns,
		 * both to save space and to ensure we have no corner-case failures.
		 * (It's possible for example that the new table hasn't got a TOAST
		 * table and so is unable to store any large values of dropped cols.)
665
		 *
666 667 668 669 670
		 * 3. The tuple might not even be legal for the new table; this is
		 * currently only known to happen as an after-effect of ALTER TABLE
		 * SET WITHOUT OIDS.
		 *
		 * So, we must reconstruct the tuple from component Datums.
671
		 */
672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688
		HeapTuple	copiedTuple;
		int			i;

		heap_deformtuple(tuple, oldTupDesc, values, nulls);

		/* Be sure to null out any dropped columns */
		for (i = 0; i < natts; i++)
		{
			if (newTupDesc->attrs[i]->attisdropped)
				nulls[i] = 'n';
		}

		copiedTuple = heap_formtuple(newTupDesc, values, nulls);

		/* Preserve OID, if any */
		if (NewHeap->rd_rel->relhasoids)
			HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
689 690

		simple_heap_insert(NewHeap, copiedTuple);
691 692

		heap_freetuple(copiedTuple);
693

694
		CHECK_FOR_INTERRUPTS();
695
	}
696

697
	index_endscan(scan);
698

699 700 701
	pfree(values);
	pfree(nulls);

702 703 704
	index_close(OldIndex);
	heap_close(OldHeap, NoLock);
	heap_close(NewHeap, NoLock);
705
}
706

707
/*
708 709 710 711
 * Swap the physical files of two given relations.
 *
 * We swap the physical identity (reltablespace and relfilenode) while
 * keeping the same logical identities of the two relations.
712 713 714
 *
 * Also swap any TOAST links, so that the toast data moves along with
 * the main-table data.
715
 */
716
void
717
swap_relation_files(Oid r1, Oid r2)
718
{
719
	Relation	relRelation;
720 721 722 723 724
	HeapTuple	reltup1,
				reltup2;
	Form_pg_class relform1,
				relform2;
	Oid			swaptemp;
725
	CatalogIndexState indstate;
726

727
	/* We need writable copies of both pg_class tuples. */
728
	relRelation = heap_open(RelationRelationId, RowExclusiveLock);
729

730 731 732 733
	reltup1 = SearchSysCacheCopy(RELOID,
								 ObjectIdGetDatum(r1),
								 0, 0, 0);
	if (!HeapTupleIsValid(reltup1))
734
		elog(ERROR, "cache lookup failed for relation %u", r1);
735 736 737 738 739 740
	relform1 = (Form_pg_class) GETSTRUCT(reltup1);

	reltup2 = SearchSysCacheCopy(RELOID,
								 ObjectIdGetDatum(r2),
								 0, 0, 0);
	if (!HeapTupleIsValid(reltup2))
741
		elog(ERROR, "cache lookup failed for relation %u", r2);
742
	relform2 = (Form_pg_class) GETSTRUCT(reltup2);
743

744
	/*
745
	 * Actually swap the fields in the two tuples
746 747 748 749
	 */
	swaptemp = relform1->relfilenode;
	relform1->relfilenode = relform2->relfilenode;
	relform2->relfilenode = swaptemp;
750

751 752 753 754
	swaptemp = relform1->reltablespace;
	relform1->reltablespace = relform2->reltablespace;
	relform2->reltablespace = swaptemp;

755 756 757
	swaptemp = relform1->reltoastrelid;
	relform1->reltoastrelid = relform2->reltoastrelid;
	relform2->reltoastrelid = swaptemp;
758

759
	/* we should not swap reltoastidxid */
760

761 762
	/* swap size statistics too, since new rel has freshly-updated stats */
	{
Bruce Momjian's avatar
Bruce Momjian committed
763 764
		int4		swap_pages;
		float4		swap_tuples;
765 766 767 768 769 770 771 772 773 774

		swap_pages = relform1->relpages;
		relform1->relpages = relform2->relpages;
		relform2->relpages = swap_pages;

		swap_tuples = relform1->reltuples;
		relform1->reltuples = relform2->reltuples;
		relform2->reltuples = swap_tuples;
	}

775 776 777
	/* Update the tuples in pg_class */
	simple_heap_update(relRelation, &reltup1->t_self, reltup1);
	simple_heap_update(relRelation, &reltup2->t_self, reltup2);
Bruce Momjian's avatar
Bruce Momjian committed
778

779
	/* Keep system catalogs current */
780
	indstate = CatalogOpenIndexes(relRelation);
781 782
	CatalogIndexInsert(indstate, reltup1);
	CatalogIndexInsert(indstate, reltup2);
783
	CatalogCloseIndexes(indstate);
784

785
	/*
Bruce Momjian's avatar
Bruce Momjian committed
786 787 788 789
	 * If we have toast tables associated with the relations being
	 * swapped, change their dependency links to re-associate them with
	 * their new owning relations.	Otherwise the wrong one will get
	 * dropped ...
790
	 *
Bruce Momjian's avatar
Bruce Momjian committed
791 792
	 * NOTE: it is possible that only one table has a toast table; this can
	 * happen in CLUSTER if there were dropped columns in the old table,
793
	 * and in ALTER TABLE when adding or changing type of columns.
794
	 *
Bruce Momjian's avatar
Bruce Momjian committed
795 796
	 * NOTE: at present, a TOAST table's only dependency is the one on its
	 * owning table.  If more are ever created, we'd need to use something
797 798 799 800 801 802 803 804 805 806
	 * more selective than deleteDependencyRecordsFor() to get rid of only
	 * the link we want.
	 */
	if (relform1->reltoastrelid || relform2->reltoastrelid)
	{
		ObjectAddress baseobject,
					toastobject;
		long		count;

		/* Delete old dependencies */
807 808
		if (relform1->reltoastrelid)
		{
809
			count = deleteDependencyRecordsFor(RelationRelationId,
810 811 812 813 814 815 816
											   relform1->reltoastrelid);
			if (count != 1)
				elog(ERROR, "expected one dependency record for TOAST table, found %ld",
					 count);
		}
		if (relform2->reltoastrelid)
		{
817
			count = deleteDependencyRecordsFor(RelationRelationId,
818 819 820 821 822
											   relform2->reltoastrelid);
			if (count != 1)
				elog(ERROR, "expected one dependency record for TOAST table, found %ld",
					 count);
		}
823 824

		/* Register new dependencies */
825
		baseobject.classId = RelationRelationId;
826
		baseobject.objectSubId = 0;
827
		toastobject.classId = RelationRelationId;
828 829
		toastobject.objectSubId = 0;

830 831 832 833 834 835
		if (relform1->reltoastrelid)
		{
			baseobject.objectId = r1;
			toastobject.objectId = relform1->reltoastrelid;
			recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
		}
836

837 838 839 840 841 842
		if (relform2->reltoastrelid)
		{
			baseobject.objectId = r2;
			toastobject.objectId = relform2->reltoastrelid;
			recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
		}
843 844 845
	}

	/*
Bruce Momjian's avatar
Bruce Momjian committed
846
	 * Blow away the old relcache entries now.	We need this kluge because
847
	 * relcache.c keeps a link to the smgr relation for the physical file,
Bruce Momjian's avatar
Bruce Momjian committed
848 849 850 851 852 853 854 855 856
	 * and that will be out of date as soon as we do
	 * CommandCounterIncrement. Whichever of the rels is the second to be
	 * cleared during cache invalidation will have a dangling reference to
	 * an already-deleted smgr relation.  Rather than trying to avoid this
	 * by ordering operations just so, it's easiest to not have the
	 * relcache entries there at all. (Fortunately, since one of the
	 * entries is local in our transaction, it's sufficient to clear out
	 * our own relcache this way; the problem cannot arise for other
	 * backends when they see our update on the non-local relation.)
857 858 859 860 861 862 863 864 865
	 */
	RelationForgetRelation(r1);
	RelationForgetRelation(r2);

	/* Clean up. */
	heap_freetuple(reltup1);
	heap_freetuple(reltup2);

	heap_close(relRelation, RowExclusiveLock);
866
}
867

868 869
/*
 * Get a list of tables that the current user owns and
870
 * have indisclustered set.  Return the list in a List * of rvsToCluster
871
 * with the tableOid and the indexOid on which the table is already
872 873
 * clustered.
 */
874 875
static List *
get_tables_to_cluster(MemoryContext cluster_context)
876
{
Bruce Momjian's avatar
Bruce Momjian committed
877 878 879 880 881 882 883 884
	Relation	indRelation;
	HeapScanDesc scan;
	ScanKeyData entry;
	HeapTuple	indexTuple;
	Form_pg_index index;
	MemoryContext old_context;
	RelToCluster *rvtc;
	List	   *rvs = NIL;
885 886

	/*
887
	 * Get all indexes that have indisclustered set and are owned by
Bruce Momjian's avatar
Bruce Momjian committed
888 889 890
	 * appropriate user. System relations or nailed-in relations cannot
	 * ever have indisclustered set, because CLUSTER will refuse to set it
	 * when called with one of them as argument.
891
	 */
892
	indRelation = heap_open(IndexRelationId, AccessShareLock);
893 894 895 896
	ScanKeyInit(&entry,
				Anum_pg_index_indisclustered,
				BTEqualStrategyNumber, F_BOOLEQ,
				BoolGetDatum(true));
897 898 899 900
	scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
	while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		index = (Form_pg_index) GETSTRUCT(indexTuple);
901 902

		if (!pg_class_ownercheck(index->indrelid, GetUserId()))
903 904 905
			continue;

		/*
Bruce Momjian's avatar
Bruce Momjian committed
906 907
		 * We have to build the list in a different memory context so it
		 * will survive the cross-transaction processing
908 909 910
		 */
		old_context = MemoryContextSwitchTo(cluster_context);

911
		rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
912
		rvtc->tableOid = index->indrelid;
913 914
		rvtc->indexOid = index->indexrelid;
		rvs = lcons(rvtc, rvs);
915 916 917 918 919

		MemoryContextSwitchTo(old_context);
	}
	heap_endscan(scan);

920 921
	relation_close(indRelation, AccessShareLock);

922 923
	return rvs;
}