Commit 89779524 authored by Alvaro Herrera's avatar Alvaro Herrera

Enable CHECK constraints to be declared NOT VALID

This means that they can initially be added to a large existing table
without checking its initial contents, but new tuples must comply to
them; a separate pass invoked by ALTER TABLE / VALIDATE can verify
existing data and ensure it complies with the constraint, at which point
it is marked validated and becomes a normal part of the table ecosystem.

An non-validated CHECK constraint is ignored in the planner for
constraint_exclusion purposes; when validated, cached plans are
recomputed so that partitioning starts working right away.

This patch also enables domains to have unvalidated CHECK constraints
attached to them as well by way of ALTER DOMAIN / ADD CONSTRAINT / NOT
VALID, which can later be validated with ALTER DOMAIN / VALIDATE
CONSTRAINT.

Thanks to Thom Brown, Dean Rasheed and Jaime Casanova for the various
reviews, and Robert Hass for documentation wording improvement
suggestions.

This patch was sponsored by Enova Financial.
parent b36927fb
...@@ -1899,7 +1899,7 @@ ...@@ -1899,7 +1899,7 @@
<entry><type>bool</type></entry> <entry><type>bool</type></entry>
<entry></entry> <entry></entry>
<entry>Has the constraint been validated? <entry>Has the constraint been validated?
Currently, can only be false for foreign keys</entry> Currently, can only be false for foreign keys and CHECK constraints</entry>
</row> </row>
<row> <row>
......
...@@ -28,9 +28,11 @@ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ...@@ -28,9 +28,11 @@ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
{ SET | DROP } NOT NULL { SET | DROP } NOT NULL
ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
ADD <replaceable class="PARAMETER">domain_constraint</replaceable> ADD <replaceable class="PARAMETER">domain_constraint</replaceable> [ NOT VALID ]
ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
DROP CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> [ RESTRICT | CASCADE ] DROP CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> [ RESTRICT | CASCADE ]
ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
VALIDATE CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable>
ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
OWNER TO <replaceable class="PARAMETER">new_owner</replaceable> OWNER TO <replaceable class="PARAMETER">new_owner</replaceable>
ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
...@@ -70,13 +72,19 @@ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ...@@ -70,13 +72,19 @@ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
</varlistentry> </varlistentry>
<varlistentry> <varlistentry>
<term>ADD <replaceable class="PARAMETER">domain_constraint</replaceable></term> <term>ADD <replaceable class="PARAMETER">domain_constraint</replaceable> [ NOT VALID ]</term>
<listitem> <listitem>
<para> <para>
This form adds a new constraint to a domain using the same syntax as This form adds a new constraint to a domain using the same syntax as
<xref linkend="SQL-CREATEDOMAIN">. <xref linkend="SQL-CREATEDOMAIN">.
This will only succeed if all columns using the domain satisfy the When a new constraint is added to a domain, all columns using that
new constraint. domain will be checked against the newly added constraint. These
checks can be suppressed by adding the new constraint using the
<literal>NOT VALID</literal> option; the constraint can later be made
valid using <command>ALTER DOMAIN ... VALIDATE CONSTRAINT</command>.
Newly inserted or updated rows are always checked against all
constraints, even those marked <literal>NOT VALID</literal>.
<literal>NOT VALID</> is only accepted for <literal>CHECK</> constraints.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -90,6 +98,17 @@ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ...@@ -90,6 +98,17 @@ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term>VALIDATE CONSTRAINT</term>
<listitem>
<para>
This form validates a constraint previously added as
<literal>NOT VALID</>, that is, verify that all data in columns using the
domain satisfy the specified constraint.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term>OWNER</term> <term>OWNER</term>
<listitem> <listitem>
...@@ -155,6 +174,16 @@ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable> ...@@ -155,6 +174,16 @@ ALTER DOMAIN <replaceable class="PARAMETER">name</replaceable>
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><replaceable class="PARAMETER">NOT VALID</replaceable></term>
<listitem>
<para>
Do not verify existing column data for constraint validity.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><literal>CASCADE</literal></term> <term><literal>CASCADE</literal></term>
<listitem> <listitem>
...@@ -250,9 +279,11 @@ ALTER DOMAIN zipcode SET SCHEMA customers; ...@@ -250,9 +279,11 @@ ALTER DOMAIN zipcode SET SCHEMA customers;
<para> <para>
<command>ALTER DOMAIN</command> conforms to the <acronym>SQL</acronym> <command>ALTER DOMAIN</command> conforms to the <acronym>SQL</acronym>
standard, standard, except for the <literal>OWNER</>, <literal>SET SCHEMA</> and
except for the <literal>OWNER</> and <literal>SET SCHEMA</> variants, <literal>VALIDATE CONSTRAINT</> variants, which are
which are <productname>PostgreSQL</productname> extensions. <productname>PostgreSQL</productname> extensions. The <literal>NOT VALID</>
clause of the <literal>ADD CONSTRAINT</> variant is also a
<productname>PostgreSQL</productname> extension.
</para> </para>
</refsect1> </refsect1>
......
...@@ -240,12 +240,14 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable> ...@@ -240,12 +240,14 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable>
This form adds a new constraint to a table using the same syntax as This form adds a new constraint to a table using the same syntax as
<xref linkend="SQL-CREATETABLE">, plus the option <literal>NOT <xref linkend="SQL-CREATETABLE">, plus the option <literal>NOT
VALID</literal>, which is currently only allowed for foreign key VALID</literal>, which is currently only allowed for foreign key
constraints. and CHECK constraints.
If the constraint is marked <literal>NOT VALID</literal>, the If the constraint is marked <literal>NOT VALID</literal>, the
potentially-lengthy initial check to verify that all rows in the table potentially-lengthy initial check to verify that all rows in the table
satisfy the constraint is skipped. The constraint will still be satisfy the constraint is skipped. The constraint will still be
enforced against subsequent inserts or updates (that is, they'll fail enforced against subsequent inserts or updates (that is, they'll fail
unless there is a matching row in the referenced table). But the unless there is a matching row in the referenced table, in the case
of foreign keys; and they'll fail unless the new row matches the
specified check constraints). But the
database will not assume that the constraint holds for all rows in database will not assume that the constraint holds for all rows in
the table, until it is validated by using the <literal>VALIDATE the table, until it is validated by using the <literal>VALIDATE
CONSTRAINT</literal> option. CONSTRAINT</literal> option.
...@@ -308,10 +310,10 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable> ...@@ -308,10 +310,10 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable>
<term><literal>VALIDATE CONSTRAINT</literal></term> <term><literal>VALIDATE CONSTRAINT</literal></term>
<listitem> <listitem>
<para> <para>
This form validates a foreign key constraint that was previously created This form validates a foreign key or check constraint that was previously created
as <literal>NOT VALID</literal>, by scanning the table to ensure there as <literal>NOT VALID</literal>, by scanning the table to ensure there
are no unmatched rows. Nothing happens if the constraint is are no rows for which the constraint is not satisfied.
already marked valid. Nothing happens if the constraint is already marked valid.
The value of separating validation from initial creation of the The value of separating validation from initial creation of the
constraint is that validation requires a lesser lock on the table constraint is that validation requires a lesser lock on the table
than constraint creation does. than constraint creation does.
......
...@@ -200,6 +200,7 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc) ...@@ -200,6 +200,7 @@ CreateTupleDescCopyConstr(TupleDesc tupdesc)
cpy->check[i].ccname = pstrdup(constr->check[i].ccname); cpy->check[i].ccname = pstrdup(constr->check[i].ccname);
if (constr->check[i].ccbin) if (constr->check[i].ccbin)
cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin); cpy->check[i].ccbin = pstrdup(constr->check[i].ccbin);
cpy->check[i].ccvalid = constr->check[i].ccvalid;
} }
} }
......
...@@ -98,7 +98,7 @@ static Oid AddNewRelationType(const char *typeName, ...@@ -98,7 +98,7 @@ static Oid AddNewRelationType(const char *typeName,
Oid new_array_type); Oid new_array_type);
static void RelationRemoveInheritance(Oid relid); static void RelationRemoveInheritance(Oid relid);
static void StoreRelCheck(Relation rel, char *ccname, Node *expr, static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
bool is_local, int inhcount); bool is_validated, bool is_local, int inhcount);
static void StoreConstraints(Relation rel, List *cooked_constraints); static void StoreConstraints(Relation rel, List *cooked_constraints);
static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr, static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
bool allow_merge, bool is_local); bool allow_merge, bool is_local);
...@@ -1846,7 +1846,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr) ...@@ -1846,7 +1846,7 @@ StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
*/ */
static void static void
StoreRelCheck(Relation rel, char *ccname, Node *expr, StoreRelCheck(Relation rel, char *ccname, Node *expr,
bool is_local, int inhcount) bool is_validated, bool is_local, int inhcount)
{ {
char *ccbin; char *ccbin;
char *ccsrc; char *ccsrc;
...@@ -1907,7 +1907,7 @@ StoreRelCheck(Relation rel, char *ccname, Node *expr, ...@@ -1907,7 +1907,7 @@ StoreRelCheck(Relation rel, char *ccname, Node *expr,
CONSTRAINT_CHECK, /* Constraint Type */ CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */ false, /* Is Deferrable */
false, /* Is Deferred */ false, /* Is Deferred */
true, /* Is Validated */ is_validated,
RelationGetRelid(rel), /* relation */ RelationGetRelid(rel), /* relation */
attNos, /* attrs in the constraint */ attNos, /* attrs in the constraint */
keycount, /* # attrs in the constraint */ keycount, /* # attrs in the constraint */
...@@ -1967,7 +1967,7 @@ StoreConstraints(Relation rel, List *cooked_constraints) ...@@ -1967,7 +1967,7 @@ StoreConstraints(Relation rel, List *cooked_constraints)
StoreAttrDefault(rel, con->attnum, con->expr); StoreAttrDefault(rel, con->attnum, con->expr);
break; break;
case CONSTR_CHECK: case CONSTR_CHECK:
StoreRelCheck(rel, con->name, con->expr, StoreRelCheck(rel, con->name, con->expr, !con->skip_validation,
con->is_local, con->inhcount); con->is_local, con->inhcount);
numchecks++; numchecks++;
break; break;
...@@ -2081,6 +2081,7 @@ AddRelationNewConstraints(Relation rel, ...@@ -2081,6 +2081,7 @@ AddRelationNewConstraints(Relation rel,
cooked->name = NULL; cooked->name = NULL;
cooked->attnum = colDef->attnum; cooked->attnum = colDef->attnum;
cooked->expr = expr; cooked->expr = expr;
cooked->skip_validation = false;
cooked->is_local = is_local; cooked->is_local = is_local;
cooked->inhcount = is_local ? 0 : 1; cooked->inhcount = is_local ? 0 : 1;
cookedConstraints = lappend(cookedConstraints, cooked); cookedConstraints = lappend(cookedConstraints, cooked);
...@@ -2194,7 +2195,8 @@ AddRelationNewConstraints(Relation rel, ...@@ -2194,7 +2195,8 @@ AddRelationNewConstraints(Relation rel,
/* /*
* OK, store it. * OK, store it.
*/ */
StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1); StoreRelCheck(rel, ccname, expr, !cdef->skip_validation, is_local,
is_local ? 0 : 1);
numchecks++; numchecks++;
...@@ -2203,6 +2205,7 @@ AddRelationNewConstraints(Relation rel, ...@@ -2203,6 +2205,7 @@ AddRelationNewConstraints(Relation rel,
cooked->name = ccname; cooked->name = ccname;
cooked->attnum = 0; cooked->attnum = 0;
cooked->expr = expr; cooked->expr = expr;
cooked->skip_validation = cdef->skip_validation;
cooked->is_local = is_local; cooked->is_local = is_local;
cooked->inhcount = is_local ? 0 : 1; cooked->inhcount = is_local ? 0 : 1;
cookedConstraints = lappend(cookedConstraints, cooked); cookedConstraints = lappend(cookedConstraints, cooked);
......
...@@ -259,7 +259,8 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel, ...@@ -259,7 +259,8 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel,
static void AlterSeqNamespaces(Relation classRel, Relation rel, static void AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid, Oid oldNspOid, Oid newNspOid,
const char *newNspName, LOCKMODE lockmode); const char *newNspName, LOCKMODE lockmode);
static void ATExecValidateConstraint(Relation rel, char *constrName); static void ATExecValidateConstraint(Relation rel, char *constrName,
bool recurse, bool recursing, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList, static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids); int16 *attnums, Oid *atttypids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
...@@ -270,6 +271,7 @@ static Oid transformFkeyCheckAttrs(Relation pkrel, ...@@ -270,6 +271,7 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
int numattrs, int16 *attnums, int numattrs, int16 *attnums,
Oid *opclasses); Oid *opclasses);
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts); static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
static void validateForeignKeyConstraint(char *conname, static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel, Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid); Oid pkindOid, Oid constraintOid);
...@@ -561,6 +563,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId) ...@@ -561,6 +563,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId)
cooked->name = NULL; cooked->name = NULL;
cooked->attnum = attnum; cooked->attnum = attnum;
cooked->expr = colDef->cooked_default; cooked->expr = colDef->cooked_default;
cooked->skip_validation = false;
cooked->is_local = true; /* not used for defaults */ cooked->is_local = true; /* not used for defaults */
cooked->inhcount = 0; /* ditto */ cooked->inhcount = 0; /* ditto */
cookedDefaults = lappend(cookedDefaults, cooked); cookedDefaults = lappend(cookedDefaults, cooked);
...@@ -1568,6 +1571,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, ...@@ -1568,6 +1571,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
cooked->name = pstrdup(name); cooked->name = pstrdup(name);
cooked->attnum = 0; /* not used for constraints */ cooked->attnum = 0; /* not used for constraints */
cooked->expr = expr; cooked->expr = expr;
cooked->skip_validation = false;
cooked->is_local = false; cooked->is_local = false;
cooked->inhcount = 1; cooked->inhcount = 1;
constraints = lappend(constraints, cooked); constraints = lappend(constraints, cooked);
...@@ -2933,7 +2937,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2933,7 +2937,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATPrepAddInherit(rel); ATPrepAddInherit(rel);
pass = AT_PASS_MISC; pass = AT_PASS_MISC;
break; break;
case AT_ValidateConstraint: case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
ATSimplePermissions(rel, ATT_TABLE);
/* Recursion occurs during execution phase */
/* No command-specific prep needed except saving recurse flag */
if (recurse)
cmd->subtype = AT_ValidateConstraintRecurse;
pass = AT_PASS_MISC;
break;
case AT_EnableTrig: /* ENABLE TRIGGER variants */ case AT_EnableTrig: /* ENABLE TRIGGER variants */
case AT_EnableAlwaysTrig: case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig: case AT_EnableReplicaTrig:
...@@ -3098,8 +3109,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -3098,8 +3109,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode); ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode);
break; break;
case AT_ValidateConstraint: case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
ATExecValidateConstraint(rel, cmd->name); ATExecValidateConstraint(rel, cmd->name, false, false, lockmode);
break;
case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with
* recursion */
ATExecValidateConstraint(rel, cmd->name, true, false, lockmode);
break; break;
case AT_DropConstraint: /* DROP CONSTRAINT */ case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior, ATExecDropConstraint(rel, cmd->name, cmd->behavior,
...@@ -5384,19 +5399,23 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -5384,19 +5399,23 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
list_make1(copyObject(constr)), list_make1(copyObject(constr)),
recursing, !recursing); recursing, !recursing);
/* Add each constraint to Phase 3's queue */ /* Add each to-be-validated constraint to Phase 3's queue */
foreach(lcon, newcons) foreach(lcon, newcons)
{ {
CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon);
NewConstraint *newcon;
newcon = (NewConstraint *) palloc0(sizeof(NewConstraint)); if (!ccon->skip_validation)
newcon->name = ccon->name; {
newcon->contype = ccon->contype; NewConstraint *newcon;
/* ExecQual wants implicit-AND format */
newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
tab->constraints = lappend(tab->constraints, newcon); newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
newcon->name = ccon->name;
newcon->contype = ccon->contype;
/* ExecQual wants implicit-AND format */
newcon->qual = (Node *) make_ands_implicit((Expr *) ccon->expr);
tab->constraints = lappend(tab->constraints, newcon);
}
/* Save the actually assigned name if it was defaulted */ /* Save the actually assigned name if it was defaulted */
if (constr->conname == NULL) if (constr->conname == NULL)
...@@ -5755,9 +5774,15 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, ...@@ -5755,9 +5774,15 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
/* /*
* ALTER TABLE VALIDATE CONSTRAINT * ALTER TABLE VALIDATE CONSTRAINT
*
* XXX The reason we handle recursion here rather than at Phase 1 is because
* there's no good way to skip recursing when handling foreign keys: there is
* no need to lock children in that case, yet we wouldn't be able to avoid
* doing so at that level.
*/ */
static void static void
ATExecValidateConstraint(Relation rel, char *constrName) ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
bool recursing, LOCKMODE lockmode)
{ {
Relation conrel; Relation conrel;
SysScanDesc scan; SysScanDesc scan;
...@@ -5781,8 +5806,7 @@ ATExecValidateConstraint(Relation rel, char *constrName) ...@@ -5781,8 +5806,7 @@ ATExecValidateConstraint(Relation rel, char *constrName)
while (HeapTupleIsValid(tuple = systable_getnext(scan))) while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{ {
con = (Form_pg_constraint) GETSTRUCT(tuple); con = (Form_pg_constraint) GETSTRUCT(tuple);
if (con->contype == CONSTRAINT_FOREIGN && if (strcmp(NameStr(con->conname), constrName) == 0)
strcmp(NameStr(con->conname), constrName) == 0)
{ {
found = true; found = true;
break; break;
...@@ -5792,39 +5816,110 @@ ATExecValidateConstraint(Relation rel, char *constrName) ...@@ -5792,39 +5816,110 @@ ATExecValidateConstraint(Relation rel, char *constrName)
if (!found) if (!found)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT), (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("foreign key constraint \"%s\" of relation \"%s\" does not exist", errmsg("constraint \"%s\" of relation \"%s\" does not exist",
constrName, RelationGetRelationName(rel))));
if (con->contype != CONSTRAINT_FOREIGN &&
con->contype != CONSTRAINT_CHECK)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("constraint \"%s\" of relation \"%s\" is not a foreign key or check constraint",
constrName, RelationGetRelationName(rel)))); constrName, RelationGetRelationName(rel))));
if (!con->convalidated) if (!con->convalidated)
{ {
Oid conid = HeapTupleGetOid(tuple); HeapTuple copyTuple;
HeapTuple copyTuple = heap_copytuple(tuple); Form_pg_constraint copy_con;
Form_pg_constraint copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
Relation refrel;
/* if (con->contype == CONSTRAINT_FOREIGN)
* Triggers are already in place on both tables, so a concurrent write {
* that alters the result here is not possible. Normally we can run a Oid conid = HeapTupleGetOid(tuple);
* query here to do the validation, which would only require Relation refrel;
* AccessShareLock. In some cases, it is possible that we might need
* to fire triggers to perform the check, so we take a lock at
* RowShareLock level just in case.
*/
refrel = heap_open(con->confrelid, RowShareLock);
validateForeignKeyConstraint(constrName, rel, refrel, /*
con->conindid, * Triggers are already in place on both tables, so a concurrent write
conid); * that alters the result here is not possible. Normally we can run a
* query here to do the validation, which would only require
* AccessShareLock. In some cases, it is possible that we might need
* to fire triggers to perform the check, so we take a lock at
* RowShareLock level just in case.
*/
refrel = heap_open(con->confrelid, RowShareLock);
validateForeignKeyConstraint(constrName, rel, refrel,
con->conindid,
conid);
heap_close(refrel, NoLock);
/*
* Foreign keys do not inherit, so we purposedly ignore the
* recursion bit here
*/
}
else if (con->contype == CONSTRAINT_CHECK)
{
List *children = NIL;
ListCell *child;
/*
* If we're recursing, the parent has already done this, so skip
* it.
*/
if (!recursing)
children = find_all_inheritors(RelationGetRelid(rel),
lockmode, NULL);
/*
* For CHECK constraints, we must ensure that we only mark the
* constraint as validated on the parent if it's already validated
* on the children.
*
* We recurse before validating on the parent, to reduce risk of
* deadlocks.
*/
foreach(child, children)
{
Oid childoid = lfirst_oid(child);
Relation childrel;
if (childoid == RelationGetRelid(rel))
continue;
/*
* If we are told not to recurse, there had better not be any
* child tables; else the addition would put them out of step.
*/
if (!recurse)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("constraint must be validated on child tables too")));
/* find_all_inheritors already got lock */
childrel = heap_open(childoid, NoLock);
ATExecValidateConstraint(childrel, constrName, false,
true, lockmode);
heap_close(childrel, NoLock);
}
validateCheckConstraint(rel, tuple);
/*
* Invalidate relcache so that others see the new validated
* constraint.
*/
CacheInvalidateRelcache(rel);
}
/* /*
* Now update the catalog, while we have the door open. * Now update the catalog, while we have the door open.
*/ */
copyTuple = heap_copytuple(tuple);
copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
copy_con->convalidated = true; copy_con->convalidated = true;
simple_heap_update(conrel, &copyTuple->t_self, copyTuple); simple_heap_update(conrel, &copyTuple->t_self, copyTuple);
CatalogUpdateIndexes(conrel, copyTuple); CatalogUpdateIndexes(conrel, copyTuple);
heap_freetuple(copyTuple); heap_freetuple(copyTuple);
heap_close(refrel, NoLock);
} }
systable_endscan(scan); systable_endscan(scan);
...@@ -6130,6 +6225,79 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts) ...@@ -6130,6 +6225,79 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
} }
} }
/*
* Scan the existing rows in a table to verify they meet a proposed
* CHECK constraint.
*
* The caller must have opened and locked the relation appropriately.
*/
static void
validateCheckConstraint(Relation rel, HeapTuple constrtup)
{
EState *estate;
Datum val;
char *conbin;
Expr *origexpr;
List *exprstate;
TupleDesc tupdesc;
HeapScanDesc scan;
HeapTuple tuple;
ExprContext *econtext;
MemoryContext oldcxt;
TupleTableSlot *slot;
Form_pg_constraint constrForm;
bool isnull;
constrForm = (Form_pg_constraint) GETSTRUCT(constrtup);
estate = CreateExecutorState();
/*
* XXX this tuple doesn't really come from a syscache, but this doesn't
* matter to SysCacheGetAttr, because it only wants to be able to fetch the
* tupdesc
*/
val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin,
&isnull);
if (isnull)
elog(ERROR, "null conbin for constraint %u",
HeapTupleGetOid(constrtup));
conbin = TextDatumGetCString(val);
origexpr = (Expr *) stringToNode(conbin);
exprstate = (List *)
ExecPrepareExpr((Expr *) make_ands_implicit(origexpr), estate);
econtext = GetPerTupleExprContext(estate);
tupdesc = RelationGetDescr(rel);
slot = MakeSingleTupleTableSlot(tupdesc);
econtext->ecxt_scantuple = slot;
scan = heap_beginscan(rel, SnapshotNow, 0, NULL);
/*
* Switch to per-tuple memory context and reset it for each tuple
* produced, so we don't leak memory.
*/
oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
if (!ExecQual(exprstate, econtext, true))
ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION),
errmsg("check constraint \"%s\" is violated by some row",
NameStr(constrForm->conname))));
ResetExprContext(econtext);
}
MemoryContextSwitchTo(oldcxt);
heap_endscan(scan);
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
}
/* /*
* Scan the existing rows in a table to verify they meet a proposed FK * Scan the existing rows in a table to verify they meet a proposed FK
* constraint. * constraint.
......
...@@ -86,6 +86,7 @@ static Oid findTypeSendFunction(List *procname, Oid typeOid); ...@@ -86,6 +86,7 @@ static Oid findTypeSendFunction(List *procname, Oid typeOid);
static Oid findTypeTypmodinFunction(List *procname); static Oid findTypeTypmodinFunction(List *procname);
static Oid findTypeTypmodoutFunction(List *procname); static Oid findTypeTypmodoutFunction(List *procname);
static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid); static Oid findTypeAnalyzeFunction(List *procname, Oid typeOid);
static void validateDomainConstraint(Oid domainoid, char *ccbin);
static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode); static List *get_rels_with_domain(Oid domainOid, LOCKMODE lockmode);
static void checkDomainOwner(HeapTuple tup); static void checkDomainOwner(HeapTuple tup);
static void checkEnumOwner(HeapTuple tup); static void checkEnumOwner(HeapTuple tup);
...@@ -1941,14 +1942,8 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) ...@@ -1941,14 +1942,8 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
Relation typrel; Relation typrel;
HeapTuple tup; HeapTuple tup;
Form_pg_type typTup; Form_pg_type typTup;
List *rels;
ListCell *rt;
EState *estate;
ExprContext *econtext;
char *ccbin;
Expr *expr;
ExprState *exprstate;
Constraint *constr; Constraint *constr;
char *ccbin;
/* Make a TypeName so we can use standard type lookup machinery */ /* Make a TypeName so we can use standard type lookup machinery */
typename = makeTypeNameFromNameList(names); typename = makeTypeNameFromNameList(names);
...@@ -2027,10 +2022,129 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) ...@@ -2027,10 +2022,129 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
constr, NameStr(typTup->typname)); constr, NameStr(typTup->typname));
/* /*
* Test all values stored in the attributes based on the domain the * If requested to validate the constraint, test all values stored in the
* constraint is being added to. * attributes based on the domain the constraint is being added to.
*/ */
expr = (Expr *) stringToNode(ccbin); if (!constr->skip_validation)
validateDomainConstraint(domainoid, ccbin);
/* Clean up */
heap_close(typrel, RowExclusiveLock);
}
/*
* AlterDomainValidateConstraint
*
* Implements the ALTER DOMAIN .. VALIDATE CONSTRAINT statement.
*/
void
AlterDomainValidateConstraint(List *names, char *constrName)
{
TypeName *typename;
Oid domainoid;
Relation typrel;
Relation conrel;
HeapTuple tup;
Form_pg_type typTup;
Form_pg_constraint con;
Form_pg_constraint copy_con;
char *conbin;
SysScanDesc scan;
Datum val;
bool found = false;
bool isnull;
HeapTuple tuple;
HeapTuple copyTuple;
ScanKeyData key;
/* Make a TypeName so we can use standard type lookup machinery */
typename = makeTypeNameFromNameList(names);
domainoid = typenameTypeId(NULL, typename);
/* Look up the domain in the type table */
typrel = heap_open(TypeRelationId, AccessShareLock);
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(domainoid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for type %u", domainoid);
typTup = (Form_pg_type) GETSTRUCT(tup);
/* Check it's a domain and check user has permission for ALTER DOMAIN */
checkDomainOwner(tup);
/*
* Find and check the target constraint
*/
conrel = heap_open(ConstraintRelationId, RowExclusiveLock);
ScanKeyInit(&key,
Anum_pg_constraint_contypid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(domainoid));
scan = systable_beginscan(conrel, ConstraintTypidIndexId,
true, SnapshotNow, 1, &key);
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
{
con = (Form_pg_constraint) GETSTRUCT(tuple);
if (strcmp(NameStr(con->conname), constrName) == 0)
{
found = true;
break;
}
}
if (!found)
{
con = NULL; /* keep compiler quiet */
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("constraint \"%s\" of domain \"%s\" does not exist",
constrName, NameStr(con->conname))));
}
if (con->contype != CONSTRAINT_CHECK)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("constraint \"%s\" of domain \"%s\" is not a check constraint",
constrName, NameStr(con->conname))));
val = SysCacheGetAttr(CONSTROID, tuple,
Anum_pg_constraint_conbin,
&isnull);
if (isnull)
elog(ERROR, "null conbin for constraint %u",
HeapTupleGetOid(tuple));
conbin = TextDatumGetCString(val);
validateDomainConstraint(domainoid, conbin);
/*
* Now update the catalog, while we have the door open.
*/
copyTuple = heap_copytuple(tuple);
copy_con = (Form_pg_constraint) GETSTRUCT(copyTuple);
copy_con->convalidated = true;
simple_heap_update(conrel, &copyTuple->t_self, copyTuple);
CatalogUpdateIndexes(conrel, copyTuple);
heap_freetuple(copyTuple);
systable_endscan(scan);
heap_close(typrel, AccessShareLock);
heap_close(conrel, RowExclusiveLock);
ReleaseSysCache(tup);
}
static void
validateDomainConstraint(Oid domainoid, char *ccbin)
{
Expr *expr = (Expr *) stringToNode(ccbin);
List *rels;
ListCell *rt;
EState *estate;
ExprContext *econtext;
ExprState *exprstate;
/* Need an EState to run ExecEvalExpr */ /* Need an EState to run ExecEvalExpr */
estate = CreateExecutorState(); estate = CreateExecutorState();
...@@ -2092,11 +2206,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint) ...@@ -2092,11 +2206,7 @@ AlterDomainAddConstraint(List *names, Node *newConstraint)
} }
FreeExecutorState(estate); FreeExecutorState(estate);
/* Clean up */
heap_close(typrel, RowExclusiveLock);
} }
/* /*
* get_rels_with_domain * get_rels_with_domain
* *
...@@ -2416,7 +2526,7 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, ...@@ -2416,7 +2526,7 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
CONSTRAINT_CHECK, /* Constraint Type */ CONSTRAINT_CHECK, /* Constraint Type */
false, /* Is Deferrable */ false, /* Is Deferrable */
false, /* Is Deferred */ false, /* Is Deferred */
true, /* Is Validated */ !constr->skip_validation, /* Is Validated */
InvalidOid, /* not a relation constraint */ InvalidOid, /* not a relation constraint */
NULL, NULL,
0, 0,
......
...@@ -552,7 +552,7 @@ get_relation_data_width(Oid relid, int32 *attr_widths) ...@@ -552,7 +552,7 @@ get_relation_data_width(Oid relid, int32 *attr_widths)
/* /*
* get_relation_constraints * get_relation_constraints
* *
* Retrieve the CHECK constraint expressions of the given relation. * Retrieve the validated CHECK constraint expressions of the given relation.
* *
* Returns a List (possibly empty) of constraint expressions. Each one * Returns a List (possibly empty) of constraint expressions. Each one
* has been canonicalized, and its Vars are changed to have the varno * has been canonicalized, and its Vars are changed to have the varno
...@@ -591,6 +591,13 @@ get_relation_constraints(PlannerInfo *root, ...@@ -591,6 +591,13 @@ get_relation_constraints(PlannerInfo *root,
{ {
Node *cexpr; Node *cexpr;
/*
* If this constraint hasn't been fully validated yet, we must
* ignore it here.
*/
if (!constr->check[i].ccvalid)
continue;
cexpr = stringToNode(constr->check[i].ccbin); cexpr = stringToNode(constr->check[i].ccbin);
/* /*
...@@ -663,7 +670,7 @@ get_relation_constraints(PlannerInfo *root, ...@@ -663,7 +670,7 @@ get_relation_constraints(PlannerInfo *root,
* *
* Detect whether the relation need not be scanned because it has either * Detect whether the relation need not be scanned because it has either
* self-inconsistent restrictions, or restrictions inconsistent with the * self-inconsistent restrictions, or restrictions inconsistent with the
* relation's CHECK constraints. * relation's validated CHECK constraints.
* *
* Note: this examines only rel->relid, rel->reloptkind, and * Note: this examines only rel->relid, rel->reloptkind, and
* rel->baserestrictinfo; therefore it can be called before filling in * rel->baserestrictinfo; therefore it can be called before filling in
......
...@@ -2759,8 +2759,9 @@ ConstraintElem: ...@@ -2759,8 +2759,9 @@ ConstraintElem:
n->raw_expr = $3; n->raw_expr = $3;
n->cooked_expr = NULL; n->cooked_expr = NULL;
processCASbits($5, @5, "CHECK", processCASbits($5, @5, "CHECK",
NULL, NULL, NULL, NULL, NULL, &n->skip_validation,
yyscanner); yyscanner);
n->initially_valid = !n->skip_validation;
$$ = (Node *)n; $$ = (Node *)n;
} }
| UNIQUE '(' columnList ')' opt_definition OptConsTableSpace | UNIQUE '(' columnList ')' opt_definition OptConsTableSpace
...@@ -7559,6 +7560,15 @@ AlterDomainStmt: ...@@ -7559,6 +7560,15 @@ AlterDomainStmt:
n->behavior = $7; n->behavior = $7;
$$ = (Node *)n; $$ = (Node *)n;
} }
/* ALTER DOMAIN <domain> VALIDATE CONSTRAINT <name> */
| ALTER DOMAIN_P any_name VALIDATE CONSTRAINT name
{
AlterDomainStmt *n = makeNode(AlterDomainStmt);
n->subtype = 'V';
n->typeName = $3;
n->name = $6;
$$ = (Node *)n;
}
; ;
opt_as: AS {} opt_as: AS {}
......
...@@ -820,6 +820,10 @@ standard_ProcessUtility(Node *parsetree, ...@@ -820,6 +820,10 @@ standard_ProcessUtility(Node *parsetree,
stmt->name, stmt->name,
stmt->behavior); stmt->behavior);
break; break;
case 'V': /* VALIDATE CONSTRAINT */
AlterDomainValidateConstraint(stmt->typeName,
stmt->name);
break;
default: /* oops */ default: /* oops */
elog(ERROR, "unrecognized alter domain type: %d", elog(ERROR, "unrecognized alter domain type: %d",
(int) stmt->subtype); (int) stmt->subtype);
......
...@@ -3251,6 +3251,7 @@ CheckConstraintFetch(Relation relation) ...@@ -3251,6 +3251,7 @@ CheckConstraintFetch(Relation relation)
elog(ERROR, "unexpected constraint record found for rel %s", elog(ERROR, "unexpected constraint record found for rel %s",
RelationGetRelationName(relation)); RelationGetRelationName(relation));
check[found].ccvalid = conform->convalidated;
check[found].ccname = MemoryContextStrdup(CacheMemoryContext, check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
NameStr(conform->conname)); NameStr(conform->conname));
......
...@@ -29,6 +29,7 @@ typedef struct constrCheck ...@@ -29,6 +29,7 @@ typedef struct constrCheck
{ {
char *ccname; char *ccname;
char *ccbin; /* nodeToString representation of expr */ char *ccbin; /* nodeToString representation of expr */
bool ccvalid;
} ConstrCheck; } ConstrCheck;
/* This structure contains constraints of a tuple */ /* This structure contains constraints of a tuple */
......
...@@ -30,6 +30,7 @@ typedef struct CookedConstraint ...@@ -30,6 +30,7 @@ typedef struct CookedConstraint
char *name; /* name, or NULL if none */ char *name; /* name, or NULL if none */
AttrNumber attnum; /* which attr (only for DEFAULT) */ AttrNumber attnum; /* which attr (only for DEFAULT) */
Node *expr; /* transformed default or check expr */ Node *expr; /* transformed default or check expr */
bool skip_validation; /* skip validation? (only for CHECK) */
bool is_local; /* constraint has local (non-inherited) def */ bool is_local; /* constraint has local (non-inherited) def */
int inhcount; /* number of times constraint is inherited */ int inhcount; /* number of times constraint is inherited */
} CookedConstraint; } CookedConstraint;
......
...@@ -31,6 +31,7 @@ extern Oid AssignTypeArrayOid(void); ...@@ -31,6 +31,7 @@ extern Oid AssignTypeArrayOid(void);
extern void AlterDomainDefault(List *names, Node *defaultRaw); extern void AlterDomainDefault(List *names, Node *defaultRaw);
extern void AlterDomainNotNull(List *names, bool notNull); extern void AlterDomainNotNull(List *names, bool notNull);
extern void AlterDomainAddConstraint(List *names, Node *constr); extern void AlterDomainAddConstraint(List *names, Node *constr);
extern void AlterDomainValidateConstraint(List *names, char *constrName);
extern void AlterDomainDropConstraint(List *names, const char *constrName, extern void AlterDomainDropConstraint(List *names, const char *constrName,
DropBehavior behavior); DropBehavior behavior);
......
...@@ -1190,6 +1190,7 @@ typedef enum AlterTableType ...@@ -1190,6 +1190,7 @@ typedef enum AlterTableType
AT_AddConstraint, /* add constraint */ AT_AddConstraint, /* add constraint */
AT_AddConstraintRecurse, /* internal to commands/tablecmds.c */ AT_AddConstraintRecurse, /* internal to commands/tablecmds.c */
AT_ValidateConstraint, /* validate constraint */ AT_ValidateConstraint, /* validate constraint */
AT_ValidateConstraintRecurse, /* internal to commands/tablecmds.c */
AT_ProcessedConstraint, /* pre-processed add constraint (local in AT_ProcessedConstraint, /* pre-processed add constraint (local in
* parser/parse_utilcmd.c) */ * parser/parse_utilcmd.c) */
AT_AddIndexConstraint, /* add constraint using existing index */ AT_AddIndexConstraint, /* add constraint using existing index */
...@@ -1543,6 +1544,8 @@ typedef struct Constraint ...@@ -1543,6 +1544,8 @@ typedef struct Constraint
char fk_matchtype; /* FULL, PARTIAL, UNSPECIFIED */ char fk_matchtype; /* FULL, PARTIAL, UNSPECIFIED */
char fk_upd_action; /* ON UPDATE action */ char fk_upd_action; /* ON UPDATE action */
char fk_del_action; /* ON DELETE action */ char fk_del_action; /* ON DELETE action */
/* Fields used for constraints that allow a NOT VALID specification */
bool skip_validation; /* skip validation of existing rows? */ bool skip_validation; /* skip validation of existing rows? */
bool initially_valid; /* mark the new constraint as valid? */ bool initially_valid; /* mark the new constraint as valid? */
} Constraint; } Constraint;
......
...@@ -196,14 +196,115 @@ DELETE FROM tmp3 where a=5; ...@@ -196,14 +196,115 @@ DELETE FROM tmp3 where a=5;
-- Try (and succeed) and repeat to show it works on already valid constraint -- Try (and succeed) and repeat to show it works on already valid constraint
ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr;
ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr;
-- Try a non-verified CHECK constraint
ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10); -- fail
ERROR: check constraint "b_greater_than_ten" is violated by some row
ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10) NOT VALID; -- succeeds
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- fails
ERROR: check constraint "b_greater_than_ten" is violated by some row
DELETE FROM tmp3 WHERE NOT b > 10;
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds
-- Test inherited NOT VALID CHECK constraints
select * from tmp3;
a | b
---+----
1 | 20
(1 row)
CREATE TABLE tmp6 () INHERITS (tmp3);
CREATE TABLE tmp7 () INHERITS (tmp3);
INSERT INTO tmp6 VALUES (6, 30), (7, 16);
ALTER TABLE tmp3 ADD CONSTRAINT b_le_20 CHECK (b <= 20) NOT VALID;
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- fails
ERROR: check constraint "b_le_20" is violated by some row
DELETE FROM tmp6 WHERE b > 20;
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- succeeds
-- An already validated constraint must not be revalidated
CREATE FUNCTION boo(int) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'boo: %', $1; RETURN $1; END; $$;
INSERT INTO tmp7 VALUES (8, 18);
ALTER TABLE tmp7 ADD CONSTRAINT identity CHECK (b = boo(b));
NOTICE: boo: 18
ALTER TABLE tmp3 ADD CONSTRAINT IDENTITY check (b = boo(b)) NOT VALID;
NOTICE: merging constraint "identity" with inherited definition
ALTER TABLE tmp3 VALIDATE CONSTRAINT identity;
NOTICE: boo: 16
NOTICE: boo: 20
-- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on -- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on
-- tmp4 is a,b -- tmp4 is a,b
ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full; ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full;
ERROR: there is no unique constraint matching given keys for referenced table "tmp4" ERROR: there is no unique constraint matching given keys for referenced table "tmp4"
DROP TABLE tmp7;
DROP TABLE tmp6;
DROP TABLE tmp5; DROP TABLE tmp5;
DROP TABLE tmp4; DROP TABLE tmp4;
DROP TABLE tmp3; DROP TABLE tmp3;
DROP TABLE tmp2; DROP TABLE tmp2;
-- NOT VALID with plan invalidation -- ensure we don't use a constraint for
-- exclusion until validated
set constraint_exclusion TO 'partition';
create table nv_parent (d date);
create table nv_child_2010 () inherits (nv_parent);
create table nv_child_2011 () inherits (nv_parent);
alter table nv_child_2010 add check (d between '2010-01-01'::date and '2010-12-31'::date) not valid;
alter table nv_child_2011 add check (d between '2011-01-01'::date and '2011-12-31'::date) not valid;
explain (costs off) select * from nv_parent where d between '2011-08-01' and '2011-08-31';
QUERY PLAN
---------------------------------------------------------------------------------
Result
-> Append
-> Seq Scan on nv_parent
Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date))
-> Seq Scan on nv_child_2010 nv_parent
Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date))
-> Seq Scan on nv_child_2011 nv_parent
Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date))
(8 rows)
create table nv_child_2009 (check (d between '2009-01-01'::date and '2009-12-31'::date)) inherits (nv_parent);
explain (costs off) select * from nv_parent where d between '2011-08-01'::date and '2011-08-31'::date;
QUERY PLAN
---------------------------------------------------------------------------------
Result
-> Append
-> Seq Scan on nv_parent
Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date))
-> Seq Scan on nv_child_2010 nv_parent
Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date))
-> Seq Scan on nv_child_2011 nv_parent
Filter: ((d >= '08-01-2011'::date) AND (d <= '08-31-2011'::date))
(8 rows)
explain (costs off) select * from nv_parent where d between '2009-08-01'::date and '2009-08-31'::date;
QUERY PLAN
---------------------------------------------------------------------------------
Result
-> Append
-> Seq Scan on nv_parent
Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date))
-> Seq Scan on nv_child_2010 nv_parent
Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date))
-> Seq Scan on nv_child_2011 nv_parent
Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date))
-> Seq Scan on nv_child_2009 nv_parent
Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date))
(10 rows)
-- after validation, the constraint should be used
alter table nv_child_2011 VALIDATE CONSTRAINT nv_child_2011_d_check;
explain (costs off) select * from nv_parent where d between '2009-08-01'::date and '2009-08-31'::date;
QUERY PLAN
---------------------------------------------------------------------------------
Result
-> Append
-> Seq Scan on nv_parent
Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date))
-> Seq Scan on nv_child_2010 nv_parent
Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date))
-> Seq Scan on nv_child_2009 nv_parent
Filter: ((d >= '08-01-2009'::date) AND (d <= '08-31-2009'::date))
(8 rows)
-- Foreign key adding test with mixed types -- Foreign key adding test with mixed types
-- Note: these tables are TEMP to avoid name conflicts when this test -- Note: these tables are TEMP to avoid name conflicts when this test
-- is run in parallel with foreign_key.sql. -- is run in parallel with foreign_key.sql.
......
...@@ -352,6 +352,17 @@ alter domain con drop constraint t; ...@@ -352,6 +352,17 @@ alter domain con drop constraint t;
insert into domcontest values (-5); --fails insert into domcontest values (-5); --fails
ERROR: value for domain con violates check constraint "con_check" ERROR: value for domain con violates check constraint "con_check"
insert into domcontest values (42); insert into domcontest values (42);
-- Test ALTER DOMAIN .. CONSTRAINT .. NOT VALID
create domain things AS INT;
CREATE TABLE thethings (stuff things);
INSERT INTO thethings (stuff) VALUES (55);
ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11);
ERROR: column "stuff" of table "thethings" contains values that violate the new constraint
ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11) NOT VALID;
ALTER DOMAIN things VALIDATE CONSTRAINT meow;
ERROR: column "stuff" of table "thethings" contains values that violate the new constraint
UPDATE thethings SET stuff = 10;
ALTER DOMAIN things VALIDATE CONSTRAINT meow;
-- Confirm ALTER DOMAIN with RULES. -- Confirm ALTER DOMAIN with RULES.
create table domtab (col1 integer); create table domtab (col1 integer);
create domain dom as integer; create domain dom as integer;
......
...@@ -236,12 +236,41 @@ DELETE FROM tmp3 where a=5; ...@@ -236,12 +236,41 @@ DELETE FROM tmp3 where a=5;
ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr;
ALTER TABLE tmp3 validate constraint tmpconstr; ALTER TABLE tmp3 validate constraint tmpconstr;
-- Try a non-verified CHECK constraint
ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10); -- fail
ALTER TABLE tmp3 ADD CONSTRAINT b_greater_than_ten CHECK (b > 10) NOT VALID; -- succeeds
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- fails
DELETE FROM tmp3 WHERE NOT b > 10;
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_greater_than_ten; -- succeeds
-- Test inherited NOT VALID CHECK constraints
select * from tmp3;
CREATE TABLE tmp6 () INHERITS (tmp3);
CREATE TABLE tmp7 () INHERITS (tmp3);
INSERT INTO tmp6 VALUES (6, 30), (7, 16);
ALTER TABLE tmp3 ADD CONSTRAINT b_le_20 CHECK (b <= 20) NOT VALID;
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- fails
DELETE FROM tmp6 WHERE b > 20;
ALTER TABLE tmp3 VALIDATE CONSTRAINT b_le_20; -- succeeds
-- An already validated constraint must not be revalidated
CREATE FUNCTION boo(int) RETURNS int IMMUTABLE STRICT LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'boo: %', $1; RETURN $1; END; $$;
INSERT INTO tmp7 VALUES (8, 18);
ALTER TABLE tmp7 ADD CONSTRAINT identity CHECK (b = boo(b));
ALTER TABLE tmp3 ADD CONSTRAINT IDENTITY check (b = boo(b)) NOT VALID;
ALTER TABLE tmp3 VALIDATE CONSTRAINT identity;
-- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on -- Try (and fail) to create constraint from tmp5(a) to tmp4(a) - unique constraint on
-- tmp4 is a,b -- tmp4 is a,b
ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full; ALTER TABLE tmp5 add constraint tmpconstr foreign key(a) references tmp4(a) match full;
DROP TABLE tmp7;
DROP TABLE tmp6;
DROP TABLE tmp5; DROP TABLE tmp5;
DROP TABLE tmp4; DROP TABLE tmp4;
...@@ -250,6 +279,23 @@ DROP TABLE tmp3; ...@@ -250,6 +279,23 @@ DROP TABLE tmp3;
DROP TABLE tmp2; DROP TABLE tmp2;
-- NOT VALID with plan invalidation -- ensure we don't use a constraint for
-- exclusion until validated
set constraint_exclusion TO 'partition';
create table nv_parent (d date);
create table nv_child_2010 () inherits (nv_parent);
create table nv_child_2011 () inherits (nv_parent);
alter table nv_child_2010 add check (d between '2010-01-01'::date and '2010-12-31'::date) not valid;
alter table nv_child_2011 add check (d between '2011-01-01'::date and '2011-12-31'::date) not valid;
explain (costs off) select * from nv_parent where d between '2011-08-01' and '2011-08-31';
create table nv_child_2009 (check (d between '2009-01-01'::date and '2009-12-31'::date)) inherits (nv_parent);
explain (costs off) select * from nv_parent where d between '2011-08-01'::date and '2011-08-31'::date;
explain (costs off) select * from nv_parent where d between '2009-08-01'::date and '2009-08-31'::date;
-- after validation, the constraint should be used
alter table nv_child_2011 VALIDATE CONSTRAINT nv_child_2011_d_check;
explain (costs off) select * from nv_parent where d between '2009-08-01'::date and '2009-08-31'::date;
-- Foreign key adding test with mixed types -- Foreign key adding test with mixed types
-- Note: these tables are TEMP to avoid name conflicts when this test -- Note: these tables are TEMP to avoid name conflicts when this test
......
...@@ -259,6 +259,16 @@ alter domain con drop constraint t; ...@@ -259,6 +259,16 @@ alter domain con drop constraint t;
insert into domcontest values (-5); --fails insert into domcontest values (-5); --fails
insert into domcontest values (42); insert into domcontest values (42);
-- Test ALTER DOMAIN .. CONSTRAINT .. NOT VALID
create domain things AS INT;
CREATE TABLE thethings (stuff things);
INSERT INTO thethings (stuff) VALUES (55);
ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11);
ALTER DOMAIN things ADD CONSTRAINT meow CHECK (VALUE < 11) NOT VALID;
ALTER DOMAIN things VALIDATE CONSTRAINT meow;
UPDATE thethings SET stuff = 10;
ALTER DOMAIN things VALIDATE CONSTRAINT meow;
-- Confirm ALTER DOMAIN with RULES. -- Confirm ALTER DOMAIN with RULES.
create table domtab (col1 integer); create table domtab (col1 integer);
create domain dom as integer; create domain dom as integer;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment