Commit ae9bfc5d authored by Robert Haas's avatar Robert Haas

postgres_fdw: Allow cancellation of transaction control commands.

Commit f039eaac, later back-patched
with commit 1b812afb, allowed many of
the queries issued by postgres_fdw to fetch remote data to respond to
cancel interrupts in a timely fashion.  However, it didn't do anything
about the transaction control commands, which remained
noninterruptible.

Improve the situation by changing do_sql_command() to retrieve query
results using pgfdw_get_result(), which uses the asynchronous
interface to libpq so that it can check for interrupts every time
libpq returns control.  Since this might result in a situation
where we can no longer be sure that the remote transaction state
matches the local transaction state, add a facility to force all
levels of the local transaction to abort if we've lost track of
the remote state; without this, an apparently-successful commit of
the local transaction might fail to commit changes made on the
remote side.  Also, add a 60-second timeout for queries issue during
transaction abort; if that expires, give up and mark the state of
the connection as unknown.  Drop all such connections when we exit
the local transaction.  Together, these changes mean that if we're
aborting the local toplevel transaction anyway, we can just drop the
remote connection in lieu of waiting (possibly for a very long time)
for it to complete an abort.

This still leaves quite a bit of room for improvement.  PQcancel()
has no asynchronous interface, so if we get stuck sending the cancel
request we'll still hang.  Also, PQsetnonblocking() is not used, which
means we could block uninterruptibly when sending a query.  There
might be some other optimizations possible as well.  Nonetheless,
this allows us to escape a wait for an unresponsive remote server
quickly in many more cases than previously.

Report by Suraj Kharage.  Patch by me and Rafia Sabih.  Review
and testing by Amit Kapila and Tushar Ahuja.

Discussion: http://postgr.es/m/CAF1DzPU8Kx+fMXEbFoP289xtm3bz3t+ZfxhmKavr98Bh-C0TqQ@mail.gmail.com
parent 644ea35f
......@@ -14,6 +14,8 @@
#include "postgres_fdw.h"
#include "access/htup_details.h"
#include "catalog/pg_user_mapping.h"
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
......@@ -21,6 +23,7 @@
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
/*
......@@ -49,6 +52,7 @@ typedef struct ConnCacheEntry
* one level of subxact open, etc */
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
bool have_error; /* have any subxacts aborted in this xact? */
bool changing_xact_state; /* xact state change in process */
} ConnCacheEntry;
/*
......@@ -74,6 +78,12 @@ static void pgfdw_subxact_callback(SubXactEvent event,
SubTransactionId mySubid,
SubTransactionId parentSubid,
void *arg);
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
PGresult **result);
/*
......@@ -139,8 +149,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
entry->xact_depth = 0;
entry->have_prep_stmt = false;
entry->have_error = false;
entry->changing_xact_state = false;
}
/* Reject further use of connections which failed abort cleanup. */
pgfdw_reject_incomplete_xact_state_change(entry);
/*
* We don't check the health of cached connection here, because it would
* require some overhead. Broken connection will be detected when the
......@@ -343,7 +357,9 @@ do_sql_command(PGconn *conn, const char *sql)
{
PGresult *res;
res = PQexec(conn, sql);
if (!PQsendQuery(conn, sql))
pgfdw_report_error(ERROR, NULL, conn, false, sql);
res = pgfdw_get_result(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
......@@ -376,8 +392,10 @@ begin_remote_xact(ConnCacheEntry *entry)
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
else
sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
entry->changing_xact_state = true;
do_sql_command(entry->conn, sql);
entry->xact_depth = 1;
entry->changing_xact_state = false;
}
/*
......@@ -390,8 +408,10 @@ begin_remote_xact(ConnCacheEntry *entry)
char sql[64];
snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
entry->changing_xact_state = true;
do_sql_command(entry->conn, sql);
entry->xact_depth++;
entry->changing_xact_state = false;
}
}
......@@ -604,6 +624,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* If it has an open remote transaction, try to close it */
if (entry->xact_depth > 0)
{
bool abort_cleanup_failure = false;
elog(DEBUG3, "closing remote transaction on connection %p",
entry->conn);
......@@ -611,8 +633,17 @@ pgfdw_xact_callback(XactEvent event, void *arg)
{
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
/*
* If abort cleanup previously failed for this connection,
* we can't issue any more commands against it.
*/
pgfdw_reject_incomplete_xact_state_change(entry);
/* Commit all remote transactions during pre-commit */
entry->changing_xact_state = true;
do_sql_command(entry->conn, "COMMIT TRANSACTION");
entry->changing_xact_state = false;
/*
* If there were any errors in subtransactions, and we
......@@ -660,6 +691,27 @@ pgfdw_xact_callback(XactEvent event, void *arg)
break;
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
/*
* Don't try to clean up the connection if we're already
* in error recursion trouble.
*/
if (in_error_recursion_trouble())
entry->changing_xact_state = true;
/*
* If connection is already unsalvageable, don't touch it
* further.
*/
if (entry->changing_xact_state)
break;
/*
* Mark this connection as in the process of changing
* transaction state.
*/
entry->changing_xact_state = true;
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
......@@ -670,40 +722,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
* command is still being processed by the remote server,
* and if so, request cancellation of the command.
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
!pgfdw_cancel_query(entry->conn))
{
PGcancel *cancel;
char errbuf[256];
if ((cancel = PQgetCancel(entry->conn)))
{
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
ereport(WARNING,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send cancel request: %s",
errbuf)));
PQfreeCancel(cancel);
}
/* Unable to cancel running query. */
abort_cleanup_failure = true;
}
else if (!pgfdw_exec_cleanup_query(entry->conn,
"ABORT TRANSACTION",
false))
{
/* Unable to abort remote transaction. */
abort_cleanup_failure = true;
}
else if (entry->have_prep_stmt && entry->have_error &&
!pgfdw_exec_cleanup_query(entry->conn,
"DEALLOCATE ALL",
true))
{
/* Trouble clearing prepared statements. */
abort_cleanup_failure = true;
}
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(WARNING, res, entry->conn, true,
"ABORT TRANSACTION");
else
{
PQclear(res);
/* As above, make sure to clear any prepared stmts */
if (entry->have_prep_stmt && entry->have_error)
{
res = PQexec(entry->conn, "DEALLOCATE ALL");
PQclear(res);
}
entry->have_prep_stmt = false;
entry->have_error = false;
}
/* Disarm changing_xact_state if it all worked. */
entry->changing_xact_state = abort_cleanup_failure;
break;
}
}
......@@ -716,11 +763,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
* recover. Next GetConnection will open a new connection.
*/
if (PQstatus(entry->conn) != CONNECTION_OK ||
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
entry->changing_xact_state)
{
elog(DEBUG3, "discarding connection %p", entry->conn);
PQfinish(entry->conn);
entry->conn = NULL;
entry->changing_xact_state = false;
}
}
......@@ -763,7 +812,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
PGresult *res;
char sql[100];
/*
......@@ -779,12 +827,33 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
{
/*
* If abort cleanup previously failed for this connection, we
* can't issue any more commands against it.
*/
pgfdw_reject_incomplete_xact_state_change(entry);
/* Commit all remote subtransactions during pre-commit */
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
entry->changing_xact_state = true;
do_sql_command(entry->conn, sql);
entry->changing_xact_state = false;
}
else
else if (in_error_recursion_trouble())
{
/*
* Don't try to clean up the connection if we're already in error
* recursion trouble.
*/
entry->changing_xact_state = true;
}
else if (!entry->changing_xact_state)
{
bool abort_cleanup_failure = false;
/* Remember that abort cleanup is in progress. */
entry->changing_xact_state = true;
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
......@@ -795,34 +864,220 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
* processed by the remote server, and if so, request cancellation
* of the command.
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
!pgfdw_cancel_query(entry->conn))
abort_cleanup_failure = true;
else
{
PGcancel *cancel;
char errbuf[256];
if ((cancel = PQgetCancel(entry->conn)))
{
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
ereport(WARNING,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send cancel request: %s",
errbuf)));
PQfreeCancel(cancel);
}
/* Rollback all remote subtransactions during abort */
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
curlevel, curlevel);
if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
abort_cleanup_failure = true;
}
/* Rollback all remote subtransactions during abort */
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d",
curlevel, curlevel);
res = PQexec(entry->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(WARNING, res, entry->conn, true, sql);
else
PQclear(res);
/* Disarm changing_xact_state if it all worked. */
entry->changing_xact_state = abort_cleanup_failure;
}
/* OK, we're outta that level of subtransaction */
entry->xact_depth--;
}
}
/*
* Raise an error if the given connection cache entry is marked as being
* in the middle of an xact state change. This should be called at which no
* such change is expected to be in progress; if one is found to be in
* progress, it means that we aborted in the middle of a previous state change
* and now don't know what the remote transaction state actually is.
* Such connections can't safely be further used. Re-establishing the
* connection would change the snapshot and roll back any writes already
* performed, so that's not an option, either. Thus, we must abort.
*/
static void
pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
{
HeapTuple tup;
Form_pg_user_mapping umform;
ForeignServer *server;
if (!entry->changing_xact_state)
return;
tup = SearchSysCache1(USERMAPPINGOID,
ObjectIdGetDatum(entry->key));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
umform = (Form_pg_user_mapping) GETSTRUCT(tup);
server = GetForeignServer(umform->umserver);
ReleaseSysCache(tup);
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("connection to server \"%s\" was lost",
server->servername)));
}
/*
* Cancel the currently-in-progress query (whose query text we do not have)
* and ignore the result. Returns true if we successfully cancel the query
* and discard any pending result, and false if not.
*/
static bool
pgfdw_cancel_query(PGconn *conn)
{
PGcancel *cancel;
char errbuf[256];
PGresult *result = NULL;
TimestampTz endtime;
/*
* If it takes too long to cancel the query and discard the result, assume
* the connection is dead.
*/
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
/*
* Issue cancel request. Unfortunately, there's no good way to limit the
* amount of time that we might block inside PQgetCancel().
*/
if ((cancel = PQgetCancel(conn)))
{
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
{
ereport(WARNING,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send cancel request: %s",
errbuf)));
PQfreeCancel(cancel);
return false;
}
PQfreeCancel(cancel);
}
/* Get and discard the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result))
return false;
PQclear(result);
return true;
}
/*
* Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
* result. If the query is executed without error, the return value is true.
* If the query is executed successfully but returns an error, the return
* value is true if and only if ignore_errors is set. If the query can't be
* sent or times out, the return value is false.
*/
static bool
pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
{
PGresult *result = NULL;
TimestampTz endtime;
/*
* If it takes too long to execute a cleanup query, assume the connection
* is dead. It's fairly likely that this is why we aborted in the first
* place (e.g. statement timeout, user cancel), so the timeout shouldn't
* be too long.
*/
endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30000);
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
*/
if (!PQsendQuery(conn, query))
{
pgfdw_report_error(WARNING, NULL, conn, false, query);
return false;
}
/* Get the result of the query. */
if (pgfdw_get_cleanup_result(conn, endtime, &result))
return false;
/* Issue a warning if not successful. */
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
pgfdw_report_error(WARNING, result, conn, true, query);
return ignore_errors;
}
return true;
}
/*
* Get, during abort cleanup, the result of a query that is in progress. This
* might be a query that is being interrupted by transaction abort, or it might
* be a query that was initiated as part of transaction abort to get the remote
* side back to the appropriate state.
*
* It's not a huge problem if we throw an ERROR here, but if we get into error
* recursion trouble, we'll end up slamming the connection shut, which will
* necessitate failing the entire toplevel transaction even if subtransactions
* were used. Try to use WARNING where we can.
*
* endtime is the time at which we should give up and assume the remote
* side is dead. Returns true if the timeout expired, otherwise false.
* Sets *result except in case of a timeout.
*/
static bool
pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result)
{
PGresult *last_res = NULL;
for (;;)
{
PGresult *res;
while (PQisBusy(conn))
{
int wc;
TimestampTz now = GetCurrentTimestamp();
long secs;
int microsecs;
long cur_timeout;
/* If timeout has expired, give up, else get sleep time. */
if (now >= endtime)
return true;
TimestampDifference(now, endtime, &secs, &microsecs);
/* To protect against clock skew, limit sleep to one minute. */
cur_timeout = Min(60000, secs * USECS_PER_SEC + microsecs);
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE | WL_TIMEOUT,
PQsocket(conn),
cur_timeout, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
{
*result = NULL;
return false;
}
}
}
res = PQgetResult(conn);
if (res == NULL)
break; /* query is complete */
PQclear(last_res);
last_res = res;
}
*result = last_res;
return false;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment