Commit a03c0d93 authored by Tom Lane's avatar Tom Lane

Code review for CLUSTER ALL patch. Fix bogus locking, incorrect transaction

stop/start nesting, other infelicities.
parent 2e1f2c31
<!-- <!--
$Header: /cvsroot/pgsql/doc/src/sgml/ref/cluster.sgml,v 1.22 2002/11/18 17:12:06 momjian Exp $ $Header: /cvsroot/pgsql/doc/src/sgml/ref/cluster.sgml,v 1.23 2002/12/30 18:42:12 tgl Exp $
PostgreSQL documentation PostgreSQL documentation
--> -->
...@@ -108,14 +108,16 @@ CLUSTER ...@@ -108,14 +108,16 @@ CLUSTER
<para> <para>
When a table is clustered, <productname>PostgreSQL</productname> When a table is clustered, <productname>PostgreSQL</productname>
remembers on which index it was clustered. In calls to remembers on which index it was clustered. The form
<command>CLUSTER <replaceable class="parameter">tablename</replaceable></command>, <command>CLUSTER <replaceable class="parameter">tablename</replaceable></command>,
the table is clustered on the same index that it was clustered before. re-clusters the table on the same index that it was clustered before.
</para> </para>
<para> <para>
A simple <command>CLUSTER</command> clusters all the tables in the database <command>CLUSTER</command> without any parameter re-clusters all the tables
that the calling user owns and uses the saved cluster information. This in the
current database that the calling user owns, or all tables if called
by a superuser. (Never-clustered tables are not touched.) This
form of <command>CLUSTER</command> cannot be called from inside a form of <command>CLUSTER</command> cannot be called from inside a
transaction or function. transaction or function.
</para> </para>
...@@ -157,15 +159,15 @@ CLUSTER ...@@ -157,15 +159,15 @@ CLUSTER
</para> </para>
<para> <para>
<command>CLUSTER</command> preserves GRANT, inheritance, index, foreign <command>CLUSTER</command> preserves GRANT, inheritance, index, foreign
key, and other ancillary information about the table. key, and other ancillary information about the table.
</para> </para>
<para> <para>
Because <command>CLUSTER</command> remembers the clustering information, Because <command>CLUSTER</command> remembers the clustering information,
one can cluster the tables one wants clustered manually the first time, and one can cluster the tables one wants clustered manually the first time, and
setup a timed event similar to <command>VACUUM</command> so that the tables setup a timed event similar to <command>VACUUM</command> so that the tables
are periodically and automatically clustered. are periodically re-clustered.
</para> </para>
<para> <para>
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.102 2002/12/06 05:00:10 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.103 2002/12/30 18:42:13 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
#include "utils/syscache.h" #include "utils/syscache.h"
#include "utils/relcache.h" #include "utils/relcache.h"
/* /*
* We need one of these structs for each index in the relation to be * We need one of these structs for each index in the relation to be
* clustered. It's basically the data needed by index_create() so * clustered. It's basically the data needed by index_create() so
...@@ -51,7 +52,8 @@ typedef struct ...@@ -51,7 +52,8 @@ typedef struct
bool isclustered; bool isclustered;
} IndexAttrs; } IndexAttrs;
/* This struct is used to pass around the information on tables to be /*
* This struct is used to pass around the information on tables to be
* clustered. We need this so we can make a list of them when invoked without * clustered. We need this so we can make a list of them when invoked without
* a specific table/index pair. * a specific table/index pair.
*/ */
...@@ -59,21 +61,174 @@ typedef struct ...@@ -59,21 +61,174 @@ typedef struct
{ {
Oid tableOid; Oid tableOid;
Oid indexOid; Oid indexOid;
bool isPrevious; } RelToCluster;
} relToCluster;
static void cluster_rel(RelToCluster *rv, bool recheck);
static Oid make_new_heap(Oid OIDOldHeap, const char *NewName); static Oid make_new_heap(Oid OIDOldHeap, const char *NewName);
static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
static void recreate_indexattr(Oid OIDOldHeap, List *indexes); static List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
static void rebuild_indexes(Oid OIDOldHeap, List *indexes);
static void swap_relfilenodes(Oid r1, Oid r2); static void swap_relfilenodes(Oid r1, Oid r2);
static void cluster_rel(relToCluster *rv); static bool check_cluster_permitted(Oid relOid);
static bool check_cluster_ownership(Oid relOid); static List *get_tables_to_cluster(MemoryContext cluster_context);
static List *get_tables_to_cluster(AclId owner);
/*---------------------------------------------------------------------------
* This cluster code allows for clustering multiple tables at once. Because
* of this, we cannot just run everything on a single transaction, or we
* would be forced to acquire exclusive locks on all the tables being
* clustered, simultaneously --- very likely leading to deadlock.
*
* To solve this we follow a similar strategy to VACUUM code,
* clustering each relation in a separate transaction. For this to work,
* we need to:
* - provide a separate memory context so that we can pass information in
* a way that survives across transactions
* - start a new transaction every time a new relation is clustered
* - check for validity of the information on to-be-clustered relations,
* as someone might have deleted a relation behind our back, or
* clustered one on a different index
* - end the transaction
*
* The single-relation case does not have any such overhead.
*
* We also allow a relation being specified without index. In that case,
* the indisclustered bit will be looked up, and an ERROR will be thrown
* if there is no index with the bit set.
*---------------------------------------------------------------------------
*/
void
cluster(ClusterStmt *stmt)
{
if (stmt->relation != NULL)
{
/* This is the single-relation case. */
Oid tableOid,
indexOid = InvalidOid;
Relation rel;
RelToCluster rvtc;
/* Find and lock the table */
tableOid = RangeVarGetRelid(stmt->relation, false);
rel = heap_open(tableOid, AccessExclusiveLock);
/* Check permissions */
if (!check_cluster_permitted(tableOid))
elog(ERROR, "CLUSTER: You do not own relation %s",
stmt->relation->relname);
if (stmt->indexname == NULL)
{
List *index;
/* We need to find the index that has indisclustered set. */
foreach (index, RelationGetIndexList(rel))
{
HeapTuple idxtuple;
Form_pg_index indexForm;
indexOid = lfirsti(index);
idxtuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(indexOid),
0, 0, 0);
if (!HeapTupleIsValid(idxtuple))
elog(ERROR, "Cache lookup failed for index %u",
indexOid);
indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
if (indexForm->indisclustered)
{
ReleaseSysCache(idxtuple);
break;
}
ReleaseSysCache(idxtuple);
indexOid = InvalidOid;
}
if (!OidIsValid(indexOid))
elog(ERROR, "CLUSTER: No previously clustered index found on table \"%s\"",
stmt->relation->relname);
}
else
{
/* The index is expected to be in the same namespace as the relation. */
indexOid = get_relname_relid(stmt->indexname,
rel->rd_rel->relnamespace);
if (!OidIsValid(indexOid))
elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
stmt->indexname, stmt->relation->relname);
}
rvtc.tableOid = tableOid;
rvtc.indexOid = indexOid;
/* close relation, keep lock till commit */
heap_close(rel, NoLock);
/* Do the job */
cluster_rel(&rvtc, false);
}
else
{
/*
* This is the "multi relation" case. We need to cluster all tables
* that have some index with indisclustered set.
*/
MemoryContext cluster_context;
List *rv,
*rvs;
/*
* We cannot run this form of CLUSTER inside a user transaction block;
* we'd be holding locks way too long.
*/
PreventTransactionChain((void *) stmt, "CLUSTER");
/*
* Create special memory context for cross-transaction storage.
*
* Since it is a child of QueryContext, it will go away even in case
* of error.
*/
cluster_context = AllocSetContextCreate(QueryContext,
"Cluster",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/*
* Build the list of relations to cluster. Note that this lives in
* cluster_context.
*/
rvs = get_tables_to_cluster(cluster_context);
/* Commit to get out of starting transaction */
CommitTransactionCommand(true);
/* Ok, now that we've got them all, cluster them one by one */
foreach (rv, rvs)
{
RelToCluster *rvtc = (RelToCluster *) lfirst(rv);
/* Start a new transaction for each relation. */
StartTransactionCommand(true);
SetQuerySnapshot(); /* might be needed for functional index */
cluster_rel(rvtc, true);
CommitTransactionCommand(true);
}
/* Start a new transaction for the cleanup work. */
StartTransactionCommand(true);
static MemoryContext cluster_context = NULL; /* Clean up working storage */
MemoryContextDelete(cluster_context);
}
}
/* /*
* cluster * cluster_rel
* *
* This clusters the table by creating a new, clustered table and * This clusters the table by creating a new, clustered table and
* swapping the relfilenodes of the new table and the old table, so * swapping the relfilenodes of the new table and the old table, so
...@@ -85,45 +240,52 @@ static MemoryContext cluster_context = NULL; ...@@ -85,45 +240,52 @@ static MemoryContext cluster_context = NULL;
* same way we do for the relation. Since we are effectively bulk-loading * same way we do for the relation. Since we are effectively bulk-loading
* the new table, it's better to create the indexes afterwards than to fill * the new table, it's better to create the indexes afterwards than to fill
* them incrementally while we load the table. * them incrementally while we load the table.
*
* Since we may open a new transaction for each relation, we have to
* check that the relation still is what we think it is.
*/ */
void static void
cluster_rel(relToCluster *rvtc) cluster_rel(RelToCluster *rvtc, bool recheck)
{ {
Relation OldHeap, Relation OldHeap,
OldIndex; OldIndex;
List *indexes;
/* Check for user-requested abort. */ /* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
/* Check if the relation and index still exist before opening them /*
* Since we may open a new transaction for each relation, we have to
* check that the relation still is what we think it is.
*
* If this is a single-transaction CLUSTER, we can skip these tests.
* We *must* skip the one on indisclustered since it would reject an
* attempt to cluster a not-previously-clustered index.
*/ */
if (!SearchSysCacheExists(RELOID, if (recheck)
ObjectIdGetDatum(rvtc->tableOid), {
0, 0, 0) || HeapTuple tuple;
Form_pg_index indexForm;
/*
* Check if the relation and index still exist before opening them
*/
if (!SearchSysCacheExists(RELOID,
ObjectIdGetDatum(rvtc->tableOid),
0, 0, 0) ||
!SearchSysCacheExists(RELOID, !SearchSysCacheExists(RELOID,
ObjectIdGetDatum(rvtc->indexOid), ObjectIdGetDatum(rvtc->indexOid),
0, 0, 0)) 0, 0, 0))
return; return;
/* Check that the user still owns the relation */
if (!check_cluster_ownership(rvtc->tableOid))
return;
/* Check that the index is still the one with indisclustered set. /* Check that the user still owns the relation */
* If this is a standalone cluster, skip this test. if (!check_cluster_permitted(rvtc->tableOid))
*/ return;
if (rvtc->isPrevious)
{
HeapTuple tuple;
Form_pg_index indexForm;
/*
* Check that the index is still the one with indisclustered set.
*/
tuple = SearchSysCache(INDEXRELID, tuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(rvtc->indexOid), ObjectIdGetDatum(rvtc->indexOid),
0, 0, 0); 0, 0, 0);
if (!HeapTupleIsValid(tuple))
return; /* could have gone away... */
indexForm = (Form_pg_index) GETSTRUCT(tuple); indexForm = (Form_pg_index) GETSTRUCT(tuple);
if (!indexForm->indisclustered) if (!indexForm->indisclustered)
{ {
...@@ -135,7 +297,8 @@ cluster_rel(relToCluster *rvtc) ...@@ -135,7 +297,8 @@ cluster_rel(relToCluster *rvtc)
/* /*
* We grab exclusive access to the target rel and index for the * We grab exclusive access to the target rel and index for the
* duration of the transaction. * duration of the transaction. (This is redundant for the single-
* transaction case, since cluster() already did it.)
*/ */
OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock); OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
...@@ -162,29 +325,43 @@ cluster_rel(relToCluster *rvtc) ...@@ -162,29 +325,43 @@ cluster_rel(relToCluster *rvtc)
elog(ERROR, "CLUSTER: cannot cluster system relation \"%s\"", elog(ERROR, "CLUSTER: cannot cluster system relation \"%s\"",
RelationGetRelationName(OldHeap)); RelationGetRelationName(OldHeap));
/* Save the information of all indexes on the relation. */ /* Drop relcache refcnt on OldIndex, but keep lock */
indexes = get_indexattr_list(OldHeap, rvtc->indexOid);
/* Drop relcache refcnts, but do NOT give up the locks */
index_close(OldIndex); index_close(OldIndex);
heap_close(OldHeap, NoLock);
/* rebuild_rel does all the dirty work */ /* rebuild_relation does all the dirty work */
rebuild_rel(rvtc->tableOid, rvtc->indexOid, indexes, true); rebuild_relation(OldHeap, rvtc->indexOid);
/* NB: rebuild_relation does heap_close() on OldHeap */
} }
/*
* rebuild_relation: rebuild an existing relation
*
* This is shared code between CLUSTER and TRUNCATE. In the TRUNCATE
* case, the new relation is built and left empty. In the CLUSTER case,
* it is filled with data read from the old relation in the order specified
* by the index.
*
* OldHeap: table to rebuild --- must be opened and exclusive-locked!
* indexOid: index to cluster by, or InvalidOid in TRUNCATE case
*
* NB: this routine closes OldHeap at the right time; caller should not.
*/
void void
rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy) rebuild_relation(Relation OldHeap, Oid indexOid)
{ {
Oid tableOid = RelationGetRelid(OldHeap);
List *indexes;
Oid OIDNewHeap; Oid OIDNewHeap;
char NewHeapName[NAMEDATALEN]; char NewHeapName[NAMEDATALEN];
ObjectAddress object; ObjectAddress object;
/* /* Save the information about all indexes on the relation. */
* If dataCopy is true, we assume that we will be basing the indexes = get_indexattr_list(OldHeap, indexOid);
* copy off an index for cluster operations.
*/ /* Close relcache entry, but keep lock until transaction commit */
Assert(!dataCopy || OidIsValid(indexOid)); heap_close(OldHeap, NoLock);
/* /*
* Create the new heap, using a temporary name in the same namespace * Create the new heap, using a temporary name in the same namespace
* as the existing table. NOTE: there is some risk of collision with * as the existing table. NOTE: there is some risk of collision with
...@@ -204,7 +381,7 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy) ...@@ -204,7 +381,7 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy)
/* /*
* Copy the heap data into the new table in the desired order. * Copy the heap data into the new table in the desired order.
*/ */
if (dataCopy) if (OidIsValid(indexOid))
copy_heap_data(OIDNewHeap, tableOid, indexOid); copy_heap_data(OIDNewHeap, tableOid, indexOid);
/* To make the new heap's data visible (probably not needed?). */ /* To make the new heap's data visible (probably not needed?). */
...@@ -230,9 +407,9 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy) ...@@ -230,9 +407,9 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy)
/* /*
* Recreate each index on the relation. We do not need * Recreate each index on the relation. We do not need
* CommandCounterIncrement() because recreate_indexattr does it. * CommandCounterIncrement() because rebuild_indexes does it.
*/ */
recreate_indexattr(tableOid, indexes); rebuild_indexes(tableOid, indexes);
} }
/* /*
...@@ -335,7 +512,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) ...@@ -335,7 +512,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
* Get the necessary info about the indexes of the relation and * Get the necessary info about the indexes of the relation and
* return a list of IndexAttrs structures. * return a list of IndexAttrs structures.
*/ */
List * static List *
get_indexattr_list(Relation OldHeap, Oid OldIndex) get_indexattr_list(Relation OldHeap, Oid OldIndex)
{ {
List *indexes = NIL; List *indexes = NIL;
...@@ -366,7 +543,8 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) ...@@ -366,7 +543,8 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
memcpy(attrs->classOID, indexForm->indclass, memcpy(attrs->classOID, indexForm->indclass,
sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
attrs->isclustered = (OldIndex == indexOID); /* We adjust the isclustered attribute to correct new state */
attrs->isclustered = (indexOID == OldIndex);
/* Name and access method of each index come from pg_class */ /* Name and access method of each index come from pg_class */
classTuple = SearchSysCache(RELOID, classTuple = SearchSysCache(RELOID,
...@@ -397,7 +575,7 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex) ...@@ -397,7 +575,7 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
* the new index (carrying the old index filenode along). * the new index (carrying the old index filenode along).
*/ */
static void static void
recreate_indexattr(Oid OIDOldHeap, List *indexes) rebuild_indexes(Oid OIDOldHeap, List *indexes)
{ {
List *elem; List *elem;
...@@ -629,235 +807,69 @@ swap_relfilenodes(Oid r1, Oid r2) ...@@ -629,235 +807,69 @@ swap_relfilenodes(Oid r1, Oid r2)
heap_close(relRelation, RowExclusiveLock); heap_close(relRelation, RowExclusiveLock);
} }
/*--------------------------------------------------------------------------- /*
* This cluster code allows for clustering multiple tables at once. Because * Checks if the user is allowed to cluster (ie, owns) the relation.
* of this, we cannot just run everything on a single transaction, or we * Superusers are allowed to cluster any table.
* would be forced to acquire exclusive locks on all the tables being
* clustered. To solve this we follow a similar strategy to VACUUM code,
* clustering each relation in a separate transaction. For this to work,
* we need to:
* - provide a separate memory context so that we can pass information in
* a way that trascends transactions
* - start a new transaction every time a new relation is clustered
* - check for validity of the information on to-be-clustered relations,
* as someone might have deleted a relation behind our back, or
* clustered one on a different index
* - end the transaction
*
* The single relation code does not have any overhead.
*
* We also allow a relation being specified without index. In that case,
* the indisclustered bit will be looked up, and an ERROR will be thrown
* if there is no index with the bit set.
*---------------------------------------------------------------------------
*/
void
cluster(ClusterStmt *stmt)
{
/* This is the single relation case. */
if (stmt->relation != NULL)
{
Oid indexOid = InvalidOid,
tableOid;
relToCluster rvtc;
HeapTuple tuple;
Form_pg_class classForm;
tableOid = RangeVarGetRelid(stmt->relation, false);
if (!check_cluster_ownership(tableOid))
elog(ERROR, "CLUSTER: You do not own relation %s",
stmt->relation->relname);
tuple = SearchSysCache(RELOID,
ObjectIdGetDatum(tableOid),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "Cache lookup failed for relation %u", tableOid);
classForm = (Form_pg_class) GETSTRUCT(tuple);
if (stmt->indexname == NULL)
{
List *index;
Relation rel = RelationIdGetRelation(tableOid);
HeapTuple ituple = NULL,
idxtuple = NULL;
/* We need to fetch the index that has indisclustered set. */
foreach (index, RelationGetIndexList(rel))
{
Form_pg_index indexForm;
indexOid = lfirsti(index);
ituple = SearchSysCache(RELOID,
ObjectIdGetDatum(indexOid),
0, 0, 0);
if (!HeapTupleIsValid(ituple))
elog(ERROR, "Cache lookup failed for relation %u", indexOid);
idxtuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(HeapTupleGetOid(ituple)),
0, 0, 0);
if (!HeapTupleIsValid(idxtuple))
elog(ERROR, "Cache lookup failed for index %u", HeapTupleGetOid(ituple));
indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
if (indexForm->indisclustered)
break;
indexOid = InvalidOid;
}
if (indexOid == InvalidOid)
elog(ERROR, "CLUSTER: No previously clustered index found on table %s",
stmt->relation->relname);
RelationClose(rel);
ReleaseSysCache(ituple);
ReleaseSysCache(idxtuple);
}
else
{
/* The index is expected to be in the same namespace as the relation. */
indexOid = get_relname_relid(stmt->indexname, classForm->relnamespace);
}
ReleaseSysCache(tuple);
/* XXX Maybe the namespace should be reported as well */
if (!OidIsValid(indexOid))
elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
stmt->indexname, stmt->relation->relname);
rvtc.tableOid = tableOid;
rvtc.indexOid = indexOid;
rvtc.isPrevious = false;
/* Do the job */
cluster_rel(&rvtc);
}
else
{
/*
* This is the "no relation" case. We need to cluster all tables
* that have some index with indisclustered set.
*/
relToCluster *rvtc;
List *rv,
*rvs;
/*
* We cannot run CLUSTER inside a user transaction block; if we were inside
* a transaction, then our commit- and start-transaction-command calls
* would not have the intended effect!
*/
if (IsTransactionBlock())
elog(ERROR, "CLUSTER cannot run inside a BEGIN/END block");
/* Running CLUSTER from a function would free the function context */
if (!MemoryContextContains(QueryContext, stmt))
elog(ERROR, "CLUSTER cannot be called from a function");
/*
* Create special memory context for cross-transaction storage.
*
* Since it is a child of QueryContext, it will go away even in case
* of error.
*/
cluster_context = AllocSetContextCreate(QueryContext,
"Cluster",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
/*
* Build the list of relations to cluster. Note that this lives in
* cluster_context.
*/
rvs = get_tables_to_cluster(GetUserId());
/* Ok, now that we've got them all, cluster them one by one */
foreach (rv, rvs)
{
rvtc = (relToCluster *)lfirst(rv);
/* Start a new transaction for this relation. */
StartTransactionCommand(true);
cluster_rel(rvtc);
CommitTransactionCommand(true);
}
}
/* Start a new transaction for the cleanup work. */
StartTransactionCommand(true);
/* Clean up working storage */
if (stmt->relation == NULL)
{
MemoryContextDelete(cluster_context);
cluster_context = NULL;
}
}
/* Checks if the user owns the relation. Superusers
* are allowed to cluster any table.
*/ */
bool static bool
check_cluster_ownership(Oid relOid) check_cluster_permitted(Oid relOid)
{ {
/* Superusers bypass this check */ /* Superusers bypass this check */
return pg_class_ownercheck(relOid, GetUserId()); return pg_class_ownercheck(relOid, GetUserId());
} }
/* Get a list of tables that the current user owns and /*
* Get a list of tables that the current user owns and
* have indisclustered set. Return the list in a List * of rvsToCluster * have indisclustered set. Return the list in a List * of rvsToCluster
* with the tableOid and the indexOid on which the table is already * with the tableOid and the indexOid on which the table is already
* clustered. * clustered.
*/ */
List * static List *
get_tables_to_cluster(AclId owner) get_tables_to_cluster(MemoryContext cluster_context)
{ {
Relation indRelation; Relation indRelation;
HeapScanDesc scan; HeapScanDesc scan;
ScanKeyData entry; ScanKeyData entry;
HeapTuple indexTuple; HeapTuple indexTuple;
Form_pg_index index; Form_pg_index index;
relToCluster *rvtc; MemoryContext old_context;
RelToCluster *rvtc;
List *rvs = NIL; List *rvs = NIL;
/* /*
* Get all indexes that have indisclustered set. System * Get all indexes that have indisclustered set and are owned by
* relations or nailed-in relations cannot ever have * appropriate user. System relations or nailed-in relations cannot ever
* indisclustered set, because CLUSTER will refuse to * have indisclustered set, because CLUSTER will refuse to set it when
* set it when called with one of them as argument. * called with one of them as argument.
*/ */
indRelation = relation_openr(IndexRelationName, RowExclusiveLock); indRelation = relation_openr(IndexRelationName, AccessShareLock);
ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered, ScanKeyEntryInitialize(&entry, 0,
F_BOOLEQ, true); Anum_pg_index_indisclustered,
F_BOOLEQ,
BoolGetDatum(true));
scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry); scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{ {
MemoryContext old_context = NULL;
index = (Form_pg_index) GETSTRUCT(indexTuple); index = (Form_pg_index) GETSTRUCT(indexTuple);
if (!check_cluster_ownership(index->indrelid)) if (!check_cluster_permitted(index->indrelid))
continue; continue;
/* /*
* We have to build the struct in a different memory context so * We have to build the list in a different memory context so
* it will survive the cross-transaction processing * it will survive the cross-transaction processing
*/ */
old_context = MemoryContextSwitchTo(cluster_context); old_context = MemoryContextSwitchTo(cluster_context);
rvtc = (relToCluster *)palloc(sizeof(relToCluster)); rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
rvtc->indexOid = index->indexrelid;
rvtc->tableOid = index->indrelid; rvtc->tableOid = index->indrelid;
rvtc->isPrevious = true; rvtc->indexOid = index->indexrelid;
rvs = lcons((void *)rvtc, rvs); rvs = lcons(rvtc, rvs);
MemoryContextSwitchTo(old_context); MemoryContextSwitchTo(old_context);
} }
heap_endscan(scan); heap_endscan(scan);
/* relation_close(indRelation, AccessShareLock);
* Release the lock on pg_index. We will check the indexes
* later again.
*
*/
relation_close(indRelation, RowExclusiveLock);
return rvs; return rvs;
} }
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.62 2002/12/16 18:39:22 tgl Exp $ * $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.63 2002/12/30 18:42:14 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -355,7 +355,7 @@ RemoveRelation(const RangeVar *relation, DropBehavior behavior) ...@@ -355,7 +355,7 @@ RemoveRelation(const RangeVar *relation, DropBehavior behavior)
* Removes all the rows from a relation. * Removes all the rows from a relation.
* *
* Note: This routine only does safety and permissions checks; * Note: This routine only does safety and permissions checks;
* rebuild_rel in cluster.c does the actual work. * rebuild_relation in cluster.c does the actual work.
*/ */
void void
TruncateRelation(const RangeVar *relation) TruncateRelation(const RangeVar *relation)
...@@ -366,7 +366,6 @@ TruncateRelation(const RangeVar *relation) ...@@ -366,7 +366,6 @@ TruncateRelation(const RangeVar *relation)
Relation fkeyRel; Relation fkeyRel;
SysScanDesc fkeyScan; SysScanDesc fkeyScan;
HeapTuple tuple; HeapTuple tuple;
List *indexes;
/* Grab exclusive lock in preparation for truncate */ /* Grab exclusive lock in preparation for truncate */
rel = heap_openrv(relation, AccessExclusiveLock); rel = heap_openrv(relation, AccessExclusiveLock);
...@@ -433,17 +432,13 @@ TruncateRelation(const RangeVar *relation) ...@@ -433,17 +432,13 @@ TruncateRelation(const RangeVar *relation)
systable_endscan(fkeyScan); systable_endscan(fkeyScan);
heap_close(fkeyRel, AccessShareLock); heap_close(fkeyRel, AccessShareLock);
/* Save the information of all indexes on the relation. */
indexes = get_indexattr_list(rel, InvalidOid);
/* Keep the lock until transaction commit */
heap_close(rel, NoLock);
/* /*
* Do the real work using the same technique as cluster, but * Do the real work using the same technique as cluster, but
* without the code copy portion * without the data-copying portion
*/ */
rebuild_rel(relid, InvalidOid, indexes, false); rebuild_relation(rel, InvalidOid);
/* NB: rebuild_relation does heap_close() */
} }
/*---------- /*----------
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.186 2002/12/30 15:31:48 momjian Exp $ * $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.187 2002/12/30 18:42:16 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -187,7 +187,6 @@ ProcessUtility(Node *parsetree, ...@@ -187,7 +187,6 @@ ProcessUtility(Node *parsetree,
CommandDest dest, CommandDest dest,
char *completionTag) char *completionTag)
{ {
if (completionTag) if (completionTag)
completionTag[0] = '\0'; completionTag[0] = '\0';
...@@ -195,7 +194,6 @@ ProcessUtility(Node *parsetree, ...@@ -195,7 +194,6 @@ ProcessUtility(Node *parsetree,
{ {
/* /*
* ******************************** transactions ******************************** * ******************************** transactions ********************************
*
*/ */
case T_TransactionStmt: case T_TransactionStmt:
{ {
...@@ -742,11 +740,7 @@ ProcessUtility(Node *parsetree, ...@@ -742,11 +740,7 @@ ProcessUtility(Node *parsetree,
break; break;
case T_ClusterStmt: case T_ClusterStmt:
{ cluster((ClusterStmt *) parsetree);
ClusterStmt *stmt = (ClusterStmt *) parsetree;
cluster(stmt);
}
break; break;
case T_VacuumStmt: case T_VacuumStmt:
...@@ -874,7 +868,6 @@ ProcessUtility(Node *parsetree, ...@@ -874,7 +868,6 @@ ProcessUtility(Node *parsetree,
switch (stmt->reindexType) switch (stmt->reindexType)
{ {
char *relname;
case INDEX: case INDEX:
CheckOwnership(stmt->relation, false); CheckOwnership(stmt->relation, false);
ReindexIndex(stmt->relation, stmt->force); ReindexIndex(stmt->relation, stmt->force);
...@@ -884,8 +877,7 @@ ProcessUtility(Node *parsetree, ...@@ -884,8 +877,7 @@ ProcessUtility(Node *parsetree,
ReindexTable(stmt->relation, stmt->force); ReindexTable(stmt->relation, stmt->force);
break; break;
case DATABASE: case DATABASE:
relname = (char *) stmt->name; ReindexDatabase(stmt->name, stmt->force, false);
ReindexDatabase(relname, stmt->force, false);
break; break;
} }
break; break;
......
...@@ -6,22 +6,19 @@ ...@@ -6,22 +6,19 @@
* Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
* Portions Copyright (c) 1994-5, Regents of the University of California * Portions Copyright (c) 1994-5, Regents of the University of California
* *
* $Id: cluster.h,v 1.17 2002/11/23 04:05:52 momjian Exp $ * $Id: cluster.h,v 1.18 2002/12/30 18:42:16 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#ifndef CLUSTER_H #ifndef CLUSTER_H
#define CLUSTER_H #define CLUSTER_H
#include <nodes/parsenodes.h> #include "nodes/parsenodes.h"
/* #include "utils/rel.h"
* functions
*/
extern void cluster(ClusterStmt *stmt);
extern List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
extern void rebuild_rel(Oid tableOid, Oid indexOid,
List *indexes, bool dataCopy);
extern void cluster(ClusterStmt *stmt);
extern void rebuild_relation(Relation OldHeap, Oid indexOid);
#endif /* CLUSTER_H */ #endif /* CLUSTER_H */
...@@ -304,7 +304,7 @@ INSERT INTO clstr_3 VALUES (2); ...@@ -304,7 +304,7 @@ INSERT INTO clstr_3 VALUES (2);
INSERT INTO clstr_3 VALUES (1); INSERT INTO clstr_3 VALUES (1);
-- "CLUSTER <tablename>" on a table that hasn't been clustered -- "CLUSTER <tablename>" on a table that hasn't been clustered
CLUSTER clstr_2; CLUSTER clstr_2;
ERROR: CLUSTER: No previously clustered index found on table clstr_2 ERROR: CLUSTER: No previously clustered index found on table "clstr_2"
CLUSTER clstr_1_pkey ON clstr_1; CLUSTER clstr_1_pkey ON clstr_1;
CLUSTER clstr_2_pkey ON clstr_2; CLUSTER clstr_2_pkey ON clstr_2;
SELECT * FROM clstr_1 UNION ALL SELECT * FROM clstr_1 UNION ALL
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment