Commit 8a4f618e authored by Tomas Vondra's avatar Tomas Vondra

Report progress of COPY commands

This commit introduces a view pg_stat_progress_copy, reporting progress
of COPY commands.  This allows rough estimates of how far a running COPY
progressed, with the caveat that the total number of bytes may not be
available in some cases (e.g. when the input comes from the client).

Author: Josef Šimánek
Reviewed-by: Fujii Masao, Bharath Rupireddy, Vignesh C, Matthias van de Meent
Discussion: https://postgr.es/m/CAFp7QwqMGEi4OyyaLEK9DR0+E+oK3UtA4bEjDVCa4bNkwUY2PQ@mail.gmail.com
Discussion: https://postgr.es/m/CAFp7Qwr6_FmRM6pCO0x_a0mymOfX_Gg+FEKet4XaTGSW=LitKQ@mail.gmail.com
parent ca8217c1
...@@ -399,6 +399,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ...@@ -399,6 +399,12 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
</entry> </entry>
</row> </row>
<row>
<entry><structname>pg_stat_progress_copy</structname><indexterm><primary>pg_stat_progress_copy</primary></indexterm></entry>
<entry>One row for each backend running <command>COPY</command>, showing current progress.
See <xref linkend='copy-progress-reporting'/>.
</entry>
</row>
</tbody> </tbody>
</tgroup> </tgroup>
</table> </table>
...@@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, ...@@ -5247,6 +5253,7 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
which support progress reporting are <command>ANALYZE</command>, which support progress reporting are <command>ANALYZE</command>,
<command>CLUSTER</command>, <command>CLUSTER</command>,
<command>CREATE INDEX</command>, <command>VACUUM</command>, <command>CREATE INDEX</command>, <command>VACUUM</command>,
<command>COPY</command>,
and <xref linkend="protocol-replication-base-backup"/> (i.e., replication and <xref linkend="protocol-replication-base-backup"/> (i.e., replication
command that <xref linkend="app-pgbasebackup"/> issues to take command that <xref linkend="app-pgbasebackup"/> issues to take
a base backup). a base backup).
...@@ -6396,6 +6403,106 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, ...@@ -6396,6 +6403,106 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
</table> </table>
</sect2> </sect2>
<sect2 id="copy-progress-reporting">
<title>COPY Progress Reporting</title>
<indexterm>
<primary>pg_stat_progress_copy</primary>
</indexterm>
<para>
Whenever <command>COPY</command> is running, the
<structname>pg_stat_progress_copy</structname> view will contain one row
for each backend that is currently running a <command>COPY</command> command.
The table below describes the information that will be reported and provides
information about how to interpret it.
</para>
<table id="pg-stat-progress-copy-view" xreflabel="pg_stat_progress_copy">
<title><structname>pg_stat_progress_copy</structname> View</title>
<tgroup cols="1">
<thead>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
Column Type
</para>
<para>
Description
</para></entry>
</row>
</thead>
<tbody>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>pid</structfield> <type>integer</type>
</para>
<para>
Process ID of backend.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>datid</structfield> <type>oid</type>
</para>
<para>
OID of the database to which this backend is connected.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>datname</structfield> <type>name</type>
</para>
<para>
Name of the database to which this backend is connected.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>relid</structfield> <type>oid</type>
</para>
<para>
OID of the table on which the <command>COPY</command> command is executed.
It is set to 0 if copying from a <command>SELECT</command> query.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>bytes_processed</structfield> <type>bigint</type>
</para>
<para>
Number of bytes already processed by the <command>COPY</command> command.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>bytes_total</structfield> <type>bigint</type>
</para>
<para>
Size of the source file for a <command>COPY FROM</command> command, in bytes.
It is set to 0 if not available.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>lines_processed</structfield> <type>bigint</type>
</para>
<para>
Number of lines already processed by the <command>COPY</command> command.
</para></entry>
</row>
</tbody>
</tgroup>
</table>
</sect2>
</sect1> </sect1>
<sect1 id="dynamic-trace"> <sect1 id="dynamic-trace">
......
...@@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS ...@@ -1117,6 +1117,17 @@ CREATE VIEW pg_stat_progress_basebackup AS
S.param5 AS tablespaces_streamed S.param5 AS tablespaces_streamed
FROM pg_stat_get_progress_info('BASEBACKUP') AS S; FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
-- Progress of running COPY commands: exposes the first three generic
-- progress counters returned by pg_stat_get_progress_info('COPY') under
-- COPY-specific column names.  The LEFT JOIN keeps a row even when no
-- pg_database entry matches datid (datname is then NULL).
CREATE VIEW pg_stat_progress_copy AS
    SELECT
        S.pid AS pid,
        S.datid AS datid,
        D.datname AS datname,
        S.relid AS relid,
        S.param1 AS bytes_processed,
        S.param2 AS bytes_total,
        S.param3 AS lines_processed
    FROM
        pg_stat_get_progress_info('COPY') AS S
        LEFT JOIN pg_database AS D
            ON S.datid = D.oid;
CREATE VIEW pg_user_mappings AS CREATE VIEW pg_user_mappings AS
SELECT SELECT
U.oid AS umid, U.oid AS umid,
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "access/xlog.h" #include "access/xlog.h"
#include "commands/copy.h" #include "commands/copy.h"
#include "commands/copyfrom_internal.h" #include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "commands/trigger.h" #include "commands/trigger.h"
#include "executor/execPartition.h" #include "executor/execPartition.h"
#include "executor/executor.h" #include "executor/executor.h"
...@@ -35,6 +36,7 @@ ...@@ -35,6 +36,7 @@
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "optimizer/optimizer.h" #include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h" #include "rewrite/rewriteHandler.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
...@@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate) ...@@ -1100,9 +1102,10 @@ CopyFrom(CopyFromState cstate)
/* /*
* We count only tuples not suppressed by a BEFORE INSERT trigger * We count only tuples not suppressed by a BEFORE INSERT trigger
* or FDW; this is the same definition used by nodeModifyTable.c * or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command. * for counting tuples inserted by an INSERT command. Update
* progress of the COPY command as well.
*/ */
processed++; pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
} }
} }
...@@ -1415,6 +1418,12 @@ BeginCopyFrom(ParseState *pstate, ...@@ -1415,6 +1418,12 @@ BeginCopyFrom(ParseState *pstate,
} }
} }
/* initialize progress */
pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
cstate->bytes_processed = 0;
/* We keep those variables in cstate. */ /* We keep those variables in cstate. */
cstate->in_functions = in_functions; cstate->in_functions = in_functions;
cstate->typioparams = typioparams; cstate->typioparams = typioparams;
...@@ -1479,6 +1488,8 @@ BeginCopyFrom(ParseState *pstate, ...@@ -1479,6 +1488,8 @@ BeginCopyFrom(ParseState *pstate,
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE), (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is a directory", cstate->filename))); errmsg("\"%s\" is a directory", cstate->filename)));
pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
} }
} }
...@@ -1522,6 +1533,8 @@ EndCopyFrom(CopyFromState cstate) ...@@ -1522,6 +1533,8 @@ EndCopyFrom(CopyFromState cstate)
cstate->filename))); cstate->filename)));
} }
pgstat_progress_end_command();
MemoryContextDelete(cstate->copycontext); MemoryContextDelete(cstate->copycontext);
pfree(cstate); pfree(cstate);
} }
......
...@@ -20,11 +20,13 @@ ...@@ -20,11 +20,13 @@
#include "commands/copy.h" #include "commands/copy.h"
#include "commands/copyfrom_internal.h" #include "commands/copyfrom_internal.h"
#include "commands/progress.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "libpq/libpq.h" #include "libpq/libpq.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h"
#include "port/pg_bswap.h" #include "port/pg_bswap.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/rel.h" #include "utils/rel.h"
...@@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate) ...@@ -384,6 +386,8 @@ CopyLoadRawBuf(CopyFromState cstate)
cstate->raw_buf[nbytes] = '\0'; cstate->raw_buf[nbytes] = '\0';
cstate->raw_buf_index = 0; cstate->raw_buf_index = 0;
cstate->raw_buf_len = nbytes; cstate->raw_buf_len = nbytes;
cstate->bytes_processed += nbytes;
pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
return (inbytes > 0); return (inbytes > 0);
} }
......
...@@ -24,6 +24,7 @@ ...@@ -24,6 +24,7 @@
#include "access/xact.h" #include "access/xact.h"
#include "access/xlog.h" #include "access/xlog.h"
#include "commands/copy.h" #include "commands/copy.h"
#include "commands/progress.h"
#include "executor/execdesc.h" #include "executor/execdesc.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/tuptable.h" #include "executor/tuptable.h"
...@@ -32,6 +33,7 @@ ...@@ -32,6 +33,7 @@
#include "mb/pg_wchar.h" #include "mb/pg_wchar.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "optimizer/optimizer.h" #include "optimizer/optimizer.h"
#include "pgstat.h"
#include "rewrite/rewriteHandler.h" #include "rewrite/rewriteHandler.h"
#include "storage/fd.h" #include "storage/fd.h"
#include "tcop/tcopprot.h" #include "tcop/tcopprot.h"
...@@ -95,6 +97,7 @@ typedef struct CopyToStateData ...@@ -95,6 +97,7 @@ typedef struct CopyToStateData
FmgrInfo *out_functions; /* lookup info for output functions */ FmgrInfo *out_functions; /* lookup info for output functions */
MemoryContext rowcontext; /* per-row evaluation context */ MemoryContext rowcontext; /* per-row evaluation context */
uint64 bytes_processed; /* number of bytes processed so far */
} CopyToStateData; } CopyToStateData;
...@@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate) ...@@ -288,6 +291,10 @@ CopySendEndOfRow(CopyToState cstate)
break; break;
} }
/* Update the progress */
cstate->bytes_processed += fe_msgbuf->len;
pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
resetStringInfo(fe_msgbuf); resetStringInfo(fe_msgbuf);
} }
...@@ -363,6 +370,8 @@ EndCopy(CopyToState cstate) ...@@ -363,6 +370,8 @@ EndCopy(CopyToState cstate)
cstate->filename))); cstate->filename)));
} }
pgstat_progress_end_command();
MemoryContextDelete(cstate->copycontext); MemoryContextDelete(cstate->copycontext);
pfree(cstate); pfree(cstate);
} }
...@@ -760,6 +769,11 @@ BeginCopyTo(ParseState *pstate, ...@@ -760,6 +769,11 @@ BeginCopyTo(ParseState *pstate,
} }
} }
/* initialize progress */
pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
cstate->bytes_processed = 0;
MemoryContextSwitchTo(oldcontext); MemoryContextSwitchTo(oldcontext);
return cstate; return cstate;
...@@ -938,7 +952,9 @@ CopyTo(CopyToState cstate) ...@@ -938,7 +952,9 @@ CopyTo(CopyToState cstate)
/* Format and send the data */ /* Format and send the data */
CopyOneRowTo(cstate, slot); CopyOneRowTo(cstate, slot);
processed++;
/* Increment amount of processed tuples and update the progress */
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
} }
ExecDropSingleTupleTableSlot(slot); ExecDropSingleTupleTableSlot(slot);
...@@ -1303,7 +1319,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self) ...@@ -1303,7 +1319,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
/* Send the data */ /* Send the data */
CopyOneRowTo(cstate, slot); CopyOneRowTo(cstate, slot);
myState->processed++;
/* Increment amount of processed tuples and update the progress */
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
return true; return true;
} }
......
...@@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) ...@@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
cmdtype = PROGRESS_COMMAND_CREATE_INDEX; cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0) else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
cmdtype = PROGRESS_COMMAND_BASEBACKUP; cmdtype = PROGRESS_COMMAND_BASEBACKUP;
else if (pg_strcasecmp(cmd, "COPY") == 0)
cmdtype = PROGRESS_COMMAND_COPY;
else else
ereport(ERROR, ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
......
...@@ -154,6 +154,7 @@ typedef struct CopyFromStateData ...@@ -154,6 +154,7 @@ typedef struct CopyFromStateData
char *raw_buf; char *raw_buf;
int raw_buf_index; /* next byte to process */ int raw_buf_index; /* next byte to process */
int raw_buf_len; /* total # of bytes stored */ int raw_buf_len; /* total # of bytes stored */
uint64 bytes_processed;/* number of bytes processed so far */
/* Shorthand for number of unconsumed bytes available in raw_buf */ /* Shorthand for number of unconsumed bytes available in raw_buf */
#define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index) #define RAW_BUF_BYTES(cstate) ((cstate)->raw_buf_len - (cstate)->raw_buf_index)
} CopyFromStateData; } CopyFromStateData;
......
...@@ -133,4 +133,9 @@ ...@@ -133,4 +133,9 @@
#define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4
#define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5
/*
 * Progress parameters for PROGRESS_COMMAND_COPY.
 *
 * These are parameter indexes passed to pgstat_progress_update_param() by
 * the COPY code, not command or phase values.
 * NOTE(review): index N appears to surface as column param(N+1) of
 * pg_stat_get_progress_info('COPY') -- confirm against the view definition.
 */
/* running total of bytes read (COPY FROM) or sent (COPY TO) so far */
#define PROGRESS_COPY_BYTES_PROCESSED 0
/* size of the source file as reported by stat(), set in BeginCopyFrom */
#define PROGRESS_COPY_BYTES_TOTAL 1
/* running count of tuples processed so far */
#define PROGRESS_COPY_LINES_PROCESSED 2
#endif #endif
...@@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType ...@@ -1077,7 +1077,8 @@ typedef enum ProgressCommandType
PROGRESS_COMMAND_ANALYZE, PROGRESS_COMMAND_ANALYZE,
PROGRESS_COMMAND_CLUSTER, PROGRESS_COMMAND_CLUSTER,
PROGRESS_COMMAND_CREATE_INDEX, PROGRESS_COMMAND_CREATE_INDEX,
PROGRESS_COMMAND_BASEBACKUP PROGRESS_COMMAND_BASEBACKUP,
PROGRESS_COMMAND_COPY
} ProgressCommandType; } ProgressCommandType;
#define PGSTAT_NUM_PROGRESS_PARAM 20 #define PGSTAT_NUM_PROGRESS_PARAM 20
......
...@@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid, ...@@ -1937,6 +1937,15 @@ pg_stat_progress_cluster| SELECT s.pid,
s.param8 AS index_rebuild_count s.param8 AS index_rebuild_count
FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid))); LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_copy| SELECT s.pid,
s.datid,
d.datname,
s.relid,
s.param1 AS bytes_processed,
s.param2 AS bytes_total,
s.param3 AS lines_processed
FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
pg_stat_progress_create_index| SELECT s.pid, pg_stat_progress_create_index| SELECT s.pid,
s.datid, s.datid,
d.datname, d.datname,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment