Commit f56f8f8d authored by Alvaro Herrera's avatar Alvaro Herrera

Support foreign keys that reference partitioned tables

Previously, while primary keys could be made on partitioned tables, it
was not possible to define foreign keys that reference those primary
keys.  Now it is possible to do that.

Author: Álvaro Herrera
Reviewed-by: Amit Langote, Jesper Pedersen
Discussion: https://postgr.es/m/20181102234158.735b3fevta63msbj@alvherre.pgsql
parent 9155580f
......@@ -379,9 +379,6 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
<para>
Partitioned tables do not support <literal>EXCLUDE</literal> constraints;
however, you can define these constraints on individual partitions.
Also, while it's possible to define <literal>PRIMARY KEY</literal>
constraints on partitioned tables, creating foreign keys that
reference a partitioned table is not yet supported.
</para>
<para>
......@@ -1028,9 +1025,7 @@ WITH ( MODULUS <replaceable class="parameter">numeric_literal</replaceable>, REM
addition of a foreign key constraint requires a
<literal>SHARE ROW EXCLUSIVE</literal> lock on the referenced table.
Note that foreign key constraints cannot be defined between temporary
tables and permanent tables. Also note that while it is possible to
define a foreign key on a partitioned table, it is not possible to
declare a foreign key that references a partitioned table.
tables and permanent tables.
</para>
<para>
......
......@@ -415,10 +415,32 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
Relation rel, Constraint *fkconstraint, Oid parentConstr,
bool recurse, bool recursing,
LOCKMODE lockmode);
static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
List **cloned);
static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
Relation partRel, List *clone, List **cloned);
static ObjectAddress addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint,
Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
int numfks, int16 *pkattnum, int16 *fkattnum,
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
bool old_check_ok);
static void addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint,
Relation rel, Relation pkrel, Oid indexOid, Oid parentConstr,
int numfks, int16 *pkattnum, int16 *fkattnum,
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
bool old_check_ok, LOCKMODE lockmode);
static void CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
Relation partitionRel);
static void CloneFkReferenced(Relation parentRel, Relation partitionRel);
static void CloneFkReferencing(List **wqueue, Relation parentRel,
Relation partRel);
static void createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
Constraint *fkconstraint, Oid constraintOid,
Oid indexOid);
static void createForeignKeyActionTriggers(Relation rel, Oid refRelOid,
Constraint *fkconstraint, Oid constraintOid,
Oid indexOid);
static bool tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
Oid partRelid,
Oid parentConstrOid, int numfks,
AttrNumber *mapped_conkey, AttrNumber *confkey,
Oid *conpfeqop);
static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
......@@ -501,6 +523,8 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
Relation partitionTbl);
static void update_relispartition(Relation classRel, Oid relationId,
bool newval);
static List *GetParentedForeignKeyRefs(Relation partition);
static void ATDetachCheckNoForeignKeyRefs(Relation partition);
/* ----------------------------------------------------------------
......@@ -1083,7 +1107,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
* And foreign keys too. Note that because we're freshly creating the
* table, there is no need to verify these new constraints.
*/
CloneForeignKeyConstraints(parentId, relationId, NULL);
CloneForeignKeyConstraints(NULL, parent, rel);
table_close(parent, NoLock);
}
......@@ -3563,7 +3587,8 @@ AlterTableGetLockLevel(List *cmds)
/*
* Removing constraints can affect SELECTs that have been
* optimised assuming the constraint holds true.
* optimised assuming the constraint holds true. See also
* CloneFkReferenced.
*/
case AT_DropConstraint: /* as DROP INDEX */
case AT_DropNotNull: /* may change some SQL plans */
......@@ -7224,9 +7249,6 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
case CONSTR_FOREIGN:
/*
* Note that we currently never recurse for FK constraints, so the
* "recurse" flag is silently ignored.
*
* Assign or validate constraint name
*/
if (newConstraint->conname)
......@@ -7444,6 +7466,13 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Subroutine for ATExecAddConstraint. Must already hold exclusive
* lock on the rel, and have done appropriate validity checks for it.
* We do permissions checks here, however.
*
* When the referenced or referencing tables (or both) are partitioned,
* multiple pg_constraint rows are required -- one for each partitioned table
* and each partition on each side (fortunately, not one for every combination
* thereof). We also need action triggers on each leaf partition on the
* referenced side, and check triggers on each leaf partition on the
* referencing side.
*/
static ObjectAddress
ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
......@@ -7459,12 +7488,10 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Oid pfeqoperators[INDEX_MAX_KEYS];
Oid ppeqoperators[INDEX_MAX_KEYS];
Oid ffeqoperators[INDEX_MAX_KEYS];
bool connoinherit;
int i;
int numfks,
numpks;
Oid indexOid;
Oid constrOid;
bool old_check_ok;
ObjectAddress address;
ListCell *old_pfeqop_item = list_head(fkconstraint->old_conpfeqop);
......@@ -7482,12 +7509,6 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Validity checks (permission checks wait till we have the column
* numbers)
*/
if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot reference partitioned table \"%s\"",
RelationGetRelationName(pkrel))));
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
if (!recurse)
......@@ -7505,7 +7526,8 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
errdetail("This feature is not yet supported on partitioned tables.")));
}
if (pkrel->rd_rel->relkind != RELKIND_RELATION)
if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("referenced relation \"%s\" is not a table",
......@@ -7741,8 +7763,7 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop)))
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("foreign key constraint \"%s\" "
"cannot be implemented",
errmsg("foreign key constraint \"%s\" cannot be implemented",
fkconstraint->conname),
errdetail("Key columns \"%s\" and \"%s\" "
"are of incompatible types: %s and %s.",
......@@ -7830,15 +7851,126 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
}
/*
* FKs always inherit for partitioned tables, and never for legacy
* inheritance.
* Create all the constraint and trigger objects, recursing to partitions
* as necessary. First handle the referenced side.
*/
address = addFkRecurseReferenced(wqueue, fkconstraint, rel, pkrel,
indexOid,
InvalidOid, /* no parent constraint */
numfks,
pkattnum,
fkattnum,
pfeqoperators,
ppeqoperators,
ffeqoperators,
old_check_ok);
/* Now handle the referencing side. */
addFkRecurseReferencing(wqueue, fkconstraint, rel, pkrel,
indexOid,
address.objectId,
numfks,
pkattnum,
fkattnum,
pfeqoperators,
ppeqoperators,
ffeqoperators,
old_check_ok,
lockmode);
/*
* Done. Close pk table, but keep lock until we've committed.
*/
table_close(pkrel, NoLock);
return address;
}
/*
* addFkRecurseReferenced
* subroutine for ATAddForeignKeyConstraint; recurses on the referenced
* side of the constraint
*
* Create pg_constraint rows for the referenced side of the constraint,
* referencing the parent of the referencing side; also create action triggers
* on leaf partitions. If the table is partitioned, recurse to handle each
* partition.
*
* wqueue is the ALTER TABLE work queue; can be NULL when not running as part
* of an ALTER TABLE sequence.
* fkconstraint is the constraint being added.
* rel is the root referencing relation.
* pkrel is the referenced relation; might be a partition, if recursing.
* indexOid is the OID of the index (on pkrel) implementing this constraint.
* parentConstr is the OID of a parent constraint; InvalidOid if this is a
* top-level constraint.
* numfks is the number of columns in the foreign key
* pkattnum is the attnum array of referenced attributes.
* fkattnum is the attnum array of referencing attributes.
* pf/pp/ffeqoperators are OID array of operators between columns.
* old_check_ok signals that this constraint replaces an existing one that
* was already validated (thus this one doesn't need validation).
*/
static ObjectAddress
addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
Relation pkrel, Oid indexOid, Oid parentConstr,
int numfks,
int16 *pkattnum, int16 *fkattnum, Oid *pfeqoperators,
Oid *ppeqoperators, Oid *ffeqoperators, bool old_check_ok)
{
ObjectAddress address;
Oid constrOid;
char *conname;
bool conislocal;
int coninhcount;
bool connoinherit;
/*
* Verify relkind for each referenced partition. At the top level, this
* is redundant with a previous check, but we need it when recursing.
*/
if (pkrel->rd_rel->relkind != RELKIND_RELATION &&
pkrel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("referenced relation \"%s\" is not a table",
RelationGetRelationName(pkrel))));
/*
* Caller supplies us with a constraint name; however, it may be used in
* this partition, so come up with a different one in that case.
*/
if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
RelationGetRelid(rel),
fkconstraint->conname))
conname = ChooseConstraintName(RelationGetRelationName(rel),
ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
"fkey",
RelationGetNamespace(rel), NIL);
else
conname = fkconstraint->conname;
if (OidIsValid(parentConstr))
{
conislocal = false;
coninhcount = 1;
connoinherit = false;
}
else
{
conislocal = true;
coninhcount = 0;
/*
* always inherit for partitioned tables, never for legacy inheritance
*/
connoinherit = rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE;
}
/*
* Record the FK constraint in pg_constraint.
*/
constrOid = CreateConstraintEntry(fkconstraint->conname,
constrOid = CreateConstraintEntry(conname,
RelationGetNamespace(rel),
CONSTRAINT_FOREIGN,
fkconstraint->deferrable,
......@@ -7856,108 +7988,317 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
pfeqoperators,
ppeqoperators,
ffeqoperators,
numpks,
numfks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
fkconstraint->fk_matchtype,
NULL, /* no exclusion constraint */
NULL, /* no check constraint */
NULL,
true, /* islocal */
0, /* inhcount */
conislocal, /* islocal */
coninhcount, /* inhcount */
connoinherit, /* conNoInherit */
false); /* is_internal */
ObjectAddressSet(address, ConstraintRelationId, constrOid);
/*
* Create the triggers that will enforce the constraint. We only want the
* action triggers to appear for the parent partitioned relation, even
* though the constraints also exist below.
* Mark the child constraint as part of the parent constraint; it must not
* be dropped on its own. (This constraint is deleted when the partition
* is detached, but a special check needs to occur that the partition
* contains no referenced values.)
*/
if (OidIsValid(parentConstr))
{
ObjectAddress referenced;
ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
recordDependencyOn(&address, &referenced, DEPENDENCY_INTERNAL);
}
/* make new constraint visible, in case we add more */
CommandCounterIncrement();
/*
* If the referenced table is a plain relation, create the action triggers
* that enforce the constraint.
*/
createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
constrOid, indexOid, !recursing);
if (pkrel->rd_rel->relkind == RELKIND_RELATION)
{
createForeignKeyActionTriggers(rel, RelationGetRelid(pkrel),
fkconstraint,
constrOid, indexOid);
}
/*
* If the referenced table is partitioned, recurse on ourselves to handle
* each partition. We need one pg_constraint row created for each
* partition in addition to the pg_constraint row for the parent table.
*/
if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
PartitionDesc pd = RelationGetPartitionDesc(pkrel);
for (int i = 0; i < pd->nparts; i++)
{
Relation partRel;
AttrNumber *map;
AttrNumber *mapped_pkattnum;
Oid partIndexId;
partRel = table_open(pd->oids[i], ShareRowExclusiveLock);
/*
* Map the attribute numbers in the referenced side of the FK
* definition to match the partition's column layout.
*/
map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
RelationGetDescr(pkrel),
gettext_noop("could not convert row type"));
if (map)
{
mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
for (int j = 0; j < numfks; j++)
mapped_pkattnum[j] = map[pkattnum[j] - 1];
}
else
mapped_pkattnum = pkattnum;
/* do the deed */
partIndexId = index_get_partition(partRel, indexOid);
if (!OidIsValid(partIndexId))
elog(ERROR, "index for %u not found in partition %s",
indexOid, RelationGetRelationName(partRel));
addFkRecurseReferenced(wqueue, fkconstraint, rel, partRel,
partIndexId, constrOid, numfks,
mapped_pkattnum, fkattnum,
pfeqoperators, ppeqoperators, ffeqoperators,
old_check_ok);
/* Done -- clean up (but keep the lock) */
table_close(partRel, NoLock);
if (map)
{
pfree(mapped_pkattnum);
pfree(map);
}
}
}
return address;
}
/*
* addFkRecurseReferencing
* subroutine for ATAddForeignKeyConstraint and CloneFkReferencing
*
* If the referencing relation is a plain relation, create the necessary check
* triggers that implement the constraint, and set up for Phase 3 constraint
* verification. If the referencing relation is a partitioned table, then
* we create a pg_constraint row for it and recurse on this routine for each
* partition.
*
* We assume that the referenced relation is locked against concurrent
* deletions. If it's a partitioned relation, every partition must be so
* locked.
*
* wqueue is the ALTER TABLE work queue; can be NULL when not running as part
* of an ALTER TABLE sequence.
* fkconstraint is the constraint being added.
* rel is the referencing relation; might be a partition, if recursing.
* pkrel is the root referenced relation.
* indexOid is the OID of the index (on pkrel) implementing this constraint.
* parentConstr is the OID of the parent constraint (there is always one).
* numfks is the number of columns in the foreign key
* pkattnum is the attnum array of referenced attributes.
* fkattnum is the attnum array of referencing attributes.
* pf/pp/ffeqoperators are OID array of operators between columns.
* old_check_ok signals that this constraint replaces an existing one that
* was already validated (thus this one doesn't need validation).
* lockmode is the lockmode to acquire on partitions when recursing.
*/
static void
addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
Relation pkrel, Oid indexOid, Oid parentConstr,
int numfks, int16 *pkattnum, int16 *fkattnum,
Oid *pfeqoperators, Oid *ppeqoperators, Oid *ffeqoperators,
bool old_check_ok, LOCKMODE lockmode)
{
AssertArg(OidIsValid(parentConstr));
if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("foreign keys constraints are not supported on foreign tables")));
/*
* If the referencing relation is a plain table, add the check triggers to
* it and, if necessary, schedule it to be checked in Phase 3.
*
* If the relation is partitioned, drill down to do it to its partitions.
*/
if (rel->rd_rel->relkind == RELKIND_RELATION)
{
createForeignKeyCheckTriggers(RelationGetRelid(rel),
RelationGetRelid(pkrel),
fkconstraint,
parentConstr,
indexOid);
/*
* Tell Phase 3 to check that the constraint is satisfied by existing
* rows. We can skip this during table creation, when requested explicitly
* by specifying NOT VALID in an ADD FOREIGN KEY command, and when we're
* recreating a constraint following a SET DATA TYPE operation that did
* not impugn its validity.
* rows. We can skip this during table creation, when requested
* explicitly by specifying NOT VALID in an ADD FOREIGN KEY command,
* and when we're recreating a constraint following a SET DATA TYPE
* operation that did not impugn its validity.
*/
if (!old_check_ok && !fkconstraint->skip_validation)
if (wqueue && !old_check_ok && !fkconstraint->skip_validation)
{
NewConstraint *newcon;
AlteredTableInfo *tab;
tab = ATGetQueueEntry(wqueue, rel);
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
newcon->name = fkconstraint->conname;
newcon->name = get_constraint_name(parentConstr);
newcon->contype = CONSTR_FOREIGN;
newcon->refrelid = RelationGetRelid(pkrel);
newcon->refindid = indexOid;
newcon->conid = constrOid;
newcon->conid = parentConstr;
newcon->qual = (Node *) fkconstraint;
tab->constraints = lappend(tab->constraints, newcon);
}
}
else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
PartitionDesc pd = RelationGetPartitionDesc(rel);
/*
* When called on a partitioned table, recurse to create the constraint on
* the partitions also.
* Recurse to take appropriate action on each partition; either we
* find an existing constraint to reparent to ours, or we create a new
* one.
*/
if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
PartitionDesc partdesc;
Relation pg_constraint;
List *cloned = NIL;
ListCell *cell;
pg_constraint = table_open(ConstraintRelationId, RowExclusiveLock);
partdesc = RelationGetPartitionDesc(rel);
for (i = 0; i < partdesc->nparts; i++)
for (int i = 0; i < pd->nparts; i++)
{
Oid partitionId = partdesc->oids[i];
Oid partitionId = pd->oids[i];
Relation partition = table_open(partitionId, lockmode);
List *partFKs;
AttrNumber *attmap;
AttrNumber mapped_fkattnum[INDEX_MAX_KEYS];
bool attached;
char *conname;
Oid constrOid;
ObjectAddress address,
referenced;
ListCell *cell;
CheckTableNotInUse(partition, "ALTER TABLE");
CloneFkReferencing(pg_constraint, rel, partition,
list_make1_oid(constrOid),
&cloned);
attmap = convert_tuples_by_name_map(RelationGetDescr(partition),
RelationGetDescr(rel),
gettext_noop("could not convert row type"));
for (int j = 0; j < numfks; j++)
mapped_fkattnum[j] = attmap[fkattnum[j] - 1];
/* Check whether an existing constraint can be repurposed */
partFKs = copyObject(RelationGetFKeyList(partition));
attached = false;
foreach(cell, partFKs)
{
ForeignKeyCacheInfo *fk;
fk = lfirst_node(ForeignKeyCacheInfo, cell);
if (tryAttachPartitionForeignKey(fk,
partitionId,
parentConstr,
numfks,
mapped_fkattnum,
pkattnum,
pfeqoperators))
{
attached = true;
break;
}
}
if (attached)
{
table_close(partition, NoLock);
continue;
}
table_close(pg_constraint, RowExclusiveLock);
foreach(cell, cloned)
{
ClonedConstraint *cc = (ClonedConstraint *) lfirst(cell);
Relation partition = table_open(cc->relid, lockmode);
AlteredTableInfo *childtab;
NewConstraint *newcon;
/*
* No luck finding a good constraint to reuse; create our own.
*/
if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
RelationGetRelid(partition),
fkconstraint->conname))
conname = ChooseConstraintName(RelationGetRelationName(partition),
ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
"fkey",
RelationGetNamespace(partition), NIL);
else
conname = fkconstraint->conname;
constrOid =
CreateConstraintEntry(conname,
RelationGetNamespace(partition),
CONSTRAINT_FOREIGN,
fkconstraint->deferrable,
fkconstraint->initdeferred,
fkconstraint->initially_valid,
parentConstr,
partitionId,
mapped_fkattnum,
numfks,
numfks,
InvalidOid,
indexOid,
RelationGetRelid(pkrel),
pkattnum,
pfeqoperators,
ppeqoperators,
ffeqoperators,
numfks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
fkconstraint->fk_matchtype,
NULL,
NULL,
NULL,
false,
1,
false,
false);
/* Find or create work queue entry for this partition */
childtab = ATGetQueueEntry(wqueue, partition);
/*
* Give this constraint partition-type dependencies on the parent
* constraint as well as the table.
*/
ObjectAddressSet(address, ConstraintRelationId, constrOid);
ObjectAddressSet(referenced, ConstraintRelationId, parentConstr);
recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
ObjectAddressSet(referenced, RelationRelationId, partitionId);
recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
newcon->name = cc->constraint->conname;
newcon->contype = CONSTR_FOREIGN;
newcon->refrelid = cc->refrelid;
newcon->refindid = cc->conindid;
newcon->conid = cc->conid;
newcon->qual = (Node *) fkconstraint;
/* Make all this visible before recursing */
CommandCounterIncrement();
childtab->constraints = lappend(childtab->constraints, newcon);
/* call ourselves to finalize the creation and we're done */
addFkRecurseReferencing(wqueue, fkconstraint, partition, pkrel,
indexOid,
constrOid,
numfks,
pkattnum,
mapped_fkattnum,
pfeqoperators,
ppeqoperators,
ffeqoperators,
old_check_ok,
lockmode);
table_close(partition, lockmode);
table_close(partition, NoLock);
}
}
/*
* Close pk table, but keep lock until we've committed.
*/
table_close(pkrel, NoLock);
return address;
}
/*
......@@ -7965,77 +8306,219 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* Clone foreign keys from a partitioned table to a newly acquired
* partition.
*
* relationId is a partition of parentId, so we can be certain that it has the
* same columns with the same datatypes. The columns may be in different
* partitionRel is a partition of parentRel, so we can be certain that it has
* the same columns with the same datatypes. The columns may be in different
* order, though.
*
* The *cloned list is appended ClonedConstraint elements describing what was
* created, for the purposes of validating the constraint in ALTER TABLE's
* Phase 3.
* wqueue must be passed to set up phase 3 constraint checking, unless the
* referencing-side partition is known to be empty (such as in CREATE TABLE /
* PARTITION OF).
*/
static void
CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
CloneForeignKeyConstraints(List **wqueue, Relation parentRel,
Relation partitionRel)
{
/* This only works for declarative partitioning */
Assert(parentRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
/*
* Clone constraints for which the parent is on the referenced side.
*/
CloneFkReferenced(parentRel, partitionRel);
/*
* Now clone constraints where the parent is on the referencing side.
*/
CloneFkReferencing(wqueue, parentRel, partitionRel);
}
/*
* CloneFkReferenced
* Subroutine for CloneForeignKeyConstraints
*
* Find all the FKs that have the parent relation on the referenced side;
* clone those constraints to the given partition. This is to be called
* when the partition is being created or attached.
*
* This recurses to partitions, if the relation being attached is partitioned.
* Recursion is done by calling addFkRecurseReferenced.
*/
static void
CloneFkReferenced(Relation parentRel, Relation partitionRel)
{
Relation pg_constraint;
Relation parentRel;
Relation rel;
ScanKeyData key;
AttrNumber *attmap;
ListCell *cell;
SysScanDesc scan;
ScanKeyData key[2];
HeapTuple tuple;
List *clone = NIL;
parentRel = table_open(parentId, NoLock); /* already got lock */
/* see ATAddForeignKeyConstraint about lock level */
rel = table_open(relationId, AccessExclusiveLock);
/*
* Search for any constraints where this partition is in the referenced
* side. However, we must ignore any constraint whose parent constraint
* is also going to be cloned, to avoid duplicates. So do it in two
* steps: first construct the list of constraints to clone, then go over
* that list cloning those whose parents are not in the list. (We must
* not rely on the parent being seen first, since the catalog scan could
* return children first.)
*/
pg_constraint = table_open(ConstraintRelationId, RowShareLock);
/* Obtain the list of constraints to clone or attach */
ScanKeyInit(&key,
Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(parentId));
scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
NULL, 1, &key);
ScanKeyInit(&key[0],
Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(parentRel)));
ScanKeyInit(&key[1],
Anum_pg_constraint_contype, BTEqualStrategyNumber,
F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
/* This is a seqscan, as we don't have a usable index ... */
scan = systable_beginscan(pg_constraint, InvalidOid, true,
NULL, 2, key);
while ((tuple = systable_getnext(scan)) != NULL)
{
Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
clone = lappend_oid(clone, oid);
/* Only try to clone the top-level constraint; skip child ones. */
if (constrForm->conparentid != InvalidOid)
continue;
clone = lappend_oid(clone, constrForm->oid);
}
systable_endscan(scan);
table_close(pg_constraint, RowShareLock);
attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel),
RelationGetDescr(parentRel),
gettext_noop("could not convert row type"));
foreach(cell, clone)
{
Oid constrOid = lfirst_oid(cell);
Form_pg_constraint constrForm;
Relation fkRel;
Oid indexOid;
Oid partIndexId;
int numfks;
AttrNumber conkey[INDEX_MAX_KEYS];
AttrNumber mapped_confkey[INDEX_MAX_KEYS];
AttrNumber confkey[INDEX_MAX_KEYS];
Oid conpfeqop[INDEX_MAX_KEYS];
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
Constraint *fkconstraint;
/* Do the actual work, recursing to partitions as needed */
CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
tuple = SearchSysCache1(CONSTROID, constrOid);
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for constraint %u", constrOid);
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
/* We're done. Clean up */
table_close(parentRel, NoLock);
table_close(rel, NoLock); /* keep lock till commit */
table_close(pg_constraint, RowShareLock);
/*
* Because we're only expanding the key space at the referenced side,
* we don't need to prevent any operation in the referencing table, so
* AccessShareLock suffices (assumes that dropping the constraint
* acquires AEL).
*/
fkRel = table_open(constrForm->conrelid, AccessShareLock);
indexOid = constrForm->conindid;
DeconstructFkConstraintRow(tuple,
&numfks,
conkey,
confkey,
conpfeqop,
conppeqop,
conffeqop);
for (int i = 0; i < numfks; i++)
mapped_confkey[i] = attmap[confkey[i] - 1];
fkconstraint = makeNode(Constraint);
/* for now this is all we need */
fkconstraint->conname = NameStr(constrForm->conname);
fkconstraint->fk_upd_action = constrForm->confupdtype;
fkconstraint->fk_del_action = constrForm->confdeltype;
fkconstraint->deferrable = constrForm->condeferrable;
fkconstraint->initdeferred = constrForm->condeferred;
fkconstraint->initially_valid = true;
fkconstraint->fk_matchtype = constrForm->confmatchtype;
/* set up colnames that are used to generate the constraint name */
for (int i = 0; i < numfks; i++)
{
Form_pg_attribute att;
att = TupleDescAttr(RelationGetDescr(fkRel),
conkey[i] - 1);
fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
makeString(NameStr(att->attname)));
}
/*
* Add the new foreign key constraint pointing to the new partition.
* Because this new partition appears in the referenced side of the
* constraint, we don't need to set up for Phase 3 check.
*/
partIndexId = index_get_partition(partitionRel, indexOid);
if (!OidIsValid(partIndexId))
elog(ERROR, "index for %u not found in partition %s",
indexOid, RelationGetRelationName(partitionRel));
addFkRecurseReferenced(NULL,
fkconstraint,
fkRel,
partitionRel,
partIndexId,
constrOid,
numfks,
mapped_confkey,
conkey,
conpfeqop,
conppeqop,
conffeqop,
true);
table_close(fkRel, NoLock);
ReleaseSysCache(tuple);
}
}
/*
* CloneFkReferencing
* Recursive subroutine for CloneForeignKeyConstraints, referencing side
* Subroutine for CloneForeignKeyConstraints
*
* Clone the given list of FK constraints when a partition is attached on the
* referencing side of those constraints.
* For each FK constraint of the parent relation in the given list, find an
* equivalent constraint in its partition relation that can be reparented;
* if one cannot be found, create a new constraint in the partition as its
* child.
*
* When cloning foreign keys to a partition, it may happen that equivalent
* constraints already exist in the partition for some of them. We can skip
* creating a clone in that case, and instead just attach the existing
* constraint to the one in the parent.
*
* This function recurses to partitions, if the new partition is partitioned;
* of course, only do this for FKs that were actually cloned.
* If wqueue is given, it is used to set up phase-3 verification for each
* cloned constraint; if omitted, we assume that such verification is not
* needed (example: the partition is being created anew).
*/
static void
CloneFkReferencing(Relation pg_constraint, Relation parentRel,
Relation partRel, List *clone, List **cloned)
CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
{
AttrNumber *attmap;
List *partFKs;
List *subclone = NIL;
List *clone = NIL;
ListCell *cell;
/* obtain a list of constraints that we need to clone */
foreach(cell, RelationGetFKeyList(parentRel))
{
ForeignKeyCacheInfo *fk = lfirst(cell);
clone = lappend_oid(clone, fk->conoid);
}
/*
* Silently do nothing if there's nothing to do. In particular, this
* avoids throwing a spurious error for foreign tables.
*/
if (clone == NIL)
return;
if (partRel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("foreign keys constraints are not supported on foreign tables")));
/*
* The constraint key may differ, if the columns in the partition are
* different. This map is used to convert them.
......@@ -8050,6 +8533,7 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
{
Oid parentConstrOid = lfirst_oid(cell);
Form_pg_constraint constrForm;
Relation pkrel;
HeapTuple tuple;
int numfks;
AttrNumber conkey[INDEX_MAX_KEYS];
......@@ -8059,81 +8543,221 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
Constraint *fkconstraint;
bool attach_it;
bool attached;
Oid indexOid;
Oid constrOid;
ObjectAddress parentAddr,
childAddr,
childTableAddr;
ObjectAddress address,
referenced;
ListCell *cell;
int i;
tuple = SearchSysCache1(CONSTROID, parentConstrOid);
if (!tuple)
elog(ERROR, "cache lookup failed for constraint %u",
parentConstrOid);
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
tuple = SearchSysCache1(CONSTROID, parentConstrOid);
if (!tuple)
elog(ERROR, "cache lookup failed for constraint %u",
parentConstrOid);
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
/* Don't clone constraints whose parents are being cloned */
if (list_member_oid(clone, constrForm->conparentid))
{
ReleaseSysCache(tuple);
continue;
}
/*
* Need to prevent concurrent deletions. If pkrel is a partitioned
* relation, that means to lock all partitions.
*/
pkrel = table_open(constrForm->confrelid, ShareRowExclusiveLock);
if (pkrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
(void) find_all_inheritors(RelationGetRelid(pkrel),
ShareRowExclusiveLock, NULL);
DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
conpfeqop, conppeqop, conffeqop);
for (int i = 0; i < numfks; i++)
mapped_conkey[i] = attmap[conkey[i] - 1];
/*
* Before creating a new constraint, see whether any existing FKs are
* fit for the purpose. If one is, attach the parent constraint to
* it, and don't clone anything. This way we avoid the expensive
* verification step and don't end up with a duplicate FK, and we
* don't need to recurse to partitions for this constraint.
*/
attached = false;
foreach(cell, partFKs)
{
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
if (tryAttachPartitionForeignKey(fk,
RelationGetRelid(partRel),
parentConstrOid,
numfks,
mapped_conkey,
confkey,
conpfeqop))
{
attached = true;
table_close(pkrel, NoLock);
break;
}
}
if (attached)
{
ReleaseSysCache(tuple);
continue;
}
/* No dice. Set up to create our own constraint */
fkconstraint = makeNode(Constraint);
if (ConstraintNameIsUsed(CONSTRAINT_RELATION,
RelationGetRelid(partRel),
NameStr(constrForm->conname)))
fkconstraint->conname =
ChooseConstraintName(RelationGetRelationName(partRel),
ChooseForeignKeyConstraintNameAddition(fkconstraint->fk_attrs),
"fkey",
RelationGetNamespace(partRel), NIL);
else
fkconstraint->conname = NameStr(constrForm->conname);
fkconstraint->fk_upd_action = constrForm->confupdtype;
fkconstraint->fk_del_action = constrForm->confdeltype;
fkconstraint->deferrable = constrForm->condeferrable;
fkconstraint->initdeferred = constrForm->condeferred;
fkconstraint->fk_matchtype = constrForm->confmatchtype;
for (int i = 0; i < numfks; i++)
{
Form_pg_attribute att;
att = TupleDescAttr(RelationGetDescr(partRel),
mapped_conkey[i] - 1);
fkconstraint->fk_attrs = lappend(fkconstraint->fk_attrs,
makeString(NameStr(att->attname)));
}
/* only foreign keys */
if (constrForm->contype != CONSTRAINT_FOREIGN)
{
indexOid = constrForm->conindid;
constrOid =
CreateConstraintEntry(fkconstraint->conname,
constrForm->connamespace,
CONSTRAINT_FOREIGN,
fkconstraint->deferrable,
fkconstraint->initdeferred,
constrForm->convalidated,
parentConstrOid,
RelationGetRelid(partRel),
mapped_conkey,
numfks,
numfks,
InvalidOid, /* not a domain constraint */
indexOid,
constrForm->confrelid, /* same foreign rel */
confkey,
conpfeqop,
conppeqop,
conffeqop,
numfks,
fkconstraint->fk_upd_action,
fkconstraint->fk_del_action,
fkconstraint->fk_matchtype,
NULL,
NULL,
NULL,
false, /* islocal */
1, /* inhcount */
false, /* conNoInherit */
true);
/* Set up partition dependencies for the new constraint */
ObjectAddressSet(address, ConstraintRelationId, constrOid);
ObjectAddressSet(referenced, ConstraintRelationId, parentConstrOid);
recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI);
ObjectAddressSet(referenced, RelationRelationId,
RelationGetRelid(partRel));
recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC);
/* Done with the cloned constraint's tuple */
ReleaseSysCache(tuple);
continue;
}
ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
/* Make all this visible before recursing */
CommandCounterIncrement();
DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
conpfeqop, conppeqop, conffeqop);
for (i = 0; i < numfks; i++)
mapped_conkey[i] = attmap[conkey[i] - 1];
addFkRecurseReferencing(wqueue,
fkconstraint,
partRel,
pkrel,
indexOid,
constrOid,
numfks,
confkey,
mapped_conkey,
conpfeqop,
conppeqop,
conffeqop,
false, /* no old check exists */
AccessExclusiveLock);
table_close(pkrel, NoLock);
}
}
/*
* Before creating a new constraint, see whether any existing FKs are
* fit for the purpose. If one is, attach the parent constraint to it,
* and don't clone anything. This way we avoid the expensive
* verification step and don't end up with a duplicate FK. This also
* means we don't consider this constraint when recursing to
* partitions.
/*
* When the parent of a partition receives [the referencing side of] a foreign
* key, we must propagate that foreign key to the partition. However, the
* partition might already have an equivalent foreign key; this routine
* compares the given ForeignKeyCacheInfo (in the partition) to the FK defined
* by the other parameters. If they are equivalent, create the link between
* the two constraints and return true.
*
* If the given FK does not match the one defined by rest of the params,
* return false.
*/
attach_it = false;
foreach(cell, partFKs)
{
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
Form_pg_constraint partConstr;
static bool
tryAttachPartitionForeignKey(ForeignKeyCacheInfo *fk,
Oid partRelid,
Oid parentConstrOid,
int numfks,
AttrNumber *mapped_conkey,
AttrNumber *confkey,
Oid *conpfeqop)
{
HeapTuple parentConstrTup;
Form_pg_constraint parentConstr;
HeapTuple partcontup;
Form_pg_constraint partConstr;
Relation trigrel;
HeapTuple trigtup;
SysScanDesc scan;
ScanKeyData key;
SysScanDesc scan;
HeapTuple trigtup;
attach_it = true;
parentConstrTup = SearchSysCache1(CONSTROID,
ObjectIdGetDatum(parentConstrOid));
if (!parentConstrTup)
elog(ERROR, "cache lookup failed for constraint %u", parentConstrOid);
parentConstr = (Form_pg_constraint) GETSTRUCT(parentConstrTup);
/*
* Do some quick & easy initial checks. If any of these fail, we
* cannot use this constraint, but keep looking.
* Do some quick & easy initial checks. If any of these fail, we cannot
* use this constraint.
*/
if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
if (fk->confrelid != parentConstr->confrelid || fk->nkeys != numfks)
{
attach_it = false;
continue;
ReleaseSysCache(parentConstrTup);
return false;
}
for (i = 0; i < numfks; i++)
for (int i = 0; i < numfks; i++)
{
if (fk->conkey[i] != mapped_conkey[i] ||
fk->confkey[i] != confkey[i] ||
fk->conpfeqop[i] != conpfeqop[i])
{
attach_it = false;
break;
ReleaseSysCache(parentConstrTup);
return false;
}
}
if (!attach_it)
continue;
/*
* Looks good so far; do some more extensive checks. Presumably
* the check for 'convalidated' could be dropped, since we don't
* really care about that, but let's be careful for now.
* Looks good so far; do some more extensive checks. Presumably the check
* for 'convalidated' could be dropped, since we don't really care about
* that, but let's be careful for now.
*/
partcontup = SearchSysCache1(CONSTROID,
ObjectIdGetDatum(fk->conoid));
......@@ -8143,28 +8767,28 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
if (OidIsValid(partConstr->conparentid) ||
!partConstr->convalidated ||
partConstr->condeferrable != constrForm->condeferrable ||
partConstr->condeferred != constrForm->condeferred ||
partConstr->confupdtype != constrForm->confupdtype ||
partConstr->confdeltype != constrForm->confdeltype ||
partConstr->confmatchtype != constrForm->confmatchtype)
partConstr->condeferrable != parentConstr->condeferrable ||
partConstr->condeferred != parentConstr->condeferred ||
partConstr->confupdtype != parentConstr->confupdtype ||
partConstr->confdeltype != parentConstr->confdeltype ||
partConstr->confmatchtype != parentConstr->confmatchtype)
{
ReleaseSysCache(parentConstrTup);
ReleaseSysCache(partcontup);
attach_it = false;
continue;
return false;
}
ReleaseSysCache(partcontup);
ReleaseSysCache(parentConstrTup);
/*
* Looks good! Attach this constraint. The action triggers in
* the new partition become redundant -- the parent table already
* has equivalent ones, and those will be able to reach the
* partition. Remove the ones in the partition. We identify them
* because they have our constraint OID, as well as being on the
* referenced rel.
* Looks good! Attach this constraint. The action triggers in the new
* partition become redundant -- the parent table already has equivalent
* ones, and those will be able to reach the partition. Remove the ones
* in the partition. We identify them because they have our constraint
* OID, as well as being on the referenced rel.
*/
trigrel = heap_open(TriggerRelationId, RowExclusiveLock);
trigrel = table_open(TriggerRelationId, RowExclusiveLock);
ScanKeyInit(&key,
Anum_pg_trigger_tgconstraint,
BTEqualStrategyNumber, F_OIDEQ,
......@@ -8183,11 +8807,11 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
continue;
/*
* The constraint is originally set up to contain this trigger
* as an implementation object, so there's a dependency record
* that links the two; however, since the trigger is no longer
* needed, we remove the dependency link in order to be able
* to drop the trigger while keeping the constraint intact.
* The constraint is originally set up to contain this trigger as an
* implementation object, so there's a dependency record that links
* the two; however, since the trigger is no longer needed, we remove
* the dependency link in order to be able to drop the trigger while
* keeping the constraint intact.
*/
deleteDependencyRecordsFor(TriggerRelationId,
trgform->oid,
......@@ -8204,123 +8828,12 @@ CloneFkReferencing(Relation pg_constraint, Relation parentRel,
systable_endscan(scan);
table_close(trigrel, RowExclusiveLock);
ConstraintSetParentConstraint(fk->conoid, parentConstrOid,
RelationGetRelid(partRel));
ConstraintSetParentConstraint(fk->conoid, parentConstrOid, partRelid);
CommandCounterIncrement();
attach_it = true;
break;
}
/*
* If we attached to an existing constraint, there is no need to
* create a new one. In fact, there's no need to recurse for this
* constraint to partitions, either.
*/
if (attach_it)
{
ReleaseSysCache(tuple);
continue;
}
constrOid =
CreateConstraintEntry(NameStr(constrForm->conname),
constrForm->connamespace,
CONSTRAINT_FOREIGN,
constrForm->condeferrable,
constrForm->condeferred,
constrForm->convalidated,
parentConstrOid,
RelationGetRelid(partRel),
mapped_conkey,
numfks,
numfks,
InvalidOid, /* not a domain constraint */
constrForm->conindid, /* same index */
constrForm->confrelid, /* same foreign rel */
confkey,
conpfeqop,
conppeqop,
conffeqop,
numfks,
constrForm->confupdtype,
constrForm->confdeltype,
constrForm->confmatchtype,
NULL,
NULL,
NULL,
false,
1, false, true);
subclone = lappend_oid(subclone, constrOid);
/* Set up partition dependencies for the new constraint */
ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
recordDependencyOn(&childAddr, &parentAddr,
DEPENDENCY_PARTITION_PRI);
ObjectAddressSet(childTableAddr, RelationRelationId,
RelationGetRelid(partRel));
recordDependencyOn(&childAddr, &childTableAddr,
DEPENDENCY_PARTITION_SEC);
fkconstraint = makeNode(Constraint);
/* for now this is all we need */
fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
fkconstraint->fk_upd_action = constrForm->confupdtype;
fkconstraint->fk_del_action = constrForm->confdeltype;
fkconstraint->deferrable = constrForm->condeferrable;
fkconstraint->initdeferred = constrForm->condeferred;
createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
constrOid, constrForm->conindid, false);
if (cloned)
{
ClonedConstraint *newc;
/*
* Feed back caller about the constraints we created, so that they
* can set up constraint verification.
*/
newc = palloc(sizeof(ClonedConstraint));
newc->relid = RelationGetRelid(partRel);
newc->refrelid = constrForm->confrelid;
newc->conindid = constrForm->conindid;
newc->conid = constrOid;
newc->constraint = fkconstraint;
*cloned = lappend(*cloned, newc);
}
ReleaseSysCache(tuple);
}
pfree(attmap);
list_free_deep(partFKs);
/*
* If the partition is partitioned, recurse to handle any constraints that
* were cloned.
*/
if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
subclone != NIL)
{
PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
int i;
for (i = 0; i < partdesc->nparts; i++)
{
Relation childRel;
childRel = table_open(partdesc->oids[i], AccessExclusiveLock);
CloneFkReferencing(pg_constraint,
partRel,
childRel,
subclone,
cloned);
table_close(childRel, NoLock); /* keep lock till commit */
}
}
return true;
}
/*
* ALTER TABLE ALTER CONSTRAINT
*
......@@ -8440,8 +8953,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
/*
* Update deferrability of RI_FKey_noaction_del,
* RI_FKey_noaction_upd, RI_FKey_check_ins and RI_FKey_check_upd
* triggers, but not others; see createForeignKeyTriggers and
* CreateFKCheckTrigger.
* triggers, but not others; see createForeignKeyActionTriggers
* and CreateFKCheckTrigger.
*/
if (tgform->tgfoid != F_RI_FKEY_NOACTION_DEL &&
tgform->tgfoid != F_RI_FKEY_NOACTION_UPD &&
......@@ -9348,37 +9861,6 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
indexOid, false);
}
/*
* Create the triggers that implement an FK constraint.
*
* NB: if you change any trigger properties here, see also
* ATExecAlterConstraint.
*/
void
createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
Oid constraintOid, Oid indexOid, bool create_action)
{
/*
* For the referenced side, create action triggers, if requested. (If the
* referencing side is partitioned, there is still only one trigger, which
* runs on the referenced side and points to the top of the referencing
* hierarchy.)
*/
if (create_action)
createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
indexOid);
/*
* For the referencing side, create the check triggers. We only need
* these on the partitions.
*/
if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
fkconstraint, constraintOid, indexOid);
CommandCounterIncrement();
}
/*
* ALTER TABLE DROP CONSTRAINT
*
......@@ -14778,8 +15260,6 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
bool found_whole_row;
Oid defaultPartOid;
List *partBoundConstraint;
List *cloned;
ListCell *l;
/*
* We must lock the default partition if one exists, because attaching a
......@@ -14960,33 +15440,10 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
CloneRowTriggersToPartition(rel, attachrel);
/*
* Clone foreign key constraints, and setup for Phase 3 to verify them.
* Clone foreign key constraints. Callee is responsible for setting up
* for phase 3 constraint verification.
*/
cloned = NIL;
CloneForeignKeyConstraints(RelationGetRelid(rel),
RelationGetRelid(attachrel), &cloned);
foreach(l, cloned)
{
ClonedConstraint *clonedcon = lfirst(l);
NewConstraint *newcon;
Relation clonedrel;
AlteredTableInfo *parttab;
clonedrel = relation_open(clonedcon->relid, NoLock);
parttab = ATGetQueueEntry(wqueue, clonedrel);
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
newcon->name = clonedcon->constraint->conname;
newcon->contype = CONSTR_FOREIGN;
newcon->refrelid = clonedcon->refrelid;
newcon->refindid = clonedcon->conindid;
newcon->conid = clonedcon->conid;
newcon->qual = (Node *) clonedcon->constraint;
parttab->constraints = lappend(parttab->constraints, newcon);
relation_close(clonedrel, NoLock);
}
CloneForeignKeyConstraints(wqueue, rel, attachrel);
/*
* Generate partition constraint from the partition bound specification.
......@@ -15177,6 +15634,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
RelationGetRelid(attachrel));
update_relispartition(NULL, cldIdxId, true);
found = true;
CommandCounterIncrement();
break;
}
}
......@@ -15368,6 +15827,9 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
partRel = table_openrv(name, ShareUpdateExclusiveLock);
/* Ensure that foreign keys still hold after this detach */
ATDetachCheckNoForeignKeyRefs(partRel);
/* All inheritance related checks are performed within the function */
RemoveInheritance(partRel, rel);
......@@ -15486,6 +15948,28 @@ ATExecDetachPartition(Relation rel, RangeVar *name)
}
list_free_deep(fks);
/*
* Any sub-constrains that are in the referenced-side of a larger
* constraint have to be removed. This partition is no longer part of the
* key space of the constraint.
*/
foreach(cell, GetParentedForeignKeyRefs(partRel))
{
Oid constrOid = lfirst_oid(cell);
ObjectAddress constraint;
ConstraintSetParentConstraint(constrOid, InvalidOid, InvalidOid);
deleteDependencyRecordsForClass(ConstraintRelationId,
constrOid,
ConstraintRelationId,
DEPENDENCY_INTERNAL);
CommandCounterIncrement();
ObjectAddressSet(constraint, ConstraintRelationId, constrOid);
performDeletion(&constraint, DROP_RESTRICT, 0);
}
CommandCounterIncrement();
/*
* Invalidate the parent's relcache so that the partition is no longer
* included in its partition descriptor.
......@@ -15866,3 +16350,107 @@ update_relispartition(Relation classRel, Oid relationId, bool newval)
if (opened)
table_close(classRel, RowExclusiveLock);
}
/*
* Return an OID list of constraints that reference the given relation
* that are marked as having a parent constraints.
*/
static List *
GetParentedForeignKeyRefs(Relation partition)
{
Relation pg_constraint;
HeapTuple tuple;
SysScanDesc scan;
ScanKeyData key[2];
List *constraints = NIL;
/*
* If no indexes, or no columns are referenceable by FKs, we can avoid the
* scan.
*/
if (RelationGetIndexList(partition) == NIL ||
bms_is_empty(RelationGetIndexAttrBitmap(partition,
INDEX_ATTR_BITMAP_KEY)))
return NIL;
/* Search for constraints referencing this table */
pg_constraint = table_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&key[0],
Anum_pg_constraint_confrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(RelationGetRelid(partition)));
ScanKeyInit(&key[1],
Anum_pg_constraint_contype, BTEqualStrategyNumber,
F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
/* XXX This is a seqscan, as we don't have a usable index */
scan = systable_beginscan(pg_constraint, InvalidOid, true, NULL, 2, key);
while ((tuple = systable_getnext(scan)) != NULL)
{
Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
/*
* We only need to process constraints that are part of larger ones.
*/
if (!OidIsValid(constrForm->conparentid))
continue;
constraints = lappend_oid(constraints, constrForm->oid);
}
systable_endscan(scan);
table_close(pg_constraint, AccessShareLock);
return constraints;
}
/*
* During DETACH PARTITION, verify that any foreign keys pointing to the
* partitioned table would not become invalid. An error is raised if any
* referenced values exist.
*/
static void
ATDetachCheckNoForeignKeyRefs(Relation partition)
{
List *constraints;
ListCell *cell;
constraints = GetParentedForeignKeyRefs(partition);
foreach(cell, constraints)
{
Oid constrOid = lfirst_oid(cell);
HeapTuple tuple;
Form_pg_constraint constrForm;
Relation rel;
Trigger trig;
tuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(constrOid));
if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for constraint %u", constrOid);
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
Assert(OidIsValid(constrForm->conparentid));
Assert(constrForm->confrelid == RelationGetRelid(partition));
/* prevent data changes into the referencing table until commit */
rel = table_open(constrForm->conrelid, ShareLock);
MemSet(&trig, 0, sizeof(trig));
trig.tgoid = InvalidOid;
trig.tgname = NameStr(constrForm->conname);
trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
trig.tgisinternal = true;
trig.tgconstrrelid = RelationGetRelid(partition);
trig.tgconstrindid = constrForm->conindid;
trig.tgconstraint = constrForm->oid;
trig.tgdeferrable = false;
trig.tginitdeferred = false;
/* we needn't fill in remaining fields */
RI_PartitionRemove_Check(&trig, rel, partition);
ReleaseSysCache(tuple);
table_close(rel, NoLock);
}
}
......@@ -50,6 +50,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/rls.h"
#include "utils/ruleutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
......@@ -220,8 +221,8 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
Datum *vals, char *nulls);
static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
TupleTableSlot *violator, TupleDesc tupdesc,
int queryno) pg_attribute_noreturn();
TupleTableSlot *violatorslot, TupleDesc tupdesc,
int queryno, bool partgone) pg_attribute_noreturn();
/*
......@@ -348,18 +349,22 @@ RI_FKey_check(TriggerData *trigdata)
char paramname[16];
const char *querysep;
Oid queryoids[RI_MAX_NUMKEYS];
const char *pk_only;
/* ----------
* The query string built is
* SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
* SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* corresponding FK attributes.
* ----------
*/
initStringInfo(&querybuf);
pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
quoteRelationName(pkrelname, pk_rel);
appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
pk_only, pkrelname);
querysep = "WHERE";
for (int i = 0; i < riinfo->nkeys; i++)
{
......@@ -471,19 +476,23 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
char attname[MAX_QUOTED_NAME_LEN];
char paramname[16];
const char *querysep;
const char *pk_only;
Oid queryoids[RI_MAX_NUMKEYS];
/* ----------
* The query string built is
* SELECT 1 FROM ONLY <pktable> x WHERE pkatt1 = $1 [AND ...]
* SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* PK attributes themselves.
* ----------
*/
initStringInfo(&querybuf);
pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
quoteRelationName(pkrelname, pk_rel);
appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname);
appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
pk_only, pkrelname);
querysep = "WHERE";
for (int i = 0; i < riinfo->nkeys; i++)
{
......@@ -1293,6 +1302,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
RangeTblEntry *fkrte;
const char *sep;
const char *fk_only;
const char *pk_only;
int save_nestlevel;
char workmembuf[32];
int spi_result;
......@@ -1350,7 +1360,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
/*----------
* The query string built is:
* SELECT fk.keycols FROM [ONLY] relname fk
* LEFT OUTER JOIN ONLY pkrelname pk
* LEFT OUTER JOIN [ONLY] pkrelname pk
* ON (pk.pkkeycol1=fk.keycol1 [AND ...])
* WHERE pk.pkkeycol1 IS NULL AND
* For MATCH SIMPLE:
......@@ -1377,9 +1387,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
quoteRelationName(fkrelname, fk_rel);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
appendStringInfo(&querybuf,
" FROM %s%s fk LEFT OUTER JOIN ONLY %s pk ON",
fk_only, fkrelname, pkrelname);
" FROM %s%s fk LEFT OUTER JOIN %s%s pk ON",
fk_only, fkrelname, pk_only, pkrelname);
strcpy(pkattname, "pk.");
strcpy(fkattname, "fk.");
......@@ -1530,7 +1542,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
ri_ReportViolation(&fake_riinfo,
pk_rel, fk_rel,
slot, tupdesc,
RI_PLAN_CHECK_LOOKUPPK);
RI_PLAN_CHECK_LOOKUPPK, false);
ExecDropSingleTupleTableSlot(slot);
}
......@@ -1546,6 +1558,214 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
return true;
}
/*
* RI_PartitionRemove_Check -
*
* Verify no referencing values exist, when a partition is detached on
* the referenced side of a foreign key constraint.
*/
void
RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
{
const RI_ConstraintInfo *riinfo;
StringInfoData querybuf;
char *constraintDef;
char pkrelname[MAX_QUOTED_REL_NAME_LEN];
char fkrelname[MAX_QUOTED_REL_NAME_LEN];
char pkattname[MAX_QUOTED_NAME_LEN + 3];
char fkattname[MAX_QUOTED_NAME_LEN + 3];
const char *sep;
const char *fk_only;
int save_nestlevel;
char workmembuf[32];
int spi_result;
SPIPlanPtr qplan;
int i;
riinfo = ri_FetchConstraintInfo(trigger, fk_rel, false);
/*
* We don't check permissions before displaying the error message, on the
* assumption that the user detaching the partition must have enough
* privileges to examine the table contents anyhow.
*/
/*----------
* The query string built is:
* SELECT fk.keycols FROM [ONLY] relname fk
* JOIN pkrelname pk
* ON (pk.pkkeycol1=fk.keycol1 [AND ...])
* WHERE (<partition constraint>) AND
* For MATCH SIMPLE:
* (fk.keycol1 IS NOT NULL [AND ...])
* For MATCH FULL:
* (fk.keycol1 IS NOT NULL [OR ...])
*
* We attach COLLATE clauses to the operators when comparing columns
* that have different collations.
*----------
*/
initStringInfo(&querybuf);
appendStringInfoString(&querybuf, "SELECT ");
sep = "";
for (i = 0; i < riinfo->nkeys; i++)
{
quoteOneName(fkattname,
RIAttName(fk_rel, riinfo->fk_attnums[i]));
appendStringInfo(&querybuf, "%sfk.%s", sep, fkattname);
sep = ", ";
}
quoteRelationName(pkrelname, pk_rel);
quoteRelationName(fkrelname, fk_rel);
fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
"" : "ONLY ";
appendStringInfo(&querybuf,
" FROM %s%s fk JOIN %s pk ON",
fk_only, fkrelname, pkrelname);
strcpy(pkattname, "pk.");
strcpy(fkattname, "fk.");
sep = "(";
for (i = 0; i < riinfo->nkeys; i++)
{
Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
Oid pk_coll = RIAttCollation(pk_rel, riinfo->pk_attnums[i]);
Oid fk_coll = RIAttCollation(fk_rel, riinfo->fk_attnums[i]);
quoteOneName(pkattname + 3,
RIAttName(pk_rel, riinfo->pk_attnums[i]));
quoteOneName(fkattname + 3,
RIAttName(fk_rel, riinfo->fk_attnums[i]));
ri_GenerateQual(&querybuf, sep,
pkattname, pk_type,
riinfo->pf_eq_oprs[i],
fkattname, fk_type);
if (pk_coll != fk_coll)
ri_GenerateQualCollation(&querybuf, pk_coll);
sep = "AND";
}
/*
* Start the WHERE clause with the partition constraint (except if this is
* the default partition and there's no other partition, because the
* partition constraint is the empty string in that case.)
*/
constraintDef = pg_get_partconstrdef_string(RelationGetRelid(pk_rel), "pk");
if (constraintDef && constraintDef[0] != '\0')
appendStringInfo(&querybuf, ") WHERE %s AND (",
constraintDef);
else
appendStringInfo(&querybuf, ") WHERE (");
sep = "";
for (i = 0; i < riinfo->nkeys; i++)
{
quoteOneName(fkattname, RIAttName(fk_rel, riinfo->fk_attnums[i]));
appendStringInfo(&querybuf,
"%sfk.%s IS NOT NULL",
sep, fkattname);
switch (riinfo->confmatchtype)
{
case FKCONSTR_MATCH_SIMPLE:
sep = " AND ";
break;
case FKCONSTR_MATCH_FULL:
sep = " OR ";
break;
}
}
appendStringInfoChar(&querybuf, ')');
/*
* Temporarily increase work_mem so that the check query can be executed
* more efficiently. It seems okay to do this because the query is simple
* enough to not use a multiple of work_mem, and one typically would not
* have many large foreign-key validations happening concurrently. So
* this seems to meet the criteria for being considered a "maintenance"
* operation, and accordingly we use maintenance_work_mem.
*
* We use the equivalent of a function SET option to allow the setting to
* persist for exactly the duration of the check query. guc.c also takes
* care of undoing the setting on error.
*/
save_nestlevel = NewGUCNestLevel();
snprintf(workmembuf, sizeof(workmembuf), "%d", maintenance_work_mem);
(void) set_config_option("work_mem", workmembuf,
PGC_USERSET, PGC_S_SESSION,
GUC_ACTION_SAVE, true, 0, false);
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed");
/*
* Generate the plan. We don't need to cache it, and there are no
* arguments to the plan.
*/
qplan = SPI_prepare(querybuf.data, 0, NULL);
if (qplan == NULL)
elog(ERROR, "SPI_prepare returned %s for %s",
SPI_result_code_string(SPI_result), querybuf.data);
/*
* Run the plan. For safety we force a current snapshot to be used. (In
* transaction-snapshot mode, this arguably violates transaction isolation
* rules, but we really haven't got much choice.) We don't need to
* register the snapshot, because SPI_execute_snapshot will see to it. We
* need at most one tuple returned, so pass limit = 1.
*/
spi_result = SPI_execute_snapshot(qplan,
NULL, NULL,
GetLatestSnapshot(),
InvalidSnapshot,
true, false, 1);
/* Check result */
if (spi_result != SPI_OK_SELECT)
elog(ERROR, "SPI_execute_snapshot returned %s", SPI_result_code_string(spi_result));
/* Did we find a tuple that would violate the constraint? */
if (SPI_processed > 0)
{
TupleTableSlot *slot;
HeapTuple tuple = SPI_tuptable->vals[0];
TupleDesc tupdesc = SPI_tuptable->tupdesc;
RI_ConstraintInfo fake_riinfo;
slot = MakeSingleTupleTableSlot(tupdesc, &TTSOpsVirtual);
heap_deform_tuple(tuple, tupdesc,
slot->tts_values, slot->tts_isnull);
ExecStoreVirtualTuple(slot);
/*
* The columns to look at in the result tuple are 1..N, not whatever
* they are in the fk_rel. Hack up riinfo so that ri_ReportViolation
* will behave properly.
*
* In addition to this, we have to pass the correct tupdesc to
* ri_ReportViolation, overriding its normal habit of using the pk_rel
* or fk_rel's tupdesc.
*/
memcpy(&fake_riinfo, riinfo, sizeof(RI_ConstraintInfo));
for (i = 0; i < fake_riinfo.nkeys; i++)
fake_riinfo.pk_attnums[i] = i + 1;
ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
slot, tupdesc, 0, true);
}
if (SPI_finish() != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed");
/*
* Restore work_mem.
*/
AtEOXact_GUC(true, save_nestlevel);
}
/* ----------
* Local functions below
......@@ -2078,7 +2298,7 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
pk_rel, fk_rel,
newslot ? newslot : oldslot,
NULL,
qkey->constr_queryno);
qkey->constr_queryno, false);
return SPI_processed != 0;
}
......@@ -2119,7 +2339,7 @@ static void
ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
TupleTableSlot *violatorslot, TupleDesc tupdesc,
int queryno)
int queryno, bool partgone)
{
StringInfoData key_names;
StringInfoData key_values;
......@@ -2158,9 +2378,13 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
*
* Check table-level permissions next and, failing that, column-level
* privileges.
*
* When a partition at the referenced side is being detached/dropped, we
* needn't check, since the user must be the table owner anyway.
*/
if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
if (partgone)
has_perm = true;
else if (check_enable_rls(rel_oid, InvalidOid, true) != RLS_ENABLED)
{
aclresult = pg_class_aclcheck(rel_oid, GetUserId(), ACL_SELECT);
if (aclresult != ACLCHECK_OK)
......@@ -2222,7 +2446,16 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
}
}
if (onfk)
if (partgone)
ereport(ERROR,
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
errmsg("removing partition \"%s\" violates foreign key constraint \"%s\"",
RelationGetRelationName(pk_rel),
NameStr(riinfo->conname)),
errdetail("Key (%s)=(%s) still referenced from table \"%s\".",
key_names.data, key_values.data,
RelationGetRelationName(fk_rel))));
else if (onfk)
ereport(ERROR,
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
......
......@@ -1836,6 +1836,24 @@ pg_get_partition_constraintdef(PG_FUNCTION_ARGS)
PG_RETURN_TEXT_P(string_to_text(consrc));
}
/*
* pg_get_partconstrdef_string
*
* Returns the partition constraint as a C-string for the input relation, with
* the given alias. No pretty-printing.
*/
char *
pg_get_partconstrdef_string(Oid partitionId, char *aliasname)
{
Expr *constr_expr;
List *context;
constr_expr = get_partition_qual_relid(partitionId);
context = deparse_context_for(aliasname, partitionId);
return deparse_expression((Node *) constr_expr, context, true, false);
}
/*
* pg_get_constraintdef
*
......
......@@ -2452,9 +2452,12 @@ describeOneTableDetails(const char *schemaname,
" pg_catalog.pg_get_constraintdef(r.oid, true) as condef,\n"
" conrelid::pg_catalog.regclass AS ontable\n"
"FROM pg_catalog.pg_constraint r\n"
"WHERE r.conrelid = '%s' AND r.contype = 'f'\n"
"ORDER BY conname;",
"WHERE r.conrelid = '%s' AND r.contype = 'f'\n",
oid);
if (pset.sversion >= 120000)
appendPQExpBuffer(&buf, " AND conparentid = 0\n");
appendPQExpBuffer(&buf, "ORDER BY conname");
}
result = PSQLexec(buf.data);
......
......@@ -76,10 +76,6 @@ extern void find_composite_type_dependencies(Oid typeOid,
extern void check_of_type(HeapTuple typetuple);
extern void createForeignKeyTriggers(Relation rel, Oid refRelOid,
Constraint *fkconstraint, Oid constraintOid,
Oid indexOid, bool create_action);
extern void register_on_commit_action(Oid relid, OnCommitAction action);
extern void remove_on_commit_action(Oid relid);
......
......@@ -263,6 +263,8 @@ extern bool RI_FKey_fk_upd_check_required(Trigger *trigger, Relation fk_rel,
TupleTableSlot *old_slot, TupleTableSlot *new_slot);
extern bool RI_Initial_Check(Trigger *trigger,
Relation fk_rel, Relation pk_rel);
extern void RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel,
Relation pk_rel);
/* result values for RI_FKey_trigger_type: */
#define RI_TRIGGER_PK 1 /* is a trigger on the PK relation */
......
......@@ -22,6 +22,7 @@ extern char *pg_get_indexdef_string(Oid indexrelid);
extern char *pg_get_indexdef_columns(Oid indexrelid, bool pretty);
extern char *pg_get_partkeydef_columns(Oid relid, bool pretty);
extern char *pg_get_partconstrdef_string(Oid partitionId, char *aliasname);
extern char *pg_get_constraintdef_command(Oid constraintId);
extern char *deparse_expression(Node *expr, List *dpcontext,
......
Parsed test spec with 2 sessions
starting permutation: s1b s1d s1c s2b s2a s2c
step s1b: begin;
step s1d: delete from ppk1 where a = 1;
step s1c: commit;
step s2b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1);
ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s1b s1d s2b s1c s2a s2c
step s1b: begin;
step s1d: delete from ppk1 where a = 1;
step s2b: begin;
step s1c: commit;
step s2a: alter table pfk attach partition pfk1 for values in (1);
ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s1b s1d s2b s2a s1c s2c
step s1b: begin;
step s1d: delete from ppk1 where a = 1;
step s2b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1); <waiting ...>
step s1c: commit;
step s2a: <... completed>
error in steps s1c s2a: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s1b s2b s1d s1c s2a s2c
step s1b: begin;
step s2b: begin;
step s1d: delete from ppk1 where a = 1;
step s1c: commit;
step s2a: alter table pfk attach partition pfk1 for values in (1);
ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s1b s2b s1d s2a s1c s2c
step s1b: begin;
step s2b: begin;
step s1d: delete from ppk1 where a = 1;
step s2a: alter table pfk attach partition pfk1 for values in (1); <waiting ...>
step s1c: commit;
step s2a: <... completed>
error in steps s1c s2a: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s1b s2b s2a s1d s2c s1c
step s1b: begin;
step s2b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1);
step s1d: delete from ppk1 where a = 1; <waiting ...>
step s2c: commit;
step s1d: <... completed>
error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
starting permutation: s1b s2b s2a s2c s1d s1c
step s1b: begin;
step s2b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1);
step s2c: commit;
step s1d: delete from ppk1 where a = 1;
ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
starting permutation: s2b s1b s1d s1c s2a s2c
step s2b: begin;
step s1b: begin;
step s1d: delete from ppk1 where a = 1;
step s1c: commit;
step s2a: alter table pfk attach partition pfk1 for values in (1);
ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s2b s1b s1d s2a s1c s2c
step s2b: begin;
step s1b: begin;
step s1d: delete from ppk1 where a = 1;
step s2a: alter table pfk attach partition pfk1 for values in (1); <waiting ...>
step s1c: commit;
step s2a: <... completed>
error in steps s1c s2a: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s2b s1b s2a s1d s2c s1c
step s2b: begin;
step s1b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1);
step s1d: delete from ppk1 where a = 1; <waiting ...>
step s2c: commit;
step s1d: <... completed>
error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
starting permutation: s2b s1b s2a s2c s1d s1c
step s2b: begin;
step s1b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1);
step s2c: commit;
step s1d: delete from ppk1 where a = 1;
ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
starting permutation: s2b s2a s1b s1d s2c s1c
step s2b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1);
step s1b: begin;
step s1d: delete from ppk1 where a = 1; <waiting ...>
step s2c: commit;
step s1d: <... completed>
error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
starting permutation: s2b s2a s1b s2c s1d s1c
step s2b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1);
step s1b: begin;
step s2c: commit;
step s1d: delete from ppk1 where a = 1;
ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
starting permutation: s2b s2a s2c s1b s1d s1c
step s2b: begin;
step s2a: alter table pfk attach partition pfk1 for values in (1);
step s2c: commit;
step s1b: begin;
step s1d: delete from ppk1 where a = 1;
ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
Parsed test spec with 2 sessions
starting permutation: s1b s1d s2b s2i s1c s2c
step s1b: begin;
step s1d: delete from ppk where a = 1;
step s2b: begin;
step s2i: insert into pfk values (1); <waiting ...>
step s1c: commit;
step s2i: <... completed>
error in steps s1c s2i: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s1b s1d s2bs s2i s1c s2c
step s1b: begin;
step s1d: delete from ppk where a = 1;
step s2bs: begin isolation level serializable; select 1;
?column?
1
step s2i: insert into pfk values (1); <waiting ...>
step s1c: commit;
step s2i: <... completed>
error in steps s1c s2i: ERROR: could not serialize access due to concurrent update
step s2c: commit;
starting permutation: s1b s2b s1d s2i s1c s2c
step s1b: begin;
step s2b: begin;
step s1d: delete from ppk where a = 1;
step s2i: insert into pfk values (1); <waiting ...>
step s1c: commit;
step s2i: <... completed>
error in steps s1c s2i: ERROR: insert or update on table "pfk1" violates foreign key constraint "pfk_a_fkey"
step s2c: commit;
starting permutation: s1b s2bs s1d s2i s1c s2c
step s1b: begin;
step s2bs: begin isolation level serializable; select 1;
?column?
1
step s1d: delete from ppk where a = 1;
step s2i: insert into pfk values (1); <waiting ...>
step s1c: commit;
step s2i: <... completed>
error in steps s1c s2i: ERROR: could not serialize access due to concurrent update
step s2c: commit;
starting permutation: s1b s2b s2i s1d s2c s1c
step s1b: begin;
step s2b: begin;
step s2i: insert into pfk values (1);
step s1d: delete from ppk where a = 1; <waiting ...>
step s2c: commit;
step s1d: <... completed>
error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
starting permutation: s1b s2bs s2i s1d s2c s1c
step s1b: begin;
step s2bs: begin isolation level serializable; select 1;
?column?
1
step s2i: insert into pfk values (1);
step s1d: delete from ppk where a = 1; <waiting ...>
step s2c: commit;
step s1d: <... completed>
error in steps s2c s1d: ERROR: update or delete on table "ppk1" violates foreign key constraint "pfk_a_fkey1" on table "pfk"
step s1c: commit;
......@@ -24,6 +24,8 @@ test: deadlock-soft-2
test: fk-contention
test: fk-deadlock
test: fk-deadlock2
test: fk-partitioned-1
test: fk-partitioned-2
test: eval-plan-qual
test: lock-update-delete
test: lock-update-traversal
......
# Verify that cloning a foreign key constraint to a partition ensures
# that referenced values exist, even if they're being concurrently
# deleted.
setup {
drop table if exists ppk, pfk, pfk1;
create table ppk (a int primary key) partition by list (a);
create table ppk1 partition of ppk for values in (1);
insert into ppk values (1);
create table pfk (a int references ppk) partition by list (a);
create table pfk1 (a int not null);
insert into pfk1 values (1);
}
session "s1"
step "s1b" { begin; }
step "s1d" { delete from ppk1 where a = 1; }
step "s1c" { commit; }
session "s2"
step "s2b" { begin; }
step "s2a" { alter table pfk attach partition pfk1 for values in (1); }
step "s2c" { commit; }
teardown { drop table ppk, pfk, pfk1; }
permutation "s1b" "s1d" "s1c" "s2b" "s2a" "s2c"
permutation "s1b" "s1d" "s2b" "s1c" "s2a" "s2c"
permutation "s1b" "s1d" "s2b" "s2a" "s1c" "s2c"
#permutation "s1b" "s1d" "s2b" "s2a" "s2c" "s1c"
permutation "s1b" "s2b" "s1d" "s1c" "s2a" "s2c"
permutation "s1b" "s2b" "s1d" "s2a" "s1c" "s2c"
#permutation "s1b" "s2b" "s1d" "s2a" "s2c" "s1c"
#permutation "s1b" "s2b" "s2a" "s1d" "s1c" "s2c"
permutation "s1b" "s2b" "s2a" "s1d" "s2c" "s1c"
permutation "s1b" "s2b" "s2a" "s2c" "s1d" "s1c"
permutation "s2b" "s1b" "s1d" "s1c" "s2a" "s2c"
permutation "s2b" "s1b" "s1d" "s2a" "s1c" "s2c"
#permutation "s2b" "s1b" "s1d" "s2a" "s2c" "s1c"
#permutation "s2b" "s1b" "s2a" "s1d" "s1c" "s2c"
permutation "s2b" "s1b" "s2a" "s1d" "s2c" "s1c"
permutation "s2b" "s1b" "s2a" "s2c" "s1d" "s1c"
#permutation "s2b" "s2a" "s1b" "s1d" "s1c" "s2c"
permutation "s2b" "s2a" "s1b" "s1d" "s2c" "s1c"
permutation "s2b" "s2a" "s1b" "s2c" "s1d" "s1c"
permutation "s2b" "s2a" "s2c" "s1b" "s1d" "s1c"
# Make sure that FKs referencing partitioned tables actually work.
setup {
drop table if exists ppk, pfk, pfk1;
create table ppk (a int primary key) partition by list (a);
create table ppk1 partition of ppk for values in (1);
insert into ppk values (1);
create table pfk (a int references ppk) partition by list (a);
create table pfk1 partition of pfk for values in (1);
}
session "s1"
step "s1b" { begin; }
step "s1d" { delete from ppk where a = 1; }
step "s1c" { commit; }
session "s2"
step "s2b" { begin; }
step "s2bs" { begin isolation level serializable; select 1; }
step "s2i" { insert into pfk values (1); }
step "s2c" { commit; }
teardown { drop table ppk, pfk, pfk1; }
permutation "s1b" "s1d" "s2b" "s2i" "s1c" "s2c"
permutation "s1b" "s1d" "s2bs" "s2i" "s1c" "s2c"
permutation "s1b" "s2b" "s1d" "s2i" "s1c" "s2c"
permutation "s1b" "s2bs" "s1d" "s2i" "s1c" "s2c"
permutation "s1b" "s2b" "s2i" "s1d" "s2c" "s1c"
permutation "s1b" "s2bs" "s2i" "s1d" "s2c" "s1c"
......@@ -1532,19 +1532,6 @@ drop table pktable2, fktable2;
--
-- Foreign keys and partitioned tables
--
-- partitioned table in the referenced side are not allowed
CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
PARTITION BY RANGE (a, b);
-- verify with create table first ...
CREATE TABLE fk_notpartitioned_fk (a int, b int,
FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
ERROR: cannot reference partitioned table "fk_partitioned_pk"
-- and then with alter table.
CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
REFERENCES fk_partitioned_pk;
ERROR: cannot reference partitioned table "fk_partitioned_pk"
DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-- Creation of a partitioned hierarchy with irregular definitions
CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
PRIMARY KEY (a, b));
......@@ -1686,7 +1673,7 @@ CREATE TABLE fk_partitioned_fk_full (x int, y int) PARTITION BY RANGE (x);
CREATE TABLE fk_partitioned_fk_full_1 PARTITION OF fk_partitioned_fk_full DEFAULT;
INSERT INTO fk_partitioned_fk_full VALUES (1, NULL);
ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL; -- fails
ERROR: insert or update on table "fk_partitioned_fk_full" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey"
ERROR: insert or update on table "fk_partitioned_fk_full_1" violates foreign key constraint "fk_partitioned_fk_full_x_y_fkey"
DETAIL: MATCH FULL does not allow mixing of null and nonnull key values.
TRUNCATE fk_partitioned_fk_full;
ALTER TABLE fk_partitioned_fk_full ADD FOREIGN KEY (x, y) REFERENCES fk_notpartitioned_pk MATCH FULL;
......@@ -1902,7 +1889,7 @@ CREATE TABLE fk_partitioned_fk_2_2 PARTITION OF fk_partitioned_fk_2 FOR VALUES F
INSERT INTO fk_partitioned_fk_2 VALUES (1600, 601), (1600, 1601);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
FOR VALUES IN (1600);
ERROR: insert or update on table "fk_partitioned_fk_2" violates foreign key constraint "fk_partitioned_fk_a_b_fkey"
ERROR: insert or update on table "fk_partitioned_fk_2_1" violates foreign key constraint "fk_partitioned_fk_a_b_fkey"
DETAIL: Key (a, b)=(1600, 601) is not present in table "fk_notpartitioned_pk".
INSERT INTO fk_notpartitioned_pk VALUES (1600, 601), (1600, 1601);
ALTER TABLE fk_partitioned_fk ATTACH PARTITION fk_partitioned_fk_2
......@@ -2037,3 +2024,320 @@ drop cascades to table fkpart1.fk_part
drop cascades to table fkpart1.fk_part_1
drop cascades to table fkpart0.pkey
drop cascades to table fkpart0.fk_part
-- Test a partitioned table as referenced table.
-- Verify basic functionality with a regular partition creation and a partition
-- with a different column layout, as well as partitions added (created and
-- attached) after creating the foreign key.
CREATE SCHEMA fkpart3;
SET search_path TO fkpart3;
CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (0) TO (1000);
CREATE TABLE pk2 (b int, a int);
ALTER TABLE pk2 DROP COLUMN b;
ALTER TABLE pk2 ALTER a SET NOT NULL;
ALTER TABLE pk ATTACH PARTITION pk2 FOR VALUES FROM (1000) TO (2000);
CREATE TABLE fk (a int) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (0) TO (750);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
CREATE TABLE fk2 (b int, a int) ;
ALTER TABLE fk2 DROP COLUMN b;
ALTER TABLE fk ATTACH PARTITION fk2 FOR VALUES FROM (750) TO (3500);
CREATE TABLE pk3 PARTITION OF pk FOR VALUES FROM (2000) TO (3000);
CREATE TABLE pk4 (LIKE pk);
ALTER TABLE pk ATTACH PARTITION pk4 FOR VALUES FROM (3000) TO (4000);
CREATE TABLE pk5 (c int, b int, a int NOT NULL) PARTITION BY RANGE (a);
ALTER TABLE pk5 DROP COLUMN b, DROP COLUMN c;
CREATE TABLE pk51 PARTITION OF pk5 FOR VALUES FROM (4000) TO (4500);
CREATE TABLE pk52 PARTITION OF pk5 FOR VALUES FROM (4500) TO (5000);
ALTER TABLE pk ATTACH PARTITION pk5 FOR VALUES FROM (4000) TO (5000);
CREATE TABLE fk3 PARTITION OF fk FOR VALUES FROM (3500) TO (5000);
-- these should fail: referenced value not present
INSERT into fk VALUES (1);
ERROR: insert or update on table "fk1" violates foreign key constraint "fk_a_fkey"
DETAIL: Key (a)=(1) is not present in table "pk".
INSERT into fk VALUES (1000);
ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
DETAIL: Key (a)=(1000) is not present in table "pk".
INSERT into fk VALUES (2000);
ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
DETAIL: Key (a)=(2000) is not present in table "pk".
INSERT into fk VALUES (3000);
ERROR: insert or update on table "fk2" violates foreign key constraint "fk_a_fkey"
DETAIL: Key (a)=(3000) is not present in table "pk".
INSERT into fk VALUES (4000);
ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
DETAIL: Key (a)=(4000) is not present in table "pk".
INSERT into fk VALUES (4500);
ERROR: insert or update on table "fk3" violates foreign key constraint "fk_a_fkey"
DETAIL: Key (a)=(4500) is not present in table "pk".
-- insert into the referenced table, now they should work
INSERT into pk VALUES (1), (1000), (2000), (3000), (4000), (4500);
INSERT into fk VALUES (1), (1000), (2000), (3000), (4000), (4500);
-- should fail: referencing value present
DELETE FROM pk WHERE a = 1;
ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
DETAIL: Key (a)=(1) is still referenced from table "fk".
DELETE FROM pk WHERE a = 1000;
ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
DETAIL: Key (a)=(1000) is still referenced from table "fk".
DELETE FROM pk WHERE a = 2000;
ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
DETAIL: Key (a)=(2000) is still referenced from table "fk".
DELETE FROM pk WHERE a = 3000;
ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
DETAIL: Key (a)=(3000) is still referenced from table "fk".
DELETE FROM pk WHERE a = 4000;
ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
DETAIL: Key (a)=(4000) is still referenced from table "fk".
DELETE FROM pk WHERE a = 4500;
ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
DETAIL: Key (a)=(4500) is still referenced from table "fk".
UPDATE pk SET a = 2 WHERE a = 1;
ERROR: update or delete on table "pk1" violates foreign key constraint "fk_a_fkey1" on table "fk"
DETAIL: Key (a)=(1) is still referenced from table "fk".
UPDATE pk SET a = 1002 WHERE a = 1000;
ERROR: update or delete on table "pk2" violates foreign key constraint "fk_a_fkey2" on table "fk"
DETAIL: Key (a)=(1000) is still referenced from table "fk".
UPDATE pk SET a = 2002 WHERE a = 2000;
ERROR: update or delete on table "pk3" violates foreign key constraint "fk_a_fkey3" on table "fk"
DETAIL: Key (a)=(2000) is still referenced from table "fk".
UPDATE pk SET a = 3002 WHERE a = 3000;
ERROR: update or delete on table "pk4" violates foreign key constraint "fk_a_fkey4" on table "fk"
DETAIL: Key (a)=(3000) is still referenced from table "fk".
UPDATE pk SET a = 4002 WHERE a = 4000;
ERROR: update or delete on table "pk51" violates foreign key constraint "fk_a_fkey6" on table "fk"
DETAIL: Key (a)=(4000) is still referenced from table "fk".
UPDATE pk SET a = 4502 WHERE a = 4500;
ERROR: update or delete on table "pk52" violates foreign key constraint "fk_a_fkey7" on table "fk"
DETAIL: Key (a)=(4500) is still referenced from table "fk".
-- now they should work
DELETE FROM fk;
UPDATE pk SET a = 2 WHERE a = 1;
DELETE FROM pk WHERE a = 2;
UPDATE pk SET a = 1002 WHERE a = 1000;
DELETE FROM pk WHERE a = 1002;
UPDATE pk SET a = 2002 WHERE a = 2000;
DELETE FROM pk WHERE a = 2002;
UPDATE pk SET a = 3002 WHERE a = 3000;
DELETE FROM pk WHERE a = 3002;
UPDATE pk SET a = 4002 WHERE a = 4000;
DELETE FROM pk WHERE a = 4002;
UPDATE pk SET a = 4502 WHERE a = 4500;
DELETE FROM pk WHERE a = 4502;
CREATE SCHEMA fkpart4;
SET search_path TO fkpart4;
-- dropping/detaching PARTITIONs is prevented if that would break
-- a foreign key's existing data
CREATE TABLE droppk (a int PRIMARY KEY) PARTITION BY RANGE (a);
CREATE TABLE droppk1 PARTITION OF droppk FOR VALUES FROM (0) TO (1000);
CREATE TABLE droppk_d PARTITION OF droppk DEFAULT;
CREATE TABLE droppk2 PARTITION OF droppk FOR VALUES FROM (1000) TO (2000)
PARTITION BY RANGE (a);
CREATE TABLE droppk21 PARTITION OF droppk2 FOR VALUES FROM (1000) TO (1400);
CREATE TABLE droppk2_d PARTITION OF droppk2 DEFAULT;
INSERT into droppk VALUES (1), (1000), (1500), (2000);
CREATE TABLE dropfk (a int REFERENCES droppk);
INSERT into dropfk VALUES (1), (1000), (1500), (2000);
-- these should all fail
ALTER TABLE droppk DETACH PARTITION droppk_d;
ERROR: removing partition "droppk_d" violates foreign key constraint "dropfk_a_fkey5"
DETAIL: Key (a)=(2000) still referenced from table "dropfk".
ALTER TABLE droppk2 DETACH PARTITION droppk2_d;
ERROR: removing partition "droppk2_d" violates foreign key constraint "dropfk_a_fkey4"
DETAIL: Key (a)=(1500) still referenced from table "dropfk".
ALTER TABLE droppk DETACH PARTITION droppk1;
ERROR: removing partition "droppk1" violates foreign key constraint "dropfk_a_fkey1"
DETAIL: Key (a)=(1) still referenced from table "dropfk".
ALTER TABLE droppk DETACH PARTITION droppk2;
ERROR: removing partition "droppk2" violates foreign key constraint "dropfk_a_fkey2"
DETAIL: Key (a)=(1000) still referenced from table "dropfk".
ALTER TABLE droppk2 DETACH PARTITION droppk21;
ERROR: removing partition "droppk21" violates foreign key constraint "dropfk_a_fkey3"
DETAIL: Key (a)=(1000) still referenced from table "dropfk".
-- dropping partitions is disallowed
DROP TABLE droppk_d;
ERROR: cannot drop table droppk_d because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk_d
HINT: Use DROP ... CASCADE to drop the dependent objects too.
DROP TABLE droppk2_d;
ERROR: cannot drop table droppk2_d because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk2_d
HINT: Use DROP ... CASCADE to drop the dependent objects too.
DROP TABLE droppk1;
ERROR: cannot drop table droppk1 because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk1
HINT: Use DROP ... CASCADE to drop the dependent objects too.
DROP TABLE droppk2;
ERROR: cannot drop table droppk2 because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk2
HINT: Use DROP ... CASCADE to drop the dependent objects too.
DROP TABLE droppk21;
ERROR: cannot drop table droppk21 because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk21
HINT: Use DROP ... CASCADE to drop the dependent objects too.
DELETE FROM dropfk;
-- dropping partitions is disallowed, even when no referencing values
DROP TABLE droppk_d;
ERROR: cannot drop table droppk_d because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk_d
HINT: Use DROP ... CASCADE to drop the dependent objects too.
DROP TABLE droppk2_d;
ERROR: cannot drop table droppk2_d because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk2_d
HINT: Use DROP ... CASCADE to drop the dependent objects too.
DROP TABLE droppk1;
ERROR: cannot drop table droppk1 because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk1
HINT: Use DROP ... CASCADE to drop the dependent objects too.
-- but DETACH is allowed, and DROP afterwards works
ALTER TABLE droppk2 DETACH PARTITION droppk21;
DROP TABLE droppk2;
ERROR: cannot drop table droppk2 because other objects depend on it
DETAIL: constraint dropfk_a_fkey on table dropfk depends on table droppk2
HINT: Use DROP ... CASCADE to drop the dependent objects too.
-- Verify that initial constraint creation and cloning behave correctly
CREATE SCHEMA fkpart5;
SET search_path TO fkpart5;
CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY LIST (a);
CREATE TABLE pk1 PARTITION OF pk FOR VALUES IN (1) PARTITION BY LIST (a);
CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES IN (1);
CREATE TABLE fk (a int) PARTITION BY LIST (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES IN (1) PARTITION BY LIST (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES IN (1);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
CREATE TABLE pk2 PARTITION OF pk FOR VALUES IN (2);
CREATE TABLE pk3 (a int NOT NULL) PARTITION BY LIST (a);
CREATE TABLE pk31 PARTITION OF pk3 FOR VALUES IN (31);
CREATE TABLE pk32 (b int, a int NOT NULL);
ALTER TABLE pk32 DROP COLUMN b;
ALTER TABLE pk3 ATTACH PARTITION pk32 FOR VALUES IN (32);
ALTER TABLE pk ATTACH PARTITION pk3 FOR VALUES IN (31, 32);
CREATE TABLE fk2 PARTITION OF fk FOR VALUES IN (2);
CREATE TABLE fk3 (b int, a int);
ALTER TABLE fk3 DROP COLUMN b;
ALTER TABLE fk ATTACH PARTITION fk3 FOR VALUES IN (3);
SELECT pg_describe_object('pg_constraint'::regclass, oid, 0), confrelid::regclass,
CASE WHEN conparentid <> 0 THEN pg_describe_object('pg_constraint'::regclass, conparentid, 0) ELSE 'TOP' END
FROM pg_catalog.pg_constraint
WHERE conrelid IN (SELECT relid FROM pg_partition_tree('fk'))
ORDER BY conrelid::regclass::text, conname;
pg_describe_object | confrelid | case
------------------------------------+-----------+-----------------------------------
constraint fk_a_fkey on table fk | pk | TOP
constraint fk_a_fkey1 on table fk | pk1 | constraint fk_a_fkey on table fk
constraint fk_a_fkey2 on table fk | pk11 | constraint fk_a_fkey1 on table fk
constraint fk_a_fkey3 on table fk | pk2 | constraint fk_a_fkey on table fk
constraint fk_a_fkey4 on table fk | pk3 | constraint fk_a_fkey on table fk
constraint fk_a_fkey5 on table fk | pk31 | constraint fk_a_fkey4 on table fk
constraint fk_a_fkey6 on table fk | pk32 | constraint fk_a_fkey4 on table fk
constraint fk_a_fkey on table fk1 | pk | constraint fk_a_fkey on table fk
constraint fk_a_fkey on table fk11 | pk | constraint fk_a_fkey on table fk1
constraint fk_a_fkey on table fk2 | pk | constraint fk_a_fkey on table fk
constraint fk_a_fkey on table fk3 | pk | constraint fk_a_fkey on table fk
(11 rows)
CREATE TABLE fk4 (LIKE fk);
INSERT INTO fk4 VALUES (50);
ALTER TABLE fk ATTACH PARTITION fk4 FOR VALUES IN (50);
ERROR: insert or update on table "fk4" violates foreign key constraint "fk_a_fkey"
DETAIL: Key (a)=(50) is not present in table "pk".
-- Verify ON UPDATE/DELETE behavior
CREATE SCHEMA fkpart6;
SET search_path TO fkpart6;
CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES FROM (1) TO (50);
CREATE TABLE pk12 PARTITION OF pk1 FOR VALUES FROM (50) TO (100);
CREATE TABLE fk (a int) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE CASCADE ON DELETE CASCADE;
CREATE TABLE fk_d PARTITION OF fk DEFAULT;
INSERT INTO pk VALUES (1);
INSERT INTO fk VALUES (1);
UPDATE pk SET a = 20;
SELECT tableoid::regclass, * FROM fk;
tableoid | a
----------+----
fk12 | 20
(1 row)
DELETE FROM pk WHERE a = 20;
SELECT tableoid::regclass, * FROM fk;
tableoid | a
----------+---
(0 rows)
DROP TABLE fk;
TRUNCATE TABLE pk;
INSERT INTO pk VALUES (20), (50);
CREATE TABLE fk (a int) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET NULL ON DELETE SET NULL;
CREATE TABLE fk_d PARTITION OF fk DEFAULT;
INSERT INTO fk VALUES (20), (50);
UPDATE pk SET a = 21 WHERE a = 20;
DELETE FROM pk WHERE a = 50;
SELECT tableoid::regclass, * FROM fk;
tableoid | a
----------+---
fk_d |
fk_d |
(2 rows)
DROP TABLE fk;
TRUNCATE TABLE pk;
INSERT INTO pk VALUES (20), (30), (50);
CREATE TABLE fk (id int, a int DEFAULT 50) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET DEFAULT ON DELETE SET DEFAULT;
CREATE TABLE fk_d PARTITION OF fk DEFAULT;
INSERT INTO fk VALUES (1, 20), (2, 30);
DELETE FROM pk WHERE a = 20 RETURNING *;
a
----
20
(1 row)
UPDATE pk SET a = 90 WHERE a = 30 RETURNING *;
a
----
90
(1 row)
SELECT tableoid::regclass, * FROM fk;
tableoid | id | a
----------+----+----
fk12 | 1 | 50
fk12 | 2 | 50
(2 rows)
DROP TABLE fk;
TRUNCATE TABLE pk;
INSERT INTO pk VALUES (20), (30);
CREATE TABLE fk (a int DEFAULT 50) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE RESTRICT ON DELETE RESTRICT;
CREATE TABLE fk_d PARTITION OF fk DEFAULT;
INSERT INTO fk VALUES (20), (30);
DELETE FROM pk WHERE a = 20;
ERROR: update or delete on table "pk11" violates foreign key constraint "fk_a_fkey2" on table "fk"
DETAIL: Key (a)=(20) is still referenced from table "fk".
UPDATE pk SET a = 90 WHERE a = 30;
ERROR: update or delete on table "pk11" violates foreign key constraint "fk_a_fkey2" on table "fk"
DETAIL: Key (a)=(30) is still referenced from table "fk".
SELECT tableoid::regclass, * FROM fk;
tableoid | a
----------+----
fk12 | 20
fk12 | 30
(2 rows)
DROP TABLE fk;
......@@ -1145,18 +1145,6 @@ drop table pktable2, fktable2;
-- Foreign keys and partitioned tables
--
-- partitioned table in the referenced side are not allowed
CREATE TABLE fk_partitioned_pk (a int, b int, primary key (a, b))
PARTITION BY RANGE (a, b);
-- verify with create table first ...
CREATE TABLE fk_notpartitioned_fk (a int, b int,
FOREIGN KEY (a, b) REFERENCES fk_partitioned_pk);
-- and then with alter table.
CREATE TABLE fk_notpartitioned_fk_2 (a int, b int);
ALTER TABLE fk_notpartitioned_fk_2 ADD FOREIGN KEY (a, b)
REFERENCES fk_partitioned_pk;
DROP TABLE fk_partitioned_pk, fk_notpartitioned_fk_2;
-- Creation of a partitioned hierarchy with irregular definitions
CREATE TABLE fk_notpartitioned_pk (fdrop1 int, a int, fdrop2 int, b int,
PRIMARY KEY (a, b));
......@@ -1443,3 +1431,204 @@ alter table fkpart2.fk_part_1 drop constraint fkey; -- ok
alter table fkpart2.fk_part_1_1 drop constraint my_fkey; -- doesn't exist
drop schema fkpart0, fkpart1, fkpart2 cascade;
-- Test a partitioned table as referenced table.
-- Verify basic functionality with a regular partition creation and a partition
-- with a different column layout, as well as partitions added (created and
-- attached) after creating the foreign key.
CREATE SCHEMA fkpart3;
SET search_path TO fkpart3;
CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (0) TO (1000);
CREATE TABLE pk2 (b int, a int);
ALTER TABLE pk2 DROP COLUMN b;
ALTER TABLE pk2 ALTER a SET NOT NULL;
ALTER TABLE pk ATTACH PARTITION pk2 FOR VALUES FROM (1000) TO (2000);
CREATE TABLE fk (a int) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (0) TO (750);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
CREATE TABLE fk2 (b int, a int) ;
ALTER TABLE fk2 DROP COLUMN b;
ALTER TABLE fk ATTACH PARTITION fk2 FOR VALUES FROM (750) TO (3500);
CREATE TABLE pk3 PARTITION OF pk FOR VALUES FROM (2000) TO (3000);
CREATE TABLE pk4 (LIKE pk);
ALTER TABLE pk ATTACH PARTITION pk4 FOR VALUES FROM (3000) TO (4000);
CREATE TABLE pk5 (c int, b int, a int NOT NULL) PARTITION BY RANGE (a);
ALTER TABLE pk5 DROP COLUMN b, DROP COLUMN c;
CREATE TABLE pk51 PARTITION OF pk5 FOR VALUES FROM (4000) TO (4500);
CREATE TABLE pk52 PARTITION OF pk5 FOR VALUES FROM (4500) TO (5000);
ALTER TABLE pk ATTACH PARTITION pk5 FOR VALUES FROM (4000) TO (5000);
CREATE TABLE fk3 PARTITION OF fk FOR VALUES FROM (3500) TO (5000);
-- these should fail: referenced value not present
INSERT into fk VALUES (1);
INSERT into fk VALUES (1000);
INSERT into fk VALUES (2000);
INSERT into fk VALUES (3000);
INSERT into fk VALUES (4000);
INSERT into fk VALUES (4500);
-- insert into the referenced table, now they should work
INSERT into pk VALUES (1), (1000), (2000), (3000), (4000), (4500);
INSERT into fk VALUES (1), (1000), (2000), (3000), (4000), (4500);
-- should fail: referencing value present
DELETE FROM pk WHERE a = 1;
DELETE FROM pk WHERE a = 1000;
DELETE FROM pk WHERE a = 2000;
DELETE FROM pk WHERE a = 3000;
DELETE FROM pk WHERE a = 4000;
DELETE FROM pk WHERE a = 4500;
UPDATE pk SET a = 2 WHERE a = 1;
UPDATE pk SET a = 1002 WHERE a = 1000;
UPDATE pk SET a = 2002 WHERE a = 2000;
UPDATE pk SET a = 3002 WHERE a = 3000;
UPDATE pk SET a = 4002 WHERE a = 4000;
UPDATE pk SET a = 4502 WHERE a = 4500;
-- now they should work
DELETE FROM fk;
UPDATE pk SET a = 2 WHERE a = 1;
DELETE FROM pk WHERE a = 2;
UPDATE pk SET a = 1002 WHERE a = 1000;
DELETE FROM pk WHERE a = 1002;
UPDATE pk SET a = 2002 WHERE a = 2000;
DELETE FROM pk WHERE a = 2002;
UPDATE pk SET a = 3002 WHERE a = 3000;
DELETE FROM pk WHERE a = 3002;
UPDATE pk SET a = 4002 WHERE a = 4000;
DELETE FROM pk WHERE a = 4002;
UPDATE pk SET a = 4502 WHERE a = 4500;
DELETE FROM pk WHERE a = 4502;
CREATE SCHEMA fkpart4;
SET search_path TO fkpart4;
-- dropping/detaching PARTITIONs is prevented if that would break
-- a foreign key's existing data
CREATE TABLE droppk (a int PRIMARY KEY) PARTITION BY RANGE (a);
CREATE TABLE droppk1 PARTITION OF droppk FOR VALUES FROM (0) TO (1000);
CREATE TABLE droppk_d PARTITION OF droppk DEFAULT;
CREATE TABLE droppk2 PARTITION OF droppk FOR VALUES FROM (1000) TO (2000)
PARTITION BY RANGE (a);
CREATE TABLE droppk21 PARTITION OF droppk2 FOR VALUES FROM (1000) TO (1400);
CREATE TABLE droppk2_d PARTITION OF droppk2 DEFAULT;
INSERT into droppk VALUES (1), (1000), (1500), (2000);
CREATE TABLE dropfk (a int REFERENCES droppk);
INSERT into dropfk VALUES (1), (1000), (1500), (2000);
-- these should all fail
ALTER TABLE droppk DETACH PARTITION droppk_d;
ALTER TABLE droppk2 DETACH PARTITION droppk2_d;
ALTER TABLE droppk DETACH PARTITION droppk1;
ALTER TABLE droppk DETACH PARTITION droppk2;
ALTER TABLE droppk2 DETACH PARTITION droppk21;
-- dropping partitions is disallowed
DROP TABLE droppk_d;
DROP TABLE droppk2_d;
DROP TABLE droppk1;
DROP TABLE droppk2;
DROP TABLE droppk21;
DELETE FROM dropfk;
-- dropping partitions is disallowed, even when no referencing values
DROP TABLE droppk_d;
DROP TABLE droppk2_d;
DROP TABLE droppk1;
-- but DETACH is allowed, and DROP afterwards works
ALTER TABLE droppk2 DETACH PARTITION droppk21;
DROP TABLE droppk2;
-- Verify that initial constraint creation and cloning behave correctly
CREATE SCHEMA fkpart5;
SET search_path TO fkpart5;
CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY LIST (a);
CREATE TABLE pk1 PARTITION OF pk FOR VALUES IN (1) PARTITION BY LIST (a);
CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES IN (1);
CREATE TABLE fk (a int) PARTITION BY LIST (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES IN (1) PARTITION BY LIST (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES IN (1);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk;
CREATE TABLE pk2 PARTITION OF pk FOR VALUES IN (2);
CREATE TABLE pk3 (a int NOT NULL) PARTITION BY LIST (a);
CREATE TABLE pk31 PARTITION OF pk3 FOR VALUES IN (31);
CREATE TABLE pk32 (b int, a int NOT NULL);
ALTER TABLE pk32 DROP COLUMN b;
ALTER TABLE pk3 ATTACH PARTITION pk32 FOR VALUES IN (32);
ALTER TABLE pk ATTACH PARTITION pk3 FOR VALUES IN (31, 32);
CREATE TABLE fk2 PARTITION OF fk FOR VALUES IN (2);
CREATE TABLE fk3 (b int, a int);
ALTER TABLE fk3 DROP COLUMN b;
ALTER TABLE fk ATTACH PARTITION fk3 FOR VALUES IN (3);
SELECT pg_describe_object('pg_constraint'::regclass, oid, 0), confrelid::regclass,
CASE WHEN conparentid <> 0 THEN pg_describe_object('pg_constraint'::regclass, conparentid, 0) ELSE 'TOP' END
FROM pg_catalog.pg_constraint
WHERE conrelid IN (SELECT relid FROM pg_partition_tree('fk'))
ORDER BY conrelid::regclass::text, conname;
CREATE TABLE fk4 (LIKE fk);
INSERT INTO fk4 VALUES (50);
ALTER TABLE fk ATTACH PARTITION fk4 FOR VALUES IN (50);
-- Verify ON UPDATE/DELETE behavior
CREATE SCHEMA fkpart6;
SET search_path TO fkpart6;
CREATE TABLE pk (a int PRIMARY KEY) PARTITION BY RANGE (a);
CREATE TABLE pk1 PARTITION OF pk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE pk11 PARTITION OF pk1 FOR VALUES FROM (1) TO (50);
CREATE TABLE pk12 PARTITION OF pk1 FOR VALUES FROM (50) TO (100);
CREATE TABLE fk (a int) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE CASCADE ON DELETE CASCADE;
CREATE TABLE fk_d PARTITION OF fk DEFAULT;
INSERT INTO pk VALUES (1);
INSERT INTO fk VALUES (1);
UPDATE pk SET a = 20;
SELECT tableoid::regclass, * FROM fk;
DELETE FROM pk WHERE a = 20;
SELECT tableoid::regclass, * FROM fk;
DROP TABLE fk;
TRUNCATE TABLE pk;
INSERT INTO pk VALUES (20), (50);
CREATE TABLE fk (a int) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET NULL ON DELETE SET NULL;
CREATE TABLE fk_d PARTITION OF fk DEFAULT;
INSERT INTO fk VALUES (20), (50);
UPDATE pk SET a = 21 WHERE a = 20;
DELETE FROM pk WHERE a = 50;
SELECT tableoid::regclass, * FROM fk;
DROP TABLE fk;
TRUNCATE TABLE pk;
INSERT INTO pk VALUES (20), (30), (50);
CREATE TABLE fk (id int, a int DEFAULT 50) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE SET DEFAULT ON DELETE SET DEFAULT;
CREATE TABLE fk_d PARTITION OF fk DEFAULT;
INSERT INTO fk VALUES (1, 20), (2, 30);
DELETE FROM pk WHERE a = 20 RETURNING *;
UPDATE pk SET a = 90 WHERE a = 30 RETURNING *;
SELECT tableoid::regclass, * FROM fk;
DROP TABLE fk;
TRUNCATE TABLE pk;
INSERT INTO pk VALUES (20), (30);
CREATE TABLE fk (a int DEFAULT 50) PARTITION BY RANGE (a);
CREATE TABLE fk1 PARTITION OF fk FOR VALUES FROM (1) TO (100) PARTITION BY RANGE (a);
CREATE TABLE fk11 PARTITION OF fk1 FOR VALUES FROM (1) TO (10);
CREATE TABLE fk12 PARTITION OF fk1 FOR VALUES FROM (10) TO (100);
ALTER TABLE fk ADD FOREIGN KEY (a) REFERENCES pk ON UPDATE RESTRICT ON DELETE RESTRICT;
CREATE TABLE fk_d PARTITION OF fk DEFAULT;
INSERT INTO fk VALUES (20), (30);
DELETE FROM pk WHERE a = 20;
UPDATE pk SET a = 90 WHERE a = 30;
SELECT tableoid::regclass, * FROM fk;
DROP TABLE fk;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment