Commit 9d2d4570 authored by Michael Paquier's avatar Michael Paquier

Add support for more progress reporting in COPY

The command (TO or FROM), its type (file, pipe, program or callback),
and the number of tuples excluded by a WHERE clause in COPY FROM are
added to the progress reporting already available.

The column "lines_processed" is renamed to "tuples_processed" to
disambiguate the meaning of this column in the cases of CSV and BINARY
COPY and to be more consistent with the other catalog progress views.

Bump catalog version, again.

Author: Matthias van de Meent
Reviewed-by: Michael Paquier, Justin Pryzby, Bharath Rupireddy, Josef
Šimánek, Tomas Vondra
Discussion: https://postgr.es/m/CAEze2WiOcgdH4aQA8NtZq-4dgvnJzp8PohdeKchPkhMY-jWZXA@mail.gmail.com
parent f9264d15
......@@ -6531,8 +6531,33 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
<structfield>relid</structfield> <type>oid</type>
</para>
<para>
OID of the table on which the <command>COPY</command> command is executed.
It is set to 0 if copying from a <command>SELECT</command> query.
OID of the table on which the <command>COPY</command> command is
executed. It is set to <literal>0</literal> if copying from a
<command>SELECT</command> query.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>command</structfield> <type>text</type>
</para>
<para>
The command that is running: <literal>COPY FROM</literal>, or
<literal>COPY TO</literal>.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>type</structfield> <type>text</type>
</para>
<para>
The io type that the data is read from or written to:
<literal>FILE</literal>, <literal>PROGRAM</literal>,
<literal>PIPE</literal> (for <command>COPY FROM STDIN</command> and
<command>COPY TO STDOUT</command>), or <literal>CALLBACK</literal>
(used for example during the initial table synchronization in
logical replication).
</para></entry>
</row>
......@@ -6551,16 +6576,26 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
</para>
<para>
Size of source file for <command>COPY FROM</command> command in bytes.
It is set to 0 if not available.
It is set to <literal>0</literal> if not available.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>tuples_processed</structfield> <type>bigint</type>
</para>
<para>
Number of tuples already processed by <command>COPY</command> command.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>lines_processed</structfield> <type>bigint</type>
<structfield>tuples_excluded</structfield> <type>bigint</type>
</para>
<para>
Number of lines already processed by <command>COPY</command> command.
Number of tuples not processed because they were excluded by the
<command>WHERE</command> clause of the <command>COPY</command> command.
</para></entry>
</row>
</tbody>
......
......@@ -1129,9 +1129,18 @@ CREATE VIEW pg_stat_progress_copy AS
SELECT
S.pid AS pid, S.datid AS datid, D.datname AS datname,
S.relid AS relid,
CASE S.param5 WHEN 1 THEN 'COPY FROM'
WHEN 2 THEN 'COPY TO'
END AS command,
CASE S.param6 WHEN 1 THEN 'FILE'
WHEN 2 THEN 'PROGRAM'
WHEN 3 THEN 'PIPE'
WHEN 4 THEN 'CALLBACK'
END AS "type",
S.param1 AS bytes_processed,
S.param2 AS bytes_total,
S.param3 AS lines_processed
S.param3 AS tuples_processed,
S.param4 AS tuples_excluded
FROM pg_stat_get_progress_info('COPY') AS S
LEFT JOIN pg_database D ON S.datid = D.oid;
......
......@@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate)
BulkInsertState bistate = NULL;
CopyInsertMethod insertMethod;
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
uint64 processed = 0;
int64 processed = 0;
int64 excluded = 0;
bool has_before_insert_row_trig;
bool has_instead_insert_row_trig;
bool leafpart_use_multi_insert = false;
......@@ -869,7 +870,15 @@ CopyFrom(CopyFromState cstate)
econtext->ecxt_scantuple = myslot;
/* Skip items that don't match COPY's WHERE clause */
if (!ExecQual(cstate->qualexpr, econtext))
{
/*
* Report that this tuple was filtered out by the WHERE
* clause.
*/
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
++excluded);
continue;
}
}
/* Determine the partition to insert the tuple into */
......@@ -1104,10 +1113,11 @@ CopyFrom(CopyFromState cstate)
/*
* We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command. Update
* for counting tuples inserted by an INSERT command. Update
* progress of the COPY command as well.
*/
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
++processed);
}
}
......@@ -1193,6 +1203,16 @@ BeginCopyFrom(ParseState *pstate,
ExprState **defexprs;
MemoryContext oldcontext;
bool volatile_defexprs;
const int progress_cols[] = {
PROGRESS_COPY_COMMAND,
PROGRESS_COPY_TYPE,
PROGRESS_COPY_BYTES_TOTAL
};
int64 progress_vals[] = {
PROGRESS_COPY_COMMAND_FROM,
0,
0
};
/* Allocate workspace and zero all fields */
cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
......@@ -1430,11 +1450,13 @@ BeginCopyFrom(ParseState *pstate,
if (data_source_cb)
{
progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
cstate->copy_src = COPY_CALLBACK;
cstate->data_source_cb = data_source_cb;
}
else if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput == DestRemote)
ReceiveCopyBegin(cstate);
......@@ -1447,6 +1469,7 @@ BeginCopyFrom(ParseState *pstate,
if (cstate->is_program)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
ereport(ERROR,
......@@ -1458,6 +1481,7 @@ BeginCopyFrom(ParseState *pstate,
{
struct stat st;
progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
if (cstate->copy_file == NULL)
{
......@@ -1484,10 +1508,12 @@ BeginCopyFrom(ParseState *pstate,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename)));
pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
progress_vals[2] = st.st_size;
}
}
pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
if (cstate->opts.binary)
{
/* Read and verify binary header */
......
......@@ -353,6 +353,14 @@ BeginCopyTo(ParseState *pstate,
TupleDesc tupDesc;
int num_phys_attrs;
MemoryContext oldcontext;
const int progress_cols[] = {
PROGRESS_COPY_COMMAND,
PROGRESS_COPY_TYPE
};
int64 progress_vals[] = {
PROGRESS_COPY_COMMAND_TO,
0
};
if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
{
......@@ -659,6 +667,8 @@ BeginCopyTo(ParseState *pstate,
if (pipe)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
Assert(!is_program); /* the grammar does not allow this */
if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
......@@ -670,6 +680,7 @@ BeginCopyTo(ParseState *pstate,
if (is_program)
{
progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
if (cstate->copy_file == NULL)
ereport(ERROR,
......@@ -682,6 +693,8 @@ BeginCopyTo(ParseState *pstate,
mode_t oumask; /* Pre-existing umask value */
struct stat st;
progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
/*
* Prevent write to relative path ... too easy to shoot oneself in
* the foot by overwriting a database file ...
......@@ -731,6 +744,8 @@ BeginCopyTo(ParseState *pstate,
/* initialize progress */
pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
cstate->bytes_processed = 0;
MemoryContextSwitchTo(oldcontext);
......@@ -881,8 +896,12 @@ DoCopyTo(CopyToState cstate)
/* Format and send the data */
CopyOneRowTo(cstate, slot);
/* Increment amount of processed tuples and update the progress */
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
/*
* Increment the number of processed tuples, and report the
* progress.
*/
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
++processed);
}
ExecDropSingleTupleTableSlot(slot);
......@@ -1251,8 +1270,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
/* Send the data */
CopyOneRowTo(cstate, slot);
/* Increment amount of processed tuples and update the progress */
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
/* Increment the number of processed tuples, and report the progress */
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
++myState->processed);
return true;
}
......
......@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202103091
#define CATALOG_VERSION_NO 202103092
#endif
......@@ -133,9 +133,22 @@
#define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4
#define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5
/* Commands of PROGRESS_COPY */
/* Progress parameters for PROGRESS_COPY */
#define PROGRESS_COPY_BYTES_PROCESSED 0
#define PROGRESS_COPY_BYTES_TOTAL 1
#define PROGRESS_COPY_LINES_PROCESSED 2
#define PROGRESS_COPY_TUPLES_PROCESSED 2
#define PROGRESS_COPY_TUPLES_EXCLUDED 3
#define PROGRESS_COPY_COMMAND 4
#define PROGRESS_COPY_TYPE 5
/* Commands of COPY (as advertised via PROGRESS_COPY_COMMAND) */
#define PROGRESS_COPY_COMMAND_FROM 1
#define PROGRESS_COPY_COMMAND_TO 2
/* Types of COPY commands (as advertised via PROGRESS_COPY_TYPE) */
#define PROGRESS_COPY_TYPE_FILE 1
#define PROGRESS_COPY_TYPE_PROGRAM 2
#define PROGRESS_COPY_TYPE_PIPE 3
#define PROGRESS_COPY_TYPE_CALLBACK 4
#endif
......@@ -1950,9 +1950,22 @@ pg_stat_progress_copy| SELECT s.pid,
s.datid,
d.datname,
s.relid,
CASE s.param5
WHEN 1 THEN 'COPY FROM'::text
WHEN 2 THEN 'COPY TO'::text
ELSE NULL::text
END AS command,
CASE s.param6
WHEN 1 THEN 'FILE'::text
WHEN 2 THEN 'PROGRAM'::text
WHEN 3 THEN 'PIPE'::text
WHEN 4 THEN 'CALLBACK'::text
ELSE NULL::text
END AS type,
s.param1 AS bytes_processed,
s.param2 AS bytes_total,
s.param3 AS lines_processed
s.param3 AS tuples_processed,
s.param4 AS tuples_excluded
FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_create_index| SELECT s.pid,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment