Commit 85188ab8 authored by Tom Lane's avatar Tom Lane

Extend COPY to support COPY (SELECT ...) TO ...

Bernd Helmle
parent 0d506578
<!--
$PostgreSQL: pgsql/doc/src/sgml/ref/copy.sgml,v 1.74 2006/04/22 03:03:11 momjian Exp $
$PostgreSQL: pgsql/doc/src/sgml/ref/copy.sgml,v 1.75 2006/08/30 23:34:20 tgl Exp $
PostgreSQL documentation
-->
......@@ -33,7 +33,7 @@ COPY <replaceable class="parameter">tablename</replaceable> [ ( <replaceable cla
[ ESCAPE [ AS ] '<replaceable class="parameter">escape</replaceable>' ]
[ FORCE NOT NULL <replaceable class="parameter">column</replaceable> [, ...] ]
COPY <replaceable class="parameter">tablename</replaceable> [ ( <replaceable class="parameter">column</replaceable> [, ...] ) ]
COPY { <replaceable class="parameter">tablename</replaceable> [ ( <replaceable class="parameter">column</replaceable> [, ...] ) ] | ( <replaceable class="parameter">query</replaceable> ) }
TO { '<replaceable class="parameter">filename</replaceable>' | STDOUT }
[ [ WITH ]
[ BINARY ]
......@@ -57,7 +57,8 @@ COPY <replaceable class="parameter">tablename</replaceable> [ ( <replaceable cla
files. <command>COPY TO</command> copies the contents of a table
<emphasis>to</> a file, while <command>COPY FROM</command> copies
data <emphasis>from</> a file to a table (appending the data to
whatever is in the table already).
whatever is in the table already). <command>COPY TO</command>
can also copy the results of a <command>SELECT</> query.
</para>
<para>
......@@ -97,7 +98,17 @@ COPY <replaceable class="parameter">tablename</replaceable> [ ( <replaceable cla
<listitem>
<para>
An optional list of columns to be copied. If no column list is
specified, all columns will be used.
specified, all columns of the table will be copied.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><replaceable class="parameter">query</replaceable></term>
<listitem>
<para>
A <command>SELECT</> query whose results are to be copied.
Note that parentheses are required around the query.
</para>
</listitem>
</varlistentry>
......@@ -148,7 +159,8 @@ COPY <replaceable class="parameter">tablename</replaceable> [ ( <replaceable cla
<para>
Specifies copying the OID for each row. (An error is raised if
<literal>OIDS</literal> is specified for a table that does not
have OIDs.)
have OIDs, or in the case of copying a <replaceable
class="parameter">query</replaceable>.)
</para>
</listitem>
</varlistentry>
......@@ -265,7 +277,7 @@ COPY <replaceable class="parameter">tablename</replaceable> [ ( <replaceable cla
COPY <replaceable class="parameter">count</replaceable>
</screen>
The <replaceable class="parameter">count</replaceable> is the number
of rows inserted into or copied from the table.
of rows copied.
</para>
</refsect1>
......@@ -274,7 +286,8 @@ COPY <replaceable class="parameter">count</replaceable>
<para>
<command>COPY</command> can only be used with plain tables, not
with views.
with views. However, you can write <literal>COPY (SELECT * FROM
<replaceable class="parameter">viewname</replaceable>) TO ...</literal>.
</para>
<para>
......@@ -320,8 +333,8 @@ COPY <replaceable class="parameter">count</replaceable>
server in the case of <command>COPY TO</command>, but for
<command>COPY FROM</command> you do have the option of reading from
a file specified by a relative path. The path will be interpreted
relative to the working directory of the server process (somewhere below
the data directory), not the client's working directory.
relative to the working directory of the server process (normally
the cluster's data directory), not the client's working directory.
</para>
<para>
......@@ -737,14 +750,9 @@ COPY country FROM '/usr1/proj/bray/sql/country_data';
</para>
<para>
To copy into a file just the countries whose names start with 'A'
using a temporary table which is automatically deleted:
To copy into a file just the countries whose names start with 'A':
<programlisting>
BEGIN;
CREATE TEMP TABLE a_list_countries AS
SELECT * FROM country WHERE country_name LIKE 'A%';
COPY a_list_countries TO '/usr1/proj/bray/sql/a_list_countries.copy';
ROLLBACK;
COPY (SELECT * FROM country WHERE country_name LIKE 'A%') TO '/usr1/proj/bray/sql/a_list_countries.copy';
</programlisting>
</para>
......
<!--
$PostgreSQL: pgsql/doc/src/sgml/ref/psql-ref.sgml,v 1.167 2006/08/29 22:25:04 tgl Exp $
$PostgreSQL: pgsql/doc/src/sgml/ref/psql-ref.sgml,v 1.168 2006/08/30 23:34:21 tgl Exp $
PostgreSQL documentation
-->
......@@ -739,8 +739,7 @@ testdb=&gt;
</varlistentry>
<varlistentry>
<term><literal>\copy <replaceable class="parameter">table</replaceable>
[ ( <replaceable class="parameter">column_list</replaceable> ) ]
<term><literal>\copy { <replaceable class="parameter">table</replaceable> [ ( <replaceable class="parameter">column_list</replaceable> ) ] | ( <replaceable class="parameter">query</replaceable> ) }
{ <literal>from</literal> | <literal>to</literal> }
{ <replaceable class="parameter">filename</replaceable> | stdin | stdout | pstdin | pstdout }
[ with ]
......@@ -779,9 +778,7 @@ testdb=&gt;
</para>
<para>
<literal>\copy <replaceable
class="parameter">table</replaceable> from <replaceable
class="parameter">stdin | stdout</replaceable></literal>
<literal>\copy ... from stdin | to stdout</literal>
reads/writes based on the command input and output respectively.
All rows are read from the same source that issued the command,
continuing until <literal>\.</literal> is read or the stream
......
......@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.268 2006/07/14 14:52:18 momjian Exp $
* $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.269 2006/08/30 23:34:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -31,6 +31,7 @@
#include "libpq/pqformat.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "optimizer/planner.h"
#include "parser/parse_relation.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
......@@ -99,18 +100,21 @@ typedef struct CopyStateData
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
QueryDesc *queryDesc; /* executable query to copy from */
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool binary; /* binary format? */
bool oids; /* include OIDs? */
bool csv_mode; /* Comma Separated Value format? */
bool header_line; /* CSV header line? */
char *null_print; /* NULL marker string (server encoding!) */
int null_print_len; /* length of same */
char *null_print_client; /* same converted to client encoding */
char *delim; /* column delimiter (must be 1 byte) */
char *quote; /* CSV quote char (must be 1 byte) */
char *escape; /* CSV escape char (must be 1 byte) */
List *force_quote_atts; /* integer list of attnums to FQ */
List *force_notnull_atts; /* integer list of attnums to FNN */
bool *force_quote_flags; /* per-column CSV FQ flags */
bool *force_notnull_flags; /* per-column CSV FNN flags */
/* these are just for error messages, see copy_in_error_callback */
const char *cur_relname; /* table name for error messages */
......@@ -118,6 +122,12 @@ typedef struct CopyStateData
const char *cur_attname; /* current att for error messages */
const char *cur_attval; /* current att value for error messages */
/*
* Working state for COPY TO
*/
FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */
/*
* These variables are used to reduce overhead in textual COPY FROM.
*
......@@ -153,6 +163,13 @@ typedef struct CopyStateData
typedef CopyStateData *CopyState;
/* DestReceiver for COPY (SELECT) TO */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyState cstate; /* CopyStateData for the command */
} DR_copy;
/*
* These macros centralize code used to process line_buf and raw_buf buffers.
......@@ -225,6 +242,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */
static void DoCopyTo(CopyState cstate);
static void CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
static void CopyFrom(CopyState cstate);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
......@@ -239,7 +258,8 @@ static Datum CopyReadBinaryAttribute(CopyState cstate,
static void CopyAttributeOutText(CopyState cstate, char *string);
static void CopyAttributeOutCSV(CopyState cstate, char *string,
bool use_quote, bool single_attr);
static List *CopyGetAttnums(Relation rel, List *attnamelist);
static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
List *attnamelist);
static char *limit_printout_length(const char *str);
/* Low-level communications functions */
......@@ -668,7 +688,8 @@ CopyLoadRawBuf(CopyState cstate)
* DoCopy executes the SQL COPY statement.
*
* Either unload or reload contents of table <relation>, depending on <from>.
* (<from> = TRUE means we are inserting into the table.)
* (<from> = TRUE means we are inserting into the table.) In the "TO" case
* we also support copying the output of an arbitrary SELECT query.
*
* If <pipe> is false, transfer is between the table and the file named
* <filename>. Otherwise, transfer is between the table and our regular
......@@ -697,8 +718,6 @@ uint64
DoCopy(const CopyStmt *stmt)
{
CopyState cstate;
RangeVar *relation = stmt->relation;
char *filename = stmt->filename;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL);
List *attnamelist = stmt->attlist;
......@@ -707,6 +726,8 @@ DoCopy(const CopyStmt *stmt)
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
AclResult aclresult;
ListCell *option;
TupleDesc tupDesc;
int num_phys_attrs;
uint64 processed;
/* Allocate workspace and zero all fields */
......@@ -920,23 +941,7 @@ DoCopy(const CopyStmt *stmt)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CSV quote character must not appear in the NULL specification")));
/* Open and lock the relation, using the appropriate lock type. */
cstate->rel = heap_openrv(relation,
(is_from ? RowExclusiveLock : AccessShareLock));
/* check read-only transaction */
if (XactReadOnly && is_from &&
!isTempNamespace(RelationGetNamespace(cstate->rel)))
ereport(ERROR,
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("transaction is read-only")));
/* Check permissions. */
aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel), GetUserId(),
required_access);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS,
RelationGetRelationName(cstate->rel));
/* Disallow file COPY except to superusers. */
if (!pipe && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
......@@ -944,26 +949,137 @@ DoCopy(const CopyStmt *stmt)
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
/* Don't allow COPY w/ OIDs to or from a table without them */
if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(cstate->rel))));
if (stmt->relation)
{
Assert(!stmt->query);
cstate->queryDesc = NULL;
/* Open and lock the relation, using the appropriate lock type. */
cstate->rel = heap_openrv(stmt->relation,
(is_from ? RowExclusiveLock : AccessShareLock));
/* Check relation permissions. */
aclresult = pg_class_aclcheck(RelationGetRelid(cstate->rel),
GetUserId(),
required_access);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, ACL_KIND_CLASS,
RelationGetRelationName(cstate->rel));
/* check read-only transaction */
if (XactReadOnly && is_from &&
!isTempNamespace(RelationGetNamespace(cstate->rel)))
ereport(ERROR,
(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
errmsg("transaction is read-only")));
/* Don't allow COPY w/ OIDs to or from a table without them */
if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("table \"%s\" does not have OIDs",
RelationGetRelationName(cstate->rel))));
tupDesc = RelationGetDescr(cstate->rel);
}
else
{
Query *query = stmt->query;
List *rewritten;
Plan *plan;
DestReceiver *dest;
Assert(query);
Assert(!is_from);
cstate->rel = NULL;
/* Don't allow COPY w/ OIDs from a select */
if (cstate->oids)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY (SELECT) WITH OIDS is not supported")));
if (query->into)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("COPY (SELECT INTO) is not supported")));
/*
* The query has already been through parse analysis, but not
* rewriting or planning. Do that now.
*
* Because the planner is not cool about not scribbling on its input,
* we make a preliminary copy of the source querytree. This prevents
* problems in the case that the COPY is in a portal or plpgsql
* function and is executed repeatedly. (See also the same hack in
* EXPLAIN, DECLARE CURSOR and PREPARE.) XXX the planner really
* shouldn't modify its input ... FIXME someday.
*/
query = copyObject(query);
Assert(query->commandType == CMD_SELECT);
/*
* Must acquire locks in case we didn't come fresh from the parser.
* XXX this also scribbles on query, another reason for copyObject
*/
AcquireRewriteLocks(query);
/* Rewrite through rule system */
rewritten = QueryRewrite(query);
/* We don't expect more or less than one result query */
if (list_length(rewritten) != 1)
elog(ERROR, "unexpected rewrite result");
query = (Query *) linitial(rewritten);
Assert(query->commandType == CMD_SELECT);
/* plan the query */
plan = planner(query, false, 0, NULL);
/*
* Update snapshot command ID to ensure this query sees results of any
* previously executed queries. (It's a bit cheesy to modify
* ActiveSnapshot without making a copy, but for the limited ways in
* which COPY can be invoked, I think it's OK, because the active
* snapshot shouldn't be shared with anything else anyway.)
*/
ActiveSnapshot->curcid = GetCurrentCommandId();
/* Create dest receiver for COPY OUT */
dest = CreateDestReceiver(DestCopyOut, NULL);
((DR_copy *) dest)->cstate = cstate;
/* Create a QueryDesc requesting no output */
cstate->queryDesc = CreateQueryDesc(query, plan,
ActiveSnapshot, InvalidSnapshot,
dest, NULL, false);
/*
* Call ExecutorStart to prepare the plan for execution.
*
* ExecutorStart computes a result tupdesc for us
*/
ExecutorStart(cstate->queryDesc, 0);
tupDesc = cstate->queryDesc->tupDesc;
}
/* Generate or convert list of attributes to process */
cstate->attnumlist = CopyGetAttnums(cstate->rel, attnamelist);
cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
num_phys_attrs = tupDesc->natts;
/* Convert FORCE QUOTE name list to column numbers, check validity */
/* Convert FORCE QUOTE name list to per-column flags, check validity */
cstate->force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (force_quote)
{
TupleDesc tupDesc = RelationGetDescr(cstate->rel);
Form_pg_attribute *attr = tupDesc->attrs;
List *attnums;
ListCell *cur;
cstate->force_quote_atts = CopyGetAttnums(cstate->rel, force_quote);
attnums = CopyGetAttnums(tupDesc, cstate->rel, force_quote);
foreach(cur, cstate->force_quote_atts)
foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
......@@ -971,21 +1087,21 @@ DoCopy(const CopyStmt *stmt)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("FORCE QUOTE column \"%s\" not referenced by COPY",
NameStr(attr[attnum - 1]->attname))));
NameStr(tupDesc->attrs[attnum - 1]->attname))));
cstate->force_quote_flags[attnum - 1] = true;
}
}
/* Convert FORCE NOT NULL name list to column numbers, check validity */
/* Convert FORCE NOT NULL name list to per-column flags, check validity */
cstate->force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
if (force_notnull)
{
TupleDesc tupDesc = RelationGetDescr(cstate->rel);
Form_pg_attribute *attr = tupDesc->attrs;
List *attnums;
ListCell *cur;
cstate->force_notnull_atts = CopyGetAttnums(cstate->rel,
force_notnull);
attnums = CopyGetAttnums(tupDesc, cstate->rel, force_notnull);
foreach(cur, cstate->force_notnull_atts)
foreach(cur, attnums)
{
int attnum = lfirst_int(cur);
......@@ -993,7 +1109,8 @@ DoCopy(const CopyStmt *stmt)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("FORCE NOT NULL column \"%s\" not referenced by COPY",
NameStr(attr[attnum - 1]->attname))));
NameStr(tupDesc->attrs[attnum - 1]->attname))));
cstate->force_notnull_flags[attnum - 1] = true;
}
}
......@@ -1018,67 +1135,59 @@ DoCopy(const CopyStmt *stmt)
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
cstate->copy_dest = COPY_FILE; /* default */
cstate->filename = stmt->filename;
if (is_from)
{ /* copy from file to database */
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to sequence \"%s\"",
RelationGetRelationName(cstate->rel))));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to non-table relation \"%s\"",
RelationGetRelationName(cstate->rel))));
}
if (pipe)
{
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
else
cstate->copy_file = stdin;
}
else
{
struct stat st;
if (is_from) /* copy from file to database */
CopyFrom(cstate);
else /* copy from database to file */
DoCopyTo(cstate);
cstate->copy_file = AllocateFile(filename, PG_BINARY_R);
/*
* Close the relation or query. If reading, we can release the
* AccessShareLock we got; if writing, we should hold the lock until end
* of transaction to ensure that updates will be committed before lock is
* released.
*/
if (cstate->rel)
heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
else
{
/* Close down the query and free resources. */
ExecutorEnd(cstate->queryDesc);
FreeQueryDesc(cstate->queryDesc);
}
if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
filename)));
/* Clean up storage (probably not really necessary) */
processed = cstate->processed;
fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
{
FreeFile(cstate->copy_file);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", filename)));
}
}
pfree(cstate->attribute_buf.data);
pfree(cstate->line_buf.data);
pfree(cstate->raw_buf);
pfree(cstate);
CopyFrom(cstate);
}
else
{ /* copy from database to file */
return processed;
}
/*
* This intermediate routine exists mainly to localize the effects of setjmp
* so we don't need to plaster a lot of variables with "volatile".
*/
static void
DoCopyTo(CopyState cstate)
{
bool pipe = (cstate->filename == NULL);
if (cstate->rel)
{
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from view \"%s\"",
RelationGetRelationName(cstate->rel))));
RelationGetRelationName(cstate->rel)),
errhint("Try the COPY (SELECT ...) TO variant.")));
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
......@@ -1090,86 +1199,49 @@ DoCopy(const CopyStmt *stmt)
errmsg("cannot copy from non-table relation \"%s\"",
RelationGetRelationName(cstate->rel))));
}
if (pipe)
{
if (whereToSendOutput == DestRemote)
cstate->fe_copy = true;
else
cstate->copy_file = stdout;
}
else
{
mode_t oumask; /* Pre-existing umask value */
struct stat st;
/*
* Prevent write to relative path ... too easy to shoot oneself in
* the foot by overwriting a database file ...
*/
if (!is_absolute_path(filename))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("relative path not allowed for COPY to file")));
oumask = umask((mode_t) 022);
cstate->copy_file = AllocateFile(filename, PG_BINARY_W);
umask(oumask);
if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for writing: %m",
filename)));
fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
{
FreeFile(cstate->copy_file);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", filename)));
}
}
DoCopyTo(cstate);
}
if (!pipe)
if (pipe)
{
/* we assume only the write case could fail here */
if (FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file \"%s\": %m",
filename)));
if (whereToSendOutput == DestRemote)
cstate->fe_copy = true;
else
cstate->copy_file = stdout;
}
else
{
mode_t oumask; /* Pre-existing umask value */
struct stat st;
/*
* Close the relation. If reading, we can release the AccessShareLock we
* got; if writing, we should hold the lock until end of transaction to
* ensure that updates will be committed before lock is released.
*/
heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
/* Clean up storage (probably not really necessary) */
processed = cstate->processed;
/*
* Prevent write to relative path ... too easy to shoot oneself in
* the foot by overwriting a database file ...
*/
if (!is_absolute_path(cstate->filename))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("relative path not allowed for COPY to file")));
pfree(cstate->attribute_buf.data);
pfree(cstate->line_buf.data);
pfree(cstate->raw_buf);
pfree(cstate);
oumask = umask((mode_t) 022);
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
umask(oumask);
return processed;
}
if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for writing: %m",
cstate->filename)));
fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
{
FreeFile(cstate->copy_file);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
}
}
/*
* This intermediate routine just exists to localize the effects of setjmp
* so we don't need to plaster a lot of variables with "volatile".
*/
static void
DoCopyTo(CopyState cstate)
{
PG_TRY();
{
if (cstate->fe_copy)
......@@ -1191,40 +1263,41 @@ DoCopyTo(CopyState cstate)
PG_RE_THROW();
}
PG_END_TRY();
if (!pipe)
{
if (FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write to file \"%s\": %m",
cstate->filename)));
}
}
/*
* Copy from relation TO file.
* Copy from relation or query TO file.
*/
static void
CopyTo(CopyState cstate)
{
HeapTuple tuple;
TupleDesc tupDesc;
HeapScanDesc scandesc;
int num_phys_attrs;
int attr_count;
Form_pg_attribute *attr;
FmgrInfo *out_functions;
bool *force_quote;
char *string;
char *null_print_client;
ListCell *cur;
MemoryContext oldcontext;
MemoryContext mycontext;
tupDesc = cstate->rel->rd_att;
if (cstate->rel)
tupDesc = RelationGetDescr(cstate->rel);
else
tupDesc = cstate->queryDesc->tupDesc;
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
null_print_client = cstate->null_print; /* default */
cstate->null_print_client = cstate->null_print; /* default */
/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
cstate->fe_msgbuf = makeStringInfo();
/* Get info about the columns we need to process. */
out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
force_quote = (bool *) palloc(num_phys_attrs * sizeof(bool));
cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
......@@ -1239,12 +1312,7 @@ CopyTo(CopyState cstate)
getTypeOutputInfo(attr[attnum - 1]->atttypid,
&out_func_oid,
&isvarlena);
fmgr_info(out_func_oid, &out_functions[attnum - 1]);
if (list_member_int(cstate->force_quote_atts, attnum))
force_quote[attnum - 1] = true;
else
force_quote[attnum - 1] = false;
fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
}
/*
......@@ -1253,11 +1321,11 @@ CopyTo(CopyState cstate)
* datatype output routines, and should be faster than retail pfree's
* anyway. (We don't need a whole econtext as CopyFrom does.)
*/
mycontext = AllocSetContextCreate(CurrentMemoryContext,
"COPY TO",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
"COPY TO",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
if (cstate->binary)
{
......@@ -1282,7 +1350,7 @@ CopyTo(CopyState cstate)
* encoding, because it will be sent directly with CopySendString.
*/
if (cstate->need_transcoding)
null_print_client = pg_server_to_client(cstate->null_print,
cstate->null_print_client = pg_server_to_client(cstate->null_print,
cstate->null_print_len);
/* if a header has been requested send the line */
......@@ -1309,113 +1377,139 @@ CopyTo(CopyState cstate)
}
}
scandesc = heap_beginscan(cstate->rel, ActiveSnapshot, 0, NULL);
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
if (cstate->rel)
{
bool need_delim = false;
Datum *values;
bool *nulls;
HeapScanDesc scandesc;
HeapTuple tuple;
CHECK_FOR_INTERRUPTS();
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
MemoryContextReset(mycontext);
oldcontext = MemoryContextSwitchTo(mycontext);
scandesc = heap_beginscan(cstate->rel, ActiveSnapshot, 0, NULL);
if (cstate->binary)
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
{
/* Binary per-tuple header */
CopySendInt16(cstate, attr_count);
/* Send OID if wanted --- note attr_count doesn't include it */
if (cstate->oids)
{
Oid oid = HeapTupleGetOid(tuple);
CHECK_FOR_INTERRUPTS();
/* Hack --- assume Oid is same size as int32 */
CopySendInt32(cstate, sizeof(int32));
CopySendInt32(cstate, oid);
}
/* Deconstruct the tuple ... faster than repeated heap_getattr */
heap_deform_tuple(tuple, tupDesc, values, nulls);
/* Format and send the data */
CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
}
else
heap_endscan(scandesc);
}
else
{
/* run the plan --- the dest receiver will send tuples */
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
}
if (cstate->binary)
{
/* Generate trailer for a binary copy */
CopySendInt16(cstate, -1);
/* Need to flush out the trailer */
CopySendEndOfRow(cstate);
}
MemoryContextDelete(cstate->rowcontext);
}
/*
* Emit one row during CopyTo().
*/
static void
CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
{
bool need_delim = false;
FmgrInfo *out_functions = cstate->out_functions;
MemoryContext oldcontext;
ListCell *cur;
char *string;
MemoryContextReset(cstate->rowcontext);
oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
if (cstate->binary)
{
/* Binary per-tuple header */
CopySendInt16(cstate, list_length(cstate->attnumlist));
/* Send OID if wanted --- note attnumlist doesn't include it */
if (cstate->oids)
{
/* Text format has no per-tuple header, but send OID if wanted */
/* Assume digits don't need any quoting or encoding conversion */
if (cstate->oids)
{
string = DatumGetCString(DirectFunctionCall1(oidout,
ObjectIdGetDatum(HeapTupleGetOid(tuple))));
CopySendString(cstate, string);
need_delim = true;
}
/* Hack --- assume Oid is same size as int32 */
CopySendInt32(cstate, sizeof(int32));
CopySendInt32(cstate, tupleOid);
}
foreach(cur, cstate->attnumlist)
}
else
{
/* Text format has no per-tuple header, but send OID if wanted */
/* Assume digits don't need any quoting or encoding conversion */
if (cstate->oids)
{
int attnum = lfirst_int(cur);
Datum value;
bool isnull;
string = DatumGetCString(DirectFunctionCall1(oidout,
ObjectIdGetDatum(tupleOid)));
CopySendString(cstate, string);
need_delim = true;
}
}
foreach(cur, cstate->attnumlist)
{
int attnum = lfirst_int(cur);
Datum value = values[attnum - 1];
bool isnull = nulls[attnum - 1];
value = heap_getattr(tuple, attnum, tupDesc, &isnull);
if (!cstate->binary)
{
if (need_delim)
CopySendChar(cstate, cstate->delim[0]);
need_delim = true;
}
if (isnull)
{
if (!cstate->binary)
CopySendString(cstate, cstate->null_print_client);
else
CopySendInt32(cstate, -1);
}
else
{
if (!cstate->binary)
{
if (need_delim)
CopySendChar(cstate, cstate->delim[0]);
need_delim = true;
}
if (isnull)
{
if (!cstate->binary)
CopySendString(cstate, null_print_client);
string = OutputFunctionCall(&out_functions[attnum - 1],
value);
if (cstate->csv_mode)
CopyAttributeOutCSV(cstate, string,
cstate->force_quote_flags[attnum - 1],
list_length(cstate->attnumlist) == 1);
else
CopySendInt32(cstate, -1);
CopyAttributeOutText(cstate, string);
}
else
{
if (!cstate->binary)
{
string = OutputFunctionCall(&out_functions[attnum - 1],
value);
if (cstate->csv_mode)
CopyAttributeOutCSV(cstate, string,
force_quote[attnum - 1],
list_length(cstate->attnumlist) == 1);
else
CopyAttributeOutText(cstate, string);
}
else
{
bytea *outputbytes;
bytea *outputbytes;
outputbytes = SendFunctionCall(&out_functions[attnum - 1],
value);
CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
CopySendData(cstate, VARDATA(outputbytes),
VARSIZE(outputbytes) - VARHDRSZ);
}
outputbytes = SendFunctionCall(&out_functions[attnum - 1],
value);
CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
CopySendData(cstate, VARDATA(outputbytes),
VARSIZE(outputbytes) - VARHDRSZ);
}
}
CopySendEndOfRow(cstate);
MemoryContextSwitchTo(oldcontext);
cstate->processed++;
}
heap_endscan(scandesc);
if (cstate->binary)
{
/* Generate trailer for a binary copy */
CopySendInt16(cstate, -1);
/* Need to flush out the trailer */
CopySendEndOfRow(cstate);
}
MemoryContextDelete(mycontext);
CopySendEndOfRow(cstate);
pfree(out_functions);
pfree(force_quote);
MemoryContextSwitchTo(oldcontext);
cstate->processed++;
}
......@@ -1528,6 +1622,7 @@ limit_printout_length(const char *str)
static void
CopyFrom(CopyState cstate)
{
bool pipe = (cstate->filename == NULL);
HeapTuple tuple;
TupleDesc tupDesc;
Form_pg_attribute *attr;
......@@ -1538,7 +1633,6 @@ CopyFrom(CopyState cstate)
FmgrInfo oid_in_function;
Oid *typioparams;
Oid oid_typioparam;
bool *force_notnull;
int attnum;
int i;
Oid in_func_oid;
......@@ -1558,6 +1652,56 @@ CopyFrom(CopyState cstate)
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
Assert(cstate->rel);
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to view \"%s\"",
RelationGetRelationName(cstate->rel))));
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to sequence \"%s\"",
RelationGetRelationName(cstate->rel))));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy to non-table relation \"%s\"",
RelationGetRelationName(cstate->rel))));
}
if (pipe)
{
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
else
cstate->copy_file = stdin;
}
else
{
struct stat st;
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
cstate->filename)));
fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
{
FreeFile(cstate->copy_file);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
}
}
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
......@@ -1599,7 +1743,6 @@ CopyFrom(CopyState cstate)
typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
defmap = (int *) palloc(num_phys_attrs * sizeof(int));
defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
force_notnull = (bool *) palloc(num_phys_attrs * sizeof(bool));
for (attnum = 1; attnum <= num_phys_attrs; attnum++)
{
......@@ -1616,11 +1759,6 @@ CopyFrom(CopyState cstate)
&in_func_oid, &typioparams[attnum - 1]);
fmgr_info(in_func_oid, &in_functions[attnum - 1]);
if (list_member_int(cstate->force_notnull_atts, attnum))
force_notnull[attnum - 1] = true;
else
force_notnull[attnum - 1] = false;
/* Get default info if needed */
if (!list_member_int(cstate->attnumlist, attnum))
{
......@@ -1810,7 +1948,8 @@ CopyFrom(CopyState cstate)
NameStr(attr[m]->attname))));
string = field_strings[fieldno++];
if (cstate->csv_mode && string == NULL && force_notnull[m])
if (cstate->csv_mode && string == NULL &&
cstate->force_notnull_flags[m])
{
/* Go ahead and read the NULL string */
string = cstate->null_print;
......@@ -1972,13 +2111,21 @@ CopyFrom(CopyState cstate)
pfree(typioparams);
pfree(defmap);
pfree(defexprs);
pfree(force_notnull);
ExecDropSingleTupleTableSlot(slot);
ExecCloseIndices(resultRelInfo);
FreeExecutorState(estate);
if (!pipe)
{
if (FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from file \"%s\": %m",
cstate->filename)));
}
}
......@@ -3055,16 +3202,17 @@ CopyAttributeOutCSV(CopyState cstate, char *string,
* The input attnamelist is either the user-specified column list,
* or NIL if there was none (in which case we want all the non-dropped
* columns).
*
* rel can be NULL ... it's only used for error reports.
*/
static List *
CopyGetAttnums(Relation rel, List *attnamelist)
CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{
List *attnums = NIL;
if (attnamelist == NIL)
{
/* Generate default column list */
TupleDesc tupDesc = RelationGetDescr(rel);
Form_pg_attribute *attr = tupDesc->attrs;
int attr_count = tupDesc->natts;
int i;
......@@ -3085,15 +3233,33 @@ CopyGetAttnums(Relation rel, List *attnamelist)
{
char *name = strVal(lfirst(l));
int attnum;
int i;
/* Lookup column name */
/* Note we disallow system columns here */
attnum = attnameAttNum(rel, name, false);
attnum = InvalidAttrNumber;
for (i = 0; i < tupDesc->natts; i++)
{
if (tupDesc->attrs[i]->attisdropped)
continue;
if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
{
attnum = tupDesc->attrs[i]->attnum;
break;
}
}
if (attnum == InvalidAttrNumber)
ereport(ERROR,
{
if (rel != NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
name, RelationGetRelationName(rel))));
name, RelationGetRelationName(rel))));
else
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" does not exist",
name)));
}
/* Check for duplicates */
if (list_member_int(attnums, attnum))
ereport(ERROR,
......@@ -3106,3 +3272,66 @@ CopyGetAttnums(Relation rel, List *attnamelist)
return attnums;
}
/*
* copy_dest_startup --- executor startup
*/
static void
copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{
/* no-op */
}
/*
* copy_dest_receive --- receive one tuple
*/
static void
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{
DR_copy *myState = (DR_copy *) self;
CopyState cstate = myState->cstate;
/* Make sure the tuple is fully deconstructed */
slot_getallattrs(slot);
/* And send the data */
CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
}
/*
* copy_dest_shutdown --- executor end
*/
static void
copy_dest_shutdown(DestReceiver *self)
{
/* no-op */
}
/*
* copy_dest_destroy --- release DestReceiver object
*/
static void
copy_dest_destroy(DestReceiver *self)
{
pfree(self);
}
/*
* CreateCopyDestReceiver -- create a suitable DestReceiver object
*/
DestReceiver *
CreateCopyDestReceiver(void)
{
DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
self->pub.receiveSlot = copy_dest_receive;
self->pub.rStartup = copy_dest_startup;
self->pub.rShutdown = copy_dest_shutdown;
self->pub.rDestroy = copy_dest_destroy;
self->pub.mydest = DestCopyOut;
self->cstate = NULL; /* will be set later */
return (DestReceiver *) self;
}
......@@ -15,7 +15,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.349 2006/08/25 04:06:49 tgl Exp $
* $PostgreSQL: pgsql/src/backend/nodes/copyfuncs.c,v 1.350 2006/08/30 23:34:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -1934,6 +1934,7 @@ _copyCopyStmt(CopyStmt *from)
CopyStmt *newnode = makeNode(CopyStmt);
COPY_NODE_FIELD(relation);
COPY_NODE_FIELD(query);
COPY_NODE_FIELD(attlist);
COPY_SCALAR_FIELD(is_from);
COPY_STRING_FIELD(filename);
......
......@@ -18,7 +18,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.283 2006/08/25 04:06:49 tgl Exp $
* $PostgreSQL: pgsql/src/backend/nodes/equalfuncs.c,v 1.284 2006/08/30 23:34:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -863,6 +863,7 @@ static bool
_equalCopyStmt(CopyStmt *a, CopyStmt *b)
{
COMPARE_NODE_FIELD(relation);
COMPARE_NODE_FIELD(query);
COMPARE_NODE_FIELD(attlist);
COMPARE_SCALAR_FIELD(is_from);
COMPARE_STRING_FIELD(filename);
......
......@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/backend/parser/analyze.c,v 1.348 2006/08/25 04:06:51 tgl Exp $
* $PostgreSQL: pgsql/src/backend/parser/analyze.c,v 1.349 2006/08/30 23:34:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -341,6 +341,19 @@ transformStmt(ParseState *pstate, Node *parseTree,
}
break;
case T_CopyStmt:
{
CopyStmt *n = (CopyStmt *) parseTree;
result = makeNode(Query);
result->commandType = CMD_UTILITY;
if (n->query)
n->query = transformStmt(pstate, (Node *) n->query,
extras_before, extras_after);
result->utilityStmt = (Node *) parseTree;
}
break;
case T_AlterTableStmt:
result = transformAlterTableStmt(pstate,
(AlterTableStmt *) parseTree,
......
......@@ -11,7 +11,7 @@
*
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.558 2006/08/25 04:06:51 tgl Exp $
* $PostgreSQL: pgsql/src/backend/parser/gram.y,v 2.559 2006/08/30 23:34:21 tgl Exp $
*
* HISTORY
* AUTHOR DATE MAJOR EVENT
......@@ -1614,11 +1614,15 @@ ClosePortalStmt:
/*****************************************************************************
*
* QUERY :
* COPY <relname> ['(' columnList ')'] FROM/TO [WITH options]
* COPY relname ['(' columnList ')'] FROM/TO file [WITH options]
*
* BINARY, OIDS, and DELIMITERS kept in old locations
* for backward compatibility. 2002-06-18
*
* COPY ( SELECT ... ) TO file [WITH options]
* This form doesn't have the backwards-compatible option
* syntax.
*
*****************************************************************************/
CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids
......@@ -1626,6 +1630,7 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids
{
CopyStmt *n = makeNode(CopyStmt);
n->relation = $3;
n->query = NULL;
n->attlist = $4;
n->is_from = $6;
n->filename = $7;
......@@ -1642,6 +1647,18 @@ CopyStmt: COPY opt_binary qualified_name opt_column_list opt_oids
n->options = list_concat(n->options, $10);
$$ = (Node *)n;
}
| COPY select_with_parens TO copy_file_name opt_with
copy_opt_list
{
CopyStmt *n = makeNode(CopyStmt);
n->relation = NULL;
n->query = (Query *) $2;
n->attlist = NIL;
n->is_from = false;
n->filename = $4;
n->options = $6;
$$ = (Node *)n;
}
;
copy_from:
......@@ -1652,7 +1669,7 @@ copy_from:
/*
* copy_file_name NULL indicates stdio is used. Whether stdin or stdout is
* used depends on the direction. (It really doesn't make sense to copy from
* stdout. We silently correct the "typo". - AY 9/94
* stdout. We silently correct the "typo".) - AY 9/94
*/
copy_file_name:
Sconst { $$ = $1; }
......
......@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $PostgreSQL: pgsql/src/backend/tcop/dest.c,v 1.69 2006/08/12 02:52:05 tgl Exp $
* $PostgreSQL: pgsql/src/backend/tcop/dest.c,v 1.70 2006/08/30 23:34:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -30,6 +30,7 @@
#include "access/printtup.h"
#include "access/xact.h"
#include "commands/copy.h"
#include "executor/executor.h"
#include "executor/tstoreReceiver.h"
#include "libpq/libpq.h"
......@@ -128,6 +129,9 @@ CreateDestReceiver(CommandDest dest, Portal portal)
case DestIntoRel:
return CreateIntoRelDestReceiver();
case DestCopyOut:
return CreateCopyDestReceiver();
}
/* should never get here */
......@@ -153,6 +157,7 @@ EndCommand(const char *commandTag, CommandDest dest)
case DestSPI:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
break;
}
}
......@@ -192,6 +197,7 @@ NullCommand(CommandDest dest)
case DestSPI:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
break;
}
}
......@@ -233,6 +239,7 @@ ReadyForQuery(CommandDest dest)
case DestSPI:
case DestTuplestore:
case DestIntoRel:
case DestCopyOut:
break;
}
}
......@@ -3,7 +3,7 @@
*
* Copyright (c) 2000-2006, PostgreSQL Global Development Group
*
* $PostgreSQL: pgsql/src/bin/psql/copy.c,v 1.67 2006/08/29 15:19:50 tgl Exp $
* $PostgreSQL: pgsql/src/bin/psql/copy.c,v 1.68 2006/08/30 23:34:22 tgl Exp $
*/
#include "postgres_fe.h"
#include "copy.h"
......@@ -39,6 +39,9 @@
* \copy tablename [(columnlist)] from|to filename
* [ with ] [ binary ] [ oids ] [ delimiter [as] char ] [ null [as] string ]
*
* \copy ( select stmt ) to filename
* [ with ] [ binary ] [ delimiter [as] char ] [ null [as] string ]
*
* The pre-7.3 syntax was:
* \copy [ binary ] tablename [(columnlist)] [with oids] from|to filename
* [ [using] delimiters char ] [ with null as string ]
......@@ -142,6 +145,26 @@ parse_slash_copy(const char *args)
result->table = pg_strdup(token);
/* Handle COPY (SELECT) case */
if (token[0] == '(')
{
int parens = 1;
while (parens > 0)
{
token = strtokx(NULL, whitespace, ".,()", "\"'",
nonstd_backslash, true, false, pset.encoding);
if (!token)
goto error;
if (token[0] == '(')
parens++;
else if (token[0] == ')')
parens--;
xstrcat(&result->table, " ");
xstrcat(&result->table, token);
}
}
token = strtokx(NULL, whitespace, ".,()", "\"",
0, false, false, pset.encoding);
if (!token)
......
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/commands/copy.h,v 1.27 2006/03/05 15:58:55 momjian Exp $
* $PostgreSQL: pgsql/src/include/commands/copy.h,v 1.28 2006/08/30 23:34:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -15,8 +15,11 @@
#define COPY_H
#include "nodes/parsenodes.h"
#include "tcop/dest.h"
extern uint64 DoCopy(const CopyStmt *stmt);
extern DestReceiver *CreateCopyDestReceiver(void);
#endif /* COPY_H */
......@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.325 2006/08/25 04:06:56 tgl Exp $
* $PostgreSQL: pgsql/src/include/nodes/parsenodes.h,v 1.326 2006/08/30 23:34:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -1012,16 +1012,22 @@ typedef struct GrantRoleStmt
/* ----------------------
* Copy Statement
*
* We support "COPY relation FROM file", "COPY relation TO file", and
* "COPY (query) TO file". In any given CopyStmt, exactly one of "relation"
* and "query" must be non-NULL. Note: "query" is a SelectStmt before
* parse analysis, and a Query afterwards.
* ----------------------
*/
typedef struct CopyStmt
{
NodeTag type;
RangeVar *relation; /* the relation to copy */
Query *query; /* the query to copy */
List *attlist; /* List of column names (as Strings), or NIL
* for all columns */
bool is_from; /* TO or FROM */
char *filename; /* if NULL, use stdin/stdout */
char *filename; /* filename, or NULL for STDIN/STDOUT */
List *options; /* List of DefElem nodes */
} CopyStmt;
......
......@@ -54,7 +54,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* $PostgreSQL: pgsql/src/include/tcop/dest.h,v 1.51 2006/08/12 02:52:06 tgl Exp $
* $PostgreSQL: pgsql/src/include/tcop/dest.h,v 1.52 2006/08/30 23:34:22 tgl Exp $
*
*-------------------------------------------------------------------------
*/
......@@ -85,7 +85,8 @@ typedef enum
DestRemoteExecute, /* sent to frontend, in Execute command */
DestSPI, /* results sent to SPI manager */
DestTuplestore, /* results sent to Tuplestore */
DestIntoRel /* results sent to relation (SELECT INTO) */
DestIntoRel, /* results sent to relation (SELECT INTO) */
DestCopyOut /* results sent to COPY TO code */
} CommandDest;
/* ----------------
......
--
-- Test cases for COPY (select) TO
--
create table test1 (id serial, t text);
NOTICE: CREATE TABLE will create implicit sequence "test1_id_seq" for serial column "test1.id"
insert into test1 (t) values ('a');
insert into test1 (t) values ('b');
insert into test1 (t) values ('c');
insert into test1 (t) values ('d');
insert into test1 (t) values ('e');
create table test2 (id serial, t text);
NOTICE: CREATE TABLE will create implicit sequence "test2_id_seq" for serial column "test2.id"
insert into test2 (t) values ('A');
insert into test2 (t) values ('B');
insert into test2 (t) values ('C');
insert into test2 (t) values ('D');
insert into test2 (t) values ('E');
create view v_test1
as select 'v_'||t from test1;
--
-- Test COPY table TO
--
copy test1 to stdout;
1 a
2 b
3 c
4 d
5 e
--
-- This should fail
--
copy v_test1 to stdout;
ERROR: cannot copy from view "v_test1"
HINT: Try the COPY (SELECT ...) TO variant.
--
-- Test COPY (select) TO
--
copy (select t from test1 where id=1) to stdout;
a
--
-- Test COPY (select for update) TO
--
copy (select t from test1 where id=3 for update) to stdout;
c
--
-- This should fail
--
copy (select t into temp test3 from test1 where id=3) to stdout;
ERROR: COPY (SELECT INTO) is not supported
--
-- This should fail
--
copy (select * from test1) from stdin;
ERROR: syntax error at or near "from"
LINE 1: copy (select * from test1) from stdin;
^
--
-- This should fail
--
copy (select * from test1) (t,id) to stdout;
ERROR: syntax error at or near "("
LINE 1: copy (select * from test1) (t,id) to stdout;
^
--
-- Test JOIN
--
copy (select * from test1 join test2 using (id)) to stdout;
1 a A
2 b B
3 c C
4 d D
5 e E
--
-- Test UNION SELECT
--
copy (select t from test1 where id = 1 UNION select * from v_test1) to stdout;
a
v_a
v_b
v_c
v_d
v_e
--
-- Test subselect
--
copy (select * from (select t from test1 where id = 1 UNION select * from v_test1) t1) to stdout;
a
v_a
v_b
v_c
v_d
v_e
--
-- Test headers, CSV and quotes
--
copy (select t from test1 where id = 1) to stdout csv header force quote t;
t
"a"
--
-- Test psql builtins, plain table
--
\copy test1 to stdout
1 a
2 b
3 c
4 d
5 e
--
-- This should fail
--
\copy v_test1 to stdout
ERROR: cannot copy from view "v_test1"
HINT: Try the COPY (SELECT ...) TO variant.
\copy: ERROR: cannot copy from view "v_test1"
HINT: Try the COPY (SELECT ...) TO variant.
--
-- Test \copy (select ...)
--
\copy (select "id",'id','id""'||t,(id + 1)*id,t,"test1"."t" from test1 where id=3) to stdout
3 id id""c 12 c c
--
-- Drop everything
--
drop table test2;
drop view v_test1;
drop table test1;
# ----------
# The first group of parallel test
# $PostgreSQL: pgsql/src/test/regress/parallel_schedule,v 1.34 2006/08/12 02:52:06 tgl Exp $
# $PostgreSQL: pgsql/src/test/regress/parallel_schedule,v 1.35 2006/08/30 23:34:22 tgl Exp $
# ----------
test: boolean char name varchar text int2 int4 int8 oid float4 float8 bit numeric
......@@ -34,7 +34,7 @@ test: create_function_2
# execute two copy tests parallel, to check that copy itself
# is concurrent safe.
# ----------
test: copy
test: copy copyselect
# ----------
# The third group of parallel test
......
# $PostgreSQL: pgsql/src/test/regress/serial_schedule,v 1.32 2006/08/12 02:52:06 tgl Exp $
# $PostgreSQL: pgsql/src/test/regress/serial_schedule,v 1.33 2006/08/30 23:34:22 tgl Exp $
# This should probably be in an order similar to parallel_schedule.
test: boolean
test: char
......@@ -43,6 +43,7 @@ test: create_type
test: create_table
test: create_function_2
test: copy
test: copyselect
test: constraints
test: triggers
test: create_misc
......
--
-- Test cases for COPY (select) TO
--
create table test1 (id serial, t text);
insert into test1 (t) values ('a');
insert into test1 (t) values ('b');
insert into test1 (t) values ('c');
insert into test1 (t) values ('d');
insert into test1 (t) values ('e');
create table test2 (id serial, t text);
insert into test2 (t) values ('A');
insert into test2 (t) values ('B');
insert into test2 (t) values ('C');
insert into test2 (t) values ('D');
insert into test2 (t) values ('E');
create view v_test1
as select 'v_'||t from test1;
--
-- Test COPY table TO
--
copy test1 to stdout;
--
-- This should fail
--
copy v_test1 to stdout;
--
-- Test COPY (select) TO
--
copy (select t from test1 where id=1) to stdout;
--
-- Test COPY (select for update) TO
--
copy (select t from test1 where id=3 for update) to stdout;
--
-- This should fail
--
copy (select t into temp test3 from test1 where id=3) to stdout;
--
-- This should fail
--
copy (select * from test1) from stdin;
--
-- This should fail
--
copy (select * from test1) (t,id) to stdout;
--
-- Test JOIN
--
copy (select * from test1 join test2 using (id)) to stdout;
--
-- Test UNION SELECT
--
copy (select t from test1 where id = 1 UNION select * from v_test1) to stdout;
--
-- Test subselect
--
copy (select * from (select t from test1 where id = 1 UNION select * from v_test1) t1) to stdout;
--
-- Test headers, CSV and quotes
--
copy (select t from test1 where id = 1) to stdout csv header force quote t;
--
-- Test psql builtins, plain table
--
\copy test1 to stdout
--
-- This should fail
--
\copy v_test1 to stdout
--
-- Test \copy (select ...)
--
\copy (select "id",'id','id""'||t,(id + 1)*id,t,"test1"."t" from test1 where id=3) to stdout
--
-- Drop everything
--
drop table test2;
drop view v_test1;
drop table test1;
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment