Commit 96198d94 authored by Robert Haas's avatar Robert Haas

Avoid multiple foreign server connections when all use same user mapping.

Previously, postgres_fdw's connection cache was keyed by user OID and
server OID, but this can lead to multiple connections when it's not
really necessary.  In particular, if all relevant users are mapped to
the public user mapping, then their connection options are certainly
the same, so one connection can be used for all of them.

While we're cleaning things up here, drop the "server" argument to
GetConnection(), which isn't really needed.  This saves a few cycles
because callers no longer have to look this up; the function itself
does, but only when establishing a new connection, not when reusing
an existing one.

Ashutosh Bapat, with a few small changes by me.
parent 80db1ca2
...@@ -24,9 +24,11 @@ ...@@ -24,9 +24,11 @@
/* /*
* Connection cache hash table entry * Connection cache hash table entry
* *
* The lookup key in this hash table is the foreign server OID plus the user * The lookup key in this hash table is the user mapping OID. We use just one
* mapping OID. (We use just one connection per user per foreign server, * connection per user mapping ID, which ensures that all the scans use the
* so that we can ensure all scans use the same snapshot during a query.) * same snapshot during a query. Using the user mapping OID rather than
* the foreign server OID + user OID avoids creating multiple connections when
* the public user mapping applies to all user OIDs.
* *
* The "conn" pointer can be NULL if we don't currently have a live connection. * The "conn" pointer can be NULL if we don't currently have a live connection.
* When we do have a connection, xact_depth tracks the current depth of * When we do have a connection, xact_depth tracks the current depth of
...@@ -35,11 +37,7 @@ ...@@ -35,11 +37,7 @@
* ourselves, so that rolling back a subtransaction will kill the right * ourselves, so that rolling back a subtransaction will kill the right
* queries and not the wrong ones. * queries and not the wrong ones.
*/ */
typedef struct ConnCacheKey typedef Oid ConnCacheKey;
{
Oid serverid; /* OID of foreign server */
Oid userid; /* OID of local user whose mapping we use */
} ConnCacheKey;
typedef struct ConnCacheEntry typedef struct ConnCacheEntry
{ {
...@@ -94,8 +92,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, ...@@ -94,8 +92,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
* mid-transaction anyway. * mid-transaction anyway.
*/ */
PGconn * PGconn *
GetConnection(ForeignServer *server, UserMapping *user, GetConnection(UserMapping *user, bool will_prep_stmt)
bool will_prep_stmt)
{ {
bool found; bool found;
ConnCacheEntry *entry; ConnCacheEntry *entry;
...@@ -127,8 +124,7 @@ GetConnection(ForeignServer *server, UserMapping *user, ...@@ -127,8 +124,7 @@ GetConnection(ForeignServer *server, UserMapping *user,
xact_got_connection = true; xact_got_connection = true;
/* Create hash key for the entry. Assume no pad bytes in key struct */ /* Create hash key for the entry. Assume no pad bytes in key struct */
key.serverid = server->serverid; key = user->umid;
key.userid = user->userid;
/* /*
* Find or create cached entry for requested connection. * Find or create cached entry for requested connection.
...@@ -156,12 +152,15 @@ GetConnection(ForeignServer *server, UserMapping *user, ...@@ -156,12 +152,15 @@ GetConnection(ForeignServer *server, UserMapping *user,
*/ */
if (entry->conn == NULL) if (entry->conn == NULL)
{ {
ForeignServer *server = GetForeignServer(user->serverid);
entry->xact_depth = 0; /* just to be sure */ entry->xact_depth = 0; /* just to be sure */
entry->have_prep_stmt = false; entry->have_prep_stmt = false;
entry->have_error = false; entry->have_error = false;
entry->conn = connect_pg_server(server, user); entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
entry->conn, server->servername); elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\ (user mapping oid %d, userid %d)",
entry->conn, server->servername, user->umid, user->userid);
} }
/* /*
......
...@@ -1101,7 +1101,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) ...@@ -1101,7 +1101,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
RangeTblEntry *rte; RangeTblEntry *rte;
Oid userid; Oid userid;
ForeignTable *table; ForeignTable *table;
ForeignServer *server;
UserMapping *user; UserMapping *user;
int numParams; int numParams;
int i; int i;
...@@ -1129,14 +1128,13 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) ...@@ -1129,14 +1128,13 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/* Get info about foreign table. */ /* Get info about foreign table. */
fsstate->rel = node->ss.ss_currentRelation; fsstate->rel = node->ss.ss_currentRelation;
table = GetForeignTable(RelationGetRelid(fsstate->rel)); table = GetForeignTable(RelationGetRelid(fsstate->rel));
server = GetForeignServer(table->serverid); user = GetUserMapping(userid, table->serverid);
user = GetUserMapping(userid, server->serverid);
/* /*
* Get connection to the foreign server. Connection manager will * Get connection to the foreign server. Connection manager will
* establish new connection if necessary. * establish new connection if necessary.
*/ */
fsstate->conn = GetConnection(server, user, false); fsstate->conn = GetConnection(user, false);
/* Assign a unique ID for my cursor */ /* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn); fsstate->cursor_number = GetCursorNumber(fsstate->conn);
...@@ -1503,7 +1501,6 @@ postgresBeginForeignModify(ModifyTableState *mtstate, ...@@ -1503,7 +1501,6 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
RangeTblEntry *rte; RangeTblEntry *rte;
Oid userid; Oid userid;
ForeignTable *table; ForeignTable *table;
ForeignServer *server;
UserMapping *user; UserMapping *user;
AttrNumber n_params; AttrNumber n_params;
Oid typefnoid; Oid typefnoid;
...@@ -1530,11 +1527,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate, ...@@ -1530,11 +1527,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
/* Get info about foreign table. */ /* Get info about foreign table. */
table = GetForeignTable(RelationGetRelid(rel)); table = GetForeignTable(RelationGetRelid(rel));
server = GetForeignServer(table->serverid); user = GetUserMapping(userid, table->serverid);
user = GetUserMapping(userid, server->serverid);
/* Open connection; report that we'll create a prepared statement. */ /* Open connection; report that we'll create a prepared statement. */
fmstate->conn = GetConnection(server, user, true); fmstate->conn = GetConnection(user, true);
fmstate->p_name = NULL; /* prepared statement not made yet */ fmstate->p_name = NULL; /* prepared statement not made yet */
/* Deconstruct fdw_private data. */ /* Deconstruct fdw_private data. */
...@@ -1988,7 +1984,7 @@ estimate_path_cost_size(PlannerInfo *root, ...@@ -1988,7 +1984,7 @@ estimate_path_cost_size(PlannerInfo *root,
appendOrderByClause(&sql, root, baserel, pathkeys); appendOrderByClause(&sql, root, baserel, pathkeys);
/* Get the remote estimate */ /* Get the remote estimate */
conn = GetConnection(fpinfo->server, fpinfo->user, false); conn = GetConnection(fpinfo->user, false);
get_remote_estimate(sql.data, conn, &rows, &width, get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost); &startup_cost, &total_cost);
ReleaseConnection(conn); ReleaseConnection(conn);
...@@ -2544,7 +2540,6 @@ postgresAnalyzeForeignTable(Relation relation, ...@@ -2544,7 +2540,6 @@ postgresAnalyzeForeignTable(Relation relation,
BlockNumber *totalpages) BlockNumber *totalpages)
{ {
ForeignTable *table; ForeignTable *table;
ForeignServer *server;
UserMapping *user; UserMapping *user;
PGconn *conn; PGconn *conn;
StringInfoData sql; StringInfoData sql;
...@@ -2565,9 +2560,8 @@ postgresAnalyzeForeignTable(Relation relation, ...@@ -2565,9 +2560,8 @@ postgresAnalyzeForeignTable(Relation relation,
* owner, even if the ANALYZE was started by some other user. * owner, even if the ANALYZE was started by some other user.
*/ */
table = GetForeignTable(RelationGetRelid(relation)); table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, server->serverid); conn = GetConnection(user, false);
conn = GetConnection(server, user, false);
/* /*
* Construct command to get page count for relation. * Construct command to get page count for relation.
...@@ -2626,7 +2620,6 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, ...@@ -2626,7 +2620,6 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
{ {
PgFdwAnalyzeState astate; PgFdwAnalyzeState astate;
ForeignTable *table; ForeignTable *table;
ForeignServer *server;
UserMapping *user; UserMapping *user;
PGconn *conn; PGconn *conn;
unsigned int cursor_number; unsigned int cursor_number;
...@@ -2657,9 +2650,8 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, ...@@ -2657,9 +2650,8 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
* owner, even if the ANALYZE was started by some other user. * owner, even if the ANALYZE was started by some other user.
*/ */
table = GetForeignTable(RelationGetRelid(relation)); table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, server->serverid); conn = GetConnection(user, false);
conn = GetConnection(server, user, false);
/* /*
* Construct cursor that retrieves whole rows from remote. * Construct cursor that retrieves whole rows from remote.
...@@ -2860,7 +2852,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) ...@@ -2860,7 +2852,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/ */
server = GetForeignServer(serverOid); server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid); mapping = GetUserMapping(GetUserId(), server->serverid);
conn = GetConnection(server, mapping, false); conn = GetConnection(mapping, false);
/* Don't attempt to import collation if remote server hasn't got it */ /* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100) if (PQserverVersion(conn) < 90100)
......
...@@ -60,8 +60,7 @@ extern int set_transmission_modes(void); ...@@ -60,8 +60,7 @@ extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel); extern void reset_transmission_modes(int nestlevel);
/* in connection.c */ /* in connection.c */
extern PGconn *GetConnection(ForeignServer *server, UserMapping *user, extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
bool will_prep_stmt);
extern void ReleaseConnection(PGconn *conn); extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn);
......
...@@ -193,6 +193,7 @@ GetUserMapping(Oid userid, Oid serverid) ...@@ -193,6 +193,7 @@ GetUserMapping(Oid userid, Oid serverid)
MappingUserName(userid)))); MappingUserName(userid))));
um = (UserMapping *) palloc(sizeof(UserMapping)); um = (UserMapping *) palloc(sizeof(UserMapping));
um->umid = HeapTupleGetOid(tp);
um->userid = userid; um->userid = userid;
um->serverid = serverid; um->serverid = serverid;
......
...@@ -55,6 +55,7 @@ typedef struct ForeignServer ...@@ -55,6 +55,7 @@ typedef struct ForeignServer
typedef struct UserMapping typedef struct UserMapping
{ {
Oid umid; /* Oid of user mapping */
Oid userid; /* local user Oid */ Oid userid; /* local user Oid */
Oid serverid; /* server Oid */ Oid serverid; /* server Oid */
List *options; /* useoptions as DefElem list */ List *options; /* useoptions as DefElem list */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment