Commit f039eaac authored by Robert Haas's avatar Robert Haas

Allow queries submitted by postgres_fdw to be canceled.

This fixes a problem which is not new, but with the advent of direct
foreign table modification in 0bf3ae88,
it's somewhat more likely to be annoying than previously.  So,
arrange for a local query cancelation to propagate to the remote side.

Michael Paquier, reviewed by Etsuro Fujita.	 Original report by
Thom Brown.
parent 11e178d0
......@@ -17,6 +17,7 @@
#include "access/xact.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"
......@@ -447,6 +448,78 @@ GetPrepStmtNumber(PGconn *conn)
return ++prep_stmt_number;
}
/*
* Submit a query and wait for the result.
*
* This function is interruptible by signals.
*
* Caller is responsible for the error handling on the result.
*/
PGresult *
pgfdw_exec_query(PGconn *conn, const char *query)
{
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
*/
if (!PQsendQuery(conn, query))
pgfdw_report_error(ERROR, NULL, conn, false, query);
/* Wait for the result. */
return pgfdw_get_result(conn, query);
}
/*
* Wait for the result from a prior asynchronous execution function call.
*
* This function offers quick responsiveness by checking for any interruptions.
*
* This function emulates the PQexec()'s behavior of returning the last result
* when there are many.
*
* Caller is responsible for the error handling on the result.
*/
PGresult *
pgfdw_get_result(PGconn *conn, const char *query)
{
PGresult *last_res = NULL;
for (;;)
{
PGresult *res;
while (PQisBusy(conn))
{
int wc;
/* Sleep until there's something to do */
wc = WaitLatchOrSocket(MyLatch,
WL_LATCH_SET | WL_SOCKET_READABLE,
PQsocket(conn),
-1L);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
/* Data available in socket */
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
pgfdw_report_error(ERROR, NULL, conn, false, query);
}
}
res = PQgetResult(conn);
if (res == NULL)
break; /* query is complete */
PQclear(last_res);
last_res = res;
}
return last_res;
}
/*
* Report an error we got from the remote server.
*
......@@ -598,6 +671,32 @@ pgfdw_xact_callback(XactEvent event, void *arg)
case XACT_EVENT_ABORT:
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
/*
* If a command has been submitted to the remote server by
* using an asynchronous execution function, the command
* might not have yet completed. Check to see if a command
* is still being processed by the remote server, and if so,
* request cancellation of the command; if not, abort
* gracefully.
*/
if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
{
PGcancel *cancel;
char errbuf[256];
if ((cancel = PQgetCancel(entry->conn)))
{
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
ereport(WARNING,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not send cancel request: %s",
errbuf)));
PQfreeCancel(cancel);
}
break;
}
/* If we're aborting, abort all remote transactions too */
res = PQexec(entry->conn, "ABORT TRANSACTION");
/* Note: can't throw ERROR, it would be infinite loop */
......
......@@ -1421,7 +1421,7 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = PQexec(fsstate->conn, sql);
res = pgfdw_exec_query(fsstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
......@@ -1749,18 +1749,24 @@ postgresExecForeignInsert(EState *estate,
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
/*
* Execute the prepared statement, and check for success.
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = PQexecPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0);
res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
......@@ -1819,18 +1825,24 @@ postgresExecForeignUpdate(EState *estate,
slot);
/*
* Execute the prepared statement, and check for success.
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = PQexecPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0);
res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
......@@ -1889,18 +1901,24 @@ postgresExecForeignDelete(EState *estate,
NULL);
/*
* Execute the prepared statement, and check for success.
* Execute the prepared statement.
*/
if (!PQsendQueryPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = PQexecPrepared(fmstate->conn,
fmstate->p_name,
fmstate->p_nums,
p_values,
NULL,
NULL,
0);
res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) !=
(fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
......@@ -1950,7 +1968,7 @@ postgresEndForeignModify(EState *estate,
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = PQexec(fmstate->conn, sql);
res = pgfdw_exec_query(fmstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
......@@ -2712,7 +2730,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
/*
* Execute EXPLAIN remotely.
*/
res = PQexec(conn, sql);
res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
......@@ -2817,12 +2835,18 @@ create_cursor(ForeignScanState *node)
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
*/
if (!PQsendQueryParams(conn, buf.data, numParams,
NULL, values, NULL, NULL, 0))
pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = PQexecParams(conn, buf.data, numParams, NULL, values,
NULL, NULL, 0);
res = pgfdw_get_result(conn, buf.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res);
......@@ -2868,7 +2892,7 @@ fetch_more_data(ForeignScanState *node)
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
fsstate->fetch_size, fsstate->cursor_number);
res = PQexec(conn, sql);
res = pgfdw_exec_query(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
......@@ -2978,7 +3002,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = PQexec(conn, sql);
res = pgfdw_exec_query(conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
......@@ -3006,16 +3030,21 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
* with the remote server using different type OIDs than we do. All of
* the prepared statements we use in this module are simple enough that
* the remote server will make the right choices.
*/
if (!PQsendPrepare(fmstate->conn,
p_name,
fmstate->query,
0,
NULL))
pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
res = PQprepare(fmstate->conn,
p_name,
fmstate->query,
0,
NULL);
res = pgfdw_get_result(fmstate->conn, fmstate->query);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
PQclear(res);
......@@ -3147,12 +3176,18 @@ execute_dml_stmt(ForeignScanState *node)
* parameter (see deparse.c), the "inference" is trivial and will produce
* the desired result. This allows us to avoid assuming that the remote
* server has the same OIDs we do for the parameters' types.
*/
if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
NULL, values, NULL, NULL, 0))
pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
/*
* Get the result, and check for success.
*
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
numParams, NULL, values, NULL, NULL, 0);
dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
if (PQresultStatus(dmstate->result) !=
(dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
......@@ -3355,7 +3390,7 @@ postgresAnalyzeForeignTable(Relation relation,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
res = PQexec(conn, sql.data);
res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
......@@ -3449,7 +3484,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
res = PQexec(conn, sql.data);
res = pgfdw_exec_query(conn, sql.data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
......@@ -3500,7 +3535,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
snprintf(fetch_sql, sizeof(fetch_sql), "FETCH %d FROM c%u",
fetch_size, cursor_number);
res = PQexec(conn, fetch_sql);
res = pgfdw_exec_query(conn, fetch_sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
......@@ -3675,7 +3710,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
res = PQexec(conn, buf.data);
res = pgfdw_exec_query(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
......@@ -3774,7 +3809,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
res = PQexec(conn, buf.data);
res = pgfdw_exec_query(conn, buf.data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
......
......@@ -103,6 +103,8 @@ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
bool clear, const char *sql);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment