Commit 19c47e7c authored by Robert Haas's avatar Robert Haas

Factor error generation out of ExecPartitionCheck.

At present, we always raise an ERROR if the partition constraint
is violated, but a pending patch for UPDATE tuple routing will
consider instead moving the tuple to the correct partition.
Refactor to make that simpler.

Amit Khandekar, reviewed by Amit Langote, David Rowley, and me.

Discussion: http://postgr.es/m/CAJ3gD9cue54GbEzfV-61nyGpijvjZgCcghvLsB0_nL8Nm8HzCA@mail.gmail.com
parent 84a6f63e
......@@ -2731,7 +2731,7 @@ CopyFrom(CopyState cstate)
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr || check_partition_constr)
ExecConstraints(resultRelInfo, slot, estate);
ExecConstraints(resultRelInfo, slot, estate, true);
if (useHeapMultiInsert)
{
......
......@@ -1849,16 +1849,12 @@ ExecRelCheck(ResultRelInfo *resultRelInfo,
* ExecPartitionCheck --- check that tuple meets the partition constraint.
*
* Exported in executor.h for outside use.
* Returns true if it meets the partition constraint, else returns false.
*/
void
bool
ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
EState *estate)
{
Relation rel = resultRelInfo->ri_RelationDesc;
TupleDesc tupdesc = RelationGetDescr(rel);
Bitmapset *modifiedCols;
Bitmapset *insertedCols;
Bitmapset *updatedCols;
ExprContext *econtext;
/*
......@@ -1886,52 +1882,69 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
* As in case of the catalogued constraints, we treat a NULL result as
* success here, not a failure.
*/
if (!ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext))
{
char *val_desc;
Relation orig_rel = rel;
return ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext);
}
/*
* ExecPartitionCheckEmitError - Form and emit an error message after a failed
* partition constraint check.
*/
void
ExecPartitionCheckEmitError(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot,
EState *estate)
{
Relation rel = resultRelInfo->ri_RelationDesc;
Relation orig_rel = rel;
TupleDesc tupdesc = RelationGetDescr(rel);
char *val_desc;
Bitmapset *modifiedCols;
Bitmapset *insertedCols;
Bitmapset *updatedCols;
/* See the comment above. */
if (resultRelInfo->ri_PartitionRoot)
/*
* Need to first convert the tuple to the root partitioned table's row
* type. For details, check similar comments in ExecConstraints().
*/
if (resultRelInfo->ri_PartitionRoot)
{
HeapTuple tuple = ExecFetchSlotTuple(slot);
TupleDesc old_tupdesc = RelationGetDescr(rel);
TupleConversionMap *map;
rel = resultRelInfo->ri_PartitionRoot;
tupdesc = RelationGetDescr(rel);
/* a reverse map */
map = convert_tuples_by_name(old_tupdesc, tupdesc,
gettext_noop("could not convert row type"));
if (map != NULL)
{
HeapTuple tuple = ExecFetchSlotTuple(slot);
TupleDesc old_tupdesc = RelationGetDescr(rel);
TupleConversionMap *map;
rel = resultRelInfo->ri_PartitionRoot;
tupdesc = RelationGetDescr(rel);
/* a reverse map */
map = convert_tuples_by_name(old_tupdesc, tupdesc,
gettext_noop("could not convert row type"));
if (map != NULL)
{
tuple = do_convert_tuple(tuple, map);
ExecSetSlotDescriptor(slot, tupdesc);
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
}
tuple = do_convert_tuple(tuple, map);
ExecSetSlotDescriptor(slot, tupdesc);
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
}
insertedCols = GetInsertedColumns(resultRelInfo, estate);
updatedCols = GetUpdatedColumns(resultRelInfo, estate);
modifiedCols = bms_union(insertedCols, updatedCols);
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot,
tupdesc,
modifiedCols,
64);
ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION),
errmsg("new row for relation \"%s\" violates partition constraint",
RelationGetRelationName(orig_rel)),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
}
insertedCols = GetInsertedColumns(resultRelInfo, estate);
updatedCols = GetUpdatedColumns(resultRelInfo, estate);
modifiedCols = bms_union(insertedCols, updatedCols);
val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
slot,
tupdesc,
modifiedCols,
64);
ereport(ERROR,
(errcode(ERRCODE_CHECK_VIOLATION),
errmsg("new row for relation \"%s\" violates partition constraint",
RelationGetRelationName(orig_rel)),
val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
}
/*
* ExecConstraints - check constraints of the tuple in 'slot'
*
* This checks the traditional NOT NULL and check constraints, as well as
* the partition constraint, if any.
* This checks the traditional NOT NULL and check constraints, and if
* requested, checks the partition constraint.
*
* Note: 'slot' contains the tuple to check the constraints of, which may
* have been converted from the original input tuple after tuple routing.
......@@ -1939,7 +1952,8 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
*/
void
ExecConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate)
TupleTableSlot *slot, EState *estate,
bool check_partition_constraint)
{
Relation rel = resultRelInfo->ri_RelationDesc;
TupleDesc tupdesc = RelationGetDescr(rel);
......@@ -2055,8 +2069,9 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
}
}
if (resultRelInfo->ri_PartitionCheck)
ExecPartitionCheck(resultRelInfo, slot, estate);
if (check_partition_constraint && resultRelInfo->ri_PartitionCheck &&
!ExecPartitionCheck(resultRelInfo, slot, estate))
ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
}
......
......@@ -167,8 +167,9 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
* First check the root table's partition constraint, if any. No point in
* routing the tuple if it doesn't belong in the root table itself.
*/
if (resultRelInfo->ri_PartitionCheck)
ExecPartitionCheck(resultRelInfo, slot, estate);
if (resultRelInfo->ri_PartitionCheck &&
!ExecPartitionCheck(resultRelInfo, slot, estate))
ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
/* start with the root partitioned table */
parent = pd[0];
......
......@@ -401,7 +401,7 @@ ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
/* Check the constraints of the tuple */
if (rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
ExecConstraints(resultRelInfo, slot, estate, true);
/* Store the slot into tuple that we can inspect. */
tuple = ExecMaterializeSlot(slot);
......@@ -466,7 +466,7 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
/* Check the constraints of the tuple */
if (rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
ExecConstraints(resultRelInfo, slot, estate, true);
/* Store the slot into tuple that we can write. */
tuple = ExecMaterializeSlot(slot);
......
......@@ -487,7 +487,7 @@ ExecInsert(ModifyTableState *mtstate,
/* Check the constraints of the tuple */
if (resultRelationDesc->rd_att->constr || check_partition_constr)
ExecConstraints(resultRelInfo, slot, estate);
ExecConstraints(resultRelInfo, slot, estate, true);
if (onconflict != ONCONFLICT_NONE && resultRelInfo->ri_NumIndices > 0)
{
......@@ -1049,7 +1049,7 @@ lreplace:;
* tuple-routing is performed here, hence the slot remains unchanged.
*/
if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
ExecConstraints(resultRelInfo, slot, estate);
ExecConstraints(resultRelInfo, slot, estate, true);
/*
* replace the heap tuple
......
......@@ -187,9 +187,12 @@ extern ResultRelInfo *ExecGetTriggerResultRel(EState *estate, Oid relid);
extern void ExecCleanUpTriggerState(EState *estate);
extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
extern void ExecConstraints(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
extern void ExecPartitionCheck(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate,
bool check_partition_constraint);
extern bool ExecPartitionCheck(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
extern void ExecPartitionCheckEmitError(ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
extern void ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, EState *estate);
extern LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment