Commit 583df3b5 authored by Robert Haas's avatar Robert Haas

Code beautification for ATExecAttachPartition.

Amit Langote

Discussion: http://postgr.es/m/CAFjFpReT_kq_uwU_B8aWDxR7jNGE=P0iELycdq5oupi=xSQTOw@mail.gmail.com
parent 86705aa8
......@@ -13419,10 +13419,10 @@ ComputePartitionAttrs(Relation rel, List *partParams, AttrNumber *partattrs,
static ObjectAddress
ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
{
Relation attachRel,
Relation attachrel,
catalog;
List *childrels;
TupleConstr *attachRel_constr;
List *attachrel_children;
TupleConstr *attachrel_constr;
List *partConstraint,
*existConstraint;
SysScanDesc scan;
......@@ -13434,22 +13434,22 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
ObjectAddress address;
const char *trigger_name;
attachRel = heap_openrv(cmd->name, AccessExclusiveLock);
attachrel = heap_openrv(cmd->name, AccessExclusiveLock);
/*
* Must be owner of both parent and source table -- parent was checked by
* ATSimplePermissions call in ATPrepCmd
*/
ATSimplePermissions(attachRel, ATT_TABLE | ATT_FOREIGN_TABLE);
ATSimplePermissions(attachrel, ATT_TABLE | ATT_FOREIGN_TABLE);
/* A partition can only have one parent */
if (attachRel->rd_rel->relispartition)
if (attachrel->rd_rel->relispartition)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is already a partition",
RelationGetRelationName(attachRel))));
RelationGetRelationName(attachrel))));
if (OidIsValid(attachRel->rd_rel->reloftype))
if (OidIsValid(attachrel->rd_rel->reloftype))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot attach a typed table as partition")));
......@@ -13462,7 +13462,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
ScanKeyInit(&skey,
Anum_pg_inherits_inhrelid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(attachRel)));
ObjectIdGetDatum(RelationGetRelid(attachrel)));
scan = systable_beginscan(catalog, InheritsRelidSeqnoIndexId, true,
NULL, 1, &skey);
if (HeapTupleIsValid(systable_getnext(scan)))
......@@ -13475,11 +13475,11 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
ScanKeyInit(&skey,
Anum_pg_inherits_inhparent,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(attachRel)));
ObjectIdGetDatum(RelationGetRelid(attachrel)));
scan = systable_beginscan(catalog, InheritsParentIndexId, true, NULL,
1, &skey);
if (HeapTupleIsValid(systable_getnext(scan)) &&
attachRel->rd_rel->relkind == RELKIND_RELATION)
attachrel->rd_rel->relkind == RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot attach inheritance parent as partition")));
......@@ -13487,22 +13487,22 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
heap_close(catalog, AccessShareLock);
/*
* Prevent circularity by seeing if rel is a partition of attachRel. (In
* Prevent circularity by seeing if rel is a partition of attachrel. (In
* particular, this disallows making a rel a partition of itself.)
*/
childrels = find_all_inheritors(RelationGetRelid(attachRel),
AccessShareLock, NULL);
if (list_member_oid(childrels, RelationGetRelid(rel)))
attachrel_children = find_all_inheritors(RelationGetRelid(attachrel),
AccessShareLock, NULL);
if (list_member_oid(attachrel_children, RelationGetRelid(rel)))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("circular inheritance not allowed"),
errdetail("\"%s\" is already a child of \"%s\".",
RelationGetRelationName(rel),
RelationGetRelationName(attachRel))));
RelationGetRelationName(attachrel))));
/* Temp parent cannot have a partition that is itself not a temp */
if (rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&
attachRel->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
attachrel->rd_rel->relpersistence != RELPERSISTENCE_TEMP)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot attach a permanent relation as partition of temporary relation \"%s\"",
......@@ -13516,30 +13516,30 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
errmsg("cannot attach as partition of temporary relation of another session")));
/* Ditto for the partition */
if (attachRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&
!attachRel->rd_islocaltemp)
if (attachrel->rd_rel->relpersistence == RELPERSISTENCE_TEMP &&
!attachrel->rd_islocaltemp)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot attach temporary relation of another session as partition")));
/* If parent has OIDs then child must have OIDs */
if (rel->rd_rel->relhasoids && !attachRel->rd_rel->relhasoids)
if (rel->rd_rel->relhasoids && !attachrel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot attach table \"%s\" without OIDs as partition of"
" table \"%s\" with OIDs", RelationGetRelationName(attachRel),
" table \"%s\" with OIDs", RelationGetRelationName(attachrel),
RelationGetRelationName(rel))));
/* OTOH, if parent doesn't have them, do not allow in attachRel either */
if (attachRel->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
/* OTOH, if parent doesn't have them, do not allow in attachrel either */
if (attachrel->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot attach table \"%s\" with OIDs as partition of table"
" \"%s\" without OIDs", RelationGetRelationName(attachRel),
" \"%s\" without OIDs", RelationGetRelationName(attachrel),
RelationGetRelationName(rel))));
/* Check if there are any columns in attachRel that aren't in the parent */
tupleDesc = RelationGetDescr(attachRel);
/* Check if there are any columns in attachrel that aren't in the parent */
tupleDesc = RelationGetDescr(attachrel);
natts = tupleDesc->natts;
for (attno = 1; attno <= natts; attno++)
{
......@@ -13557,7 +13557,7 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("table \"%s\" contains column \"%s\" not found in parent \"%s\"",
RelationGetRelationName(attachRel), attributeName,
RelationGetRelationName(attachrel), attributeName,
RelationGetRelationName(rel)),
errdetail("New partition should contain only the columns present in parent.")));
}
......@@ -13567,34 +13567,34 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
* currently don't allow it to become a partition. See also prohibitions
* in ATExecAddInherit() and CreateTrigger().
*/
trigger_name = FindTriggerIncompatibleWithInheritance(attachRel->trigdesc);
trigger_name = FindTriggerIncompatibleWithInheritance(attachrel->trigdesc);
if (trigger_name != NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("trigger \"%s\" prevents table \"%s\" from becoming a partition",
trigger_name, RelationGetRelationName(attachRel)),
trigger_name, RelationGetRelationName(attachrel)),
errdetail("ROW triggers with transition tables are not supported on partitions")));
/* OK to create inheritance. Rest of the checks performed there */
CreateInheritance(attachRel, rel);
CreateInheritance(attachrel, rel);
/*
* Check that the new partition's bound is valid and does not overlap any
* of existing partitions of the parent - note that it does not return on
* error.
*/
check_new_partition_bound(RelationGetRelationName(attachRel), rel,
check_new_partition_bound(RelationGetRelationName(attachrel), rel,
cmd->bound);
/* Update the pg_class entry. */
StorePartitionBound(attachRel, rel, cmd->bound);
StorePartitionBound(attachrel, rel, cmd->bound);
/*
* Generate partition constraint from the partition bound specification.
* If the parent itself is a partition, make sure to include its
* constraint as well.
*/
partConstraint = list_concat(get_qual_from_partbound(attachRel, rel,
partConstraint = list_concat(get_qual_from_partbound(attachrel, rel,
cmd->bound),
RelationGetPartitionQual(rel));
partConstraint = (List *) eval_const_expressions(NULL,
......@@ -13612,20 +13612,20 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
* There is a case in which we cannot rely on just the result of the
* proof.
*/
attachRel_constr = tupleDesc->constr;
attachrel_constr = tupleDesc->constr;
existConstraint = NIL;
if (attachRel_constr != NULL)
if (attachrel_constr != NULL)
{
int num_check = attachRel_constr->num_check;
int num_check = attachrel_constr->num_check;
int i;
if (attachRel_constr->has_not_null)
if (attachrel_constr->has_not_null)
{
int natts = attachRel->rd_att->natts;
int natts = attachrel->rd_att->natts;
for (i = 1; i <= natts; i++)
{
Form_pg_attribute att = attachRel->rd_att->attrs[i - 1];
Form_pg_attribute att = attachrel->rd_att->attrs[i - 1];
if (att->attnotnull && !att->attisdropped)
{
......@@ -13659,10 +13659,10 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
* If this constraint hasn't been fully validated yet, we must
* ignore it here.
*/
if (!attachRel_constr->check[i].ccvalid)
if (!attachrel_constr->check[i].ccvalid)
continue;
cexpr = stringToNode(attachRel_constr->check[i].ccbin);
cexpr = stringToNode(attachrel_constr->check[i].ccbin);
/*
* Run each expression through const-simplification and
......@@ -13684,28 +13684,25 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
skip_validate = true;
}
/* It's safe to skip the validation scan after all */
if (skip_validate)
{
/* No need to scan the table after all. */
ereport(INFO,
(errmsg("partition constraint for table \"%s\" is implied by existing constraints",
RelationGetRelationName(attachRel))));
/*
* Set up to have the table be scanned to validate the partition
* constraint (see partConstraint above). If it's a partitioned table, we
* instead schedule its leaf partitions to be scanned.
*/
if (!skip_validate)
RelationGetRelationName(attachrel))));
}
else
{
/* Constraints proved insufficient, so we need to scan the table. */
List *all_parts;
ListCell *lc;
/* Take an exclusive lock on the partitions to be checked */
if (attachRel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
all_parts = find_all_inheritors(RelationGetRelid(attachRel),
if (attachrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
all_parts = find_all_inheritors(RelationGetRelid(attachrel),
AccessExclusiveLock, NULL);
else
all_parts = list_make1_oid(RelationGetRelid(attachRel));
all_parts = list_make1_oid(RelationGetRelid(attachrel));
foreach(lc, all_parts)
{
......@@ -13716,23 +13713,23 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
bool found_whole_row;
/* Lock already taken */
if (part_relid != RelationGetRelid(attachRel))
if (part_relid != RelationGetRelid(attachrel))
part_rel = heap_open(part_relid, NoLock);
else
part_rel = attachRel;
part_rel = attachrel;
/*
* Skip if it's a partitioned table. Only RELKIND_RELATION
* relations (ie, leaf partitions) need to be scanned.
* Skip if the partition is itself a partitioned table. We can
* only ever scan RELKIND_RELATION relations.
*/
if (part_rel != attachRel &&
part_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
if (part_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
{
heap_close(part_rel, NoLock);
if (part_rel != attachrel)
heap_close(part_rel, NoLock);
continue;
}
/* Grab a work queue entry */
/* Grab a work queue entry. */
tab = ATGetQueueEntry(wqueue, part_rel);
/* Adjust constraint to match this partition */
......@@ -13746,15 +13743,15 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
elog(ERROR, "unexpected whole-row reference found in partition key");
/* keep our lock until commit */
if (part_rel != attachRel)
if (part_rel != attachrel)
heap_close(part_rel, NoLock);
}
}
ObjectAddressSet(address, RelationRelationId, RelationGetRelid(attachRel));
ObjectAddressSet(address, RelationRelationId, RelationGetRelid(attachrel));
/* keep our lock until commit */
heap_close(attachRel, NoLock);
heap_close(attachrel, NoLock);
return address;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment