Commit 03afae20 authored by Alvaro Herrera's avatar Alvaro Herrera

Move CloneForeignKeyConstraints to tablecmds.c

My commit 3de241db introduced some code to create a clone of a
foreign key to a partition, but I put it in pg_constraint.c because it
was too close to the contents of the pg_constraint row.  With the
previous commit that split out the constraint tuple deconstruction into
its own routine, it makes more sense to have the FK-cloning function in
tablecmds.c, mostly because its static subroutine can then be used by a
future bugfix.

My initial posting of this patch had this routine as static in
tablecmds.c, but sadly this function is already part of the Postgres 11
ABI as exported from pg_constraint.c, so keep it as exported also just
to avoid breaking any possible users of it.
parent 0080396d
......@@ -39,10 +39,6 @@
#include "utils/tqual.h"
static void clone_fk_constraints(Relation pg_constraint, Relation parentRel,
Relation partRel, List *clone, List **cloned);
/*
* CreateConstraintEntry
* Create a constraint table entry.
......@@ -377,306 +373,6 @@ CreateConstraintEntry(const char *constraintName,
return conOid;
}
/*
* CloneForeignKeyConstraints
* Clone foreign keys from a partitioned table to a newly acquired
* partition.
*
* relationId is a partition of parentId, so we can be certain that it has the
* same columns with the same datatypes. The columns may be in different
* order, though.
*
* The *cloned list is appended ClonedConstraint elements describing what was
* created.
*/
void
CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
{
Relation pg_constraint;
Relation parentRel;
Relation rel;
ScanKeyData key;
SysScanDesc scan;
HeapTuple tuple;
List *clone = NIL;
parentRel = heap_open(parentId, NoLock); /* already got lock */
/* see ATAddForeignKeyConstraint about lock level */
rel = heap_open(relationId, AccessExclusiveLock);
pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
/* Obtain the list of constraints to clone or attach */
ScanKeyInit(&key,
Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(parentId));
scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
NULL, 1, &key);
while ((tuple = systable_getnext(scan)) != NULL)
{
Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
clone = lappend_oid(clone, oid);
}
systable_endscan(scan);
/* Do the actual work, recursing to partitions as needed */
clone_fk_constraints(pg_constraint, parentRel, rel, clone, cloned);
/* We're done. Clean up */
heap_close(parentRel, NoLock);
heap_close(rel, NoLock); /* keep lock till commit */
heap_close(pg_constraint, RowShareLock);
}
/*
* clone_fk_constraints
* Recursive subroutine for CloneForeignKeyConstraints
*
* Clone the given list of FK constraints when a partition is attached.
*
* When cloning foreign keys to a partition, it may happen that equivalent
* constraints already exist in the partition for some of them. We can skip
* creating a clone in that case, and instead just attach the existing
* constraint to the one in the parent.
*
* This function recurses to partitions, if the new partition is partitioned;
* of course, only do this for FKs that were actually cloned.
*/
static void
clone_fk_constraints(Relation pg_constraint, Relation parentRel,
Relation partRel, List *clone, List **cloned)
{
AttrNumber *attmap;
List *partFKs;
List *subclone = NIL;
ListCell *cell;
/*
* The constraint key may differ, if the columns in the partition are
* different. This map is used to convert them.
*/
attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
RelationGetDescr(parentRel),
gettext_noop("could not convert row type"));
partFKs = copyObject(RelationGetFKeyList(partRel));
foreach(cell, clone)
{
Oid parentConstrOid = lfirst_oid(cell);
Form_pg_constraint constrForm;
HeapTuple tuple;
AttrNumber conkey[INDEX_MAX_KEYS];
AttrNumber mapped_conkey[INDEX_MAX_KEYS];
AttrNumber confkey[INDEX_MAX_KEYS];
Oid conpfeqop[INDEX_MAX_KEYS];
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
Constraint *fkconstraint;
bool attach_it;
Oid constrOid;
ObjectAddress parentAddr,
childAddr;
int nelem;
ListCell *cell;
int i;
tuple = SearchSysCache1(CONSTROID, parentConstrOid);
if (!tuple)
elog(ERROR, "cache lookup failed for constraint %u",
parentConstrOid);
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
/* only foreign keys */
if (constrForm->contype != CONSTRAINT_FOREIGN)
{
ReleaseSysCache(tuple);
continue;
}
ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
DeconstructFkConstraintRow(tuple, &nelem, conkey, confkey,
conpfeqop, conppeqop, conffeqop);
for (i = 0; i < nelem; i++)
mapped_conkey[i] = attmap[conkey[i] - 1];
/*
* Before creating a new constraint, see whether any existing FKs are
* fit for the purpose. If one is, attach the parent constraint to it,
* and don't clone anything. This way we avoid the expensive
* verification step and don't end up with a duplicate FK. This also
* means we don't consider this constraint when recursing to
* partitions.
*/
attach_it = false;
foreach(cell, partFKs)
{
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
Form_pg_constraint partConstr;
HeapTuple partcontup;
attach_it = true;
/*
* Do some quick & easy initial checks. If any of these fail, we
* cannot use this constraint, but keep looking.
*/
if (fk->confrelid != constrForm->confrelid || fk->nkeys != nelem)
{
attach_it = false;
continue;
}
for (i = 0; i < nelem; i++)
{
if (fk->conkey[i] != mapped_conkey[i] ||
fk->confkey[i] != confkey[i] ||
fk->conpfeqop[i] != conpfeqop[i])
{
attach_it = false;
break;
}
}
if (!attach_it)
continue;
/*
* Looks good so far; do some more extensive checks. Presumably
* the check for 'convalidated' could be dropped, since we don't
* really care about that, but let's be careful for now.
*/
partcontup = SearchSysCache1(CONSTROID,
ObjectIdGetDatum(fk->conoid));
if (!partcontup)
elog(ERROR, "cache lookup failed for constraint %u",
fk->conoid);
partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
if (OidIsValid(partConstr->conparentid) ||
!partConstr->convalidated ||
partConstr->condeferrable != constrForm->condeferrable ||
partConstr->condeferred != constrForm->condeferred ||
partConstr->confupdtype != constrForm->confupdtype ||
partConstr->confdeltype != constrForm->confdeltype ||
partConstr->confmatchtype != constrForm->confmatchtype)
{
ReleaseSysCache(partcontup);
attach_it = false;
continue;
}
ReleaseSysCache(partcontup);
/* looks good! Attach this constraint */
ConstraintSetParentConstraint(fk->conoid, constrForm->oid);
CommandCounterIncrement();
attach_it = true;
break;
}
/*
* If we attached to an existing constraint, there is no need to
* create a new one. In fact, there's no need to recurse for this
* constraint to partitions, either.
*/
if (attach_it)
{
ReleaseSysCache(tuple);
continue;
}
constrOid =
CreateConstraintEntry(NameStr(constrForm->conname),
constrForm->connamespace,
CONSTRAINT_FOREIGN,
constrForm->condeferrable,
constrForm->condeferred,
constrForm->convalidated,
constrForm->oid,
RelationGetRelid(partRel),
mapped_conkey,
nelem,
nelem,
InvalidOid, /* not a domain constraint */
constrForm->conindid, /* same index */
constrForm->confrelid, /* same foreign rel */
confkey,
conpfeqop,
conppeqop,
conffeqop,
nelem,
constrForm->confupdtype,
constrForm->confdeltype,
constrForm->confmatchtype,
NULL,
NULL,
NULL,
false,
1, false, true);
subclone = lappend_oid(subclone, constrOid);
ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
fkconstraint = makeNode(Constraint);
/* for now this is all we need */
fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
fkconstraint->fk_upd_action = constrForm->confupdtype;
fkconstraint->fk_del_action = constrForm->confdeltype;
fkconstraint->deferrable = constrForm->condeferrable;
fkconstraint->initdeferred = constrForm->condeferred;
createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
constrOid, constrForm->conindid, false);
if (cloned)
{
ClonedConstraint *newc;
/*
* Feed back caller about the constraints we created, so that they
* can set up constraint verification.
*/
newc = palloc(sizeof(ClonedConstraint));
newc->relid = RelationGetRelid(partRel);
newc->refrelid = constrForm->confrelid;
newc->conindid = constrForm->conindid;
newc->conid = constrOid;
newc->constraint = fkconstraint;
*cloned = lappend(*cloned, newc);
}
ReleaseSysCache(tuple);
}
pfree(attmap);
list_free_deep(partFKs);
/*
* If the partition is partitioned, recurse to handle any constraints that
* were cloned.
*/
if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
subclone != NIL)
{
PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
int i;
for (i = 0; i < partdesc->nparts; i++)
{
Relation childRel;
childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
clone_fk_constraints(pg_constraint,
partRel,
childRel,
subclone,
cloned);
heap_close(childRel, NoLock); /* keep lock till commit */
}
}
}
/*
* Test whether given name is currently used as a constraint name
* for the given object (relation or domain).
......
......@@ -412,6 +412,10 @@ static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *
Relation rel, Constraint *fkconstraint, Oid parentConstr,
bool recurse, bool recursing,
LOCKMODE lockmode);
static void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
List **cloned);
static void CloneFkReferencing(Relation pg_constraint, Relation parentRel,
Relation partRel, List *clone, List **cloned);
static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
......@@ -7689,6 +7693,308 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
return address;
}
/*
* CloneForeignKeyConstraints
* Clone foreign keys from a partitioned table to a newly acquired
* partition.
*
* relationId is a partition of parentId, so we can be certain that it has the
* same columns with the same datatypes. The columns may be in different
* order, though.
*
* The *cloned list is appended ClonedConstraint elements describing what was
* created, for the purposes of validating the constraint in ALTER TABLE's
* Phase 3.
*/
static void
CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
{
Relation pg_constraint;
Relation parentRel;
Relation rel;
ScanKeyData key;
SysScanDesc scan;
HeapTuple tuple;
List *clone = NIL;
parentRel = heap_open(parentId, NoLock); /* already got lock */
/* see ATAddForeignKeyConstraint about lock level */
rel = heap_open(relationId, AccessExclusiveLock);
pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
/* Obtain the list of constraints to clone or attach */
ScanKeyInit(&key,
Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
F_OIDEQ, ObjectIdGetDatum(parentId));
scan = systable_beginscan(pg_constraint, ConstraintRelidTypidNameIndexId, true,
NULL, 1, &key);
while ((tuple = systable_getnext(scan)) != NULL)
{
Oid oid = ((Form_pg_constraint) GETSTRUCT(tuple))->oid;
clone = lappend_oid(clone, oid);
}
systable_endscan(scan);
/* Do the actual work, recursing to partitions as needed */
CloneFkReferencing(pg_constraint, parentRel, rel, clone, cloned);
/* We're done. Clean up */
heap_close(parentRel, NoLock);
heap_close(rel, NoLock); /* keep lock till commit */
heap_close(pg_constraint, RowShareLock);
}
/*
* CloneFkReferencing
* Recursive subroutine for CloneForeignKeyConstraints, referencing side
*
* Clone the given list of FK constraints when a partition is attached on the
* referencing side of those constraints.
*
* When cloning foreign keys to a partition, it may happen that equivalent
* constraints already exist in the partition for some of them. We can skip
* creating a clone in that case, and instead just attach the existing
* constraint to the one in the parent.
*
* This function recurses to partitions, if the new partition is partitioned;
* of course, only do this for FKs that were actually cloned.
*/
static void
CloneFkReferencing(Relation pg_constraint, Relation parentRel,
Relation partRel, List *clone, List **cloned)
{
AttrNumber *attmap;
List *partFKs;
List *subclone = NIL;
ListCell *cell;
/*
* The constraint key may differ, if the columns in the partition are
* different. This map is used to convert them.
*/
attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
RelationGetDescr(parentRel),
gettext_noop("could not convert row type"));
partFKs = copyObject(RelationGetFKeyList(partRel));
foreach(cell, clone)
{
Oid parentConstrOid = lfirst_oid(cell);
Form_pg_constraint constrForm;
HeapTuple tuple;
int numfks;
AttrNumber conkey[INDEX_MAX_KEYS];
AttrNumber mapped_conkey[INDEX_MAX_KEYS];
AttrNumber confkey[INDEX_MAX_KEYS];
Oid conpfeqop[INDEX_MAX_KEYS];
Oid conppeqop[INDEX_MAX_KEYS];
Oid conffeqop[INDEX_MAX_KEYS];
Constraint *fkconstraint;
bool attach_it;
Oid constrOid;
ObjectAddress parentAddr,
childAddr;
ListCell *cell;
int i;
tuple = SearchSysCache1(CONSTROID, parentConstrOid);
if (!tuple)
elog(ERROR, "cache lookup failed for constraint %u",
parentConstrOid);
constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
/* only foreign keys */
if (constrForm->contype != CONSTRAINT_FOREIGN)
{
ReleaseSysCache(tuple);
continue;
}
ObjectAddressSet(parentAddr, ConstraintRelationId, parentConstrOid);
DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
conpfeqop, conppeqop, conffeqop);
for (i = 0; i < numfks; i++)
mapped_conkey[i] = attmap[conkey[i] - 1];
/*
* Before creating a new constraint, see whether any existing FKs are
* fit for the purpose. If one is, attach the parent constraint to it,
* and don't clone anything. This way we avoid the expensive
* verification step and don't end up with a duplicate FK. This also
* means we don't consider this constraint when recursing to
* partitions.
*/
attach_it = false;
foreach(cell, partFKs)
{
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, cell);
Form_pg_constraint partConstr;
HeapTuple partcontup;
attach_it = true;
/*
* Do some quick & easy initial checks. If any of these fail, we
* cannot use this constraint, but keep looking.
*/
if (fk->confrelid != constrForm->confrelid || fk->nkeys != numfks)
{
attach_it = false;
continue;
}
for (i = 0; i < numfks; i++)
{
if (fk->conkey[i] != mapped_conkey[i] ||
fk->confkey[i] != confkey[i] ||
fk->conpfeqop[i] != conpfeqop[i])
{
attach_it = false;
break;
}
}
if (!attach_it)
continue;
/*
* Looks good so far; do some more extensive checks. Presumably
* the check for 'convalidated' could be dropped, since we don't
* really care about that, but let's be careful for now.
*/
partcontup = SearchSysCache1(CONSTROID,
ObjectIdGetDatum(fk->conoid));
if (!partcontup)
elog(ERROR, "cache lookup failed for constraint %u",
fk->conoid);
partConstr = (Form_pg_constraint) GETSTRUCT(partcontup);
if (OidIsValid(partConstr->conparentid) ||
!partConstr->convalidated ||
partConstr->condeferrable != constrForm->condeferrable ||
partConstr->condeferred != constrForm->condeferred ||
partConstr->confupdtype != constrForm->confupdtype ||
partConstr->confdeltype != constrForm->confdeltype ||
partConstr->confmatchtype != constrForm->confmatchtype)
{
ReleaseSysCache(partcontup);
attach_it = false;
continue;
}
ReleaseSysCache(partcontup);
/* looks good! Attach this constraint */
ConstraintSetParentConstraint(fk->conoid, parentConstrOid);
CommandCounterIncrement();
attach_it = true;
break;
}
/*
* If we attached to an existing constraint, there is no need to
* create a new one. In fact, there's no need to recurse for this
* constraint to partitions, either.
*/
if (attach_it)
{
ReleaseSysCache(tuple);
continue;
}
constrOid =
CreateConstraintEntry(NameStr(constrForm->conname),
constrForm->connamespace,
CONSTRAINT_FOREIGN,
constrForm->condeferrable,
constrForm->condeferred,
constrForm->convalidated,
parentConstrOid,
RelationGetRelid(partRel),
mapped_conkey,
numfks,
numfks,
InvalidOid, /* not a domain constraint */
constrForm->conindid, /* same index */
constrForm->confrelid, /* same foreign rel */
confkey,
conpfeqop,
conppeqop,
conffeqop,
numfks,
constrForm->confupdtype,
constrForm->confdeltype,
constrForm->confmatchtype,
NULL,
NULL,
NULL,
false,
1, false, true);
subclone = lappend_oid(subclone, constrOid);
ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
fkconstraint = makeNode(Constraint);
/* for now this is all we need */
fkconstraint->conname = pstrdup(NameStr(constrForm->conname));
fkconstraint->fk_upd_action = constrForm->confupdtype;
fkconstraint->fk_del_action = constrForm->confdeltype;
fkconstraint->deferrable = constrForm->condeferrable;
fkconstraint->initdeferred = constrForm->condeferred;
createForeignKeyTriggers(partRel, constrForm->confrelid, fkconstraint,
constrOid, constrForm->conindid, false);
if (cloned)
{
ClonedConstraint *newc;
/*
* Feed back caller about the constraints we created, so that they
* can set up constraint verification.
*/
newc = palloc(sizeof(ClonedConstraint));
newc->relid = RelationGetRelid(partRel);
newc->refrelid = constrForm->confrelid;
newc->conindid = constrForm->conindid;
newc->conid = constrOid;
newc->constraint = fkconstraint;
*cloned = lappend(*cloned, newc);
}
ReleaseSysCache(tuple);
}
pfree(attmap);
list_free_deep(partFKs);
/*
* If the partition is partitioned, recurse to handle any constraints that
* were cloned.
*/
if (partRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
subclone != NIL)
{
PartitionDesc partdesc = RelationGetPartitionDesc(partRel);
int i;
for (i = 0; i < partdesc->nparts; i++)
{
Relation childRel;
childRel = heap_open(partdesc->oids[i], AccessExclusiveLock);
CloneFkReferencing(pg_constraint,
partRel,
childRel,
subclone,
cloned);
heap_close(childRel, NoLock); /* keep lock till commit */
}
}
}
/*
* ALTER TABLE ALTER CONSTRAINT
*
......
......@@ -226,9 +226,6 @@ extern Oid CreateConstraintEntry(const char *constraintName,
bool conNoInherit,
bool is_internal);
extern void CloneForeignKeyConstraints(Oid parentId, Oid relationId,
List **cloned);
extern void RemoveConstraintById(Oid conId);
extern void RenameConstraintById(Oid conId, const char *newname);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment