Commit 8325a8d6 authored by Tom Lane's avatar Tom Lane

Make ALTER COLUMN TYPE preserve clustered status for indexes it doesn't

modify.  Also fix a passel of problems with ALTER TABLE CLUSTER ON:
failure to check that the index is safe to cluster on (or even belongs
to the indicated rel, or even exists), and failure to broadcast a relcache
flush event when changing an index's state.
parent eee6f9d5
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.121 2004/05/05 04:48:45 tgl Exp $ * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.122 2004/05/06 16:10:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -31,6 +31,7 @@ ...@@ -31,6 +31,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/relcache.h" #include "utils/relcache.h"
...@@ -48,7 +49,6 @@ typedef struct ...@@ -48,7 +49,6 @@ typedef struct
IndexInfo *indexInfo; IndexInfo *indexInfo;
Oid accessMethodOID; Oid accessMethodOID;
Oid *classOID; Oid *classOID;
bool isclustered;
} IndexAttrs; } IndexAttrs;
/* /*
...@@ -246,8 +246,7 @@ cluster(ClusterStmt *stmt) ...@@ -246,8 +246,7 @@ cluster(ClusterStmt *stmt)
static void static void
cluster_rel(RelToCluster *rvtc, bool recheck) cluster_rel(RelToCluster *rvtc, bool recheck)
{ {
Relation OldHeap, Relation OldHeap;
OldIndex;
/* Check for user-requested abort. */ /* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
...@@ -300,18 +299,41 @@ cluster_rel(RelToCluster *rvtc, bool recheck) ...@@ -300,18 +299,41 @@ cluster_rel(RelToCluster *rvtc, bool recheck)
/* /*
* We grab exclusive access to the target rel and index for the * We grab exclusive access to the target rel and index for the
* duration of the transaction. (This is redundant for the single- * duration of the transaction. (This is redundant for the single-
* transaction case, since cluster() already did it.) * transaction case, since cluster() already did it.) The index
* lock is taken inside check_index_is_clusterable.
*/ */
OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock); OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
OldIndex = index_open(rvtc->indexOid); /* Check index is valid to cluster on */
check_index_is_clusterable(OldHeap, rvtc->indexOid);
/* rebuild_relation does all the dirty work */
rebuild_relation(OldHeap, rvtc->indexOid);
/* NB: rebuild_relation does heap_close() on OldHeap */
}
/*
* Verify that the specified index is a legitimate index to cluster on
*
* Side effect: obtains exclusive lock on the index. The caller should
* already have exclusive lock on the table, so the index lock is likely
* redundant, but it seems best to grab it anyway to ensure the index
* definition can't change under us.
*/
void
check_index_is_clusterable(Relation OldHeap, Oid indexOid)
{
Relation OldIndex;
OldIndex = index_open(indexOid);
LockRelation(OldIndex, AccessExclusiveLock); LockRelation(OldIndex, AccessExclusiveLock);
/* /*
* Check that index is in fact an index on the given relation * Check that index is in fact an index on the given relation
*/ */
if (OldIndex->rd_index == NULL || if (OldIndex->rd_index == NULL ||
OldIndex->rd_index->indrelid != rvtc->tableOid) OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE), (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index for table \"%s\"", errmsg("\"%s\" is not an index for table \"%s\"",
...@@ -386,11 +408,6 @@ cluster_rel(RelToCluster *rvtc, bool recheck) ...@@ -386,11 +408,6 @@ cluster_rel(RelToCluster *rvtc, bool recheck)
/* Drop relcache refcnt on OldIndex, but keep lock */ /* Drop relcache refcnt on OldIndex, but keep lock */
index_close(OldIndex); index_close(OldIndex);
/* rebuild_relation does all the dirty work */
rebuild_relation(OldHeap, rvtc->indexOid);
/* NB: rebuild_relation does heap_close() on OldHeap */
} }
/* /*
...@@ -410,13 +427,14 @@ void ...@@ -410,13 +427,14 @@ void
rebuild_relation(Relation OldHeap, Oid indexOid) rebuild_relation(Relation OldHeap, Oid indexOid)
{ {
Oid tableOid = RelationGetRelid(OldHeap); Oid tableOid = RelationGetRelid(OldHeap);
Oid oldClusterIndex;
List *indexes; List *indexes;
Oid OIDNewHeap; Oid OIDNewHeap;
char NewHeapName[NAMEDATALEN]; char NewHeapName[NAMEDATALEN];
ObjectAddress object; ObjectAddress object;
/* Save the information about all indexes on the relation. */ /* Save the information about all indexes on the relation. */
indexes = get_indexattr_list(OldHeap, indexOid); indexes = get_indexattr_list(OldHeap, &oldClusterIndex);
/* Close relcache entry, but keep lock until transaction commit */ /* Close relcache entry, but keep lock until transaction commit */
heap_close(OldHeap, NoLock); heap_close(OldHeap, NoLock);
...@@ -469,7 +487,8 @@ rebuild_relation(Relation OldHeap, Oid indexOid) ...@@ -469,7 +487,8 @@ rebuild_relation(Relation OldHeap, Oid indexOid)
* Recreate each index on the relation. We do not need * Recreate each index on the relation. We do not need
* CommandCounterIncrement() because rebuild_indexes does it. * CommandCounterIncrement() because rebuild_indexes does it.
*/ */
rebuild_indexes(tableOid, indexes); rebuild_indexes(tableOid, indexes,
(OidIsValid(indexOid) ? indexOid : oldClusterIndex));
} }
/* /*
...@@ -572,14 +591,18 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) ...@@ -572,14 +591,18 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
/* /*
* Get the necessary info about the indexes of the relation and * Get the necessary info about the indexes of the relation and
* return a list of IndexAttrs structures. * return a list of IndexAttrs structures. Also, *OldClusterIndex
* is set to the OID of the existing clustered index, or InvalidOid
* if there is none.
*/ */
List * List *
get_indexattr_list(Relation OldHeap, Oid OldIndex) get_indexattr_list(Relation OldHeap, Oid *OldClusterIndex)
{ {
List *indexes = NIL; List *indexes = NIL;
List *indlist; List *indlist;
*OldClusterIndex = InvalidOid;
/* Ask the relcache to produce a list of the indexes of the old rel */ /* Ask the relcache to produce a list of the indexes of the old rel */
foreach(indlist, RelationGetIndexList(OldHeap)) foreach(indlist, RelationGetIndexList(OldHeap))
{ {
...@@ -598,16 +621,12 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) ...@@ -598,16 +621,12 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
memcpy(attrs->classOID, oldIndex->rd_index->indclass, memcpy(attrs->classOID, oldIndex->rd_index->indclass,
sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
/* We adjust the isclustered attribute to correct new state */ if (oldIndex->rd_index->indisclustered)
attrs->isclustered = (indexOID == OldIndex); *OldClusterIndex = indexOID;
index_close(oldIndex); index_close(oldIndex);
/* indexes = lappend(indexes, attrs);
* Cons the gathered data into the list. We do not care about
* ordering, and this is more efficient than append.
*/
indexes = lcons(attrs, indexes);
} }
return indexes; return indexes;
...@@ -616,17 +635,22 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) ...@@ -616,17 +635,22 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
/* /*
* Create new indexes and swap the filenodes with old indexes. Then drop * Create new indexes and swap the filenodes with old indexes. Then drop
* the new index (carrying the old index filenode along). * the new index (carrying the old index filenode along).
*
* OIDClusterIndex is the OID of the index to be marked as clustered, or
* InvalidOid if none should be marked clustered.
*/ */
void void
rebuild_indexes(Oid OIDOldHeap, List *indexes) rebuild_indexes(Oid OIDOldHeap, List *indexes, Oid OIDClusterIndex)
{ {
List *elem; List *elem;
foreach(elem, indexes) foreach(elem, indexes)
{ {
IndexAttrs *attrs = (IndexAttrs *) lfirst(elem); IndexAttrs *attrs = (IndexAttrs *) lfirst(elem);
Oid oldIndexOID = attrs->indexOID;
Oid newIndexOID; Oid newIndexOID;
char newIndexName[NAMEDATALEN]; char newIndexName[NAMEDATALEN];
bool isclustered;
ObjectAddress object; ObjectAddress object;
Form_pg_index index; Form_pg_index index;
HeapTuple tuple; HeapTuple tuple;
...@@ -634,7 +658,7 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes) ...@@ -634,7 +658,7 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes)
/* Create the new index under a temporary name */ /* Create the new index under a temporary name */
snprintf(newIndexName, sizeof(newIndexName), snprintf(newIndexName, sizeof(newIndexName),
"pg_temp_%u", attrs->indexOID); "pg_temp_%u", oldIndexOID);
/* /*
* The new index will have primary and constraint status set to * The new index will have primary and constraint status set to
...@@ -654,26 +678,30 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes) ...@@ -654,26 +678,30 @@ rebuild_indexes(Oid OIDOldHeap, List *indexes)
CommandCounterIncrement(); CommandCounterIncrement();
/* Swap the filenodes. */ /* Swap the filenodes. */
swap_relfilenodes(attrs->indexOID, newIndexOID); swap_relfilenodes(oldIndexOID, newIndexOID);
CommandCounterIncrement(); CommandCounterIncrement();
/* /*
* Make sure that indisclustered is correct: it should be set only * Make sure that indisclustered is correct: it should be set only
* for the index we just clustered on. * for the index specified by the caller.
*/ */
isclustered = (oldIndexOID == OIDClusterIndex);
pg_index = heap_openr(IndexRelationName, RowExclusiveLock); pg_index = heap_openr(IndexRelationName, RowExclusiveLock);
tuple = SearchSysCacheCopy(INDEXRELID, tuple = SearchSysCacheCopy(INDEXRELID,
ObjectIdGetDatum(attrs->indexOID), ObjectIdGetDatum(oldIndexOID),
0, 0, 0); 0, 0, 0);
if (!HeapTupleIsValid(tuple)) if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for index %u", attrs->indexOID); elog(ERROR, "cache lookup failed for index %u", oldIndexOID);
index = (Form_pg_index) GETSTRUCT(tuple); index = (Form_pg_index) GETSTRUCT(tuple);
if (index->indisclustered != attrs->isclustered) if (index->indisclustered != isclustered)
{ {
index->indisclustered = attrs->isclustered; index->indisclustered = isclustered;
simple_heap_update(pg_index, &tuple->t_self, tuple); simple_heap_update(pg_index, &tuple->t_self, tuple);
CatalogUpdateIndexes(pg_index, tuple); CatalogUpdateIndexes(pg_index, tuple);
/* Ensure we see the update in the index's relcache entry */
CacheInvalidateRelcacheByRelid(oldIndexOID);
} }
heap_freetuple(tuple); heap_freetuple(tuple);
heap_close(pg_index, RowExclusiveLock); heap_close(pg_index, RowExclusiveLock);
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.103 2004/05/05 04:48:45 tgl Exp $ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.104 2004/05/06 16:10:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -2099,12 +2099,13 @@ ATRewriteTables(List **wqueue) ...@@ -2099,12 +2099,13 @@ ATRewriteTables(List **wqueue)
Oid OIDNewHeap; Oid OIDNewHeap;
char NewHeapName[NAMEDATALEN]; char NewHeapName[NAMEDATALEN];
List *indexes; List *indexes;
Oid oldClusterIndex;
Relation OldHeap; Relation OldHeap;
ObjectAddress object; ObjectAddress object;
/* Save the information about all indexes on the relation. */ /* Save the information about all indexes on the relation. */
OldHeap = heap_open(tab->relid, NoLock); OldHeap = heap_open(tab->relid, NoLock);
indexes = get_indexattr_list(OldHeap, InvalidOid); indexes = get_indexattr_list(OldHeap, &oldClusterIndex);
heap_close(OldHeap, NoLock); heap_close(OldHeap, NoLock);
/* /*
...@@ -2148,7 +2149,7 @@ ATRewriteTables(List **wqueue) ...@@ -2148,7 +2149,7 @@ ATRewriteTables(List **wqueue)
* Rebuild each index on the relation. We do not need * Rebuild each index on the relation. We do not need
* CommandCounterIncrement() because rebuild_indexes does it. * CommandCounterIncrement() because rebuild_indexes does it.
*/ */
rebuild_indexes(tab->relid, indexes); rebuild_indexes(tab->relid, indexes, oldClusterIndex);
} }
else else
{ {
...@@ -5004,6 +5005,9 @@ ATExecClusterOn(Relation rel, const char *indexName) ...@@ -5004,6 +5005,9 @@ ATExecClusterOn(Relation rel, const char *indexName)
errmsg("index \"%s\" for table \"%s\" does not exist", errmsg("index \"%s\" for table \"%s\" does not exist",
indexName, RelationGetRelationName(rel)))); indexName, RelationGetRelationName(rel))));
/* Check index is valid to cluster on */
check_index_is_clusterable(rel, indexOid);
indexTuple = SearchSysCache(INDEXRELID, indexTuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(indexOid), ObjectIdGetDatum(indexOid),
0, 0, 0); 0, 0, 0);
...@@ -5050,12 +5054,16 @@ ATExecClusterOn(Relation rel, const char *indexName) ...@@ -5050,12 +5054,16 @@ ATExecClusterOn(Relation rel, const char *indexName)
idxForm->indisclustered = false; idxForm->indisclustered = false;
simple_heap_update(pg_index, &idxtuple->t_self, idxtuple); simple_heap_update(pg_index, &idxtuple->t_self, idxtuple);
CatalogUpdateIndexes(pg_index, idxtuple); CatalogUpdateIndexes(pg_index, idxtuple);
/* Ensure we see the update in the index's relcache entry */
CacheInvalidateRelcacheByRelid(indexOid);
} }
else if (idxForm->indexrelid == indexForm->indexrelid) else if (idxForm->indexrelid == indexForm->indexrelid)
{ {
idxForm->indisclustered = true; idxForm->indisclustered = true;
simple_heap_update(pg_index, &idxtuple->t_self, idxtuple); simple_heap_update(pg_index, &idxtuple->t_self, idxtuple);
CatalogUpdateIndexes(pg_index, idxtuple); CatalogUpdateIndexes(pg_index, idxtuple);
/* Ensure we see the update in the index's relcache entry */
CacheInvalidateRelcacheByRelid(indexOid);
} }
heap_freetuple(idxtuple); heap_freetuple(idxtuple);
} }
......
...@@ -74,7 +74,7 @@ ...@@ -74,7 +74,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.60 2004/02/10 01:55:26 tgl Exp $ * $PostgreSQL: pgsql/src/backend/utils/cache/inval.c,v 1.61 2004/05/06 16:10:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -88,6 +88,7 @@ ...@@ -88,6 +88,7 @@
#include "utils/inval.h" #include "utils/inval.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/relcache.h" #include "utils/relcache.h"
#include "utils/syscache.h"
/* /*
...@@ -765,6 +766,26 @@ CacheInvalidateRelcacheByTuple(HeapTuple classTuple) ...@@ -765,6 +766,26 @@ CacheInvalidateRelcacheByTuple(HeapTuple classTuple)
RegisterRelcacheInvalidation(databaseId, relationId, rnode); RegisterRelcacheInvalidation(databaseId, relationId, rnode);
} }
/*
* CacheInvalidateRelcacheByRelid
* As above, but relation is identified by passing its OID.
* This is the least efficient of the three options; use one of
* the above routines if you have a Relation or pg_class tuple.
*/
void
CacheInvalidateRelcacheByRelid(Oid relid)
{
HeapTuple tup;
tup = SearchSysCache(RELOID,
ObjectIdGetDatum(relid),
0, 0, 0);
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for relation %u", relid);
CacheInvalidateRelcacheByTuple(tup);
ReleaseSysCache(tup);
}
/* /*
* CacheRegisterSyscacheCallback * CacheRegisterSyscacheCallback
* Register the specified function to be called for all future * Register the specified function to be called for all future
......
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994-5, Regents of the University of California * Portions Copyright (c) 1994-5, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/commands/cluster.h,v 1.21 2004/05/05 04:48:47 tgl Exp $ * $PostgreSQL: pgsql/src/include/commands/cluster.h,v 1.22 2004/05/06 16:10:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -19,10 +19,12 @@ ...@@ -19,10 +19,12 @@
extern void cluster(ClusterStmt *stmt); extern void cluster(ClusterStmt *stmt);
extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid);
extern void rebuild_relation(Relation OldHeap, Oid indexOid); extern void rebuild_relation(Relation OldHeap, Oid indexOid);
extern Oid make_new_heap(Oid OIDOldHeap, const char *NewName); extern Oid make_new_heap(Oid OIDOldHeap, const char *NewName);
extern List *get_indexattr_list(Relation OldHeap, Oid OldIndex); extern List *get_indexattr_list(Relation OldHeap, Oid *OldClusterIndex);
extern void rebuild_indexes(Oid OIDOldHeap, List *indexes); extern void rebuild_indexes(Oid OIDOldHeap, List *indexes,
Oid OIDClusterIndex);
extern void swap_relfilenodes(Oid r1, Oid r2); extern void swap_relfilenodes(Oid r1, Oid r2);
#endif /* CLUSTER_H */ #endif /* CLUSTER_H */
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/utils/inval.h,v 1.30 2004/02/10 01:55:26 tgl Exp $ * $PostgreSQL: pgsql/src/include/utils/inval.h,v 1.31 2004/05/06 16:10:57 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -32,6 +32,8 @@ extern void CacheInvalidateRelcache(Relation relation); ...@@ -32,6 +32,8 @@ extern void CacheInvalidateRelcache(Relation relation);
extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple); extern void CacheInvalidateRelcacheByTuple(HeapTuple classTuple);
extern void CacheInvalidateRelcacheByRelid(Oid relid);
extern void CacheRegisterSyscacheCallback(int cacheid, extern void CacheRegisterSyscacheCallback(int cacheid,
CacheCallbackFunction func, CacheCallbackFunction func,
Datum arg); Datum arg);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment