Commit fec58f6c authored by Tom Lane's avatar Tom Lane

During ALTER TABLE ADD FOREIGN KEY, try to check the existing rows using

a single LEFT JOIN query instead of firing the check trigger for each
row individually.  Stephan Szabo, with some kibitzing from Tom Lane and
Jan Wieck.
parent a0ab31dc
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.85 2003/10/02 06:36:37 petere Exp $
* $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.86 2003/10/06 16:38:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -3454,6 +3454,13 @@ validateForeignKeyConstraint(FkConstraint *fkconstraint,
List *list;
int count;
/*
* See if we can do it with a single LEFT JOIN query. A FALSE result
* indicates we must proceed with the fire-the-trigger method.
*/
if (RI_Initial_Check(fkconstraint, rel, pkrel))
return;
/*
* Scan through each tuple, calling RI_FKey_check_ins (insert trigger)
* as if that tuple had just been inserted. If any of those fail, it
......
......@@ -17,7 +17,7 @@
*
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
*
* $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.61 2003/10/01 21:30:52 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/utils/adt/ri_triggers.c,v 1.62 2003/10/06 16:38:28 tgl Exp $
*
* ----------
*/
......@@ -40,6 +40,7 @@
#include "rewrite/rewriteHandler.h"
#include "utils/lsyscache.h"
#include "utils/typcache.h"
#include "utils/acl.h"
#include "miscadmin.h"
......@@ -164,7 +165,8 @@ static void ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
Datum *vals, char *nulls);
static void ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
Relation pk_rel, Relation fk_rel,
HeapTuple violator, bool spi_err);
HeapTuple violator, TupleDesc tupdesc,
bool spi_err);
/* ----------
......@@ -2540,7 +2542,205 @@ RI_FKey_keyequal_upd(TriggerData *trigdata)
}
/* ----------
* RI_Initial_Check -
*
* Check an entire table for non-matching values using a single query.
* This is not a trigger procedure, but is called during ALTER TABLE
* ADD FOREIGN KEY to validate the initial table contents.
*
* We expect that an exclusive lock has been taken on rel and pkrel;
* hence, we do not need to lock individual rows for the check.
*
* If the check fails because the current user doesn't have permissions
* to read both tables, return false to let our caller know that they will
* need to do something else to check the constraint.
* ----------
*/
bool
RI_Initial_Check(FkConstraint *fkconstraint, Relation rel, Relation pkrel)
{
const char *constrname = fkconstraint->constr_name;
char querystr[MAX_QUOTED_REL_NAME_LEN * 2 + 250 +
(MAX_QUOTED_NAME_LEN + 32) * ((RI_MAX_NUMKEYS * 4)+1)];
char pkrelname[MAX_QUOTED_REL_NAME_LEN];
char relname[MAX_QUOTED_REL_NAME_LEN];
char attname[MAX_QUOTED_NAME_LEN];
char fkattname[MAX_QUOTED_NAME_LEN];
const char *sep;
List *list;
List *list2;
int spi_result;
void *qplan;
/*
* Check to make sure current user has enough permissions to do the
* test query. (If not, caller can fall back to the trigger method,
* which works because it changes user IDs on the fly.)
*
* XXX are there any other show-stopper conditions to check?
*/
if (pg_class_aclcheck(RelationGetRelid(rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
return false;
if (pg_class_aclcheck(RelationGetRelid(pkrel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
return false;
/*----------
* The query string built is:
* SELECT fk.keycols FROM ONLY relname fk
* LEFT OUTER JOIN ONLY pkrelname pk
* ON (pk.pkkeycol1=fk.keycol1 [AND ...])
* WHERE pk.pkkeycol1 IS NULL AND
* For MATCH unspecified:
* (fk.keycol1 IS NOT NULL [AND ...])
* For MATCH FULL:
* (fk.keycol1 IS NOT NULL [OR ...])
*----------
*/
sprintf(querystr, "SELECT ");
sep="";
foreach(list, fkconstraint->fk_attrs)
{
quoteOneName(attname, strVal(lfirst(list)));
snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
"%sfk.%s", sep, attname);
sep = ", ";
}
quoteRelationName(pkrelname, pkrel);
quoteRelationName(relname, rel);
snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
" FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON (",
relname, pkrelname);
sep="";
for (list=fkconstraint->pk_attrs, list2=fkconstraint->fk_attrs;
list != NIL && list2 != NIL;
list=lnext(list), list2=lnext(list2))
{
quoteOneName(attname, strVal(lfirst(list)));
quoteOneName(fkattname, strVal(lfirst(list2)));
snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
"%spk.%s=fk.%s",
sep, attname, fkattname);
sep = " AND ";
}
/*
* It's sufficient to test any one pk attribute for null to detect a
* join failure.
*/
quoteOneName(attname, strVal(lfirst(fkconstraint->pk_attrs)));
snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
") WHERE pk.%s IS NULL AND (", attname);
sep="";
foreach(list, fkconstraint->fk_attrs)
{
quoteOneName(attname, strVal(lfirst(list)));
snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
"%sfk.%s IS NOT NULL",
sep, attname);
switch (fkconstraint->fk_matchtype)
{
case FKCONSTR_MATCH_UNSPECIFIED:
sep=" AND ";
break;
case FKCONSTR_MATCH_FULL:
sep=" OR ";
break;
case FKCONSTR_MATCH_PARTIAL:
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("MATCH PARTIAL not yet implemented")));
break;
default:
elog(ERROR, "unrecognized match type: %d",
fkconstraint->fk_matchtype);
break;
}
}
snprintf(querystr + strlen(querystr), sizeof(querystr) - strlen(querystr),
")");
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed");
/*
* Generate the plan. We don't need to cache it, and there are no
* arguments to the plan.
*/
qplan = SPI_prepare(querystr, 0, NULL);
if (qplan == NULL)
elog(ERROR, "SPI_prepare returned %d for %s", SPI_result, querystr);
/*
* Run the plan. For safety we force a current query snapshot to be
* used. (In serializable mode, this arguably violates serializability,
* but we really haven't got much choice.) We need at most one tuple
* returned, so pass limit = 1.
*/
spi_result = SPI_execp_current(qplan, NULL, NULL, true, 1);
/* Check result */
if (spi_result != SPI_OK_SELECT)
elog(ERROR, "SPI_execp_current returned %d", spi_result);
/* Did we find a tuple violating the constraint? */
if (SPI_processed > 0)
{
HeapTuple tuple = SPI_tuptable->vals[0];
TupleDesc tupdesc = SPI_tuptable->tupdesc;
int nkeys = length(fkconstraint->fk_attrs);
int i;
RI_QueryKey qkey;
/*
* If it's MATCH FULL, and there are any nulls in the FK keys,
* complain about that rather than the lack of a match. MATCH FULL
* disallows partially-null FK rows.
*/
if (fkconstraint->fk_matchtype == FKCONSTR_MATCH_FULL)
{
bool isnull = false;
for (i = 1; i <= nkeys; i++)
{
(void) SPI_getbinval(tuple, tupdesc, i, &isnull);
if (isnull)
break;
}
if (isnull)
ereport(ERROR,
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
RelationGetRelationName(rel),
constrname),
errdetail("MATCH FULL does not allow mixing of null and nonnull key values.")));
}
/*
* Although we didn't cache the query, we need to set up a fake
* query key to pass to ri_ReportViolation.
*/
MemSet(&qkey, 0, sizeof(qkey));
qkey.constr_queryno = RI_PLAN_CHECK_LOOKUPPK;
qkey.nkeypairs = nkeys;
for (i = 0; i < nkeys; i++)
qkey.keypair[i][RI_KEYPAIR_FK_IDX] = i + 1;
ri_ReportViolation(&qkey, constrname,
pkrel, rel,
tuple, tupdesc,
false);
}
if (SPI_finish() != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed");
return true;
}
/* ----------
......@@ -2782,6 +2982,9 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
/* Create the plan */
qplan = SPI_prepare(querystr, nargs, argtypes);
if (qplan == NULL)
elog(ERROR, "SPI_prepare returned %d for %s", SPI_result, querystr);
/* Restore UID */
SetUserId(save_uid);
......@@ -2905,6 +3108,7 @@ ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
ri_ReportViolation(qkey, constrname ? constrname : "",
pk_rel, fk_rel,
new_tuple ? new_tuple : old_tuple,
NULL,
true);
/* XXX wouldn't it be clearer to do this part at the caller? */
......@@ -2913,6 +3117,7 @@ ri_PerformCheck(RI_QueryKey *qkey, void *qplan,
ri_ReportViolation(qkey, constrname,
pk_rel, fk_rel,
new_tuple ? new_tuple : old_tuple,
NULL,
false);
return SPI_processed != 0;
......@@ -2950,7 +3155,8 @@ ri_ExtractValues(RI_QueryKey *qkey, int key_idx,
static void
ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
Relation pk_rel, Relation fk_rel,
HeapTuple violator, bool spi_err)
HeapTuple violator, TupleDesc tupdesc,
bool spi_err)
{
#define BUFLENGTH 512
char key_names[BUFLENGTH];
......@@ -2958,7 +3164,6 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
char *name_ptr = key_names;
char *val_ptr = key_values;
bool onfk;
Relation rel;
int idx,
key_idx;
......@@ -2972,18 +3177,21 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
errhint("This is most likely due to a rule having rewritten the query.")));
/*
* rel is set to where the tuple description is coming from.
* Determine which relation to complain about. If tupdesc wasn't
* passed by caller, assume the violator tuple came from there.
*/
onfk = (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK);
if (onfk)
{
rel = fk_rel;
key_idx = RI_KEYPAIR_FK_IDX;
if (tupdesc == NULL)
tupdesc = fk_rel->rd_att;
}
else
{
rel = pk_rel;
key_idx = RI_KEYPAIR_PK_IDX;
if (tupdesc == NULL)
tupdesc = pk_rel->rd_att;
}
/*
......@@ -3008,8 +3216,8 @@ ri_ReportViolation(RI_QueryKey *qkey, const char *constrname,
char *name,
*val;
name = SPI_fname(rel->rd_att, fnum);
val = SPI_getvalue(violator, rel->rd_att, fnum);
name = SPI_fname(tupdesc, fnum);
val = SPI_getvalue(violator, tupdesc, fnum);
if (!val)
val = "null";
......
......@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $Id: trigger.h,v 1.43 2003/08/04 02:40:13 momjian Exp $
* $Id: trigger.h,v 1.44 2003/10/06 16:38:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -197,5 +197,8 @@ extern void DeferredTriggerSetState(ConstraintsSetStmt *stmt);
* in utils/adt/ri_triggers.c
*/
extern bool RI_FKey_keyequal_upd(TriggerData *trigdata);
extern bool RI_Initial_Check(FkConstraint *fkconstraint,
Relation rel,
Relation pkrel);
#endif /* TRIGGER_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment