Commit ecfe59e5 authored by Robert Haas's avatar Robert Haas

Refactor validation of new partitions a little bit.

Move some logic that is currently in ATExecAttachPartition to
separate functions to facilitate future code reuse.

Ashutosh Bapat and Jeevan Ladhe

Discussion: http://postgr.es/m/CA+Tgmobbnamyvii0pRdg9pp_jLHSUvq7u5SiRrVV0tEFFU58Tg@mail.gmail.com
parent 1e56883a
...@@ -473,6 +473,11 @@ static void CreateInheritance(Relation child_rel, Relation parent_rel); ...@@ -473,6 +473,11 @@ static void CreateInheritance(Relation child_rel, Relation parent_rel);
static void RemoveInheritance(Relation child_rel, Relation parent_rel); static void RemoveInheritance(Relation child_rel, Relation parent_rel);
static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel, static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel,
PartitionCmd *cmd); PartitionCmd *cmd);
static bool PartConstraintImpliedByRelConstraint(Relation scanrel,
List *partConstraint);
static void ValidatePartitionConstraints(List **wqueue, Relation scanrel,
List *scanrel_children,
List *partConstraint);
static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name); static ObjectAddress ATExecDetachPartition(Relation rel, RangeVar *name);
...@@ -13424,6 +13429,169 @@ ComputePartitionAttrs(Relation rel, List *partParams, AttrNumber *partattrs, ...@@ -13424,6 +13429,169 @@ ComputePartitionAttrs(Relation rel, List *partParams, AttrNumber *partattrs,
} }
} }
/*
* PartConstraintImpliedByRelConstraint
* Does scanrel's existing constraints imply the partition constraint?
*
* Existing constraints includes its check constraints and column-level
* NOT NULL constraints and partConstraint describes the partition constraint.
*/
static bool
PartConstraintImpliedByRelConstraint(Relation scanrel,
List *partConstraint)
{
List *existConstraint = NIL;
TupleConstr *constr = RelationGetDescr(scanrel)->constr;
int num_check,
i;
if (constr && constr->has_not_null)
{
int natts = scanrel->rd_att->natts;
for (i = 1; i <= natts; i++)
{
Form_pg_attribute att = scanrel->rd_att->attrs[i - 1];
if (att->attnotnull && !att->attisdropped)
{
NullTest *ntest = makeNode(NullTest);
ntest->arg = (Expr *) makeVar(1,
i,
att->atttypid,
att->atttypmod,
att->attcollation,
0);
ntest->nulltesttype = IS_NOT_NULL;
/*
* argisrow=false is correct even for a composite column,
* because attnotnull does not represent a SQL-spec IS NOT
* NULL test in such a case, just IS DISTINCT FROM NULL.
*/
ntest->argisrow = false;
ntest->location = -1;
existConstraint = lappend(existConstraint, ntest);
}
}
}
num_check = (constr != NULL) ? constr->num_check : 0;
for (i = 0; i < num_check; i++)
{
Node *cexpr;
/*
* If this constraint hasn't been fully validated yet, we must ignore
* it here.
*/
if (!constr->check[i].ccvalid)
continue;
cexpr = stringToNode(constr->check[i].ccbin);
/*
* Run each expression through const-simplification and
* canonicalization. It is necessary, because we will be comparing it
* to similarly-processed partition constraint expressions, and may
* fail to detect valid matches without this.
*/
cexpr = eval_const_expressions(NULL, cexpr);
cexpr = (Node *) canonicalize_qual((Expr *) cexpr);
existConstraint = list_concat(existConstraint,
make_ands_implicit((Expr *) cexpr));
}
if (existConstraint != NIL)
existConstraint = list_make1(make_ands_explicit(existConstraint));
/* And away we go ... */
return predicate_implied_by(partConstraint, existConstraint, true);
}
/*
* ValidatePartitionConstraints
*
* Check whether all rows in the given table obey the given partition
* constraint; if so, it can be attached as a partition.  We do this by
* scanning the table (or all of its leaf partitions) row by row, except when
* the existing constraints are sufficient to prove that the new partitioning
* constraint must already hold.
*/
static void
ValidatePartitionConstraints(List **wqueue, Relation scanrel,
List *scanrel_children,
List *partConstraint)
{
bool found_whole_row;
ListCell *lc;
if (partConstraint == NIL)
return;
/*
* Based on the table's existing constraints, determine if we can skip
* scanning the table to validate the partition constraint.
*/
if (PartConstraintImpliedByRelConstraint(scanrel, partConstraint))
{
ereport(INFO,
(errmsg("partition constraint for table \"%s\" is implied by existing constraints",
RelationGetRelationName(scanrel))));
return;
}
/* Constraints proved insufficient, so we need to scan the table. */
foreach(lc, scanrel_children)
{
AlteredTableInfo *tab;
Oid part_relid = lfirst_oid(lc);
Relation part_rel;
List *my_partconstr = partConstraint;
/* Lock already taken */
if (part_relid != RelationGetRelid(scanrel))
part_rel = heap_open(part_relid, NoLock);
else
part_rel = scanrel;
/*
* Skip if the partition is itself a partitioned table. We can only
* ever scan RELKIND_RELATION relations.
*/
if (part_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
if (part_rel != scanrel)
heap_close(part_rel, NoLock);
continue;
}
if (part_rel != scanrel)
{
/*
* Adjust the constraint for scanrel so that it matches this
* partition's attribute numbers.
*/
my_partconstr = map_partition_varattnos(my_partconstr, 1,
part_rel, scanrel,
&found_whole_row);
/* There can never be a whole-row reference here */
if (found_whole_row)
elog(ERROR, "unexpected whole-row reference found in partition key");
}
/* Grab a work queue entry. */
tab = ATGetQueueEntry(wqueue, part_rel);
tab->partition_constraint = (Expr *) linitial(my_partconstr);
/* keep our lock until commit */
if (part_rel != scanrel)
heap_close(part_rel, NoLock);
}
}
/* /*
* ALTER TABLE <name> ATTACH PARTITION <partition-name> FOR VALUES * ALTER TABLE <name> ATTACH PARTITION <partition-name> FOR VALUES
* *
...@@ -13435,15 +13603,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) ...@@ -13435,15 +13603,12 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
Relation attachrel, Relation attachrel,
catalog; catalog;
List *attachrel_children; List *attachrel_children;
TupleConstr *attachrel_constr; List *partConstraint;
List *partConstraint,
*existConstraint;
SysScanDesc scan; SysScanDesc scan;
ScanKeyData skey; ScanKeyData skey;
AttrNumber attno; AttrNumber attno;
int natts; int natts;
TupleDesc tupleDesc; TupleDesc tupleDesc;
bool skip_validate = false;
ObjectAddress address; ObjectAddress address;
const char *trigger_name; const char *trigger_name;
bool found_whole_row; bool found_whole_row;
...@@ -13637,148 +13802,9 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd) ...@@ -13637,148 +13802,9 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
if (found_whole_row) if (found_whole_row)
elog(ERROR, "unexpected whole-row reference found in partition key"); elog(ERROR, "unexpected whole-row reference found in partition key");
/* /* Validate partition constraints against the table being attached. */
* Check if we can do away with having to scan the table being attached to ValidatePartitionConstraints(wqueue, attachrel, attachrel_children,
* validate the partition constraint, by *proving* that the existing partConstraint);
* constraints of the table *imply* the partition predicate. We include
* the table's check constraints and NOT NULL constraints in the list of
* clauses passed to predicate_implied_by().
*
* There is a case in which we cannot rely on just the result of the
* proof.
*/
attachrel_constr = tupleDesc->constr;
existConstraint = NIL;
if (attachrel_constr != NULL)
{
int num_check = attachrel_constr->num_check;
int i;
if (attachrel_constr->has_not_null)
{
int natts = attachrel->rd_att->natts;
for (i = 1; i <= natts; i++)
{
Form_pg_attribute att = attachrel->rd_att->attrs[i - 1];
if (att->attnotnull && !att->attisdropped)
{
NullTest *ntest = makeNode(NullTest);
ntest->arg = (Expr *) makeVar(1,
i,
att->atttypid,
att->atttypmod,
att->attcollation,
0);
ntest->nulltesttype = IS_NOT_NULL;
/*
* argisrow=false is correct even for a composite column,
* because attnotnull does not represent a SQL-spec IS NOT
* NULL test in such a case, just IS DISTINCT FROM NULL.
*/
ntest->argisrow = false;
ntest->location = -1;
existConstraint = lappend(existConstraint, ntest);
}
}
}
for (i = 0; i < num_check; i++)
{
Node *cexpr;
/*
* If this constraint hasn't been fully validated yet, we must
* ignore it here.
*/
if (!attachrel_constr->check[i].ccvalid)
continue;
cexpr = stringToNode(attachrel_constr->check[i].ccbin);
/*
* Run each expression through const-simplification and
* canonicalization. It is necessary, because we will be
* comparing it to similarly-processed qual clauses, and may fail
* to detect valid matches without this.
*/
cexpr = eval_const_expressions(NULL, cexpr);
cexpr = (Node *) canonicalize_qual((Expr *) cexpr);
existConstraint = list_concat(existConstraint,
make_ands_implicit((Expr *) cexpr));
}
existConstraint = list_make1(make_ands_explicit(existConstraint));
/* And away we go ... */
if (predicate_implied_by(partConstraint, existConstraint, true))
skip_validate = true;
}
if (skip_validate)
{
/* No need to scan the table after all. */
ereport(INFO,
(errmsg("partition constraint for table \"%s\" is implied by existing constraints",
RelationGetRelationName(attachrel))));
}
else
{
/* Constraints proved insufficient, so we need to scan the table. */
ListCell *lc;
foreach(lc, attachrel_children)
{
AlteredTableInfo *tab;
Oid part_relid = lfirst_oid(lc);
Relation part_rel;
List *my_partconstr = partConstraint;
/* Lock already taken */
if (part_relid != RelationGetRelid(attachrel))
part_rel = heap_open(part_relid, NoLock);
else
part_rel = attachrel;
/*
* Skip if the partition is itself a partitioned table. We can
* only ever scan RELKIND_RELATION relations.
*/
if (part_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
if (part_rel != attachrel)
heap_close(part_rel, NoLock);
continue;
}
if (part_rel != attachrel)
{
/*
* Adjust the constraint that we constructed above for
* attachRel so that it matches this partition's attribute
* numbers.
*/
my_partconstr = map_partition_varattnos(my_partconstr, 1,
part_rel, attachrel,
&found_whole_row);
/* There can never be a whole-row reference here */
if (found_whole_row)
elog(ERROR, "unexpected whole-row reference found in partition key");
}
/* Grab a work queue entry. */
tab = ATGetQueueEntry(wqueue, part_rel);
tab->partition_constraint = (Expr *) linitial(my_partconstr);
/* keep our lock until commit */
if (part_rel != attachrel)
heap_close(part_rel, NoLock);
}
}
ObjectAddressSet(address, RelationRelationId, RelationGetRelid(attachrel)); ObjectAddressSet(address, RelationRelationId, RelationGetRelid(attachrel));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment