Commit 92e38182 authored by Teodor Sigaev's avatar Teodor Sigaev

COPY (INSERT/UPDATE/DELETE .. RETURNING ..)

Attached is a patch for being able to do COPY (query) without a CTE.

Author: Marko Tiikkaja
Review: Michael Paquier
parent 0da3a9be
...@@ -112,10 +112,17 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable ...@@ -112,10 +112,17 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
<term><replaceable class="parameter">query</replaceable></term> <term><replaceable class="parameter">query</replaceable></term>
<listitem> <listitem>
<para> <para>
A <xref linkend="sql-select"> or A <xref linkend="sql-select">, <xref linkend="sql-values">,
<xref linkend="sql-values"> command <xref linkend="sql-insert">, <xref linkend="sql-update"> or
whose results are to be copied. <xref linkend="sql-delete"> command whose results are to be
Note that parentheses are required around the query. copied. Note that parentheses are required around the query.
</para>
<para>
For <command>INSERT</>, <command>UPDATE</> and
<command>DELETE</> queries a RETURNING clause must be provided,
and the target relation must not have a conditional rule, nor
an <literal>ALSO</> rule, nor an <literal>INSTEAD</> rule
that expands to multiple statements.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
......
...@@ -201,7 +201,7 @@ typedef struct CopyStateData ...@@ -201,7 +201,7 @@ typedef struct CopyStateData
int raw_buf_len; /* total # of bytes stored */ int raw_buf_len; /* total # of bytes stored */
} CopyStateData; } CopyStateData;
/* DestReceiver for COPY (SELECT) TO */ /* DestReceiver for COPY (query) TO */
typedef struct typedef struct
{ {
DestReceiver pub; /* publicly-known function pointers */ DestReceiver pub; /* publicly-known function pointers */
...@@ -772,7 +772,8 @@ CopyLoadRawBuf(CopyState cstate) ...@@ -772,7 +772,8 @@ CopyLoadRawBuf(CopyState cstate)
* *
* Either unload or reload contents of table <relation>, depending on <from>. * Either unload or reload contents of table <relation>, depending on <from>.
* (<from> = TRUE means we are inserting into the table.) In the "TO" case * (<from> = TRUE means we are inserting into the table.) In the "TO" case
* we also support copying the output of an arbitrary SELECT query. * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
* or DELETE query.
* *
* If <pipe> is false, transfer is between the table and the file named * If <pipe> is false, transfer is between the table and the file named
* <filename>. Otherwise, transfer is between the table and our regular * <filename>. Otherwise, transfer is between the table and our regular
...@@ -1374,11 +1375,11 @@ BeginCopy(bool is_from, ...@@ -1374,11 +1375,11 @@ BeginCopy(bool is_from,
Assert(!is_from); Assert(!is_from);
cstate->rel = NULL; cstate->rel = NULL;
/* Don't allow COPY w/ OIDs from a select */ /* Don't allow COPY w/ OIDs from a query */
if (cstate->oids) if (cstate->oids)
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY (SELECT) WITH OIDS is not supported"))); errmsg("COPY (query) WITH OIDS is not supported")));
/* /*
* Run parse analysis and rewrite. Note this also acquires sufficient * Run parse analysis and rewrite. Note this also acquires sufficient
...@@ -1393,9 +1394,36 @@ BeginCopy(bool is_from, ...@@ -1393,9 +1394,36 @@ BeginCopy(bool is_from,
rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query), rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
queryString, NULL, 0); queryString, NULL, 0);
/* We don't expect more or less than one result query */ /* check that we got back something we can work with */
if (list_length(rewritten) != 1) if (rewritten == NIL)
elog(ERROR, "unexpected rewrite result"); {
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
}
else if (list_length(rewritten) > 1)
{
ListCell *lc;
/* examine queries to determine which error message to issue */
foreach(lc, rewritten)
{
Query *q = (Query *) lfirst(lc);
if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("conditional DO INSTEAD rules are not supported for COPY")));
if (q->querySource == QSRC_NON_INSTEAD_RULE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("DO ALSO rules are not supported for the COPY")));
}
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
}
query = (Query *) linitial(rewritten); query = (Query *) linitial(rewritten);
...@@ -1406,9 +1434,24 @@ BeginCopy(bool is_from, ...@@ -1406,9 +1434,24 @@ BeginCopy(bool is_from,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY (SELECT INTO) is not supported"))); errmsg("COPY (SELECT INTO) is not supported")));
Assert(query->commandType == CMD_SELECT);
Assert(query->utilityStmt == NULL); Assert(query->utilityStmt == NULL);
/*
* Similarly the grammar doesn't enforce the presence of a RETURNING
* clause, but this is required here.
*/
if (query->commandType != CMD_SELECT &&
query->returningList == NIL)
{
Assert(query->commandType == CMD_INSERT ||
query->commandType == CMD_UPDATE ||
query->commandType == CMD_DELETE);
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY query must have a RETURNING clause")));
}
/* plan the query */ /* plan the query */
plan = pg_plan_query(query, 0, NULL); plan = pg_plan_query(query, 0, NULL);
......
...@@ -2561,9 +2561,12 @@ ClosePortalStmt: ...@@ -2561,9 +2561,12 @@ ClosePortalStmt:
* *
* QUERY : * QUERY :
* COPY relname [(columnList)] FROM/TO file [WITH] [(options)] * COPY relname [(columnList)] FROM/TO file [WITH] [(options)]
* COPY ( SELECT ... ) TO file [WITH] [(options)] * COPY ( query ) TO file [WITH] [(options)]
* *
* where 'file' can be one of: * where 'query' can be one of:
* { SELECT | UPDATE | INSERT | DELETE }
*
* and 'file' can be one of:
* { PROGRAM 'command' | STDIN | STDOUT | 'filename' } * { PROGRAM 'command' | STDIN | STDOUT | 'filename' }
* *
* In the preferred syntax the options are comma-separated * In the preferred syntax the options are comma-separated
...@@ -2574,7 +2577,7 @@ ClosePortalStmt: ...@@ -2574,7 +2577,7 @@ ClosePortalStmt:
* COPY [ BINARY ] table [ WITH OIDS ] FROM/TO file * COPY [ BINARY ] table [ WITH OIDS ] FROM/TO file
* [ [ USING ] DELIMITERS 'delimiter' ] ] * [ [ USING ] DELIMITERS 'delimiter' ] ]
* [ WITH NULL AS 'null string' ] * [ WITH NULL AS 'null string' ]
* This option placement is not supported with COPY (SELECT...). * This option placement is not supported with COPY (query...).
* *
*****************************************************************************/ *****************************************************************************/
...@@ -2607,16 +2610,16 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids ...@@ -2607,16 +2610,16 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids
n->options = list_concat(n->options, $11); n->options = list_concat(n->options, $11);
$$ = (Node *)n; $$ = (Node *)n;
} }
| COPY select_with_parens TO opt_program copy_file_name opt_with copy_options | COPY '(' PreparableStmt ')' TO opt_program copy_file_name opt_with copy_options
{ {
CopyStmt *n = makeNode(CopyStmt); CopyStmt *n = makeNode(CopyStmt);
n->relation = NULL; n->relation = NULL;
n->query = $2; n->query = $3;
n->attlist = NIL; n->attlist = NIL;
n->is_from = false; n->is_from = false;
n->is_program = $4; n->is_program = $6;
n->filename = $5; n->filename = $7;
n->options = $7; n->options = $9;
if (n->is_program && n->filename == NULL) if (n->is_program && n->filename == NULL)
ereport(ERROR, ereport(ERROR,
......
...@@ -32,10 +32,12 @@ ...@@ -32,10 +32,12 @@
* *
* The documented syntax is: * The documented syntax is:
* \copy tablename [(columnlist)] from|to filename [options] * \copy tablename [(columnlist)] from|to filename [options]
* \copy ( select stmt ) to filename [options] * \copy ( query stmt ) to filename [options]
* *
* where 'filename' can be one of the following: * where 'filename' can be one of the following:
* '<file path>' | PROGRAM '<command>' | stdin | stdout | pstdout | pstdout * '<file path>' | PROGRAM '<command>' | stdin | stdout | pstdout | pstdout
* and 'query' can be one of the following:
* SELECT | UPDATE | INSERT | DELETE
* *
* An undocumented fact is that you can still write BINARY before the * An undocumented fact is that you can still write BINARY before the
* tablename; this is a hangover from the pre-7.3 syntax. The options * tablename; this is a hangover from the pre-7.3 syntax. The options
...@@ -118,7 +120,7 @@ parse_slash_copy(const char *args) ...@@ -118,7 +120,7 @@ parse_slash_copy(const char *args)
goto error; goto error;
} }
/* Handle COPY (SELECT) case */ /* Handle COPY (query) case */
if (token[0] == '(') if (token[0] == '(')
{ {
int parens = 1; int parens = 1;
......
...@@ -1680,7 +1680,8 @@ typedef struct CopyStmt ...@@ -1680,7 +1680,8 @@ typedef struct CopyStmt
{ {
NodeTag type; NodeTag type;
RangeVar *relation; /* the relation to copy */ RangeVar *relation; /* the relation to copy */
Node *query; /* the SELECT query to copy */ Node *query; /* the query (SELECT or DML statement with
* RETURNING) to copy */
List *attlist; /* List of column names (as Strings), or NIL List *attlist; /* List of column names (as Strings), or NIL
* for all columns */ * for all columns */
bool is_from; /* TO or FROM */ bool is_from; /* TO or FROM */
......
--
-- Test cases for COPY (INSERT/UPDATE/DELETE) TO
--
create table copydml_test (id serial, t text);
insert into copydml_test (t) values ('a');
insert into copydml_test (t) values ('b');
insert into copydml_test (t) values ('c');
insert into copydml_test (t) values ('d');
insert into copydml_test (t) values ('e');
--
-- Test COPY (insert/update/delete ...)
--
copy (insert into copydml_test (t) values ('f') returning id) to stdout;
6
copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
6
copy (delete from copydml_test where t = 'g' returning id) to stdout;
6
--
-- Test \copy (insert/update/delete ...)
--
\copy (insert into copydml_test (t) values ('f') returning id) to stdout;
7
\copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
7
\copy (delete from copydml_test where t = 'g' returning id) to stdout;
7
-- Error cases
copy (insert into copydml_test default values) to stdout;
ERROR: COPY query must have a RETURNING clause
copy (update copydml_test set t = 'g') to stdout;
ERROR: COPY query must have a RETURNING clause
copy (delete from copydml_test) to stdout;
ERROR: COPY query must have a RETURNING clause
create rule qqq as on insert to copydml_test do instead nothing;
copy (insert into copydml_test default values) to stdout;
ERROR: DO INSTEAD NOTHING rules are not supported for COPY
drop rule qqq on copydml_test;
create rule qqq as on insert to copydml_test do also delete from copydml_test;
copy (insert into copydml_test default values) to stdout;
ERROR: DO ALSO rules are not supported for the COPY
drop rule qqq on copydml_test;
create rule qqq as on insert to copydml_test do instead (delete from copydml_test; delete from copydml_test);
copy (insert into copydml_test default values) to stdout;
ERROR: multi-statement DO INSTEAD rules are not supported for COPY
drop rule qqq on copydml_test;
create rule qqq as on insert to copydml_test where new.t <> 'f' do instead delete from copydml_test;
copy (insert into copydml_test default values) to stdout;
ERROR: conditional DO INSTEAD rules are not supported for COPY
drop rule qqq on copydml_test;
create rule qqq as on update to copydml_test do instead nothing;
copy (update copydml_test set t = 'f') to stdout;
ERROR: DO INSTEAD NOTHING rules are not supported for COPY
drop rule qqq on copydml_test;
create rule qqq as on update to copydml_test do also delete from copydml_test;
copy (update copydml_test set t = 'f') to stdout;
ERROR: DO ALSO rules are not supported for the COPY
drop rule qqq on copydml_test;
create rule qqq as on update to copydml_test do instead (delete from copydml_test; delete from copydml_test);
copy (update copydml_test set t = 'f') to stdout;
ERROR: multi-statement DO INSTEAD rules are not supported for COPY
drop rule qqq on copydml_test;
create rule qqq as on update to copydml_test where new.t <> 'f' do instead delete from copydml_test;
copy (update copydml_test set t = 'f') to stdout;
ERROR: conditional DO INSTEAD rules are not supported for COPY
drop rule qqq on copydml_test;
create rule qqq as on delete to copydml_test do instead nothing;
copy (delete from copydml_test) to stdout;
ERROR: DO INSTEAD NOTHING rules are not supported for COPY
drop rule qqq on copydml_test;
create rule qqq as on delete to copydml_test do also insert into copydml_test default values;
copy (delete from copydml_test) to stdout;
ERROR: DO ALSO rules are not supported for the COPY
drop rule qqq on copydml_test;
create rule qqq as on delete to copydml_test do instead (insert into copydml_test default values; insert into copydml_test default values);
copy (delete from copydml_test) to stdout;
ERROR: multi-statement DO INSTEAD rules are not supported for COPY
drop rule qqq on copydml_test;
create rule qqq as on delete to copydml_test where old.t <> 'f' do instead insert into copydml_test default values;
copy (delete from copydml_test) to stdout;
ERROR: conditional DO INSTEAD rules are not supported for COPY
drop rule qqq on copydml_test;
-- triggers
create function qqq_trig() returns trigger as $$
begin
if tg_op in ('INSERT', 'UPDATE') then
raise notice '% %', tg_op, new.id;
return new;
else
raise notice '% %', tg_op, old.id;
return old;
end if;
end
$$ language plpgsql;
create trigger qqqbef before insert or update or delete on copydml_test
for each row execute procedure qqq_trig();
create trigger qqqaf after insert or update or delete on copydml_test
for each row execute procedure qqq_trig();
copy (insert into copydml_test (t) values ('f') returning id) to stdout;
NOTICE: INSERT 8
8
NOTICE: INSERT 8
copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
NOTICE: UPDATE 8
8
NOTICE: UPDATE 8
copy (delete from copydml_test where t = 'g' returning id) to stdout;
NOTICE: DELETE 8
8
NOTICE: DELETE 8
drop table copydml_test;
drop function qqq_trig();
...@@ -48,7 +48,7 @@ test: create_function_2 ...@@ -48,7 +48,7 @@ test: create_function_2
# execute two copy tests parallel, to check that copy itself # execute two copy tests parallel, to check that copy itself
# is concurrent safe. # is concurrent safe.
# ---------- # ----------
test: copy copyselect test: copy copyselect copydml
# ---------- # ----------
# More groups of parallel tests # More groups of parallel tests
......
...@@ -57,6 +57,7 @@ test: create_table ...@@ -57,6 +57,7 @@ test: create_table
test: create_function_2 test: create_function_2
test: copy test: copy
test: copyselect test: copyselect
test: copydml
test: create_misc test: create_misc
test: create_operator test: create_operator
test: create_index test: create_index
......
--
-- Test cases for COPY (INSERT/UPDATE/DELETE) TO
--
create table copydml_test (id serial, t text);
insert into copydml_test (t) values ('a');
insert into copydml_test (t) values ('b');
insert into copydml_test (t) values ('c');
insert into copydml_test (t) values ('d');
insert into copydml_test (t) values ('e');
--
-- Test COPY (insert/update/delete ...)
--
copy (insert into copydml_test (t) values ('f') returning id) to stdout;
copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
copy (delete from copydml_test where t = 'g' returning id) to stdout;
--
-- Test \copy (insert/update/delete ...)
--
\copy (insert into copydml_test (t) values ('f') returning id) to stdout;
\copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
\copy (delete from copydml_test where t = 'g' returning id) to stdout;
-- Error cases
copy (insert into copydml_test default values) to stdout;
copy (update copydml_test set t = 'g') to stdout;
copy (delete from copydml_test) to stdout;
create rule qqq as on insert to copydml_test do instead nothing;
copy (insert into copydml_test default values) to stdout;
drop rule qqq on copydml_test;
create rule qqq as on insert to copydml_test do also delete from copydml_test;
copy (insert into copydml_test default values) to stdout;
drop rule qqq on copydml_test;
create rule qqq as on insert to copydml_test do instead (delete from copydml_test; delete from copydml_test);
copy (insert into copydml_test default values) to stdout;
drop rule qqq on copydml_test;
create rule qqq as on insert to copydml_test where new.t <> 'f' do instead delete from copydml_test;
copy (insert into copydml_test default values) to stdout;
drop rule qqq on copydml_test;
create rule qqq as on update to copydml_test do instead nothing;
copy (update copydml_test set t = 'f') to stdout;
drop rule qqq on copydml_test;
create rule qqq as on update to copydml_test do also delete from copydml_test;
copy (update copydml_test set t = 'f') to stdout;
drop rule qqq on copydml_test;
create rule qqq as on update to copydml_test do instead (delete from copydml_test; delete from copydml_test);
copy (update copydml_test set t = 'f') to stdout;
drop rule qqq on copydml_test;
create rule qqq as on update to copydml_test where new.t <> 'f' do instead delete from copydml_test;
copy (update copydml_test set t = 'f') to stdout;
drop rule qqq on copydml_test;
create rule qqq as on delete to copydml_test do instead nothing;
copy (delete from copydml_test) to stdout;
drop rule qqq on copydml_test;
create rule qqq as on delete to copydml_test do also insert into copydml_test default values;
copy (delete from copydml_test) to stdout;
drop rule qqq on copydml_test;
create rule qqq as on delete to copydml_test do instead (insert into copydml_test default values; insert into copydml_test default values);
copy (delete from copydml_test) to stdout;
drop rule qqq on copydml_test;
create rule qqq as on delete to copydml_test where old.t <> 'f' do instead insert into copydml_test default values;
copy (delete from copydml_test) to stdout;
drop rule qqq on copydml_test;
-- triggers
create function qqq_trig() returns trigger as $$
begin
if tg_op in ('INSERT', 'UPDATE') then
raise notice '% %', tg_op, new.id;
return new;
else
raise notice '% %', tg_op, old.id;
return old;
end if;
end
$$ language plpgsql;
create trigger qqqbef before insert or update or delete on copydml_test
for each row execute procedure qqq_trig();
create trigger qqqaf after insert or update or delete on copydml_test
for each row execute procedure qqq_trig();
copy (insert into copydml_test (t) values ('f') returning id) to stdout;
copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
copy (delete from copydml_test where t = 'g' returning id) to stdout;
drop table copydml_test;
drop function qqq_trig();
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment