Commit 6c572399 authored by Robert Haas's avatar Robert Haas

Rearrange "add column" logic to merge columns at exec time.

The previous coding set attinhcount too high in some cases, resulting in
an undumpable, undroppable column.  Per bug #5856, reported by Naoya
Anzai.  See also commit 31b6fc06, which
fixes a similar bug in ALTER TABLE .. ADD CONSTRAINT.

Patch by Noah Misch.
parent cabf5d84
...@@ -285,16 +285,15 @@ static void ATSimplePermissions(Relation rel, int allowed_targets); ...@@ -285,16 +285,15 @@ static void ATSimplePermissions(Relation rel, int allowed_targets);
static void ATWrongRelkindError(Relation rel, int allowed_targets); static void ATWrongRelkindError(Relation rel, int allowed_targets);
static void ATSimpleRecursion(List **wqueue, Relation rel, static void ATSimpleRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode); AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode);
static void ATOneLevelRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATTypedTableRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd, static void ATTypedTableRecursion(List **wqueue, Relation rel, AlterTableCmd *cmd,
LOCKMODE lockmode); LOCKMODE lockmode);
static List *find_typed_table_dependencies(Oid typeOid, const char *typeName, static List *find_typed_table_dependencies(Oid typeOid, const char *typeName,
DropBehavior behavior); DropBehavior behavior);
static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing,
AlterTableCmd *cmd, LOCKMODE lockmode); AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel, static void ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
ColumnDef *colDef, bool isOid, LOCKMODE lockmode); ColumnDef *colDef, bool isOid,
bool recurse, bool recursing, LOCKMODE lockmode);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid, Oid collid); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid, Oid collid);
static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse, static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse,
AlterTableCmd *cmd, LOCKMODE lockmode); AlterTableCmd *cmd, LOCKMODE lockmode);
...@@ -2775,15 +2774,15 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2775,15 +2774,15 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_AddColumn: /* ADD COLUMN */ case AT_AddColumn: /* ADD COLUMN */
ATSimplePermissions(rel, ATSimplePermissions(rel,
ATT_TABLE|ATT_COMPOSITE_TYPE|ATT_FOREIGN_TABLE); ATT_TABLE|ATT_COMPOSITE_TYPE|ATT_FOREIGN_TABLE);
/* Performs own recursion */
ATPrepAddColumn(wqueue, rel, recurse, recursing, cmd, lockmode); ATPrepAddColumn(wqueue, rel, recurse, recursing, cmd, lockmode);
/* Recursion occurs during execution phase */
pass = AT_PASS_ADD_COL; pass = AT_PASS_ADD_COL;
break; break;
case AT_AddColumnToView: /* add column via CREATE OR REPLACE case AT_AddColumnToView: /* add column via CREATE OR REPLACE
* VIEW */ * VIEW */
ATSimplePermissions(rel, ATT_VIEW); ATSimplePermissions(rel, ATT_VIEW);
/* Performs own recursion */
ATPrepAddColumn(wqueue, rel, recurse, recursing, cmd, lockmode); ATPrepAddColumn(wqueue, rel, recurse, recursing, cmd, lockmode);
/* Recursion occurs during execution phase */
pass = AT_PASS_ADD_COL; pass = AT_PASS_ADD_COL;
break; break;
case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
...@@ -2885,9 +2884,9 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2885,9 +2884,9 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
break; break;
case AT_AddOids: /* SET WITH OIDS */ case AT_AddOids: /* SET WITH OIDS */
ATSimplePermissions(rel, ATT_TABLE); ATSimplePermissions(rel, ATT_TABLE);
/* Performs own recursion */
if (!rel->rd_rel->relhasoids || recursing) if (!rel->rd_rel->relhasoids || recursing)
ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode); ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode);
/* Recursion occurs during execution phase */
pass = AT_PASS_ADD_COL; pass = AT_PASS_ADD_COL;
break; break;
case AT_DropOids: /* SET WITHOUT OIDS */ case AT_DropOids: /* SET WITHOUT OIDS */
...@@ -3043,7 +3042,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -3043,7 +3042,12 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_AddColumn: /* ADD COLUMN */ case AT_AddColumn: /* ADD COLUMN */
case AT_AddColumnToView: /* add column via CREATE OR REPLACE case AT_AddColumnToView: /* add column via CREATE OR REPLACE
* VIEW */ * VIEW */
ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, false, lockmode); ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def,
false, false, false, lockmode);
break;
case AT_AddColumnRecurse:
ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def,
false, true, false, lockmode);
break; break;
case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
ATExecColumnDefault(rel, cmd->name, cmd->def, lockmode); ATExecColumnDefault(rel, cmd->name, cmd->def, lockmode);
...@@ -3121,7 +3125,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -3121,7 +3125,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
case AT_AddOids: /* SET WITH OIDS */ case AT_AddOids: /* SET WITH OIDS */
/* Use the ADD COLUMN code, unless prep decided to do nothing */ /* Use the ADD COLUMN code, unless prep decided to do nothing */
if (cmd->def != NULL) if (cmd->def != NULL)
ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, true, lockmode); ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def,
true, false, false, lockmode);
break;
case AT_AddOidsRecurse: /* SET WITH OIDS */
/* Use the ADD COLUMN code, unless prep decided to do nothing */
if (cmd->def != NULL)
ATExecAddColumn(wqueue, tab, rel, (ColumnDef *) cmd->def,
true, true, false, lockmode);
break; break;
case AT_DropOids: /* SET WITHOUT OIDS */ case AT_DropOids: /* SET WITHOUT OIDS */
...@@ -3842,37 +3853,6 @@ ATSimpleRecursion(List **wqueue, Relation rel, ...@@ -3842,37 +3853,6 @@ ATSimpleRecursion(List **wqueue, Relation rel,
} }
} }
/*
* ATOneLevelRecursion
*
* Here, we visit only direct inheritance children. It is expected that
* the command's prep routine will recurse again to find indirect children.
* When using this technique, a multiply-inheriting child will be visited
* multiple times.
*/
static void
ATOneLevelRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd, LOCKMODE lockmode)
{
Oid relid = RelationGetRelid(rel);
ListCell *child;
List *children;
children = find_inheritance_children(relid, lockmode);
foreach(child, children)
{
Oid childrelid = lfirst_oid(child);
Relation childrel;
/* find_inheritance_children already got lock */
childrel = relation_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE");
ATPrepCmd(wqueue, childrel, cmd, true, true, lockmode);
relation_close(childrel, NoLock);
}
}
/* /*
* ATTypedTableRecursion * ATTypedTableRecursion
* *
...@@ -4060,6 +4040,12 @@ find_typed_table_dependencies(Oid typeOid, const char *typeName, DropBehavior be ...@@ -4060,6 +4040,12 @@ find_typed_table_dependencies(Oid typeOid, const char *typeName, DropBehavior be
* CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the * CHECK, NOT NULL, and FOREIGN KEY constraints will be removed from the
* AT_AddColumn AlterTableCmd by parse_utilcmd.c and added as independent * AT_AddColumn AlterTableCmd by parse_utilcmd.c and added as independent
* AlterTableCmd's. * AlterTableCmd's.
*
* ADD COLUMN cannot use the normal ALTER TABLE recursion mechanism, because we
* have to decide at runtime whether to recurse or not depending on whether we
* actually add a column or merely merge with an existing column. (We can't
* check this in a static pre-pass because it won't handle multiple inheritance
* situations correctly.)
*/ */
static void static void
ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing,
...@@ -4070,43 +4056,17 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing, ...@@ -4070,43 +4056,17 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, bool recursing,
(errcode(ERRCODE_WRONG_OBJECT_TYPE), (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot add column to typed table"))); errmsg("cannot add column to typed table")));
/*
* Recurse to add the column to child classes, if requested.
*
* We must recurse one level at a time, so that multiply-inheriting
* children are visited the right number of times and end up with the
* right attinhcount.
*/
if (recurse)
{
AlterTableCmd *childCmd = copyObject(cmd);
ColumnDef *colDefChild = (ColumnDef *) childCmd->def;
/* Child should see column as singly inherited */
colDefChild->inhcount = 1;
colDefChild->is_local = false;
ATOneLevelRecursion(wqueue, rel, childCmd, lockmode);
}
else
{
/*
* If we are told not to recurse, there had better not be any child
* tables; else the addition would put them out of step.
*/
if (find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("column must be added to child tables too")));
}
if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE) if (rel->rd_rel->relkind == RELKIND_COMPOSITE_TYPE)
ATTypedTableRecursion(wqueue, rel, cmd, lockmode); ATTypedTableRecursion(wqueue, rel, cmd, lockmode);
if (recurse)
cmd->subtype = AT_AddColumnRecurse;
} }
static void static void
ATExecAddColumn(AlteredTableInfo *tab, Relation rel, ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
ColumnDef *colDef, bool isOid, LOCKMODE lockmode) ColumnDef *colDef, bool isOid,
bool recurse, bool recursing, LOCKMODE lockmode)
{ {
Oid myrelid = RelationGetRelid(rel); Oid myrelid = RelationGetRelid(rel);
Relation pgclass, Relation pgclass,
...@@ -4121,12 +4081,20 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, ...@@ -4121,12 +4081,20 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
Oid collOid; Oid collOid;
Form_pg_type tform; Form_pg_type tform;
Expr *defval; Expr *defval;
List *children;
ListCell *child;
/* At top level, permission check was done in ATPrepCmd, else do it */
if (recursing)
ATSimplePermissions(rel, ATT_TABLE);
attrdesc = heap_open(AttributeRelationId, RowExclusiveLock); attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
/* /*
* Are we adding the column to a recursion child? If so, check whether to * Are we adding the column to a recursion child? If so, check whether to
* merge with an existing definition for the column. * merge with an existing definition for the column. If we do merge,
* we must not recurse. Children will already have the column, and
* recursing into them would mess up attinhcount.
*/ */
if (colDef->inhcount > 0) if (colDef->inhcount > 0)
{ {
...@@ -4389,6 +4357,50 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel, ...@@ -4389,6 +4357,50 @@ ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
* Add needed dependency entries for the new column. * Add needed dependency entries for the new column.
*/ */
add_column_datatype_dependency(myrelid, newattnum, attribute.atttypid, attribute.attcollation); add_column_datatype_dependency(myrelid, newattnum, attribute.atttypid, attribute.attcollation);
/*
* Propagate to children as appropriate. Unlike most other ALTER
* routines, we have to do this one level of recursion at a time; we can't
* use find_all_inheritors to do it in one pass.
*/
children = find_inheritance_children(RelationGetRelid(rel), lockmode);
/*
* If we are told not to recurse, there had better not be any child
* tables; else the addition would put them out of step.
*/
if (children && !recurse)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("column must be added to child tables too")));
/* Children should see column as singly inherited */
if (!recursing)
{
colDef = copyObject(colDef);
colDef->inhcount = 1;
colDef->is_local = false;
}
foreach(child, children)
{
Oid childrelid = lfirst_oid(child);
Relation childrel;
AlteredTableInfo *childtab;
/* find_inheritance_children already got lock */
childrel = heap_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE");
/* Find or create work queue entry for this table */
childtab = ATGetQueueEntry(wqueue, childrel);
/* Recurse to child */
ATExecAddColumn(wqueue, childtab, childrel,
colDef, isOid, recurse, true, lockmode);
heap_close(childrel, NoLock);
}
} }
/* /*
...@@ -4440,6 +4452,9 @@ ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd, LOC ...@@ -4440,6 +4452,9 @@ ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd, LOC
cmd->def = (Node *) cdef; cmd->def = (Node *) cdef;
} }
ATPrepAddColumn(wqueue, rel, recurse, false, cmd, lockmode); ATPrepAddColumn(wqueue, rel, recurse, false, cmd, lockmode);
if (recurse)
cmd->subtype = AT_AddOidsRecurse;
} }
/* /*
......
...@@ -1173,6 +1173,7 @@ typedef struct AlterTableStmt ...@@ -1173,6 +1173,7 @@ typedef struct AlterTableStmt
typedef enum AlterTableType typedef enum AlterTableType
{ {
AT_AddColumn, /* add column */ AT_AddColumn, /* add column */
AT_AddColumnRecurse, /* internal to commands/tablecmds.c */
AT_AddColumnToView, /* implicitly via CREATE OR REPLACE VIEW */ AT_AddColumnToView, /* implicitly via CREATE OR REPLACE VIEW */
AT_ColumnDefault, /* alter column default */ AT_ColumnDefault, /* alter column default */
AT_DropNotNull, /* alter column drop not null */ AT_DropNotNull, /* alter column drop not null */
...@@ -1198,6 +1199,7 @@ typedef enum AlterTableType ...@@ -1198,6 +1199,7 @@ typedef enum AlterTableType
AT_ClusterOn, /* CLUSTER ON */ AT_ClusterOn, /* CLUSTER ON */
AT_DropCluster, /* SET WITHOUT CLUSTER */ AT_DropCluster, /* SET WITHOUT CLUSTER */
AT_AddOids, /* SET WITH OIDS */ AT_AddOids, /* SET WITH OIDS */
AT_AddOidsRecurse, /* internal to commands/tablecmds.c */
AT_DropOids, /* SET WITHOUT OIDS */ AT_DropOids, /* SET WITHOUT OIDS */
AT_SetTableSpace, /* SET TABLESPACE */ AT_SetTableSpace, /* SET TABLESPACE */
AT_SetRelOptions, /* SET (...) -- AM specific parameters */ AT_SetRelOptions, /* SET (...) -- AM specific parameters */
......
...@@ -1198,6 +1198,23 @@ drop table p1, p2 cascade; ...@@ -1198,6 +1198,23 @@ drop table p1, p2 cascade;
NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table c1 DETAIL: drop cascades to table c1
drop cascades to table gc1 drop cascades to table gc1
-- test attinhcount tracking with merged columns
create table depth0();
create table depth1(c text) inherits (depth0);
create table depth2() inherits (depth1);
alter table depth0 add c text;
NOTICE: merging definition of column "c" for child "depth1"
select attrelid::regclass, attname, attinhcount, attislocal
from pg_attribute
where attnum > 0 and attrelid::regclass in ('depth0', 'depth1', 'depth2')
order by attrelid::regclass::text, attnum;
attrelid | attname | attinhcount | attislocal
----------+---------+-------------+------------
depth0 | c | 0 | t
depth1 | c | 1 | t
depth2 | c | 1 | f
(3 rows)
-- --
-- Test the ALTER TABLE SET WITH/WITHOUT OIDS command -- Test the ALTER TABLE SET WITH/WITHOUT OIDS command
-- --
......
...@@ -944,6 +944,18 @@ order by relname, attnum; ...@@ -944,6 +944,18 @@ order by relname, attnum;
drop table p1, p2 cascade; drop table p1, p2 cascade;
-- test attinhcount tracking with merged columns
create table depth0();
create table depth1(c text) inherits (depth0);
create table depth2() inherits (depth1);
alter table depth0 add c text;
select attrelid::regclass, attname, attinhcount, attislocal
from pg_attribute
where attnum > 0 and attrelid::regclass in ('depth0', 'depth1', 'depth2')
order by attrelid::regclass::text, attnum;
-- --
-- Test the ALTER TABLE SET WITH/WITHOUT OIDS command -- Test the ALTER TABLE SET WITH/WITHOUT OIDS command
-- --
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment