Commit cbe24a6d authored by Robert Haas's avatar Robert Haas

Improve behavior of concurrent CLUSTER.

In the previous coding, a user could queue up for an AccessExclusiveLock
on a table they did not have permission to cluster, thus potentially
interfering with access by authorized users who got stuck waiting behind
the AccessExclusiveLock.  This approach avoids that.  cluster() has the
same permissions-checking requirements as REINDEX TABLE, so this commit
moves the now-shared callback to tablecmds.c and renames it, per
discussion with Noah Misch.
parent d573e239
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "catalog/dependency.h" #include "catalog/dependency.h"
#include "catalog/heap.h" #include "catalog/heap.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/toasting.h" #include "catalog/toasting.h"
#include "commands/cluster.h" #include "commands/cluster.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
...@@ -106,15 +107,12 @@ cluster(ClusterStmt *stmt, bool isTopLevel) ...@@ -106,15 +107,12 @@ cluster(ClusterStmt *stmt, bool isTopLevel)
indexOid = InvalidOid; indexOid = InvalidOid;
Relation rel; Relation rel;
/* Find and lock the table */ /* Find, lock, and check permissions on the table */
rel = heap_openrv(stmt->relation, AccessExclusiveLock); tableOid = RangeVarGetRelidExtended(stmt->relation,
AccessExclusiveLock,
tableOid = RelationGetRelid(rel); false, false,
RangeVarCallbackOwnsTable, NULL);
/* Check permissions */ rel = heap_open(tableOid, NoLock);
if (!pg_class_ownercheck(tableOid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
RelationGetRelationName(rel));
/* /*
* Reject clustering a remote temp table ... their local buffer * Reject clustering a remote temp table ... their local buffer
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "catalog/pg_tablespace.h" #include "catalog/pg_tablespace.h"
#include "commands/dbcommands.h" #include "commands/dbcommands.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h" #include "commands/tablespace.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "miscadmin.h" #include "miscadmin.h"
...@@ -63,8 +64,6 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo, ...@@ -63,8 +64,6 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo,
static Oid GetIndexOpClass(List *opclass, Oid attrType, static Oid GetIndexOpClass(List *opclass, Oid attrType,
char *accessMethodName, Oid accessMethodId); char *accessMethodName, Oid accessMethodId);
static char *ChooseIndexNameAddition(List *colnames); static char *ChooseIndexNameAddition(List *colnames);
static void RangeVarCallbackForReindexTable(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
static void RangeVarCallbackForReindexIndex(const RangeVar *relation, static void RangeVarCallbackForReindexIndex(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg); Oid relId, Oid oldRelId, void *arg);
...@@ -1809,7 +1808,7 @@ ReindexTable(RangeVar *relation) ...@@ -1809,7 +1808,7 @@ ReindexTable(RangeVar *relation)
/* The lock level used here should match reindex_relation(). */ /* The lock level used here should match reindex_relation(). */
heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false, heapOid = RangeVarGetRelidExtended(relation, ShareLock, false, false,
RangeVarCallbackForReindexTable, NULL); RangeVarCallbackOwnsTable, NULL);
if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST)) if (!reindex_relation(heapOid, REINDEX_REL_PROCESS_TOAST))
ereport(NOTICE, ereport(NOTICE,
...@@ -1817,37 +1816,6 @@ ReindexTable(RangeVar *relation) ...@@ -1817,37 +1816,6 @@ ReindexTable(RangeVar *relation)
relation->relname))); relation->relname)));
} }
/*
* Check permissions on table before acquiring relation lock.
*/
static void
RangeVarCallbackForReindexTable(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg)
{
char relkind;
/* Nothing to do if the relation was not found. */
if (!OidIsValid(relId))
return;
/*
* If the relation does exist, check whether it's an index. But note
* that the relation might have been dropped between the time we did the
* name lookup and now. In that case, there's nothing to do.
*/
relkind = get_rel_relkind(relId);
if (!relkind)
return;
if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table", relation->relname)));
/* Check permissions */
if (!pg_class_ownercheck(relId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
}
/* /*
* ReindexDatabase * ReindexDatabase
* Recreate indexes of a database. * Recreate indexes of a database.
......
...@@ -9933,3 +9933,38 @@ AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid, ...@@ -9933,3 +9933,38 @@ AtEOSubXact_on_commit_actions(bool isCommit, SubTransactionId mySubid,
} }
} }
} }
/*
* This is intended as a callback for RangeVarGetRelidExtended(). It allows
* the table to be locked only if (1) it's a plain table or TOAST table and
* (2) the current user is the owner (or the superuser). This meets the
* permission-checking needs of both CLUTER and REINDEX TABLE; we expose it
* here so that it can be used by both.
*/
void
RangeVarCallbackOwnsTable(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg)
{
char relkind;
/* Nothing to do if the relation was not found. */
if (!OidIsValid(relId))
return;
/*
* If the relation does exist, check whether it's an index. But note
* that the relation might have been dropped between the time we did the
* name lookup and now. In that case, there's nothing to do.
*/
relkind = get_rel_relkind(relId);
if (!relkind)
return;
if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table", relation->relname)));
/* Check permissions */
if (!pg_class_ownercheck(relId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
}
...@@ -71,4 +71,7 @@ extern void AtEOSubXact_on_commit_actions(bool isCommit, ...@@ -71,4 +71,7 @@ extern void AtEOSubXact_on_commit_actions(bool isCommit,
SubTransactionId mySubid, SubTransactionId mySubid,
SubTransactionId parentSubid); SubTransactionId parentSubid);
extern void RangeVarCallbackOwnsTable(const RangeVar *relation,
Oid relId, Oid oldRelId, void *arg);
#endif /* TABLECMDS_H */ #endif /* TABLECMDS_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment