Commit 6d060494 authored by Tom Lane's avatar Tom Lane

Adjust postgres_fdw's search path handling.

Set the remote session's search path to exactly "pg_catalog" at session
start, then schema-qualify only names that aren't in that schema.  This
greatly reduces clutter in the generated SQL commands, as seen in the
regression test changes.  Per discussion.

Also, rethink use of FirstNormalObjectId as the "built-in object" cutoff
--- FirstBootstrapObjectId is safer, since the former will accept
objects in information_schema for instance.
parent abf5c5c9
...@@ -63,6 +63,7 @@ static bool xact_got_connection = false; ...@@ -63,6 +63,7 @@ static bool xact_got_connection = false;
/* prototypes of private functions */ /* prototypes of private functions */
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void check_conn_params(const char **keywords, const char **values); static void check_conn_params(const char **keywords, const char **values);
static void configure_remote_session(PGconn *conn);
static void begin_remote_xact(ConnCacheEntry *entry); static void begin_remote_xact(ConnCacheEntry *entry);
static void pgfdw_xact_callback(XactEvent event, void *arg); static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event, static void pgfdw_subxact_callback(SubXactEvent event,
...@@ -237,6 +238,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user) ...@@ -237,6 +238,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
errdetail("Non-superuser cannot connect if the server does not request a password."), errdetail("Non-superuser cannot connect if the server does not request a password."),
errhint("Target server's authentication method must be changed."))); errhint("Target server's authentication method must be changed.")));
/* Prepare new session for use */
configure_remote_session(conn);
pfree(keywords); pfree(keywords);
pfree(values); pfree(values);
} }
...@@ -281,6 +285,31 @@ check_conn_params(const char **keywords, const char **values) ...@@ -281,6 +285,31 @@ check_conn_params(const char **keywords, const char **values)
errdetail("Non-superusers must provide a password in the user mapping."))); errdetail("Non-superusers must provide a password in the user mapping.")));
} }
/*
* Issue SET commands to make sure remote session is configured properly.
*
* We do this just once at connection, assuming nothing will change the
* values later. Since we'll never send volatile function calls to the
* remote, there shouldn't be any way to break this assumption from our end.
* It's possible to think of ways to break it at the remote end, eg making
* a foreign table point to a view that includes a set_config call ---
* but once you admit the possibility of a malicious view definition,
* there are any number of ways to break things.
*/
static void
configure_remote_session(PGconn *conn)
{
const char *sql;
PGresult *res;
/* Force the search path to contain only pg_catalog (see deparse.c) */
sql = "SET search_path = pg_catalog";
res = PQexec(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, sql);
PQclear(res);
}
/* /*
* Start remote transaction or subtransaction, if needed. * Start remote transaction or subtransaction, if needed.
* *
......
...@@ -11,6 +11,9 @@ ...@@ -11,6 +11,9 @@
* One saving grace is that we only need deparse logic for node types that * One saving grace is that we only need deparse logic for node types that
* we consider safe to send. * we consider safe to send.
* *
* We assume that the remote session's search_path is exactly "pg_catalog",
* and thus we need schema-qualify all and only names outside pg_catalog.
*
* Portions Copyright (c) 2012-2013, PostgreSQL Global Development Group * Portions Copyright (c) 2012-2013, PostgreSQL Global Development Group
* *
* IDENTIFICATION * IDENTIFICATION
...@@ -25,6 +28,7 @@ ...@@ -25,6 +28,7 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/sysattr.h" #include "access/sysattr.h"
#include "access/transam.h" #include "access/transam.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_operator.h" #include "catalog/pg_operator.h"
#include "catalog/pg_proc.h" #include "catalog/pg_proc.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
...@@ -73,6 +77,7 @@ static void deparseParam(StringInfo buf, Param *node, PlannerInfo *root); ...@@ -73,6 +77,7 @@ static void deparseParam(StringInfo buf, Param *node, PlannerInfo *root);
static void deparseArrayRef(StringInfo buf, ArrayRef *node, PlannerInfo *root); static void deparseArrayRef(StringInfo buf, ArrayRef *node, PlannerInfo *root);
static void deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root); static void deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root);
static void deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root); static void deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root);
static void deparseOperatorName(StringInfo buf, Form_pg_operator opform);
static void deparseDistinctExpr(StringInfo buf, DistinctExpr *node, static void deparseDistinctExpr(StringInfo buf, DistinctExpr *node,
PlannerInfo *root); PlannerInfo *root);
static void deparseScalarArrayOpExpr(StringInfo buf, ScalarArrayOpExpr *node, static void deparseScalarArrayOpExpr(StringInfo buf, ScalarArrayOpExpr *node,
...@@ -321,6 +326,18 @@ foreign_expr_walker(Node *node, foreign_expr_cxt *context) ...@@ -321,6 +326,18 @@ foreign_expr_walker(Node *node, foreign_expr_cxt *context)
/* /*
* Return true if given object is one of PostgreSQL's built-in objects. * Return true if given object is one of PostgreSQL's built-in objects.
* *
* We use FirstBootstrapObjectId as the cutoff, so that we only consider
* objects with hand-assigned OIDs to be "built in", not for instance any
* function or type defined in the information_schema.
*
* Our constraints for dealing with types are tighter than they are for
* functions or operators: we want to accept only types that are in pg_catalog
* (else format_type might incorrectly fail to schema-qualify their names),
* and we want to be sure that the remote server will use the same OID as
* we do (since we must transmit a numeric type OID when passing a value of
* the type as a query parameter). Both of these are reasons to reject
* objects created post-bootstrap.
*
* XXX there is a problem with this, which is that the set of built-in * XXX there is a problem with this, which is that the set of built-in
* objects expands over time. Something that is built-in to us might not * objects expands over time. Something that is built-in to us might not
* be known to the remote server, if it's of an older version. But keeping * be known to the remote server, if it's of an older version. But keeping
...@@ -329,7 +346,7 @@ foreign_expr_walker(Node *node, foreign_expr_cxt *context) ...@@ -329,7 +346,7 @@ foreign_expr_walker(Node *node, foreign_expr_cxt *context)
static bool static bool
is_builtin(Oid oid) is_builtin(Oid oid)
{ {
return (oid < FirstNormalObjectId); return (oid < FirstBootstrapObjectId);
} }
...@@ -563,6 +580,10 @@ deparseRelation(StringInfo buf, Oid relid) ...@@ -563,6 +580,10 @@ deparseRelation(StringInfo buf, Oid relid)
relname = defGetString(def); relname = defGetString(def);
} }
/*
* Note: we could skip printing the schema name if it's pg_catalog,
* but that doesn't seem worth the trouble.
*/
if (nspname == NULL) if (nspname == NULL)
nspname = get_namespace_name(get_rel_namespace(relid)); nspname = get_namespace_name(get_rel_namespace(relid));
if (relname == NULL) if (relname == NULL)
...@@ -832,9 +853,6 @@ deparseArrayRef(StringInfo buf, ArrayRef *node, PlannerInfo *root) ...@@ -832,9 +853,6 @@ deparseArrayRef(StringInfo buf, ArrayRef *node, PlannerInfo *root)
* Here not only explicit function calls and explicit casts but also implicit * Here not only explicit function calls and explicit casts but also implicit
* casts are deparsed to avoid problems caused by different cast settings * casts are deparsed to avoid problems caused by different cast settings
* between local and remote. * between local and remote.
*
* Function name is always qualified by schema name to avoid problems caused
* by different setting of search_path on remote side.
*/ */
static void static void
deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root) deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root)
...@@ -842,7 +860,6 @@ deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root) ...@@ -842,7 +860,6 @@ deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root)
HeapTuple proctup; HeapTuple proctup;
Form_pg_proc procform; Form_pg_proc procform;
const char *proname; const char *proname;
const char *schemaname;
bool use_variadic; bool use_variadic;
bool first; bool first;
ListCell *arg; ListCell *arg;
...@@ -851,7 +868,6 @@ deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root) ...@@ -851,7 +868,6 @@ deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root)
if (!HeapTupleIsValid(proctup)) if (!HeapTupleIsValid(proctup))
elog(ERROR, "cache lookup failed for function %u", node->funcid); elog(ERROR, "cache lookup failed for function %u", node->funcid);
procform = (Form_pg_proc) GETSTRUCT(proctup); procform = (Form_pg_proc) GETSTRUCT(proctup);
proname = NameStr(procform->proname);
/* Check if need to print VARIADIC (cf. ruleutils.c) */ /* Check if need to print VARIADIC (cf. ruleutils.c) */
if (OidIsValid(procform->provariadic)) if (OidIsValid(procform->provariadic))
...@@ -864,11 +880,18 @@ deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root) ...@@ -864,11 +880,18 @@ deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root)
else else
use_variadic = false; use_variadic = false;
/* Print schema name only if it's not pg_catalog */
if (procform->pronamespace != PG_CATALOG_NAMESPACE)
{
const char *schemaname;
schemaname = get_namespace_name(procform->pronamespace);
appendStringInfo(buf, "%s.", quote_identifier(schemaname));
}
/* Deparse the function name ... */ /* Deparse the function name ... */
schemaname = get_namespace_name(procform->pronamespace); proname = NameStr(procform->proname);
appendStringInfo(buf, "%s.%s(", appendStringInfo(buf, "%s(", quote_identifier(proname));
quote_identifier(schemaname),
quote_identifier(proname));
/* ... and all the arguments */ /* ... and all the arguments */
first = true; first = true;
foreach(arg, node->args) foreach(arg, node->args)
...@@ -887,16 +910,13 @@ deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root) ...@@ -887,16 +910,13 @@ deparseFuncExpr(StringInfo buf, FuncExpr *node, PlannerInfo *root)
/* /*
* Deparse given operator expression into buf. To avoid problems around * Deparse given operator expression into buf. To avoid problems around
* priority of operations, we always parenthesize the arguments. Also we use * priority of operations, we always parenthesize the arguments.
* OPERATOR(schema.operator) notation to determine remote operator exactly.
*/ */
static void static void
deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root) deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root)
{ {
HeapTuple tuple; HeapTuple tuple;
Form_pg_operator form; Form_pg_operator form;
const char *opnspname;
char *opname;
char oprkind; char oprkind;
ListCell *arg; ListCell *arg;
...@@ -905,10 +925,6 @@ deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root) ...@@ -905,10 +925,6 @@ deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root)
if (!HeapTupleIsValid(tuple)) if (!HeapTupleIsValid(tuple))
elog(ERROR, "cache lookup failed for operator %u", node->opno); elog(ERROR, "cache lookup failed for operator %u", node->opno);
form = (Form_pg_operator) GETSTRUCT(tuple); form = (Form_pg_operator) GETSTRUCT(tuple);
opnspname = quote_identifier(get_namespace_name(form->oprnamespace));
/* opname is not a SQL identifier, so we don't need to quote it. */
opname = NameStr(form->oprname);
oprkind = form->oprkind; oprkind = form->oprkind;
/* Sanity check. */ /* Sanity check. */
...@@ -927,8 +943,8 @@ deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root) ...@@ -927,8 +943,8 @@ deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root)
appendStringInfoChar(buf, ' '); appendStringInfoChar(buf, ' ');
} }
/* Deparse fully qualified operator name. */ /* Deparse operator name. */
appendStringInfo(buf, "OPERATOR(%s.%s)", opnspname, opname); deparseOperatorName(buf, form);
/* Deparse right operand. */ /* Deparse right operand. */
if (oprkind == 'l' || oprkind == 'b') if (oprkind == 'l' || oprkind == 'b')
...@@ -943,6 +959,34 @@ deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root) ...@@ -943,6 +959,34 @@ deparseOpExpr(StringInfo buf, OpExpr *node, PlannerInfo *root)
ReleaseSysCache(tuple); ReleaseSysCache(tuple);
} }
/*
* Print the name of an operator.
*/
static void
deparseOperatorName(StringInfo buf, Form_pg_operator opform)
{
char *opname;
/* opname is not a SQL identifier, so we should not quote it. */
opname = NameStr(opform->oprname);
/* Print schema name only if it's not pg_catalog */
if (opform->oprnamespace != PG_CATALOG_NAMESPACE)
{
const char *opnspname;
opnspname = get_namespace_name(opform->oprnamespace);
/* Print fully qualified operator name. */
appendStringInfo(buf, "OPERATOR(%s.%s)",
quote_identifier(opnspname), opname);
}
else
{
/* Just print operator name. */
appendStringInfo(buf, "%s", opname);
}
}
/* /*
* Deparse IS DISTINCT FROM. * Deparse IS DISTINCT FROM.
*/ */
...@@ -960,9 +1004,7 @@ deparseDistinctExpr(StringInfo buf, DistinctExpr *node, PlannerInfo *root) ...@@ -960,9 +1004,7 @@ deparseDistinctExpr(StringInfo buf, DistinctExpr *node, PlannerInfo *root)
/* /*
* Deparse given ScalarArrayOpExpr expression into buf. To avoid problems * Deparse given ScalarArrayOpExpr expression into buf. To avoid problems
* around priority of operations, we always parenthesize the arguments. Also * around priority of operations, we always parenthesize the arguments.
* we use OPERATOR(schema.operator) notation to determine remote operator
* exactly.
*/ */
static void static void
deparseScalarArrayOpExpr(StringInfo buf, deparseScalarArrayOpExpr(StringInfo buf,
...@@ -971,8 +1013,6 @@ deparseScalarArrayOpExpr(StringInfo buf, ...@@ -971,8 +1013,6 @@ deparseScalarArrayOpExpr(StringInfo buf,
{ {
HeapTuple tuple; HeapTuple tuple;
Form_pg_operator form; Form_pg_operator form;
const char *opnspname;
char *opname;
Expr *arg1; Expr *arg1;
Expr *arg2; Expr *arg2;
...@@ -982,10 +1022,6 @@ deparseScalarArrayOpExpr(StringInfo buf, ...@@ -982,10 +1022,6 @@ deparseScalarArrayOpExpr(StringInfo buf,
elog(ERROR, "cache lookup failed for operator %u", node->opno); elog(ERROR, "cache lookup failed for operator %u", node->opno);
form = (Form_pg_operator) GETSTRUCT(tuple); form = (Form_pg_operator) GETSTRUCT(tuple);
opnspname = quote_identifier(get_namespace_name(form->oprnamespace));
/* opname is not a SQL identifier, so we don't need to quote it. */
opname = NameStr(form->oprname);
/* Sanity check. */ /* Sanity check. */
Assert(list_length(node->args) == 2); Assert(list_length(node->args) == 2);
...@@ -995,10 +1031,11 @@ deparseScalarArrayOpExpr(StringInfo buf, ...@@ -995,10 +1031,11 @@ deparseScalarArrayOpExpr(StringInfo buf,
/* Deparse left operand. */ /* Deparse left operand. */
arg1 = linitial(node->args); arg1 = linitial(node->args);
deparseExpr(buf, arg1, root); deparseExpr(buf, arg1, root);
appendStringInfoChar(buf, ' ');
/* Deparse fully qualified operator name plus decoration. */ /* Deparse operator name plus decoration. */
appendStringInfo(buf, " OPERATOR(%s.%s) %s (", deparseOperatorName(buf, form);
opnspname, opname, node->useOr ? "ANY" : "ALL"); appendStringInfo(buf, " %s (", node->useOr ? "ANY" : "ALL");
/* Deparse right operand. */ /* Deparse right operand. */
arg2 = lsecond(node->args); arg2 = lsecond(node->args);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment