Commit 94173d3e authored by Tom Lane's avatar Tom Lane

Fix assorted issues in parallel vacuumdb.

Avoid storing the result of PQsocket() in a pgsocket variable; it's
declared as int, and the no-socket test is properly written as "x < 0"
not "x == PGINVALID_SOCKET".  This accidentally had no bad effect
because we never got to init_slot() with a bad connection, but it's
still wrong.

Actually, it seems like we should avoid storing the result for a long
period at all.  The function's not so expensive that it's worth avoiding,
and the existing coding technique here would fail if anyone tried to
PQreset the connection during the life of the program.  Hence, just
re-call PQsocket every time we construct a select(2) mask.

Speaking of select(), GetIdleSlot imagined that it could compute the
select mask once and continue to use it over multiple calls to
select_loop(), which is pretty bogus since that would stomp on the
mask on return.  This could only matter if the function's outer loop
iterated more than once, which is unlikely (it'd take some connection
receiving data, but not enough to complete its command).  But if it
did happen, we'd acquire "tunnel vision" and stop watching the other
connections for query termination, with the effect of losing parallelism.

Another way in which GetIdleSlot could lose parallelism is that once
PQisBusy returns false, it would lock in on that connection and do
PQgetResult until that returns NULL; in some cases that could result
in blocking.  (Perhaps this can never happen in vacuumdb due to the
limited set of commands that it can issue, but I'm not quite sure
of that, and even if true today it's not a future-proof assumption.)
Refactor the code to do that properly, so that it risks blocking in
PQgetResult only in cases where we need to wait anyway.

Another loss-of-parallelism problem, which *is* easily demonstrable,
is that any setup queries issued during prepare_vacuum_command() were
always issued on the last-to-be-created connection, whether or not
that was idle.  Long-running operations on that connection thus
prevented issuance of additional operations on the other ones, except
in the limited cases where no preparatory query was needed.  Instead,
wait till we've identified a free connection and use that one.

Also, avoid core dump due to undersized malloc request in the case
that no tables are identified to be vacuumed.

The bogus no-socket test was noted by CharSyam, the other problems
identified in my own code review.  Back-patch to 9.5 where parallel
vacuumdb was introduced.

Discussion: https://postgr.es/m/CAMrLSE6etb33-192DTEUGkV-TsvEcxtBDxGWG1tgNOMnQHwgDA@mail.gmail.com
parent 5635c7aa
......@@ -28,9 +28,8 @@
/* Parallel vacuuming stuff */
typedef struct ParallelSlot
{
PGconn *connection;
pgsocket sock;
bool isFree;
PGconn *connection; /* One connection */
bool isFree; /* Is it known to be idle? */
} ParallelSlot;
/* vacuum options controlled by user flags */
......@@ -71,13 +70,16 @@ static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
const char *progname);
static bool ProcessQueryResult(PGconn *conn, PGresult *result,
const char *progname);
static bool GetQueryResult(PGconn *conn, const char *progname);
static void DisconnectDatabase(ParallelSlot *slot);
static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
static void init_slot(ParallelSlot *slot, PGconn *conn, const char *progname);
static void init_slot(ParallelSlot *slot, PGconn *conn);
static void help(const char *progname);
......@@ -343,7 +345,7 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
PQExpBufferData sql;
PGconn *conn;
SimpleStringListCell *cell;
ParallelSlot *slots = NULL;
ParallelSlot *slots;
SimpleStringList dbtables = {NULL, NULL};
int i;
bool failed = false;
......@@ -387,7 +389,6 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
PQExpBufferData buf;
PGresult *res;
int ntups;
int i;
initPQExpBuffer(&buf);
......@@ -432,15 +433,17 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
* for the first slot. If not in parallel mode, the first slot in the
* array contains the connection.
*/
if (concurrentCons <= 0)
concurrentCons = 1;
slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
init_slot(slots, conn, progname);
init_slot(slots, conn);
if (parallel)
{
for (i = 1; i < concurrentCons; i++)
{
conn = connectDatabase(dbname, host, port, username, prompt_password,
progname, echo, false, true);
init_slot(slots + i, conn, progname);
init_slot(slots + i, conn);
}
}
......@@ -462,11 +465,8 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
cell = tables ? tables->head : NULL;
do
{
ParallelSlot *free_slot;
const char *tabname = cell ? cell->val : NULL;
prepare_vacuum_command(&sql, conn, vacopts, tabname,
tables == &dbtables, progname, echo);
ParallelSlot *free_slot;
if (CancelRequested)
{
......@@ -498,10 +498,17 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
else
free_slot = slots;
/*
* Prepare the vacuum command. Note that in some cases this requires
* query execution, so be sure to use the free connection.
*/
prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname,
tables == &dbtables, progname, echo);
/*
* Execute the vacuum. If not in parallel mode, this terminates the
* program in case of an error. (The parallel case handles query
* errors in GetQueryResult through GetIdleSlot.)
* errors in ProcessQueryResult through GetIdleSlot.)
*/
run_vacuum_command(free_slot->connection, sql.data,
echo, tabname, progname, parallel);
......@@ -514,13 +521,11 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
{
int j;
/* wait for all connections to finish */
for (j = 0; j < concurrentCons; j++)
{
/* wait for all connection to return the results */
if (!GetQueryResult((slots + j)->connection, progname))
goto finish;
(slots + j)->isFree = true;
}
}
......@@ -691,7 +696,8 @@ prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
}
/*
* Execute a vacuum/analyze command to the server.
* Send a vacuum/analyze command to the server. In async mode, return after
* sending the command; else, wait for it to finish.
*
* Any errors during command execution are reported to stderr. If async is
* false, this function exits the program after reporting the error.
......@@ -739,10 +745,6 @@ run_vacuum_command(PGconn *conn, const char *sql, bool echo,
* this happens, we read the whole set and mark as free all sockets that become
* available.
*
* Process the slot list, if any free slot is available then return the slotid
* else perform the select on all the socket's and wait until at least one slot
* becomes available.
*
* If an error occurs, NULL is returned.
*/
static ParallelSlot *
......@@ -751,31 +753,43 @@ GetIdleSlot(ParallelSlot slots[], int numslots,
{
int i;
int firstFree = -1;
fd_set slotset;
pgsocket maxFd;
for (i = 0; i < numslots; i++)
if ((slots + i)->isFree)
return slots + i;
FD_ZERO(&slotset);
maxFd = slots->sock;
/* Any connection already known free? */
for (i = 0; i < numslots; i++)
{
FD_SET((slots + i)->sock, &slotset);
if ((slots + i)->sock > maxFd)
maxFd = (slots + i)->sock;
if (slots[i].isFree)
return slots + i;
}
/*
* No free slot found, so wait until one of the connections has finished
* its task and return the available slot.
*/
for (firstFree = -1; firstFree < 0;)
while (firstFree < 0)
{
fd_set slotset;
int maxFd = 0;
bool aborting;
/* We must reconstruct the fd_set for each call to select_loop */
FD_ZERO(&slotset);
for (i = 0; i < numslots; i++)
{
int sock = PQsocket(slots[i].connection);
/*
* We don't really expect any connections to lose their sockets
* after startup, but just in case, cope by ignoring them.
*/
if (sock < 0)
continue;
FD_SET(sock, &slotset);
if (sock > maxFd)
maxFd = sock;
}
SetCancelConn(slots->connection);
i = select_loop(maxFd, &slotset, &aborting);
ResetCancelConn();
......@@ -793,64 +807,93 @@ GetIdleSlot(ParallelSlot slots[], int numslots,
for (i = 0; i < numslots; i++)
{
if (!FD_ISSET((slots + i)->sock, &slotset))
continue;
PQconsumeInput((slots + i)->connection);
if (PQisBusy((slots + i)->connection))
continue;
int sock = PQsocket(slots[i].connection);
(slots + i)->isFree = true;
if (sock >= 0 && FD_ISSET(sock, &slotset))
{
/* select() says input is available, so consume it */
PQconsumeInput(slots[i].connection);
}
if (!GetQueryResult((slots + i)->connection, progname))
return NULL;
/* Collect result(s) as long as any are available */
while (!PQisBusy(slots[i].connection))
{
PGresult *result = PQgetResult(slots[i].connection);
if (firstFree < 0)
firstFree = i;
if (result != NULL)
{
/* Check and discard the command result */
if (!ProcessQueryResult(slots[i].connection, result,
progname))
return NULL;
}
else
{
/* This connection has become idle */
slots[i].isFree = true;
if (firstFree < 0)
firstFree = i;
break;
}
}
}
}
return slots + firstFree;
}
/*
* ProcessQueryResult
*
* Process (and delete) a query result. Returns true if there's no error,
* false otherwise -- but errors about trying to vacuum a missing relation
* are reported and subsequently ignored.
*/
static bool
ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
{
/*
* If it's an error, report it. Errors about a missing table are harmless
* so we continue processing; but die for other errors.
*/
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
progname, PQdb(conn), PQerrorMessage(conn));
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
{
PQclear(result);
return false;
}
}
PQclear(result);
return true;
}
/*
* GetQueryResult
*
* Process the query result. Returns true if there's no error, false
* otherwise -- but errors about trying to vacuum a missing relation are
* reported and subsequently ignored.
* Pump the conn till it's dry of results; return false if any are errors.
* Note that this will block if the conn is busy.
*/
static bool
GetQueryResult(PGconn *conn, const char *progname)
{
bool ok = true;
PGresult *result;
SetCancelConn(conn);
while ((result = PQgetResult(conn)) != NULL)
{
/*
* If errors are found, report them. Errors about a missing table are
* harmless so we continue processing; but die for other errors.
*/
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
progname, PQdb(conn), PQerrorMessage(conn));
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
{
PQclear(result);
return false;
}
}
PQclear(result);
if (!ProcessQueryResult(conn, result, progname))
ok = false;
}
ResetCancelConn();
return true;
return ok;
}
/*
......@@ -942,18 +985,11 @@ select_loop(int maxFd, fd_set *workerset, bool *aborting)
}
static void
init_slot(ParallelSlot *slot, PGconn *conn, const char *progname)
init_slot(ParallelSlot *slot, PGconn *conn)
{
slot->connection = conn;
/* Initially assume connection is idle */
slot->isFree = true;
slot->sock = PQsocket(conn);
if (slot->sock < 0)
{
fprintf(stderr, _("%s: invalid socket: %s"), progname,
PQerrorMessage(conn));
exit(1);
}
}
static void
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment