Commit 731d514a authored by Alvaro Herrera's avatar Alvaro Herrera

Fix ENABLE/DISABLE TRIGGER to handle recursion correctly

Using ATSimpleRecursion() in ATPrepCmd() to do so as bbb927b4 did is
not correct, because ATPrepCmd() can't distinguish between triggers that
may be cloned and those that may not, so would wrongly try to recurse
for the latter category of triggers.

So this commit restores the code in EnableDisableTrigger() that
86f57594 had added to do the recursion, which would do it only for
triggers that may be cloned, that is, row-level triggers.  This also
changes tablecmds.c such that ATExecCmd() is able to pass the value of
ONLY flag down to EnableDisableTrigger() using its new 'recurse'
parameter.

This also fixes what seems like an oversight of 86f57594 that the
recursion to partition triggers would only occur if EnableDisableTrigger()
had actually changed the trigger.  It is more apt to recurse to inspect
partition triggers even if the parent's trigger didn't need to be
changed: only then can we be certain that all descendants share the same
state afterwards.

Backpatch all the way back to 11, like bbb927b4.  Care is taken not
to break ABI compatibility (and that no catversion bump is needed.)
Co-authored-by: default avatarAmit Langote <amitlangote09@gmail.com>
Reviewed-by: default avatarDmitry Koval <d.koval@postgrespro.ru>
Discussion: https://postgr.es/m/CA+HiwqG-cZT3XzGAnEgZQLoQbyfJApVwOTQaCaas1mhpf+4V5A@mail.gmail.com
parent efba7a63
...@@ -553,7 +553,8 @@ static void ATExecSetRelOptions(Relation rel, List *defList, ...@@ -553,7 +553,8 @@ static void ATExecSetRelOptions(Relation rel, List *defList,
AlterTableType operation, AlterTableType operation,
LOCKMODE lockmode); LOCKMODE lockmode);
static void ATExecEnableDisableTrigger(Relation rel, const char *trigname, static void ATExecEnableDisableTrigger(Relation rel, const char *trigname,
char fires_when, bool skip_system, LOCKMODE lockmode); char fires_when, bool skip_system, bool recurse,
LOCKMODE lockmode);
static void ATExecEnableDisableRule(Relation rel, const char *rulename, static void ATExecEnableDisableRule(Relation rel, const char *rulename,
char fires_when, LOCKMODE lockmode); char fires_when, LOCKMODE lockmode);
static void ATPrepAddInherit(Relation child_rel); static void ATPrepAddInherit(Relation child_rel);
...@@ -4021,9 +4022,7 @@ AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode) ...@@ -4021,9 +4022,7 @@ AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode)
* be done in this phase. Generally, this phase acquires table locks, * be done in this phase. Generally, this phase acquires table locks,
* checks permissions and relkind, and recurses to find child tables. * checks permissions and relkind, and recurses to find child tables.
* *
* ATRewriteCatalogs performs phase 2 for each affected table. (Note that * ATRewriteCatalogs performs phase 2 for each affected table.
* phases 2 and 3 normally do no explicit recursion, since phase 1 already
* did it --- although some subcommands have to recurse in phase 2 instead.)
* Certain subcommands need to be performed before others to avoid * Certain subcommands need to be performed before others to avoid
* unnecessary conflicts; for example, DROP COLUMN should come before * unnecessary conflicts; for example, DROP COLUMN should come before
* ADD COLUMN. Therefore phase 1 divides the subcommands into multiple * ADD COLUMN. Therefore phase 1 divides the subcommands into multiple
...@@ -4031,6 +4030,12 @@ AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode) ...@@ -4031,6 +4030,12 @@ AlterTableLookupRelation(AlterTableStmt *stmt, LOCKMODE lockmode)
* *
* ATRewriteTables performs phase 3 for those tables that need it. * ATRewriteTables performs phase 3 for those tables that need it.
* *
* For most subcommand types, phases 2 and 3 do no explicit recursion,
* since phase 1 already does it. However, for certain subcommand types
* it is only possible to determine how to recurse at phase 2 time; for
* those cases, phase 1 sets the cmd->recurse flag (or, in some older coding,
* changes the command subtype of a "Recurse" variant XXX to be cleaned up.)
*
* Thanks to the magic of MVCC, an error anywhere along the way rolls back * Thanks to the magic of MVCC, an error anywhere along the way rolls back
* the whole operation; we don't have to do anything special to clean up. * the whole operation; we don't have to do anything special to clean up.
* *
...@@ -4450,10 +4455,10 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -4450,10 +4455,10 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
errhint("Use ALTER TABLE ... DETACH PARTITION ... FINALIZE to complete the pending detach operation.")); errhint("Use ALTER TABLE ... DETACH PARTITION ... FINALIZE to complete the pending detach operation."));
/* /*
* Copy the original subcommand for each table. This avoids conflicts * Copy the original subcommand for each table, so we can scribble on it.
* when different child tables need to make different parse * This avoids conflicts when different child tables need to make
* transformations (for example, the same column may have different column * different parse transformations (for example, the same column may have
* numbers in different children). * different column numbers in different children).
*/ */
cmd = copyObject(cmd); cmd = copyObject(cmd);
...@@ -4722,8 +4727,9 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -4722,8 +4727,9 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_DisableTrigAll: case AT_DisableTrigAll:
case AT_DisableTrigUser: case AT_DisableTrigUser:
ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE); ATSimplePermissions(rel, ATT_TABLE | ATT_FOREIGN_TABLE);
if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) /* Set up recursion for phase 2; no other prep needed */
ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); if (recurse)
cmd->recurse = true;
pass = AT_PASS_MISC; pass = AT_PASS_MISC;
break; break;
case AT_EnableRule: /* ENABLE/DISABLE RULE variants */ case AT_EnableRule: /* ENABLE/DISABLE RULE variants */
...@@ -5061,35 +5067,51 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, ...@@ -5061,35 +5067,51 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab,
break; break;
case AT_EnableTrig: /* ENABLE TRIGGER name */ case AT_EnableTrig: /* ENABLE TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_FIRES_ON_ORIGIN, false, lockmode); TRIGGER_FIRES_ON_ORIGIN, false,
cmd->recurse,
lockmode);
break; break;
case AT_EnableAlwaysTrig: /* ENABLE ALWAYS TRIGGER name */ case AT_EnableAlwaysTrig: /* ENABLE ALWAYS TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_FIRES_ALWAYS, false, lockmode); TRIGGER_FIRES_ALWAYS, false,
cmd->recurse,
lockmode);
break; break;
case AT_EnableReplicaTrig: /* ENABLE REPLICA TRIGGER name */ case AT_EnableReplicaTrig: /* ENABLE REPLICA TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_FIRES_ON_REPLICA, false, lockmode); TRIGGER_FIRES_ON_REPLICA, false,
cmd->recurse,
lockmode);
break; break;
case AT_DisableTrig: /* DISABLE TRIGGER name */ case AT_DisableTrig: /* DISABLE TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_DISABLED, false, lockmode); TRIGGER_DISABLED, false,
cmd->recurse,
lockmode);
break; break;
case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */ case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */
ATExecEnableDisableTrigger(rel, NULL, ATExecEnableDisableTrigger(rel, NULL,
TRIGGER_FIRES_ON_ORIGIN, false, lockmode); TRIGGER_FIRES_ON_ORIGIN, false,
cmd->recurse,
lockmode);
break; break;
case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */ case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
ATExecEnableDisableTrigger(rel, NULL, ATExecEnableDisableTrigger(rel, NULL,
TRIGGER_DISABLED, false, lockmode); TRIGGER_DISABLED, false,
cmd->recurse,
lockmode);
break; break;
case AT_EnableTrigUser: /* ENABLE TRIGGER USER */ case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
ATExecEnableDisableTrigger(rel, NULL, ATExecEnableDisableTrigger(rel, NULL,
TRIGGER_FIRES_ON_ORIGIN, true, lockmode); TRIGGER_FIRES_ON_ORIGIN, true,
cmd->recurse,
lockmode);
break; break;
case AT_DisableTrigUser: /* DISABLE TRIGGER USER */ case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
ATExecEnableDisableTrigger(rel, NULL, ATExecEnableDisableTrigger(rel, NULL,
TRIGGER_DISABLED, true, lockmode); TRIGGER_DISABLED, true,
cmd->recurse,
lockmode);
break; break;
case AT_EnableRule: /* ENABLE RULE name */ case AT_EnableRule: /* ENABLE RULE name */
...@@ -14178,9 +14200,11 @@ index_copy_data(Relation rel, RelFileNode newrnode) ...@@ -14178,9 +14200,11 @@ index_copy_data(Relation rel, RelFileNode newrnode)
*/ */
static void static void
ATExecEnableDisableTrigger(Relation rel, const char *trigname, ATExecEnableDisableTrigger(Relation rel, const char *trigname,
char fires_when, bool skip_system, LOCKMODE lockmode) char fires_when, bool skip_system, bool recurse,
LOCKMODE lockmode)
{ {
EnableDisableTrigger(rel, trigname, fires_when, skip_system, lockmode); EnableDisableTriggerNew(rel, trigname, fires_when, skip_system, recurse,
lockmode);
} }
/* /*
......
...@@ -1543,14 +1543,16 @@ renametrig(RenameStmt *stmt) ...@@ -1543,14 +1543,16 @@ renametrig(RenameStmt *stmt)
* enablement/disablement, this also defines when the trigger * enablement/disablement, this also defines when the trigger
* should be fired in session replication roles. * should be fired in session replication roles.
* skip_system: if true, skip "system" triggers (constraint triggers) * skip_system: if true, skip "system" triggers (constraint triggers)
* recurse: if true, recurse to partitions
* *
* Caller should have checked permissions for the table; here we also * Caller should have checked permissions for the table; here we also
* enforce that superuser privilege is required to alter the state of * enforce that superuser privilege is required to alter the state of
* system triggers * system triggers
*/ */
void void
EnableDisableTrigger(Relation rel, const char *tgname, EnableDisableTriggerNew(Relation rel, const char *tgname,
char fires_when, bool skip_system, LOCKMODE lockmode) char fires_when, bool skip_system, bool recurse,
LOCKMODE lockmode)
{ {
Relation tgrel; Relation tgrel;
int nkeys; int nkeys;
...@@ -1616,6 +1618,34 @@ EnableDisableTrigger(Relation rel, const char *tgname, ...@@ -1616,6 +1618,34 @@ EnableDisableTrigger(Relation rel, const char *tgname,
changed = true; changed = true;
} }
/*
* When altering FOR EACH ROW triggers on a partitioned table, do the
* same on the partitions as well, unless ONLY is specified.
*
* Note that we recurse even if we didn't change the trigger above,
* because the partitions' copy of the trigger may have a different
* value of tgenabled than the parent's trigger and thus might need to
* be changed.
*/
if (recurse &&
rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
(TRIGGER_FOR_ROW(oldtrig->tgtype)))
{
PartitionDesc partdesc = RelationGetPartitionDesc(rel, true);
int i;
for (i = 0; i < partdesc->nparts; i++)
{
Relation part;
part = relation_open(partdesc->oids[i], lockmode);
EnableDisableTriggerNew(part, NameStr(oldtrig->tgname),
fires_when, skip_system, recurse,
lockmode);
table_close(part, NoLock); /* keep lock till commit */
}
}
InvokeObjectPostAlterHook(TriggerRelationId, InvokeObjectPostAlterHook(TriggerRelationId,
oldtrig->oid, 0); oldtrig->oid, 0);
} }
...@@ -1639,6 +1669,19 @@ EnableDisableTrigger(Relation rel, const char *tgname, ...@@ -1639,6 +1669,19 @@ EnableDisableTrigger(Relation rel, const char *tgname,
CacheInvalidateRelcache(rel); CacheInvalidateRelcache(rel);
} }
/*
* ABI-compatible wrapper for the above. To keep as close possible to the old
* behavior, this never recurses. Do not call this function in new code.
*/
void
EnableDisableTrigger(Relation rel, const char *tgname,
char fires_when, bool skip_system,
LOCKMODE lockmode)
{
EnableDisableTriggerNew(rel, tgname, fires_when, skip_system,
true, lockmode);
}
/* /*
* Build trigger data to attach to the given relcache entry. * Build trigger data to attach to the given relcache entry.
......
...@@ -3344,6 +3344,7 @@ _copyAlterTableCmd(const AlterTableCmd *from) ...@@ -3344,6 +3344,7 @@ _copyAlterTableCmd(const AlterTableCmd *from)
COPY_NODE_FIELD(def); COPY_NODE_FIELD(def);
COPY_SCALAR_FIELD(behavior); COPY_SCALAR_FIELD(behavior);
COPY_SCALAR_FIELD(missing_ok); COPY_SCALAR_FIELD(missing_ok);
COPY_SCALAR_FIELD(recurse);
return newnode; return newnode;
} }
......
...@@ -1136,6 +1136,7 @@ _equalAlterTableCmd(const AlterTableCmd *a, const AlterTableCmd *b) ...@@ -1136,6 +1136,7 @@ _equalAlterTableCmd(const AlterTableCmd *a, const AlterTableCmd *b)
COMPARE_NODE_FIELD(def); COMPARE_NODE_FIELD(def);
COMPARE_SCALAR_FIELD(behavior); COMPARE_SCALAR_FIELD(behavior);
COMPARE_SCALAR_FIELD(missing_ok); COMPARE_SCALAR_FIELD(missing_ok);
COMPARE_SCALAR_FIELD(recurse);
return true; return true;
} }
......
...@@ -165,6 +165,9 @@ extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok); ...@@ -165,6 +165,9 @@ extern Oid get_trigger_oid(Oid relid, const char *name, bool missing_ok);
extern ObjectAddress renametrig(RenameStmt *stmt); extern ObjectAddress renametrig(RenameStmt *stmt);
extern void EnableDisableTriggerNew(Relation rel, const char *tgname,
char fires_when, bool skip_system, bool recurse,
LOCKMODE lockmode);
extern void EnableDisableTrigger(Relation rel, const char *tgname, extern void EnableDisableTrigger(Relation rel, const char *tgname,
char fires_when, bool skip_system, LOCKMODE lockmode); char fires_when, bool skip_system, LOCKMODE lockmode);
......
...@@ -1956,6 +1956,7 @@ typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */ ...@@ -1956,6 +1956,7 @@ typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */
* constraint, or parent table */ * constraint, or parent table */
DropBehavior behavior; /* RESTRICT or CASCADE for DROP cases */ DropBehavior behavior; /* RESTRICT or CASCADE for DROP cases */
bool missing_ok; /* skip error if missing? */ bool missing_ok; /* skip error if missing? */
bool recurse; /* exec-time recursion */
} AlterTableCmd; } AlterTableCmd;
......
...@@ -2655,24 +2655,42 @@ create table parent (a int) partition by list (a); ...@@ -2655,24 +2655,42 @@ create table parent (a int) partition by list (a);
create table child1 partition of parent for values in (1); create table child1 partition of parent for values in (1);
create trigger tg after insert on parent create trigger tg after insert on parent
for each row execute procedure trig_nothing(); for each row execute procedure trig_nothing();
create trigger tg_stmt after insert on parent
for statement execute procedure trig_nothing();
select tgrelid::regclass, tgname, tgenabled from pg_trigger select tgrelid::regclass, tgname, tgenabled from pg_trigger
where tgrelid in ('parent'::regclass, 'child1'::regclass) where tgrelid in ('parent'::regclass, 'child1'::regclass)
order by tgrelid::regclass::text; order by tgrelid::regclass::text;
tgrelid | tgname | tgenabled tgrelid | tgname | tgenabled
---------+--------+----------- ---------+---------+-----------
child1 | tg | O child1 | tg | O
parent | tg | O parent | tg | O
(2 rows) parent | tg_stmt | O
(3 rows)
alter table only parent enable always trigger tg; alter table only parent enable always trigger tg; -- no recursion because ONLY
alter table parent enable always trigger tg_stmt; -- no recursion because statement trigger
select tgrelid::regclass, tgname, tgenabled from pg_trigger select tgrelid::regclass, tgname, tgenabled from pg_trigger
where tgrelid in ('parent'::regclass, 'child1'::regclass) where tgrelid in ('parent'::regclass, 'child1'::regclass)
order by tgrelid::regclass::text; order by tgrelid::regclass::text;
tgrelid | tgname | tgenabled tgrelid | tgname | tgenabled
---------+--------+----------- ---------+---------+-----------
child1 | tg | O child1 | tg | O
parent | tg | A parent | tg | A
(2 rows) parent | tg_stmt | A
(3 rows)
-- The following is a no-op for the parent trigger but not so
-- for the child trigger, so recursion should be applied.
alter table parent enable always trigger tg;
select tgrelid::regclass, tgname, tgenabled from pg_trigger
where tgrelid in ('parent'::regclass, 'child1'::regclass)
order by tgrelid::regclass::text;
tgrelid | tgname | tgenabled
---------+---------+-----------
child1 | tg | A
parent | tg | A
parent | tg_stmt | A
(3 rows)
drop table parent, child1; drop table parent, child1;
-- Verify that firing state propagates correctly on creation, too -- Verify that firing state propagates correctly on creation, too
......
...@@ -1832,10 +1832,19 @@ create table parent (a int) partition by list (a); ...@@ -1832,10 +1832,19 @@ create table parent (a int) partition by list (a);
create table child1 partition of parent for values in (1); create table child1 partition of parent for values in (1);
create trigger tg after insert on parent create trigger tg after insert on parent
for each row execute procedure trig_nothing(); for each row execute procedure trig_nothing();
create trigger tg_stmt after insert on parent
for statement execute procedure trig_nothing();
select tgrelid::regclass, tgname, tgenabled from pg_trigger select tgrelid::regclass, tgname, tgenabled from pg_trigger
where tgrelid in ('parent'::regclass, 'child1'::regclass) where tgrelid in ('parent'::regclass, 'child1'::regclass)
order by tgrelid::regclass::text; order by tgrelid::regclass::text;
alter table only parent enable always trigger tg; alter table only parent enable always trigger tg; -- no recursion because ONLY
alter table parent enable always trigger tg_stmt; -- no recursion because statement trigger
select tgrelid::regclass, tgname, tgenabled from pg_trigger
where tgrelid in ('parent'::regclass, 'child1'::regclass)
order by tgrelid::regclass::text;
-- The following is a no-op for the parent trigger but not so
-- for the child trigger, so recursion should be applied.
alter table parent enable always trigger tg;
select tgrelid::regclass, tgname, tgenabled from pg_trigger select tgrelid::regclass, tgname, tgenabled from pg_trigger
where tgrelid in ('parent'::regclass, 'child1'::regclass) where tgrelid in ('parent'::regclass, 'child1'::regclass)
order by tgrelid::regclass::text; order by tgrelid::regclass::text;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment