Commit f58eac82 authored by Tom Lane's avatar Tom Lane

Code and docs review for ALTER TABLE INHERIT/NO INHERIT patch.

parent e1fdd226
<!-- $PostgreSQL: pgsql/doc/src/sgml/ddl.sgml,v 1.63 2006/09/20 21:30:20 tgl Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/ddl.sgml,v 1.64 2006/10/13 21:43:17 tgl Exp $ -->
<chapter id="ddl"> <chapter id="ddl">
<title>Data Definition</title> <title>Data Definition</title>
...@@ -2061,53 +2061,54 @@ VALUES ('New York', NULL, NULL, 'NY'); ...@@ -2061,53 +2061,54 @@ VALUES ('New York', NULL, NULL, 'NY');
</para> </para>
<para> <para>
Table inheritance can be defined using the <xref linkend="sql-createtable" Table inheritance is typically established when the child table is
endterm="sql-createtable-title"> statement using the created, using the <literal>INHERITS</> clause of the
<command>INHERITS</command> keyword. However the related statement <xref linkend="sql-createtable" endterm="sql-createtable-title">
<command>CREATE TABLE AS</command> does not allow inheritance to be statement. However the related statement <command>CREATE TABLE AS</command>
specified. does not allow inheritance to be specified.
</para> </para>
<para> <para>
Alternatively a table which is already defined in a compatible way can have Alternatively, a table which is already defined in a compatible way can
a new parent added with <xref linkend="sql-altertable" have a new parent relationship added, using the <literal>INHERIT</literal>
endterm="sql-altertable-title"> using the <command>INHERIT</command> variant of <xref linkend="sql-altertable" endterm="sql-altertable-title">.
subform. To do this the new child table must already include columns with To do this the new child table must already include columns with
the same name and type as the columns of the parent. It must also include the same names and types as the columns of the parent. It must also include
check constraints with the same name and check expression as those of the check constraints with the same names and check expressions as those of the
parent. Similarly an inheritance link can be removed from a child using the parent. Similarly an inheritance link can be removed from a child using the
<command>ALTER TABLE</command> using the <command>NO INHERIT</command> <literal>NO INHERIT</literal> variant of <command>ALTER TABLE</>.
subform. Dynamically adding and removing inheritance links like this can be useful
when the inheritance relationship is being used for table
partitioning (see <xref linkend="ddl-partitioning">).
</para> </para>
<para> <para>
One convenient way to create a compatible table to be a new child One convenient way to create a compatible table that will later be made
is specify the <command>LIKE</command> clause in <command>CREATE a new child is to use the <literal>LIKE</literal> clause in <command>CREATE
TABLE</command>. This creates a new table with the same columns as TABLE</command>. This creates a new table with the same columns as
the source table. If there are any <command>CHECK</command> the source table. If there are any <literal>CHECK</literal>
constraints defined on the parent table, the <command>INCLUDING constraints defined on the source table, the <literal>INCLUDING
CONSTRAINTS</command> option to <command>LIKE</command> may be CONSTRAINTS</literal> option to <literal>LIKE</literal> should be
useful, as the new child must have constraints matching the parent specified, as the new child must have constraints matching the parent
to be considered compatible. Alternatively a compatible table can to be considered compatible.
be created by first creating a new child using <command>CREATE
TABLE</command> then removing the inheritance link with
<command>ALTER TABLE</command>.
</para> </para>
<para> <para>
A parent table cannot be dropped while any A parent table cannot be dropped while any of its children remain. Neither
of its children remain. If you wish to remove a table and all of its can columns of child tables be dropped or altered if they are inherited
from any parent tables. If you wish to remove a table and all of its
descendants, one easy way is to drop the parent table with the descendants, one easy way is to drop the parent table with the
<literal>CASCADE</literal> option. Neither can columns of child tables be <literal>CASCADE</literal> option.
dropped or altered if they are inherited from any parent tables.
</para> </para>
<para> <para>
<xref linkend="sql-altertable" endterm="sql-altertable-title"> will <xref linkend="sql-altertable" endterm="sql-altertable-title"> will
propagate any changes in column data definitions and check constraints down propagate any changes in column data definitions and check
the inheritance hierarchy. <command>ALTER TABLE</command> follows the same constraints down the inheritance hierarchy. Again, dropping
rules for duplicate column merging and rejection that apply during columns or constraints on parent tables is only possible when using
<command>CREATE TABLE</command>. the <literal>CASCADE</literal> option. <command>ALTER
TABLE</command> follows the same rules for duplicate column merging
and rejection that apply during <command>CREATE TABLE</command>.
</para> </para>
<sect2 id="ddl-inherit-caveats"> <sect2 id="ddl-inherit-caveats">
...@@ -2162,16 +2163,6 @@ VALUES ('New York', NULL, NULL, 'NY'); ...@@ -2162,16 +2163,6 @@ VALUES ('New York', NULL, NULL, 'NY');
not capital names. There is no good workaround for this case. not capital names. There is no good workaround for this case.
</para> </para>
</listitem> </listitem>
<listitem>
<para>
If a table is ever removed from the inheritance structure using
<command>ALTER TABLE</command> then all its columns will be marked as
being locally defined. This means <command>DROP COLUMN</command> on the
parent table will never cascade to drop those columns on the child
table. They must be dropped manually.
</para>
</listitem>
</itemizedlist> </itemizedlist>
These deficiencies will probably be fixed in some future release, These deficiencies will probably be fixed in some future release,
...@@ -2222,37 +2213,31 @@ VALUES ('New York', NULL, NULL, 'NY'); ...@@ -2222,37 +2213,31 @@ VALUES ('New York', NULL, NULL, 'NY');
<itemizedlist> <itemizedlist>
<listitem> <listitem>
<para> <para>
Query performance can be improved when partition constraints can be Query performance can be improved dramatically in certain situations,
combined with local indexes to reduce the number of records that need to particularly when most of the heavily accessed rows of the table are in a
be accessed for a query. Whereas the alternative, adding those columns single partition or a small number of partitions. The partitioning
to every index, increases space usage which can erase any substitutes for leading columns of indexes, reducing index size and
performance gain. making it more likely that the heavily-used parts of the indexes
</para> fit in memory.
<para>
When most of the heavily accessed area of the table is in a single
partition or a small number of partitions. That partition and its
indexes are more likely to fit within memory than the index of the
entire table.
</para> </para>
</listitem> </listitem>
<listitem> <listitem>
<para> <para>
When queries or updates access a large percentage of a a single When queries or updates access a large percentage of a a single
partition performance can be improved dramatically by taking advantage partition, performance can be improved by taking advantage
of sequential disk access of a single partition instead of using an of sequential scan of that partition instead of using an
index and random access reads across the whole table. index and random access reads scattered across the whole table.
</para> </para>
</listitem> </listitem>
<listitem> <listitem>
<para> <para>
Bulk loads and deletes may be accomplished by simply removing or adding Bulk loads and deletes may be accomplished by adding or removing
one of the partitions. <command>ALTER TABLE</> is far faster than a bulk partitions, if that requirement is planned into the partitioning design.
and takes the same amount of time regardless of the amount of data being <command>ALTER TABLE</> is far faster than a bulk operation.
added or removed. It also entirely avoids the <command>VACUUM</command> It also entirely avoids the <command>VACUUM</command>
overhead caused by a bulk <command>delete</>. overhead caused by a bulk <command>DELETE</>.
</para> </para>
</listitem> </listitem>
...@@ -2577,6 +2562,25 @@ DO INSTEAD ...@@ -2577,6 +2562,25 @@ DO INSTEAD
creating a new partition each month, so it may be wise to write a creating a new partition each month, so it may be wise to write a
script that generates the required DDL automatically. script that generates the required DDL automatically.
</para> </para>
<para>
Partitioning can also be arranged using a <literal>UNION ALL</literal>
view:
<programlisting>
CREATE VIEW measurement AS
SELECT * FROM measurement_y2004m02
UNION ALL SELECT * FROM measurement_y2004m03
...
UNION ALL SELECT * FROM measurement_y2005m11
UNION ALL SELECT * FROM measurement_y2005m12
UNION ALL SELECT * FROM measurement_y2006m01;
</programlisting>
However, the need to
recreate the view adds an extra step to adding and dropping
individual partitions of the dataset.
</para>
</sect2> </sect2>
<sect2 id="ddl-partitioning-managing-partitions"> <sect2 id="ddl-partitioning-managing-partitions">
...@@ -2589,15 +2593,15 @@ DO INSTEAD ...@@ -2589,15 +2593,15 @@ DO INSTEAD
add new partitions for new data. One of the most important add new partitions for new data. One of the most important
advantages of partitioning is precisely that it allows this advantages of partitioning is precisely that it allows this
otherwise painful task to be executed nearly instantaneously by otherwise painful task to be executed nearly instantaneously by
manipulating the partition structure, rather than moving large manipulating the partition structure, rather than physically moving large
amounts of data around physically. amounts of data around.
</para> </para>
<para> <para>
The simplest option for removing old data is to simply drop the partition The simplest option for removing old data is simply to drop the partition
that is no longer necessary: that is no longer necessary:
<programlisting> <programlisting>
DROP TABLE measurement_y2003mm02; DROP TABLE measurement_y2003m02;
</programlisting> </programlisting>
This can very quickly delete millions of records because it doesn't have This can very quickly delete millions of records because it doesn't have
to individually delete every record. to individually delete every record.
...@@ -2608,10 +2612,10 @@ DROP TABLE measurement_y2003mm02; ...@@ -2608,10 +2612,10 @@ DROP TABLE measurement_y2003mm02;
the partitioned table but retain access to it as a table in its own the partitioned table but retain access to it as a table in its own
right: right:
<programlisting> <programlisting>
ALTER TABLE measurement_y2003mm02 NO INHERIT measurement; ALTER TABLE measurement_y2003m02 NO INHERIT measurement;
</programlisting> </programlisting>
This allows further operations to be performed on the data before This allows further operations to be performed on the data before
it is dropped. For example, this is often a useful time to backup it is dropped. For example, this is often a useful time to back up
the data using <command>COPY</>, <application>pg_dump</>, or the data using <command>COPY</>, <application>pg_dump</>, or
similar tools. It can also be a useful time to aggregate data similar tools. It can also be a useful time to aggregate data
into smaller formats, perform other data manipulations, or run into smaller formats, perform other data manipulations, or run
...@@ -2635,10 +2639,12 @@ CREATE TABLE measurement_y2006m02 ( ...@@ -2635,10 +2639,12 @@ CREATE TABLE measurement_y2006m02 (
transformed prior to it appearing in the partitioned table. transformed prior to it appearing in the partitioned table.
<programlisting> <programlisting>
CREATE TABLE measurement_y2006m02 (LIKE measurement INCLUDING DEFAULTS INCLUDING CONSTRAINTS); CREATE TABLE measurement_y2006m02
\COPY measurement_y2006m02 FROM 'measurement_y2006m02' (LIKE measurement INCLUDING DEFAULTS INCLUDING CONSTRAINTS);
UPDATE ... ; ALTER TABLE measurement_y2006m02 ADD CONSTRAINT y2006m02
ALTER TABLE measurement_y2006m02 ADD CONSTRAINT y2006m02 CHECK ( logdate &gt;= DATE '2006-02-01' AND logdate &lt; DATE '2006-03-01' ); CHECK ( logdate &gt;= DATE '2006-02-01' AND logdate &lt; DATE '2006-03-01' );
\copy measurement_y2006m02 from 'measurement_y2006m02'
-- possibly some other data preparation work
ALTER TABLE measurement_y2006m02 INHERIT measurement; ALTER TABLE measurement_y2006m02 INHERIT measurement;
</programlisting> </programlisting>
</para> </para>
...@@ -2670,38 +2676,8 @@ ALTER TABLE measurement_y2006m02 INHERIT measurement; ...@@ -2670,38 +2676,8 @@ ALTER TABLE measurement_y2006m02 INHERIT measurement;
using a set of rules as suggested above.) using a set of rules as suggested above.)
</para> </para>
</listitem> </listitem>
<listitem>
<para>
When using the <literal>LIKE</> option above to create new
partitions, <literal>CHECK</> constraints are not copied from
the parent. If there are any <literal>CHECK</> constraints
defined for the parent, they must be manually created in new
partitions before <command>ALTER TABLE</command> will allow them
to be added.
</para>
</listitem>
</itemizedlist> </itemizedlist>
</para> </para>
<para>
Partitioning can also be arranged using a <literal>UNION ALL</literal>
view:
<programlisting>
CREATE VIEW measurement AS
SELECT * FROM measurement_y2004m02
UNION ALL SELECT * FROM measurement_y2004m03
...
UNION ALL SELECT * FROM measurement_y2005m11
UNION ALL SELECT * FROM measurement_y2005m12
UNION ALL SELECT * FROM measurement_y2006m01;
</programlisting>
However, the need to
recreate the view adds an extra step to adding and dropping
individual partitions of the dataset.
</para>
</sect2> </sect2>
<sect2 id="ddl-partitioning-constraint-exclusion"> <sect2 id="ddl-partitioning-constraint-exclusion">
......
<!-- <!--
$PostgreSQL: pgsql/doc/src/sgml/ref/alter_table.sgml,v 1.90 2006/09/16 00:30:16 momjian Exp $ $PostgreSQL: pgsql/doc/src/sgml/ref/alter_table.sgml,v 1.91 2006/10/13 21:43:18 tgl Exp $
PostgreSQL documentation PostgreSQL documentation
--> -->
...@@ -294,28 +294,22 @@ where <replaceable class="PARAMETER">action</replaceable> is one of: ...@@ -294,28 +294,22 @@ where <replaceable class="PARAMETER">action</replaceable> is one of:
<term><literal>INHERIT <replaceable class="PARAMETER">parent_table</replaceable></literal></term> <term><literal>INHERIT <replaceable class="PARAMETER">parent_table</replaceable></literal></term>
<listitem> <listitem>
<para> <para>
This form adds a new parent table to the table. This won't add new This form adds the target table as a new child of the specified parent
columns to the child table, instead all columns of the parent table must table. Subsequently, queries against the parent will include records
already exist in the child table. They must have matching data types, of the target table. To be added as a child, the target table must
already contain all the same columns as the parent (it could have
additional columns, too). The columns must have matching data types,
and if they have <literal>NOT NULL</literal> constraints in the parent and if they have <literal>NOT NULL</literal> constraints in the parent
then they must also have <literal>NOT NULL</literal> constraints in the then they must also have <literal>NOT NULL</literal> constraints in the
child. child.
</para> </para>
<para> <para>
There must also be matching table constraints for all There must also be matching child-table constraints for all
<literal>CHECK</literal> table constraints of the parent. Currently <literal>CHECK</literal> constraints of the parent. Currently
<literal>UNIQUE</literal>, <literal>PRIMARY KEY</literal>, and <literal>UNIQUE</literal>, <literal>PRIMARY KEY</literal>, and
<literal>FOREIGN KEY</literal> constraints are ignored however this may <literal>FOREIGN KEY</literal> constraints are not considered, but
change in the future. this may change in the future.
</para>
<para>
The easiest way to create a suitable table is to create a table using
<literal>INHERITS</literal> and then remove it via <literal>NO
INHERIT</literal>. Alternatively create a table using
<literal>LIKE</literal> however note that <literal>LIKE</literal> does
not create the necessary constraints.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -324,7 +318,8 @@ where <replaceable class="PARAMETER">action</replaceable> is one of: ...@@ -324,7 +318,8 @@ where <replaceable class="PARAMETER">action</replaceable> is one of:
<term><literal>NO INHERIT <replaceable class="PARAMETER">parent_table</replaceable></literal></term> <term><literal>NO INHERIT <replaceable class="PARAMETER">parent_table</replaceable></literal></term>
<listitem> <listitem>
<para> <para>
This form removes a parent table from the list of parents of the table. This form removes the target table from the list of children of the
specified parent table.
Queries against the parent table will no longer include records drawn Queries against the parent table will no longer include records drawn
from the target table. from the target table.
</para> </para>
...@@ -392,6 +387,8 @@ where <replaceable class="PARAMETER">action</replaceable> is one of: ...@@ -392,6 +387,8 @@ where <replaceable class="PARAMETER">action</replaceable> is one of:
You must own the table to use <command>ALTER TABLE</>. You must own the table to use <command>ALTER TABLE</>.
To change the schema of a table, you must also have To change the schema of a table, you must also have
<literal>CREATE</literal> privilege on the new schema. <literal>CREATE</literal> privilege on the new schema.
To add the table as a new child of a parent table, you must own the
parent table as well.
To alter the owner, you must also be a direct or indirect member of the new To alter the owner, you must also be a direct or indirect member of the new
owning role, and that role must have <literal>CREATE</literal> privilege on owning role, and that role must have <literal>CREATE</literal> privilege on
the table's schema. (These restrictions enforce that altering the owner the table's schema. (These restrictions enforce that altering the owner
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.205 2006/10/11 16:42:58 tgl Exp $ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.206 2006/10/13 21:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -168,7 +168,7 @@ static void add_nonduplicate_constraint(Constraint *cdef, ...@@ -168,7 +168,7 @@ static void add_nonduplicate_constraint(Constraint *cdef,
static bool change_varattnos_walker(Node *node, const AttrNumber *newattno); static bool change_varattnos_walker(Node *node, const AttrNumber *newattno);
static void StoreCatalogInheritance(Oid relationId, List *supers); static void StoreCatalogInheritance(Oid relationId, List *supers);
static void StoreCatalogInheritance1(Oid relationId, Oid parentOid, static void StoreCatalogInheritance1(Oid relationId, Oid parentOid,
int16 seqNumber, Relation catalogRelation); int16 seqNumber, Relation inhRelation);
static int findAttrByName(const char *attributeName, List *schema); static int findAttrByName(const char *attributeName, List *schema);
static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass); static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
static void AlterIndexNamespaces(Relation classRel, Relation rel, static void AlterIndexNamespaces(Relation classRel, Relation rel,
...@@ -253,8 +253,8 @@ static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace); ...@@ -253,8 +253,8 @@ static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset); static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset);
static void ATExecEnableDisableTrigger(Relation rel, char *trigname, static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
bool enable, bool skip_system); bool enable, bool skip_system);
static void ATExecAddInherits(Relation rel, RangeVar *parent); static void ATExecAddInherit(Relation rel, RangeVar *parent);
static void ATExecDropInherits(Relation rel, RangeVar *parent); static void ATExecDropInherit(Relation rel, RangeVar *parent);
static void copy_relation_data(Relation rel, SMgrRelation dst); static void copy_relation_data(Relation rel, SMgrRelation dst);
static void update_ri_trigger_args(Oid relid, static void update_ri_trigger_args(Oid relid,
const char *oldname, const char *oldname,
...@@ -1272,25 +1272,34 @@ StoreCatalogInheritance(Oid relationId, List *supers) ...@@ -1272,25 +1272,34 @@ StoreCatalogInheritance(Oid relationId, List *supers)
seqNumber = 1; seqNumber = 1;
foreach(entry, supers) foreach(entry, supers)
{ {
StoreCatalogInheritance1(relationId, lfirst_oid(entry), seqNumber, relation); Oid parentOid = lfirst_oid(entry);
StoreCatalogInheritance1(relationId, parentOid, seqNumber, relation);
seqNumber++; seqNumber++;
} }
heap_close(relation, RowExclusiveLock); heap_close(relation, RowExclusiveLock);
} }
/*
* Make catalog entries showing relationId as being an inheritance child
* of parentOid. inhRelation is the already-opened pg_inherits catalog.
*/
static void static void
StoreCatalogInheritance1(Oid relationId, Oid parentOid, StoreCatalogInheritance1(Oid relationId, Oid parentOid,
int16 seqNumber, Relation relation) int16 seqNumber, Relation inhRelation)
{ {
TupleDesc desc = RelationGetDescr(inhRelation);
Datum datum[Natts_pg_inherits]; Datum datum[Natts_pg_inherits];
char nullarr[Natts_pg_inherits]; char nullarr[Natts_pg_inherits];
ObjectAddress childobject, ObjectAddress childobject,
parentobject; parentobject;
HeapTuple tuple; HeapTuple tuple;
TupleDesc desc = RelationGetDescr(relation);
datum[0] = ObjectIdGetDatum(relationId); /* inhrel */ /*
* Make the pg_inherits entry
*/
datum[0] = ObjectIdGetDatum(relationId); /* inhrelid */
datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */ datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
datum[2] = Int16GetDatum(seqNumber); /* inhseqno */ datum[2] = Int16GetDatum(seqNumber); /* inhseqno */
...@@ -1300,9 +1309,9 @@ StoreCatalogInheritance1(Oid relationId, Oid parentOid, ...@@ -1300,9 +1309,9 @@ StoreCatalogInheritance1(Oid relationId, Oid parentOid,
tuple = heap_formtuple(desc, datum, nullarr); tuple = heap_formtuple(desc, datum, nullarr);
simple_heap_insert(relation, tuple); simple_heap_insert(inhRelation, tuple);
CatalogUpdateIndexes(relation, tuple); CatalogUpdateIndexes(inhRelation, tuple);
heap_freetuple(tuple); heap_freetuple(tuple);
...@@ -2150,8 +2159,8 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2150,8 +2159,8 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_DisableTrig: /* DISABLE TRIGGER variants */ case AT_DisableTrig: /* DISABLE TRIGGER variants */
case AT_DisableTrigAll: case AT_DisableTrigAll:
case AT_DisableTrigUser: case AT_DisableTrigUser:
case AT_AddInherits: case AT_AddInherit: /* INHERIT / NO INHERIT */
case AT_DropInherits: case AT_DropInherit:
ATSimplePermissions(rel, false); ATSimplePermissions(rel, false);
/* These commands never recurse */ /* These commands never recurse */
/* No command-specific prep needed */ /* No command-specific prep needed */
...@@ -2335,11 +2344,11 @@ ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd) ...@@ -2335,11 +2344,11 @@ ATExecCmd(AlteredTableInfo *tab, Relation rel, AlterTableCmd *cmd)
case AT_DisableTrigUser: /* DISABLE TRIGGER USER */ case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
ATExecEnableDisableTrigger(rel, NULL, false, true); ATExecEnableDisableTrigger(rel, NULL, false, true);
break; break;
case AT_DropInherits: case AT_AddInherit:
ATExecDropInherits(rel, cmd->parent); ATExecAddInherit(rel, (RangeVar *) cmd->def);
break; break;
case AT_AddInherits: case AT_DropInherit:
ATExecAddInherits(rel, cmd->parent); ATExecDropInherit(rel, (RangeVar *) cmd->def);
break; break;
default: /* oops */ default: /* oops */
elog(ERROR, "unrecognized alter table type: %d", elog(ERROR, "unrecognized alter table type: %d",
...@@ -6087,46 +6096,32 @@ ATExecEnableDisableTrigger(Relation rel, char *trigname, ...@@ -6087,46 +6096,32 @@ ATExecEnableDisableTrigger(Relation rel, char *trigname,
EnableDisableTrigger(rel, trigname, enable, skip_system); EnableDisableTrigger(rel, trigname, enable, skip_system);
} }
static char *
decompile_conbin(HeapTuple contup, TupleDesc tupdesc)
{
Form_pg_constraint con;
bool isnull;
Datum attr;
Datum expr;
con = (Form_pg_constraint) GETSTRUCT(contup);
attr = heap_getattr(contup, Anum_pg_constraint_conbin, tupdesc, &isnull);
if (isnull)
elog(ERROR, "null conbin for constraint %u", HeapTupleGetOid(contup));
expr = DirectFunctionCall2(pg_get_expr, attr,
ObjectIdGetDatum(con->conrelid));
return DatumGetCString(DirectFunctionCall1(textout, expr));
}
/* /*
* ALTER TABLE INHERIT * ALTER TABLE INHERIT
* *
* Add a parent to the child's parents. This verifies that all the columns and * Add a parent to the child's parents. This verifies that all the columns and
* check constraints of the parent appear in the child and that they have the * check constraints of the parent appear in the child and that they have the
* same data type and expressions. * same data types and expressions.
*/ */
static void static void
ATExecAddInherits(Relation child_rel, RangeVar *parent) ATExecAddInherit(Relation child_rel, RangeVar *parent)
{ {
Relation parent_rel, Relation parent_rel,
catalogRelation; catalogRelation;
SysScanDesc scan; SysScanDesc scan;
ScanKeyData key; ScanKeyData key;
HeapTuple inheritsTuple; HeapTuple inheritsTuple;
int4 inhseqno; int32 inhseqno;
List *children; List *children;
/*
* AccessShareLock on the parent is what's obtained during normal CREATE
* TABLE ... INHERITS ..., so should be enough here.
*/
parent_rel = heap_openrv(parent, AccessShareLock); parent_rel = heap_openrv(parent, AccessShareLock);
/* /*
* Must be owner of both parent and child -- child is taken care of by * Must be owner of both parent and child -- child was checked by
* ATSimplePermissions call in ATPrepCmd * ATSimplePermissions call in ATPrepCmd
*/ */
ATSimplePermissions(parent_rel, false); ATSimplePermissions(parent_rel, false);
...@@ -6137,19 +6132,15 @@ ATExecAddInherits(Relation child_rel, RangeVar *parent) ...@@ -6137,19 +6132,15 @@ ATExecAddInherits(Relation child_rel, RangeVar *parent)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE), (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot inherit from temporary relation \"%s\"", errmsg("cannot inherit from temporary relation \"%s\"",
parent->relname))); RelationGetRelationName(parent_rel))));
/* If parent has OIDs then all children must have OIDs */
if (parent_rel->rd_rel->relhasoids && !child_rel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
RelationGetRelationName(child_rel), parent->relname)));
/* /*
* Don't allow any duplicates in the list of parents. We scan through the * Check for duplicates in the list of parents, and determine the highest
* list of parents in pg_inherit and keep track of the first open inhseqno * inhseqno already present; we'll use the next one for the new parent.
* slot found to use for the new parent. * (Note: get RowExclusiveLock because we will write pg_inherits below.)
*
* Note: we do not reject the case where the child already inherits from
* the parent indirectly; CREATE TABLE doesn't reject comparable cases.
*/ */
catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
ScanKeyInit(&key, ScanKeyInit(&key,
...@@ -6169,37 +6160,57 @@ ATExecAddInherits(Relation child_rel, RangeVar *parent) ...@@ -6169,37 +6160,57 @@ ATExecAddInherits(Relation child_rel, RangeVar *parent)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE), (errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("inherited relation \"%s\" duplicated", errmsg("inherited relation \"%s\" duplicated",
parent->relname))); RelationGetRelationName(parent_rel))));
if (inh->inhseqno == inhseqno + 1) if (inh->inhseqno > inhseqno)
inhseqno = inh->inhseqno; inhseqno = inh->inhseqno;
} }
systable_endscan(scan); systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock);
/* /*
* If the new parent is found in our list of inheritors, we have a * Prevent circularity by seeing if proposed parent inherits from child.
* circular structure * (In particular, this disallows making a rel inherit from itself.)
*
* This is not completely bulletproof because of race conditions: in
* multi-level inheritance trees, someone else could concurrently
* be making another inheritance link that closes the loop but does
* not join either of the rels we have locked. Preventing that seems
* to require exclusive locks on the entire inheritance tree, which is
* a cure worse than the disease. find_all_inheritors() will cope with
* circularity anyway, so don't sweat it too much.
*/ */
children = find_all_inheritors(RelationGetRelid(child_rel)); children = find_all_inheritors(RelationGetRelid(child_rel));
if (list_member_oid(children, RelationGetRelid(parent_rel))) if (list_member_oid(children, RelationGetRelid(parent_rel)))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_TABLE), (errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("circular inheritance structure found"), errmsg("circular inheritance not allowed"),
errdetail("\"%s\" is already a child of \"%s\".", errdetail("\"%s\" is already a child of \"%s\".",
parent->relname, parent->relname,
RelationGetRelationName(child_rel)))); RelationGetRelationName(child_rel))));
/* If parent has OIDs then child must have OIDs */
if (parent_rel->rd_rel->relhasoids && !child_rel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
RelationGetRelationName(child_rel),
RelationGetRelationName(parent_rel))));
/* Match up the columns and bump attinhcount and attislocal */ /* Match up the columns and bump attinhcount and attislocal */
MergeAttributesIntoExisting(child_rel, parent_rel); MergeAttributesIntoExisting(child_rel, parent_rel);
/* Match up the constraints and make sure they're present in child */ /* Match up the constraints and make sure they're present in child */
MergeConstraintsIntoExisting(child_rel, parent_rel); MergeConstraintsIntoExisting(child_rel, parent_rel);
catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); /*
* OK, it looks valid. Make the catalog entries that show inheritance.
*/
StoreCatalogInheritance1(RelationGetRelid(child_rel), StoreCatalogInheritance1(RelationGetRelid(child_rel),
RelationGetRelid(parent_rel), RelationGetRelid(parent_rel),
inhseqno + 1, catalogRelation); inhseqno + 1,
catalogRelation);
/* Now we're done with pg_inherits */
heap_close(catalogRelation, RowExclusiveLock); heap_close(catalogRelation, RowExclusiveLock);
/* keep our lock on the parent relation until commit */ /* keep our lock on the parent relation until commit */
...@@ -6207,26 +6218,53 @@ ATExecAddInherits(Relation child_rel, RangeVar *parent) ...@@ -6207,26 +6218,53 @@ ATExecAddInherits(Relation child_rel, RangeVar *parent)
} }
/* /*
* Check columns in child table match up with columns in parent * Obtain the source-text form of the constraint expression for a check
* constraint, given its pg_constraint tuple
*/
static char *
decompile_conbin(HeapTuple contup, TupleDesc tupdesc)
{
Form_pg_constraint con;
bool isnull;
Datum attr;
Datum expr;
con = (Form_pg_constraint) GETSTRUCT(contup);
attr = heap_getattr(contup, Anum_pg_constraint_conbin, tupdesc, &isnull);
if (isnull)
elog(ERROR, "null conbin for constraint %u", HeapTupleGetOid(contup));
expr = DirectFunctionCall2(pg_get_expr, attr,
ObjectIdGetDatum(con->conrelid));
return DatumGetCString(DirectFunctionCall1(textout, expr));
}
/*
* Check columns in child table match up with columns in parent, and increment
* their attinhcount.
* *
* Called by ATExecAddInherits * Called by ATExecAddInherit
* *
* Currently all columns must be found in child. Missing columns are an error. * Currently all parent columns must be found in child. Missing columns are an
* One day we might consider creating new columns like CREATE TABLE does. * error. One day we might consider creating new columns like CREATE TABLE
* does. However, that is widely unpopular --- in the common use case of
* partitioned tables it's a foot-gun.
* *
* The data type must match perfectly. If the parent column is NOT NULL then * The data type must match exactly. If the parent column is NOT NULL then
* the child table must be as well. Defaults are ignored however. * the child must be as well. Defaults are not compared, however.
*/ */
static void static void
MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel) MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel)
{ {
Relation attrdesc; Relation attrrel;
AttrNumber parent_attno; AttrNumber parent_attno;
int parent_natts; int parent_natts;
TupleDesc tupleDesc; TupleDesc tupleDesc;
TupleConstr *constr; TupleConstr *constr;
HeapTuple tuple; HeapTuple tuple;
attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
tupleDesc = RelationGetDescr(parent_rel); tupleDesc = RelationGetDescr(parent_rel);
parent_natts = tupleDesc->natts; parent_natts = tupleDesc->natts;
constr = tupleDesc->constr; constr = tupleDesc->constr;
...@@ -6240,17 +6278,12 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -6240,17 +6278,12 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel)
if (attribute->attisdropped) if (attribute->attisdropped)
continue; continue;
/* Does it conflict with an existing column? */ /* Find same column in child (matching on column name). */
attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
tuple = SearchSysCacheCopyAttName(RelationGetRelid(child_rel), tuple = SearchSysCacheCopyAttName(RelationGetRelid(child_rel),
attributeName); attributeName);
if (HeapTupleIsValid(tuple)) if (HeapTupleIsValid(tuple))
{ {
/* /* Check they are same type and typmod */
* Yes, try to merge the two column definitions. They must have
* the same type and typmod.
*/
Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple); Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
if (attribute->atttypid != childatt->atttypid || if (attribute->atttypid != childatt->atttypid ||
...@@ -6258,45 +6291,40 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -6258,45 +6291,40 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH), (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table \"%s\" has different type for column \"%s\"", errmsg("child table \"%s\" has different type for column \"%s\"",
RelationGetRelationName(child_rel), NameStr(attribute->attname)))); RelationGetRelationName(child_rel),
attributeName)));
if (attribute->attnotnull && !childatt->attnotnull) if (attribute->attnotnull && !childatt->attnotnull)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH), (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("column \"%s\" in child table must be marked NOT NULL", errmsg("column \"%s\" in child table must be marked NOT NULL",
NameStr(attribute->attname)))); attributeName)));
childatt->attinhcount++;
simple_heap_update(attrdesc, &tuple->t_self, tuple);
/* XXX strength reduce open indexes to outside loop? */
CatalogUpdateIndexes(attrdesc, tuple);
heap_freetuple(tuple);
/* /*
* We don't touch default at all since we're not making any other * OK, bump the child column's inheritance count. (If we fail
* DDL changes to the child * later on, this change will just roll back.)
*/ */
childatt->attinhcount++;
simple_heap_update(attrrel, &tuple->t_self, tuple);
CatalogUpdateIndexes(attrrel, tuple);
heap_freetuple(tuple);
} }
else else
{ {
/*
* Creating inherited columns in this case seems to be unpopular.
* In the common use case of partitioned tables it's a foot-gun.
*/
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH), (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table is missing column \"%s\"", errmsg("child table is missing column \"%s\"",
NameStr(attribute->attname)))); attributeName)));
} }
heap_close(attrdesc, RowExclusiveLock);
} }
heap_close(attrrel, RowExclusiveLock);
} }
/* /*
* Check constraints in child table match up with constraints in parent * Check constraints in child table match up with constraints in parent
* *
* Called by ATExecAddInherits * Called by ATExecAddInherit
* *
* Currently all constraints in parent must be present in the child. One day we * Currently all constraints in parent must be present in the child. One day we
* may consider adding new constraints like CREATE TABLE does. We may also want * may consider adding new constraints like CREATE TABLE does. We may also want
...@@ -6305,9 +6333,9 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -6305,9 +6333,9 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel)
* make it possible to ensure no records are mistakenly inserted into the * make it possible to ensure no records are mistakenly inserted into the
* master in partitioned tables rather than the appropriate child. * master in partitioned tables rather than the appropriate child.
* *
* XXX this is O(n^2) which may be issue with tables with hundreds of * XXX This is O(N^2) which may be an issue with tables with hundreds of
* constraints. As long as tables have more like 10 constraints it shouldn't be * constraints. As long as tables have more like 10 constraints it shouldn't be
* an issue though. Even 100 constraints ought not be the end of the world. * a problem though. Even 100 constraints ought not be the end of the world.
*/ */
static void static void
MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
...@@ -6326,13 +6354,12 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -6326,13 +6354,12 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
ScanKeyInit(&key, ScanKeyInit(&key,
Anum_pg_constraint_conrelid, Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, BTEqualStrategyNumber, F_OIDEQ,
F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(child_rel))); ObjectIdGetDatum(RelationGetRelid(child_rel)));
scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId,
true, SnapshotNow, 1, &key); true, SnapshotNow, 1, &key);
constraints = NIL;
constraints = NIL;
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{ {
Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
...@@ -6342,20 +6369,21 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -6342,20 +6369,21 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
constraints = lappend(constraints, heap_copytuple(constraintTuple)); constraints = lappend(constraints, heap_copytuple(constraintTuple));
} }
systable_endscan(scan); systable_endscan(scan);
/* Then loop through the parent's constraints looking for them in the list */ /* Then scan through the parent's constraints looking for matches */
ScanKeyInit(&key, ScanKeyInit(&key,
Anum_pg_constraint_conrelid, Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, BTEqualStrategyNumber, F_OIDEQ,
F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(parent_rel))); ObjectIdGetDatum(RelationGetRelid(parent_rel)));
scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, true, scan = systable_beginscan(catalogRelation, ConstraintRelidIndexId, true,
SnapshotNow, 1, &key); SnapshotNow, 1, &key);
while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) while (HeapTupleIsValid(constraintTuple = systable_getnext(scan)))
{ {
bool found = false;
Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(constraintTuple); Form_pg_constraint parent_con = (Form_pg_constraint) GETSTRUCT(constraintTuple);
bool found = false;
Form_pg_constraint child_con = NULL; Form_pg_constraint child_con = NULL;
HeapTuple child_contuple = NULL; HeapTuple child_contuple = NULL;
...@@ -6364,10 +6392,10 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -6364,10 +6392,10 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
foreach(elem, constraints) foreach(elem, constraints)
{ {
child_contuple = lfirst(elem); child_contuple = (HeapTuple) lfirst(elem);
child_con = (Form_pg_constraint) GETSTRUCT(child_contuple); child_con = (Form_pg_constraint) GETSTRUCT(child_contuple);
if (!strcmp(NameStr(parent_con->conname), if (strcmp(NameStr(parent_con->conname),
NameStr(child_con->conname))) NameStr(child_con->conname)) == 0)
{ {
found = true; found = true;
break; break;
...@@ -6377,14 +6405,13 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -6377,14 +6405,13 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
if (!found) if (!found)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH), (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("child table missing constraint matching parent table constraint \"%s\"", errmsg("child table is missing constraint \"%s\"",
NameStr(parent_con->conname)))); NameStr(parent_con->conname))));
if (parent_con->condeferrable != child_con->condeferrable || if (parent_con->condeferrable != child_con->condeferrable ||
parent_con->condeferred != child_con->condeferred || parent_con->condeferred != child_con->condeferred ||
parent_con->contypid != child_con->contypid ||
strcmp(decompile_conbin(constraintTuple, tupleDesc), strcmp(decompile_conbin(constraintTuple, tupleDesc),
decompile_conbin(child_contuple, tupleDesc))) decompile_conbin(child_contuple, tupleDesc)) != 0)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH), (errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("constraint definition for check constraint \"%s\" does not match", errmsg("constraint definition for check constraint \"%s\" does not match",
...@@ -6407,37 +6434,42 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -6407,37 +6434,42 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
* and attislocal of the columns and removes the pg_inherit and pg_depend * and attislocal of the columns and removes the pg_inherit and pg_depend
* entries. * entries.
* *
* If attinhcount goes to 0 then attislocal gets set to true. If it goes back up * If attinhcount goes to 0 then attislocal gets set to true. If it goes back
* attislocal stays 0 which means if a child is ever removed from a parent then * up attislocal stays true, which means if a child is ever removed from a
* its columns will never be automatically dropped which may surprise. But at * parent then its columns will never be automatically dropped which may
* least we'll never surprise by dropping columns someone isn't expecting to be * surprise. But at least we'll never surprise by dropping columns someone
* dropped which would actually mean data loss. * isn't expecting to be dropped which would actually mean data loss.
*/ */
static void static void
ATExecDropInherits(Relation rel, RangeVar *parent) ATExecDropInherit(Relation rel, RangeVar *parent)
{ {
Relation parent_rel;
Relation catalogRelation; Relation catalogRelation;
SysScanDesc scan; SysScanDesc scan;
ScanKeyData key[2]; ScanKeyData key[3];
HeapTuple inheritsTuple, HeapTuple inheritsTuple,
attributeTuple, attributeTuple,
depTuple; depTuple;
Oid inhparent; bool found = false;
Oid dropparent;
int found = false;
/* /*
* Get the OID of parent -- if no schema is specified use the regular * AccessShareLock on the parent is probably enough, seeing that DROP TABLE
* search path and only drop the one table that's found. We could try to * doesn't lock parent tables at all. We need some lock since we'll be
* be clever and look at each parent and see if it matches but that would * inspecting the parent's schema.
* be inconsistent with other operations I think.
*/ */
dropparent = RangeVarGetRelid(parent, false); parent_rel = heap_openrv(parent, AccessShareLock);
/* Search through the direct parents of rel looking for dropparent oid */ /*
* We don't bother to check ownership of the parent table --- ownership
* of the child is presumed enough rights.
*/
/*
* Find and destroy the pg_inherits entry linking the two, or error out
* if there is none.
*/
catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock); catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
ScanKeyInit(key, ScanKeyInit(&key[0],
Anum_pg_inherits_inhrelid, Anum_pg_inherits_inhrelid,
BTEqualStrategyNumber, F_OIDEQ, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel))); ObjectIdGetDatum(RelationGetRelid(rel)));
...@@ -6446,8 +6478,10 @@ ATExecDropInherits(Relation rel, RangeVar *parent) ...@@ -6446,8 +6478,10 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan))) while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
{ {
Oid inhparent;
inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent; inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
if (inhparent == dropparent) if (inhparent == RelationGetRelid(parent_rel))
{ {
simple_heap_delete(catalogRelation, &inheritsTuple->t_self); simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
found = true; found = true;
...@@ -6459,26 +6493,19 @@ ATExecDropInherits(Relation rel, RangeVar *parent) ...@@ -6459,26 +6493,19 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
heap_close(catalogRelation, RowExclusiveLock); heap_close(catalogRelation, RowExclusiveLock);
if (!found) if (!found)
{ ereport(ERROR,
if (parent->schemaname) (errcode(ERRCODE_UNDEFINED_TABLE),
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"",
parent->schemaname, parent->relname, RelationGetRelationName(rel))));
else
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" is not a parent of relation \"%s\"", errmsg("relation \"%s\" is not a parent of relation \"%s\"",
parent->relname, RelationGetRelationName(rel)))); RelationGetRelationName(parent_rel),
} RelationGetRelationName(rel))));
/* Search through columns looking for matching columns from parent table */
/*
* Search through child columns looking for ones matching parent rel
*/
catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock); catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
ScanKeyInit(key, ScanKeyInit(&key[0],
Anum_pg_attribute_attrelid, Anum_pg_attribute_attrelid,
BTEqualStrategyNumber, BTEqualStrategyNumber, F_OIDEQ,
F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel))); ObjectIdGetDatum(RelationGetRelid(rel)));
scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId, scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId,
true, SnapshotNow, 1, key); true, SnapshotNow, 1, key);
...@@ -6486,18 +6513,16 @@ ATExecDropInherits(Relation rel, RangeVar *parent) ...@@ -6486,18 +6513,16 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
{ {
Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple); Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple);
/* /* Ignore if dropped or not inherited */
* Not an inherited column at all (do NOT use islocal for this
* test--it can be true for inherited columns)
*/
if (att->attinhcount == 0)
continue;
if (att->attisdropped) if (att->attisdropped)
continue; continue;
if (att->attinhcount <= 0)
continue;
if (SearchSysCacheExistsAttName(dropparent, NameStr(att->attname))) if (SearchSysCacheExistsAttName(RelationGetRelid(parent_rel),
NameStr(att->attname)))
{ {
/* Decrement inhcount and possibly set islocal to 1 */ /* Decrement inhcount and possibly set islocal to true */
HeapTuple copyTuple = heap_copytuple(attributeTuple); HeapTuple copyTuple = heap_copytuple(attributeTuple);
Form_pg_attribute copy_att = (Form_pg_attribute) GETSTRUCT(copyTuple); Form_pg_attribute copy_att = (Form_pg_attribute) GETSTRUCT(copyTuple);
...@@ -6506,14 +6531,6 @@ ATExecDropInherits(Relation rel, RangeVar *parent) ...@@ -6506,14 +6531,6 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
copy_att->attislocal = true; copy_att->attislocal = true;
simple_heap_update(catalogRelation, &copyTuple->t_self, copyTuple); simple_heap_update(catalogRelation, &copyTuple->t_self, copyTuple);
/*
* XXX "Avoid using it for multiple tuples, since opening the
* indexes and building the index info structures is moderately
* expensive." Perhaps this can be moved outside the loop or else
* at least the CatalogOpenIndexes/CatalogCloseIndexes moved
* outside the loop but when I try that it seg faults?!
*/
CatalogUpdateIndexes(catalogRelation, copyTuple); CatalogUpdateIndexes(catalogRelation, copyTuple);
heap_freetuple(copyTuple); heap_freetuple(copyTuple);
} }
...@@ -6526,40 +6543,40 @@ ATExecDropInherits(Relation rel, RangeVar *parent) ...@@ -6526,40 +6543,40 @@ ATExecDropInherits(Relation rel, RangeVar *parent)
* *
* There's no convenient way to do this, so go trawling through pg_depend * There's no convenient way to do this, so go trawling through pg_depend
*/ */
catalogRelation = heap_open(DependRelationId, RowExclusiveLock); catalogRelation = heap_open(DependRelationId, RowExclusiveLock);
ScanKeyInit(&key[0], ScanKeyInit(&key[0],
Anum_pg_depend_classid, Anum_pg_depend_classid,
BTEqualStrategyNumber, F_OIDEQ, BTEqualStrategyNumber, F_OIDEQ,
RelationRelationId); ObjectIdGetDatum(RelationRelationId));
ScanKeyInit(&key[1], ScanKeyInit(&key[1],
Anum_pg_depend_objid, Anum_pg_depend_objid,
BTEqualStrategyNumber, F_OIDEQ, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(RelationGetRelid(rel))); ObjectIdGetDatum(RelationGetRelid(rel)));
ScanKeyInit(&key[2],
Anum_pg_depend_objsubid,
BTEqualStrategyNumber, F_INT4EQ,
Int32GetDatum(0));
scan = systable_beginscan(catalogRelation, DependDependerIndexId, true, scan = systable_beginscan(catalogRelation, DependDependerIndexId, true,
SnapshotNow, 2, key); SnapshotNow, 3, key);
while (HeapTupleIsValid(depTuple = systable_getnext(scan))) while (HeapTupleIsValid(depTuple = systable_getnext(scan)))
{ {
Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple); Form_pg_depend dep = (Form_pg_depend) GETSTRUCT(depTuple);
if (dep->refclassid == RelationRelationId && if (dep->refclassid == RelationRelationId &&
dep->refobjid == dropparent && dep->refobjid == RelationGetRelid(parent_rel) &&
dep->refobjsubid == 0 &&
dep->deptype == DEPENDENCY_NORMAL) dep->deptype == DEPENDENCY_NORMAL)
{
/*
* Only delete a single dependency -- there shouldn't be more but
* just in case...
*/
simple_heap_delete(catalogRelation, &depTuple->t_self); simple_heap_delete(catalogRelation, &depTuple->t_self);
break;
}
} }
systable_endscan(scan); systable_endscan(scan);
heap_close(catalogRelation, RowExclusiveLock); heap_close(catalogRelation, RowExclusiveLock);
/* keep our lock on the parent relation until commit */
heap_close(parent_rel, NoLock);
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.351 2006/10/04 00:29:53 momjian Exp $ * $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.352 2006/10/13 21:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -1821,7 +1821,6 @@ _copyAlterTableCmd(AlterTableCmd *from) ...@@ -1821,7 +1821,6 @@ _copyAlterTableCmd(AlterTableCmd *from)
COPY_SCALAR_FIELD(subtype); COPY_SCALAR_FIELD(subtype);
COPY_STRING_FIELD(name); COPY_STRING_FIELD(name);
COPY_NODE_FIELD(parent);
COPY_NODE_FIELD(def); COPY_NODE_FIELD(def);
COPY_NODE_FIELD(transform); COPY_NODE_FIELD(transform);
COPY_SCALAR_FIELD(behavior); COPY_SCALAR_FIELD(behavior);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.285 2006/10/04 00:29:53 momjian Exp $ * $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.286 2006/10/13 21:43:18 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -768,7 +768,6 @@ _equalAlterTableCmd(AlterTableCmd *a, AlterTableCmd *b) ...@@ -768,7 +768,6 @@ _equalAlterTableCmd(AlterTableCmd *a, AlterTableCmd *b)
{ {
COMPARE_SCALAR_FIELD(subtype); COMPARE_SCALAR_FIELD(subtype);
COMPARE_STRING_FIELD(name); COMPARE_STRING_FIELD(name);
COMPARE_NODE_FIELD(parent);
COMPARE_NODE_FIELD(def); COMPARE_NODE_FIELD(def);
COMPARE_NODE_FIELD(transform); COMPARE_NODE_FIELD(transform);
COMPARE_SCALAR_FIELD(behavior); COMPARE_SCALAR_FIELD(behavior);
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.566 2006/09/28 20:51:42 tgl Exp $ * $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.567 2006/10/13 21:43:19 tgl Exp $
* *
* HISTORY * HISTORY
* AUTHOR DATE MAJOR EVENT * AUTHOR DATE MAJOR EVENT
...@@ -1506,20 +1506,20 @@ alter_table_cmd: ...@@ -1506,20 +1506,20 @@ alter_table_cmd:
n->subtype = AT_DisableTrigUser; n->subtype = AT_DisableTrigUser;
$$ = (Node *)n; $$ = (Node *)n;
} }
/* ALTER TABLE <name> ALTER INHERITS ADD <parent> */ /* ALTER TABLE <name> INHERIT <parent> */
| INHERIT qualified_name | INHERIT qualified_name
{ {
AlterTableCmd *n = makeNode(AlterTableCmd); AlterTableCmd *n = makeNode(AlterTableCmd);
n->subtype = AT_AddInherits; n->subtype = AT_AddInherit;
n->parent = $2; n->def = (Node *) $2;
$$ = (Node *)n; $$ = (Node *)n;
} }
/* ALTER TABLE <name> alter INHERITS DROP <parent> */ /* ALTER TABLE <name> NO INHERIT <parent> */
| NO INHERIT qualified_name | NO INHERIT qualified_name
{ {
AlterTableCmd *n = makeNode(AlterTableCmd); AlterTableCmd *n = makeNode(AlterTableCmd);
n->subtype = AT_DropInherits; n->subtype = AT_DropInherit;
n->parent = $3; n->def = (Node *) $3;
$$ = (Node *)n; $$ = (Node *)n;
} }
| alter_rel_cmd | alter_rel_cmd
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.332 2006/10/11 16:42:59 tgl Exp $ * $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.333 2006/10/13 21:43:19 tgl Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -905,8 +905,8 @@ typedef enum AlterTableType ...@@ -905,8 +905,8 @@ typedef enum AlterTableType
AT_DisableTrigAll, /* DISABLE TRIGGER ALL */ AT_DisableTrigAll, /* DISABLE TRIGGER ALL */
AT_EnableTrigUser, /* ENABLE TRIGGER USER */ AT_EnableTrigUser, /* ENABLE TRIGGER USER */
AT_DisableTrigUser, /* DISABLE TRIGGER USER */ AT_DisableTrigUser, /* DISABLE TRIGGER USER */
AT_AddInherits, /* ADD INHERITS parent */ AT_AddInherit, /* INHERIT parent */
AT_DropInherits /* DROP INHERITS parent */ AT_DropInherit /* NO INHERIT parent */
} AlterTableType; } AlterTableType;
typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */ typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */
...@@ -915,9 +915,8 @@ typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */ ...@@ -915,9 +915,8 @@ typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */
AlterTableType subtype; /* Type of table alteration to apply */ AlterTableType subtype; /* Type of table alteration to apply */
char *name; /* column, constraint, or trigger to act on, char *name; /* column, constraint, or trigger to act on,
* or new owner or tablespace */ * or new owner or tablespace */
RangeVar *parent; /* Parent table for add/drop inherits */
Node *def; /* definition of new column, column type, Node *def; /* definition of new column, column type,
* index, or constraint */ * index, constraint, or parent table */
Node *transform; /* transformation expr for ALTER TYPE */ Node *transform; /* transformation expr for ALTER TYPE */
DropBehavior behavior; /* RESTRICT or CASCADE for DROP cases */ DropBehavior behavior; /* RESTRICT or CASCADE for DROP cases */
} AlterTableCmd; } AlterTableCmd;
......
...@@ -324,7 +324,7 @@ select test2 from atacc2; ...@@ -324,7 +324,7 @@ select test2 from atacc2;
-- fail due to missing constraint -- fail due to missing constraint
alter table atacc2 add constraint foo check (test2>0); alter table atacc2 add constraint foo check (test2>0);
alter table atacc3 inherit atacc2; alter table atacc3 inherit atacc2;
ERROR: child table missing constraint matching parent table constraint "foo" ERROR: child table is missing constraint "foo"
-- fail due to missing column -- fail due to missing column
alter table atacc3 rename test2 to testx; alter table atacc3 rename test2 to testx;
alter table atacc3 inherit atacc2; alter table atacc3 inherit atacc2;
...@@ -342,10 +342,10 @@ alter table atacc3 inherit atacc2; ...@@ -342,10 +342,10 @@ alter table atacc3 inherit atacc2;
alter table atacc3 inherit atacc2; alter table atacc3 inherit atacc2;
ERROR: inherited relation "atacc2" duplicated ERROR: inherited relation "atacc2" duplicated
alter table atacc2 inherit atacc3; alter table atacc2 inherit atacc3;
ERROR: circular inheritance structure found ERROR: circular inheritance not allowed
DETAIL: "atacc3" is already a child of "atacc2". DETAIL: "atacc3" is already a child of "atacc2".
alter table atacc2 inherit atacc2; alter table atacc2 inherit atacc2;
ERROR: circular inheritance structure found ERROR: circular inheritance not allowed
DETAIL: "atacc2" is already a child of "atacc2". DETAIL: "atacc2" is already a child of "atacc2".
-- test that we really are a child now (should see 4 not 3 and cascade should go through) -- test that we really are a child now (should see 4 not 3 and cascade should go through)
select test2 from atacc2; select test2 from atacc2;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment