Commit 8e396a77 authored by Tom Lane's avatar Tom Lane

pg_dump: label PUBLICATION TABLE ArchiveEntries with an owner.

This is the same fix as commit 9eabfe30 applied to INDEX ATTACH
entries, but for table-to-publication attachments.  As in that
case, even though the backend doesn't record "ownership" of the
attachment, we still ought to label it in the dump archive with
the role name that should run the ALTER PUBLICATION command.
The existing behavior causes the ALTER to be done by the original
role that started the restore; that will usually work fine, but
there may be corner cases where it fails.

The bulk of the patch is concerned with changing struct
PublicationRelInfo to include a pointer to the associated
PublicationInfo object, so that we can get the owner's name
out of that when the time comes.  While at it, I rewrote
getPublicationTables() to do just one query of pg_publication_rel,
not one per table.

Back-patch to v10 where this code was introduced.

Discussion: https://postgr.es/m/1165710.1610473242@sss.pgh.pa.us
parent ebfe2dbd
...@@ -52,6 +52,7 @@ static DumpableObject **oprinfoindex; ...@@ -52,6 +52,7 @@ static DumpableObject **oprinfoindex;
static DumpableObject **collinfoindex; static DumpableObject **collinfoindex;
static DumpableObject **nspinfoindex; static DumpableObject **nspinfoindex;
static DumpableObject **extinfoindex; static DumpableObject **extinfoindex;
static DumpableObject **pubinfoindex;
static int numTables; static int numTables;
static int numTypes; static int numTypes;
static int numFuncs; static int numFuncs;
...@@ -59,6 +60,7 @@ static int numOperators; ...@@ -59,6 +60,7 @@ static int numOperators;
static int numCollations; static int numCollations;
static int numNamespaces; static int numNamespaces;
static int numExtensions; static int numExtensions;
static int numPublications;
/* This is an array of object identities, not actual DumpableObjects */ /* This is an array of object identities, not actual DumpableObjects */
static ExtensionMemberId *extmembers; static ExtensionMemberId *extmembers;
...@@ -93,6 +95,7 @@ getSchemaData(Archive *fout, int *numTablesPtr) ...@@ -93,6 +95,7 @@ getSchemaData(Archive *fout, int *numTablesPtr)
CollInfo *collinfo; CollInfo *collinfo;
NamespaceInfo *nspinfo; NamespaceInfo *nspinfo;
ExtensionInfo *extinfo; ExtensionInfo *extinfo;
PublicationInfo *pubinfo;
InhInfo *inhinfo; InhInfo *inhinfo;
int numAggregates; int numAggregates;
int numInherits; int numInherits;
...@@ -247,7 +250,9 @@ getSchemaData(Archive *fout, int *numTablesPtr) ...@@ -247,7 +250,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
getPolicies(fout, tblinfo, numTables); getPolicies(fout, tblinfo, numTables);
pg_log_info("reading publications"); pg_log_info("reading publications");
getPublications(fout); pubinfo = getPublications(fout, &numPublications);
pubinfoindex = buildIndexArray(pubinfo, numPublications,
sizeof(PublicationInfo));
pg_log_info("reading publication membership"); pg_log_info("reading publication membership");
getPublicationTables(fout, tblinfo, numTables); getPublicationTables(fout, tblinfo, numTables);
...@@ -937,6 +942,17 @@ findExtensionByOid(Oid oid) ...@@ -937,6 +942,17 @@ findExtensionByOid(Oid oid)
return (ExtensionInfo *) findObjectByOid(oid, extinfoindex, numExtensions); return (ExtensionInfo *) findObjectByOid(oid, extinfoindex, numExtensions);
} }
/*
* findPublicationByOid
* finds the entry (in pubinfo) of the publication with the given oid
* returns NULL if not found
*/
PublicationInfo *
findPublicationByOid(Oid oid)
{
return (PublicationInfo *) findObjectByOid(oid, pubinfoindex, numPublications);
}
/* /*
* findIndexByOid * findIndexByOid
* find the entry of the index with the given oid * find the entry of the index with the given oid
......
...@@ -3865,8 +3865,8 @@ dumpPolicy(Archive *fout, PolicyInfo *polinfo) ...@@ -3865,8 +3865,8 @@ dumpPolicy(Archive *fout, PolicyInfo *polinfo)
* getPublications * getPublications
* get information about publications * get information about publications
*/ */
void PublicationInfo *
getPublications(Archive *fout) getPublications(Archive *fout, int *numPublications)
{ {
DumpOptions *dopt = fout->dopt; DumpOptions *dopt = fout->dopt;
PQExpBuffer query; PQExpBuffer query;
...@@ -3886,7 +3886,10 @@ getPublications(Archive *fout) ...@@ -3886,7 +3886,10 @@ getPublications(Archive *fout)
ntups; ntups;
if (dopt->no_publications || fout->remoteVersion < 100000) if (dopt->no_publications || fout->remoteVersion < 100000)
return; {
*numPublications = 0;
return NULL;
}
query = createPQExpBuffer(); query = createPQExpBuffer();
...@@ -3964,6 +3967,9 @@ getPublications(Archive *fout) ...@@ -3964,6 +3967,9 @@ getPublications(Archive *fout)
PQclear(res); PQclear(res);
destroyPQExpBuffer(query); destroyPQExpBuffer(query);
*numPublications = ntups;
return pubinfo;
} }
/* /*
...@@ -4072,7 +4078,8 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) ...@@ -4072,7 +4078,8 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
DumpOptions *dopt = fout->dopt; DumpOptions *dopt = fout->dopt;
int i_tableoid; int i_tableoid;
int i_oid; int i_oid;
int i_pubname; int i_prpubid;
int i_prrelid;
int i, int i,
j, j,
ntups; ntups;
...@@ -4082,15 +4089,39 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) ...@@ -4082,15 +4089,39 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
query = createPQExpBuffer(); query = createPQExpBuffer();
for (i = 0; i < numTables; i++) /* Collect all publication membership info. */
appendPQExpBufferStr(query,
"SELECT tableoid, oid, prpubid, prrelid "
"FROM pg_catalog.pg_publication_rel");
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
i_tableoid = PQfnumber(res, "tableoid");
i_oid = PQfnumber(res, "oid");
i_prpubid = PQfnumber(res, "prpubid");
i_prrelid = PQfnumber(res, "prrelid");
/* this allocation may be more than we need */
pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo));
j = 0;
for (i = 0; i < ntups; i++)
{ {
TableInfo *tbinfo = &tblinfo[i]; Oid prpubid = atooid(PQgetvalue(res, i, i_prpubid));
Oid prrelid = atooid(PQgetvalue(res, i, i_prrelid));
PublicationInfo *pubinfo;
TableInfo *tbinfo;
/* /*
* Only regular and partitioned tables can be added to publications. * Ignore any entries for which we aren't interested in either the
* publication or the rel.
*/ */
if (tbinfo->relkind != RELKIND_RELATION && pubinfo = findPublicationByOid(prpubid);
tbinfo->relkind != RELKIND_PARTITIONED_TABLE) if (pubinfo == NULL)
continue;
tbinfo = findTableByOid(prrelid);
if (tbinfo == NULL)
continue; continue;
/* /*
...@@ -4100,55 +4131,24 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) ...@@ -4100,55 +4131,24 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) if (!(tbinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
continue; continue;
pg_log_info("reading publication membership for table \"%s.%s\"", /* OK, make a DumpableObject for this relationship */
tbinfo->dobj.namespace->dobj.name, pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
tbinfo->dobj.name); pubrinfo[j].dobj.catId.tableoid =
atooid(PQgetvalue(res, i, i_tableoid));
resetPQExpBuffer(query); pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid));
AssignDumpId(&pubrinfo[j].dobj);
/* Get the publication membership for the table. */ pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
appendPQExpBuffer(query, pubrinfo[j].dobj.name = tbinfo->dobj.name;
"SELECT pr.tableoid, pr.oid, p.pubname " pubrinfo[j].publication = pubinfo;
"FROM pg_publication_rel pr, pg_publication p " pubrinfo[j].pubtable = tbinfo;
"WHERE pr.prrelid = '%u'"
" AND p.oid = pr.prpubid",
tbinfo->dobj.catId.oid);
res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
ntups = PQntuples(res);
if (ntups == 0)
{
/*
* Table is not member of any publications. Clean up and return.
*/
PQclear(res);
continue;
}
i_tableoid = PQfnumber(res, "tableoid");
i_oid = PQfnumber(res, "oid");
i_pubname = PQfnumber(res, "pubname");
pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); /* Decide whether we want to dump it */
selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
for (j = 0; j < ntups; j++) j++;
{
pubrinfo[j].dobj.objType = DO_PUBLICATION_REL;
pubrinfo[j].dobj.catId.tableoid =
atooid(PQgetvalue(res, j, i_tableoid));
pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid));
AssignDumpId(&pubrinfo[j].dobj);
pubrinfo[j].dobj.namespace = tbinfo->dobj.namespace;
pubrinfo[j].dobj.name = tbinfo->dobj.name;
pubrinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname));
pubrinfo[j].pubtable = tbinfo;
/* Decide whether we want to dump it */
selectDumpablePublicationTable(&(pubrinfo[j].dobj), fout);
}
PQclear(res);
} }
PQclear(res);
destroyPQExpBuffer(query); destroyPQExpBuffer(query);
} }
...@@ -4159,6 +4159,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) ...@@ -4159,6 +4159,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables)
static void static void
dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo) dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
{ {
PublicationInfo *pubinfo = pubrinfo->publication;
TableInfo *tbinfo = pubrinfo->pubtable; TableInfo *tbinfo = pubrinfo->pubtable;
PQExpBuffer query; PQExpBuffer query;
char *tag; char *tag;
...@@ -4166,22 +4167,26 @@ dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo) ...@@ -4166,22 +4167,26 @@ dumpPublicationTable(Archive *fout, PublicationRelInfo *pubrinfo)
if (!(pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) if (!(pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION))
return; return;
tag = psprintf("%s %s", pubrinfo->pubname, tbinfo->dobj.name); tag = psprintf("%s %s", pubinfo->dobj.name, tbinfo->dobj.name);
query = createPQExpBuffer(); query = createPQExpBuffer();
appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY", appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY",
fmtId(pubrinfo->pubname)); fmtId(pubinfo->dobj.name));
appendPQExpBuffer(query, " %s;\n", appendPQExpBuffer(query, " %s;\n",
fmtQualifiedDumpable(tbinfo)); fmtQualifiedDumpable(tbinfo));
/* /*
* There is no point in creating drop query as the drop is done by table * There is no point in creating a drop query as the drop is done by table
* drop. * drop. (If you think to change this, see also _printTocEntry().)
* Although this object doesn't really have ownership as such, set the
* owner field anyway to ensure that the command is run by the correct
* role at restore time.
*/ */
ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId, ArchiveEntry(fout, pubrinfo->dobj.catId, pubrinfo->dobj.dumpId,
ARCHIVE_OPTS(.tag = tag, ARCHIVE_OPTS(.tag = tag,
.namespace = tbinfo->dobj.namespace->dobj.name, .namespace = tbinfo->dobj.namespace->dobj.name,
.owner = pubinfo->rolname,
.description = "PUBLICATION TABLE", .description = "PUBLICATION TABLE",
.section = SECTION_POST_DATA, .section = SECTION_POST_DATA,
.createStmt = query->data)); .createStmt = query->data));
......
...@@ -623,8 +623,8 @@ typedef struct _PublicationInfo ...@@ -623,8 +623,8 @@ typedef struct _PublicationInfo
typedef struct _PublicationRelInfo typedef struct _PublicationRelInfo
{ {
DumpableObject dobj; DumpableObject dobj;
PublicationInfo *publication;
TableInfo *pubtable; TableInfo *pubtable;
char *pubname;
} PublicationRelInfo; } PublicationRelInfo;
/* /*
...@@ -675,6 +675,7 @@ extern OprInfo *findOprByOid(Oid oid); ...@@ -675,6 +675,7 @@ extern OprInfo *findOprByOid(Oid oid);
extern CollInfo *findCollationByOid(Oid oid); extern CollInfo *findCollationByOid(Oid oid);
extern NamespaceInfo *findNamespaceByOid(Oid oid); extern NamespaceInfo *findNamespaceByOid(Oid oid);
extern ExtensionInfo *findExtensionByOid(Oid oid); extern ExtensionInfo *findExtensionByOid(Oid oid);
extern PublicationInfo *findPublicationByOid(Oid oid);
extern void setExtensionMembership(ExtensionMemberId *extmems, int nextmems); extern void setExtensionMembership(ExtensionMemberId *extmems, int nextmems);
extern ExtensionInfo *findOwningExtension(CatalogId catalogId); extern ExtensionInfo *findOwningExtension(CatalogId catalogId);
...@@ -727,7 +728,8 @@ extern void processExtensionTables(Archive *fout, ExtensionInfo extinfo[], ...@@ -727,7 +728,8 @@ extern void processExtensionTables(Archive *fout, ExtensionInfo extinfo[],
int numExtensions); int numExtensions);
extern EventTriggerInfo *getEventTriggers(Archive *fout, int *numEventTriggers); extern EventTriggerInfo *getEventTriggers(Archive *fout, int *numEventTriggers);
extern void getPolicies(Archive *fout, TableInfo tblinfo[], int numTables); extern void getPolicies(Archive *fout, TableInfo tblinfo[], int numTables);
extern void getPublications(Archive *fout); extern PublicationInfo *getPublications(Archive *fout,
int *numPublications);
extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], extern void getPublicationTables(Archive *fout, TableInfo tblinfo[],
int numTables); int numTables);
extern void getSubscriptions(Archive *fout); extern void getSubscriptions(Archive *fout);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment