Commit 31f38174 authored by Tomas Vondra's avatar Tomas Vondra

Allow COPY FROM to filter data using WHERE conditions

Extends the COPY FROM command with a WHERE condition, which allows doing
various types of filtering while importing the data (random sampling,
condition on a data column, etc.).  Until now such filtering required
either preprocessing of the input data, or importing all data and then
filtering in the database. COPY FROM ... WHERE is an easy-to-use and
low-overhead alternative for most simple cases.

Author: Surafel Temesgen
Reviewed-by: Tomas Vondra, Masahiko Sawada, Lim Myungkyu
Discussion: https://www.postgresql.org/message-id/flat/CALAY4q_DdpWDuB5-Zyi-oTtO2uSk8pmy+dupiRe3AvAc++1imA@mail.gmail.com
parent d6ef7fe7
......@@ -25,6 +25,7 @@ PostgreSQL documentation
COPY <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ]
FROM { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDIN }
[ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
[ WHERE <replaceable class="parameter">condition</replaceable> ]
COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ] | ( <replaceable class="parameter">query</replaceable> ) }
TO { '<replaceable class="parameter">filename</replaceable>' | PROGRAM '<replaceable class="parameter">command</replaceable>' | STDOUT }
......@@ -353,6 +354,32 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
</listitem>
</varlistentry>
<varlistentry>
<term><literal>WHERE</literal></term>
<listitem>
<para>
The optional <literal>WHERE</literal> clause has the general form
<synopsis>
WHERE <replaceable class="parameter">condition</replaceable>
</synopsis>
where <replaceable class="parameter">condition</replaceable> is
any expression that evaluates to a result of type
<type>boolean</type>. Any row that does not satisfy this
condition will not be inserted to the table. A row satisfies the
condition if it returns true when the actual row values are
substituted for any variable references.
</para>
<para>
Currently, subqueries are not allowed in <literal>WHERE</literal>
expressions, and the evaluation does not see any changes made by the
<command>COPY</command> itself (this matters when the expression
contains calls to <literal>VOLATILE</literal> functions).
</para>
</listitem>
</varlistentry>
</variablelist>
</refsect1>
......
......@@ -39,7 +39,11 @@
#include "miscadmin.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "optimizer/prep.h"
#include "nodes/makefuncs.h"
#include "parser/parse_coerce.h"
#include "parser/parse_collate.h"
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
#include "port/pg_bswap.h"
#include "rewrite/rewriteHandler.h"
......@@ -149,6 +153,7 @@ typedef struct CopyStateData
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
Node *whereClause; /* WHERE condition (or NULL) */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
......@@ -179,6 +184,7 @@ typedef struct CopyStateData
ExprState **defexprs; /* array of default att expressions */
bool volatile_defexprs; /* is any of defexprs volatile? */
List *range_table;
ExprState *qualexpr;
TransitionCaptureState *transition_capture;
......@@ -800,6 +806,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
Relation rel;
Oid relid;
RawStmt *query = NULL;
Node *whereClause = NULL;
/*
* Disallow COPY to/from file or program except to users with the
......@@ -853,6 +860,26 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
NULL, false, false);
rte->requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
if (stmt->whereClause)
{
/* add rte to column namespace */
addRTEtoQuery(pstate, rte, false, true, true);
/* Transform the raw expression tree */
whereClause = transformExpr(pstate, stmt->whereClause, EXPR_KIND_COPY_WHERE);
/* Make sure it yields a boolean result. */
whereClause = coerce_to_boolean(pstate, whereClause, "WHERE");
/* we have to fix its collations too */
assign_expr_collations(pstate, whereClause);
whereClause = eval_const_expressions(NULL, whereClause);
whereClause = (Node *) canonicalize_qual((Expr *) whereClause, false);
whereClause = (Node *) make_ands_implicit((Expr *) whereClause);
}
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
foreach(cur, attnums)
......@@ -1001,6 +1028,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
NULL, stmt->attlist, stmt->options);
cstate->whereClause = whereClause;
*processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
}
......@@ -2535,6 +2563,10 @@ CopyFrom(CopyState cstate)
if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
proute = ExecSetupPartitionTupleRouting(NULL, cstate->rel);
if (cstate->whereClause)
cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
&mtstate->ps);
/*
* It's more efficient to prepare a bunch of tuples for insertion, and
* insert them in one heap_multi_insert() call, than call heap_insert()
......@@ -2580,6 +2612,16 @@ CopyFrom(CopyState cstate)
*/
insertMethod = CIM_SINGLE;
}
else if (cstate->whereClause != NULL ||
contain_volatile_functions(cstate->whereClause))
{
/*
* Can't support multi-inserts if there are any volatile funcation
* expressions in WHERE clause. Similarly to the trigger case above,
* such expressions may query the table we're inserting into.
*/
insertMethod = CIM_SINGLE;
}
else
{
/*
......@@ -2683,6 +2725,13 @@ CopyFrom(CopyState cstate)
slot = myslot;
ExecStoreHeapTuple(tuple, slot, false);
if (cstate->whereClause)
{
econtext->ecxt_scantuple = myslot;
if (!ExecQual(cstate->qualexpr, econtext))
continue;
}
/* Determine the partition to heap_insert the tuple into */
if (proute)
{
......
......@@ -3309,6 +3309,7 @@ _copyCopyStmt(const CopyStmt *from)
COPY_SCALAR_FIELD(is_program);
COPY_STRING_FIELD(filename);
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(whereClause);
return newnode;
}
......
......@@ -1222,6 +1222,7 @@ _equalCopyStmt(const CopyStmt *a, const CopyStmt *b)
COMPARE_SCALAR_FIELD(is_program);
COMPARE_STRING_FIELD(filename);
COMPARE_NODE_FIELD(options);
COMPARE_NODE_FIELD(whereClause);
return true;
}
......
......@@ -2962,7 +2962,8 @@ ClosePortalStmt:
*****************************************************************************/
CopyStmt: COPY opt_binary qualified_name opt_column_list
copy_from opt_program copy_file_name copy_delimiter opt_with copy_options
copy_from opt_program copy_file_name copy_delimiter opt_with
copy_options where_clause
{
CopyStmt *n = makeNode(CopyStmt);
n->relation = $3;
......@@ -2971,6 +2972,7 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list
n->is_from = $5;
n->is_program = $6;
n->filename = $7;
n->whereClause = $11;
if (n->is_program && n->filename == NULL)
ereport(ERROR,
......@@ -2978,6 +2980,12 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list
errmsg("STDIN/STDOUT not allowed with PROGRAM"),
parser_errposition(@8)));
if (!n->is_from && n->whereClause != NULL)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("WHERE clause not allowed with COPY TO"),
parser_errposition(@11)));
n->options = NIL;
/* Concatenate user-supplied flags */
if ($2)
......
......@@ -523,6 +523,14 @@ check_agglevels_and_constraints(ParseState *pstate, Node *expr)
break;
case EXPR_KIND_COPY_WHERE:
if (isAgg)
err = _("aggregate functions are not allowed in COPY FROM WHERE conditions");
else
err = _("grouping operations are not allowed in COPY FROM WHERE conditions");
break;
/*
* There is intentionally no default: case here, so that the
* compiler will warn if we add a new ParseExprKind without
......@@ -902,6 +910,9 @@ transformWindowFuncCall(ParseState *pstate, WindowFunc *wfunc,
case EXPR_KIND_CALL_ARGUMENT:
err = _("window functions are not allowed in CALL arguments");
break;
case EXPR_KIND_COPY_WHERE:
err = _("window functions are not allowed in COPY FROM WHERE conditions");
break;
/*
* There is intentionally no default: case here, so that the
......
......@@ -1849,6 +1849,9 @@ transformSubLink(ParseState *pstate, SubLink *sublink)
case EXPR_KIND_CALL_ARGUMENT:
err = _("cannot use subquery in CALL argument");
break;
case EXPR_KIND_COPY_WHERE:
err = _("cannot use subquery in COPY FROM WHERE condition");
break;
/*
* There is intentionally no default: case here, so that the
......@@ -3475,6 +3478,8 @@ ParseExprKindName(ParseExprKind exprKind)
return "PARTITION BY";
case EXPR_KIND_CALL_ARGUMENT:
return "CALL";
case EXPR_KIND_COPY_WHERE:
return "WHERE";
/*
* There is intentionally no default: case here, so that the
......
......@@ -2370,6 +2370,9 @@ check_srf_call_placement(ParseState *pstate, Node *last_srf, int location)
case EXPR_KIND_CALL_ARGUMENT:
err = _("set-returning functions are not allowed in CALL arguments");
break;
case EXPR_KIND_COPY_WHERE:
err = _("set-returning functions are not allowed in COPY FROM WHERE conditions");
break;
/*
* There is intentionally no default: case here, so that the
......
......@@ -1969,6 +1969,7 @@ typedef struct CopyStmt
bool is_program; /* is 'filename' a program to popen? */
char *filename; /* filename, or NULL for STDIN/STDOUT */
List *options; /* List of DefElem nodes */
Node *whereClause; /* WHERE condition (or NULL) */
} CopyStmt;
/* ----------------------
......
......@@ -69,7 +69,8 @@ typedef enum ParseExprKind
EXPR_KIND_TRIGGER_WHEN, /* WHEN condition in CREATE TRIGGER */
EXPR_KIND_POLICY, /* USING or WITH CHECK expr in policy */
EXPR_KIND_PARTITION_EXPRESSION, /* PARTITION BY expression */
EXPR_KIND_CALL_ARGUMENT /* procedure argument in CALL */
EXPR_KIND_CALL_ARGUMENT, /* procedure argument in CALL */
EXPR_KIND_COPY_WHERE /* WHERE condition in COPY FROM */
} ParseExprKind;
......
......@@ -192,7 +192,7 @@ ECPG: where_or_current_clauseWHERECURRENT_POFcursor_name block
char *cursor_marker = $4[0] == ':' ? mm_strdup("$0") : $4;
$$ = cat_str(2,mm_strdup("where current of"), cursor_marker);
}
ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listcopy_fromopt_programcopy_file_namecopy_delimiteropt_withcopy_options addon
ECPG: CopyStmtCOPYopt_binaryqualified_nameopt_column_listcopy_fromopt_programcopy_file_namecopy_delimiteropt_withcopy_optionswhere_clause addon
if (strcmp($6, "from") == 0 &&
(strcmp($7, "stdin") == 0 || strcmp($7, "stdout") == 0))
mmerror(PARSE_ERROR, ET_WARNING, "COPY FROM STDIN is not implemented");
......
......@@ -49,6 +49,32 @@ CONTEXT: COPY x, line 1: "2002 232 40 50 60 70 80"
COPY x (b, c, d, e) from stdin delimiter ',' null 'x';
COPY x from stdin WITH DELIMITER AS ';' NULL AS '';
COPY x from stdin WITH DELIMITER AS ':' NULL AS E'\\X' ENCODING 'sql_ascii';
COPY x TO stdout WHERE a = 1;
ERROR: WHERE clause not allowed with COPY TO
LINE 1: COPY x TO stdout WHERE a = 1;
^
COPY x from stdin WHERE a = 50004;
COPY x from stdin WHERE a > 60003;
COPY x from stdin WHERE f > 60003;
ERROR: column "f" does not exist
LINE 1: COPY x from stdin WHERE f > 60003;
^
COPY x from stdin WHERE a = max(x.b);
ERROR: aggregate functions are not allowed in COPY FROM WHERE conditions
LINE 1: COPY x from stdin WHERE a = max(x.b);
^
COPY x from stdin WHERE a IN (SELECT 1 FROM x);
ERROR: cannot use subquery in COPY FROM WHERE condition
LINE 1: COPY x from stdin WHERE a IN (SELECT 1 FROM x);
^
COPY x from stdin WHERE a IN (generate_series(1,5));
ERROR: set-returning functions are not allowed in COPY FROM WHERE conditions
LINE 1: COPY x from stdin WHERE a IN (generate_series(1,5));
^
COPY x from stdin WHERE a = row_number() over(b);
ERROR: window functions are not allowed in COPY FROM WHERE conditions
LINE 1: COPY x from stdin WHERE a = row_number() over(b);
^
-- check results of copy in
SELECT * FROM x;
a | b | c | d | e
......@@ -73,12 +99,15 @@ SELECT * FROM x;
4006 | 6 | BackslashN | \N | before trigger fired
4007 | 7 | XX | XX | before trigger fired
4008 | 8 | Delimiter | : | before trigger fired
50004 | 25 | 35 | 45 | before trigger fired
60004 | 25 | 35 | 45 | before trigger fired
60005 | 26 | 36 | 46 | before trigger fired
1 | 1 | stuff | test_1 | after trigger fired
2 | 2 | stuff | test_2 | after trigger fired
3 | 3 | stuff | test_3 | after trigger fired
4 | 4 | stuff | test_4 | after trigger fired
5 | 5 | stuff | test_5 | after trigger fired
(25 rows)
(28 rows)
-- check copy out
COPY x TO stdout;
......@@ -102,6 +131,9 @@ COPY x TO stdout;
4006 6 BackslashN \\N before trigger fired
4007 7 XX XX before trigger fired
4008 8 Delimiter : before trigger fired
50004 25 35 45 before trigger fired
60004 25 35 45 before trigger fired
60005 26 36 46 before trigger fired
1 1 stuff test_1 after trigger fired
2 2 stuff test_2 after trigger fired
3 3 stuff test_3 after trigger fired
......@@ -128,6 +160,9 @@ N before trigger fired
BackslashN before trigger fired
XX before trigger fired
Delimiter before trigger fired
35 before trigger fired
35 before trigger fired
36 before trigger fired
stuff after trigger fired
stuff after trigger fired
stuff after trigger fired
......@@ -154,6 +189,9 @@ I'm null before trigger fired
6 before trigger fired
7 before trigger fired
8 before trigger fired
25 before trigger fired
25 before trigger fired
26 before trigger fired
1 after trigger fired
2 after trigger fired
3 after trigger fired
......
......@@ -95,6 +95,32 @@ COPY x from stdin WITH DELIMITER AS ':' NULL AS E'\\X' ENCODING 'sql_ascii';
4008:8:Delimiter:\::\:
\.
COPY x TO stdout WHERE a = 1;
COPY x from stdin WHERE a = 50004;
50003 24 34 44 54
50004 25 35 45 55
50005 26 36 46 56
\.
COPY x from stdin WHERE a > 60003;
60001 22 32 42 52
60002 23 33 43 53
60003 24 34 44 54
60004 25 35 45 55
60005 26 36 46 56
\.
COPY x from stdin WHERE f > 60003;
COPY x from stdin WHERE a = max(x.b);
COPY x from stdin WHERE a IN (SELECT 1 FROM x);
COPY x from stdin WHERE a IN (generate_series(1,5));
COPY x from stdin WHERE a = row_number() over(b);
-- check results of copy in
SELECT * FROM x;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment