Commit 1beaa654 authored by Alvaro Herrera's avatar Alvaro Herrera

libpq: Fix sending queries in pipeline aborted state

When sending queries in pipeline mode, we were careless about leaving
the connection in the right state so that PQgetResult would behave
correctly; trying to read further results after sending a query after
having read a result with an error would sometimes hang.  Fix by
ensuring internal libpq state is changed properly.  All the state
changes were being done by the callers of pqAppendCmdQueueEntry(); it
would have become too repetitious to have this logic in each of them, so
instead put it all in that function and relieve callers of the
responsibility.

Add a test to verify this case.  Without the code fix, this new test
hangs sometimes.

Also, document that PQisBusy() would return false when no queries are
pending result.  This is not intuitively obvious, and NULL would be
obtained by calling PQgetResult() at that point, which is confusing.
Wording by Boris Kolpackov.

In passing, fix bogus use of "false" to mean "0", per Ranier Vilela.

Backpatch to 14.

Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reported-by: default avatarBoris Kolpackov <boris@codesynthesis.com>
Discussion: https://postgr.es/m/boris.20210624103805@codesynthesis.com
parent 7f2eca6f
...@@ -5171,7 +5171,10 @@ int PQflush(PGconn *conn); ...@@ -5171,7 +5171,10 @@ int PQflush(PGconn *conn);
<para> <para>
<function>PQisBusy</function>, <function>PQconsumeInput</function>, etc <function>PQisBusy</function>, <function>PQconsumeInput</function>, etc
operate as normal when processing pipeline results. operate as normal when processing pipeline results. In particular,
a call to <function>PQisBusy</function> in the middle of a pipeline
returns 0 if the results for all the queries issued so far have been
consumed.
</para> </para>
<para> <para>
......
...@@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn) ...@@ -1223,7 +1223,8 @@ pqAllocCmdQueueEntry(PGconn *conn)
/* /*
* pqAppendCmdQueueEntry * pqAppendCmdQueueEntry
* Append a caller-allocated command queue entry to the queue. * Append a caller-allocated entry to the command queue, and update
* conn->asyncStatus to account for it.
* *
* The query itself must already have been put in the output buffer by the * The query itself must already have been put in the output buffer by the
* caller. * caller.
...@@ -1239,6 +1240,38 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry) ...@@ -1239,6 +1240,38 @@ pqAppendCmdQueueEntry(PGconn *conn, PGcmdQueueEntry *entry)
conn->cmd_queue_tail->next = entry; conn->cmd_queue_tail->next = entry;
conn->cmd_queue_tail = entry; conn->cmd_queue_tail = entry;
switch (conn->pipelineStatus)
{
case PQ_PIPELINE_OFF:
case PQ_PIPELINE_ON:
/*
* When not in pipeline aborted state, if there's a result ready
* to be consumed, let it be so (that is, don't change away from
* READY or READY_MORE); otherwise set us busy to wait for
* something to arrive from the server.
*/
if (conn->asyncStatus == PGASYNC_IDLE)
conn->asyncStatus = PGASYNC_BUSY;
break;
case PQ_PIPELINE_ABORTED:
/*
* In aborted pipeline state, we don't expect anything from the
* server (since we don't send any queries that are queued).
* Therefore, if IDLE then do what PQgetResult would do to let
* itself consume commands from the queue; if we're in any other
* state, we don't have to do anything.
*/
if (conn->asyncStatus == PGASYNC_IDLE)
{
resetPQExpBuffer(&conn->errorMessage);
pqPipelineProcessQueue(conn);
}
break;
}
} }
/* /*
...@@ -1375,7 +1408,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) ...@@ -1375,7 +1408,6 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery)
/* OK, it's launched! */ /* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry); pqAppendCmdQueueEntry(conn, entry);
conn->asyncStatus = PGASYNC_BUSY;
return 1; return 1;
sendFailed: sendFailed:
...@@ -1510,10 +1542,6 @@ PQsendPrepare(PGconn *conn, ...@@ -1510,10 +1542,6 @@ PQsendPrepare(PGconn *conn,
/* if insufficient memory, query just winds up NULL */ /* if insufficient memory, query just winds up NULL */
entry->query = strdup(query); entry->query = strdup(query);
pqAppendCmdQueueEntry(conn, entry);
conn->asyncStatus = PGASYNC_BUSY;
/* /*
* Give the data a push (in pipeline mode, only if we're past the size * Give the data a push (in pipeline mode, only if we're past the size
* threshold). In nonblock mode, don't complain if we're unable to send * threshold). In nonblock mode, don't complain if we're unable to send
...@@ -1522,6 +1550,9 @@ PQsendPrepare(PGconn *conn, ...@@ -1522,6 +1550,9 @@ PQsendPrepare(PGconn *conn,
if (pqPipelineFlush(conn) < 0) if (pqPipelineFlush(conn) < 0)
goto sendFailed; goto sendFailed;
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
return 1; return 1;
sendFailed: sendFailed:
...@@ -1815,7 +1846,7 @@ PQsendQueryGuts(PGconn *conn, ...@@ -1815,7 +1846,7 @@ PQsendQueryGuts(PGconn *conn,
/* OK, it's launched! */ /* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry); pqAppendCmdQueueEntry(conn, entry);
conn->asyncStatus = PGASYNC_BUSY;
return 1; return 1;
sendFailed: sendFailed:
...@@ -2445,7 +2476,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target) ...@@ -2445,7 +2476,7 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
/* OK, it's launched! */ /* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry); pqAppendCmdQueueEntry(conn, entry);
conn->asyncStatus = PGASYNC_BUSY;
return 1; return 1;
sendFailed: sendFailed:
...@@ -2948,7 +2979,7 @@ pqCommandQueueAdvance(PGconn *conn) ...@@ -2948,7 +2979,7 @@ pqCommandQueueAdvance(PGconn *conn)
* pqPipelineProcessQueue: subroutine for PQgetResult * pqPipelineProcessQueue: subroutine for PQgetResult
* In pipeline mode, start processing the results of the next query in the queue. * In pipeline mode, start processing the results of the next query in the queue.
*/ */
void static void
pqPipelineProcessQueue(PGconn *conn) pqPipelineProcessQueue(PGconn *conn)
{ {
switch (conn->asyncStatus) switch (conn->asyncStatus)
...@@ -3072,15 +3103,15 @@ PQpipelineSync(PGconn *conn) ...@@ -3072,15 +3103,15 @@ PQpipelineSync(PGconn *conn)
pqPutMsgEnd(conn) < 0) pqPutMsgEnd(conn) < 0)
goto sendFailed; goto sendFailed;
pqAppendCmdQueueEntry(conn, entry);
/* /*
* Give the data a push. In nonblock mode, don't complain if we're unable * Give the data a push. In nonblock mode, don't complain if we're unable
* to send it all; PQgetResult() will do any additional flushing needed. * to send it all; PQgetResult() will do any additional flushing needed.
*/ */
if (PQflush(conn) < 0) if (PQflush(conn) < 0)
goto sendFailed; goto sendFailed;
conn->asyncStatus = PGASYNC_BUSY;
/* OK, it's launched! */
pqAppendCmdQueueEntry(conn, entry);
return 1; return 1;
...@@ -3115,7 +3146,7 @@ PQsendFlushRequest(PGconn *conn) ...@@ -3115,7 +3146,7 @@ PQsendFlushRequest(PGconn *conn)
{ {
appendPQExpBufferStr(&conn->errorMessage, appendPQExpBufferStr(&conn->errorMessage,
libpq_gettext("another command is already in progress\n")); libpq_gettext("another command is already in progress\n"));
return false; return 0;
} }
if (pqPutMsgStart('H', conn) < 0 || if (pqPutMsgStart('H', conn) < 0 ||
......
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
static void exit_nicely(PGconn *conn); static void exit_nicely(PGconn *conn);
static bool process_result(PGconn *conn, PGresult *res, int results,
int numsent);
const char *const progname = "libpq_pipeline"; const char *const progname = "libpq_pipeline";
...@@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn) ...@@ -1307,6 +1309,227 @@ test_transaction(PGconn *conn)
fprintf(stderr, "ok\n"); fprintf(stderr, "ok\n");
} }
/*
* In this test mode we send a stream of queries, with one in the middle
* causing an error. Verify that we can still send some more after the
* error and have libpq work properly.
*/
static void
test_uniqviol(PGconn *conn)
{
int sock = PQsocket(conn);
PGresult *res;
Oid paramTypes[2] = {INT8OID, INT8OID};
const char *paramValues[2];
char paramValue0[MAXINT8LEN];
char paramValue1[MAXINT8LEN];
int ctr = 0;
int numsent = 0;
int results = 0;
bool read_done = false;
bool write_done = false;
bool error_sent = false;
bool got_error = false;
int switched = 0;
int socketful = 0;
fd_set in_fds;
fd_set out_fds;
fprintf(stderr, "uniqviol ...");
PQsetnonblocking(conn, 1);
paramValues[0] = paramValue0;
paramValues[1] = paramValue1;
sprintf(paramValue1, "42");
res = PQexec(conn, "drop table if exists ppln_uniqviol;"
"create table ppln_uniqviol(id bigint primary key, idata bigint)");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("failed to create table: %s", PQerrorMessage(conn));
res = PQexec(conn, "begin");
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
res = PQprepare(conn, "insertion",
"insert into ppln_uniqviol values ($1, $2) returning id",
2, paramTypes);
if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
if (PQenterPipelineMode(conn) != 1)
pg_fatal("failed to enter pipeline mode");
while (!read_done)
{
/*
* Avoid deadlocks by reading everything the server has sent before
* sending anything. (Special precaution is needed here to process
* PQisBusy before testing the socket for read-readiness, because the
* socket does not turn read-ready after "sending" queries in aborted
* pipeline mode.)
*/
while (PQisBusy(conn) == 0)
{
bool new_error;
if (results >= numsent)
{
if (write_done)
read_done = true;
break;
}
res = PQgetResult(conn);
new_error = process_result(conn, res, results, numsent);
if (new_error && got_error)
pg_fatal("got two errors");
got_error |= new_error;
if (results++ >= numsent - 1)
{
if (write_done)
read_done = true;
break;
}
}
if (read_done)
break;
FD_ZERO(&out_fds);
FD_SET(sock, &out_fds);
FD_ZERO(&in_fds);
FD_SET(sock, &in_fds);
if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
{
if (errno == EINTR)
continue;
pg_fatal("select() failed: %m");
}
if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
/*
* If the socket is writable and we haven't finished sending queries,
* send some.
*/
if (!write_done && FD_ISSET(sock, &out_fds))
{
for (;;)
{
int flush;
/*
* provoke uniqueness violation exactly once after having
* switched to read mode.
*/
if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
{
sprintf(paramValue0, "%d", numsent / 2);
fprintf(stderr, "E");
error_sent = true;
}
else
{
fprintf(stderr, ".");
sprintf(paramValue0, "%d", ctr++);
}
if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
numsent++;
/* Are we done writing? */
if (socketful != 0 && numsent % socketful == 42 && error_sent)
{
if (PQsendFlushRequest(conn) != 1)
pg_fatal("failed to send flush request");
write_done = true;
fprintf(stderr, "\ndone writing\n");
PQflush(conn);
break;
}
/* is the outgoing socket full? */
flush = PQflush(conn);
if (flush == -1)
pg_fatal("failed to flush: %s", PQerrorMessage(conn));
if (flush == 1)
{
if (socketful == 0)
socketful = numsent;
fprintf(stderr, "\nswitch to reading\n");
switched++;
break;
}
}
}
}
if (!got_error)
pg_fatal("did not get expected error");
fprintf(stderr, "ok\n");
}
/*
* Subroutine for test_uniqviol; given a PGresult, print it out and consume
* the expected NULL that should follow it.
*
* Returns true if we read a fatal error message, otherwise false.
*/
static bool
process_result(PGconn *conn, PGresult *res, int results, int numsent)
{
PGresult *res2;
bool got_error = false;
if (res == NULL)
pg_fatal("got unexpected NULL");
switch (PQresultStatus(res))
{
case PGRES_FATAL_ERROR:
got_error = true;
fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
PQclear(res);
res2 = PQgetResult(conn);
if (res2 != NULL)
pg_fatal("expected NULL, got %s",
PQresStatus(PQresultStatus(res2)));
break;
case PGRES_TUPLES_OK:
fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
PQclear(res);
res2 = PQgetResult(conn);
if (res2 != NULL)
pg_fatal("expected NULL, got %s",
PQresStatus(PQresultStatus(res2)));
break;
case PGRES_PIPELINE_ABORTED:
fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
res2 = PQgetResult(conn);
if (res2 != NULL)
pg_fatal("expected NULL, got %s",
PQresStatus(PQresultStatus(res2)));
break;
default:
pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
}
return got_error;
}
static void static void
usage(const char *progname) usage(const char *progname)
{ {
...@@ -1331,6 +1554,7 @@ print_test_list(void) ...@@ -1331,6 +1554,7 @@ print_test_list(void)
printf("simple_pipeline\n"); printf("simple_pipeline\n");
printf("singlerow\n"); printf("singlerow\n");
printf("transaction\n"); printf("transaction\n");
printf("uniqviol\n");
} }
int int
...@@ -1436,6 +1660,8 @@ main(int argc, char **argv) ...@@ -1436,6 +1660,8 @@ main(int argc, char **argv)
test_singlerowmode(conn); test_singlerowmode(conn);
else if (strcmp(testname, "transaction") == 0) else if (strcmp(testname, "transaction") == 0)
test_transaction(conn); test_transaction(conn);
else if (strcmp(testname, "uniqviol") == 0)
test_uniqviol(conn);
else else
{ {
fprintf(stderr, "\"%s\" is not a recognized test name\n", testname); fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment