Commit 32a9c0bd authored by Fujii Masao's avatar Fujii Masao

postgres_fdw: reestablish new connection if cached one is detected as broken.

In postgres_fdw, once remote connections are established, they are cached
and re-used for subsequent queries and transactions. There can be some
cases where those cached connections are unavaiable, for example,
by the restart of remote server. In these cases, previously an error was
reported and the query accessing to remote server failed if new remote
transaction failed to start because the cached connection was broken.

This commit improves postgres_fdw so that new connection is remade
if broken connection is detected when starting new remote transaction.
This is useful to avoid unnecessary failure of queries when connection is
broken but can be reestablished.

Author: Bharath Rupireddy, tweaked a bit by Fujii Masao
Reviewed-by: Ashutosh Bapat, Tatsuhito Kasahara, Fujii Masao
Discussion: https://postgr.es/m/CALj2ACUAi23vf1WiHNar_LksM9EDOWXcbHCo-fD4Mbr1d=78YQ@mail.gmail.com
parent dd0a64ed
...@@ -108,6 +108,7 @@ PGconn * ...@@ -108,6 +108,7 @@ PGconn *
GetConnection(UserMapping *user, bool will_prep_stmt) GetConnection(UserMapping *user, bool will_prep_stmt)
{ {
bool found; bool found;
volatile bool retry_conn = false;
ConnCacheEntry *entry; ConnCacheEntry *entry;
ConnCacheKey key; ConnCacheKey key;
...@@ -159,23 +160,26 @@ GetConnection(UserMapping *user, bool will_prep_stmt) ...@@ -159,23 +160,26 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Reject further use of connections which failed abort cleanup. */ /* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry); pgfdw_reject_incomplete_xact_state_change(entry);
retry:
/* /*
* If the connection needs to be remade due to invalidation, disconnect as * If the connection needs to be remade due to invalidation, disconnect as
* soon as we're out of all transactions. * soon as we're out of all transactions. Also, if previous attempt to
* start new remote transaction failed on the cached connection,
* disconnect it to retry a new connection.
*/ */
if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0) if ((entry->conn != NULL && entry->invalidated &&
entry->xact_depth == 0) || retry_conn)
{ {
if (retry_conn)
elog(DEBUG3, "closing connection %p to reestablish a new one",
entry->conn);
else
elog(DEBUG3, "closing connection %p for option changes to take effect", elog(DEBUG3, "closing connection %p for option changes to take effect",
entry->conn); entry->conn);
disconnect_pg_server(entry); disconnect_pg_server(entry);
} }
/*
* We don't check the health of cached connection here, because it would
* require some overhead. Broken connection will be detected when the
* connection is actually used.
*/
/* /*
* If cache entry doesn't have a connection, we have to establish a new * If cache entry doesn't have a connection, we have to establish a new
* connection. (If connect_pg_server throws an error, the cache entry * connection. (If connect_pg_server throws an error, the cache entry
...@@ -206,9 +210,35 @@ GetConnection(UserMapping *user, bool will_prep_stmt) ...@@ -206,9 +210,35 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
} }
/* /*
* Start a new transaction or subtransaction if needed. * We check the health of the cached connection here when starting a new
* remote transaction. If a broken connection is detected in the first
* attempt, we try to reestablish a new connection. If broken connection
* is detected again here, we give up getting a connection.
*/ */
PG_TRY();
{
/* Start a new transaction or subtransaction if needed. */
begin_remote_xact(entry); begin_remote_xact(entry);
retry_conn = false;
}
PG_CATCH();
{
if (PQstatus(entry->conn) != CONNECTION_BAD ||
entry->xact_depth > 0 ||
retry_conn)
PG_RE_THROW();
retry_conn = true;
}
PG_END_TRY();
if (retry_conn)
{
ereport(DEBUG3,
(errmsg_internal("could not start remote transaction on connection %p",
entry->conn)),
errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
goto retry;
}
/* Remember if caller will prepare statements */ /* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt; entry->have_prep_stmt |= will_prep_stmt;
......
...@@ -8987,3 +8987,51 @@ PREPARE TRANSACTION 'fdw_tpc'; ...@@ -8987,3 +8987,51 @@ PREPARE TRANSACTION 'fdw_tpc';
ERROR: cannot PREPARE a transaction that has operated on postgres_fdw foreign tables ERROR: cannot PREPARE a transaction that has operated on postgres_fdw foreign tables
ROLLBACK; ROLLBACK;
WARNING: there is no transaction in progress WARNING: there is no transaction in progress
-- ===================================================================
-- reestablish new connection
-- ===================================================================
-- Terminate the backend having the specified application_name and wait for
-- the termination to complete.
CREATE OR REPLACE PROCEDURE terminate_backend_and_wait(appname text) AS $$
BEGIN
PERFORM pg_terminate_backend(pid) FROM pg_stat_activity
WHERE application_name = appname;
LOOP
PERFORM * FROM pg_stat_activity WHERE application_name = appname;
EXIT WHEN NOT FOUND;
PERFORM pg_sleep(1), pg_stat_clear_snapshot();
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- Change application_name of remote connection to special one
-- so that we can easily terminate the connection later.
ALTER SERVER loopback OPTIONS (application_name 'fdw_retry_check');
SELECT 1 FROM ft1 LIMIT 1;
?column?
----------
1
(1 row)
-- Terminate the remote connection.
CALL terminate_backend_and_wait('fdw_retry_check');
-- This query should detect the broken connection when starting new remote
-- transaction, reestablish new connection, and then succeed.
BEGIN;
SELECT 1 FROM ft1 LIMIT 1;
?column?
----------
1
(1 row)
-- If the query detects the broken connection when starting new remote
-- subtransaction, it doesn't reestablish new connection and should fail.
CALL terminate_backend_and_wait('fdw_retry_check');
SAVEPOINT s;
SELECT 1 FROM ft1 LIMIT 1; -- should fail
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: remote SQL command: SAVEPOINT s2
COMMIT;
-- Clean up
DROP PROCEDURE terminate_backend_and_wait(text);
...@@ -2653,3 +2653,44 @@ SELECT count(*) FROM ft1; ...@@ -2653,3 +2653,44 @@ SELECT count(*) FROM ft1;
-- error here -- error here
PREPARE TRANSACTION 'fdw_tpc'; PREPARE TRANSACTION 'fdw_tpc';
ROLLBACK; ROLLBACK;
-- ===================================================================
-- reestablish new connection
-- ===================================================================
-- Terminate the backend having the specified application_name and wait for
-- the termination to complete.
CREATE OR REPLACE PROCEDURE terminate_backend_and_wait(appname text) AS $$
BEGIN
PERFORM pg_terminate_backend(pid) FROM pg_stat_activity
WHERE application_name = appname;
LOOP
PERFORM * FROM pg_stat_activity WHERE application_name = appname;
EXIT WHEN NOT FOUND;
PERFORM pg_sleep(1), pg_stat_clear_snapshot();
END LOOP;
END;
$$ LANGUAGE plpgsql;
-- Change application_name of remote connection to special one
-- so that we can easily terminate the connection later.
ALTER SERVER loopback OPTIONS (application_name 'fdw_retry_check');
SELECT 1 FROM ft1 LIMIT 1;
-- Terminate the remote connection.
CALL terminate_backend_and_wait('fdw_retry_check');
-- This query should detect the broken connection when starting new remote
-- transaction, reestablish new connection, and then succeed.
BEGIN;
SELECT 1 FROM ft1 LIMIT 1;
-- If the query detects the broken connection when starting new remote
-- subtransaction, it doesn't reestablish new connection and should fail.
CALL terminate_backend_and_wait('fdw_retry_check');
SAVEPOINT s;
SELECT 1 FROM ft1 LIMIT 1; -- should fail
COMMIT;
-- Clean up
DROP PROCEDURE terminate_backend_and_wait(text);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment