Commit 7fc1a81e authored by Fujii Masao's avatar Fujii Masao

postgres_fdw: Restructure connection retry logic.

Commit 32a9c0bd introduced connection retry logic into postgres_fdw.
Previously it used goto statement for retry. This commit gets rid of that
goto from the logic to make the code simpler and easier-to-read.

When getting out of PG_CATCH() for the retry, the error state should be
cleaned up and the memory context should be reset. But commit 32a9c0bd
forgot to do that. This commit also fixes this bug.

Previously only PQstatus()==CONNECTION_BAD was verified to detect
connection failure. But this could cause false detection in the case where
any error other than connection failure (e.g., out-of-memory) was thrown
after a broken connection was detected in libpq and CONNECTION_BAD is set.
To fix this issue, this commit changes the logic so that it also checks
the error's sqlstate is ERRCODE_CONNECTION_FAILURE.

Author: Fujii Masao
Reviewed-by: Tom Lane
Discussion: https://postgr.es/m/2943611.1602375376@sss.pgh.pa.us
parent fe2a16d8
...@@ -74,6 +74,7 @@ static unsigned int prep_stmt_number = 0; ...@@ -74,6 +74,7 @@ static unsigned int prep_stmt_number = 0;
static bool xact_got_connection = false; static bool xact_got_connection = false;
/* prototypes of private functions */ /* prototypes of private functions */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry); static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
...@@ -108,9 +109,10 @@ PGconn * ...@@ -108,9 +109,10 @@ PGconn *
GetConnection(UserMapping *user, bool will_prep_stmt) GetConnection(UserMapping *user, bool will_prep_stmt)
{ {
bool found; bool found;
volatile bool retry_conn = false; bool retry = false;
ConnCacheEntry *entry; ConnCacheEntry *entry;
ConnCacheKey key; ConnCacheKey key;
MemoryContext ccxt = CurrentMemoryContext;
/* First time through, initialize connection cache hashtable */ /* First time through, initialize connection cache hashtable */
if (ConnectionHash == NULL) if (ConnectionHash == NULL)
...@@ -160,23 +162,14 @@ GetConnection(UserMapping *user, bool will_prep_stmt) ...@@ -160,23 +162,14 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Reject further use of connections which failed abort cleanup. */ /* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry); pgfdw_reject_incomplete_xact_state_change(entry);
retry:
/* /*
* If the connection needs to be remade due to invalidation, disconnect as * If the connection needs to be remade due to invalidation, disconnect as
* soon as we're out of all transactions. Also, if previous attempt to * soon as we're out of all transactions.
* start new remote transaction failed on the cached connection,
* disconnect it to retry a new connection.
*/ */
if ((entry->conn != NULL && entry->invalidated && if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
entry->xact_depth == 0) || retry_conn)
{ {
if (retry_conn) elog(DEBUG3, "closing connection %p for option changes to take effect",
elog(DEBUG3, "closing connection %p to reestablish a new one", entry->conn);
entry->conn);
else
elog(DEBUG3, "closing connection %p for option changes to take effect",
entry->conn);
disconnect_pg_server(entry); disconnect_pg_server(entry);
} }
...@@ -186,58 +179,78 @@ retry: ...@@ -186,58 +179,78 @@ retry:
* will remain in a valid empty state, ie conn == NULL.) * will remain in a valid empty state, ie conn == NULL.)
*/ */
if (entry->conn == NULL) if (entry->conn == NULL)
{ make_new_connection(entry, user);
ForeignServer *server = GetForeignServer(user->serverid);
/* Reset all transient state fields, to be sure all are clean */
entry->xact_depth = 0;
entry->have_prep_stmt = false;
entry->have_error = false;
entry->changing_xact_state = false;
entry->invalidated = false;
entry->server_hashvalue =
GetSysCacheHashValue1(FOREIGNSERVEROID,
ObjectIdGetDatum(server->serverid));
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
entry->conn, server->servername, user->umid, user->userid);
}
/* /*
* We check the health of the cached connection here when starting a new * We check the health of the cached connection here when starting a new
* remote transaction. If a broken connection is detected in the first * remote transaction. If a broken connection is detected, we try to
* attempt, we try to reestablish a new connection. If broken connection * reestablish a new connection later.
* is detected again here, we give up getting a connection.
*/ */
PG_TRY(); PG_TRY();
{ {
/* Start a new transaction or subtransaction if needed. */ /* Start a new transaction or subtransaction if needed. */
begin_remote_xact(entry); begin_remote_xact(entry);
retry_conn = false;
} }
PG_CATCH(); PG_CATCH();
{ {
if (PQstatus(entry->conn) != CONNECTION_BAD || MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
entry->xact_depth > 0 || ErrorData *errdata = CopyErrorData();
retry_conn)
/*
* If connection failure is reported when starting a new remote
* transaction (not subtransaction), new connection will be
* reestablished later.
*
* After a broken connection is detected in libpq, any error other
* than connection failure (e.g., out-of-memory) can be thrown
* somewhere between return from libpq and the expected ereport() call
* in pgfdw_report_error(). In this case, since PQstatus() indicates
* CONNECTION_BAD, checking only PQstatus() causes the false detection
* of connection failure. To avoid this, we also verify that the
* error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
* checking only the sqlstate can cause another false detection
* because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
* for any libpq-originated error condition.
*/
if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
PQstatus(entry->conn) != CONNECTION_BAD ||
entry->xact_depth > 0)
{
MemoryContextSwitchTo(ecxt);
PG_RE_THROW(); PG_RE_THROW();
retry_conn = true; }
/* Clean up the error state */
FlushErrorState();
FreeErrorData(errdata);
errdata = NULL;
retry = true;
} }
PG_END_TRY(); PG_END_TRY();
if (retry_conn) /*
* If a broken connection is detected, disconnect it, reestablish a new
* connection and retry a new remote transaction. If connection failure is
* reported again, we give up getting a connection.
*/
if (retry)
{ {
Assert(entry->xact_depth == 0);
ereport(DEBUG3, ereport(DEBUG3,
(errmsg_internal("could not start remote transaction on connection %p", (errmsg_internal("could not start remote transaction on connection %p",
entry->conn)), entry->conn)),
errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn)))); errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
goto retry;
elog(DEBUG3, "closing connection %p to reestablish a new one",
entry->conn);
disconnect_pg_server(entry);
if (entry->conn == NULL)
make_new_connection(entry, user);
begin_remote_xact(entry);
} }
/* Remember if caller will prepare statements */ /* Remember if caller will prepare statements */
...@@ -246,6 +259,37 @@ retry: ...@@ -246,6 +259,37 @@ retry:
return entry->conn; return entry->conn;
} }
/*
* Reset all transient state fields in the cached connection entry and
* establish new connection to the remote server.
*/
static void
make_new_connection(ConnCacheEntry *entry, UserMapping *user)
{
ForeignServer *server = GetForeignServer(user->serverid);
Assert(entry->conn == NULL);
/* Reset all transient state fields, to be sure all are clean */
entry->xact_depth = 0;
entry->have_prep_stmt = false;
entry->have_error = false;
entry->changing_xact_state = false;
entry->invalidated = false;
entry->server_hashvalue =
GetSysCacheHashValue1(FOREIGNSERVEROID,
ObjectIdGetDatum(server->serverid));
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
entry->conn, server->servername, user->umid, user->userid);
}
/* /*
* Connect to remote server using specified server and user mapping properties. * Connect to remote server using specified server and user mapping properties.
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment