Commit aedc4600 authored by Tom Lane's avatar Tom Lane

Fix pg_dump --inserts mode for generated columns with dropped columns.

If a table contains a generated column that's preceded by a dropped
column, dumpTableData_insert failed to account for the dropped
column, and would emit DEFAULT placeholder(s) in the wrong column(s).
This resulted in failures at restore time.  The default COPY code path
did not have this bug, likely explaining why it wasn't noticed sooner.

While we're fixing this, we can be a little smarter about the
situation: (1) avoid unnecessarily fetching the values of generated
columns, (2) omit generated columns from the output, too, if we're
using --column-inserts.  While these modes aren't expected to be
as high-performance as the COPY path, we might as well be as
efficient as we can; it doesn't add much complexity.

Per report from Дмитрий Иванов.
Back-patch to v12 where generated columns came in.

Discussion: https://postgr.es/m/CAPL5KHrkBniyQt5e1rafm5DdXvbgiiqfEQEJ9GjtVzN71Jj5pA@mail.gmail.com
parent e9af18c6
...@@ -2094,13 +2094,42 @@ dumpTableData_insert(Archive *fout, const void *dcontext) ...@@ -2094,13 +2094,42 @@ dumpTableData_insert(Archive *fout, const void *dcontext)
DumpOptions *dopt = fout->dopt; DumpOptions *dopt = fout->dopt;
PQExpBuffer q = createPQExpBuffer(); PQExpBuffer q = createPQExpBuffer();
PQExpBuffer insertStmt = NULL; PQExpBuffer insertStmt = NULL;
char *attgenerated;
PGresult *res; PGresult *res;
int nfields; int nfields,
i;
int rows_per_statement = dopt->dump_inserts; int rows_per_statement = dopt->dump_inserts;
int rows_this_statement = 0; int rows_this_statement = 0;
appendPQExpBuffer(q, "DECLARE _pg_dump_cursor CURSOR FOR " /*
"SELECT * FROM ONLY %s", * If we're going to emit INSERTs with column names, the most efficient
* way to deal with generated columns is to exclude them entirely. For
* INSERTs without column names, we have to emit DEFAULT rather than the
* actual column value --- but we can save a few cycles by fetching nulls
* rather than the uninteresting-to-us value.
*/
attgenerated = (char *) pg_malloc(tbinfo->numatts * sizeof(char));
appendPQExpBufferStr(q, "DECLARE _pg_dump_cursor CURSOR FOR SELECT ");
nfields = 0;
for (i = 0; i < tbinfo->numatts; i++)
{
if (tbinfo->attisdropped[i])
continue;
if (tbinfo->attgenerated[i] && dopt->column_inserts)
continue;
if (nfields > 0)
appendPQExpBufferStr(q, ", ");
if (tbinfo->attgenerated[i])
appendPQExpBufferStr(q, "NULL");
else
appendPQExpBufferStr(q, fmtId(tbinfo->attnames[i]));
attgenerated[nfields] = tbinfo->attgenerated[i];
nfields++;
}
/* Servers before 9.4 will complain about zero-column SELECT */
if (nfields == 0)
appendPQExpBufferStr(q, "NULL");
appendPQExpBuffer(q, " FROM ONLY %s",
fmtQualifiedDumpable(tbinfo)); fmtQualifiedDumpable(tbinfo));
if (tdinfo->filtercond) if (tdinfo->filtercond)
appendPQExpBuffer(q, " %s", tdinfo->filtercond); appendPQExpBuffer(q, " %s", tdinfo->filtercond);
...@@ -2111,14 +2140,19 @@ dumpTableData_insert(Archive *fout, const void *dcontext) ...@@ -2111,14 +2140,19 @@ dumpTableData_insert(Archive *fout, const void *dcontext)
{ {
res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor", res = ExecuteSqlQuery(fout, "FETCH 100 FROM _pg_dump_cursor",
PGRES_TUPLES_OK); PGRES_TUPLES_OK);
nfields = PQnfields(res);
/* cross-check field count, allowing for dummy NULL if any */
if (nfields != PQnfields(res) &&
!(nfields == 0 && PQnfields(res) == 1))
fatal("wrong number of fields retrieved from table \"%s\"",
tbinfo->dobj.name);
/* /*
* First time through, we build as much of the INSERT statement as * First time through, we build as much of the INSERT statement as
* possible in "insertStmt", which we can then just print for each * possible in "insertStmt", which we can then just print for each
* statement. If the table happens to have zero columns then this will * statement. If the table happens to have zero dumpable columns then
* be a complete statement, otherwise it will end in "VALUES" and be * this will be a complete statement, otherwise it will end in
* ready to have the row's column values printed. * "VALUES" and be ready to have the row's column values printed.
*/ */
if (insertStmt == NULL) if (insertStmt == NULL)
{ {
...@@ -2197,7 +2231,7 @@ dumpTableData_insert(Archive *fout, const void *dcontext) ...@@ -2197,7 +2231,7 @@ dumpTableData_insert(Archive *fout, const void *dcontext)
{ {
if (field > 0) if (field > 0)
archputs(", ", fout); archputs(", ", fout);
if (tbinfo->attgenerated[field]) if (attgenerated[field])
{ {
archputs("DEFAULT", fout); archputs("DEFAULT", fout);
continue; continue;
...@@ -2302,6 +2336,7 @@ dumpTableData_insert(Archive *fout, const void *dcontext) ...@@ -2302,6 +2336,7 @@ dumpTableData_insert(Archive *fout, const void *dcontext)
destroyPQExpBuffer(q); destroyPQExpBuffer(q);
if (insertStmt != NULL) if (insertStmt != NULL)
destroyPQExpBuffer(insertStmt); destroyPQExpBuffer(insertStmt);
free(attgenerated);
return 1; return 1;
} }
......
...@@ -209,6 +209,13 @@ my %pgdump_runs = ( ...@@ -209,6 +209,13 @@ my %pgdump_runs = (
'postgres', 'postgres',
], ],
}, },
inserts => {
dump_cmd => [
'pg_dump', '--no-sync',
"--file=$tempdir/inserts.sql", '-a',
'--inserts', 'postgres',
],
},
pg_dumpall_globals => { pg_dumpall_globals => {
dump_cmd => [ dump_cmd => [
'pg_dumpall', '-v', "--file=$tempdir/pg_dumpall_globals.sql", 'pg_dumpall', '-v', "--file=$tempdir/pg_dumpall_globals.sql",
...@@ -603,6 +610,7 @@ my %tests = ( ...@@ -603,6 +610,7 @@ my %tests = (
%full_runs, %full_runs,
column_inserts => 1, column_inserts => 1,
data_only => 1, data_only => 1,
inserts => 1,
section_pre_data => 1, section_pre_data => 1,
test_schema_plus_blobs => 1, test_schema_plus_blobs => 1,
}, },
...@@ -930,6 +938,7 @@ my %tests = ( ...@@ -930,6 +938,7 @@ my %tests = (
%full_runs, %full_runs,
column_inserts => 1, column_inserts => 1,
data_only => 1, data_only => 1,
inserts => 1,
section_pre_data => 1, section_pre_data => 1,
test_schema_plus_blobs => 1, test_schema_plus_blobs => 1,
}, },
...@@ -950,6 +959,7 @@ my %tests = ( ...@@ -950,6 +959,7 @@ my %tests = (
%full_runs, %full_runs,
column_inserts => 1, column_inserts => 1,
data_only => 1, data_only => 1,
inserts => 1,
section_data => 1, section_data => 1,
test_schema_plus_blobs => 1, test_schema_plus_blobs => 1,
}, },
...@@ -1084,6 +1094,7 @@ my %tests = ( ...@@ -1084,6 +1094,7 @@ my %tests = (
%full_runs, %full_runs,
column_inserts => 1, column_inserts => 1,
data_only => 1, data_only => 1,
inserts => 1,
section_pre_data => 1, section_pre_data => 1,
test_schema_plus_blobs => 1, test_schema_plus_blobs => 1,
}, },
...@@ -1283,6 +1294,27 @@ my %tests = ( ...@@ -1283,6 +1294,27 @@ my %tests = (
}, },
}, },
'COPY test_third_table' => {
create_order => 7,
create_sql =>
'INSERT INTO dump_test.test_third_table VALUES (123, DEFAULT, 456);',
regexp => qr/^
\QCOPY dump_test.test_third_table (f1, "F3") FROM stdin;\E
\n123\t456\n\\\.\n
/xm,
like => {
%full_runs,
%dump_test_schema_runs,
data_only => 1,
section_data => 1,
},
unlike => {
binary_upgrade => 1,
exclude_dump_test_schema => 1,
schema_only => 1,
},
},
'COPY test_fourth_table' => { 'COPY test_fourth_table' => {
create_order => 7, create_order => 7,
create_sql => create_sql =>
...@@ -1374,10 +1406,22 @@ my %tests = ( ...@@ -1374,10 +1406,22 @@ my %tests = (
like => { column_inserts => 1, }, like => { column_inserts => 1, },
}, },
'INSERT INTO test_third_table (colnames)' => {
regexp =>
qr/^INSERT INTO dump_test\.test_third_table \(f1, "F3"\) VALUES \(123, 456\);\n/m,
like => { column_inserts => 1, },
},
'INSERT INTO test_third_table' => {
regexp =>
qr/^INSERT INTO dump_test\.test_third_table VALUES \(123, DEFAULT, 456, DEFAULT\);\n/m,
like => { inserts => 1, },
},
'INSERT INTO test_fourth_table' => { 'INSERT INTO test_fourth_table' => {
regexp => regexp =>
qr/^(?:INSERT INTO dump_test\.test_fourth_table DEFAULT VALUES;\n){2}/m, qr/^(?:INSERT INTO dump_test\.test_fourth_table DEFAULT VALUES;\n){2}/m,
like => { column_inserts => 1, rows_per_insert => 1, }, like => { column_inserts => 1, inserts => 1, rows_per_insert => 1, },
}, },
'INSERT INTO test_fifth_table' => { 'INSERT INTO test_fifth_table' => {
...@@ -2547,6 +2591,28 @@ my %tests = ( ...@@ -2547,6 +2591,28 @@ my %tests = (
like => {} like => {}
}, },
'CREATE TABLE test_third_table_generated_cols' => {
create_order => 6,
create_sql => 'CREATE TABLE dump_test.test_third_table (
f1 int, junk int,
g1 int generated always as (f1 * 2) stored,
"F3" int,
g2 int generated always as ("F3" * 3) stored
);
ALTER TABLE dump_test.test_third_table DROP COLUMN junk;',
regexp => qr/^
\QCREATE TABLE dump_test.test_third_table (\E\n
\s+\Qf1 integer,\E\n
\s+\Qg1 integer GENERATED ALWAYS AS ((f1 * 2)) STORED,\E\n
\s+\Q"F3" integer,\E\n
\s+\Qg2 integer GENERATED ALWAYS AS (("F3" * 3)) STORED\E\n
\);\n
/xm,
like =>
{ %full_runs, %dump_test_schema_runs, section_pre_data => 1, },
unlike => { binary_upgrade => 1, exclude_dump_test_schema => 1, },
},
'CREATE TABLE test_fourth_table_zero_col' => { 'CREATE TABLE test_fourth_table_zero_col' => {
create_order => 6, create_order => 6,
create_sql => 'CREATE TABLE dump_test.test_fourth_table ( create_sql => 'CREATE TABLE dump_test.test_fourth_table (
...@@ -3230,6 +3296,7 @@ my %tests = ( ...@@ -3230,6 +3296,7 @@ my %tests = (
%full_runs, %full_runs,
column_inserts => 1, column_inserts => 1,
data_only => 1, data_only => 1,
inserts => 1,
section_pre_data => 1, section_pre_data => 1,
test_schema_plus_blobs => 1, test_schema_plus_blobs => 1,
binary_upgrade => 1, binary_upgrade => 1,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment