Commit efd9366d authored by Alvaro Herrera's avatar Alvaro Herrera

Fix droppability of constraints upon partition detach

We were failing to set conislocal correctly for constraints in
partitions after partition detach, leading to those constraints becoming
undroppable.  Fix by setting the flag correctly.  Existing databases
might contain constraints with the conislocal wrongly set to false, for
partitions that were detached; this situation should be fixable by
applying an UPDATE on pg_constraint to set conislocal true.  This
problem should otherwise be innocuous and should disappear across a
dump/restore or pg_upgrade.

Secondarily, when constraint drop was attempted in a partitioned table,
ATExecDropConstraint would try to recurse to partitions after doing
performDeletion() of the constraint in the partitioned table itself; but
since the constraint in the partitions are dropped by the initial call
of performDeletion() (because of following dependencies), the recursion
step would fail since it would not find the constraint, causing the
whole operation to fail.  Fix by preventing recursion.

Reported-by: Amit Langote
Diagnosed-by: Amit Langote
Author: Amit Langote, Álvaro Herrera
Discussion: https://postgr.es/m/f2b8ead5-4131-d5a8-8016-2ea0a31250af@lab.ntt.co.jp
parent e6c3ba7f
...@@ -783,6 +783,12 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId) ...@@ -783,6 +783,12 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId)
constrForm = (Form_pg_constraint) GETSTRUCT(newtup); constrForm = (Form_pg_constraint) GETSTRUCT(newtup);
if (OidIsValid(parentConstrId)) if (OidIsValid(parentConstrId))
{ {
/* don't allow setting parent for a constraint that already has one */
Assert(constrForm->coninhcount == 0);
if (constrForm->conparentid != InvalidOid)
elog(ERROR, "constraint %u already has a parent constraint",
childConstrId);
constrForm->conislocal = false; constrForm->conislocal = false;
constrForm->coninhcount++; constrForm->coninhcount++;
constrForm->conparentid = parentConstrId; constrForm->conparentid = parentConstrId;
...@@ -797,10 +803,12 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId) ...@@ -797,10 +803,12 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId)
else else
{ {
constrForm->coninhcount--; constrForm->coninhcount--;
if (constrForm->coninhcount <= 0) constrForm->conislocal = true;
constrForm->conislocal = true;
constrForm->conparentid = InvalidOid; constrForm->conparentid = InvalidOid;
/* Make sure there's no further inheritance. */
Assert(constrForm->coninhcount == 0);
deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId, deleteDependencyRecordsForClass(ConstraintRelationId, childConstrId,
ConstraintRelationId, ConstraintRelationId,
DEPENDENCY_INTERNAL_AUTO); DEPENDENCY_INTERNAL_AUTO);
......
...@@ -7243,6 +7243,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -7243,6 +7243,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Oid pfeqoperators[INDEX_MAX_KEYS]; Oid pfeqoperators[INDEX_MAX_KEYS];
Oid ppeqoperators[INDEX_MAX_KEYS]; Oid ppeqoperators[INDEX_MAX_KEYS];
Oid ffeqoperators[INDEX_MAX_KEYS]; Oid ffeqoperators[INDEX_MAX_KEYS];
bool connoinherit;
int i; int i;
int numfks, int numfks,
numpks; numpks;
...@@ -7586,6 +7587,12 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -7586,6 +7587,12 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
ffeqoperators[i] = ffeqop; ffeqoperators[i] = ffeqop;
} }
/*
* FKs always inherit for partitioned tables, and never for legacy
* inheritance.
*/
connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
/* /*
* Record the FK constraint in pg_constraint. * Record the FK constraint in pg_constraint.
*/ */
...@@ -7616,7 +7623,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -7616,7 +7623,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
NULL, NULL,
true, /* islocal */ true, /* islocal */
0, /* inhcount */ 0, /* inhcount */
true, /* isnoinherit */ connoinherit, /* conNoInherit */
false); /* is_internal */ false); /* is_internal */
ObjectAddressSet(address, ConstraintRelationId, constrOid); ObjectAddressSet(address, ConstraintRelationId, constrOid);
...@@ -7937,7 +7944,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel, ...@@ -7937,7 +7944,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
} }
systable_endscan(scan); systable_endscan(scan);
heap_close(trigrel, RowExclusiveLock); table_close(trigrel, RowExclusiveLock);
ConstraintSetParentConstraint(fk->conoid, parentConstrOid); ConstraintSetParentConstraint(fk->conoid, parentConstrOid);
CommandCounterIncrement(); CommandCounterIncrement();
...@@ -9131,6 +9138,7 @@ ATExecDropConstraint(Relation rel, const char *constrName, ...@@ -9131,6 +9138,7 @@ ATExecDropConstraint(Relation rel, const char *constrName,
HeapTuple tuple; HeapTuple tuple;
bool found = false; bool found = false;
bool is_no_inherit_constraint = false; bool is_no_inherit_constraint = false;
char contype;
/* At top level, permission check was done in ATPrepCmd, else do it */ /* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing) if (recursing)
...@@ -9171,6 +9179,7 @@ ATExecDropConstraint(Relation rel, const char *constrName, ...@@ -9171,6 +9179,7 @@ ATExecDropConstraint(Relation rel, const char *constrName,
constrName, RelationGetRelationName(rel)))); constrName, RelationGetRelationName(rel))));
is_no_inherit_constraint = con->connoinherit; is_no_inherit_constraint = con->connoinherit;
contype = con->contype;
/* /*
* If it's a foreign-key constraint, we'd better lock the referenced * If it's a foreign-key constraint, we'd better lock the referenced
...@@ -9179,7 +9188,7 @@ ATExecDropConstraint(Relation rel, const char *constrName, ...@@ -9179,7 +9188,7 @@ ATExecDropConstraint(Relation rel, const char *constrName,
* that has unfired events). But we can/must skip that in the * that has unfired events). But we can/must skip that in the
* self-referential case. * self-referential case.
*/ */
if (con->contype == CONSTRAINT_FOREIGN && if (contype == CONSTRAINT_FOREIGN &&
con->confrelid != RelationGetRelid(rel)) con->confrelid != RelationGetRelid(rel))
{ {
Relation frel; Relation frel;
...@@ -9223,6 +9232,17 @@ ATExecDropConstraint(Relation rel, const char *constrName, ...@@ -9223,6 +9232,17 @@ ATExecDropConstraint(Relation rel, const char *constrName,
} }
} }
/*
* For partitioned tables, non-CHECK inherited constraints are dropped via
* the dependency mechanism, so we're done here.
*/
if (contype != CONSTRAINT_CHECK &&
rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
table_close(conrel, RowExclusiveLock);
return;
}
/* /*
* Propagate to children as appropriate. Unlike most other ALTER * Propagate to children as appropriate. Unlike most other ALTER
* routines, we have to do this one level of recursion at a time; we can't * routines, we have to do this one level of recursion at a time; we can't
......
...@@ -1945,7 +1945,23 @@ DETAIL: Key (a)=(2) is not present in table "pkey". ...@@ -1945,7 +1945,23 @@ DETAIL: Key (a)=(2) is not present in table "pkey".
delete from fkpart1.pkey where a = 1; delete from fkpart1.pkey where a = 1;
ERROR: update or delete on table "pkey" violates foreign key constraint "fk_part_a_fkey" on table "fk_part_1" ERROR: update or delete on table "pkey" violates foreign key constraint "fk_part_a_fkey" on table "fk_part_1"
DETAIL: Key (a)=(1) is still referenced from table "fk_part_1". DETAIL: Key (a)=(1) is still referenced from table "fk_part_1".
-- verify that attaching and detaching partitions manipulates the inheritance
-- properties of their FK constraints correctly
create schema fkpart2
create table pkey (a int primary key)
create table fk_part (a int, constraint fkey foreign key (a) references fkpart2.pkey) partition by list (a)
create table fk_part_1 partition of fkpart2.fk_part for values in (1) partition by list (a)
create table fk_part_1_1 (a int, constraint my_fkey foreign key (a) references fkpart2.pkey);
alter table fkpart2.fk_part_1 attach partition fkpart2.fk_part_1_1 for values in (1);
alter table fkpart2.fk_part_1 drop constraint fkey; -- should fail
ERROR: cannot drop inherited constraint "fkey" of relation "fk_part_1"
alter table fkpart2.fk_part_1_1 drop constraint my_fkey; -- should fail
ERROR: cannot drop inherited constraint "my_fkey" of relation "fk_part_1_1"
alter table fkpart2.fk_part detach partition fkpart2.fk_part_1;
alter table fkpart2.fk_part_1 drop constraint fkey; -- ok
alter table fkpart2.fk_part_1_1 drop constraint my_fkey; -- doesn't exist
ERROR: constraint "my_fkey" of relation "fk_part_1_1" does not exist
\set VERBOSITY terse \\ -- suppress cascade details \set VERBOSITY terse \\ -- suppress cascade details
drop schema fkpart0, fkpart1 cascade; drop schema fkpart0, fkpart1, fkpart2 cascade;
NOTICE: drop cascades to 5 other objects NOTICE: drop cascades to 8 other objects
\set VERBOSITY default \set VERBOSITY default
...@@ -1392,6 +1392,20 @@ create table fkpart1.fk_part_1_2 partition of fkpart1.fk_part_1 for values in (2 ...@@ -1392,6 +1392,20 @@ create table fkpart1.fk_part_1_2 partition of fkpart1.fk_part_1 for values in (2
insert into fkpart1.fk_part_1 values (2); -- should fail insert into fkpart1.fk_part_1 values (2); -- should fail
delete from fkpart1.pkey where a = 1; delete from fkpart1.pkey where a = 1;
-- verify that attaching and detaching partitions manipulates the inheritance
-- properties of their FK constraints correctly
create schema fkpart2
create table pkey (a int primary key)
create table fk_part (a int, constraint fkey foreign key (a) references fkpart2.pkey) partition by list (a)
create table fk_part_1 partition of fkpart2.fk_part for values in (1) partition by list (a)
create table fk_part_1_1 (a int, constraint my_fkey foreign key (a) references fkpart2.pkey);
alter table fkpart2.fk_part_1 attach partition fkpart2.fk_part_1_1 for values in (1);
alter table fkpart2.fk_part_1 drop constraint fkey; -- should fail
alter table fkpart2.fk_part_1_1 drop constraint my_fkey; -- should fail
alter table fkpart2.fk_part detach partition fkpart2.fk_part_1;
alter table fkpart2.fk_part_1 drop constraint fkey; -- ok
alter table fkpart2.fk_part_1_1 drop constraint my_fkey; -- doesn't exist
\set VERBOSITY terse \\ -- suppress cascade details \set VERBOSITY terse \\ -- suppress cascade details
drop schema fkpart0, fkpart1 cascade; drop schema fkpart0, fkpart1, fkpart2 cascade;
\set VERBOSITY default \set VERBOSITY default
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment