Commit 722bf701 authored by Simon Riggs's avatar Simon Riggs

Extend ALTER TABLE to allow Foreign Keys to be added without initial validation.

FK constraints that are marked NOT VALID may later be VALIDATED, which uses an
ShareUpdateExclusiveLock on constraint table and RowShareLock on referenced
table. Significantly reduces lock strength and duration when adding FKs.
New state visible from psql.

Simon Riggs, with reviews from Marko Tiikkaja and Robert Haas
parent 7202ad7b
......@@ -44,6 +44,8 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable>
ALTER [ COLUMN ] <replaceable class="PARAMETER">column</replaceable> SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN }
ADD <replaceable class="PARAMETER">table_constraint</replaceable>
ADD <replaceable class="PARAMETER">table_constraint_using_index</replaceable>
ADD <replaceable class="PARAMETER">table_constraint</replaceable> [ NOT VALID ]
VALIDATE CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable>
DROP CONSTRAINT [ IF EXISTS ] <replaceable class="PARAMETER">constraint_name</replaceable> [ RESTRICT | CASCADE ]
DISABLE TRIGGER [ <replaceable class="PARAMETER">trigger_name</replaceable> | ALL | USER ]
ENABLE TRIGGER [ <replaceable class="PARAMETER">trigger_name</replaceable> | ALL | USER ]
......@@ -227,11 +229,27 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable>
</varlistentry>
<varlistentry>
<term><literal>ADD <replaceable class="PARAMETER">table_constraint</replaceable></literal></term>
<term><literal>ADD <replaceable class="PARAMETER">table_constraint</replaceable>
[ NOT VALID ]</literal></term>
<listitem>
<para>
This form adds a new constraint to a table using the same syntax as
<xref linkend="SQL-CREATETABLE">.
<xref linkend="SQL-CREATETABLE">. Newly added foreign key constraints can
also be defined as <literal>NOT VALID</literal> to avoid the
potentially lengthy initial check that must otherwise be performed.
Constraint checks are skipped at create table time, so
<xref linkend="SQL-CREATETABLE"> does not contain this option.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>VALIDATE CONSTRAINT</literal></term>
<listitem>
<para>
This form validates a foreign key constraint that was previously created
as <literal>NOT VALID</literal>. Constraints already marked valid do not
cause an error response.
</para>
</listitem>
</varlistentry>
......
......@@ -1837,6 +1837,7 @@ StoreRelCheck(Relation rel, char *ccname, Node *expr,
CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */
false, /* Is Deferred */
true, /* Is Validated */
RelationGetRelid(rel), /* relation */
attNos, /* attrs in the constraint */
keycount, /* # attrs in the constraint */
......
......@@ -1103,6 +1103,7 @@ index_constraint_create(Relation heapRelation,
constraintType,
deferrable,
initdeferred,
true,
RelationGetRelid(heapRelation),
indexInfo->ii_KeyAttrNumbers,
indexInfo->ii_NumIndexAttrs,
......
......@@ -46,6 +46,7 @@ CreateConstraintEntry(const char *constraintName,
char constraintType,
bool isDeferrable,
bool isDeferred,
bool isValidated,
Oid relId,
const int16 *constraintKey,
int constraintNKeys,
......@@ -158,6 +159,7 @@ CreateConstraintEntry(const char *constraintName,
values[Anum_pg_constraint_contype - 1] = CharGetDatum(constraintType);
values[Anum_pg_constraint_condeferrable - 1] = BoolGetDatum(isDeferrable);
values[Anum_pg_constraint_condeferred - 1] = BoolGetDatum(isDeferred);
values[Anum_pg_constraint_convalidated - 1] = BoolGetDatum(isValidated);
values[Anum_pg_constraint_conrelid - 1] = ObjectIdGetDatum(relId);
values[Anum_pg_constraint_contypid - 1] = ObjectIdGetDatum(domainId);
values[Anum_pg_constraint_conindid - 1] = ObjectIdGetDatum(indexRelId);
......
......@@ -254,6 +254,7 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel,
static void AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid,
const char *newNspName, LOCKMODE lockmode);
static void ATExecValidateConstraint(Relation rel, const char *constrName);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
......@@ -264,7 +265,7 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
int numattrs, int16 *attnums,
Oid *opclasses);
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
static void validateForeignKeyConstraint(Constraint *fkconstraint,
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid);
static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
......@@ -2649,7 +2650,7 @@ AlterTableGetLockLevel(List *cmds)
* though don't change the semantic results from normal data reads and writes.
* Delaying an ALTER TABLE behind currently active writes only delays the point
* where the new strategy begins to take effect, so there is no benefit in waiting.
* In thise case the minimum restriction applies: we don't currently allow
* In this case the minimum restriction applies: we don't currently allow
* concurrent catalog updates.
*/
case AT_SetStatistics:
......@@ -2660,6 +2661,7 @@ AlterTableGetLockLevel(List *cmds)
case AT_SetOptions:
case AT_ResetOptions:
case AT_SetStorage:
case AT_ValidateConstraint:
cmd_lockmode = ShareUpdateExclusiveLock;
break;
......@@ -2887,6 +2889,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATPrepAddInherit(rel);
pass = AT_PASS_MISC;
break;
case AT_ValidateConstraint:
case AT_EnableTrig: /* ENABLE TRIGGER variants */
case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
......@@ -3054,6 +3057,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode);
break;
case AT_ValidateConstraint:
ATExecValidateConstraint(rel, cmd->name);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
false, false,
......@@ -3307,10 +3313,15 @@ ATRewriteTables(List **wqueue, LOCKMODE lockmode)
*/
refrel = heap_open(con->refrelid, ShareRowExclusiveLock);
validateForeignKeyConstraint(fkconstraint, rel, refrel,
validateForeignKeyConstraint(fkconstraint->conname, rel, refrel,
con->refindid,
con->conid);
/*
* No need to mark the constraint row as validated,
* we did that when we inserted the row earlier.
*/
heap_close(refrel, NoLock);
}
}
......@@ -5509,6 +5520,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
CONSTRAINT_FOREIGN,
fkconstraint->deferrable,
fkconstraint->initdeferred,
!fkconstraint->skip_validation,
RelationGetRelid(rel),
fkattnum,
numfks,
......@@ -5538,7 +5550,8 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
/*
* Tell Phase 3 to check that the constraint is satisfied by existing rows
* (we can skip this during table creation).
* We can skip this during table creation or if requested explicitly
* by specifying NOT VALID on an alter table statement.
*/
if (!fkconstraint->skip_validation)
{
......@@ -5561,6 +5574,86 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
heap_close(pkrel, NoLock);
}
/*
* ALTER TABLE VALIDATE CONSTRAINT
*/
static void
ATExecValidateConstraint(Relation rel, const char *constrName)
{
Relation conrel;
Form_pg_constraint con;
SysScanDesc scan;
ScanKeyData key;
HeapTuple tuple;
bool found = false;
Oid conid;
conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
/*
* Find and the target constraint
*/
ScanKeyInit(&key,
Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel)));
scan = systable_beginscan(conrel, ConstraintRelidIndexId,
true, SnapshotNow, 1, &key);
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
con = (Form_pg_constraint) GETSTRUCT(tuple);
if (strcmp(NameStr(con->conname), constrName) != 0)
continue;
conid = HeapTupleGetOid(tuple);
found = true;
break;
}
if (found && con->contype == CONSTRAINT_FOREIGN && !con->convalidated)
{
HeapTuple copyTuple = heap_copytuple(tuple);
Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
Relation refrel;
/*
* Triggers are already in place on both tables, so a
* concurrent write that alters the result here is not
* possible. Normally we can run a query here to do the
* validation, which would only require AccessShareLock.
* In some cases, it is possible that we might need to
* fire triggers to perform the check, so we take a lock
* at RowShareLock level just in case.
*/
refrel = heap_open(con->confrelid, RowShareLock);
validateForeignKeyConstraint((char *)constrName, rel, refrel,
con->conindid,
conid);
/*
* Now update the catalog, while we have the door open.
*/
copy_con->convalidated = true;
simple_heap_update(conrel, &copyTuple->t_self, copyTuple);
CatalogUpdateIndexes(conrel, copyTuple);
heap_freetuple(copyTuple);
heap_close(refrel, NoLock);
}
systable_endscan(scan);
heap_close(conrel, RowExclusiveLock);
if (!found)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist",
constrName, RelationGetRelationName(rel))));
}
}
/*
* transformColumnNameList - transform list of column names
......@@ -5866,7 +5959,7 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
* Caller must have opened and locked both relations appropriately.
*/
static void
validateForeignKeyConstraint(Constraint *fkconstraint,
validateForeignKeyConstraint(char *conname,
Relation rel,
Relation pkrel,
Oid pkindOid,
......@@ -5881,7 +5974,7 @@ validateForeignKeyConstraint(Constraint *fkconstraint,
*/
MemSet(&trig, 0, sizeof(trig));
trig.tgoid = InvalidOid;
trig.tgname = fkconstraint->conname;
trig.tgname = conname;
trig.tgenabled = TRIGGER_FIRES_ON_ORIGIN;
trig.tgisinternal = TRUE;
trig.tgconstrrelid = RelationGetRelid(pkrel);
......
......@@ -422,6 +422,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
CONSTRAINT_TRIGGER,
stmt->deferrable,
stmt->initdeferred,
true,
RelationGetRelid(rel),
NULL, /* no conkey */
0,
......
......@@ -2378,6 +2378,7 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */
false, /* Is Deferred */
true, /* Is Validated */
InvalidOid, /* not a relation constraint */
NULL,
0,
......
......@@ -546,7 +546,7 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_
UNBOUNDED UNCOMMITTED UNENCRYPTED UNION UNIQUE UNKNOWN UNLISTEN UNLOGGED
UNTIL UPDATE USER USING
VACUUM VALID VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VOLATILE
WHEN WHERE WHITESPACE_P WINDOW WITH WITHOUT WORK WRAPPER WRITE
......@@ -1752,6 +1752,14 @@ alter_table_cmd:
n->def = $2;
$$ = (Node *)n;
}
/* ALTER TABLE <name> VALIDATE CONSTRAINT ... */
| VALIDATE CONSTRAINT name
{
AlterTableCmd *n = makeNode(AlterTableCmd);
n->subtype = AT_ValidateConstraint;
n->name = $3;
$$ = (Node *)n;
}
/* ALTER TABLE <name> DROP CONSTRAINT IF EXISTS <name> [RESTRICT|CASCADE] */
| DROP CONSTRAINT IF_P EXISTS name opt_drop_behavior
{
......@@ -2743,9 +2751,25 @@ ConstraintElem:
n->fk_matchtype = $9;
n->fk_upd_action = (char) ($10 >> 8);
n->fk_del_action = (char) ($10 & 0xFF);
n->skip_validation = FALSE;
n->deferrable = ($11 & 1) != 0;
n->initdeferred = ($11 & 2) != 0;
n->skip_validation = false;
$$ = (Node *)n;
}
| FOREIGN KEY '(' columnList ')' REFERENCES qualified_name
opt_column_list key_match key_actions
NOT VALID
{
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_FOREIGN;
n->location = @1;
n->pktable = $7;
n->fk_attrs = $4;
n->pk_attrs = $8;
n->fk_matchtype = $9;
n->fk_upd_action = (char) ($10 >> 8);
n->fk_del_action = (char) ($10 & 0xFF);
n->skip_validation = true;
$$ = (Node *)n;
}
;
......
......@@ -2608,8 +2608,11 @@ RI_FKey_keyequal_upd_fk(Trigger *trigger, Relation fk_rel,
* This is not a trigger procedure, but is called during ALTER TABLE
* ADD FOREIGN KEY to validate the initial table contents.
*
* We expect that a ShareRowExclusiveLock or higher has been taken on rel and pkrel;
* hence, we do not need to lock individual rows for the check.
* We expect that the caller has made provision to prevent any problems
* caused by concurrent actions. This could be either by locking rel and
* pkrel at ShareRowExclusiveLock or higher, or by otherwise ensuring
* that triggers implementing the checks are already active.
* Hence, we do not need to lock individual rows for the check.
*
* If the check fails because the current user doesn't have permissions
* to read both tables, return false to let our caller know that they will
......
......@@ -1714,8 +1714,10 @@ describeOneTableDetails(const char *schemaname,
{
printfPQExpBuffer(&buf,
"SELECT conname,\n"
" pg_catalog.pg_get_constraintdef(r.oid, true) as condef\n"
"FROM pg_catalog.pg_constraint r\n"
" pg_catalog.pg_get_constraintdef(r.oid, true) as condef\n");
if (pset.sversion >= 90100)
appendPQExpBuffer(&buf, " ,convalidated\n");
appendPQExpBuffer(&buf, "FROM pg_catalog.pg_constraint r\n"
"WHERE r.conrelid = '%s' AND r.contype = 'f' ORDER BY 1",
oid);
result = PSQLexec(buf.data, false);
......@@ -1734,6 +1736,9 @@ describeOneTableDetails(const char *schemaname,
PQgetvalue(result, i, 0),
PQgetvalue(result, i, 1));
if (strcmp(PQgetvalue(result, i, 2), "f") == 0)
appendPQExpBuffer(&buf, " NOT VALID");
printTableAddFooter(&cont, buf.data);
}
}
......
......@@ -45,6 +45,7 @@ CATALOG(pg_constraint,2606)
char contype; /* constraint type; see codes below */
bool condeferrable; /* deferrable constraint? */
bool condeferred; /* deferred by default? */
bool convalidated; /* constraint has been validated? */
/*
* conrelid and conkey are only meaningful if the constraint applies to a
......@@ -148,29 +149,30 @@ typedef FormData_pg_constraint *Form_pg_constraint;
* compiler constants for pg_constraint
* ----------------
*/
#define Natts_pg_constraint 22
#define Natts_pg_constraint 23
#define Anum_pg_constraint_conname 1
#define Anum_pg_constraint_connamespace 2
#define Anum_pg_constraint_contype 3
#define Anum_pg_constraint_condeferrable 4
#define Anum_pg_constraint_condeferred 5
#define Anum_pg_constraint_conrelid 6
#define Anum_pg_constraint_contypid 7
#define Anum_pg_constraint_conindid 8
#define Anum_pg_constraint_confrelid 9
#define Anum_pg_constraint_confupdtype 10
#define Anum_pg_constraint_confdeltype 11
#define Anum_pg_constraint_confmatchtype 12
#define Anum_pg_constraint_conislocal 13
#define Anum_pg_constraint_coninhcount 14
#define Anum_pg_constraint_conkey 15
#define Anum_pg_constraint_confkey 16
#define Anum_pg_constraint_conpfeqop 17
#define Anum_pg_constraint_conppeqop 18
#define Anum_pg_constraint_conffeqop 19
#define Anum_pg_constraint_conexclop 20
#define Anum_pg_constraint_conbin 21
#define Anum_pg_constraint_consrc 22
#define Anum_pg_constraint_convalidated 6
#define Anum_pg_constraint_conrelid 7
#define Anum_pg_constraint_contypid 8
#define Anum_pg_constraint_conindid 9
#define Anum_pg_constraint_confrelid 10
#define Anum_pg_constraint_confupdtype 11
#define Anum_pg_constraint_confdeltype 12
#define Anum_pg_constraint_confmatchtype 13
#define Anum_pg_constraint_conislocal 14
#define Anum_pg_constraint_coninhcount 15
#define Anum_pg_constraint_conkey 16
#define Anum_pg_constraint_confkey 17
#define Anum_pg_constraint_conpfeqop 18
#define Anum_pg_constraint_conppeqop 19
#define Anum_pg_constraint_conffeqop 20
#define Anum_pg_constraint_conexclop 21
#define Anum_pg_constraint_conbin 22
#define Anum_pg_constraint_consrc 23
/* Valid values for contype */
......@@ -205,6 +207,7 @@ extern Oid CreateConstraintEntry(const char *constraintName,
char constraintType,
bool isDeferrable,
bool isDeferred,
bool isValidated,
Oid relId,
const int16 *constraintKey,
int constraintNKeys,
......@@ -228,6 +231,7 @@ extern Oid CreateConstraintEntry(const char *constraintName,
extern void RemoveConstraintById(Oid conId);
extern void RenameConstraintById(Oid conId, const char *newname);
extern void SetValidatedConstraintById(Oid conId);
extern bool ConstraintNameIsUsed(ConstraintCategory conCat, Oid objId,
Oid objNamespace, const char *conname);
......
......@@ -1140,6 +1140,7 @@ typedef enum AlterTableType
AT_ReAddIndex, /* internal to commands/tablecmds.c */
AT_AddConstraint, /* add constraint */
AT_AddConstraintRecurse, /* internal to commands/tablecmds.c */
AT_ValidateConstraint, /* validate constraint */
AT_ProcessedConstraint, /* pre-processed add constraint (local in
* parser/parse_utilcmd.c) */
AT_AddIndexConstraint, /* add constraint using existing index */
......@@ -1182,6 +1183,7 @@ typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */
Node *transform; /* transformation expr for ALTER TYPE */
DropBehavior behavior; /* RESTRICT or CASCADE for DROP cases */
bool missing_ok; /* skip error if missing? */
bool validated;
} AlterTableCmd;
......
......@@ -397,6 +397,7 @@ PG_KEYWORD("user", USER, RESERVED_KEYWORD)
PG_KEYWORD("using", USING, RESERVED_KEYWORD)
PG_KEYWORD("vacuum", VACUUM, UNRESERVED_KEYWORD)
PG_KEYWORD("valid", VALID, UNRESERVED_KEYWORD)
PG_KEYWORD("validate", VALIDATE, UNRESERVED_KEYWORD)
PG_KEYWORD("validator", VALIDATOR, UNRESERVED_KEYWORD)
PG_KEYWORD("value", VALUE_P, UNRESERVED_KEYWORD)
PG_KEYWORD("values", VALUES, COL_NAME_KEYWORD)
......
......@@ -184,6 +184,18 @@ DETAIL: Key (a)=(5) is not present in table "tmp2".
DELETE FROM tmp3 where a=5;
-- Try (and succeed)
ALTER TABLE tmp3 add constraint tmpconstr foreign key (a) references tmp2 match full;
ALTER TABLE tmp3 drop constraint tmpconstr;
INSERT INTO tmp3 values (5,50);
-- Try NOT VALID and then VALIDATE CONSTRAINT, but fails. Delete failure then re-validate
ALTER TABLE tmp3 add constraint tmpconstr foreign key (a) references tmp2 match full NOT VALID;
ALTER TABLE tmp3 validate constraint tmpconstr;
ERROR: insert or update on table "tmp3" violates foreign key constraint "tmpconstr"
DETAIL: Key (a)=(5) is not present in table "tmp2".
-- Delete failing row
DELETE FROM tmp3 where a=5;
-- Try (and succeed) and repeat to show it works on already valid constraint
ALTER TABLE tmp3 validate constraint tmpconstr;
ALTER TABLE tmp3 validate constraint tmpconstr;
-- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on
-- tmp4 is a,b
ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full;
......
......@@ -221,6 +221,21 @@ DELETE FROM tmp3 where a=5;
-- Try (and succeed)
ALTER TABLE tmp3 add constraint tmpconstr foreign key (a) references tmp2 match full;
ALTER TABLE tmp3 drop constraint tmpconstr;
INSERT INTO tmp3 values (5,50);
-- Try NOT VALID and then VALIDATE CONSTRAINT, but fails. Delete failure then re-validate
ALTER TABLE tmp3 add constraint tmpconstr foreign key (a) references tmp2 match full NOT VALID;
ALTER TABLE tmp3 validate constraint tmpconstr;
-- Delete failing row
DELETE FROM tmp3 where a=5;
-- Try (and succeed) and repeat to show it works on already valid constraint
ALTER TABLE tmp3 validate constraint tmpconstr;
ALTER TABLE tmp3 validate constraint tmpconstr;
-- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on
-- tmp4 is a,b
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment