Commit 1489e2f2 authored by Robert Haas's avatar Robert Haas

Improve behavior of concurrent ALTER TABLE, and do some refactoring.

ALTER TABLE (and ALTER VIEW, ALTER SEQUENCE, etc.) now use a
RangeVarGetRelid callback to check permissions before acquiring a table
lock.  We also now use the same callback for all forms of ALTER TABLE,
rather than having separate, almost-identical callbacks for ALTER TABLE
.. SET SCHEMA and ALTER TABLE .. RENAME, and no callback at all for
everything else.

I went ahead and changed the code so that no form of ALTER TABLE works
on foreign tables; you must use ALTER FOREIGN TABLE instead.  In 9.1,
it was possible to use ALTER TABLE .. SET SCHEMA or ALTER TABLE ..
RENAME on a foreign table, but not any other form of ALTER TABLE, which
did not seem terribly useful or consistent.

Patch by me; review by Noah Misch.
parent 33aaa139
......@@ -192,8 +192,7 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
case OBJECT_TABLE:
case OBJECT_VIEW:
case OBJECT_FOREIGN_TABLE:
AlterTableNamespace(stmt->relation, stmt->newschema,
stmt->objectType, AccessExclusiveLock);
AlterTableNamespace(stmt);
break;
case OBJECT_TSPARSER:
......
......@@ -387,6 +387,8 @@ static const char *storage_name(char c);
static void RangeVarCallbackForDropRelation(const RangeVar *rel, Oid relOid,
Oid oldRelOid, void *arg);
static void RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid,
Oid oldrelid, void *arg);
/* ----------------------------------------------------------------
......@@ -2318,81 +2320,6 @@ renameatt(RenameStmt *stmt)
stmt->behavior);
}
/*
* Perform permissions and integrity checks before acquiring a relation lock.
*/
static void
RangeVarCallbackForRenameRelation(const RangeVar *rv, Oid relid, Oid oldrelid,
void *arg)
{
RenameStmt *stmt = (RenameStmt *) arg;
ObjectType reltype;
HeapTuple tuple;
Form_pg_class classform;
AclResult aclresult;
char relkind;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped */
classform = (Form_pg_class) GETSTRUCT(tuple);
relkind = classform->relkind;
/* Must own table. */
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
NameStr(classform->relname));
/* No system table modifications unless explicitly allowed. */
if (!allowSystemTableMods && IsSystemClass(classform))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
NameStr(classform->relname))));
/* Must (still) have CREATE rights on containing namespace. */
aclresult = pg_namespace_aclcheck(classform->relnamespace, GetUserId(),
ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(classform->relnamespace));
/*
* For compatibility with prior releases, we don't complain if ALTER TABLE
* or ALTER INDEX is used to rename some other type of relation. But
* ALTER SEQUENCE/VIEW/FOREIGN TABLE are only to be used with relations of
* that type.
*/
reltype = stmt->renameType;
if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a sequence", rv->relname)));
if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view", rv->relname)));
if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a foreign table", rv->relname)));
/*
* Don't allow ALTER TABLE on composite types. We want people to use ALTER
* TYPE for that.
*/
if (relkind == RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a composite type", rv->relname),
errhint("Use ALTER TYPE instead.")));
ReleaseSysCache(tuple);
}
/*
* Execute ALTER TABLE/INDEX/SEQUENCE/VIEW/FOREIGN TABLE RENAME
*/
......@@ -2410,7 +2337,7 @@ RenameRelation(RenameStmt *stmt)
*/
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
false, false,
RangeVarCallbackForRenameRelation,
RangeVarCallbackForAlterRelation,
(void *) stmt);
/* Do the work */
......@@ -2545,6 +2472,19 @@ CheckTableNotInUse(Relation rel, const char *stmt)
stmt, RelationGetRelationName(rel))));
}
/*
* AlterTableLookupRelation
* Look up, and lock, the OID for the relation named by an alter table
* statement.
*/
Oid
AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode)
{
return RangeVarGetRelidExtended(stmt->relation, lockmode, false, false,
RangeVarCallbackForAlterRelation,
(void *) stmt);
}
/*
* AlterTable
* Execute ALTER TABLE, which can be a list of subcommands
......@@ -2579,90 +2519,26 @@ CheckTableNotInUse(Relation rel, const char *stmt)
* Thanks to the magic of MVCC, an error anywhere along the way rolls back
* the whole operation; we don't have to do anything special to clean up.
*
* We lock the table as the first action, with an appropriate lock level
* The caller must lock the relation, with an appropriate lock level
* for the subcommands requested. Any subcommand that needs to rewrite
* tuples in the table forces the whole command to be executed with
* AccessExclusiveLock. If all subcommands do not require rewrite table
* then we may be able to use lower lock levels. We pass the lock level down
* AccessExclusiveLock (actually, that is currently required always, but
* we hope to relax it at some point). We pass the lock level down
* so that we can apply it recursively to inherited tables. Note that the
* lock level we want as we recurse may well be higher than required for
* lock level we want as we recurse might well be higher than required for
* that specific subcommand. So we pass down the overall lock requirement,
* rather than reassess it at lower levels.
*/
void
AlterTable(AlterTableStmt *stmt)
AlterTable(Oid relid, LOCKMODE lockmode, AlterTableStmt *stmt)
{
Relation rel;
LOCKMODE lockmode = AlterTableGetLockLevel(stmt->cmds);
/*
* Acquire same level of lock as already acquired during parsing.
*/
rel = relation_openrv(stmt->relation, lockmode);
/* Caller is required to provide an adequate lock. */
rel = relation_open(relid, NoLock);
CheckTableNotInUse(rel, "ALTER TABLE");
/* Check relation type against type specified in the ALTER command */
switch (stmt->relkind)
{
case OBJECT_TABLE:
/*
* For mostly-historical reasons, we allow ALTER TABLE to apply to
* almost all relation types.
*/
if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE
|| rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table",
RelationGetRelationName(rel))));
break;
case OBJECT_INDEX:
if (rel->rd_rel->relkind != RELKIND_INDEX)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index",
RelationGetRelationName(rel))));
break;
case OBJECT_SEQUENCE:
if (rel->rd_rel->relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a sequence",
RelationGetRelationName(rel))));
break;
case OBJECT_TYPE:
if (rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a composite type",
RelationGetRelationName(rel))));
break;
case OBJECT_VIEW:
if (rel->rd_rel->relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view",
RelationGetRelationName(rel))));
break;
case OBJECT_FOREIGN_TABLE:
if (rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a foreign table",
RelationGetRelationName(rel))));
break;
default:
elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
}
ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt),
lockmode);
}
......@@ -9530,104 +9406,11 @@ ATExecGenericOptions(Relation rel, List *options)
heap_freetuple(tuple);
}
/*
* Perform permissions and integrity checks before acquiring a relation lock.
*/
static void
RangeVarCallbackForAlterTableNamespace(const RangeVar *rv, Oid relid,
Oid oldrelid, void *arg)
{
HeapTuple tuple;
Form_pg_class form;
ObjectType stmttype;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped */
form = (Form_pg_class) GETSTRUCT(tuple);
/* Must own table. */
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
/* No system table modifications unless explicitly allowed. */
if (!allowSystemTableMods && IsSystemClass(form))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
rv->relname)));
/* Check relation type against type specified in the ALTER command */
stmttype = * (ObjectType *) arg;
switch (stmttype)
{
case OBJECT_TABLE:
/*
* For mostly-historical reasons, we allow ALTER TABLE to apply to
* all relation types.
*/
break;
case OBJECT_SEQUENCE:
if (form->relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a sequence", rv->relname)));
break;
case OBJECT_VIEW:
if (form->relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view", rv->relname)));
break;
case OBJECT_FOREIGN_TABLE:
if (form->relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a foreign table", rv->relname)));
break;
default:
elog(ERROR, "unrecognized object type: %d", (int) stmttype);
}
/* Can we change the schema of this tuple? */
switch (form->relkind)
{
case RELKIND_RELATION:
case RELKIND_VIEW:
case RELKIND_SEQUENCE:
case RELKIND_FOREIGN_TABLE:
/* ok to change schema */
break;
case RELKIND_COMPOSITE_TYPE:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a composite type", rv->relname),
errhint("Use ALTER TYPE instead.")));
break;
case RELKIND_INDEX:
case RELKIND_TOASTVALUE:
/* FALL THRU */
default:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table, view, sequence, or foreign table",
rv->relname)));
}
ReleaseSysCache(tuple);
}
/*
* Execute ALTER TABLE SET SCHEMA
*/
void
AlterTableNamespace(RangeVar *relation, const char *newschema,
ObjectType stmttype, LOCKMODE lockmode)
AlterTableNamespace(AlterObjectSchemaStmt *stmt)
{
Relation rel;
Oid relid;
......@@ -9635,9 +9418,10 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
Oid nspOid;
Relation classRel;
relid = RangeVarGetRelidExtended(relation, lockmode, false, false,
RangeVarCallbackForAlterTableNamespace,
(void *) &stmttype);
relid = RangeVarGetRelidExtended(stmt->relation, AccessExclusiveLock,
false, false,
RangeVarCallbackForAlterRelation,
(void *) stmt);
rel = relation_open(relid, NoLock);
oldNspOid = RelationGetNamespace(rel);
......@@ -9658,7 +9442,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
}
/* get schema OID and check its permissions */
nspOid = LookupCreationNamespace(newschema);
nspOid = LookupCreationNamespace(stmt->newschema);
/* common checks on switching namespaces */
CheckSetNamespace(oldNspOid, nspOid, RelationRelationId, relid);
......@@ -9675,7 +9459,8 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
if (rel->rd_rel->relkind == RELKIND_RELATION)
{
AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema, lockmode);
AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, stmt->newschema,
AccessExclusiveLock);
AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
}
......@@ -10077,3 +9862,123 @@ RangeVarCallbackOwnsTable(const RangeVar *relation,
if (!pg_class_ownercheck(relId, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, relation->relname);
}
/*
* Common RangeVarGetRelid callback for rename, set schema, and alter table
* processing.
*/
static void
RangeVarCallbackForAlterRelation(const RangeVar *rv, Oid relid, Oid oldrelid,
void *arg)
{
Node *stmt = (Node *) arg;
ObjectType reltype;
HeapTuple tuple;
Form_pg_class classform;
AclResult aclresult;
char relkind;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped */
classform = (Form_pg_class) GETSTRUCT(tuple);
relkind = classform->relkind;
/* Must own relation. */
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
/* No system table modifications unless explicitly allowed. */
if (!allowSystemTableMods && IsSystemClass(classform))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
rv->relname)));
/*
* Extract the specified relation type from the statement parse tree.
*
* Also, for ALTER .. RENAME, check permissions: the user must (still)
* have CREATE rights on the containing namespace.
*/
if (IsA(stmt, RenameStmt))
{
aclresult = pg_namespace_aclcheck(classform->relnamespace,
GetUserId(), ACL_CREATE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
get_namespace_name(classform->relnamespace));
reltype = ((RenameStmt *) stmt)->renameType;
}
else if (IsA(stmt, AlterObjectSchemaStmt))
reltype = ((AlterObjectSchemaStmt *) stmt)->objectType;
else if (IsA(stmt, AlterTableStmt))
reltype = ((AlterTableStmt *) stmt)->relkind;
else
{
reltype = OBJECT_TABLE; /* placate compiler */
elog(ERROR, "unrecognized node type: %d", (int) nodeTag(stmt));
}
/*
* For compatibility with prior releases, we allow ALTER TABLE to be
* used with most other types of relations (but not composite types).
* We allow similar flexibility for ALTER INDEX in the case of RENAME,
* but not otherwise. Otherwise, the user must select the correct form
* of the command for the relation at issue.
*/
if (reltype == OBJECT_SEQUENCE && relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a sequence", rv->relname)));
if (reltype == OBJECT_VIEW && relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view", rv->relname)));
if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a foreign table", rv->relname)));
if (reltype == OBJECT_TYPE && relkind != RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a composite type", rv->relname)));
if (reltype == OBJECT_FOREIGN_TABLE && relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a foreign table", rv->relname)));
if (reltype == OBJECT_INDEX && relkind != RELKIND_INDEX
&& !IsA(stmt, RenameStmt))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index", rv->relname)));
/*
* Don't allow ALTER TABLE on composite types. We want people to use ALTER
* TYPE for that.
*/
if (reltype != OBJECT_TYPE && relkind == RELKIND_COMPOSITE_TYPE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a composite type", rv->relname),
errhint("Use ALTER TYPE instead.")));
/*
* Don't allow ALTER TABLE .. SET SCHEMA on relations that can't be
* moved to a different schema, such as indexes and TOAST tables.
*/
if (IsA(stmt, AlterObjectSchemaStmt) && relkind != RELKIND_RELATION
&& relkind != RELKIND_VIEW && relkind != RELKIND_SEQUENCE
&& relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table, view, sequence, or foreign table",
rv->relname)));
ReleaseSysCache(tuple);
}
......@@ -699,12 +699,23 @@ standard_ProcessUtility(Node *parsetree,
case T_AlterTableStmt:
{
AlterTableStmt *atstmt = (AlterTableStmt *) parsetree;
Oid relid;
List *stmts;
ListCell *l;
LOCKMODE lockmode;
/*
* Figure out lock mode, and acquire lock. This also does
* basic permissions checks, so that we won't wait for a lock
* on (for example) a relation on which we have no
* permissions.
*/
lockmode = AlterTableGetLockLevel(atstmt->cmds);
relid = AlterTableLookupRelation(atstmt, lockmode);
/* Run parse analysis ... */
stmts = transformAlterTableStmt((AlterTableStmt *) parsetree,
queryString);
stmts = transformAlterTableStmt(atstmt, queryString);
/* ... and do it */
foreach(l, stmts)
......@@ -714,7 +725,7 @@ standard_ProcessUtility(Node *parsetree,
if (IsA(stmt, AlterTableStmt))
{
/* Do the table alteration proper */
AlterTable((AlterTableStmt *) stmt);
AlterTable(relid, lockmode, (AlterTableStmt *) stmt);
}
else
{
......
......@@ -24,7 +24,9 @@ extern Oid DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId);
extern void RemoveRelations(DropStmt *drop);
extern void AlterTable(AlterTableStmt *stmt);
extern Oid AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode);
extern void AlterTable(Oid relid, LOCKMODE lockmode, AlterTableStmt *stmt);
extern LOCKMODE AlterTableGetLockLevel(List *cmds);
......@@ -32,8 +34,7 @@ extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, L
extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
extern void AlterTableNamespace(RangeVar *relation, const char *newschema,
ObjectType stmttype, LOCKMODE lockmode);
extern void AlterTableNamespace(AlterObjectSchemaStmt *stmt);
extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
Oid oldNspOid, Oid newNspOid,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment