Commit 8ddc05fb authored by Itagaki Takahiro's avatar Itagaki Takahiro

Export the external file reader used in COPY FROM as APIs.

They are expected to be used by extension modules like file_fdw.
There are no user-visible changes.

Itagaki Takahiro
Reviewed and tested by Kevin Grittner and Noah Misch.
parent 1cc19cc3
......@@ -93,13 +93,11 @@ typedef struct CopyStateData
FILE *copy_file; /* used if copy_dest == COPY_FILE */
StringInfo fe_msgbuf; /* used for all dests during COPY TO, only for
* dest == COPY_NEW_FE in COPY FROM */
bool fe_copy; /* true for all FE copy dests */
bool fe_eof; /* true if detected end of copy data */
EolType eol_type; /* EOL type of input */
int client_encoding; /* remote side's character encoding */
bool need_transcoding; /* client encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
uint64 processed; /* # of tuples processed */
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
......@@ -119,18 +117,35 @@ typedef struct CopyStateData
bool *force_quote_flags; /* per-column CSV FQ flags */
bool *force_notnull_flags; /* per-column CSV FNN flags */
/* these are just for error messages, see copy_in_error_callback */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
int cur_lineno; /* line number for error messages */
const char *cur_attname; /* current att for error messages */
const char *cur_attval; /* current att value for error messages */
/*
* Working state for COPY TO/FROM
*/
MemoryContext copycontext; /* per-copy execution context */
/*
* Working state for COPY TO
*/
FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */
/*
* Working state for COPY FROM
*/
AttrNumber num_defaults;
bool file_has_oids;
FmgrInfo oid_in_function;
Oid oid_typioparam;
FmgrInfo *in_functions; /* array of input functions for each attrs */
Oid *typioparams; /* array of element types for in_functions */
int *defmap; /* array of default att numbers */
ExprState **defexprs; /* array of default att expressions */
/*
* These variables are used to reduce overhead in textual COPY FROM.
*
......@@ -169,13 +184,12 @@ typedef struct CopyStateData
int raw_buf_len; /* total # of bytes stored */
} CopyStateData;
typedef CopyStateData *CopyState;
/* DestReceiver for COPY (SELECT) TO */
typedef struct
{
DestReceiver pub; /* publicly-known function pointers */
CopyState cstate; /* CopyStateData for the command */
uint64 processed; /* # of tuples processed */
} DR_copy;
......@@ -248,11 +262,17 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */
static void DoCopyTo(CopyState cstate);
static void CopyTo(CopyState cstate);
static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
const char *queryString, List *attnamelist, List *options);
static void EndCopy(CopyState cstate);
static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
const char *filename, List *attnamelist, List *options);
static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
Datum *values, bool *nulls);
static void CopyFrom(CopyState cstate);
static uint64 CopyFrom(CopyState cstate);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate);
......@@ -700,6 +720,102 @@ CopyLoadRawBuf(CopyState cstate)
* input/output stream. The latter could be either stdin/stdout or a
* socket, depending on whether we're running under Postmaster control.
*
* Do not allow a Postgres user without superuser privilege to read from
* or write to a file.
*
* Do not allow the copy if user doesn't have proper permission to access
* the table or the specifically requested columns.
*/
uint64
DoCopy(const CopyStmt *stmt, const char *queryString)
{
CopyState cstate;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL);
Relation rel;
uint64 processed;
/* Disallow file COPY except to superusers. */
if (!pipe && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to COPY to or from a file"),
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
if (stmt->relation)
{
TupleDesc tupDesc;
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
RangeTblEntry *rte;
List *attnums;
ListCell *cur;
Assert(!stmt->query);
/* Open and lock the relation, using the appropriate lock type. */
rel = heap_openrv(stmt->relation,
(is_from ? RowExclusiveLock : AccessShareLock));
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
rte->relid = RelationGetRelid(rel);
rte->requiredPerms = required_access;
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
foreach (cur, attnums)
{
int attno = lfirst_int(cur) -
FirstLowInvalidHeapAttributeNumber;
if (is_from)
rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
else
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
}
ExecCheckRTPerms(list_make1(rte), true);
}
else
{
Assert(stmt->query);
rel = NULL;
}
if (is_from)
{
/* check read-only transaction */
if (XactReadOnly && rel->rd_backend != MyBackendId)
PreventCommandIfReadOnly("COPY FROM");
cstate = BeginCopyFrom(rel, stmt->filename,
stmt->attlist, stmt->options);
processed = CopyFrom(cstate); /* copy from file to database */
EndCopyFrom(cstate);
}
else
{
cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
stmt->attlist, stmt->options);
processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
}
/*
* Close the relation. If reading, we can release the AccessShareLock we
* got; if writing, we should hold the lock until end of transaction to
* ensure that updates will be committed before lock is released.
*/
if (rel != NULL)
heap_close(rel, (is_from ? NoLock : AccessShareLock));
return processed;
}
/*
* Common setup routines used by BeginCopyFrom and BeginCopyTo.
*
* Iff <binary>, unload or reload in the binary format, as opposed to the
* more wasteful but more robust and portable text format.
*
......@@ -711,35 +827,42 @@ CopyLoadRawBuf(CopyState cstate)
*
* If in the text format, delimit columns with delimiter <delim> and print
* NULL values as <null_print>.
*
* Do not allow a Postgres user without superuser privilege to read from
* or write to a file.
*
* Do not allow the copy if user doesn't have proper permission to access
* the table or the specifically requested columns.
*/
uint64
DoCopy(const CopyStmt *stmt, const char *queryString)
static CopyState
BeginCopy(bool is_from,
Relation rel,
Node *raw_query,
const char *queryString,
List *attnamelist,
List *options)
{
CopyState cstate;
bool is_from = stmt->is_from;
bool pipe = (stmt->filename == NULL);
List *attnamelist = stmt->attlist;
List *force_quote = NIL;
List *force_notnull = NIL;
bool force_quote_all = false;
bool format_specified = false;
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
ListCell *option;
TupleDesc tupDesc;
int num_phys_attrs;
uint64 processed;
MemoryContext oldcontext;
/* Allocate workspace and zero all fields */
cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
/*
* We allocate everything used by a cstate in a new memory context.
* This would avoid memory leaks repeated uses of COPY in a query.
*/
cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
"COPY",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/* Extract options from the statement node tree */
foreach(option, stmt->options)
foreach(option, options)
{
DefElem *defel = (DefElem *) lfirst(option);
......@@ -980,51 +1103,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("CSV quote character must not appear in the NULL specification")));
/* Disallow file COPY except to superusers. */
if (!pipe && !superuser())
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("must be superuser to COPY to or from a file"),
errhint("Anyone can COPY to stdout or from stdin. "
"psql's \\copy command also works for anyone.")));
if (stmt->relation)
if (rel)
{
RangeTblEntry *rte;
List *attnums;
ListCell *cur;
Assert(!stmt->query);
cstate->queryDesc = NULL;
Assert(!raw_query);
/* Open and lock the relation, using the appropriate lock type. */
cstate->rel = heap_openrv(stmt->relation,
(is_from ? RowExclusiveLock : AccessShareLock));
cstate->rel = rel;
tupDesc = RelationGetDescr(cstate->rel);
/* Check relation permissions. */
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
rte->relid = RelationGetRelid(cstate->rel);
rte->requiredPerms = required_access;
attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
foreach (cur, attnums)
{
int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
if (is_from)
rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
else
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
}
ExecCheckRTPerms(list_make1(rte), true);
/* check read-only transaction */
if (XactReadOnly && is_from && cstate->rel->rd_backend != MyBackendId)
PreventCommandIfReadOnly("COPY FROM");
/* Don't allow COPY w/ OIDs to or from a table without them */
if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
ereport(ERROR,
......@@ -1058,7 +1144,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
* function and is executed repeatedly. (See also the same hack in
* DECLARE CURSOR and PREPARE.) XXX FIXME someday.
*/
rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query),
rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
queryString, NULL, 0);
/* We don't expect more or less than one result query */
......@@ -1160,14 +1246,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
}
}
/* Set up variables to avoid per-attribute overhead. */
initStringInfo(&cstate->attribute_buf);
initStringInfo(&cstate->line_buf);
cstate->line_buf_converted = false;
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
cstate->processed = 0;
/*
* Set up encoding conversion info. Even if the client and server
* encodings are the same, we must apply pg_client_to_server() to validate
......@@ -1181,84 +1259,75 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
cstate->copy_dest = COPY_FILE; /* default */
cstate->filename = stmt->filename;
if (is_from)
CopyFrom(cstate); /* copy from file to database */
else
DoCopyTo(cstate); /* copy from database to file */
MemoryContextSwitchTo(oldcontext);
/*
* Close the relation or query. If reading, we can release the
* AccessShareLock we got; if writing, we should hold the lock until end
* of transaction to ensure that updates will be committed before lock is
* released.
*/
if (cstate->rel)
heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
else
{
/* Close down the query and free resources. */
ExecutorEnd(cstate->queryDesc);
FreeQueryDesc(cstate->queryDesc);
PopActiveSnapshot();
}
return cstate;
}
/* Clean up storage (probably not really necessary) */
processed = cstate->processed;
/*
* Release resources allocated in a cstate for COPY TO/FROM.
*/
static void
EndCopy(CopyState cstate)
{
if (cstate->filename != NULL && FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m",
cstate->filename)));
pfree(cstate->attribute_buf.data);
pfree(cstate->line_buf.data);
pfree(cstate->raw_buf);
MemoryContextDelete(cstate->copycontext);
pfree(cstate);
return processed;
}
/*
* This intermediate routine exists mainly to localize the effects of setjmp
* so we don't need to plaster a lot of variables with "volatile".
*/
static void
DoCopyTo(CopyState cstate)
* Setup CopyState to read tuples from a table or a query for COPY TO.
*/
static CopyState
BeginCopyTo(Relation rel,
Node *query,
const char *queryString,
const char *filename,
List *attnamelist,
List *options)
{
bool pipe = (cstate->filename == NULL);
CopyState cstate;
bool pipe = (filename == NULL);
MemoryContext oldcontext;
if (cstate->rel)
if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
{
if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
{
if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
if (rel->rd_rel->relkind == RELKIND_VIEW)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from view \"%s\"",
RelationGetRelationName(cstate->rel)),
RelationGetRelationName(rel)),
errhint("Try the COPY (SELECT ...) TO variant.")));
else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from foreign table \"%s\"",
RelationGetRelationName(cstate->rel)),
RelationGetRelationName(rel)),
errhint("Try the COPY (SELECT ...) TO variant.")));
else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from sequence \"%s\"",
RelationGetRelationName(cstate->rel))));
RelationGetRelationName(rel))));
else
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot copy from non-table relation \"%s\"",
RelationGetRelationName(cstate->rel))));
}
RelationGetRelationName(rel))));
}
cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
if (pipe)
{
if (whereToSendOutput == DestRemote)
cstate->fe_copy = true;
else
if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
}
else
......@@ -1270,11 +1339,12 @@ DoCopyTo(CopyState cstate)
* Prevent write to relative path ... too easy to shoot oneself in the
* foot by overwriting a database file ...
*/
if (!is_absolute_path(cstate->filename))
if (!is_absolute_path(filename))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("relative path not allowed for COPY to file")));
cstate->filename = pstrdup(filename);
oumask = umask(S_IWGRP | S_IWOTH);
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
umask(oumask);
......@@ -1292,14 +1362,30 @@ DoCopyTo(CopyState cstate)
errmsg("\"%s\" is a directory", cstate->filename)));
}
MemoryContextSwitchTo(oldcontext);
return cstate;
}
/*
* This intermediate routine exists mainly to localize the effects of setjmp
* so we don't need to plaster a lot of variables with "volatile".
*/
static uint64
DoCopyTo(CopyState cstate)
{
bool pipe = (cstate->filename == NULL);
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
uint64 processed;
PG_TRY();
{
if (cstate->fe_copy)
if (fe_copy)
SendCopyBegin(cstate);
CopyTo(cstate);
processed = CopyTo(cstate);
if (cstate->fe_copy)
if (fe_copy)
SendCopyEnd(cstate);
}
PG_CATCH();
......@@ -1314,26 +1400,38 @@ DoCopyTo(CopyState cstate)
}
PG_END_TRY();
if (!pipe)
return processed;
}
/*
* Clean up storage and release resources for COPY TO.
*/
static void
EndCopyTo(CopyState cstate)
{
if (cstate->queryDesc != NULL)
{
if (FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m",
cstate->filename)));
/* Close down the query and free resources. */
ExecutorEnd(cstate->queryDesc);
FreeQueryDesc(cstate->queryDesc);
PopActiveSnapshot();
}
/* Clean up storage */
EndCopy(cstate);
}
/*
* Copy from relation or query TO file.
*/
static void
static uint64
CopyTo(CopyState cstate)
{
TupleDesc tupDesc;
int num_phys_attrs;
Form_pg_attribute *attr;
ListCell *cur;
uint64 processed;
if (cstate->rel)
tupDesc = RelationGetDescr(cstate->rel);
......@@ -1439,6 +1537,7 @@ CopyTo(CopyState cstate)
scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
processed = 0;
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
{
CHECK_FOR_INTERRUPTS();
......@@ -1448,14 +1547,19 @@ CopyTo(CopyState cstate)
/* Format and send the data */
CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
processed++;
}
heap_endscan(scandesc);
pfree(values);
pfree(nulls);
}
else
{
/* run the plan --- the dest receiver will send tuples */
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
if (cstate->binary)
......@@ -1467,6 +1571,8 @@ CopyTo(CopyState cstate)
}
MemoryContextDelete(cstate->rowcontext);
return processed;
}
/*
......@@ -1558,16 +1664,16 @@ CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
CopySendEndOfRow(cstate);
MemoryContextSwitchTo(oldcontext);
cstate->processed++;
}
/*
* error context callback for COPY FROM
*
* The argument for the error context must be CopyState.
*/
static void
copy_in_error_callback(void *arg)
void
CopyFromErrorCallback(void *arg)
{
CopyState cstate = (CopyState) arg;
......@@ -1669,41 +1775,23 @@ limit_printout_length(const char *str)
/*
* Copy FROM file to relation.
*/
static void
static uint64
CopyFrom(CopyState cstate)
{
bool pipe = (cstate->filename == NULL);
HeapTuple tuple;
TupleDesc tupDesc;
Form_pg_attribute *attr;
AttrNumber num_phys_attrs,
attr_count,
num_defaults;
FmgrInfo *in_functions;
FmgrInfo oid_in_function;
Oid *typioparams;
Oid oid_typioparam;
int attnum;
int i;
Oid in_func_oid;
Datum *values;
bool *nulls;
int nfields;
char **field_strings;
bool done = false;
bool isnull;
ResultRelInfo *resultRelInfo;
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
ExprContext *econtext;
TupleTableSlot *slot;
bool file_has_oids;
int *defmap;
ExprState **defexprs; /* array of default att expressions */
ExprContext *econtext; /* used for ExecEvalExpr for default atts */
MemoryContext oldcontext = CurrentMemoryContext;
ErrorContextCallback errcontext;
CommandId mycid = GetCurrentCommandId(true);
int hi_options = 0; /* start with default heap_insert options */
BulkInsertState bistate;
uint64 processed = 0;
Assert(cstate->rel);
......@@ -1731,6 +1819,8 @@ CopyFrom(CopyState cstate)
RelationGetRelationName(cstate->rel))));
}
tupDesc = RelationGetDescr(cstate->rel);
/*----------
* Check to see if we can avoid writing WAL
*
......@@ -1766,38 +1856,6 @@ CopyFrom(CopyState cstate)
hi_options |= HEAP_INSERT_SKIP_WAL;
}
if (pipe)
{
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
else
cstate->copy_file = stdin;
}
else
{
struct stat st;
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
cstate->filename)));
fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
}
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
num_defaults = 0;
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of code
......@@ -1826,8 +1884,191 @@ CopyFrom(CopyState cstate)
slot = ExecInitExtraTupleSlot(estate);
ExecSetSlotDescriptor(slot, tupDesc);
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
/*
* Check BEFORE STATEMENT insertion triggers. It's debateable whether we
* should do this for COPY, since it's not really an "INSERT" statement as
* such. However, executing these triggers maintains consistency with the
* EACH ROW triggers that we already fire on COPY.
*/
ExecBSInsertTriggers(estate, resultRelInfo);
values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
bistate = GetBulkInsertState();
econtext = GetPerTupleExprContext(estate);
/* Set up callback to identify error line number */
errcontext.callback = CopyFromErrorCallback;
errcontext.arg = (void *) cstate;
errcontext.previous = error_context_stack;
error_context_stack = &errcontext;
for (;;)
{
bool skip_tuple;
Oid loaded_oid = InvalidOid;
CHECK_FOR_INTERRUPTS();
/* Reset the per-tuple exprcontext */
ResetPerTupleExprContext(estate);
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
break;
/* And now we can form the input tuple. */
tuple = heap_form_tuple(tupDesc, values, nulls);
if (loaded_oid != InvalidOid)
HeapTupleSetOid(tuple, loaded_oid);
/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row)
{
HeapTuple newtuple;
newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
if (newtuple == NULL) /* "do nothing" */
skip_tuple = true;
else if (newtuple != tuple) /* modified by Trigger(s) */
{
heap_freetuple(tuple);
tuple = newtuple;
}
}
if (!skip_tuple)
{
List *recheckIndexes = NIL;
/* Place tuple in tuple slot */
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
/* OK, store the tuple and create index entries for it */
heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
estate);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, tuple,
recheckIndexes);
list_free(recheckIndexes);
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
* this is the same definition used by execMain.c for counting
* tuples inserted by an INSERT command.
*/
processed++;
}
}
/* Done, clean up */
error_context_stack = errcontext.previous;
FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext);
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers(estate, resultRelInfo);
/* Handle queued AFTER triggers */
AfterTriggerEndQuery(estate);
pfree(values);
pfree(nulls);
ExecResetTupleTable(estate->es_tupleTable, false);
ExecCloseIndices(resultRelInfo);
FreeExecutorState(estate);
/*
* If we skipped writing WAL, then we need to sync the heap (but not
* indexes since those use WAL anyway)
*/
if (hi_options & HEAP_INSERT_SKIP_WAL)
heap_sync(cstate->rel);
return processed;
}
/*
* Setup to read tuples from a file for COPY FROM.
*
* 'rel': Used as a template for the tuples
* 'filename': Name of server-local file to read
* 'attnamelist': List of char *, columns to include. NIL selects all cols.
* 'options': List of DefElem. See copy_opt_item in gram.y for selections.
*
* Returns a CopyState, to be passed to NextCopyFrom and related functions.
*/
CopyState
BeginCopyFrom(Relation rel,
const char *filename,
List *attnamelist,
List *options)
{
CopyState cstate;
bool pipe = (filename == NULL);
TupleDesc tupDesc;
Form_pg_attribute *attr;
AttrNumber num_phys_attrs,
num_defaults;
FmgrInfo *in_functions;
Oid *typioparams;
int attnum;
Oid in_func_oid;
int *defmap;
ExprState **defexprs;
MemoryContext oldcontext;
cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/* Initialize state variables */
cstate->fe_eof = false;
cstate->eol_type = EOL_UNKNOWN;
cstate->cur_relname = RelationGetRelationName(cstate->rel);
cstate->cur_lineno = 0;
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
/* Set up variables to avoid per-attribute overhead. */
initStringInfo(&cstate->attribute_buf);
initStringInfo(&cstate->line_buf);
cstate->line_buf_converted = false;
cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
cstate->raw_buf_index = cstate->raw_buf_len = 0;
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
num_defaults = 0;
/*
* Pick up the required catalog information for each attribute in the
* relation, including the input function, the element type (to pass to
......@@ -1863,27 +2104,54 @@ CopyFrom(CopyState cstate)
if (defexpr != NULL)
{
defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
estate);
/* Initialize expressions in copycontext. */
defexprs[num_defaults] = ExecInitExpr(
expression_planner((Expr *) defexpr), NULL);
defmap[num_defaults] = attnum - 1;
num_defaults++;
}
}
}
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
/* We keep those variables in cstate. */
cstate->in_functions = in_functions;
cstate->typioparams = typioparams;
cstate->defmap = defmap;
cstate->defexprs = defexprs;
cstate->num_defaults = num_defaults;
/*
* Check BEFORE STATEMENT insertion triggers. It's debateable whether we
* should do this for COPY, since it's not really an "INSERT" statement as
* such. However, executing these triggers maintains consistency with the
* EACH ROW triggers that we already fire on COPY.
*/
ExecBSInsertTriggers(estate, resultRelInfo);
if (pipe)
{
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
else
cstate->copy_file = stdin;
}
else
{
struct stat st;
cstate->filename = pstrdup(filename);
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not open file \"%s\" for reading: %m",
cstate->filename)));
fstat(fileno(cstate->copy_file), &st);
if (S_ISDIR(st.st_mode))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
}
if (!cstate->binary)
file_has_oids = cstate->oids; /* must rely on user to tell us... */
{
/* must rely on user to tell us... */
cstate->file_has_oids = cstate->oids;
}
else
{
/* Read and verify binary header */
......@@ -1901,7 +2169,7 @@ CopyFrom(CopyState cstate)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid COPY file header (missing flags)")));
file_has_oids = (tmp & (1 << 16)) != 0;
cstate->file_has_oids = (tmp & (1 << 16)) != 0;
tmp &= ~(1 << 16);
if ((tmp >> 16) != 0)
ereport(ERROR,
......@@ -1923,73 +2191,58 @@ CopyFrom(CopyState cstate)
}
}
if (file_has_oids && cstate->binary)
if (cstate->file_has_oids && cstate->binary)
{
getTypeBinaryInputInfo(OIDOID,
&in_func_oid, &oid_typioparam);
fmgr_info(in_func_oid, &oid_in_function);
&in_func_oid, &cstate->oid_typioparam);
fmgr_info(in_func_oid, &cstate->oid_in_function);
}
values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
/* create workspace for CopyReadAttributes results */
nfields = file_has_oids ? (attr_count + 1) : attr_count;
if (! cstate->binary)
if (!cstate->binary)
{
AttrNumber attr_count = list_length(cstate->attnumlist);
int nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
cstate->max_fields = nfields;
cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
}
/* Initialize state variables */
cstate->fe_eof = false;
cstate->eol_type = EOL_UNKNOWN;
cstate->cur_relname = RelationGetRelationName(cstate->rel);
cstate->cur_lineno = 0;
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
MemoryContextSwitchTo(oldcontext);
bistate = GetBulkInsertState();
return cstate;
}
/* Set up callback to identify error line number */
errcontext.callback = copy_in_error_callback;
errcontext.arg = (void *) cstate;
errcontext.previous = error_context_stack;
error_context_stack = &errcontext;
/*
* Read raw fields in the next line for COPY FROM in text or csv mode.
* Return false if no more lines.
*
* An internal temporary buffer is returned via 'fields'. It is valid until
* the next call of the function. Since the function returns all raw fields
* in the input file, 'nfields' could be different from the number of columns
* in the relation.
*
* NOTE: force_not_null option are not applied to the returned fields.
*/
bool
NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
{
int fldct;
bool done;
/* only available for text or csv input */
Assert(!cstate->binary);
/* on input just throw the header line away */
if (cstate->header_line)
if (cstate->cur_lineno == 0 && cstate->header_line)
{
cstate->cur_lineno++;
done = CopyReadLine(cstate);
if (CopyReadLine(cstate))
return false; /* done */
}
while (!done)
{
bool skip_tuple;
Oid loaded_oid = InvalidOid;
CHECK_FOR_INTERRUPTS();
cstate->cur_lineno++;
/* Reset the per-tuple exprcontext */
ResetPerTupleExprContext(estate);
/* Switch into its memory context */
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
if (!cstate->binary)
{
ListCell *cur;
int fldct;
int fieldno;
char *string;
/* Actually read the line into memory here */
done = CopyReadLine(cstate);
......@@ -1999,7 +2252,7 @@ CopyFrom(CopyState cstate)
* EOF, ie, process the line and then exit loop on next iteration.
*/
if (done && cstate->line_buf.len == 0)
break;
return false;
/* Parse the line into de-escaped field values */
if (cstate->csv_mode)
......@@ -2007,6 +2260,62 @@ CopyFrom(CopyState cstate)
else
fldct = CopyReadAttributesText(cstate);
*fields = cstate->raw_fields;
*nfields = fldct;
return true;
}
/*
* Read next tuple from file for COPY FROM. Return false if no more tuples.
*
* 'econtext' is used to evaluate default expression for each columns not
* read from the file. It can be NULL when no default values are used, i.e.
* when all columns are read from the file.
*
* 'values' and 'nulls' arrays must be the same length as columns of the
* relation passed to BeginCopyFrom. This function fills the arrays.
* Oid of the tuple is returned with 'tupleOid' separately.
*/
bool
NextCopyFrom(CopyState cstate, ExprContext *econtext,
Datum *values, bool *nulls, Oid *tupleOid)
{
TupleDesc tupDesc;
Form_pg_attribute *attr;
AttrNumber num_phys_attrs,
attr_count,
num_defaults = cstate->num_defaults;
FmgrInfo *in_functions = cstate->in_functions;
Oid *typioparams = cstate->typioparams;
int i;
int nfields;
bool isnull;
bool file_has_oids = cstate->file_has_oids;
int *defmap = cstate->defmap;
ExprState **defexprs = cstate->defexprs;
tupDesc = RelationGetDescr(cstate->rel);
attr = tupDesc->attrs;
num_phys_attrs = tupDesc->natts;
attr_count = list_length(cstate->attnumlist);
nfields = file_has_oids ? (attr_count + 1) : attr_count;
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
if (!cstate->binary)
{
char **field_strings;
ListCell *cur;
int fldct;
int fieldno;
char *string;
/* read raw fields in the next line */
if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
return false;
/* check for overflowing fields */
if (nfields > 0 && fldct > nfields)
ereport(ERROR,
......@@ -2014,7 +2323,6 @@ CopyFrom(CopyState cstate)
errmsg("extra data after last expected column")));
fieldno = 0;
field_strings = cstate->raw_fields;
/* Read the OID field if present */
if (file_has_oids)
......@@ -2029,13 +2337,13 @@ CopyFrom(CopyState cstate)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("null OID in COPY data")));
else
else if (cstate->oids && tupleOid != NULL)
{
cstate->cur_attname = "oid";
cstate->cur_attval = string;
loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
CStringGetDatum(string)));
if (loaded_oid == InvalidOid)
if (*tupleOid == InvalidOid)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid OID in COPY data")));
......@@ -2084,11 +2392,12 @@ CopyFrom(CopyState cstate)
int16 fld_count;
ListCell *cur;
cstate->cur_lineno++;
if (!CopyGetInt16(cstate, &fld_count))
{
/* EOF detected (end of file, or protocol-level EOF) */
done = true;
break;
return false;
}
if (fld_count == -1)
......@@ -2112,8 +2421,7 @@ CopyFrom(CopyState cstate)
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("received copy data after EOF marker")));
done = true;
break;
return false;
}
if (fld_count != attr_count)
......@@ -2124,12 +2432,14 @@ CopyFrom(CopyState cstate)
if (file_has_oids)
{
Oid loaded_oid;
cstate->cur_attname = "oid";
loaded_oid =
DatumGetObjectId(CopyReadBinaryAttribute(cstate,
0,
&oid_in_function,
oid_typioparam,
&cstate->oid_in_function,
cstate->oid_typioparam,
-1,
&isnull));
if (isnull || loaded_oid == InvalidOid)
......@@ -2137,6 +2447,8 @@ CopyFrom(CopyState cstate)
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid OID in COPY data")));
cstate->cur_attname = NULL;
if (cstate->oids && tupleOid != NULL)
*tupleOid = loaded_oid;
}
i = 0;
......@@ -2164,118 +2476,31 @@ CopyFrom(CopyState cstate)
*/
for (i = 0; i < num_defaults; i++)
{
/*
* The caller must supply econtext and have switched into the
* per-tuple memory context in it.
*/
Assert(econtext != NULL);
Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
&nulls[defmap[i]], NULL);
}
/* And now we can form the input tuple. */
tuple = heap_form_tuple(tupDesc, values, nulls);
if (cstate->oids && file_has_oids)
HeapTupleSetOid(tuple, loaded_oid);
/* Triggers and stuff need to be invoked in query context. */
MemoryContextSwitchTo(oldcontext);
skip_tuple = false;
/* BEFORE ROW INSERT Triggers */
if (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_before_row)
{
HeapTuple newtuple;
newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
if (newtuple == NULL) /* "do nothing" */
skip_tuple = true;
else if (newtuple != tuple) /* modified by Trigger(s) */
{
heap_freetuple(tuple);
tuple = newtuple;
}
}
if (!skip_tuple)
{
List *recheckIndexes = NIL;
/* Place tuple in tuple slot */
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
/* Check the constraints of the tuple */
if (cstate->rel->rd_att->constr)
ExecConstraints(resultRelInfo, slot, estate);
/* OK, store the tuple and create index entries for it */
heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
if (resultRelInfo->ri_NumIndices > 0)
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
estate);
/* AFTER ROW INSERT Triggers */
ExecARInsertTriggers(estate, resultRelInfo, tuple,
recheckIndexes);
list_free(recheckIndexes);
return true;
}
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger;
* this is the same definition used by execMain.c for counting
* tuples inserted by an INSERT command.
/*
* Clean up storage and release resources for COPY FROM.
*/
cstate->processed++;
}
}
/* Done, clean up */
error_context_stack = errcontext.previous;
FreeBulkInsertState(bistate);
MemoryContextSwitchTo(oldcontext);
/* Execute AFTER STATEMENT insertion triggers */
ExecASInsertTriggers(estate, resultRelInfo);
/* Handle queued AFTER triggers */
AfterTriggerEndQuery(estate);
pfree(values);
pfree(nulls);
if (! cstate->binary)
pfree(cstate->raw_fields);
pfree(in_functions);
pfree(typioparams);
pfree(defmap);
pfree(defexprs);
ExecResetTupleTable(estate->es_tupleTable, false);
ExecCloseIndices(resultRelInfo);
FreeExecutorState(estate);
if (!pipe)
{
if (FreeFile(cstate->copy_file))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m",
cstate->filename)));
}
void
EndCopyFrom(CopyState cstate)
{
/* No COPY FROM related resources except memory. */
/*
* If we skipped writing WAL, then we need to sync the heap (but not
* indexes since those use WAL anyway)
*/
if (hi_options & HEAP_INSERT_SKIP_WAL)
heap_sync(cstate->rel);
EndCopy(cstate);
}
/*
* Read the next input line and stash it in line_buf, with conversion to
* server encoding.
......@@ -3537,6 +3762,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
/* And send the data */
CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
myState->processed++;
}
/*
......
......@@ -14,12 +14,24 @@
#ifndef COPY_H
#define COPY_H
#include "nodes/execnodes.h"
#include "nodes/parsenodes.h"
#include "tcop/dest.h"
typedef struct CopyStateData *CopyState;
extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
extern CopyState BeginCopyFrom(Relation rel, const char *filename,
List *attnamelist, List *options);
extern void EndCopyFrom(CopyState cstate);
extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
Datum *values, bool *nulls, Oid *tupleOid);
extern bool NextCopyFromRawFields(CopyState cstate,
char ***fields, int *nfields);
extern void CopyFromErrorCallback(void *arg);
extern DestReceiver *CreateCopyDestReceiver(void);
#endif /* COPY_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment