Commit 2b792ab0 authored by Tom Lane's avatar Tom Lane

Make pg_dump's ACL, sec label, and comment entries reliably identifiable.

_tocEntryRequired() expects that it can identify ACL, SECURITY LABEL,
and COMMENT TOC entries that are for large objects by seeing whether
the tag for them starts with "LARGE OBJECT ".  While that works fine
for actual large objects, which are indeed tagged that way, it's
subject to false positives unless every such entry's tag starts with an
appropriate type ID.  And in fact it does not work for ACLs, because
up to now we customarily tagged those entries with just the bare name
of the object.  This means that an ACL for an object named
"LARGE OBJECT something" would be misclassified as data not schema,
with undesirable results in a schema-only or data-only dump ---
although pg_upgrade seems unaffected, due to the special case for
binary-upgrade mode further down in _tocEntryRequired().

We can fix this by changing all the dumpACL calls to use the label
strings already in use for comments and security labels, which do
follow the convention of starting with an object type indicator.

Well, mostly they follow it.  dumpDatabase() got it wrong, using
just the bare database name for those purposes, so that a database
named "LARGE OBJECT something" would similarly be subject to having
its comment or security label dropped or included when not wanted.
Bring that into line too.  (Note that up to now, database ACLs have
not been processed by pg_dump, so that this issue doesn't affect them.)

_tocEntryRequired() itself is not free of fault: it was overly liberal
about matching object tags to "LARGE OBJECT " in binary-upgrade mode.
This looks like it is probably harmless because there would be no data
component to strip anyway in that mode, but at best it's trouble
waiting to happen, so tighten that up too.

The possible misclassification of SECURITY LABEL entries for databases is
in principle a security problem, but the opportunities for actual exploits
seem too narrow to be interesting.  The other cases seem like just bugs,
since an object owner can change its ACL or comment for himself, he needn't
try to trick someone else into doing it by choosing a strange name.

This has been broken since per-large-object TOC entries were introduced
in 9.0, so back-patch to all supported branches.

Discussion: https://postgr.es/m/21714.1516553459@sss.pgh.pa.us
parent 8561e484
......@@ -2944,14 +2944,22 @@ _tocEntryRequired(TocEntry *te, teSection curSection, RestoreOptions *ropt)
if (ropt->schemaOnly)
{
/*
* The sequence_data option overrides schema-only for SEQUENCE SET.
*
* In binary-upgrade mode, even with schema-only set, we do not mask
* out large objects. Only large object definitions, comments and
* other information should be generated in binary-upgrade mode (not
* the actual data).
*/
if (!(ropt->sequence_data && strcmp(te->desc, "SEQUENCE SET") == 0) &&
!(ropt->binary_upgrade && strcmp(te->desc, "BLOB") == 0) &&
!(ropt->binary_upgrade && strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
!(ropt->binary_upgrade &&
(strcmp(te->desc, "BLOB") == 0 ||
(strcmp(te->desc, "ACL") == 0 &&
strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
(strcmp(te->desc, "COMMENT") == 0 &&
strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
(strcmp(te->desc, "SECURITY LABEL") == 0 &&
strncmp(te->tag, "LARGE OBJECT ", 13) == 0))))
res = res & REQ_SCHEMA;
}
......
......@@ -2529,6 +2529,7 @@ dumpDatabase(Archive *fout)
PQExpBuffer dbQry = createPQExpBuffer();
PQExpBuffer delQry = createPQExpBuffer();
PQExpBuffer creaQry = createPQExpBuffer();
PQExpBuffer labelq = createPQExpBuffer();
PGconn *conn = GetConnection(fout);
PGresult *res;
int i_tableoid,
......@@ -2787,16 +2788,20 @@ dumpDatabase(Archive *fout)
destroyPQExpBuffer(loOutQry);
}
/* Compute correct tag for comments etc */
appendPQExpBuffer(labelq, "DATABASE %s", fmtId(datname));
/* Dump DB comment if any */
if (fout->remoteVersion >= 80200)
{
/*
* 8.2 keeps comments on shared objects in a shared table, so we
* cannot use the dumpComment used for other database objects.
* 8.2 and up keep comments on shared objects in a shared table, so we
* cannot use the dumpComment() code used for other database objects.
* Be careful that the ArchiveEntry parameters match that function.
*/
char *comment = PQgetvalue(res, 0, PQfnumber(res, "description"));
if (comment && strlen(comment))
if (comment && *comment)
{
resetPQExpBuffer(dbQry);
......@@ -2808,17 +2813,17 @@ dumpDatabase(Archive *fout)
appendStringLiteralAH(dbQry, comment, fout);
appendPQExpBufferStr(dbQry, ";\n");
ArchiveEntry(fout, dbCatId, createDumpId(), datname, NULL, NULL,
dba, false, "COMMENT", SECTION_NONE,
ArchiveEntry(fout, nilCatalogId, createDumpId(),
labelq->data, NULL, NULL, dba,
false, "COMMENT", SECTION_NONE,
dbQry->data, "", NULL,
&dbDumpId, 1, NULL, NULL);
&(dbDumpId), 1,
NULL, NULL);
}
}
else
{
resetPQExpBuffer(dbQry);
appendPQExpBuffer(dbQry, "DATABASE %s", fmtId(datname));
dumpComment(fout, dbQry->data, NULL, "",
dumpComment(fout, labelq->data, NULL, dba,
dbCatId, 0, dbDumpId);
}
......@@ -2834,11 +2839,13 @@ dumpDatabase(Archive *fout)
shres = ExecuteSqlQuery(fout, seclabelQry->data, PGRES_TUPLES_OK);
resetPQExpBuffer(seclabelQry);
emitShSecLabels(conn, shres, seclabelQry, "DATABASE", datname);
if (strlen(seclabelQry->data))
ArchiveEntry(fout, dbCatId, createDumpId(), datname, NULL, NULL,
dba, false, "SECURITY LABEL", SECTION_NONE,
if (seclabelQry->len > 0)
ArchiveEntry(fout, nilCatalogId, createDumpId(),
labelq->data, NULL, NULL, dba,
false, "SECURITY LABEL", SECTION_NONE,
seclabelQry->data, "", NULL,
&dbDumpId, 1, NULL, NULL);
&(dbDumpId), 1,
NULL, NULL);
destroyPQExpBuffer(seclabelQry);
PQclear(shres);
}
......@@ -2848,6 +2855,7 @@ dumpDatabase(Archive *fout)
destroyPQExpBuffer(dbQry);
destroyPQExpBuffer(delQry);
destroyPQExpBuffer(creaQry);
destroyPQExpBuffer(labelq);
}
/*
......@@ -9707,7 +9715,7 @@ dumpNamespace(Archive *fout, NamespaceInfo *nspinfo)
if (nspinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, nspinfo->dobj.catId, nspinfo->dobj.dumpId, "SCHEMA",
qnspname, NULL, nspinfo->dobj.name, NULL,
qnspname, NULL, labelq->data, NULL,
nspinfo->rolname, nspinfo->nspacl, nspinfo->rnspacl,
nspinfo->initnspacl, nspinfo->initrnspacl);
......@@ -10003,7 +10011,7 @@ dumpEnumType(Archive *fout, TypeInfo *tyinfo)
if (tyinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, tyinfo->dobj.catId, tyinfo->dobj.dumpId, "TYPE",
qtypname, NULL, tyinfo->dobj.name,
qtypname, NULL, labelq->data,
tyinfo->dobj.namespace->dobj.name,
tyinfo->rolname, tyinfo->typacl, tyinfo->rtypacl,
tyinfo->inittypacl, tyinfo->initrtypacl);
......@@ -10143,7 +10151,7 @@ dumpRangeType(Archive *fout, TypeInfo *tyinfo)
if (tyinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, tyinfo->dobj.catId, tyinfo->dobj.dumpId, "TYPE",
qtypname, NULL, tyinfo->dobj.name,
qtypname, NULL, labelq->data,
tyinfo->dobj.namespace->dobj.name,
tyinfo->rolname, tyinfo->typacl, tyinfo->rtypacl,
tyinfo->inittypacl, tyinfo->initrtypacl);
......@@ -10220,7 +10228,7 @@ dumpUndefinedType(Archive *fout, TypeInfo *tyinfo)
if (tyinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, tyinfo->dobj.catId, tyinfo->dobj.dumpId, "TYPE",
qtypname, NULL, tyinfo->dobj.name,
qtypname, NULL, labelq->data,
tyinfo->dobj.namespace->dobj.name,
tyinfo->rolname, tyinfo->typacl, tyinfo->rtypacl,
tyinfo->inittypacl, tyinfo->initrtypacl);
......@@ -10509,7 +10517,7 @@ dumpBaseType(Archive *fout, TypeInfo *tyinfo)
if (tyinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, tyinfo->dobj.catId, tyinfo->dobj.dumpId, "TYPE",
qtypname, NULL, tyinfo->dobj.name,
qtypname, NULL, labelq->data,
tyinfo->dobj.namespace->dobj.name,
tyinfo->rolname, tyinfo->typacl, tyinfo->rtypacl,
tyinfo->inittypacl, tyinfo->initrtypacl);
......@@ -10678,7 +10686,7 @@ dumpDomain(Archive *fout, TypeInfo *tyinfo)
if (tyinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, tyinfo->dobj.catId, tyinfo->dobj.dumpId, "TYPE",
qtypname, NULL, tyinfo->dobj.name,
qtypname, NULL, labelq->data,
tyinfo->dobj.namespace->dobj.name,
tyinfo->rolname, tyinfo->typacl, tyinfo->rtypacl,
tyinfo->inittypacl, tyinfo->initrtypacl);
......@@ -10914,7 +10922,7 @@ dumpCompositeType(Archive *fout, TypeInfo *tyinfo)
if (tyinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, tyinfo->dobj.catId, tyinfo->dobj.dumpId, "TYPE",
qtypname, NULL, tyinfo->dobj.name,
qtypname, NULL, labelq->data,
tyinfo->dobj.namespace->dobj.name,
tyinfo->rolname, tyinfo->typacl, tyinfo->rtypacl,
tyinfo->inittypacl, tyinfo->initrtypacl);
......@@ -11233,7 +11241,7 @@ dumpProcLang(Archive *fout, ProcLangInfo *plang)
if (plang->lanpltrusted && plang->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, plang->dobj.catId, plang->dobj.dumpId, "LANGUAGE",
qlanname, NULL, plang->dobj.name,
qlanname, NULL, labelq->data,
lanschema,
plang->lanowner, plang->lanacl, plang->rlanacl,
plang->initlanacl, plang->initrlanacl);
......@@ -11867,7 +11875,7 @@ dumpFunc(Archive *fout, FuncInfo *finfo)
if (finfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, finfo->dobj.catId, finfo->dobj.dumpId, keyword,
funcsig, NULL, funcsig_tag,
funcsig, NULL, labelq->data,
finfo->dobj.namespace->dobj.name,
finfo->rolname, finfo->proacl, finfo->rproacl,
finfo->initproacl, finfo->initrproacl);
......@@ -13939,15 +13947,13 @@ dumpAgg(Archive *fout, AggInfo *agginfo)
* syntax for zero-argument aggregates and ordered-set aggregates.
*/
free(aggsig);
free(aggsig_tag);
aggsig = format_function_signature(fout, &agginfo->aggfn, true);
aggsig_tag = format_function_signature(fout, &agginfo->aggfn, false);
if (agginfo->aggfn.dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, agginfo->aggfn.dobj.catId, agginfo->aggfn.dobj.dumpId,
"FUNCTION",
aggsig, NULL, aggsig_tag,
aggsig, NULL, labelq->data,
agginfo->aggfn.dobj.namespace->dobj.name,
agginfo->aggfn.rolname, agginfo->aggfn.proacl,
agginfo->aggfn.rproacl,
......@@ -14393,7 +14399,7 @@ dumpForeignDataWrapper(Archive *fout, FdwInfo *fdwinfo)
if (fdwinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, fdwinfo->dobj.catId, fdwinfo->dobj.dumpId,
"FOREIGN DATA WRAPPER",
qfdwname, NULL, fdwinfo->dobj.name,
qfdwname, NULL, labelq->data,
NULL, fdwinfo->rolname,
fdwinfo->fdwacl, fdwinfo->rfdwacl,
fdwinfo->initfdwacl, fdwinfo->initrfdwacl);
......@@ -14490,7 +14496,7 @@ dumpForeignServer(Archive *fout, ForeignServerInfo *srvinfo)
if (srvinfo->dobj.dump & DUMP_COMPONENT_ACL)
dumpACL(fout, srvinfo->dobj.catId, srvinfo->dobj.dumpId,
"FOREIGN SERVER",
qsrvname, NULL, srvinfo->dobj.name,
qsrvname, NULL, labelq->data,
NULL, srvinfo->rolname,
srvinfo->srvacl, srvinfo->rsrvacl,
srvinfo->initsrvacl, srvinfo->initrsrvacl);
......@@ -14701,7 +14707,8 @@ dumpDefaultACL(Archive *fout, DefaultACLInfo *daclinfo)
* FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT.
* 'name' is the formatted name of the object. Must be quoted etc. already.
* 'subname' is the formatted name of the sub-object, if any. Must be quoted.
* 'tag' is the tag for the archive entry (typ. unquoted name of object).
* 'tag' is the tag for the archive entry (should be the same tag as would be
* used for comments etc; for example "TABLE foo").
* 'nspname' is the namespace the object is in (NULL if none).
* 'owner' is the owner, NULL if there is no owner (for languages).
* 'acls' contains the ACL string of the object from the appropriate system
......@@ -14811,7 +14818,7 @@ dumpSecLabel(Archive *fout, const char *target,
if (dopt->no_security_labels)
return;
/* Comments are schema not data ... except blob comments are data */
/* Security labels are schema not data ... except blob labels are data */
if (strncmp(target, "LARGE OBJECT ", 13) != 0)
{
if (dopt->dataOnly)
......@@ -15105,13 +15112,18 @@ dumpTable(Archive *fout, TableInfo *tbinfo)
/* Handle the ACL here */
namecopy = pg_strdup(fmtId(tbinfo->dobj.name));
if (tbinfo->dobj.dump & DUMP_COMPONENT_ACL)
{
const char *objtype =
(tbinfo->relkind == RELKIND_SEQUENCE) ? "SEQUENCE" : "TABLE";
char *acltag = psprintf("%s %s", objtype, namecopy);
dumpACL(fout, tbinfo->dobj.catId, tbinfo->dobj.dumpId,
(tbinfo->relkind == RELKIND_SEQUENCE) ? "SEQUENCE" :
"TABLE",
namecopy, NULL, tbinfo->dobj.name,
objtype, namecopy, NULL, acltag,
tbinfo->dobj.namespace->dobj.name, tbinfo->rolname,
tbinfo->relacl, tbinfo->rrelacl,
tbinfo->initrelacl, tbinfo->initrrelacl);
free(acltag);
}
/*
* Handle column ACLs, if any. Note: we pull these with a separate query
......@@ -15195,7 +15207,7 @@ dumpTable(Archive *fout, TableInfo *tbinfo)
char *acltag;
attnamecopy = pg_strdup(fmtId(attname));
acltag = psprintf("%s.%s", tbinfo->dobj.name, attname);
acltag = psprintf("COLUMN %s.%s", namecopy, attnamecopy);
/* Column's GRANT type is always TABLE */
dumpACL(fout, tbinfo->dobj.catId, tbinfo->dobj.dumpId, "TABLE",
namecopy, attnamecopy, acltag,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment