Commit f144f732 authored by Tom Lane's avatar Tom Lane

Refactor check_functional_grouping() to use get_primary_key_attnos().

If we ever get around to allowing functional dependency to be proven
from other things besides simple primary keys, this code will need to
be rethought, but that was true anyway.  In the meantime, we might as
well not have two very-similar routines for scanning pg_constraint.

David Rowley, reviewed by Julien Rouhaud
parent d4c3a156
...@@ -986,93 +986,35 @@ check_functional_grouping(Oid relid, ...@@ -986,93 +986,35 @@ check_functional_grouping(Oid relid,
List *grouping_columns, List *grouping_columns,
List **constraintDeps) List **constraintDeps)
{ {
bool result = false; Bitmapset *pkattnos;
Relation pg_constraint; Bitmapset *groupbyattnos;
HeapTuple tuple; Oid constraintOid;
SysScanDesc scan;
ScanKeyData skey[1];
/* Scan pg_constraint for constraints of the target rel */
pg_constraint = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&skey[0],
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(relid));
scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
NULL, 1, skey);
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
Datum adatum;
bool isNull;
ArrayType *arr;
int16 *attnums;
int numkeys;
int i;
bool found_col;
/* Only PK constraints are of interest for now, see comment above */
if (con->contype != CONSTRAINT_PRIMARY)
continue;
/* Constraint must be non-deferrable */
if (con->condeferrable)
continue;
/* Extract the conkey array, ie, attnums of PK's columns */
adatum = heap_getattr(tuple, Anum_pg_constraint_conkey,
RelationGetDescr(pg_constraint), &isNull);
if (isNull)
elog(ERROR, "null conkey for constraint %u",
HeapTupleGetOid(tuple));
arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */
numkeys = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
numkeys < 0 ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != INT2OID)
elog(ERROR, "conkey is not a 1-D smallint array");
attnums = (int16 *) ARR_DATA_PTR(arr);
found_col = false;
for (i = 0; i < numkeys; i++)
{
AttrNumber attnum = attnums[i];
ListCell *gl; ListCell *gl;
found_col = false; /* If the rel has no PK, then we can't prove functional dependency */
pkattnos = get_primary_key_attnos(relid, false, &constraintOid);
if (pkattnos == NULL)
return false;
/* Identify all the rel's columns that appear in grouping_columns */
groupbyattnos = NULL;
foreach(gl, grouping_columns) foreach(gl, grouping_columns)
{ {
Var *gvar = (Var *) lfirst(gl); Var *gvar = (Var *) lfirst(gl);
if (IsA(gvar, Var) && if (IsA(gvar, Var) &&
gvar->varno == varno && gvar->varno == varno &&
gvar->varlevelsup == varlevelsup && gvar->varlevelsup == varlevelsup)
gvar->varattno == attnum) groupbyattnos = bms_add_member(groupbyattnos,
{ gvar->varattno - FirstLowInvalidHeapAttributeNumber);
found_col = true;
break;
}
}
if (!found_col)
break;
} }
if (found_col) if (bms_is_subset(pkattnos, groupbyattnos))
{ {
/* The PK is a subset of grouping_columns, so we win */ /* The PK is a subset of grouping_columns, so we win */
*constraintDeps = lappend_oid(*constraintDeps, *constraintDeps = lappend_oid(*constraintDeps, constraintOid);
HeapTupleGetOid(tuple)); return true;
result = true;
break;
} }
}
systable_endscan(scan);
heap_close(pg_constraint, AccessShareLock);
return result; return false;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment