Commit 2dbbda02 authored by Simon Riggs's avatar Simon Riggs

Reduce lock levels of CREATE TRIGGER and some ALTER TABLE, CREATE RULE actions.

Avoid hard-coding lockmode used for many altering DDL commands, allowing easier
future changes of lock levels. Implementation of initial analysis on DDL
sub-commands, so that many lock levels are now at ShareUpdateExclusiveLock or
ShareRowExclusiveLock, allowing certain DDL not to block reads/writes.
First of number of planned changes in this area; additional docs required
when full project complete.
parent 133924e1
<!-- $PostgreSQL: pgsql/doc/src/sgml/mvcc.sgml,v 2.75 2010/05/03 15:35:30 alvherre Exp $ --> <!-- $PostgreSQL: pgsql/doc/src/sgml/mvcc.sgml,v 2.76 2010/07/28 05:22:24 sriggs Exp $ -->
<chapter id="mvcc"> <chapter id="mvcc">
<title>Concurrency Control</title> <title>Concurrency Control</title>
...@@ -532,7 +532,7 @@ SELECT SUM(value) FROM mytab WHERE class = 2; ...@@ -532,7 +532,7 @@ SELECT SUM(value) FROM mytab WHERE class = 2;
most <productname>PostgreSQL</productname> commands automatically most <productname>PostgreSQL</productname> commands automatically
acquire locks of appropriate modes to ensure that referenced acquire locks of appropriate modes to ensure that referenced
tables are not dropped or modified in incompatible ways while the tables are not dropped or modified in incompatible ways while the
command executes. (For example, <command>ALTER TABLE</> cannot safely be command executes. (For example, <command>TRUNCATE</> cannot safely be
executed concurrently with other operations on the same table, so it executed concurrently with other operations on the same table, so it
obtains an exclusive lock on the table to enforce that.) obtains an exclusive lock on the table to enforce that.)
</para> </para>
...@@ -695,8 +695,9 @@ SELECT SUM(value) FROM mytab WHERE class = 2; ...@@ -695,8 +695,9 @@ SELECT SUM(value) FROM mytab WHERE class = 2;
</para> </para>
<para> <para>
This lock mode is not automatically acquired by any Acquired by <command>CREATE TRIGGER</command>,
<productname>PostgreSQL</productname> command. <command>CREATE RULE</command> (except for <literal>ON SELECT</>
rules) and in some cases <command>ALTER TABLE</command>.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -742,11 +743,12 @@ SELECT SUM(value) FROM mytab WHERE class = 2; ...@@ -742,11 +743,12 @@ SELECT SUM(value) FROM mytab WHERE class = 2;
</para> </para>
<para> <para>
Acquired by the <command>ALTER TABLE</command>, <command>DROP Acquired by the <command>DROP TABLE</command>,
TABLE</command>, <command>TRUNCATE</command>, <command>REINDEX</command>, <command>TRUNCATE</command>, <command>REINDEX</command>,
<command>CLUSTER</command>, and <command>VACUUM FULL</command> <command>CLUSTER</command>, and <command>VACUUM FULL</command>
commands. This is also the default lock mode for <command>LOCK commands, as well as most variants of <command>ALTER TABLE</>.
TABLE</command> statements that do not specify a mode explicitly. This is also the default lock mode for <command>LOCK TABLE</command>
statements that do not specify a mode explicitly.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.43 2010/07/06 19:18:56 momjian Exp $ * $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.44 2010/07/28 05:22:24 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -1346,7 +1346,7 @@ shdepReassignOwned(List *roleids, Oid newrole) ...@@ -1346,7 +1346,7 @@ shdepReassignOwned(List *roleids, Oid newrole)
* owned sequences, etc when we happen to visit them * owned sequences, etc when we happen to visit them
* before their parent table. * before their parent table.
*/ */
ATExecChangeOwner(sdepForm->objid, newrole, true); ATExecChangeOwner(sdepForm->objid, newrole, true, AccessExclusiveLock);
break; break;
case ProcedureRelationId: case ProcedureRelationId:
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/alter.c,v 1.36 2010/06/13 17:43:12 rhaas Exp $ * $PostgreSQL: pgsql/src/backend/commands/alter.c,v 1.37 2010/07/28 05:22:24 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -190,7 +190,7 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt) ...@@ -190,7 +190,7 @@ ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt)
case OBJECT_VIEW: case OBJECT_VIEW:
CheckRelationOwnership(stmt->relation, true); CheckRelationOwnership(stmt->relation, true);
AlterTableNamespace(stmt->relation, stmt->newschema, AlterTableNamespace(stmt->relation, stmt->newschema,
stmt->objectType); stmt->objectType, AccessExclusiveLock);
break; break;
case OBJECT_TYPE: case OBJECT_TYPE:
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.334 2010/07/25 23:21:21 rhaas Exp $ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.335 2010/07/28 05:22:24 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -64,6 +64,7 @@ ...@@ -64,6 +64,7 @@
#include "rewrite/rewriteHandler.h" #include "rewrite/rewriteHandler.h"
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "storage/lock.h"
#include "storage/smgr.h" #include "storage/smgr.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
...@@ -237,7 +238,7 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel, ...@@ -237,7 +238,7 @@ static void AlterIndexNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid); Oid oldNspOid, Oid newNspOid);
static void AlterSeqNamespaces(Relation classRel, Relation rel, static void AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid, Oid oldNspOid, Oid newNspOid,
const char *newNspName); const char *newNspName, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList, static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids); int16 *attnums, Oid *atttypids);
static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid, static int transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
...@@ -253,84 +254,84 @@ static void validateForeignKeyConstraint(Constraint *fkconstraint, ...@@ -253,84 +254,84 @@ static void validateForeignKeyConstraint(Constraint *fkconstraint,
Oid pkindOid, Oid constraintOid); Oid pkindOid, Oid constraintOid);
static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
Oid constraintOid, Oid indexOid); Oid constraintOid, Oid indexOid);
static void ATController(Relation rel, List *cmds, bool recurse); static void ATController(Relation rel, List *cmds, bool recurse, LOCKMODE lockmode);
static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
bool recurse, bool recursing); bool recurse, bool recursing, LOCKMODE lockmode);
static void ATRewriteCatalogs(List **wqueue); static void ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode);
static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
AlterTableCmd *cmd); AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATRewriteTables(List **wqueue); static void ATRewriteTables(List **wqueue, LOCKMODE lockmode);
static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap); static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode);
static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel); static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
static void ATSimplePermissions(Relation rel, bool allowView); static void ATSimplePermissions(Relation rel, bool allowView);
static void ATSimplePermissionsRelationOrIndex(Relation rel); static void ATSimplePermissionsRelationOrIndex(Relation rel);
static void ATSimpleRecursion(List **wqueue, Relation rel, static void ATSimpleRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd, bool recurse); AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode);
static void ATOneLevelRecursion(List **wqueue, Relation rel, static void ATOneLevelRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd); AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
AlterTableCmd *cmd); AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel, static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
ColumnDef *colDef, bool isOid); ColumnDef *colDef, bool isOid, LOCKMODE lockmode);
static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid);
static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse, static void ATPrepAddOids(List **wqueue, Relation rel, bool recurse,
AlterTableCmd *cmd); AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecDropNotNull(Relation rel, const char *colName); static void ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode);
static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, static void ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
const char *colName); const char *colName, LOCKMODE lockmode);
static void ATExecColumnDefault(Relation rel, const char *colName, static void ATExecColumnDefault(Relation rel, const char *colName,
Node *newDefault); Node *newDefault, LOCKMODE lockmode);
static void ATPrepSetStatistics(Relation rel, const char *colName, static void ATPrepSetStatistics(Relation rel, const char *colName,
Node *newValue); Node *newValue, LOCKMODE lockmode);
static void ATExecSetStatistics(Relation rel, const char *colName, static void ATExecSetStatistics(Relation rel, const char *colName,
Node *newValue); Node *newValue, LOCKMODE lockmode);
static void ATExecSetOptions(Relation rel, const char *colName, static void ATExecSetOptions(Relation rel, const char *colName,
Node *options, bool isReset); Node *options, bool isReset, LOCKMODE lockmode);
static void ATExecSetStorage(Relation rel, const char *colName, static void ATExecSetStorage(Relation rel, const char *colName,
Node *newValue); Node *newValue, LOCKMODE lockmode);
static void ATPrepDropColumn(Relation rel, bool recurse, AlterTableCmd *cmd); static void ATPrepDropColumn(Relation rel, bool recurse, AlterTableCmd *cmd);
static void ATExecDropColumn(List **wqueue, Relation rel, const char *colName, static void ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
DropBehavior behavior, DropBehavior behavior,
bool recurse, bool recursing, bool recurse, bool recursing,
bool missing_ok); bool missing_ok, LOCKMODE lockmode);
static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel, static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, bool is_rebuild); IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode);
static void ATExecAddConstraint(List **wqueue, static void ATExecAddConstraint(List **wqueue,
AlteredTableInfo *tab, Relation rel, AlteredTableInfo *tab, Relation rel,
Constraint *newConstraint, bool recurse); Constraint *newConstraint, bool recurse, LOCKMODE lockmode);
static void ATAddCheckConstraint(List **wqueue, static void ATAddCheckConstraint(List **wqueue,
AlteredTableInfo *tab, Relation rel, AlteredTableInfo *tab, Relation rel,
Constraint *constr, Constraint *constr,
bool recurse, bool recursing); bool recurse, bool recursing, LOCKMODE lockmode);
static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
Constraint *fkconstraint); Constraint *fkconstraint, LOCKMODE lockmode);
static void ATExecDropConstraint(Relation rel, const char *constrName, static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior, DropBehavior behavior,
bool recurse, bool recursing, bool recurse, bool recursing,
bool missing_ok); bool missing_ok, LOCKMODE lockmode);
static void ATPrepAlterColumnType(List **wqueue, static void ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel, AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing, bool recurse, bool recursing,
AlterTableCmd *cmd); AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
const char *colName, TypeName *typeName); const char *colName, TypeName *typeName, LOCKMODE lockmode);
static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab); static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode);
static void ATPostAlterTypeParse(char *cmd, List **wqueue); static void ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode);
static void change_owner_recurse_to_sequences(Oid relationOid, static void change_owner_recurse_to_sequences(Oid relationOid,
Oid newOwnerId); Oid newOwnerId, LOCKMODE lockmode);
static void ATExecClusterOn(Relation rel, const char *indexName); static void ATExecClusterOn(Relation rel, const char *indexName, LOCKMODE lockmode);
static void ATExecDropCluster(Relation rel); static void ATExecDropCluster(Relation rel, LOCKMODE lockmode);
static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
char *tablespacename); char *tablespacename, LOCKMODE lockmode);
static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace); static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode);
static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset); static void ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode);
static void ATExecEnableDisableTrigger(Relation rel, char *trigname, static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
char fires_when, bool skip_system); char fires_when, bool skip_system, LOCKMODE lockmode);
static void ATExecEnableDisableRule(Relation rel, char *rulename, static void ATExecEnableDisableRule(Relation rel, char *rulename,
char fires_when); char fires_when, LOCKMODE lockmode);
static void ATPrepAddInherit(Relation child_rel); static void ATPrepAddInherit(Relation child_rel);
static void ATExecAddInherit(Relation child_rel, RangeVar *parent); static void ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode);
static void ATExecDropInherit(Relation rel, RangeVar *parent); static void ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode);
static void copy_relation_data(SMgrRelation rel, SMgrRelation dst, static void copy_relation_data(SMgrRelation rel, SMgrRelation dst,
ForkNumber forkNum, bool istemp); ForkNumber forkNum, bool istemp);
static const char *storage_name(char c); static const char *storage_name(char c);
...@@ -2249,8 +2250,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId) ...@@ -2249,8 +2250,8 @@ RenameRelationInternal(Oid myrelid, const char *newrelname, Oid namespaceId)
* Disallow ALTER TABLE (and similar commands) when the current backend has * Disallow ALTER TABLE (and similar commands) when the current backend has
* any open reference to the target table besides the one just acquired by * any open reference to the target table besides the one just acquired by
* the calling command; this implies there's an open cursor or active plan. * the calling command; this implies there's an open cursor or active plan.
* We need this check because our AccessExclusiveLock doesn't protect us * We need this check because our lock doesn't protect us against stomping
* against stomping on our own foot, only other people's feet! * on our own foot, only other people's feet!
* *
* For ALTER TABLE, the only case known to cause serious trouble is ALTER * For ALTER TABLE, the only case known to cause serious trouble is ALTER
* COLUMN TYPE, and some changes are obviously pretty benign, so this could * COLUMN TYPE, and some changes are obviously pretty benign, so this could
...@@ -2327,11 +2328,27 @@ CheckTableNotInUse(Relation rel, const char *stmt) ...@@ -2327,11 +2328,27 @@ CheckTableNotInUse(Relation rel, const char *stmt)
* *
* Thanks to the magic of MVCC, an error anywhere along the way rolls back * Thanks to the magic of MVCC, an error anywhere along the way rolls back
* the whole operation; we don't have to do anything special to clean up. * the whole operation; we don't have to do anything special to clean up.
*
* We lock the table as the first action, with an appropriate lock level
* for the subcommands requested. Any subcommand that needs to rewrite
* tuples in the table forces the whole command to be executed with
* AccessExclusiveLock. If all subcommands do not require rewrite table
* then we may be able to use lower lock levels. We pass the lock level down
* so that we can apply it recursively to inherited tables. Note that the
* lock level we want as we recurse may well be higher than required for
* that specific subcommand. So we pass down the overall lock requirement,
* rather than reassess it at lower levels.
*/ */
void void
AlterTable(AlterTableStmt *stmt) AlterTable(AlterTableStmt *stmt)
{ {
Relation rel = relation_openrv(stmt->relation, AccessExclusiveLock); Relation rel;
LOCKMODE lockmode = AlterTableGetLockLevel(stmt->cmds);
/*
* Acquire same level of lock as already acquired during parsing.
*/
rel = relation_openrv(stmt->relation, lockmode);
CheckTableNotInUse(rel, "ALTER TABLE"); CheckTableNotInUse(rel, "ALTER TABLE");
...@@ -2374,7 +2391,8 @@ AlterTable(AlterTableStmt *stmt) ...@@ -2374,7 +2391,8 @@ AlterTable(AlterTableStmt *stmt)
elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind); elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
} }
ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt)); ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt),
lockmode);
} }
/* /*
...@@ -2391,13 +2409,169 @@ AlterTable(AlterTableStmt *stmt) ...@@ -2391,13 +2409,169 @@ AlterTable(AlterTableStmt *stmt)
void void
AlterTableInternal(Oid relid, List *cmds, bool recurse) AlterTableInternal(Oid relid, List *cmds, bool recurse)
{ {
Relation rel = relation_open(relid, AccessExclusiveLock); Relation rel;
LOCKMODE lockmode = AlterTableGetLockLevel(cmds);
ATController(rel, cmds, recurse); rel = relation_open(relid, lockmode);
ATController(rel, cmds, recurse, lockmode);
}
/*
* AlterTableGetLockLevel
*
* Sets the overall lock level required for the supplied list of subcommands.
* Policy for doing this set according to needs of AlterTable(), see
* comments there for overall explanation.
*
* Function is called before and after parsing, so it must give same
* answer each time it is called. Some subcommands are transformed
* into other subcommand types, so the transform must never be made to a
* lower lock level than previously assigned. All transforms are noted below.
*
* Since this is called before we lock the table we cannot use table metadata
* to influence the type of lock we acquire.
*
* There should be no lockmodes hardcoded into the subcommand functions. All
* lockmode decisions for ALTER TABLE are made here only. The one exception is
* ALTER TABLE RENAME which is treated as a different statement type T_RenameStmt
* and does not travel through this section of code and cannot be combined with
* any of the subcommands given here.
*/
LOCKMODE
AlterTableGetLockLevel(List *cmds)
{
ListCell *lcmd;
LOCKMODE lockmode = ShareUpdateExclusiveLock;
foreach(lcmd, cmds)
{
AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
LOCKMODE cmd_lockmode = AccessExclusiveLock; /* default for compiler */
switch (cmd->subtype)
{
/*
* Need AccessExclusiveLock for these subcommands because they
* affect or potentially affect both read and write operations.
*
* New subcommand types should be added here by default.
*/
case AT_AddColumn: /* may rewrite heap, in some cases and visible to SELECT */
case AT_DropColumn: /* change visible to SELECT */
case AT_AddColumnToView: /* CREATE VIEW */
case AT_AlterColumnType: /* must rewrite heap */
case AT_DropConstraint: /* as DROP INDEX */
case AT_AddOids: /* must rewrite heap */
case AT_DropOids: /* calls AT_DropColumn */
case AT_EnableAlwaysRule: /* may change SELECT rules */
case AT_EnableReplicaRule: /* may change SELECT rules */
case AT_EnableRule: /* may change SELECT rules */
case AT_DisableRule: /* may change SELECT rules */
case AT_ChangeOwner: /* change visible to SELECT */
case AT_SetTableSpace: /* must rewrite heap */
case AT_DropNotNull: /* may change some SQL plans */
case AT_SetNotNull:
cmd_lockmode = AccessExclusiveLock;
break;
/*
* These subcommands affect write operations only.
*/
case AT_ColumnDefault:
case AT_ProcessedConstraint: /* becomes AT_AddConstraint */
case AT_AddConstraintRecurse: /* becomes AT_AddConstraint */
case AT_EnableTrig:
case AT_EnableAlwaysTrig:
case AT_EnableReplicaTrig:
case AT_EnableTrigAll:
case AT_EnableTrigUser:
case AT_DisableTrig:
case AT_DisableTrigAll:
case AT_DisableTrigUser:
case AT_AddIndex: /* from ADD CONSTRAINT */
cmd_lockmode = ShareRowExclusiveLock;
break;
case AT_AddConstraint:
if (IsA(cmd->def, Constraint))
{
Constraint *con = (Constraint *) cmd->def;
switch (con->contype)
{
case CONSTR_EXCLUSION:
case CONSTR_PRIMARY:
case CONSTR_UNIQUE:
/*
* Cases essentially the same as CREATE INDEX. We
* could reduce the lock strength to ShareLock if we
* can work out how to allow concurrent catalog updates.
*/
cmd_lockmode = ShareRowExclusiveLock;
break;
case CONSTR_FOREIGN:
/*
* We add triggers to both tables when we add a
* Foreign Key, so the lock level must be at least
* as strong as CREATE TRIGGER.
*/
cmd_lockmode = ShareRowExclusiveLock;
break;
default:
cmd_lockmode = ShareRowExclusiveLock;
}
}
break;
/*
* These subcommands affect inheritance behaviour. Queries started before us
* will continue to see the old inheritance behaviour, while queries started
* after we commit will see new behaviour. No need to prevent reads or writes
* to the subtable while we hook it up though. In both cases the parent table
* is locked with AccessShareLock.
*/
case AT_AddInherit:
case AT_DropInherit:
cmd_lockmode = ShareUpdateExclusiveLock;
break;
/*
* These subcommands affect general strategies for performance and maintenance,
* though don't change the semantic results from normal data reads and writes.
* Delaying an ALTER TABLE behind currently active writes only delays the point
* where the new strategy begins to take effect, so there is no benefit in waiting.
* In thise case the minimum restriction applies: we don't currently allow
* concurrent catalog updates.
*/
case AT_SetStatistics:
case AT_ClusterOn:
case AT_DropCluster:
case AT_SetRelOptions:
case AT_ResetRelOptions:
case AT_SetStorage:
cmd_lockmode = ShareUpdateExclusiveLock;
break;
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
(int) cmd->subtype);
break;
}
/*
* Take the greatest lockmode from any subcommand
*/
if (cmd_lockmode > lockmode)
lockmode = cmd_lockmode;
}
return lockmode;
} }
static void static void
ATController(Relation rel, List *cmds, bool recurse) ATController(Relation rel, List *cmds, bool recurse, LOCKMODE lockmode)
{ {
List *wqueue = NIL; List *wqueue = NIL;
ListCell *lcmd; ListCell *lcmd;
...@@ -2407,17 +2581,17 @@ ATController(Relation rel, List *cmds, bool recurse) ...@@ -2407,17 +2581,17 @@ ATController(Relation rel, List *cmds, bool recurse)
{ {
AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd); AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
ATPrepCmd(&wqueue, rel, cmd, recurse, false); ATPrepCmd(&wqueue, rel, cmd, recurse, false, lockmode);
} }
/* Close the relation, but keep lock until commit */ /* Close the relation, but keep lock until commit */
relation_close(rel, NoLock); relation_close(rel, NoLock);
/* Phase 2: update system catalogs */ /* Phase 2: update system catalogs */
ATRewriteCatalogs(&wqueue); ATRewriteCatalogs(&wqueue, lockmode);
/* Phase 3: scan/rewrite tables as needed */ /* Phase 3: scan/rewrite tables as needed */
ATRewriteTables(&wqueue); ATRewriteTables(&wqueue, lockmode);
} }
/* /*
...@@ -2426,12 +2600,12 @@ ATController(Relation rel, List *cmds, bool recurse) ...@@ -2426,12 +2600,12 @@ ATController(Relation rel, List *cmds, bool recurse)
* Traffic cop for ALTER TABLE Phase 1 operations, including simple * Traffic cop for ALTER TABLE Phase 1 operations, including simple
* recursion and permission checks. * recursion and permission checks.
* *
* Caller must have acquired AccessExclusiveLock on relation already. * Caller must have acquired appropriate lock type on relation already.
* This lock should be held until commit. * This lock should be held until commit.
*/ */
static void static void
ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
bool recurse, bool recursing) bool recurse, bool recursing, LOCKMODE lockmode)
{ {
AlteredTableInfo *tab; AlteredTableInfo *tab;
int pass; int pass;
...@@ -2456,14 +2630,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2456,14 +2630,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_AddColumn: /* ADD COLUMN */ case AT_AddColumn: /* ADD COLUMN */
ATSimplePermissions(rel, false); ATSimplePermissions(rel, false);
/* Performs own recursion */ /* Performs own recursion */
ATPrepAddColumn(wqueue, rel, recurse, cmd); ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL; pass = AT_PASS_ADD_COL;
break; break;
case AT_AddColumnToView: /* add column via CREATE OR REPLACE case AT_AddColumnToView: /* add column via CREATE OR REPLACE
* VIEW */ * VIEW */
ATSimplePermissions(rel, true); ATSimplePermissions(rel, true);
/* Performs own recursion */ /* Performs own recursion */
ATPrepAddColumn(wqueue, rel, recurse, cmd); ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL; pass = AT_PASS_ADD_COL;
break; break;
case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
...@@ -2475,26 +2649,26 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2475,26 +2649,26 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
* rules. * rules.
*/ */
ATSimplePermissions(rel, true); ATSimplePermissions(rel, true);
ATSimpleRecursion(wqueue, rel, cmd, recurse); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */ /* No command-specific prep needed */
pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP; pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
break; break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATSimplePermissions(rel, false); ATSimplePermissions(rel, false);
ATSimpleRecursion(wqueue, rel, cmd, recurse); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */ /* No command-specific prep needed */
pass = AT_PASS_DROP; pass = AT_PASS_DROP;
break; break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
ATSimplePermissions(rel, false); ATSimplePermissions(rel, false);
ATSimpleRecursion(wqueue, rel, cmd, recurse); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */ /* No command-specific prep needed */
pass = AT_PASS_ADD_CONSTR; pass = AT_PASS_ADD_CONSTR;
break; break;
case AT_SetStatistics: /* ALTER COLUMN SET STATISTICS */ case AT_SetStatistics: /* ALTER COLUMN SET STATISTICS */
ATSimpleRecursion(wqueue, rel, cmd, recurse); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* Performs own permission checks */ /* Performs own permission checks */
ATPrepSetStatistics(rel, cmd->name, cmd->def); ATPrepSetStatistics(rel, cmd->name, cmd->def, lockmode);
pass = AT_PASS_COL_ATTRS; pass = AT_PASS_COL_ATTRS;
break; break;
case AT_SetOptions: /* ALTER COLUMN SET ( options ) */ case AT_SetOptions: /* ALTER COLUMN SET ( options ) */
...@@ -2505,7 +2679,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2505,7 +2679,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
break; break;
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */ case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
ATSimplePermissions(rel, false); ATSimplePermissions(rel, false);
ATSimpleRecursion(wqueue, rel, cmd, recurse); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
/* No command-specific prep needed */ /* No command-specific prep needed */
pass = AT_PASS_COL_ATTRS; pass = AT_PASS_COL_ATTRS;
break; break;
...@@ -2540,7 +2714,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2540,7 +2714,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
case AT_AlterColumnType: /* ALTER COLUMN TYPE */ case AT_AlterColumnType: /* ALTER COLUMN TYPE */
ATSimplePermissions(rel, false); ATSimplePermissions(rel, false);
/* Performs own recursion */ /* Performs own recursion */
ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd); ATPrepAlterColumnType(wqueue, tab, rel, recurse, recursing, cmd, lockmode);
pass = AT_PASS_ALTER_TYPE; pass = AT_PASS_ALTER_TYPE;
break; break;
case AT_ChangeOwner: /* ALTER OWNER */ case AT_ChangeOwner: /* ALTER OWNER */
...@@ -2559,7 +2733,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2559,7 +2733,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
ATSimplePermissions(rel, false); ATSimplePermissions(rel, false);
/* Performs own recursion */ /* Performs own recursion */
if (!rel->rd_rel->relhasoids || recursing) if (!rel->rd_rel->relhasoids || recursing)
ATPrepAddOids(wqueue, rel, recurse, cmd); ATPrepAddOids(wqueue, rel, recurse, cmd, lockmode);
pass = AT_PASS_ADD_COL; pass = AT_PASS_ADD_COL;
break; break;
case AT_DropOids: /* SET WITHOUT OIDS */ case AT_DropOids: /* SET WITHOUT OIDS */
...@@ -2572,14 +2746,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2572,14 +2746,14 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
dropCmd->subtype = AT_DropColumn; dropCmd->subtype = AT_DropColumn;
dropCmd->name = pstrdup("oid"); dropCmd->name = pstrdup("oid");
dropCmd->behavior = cmd->behavior; dropCmd->behavior = cmd->behavior;
ATPrepCmd(wqueue, rel, dropCmd, recurse, false); ATPrepCmd(wqueue, rel, dropCmd, recurse, false, lockmode);
} }
pass = AT_PASS_DROP; pass = AT_PASS_DROP;
break; break;
case AT_SetTableSpace: /* SET TABLESPACE */ case AT_SetTableSpace: /* SET TABLESPACE */
ATSimplePermissionsRelationOrIndex(rel); ATSimplePermissionsRelationOrIndex(rel);
/* This command never recurses */ /* This command never recurses */
ATPrepSetTableSpace(tab, rel, cmd->name); ATPrepSetTableSpace(tab, rel, cmd->name, lockmode);
pass = AT_PASS_MISC; /* doesn't actually matter */ pass = AT_PASS_MISC; /* doesn't actually matter */
break; break;
case AT_SetRelOptions: /* SET (...) */ case AT_SetRelOptions: /* SET (...) */
...@@ -2632,7 +2806,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, ...@@ -2632,7 +2806,7 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
* conflicts). * conflicts).
*/ */
static void static void
ATRewriteCatalogs(List **wqueue) ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode)
{ {
int pass; int pass;
ListCell *ltab; ListCell *ltab;
...@@ -2658,12 +2832,12 @@ ATRewriteCatalogs(List **wqueue) ...@@ -2658,12 +2832,12 @@ ATRewriteCatalogs(List **wqueue)
continue; continue;
/* /*
* Exclusive lock was obtained by phase 1, needn't get it again * Appropriate lock was obtained by phase 1, needn't get it again
*/ */
rel = relation_open(tab->relid, NoLock); rel = relation_open(tab->relid, NoLock);
foreach(lcmd, subcmds) foreach(lcmd, subcmds)
ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd)); ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd), lockmode);
/* /*
* After the ALTER TYPE pass, do cleanup work (this is not done in * After the ALTER TYPE pass, do cleanup work (this is not done in
...@@ -2671,7 +2845,7 @@ ATRewriteCatalogs(List **wqueue) ...@@ -2671,7 +2845,7 @@ ATRewriteCatalogs(List **wqueue)
* multiple columns of a table are altered). * multiple columns of a table are altered).
*/ */
if (pass == AT_PASS_ALTER_TYPE) if (pass == AT_PASS_ALTER_TYPE)
ATPostAlterTypeCleanup(wqueue, tab); ATPostAlterTypeCleanup(wqueue, tab, lockmode);
relation_close(rel, NoLock); relation_close(rel, NoLock);
} }
...@@ -2698,86 +2872,86 @@ ATRewriteCatalogs(List **wqueue) ...@@ -2698,86 +2872,86 @@ ATRewriteCatalogs(List **wqueue)
*/ */
static void static void
ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
AlterTableCmd *cmd) AlterTableCmd *cmd, LOCKMODE lockmode)
{ {
switch (cmd->subtype) switch (cmd->subtype)
{ {
case AT_AddColumn: /* ADD COLUMN */ case AT_AddColumn: /* ADD COLUMN */
case AT_AddColumnToView: /* add column via CREATE OR REPLACE case AT_AddColumnToView: /* add column via CREATE OR REPLACE
* VIEW */ * VIEW */
ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, false); ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, false, lockmode);
break; break;
case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */ case AT_ColumnDefault: /* ALTER COLUMN DEFAULT */
ATExecColumnDefault(rel, cmd->name, cmd->def); ATExecColumnDefault(rel, cmd->name, cmd->def, lockmode);
break; break;
case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */
ATExecDropNotNull(rel, cmd->name); ATExecDropNotNull(rel, cmd->name, lockmode);
break; break;
case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */
ATExecSetNotNull(tab, rel, cmd->name); ATExecSetNotNull(tab, rel, cmd->name, lockmode);
break; break;
case AT_SetStatistics: /* ALTER COLUMN SET STATISTICS */ case AT_SetStatistics: /* ALTER COLUMN SET STATISTICS */
ATExecSetStatistics(rel, cmd->name, cmd->def); ATExecSetStatistics(rel, cmd->name, cmd->def, lockmode);
break; break;
case AT_SetOptions: /* ALTER COLUMN SET ( options ) */ case AT_SetOptions: /* ALTER COLUMN SET ( options ) */
ATExecSetOptions(rel, cmd->name, cmd->def, false); ATExecSetOptions(rel, cmd->name, cmd->def, false, lockmode);
break; break;
case AT_ResetOptions: /* ALTER COLUMN RESET ( options ) */ case AT_ResetOptions: /* ALTER COLUMN RESET ( options ) */
ATExecSetOptions(rel, cmd->name, cmd->def, true); ATExecSetOptions(rel, cmd->name, cmd->def, true, lockmode);
break; break;
case AT_SetStorage: /* ALTER COLUMN SET STORAGE */ case AT_SetStorage: /* ALTER COLUMN SET STORAGE */
ATExecSetStorage(rel, cmd->name, cmd->def); ATExecSetStorage(rel, cmd->name, cmd->def, lockmode);
break; break;
case AT_DropColumn: /* DROP COLUMN */ case AT_DropColumn: /* DROP COLUMN */
ATExecDropColumn(wqueue, rel, cmd->name, ATExecDropColumn(wqueue, rel, cmd->name,
cmd->behavior, false, false, cmd->missing_ok); cmd->behavior, false, false, cmd->missing_ok, lockmode);
break; break;
case AT_DropColumnRecurse: /* DROP COLUMN with recursion */ case AT_DropColumnRecurse: /* DROP COLUMN with recursion */
ATExecDropColumn(wqueue, rel, cmd->name, ATExecDropColumn(wqueue, rel, cmd->name,
cmd->behavior, true, false, cmd->missing_ok); cmd->behavior, true, false, cmd->missing_ok, lockmode);
break; break;
case AT_AddIndex: /* ADD INDEX */ case AT_AddIndex: /* ADD INDEX */
ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false); ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false, lockmode);
break; break;
case AT_ReAddIndex: /* ADD INDEX */ case AT_ReAddIndex: /* ADD INDEX */
ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true); ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true, lockmode);
break; break;
case AT_AddConstraint: /* ADD CONSTRAINT */ case AT_AddConstraint: /* ADD CONSTRAINT */
ATExecAddConstraint(wqueue, tab, rel, (Constraint *) cmd->def, ATExecAddConstraint(wqueue, tab, rel, (Constraint *) cmd->def,
false); false, lockmode);
break; break;
case AT_AddConstraintRecurse: /* ADD CONSTRAINT with recursion */ case AT_AddConstraintRecurse: /* ADD CONSTRAINT with recursion */
ATExecAddConstraint(wqueue, tab, rel, (Constraint *) cmd->def, ATExecAddConstraint(wqueue, tab, rel, (Constraint *) cmd->def,
true); true, lockmode);
break; break;
case AT_DropConstraint: /* DROP CONSTRAINT */ case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior, ATExecDropConstraint(rel, cmd->name, cmd->behavior,
false, false, false, false,
cmd->missing_ok); cmd->missing_ok, lockmode);
break; break;
case AT_DropConstraintRecurse: /* DROP CONSTRAINT with recursion */ case AT_DropConstraintRecurse: /* DROP CONSTRAINT with recursion */
ATExecDropConstraint(rel, cmd->name, cmd->behavior, ATExecDropConstraint(rel, cmd->name, cmd->behavior,
true, false, true, false,
cmd->missing_ok); cmd->missing_ok, lockmode);
break; break;
case AT_AlterColumnType: /* ALTER COLUMN TYPE */ case AT_AlterColumnType: /* ALTER COLUMN TYPE */
ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def); ATExecAlterColumnType(tab, rel, cmd->name, (TypeName *) cmd->def, lockmode);
break; break;
case AT_ChangeOwner: /* ALTER OWNER */ case AT_ChangeOwner: /* ALTER OWNER */
ATExecChangeOwner(RelationGetRelid(rel), ATExecChangeOwner(RelationGetRelid(rel),
get_roleid_checked(cmd->name), get_roleid_checked(cmd->name),
false); false, lockmode);
break; break;
case AT_ClusterOn: /* CLUSTER ON */ case AT_ClusterOn: /* CLUSTER ON */
ATExecClusterOn(rel, cmd->name); ATExecClusterOn(rel, cmd->name, lockmode);
break; break;
case AT_DropCluster: /* SET WITHOUT CLUSTER */ case AT_DropCluster: /* SET WITHOUT CLUSTER */
ATExecDropCluster(rel); ATExecDropCluster(rel, lockmode);
break; break;
case AT_AddOids: /* SET WITH OIDS */ case AT_AddOids: /* SET WITH OIDS */
/* Use the ADD COLUMN code, unless prep decided to do nothing */ /* Use the ADD COLUMN code, unless prep decided to do nothing */
if (cmd->def != NULL) if (cmd->def != NULL)
ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, true); ATExecAddColumn(tab, rel, (ColumnDef *) cmd->def, true, lockmode);
break; break;
case AT_DropOids: /* SET WITHOUT OIDS */ case AT_DropOids: /* SET WITHOUT OIDS */
...@@ -2793,67 +2967,67 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -2793,67 +2967,67 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/ */
break; break;
case AT_SetRelOptions: /* SET (...) */ case AT_SetRelOptions: /* SET (...) */
ATExecSetRelOptions(rel, (List *) cmd->def, false); ATExecSetRelOptions(rel, (List *) cmd->def, false, lockmode);
break; break;
case AT_ResetRelOptions: /* RESET (...) */ case AT_ResetRelOptions: /* RESET (...) */
ATExecSetRelOptions(rel, (List *) cmd->def, true); ATExecSetRelOptions(rel, (List *) cmd->def, true, lockmode);
break; break;
case AT_EnableTrig: /* ENABLE TRIGGER name */ case AT_EnableTrig: /* ENABLE TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_FIRES_ON_ORIGIN, false); TRIGGER_FIRES_ON_ORIGIN, false, lockmode);
break; break;
case AT_EnableAlwaysTrig: /* ENABLE ALWAYS TRIGGER name */ case AT_EnableAlwaysTrig: /* ENABLE ALWAYS TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_FIRES_ALWAYS, false); TRIGGER_FIRES_ALWAYS, false, lockmode);
break; break;
case AT_EnableReplicaTrig: /* ENABLE REPLICA TRIGGER name */ case AT_EnableReplicaTrig: /* ENABLE REPLICA TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_FIRES_ON_REPLICA, false); TRIGGER_FIRES_ON_REPLICA, false, lockmode);
break; break;
case AT_DisableTrig: /* DISABLE TRIGGER name */ case AT_DisableTrig: /* DISABLE TRIGGER name */
ATExecEnableDisableTrigger(rel, cmd->name, ATExecEnableDisableTrigger(rel, cmd->name,
TRIGGER_DISABLED, false); TRIGGER_DISABLED, false, lockmode);
break; break;
case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */ case AT_EnableTrigAll: /* ENABLE TRIGGER ALL */
ATExecEnableDisableTrigger(rel, NULL, ATExecEnableDisableTrigger(rel, NULL,
TRIGGER_FIRES_ON_ORIGIN, false); TRIGGER_FIRES_ON_ORIGIN, false, lockmode);
break; break;
case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */ case AT_DisableTrigAll: /* DISABLE TRIGGER ALL */
ATExecEnableDisableTrigger(rel, NULL, ATExecEnableDisableTrigger(rel, NULL,
TRIGGER_DISABLED, false); TRIGGER_DISABLED, false, lockmode);
break; break;
case AT_EnableTrigUser: /* ENABLE TRIGGER USER */ case AT_EnableTrigUser: /* ENABLE TRIGGER USER */
ATExecEnableDisableTrigger(rel, NULL, ATExecEnableDisableTrigger(rel, NULL,
TRIGGER_FIRES_ON_ORIGIN, true); TRIGGER_FIRES_ON_ORIGIN, true, lockmode);
break; break;
case AT_DisableTrigUser: /* DISABLE TRIGGER USER */ case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
ATExecEnableDisableTrigger(rel, NULL, ATExecEnableDisableTrigger(rel, NULL,
TRIGGER_DISABLED, true); TRIGGER_DISABLED, true, lockmode);
break; break;
case AT_EnableRule: /* ENABLE RULE name */ case AT_EnableRule: /* ENABLE RULE name */
ATExecEnableDisableRule(rel, cmd->name, ATExecEnableDisableRule(rel, cmd->name,
RULE_FIRES_ON_ORIGIN); RULE_FIRES_ON_ORIGIN, lockmode);
break; break;
case AT_EnableAlwaysRule: /* ENABLE ALWAYS RULE name */ case AT_EnableAlwaysRule: /* ENABLE ALWAYS RULE name */
ATExecEnableDisableRule(rel, cmd->name, ATExecEnableDisableRule(rel, cmd->name,
RULE_FIRES_ALWAYS); RULE_FIRES_ALWAYS, lockmode);
break; break;
case AT_EnableReplicaRule: /* ENABLE REPLICA RULE name */ case AT_EnableReplicaRule: /* ENABLE REPLICA RULE name */
ATExecEnableDisableRule(rel, cmd->name, ATExecEnableDisableRule(rel, cmd->name,
RULE_FIRES_ON_REPLICA); RULE_FIRES_ON_REPLICA, lockmode);
break; break;
case AT_DisableRule: /* DISABLE RULE name */ case AT_DisableRule: /* DISABLE RULE name */
ATExecEnableDisableRule(rel, cmd->name, ATExecEnableDisableRule(rel, cmd->name,
RULE_DISABLED); RULE_DISABLED, lockmode);
break; break;
case AT_AddInherit: case AT_AddInherit:
ATExecAddInherit(rel, (RangeVar *) cmd->def); ATExecAddInherit(rel, (RangeVar *) cmd->def, lockmode);
break; break;
case AT_DropInherit: case AT_DropInherit:
ATExecDropInherit(rel, (RangeVar *) cmd->def); ATExecDropInherit(rel, (RangeVar *) cmd->def, lockmode);
break; break;
default: /* oops */ default: /* oops */
elog(ERROR, "unrecognized alter table type: %d", elog(ERROR, "unrecognized alter table type: %d",
...@@ -2872,7 +3046,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -2872,7 +3046,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
* ATRewriteTables: ALTER TABLE phase 3 * ATRewriteTables: ALTER TABLE phase 3
*/ */
static void static void
ATRewriteTables(List **wqueue) ATRewriteTables(List **wqueue, LOCKMODE lockmode)
{ {
ListCell *ltab; ListCell *ltab;
...@@ -2933,7 +3107,7 @@ ATRewriteTables(List **wqueue) ...@@ -2933,7 +3107,7 @@ ATRewriteTables(List **wqueue)
* modifications, and test the current data within the table * modifications, and test the current data within the table
* against new constraints generated by ALTER TABLE commands. * against new constraints generated by ALTER TABLE commands.
*/ */
ATRewriteTable(tab, OIDNewHeap); ATRewriteTable(tab, OIDNewHeap, lockmode);
/* /*
* Swap the physical files of the old and new heaps, then rebuild * Swap the physical files of the old and new heaps, then rebuild
...@@ -2952,14 +3126,14 @@ ATRewriteTables(List **wqueue) ...@@ -2952,14 +3126,14 @@ ATRewriteTables(List **wqueue)
* generated by ALTER TABLE commands, but don't rebuild data. * generated by ALTER TABLE commands, but don't rebuild data.
*/ */
if (tab->constraints != NIL || tab->new_notnull) if (tab->constraints != NIL || tab->new_notnull)
ATRewriteTable(tab, InvalidOid); ATRewriteTable(tab, InvalidOid, lockmode);
/* /*
* If we had SET TABLESPACE but no reason to reconstruct tuples, * If we had SET TABLESPACE but no reason to reconstruct tuples,
* just do a block-by-block copy. * just do a block-by-block copy.
*/ */
if (tab->newTableSpace) if (tab->newTableSpace)
ATExecSetTableSpace(tab->relid, tab->newTableSpace); ATExecSetTableSpace(tab->relid, tab->newTableSpace, lockmode);
} }
} }
...@@ -2986,12 +3160,14 @@ ATRewriteTables(List **wqueue) ...@@ -2986,12 +3160,14 @@ ATRewriteTables(List **wqueue)
Relation refrel; Relation refrel;
if (rel == NULL) if (rel == NULL)
{
/* Long since locked, no need for another */ /* Long since locked, no need for another */
rel = heap_open(tab->relid, NoLock); rel = heap_open(tab->relid, NoLock);
}
refrel = heap_open(con->refrelid, RowShareLock); /*
* We're adding a trigger to both tables, so the lock level
* here should sensibly reflect that.
*/
refrel = heap_open(con->refrelid, ShareRowExclusiveLock);
validateForeignKeyConstraint(fkconstraint, rel, refrel, validateForeignKeyConstraint(fkconstraint, rel, refrel,
con->refindid, con->refindid,
...@@ -3012,7 +3188,7 @@ ATRewriteTables(List **wqueue) ...@@ -3012,7 +3188,7 @@ ATRewriteTables(List **wqueue)
* OIDNewHeap is InvalidOid if we don't need to rewrite * OIDNewHeap is InvalidOid if we don't need to rewrite
*/ */
static void static void
ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
{ {
Relation oldrel; Relation oldrel;
Relation newrel; Relation newrel;
...@@ -3036,7 +3212,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap) ...@@ -3036,7 +3212,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap)
newTupDesc = RelationGetDescr(oldrel); /* includes all mods */ newTupDesc = RelationGetDescr(oldrel); /* includes all mods */
if (OidIsValid(OIDNewHeap)) if (OidIsValid(OIDNewHeap))
newrel = heap_open(OIDNewHeap, AccessExclusiveLock); newrel = heap_open(OIDNewHeap, lockmode);
else else
newrel = NULL; newrel = NULL;
...@@ -3402,7 +3578,7 @@ ATSimplePermissionsRelationOrIndex(Relation rel) ...@@ -3402,7 +3578,7 @@ ATSimplePermissionsRelationOrIndex(Relation rel)
*/ */
static void static void
ATSimpleRecursion(List **wqueue, Relation rel, ATSimpleRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd, bool recurse) AlterTableCmd *cmd, bool recurse, LOCKMODE lockmode)
{ {
/* /*
* Propagate to children if desired. Non-table relations never have * Propagate to children if desired. Non-table relations never have
...@@ -3414,7 +3590,7 @@ ATSimpleRecursion(List **wqueue, Relation rel, ...@@ -3414,7 +3590,7 @@ ATSimpleRecursion(List **wqueue, Relation rel,
ListCell *child; ListCell *child;
List *children; List *children;
children = find_all_inheritors(relid, AccessExclusiveLock, NULL); children = find_all_inheritors(relid, lockmode, NULL);
/* /*
* find_all_inheritors does the recursive search of the inheritance * find_all_inheritors does the recursive search of the inheritance
...@@ -3431,7 +3607,7 @@ ATSimpleRecursion(List **wqueue, Relation rel, ...@@ -3431,7 +3607,7 @@ ATSimpleRecursion(List **wqueue, Relation rel,
/* find_all_inheritors already got lock */ /* find_all_inheritors already got lock */
childrel = relation_open(childrelid, NoLock); childrel = relation_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE"); CheckTableNotInUse(childrel, "ALTER TABLE");
ATPrepCmd(wqueue, childrel, cmd, false, true); ATPrepCmd(wqueue, childrel, cmd, false, true, lockmode);
relation_close(childrel, NoLock); relation_close(childrel, NoLock);
} }
} }
...@@ -3447,13 +3623,13 @@ ATSimpleRecursion(List **wqueue, Relation rel, ...@@ -3447,13 +3623,13 @@ ATSimpleRecursion(List **wqueue, Relation rel,
*/ */
static void static void
ATOneLevelRecursion(List **wqueue, Relation rel, ATOneLevelRecursion(List **wqueue, Relation rel,
AlterTableCmd *cmd) AlterTableCmd *cmd, LOCKMODE lockmode)
{ {
Oid relid = RelationGetRelid(rel); Oid relid = RelationGetRelid(rel);
ListCell *child; ListCell *child;
List *children; List *children;
children = find_inheritance_children(relid, AccessExclusiveLock); children = find_inheritance_children(relid, lockmode);
foreach(child, children) foreach(child, children)
{ {
...@@ -3463,7 +3639,7 @@ ATOneLevelRecursion(List **wqueue, Relation rel, ...@@ -3463,7 +3639,7 @@ ATOneLevelRecursion(List **wqueue, Relation rel,
/* find_inheritance_children already got lock */ /* find_inheritance_children already got lock */
childrel = relation_open(childrelid, NoLock); childrel = relation_open(childrelid, NoLock);
CheckTableNotInUse(childrel, "ALTER TABLE"); CheckTableNotInUse(childrel, "ALTER TABLE");
ATPrepCmd(wqueue, childrel, cmd, true, true); ATPrepCmd(wqueue, childrel, cmd, true, true, lockmode);
relation_close(childrel, NoLock); relation_close(childrel, NoLock);
} }
} }
...@@ -3581,7 +3757,7 @@ find_composite_type_dependencies(Oid typeOid, ...@@ -3581,7 +3757,7 @@ find_composite_type_dependencies(Oid typeOid,
*/ */
static void static void
ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
AlterTableCmd *cmd) AlterTableCmd *cmd, LOCKMODE lockmode)
{ {
if (rel->rd_rel->reloftype) if (rel->rd_rel->reloftype)
ereport(ERROR, ereport(ERROR,
...@@ -3604,7 +3780,7 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, ...@@ -3604,7 +3780,7 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
colDefChild->inhcount = 1; colDefChild->inhcount = 1;
colDefChild->is_local = false; colDefChild->is_local = false;
ATOneLevelRecursion(wqueue, rel, childCmd); ATOneLevelRecursion(wqueue, rel, childCmd, lockmode);
} }
else else
{ {
...@@ -3621,7 +3797,7 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse, ...@@ -3621,7 +3797,7 @@ ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
static void static void
ATExecAddColumn(AlteredTableInfo *tab, Relation rel, ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
ColumnDef *colDef, bool isOid) ColumnDef *colDef, bool isOid, LOCKMODE lockmode)
{ {
Oid myrelid = RelationGetRelid(rel); Oid myrelid = RelationGetRelid(rel);
Relation pgclass, Relation pgclass,
...@@ -3902,7 +4078,7 @@ add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid) ...@@ -3902,7 +4078,7 @@ add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid)
* to cons up a ColumnDef node because the ADD COLUMN code needs one. * to cons up a ColumnDef node because the ADD COLUMN code needs one.
*/ */
static void static void
ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd) ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd, LOCKMODE lockmode)
{ {
/* If we're recursing to a child table, the ColumnDef is already set up */ /* If we're recursing to a child table, the ColumnDef is already set up */
if (cmd->def == NULL) if (cmd->def == NULL)
...@@ -3917,14 +4093,14 @@ ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd) ...@@ -3917,14 +4093,14 @@ ATPrepAddOids(List **wqueue, Relation rel, bool recurse, AlterTableCmd *cmd)
cdef->storage = 0; cdef->storage = 0;
cmd->def = (Node *) cdef; cmd->def = (Node *) cdef;
} }
ATPrepAddColumn(wqueue, rel, recurse, cmd); ATPrepAddColumn(wqueue, rel, recurse, cmd, lockmode);
} }
/* /*
* ALTER TABLE ALTER COLUMN DROP NOT NULL * ALTER TABLE ALTER COLUMN DROP NOT NULL
*/ */
static void static void
ATExecDropNotNull(Relation rel, const char *colName) ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
{ {
HeapTuple tuple; HeapTuple tuple;
AttrNumber attnum; AttrNumber attnum;
...@@ -4016,7 +4192,7 @@ ATExecDropNotNull(Relation rel, const char *colName) ...@@ -4016,7 +4192,7 @@ ATExecDropNotNull(Relation rel, const char *colName)
*/ */
static void static void
ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
const char *colName) const char *colName, LOCKMODE lockmode)
{ {
HeapTuple tuple; HeapTuple tuple;
AttrNumber attnum; AttrNumber attnum;
...@@ -4068,7 +4244,7 @@ ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, ...@@ -4068,7 +4244,7 @@ ATExecSetNotNull(AlteredTableInfo *tab, Relation rel,
*/ */
static void static void
ATExecColumnDefault(Relation rel, const char *colName, ATExecColumnDefault(Relation rel, const char *colName,
Node *newDefault) Node *newDefault, LOCKMODE lockmode)
{ {
AttrNumber attnum; AttrNumber attnum;
...@@ -4117,7 +4293,7 @@ ATExecColumnDefault(Relation rel, const char *colName, ...@@ -4117,7 +4293,7 @@ ATExecColumnDefault(Relation rel, const char *colName,
* ALTER TABLE ALTER COLUMN SET STATISTICS * ALTER TABLE ALTER COLUMN SET STATISTICS
*/ */
static void static void
ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue) ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue, LOCKMODE lockmode)
{ {
/* /*
* We do our own permission checking because (a) we want to allow SET * We do our own permission checking because (a) we want to allow SET
...@@ -4139,7 +4315,7 @@ ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue) ...@@ -4139,7 +4315,7 @@ ATPrepSetStatistics(Relation rel, const char *colName, Node *newValue)
} }
static void static void
ATExecSetStatistics(Relation rel, const char *colName, Node *newValue) ATExecSetStatistics(Relation rel, const char *colName, Node *newValue, LOCKMODE lockmode)
{ {
int newtarget; int newtarget;
Relation attrelation; Relation attrelation;
...@@ -4199,7 +4375,7 @@ ATExecSetStatistics(Relation rel, const char *colName, Node *newValue) ...@@ -4199,7 +4375,7 @@ ATExecSetStatistics(Relation rel, const char *colName, Node *newValue)
static void static void
ATExecSetOptions(Relation rel, const char *colName, Node *options, ATExecSetOptions(Relation rel, const char *colName, Node *options,
bool isReset) bool isReset, LOCKMODE lockmode)
{ {
Relation attrelation; Relation attrelation;
HeapTuple tuple, HeapTuple tuple,
...@@ -4263,7 +4439,7 @@ ATExecSetOptions(Relation rel, const char *colName, Node *options, ...@@ -4263,7 +4439,7 @@ ATExecSetOptions(Relation rel, const char *colName, Node *options,
* ALTER TABLE ALTER COLUMN SET STORAGE * ALTER TABLE ALTER COLUMN SET STORAGE
*/ */
static void static void
ATExecSetStorage(Relation rel, const char *colName, Node *newValue) ATExecSetStorage(Relation rel, const char *colName, Node *newValue, LOCKMODE lockmode)
{ {
char *storagemode; char *storagemode;
char newstorage; char newstorage;
...@@ -4357,7 +4533,7 @@ static void ...@@ -4357,7 +4533,7 @@ static void
ATExecDropColumn(List **wqueue, Relation rel, const char *colName, ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
DropBehavior behavior, DropBehavior behavior,
bool recurse, bool recursing, bool recurse, bool recursing,
bool missing_ok) bool missing_ok, LOCKMODE lockmode)
{ {
HeapTuple tuple; HeapTuple tuple;
Form_pg_attribute targetatt; Form_pg_attribute targetatt;
...@@ -4415,8 +4591,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, ...@@ -4415,8 +4591,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
* routines, we have to do this one level of recursion at a time; we can't * routines, we have to do this one level of recursion at a time; we can't
* use find_all_inheritors to do it in one pass. * use find_all_inheritors to do it in one pass.
*/ */
children = find_inheritance_children(RelationGetRelid(rel), children = find_inheritance_children(RelationGetRelid(rel), lockmode);
AccessExclusiveLock);
if (children) if (children)
{ {
...@@ -4456,7 +4631,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, ...@@ -4456,7 +4631,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
/* Time to delete this child column, too */ /* Time to delete this child column, too */
ATExecDropColumn(wqueue, childrel, colName, ATExecDropColumn(wqueue, childrel, colName,
behavior, true, true, behavior, true, true,
false); false, lockmode);
} }
else else
{ {
...@@ -4551,7 +4726,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, ...@@ -4551,7 +4726,7 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName,
*/ */
static void static void
ATExecAddIndex(AlteredTableInfo *tab, Relation rel, ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, bool is_rebuild) IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode)
{ {
bool check_rights; bool check_rights;
bool skip_build; bool skip_build;
...@@ -4594,7 +4769,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel, ...@@ -4594,7 +4769,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
*/ */
static void static void
ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Constraint *newConstraint, bool recurse) Constraint *newConstraint, bool recurse, LOCKMODE lockmode)
{ {
Assert(IsA(newConstraint, Constraint)); Assert(IsA(newConstraint, Constraint));
...@@ -4607,7 +4782,7 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -4607,7 +4782,7 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
{ {
case CONSTR_CHECK: case CONSTR_CHECK:
ATAddCheckConstraint(wqueue, tab, rel, ATAddCheckConstraint(wqueue, tab, rel,
newConstraint, recurse, false); newConstraint, recurse, false, lockmode);
break; break;
case CONSTR_FOREIGN: case CONSTR_FOREIGN:
...@@ -4638,7 +4813,7 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -4638,7 +4813,7 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
RelationGetNamespace(rel), RelationGetNamespace(rel),
NIL); NIL);
ATAddForeignKeyConstraint(tab, rel, newConstraint); ATAddForeignKeyConstraint(tab, rel, newConstraint, lockmode);
break; break;
default: default:
...@@ -4662,7 +4837,8 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -4662,7 +4837,8 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/ */
static void static void
ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
Constraint *constr, bool recurse, bool recursing) Constraint *constr, bool recurse, bool recursing,
LOCKMODE lockmode)
{ {
List *newcons; List *newcons;
ListCell *lcon; ListCell *lcon;
...@@ -4717,8 +4893,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -4717,8 +4893,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* routines, we have to do this one level of recursion at a time; we can't * routines, we have to do this one level of recursion at a time; we can't
* use find_all_inheritors to do it in one pass. * use find_all_inheritors to do it in one pass.
*/ */
children = find_inheritance_children(RelationGetRelid(rel), children = find_inheritance_children(RelationGetRelid(rel), lockmode);
AccessExclusiveLock);
/* /*
* If we are told not to recurse, there had better not be any child * If we are told not to recurse, there had better not be any child
...@@ -4744,7 +4919,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -4744,7 +4919,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
/* Recurse to child */ /* Recurse to child */
ATAddCheckConstraint(wqueue, childtab, childrel, ATAddCheckConstraint(wqueue, childtab, childrel,
constr, recurse, true); constr, recurse, true, lockmode);
heap_close(childrel, NoLock); heap_close(childrel, NoLock);
} }
...@@ -4759,7 +4934,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -4759,7 +4934,7 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
*/ */
static void static void
ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
Constraint *fkconstraint) Constraint *fkconstraint, LOCKMODE lockmode)
{ {
Relation pkrel; Relation pkrel;
int16 pkattnum[INDEX_MAX_KEYS]; int16 pkattnum[INDEX_MAX_KEYS];
...@@ -4776,14 +4951,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, ...@@ -4776,14 +4951,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
Oid indexOid; Oid indexOid;
Oid constrOid; Oid constrOid;
/* pkrel = heap_openrv(fkconstraint->pktable, lockmode);
* Grab an exclusive lock on the pk table, so that someone doesn't delete
* rows out from under us. (Although a lesser lock would do for that
* purpose, we'll need exclusive lock anyway to add triggers to the pk
* table; trying to start with a lesser lock will just create a risk of
* deadlock.)
*/
pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
/* /*
* Validity checks (permission checks wait till we have the column * Validity checks (permission checks wait till we have the column
...@@ -5347,7 +5515,7 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts) ...@@ -5347,7 +5515,7 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
* Scan the existing rows in a table to verify they meet a proposed FK * Scan the existing rows in a table to verify they meet a proposed FK
* constraint. * constraint.
* *
* Caller must have opened and locked both relations. * Caller must have opened and locked both relations appropriately.
*/ */
static void static void
validateForeignKeyConstraint(Constraint *fkconstraint, validateForeignKeyConstraint(Constraint *fkconstraint,
...@@ -5599,7 +5767,7 @@ static void ...@@ -5599,7 +5767,7 @@ static void
ATExecDropConstraint(Relation rel, const char *constrName, ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior, DropBehavior behavior,
bool recurse, bool recursing, bool recurse, bool recursing,
bool missing_ok) bool missing_ok, LOCKMODE lockmode)
{ {
List *children; List *children;
ListCell *child; ListCell *child;
...@@ -5686,8 +5854,7 @@ ATExecDropConstraint(Relation rel, const char *constrName, ...@@ -5686,8 +5854,7 @@ ATExecDropConstraint(Relation rel, const char *constrName,
* use find_all_inheritors to do it in one pass. * use find_all_inheritors to do it in one pass.
*/ */
if (is_check_constraint) if (is_check_constraint)
children = find_inheritance_children(RelationGetRelid(rel), children = find_inheritance_children(RelationGetRelid(rel), lockmode);
AccessExclusiveLock);
else else
children = NIL; children = NIL;
...@@ -5743,7 +5910,7 @@ ATExecDropConstraint(Relation rel, const char *constrName, ...@@ -5743,7 +5910,7 @@ ATExecDropConstraint(Relation rel, const char *constrName,
/* Time to delete this child constraint, too */ /* Time to delete this child constraint, too */
ATExecDropConstraint(childrel, constrName, behavior, ATExecDropConstraint(childrel, constrName, behavior,
true, true, true, true,
false); false, lockmode);
} }
else else
{ {
...@@ -5798,7 +5965,7 @@ static void ...@@ -5798,7 +5965,7 @@ static void
ATPrepAlterColumnType(List **wqueue, ATPrepAlterColumnType(List **wqueue,
AlteredTableInfo *tab, Relation rel, AlteredTableInfo *tab, Relation rel,
bool recurse, bool recursing, bool recurse, bool recursing,
AlterTableCmd *cmd) AlterTableCmd *cmd, LOCKMODE lockmode)
{ {
char *colName = cmd->name; char *colName = cmd->name;
TypeName *typeName = (TypeName *) cmd->def; TypeName *typeName = (TypeName *) cmd->def;
...@@ -5925,7 +6092,7 @@ ATPrepAlterColumnType(List **wqueue, ...@@ -5925,7 +6092,7 @@ ATPrepAlterColumnType(List **wqueue,
* alter would put them out of step. * alter would put them out of step.
*/ */
if (recurse) if (recurse)
ATSimpleRecursion(wqueue, rel, cmd, recurse); ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode);
else if (!recursing && else if (!recursing &&
find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL) find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL)
ereport(ERROR, ereport(ERROR,
...@@ -5936,7 +6103,7 @@ ATPrepAlterColumnType(List **wqueue, ...@@ -5936,7 +6103,7 @@ ATPrepAlterColumnType(List **wqueue,
static void static void
ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
const char *colName, TypeName *typeName) const char *colName, TypeName *typeName, LOCKMODE lockmode)
{ {
HeapTuple heapTup; HeapTuple heapTup;
Form_pg_attribute attTup; Form_pg_attribute attTup;
...@@ -6284,7 +6451,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, ...@@ -6284,7 +6451,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
* and constraints that depend on the altered columns. * and constraints that depend on the altered columns.
*/ */
static void static void
ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
{ {
ObjectAddress obj; ObjectAddress obj;
ListCell *l; ListCell *l;
...@@ -6298,9 +6465,9 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) ...@@ -6298,9 +6465,9 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
* at the catalogs to detect the existing entry. * at the catalogs to detect the existing entry.
*/ */
foreach(l, tab->changedIndexDefs) foreach(l, tab->changedIndexDefs)
ATPostAlterTypeParse((char *) lfirst(l), wqueue); ATPostAlterTypeParse((char *) lfirst(l), wqueue, lockmode);
foreach(l, tab->changedConstraintDefs) foreach(l, tab->changedConstraintDefs)
ATPostAlterTypeParse((char *) lfirst(l), wqueue); ATPostAlterTypeParse((char *) lfirst(l), wqueue, lockmode);
/* /*
* Now we can drop the existing constraints and indexes --- constraints * Now we can drop the existing constraints and indexes --- constraints
...@@ -6333,7 +6500,7 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab) ...@@ -6333,7 +6500,7 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab)
} }
static void static void
ATPostAlterTypeParse(char *cmd, List **wqueue) ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
{ {
List *raw_parsetree_list; List *raw_parsetree_list;
List *querytree_list; List *querytree_list;
...@@ -6380,7 +6547,7 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) ...@@ -6380,7 +6547,7 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
IndexStmt *stmt = (IndexStmt *) stm; IndexStmt *stmt = (IndexStmt *) stm;
AlterTableCmd *newcmd; AlterTableCmd *newcmd;
rel = relation_openrv(stmt->relation, AccessExclusiveLock); rel = relation_openrv(stmt->relation, lockmode);
tab = ATGetQueueEntry(wqueue, rel); tab = ATGetQueueEntry(wqueue, rel);
newcmd = makeNode(AlterTableCmd); newcmd = makeNode(AlterTableCmd);
newcmd->subtype = AT_ReAddIndex; newcmd->subtype = AT_ReAddIndex;
...@@ -6395,7 +6562,7 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) ...@@ -6395,7 +6562,7 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
AlterTableStmt *stmt = (AlterTableStmt *) stm; AlterTableStmt *stmt = (AlterTableStmt *) stm;
ListCell *lcmd; ListCell *lcmd;
rel = relation_openrv(stmt->relation, AccessExclusiveLock); rel = relation_openrv(stmt->relation, lockmode);
tab = ATGetQueueEntry(wqueue, rel); tab = ATGetQueueEntry(wqueue, rel);
foreach(lcmd, stmt->cmds) foreach(lcmd, stmt->cmds)
{ {
...@@ -6441,7 +6608,7 @@ ATPostAlterTypeParse(char *cmd, List **wqueue) ...@@ -6441,7 +6608,7 @@ ATPostAlterTypeParse(char *cmd, List **wqueue)
* free-standing composite type. * free-standing composite type.
*/ */
void void
ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lockmode)
{ {
Relation target_rel; Relation target_rel;
Relation class_rel; Relation class_rel;
...@@ -6452,7 +6619,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) ...@@ -6452,7 +6619,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
* Get exclusive lock till end of transaction on the target table. Use * Get exclusive lock till end of transaction on the target table. Use
* relation_open so that we can work on indexes and sequences. * relation_open so that we can work on indexes and sequences.
*/ */
target_rel = relation_open(relationOid, AccessExclusiveLock); target_rel = relation_open(relationOid, lockmode);
/* Get its pg_class tuple, too */ /* Get its pg_class tuple, too */
class_rel = heap_open(RelationRelationId, RowExclusiveLock); class_rel = heap_open(RelationRelationId, RowExclusiveLock);
...@@ -6629,7 +6796,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) ...@@ -6629,7 +6796,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
/* For each index, recursively change its ownership */ /* For each index, recursively change its ownership */
foreach(i, index_oid_list) foreach(i, index_oid_list)
ATExecChangeOwner(lfirst_oid(i), newOwnerId, true); ATExecChangeOwner(lfirst_oid(i), newOwnerId, true, lockmode);
list_free(index_oid_list); list_free(index_oid_list);
} }
...@@ -6639,10 +6806,10 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) ...@@ -6639,10 +6806,10 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
/* If it has a toast table, recurse to change its ownership */ /* If it has a toast table, recurse to change its ownership */
if (tuple_class->reltoastrelid != InvalidOid) if (tuple_class->reltoastrelid != InvalidOid)
ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId, ATExecChangeOwner(tuple_class->reltoastrelid, newOwnerId,
true); true, lockmode);
/* If it has dependent sequences, recurse to change them too */ /* If it has dependent sequences, recurse to change them too */
change_owner_recurse_to_sequences(relationOid, newOwnerId); change_owner_recurse_to_sequences(relationOid, newOwnerId, lockmode);
} }
} }
...@@ -6659,7 +6826,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing) ...@@ -6659,7 +6826,7 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing)
* ownership. * ownership.
*/ */
static void static void
change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId) change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId, LOCKMODE lockmode)
{ {
Relation depRel; Relation depRel;
SysScanDesc scan; SysScanDesc scan;
...@@ -6698,18 +6865,18 @@ change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId) ...@@ -6698,18 +6865,18 @@ change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
continue; continue;
/* Use relation_open just in case it's an index */ /* Use relation_open just in case it's an index */
seqRel = relation_open(depForm->objid, AccessExclusiveLock); seqRel = relation_open(depForm->objid, lockmode);
/* skip non-sequence relations */ /* skip non-sequence relations */
if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE) if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
{ {
/* No need to keep the lock */ /* No need to keep the lock */
relation_close(seqRel, AccessExclusiveLock); relation_close(seqRel, lockmode);
continue; continue;
} }
/* We don't need to close the sequence while we alter it. */ /* We don't need to close the sequence while we alter it. */
ATExecChangeOwner(depForm->objid, newOwnerId, true); ATExecChangeOwner(depForm->objid, newOwnerId, true, lockmode);
/* Now we can close it. Keep the lock till end of transaction. */ /* Now we can close it. Keep the lock till end of transaction. */
relation_close(seqRel, NoLock); relation_close(seqRel, NoLock);
...@@ -6726,7 +6893,7 @@ change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId) ...@@ -6726,7 +6893,7 @@ change_owner_recurse_to_sequences(Oid relationOid, Oid newOwnerId)
* The only thing we have to do is to change the indisclustered bits. * The only thing we have to do is to change the indisclustered bits.
*/ */
static void static void
ATExecClusterOn(Relation rel, const char *indexName) ATExecClusterOn(Relation rel, const char *indexName, LOCKMODE lockmode)
{ {
Oid indexOid; Oid indexOid;
...@@ -6752,7 +6919,7 @@ ATExecClusterOn(Relation rel, const char *indexName) ...@@ -6752,7 +6919,7 @@ ATExecClusterOn(Relation rel, const char *indexName)
* set and turn it off. * set and turn it off.
*/ */
static void static void
ATExecDropCluster(Relation rel) ATExecDropCluster(Relation rel, LOCKMODE lockmode)
{ {
mark_index_clustered(rel, InvalidOid); mark_index_clustered(rel, InvalidOid);
} }
...@@ -6761,7 +6928,7 @@ ATExecDropCluster(Relation rel) ...@@ -6761,7 +6928,7 @@ ATExecDropCluster(Relation rel)
* ALTER TABLE SET TABLESPACE * ALTER TABLE SET TABLESPACE
*/ */
static void static void
ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename) ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename, LOCKMODE lockmode)
{ {
Oid tablespaceId; Oid tablespaceId;
AclResult aclresult; AclResult aclresult;
...@@ -6790,7 +6957,7 @@ ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename) ...@@ -6790,7 +6957,7 @@ ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel, char *tablespacename)
* ALTER TABLE/INDEX SET (...) or RESET (...) * ALTER TABLE/INDEX SET (...) or RESET (...)
*/ */
static void static void
ATExecSetRelOptions(Relation rel, List *defList, bool isReset) ATExecSetRelOptions(Relation rel, List *defList, bool isReset, LOCKMODE lockmode)
{ {
Oid relid; Oid relid;
Relation pgclass; Relation pgclass;
...@@ -6871,7 +7038,7 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset) ...@@ -6871,7 +7038,7 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset)
Relation toastrel; Relation toastrel;
Oid toastid = rel->rd_rel->reltoastrelid; Oid toastid = rel->rd_rel->reltoastrelid;
toastrel = heap_open(toastid, AccessExclusiveLock); toastrel = heap_open(toastid, lockmode);
/* Get the old reloptions */ /* Get the old reloptions */
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid)); tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
...@@ -6918,7 +7085,7 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset) ...@@ -6918,7 +7085,7 @@ ATExecSetRelOptions(Relation rel, List *defList, bool isReset)
* rewriting to be done, so we just want to copy the data as fast as possible. * rewriting to be done, so we just want to copy the data as fast as possible.
*/ */
static void static void
ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) ATExecSetTableSpace(Oid tableOid, Oid newTableSpace, LOCKMODE lockmode)
{ {
Relation rel; Relation rel;
Oid oldTableSpace; Oid oldTableSpace;
...@@ -6935,7 +7102,7 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) ...@@ -6935,7 +7102,7 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
/* /*
* Need lock here in case we are recursing to toast table or index * Need lock here in case we are recursing to toast table or index
*/ */
rel = relation_open(tableOid, AccessExclusiveLock); rel = relation_open(tableOid, lockmode);
/* /*
* No work if no change in tablespace. * No work if no change in tablespace.
...@@ -7049,9 +7216,9 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace) ...@@ -7049,9 +7216,9 @@ ATExecSetTableSpace(Oid tableOid, Oid newTableSpace)
/* Move associated toast relation and/or index, too */ /* Move associated toast relation and/or index, too */
if (OidIsValid(reltoastrelid)) if (OidIsValid(reltoastrelid))
ATExecSetTableSpace(reltoastrelid, newTableSpace); ATExecSetTableSpace(reltoastrelid, newTableSpace, lockmode);
if (OidIsValid(reltoastidxid)) if (OidIsValid(reltoastidxid))
ATExecSetTableSpace(reltoastidxid, newTableSpace); ATExecSetTableSpace(reltoastidxid, newTableSpace, lockmode);
} }
/* /*
...@@ -7119,7 +7286,7 @@ copy_relation_data(SMgrRelation src, SMgrRelation dst, ...@@ -7119,7 +7286,7 @@ copy_relation_data(SMgrRelation src, SMgrRelation dst,
*/ */
static void static void
ATExecEnableDisableTrigger(Relation rel, char *trigname, ATExecEnableDisableTrigger(Relation rel, char *trigname,
char fires_when, bool skip_system) char fires_when, bool skip_system, LOCKMODE lockmode)
{ {
EnableDisableTrigger(rel, trigname, fires_when, skip_system); EnableDisableTrigger(rel, trigname, fires_when, skip_system);
} }
...@@ -7131,7 +7298,7 @@ ATExecEnableDisableTrigger(Relation rel, char *trigname, ...@@ -7131,7 +7298,7 @@ ATExecEnableDisableTrigger(Relation rel, char *trigname,
*/ */
static void static void
ATExecEnableDisableRule(Relation rel, char *trigname, ATExecEnableDisableRule(Relation rel, char *trigname,
char fires_when) char fires_when, LOCKMODE lockmode)
{ {
EnableDisableRule(rel, trigname, fires_when); EnableDisableRule(rel, trigname, fires_when);
} }
...@@ -7153,7 +7320,7 @@ ATPrepAddInherit(Relation child_rel) ...@@ -7153,7 +7320,7 @@ ATPrepAddInherit(Relation child_rel)
} }
static void static void
ATExecAddInherit(Relation child_rel, RangeVar *parent) ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode)
{ {
Relation parent_rel, Relation parent_rel,
catalogRelation; catalogRelation;
...@@ -7514,7 +7681,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ...@@ -7514,7 +7681,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel)
* exactly the same way. * exactly the same way.
*/ */
static void static void
ATExecDropInherit(Relation rel, RangeVar *parent) ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode)
{ {
Relation parent_rel; Relation parent_rel;
Relation catalogRelation; Relation catalogRelation;
...@@ -7738,7 +7905,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent) ...@@ -7738,7 +7905,7 @@ ATExecDropInherit(Relation rel, RangeVar *parent)
*/ */
void void
AlterTableNamespace(RangeVar *relation, const char *newschema, AlterTableNamespace(RangeVar *relation, const char *newschema,
ObjectType stmttype) ObjectType stmttype, LOCKMODE lockmode)
{ {
Relation rel; Relation rel;
Oid relid; Oid relid;
...@@ -7746,7 +7913,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema, ...@@ -7746,7 +7913,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
Oid nspOid; Oid nspOid;
Relation classRel; Relation classRel;
rel = relation_openrv(relation, AccessExclusiveLock); rel = relation_openrv(relation, lockmode);
relid = RelationGetRelid(rel); relid = RelationGetRelid(rel);
oldNspOid = RelationGetNamespace(rel); oldNspOid = RelationGetNamespace(rel);
...@@ -7855,7 +8022,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema, ...@@ -7855,7 +8022,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
if (rel->rd_rel->relkind == RELKIND_RELATION) if (rel->rd_rel->relkind == RELKIND_RELATION)
{ {
AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid); AlterIndexNamespaces(classRel, rel, oldNspOid, nspOid);
AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema); AlterSeqNamespaces(classRel, rel, oldNspOid, nspOid, newschema, lockmode);
AlterConstraintNamespaces(relid, oldNspOid, nspOid, false); AlterConstraintNamespaces(relid, oldNspOid, nspOid, false);
} }
...@@ -7951,7 +8118,7 @@ AlterIndexNamespaces(Relation classRel, Relation rel, ...@@ -7951,7 +8118,7 @@ AlterIndexNamespaces(Relation classRel, Relation rel,
*/ */
static void static void
AlterSeqNamespaces(Relation classRel, Relation rel, AlterSeqNamespaces(Relation classRel, Relation rel,
Oid oldNspOid, Oid newNspOid, const char *newNspName) Oid oldNspOid, Oid newNspOid, const char *newNspName, LOCKMODE lockmode)
{ {
Relation depRel; Relation depRel;
SysScanDesc scan; SysScanDesc scan;
...@@ -7990,13 +8157,13 @@ AlterSeqNamespaces(Relation classRel, Relation rel, ...@@ -7990,13 +8157,13 @@ AlterSeqNamespaces(Relation classRel, Relation rel,
continue; continue;
/* Use relation_open just in case it's an index */ /* Use relation_open just in case it's an index */
seqRel = relation_open(depForm->objid, AccessExclusiveLock); seqRel = relation_open(depForm->objid, lockmode);
/* skip non-sequence relations */ /* skip non-sequence relations */
if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE) if (RelationGetForm(seqRel)->relkind != RELKIND_SEQUENCE)
{ {
/* No need to keep the lock */ /* No need to keep the lock */
relation_close(seqRel, AccessExclusiveLock); relation_close(seqRel, lockmode);
continue; continue;
} }
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/trigger.c,v 1.262 2010/02/26 02:00:39 momjian Exp $ * $PostgreSQL: pgsql/src/backend/commands/trigger.c,v 1.263 2010/07/28 05:22:24 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -141,7 +141,14 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, ...@@ -141,7 +141,14 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
ObjectAddress myself, ObjectAddress myself,
referenced; referenced;
rel = heap_openrv(stmt->relation, AccessExclusiveLock); /*
* ShareRowExclusiveLock is sufficient to prevent concurrent write activity
* to the relation, and thus to lock out any operations that might want to
* fire triggers on the relation. If we had ON SELECT triggers we would
* need to take an AccessExclusiveLock to add one of those, just as we do
* with ON SELECT rules.
*/
rel = heap_openrv(stmt->relation, ShareRowExclusiveLock);
if (rel->rd_rel->relkind != RELKIND_RELATION) if (rel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR, ereport(ERROR,
...@@ -417,7 +424,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString, ...@@ -417,7 +424,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
* can skip this for internally generated triggers, since the name * can skip this for internally generated triggers, since the name
* modification above should be sufficient. * modification above should be sufficient.
* *
* NOTE that this is cool only because we have AccessExclusiveLock on the * NOTE that this is cool only because we have ShareRowExclusiveLock on the
* relation, so the trigger set won't be changing underneath us. * relation, so the trigger set won't be changing underneath us.
*/ */
if (!isInternal) if (!isInternal)
...@@ -1051,11 +1058,14 @@ RemoveTriggerById(Oid trigOid) ...@@ -1051,11 +1058,14 @@ RemoveTriggerById(Oid trigOid)
elog(ERROR, "could not find tuple for trigger %u", trigOid); elog(ERROR, "could not find tuple for trigger %u", trigOid);
/* /*
* Open and exclusive-lock the relation the trigger belongs to. * Open and lock the relation the trigger belongs to. As in
* CreateTrigger, this is sufficient to lock out all operations that
* could fire or add triggers; but it would need to be revisited if
* we had ON SELECT triggers.
*/ */
relid = ((Form_pg_trigger) GETSTRUCT(tup))->tgrelid; relid = ((Form_pg_trigger) GETSTRUCT(tup))->tgrelid;
rel = heap_open(relid, AccessExclusiveLock); rel = heap_open(relid, ShareRowExclusiveLock);
if (rel->rd_rel->relkind != RELKIND_RELATION) if (rel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR, ereport(ERROR,
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/typecmds.c,v 1.149 2010/07/25 23:21:21 rhaas Exp $ * $PostgreSQL: pgsql/src/backend/commands/typecmds.c,v 1.150 2010/07/28 05:22:24 sriggs Exp $
* *
* DESCRIPTION * DESCRIPTION
* The "DefineFoo" routines take the parse tree and pick out the * The "DefineFoo" routines take the parse tree and pick out the
...@@ -2638,7 +2638,7 @@ AlterTypeOwner(List *names, Oid newOwnerId) ...@@ -2638,7 +2638,7 @@ AlterTypeOwner(List *names, Oid newOwnerId)
* AlterTypeOwnerInternal to take care of the pg_type entry(s). * AlterTypeOwnerInternal to take care of the pg_type entry(s).
*/ */
if (typTup->typtype == TYPTYPE_COMPOSITE) if (typTup->typtype == TYPTYPE_COMPOSITE)
ATExecChangeOwner(typTup->typrelid, newOwnerId, true); ATExecChangeOwner(typTup->typrelid, newOwnerId, true, AccessExclusiveLock);
else else
{ {
/* /*
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.40 2010/02/26 02:00:53 momjian Exp $ * $PostgreSQL: pgsql/src/backend/parser/parse_utilcmd.c,v 2.41 2010/07/28 05:22:24 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -53,6 +53,7 @@ ...@@ -53,6 +53,7 @@
#include "parser/parse_utilcmd.h" #include "parser/parse_utilcmd.h"
#include "parser/parser.h" #include "parser/parser.h"
#include "rewrite/rewriteManip.h" #include "rewrite/rewriteManip.h"
#include "storage/lock.h"
#include "utils/acl.h" #include "utils/acl.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
...@@ -1528,7 +1529,7 @@ transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt, ...@@ -1528,7 +1529,7 @@ transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt,
} }
/* /*
* transformIndexStmt - parse analysis for CREATE INDEX * transformIndexStmt - parse analysis for CREATE INDEX and ALTER TABLE
* *
* Note: this is a no-op for an index not using either index expressions or * Note: this is a no-op for an index not using either index expressions or
* a predicate expression. There are several code paths that create indexes * a predicate expression. There are several code paths that create indexes
...@@ -1554,7 +1555,8 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString) ...@@ -1554,7 +1555,8 @@ transformIndexStmt(IndexStmt *stmt, const char *queryString)
* because addRangeTableEntry() would acquire only AccessShareLock, * because addRangeTableEntry() would acquire only AccessShareLock,
* leaving DefineIndex() needing to do a lock upgrade with consequent risk * leaving DefineIndex() needing to do a lock upgrade with consequent risk
* of deadlock. Make sure this stays in sync with the type of lock * of deadlock. Make sure this stays in sync with the type of lock
* DefineIndex() wants. * DefineIndex() wants. If we are being called by ALTER TABLE, we will
* already hold a higher lock.
*/ */
rel = heap_openrv(stmt->relation, rel = heap_openrv(stmt->relation,
(stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock)); (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
...@@ -1919,6 +1921,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) ...@@ -1919,6 +1921,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
List *newcmds = NIL; List *newcmds = NIL;
bool skipValidation = true; bool skipValidation = true;
AlterTableCmd *newcmd; AlterTableCmd *newcmd;
LOCKMODE lockmode;
/* /*
* We must not scribble on the passed-in AlterTableStmt, so copy it. (This * We must not scribble on the passed-in AlterTableStmt, so copy it. (This
...@@ -1927,13 +1930,19 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString) ...@@ -1927,13 +1930,19 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
stmt = (AlterTableStmt *) copyObject(stmt); stmt = (AlterTableStmt *) copyObject(stmt);
/* /*
* Acquire exclusive lock on the target relation, which will be held until * Assign the appropriate lock level for this list of subcommands.
*/
lockmode = AlterTableGetLockLevel(stmt->cmds);
/*
* Acquire appropriate lock on the target relation, which will be held until
* end of transaction. This ensures any decisions we make here based on * end of transaction. This ensures any decisions we make here based on
* the state of the relation will still be good at execution. We must get * the state of the relation will still be good at execution. We must get
* exclusive lock now because execution will; taking a lower grade lock * lock now because execution will later require it; taking a lower grade lock
* now and trying to upgrade later risks deadlock. * now and trying to upgrade later risks deadlock. Any new commands we add
* after this must not upgrade the lock level requested here.
*/ */
rel = relation_openrv(stmt->relation, AccessExclusiveLock); rel = relation_openrv(stmt->relation, lockmode);
/* Set up pstate */ /* Set up pstate */
pstate = make_parsestate(NULL); pstate = make_parsestate(NULL);
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
* *
* *
* IDENTIFICATION * IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/rewrite/rewriteDefine.c,v 1.141 2010/02/14 18:42:15 rhaas Exp $ * $PostgreSQL: pgsql/src/backend/rewrite/rewriteDefine.c,v 1.142 2010/07/28 05:22:24 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -236,11 +236,14 @@ DefineQueryRewrite(char *rulename, ...@@ -236,11 +236,14 @@ DefineQueryRewrite(char *rulename,
/* /*
* If we are installing an ON SELECT rule, we had better grab * If we are installing an ON SELECT rule, we had better grab
* AccessExclusiveLock to ensure no SELECTs are currently running on the * AccessExclusiveLock to ensure no SELECTs are currently running on the
* event relation. For other types of rules, it might be sufficient to * event relation. For other types of rules, it is sufficient to
* grab ShareLock to lock out insert/update/delete actions. But for now, * grab ShareRowExclusiveLock to lock out insert/update/delete actions
* let's just grab AccessExclusiveLock all the time. * and to ensure that we lock out current CREATE RULE statements.
*/ */
event_relation = heap_open(event_relid, AccessExclusiveLock); if (event_type == CMD_SELECT)
event_relation = heap_open(event_relid, AccessExclusiveLock);
else
event_relation = heap_open(event_relid, ShareRowExclusiveLock);
/* /*
* Verify relation is of a type that rules can sensibly be applied to. * Verify relation is of a type that rules can sensibly be applied to.
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
* *
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* *
* $PostgreSQL: pgsql/src/backend/utils/adt/ri_triggers.c,v 1.119 2010/07/22 00:47:52 rhaas Exp $ * $PostgreSQL: pgsql/src/backend/utils/adt/ri_triggers.c,v 1.120 2010/07/28 05:22:24 sriggs Exp $
* *
* ---------- * ----------
*/ */
...@@ -2608,7 +2608,7 @@ RI_FKey_keyequal_upd_fk(Trigger *trigger, Relation fk_rel, ...@@ -2608,7 +2608,7 @@ RI_FKey_keyequal_upd_fk(Trigger *trigger, Relation fk_rel,
* This is not a trigger procedure, but is called during ALTER TABLE * This is not a trigger procedure, but is called during ALTER TABLE
* ADD FOREIGN KEY to validate the initial table contents. * ADD FOREIGN KEY to validate the initial table contents.
* *
* We expect that an exclusive lock has been taken on rel and pkrel; * We expect that a ShareRowExclusiveLock or higher has been taken on rel and pkrel;
* hence, we do not need to lock individual rows for the check. * hence, we do not need to lock individual rows for the check.
* *
* If the check fails because the current user doesn't have permissions * If the check fails because the current user doesn't have permissions
......
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California * Portions Copyright (c) 1994, Regents of the University of California
* *
* $PostgreSQL: pgsql/src/include/commands/tablecmds.h,v 1.46 2010/02/01 19:28:56 rhaas Exp $ * $PostgreSQL: pgsql/src/include/commands/tablecmds.h,v 1.47 2010/07/28 05:22:24 sriggs Exp $
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define TABLECMDS_H #define TABLECMDS_H
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "storage/lock.h"
#include "utils/relcache.h" #include "utils/relcache.h"
...@@ -24,12 +25,14 @@ extern void RemoveRelations(DropStmt *drop); ...@@ -24,12 +25,14 @@ extern void RemoveRelations(DropStmt *drop);
extern void AlterTable(AlterTableStmt *stmt); extern void AlterTable(AlterTableStmt *stmt);
extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing); extern LOCKMODE AlterTableGetLockLevel(List *cmds);
extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lockmode);
extern void AlterTableInternal(Oid relid, List *cmds, bool recurse); extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
extern void AlterTableNamespace(RangeVar *relation, const char *newschema, extern void AlterTableNamespace(RangeVar *relation, const char *newschema,
ObjectType stmttype); ObjectType stmttype, LOCKMODE lockmode);
extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid, extern void AlterRelationNamespaceInternal(Relation classRel, Oid relOid,
Oid oldNspOid, Oid newNspOid, Oid oldNspOid, Oid newNspOid,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment