Commit 8bf58c0d authored by Tom Lane's avatar Tom Lane

Re-establish postgres_fdw connections after server or user mapping changes.

Previously, postgres_fdw would keep on using an existing connection even
if the user did ALTER SERVER or ALTER USER MAPPING commands that should
affect connection parameters.  Teach it to watch for catcache invals
on these catalogs and re-establish connections when the relevant catalog
entries change.  Per bug #14738 from Michal Lis.

In passing, clean up some rather crufty decisions in commit ae9bfc5d
about where fields of ConnCacheEntry should be reset.  We now reset
all the fields whenever we open a new connection.

Kyotaro Horiguchi, reviewed by Ashutosh Bapat and myself.
Back-patch to 9.3 where postgres_fdw appeared.

Discussion: https://postgr.es/m/20170710113917.7727.10247@wrigleys.postgresql.org
parent 7e1fb4c5
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#include "pgstat.h" #include "pgstat.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "utils/hsearch.h" #include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/syscache.h" #include "utils/syscache.h"
...@@ -48,11 +49,15 @@ typedef struct ConnCacheEntry ...@@ -48,11 +49,15 @@ typedef struct ConnCacheEntry
{ {
ConnCacheKey key; /* hash key (must be first) */ ConnCacheKey key; /* hash key (must be first) */
PGconn *conn; /* connection to foreign server, or NULL */ PGconn *conn; /* connection to foreign server, or NULL */
/* Remaining fields are invalid when conn is NULL: */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */ * one level of subxact open, etc */
bool have_prep_stmt; /* have we prepared any stmts in this xact? */ bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */ bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */ bool changing_xact_state; /* xact state change in process */
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
} ConnCacheEntry; } ConnCacheEntry;
/* /*
...@@ -69,6 +74,7 @@ static bool xact_got_connection = false; ...@@ -69,6 +74,7 @@ static bool xact_got_connection = false;
/* prototypes of private functions */ /* prototypes of private functions */
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values); static void check_conn_params(const char **keywords, const char **values);
static void configure_remote_session(PGconn *conn); static void configure_remote_session(PGconn *conn);
static void do_sql_command(PGconn *conn, const char *sql); static void do_sql_command(PGconn *conn, const char *sql);
...@@ -78,6 +84,7 @@ static void pgfdw_subxact_callback(SubXactEvent event, ...@@ -78,6 +84,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid, SubTransactionId mySubid,
SubTransactionId parentSubid, SubTransactionId parentSubid,
void *arg); void *arg);
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry); static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static bool pgfdw_cancel_query(PGconn *conn); static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
...@@ -95,13 +102,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, ...@@ -95,13 +102,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
* will_prep_stmt must be true if caller intends to create any prepared * will_prep_stmt must be true if caller intends to create any prepared
* statements. Since those don't go away automatically at transaction end * statements. Since those don't go away automatically at transaction end
* (not even on error), we need this flag to cue manual cleanup. * (not even on error), we need this flag to cue manual cleanup.
*
* XXX Note that caching connections theoretically requires a mechanism to
* detect change of FDW objects to invalidate already established connections.
* We could manage that by watching for invalidation events on the relevant
* syscaches. For the moment, though, it's not clear that this would really
* be useful and not mere pedantry. We could not flush any active connections
* mid-transaction anyway.
*/ */
PGconn * PGconn *
GetConnection(UserMapping *user, bool will_prep_stmt) GetConnection(UserMapping *user, bool will_prep_stmt)
...@@ -130,6 +130,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) ...@@ -130,6 +130,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
*/ */
RegisterXactCallback(pgfdw_xact_callback, NULL); RegisterXactCallback(pgfdw_xact_callback, NULL);
RegisterSubXactCallback(pgfdw_subxact_callback, NULL); RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
pgfdw_inval_callback, (Datum) 0);
CacheRegisterSyscacheCallback(USERMAPPINGOID,
pgfdw_inval_callback, (Datum) 0);
} }
/* Set flag that we did GetConnection during the current transaction */ /* Set flag that we did GetConnection during the current transaction */
...@@ -144,17 +148,27 @@ GetConnection(UserMapping *user, bool will_prep_stmt) ...@@ -144,17 +148,27 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found); entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found) if (!found)
{ {
/* initialize new hashtable entry (key is already filled in) */ /*
* We need only clear "conn" here; remaining fields will be filled
* later when "conn" is set.
*/
entry->conn = NULL; entry->conn = NULL;
entry->xact_depth = 0;
entry->have_prep_stmt = false;
entry->have_error = false;
entry->changing_xact_state = false;
} }
/* Reject further use of connections which failed abort cleanup. */ /* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry); pgfdw_reject_incomplete_xact_state_change(entry);
/*
* If the connection needs to be remade due to invalidation, disconnect as
* soon as we're out of all transactions.
*/
if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
{
elog(DEBUG3, "closing connection %p for option changes to take effect",
entry->conn);
disconnect_pg_server(entry);
}
/* /*
* We don't check the health of cached connection here, because it would * We don't check the health of cached connection here, because it would
* require some overhead. Broken connection will be detected when the * require some overhead. Broken connection will be detected when the
...@@ -164,15 +178,26 @@ GetConnection(UserMapping *user, bool will_prep_stmt) ...@@ -164,15 +178,26 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* /*
* If cache entry doesn't have a connection, we have to establish a new * If cache entry doesn't have a connection, we have to establish a new
* connection. (If connect_pg_server throws an error, the cache entry * connection. (If connect_pg_server throws an error, the cache entry
* will be left in a valid empty state.) * will remain in a valid empty state, ie conn == NULL.)
*/ */
if (entry->conn == NULL) if (entry->conn == NULL)
{ {
ForeignServer *server = GetForeignServer(user->serverid); ForeignServer *server = GetForeignServer(user->serverid);
entry->xact_depth = 0; /* just to be sure */ /* Reset all transient state fields, to be sure all are clean */
entry->xact_depth = 0;
entry->have_prep_stmt = false; entry->have_prep_stmt = false;
entry->have_error = false; entry->have_error = false;
entry->changing_xact_state = false;
entry->invalidated = false;
entry->server_hashvalue =
GetSysCacheHashValue1(FOREIGNSERVEROID,
ObjectIdGetDatum(server->serverid));
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user); entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
...@@ -276,6 +301,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user) ...@@ -276,6 +301,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
return conn; return conn;
} }
/*
* Disconnect any open connection for a connection cache entry.
*/
static void
disconnect_pg_server(ConnCacheEntry *entry)
{
if (entry->conn != NULL)
{
PQfinish(entry->conn);
entry->conn = NULL;
}
}
/* /*
* For non-superusers, insist that the connstr specify a password. This * For non-superusers, insist that the connstr specify a password. This
* prevents a password from being picked up from .pgpass, a service file, * prevents a password from being picked up from .pgpass, a service file,
...@@ -777,9 +815,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) ...@@ -777,9 +815,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
entry->changing_xact_state) entry->changing_xact_state)
{ {
elog(DEBUG3, "discarding connection %p", entry->conn); elog(DEBUG3, "discarding connection %p", entry->conn);
PQfinish(entry->conn); disconnect_pg_server(entry);
entry->conn = NULL;
entry->changing_xact_state = false;
} }
} }
...@@ -896,6 +932,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, ...@@ -896,6 +932,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
} }
} }
/*
* Connection invalidation callback function
*
* After a change to a pg_foreign_server or pg_user_mapping catalog entry,
* mark connections depending on that entry as needing to be remade.
* We can't immediately destroy them, since they might be in the midst of
* a transaction, but we'll remake them at the next opportunity.
*
* Although most cache invalidation callbacks blow away all the related stuff
* regardless of the given hashvalue, connections are expensive enough that
* it's worth trying to avoid that.
*
* NB: We could avoid unnecessary disconnection more strictly by examining
* individual option values, but it seems too much effort for the gain.
*/
static void
pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
{
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
/* ConnectionHash must exist already, if we're registered */
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
/* Ignore invalid entries */
if (entry->conn == NULL)
continue;
/* hashvalue == 0 means a cache reset, must clear all state */
if (hashvalue == 0 ||
(cacheid == FOREIGNSERVEROID &&
entry->server_hashvalue == hashvalue) ||
(cacheid == USERMAPPINGOID &&
entry->mapping_hashvalue == hashvalue))
entry->invalidated = true;
}
}
/* /*
* Raise an error if the given connection cache entry is marked as being * Raise an error if the given connection cache entry is marked as being
* in the middle of an xact state change. This should be called at which no * in the middle of an xact state change. This should be called at which no
...@@ -913,9 +990,14 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry) ...@@ -913,9 +990,14 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
Form_pg_user_mapping umform; Form_pg_user_mapping umform;
ForeignServer *server; ForeignServer *server;
if (!entry->changing_xact_state) /* nothing to do for inactive entries and entries of sane state */
if (entry->conn == NULL || !entry->changing_xact_state)
return; return;
/* make sure this entry is inactive */
disconnect_pg_server(entry);
/* find server name to be shown in the message below */
tup = SearchSysCache1(USERMAPPINGOID, tup = SearchSysCache1(USERMAPPINGOID,
ObjectIdGetDatum(entry->key)); ObjectIdGetDatum(entry->key));
if (!HeapTupleIsValid(tup)) if (!HeapTupleIsValid(tup))
......
...@@ -191,6 +191,43 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ...@@ -191,6 +191,43 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
public | ft_pg_type | loopback | (schema_name 'pg_catalog', table_name 'pg_type') | public | ft_pg_type | loopback | (schema_name 'pg_catalog', table_name 'pg_type') |
(6 rows) (6 rows)
-- Test that alteration of server options causes reconnection
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work
c3 | c4
-------+------------------------------
00001 | Fri Jan 02 00:00:00 1970 PST
(1 row)
ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
ERROR: could not connect to server "loopback"
DETAIL: FATAL: database "no such database" does not exist
DO $d$
BEGIN
EXECUTE $$ALTER SERVER loopback
OPTIONS (SET dbname '$$||current_database()||$$')$$;
END;
$d$;
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
c3 | c4
-------+------------------------------
00001 | Fri Jan 02 00:00:00 1970 PST
(1 row)
-- Test that alteration of user mapping options causes reconnection
ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
OPTIONS (ADD user 'no such user');
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
ERROR: could not connect to server "loopback"
DETAIL: FATAL: role "no such user" does not exist
ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
OPTIONS (DROP user);
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
c3 | c4
-------+------------------------------
00001 | Fri Jan 02 00:00:00 1970 PST
(1 row)
-- Now we should be able to run ANALYZE. -- Now we should be able to run ANALYZE.
-- To exercise multiple code paths, we use local stats on ft1 -- To exercise multiple code paths, we use local stats on ft1
-- and remote-estimate mode on ft2. -- and remote-estimate mode on ft2.
......
...@@ -195,6 +195,26 @@ ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ...@@ -195,6 +195,26 @@ ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
\det+ \det+
-- Test that alteration of server options causes reconnection
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work
ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
DO $d$
BEGIN
EXECUTE $$ALTER SERVER loopback
OPTIONS (SET dbname '$$||current_database()||$$')$$;
END;
$d$;
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
-- Test that alteration of user mapping options causes reconnection
ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
OPTIONS (ADD user 'no such user');
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
OPTIONS (DROP user);
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
-- Now we should be able to run ANALYZE. -- Now we should be able to run ANALYZE.
-- To exercise multiple code paths, we use local stats on ft1 -- To exercise multiple code paths, we use local stats on ft1
-- and remote-estimate mode on ft2. -- and remote-estimate mode on ft2.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment