Commit a3c7a993 authored by Tom Lane's avatar Tom Lane

Make INSERT-from-multiple-VALUES-rows handle targetlist indirection better.

Previously, if an INSERT with multiple rows of VALUES had indirection
(array subscripting or field selection) in its target-columns list, the
parser handled that by applying transformAssignedExpr() to each element
of each VALUES row independently.  This led to having ArrayRef assignment
nodes or FieldStore nodes in each row of the VALUES RTE.  That works for
simple cases, but in bug #14265 Nuri Boardman points out that it fails
if there are multiple assignments to elements/fields of the same target
column.  For such cases to work, rewriteTargetListIU() has to nest the
ArrayRefs or FieldStores together to produce a single expression to be
assigned to the column.  But it failed to find them in the top-level
targetlist and issued an error about "multiple assignments to same column".

We could possibly fix this by teaching the rewriter to apply
rewriteTargetListIU to each VALUES row separately, but that would be messy
(it would change the output rowtype of the VALUES RTE, for example) and
inefficient.  Instead, let's fix the parser so that the VALUES RTE outputs
are just the user-specified values, cast to the right type if necessary,
and then the ArrayRefs or FieldStores are applied in the top-level
targetlist to Vars representing the RTE's outputs.  This is the same
parsetree representation already used for similar cases with INSERT/SELECT
syntax, so it allows simplifications in ruleutils.c, which no longer needs
to treat INSERT-from-multiple-VALUES as its own special case.

This implementation works by applying transformAssignedExpr to the VALUES
entries as before, and then stripping off any ArrayRefs or FieldStores it
adds.  With lots of VALUES rows it would be noticeably more efficient to
not add those nodes in the first place.  But that's just an optimization
not a bug fix, and there doesn't seem to be any good way to do it without
significant refactoring.  (A non-invasive answer would be to apply
transformAssignedExpr + stripping to just the first VALUES row, and then
just forcibly cast remaining rows to the same data types exposed in the
first row.  But this way would lead to different, not-INSERT-specific
errors being reported in casting failure cases, so it doesn't seem very
nice.)  So leave that for later; this patch at least isn't making the
per-row parsing work worse, and it does make the finished parsetree
smaller, saving rewriter and planner work.

Catversion bump because stored rules containing such INSERTs would need
to change.  Because of that, no back-patch, even though this is a very
long-standing bug.

Report: <20160727005725.7438.26021@wrigleys.postgresql.org>
Discussion: <9578.1469645245@sss.pgh.pa.us>
parent ef1b5af8
......@@ -51,7 +51,8 @@ post_parse_analyze_hook_type post_parse_analyze_hook = NULL;
static Query *transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt);
static Query *transformInsertStmt(ParseState *pstate, InsertStmt *stmt);
static List *transformInsertRow(ParseState *pstate, List *exprlist,
List *stmtcols, List *icolumns, List *attrnos);
List *stmtcols, List *icolumns, List *attrnos,
bool strip_indirection);
static OnConflictExpr *transformOnConflictClause(ParseState *pstate,
OnConflictClause *onConflictClause);
static int count_rowexpr_columns(ParseState *pstate, Node *expr);
......@@ -619,7 +620,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
/* Prepare row for assignment to target table */
exprList = transformInsertRow(pstate, exprList,
stmt->cols,
icolumns, attrnos);
icolumns, attrnos,
false);
}
else if (list_length(selectStmt->valuesLists) > 1)
{
......@@ -663,10 +665,20 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
exprLocation((Node *) sublist))));
}
/* Prepare row for assignment to target table */
/*
* Prepare row for assignment to target table. We process any
* indirection on the target column specs normally but then strip
* off the resulting field/array assignment nodes, since we don't
* want the parsed statement to contain copies of those in each
* VALUES row. (It's annoying to have to transform the
* indirection specs over and over like this, but avoiding it
* would take some really messy refactoring of
* transformAssignmentIndirection.)
*/
sublist = transformInsertRow(pstate, sublist,
stmt->cols,
icolumns, attrnos);
icolumns, attrnos,
true);
/*
* We must assign collations now because assign_query_collations
......@@ -717,6 +729,14 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
* Generate list of Vars referencing the RTE
*/
expandRTE(rte, rtr->rtindex, 0, -1, false, NULL, &exprList);
/*
* Re-apply any indirection on the target column specs to the Vars
*/
exprList = transformInsertRow(pstate, exprList,
stmt->cols,
icolumns, attrnos,
false);
}
else
{
......@@ -739,7 +759,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
/* Prepare row for assignment to target table */
exprList = transformInsertRow(pstate, exprList,
stmt->cols,
icolumns, attrnos);
icolumns, attrnos,
false);
}
/*
......@@ -808,12 +829,17 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt)
/*
* Prepare an INSERT row for assignment to the target table.
*
* The row might be either a VALUES row, or variables referencing a
* sub-SELECT output.
* exprlist: transformed expressions for source values; these might come from
* a VALUES row, or be Vars referencing a sub-SELECT or VALUES RTE output.
* stmtcols: original target-columns spec for INSERT (we just test for NIL)
* icolumns: effective target-columns spec (list of ResTarget)
* attrnos: integer column numbers (must be same length as icolumns)
* strip_indirection: if true, remove any field/array assignment nodes
*/
static List *
transformInsertRow(ParseState *pstate, List *exprlist,
List *stmtcols, List *icolumns, List *attrnos)
List *stmtcols, List *icolumns, List *attrnos,
bool strip_indirection)
{
List *result;
ListCell *lc;
......@@ -879,6 +905,29 @@ transformInsertRow(ParseState *pstate, List *exprlist,
col->indirection,
col->location);
if (strip_indirection)
{
while (expr)
{
if (IsA(expr, FieldStore))
{
FieldStore *fstore = (FieldStore *) expr;
expr = (Expr *) linitial(fstore->newvals);
}
else if (IsA(expr, ArrayRef))
{
ArrayRef *aref = (ArrayRef *) expr;
if (aref->refassgnexpr == NULL)
break;
expr = aref->refassgnexpr;
}
else
break;
}
}
result = lappend(result, expr);
icols = lnext(icols);
......
......@@ -438,8 +438,7 @@ static void get_tablesample_def(TableSampleClause *tablesample,
deparse_context *context);
static void get_opclass_name(Oid opclass, Oid actual_datatype,
StringInfo buf);
static Node *processIndirection(Node *node, deparse_context *context,
bool printit);
static Node *processIndirection(Node *node, deparse_context *context);
static void printSubscripts(ArrayRef *aref, deparse_context *context);
static char *get_relation_name(Oid relid);
static char *generate_relation_name(Oid relid, List *namespaces);
......@@ -4551,11 +4550,9 @@ get_values_def(List *values_lists, deparse_context *context)
appendStringInfoChar(buf, ',');
/*
* Strip any top-level nodes representing indirection assignments,
* then print the result. Whole-row Vars need special treatment.
* Print the value. Whole-row Vars need special treatment.
*/
get_rule_expr_toplevel(processIndirection(col, context, false),
context, false);
get_rule_expr_toplevel(col, context, false);
}
appendStringInfoChar(buf, ')');
}
......@@ -5512,7 +5509,6 @@ get_insert_query_def(Query *query, deparse_context *context)
RangeTblEntry *values_rte = NULL;
RangeTblEntry *rte;
char *sep;
ListCell *values_cell;
ListCell *l;
List *strippedexprs;
......@@ -5563,17 +5559,9 @@ get_insert_query_def(Query *query, deparse_context *context)
quote_identifier(rte->alias->aliasname));
/*
* Add the insert-column-names list. To handle indirection properly, we
* need to look for indirection nodes in the top targetlist (if it's
* INSERT ... SELECT or INSERT ... single VALUES), or in the first
* expression list of the VALUES RTE (if it's INSERT ... multi VALUES). We
* assume that all the expression lists will have similar indirection in
* the latter case.
* Add the insert-column-names list. Any indirection decoration needed on
* the column names can be inferred from the top targetlist.
*/
if (values_rte)
values_cell = list_head((List *) linitial(values_rte->values_lists));
else
values_cell = NULL;
strippedexprs = NIL;
sep = "";
if (query->targetList)
......@@ -5599,20 +5587,14 @@ get_insert_query_def(Query *query, deparse_context *context)
/*
* Print any indirection needed (subfields or subscripts), and strip
* off the top-level nodes representing the indirection assignments.
* Add the stripped expressions to strippedexprs. (If it's a
* single-VALUES statement, the stripped expressions are the VALUES to
* print below. Otherwise they're just Vars and not really
* interesting.)
*/
if (values_cell)
{
/* we discard the stripped expression in this case */
processIndirection((Node *) lfirst(values_cell), context, true);
values_cell = lnext(values_cell);
}
else
{
/* we keep a list of the stripped expressions in this case */
strippedexprs = lappend(strippedexprs,
processIndirection((Node *) tle->expr,
context, true));
}
strippedexprs = lappend(strippedexprs,
processIndirection((Node *) tle->expr,
context));
}
if (query->targetList)
appendStringInfoString(buf, ") ");
......@@ -5891,7 +5873,7 @@ get_update_query_targetlist_def(Query *query, List *targetList,
* Print any indirection needed (subfields or subscripts), and strip
* off the top-level nodes representing the indirection assignments.
*/
expr = processIndirection((Node *) tle->expr, context, true);
expr = processIndirection((Node *) tle->expr, context);
/*
* If we're in a multiassignment, skip printing anything more, unless
......@@ -7296,7 +7278,7 @@ get_rule_expr(Node *node, deparse_context *context,
* subscripting in immediate descendants. It returns the
* RHS expr that is actually being "assigned".
*/
refassgnexpr = processIndirection(node, context, true);
refassgnexpr = processIndirection(node, context);
appendStringInfoString(buf, " := ");
get_rule_expr(refassgnexpr, context, showimplicit);
}
......@@ -9561,12 +9543,12 @@ get_opclass_name(Oid opclass, Oid actual_datatype,
* processIndirection - take care of array and subfield assignment
*
* We strip any top-level FieldStore or assignment ArrayRef nodes that
* appear in the input, and return the subexpression that's to be assigned.
* If printit is true, we also print out the appropriate decoration for the
* base column name (that the caller just printed).
* appear in the input, printing them as decoration for the base column
* name (which we assume the caller just printed). Return the subexpression
* that's to be assigned.
*/
static Node *
processIndirection(Node *node, deparse_context *context, bool printit)
processIndirection(Node *node, deparse_context *context)
{
StringInfo buf = context->buf;
......@@ -9594,8 +9576,7 @@ processIndirection(Node *node, deparse_context *context, bool printit)
Assert(list_length(fstore->fieldnums) == 1);
fieldname = get_relid_attribute_name(typrelid,
linitial_int(fstore->fieldnums));
if (printit)
appendStringInfo(buf, ".%s", quote_identifier(fieldname));
appendStringInfo(buf, ".%s", quote_identifier(fieldname));
/*
* We ignore arg since it should be an uninteresting reference to
......@@ -9609,8 +9590,7 @@ processIndirection(Node *node, deparse_context *context, bool printit)
if (aref->refassgnexpr == NULL)
break;
if (printit)
printSubscripts(aref, context);
printSubscripts(aref, context);
/*
* We ignore refexpr since it should be an uninteresting reference
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 201608021
#define CATALOG_VERSION_NO 201608031
#endif
......@@ -81,3 +81,82 @@ select col1, col2, char_length(col3) from inserttest;
(8 rows)
drop table inserttest;
--
-- check indirection (field/array assignment), cf bug #14265
--
-- these tests are aware that transformInsertStmt has 3 separate code paths
--
create type insert_test_type as (if1 int, if2 text[]);
create table inserttest (f1 int, f2 int[],
f3 insert_test_type, f4 insert_test_type[]);
insert into inserttest (f2[1], f2[2]) values (1,2);
insert into inserttest (f2[1], f2[2]) values (3,4), (5,6);
insert into inserttest (f2[1], f2[2]) select 7,8;
insert into inserttest (f2[1], f2[2]) values (1,default); -- not supported
ERROR: cannot set an array element to DEFAULT
LINE 1: insert into inserttest (f2[1], f2[2]) values (1,default);
^
insert into inserttest (f3.if1, f3.if2) values (1,array['foo']);
insert into inserttest (f3.if1, f3.if2) values (1,'{foo}'), (2,'{bar}');
insert into inserttest (f3.if1, f3.if2) select 3, '{baz,quux}';
insert into inserttest (f3.if1, f3.if2) values (1,default); -- not supported
ERROR: cannot set a subfield to DEFAULT
LINE 1: insert into inserttest (f3.if1, f3.if2) values (1,default);
^
insert into inserttest (f3.if2[1], f3.if2[2]) values ('foo', 'bar');
insert into inserttest (f3.if2[1], f3.if2[2]) values ('foo', 'bar'), ('baz', 'quux');
insert into inserttest (f3.if2[1], f3.if2[2]) select 'bear', 'beer';
insert into inserttest (f4[1].if2[1], f4[1].if2[2]) values ('foo', 'bar');
insert into inserttest (f4[1].if2[1], f4[1].if2[2]) values ('foo', 'bar'), ('baz', 'quux');
insert into inserttest (f4[1].if2[1], f4[1].if2[2]) select 'bear', 'beer';
select * from inserttest;
f1 | f2 | f3 | f4
----+-------+------------------+------------------------
| {1,2} | |
| {3,4} | |
| {5,6} | |
| {7,8} | |
| | (1,{foo}) |
| | (1,{foo}) |
| | (2,{bar}) |
| | (3,"{baz,quux}") |
| | (,"{foo,bar}") |
| | (,"{foo,bar}") |
| | (,"{baz,quux}") |
| | (,"{bear,beer}") |
| | | {"(,\"{foo,bar}\")"}
| | | {"(,\"{foo,bar}\")"}
| | | {"(,\"{baz,quux}\")"}
| | | {"(,\"{bear,beer}\")"}
(16 rows)
-- also check reverse-listing
create table inserttest2 (f1 bigint, f2 text);
create rule irule1 as on insert to inserttest2 do also
insert into inserttest (f3.if2[1], f3.if2[2])
values (new.f1,new.f2);
create rule irule2 as on insert to inserttest2 do also
insert into inserttest (f4[1].if1, f4[1].if2[2])
values (1,'fool'),(new.f1,new.f2);
create rule irule3 as on insert to inserttest2 do also
insert into inserttest (f4[1].if1, f4[1].if2[2])
select new.f1, new.f2;
\d+ inserttest2
Table "public.inserttest2"
Column | Type | Modifiers | Storage | Stats target | Description
--------+--------+-----------+----------+--------------+-------------
f1 | bigint | | plain | |
f2 | text | | extended | |
Rules:
irule1 AS
ON INSERT TO inserttest2 DO INSERT INTO inserttest (f3.if2[1], f3.if2[2])
VALUES (new.f1, new.f2)
irule2 AS
ON INSERT TO inserttest2 DO INSERT INTO inserttest (f4[1].if1, f4[1].if2[2]) VALUES (1,'fool'::text), (new.f1,new.f2)
irule3 AS
ON INSERT TO inserttest2 DO INSERT INTO inserttest (f4[1].if1, f4[1].if2[2]) SELECT new.f1,
new.f2
drop table inserttest2;
drop table inserttest;
drop type insert_test_type;
......@@ -36,3 +36,51 @@ insert into inserttest values(30, 50, repeat('x', 10000));
select col1, col2, char_length(col3) from inserttest;
drop table inserttest;
--
-- check indirection (field/array assignment), cf bug #14265
--
-- these tests are aware that transformInsertStmt has 3 separate code paths
--
create type insert_test_type as (if1 int, if2 text[]);
create table inserttest (f1 int, f2 int[],
f3 insert_test_type, f4 insert_test_type[]);
insert into inserttest (f2[1], f2[2]) values (1,2);
insert into inserttest (f2[1], f2[2]) values (3,4), (5,6);
insert into inserttest (f2[1], f2[2]) select 7,8;
insert into inserttest (f2[1], f2[2]) values (1,default); -- not supported
insert into inserttest (f3.if1, f3.if2) values (1,array['foo']);
insert into inserttest (f3.if1, f3.if2) values (1,'{foo}'), (2,'{bar}');
insert into inserttest (f3.if1, f3.if2) select 3, '{baz,quux}';
insert into inserttest (f3.if1, f3.if2) values (1,default); -- not supported
insert into inserttest (f3.if2[1], f3.if2[2]) values ('foo', 'bar');
insert into inserttest (f3.if2[1], f3.if2[2]) values ('foo', 'bar'), ('baz', 'quux');
insert into inserttest (f3.if2[1], f3.if2[2]) select 'bear', 'beer';
insert into inserttest (f4[1].if2[1], f4[1].if2[2]) values ('foo', 'bar');
insert into inserttest (f4[1].if2[1], f4[1].if2[2]) values ('foo', 'bar'), ('baz', 'quux');
insert into inserttest (f4[1].if2[1], f4[1].if2[2]) select 'bear', 'beer';
select * from inserttest;
-- also check reverse-listing
create table inserttest2 (f1 bigint, f2 text);
create rule irule1 as on insert to inserttest2 do also
insert into inserttest (f3.if2[1], f3.if2[2])
values (new.f1,new.f2);
create rule irule2 as on insert to inserttest2 do also
insert into inserttest (f4[1].if1, f4[1].if2[2])
values (1,'fool'),(new.f1,new.f2);
create rule irule3 as on insert to inserttest2 do also
insert into inserttest (f4[1].if1, f4[1].if2[2])
select new.f1, new.f2;
\d+ inserttest2
drop table inserttest2;
drop table inserttest;
drop type insert_test_type;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment