Commit 19781729 authored by Peter Eisentraut's avatar Peter Eisentraut

Make identity sequence management more robust

Some code could get confused when certain catalog state involving both
identity and serial sequences was present, perhaps during an attempt
to upgrade the latter to the former.  Specifically, dropping the
default of a serial column maintains the ownership of the sequence by
the column, and so it would then be possible to afterwards make the
column an identity column that would now own two sequences.  This
causes the code that looks up the identity sequence to error out,
making the new identity column inoperable until the ownership of the
previous sequence is released.

To fix this, make the identity sequence lookup only consider sequences
with the appropriate dependency type for an identity sequence, so it
only ever finds one (unless something else is broken).  In the above
example, the old serial sequence would then be ignored.  Reorganize
the various owned-sequence-lookup functions a bit to make this
clearer.
Reported-by: default avatarLaurenz Albe <laurenz.albe@cybertec.at>
Discussion: https://www.postgresql.org/message-id/flat/470c54fc8590be4de0f41b0d295fd6390d5e8a6c.camel@cybertec.at
parent efdcca55
...@@ -705,10 +705,11 @@ sequenceIsOwned(Oid seqId, char deptype, Oid *tableId, int32 *colId) ...@@ -705,10 +705,11 @@ sequenceIsOwned(Oid seqId, char deptype, Oid *tableId, int32 *colId)
/* /*
* Collect a list of OIDs of all sequences owned by the specified relation, * Collect a list of OIDs of all sequences owned by the specified relation,
* and column if specified. * and column if specified. If deptype is not zero, then only find sequences
* with the specified dependency type.
*/ */
List * static List *
getOwnedSequences(Oid relid, AttrNumber attnum) getOwnedSequences_internal(Oid relid, AttrNumber attnum, char deptype)
{ {
List *result = NIL; List *result = NIL;
Relation depRel; Relation depRel;
...@@ -750,7 +751,8 @@ getOwnedSequences(Oid relid, AttrNumber attnum) ...@@ -750,7 +751,8 @@ getOwnedSequences(Oid relid, AttrNumber attnum)
(deprec->deptype == DEPENDENCY_AUTO || deprec->deptype == DEPENDENCY_INTERNAL) && (deprec->deptype == DEPENDENCY_AUTO || deprec->deptype == DEPENDENCY_INTERNAL) &&
get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE) get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE)
{ {
result = lappend_oid(result, deprec->objid); if (!deptype || deprec->deptype == deptype)
result = lappend_oid(result, deprec->objid);
} }
} }
...@@ -762,17 +764,32 @@ getOwnedSequences(Oid relid, AttrNumber attnum) ...@@ -762,17 +764,32 @@ getOwnedSequences(Oid relid, AttrNumber attnum)
} }
/* /*
* Get owned sequence, error if not exactly one. * Collect a list of OIDs of all sequences owned (identity or serial) by the
* specified relation.
*/
List *
getOwnedSequences(Oid relid)
{
return getOwnedSequences_internal(relid, 0, 0);
}
/*
* Get owned identity sequence, error if not exactly one.
*/ */
Oid Oid
getOwnedSequence(Oid relid, AttrNumber attnum) getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok)
{ {
List *seqlist = getOwnedSequences(relid, attnum); List *seqlist = getOwnedSequences_internal(relid, attnum, DEPENDENCY_INTERNAL);
if (list_length(seqlist) > 1) if (list_length(seqlist) > 1)
elog(ERROR, "more than one owned sequence found"); elog(ERROR, "more than one owned sequence found");
else if (list_length(seqlist) < 1) else if (list_length(seqlist) < 1)
elog(ERROR, "no owned sequence found"); {
if (missing_ok)
return InvalidOid;
else
elog(ERROR, "no owned sequence found");
}
return linitial_oid(seqlist); return linitial_oid(seqlist);
} }
......
...@@ -1689,7 +1689,7 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged, ...@@ -1689,7 +1689,7 @@ ExecuteTruncateGuts(List *explicit_rels, List *relids, List *relids_logged,
foreach(cell, rels) foreach(cell, rels)
{ {
Relation rel = (Relation) lfirst(cell); Relation rel = (Relation) lfirst(cell);
List *seqlist = getOwnedSequences(RelationGetRelid(rel), 0); List *seqlist = getOwnedSequences(RelationGetRelid(rel));
ListCell *seqcell; ListCell *seqcell;
foreach(seqcell, seqlist) foreach(seqcell, seqlist)
...@@ -6635,7 +6635,7 @@ ATExecDropIdentity(Relation rel, const char *colName, bool missing_ok, LOCKMODE ...@@ -6635,7 +6635,7 @@ ATExecDropIdentity(Relation rel, const char *colName, bool missing_ok, LOCKMODE
table_close(attrelation, RowExclusiveLock); table_close(attrelation, RowExclusiveLock);
/* drop the internal sequence */ /* drop the internal sequence */
seqid = getOwnedSequence(RelationGetRelid(rel), attnum); seqid = getIdentitySequence(RelationGetRelid(rel), attnum, false);
deleteDependencyRecordsForClass(RelationRelationId, seqid, deleteDependencyRecordsForClass(RelationRelationId, seqid,
RelationRelationId, DEPENDENCY_INTERNAL); RelationRelationId, DEPENDENCY_INTERNAL);
CommandCounterIncrement(); CommandCounterIncrement();
......
...@@ -1083,7 +1083,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla ...@@ -1083,7 +1083,7 @@ transformTableLikeClause(CreateStmtContext *cxt, TableLikeClause *table_like_cla
* find sequence owned by old column; extract sequence parameters; * find sequence owned by old column; extract sequence parameters;
* build new create sequence command * build new create sequence command
*/ */
seq_relid = getOwnedSequence(RelationGetRelid(relation), attribute->attnum); seq_relid = getIdentitySequence(RelationGetRelid(relation), attribute->attnum, false);
seq_options = sequence_options(seq_relid); seq_options = sequence_options(seq_relid);
generateSerialExtraStmts(cxt, def, generateSerialExtraStmts(cxt, def,
InvalidOid, seq_options, true, InvalidOid, seq_options, true,
...@@ -3165,7 +3165,7 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, ...@@ -3165,7 +3165,7 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
if (attnum != InvalidAttrNumber && if (attnum != InvalidAttrNumber &&
TupleDescAttr(tupdesc, attnum - 1)->attidentity) TupleDescAttr(tupdesc, attnum - 1)->attidentity)
{ {
Oid seq_relid = getOwnedSequence(relid, attnum); Oid seq_relid = getIdentitySequence(relid, attnum, false);
Oid typeOid = typenameTypeId(pstate, def->typeName); Oid typeOid = typenameTypeId(pstate, def->typeName);
AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt); AlterSeqStmt *altseqstmt = makeNode(AlterSeqStmt);
...@@ -3216,7 +3216,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, ...@@ -3216,7 +3216,6 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
ListCell *lc; ListCell *lc;
List *newseqopts = NIL; List *newseqopts = NIL;
List *newdef = NIL; List *newdef = NIL;
List *seqlist;
AttrNumber attnum; AttrNumber attnum;
/* /*
...@@ -3237,14 +3236,13 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt, ...@@ -3237,14 +3236,13 @@ transformAlterTableStmt(Oid relid, AlterTableStmt *stmt,
if (attnum) if (attnum)
{ {
seqlist = getOwnedSequences(relid, attnum); Oid seq_relid = getIdentitySequence(relid, attnum, true);
if (seqlist)
if (seq_relid)
{ {
AlterSeqStmt *seqstmt; AlterSeqStmt *seqstmt;
Oid seq_relid;
seqstmt = makeNode(AlterSeqStmt); seqstmt = makeNode(AlterSeqStmt);
seq_relid = linitial_oid(seqlist);
seqstmt->sequence = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)), seqstmt->sequence = makeRangeVar(get_namespace_name(get_rel_namespace(seq_relid)),
get_rel_name(seq_relid), -1); get_rel_name(seq_relid), -1);
seqstmt->options = newseqopts; seqstmt->options = newseqopts;
......
...@@ -1130,7 +1130,7 @@ build_column_default(Relation rel, int attrno) ...@@ -1130,7 +1130,7 @@ build_column_default(Relation rel, int attrno)
{ {
NextValueExpr *nve = makeNode(NextValueExpr); NextValueExpr *nve = makeNode(NextValueExpr);
nve->seqid = getOwnedSequence(RelationGetRelid(rel), attrno); nve->seqid = getIdentitySequence(RelationGetRelid(rel), attrno, false);
nve->typeId = att_tup->atttypid; nve->typeId = att_tup->atttypid;
return (Node *) nve; return (Node *) nve;
......
...@@ -209,8 +209,8 @@ extern long changeDependenciesOn(Oid refClassId, Oid oldRefObjectId, ...@@ -209,8 +209,8 @@ extern long changeDependenciesOn(Oid refClassId, Oid oldRefObjectId,
extern Oid getExtensionOfObject(Oid classId, Oid objectId); extern Oid getExtensionOfObject(Oid classId, Oid objectId);
extern bool sequenceIsOwned(Oid seqId, char deptype, Oid *tableId, int32 *colId); extern bool sequenceIsOwned(Oid seqId, char deptype, Oid *tableId, int32 *colId);
extern List *getOwnedSequences(Oid relid, AttrNumber attnum); extern List *getOwnedSequences(Oid relid);
extern Oid getOwnedSequence(Oid relid, AttrNumber attnum); extern Oid getIdentitySequence(Oid relid, AttrNumber attnum, bool missing_ok);
extern Oid get_constraint_index(Oid constraintId); extern Oid get_constraint_index(Oid constraintId);
......
...@@ -399,3 +399,8 @@ CREATE TABLE itest_child PARTITION OF itest_parent ( ...@@ -399,3 +399,8 @@ CREATE TABLE itest_child PARTITION OF itest_parent (
) FOR VALUES FROM ('2016-07-01') TO ('2016-08-01'); -- error ) FOR VALUES FROM ('2016-07-01') TO ('2016-08-01'); -- error
ERROR: identity columns are not supported on partitions ERROR: identity columns are not supported on partitions
DROP TABLE itest_parent; DROP TABLE itest_parent;
-- test that sequence of half-dropped serial column is properly ignored
CREATE TABLE itest14 (id serial);
ALTER TABLE itest14 ALTER id DROP DEFAULT;
ALTER TABLE itest14 ALTER id ADD GENERATED BY DEFAULT AS IDENTITY;
INSERT INTO itest14 (id) VALUES (DEFAULT);
...@@ -254,3 +254,11 @@ CREATE TABLE itest_child PARTITION OF itest_parent ( ...@@ -254,3 +254,11 @@ CREATE TABLE itest_child PARTITION OF itest_parent (
f3 WITH OPTIONS GENERATED ALWAYS AS IDENTITY f3 WITH OPTIONS GENERATED ALWAYS AS IDENTITY
) FOR VALUES FROM ('2016-07-01') TO ('2016-08-01'); -- error ) FOR VALUES FROM ('2016-07-01') TO ('2016-08-01'); -- error
DROP TABLE itest_parent; DROP TABLE itest_parent;
-- test that sequence of half-dropped serial column is properly ignored
CREATE TABLE itest14 (id serial);
ALTER TABLE itest14 ALTER id DROP DEFAULT;
ALTER TABLE itest14 ALTER id ADD GENERATED BY DEFAULT AS IDENTITY;
INSERT INTO itest14 (id) VALUES (DEFAULT);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment