Commit 88452d5b authored by Tom Lane's avatar Tom Lane

Implement ALTER TABLE ADD UNIQUE/PRIMARY KEY USING INDEX.

This feature allows a unique or pkey constraint to be created using an
already-existing unique index.  While the constraint isn't very
functionally different from the bare index, it's nice to be able to do that
for documentation purposes.  The main advantage over just issuing a plain
ALTER TABLE ADD UNIQUE/PRIMARY KEY is that the index can be created with
CREATE INDEX CONCURRENTLY, so that there is not a long interval where the
table is locked against updates.

On the way, refactor some of the code in DefineIndex() and index_create()
so that we don't have to pass through those functions in order to create
the index constraint's catalog entries.  Also, in parse_utilcmd.c, pass
around the ParseState pointer in struct CreateStmtContext to save on
notation, and add error location pointers to some error reports that didn't
have one before.

Gurjeet Singh, reviewed by Steve Singer and Tom Lane
parent 966d4f52
......@@ -43,6 +43,7 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable>
ALTER [ COLUMN ] <replaceable class="PARAMETER">column</replaceable> RESET ( <replaceable class="PARAMETER">attribute_option</replaceable> [, ... ] )
ALTER [ COLUMN ] <replaceable class="PARAMETER">column</replaceable> SET STORAGE { PLAIN | EXTERNAL | EXTENDED | MAIN }
ADD <replaceable class="PARAMETER">table_constraint</replaceable>
ADD <replaceable class="PARAMETER">table_constraint_using_index</replaceable>
DROP CONSTRAINT [ IF EXISTS ] <replaceable class="PARAMETER">constraint_name</replaceable> [ RESTRICT | CASCADE ]
DISABLE TRIGGER [ <replaceable class="PARAMETER">trigger_name</replaceable> | ALL | USER ]
ENABLE TRIGGER [ <replaceable class="PARAMETER">trigger_name</replaceable> | ALL | USER ]
......@@ -62,6 +63,12 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable>
NO INHERIT <replaceable class="PARAMETER">parent_table</replaceable>
OWNER TO <replaceable class="PARAMETER">new_owner</replaceable>
SET TABLESPACE <replaceable class="PARAMETER">new_tablespace</replaceable>
<phrase>and <replaceable class="PARAMETER">table_constraint_using_index</replaceable> is:</phrase>
[ CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> ]
{ UNIQUE | PRIMARY KEY } USING INDEX <replaceable class="PARAMETER">index_name</replaceable>
[ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ]
</synopsis>
</refsynopsisdiv>
......@@ -229,6 +236,57 @@ ALTER TABLE <replaceable class="PARAMETER">name</replaceable>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>ADD <replaceable class="PARAMETER">table_constraint_using_index</replaceable></literal></term>
<listitem>
<para>
This form adds a new <literal>PRIMARY KEY</> or <literal>UNIQUE</>
constraint to a table based on an existing unique index. All the
columns of the index will be included in the constraint.
</para>
<para>
The index cannot have expression columns nor be a partial index.
Also, it must be a b-tree index with default sort ordering. These
restrictions ensure that the index is equivalent to one that would be
built by a regular <literal>ADD PRIMARY KEY</> or <literal>ADD UNIQUE</>
command.
</para>
<para>
If <literal>PRIMARY KEY</> is specified, and the index's columns are not
already marked <literal>NOT NULL</>, then this command will attempt to
do <literal>ALTER COLUMN SET NOT NULL</> against each such column.
That requires a full table scan to verify the column(s) contain no
nulls. In all other cases, this is a fast operation.
</para>
<para>
If a constraint name is provided then the index will be renamed to match
the constraint name. Otherwise the constraint will be named the same as
the index.
</para>
<para>
After this command is executed, the index is <quote>owned</> by the
constraint, in the same way as if the index had been built by
a regular <literal>ADD PRIMARY KEY</> or <literal>ADD UNIQUE</>
command. In particular, dropping the constraint will make the index
disappear too.
</para>
<note>
<para>
Adding a constraint using an existing index can be helpful in
situations where a new constraint needs to be added without blocking
table updates for a long time. To do that, create the index using
<command>CREATE INDEX CONCURRENTLY</>, and then install it as an
official constraint using this syntax. See the example below.
</para>
</note>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>DROP CONSTRAINT [ IF EXISTS ]</literal></term>
<listitem>
......@@ -920,13 +978,24 @@ ALTER TABLE myschema.distributors SET SCHEMA yourschema;
</programlisting>
</para>
<para>
To recreate a primary key constraint, without blocking updates while the
index is rebuilt:
<programlisting>
CREATE UNIQUE INDEX CONCURRENTLY dist_id_temp_idx on distributors (dist_id);
ALTER TABLE distributors DROP CONSTRAINT distributors_pkey,
ADD CONSTRAINT distributors_pkey PRIMARY KEY USING INDEX dist_id_temp_idx;
</programlisting>
</para>
</refsect1>
<refsect1>
<title>Compatibility</title>
<para>
The forms <literal>ADD</literal>, <literal>DROP</>, <literal>SET DEFAULT</>,
The forms <literal>ADD</literal> (without <literal>USING INDEX</literal>),
<literal>DROP</>, <literal>SET DEFAULT</>,
and <literal>SET DATA TYPE</literal> (without <literal>USING</literal>)
conform with the SQL standard. The other forms are
<productname>PostgreSQL</productname> extensions of the SQL standard.
......@@ -940,4 +1009,12 @@ ALTER TABLE myschema.distributors SET SCHEMA yourschema;
extension of SQL, which disallows zero-column tables.
</para>
</refsect1>
<refsect1>
<title>See Also</title>
<simplelist type="inline">
<member><xref linkend="sql-createtable"></member>
</simplelist>
</refsect1>
</refentry>
......@@ -82,6 +82,7 @@ typedef struct
} v_i_state;
/* non-export function prototypes */
static bool relationHasPrimaryKey(Relation rel);
static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
IndexInfo *indexInfo,
List *indexColNames,
......@@ -117,6 +118,141 @@ static void RemoveReindexPending(Oid indexOid);
static void ResetReindexPending(void);
/*
* relationHasPrimaryKey
* See whether an existing relation has a primary key.
*
* Caller must have suitable lock on the relation.
*/
static bool
relationHasPrimaryKey(Relation rel)
{
bool result = false;
List *indexoidlist;
ListCell *indexoidscan;
/*
* Get the list of index OIDs for the table from the relcache, and look up
* each one in the pg_index syscache until we find one marked primary key
* (hopefully there isn't more than one such).
*/
indexoidlist = RelationGetIndexList(rel);
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirst_oid(indexoidscan);
HeapTuple indexTuple;
indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
if (!HeapTupleIsValid(indexTuple)) /* should not happen */
elog(ERROR, "cache lookup failed for index %u", indexoid);
result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary;
ReleaseSysCache(indexTuple);
if (result)
break;
}
list_free(indexoidlist);
return result;
}
/*
* index_check_primary_key
* Apply special checks needed before creating a PRIMARY KEY index
*
* This processing used to be in DefineIndex(), but has been split out
* so that it can be applied during ALTER TABLE ADD PRIMARY KEY USING INDEX.
*
* We check for a pre-existing primary key, and that all columns of the index
* are simple column references (not expressions), and that all those
* columns are marked NOT NULL. If they aren't (which can only happen during
* ALTER TABLE ADD CONSTRAINT, since the parser forces such columns to be
* created NOT NULL during CREATE TABLE), do an ALTER SET NOT NULL to mark
* them so --- or fail if they are not in fact nonnull.
*
* Caller had better have at least ShareLock on the table, else the not-null
* checking isn't trustworthy.
*/
void
index_check_primary_key(Relation heapRel,
IndexInfo *indexInfo,
bool is_alter_table)
{
List *cmds;
int i;
/*
* If ALTER TABLE, check that there isn't already a PRIMARY KEY. In
* CREATE TABLE, we have faith that the parser rejected multiple pkey
* clauses; and CREATE INDEX doesn't have a way to say PRIMARY KEY, so
* it's no problem either.
*/
if (is_alter_table &&
relationHasPrimaryKey(heapRel))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
RelationGetRelationName(heapRel))));
}
/*
* Check that all of the attributes in a primary key are marked as not
* null, otherwise attempt to ALTER TABLE .. SET NOT NULL
*/
cmds = NIL;
for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
{
AttrNumber attnum = indexInfo->ii_KeyAttrNumbers[i];
HeapTuple atttuple;
Form_pg_attribute attform;
if (attnum == 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("primary keys cannot be expressions")));
/* System attributes are never null, so no need to check */
if (attnum < 0)
continue;
atttuple = SearchSysCache2(ATTNUM,
ObjectIdGetDatum(RelationGetRelid(heapRel)),
Int16GetDatum(attnum));
if (!HeapTupleIsValid(atttuple))
elog(ERROR, "cache lookup failed for attribute %d of relation %u",
attnum, RelationGetRelid(heapRel));
attform = (Form_pg_attribute) GETSTRUCT(atttuple);
if (!attform->attnotnull)
{
/* Add a subcommand to make this one NOT NULL */
AlterTableCmd *cmd = makeNode(AlterTableCmd);
cmd->subtype = AT_SetNotNull;
cmd->name = pstrdup(NameStr(attform->attname));
cmds = lappend(cmds, cmd);
}
ReleaseSysCache(atttuple);
}
/*
* XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade to child
* tables? Currently, since the PRIMARY KEY itself doesn't cascade,
* we don't cascade the notnull constraint(s) either; but this is
* pretty debatable.
*
* XXX: possible future improvement: when being called from ALTER
* TABLE, it would be more efficient to merge this with the outer
* ALTER TABLE, so as to avoid two scans. But that seems to
* complicate DefineIndex's API unduly.
*/
if (cmds)
AlterTableInternal(RelationGetRelid(heapRel), cmds, false);
}
/*
* ConstructTupleDescriptor
*
......@@ -492,7 +628,7 @@ UpdateIndexRelation(Oid indexoid,
/*
* index_create
*
* heapRelationId: OID of table to build index on
* heapRelation: table to build index on (suitably locked by caller)
* indexRelationName: what it say
* indexRelationId: normally, pass InvalidOid to let this routine
* generate an OID for the index. During bootstrap this may be
......@@ -505,7 +641,7 @@ UpdateIndexRelation(Oid indexoid,
* coloptions: array of per-index-column indoption settings
* reloptions: AM-specific options
* isprimary: index is a PRIMARY KEY
* isconstraint: index is owned by a PRIMARY KEY or UNIQUE constraint
* isconstraint: index is owned by PRIMARY KEY, UNIQUE, or EXCLUSION constraint
* deferrable: constraint is DEFERRABLE
* initdeferred: constraint is INITIALLY DEFERRED
* allow_system_table_mods: allow table to be a system catalog
......@@ -518,7 +654,7 @@ UpdateIndexRelation(Oid indexoid,
* Returns the OID of the created index.
*/
Oid
index_create(Oid heapRelationId,
index_create(Relation heapRelation,
const char *indexRelationName,
Oid indexRelationId,
IndexInfo *indexInfo,
......@@ -536,8 +672,8 @@ index_create(Oid heapRelationId,
bool skip_build,
bool concurrent)
{
Oid heapRelationId = RelationGetRelid(heapRelation);
Relation pg_class;
Relation heapRelation;
Relation indexRelation;
TupleDesc indexTupDesc;
bool shared_relation;
......@@ -551,14 +687,6 @@ index_create(Oid heapRelationId,
pg_class = heap_open(RelationRelationId, RowExclusiveLock);
/*
* Only SELECT ... FOR UPDATE/SHARE are allowed while doing a standard
* index build; but for concurrent builds we allow INSERT/UPDATE/DELETE
* (but not VACUUM).
*/
heapRelation = heap_open(heapRelationId,
(concurrent ? ShareUpdateExclusiveLock : ShareLock));
/*
* The index will be in the same namespace as its parent table, and is
* shared across databases if and only if the parent is. Likewise, it
......@@ -734,9 +862,9 @@ index_create(Oid heapRelationId,
* Register constraint and dependencies for the index.
*
* If the index is from a CONSTRAINT clause, construct a pg_constraint
* entry. The index is then linked to the constraint, which in turn is
* linked to the table. If it's not a CONSTRAINT, make the dependency
* directly on the table.
* entry. The index will be linked to the constraint, which in turn is
* linked to the table. If it's not a CONSTRAINT, we need to make a
* dependency directly on the table.
*
* We don't need a dependency on the namespace, because there'll be an
* indirect dependency via our parent table.
......@@ -756,7 +884,6 @@ index_create(Oid heapRelationId,
if (isconstraint)
{
char constraintType;
Oid conOid;
if (isprimary)
constraintType = CONSTRAINT_PRIMARY;
......@@ -770,77 +897,16 @@ index_create(Oid heapRelationId,
constraintType = 0; /* keep compiler quiet */
}
/* primary/unique constraints shouldn't have any expressions */
if (indexInfo->ii_Expressions &&
constraintType != CONSTRAINT_EXCLUSION)
elog(ERROR, "constraints cannot have index expressions");
conOid = CreateConstraintEntry(indexRelationName,
namespaceId,
constraintType,
deferrable,
initdeferred,
heapRelationId,
indexInfo->ii_KeyAttrNumbers,
indexInfo->ii_NumIndexAttrs,
InvalidOid, /* no domain */
indexRelationId, /* index OID */
InvalidOid, /* no foreign key */
NULL,
NULL,
NULL,
NULL,
0,
' ',
' ',
' ',
indexInfo->ii_ExclusionOps,
NULL, /* no check constraint */
NULL,
NULL,
true, /* islocal */
0); /* inhcount */
referenced.classId = ConstraintRelationId;
referenced.objectId = conOid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_INTERNAL);
/*
* If the constraint is deferrable, create the deferred uniqueness
* checking trigger. (The trigger will be given an internal
* dependency on the constraint by CreateTrigger, so there's no
* need to do anything more here.)
*/
if (deferrable)
{
RangeVar *heapRel;
CreateTrigStmt *trigger;
heapRel = makeRangeVar(get_namespace_name(namespaceId),
pstrdup(RelationGetRelationName(heapRelation)),
-1);
trigger = makeNode(CreateTrigStmt);
trigger->trigname = (isprimary ? "PK_ConstraintTrigger" :
"Unique_ConstraintTrigger");
trigger->relation = heapRel;
trigger->funcname = SystemFuncName("unique_key_recheck");
trigger->args = NIL;
trigger->row = true;
trigger->timing = TRIGGER_TYPE_AFTER;
trigger->events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE;
trigger->columns = NIL;
trigger->whenClause = NULL;
trigger->isconstraint = true;
trigger->deferrable = true;
trigger->initdeferred = initdeferred;
trigger->constrrel = NULL;
(void) CreateTrigger(trigger, NULL, conOid, indexRelationId,
true);
}
index_constraint_create(heapRelation,
indexRelationId,
indexInfo,
indexRelationName,
constraintType,
deferrable,
initdeferred,
false, /* already marked primary */
false, /* pg_index entry is OK */
allow_system_table_mods);
}
else
{
......@@ -970,15 +1036,211 @@ index_create(Oid heapRelationId,
}
/*
* Close the heap and index; but we keep the locks that we acquired above
* until end of transaction.
* Close the index; but we keep the lock that we acquired above until end
* of transaction. Closing the heap is caller's responsibility.
*/
index_close(indexRelation, NoLock);
heap_close(heapRelation, NoLock);
return indexRelationId;
}
/*
* index_constraint_create
*
* Set up a constraint associated with an index
*
* heapRelation: table owning the index (must be suitably locked by caller)
* indexRelationId: OID of the index
* indexInfo: same info executor uses to insert into the index
* constraintName: what it say (generally, should match name of index)
* constraintType: one of CONSTRAINT_PRIMARY, CONSTRAINT_UNIQUE, or
* CONSTRAINT_EXCLUSION
* deferrable: constraint is DEFERRABLE
* initdeferred: constraint is INITIALLY DEFERRED
* mark_as_primary: if true, set flags to mark index as primary key
* update_pgindex: if true, update pg_index row (else caller's done that)
* allow_system_table_mods: allow table to be a system catalog
*/
void
index_constraint_create(Relation heapRelation,
Oid indexRelationId,
IndexInfo *indexInfo,
const char *constraintName,
char constraintType,
bool deferrable,
bool initdeferred,
bool mark_as_primary,
bool update_pgindex,
bool allow_system_table_mods)
{
Oid namespaceId = RelationGetNamespace(heapRelation);
ObjectAddress myself,
referenced;
Oid conOid;
/* constraint creation support doesn't work while bootstrapping */
Assert(!IsBootstrapProcessingMode());
/* enforce system-table restriction */
if (!allow_system_table_mods &&
IsSystemRelation(heapRelation) &&
IsNormalProcessingMode())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("user-defined indexes on system catalog tables are not supported")));
/* primary/unique constraints shouldn't have any expressions */
if (indexInfo->ii_Expressions &&
constraintType != CONSTRAINT_EXCLUSION)
elog(ERROR, "constraints cannot have index expressions");
/*
* Construct a pg_constraint entry.
*/
conOid = CreateConstraintEntry(constraintName,
namespaceId,
constraintType,
deferrable,
initdeferred,
RelationGetRelid(heapRelation),
indexInfo->ii_KeyAttrNumbers,
indexInfo->ii_NumIndexAttrs,
InvalidOid, /* no domain */
indexRelationId, /* index OID */
InvalidOid, /* no foreign key */
NULL,
NULL,
NULL,
NULL,
0,
' ',
' ',
' ',
indexInfo->ii_ExclusionOps,
NULL, /* no check constraint */
NULL,
NULL,
true, /* islocal */
0); /* inhcount */
/*
* Register the index as internally dependent on the constraint.
*
* Note that the constraint has a dependency on the table, so when this
* path is taken we do not need any direct dependency from the index to
* the table. (But if one exists, no great harm is done, either. So in
* the case where we're manufacturing a constraint for a pre-existing
* index, we don't bother to try to get rid of the existing index->table
* dependency.)
*/
myself.classId = RelationRelationId;
myself.objectId = indexRelationId;
myself.objectSubId = 0;
referenced.classId = ConstraintRelationId;
referenced.objectId = conOid;
referenced.objectSubId = 0;
recordDependencyOn(&myself, &referenced, DEPENDENCY_INTERNAL);
/*
* If the constraint is deferrable, create the deferred uniqueness
* checking trigger. (The trigger will be given an internal
* dependency on the constraint by CreateTrigger.)
*/
if (deferrable)
{
RangeVar *heapRel;
CreateTrigStmt *trigger;
heapRel = makeRangeVar(get_namespace_name(namespaceId),
pstrdup(RelationGetRelationName(heapRelation)),
-1);
trigger = makeNode(CreateTrigStmt);
trigger->trigname = (constraintType == CONSTRAINT_PRIMARY) ?
"PK_ConstraintTrigger" :
"Unique_ConstraintTrigger";
trigger->relation = heapRel;
trigger->funcname = SystemFuncName("unique_key_recheck");
trigger->args = NIL;
trigger->row = true;
trigger->timing = TRIGGER_TYPE_AFTER;
trigger->events = TRIGGER_TYPE_INSERT | TRIGGER_TYPE_UPDATE;
trigger->columns = NIL;
trigger->whenClause = NULL;
trigger->isconstraint = true;
trigger->deferrable = true;
trigger->initdeferred = initdeferred;
trigger->constrrel = NULL;
(void) CreateTrigger(trigger, NULL, conOid, indexRelationId, true);
}
/*
* If needed, mark the table as having a primary key. We assume it can't
* have been so marked already, so no need to clear the flag in the other
* case.
*
* Note: this might better be done by callers. We do it here to avoid
* exposing index_update_stats() globally, but that wouldn't be necessary
* if relhaspkey went away.
*/
if (mark_as_primary)
index_update_stats(heapRelation,
true,
true,
false,
InvalidOid,
heapRelation->rd_rel->reltuples);
/*
* If needed, mark the index as primary and/or deferred in pg_index.
*
* Note: since this is a transactional update, it's unsafe against
* concurrent SnapshotNow scans of pg_index. When making an existing
* index into a constraint, caller must have a table lock that prevents
* concurrent table updates, and there is a risk that concurrent readers
* of the table will miss seeing this index at all.
*/
if (update_pgindex && (mark_as_primary || deferrable))
{
Relation pg_index;
HeapTuple indexTuple;
Form_pg_index indexForm;
bool dirty = false;
pg_index = heap_open(IndexRelationId, RowExclusiveLock);
indexTuple = SearchSysCacheCopy1(INDEXRELID,
ObjectIdGetDatum(indexRelationId));
if (!HeapTupleIsValid(indexTuple))
elog(ERROR, "cache lookup failed for index %u", indexRelationId);
indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
if (mark_as_primary && !indexForm->indisprimary)
{
indexForm->indisprimary = true;
dirty = true;
}
if (deferrable && indexForm->indimmediate)
{
indexForm->indimmediate = false;
dirty = true;
}
if (dirty)
{
simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
CatalogUpdateIndexes(pg_index, indexTuple);
}
heap_freetuple(indexTuple);
heap_close(pg_index, RowExclusiveLock);
}
}
/*
* index_drop
*
......
......@@ -115,6 +115,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio
TupleDesc tupdesc;
bool shared_relation;
bool mapped_relation;
Relation toast_rel;
Relation class_rel;
Oid toast_relid;
Oid toast_idxid;
......@@ -229,9 +230,12 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio
false);
Assert(toast_relid != InvalidOid);
/* make the toast relation visible, else index creation will fail */
/* make the toast relation visible, else heap_open will fail */
CommandCounterIncrement();
/* ShareLock is not really needed here, but take it anyway */
toast_rel = heap_open(toast_relid, ShareLock);
/*
* Create unique index on chunk_id, chunk_seq.
*
......@@ -266,7 +270,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio
coloptions[0] = 0;
coloptions[1] = 0;
toast_idxid = index_create(toast_relid, toast_idxname, toastIndexOid,
toast_idxid = index_create(toast_rel, toast_idxname, toastIndexOid,
indexInfo,
list_make2("chunk_id", "chunk_seq"),
BTREE_AM_OID,
......@@ -275,6 +279,8 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid, Datum reloptio
true, false, false, false,
true, false, false);
heap_close(toast_rel, NoLock);
/*
* Store the toast table's OID in the parent relation's pg_class row
*/
......
......@@ -69,7 +69,6 @@ static void ComputeIndexAttrs(IndexInfo *indexInfo,
static Oid GetIndexOpClass(List *opclass, Oid attrType,
char *accessMethodName, Oid accessMethodId);
static char *ChooseIndexNameAddition(List *colnames);
static bool relationHasPrimaryKey(Relation rel);
/*
......@@ -320,92 +319,6 @@ DefineIndex(RangeVar *heapRelation,
if (predicate)
CheckPredicate(predicate);
/*
* Extra checks when creating a PRIMARY KEY index.
*/
if (primary)
{
List *cmds;
ListCell *keys;
/*
* If ALTER TABLE, check that there isn't already a PRIMARY KEY. In
* CREATE TABLE, we have faith that the parser rejected multiple pkey
* clauses; and CREATE INDEX doesn't have a way to say PRIMARY KEY, so
* it's no problem either.
*/
if (is_alter_table &&
relationHasPrimaryKey(rel))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
RelationGetRelationName(rel))));
}
/*
* Check that all of the attributes in a primary key are marked as not
* null, otherwise attempt to ALTER TABLE .. SET NOT NULL
*/
cmds = NIL;
foreach(keys, attributeList)
{
IndexElem *key = (IndexElem *) lfirst(keys);
HeapTuple atttuple;
if (!key->name)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("primary keys cannot be expressions")));
/* System attributes are never null, so no problem */
if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids))
continue;
atttuple = SearchSysCacheAttName(relationId, key->name);
if (HeapTupleIsValid(atttuple))
{
if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
{
/* Add a subcommand to make this one NOT NULL */
AlterTableCmd *cmd = makeNode(AlterTableCmd);
cmd->subtype = AT_SetNotNull;
cmd->name = key->name;
cmds = lappend(cmds, cmd);
}
ReleaseSysCache(atttuple);
}
else
{
/*
* This shouldn't happen during CREATE TABLE, but can happen
* during ALTER TABLE. Keep message in sync with
* transformIndexConstraints() in parser/parse_utilcmd.c.
*/
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" named in key does not exist",
key->name)));
}
}
/*
* XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade to child
* tables? Currently, since the PRIMARY KEY itself doesn't cascade,
* we don't cascade the notnull constraint(s) either; but this is
* pretty debatable.
*
* XXX: possible future improvement: when being called from ALTER
* TABLE, it would be more efficient to merge this with the outer
* ALTER TABLE, so as to avoid two scans. But that seems to
* complicate DefineIndex's API unduly.
*/
if (cmds)
AlterTableInternal(relationId, cmds, false);
}
/*
* Parse AM-specific options, convert to text array form, validate.
*/
......@@ -439,6 +352,12 @@ DefineIndex(RangeVar *heapRelation,
accessMethodName, accessMethodId,
amcanorder, isconstraint);
/*
* Extra checks when creating a PRIMARY KEY index.
*/
if (primary)
index_check_primary_key(rel, indexInfo, is_alter_table);
/*
* Report index creation if appropriate (delay this till after most of the
* error checks)
......@@ -466,17 +385,12 @@ DefineIndex(RangeVar *heapRelation,
indexRelationName, RelationGetRelationName(rel))));
}
/* save lockrelid and locktag for below, then close rel */
heaprelid = rel->rd_lockInfo.lockRelId;
SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
heap_close(rel, NoLock);
/*
* Make the catalog entries for the index, including constraints. Then, if
* not skip_build || concurrent, actually build the index.
*/
indexRelationId =
index_create(relationId, indexRelationName, indexRelationId,
index_create(rel, indexRelationName, indexRelationId,
indexInfo, indexColNames,
accessMethodId, tablespaceId, classObjectId,
coloptions, reloptions, primary,
......@@ -486,7 +400,16 @@ DefineIndex(RangeVar *heapRelation,
concurrent);
if (!concurrent)
return; /* We're done, in the standard case */
{
/* Close the heap and we're done, in the non-concurrent case */
heap_close(rel, NoLock);
return;
}
/* save lockrelid and locktag for below, then close rel */
heaprelid = rel->rd_lockInfo.lockRelId;
SET_LOCKTAG_RELATION(heaplocktag, heaprelid.dbId, heaprelid.relId);
heap_close(rel, NoLock);
/*
* For a concurrent build, it's important to make the catalog entries
......@@ -1531,44 +1454,6 @@ ChooseIndexColumnNames(List *indexElems)
return result;
}
/*
* relationHasPrimaryKey -
*
* See whether an existing relation has a primary key.
*/
static bool
relationHasPrimaryKey(Relation rel)
{
bool result = false;
List *indexoidlist;
ListCell *indexoidscan;
/*
* Get the list of index OIDs for the table from the relcache, and look up
* each one in the pg_index syscache until we find one marked primary key
* (hopefully there isn't more than one such).
*/
indexoidlist = RelationGetIndexList(rel);
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirst_oid(indexoidscan);
HeapTuple indexTuple;
indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
if (!HeapTupleIsValid(indexTuple)) /* should not happen */
elog(ERROR, "cache lookup failed for index %u", indexoid);
result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary;
ReleaseSysCache(indexTuple);
if (result)
break;
}
list_free(indexoidlist);
return result;
}
/*
* ReindexIndex
* Recreate a specific index.
......
......@@ -319,6 +319,8 @@ static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
static void ATExecAddConstraint(List **wqueue,
AlteredTableInfo *tab, Relation rel,
Constraint *newConstraint, bool recurse, LOCKMODE lockmode);
static void ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, LOCKMODE lockmode);
static void ATAddCheckConstraint(List **wqueue,
AlteredTableInfo *tab, Relation rel,
Constraint *constr,
......@@ -2594,6 +2596,7 @@ AlterTableGetLockLevel(List *cmds)
case AT_DisableTrigAll:
case AT_DisableTrigUser:
case AT_AddIndex: /* from ADD CONSTRAINT */
case AT_AddIndexConstraint:
cmd_lockmode = ShareRowExclusiveLock;
break;
......@@ -2811,6 +2814,12 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
cmd->subtype = AT_AddConstraintRecurse;
pass = AT_PASS_ADD_CONSTR;
break;
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATSimplePermissions(rel, ATT_TABLE);
/* This command never recurses */
/* No command-specific prep needed */
pass = AT_PASS_ADD_CONSTR;
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATSimplePermissions(rel, ATT_TABLE);
/* Recursion occurs during execution phase */
......@@ -3042,6 +3051,9 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
ATExecAddConstraint(wqueue, tab, rel, (Constraint *) cmd->def,
true, lockmode);
break;
case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */
ATExecAddIndexConstraint(tab, rel, (IndexStmt *) cmd->def, lockmode);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
false, false,
......@@ -5009,6 +5021,76 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
false);
}
/*
* ALTER TABLE ADD CONSTRAINT USING INDEX
*/
static void
ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel,
IndexStmt *stmt, LOCKMODE lockmode)
{
Oid index_oid = stmt->indexOid;
Relation indexRel;
char *indexName;
IndexInfo *indexInfo;
char *constraintName;
char constraintType;
Assert(IsA(stmt, IndexStmt));
Assert(OidIsValid(index_oid));
Assert(stmt->isconstraint);
indexRel = index_open(index_oid, AccessShareLock);
indexName = pstrdup(RelationGetRelationName(indexRel));
indexInfo = BuildIndexInfo(indexRel);
/* this should have been checked at parse time */
if (!indexInfo->ii_Unique)
elog(ERROR, "index \"%s\" is not unique", indexName);
/*
* Determine name to assign to constraint. We require a constraint to
* have the same name as the underlying index; therefore, use the index's
* existing name as the default constraint name, and if the user explicitly
* gives some other name for the constraint, rename the index to match.
*/
constraintName = stmt->idxname;
if (constraintName == NULL)
constraintName = indexName;
else if (strcmp(constraintName, indexName) != 0)
{
ereport(NOTICE,
(errmsg("ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index \"%s\" to \"%s\"",
indexName, constraintName)));
RenameRelation(index_oid, constraintName, OBJECT_INDEX);
}
/* Extra checks needed if making primary key */
if (stmt->primary)
index_check_primary_key(rel, indexInfo, true);
/* Note we currently don't support EXCLUSION constraints here */
if (stmt->primary)
constraintType = CONSTRAINT_PRIMARY;
else
constraintType = CONSTRAINT_UNIQUE;
/* Create the catalog entries for the constraint */
index_constraint_create(rel,
index_oid,
indexInfo,
constraintName,
constraintType,
stmt->deferrable,
stmt->initdeferred,
stmt->primary,
true,
allowSystemTableMods);
index_close(indexRel, NoLock);
}
/*
* ALTER TABLE ADD CONSTRAINT
*/
......
......@@ -2233,6 +2233,7 @@ _copyConstraint(Constraint *from)
COPY_NODE_FIELD(keys);
COPY_NODE_FIELD(exclusions);
COPY_NODE_FIELD(options);
COPY_STRING_FIELD(indexname);
COPY_STRING_FIELD(indexspace);
COPY_STRING_FIELD(access_method);
COPY_NODE_FIELD(where_clause);
......@@ -2705,6 +2706,7 @@ _copyIndexStmt(IndexStmt *from)
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(whereClause);
COPY_NODE_FIELD(excludeOpNames);
COPY_SCALAR_FIELD(indexOid);
COPY_SCALAR_FIELD(unique);
COPY_SCALAR_FIELD(primary);
COPY_SCALAR_FIELD(isconstraint);
......
......@@ -1212,6 +1212,7 @@ _equalIndexStmt(IndexStmt *a, IndexStmt *b)
COMPARE_NODE_FIELD(options);
COMPARE_NODE_FIELD(whereClause);
COMPARE_NODE_FIELD(excludeOpNames);
COMPARE_SCALAR_FIELD(indexOid);
COMPARE_SCALAR_FIELD(unique);
COMPARE_SCALAR_FIELD(primary);
COMPARE_SCALAR_FIELD(isconstraint);
......@@ -2181,6 +2182,7 @@ _equalConstraint(Constraint *a, Constraint *b)
COMPARE_NODE_FIELD(keys);
COMPARE_NODE_FIELD(exclusions);
COMPARE_NODE_FIELD(options);
COMPARE_STRING_FIELD(indexname);
COMPARE_STRING_FIELD(indexspace);
COMPARE_STRING_FIELD(access_method);
COMPARE_NODE_FIELD(where_clause);
......
......@@ -1877,6 +1877,7 @@ _outIndexStmt(StringInfo str, IndexStmt *node)
WRITE_NODE_FIELD(options);
WRITE_NODE_FIELD(whereClause);
WRITE_NODE_FIELD(excludeOpNames);
WRITE_OID_FIELD(indexOid);
WRITE_BOOL_FIELD(unique);
WRITE_BOOL_FIELD(primary);
WRITE_BOOL_FIELD(isconstraint);
......@@ -2474,6 +2475,7 @@ _outConstraint(StringInfo str, Constraint *node)
appendStringInfo(str, "PRIMARY_KEY");
WRITE_NODE_FIELD(keys);
WRITE_NODE_FIELD(options);
WRITE_STRING_FIELD(indexname);
WRITE_STRING_FIELD(indexspace);
/* access_method and where_clause not currently used */
break;
......@@ -2482,6 +2484,7 @@ _outConstraint(StringInfo str, Constraint *node)
appendStringInfo(str, "UNIQUE");
WRITE_NODE_FIELD(keys);
WRITE_NODE_FIELD(options);
WRITE_STRING_FIELD(indexname);
WRITE_STRING_FIELD(indexspace);
/* access_method and where_clause not currently used */
break;
......@@ -2490,6 +2493,7 @@ _outConstraint(StringInfo str, Constraint *node)
appendStringInfo(str, "EXCLUSION");
WRITE_NODE_FIELD(exclusions);
WRITE_NODE_FIELD(options);
WRITE_STRING_FIELD(indexname);
WRITE_STRING_FIELD(indexspace);
WRITE_STRING_FIELD(access_method);
WRITE_NODE_FIELD(where_clause);
......
......@@ -422,6 +422,7 @@ static RangeVar *makeRangeVarFromAnyName(List *names, int position, core_yyscan_
%type <ival> key_actions key_delete key_match key_update key_action
%type <ival> ConstraintAttributeSpec ConstraintDeferrabilitySpec
ConstraintTimeSpec
%type <str> ExistingIndex
%type <list> constraints_set_list
%type <boolean> constraints_set_mode
......@@ -2501,6 +2502,7 @@ ColConstraintElem:
n->location = @1;
n->keys = NULL;
n->options = $2;
n->indexname = NULL;
n->indexspace = $3;
$$ = (Node *)n;
}
......@@ -2511,6 +2513,7 @@ ColConstraintElem:
n->location = @1;
n->keys = NULL;
n->options = $3;
n->indexname = NULL;
n->indexspace = $4;
$$ = (Node *)n;
}
......@@ -2665,11 +2668,25 @@ ConstraintElem:
n->location = @1;
n->keys = $3;
n->options = $5;
n->indexname = NULL;
n->indexspace = $6;
n->deferrable = ($7 & 1) != 0;
n->initdeferred = ($7 & 2) != 0;
$$ = (Node *)n;
}
| UNIQUE ExistingIndex ConstraintAttributeSpec
{
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_UNIQUE;
n->location = @1;
n->keys = NIL;
n->options = NIL;
n->indexname = $2;
n->indexspace = NULL;
n->deferrable = ($3 & 1) != 0;
n->initdeferred = ($3 & 2) != 0;
$$ = (Node *)n;
}
| PRIMARY KEY '(' columnList ')' opt_definition OptConsTableSpace
ConstraintAttributeSpec
{
......@@ -2678,11 +2695,25 @@ ConstraintElem:
n->location = @1;
n->keys = $4;
n->options = $6;
n->indexname = NULL;
n->indexspace = $7;
n->deferrable = ($8 & 1) != 0;
n->initdeferred = ($8 & 2) != 0;
$$ = (Node *)n;
}
| PRIMARY KEY ExistingIndex ConstraintAttributeSpec
{
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_PRIMARY;
n->location = @1;
n->keys = NIL;
n->options = NIL;
n->indexname = $3;
n->indexspace = NULL;
n->deferrable = ($4 & 1) != 0;
n->initdeferred = ($4 & 2) != 0;
$$ = (Node *)n;
}
| EXCLUDE access_method_clause '(' ExclusionConstraintList ')'
opt_definition OptConsTableSpace ExclusionWhereClause
ConstraintAttributeSpec
......@@ -2693,6 +2724,7 @@ ConstraintElem:
n->access_method = $2;
n->exclusions = $4;
n->options = $6;
n->indexname = NULL;
n->indexspace = $7;
n->where_clause = $8;
n->deferrable = ($9 & 1) != 0;
......@@ -2837,6 +2869,9 @@ OptConsTableSpace: USING INDEX TABLESPACE name { $$ = $4; }
| /*EMPTY*/ { $$ = NULL; }
;
ExistingIndex: USING INDEX index_name { $$ = $3; }
;
/*
* Note: CREATE TABLE ... AS SELECT ... is just another spelling for
......@@ -5230,6 +5265,7 @@ IndexStmt: CREATE opt_unique INDEX opt_concurrently opt_index_name
n->options = $12;
n->tableSpace = $13;
n->whereClause = $14;
n->indexOid = InvalidOid;
$$ = (Node *)n;
}
;
......
......@@ -65,6 +65,7 @@
/* State shared by transformCreateStmt and its subroutines */
typedef struct
{
ParseState *pstate; /* overall parser state */
const char *stmtType; /* "CREATE [FOREIGN] TABLE" or "ALTER TABLE" */
RangeVar *relation; /* relation to create */
Relation rel; /* opened/locked rel, if ALTER */
......@@ -98,30 +99,27 @@ typedef struct
} CreateSchemaStmtContext;
static void transformColumnDefinition(ParseState *pstate,
CreateStmtContext *cxt,
static void transformColumnDefinition(CreateStmtContext *cxt,
ColumnDef *column);
static void transformTableConstraint(ParseState *pstate,
CreateStmtContext *cxt,
static void transformTableConstraint(CreateStmtContext *cxt,
Constraint *constraint);
static void transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
static void transformInhRelation(CreateStmtContext *cxt,
InhRelation *inhrelation);
static void transformOfType(ParseState *pstate, CreateStmtContext *cxt,
static void transformOfType(CreateStmtContext *cxt,
TypeName *ofTypename);
static char *chooseIndexName(const RangeVar *relation, IndexStmt *index_stmt);
static IndexStmt *generateClonedIndexStmt(CreateStmtContext *cxt,
Relation parent_index, AttrNumber *attmap);
static List *get_opclass(Oid opclass, Oid actual_datatype);
static void transformIndexConstraints(ParseState *pstate,
CreateStmtContext *cxt);
static void transformIndexConstraints(CreateStmtContext *cxt);
static IndexStmt *transformIndexConstraint(Constraint *constraint,
CreateStmtContext *cxt);
static void transformFKConstraints(ParseState *pstate,
CreateStmtContext *cxt,
static void transformFKConstraints(CreateStmtContext *cxt,
bool skipValidation,
bool isAddConstraint);
static void transformConstraintAttrs(ParseState *pstate, List *constraintList);
static void transformColumnType(ParseState *pstate, ColumnDef *column);
static void transformConstraintAttrs(CreateStmtContext *cxt,
List *constraintList);
static void transformColumnType(CreateStmtContext *cxt, ColumnDef *column);
static void setSchemaName(char *context_schema, char **stmt_schema_name);
......@@ -169,10 +167,11 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
stmt->relation->schemaname = get_namespace_name(namespaceid);
}
/* Set up pstate */
/* Set up pstate and CreateStmtContext */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
cxt.pstate = pstate;
if (IsA(stmt, CreateForeignTableStmt))
cxt.stmtType = "CREATE FOREIGN TABLE";
else
......@@ -194,7 +193,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
Assert(!stmt->ofTypename || !stmt->inhRelations); /* grammar enforces */
if (stmt->ofTypename)
transformOfType(pstate, &cxt, stmt->ofTypename);
transformOfType(&cxt, stmt->ofTypename);
/*
* Run through each primary element in the table creation clause. Separate
......@@ -207,18 +206,15 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
switch (nodeTag(element))
{
case T_ColumnDef:
transformColumnDefinition(pstate, &cxt,
(ColumnDef *) element);
transformColumnDefinition(&cxt, (ColumnDef *) element);
break;
case T_Constraint:
transformTableConstraint(pstate, &cxt,
(Constraint *) element);
transformTableConstraint(&cxt, (Constraint *) element);
break;
case T_InhRelation:
transformInhRelation(pstate, &cxt,
(InhRelation *) element);
transformInhRelation(&cxt, (InhRelation *) element);
break;
default:
......@@ -240,12 +236,12 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
/*
* Postprocess constraints that give rise to index definitions.
*/
transformIndexConstraints(pstate, &cxt);
transformIndexConstraints(&cxt);
/*
* Postprocess foreign-key constraints.
*/
transformFKConstraints(pstate, &cxt, true, false);
transformFKConstraints(&cxt, true, false);
/*
* Output results.
......@@ -266,8 +262,7 @@ transformCreateStmt(CreateStmt *stmt, const char *queryString)
* Also used in ALTER TABLE ADD COLUMN
*/
static void
transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
ColumnDef *column)
transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
{
bool is_serial;
bool saw_nullable;
......@@ -309,12 +304,13 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("array of serial is not implemented"),
parser_errposition(pstate, column->typeName->location)));
parser_errposition(cxt->pstate,
column->typeName->location)));
}
/* Do necessary work on the column type declaration */
if (column->typeName)
transformColumnType(pstate, column);
transformColumnType(cxt, column);
/* Special actions for SERIAL pseudo-types */
if (is_serial)
......@@ -437,7 +433,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
}
/* Process column constraints, if any... */
transformConstraintAttrs(pstate, column->constraints);
transformConstraintAttrs(cxt, column->constraints);
saw_nullable = false;
saw_default = false;
......@@ -455,7 +451,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname),
parser_errposition(pstate,
parser_errposition(cxt->pstate,
constraint->location)));
column->is_not_null = FALSE;
saw_nullable = true;
......@@ -467,7 +463,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting NULL/NOT NULL declarations for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname),
parser_errposition(pstate,
parser_errposition(cxt->pstate,
constraint->location)));
column->is_not_null = TRUE;
saw_nullable = true;
......@@ -479,7 +475,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple default values specified for column \"%s\" of table \"%s\"",
column->colname, cxt->relation->relname),
parser_errposition(pstate,
parser_errposition(cxt->pstate,
constraint->location)));
column->raw_default = constraint->raw_expr;
Assert(constraint->cooked_expr == NULL);
......@@ -532,8 +528,7 @@ transformColumnDefinition(ParseState *pstate, CreateStmtContext *cxt,
* transform a Constraint node within CREATE TABLE or ALTER TABLE
*/
static void
transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt,
Constraint *constraint)
transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
{
switch (constraint->contype)
{
......@@ -577,8 +572,7 @@ transformTableConstraint(ParseState *pstate, CreateStmtContext *cxt,
* <subtable>.
*/
static void
transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
InhRelation *inhRelation)
transformInhRelation(CreateStmtContext *cxt, InhRelation *inhRelation)
{
AttrNumber parent_attno;
Relation relation;
......@@ -587,7 +581,8 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
AclResult aclresult;
char *comment;
relation = parserOpenTable(pstate, inhRelation->relation, AccessShareLock);
relation = parserOpenTable(cxt->pstate, inhRelation->relation,
AccessShareLock);
if (relation->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
......@@ -816,7 +811,7 @@ transformInhRelation(ParseState *pstate, CreateStmtContext *cxt,
}
static void
transformOfType(ParseState *pstate, CreateStmtContext *cxt, TypeName *ofTypename)
transformOfType(CreateStmtContext *cxt, TypeName *ofTypename)
{
HeapTuple tuple;
Form_pg_type typ;
......@@ -937,6 +932,7 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
index->tableSpace = get_tablespace_name(idxrelrec->reltablespace);
else
index->tableSpace = NULL;
index->indexOid = InvalidOid;
index->unique = idxrec->indisunique;
index->primary = idxrec->indisprimary;
index->concurrent = false;
......@@ -1181,7 +1177,7 @@ get_opclass(Oid opclass, Oid actual_datatype)
* LIKE ... INCLUDING INDEXES.
*/
static void
transformIndexConstraints(ParseState *pstate, CreateStmtContext *cxt)
transformIndexConstraints(CreateStmtContext *cxt)
{
IndexStmt *index;
List *indexlist = NIL;
......@@ -1304,7 +1300,8 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("multiple primary keys for table \"%s\" are not allowed",
cxt->relation->relname)));
cxt->relation->relname),
parser_errposition(cxt->pstate, constraint->location)));
cxt->pkey = index;
/*
......@@ -1328,8 +1325,182 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
index->whereClause = constraint->where_clause;
index->indexParams = NIL;
index->excludeOpNames = NIL;
index->indexOid = InvalidOid;
index->concurrent = false;
/*
* If it's ALTER TABLE ADD CONSTRAINT USING INDEX, look up the index and
* verify it's usable, then extract the implied column name list. (We
* will not actually need the column name list at runtime, but we need
* it now to check for duplicate column entries below.)
*/
if (constraint->indexname != NULL)
{
char *index_name = constraint->indexname;
Relation heap_rel = cxt->rel;
Oid index_oid;
Relation index_rel;
Form_pg_index index_form;
oidvector *indclass;
Datum indclassDatum;
bool isnull;
int i;
/* Grammar should not allow this with explicit column list */
Assert(constraint->keys == NIL);
/* Grammar should only allow PRIMARY and UNIQUE constraints */
Assert(constraint->contype == CONSTR_PRIMARY ||
constraint->contype == CONSTR_UNIQUE);
/* Must be ALTER, not CREATE, but grammar doesn't enforce that */
if (!cxt->isalter)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use an existing index in CREATE TABLE"),
parser_errposition(cxt->pstate, constraint->location)));
/* Look for the index in the same schema as the table */
index_oid = get_relname_relid(index_name, RelationGetNamespace(heap_rel));
if (!OidIsValid(index_oid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("index \"%s\" does not exist", index_name),
parser_errposition(cxt->pstate, constraint->location)));
/* Open the index (this will throw an error if it is not an index) */
index_rel = index_open(index_oid, AccessShareLock);
index_form = index_rel->rd_index;
/* Check that it does not have an associated constraint already */
if (OidIsValid(get_index_constraint(index_oid)))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("index \"%s\" is already associated with a constraint",
index_name),
parser_errposition(cxt->pstate, constraint->location)));
/* Perform validity checks on the index */
if (index_form->indrelid != RelationGetRelid(heap_rel))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("index \"%s\" does not belong to table \"%s\"",
index_name, RelationGetRelationName(heap_rel)),
parser_errposition(cxt->pstate, constraint->location)));
if (!index_form->indisvalid)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("index \"%s\" is not valid", index_name),
parser_errposition(cxt->pstate, constraint->location)));
if (!index_form->indisready)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("index \"%s\" is not ready", index_name),
parser_errposition(cxt->pstate, constraint->location)));
if (!index_form->indisunique)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not a unique index", index_name),
errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
parser_errposition(cxt->pstate, constraint->location)));
if (RelationGetIndexExpressions(index_rel) != NIL)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("index \"%s\" contains expressions", index_name),
errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
parser_errposition(cxt->pstate, constraint->location)));
if (RelationGetIndexPredicate(index_rel) != NIL)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a partial index", index_name),
errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
parser_errposition(cxt->pstate, constraint->location)));
/*
* It's probably unsafe to change a deferred index to non-deferred.
* (A non-constraint index couldn't be deferred anyway, so this case
* should never occur; no need to sweat, but let's check it.)
*/
if (!index_form->indimmediate && !constraint->deferrable)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a deferrable index", index_name),
errdetail("Cannot create a non-deferrable constraint using a deferrable index."),
parser_errposition(cxt->pstate, constraint->location)));
/*
* Insist on it being a btree. That's the only kind that supports
* uniqueness at the moment anyway; but we must have an index that
* exactly matches what you'd get from plain ADD CONSTRAINT syntax,
* else dump and reload will produce a different index (breaking
* pg_upgrade in particular).
*/
if (index_rel->rd_rel->relam != get_am_oid(DEFAULT_INDEX_TYPE, false))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("index \"%s\" is not a b-tree", index_name),
parser_errposition(cxt->pstate, constraint->location)));
/* Must get indclass the hard way */
indclassDatum = SysCacheGetAttr(INDEXRELID, index_rel->rd_indextuple,
Anum_pg_index_indclass, &isnull);
Assert(!isnull);
indclass = (oidvector *) DatumGetPointer(indclassDatum);
for (i = 0; i < index_form->indnatts; i++)
{
int2 attnum = index_form->indkey.values[i];
Form_pg_attribute attform;
char *attname;
Oid defopclass;
/*
* We shouldn't see attnum == 0 here, since we already rejected
* expression indexes. If we do, SystemAttributeDefinition
* will throw an error.
*/
if (attnum > 0)
{
Assert(attnum <= heap_rel->rd_att->natts);
attform = heap_rel->rd_att->attrs[attnum - 1];
}
else
attform = SystemAttributeDefinition(attnum,
heap_rel->rd_rel->relhasoids);
attname = pstrdup(NameStr(attform->attname));
/*
* Insist on default opclass and sort options. While the index
* would still work as a constraint with non-default settings, it
* might not provide exactly the same uniqueness semantics as
* you'd get from a normally-created constraint; and there's also
* the dump/reload problem mentioned above.
*/
defopclass = GetDefaultOpClass(attform->atttypid,
index_rel->rd_rel->relam);
if (indclass->values[i] != defopclass ||
index_rel->rd_indoption[i] != 0)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("index \"%s\" does not have default sorting behavior", index_name),
errdetail("Cannot create a PRIMARY KEY or UNIQUE constraint using such an index."),
parser_errposition(cxt->pstate, constraint->location)));
constraint->keys = lappend(constraint->keys, makeString(attname));
}
/* Close the index relation but keep the lock */
relation_close(index_rel, NoLock);
index->indexOid = index_oid;
}
/*
* If it's an EXCLUDE constraint, the grammar returns a list of pairs of
* IndexElems and operator names. We have to break that apart into
......@@ -1450,8 +1621,8 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
if (!found && !cxt->isalter)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" named in key does not exist",
key)));
errmsg("column \"%s\" named in key does not exist", key),
parser_errposition(cxt->pstate, constraint->location)));
/* Check for PRIMARY KEY(foo, foo) */
foreach(columns, index->indexParams)
......@@ -1463,12 +1634,14 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" appears twice in primary key constraint",
key)));
key),
parser_errposition(cxt->pstate, constraint->location)));
else
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" appears twice in unique constraint",
key)));
key),
parser_errposition(cxt->pstate, constraint->location)));
}
}
......@@ -1491,7 +1664,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
* handle FOREIGN KEY constraints
*/
static void
transformFKConstraints(ParseState *pstate, CreateStmtContext *cxt,
transformFKConstraints(CreateStmtContext *cxt,
bool skipValidation, bool isAddConstraint)
{
ListCell *fkclist;
......@@ -1978,7 +2151,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
stmt = (AlterTableStmt *) copyObject(stmt);
/*
* Assign the appropriate lock level for this list of subcommands.
* Determine the appropriate lock level for this list of subcommands.
*/
lockmode = AlterTableGetLockLevel(stmt->cmds);
......@@ -1992,10 +2165,11 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
*/
rel = relation_openrv(stmt->relation, lockmode);
/* Set up pstate */
/* Set up pstate and CreateStmtContext */
pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
cxt.pstate = pstate;
cxt.stmtType = "ALTER TABLE";
cxt.relation = stmt->relation;
cxt.rel = rel;
......@@ -2028,7 +2202,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
ColumnDef *def = (ColumnDef *) cmd->def;
Assert(IsA(def, ColumnDef));
transformColumnDefinition(pstate, &cxt, def);
transformColumnDefinition(&cxt, def);
/*
* If the column has a non-null default, we can't skip
......@@ -2053,8 +2227,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
*/
if (IsA(cmd->def, Constraint))
{
transformTableConstraint(pstate, &cxt,
(Constraint *) cmd->def);
transformTableConstraint(&cxt, (Constraint *) cmd->def);
if (((Constraint *) cmd->def)->contype == CONSTR_FOREIGN)
skipValidation = false;
}
......@@ -2088,25 +2261,25 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
cxt.alist = NIL;
/* Postprocess index and FK constraints */
transformIndexConstraints(pstate, &cxt);
transformIndexConstraints(&cxt);
transformFKConstraints(pstate, &cxt, skipValidation, true);
transformFKConstraints(&cxt, skipValidation, true);
/*
* Push any index-creation commands into the ALTER, so that they can be
* scheduled nicely by tablecmds.c. Note that tablecmds.c assumes that
* the IndexStmt attached to an AT_AddIndex subcommand has already been
* through transformIndexStmt.
* the IndexStmt attached to an AT_AddIndex or AT_AddIndexConstraint
* subcommand has already been through transformIndexStmt.
*/
foreach(l, cxt.alist)
{
Node *idxstmt = (Node *) lfirst(l);
IndexStmt *idxstmt = (IndexStmt *) lfirst(l);
Assert(IsA(idxstmt, IndexStmt));
idxstmt = transformIndexStmt(idxstmt, queryString);
newcmd = makeNode(AlterTableCmd);
newcmd->subtype = AT_AddIndex;
newcmd->def = (Node *) transformIndexStmt((IndexStmt *) idxstmt,
queryString);
newcmd->subtype = OidIsValid(idxstmt->indexOid) ? AT_AddIndexConstraint : AT_AddIndex;
newcmd->def = (Node *) idxstmt;
newcmds = lappend(newcmds, newcmd);
}
cxt.alist = NIL;
......@@ -2153,7 +2326,7 @@ transformAlterTableStmt(AlterTableStmt *stmt, const char *queryString)
* for other constraint types.
*/
static void
transformConstraintAttrs(ParseState *pstate, List *constraintList)
transformConstraintAttrs(CreateStmtContext *cxt, List *constraintList)
{
Constraint *lastprimarycon = NULL;
bool saw_deferrability = false;
......@@ -2181,12 +2354,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("misplaced DEFERRABLE clause"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
if (saw_deferrability)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple DEFERRABLE/NOT DEFERRABLE clauses not allowed"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
saw_deferrability = true;
lastprimarycon->deferrable = true;
break;
......@@ -2196,12 +2369,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("misplaced NOT DEFERRABLE clause"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
if (saw_deferrability)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple DEFERRABLE/NOT DEFERRABLE clauses not allowed"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
saw_deferrability = true;
lastprimarycon->deferrable = false;
if (saw_initially &&
......@@ -2209,7 +2382,7 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("constraint declared INITIALLY DEFERRED must be DEFERRABLE"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
break;
case CONSTR_ATTR_DEFERRED:
......@@ -2217,12 +2390,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("misplaced INITIALLY DEFERRED clause"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
if (saw_initially)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple INITIALLY IMMEDIATE/DEFERRED clauses not allowed"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
saw_initially = true;
lastprimarycon->initdeferred = true;
......@@ -2235,7 +2408,7 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("constraint declared INITIALLY DEFERRED must be DEFERRABLE"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
break;
case CONSTR_ATTR_IMMEDIATE:
......@@ -2243,12 +2416,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("misplaced INITIALLY IMMEDIATE clause"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
if (saw_initially)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("multiple INITIALLY IMMEDIATE/DEFERRED clauses not allowed"),
parser_errposition(pstate, con->location)));
parser_errposition(cxt->pstate, con->location)));
saw_initially = true;
lastprimarycon->initdeferred = false;
break;
......@@ -2268,12 +2441,12 @@ transformConstraintAttrs(ParseState *pstate, List *constraintList)
* Special handling of type definition for a column
*/
static void
transformColumnType(ParseState *pstate, ColumnDef *column)
transformColumnType(CreateStmtContext *cxt, ColumnDef *column)
{
/*
* All we really need to do here is verify that the type is valid.
*/
Type ctype = typenameType(pstate, column->typeName, NULL);
Type ctype = typenameType(cxt->pstate, column->typeName, NULL);
ReleaseSysCache(ctype);
}
......
......@@ -28,7 +28,11 @@ typedef void (*IndexBuildCallback) (Relation index,
void *state);
extern Oid index_create(Oid heapRelationId,
extern void index_check_primary_key(Relation heapRel,
IndexInfo *indexInfo,
bool is_alter_table);
extern Oid index_create(Relation heapRelation,
const char *indexRelationName,
Oid indexRelationId,
IndexInfo *indexInfo,
......@@ -46,6 +50,17 @@ extern Oid index_create(Oid heapRelationId,
bool skip_build,
bool concurrent);
extern void index_constraint_create(Relation heapRelation,
Oid indexRelationId,
IndexInfo *indexInfo,
const char *constraintName,
char constraintType,
bool deferrable,
bool initdeferred,
bool mark_as_primary,
bool update_pgindex,
bool allow_system_table_mods);
extern void index_drop(Oid indexId);
extern IndexInfo *BuildIndexInfo(Relation index);
......
......@@ -1142,6 +1142,7 @@ typedef enum AlterTableType
AT_AddConstraintRecurse, /* internal to commands/tablecmds.c */
AT_ProcessedConstraint, /* pre-processed add constraint (local in
* parser/parse_utilcmd.c) */
AT_AddIndexConstraint, /* add constraint using existing index */
AT_DropConstraint, /* drop constraint */
AT_DropConstraintRecurse, /* internal to commands/tablecmds.c */
AT_AlterColumnType, /* alter column type */
......@@ -1477,6 +1478,7 @@ typedef struct Constraint
/* Fields used for index constraints (UNIQUE, PRIMARY KEY, EXCLUSION): */
List *options; /* options from WITH clause */
char *indexname; /* existing index to use; otherwise NULL */
char *indexspace; /* index tablespace; NULL for default */
/* These could be, but currently are not, used for UNIQUE/PKEY: */
char *access_method; /* index access method; NULL for default */
......@@ -1953,6 +1955,12 @@ typedef struct FetchStmt
/* ----------------------
* Create Index Statement
*
* This represents creation of an index and/or an associated constraint.
* If indexOid isn't InvalidOid, we are not creating an index, just a
* UNIQUE/PKEY constraint using an existing index. isconstraint must always
* be true in this case, and the fields describing the index properties are
* empty.
* ----------------------
*/
typedef struct IndexStmt
......@@ -1966,6 +1974,7 @@ typedef struct IndexStmt
List *options; /* options from WITH clause */
Node *whereClause; /* qualification (partial-index predicate) */
List *excludeOpNames; /* exclusion operator names, or NIL if none */
Oid indexOid; /* OID of an existing index, if any */
bool unique; /* is index unique? */
bool primary; /* is index on primary key? */
bool isconstraint; /* is it from a CONSTRAINT clause? */
......
......@@ -1210,6 +1210,43 @@ Indexes:
DROP TABLE concur_heap;
--
-- Test ADD CONSTRAINT USING INDEX
--
CREATE TABLE cwi_test( a int , b varchar(10), c char);
-- add some data so that all tests have something to work with.
INSERT INTO cwi_test VALUES(1, 2), (3, 4), (5, 6);
CREATE UNIQUE INDEX cwi_uniq_idx ON cwi_test(a , b);
ALTER TABLE cwi_test ADD primary key USING INDEX cwi_uniq_idx;
\d cwi_test
Table "public.cwi_test"
Column | Type | Modifiers
--------+-----------------------+-----------
a | integer | not null
b | character varying(10) | not null
c | character(1) |
Indexes:
"cwi_uniq_idx" PRIMARY KEY, btree (a, b)
CREATE UNIQUE INDEX cwi_uniq2_idx ON cwi_test(b , a);
ALTER TABLE cwi_test DROP CONSTRAINT cwi_uniq_idx,
ADD CONSTRAINT cwi_replaced_pkey PRIMARY KEY
USING INDEX cwi_uniq2_idx;
NOTICE: ALTER TABLE / ADD CONSTRAINT USING INDEX will rename index "cwi_uniq2_idx" to "cwi_replaced_pkey"
\d cwi_test
Table "public.cwi_test"
Column | Type | Modifiers
--------+-----------------------+-----------
a | integer | not null
b | character varying(10) | not null
c | character(1) |
Indexes:
"cwi_replaced_pkey" PRIMARY KEY, btree (b, a)
DROP INDEX cwi_replaced_pkey; -- Should fail; a constraint depends on it
ERROR: cannot drop index cwi_replaced_pkey because constraint cwi_replaced_pkey on table cwi_test requires it
HINT: You can drop constraint cwi_replaced_pkey on table cwi_test instead.
DROP TABLE cwi_test;
--
-- Tests for IS NULL/IS NOT NULL with b-tree indexes
--
SELECT unique1, unique2 INTO onek_with_null FROM onek;
......
......@@ -409,6 +409,32 @@ COMMIT;
DROP TABLE concur_heap;
--
-- Test ADD CONSTRAINT USING INDEX
--
CREATE TABLE cwi_test( a int , b varchar(10), c char);
-- add some data so that all tests have something to work with.
INSERT INTO cwi_test VALUES(1, 2), (3, 4), (5, 6);
CREATE UNIQUE INDEX cwi_uniq_idx ON cwi_test(a , b);
ALTER TABLE cwi_test ADD primary key USING INDEX cwi_uniq_idx;
\d cwi_test
CREATE UNIQUE INDEX cwi_uniq2_idx ON cwi_test(b , a);
ALTER TABLE cwi_test DROP CONSTRAINT cwi_uniq_idx,
ADD CONSTRAINT cwi_replaced_pkey PRIMARY KEY
USING INDEX cwi_uniq2_idx;
\d cwi_test
DROP INDEX cwi_replaced_pkey; -- Should fail; a constraint depends on it
DROP TABLE cwi_test;
--
-- Tests for IS NULL/IS NOT NULL with b-tree indexes
--
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment