Commit 3b925e90 authored by Andres Freund's avatar Andres Freund

tableam: Add pg_dump support.

This adds pg_dump support for table AMs in a similar manner to how
tablespaces are handled. That is, instead of specifying the AM for
every CREATE TABLE etc, emit SET default_table_access_method
statements. That makes it easier to change the AM for all/most tables
in a dump, and allows restore to succeed even if some AM is not
available.

This increases the dump archive version, as a tables/matview's AM
needs to be tracked therein.

Author: Dimitri Dolgov, Andres Freund
Discussion:
    https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
    https://postgr.es/m/20190304234700.w5tmhducs5wxgzls@alap3.anarazel.de
parent 8586bf7e
...@@ -85,6 +85,7 @@ static void _becomeUser(ArchiveHandle *AH, const char *user); ...@@ -85,6 +85,7 @@ static void _becomeUser(ArchiveHandle *AH, const char *user);
static void _becomeOwner(ArchiveHandle *AH, TocEntry *te); static void _becomeOwner(ArchiveHandle *AH, TocEntry *te);
static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName); static void _selectOutputSchema(ArchiveHandle *AH, const char *schemaName);
static void _selectTablespace(ArchiveHandle *AH, const char *tablespace); static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
static void _selectTableAccessMethod(ArchiveHandle *AH, const char *tableam);
static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te); static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te); static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te); static void processSearchPathEntry(ArchiveHandle *AH, TocEntry *te);
...@@ -1090,6 +1091,7 @@ ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId, ...@@ -1090,6 +1091,7 @@ ArchiveEntry(Archive *AHX, CatalogId catalogId, DumpId dumpId,
newToc->tag = pg_strdup(opts->tag); newToc->tag = pg_strdup(opts->tag);
newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL; newToc->namespace = opts->namespace ? pg_strdup(opts->namespace) : NULL;
newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL; newToc->tablespace = opts->tablespace ? pg_strdup(opts->tablespace) : NULL;
newToc->tableam = opts->tableam ? pg_strdup(opts->tableam) : NULL;
newToc->owner = pg_strdup(opts->owner); newToc->owner = pg_strdup(opts->owner);
newToc->desc = pg_strdup(opts->description); newToc->desc = pg_strdup(opts->description);
newToc->defn = pg_strdup(opts->createStmt); newToc->defn = pg_strdup(opts->createStmt);
...@@ -2350,6 +2352,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt, ...@@ -2350,6 +2352,7 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
AH->currUser = NULL; /* unknown */ AH->currUser = NULL; /* unknown */
AH->currSchema = NULL; /* ditto */ AH->currSchema = NULL; /* ditto */
AH->currTablespace = NULL; /* ditto */ AH->currTablespace = NULL; /* ditto */
AH->currTableAm = NULL; /* ditto */
AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry)); AH->toc = (TocEntry *) pg_malloc0(sizeof(TocEntry));
...@@ -2576,6 +2579,7 @@ WriteToc(ArchiveHandle *AH) ...@@ -2576,6 +2579,7 @@ WriteToc(ArchiveHandle *AH)
WriteStr(AH, te->copyStmt); WriteStr(AH, te->copyStmt);
WriteStr(AH, te->namespace); WriteStr(AH, te->namespace);
WriteStr(AH, te->tablespace); WriteStr(AH, te->tablespace);
WriteStr(AH, te->tableam);
WriteStr(AH, te->owner); WriteStr(AH, te->owner);
WriteStr(AH, "false"); WriteStr(AH, "false");
...@@ -2678,6 +2682,9 @@ ReadToc(ArchiveHandle *AH) ...@@ -2678,6 +2682,9 @@ ReadToc(ArchiveHandle *AH)
if (AH->version >= K_VERS_1_10) if (AH->version >= K_VERS_1_10)
te->tablespace = ReadStr(AH); te->tablespace = ReadStr(AH);
if (AH->version >= K_VERS_1_14)
te->tableam = ReadStr(AH);
te->owner = ReadStr(AH); te->owner = ReadStr(AH);
if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0) if (AH->version < K_VERS_1_9 || strcmp(ReadStr(AH), "true") == 0)
write_msg(modulename, write_msg(modulename,
...@@ -3431,6 +3438,48 @@ _selectTablespace(ArchiveHandle *AH, const char *tablespace) ...@@ -3431,6 +3438,48 @@ _selectTablespace(ArchiveHandle *AH, const char *tablespace)
destroyPQExpBuffer(qry); destroyPQExpBuffer(qry);
} }
/*
* Set the proper default_table_access_method value for the table.
*/
static void
_selectTableAccessMethod(ArchiveHandle *AH, const char *tableam)
{
PQExpBuffer cmd;
const char *want, *have;
have = AH->currTableAm;
want = tableam;
if (!want)
return;
if (have && strcmp(want, have) == 0)
return;
cmd = createPQExpBuffer();
appendPQExpBuffer(cmd, "SET default_table_access_method = %s;", fmtId(want));
if (RestoringToDB(AH))
{
PGresult *res;
res = PQexec(AH->connection, cmd->data);
if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
warn_or_exit_horribly(AH, modulename,
"could not set default_table_access_method: %s",
PQerrorMessage(AH->connection));
PQclear(res);
}
else
ahprintf(AH, "%s\n\n", cmd->data);
destroyPQExpBuffer(cmd);
AH->currTableAm = pg_strdup(want);
}
/* /*
* Extract an object description for a TOC entry, and append it to buf. * Extract an object description for a TOC entry, and append it to buf.
* *
...@@ -3526,10 +3575,11 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData) ...@@ -3526,10 +3575,11 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
{ {
RestoreOptions *ropt = AH->public.ropt; RestoreOptions *ropt = AH->public.ropt;
/* Select owner, schema, and tablespace as necessary */ /* Select owner, schema, tablespace and default AM as necessary */
_becomeOwner(AH, te); _becomeOwner(AH, te);
_selectOutputSchema(AH, te->namespace); _selectOutputSchema(AH, te->namespace);
_selectTablespace(AH, te->tablespace); _selectTablespace(AH, te->tablespace);
_selectTableAccessMethod(AH, te->tableam);
/* Emit header comment for item */ /* Emit header comment for item */
if (!AH->noTocComments) if (!AH->noTocComments)
...@@ -4006,6 +4056,9 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list) ...@@ -4006,6 +4056,9 @@ restore_toc_entries_prefork(ArchiveHandle *AH, TocEntry *pending_list)
if (AH->currTablespace) if (AH->currTablespace)
free(AH->currTablespace); free(AH->currTablespace);
AH->currTablespace = NULL; AH->currTablespace = NULL;
if (AH->currTableAm)
free(AH->currTableAm);
AH->currTableAm = NULL;
} }
/* /*
...@@ -4891,6 +4944,8 @@ DeCloneArchive(ArchiveHandle *AH) ...@@ -4891,6 +4944,8 @@ DeCloneArchive(ArchiveHandle *AH)
free(AH->currSchema); free(AH->currSchema);
if (AH->currTablespace) if (AH->currTablespace)
free(AH->currTablespace); free(AH->currTablespace);
if (AH->currTableAm)
free(AH->currTableAm);
if (AH->savedPassword) if (AH->savedPassword)
free(AH->savedPassword); free(AH->savedPassword);
......
...@@ -94,6 +94,7 @@ typedef z_stream *z_streamp; ...@@ -94,6 +94,7 @@ typedef z_stream *z_streamp;
* entries */ * entries */
#define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0) /* change search_path #define K_VERS_1_13 MAKE_ARCHIVE_VERSION(1, 13, 0) /* change search_path
* behavior */ * behavior */
#define K_VERS_1_14 MAKE_ARCHIVE_VERSION(1, 14, 0) /* add tableam */
/* /*
* Current archive version number (the format we can output) * Current archive version number (the format we can output)
...@@ -102,7 +103,7 @@ typedef z_stream *z_streamp; ...@@ -102,7 +103,7 @@ typedef z_stream *z_streamp;
* https://postgr.es/m/20190227123217.GA27552@alvherre.pgsql * https://postgr.es/m/20190227123217.GA27552@alvherre.pgsql
*/ */
#define K_VERS_MAJOR 1 #define K_VERS_MAJOR 1
#define K_VERS_MINOR 13 #define K_VERS_MINOR 14
#define K_VERS_REV 0 #define K_VERS_REV 0
#define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV); #define K_VERS_SELF MAKE_ARCHIVE_VERSION(K_VERS_MAJOR, K_VERS_MINOR, K_VERS_REV);
...@@ -352,6 +353,7 @@ struct _archiveHandle ...@@ -352,6 +353,7 @@ struct _archiveHandle
char *currUser; /* current username, or NULL if unknown */ char *currUser; /* current username, or NULL if unknown */
char *currSchema; /* current schema, or NULL */ char *currSchema; /* current schema, or NULL */
char *currTablespace; /* current tablespace, or NULL */ char *currTablespace; /* current tablespace, or NULL */
char *currTableAm; /* current table access method, or NULL */
void *lo_buf; void *lo_buf;
size_t lo_buf_used; size_t lo_buf_used;
...@@ -378,6 +380,7 @@ struct _tocEntry ...@@ -378,6 +380,7 @@ struct _tocEntry
char *namespace; /* null or empty string if not in a schema */ char *namespace; /* null or empty string if not in a schema */
char *tablespace; /* null if not in a tablespace; empty string char *tablespace; /* null if not in a tablespace; empty string
* means use database default */ * means use database default */
char *tableam; /* table access method, only for TABLE tags */
char *owner; char *owner;
char *desc; char *desc;
char *defn; char *defn;
...@@ -416,6 +419,7 @@ typedef struct _archiveOpts ...@@ -416,6 +419,7 @@ typedef struct _archiveOpts
const char *tag; const char *tag;
const char *namespace; const char *namespace;
const char *tablespace; const char *tablespace;
const char *tableam;
const char *owner; const char *owner;
const char *description; const char *description;
teSection section; teSection section;
......
...@@ -5856,6 +5856,7 @@ getTables(Archive *fout, int *numTables) ...@@ -5856,6 +5856,7 @@ getTables(Archive *fout, int *numTables)
int i_partkeydef; int i_partkeydef;
int i_ispartition; int i_ispartition;
int i_partbound; int i_partbound;
int i_amname;
/* /*
* Find all the tables and table-like objects. * Find all the tables and table-like objects.
...@@ -5941,7 +5942,7 @@ getTables(Archive *fout, int *numTables) ...@@ -5941,7 +5942,7 @@ getTables(Archive *fout, int *numTables)
"tc.relfrozenxid AS tfrozenxid, " "tc.relfrozenxid AS tfrozenxid, "
"tc.relminmxid AS tminmxid, " "tc.relminmxid AS tminmxid, "
"c.relpersistence, c.relispopulated, " "c.relpersistence, c.relispopulated, "
"c.relreplident, c.relpages, " "c.relreplident, c.relpages, am.amname, "
"CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, " "CASE WHEN c.reloftype <> 0 THEN c.reloftype::pg_catalog.regtype ELSE NULL END AS reloftype, "
"d.refobjid AS owning_tab, " "d.refobjid AS owning_tab, "
"d.refobjsubid AS owning_col, " "d.refobjsubid AS owning_col, "
...@@ -5972,6 +5973,7 @@ getTables(Archive *fout, int *numTables) ...@@ -5972,6 +5973,7 @@ getTables(Archive *fout, int *numTables)
"d.objsubid = 0 AND " "d.objsubid = 0 AND "
"d.refclassid = c.tableoid AND d.deptype IN ('a', 'i')) " "d.refclassid = c.tableoid AND d.deptype IN ('a', 'i')) "
"LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) " "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
"LEFT JOIN pg_am am ON (c.relam = am.oid) "
"LEFT JOIN pg_init_privs pip ON " "LEFT JOIN pg_init_privs pip ON "
"(c.oid = pip.objoid " "(c.oid = pip.objoid "
"AND pip.classoid = 'pg_class'::regclass " "AND pip.classoid = 'pg_class'::regclass "
...@@ -6439,6 +6441,7 @@ getTables(Archive *fout, int *numTables) ...@@ -6439,6 +6441,7 @@ getTables(Archive *fout, int *numTables)
i_partkeydef = PQfnumber(res, "partkeydef"); i_partkeydef = PQfnumber(res, "partkeydef");
i_ispartition = PQfnumber(res, "ispartition"); i_ispartition = PQfnumber(res, "ispartition");
i_partbound = PQfnumber(res, "partbound"); i_partbound = PQfnumber(res, "partbound");
i_amname = PQfnumber(res, "amname");
if (dopt->lockWaitTimeout) if (dopt->lockWaitTimeout)
{ {
...@@ -6508,6 +6511,10 @@ getTables(Archive *fout, int *numTables) ...@@ -6508,6 +6511,10 @@ getTables(Archive *fout, int *numTables)
else else
tblinfo[i].checkoption = pg_strdup(PQgetvalue(res, i, i_checkoption)); tblinfo[i].checkoption = pg_strdup(PQgetvalue(res, i, i_checkoption));
tblinfo[i].toast_reloptions = pg_strdup(PQgetvalue(res, i, i_toastreloptions)); tblinfo[i].toast_reloptions = pg_strdup(PQgetvalue(res, i, i_toastreloptions));
if (PQgetisnull(res, i, i_amname))
tblinfo[i].amname = NULL;
else
tblinfo[i].amname = pg_strdup(PQgetvalue(res, i, i_amname));
/* other fields were zeroed above */ /* other fields were zeroed above */
...@@ -12632,6 +12639,9 @@ dumpAccessMethod(Archive *fout, AccessMethodInfo *aminfo) ...@@ -12632,6 +12639,9 @@ dumpAccessMethod(Archive *fout, AccessMethodInfo *aminfo)
case AMTYPE_INDEX: case AMTYPE_INDEX:
appendPQExpBuffer(q, "TYPE INDEX "); appendPQExpBuffer(q, "TYPE INDEX ");
break; break;
case AMTYPE_TABLE:
appendPQExpBuffer(q, "TYPE TABLE ");
break;
default: default:
write_msg(NULL, "WARNING: invalid type \"%c\" of access method \"%s\"\n", write_msg(NULL, "WARNING: invalid type \"%c\" of access method \"%s\"\n",
aminfo->amtype, qamname); aminfo->amtype, qamname);
...@@ -16067,18 +16077,26 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo) ...@@ -16067,18 +16077,26 @@ dumpTableSchema(Archive *fout, TableInfo *tbinfo)
tbinfo->dobj.namespace->dobj.name); tbinfo->dobj.namespace->dobj.name);
if (tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION) if (tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
{
char *tableam = NULL;
if (tbinfo->relkind == RELKIND_RELATION ||
tbinfo->relkind == RELKIND_MATVIEW)
tableam = tbinfo->amname;
ArchiveEntry(fout, tbinfo->dobj.catId, tbinfo->dobj.dumpId, ArchiveEntry(fout, tbinfo->dobj.catId, tbinfo->dobj.dumpId,
ARCHIVE_OPTS(.tag = tbinfo->dobj.name, ARCHIVE_OPTS(.tag = tbinfo->dobj.name,
.namespace = tbinfo->dobj.namespace->dobj.name, .namespace = tbinfo->dobj.namespace->dobj.name,
.tablespace = (tbinfo->relkind == RELKIND_VIEW) ? .tablespace = (tbinfo->relkind == RELKIND_VIEW) ?
NULL : tbinfo->reltablespace, NULL : tbinfo->reltablespace,
.tableam = tableam,
.owner = tbinfo->rolname, .owner = tbinfo->rolname,
.description = reltypename, .description = reltypename,
.section = tbinfo->postponed_def ? .section = tbinfo->postponed_def ?
SECTION_POST_DATA : SECTION_PRE_DATA, SECTION_POST_DATA : SECTION_PRE_DATA,
.createStmt = q->data, .createStmt = q->data,
.dropStmt = delq->data)); .dropStmt = delq->data));
}
/* Dump Table Comments */ /* Dump Table Comments */
if (tbinfo->dobj.dump & DUMP_COMPONENT_COMMENT) if (tbinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
......
...@@ -324,6 +324,7 @@ typedef struct _tableInfo ...@@ -324,6 +324,7 @@ typedef struct _tableInfo
char *partkeydef; /* partition key definition */ char *partkeydef; /* partition key definition */
char *partbound; /* partition bound definition */ char *partbound; /* partition bound definition */
bool needs_override; /* has GENERATED ALWAYS AS IDENTITY */ bool needs_override; /* has GENERATED ALWAYS AS IDENTITY */
char *amname; /* relation access method */
/* /*
* Stuff computed only for dumpable tables. * Stuff computed only for dumpable tables.
......
...@@ -3070,6 +3070,66 @@ my %tests = ( ...@@ -3070,6 +3070,66 @@ my %tests = (
unlike => { no_privs => 1, }, unlike => { no_privs => 1, },
}, },
'CREATE ACCESS METHOD regress_test_table_am' => {
create_order => 11,
create_sql => 'CREATE ACCESS METHOD regress_table_am TYPE TABLE HANDLER heap_tableam_handler;',
regexp => qr/^
\QCREATE ACCESS METHOD regress_table_am TYPE TABLE HANDLER heap_tableam_handler;\E
\n/xm,
like => {
%full_runs,
section_pre_data => 1,
},
},
# It's a bit tricky to ensure that the proper SET of default table
# AM occurs. To achieve that we create a table with the standard
# AM, test AM, standard AM. That guarantees that there needs to be
# a SET interspersed. Then use a regex that prevents interspersed
# SET ...; statements, followed by the exptected CREATE TABLE. Not
# pretty, but seems hard to do better in this framework.
'CREATE TABLE regress_pg_dump_table_am' => {
create_order => 12,
create_sql => '
CREATE TABLE dump_test.regress_pg_dump_table_am_0() USING heap;
CREATE TABLE dump_test.regress_pg_dump_table_am_1 (col1 int) USING regress_table_am;
CREATE TABLE dump_test.regress_pg_dump_table_am_2() USING heap;',
regexp => qr/^
\QSET default_table_access_method = regress_table_am;\E
(\n(?!SET[^;]+;)[^\n]*)*
\n\QCREATE TABLE dump_test.regress_pg_dump_table_am_1 (\E
\n\s+\Qcol1 integer\E
\n\);/xm,
like => {
%full_runs,
%dump_test_schema_runs,
section_pre_data => 1,
},
unlike => { exclude_dump_test_schema => 1},
},
'CREATE MATERIALIZED VIEW regress_pg_dump_matview_am' => {
create_order => 13,
create_sql => '
CREATE MATERIALIZED VIEW dump_test.regress_pg_dump_matview_am_0 USING heap AS SELECT 1;
CREATE MATERIALIZED VIEW dump_test.regress_pg_dump_matview_am_1
USING regress_table_am AS SELECT count(*) FROM pg_class;
CREATE MATERIALIZED VIEW dump_test.regress_pg_dump_matview_am_2 USING heap AS SELECT 1;',
regexp => qr/^
\QSET default_table_access_method = regress_table_am;\E
(\n(?!SET[^;]+;)[^\n]*)*
\QCREATE MATERIALIZED VIEW dump_test.regress_pg_dump_matview_am_1 AS\E
\n\s+\QSELECT count(*) AS count\E
\n\s+\QFROM pg_class\E
\n\s+\QWITH NO DATA;\E\n/xm,
like => {
%full_runs,
%dump_test_schema_runs,
section_pre_data => 1,
},
unlike => { exclude_dump_test_schema => 1},
}
); );
######################################### #########################################
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment