Commit 5f5c0145 authored by Tom Lane's avatar Tom Lane

Allow RECORD and RECORD[] to be specified in function coldeflists.

We can't allow these pseudo-types to be used as table column types,
because storing an anonymous record value in a table would result
in data that couldn't be understood by other sessions.  However,
it seems like there's no harm in allowing the case in a column
definition list that's specifying what a function-returning-record
returns.  The data involved is all local to the current session,
so we should be just as able to resolve its actual tuple type as
we are for the function-returning-record's top-level tuple output.

Elvis Pranskevichus, with cosmetic changes by me

Discussion: https://postgr.es/m/11038447.kQ5A9Uj5xi@hammer.magicstack.net
parent 689d15e9
...@@ -445,11 +445,14 @@ heap_create(const char *relname, ...@@ -445,11 +445,14 @@ heap_create(const char *relname,
* this is used to make certain the tuple descriptor contains a * this is used to make certain the tuple descriptor contains a
* valid set of attribute names and datatypes. a problem simply * valid set of attribute names and datatypes. a problem simply
* generates ereport(ERROR) which aborts the current transaction. * generates ereport(ERROR) which aborts the current transaction.
*
* relkind is the relkind of the relation to be created.
* flags controls which datatypes are allowed, cf CheckAttributeType.
* -------------------------------- * --------------------------------
*/ */
void void
CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind, CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
bool allow_system_table_mods) int flags)
{ {
int i; int i;
int j; int j;
...@@ -507,7 +510,7 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind, ...@@ -507,7 +510,7 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
TupleDescAttr(tupdesc, i)->atttypid, TupleDescAttr(tupdesc, i)->atttypid,
TupleDescAttr(tupdesc, i)->attcollation, TupleDescAttr(tupdesc, i)->attcollation,
NIL, /* assume we're creating a new rowtype */ NIL, /* assume we're creating a new rowtype */
allow_system_table_mods); flags);
} }
} }
...@@ -524,13 +527,22 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind, ...@@ -524,13 +527,22 @@ CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
* containing_rowtypes. When checking a to-be-created rowtype, it's * containing_rowtypes. When checking a to-be-created rowtype, it's
* sufficient to pass NIL, because there could not be any recursive reference * sufficient to pass NIL, because there could not be any recursive reference
* to a not-yet-existing rowtype. * to a not-yet-existing rowtype.
*
* flags is a bitmask controlling which datatypes we allow. For the most
* part, pseudo-types are disallowed as attribute types, but there are some
* exceptions: ANYARRAYOID, RECORDOID, and RECORDARRAYOID can be allowed
* in some cases. (This works because values of those type classes are
* self-identifying to some extent. However, RECORDOID and RECORDARRAYOID
* are reliably identifiable only within a session, since the identity info
* may use a typmod that is only locally assigned. The caller is expected
* to know whether these cases are safe.)
* -------------------------------- * --------------------------------
*/ */
void void
CheckAttributeType(const char *attname, CheckAttributeType(const char *attname,
Oid atttypid, Oid attcollation, Oid atttypid, Oid attcollation,
List *containing_rowtypes, List *containing_rowtypes,
bool allow_system_table_mods) int flags)
{ {
char att_typtype = get_typtype(atttypid); char att_typtype = get_typtype(atttypid);
Oid att_typelem; Oid att_typelem;
...@@ -538,12 +550,18 @@ CheckAttributeType(const char *attname, ...@@ -538,12 +550,18 @@ CheckAttributeType(const char *attname,
if (att_typtype == TYPTYPE_PSEUDO) if (att_typtype == TYPTYPE_PSEUDO)
{ {
/* /*
* Refuse any attempt to create a pseudo-type column, except for a * We disallow pseudo-type columns, with the exception of ANYARRAY,
* special hack for pg_statistic: allow ANYARRAY when modifying system * RECORD, and RECORD[] when the caller says that those are OK.
* catalogs (this allows creating pg_statistic and cloning it during *
* VACUUM FULL) * We don't need to worry about recursive containment for RECORD and
* RECORD[] because (a) no named composite type should be allowed to
* contain those, and (b) two "anonymous" record types couldn't be
* considered to be the same type, so infinite recursion isn't
* possible.
*/ */
if (atttypid != ANYARRAYOID || !allow_system_table_mods) if (!((atttypid == ANYARRAYOID && (flags & CHKATYPE_ANYARRAY)) ||
(atttypid == RECORDOID && (flags & CHKATYPE_ANYRECORD)) ||
(atttypid == RECORDARRAYOID && (flags & CHKATYPE_ANYRECORD))))
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_TABLE_DEFINITION), (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("column \"%s\" has pseudo-type %s", errmsg("column \"%s\" has pseudo-type %s",
...@@ -556,7 +574,7 @@ CheckAttributeType(const char *attname, ...@@ -556,7 +574,7 @@ CheckAttributeType(const char *attname,
*/ */
CheckAttributeType(attname, getBaseType(atttypid), attcollation, CheckAttributeType(attname, getBaseType(atttypid), attcollation,
containing_rowtypes, containing_rowtypes,
allow_system_table_mods); flags);
} }
else if (att_typtype == TYPTYPE_COMPOSITE) else if (att_typtype == TYPTYPE_COMPOSITE)
{ {
...@@ -594,7 +612,7 @@ CheckAttributeType(const char *attname, ...@@ -594,7 +612,7 @@ CheckAttributeType(const char *attname,
CheckAttributeType(NameStr(attr->attname), CheckAttributeType(NameStr(attr->attname),
attr->atttypid, attr->attcollation, attr->atttypid, attr->attcollation,
containing_rowtypes, containing_rowtypes,
allow_system_table_mods); flags);
} }
relation_close(relation, AccessShareLock); relation_close(relation, AccessShareLock);
...@@ -608,7 +626,7 @@ CheckAttributeType(const char *attname, ...@@ -608,7 +626,7 @@ CheckAttributeType(const char *attname,
*/ */
CheckAttributeType(attname, att_typelem, attcollation, CheckAttributeType(attname, att_typelem, attcollation,
containing_rowtypes, containing_rowtypes,
allow_system_table_mods); flags);
} }
/* /*
...@@ -1074,7 +1092,13 @@ heap_create_with_catalog(const char *relname, ...@@ -1074,7 +1092,13 @@ heap_create_with_catalog(const char *relname,
*/ */
Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode()); Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods); /*
* Validate proposed tupdesc for the desired relkind. If
* allow_system_table_mods is on, allow ANYARRAY to be used; this is a
* hack to allow creating pg_statistic and cloning it during VACUUM FULL.
*/
CheckAttributeNamesTypes(tupdesc, relkind,
allow_system_table_mods ? CHKATYPE_ANYARRAY : 0);
/* /*
* This would fail later on anyway, if the relation already exists. But * This would fail later on anyway, if the relation already exists. But
......
...@@ -410,7 +410,7 @@ ConstructTupleDescriptor(Relation heapRelation, ...@@ -410,7 +410,7 @@ ConstructTupleDescriptor(Relation heapRelation,
*/ */
CheckAttributeType(NameStr(to->attname), CheckAttributeType(NameStr(to->attname),
to->atttypid, to->attcollation, to->atttypid, to->attcollation,
NIL, false); NIL, 0);
} }
/* /*
......
...@@ -5505,7 +5505,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel, ...@@ -5505,7 +5505,7 @@ ATExecAddColumn(List **wqueue, AlteredTableInfo *tab, Relation rel,
/* make sure datatype is legal for a column */ /* make sure datatype is legal for a column */
CheckAttributeType(colDef->colname, typeOid, collOid, CheckAttributeType(colDef->colname, typeOid, collOid,
list_make1_oid(rel->rd_rel->reltype), list_make1_oid(rel->rd_rel->reltype),
false); 0);
/* construct new attribute's pg_attribute entry */ /* construct new attribute's pg_attribute entry */
attribute.attrelid = myrelid; attribute.attrelid = myrelid;
...@@ -9445,7 +9445,7 @@ ATPrepAlterColumnType(List **wqueue, ...@@ -9445,7 +9445,7 @@ ATPrepAlterColumnType(List **wqueue,
/* make sure datatype is legal for a column */ /* make sure datatype is legal for a column */
CheckAttributeType(colName, targettype, targetcollid, CheckAttributeType(colName, targettype, targetcollid,
list_make1_oid(rel->rd_rel->reltype), list_make1_oid(rel->rd_rel->reltype),
false); 0);
if (tab->relkind == RELKIND_RELATION || if (tab->relkind == RELKIND_RELATION ||
tab->relkind == RELKIND_PARTITIONED_TABLE) tab->relkind == RELKIND_PARTITIONED_TABLE)
......
...@@ -1590,9 +1590,15 @@ addRangeTableEntryForFunction(ParseState *pstate, ...@@ -1590,9 +1590,15 @@ addRangeTableEntryForFunction(ParseState *pstate,
/* /*
* Ensure that the coldeflist defines a legal set of names (no * Ensure that the coldeflist defines a legal set of names (no
* duplicates) and datatypes (no pseudo-types, for instance). * duplicates, but we needn't worry about system column names) and
* datatypes. Although we mostly can't allow pseudo-types, it
* seems safe to allow RECORD and RECORD[], since values within
* those type classes are self-identifying at runtime, and the
* coldeflist doesn't represent anything that will be visible to
* other sessions.
*/ */
CheckAttributeNamesTypes(tupdesc, RELKIND_COMPOSITE_TYPE, false); CheckAttributeNamesTypes(tupdesc, RELKIND_COMPOSITE_TYPE,
CHKATYPE_ANYRECORD);
} }
else else
ereport(ERROR, ereport(ERROR,
......
...@@ -19,6 +19,10 @@ ...@@ -19,6 +19,10 @@
#include "parser/parse_node.h" #include "parser/parse_node.h"
/* flag bits for CheckAttributeType/CheckAttributeNamesTypes */
#define CHKATYPE_ANYARRAY 0x01 /* allow ANYARRAY */
#define CHKATYPE_ANYRECORD 0x02 /* allow RECORD and RECORD[] */
typedef struct RawColumnDefault typedef struct RawColumnDefault
{ {
AttrNumber attnum; /* attribute to attach default to */ AttrNumber attnum; /* attribute to attach default to */
...@@ -130,12 +134,12 @@ extern const FormData_pg_attribute *SystemAttributeDefinition(AttrNumber attno); ...@@ -130,12 +134,12 @@ extern const FormData_pg_attribute *SystemAttributeDefinition(AttrNumber attno);
extern const FormData_pg_attribute *SystemAttributeByName(const char *attname); extern const FormData_pg_attribute *SystemAttributeByName(const char *attname);
extern void CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind, extern void CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
bool allow_system_table_mods); int flags);
extern void CheckAttributeType(const char *attname, extern void CheckAttributeType(const char *attname,
Oid atttypid, Oid attcollation, Oid atttypid, Oid attcollation,
List *containing_rowtypes, List *containing_rowtypes,
bool allow_system_table_mods); int flags);
/* pg_partitioned_table catalog manipulation functions */ /* pg_partitioned_table catalog manipulation functions */
extern void StorePartitionKey(Relation rel, extern void StorePartitionKey(Relation rel,
......
...@@ -667,6 +667,17 @@ select row(1, '(1,2)')::testtype6 *<> row(1, '(1,3)')::testtype6; ...@@ -667,6 +667,17 @@ select row(1, '(1,2)')::testtype6 *<> row(1, '(1,3)')::testtype6;
t t
(1 row) (1 row)
-- anonymous rowtypes in coldeflists
select q.a, q.b = row(2), q.c = array[row(3)], q.d = row(row(4)) from
unnest(array[row(1, row(2), array[row(3)], row(row(4))),
row(2, row(3), array[row(4)], row(row(5)))])
as q(a int, b record, c record[], d record);
a | ?column? | ?column? | ?column?
---+----------+----------+----------
1 | t | t | t
2 | f | f | f
(2 rows)
drop type testtype1, testtype2, testtype3, testtype4, testtype5, testtype6; drop type testtype1, testtype2, testtype3, testtype4, testtype5, testtype6;
-- --
-- Test case derived from bug #5716: check multiple uses of a rowtype result -- Test case derived from bug #5716: check multiple uses of a rowtype result
......
...@@ -259,6 +259,12 @@ select row(1, '(1,2)')::testtype6 *< row(1, '(1,3)')::testtype6; ...@@ -259,6 +259,12 @@ select row(1, '(1,2)')::testtype6 *< row(1, '(1,3)')::testtype6;
select row(1, '(1,2)')::testtype6 *>= row(1, '(1,3)')::testtype6; select row(1, '(1,2)')::testtype6 *>= row(1, '(1,3)')::testtype6;
select row(1, '(1,2)')::testtype6 *<> row(1, '(1,3)')::testtype6; select row(1, '(1,2)')::testtype6 *<> row(1, '(1,3)')::testtype6;
-- anonymous rowtypes in coldeflists
select q.a, q.b = row(2), q.c = array[row(3)], q.d = row(row(4)) from
unnest(array[row(1, row(2), array[row(3)], row(row(4))),
row(2, row(3), array[row(4)], row(row(5)))])
as q(a int, b record, c record[], d record);
drop type testtype1, testtype2, testtype3, testtype4, testtype5, testtype6; drop type testtype1, testtype2, testtype3, testtype4, testtype5, testtype6;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment