Commit 1da5c119 authored by Robert Haas's avatar Robert Haas

Improve behavior of concurrent ALTER <relation> .. SET SCHEMA.

If the referrent of a name changes while we're waiting for the lock,
we must recheck permissons.  We also now check the relkind before
locking, since it's easy to do that long the way.

Patch by me; review by Noah Misch.
parent 74a1d4fe
......@@ -191,7 +191,6 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
case OBJECT_TABLE:
case OBJECT_VIEW:
case OBJECT_FOREIGN_TABLE:
CheckRelationOwnership(stmt->relation, true);
AlterTableNamespace(stmt->relation, stmt->newschema,
stmt->objectType, AccessExclusiveLock);
break;
......
......@@ -9379,28 +9379,35 @@ ATExecGenericOptions(Relation rel, List *options)
heap_freetuple(tuple);
}
/*
* Execute ALTER TABLE SET SCHEMA
*
* Note: caller must have checked ownership of the relation already
* Perform permissions and integrity checks before acquiring a relation lock.
*/
void
AlterTableNamespace(RangeVar *relation, const char *newschema,
ObjectType stmttype, LOCKMODE lockmode)
static void
RangeVarCallbackForAlterTableNamespace(const RangeVar *rv, Oid relid,
Oid oldrelid, void *arg)
{
Relation rel;
Oid relid;
Oid oldNspOid;
Oid nspOid;
Relation classRel;
HeapTuple tuple;
Form_pg_class form;
ObjectType stmttype;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped */
form = (Form_pg_class) GETSTRUCT(tuple);
rel = relation_openrv(relation, lockmode);
/* Must own table. */
if (!pg_class_ownercheck(relid, GetUserId()))
aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
relid = RelationGetRelid(rel);
oldNspOid = RelationGetNamespace(rel);
/* No system table modifications unless explicitly allowed. */
if (!allowSystemTableMods && IsSystemClass(form))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
rv->relname)));
/* Check relation type against type specified in the ALTER command */
stmttype = * (ObjectType *) arg;
switch (stmttype)
{
case OBJECT_TABLE:
......@@ -9412,27 +9419,24 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
break;
case OBJECT_SEQUENCE:
if (rel->rd_rel->relkind != RELKIND_SEQUENCE)
if (form->relkind != RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a sequence",
RelationGetRelationName(rel))));
errmsg("\"%s\" is not a sequence", rv->relname)));
break;
case OBJECT_VIEW:
if (rel->rd_rel->relkind != RELKIND_VIEW)
if (form->relkind != RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a view",
RelationGetRelationName(rel))));
errmsg("\"%s\" is not a view", rv->relname)));
break;
case OBJECT_FOREIGN_TABLE:
if (rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
if (form->relkind != RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a foreign table",
RelationGetRelationName(rel))));
errmsg("\"%s\" is not a foreign table", rv->relname)));
break;
default:
......@@ -9440,33 +9444,18 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
}
/* Can we change the schema of this tuple? */
switch (rel->rd_rel->relkind)
switch (form->relkind)
{
case RELKIND_RELATION:
case RELKIND_VIEW:
case RELKIND_SEQUENCE:
case RELKIND_FOREIGN_TABLE:
/* ok to change schema */
break;
case RELKIND_SEQUENCE:
{
/* if it's an owned sequence, disallow moving it by itself */
Oid tableId;
int32 colId;
if (sequenceIsOwned(relid, &tableId, &colId))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move an owned sequence into another schema"),
errdetail("Sequence \"%s\" is linked to table \"%s\".",
RelationGetRelationName(rel),
get_rel_name(tableId))));
}
break;
case RELKIND_COMPOSITE_TYPE:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a composite type",
RelationGetRelationName(rel)),
errmsg("\"%s\" is a composite type", rv->relname),
errhint("Use ALTER TYPE instead.")));
break;
case RELKIND_INDEX:
......@@ -9476,7 +9465,45 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a table, view, sequence, or foreign table",
RelationGetRelationName(rel))));
rv->relname)));
}
ReleaseSysCache(tuple);
}
/*
* Execute ALTER TABLE SET SCHEMA
*/
void
AlterTableNamespace(RangeVar *relation, const char *newschema,
ObjectType stmttype, LOCKMODE lockmode)
{
Relation rel;
Oid relid;
Oid oldNspOid;
Oid nspOid;
Relation classRel;
relid = RangeVarGetRelidExtended(relation, lockmode, false, false,
RangeVarCallbackForAlterTableNamespace,
(void *) &stmttype);
rel = relation_open(relid, NoLock);
oldNspOid = RelationGetNamespace(rel);
/* If it's an owned sequence, disallow moving it by itself. */
if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
{
Oid tableId;
int32 colId;
if (sequenceIsOwned(relid, &tableId, &colId))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot move an owned sequence into another schema"),
errdetail("Sequence \"%s\" is linked to table \"%s\".",
RelationGetRelationName(rel),
get_rel_name(tableId))));
}
/* get schema OID and check its permissions */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment