Commit 418611c8 authored by Robert Haas's avatar Robert Haas

Generalize parallel slot result handling.

Instead of having a hard-coded behavior that we ignore missing
tables and report all other errors, let the caller decide what
to do by setting a callback.

Mark Dilger, reviewed and somewhat revised by me. The larger patch
series of which this is a part has also had review from Peter
Geoghegan, Andres Freund, Álvaro Herrera, Michael Paquier, and Amul
Sul, but I don't know whether any of them have reviewed this bit
specifically.

Discussion: http://postgr.es/m/12ED3DA8-25F0-4B68-937D-D907CFBF08E7@enterprisedb.com
Discussion: http://postgr.es/m/5F743835-3399-419C-8324-2D424237E999@enterprisedb.com
Discussion: http://postgr.es/m/70655DF3-33CE-4527-9A4D-DDEB582B6BA0@enterprisedb.com
parent e955bd4b
......@@ -466,6 +466,7 @@ reindex_one_database(const ConnParams *cparams, ReindexType type,
goto finish;
}
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
run_reindex_command(free_slot->connection, process_type, objname,
echo, verbose, concurrently, true);
......
......@@ -713,6 +713,7 @@ vacuum_one_database(const ConnParams *cparams,
* Execute the vacuum. All errors are handled in processQueryResult
* through ParallelSlotsGetIdle.
*/
ParallelSlotSetHandler(free_slot, TableCommandResultHandler, NULL);
run_vacuum_command(free_slot->connection, sql.data,
echo, tabname);
......
......@@ -30,7 +30,7 @@
static void init_slot(ParallelSlot *slot, PGconn *conn);
static int select_loop(int maxFd, fd_set *workerset);
static bool processQueryResult(PGconn *conn, PGresult *result);
static bool processQueryResult(ParallelSlot *slot, PGresult *result);
static void
init_slot(ParallelSlot *slot, PGconn *conn)
......@@ -38,34 +38,24 @@ init_slot(ParallelSlot *slot, PGconn *conn)
slot->connection = conn;
/* Initially assume connection is idle */
slot->isFree = true;
ParallelSlotClearHandler(slot);
}
/*
* Process (and delete) a query result. Returns true if there's no error,
* false otherwise -- but errors about trying to work on a missing relation
* are reported and subsequently ignored.
* Process (and delete) a query result. Returns true if there's no problem,
* false otherwise. It's up to the handler to decide what cosntitutes a
* problem.
*/
static bool
processQueryResult(PGconn *conn, PGresult *result)
processQueryResult(ParallelSlot *slot, PGresult *result)
{
/*
* If it's an error, report it. Errors about a missing table are harmless
* so we continue processing; but die for other errors.
*/
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
Assert(slot->handler != NULL);
pg_log_error("processing of database \"%s\" failed: %s",
PQdb(conn), PQerrorMessage(conn));
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
{
PQclear(result);
/* On failure, the handler should return NULL after freeing the result */
if (!slot->handler(result, slot->connection, slot->handler_context))
return false;
}
}
/* Ok, we have to free it ourself */
PQclear(result);
return true;
}
......@@ -76,15 +66,15 @@ processQueryResult(PGconn *conn, PGresult *result)
* Note that this will block if the connection is busy.
*/
static bool
consumeQueryResult(PGconn *conn)
consumeQueryResult(ParallelSlot *slot)
{
bool ok = true;
PGresult *result;
SetCancelConn(conn);
while ((result = PQgetResult(conn)) != NULL)
SetCancelConn(slot->connection);
while ((result = PQgetResult(slot->connection)) != NULL)
{
if (!processQueryResult(conn, result))
if (!processQueryResult(slot, result))
ok = false;
}
ResetCancelConn();
......@@ -227,14 +217,15 @@ ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
if (result != NULL)
{
/* Check and discard the command result */
if (!processQueryResult(slots[i].connection, result))
/* Handle and discard the command result */
if (!processQueryResult(slots + i, result))
return NULL;
}
else
{
/* This connection has become idle */
slots[i].isFree = true;
ParallelSlotClearHandler(slots + i);
if (firstFree < 0)
firstFree = i;
break;
......@@ -329,9 +320,53 @@ ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
for (i = 0; i < numslots; i++)
{
if (!consumeQueryResult((slots + i)->connection))
if (!consumeQueryResult(slots + i))
return false;
}
return true;
}
/*
* TableCommandResultHandler
*
* ParallelSlotResultHandler for results of commands (not queries) against
* tables.
*
* Requires that the result status is either PGRES_COMMAND_OK or an error about
* a missing table. This is useful for utilities that compile a list of tables
* to process and then run commands (vacuum, reindex, or whatever) against
* those tables, as there is a race condition between the time the list is
* compiled and the time the command attempts to open the table.
*
* For missing tables, logs an error but allows processing to continue.
*
* For all other errors, logs an error and terminates further processing.
*
* res: PGresult from the query executed on the slot's connection
* conn: connection belonging to the slot
* context: unused
*/
bool
TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
{
/*
* If it's an error, report it. Errors about a missing table are harmless
* so we continue processing; but die for other errors.
*/
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
pg_log_error("processing of database \"%s\" failed: %s",
PQdb(conn), PQerrorMessage(conn));
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
{
PQclear(res);
return false;
}
}
return true;
}
......@@ -15,12 +15,39 @@
#include "fe_utils/connect_utils.h"
#include "libpq-fe.h"
typedef bool (*ParallelSlotResultHandler) (PGresult *res, PGconn *conn,
void *context);
typedef struct ParallelSlot
{
PGconn *connection; /* One connection */
bool isFree; /* Is it known to be idle? */
/*
* Prior to issuing a command or query on 'connection', a handler callback
* function may optionally be registered to be invoked to process the
* results, and context information may optionally be registered for use
* by the handler. If unset, these fields should be NULL.
*/
ParallelSlotResultHandler handler;
void *handler_context;
} ParallelSlot;
static inline void
ParallelSlotSetHandler(ParallelSlot *slot, ParallelSlotResultHandler handler,
void *context)
{
slot->handler = handler;
slot->handler_context = context;
}
static inline void
ParallelSlotClearHandler(ParallelSlot *slot)
{
slot->handler = NULL;
slot->handler_context = NULL;
}
extern ParallelSlot *ParallelSlotsGetIdle(ParallelSlot *slots, int numslots);
extern ParallelSlot *ParallelSlotsSetup(const ConnParams *cparams,
......@@ -31,5 +58,7 @@ extern void ParallelSlotsTerminate(ParallelSlot *slots, int numslots);
extern bool ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots);
extern bool TableCommandResultHandler(PGresult *res, PGconn *conn,
void *context);
#endif /* PARALLEL_SLOT_H */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment