Commit 7e413a0f authored by Alvaro Herrera's avatar Alvaro Herrera

pg_dump: allow multiple rows per insert

This is useful to speed up loading data in a different database engine.

Authors: Surafel Temesgen and David Rowley.  Lightly edited by Álvaro.
Reviewed-by: Fabien Coelho
Discussion: https://postgr.es/m/CALAY4q9kumSdnRBzvRJvSRf2+BH20YmSvzqOkvwpEmodD-xv6g@mail.gmail.com
parent 42210524
...@@ -661,9 +661,9 @@ PostgreSQL documentation ...@@ -661,9 +661,9 @@ PostgreSQL documentation
...</literal>). This will make restoration very slow; it is mainly ...</literal>). This will make restoration very slow; it is mainly
useful for making dumps that can be loaded into useful for making dumps that can be loaded into
non-<productname>PostgreSQL</productname> databases. non-<productname>PostgreSQL</productname> databases.
However, since this option generates a separate command for each row, Any error during reloading will cause only rows that are part of the
an error in reloading a row causes only that row to be lost rather problematic <command>INSERT</command> to be lost, rather than the
than the entire table contents. entire table contents.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -775,13 +775,12 @@ PostgreSQL documentation ...@@ -775,13 +775,12 @@ PostgreSQL documentation
than <command>COPY</command>). This will make restoration very slow; than <command>COPY</command>). This will make restoration very slow;
it is mainly useful for making dumps that can be loaded into it is mainly useful for making dumps that can be loaded into
non-<productname>PostgreSQL</productname> databases. non-<productname>PostgreSQL</productname> databases.
However, since this option generates a separate command for each row, Any error during reloading will cause only rows that are part of the
an error in reloading a row causes only that row to be lost rather problematic <command>INSERT</command> to be lost, rather than the
than the entire table contents. entire table contents. Note that the restore might fail altogether if
Note that you have rearranged column order. The
the restore might fail altogether if you have rearranged column order. <option>--column-inserts</option> option is safe against column order
The <option>--column-inserts</option> option is safe against column changes, though even slower.
order changes, though even slower.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -925,8 +924,9 @@ PostgreSQL documentation ...@@ -925,8 +924,9 @@ PostgreSQL documentation
<para> <para>
Add <literal>ON CONFLICT DO NOTHING</literal> to Add <literal>ON CONFLICT DO NOTHING</literal> to
<command>INSERT</command> commands. <command>INSERT</command> commands.
This option is not valid unless <option>--inserts</option> or This option is not valid unless <option>--inserts</option>,
<option>--column-inserts</option> is also specified. <option>--column-inserts</option> or
<option>--rows-per-insert</option> is also specified.
</para> </para>
</listitem> </listitem>
</varlistentry> </varlistentry>
...@@ -949,6 +949,20 @@ PostgreSQL documentation ...@@ -949,6 +949,20 @@ PostgreSQL documentation
</listitem> </listitem>
</varlistentry> </varlistentry>
<varlistentry>
<term><option>--rows-per-insert=<replaceable class="parameter">nrows</replaceable></option></term>
<listitem>
<para>
Dump data as <command>INSERT</command> commands (rather than
<command>COPY</command>). Controls the maximum number of rows per
<command>INSERT</command> command. The value specified must be a
number greater than zero. Any error during reloading will cause only
rows that are part of the problematic <command>INSERT</command> to be
lost, rather than the entire table contents.
</para>
</listitem>
</varlistentry>
<varlistentry> <varlistentry>
<term><option>--section=<replaceable class="parameter">sectionname</replaceable></option></term> <term><option>--section=<replaceable class="parameter">sectionname</replaceable></option></term>
<listitem> <listitem>
......
...@@ -140,10 +140,10 @@ typedef struct _dumpOptions ...@@ -140,10 +140,10 @@ typedef struct _dumpOptions
int dumpSections; /* bitmask of chosen sections */ int dumpSections; /* bitmask of chosen sections */
bool aclsSkip; bool aclsSkip;
const char *lockWaitTimeout; const char *lockWaitTimeout;
int dump_inserts; /* 0 = COPY, otherwise rows per INSERT */
/* flags for various command-line long options */ /* flags for various command-line long options */
int disable_dollar_quoting; int disable_dollar_quoting;
int dump_inserts;
int column_inserts; int column_inserts;
int if_exists; int if_exists;
int no_comments; int no_comments;
......
...@@ -138,6 +138,12 @@ static const CatalogId nilCatalogId = {0, 0}; ...@@ -138,6 +138,12 @@ static const CatalogId nilCatalogId = {0, 0};
static bool have_extra_float_digits = false; static bool have_extra_float_digits = false;
static int extra_float_digits; static int extra_float_digits;
/*
* The default number of rows per INSERT when
* --inserts is specified without --rows-per-insert
*/
#define DUMP_DEFAULT_ROWS_PER_INSERT 1
/* /*
* Macro for producing quoted, schema-qualified name of a dumpable object. * Macro for producing quoted, schema-qualified name of a dumpable object.
*/ */
...@@ -306,11 +312,13 @@ main(int argc, char **argv) ...@@ -306,11 +312,13 @@ main(int argc, char **argv)
DumpableObject *boundaryObjs; DumpableObject *boundaryObjs;
int i; int i;
int optindex; int optindex;
char *endptr;
RestoreOptions *ropt; RestoreOptions *ropt;
Archive *fout; /* the script file */ Archive *fout; /* the script file */
const char *dumpencoding = NULL; const char *dumpencoding = NULL;
const char *dumpsnapshot = NULL; const char *dumpsnapshot = NULL;
char *use_role = NULL; char *use_role = NULL;
long rowsPerInsert;
int numWorkers = 1; int numWorkers = 1;
trivalue prompt_password = TRI_DEFAULT; trivalue prompt_password = TRI_DEFAULT;
int compressLevel = -1; int compressLevel = -1;
...@@ -363,7 +371,7 @@ main(int argc, char **argv) ...@@ -363,7 +371,7 @@ main(int argc, char **argv)
{"exclude-table-data", required_argument, NULL, 4}, {"exclude-table-data", required_argument, NULL, 4},
{"extra-float-digits", required_argument, NULL, 8}, {"extra-float-digits", required_argument, NULL, 8},
{"if-exists", no_argument, &dopt.if_exists, 1}, {"if-exists", no_argument, &dopt.if_exists, 1},
{"inserts", no_argument, &dopt.dump_inserts, 1}, {"inserts", no_argument, NULL, 9},
{"lock-wait-timeout", required_argument, NULL, 2}, {"lock-wait-timeout", required_argument, NULL, 2},
{"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1}, {"no-tablespaces", no_argument, &dopt.outputNoTablespaces, 1},
{"quote-all-identifiers", no_argument, &quote_all_identifiers, 1}, {"quote-all-identifiers", no_argument, &quote_all_identifiers, 1},
...@@ -382,6 +390,7 @@ main(int argc, char **argv) ...@@ -382,6 +390,7 @@ main(int argc, char **argv)
{"no-subscriptions", no_argument, &dopt.no_subscriptions, 1}, {"no-subscriptions", no_argument, &dopt.no_subscriptions, 1},
{"no-sync", no_argument, NULL, 7}, {"no-sync", no_argument, NULL, 7},
{"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1}, {"on-conflict-do-nothing", no_argument, &dopt.do_nothing, 1},
{"rows-per-insert", required_argument, NULL, 10},
{NULL, 0, NULL, 0} {NULL, 0, NULL, 0}
}; };
...@@ -572,6 +581,31 @@ main(int argc, char **argv) ...@@ -572,6 +581,31 @@ main(int argc, char **argv)
} }
break; break;
case 9: /* inserts */
/*
* dump_inserts also stores --rows-per-insert, careful not to
* overwrite that.
*/
if (dopt.dump_inserts == 0)
dopt.dump_inserts = DUMP_DEFAULT_ROWS_PER_INSERT;
break;
case 10: /* rows per insert */
errno = 0;
rowsPerInsert = strtol(optarg, &endptr, 10);
if (endptr == optarg || *endptr != '\0' ||
rowsPerInsert <= 0 || rowsPerInsert > INT_MAX ||
errno == ERANGE)
{
write_msg(NULL, "rows-per-insert must be in range %d..%d\n",
1, INT_MAX);
exit_nicely(1);
}
dopt.dump_inserts = (int) rowsPerInsert;
break;
default: default:
fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
exit_nicely(1); exit_nicely(1);
...@@ -596,8 +630,8 @@ main(int argc, char **argv) ...@@ -596,8 +630,8 @@ main(int argc, char **argv)
} }
/* --column-inserts implies --inserts */ /* --column-inserts implies --inserts */
if (dopt.column_inserts) if (dopt.column_inserts && dopt.dump_inserts == 0)
dopt.dump_inserts = 1; dopt.dump_inserts = DUMP_DEFAULT_ROWS_PER_INSERT;
/* /*
* Binary upgrade mode implies dumping sequence data even in schema-only * Binary upgrade mode implies dumping sequence data even in schema-only
...@@ -622,8 +656,12 @@ main(int argc, char **argv) ...@@ -622,8 +656,12 @@ main(int argc, char **argv)
if (dopt.if_exists && !dopt.outputClean) if (dopt.if_exists && !dopt.outputClean)
exit_horribly(NULL, "option --if-exists requires option -c/--clean\n"); exit_horribly(NULL, "option --if-exists requires option -c/--clean\n");
if (dopt.do_nothing && !(dopt.dump_inserts || dopt.column_inserts)) /*
exit_horribly(NULL, "option --on-conflict-do-nothing requires option --inserts or --column-inserts\n"); * --inserts are already implied above if --column-inserts or
* --rows-per-insert were specified.
*/
if (dopt.do_nothing && dopt.dump_inserts == 0)
exit_horribly(NULL, "option --on-conflict-do-nothing requires option --inserts, --rows-per-insert or --column-inserts\n");
/* Identify archive format to emit */ /* Identify archive format to emit */
archiveFormat = parseArchiveFormat(format, &archiveMode); archiveFormat = parseArchiveFormat(format, &archiveMode);
...@@ -993,6 +1031,7 @@ help(const char *progname) ...@@ -993,6 +1031,7 @@ help(const char *progname)
printf(_(" --no-unlogged-table-data do not dump unlogged table data\n")); printf(_(" --no-unlogged-table-data do not dump unlogged table data\n"));
printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n")); printf(_(" --on-conflict-do-nothing add ON CONFLICT DO NOTHING to INSERT commands\n"));
printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n")); printf(_(" --quote-all-identifiers quote all identifiers, even if not key words\n"));
printf(_(" --rows-per-insert=NROWS number of rows per INSERT; implies --inserts\n"));
printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n")); printf(_(" --section=SECTION dump named section (pre-data, data, or post-data)\n"));
printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n")); printf(_(" --serializable-deferrable wait until the dump can run without anomalies\n"));
printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n")); printf(_(" --snapshot=SNAPSHOT use given snapshot for the dump\n"));
...@@ -1909,9 +1948,9 @@ dumpTableData_insert(Archive *fout, void *dcontext) ...@@ -1909,9 +1948,9 @@ dumpTableData_insert(Archive *fout, void *dcontext)
PQExpBuffer q = createPQExpBuffer(); PQExpBuffer q = createPQExpBuffer();
PQExpBuffer insertStmt = NULL; PQExpBuffer insertStmt = NULL;
PGresult *res; PGresult *res;
int tuple;
int nfields; int nfields;
int field; int rows_per_statement = dopt->dump_inserts;
int rows_this_statement = 0;
appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR " appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR "
"SELECT * FROM ONLY %s", "SELECT * FROM ONLY %s",
...@@ -1926,69 +1965,88 @@ dumpTableData_insert(Archive *fout, void *dcontext) ...@@ -1926,69 +1965,88 @@ dumpTableData_insert(Archive *fout, void *dcontext)
res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor", res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
PGRES_TUPLES_OK); PGRES_TUPLES_OK);
nfields = PQnfields(res); nfields = PQnfields(res);
for (tuple = 0; tuple < PQntuples(res); tuple++)
/*
* First time through, we build as much of the INSERT statement as
* possible in "insertStmt", which we can then just print for each
* statement. If the table happens to have zero columns then this will
* be a complete statement, otherwise it will end in "VALUES" and be
* ready to have the row's column values printed.
*/
if (insertStmt == NULL)
{ {
/* TableInfo *targettab;
* First time through, we build as much of the INSERT statement as
* possible in "insertStmt", which we can then just print for each
* line. If the table happens to have zero columns then this will
* be a complete statement, otherwise it will end in "VALUES(" and
* be ready to have the row's column values appended.
*/
if (insertStmt == NULL)
{
TableInfo *targettab;
insertStmt = createPQExpBuffer(); insertStmt = createPQExpBuffer();
/* /*
* When load-via-partition-root is set, get the root table * When load-via-partition-root is set, get the root table name
* name for the partition table, so that we can reload data * for the partition table, so that we can reload data through the
* through the root table. * root table.
*/ */
if (dopt->load_via_partition_root && tbinfo->ispartition) if (dopt->load_via_partition_root && tbinfo->ispartition)
targettab = getRootTableInfo(tbinfo); targettab = getRootTableInfo(tbinfo);
else else
targettab = tbinfo; targettab = tbinfo;
appendPQExpBuffer(insertStmt, "INSERT INTO %s ", appendPQExpBuffer(insertStmt, "INSERT INTO %s ",
fmtQualifiedDumpable(targettab)); fmtQualifiedDumpable(targettab));
/* corner case for zero-column table */ /* corner case for zero-column table */
if (nfields == 0) if (nfields == 0)
{ {
appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n"); appendPQExpBufferStr(insertStmt, "DEFAULT VALUES;\n");
} }
else else
{
/* append the list of column names if required */
if (dopt->column_inserts)
{ {
/* append the list of column names if required */ appendPQExpBufferChar(insertStmt, '(');
if (dopt->column_inserts) for (int field = 0; field < nfields; field++)
{ {
appendPQExpBufferChar(insertStmt, '('); if (field > 0)
for (field = 0; field < nfields; field++) appendPQExpBufferStr(insertStmt, ", ");
{ appendPQExpBufferStr(insertStmt,
if (field > 0) fmtId(PQfname(res, field)));
appendPQExpBufferStr(insertStmt, ", ");
appendPQExpBufferStr(insertStmt,
fmtId(PQfname(res, field)));
}
appendPQExpBufferStr(insertStmt, ") ");
} }
appendPQExpBufferStr(insertStmt, ") ");
}
if (tbinfo->needs_override) if (tbinfo->needs_override)
appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE "); appendPQExpBufferStr(insertStmt, "OVERRIDING SYSTEM VALUE ");
appendPQExpBufferStr(insertStmt, "VALUES ("); appendPQExpBufferStr(insertStmt, "VALUES");
}
} }
}
archputs(insertStmt->data, fout); for (int tuple = 0; tuple < PQntuples(res); tuple++)
{
/* Write the INSERT if not in the middle of a multi-row INSERT. */
if (rows_this_statement == 0)
archputs(insertStmt->data, fout);
/* if it is zero-column table then we're done */ /*
* If it is zero-column table then we've aleady written the
* complete statement, which will mean we've disobeyed
* --rows-per-insert when it's set greater than 1. We do support
* a way to make this multi-row with: SELECT UNION ALL SELECT
* UNION ALL ... but that's non-standard so we should avoid it
* given that using INSERTs is mostly only ever needed for
* cross-database exports.
*/
if (nfields == 0) if (nfields == 0)
continue; continue;
for (field = 0; field < nfields; field++) /* Emit a row heading */
if (rows_per_statement == 1)
archputs(" (", fout);
else if (rows_this_statement > 0)
archputs(",\n\t(", fout);
else
archputs("\n\t(", fout);
for (int field = 0; field < nfields; field++)
{ {
if (field > 0) if (field > 0)
archputs(", ", fout); archputs(", ", fout);
...@@ -2053,10 +2111,19 @@ dumpTableData_insert(Archive *fout, void *dcontext) ...@@ -2053,10 +2111,19 @@ dumpTableData_insert(Archive *fout, void *dcontext)
} }
} }
if (!dopt->do_nothing) /* Terminate the row ... */
archputs(");\n", fout); archputs(")", fout);
else
archputs(") ON CONFLICT DO NOTHING;\n", fout); /* ... and the statement, if the target no. of rows is reached */
if (++rows_this_statement >= rows_per_statement)
{
if (dopt->do_nothing)
archputs(" ON CONFLICT DO NOTHING;\n", fout);
else
archputs(";\n", fout);
/* Reset the row counter */
rows_this_statement = 0;
}
} }
if (PQntuples(res) <= 0) if (PQntuples(res) <= 0)
...@@ -2067,6 +2134,15 @@ dumpTableData_insert(Archive *fout, void *dcontext) ...@@ -2067,6 +2134,15 @@ dumpTableData_insert(Archive *fout, void *dcontext)
PQclear(res); PQclear(res);
} }
/* Terminate any statements that didn't make the row count. */
if (rows_this_statement > 0)
{
if (dopt->do_nothing)
archputs(" ON CONFLICT DO NOTHING;\n", fout);
else
archputs(";\n", fout);
}
archputs("\n\n", fout); archputs("\n\n", fout);
ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor"); ExecuteSqlStatement(fout, "CLOSE _pg_dump_cursor");
......
...@@ -118,8 +118,8 @@ command_fails_like( ...@@ -118,8 +118,8 @@ command_fails_like(
command_fails_like( command_fails_like(
[ 'pg_dump', '--on-conflict-do-nothing' ], [ 'pg_dump', '--on-conflict-do-nothing' ],
qr/\Qpg_dump: option --on-conflict-do-nothing requires option --inserts or --column-inserts\E/, qr/pg_dump: option --on-conflict-do-nothing requires option --inserts, --rows-per-insert or --column-inserts/,
'pg_dump: option --on-conflict-do-nothing requires option --inserts or --column-inserts'); 'pg_dump: --on-conflict-do-nothing requires --inserts, --rows-per-insert, --column-inserts');
# pg_dumpall command-line argument checks # pg_dumpall command-line argument checks
command_fails_like( command_fails_like(
......
...@@ -295,6 +295,18 @@ my %pgdump_runs = ( ...@@ -295,6 +295,18 @@ my %pgdump_runs = (
"$tempdir/role_parallel", "$tempdir/role_parallel",
], ],
}, },
rows_per_insert => {
dump_cmd => [
'pg_dump',
'--no-sync',
"--file=$tempdir/rows_per_insert.sql",
'-a',
'--rows-per-insert=4',
'--table=dump_test.test_table',
'--table=dump_test.test_fourth_table',
'postgres',
],
},
schema_only => { schema_only => {
dump_cmd => [ dump_cmd => [
'pg_dump', '--format=plain', 'pg_dump', '--format=plain',
...@@ -1228,10 +1240,11 @@ my %tests = ( ...@@ -1228,10 +1240,11 @@ my %tests = (
'COPY test_fourth_table' => { 'COPY test_fourth_table' => {
create_order => 7, create_order => 7,
create_sql => create_sql =>
'INSERT INTO dump_test.test_fourth_table DEFAULT VALUES;', 'INSERT INTO dump_test.test_fourth_table DEFAULT VALUES;'
. 'INSERT INTO dump_test.test_fourth_table DEFAULT VALUES;',
regexp => qr/^ regexp => qr/^
\QCOPY dump_test.test_fourth_table FROM stdin;\E \QCOPY dump_test.test_fourth_table FROM stdin;\E
\n\n\\\.\n \n\n\n\\\.\n
/xm, /xm,
like => { like => {
%full_runs, %full_runs,
...@@ -1295,6 +1308,19 @@ my %tests = ( ...@@ -1295,6 +1308,19 @@ my %tests = (
like => { column_inserts => 1, }, like => { column_inserts => 1, },
}, },
'test_table with 4-row INSERTs' => {
regexp => qr/^
(?:
INSERT\ INTO\ dump_test\.test_table\ VALUES\n
(?:\t\(\d,\ NULL,\ NULL,\ NULL\),\n){3}
\t\(\d,\ NULL,\ NULL,\ NULL\);\n
){2}
INSERT\ INTO\ dump_test\.test_table\ VALUES\n
\t\(\d,\ NULL,\ NULL,\ NULL\);
/xm,
like => { rows_per_insert => 1, },
},
'INSERT INTO test_second_table' => { 'INSERT INTO test_second_table' => {
regexp => qr/^ regexp => qr/^
(?:INSERT\ INTO\ dump_test\.test_second_table\ \(col1,\ col2\) (?:INSERT\ INTO\ dump_test\.test_second_table\ \(col1,\ col2\)
...@@ -1304,8 +1330,8 @@ my %tests = ( ...@@ -1304,8 +1330,8 @@ my %tests = (
'INSERT INTO test_fourth_table' => { 'INSERT INTO test_fourth_table' => {
regexp => regexp =>
qr/^\QINSERT INTO dump_test.test_fourth_table DEFAULT VALUES;\E/m, qr/^(?:INSERT INTO dump_test\.test_fourth_table DEFAULT VALUES;\n){2}/m,
like => { column_inserts => 1, }, like => { column_inserts => 1, rows_per_insert => 1, },
}, },
'INSERT INTO test_fifth_table' => { 'INSERT INTO test_fifth_table' => {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment