Commit 15ce775f authored by Robert Haas's avatar Robert Haas

Prevent BEFORE triggers from violating partitioning constraints.

Since tuple-routing implicitly checks the partitioning constraints
at least for the levels of the partitioning hierarchy it traverses,
there's normally no need to revalidate the partitioning constraint
after performing tuple routing.  However, if there's a BEFORE trigger
on the target partition, it could modify the tuple, causing the
partitioning constraint to be violated.  Catch that case.

Also, instead of checking the root table's partition constraint after
tuple-routing, check it beforehand.  Otherwise, the rules for when
the partitioning constraint gets checked get too complicated, because
you sometimes have to check part of the constraint but not all of it.
This effectively reverts commit 39162b20
in favor of a different approach altogether.

Report by me.  Initial debugging by Jeevan Ladhe.  Patch by Amit
Langote, reviewed by me.

Discussion: http://postgr.es/m/CA+Tgmoa9DTgeVOqopieV8d1QRpddmP65aCdxyjdYDoEO5pS5KA@mail.gmail.com
parent e6c33d59
......@@ -2640,9 +2640,24 @@ CopyFrom(CopyState cstate)
}
else
{
/*
* We always check the partition constraint, including when
* the tuple got here via tuple-routing. However we don't
* need to in the latter case if no BR trigger is defined on
* the partition. Note that a BR trigger might modify the
* tuple such that the partition constraint is no longer
* satisfied, so we need to check in that case.
*/
bool check_partition_constr =
(resultRelInfo->ri_PartitionCheck != NIL);
if (saved_resultRelInfo != NULL &&
!(resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row))
check_partition_constr = false;
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr ||
resultRelInfo->ri_PartitionCheck)
if (cstate->rel->rd_att->constr || check_partition_constr)
ExecConstraints(resultRelInfo, slot, estate);
if (useHeapMultiInsert)
......
......@@ -103,6 +103,8 @@ static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
int maxfieldlen);
static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
Plan *planTree);
static void ExecPartitionCheck(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
/*
* Note that GetUpdatedColumns() also exists in commands/trigger.c. There does
......@@ -1339,34 +1341,19 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_projectReturning = NULL;
/*
* If partition_root has been specified, that means we are building the
* ResultRelInfo for one of its leaf partitions. In that case, we need
* *not* initialize the leaf partition's constraint, but rather the
* partition_root's (if any). We must do that explicitly like this,
* because implicit partition constraints are not inherited like user-
* defined constraints and would fail to be enforced by ExecConstraints()
* after a tuple is routed to a leaf partition.
* Partition constraint, which also includes the partition constraint of
* all the ancestors that are partitions. Note that it will be checked
* even in the case of tuple-routing where this table is the target leaf
* partition, if there any BR triggers defined on the table. Although
* tuple-routing implicitly preserves the partition constraint of the
* target partition for a given row, the BR triggers may change the row
* such that the constraint is no longer satisfied, which we must fail
* for by checking it explicitly.
*
* If this is a partitioned table, the partition constraint (if any) of a
* given row will be checked just before performing tuple-routing.
*/
if (partition_root)
{
/*
* Root table itself may or may not be a partition; partition_check
* would be NIL in the latter case.
*/
partition_check = RelationGetPartitionQual(partition_root);
/*
* This is not our own partition constraint, but rather an ancestor's.
* So any Vars in it bear the ancestor's attribute numbers. We must
* switch them to our own. (dummy varno = 1)
*/
if (partition_check != NIL)
partition_check = map_partition_varattnos(partition_check, 1,
resultRelationDesc,
partition_root);
}
else
partition_check = RelationGetPartitionQual(resultRelationDesc);
partition_check = RelationGetPartitionQual(resultRelationDesc);
resultRelInfo->ri_PartitionCheck = partition_check;
resultRelInfo->ri_PartitionRoot = partition_root;
......@@ -1835,13 +1822,16 @@ ExecRelCheck(ResultRelInfo *resultRelInfo,
/*
* ExecPartitionCheck --- check that tuple meets the partition constraint.
*
* Note: This is called *iff* resultRelInfo is the main target table.
*/
static bool
static void
ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
EState *estate)
{
Relation rel = resultRelInfo->ri_RelationDesc;
TupleDesc tupdesc = RelationGetDescr(rel);
Bitmapset *modifiedCols;
Bitmapset *insertedCols;
Bitmapset *updatedCols;
ExprContext *econtext;
/*
......@@ -1869,7 +1859,44 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
* As in case of the catalogued constraints, we treat a NULL result as
* success here, not a failure.
*/
return ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext);
if (!ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext))
{
char *val_desc;
Relation orig_rel = rel;
/* See the comment above. */
if (resultRelInfo->ri_PartitionRoot)
{
HeapTuple tuple = ExecFetchSlotTuple(slot);
TupleDesc old_tupdesc = RelationGetDescr(rel);
TupleConversionMap *map;
rel = resultRelInfo->ri_PartitionRoot;
tupdesc = RelationGetDescr(rel);
/* a reverse map */
map = convert_tuples_by_name(old_tupdesc, tupdesc,
gettext_noop("could not convert row type"));
if (map != NULL)
{
tuple = do_convert_tuple(tuple, map);
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
}
}
insertedCols = GetInsertedColumns(resultRelInfo, estate);
updatedCols = GetUpdatedColumns(resultRelInfo, estate);
modifiedCols = bms_union(insertedCols, updatedCols);
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot,
tupdesc,
modifiedCols,
64);
ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION),
errmsg("new row for relation \"%s\" violates partition constraint",
RelationGetRelationName(orig_rel)),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
}
}
/*
......@@ -1997,47 +2024,11 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
}
}
if (resultRelInfo->ri_PartitionCheck &&
!ExecPartitionCheck(resultRelInfo, slot, estate))
{
char *val_desc;
Relation orig_rel = rel;
/* See the comment above. */
if (resultRelInfo->ri_PartitionRoot)
{
HeapTuple tuple = ExecFetchSlotTuple(slot);
TupleDesc old_tupdesc = RelationGetDescr(rel);
TupleConversionMap *map;
rel = resultRelInfo->ri_PartitionRoot;
tupdesc = RelationGetDescr(rel);
/* a reverse map */
map = convert_tuples_by_name(old_tupdesc, tupdesc,
gettext_noop("could not convert row type"));
if (map != NULL)
{
tuple = do_convert_tuple(tuple, map);
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
}
}
insertedCols = GetInsertedColumns(resultRelInfo, estate);
updatedCols = GetUpdatedColumns(resultRelInfo, estate);
modifiedCols = bms_union(insertedCols, updatedCols);
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot,
tupdesc,
modifiedCols,
64);
ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION),
errmsg("new row for relation \"%s\" violates partition constraint",
RelationGetRelationName(orig_rel)),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
}
if (resultRelInfo->ri_PartitionCheck)
ExecPartitionCheck(resultRelInfo, slot, estate);
}
/*
* ExecWithCheckOptions -- check that tuple satisfies any WITH CHECK OPTIONs
* of the specified kind.
......@@ -3317,6 +3308,13 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
PartitionDispatchData *failed_at;
TupleTableSlot *failed_slot;
/*
* First check the root table's partition constraint, if any. No point in
* routing the tuple it if it doesn't belong in the root table itself.
*/
if (resultRelInfo->ri_PartitionCheck)
ExecPartitionCheck(resultRelInfo, slot, estate);
result = get_partition_for_tuple(pd, slot, estate,
&failed_at, &failed_slot);
if (result < 0)
......
......@@ -414,6 +414,16 @@ ExecInsert(ModifyTableState *mtstate,
}
else
{
/*
* We always check the partition constraint, including when the tuple
* got here via tuple-routing. However we don't need to in the latter
* case if no BR trigger is defined on the partition. Note that a BR
* trigger might modify the tuple such that the partition constraint
* is no longer satisfied, so we need to check in that case.
*/
bool check_partition_constr =
(resultRelInfo->ri_PartitionCheck != NIL);
/*
* Constraints might reference the tableoid column, so initialize
* t_tableOid before evaluating them.
......@@ -431,9 +441,16 @@ ExecInsert(ModifyTableState *mtstate,
resultRelInfo, slot, estate);
/*
* Check the constraints of the tuple
* No need though if the tuple has been routed, and a BR trigger
* doesn't exist.
*/
if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
if (saved_resultRelInfo != NULL &&
!(resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row))
check_partition_constr = false;
/* Check the constraints of the tuple */
if (resultRelationDesc->rd_att->constr || check_partition_constr)
ExecConstraints(resultRelInfo, slot, estate);
if (onconflict != ONCONFLICT_NONE && resultRelInfo->ri_NumIndices > 0)
......
......@@ -379,7 +379,7 @@ drop function mlparted11_trig_fn();
-- checking its partition constraint before inserting into the leaf partition
-- selected by tuple-routing
insert into mlparted1 (a, b) values (2, 3);
ERROR: new row for relation "mlparted11" violates partition constraint
ERROR: new row for relation "mlparted1" violates partition constraint
DETAIL: Failing row contains (3, 2).
-- check routing error through a list partitioned table when the key is null
create table lparted_nonullpart (a int, b char) partition by list (b);
......@@ -495,3 +495,16 @@ select tableoid::regclass::text, * from mcrparted order by 1;
-- cleanup
drop table mcrparted;
-- check that a BR constraint can't make partition contain violating rows
create table brtrigpartcon (a int, b text) partition by list (a);
create table brtrigpartcon1 partition of brtrigpartcon for values in (1);
create or replace function brtrigpartcon1trigf() returns trigger as $$begin new.a := 2; return new; end$$ language plpgsql;
create trigger brtrigpartcon1trig before insert on brtrigpartcon1 for each row execute procedure brtrigpartcon1trigf();
insert into brtrigpartcon values (1, 'hi there');
ERROR: new row for relation "brtrigpartcon1" violates partition constraint
DETAIL: Failing row contains (2, hi there).
insert into brtrigpartcon1 values (1, 'hi there');
ERROR: new row for relation "brtrigpartcon1" violates partition constraint
DETAIL: Failing row contains (2, hi there).
drop table brtrigpartcon;
drop function brtrigpartcon1trigf();
......@@ -332,3 +332,13 @@ select tableoid::regclass::text, * from mcrparted order by 1;
-- cleanup
drop table mcrparted;
-- check that a BR constraint can't make partition contain violating rows
create table brtrigpartcon (a int, b text) partition by list (a);
create table brtrigpartcon1 partition of brtrigpartcon for values in (1);
create or replace function brtrigpartcon1trigf() returns trigger as $$begin new.a := 2; return new; end$$ language plpgsql;
create trigger brtrigpartcon1trig before insert on brtrigpartcon1 for each row execute procedure brtrigpartcon1trigf();
insert into brtrigpartcon values (1, 'hi there');
insert into brtrigpartcon1 values (1, 'hi there');
drop table brtrigpartcon;
drop function brtrigpartcon1trigf();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment